
    9hL                         S SK r S SKrS SKrS SKrS SKJr  S SKJrJrJ	r	J
r
  S SKJr  S SKJr  S SKJr  S SKJrJr   " S S	5      r " S
 S5      r " S S5      rg)    N)Sequence)KafkaTimeoutErrorLeaderNotAvailableErrorNotLeaderForPartitionErrorProducerClosed)DefaultRecordBatchBuilder)LegacyRecordBatchBuilder)RecordMetadata)create_futureget_running_loopc                   ^    \ rS rSrSSS.S jrS r/ S.S\4S jjrS	 rS
 r	S r
S rS rSrg)BatchBuilder   N)key_serializervalue_serializerc          
          US:  a  U(       a   e[        XU5      U l        O[        UUUSSSUS9U l        SU l        S U l        SU l        XPl        X`l        g )N   r   )is_transactionalproducer_idproducer_epochbase_sequence
batch_sizeF)r	   _builderr   _relative_offset_buffer_closed_key_serializer_value_serializer)selfmagicr   compression_typer   r   r   s          jC:\Suresh\moveshuttle\MDcreated\moveengine\venv\Lib\site-packages\aiokafka/producer/message_accumulator.py__init__BatchBuilder.__init__   sl     19'''4DM 6 !1!%DM !"-!1    c                     U R                   c  UnOU R                  U5      nU R                  c  UnX44$ U R                  U5      nX44$ N)r   r   )r    keyvalueserialized_keyserialized_values        r#   
_serializeBatchBuilder._serialize3   s\    ' N!11#6N!!)$ //  $55e<//r&   headersr0   c                    Uc  / nU R                   (       a  gU R                  X#5      u  pVU R                  R                  U R                  UUUUS9nUc  gU =R                  S-  sl        U$ )a`  Add a message to the batch.

Arguments:
    timestamp (float or None): epoch timestamp in seconds. If None,
        the timestamp will be set to the current time. If submitting to
        an 0.8.x or 0.9.x broker, the timestamp will be ignored.
    key (bytes or None): the message key. `key` and `value` may not
        both be None.
    value (bytes or None): the message value. `key` and `value` may not
        both be None.

Returns:
    If the message was successfully added, returns a metadata object
    with crc, offset, size, and timestamp fields. If the batch is full
    or closed, returns None.
N)r)   r*   r0      )r   r-   r   appendr   )r    	timestampr)   r*   r0   	key_bytesvalue_bytesmetadatas           r#   r3   BatchBuilder.append?   sx    " ?G<<!%!<	==''!! ( 
 "r&   c                 6    U R                   (       a  gSU l         g)a  Close the batch to further updates.

Closing the batch before submitting to the producer ensures that no
messages are added via the ``producer.send()`` interface. To gracefully
support both the batch and individual message interfaces, leave the
batch open. For complete control over the batch's contents, close
before submission. Closing a batch has no effect on when it's sent to
the broker.

A batch may not be reopened after it's closed.
NT)r   r    s    r#   closeBatchBuilder.closee   s     <<r&   c                 x    [        U R                  5      [        L d   eU R                  R                  XU5        g r(   )typer   r   set_producer_stater    r   r   r   s       r#   _set_producer_state BatchBuilder._set_producer_stateu   s.    DMM"&????((mTr&   c                     U R                  5         U R                  c!  U R                  R                  5       U l        U ?U R                  $ r(   )r;   r   r   buildr:   s    r#   _buildBatchBuilder._buildy   s8    

<<==..0DL||r&   c                 z    U R                   b  [        U R                   5      $ U R                  R                  5       $ )zGet the size of batch in bytes.)r   lenr   sizer:   s    r#   rI   BatchBuilder.size   s.    <<#t||$$==%%''r&   c                     U R                   $ )z'Get the number of records in the batch.)r   r:   s    r#   record_countBatchBuilder.record_count   s    $$$r&   )r   r   r   r   r   r   )__name__
__module____qualname____firstlineno__r$   r-   r   r3   r;   rA   rE   rI   rL   __static_attributes__ r&   r#   r   r      sB     2>
0 DF $ $L U(%r&   r   c                       \ rS rSrSrS r\S 5       r\S 5       r\	/ 4S\
4S jjrSS\4S	 jrS
 rS rSS jrS rS rS rS rS rS r\S 5       rSrg)MessageBatch   z@This class incapsulate operations with batch of produce messagesc                     X l         Xl        X0l        [        R                  " 5       U l        [        5       U l        / U l        [        5       U l	        SU l
        g Nr   )r   _tp_ttltime	monotonic_ctimer   future_msg_futures_drain_waiter_retry_count)r    tpbuilderttls       r#   r$   MessageBatch.__init__   sD    	nn& $o*_r&   c                     U R                   $ r(   )rY   r:   s    r#   rb   MessageBatch.tp   s    xxr&   c                 6    U R                   R                  5       $ r(   r   rL   r:   s    r#   rL   MessageBatch.record_count   s    }}))++r&   r0   c                     U R                   R                  X1X%S9nUc  gU" 5       nU R                  R                  Xv45        U$ )zAppend message (key and value) to batch

Returns:
    None if batch is full
      or
    asyncio.Future that will resolved when message is delivered
)r4   r)   r*   r0   N)r   r3   r_   )r    r)   r*   timestamp_ms_create_futurer0   r7   r^   s           r#   r3   MessageBatch.append   sQ     ==''"5 ( 
 !  &!34r&   Nc                    U R                   nUR                  nUR                  nUS:X  a  SnOSnU R                  R	                  5       (       d'  U R                  R                  U" UUUUUUU5      5        U R                   HY  u  pU	R	                  5       (       a  M  US:X  a  U
R                  nXR                  -   nU	R                  U" UUUUUUU5      5        M[     g)zResolve all pending futuresr   r   r2   N)	rY   topic	partitionr^   done
set_resultr_   r4   offset)r    base_offsetr4   log_start_offset_record_metadata_classrb   rp   rq   timestamp_typer^   r7   rt   s               r#   rr   MessageBatch.done   s     XXLL	?NN {{!!KK""&"$
 !% 1 1F{{}} B$..	 ??2F&"$
 !2r&   c                     U R                   R                  5       (       d  U R                   R                  S5        U R                   H-  u  pUR                  5       (       a  M  UR                  S5        M/     g)z#Resolve all pending futures to NoneN)r^   rr   rs   r_   )r    r^   _s      r#   
done_noackMessageBatch.done_noack   sW     {{!!KK""4(**IF{{}}d# +r&   c                    U R                   R                  5       (       d  U R                   R                  U5        U R                   HA  u  p#UR                  5       (       a  M  UR                  [        R                  " U5      5        MC     U R                  (       a  U R                   R                  5         U R                  R                  5       (       d  U R                  R                  U5        g g r(   )r^   rr   set_exceptionr_   copy	exceptionr`   )r    r   r^   r{   s       r#   failureMessageBatch.failure   s    {{!!KK%%i0**IF{{}}   9!56 + KK!!# !!&&((,,Y7 )r&   c                    #    U R                   n[        R                  " U/US9I Sh  vN   UR                  5       (       a  UR	                  5         gg N+7f)z3Wait until all message from this batch is processedtimeoutN)r`   asynciowaitrr   result)r    r   waiters      r#   
wait_drainMessageBatch.wait_drain  sD     ##llF8W555;;==MMO  	6s   &AA,Ac                 `    [         R                  " 5       U R                  -
  U R                  :  $ )z"Check that batch is expired or not)r[   r\   r]   rZ   r:   s    r#   expiredMessageBatch.expired  s!     4;;.$));;r&   c                     U R                   R                  5       (       d  U R                   R                  S5        U =R                  S-  sl        g)z#Compress batch to be ready for sendNr2   )r`   rr   rs   ra   r:   s    r#   drain_readyMessageBatch.drain_ready  s<    !!&&(())$/Qr&   c                 d    U R                   R                  5       (       d   e[        5       U l         g)z2Reset drain waiter, until we will do another retryN)r`   rr   r   r:   s    r#   reset_drainMessageBatch.reset_drain   s'    !!&&((((*_r&   c                 ~    U R                   R                  5       (       a   eU R                  R                  XU5        g r(   )r`   rr   r   rA   r@   s       r#   r?   MessageBatch.set_producer_state%  s1    %%**,,,,))+}Ur&   c                 6    U R                   R                  5       $ r(   )r   rE   r:   s    r#   get_data_bufferMessageBatch.get_data_buffer)  s    }}##%%r&   c                 <    U R                   R                  5       S:H  $ rX   ri   r:   s    r#   is_emptyMessageBatch.is_empty,  s    }}))+q00r&   c                     U R                   $ r(   )ra   r:   s    r#   retry_countMessageBatch.retry_count/  s       r&   )r   r]   r`   r_   ra   rY   rZ   r^   r(   )rN   rO   rP   rQ   __doc__r$   propertyrb   rL   r   r   r3   r
   rr   r|   r   r   r   r   r   r?   r   r   r   rR   rS   r&   r#   rU   rU      s    J   , , % 8 -1f$8(<-
V&1 ! !r&   rU   c                       \ rS rSrSrSSS.S jrS rS rS rS	 r	S
 r
S/ 4S\4S jjrS rS rS r\" 5       4S jrSS jrS rS rSrg)MessageAccumulatori4  zAccumulator of messages batched by topic-partition

Producer adds messages to this accumulator and a background send task
gets batches per nodes to process it.
N)txn_managerloopc                *   Uc
  [        5       nX`l        [        R                  " [        R                  5      U l        [        5       U l        Xl        X l	        X0l
        X@l        UR                  5       U l        SU l        SU l        XPl        S U l        g )NF)r   	   )r   _loopcollectionsdefaultdictdeque_batchesset_pending_batches_cluster_batch_size_compression_type
_batch_ttlr   _wait_data_futurer   _api_version_txn_manager
_exception)r    clusterr   r"   	batch_ttlr   r   s          r#   r$   MessageAccumulator.__init__;  s{     <#%D
#//0A0AB #%!1#!%!3!3!5"'r&   c                     Xl         g r(   )r   )r    api_versions     r#   set_api_version"MessageAccumulator.set_api_versionU  s    'r&   c                 >  #    U R                   R                  5        VVs/ s H  o  H  o"R                  PM     M     nnnX0R                   Vs/ s H  o"R                  PM     sn-  nU(       a  [        R
                  " U5      I S h  vN   g g s  snnf s  snf  N7fr(   )r   valuesr^   r   r   r   )r    batchesbatchwaiterss       r#   flushMessageAccumulator.flushX  s     (,(<(<(>
(>WG5LLGL(> 	 
 	.C.CD.CULL.CDD,,w''' 	
 E's'   BBBB$%B	B
Bc                 x  #    / nU R                   R                  5        HA  nU H8  nUR                  R                  5         UR	                  UR
                  5        M:     MC     XR                   Vs/ s H  o3R
                  PM     sn-  nU(       a  [        R                  " U5      I S h  vN   g g s  snf  N7fr(   )	r   r   r   r;   r3   r^   r   r   r   )r    r   r   r   s       r#   flush_for_commit#MessageAccumulator.flush_for_commit`  s     }}++-G  $$&u||,	 ! . 	.C.CD.CULL.CDD ,,w''' 	 E
 (s   A0B:2B3%B:,B8-B:c                     U R                   R                  5        H  nU H  nUR                  U5        M     M     U R                   H  nUR                  U5        M     Xl        g r(   )r   r   r   r   r   )r    r   r   r   s       r#   fail_allMessageAccumulator.fail_allo  sR    }}++-G i( ! . **EMM)$ +#r&   c                 N   #    SU l         U R                  5       I S h  vN   g  N7f)NT)r   r   r:   s    r#   r;   MessageAccumulator.closex  s     jjls   %#%r0   c                   #     U R                   (       a
  [        5       eU R                  b   [        R                  " U R                  5      eU R                  R                  U5      nU(       d"  U R                  5       nU R                  X5      n	OUS   n	U	R                  X#XVS9n
U
b  U
$ [        R                  " 5       nU	R                  U5      I Sh  vN   U[        R                  " 5       U-
  -  nUS::  a
  [        5       eGM   N27f)zAdd message to batch by topic-partition
If batch is already full this method waits (`timeout` seconds maximum)
until batch is drained by send task
Nr   r/   r   )r   r   r   r   r   getcreate_builder_append_batchr3   r[   r\   r   r   )r    rb   r)   r*   r   rl   r0   pending_batchesrc   r   r^   starts               r#   add_messageMessageAccumulator.add_message|  s      || %&&*ii00"mm//3O"--/**77'+\\#l\LF! NN$E""7+++t~~'%//G!|'))1 * ,s   CDD3Dc                     U R                   $ )zXReturn waiter future that will be resolved when accumulator contain
some data for drain
)r   r:   s    r#   data_waiterMessageAccumulator.data_waiter  s     %%%r&   c                    U R                   U   R                  5       nUR                  S:H  nU R                  b  U(       a  U R                  R	                  5       (       d   S5       eU R                  R                  UR                  5      nU R                  R                  UR                  UR                  5        UR                  U R                  R                  U R                  R                  US9  UR                  5         [        U R                   U   5      S:X  a  U R                   U	 U R                  R                  U5        U(       a!  X 4S jnUR                   R#                  U5        U$ )Nr   z.We should have waited for it in sender routine)r   r   r   c                 :    UR                   R                  U5        g r(   )r   remove)futr   r    s      r#   cb)MessageAccumulator._pop_batch.<locals>.cb  s    %%,,U3r&   )r   popleftr   r   has_pidsequence_numberrb   increment_sequence_numberrL   r?   r   r   r   rH   r   addr^   add_done_callback)r    rb   r   	not_retryseqr   s         r#   
_pop_batchMessageAccumulator._pop_batch  s4   b!))+%%*	(Y!!))++@?@+##33EHH=C77%BTBTU$$ --99#00??! % 
 	t}}R !Q&b!!!%(# 4 LL**2.r&   c                     UR                   nU R                  U   R                  U5        U R                  R	                  U5        UR                  5         g r(   )rb   r   
appendleftr   r   r   )r    r   rb   s      r#   	reenqueueMessageAccumulator.reenqueue  sB    XXb$$U+$$U+r&   c                    [         R                  " [        5      nSn[        U R                  R                  5       5       H  nXR;   a  M
  U R                  R                  U5      nUb  US:X  aa  U R                  U   S   R                  5       (       a8  U R                  U5      nUc  [        5       nO
[        5       nUR                  US9  SnM  U(       a  Xa;   a  M  U R                  U5      nUR                  5       (       d	  XsU   U'   M  UR                  5         M     U R                  R!                  5       (       d  U R                  R#                  S5        U R$                  R'                  5       U l        X44$ )z+Group batches by leader to partition nodes.FNr   r   )r   T)r   r   dictlistr   keysr   leader_for_partitionr   r   r   r   r   r   r|   r   rr   rs   r   r   )	r    ignore_nodesmuted_partitionsnodesunknown_leaders_existrb   leaderr   errs	            r#   drain_by_nodes!MessageAccumulator.drain_by_nodes  sC   ''- %t}}))+,B %]]77;F~2==$Q'//11 !OOB/E~8:57MMCM0(,%&"8OOB'E >>##$)fb!   "; -D %%**,,""--d3!%!9!9!;++r&   c           	          U R                   S:  a  SnOU R                   S:  a  SnOSnSnU R                  b  U R                  R                  b  Sn[        UU R                  U R
                  UUUS9$ )	N)r      r   )r   
   r2   r   FT)r   r   r   )r   r   transactional_idr   r   r   )r    r   r   r!   r   s        r#   r   !MessageAccumulator.create_builder  s    'E')EE )!!22>#""-)-
 	
r&   c                 2   U R                   b  U R                   R                  U5        [        X!U R                  5      nU R                  U   R                  U5        U R                  R                  5       (       d  U R                  R                  S 5        U$ r(   )	r   maybe_add_partition_to_txnrU   r   r   r3   r   rr   rs   )r    rc   rb   r   s       r#   r    MessageAccumulator._append_batch  sw    (88<R$//:b  '%%**,,""--d3r&   c                 
  #    U R                   (       a
  [        5       eU R                  b   [        R                  " U R                  5      e[        R
                  " 5       nUS:  a  U R                  R                  U5      nU(       a6  US   R                  US9I Sh  vN   U[        R
                  " 5       U-
  -  nO1U R                  X5      n[        R                  " UR                  5      $ US:  a  M  [        5       e Nc7f)a/  Add BatchBuilder to queue by topic-partition.

Arguments:
    builder (BatchBuilder): batch object to enqueue.
    tp (TopicPartition): topic and partition to enqueue this batch for.
    timeout (int): time in seconds to wait for a free slot in the batch
        queue.

Returns:
    MessageBatch: delivery wrapper around the BatchBuilder object.

Raises:
    aiokafka.errors.ProducerClosed: the accumulator has already been
        closed and flushed.
    aiokafka.errors.KafkaTimeoutError: the batch could not be added
        within the specified timeout.
Nr   r   r   )r   r   r   r   r[   r\   r   r   r   r   r   shieldr^   r   )r    rc   rb   r   r   pendingr   s          r#   	add_batchMessageAccumulator.add_batch  s     $ << ""??&))DOO,, kmm''+Gbk,,W,===4>>+e33**77~~ell33 k  !! >s   BDDAD7D)r   r   r   r   r   r   r   r   r   r   r   r   )NN)rN   rO   rP   rQ   r   r$   r   r   r   r   r;   r   r   r   r   r   	frozensetr   r   r   r  rR   rS   r&   r#   r   r   4  sq     4((($ %* %*N&6 =FK *,X
.	 "r&   r   )r   r   r   r[   collections.abcr   aiokafka.errorsr   r   r   r   aiokafka.record.default_recordsr   aiokafka.record.legacy_recordsr	   aiokafka.structsr
   aiokafka.utilr   r   r   rU   r   rS   r&   r#   <module>r     sS        $  F C + 9v% v%re! e!PD" D"r&   