diff --git a/.changeset/lovely-olives-type.md b/.changeset/lovely-olives-type.md
new file mode 100644
index 000000000..97f777076
--- /dev/null
+++ b/.changeset/lovely-olives-type.md
@@ -0,0 +1,7 @@
+---
+'@powersync/service-module-mongodb-storage': patch
+'@powersync/service-core': patch
+'@powersync/service-image': patch
+---
+
+Fix SnapshotTooOld on parameter queries in some cases.
diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts
index 8853d99a6..bc199bdf0 100644
--- a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts
+++ b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts
@@ -41,6 +41,17 @@ export interface MongoSyncBucketStorageOptions {
   checksumOptions?: MongoChecksumOptions;
 }
 
+/**
+ * Only keep checkpoints around for a minute, before fetching a fresh one.
+ *
+ * The reason is that we keep a MongoDB snapshot reference (clusterTime) with the checkpoint,
+ * and they expire after 5 minutes by default. This is an issue if the checkpoint stream is idle,
+ * but new clients connect and use an outdated checkpoint snapshot for parameter queries.
+ *
+ * These will be filtered out for existing clients, so should not create significant overhead.
+ */
+const CHECKPOINT_TIMEOUT_MS = 60_000;
+
 export class MongoSyncBucketStorage
   extends BaseObserver<storage.SyncRulesBucketStorageListener>
   implements storage.SyncRulesBucketStorage
@@ -680,25 +691,45 @@ export class MongoSyncBucketStorage
     // We only watch changes to the active sync rules.
     // If it changes to inactive, we abort and restart with the new sync rules.
 
-    let lastOp: storage.ReplicationCheckpoint | null = null;
+    try {
+      while (true) {
+        // If the stream is idle, we wait a max of a minute (CHECKPOINT_TIMEOUT_MS)
+        // before we get another checkpoint, to avoid stale checkpoint snapshots.
+        const timeout = timers
+          .setTimeout(CHECKPOINT_TIMEOUT_MS, { done: false }, { signal })
+          .catch(() => ({ done: true }));
+        try {
+          const result = await Promise.race([stream.next(), timeout]);
+          if (result.done) {
+            break;
+          }
+        } catch (e) {
+          if (e.name == 'AbortError') {
+            break;
+          }
+          throw e;
+        }
 
-    for await (const _ of stream) {
-      if (signal.aborted) {
-        break;
-      }
+        if (signal.aborted) {
+          // Would likely have been caught by the signal on the timeout or the upstream stream, but we check here anyway
+          break;
+        }
 
-      const op = await this.getCheckpointInternal();
-      if (op == null) {
-        // Sync rules have changed - abort and restart.
-        // We do a soft close of the stream here - no error
-        break;
-      }
+        const op = await this.getCheckpointInternal();
+        if (op == null) {
+          // Sync rules have changed - abort and restart.
+          // We do a soft close of the stream here - no error
+          break;
+        }
 
-      // Check for LSN / checkpoint changes - ignore other metadata changes
-      if (lastOp == null || op.lsn != lastOp.lsn || op.checkpoint != lastOp.checkpoint) {
-        lastOp = op;
+        // Previously, we only yielded when the checkpoint or lsn changed.
+        // However, we always want to use the latest snapshotTime, so we skip that filtering here.
+        // That filtering could be added in the per-user streams if needed, but in general the capped collection
+        // should already only contain useful changes in most cases.
         yield op;
       }
+    } finally {
+      await stream.return(null);
     }
   }
 
diff --git a/test-client/src/ndjson.ts b/test-client/src/ndjson.ts
index 200843943..e499a1814 100644
--- a/test-client/src/ndjson.ts
+++ b/test-client/src/ndjson.ts
@@ -10,48 +10,53 @@ export function ndjsonStream(
       var decoder = new TextDecoder();
       var data_buf = '';
 
-      reader.read().then(function processResult(result): void | Promise<void> {
-        if (result.done) {
-          if (cancellationRequest) {
-            // Immediately exit
-            return;
-          }
-
-          data_buf = data_buf.trim();
-          if (data_buf.length !== 0) {
-            try {
-              var data_l = JSON.parse(data_buf);
-              controller.enqueue({ chunk: data_l, size: data_l.length });
-            } catch (e) {
-              controller.error(e);
-              return;
-            }
-          }
-          controller.close();
-          return;
-        }
-
-        var data = decoder.decode(result.value, { stream: true });
-        data_buf += data;
-        var lines = data_buf.split('\n');
-        for (var i = 0; i < lines.length - 1; ++i) {
-          var l = lines[i].trim();
-          if (l.length > 0) {
-            try {
-              var data_line = JSON.parse(l);
-              controller.enqueue({ chunk: data_line, size: l.length });
-            } catch (e) {
-              controller.error(e);
-              cancellationRequest = true;
-              reader.cancel();
-              return;
-            }
-          }
-        }
-        data_buf = lines[lines.length - 1];
-
-        return reader.read().then(processResult);
-      });
+      reader
+        .read()
+        .then(function processResult(result): void | Promise<void> {
+          if (result.done) {
+            if (cancellationRequest) {
+              // Immediately exit
+              return;
+            }
+
+            data_buf = data_buf.trim();
+            if (data_buf.length !== 0) {
+              try {
+                var data_l = JSON.parse(data_buf);
+                controller.enqueue({ chunk: data_l, size: data_l.length });
+              } catch (e) {
+                controller.error(e);
+                return;
+              }
+            }
+            controller.close();
+            return;
+          }
+
+          var data = decoder.decode(result.value, { stream: true });
+          data_buf += data;
+          var lines = data_buf.split('\n');
+          for (var i = 0; i < lines.length - 1; ++i) {
+            var l = lines[i].trim();
+            if (l.length > 0) {
+              try {
+                var data_line = JSON.parse(l);
+                controller.enqueue({ chunk: data_line, size: l.length });
+              } catch (e) {
+                controller.error(e);
+                cancellationRequest = true;
+                reader.cancel();
+                return;
+              }
+            }
+          }
+          data_buf = lines[lines.length - 1];
+
+          return reader.read().then(processResult);
+        })
+        .catch((e) => {
+          controller.error(e);
+        });
     },
     cancel: function (reason) {
       cancellationRequest = true;
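
Note on the MongoSyncBucketStorage change: the new loop races stream.next() against a promisified timer from node:timers/promises, so an idle checkpoint stream still wakes up at least once per CHECKPOINT_TIMEOUT_MS and fetches a fresh checkpoint before its MongoDB snapshot (clusterTime) can expire. Below is a minimal self-contained sketch of that pattern; watchCheckpoints, fetchCheckpoint, and the Checkpoint shape are illustrative stand-ins for the real change stream and getCheckpointInternal(), not the actual service API.

import * as timers from 'node:timers/promises';

const CHECKPOINT_TIMEOUT_MS = 60_000;

// Illustrative stand-in for the real storage.ReplicationCheckpoint type.
interface Checkpoint {
  checkpoint: bigint;
  lsn: string | null;
}

// Hypothetical helper showing the race pattern: the upstream iterator is only
// a wakeup signal; a fresh checkpoint is fetched on every wakeup, so the
// snapshot attached to it is never older than the timeout.
async function* watchCheckpoints(
  stream: AsyncIterator<unknown>, // e.g. a change stream on a capped collection
  fetchCheckpoint: () => Promise<Checkpoint | null>,
  signal: AbortSignal
): AsyncGenerator<Checkpoint> {
  try {
    while (true) {
      // Resolves with { done: false } after the timeout so an idle stream
      // still produces a periodic wakeup; aborting the signal rejects the
      // timer, which we map to { done: true } to exit the loop.
      const timeout = timers
        .setTimeout(CHECKPOINT_TIMEOUT_MS, { done: false }, { signal })
        .catch(() => ({ done: true }));
      const result = await Promise.race([stream.next(), timeout]);
      if (result.done || signal.aborted) {
        break;
      }
      const op = await fetchCheckpoint();
      if (op == null) {
        break; // e.g. sync rules changed - soft close, no error
      }
      yield op;
    }
  } finally {
    // Release the upstream cursor, mirroring stream.return(null) in the diff.
    await stream.return?.(null);
  }
}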
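
Note on the ndjson.ts change: the restructuring is mostly formatting; the functional fix is the trailing .catch. Because processResult returns the next reader.read().then(processResult) promise, the recursive chain flattens into a single promise, so one catch at the end forwards a rejection from any read to controller.error(e) instead of leaving it as an unhandled rejection. A stripped-down sketch of the same pattern, with a hypothetical pump helper that enqueues plain string lines rather than the test-client's { chunk, size } records:

// Minimal sketch: recursive read loop over a ReadableStream with one trailing
// catch. Each processResult returns the next read's promise, so the chain
// flattens and the catch below observes a failure from any iteration.
function pump(
  reader: ReadableStreamDefaultReader<Uint8Array>,
  controller: ReadableStreamDefaultController<string>
): void {
  const decoder = new TextDecoder();
  let buf = '';

  reader
    .read()
    .then(function processResult(result): void | Promise<void> {
      if (result.done) {
        controller.close();
        return;
      }
      buf += decoder.decode(result.value, { stream: true });
      const lines = buf.split('\n');
      buf = lines.pop()!; // keep the trailing partial line for the next chunk
      for (const line of lines) {
        if (line.trim().length > 0) {
          controller.enqueue(line);
        }
      }
      // Returning the promise keeps the chain flat for the trailing catch.
      return reader.read().then(processResult);
    })
    .catch((e) => {
      // Without this, a rejected read() (e.g. a dropped connection) would be
      // an unhandled rejection and the output stream would hang silently.
      controller.error(e);
    });
}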