Skip to content

Commit bec7496

Browse files
authored
Compact fixes (#351)
* Add failing tests. * Remove upsert on bucket_state, to avoid unique index failures. * Filter by group_id for populateChecksums. * Add changeset.
1 parent 725daa1 commit bec7496

File tree

7 files changed

+172
-36
lines changed

7 files changed

+172
-36
lines changed

.changeset/tasty-peaches-give.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
'@powersync/service-module-mongodb-storage': patch
3+
'@powersync/service-core': patch
4+
'@powersync/service-image': patch
5+
---
6+
7+
Fix "E11000 duplicate key error collection: powersync_demo.bucket_state" in some cases on sync rules deploy

modules/module-mongodb-storage/src/migrations/db/migrations/1741697235857-bucket-state-index.ts

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,7 @@ export const up: migrations.PowerSyncMigrationFunction = async (context) => {
1111
const db = storage.createPowerSyncMongo(configuration.storage as MongoStorageConfig);
1212

1313
try {
14-
await db.bucket_state.createIndex(
15-
{
16-
'_id.g': 1,
17-
last_op: 1
18-
},
19-
{ name: INDEX_NAME, unique: true }
20-
);
14+
await db.createBucketStateIndex();
2115
} finally {
2216
await db.client.close();
2317
}

modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts

Lines changed: 22 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import { PowerSyncMongo } from './db.js';
66
import { BucketDataDocument, BucketDataKey, BucketStateDocument } from './models.js';
77
import { MongoSyncBucketStorage } from './MongoSyncBucketStorage.js';
88
import { cacheKey } from './OperationBatch.js';
9-
import { readSingleBatch } from './util.js';
109

1110
interface CurrentBucketState {
1211
/** Bucket name */
@@ -329,15 +328,11 @@ export class MongoCompactor {
329328
count: 0,
330329
bytes: 0
331330
}
332-
},
333-
$setOnInsert: {
334-
// Only set this if we're creating the document.
335-
// In all other cases, the replication process will have set a more accurate id.
336-
last_op: this.maxOpId
337331
}
338332
},
339-
// We generally expect this to have been created before, but do handle cases of old unchanged buckets
340-
upsert: true
333+
// We generally expect this to have been created before.
334+
// We don't create new ones here, to avoid issues with the unique index on bucket_updates.
335+
upsert: false
341336
}
342337
});
343338
}
@@ -484,15 +479,25 @@ export class MongoCompactor {
484479
* Subset of compact, only populating checksums where relevant.
485480
*/
486481
async populateChecksums() {
487-
let lastId: BucketStateDocument['_id'] | null = null;
482+
// This is updated after each batch
483+
let lowerBound: BucketStateDocument['_id'] = {
484+
g: this.group_id,
485+
b: new mongo.MinKey() as any
486+
};
487+
// This is static
488+
const upperBound: BucketStateDocument['_id'] = {
489+
g: this.group_id,
490+
b: new mongo.MaxKey() as any
491+
};
488492
while (!this.signal?.aborted) {
489493
// By filtering buckets, we effectively make this "resumeable".
490-
let filter: mongo.Filter<BucketStateDocument> = {
494+
const filter: mongo.Filter<BucketStateDocument> = {
495+
_id: {
496+
$gt: lowerBound,
497+
$lt: upperBound
498+
},
491499
compacted_state: { $exists: false }
492500
};
493-
if (lastId) {
494-
filter._id = { $gt: lastId };
495-
}
496501

497502
const bucketsWithoutChecksums = await this.db.bucket_state
498503
.find(filter, {
@@ -515,7 +520,7 @@ export class MongoCompactor {
515520

516521
await this.updateChecksumsBatch(bucketsWithoutChecksums.map((b) => b._id.b));
517522

518-
lastId = bucketsWithoutChecksums[bucketsWithoutChecksums.length - 1]._id;
523+
lowerBound = bucketsWithoutChecksums[bucketsWithoutChecksums.length - 1]._id;
519524
}
520525
}
521526

@@ -551,15 +556,11 @@ export class MongoCompactor {
551556
checksum: BigInt(bucketChecksum.checksum),
552557
bytes: null
553558
}
554-
},
555-
$setOnInsert: {
556-
// Only set this if we're creating the document.
557-
// In all other cases, the replication process will have set a more accurate id.
558-
last_op: this.maxOpId
559559
}
560560
},
561-
// We generally expect this to have been created before, but do handle cases of old unchanged buckets
562-
upsert: true
561+
// We don't create new ones here - it gets tricky to get the last_op right with the unique index on:
562+
// bucket_updates: {'_id.g': 1, 'last_op': 1}
563+
upsert: false
563564
}
564565
});
565566
}

modules/module-mongodb-storage/src/storage/implementation/MongoTestStorageFactoryGenerator.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,14 @@ export const MongoTestStorageFactoryGenerator = (factoryOptions: MongoTestStorag
1818
await db.db.createCollection('bucket_parameters');
1919
}
2020

21-
// Full migrations are not currently run for tests, so we manually create this
22-
await db.createCheckpointEventsCollection();
23-
2421
if (!options?.doNotClear) {
2522
await db.clear();
2623
}
2724

25+
// Full migrations are not currently run for tests, so we manually create the important ones
26+
await db.createCheckpointEventsCollection();
27+
await db.createBucketStateIndex();
28+
2829
return new MongoBucketStorage(db, { slot_name_prefix: 'test_' }, factoryOptions.internalOptions);
2930
};
3031
};

modules/module-mongodb-storage/src/storage/implementation/db.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,20 @@ export class PowerSyncMongo {
127127
max: 50 // max number of documents
128128
});
129129
}
130+
131+
/**
132+
* Only use in migrations and tests.
133+
*/
134+
async createBucketStateIndex() {
135+
// TODO: Implement a better mechanism to use migrations in tests
136+
await this.bucket_state.createIndex(
137+
{
138+
'_id.g': 1,
139+
last_op: 1
140+
},
141+
{ name: 'bucket_updates', unique: true }
142+
);
143+
}
130144
}
131145

132146
export function createPowerSyncMongo(config: MongoStorageConfig, options?: lib_mongo.MongoConnectionOptions) {

modules/module-mongodb-storage/src/storage/implementation/models.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,10 @@ export interface BucketStateDocument {
9797
g: number;
9898
b: string;
9999
};
100+
/**
101+
* Important: There is a unique index on {'_id.g': 1, last_op: 1}.
102+
* That means the last_op must match an actual op in the bucket, and not the commit checkpoint.
103+
*/
100104
last_op: bigint;
101105
/**
102106
* If set, this can be treated as "cache" of a checksum at a specific point.
Lines changed: 120 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,122 @@
1-
import { register } from '@powersync/service-core-tests';
2-
import { describe } from 'vitest';
1+
import { register, TEST_TABLE, test_utils } from '@powersync/service-core-tests';
2+
import { describe, expect, test } from 'vitest';
33
import { INITIALIZED_MONGO_STORAGE_FACTORY } from './util.js';
4+
import { storage, SyncRulesBucketStorage } from '@powersync/service-core';
45

5-
describe('Mongo Sync Bucket Storage Compact', () => register.registerCompactTests(INITIALIZED_MONGO_STORAGE_FACTORY));
6-
describe('Mongo Sync Parameter Storage Compact', () =>
7-
register.registerParameterCompactTests(INITIALIZED_MONGO_STORAGE_FACTORY));
6+
describe('Mongo Sync Bucket Storage Compact', () => {
7+
register.registerCompactTests(INITIALIZED_MONGO_STORAGE_FACTORY);
8+
9+
describe('with blank bucket_state', () => {
10+
// This can happen when migrating from older service versions, that did not populate bucket_state yet.
11+
const populate = async (bucketStorage: SyncRulesBucketStorage) => {
12+
await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => {
13+
await batch.save({
14+
sourceTable: TEST_TABLE,
15+
tag: storage.SaveOperationTag.INSERT,
16+
after: {
17+
id: 't1',
18+
owner_id: 'u1'
19+
},
20+
afterReplicaId: test_utils.rid('t1')
21+
});
22+
23+
await batch.save({
24+
sourceTable: TEST_TABLE,
25+
tag: storage.SaveOperationTag.INSERT,
26+
after: {
27+
id: 't2',
28+
owner_id: 'u2'
29+
},
30+
afterReplicaId: test_utils.rid('t2')
31+
});
32+
33+
await batch.commit('1/1');
34+
});
35+
36+
return bucketStorage.getCheckpoint();
37+
};
38+
39+
const setup = async () => {
40+
await using factory = await INITIALIZED_MONGO_STORAGE_FACTORY();
41+
const syncRules = await factory.updateSyncRules({
42+
content: `
43+
bucket_definitions:
44+
by_user:
45+
parameters: select request.user_id() as user_id
46+
data: [select * from test where owner_id = bucket.user_id]
47+
`
48+
});
49+
const bucketStorage = factory.getInstance(syncRules);
50+
const { checkpoint } = await populate(bucketStorage);
51+
52+
return { bucketStorage, checkpoint, factory };
53+
};
54+
55+
test('full compact', async () => {
56+
const { bucketStorage, checkpoint, factory } = await setup();
57+
58+
// Simulate bucket_state from old version not being available
59+
await factory.db.bucket_state.deleteMany({});
60+
61+
await bucketStorage.compact({
62+
clearBatchLimit: 200,
63+
moveBatchLimit: 10,
64+
moveBatchQueryLimit: 10,
65+
maxOpId: checkpoint,
66+
signal: null as any
67+
});
68+
69+
const checksumAfter = await bucketStorage.getChecksums(checkpoint, ['by_user["u1"]', 'by_user["u2"]']);
70+
expect(checksumAfter.get('by_user["u1"]')).toEqual({
71+
bucket: 'by_user["u1"]',
72+
checksum: -659469718,
73+
count: 1
74+
});
75+
expect(checksumAfter.get('by_user["u2"]')).toEqual({
76+
bucket: 'by_user["u2"]',
77+
checksum: 430217650,
78+
count: 1
79+
});
80+
});
81+
82+
test('populatePersistentChecksumCache', async () => {
83+
// Populate old sync rules version
84+
const { factory } = await setup();
85+
86+
// Not populate another version (bucket definition name changed)
87+
const syncRules = await factory.updateSyncRules({
88+
content: `
89+
bucket_definitions:
90+
by_user2:
91+
parameters: select request.user_id() as user_id
92+
data: [select * from test where owner_id = bucket.user_id]
93+
`
94+
});
95+
const bucketStorage = factory.getInstance(syncRules);
96+
97+
await populate(bucketStorage);
98+
const { checkpoint } = await bucketStorage.getCheckpoint();
99+
100+
await bucketStorage.populatePersistentChecksumCache({
101+
maxOpId: checkpoint,
102+
signal: new AbortController().signal
103+
});
104+
105+
const checksumAfter = await bucketStorage.getChecksums(checkpoint, ['by_user2["u1"]', 'by_user2["u2"]']);
106+
expect(checksumAfter.get('by_user2["u1"]')).toEqual({
107+
bucket: 'by_user2["u1"]',
108+
checksum: -659469718,
109+
count: 1
110+
});
111+
expect(checksumAfter.get('by_user2["u2"]')).toEqual({
112+
bucket: 'by_user2["u2"]',
113+
checksum: 430217650,
114+
count: 1
115+
});
116+
});
117+
});
118+
});
119+
120+
describe('Mongo Sync Parameter Storage Compact', () => {
121+
register.registerParameterCompactTests(INITIALIZED_MONGO_STORAGE_FACTORY);
122+
});

0 commit comments

Comments
 (0)