diff --git a/.changeset/chilly-melons-check.md b/.changeset/chilly-melons-check.md new file mode 100644 index 000000000..1f77e4d1e --- /dev/null +++ b/.changeset/chilly-melons-check.md @@ -0,0 +1,9 @@ +--- +'@powersync/service-module-mongodb-storage': patch +'@powersync/service-module-postgres': patch +'@powersync/service-core': patch +'@powersync/lib-service-mongodb': patch +'@powersync/service-image': patch +--- + +Fix pre-computing of checksums after intial replication causing replication timeouts diff --git a/.changeset/late-queens-sell.md b/.changeset/late-queens-sell.md new file mode 100644 index 000000000..fc643adf2 --- /dev/null +++ b/.changeset/late-queens-sell.md @@ -0,0 +1,7 @@ +--- +'@powersync/service-module-mongodb-storage': patch +'@powersync/service-core': patch +'@powersync/service-image': patch +--- + +Improve performance of the compact job diff --git a/libs/lib-mongodb/src/db/mongo.ts b/libs/lib-mongodb/src/db/mongo.ts index 6bd1f1a79..0f8f9dffd 100644 --- a/libs/lib-mongodb/src/db/mongo.ts +++ b/libs/lib-mongodb/src/db/mongo.ts @@ -9,11 +9,8 @@ export const MONGO_CONNECT_TIMEOUT_MS = 10_000; /** * Time for individual requests to timeout the socket. - * - * Currently increased to cater for slow checksum calculations - may be reduced to 60s again - * if we optimize those. */ -export const MONGO_SOCKET_TIMEOUT_MS = 90_000; +export const MONGO_SOCKET_TIMEOUT_MS = 60_000; /** * Time for individual requests to timeout the operation. @@ -30,11 +27,8 @@ export const MONGO_OPERATION_TIMEOUT_MS = 40_000; * This is time spent on the cursor, not total time. * * Must be less than MONGO_SOCKET_TIMEOUT_MS to ensure proper error handling. - * - * This is temporarily increased to cater for slow checksum calculations, - * may be reduced to MONGO_OPERATION_TIMEOUT_MS again if we optimize those. */ -export const MONGO_CHECKSUM_TIMEOUT_MS = 80_000; +export const MONGO_CHECKSUM_TIMEOUT_MS = 50_000; /** * Same as above, but specifically for clear operations. diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoChecksums.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoChecksums.ts new file mode 100644 index 000000000..ef7c41c03 --- /dev/null +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoChecksums.ts @@ -0,0 +1,320 @@ +import { + addPartialChecksums, + bson, + BucketChecksum, + ChecksumCache, + ChecksumMap, + FetchPartialBucketChecksum, + InternalOpId, + isPartialChecksum, + PartialChecksum, + PartialChecksumMap, + PartialOrFullChecksum +} from '@powersync/service-core'; +import * as lib_mongo from '@powersync/lib-service-mongodb'; +import { logger } from '@powersync/lib-services-framework'; +import { PowerSyncMongo } from './db.js'; + +/** + * Checksum query implementation. + */ +export class MongoChecksums { + private cache = new ChecksumCache({ + fetchChecksums: (batch) => { + return this.getChecksumsInternal(batch); + } + }); + + constructor( + private db: PowerSyncMongo, + private group_id: number + ) {} + + /** + * Calculate checksums, utilizing the cache. + */ + async getChecksums(checkpoint: InternalOpId, buckets: string[]): Promise { + return this.cache.getChecksumMap(checkpoint, buckets); + } + + clearCache() { + this.cache.clear(); + } + + /** + * Calculate (partial) checksums from bucket_state and the data collection. + * + * Results are not cached. 
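+ * For buckets with no cached starting point, the compacted checksum stored on bucket_state (when available) is used as a base, and only the remaining op range is aggregated from bucket_data before merging with addPartialChecksums.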
+ */ + private async getChecksumsInternal(batch: FetchPartialBucketChecksum[]): Promise { + if (batch.length == 0) { + return new Map(); + } + + const preFilters: any[] = []; + for (let request of batch) { + if (request.start == null) { + preFilters.push({ + _id: { + g: this.group_id, + b: request.bucket + }, + 'compacted_state.op_id': { $exists: true, $lte: request.end } + }); + } + } + + const preStates = new Map(); + + if (preFilters.length > 0) { + // For un-cached bucket checksums, attempt to use the compacted state first. + const states = await this.db.bucket_state + .find({ + $or: preFilters + }) + .toArray(); + for (let state of states) { + const compactedState = state.compacted_state!; + preStates.set(state._id.b, { + opId: compactedState.op_id, + checksum: { + bucket: state._id.b, + checksum: Number(compactedState.checksum), + count: compactedState.count + } + }); + } + } + + const mappedRequests = batch.map((request) => { + let start = request.start; + if (start == null) { + const preState = preStates.get(request.bucket); + if (preState != null) { + start = preState.opId; + } + } + return { + ...request, + start + }; + }); + + const queriedChecksums = await this.queryPartialChecksums(mappedRequests); + + return new Map( + batch.map((request) => { + const bucket = request.bucket; + // Could be null if this is either (1) a partial request, or (2) no compacted checksum was available + const preState = preStates.get(bucket); + // Could be null if we got no data + const partialChecksum = queriedChecksums.get(bucket); + const merged = addPartialChecksums(bucket, preState?.checksum ?? null, partialChecksum ?? null); + + return [bucket, merged]; + }) + ); + } + + /** + * Calculate (partial) checksums from the data collection directly. + */ + async queryPartialChecksums(batch: FetchPartialBucketChecksum[]): Promise { + try { + return await this.queryPartialChecksumsInternal(batch); + } catch (e) { + if (e.codeName == 'MaxTimeMSExpired') { + logger.warn(`Checksum query timed out; falling back to slower version`, e); + // Timeout - try the slower but more robust version + return await this.queryPartialChecksumsFallback(batch); + } + throw lib_mongo.mapQueryError(e, 'while reading checksums'); + } + } + + private async queryPartialChecksumsInternal(batch: FetchPartialBucketChecksum[]): Promise { + const filters: any[] = []; + for (let request of batch) { + filters.push({ + _id: { + $gt: { + g: this.group_id, + b: request.bucket, + o: request.start ?? 
new bson.MinKey() + }, + $lte: { + g: this.group_id, + b: request.bucket, + o: request.end + } + } + }); + } + + const aggregate = await this.db.bucket_data + .aggregate( + [ + { + $match: { + $or: filters + } + }, + CHECKSUM_QUERY_GROUP_STAGE + ], + { session: undefined, readConcern: 'snapshot', maxTimeMS: lib_mongo.MONGO_CHECKSUM_TIMEOUT_MS } + ) + // Don't map the error here - we want to keep timeout errors as-is + .toArray(); + + const partialChecksums = new Map( + aggregate.map((doc) => { + const bucket = doc._id; + return [bucket, checksumFromAggregate(doc)]; + }) + ); + + return new Map( + batch.map((request) => { + const bucket = request.bucket; + // Could be null if we got no data + let partialChecksum = partialChecksums.get(bucket); + if (partialChecksum == null) { + partialChecksum = { + bucket, + partialCount: 0, + partialChecksum: 0 + }; + } + if (request.start == null && isPartialChecksum(partialChecksum)) { + partialChecksum = { + bucket, + count: partialChecksum.partialCount, + checksum: partialChecksum.partialChecksum + }; + } + + return [bucket, partialChecksum]; + }) + ); + } + + /** + * Checksums for large buckets can run over the query timeout. + * To avoid this, we query in batches. + * This version can handle larger amounts of data, but is slower, especially for many buckets. + */ + async queryPartialChecksumsFallback(batch: FetchPartialBucketChecksum[]): Promise { + const partialChecksums = new Map(); + for (let request of batch) { + const checksum = await this.slowChecksum(request); + partialChecksums.set(request.bucket, checksum); + } + + return partialChecksums; + } + + private async slowChecksum(request: FetchPartialBucketChecksum): Promise { + const batchLimit = 50_000; + + let lowerBound = 0n; + const bucket = request.bucket; + + let runningChecksum: PartialOrFullChecksum = { + bucket, + partialCount: 0, + partialChecksum: 0 + }; + if (request.start == null) { + runningChecksum = { + bucket, + count: 0, + checksum: 0 + }; + } + + while (true) { + const filter = { + _id: { + $gt: { + g: this.group_id, + b: bucket, + o: lowerBound + }, + $lte: { + g: this.group_id, + b: bucket, + o: request.end + } + } + }; + const docs = await this.db.bucket_data + .aggregate( + [ + { + $match: filter + }, + // sort and limit _before_ grouping + { $sort: { _id: 1 } }, + { $limit: batchLimit }, + CHECKSUM_QUERY_GROUP_STAGE + ], + { session: undefined, readConcern: 'snapshot', maxTimeMS: lib_mongo.MONGO_CHECKSUM_TIMEOUT_MS } + ) + .toArray(); + const doc = docs[0]; + if (doc == null) { + return runningChecksum; + } + const partial = checksumFromAggregate(doc); + runningChecksum = addPartialChecksums(bucket, runningChecksum, partial); + const isFinal = doc.count != batchLimit; + if (isFinal) { + break; + } else { + lowerBound = doc.last_op; + } + } + return runningChecksum; + } +} + +const CHECKSUM_QUERY_GROUP_STAGE = { + $group: { + _id: '$_id.b', + // Historically, checksum may be stored as 'int' or 'double'. + // More recently, this should be a 'long'. + // $toLong ensures that we always sum it as a long, avoiding inaccuracies in the calculations. + checksum_total: { $sum: { $toLong: '$checksum' } }, + count: { $sum: 1 }, + has_clear_op: { + $max: { + $cond: [{ $eq: ['$op', 'CLEAR'] }, 1, 0] + } + }, + last_op: { $max: '$_id.o' } + } +}; + +/** + * Convert output of CHECKSUM_QUERY_GROUP_STAGE into a checksum. 
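+ * Returns a full checksum (replacing any previous state) when the aggregated range contains a CLEAR op, and a partial checksum (to be added onto a previous one) otherwise.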
+ */ +function checksumFromAggregate(doc: bson.Document): PartialOrFullChecksum { + const partialChecksum = Number(BigInt(doc.checksum_total) & 0xffffffffn) & 0xffffffff; + const bucket = doc._id; + + if (doc.has_clear_op == 1) { + return { + // full checksum - replaces any previous one + bucket, + checksum: partialChecksum, + count: doc.count + } satisfies BucketChecksum; + } else { + return { + // partial checksum - is added to a previous one + bucket, + partialCount: doc.count, + partialChecksum + } satisfies PartialChecksum; + } +} diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts index f6bf44f2c..7c62717af 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts @@ -1,9 +1,10 @@ -import { mongo } from '@powersync/lib-service-mongodb'; +import { mongo, MONGO_OPERATION_TIMEOUT_MS } from '@powersync/lib-service-mongodb'; import { logger, ReplicationAssertionError, ServiceAssertionError } from '@powersync/lib-services-framework'; -import { addChecksums, InternalOpId, storage, utils } from '@powersync/service-core'; +import { addChecksums, InternalOpId, isPartialChecksum, storage, utils } from '@powersync/service-core'; import { PowerSyncMongo } from './db.js'; import { BucketDataDocument, BucketDataKey, BucketStateDocument } from './models.js'; +import { MongoSyncBucketStorage } from './MongoSyncBucketStorage.js'; import { cacheKey } from './OperationBatch.js'; import { readSingleBatch } from './util.js'; @@ -68,12 +69,14 @@ export class MongoCompactor { private maxOpId: bigint; private buckets: string[] | undefined; private signal?: AbortSignal; + private group_id: number; constructor( + private storage: MongoSyncBucketStorage, private db: PowerSyncMongo, - private group_id: number, options?: MongoCompactOptions ) { + this.group_id = storage.group_id; this.idLimitBytes = (options?.memoryLimitMB ?? DEFAULT_MEMORY_LIMIT_MB) * 1024 * 1024; this.moveBatchLimit = options?.moveBatchLimit ?? DEFAULT_MOVE_BATCH_LIMIT; this.moveBatchQueryLimit = options?.moveBatchQueryLimit ?? DEFAULT_MOVE_BATCH_QUERY_LIMIT; @@ -136,33 +139,57 @@ export class MongoCompactor { o: new mongo.MaxKey() as any }; + const doneWithBucket = async () => { + if (currentState == null) { + return; + } + // Free memory before clearing bucket + currentState.seen.clear(); + if (currentState.lastNotPut != null && currentState.opsSincePut >= 1) { + logger.info( + `Inserting CLEAR at ${this.group_id}:${currentState.bucket}:${currentState.lastNotPut} to remove ${currentState.opsSincePut} operations` + ); + // Need flush() before clear() + await this.flush(); + await this.clearBucket(currentState); + } + + // Do this _after_ clearBucket so that we have accurate counts. 
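+ // clearBucket() replaces the operations up to that point with a single CLEAR op, which changes both the op count and the checksum recorded for the bucket.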
+ this.updateBucketChecksums(currentState); + }; + while (!this.signal?.aborted) { // Query one batch at a time, to avoid cursor timeouts - const cursor = this.db.bucket_data.aggregate([ - { - $match: { - _id: { - $gte: lowerBound, - $lt: upperBound + const cursor = this.db.bucket_data.aggregate( + [ + { + $match: { + _id: { + $gte: lowerBound, + $lt: upperBound + } + } + }, + { $sort: { _id: -1 } }, + { $limit: this.moveBatchQueryLimit }, + { + $project: { + _id: 1, + op: 1, + table: 1, + row_id: 1, + source_table: 1, + source_key: 1, + checksum: 1, + size: { $bsonSize: '$$ROOT' } } } - }, - { $sort: { _id: -1 } }, - { $limit: this.moveBatchQueryLimit }, - { - $project: { - _id: 1, - op: 1, - table: 1, - row_id: 1, - source_table: 1, - source_key: 1, - checksum: 1, - size: { $bsonSize: '$$ROOT' } - } - } - ]); - const { data: batch } = await readSingleBatch(cursor); + ], + { batchSize: this.moveBatchQueryLimit } + ); + // We don't limit to a single batch here, since that often causes MongoDB to scan through more than it returns. + // Instead, we load up to the limit. + const batch = await cursor.toArray(); if (batch.length == 0) { // We've reached the end @@ -174,24 +201,8 @@ export class MongoCompactor { for (let doc of batch) { if (currentState == null || doc._id.b != currentState.bucket) { - if (currentState != null) { - if (currentState.lastNotPut != null && currentState.opsSincePut >= 1) { - // Important to flush before clearBucket() - // Does not have to happen before flushBucketChecksums() - await this.flush(); - logger.info( - `Inserting CLEAR at ${this.group_id}:${currentState.bucket}:${currentState.lastNotPut} to remove ${currentState.opsSincePut} operations` - ); - - // Free memory before clearing bucket - currentState!.seen.clear(); - - await this.clearBucket(currentState); - } + await doneWithBucket(); - // Should happen after clearBucket() for accurate stats - this.updateBucketChecksums(currentState); - } currentState = { bucket: doc._id.b, seen: new Map(), @@ -274,21 +285,14 @@ export class MongoCompactor { await this.flush(); } } - } - currentState?.seen.clear(); - if (currentState?.lastNotPut != null && currentState?.opsSincePut > 1) { - logger.info( - `Inserting CLEAR at ${this.group_id}:${currentState.bucket}:${currentState.lastNotPut} to remove ${currentState.opsSincePut} operations` - ); - // Need flush() before clear() - await this.flush(); - await this.clearBucket(currentState); - } - if (currentState != null) { - // Do this _after_ clearBucket so that we have accurate counts. - this.updateBucketChecksums(currentState); + if (currentState != null) { + logger.info(`Processed batch of length ${batch.length} current bucket: ${currentState.bucket}`); + } } + + await doneWithBucket(); + // Need another flush after updateBucketChecksums() await this.flush(); } @@ -475,4 +479,91 @@ export class MongoCompactor { await session.endSession(); } } + + /** + * Subset of compact, only populating checksums where relevant. + */ + async populateChecksums() { + let lastId: BucketStateDocument['_id'] | null = null; + while (!this.signal?.aborted) { + // By filtering buckets, we effectively make this "resumeable". 
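+ // Buckets that already have a compacted_state are skipped, so an interrupted run continues where it left off.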
+ let filter: mongo.Filter = { + compacted_state: { $exists: false } + }; + if (lastId) { + filter._id = { $gt: lastId }; + } + + const bucketsWithoutChecksums = await this.db.bucket_state + .find(filter, { + projection: { + _id: 1 + }, + sort: { + _id: 1 + }, + limit: 5_000, + maxTimeMS: MONGO_OPERATION_TIMEOUT_MS + }) + .toArray(); + if (bucketsWithoutChecksums.length == 0) { + // All done + break; + } + + logger.info(`Calculating checksums for batch of ${bucketsWithoutChecksums.length} buckets`); + + await this.updateChecksumsBatch(bucketsWithoutChecksums.map((b) => b._id.b)); + + lastId = bucketsWithoutChecksums[bucketsWithoutChecksums.length - 1]._id; + } + } + + private async updateChecksumsBatch(buckets: string[]) { + const checksums = await this.storage.checksums.queryPartialChecksums( + buckets.map((bucket) => { + return { + bucket, + end: this.maxOpId + }; + }) + ); + + for (let bucketChecksum of checksums.values()) { + if (isPartialChecksum(bucketChecksum)) { + // Should never happen since we don't specify `start` + throw new ServiceAssertionError(`Full checksum expected, got ${JSON.stringify(bucketChecksum)}`); + } + + this.bucketStateUpdates.push({ + updateOne: { + filter: { + _id: { + g: this.group_id, + b: bucketChecksum.bucket + } + }, + update: { + $set: { + compacted_state: { + op_id: this.maxOpId, + count: bucketChecksum.count, + checksum: BigInt(bucketChecksum.checksum), + bytes: null + } + }, + $setOnInsert: { + // Only set this if we're creating the document. + // In all other cases, the replication process will have a set a more accurate id. + last_op: this.maxOpId + } + }, + // We generally expect this to have been created before, but do handle cases of old unchanged buckets + upsert: true + } + }); + } + + await this.flush(); + } } diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts index 781fc3cc8..e566620e9 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts @@ -7,9 +7,7 @@ import { ServiceAssertionError } from '@powersync/lib-services-framework'; import { - addPartialChecksums, BroadcastIterable, - BucketChecksum, CHECKPOINT_INVALIDATE_ALL, CheckpointChanges, CompactOptions, @@ -18,7 +16,6 @@ import { InternalOpId, internalToExternalOpId, maxLsn, - PartialChecksum, ProtocolOpId, ReplicationCheckpoint, storage, @@ -34,6 +31,7 @@ import { MongoBucketStorage } from '../MongoBucketStorage.js'; import { PowerSyncMongo } from './db.js'; import { BucketDataDocument, BucketDataKey, BucketStateDocument, SourceKey, SourceTableDocument } from './models.js'; import { MongoBucketBatch } from './MongoBucketBatch.js'; +import { MongoChecksums } from './MongoChecksums.js'; import { MongoCompactor } from './MongoCompactor.js'; import { MongoParameterCompactor } from './MongoParameterCompactor.js'; import { MongoWriteCheckpointAPI } from './MongoWriteCheckpointAPI.js'; @@ -44,11 +42,7 @@ export class MongoSyncBucketStorage implements storage.SyncRulesBucketStorage { private readonly db: PowerSyncMongo; - private checksumCache = new storage.ChecksumCache({ - fetchChecksums: (batch) => { - return this.getChecksumsInternal(batch); - } - }); + readonly checksums: MongoChecksums; private parsedSyncRulesCache: { parsed: SqlSyncRules; options: storage.ParseSyncRulesOptions } | undefined; private writeCheckpointAPI: 
MongoWriteCheckpointAPI; @@ -62,6 +56,7 @@ export class MongoSyncBucketStorage ) { super(); this.db = factory.db; + this.checksums = new MongoChecksums(this.db, this.group_id); this.writeCheckpointAPI = new MongoWriteCheckpointAPI({ db: this.db, mode: writeCheckpointMode, @@ -491,145 +486,11 @@ export class MongoSyncBucketStorage } async getChecksums(checkpoint: utils.InternalOpId, buckets: string[]): Promise { - return this.checksumCache.getChecksumMap(checkpoint, buckets); + return this.checksums.getChecksums(checkpoint, buckets); } clearChecksumCache() { - this.checksumCache.clear(); - } - - private async getChecksumsInternal(batch: storage.FetchPartialBucketChecksum[]): Promise { - if (batch.length == 0) { - return new Map(); - } - - const preFilters: any[] = []; - for (let request of batch) { - if (request.start == null) { - preFilters.push({ - _id: { - g: this.group_id, - b: request.bucket - }, - 'compacted_state.op_id': { $exists: true, $lte: request.end } - }); - } - } - - const preStates = new Map(); - - if (preFilters.length > 0) { - // For un-cached bucket checksums, attempt to use the compacted state first. - const states = await this.db.bucket_state - .find({ - $or: preFilters - }) - .toArray(); - for (let state of states) { - const compactedState = state.compacted_state!; - preStates.set(state._id.b, { - opId: compactedState.op_id, - checksum: { - bucket: state._id.b, - checksum: Number(compactedState.checksum), - count: compactedState.count - } - }); - } - } - - const filters: any[] = []; - for (let request of batch) { - let start = request.start; - if (start == null) { - const preState = preStates.get(request.bucket); - if (preState != null) { - start = preState.opId; - } - } - - filters.push({ - _id: { - $gt: { - g: this.group_id, - b: request.bucket, - o: start ?? new bson.MinKey() - }, - $lte: { - g: this.group_id, - b: request.bucket, - o: request.end - } - } - }); - } - - const aggregate = await this.db.bucket_data - .aggregate( - [ - { - $match: { - $or: filters - } - }, - { - $group: { - _id: '$_id.b', - // Historically, checksum may be stored as 'int' or 'double'. - // More recently, this should be a 'long'. - // $toLong ensures that we always sum it as a long, avoiding inaccuracies in the calculations. - checksum_total: { $sum: { $toLong: '$checksum' } }, - count: { $sum: 1 }, - has_clear_op: { - $max: { - $cond: [{ $eq: ['$op', 'CLEAR'] }, 1, 0] - } - } - } - } - ], - { session: undefined, readConcern: 'snapshot', maxTimeMS: lib_mongo.db.MONGO_CHECKSUM_TIMEOUT_MS } - ) - .toArray() - .catch((e) => { - throw lib_mongo.mapQueryError(e, 'while reading checksums'); - }); - - const partialChecksums = new Map( - aggregate.map((doc) => { - const partialChecksum = Number(BigInt(doc.checksum_total) & 0xffffffffn) & 0xffffffff; - const bucket = doc._id; - return [ - bucket, - doc.has_clear_op == 1 - ? ({ - // full checksum - replaces any previous one - bucket, - checksum: partialChecksum, - count: doc.count - } satisfies BucketChecksum) - : ({ - // partial checksum - is added to a previous one - bucket, - partialCount: doc.count, - partialChecksum - } satisfies PartialChecksum) - ]; - }) - ); - - return new Map( - batch.map((request) => { - const bucket = request.bucket; - // Could be null if this is either (1) a partial request, or (2) no compacted checksum was available - const preState = preStates.get(bucket); - // Could be null if we got no data - const partialChecksum = partialChecksums.get(bucket); - const merged = addPartialChecksums(bucket, preState?.checksum ?? 
null, partialChecksum ?? null); - - return [bucket, merged]; - }) - ); + this.checksums.clearCache(); } async terminate(options?: storage.TerminateOptions) { @@ -779,22 +640,25 @@ export class MongoSyncBucketStorage const checkpoint = await this.getCheckpointInternal(); maxOpId = checkpoint?.checkpoint ?? undefined; } - await new MongoCompactor(this.db, this.group_id, { ...options, maxOpId }).compact(); + await new MongoCompactor(this, this.db, { ...options, maxOpId }).compact(); + if (maxOpId != null && options?.compactParameterData) { await new MongoParameterCompactor(this.db, this.group_id, maxOpId, options).compact(); } } - async populatePersistentChecksumCache(options: Pick): Promise { + async populatePersistentChecksumCache(options: Required>): Promise { + logger.info(`Populating persistent checksum cache...`); const start = Date.now(); - // We do a minimal compact, primarily to populate the checksum cache - await this.compact({ + // We do a minimal compact here. + // We can optimize this in the future. + const compactor = new MongoCompactor(this, this.db, { ...options, - // Skip parameter data - compactParameterData: false, // Don't track updates for MOVE compacting memoryLimitMB: 0 }); + + await compactor.populateChecksums(); const duration = Date.now() - start; logger.info(`Populated persistent checksum cache in ${(duration / 1000).toFixed(1)}s`); } diff --git a/modules/module-mongodb-storage/src/storage/implementation/models.ts b/modules/module-mongodb-storage/src/storage/implementation/models.ts index b877f7f98..998deec2c 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/models.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/models.ts @@ -106,7 +106,7 @@ export interface BucketStateDocument { op_id: InternalOpId; count: number; checksum: bigint; - bytes: number; + bytes: number | null; }; estimate_since_compact?: { diff --git a/modules/module-mongodb-storage/src/storage/implementation/util.ts b/modules/module-mongodb-storage/src/storage/implementation/util.ts index 9cf5eabd6..cae1a5b9f 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/util.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/util.ts @@ -3,7 +3,7 @@ import * as crypto from 'crypto'; import * as uuid from 'uuid'; import { mongo } from '@powersync/lib-service-mongodb'; -import { storage, utils } from '@powersync/service-core'; +import { BucketChecksum, PartialChecksum, PartialOrFullChecksum, storage, utils } from '@powersync/service-core'; import { PowerSyncMongo } from './db.js'; import { BucketDataDocument } from './models.js'; diff --git a/modules/module-postgres/src/replication/WalStream.ts b/modules/module-postgres/src/replication/WalStream.ts index 3813caa0a..b5e26db4f 100644 --- a/modules/module-postgres/src/replication/WalStream.ts +++ b/modules/module-postgres/src/replication/WalStream.ts @@ -840,9 +840,14 @@ WHERE oid = $1::regclass`, try { // If anything errors here, the entire replication process is halted, and // all connections automatically closed, including this one. 
- const replicationConnection = await this.connections.replicationConnection(); - await this.initReplication(replicationConnection); - await this.streamChanges(replicationConnection); + const initReplicationConnection = await this.connections.replicationConnection(); + await this.initReplication(initReplicationConnection); + await initReplicationConnection.end(); + + // At this point, the above connection has often timed out, so we start a new one + const streamReplicationConnection = await this.connections.replicationConnection(); + await this.streamChanges(streamReplicationConnection); + await streamReplicationConnection.end(); } catch (e) { await this.storage.reportError(e); throw e; diff --git a/packages/service-core/src/util/utils.ts b/packages/service-core/src/util/utils.ts index 85b9e2c75..411f9b49d 100644 --- a/packages/service-core/src/util/utils.ts +++ b/packages/service-core/src/util/utils.ts @@ -113,26 +113,30 @@ export function addBucketChecksums(a: BucketChecksum, b: PartialChecksum | Bucke export function addPartialChecksums( bucket: string, - a: BucketChecksum | null, + a: PartialChecksum | BucketChecksum | null, b: PartialChecksum | BucketChecksum | null ): PartialChecksum | BucketChecksum { if (a != null && b != null) { if (!isPartialChecksum(b)) { - // Replaces preState + // Replaces a return b; } // merge - return { - bucket, - checksum: addChecksums(a.checksum, b.partialChecksum), - count: a.count + b.partialCount - }; + if (!isPartialChecksum(a)) { + return { + bucket, + checksum: addChecksums(a.checksum, b.partialChecksum), + count: a.count + b.partialCount + }; + } else { + return { + bucket, + partialChecksum: addChecksums(a.partialChecksum, b.partialChecksum), + partialCount: a.partialCount + b.partialCount + }; + } } else if (a != null) { - return { - bucket, - checksum: a.checksum, - count: a.count - }; + return a; } else if (b != null) { return b; } else {
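A minimal usage sketch of the reworked addPartialChecksums cases (illustrative only, not part of the patch; the bucket name and checksum values below are made up):

import { addPartialChecksums, BucketChecksum, PartialChecksum } from '@powersync/service-core';

// Full checksum from a bucket's compacted_state, covering ops up to compacted_state.op_id.
const compacted: BucketChecksum = { bucket: 'b1', count: 1000, checksum: 123456 };
// Partial checksum queried from bucket_data for the ops after compacted_state.op_id.
const queried: PartialChecksum = { bucket: 'b1', partialCount: 20, partialChecksum: -987 };

// A partial checksum added onto a full one stays a full checksum:
// { bucket: 'b1', count: 1020, checksum: addChecksums(123456, -987) }
const merged = addPartialChecksums('b1', compacted, queried);

// A full checksum as the second argument (e.g. the range contained a CLEAR op) replaces the first:
// { bucket: 'b1', count: 1, checksum: 555 }
const replaced = addPartialChecksums('b1', compacted, { bucket: 'b1', count: 1, checksum: 555 });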