
    9h(                     @   S SK r S SKrS SKrS SKrS SKrS SKJr  S SKJ	r	J
r
  S SKJr  S SKJr  S SKJr  S SKJr  S SKJr  S SKJrJrJrJrJrJr  S S	KJrJr  S S
K J!r!J"r"  \RF                  " \$5      r%Sr& " S S5      r' " S S\'5      r( " S S\'5      r) " S S5      r*g)    N)ConnectionGroupCoordinationType)RoundRobinPartitionAssignor)ConsumerProtocol)Response)OffsetCommitRequest_v2)OffsetFetchRequest_v1)HeartbeatRequestJoinGroupRequestJoinGroupResponseJoinGroupResponse_v5LeaveGroupRequestSyncGroupRequest)OffsetAndMetadataTopicPartition)create_futurecreate_taskc                   .    \ rS rSrSS.S jrS rS rSrg)	BaseCoordinator   Texclude_internal_topicsc                    Xl         X0l        X l        0 U l        UR                  U l        U R                  U R
                  5        U R
                  R                  U R                  5        g N)_client_exclude_internal_topics_subscription_metadata_snapshotcluster_cluster_handle_metadata_updateadd_listener)selfclientsubscriptionr   s       hC:\Suresh\moveshuttle\MDcreated\moveengine\venv\Lib\site-packages\aiokafka/consumer/group_coordinator.py__init__BaseCoordinator.__init__   sT     (?%)"$ 	$$T]]3""4#?#?@    c                 \   U R                   nUR                  (       a  UR                  U R                  5       Vs/ s H'  nUR                  R	                  U5      (       d  M%  UPM)     nnUR
                  b#  [        U5      UR
                  R                  :w  a  UR                  U5        UR                  5       (       ag  U R                  bY  U R                  5       nU R                  U:w  a8  [        R                  SU R                  U5        XPl        U R                  5         g g g g s  snf )Nz.Metadata for topic has changed from %s to %s. )r   subscribed_patterntopicsr   matchr&   setsubscribe_from_patternpartitions_auto_assigned_group_subscription_get_metadata_snapshotr   loginfo_on_metadata_change)r$   r    r&   topicr-   metadata_snapshots         r'   r"   'BaseCoordinator._handle_metadata_update1   s   ))** %^^D,I,IJJE2288? J   ))1v;,";";"B"BB33F; 1133((4 $ ; ; =&&*;;D++%
 +<'((* < 5 4s   $D)#D)c                     0 nU R                    H4  nU R                  R                  U5      =(       d    / n[        U5      X'   M6     U$ r   )r2   r!   partitions_for_topiclen)r$   partitions_per_topicr7   
partitionss       r'   r3   &BaseCoordinator._get_metadata_snapshotN   sH    !--E;;EBHbJ +.j/ '	 .
 $#r*   )r   r!   r   r   r   N)__name__
__module____qualname____firstlineno__r(   r"   r3   __static_attributes__ r*   r'   r   r      s     !%A$+:$r*   r   c                   ^   ^  \ rS rSrSrU 4S jrS rSS jrS r\	S 5       r
S rS	 rS
rU =r$ )NoGroupCoordinatorX   ad  
When `group_id` consumer option is not used we don't have the functionality
provided by Coordinator node in Kafka cluster, like committing offsets (
Kafka based offset storage) or automatic partition assignment between
consumers. But `GroupCoordinator` class has some other responsibilities,
that this class takes care of to avoid code duplication, like:

    * Static topic partition assignment when we subscribed to topic.
      Partition changes will be noticed by metadata update and assigned.
    * Pattern topic subscription. New topics will be noticed by metadata
      update and added to subscription.
c                 b   > [         TU ]  " U0 UD6  [        U R                  5       5      U l        g r   )superr(   r   _reset_committed_routine_reset_committed_task)r$   argskw	__class__s      r'   r(   NoGroupCoordinator.__init__f   s+    $%"%%01N1N1P%Q"r*   c                 $    U R                  5         g r   )assign_all_partitionsr$   s    r'   r6   &NoGroupCoordinator._on_metadata_changek   s    ""$r*   c           	         / nU R                   R                  R                   Ha  nU R                  R	                  U5      nU(       d  U(       a  [
        R                  " 5       eMC  X$ Vs/ s H  n[        X55      PM     sn-  nMc     U R                   R                  R                  nUb  [        U5      UR                  :w  a  U R                   R                  U5        ggs  snf )zAssign all partitions from subscribed topics to this consumer.
If `check_unknown` we will raise UnknownTopicOrPartitionError if
subscribed topic is not found in metadata response.
N)r   r&   r-   r!   r;   ErrorsUnknownTopicOrPartitionErrorr   
assignmentr/   tpsassign_from_subscribed)r$   check_unknownr>   r7   p_idsp_idrX   s          r'   rR   (NoGroupCoordinator.assign_all_partitionsn   s    
 
''44;;EMM66u=E  ==??
 5I54>%65IIJ < ''44??
ZJNN!B55jA "C	 Js   *C!c                 l  #    Sn  U R                   R                  c$  U R                   R                  5       I Sh  vN   M<  U R                   R                  R                  nUc$  U R                   R	                  5       I Sh  vN   M  UR
                  nUR                  5         UR                  5        H3  nUR                  U5      nUR                  [        [        S5      5        M5     [        UR                  5       5      n[        R                  " UR                  U/[        R                   S9I Sh  vN   UR#                  5       (       d  UR%                  5         SnGM_   GN+ N N3! [        R&                   a     Of = fUb)  UR#                  5       (       d  UR%                  5         Snggg7f)zGroup coordinator will reset committed points to UNKNOWN_OFFSET
if no commit is found for group. In the NoGroup mode we need to force
it after each assignment
N return_when)r   r&   wait_for_subscriptionrX   wait_for_assignmentcommit_refresh_neededclearrequesting_committedstate_valueupdate_committedr   UNKNOWN_OFFSETr   waitasynciounassign_futureFIRST_COMPLETEDdonecancelCancelledError)r$   event_waiterrX   re   tptp_states         r'   rK   +NoGroupCoordinator._reset_committed_routine   s    
 	%%22:,,BBDDD!//<<GG
%,,@@BBB(2(H(H%%++-$99;B)55b9H--.?PR.ST <  ++@+E+E+GHll//> ' 7 7  
 $((** '')#'L5 D
 C %% 		
 #L,=,=,?,?!L -@#sR   F45E- E&AE- E)B4E- 7E+8/E- )E- +E- -FF4F0F4c                 B    U R                   R                  R                  $ r   )r   r&   r-   rS   s    r'   r2   &NoGroupCoordinator._group_subscription   s    !!..555r*   c                 z   #    U R                   R                  5         U R                   I S h  vN   S U l         g  N7fr   )rL   rp   rS   s    r'   closeNoGroupCoordinator.close   s3     ""))+((((%)" 	)s   *;9;c                 x    U R                   R                  5       (       a  U R                   R                  5         g g r   )rL   ro   resultrS   s    r'   check_errorsNoGroupCoordinator.check_errors   s.    %%**,,&&--/ -r*   )rL   )F)r@   rA   rB   rC   __doc__r(   r6   rR   rK   propertyr2   ry   r}   rD   __classcell__rO   s   @r'   rG   rG   X   sB    R
%B.* X 6 6*
0 0r*   rG   c                   $  ^  \ rS rSrSrSSSSSSS	\4SS
SS.U 4S jjrS rS rS r	S r
S rS rS rS rS rS\4S jrS rS rS rS rS rS rS rS  rS! rS" rS# rS$ rS% rS& rS' r S( r!S) r"S* r#S+ r$S, r%S- r&S. r'S/ r(S0 r)S1 r*S2r+U =r,$ )3GroupCoordinator   a>  
GroupCoordinator implements group management for single group member
by interacting with a designated Kafka broker (the coordinator). Group
semantics are provided by extending this class.

From a high level, Kafka's group management protocol consists of the
following sequence of actions:

1. Group Registration: Group members register with the coordinator
   providing their own metadata
   (such as the set of topics they are interested in).

2. Group/Leader Selection: The coordinator (one of Kafka nodes) select
   the members of the group and chooses one member (one of client's)
   as the leader.

3. State Assignment: The leader receives metadata for all members and
   assigns partitions to them.

4. Group Stabilization: Each member receives the state assigned by the
   leader and begins processing.
   Between each phase coordinator awaits all clients to respond. If some
   do not respond in time - it will revoke their membership

NOTE: Try to maintain same log messages and behaviour as Java and
      kafka-python clients:

    https://github.com/apache/kafka/blob/0.10.1.1/clients/src/main/java/          org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
    https://github.com/apache/kafka/blob/0.10.1.1/clients/src/main/java/          org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
zaiokafka-default-groupNi'  i  d   Ti  i i0u  )group_idgroup_instance_idsession_timeout_msheartbeat_interval_msretry_backoff_msenable_auto_commitauto_commit_interval_ms	assignorsr   max_poll_interval_msrebalance_timeout_msc                  > SU l         [        TU ]	  UUUS9  XPl        X`l        US-  U l        Xl        Xpl        Xl        Xl	        Xl
        [        R                  U l        [        S   R                  U l        X0l        X@l        SU l        SU l        [+        5       U l        [+        5       U l        [1        U R3                  5       5      U l        SU l        SU l        SU l        SU l        [>        R@                  " 5       U l!        [>        R@                  " 5       U l"        [F        RH                  " 5       U	S-  -   U l%        [+        5       U l&        g)zHInitialize the coordination manager.

Parameters (see AIOKafkaConsumer)
Nr     r   F)'r2   rJ   r(   _session_timeout_ms_heartbeat_interval_ms_max_poll_interval_rebalance_timeout_ms_retry_backoff_ms
_assignors_enable_auto_commit_auto_commit_interval_msOffsetCommitRequestDEFAULT_GENERATION_ID
generationr   UNKNOWN_MEMBER_ID	member_idr   _group_instance_idcoordinator_id_performed_join_preparer   _rejoin_needed_fut_coordinator_dead_futr   _coordination_routine_coordination_task_heartbeat_task_commit_refresh_task_pending_exception_error_consumed_futrl   Lock_coordinator_lookup_lock_commit_locktime	monotonic_next_autocommit_deadline_closing)r$   r%   r&   r   r   r   r   r   r   r   r   r   r   r   rO   s                 r'   r(   GroupCoordinator.__init__   s2   , $( $; 	 	
 $6 &;#"6"=%9"!1##5 (?%-CC)!,>> "3" (-$"//%2_""-d.H.H.J"K  $$(!
 #'#' (/% $LLN NN6== 	&
 &r*   c                 $    U R                  5         g r   )request_rejoinrS   s    r'   r6   $GroupCoordinator._on_metadata_change+  s    r*   c                 n  #    U R                   nUc  [        R                  " 5       e U R                  R	                  X![
        R                  S9I Sh  vN nU$  N! [        R                   aB  n[        R                  SUR                  R                  UU5        U R                  5         e SnAff = f7f)zjSend request to coordinator node. In case the coordinator is not
ready a respective error will be raised.
Ngroupz<Error sending %s to node %s [%s] -- marking coordinator dead)r   rV   !GroupCoordinatorNotAvailableErrorr   sendr   COORDINATION
KafkaErrorr4   errorrO   r@   coordinator_dead)r$   requestnode_idresperrs        r'   	_send_reqGroupCoordinator._send_req.  s      %%?::<<	**(D(D +  D     	IIN!!**	 !!#	s:   %B5+A AA B5A B20=B--B22B5c                    U R                   R                  5       (       a  U R                   R                  5         U R                  b"  U R                  R	                  S5        SU l        U R
                  b  U R
                  nSU l        Ueg)zSCheck if coordinator is well and no authorization or unrecoverable
errors occurred
N)r   ro   r|   r   
set_resultr   r$   excs     r'   r}   GroupCoordinator.check_errorsD  s|     ""''))##**,##/$$//5'+D$"".))C&*D#I /r*   c                     [         R                   " U5      nU R                  R                  U5        Xl        [	        5       U l        [        R                  " U R
                  U R                  /[        R                  S9$ )a  Most critical errors are not something we can continue execution
without user action. Well right now we just drop the Consumer, but
java client would certainly be ok if we just poll another time, maybe
it will need to rejoin, but not fail with GroupAuthorizationFailedError
till the end of days...
XXX: Research if we can't have the same error several times. For
     example if user gets GroupAuthorizationFailedError and adds
     permission for the group, would Consumer work right away or would
     still raise exception a few times?
ra   )
copyr   abort_waitersr   r   r   rl   rk   r   rn   r   s     r'   _push_error_to_user$GroupCoordinator._push_error_to_userR  sa     iin((-"%#0? ||%%t}}5//
 	
r*   c                   #    U R                   R                  5       (       a  gU R                   R                  S5        U R                  R                  5       (       d  U R                  I Sh  vN   U R	                  5       I Sh  vN   U R                  5       I Sh  vN   U R                  5       I Sh  vN   g NM N7 N! N7f)zSClose the coordinator, leave the current group
and reset local generation/memberId.N)r   ro   r   r   _stop_heartbeat_task!_stop_commit_offsets_refresh_task_maybe_leave_grouprS   s    r'   ry   GroupCoordinator.closef  s      ==  &&&++--))))'')))44666%%'''	 *)6'sH   A*C,B:-CB<CB>C4C 5C<C>C Cc                 8    [        U R                  5       5      nU$ r   )r   r   )r$   tasks     r'   maybe_leave_group"GroupCoordinator.maybe_leave_groupu  s    42245r*   c                   #    U R                   S:  a~  U R                  cq  U R                  R                  S:  a  SOSn[        U   " U R
                  U R                  5      n U R                  U5      I S h  vN   [        R                  S5        U R                  5         g  N*! [        R                   a   n[        R                  SU5         S nANBS nAff = f7f)Nr   r      r      zLeaveGroup request succeededzLeaveGroup request failed: %s)r   r   r   api_versionr   r   r   r   r4   r5   rV   r   r   reset_generation)r$   versionr   r   s       r'   r   #GroupCoordinator._maybe_leave_groupy  s     ??Q4#:#:#B
  <<33j@aaG'0OG9nnW--- 78 .$$ @		93??@sB   A C#B$ 7B"8B$ <&C"B$ $C8CCCCc                 R    U R                    H  nUR                  U:X  d  M  Us  $    g r   )r   name)r$   r   assignors      r'   _lookup_assignor!GroupCoordinator._lookup_assignor  s&    H}}$ ( r*   c                   #    U R                   R                  5         S U l        Ub'   U R                  U5      I S h  vN   UR                  nO
[        5       n[        R                  SUU R                  5        U R                   R                  (       aM   U R                   R                  R                  U5      n[        R                  " U5      (       a  UI S h  vN   g g g  N! [        R
                   a   n[        R                  SU5         S nANS nAff = f N@! [          a8    [        R#                  SU R                   R                  U R                  5         g f = f7f)Nz-OffsetCommit failed before join, ignoring: %sz7Revoking previously assigned partitions %s for group %szPUser provided subscription listener %s for group %s failed on_partitions_revoked)r   begin_reassignmentr2   _maybe_do_last_autocommitrV   r   r4   r   rY   r/   r5   r   listeneron_partitions_revokedrl   iscoroutine	Exception	exception)r$   previous_assignmentr   revokedress        r'   _on_join_prepare!GroupCoordinator._on_join_prepare  s0    --/#'  *P445HIII *--GeG 	EMM	

 &&
((11GGP&&s++II , ' J$$ P		I3OOP"  A&&//MM	sv   %E"C$ C"C$ AE"AD DD E""C$ $D8DE"DE"D ?EE"EE"responsec                   #    UR                   nUR                  nU R                  U5      nU(       d
   SU 35       e0 n[        5       nU H  n[	        U[
        5      (       a  Uu  pn
O9[	        U[        S   [        S   [        S   45      (       a  Uu  pO[        S5      e[        R                  R                  U
5      nXU'   UR                  UR                  5        M     X`l        U R                  R                  (       d%  U R                   R#                  U R                  5        U R                   R%                  5       I S h  vN   [&        R)                  SU R*                  UR,                  U5        UR/                  U R0                  U5      n[&        R)                  SU R*                  U5        U R3                  5       U l        U$  N7f)NzInvalid assignment protocol: r   r      z)unknown protocol returned from assignmentzJPerforming assignment for group %s using strategy %s with subscriptions %sz$Finished assignment for group %s: %s)group_protocolmembersr   r/   
isinstancer   r   RuntimeErrorr   METADATAdecodeupdater&   r2   r   r,   r   
set_topics_maybe_wait_metadatar4   debugr   r   assignr!   r3   r   )r$   r   assignment_strategyr   r   member_metadataall_subscribed_topicsmemberr   r   metadata_bytesmetadataassignmentss                r'   _perform_assignment$GroupCoordinator._perform_assignment  s    &55""(()<=N89L8MNNx #F($899?E<	n"1%'8';=Nq=QR  -3)	>"#NOO'0077GH)1I&!(()>)>? " $9 !!44LL##D$<$<= ll//111		%MMMM	
 oodmm_E		8$--U
 #'"="="?% 	2s   EGGBGc                 ~  #    U R                  U5      nU(       d
   SU 35       e[        R                  R                  U5      nU R                  R                  UR                  5       5        U R                  R                  nUR                  U5        U R                  5       I S h  vN   U R                  UR                  5        [        U R                  R                  5       5      n[        R                  SXR                   5        U R                  R"                  (       aM   U R                  R"                  R%                  U5      n	[&        R(                  " U	5      (       a  U	I S h  vN   g g g  N N	! [*         a9    [        R-                  SU R                  R"                  U R                   U5         g f = f7f)Nzinvalid assignment protocol: z1Setting newly assigned partitions %s for group %szIUser provided listener %s for group %s failed on partition assignment: %s)r   r   
ASSIGNMENTr   r   rZ   r>   r&   on_assignmentr   !start_commit_offsets_refresh_taskrX   r/   assigned_partitionsr4   r5   r   r   on_partitions_assignedrl   r   r   r   )
r$   r   r   protocolmember_assignment_bytesr   rX   r&   assignedr   s
             r'   _on_join_complete"GroupCoordinator._on_join_complete  so     ((2C8
CCx%00778OP
 	11*2G2G2IJ))66 	z*
 44666..|/F/FGt))==?@?==	

 &&((11HHR&&s++II , ' 	7  :&&//MMsK   B%F='E3(A=F=&AE7 +E5,E7 0F=5E7 7A F:7F=9F::F=c                     U R                   bN  [        R                  SU R                   U R                  5        SU l         U R                  R                  S5        gg)zMark the current coordinator as dead.
NOTE: this will not force a group rejoin. If new coordinator is able to
recognize this member we will just continue with current generation.
Nz3Marking the coordinator dead (node %s)for group %s.)r   r4   warningr   r   r   rS   s    r'   r   !GroupCoordinator.coordinator_dead  sR    
 *KKE##
 #'D&&11$7 +r*   c                 ~    [         R                  U l        [        S   R                  U l        U R                  5         g)z^Coordinator did not recognize either generation or member_id. Will
need to re-join the group.
r   N)r   r   r   r   r   r   r   rS   s    r'   r   !GroupCoordinator.reset_generation  s/     .CC)!,>>r*   c                 z    U R                   R                  5       (       d  U R                   R                  S 5        g g r   )r   ro   r   rS   s    r'   r   GroupCoordinator.request_rejoin#  s0    &&++--##..t4 .r*   c                 `    UR                   SL =(       d    U R                  R                  5       $ )zuCheck whether the group should be rejoined

Returns:
    bool: True if consumer should rejoin group, False otherwise
N)rX   r   ro   )r$   r&   s     r'   need_rejoinGroupCoordinator.need_rejoin'  s)     &&$.P$2I2I2N2N2PPr*   c                   #    U R                   b  gU R                   ISh  vN   U R                  S-  nU R                   Gc#  U R                  R	                  5       (       Gd   U R
                  R                  [        R                  U R                  5      I Sh  vN nU R
                  R'                  U[(        R*                  S9I Sh  vN nU(       d   ["        R$                  " U5      I Sh  vN   M  X l         [-        5       U l        [        R1                  SU R                   U R                  5        U R                   c"  U R                  R	                  5       (       d  GM  SSS5      ISh  vN   g GNV N! [        R                   a'  n[        R                  " U R                  5      nXCeSnAf[        R                   av  n[        R                  SU5        UR                  (       aI  U R
                  R!                  5       I Sh  vN    ["        R$                  " U5      I Sh  vN     SnAGM  e SnAff = f GNw GNU N! , ISh  vN  (       d  f       g= f7f)z4Block until the coordinator for this group is known.Nr   z$Group Coordinator Request failed: %sr   z&Discovered coordinator %s for group %s)r   r   r   r   ro   r   coordinator_lookupr   GROUPr   rV   GroupAuthorizationFailedErrorr   r4   r   	retriableforce_metadata_updaterl   sleepreadyr   r   r   r   r5   )r$   retry_backoffr   r   new_errr%  s         r'   ensure_coordinator_known)GroupCoordinator.ensure_coordinator_known/  s    *000 22T9M%%-dmm6H6H6J6J+/<<+J+J(.., &N" #ll00"/*F*F 1   !--666&4#-:_*<''MM9 %%-dmm6H6H6J6J 100& ;; +$BB4==QG!*(( IIDcJ}}"ll@@BBB%mmM:::  73 1000s   I$E7I$>I
$7E<E:E< +I
I$I
0I1A2I
&I$1I2I$:E<<H?"F22H?	AH:HH:,H/-H:2I
9H::H??I
I
I$
I!II!I$c                 &  #     U R                  5       I S h  vN   g  N! [        R                   a    e [         aQ  n[        R                  S5        [        R                  " SU< 35      nU R                  R                  U5        X!eS nAff = f7f)Nz'Unexpected error in coordinator routinez%Unexpected error during coordination )
'_GroupCoordinator__coordination_routinerl   rq   r   r4   r   rV   r   r   r   )r$   r   	kafka_excs      r'   r   &GroupCoordinator._coordination_routineX  s     
	%--///%% 	 	%MMCD))7w?I ,,Y7$	%s1   B  B BAB		BBc                   #    U R                   R                  nSnU R                  R                  5       (       Gd  Ub7  UR                  (       d&  U R                  5         U R                   R                  nUc  [        R                  " U R                   R                  5       U R                  /[        R                  S9I Sh  vN   U R                  R                  5       (       a  GOGU R                   R                  nUb  UR                  (       d   eU R                   R                  5       n U R                  5       I Sh  vN   U(       aI  U R                  U5      (       a3  U R                  X5      I Sh  vN nUb  UR                  (       d  GMy  UnOUR                  nUb  UR                  (       d   eU R                  U5      I Sh  vN nU R                  U R&                  UR(                  /nU(       a  UR+                  U R,                  5        U R.                  (       a  UR+                  U R.                  5        U R0                  (       a  UR+                  U R0                  5        [        R                  " Xu[        R                  S9I Sh  vN u  pU R.                  U R0                  4 HU  n
U
(       d  M  U
R                  5       (       d  M#  U
R3                  5       nU(       d  M<  U R%                  U5      I Sh  vN   MW     U R                  R                  5       (       d  GM  Ub   U R5                  U5      I Sh  vN   gg GN GN GN GN! [         R"                   a&  nU R%                  U5      I Sh  vN     SnAGMt  SnAff = f GN N NT! [         R"                   a   n[6        R9                  SU5         SnAgSnAff = f7f)zMain background task, that keeps track of changes in group
coordination. This task will spawn/stop heartbeat task and perform
autocommit in times it's safe to do so.
Nra   timeoutrb   z%Failed to commit on finallization: %s)r   r&   r   ro   activer   rl   rk   rc   rn   r1   r(  r  ensure_active_grouprX   _maybe_do_autocommitrV   r   r   r   unsubscribe_futureappendr   r   r   r   r   r4   r   )r$   r&   rX   auto_assignednew_assignmentwait_timeoutr   futuresro   _r   r   s               r'   __coordination_routine'GroupCoordinator.__coordination_routinee  s    
 ))66
--$$&&'0C0C
 ##%#11>>#ll''==?O ' 7 7   ==%%''#11>>+0C0CCC ..GGIM33555 T%5%5l%C%C+/+C+C$, &N &-^5J5J %3
!-!8!8J!-*2C2CCC &*%>%>z%JJ **//G t667
 ##t334((t889#LL7;R;R GD
 --t/H/HI4DIIKK..*Cs"66s;;;	 JO --$$&&\ !H44Z@@@ "I 6&   K$$ ..s3330 <
 A$$ H		A3GGHs   C OMA+O/M M5M 8M9M O9M MM CON&O OO0ON'O/O3N NN OM M M N.NNNONOON O2OOOOc                 ~  #    U R                   R                  (       a4  U R                  R                  5       I S h  vN   UR                  (       d  g U R
                  (       d   U R                  U5      I S h  vN   SU l        U R                  5       I S h  vN   U R                   R                  nUb;  X0R                  :  a,  [        R                  " U R                  S-  5      I S h  vN   g U R                  U5      I S h  vN nU(       a#  SU l        U R                  5         UR                  $ g  N N N NO N77f)NTr   F)r   r,   r   r#  r1  r   r   r   fetcher_idle_timer   rl   r$  r   _do_rejoin_group_start_heartbeat_taskrX   )r$   r&   prev_assignment	idle_timesuccesss        r'   r2  $GroupCoordinator.ensure_active_group  s
     00,,44666&&++''888+/D( ''))) &&88	&98O8O+O-- 6 6 =>>> --l;;+0D(&&(***C 7 9 	*
 ? <sX   9D=D3;D=7D58D=D7AD=)D9*D=D;0D=5D=7D=9D=;D=c                 \    U R                   c  [        U R                  5       5      U l         g g r   )r   r   _heartbeat_routinerS   s    r'   r@  &GroupCoordinator._start_heartbeat_task  s)    '#.t/F/F/H#ID  (r*   c                    #    U R                   bU  U R                   R                  5       (       d.  U R                   R                  5         U R                   I S h  vN   S U l         g g  N7fr   )r   ro   rp   rS   s    r'   r   %GroupCoordinator._stop_heartbeat_task  sY     +'',,..$$++-****#'D 	 , +   AA(A&A(c                   #    [         R                  " 5       nU R                  S-  nU R                  S-  nU R                  S-  nUnU R
                  [        S   R                  :w  GaQ   [        R                  " U5      I S h  vN   U R                  5       I S h  vN   [         R                  " 5       nU R                  5       I S h  vN nU(       a(  [         R                  " 5       n[        SX!-
  U-   45      nOUn[         R                  " 5       U-
  nX:  a%  [        R                  S5        U R!                  5         U R"                  R$                  n	XR&                  :  a  [)        XPR&                  U	-
  5      nOU R+                  5       I S h  vN   U R
                  [        S   R                  :w  a  GMQ  [        R-                  S5        g  GNM GN8 GN! [        R                   a     M6  f = f Nb7f)Nr   r   z4Heartbeat session expired - marking coordinator deadzStopping heartbeat task)r   r   r   r   r   r   r   r   rl   r$  r(  _do_heartbeatrq   maxr4   r   r   r   r>  r   minr   r   )
r$   last_ok_heartbeathb_intervalsession_timeoutr&  
sleep_timet0rC  session_timerB  s
             r'   rF  #GroupCoordinator._heartbeat_routine  s     NN,11D822T9..5 
 nn 0 3 E EEmmJ///33555^^% $ 2 2 44 $(NN$4! ![%Dr%I!JK
*
>>+.??L- 		PQ%%' **<<I222 -D-Dy-PQ
--///C nn 0 3 E EEF 			+,C 05 5)) 4 0ss   A'G6*G GG G,G G	G CG6G4%G6:G6G G G G1-G60G11G6c                 Z  #    U R                   R                  S:  a  SOSn[        U   " U R                  U R                  U R
                  5      n[        R                  SU R                  U R                  U R
                  5         U R                  U5      I S h  vN n[        R                  " UR                  5      nU[        R                  L a!  [        R                  SU R                  5        gU[        R                  [        R                   4;   a<  [        R#                  S	U R                  U R$                  5        U R'                  5         gU[        R(                  L a1  [        R#                  S
U R                  5        U R+                  5         gU[        R,                  L a1  [        R#                  SU R                  5        U R/                  5         gU[        R0                  L a&  [        R#                  S5        U R/                  5         gU[        R2                  L a  U" U R                  5      e[        R                  " SU" 5       < 35      n[        R                  SU5        Ue GN! [        R                   a   n[        R                  SU5         S nAgS nAff = f7f)Nr   r   r   zHeartbeat: %s[%s] %sz.Heartbeat send request failed: %s. Will retry.Fz3Received successful heartbeat response for group %sTzWHeartbeat failed for group %s: coordinator (node %s) is either not started or not validz7Heartbeat failed for group %s because it is rebalancingz=Heartbeat failed for group %s: generation id is not  current.zTHeartbeat failed: local member_id was not recognized; resetting and re-joining groupz(Unexpected exception in heartbeat task: zHeartbeat failed: %r)r   r   r
   r   r   r   r4   r   r   rV   r   r   for_code
error_codeNoErrorr   NotCoordinatorForGroupErrorr  r   r   RebalanceInProgressErrorr   IllegalGenerationErrorr   UnknownMemberIdErrorr!  )r$   r   r   r   r   
error_types         r'   rL  GroupCoordinator._do_heartbeat(  s+    ||//*<!!"7+MM4??DNN
 			"DMM4??DNN	
	00D __T__5
'IIEt}} 44..
 
 KK6##	 !!#B A 6:::KKI4== ! 6888KKO !!#  6666KK2 !!#  6???T]]++##::<:JKC II,c2Ii 1   	IIFL	sC   BJ+I4 I1I4  GJ+1I4 4J(J#J+#J((J+c                     U R                   b  U R                   R                  5         [        U R                  U5      5      U l         g r   )r   rp   r   _commit_refresh_routiner$   rX   s     r'   r  2GroupCoordinator.start_commit_offsets_refresh_taskk  s;    $$0%%,,.$/((4%
!r*   c                    #    U R                   bU  U R                   R                  5       (       d.  U R                   R                  5         U R                   I S h  vN   S U l         g g  N7fr   )r   ro   rp   rS   s    r'   r   2GroupCoordinator._stop_commit_offsets_refresh_taskr  sY     $$0,,1133))002////(,D%	 1 0rJ  c                   #    U R                   S-  nUR                  nSn UR                  (       a  UR                  5         U R	                  U5      I Sh  vN nUR
                  /nU(       d  UnO,Sn[        UR                  5       5      nUR                  U5        [        R                  " Xg[        R                  S9I Sh  vN   UR                  (       a  M  Ub)  UR                  5       (       d  UR                  5         Snggg N NF! [        R                   a     NG[         a    UR                  5         e f = f7f)zGTask that will do a commit cache refresh if someone is waiting for
it.
r   Nr/  )r   re   r1  rf   _maybe_refresh_commit_offsetsrm   r   rk   r5  rl   rn   rq   r   r/   ro   rp   )r$   rX   r   re   rr   rC  wait_futuresr0  s           r'   ra  (GroupCoordinator._commit_refresh_routinez  s-      11D8 * @ @	##%++- $ B B: NN * : :;.G"G#./D/I/I/K#LL ''5ll w?V?V   ###0 #L,=,=,?,?!L -@#- O %% 	 	!%%'	sM   E5D D	A-D DD -E	D D D>!E#D>>Ec           	         #    [        U U R                  U R                  UU R                  U R                  U R
                  5      nUR                  5       I S h  vN nUR                  (       d6  [        R                  SUR                  U R                  R                  5        gUc,  [        R                  " U R
                  S-  5      I S h  vN   gUu  pEU R                  U R                  U R                   XE5      I S h  vN   g N N; N	7f)NzESubscription changed during rebalance from %s to %s. Rejoining group.Fr   T)CoordinatorGroupRebalancer   r   r   r   r   perform_group_joinr1  r4   r   r-   r   rl   r$  r  r   r   )r$   r&   	rebalancerX   r  r  s         r'   r?  !GroupCoordinator._do_rejoin_group  s     -MMOO$$""
	 %7799
""II###""))	 -- 6 6 =>>>,6)$$OOT^^X
 	
 	
 ' : ?	
s7   ADDA4DD
3DDD
DDc                   #    U R                   (       d  g [        R                  " 5       nU R                  S-  nU R                  S-  nX R
                  :  aV   U R                   IS h  vN   U R                  XR                  5       5      I S h  vN   S S S 5      IS h  vN   X#-   U l        [        SU R
                  [        R                  " 5       -
  5      $  Np NK N=! , IS h  vN  (       d  f       NR= f! [        R                   aV  n[        R                  SU5        U R                  U5      (       a$  [        R                  " 5       U-   U l        Us S nA$ e S nAff = f7f)Nr   zAuto offset commit failed: %sr   )r   r   r   r   r   r   r   _do_commit_offsetsall_consumed_offsetsrV   r   r4   r  _is_commit_retriablerM  )r$   rX   nowintervalbackoffr   s         r'   r3  %GroupCoordinator._maybe_do_autocommit  s    ''nn0047((4////,,,11"$C$C$E   -, .1^D*1d44t~~7GGHH! - -,,, $$ ;UC,,U3359^^5E5OD2"Ns   AE&C9 (C)C9 ,#CCCC9 C C9 $5E&C9 CC9 C6%C(&C62C9 5E&6C9 9E#A
EE#E&EE##E&c                     UR                   =(       d9    [        U[        R                  [        R                  [        R
                  45      $ r   )r"  r   rV   r]  r\  r[  )r$   r   s     r'   rr  %GroupCoordinator._is_commit_retriable  s?      
*++--//#
 	
r*   c                    #    U R                   (       d  g U R                  XR                  5       5      I S h  vN   g  N7fr   )r   commit_offsetsrq  rb  s     r'   r   *GroupCoordinator._maybe_do_last_autocommit  s.     ''!!*.M.M.OPPPs   5?=?c                   #     U R                  5       I Sh  vN    U R                   ISh  vN   [        R                  " U R	                  X5      5      I Sh  vN   SSS5      ISh  vN   g NX ND N N! , ISh  vN  (       d  f       N = f! [
        R                  [
        R                  [
        R                  4 a  n[
        R                  " S5      UeSnAf[
        R                   aH  nUR                  (       d  e [        R                  " U R                  S-  5      I Sh  vN     SnAOSnAff = fGM=  7f)zCommit specific offsets

Arguments:
    offsets (dict {TopicPartition: OffsetAndMetadata}): what to commit

Raises KafkaError on failure
NzxCommit cannot be completed since the group has already rebalanced and may have assigned the partitions to another memberr   )r(  r   rl   shieldrp  rV   r]  r\  r[  CommitFailedErrorr   r"  r$  r   )r$   rX   offsetsr   r   s        r'   rz  GroupCoordinator.commit_offsets  s      //111,,,!..)@)@)UVVV -,& + 2,V -,,, ++--// 	
 ..( 	
 $$ G}} "--(>(>(EFFFG s   EA0EB A2B )A8A4A8B *A6+B /E2B 4A86B 8B>B?BB 3D;CD;38D6+D.,D61E6D;;Ec           
        #    U(       d  g [         R                  " [        5      nUR                  5        HC  u  pEX4R                     R                  UR                  UR                  UR                  45        ME     [        U R                  U R                  U R                  [        R                  [        UR                  5       5      5      n[        R                  SUU R                  U R                   5        U R#                  U5      I S h  vN n[         R$                  " 5       n['        5       n	UR(                   GH  u  pU GH  u  p[+        X5      n[,        R.                  " U5      nX$   nU[,        R0                  L a  [        R                  SXT5        MV  U[,        R2                  L aA  [        R5                  SU R                  UR6                  5        U" U R                  5      X'   M  U[,        R8                  L a  U	R;                  U
5        M  U[,        R<                  [,        R>                  4;   a8  [        RA                  SU R                  UUR6                  5        U" 5       X'   GM,  U[,        RB                  L a7  [        RA                  SU R                  UR6                  5        U" 5       X'   GMv  U[,        RD                  [,        RF                  [,        RH                  4;   aG  [        RA                  SU R                  UR6                  5        U RK                  5         U" 5       X'   GM  U[,        RL                  [,        RN                  [,        RP                  4;   an  U" U R                  5      n[        R5                  SU R                  U5        U[,        RP                  L a  U RS                  5         OU RU                  5         XU'   GM  [        R5                  SU R                  UUUR6                  5        U" 5       X'   GM     GM     U(       a  URW                  5       tnnUeU	(       a,  [        R5                  S	U	5        [,        R8                  " U	5      eg  GNX7f)
Nz8Sending offset-commit request with %s for group %s to %sz$Committed offset %s for partition %sz%OffsetCommit failed for group %s - %szFOffsetCommit failed for group %s on partition %s due to %s, will retryzOOffsetCommit failed for group %s because group is initializing (%s), will retryzeOffsetCommit failed for group %s due to a coordinator error (%s), will find new coordinator and retryzEOffsetCommit failed for group %s due to group error (%s), will rejoinzCOffsetCommit failed for group %s on partition %s with offset %s: %sz.OffsetCommit failed for unauthorized topics %s),collectionsdefaultdictlistitemsr7   r5  	partitionoffsetr  r   r   r   r   DEFAULT_RETENTION_TIMEr4   r   r   r   OrderedDictr/   r-   r   rV   rW  rY  r!  r   r@   TopicAuthorizationFailedErroraddOffsetMetadataTooLargeErrorInvalidCommitOffsetSizeErrorr5   GroupLoadInProgressErrorr   rZ  RequestTimedOutErrorr   r]  r\  r[  r   r   values)r$   rX   r  offset_datars   r  r   r   erroredunauthorized_topicsr7   r>   r  rX  r^  r   first_errorr:  s                     r'   rp  #GroupCoordinator._do_commit_offsets	  s     "--d3!--/JB!((v}}foo> *
 &MMOONN66""$%
 			FMM		
 00))+!e!)E)3%	#E5#__Z8
 /IIDfQ6#G#GGII?"++
 #-T]]";GK6#G#GG'++E26677$ 
 HH1"++ #-,GK6#B#BBHH9"++	 #-,GK<<66//$ 
 HH% "++ ))+",,GK//1133$  't}}5EII3	 "V%D%DD++---/"'BK II."++ #-,GK_ *4 "1d %nn.OK!II@BU 667JKK	 s 1s   DQ)Q&MQ)c                   #    UR                  5       nU(       am   U R                  U5      I S h  vN nU HM  nUR                  U5      nXS;   a  UR                  X5   5        M.  UR                  [        [        S5      5        MO     g NX! [        R                   a2  nUR                  (       d  e [
        R                  SU5         S nAgS nAff = f7f)Nz%Failed to fetch committed offsets: %rFr`   T)rg   _do_fetch_commit_offsetsrV   r   r"  r4   r   rh   ri   r   rj   )r$   rX   need_updater  r   rs   rt   s          r'   rg  .GroupCoordinator._maybe_refresh_commit_offsets  s      557 $ = =k JJ "%11"5=--gk:--.?PR.ST "  K$$ }}IIEsKs>   CB
 BB
 ACB
 
C(CCCCc                 P  #    U(       d  0 $  U R                  5       I Sh  vN    U R                  U5      I Sh  vN nU$  N  N! [        R                   aH  nUR                  (       d  e [
        R                  " U R                  S-  5      I Sh  vN     SnAOSnAff = fM  7f)zFetch the current committed offsets for specified partitions

Arguments:
    partitions (list of TopicPartition): partitions to fetch

Returns:
    dict: {TopicPartition: OffsetAndMetadata}
Nr   )r(  r  rV   r   r"  rl   r$  r   )r$   r>   r  r   s       r'   fetch_committed_offsets(GroupCoordinator.fetch_committed_offsets  s      I//111	 $ = =j II  2I$$ G}} "--(>(>(EFFFG	 sS   B&AB&A AA B&A B!8BBBB&B!!B&c                 .  #    [         R                  SU5        [        R                  " [        5      nU H*  nX#R
                     R                  UR                  5        M,     [        U R                  [	        UR                  5       5      5      nU R                  U5      I S h  vN n0 nUR                   GHT  u  pxU GHG  u  pp[        Xy5      n[        R                  " U5      nU[        R                   La  U" 5       n[         R                  SX>5        U[        R"                  L a  UeU[        R$                  L a  U R'                  5         UeU[        R(                  L a  [         R+                  SU5        M  U[        R,                  L a  U" U R                  5      e[         R/                  SX>5        [        R0                  " [3        U5      5      eU
[4        :X  a  [         R                  SU5        GM:  [7        X5      Xc'   GMJ     GMW     U$  GNn7f)Nz-Fetching committed offsets for partitions: %sz Error fetching offset for %s: %sz&OffsetFetchRequest -- unknown topic %sz)Unknown error fetching offsets for %s: %sz$No committed offset for partition %s)r4   r   r  r  r  r7   r5  r  OffsetFetchRequestr   r  r   r-   r   rV   rW  rY  r  rZ  r   rW   r  r!  r   r   reprrj   r   )r$   r>   partitions_by_topicrs   r   r   r  r7   topic_partitionsr  r  r  rX  r^  r   s                  r'   r  )GroupCoordinator._do_fetch_commit_offsets  s    		A:N)55d;B)00>  %T]]D9L9R9R9T4UV00'/#E;K7	8#E5#__Z8
V^^3&LEII@"L!V%D%DD##v'I'II--/##v'J'JJ$LeT #v'K'KK(77		G %//U<< ^+IIDbI"3F"EGK9 <L (7< A 1s   B"H$H%E.H)r   r   r   r   r   r   r   r   r   r   r   r2   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   )-r@   rA   rB   rC   r   r   r(   r6   r   r}   r   ry   r   r   r   r   r   r  r  r   r   r   r  r(  r   r+  r2  r@  r   rF  rL  r  r   ra  r?  r3  rr  r   rz  rp  rg  r  r  rD   r   r   s   @r'   r   r      s   L * " $.0 $#"H( H(T,
((   D/( /b)V85Q'R%[Hz+ZJ(,-\AF
-" H>I4
Q
@xLt&2( (r*   r   c                   6    \ rS rSrSrS rS rS rS rS r	Sr
g	)
rk  i  a,  An adapter, that encapsulates rebalance logic and will have a copy of
assigned topics, so we can detect assignment changes. This includes
subscription pattern changes.

On how to handle cases read in https://cwiki.apache.org/confluence/            display/KAFKA/Kafka+Client-side+Assignment+Proposal
c                     Xl         X l        X0l        X@l        XPl        X`l        Xpl        U R                   R                  R                  U l	        U R                   R                  U l
        g r   )_coordinatorr   r   r   r   r   r   r   r   _api_versionr   )r$   coordinatorr   r   r&   r   r   r   s           r'   r(   "CoordinatorGroupRebalance.__init__  sZ     ( ,)##5 !1 --55AA%)%6%6%L%L"r*   c           
        #    [         R                  SU R                  5        U R                  R                  n/ nU R
                   GH  nUR                  U5      n[        U[        5      (       d  UR                  5       nUR                  U4nUR                  U5        SnU(       d  Md  SnU R                  S:  aK  [        S   " U R                  U R                  U R                  R                   ["        R$                  U5      nGO3U R                  S:  aU  [        S   " U R                  U R                  U R&                  U R                  R                   ["        R$                  U5      nOU R                  S:  aU  [        S	   " U R                  U R                  U R&                  U R                  R                   ["        R$                  U5      nOi[        S
   " U R                  U R                  U R&                  U R                  R                   U R                  R(                  ["        R$                  U5      n[         R+                  SUU R,                  5         U R                  R/                  U5      I Sh  vN nU R                  R4                  (       d    g[0        R6                  " UR8                  5      n	U	[0        R:                  L a  UR                   U R                  l        SnU(       a  GML  GM     W	[0        R<                  L a  [         R+                  SW5        UR                   U R                  l        UR>                  U R                  l         URB                  n
[         R                  SU R                  UR>                  UR                   5        URD                  UR                   :X  a0  [         R                  SU
5        U RG                  U5      I Sh  vN nOU RI                  5       I Sh  vN nUc  gX4$ U	[0        RJ                  L aW  [         R+                  SU R                  U R,                  5        [L        RN                  " U RP                  S-  5      I Sh  vN   gU	[0        RR                  L a;  U R                  RU                  5         [         R+                  SU R                  5        gU	[0        RV                  [0        RX                  4;   aC  U	" 5       nU R                  R[                  5         [         R+                  SU R                  U5        gU	[0        R\                  [0        R^                  [0        R`                  4;   a  U	" 5       n[         Rc                  SU5        UeU	[0        Rd                  L a  U	" U R                  5      eU	" 5       n[         Rc                  SU R                  U5        [0        R2                  " [g        U5      5      e GNh! [0        R2                   a       gf = f GN  GN
 GN7f)a
  Join the group and return the assignment for the next generation.

This function handles both JoinGroup and SyncGroup, delegating to
_perform_assignment() if elected as leader by the coordinator node.

Returns encoded-bytes assignment returned from the group leader
z(Re-)joining group %sTF)r   
   r   r   r   r   r      r   r   r  z(Sending JoinGroup (%s) to coordinator %sNzJoin group response %sz3Joined group '%s' (generation %s) with member_id %szAElected group leader -- performing partition assignments using %szLAttempt to join group %s rejected since coordinator %s is loading the group.r   z8Attempt to join group %s failed due to unknown member idzKAttempt to join group %s failed due to obsolete coordinator information: %sz3Attempt to join group failed due to fatal error: %sz0Unexpected error in join group '%s' response: %s)4r4   r5   r   r   r-   r   r  r   bytesencoder   r5  r  r   r   r  r   r   PROTOCOL_TYPEr   r   r   r   r   rV   r   r1  rW  rX  MemberIdRequiredrY  generation_idr   r   	leader_id_on_join_leader_on_join_followerr  rl   r$  r   r]  r   r   rZ  r   InconsistentGroupProtocolErrorInvalidSessionTimeoutErrorInvalidGroupIdErrorr   r!  r  )r$   r-   metadata_listr   r  r   try_joinr   r   r^  r  assignment_bytesr   s                r'   rl  ,CoordinatorGroupRebalance.perform_group_join  s     	($--8##**H((0Hh..#??,&mmX6N  0H( $$z1.q100))33(66%G &&3.q10022))33(66%G &&2.q10022))33(66%G /q10022))33))<<(66%G 		>''
 %)%6%6%@%@%IIH
 ))00#__X-@-@A
!8!882:2D2DD%%/#Hy ( (L 'II.9*2*<*<D'+3+A+AD(..HHHE&&""	 !!X%7%77,
 *.)=)=h)G#G )-)?)?)A#A '//6:::II)##	 -- 6 6 =>>>J I 6666..0IIJB ; 44..
 

 ,C..0II.	, ! 11--&&
 

 ,CIIKSQI6???T]]++,CIIBDMMSV ##DI..i  J((     @ $H#A ?s}   B%X
+F1X
W&;W#<W& A4X
7C*X
!X"X
:X;A/X
*X+E8X
#W&&W>:X
=W>>X
X
X
c                   #    U R                   S:  aY  U R                   S:  a  SOSn[        U   " U R                  U R                  R                  U R                  R
                  / 5      nOY[        S   " U R                  U R                  R                  U R                  R
                  U R                  R                  / 5      n[        R                  SU R                  U R                  U5        U R                  U5      I S h  vN $  N7f)Nr  r   r   r   r   z=Sending follower SyncGroup for group %s to coordinator %s: %s)r  r   r   r  r   r   r   r4   r   r   _send_sync_group_request)r$   r   r   s      r'   r  +CoordinatorGroupRebalance._on_join_follower  s     y(,,z9aqG&w/!!,,!!++	G 'q)!!,,!!++!!44G 			KMM		
 227;;;;s   DDD
Dc                 r  #     U R                   R                  U5      I Sh  vN n/ nUR                  5        H<  u  pV[        U[        5      (       d  UR                  5       nUR                  XV45        M>     U R                  S:  aY  U R                  S:  a  SOSn[        U   " U R                  U R                   R                  U R                   R                  U5      nOY[        S   " U R                  U R                   R                  U R                   R                  U R                   R                   U5      n["        R%                  SU R                  U R&                  U5        U R)                  U5      I Sh  vN $  GN^! [         a%  n[        R                  " [        U5      5      UeSnAff = f N97f)z
Perform leader synchronization and send back the assignment
for the group via SyncGroupRequest

Arguments:
    response (JoinResponse): broker response to parse

Returns:
    Future: resolves to member assignment encoded-bytes
Nr  r   r   r   r   z;Sending leader SyncGroup for group %s to coordinator %s: %s)r  r  r   rV   r   r  r  r   r  r  r5  r  r   r   r   r   r   r4   r   r   r  )	r$   r   group_assignmenteassignment_reqr   rX   r   r   s	            r'   r  )CoordinatorGroupRebalance._on_join_leader  s~    	4%)%6%6%J%J8%TT %5%;%;%=!Ij%00'..0
!!9"9: &>
 y(,,z9aqG&w/!!,,!!++	G 'q)!!,,!!++!!44G 			IMM		
 227;;;C  U 	4##DG,!3	4@ <sD   F7F F F EF7;F5<F7 F 
F2 F--F22F7c                 l  #    [        5       U R                  l        U R                  R                  nU R                  R                  n U R                  R                  U5      I S h  vN n[        R                  " UR                  5      nU[        R                  L aa  [        R                  SU R                  U R                  R                  5        X R                  l        X0R                  l        UR                  $ U R                  R                  5         U[        R                   L a!  [        R#                  SU R                  5        g U[        R$                  [        R&                  4;   aC  U" 5       n[        R#                  SU R                  U5        U R                  R)                  5         g U[        R*                  [        R,                  4;   aC  U" 5       n[        R#                  SU R                  U5        U R                  R/                  5         g U[        R0                  L a  U" U R                  5      eU" 5       n[        R3                  SU5        [        R                  " [5        U5      5      e GN! [        R                   a    U R                  R                  5          g f = f7f)Nz/Successfully synced group %s with generation %sz4SyncGroup for group %s failed due to group rebalancez(SyncGroup for group %s failed due to %s,z'SyncGroup for group %s failed due to %sz#Unexpected error from SyncGroup: %s)r   r  r   r   r   r   rV   r   r   rW  rX  rY  r4   r5   r   member_assignmentr[  r   r]  r\  r   r   rZ  r   r!  r   r  )r$   r   req_generationreq_member_idr   r^  r   s          r'   r  2CoordinatorGroupRebalance._send_sync_group_request  s"    
 0=,**55))33	!..88AAH __X%8%89
'HHA!!,, ,:(*7'--- 	((*888IIF* % F779V9VWW,CII@$--QTU..0  44..
 
 ,CII?PST..0  6???T]]++,CII;SA##DI..U B   	 ,,.		s=   AJ4	J  'I=(J  ,HJ4=J   .J1.J40J11J4)	r  r   r  r   r   r   r   r   r   N)r@   rA   rB   rC   r   r(   rl  r  r  r  rD   rE   r*   r'   rk  rk    s$    M*Yv<4-<^6r*   rk  )+rl   r  r   loggingr   aiokafka.errorserrorsrV   aiokafka.clientr   r   )aiokafka.coordinator.assignors.roundrobinr   aiokafka.coordinator.protocolr   aiokafka.protocol.apir   aiokafka.protocol.commitr   r   r	   r  aiokafka.protocol.groupr
   r   r   r   r   r   aiokafka.structsr   r   aiokafka.utilr   r   	getLoggerr@   r4   rj   r   rG   r   rk  rE   r*   r'   <module>r     s           = Q : * R P  ? 4!7$ 7$td0 d0NX Xv x xr*   