7 changes: 7 additions & 0 deletions .changeset/tasty-peaches-give.md
@@ -0,0 +1,7 @@
---
'@powersync/service-module-mongodb-storage': patch
'@powersync/service-core': patch
'@powersync/service-image': patch
---

Fix "E11000 duplicate key error collection: powersync_demo.bucket_state" in some cases on sync rules deploy
@@ -11,13 +11,7 @@ export const up: migrations.PowerSyncMigrationFunction = async (context) => {
const db = storage.createPowerSyncMongo(configuration.storage as MongoStorageConfig);

try {
await db.bucket_state.createIndex(
{
'_id.g': 1,
last_op: 1
},
{ name: INDEX_NAME, unique: true }
);
await db.createBucketStateIndex();
} finally {
await db.client.close();
}
@@ -6,7 +6,6 @@ import { PowerSyncMongo } from './db.js';
import { BucketDataDocument, BucketDataKey, BucketStateDocument } from './models.js';
import { MongoSyncBucketStorage } from './MongoSyncBucketStorage.js';
import { cacheKey } from './OperationBatch.js';
import { readSingleBatch } from './util.js';

interface CurrentBucketState {
/** Bucket name */
@@ -329,15 +328,11 @@ export class MongoCompactor {
count: 0,
bytes: 0
}
},
$setOnInsert: {
// Only set this if we're creating the document.
// In all other cases, the replication process will have set a more accurate id.
last_op: this.maxOpId
}
},
// We generally expect this to have been created before, but do handle cases of old unchanged buckets
upsert: true
// We generally expect this to have been created before.
// We don't create new ones here, to avoid issues with the unique index on bucket_updates.
upsert: false
}
});
}
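
The practical effect of upsert: false (a short sketch with assumed names, not code from this PR): an update against a bucket that has no bucket_state document becomes a no-op instead of an insert that could violate the unique index, and the caller can detect the miss via matchedCount.

import { Collection } from 'mongodb';

async function writeCompactedState(bucketState: Collection<any>): Promise<boolean> {
  const result = await bucketState.updateOne(
    { _id: { g: 1, b: 'by_user["u1"]' } },
    { $set: { compacted_state: { count: 0, bytes: 0 } } },
    { upsert: false }
  );
  // matchedCount === 0 means the bucket had no state document (for example,
  // data replicated by an older service version); its checksum simply stays
  // uncached instead of risking a duplicate-key error.
  return result.matchedCount > 0;
}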
@@ -484,15 +479,25 @@
* Subset of compact, only populating checksums where relevant.
*/
async populateChecksums() {
let lastId: BucketStateDocument['_id'] | null = null;
// This is updated after each batch
let lowerBound: BucketStateDocument['_id'] = {
g: this.group_id,
b: new mongo.MinKey() as any
};
// This is static
const upperBound: BucketStateDocument['_id'] = {
g: this.group_id,
b: new mongo.MaxKey() as any
};
while (!this.signal?.aborted) {
// By filtering buckets, we effectively make this "resumable".
let filter: mongo.Filter<BucketStateDocument> = {
const filter: mongo.Filter<BucketStateDocument> = {
_id: {
$gt: lowerBound,
$lt: upperBound
},
compacted_state: { $exists: false }
};
if (lastId) {
filter._id = { $gt: lastId };
}

const bucketsWithoutChecksums = await this.db.bucket_state
.find(filter, {
@@ -515,7 +520,7 @@

await this.updateChecksumsBatch(bucketsWithoutChecksums.map((b) => b._id.b));

lastId = bucketsWithoutChecksums[bucketsWithoutChecksums.length - 1]._id;
lowerBound = bucketsWithoutChecksums[bucketsWithoutChecksums.length - 1]._id;
}
}
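
The MinKey/MaxKey bounds make the pagination uniform (a simplified sketch under assumed types, not code from this PR): every query, including the first, is a plain _id range scan within a single group, so there is no nullable lastId special case and other groups' buckets are always excluded.

import { Collection, MinKey, MaxKey } from 'mongodb';

async function* scanUncompactedBuckets(bucketState: Collection<any>, groupId: number, batchSize = 100) {
  let lowerBound: any = { g: groupId, b: new MinKey() };
  const upperBound: any = { g: groupId, b: new MaxKey() };
  while (true) {
    const batch = await bucketState
      .find({ _id: { $gt: lowerBound, $lt: upperBound }, compacted_state: { $exists: false } })
      .sort({ _id: 1 })
      .limit(batchSize)
      .toArray();
    if (batch.length === 0) {
      break;
    }
    yield batch;
    // Resume strictly after the last bucket seen in this batch.
    lowerBound = batch[batch.length - 1]._id;
  }
}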

@@ -551,15 +556,11 @@
checksum: BigInt(bucketChecksum.checksum),
bytes: null
}
},
$setOnInsert: {
// Only set this if we're creating the document.
// In all other cases, the replication process will have set a more accurate id.
last_op: this.maxOpId
}
},
// We generally expect this to have been created before, but do handle cases of old unchanged buckets
upsert: true
// We don't create new ones here - it gets tricky to get the last_op right with the unique index on:
// bucket_updates: {'_id.g': 1, 'last_op': 1}
upsert: false
}
});
}
@@ -18,13 +18,14 @@ export const MongoTestStorageFactoryGenerator = (factoryOptions: MongoTestStorag
await db.db.createCollection('bucket_parameters');
}

// Full migrations are not currently run for tests, so we manually create this
await db.createCheckpointEventsCollection();

if (!options?.doNotClear) {
await db.clear();
}

// Full migrations are not currently run for tests, so we manually create the important ones
await db.createCheckpointEventsCollection();
await db.createBucketStateIndex();

return new MongoBucketStorage(db, { slot_name_prefix: 'test_' }, factoryOptions.internalOptions);
};
};
14 changes: 14 additions & 0 deletions modules/module-mongodb-storage/src/storage/implementation/db.ts
@@ -127,6 +127,20 @@ export class PowerSyncMongo {
max: 50 // max number of documents
});
}

/**
* Only use in migrations and tests.
*/
async createBucketStateIndex() {
// TODO: Implement a better mechanism to use migrations in tests
await this.bucket_state.createIndex(
{
'_id.g': 1,
last_op: 1
},
{ name: 'bucket_updates', unique: true }
);
}
}

export function createPowerSyncMongo(config: MongoStorageConfig, options?: lib_mongo.MongoConnectionOptions) {
@@ -97,6 +97,10 @@ export interface BucketStateDocument {
g: number;
b: string;
};
/**
* Important: There is a unique index on {'_id.g': 1, last_op: 1}.
* That means last_op must match an actual op in the bucket, not the commit checkpoint.
*/
last_op: bigint;
/**
* If set, this can be treated as "cache" of a checksum at a specific point.
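
An illustration of that invariant (assumed values, using only the fields shown here): two state documents in the same group must carry distinct last_op values, which holds naturally when each one records a real op from its own bucket.

const stateA: BucketStateDocument = {
  _id: { g: 1, b: 'by_user["u1"]' },
  last_op: 100n // an op that actually exists in this bucket
};
const stateB: BucketStateDocument = {
  _id: { g: 1, b: 'by_user["u2"]' },
  last_op: 101n // must differ from stateA.last_op within g = 1
};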
125 changes: 120 additions & 5 deletions modules/module-mongodb-storage/test/src/storage_compacting.test.ts
@@ -1,7 +1,122 @@
import { register } from '@powersync/service-core-tests';
import { describe } from 'vitest';
import { register, TEST_TABLE, test_utils } from '@powersync/service-core-tests';
import { describe, expect, test } from 'vitest';
import { INITIALIZED_MONGO_STORAGE_FACTORY } from './util.js';
import { storage, SyncRulesBucketStorage } from '@powersync/service-core';

describe('Mongo Sync Bucket Storage Compact', () => register.registerCompactTests(INITIALIZED_MONGO_STORAGE_FACTORY));
describe('Mongo Sync Parameter Storage Compact', () =>
register.registerParameterCompactTests(INITIALIZED_MONGO_STORAGE_FACTORY));
describe('Mongo Sync Bucket Storage Compact', () => {
register.registerCompactTests(INITIALIZED_MONGO_STORAGE_FACTORY);

describe('with blank bucket_state', () => {
// This can happen when migrating from older service versions that did not populate bucket_state yet.
const populate = async (bucketStorage: SyncRulesBucketStorage) => {
await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => {
await batch.save({
sourceTable: TEST_TABLE,
tag: storage.SaveOperationTag.INSERT,
after: {
id: 't1',
owner_id: 'u1'
},
afterReplicaId: test_utils.rid('t1')
});

await batch.save({
sourceTable: TEST_TABLE,
tag: storage.SaveOperationTag.INSERT,
after: {
id: 't2',
owner_id: 'u2'
},
afterReplicaId: test_utils.rid('t2')
});

await batch.commit('1/1');
});

return bucketStorage.getCheckpoint();
};

const setup = async () => {
await using factory = await INITIALIZED_MONGO_STORAGE_FACTORY();
const syncRules = await factory.updateSyncRules({
content: `
bucket_definitions:
by_user:
parameters: select request.user_id() as user_id
data: [select * from test where owner_id = bucket.user_id]
`
});
const bucketStorage = factory.getInstance(syncRules);
const { checkpoint } = await populate(bucketStorage);

return { bucketStorage, checkpoint, factory };
};

test('full compact', async () => {
const { bucketStorage, checkpoint, factory } = await setup();

// Simulate bucket_state from old version not being available
await factory.db.bucket_state.deleteMany({});

await bucketStorage.compact({
clearBatchLimit: 200,
moveBatchLimit: 10,
moveBatchQueryLimit: 10,
maxOpId: checkpoint,
signal: null as any
});

const checksumAfter = await bucketStorage.getChecksums(checkpoint, ['by_user["u1"]', 'by_user["u2"]']);
expect(checksumAfter.get('by_user["u1"]')).toEqual({
bucket: 'by_user["u1"]',
checksum: -659469718,
count: 1
});
expect(checksumAfter.get('by_user["u2"]')).toEqual({
bucket: 'by_user["u2"]',
checksum: 430217650,
count: 1
});
});

test('populatePersistentChecksumCache', async () => {
// Populate old sync rules version
const { factory } = await setup();

// Now populate another version (bucket definition name changed)
const syncRules = await factory.updateSyncRules({
content: `
bucket_definitions:
by_user2:
parameters: select request.user_id() as user_id
data: [select * from test where owner_id = bucket.user_id]
`
});
const bucketStorage = factory.getInstance(syncRules);

await populate(bucketStorage);
const { checkpoint } = await bucketStorage.getCheckpoint();

await bucketStorage.populatePersistentChecksumCache({
maxOpId: checkpoint,
signal: new AbortController().signal
});

const checksumAfter = await bucketStorage.getChecksums(checkpoint, ['by_user2["u1"]', 'by_user2["u2"]']);
expect(checksumAfter.get('by_user2["u1"]')).toEqual({
bucket: 'by_user2["u1"]',
checksum: -659469718,
count: 1
});
expect(checksumAfter.get('by_user2["u2"]')).toEqual({
bucket: 'by_user2["u2"]',
checksum: 430217650,
count: 1
});
});
});
});

describe('Mongo Sync Parameter Storage Compact', () => {
register.registerParameterCompactTests(INITIALIZED_MONGO_STORAGE_FACTORY);
});