
    G:h                     B    S SK r S SKJr  S SKJr  S SKJr  S SKJr  S rg)    N)datetime)	json_util)AIOKafkaConsumer)connected_clientsc            	        #    [        SSSS9n U R                  5       I S h  vN    U   S h  vN n[        R                  " UR                  R                  S5      5      n[        SU5        [        S[        [        5      S5        [        R                  " 5        H>  n UR                  [        R                  " U[        R                  S	95      I S h  vN   M@     M   N N N! [         a/  n[        R                  " U5        [        S
U 35         S nAM~  S nAff = f
 U R!                  5       I S h  vN    g ! U R!                  5       I S h  vN    f = f7f)Nvehicle_trackingzlocalhost:9092ztracking-group)bootstrap_serversgroup_idzutf-8u   📦 Received from Kafka:zBroadcasting toclients)defaultu   ❌ Error sending to client: )r   startjsonloadsvaluedecodeprintlenr   copy	send_textdumpsr   r   	Exceptionremovestop)consumermsgdatawses        MC:\Suresh\moveshuttle\MDcreated\moveengine\app\v1\consumers\kafka_consumer.pyconsume_tracking_datar    	   s    *!H
 ..
! 
	?#::cii..w78D-t4#S):%;YG',,.?,,tzz$	@Q@Q'RSSS / 
	? T  ?%,,R09!=>>? " mmohmmos   EC ED= D"C"D"A.D=  5C&C$C&D=  E"D"$C&&
D0$DD= DD= #E6D97E=EEEE)	r   r   bsonr   aiokafkar   app.utils.connectionsr   r         r   <module>r&      s       % 3r%   