
    9h                     T   S SK r S SKrS SKrS SKrS SKrS SKrS SKJr  S SKrS SK	J
r  S SK	JrJrJr  S SKJr  S SKJr  S SKJrJr  S SKJr  S SKJrJrJr  S S	KJrJr  \R>                  " \ 5      r!S
r"S r#Sr$ " S S5      r% " S S5      r& " S S5      r' " S S5      r( " S S5      r)g)    N)chain)ConsumerStoppedErrorKafkaTimeoutErrorRecordTooLargeError)FetchRequest)OffsetRequest)ABORT_MARKERControlRecord)MemoryRecords)ConsumerRecordOffsetAndTimestampTopicPartition)create_futurecreate_task   c                   @    \ rS rSrSrSrSr\S 5       r\S 5       r	Sr
g)	OffsetResetStrategy   r   r   c                     UR                  5       nUS:X  a  U R                  $ US:X  a  U R                  $ US:X  a  U R                  $ [        R                  S5        U R                  $ )Nlatestearliestnonez5Unrecognized ``auto_offset_reset`` config, using NONE)lowerLATESTEARLIESTNONElogwarning)clsnames     ^C:\Suresh\moveshuttle\MDcreated\moveengine\venv\Lib\site-packages\aiokafka/consumer/fetcher.pyfrom_strOffsetResetStrategy.from_str"   sW    zz|8:::<<6>88OKKOP88O    c                 n    XR                   :X  a  gXR                  :X  a  gXR                  :X  a  gSU S3$ )Nr   r   r   z
timestamp())r   r   r   )r!   values     r#   to_strOffsetResetStrategy.to_str/   s8    JJLL HHwa((r&    N)__name__
__module____qualname____firstlineno__r   r   r   classmethodr$   r*   __static_attributes__r,   r&   r#   r   r      s7    FHD
 
 ) )r&   r   c                   H    \ rS rSrS rS rS rS rS rSS jr	S	 r
S
 rSrg)FetchResult;   c                h    Xl         X0l        [        R                  " 5       U l        X@l        X l        g N)_topic_partition_partition_recordstime	monotonic_created_backoff_assignment)selftp
assignmentpartition_recordsbackoffs        r#   __init__FetchResult.__init__<   s'     ""3(%r&   c                     [         R                  " 5       U R                  -
  nXR                  :  a  U R                  U-
  $ gNr   r:   r;   r<   r=   r?   lifetimes     r#   calculate_backoffFetchResult.calculate_backoffE   3    >>#dmm3mm#==8++r&   c                 B   U R                   nSnUR                  (       aY  U R                  nUR                  U5      nUR                  (       a  SnO*UR
                  nXPR                  R                  :w  a  SnOSnU(       d  [        R                  SU5        S U l        gg)NTFz^Not returning fetched records for partition %s since it is no fetchable (unassigned or paused))
r>   activer8   state_valuepausedpositionr9   next_fetch_offsetr   debug)r?   r@   rA   return_resulttp_staterR   s         r#   check_assignmentFetchResult.check_assignmentK   s    %%
 &&B!--b1H %#,,66HHH$)M!M IIC
 '+D#r&   c                     U R                   R                  U R                  5      nUR                  U R                  R
                  5        g r7   )r>   rP   r8   consumed_tor9   rS   )r?   states     r#   _update_positionFetchResult._update_positionl   s8      ,,T-B-BC$11CCDr&   c                     U R                   nU R                  U5      (       a  U R                  5       (       d  g [        U R                  S 5      nU R                  5         Uc  S U l        U$ r7   )r8   rW   has_morenextr9   r\   )r?   r@   msgs      r#   getoneFetchResult.getonep   s[    ""$$R((4**D1;&*D#
r&   Nc                 @   U R                   nU R                  U5      (       a  U R                  5       (       d  / $ / nU R                   H;  nUR	                  U5        Uc  M  [        U5      U:  d  M*  U R                  5           U$    U R                  5         S U l        U$ r7   )r8   rW   r_   r9   appendlenr\   )r?   max_recordsr@   ret_listra   s        r#   getallFetchResult.getall|   s    ""$$R((I**COOC &3x=K+G%%'
  + !!#&*D#r&   c                     U R                   S L$ r7   )r9   r?   s    r#   r_   FetchResult.has_more   s    &&d22r&   c                 8    SU R                   R                  < S3$ )Nz<FetchResult position=>)r9   rS   rl   s    r#   __repr__FetchResult.__repr__   s    '(?(?(Q(Q'TTUVVr&   )r>   r=   r<   r9   r8   r7   )r-   r.   r/   r0   rD   rK   rW   r\   rb   ri   r_   rp   r2   r,   r&   r#   r4   r4   ;   s-    &BE
"3Wr&   r4   c                   ,    \ rS rSrS rS rS rS rSrg)
FetchError   c                P    Xl         [        R                  " 5       U l        X l        g r7   )_errorr:   r;   r<   r=   )r?   errorrC   s      r#   rD   FetchError.__init__   s    (r&   c                     [         R                  " 5       U R                  -
  nXR                  :  a  U R                  U-
  $ grG   rH   rI   s     r#   rK   FetchError.calculate_backoff   rM   r&   c                     U R                   er7   rv   rl   s    r#   check_raiseFetchError.check_raise   s    kkr&   c                 $    SU R                   < S3$ )Nz<FetchError error=ro   r|   rl   s    r#   rp   FetchError.__repr__   s    #DKK?!44r&   )r=   r<   rv   N)	r-   r.   r/   r0   rD   rK   r}   rp   r2   r,   r&   r#   rs   rs      s     
5r&   rs   c                   >    \ rS rSrS rS rS rS rS rS r	S r
S	rg
)PartitionRecords   c	                     Xl         X l        [        U=(       d    / S S9U l        [	        5       U l        XPl        X`l        Xpl        Xl	        X@l
        U R                  5       U l        g )Nc                     U S   $ )Nr   r,   )xs    r#   <lambda>+PartitionRecords.__init__.<locals>.<lambda>   s    adr&   )key)_tp_recordssorted_aborted_transactionsset_aborted_producers_key_deserializer_value_deserializer_check_crcs_isolation_levelrS   _unpack_records_records_iterator)	r?   r@   recordsaborted_transactionsfetch_offsetkey_deserializervalue_deserializer
check_crcsisolation_levels	            r#   rD   PartitionRecords.__init__   sb     %+ &BN&
" #&%!1#5 % /
 ".!%!5!5!7r&   c                     U $ r7   r,   rl   s    r#   __iter__PartitionRecords.__iter__   s    r&   c                 Z     [        U R                  5      $ ! [         a	    S U l        e f = fr7   )r`   r   StopIterationrl   s    r#   __next__PartitionRecords.__next__   s2    	..// 	%)D"	s    *c              #   B  #    U R                   nU R                  nUR                  5       (       Ga  UR                  5       nU R                  (       a.  UR                  5       (       d  [        R                  " SU 35      eU R                  [        :X  a  UR                  b  U R                  UR                  5        UR                  (       a;  U R                  U5      (       a%  U R                  R!                  UR                  5        UR"                  (       ah  UR                  U R                  ;   aN  [$        R'                  SUUR                  UR                  UR(                  S-
  5        UR(                  U l        GMf  UR                  (       a  UR(                  U l        GM  U HH  nUR,                  U R*                  :  a  M  U R/                  X5      nUR,                  S-   U l        Uv   MJ     UR(                  U l        UR                  5       (       a  GM  g g 7f)NzInvalid CRC - zXSkipping aborted record batch from partition %s with producer_id %s and offsets %s to %sr   )r   r   has_next
next_batchr   validate_crcErrorsCorruptRecordExceptionr   READ_COMMITTEDproducer_id_consume_aborted_up_tobase_offsetis_control_batch_contains_abort_markerr   discardis_transactionalr   rT   next_offsetrS   offset_consumer_record)r?   r@   r   r   recordconsumer_records         r#   r    PartitionRecords._unpack_records   s     XX--   ++-J
(?(?(A(A 33nRD4IJJ %%7**6++J,B,BC //33J??
 ++33J4J4JK //"..$2I2IIII?"..".."..2 .8-C-CD* **)3)?)?&$ ==4#9#99"&"7"7"C)/):&%% %" &0%;%;D"w   s   HHHc                     U R                   nU(       aD  US   u  p4XA::  a-  U R                  R                  U5        UR                  S5        Og U(       a  MC  g g rG   )r   r   addpop)r?   batch_offsetr   r   first_offsets        r#   r   'PartitionRecords._consume_aborted_up_to  sS      $99"(<Q(?%K+''++K8$((+ #"r&   c                      [        U5      n[        R
                  " UR                  5      [        :H  $ ! [         a    [        R                  " S5      S ef = f)Nz)Control batch did not contain any records)r`   r   r   
KafkaErrorr
   parser   r	   )r?   r   control_records      r#   r   'PartitionRecords._contains_abort_marker  sY    	!*-N
 "">#5#56,FF	  	##;	s	   4 "Ac                 >   UR                   b  [        UR                   5      OSnUR                  b  [        UR                  5      OSnU R                  (       a  U R                  UR                   5      nOUR                   nU R                  (       a  U R	                  UR                  5      nOUR                  n[        UR                  UR                  UR                  UR                  UR                  UUUR                  UU[        UR                  5      5      $ )Nr   )r   rf   r)   r   r   r   topic	partitionr   	timestamptimestamp_typechecksumtupleheaders)r?   r@   r   key_size
value_sizer   r)   s          r#   r   !PartitionRecords._consumer_record*  s    &,jj&<3vzz?"*0,,*BS&
!!((4C**C##,,V\\:ELLEHHLLMM!!OO&..!
 	
r&   )
r   r   r   r   r   r   r   r   r   rS   N)r-   r.   r/   r0   rD   r   r   r   r   r   r   r2   r,   r&   r#   r   r      s(    8:@<D
	G
r&   r   c                       \ rS rSrSrSSSSSSSS	S
SSSS.S jrS rS rS r\	S 5       r
S rS rS rS rS rS$S jrS rS rS rS%S jrS rS rS  rS! rS" rS#rg)&FetcheriF  a  Initialize a Kafka Message Fetcher.

Parameters:
    client (AIOKafkaClient): kafka client
    subscription (SubscriptionState): instance of SubscriptionState
        located in aiokafka.consumer.subscription_state
    key_deserializer (callable): Any callable that takes a
        raw message key and returns a deserialized key.
    value_deserializer (callable, optional): Any callable that takes a
        raw message value and returns a deserialized value.
    fetch_min_bytes (int): Minimum amount of data the server should
        return for a fetch request, otherwise wait up to
        fetch_max_wait_ms for more data to accumulate. Default: 1.
    fetch_max_bytes (int): The maximum amount of data the server should
        return for a fetch request. This is not an absolute maximum, if
        the first message in the first non-empty partition of the fetch
        is larger than this value, the message will still be returned
        to ensure that the consumer can make progress. NOTE: consumer
        performs fetches to multiple brokers in parallel so memory
        usage will depend on the number of brokers containing
        partitions for the topic.
        Supported Kafka version >= 0.10.1.0. Default: 52428800 (50 Mb).
    fetch_max_wait_ms (int): The maximum amount of time in milliseconds
        the server will block before answering the fetch request if
        there isn't sufficient data to immediately satisfy the
        requirement given by fetch_min_bytes. Default: 500.
    max_partition_fetch_bytes (int): The maximum amount of data
        per-partition the server will return. The maximum total memory
        used for a request = #partitions * max_partition_fetch_bytes.
        This size must be at least as large as the maximum message size
        the server allows or else it is possible for the producer to
        send messages larger than the consumer can fetch. If that
        happens, the consumer can get stuck trying to fetch a large
        message on a certain partition. Default: 1048576.
    check_crcs (bool): Automatically check the CRC32 of the records
        consumed. This ensures no on-the-wire or on-disk corruption to
        the messages occurred. This check adds some overhead, so it may
        be disabled in cases seeking extreme performance. Default: True
    fetcher_timeout (float): Maximum polling interval in the background
        fetching routine. Default: 0.2
    prefetch_backoff (float): number of seconds to wait until
        consumption of partition is paused. Paused partitions will not
        request new data from Kafka server (will not be included in
        next poll request).
    auto_offset_reset (str): A policy for resetting offsets on
        OffsetOutOfRange errors: 'earliest' will move to the oldest
        available message, 'latest' will move to the most recent. Any
        ofther value will raise the exception. Default: 'latest'.
    isolation_level (str): Controls how to read messages written
        transactionally. See consumer description.
Nr   i   i  i   Tg?g?d   r   read_uncommitted)r   r   fetch_min_bytesfetch_max_bytesfetch_max_wait_msmax_partition_fetch_bytesr   fetcher_timeoutprefetch_backoffretry_backoff_msauto_offset_resetr   c                   Xl         UR                  U l        X0l        X@l        XPl        X`l        Xpl        Xl        Xl        Xl	        Xl
        US-  U l        X l        [        R                  U5      U l        US:X  a  [         U l        O US:X  a  [$        U l        O['        SU 35      e[(        R*                  " 5       U l        [/        5       U l        [/        5       U l        S U l        [/        5       U l        U R                  R9                  U R6                  5        UR:                  S:  a  SnO(UR:                  S:  a  SnOUR:                  S	:  a  S
nOSn[<        U   U l        [A        U RC                  5       5      U l"        SU l#        g )N  r   read_committedzIncorrect isolation level )r         r   
   r      )r   r      r   F)$_client_loopr   r   _fetch_min_bytes_fetch_max_bytes_fetch_max_wait_ms_max_partition_fetch_bytesr   _fetcher_timeout_prefetch_backoff_retry_backoff_subscriptionsr   r$   _default_reset_strategyREAD_UNCOMMITTEDr   r   
ValueErrorcollectionsOrderedDictr   r   
_in_flight_pending_tasks_wait_consume_future_fetch_waitersregister_fetch_waitersapi_versionr   _fetch_request_classr   _fetch_requests_routine_fetch_task_closed)r?   clientsubscriptionsr   r   r   r   r   r   r   r   r   r   r   r   req_versions                   r#   rD   Fetcher.__init__{  s`   $ \\
!1#5  / /"3*C'% /!1.5+':'C'CDU'V$00$4D! 00$2D!9/9JKLL#//1%!e$(!!e 	2243F3FG(K:-K7*KK$0$=!&t'C'C'EFr&   c                   #    SU l         U R                  R                  5         [        R                  " [
        R                  5         U R                  I S h  vN   S S S 5        U R                   H  nU R                  U5        M     U R                   H  nUR                  5         UI S h  vN   M     g  N^! , (       d  f       Nb= f N7f)NT)
r   r   cancel
contextlibsuppressasyncioCancelledErrorr   _notifyr   )r?   waiterr   s      r#   closeFetcher.close  s     !  !7!78"""" 9 ))FLL  * $$AHHJGG % # 98 s=   ACB9B7B9AC-C
.	C7B99
CCc                 Z    Ub(  UR                  5       (       d  UR                  S 5        g g g r7   )done
set_result)r?   futures     r#   r  Fetcher._notify  s(    fkkmmd# '4r&   c                     U R                   R                  5       nU R                  R                  U5        UR	                  U R                  4S j5        U$ )Nc                 $    UR                  U 5      $ r7   )remove)fwaiterss     r#   r   .Fetcher._create_fetch_waiter.<locals>.<lambda>  s    W^^TUEVr&   )r   r   r   r   add_done_callback)r?   futs     r#   _create_fetch_waiterFetcher._create_fetch_waiter  sF     jj&&($0C0CVW
r&   c                     U R                   $ r7   )r   rl   s    r#   error_futureFetcher.error_future  s    r&   c           	        #     SnU 4S jn Ub  UR                   (       Gd  U R                   H_  nUR                  5       (       d  UR                  5         [        R
                  " [        R                  5         UI Sh  vN   SSS5        Ma     U R                  R                  5         U R                  R                  5         U R                  R                  nUb  UR                  c%   U R                  R                  5       nUI Sh  vN   U R                  R                  R                  nUb  UR                   (       d   e[!        5       U l        U R%                  U5      u  nnnn	n
U H  u  pU" U R'                  XU5      US9  M     UR)                  5        H  u  pU" U R+                  XU5      US9  M     U R"                  UR,                  /nU	(       a+  U R.                  R1                  5       nUR3                  U5        [        R4                  " [7        [9        U R                  X5      5      U[        R:                  S9I Sh  vN u  nnU R                  R=                  U5      nU(       aR  [?        S U 5       5      nU(       a$  U R@                   H  nU RC                  U5        M     U =R                  U-  sl        GM   GNK! , (       d  f       GM  = f GN! [        R                   a     GM  f = f N! [        R                   a     g[D         a1  n[F        RI                  S5        [        R                  " S5      UeSnAff = f7f)aL  Implements a background task to populate internal fetch queue
``self._records`` with prefetched messages. This helps isolate the
``getall/getone`` calls from actual calls to broker. This way we don't
need to think of what happens if user calls get in 2 tasks, etc.

    The loop is quite complicated due to a large set of events that
can allow new fetches to be send. Those include previous fetches,
offset resets, metadata updates to discover new leaders for partitions,
data consumed for partition.

    Previously the offset reset was performed separately, but it did
not perform too reliably. In ``kafka-python`` and Java client the reset
is perform in ``poll()`` before each fetch, which works good for sync
systems. But with ``aiokafka`` the user can actually break such
behaviour quite easily by performing actions from different tasks.
Nc                    ^ [        U 5      nUR                  R                  U5        UR                  R                  T5        U4U4S jjnUR	                  U5        g )Nc                 <   > UR                   R                  T5        g r7   )r   r   )r  r?   node_ids     r#   on_doneLFetcher._fetch_requests_routine.<locals>.start_pending_task.<locals>.on_done  s    OO++G4r&   )r   r   r   r   r  )coror   r?   taskr!  s    `   r#   start_pending_task;Fetcher._fetch_requests_routine.<locals>.start_pending_task  sK    "4(##''-##G,&* 5 &&w/r&   )r   )timeoutreturn_whenc              3   @   #    U  H  oR                  5       v   M     g 7fr7   )result).0r  s     r#   	<genexpr>2Fetcher._fetch_requests_routine.<locals>.<genexpr>;  s     &L|zz|||s   z#Unexpected error in fetcher routinez&Unexpected error during data retrieval)%rO   r   r  r  r  r  r  r  clearr   r   subscriptionrA   wait_for_assignmentr   r   r   r   _get_actions_per_node_proc_fetch_requestitems_update_fetch_positionsunassign_futurer   force_metadata_updatere   waitr   r   FIRST_COMPLETEDintersectionanyr   r  	Exceptionr   	exception)r?   rA   r%  r$  r/  r  fetch_requestsreset_requestsr'  invalid_metadataresume_futuresr   requesttps
other_futsr  done_set_done_pendinghas_new_dataexcs                        r#   r   Fetcher._fetch_requests_routine  s    "Y	WJ7; 0  %Z->->-> $ 3 3  $yy{{ KKM'001G1GH"&JJ IH !4 ''--/MM'')#'#6#6#C#CL#+|/F/F/N%%)%8%8%L%L%NF"(LL
 "&!4!4!A!A!L!LJ!-*2C2CCC -:O) ..z:""$" )7$G&00gN ' )7 %3$8$8$:LG&44Z#N ' %; #779S9ST
#,,<<>C%%c*$+LLd11:NO# ' 7 7% !  $22??I#&&L|&L#LL#&*&9&9F !LL0 ': ''<7'O  ' IH )%00 % %%J  %% 	 	WMM?@##$LMSVV	Ws   MA6L :K KKA&L ,K$ K!K$ D<L L A>L K
K		L !K$ $K=8L <K==L MM	M!,MMMc                    [         R                  " [        5      n[         R                  " [        5      n[         R                  " [        5      n/ nSnUR                   GHG  nUR	                  U5      nU R
                  R                  R                  U5      n	Sn
XpR                  ;   a=  U R                  U   nUR                  5       n
U
(       a  XI   R                  U
5        M  M  XR                  ;   a  M  U	b  U	S:X  a  [        R                  SU5        SnM  UR                  (       d  X9   R                  U5        M  UR                  (       a  UR                  UR                   5        GM  UR"                  nX)   R                  X|45        [        R                  SX|5        GMJ     / nUR%                  5        GH  u  pX;   a  M  X;   a  M  [&        R(                  " U5        [         R                  " [        5      nU H8  u  p|XR*                     R                  UR,                  XR.                  45        M:     U R0                  nUR2                  S:  aM  U" SU R4                  U R6                  U R8                  U R:                  [        UR%                  5       5      5      nOUR2                  S:X  aB  U" SU R4                  U R6                  U R8                  [        UR%                  5       5      5      nO6U" SU R4                  U R6                  [        UR%                  5       5      5      nUR                  U	U45        GM     U(       a(  [=        [?        [@        URC                  5       5      5      n
OU RD                  n
UUU
UU4$ )zkFor each assigned partition determine the action needed to be
performed and group those by leader node id.
Fr   r   z9No leader found for partition %s. Waiting metadata updateTz2Adding fetch request for partition %s at offset %dr   )#r   defaultdictlistrB  rP   r   clusterleader_for_partitionr   rK   re   r   r   rT   has_valid_positionrQ   
resume_futrR   r3  randomshuffler   r   r   r   API_VERSIONr   r   r   r   minmapmaxvaluesr   )r?   rA   	fetchableawaiting_resetbackoff_by_nodesr@  r?  r@   rV   r   rC   r   rR   r=  partition_data	by_topicsklassreqs                     r#   r1  Fetcher._get_actions_per_nodeH  s     ++D1	$006&2248 ..B!--b1Hll**??CGG]]" r* 224$-44W= OO+GrM		OQS $( 00'..r2%%h&9&9:#,,"))2.9		H"9 !@ '0'8#G*(  NN>* $//5I .((#**\\8-L-LM !/ --E  1$++))))))*+ ""a'++))))*+ ++))*+	 !!7C.1W (9Z  #c#3#:#:#<=>G++G
 	
r&   c                   #    Sn U R                   R                  X#5      I S h  vN nUR                  (       d  [        R                  S5        g0 nUR                   H  u  pU	 H  u  pnX[        X5      '   M     M      [        S[        R                  " 5       -  5      nUR                   GH+  u  pU	 GH  tpnn[        X5      n[        R                   " U5      nUU   nUR#                  U5      nUR$                  (       a  UR&                  U:w  a  [        R                  SUU5        Mx  U[        R(                  L Ga4  UR*                  S:  a  US   nUS   nOS nS nUUl        UUl        UUl        [3        US	   5      nUR5                  5       (       ay  [        R                  S
UU5        [7        UUUUU R8                  U R:                  U R<                  U R>                  5      n[A        UUUU RB                  S9U RD                  U'   SnGM\  URG                  5       S:  aM  [I        SUUU RJ                  5      nU RM                  UU5        URO                  UR&                  S-   5        SnGM  GM  U[        RP                  [        RR                  4;   a  U R                   RU                  5         GM  U[        RV                  L a  U RX                  [Z        R\                  :w  a  UR_                  U RX                  5        O,[        RV                  " UU05      nU RM                  UU5        Sn[        Ra                  SUU5        GM  U[        Rb                  L aW  [        Re                  SURf                  5        [        Rb                  " URf                  5      nU RM                  UU5        SnGM  [        Re                  SURh                  5        GM!     GM.     U$  GN! [        R                   aI  n[        R                  SX&5        [        R                  " U R                  5      I S h  vN     S nAgS nAf[        R                   a     gf = f7f)NFz!Failed fetch messages from %s: %szCDiscarding fetch response since the assignment changed during fetchr   zbDiscarding fetch response for partition %s since its offset %s does not match the current positionr   r   r   zMAdding fetched record for partition %s with offset %d to buffered record list)rB   rA   rC   Tr   zThere are some messages at [Partition=Offset]: %s=%s whose size is larger than the fetch size %s and hence cannot be ever returned. Increase the fetch size, or decrease the maximum message size the broker will allow.r   zBFetch offset %s is out of range for partition %s, resetting offsetz%Not authorized to read from topic %s.z(Unexpected error while fetching data: %s)5r   sendr   r   r   rw   r  sleepr   r  rO   rT   topicsr   intr:   for_coderP   rO  rR   NoErrorrS  	highwaterlsor   r   r   r   r   r   r   r   r4   r   r   size_in_bytesr   r   
_set_errorrZ   NotLeaderForPartitionErrorUnknownTopicOrPartitionErrorr6  OffsetOutOfRangeErrorr   r   r   await_resetinfoTopicAuthorizationFailedErrorr    r   r-   )r?   rA   r   rA  needs_wakeupresponseerrfetch_offsetsr   
partitionsr   r   rE  now_ms
error_coderh  	part_datar@   
error_typer   rV   r   ri  r   rB   s                            r#   r2  Fetcher._proc_fetch_request  s    		!\\..w@@H   IIU !(E(2$	1BHnU>? )3 "0 TDIIK'(!)E@J<	y9#E5#__Z8
,R0%11"522h6G6G<6WII# $ /**a//8},'m/3,")2H&#&HL)/H& ,IbM:G''))		A(	 -=#0( 22 44 ,, 11	-) -8.?'1$($:$:	-b) (, ..014 2B
 ( ;;	 C0 ,,X->->-BC'+! 5$  5577$  LL6686#?#??337J7O7OO ,,T-I-IJ$::B;MNC0'+HH,$	  6#G#GGKK GR >>rxxHCOOB,#'LKKBJDWDWG AK "1N { A   	II97H-- 3 3444%% 	 	sV   QO" OO" N6QO" "Q69P:/P20P:5Q:QQQQc                     XR                   ;  d   U R                   U   5       e[        X R                  S9U R                   U'   g )N)rw   rC   )r   rs   r   )r?   r@   rw   s      r#   rk  Fetcher._set_error2  s9    &9b(99&&U<R<RSbr&   c                 V  #    [         R                  SU5        SnU GH=  nUR                  U5      nUR                  (       d  UR                  (       a  M9   UR                  5       I Sh  vN nUc   eUR                  (       d  UR                  (       a  M{  UR                  [        :X  a|  U R                  [        R                  :w  a  UR                  U R                  5        O)[        R                  " U5      nU R!                  XX5        Sn[         R                  SU5        GM  [         R                  SUU5        UR#                  UR                  5        GM@     [$        R&                  " [(        5      n	/ n
U H  nUR                  U5      nUR                  (       d  M'  U
R+                  U5        UR,                  nUc   e[         R                  SU[        R/                  U5      5        XR0                     R+                  UR2                  U45        M     U	(       d  U$   U R5                  X)5      I Sh  vN nU
 H?  nX   S	   nUR                  U5      nUR                  (       d  M.  UR#                  U5        MA     U$  GN! [        R                   a    Us s  $ f = f Nl! [        R6                   aK  n[         R9                  SX(5        [        R:                  " U R<                  5      I Sh  vN    Us SnA$ SnAff = f! [        R                   a    Us $ f = f7f)
zThis task will be called if there is no valid position for
partition. It may be right after assignment, on seek_to_* calls of
Consumer or if current position went out of range.
z*Updating fetch positions for partitions %sFNTz No committed offset found for %sz<Resetting offset for partition %s to the committed offset %sz4Resetting offset for partition %s using %s strategy.z Failed fetch offsets from %s: %sr   )r   rT   rP   rO  rY  fetch_committedr  r  r   UNKNOWN_OFFSETr   r   r   ro  r   NoOffsetForPartitionErrorrk  reset_tor   rK  rL  re   reset_strategyr*   r   r   _proc_offset_requestr   rw   rc  r   )r?   rA   r   rB  rr  r@   rV   	committedrt  
topic_dataneeds_resetstrategyoffsetsr   s                 r#   r4  Fetcher._update_fetch_positions6  s    
 			>D B!--b1H**h.E.E$"*":":"<<	 ((( **h.E.E>1//3F3K3KK(()E)EF ::2>COOB,#'L		<bA		R
 !!)"2"23? B !,,T2
B!--b1H**r"..H'''IIF#**84
 xx ''x(@A  	 $ $ 9 9' NN B[^F!--b1H&&&!!&)   =)) $##$` O$$ $		<gKmmD$7$7888##$ %% 	 	 s   AL)J*J+J/F7L)(J+ <J)=J+ ,L)1L)JJ&!L)%J&&L))J+ +L
?9L8K;9L?L
 L L)L

L L&#L)%L&&L)c                 v  #    U(       d  0 $ Uc  SOUS-  n [         R                  " U5       ISh  vN     U R                  U5      I Sh  vN nUsSSS5      ISh  vN   $  N2 N N	! [        R                   ap  nUR
                  (       d  e UR                  (       a  U R                  R                  5         [        R                  " U R                  5      I Sh  vN     SnAOSnAff = fM  ! , ISh  vN  (       d  f       g= f! [        R                   a  n[        SU S35      UeSnAff = f7f)ai  Fetch offset for each partition passed in ``timestamps`` map.

Blocks until offsets are obtained, a non-retriable exception is raised
or ``timeout_ms`` passed.

Arguments:
    timestamps: {TopicPartition: int} dict with timestamps to fetch
        offsets by. -1 for the latest available, -2 for the earliest
        available. Otherwise timestamp is treated as epoch
        milliseconds.

Returns:
    {TopicPartition: (int, int)}: Mapping of partition to
        retrieved offset and timestamp. If offset does not exist for
        the provided timestamp, that partition will be missing from
        this mapping.
Nr   z"Failed to get offsets by times in z ms)async_timeoutr'  _proc_offset_requestsr   r   	retriabler?  r   r6  r  rc  r   TimeoutErrorr   )r?   
timestamps
timeout_msr'  r  rw   rH  s          r#   _retrieve_offsetsFetcher._retrieve_offsets  s    $ I$,$*t2C	$,,W55	'(,(B(B:(N"N  ' 655 #O 6 ",, A$! 11 LL>>@%mmD,?,?@@@A  655 ## 	#4ZLD	s   D9D A$D C3A*A&A*C3D A(D #D9$D &A*(D *C.>A C)C!C)$C3)C..C33D
9C<:D
D 	D9
D D6!D11D66D9c                 N  #    U R                   R                  5       I Sh  vN   [        R                  " S 5      nUR	                  5        H  u  p4U R                   R
                  R                  U5      nUcQ  U R                   R                  UR                  5        [        R                  SU5        [        R                  " U5      eUS:X  a,  [        R                  SU5        [        R                  " U5      eX%   UR                     R                  UR                  U45        M     / nUR	                  5        H%  u  pWUR                  U R!                  XW5      5        M'     0 n["        R$                  " U6 I Sh  vN n	U	 H  n
UR'                  U
5        M     U$  GN N#7f)a&  Fetch offsets for each partition in timestamps dict. This may send
request to multiple nodes, based on who is Leader for partition.

Arguments:
    timestamps (dict): {TopicPartition: int} mapping of fetching
        timestamps.

Returns:
    Future: resolves to a mapping of retrieved offsets
Nc                  6    [         R                  " [        5      $ r7   )r   rK  rL  r,   r&   r#   r   /Fetcher._proc_offset_requests.<locals>.<lambda>  s    K++D1r&   zFPartition %s is unknown for fetching offset, wait for metadata refreshr   zRLeader for partition %s unavailable for fetching offset, wait for metadata refresh)r   _maybe_wait_metadatar   rK  r3  rM  rN  	add_topicr   r   rT   r   StaleMetadataLeaderNotAvailableErrorre   r   r  r  gatherupdate)r?   r  timestamps_by_noder   r   r   futsr  r  respartial_offsetss              r#   r  Fetcher._proc_offset_requests  sn     ll//111 )441
 %/$4$4$6 Ill**??	JG&&y7		1
 **955B		8
 44Y??"+IOO<CC(()4' %7. #5#;#;#=GKK11'FG $>NND))"ONN?+  #K 	2D *s"   F%F EF%?F# !F%#F%c                 n  #    U R                   R                  S:  a7  SnUR                  5        H   u  pEU VVs/ s H	  u  pgXgS4PM     snnX$'   M"     OSn[        U   " S[	        UR                  5       5      5      nU R                   R                  X5      I S h  vN n	0 n
U	R                   GHw  u  pEU GHj  tpkn[        XF5      n[        R                  " U5      nU[        R                  L a  U	R                  S:X  aR  US   n[        U5      S::  d   S5       eU(       a$  US   n[        R                  SUU5        US 4X'   M  [        S 4X'   M  Uu  nn[        R                  SUUU5        UU4X'   M  U[        R                   L a  [        R                  SU5        M  U[        R"                  L a  [        R                  S	U5        U" U5      eU[        R$                  L a  [        R'                  S
U5        U" U5      e[        R'                  SUU5        U" U5      e   GMz     U
$ s  snnf  GN7f)Nr   r   r   r   z'Expected OffsetResponse with one offsetzAHandling v0 ListOffsetResponse response for %s. Fetched offset %szLHandling ListOffsetResponse response for %s. Fetched offset %s, timestamp %sz_Cannot search by timestamp for partition %s because the message format version is before 0.10.0zbAttempt to fetch offsets for partition %s failed due to obsolete leadership information, retrying.zReceived unknown topic or partition error in ListOffset request for partition %s. The topic/partition may not exist or the user may not have Describe access to it.z;Attempt to fetch offsets for partition %s failed due to: %s)r   r   r3  r   rL  rb  rd  r   r   rf  rg  rS  rf   r   rT   r   UnsupportedForMessageFormatErrorrl  rm  r    )r?   r   r  versionr   ry  parttsrA  rs  res_offsetsrx  partition_infor   rz  r  r   r   s                     r#   r  Fetcher._proc_offset_request  sB    <<##j0G$.$4$4$6 CL$M9xtd]9$M
! %7 G(T*2B2B2D-EF**7<< (E5>1>*57	#__Z8
/++q0"0"3LA-EDE-"%,QZFII!8 ) &	 7=d^K26Dd5KK2,:)	6		B%"% 39)1D.6#J#JJ II! "	  6#D#DDIIL "	 %Y//6#F#FFKK6 " %Y//KK!!"	 %Y// 6? !0B S %N
 =s   7H5H,	AH5H2FH5c                   #     U R                   (       a
  [        5       eU R                  R                  (       a"  U R                  R	                  5       I Sh  vN   [        U R                  R                  5       5       H  nU(       a4  X!;  a/  U R                  R                  U5      (       d  U R                  U	 M>  U R                  U   n[        U5      [        L aA  UR                  5       nUc*  U R                  U	 U R                  U R                  5        M  Us  $ U R                  U	 U R                  U R                  5        UR                  5         M     U R                  5       nUI Sh  vN   GMt   GN! N
7f)zReturn one fetched records

This method will contain a little overhead as we will do more work this
way:
    * Notify prefetch routine per every consumed partition
    * Assure message marked for autocommit

N)r   r   r   reassignment_in_progressr0  rL  r   keysis_assignedtyper4   rb   r  r   r}   r  )r?   rv  r@   res_or_errormessager  s         r#   next_recordFetcher.next_record3  s2     ||*,,
 "";;))==???4==--/0""6..::2>> MM"-#}}R0%4*113G MM"-T%>%>?& b)LL!:!:; ,,.' 1, ..0FLLC  @2 s%   AE=E8DE=0E;1E=;E=c                   #     U R                   R                  (       a"  U R                   R                  5       I Sh  vN   [        R                  " 5       n0 n[        U R                  R                  5       5       GH%  nU(       a4  Xa;  a/  U R                   R                  U5      (       d  U R                  U	 M?  U R                  U   n[        U5      [        L a  UR                  U5      nUR                  5       (       d(  U R                  U	 U R                  U R                  5        U(       d  M  XU'   Ub"  U[        XV   5      -  nUS:  d   eUS:X  a    OLM  M  U(       a  Us  $ U R                  U	 U R                  U R                  5        UR!                  5         GM(     U(       d  U(       d  U$ U R#                  5       n	[$        R&                  " U	/US9I Sh  vN u  pU
(       a  U R(                  (       a)  U(       a   UR+                  5       nUR-                  5         0 $ U	R/                  5       (       a  U	R1                  5         U[        R                  " 5       U-
  -
  n[3        SU5      nGMq   GN9 N7f)z@Returns previously fetched records and updates consumed offsets.Nr   )r'  )r   r  r0  r:   r;   rL  r   r  r  r  r4   ri   r_   r  r   rf   r}   r  r  r7  r   r   r  r  r*  rV  )r?   rv  r'  rg   
start_timedrainedr@   r  r   r  r  pendingr  s                r#   fetched_recordsFetcher.fetched_records_  s     "";;))==???)JG4==--/0""6..::2>> MM"-#}}R0%4*11+>G'0022 MM"-T%>%>?" ")BK".#s7;'77*a///&!+! , /
  #N b)LL!:!:; ,,.; 1> g..0F"),,x"IIMD4<<!++-CJJL	{{}} !1J!>?G!WoGs 
 @N Js#   :I:I5F!I:I8BI:8I:c                    #    U R                  X5      I S h  vN nU H1  nXC;  a  S X4'   M  X4   u  pVU[        :X  a  S X4'   M$  [        XV5      X4'   M3     U$  N=7fr7   )r  r  r   )r?   r  r  r  r@   r   r   s          r#   get_offsets_by_timesFetcher.get_offsets_by_times  sa     ..zFFB "$+K!^+"&GK"4V"GGK   Gs   AA>Ac                    #    U Vs0 s H  o3[         R                  _M     nnU R                  XB5      I S h  vN nUR                  5        VVVs0 s H
  u  nu  pgX6_M     snnn$ s  snf  N3s  snnnf 7fr7   )r   r   r  r3  r?   rv  r  r@   r  r  r   r  s           r#   beginning_offsetsFetcher.beginning_offsets  sf     AKL2-666
L..zFF5<]]_E_1LV
_EE MFE+   A4A&A4A+A4A-!A4-A4c                    #    U Vs0 s H  o3[         R                  _M     nnU R                  XB5      I S h  vN nUR                  5        VVVs0 s H
  u  nu  pgX6_M     snnn$ s  snf  N3s  snnnf 7fr7   )r   r   r  r3  r  s           r#   end_offsetsFetcher.end_offsets  sf     ?IJz-444z
J..zFF5<]]_E_1LV
_EE KFEr  c                 |   U R                   R                  R                  nUc   e/ nU Hb  nUR                  U5      nUR	                  U5        UR                  UR                  5       5        XPR                  ;   d  MU  U R                  U	 Md     U R                  U R                  5        [        R                  " U6 $ )zBForce a position reset. Called from Consumer of `seek_to_*` API's.)r   r/  rA   rP   ro  re   wait_for_positionr   r  r   r  r  )r?   rB  r  rA   r  r@   rV   s          r#   request_offset_resetFetcher.request_offset_reset  s    ((55@@
%%%B!--b1H  *NN85578]]"MM"%  	T../~~w''r&   c                     U R                   R                  X5        XR                  ;   a  U R                  U	 U R                  U R                  5        g)zOForce a position change to specific offset. Called from
`Consumer.seek()` API.
N)r   seekr   r  r   )r?   r@   r   s      r#   seek_toFetcher.seek_to  sB     	  ,b! 	T../r&   )r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r7   )r   N)r-   r.   r/   r0   __doc__rD   r	  r  r  propertyr  r   r1  r2  rk  r4  r  r  r  r  r  r  r  r  r  r  r2   r,   r&   r#   r   r   F  s    2r  ")"*!?B$    jWXf
P@DTOb&P2hN`*X;&zF
F
((
0r&   r   )*r  r   r  loggingrQ  r:   	itertoolsr   r  aiokafka.errorserrorsr   r   r   r   aiokafka.protocol.fetchr   aiokafka.protocol.offsetr   aiokafka.record.control_recordr	   r
   aiokafka.record.memory_recordsr   aiokafka.structsr   r   r   aiokafka.utilr   r   	getLoggerr-   r   r  r   r   r   r4   rs   r   r   r,   r&   r#   <module>r     s              X X 0 2 F 8 O O 4!  ) )<VW VWr5 5([
 [
|K0 K0r&   