
    9h"                     
   S SK r S SKrS SKrS SKrS SKrS SKrS SKrS SKrS SKJ	r	  S SK
JrJrJr  \R                  " \5      rS rSr S SKJrJr  S SKJr  Sr " S S	\ R2                  5      r " S
 S\5      rg! \ a     N)f = f)    N)ClientError)SerializerErrorKeySerializerErrorValueSerializerErrorF)schemaless_readerschemaless_writer)parse_schemaTc                   $    \ rS rSrSrS rS rSrg)ContextStringIO3   z9
Wrapper to allow use of StringIO via 'with' constructs.
c                     U $ N )selfs    wC:\Suresh\moveshuttle\MDcreated\moveengine\venv\Lib\site-packages\confluent_kafka/avro/serializer/message_serializer.py	__enter__ContextStringIO.__enter__8   s        c                 $    U R                  5         g)NF)close)r   argss     r   __exit__ContextStringIO.__exit__;   s    

r   r   N)__name__
__module____qualname____firstlineno____doc__r   r   __static_attributes__r   r   r   r   r   3   s    r   r   c                   P    \ rS rSrSrSS jrS rSS jrSS jrSS jr	SS	 jr
S
rg)MessageSerializer@   z
A helper class that can serialize and deserialize messages
that need to be encoded or decoded using the schema registry.

All encode_* methods return a buffer that can be sent to kafka.
All decode_* methods expect a buffer received from kafka.
Nc                 D    Xl         0 U l        0 U l        X l        X0l        g r   )registry_clientid_to_decoder_funcid_to_writersreader_key_schemareader_value_schema)r   r$   r'   r(   s       r   __init__MessageSerializer.__init__I   s$    ."$!2#6 r   c                    ^^ [         (       a0  [        R                  " [        U5      5      n[	        U5      mU4S j$ [
        R                  R                  U5      mU4S j$ )Nc                    > [        UTU 5      $ r   )r   )recordfpparsed_schemas     r   <lambda>5MessageSerializer._get_encoder_func.<locals>.<lambda>U   s    &7M6&Rr   c                 b   > TR                  U [        R                  R                  U5      5      $ r   )writeavroioBinaryEncoder)r-   r.   writers     r   r0   r1   W   s    &,,vtww7L7LR7P"Qr   )HAS_FASTjsonloadsstrr	   r4   r5   DatumWriter)r   writer_schemaschemar/   r7   s      @@r   _get_encoder_func#MessageSerializer._get_encoder_funcQ   sE    8ZZM 23F(0MRR$$]3QQr   c                    U(       a  [         O[        nU(       a  SOSnX-   nU R                  R                  (       a  U R                  R	                  Xr5      nOU R                  R                  Xr5      nU(       d  SU-  n	U" U	5      eXR                  ;  a  U R                  U5      U R                  U'   U R                  XUS9$ )a~  
Given a parsed avro schema, encode a record for the given topic.  The
record is expected to be a dictionary.

The schema is registered with the subject of 'topic-value'
:param str topic: Topic name
:param schema schema: Avro Schema
:param dict record: An object to serialize
:param bool is_key: If the record is a key
:returns: Encoded record with schema ID as bytes
:rtype: bytes
z-keyz-valuez+Unable to retrieve schema id for subject %s)is_key)	r   r   r$   auto_register_schemasregistercheck_registrationr&   r?   encode_record_with_schema_id)
r   topicr>   r-   rB   serialize_errsubject_suffixsubject	schema_idmessages
             r   encode_record_with_schema+MessageSerializer.encode_record_with_schemaY   s     /5*:N$*&(55,,55gFI,,??PICwOG(( ...,0,B,B6,JDy)0060RRr   c           
      ^   U(       a  [         O[        nXR                  ;  aI   U R                  R	                  U5      nU(       d  U" S5      eU R                  U5      U R                  U'   U R                  U   n	[        5        n
U
R                  [        R                  " S[         U5      5        U	" X*5        U
R#                  5       sSSS5        $ ! [         a?    [        R                  " 5       u  pgnU" [        [        R                  " XgU5      5      5      ef = f! , (       d  f       g= f)z
Encode a record with a given schema id.  The record must
be a python dictionary.
:param int schema_id: integer ID
:param dict record: An object to serialize
:param bool is_key: If the record is a key
:returns: decoder function
:rtype: func
zSchema does not exist>bIN)r   r   r&   r$   	get_by_idr?   r   sysexc_inforepr	tracebackformat_exceptionr   r3   structpack
MAGIC_BYTEgetvalue)r   rK   r-   rB   rH   r>   exc_type	exc_valueexc_tracebackr7   outfs              r   rF   .MessageSerializer.encode_record_with_schema_idz   s     /5*:N ...j--77	B'(?@@040F0Fv0N""9- ##I.$JJv{{5*i@A 6 ==?   j58\\^2]#D)C)CHYf)g$hiij s   AC ADA	D
D,c                   ^	^
^ XR                   ;   a  U R                   U   $  U R                  R                  U5      nUc  [	        SU-  5      eUR                  5       nU(       a  U R                  OU R                  n[        (       a   [        [        R                  " [        U5      5      5      mUb)  [        [        R                  " [        U5      5      5      m
OS m
[        UT5        UR                  U5        U
U4S jU R                   U'   U R                   U   $ UR                  U5        [$        R&                  R)                  XG5      m	U	4S jnXR                   U'   U R                   U   $ ! [         a  n[	        SU[        U5      4-  5      eS nAff = f! [         a    [         R#                  SU-  5         Nf = f)Nz%unable to fetch schema with id %d: %sz!unable to fetch schema with id %dc                    > [        U TT5      $ r   )r   )pfast_avro_reader_schemafast_avro_writer_schemas    r   r0   5MessageSerializer._get_decoder_func.<locals>.<lambda>   s    ?P.0G@Ir   zEFast avro failed for schema with id %d, falling thru to standard avroc                 d   > [         R                  R                  U 5      nTR                  U5      $ r   )r4   r5   BinaryDecoderread)rb   bin_decoderavro_readers     r   decoder4MessageSerializer._get_decoder_func.<locals>.decoder   s(    ''//2K##K00r   )r%   r$   rQ   r   r   r;   tellr'   r(   r8   r	   r9   r:   r   seek	Exceptionlogwarningr4   r5   DatumReader)r   rK   payloadrB   writer_schema_objecurr_posreader_schema_objrk   rj   rc   rd   s            @@@r   _get_decoder_func#MessageSerializer._get_decoder_func   s   ///**955	a $ 4 4 > >y I $!"E"STT<<>6<D22$BZBZ8s*6tzz#FWBX7Y*Z'$0.:4::cJ[F\;].^+.2+!'+BC X&6I''	2..y99 	X gg))*;O	1 .5	*&&y11a  	a!"IYX[\]X^L_"_``	a:  scgpqrss*   E3 
BF 3
F=FF"GGc                 8   Uc  g[        U5      S::  a  [        S5      e[        U5       n[        R                  " SUR                  S5      5      u  pEU[        :w  a  [        S5      eU R                  XSU5      nU" U5      sSSS5        $ ! , (       d  f       g= f)z
Decode a message from kafka that has been encoded for use with
the schema registry.
:param str|bytes or None message: message key or value to be decoded
:returns: Decoded message contents.
:rtype dict:
N   zmessage is too small to decoderP   z&message does not start with magic byte)lenr   r   rW   unpackrh   rY   rx   )r   rL   rB   rs   magicrK   decoder_funcs          r   decode_message MessageSerializer.decode_message   s     ?w<1!"BCCW%%}}UGLLODE
"%&NOO11)fML( &%%s   AB
B)r%   r&   r'   r(   r$   )NN)F)r   r   r   r   r   r)   r?   rM   rF   rx   r   r   r   r   r   r!   r!   @   s*    7RSB"#J72r)r   r!   )r5   r9   loggingrW   rR   rU   r4   avro.ioconfluent_kafka.avror   confluent_kafka.avro.serializerr   r   r   	getLoggerr   rp   rY   r8   fastavror   r   fastavro.schemar	   ImportErrorBytesIOr   objectr!   r   r   r   <module>r      s   , 
    
    ,C C !
	=,H

bjj 
l) l)#  		s   A9 9BB