Skip to content

Commit 1b9d7ea

Browse files
committed
Refactoring of the topK operator and grouped topK operator
1 parent 6193beb commit 1b9d7ea

File tree

4 files changed

+368
-392
lines changed

4 files changed

+368
-392
lines changed

packages/db-ivm/src/operators/groupedTopKWithFractionalIndex.ts

Lines changed: 4 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,11 @@
11
import { DifferenceStreamWriter, UnaryOperator } from '../graph.js'
22
import { StreamBuilder } from '../d2.js'
33
import { MultiSet } from '../multiset.js'
4-
import {
5-
TopKArray,
6-
TopKState,
7-
createKeyedComparator,
8-
} from './topKWithFractionalIndex.js'
4+
import { TopKState, handleMoveIn, handleMoveOut } from './topKState.js'
5+
import { TopKArray, createKeyedComparator } from './topKArray.js'
6+
import type { IndexedValue, TopK} from './topKArray.js';
97
import type { DifferenceStreamReader } from '../graph.js'
108
import type { IStreamBuilder, PipedOperator } from '../types.js'
11-
import type { IndexedValue, TopK } from './topKWithFractionalIndex.js'
129

1310
export interface GroupedTopKWithFractionalIndexOptions<K, T> {
1411
limit?: number
@@ -111,7 +108,7 @@ export class GroupedTopKWithFractionalIndexOperator<
111108
let hasChanges = false
112109

113110
for (const state of this.#groupStates.values()) {
114-
const diff = state.move({ offset: this.#offset, limit: this.#limit })
111+
const diff = state.move({ offset: this.#offset, limit: this.#limit }) // TODO: think we should just pass offset and limit
115112

116113
diff.moveIns.forEach((moveIn) => handleMoveIn(moveIn, result))
117114
diff.moveOuts.forEach((moveOut) => handleMoveOut(moveOut, result))
@@ -158,32 +155,6 @@ export class GroupedTopKWithFractionalIndexOperator<
158155
}
159156
}
160157

161-
/**
162-
* Handles a moveIn change by adding it to the result array.
163-
*/
164-
function handleMoveIn<K extends string | number, T>(
165-
moveIn: IndexedValue<[K, T]> | null,
166-
result: Array<[[K, IndexedValue<T>], number]>,
167-
): void {
168-
if (moveIn) {
169-
const [[key, value], index] = moveIn
170-
result.push([[key, [value, index]], 1])
171-
}
172-
}
173-
174-
/**
175-
* Handles a moveOut change by adding it to the result array.
176-
*/
177-
function handleMoveOut<K extends string | number, T>(
178-
moveOut: IndexedValue<[K, T]> | null,
179-
result: Array<[[K, IndexedValue<T>], number]>,
180-
): void {
181-
if (moveOut) {
182-
const [[key, value], index] = moveOut
183-
result.push([[key, [value, index]], -1])
184-
}
185-
}
186-
187158
/**
188159
* Limits the number of results per group based on a comparator, with optional offset.
189160
* Uses fractional indexing to minimize the number of changes when elements move positions.
Lines changed: 255 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,255 @@
1+
import { generateKeyBetween } from 'fractional-indexing'
2+
import { binarySearch, compareKeys, diffHalfOpen } from '../utils.js'
3+
import type { HRange } from '../utils.js'
4+
5+
// Abstraction for fractionally indexed values
6+
export type FractionalIndex = string
7+
export type IndexedValue<V> = [V, FractionalIndex]
8+
9+
export function indexedValue<V>(
10+
value: V,
11+
index: FractionalIndex,
12+
): IndexedValue<V> {
13+
return [value, index]
14+
}
15+
16+
export function getValue<V>(indexedVal: IndexedValue<V>): V {
17+
return indexedVal[0]
18+
}
19+
20+
export function getIndex<V>(indexedVal: IndexedValue<V>): FractionalIndex {
21+
return indexedVal[1]
22+
}
23+
24+
/**
25+
* Creates a comparator for [key, value] tuples that first compares values,
26+
* then uses the row key as a stable tie-breaker.
27+
*/
28+
export function createKeyedComparator<K extends string | number, T>(
29+
comparator: (a: T, b: T) => number,
30+
): (a: [K, T], b: [K, T]) => number {
31+
return ([aKey, aVal], [bKey, bVal]) => {
32+
// First compare on the value
33+
const valueComparison = comparator(aVal, bVal)
34+
if (valueComparison !== 0) {
35+
return valueComparison
36+
}
37+
// If the values are equal, use the row key as tie-breaker
38+
// This provides stable, deterministic ordering since keys are string | number
39+
return compareKeys(aKey, bKey)
40+
}
41+
}
42+
43+
export type TopKChanges<V> = {
44+
/** Indicates which element moves into the topK (if any) */
45+
moveIn: IndexedValue<V> | null
46+
/** Indicates which element moves out of the topK (if any) */
47+
moveOut: IndexedValue<V> | null
48+
}
49+
50+
export type TopKMoveChanges<V> = {
51+
/** Flag that marks whether there were any changes to the topK */
52+
changes: boolean
53+
/** Indicates which elements move into the topK (if any) */
54+
moveIns: Array<IndexedValue<V>>
55+
/** Indicates which elements move out of the topK (if any) */
56+
moveOuts: Array<IndexedValue<V>>
57+
}
58+
59+
/**
60+
* A topK data structure that supports insertions and deletions
61+
* and returns changes to the topK.
62+
*/
63+
export interface TopK<V> {
64+
size: number
65+
insert: (value: V) => TopKChanges<V>
66+
delete: (value: V) => TopKChanges<V>
67+
}
68+
69+
/**
70+
* Implementation of a topK data structure.
71+
* Uses a sorted array internally to store the values and keeps a topK window over that array.
72+
* Inserts and deletes are O(n) operations because worst case an element is inserted/deleted
73+
* at the start of the array which causes all the elements to shift to the right/left.
74+
*/
75+
export class TopKArray<V> implements TopK<V> {
76+
#sortedValues: Array<IndexedValue<V>> = []
77+
#comparator: (a: V, b: V) => number
78+
#topKStart: number
79+
#topKEnd: number
80+
81+
constructor(
82+
offset: number,
83+
limit: number,
84+
comparator: (a: V, b: V) => number,
85+
) {
86+
this.#topKStart = offset
87+
this.#topKEnd = offset + limit
88+
this.#comparator = comparator
89+
}
90+
91+
get size(): number {
92+
const offset = this.#topKStart
93+
const limit = this.#topKEnd - this.#topKStart
94+
const available = this.#sortedValues.length - offset
95+
return Math.max(0, Math.min(limit, available))
96+
}
97+
98+
/**
99+
* Moves the topK window
100+
*/
101+
move({
102+
offset,
103+
limit,
104+
}: {
105+
offset?: number
106+
limit?: number
107+
}): TopKMoveChanges<V> {
108+
const oldOffset = this.#topKStart
109+
const oldLimit = this.#topKEnd - this.#topKStart
110+
111+
// `this.#topKEnd` can be `Infinity` if it has no limit
112+
// but `diffHalfOpen` expects a finite range
113+
// so we restrict it to the size of the topK if topKEnd is infinite
114+
const oldRange: HRange = [
115+
this.#topKStart,
116+
this.#topKEnd === Infinity ? this.#topKStart + this.size : this.#topKEnd,
117+
]
118+
119+
this.#topKStart = offset ?? oldOffset
120+
this.#topKEnd = this.#topKStart + (limit ?? oldLimit) // can be `Infinity` if limit is `Infinity`
121+
122+
// Also handle `Infinity` in the newRange
123+
const newRange: HRange = [
124+
this.#topKStart,
125+
this.#topKEnd === Infinity
126+
? Math.max(this.#topKStart + this.size, oldRange[1]) // since the new limit is Infinity we need to take everything (so we need to take the biggest (finite) topKEnd)
127+
: this.#topKEnd,
128+
]
129+
const { onlyInA, onlyInB } = diffHalfOpen(oldRange, newRange)
130+
131+
const moveIns: Array<IndexedValue<V>> = []
132+
onlyInB.forEach((index) => {
133+
const value = this.#sortedValues[index]
134+
if (value) {
135+
moveIns.push(value)
136+
}
137+
})
138+
139+
const moveOuts: Array<IndexedValue<V>> = []
140+
onlyInA.forEach((index) => {
141+
const value = this.#sortedValues[index]
142+
if (value) {
143+
moveOuts.push(value)
144+
}
145+
})
146+
147+
// It could be that there are changes (i.e. moveIns or moveOuts)
148+
// but that the collection is lazy so we don't have the data yet that needs to move in/out
149+
// so `moveIns` and `moveOuts` will be empty but `changes` will be true
150+
// this will tell the caller that it needs to run the graph to load more data
151+
return { moveIns, moveOuts, changes: onlyInA.length + onlyInB.length > 0 }
152+
}
153+
154+
insert(value: V): TopKChanges<V> {
155+
const result: TopKChanges<V> = { moveIn: null, moveOut: null }
156+
157+
// Lookup insert position
158+
const index = this.#findIndex(value)
159+
// Generate fractional index based on the fractional indices of the elements before and after it
160+
const indexBefore =
161+
index === 0 ? null : getIndex(this.#sortedValues[index - 1]!)
162+
const indexAfter =
163+
index === this.#sortedValues.length
164+
? null
165+
: getIndex(this.#sortedValues[index]!)
166+
const fractionalIndex = generateKeyBetween(indexBefore, indexAfter)
167+
168+
// Insert the value at the correct position
169+
const val = indexedValue(value, fractionalIndex)
170+
// Splice is O(n) where n = all elements in the collection (i.e. n >= k) !
171+
this.#sortedValues.splice(index, 0, val)
172+
173+
// Check if the topK changed
174+
if (index < this.#topKEnd) {
175+
// The inserted element is either before the top K or within the top K
176+
// If it is before the top K then it moves the element that was right before the topK into the topK
177+
// If it is within the top K then the inserted element moves into the top K
178+
// In both cases the last element of the old top K now moves out of the top K
179+
const moveInIndex = Math.max(index, this.#topKStart)
180+
if (moveInIndex < this.#sortedValues.length) {
181+
// We actually have a topK
182+
// because in some cases there may not be enough elements in the array to reach the start of the topK
183+
// e.g. [1, 2, 3] with K = 2 and offset = 3 does not have a topK
184+
result.moveIn = this.#sortedValues[moveInIndex]!
185+
186+
// We need to remove the element that falls out of the top K
187+
// The element that falls out of the top K has shifted one to the right
188+
// because of the element we inserted, so we find it at index topKEnd
189+
if (this.#topKEnd < this.#sortedValues.length) {
190+
result.moveOut = this.#sortedValues[this.#topKEnd]!
191+
}
192+
}
193+
}
194+
195+
return result
196+
}
197+
198+
/**
199+
* Deletes a value that may or may not be in the topK.
200+
* IMPORTANT: this assumes that the value is present in the collection
201+
* if it's not the case it will remove the element
202+
* that is on the position where the provided `value` would be.
203+
*/
204+
delete(value: V): TopKChanges<V> {
205+
const result: TopKChanges<V> = { moveIn: null, moveOut: null }
206+
207+
// Lookup delete position
208+
const index = this.#findIndex(value)
209+
// Remove the value at that position
210+
const [removedElem] = this.#sortedValues.splice(index, 1)
211+
212+
// Check if the topK changed
213+
if (index < this.#topKEnd) {
214+
// The removed element is either before the top K or within the top K
215+
// If it is before the top K then the first element of the topK moves out of the topK
216+
// If it is within the top K then the removed element moves out of the topK
217+
result.moveOut = removedElem!
218+
if (index < this.#topKStart) {
219+
// The removed element is before the topK
220+
// so actually, the first element of the topK moves out of the topK
221+
// and not the element that we removed
222+
// The first element of the topK is now at index topKStart - 1
223+
// since we removed an element before the topK
224+
const moveOutIndex = this.#topKStart - 1
225+
if (moveOutIndex < this.#sortedValues.length) {
226+
result.moveOut = this.#sortedValues[moveOutIndex]!
227+
} else {
228+
// No value is moving out of the topK
229+
// because there are no elements in the topK
230+
result.moveOut = null
231+
}
232+
}
233+
234+
// Since we removed an element that was before or in the topK
235+
// the first element after the topK moved one position to the left
236+
// and thus falls into the topK now
237+
const moveInIndex = this.#topKEnd - 1
238+
if (moveInIndex < this.#sortedValues.length) {
239+
result.moveIn = this.#sortedValues[moveInIndex]!
240+
}
241+
}
242+
243+
return result
244+
}
245+
246+
// TODO: see if there is a way to refactor the code for insert and delete in the topK above
247+
// because they are very similar, one is shifting the topK window to the left and the other is shifting it to the right
248+
// so i have the feeling there is a common pattern here and we can implement both cases using that pattern
249+
250+
#findIndex(value: V): number {
251+
return binarySearch(this.#sortedValues, indexedValue(value, ``), (a, b) =>
252+
this.#comparator(getValue(a), getValue(b)),
253+
)
254+
}
255+
}

0 commit comments

Comments
 (0)