51 changes: 38 additions & 13 deletions README.md
@@ -108,24 +108,49 @@ metaURI: mongodb://localhost:28012
The verifier will now check to completion to make sure that there are no inconsistencies. The command to send the verifier here is `writesOff`. This command doesn’t block, so you will have to poll the verifier, or watch its logs, to see the status of the verification (see `progress`).

```
curl -H "Content-Type: application/json" -X POST -d '{}' http://127.0.0.1:27020/api/v1/writesOff
curl -H "Content-Type: application/json" -d '{}' http://127.0.0.1:27020/api/v1/writesOff
```


3. You can poll the status of the verification by hitting the `progress`endpoint. In particular, the `phase`should reveal whether the verifier is done verifying; once the `phase`is `idle`the verification has completed. When the `phase`has reached `idle`, the `error`field should be `null`and the `failedTasks`field should be `0`, if the verification was successful. A non-`null``error`field indicates that the verifier itself ran into an error. `failedTasks`being non-`0`indicates that there was an inconsistency. The logs printed by the verifier itself should have more information regarding what the inconsistencies are.

```
curl -H "Content-Type: application/json" -X GET http://127.0.0.1:27020/api/v1/progress

```




This is a sample output when inconsistencies are present:
3. You can poll the status of the verification by hitting the `progress` endpoint. In particular, the `phase` field reveals whether the verifier is done verifying: once the `phase` is `idle`, the verification has completed. If the verification was successful, the `error` field will be `null` and the `failedTasks` field will be `0` at that point. A non-`null` `error` field indicates that the verifier itself ran into an error, while a non-`0` `failedTasks` indicates that there was an inconsistency. See below for how to investigate mismatches.

```
curl http://127.0.0.1:27020/api/v1/progress
```

`{"progress":{"phase":"idle","error":null,"verificationStatus":{"totalTasks":1,"addedTasks":0,"processingTasks":0,"failedTasks":1,"completedTasks":0,"metadataMismatchTasks":0,"recheckTasks":0}}}`
### `/progress` API Response Contents

In the list below, a “timestamp” is an object with `T` and `I` unsigned integers; these represent a logical time in MongoDB’s replication protocol. (A Go sketch of the overall response shape follows the list.)

- `progress`
- `phase` (string): either `idle`, `check`, or `recheck`
- `generation` (unsigned integer)
- `generationStats`
- `timeElapsed` (string, [Go Duration format](https://pkg.go.dev/time#ParseDuration))
- `activeWorkers` (unsigned integer)
- `docsCompared` (unsigned integer)
- `totalDocs` (unsigned integer)
- `srcBytesCompared` (unsigned integer)
- `totalSrcBytes` (unsigned integer, only present in `check` phase)
- `priorMismatches` (unsigned integer, optional, mismatches seen in prior generation)
- `mismatchesFound` (unsigned integer)
- `rechecksEnqueued` (unsigned integer)
- `srcChangeStats`
- `eventsPerSecond` (nonnegative float, optional)
- `currentTimes` (optional)
- `lastHandledTime` (timestamp)
- `lastClusterTime` (timestamp)
- `bufferSaturation` (nonnegative float)
- `dstChangeStats` (same fields as `srcChangeStats`)
- `error` (string, optional)
- `verificationStatus` (tasks for the current generation)
- `totalTasks` (unsigned integer)
- `addedTasks` (unsigned integer, unstarted tasks)
- `processingTasks` (unsigned integer, in-progress tasks)
- `failedTasks` (unsigned integer, tasks that found a document mismatch)
- `completedTasks` (unsigned integer, tasks that found no problems)
- `metadataMismatchTasks` (unsigned integer, tasks that found a collection metadata mismatch)
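
For Go consumers of the endpoint, the fields above map roughly onto the struct sketch below. The type names are illustrative and chosen for this example; they are derived from the documented field list rather than copied from the verifier’s own source, and optional fields are modeled as pointers.

```go
// Illustrative types mirroring the documented /progress response shape.
// Names and optionality follow the field list above; they are not the
// verifier’s own types.
package progressdoc

type LogicalTime struct {
	T uint32 `json:"T"`
	I uint32 `json:"I"`
}

type ReaderTimes struct {
	LastHandledTime LogicalTime `json:"lastHandledTime"`
	LastClusterTime LogicalTime `json:"lastClusterTime"`
}

type ChangeStats struct {
	EventsPerSecond  *float64     `json:"eventsPerSecond,omitempty"`
	CurrentTimes     *ReaderTimes `json:"currentTimes,omitempty"`
	BufferSaturation float64      `json:"bufferSaturation"`
}

type GenerationStats struct {
	TimeElapsed      string  `json:"timeElapsed"` // Go duration string, e.g. "1m32s"
	ActiveWorkers    uint    `json:"activeWorkers"`
	DocsCompared     uint64  `json:"docsCompared"`
	TotalDocs        uint64  `json:"totalDocs"`
	SrcBytesCompared uint64  `json:"srcBytesCompared"`
	TotalSrcBytes    *uint64 `json:"totalSrcBytes,omitempty"`   // check phase only
	PriorMismatches  *uint64 `json:"priorMismatches,omitempty"` // mismatches seen in the prior generation
	MismatchesFound  uint64  `json:"mismatchesFound"`
	RechecksEnqueued uint64  `json:"rechecksEnqueued"`
}

type TaskCounts struct {
	TotalTasks            uint `json:"totalTasks"`
	AddedTasks            uint `json:"addedTasks"`
	ProcessingTasks       uint `json:"processingTasks"`
	FailedTasks           uint `json:"failedTasks"`
	CompletedTasks        uint `json:"completedTasks"`
	MetadataMismatchTasks uint `json:"metadataMismatchTasks"`
}

type ProgressBody struct {
	Phase              string          `json:"phase"` // "idle", "check", or "recheck"
	Generation         uint            `json:"generation"`
	GenerationStats    GenerationStats `json:"generationStats"`
	SrcChangeStats     ChangeStats     `json:"srcChangeStats"`
	DstChangeStats     ChangeStats     `json:"dstChangeStats"`
	Error              *string         `json:"error,omitempty"`
	VerificationStatus TaskCounts      `json:"verificationStatus"`
}

type ProgressResponse struct {
	Progress ProgressBody `json:"progress"`
}
```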


# CLI Options
46 changes: 40 additions & 6 deletions internal/verifier/change_reader.go
@@ -31,13 +31,24 @@ const (
changeReaderCollectionName = "changeReader"
)

type readerCurrentTimes struct {
LastHandledTime bson.Timestamp `json:"lastHandledTime"`
LastClusterTime bson.Timestamp `json:"lastClusterTime"`
}

func (rp readerCurrentTimes) Lag() time.Duration {
return time.Second * time.Duration(
int(rp.LastClusterTime.T)-int(rp.LastHandledTime.T),
)
}

type changeReader interface {
getWhichCluster() whichCluster
getReadChannel() <-chan changeEventBatch
getStartTimestamp() bson.Timestamp
getLastSeenClusterTime() option.Option[bson.Timestamp]
getEventsPerSecond() option.Option[float64]
getLag() option.Option[time.Duration]
getCurrentTimes() option.Option[readerCurrentTimes]
getBufferSaturation() float64
setWritesOff(bson.Timestamp)
start(context.Context, *errgroup.Group) error
@@ -64,9 +75,10 @@ type ChangeReaderCommon struct {

lastChangeEventTime *msync.TypedAtomic[option.Option[bson.Timestamp]]

currentTimes *msync.TypedAtomic[option.Option[readerCurrentTimes]]

startAtTs *bson.Timestamp

lag *msync.TypedAtomic[option.Option[time.Duration]]
batchSizeHistory *history.History[int]

onDDLEvent ddlEventHandling
@@ -77,7 +89,7 @@ func newChangeReaderCommon(clusterName whichCluster) ChangeReaderCommon {
readerType: clusterName,
changeEventBatchChan: make(chan changeEventBatch, batchChanBufferSize),
writesOffTs: util.NewEventual[bson.Timestamp](),
lag: msync.NewTypedAtomic(option.None[time.Duration]()),
currentTimes: msync.NewTypedAtomic(option.None[readerCurrentTimes]()),
lastChangeEventTime: msync.NewTypedAtomic(option.None[bson.Timestamp]()),
batchSizeHistory: history.New[int](time.Minute),
onDDLEvent: lo.Ternary(
@@ -123,11 +135,23 @@ func (rc *ChangeReaderCommon) getBufferSaturation() float64 {
return util.DivideToF64(len(rc.changeEventBatchChan), cap(rc.changeEventBatchChan))
}

func (rc *ChangeReaderCommon) getCurrentTimes() option.Option[readerCurrentTimes] {
return rc.currentTimes.Load()
}

/*
// getLag returns the observed change stream lag (i.e., the delta between
// cluster time and the most-recently-seen change event).
func (rc *ChangeReaderCommon) getLag() option.Option[time.Duration] {
return rc.lag.Load()
if prog, has := rc.progress.Load().Get(); has {
return option.Some(
time.Duration(int(prog.lastClusterTime.T)-int(prog.lastResumeTime.T)) * time.Second,
)
}

return option.None[time.Duration]()
}
*/

// getEventsPerSecond returns the number of change events per second we’ve been
// seeing “recently”. (See implementation for the actual period over which we
@@ -221,8 +245,18 @@ func (rc *ChangeReaderCommon) loadResumeToken(ctx context.Context) (option.Optio
func (rc *ChangeReaderCommon) updateLag(sess *mongo.Session, token bson.Raw) {
tokenTs, err := rc.resumeTokenTSExtractor(token)
if err == nil {
lagSecs := int64(sess.OperationTime().T) - int64(tokenTs.T)
rc.lag.Store(option.Some(time.Second * time.Duration(lagSecs)))
cTime, err := util.GetClusterTimeFromSession(sess)
if err != nil {
rc.logger.Warn().
Err(err).
Str("reader", string(rc.getWhichCluster())).
Msg("Failed to extract cluster time from session.")
} else {
rc.currentTimes.Store(option.Some(readerCurrentTimes{
LastHandledTime: tokenTs,
LastClusterTime: cTime,
}))
}
} else {
rc.logger.Warn().
Err(err).
4 changes: 2 additions & 2 deletions internal/verifier/change_stream_test.go
@@ -593,7 +593,7 @@ func (suite *IntegrationTestSuite) TestChangeStreamLag() {
verifierRunner.AwaitGenerationEnd(),
)

return verifier.srcChangeReader.getLag().IsSome()
return verifier.srcChangeReader.getCurrentTimes().IsSome()
},
time.Minute,
100*time.Millisecond,
@@ -602,7 +602,7 @@
// NB: The lag will include whatever time elapsed above before
// verifier read the event, so it can be several seconds.
suite.Assert().Less(
verifier.srcChangeReader.getLag().MustGet(),
verifier.srcChangeReader.getCurrentTimes().MustGet().Lag(),
10*time.Minute,
"verifier lag is as expected",
)
17 changes: 7 additions & 10 deletions internal/verifier/check.go
@@ -216,7 +216,7 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh
// Now that we’ve initialized verifier.generation we can
// start the change readers.
verifier.initializeChangeReaders()
verifier.mux.Unlock()
//verifier.mux.Unlock()

err = retry.New().WithCallback(
func(ctx context.Context, _ *retry.FuncInfo) error {
@@ -246,17 +246,12 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh

verifier.logger.Debug().Msg("Starting Check")

verifier.phase = Check
defer func() {
verifier.phase = Idle
}()

if err := verifier.startChangeHandling(ctx); err != nil {
return err
}

// Log the verification status when initially booting up so it's easy to see the current state
verificationStatus, err := verifier.GetVerificationStatus(ctx)
verificationStatus, err := verifier.getVerificationStatusForGeneration(ctx, verifier.generation)
if err != nil {
return errors.Wrapf(
err,
@@ -274,7 +269,7 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh
}
// Now enter the multi-generational steady check state
for {
verifier.mux.Lock()
//verifier.mux.Lock()
err = retry.New().WithCallback(
func(ctx context.Context, _ *retry.FuncInfo) error {
return verifier.persistGenerationWhileLocked(ctx)
@@ -286,12 +281,13 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh
verifier.mux.Unlock()
return errors.Wrapf(err, "failed to persist generation (%d)", verifier.generation)
}
verifier.mux.Unlock()

verifier.generationStartTime = time.Now()
verifier.srcEventRecorder.Reset()
verifier.dstEventRecorder.Reset()

verifier.mux.Unlock()

err := verifier.CheckWorker(ctx)
if err != nil {
return err
@@ -362,7 +358,6 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh
// on enqueued rechecks. Meanwhile, generation 3’s recheck tasks will
// derive from rechecks enqueued during generation 2.
verifier.generation++
verifier.phase = Recheck
verifier.mux.Unlock()

// Generation of recheck tasks can partial-fail. The following will
@@ -384,6 +379,8 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh
Err(err).
Msg("Failed to clear out old recheck docs. (This is probably unimportant.)")
}

verifier.mux.Lock()
}
}

24 changes: 9 additions & 15 deletions internal/verifier/migration_verifier.go
@@ -89,7 +89,6 @@ type Verifier struct {
lastGeneration bool
running bool
generation int
phase string
port int
metaURI string
metaClient *mongo.Client
@@ -181,7 +180,6 @@ func NewVerifier(settings VerifierSettings, logPath string) *Verifier {
logger: logger,
writer: logWriter,

phase: Idle,
numWorkers: NumWorkers,
readPreference: readpref.Primary(),
partitionSizeInBytes: 400 * 1024 * 1024,
@@ -1255,9 +1253,17 @@ func (verifier *Verifier) verifyMetadataAndPartitionCollection(
}

func (verifier *Verifier) GetVerificationStatus(ctx context.Context) (*VerificationStatus, error) {
taskCollection := verifier.verificationTaskCollection()
generation, _ := verifier.getGeneration()

return verifier.getVerificationStatusForGeneration(ctx, generation)
}

func (verifier *Verifier) getVerificationStatusForGeneration(
ctx context.Context,
generation int,
) (*VerificationStatus, error) {
taskCollection := verifier.verificationTaskCollection()

var results []bson.Raw

err := retry.New().WithCallback(
@@ -1396,18 +1402,6 @@ func (verifier *Verifier) StartServer() error {
return server.Run(context.Background())
}

func (verifier *Verifier) GetProgress(ctx context.Context) (Progress, error) {
status, err := verifier.GetVerificationStatus(ctx)
if err != nil {
return Progress{Error: err}, err
}
return Progress{
Phase: verifier.phase,
Generation: verifier.generation,
Status: status,
}, nil
}

// Returned boolean indicates that namespaces are cached, and
// whatever needs them can proceed.
func (verifier *Verifier) ensureNamespaces(ctx context.Context) bool {
74 changes: 71 additions & 3 deletions internal/verifier/mismatches.go
@@ -85,7 +85,7 @@ func countMismatchesForTasks(
ctx context.Context,
db *mongo.Database,
taskIDs []bson.ObjectID,
filter bson.D,
filter any,
) (int64, int64, error) {
cursor, err := db.Collection(mismatchesCollectionName).Aggregate(
ctx,
@@ -116,8 +116,12 @@ func countMismatchesForTasks(
return 0, 0, errors.Wrap(err, "reading mismatch counts")
}

if len(got) != 1 {
return 0, 0, fmt.Errorf("unexpected mismatch count result: %+v", got)
switch len(got) {
case 0:
return 0, 0, nil
case 1:
default:
return 0, 0, fmt.Errorf("unexpected mismatch count (%d) result: %+v", len(got), got)
}

totalRV, err := got[0].LookupErr("total")
@@ -135,6 +139,70 @@ func countMismatchesForTasks(
return matched, totalRV.AsInt64() - matched, nil
}

func countRechecksForGeneration(
ctx context.Context,
metaDB *mongo.Database,
generation int,
) (int64, int64, error) {
cursor, err := metaDB.Collection(verificationTasksCollection).Aggregate(
ctx,
mongo.Pipeline{
{{"$match", bson.D{
{"generation", generation},
}}},
{{"$lookup", bson.D{
{"from", mismatchesCollectionName},
{"localField", "_id"},
{"foreignField", "task"},
{"as", "mismatches"},
}}},
{{"$addFields", bson.D{
{"mismatches", bson.D{{"$size", "$mismatches"}}},
}}},
{{"$group", bson.D{
{"_id", nil},
{"changes", bson.D{
{"$sum", bson.D{
{"$subtract", bson.A{
bson.D{{"$size", "$_ids"}},
"$mismatches",
}},
}},
}},
{"mismatches", bson.D{
{"$sum", "$mismatches"},
}},
}}},
},
)
if err != nil {
return 0, 0, errors.Wrap(err, "sending query to count last generation’s found mismatches")
}

defer cursor.Close(ctx)

if !cursor.Next(ctx) {
if cursor.Err() != nil {
return 0, 0, errors.Wrap(err, "reading count of last generation’s found mismatches")
}

// This happens if there were no tasks in the queried generation.
return 0, 0, nil
}

result := struct {
Mismatches int64
Changes int64
}{}

err = cursor.Decode(&result)
if err != nil {
return 0, 0, errors.Wrapf(err, "reading mismatches from result (%v)", cursor.Current)
}

return result.Mismatches, result.Changes, nil
}

func getMismatchesForTasks(
ctx context.Context,
db *mongo.Database,