
    9h';                         S SK r S SKrS SKrS SKrS SKrS SKJr  S SKJr  S SK	J
r  S SKJr  S SKJrJrJr  \R$                  " \5      r " S S5      rg)	    N)Future)Optional)errors)collect_hosts)BrokerMetadataPartitionMetadataTopicPartitionc                       \ rS rSrSrSS/ S.rS rS rS rS	 r	S
 r
S\S\\\      4S jrS rS rS rS rS rSS jrS rS rS rS rS rS rS rS S jrS rSrg)!ClusterMetadata   a  
A class to manage kafka cluster metadata.

This class does not perform any IO. It simply updates internal state
given API responses (MetadataResponse, GroupCoordinatorResponse).

Keyword Arguments:
    retry_backoff_ms (int): Milliseconds to backoff when retrying on
        errors. Default: 100.
    metadata_max_age_ms (int): The period of time in milliseconds after
        which we force a refresh of metadata even if we haven't seen any
        partition leadership changes to proactively discover any new
        brokers or partitions. Default: 300000
    bootstrap_servers: 'host[:port]' string (or list of 'host[:port]'
        strings) that the client should contact to bootstrap initial
        cluster metadata. This does not have to be the full node list.
        It just needs to have at least one broker that will respond to a
        Metadata API Request. Default port is 9092. If no servers are
        specified, will default to localhost:9092.
d   i )retry_backoff_msmetadata_max_age_msbootstrap_serversc                 B   0 U l         0 U l        [        R                  " [        5      U l        0 U l        SU l        SU l        SU l	        S U l
        [	        5       U l        [        R                  " 5       U l        SU l        [	        5       U l        [	        5       U l        S U l        [&        R&                  " U R(                  5      U l        U R*                   H  nX!;   d  M
  X   U R*                  U'   M     U R-                  5       U l        0 U l        0 U l        0 U l        g )Nr   TF)_brokers_partitionscollectionsdefaultdictset_broker_partitions_groups_last_refresh_ms_last_successful_refresh_ms_need_update_future
_listeners	threadingLock_lockneed_all_topic_metadataunauthorized_topicsinternal_topics
controllercopyDEFAULT_CONFIGconfig_generate_bootstrap_brokers_bootstrap_brokers_coordinator_brokers_coordinators_coordinator_by_key)selfconfigskeys      UC:\Suresh\moveshuttle\MDcreated\moveengine\venv\Lib\site-packages\aiokafka/cluster.py__init__ClusterMetadata.__init__,   s    "-"9"9#"> !+,( %^^%
',$#&5 "uii 3 34;;C~#*<C   #'"B"B"D$&!#%     c                     [        U R                  S   5      n0 n[        U5       H  u  nu  pEnSU 3n[        XtUS 5      X''   M     U$ )Nr   z
bootstrap-)r   r'   	enumerater   )r-   bootstrap_hostsbrokersihostport_node_ids           r0   r(   +ClusterMetadata._generate_bootstrap_brokersG   sW    '4G(HI"+O"<AA"1#&G-gT4HG #= r3   c                     XR                   ;   $ N)r)   r-   r<   s     r0   is_bootstrapClusterMetadata.is_bootstrapQ   s    1111r3   c                     [        U R                  R                  5       5      =(       d#    [        U R                  R                  5       5      $ )z@Get all BrokerMetadata

Returns:
    set: {BrokerMetadata, ...}
)r   r   valuesr)   r-   s    r0   r7   ClusterMetadata.brokersT   s5     4=='')*Sc$2I2I2P2P2R.SSr3   c                     U R                   R                  U5      =(       d=    U R                  R                  U5      =(       d    U R                  R                  U5      $ )zGet BrokerMetadata

Arguments:
    broker_id (int): node_id for a broker to check

Returns:
    BrokerMetadata or None if not found
)r   getr)   r*   r-   	broker_ids     r0   broker_metadataClusterMetadata.broker_metadata\   sM     MMi( 8&&**958((,,Y7	
r3   topicreturnc                 n    XR                   ;  a  g[        U R                   U   R                  5       5      $ )zReturn set of all partitions for topic (whether available or not)

Arguments:
    topic (str): topic to check for partitions

Returns:
    set: {partition (int), ...}
N)r   r   keys)r-   rM   s     r0   partitions_for_topic$ClusterMetadata.partitions_for_topick   s2     (((4##E*//122r3   c                     XR                   ;  a  gU R                   U   R                  5        VVs1 s H  u  p#UR                  S:w  d  M  UiM     snn$ s  snnf )zReturn set of partitions with known leaders

Arguments:
    topic (str): topic to check for partitions

Returns:
    set: {partition (int), ...}
    None if topic not found.
N)r   itemsleader)r-   rM   	partitionmetadatas       r0   available_partitions_for_topic.ClusterMetadata.available_partitions_for_topicx   s]     ((( (,'7'7'>'D'D'F
'F#	"$ 'F
 	
 
s   AAc                     UR                   U R                  ;  a  gU R                  UR                      nUR                  U;  a  gX!R                     R                  $ )z:Return node_id of leader, -1 unavailable, None if unknown.N)rM   r   rW   rV   )r-   rW   
partitionss      r0   leader_for_partition$ClusterMetadata.leader_for_partition   sQ    ??$"2"22%%ioo6
j0--.555r3   c                 8    U R                   R                  U5      $ )zReturn TopicPartitions for which the broker is a leader.

Arguments:
    broker_id (int): node id for a broker

Returns:
    set: {TopicPartition, ...}
    None if the broker either has no partitions or does not exist.
)r   rH   rI   s     r0   partitions_for_broker%ClusterMetadata.partitions_for_broker   s     &&**955r3   c                 8    U R                   R                  U5      $ )zReturn node_id of group coordinator.

Arguments:
    group (str): name of consumer group

Returns:
    int: node_id for group coordinator
    None if the group does not exist.
)r   rH   )r-   groups     r0   coordinator_for_group%ClusterMetadata.coordinator_for_group   s     ||&&r3   c                     U R                      SU l        U R                  (       a  U R                  R                  (       a  [	        5       U l        U R                  sSSS5        $ ! , (       d  f       g= f)zFlags metadata for update, return Future()

Actual update must be handled separately. This method will only
change the reported ttl()

Returns:
    Future (value will be the cluster object after update)
TN)r    r   r   is_doner   rE   s    r0   request_updateClusterMetadata.request_update   s@     ZZ $D<<4<<#7#7%x<<	 ZZs   AA%%
A3c                 v    [        U R                  R                  5       5      nU(       a  X R                  -
  $ U$ )aJ  Get set of known topics.

Arguments:
    exclude_internal_topics (bool): Whether records from internal topics
        (such as offsets) should be exposed to the consumer. If set to
        True the only way to receive records from an internal topic is
        subscribing to it. Default True

Returns:
    set: {topic (str), ...}
)r   r   rP   r#   )r-   exclude_internal_topicstopicss      r0   rl   ClusterMetadata.topics   s4     T%%**,-"0000Mr3   c                    SnU R                      U R                  (       a  U R                  nSU l        SSS5        U(       a  UR                  U5        [        R                  " 5       S-  U l        g! , (       d  f       ND= f)z4Update cluster state given a failed MetadataRequest.N  )r    r   set_exceptiontimer   )r-   	exceptionfs      r0   failed_updateClusterMetadata.failed_update   sV    ZZ||LL#  OOI& $		d 2 Zs   %A22
B c                 L   UR                   (       d;  [        R                  S5        U R                  [        R
                  " U5      5        g0 nUR                    H=  nUR                  S:X  a  Uu  pEnSnOUu  pEpgUR                  U[        XEXg5      05        M?     UR                  S:X  a  SnOUR                  UR                  5      n0 n	[        R                  " [        5      n
[        5       n[        5       nUR                   GH  nUR                  S:X  a  Uu  pnSnOUu  pnnU(       a  UR                  U5        [        R                   " U5      nU[        R"                  L aN  0 X'   U HB  tnnnnnn[%        UUUUUUS9X   U'   US:w  d  M$  U
U   R                  ['        UU5      5        MD     M  U R(                  (       a  M  U[        R*                  L a  [        R                  SU5        M  U[        R,                  L a  [        R/                  SU5        GM  U[        R0                  L a*  [        R/                  S	U5        UR                  U5        GMX  U[        R2                  L a  [        R/                  S
U5        GM  [        R/                  SUU5        GM     U R4                     X l        Xl        Xl        Xl        Xl        Xl         SnU RB                  (       a  U RB                  nSU l!        SU l"        SSS5        [F        RF                  " 5       S-  nUU l$        UU l%        W(       a  URM                  U 5        [        RO                  SU 5        U RP                   H  nU" U 5        M     U R(                  (       a  SU l"        gg! , (       d  f       N= f)zUpdate cluster state given a MetadataResponse.

Arguments:
    metadata (MetadataResponse): broker response to a metadata request

Returns: None
z9No broker metadata found in MetadataResponse -- ignoring.Nr   F)rM   rW   rV   replicasisrerrorrT   z;Topic %s is not available during auto-create initializationz&Topic %s not found in cluster metadataz*Topic %s is not authorized for this clientz'%s' is not a valid topic namez(Error fetching metadata for topic %s: %sro   zUpdated cluster metadata to %s))r7   logwarningrt   ErrorsMetadataEmptyBrokerListAPI_VERSIONupdater   rH   controller_idr   r   r   rl   addfor_codeNoErrorr   r	   r!   LeaderNotAvailableErrorUnknownTopicOrPartitionErrorry   TopicAuthorizationFailedErrorInvalidTopicErrorr    r   r$   r   r   r"   r#   r   r   rq   r   r   
set_resultdebugr   )r-   rX   _new_brokersbrokerr<   r9   r:   rack_new_controller_new_partitions_new_broker_partitions_new_unauthorized_topics_new_internal_topics
topic_data
error_coderM   r\   is_internal
error_typep_errorrW   rV   rw   rx   r;   rs   nowlisteners                               r0   update_metadataClusterMetadata.update_metadata   s    KKSTv==hGH&&F##q(&,#t,2)t.*S TU ' 1$"O*..x/E/EFO!,!8!8!=#&5 "u"//J##q(0:-
:#=G:
;
$((/4JV^^+)+&EOAGY#8I#"+%!)%9O*95 |.v6::*5)< FP --v===Q vBBB		BEJvCCC		FN(,,U3v777		:EB		DeZXW *Z ZZ(M-O.&<#'?$#7 A||LLDL %D  iikD  #+.(LL		2D9HTN ( ''
 !&D (1 Zs   2AN
N#c                 :    U R                   R                  U5        g)z<Add a callback function to be called on each metadata updateN)r   r   r-   r   s     r0   add_listenerClusterMetadata.add_listenerC  s    H%r3   c                 :    U R                   R                  U5        g)z+Remove a previously added listener callbackN)r   remover   s     r0   remove_listenerClusterMetadata.remove_listenerG  s    x(r3   c                    [         R                  SX5        [        R                  " UR                  5      nU[        R
                  La&  [         R                  SU5        SU R                  U'   gSUR                   3n[        XBR                  UR                  S5      n[         R                  SX5        XPR                  U'   X@R                  U'   U$ )zUpdate with metadata for a group coordinator

Arguments:
    group (str): name of group from GroupCoordinatorRequest
    response (GroupCoordinatorResponse): broker response

Returns:
    string: coordinator node_id if metadata is updated, None on error
zUpdating coordinator for %s: %sz"GroupCoordinatorResponse error: %srT   Nzcoordinator-zGroup coordinator for %s is %s)rz   r   r|   r   r   r   ry   r   coordinator_idr   r9   r:   infor*   )r-   rc   responser   r<   coordinators         r0   add_group_coordinator%ClusterMetadata.add_group_coordinatorK  s     			3UE__X%8%89
V^^+II:JG"$DLL !!8!8 9:$WmmX]]DQ15F-8!!'*%Ur3   c                    [        S0 U R                  D6n[        R                  " U R                  5      Ul        [        R                  " U R
                  5      Ul        [        R                  " U R                  5      Ul        [        R                  " U R                  5      Ul        [        R                  " U R                  5      Ul        [        R                  " U R                  5      Ul	        U H  nX2R
                  UR                     UR                  '   UR                  c  M7  UR                  S:w  d  MI  UR                  UR                     R                  [        UR                  UR                  5      5        M     U$ )z8Returns a copy of cluster metadata with partitions addedrT    )r   r'   r%   deepcopyr   r   r   r   r#   r"   rM   rW   rV   r   r	   )r-   partitions_to_addnew_metadatarW   s       r0   with_partitionsClusterMetadata.with_partitionsf  s   &55 $dmm <#'==1A1A#B *.--8O8O*P'#}}T\\:'+}}T5I5I'J$+/==9Q9Q+R(*IMV$$Y__5i6I6IJ+	0@0@B0F//	0@0@AEE"9??I4G4GH	 + r3   c                 8    U R                   R                  U5      $ r?   )r+   rH   r@   s     r0   coordinator_metadata$ClusterMetadata.coordinator_metadataz  s    !!%%g..r3   Nc                    XPR                   ;   a(  U R                   R                  U5      nU R                  U	 [        XX45      U R                  U'   XR                   U'   g)zKeep track of all coordinator nodes separately and remove them if
a new one was elected for the same purpose (For example group
coordinator for group X).
N)r,   popr+   r   )r-   r<   r9   r:   r   purposeold_ids          r0   add_coordinatorClusterMetadata.add_coordinator}  sY    
 ...--11':F""6*&4WD&O7#,3  )r3   c                     S[        U R                  5      [        U R                  5      [        U R                  5      4-  $ )Nz4ClusterMetadata(brokers: %d, topics: %d, groups: %d))lenr   r   r   rE   s    r0   __str__ClusterMetadata.__str__  s;    E  !I
 
 	
r3   )r)   r   r   r*   r,   r+   r   r   r   r   r   r    r   r   r'   r$   r#   r!   r"   )Tr?   ) __name__
__module____qualname____firstlineno____doc__r&   r1   r(   rA   r7   rK   strr   r   intrQ   rY   r]   r`   rd   rh   rl   rt   r   r   r   r   r   r   r   r   __static_attributes__r   r3   r0   r   r      s    ,  %N&62T
3# 3(3s82D 3
$6
6
' $	3j&X&)6(/
4
r3   r   )r   r%   loggingr   rq   concurrent.futuresr   typingr   aiokafkar   r|   aiokafka.connr   aiokafka.structsr   r   r	   	getLoggerr   rz   r   r   r3   r0   <module>r      sB         %  % ' N N!~
 ~
r3   