Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions internal/verifier/change_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@ func (rc *ChangeReaderCommon) getEventsPerSecond() option.Option[float64] {
}

func (rc *ChangeReaderCommon) persistResumeToken(ctx context.Context, token bson.Raw) error {
if len(token) == 0 {
panic("internal error: resume token is empty but should never be")
}

coll := rc.metaDB.Collection(changeReaderCollectionName)
_, err := coll.ReplaceOne(
ctx,
Expand Down
53 changes: 39 additions & 14 deletions internal/verifier/change_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/10gen/migration-verifier/internal/retry"
"github.com/10gen/migration-verifier/internal/util"
"github.com/10gen/migration-verifier/mbson"
"github.com/10gen/migration-verifier/mmongo"
"github.com/10gen/migration-verifier/option"
mapset "github.com/deckarep/golang-set/v2"
clone "github.com/huandu/go-clone/generic"
Expand Down Expand Up @@ -376,11 +377,13 @@ func (csr *ChangeStreamReader) createChangeStream(

csStartLogEvent := csr.logger.Info()

if token, hasToken := savedResumeToken.Get(); hasToken {
resumetoken, hasSavedToken := savedResumeToken.Get()

if hasSavedToken {
logEvent := csStartLogEvent.
Stringer(csr.resumeTokenDocID(), token)
Stringer(csr.resumeTokenDocID(), resumetoken)

ts, err := csr.resumeTokenTSExtractor(token)
ts, err := csr.resumeTokenTSExtractor(resumetoken)
if err == nil {
logEvent = addTimestampToLogEvent(ts, logEvent)
} else {
Expand All @@ -392,9 +395,9 @@ func (csr *ChangeStreamReader) createChangeStream(
logEvent.Msg("Starting change stream from persisted resume token.")

if util.ClusterHasChangeStreamStartAfter([2]int(csr.clusterInfo.VersionArray)) {
opts = opts.SetStartAfter(token)
opts = opts.SetStartAfter(resumetoken)
} else {
opts = opts.SetResumeAfter(token)
opts = opts.SetResumeAfter(resumetoken)
}
} else {
csStartLogEvent.Msgf("Starting change stream from current %s cluster time.", csr.readerType)
Expand All @@ -410,9 +413,22 @@ func (csr *ChangeStreamReader) createChangeStream(
return nil, nil, bson.Timestamp{}, errors.Wrap(err, "opening change stream")
}

err = csr.persistResumeToken(ctx, changeStream.ResumeToken())
if err != nil {
return nil, nil, bson.Timestamp{}, err
if !hasSavedToken {
// Usually the change stream’s initial response is empty, but sometimes
// there are events right away. We can discard those events because
// they’ve already happened, and our initial scan is yet to come.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Am I understanding correctly that it is safe because the initial scan enforces an afterClusterTime that is after the resume token's timestamp?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct. If there was no saved resume token, then the time that this function returns gets used as the find queries’ initial afterClusterTime.

if len(changeStream.ResumeToken()) == 0 {
_, _, err := mmongo.GetBatch(ctx, changeStream, nil, nil)

if err != nil {
return nil, nil, bson.Timestamp{}, errors.Wrap(err, "discarding change stream’s initial events")
}
}

err = csr.persistResumeToken(ctx, changeStream.ResumeToken())
if err != nil {
return nil, nil, bson.Timestamp{}, errors.Wrapf(err, "persisting initial resume token")
}
}

startTs, err := csr.resumeTokenTSExtractor(changeStream.ResumeToken())
Expand All @@ -428,14 +444,19 @@ func (csr *ChangeStreamReader) createChangeStream(
return nil, nil, bson.Timestamp{}, errors.Wrap(err, "failed to read cluster time from session")
}

csr.logger.Debug().
Any("resumeTokenTimestamp", startTs).
Any("clusterTime", clusterTime).
Stringer("changeStreamReader", csr).
Msg("Using earlier time as start timestamp.")

if startTs.After(clusterTime) {
csr.logger.Debug().
Any("resumeTokenTimestamp", startTs).
Any("clusterTime", clusterTime).
Stringer("changeStreamReader", csr).
Msg("Cluster time predates resume token; using it as start timestamp.")

startTs = clusterTime
} else {
csr.logger.Debug().
Any("resumeTokenTimestamp", startTs).
Stringer("changeStreamReader", csr).
Msg("Got start timestamp from change stream.")
}

return changeStream, sess, startTs, nil
Expand Down Expand Up @@ -532,6 +553,10 @@ func (csr *ChangeStreamReader) String() string {
}

func extractTSFromChangeStreamResumeToken(resumeToken bson.Raw) (bson.Timestamp, error) {
if len(resumeToken) == 0 {
panic("internal error: resume token is empty but should never be")
}

// Change stream token is always a V1 keystring in the _data field
tokenDataRV, err := resumeToken.LookupErr("_data")

Expand Down
43 changes: 43 additions & 0 deletions internal/verifier/change_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,49 @@ import (
"golang.org/x/sync/errgroup"
)

// TestChangeStreamFilter_InitialNonempty verifies change-stream startup while
// the source cluster is receiving a constant stream of writes, so that the
// change stream's very first server response may already contain events.
func (suite *IntegrationTestSuite) TestChangeStreamFilter_InitialNonempty() {
	zerolog.SetGlobalLevel(zerolog.TraceLevel) // gets restored automatically

	ctx := suite.Context()
	dbName := suite.DBNameForTest()

	// Keep generating change events for the whole duration of the test.
	go func() {
		for ctx.Err() == nil {
			coll := suite.srcMongoClient.
				Database(dbName).
				Collection("coll")

			_, _ = coll.InsertOne(ctx, bson.D{{"_id", 123}})
			_, _ = coll.DeleteOne(ctx, bson.D{{"_id", 123}})
		}
	}()

	// Run many iterations to make it likely that at least one of them
	// observes a nonempty initial change-stream batch.
	for caseNum := range 100 {
		suite.Run(fmt.Sprint(caseNum), func() {
			subCtx, cancel := contextplus.WithCancelCause(ctx)
			defer cancel(fmt.Errorf("subtest is done"))

			verifier := suite.BuildVerifier()

			rdr, isCSReader := verifier.srcChangeReader.(*ChangeStreamReader)
			if !isCSReader {
				suite.T().Skipf("source change reader is a %T; this test needs a %T", verifier.srcChangeReader, rdr)
			}

			eg, egCtx := contextplus.ErrGroup(subCtx)
			suite.Require().NoError(rdr.start(egCtx, eg))

			// Reset verifier metadata so each iteration starts clean.
			suite.Require().NoError(
				verifier.metaClient.Database(verifier.metaDBName).Drop(subCtx),
			)
		})
	}
}

func (suite *IntegrationTestSuite) TestChangeStreamFilter_NoNamespaces() {
ctx := suite.Context()

Expand Down
69 changes: 69 additions & 0 deletions mmongo/cursor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package mmongo

import (
"context"
"fmt"
"slices"

"github.com/pkg/errors"
"go.mongodb.org/mongo-driver/v2/bson"
"go.mongodb.org/mongo-driver/v2/mongo"
)

// cursorLike is the iteration surface common to *mongo.Cursor and
// *mongo.ChangeStream, letting GetBatch accept either one.
type cursorLike interface {
	TryNext(context.Context) bool
	RemainingBatchLength() int
	Err() error
}

// GetBatch returns a batch of documents from a cursor. It does so by appending
// to passed-in slices, which lets you optimize memory handling.
//
// Each document's bytes are copied into buffer, and docs receives one
// sub-slice of buffer per document. Both (possibly reallocated) slices are
// returned; pass nil for either to let GetBatch allocate. Returns an error
// if cursor iteration fails.
func GetBatch[T cursorLike](
	ctx context.Context,
	cursor T,
	docs []bson.Raw,
	buffer []byte,
) ([]bson.Raw, []byte, error) {
	// Resolve the concrete cursor type once, before the loop, rather than
	// per document. (The generic constraint can't expose the Current field
	// directly, so a type switch is needed.) Doing it here also makes an
	// unsupported cursor type fail fast, even on an empty batch.
	var current *bson.Raw
	switch typedCursor := any(cursor).(type) {
	case *mongo.Cursor:
		current = &typedCursor.Current
	case *mongo.ChangeStream:
		current = &typedCursor.Current
	default:
		panic(fmt.Sprintf("unknown cursor type: %T", cursor))
	}

	var docsCount, expectedCount int

	for hasDocs := true; hasDocs; hasDocs = cursor.RemainingBatchLength() > 0 {
		got := cursor.TryNext(ctx)

		if err := cursor.Err(); err != nil {
			return nil, nil, errors.Wrap(err, "cursor iteration failed")
		}

		if !got {
			// RemainingBatchLength promised more documents than
			// TryNext delivered: a driver-invariant violation.
			if docsCount != 0 {
				panic(fmt.Sprintf("Docs batch ended after %d but expected %d", docsCount, expectedCount))
			}

			break
		}

		// This ensures we only reallocate once (if at all):
		if docsCount == 0 {
			expectedCount = 1 + cursor.RemainingBatchLength()
			docs = slices.Grow(docs, expectedCount)
		}

		docsCount++

		docPos := len(buffer)
		buffer = append(buffer, *current...)

		// Full slice expression caps each stored doc at its own bytes,
		// so a caller appending to one returned document can't clobber
		// the next document's bytes in the shared buffer.
		docs = append(docs, buffer[docPos:len(buffer):len(buffer)])
	}

	return docs, buffer, nil
}
66 changes: 0 additions & 66 deletions mmongo/cursor_all_test.go

This file was deleted.

Loading