
    9h                     F    S SK Jr  SSKJrJr  SSKJrJr   " S S\5      r	g)    )Producer   )MessageFieldSerializationContext)KeySerializationErrorValueSerializationErrorc                   @   ^  \ rS rSrSrU 4S jr  SU 4S jjrSrU =r$ )SerializingProducer   aG
  
A high level Kafka producer with serialization capabilities.

`This class is experimental and likely to be removed, or subject to incompatible API
changes in future versions of the library. To avoid breaking changes on upgrading, we
recommend using serializers directly.`

Derived from the :py:class:`Producer` class, overriding the :py:func:`Producer.produce`
method to add serialization capabilities.

Additional configuration properties:

+-------------------------+---------------------+-----------------------------------------------------+
| Property Name           | Type                | Description                                         |
+=========================+=====================+=====================================================+
|                         |                     | Callable(obj, SerializationContext) -> bytes        |
| ``key.serializer``      | callable            |                                                     |
|                         |                     | Serializer used for message keys.                   |
+-------------------------+---------------------+-----------------------------------------------------+
|                         |                     | Callable(obj, SerializationContext) -> bytes        |
| ``value.serializer``    | callable            |                                                     |
|                         |                     | Serializer used for message values.                 |
+-------------------------+---------------------+-----------------------------------------------------+

Serializers for string, integer and double (:py:class:`StringSerializer`, :py:class:`IntegerSerializer`
and :py:class:`DoubleSerializer`) are supplied out-of-the-box in the ``confluent_kafka.serialization``
namespace.

Serializers for Protobuf, JSON Schema and Avro (:py:class:`ProtobufSerializer`, :py:class:`JSONSerializer`
and :py:class:`AvroSerializer`) with Confluent Schema Registry integration are supplied out-of-the-box
in the ``confluent_kafka.schema_registry`` namespace.

See Also:
    - The :ref:`Configuration Guide <pythonclient_configuration>` for in depth information on how to configure the client.
    - `CONFIGURATION.md <https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md>`_ for a comprehensive set of configuration properties.
    - `STATISTICS.md <https://github.com/edenhill/librdkafka/blob/master/STATISTICS.md>`_ for detailed information on the statistics provided by stats_cb
    - The :py:class:`Producer` class for inherited methods.

Args:
    conf (producer): SerializingProducer configuration.
c                    > UR                  5       nUR                  SS 5      U l        UR                  SS 5      U l        [        [
        U ]  U5        g )Nzkey.serializerzvalue.serializer)copypop_key_serializer_value_serializersuperr
   __init__)selfconf	conf_copy	__class__s      iC:\Suresh\moveshuttle\MDcreated\moveengine\venv\Lib\site-packages\confluent_kafka/serializing_producer.pyr   SerializingProducer.__init__E   sF    IIK	(}}-=tD!*/A4!H!41)<    c           
        > [        U[        R                  U5      nU R                  b   U R                  X(5      n[        R                  Ul        U R                  b   U R                  X85      n[        [        U ]3  XUUUUUS9  g! [         a  n	[        U	5      eSn	A	ff = f! [         a  n	[        U	5      eSn	A	ff = f)a  
Produce a message.

This is an asynchronous operation. An application may use the
``on_delivery`` argument to pass a function (or lambda) that will be
called from :py:func:`SerializingProducer.poll` when the message has
been successfully delivered or permanently fails delivery.

Note:
    Currently message headers are not supported on the message returned to
    the callback. The ``msg.headers()`` will return None even if the
    original message had headers set.

Args:
    topic (str): Topic to produce message to.

    key (object, optional): Message payload key.

    value (object, optional): Message payload value.

    partition (int, optional): Partition to produce to, else the
        configured built-in partitioner will be used.

    on_delivery (callable(KafkaError, Message), optional): Delivery
        report callback. Called as a side effect of
        :py:func:`SerializingProducer.poll` or
        :py:func:`SerializingProducer.flush` on successful or
        failed delivery.

    timestamp (float, optional): Message timestamp (CreateTime) in
        milliseconds since Unix epoch UTC (requires broker >= 0.10.0.0).
        Default value is current time.

    headers (dict, optional): Message headers. The header key must be
        a str while the value must be binary, unicode or None. (Requires
        broker version >= 0.11.0.0)

Raises:
    BufferError: if the internal producer message queue is full.
        (``queue.buffering.max.messages`` exceeded). If this happens
        the application should call :py:func:`SerializingProducer.Poll`
        and try again.

    KeySerializationError: If an error occurs during key serialization.

    ValueSerializationError: If an error occurs during value serialization.

    KafkaException: For all other errors
N)headers	partition	timestampon_delivery)r   r   KEYr   	Exceptionr   VALUEfieldr   r   r   r
   produce)r   topickeyvaluer   r   r   r   ctxser   s             r   r#   SerializingProducer.produceM   s    h #5,*:*:GD+0**34 !&&	!!-2..u: 	!40s9@;D;D=H	 	1 	J  0+B//0  2-b112s/   B B$ 
B!BB!$
B>.B99B>)r   r   )NNNr   N)	__name__
__module____qualname____firstlineno____doc__r   r#   __static_attributes____classcell__)r   s   @r   r
   r
      s%    (T= >@7;EJ EJr   r
   N)
confluent_kafka.cimplr   _ProducerImplserializationr   r   errorr   r   r
    r   r   <module>r7      s$   & <2-xJ- xJr   