From e866fc2bcffd33ddbc4bb7bf58bc0fa4c607eb84 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Fri, 28 Nov 2025 12:25:46 -0500 Subject: [PATCH 01/27] save --- internal/verifier/migration_verifier.go | 22 ++++++++----------- internal/verifier/progress.go | 29 +++++++++++++++++++++++++ internal/verifier/webserver.go | 9 ++++---- 3 files changed, 43 insertions(+), 17 deletions(-) create mode 100644 internal/verifier/progress.go diff --git a/internal/verifier/migration_verifier.go b/internal/verifier/migration_verifier.go index e23445cb..e38fb50c 100644 --- a/internal/verifier/migration_verifier.go +++ b/internal/verifier/migration_verifier.go @@ -1255,9 +1255,17 @@ func (verifier *Verifier) verifyMetadataAndPartitionCollection( } func (verifier *Verifier) GetVerificationStatus(ctx context.Context) (*VerificationStatus, error) { - taskCollection := verifier.verificationTaskCollection() generation, _ := verifier.getGeneration() + return verifier.getVerificationStatusForGeneration(ctx, generation) +} + +func (verifier *Verifier) getVerificationStatusForGeneration( + ctx context.Context, + generation int, +) (*VerificationStatus, error) { + taskCollection := verifier.verificationTaskCollection() + var results []bson.Raw err := retry.New().WithCallback( @@ -1396,18 +1404,6 @@ func (verifier *Verifier) StartServer() error { return server.Run(context.Background()) } -func (verifier *Verifier) GetProgress(ctx context.Context) (Progress, error) { - status, err := verifier.GetVerificationStatus(ctx) - if err != nil { - return Progress{Error: err}, err - } - return Progress{ - Phase: verifier.phase, - Generation: verifier.generation, - Status: status, - }, nil -} - // Returned boolean indicates that namespaces are cached, and // whatever needs them can proceed. func (verifier *Verifier) ensureNamespaces(ctx context.Context) bool { diff --git a/internal/verifier/progress.go b/internal/verifier/progress.go new file mode 100644 index 00000000..36610ea7 --- /dev/null +++ b/internal/verifier/progress.go @@ -0,0 +1,29 @@ +package verifier + +import ( + "context" + "time" +) + +func (verifier *Verifier) GetProgress(ctx context.Context) (Progress, error) { + verifier.mux.RLock() + defer verifier.mux.RUnlock() + + generation := verifier.generation + vStatus, err := verifier.getVerificationStatusForGeneration(ctx, generation) + + progressTime := time.Now() + genElapsed := progressTime.Sub(verifier.generationStartTime) + + /* + status, err := verifier.GetVerificationStatus(ctx) + if err != nil { + return Progress{Error: err}, err + } + return Progress{ + Phase: verifier.phase, + Generation: verifier.generation, + Status: status, + }, nil + */ +} diff --git a/internal/verifier/webserver.go b/internal/verifier/webserver.go index 57c52624..07887304 100644 --- a/internal/verifier/webserver.go +++ b/internal/verifier/webserver.go @@ -242,10 +242,11 @@ func (server *WebServer) writesOffEndpoint(c *gin.Context) { // Progress represents the structure of the JSON response from the Progress end point. type Progress struct { - Phase string `json:"phase"` - Generation int `json:"generation"` - Error error `json:"error"` - Status *VerificationStatus `json:"verificationStatus"` + Phase string `json:"phase"` + Generation int `json:"generation"` + GenerationTimeElapsed time.Duration `json:"generationTimeElapsed"` + Error error `json:"error"` + Status *VerificationStatus `json:"verificationStatus"` } // progressEndpoint implements the gin handle for the progress endpoint. From b074f9f5cb1d9a98e73bc8fdb9da2b9f490ceaa7 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Fri, 28 Nov 2025 15:44:10 -0500 Subject: [PATCH 02/27] add API --- internal/verifier/mismatches.go | 2 +- internal/verifier/progress.go | 151 +++++++++++++++++++++++++++++--- internal/verifier/statistics.go | 7 ++ internal/verifier/webserver.go | 36 ++++++-- 4 files changed, 178 insertions(+), 18 deletions(-) diff --git a/internal/verifier/mismatches.go b/internal/verifier/mismatches.go index 6cafb852..382acfcb 100644 --- a/internal/verifier/mismatches.go +++ b/internal/verifier/mismatches.go @@ -85,7 +85,7 @@ func countMismatchesForTasks( ctx context.Context, db *mongo.Database, taskIDs []bson.ObjectID, - filter bson.D, + filter any, ) (int64, int64, error) { cursor, err := db.Collection(mismatchesCollectionName).Aggregate( ctx, diff --git a/internal/verifier/progress.go b/internal/verifier/progress.go index 36610ea7..ed734638 100644 --- a/internal/verifier/progress.go +++ b/internal/verifier/progress.go @@ -3,27 +3,154 @@ package verifier import ( "context" "time" + + "github.com/10gen/migration-verifier/contextplus" + "github.com/10gen/migration-verifier/internal/types" + "github.com/10gen/migration-verifier/mslices" + "github.com/pkg/errors" + "github.com/samber/lo" + "go.mongodb.org/mongo-driver/v2/bson" ) func (verifier *Verifier) GetProgress(ctx context.Context) (Progress, error) { verifier.mux.RLock() defer verifier.mux.RUnlock() + var vStatus *VerificationStatus + generation := verifier.generation - vStatus, err := verifier.getVerificationStatusForGeneration(ctx, generation) progressTime := time.Now() genElapsed := progressTime.Sub(verifier.generationStartTime) - /* - status, err := verifier.GetVerificationStatus(ctx) - if err != nil { - return Progress{Error: err}, err - } - return Progress{ - Phase: verifier.phase, - Generation: verifier.generation, - Status: status, - }, nil - */ + genStats := ProgressGenerationStats{ + TimeElapsed: genElapsed, + } + + eg, egCtx := contextplus.ErrGroup(ctx) + eg.Go( + func() error { + var err error + vStatus, err = verifier.getVerificationStatusForGeneration(egCtx, generation) + + return errors.Wrapf(err, "fetching generation %d’s tasks’ status", generation) + }, + ) + eg.Go( + func() error { + var err error + nsStats, err := verifier.GetPersistedNamespaceStatisticsForGeneration(ctx, generation) + + if err != nil { + return errors.Wrapf(err, "fetching generation %d’s persisted namespace stats", generation) + } + + var totalDocs, comparedDocs types.DocumentCount + var totalBytes, comparedBytes types.ByteCount + var totalNss, completedNss types.NamespaceCount + + for _, result := range nsStats { + totalDocs += result.TotalDocs + comparedDocs += result.DocsCompared + totalBytes += result.TotalBytes + comparedBytes += result.BytesCompared + + totalNss++ + if result.PartitionsDone > 0 { + partitionsPending := result.PartitionsAdded + result.PartitionsProcessing + if partitionsPending == 0 { + completedNss++ + } + } + } + + var activeWorkers int + perNamespaceWorkerStats := verifier.getPerNamespaceWorkerStats() + for _, nsWorkerStats := range perNamespaceWorkerStats { + for _, workerStats := range nsWorkerStats { + activeWorkers++ + comparedDocs += workerStats.SrcDocCount + comparedBytes += workerStats.SrcByteCount + } + } + + genStats.DocsCompared = comparedDocs + genStats.TotalDocs = totalDocs + + genStats.SrcBytesCompared = comparedBytes + genStats.TotalSrcBytes = totalBytes + + return nil + }, + ) + eg.Go( + func() error { + failedTasks, incompleteTasks, err := FetchFailedAndIncompleteTasks( + ctx, + verifier.logger, + verifier.verificationTaskCollection(), + verificationTaskVerifyDocuments, + generation, + ) + if err != nil { + return errors.Wrapf(err, "fetching generation %d’s failed & incomplete tasks") + } + + taskIDsToQuery := lo.Map( + lo.Flatten(mslices.Of(failedTasks, incompleteTasks)), + func(ft VerificationTask, _ int) bson.ObjectID { + return ft.PrimaryKey + }, + ) + + mismatchCount, _, err := countMismatchesForTasks( + egCtx, + verifier.verificationDatabase(), + taskIDsToQuery, + true, + ) + if err != nil { + return errors.Wrapf(err, "counting generation %d’s mismatches") + } + + genStats.MismatchesFound = mismatchCount + + return nil + }, + ) + eg.Go( + func() error { + recheckColl := verifier.getRecheckQueueCollection(1 + generation) + count, err := recheckColl.EstimatedDocumentCount(ctx) + if err != nil { + return errors.Wrapf(err, "counting rechecks enqueued during generation %d", generation) + } + + genStats.RechecksEnqueued = count + + return nil + }, + ) + + if err := eg.Wait(); err != nil { + return Progress{Error: err}, err + } + + return Progress{ + Phase: verifier.phase, + Generation: verifier.generation, + GenerationStats: genStats, + SrcChangeStreamStats: ProgressChangeStreamStats{ + EventsPerSecond: verifier.srcChangeReader.getEventsPerSecond(), + Lag: verifier.srcChangeReader.getLag(), + BufferSaturation: verifier.srcChangeReader.getBufferSaturation(), + }, + DstChangeStreamStats: ProgressChangeStreamStats{ + EventsPerSecond: verifier.dstChangeReader.getEventsPerSecond(), + Lag: verifier.dstChangeReader.getLag(), + BufferSaturation: verifier.dstChangeReader.getBufferSaturation(), + }, + Status: vStatus, + }, nil + } diff --git a/internal/verifier/statistics.go b/internal/verifier/statistics.go index 73aa0725..29ad3038 100644 --- a/internal/verifier/statistics.go +++ b/internal/verifier/statistics.go @@ -179,6 +179,13 @@ var jsonTemplate *template.Template func (verifier *Verifier) GetPersistedNamespaceStatistics(ctx context.Context) ([]NamespaceStats, error) { generation, _ := verifier.getGeneration() + return verifier.GetPersistedNamespaceStatisticsForGeneration(ctx, generation) +} + +func (verifier *Verifier) GetPersistedNamespaceStatisticsForGeneration( + ctx context.Context, + generation int, +) ([]NamespaceStats, error) { templateOnce.Do(func() { tmpl, err := template.New("").Parse(perNsStatsPipelineTemplate) if err != nil { diff --git a/internal/verifier/webserver.go b/internal/verifier/webserver.go index 07887304..26a36610 100644 --- a/internal/verifier/webserver.go +++ b/internal/verifier/webserver.go @@ -11,7 +11,9 @@ import ( "github.com/10gen/migration-verifier/contextplus" "github.com/10gen/migration-verifier/internal/logger" + "github.com/10gen/migration-verifier/internal/types" "github.com/10gen/migration-verifier/internal/verifier/webserver" + "github.com/10gen/migration-verifier/option" "github.com/gin-contrib/pprof" "github.com/gin-gonic/gin" "github.com/google/uuid" @@ -240,13 +242,37 @@ func (server *WebServer) writesOffEndpoint(c *gin.Context) { successResponse(c) } +type ProgressGenerationStats struct { + TimeElapsed time.Duration `json:"generationTimeElapsed"` + + DocsCompared types.DocumentCount `json:"srcDocsCompared"` + TotalDocs types.DocumentCount `json:"allSrcDocsCompared"` + + SrcBytesCompared types.ByteCount `json:"srcBytesCompared"` + TotalSrcBytes types.ByteCount `json:"allSrcBytes"` + + MismatchesFound int64 `json:"mismatchesFound"` + RechecksEnqueued int64 `json:"rechecksEnqueued"` +} + +type ProgressChangeStreamStats struct { + EventsPerSecond option.Option[float64] + Lag option.Option[time.Duration] + BufferSaturation float64 +} + // Progress represents the structure of the JSON response from the Progress end point. type Progress struct { - Phase string `json:"phase"` - Generation int `json:"generation"` - GenerationTimeElapsed time.Duration `json:"generationTimeElapsed"` - Error error `json:"error"` - Status *VerificationStatus `json:"verificationStatus"` + Phase string `json:"phase"` + + Generation int `json:"generation"` + GenerationStats ProgressGenerationStats `json:"generationStats"` + + SrcChangeStreamStats ProgressChangeStreamStats `json:"srcChangeStreamStats"` + DstChangeStreamStats ProgressChangeStreamStats `json:"dstChangeStreamStats"` + + Error error `json:"error"` + Status *VerificationStatus `json:"verificationStatus"` } // progressEndpoint implements the gin handle for the progress endpoint. From c32ea1571a4c05f19b9e6531d492d37de2bcc06b Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Fri, 28 Nov 2025 15:46:18 -0500 Subject: [PATCH 03/27] augment Progress API --- internal/verifier/progress.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/verifier/progress.go b/internal/verifier/progress.go index ed734638..d88ee115 100644 --- a/internal/verifier/progress.go +++ b/internal/verifier/progress.go @@ -93,7 +93,7 @@ func (verifier *Verifier) GetProgress(ctx context.Context) (Progress, error) { generation, ) if err != nil { - return errors.Wrapf(err, "fetching generation %d’s failed & incomplete tasks") + return errors.Wrapf(err, "fetching generation %d’s failed & incomplete tasks", generation) } taskIDsToQuery := lo.Map( @@ -110,7 +110,7 @@ func (verifier *Verifier) GetProgress(ctx context.Context) (Progress, error) { true, ) if err != nil { - return errors.Wrapf(err, "counting generation %d’s mismatches") + return errors.Wrapf(err, "counting mismatches seen during generation %d", generation) } genStats.MismatchesFound = mismatchCount From e452319f4dd1254f3de2931cb615fc2fe892e2e1 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Fri, 28 Nov 2025 15:58:21 -0500 Subject: [PATCH 04/27] string elapsed --- internal/verifier/progress.go | 2 +- internal/verifier/webserver.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/verifier/progress.go b/internal/verifier/progress.go index d88ee115..ec29a838 100644 --- a/internal/verifier/progress.go +++ b/internal/verifier/progress.go @@ -24,7 +24,7 @@ func (verifier *Verifier) GetProgress(ctx context.Context) (Progress, error) { genElapsed := progressTime.Sub(verifier.generationStartTime) genStats := ProgressGenerationStats{ - TimeElapsed: genElapsed, + TimeElapsed: genElapsed.String(), } eg, egCtx := contextplus.ErrGroup(ctx) diff --git a/internal/verifier/webserver.go b/internal/verifier/webserver.go index 26a36610..0f675f43 100644 --- a/internal/verifier/webserver.go +++ b/internal/verifier/webserver.go @@ -243,7 +243,7 @@ func (server *WebServer) writesOffEndpoint(c *gin.Context) { } type ProgressGenerationStats struct { - TimeElapsed time.Duration `json:"generationTimeElapsed"` + TimeElapsed string `json:"timeElapsed"` DocsCompared types.DocumentCount `json:"srcDocsCompared"` TotalDocs types.DocumentCount `json:"allSrcDocsCompared"` From c9be1cae56160bf21575fbc47747e337fd9c6d7a Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Fri, 28 Nov 2025 16:00:21 -0500 Subject: [PATCH 05/27] fix durations --- internal/verifier/progress.go | 15 +++++++++++++-- internal/verifier/webserver.go | 2 +- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/internal/verifier/progress.go b/internal/verifier/progress.go index ec29a838..50ffc715 100644 --- a/internal/verifier/progress.go +++ b/internal/verifier/progress.go @@ -7,6 +7,7 @@ import ( "github.com/10gen/migration-verifier/contextplus" "github.com/10gen/migration-verifier/internal/types" "github.com/10gen/migration-verifier/mslices" + "github.com/10gen/migration-verifier/option" "github.com/pkg/errors" "github.com/samber/lo" "go.mongodb.org/mongo-driver/v2/bson" @@ -142,15 +143,25 @@ func (verifier *Verifier) GetProgress(ctx context.Context) (Progress, error) { GenerationStats: genStats, SrcChangeStreamStats: ProgressChangeStreamStats{ EventsPerSecond: verifier.srcChangeReader.getEventsPerSecond(), - Lag: verifier.srcChangeReader.getLag(), + Lag: optDurationToOptString(verifier.srcChangeReader.getLag()), BufferSaturation: verifier.srcChangeReader.getBufferSaturation(), }, DstChangeStreamStats: ProgressChangeStreamStats{ EventsPerSecond: verifier.dstChangeReader.getEventsPerSecond(), - Lag: verifier.dstChangeReader.getLag(), + Lag: optDurationToOptString(verifier.dstChangeReader.getLag()), BufferSaturation: verifier.dstChangeReader.getBufferSaturation(), }, Status: vStatus, }, nil } + +func optDurationToOptString(dur option.Option[time.Duration]) option.Option[string] { + var ret option.Option[string] + + if dur, has := dur.Get(); has { + ret = option.Some(dur.String()) + } + + return ret +} diff --git a/internal/verifier/webserver.go b/internal/verifier/webserver.go index 0f675f43..6ac2e030 100644 --- a/internal/verifier/webserver.go +++ b/internal/verifier/webserver.go @@ -257,7 +257,7 @@ type ProgressGenerationStats struct { type ProgressChangeStreamStats struct { EventsPerSecond option.Option[float64] - Lag option.Option[time.Duration] + Lag option.Option[string] BufferSaturation float64 } From edea418ebf3856233785039bc914ff1e347bc753 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Fri, 28 Nov 2025 16:02:28 -0500 Subject: [PATCH 06/27] tolerate no mismatches --- internal/verifier/mismatches.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/internal/verifier/mismatches.go b/internal/verifier/mismatches.go index 382acfcb..36d11430 100644 --- a/internal/verifier/mismatches.go +++ b/internal/verifier/mismatches.go @@ -116,7 +116,11 @@ func countMismatchesForTasks( return 0, 0, errors.Wrap(err, "reading mismatch counts") } - if len(got) != 1 { + switch len(got) { + case 0: + return 0, 0, nil + case 1: + default: return 0, 0, fmt.Errorf("unexpected mismatch count result: %+v", got) } From 489097a7156c09aa72a26a00d5ea8a760e577049 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Fri, 28 Nov 2025 16:03:21 -0500 Subject: [PATCH 07/27] huh?? --- internal/verifier/mismatches.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/verifier/mismatches.go b/internal/verifier/mismatches.go index 36d11430..4a4002ba 100644 --- a/internal/verifier/mismatches.go +++ b/internal/verifier/mismatches.go @@ -121,7 +121,7 @@ func countMismatchesForTasks( return 0, 0, nil case 1: default: - return 0, 0, fmt.Errorf("unexpected mismatch count result: %+v", got) + return 0, 0, fmt.Errorf("unexpected mismatch count (%d) result: %+v", len(got), got) } totalRV, err := got[0].LookupErr("total") From d73f6cc55316020797685fa8873331605a32a225 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Fri, 28 Nov 2025 16:08:09 -0500 Subject: [PATCH 08/27] fix JSON names --- internal/verifier/webserver.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/verifier/webserver.go b/internal/verifier/webserver.go index 6ac2e030..a7381906 100644 --- a/internal/verifier/webserver.go +++ b/internal/verifier/webserver.go @@ -245,11 +245,11 @@ func (server *WebServer) writesOffEndpoint(c *gin.Context) { type ProgressGenerationStats struct { TimeElapsed string `json:"timeElapsed"` - DocsCompared types.DocumentCount `json:"srcDocsCompared"` - TotalDocs types.DocumentCount `json:"allSrcDocsCompared"` + DocsCompared types.DocumentCount `json:"docsCompared"` + TotalDocs types.DocumentCount `json:"totalDocs"` SrcBytesCompared types.ByteCount `json:"srcBytesCompared"` - TotalSrcBytes types.ByteCount `json:"allSrcBytes"` + TotalSrcBytes types.ByteCount `json:"totalSrcBytes"` MismatchesFound int64 `json:"mismatchesFound"` RechecksEnqueued int64 `json:"rechecksEnqueued"` From 7d6b4ce7f3148984939a3c1bf128977e20696d95 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Fri, 28 Nov 2025 16:14:36 -0500 Subject: [PATCH 09/27] add active workers --- internal/verifier/progress.go | 2 ++ internal/verifier/webserver.go | 3 ++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/internal/verifier/progress.go b/internal/verifier/progress.go index 50ffc715..1c60579d 100644 --- a/internal/verifier/progress.go +++ b/internal/verifier/progress.go @@ -81,6 +81,8 @@ func (verifier *Verifier) GetProgress(ctx context.Context) (Progress, error) { genStats.SrcBytesCompared = comparedBytes genStats.TotalSrcBytes = totalBytes + genStats.ActiveWorkers = activeWorkers + return nil }, ) diff --git a/internal/verifier/webserver.go b/internal/verifier/webserver.go index a7381906..87bc26b1 100644 --- a/internal/verifier/webserver.go +++ b/internal/verifier/webserver.go @@ -243,7 +243,8 @@ func (server *WebServer) writesOffEndpoint(c *gin.Context) { } type ProgressGenerationStats struct { - TimeElapsed string `json:"timeElapsed"` + TimeElapsed string `json:"timeElapsed"` + ActiveWorkers int DocsCompared types.DocumentCount `json:"docsCompared"` TotalDocs types.DocumentCount `json:"totalDocs"` From e82667c9ba50557d849bd2be4d0552dce5da12bc Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Fri, 28 Nov 2025 16:24:48 -0500 Subject: [PATCH 10/27] Make phase dynamic - bug fix for restart --- internal/verifier/check.go | 6 ------ internal/verifier/migration_verifier.go | 2 -- internal/verifier/progress.go | 10 +++++++--- internal/verifier/webserver.go | 26 ++++++++++++------------- 4 files changed, 20 insertions(+), 24 deletions(-) diff --git a/internal/verifier/check.go b/internal/verifier/check.go index be003ba6..4f2fe703 100644 --- a/internal/verifier/check.go +++ b/internal/verifier/check.go @@ -246,11 +246,6 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh verifier.logger.Debug().Msg("Starting Check") - verifier.phase = Check - defer func() { - verifier.phase = Idle - }() - if err := verifier.startChangeHandling(ctx); err != nil { return err } @@ -362,7 +357,6 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh // on enqueued rechecks. Meanwhile, generaiton 3’s recheck tasks will // derive from rechecks enqueued during generation 2. verifier.generation++ - verifier.phase = Recheck verifier.mux.Unlock() // Generation of recheck tasks can partial-fail. The following will diff --git a/internal/verifier/migration_verifier.go b/internal/verifier/migration_verifier.go index e38fb50c..0e741395 100644 --- a/internal/verifier/migration_verifier.go +++ b/internal/verifier/migration_verifier.go @@ -89,7 +89,6 @@ type Verifier struct { lastGeneration bool running bool generation int - phase string port int metaURI string metaClient *mongo.Client @@ -181,7 +180,6 @@ func NewVerifier(settings VerifierSettings, logPath string) *Verifier { logger: logger, writer: logWriter, - phase: Idle, numWorkers: NumWorkers, readPreference: readpref.Primary(), partitionSizeInBytes: 400 * 1024 * 1024, diff --git a/internal/verifier/progress.go b/internal/verifier/progress.go index 1c60579d..65239c47 100644 --- a/internal/verifier/progress.go +++ b/internal/verifier/progress.go @@ -140,15 +140,19 @@ func (verifier *Verifier) GetProgress(ctx context.Context) (Progress, error) { } return Progress{ - Phase: verifier.phase, + Phase: lo.Ternary( + verifier.running, + lo.Ternary(generation > 0, Recheck, Check), + Idle, + ), Generation: verifier.generation, GenerationStats: genStats, - SrcChangeStreamStats: ProgressChangeStreamStats{ + SrcChangeStats: ProgressChangeStats{ EventsPerSecond: verifier.srcChangeReader.getEventsPerSecond(), Lag: optDurationToOptString(verifier.srcChangeReader.getLag()), BufferSaturation: verifier.srcChangeReader.getBufferSaturation(), }, - DstChangeStreamStats: ProgressChangeStreamStats{ + DstChangeStats: ProgressChangeStats{ EventsPerSecond: verifier.dstChangeReader.getEventsPerSecond(), Lag: optDurationToOptString(verifier.dstChangeReader.getLag()), BufferSaturation: verifier.dstChangeReader.getBufferSaturation(), diff --git a/internal/verifier/webserver.go b/internal/verifier/webserver.go index 87bc26b1..3c72855a 100644 --- a/internal/verifier/webserver.go +++ b/internal/verifier/webserver.go @@ -246,17 +246,17 @@ type ProgressGenerationStats struct { TimeElapsed string `json:"timeElapsed"` ActiveWorkers int - DocsCompared types.DocumentCount `json:"docsCompared"` - TotalDocs types.DocumentCount `json:"totalDocs"` + DocsCompared types.DocumentCount + TotalDocs types.DocumentCount - SrcBytesCompared types.ByteCount `json:"srcBytesCompared"` - TotalSrcBytes types.ByteCount `json:"totalSrcBytes"` + SrcBytesCompared types.ByteCount + TotalSrcBytes types.ByteCount - MismatchesFound int64 `json:"mismatchesFound"` - RechecksEnqueued int64 `json:"rechecksEnqueued"` + MismatchesFound int64 + RechecksEnqueued int64 } -type ProgressChangeStreamStats struct { +type ProgressChangeStats struct { EventsPerSecond option.Option[float64] Lag option.Option[string] BufferSaturation float64 @@ -264,15 +264,15 @@ type ProgressChangeStreamStats struct { // Progress represents the structure of the JSON response from the Progress end point. type Progress struct { - Phase string `json:"phase"` + Phase string - Generation int `json:"generation"` - GenerationStats ProgressGenerationStats `json:"generationStats"` + Generation int + GenerationStats ProgressGenerationStats - SrcChangeStreamStats ProgressChangeStreamStats `json:"srcChangeStreamStats"` - DstChangeStreamStats ProgressChangeStreamStats `json:"dstChangeStreamStats"` + SrcChangeStats ProgressChangeStats + DstChangeStats ProgressChangeStats - Error error `json:"error"` + Error error Status *VerificationStatus `json:"verificationStatus"` } From 6b21175dfdd16d2156dfa640881277b79b68108b Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Sun, 30 Nov 2025 12:49:12 -0500 Subject: [PATCH 11/27] replace lag w/ resume & cluster times --- internal/verifier/change_reader.go | 50 +++++++++++++++++++++---- internal/verifier/change_stream_test.go | 4 +- internal/verifier/progress.go | 7 ++-- internal/verifier/summary.go | 8 ++-- internal/verifier/webserver.go | 4 +- 5 files changed, 54 insertions(+), 19 deletions(-) diff --git a/internal/verifier/change_reader.go b/internal/verifier/change_reader.go index 056fdbc5..4e65d1cf 100644 --- a/internal/verifier/change_reader.go +++ b/internal/verifier/change_reader.go @@ -31,13 +31,24 @@ const ( changeReaderCollectionName = "changeReader" ) +type readerCurrentTimes struct { + LastResumeTime bson.Timestamp + LastClusterTime bson.Timestamp +} + +func (rp readerCurrentTimes) Lag() time.Duration { + return time.Second * time.Duration( + int(rp.LastClusterTime.T)-int(rp.LastResumeTime.T), + ) +} + type changeReader interface { getWhichCluster() whichCluster getReadChannel() <-chan changeEventBatch getStartTimestamp() bson.Timestamp getLastSeenClusterTime() option.Option[bson.Timestamp] getEventsPerSecond() option.Option[float64] - getLag() option.Option[time.Duration] + getCurrentTimes() option.Option[readerCurrentTimes] getBufferSaturation() float64 setWritesOff(bson.Timestamp) start(context.Context, *errgroup.Group) error @@ -64,9 +75,10 @@ type ChangeReaderCommon struct { lastChangeEventTime *msync.TypedAtomic[option.Option[bson.Timestamp]] + currentTimes *msync.TypedAtomic[option.Option[readerCurrentTimes]] + startAtTs *bson.Timestamp - lag *msync.TypedAtomic[option.Option[time.Duration]] batchSizeHistory *history.History[int] onDDLEvent ddlEventHandling @@ -77,9 +89,9 @@ func newChangeReaderCommon(clusterName whichCluster) ChangeReaderCommon { readerType: clusterName, changeEventBatchChan: make(chan changeEventBatch, batchChanBufferSize), writesOffTs: util.NewEventual[bson.Timestamp](), - lag: msync.NewTypedAtomic(option.None[time.Duration]()), - lastChangeEventTime: msync.NewTypedAtomic(option.None[bson.Timestamp]()), - batchSizeHistory: history.New[int](time.Minute), + //lag: msync.NewTypedAtomic(option.None[time.Duration]()), + lastChangeEventTime: msync.NewTypedAtomic(option.None[bson.Timestamp]()), + batchSizeHistory: history.New[int](time.Minute), onDDLEvent: lo.Ternary( clusterName == dst, onDDLEventAllow, @@ -123,11 +135,23 @@ func (rc *ChangeReaderCommon) getBufferSaturation() float64 { return util.DivideToF64(len(rc.changeEventBatchChan), cap(rc.changeEventBatchChan)) } +func (rc *ChangeReaderCommon) getCurrentTimes() option.Option[readerCurrentTimes] { + return rc.currentTimes.Load() +} + +/* // getLag returns the observed change stream lag (i.e., the delta between // cluster time and the most-recently-seen change event). func (rc *ChangeReaderCommon) getLag() option.Option[time.Duration] { - return rc.lag.Load() + if prog, has := rc.progress.Load().Get(); has { + return option.Some( + time.Duration(int(prog.lastClusterTime.T)-int(prog.lastResumeTime.T)) * time.Second, + ) + } + + return option.None[time.Duration]() } +*/ // getEventsPerSecond returns the number of change events per second we’ve been // seeing “recently”. (See implementation for the actual period over which we @@ -221,8 +245,18 @@ func (rc *ChangeReaderCommon) loadResumeToken(ctx context.Context) (option.Optio func (rc *ChangeReaderCommon) updateLag(sess *mongo.Session, token bson.Raw) { tokenTs, err := rc.resumeTokenTSExtractor(token) if err == nil { - lagSecs := int64(sess.OperationTime().T) - int64(tokenTs.T) - rc.lag.Store(option.Some(time.Second * time.Duration(lagSecs))) + cTime, err := util.GetClusterTimeFromSession(sess) + if err != nil { + rc.logger.Warn(). + Err(err). + Str("reader", string(rc.getWhichCluster())). + Msg("Failed to extract cluster time from session.") + } else { + rc.currentTimes.Store(option.Some(readerCurrentTimes{ + LastResumeTime: tokenTs, + LastClusterTime: cTime, + })) + } } else { rc.logger.Warn(). Err(err). diff --git a/internal/verifier/change_stream_test.go b/internal/verifier/change_stream_test.go index 62a39bca..69807dae 100644 --- a/internal/verifier/change_stream_test.go +++ b/internal/verifier/change_stream_test.go @@ -593,7 +593,7 @@ func (suite *IntegrationTestSuite) TestChangeStreamLag() { verifierRunner.AwaitGenerationEnd(), ) - return verifier.srcChangeReader.getLag().IsSome() + return verifier.srcChangeReader.getCurrentTimes().IsSome() }, time.Minute, 100*time.Millisecond, @@ -602,7 +602,7 @@ func (suite *IntegrationTestSuite) TestChangeStreamLag() { // NB: The lag will include whatever time elapsed above before // verifier read the event, so it can be several seconds. suite.Assert().Less( - verifier.srcChangeReader.getLag().MustGet(), + verifier.srcChangeReader.getCurrentTimes().MustGet().Lag(), 10*time.Minute, "verifier lag is as expected", ) diff --git a/internal/verifier/progress.go b/internal/verifier/progress.go index 65239c47..419641d9 100644 --- a/internal/verifier/progress.go +++ b/internal/verifier/progress.go @@ -7,7 +7,6 @@ import ( "github.com/10gen/migration-verifier/contextplus" "github.com/10gen/migration-verifier/internal/types" "github.com/10gen/migration-verifier/mslices" - "github.com/10gen/migration-verifier/option" "github.com/pkg/errors" "github.com/samber/lo" "go.mongodb.org/mongo-driver/v2/bson" @@ -149,12 +148,12 @@ func (verifier *Verifier) GetProgress(ctx context.Context) (Progress, error) { GenerationStats: genStats, SrcChangeStats: ProgressChangeStats{ EventsPerSecond: verifier.srcChangeReader.getEventsPerSecond(), - Lag: optDurationToOptString(verifier.srcChangeReader.getLag()), + CurrentTimes: verifier.srcChangeReader.getCurrentTimes(), BufferSaturation: verifier.srcChangeReader.getBufferSaturation(), }, DstChangeStats: ProgressChangeStats{ EventsPerSecond: verifier.dstChangeReader.getEventsPerSecond(), - Lag: optDurationToOptString(verifier.dstChangeReader.getLag()), + CurrentTimes: verifier.dstChangeReader.getCurrentTimes(), BufferSaturation: verifier.dstChangeReader.getBufferSaturation(), }, Status: vStatus, @@ -162,6 +161,7 @@ func (verifier *Verifier) GetProgress(ctx context.Context) (Progress, error) { } +/* func optDurationToOptString(dur option.Option[time.Duration]) option.Option[string] { var ret option.Option[string] @@ -171,3 +171,4 @@ func optDurationToOptString(dur option.Option[time.Duration]) option.Option[stri return ret } +*/ diff --git a/internal/verifier/summary.go b/internal/verifier/summary.go index d204ed88..01080bb5 100644 --- a/internal/verifier/summary.go +++ b/internal/verifier/summary.go @@ -587,10 +587,10 @@ func (verifier *Verifier) printChangeEventStatistics(builder io.Writer) { if eventsPerSec, has := cluster.csReader.getEventsPerSecond().Get(); has { var lagNote string - lag, hasLag := cluster.csReader.getLag().Get() + prog, hasProg := cluster.csReader.getCurrentTimes().Get() - if hasLag { - lagNote = fmt.Sprintf("lag: %s; ", reportutils.DurationToHMS(lag)) + if hasProg { + lagNote = fmt.Sprintf("lag: %s; ", reportutils.DurationToHMS(prog.Lag())) } saturation := cluster.csReader.getBufferSaturation() @@ -604,7 +604,7 @@ func (verifier *Verifier) printChangeEventStatistics(builder io.Writer) { reportutils.FmtReal(100*saturation), ) - if hasLag && lag > lagWarnThreshold { + if hasProg && prog.Lag() > lagWarnThreshold { fmt.Fprint( builder, "⚠️ Lag is excessive. Verification may fail. See documentation.\n", diff --git a/internal/verifier/webserver.go b/internal/verifier/webserver.go index 3c72855a..7cbd5465 100644 --- a/internal/verifier/webserver.go +++ b/internal/verifier/webserver.go @@ -243,7 +243,7 @@ func (server *WebServer) writesOffEndpoint(c *gin.Context) { } type ProgressGenerationStats struct { - TimeElapsed string `json:"timeElapsed"` + TimeElapsed string ActiveWorkers int DocsCompared types.DocumentCount @@ -258,7 +258,7 @@ type ProgressGenerationStats struct { type ProgressChangeStats struct { EventsPerSecond option.Option[float64] - Lag option.Option[string] + CurrentTimes option.Option[readerCurrentTimes] BufferSaturation float64 } From cb089891441ad1d2372b37d968fb728edff42ad1 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Sun, 30 Nov 2025 21:00:17 -0500 Subject: [PATCH 12/27] currentTimes --- internal/verifier/change_reader.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/verifier/change_reader.go b/internal/verifier/change_reader.go index 4e65d1cf..e4245b56 100644 --- a/internal/verifier/change_reader.go +++ b/internal/verifier/change_reader.go @@ -89,9 +89,9 @@ func newChangeReaderCommon(clusterName whichCluster) ChangeReaderCommon { readerType: clusterName, changeEventBatchChan: make(chan changeEventBatch, batchChanBufferSize), writesOffTs: util.NewEventual[bson.Timestamp](), - //lag: msync.NewTypedAtomic(option.None[time.Duration]()), - lastChangeEventTime: msync.NewTypedAtomic(option.None[bson.Timestamp]()), - batchSizeHistory: history.New[int](time.Minute), + currentTimes: msync.NewTypedAtomic(option.None[readerCurrentTimes]()), + lastChangeEventTime: msync.NewTypedAtomic(option.None[bson.Timestamp]()), + batchSizeHistory: history.New[int](time.Minute), onDDLEvent: lo.Ternary( clusterName == dst, onDDLEventAllow, From d9294f241f2f1ba9d7c9a58eddb9285928576948 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 4 Dec 2025 13:50:26 -0500 Subject: [PATCH 13/27] remove trace-level command monitor --- internal/verifier/migration_verifier.go | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/internal/verifier/migration_verifier.go b/internal/verifier/migration_verifier.go index 0e741395..e23445cb 100644 --- a/internal/verifier/migration_verifier.go +++ b/internal/verifier/migration_verifier.go @@ -89,6 +89,7 @@ type Verifier struct { lastGeneration bool running bool generation int + phase string port int metaURI string metaClient *mongo.Client @@ -180,6 +181,7 @@ func NewVerifier(settings VerifierSettings, logPath string) *Verifier { logger: logger, writer: logWriter, + phase: Idle, numWorkers: NumWorkers, readPreference: readpref.Primary(), partitionSizeInBytes: 400 * 1024 * 1024, @@ -1253,16 +1255,8 @@ func (verifier *Verifier) verifyMetadataAndPartitionCollection( } func (verifier *Verifier) GetVerificationStatus(ctx context.Context) (*VerificationStatus, error) { - generation, _ := verifier.getGeneration() - - return verifier.getVerificationStatusForGeneration(ctx, generation) -} - -func (verifier *Verifier) getVerificationStatusForGeneration( - ctx context.Context, - generation int, -) (*VerificationStatus, error) { taskCollection := verifier.verificationTaskCollection() + generation, _ := verifier.getGeneration() var results []bson.Raw @@ -1402,6 +1396,18 @@ func (verifier *Verifier) StartServer() error { return server.Run(context.Background()) } +func (verifier *Verifier) GetProgress(ctx context.Context) (Progress, error) { + status, err := verifier.GetVerificationStatus(ctx) + if err != nil { + return Progress{Error: err}, err + } + return Progress{ + Phase: verifier.phase, + Generation: verifier.generation, + Status: status, + }, nil +} + // Returned boolean indicates that namespaces are cached, and // whatever needs them can proceed. func (verifier *Verifier) ensureNamespaces(ctx context.Context) bool { From 204dc85133f4ca01d8b1f1b429e4c152a891767e Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 4 Dec 2025 13:51:12 -0500 Subject: [PATCH 14/27] Revert "remove trace-level command monitor" This reverts commit d9294f241f2f1ba9d7c9a58eddb9285928576948. --- internal/verifier/migration_verifier.go | 24 +++++++++--------------- 1 file changed, 9 insertions(+), 15 deletions(-) diff --git a/internal/verifier/migration_verifier.go b/internal/verifier/migration_verifier.go index e23445cb..0e741395 100644 --- a/internal/verifier/migration_verifier.go +++ b/internal/verifier/migration_verifier.go @@ -89,7 +89,6 @@ type Verifier struct { lastGeneration bool running bool generation int - phase string port int metaURI string metaClient *mongo.Client @@ -181,7 +180,6 @@ func NewVerifier(settings VerifierSettings, logPath string) *Verifier { logger: logger, writer: logWriter, - phase: Idle, numWorkers: NumWorkers, readPreference: readpref.Primary(), partitionSizeInBytes: 400 * 1024 * 1024, @@ -1255,9 +1253,17 @@ func (verifier *Verifier) verifyMetadataAndPartitionCollection( } func (verifier *Verifier) GetVerificationStatus(ctx context.Context) (*VerificationStatus, error) { - taskCollection := verifier.verificationTaskCollection() generation, _ := verifier.getGeneration() + return verifier.getVerificationStatusForGeneration(ctx, generation) +} + +func (verifier *Verifier) getVerificationStatusForGeneration( + ctx context.Context, + generation int, +) (*VerificationStatus, error) { + taskCollection := verifier.verificationTaskCollection() + var results []bson.Raw err := retry.New().WithCallback( @@ -1396,18 +1402,6 @@ func (verifier *Verifier) StartServer() error { return server.Run(context.Background()) } -func (verifier *Verifier) GetProgress(ctx context.Context) (Progress, error) { - status, err := verifier.GetVerificationStatus(ctx) - if err != nil { - return Progress{Error: err}, err - } - return Progress{ - Phase: verifier.phase, - Generation: verifier.generation, - Status: status, - }, nil -} - // Returned boolean indicates that namespaces are cached, and // whatever needs them can proceed. func (verifier *Verifier) ensureNamespaces(ctx context.Context) bool { From ce4bb6b54aa448d655f8c1ea8122bc2855659649 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 4 Dec 2025 14:34:31 -0500 Subject: [PATCH 15/27] renames --- internal/verifier/change_reader.go | 8 +++---- internal/verifier/progress.go | 12 ----------- internal/verifier/webserver.go | 34 +++++++++++++++--------------- 3 files changed, 21 insertions(+), 33 deletions(-) diff --git a/internal/verifier/change_reader.go b/internal/verifier/change_reader.go index e4245b56..030fb5e2 100644 --- a/internal/verifier/change_reader.go +++ b/internal/verifier/change_reader.go @@ -32,13 +32,13 @@ const ( ) type readerCurrentTimes struct { - LastResumeTime bson.Timestamp - LastClusterTime bson.Timestamp + LastHandledTime bson.Timestamp `json:"lastHandledTime"` + LastClusterTime bson.Timestamp `json:"lastClusterTime"` } func (rp readerCurrentTimes) Lag() time.Duration { return time.Second * time.Duration( - int(rp.LastClusterTime.T)-int(rp.LastResumeTime.T), + int(rp.LastClusterTime.T)-int(rp.LastHandledTime.T), ) } @@ -253,7 +253,7 @@ func (rc *ChangeReaderCommon) updateLag(sess *mongo.Session, token bson.Raw) { Msg("Failed to extract cluster time from session.") } else { rc.currentTimes.Store(option.Some(readerCurrentTimes{ - LastResumeTime: tokenTs, + LastHandledTime: tokenTs, LastClusterTime: cTime, })) } diff --git a/internal/verifier/progress.go b/internal/verifier/progress.go index 419641d9..d51a42cb 100644 --- a/internal/verifier/progress.go +++ b/internal/verifier/progress.go @@ -160,15 +160,3 @@ func (verifier *Verifier) GetProgress(ctx context.Context) (Progress, error) { }, nil } - -/* -func optDurationToOptString(dur option.Option[time.Duration]) option.Option[string] { - var ret option.Option[string] - - if dur, has := dur.Get(); has { - ret = option.Some(dur.String()) - } - - return ret -} -*/ diff --git a/internal/verifier/webserver.go b/internal/verifier/webserver.go index 7cbd5465..10e30ca6 100644 --- a/internal/verifier/webserver.go +++ b/internal/verifier/webserver.go @@ -243,36 +243,36 @@ func (server *WebServer) writesOffEndpoint(c *gin.Context) { } type ProgressGenerationStats struct { - TimeElapsed string - ActiveWorkers int + TimeElapsed string `json:"timeElapsed"` + ActiveWorkers int `json:"activeWorkers"` - DocsCompared types.DocumentCount - TotalDocs types.DocumentCount + DocsCompared types.DocumentCount `json:"docsCompared"` + TotalDocs types.DocumentCount `json:"totalDocs"` - SrcBytesCompared types.ByteCount - TotalSrcBytes types.ByteCount + SrcBytesCompared types.ByteCount `json:"srcBytesCompared"` + TotalSrcBytes types.ByteCount `json:"totalSrcBytes,omitempty"` - MismatchesFound int64 - RechecksEnqueued int64 + MismatchesFound int64 `json:"mismatchesFound"` + RechecksEnqueued int64 `json:"rechecksEnqueued"` } type ProgressChangeStats struct { - EventsPerSecond option.Option[float64] - CurrentTimes option.Option[readerCurrentTimes] - BufferSaturation float64 + EventsPerSecond option.Option[float64] `json:"eventsPerSecond"` + CurrentTimes option.Option[readerCurrentTimes] `json:"currentTimes"` + BufferSaturation float64 `json:"bufferSaturation"` } // Progress represents the structure of the JSON response from the Progress end point. type Progress struct { - Phase string + Phase string `json:"phase"` - Generation int - GenerationStats ProgressGenerationStats + Generation int `json:"generation"` + GenerationStats ProgressGenerationStats `json:"generationStats"` - SrcChangeStats ProgressChangeStats - DstChangeStats ProgressChangeStats + SrcChangeStats ProgressChangeStats `json:"srcChangeStats"` + DstChangeStats ProgressChangeStats `json:"dstChangeStats"` - Error error + Error error `json:"error,omitempty"` Status *VerificationStatus `json:"verificationStatus"` } From 697f33a91f21b769228fb0b0063990802dcfa546 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 4 Dec 2025 14:52:03 -0500 Subject: [PATCH 16/27] document the /progress API --- README.md | 47 +++++++++++++++++++++++++++++++++++------------ 1 file changed, 35 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index 48926001..5074cdac 100644 --- a/README.md +++ b/README.md @@ -112,20 +112,43 @@ The verifier will now check to completion to make sure that there are no inconsi ``` -3. You can poll the status of the verification by hitting the `progress`endpoint. In particular, the `phase`should reveal whether the verifier is done verifying; once the `phase`is `idle`the verification has completed. When the `phase`has reached `idle`, the `error`field should be `null`and the `failedTasks`field should be `0`, if the verification was successful. A non-`null``error`field indicates that the verifier itself ran into an error. `failedTasks`being non-`0`indicates that there was an inconsistency. The logs printed by the verifier itself should have more information regarding what the inconsistencies are. - - ``` - curl -H "Content-Type: application/json" -X GET http://127.0.0.1:27020/api/v1/progress - - ``` - - - - - This is a sample output when inconsistencies are present: +3. You can poll the status of the verification by hitting the `progress` endpoint. In particular, the `phase` should reveal whether the verifier is done verifying. Once the `phase` is `idle`, the verification has completed. At that point the `error` field should be `null`, and the `failedTasks` field should be `0`, if the verification was successful. A non-`null` `error` field indicates that the verifier itself ran into an error. `failedTasks` being non-`0` indicates that there was an inconsistency. See below for how to investigate mismatches. +``` +curl -H "Content-Type: application/json" -X GET http://127.0.0.1:27020/api/v1/progress +``` - `{"progress":{"phase":"idle","error":null,"verificationStatus":{"totalTasks":1,"addedTasks":0,"processingTasks":0,"failedTasks":1,"completedTasks":0,"metadataMismatchTasks":0,"recheckTasks":0}}}` +### `/progress` API contents + +In the below a “timestamp” is an object with `T` and `I` unsigned integers. +These represent a logical time in MongoDB’s replication protocol. + +- `progress` + - `phase` (string): either `idle`, `check`, or `recheck` + - `generation` (unsigned integer) + - `generationStats` + - `timeElapsed` (string, [Go Duration format](https://pkg.go.dev/time#ParseDuration)) + - `activeWorkers` (unsigned integer) + - `docsCompared` (unsigned integer) + - `totalDocs` (unsigned integer) + - `srcBytesCompared` (unsigned integer) + - `totalSrcBytes` (unsigned integer, only present in `check` phase) + - `mismatchesFound` (unsigned integer) + - `rechecksEnqueued` (unsigned integer) + - `srcChangeStats` + - `eventsPerSecond` (nonnegative float, optional) + - `currentTimes` + - `lastHandledTime` (timestamp) + - `lastClusterTime` (timestamp) + - `bufferSaturation` (nonnegative float) + - `error` (string, optional) + - `verificationStatus` (tasks for the current generation) + - `totalTasks` (unsigned integer) + - `addedTasks` (unsigned integer, unstarted tasks) + - `processingTasks` (unsigned integer, in-progress tasks) + - `failedTasks` (unsigned integer, tasks that found a document mismatch) + - `completedTasks` (unsigned integer, tasks that found no problems) + - `metadataMismatchTasks` (unsigned integer, tasks that found a collection metadata mismatch) # CLI Options From d161fa27ccd376a951c669342f0ffc849663a902 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 4 Dec 2025 14:53:11 -0500 Subject: [PATCH 17/27] fix/detail docs --- README.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 5074cdac..1a087ce6 100644 --- a/README.md +++ b/README.md @@ -137,10 +137,11 @@ These represent a logical time in MongoDB’s replication protocol. - `rechecksEnqueued` (unsigned integer) - `srcChangeStats` - `eventsPerSecond` (nonnegative float, optional) - - `currentTimes` + - `currentTimes` (optional) - `lastHandledTime` (timestamp) - `lastClusterTime` (timestamp) - `bufferSaturation` (nonnegative float) + - `dstChangeStats` (same fields as `srcChangeStats`) - `error` (string, optional) - `verificationStatus` (tasks for the current generation) - `totalTasks` (unsigned integer) From 3cfa675283df775ce994b0bafe8b80dc736b89d4 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 4 Dec 2025 14:54:23 -0500 Subject: [PATCH 18/27] tidy --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 1a087ce6..668e71c4 100644 --- a/README.md +++ b/README.md @@ -108,14 +108,14 @@ metaURI: mongodb://localhost:28012 The verifier will now check to completion to make sure that there are no inconsistencies. The command you need to send the verifier here is `writesOff`. The command doesn’t block. This means that you will have to poll the verifier, or watch its logs, to see the status of the verification (see `progress`). ``` - curl -H "Content-Type: application/json" -X POST -d '{}' http://127.0.0.1:27020/api/v1/writesOff + curl -H "Content-Type: application/json" -d '{}' http://127.0.0.1:27020/api/v1/writesOff ``` 3. You can poll the status of the verification by hitting the `progress` endpoint. In particular, the `phase` should reveal whether the verifier is done verifying. Once the `phase` is `idle`, the verification has completed. At that point the `error` field should be `null`, and the `failedTasks` field should be `0`, if the verification was successful. A non-`null` `error` field indicates that the verifier itself ran into an error. `failedTasks` being non-`0` indicates that there was an inconsistency. See below for how to investigate mismatches. ``` -curl -H "Content-Type: application/json" -X GET http://127.0.0.1:27020/api/v1/progress +curl http://127.0.0.1:27020/api/v1/progress ``` ### `/progress` API contents From 42cfc2189013a270d775d0abe4bb531d72530cb8 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 4 Dec 2025 14:55:37 -0500 Subject: [PATCH 19/27] title detail --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 668e71c4..2ff9683d 100644 --- a/README.md +++ b/README.md @@ -118,7 +118,7 @@ The verifier will now check to completion to make sure that there are no inconsi curl http://127.0.0.1:27020/api/v1/progress ``` -### `/progress` API contents +### `/progress` API Response Contents In the below a “timestamp” is an object with `T` and `I` unsigned integers. These represent a logical time in MongoDB’s replication protocol. From 0c850d94c2c84658178b99ad640289f103ff8ef7 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 4 Dec 2025 16:36:37 -0500 Subject: [PATCH 20/27] round elapsed --- internal/verifier/progress.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/verifier/progress.go b/internal/verifier/progress.go index d51a42cb..725607e3 100644 --- a/internal/verifier/progress.go +++ b/internal/verifier/progress.go @@ -24,7 +24,7 @@ func (verifier *Verifier) GetProgress(ctx context.Context) (Progress, error) { genElapsed := progressTime.Sub(verifier.generationStartTime) genStats := ProgressGenerationStats{ - TimeElapsed: genElapsed.String(), + TimeElapsed: genElapsed.Round(10 * time.Millisecond).String(), } eg, egCtx := contextplus.ErrGroup(ctx) From fd341cd4ae8acc39d6ce2fc3335ceda7e7a4db35 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 4 Dec 2025 21:45:24 -0500 Subject: [PATCH 21/27] add priorMismatches --- README.md | 1 + internal/verifier/mismatches.go | 48 +++++++++++++++++++++++++++++++++ internal/verifier/progress.go | 20 ++++++++++++++ internal/verifier/webserver.go | 5 ++-- mbson/raw_value.go | 6 ++++- 5 files changed, 77 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 2ff9683d..a6f42d0f 100644 --- a/README.md +++ b/README.md @@ -133,6 +133,7 @@ These represent a logical time in MongoDB’s replication protocol. - `totalDocs` (unsigned integer) - `srcBytesCompared` (unsigned integer) - `totalSrcBytes` (unsigned integer, only present in `check` phase) + - `priorMismatches` (unsigned integer, optional, mismatches seen in prior generation) - `mismatchesFound` (unsigned integer) - `rechecksEnqueued` (unsigned integer) - `srcChangeStats` diff --git a/internal/verifier/mismatches.go b/internal/verifier/mismatches.go index 4a4002ba..01306a9a 100644 --- a/internal/verifier/mismatches.go +++ b/internal/verifier/mismatches.go @@ -5,6 +5,7 @@ import ( "encoding/binary" "fmt" + "github.com/10gen/migration-verifier/mbson" "github.com/10gen/migration-verifier/option" "github.com/pkg/errors" "github.com/samber/lo" @@ -139,6 +140,53 @@ func countMismatchesForTasks( return matched, totalRV.AsInt64() - matched, nil } +func countMismatchesForGeneration( + ctx context.Context, + metaDB *mongo.Database, + generation int, +) (int64, error) { + cursor, err := metaDB.Collection(verificationTasksCollection).Aggregate( + ctx, + mongo.Pipeline{ + {{"$match", bson.D{ + {"generation", generation}, + }}}, + {{"$lookup", bson.D{ + {"from", mismatchesCollectionName}, + {"localField", "_id"}, + {"foreignField", "task"}, + {"as", "mismatches"}, + }}}, + {{"$group", bson.D{ + {"_id", nil}, + {"mismatches", bson.D{ + {"$sum", bson.D{{"$size", "$mismatches"}}}, + }}, + }}}, + }, + ) + if err != nil { + return 0, errors.Wrap(err, "sending query to count last generation’s found mismatches") + } + + defer cursor.Close(ctx) + + if !cursor.Next(ctx) { + if cursor.Err() != nil { + return 0, errors.Wrap(err, "reading count of last generation’s found mismatches") + } + + panic("no mismatches result and no error??") + } + + count, err := mbson.Lookup[int64](cursor.Current, "mismatches") + if err != nil { + return 0, errors.Wrapf(err, "reading mismatches from result (%v)", cursor.Current) + } + + return count, nil +} + func getMismatchesForTasks( ctx context.Context, db *mongo.Database, diff --git a/internal/verifier/progress.go b/internal/verifier/progress.go index 725607e3..cdc75da2 100644 --- a/internal/verifier/progress.go +++ b/internal/verifier/progress.go @@ -7,6 +7,7 @@ import ( "github.com/10gen/migration-verifier/contextplus" "github.com/10gen/migration-verifier/internal/types" "github.com/10gen/migration-verifier/mslices" + "github.com/10gen/migration-verifier/option" "github.com/pkg/errors" "github.com/samber/lo" "go.mongodb.org/mongo-driver/v2/bson" @@ -36,6 +37,25 @@ func (verifier *Verifier) GetProgress(ctx context.Context) (Progress, error) { return errors.Wrapf(err, "fetching generation %d’s tasks’ status", generation) }, ) + + if generation > 0 { + eg.Go( + func() error { + count, err := countMismatchesForGeneration( + egCtx, + verifier.metaClient.Database(verifier.metaDBName), + generation-1, + ) + + if err != nil { + return errors.Wrapf(err, "counting mismatches seen during generation %d", generation-1) + } + + genStats.PriorMismatches = option.Some(count) + }, + ) + } + eg.Go( func() error { var err error diff --git a/internal/verifier/webserver.go b/internal/verifier/webserver.go index 10e30ca6..a4d93015 100644 --- a/internal/verifier/webserver.go +++ b/internal/verifier/webserver.go @@ -252,8 +252,9 @@ type ProgressGenerationStats struct { SrcBytesCompared types.ByteCount `json:"srcBytesCompared"` TotalSrcBytes types.ByteCount `json:"totalSrcBytes,omitempty"` - MismatchesFound int64 `json:"mismatchesFound"` - RechecksEnqueued int64 `json:"rechecksEnqueued"` + PriorMismatches option.Option[int64] `json:"priorMismatches"` + MismatchesFound int64 `json:"mismatchesFound"` + RechecksEnqueued int64 `json:"rechecksEnqueued"` } type ProgressChangeStats struct { diff --git a/mbson/raw_value.go b/mbson/raw_value.go index b0b1c96c..50fde919 100644 --- a/mbson/raw_value.go +++ b/mbson/raw_value.go @@ -9,7 +9,7 @@ import ( ) type bsonCastRecipient interface { - bson.Raw | bson.Timestamp | bson.ObjectID | string | int32 + bson.Raw | bson.Timestamp | bson.ObjectID | string | int32 | int64 } type bsonSourceTypes interface { @@ -52,6 +52,10 @@ func CastRawValue[T bsonCastRecipient](in bson.RawValue) (T, error) { if val, ok := in.Int32OK(); ok { return any(val).(T), nil } + case int64: + if val, ok := in.Int64OK(); ok { + return any(val).(T), nil + } default: panic(fmt.Sprintf("Unrecognized Go type: %T (maybe augment bsonType?)", in)) } From a753d770adee19274eb5515323272506b533fc0a Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 4 Dec 2025 21:46:13 -0500 Subject: [PATCH 22/27] fix --- internal/verifier/progress.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/verifier/progress.go b/internal/verifier/progress.go index cdc75da2..79f476da 100644 --- a/internal/verifier/progress.go +++ b/internal/verifier/progress.go @@ -52,6 +52,8 @@ func (verifier *Verifier) GetProgress(ctx context.Context) (Progress, error) { } genStats.PriorMismatches = option.Some(count) + + return nil }, ) } From 342ceebef9fe31dfe9a6abcef91b5c9379382420 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 4 Dec 2025 21:54:09 -0500 Subject: [PATCH 23/27] o panic --- internal/verifier/mismatches.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/internal/verifier/mismatches.go b/internal/verifier/mismatches.go index 01306a9a..5b73f6b8 100644 --- a/internal/verifier/mismatches.go +++ b/internal/verifier/mismatches.go @@ -176,7 +176,8 @@ func countMismatchesForGeneration( return 0, errors.Wrap(err, "reading count of last generation’s found mismatches") } - panic("no mismatches result and no error??") + // This happens if there were no tasks in the queried generation. + return 0, nil } count, err := mbson.Lookup[int64](cursor.Current, "mismatches") From dc9e868151ab18355953eb4411bd12d860c345ab Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 4 Dec 2025 21:58:49 -0500 Subject: [PATCH 24/27] fix cast err --- mbson/raw_value.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mbson/raw_value.go b/mbson/raw_value.go index 50fde919..5c798f3e 100644 --- a/mbson/raw_value.go +++ b/mbson/raw_value.go @@ -60,7 +60,7 @@ func CastRawValue[T bsonCastRecipient](in bson.RawValue) (T, error) { panic(fmt.Sprintf("Unrecognized Go type: %T (maybe augment bsonType?)", in)) } - return *new(T), cannotCastErr{in.Type, any(in)} + return *new(T), cannotCastErr{in.Type, *new(T)} } // Lookup fetches a value from a BSON document, casts it to the appropriate From 8df7fcb955150befcb2cc963da09e0bb8986daf9 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 4 Dec 2025 23:29:25 -0500 Subject: [PATCH 25/27] fix mismatch counts --- internal/verifier/mismatches.go | 4 ++-- mbson/raw_value.go | 9 +++++++++ 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/internal/verifier/mismatches.go b/internal/verifier/mismatches.go index 5b73f6b8..3638ab69 100644 --- a/internal/verifier/mismatches.go +++ b/internal/verifier/mismatches.go @@ -180,12 +180,12 @@ func countMismatchesForGeneration( return 0, nil } - count, err := mbson.Lookup[int64](cursor.Current, "mismatches") + mmRV, err := cursor.Current.LookupErr("mismatches") if err != nil { return 0, errors.Wrapf(err, "reading mismatches from result (%v)", cursor.Current) } - return count, nil + return mbson.ToInt64(mmRV) } func getMismatchesForTasks( diff --git a/mbson/raw_value.go b/mbson/raw_value.go index 5c798f3e..35dd4fa9 100644 --- a/mbson/raw_value.go +++ b/mbson/raw_value.go @@ -63,6 +63,15 @@ func CastRawValue[T bsonCastRecipient](in bson.RawValue) (T, error) { return *new(T), cannotCastErr{in.Type, *new(T)} } +func ToInt64(in bson.RawValue) (int64, error) { + i64, ok := in.AsInt64OK() + if !ok { + return 0, cannotCastErr{in.Type, i64} + } + + return i64, nil +} + // Lookup fetches a value from a BSON document, casts it to the appropriate // type, then returns the result. func Lookup[T bsonCastRecipient](doc bson.Raw, pointer ...string) (T, error) { From 8f95ff6b08209c7edafe93edbd646557a787f205 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Fri, 5 Dec 2025 01:07:51 -0500 Subject: [PATCH 26/27] mapsome --- internal/verifier/check.go | 11 +++++++---- internal/verifier/progress.go | 9 +++++---- internal/verifier/webserver.go | 4 ++-- 3 files changed, 14 insertions(+), 10 deletions(-) diff --git a/internal/verifier/check.go b/internal/verifier/check.go index 4f2fe703..49b0e947 100644 --- a/internal/verifier/check.go +++ b/internal/verifier/check.go @@ -216,7 +216,7 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh // Now that we’ve initialized verifier.generation we can // start the change readers. verifier.initializeChangeReaders() - verifier.mux.Unlock() + //verifier.mux.Unlock() err = retry.New().WithCallback( func(ctx context.Context, _ *retry.FuncInfo) error { @@ -251,7 +251,7 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh } // Log the verification status when initially booting up so it's easy to see the current state - verificationStatus, err := verifier.GetVerificationStatus(ctx) + verificationStatus, err := verifier.getVerificationStatusForGeneration(ctx, verifier.generation) if err != nil { return errors.Wrapf( err, @@ -269,7 +269,7 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh } // Now enter the multi-generational steady check state for { - verifier.mux.Lock() + //verifier.mux.Lock() err = retry.New().WithCallback( func(ctx context.Context, _ *retry.FuncInfo) error { return verifier.persistGenerationWhileLocked(ctx) @@ -281,12 +281,13 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh verifier.mux.Unlock() return errors.Wrapf(err, "failed to persist generation (%d)", verifier.generation) } - verifier.mux.Unlock() verifier.generationStartTime = time.Now() verifier.srcEventRecorder.Reset() verifier.dstEventRecorder.Reset() + verifier.mux.Unlock() + err := verifier.CheckWorker(ctx) if err != nil { return err @@ -378,6 +379,8 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh Err(err). Msg("Failed to clear out old recheck docs. (This is probably unimportant.)") } + + verifier.mux.Lock() } } diff --git a/internal/verifier/progress.go b/internal/verifier/progress.go index 79f476da..9e598c3c 100644 --- a/internal/verifier/progress.go +++ b/internal/verifier/progress.go @@ -20,12 +20,13 @@ func (verifier *Verifier) GetProgress(ctx context.Context) (Progress, error) { var vStatus *VerificationStatus generation := verifier.generation + genStats := ProgressGenerationStats{} - progressTime := time.Now() - genElapsed := progressTime.Sub(verifier.generationStartTime) + if !verifier.generationStartTime.IsZero() { + progressTime := time.Now() + genElapsed := progressTime.Sub(verifier.generationStartTime) - genStats := ProgressGenerationStats{ - TimeElapsed: genElapsed.Round(10 * time.Millisecond).String(), + genStats.TimeElapsed = option.Some(genElapsed.Round(10 * time.Millisecond).String()) } eg, egCtx := contextplus.ErrGroup(ctx) diff --git a/internal/verifier/webserver.go b/internal/verifier/webserver.go index a4d93015..7f8ceb30 100644 --- a/internal/verifier/webserver.go +++ b/internal/verifier/webserver.go @@ -243,8 +243,8 @@ func (server *WebServer) writesOffEndpoint(c *gin.Context) { } type ProgressGenerationStats struct { - TimeElapsed string `json:"timeElapsed"` - ActiveWorkers int `json:"activeWorkers"` + TimeElapsed option.Option[string] `json:"timeElapsed"` + ActiveWorkers int `json:"activeWorkers"` DocsCompared types.DocumentCount `json:"docsCompared"` TotalDocs types.DocumentCount `json:"totalDocs"` From ae73b8757a512ed68d74d3ea931ab0d8024c2125 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Fri, 5 Dec 2025 02:13:49 -0500 Subject: [PATCH 27/27] tweak counts --- internal/verifier/mismatches.go | 35 +++++++++++++++++++++++---------- internal/verifier/progress.go | 9 ++++++--- internal/verifier/webserver.go | 11 ++++++++--- 3 files changed, 39 insertions(+), 16 deletions(-) diff --git a/internal/verifier/mismatches.go b/internal/verifier/mismatches.go index 3638ab69..7395bef1 100644 --- a/internal/verifier/mismatches.go +++ b/internal/verifier/mismatches.go @@ -5,7 +5,6 @@ import ( "encoding/binary" "fmt" - "github.com/10gen/migration-verifier/mbson" "github.com/10gen/migration-verifier/option" "github.com/pkg/errors" "github.com/samber/lo" @@ -140,11 +139,11 @@ func countMismatchesForTasks( return matched, totalRV.AsInt64() - matched, nil } -func countMismatchesForGeneration( +func countRechecksForGeneration( ctx context.Context, metaDB *mongo.Database, generation int, -) (int64, error) { +) (int64, int64, error) { cursor, err := metaDB.Collection(verificationTasksCollection).Aggregate( ctx, mongo.Pipeline{ @@ -157,35 +156,51 @@ func countMismatchesForGeneration( {"foreignField", "task"}, {"as", "mismatches"}, }}}, + {{"$addFields", bson.D{ + {"mismatches", bson.D{{"$size", "$mismatches"}}}, + }}}, {{"$group", bson.D{ {"_id", nil}, + {"changes", bson.D{ + {"$sum", bson.D{ + {"$subtract", bson.A{ + bson.D{{"$size", "$_ids"}}, + "$mismatches", + }}, + }}, + }}, {"mismatches", bson.D{ - {"$sum", bson.D{{"$size", "$mismatches"}}}, + {"$sum", "$mismatches"}, }}, }}}, }, ) if err != nil { - return 0, errors.Wrap(err, "sending query to count last generation’s found mismatches") + return 0, 0, errors.Wrap(err, "sending query to count last generation’s found mismatches") } defer cursor.Close(ctx) if !cursor.Next(ctx) { if cursor.Err() != nil { - return 0, errors.Wrap(err, "reading count of last generation’s found mismatches") + return 0, 0, errors.Wrap(err, "reading count of last generation’s found mismatches") } // This happens if there were no tasks in the queried generation. - return 0, nil + return 0, 0, nil } - mmRV, err := cursor.Current.LookupErr("mismatches") + result := struct { + Mismatches int64 + Changes int64 + }{} + + err = cursor.Decode(&result) if err != nil { - return 0, errors.Wrapf(err, "reading mismatches from result (%v)", cursor.Current) + return 0, 0, errors.Wrapf(err, "reading mismatches from result (%v)", cursor.Current) } - return mbson.ToInt64(mmRV) + return result.Mismatches, result.Changes, nil } func getMismatchesForTasks( diff --git a/internal/verifier/progress.go b/internal/verifier/progress.go index 9e598c3c..cd6cc39e 100644 --- a/internal/verifier/progress.go +++ b/internal/verifier/progress.go @@ -26,7 +26,7 @@ func (verifier *Verifier) GetProgress(ctx context.Context) (Progress, error) { progressTime := time.Now() genElapsed := progressTime.Sub(verifier.generationStartTime) - genStats.TimeElapsed = option.Some(genElapsed.Round(10 * time.Millisecond).String()) + genStats.TimeElapsed = option.Some(genElapsed.Round(time.Millisecond).String()) } eg, egCtx := contextplus.ErrGroup(ctx) @@ -42,7 +42,7 @@ func (verifier *Verifier) GetProgress(ctx context.Context) (Progress, error) { if generation > 0 { eg.Go( func() error { - count, err := countMismatchesForGeneration( + mismatches, changes, err := countRechecksForGeneration( egCtx, verifier.metaClient.Database(verifier.metaDBName), generation-1, @@ -52,7 +52,10 @@ func (verifier *Verifier) GetProgress(ctx context.Context) (Progress, error) { return errors.Wrapf(err, "counting mismatches seen during generation %d", generation-1) } - genStats.PriorMismatches = option.Some(count) + genStats.PriorRechecks = option.Some(ProgressRechecks{ + Changes: changes, + Mismatches: mismatches, + }) return nil }, diff --git a/internal/verifier/webserver.go b/internal/verifier/webserver.go index 7f8ceb30..524f3006 100644 --- a/internal/verifier/webserver.go +++ b/internal/verifier/webserver.go @@ -242,6 +242,11 @@ func (server *WebServer) writesOffEndpoint(c *gin.Context) { successResponse(c) } +type ProgressRechecks struct { + Mismatches int64 `json:"mismatches"` + Changes int64 `json:"changes"` +} + type ProgressGenerationStats struct { TimeElapsed option.Option[string] `json:"timeElapsed"` ActiveWorkers int `json:"activeWorkers"` @@ -252,9 +257,9 @@ type ProgressGenerationStats struct { SrcBytesCompared types.ByteCount `json:"srcBytesCompared"` TotalSrcBytes types.ByteCount `json:"totalSrcBytes,omitempty"` - PriorMismatches option.Option[int64] `json:"priorMismatches"` - MismatchesFound int64 `json:"mismatchesFound"` - RechecksEnqueued int64 `json:"rechecksEnqueued"` + PriorRechecks option.Option[ProgressRechecks] `json:"priorRechecks"` + MismatchesFound int64 `json:"mismatchesFound"` + RechecksEnqueued int64 `json:"rechecksEnqueued"` } type ProgressChangeStats struct {