7 changes: 7 additions & 0 deletions .changeset/lovely-olives-type.md
@@ -0,0 +1,7 @@
---
'@powersync/service-module-mongodb-storage': patch
'@powersync/service-core': patch
'@powersync/service-image': patch
---

Fix SnapshotTooOld errors on parameter queries in some cases.
@@ -41,6 +41,17 @@ export interface MongoSyncBucketStorageOptions {
checksumOptions?: MongoChecksumOptions;
}

/**
 * Only keep checkpoints around for a minute before fetching a fresh one.
 *
 * The reason is that we keep a MongoDB snapshot reference (clusterTime) with each checkpoint,
 * and those snapshots expire after 5 minutes by default. This is an issue if the checkpoint stream
 * is idle, but new clients connect and use an outdated checkpoint snapshot for parameter queries.
 *
 * The refreshed checkpoints are filtered out for existing clients, so this should not create
 * significant overhead.
 */
const CHECKPOINT_TIMEOUT_MS = 60_000;
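For context on the 5-minute figure in the comment above: the relevant server-side setting is MongoDB's minSnapshotHistoryWindowInSeconds parameter, which defaults to 300 seconds. Below is a minimal sketch (illustrative only, not part of this change) of reading that window with the official Node.js driver, assuming an already-connected MongoClient:

// Illustrative only - not part of this diff. Reads the server-side snapshot history
// window whose expiry causes SnapshotTooOld when a checkpoint's clusterTime is too old.
import { MongoClient } from 'mongodb';

async function getSnapshotHistoryWindowSeconds(client: MongoClient): Promise<number> {
  const result = await client
    .db('admin')
    .command({ getParameter: 1, minSnapshotHistoryWindowInSeconds: 1 });
  // Defaults to 300 (5 minutes); CHECKPOINT_TIMEOUT_MS stays well inside this window.
  return result.minSnapshotHistoryWindowInSeconds;
}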

export class MongoSyncBucketStorage
extends BaseObserver<storage.SyncRulesBucketStorageListener>
implements storage.SyncRulesBucketStorage
@@ -680,25 +691,45 @@ export class MongoSyncBucketStorage

// We only watch changes to the active sync rules.
// If it changes to inactive, we abort and restart with the new sync rules.
let lastOp: storage.ReplicationCheckpoint | null = null;
try {
while (true) {
// If the stream is idle, we wait a max of a minute (CHECKPOINT_TIMEOUT_MS)
// before we get another checkpoint, to avoid stale checkpoint snapshots.
const timeout = timers
.setTimeout(CHECKPOINT_TIMEOUT_MS, { done: false }, { signal })
.catch(() => ({ done: true }));
try {
const result = await Promise.race([stream.next(), timeout]);
if (result.done) {
break;
}
} catch (e) {
if (e.name == 'AbortError') {
break;
}
throw e;
}

for await (const _ of stream) {
if (signal.aborted) {
break;
}
if (signal.aborted) {
// Would likely have been caught by the signal on the timeout or the upstream stream, but we check here anyway
break;
}

const op = await this.getCheckpointInternal();
if (op == null) {
// Sync rules have changed - abort and restart.
// We do a soft close of the stream here - no error
break;
}
const op = await this.getCheckpointInternal();
if (op == null) {
// Sync rules have changed - abort and restart.
// We do a soft close of the stream here - no error
break;
}

// Check for LSN / checkpoint changes - ignore other metadata changes
if (lastOp == null || op.lsn != lastOp.lsn || op.checkpoint != lastOp.checkpoint) {
lastOp = op;
// Previously, we only yielded when the checkpoint or lsn changed.
// However, we always want to use the latest snapshotTime, so we skip that filtering here.
// That filtering could be added in the per-user streams if needed, but in general the capped collection
// should already only contain useful changes in most cases.
yield op;
}
} finally {
await stream.return(null);
}
}
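The loop above boils down to a reusable idle-timeout pattern: race stream.next() against an abortable timer so that an idle stream still produces a fresh checkpoint (and snapshot) at least every CHECKPOINT_TIMEOUT_MS. Below is a minimal sketch of that pattern under the same assumptions (Node's timers/promises API, a signal-aware stream); the pollWithIdleTimeout name and refresh callback are illustrative, not part of the service code:

// Minimal sketch of the pattern used above (illustrative, not the service implementation):
// race the next stream element against an abortable timeout so an idle stream still
// "ticks" periodically and a fresh value can be fetched before snapshots go stale.
import * as timers from 'node:timers/promises';

async function* pollWithIdleTimeout<T>(
  stream: AsyncIterator<unknown>,
  refresh: () => Promise<T | null>,
  timeoutMs: number,
  signal: AbortSignal
): AsyncGenerator<T> {
  while (!signal.aborted) {
    // Resolves to { done: false } after timeoutMs, or rejects with AbortError when the signal aborts.
    const timeout = timers
      .setTimeout(timeoutMs, { done: false as const }, { signal })
      .catch(() => ({ done: true as const }));
    try {
      const result = await Promise.race([stream.next(), timeout]);
      if (result.done) {
        // Either the upstream stream ended or we were aborted - soft close.
        return;
      }
    } catch (e: any) {
      if (e.name == 'AbortError') {
        return;
      }
      throw e;
    }
    // Reached on every stream element *and* on every idle timeout tick.
    const value = await refresh();
    if (value == null) {
      return;
    }
    yield value;
  }
}

As in the code above, an abort simply ends the generator without an error, matching the soft-close behavior of the checkpoint watch loop.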

75 changes: 40 additions & 35 deletions test-client/src/ndjson.ts
@@ -10,48 +10,53 @@ export function ndjsonStream<T>(
var decoder = new TextDecoder();
var data_buf = '';

reader.read().then(function processResult(result): void | Promise<any> {
if (result.done) {
if (cancellationRequest) {
// Immediately exit
return;
}

data_buf = data_buf.trim();
if (data_buf.length !== 0) {
try {
var data_l = JSON.parse(data_buf);
controller.enqueue({ chunk: data_l, size: data_l.length });
} catch (e) {
controller.error(e);
reader
.read()
.then(function processResult(result): void | Promise<any> {
if (result.done) {
if (cancellationRequest) {
// Immediately exit
return;
}

data_buf = data_buf.trim();
if (data_buf.length !== 0) {
try {
var data_l = JSON.parse(data_buf);
controller.enqueue({ chunk: data_l, size: data_l.length });
} catch (e) {
controller.error(e);
return;
}
}
controller.close();
return;
}
controller.close();
return;
}

var data = decoder.decode(result.value, { stream: true });
data_buf += data;
var lines = data_buf.split('\n');
for (var i = 0; i < lines.length - 1; ++i) {
var l = lines[i].trim();
if (l.length > 0) {
try {
var data_line = JSON.parse(l);
controller.enqueue({ chunk: data_line, size: l.length });
} catch (e) {
controller.error(e);
cancellationRequest = true;
reader.cancel();
return;
var data = decoder.decode(result.value, { stream: true });
data_buf += data;
var lines = data_buf.split('\n');
for (var i = 0; i < lines.length - 1; ++i) {
var l = lines[i].trim();
if (l.length > 0) {
try {
var data_line = JSON.parse(l);
controller.enqueue({ chunk: data_line, size: l.length });
} catch (e) {
controller.error(e);
cancellationRequest = true;
reader.cancel();
return;
}
}
}
}
data_buf = lines[lines.length - 1];
data_buf = lines[lines.length - 1];

return reader.read().then(processResult);
});
return reader.read().then(processResult);
})
.catch((e) => {
controller.error(e);
});
},
cancel: function (reason) {
cancellationRequest = true;
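For reference, a hedged consumer sketch for the stream built above (assuming ndjsonStream returns a ReadableStream of { chunk, size } records, as the controller.enqueue calls suggest; the exact signature is truncated in the hunk). The consumeNdjson name is illustrative:

// Illustrative consumer (not part of this diff): reads a stream of { chunk, size }
// records like the one constructed above. With the .catch() added around the read
// loop, an upstream read failure reaches controller.error() and shows up here as a
// rejected read() instead of a stream that never settles.
async function consumeNdjson<T>(stream: ReadableStream<{ chunk: T; size: number }>): Promise<void> {
  const reader = stream.getReader();
  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) {
        break;
      }
      console.log('parsed line:', value.chunk, `(${value.size} chars)`);
    }
  } catch (e) {
    console.error('NDJSON stream failed:', e);
  } finally {
    reader.releaseLock();
  }
}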