
    9hO0              	          S SK r S SKrS SKrS SKJrJrJr  S SKJr   " S S\5      r	 " S S\
5      r\S:X  Ga=  \ R                  " S	S
9r\R                  SS\SS9  \R                  SSSS9  \R                  SSS9  \R                  SSSS9  \R                  SSS9  \R                  S\SSS9  \R                  SSSSS 9  \R                  S!\S"S#S9  \R                  S$S%S9  \R                  S&S'S(S)9  \R                  S*SS+SS,S-9  \R                  S.S/S9  \R                  S0S1S2SS3/ S49  \" \R'                  5       5      rS5SS6.r\R-                  S/S5      b$  \R/                  \R0                  " \S/   5      5        \R/                  \R-                  S2/ 5       V s/ s H  o S    R3                  S75      PM     sn 5        \R4                  " \\5        \	" \5      r\S   \l        \S"   \l        \S+   \l        \R?                  S8\R@                  " 5       -  5        \R?                  S9\-  5        \R?                  S:\S;   -  5        \RB                  RE                  \S;   \RF                  \RH                  S<9  Sr% \RL                  (       a`  \RB                  RO                  S=S>9r(\(c   \RS                  SS?9  \RU                  SS?9  MM  \RW                  \(5        \RL                  (       a  M`  \R?                  SB5        \RS                  SS?9  \%(       d<   \R8                  (       d  \RU                  SSSC9  \RB                  R_                  5         \Ra                  SE\%SF.5        \R?                  SG5        ggs  sn f ! \, a    \R?                  S@5        S\l&         N\- a&  r.\R?                  SA\" \.5      -  5        Sr% Sr.C.NSr.C.ff = f! \- a&  r.\R?                  SD\" \.5      -  5        Sr% Sr.C.NSr.C.ff = f)H    N)Consumer
KafkaErrorKafkaException)VerifiableClientc                   d   ^  \ rS rSrSrU 4S jrS rSS jrS rS r	S r
S	 rSS
 jrS rSrU =r$ )VerifiableConsumer   zf
confluent-kafka-python backed VerifiableConsumer class for use with
Kafka's kafkatests client tests.
c                    > [         [        U ]  U5        U R                  U R                  S'   [        S0 UD6U l        SU l        SU l        SU l	        SU l
        SU l        SU l        / U l        [        5       U l        g)z<
conf is a config dict passed to confluent_kafka.Consumer()
	on_commitr   FN )superr   __init__r   confr   consumerconsumed_msgsconsumed_msgs_last_reportedconsumed_msgs_at_last_commituse_auto_commituse_async_commitmax_msgs
assignmentdictassignment_dict)selfr   	__class__s     rC:\Suresh\moveshuttle\MDcreated\moveengine\venv\Lib\site-packages\confluent_kafka/kafkatest/verifiable_consumer.pyr   VerifiableConsumer.__init__   sv     	 $06!%		+ (4(+,(,-)$ %#v    c                 D    SX4-  nU R                   R                  U5      $ )zSFind and return existing assignment based on topic and partition,
or None on miss. %s %d)r   get)r   topic	partitionskeys       r   find_assignment"VerifiableConsumer.find_assignment/   s(     %++##''--r   c                    U R                   U R                  U(       a  SOS-   ::  a  g[        U R                  5      S:X  a  gSU R                   U R                  -
  / S.nU R                   H>  nUR                  S:X  a  M  US   R                  UR                  5       5        SUl        M@     U R                  U5        U R                   U l        g)zOSend records_consumed, every 100 messages, on timeout,
or if immediate is set. r   d   Nrecords_consumed)namecount
partitionsr   r-   )r   r   lenr   
min_offsetappendto_dictsend)r   	immediatedas       r   send_records_consumed(VerifiableConsumer.send_records_consumed5   s     !A!A)QY\!]]t1$'((4+K+KK A||r! lO""199;/AL ! 			!+/+=+=(r   c                     SU-   U Vs/ s H  o3R                   UR                  S.PM     snS.nU R                  U5        gs  snf )zASend assignment update, evtype is either 'assigned' or 'revoked' partitions_)r#   r$   )r+   r-   N)r#   r$   r2   )r   evtyper-   xr4   s        r   send_assignment"VerifiableConsumer.send_assignmentM   sB    "V+R\]R\QggAKKHR\]_		! ^s   "Ac                    U R                   nU Vs/ s H#  n[        UR                  UR                  5      PM%     snU l         U H:  nU R	                  UR                  UR                  5      nUR
                  Ul        M<     U R                    Vs0 s H  oUR                  U_M     snU l        U R                  SU5        gs  snf s  snf )zRebalance on_assign callback assignedN)	r   AssignedPartitionr#   r$   r&   r/   r%   r   r<   )r   r   r-   old_assignmentpr5   bs          r   	on_assignVerifiableConsumer.on_assignS   s    LVWJq,QWWakkBJW  A$$QWWakk:A<<AL   48??C?a	?CZ4 X  Ds   *CCc                     U R                  SS9  U R                  SSS9  [        5       U l        [	        5       U l        U R                  SU5        g)zRebalance on_revoke callback Tr3   Fr3   asynchronousrevokedN)r6   	do_commitlistr   r   r   r<   )r   r   r-   s      r   	on_revokeVerifiableConsumer.on_revoke`   sH     	""T"2E:&#vY
3r   c                 &   Ub4  UR                  5       [        R                  :X  a  U R                  S5        gU R	                  SS9  S/ S.nUb  SUS'   [        U5      US	'   O
SUS'   S
US	'   U H`  nUR                  UR                  UR                  S.nUR                  b  [        UR                  5      US	'   US   R                  U5        Mb     [        U R                  5      S:X  a  U R                  SU-  5        gU R                  U5        g)zOffsets Committed callback Nz!on_commit(): no offsets to commitTrG   offsets_committed)r+   offsetsFsuccesserror )r#   r$   offsetrQ   r   zBNot sending offsets_committed: No current assignment: would be: %s)coder   
_NO_OFFSETdbgr6   strr#   r$   rU   rS   r0   r.   r   r2   )r   errr-   r4   rB   pds         r   r   VerifiableConsumer.on_commitj   s    ?sxxzZ-B-BBHH89 	""T"2( ? AiLSAgJAiLAgJA77QBww"!!''l7iL#	  t1$HHY\]]^		!r   c                    U R                   (       d&  U R                  U(       a  SOS-   U R                  :  a  gU R                  U R                  :  a  U R                  SS9  Uc  U R                  nOUnU R                  SU R                  U R                  -
  U4-  5        Sn  U R                  S5        U R                  R                  US	9nU R                  S
U-  5        U(       d  U R                  SU5         U R                  U l        g! [         a  nUR                  S   R                  5       [        R                  :X  a  U R                  S5         SnAMb  UR                  S   R                  5       [        R                  [        R                  [        R                   4;   aH  U R                  S[#        U5      U4-  5        US::  a  e US-  n[$        R&                  " S5         SnAGMR  e SnAff = f)zPCommit every 1000 messages or whenever there is a consume timeout
or immediate. r   i  NTrG   z!Committing %d messages (Async=%s)   Commit)rI   zCommit done: offsets %szNo offsets to commitzCommit failed: %s (%d retries)   )r   r   r   r6   r   rX   r   commitr   r   argsrV   r   rW   REQUEST_TIMED_OUTNOT_COORDINATOR_WAIT_COORDrY   timesleep)r   r3   rI   
async_moderetriesrQ   es          r   rK   VerifiableConsumer.do_commit   s      44YDQ""# ,,t/A/AA&&&6..J%J4$$t'H'HH 	 "--..J.G2W<=!NN41$ -1,>,>)! " 66!9>>#z'<'<<HH34VVAY^^%**F*F*4*D*D*4*@*@*B B HH=Q@QQR!|qLGJJqMs'   %AD 
G;A G6BG65G66G;c           	         UR                  5       (       a"  U R                  SUR                  5       -  SS9  gU R                  (       a^  U R                  SUR	                  5       UR                  5       UR                  5       UR                  5       UR                  5       S.5        U R                  S:  a  U R                  U R                  :  a  gU R                  UR	                  5       UR                  5       5      nUc@  U R                  SUR	                  5       UR                  5       UR                  5       4-  S	S9  U=R                  S
-  sl
        UR                  S:X  a  UR                  5       Ul        UR                  UR                  5       :  a  UR                  5       Ul        U =R                  S
-  sl
        U R                  R                  US9  U R!                  SS9  U R#                  SS9  g)z)Handle consumed message (or error event) zConsume failed: %sF)termNrecord_data)r+   r#   r$   keyvaluerU   r   z5Received message on unassigned partition %s [%d] @ %dTr`   r   )messagerG   )rS   rZ   verboser2   r#   r$   ro   rp   rU   r   r   r&   r/   
max_offsetr   store_offsetsr6   rK   )r   msgr5   s      r   msg_consumeVerifiableConsumer.msg_consume   s   99;;HH)CIIK7eHD<<II} #		$'MMO!ggi #		!$/ 0 ==A$"4"4"E   cmmo>9HHLiik3==?CJJLABHL  N 	
1<<2::<AL<<#**,&::<ALa##C#0""U"3'r   )	r   r   r   r   r   r   r   r   r   )F)FN)__name__
__module____qualname____firstlineno____doc__r   r&   r6   r<   rD   rM   r   rK   rv   __static_attributes____classcell__r   s   @r   r   r      s;    
& .>054>2?h!( !(r   r   c                   2   ^  \ rS rSrSrU 4S jrS rSrU =r$ )r@      z.Local state container for assigned partition. c                    > [         [        U ]  5         Xl        X l        SU R                  U R                  4-  U l        SU l        SU l        SU l        g )Nr!   r   r   )	r   r@   r   r#   r$   r%   r   r/   rs   )r   r#   r$   r   s      r   r   AssignedPartition.__init__   sJ    /1
"tzz4>>::	r   c                 `    U R                   U R                  U R                  U R                  S.$ )z(Return a dict of this partition's state )r#   r$   	minOffset	maxOffset)r#   r$   r/   rs   )r   s    r   r1   AssignedPartition.to_dict   s(    $..!__4??L 	Lr   )r   rs   r/   r$   r%   r#   )	rx   ry   rz   r{   r|   r   r1   r}   r~   r   s   @r   r@   r@      s    9L Lr   r@   __main__zVerifiable Python Consumer)descriptionz--topicr0   T)actiontyperequiredz
--group-idzconf_group.id)destr   z--group-instance-idzconf_group.instance.id)r   z--broker-listzconf_bootstrap.serversz--bootstrap-serverz--session-timeoutzconf_session.timeout.msip  )r   r   defaultz--enable-autocommit
store_truezconf_enable.auto.commitF)r   r   r   z--max-messagesmax_messagesr   z--assignment-strategyz"conf_partition.assignment.strategyz--reset-policyztopicconf_auto.offset.resetearliest)r   r   z	--verboserr   zPer-message stats)r   r   r   helpz--consumer.configconsumer_configz-Xr`   
extra_confzConfiguration property)nargsr   r   r   r   z0.9.0)zbroker.version.fallbackzenable.auto.offset.store=zPid %dzUsing config: %szSubscribing to %sr#   )rD   rM   g      ?)timeoutrG   KeyboardInterruptzTerminating on exception: %szClosing consumerrH   z$Ignoring exception while closing: %sshutdown_complete)r+   failedzAll done)1argparseosrf   confluent_kafkar   r   r   verifiable_clientr   r   objectr@   rx   ArgumentParserparseradd_argumentrY   intvars
parse_argsrb   r   r"   updateread_config_filesplit
set_configvcr   r   rr   rX   getpidr   	subscriberD   rM   r   runpollru   r6   rK   rv   r   	Exceptionrj   closer2   )r;   s   0r   <module>r      s  $  	  @ @ .E() E(PL L$ z$$1MNF
	(tL
?TJ
-4LM
.FQUV
,3KL
+#<U_cd
-lIblqr
(sQST
/6Z[
(/LV`a
LyRW^qr
+2CD
ALPhrtu!!#$D'. ).	/D xx!4(4$55d;L6MNOKK$((<*DE*DQ1C*DEFd+	D	!B78B~&BKiBJFF8biik!"FF$%FFg./KK$w-$&LLBLL  J Fff++""3"/C{ ((4(8 t, NN3 fff. FFt,	%%t%@KK
 GG(F;<FF:g 6 FB  
"# 
-A67  	FF9CFBCF	s=   M8=A/M= ;O =OO"OOO7O22O7