diff --git a/examples/react/paced-mutations-demo/package.json b/examples/react/paced-mutations-demo/package.json index 49f22ec47..8e8ba243b 100644 --- a/examples/react/paced-mutations-demo/package.json +++ b/examples/react/paced-mutations-demo/package.json @@ -10,7 +10,7 @@ }, "dependencies": { "@tanstack/db": "^0.5.11", - "@tanstack/react-db": "^0.1.58", + "@tanstack/react-db": "^0.1.59", "mitt": "^3.0.1", "react": "^19.2.1", "react-dom": "^19.2.1" diff --git a/examples/react/todo/package.json b/examples/react/todo/package.json index 17a84a8bf..b40917598 100644 --- a/examples/react/todo/package.json +++ b/examples/react/todo/package.json @@ -5,8 +5,8 @@ "dependencies": { "@tanstack/electric-db-collection": "^0.2.12", "@tanstack/query-core": "^5.90.12", - "@tanstack/query-db-collection": "^1.0.10", - "@tanstack/react-db": "^0.1.58", + "@tanstack/query-db-collection": "^1.0.11", + "@tanstack/react-db": "^0.1.59", "@tanstack/react-router": "^1.140.0", "@tanstack/react-start": "^1.140.0", "@tanstack/trailbase-db-collection": "^0.1.55", diff --git a/examples/solid/todo/package.json b/examples/solid/todo/package.json index 91c3bce55..9c2dd369b 100644 --- a/examples/solid/todo/package.json +++ b/examples/solid/todo/package.json @@ -5,7 +5,7 @@ "dependencies": { "@tanstack/electric-db-collection": "^0.2.12", "@tanstack/query-core": "^5.90.12", - "@tanstack/query-db-collection": "^1.0.10", + "@tanstack/query-db-collection": "^1.0.11", "@tanstack/solid-db": "^0.1.54", "@tanstack/solid-router": "^1.140.0", "@tanstack/solid-start": "^1.140.0", diff --git a/packages/electric-db-collection/src/electric.ts b/packages/electric-db-collection/src/electric.ts index 477e37641..65d3b7cc0 100644 --- a/packages/electric-db-collection/src/electric.ts +++ b/packages/electric-db-collection/src/electric.ts @@ -1282,10 +1282,10 @@ function createElectricSync>( // Track commit point type - up-to-date takes precedence as it also triggers progressive mode atomic swap let commitPoint: `up-to-date` | `subset-end` | null = null - // Clear the current batch buffer at the START of processing a new batch - // This preserves messages from the previous batch until new ones arrive, - // allowing awaitMatch to find messages even if called after up-to-date - currentBatchMessages.setState(() => []) + // Don't clear the buffer between batches - this preserves messages for awaitMatch + // to find even if multiple batches arrive before awaitMatch is called. + // The buffer is naturally limited by MAX_BATCH_MESSAGES (oldest messages are dropped). + // Reset batchCommitted since we're starting a new batch batchCommitted.setState(() => false) for (const message of messages) { diff --git a/packages/electric-db-collection/tests/electric.test.ts b/packages/electric-db-collection/tests/electric.test.ts index 87d5567e8..3ac3ccf7a 100644 --- a/packages/electric-db-collection/tests/electric.test.ts +++ b/packages/electric-db-collection/tests/electric.test.ts @@ -1363,6 +1363,93 @@ describe(`Electric Integration`, () => { await insert3.isPersisted.promise expect(testCollection.has(502)).toBe(true) }) + + it(`should preserve buffer across heartbeat batches until awaitMatch is called`, async () => { + // This test verifies the fix for the race condition where: + // 1. Batch 1 arrives with insert message + up-to-date + // 2. Batch 2 arrives (heartbeat/empty) BEFORE awaitMatch is called + // 3. awaitMatch is called - should still find the message from Batch 1 + // This was failing before because the buffer was cleared when Batch 2 arrived + + let resolveServerCall: () => void + const serverCallPromise = new Promise((resolve) => { + resolveServerCall = resolve + }) + + const onInsert = vi + .fn() + .mockImplementation(async ({ transaction, collection: col }) => { + const item = transaction.mutations[0].modified + + // Simulate a slow API call + await serverCallPromise + + // awaitMatch is called AFTER multiple batches have arrived + await col.utils.awaitMatch((message: any) => { + return ( + isChangeMessage(message) && + message.headers.operation === `insert` && + message.value.id === item.id + ) + }, 5000) + }) + + const config = { + id: `test-buffer-preserved-across-heartbeats`, + shapeOptions: { + url: `http://test-url`, + params: { table: `test_table` }, + }, + startSync: true, + getKey: (item: Row) => item.id as number, + onInsert, + } + + const testCollection = createCollection(electricCollectionOptions(config)) + + // Start insert - will call onInsert which waits for serverCallPromise + const insertPromise = testCollection.insert({ + id: 600, + name: `Heartbeat Race Test`, + }) + + // Wait for onInsert to start + await new Promise((resolve) => setTimeout(resolve, 10)) + + // Batch 1: insert message + up-to-date + subscriber([ + { + key: `600`, + value: { id: 600, name: `Heartbeat Race Test` }, + headers: { operation: `insert` }, + }, + { headers: { control: `up-to-date` } }, + ]) + + await new Promise((resolve) => setTimeout(resolve, 10)) + + // Batch 2: heartbeat (just up-to-date, no insert messages) + // This simulates Electric sending a heartbeat while the API call is still in progress + // Previously, this would clear the buffer and lose the insert message from Batch 1 + subscriber([{ headers: { control: `up-to-date` } }]) + + await new Promise((resolve) => setTimeout(resolve, 10)) + + // Batch 3: another heartbeat (to really stress the scenario) + subscriber([{ headers: { control: `up-to-date` } }]) + + await new Promise((resolve) => setTimeout(resolve, 10)) + + // Now resolve the server call - awaitMatch should still find the message + // from Batch 1 despite Batches 2 and 3 arriving + resolveServerCall!() + + // Should complete successfully + await insertPromise.isPersisted.promise + + expect(onInsert).toHaveBeenCalled() + expect(testCollection.has(600)).toBe(true) + }) }) // Tests for matching strategies utilities diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index c628ad115..8b06f5c89 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -257,7 +257,7 @@ importers: specifier: ^0.5.11 version: link:../../../packages/db '@tanstack/react-db': - specifier: ^0.1.58 + specifier: ^0.1.59 version: link:../../../packages/react-db mitt: specifier: ^3.0.1 @@ -433,10 +433,10 @@ importers: specifier: ^5.90.12 version: 5.90.12 '@tanstack/query-db-collection': - specifier: ^1.0.10 + specifier: ^1.0.11 version: link:../../../packages/query-db-collection '@tanstack/react-db': - specifier: ^0.1.58 + specifier: ^0.1.59 version: link:../../../packages/react-db '@tanstack/react-router': specifier: ^1.140.0 @@ -554,7 +554,7 @@ importers: specifier: ^5.90.12 version: 5.90.12 '@tanstack/query-db-collection': - specifier: ^1.0.10 + specifier: ^1.0.11 version: link:../../../packages/query-db-collection '@tanstack/solid-db': specifier: ^0.1.54