o
    4hp                      @   s   d dl m Z mZ d dlmZmZ d dlZd dlmZmZ d dl	m
Z
 d dlmZ d dlmZ d dlmZ d	d
 dd
 dd
 dd
 dd
 dd
 dZe Zdd ZdS )    )datetime	timedelta)	json_utilObjectIdN)PointPolygon)AIOKafkaConsumer)database)connected_clients)notify_nearby_subscribersc                 C   s   | |kS N abr   r   ;/var/www/html/moveengine/app/v1/consumers/kafka_consumer.py<lambda>       r   c                 C   s   | |kS r   r   r   r   r   r   r      r   c                 C   s   | |k S r   r   r   r   r   r   r      r   c                 C   s   | |kS r   r   r   r   r   r   r      r   c                 C   s   | |kS r   r   r   r   r   r   r      r   c                 C   s   | |kS r   r   r   r   r   r   r      r   )z==><z>=z<=z!=c            .         s  t dddddd} |  I d H  t }t|}|d }|d }|d	 }|d
 }|d }|d }|d }	|d }
zwtd | 2 zS3 d H W }td zt|j	d}td| W n t
ys } ztd| W Y d }~qAd }~ww t |d< d|d< |d}|di dg }|| g }|jd|idgd}|r|d }t }|| }|tddkr|dd| d |d!}|| || |d"d| d#|jd$  d%|d!}|| || t|d&|i}|D ]}|d'}|d(}|d)t|i}|r	|s
qt|}t|d* d d+ }||}|d,d-kr|s|r4|d)t|ind }d} |r|d.r|d. D ]P}!|!d/}"|!d0}#|!d1}$|#tvr_d2}  n7||"}%z$t|$}&|%d urrt|%nd }'|'d u st|# |'|&sd2} W  nW qE t
y   d2} Y  nw | r|r|d3nd| d4}(|d5|d6 |(|d d7}|| || q|d)t|i})||d8|d9|d:|d;d<t d=}*|)r|)d>}+|)d?},|+d@kr|	d)t|,idA|*i n|+dkr|
d)t|,idA|*i t|I d H  ntdB| dC |	dD|idA|*i |
dD|idA|*i t D ]\}-z<|- tj!|t"j#dEI d H  |D ](}|- tj!dF|dGdH|d|dI|dJ|dKdLt"j#dEI d H  qKW q8 t
y } zt$|- tdM| W Y d }~q8d }~ww qA6 W | % I d H  zt| W d S  t&y   Y d S w | % I d H  zt| W w  t&y   Y w w )NNvehicle_trackingz127.0.0.1:9092ztracking-groupearliestT)bootstrap_serversgroup_idauto_offset_resetenable_auto_committracking_logs
alert_logs	geofencesgeofence_rulesgeofence_rule_mappingiot_devicesfleets	workforceu   ✅ Kafka consumer startedu    📩 Message received from Kafkazutf-8u   🔹 Kafka payload:u   ❌ Error decoding message:received_atW
alert_type	device_idlocationcoordinates)r$   )sort   )minutes
inactivityzDevice z inactive for over 30 minutes.)r'   typemessagetriggered_atrecoveryz is back online after <   z minutes of inactivity.assigned_entity_idgeofence_idgeofence_rule_id_idlocr   trigger_eventsexit
conditions	parameteroperatorvalueFalert_messagez exited geofence.zgeofence-exitname)r'   r/   geofencer0   r1   speedheadingfuel	timestamp)r)   rB   rC   rD   rE   )last_locationlast_updatedassociated_entity_typeassociated_entity_idvehiclez$setu   ⚠️ Device z# not found. Fallback update applieddevices)defaultalertr0    r/   rA   r1   )r/   r0   r'   r&   rA   r1   u   ❌ WebSocket error:)'r   startr	   get_mongo_dbnextprintjsonloadsr>   decode	Exceptionr   utcnowget
insert_onefind_oner   appendsecondslistfindr   r   r   contains	OPERATORSfloat
update_oner   update_manyr
   copy	send_textdumpsr   rL   removestopStopIteration).consumer	mongo_genmongor   r   r   r   geofence_mappingsrK   r"   r#   msgdataer'   r)   alerts
last_entry	last_timecurrent_timegaprM   online_alertmappingsmappinggeo_idrule_idrA   pointpolygoninsideruleconditions_passedcondparamopexpectedactualexpected_val
actual_val	alert_msgdevicelast_location_updateassociated_typeassociated_idwsr   r   r   consume_tracking_data   s@  






















	
  r   )r   r   bsonr   r   rS   shapely.geometryr   r   aiokafkar   app.dbr	   app.utils.connectionsr
   app.v1.sockets.tracking_datar   r`   setr   r   r   r   r   <module>   s"    	