-
Notifications
You must be signed in to change notification settings - Fork 179
Expand file tree
/
Copy pathgroupedTopKWithFractionalIndex.ts
More file actions
192 lines (173 loc) · 6.26 KB
/
groupedTopKWithFractionalIndex.ts
File metadata and controls
192 lines (173 loc) · 6.26 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
import { DifferenceStreamWriter, UnaryOperator } from '../graph.js'
import { StreamBuilder } from '../d2.js'
import { MultiSet } from '../multiset.js'
import { TopKState, handleMoveIn, handleMoveOut } from './topKState.js'
import { TopKArray, createKeyedComparator } from './topKArray.js'
import type { IndexedValue, TopK } from './topKArray.js'
import type { DifferenceStreamReader } from '../graph.js'
import type { IStreamBuilder, PipedOperator } from '../types.js'
/**
 * Options for {@link groupedTopKWithFractionalIndex} and
 * {@link GroupedTopKWithFractionalIndexOperator}.
 *
 * The offset/limit window is applied independently to every group.
 */
export interface GroupedTopKWithFractionalIndexOptions<K, T> {
  /**
   * Maximum number of results kept per group. When omitted the operator
   * uses `Infinity`, i.e. no limit.
   */
  limit?: number
  /**
   * Start of the per-group window in each group's sorted order. When omitted
   * the operator uses `0`. NOTE(review): presumably the number of leading
   * elements skipped; exact semantics live in the TopK implementation.
   */
  offset?: number
  /**
   * If provided, called once during operator construction with a getter that
   * returns the sum of `size` over all currently live group states.
   */
  setSizeCallback?: (getSize: () => number) => void
  /**
   * If provided, called once during operator construction with a function
   * that moves the offset/limit window of every group at once. Fields left
   * `undefined` in its argument keep their current value. Groups created
   * later also use the updated window.
   */
  setWindowFn?: (
    windowFn: (options: { offset?: number; limit?: number }) => void,
  ) => void
  /**
   * Function to extract a group key from the element's key and value.
   * Elements with the same group key will be sorted and limited together.
   * Group keys are stored as `Map` keys, so they are compared with
   * SameValueZero: primitives by value, objects by reference identity.
   */
  groupKeyFn: (key: K, value: T) => unknown
}
/**
 * Operator for grouped fractional indexed topK operations.
 * This operator maintains separate topK windows for each group,
 * allowing per-group limits and ordering.
 *
 * The input is a keyed stream [K, T] and outputs [K, IndexedValue<T>].
 * Elements are grouped by the groupKeyFn, and each group maintains
 * its own sorted collection with independent limit/offset.
 *
 * A group's state is created lazily when the first element for that group
 * arrives. It is removed again as soon as the state reports `isEmpty`, so the
 * map of live groups only holds non-empty groups. Every group shares the same
 * offset/limit. That window can be changed after construction through the
 * function handed to `options.setWindowFn`.
 */
export class GroupedTopKWithFractionalIndexOperator<
  K extends string | number,
  T,
> extends UnaryOperator<[K, T], [K, IndexedValue<T>]> {
  // These fields are only assigned during construction. They are `readonly`
  // so the compiler rejects accidental reassignment. Only the window bounds
  // (#offset / #limit) change afterwards.

  /** Live per-group states, keyed by the result of `groupKeyFn`. */
  readonly #groupStates: Map<unknown, TopKState<K, T>> = new Map()
  /** Maps an incoming `[key, value]` element to its group key. */
  readonly #groupKeyFn: (key: K, value: T) => unknown
  /** Orders `[key, value]` pairs; built from the user comparator. */
  readonly #comparator: (a: [K, T], b: [K, T]) => number
  /** Window offset shared by every group; updated by `#moveTopK`. */
  #offset: number
  /** Window limit shared by every group; updated by `#moveTopK`. */
  #limit: number

  /**
   * @param id - Operator id, as allocated by the graph.
   * @param inputA - Reader for the keyed `[K, T]` input stream.
   * @param output - Writer that receives `[K, IndexedValue<T>]` diffs.
   * @param comparator - Orders two values within the same group.
   * @param options - Group key function, optional window (offset/limit)
   *   and optional size/window callbacks.
   */
  constructor(
    id: number,
    inputA: DifferenceStreamReader<[K, T]>,
    output: DifferenceStreamWriter<[K, IndexedValue<T>]>,
    comparator: (a: T, b: T) => number,
    options: GroupedTopKWithFractionalIndexOptions<K, T>,
  ) {
    super(id, inputA, output)
    this.#groupKeyFn = options.groupKeyFn
    this.#limit = options.limit ?? Infinity
    this.#offset = options.offset ?? 0
    this.#comparator = createKeyedComparator(comparator)
    // Hand the caller a live size getter and a way to move the window later.
    options.setSizeCallback?.(() => this.#getTotalSize())
    options.setWindowFn?.(this.#moveTopK.bind(this))
  }

  /**
   * Creates a new TopK data structure for a group.
   * Can be overridden in subclasses to use different implementations (e.g., B+ tree).
   */
  protected createTopK(
    offset: number,
    limit: number,
    comparator: (a: [K, T], b: [K, T]) => number,
  ): TopK<[K, T]> {
    return new TopKArray(offset, limit, comparator)
  }

  /** Sum of `size` across all live group states. */
  #getTotalSize(): number {
    let size = 0
    for (const state of this.#groupStates.values()) {
      size += state.size
    }
    return size
  }

  /**
   * Returns the state for `groupKey`, creating it on first use with the
   * current (possibly already moved) offset/limit window.
   */
  #getOrCreateGroupState(groupKey: unknown): TopKState<K, T> {
    let state = this.#groupStates.get(groupKey)
    if (!state) {
      const topK = this.createTopK(this.#offset, this.#limit, this.#comparator)
      state = new TopKState(topK)
      this.#groupStates.set(groupKey, state)
    }
    return state
  }

  /** Drops the group's entry once its state reports `isEmpty`. */
  #cleanupGroupIfEmpty(groupKey: unknown, state: TopKState<K, T>): void {
    if (state.isEmpty) {
      this.#groupStates.delete(groupKey)
    }
  }

  /**
   * Moves the topK window for all groups based on the provided offset and limit.
   * An `undefined` field keeps its current value.
   * Any changes to the topK are sent to the output as a single MultiSet.
   * Nothing is sent unless at least one group reports `changes`.
   */
  #moveTopK({ offset, limit }: { offset?: number; limit?: number }): void {
    if (offset !== undefined) {
      this.#offset = offset
    }
    if (limit !== undefined) {
      this.#limit = limit
    }
    const result: Array<[[K, IndexedValue<T>], number]> = []
    let hasChanges = false
    for (const state of this.#groupStates.values()) {
      const diff = state.move({ offset: this.#offset, limit: this.#limit }) // TODO: think we should just pass offset and limit
      diff.moveIns.forEach((moveIn) => handleMoveIn(moveIn, result))
      diff.moveOuts.forEach((moveOut) => handleMoveOut(moveOut, result))
      if (diff.changes) {
        hasChanges = true
      }
    }
    if (hasChanges) {
      this.output.sendData(new MultiSet(result))
    }
  }

  /**
   * Drains all pending input messages and routes every `[key, value]`
   * element (with its multiplicity) to its group. All resulting changes are
   * emitted as one MultiSet. Nothing is sent when there are no changes.
   */
  run(): void {
    const result: Array<[[K, IndexedValue<T>], number]> = []
    for (const message of this.inputMessages()) {
      for (const [item, multiplicity] of message.getInner()) {
        const [key, value] = item
        this.#processElement(key, value, multiplicity, result)
      }
    }
    if (result.length > 0) {
      this.output.sendData(new MultiSet(result))
    }
  }

  /**
   * Applies one element to its group's state. The resulting move-in and
   * move-out entries are recorded into `result` via handleMoveIn/handleMoveOut.
   */
  #processElement(
    key: K,
    value: T,
    multiplicity: number,
    result: Array<[[K, IndexedValue<T>], number]>,
  ): void {
    const groupKey = this.#groupKeyFn(key, value)
    const state = this.#getOrCreateGroupState(groupKey)
    const changes = state.processElement(key, value, multiplicity)
    handleMoveIn(changes.moveIn, result)
    handleMoveOut(changes.moveOut, result)
    // Cleanup empty groups to prevent memory leaks
    this.#cleanupGroupIfEmpty(groupKey, state)
  }
}
/**
 * Builds a piped operator that orders and limits elements independently per group.
 *
 * Each element is routed to a group via `options.groupKeyFn`. Within a group,
 * elements are sorted with `comparator`, and only the `options.offset` /
 * `options.limit` window is emitted. Emitted values carry a fractional index,
 * a lexicographically sortable position. When an element changes position,
 * only that element's index is updated rather than re-numbering the others.
 *
 * @param comparator - Orders two values that belong to the same group
 * @param options - Group key function plus optional limit, offset and callbacks
 * @returns A piped operator mapping `[K, T]` to `[K, IndexedValue<T>]`, limited per group
 */
export function groupedTopKWithFractionalIndex<K extends string | number, T>(
  comparator: (a: T, b: T) => number,
  options: GroupedTopKWithFractionalIndexOptions<K, T>,
): PipedOperator<[K, T], [K, IndexedValue<T>]> {
  return function attachGroupedTopK(
    stream: IStreamBuilder<[K, T]>,
  ): IStreamBuilder<[K, IndexedValue<T>]> {
    const graph = stream.graph
    // The downstream builder owns the writer that the operator emits into.
    const writer = new DifferenceStreamWriter<[K, IndexedValue<T>]>()
    const downstream = new StreamBuilder<[K, IndexedValue<T>]>(graph, writer)
    // Allocate the id before connecting the reader (same order as before).
    const operatorId = graph.getNextOperatorId()
    const upstream = stream.connectReader()
    graph.addOperator(
      new GroupedTopKWithFractionalIndexOperator<K, T>(
        operatorId,
        upstream,
        downstream.writer,
        comparator,
        options,
      ),
    )
    return downstream
  }
}