
    9hK                    R   S SK Jr  S SKrS SKrS SKrS SKrS SKJrJr  S SK	J
r
  S SKJr  S SKJr  S SKJr  S SKJr  S S	KJrJr  S S
KJrJr  \R2                  " \5      r " S S\5      r " S S5      r " S S5      r " S S\5      r " S S5      r  " S S\5      r! " S S5      r"g)    )annotationsN)Eventshield)Iterable)Enum)Pattern)ConsumerRebalanceListener)IllegalStateError)OffsetAndMetadataTopicPartition)create_futureget_running_loopc                  $    \ rS rSrSrSrSrSrSrg)SubscriptionType                N)	__name__
__module____qualname____firstlineno__NONEAUTO_TOPICSAUTO_PATTERNUSER_ASSIGNED__static_attributes__r       iC:\Suresh\moveshuttle\MDcreated\moveengine\venv\Lib\site-packages\aiokafka/consumer/subscription_state.pyr   r      s    DKLMr    r   c                     \ rS rSrSr\R                  rSrSr	Sr
S$S jr\S%S j5       r\S&S j5       r\S'S j5       r\S 5       rS(S	 jr\S
 5       rS)S jrS*S jrS+S jrS,S jrS-S jrS rS rS$S.S jjrS$S/S jjrS0S jrS rS.S jrS1S jrS r S2S jr!S r"S r#S r$S r%S3S jr&S(S jr'S3S  jr(\)RT                  S! 5       r+\S" 5       r,S#r-g)4SubscriptionState   a  Intermediate bridge to coordinate work between Consumer, Coordinator
and Fetcher primitives.

    The class is different from kafka-python's implementation to provide
a more friendly way to interact in asynchronous paradigm. The changes
focus on making the internals less mutable (subscription, topic state etc.)
paired with futures for when those change.
    Before there was a lot of trouble if user say did a subscribe between
yield statements of a rebalance or other critical IO operation.
Nc                    Uc
  [        5       nXl        / U l        / U l        SU l        [
        R                  " 5       U l        g )Nr   )r   _loop_subscription_waiters_assignment_waiters_fetch_counttime	monotonic_last_fetch_ended)selfloops     r!   __init__SubscriptionState.__init__,   s=    <#%D
%'"#%  !%!1r    c                    U R                   $ N)_subscriptionr-   s    r!   subscriptionSubscriptionState.subscription8   s    !!!r    c                    U R                   $ r2   )_subscribed_patternr4   s    r!   subscribed_pattern$SubscriptionState.subscribed_pattern<   s    '''r    c                    U R                   $ r2   )	_listenerr4   s    r!   listenerSubscriptionState.listener@   s    ~~r    c                \    U R                   b  U R                   R                  $ [        5       $ r2   )r3   topicssetr4   s    r!   r@   SubscriptionState.topicsD   s'    )%%,,,ur    c                    U R                   c
  [        5       $ U R                   R                  c
  [        5       $ U R                   R                  R                  $ r2   )r3   rA   
assignmenttpsr4   s    r!   assigned_partitions%SubscriptionState.assigned_partitionsJ   sF    %5L((05L!!,,000r    c                J    U R                   c  gU R                   R                  $ NT)r3   _reassignment_in_progressr4   s    r!   reassignment_in_progress*SubscriptionState.reassignment_in_progressQ   s#    %!!;;;r    c                \    U R                   [        R                  [        R                  4;   $ r2   )_subscription_typer   r   r   r4   s    r!   partitions_auto_assigned*SubscriptionState.partitions_auto_assignedW   s,    &&(())+
 
 	
r    c                    U R                   c  gU R                   R                  c  gXR                   R                  R                  ;   $ NF)r3   rD   rE   r-   tps     r!   is_assignedSubscriptionState.is_assigned]   sA    %((0''226666r    c                f    U R                   [        R                  U4;   a  Xl         g [        S5      e)NzESubscription to topics, partitions and pattern are mutually exclusive)rN   r   r   r
   )r-   subscription_types     r!   _set_subscription_type(SubscriptionState._set_subscription_typed   s5    ""'7'<'<>O&PP&7## r    c                    [         R                  SUR                  5        U R                  b  U R                  R	                  5         Xl        U R                  5         g )Nz!Updating subscribed topics to: %s)loginfor@   r3   _unsubscribe_notify_subscription_waiters)r-   r5   s     r!   _change_subscription&SubscriptionState._change_subscriptionm   sG    4l6I6IJ)++-)))+r    c                    U R                   c   eU R                   R                  c   eU R                   R                  R                  U5      nUc  [        SU 35      eU$ )Nz$No current assignment for partition )r3   rD   state_valuer
   )r-   rT   tp_states      r!   _assigned_state!SubscriptionState._assigned_stateu   sf    !!---!!,,888%%00<<R@#&J2$$OPPr    c                    U R                    H+  nUR                  5       (       a  M  UR                  S 5        M-     U R                   R                  5         g r2   )r'   done
set_resultclearr-   waiters     r!   r_   .SubscriptionState._notify_subscription_waiters}   s?    00F;;==!!$' 1 	""((*r    c                    U R                    H+  nUR                  5       (       a  M  UR                  S 5        M-     U R                   R                  5         g r2   )r(   rh   ri   rj   rk   s     r!   _notify_assignment_waiters,SubscriptionState._notify_assignment_waiters   s?    ..F;;==!!$' / 	  &&(r    c                   [        U[        5      (       d   eUb  [        U[        5      (       d   eU R                  [        R
                  5        U R                  [        XR                  S95        X l	        U R                  5         g)zdSubscribe to a list (or tuple) of topics

Caller: Consumer.
Affects: SubscriptionState.subscription
Nr.   )
isinstancerA   r	   rY   r   r   r`   Subscriptionr&   r<   r_   )r-   r@   r=   s      r!   	subscribeSubscriptionState.subscribe   sm     &#&&&&:h8Q#R#RRR##$4$@$@A!!,vJJ"GH!))+r    c                    [        US5      (       d   S5       eUb  [        U[        5      (       d   eU R                  [        R
                  5        Xl        X l        g)zSubscribe to all topics matching a regex pattern.
Subsequent calls `subscribe_from_pattern()` by Coordinator will provide
the actual subscription topics.

Caller: Consumer.
Affects: SubscriptionState.subscribed_pattern
matchzExpected Pattern typeN)hasattrrs   r	   rY   r   r   r8   r<   )r-   patternr=   s      r!   subscribe_pattern#SubscriptionState.subscribe_pattern   sV     w((A*AA(:h8Q#R#RRR##$4$A$AB#* !r    c                    U R                  [        R                  5        U R                  [	        XR
                  S95        U R                  5         g)zManually assign partitions. After this call automatic assignment
will be impossible and will raise an ``IllegalStateError``.

Caller: Consumer.
Affects: SubscriptionState.subscription
rr   N)rY   r   r   r`   ManualSubscriptionr&   ro   )r-   
partitionss     r!   assign_from_user"SubscriptionState.assign_from_user   s=     	##$4$B$BC!!"4Zjj"QR'')r    c                    U R                   b  U R                   R                  5         SU l         SU l        SU l        [        R
                  U l        g)zUnsubscribe from the last subscription. This will also clear the
subscription type.

Caller: Consumer.
Affects: SubscriptionState.subscription
N)r3   r^   r8   r<   r   r   rN   r4   s    r!   unsubscribeSubscriptionState.unsubscribe   sG     )++-!#' "2"7"7r    c                x    U R                   [        R                  :X  d   eU R                  [	        U5      5        g)zChange subscription on cluster metadata update if a new topic
created or one is removed.

Caller: Coordinator
Affects: SubscriptionState.subscription
N)rN   r   r   r`   rt   )r-   r@   s     r!   subscribe_from_pattern(SubscriptionState.subscribe_from_pattern   s2     &&*:*G*GGGG!!,v"67r    c                    U R                   [        R                  [        R                  4;   d   eU R                  R                  U5        U R                  5         g)zxSet assignment if automatic assignment is used.

Caller: Coordinator
Affects: SubscriptionState.subscription.assignment
N)rN   r   r   r   r3   _assignro   r-   rD   s     r!   assign_from_subscribed(SubscriptionState.assign_from_subscribed   sU     &&))((+
 
 	
 

 	"":.'')r    c                T    U R                   b  U R                   R                  5         gg)zSignal from Coordinator that a group re-join is needed. For example
this will be called if a commit or heartbeat fails with an
InvalidMember error.

Caller: Coordinator
N)r3   _begin_reassignmentr4   s    r!   begin_reassignment$SubscriptionState.begin_reassignment   s'     )224 *r    c                D    U R                  U5      R                  U5        g)zrForce reset of position to the specified offset.

Caller: Consumer, Fetcher
Affects: TopicPartitionState.position
N)re   seek)r-   rT   offsets      r!   r   SubscriptionState.seek   s     	R %%f-r    c                P    [        5       nU R                  R                  U5        U$ )zKWait for subscription change. This will always wait for next
subscription.
)r   r'   appendr-   futs     r!   wait_for_subscription'SubscriptionState.wait_for_subscription   s$     o""))#.
r    c                P    [        5       nU R                  R                  U5        U$ )zwWait for next assignment. Be careful, as this will always wait for
next assignment, even if the current one is active.
)r   r(   r   r   s     r!   wait_for_assignment%SubscriptionState.wait_for_assignment   s$     o  '',
r    c                    Xl         g r2   )_fetch_waiters)r-   waiterss     r!   register_fetch_waiters(SubscriptionState.register_fetch_waiters  s    %r    c                t   U R                    H?  nUR                  5       (       a  M  UR                  [        R                  " U5      5        MA     U R                  R                  5         U R                   H?  nUR                  5       (       a  M  UR                  [        R                  " U5      5        MA     g)z9Critical error occurred, we will abort any pending waiterN)r(   rh   set_exceptioncopyr'   rj   r   )r-   excrl   s      r!   abort_waitersSubscriptionState.abort_waiters  sx    ..F;;==$$TYYs^4 / 	""((*))F;;==$$TYYs^4 *r    c                B    U R                  U5      R                  5         g r2   )re   pauserS   s     r!   r   SubscriptionState.pause  s    R &&(r    c                    [        5       nU R                  5        H6  nU R                  U5      R                  (       d  M%  UR	                  U5        M8     U$ r2   )rA   rF   re   pausedadd)r-   resrT   s      r!   paused_partitions#SubscriptionState.paused_partitions  sE    e**,B##B'... - 
r    c                B    U R                  U5      R                  5         g r2   )re   resumerS   s     r!   r   SubscriptionState.resume  s    R '')r    c              #     #    U =R                   S-  sl         S v   U =R                   S-  sl         U R                   S:X  a  [        R                  " 5       U l        g g 7f)Nr   r   r)   r*   r+   r,   r4   s    r!   fetch_contextSubscriptionState.fetch_context#  sL     QQ!%)^^%5D" "s   AAc                h    U R                   S:X  a"  [        R                  " 5       U R                  -
  $ g)z>How much time (in seconds) spent without consuming any recordsr   r   r4   s    r!   fetcher_idle_time#SubscriptionState.fetcher_idle_time+  s-     !>>#d&<&<<<r    )
r(   r)   r   r,   r<   r&   r8   r3   rN   r'   r2   )returnrt   )r   r   )r   r	   )r   set[TopicPartition]r   bool)rT   r   r   r   )rX   r   )r5   rt   rT   r   r   TopicPartitionState)r@   zset[str])rz   r   )r   Iterable[TopicPartition])rD   r   )rT   r   r   int)rT   r   r   None).r   r   r   r   __doc__r   r   rN   r8   r3   r<   r/   propertyr5   r9   r=   r@   rF   rK   rO   rU   rY   r`   re   r_   ro   ru   r{   r   r   r   r   r   r   r   r   r   r   r   r   r   
contextlibcontextmanagerr   r   r   r   r    r!   r#   r#      s/   	 *..MI
2 " " ( (    
1 < <

7,+),"
*8$8*5.&	5)*
 6 6  r    r#   c                  l    \ rS rSrSrSSS jjr\S 5       r\S 5       r\S 5       r	SS jr
S	 rS
 rSrg)rt   i4  zDescribes current subscription to a list of topics. In case of pattern
subscription a new instance of this class will be created if number of
matched topics change.

States:
    * Subscribed
    * Assigned (assignment was set)
    * Unsubscribed
Nc                    Uc
  [        5       n[        U5      U l        S U l        UR	                  5       U l        SU l        g rI   )r   	frozenset_topics_assignmentr   unsubscribe_futurerJ   )r-   r@   r.   s      r!   r/   Subscription.__init__?  s<    <#%D ("&"4"4"6)-&r    c                :    U R                   R                  5       SL $ rR   )r   rh   r4   s    r!   activeSubscription.activeH  s    &&++-66r    c                    U R                   $ r2   )r   r4   s    r!   r@   Subscription.topicsL      ||r    c                    U R                   $ r2   r   r4   s    r!   rD   Subscription.assignmentP      r    c                    U H'  nUR                   U R                  ;   a  M   SU 35       e   U R                  b  U R                  R                  5         [	        U5      U l        SU l        g )Nz/Received an assignment for unsubscribed topic: F)topicr   r   	_unassign
AssignmentrJ   r-   topic_partitionsrT   s      r!   r   Subscription._assignT  sk    "BDLL(F@EF( #
 '&&(%&67).&r    c                    U R                   R                  S 5        U R                  b  U R                  R                  5         g g r2   )r   ri   r   r   r4   s    r!   r^   Subscription._unsubscribe`  s9    **40'&&( (r    c                    SU l         g rI   )rJ   r4   s    r!   r    Subscription._begin_reassignmente  s
    )-&r    )r   rJ   r   r   r2   )r@   zIterable[str]r   r   )r   r   r   r   r   r/   r   r   r@   rD   r   r^   r   r   r   r    r!   rt   rt   4  sW    . 7 7      
/)
.r    rt   c                  x   ^  \ rS rSrSrS	S
U 4S jjjrSS jr\S 5       r\R                  S 5       rS r
SrU =r$ )r~   ii  zDescribes a user assignmentc                R   > S U 5       n[         TU ]  X2S9  [        U5      U l        g )Nc              3  8   #    U  H  oR                   v   M     g 7fr2   )r   ).0rT   s     r!   	<genexpr>.ManualSubscription.__init__.<locals>.<genexpr>m  s     5_r((_s   rr   )superr/   r   r   )r-   user_assignmentr.   r@   	__class__s       r!   r/   ManualSubscription.__init__l  s*    5_5+%o6r    c                    [        S5      eNzShould not be calledAssertionError)r-   r   s     r!   r   ManualSubscription._assignq      344r    c                    grR   r   r4   s    r!   rJ   ,ManualSubscription._reassignment_in_progresst  s    r    c                    g r2   r   )r-   values     r!   rJ   r   x  s    r    c                    [        S5      er   r   r4   s    r!   r   &ManualSubscription._begin_reassignment|  r   r    r   r2   )r   r   )r   r   )r   r   r   r   r   r/   r   r   rJ   setterr   r   __classcell__)r   s   @r!   r~   r~   i  sM    %7 7
5   %% &5 5r    r~   c                  b    \ rS rSrSrSS jr\S 5       r\S 5       rS r	SS jr
SS jrS	 rS
rg)r   i  zDescribes current partition assignment. New instance will be created
on each group rebalance if automatic assignment is used.

States:
    * Assigned
    * Unassigned
c                   [        U[        [        [        45      (       d   e[	        U5      U l        0 U l        U R
                   H  n[        U 5      U R                  U'   M     [        5       U l	        [        5       U l        g r2   )rs   listrA   tupler   _topic_partitions	_tp_stater   r   unassign_futurer   commit_refresh_neededr   s      r!   r/   Assignment.__init__  sk    *T3,>????!*+;!<((B!4T!:DNN2 )  -%*W"r    c                    U R                   $ r2   )r  r4   s    r!   rE   Assignment.tps  s    %%%r    c                :    U R                   R                  5       SL $ rR   )r  rh   r4   s    r!   r   Assignment.active  s    ##((*e33r    c                :    U R                   R                  S 5        g r2   )r  ri   r4   s    r!   r   Assignment._unassign  s    ''-r    c                8    U R                   R                  U5      $ r2   )r  getrS   s     r!   rc   Assignment.state_value  s    ~~!!"%%r    c                    0 nU R                    H?  nU R                  U5      nUR                  (       d  M'  [        UR                  S5      X'   MA     U$ )z?Returns consumed offsets as {TopicPartition: OffsetAndMetadata} )r  rc   has_valid_positionr   position)r-   all_consumedrT   states       r!   all_consumed_offsetsAssignment.all_consumed_offsets  sN    ((B$$R(E'''#4U^^R#H  ) r    c                    / nU R                    H8  nU R                  U5      nUR                  (       d  M'  UR                  U5        M:     U$ )z<Return all partitions that are requesting commit point fetch)r  rc   _committed_futsr   )r-   
requestingrT   rd   s       r!   requesting_committedAssignment.requesting_committed  sI    
((B''+H'''!!"% ) r    )r  r  r  r  Nr   r   )r   z'dict[TopicPartition, OffsetAndMetadata])r   r   r   r   r   r/   r   rE   r   r   rc   r  r  r   r   r    r!   r   r     sH    
- & & 4 4.&r    r   c                       \ rS rSrSrSrSrSrg)PartitionStatusi  r   r   r   r   N)r   r   r   r   AWAITING_RESET	CONSUMING
UNASSIGNEDr   r   r    r!   r  r    s    NIJr    r  c                      \ rS rSrSrS r\S 5       r\S 5       r\SS j5       r	\SS j5       r
\S 5       r\SS	 j5       rS
 rS rSS jrSS jrSS jrSS jrS rS rS rS rSrg)r   i  a  Shared Partition metadata state.

After creation the workflow is similar to:

    * Partition assigned to this consumer (AWAITING_RESET)
    * Fetcher either uses commit save point or resets position in respect
      to defined reset policy (AWAITING_RESET -> CONSUMING)
    * Fetcher loads a new batch of records, yields results to consumer
      and updates consumed position (CONSUMING)
    * Assignment changed or subscription changed (CONSUMING -> UNASSIGNED)

c                    / U l         S U l        S U l        S U l        S U l        [        5       U l        S U l        [        R                  U l
        Xl        SU l        S U l        g rR   )r  	highwaterlso	timestamp	_positionr   _position_fut_reset_strategyr  r  _statusr   _paused_resume_futr   s     r!   r/   TopicPartitionState.__init__  s\    !*_
  $&55%r    c                    U R                   $ r2   )r+  r4   s    r!   r   TopicPartitionState.paused  r   r    c                    U R                   $ r2   )r,  r4   s    r!   
resume_futTopicPartitionState.resume_fut  r   r    c                    U R                   S L$ r2   r'  r4   s    r!   r  &TopicPartitionState.has_valid_position  s    ~~T))r    c                8    U R                   c   eU R                   $ r2   r4  r4   s    r!   r  TopicPartitionState.position  s    ~~)))~~r    c                    U R                   S L$ r2   r)  r4   s    r!   awaiting_reset"TopicPartitionState.awaiting_reset  s    ##4//r    c                    U R                   $ r2   r9  r4   s    r!   reset_strategy"TopicPartitionState.reset_strategy  s    ###r    c                    Xl         SU l        U R                  R                  5       (       a  [	        5       U l        [
        R                  U l        g)zcCalled by either Coonsumer in `seek_to_*` or by Coordinator after
setting initial committed point.
N)r)  r'  r(  rh   r   r  r  r*  )r-   strategys     r!   await_resetTopicPartitionState.await_reset  s=      (""$$!.D&55r    c                    [        5       nU R                  R                  U5        U R                  R                  R                  5         U$ r2   )r   r  r   r   r  rA   r   s     r!   fetch_committed#TopicPartitionState.fetch_committed  s:    o##C(..224
r    c                    U R                    H+  nUR                  5       (       a  M  UR                  U5        M-     U R                   R                  5         g)zBCalled by Coordinator on successful commit to update commit cache.N)r  rh   ri   rj   )r-   offset_metar   s      r!   update_committed$TopicPartitionState.update_committed
  s=    ''C88::{+ ( 	""$r    c                P    U R                   [        R                  :X  d   eXl        g)z3Called by Fetcher when yielding results to ConsumerN)r*  r  r   r'  r-   r  s     r!   consumed_toTopicPartitionState.consumed_to  s    ||88888!r    c                    U R                   [        R                  :X  d   eXl        SU l        [        R
                  U l         U R                  R                  5       (       d  U R                  R                  S5        gg)zMCalled by Fetcher after performing a reset to force position to
a new point.
N)	r*  r  r  r'  r)  r   r(  rh   ri   rK  s     r!   reset_toTopicPartitionState.reset_to  sd     ||=====!#&00!!&&(())$/ )r    c                    Xl         SU l        [        R                  U l        U R
                  R                  5       (       d  U R
                  R                  S5        gg)z9Called by Consumer to force position to a specific offsetN)r'  r)  r  r   r*  r(  rh   ri   rK  s     r!   r   TopicPartitionState.seek#  sJ    !#&00!!&&(())$/ )r    c                ,    [        U R                  5      $ r2   )r   r(  r4   s    r!   wait_for_position%TopicPartitionState.wait_for_position+  s    d(())r    c                r    U R                   (       d&  SU l         U R                  b   e[        5       U l        g g rI   )r+  r,  r   r4   s    r!   r   TopicPartitionState.pause/  s1    ||DL##+++,D r    c                z    U R                   (       a*  SU l         U R                  R                  S 5        S U l        g g rR   )r+  r,  ri   r4   s    r!   r   TopicPartitionState.resume5  s1    << DL''-#D r    c                <    SU R                    SU R                   S3$ )NzTopicPartitionState<Status=z
 position=>)r*  r'  r4   s    r!   __repr__TopicPartitionState.__repr__;  s!    ,T\\N*T^^DTTUVVr    )r   r  r+  r'  r(  r)  r,  r*  r$  r%  r&  Nr   )r   r   )rG  r   )r  r   )r   r   r   r   r   r/   r   r   r1  r  r  r:  r=  rA  rD  rH  rL  rO  r   rT  r   r   r\  r   r   r    r!   r   r     s     *       * *   0 0 $ $6%"
	00*/$Wr    r   )#
__future__r   r   r   loggingr*   asyncior   r   collections.abcr   enumr   rer   aiokafka.abcr	   aiokafka.errorsr
   aiokafka.structsr   r   aiokafka.utilr   r   	getLoggerr   r\   r   r#   rt   r~   r   r  r   r   r    r!   <module>ri     s    "     ! $   2 - > 9!t V Vr2. 2.j5 5.3 3ld @W @Wr    