Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DOCSP-43081: Change Streams #37

Merged
merged 4 commits into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 98 additions & 0 deletions source/includes/read/change-streams.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
#include <bson/bson.h>
#include <mongoc/mongoc.h>
#include <stdio.h>

int
main (int argc, char *argv[])
{
mongoc_client_t *client;
mongoc_collection_t *collection;
mongoc_init ();

client =
mongoc_client_new ("<connection string URI>");
collection = mongoc_client_get_collection (client, "sample_restaurants", "restaurants");

{
// Opens a change stream on the collection and prints each change
// start-open-change-stream
bson_t *pipeline = bson_new ();
// Change stream will wait 1 minute for changes
bson_t *opts = BCON_NEW ("maxAwaitTimeMS", BCON_INT64 (60000));
const bson_t *doc;

mongoc_change_stream_t *change_stream =
mongoc_collection_watch (collection, pipeline, opts);

while (mongoc_change_stream_next (change_stream, &doc)) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mongoc_change_stream_next only returns true when a change is returned within the maxAwaitTimeMS. If no change was returned, and no error occurred, a later call to mongoc_change_stream_next can still return a change.

If the examples are intended to show the behavior of "wait (possibly forever) until there is an event or error", then I suggest looping as follows:

while (true)
{
  bson_error_t error;
  if (mongoc_change_stream_next(change_stream, &doc))
  {
    char *str = bson_as_canonical_extended_json(doc, NULL);
    printf("Received change: %s\n", str);
    bson_free(str);
  }
  else if (mongoc_change_stream_error_document(change_stream, &error, NULL))
  {
    printf("Got error on change stream: %s\n", error.message);
    break;
  }
}

char *str = bson_as_canonical_extended_json (doc, NULL);
printf ("Received change: %s\n", str);
bson_free (str);
}

bson_destroy (pipeline);
bson_destroy (opts);
mongoc_change_stream_destroy (change_stream);
// end-open-change-stream
}
{
// Updates a document in the collection to trigger a change event
// start-update-for-change-stream
bson_t *filter = BCON_NEW ("name", BCON_UTF8 ("Blarney Castle"));
bson_t *update = BCON_NEW("$set", "{", "cuisine", BCON_UTF8 ("Irish"), "}");

mongoc_collection_update_one (collection, filter, update, NULL, NULL, NULL);
// end-update-for-change-stream
}
{
// Opens a change stream on the collection and specifies a pipeline to only receive update events
// start-change-stream-pipeline
bson_t *pipeline = BCON_NEW (
"pipeline", "[",
"{", "$match", "{", "operationType", BCON_UTF8 ("update"), "}", "}",
"]");
const bson_t *doc;

mongoc_change_stream_t *change_stream =
mongoc_collection_watch (collection, pipeline, NULL);

while (mongoc_change_stream_next (change_stream, &doc)) {
char *str = bson_as_canonical_extended_json (doc, NULL);
printf ("Received change: %s\n", str);
bson_free (str);
}

bson_destroy (pipeline);
mongoc_change_stream_destroy (change_stream);
// end-change-stream-pipeline
}
{
// Opens a change stream on the collection and specifies options to receive the full document for update events
// start-change-stream-post-image
bson_t *pipeline = bson_new ();
// Change stream will wait 1 minute for changes
bson_t *opts = BCON_NEW ("maxAwaitTimeMS", BCON_INT64 (60000),
"fullDocument", BCON_UTF8 ("updateLookup"));
const bson_t *doc;

mongoc_change_stream_t *change_stream =
mongoc_collection_watch (collection, pipeline, opts);

while (mongoc_change_stream_next (change_stream, &doc)) {
char *str = bson_as_canonical_extended_json (doc, NULL);
printf ("Received change: %s\n", str);
bson_free (str);
}

bson_destroy (pipeline);
bson_destroy (opts);
mongoc_change_stream_destroy (change_stream);
// end-change-stream-post-image
}

mongoc_collection_destroy (collection);
mongoc_client_destroy (client);
mongoc_cleanup ();

return EXIT_SUCCESS;
}
6 changes: 3 additions & 3 deletions source/read.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ Read Data from MongoDB
/read/count
/read/distinct
/read/cursors
/read/change-streams

Overview
--------
Expand Down Expand Up @@ -152,6 +153,5 @@ subsequent change events in that collection:
:copyable:
:dedent:

.. TODO: uncomment when watch change stream guide is complete
.. To learn more about the ``mongoc_collection_watch()`` function, see the
.. :ref:`c-change-streams` guide.
To learn more about the ``mongoc_collection_watch()`` function, see the
:ref:`c-change-streams` guide.
278 changes: 278 additions & 0 deletions source/read/change-streams.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,278 @@
.. _c-change-streams:

====================
Monitor Data Changes
====================

.. contents:: On this page
:local:
:backlinks: none
:depth: 2
:class: singlecol

.. facet::
:name: genre
:values: reference

.. meta::
:keywords: watch, code example

Overview
--------

In this guide, you can learn how to use the {+driver-short+} to monitor a **change stream**,
allowing you to view real-time changes to your database. A change stream is a {+mdb-server+} feature that
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

S: Change to data since you can open a change stream on more than just a database

Suggested change
allowing you to view real-time changes to your database. A change stream is a {+mdb-server+} feature that
allowing you to view real-time changes to your data. A change stream is a {+mdb-server+} feature that

publishes data changes on a collection, database, or deployment. Your application can
subscribe to a change stream and use events to perform other actions.

Sample Data
~~~~~~~~~~~

The examples in this guide use the ``restaurants`` collection in the ``sample_restaurants``
database from the :atlas:`Atlas sample datasets </sample-data>`. To learn how to create a
free MongoDB Atlas cluster and load the sample datasets, see the
:atlas:`Get Started with Atlas </getting-started>` guide.

Open a Change Stream
--------------------

To open a change stream, call one of the following functions that corresponds to the
scope of events you want to observe.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
scope of events you want to observe.
scope of events you want to observe:


- ``mongoc_client_watch()``: To monitor all changes in the MongoDB deployment
- ``mongoc_database_watch()``: To monitor changes in all collections in the database
- ``mongoc_collection_watch()``: To monitor changes in the collection
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
- ``mongoc_client_watch()``: To monitor all changes in the MongoDB deployment
- ``mongoc_database_watch()``: To monitor changes in all collections in the database
- ``mongoc_collection_watch()``: To monitor changes in the collection
- ``mongoc_client_watch()``: Monitors all changes in the MongoDB deployment
- ``mongoc_database_watch()``: Monitors changes in all collections in the database
- ``mongoc_collection_watch()``: Monitors changes in the collection


Open a Change Stream Example
~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The following example opens a change stream on the ``restaurants`` collection
and prints changes as they occur:

.. literalinclude:: /includes/read/change-streams.c
:start-after: start-open-change-stream
:end-before: end-open-change-stream
:language: c
:copyable:
:dedent:

To begin watching for changes, run the application. Then, in a separate
application or shell, perform a write operation on the ``restaurants`` collection. The
following example updates a document in which the value of the ``name`` field is ``"Blarney Castle"``:

.. _c-change-stream-update:

.. literalinclude:: /includes/read/change-streams.c
:start-after: start-update-for-change-stream
:end-before: end-update-for-change-stream
:language: c
:copyable:
:dedent:

When you update the collection, the change stream application prints the change
as it occurs. The printed change event resembles the
following:

.. code-block:: json

{
"_id": { ... },
"operationType": "update",
"clusterTime": { ... },
"ns": {
"db": "sample_restaurants",
"coll": "restaurants"
},
"updateDescription": {
"updatedFields": {
"cuisine": "Irish"
},
"removedFields": [],
"truncatedArrays": []
}
...
}

Modify the Change Stream Output
-------------------------------

You can pass the ``pipeline`` parameter to any watch function to modify the
change stream output. This parameter allows you to watch for only specified
change events. Format the parameter as a list of objects that each represents an
aggregation stage.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

S:

Suggested change
change events. Format the parameter as a list of objects that each represents an
aggregation stage.
change events. Format the parameter as a list of objects, where each object represents an
aggregation stage.


You can specify the following stages in the ``pipeline`` parameter:

- ``$addFields``
- ``$match``
- ``$project``
- ``$replaceRoot``
- ``$replaceWith``
- ``$redact``
- ``$set``
- ``$unset``

Match Specific events Example
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Match Specific events Example
Match Specific Events Example

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The following example uses the ``pipeline`` parameter to include a ``$match`` stage
to open a change stream that only records update operations:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
to open a change stream that only records update operations:
to open a change stream that records only update operations:


.. literalinclude:: /includes/read/change-streams.c
:start-after: start-change-stream-pipeline
:end-before: end-change-stream-pipeline
:language: c
:copyable:
:dedent:

To learn more about modifying your change stream output, see the
:manual:`Modify Change Stream Output
</changeStreams/#modify-change-stream-output>` section in the {+mdb-server+}
manual.

Modify Watch Behavior
---------------------

You can modify any watch function by passing options to the function call. If you don't
specify any options, the driver does not customize the operation.

The following table describes options you can use to customize the behavior
of the watch functions:

.. list-table::
:widths: 30 70
:header-rows: 1

* - Option
- Description

* - ``batchSize``
- | Sets the number of documents to return per batch.


* - ``comment``
- | Specifies a comment to attach to the operation.

* - ``fullDocument``
- | Sets the ``fullDocument`` value. To learn more, see the
:ref:`<c-change-stream-pre-post-image>` section of this document.

* - ``fullDocumentBeforeChange``
- | Sets the ``fullDocumentBeforeChange`` value. To learn more, see the
:ref:`<c-change-stream-pre-post-image>` section of this document.

* - ``maxAwaitTimeMS``
- | Sets the maximum await execution time on the server for this operation, in
milliseconds.

For a complete list of options you can use to configure the watch operation, see
the :manual:`watch method </reference/method/db.collection.watch>` guide in the {+mdb-server+}
manual.

.. _c-change-stream-pre-post-image:

Include Pre-Images and Post-Images
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. important::

You can enable pre-images and post-images on collections only if your
deployment uses MongoDB v6.0 or later.

By default, when you perform an operation on a collection, the
corresponding change event includes only the delta of the fields
modified by that operation. To see the full document before or after a
change, specify the ``fullDocumentBeforeChange`` or the ``fullDocument``
options in your watch function call.

The **pre-image** is the full version of a document *before* a change. To include the
pre-image in the change stream event, pass one of the following values to the
``fullDocumentBeforeChange`` option:

- ``whenAvailable``: The change event includes a pre-image of the
modified document for change events only if the pre-image is available.
- ``required``: The change event includes a pre-image of the
modified document for change events. If the pre-image is not available, the
driver raises an error.

The **post-image** is the full version of a document *after* a change. To include the
post-image in the change stream event, pass one of the following values to the
``fullDocument`` option:

- ``updateLookup``: The change event includes a copy of the entire changed
document from some time after the change.
- ``whenAvailable``: The change event includes a post-image of the
modified document for change events only if the post-image is available.
- ``required``: The change event includes a post-image of the
modified document for change events. If the post-image is not available, the
driver raises an error.

The following example calls the ``mongoc_collection_watch()`` function on a collection and
includes the post-image of updated documents in the results by specifying the
``fullDocument`` option:

.. literalinclude:: /includes/read/change-streams.c
:start-after: start-change-stream-post-image
:end-before: end-change-stream-post-image
:language: c
:copyable:
:dedent:

With the change stream application running, updating a document in the
``restaurants`` collection by using the :ref:`preceding update example
<c-change-stream-update>` prints a change event resembling the following:

.. code-block:: json

{
"_id": ...,
"operationType": "update",
"clusterTime": ...,
"wallTime": ...,
"fullDocument": {
"_id": {
...
},
"address": ...,
"borough": "Queens",
"cuisine": "Irish",
"grades": [ ... ],
"name": "Blarney Castle",
"restaurant_id": ...
},
"ns": {
"db": "sample_restaurants",
"coll": "restaurants"
},
"documentKey": {
"_id": ...
},
"updateDescription": {
"updatedFields": {
"cuisine": "Irish"
},
"removedFields": [],
"truncatedArrays": []
}
}

To learn more about pre-images and post-images, see
:manual:`Change Streams with Document Pre- and Post-Images </changeStreams#change-streams-with-document-pre--and-post-images>`
in the {+mdb-server+} manual.

Additional Information
----------------------

To learn more about change streams, see :manual:`Change Streams
</changeStreams>` in the {+mdb-server+} manual.

API Documentation
~~~~~~~~~~~~~~~~~

To learn more about any of the functions or types discussed in this
guide, see the following API documentation:

- `mongoc_client_watch() <{+api-libmongoc+}/mongoc_client_watch.html>`__
- `mongoc_database_watch() <{+api-libmongoc+}/mongoc_database_watch.html>`__
- `mongoc_collection_watch() <{+api-libmongoc+}/mongoc_collection_watch.html>`__
- `mongoc_change_stream_t <{+api-libmongoc+}/mongoc_change_stream_t.html>`__
Loading