-
Notifications
You must be signed in to change notification settings - Fork 13
DOCSP-43081: Change Streams #37
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
mcmorisi
merged 4 commits into
mongodb:standardization
from
mcmorisi:DOCSP-43081-change-streams
Sep 18, 2024
Merged
Changes from 3 commits
Commits
Show all changes
4 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,98 @@ | ||
| #include <bson/bson.h> | ||
| #include <mongoc/mongoc.h> | ||
| #include <stdio.h> | ||
|
|
||
| int | ||
| main (int argc, char *argv[]) | ||
| { | ||
| mongoc_client_t *client; | ||
| mongoc_collection_t *collection; | ||
| mongoc_init (); | ||
|
|
||
| client = | ||
| mongoc_client_new ("<connection string URI>"); | ||
| collection = mongoc_client_get_collection (client, "sample_restaurants", "restaurants"); | ||
|
|
||
| { | ||
| // Opens a change stream on the collection and prints each change | ||
| // start-open-change-stream | ||
| bson_t *pipeline = bson_new (); | ||
| // Change stream will wait 1 minute for changes | ||
| bson_t *opts = BCON_NEW ("maxAwaitTimeMS", BCON_INT64 (60000)); | ||
| const bson_t *doc; | ||
|
|
||
| mongoc_change_stream_t *change_stream = | ||
| mongoc_collection_watch (collection, pipeline, opts); | ||
|
|
||
| while (mongoc_change_stream_next (change_stream, &doc)) { | ||
| char *str = bson_as_canonical_extended_json (doc, NULL); | ||
| printf ("Received change: %s\n", str); | ||
| bson_free (str); | ||
| } | ||
|
|
||
| bson_destroy (pipeline); | ||
| bson_destroy (opts); | ||
| mongoc_change_stream_destroy (change_stream); | ||
| // end-open-change-stream | ||
| } | ||
| { | ||
| // Updates a document in the collection to trigger a change event | ||
| // start-update-for-change-stream | ||
| bson_t *filter = BCON_NEW ("name", BCON_UTF8 ("Blarney Castle")); | ||
| bson_t *update = BCON_NEW("$set", "{", "cuisine", BCON_UTF8 ("Irish"), "}"); | ||
|
|
||
| mongoc_collection_update_one (collection, filter, update, NULL, NULL, NULL); | ||
| // end-update-for-change-stream | ||
| } | ||
| { | ||
| // Opens a change stream on the collection and specifies a pipeline to only receive update events | ||
| // start-change-stream-pipeline | ||
| bson_t *pipeline = BCON_NEW ( | ||
| "pipeline", "[", | ||
| "{", "$match", "{", "operationType", BCON_UTF8 ("update"), "}", "}", | ||
| "]"); | ||
| const bson_t *doc; | ||
|
|
||
| mongoc_change_stream_t *change_stream = | ||
| mongoc_collection_watch (collection, pipeline, NULL); | ||
|
|
||
| while (mongoc_change_stream_next (change_stream, &doc)) { | ||
| char *str = bson_as_canonical_extended_json (doc, NULL); | ||
| printf ("Received change: %s\n", str); | ||
| bson_free (str); | ||
| } | ||
|
|
||
| bson_destroy (pipeline); | ||
| mongoc_change_stream_destroy (change_stream); | ||
| // end-change-stream-pipeline | ||
| } | ||
| { | ||
| // Opens a change stream on the collection and specifies options to receive the full document for update events | ||
| // start-change-stream-post-image | ||
| bson_t *pipeline = bson_new (); | ||
| // Change stream will wait 1 minute for changes | ||
| bson_t *opts = BCON_NEW ("maxAwaitTimeMS", BCON_INT64 (60000), | ||
| "fullDocument", BCON_UTF8 ("updateLookup")); | ||
| const bson_t *doc; | ||
|
|
||
| mongoc_change_stream_t *change_stream = | ||
| mongoc_collection_watch (collection, pipeline, opts); | ||
|
|
||
| while (mongoc_change_stream_next (change_stream, &doc)) { | ||
| char *str = bson_as_canonical_extended_json (doc, NULL); | ||
| printf ("Received change: %s\n", str); | ||
| bson_free (str); | ||
| } | ||
|
|
||
| bson_destroy (pipeline); | ||
| bson_destroy (opts); | ||
| mongoc_change_stream_destroy (change_stream); | ||
| // end-change-stream-post-image | ||
| } | ||
|
|
||
| mongoc_collection_destroy (collection); | ||
| mongoc_client_destroy (client); | ||
| mongoc_cleanup (); | ||
|
|
||
| return EXIT_SUCCESS; | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,278 @@ | ||
| .. _c-change-streams: | ||
|
|
||
| ==================== | ||
| Monitor Data Changes | ||
| ==================== | ||
|
|
||
| .. contents:: On this page | ||
| :local: | ||
| :backlinks: none | ||
| :depth: 2 | ||
| :class: singlecol | ||
|
|
||
| .. facet:: | ||
| :name: genre | ||
| :values: reference | ||
|
|
||
| .. meta:: | ||
| :keywords: watch, code example | ||
|
|
||
| Overview | ||
| -------- | ||
|
|
||
| In this guide, you can learn how to use the {+driver-short+} to monitor a **change stream**, | ||
| allowing you to view real-time changes to your data. A change stream is a {+mdb-server+} feature that | ||
| publishes data changes on a collection, database, or deployment. Your application can | ||
| subscribe to a change stream and use events to perform other actions. | ||
|
|
||
| Sample Data | ||
| ~~~~~~~~~~~ | ||
|
|
||
| The examples in this guide use the ``restaurants`` collection in the ``sample_restaurants`` | ||
| database from the :atlas:`Atlas sample datasets </sample-data>`. To learn how to create a | ||
| free MongoDB Atlas cluster and load the sample datasets, see the | ||
| :atlas:`Get Started with Atlas </getting-started>` guide. | ||
|
|
||
| Open a Change Stream | ||
| -------------------- | ||
|
|
||
| To open a change stream, call one of the following functions that corresponds to the | ||
| scope of events you want to observe: | ||
|
|
||
| - ``mongoc_client_watch()``: Monitors all changes in the MongoDB deployment | ||
| - ``mongoc_database_watch()``: Monitors changes in all collections in the database | ||
| - ``mongoc_collection_watch()``: Monitors changes in the collection | ||
|
|
||
| Open a Change Stream Example | ||
| ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | ||
|
|
||
| The following example opens a change stream on the ``restaurants`` collection | ||
| and prints changes as they occur: | ||
|
|
||
| .. literalinclude:: /includes/read/change-streams.c | ||
| :start-after: start-open-change-stream | ||
| :end-before: end-open-change-stream | ||
| :language: c | ||
| :copyable: | ||
| :dedent: | ||
|
|
||
| To begin watching for changes, run the application. Then, in a separate | ||
| application or shell, perform a write operation on the ``restaurants`` collection. The | ||
| following example updates a document in which the value of the ``name`` field is ``"Blarney Castle"``: | ||
|
|
||
| .. _c-change-stream-update: | ||
|
|
||
| .. literalinclude:: /includes/read/change-streams.c | ||
| :start-after: start-update-for-change-stream | ||
| :end-before: end-update-for-change-stream | ||
| :language: c | ||
| :copyable: | ||
| :dedent: | ||
|
|
||
| When you update the collection, the change stream application prints the change | ||
| as it occurs. The printed change event resembles the | ||
| following: | ||
|
|
||
| .. code-block:: json | ||
|
|
||
| { | ||
| "_id": { ... }, | ||
| "operationType": "update", | ||
| "clusterTime": { ... }, | ||
| "ns": { | ||
| "db": "sample_restaurants", | ||
| "coll": "restaurants" | ||
| }, | ||
| "updateDescription": { | ||
| "updatedFields": { | ||
| "cuisine": "Irish" | ||
| }, | ||
| "removedFields": [], | ||
| "truncatedArrays": [] | ||
| } | ||
| ... | ||
| } | ||
|
|
||
| Modify the Change Stream Output | ||
| ------------------------------- | ||
|
|
||
| You can pass the ``pipeline`` parameter to any watch function to modify the | ||
| change stream output. This parameter allows you to watch for only specified | ||
| change events. Format the parameter as a list of objects, where each object represents an | ||
| aggregation stage. | ||
|
|
||
| You can specify the following stages in the ``pipeline`` parameter: | ||
|
|
||
| - ``$addFields`` | ||
| - ``$match`` | ||
| - ``$project`` | ||
| - ``$replaceRoot`` | ||
| - ``$replaceWith`` | ||
| - ``$redact`` | ||
| - ``$set`` | ||
| - ``$unset`` | ||
|
|
||
| Match Specific Events Example | ||
| ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | ||
|
|
||
| The following example uses the ``pipeline`` parameter to include a ``$match`` stage | ||
| to open a change stream that records only update operations: | ||
|
|
||
| .. literalinclude:: /includes/read/change-streams.c | ||
| :start-after: start-change-stream-pipeline | ||
| :end-before: end-change-stream-pipeline | ||
| :language: c | ||
| :copyable: | ||
| :dedent: | ||
|
|
||
| To learn more about modifying your change stream output, see the | ||
| :manual:`Modify Change Stream Output | ||
| </changeStreams/#modify-change-stream-output>` section in the {+mdb-server+} | ||
| manual. | ||
|
|
||
| Modify Watch Behavior | ||
| --------------------- | ||
|
|
||
| You can modify any watch function by passing options to the function call. If you don't | ||
| specify any options, the driver does not customize the operation. | ||
|
|
||
| The following table describes options you can use to customize the behavior | ||
| of the watch functions: | ||
|
|
||
| .. list-table:: | ||
| :widths: 30 70 | ||
| :header-rows: 1 | ||
|
|
||
| * - Option | ||
| - Description | ||
|
|
||
| * - ``batchSize`` | ||
| - | Sets the number of documents to return per batch. | ||
|
|
||
|
|
||
| * - ``comment`` | ||
| - | Specifies a comment to attach to the operation. | ||
|
|
||
| * - ``fullDocument`` | ||
| - | Sets the ``fullDocument`` value. To learn more, see the | ||
| :ref:`<c-change-stream-pre-post-image>` section of this document. | ||
|
|
||
| * - ``fullDocumentBeforeChange`` | ||
| - | Sets the ``fullDocumentBeforeChange`` value. To learn more, see the | ||
| :ref:`<c-change-stream-pre-post-image>` section of this document. | ||
|
|
||
| * - ``maxAwaitTimeMS`` | ||
| - | Sets the maximum await execution time on the server for this operation, in | ||
| milliseconds. | ||
|
|
||
| For a complete list of options you can use to configure the watch operation, see | ||
| the :manual:`watch method </reference/method/db.collection.watch>` guide in the {+mdb-server+} | ||
| manual. | ||
|
|
||
| .. _c-change-stream-pre-post-image: | ||
|
|
||
| Include Pre-Images and Post-Images | ||
| ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | ||
|
|
||
| .. important:: | ||
|
|
||
| You can enable pre-images and post-images on collections only if your | ||
| deployment uses MongoDB v6.0 or later. | ||
|
|
||
| By default, when you perform an operation on a collection, the | ||
| corresponding change event includes only the delta of the fields | ||
| modified by that operation. To see the full document before or after a | ||
| change, specify the ``fullDocumentBeforeChange`` or the ``fullDocument`` | ||
| options in your watch function call. | ||
|
|
||
| The **pre-image** is the full version of a document *before* a change. To include the | ||
| pre-image in the change stream event, pass one of the following values to the | ||
| ``fullDocumentBeforeChange`` option: | ||
|
|
||
| - ``whenAvailable``: The change event includes a pre-image of the | ||
| modified document for change events only if the pre-image is available. | ||
| - ``required``: The change event includes a pre-image of the | ||
| modified document for change events. If the pre-image is not available, the | ||
| driver raises an error. | ||
|
|
||
| The **post-image** is the full version of a document *after* a change. To include the | ||
| post-image in the change stream event, pass one of the following values to the | ||
| ``fullDocument`` option: | ||
|
|
||
| - ``updateLookup``: The change event includes a copy of the entire changed | ||
| document from some time after the change. | ||
| - ``whenAvailable``: The change event includes a post-image of the | ||
| modified document for change events only if the post-image is available. | ||
| - ``required``: The change event includes a post-image of the | ||
| modified document for change events. If the post-image is not available, the | ||
| driver raises an error. | ||
|
|
||
| The following example calls the ``mongoc_collection_watch()`` function on a collection and | ||
| includes the post-image of updated documents in the results by specifying the | ||
| ``fullDocument`` option: | ||
|
|
||
| .. literalinclude:: /includes/read/change-streams.c | ||
| :start-after: start-change-stream-post-image | ||
| :end-before: end-change-stream-post-image | ||
| :language: c | ||
| :copyable: | ||
| :dedent: | ||
|
|
||
| With the change stream application running, updating a document in the | ||
| ``restaurants`` collection by using the :ref:`preceding update example | ||
| <c-change-stream-update>` prints a change event resembling the following: | ||
|
|
||
| .. code-block:: json | ||
|
|
||
| { | ||
| "_id": ..., | ||
| "operationType": "update", | ||
| "clusterTime": ..., | ||
| "wallTime": ..., | ||
| "fullDocument": { | ||
| "_id": { | ||
| ... | ||
| }, | ||
| "address": ..., | ||
| "borough": "Queens", | ||
| "cuisine": "Irish", | ||
| "grades": [ ... ], | ||
| "name": "Blarney Castle", | ||
| "restaurant_id": ... | ||
| }, | ||
| "ns": { | ||
| "db": "sample_restaurants", | ||
| "coll": "restaurants" | ||
| }, | ||
| "documentKey": { | ||
| "_id": ... | ||
| }, | ||
| "updateDescription": { | ||
| "updatedFields": { | ||
| "cuisine": "Irish" | ||
| }, | ||
| "removedFields": [], | ||
| "truncatedArrays": [] | ||
| } | ||
| } | ||
|
|
||
| To learn more about pre-images and post-images, see | ||
| :manual:`Change Streams with Document Pre- and Post-Images </changeStreams#change-streams-with-document-pre--and-post-images>` | ||
| in the {+mdb-server+} manual. | ||
|
|
||
| Additional Information | ||
| ---------------------- | ||
|
|
||
| To learn more about change streams, see :manual:`Change Streams | ||
| </changeStreams>` in the {+mdb-server+} manual. | ||
|
|
||
| API Documentation | ||
| ~~~~~~~~~~~~~~~~~ | ||
|
|
||
| To learn more about any of the functions or types discussed in this | ||
| guide, see the following API documentation: | ||
|
|
||
| - `mongoc_client_watch() <{+api-libmongoc+}/mongoc_client_watch.html>`__ | ||
| - `mongoc_database_watch() <{+api-libmongoc+}/mongoc_database_watch.html>`__ | ||
| - `mongoc_collection_watch() <{+api-libmongoc+}/mongoc_collection_watch.html>`__ | ||
| - `mongoc_change_stream_t <{+api-libmongoc+}/mongoc_change_stream_t.html>`__ |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
mongoc_change_stream_nextonly returns true when a change is returned within themaxAwaitTimeMS. If no change was returned, and no error occurred, a later call tomongoc_change_stream_nextcan still return a change.If the examples are intended to show the behavior of "wait (possibly forever) until there is an event or error", then I suggest looping as follows: