
    9h1R                         S SK r S SKrS SKrS SKrS SKJr  S SKJrJr  SSK	J
r
  SSKJr   \r/ SQr/ SQr/ S	QrS
r\ R*                  " \5      r " S S\5      rg! \ a    \r N6f = f)    N)defaultdict)Sessionutils   )ClientError)loads)NONEFULLFORWARDBACKWARD)GETPOSTPUTDELETE)URL	USER_INFOSASL_INHERITz]application/vnd.schemaregistry.v1+json, application/vnd.schemaregistry+json, application/jsonc                       \ rS rSrSrSS jrS rS rS rS r	\
S	 5       rS
 r\
S 5       r\
S 5       rSS0 4S jr\
S 5       rSS jrS rS rS rS rS rS rS rSS jrSS jrSS jrSrg) CachedSchemaRegistryClient0   a  
A client that talks to a Schema Registry over HTTP

See http://confluent.io/docs/current/schema-registry/docs/intro.html for more information.

.. deprecated:: 1.1.0

Use CachedSchemaRegistryClient(dict: config) instead.
Existing params ca_location, cert_location and key_location will be replaced with their librdkafka equivalents:
`ssl.ca.location`, `ssl.certificate.location` and `ssl.key.location` respectively.
The support for password protected private key is via the Config only using 'ssl.key.password' field.

Errors communicating to the server will result in a ClientError being raised.

:param str|dict url: url(deprecated) to schema registry or dictionary containing client configuration.
:param str ca_location: File or directory path to CA certificate(s) for verifying the Schema Registry key.
:param str cert_location: Path to client's public key used for authentication.
:param str key_location: Path to client's private key used for authentication.
Nc                     Un[        U[        5      (       d"  UUUUS.n[        R                  " S[        SS9   UR                  SS5      n[        U[        5      (       d  [        S5      eUR                  S5      (       d  [        S	5      eUR                  S
5      U l        [        [        5      U l        [        [        5      U l        [        [        5      U l        [!        5       nUR                  SS 5      nUb  Xl        U R%                  U5      Ul        U R)                  U R                  U5      Ul        [,        R.                  " U R                  5      U l        Xpl        UR                  SS 5      n	U	(       + U l        U R5                  UR&                  S   UR&                  S   XR*                  U	5      U l        UR                  SS5      U l        [;        U5      S:  a(  [        SR=                  UR?                  5       5      5      eg )N)urlssl.ca.locationssl.certificate.locationssl.key.locationaF  CachedSchemaRegistry constructor is being deprecated. Use CachedSchemaRegistryClient(dict: config) instead. Existing params ca_location, cert_location and key_location will be replaced with their librdkafka equivalents as keys in the conf dict: `ssl.ca.location`, `ssl.certificate.location` and `ssl.key.location` respectively   )category
stacklevelr    zURL must be of type strhttpz(Invalid URL provided for Schema Registry/r   zssl.key.passwordr   r   zauto.register.schemasTz)Unrecognized configuration properties: {}) 
isinstancedictwarningswarnDeprecationWarningpopstring_type	TypeError
startswith
ValueErrorrstripr   r   subject_to_schema_idsid_to_schemasubject_to_schema_versionsr   verify_configure_client_tlscert_configure_basic_authauthr   urldefragauth_session_is_key_password_provided_make_https_session_https_sessionauto_register_schemaslenformatkeys)
selfr   max_schemas_per_subjectca_locationcert_locationkey_locationconfsca_pathkey_passwords
             wC:\Suresh\moveshuttle\MDcreated\moveengine\venv\Lib\site-packages\confluent_kafka/avro/cached_schema_registry_client.py__init__#CachedSchemaRegistryClient.__init__E   s   #t$$#.,9$0	D MM2
 ,; 5 hhub!#{++566~~f%%GHH::c? &1%6"'-*5d*;'I((,d3H++D1++DHHd;&&txx0xx 2D9-9)9&"66qvvay!&&)WV\V\^jk%)XX.Et%L"t9q=HOOPTPYPYP[\]]     c                 $    U R                  5         g Ncloser>   s    rG   __del__"CachedSchemaRegistryClient.__del__|       

rJ   c                     U $ rL    rO   s    rG   	__enter__$CachedSchemaRegistryClient.__enter__   s    rJ   c                 $    U R                  5         g rL   rM   )r>   argss     rG   __exit__#CachedSchemaRegistryClient.__exit__   rR   rJ   c                     [        U S5      (       a  U R                  R                  5         [        U S5      (       a  U R                  R	                  5         g g )Nr6   r9   )hasattrr6   rN   r9   clearrO   s    rG   rN    CachedSchemaRegistryClient.close   sD    4$$MM!4)**%%' +rJ   c                 @    [         R                  " SUXUS9nX5l        U$ )NCERT_REQUIRED)	cert_reqsca_certs	cert_filekey_filerF   )urllib3PoolManagerr4   )rA   rB   ca_certs_pathr4   rF   https_sessions         rG   r8   .CachedSchemaRegistryClient._make_https_session   s*    ++oP]6Ciuw!rJ   c                    S[         0nU R                  R                  nU(       aA  [        R                  " U5      R                  S5      n[        [        U5      5      US'   SUS'   US   S:w  a8  US   S:w  a/  UR                  [        R                  " US   S	-   US   -   S
95        UR                  U5        U R                  R                  X!XTS9nU$ )NAcceptzUTF-8Content-Length&application/vnd.schemaregistry.v1+jsonContent-Typer   r   r   :)
basic_auth)headersbody)
ACCEPT_HDRr9   r4   jsondumpsencodestrr;   updatere   make_headersrequest)r>   r   methodrq   rr   request_headersr4   responses           rG   _send_https_session_request6CachedSchemaRegistryClient._send_https_session_request   s    #Z0""''::d#**73D03CIO,-.VON+7b=T!W]""7#7#747S=8<QD@ $A Bw'&&..vO._rJ   c                    UR                  SS5      R                  5       nU[        ;  a  [        SR	                  [        5      5      eUS:X  aU  UR                  SS5      R                  5       S:X  a  [        S5      eUR                  S	S5      UR                  S
S5      4nU$ US:X  a,  [        UR                  SS5      R                  S5      5      nU$ [        R                  " U 5      nU$ )Nzbasic.auth.credentials.sourcer   z?schema.registry.basic.auth.credentials.source must be one of {}r   zsasl.mechanismr   GSSAPIz3SASL_INHERIT does not support SASL mechanism GSSAPIzsasl.usernamezsasl.passwordr   zbasic.auth.user.inforo   )	r'   upperVALID_AUTH_PROVIDERSr+   r<   tuplesplitr   get_auth_from_url)r   rC   auth_providerr4   s       rG   r3   0CachedSchemaRegistryClient._configure_basic_auth   s    !@%HNNP 44^$f%9:< <N*xx("-335A !VWWHH_b1488OR3PQD
 	 k)"8"=CCCHID  **3/DrJ   c                     U R                  SS 5      U R                  SS 5      4n[        US   5      [        US   5      :w  a  [        S5      eU$ )Nr   r   r   r   z^Both schema.registry.ssl.certificate.location and schema.registry.ssl.key.location must be set)r'   boolr+   )rC   r2   s     rG   r1   0CachedSchemaRegistryClient._configure_client_tls   sT    xx2D9488DVX\;]]Q=DaM)pr rrJ   r   c                    U[         ;  a  [        SR                  U[         5      5      eUR                  S5      (       aP  U R                  (       a?  U R                  XXC5      n [        R                  " UR                  5      UR                  4$ S[        0nU(       a  [        [        U5      5      US'   SUS'   UR                  U5        U R                   R#                  X!XcS9n UR                  5       UR$                  4$ ! [         a    UR                  UR                  4s $ f = f! [         a    UR                  UR$                  4s $ f = f)Nz.Method {} is invalid; valid methods include {}httpsrk   rl   rm   rn   )rq   rt   )VALID_METHODSr   r<   r*   r7   r~   rt   r   datastatusr+   contentrs   rw   r;   rx   r6   rz   status_code)r>   r   r{   rr   rq   r}   _headerss          rG   _send_request(CachedSchemaRegistryClient._send_request   s)   &NUUV\^klmm>>'""t'E'E77WSH9zz(--0(//AA j)),SYH%&'OH^$ ==((h(R	:==?H$8$888  9''889  	:##X%9%999	:s$   $+D 'D+ "D('D(+"EEc                     X   nX4U'   g rL   rT   )cachesubjectschemavalue	sub_caches        rG   _add_to_cache(CachedSchemaRegistryClient._add_to_cache   s    N	!&rJ   c                     X R                   ;   a  U R                   U   nOXR                   U'   U(       aC  U R                  U R                  X1U5        U(       a  U R                  U R                  X1U5        g g g rL   )r.   r   r-   r/   )r>   r   	schema_idr   versions        rG   _cache_schema(CachedSchemaRegistryClient._cache_schema   sr    )))&&y1F+1i(t99&	;""4#B#B#*G=  rJ   c                 v   U R                   U   nUR                  US5      nUb  U$ SR                  U R                  SUS/5      nS[	        U5      0nU R                  USUS9u  pxUS:X  d  US	:X  a&  [        S
[	        U5      -   S-   [	        U5      -   5      eUS:X  a&  [        S[	        U5      -   S-   [	        U5      -   5      eUS:X  a&  [        S[	        U5      -   S-   [	        U5      -   5      eUS:  a  US::  d&  [        S[	        U5      -   S-   [	        U5      -   5      eUS   nU R                  X$U5        U$ )a  
POST /subjects/(string: subject)/versions
Register a schema with the registry under the given subject
and receive a schema id.

avro_schema must be a parsed schema from the python avro library

Multiple instances of the same schema will result in cache misses.

:param str subject: subject name
:param schema avro_schema: Avro schema to be registered
:returns: schema_id
:rtype: int
Nr!   subjectsversionsr   r   r{   rr        Unauthorized access. Error code:z	 message:i  zIncompatible Avro schema:  zInvalid Avro schema:   +  z&Unable to register schema. Error code:idr-   getjoinr   rw   r   r   r   	r>   r   avro_schemaschemas_to_idr   r   rr   resultcodes	            rG   register#CachedSchemaRegistryClient.register   st     227;!%%k48	 hh*gzBC #k*+))#f4)HCK43;@3t9L +,.1&k: ; ;S[9CIE +,.1&k: ; ;S[4s4y@ +,.1&k: ; ;#+$#+FTR +,.1&k: ; ; 4L	;7;rJ   c                    U R                   U   nUR                  US5      nUb  U$ SR                  U R                  SU/5      nS[	        U5      0nU R                  USUS9u  pxUS:X  d  US:X  a  [        S	[	        U5      -   5      eUS
:X  a  [        S[	        U5      -   5      eSUs=::  a  S::  d  O  [        S[	        U5      -   5      eUS   nU R                  X$U5        U$ )a  
POST /subjects/(string: subject)
Check if a schema has already been registered under the specified subject.
If so, returns the schema id. Otherwise, raises a ClientError.

avro_schema must be a parsed schema from the python avro library

Multiple instances of the same schema will result in inconsistencies.

:param str subject: subject name
:param schema avro_schema: Avro schema to be checked
:returns: schema_id
:rtype: int
Nr!   r   r   r   r   r   r   r     zSchema or subject not found:r   r   z0Unable to check schema registration. Error code:r   r   r   s	            rG   check_registration-CachedSchemaRegistryClient.check_registration  s      227;!%%k48	 hh*g67 #k*+))#f4)H3;$#+@3t9LMMS[<s4yHII##PSVW[S\\]]4L	;7;rJ   c                     SR                  U R                  SU/5      nU R                  USS9u  p4US:  a  US::  d  [        SR	                  U5      5      eU$ )aG  
DELETE /subjects/(string: subject)
Deletes the specified subject and its associated compatibility level if registered.
It is recommended to use this API only when a topic needs to be recycled or in development environments.
:param subject: subject name
:returns: version of the schema deleted under this subject
:rtype: (int)
r!   r   r   )r{   r   r   zUnable to delete subject: {})r   r   r   r   r<   )r>   r   r   r   r   s        rG   delete_subject)CachedSchemaRegistryClient.delete_subject7  s]     hh*g67))#h)?<CCFKLLrJ   c                 
   XR                   ;   a  U R                   U   $ SR                  U R                  SS[        U5      /5      nU R	                  U5      u  p4US:X  a"  [
        R                  S[        U5      -   5        gUS:  a  US::  d"  [
        R                  S	[        U5      -   5        gUR                  S
5      n [        U5      nU R                  X15        U$ ! [         a  n[        SU< SU< 35      eSnAff = f)z
GET /schemas/ids/{int: id}
Retrieve a parsed avro schema by id or None if not found
:param int schema_id: int value
:returns: Avro schema
:rtype: schema
r!   schemasidsr   Schema not found:Nr   r   z)Unable to get schema for the specific ID:r   zReceived bad schema (id z) from registry: )r.   r   r   rw   r   logerrorr   r   r   r   )r>   r   r   r   r   
schema_stres          rG   	get_by_id$CachedSchemaRegistryClient.get_by_idH  s     )))$$Y//hh)UC	NCD))#.3;II)CI56#+$#+IIACIMN  H-Jdz*""65 d!U^`a"bccds   C   
D*C==Dc                 &    U R                  US5      $ )aq  
GET /subjects/(string: subject)/versions/latest

Return the latest 3-tuple of:
(the schema id, the parsed avro schema, the schema version)
for a particular subject.

This call always contacts the registry.

If the subject is not found, (None,None,None) is returned.
:param str subject: subject name
:returns: (schema_id, schema, version)
:rtype: (string, schema, int)
latest)get_by_version)r>   r   s     rG   get_latest_schema,CachedSchemaRegistryClient.get_latest_schemah  s     ""7H55rJ   c           	         SR                  U R                  SUS[        U5      /5      nU R                  U5      u  pEUS:X  a"  [        R                  S[        U5      -   5        gUS:X  a"  [        R                  S[        U5      -   5        gUS	:  a  US
::  d  gUS   nUS   nX`R                  ;   a  U R                  U   nO [        US   5      nU R                  XvX5        XgU4$ ! [         a    e f = f)a  
GET /subjects/(string: subject)/versions/(versionId: version)

Return the 3-tuple of:
(the schema id, the parsed avro schema, the schema version)
for a particular subject and version.

This call always contacts the registry.

If the subject is not found, (None,None,None) is returned.
:param str subject: subject name
:param int version: version number
:returns: (schema_id, schema, version)
:rtype: (string, schema, int)
r!   r   r   r   r   )NNNr   zInvalid version:r   r   r   r   r   )
r   r   rw   r   r   r   r.   r   r   r   )r>   r   r   r   r   r   r   r   s           rG   r   )CachedSchemaRegistryClient.get_by_versiony  s      hh*gz3w<PQ))#.3;II)CI56%S[II(3t945%#+$#+%4L	#)))&&y1Fvh/0
 	6g?7++  s   C( (C4c                    U R                   U   nUR                  US5      nUb  U$ SR                  U R                  SU/5      nS[	        U5      0nU R                  USUS9u  pxUS:X  a"  [        R                  S[	        U5      -   5        gUS	:  a  US
::  d"  [        R                  S[	        U5      -   5        gUS   n	US   nU R                  X)X5        U$ )z
POST /subjects/(string: subject)

Get the version of a schema for a given subject.

Returns None if not found.
:param str subject: subject name
:param: schema avro_schema: Avro schema
:returns: version
:rtype: int
Nr!   r   r   r   r   r   z
Not found:r   r   z"Unable to get version of a schema:r   r   )	r/   r   r   r   rw   r   r   r   r   )
r>   r   r   schemas_to_versionr   r   rr   r   r   r   s
             rG   get_version&CachedSchemaRegistryClient.get_version  s     "<<WE$((d;Nhh*g67#k*+))#f4)H3;IIlSY./#+$#+II:SYFG4L	#;7DrJ   c           
         SR                  U R                  SSUS[        U5      /5      nS[        U5      0n U R                  USUS9u  pgUS:X  a"  [        R                  S	[        U5      -   5        g
US:X  a"  [        R                  S[        U5      -   5        g
US:  a  US::  a  UR                  S5      $ [        R                  S[        U5      -   5        g
! [         a   n[        R                  SU5         SnAg
SnAff = f)aW  
POST /compatibility/subjects/(string: subject)/versions/(versionId: version)

Test the compatibility of a candidate parsed schema for a given subject.

By default the latest version is checked against.
:param: str subject: subject name
:param: schema avro_schema: Avro schema
:return: True if compatible, False if not compatible
:rtype: bool
r!   compatibilityr   r   r   r   r   r   zSubject or version not found:Fr   zInvalid subject or schema:r   r   is_compatiblez#Unable to check the compatibility: z_send_request() failed: %sN)r   r   rw   r   r   r   r   	Exception)	r>   r   r   r   r   rr   r   r   r   s	            rG   test_compatibility-CachedSchemaRegistryClient.test_compatibility  s     hh/:w"CL2 3#k*+	--c&t-LLFs{		:SYFH		7#d)CEzz/22		?#d)KL 	II2A6	s)   :C 4'C C 9!C 
D%D  Dc                    U[         ;  a  [        S[        U5      -  5      eSR                  U R                  S/5      nU(       a  USU-   -  nSU0nU R                  USUS9u  pVUS:  a  US::  a  US   $ [        S	[        U5      U4-  5      e)
z
PUT /config/(string: subject)

Update the compatibility level for a subject.  Level must be one of:

:param str level: ex: 'NONE','FULL','FORWARD', or 'BACKWARD'
zInvalid level specified: %sr!   configr   r   r   r   r   z*Unable to update level: %s. Error code: %d)VALID_LEVELSr   rw   r   r   r   )r>   levelr   r   rr   r   r   s          rG   update_compatibility/CachedSchemaRegistryClient.update_compatibility  s     $;s5zJKKhh(+,3= C'))#e$)G3;43;/**JcRWjZ^M__``rJ   c                 Z   SR                  U R                  S/5      nU(       a  SR                  X!/5      nU R                  U5      u  p4US:  =(       a    US:*  nU(       d  [        SU-  5      eUR	                  SS5      nU[
        ;  a  Uc  SnO[        U5      n[        S	U-  5      eU$ )
a#  
GET /config
Get the current compatibility level for a subject.  Result will be one of:

:param str subject: subject name
:raises ClientError: if the request was unsuccessful or an invalid compatibility level was returned
:returns: one of 'NONE','FULL','FORWARD', or 'BACKWARD'
:rtype: bool
r!   r   r   r   z3Unable to fetch compatibility level. Error code: %dcompatibilityLevelNzNo compatibility was returnedz(Invalid compatibility level received: %s)r   r   r   r   r   r   rw   )r>   r   r   r   r   is_successful_requestr   error_msg_suffixs           rG   get_compatibility,CachedSchemaRegistryClient.get_compatibility  s     hh(+,((C>*C))#. $ ;$SVZZ[[

#7>,$#B #&}#5 HK[[\\rJ   )r9   r7   r6   r:   r.   r-   r/   r   )i  NNN)NN)r   rL   )__name__
__module____qualname____firstlineno____doc__rH   rP   rU   rY   rN   staticmethodr8   r~   r3   r1   r   r   r   r   r   r   r   r   r   r   r   r   r   __static_attributes__rT   rJ   rG   r   r   0   s    (5^n(       ).D" :0 " "=*X$L"d@6"',R@@a,rJ   r   )loggingr$   re   rt   collectionsr   requestsr   r   r   r   r   r   
basestringr(   	NameErrorrw   r   r   r   rs   	getLoggerr   r   objectr   rT   rJ   rG   <module>r      sw   ,     # #  K 70;  m
!c c  Ks   A A%$A%