diff --git a/packages/db-ivm/src/d2.ts b/packages/db-ivm/src/d2.ts index 8451b2aff..220c758cc 100644 --- a/packages/db-ivm/src/d2.ts +++ b/packages/db-ivm/src/d2.ts @@ -47,19 +47,49 @@ export class D2 implements ID2 { if (!this.#finalized) { throw new Error(`Graph not finalized`) } + console.debug( + `[TanStack-DB-DEBUG] D2.step: running ${this.#operators.length} operators`, + { + operators: this.#operators.map((op) => ({ + id: op.id, + type: op.operatorType, + })), + }, + ) for (const op of this.#operators) { + const hadWork = op.hasPendingWork() + if (hadWork) { + console.debug( + `[TanStack-DB-DEBUG] D2.step: operator ${op.id} (${op.operatorType}) has pending work, running`, + ) + } op.run() } } pendingWork(): boolean { - return this.#operators.some((op) => op.hasPendingWork()) + const operatorsWithWork = this.#operators.filter((op) => + op.hasPendingWork(), + ) + const hasPending = operatorsWithWork.length > 0 + console.debug(`[TanStack-DB-DEBUG] D2.pendingWork:`, { + hasPending, + operatorsWithWork: operatorsWithWork.map((op) => op.id), + }) + return hasPending } run(): void { + console.debug(`[TanStack-DB-DEBUG] D2.run: starting`) + let stepCount = 0 while (this.pendingWork()) { + stepCount++ + console.debug(`[TanStack-DB-DEBUG] D2.run: step ${stepCount}`) this.step() } + console.debug( + `[TanStack-DB-DEBUG] D2.run: complete after ${stepCount} steps`, + ) } } diff --git a/packages/db-ivm/src/graph.ts b/packages/db-ivm/src/graph.ts index 5263bb643..7e395f72e 100644 --- a/packages/db-ivm/src/graph.ts +++ b/packages/db-ivm/src/graph.ts @@ -57,6 +57,7 @@ export class DifferenceStreamWriter implements IDifferenceStreamWriter { export abstract class Operator implements IOperator { protected inputs: Array> protected output: DifferenceStreamWriter + public operatorType = 'unknown' constructor( public id: number, @@ -65,6 +66,8 @@ export abstract class Operator implements IOperator { ) { this.inputs = inputs this.output = output + // Set operator type from constructor name + this.operatorType = this.constructor.name } abstract run(): void @@ -124,8 +127,35 @@ export abstract class LinearUnaryOperator extends UnaryOperator { abstract inner(collection: MultiSet): MultiSet run(): void { - for (const message of this.inputMessages()) { - this.output.sendData(this.inner(message)) + const messages = this.inputMessages() + console.debug( + `[TanStack-DB-DEBUG] LinearUnaryOperator.run (${this.operatorType})`, + { + operatorId: this.id, + messageCount: messages.length, + }, + ) + for (const message of messages) { + const inputItems = message.getInner() + const result = this.inner(message) + const outputItems = result.getInner() + console.debug( + `[TanStack-DB-DEBUG] LinearUnaryOperator.inner (${this.operatorType})`, + { + operatorId: this.id, + inputItemCount: inputItems.length, + outputItemCount: outputItems.length, + inputSample: inputItems.slice(0, 3).map(([item, mult]) => ({ + item: typeof item === 'object' ? JSON.stringify(item) : item, + multiplicity: mult, + })), + outputSample: outputItems.slice(0, 3).map(([item, mult]) => ({ + item: typeof item === 'object' ? JSON.stringify(item) : item, + multiplicity: mult, + })), + }, + ) + this.output.sendData(result) } } } diff --git a/packages/db-ivm/src/operators/output.ts b/packages/db-ivm/src/operators/output.ts index 2e9795e86..aebbab19b 100644 --- a/packages/db-ivm/src/operators/output.ts +++ b/packages/db-ivm/src/operators/output.ts @@ -21,7 +21,21 @@ export class OutputOperator extends UnaryOperator { } run(): void { - for (const message of this.inputMessages()) { + const messages = this.inputMessages() + console.debug(`[TanStack-DB-DEBUG] OutputOperator: run called`, { + operatorId: this.id, + messageCount: messages.length, + }) + for (const message of messages) { + const items = message.getInner() + console.debug(`[TanStack-DB-DEBUG] OutputOperator: processing message`, { + operatorId: this.id, + itemCount: items.length, + items: items.slice(0, 5).map(([item, mult]) => ({ + item: typeof item === 'object' ? JSON.stringify(item) : item, + multiplicity: mult, + })), + }) this.#fn(message) this.output.sendData(message) } diff --git a/packages/db-ivm/src/operators/reduce.ts b/packages/db-ivm/src/operators/reduce.ts index 3a8690e01..f884748c6 100644 --- a/packages/db-ivm/src/operators/reduce.ts +++ b/packages/db-ivm/src/operators/reduce.ts @@ -29,11 +29,23 @@ export class ReduceOperator extends UnaryOperator<[K, V1], [K, V2]> { for (const message of this.inputMessages()) { for (const [item, multiplicity] of message.getInner()) { const [key, value] = item + console.debug(`[TanStack-DB-DEBUG] ReduceOperator: adding to index`, { + operatorId: this.id, + key, + value: typeof value === 'object' ? JSON.stringify(value) : value, + multiplicity, + }) this.#index.addValue(key, [value, multiplicity]) keysTodo.add(key) } } + console.debug(`[TanStack-DB-DEBUG] ReduceOperator: processing keys`, { + operatorId: this.id, + keysCount: keysTodo.size, + keys: Array.from(keysTodo).slice(0, 10), // Limit to first 10 for readability + }) + // For each key, compute the reduction and delta const result: Array<[[K, V2], number]> = [] for (const key of keysTodo) { @@ -41,6 +53,14 @@ export class ReduceOperator extends UnaryOperator<[K, V1], [K, V2]> { const currOut = this.#indexOut.get(key) const out = this.#f(curr) + console.debug(`[TanStack-DB-DEBUG] ReduceOperator: processing key`, { + operatorId: this.id, + key, + inputValuesCount: curr.length, + previousOutputCount: currOut.length, + newOutputCount: out.length, + }) + // Create maps for current and previous outputs using values directly as keys const newOutputMap = new Map() const oldOutputMap = new Map() @@ -91,6 +111,16 @@ export class ReduceOperator extends UnaryOperator<[K, V1], [K, V2]> { } } + console.debug(`[TanStack-DB-DEBUG] ReduceOperator: run complete`, { + operatorId: this.id, + resultCount: result.length, + results: result.slice(0, 10).map(([[key, value], mult]) => ({ + key, + value: typeof value === 'object' ? JSON.stringify(value) : value, + multiplicity: mult, + })), + }) + if (result.length > 0) { this.output.sendData(new MultiSet(result)) } diff --git a/packages/db-ivm/src/operators/topK.ts b/packages/db-ivm/src/operators/topK.ts index 6ff21d888..d68f9487e 100644 --- a/packages/db-ivm/src/operators/topK.ts +++ b/packages/db-ivm/src/operators/topK.ts @@ -37,7 +37,26 @@ export function topK< const sortedValues = consolidated .getInner() .sort((a, b) => comparator(a[0] as V1Type, b[0] as V1Type)) - return sortedValues.slice(offset, offset + limit) + const result = sortedValues.slice(offset, offset + limit) + + console.debug(`[TanStack-DB-DEBUG] topK: processing`, { + inputCount: values.length, + consolidatedCount: sortedValues.length, + offset, + limit, + resultCount: result.length, + // Show first few items for debugging + sortedFirst3: sortedValues.slice(0, 3).map(([v, m]) => ({ + value: typeof v === 'object' ? JSON.stringify(v) : v, + multiplicity: m, + })), + resultItems: result.map(([v, m]) => ({ + value: typeof v === 'object' ? JSON.stringify(v) : v, + multiplicity: m, + })), + }) + + return result }), ) return reduced as IStreamBuilder diff --git a/packages/db-ivm/src/operators/topKWithFractionalIndex.ts b/packages/db-ivm/src/operators/topKWithFractionalIndex.ts index 3609703e5..42d7a9f09 100644 --- a/packages/db-ivm/src/operators/topKWithFractionalIndex.ts +++ b/packages/db-ivm/src/operators/topKWithFractionalIndex.ts @@ -302,14 +302,45 @@ export class TopKWithFractionalIndexOperator< } run(): void { + const messages = this.inputMessages() + console.debug(`[TanStack-DB-DEBUG] TopKWithFractionalIndexOperator.run`, { + operatorId: this.id, + messageCount: messages.length, + indexSize: this.#index.size, + }) + const result: Array<[[K, IndexedValue], number]> = [] - for (const message of this.inputMessages()) { - for (const [item, multiplicity] of message.getInner()) { + for (const message of messages) { + const items = message.getInner() + console.debug( + `[TanStack-DB-DEBUG] TopKWithFractionalIndexOperator: processing message`, + { + operatorId: this.id, + itemCount: items.length, + items: items + .slice(0, 5) + .map(([[key], mult]) => ({ key, multiplicity: mult })), + }, + ) + for (const [item, multiplicity] of items) { const [key, value] = item this.processElement(key, value, multiplicity, result) } } + console.debug( + `[TanStack-DB-DEBUG] TopKWithFractionalIndexOperator.run complete`, + { + operatorId: this.id, + resultCount: result.length, + results: result.slice(0, 5).map(([[key, [, index]], mult]) => ({ + key, + index, + multiplicity: mult, + })), + }, + ) + if (result.length > 0) { this.output.sendData(new MultiSet(result)) } @@ -323,6 +354,23 @@ export class TopKWithFractionalIndexOperator< ): void { const { oldMultiplicity, newMultiplicity } = this.addKey(key, multiplicity) + console.debug( + `[TanStack-DB-DEBUG] TopKWithFractionalIndexOperator.processElement`, + { + operatorId: this.id, + key, + multiplicity, + oldMultiplicity, + newMultiplicity, + action: + oldMultiplicity <= 0 && newMultiplicity > 0 + ? 'INSERT' + : oldMultiplicity > 0 && newMultiplicity <= 0 + ? 'DELETE' + : 'NO_CHANGE', + }, + ) + let res: TopKChanges<[K, T]> = { moveIn: null, moveOut: null, @@ -341,6 +389,18 @@ export class TopKWithFractionalIndexOperator< // so it doesn't affect the topK } + console.debug( + `[TanStack-DB-DEBUG] TopKWithFractionalIndexOperator.processElement result`, + { + operatorId: this.id, + key, + hasMoveIn: res.moveIn !== null, + hasMoveOut: res.moveOut !== null, + moveInKey: res.moveIn ? res.moveIn[0][0] : null, + moveOutKey: res.moveOut ? res.moveOut[0][0] : null, + }, + ) + this.handleMoveIn(res.moveIn, result) this.handleMoveOut(res.moveOut, result) diff --git a/packages/db/src/collection/change-events.ts b/packages/db/src/collection/change-events.ts index 8a422a407..c73b2bcb9 100644 --- a/packages/db/src/collection/change-events.ts +++ b/packages/db/src/collection/change-events.ts @@ -248,6 +248,13 @@ export function createFilteredCallback( const filterFn = createFilterFunctionFromExpression(options.whereExpression!) return (changes: Array>) => { + console.debug( + `[TanStack-DB-DEBUG] createFilteredCallback: filtering changes by whereExpression`, + { + incomingChanges: changes.map((c) => ({ type: c.type, key: c.key })), + }, + ) + const filteredChanges: Array> = [] for (const change of changes) { @@ -255,6 +262,11 @@ export function createFilteredCallback( // For inserts, check if the new value matches the filter if (filterFn(change.value)) { filteredChanges.push(change) + } else { + console.debug( + `[TanStack-DB-DEBUG] FILTERING OUT insert by whereExpression`, + { key: change.key }, + ) } } else if (change.type === `update`) { // For updates, we need to check both old and new values @@ -279,13 +291,23 @@ export function createFilteredCallback( type: `delete`, value: change.previousValue!, // Use the previous value for the delete }) + } else { + // If neither matches, don't emit anything + console.debug( + `[TanStack-DB-DEBUG] FILTERING OUT update by whereExpression (neither old nor new matches)`, + { key: change.key }, + ) } - // If neither matches, don't emit anything } else { // For deletes, include if the previous value would have matched // (so subscribers know something they were tracking was deleted) if (filterFn(change.value)) { filteredChanges.push(change) + } else { + console.debug( + `[TanStack-DB-DEBUG] FILTERING OUT delete by whereExpression`, + { key: change.key }, + ) } } } @@ -293,7 +315,22 @@ export function createFilteredCallback( // Always call the original callback if we have filtered changes OR // if the original changes array was empty (which indicates a ready signal) if (filteredChanges.length > 0 || changes.length === 0) { + console.debug( + `[TanStack-DB-DEBUG] createFilteredCallback: calling originalCallback`, + { + filteredChangesCount: filteredChanges.length, + filteredChanges: filteredChanges.map((c) => ({ + type: c.type, + key: c.key, + })), + }, + ) originalCallback(filteredChanges) + } else { + console.debug( + `[TanStack-DB-DEBUG] createFilteredCallback: NOT calling callback - all changes filtered out`, + { originalChangesCount: changes.length }, + ) } } } diff --git a/packages/db/src/collection/changes.ts b/packages/db/src/collection/changes.ts index 873c906eb..3415864c0 100644 --- a/packages/db/src/collection/changes.ts +++ b/packages/db/src/collection/changes.ts @@ -58,10 +58,20 @@ export class CollectionChangesManager< changes: Array>, forceEmit = false, ): void { + console.debug(`[TanStack-DB-DEBUG] emitEvents called on collection`, { + collectionId: this.collection.id, + changesCount: changes.length, + changeTypes: changes.map((c) => ({ type: c.type, key: c.key })), + forceEmit, + shouldBatchEvents: this.shouldBatchEvents, + subscriberCount: this.changeSubscriptions.size, + }) + // Skip batching for user actions (forceEmit=true) to keep UI responsive if (this.shouldBatchEvents && !forceEmit) { // Add events to the batch this.batchedEvents.push(...changes) + console.debug(`[TanStack-DB-DEBUG] Batching events, not emitting yet`) return } @@ -80,9 +90,15 @@ export class CollectionChangesManager< } if (eventsToEmit.length === 0) { + console.debug(`[TanStack-DB-DEBUG] No events to emit, returning early`) return } + console.debug( + `[TanStack-DB-DEBUG] Emitting to ${this.changeSubscriptions.size} subscriptions`, + { eventsToEmit: eventsToEmit.map((c) => ({ type: c.type, key: c.key })) }, + ) + // Emit to all listeners for (const subscription of this.changeSubscriptions) { subscription.emitEvents(eventsToEmit) @@ -96,6 +112,12 @@ export class CollectionChangesManager< callback: (changes: Array>) => void, options: SubscribeChangesOptions = {}, ): CollectionSubscription { + console.debug(`[TanStack-DB-DEBUG] subscribeChanges called`, { + collectionId: this.collection.id, + includeInitialState: options.includeInitialState, + hasWhereExpression: !!options.whereExpression, + }) + // Start sync and track subscriber this.addSubscriber() @@ -108,12 +130,26 @@ export class CollectionChangesManager< }) if (options.includeInitialState) { + console.debug( + `[TanStack-DB-DEBUG] Requesting snapshot for subscription (includeInitialState: true)`, + ) subscription.requestSnapshot({ trackLoadSubsetPromise: false }) + } else if (options.includeInitialState === false) { + // When explicitly set to false (not just undefined), mark all state as "seen" + // so that all future changes (including deletes) pass through unfiltered. + console.debug( + `[TanStack-DB-DEBUG] markAllStateAsSeen for subscription (includeInitialState: false)`, + ) + subscription.markAllStateAsSeen() } // Add to batched listeners this.changeSubscriptions.add(subscription) + console.debug( + `[TanStack-DB-DEBUG] Subscription added, total subscriptions: ${this.changeSubscriptions.size}`, + ) + return subscription } diff --git a/packages/db/src/collection/state.ts b/packages/db/src/collection/state.ts index b76580c19..aff1a973a 100644 --- a/packages/db/src/collection/state.ts +++ b/packages/db/src/collection/state.ts @@ -225,12 +225,22 @@ export class CollectionStateManager< public recomputeOptimisticState( triggeredByUserAction: boolean = false, ): void { + console.debug(`[TanStack-DB-DEBUG] recomputeOptimisticState called`, { + collectionId: this.collection.id, + triggeredByUserAction, + isCommittingSyncTransactions: this.isCommittingSyncTransactions, + transactionCount: this.transactions.size, + }) + // Skip redundant recalculations when we're in the middle of committing sync transactions // While the sync pipeline is replaying a large batch we still want to honour // fresh optimistic mutations from the UI. Only skip recompute for the // internal sync-driven redraws; user-triggered work (triggeredByUserAction) // must run so live queries stay responsive during long commits. if (this.isCommittingSyncTransactions && !triggeredByUserAction) { + console.debug( + `[TanStack-DB-DEBUG] recomputeOptimisticState: skipping due to isCommittingSyncTransactions`, + ) return } @@ -278,6 +288,15 @@ export class CollectionStateManager< const events: Array> = [] this.collectOptimisticChanges(previousState, previousDeletes, events) + console.debug( + `[TanStack-DB-DEBUG] recomputeOptimisticState: collected events`, + { + collectionId: this.collection.id, + eventsCount: events.length, + eventTypes: events.map((e) => ({ type: e.type, key: e.key })), + }, + ) + // Filter out events for recently synced keys to prevent duplicates // BUT: Only filter out events that are actually from sync operations // New user transactions should NOT be filtered even if the key was recently synced @@ -292,6 +311,10 @@ export class CollectionStateManager< } // Otherwise filter out duplicate sync events + console.debug( + `[TanStack-DB-DEBUG] FILTERING OUT event due to recentlySyncedKeys`, + { key: event.key, type: event.type }, + ) return false }) @@ -322,6 +345,10 @@ export class CollectionStateManager< ) if (!hasActiveOptimisticMutation) { + console.debug( + `[TanStack-DB-DEBUG] FILTERING OUT delete due to pendingSyncKeys`, + { key: event.key }, + ) return false // Skip this delete event as sync will restore the data } } @@ -329,12 +356,33 @@ export class CollectionStateManager< }) // Update indexes for the filtered events + console.debug( + `[TanStack-DB-DEBUG] recomputeOptimisticState: emitting events (with pending sync filtering)`, + { + collectionId: this.collection.id, + filteredEventsCount: filteredEvents.length, + eventTypes: filteredEvents.map((e) => ({ type: e.type, key: e.key })), + triggeredByUserAction, + }, + ) if (filteredEvents.length > 0) { this.indexes.updateIndexes(filteredEvents) } this.changes.emitEvents(filteredEvents, triggeredByUserAction) } else { // Update indexes for all events + console.debug( + `[TanStack-DB-DEBUG] recomputeOptimisticState: emitting events (no pending sync filtering)`, + { + collectionId: this.collection.id, + eventsCount: filteredEventsBySyncStatus.length, + eventTypes: filteredEventsBySyncStatus.map((e) => ({ + type: e.type, + key: e.key, + })), + triggeredByUserAction, + }, + ) if (filteredEventsBySyncStatus.length > 0) { this.indexes.updateIndexes(filteredEventsBySyncStatus) } diff --git a/packages/db/src/collection/subscription.ts b/packages/db/src/collection/subscription.ts index 44981d460..734375a81 100644 --- a/packages/db/src/collection/subscription.ts +++ b/packages/db/src/collection/subscription.ts @@ -52,6 +52,11 @@ export class CollectionSubscription { private loadedInitialState = false + // Flag to skip filtering in filterAndFlipChanges. + // This is separate from loadedInitialState because we want to allow + // requestSnapshot to still work even when filtering is skipped. + private skipFiltering = false + // Flag to indicate that we have sent at least 1 snapshot. // While `snapshotSent` is false we filter out all changes from subscription to the collection. private snapshotSent = false @@ -173,7 +178,18 @@ export class CollectionSubscription } emitEvents(changes: Array>) { + console.debug(`[TanStack-DB-DEBUG] Subscription.emitEvents called`, { + collectionId: this.collection.id, + incomingChanges: changes.map((c) => ({ type: c.type, key: c.key })), + loadedInitialState: this.loadedInitialState, + sentKeysSize: this.sentKeys.size, + snapshotSent: this.snapshotSent, + }) const newChanges = this.filterAndFlipChanges(changes) + console.debug(`[TanStack-DB-DEBUG] After filterAndFlipChanges`, { + filteredChanges: newChanges.map((c) => ({ type: c.type, key: c.key })), + droppedCount: changes.length - newChanges.length, + }) this.filteredCallback(newChanges) } @@ -443,23 +459,65 @@ export class CollectionSubscription * Updates are flipped into inserts for keys that have not been sent yet. */ private filterAndFlipChanges(changes: Array>) { - if (this.loadedInitialState) { - // We loaded the entire initial state + if (this.loadedInitialState || this.skipFiltering) { + // We loaded the entire initial state or filtering is explicitly skipped // so no need to filter or flip changes + console.debug( + `[TanStack-DB-DEBUG] filterAndFlipChanges: skipping filtering`, + { + loadedInitialState: this.loadedInitialState, + skipFiltering: this.skipFiltering, + }, + ) return changes } + console.debug( + `[TanStack-DB-DEBUG] filterAndFlipChanges: will filter based on sentKeys`, + { sentKeysSize: this.sentKeys.size }, + ) + const newChanges = [] for (const change of changes) { let newChange = change - if (!this.sentKeys.has(change.key)) { + const keyInSentKeys = this.sentKeys.has(change.key) + if (!keyInSentKeys) { if (change.type === `update`) { + console.debug( + `[TanStack-DB-DEBUG] Flipping update to insert for key not in sentKeys`, + { key: change.key }, + ) newChange = { ...change, type: `insert`, previousValue: undefined } } else if (change.type === `delete`) { // filter out deletes for keys that have not been sent + console.debug( + `[TanStack-DB-DEBUG] FILTERING OUT delete for key not in sentKeys`, + { key: change.key, sentKeysSize: this.sentKeys.size }, + ) continue } this.sentKeys.add(change.key) + } else { + // Key was already sent - handle based on change type + if (change.type === `insert`) { + // Filter out duplicate inserts - the key was already inserted. + // This prevents D2 multiplicity from going above 1, which would + // cause deletes to not properly remove items (multiplicity would + // go from 2 to 1 instead of 1 to 0). + console.debug( + `[TanStack-DB-DEBUG] FILTERING OUT duplicate insert for key already in sentKeys`, + { key: change.key }, + ) + continue + } else if (change.type === `delete`) { + // Remove from sentKeys so future inserts for this key are allowed + // (e.g., after truncate + reinsert) + this.sentKeys.delete(change.key) + } + console.debug( + `[TanStack-DB-DEBUG] Key found in sentKeys, passing through`, + { key: change.key, type: change.type }, + ) } newChanges.push(newChange) } @@ -467,15 +525,50 @@ export class CollectionSubscription } private trackSentKeys(changes: Array>) { - if (this.loadedInitialState) { - // No need to track sent keys if we loaded the entire state. - // Since we sent everything, all keys must have been observed. + if (this.loadedInitialState || this.skipFiltering) { + // No need to track sent keys if we loaded the entire state or filtering is skipped. + // Since filtering won't be applied, all keys are effectively "observed". + console.debug( + `[TanStack-DB-DEBUG] trackSentKeys: skipping (loadedInitialState=${this.loadedInitialState}, skipFiltering=${this.skipFiltering})`, + ) return } + const keysAdded: Array = [] + const keysRemoved: Array = [] for (const change of changes) { - this.sentKeys.add(change.key) + if (change.type === `delete`) { + // Remove deleted keys from sentKeys so future re-inserts are allowed + if (this.sentKeys.has(change.key)) { + this.sentKeys.delete(change.key) + keysRemoved.push(change.key) + } + } else { + // For inserts and updates, track the key as sent + if (!this.sentKeys.has(change.key)) { + keysAdded.push(change.key) + } + this.sentKeys.add(change.key) + } } + console.debug(`[TanStack-DB-DEBUG] trackSentKeys: updated keys`, { + collectionId: this.collection.id, + keysAddedCount: keysAdded.length, + keysAdded: keysAdded.slice(0, 10), + keysRemovedCount: keysRemoved.length, + keysRemoved: keysRemoved.slice(0, 10), + totalSentKeys: this.sentKeys.size, + }) + } + + /** + * Mark that the subscription should not filter any changes. + * This is used when includeInitialState is explicitly set to false, + * meaning the caller doesn't want initial state but does want ALL future changes. + */ + markAllStateAsSeen() { + console.debug(`[TanStack-DB-DEBUG] markAllStateAsSeen called`) + this.skipFiltering = true } unsubscribe() { diff --git a/packages/db/src/query/live/collection-config-builder.ts b/packages/db/src/query/live/collection-config-builder.ts index b663fdad5..ab6d7ddfe 100644 --- a/packages/db/src/query/live/collection-config-builder.ts +++ b/packages/db/src/query/live/collection-config-builder.ts @@ -308,10 +308,20 @@ export class CollectionConfigBuilder< // So this callback would notice that it doesn't have enough rows and load some more. // The callback returns a boolean, when it's true it's done loading data and we can mark the collection as ready. maybeRunGraph(callback?: () => boolean) { + console.debug(`[TanStack-DB-DEBUG] maybeRunGraph called`, { + liveQueryId: this.id, + isGraphRunning: this.isGraphRunning, + hasCurrentSyncConfig: !!this.currentSyncConfig, + hasCurrentSyncState: !!this.currentSyncState, + }) + if (this.isGraphRunning) { // no nested runs of the graph // which is possible if the `callback` // would call `maybeRunGraph` e.g. after it has loaded some more data + console.debug( + `[TanStack-DB-DEBUG] maybeRunGraph: EARLY RETURN - graph already running`, + ) return } @@ -330,16 +340,43 @@ export class CollectionConfigBuilder< // Don't run if the live query is in an error state if (this.isInErrorState) { + console.debug( + `[TanStack-DB-DEBUG] maybeRunGraph: EARLY RETURN - in error state`, + ) return } // Always run the graph if subscribed (eager execution) if (syncState.subscribedToAllCollections) { + const hasPendingWork = syncState.graph.pendingWork() + console.debug( + `[TanStack-DB-DEBUG] maybeRunGraph: checking pending work`, + { + liveQueryId: this.id, + subscribedToAllCollections: true, + hasPendingWork, + }, + ) + + let runCount = 0 while (syncState.graph.pendingWork()) { + runCount++ + console.debug( + `[TanStack-DB-DEBUG] maybeRunGraph: running graph iteration`, + { + liveQueryId: this.id, + runCount, + }, + ) syncState.graph.run() callback?.() } + console.debug(`[TanStack-DB-DEBUG] maybeRunGraph: graph run complete`, { + liveQueryId: this.id, + totalRuns: runCount, + }) + // On the initial run, we may need to do an empty commit to ensure that // the collection is initialized if (syncState.messagesCount === 0) { @@ -349,6 +386,11 @@ export class CollectionConfigBuilder< // (in case all sources were already ready before we subscribed) this.updateLiveQueryStatus(this.currentSyncConfig) } + } else { + console.debug( + `[TanStack-DB-DEBUG] maybeRunGraph: NOT subscribed to all collections yet`, + { liveQueryId: this.id }, + ) } } finally { this.isGraphRunning = false @@ -383,6 +425,12 @@ export class CollectionConfigBuilder< }, ) { const contextId = options?.contextId ?? getActiveTransaction()?.id + console.debug(`[TanStack-DB-DEBUG] scheduleGraphRun called`, { + liveQueryId: this.id, + hasCallback: !!callback, + alias: options?.alias, + contextId: contextId ?? `immediate`, + }) // Use the builder instance as the job ID for deduplication. This is memory-safe // because the scheduler's context Map is deleted after flushing (no long-term retention). const jobId = options?.jobId ?? this @@ -481,6 +529,12 @@ export class CollectionConfigBuilder< contextId?: SchedulerContextId, pendingParam?: PendingGraphRun, ): void { + console.debug(`[TanStack-DB-DEBUG] executeGraphRun called`, { + liveQueryId: this.id, + contextId: contextId ?? `immediate`, + hasPendingParam: !!pendingParam, + }) + // Get pending state: either from parameter (no context) or from map (with context) // Remove from map BEFORE checking sync state to prevent leaking entries when sync ends // before the transaction flushes (e.g., unsubscribe during in-flight transaction) @@ -493,14 +547,27 @@ export class CollectionConfigBuilder< // If no pending state, nothing to execute (context was cleared) if (!pending) { + console.debug( + `[TanStack-DB-DEBUG] executeGraphRun: EARLY RETURN - no pending state`, + { liveQueryId: this.id }, + ) return } // If sync session has ended, don't execute (graph is finalized, subscriptions cleared) if (!this.currentSyncConfig || !this.currentSyncState) { + console.debug( + `[TanStack-DB-DEBUG] executeGraphRun: EARLY RETURN - no sync session`, + { liveQueryId: this.id }, + ) return } + console.debug(`[TanStack-DB-DEBUG] executeGraphRun: proceeding to run`, { + liveQueryId: this.id, + loadCallbacksCount: pending.loadCallbacks.size, + }) + this.incrementRunCount() const combinedLoader = () => { @@ -711,6 +778,19 @@ export class CollectionConfigBuilder< const { write, collection } = config const { deletes, inserts, value, orderByIndex } = changes + console.debug( + `[TanStack-DB-DEBUG] applyChanges called (D2 pipeline output)`, + { + liveQueryId: this.id, + key, + deletes, + inserts, + hasValue: !!value, + orderByIndex, + collectionHasKey: collection.has(collection.getKeyFromItem(value)), + }, + ) + // Store the key of the result so that we can retrieve it in the // getKey function this.resultKeys.set(value, key) @@ -722,6 +802,7 @@ export class CollectionConfigBuilder< // Simple singular insert. if (inserts && deletes === 0) { + console.debug(`[TanStack-DB-DEBUG] applyChanges: writing INSERT`, { key }) write({ value, type: `insert`, @@ -733,12 +814,14 @@ export class CollectionConfigBuilder< // was inserted previously). (inserts === deletes && collection.has(collection.getKeyFromItem(value))) ) { + console.debug(`[TanStack-DB-DEBUG] applyChanges: writing UPDATE`, { key }) write({ value, type: `update`, }) // Only delete is left as an option } else if (deletes > 0) { + console.debug(`[TanStack-DB-DEBUG] applyChanges: writing DELETE`, { key }) write({ value, type: `delete`, diff --git a/packages/db/src/query/live/collection-subscriber.ts b/packages/db/src/query/live/collection-subscriber.ts index f368562cf..f01d6c0df 100644 --- a/packages/db/src/query/live/collection-subscriber.ts +++ b/packages/db/src/query/live/collection-subscriber.ts @@ -29,6 +29,11 @@ export class CollectionSubscriber< { resolve: () => void } >() + // Track keys that have been sent to the D2 pipeline to prevent duplicate inserts + // This is necessary because different code paths (initial load, change events) + // can potentially send the same item to D2 multiple times. + private sentToD2Keys = new Set() + constructor( private alias: string, private collectionId: string, @@ -129,16 +134,58 @@ export class CollectionSubscriber< changes: Iterable>, callback?: () => boolean, ) { + const changesArray = Array.isArray(changes) ? changes : [...changes] + console.debug(`[TanStack-DB-DEBUG] sendChangesToPipeline called`, { + alias: this.alias, + collectionId: this.collection.id, + changesCount: changesArray.length, + changeTypes: changesArray.map((c) => ({ type: c.type, key: c.key })), + }) + + // Filter changes to prevent duplicate inserts to D2 pipeline. + // This ensures D2 multiplicity stays at 1 for visible items, so deletes + // properly reduce multiplicity to 0 (triggering DELETE output). + const filteredChanges: Array> = [] + for (const change of changesArray) { + if (change.type === `insert`) { + if (this.sentToD2Keys.has(change.key)) { + // Skip duplicate insert - already sent to D2 + console.debug( + `[TanStack-DB-DEBUG] sendChangesToPipeline: FILTERING duplicate insert`, + { key: change.key, alias: this.alias }, + ) + continue + } + this.sentToD2Keys.add(change.key) + } else if (change.type === `delete`) { + // Remove from tracking so future re-inserts are allowed + this.sentToD2Keys.delete(change.key) + } + // Updates are handled as delete+insert by splitUpdates, so no special handling needed + filteredChanges.push(change) + } + + console.debug(`[TanStack-DB-DEBUG] sendChangesToPipeline after filtering`, { + originalCount: changesArray.length, + filteredCount: filteredChanges.length, + alias: this.alias, + }) + // currentSyncState and input are always defined when this method is called // (only called from active subscriptions during a sync session) const input = this.collectionConfigBuilder.currentSyncState!.inputs[this.alias]! const sentChanges = sendChangesToInput( input, - changes, + filteredChanges, this.collection.config.getKey, ) + console.debug(`[TanStack-DB-DEBUG] sendChangesToInput returned`, { + sentChanges, + alias: this.alias, + }) + // Do not provide the callback that loads more data // if there's no more data to load // otherwise we end up in an infinite loop trying to load more data @@ -162,8 +209,11 @@ export class CollectionSubscriber< this.sendChangesToPipeline(changes) } + // Only pass includeInitialState when true. When it's false, we leave it + // undefined so that user subscriptions with explicit `includeInitialState: false` + // can be distinguished from internal lazy-loading subscriptions. const subscription = this.collection.subscribeChanges(sendChanges, { - includeInitialState, + ...(includeInitialState && { includeInitialState }), whereExpression, }) @@ -176,11 +226,32 @@ export class CollectionSubscriber< ) { const { orderBy, offset, limit, index } = orderByInfo + console.debug( + `[TanStack-DB-DEBUG] subscribeToOrderedChanges: setting up subscription`, + { + alias: this.alias, + collectionId: this.collection.id, + hasWhereExpression: !!whereExpression, + hasIndex: !!index, + offset, + limit, + }, + ) + const sendChangesInRange = ( changes: Iterable>, ) => { + const changesArray = Array.isArray(changes) ? changes : [...changes] + console.debug( + `[TanStack-DB-DEBUG] subscribeToOrderedChanges.sendChangesInRange called`, + { + alias: this.alias, + changesCount: changesArray.length, + changeTypes: changesArray.map((c) => ({ type: c.type, key: c.key })), + }, + ) // Split live updates into a delete of the old value and an insert of the new value - const splittedChanges = splitUpdates(changes) + const splittedChanges = splitUpdates(changesArray) this.sendChangesToPipelineWithTracking(splittedChanges, subscription) } @@ -194,6 +265,8 @@ export class CollectionSubscriber< // This ensures that after a must-refetch/truncate, we don't use stale cursor data const truncateUnsubscribe = this.collection.on(`truncate`, () => { this.biggest = undefined + // Also clear D2 key tracking so all items can be re-inserted after truncate + this.sentToD2Keys.clear() }) // Clean up truncate listener when subscription is unsubscribed @@ -374,7 +447,17 @@ function sendChangesToInput( getKey: (item: ChangeMessage[`value`]) => any, ): number { const multiSetArray: MultiSetArray = [] - for (const change of changes) { + const changesArray = Array.isArray(changes) ? changes : [...changes] + + console.debug(`[TanStack-DB-DEBUG] sendChangesToInput: processing changes`, { + changesCount: changesArray.length, + changeTypes: changesArray.map((c) => ({ + type: c.type, + key: getKey(c.value), + })), + }) + + for (const change of changesArray) { const key = getKey(change.value) if (change.type === `insert`) { multiSetArray.push([[key, change.value], 1]) @@ -383,12 +466,24 @@ function sendChangesToInput( multiSetArray.push([[key, change.value], 1]) } else { // change.type === `delete` + console.debug( + `[TanStack-DB-DEBUG] sendChangesToInput: adding DELETE to multiset`, + { key }, + ) multiSetArray.push([[key, change.value], -1]) } } if (multiSetArray.length !== 0) { + console.debug( + `[TanStack-DB-DEBUG] sendChangesToInput: sending to D2 input`, + { multiSetArrayLength: multiSetArray.length }, + ) input.sendData(new MultiSet(multiSetArray)) + } else { + console.debug( + `[TanStack-DB-DEBUG] sendChangesToInput: NO data to send (multiSetArray empty)`, + ) } return multiSetArray.length @@ -403,9 +498,17 @@ function* splitUpdates< ): Generator> { for (const change of changes) { if (change.type === `update`) { + console.debug( + `[TanStack-DB-DEBUG] splitUpdates: splitting update into delete+insert`, + { key: change.key }, + ) yield { type: `delete`, key: change.key, value: change.previousValue! } yield { type: `insert`, key: change.key, value: change.value } } else { + console.debug(`[TanStack-DB-DEBUG] splitUpdates: passing through`, { + type: change.type, + key: change.key, + }) yield change } } diff --git a/packages/db/tests/collection-subscribe-changes.test.ts b/packages/db/tests/collection-subscribe-changes.test.ts index 4258302c7..f3f4b1cce 100644 --- a/packages/db/tests/collection-subscribe-changes.test.ts +++ b/packages/db/tests/collection-subscribe-changes.test.ts @@ -1193,8 +1193,10 @@ describe(`Collection.subscribeChanges`, () => { f.write({ type: `insert`, value: { id: 1, value: `server-after` } }) f.commit() - // Expect delete, insert with optimistic value, and an empty event from markReady - expect(changeEvents.length).toBe(3) + // Expect delete and insert with optimistic value + // Note: Previously there was a duplicate insert event that was incorrectly + // being sent, causing 3 events. Now duplicates are filtered correctly. + expect(changeEvents.length).toBe(2) expect(changeEvents[0]).toEqual({ type: `delete`, key: 1, diff --git a/packages/db/tests/query/optimistic-delete-with-limit.test.ts b/packages/db/tests/query/optimistic-delete-with-limit.test.ts new file mode 100644 index 000000000..2eec67d7b --- /dev/null +++ b/packages/db/tests/query/optimistic-delete-with-limit.test.ts @@ -0,0 +1,596 @@ +import { beforeEach, describe, expect, it, vi } from 'vitest' +import { createCollection } from '../../src/collection/index.js' +import { mockSyncCollectionOptions } from '../utils.js' +import { createLiveQueryCollection } from '../../src/query/live-query-collection.js' +import { like } from '../../src/query/builder/functions.js' +import type { ChangeMessage } from '../../src/types.js' +import type { Collection } from '../../src/collection/index.js' + +type Item = { + id: string + value: number + name: string +} + +const initialData: Array = [ + { id: `1`, value: 100, name: `Item A` }, + { id: `2`, value: 90, name: `Item B` }, + { id: `3`, value: 80, name: `Item C` }, + { id: `4`, value: 70, name: `Item D` }, + { id: `5`, value: 60, name: `Item E` }, +] + +describe(`Optimistic delete with limit`, () => { + let sourceCollection: Collection + + beforeEach(async () => { + sourceCollection = createCollection( + mockSyncCollectionOptions({ + id: `items`, + getKey: (item: Item) => item.id, + initialData, + }), + ) + + // Wait for the collection to be ready + await sourceCollection.preload() + }) + + it(`should emit delete event with limit`, async () => { + // Create a live query with orderBy and limit (matching the user's pattern) + const liveQueryCollection = createLiveQueryCollection((q) => + q + .from({ items: sourceCollection }) + .orderBy(({ items }) => items.value, `desc`) + .limit(3) + .select(({ items }) => ({ + id: items.id, + value: items.value, + name: items.name, + })), + ) + + // Wait for the live query collection to be ready + await liveQueryCollection.preload() + + // Check initial results + const initialResults = Array.from(liveQueryCollection.values()) + expect(initialResults).toHaveLength(3) + expect(initialResults.map((r) => r.id)).toEqual([`1`, `2`, `3`]) + + // Subscribe to changes on the live query collection + const changeCallback = vi.fn() + const subscription = liveQueryCollection.subscribeChanges(changeCallback, { + includeInitialState: false, + }) + + // Clear any initial calls from subscription setup + changeCallback.mockClear() + + // Optimistically delete item 2 (which is in the visible top 3) + sourceCollection.delete(`2`) + + // Wait for microtasks to process + await new Promise((resolve) => setTimeout(resolve, 10)) + + // The callback should have been called with the delete event + expect(changeCallback).toHaveBeenCalled() + + // Get the changes from all calls + const allChanges = changeCallback.mock.calls.flatMap((call) => call[0]) + console.log( + `All changes (with limit):`, + JSON.stringify(allChanges, null, 2), + ) + + // Should have a delete for item 2 + const deleteEvents = allChanges.filter( + (c: ChangeMessage) => c.type === `delete`, + ) + expect(deleteEvents.length).toBeGreaterThan(0) + expect(deleteEvents.some((e: ChangeMessage) => e.key === `2`)).toBe( + true, + ) + + subscription.unsubscribe() + }) + + it(`should emit delete event without limit (baseline)`, async () => { + // Create a live query WITHOUT limit (for comparison) + const liveQueryCollection = createLiveQueryCollection((q) => + q + .from({ items: sourceCollection }) + .orderBy(({ items }) => items.value, `desc`) + .select(({ items }) => ({ + id: items.id, + value: items.value, + name: items.name, + })), + ) + + // Wait for the live query collection to be ready + await liveQueryCollection.preload() + + // Check initial results + const initialResults = Array.from(liveQueryCollection.values()) + expect(initialResults).toHaveLength(5) + + // Subscribe to changes on the live query collection + const changeCallback = vi.fn() + const subscription = liveQueryCollection.subscribeChanges(changeCallback, { + includeInitialState: false, + }) + + // Clear any initial calls from subscription setup + changeCallback.mockClear() + + // Optimistically delete item 2 + sourceCollection.delete(`2`) + + // Wait for microtasks to process + await new Promise((resolve) => setTimeout(resolve, 10)) + + // The callback should have been called with the delete event + expect(changeCallback).toHaveBeenCalled() + + // Get the changes from all calls + const allChanges = changeCallback.mock.calls.flatMap((call) => call[0]) + console.log( + `All changes (without limit):`, + JSON.stringify(allChanges, null, 2), + ) + + // Should have a delete for item 2 + const deleteEvents = allChanges.filter( + (c: ChangeMessage) => c.type === `delete`, + ) + expect(deleteEvents.length).toBeGreaterThan(0) + expect(deleteEvents.some((e: ChangeMessage) => e.key === `2`)).toBe( + true, + ) + + subscription.unsubscribe() + }) + + it(`should emit delete event with limit and includeInitialState: true`, async () => { + // Create a live query with orderBy and limit (matching the user's exact pattern) + const liveQueryCollection = createLiveQueryCollection((q) => + q + .from({ items: sourceCollection }) + .orderBy(({ items }) => items.value, `desc`) + .limit(3) + .select(({ items }) => ({ + id: items.id, + value: items.value, + name: items.name, + })), + ) + + // Wait for the live query collection to be ready + await liveQueryCollection.preload() + + // Check initial results + const initialResults = Array.from(liveQueryCollection.values()) + expect(initialResults).toHaveLength(3) + expect(initialResults.map((r) => r.id)).toEqual([`1`, `2`, `3`]) + + // Subscribe to changes on the live query collection with includeInitialState: true + // This is what the user is doing + const changeCallback = vi.fn() + const subscription = liveQueryCollection.subscribeChanges(changeCallback, { + includeInitialState: true, + }) + + // Wait for initial state to be sent + await new Promise((resolve) => setTimeout(resolve, 10)) + + // Clear initial state calls + changeCallback.mockClear() + + // Optimistically delete item 2 (which is in the visible top 3) + sourceCollection.delete(`2`) + + // Wait for microtasks to process + await new Promise((resolve) => setTimeout(resolve, 10)) + + // The callback should have been called with the delete event + expect(changeCallback).toHaveBeenCalled() + + // Get the changes from all calls + const allChanges = changeCallback.mock.calls.flatMap((call) => call[0]) + console.log( + `All changes (with limit, includeInitialState: true):`, + JSON.stringify(allChanges, null, 2), + ) + + // Should have a delete for item 2 + const deleteEvents = allChanges.filter( + (c: ChangeMessage) => c.type === `delete`, + ) + expect(deleteEvents.length).toBeGreaterThan(0) + expect(deleteEvents.some((e: ChangeMessage) => e.key === `2`)).toBe( + true, + ) + + subscription.unsubscribe() + }) + + it(`should emit delete event with limit and offset`, async () => { + // Create a live query with orderBy, limit AND offset (matching the user's exact pattern) + const pageSize = 2 + const pageIndex = 0 + const liveQueryCollection = createLiveQueryCollection((q) => + q + .from({ items: sourceCollection }) + .orderBy(({ items }) => items.value, `desc`) + .limit(pageSize) + .offset(pageIndex * pageSize) + .select(({ items }) => ({ + id: items.id, + value: items.value, + name: items.name, + })), + ) + + // Wait for the live query collection to be ready + await liveQueryCollection.preload() + + // Check initial results - should be items 1 and 2 (highest values) + const initialResults = Array.from(liveQueryCollection.values()) + expect(initialResults).toHaveLength(2) + expect(initialResults.map((r) => r.id)).toEqual([`1`, `2`]) + + // Subscribe to changes with includeInitialState: true (same as user) + const changeCallback = vi.fn() + const subscription = liveQueryCollection.subscribeChanges(changeCallback, { + includeInitialState: true, + }) + + // Wait for initial state to be sent + await new Promise((resolve) => setTimeout(resolve, 10)) + + // Clear initial state calls + changeCallback.mockClear() + + // Delete item 2 (which is in the visible page) + sourceCollection.delete(`2`) + + // Wait for microtasks to process + await new Promise((resolve) => setTimeout(resolve, 10)) + + // The callback should have been called with the delete event + console.log( + `All changes (with limit+offset, includeInitialState: true):`, + JSON.stringify( + changeCallback.mock.calls.flatMap((call) => call[0]), + null, + 2, + ), + ) + expect(changeCallback).toHaveBeenCalled() + + // Get the changes from all calls + const allChanges = changeCallback.mock.calls.flatMap((call) => call[0]) + + // Should have a delete for item 2 + const deleteEvents = allChanges.filter( + (c: ChangeMessage) => c.type === `delete`, + ) + expect(deleteEvents.length).toBeGreaterThan(0) + expect(deleteEvents.some((e: ChangeMessage) => e.key === `2`)).toBe( + true, + ) + + subscription.unsubscribe() + }) + + it(`should emit delete event with where clause, limit and offset (matching user's exact pattern)`, async () => { + // Create a live query that matches the user's pattern: + // query.where(...).orderBy(...).limit(pageSize).offset(pageIndex * pageSize) + const pageSize = 2 + const pageIndex = 0 + const search = `Item` // Simulating their search filter + + const liveQueryCollection = createLiveQueryCollection((q) => { + let query = q.from({ items: sourceCollection }) + // Add a where clause like the user does + query = query.where(({ items }) => like(items.name, `%${search}%`)) + return query + .orderBy(({ items }) => items.value, `desc`) + .limit(pageSize) + .offset(pageIndex * pageSize) + .select(({ items }) => ({ + id: items.id, + value: items.value, + name: items.name, + })) + }) + + // Wait for the live query collection to be ready + await liveQueryCollection.preload() + + // Check initial results - should be items 1 and 2 (highest values matching search) + const initialResults = Array.from(liveQueryCollection.values()) + console.log( + `Initial results (where + limit + offset):`, + JSON.stringify(initialResults, null, 2), + ) + expect(initialResults).toHaveLength(2) + expect(initialResults.map((r) => r.id)).toEqual([`1`, `2`]) + + // Subscribe to changes with includeInitialState: true (same as user) + const changeCallback = vi.fn() + const subscription = liveQueryCollection.subscribeChanges(changeCallback, { + includeInitialState: true, + }) + + // Wait for initial state to be sent + await new Promise((resolve) => setTimeout(resolve, 10)) + + // Clear initial state calls + changeCallback.mockClear() + + // Delete item 2 (which is in the visible page) + sourceCollection.delete(`2`) + + // Wait for microtasks to process + await new Promise((resolve) => setTimeout(resolve, 10)) + + // The callback should have been called with the delete event + console.log( + `All changes (where + limit + offset, includeInitialState: true):`, + JSON.stringify( + changeCallback.mock.calls.flatMap((call) => call[0]), + null, + 2, + ), + ) + expect(changeCallback).toHaveBeenCalled() + + // Get the changes from all calls + const allChanges = changeCallback.mock.calls.flatMap((call) => call[0]) + + // Should have a delete for item 2 + const deleteEvents = allChanges.filter( + (c: ChangeMessage) => c.type === `delete`, + ) + expect(deleteEvents.length).toBeGreaterThan(0) + expect(deleteEvents.some((e: ChangeMessage) => e.key === `2`)).toBe( + true, + ) + + subscription.unsubscribe() + }) + + it(`should emit delete and update when deleting from different page (page 1 delete while viewing page 2)`, async () => { + // This test captures the scenario from Marius's environment: + // - Viewing page 2 (items 3 and 4 with offset=2, limit=2) + // - Delete an item from page 1 (item 2) + // - The live query should update because items shift: + // - After delete, sorted order is: 1, 3, 4, 5 + // - Page 2 (offset=2, limit=2) should now show items 4 and 5 + // - So: item 3 should be deleted from result, item 5 should be inserted + + const pageSize = 2 + const pageIndex = 1 // page 2 (0-indexed) + + const liveQueryCollection = createLiveQueryCollection((q) => + q + .from({ items: sourceCollection }) + .orderBy(({ items }) => items.value, `desc`) + .limit(pageSize) + .offset(pageIndex * pageSize) + .select(({ items }) => ({ + id: items.id, + value: items.value, + name: items.name, + })), + ) + + // Wait for the live query collection to be ready + await liveQueryCollection.preload() + + // Check initial results - page 2 should show items 3 and 4 (offset 2, limit 2) + // Sorted by value desc: 1 (100), 2 (90), 3 (80), 4 (70), 5 (60) + // Page 2 = offset 2 = items 3 and 4 + const initialResults = Array.from(liveQueryCollection.values()) + console.log( + `Initial results (page 2):`, + JSON.stringify(initialResults, null, 2), + ) + expect(initialResults).toHaveLength(2) + expect(initialResults.map((r) => r.id)).toEqual([`3`, `4`]) + + // Subscribe to changes with includeInitialState: true + const changeCallback = vi.fn() + const subscription = liveQueryCollection.subscribeChanges(changeCallback, { + includeInitialState: true, + }) + + // Wait for initial state to be sent + await new Promise((resolve) => setTimeout(resolve, 10)) + + // Clear initial state calls + changeCallback.mockClear() + + // Delete item 2 (which is on page 1, NOT in current view) + console.log(`Deleting item 2 (on page 1, not visible on page 2)...`) + sourceCollection.delete(`2`) + + // Wait for microtasks to process + await new Promise((resolve) => setTimeout(resolve, 50)) + + // After deleting item 2: + // - Sorted order becomes: 1 (100), 3 (80), 4 (70), 5 (60) + // - Page 2 (offset 2, limit 2) should now show items 4 and 5 + // - So: item 3 should be deleted from result, item 5 should be inserted + + // Check the state after delete + const resultsAfterDelete = Array.from(liveQueryCollection.values()) + console.log( + `Results after delete:`, + JSON.stringify(resultsAfterDelete, null, 2), + ) + + // The live query collection should now show items 4 and 5 + expect(resultsAfterDelete).toHaveLength(2) + expect(resultsAfterDelete.map((r) => r.id)).toEqual([`4`, `5`]) + + // Check that we got the expected change events + console.log( + `All changes (page 2 after deleting from page 1):`, + JSON.stringify( + changeCallback.mock.calls.flatMap((call) => call[0]), + null, + 2, + ), + ) + + // We should have received change events (delete for item 3, insert for item 5) + expect(changeCallback).toHaveBeenCalled() + + const allChanges = changeCallback.mock.calls.flatMap((call) => call[0]) + + // Should have a delete for item 3 (shifted out of page 2) + const deleteEvents = allChanges.filter( + (c: ChangeMessage) => c.type === `delete`, + ) + expect(deleteEvents.some((e: ChangeMessage) => e.key === `3`)).toBe( + true, + ) + + // Should have an insert for item 5 (shifted into page 2) + const insertEvents = allChanges.filter( + (c: ChangeMessage) => c.type === `insert`, + ) + expect(insertEvents.some((e: ChangeMessage) => e.key === `5`)).toBe( + true, + ) + + subscription.unsubscribe() + }) + + it(`should NOT update when deleting item beyond TopK window (no-op case)`, async () => { + // Test scenario: delete an item that's AFTER the TopK window + // - Page 1: items 1 and 2 (offset=0, limit=2) + // - Delete item 5 (which is on page 3) + // - Page 1 should NOT change (items 1 and 2 are still there) + + const pageSize = 2 + const pageIndex = 0 // page 1 + + const liveQueryCollection = createLiveQueryCollection((q) => + q + .from({ items: sourceCollection }) + .orderBy(({ items }) => items.value, `desc`) + .limit(pageSize) + .offset(pageIndex * pageSize) + .select(({ items }) => ({ + id: items.id, + value: items.value, + name: items.name, + })), + ) + + // Wait for the live query collection to be ready + await liveQueryCollection.preload() + + // Check initial results - page 1 should show items 1 and 2 + const initialResults = Array.from(liveQueryCollection.values()) + console.log( + `Initial results (page 1 for no-op test):`, + JSON.stringify(initialResults, null, 2), + ) + expect(initialResults).toHaveLength(2) + expect(initialResults.map((r) => r.id)).toEqual([`1`, `2`]) + + // Subscribe to changes + const changeCallback = vi.fn() + const subscription = liveQueryCollection.subscribeChanges(changeCallback, { + includeInitialState: true, + }) + + // Wait for initial state to be sent + await new Promise((resolve) => setTimeout(resolve, 10)) + + // Clear initial state calls + changeCallback.mockClear() + + // Delete item 5 (which is on page 3, beyond the TopK window) + console.log(`Deleting item 5 (on page 3, beyond TopK window)...`) + sourceCollection.delete(`5`) + + // Wait for microtasks to process + await new Promise((resolve) => setTimeout(resolve, 50)) + + // After deleting item 5: + // - Sorted order becomes: 1 (100), 2 (90), 3 (80), 4 (70) + // - Page 1 (offset 0, limit 2) still shows items 1 and 2 + // - No change to page 1 + + // Check the state after delete + const resultsAfterDelete = Array.from(liveQueryCollection.values()) + console.log( + `Results after delete (no-op):`, + JSON.stringify(resultsAfterDelete, null, 2), + ) + + // The live query collection should still show items 1 and 2 + expect(resultsAfterDelete).toHaveLength(2) + expect(resultsAfterDelete.map((r) => r.id)).toEqual([`1`, `2`]) + + // Check that we did NOT receive any change events + console.log( + `Change events (should be empty):`, + JSON.stringify( + changeCallback.mock.calls.flatMap((call) => call[0]), + null, + 2, + ), + ) + + // No changes expected since item 5 is outside the window + const allChanges = changeCallback.mock.calls.flatMap((call) => call[0]) + expect(allChanges).toHaveLength(0) + + subscription.unsubscribe() + }) + + it(`should update state correctly after delete with limit`, async () => { + // Create a live query with orderBy and limit + const liveQueryCollection = createLiveQueryCollection((q) => + q + .from({ items: sourceCollection }) + .orderBy(({ items }) => items.value, `desc`) + .limit(3) + .select(({ items }) => ({ + id: items.id, + value: items.value, + name: items.name, + })), + ) + + // Wait for the live query collection to be ready + await liveQueryCollection.preload() + + // Check initial results + let results = Array.from(liveQueryCollection.values()) + expect(results.map((r) => r.id)).toEqual([`1`, `2`, `3`]) + + // Subscribe to changes + liveQueryCollection.subscribeChanges(() => {}, { + includeInitialState: false, + }) + + // Optimistically delete item 2 (which is in the visible top 3) + sourceCollection.delete(`2`) + + // Wait for microtasks to process + await new Promise((resolve) => setTimeout(resolve, 10)) + + // Check that the state is updated + // Item 2 should be gone, and item 4 should move into the top 3 + results = Array.from(liveQueryCollection.values()) + expect(results.map((r) => r.id)).toEqual([`1`, `3`, `4`]) + }) +})