
    9h                     J    S SK Jr  SSKJrJrJr  SSKJrJ	r	   " S S\5      r
g)    )Consumer   )ConsumeErrorKeyDeserializationErrorValueDeserializationError)SerializationContextMessageFieldc                   F   ^  \ rS rSrSrU 4S jrSU 4S jjrSS jrSrU =r	$ )	DeserializingConsumer   a
  
A high level Kafka consumer with deserialization capabilities.

`This class is experimental and likely to be removed, or subject to incompatible API
changes in future versions of the library. To avoid breaking changes on upgrading, we
recommend using deserializers directly.`

Derived from the :py:class:`Consumer` class, overriding the :py:func:`Consumer.poll`
method to add deserialization capabilities.

Additional configuration properties:

+-------------------------+---------------------+-----------------------------------------------------+
| Property Name           | Type                | Description                                         |
+=========================+=====================+=====================================================+
|                         |                     | Callable(bytes, SerializationContext) -> obj        |
| ``key.deserializer``    | callable            |                                                     |
|                         |                     | Deserializer used for message keys.                 |
+-------------------------+---------------------+-----------------------------------------------------+
|                         |                     | Callable(bytes, SerializationContext) -> obj        |
| ``value.deserializer``  | callable            |                                                     |
|                         |                     | Deserializer used for message values.               |
+-------------------------+---------------------+-----------------------------------------------------+

Deserializers for string, integer and double (:py:class:`StringDeserializer`, :py:class:`IntegerDeserializer`
and :py:class:`DoubleDeserializer`) are supplied out-of-the-box in the ``confluent_kafka.serialization``
namespace.

Deserializers for Protobuf, JSON Schema and Avro (:py:class:`ProtobufDeserializer`, :py:class:`JSONDeserializer`
and :py:class:`AvroDeserializer`) with Confluent Schema Registry integration are supplied out-of-the-box
in the ``confluent_kafka.schema_registry`` namespace.

See Also:
    - The :ref:`Configuration Guide <pythonclient_configuration>` for in depth information on how to configure the client.
    - `CONFIGURATION.md <https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md>`_ for a comprehensive set of configuration properties.
    - `STATISTICS.md <https://github.com/edenhill/librdkafka/blob/master/STATISTICS.md>`_ for detailed information on the statistics provided by stats_cb
    - The :py:class:`Consumer` class for inherited methods.

Args:
    conf (dict): DeserializingConsumer configuration.

Raises:
    ValueError: if configuration validation fails
c                    > UR                  5       nUR                  SS 5      U l        UR                  SS 5      U l        [        [
        U ]  U5        g )Nzkey.deserializerzvalue.deserializer)copypop_key_deserializer_value_deserializersuperr   __init__)selfconf	conf_copy	__class__s      kC:\Suresh\moveshuttle\MDcreated\moveengine\venv\Lib\site-packages\confluent_kafka/deserializing_consumer.pyr   DeserializingConsumer.__init__I   sF    IIK	!*/A4!H#,==1Et#L #T3I>    c                   > [         [        U ]  U5      nUc  gUR                  5       b  [	        UR                  5       US9e[        UR                  5       [        R                  UR                  5       5      nUR                  5       nU R                  b   U R                  XC5      nUR                  5       n[        R                  Ul        U R"                  b   U R#                  Xc5      nUR'                  U5        UR)                  U5        U$ ! [         a  n[        XRS9eSnAff = f! [         a  n[%        XRS9eSnAff = f)a  
Consume messages and calls callbacks.

Args:
    timeout (float): Maximum time to block waiting for message(Seconds).

Returns:
    :py:class:`Message` or None on timeout

Raises:
    KeyDeserializationError: If an error occurs during key deserialization.

    ValueDeserializationError: If an error occurs during value deserialization.

    ConsumeError: If an error was encountered while polling.
N)kafka_message)	exceptionr   )r   r   pollerrorr   r   topicr	   VALUEheadersvaluer   	Exceptionr   keyKEYfieldr   r   set_key	set_value)r   timeoutmsgctxr#   ser%   r   s          r   r   DeserializingConsumer.pollP   s#   $ )45g>;99;"syy{#>>"399;0B0BCKKMR		##/Q00< ggi $$	!!-O,,S6 	Ce
  Q/"PPQ  O-NNOs0   D D+ 
D(	D##D(+
E5	D>>Ec                     [         e)z`
:py:func:`Consumer.consume` not implemented, use
:py:func:`DeserializingConsumer.poll` instead
)NotImplementedError)r   num_messagesr*   s      r   consumeDeserializingConsumer.consume~   s
     "!r   )r   r   ))r   r4   )
__name__
__module____qualname____firstlineno____doc__r   r   r2   __static_attributes____classcell__)r   s   @r   r   r      s    +Z?,\" "r   r   N)confluent_kafka.cimplr   _ConsumerImplr   r   r   r   serializationr   r	   r    r   r   <module>r@      s'   & </ /*i"M i"r   