
    9h                        S SK r S SKrS SKrS SKrS SKJr  S SKJrJ	r	  S SKJ
r
JrJrJrJrJrJrJrJrJrJrJrJrJrJrJrJr  S SKJr  S SKJrJrJ r J!r!J"r"  S SK#J$r$  S SK%J&r&  \RN                  " \(5      r)Sr* " S	 S
5      r+ " S S5      r, " S S\,5      r- " S S\,5      r. " S S\,5      r/ " S S\,5      r0 " S S\,5      r1 " S S\,5      r2g)    N)ConnectionGroupCoordinationType)ConcurrentTransactionsCoordinatorLoadInProgressErrorCoordinatorNotAvailableErrorDuplicateSequenceNumberGroupAuthorizationFailedErrorInvalidProducerEpochInvalidProducerIdMappingInvalidTxnState
KafkaErrorNotCoordinatorErrorOperationNotAttemptedOutOfOrderSequenceNumberProducerFencedRequestTimedOutErrorTopicAuthorizationFailedError"TransactionalIdAuthorizationFailedUnknownTopicOrPartitionError)ProduceRequest)AddOffsetsToTxnRequestAddPartitionsToTxnRequestEndTxnRequestInitProducerIdRequestTxnOffsetCommitRequest)TopicPartition)create_taskg{Gz?c                       \ rS rSrSrS rS rS r\S 5       r	S r
S rS	 rS
 rS rS rS rS rS rS rS rS rSrg)Sender+   zBackground processing abstraction for Producer. By all means just
separates batch delivery and transaction management from the main Producer
code
c                    Xl         X0l        X l        X@l        S U l        [        5       U l        [        5       U l        0 U l        US-  U l	        Xpl
        US-  U l        g )Ni  )client_txn_manager_acks_message_accumulator_sender_taskset
_in_flight_muted_partitions_coordinators_retry_backoff_request_timeout_ms_linger_time)selfr"   ackstxn_managermessage_accumulatorretry_backoff_ms	linger_msrequest_timeout_mss           ]C:\Suresh\moveshuttle\MDcreated\moveengine\venv\Lib\site-packages\aiokafka/producer/sender.py__init__Sender.__init__1   s]     '
$7! %!$.5#5 %,    c                    #    U R                  5       I S h  vN   [        U R                  5       5      U l        U R                  R	                  U R
                  5        g  NH7fN)_maybe_wait_for_pidr   _sender_routiner&   add_done_callback	_fail_allr.   s    r5   startSender.startI   sJ     &&((('(<(<(>?++DNN; 	)s   A!AA	A!c                     UR                  5       (       a  gUR                  5       nUbE  U R                  R                  U5        U R                  b  U R                  R                  U5        ggg)zuCalled when sender fails. Will fail all pending batches, as they
will never be delivered as well as fail transaction
N)	cancelled	exceptionr%   fail_allr#   fatal_error)r.   tasktask_exceptions      r5   r>   Sender._fail_allO   sd     >>)%%%..~>  ,!!--n= - &r8   c                     U R                   $ r:   )r&   r?   s    r5   sender_taskSender.sender_task\   s       r8   c                    #    U R                   bO  U R                   R                  5       (       d/  U R                   R                  5         U R                   I S h  vN   g g g  N7fr:   )r&   donecancelr?   s    r5   closeSender.close`   sR     (1B1B1G1G1I1I$$&#### 2J(#s   AA"A A"c                 8  #    [        5       nSn  U R                  5       I Sh  vN   [        5       nU R                  nU R                  nUb{  UR                  bn  Ub  UR                  5       (       aD  U R                  5       nUb  UR                  U5        OUR                  UR                  5       5        XTR                  5       -  nU R                  R                  U R                  US9u  nnUR                  5        Ho  u  p[        U R                  X5      5      n
U R                  R                  U5        U	 H  nU R                  R                  U5        M      UR                  U
5        Mq     U(       a/  U R                   R#                  5       nX1R%                  U/5      -  nO.U R                  R'                  5       nX1R%                  U/5      -  n[(        R*                  " U[(        R,                  S9I Sh  vN u  pU H  n
U
R/                  5         M     X-  nGM   GN  N)! [(        R0                   a    U H  n
U
I Sh  vN    M      g[2        [4        [6        4 a    e [8         a&  n[:        R=                  S5        [?        S5      UeSnAff = f7f)a  Background task, that sends pending batches to leader nodes for
batch's partition. This incapsulates same logic as Java's `Sender`
background thread. Because we use asyncio this is more event based
loop, rather than counting timeout till next possible even like in
Java.
N)ignore_nodesmuted_partitions)return_whenz"Unexpected error in sender routinez&Unexpected error during batch delivery) r'   r;   r#   r)   transactional_idrN   _maybe_do_transactional_requestaddmake_task_waiterpartitions_to_addr%   drain_by_nodesr(   itemsr   _send_produce_reqr"   force_metadata_updateuniondata_waiterasynciowaitFIRST_COMPLETEDresultCancelledErrorr   r   r   	ExceptionlogrD   r   )r.   taskstxn_taskwaitersr0   rT   batchesunknown_leaders_existnode_idnode_batchesrG   tpfutrN   _excs                   r5   r<   Sender._sender_routinee   s`     Q	P ..000% #//#'#9#9 *{/K/K/W'8==??#'#G#G#I#/!IIh/ $KK(D(D(FG
 )+H+H+JJ % --<<!%%5 = ) .5]]_)G&t'='=g'TUDOO''0*..2226 +IIdO .= ) ++;;=C{{C511G33??AC{{C511G !( ' 7 7!  !DKKM ! E  1j %% 	

  $.
 	
  	PMM>?EFCO	PsW   JH) H$GH) =H'>'H) 'H) )JI
	JJJ1!JJJc                   #    U R                   b  U R                   R                  5       (       a  g  U R                   R                  b=  U R                  [        R
                  U R                   R                  5      I S h  vN nOU R                  R                  5       nU R                  U5      I S h  vN nU(       d#  U R                  R                  5       I S h  vN   Og M   Ne N3 N7fr:   )
r#   has_pidrV   _find_coordinatorr   TRANSACTIONr"   get_random_node_do_init_pidr^   )r.   rm   successs      r5   r;   Sender._maybe_wait_for_pid   s     $(9(9(A(A(C(C   11= $ 6 6$00$2C2C2T2T!  ++557 --g66Gkk77999 
 79s6   A=C+?C% 3C+3C'4(C+C)	C+'C+)C+c                 <    U R                   R                  US 5        g r:   )r*   pop)r.   coordinator_types     r5   _coordinator_deadSender._coordinator_dead   s    /6r8   c                 `  #    U R                   c   eXR                  ;   a  U R                  U   $   U R                  R                  X5      I S h  vN nU R                  R#                  U[$        R&                  S9I S h  vN nU(       d*  [        R                  " U R                  5      I S h  vN   M  X0R                  U'   U[(        R*                  :X  a  [        R-                  SUU5        U$ [        R-                  SUU5        U$  N! [        R
                   a1  n[        R
                  " U R                   R                  5      nXTeS nAf[        R                   a  n[        R                  " U5      nXTeS nAf[        R                   aQ    U R                  R                  5       I S h  vN    [        R                  " U R                  5      I S h  vN     GM  [        R                   a0  n[        R                  SU5        [        [!        U5      5      UeS nAff = f GN GN|7f)Nz"FindCoordinator Request failed: %sgroupz)Discovered coordinator %s for group id %sz1Discovered coordinator %s for transactional id %s)r#   r*   r"   coordinator_lookupErrorsr   rV   r	   r   r^   ra   sleepr+   r   rg   errorreprreadyr   COORDINATIONr   GROUPinfo)r.   r~   coordinator_keycoordinator_iderrnew_errr   s          r5   rv   Sender._find_coordinator   s      ,,,111%%&6775'+{{'E'E$( "* ++++o&B&B ,  E mmD$7$78883A/0#3#9#99?"# "! G"#
 "!U" << ' CC%%66 &77 ' >>O&66 kk77999mmD$7$7888$$ 5		>D c+45 9s   /H.D D
D +H. H(.H./H+0AH.
D H% ,EH%#E;;4H%/F20(H%GH%H."H%5+H  H%%H.+H.c                 V   #    [        U 5      nUR                  U5      I S h  vN $  N7fr:   )InitPIDHandlerdo)r.   rm   handlers      r5   ry   Sender._do_init_pid
  s$      &ZZ((((s    )')c                   #    [         R                  " 5       n[        X5      nUR                  U5      I Sh  vN   U R                  [         R                  " 5       U-
  -
  nUS:  a  [
        R                  " U5      I Sh  vN   U R                  R                  U5        U H  nU R                  R                  U5        M      g N NF7f)a_  Create produce request to node
If producer configured with `retries`>0 and produce response contain
"failed" partitions produce request for this partition will try
resend to broker `retries` times with `retry_timeout_ms` timeouts.

Arguments:
    node_id (int): kafka broker identifier
    batches (dict): dictionary of {TopicPartition: MessageBatch}
Nr   )
time	monotonicSendProduceReqHandlerr   r-   ra   r   r(   remover)   )r.   rm   rk   t0r   
sleep_timero   s          r5   r]   Sender._send_produce_req  s      ^^'6jj!!! &&$..*:R*?@
>--
+++w'B""))"-  	" ,s#   5C	CAC	 CAC	C	c                    U R                   nUR                  5       nU(       a  [        U R                  U5      5      $ UR	                  5       nUb  [        U R                  U5      5      $ UR                  5       nUb  Uu  pS[        U R                  XS5      5      $ UR                  5       nUb  [        U R                  U5      5      $ g r:   )
r#   rZ   r   _do_add_partitions_to_txnconsumer_group_to_add_do_add_offsets_to_txnoffsets_to_commit_do_txn_offset_commitneeds_transaction_commit_do_txn_commit)r.   r0   tpsgroup_idcommit_dataoffsetscommit_results          r5   rW   &Sender._maybe_do_transactional_request/  s    '' ++-t==cBCC 446t::8DEE "335" +Gt99'LMM#<<>$t22=ABBr8   c                    #    U R                  [        R                  U R                  R                  5      I S h  vN n[        X5      nUR                  U5      I S h  vN $  N( N7fr:   )rv   r   rw   r#   rV   AddPartitionsToTxnHandlerr   )r.   r   rm   r   s       r5   r    Sender._do_add_partitions_to_txnI  sY     ..(($*;*;*L*L
 
 ,D6ZZ(((	
 )!   8A'A##A'A%A'%A'c                    #    U R                  [        R                  U R                  R                  5      I S h  vN n[        X5      nUR                  U5      I S h  vN $  N( N7fr:   )rv   r   rw   r#   rV   AddOffsetsToTxnHandlerr   )r.   r   rm   r   s       r5   r   Sender._do_add_offsets_to_txnQ  sY     ..(($*;*;*L*L
 
 )8ZZ(((	
 )r   c                 T  #    U(       d  g  U R                  [        R                  U5      I S h  vN n[        R                  SUUU5        [        XU5      nUR                  U5      I S h  vN   g  NB! [         a%  nU R                  R                  U5         S nAg S nAff = f N97f)Nz8Sending offset-commit request with %s for group %s to %s)
rv   r   r   r	   r#   error_transactionrg   debugTxnOffsetCommitHandlerr   )r.   r   r   rm   rr   r   s         r5   r   Sender._do_txn_offset_commitY  s     	 223C3I3I8TTG 			F		
 )Ajj!!! U, 	//4	 	"sI   	B(#A4 A2A4 8B(,B&-B(2A4 4
B#>BB(B##B(c                 l  #    U R                   R                  5       I Sh  vN   U R                  nUR                  5       (       a  UR	                  5         gU R                  [        R                  UR                  5      I Sh  vN n[        X5      nUR                  U5      I Sh  vN   g N N+ N	7f)zCommitting transaction should be done with care.
    Transactional requests will be blocked by this coroutine, so no new
offsets or new partitions will be added.
    Produce requests will be stopped, as accumulator will not be
yielding any new batches.
N)r%   flush_for_commitr#   is_empty_transactioncomplete_transactionrv   r   rw   rV   EndTxnHandlerr   )r.   r   r0   rm   r   s        r5   r   Sender._do_txn_commitl  s      ''88:::'' ++--,,. ..((+*F*F
 
  4jj!!! 	;

 	"s4   B4B.A#B4B0#B4(B2)B40B42B4)r$   r*   r(   r-   r%   r)   r,   r+   r&   r#   r"   N)__name__
__module____qualname____firstlineno____doc__r6   r@   r>   propertyrK   rP   r<   r;   r   rv   ry   r]   rW   r   r   r   r   __static_attributes__ r8   r5   r   r   +   sj    
-0<> ! !$
[Pz&70"d).:4))"&"r8   r   c                   D    \ rS rSr\R
                  rS rS rS r	S r
Srg)BaseHandleri  c                 2    Xl         UR                  U l        g r:   )_senderr+   _default_backoff)r.   senders     r5   r6   BaseHandler.__init__  s     & 5 5r8   c                   #    U R                  5       n U R                  R                  R                  XU R                  S9I S h  vN nU R                  U5      nUb  [        R                  " U5      I S h  vN   gg N8! [
         aT  n[        R                  SUR                  U5        [        R                  " U R                  5      I S h  vN     S nAgS nAff = f Ni7f)Nr   zCould not send %r: %rFT)create_requestr   r"   sendr   r   rg   warning	__class__ra   r   r   handle_response)r.   rm   reqrespr   retry_backoffs         r5   r   BaseHandler.do  s     !!#	,,11'djj1QQD ,,T2$--... R 	KK/D-- 5 5666	 /sY   C#1B  A>B  
-C#7C!8C#>B   
C
ACCCC#CC#c                     [         er:   NotImplementedErrorr?   s    r5   r   BaseHandler.create_request      !!r8   c                     [         er:   r   )r.   responses     r5   r   BaseHandler.handle_response  r   r8   )r   r   N)r   r   r   r   r   DEFAULTr   r6   r   r   r   r   r   r8   r5   r   r     s!    ##E6 ""r8   r   c                        \ rS rSrS rS rSrg)r   i  c                 r    U R                   R                  n[        S   " UR                  UR                  S9$ )Nr   )rV   transaction_timeout_ms)r   r#   r   rV   r   )r.   r0   s     r5   r   InitPIDHandler.create_request  s5    ll//$Q'(99#.#E#E
 	
r8   c                    U R                   R                  n[        R                  " UR                  5      nU[        R
                  L a  [        R                  SUR                  UR                  U R                   R                  R                  5        U R                   R                  R                  UR                  UR                  5        g U[        L d	  U[        L a5  U R                   R                  [         R"                  5        U R.                  $ U[$        L d	  U[&        L a   U R.                  $ U[(        L a  U" UR*                  5      e[        R-                  SU5        U" 5       e)Nz2Successfully found PID=%s EPOCH=%s for Producer %sz1Unexpected error during InitProducerIdRequest: %s)r   r#   r   for_code
error_codeNoErrorrg   r   producer_idproducer_epochr"   
_client_idset_pid_and_epochr   r   r   r   rw   r   r   r   rV   r   r   r.   r   r0   
error_types       r5   r   InitPIDHandler.handle_response  s$   ll//__T__5
'IID  ####..	 LL%%77  $"5"5 6600LL**+;+G+GH $$$ 8833 $$$ ==[99::III:V,r8   r   N)r   r   r   r   r   r   r   r   r8   r5   r   r     s    
%r8   r   c                   L   ^  \ rS rSr\R
                  rU 4S jrS rS r	Sr
U =r$ )r   i  c                 0   > [         TU ]  U5        X l        g r:   )superr6   _tps)r.   r   topic_partitionsr   s      r5   r6   "AddPartitionsToTxnHandler.__init__  s     $	r8   c           	      d   U R                   R                  n[        R                  " [        5      nU R
                   H*  nX#R                     R                  UR                  5        M,     [        S   " UR                  UR                  UR                  [	        UR                  5       5      S9nU$ )Nr   )rV   r   r   topics)r   r#   collectionsdefaultdictlistr   topicappend	partitionr   rV   r   r   r\   )r.   r0   partition_dataro   r   s        r5   r   (AddPartitionsToTxnHandler.create_request  s    ll//$006))B88$++BLL9  (*(99#//&55,,./	
 
r8   c                    U R                   R                  n[        5       nUR                   GH  u  pEU GH  u  pg[	        XF5      n[
        R                  " U5      n	U	[
        R                  L a)  [        R                  SU5        UR                  U5        Mc  U	[        L d	  U	[        L a9  U R                   R                  [        R                  5        U R                   s  s  $ U	["        L a+  UR$                  (       d
  [&        s  s  $ U R                   s  s  $ U	[(        L d	  U	[*        L a  U R                   s  s  $ U	[,        L a
  [/        5       eU	[0        L d	  U	[2        L a  U	" 5       eU	[4        L a  UR7                  U5        GMM  U	[8        L a  GMY  U	[:        L a  U	" UR<                  5      e[        R?                  SUU	5        U	" 5       e   GM     U(       a  URA                  [5        U5      5        g )Nz!Added partition %s to transactionz6Could not add partition %s due to unexpected error: %s)!r   r#   r'   errorsr   r   r   r   rg   r   partition_addedr   r   r   r   rw   r   r   txn_partitionsBACKOFF_OVERRIDEr   r   r
   r   r   r   r   rX   r   r   rV   r   r   )
r.   r   r0   unauthorized_topicsr   
partitionsr   r   ro   r   s
             r5   r   )AddPartitionsToTxnHandler.handle_response  s   ll//!e!%E)3%	#E5#__Z8
/IIA2F//3">>!%88LL223C3O3OP000#99
 '55//#444"@@!%AA000#77(**"::!_4$,&#@@'++E2#88#EE$[%A%ABBIIP!"
 %,&] *4 "-` ))-.AB r8   )r   r   r   r   r   r   r   r   r6   r   r   r   __classcell__r   s   @r5   r   r     s"    ((E%8 8r8   r   c                   L   ^  \ rS rSr\R
                  rU 4S jrS rS r	Sr
U =r$ )r   i   c                 0   > [         TU ]  U5        X l        g r:   )r   r6   	_group_id)r.   r   r   r   s      r5   r6   AddOffsetsToTxnHandler.__init__#  s     !r8   c                     U R                   R                  n[        S   " UR                  UR                  UR
                  U R                  S9nU$ )Nr   )rV   r   r   r   )r   r#   r   rV   r   r   r  r.   r0   r   s      r5   r   %AddOffsetsToTxnHandler.create_request'  sI    ll//$Q'(99#//&55^^	
 
r8   c                    U R                   R                  nU R                  n[        R                  " UR
                  5      nU[        R                  L a(  [        R                  SU5        UR                  U5        g U[        L d	  U[        L a5  U R                   R                  [        R                  5        U R2                  $ U[        L d	  U[         L a   U R2                  $ U["        L a
  [%        5       eU[&        L a  U" 5       eU[(        L a  U" UR*                  5      eU[,        L a"  UR/                  U" U R                  5      5        g [        R1                  SU5        U" 5       e)Nz3Successfully added consumer group %s to transactionz8Could not add consumer group due to unexpected error: %s)r   r#   r  r   r   r   r   rg   r   consumer_group_addedr   r   r   r   rw   r   r   r
   r   r   r   rV   r	   r   r   r   )r.   r   r0   r   r   s        r5   r   &AddOffsetsToTxnHandler.handle_response2  s4   ll//>>__T__5
'IIKXV,,X66600LL**+;+G+GH, $$$) 8833   $$$ // ""?*,==[99::88))*T^^*DEIIJJ ,r8   )r  r  r
  s   @r5   r   r      s"    ((E"	#% #%r8   r   c                   L   ^  \ rS rSr\R
                  rU 4S jrS rS r	Sr
U =r$ )r   iX  c                 <   > [         TU ]  U5        X l        X0l        g r:   )r   r6   _offsetsr  )r.   r   r   r   r   s       r5   r6   TxnOffsetCommitHandler.__init__[  s     !r8   c           
         U R                   R                  n[        R                  " [        5      n[        U R                  R                  5       5       HC  u  p4X#R                     R                  UR                  UR                  UR                  45        ME     [        S   " UR                  U R                  UR                   UR"                  [	        UR                  5       5      S9nU$ )Nr   )rV   r   r   r   r   )r   r#   r   r   r   sortedr  r\   r   r   r   offsetmetadatar   rV   r  r   r   )r.   r0   offset_dataro   r  r   s         r5   r   %TxnOffsetCommitHandler.create_request`  s    ll//!--d3 !4!4!67JB!((v}}foo> 8
 %Q'(99^^#//&55))+,
 
r8   c                 ^   U R                   R                  nU R                  nUR                   GHz  u  pEU GHm  u  pg[	        XF5      n[
        R                  " U5      n	U	[
        R                  L aE  U R                  U   R                  n
[        R                  SU
UU5        UR                  XU5        M  U	[        L d  U	[        L d	  U	[        L a9  U R                   R!                  ["        R$                  5        U R&                  s  s  $ U	[(        L d	  U	[*        L a  U R&                  s  s  $ U	[,        L a
  [/        5       eU	[0        L a  U	" UR2                  5      eU	[4        L a&  U	" U R                  5      nUR7                  U5            g [        R9                  SUU	5        U	" 5       e   GM}     g )Nz0Offset %s for partition %s committed to group %szDCould not commit offset for partition %s due to unexpected error: %s)r   r#   r  r  r   r   r   r   r  r  rg   r   offset_committedr   r   r   r   r   r   r   r   r   r
   r   r   rV   r	   r   r   )r.   r   r0   r   r   r  r   r   ro   r   r  rr   s               r5   r   &TxnOffsetCommitHandler.handle_responser  sv   ll//>>!%E)3%	#E5#__Z8
/!]]2.55FIIJ 	  00XF">>!%88!%99LL223C3I3IJ000"@@!%AA  000#77(**#EE$[%A%ABB#@@$T^^4C11#6II/!"	 %,&U *4 "-Z r8   )r  r  r  r
  s   @r5   r   r   X  s"    ((E"
$1 1r8   r   c                   L   ^  \ rS rSr\R
                  rU 4S jrS rS r	Sr
U =r$ )r   i  c                 0   > [         TU ]  U5        X l        g r:   )r   r6   _commit_result)r.   r   r   r   s      r5   r6   EndTxnHandler.__init__  s     +r8   c                     U R                   R                  n[        S   " UR                  UR                  UR
                  U R                  S9nU$ )Nr   )rV   r   r   transaction_result)r   r#   r   rV   r   r   r$  r  s      r5   r   EndTxnHandler.create_request  sK    ll//A(99#//&55#22	
 
r8   c                    U R                   R                  n[        R                  " UR                  5      nU[        R
                  L a  UR                  5         g U[        L d	  U[        L a5  U R                   R                  [        R                  5        U R&                  $ U[        L d	  U[        L a   U R&                  $ U[        L a
  [        5       eU[         L a  U" 5       e["        R%                  SU5        U" 5       e)Nz5Could not end transaction due to unexpected error: %s)r   r#   r   r   r   r   r   r   r   r   r   rw   r   r   r
   r   r   rg   r   r   r   s       r5   r   EndTxnHandler.handle_response  s    ll//__T__5
',,.6600LL**+;+G+GH" $$$ 8833  $$$ // ""?*,IIG ,r8   )r$  r  r
  s   @r5   r   r     s"    ((E,% %r8   r   c                   @   ^  \ rS rSrU 4S jrS rS rS rS rSr	U =r
$ )r   i  c                 `   > [         TU ]  U5        X l        UR                  U l        / U l        g r:   )r   r6   _batchesr"   _client_to_reenqueue)r.   r   rk   r   s      r5   r6   SendProduceReqHandler.__init__  s(     }}r8   c           	         [         R                  " [        5      nU R                  R	                  5        H<  u  p#XR
                     R                  UR                  UR                  5       45        M>     U R                  R                  S:  a  SnOU R                  R                  S:  a  SnOU R                  R                  S:  a  SnOvU R                  R                  S:  a  SnOYU R                  R                  S	:  a  S
nO<U R                  R                  S:  a  SnOU R                  R                  S:X  a  SnOSn0 nUS
:  a@  U R                  R                  b$  U R                  R                  R                  US'   OS US'   [        U   " SU R                  R                  U R                  R                   [        UR	                  5       5      S.UD6nU$ )N)         )r2  r      )r3  r3     )r3  r      )r         )r   
   r2  )r   	   r3  r   rV   )required_ackstimeoutr   r   )r   r   r   r-  r\   r   r   r   get_data_bufferr.  api_versionr   r#   rV   r   r$   r,   )r.   r   ro   batchversionkwargsrequests          r5   r   $SendProduceReqHandler.create_request  s   ((.,,.IB88##u4467 /
 <<##v-G\\%%/G\\%%/G\\%%/G\\%%0G\\%%0G\\%%/GGa<||((4-1\\-F-F-W-W)*-1)* ) 
,,,,LL44'
 	
 r8   c                   #    U R                  5       n U R                  R                  X5      I S h  vN nUR                  S:X  a2  U R                  R                  5        H  nUR                  5         M     OU R                  U5         U R                  (       a  ["        R$                  " U R&                  5      I S h  vN   U R                   H(  nU R(                  R*                  R-                  U5        M*     U R                  R/                  5       I S h  vN   g g  N! [         a  n[        R                  SU5        [        USS5      (       a  U R                  R                  5         U R                  R                  5        HE  nU R                  XT5      (       d  UR                  US9  M*  U R                  R!                  U5        MG      S nAGNLS nAff = f GN  N7f)Nr   zGot error produce response: %sinvalid_metadataFrD   )r   r.  r   r<  r-  values
done_noackr   r   rg   r   getattrr^   
_can_retryfailurer/  r   ra   r   r   r   r%   	reenqueue_maybe_wait_metadata)r.   rm   rC  r   r@  r   s         r5   r   SendProduceReqHandler.do  so    %%'	/!\\..w@@H $$)!]]113E$$& 4 $$X.-- 5 5666++11;;EB , ,,33555 ' A 		5KK8#>s.66224--/s22MMCM0&&--e4	 0		5( 7 6s^   G$D" D D" BG$?G AG$G"G$ D" "
G,B%GG$GG$"G$c           	         UR                    GH  u  p#U GH  nS nS nUR                  S:  a  Uu  pxn	Sn
OMSUR                  s=::  a  S::  a	  O  OUu  pxpO-SUR                  s=::  a  S::  a  O  O	Uu  nnn	n
nO
Uu  nnn	n
nnn[        X'5      n[        R                  " U5      nU R
                  R                  U5      nUc  M  U[        R                  L a  UR                  XU5        M  U[        L a  UR                  XU5        M  U R                  U" 5       U5      (       d?  U[        L a  [        5       nOU[        L a	  U" U5      nOU" 5       nUR                  US9  GMM  [        R!                  SUU=(       d    U5        [#        USS	5      (       a  U R$                  R'                  5         U R(                  R+                  U5        GM     GM     g )
Nr2  r7  r6  r4  rG  zEGot error produce response on topic-partition %s, retrying. Error: %srF  F)r   API_VERSIONr   r   r   r-  getr   rN   r   rK  r
   r   r   rL  rg   r   rJ  r.  r^   r/  r   )r.   r   r   r  partition_infoglobal_errorlog_start_offsetr   r   r  	timestamprq   ro   r   r@  rr   s                   r5   r   %SendProduceReqHandler.handle_response$  s   !)E",##' ''!+4B1I6 "I(..3!3?M<I69(..3!3 '!"!( '!"!($#E5
3))"-=FNN*JJv2BC55 JJv2BC%88 44,."??#El#gMMCM0KK3$-	 u&8%@@::<&&--e4E #- "1r8   c                 t    U R                   R                  c  UR                  5       (       a  gUR                  $ )NF)r   r#   expired	retriable)r.   r   r@  s      r5   rK   SendProduceReqHandler._can_retryj  s+     <<$$,r8   )r-  r.  r/  )r   r   r   r   r6   r   r   r   rK  r   r	  r
  s   @r5   r   r     s%     %N6@D5L r8   r   )3ra   r   loggingr   aiokafka.errorsr  r   aiokafka.clientr   r   r   r   r   r   r	   r
   r   r   r   r   r   r   r   r   r   r   r   aiokafka.protocol.producer   aiokafka.protocol.transactionr   r   r   r   r   aiokafka.structsr   aiokafka.utilr   	getLoggerr   rg   r  r   r   r   r   r   r   r   r   r   r8   r5   <module>re     s          =    & 5  , %! Z" Z"z
" "<&%[ &%RN Nb5%[ 5%pK[ K\-%K -%`ZK Zr8   