Skip to content

Commit e721d24

Browse files
authored
DOCSP-34008: Split large change events (#374)
* DOCSP-34008: Split large change events * fixes * edits * fixes to async code * fixes to sync code * wording * admonition * small fix * MM feedback * FP feedback * tech feedback * edits * final tech feedback
1 parent 6f4000d commit e721d24

File tree

2 files changed

+194
-11
lines changed

2 files changed

+194
-11
lines changed

source/fundamentals/crud/read-operations/change-streams.txt

+81-7
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ for only specified change events. Create the pipeline by using the
111111
You can specify the following aggregation stages in the ``pipeline`` parameter:
112112

113113
- ``$addFields``
114+
- ``$changeStreamSplitLargeEvent``
114115
- ``$match``
115116
- ``$project``
116117
- ``$replaceRoot``
@@ -119,9 +120,19 @@ You can specify the following aggregation stages in the ``pipeline`` parameter:
119120
- ``$set``
120121
- ``$unset``
121122

122-
To learn how to build an aggregation pipeline by using the
123-
``PipelineDefinitionBuilder`` class, see :ref:`csharp-builders-aggregation` in
124-
the Operations with Builders guide.
123+
.. tip::
124+
125+
To learn how to build an aggregation pipeline by using the
126+
``PipelineDefinitionBuilder`` class, see :ref:`csharp-builders-aggregation` in
127+
the Operations with Builders guide.
128+
129+
To learn more about modifying your change stream output, see the
130+
:manual:`Modify Change Stream Output
131+
</changeStreams/#modify-change-stream-output>` section in the {+mdb-server+}
132+
manual.
133+
134+
Monitor Update Events Example
135+
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
125136

126137
The following example uses the ``pipeline`` parameter to open a change stream
127138
that records only update operations. Select the :guilabel:`Asynchronous` or :guilabel:`Synchronous` tab to see the
@@ -145,10 +156,73 @@ corresponding code.
145156
:end-before: end-change-stream-pipeline
146157
:language: csharp
147158

148-
To learn more about modifying your change stream output, see the
149-
:manual:`Modify Change Stream Output
150-
</changeStreams/#modify-change-stream-output>` section in the {+mdb-server+}
151-
manual.
159+
Split Large Change Events Example
160+
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
161+
162+
If your application generates change events that exceed 16 MB in size, the
163+
server returns a ``BSONObjectTooLarge`` error. To avoid this error, you can use
164+
the ``$changeStreamSplitLargeEvent`` pipeline stage to split the events
165+
into smaller fragments. The {+driver-short+} aggregation API includes the
166+
``ChangeStreamSplitLargeEvent()`` method, which you can use to add the
167+
``$changeStreamSplitLargeEvent`` stage to the change stream pipeline.
168+
169+
This example instructs the driver to watch for changes and split
170+
change events that exceed the 16 MB limit. The code prints the
171+
change document for each event and calls helper methods to
172+
reassemble any event fragments:
173+
174+
.. tabs::
175+
176+
.. tab:: Asynchronous
177+
:tabid: change-stream-split-async
178+
179+
.. literalinclude:: /includes/code-examples/change-streams/change-streams.cs
180+
:start-after: start-split-change-event-async
181+
:end-before: end-split-change-event-async
182+
:language: csharp
183+
184+
.. tab:: Synchronous
185+
:tabid: change-stream-split-sync
186+
187+
.. literalinclude:: /includes/code-examples/change-streams/change-streams.cs
188+
:start-after: start-split-change-event-sync
189+
:end-before: end-split-change-event-sync
190+
:language: csharp
191+
192+
.. note::
193+
194+
We recommend reassembling change event fragments, as shown in the
195+
preceding example, but this step is optional. You can use the same
196+
logic to watch both split and complete change events.
197+
198+
The preceding example uses the ``GetNextChangeStreamEvent()``,
199+
``GetNextChangeStreamEventAsync()``, and ``MergeFragment()``
200+
methods to reassemble change event fragments into a single change stream document.
201+
The following code defines these methods:
202+
203+
.. tabs::
204+
205+
.. tab:: Asynchronous
206+
:tabid: split-event-helpers-async
207+
208+
.. literalinclude:: /includes/code-examples/change-streams/change-streams.cs
209+
:start-after: start-split-event-helpers-async
210+
:end-before: end-split-event-helpers-async
211+
:language: csharp
212+
213+
.. tab:: Synchronous
214+
:tabid: split-event-helpers-sync
215+
216+
.. literalinclude:: /includes/code-examples/change-streams/change-streams.cs
217+
:start-after: start-split-event-helpers-sync
218+
:end-before: end-split-event-helpers-sync
219+
:language: csharp
220+
221+
.. tip::
222+
223+
To learn more about splitting large change events, see
224+
:manual:`$changeStreamSplitLargeEvent </reference/operator/aggregation/changeStreamSplitLargeEvent/>`
225+
in the {+mdb-server+} manual.
152226

153227
Modify ``Watch()`` Behavior
154228
---------------------------

source/includes/code-examples/change-streams/change-streams.cs

+113-4
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ await cursor.ForEachAsync(change =>
4242
.Match(change => change.OperationType == ChangeStreamOperationType.Update);
4343

4444
// Opens a change stream and prints the changes as they're received
45-
using (var cursor = await _restaurantsCollection.WatchAsync(pipeline))
45+
using (var cursor = await collection.WatchAsync(pipeline))
4646
{
4747
await cursor.ForEachAsync(change =>
4848
{
@@ -56,7 +56,7 @@ await cursor.ForEachAsync(change =>
5656
.Match(change => change.OperationType == ChangeStreamOperationType.Update);
5757

5858
// Opens a change streams and print the changes as they're received
59-
using (var cursor = _restaurantsCollection.Watch(pipeline))
59+
using (var cursor = collection.Watch(pipeline))
6060
{
6161
foreach (var change in cursor.ToEnumerable())
6262
{
@@ -65,6 +65,115 @@ await cursor.ForEachAsync(change =>
6565
}
6666
// end-change-stream-pipeline
6767

68+
// start-split-event-helpers-sync
69+
// Fetches the next complete change stream event
70+
private static IEnumerable<ChangeStreamDocument<TDocument>> GetNextChangeStreamEvent<TDocument>(
71+
IEnumerator<ChangeStreamDocument<TDocument>> changeStreamEnumerator)
72+
{
73+
while (changeStreamEnumerator.MoveNext())
74+
{
75+
var changeStreamEvent = changeStreamEnumerator.Current;
76+
if (changeStreamEvent.SplitEvent != null)
77+
{
78+
var fragment = changeStreamEvent;
79+
while (fragment.SplitEvent.Fragment < fragment.SplitEvent.Of)
80+
{
81+
changeStreamEnumerator.MoveNext();
82+
fragment = changeStreamEnumerator.Current;
83+
MergeFragment(changeStreamEvent, fragment);
84+
}
85+
}
86+
yield return changeStreamEvent;
87+
}
88+
}
89+
90+
// Merges a fragment into the base event
91+
private static void MergeFragment<TDocument>(
92+
ChangeStreamDocument<TDocument> changeStreamEvent,
93+
ChangeStreamDocument<TDocument> fragment)
94+
{
95+
foreach (var element in fragment.BackingDocument)
96+
{
97+
if (element.Name != "_id" && element.Name != "splitEvent")
98+
{
99+
changeStreamEvent.BackingDocument[element.Name] = element.Value;
100+
}
101+
}
102+
}
103+
// end-split-event-helpers-sync
104+
105+
// start-split-event-helpers-async
106+
// Fetches the next complete change stream event
107+
private static async IAsyncEnumerable<ChangeStreamDocument<TDocument>> GetNextChangeStreamEventAsync<TDocument>(
108+
IAsyncCursor<ChangeStreamDocument<TDocument>> changeStreamCursor)
109+
{
110+
var changeStreamEnumerator = GetNextChangeStreamEventFragmentAsync(changeStreamCursor).GetAsyncEnumerator();
111+
while (await changeStreamEnumerator.MoveNextAsync())
112+
{
113+
var changeStreamEvent = changeStreamEnumerator.Current;
114+
if (changeStreamEvent.SplitEvent != null)
115+
{
116+
var fragment = changeStreamEvent;
117+
while (fragment.SplitEvent.Fragment < fragment.SplitEvent.Of)
118+
{
119+
await changeStreamEnumerator.MoveNextAsync();
120+
fragment = changeStreamEnumerator.Current;
121+
MergeFragment(changeStreamEvent, fragment);
122+
}
123+
}
124+
yield return changeStreamEvent;
125+
}
126+
}
127+
128+
private static async IAsyncEnumerable<ChangeStreamDocument<TDocument>> GetNextChangeStreamEventFragmentAsync<TDocument>(
129+
IAsyncCursor<ChangeStreamDocument<TDocument>> changeStreamCursor)
130+
{
131+
while (await changeStreamCursor.MoveNextAsync())
132+
{
133+
foreach (var changeStreamEvent in changeStreamCursor.Current)
134+
{
135+
yield return changeStreamEvent;
136+
}
137+
}
138+
}
139+
140+
// Merges a fragment into the base event
141+
private static void MergeFragment<TDocument>(
142+
ChangeStreamDocument<TDocument> changeStreamEvent,
143+
ChangeStreamDocument<TDocument> fragment)
144+
{
145+
foreach (var element in fragment.BackingDocument)
146+
{
147+
if (element.Name != "_id" && element.Name != "splitEvent")
148+
{
149+
changeStreamEvent.BackingDocument[element.Name] = element.Value;
150+
}
151+
}
152+
}
153+
// end-split-event-helpers-async
154+
155+
// start-split-change-event-sync
156+
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>()
157+
.ChangeStreamSplitLargeEvent();
158+
159+
using var cursor = collection.Watch(pipeline);
160+
foreach (var completeEvent in GetNextChangeStreamEvent(cursor.ToEnumerable().GetEnumerator()))
161+
{
162+
Console.WriteLine("Received the following change: " + completeEvent.BackingDocument);
163+
}
164+
// end-split-change-event-sync
165+
166+
// start-split-change-event-async
167+
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>()
168+
.ChangeStreamSplitLargeEvent();
169+
170+
using var cursor = await collection.WatchAsync(pipeline);
171+
await foreach (var completeEvent in GetNextChangeStreamEventAsync(cursor))
172+
{
173+
Console.WriteLine("Received the following change: " + completeEvent.BackingDocument);
174+
}
175+
// end-split-change-event-async
176+
68177
// start-change-stream-post-image
69178
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>()
70179
.Match(change => change.OperationType == ChangeStreamOperationType.Update);
@@ -74,7 +183,7 @@ await cursor.ForEachAsync(change =>
74183
FullDocument = ChangeStreamFullDocumentOption.UpdateLookup,
75184
};
76185

77-
using (var cursor = _restaurantsCollection.Watch(pipeline, options))
186+
using (var cursor = collection.Watch(pipeline, options))
78187
{
79188
foreach (var change in cursor.ToEnumerable())
80189
{
@@ -92,7 +201,7 @@ await cursor.ForEachAsync(change =>
92201
FullDocument = ChangeStreamFullDocumentOption.UpdateLookup,
93202
};
94203

95-
using var cursor = await _restaurantsCollection.WatchAsync(pipeline, options);
204+
using var cursor = await collection.WatchAsync(pipeline, options);
96205
await cursor.ForEachAsync(change =>
97206
{
98207
Console.WriteLine(change.FullDocument.ToBsonDocument());

0 commit comments

Comments
 (0)