-
Notifications
You must be signed in to change notification settings - Fork 24
/
Copy pathchange-streams.txt
323 lines (247 loc) · 12.6 KB
/
change-streams.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
.. _csharp-change-streams:
====================
Monitor Data Changes
====================
.. contents:: On this page
:local:
:backlinks: none
:depth: 2
:class: singlecol
.. facet::
:name: genre
:values: reference
.. meta::
:keywords: watch, code example
Overview
--------
In this guide, you can learn how to use a **change stream** to monitor real-time
changes to your data. A change stream is a {+mdb-server+} feature that
allows your application to subscribe to data changes on a collection, database,
or deployment.
Sample Data
~~~~~~~~~~~
The examples in this guide use the ``sample_restaurants.restaurants`` collection
from the :atlas:`Atlas sample datasets </sample-data>`. To learn how to create a
free MongoDB Atlas cluster and load the sample datasets, see the :ref:`<csharp-quickstart>`.
The examples on this page use the following ``Restaurant``, ``Address``, and ``GradeEntry``
classes as models:
.. literalinclude:: /includes/code-examples/Restaurant.cs
:language: csharp
:copyable:
:dedent:
.. literalinclude:: /includes/code-examples/Address.cs
:language: csharp
:copyable:
:dedent:
.. literalinclude:: /includes/code-examples/GradeEntry.cs
:language: csharp
:copyable:
:dedent:
.. include:: /includes/convention-pack-note.rst
Open a Change Stream
--------------------
To open a change stream, call the ``Watch()`` or ``WatchAsync()`` method. The instance on which you
call the method determines the scope of events that the change
stream listens for. You can call the ``Watch()`` or ``WatchAsync()`` method on the following
classes:
- ``MongoClient``: To monitor all changes in the MongoDB deployment
- ``Database``: To monitor changes in all collections in the database
- ``Collection``: To monitor changes in the collection
The following example opens a change stream on the ``restaurants`` collection
and outputs the changes as they occur. Select the
:guilabel:`Asynchronous` or :guilabel:`Synchronous` tab to see the corresponding
code.
.. tabs::
.. tab:: Asynchronous
:tabid: change-stream-async
.. literalinclude:: /includes/code-examples/change-streams/change-streams.cs
:start-after: start-open-change-stream-async
:end-before: end-open-change-stream-async
:language: csharp
.. tab:: Synchronous
:tabid: change-stream-sync
.. literalinclude:: /includes/code-examples/change-streams/change-streams.cs
:start-after: start-open-change-stream
:end-before: end-open-change-stream
:language: csharp
To begin watching for changes, run the application. Then, in a separate
application or shell, modify the ``restaurants`` collection. Updating a document
that has a ``"name"`` value of ``"Blarney Castle"`` results in the following
change stream output:
.. code-block:: sh
:copyable: false
{ "_id" : { "_data" : "..." }, "operationType" : "update", "clusterTime" : Timestamp(...),
"wallTime" : ISODate("..."), "ns" : { "db" : "sample_restaurants", "coll" : "restaurants" },
"documentKey" : { "_id" : ObjectId("...") }, "updateDescription" : { "updatedFields" : { "cuisine" : "Irish" },
"removedFields" : [], "truncatedArrays" : [] } }
Modify the Change Stream Output
-------------------------------
You can pass the ``pipeline`` parameter to the ``Watch()`` and ``WatchAsync()``
methods to modify the change stream output. This parameter allows you to watch
for only specified change events. Create the pipeline by using the
``EmptyPipelineDefinition`` class and appending the relevant aggregation stage methods.
You can specify the following aggregation stages in the ``pipeline`` parameter:
- ``$addFields``
- ``$match``
- ``$project``
- ``$replaceRoot``
- ``$replaceWith``
- ``$redact``
- ``$set``
- ``$unset``
To learn how to build an aggregation pipeline by using the
``PipelineDefinitionBuilder`` class, see :ref:`csharp-builders-aggregation` in
the Operations with Builders guide.
The following example uses the ``pipeline`` parameter to open a change stream
that records only update operations. Select the :guilabel:`Asynchronous` or :guilabel:`Synchronous` tab to see the
corresponding code.
.. tabs::
.. tab:: Asynchronous
:tabid: change-stream-async
.. literalinclude:: /includes/code-examples/change-streams/change-streams.cs
:start-after: start-change-stream-pipeline-async
:end-before: end-change-stream-pipeline-async
:language: csharp
.. tab:: Synchronous
:tabid: change-stream-sync
.. literalinclude:: /includes/code-examples/change-streams/change-streams.cs
:start-after: start-change-stream-pipeline
:end-before: end-change-stream-pipeline
:language: csharp
To learn more about modifying your change stream output, see the
:manual:`Modify Change Stream Output
</changeStreams/#modify-change-stream-output>` section in the {+mdb-server+}
manual.
Modify ``Watch()`` Behavior
---------------------------
The ``Watch()`` and ``WatchAsync()`` methods accept optional parameters, which represent
options you can use to configure the operation. If you don't specify any
options, the driver does not customize the operation.
The following table describes the options you can set to customize the behavior
of ``Watch()`` and ``WatchAsync()``:
.. list-table::
:widths: 30 70
:header-rows: 1
* - Option
- Description
* - ``FullDocument``
- | Specifies whether to show the full document after the change, rather
than showing only the changes made to the document. To learn more about
this option, see :ref:`csharp-change-stream-pre-post-image`.
* - ``FullDocumentBeforeChange``
- | Specifies whether to show the full document as it was before the change, rather
than showing only the changes made to the document. To learn more about
this option, see :ref:`csharp-change-stream-pre-post-image`.
* - ``ResumeAfter``
- | Directs ``Watch()`` or ``WatchAsync()`` to resume returning changes after the
operation specified in the resume token.
| Each change stream event document includes a resume token as the ``_id``
field. Pass the entire ``_id`` field of the change event document that
represents the operation you want to resume after.
| ``ResumeAfter`` is mutually exclusive with ``StartAfter`` and ``StartAtOperationTime``.
* - ``StartAfter``
- | Directs ``Watch()`` or ``WatchAsync()`` to start a new change stream after the
operation specified in the resume token. Allows notifications to
resume after an invalidate event.
| Each change stream event document includes a resume token as the ``_id``
field. Pass the entire ``_id`` field of the change event document that
represents the operation you want to resume after.
| ``StartAfter`` is mutually exclusive with ``ResumeAfter`` and ``StartAtOperationTime``.
* - ``StartAtOperationTime``
- | Directs ``Watch()`` or ``WatchAsync()`` to return only events that occur after the
specified timestamp.
| ``StartAtOperationTime`` is mutually exclusive with ``ResumeAfter`` and ``StartAfter``.
* - ``MaxAwaitTime``
- | Specifies the maximum amount of time, in milliseconds, the server waits for new
data changes to report to the change stream cursor before returning an
empty batch. Defaults to 1000 milliseconds.
* - ``ShowExpandedEvents``
- | Starting in {+mdb-server+} v6.0, change streams support change notifications
for Data Definition Language (DDL) events, such as the ``createIndexes`` and ``dropIndexes`` events. To
include expanded events in a change stream, create the change stream
cursor and set this parameter to ``True``.
* - ``batchSize``
- | Specifies the maximum number of documents that a change
stream can return in each batch, which applies to ``Watch()`` or
``WatchAsync()``. If the ``batchSize`` option is not set, watch functions have an
initial batch size of ``101`` documents and a maximum size of 16 mebibytes (MiB)
for each subsequent batch. This option can
enforce a smaller limit than 16 MiB, but not a larger one. If
you set ``batchSize`` to a limit that result in batches
larger than 16 MiB, this option has no effect and ``Watch()``
or ``WatchAsync()`` uses the default batch size.
* - ``Collation``
- | Specifies the collation to use for the change stream cursor.
* - ``Comment``
- | Attaches a comment to the operation.
.. _csharp-change-stream-pre-post-image:
Include Pre-Images and Post-Images
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
.. important::
You can enable pre-images and post-images on collections only if your
deployment uses MongoDB v6.0 or later.
By default, when you perform an operation on a collection, the
corresponding change event includes only the delta of the fields
modified by that operation. To see the full document before or after a
change, create a ``ChangeStreamOptions`` object and specify the
``FullDocumentBeforeChange`` or the ``FullDocument`` options. Then, pass the
``ChangeStreamOptions`` object to the ``Watch()`` or ``WatchAsync()`` method.
The **pre-image** is the full version of a document *before* a change. To include the
pre-image in the change stream event, set the ``FullDocumentBeforeChange``
option to one of the following values:
- ``ChangeStreamFullDocumentBeforeChangeOption.WhenAvailable``: The change event
includes a pre-image of the modified document for change events only if the
pre-image is available.
- ``ChangeStreamFullDocumentBeforeChangeOption.Required``:
The change event includes a pre-image of the modified document for change
events. If the pre-image is not available, the driver raises an error.
The **post-image** is the full version of a document *after* a change. To include the
post-image in the change stream event, set the ``FullDocument`` option to
one of the following values:
- ``ChangeStreamFullDocumentOption.UpdateLookup``: The change event includes a
copy of the entire changed document from some time after the change.
- ``ChangeStreamFullDocumentOption.WhenAvailable``: The change event includes a
post-image of the modified document for change events only if the post-image
is available.
- ``ChangeStreamFullDocumentOption.Required``: The change event includes a
post-image of the modified document for change events. If the post-image is
not available, the driver raises an error.
The following example opens a change stream on a collection and includes the post-image
of updated documents by specifying the ``FullDocument`` option. Select the
:guilabel:`Asynchronous` or :guilabel:`Synchronous` tab to see the corresponding
code.
.. tabs::
.. tab:: Asynchronous
:tabid: change-stream-async
.. literalinclude:: /includes/code-examples/change-streams/change-streams.cs
:start-after: start-change-stream-post-image-async
:end-before: end-change-stream-post-image-async
:language: csharp
.. tab:: Synchronous
:tabid: change-stream-sync
.. literalinclude:: /includes/code-examples/change-streams/change-streams.cs
:start-after: start-change-stream-post-image
:end-before: end-change-stream-post-image
:language: csharp
Running the preceding code example and updating a document that has a ``"name"``
value of ``"Blarney Castle"`` results in the following change stream output:
.. code-block:: sh
:copyable: false
{ "_id" : ObjectId("..."), "name" : "Blarney Castle", "restaurant_id" : "40366356",
"cuisine" : "Traditional Irish", "address" : { "building" : "202-24", "coord" : [-73.925044200000002, 40.5595462],
"street" : "Rockaway Point Boulevard", "zipcode" : "11697" }, "borough" : "Queens", "grades" : [...] }
To learn more about pre-images and post-images, see
:manual:`Change Streams with Document Pre- and Post-Images </changeStreams#change-streams-with-document-pre--and-post-images>`
in the {+mdb-server+} manual.
Additional Information
----------------------
To learn more about change streams, see :manual:`Change Streams
</changeStreams>` in the {+mdb-server+} manual.
API Documentation
~~~~~~~~~~~~~~~~~
To learn more about any of the methods or types discussed in this
guide, see the following API documentation:
- `Watch() <{+new-api-root+}/MongoDB.Driver/MongoDB.Driver.IMongoClient.Watch.html>`__
- `WatchAsync() <{+new-api-root+}/MongoDB.Driver/MongoDB.Driver.IMongoClient.WatchAsync.html>`__
- `ChangeStreamOptions <{+new-api-root+}/MongoDB.Driver/MongoDB.Driver.ChangeStreamOptions.html>`__
- `UpdateOne() <{+new-api-root+}/MongoDB.Driver/MongoDB.Driver.IMongoCollectionExtensions.UpdateOne.html>`__