Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/react/paced-mutations-demo/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
},
"dependencies": {
"@tanstack/db": "^0.5.11",
"@tanstack/react-db": "^0.1.58",
"@tanstack/react-db": "^0.1.59",
"mitt": "^3.0.1",
"react": "^19.2.1",
"react-dom": "^19.2.1"
Expand Down
4 changes: 2 additions & 2 deletions examples/react/todo/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
"dependencies": {
"@tanstack/electric-db-collection": "^0.2.12",
"@tanstack/query-core": "^5.90.12",
"@tanstack/query-db-collection": "^1.0.10",
"@tanstack/react-db": "^0.1.58",
"@tanstack/query-db-collection": "^1.0.11",
"@tanstack/react-db": "^0.1.59",
"@tanstack/react-router": "^1.140.0",
"@tanstack/react-start": "^1.140.0",
"@tanstack/trailbase-db-collection": "^0.1.55",
Expand Down
2 changes: 1 addition & 1 deletion examples/solid/todo/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"dependencies": {
"@tanstack/electric-db-collection": "^0.2.12",
"@tanstack/query-core": "^5.90.12",
"@tanstack/query-db-collection": "^1.0.10",
"@tanstack/query-db-collection": "^1.0.11",
"@tanstack/solid-db": "^0.1.54",
"@tanstack/solid-router": "^1.140.0",
"@tanstack/solid-start": "^1.140.0",
Expand Down
8 changes: 4 additions & 4 deletions packages/electric-db-collection/src/electric.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1282,10 +1282,10 @@ function createElectricSync<T extends Row<unknown>>(
// Track commit point type - up-to-date takes precedence as it also triggers progressive mode atomic swap
let commitPoint: `up-to-date` | `subset-end` | null = null

// Clear the current batch buffer at the START of processing a new batch
// This preserves messages from the previous batch until new ones arrive,
// allowing awaitMatch to find messages even if called after up-to-date
currentBatchMessages.setState(() => [])
// Don't clear the buffer between batches - this preserves messages for awaitMatch
// to find even if multiple batches arrive before awaitMatch is called.
// The buffer is naturally limited by MAX_BATCH_MESSAGES (oldest messages are dropped).
// Reset batchCommitted since we're starting a new batch
batchCommitted.setState(() => false)

for (const message of messages) {
Expand Down
87 changes: 87 additions & 0 deletions packages/electric-db-collection/tests/electric.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1363,6 +1363,93 @@ describe(`Electric Integration`, () => {
await insert3.isPersisted.promise
expect(testCollection.has(502)).toBe(true)
})

it(`should preserve buffer across heartbeat batches until awaitMatch is called`, async () => {
// This test verifies the fix for the race condition where:
// 1. Batch 1 arrives with insert message + up-to-date
// 2. Batch 2 arrives (heartbeat/empty) BEFORE awaitMatch is called
// 3. awaitMatch is called - should still find the message from Batch 1
// This was failing before because the buffer was cleared when Batch 2 arrived

let resolveServerCall: () => void
const serverCallPromise = new Promise<void>((resolve) => {
resolveServerCall = resolve
})

const onInsert = vi
.fn()
.mockImplementation(async ({ transaction, collection: col }) => {
const item = transaction.mutations[0].modified

// Simulate a slow API call
await serverCallPromise

// awaitMatch is called AFTER multiple batches have arrived
await col.utils.awaitMatch((message: any) => {
return (
isChangeMessage(message) &&
message.headers.operation === `insert` &&
message.value.id === item.id
)
}, 5000)
})

const config = {
id: `test-buffer-preserved-across-heartbeats`,
shapeOptions: {
url: `http://test-url`,
params: { table: `test_table` },
},
startSync: true,
getKey: (item: Row) => item.id as number,
onInsert,
}

const testCollection = createCollection(electricCollectionOptions(config))

// Start insert - will call onInsert which waits for serverCallPromise
const insertPromise = testCollection.insert({
id: 600,
name: `Heartbeat Race Test`,
})

// Wait for onInsert to start
await new Promise((resolve) => setTimeout(resolve, 10))

// Batch 1: insert message + up-to-date
subscriber([
{
key: `600`,
value: { id: 600, name: `Heartbeat Race Test` },
headers: { operation: `insert` },
},
{ headers: { control: `up-to-date` } },
])

await new Promise((resolve) => setTimeout(resolve, 10))

// Batch 2: heartbeat (just up-to-date, no insert messages)
// This simulates Electric sending a heartbeat while the API call is still in progress
// Previously, this would clear the buffer and lose the insert message from Batch 1
subscriber([{ headers: { control: `up-to-date` } }])

await new Promise((resolve) => setTimeout(resolve, 10))

// Batch 3: another heartbeat (to really stress the scenario)
subscriber([{ headers: { control: `up-to-date` } }])

await new Promise((resolve) => setTimeout(resolve, 10))

// Now resolve the server call - awaitMatch should still find the message
// from Batch 1 despite Batches 2 and 3 arriving
resolveServerCall!()

// Should complete successfully
await insertPromise.isPersisted.promise

expect(onInsert).toHaveBeenCalled()
expect(testCollection.has(600)).toBe(true)
})
})

// Tests for matching strategies utilities
Expand Down
8 changes: 4 additions & 4 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading