
    &gK                       S r SSKJr  SSKrSSKJrJrJrJrJ	r	J
r
Jr  SSKJrJr  SSKJr  SSKJr  SSKJrJr  SS	KJr  SS
KJrJrJrJrJr  SSKJr  SSK J!r!J"r"J#r#  SSK$J%r%  SSK&J'r'J(r(J)r)  Sr*\+" / SQ5      r,\(       a  SSK-J.r.  SSK/J0r0  SSK1J2r2  SSK3J4r4  SSK5J6r6  SS jr7 " S S\\(   5      r8 " S S\8\(   5      r9 " S S\8\(   5      r: " S S\:\(   5      r;g) zAWatch changes on a collection, a database, or the entire cluster.    )annotationsN)TYPE_CHECKINGAnyGenericMappingOptionalTypeUnion)CodecOptions_bson_to_dict)RawBSONDocument)	Timestamp)_csotcommon)validate_collation_or_none)ConnectionFailureCursorNotFoundInvalidOperationOperationFailurePyMongoError)_Op)_AggregationCommand_CollectionAggregationCommand_DatabaseAggregationCommand)CommandCursor)_CollationIn_DocumentType	_PipelineT)      Y   [      i  i)#  i{'  iP-  iR-  i{4  i|4  ?      iL4        )ClientSession)
Collection)Database)MongoClient)
Connectionc                0   [        U [        [        45      (       a  g[        U [        5      (       ae  U R                  c  gU R                  S:  =(       a    U R                  S5      =(       d)    U R                  S:  =(       a    U R                  [        ;   $ g)z5Return True if given a resumable change stream error.TF	   ResumableChangeStreamError)
isinstancer   r   r   _max_wire_versionhas_error_labelcode_RESUMABLE_GETMORE_ERRORS)excs    fC:\Suresh\moveshuttle\MDcreated\moveengine\venv\Lib\site-packages\pymongo/synchronous/change_stream.py
_resumabler7   M   s    #)>:;;#'((  (!!Q&\3+>+>?[+\S##a'QCHH8Q,Q	S     c                     \ rS rSrSr   S                           SS jjrSS jr\SS j5       r\SS j5       r	SS jr
SS	 jrSS
 jrSS jr      S S jrS!S jrSS jrSS jrS"S jr\S#S j5       r\R*                  S$S j5       r\r\S%S j5       r\R*                  S&S j5       rS"S jrS'S jrSrg)(ChangeStreamZ   a  The internal abstract base class for change stream cursors.

Should not be called directly by application developers. Use
:meth:`pymongo.collection.Collection.watch`,
:meth:`pymongo.database.Database.watch`, or
:meth:`pymongo.mongo_client.MongoClient.watch` instead.

.. versionadded:: 3.6
.. seealso:: The MongoDB documentation on `changeStreams <https://mongodb.com/docs/manual/changeStreams/>`_.
Nc                   Uc  / n[         R                  " SU5      n[         R                  " SU5        [        U5        [         R                  " SU5        SU l        UR                  U l        UR                  R                  R                  (       a7  SU l        UR                  UR                  R                  [        S9S9U l        OXl        [        R                  " U5      U l        X0l        Xl        U
S LU l        US LU l        [        R                  " U
=(       d    U5      U l        XPl        X`l        Xpl        Xl        Xl        Xl        SU l        U R                  R8                  U l        Xl        g )Npipelinefull_document	batchSizeFT)document_class)codec_options)r   validate_listvalidate_string_or_noner   %validate_non_negative_integer_or_none_decode_customrA   _orig_codec_optionstype_registry_decoder_mapwith_optionsr   _targetcopydeepcopy	_pipeline_full_document_full_document_before_change_uses_start_after_uses_resume_after_resume_token_max_await_time_ms_batch_size
_collation_start_at_operation_time_session_comment_closed_timeout_show_expanded_events)selftargetr=   r>   resume_aftermax_await_time_ms
batch_size	collationstart_at_operation_timesessionstart_aftercommentfull_document_before_changeshow_expanded_eventss                 r6   __init__ChangeStream.__init__f   s>   ( H''
H=&&F"9-44[*M#@F@T@T --::"&D "..$22???_ / DL "Lx0+,G)!,D!8".d":!]];+F,G"3%#(?%--%9"r8   c                .    U R                  5       U l        g N)_create_cursor_cursorr\   s    r6   _initialize_cursorChangeStream._initialize_cursor   s    **,r8   c                    [         e)z)The aggregation command class to be used.NotImplementedErrorrn   s    r6   _aggregation_command_class'ChangeStream._aggregation_command_class   s
     "!r8   c                    [         e)zUThe client against which the aggregation commands for
this ChangeStream will be run.
rr   rn   s    r6   _clientChangeStream._client   s
    
 "!r8   c                F   0 nU R                   b  U R                   US'   U R                  b  U R                  US'   U R                  nUb  U R                  (       a  X!S'   O!X!S'   OU R                  b  U R                  US'   U R
                  (       a  U R
                  US'   U$ )z=Return the options dict for the $changeStream pipeline stage.fullDocumentfullDocumentBeforeChange
startAfterresumeAfterstartAtOperationTimeshowExpandedEvents)rN   rO   resume_tokenrP   rV   r[   )r\   optionsr   s      r6   _change_stream_options#ChangeStream._change_stream_options   s    "$*&*&9&9GN#,,8262S2SG./((#%%(4%)5&**6.2.K.KG*+%%,0,F,FG()r8   c                z    0 nU R                   b  U R                   US'   U R                  b  U R                  US'   U$ )z4Return the options dict for the aggregation command.maxAwaitTimeMSr?   )rS   rT   )r\   r   s     r6   _command_optionsChangeStream._command_options   sE    "".(,(?(?G$%'#'#3#3GK r8   c                f    U R                  5       nSU0/nUR                  U R                  5        U$ )z;Return the full aggregation pipeline for this ChangeStream.z$changeStream)r   extendrM   )r\   r   full_pipelines      r6   _aggregation_pipeline"ChangeStream._aggregation_pipeline   s5    --/ /9:T^^,r8   c                0   US   S   (       d  SUS   ;   a  US   S   U l         gU R                  cd  U R                  SL aT  U R                  SL aD  UR                  S:  a3  UR                  S5      U l        U R                  c  [        SU< 35      egggggg)	a%  Callback that caches the postBatchResumeToken or
startAtOperationTime from a changeStream aggregate command response
containing an empty batch of change documents.

This is implemented as a callback because we need access to the wire
version in order to determine whether to cache this value.
cursor
firstBatchpostBatchResumeTokenNFr    operationTimez?Expected field 'operationTime' missing from command response : )rR   rV   rQ   rP   max_wire_versiongetr   )r\   resultconns      r6   _process_resultChangeStream._process_result   s     h-%)99%+H%56L%M"--5++u4**e3))Q.06

?0K-008*&&,Z1  9	 / 4 5 6	 .r8   c           
     B   U R                  U R                  [        U R                  5       U R	                  5       UU R
                  U R                  S9nU R                  R                  UR                  U R                  R                  U5      U[        R                  S9$ )zdRun the full aggregation pipeline for this ChangeStream and return
the corresponding CommandCursor.
)result_processorre   )	operation)rt   rJ   r   r   r   r   rX   rw   _retryable_read
get_cursor_read_preference_forr   	AGGREGATE)r\   rc   explicit_sessioncmds       r6   _run_aggregation_cmd!ChangeStream._run_aggregation_cmd   s     --LL&&(!!#!11MM . 
 ||++NNLL--g6mm	 , 
 	
r8   c                    U R                   R                  U R                  SS9 nU R                  XR                  S LS9sS S S 5        $ ! , (       d  f       g = f)NFclose)rc   r   )rw   _tmp_sessionrW   r   )r\   ss     r6   rl   ChangeStream._create_cursor  sG    \\&&t}}E&Ba,,Q^bIb,c CBBs   A


Ac                     U R                   R                  5         U R                  5       U l         g! [         a     N"f = f)z7Reestablish this change stream after a resumable error.N)rm   r   r   rl   rn   s    r6   _resumeChangeStream._resume  s=    	LL  **,  		s   2 
??c                F    SU l         U R                  R                  5         g)zClose this ChangeStream.TN)rY   rm   r   rn   s    r6   r   ChangeStream.close  s    r8   c                    U $ rk    rn   s    r6   __iter__ChangeStream.__iter__      r8   c                B    [         R                  " U R                  5      $ )ztThe cached resume token that will be used to resume after the most
recently returned change.

.. versionadded:: 3.9
)rK   rL   rR   rn   s    r6   r   ChangeStream.resume_token  s     }}T//00r8   c                    U R                   (       a(  U R                  5       nUb  U$ U R                   (       a  M(  [        e)a#  Advance the cursor.

This method blocks until the next change document is returned or an
unrecoverable error is raised. This method is used when iterating over
all changes in the cursor. For example::

    try:
        resume_token = None
        pipeline = [{'$match': {'operationType': 'insert'}}]
        with db.collection.watch(pipeline) as stream:
            for insert_change in stream:
                print(insert_change)
                resume_token = stream.resume_token
    except pymongo.errors.PyMongoError:
        # The ChangeStream encountered an unrecoverable error or the
        # resume attempt failed to recreate the cursor.
        if resume_token is None:
            # There is no usable resume token because there was a
            # failure during ChangeStream initialization.
            logging.error('...')
        else:
            # Use the interrupted ChangeStream's resume token to create
            # a new ChangeStream. The new stream will continue from the
            # last seen insert change without missing any events.
            with db.collection.watch(
                    pipeline, resume_after=resume_token) as stream:
                for insert_change in stream:
                    print(insert_change)

Raises :exc:`StopIteration` if this ChangeStream is closed.
)alivetry_nextStopIteration)r\   docs     r6   nextChangeStream.next!  s4    B jj--/C
 jjj
 r8   c                $    U R                   (       + $ )zDoes this cursor have the potential to return more data?

.. note:: Even if :attr:`alive` is ``True``, :meth:`next` can raise
    :exc:`StopIteration` and :meth:`try_next` can return ``None``.

.. versionadded:: 3.8
)rY   rn   s    r6   r   ChangeStream.aliveK  s     <<r8   c                t   U R                   (       d+  U R                  R                  (       d  U R                  5           U R                  R	                  S5      nU R                  R                  (       d  SU l         Uc;  U R                  R                  b"  U R                  R                  U l        SU l        U$  US   nU R                  R                  5       (       d1  U R                  R                  (       a  U R                  R                  nSU l        SU l        X0l        SU l        U R$                  (       a   ['        UR(                  U R*                  5      $ U$ ! [
         aG  n[        U5      (       d  e U R                  5         U R                  R	                  S5      n SnAGNPSnAff = f! [
         a7  n[        U5      (       d!  UR                  (       d  U R                  5         e SnAf[         a    U R                  5         e f = f! [         a    U R                  5         [        S5      Sef = f)a  Advance the cursor without blocking indefinitely.

This method returns the next change document without waiting
indefinitely for the next change. For example::

    with db.collection.watch() as stream:
        while stream.alive:
            change = stream.try_next()
            # Note that the ChangeStream's resume token may be updated
            # even when no changes are returned.
            print("Current resume token: %r" % (stream.resume_token,))
            if change is not None:
                print("Change document: %r" % (change,))
                continue
            # We end up here when there are no recent changes.
            # Sleep for a while before trying again to avoid flooding
            # the server with getMore requests when no changes are
            # available.
            time.sleep(10)

If no change document is cached locally then this method runs a single
getMore command. If the getMore yields any documents, the next
document is returned, otherwise, if the getMore returns no documents
(because there have been no changes) then ``None`` is returned.

:return: The next change document or ``None`` when no document is available
  after running a single getMore or when the cursor is closed.

.. versionadded:: 3.8
TFN_idzECannot provide resume functionality when the resume token is missing.)rY   rm   r   r   	_try_nextr   r7   timeoutr   BaseException_post_batch_resume_tokenrR   rV   KeyErrorr   	_has_nextrP   rQ   rE   r   rawrF   )r\   changer5   r   s       r6   r   ChangeStream.try_nextV  s   @ ||DLL$6$6LLN	7//5" ||!!DL >
 ||44@%)\\%J%J"04-M	!%=L ||%%''DLL,Q,Q<<@@L "'"& *(,% T-E-EFFm   7!#//6	7
  	c??3;;

 	JJL	,  	JJL"W	sA   E ;H 
F/(<F*$F2 *F//F2 2
H<2G..H'H7c                    U $ rk   r   rn   s    r6   	__enter__ChangeStream.__enter__  r   r8   c                $    U R                  5         g rk   r   )r\   exc_typeexc_valexc_tbs       r6   __exit__ChangeStream.__exit__  s    

r8   )rT   rY   rU   rX   rm   rE   rN   rO   rS   rF   rM   rR   rW   r[   rV   rJ   rZ   rQ   rP   )NNN)r]   zUUnion[MongoClient[_DocumentType], Database[_DocumentType], Collection[_DocumentType]]r=   zOptional[_Pipeline]r>   Optional[str]r^   Optional[Mapping[str, Any]]r_   Optional[int]r`   r   ra   zOptional[_CollationIn]rb   zOptional[Timestamp]rc   Optional[ClientSession]rd   r   re   zOptional[Any]rf   r   rg   zOptional[bool]returnNone)r   r   )r   zType[_AggregationCommand])r   r+   r   zdict[str, Any])r   zlist[dict[str, Any]])r   zMapping[str, Any]r   r,   r   r   )rc   r   r   boolr   r   )r   r   )r   zChangeStream[_DocumentType])r   r   )r   r   )r   r   )r   zOptional[_DocumentType])r   r   r   r   r   r   r   r   )__name__
__module____qualname____firstlineno____doc__rh   ro   propertyrt   rw   r   r   r   r   r   rl   r   r   r   r   r   applyr   __next__r   r   r   r   __static_attributes__r   r8   r6   r:   r:   Z   s   	6 "&59/3%5:
5: &5: %5: 25: )5: "5: *5: "55: )5: 15:  !5:" &3#5:$ -%5:& 
'5:n- " " " "02
.
BF
	
,d-
 1 1 [[% %N H    [[] ]~r8   r:   c                  L    \ rS rSr% SrS\S'   \S	S j5       r\S
S j5       rSr	g)CollectionChangeStreami  zA change stream that watches changes on a single collection.

Should not be called directly by application developers. Use
helper method :meth:`pymongo.collection.Collection.watch` instead.

.. versionadded:: 3.7
zCollection[_DocumentType]rJ   c                    [         $ rk   )r   rn   s    r6   rt   1CollectionChangeStream._aggregation_command_class  s    ,,r8   c                B    U R                   R                  R                  $ rk   )rJ   databaseclientrn   s    r6   rw   CollectionChangeStream._client  s    ||$$+++r8   r   N)r   z#Type[_CollectionAggregationCommand]r   zMongoClient[_DocumentType]
r   r   r   r   r   __annotations__r   rt   rw   r   r   r8   r6   r   r     s5     '&- - , ,r8   r   c                  L    \ rS rSr% SrS\S'   \S	S j5       r\S
S j5       rSr	g)DatabaseChangeStreami  zA change stream that watches changes on all collections in a database.

Should not be called directly by application developers. Use
helper method :meth:`pymongo.database.Database.watch` instead.

.. versionadded:: 3.7
zDatabase[_DocumentType]rJ   c                    [         $ rk   )r   rn   s    r6   rt   /DatabaseChangeStream._aggregation_command_class  s    **r8   c                .    U R                   R                  $ rk   )rJ   r   rn   s    r6   rw   DatabaseChangeStream._client  s    ||"""r8   r   N)r   z!Type[_DatabaseAggregationCommand]r   r   r   r8   r6   r   r     s5     %$+ + # #r8   r   c                  0   ^  \ rS rSrSrSU 4S jjrSrU =r$ )ClusterChangeStreami  zA change stream that watches changes on all collections in the cluster.

Should not be called directly by application developers. Use
helper method :meth:`pymongo.mongo_client.MongoClient.watch` instead.

.. versionadded:: 3.7
c                .   > [         TU ]  5       nSUS'   U$ )NTallChangesForCluster)superr   )r\   r   	__class__s     r6   r   *ClusterChangeStream._change_stream_options  s     '02*.&'r8   r   r   )r   r   r   r   r   r   r   __classcell__)r   s   @r6   r   r     s     r8   r   )r5   r   r   r   )<r   
__future__r   rK   typingr   r   r   r   r   r	   r
   bsonr   r   bson.raw_bsonr   bson.timestampr   pymongor   r   pymongo.collationr   pymongo.errorsr   r   r   r   r   pymongo.operationsr   pymongo.synchronous.aggregationr   r   r   "pymongo.synchronous.command_cursorr   pymongo.typingsr   r   r   _IS_SYNC	frozensetr4   "pymongo.synchronous.client_sessionr(   pymongo.synchronous.collectionr)   pymongo.synchronous.databaser*    pymongo.synchronous.mongo_clientr+   pymongo.synchronous.poolr,   r7   r:   r   r   r   r   r8   r6   <module>r     s    H "  N N N , ) $ ! 8  # 
 = B B & . @95<3
`7=) `F,\-8 ,(#<6 #(.}= r8   