Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 31 additions & 1 deletion packages/db-ivm/src/d2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,19 +47,49 @@ export class D2 implements ID2 {
if (!this.#finalized) {
throw new Error(`Graph not finalized`)
}
console.debug(
`[TanStack-DB-DEBUG] D2.step: running ${this.#operators.length} operators`,
{
operators: this.#operators.map((op) => ({
id: op.id,
type: op.operatorType,
})),
},
)
for (const op of this.#operators) {
const hadWork = op.hasPendingWork()
if (hadWork) {
console.debug(
`[TanStack-DB-DEBUG] D2.step: operator ${op.id} (${op.operatorType}) has pending work, running`,
)
}
op.run()
}
}

pendingWork(): boolean {
return this.#operators.some((op) => op.hasPendingWork())
const operatorsWithWork = this.#operators.filter((op) =>
op.hasPendingWork(),
)
const hasPending = operatorsWithWork.length > 0
console.debug(`[TanStack-DB-DEBUG] D2.pendingWork:`, {
hasPending,
operatorsWithWork: operatorsWithWork.map((op) => op.id),
})
return hasPending
}

run(): void {
console.debug(`[TanStack-DB-DEBUG] D2.run: starting`)
let stepCount = 0
while (this.pendingWork()) {
stepCount++
console.debug(`[TanStack-DB-DEBUG] D2.run: step ${stepCount}`)
this.step()
}
console.debug(
`[TanStack-DB-DEBUG] D2.run: complete after ${stepCount} steps`,
)
}
}

Expand Down
34 changes: 32 additions & 2 deletions packages/db-ivm/src/graph.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ export class DifferenceStreamWriter<T> implements IDifferenceStreamWriter<T> {
export abstract class Operator<T> implements IOperator<T> {
protected inputs: Array<DifferenceStreamReader<T>>
protected output: DifferenceStreamWriter<T>
public operatorType = 'unknown'

constructor(
public id: number,
Expand All @@ -65,6 +66,8 @@ export abstract class Operator<T> implements IOperator<T> {
) {
this.inputs = inputs
this.output = output
// Set operator type from constructor name
this.operatorType = this.constructor.name
}

abstract run(): void
Expand Down Expand Up @@ -124,8 +127,35 @@ export abstract class LinearUnaryOperator<T, U> extends UnaryOperator<T | U> {
abstract inner(collection: MultiSet<T | U>): MultiSet<U>

run(): void {
for (const message of this.inputMessages()) {
this.output.sendData(this.inner(message))
const messages = this.inputMessages()
console.debug(
`[TanStack-DB-DEBUG] LinearUnaryOperator.run (${this.operatorType})`,
{
operatorId: this.id,
messageCount: messages.length,
},
)
for (const message of messages) {
const inputItems = message.getInner()
const result = this.inner(message)
const outputItems = result.getInner()
console.debug(
`[TanStack-DB-DEBUG] LinearUnaryOperator.inner (${this.operatorType})`,
{
operatorId: this.id,
inputItemCount: inputItems.length,
outputItemCount: outputItems.length,
inputSample: inputItems.slice(0, 3).map(([item, mult]) => ({
item: typeof item === 'object' ? JSON.stringify(item) : item,
multiplicity: mult,
})),
outputSample: outputItems.slice(0, 3).map(([item, mult]) => ({
item: typeof item === 'object' ? JSON.stringify(item) : item,
multiplicity: mult,
})),
},
)
this.output.sendData(result)
}
}
}
16 changes: 15 additions & 1 deletion packages/db-ivm/src/operators/output.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,21 @@ export class OutputOperator<T> extends UnaryOperator<T> {
}

run(): void {
for (const message of this.inputMessages()) {
const messages = this.inputMessages()
console.debug(`[TanStack-DB-DEBUG] OutputOperator: run called`, {
operatorId: this.id,
messageCount: messages.length,
})
for (const message of messages) {
const items = message.getInner()
console.debug(`[TanStack-DB-DEBUG] OutputOperator: processing message`, {
operatorId: this.id,
itemCount: items.length,
items: items.slice(0, 5).map(([item, mult]) => ({
item: typeof item === 'object' ? JSON.stringify(item) : item,
multiplicity: mult,
})),
})
this.#fn(message)
this.output.sendData(message)
}
Expand Down
30 changes: 30 additions & 0 deletions packages/db-ivm/src/operators/reduce.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,38 @@ export class ReduceOperator<K, V1, V2> extends UnaryOperator<[K, V1], [K, V2]> {
for (const message of this.inputMessages()) {
for (const [item, multiplicity] of message.getInner()) {
const [key, value] = item
console.debug(`[TanStack-DB-DEBUG] ReduceOperator: adding to index`, {
operatorId: this.id,
key,
value: typeof value === 'object' ? JSON.stringify(value) : value,
multiplicity,
})
this.#index.addValue(key, [value, multiplicity])
keysTodo.add(key)
}
}

console.debug(`[TanStack-DB-DEBUG] ReduceOperator: processing keys`, {
operatorId: this.id,
keysCount: keysTodo.size,
keys: Array.from(keysTodo).slice(0, 10), // Limit to first 10 for readability
})

// For each key, compute the reduction and delta
const result: Array<[[K, V2], number]> = []
for (const key of keysTodo) {
const curr = this.#index.get(key)
const currOut = this.#indexOut.get(key)
const out = this.#f(curr)

console.debug(`[TanStack-DB-DEBUG] ReduceOperator: processing key`, {
operatorId: this.id,
key,
inputValuesCount: curr.length,
previousOutputCount: currOut.length,
newOutputCount: out.length,
})

// Create maps for current and previous outputs using values directly as keys
const newOutputMap = new Map<V2, number>()
const oldOutputMap = new Map<V2, number>()
Expand Down Expand Up @@ -91,6 +111,16 @@ export class ReduceOperator<K, V1, V2> extends UnaryOperator<[K, V1], [K, V2]> {
}
}

console.debug(`[TanStack-DB-DEBUG] ReduceOperator: run complete`, {
operatorId: this.id,
resultCount: result.length,
results: result.slice(0, 10).map(([[key, value], mult]) => ({
key,
value: typeof value === 'object' ? JSON.stringify(value) : value,
multiplicity: mult,
})),
})

if (result.length > 0) {
this.output.sendData(new MultiSet(result))
}
Expand Down
21 changes: 20 additions & 1 deletion packages/db-ivm/src/operators/topK.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,26 @@ export function topK<
const sortedValues = consolidated
.getInner()
.sort((a, b) => comparator(a[0] as V1Type, b[0] as V1Type))
return sortedValues.slice(offset, offset + limit)
const result = sortedValues.slice(offset, offset + limit)

console.debug(`[TanStack-DB-DEBUG] topK: processing`, {
inputCount: values.length,
consolidatedCount: sortedValues.length,
offset,
limit,
resultCount: result.length,
// Show first few items for debugging
sortedFirst3: sortedValues.slice(0, 3).map(([v, m]) => ({
value: typeof v === 'object' ? JSON.stringify(v) : v,
multiplicity: m,
})),
resultItems: result.map(([v, m]) => ({
value: typeof v === 'object' ? JSON.stringify(v) : v,
multiplicity: m,
})),
})

return result
}),
)
return reduced as IStreamBuilder<T>
Expand Down
64 changes: 62 additions & 2 deletions packages/db-ivm/src/operators/topKWithFractionalIndex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -302,14 +302,45 @@ export class TopKWithFractionalIndexOperator<
}

run(): void {
const messages = this.inputMessages()
console.debug(`[TanStack-DB-DEBUG] TopKWithFractionalIndexOperator.run`, {
operatorId: this.id,
messageCount: messages.length,
indexSize: this.#index.size,
})

const result: Array<[[K, IndexedValue<T>], number]> = []
for (const message of this.inputMessages()) {
for (const [item, multiplicity] of message.getInner()) {
for (const message of messages) {
const items = message.getInner()
console.debug(
`[TanStack-DB-DEBUG] TopKWithFractionalIndexOperator: processing message`,
{
operatorId: this.id,
itemCount: items.length,
items: items
.slice(0, 5)
.map(([[key], mult]) => ({ key, multiplicity: mult })),
},
)
for (const [item, multiplicity] of items) {
const [key, value] = item
this.processElement(key, value, multiplicity, result)
}
}

console.debug(
`[TanStack-DB-DEBUG] TopKWithFractionalIndexOperator.run complete`,
{
operatorId: this.id,
resultCount: result.length,
results: result.slice(0, 5).map(([[key, [, index]], mult]) => ({
key,
index,
multiplicity: mult,
})),
},
)

if (result.length > 0) {
this.output.sendData(new MultiSet(result))
}
Expand All @@ -323,6 +354,23 @@ export class TopKWithFractionalIndexOperator<
): void {
const { oldMultiplicity, newMultiplicity } = this.addKey(key, multiplicity)

console.debug(
`[TanStack-DB-DEBUG] TopKWithFractionalIndexOperator.processElement`,
{
operatorId: this.id,
key,
multiplicity,
oldMultiplicity,
newMultiplicity,
action:
oldMultiplicity <= 0 && newMultiplicity > 0
? 'INSERT'
: oldMultiplicity > 0 && newMultiplicity <= 0
? 'DELETE'
: 'NO_CHANGE',
},
)

let res: TopKChanges<[K, T]> = {
moveIn: null,
moveOut: null,
Expand All @@ -341,6 +389,18 @@ export class TopKWithFractionalIndexOperator<
// so it doesn't affect the topK
}

console.debug(
`[TanStack-DB-DEBUG] TopKWithFractionalIndexOperator.processElement result`,
{
operatorId: this.id,
key,
hasMoveIn: res.moveIn !== null,
hasMoveOut: res.moveOut !== null,
moveInKey: res.moveIn ? res.moveIn[0][0] : null,
moveOutKey: res.moveOut ? res.moveOut[0][0] : null,
},
)

this.handleMoveIn(res.moveIn, result)
this.handleMoveOut(res.moveOut, result)

Expand Down
39 changes: 38 additions & 1 deletion packages/db/src/collection/change-events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -248,13 +248,25 @@ export function createFilteredCallback<T extends object>(
const filterFn = createFilterFunctionFromExpression(options.whereExpression!)

return (changes: Array<ChangeMessage<T>>) => {
console.debug(
`[TanStack-DB-DEBUG] createFilteredCallback: filtering changes by whereExpression`,
{
incomingChanges: changes.map((c) => ({ type: c.type, key: c.key })),
},
)

const filteredChanges: Array<ChangeMessage<T>> = []

for (const change of changes) {
if (change.type === `insert`) {
// For inserts, check if the new value matches the filter
if (filterFn(change.value)) {
filteredChanges.push(change)
} else {
console.debug(
`[TanStack-DB-DEBUG] FILTERING OUT insert by whereExpression`,
{ key: change.key },
)
}
} else if (change.type === `update`) {
// For updates, we need to check both old and new values
Expand All @@ -279,21 +291,46 @@ export function createFilteredCallback<T extends object>(
type: `delete`,
value: change.previousValue!, // Use the previous value for the delete
})
} else {
// If neither matches, don't emit anything
console.debug(
`[TanStack-DB-DEBUG] FILTERING OUT update by whereExpression (neither old nor new matches)`,
{ key: change.key },
)
}
// If neither matches, don't emit anything
} else {
// For deletes, include if the previous value would have matched
// (so subscribers know something they were tracking was deleted)
if (filterFn(change.value)) {
filteredChanges.push(change)
} else {
console.debug(
`[TanStack-DB-DEBUG] FILTERING OUT delete by whereExpression`,
{ key: change.key },
)
}
}
}

// Always call the original callback if we have filtered changes OR
// if the original changes array was empty (which indicates a ready signal)
if (filteredChanges.length > 0 || changes.length === 0) {
console.debug(
`[TanStack-DB-DEBUG] createFilteredCallback: calling originalCallback`,
{
filteredChangesCount: filteredChanges.length,
filteredChanges: filteredChanges.map((c) => ({
type: c.type,
key: c.key,
})),
},
)
originalCallback(filteredChanges)
} else {
console.debug(
`[TanStack-DB-DEBUG] createFilteredCallback: NOT calling callback - all changes filtered out`,
{ originalChangesCount: changes.length },
)
}
}
}
Expand Down
Loading
Loading