
    ChQ                         S SK r S SKrS SKrS SKJrJr  S SKJr  S SKJ	r	  S SK
JrJr  S SKJr  \R                  " \5      r " S S\	5      rg)	    N)IterableMapping)ClusterMetadata)AbstractPartitionAssignor) ConsumerProtocolMemberAssignmentConsumerProtocolMemberMetadata)TopicPartitionc            	           \ rS rSrSrSrSr\S\S\	\
\4   S\\
\4   4S j5       r\S	\\
   S\4S
 j5       r\S\SS4S j5       rSrg)RoundRobinPartitionAssignor   a  
The roundrobin assignor lays out all the available partitions and all the
available consumers. It then proceeds to do a roundrobin assignment from
partition to consumer. If the subscriptions of all consumer instances are
identical, then the partitions will be uniformly distributed. (i.e., the
partition ownership counts will be within a delta of exactly one across all
consumers.)

For example, suppose there are two consumers C0 and C1, two topics t0 and
t1, and each topic has 3 partitions, resulting in partitions t0p0, t0p1,
t0p2, t1p0, t1p1, and t1p2.

The assignment will be:
    C0: [t0p0, t0p2, t1p1]
    C1: [t0p1, t1p0, t1p2]

When subscriptions differ across consumer instances, the assignment process
still considers each consumer instance in round robin fashion but skips
over an instance if it is not subscribed to the topic. Unlike the case when
subscriptions are identical, this can result in imbalanced assignments.

For example, suppose we have three consumers C0, C1, C2, and three topics
t0, t1, t2, with unbalanced partitions t0p0, t1p0, t1p1, t2p0, t2p1, t2p2,
where C0 is subscribed to t0; C1 is subscribed to t0, t1; and C2 is
subscribed to t0, t1, t2.

The assignment will be:
    C0: [t0p0]
    C1: [t1p0]
    C2: [t1p1, t2p0, t2p1, t2p2]

roundrobinr   clustermembersreturnc           	      \   [        5       nUR                  5        H  nUR                  UR                  5        M      / nU HN  nUR	                  U5      nUc  [
        R                  SU5        M/  UU Vs/ s H  n[        Xh5      PM     sn-  nMP     UR                  5         [        R                  " S 5      n	[        R                  " [        UR                  5       5      5      n
U H}  n[        U
5      nUR                   X+   R                  ;  a)  [        U
5      nUR                   X+   R                  ;  a  M)  X   UR                      R#                  UR$                  5        M     0 nU H5  n['        U R(                  [        X   R+                  5       5      S5      X'   M7     U$ s  snf )Nz"No partition metadata for topic %sc                  6    [         R                  " [        5      $ N)collectionsdefaultdictlist     nC:\Suresh\moveshuttle\MDcreated\moveengine\venv\Lib\site-packages\aiokafka/coordinator/assignors/roundrobin.py<lambda>4RoundRobinPartitionAssignor.assign.<locals>.<lambda>L   s    K++D1r   r   )setvaluesupdatesubscriptionpartitions_for_topiclogwarningr	   sortr   r   	itertoolscyclesortedkeysnexttopicappend	partitionr   versionitems)clsr   r   
all_topicsmetadataall_topic_partitionsr)   
partitionsr+   
assignmentmember_iter	member_idprotocol_assignments                r   assign"RoundRobinPartitionAssignor.assign5   s    U
(Hh334 ) 68E 55e<J!@%H BL%BLYu0*%     	!!# 7B6M6M17

  oofW\\^&<=-I[)I //);)H)HH -	 //);)H)HH!)//299):M:MN . ! I-MVJ$9$?$?$ABC.* ! #"7%s   5F)topicsc                 B    [        U R                  [        U5      S5      $ )Nr   )r   r,   r   )r.   r9   s     r   r0   $RoundRobinPartitionAssignor.metadatab   s    -ckk4<MMr   r3   Nc                     g r   r   )r.   r3   s     r   on_assignment)RoundRobinPartitionAssignor.on_assignmentf   s    r   r   )__name__
__module____qualname____firstlineno____doc__namer,   classmethodr   r   strr   dictr   r7   r   r0   r=   __static_attributes__r   r   r   r   r      s    @ DG*# *# <<=*# 
c33	4	*# *#X Nhsm N0N N N 'G D  r   r   )r   r$   loggingcollections.abcr   r   aiokafka.clusterr   'aiokafka.coordinator.assignors.abstractr   aiokafka.coordinator.protocolr   r   aiokafka.structsr	   	getLoggerr?   r!   r   r   r   r   <module>rP      s@       - , M ,!W"; Wr   