From 65aa3aca22e292bd72e71121458f6f757483bffb Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 4 Dec 2025 12:48:05 -0500 Subject: [PATCH 1/5] save --- internal/verifier/change_reader.go | 4 ++ internal/verifier/change_stream.go | 53 ++++++++++++++----- internal/verifier/change_stream_test.go | 43 +++++++++++++++ internal/verifier/migration_verifier.go | 13 +++++ mmongo/cursor.go | 69 +++++++++++++++++++++++++ 5 files changed, 168 insertions(+), 14 deletions(-) create mode 100644 mmongo/cursor.go diff --git a/internal/verifier/change_reader.go b/internal/verifier/change_reader.go index 056fdbc5..75bda5e9 100644 --- a/internal/verifier/change_reader.go +++ b/internal/verifier/change_reader.go @@ -156,6 +156,10 @@ func (rc *ChangeReaderCommon) getEventsPerSecond() option.Option[float64] { } func (rc *ChangeReaderCommon) persistResumeToken(ctx context.Context, token bson.Raw) error { + if len(token) == 0 { + panic("internal error: resume token is empty but should never be") + } + coll := rc.metaDB.Collection(changeReaderCollectionName) _, err := coll.ReplaceOne( ctx, diff --git a/internal/verifier/change_stream.go b/internal/verifier/change_stream.go index 81732825..5e5ef327 100644 --- a/internal/verifier/change_stream.go +++ b/internal/verifier/change_stream.go @@ -9,6 +9,7 @@ import ( "github.com/10gen/migration-verifier/internal/retry" "github.com/10gen/migration-verifier/internal/util" "github.com/10gen/migration-verifier/mbson" + "github.com/10gen/migration-verifier/mmongo" "github.com/10gen/migration-verifier/option" mapset "github.com/deckarep/golang-set/v2" clone "github.com/huandu/go-clone/generic" @@ -376,11 +377,13 @@ func (csr *ChangeStreamReader) createChangeStream( csStartLogEvent := csr.logger.Info() - if token, hasToken := savedResumeToken.Get(); hasToken { + resumetoken, hasSavedToken := savedResumeToken.Get() + + if hasSavedToken { logEvent := csStartLogEvent. - Stringer(csr.resumeTokenDocID(), token) + Stringer(csr.resumeTokenDocID(), resumetoken) - ts, err := csr.resumeTokenTSExtractor(token) + ts, err := csr.resumeTokenTSExtractor(resumetoken) if err == nil { logEvent = addTimestampToLogEvent(ts, logEvent) } else { @@ -392,9 +395,9 @@ func (csr *ChangeStreamReader) createChangeStream( logEvent.Msg("Starting change stream from persisted resume token.") if util.ClusterHasChangeStreamStartAfter([2]int(csr.clusterInfo.VersionArray)) { - opts = opts.SetStartAfter(token) + opts = opts.SetStartAfter(resumetoken) } else { - opts = opts.SetResumeAfter(token) + opts = opts.SetResumeAfter(resumetoken) } } else { csStartLogEvent.Msgf("Starting change stream from current %s cluster time.", csr.readerType) @@ -410,9 +413,22 @@ func (csr *ChangeStreamReader) createChangeStream( return nil, nil, bson.Timestamp{}, errors.Wrap(err, "opening change stream") } - err = csr.persistResumeToken(ctx, changeStream.ResumeToken()) - if err != nil { - return nil, nil, bson.Timestamp{}, err + if !hasSavedToken { + // Usually the change stream’s initial response is empty, but sometimes + // there are events right away. We can discard those events because + // they’ve already happened, and our initial scan is yet to come. + if len(changeStream.ResumeToken()) == 0 { + _, _, err := mmongo.GetBatch(ctx, changeStream, nil, nil) + + if err != nil { + return nil, nil, bson.Timestamp{}, errors.Wrap(err, "discarding change stream’s initial events") + } + } + + err = csr.persistResumeToken(ctx, changeStream.ResumeToken()) + if err != nil { + return nil, nil, bson.Timestamp{}, errors.Wrapf(err, "persisting initial resume token") + } } startTs, err := csr.resumeTokenTSExtractor(changeStream.ResumeToken()) @@ -428,14 +444,19 @@ func (csr *ChangeStreamReader) createChangeStream( return nil, nil, bson.Timestamp{}, errors.Wrap(err, "failed to read cluster time from session") } - csr.logger.Debug(). - Any("resumeTokenTimestamp", startTs). - Any("clusterTime", clusterTime). - Stringer("changeStreamReader", csr). - Msg("Using earlier time as start timestamp.") - if startTs.After(clusterTime) { + csr.logger.Debug(). + Any("resumeTokenTimestamp", startTs). + Any("clusterTime", clusterTime). + Stringer("changeStreamReader", csr). + Msg("Cluster time predates resume token; using it as start timestamp.") + startTs = clusterTime + } else { + csr.logger.Debug(). + Any("resumeTokenTimestamp", startTs). + Stringer("changeStreamReader", csr). + Msg("Got start timestamp from change stream.") } return changeStream, sess, startTs, nil @@ -532,6 +553,10 @@ func (csr *ChangeStreamReader) String() string { } func extractTSFromChangeStreamResumeToken(resumeToken bson.Raw) (bson.Timestamp, error) { + if len(resumeToken) == 0 { + panic("internal error: resume token is empty but should never be") + } + // Change stream token is always a V1 keystring in the _data field tokenDataRV, err := resumeToken.LookupErr("_data") diff --git a/internal/verifier/change_stream_test.go b/internal/verifier/change_stream_test.go index 62a39bca..7c44163a 100644 --- a/internal/verifier/change_stream_test.go +++ b/internal/verifier/change_stream_test.go @@ -28,6 +28,49 @@ import ( "golang.org/x/sync/errgroup" ) +func (suite *IntegrationTestSuite) TestChangeStreamFilter_InitialNonempty() { + //zerolog.SetGlobalLevel(zerolog.TraceLevel) // gets restored automatically + + ctx := suite.Context() + dbName := suite.DBNameForTest() + + go func() { + for ctx.Err() == nil { + coll := suite.srcMongoClient. + Database(dbName). + Collection("coll") + + _, _ = coll.InsertOne(ctx, bson.D{{"_id", 123}}) + _, _ = coll.DeleteOne(ctx, bson.D{{"_id", 123}}) + } + }() + + for i := range 100 { + suite.Run( + fmt.Sprint(i), + func() { + ctx, cancel := contextplus.WithCancelCause(ctx) + defer cancel(fmt.Errorf("subtest is done")) + + verifier := suite.BuildVerifier() + + rdr, ok := verifier.srcChangeReader.(*ChangeStreamReader) + if !ok { + suite.T().Skipf("source change reader is a %T; this test needs a %T", verifier.srcChangeReader, rdr) + } + + eg, egCtx := contextplus.ErrGroup(ctx) + suite.Require().NoError(rdr.start(egCtx, eg)) + + suite.Require().NoError( + verifier.metaClient.Database(verifier.metaDBName).Drop(ctx), + ) + }, + ) + + } +} + func (suite *IntegrationTestSuite) TestChangeStreamFilter_NoNamespaces() { ctx := suite.Context() diff --git a/internal/verifier/migration_verifier.go b/internal/verifier/migration_verifier.go index e23445cb..ff8eccd6 100644 --- a/internal/verifier/migration_verifier.go +++ b/internal/verifier/migration_verifier.go @@ -25,8 +25,10 @@ import ( "github.com/dustin/go-humanize" "github.com/olekukonko/tablewriter" "github.com/pkg/errors" + "github.com/rs/zerolog" "github.com/samber/lo" "go.mongodb.org/mongo-driver/v2/bson" + "go.mongodb.org/mongo-driver/v2/event" "go.mongodb.org/mongo-driver/v2/mongo" "go.mongodb.org/mongo-driver/v2/mongo/options" "go.mongodb.org/mongo-driver/v2/mongo/readconcern" @@ -220,6 +222,17 @@ func (verifier *Verifier) getClientOpts(uri string) *options.ClientOptions { opts.SetReadConcern(readconcern.Majority()) }) + if verifier.logger.GetLevel() <= zerolog.TraceLevel { + opts.SetMonitor(&event.CommandMonitor{ + Succeeded: func(ctx context.Context, cse *event.CommandSucceededEvent) { + verifier.logger.Trace(). + Str("commandName", cse.CommandName). + Stringer("reply", cse.Reply). + Msg("Command succeeded.") + }, + }) + } + return opts } diff --git a/mmongo/cursor.go b/mmongo/cursor.go new file mode 100644 index 00000000..0c1e4827 --- /dev/null +++ b/mmongo/cursor.go @@ -0,0 +1,69 @@ +package mmongo + +import ( + "context" + "fmt" + "slices" + + "github.com/pkg/errors" + "go.mongodb.org/mongo-driver/v2/bson" + "go.mongodb.org/mongo-driver/v2/mongo" +) + +type cursorLike interface { + TryNext(context.Context) bool + RemainingBatchLength() int + Err() error +} + +// GetBatch returns a batch of documents from a cursor. It does so by appending +// to passed-in slices, which lets you optimize memory handling. +func GetBatch[T cursorLike]( + ctx context.Context, + cursor T, + docs []bson.Raw, + buffer []byte, +) ([]bson.Raw, []byte, error) { + var docsCount, expectedCount int + + var curDoc bson.Raw + + for hasDocs := true; hasDocs; hasDocs = cursor.RemainingBatchLength() > 0 { + got := cursor.TryNext(ctx) + + if cursor.Err() != nil { + return nil, nil, errors.Wrap(cursor.Err(), "cursor iteration failed") + } + + if !got { + if docsCount != 0 { + panic(fmt.Sprintf("Docs batch ended after %d but expected %d", docsCount, expectedCount)) + } + + break + } + + // This ensures we only reallocate once (if at all): + if docsCount == 0 { + expectedCount = 1 + cursor.RemainingBatchLength() + docs = slices.Grow(docs, expectedCount) + } + + docsCount++ + + switch typedCursor := any(cursor).(type) { + case *mongo.Cursor: + curDoc = typedCursor.Current + case *mongo.ChangeStream: + curDoc = typedCursor.Current + default: + panic(fmt.Sprintf("unknown cursor type: %T", cursor)) + } + + docPos := len(buffer) + buffer = append(buffer, curDoc...) + docs = append(docs, buffer[docPos:]) + } + + return docs, buffer, nil +} From e9589bfe2b5aacc7fc18f1a81a38030a7b765d2f Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 4 Dec 2025 12:59:32 -0500 Subject: [PATCH 2/5] add test of GetBatch --- mmongo/cursor_all_test.go | 66 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/mmongo/cursor_all_test.go b/mmongo/cursor_all_test.go index aec794d1..9b3495d2 100644 --- a/mmongo/cursor_all_test.go +++ b/mmongo/cursor_all_test.go @@ -10,8 +10,74 @@ import ( "go.mongodb.org/mongo-driver/v2/bson" "go.mongodb.org/mongo-driver/v2/mongo" "go.mongodb.org/mongo-driver/v2/mongo/options" + "go.mongodb.org/mongo-driver/v2/mongo/readconcern" + "go.mongodb.org/mongo-driver/v2/mongo/writeconcern" ) +func TestGetBatch(t *testing.T) { + ctx := t.Context() + + connStr := os.Getenv("MVTEST_META") + if connStr == "" { + t.Skipf("No MVTEST_META found; skipping.") + } + + client, err := mongo.Connect( + options.Client().ApplyURI(connStr), + ) + require.NoError(t, err) + + coll := client.Database(t.Name()).Collection( + "coll", + options.Collection(). + SetWriteConcern(writeconcern.Majority()). + SetReadConcern(readconcern.Majority()), + ) + + sess, err := client.StartSession(options.Session().SetCausalConsistency(true)) + require.NoError(t, err) + + sctx := mongo.NewSessionContext(ctx, sess) + + docsCount := 1_000 + const batchSize = 100 + + _, err = coll.InsertMany( + sctx, + lo.RepeatBy( + docsCount, + func(index int) any { + return bson.D{} + }, + ), + ) + require.NoError(t, err) + + cursor, err := coll.Find( + ctx, + bson.D{}, + options.Find().SetBatchSize(batchSize), + ) + require.NoError(t, err) + + cursor.SetBatchSize(batchSize) + + var docs []bson.Raw + var buf []byte + for range docsCount / batchSize { + docs = docs[:0] + buf = buf[:0] + + docs, buf, err = GetBatch(ctx, cursor, docs, buf) + require.NoError(t, err) + + assert.Len(t, docs, 100, "should get expected batch") + } + + assert.False(t, cursor.TryNext(ctx), "cursor should be done") + require.NoError(t, cursor.Err(), "should be no error") +} + func TestUnmarshalCursor(t *testing.T) { ctx := t.Context() From 9fd12c375b5beb425b4490ffa0d17c92c510b891 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 4 Dec 2025 13:05:52 -0500 Subject: [PATCH 3/5] dedupe --- mmongo/{cursor_all_test.go => cursor_test.go} | 34 +++++++++---------- 1 file changed, 16 insertions(+), 18 deletions(-) rename mmongo/{cursor_all_test.go => cursor_test.go} (92%) diff --git a/mmongo/cursor_all_test.go b/mmongo/cursor_test.go similarity index 92% rename from mmongo/cursor_all_test.go rename to mmongo/cursor_test.go index 9b3495d2..f7c2a253 100644 --- a/mmongo/cursor_all_test.go +++ b/mmongo/cursor_test.go @@ -17,15 +17,7 @@ import ( func TestGetBatch(t *testing.T) { ctx := t.Context() - connStr := os.Getenv("MVTEST_META") - if connStr == "" { - t.Skipf("No MVTEST_META found; skipping.") - } - - client, err := mongo.Connect( - options.Client().ApplyURI(connStr), - ) - require.NoError(t, err) + client := getClientFromEnv(t) coll := client.Database(t.Name()).Collection( "coll", @@ -81,15 +73,7 @@ func TestGetBatch(t *testing.T) { func TestUnmarshalCursor(t *testing.T) { ctx := t.Context() - connStr := os.Getenv("MVTEST_META") - if connStr == "" { - t.Skipf("No MVTEST_META found; skipping.") - } - - client, err := mongo.Connect( - options.Client().ApplyURI(connStr), - ) - require.NoError(t, err) + client := getClientFromEnv(t) cursor, err := client.Database("admin").Aggregate( ctx, @@ -121,6 +105,20 @@ func TestUnmarshalCursor(t *testing.T) { ) } +func getClientFromEnv(t *testing.T) *mongo.Client { + connStr := os.Getenv("MVTEST_META") + if connStr == "" { + t.Skipf("No MVTEST_META found; skipping.") + } + + client, err := mongo.Connect( + options.Client().ApplyURI(connStr), + ) + require.NoError(t, err) + + return client +} + type unmarshaler struct { Foo int32 } From 7f459403fab260f58a6488fd50a92e55893208b4 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 4 Dec 2025 13:51:29 -0500 Subject: [PATCH 4/5] remove trace-level monitor --- internal/verifier/migration_verifier.go | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/internal/verifier/migration_verifier.go b/internal/verifier/migration_verifier.go index ff8eccd6..e23445cb 100644 --- a/internal/verifier/migration_verifier.go +++ b/internal/verifier/migration_verifier.go @@ -25,10 +25,8 @@ import ( "github.com/dustin/go-humanize" "github.com/olekukonko/tablewriter" "github.com/pkg/errors" - "github.com/rs/zerolog" "github.com/samber/lo" "go.mongodb.org/mongo-driver/v2/bson" - "go.mongodb.org/mongo-driver/v2/event" "go.mongodb.org/mongo-driver/v2/mongo" "go.mongodb.org/mongo-driver/v2/mongo/options" "go.mongodb.org/mongo-driver/v2/mongo/readconcern" @@ -222,17 +220,6 @@ func (verifier *Verifier) getClientOpts(uri string) *options.ClientOptions { opts.SetReadConcern(readconcern.Majority()) }) - if verifier.logger.GetLevel() <= zerolog.TraceLevel { - opts.SetMonitor(&event.CommandMonitor{ - Succeeded: func(ctx context.Context, cse *event.CommandSucceededEvent) { - verifier.logger.Trace(). - Str("commandName", cse.CommandName). - Stringer("reply", cse.Reply). - Msg("Command succeeded.") - }, - }) - } - return opts } From 73c31a86904b6bf9264519e65018e2a876ab5978 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 4 Dec 2025 13:51:42 -0500 Subject: [PATCH 5/5] uncomment --- internal/verifier/change_stream_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/verifier/change_stream_test.go b/internal/verifier/change_stream_test.go index 7c44163a..5066e4f8 100644 --- a/internal/verifier/change_stream_test.go +++ b/internal/verifier/change_stream_test.go @@ -29,7 +29,7 @@ import ( ) func (suite *IntegrationTestSuite) TestChangeStreamFilter_InitialNonempty() { - //zerolog.SetGlobalLevel(zerolog.TraceLevel) // gets restored automatically + zerolog.SetGlobalLevel(zerolog.TraceLevel) // gets restored automatically ctx := suite.Context() dbName := suite.DBNameForTest()