From 351ebca8346234d47218dcd81744bd91e89bfd69 Mon Sep 17 00:00:00 2001 From: Uzini <43294422+Uziniii@users.noreply.github.com> Date: Wed, 20 Aug 2025 22:43:34 +0200 Subject: [PATCH 01/10] add: list aggregate --- packages/db-ivm/src/operators/groupBy.ts | 21 +++++++ .../db-ivm/tests/operators/groupBy.test.ts | 61 +++++++++++++++++++ packages/db/src/query/builder/functions.ts | 5 ++ packages/db/src/query/compiler/group-by.ts | 7 ++- packages/db/src/query/index.ts | 1 + packages/db/tests/query/group-by.test.ts | 3 + 6 files changed, 96 insertions(+), 2 deletions(-) diff --git a/packages/db-ivm/src/operators/groupBy.ts b/packages/db-ivm/src/operators/groupBy.ts index 344c4b1c..aa2226bc 100644 --- a/packages/db-ivm/src/operators/groupBy.ts +++ b/packages/db-ivm/src/operators/groupBy.ts @@ -343,6 +343,26 @@ export function mode( } } +/** + * Creates a string aggregate function + */ +export function list( + valueExtractor: (value: T) => any = (v) => v as unknown as any +): AggregateFunction { + return { + preMap: (data: T) => valueExtractor(data), + reduce: (values: Array<[string | number, number]>) => { + const total = [] + + for (const [value, _multiplicity] of values) { + total.push(value) + } + + return total + }, + } +} + export const groupByOperators = { sum, count, @@ -351,4 +371,5 @@ export const groupByOperators = { max, median, mode, + list, } diff --git a/packages/db-ivm/tests/operators/groupBy.test.ts b/packages/db-ivm/tests/operators/groupBy.test.ts index 5f20b8d0..5461f915 100644 --- a/packages/db-ivm/tests/operators/groupBy.test.ts +++ b/packages/db-ivm/tests/operators/groupBy.test.ts @@ -5,6 +5,7 @@ import { avg, count, groupBy, + list, max, median, min, @@ -131,6 +132,66 @@ describe(`Operators`, () => { expect(result).toEqual(expectedResult) }) + test(`with single stringAgg aggregate`, () => { + const graph = new D2() + const input = graph.newInput<{ + productId: number + category: string + }>() + let latestMessage: any = null + + input.pipe( + groupBy((data) => ({ productId: data.productId }), { + categories: list((data) => data.category), + }), + output((message) => { + latestMessage = message + }) + ) + + graph.finalize() + + // Initial data + input.sendData( + new MultiSet([ + [{ category: `A`, productId: 1 }, 1], + [{ category: `B`, productId: 1 }, 1], + [{ category: `A`, productId: 2 }, 1], + ]) + ) + graph.run() + + // Verify we have the latest message + expect(latestMessage).not.toBeNull() + + const result = latestMessage.getInner() + + const expectedResult = [ + [ + [ + `{"productId":1}`, + { + productId: 1, + categories: [`A`, `B`], + }, + ], + 1, + ], + [ + [ + `{"productId":2}`, + { + productId: 2, + categories: [`A`], + }, + ], + 1, + ], + ] + + expect(result).toEqual(expectedResult) + }) + test(`with sum and count aggregates`, () => { const graph = new D2() const input = graph.newInput<{ diff --git a/packages/db/src/query/builder/functions.ts b/packages/db/src/query/builder/functions.ts index b5902bd6..dd24edb1 100644 --- a/packages/db/src/query/builder/functions.ts +++ b/packages/db/src/query/builder/functions.ts @@ -1,5 +1,6 @@ import { Aggregate, Func } from "../ir" import { toExpression } from "./ref-proxy.js" +import type { RefProxyFor } from "./types" import type { BasicExpression } from "../ir" import type { RefProxy } from "./ref-proxy.js" @@ -266,6 +267,10 @@ export function max( return new Aggregate(`max`, [toExpression(arg)]) } +export function list(arg: RefProxy | RefProxyFor): Aggregate { + return new Aggregate(`list`, [toExpression(arg)]) +} + /** * List of comparison function names that can be used with indexes */ diff --git a/packages/db/src/query/compiler/group-by.ts b/packages/db/src/query/compiler/group-by.ts index 7ce4adcf..f1192760 100644 --- a/packages/db/src/query/compiler/group-by.ts +++ b/packages/db/src/query/compiler/group-by.ts @@ -16,7 +16,7 @@ import type { } from "../ir.js" import type { NamespacedAndKeyedStream, NamespacedRow } from "../../types.js" -const { sum, count, avg, min, max } = groupByOperators +const { sum, count, avg, min, max, list } = groupByOperators /** * Interface for caching the mapping between GROUP BY expressions and SELECT expressions @@ -342,7 +342,7 @@ function getAggregateFunction(aggExpr: Aggregate) { const valueExtractor = ([, namespacedRow]: [string, NamespacedRow]) => { const value = compiledExpr(namespacedRow) // Ensure we return a number for numeric aggregate functions - return typeof value === `number` ? value : value != null ? Number(value) : 0 + return value } // Return the appropriate aggregate function @@ -357,7 +357,10 @@ function getAggregateFunction(aggExpr: Aggregate) { return min(valueExtractor) case `max`: return max(valueExtractor) + case `list`: + return list(valueExtractor) default: + console.log(aggExpr) throw new UnsupportedAggregateFunctionError(aggExpr.name) } } diff --git a/packages/db/src/query/index.ts b/packages/db/src/query/index.ts index 20043a04..6ce9b867 100644 --- a/packages/db/src/query/index.ts +++ b/packages/db/src/query/index.ts @@ -38,6 +38,7 @@ export { sum, min, max, + list, } from "./builder/functions.js" // Ref proxy utilities diff --git a/packages/db/tests/query/group-by.test.ts b/packages/db/tests/query/group-by.test.ts index 684eb472..6dec248a 100644 --- a/packages/db/tests/query/group-by.test.ts +++ b/packages/db/tests/query/group-by.test.ts @@ -9,6 +9,7 @@ import { eq, gt, gte, + list, lt, max, min, @@ -228,6 +229,7 @@ function createGroupByTests(autoIndex: `off` | `eager`): void { .groupBy(({ orders }) => orders.product_category) .select(({ orders }) => ({ product_category: orders.product_category, + product_ids: list(orders), total_quantity: sum(orders.quantity), order_count: count(orders.id), total_amount: sum(orders.amount), @@ -238,6 +240,7 @@ function createGroupByTests(autoIndex: `off` | `eager`): void { // Electronics: orders 1, 2, 4, 6 (quantities: 2, 1, 1, 1) const electronics = categorySummary.get(`electronics`) + console.log(electronics) expect(electronics?.product_category).toBe(`electronics`) expect(electronics?.total_quantity).toBe(5) expect(electronics?.order_count).toBe(4) From d77176cd0cfe6d7cca6e073b02446554ea8f59e8 Mon Sep 17 00:00:00 2001 From: Uzini <43294422+Uziniii@users.noreply.github.com> Date: Thu, 21 Aug 2025 17:08:47 +0200 Subject: [PATCH 02/10] fix: package name for npm publish --- packages/db-ivm/package.json | 2 +- packages/db/package.json | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/db-ivm/package.json b/packages/db-ivm/package.json index 0f9734bc..772c436b 100644 --- a/packages/db-ivm/package.json +++ b/packages/db-ivm/package.json @@ -1,5 +1,5 @@ { - "name": "@tanstack/db-ivm", + "name": "@uzini/db-ivm", "description": "Incremental View Maintenance for TanStack DB based on Differential Dataflow", "version": "0.1.2", "dependencies": { diff --git a/packages/db/package.json b/packages/db/package.json index 4466018b..6664afbc 100644 --- a/packages/db/package.json +++ b/packages/db/package.json @@ -1,5 +1,5 @@ { - "name": "@tanstack/db", + "name": "@uzini/db", "description": "A reactive client store for building super fast apps on sync", "version": "0.1.5", "dependencies": { From f720b2a538a988b4cf77a7159fa6dea70b4b7f71 Mon Sep 17 00:00:00 2001 From: Uzini <43294422+Uziniii@users.noreply.github.com> Date: Thu, 21 Aug 2025 17:13:06 +0200 Subject: [PATCH 03/10] fix: package private for npm publish --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index af8cf608..ac91069b 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@tanstack/db-monorepo", - "private": true, + "private": false, "description": "Reactive client queryable store for sync-first applications", "version": "0.0.0", "repository": { From 4df196dad17a72ede2b3363c95da0a5ddbdd2a71 Mon Sep 17 00:00:00 2001 From: Uzini <43294422+Uziniii@users.noreply.github.com> Date: Thu, 21 Aug 2025 17:37:41 +0200 Subject: [PATCH 04/10] back to normal --- package.json | 2 +- packages/db-ivm/package.json | 2 +- packages/db/package.json | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/package.json b/package.json index ac91069b..af8cf608 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@tanstack/db-monorepo", - "private": false, + "private": true, "description": "Reactive client queryable store for sync-first applications", "version": "0.0.0", "repository": { diff --git a/packages/db-ivm/package.json b/packages/db-ivm/package.json index 772c436b..0f9734bc 100644 --- a/packages/db-ivm/package.json +++ b/packages/db-ivm/package.json @@ -1,5 +1,5 @@ { - "name": "@uzini/db-ivm", + "name": "@tanstack/db-ivm", "description": "Incremental View Maintenance for TanStack DB based on Differential Dataflow", "version": "0.1.2", "dependencies": { diff --git a/packages/db/package.json b/packages/db/package.json index 6664afbc..4466018b 100644 --- a/packages/db/package.json +++ b/packages/db/package.json @@ -1,5 +1,5 @@ { - "name": "@uzini/db", + "name": "@tanstack/db", "description": "A reactive client store for building super fast apps on sync", "version": "0.1.5", "dependencies": { From 54b4adc4e9f57570a1d1d188250a1698e0242613 Mon Sep 17 00:00:00 2001 From: Uzini <43294422+Uziniii@users.noreply.github.com> Date: Fri, 22 Aug 2025 20:08:59 +0200 Subject: [PATCH 05/10] fix: list aggregate type --- packages/db-ivm/src/operators/groupBy.ts | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/packages/db-ivm/src/operators/groupBy.ts b/packages/db-ivm/src/operators/groupBy.ts index aa2226bc..4a249d53 100644 --- a/packages/db-ivm/src/operators/groupBy.ts +++ b/packages/db-ivm/src/operators/groupBy.ts @@ -346,19 +346,22 @@ export function mode( /** * Creates a string aggregate function */ -export function list( - valueExtractor: (value: T) => any = (v) => v as unknown as any -): AggregateFunction { +export function list( + valueExtractor: (value: T) => V = (v) => v as unknown as V +): AggregateFunction, V> { return { - preMap: (data: T) => valueExtractor(data), - reduce: (values: Array<[string | number, number]>) => { + preMap: (data: T) => valueExtractor(data) as unknown as V, + reduce: (values) => { const total = [] for (const [value, _multiplicity] of values) { total.push(value) } - return total + return total as unknown as V + }, + postMap(result) { + return result as unknown as Array }, } } From 5723fbf4e8b4891099729b0f5452079ce5dd357e Mon Sep 17 00:00:00 2001 From: Uzini <43294422+Uziniii@users.noreply.github.com> Date: Fri, 22 Aug 2025 20:21:03 +0200 Subject: [PATCH 06/10] fix: list aggregate type in query build function --- packages/db/src/query/builder/functions.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/packages/db/src/query/builder/functions.ts b/packages/db/src/query/builder/functions.ts index dd24edb1..6cd6665a 100644 --- a/packages/db/src/query/builder/functions.ts +++ b/packages/db/src/query/builder/functions.ts @@ -267,7 +267,9 @@ export function max( return new Aggregate(`max`, [toExpression(arg)]) } -export function list(arg: RefProxy | RefProxyFor): Aggregate { +export function list( + arg: RefProxy | RefProxyFor +): Aggregate> { return new Aggregate(`list`, [toExpression(arg)]) } From 74252a40e35ac290a3d6cf2130a0118747262f62 Mon Sep 17 00:00:00 2001 From: Uzini <43294422+Uziniii@users.noreply.github.com> Date: Tue, 2 Sep 2025 17:26:06 +0200 Subject: [PATCH 07/10] fix: different value extractor for non numeric value (and slighlty fixed type) --- packages/db-ivm/src/operators/groupBy.ts | 5 +---- packages/db/src/query/compiler/group-by.ts | 22 ++++++++++++++-------- 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/packages/db-ivm/src/operators/groupBy.ts b/packages/db-ivm/src/operators/groupBy.ts index 4a249d53..723d3668 100644 --- a/packages/db-ivm/src/operators/groupBy.ts +++ b/packages/db-ivm/src/operators/groupBy.ts @@ -350,7 +350,7 @@ export function list( valueExtractor: (value: T) => V = (v) => v as unknown as V ): AggregateFunction, V> { return { - preMap: (data: T) => valueExtractor(data) as unknown as V, + preMap: (data: T) => valueExtractor(data), reduce: (values) => { const total = [] @@ -360,9 +360,6 @@ export function list( return total as unknown as V }, - postMap(result) { - return result as unknown as Array - }, } } diff --git a/packages/db/src/query/compiler/group-by.ts b/packages/db/src/query/compiler/group-by.ts index a5580626..b02f6ded 100644 --- a/packages/db/src/query/compiler/group-by.ts +++ b/packages/db/src/query/compiler/group-by.ts @@ -343,28 +343,34 @@ function getAggregateFunction(aggExpr: Aggregate) { const compiledExpr = compileExpression(aggExpr.args[0]!) // Create a value extractor function for the expression to aggregate - const valueExtractor = ([, namespacedRow]: [string, NamespacedRow]) => { + const numericValueExtractor = ([, namespacedRow]: [ + string, + NamespacedRow, + ]) => { const value = compiledExpr(namespacedRow) // Ensure we return a number for numeric aggregate functions - return value + return typeof value === `number` ? value : value != null ? Number(value) : 0 + } + + const anyValueExtractor = ([, namespacedRow]: [string, NamespacedRow]) => { + return compiledExpr(namespacedRow) } // Return the appropriate aggregate function switch (aggExpr.name.toLowerCase()) { case `sum`: - return sum(valueExtractor) + return sum(numericValueExtractor) case `count`: return count() // count() doesn't need a value extractor case `avg`: - return avg(valueExtractor) + return avg(numericValueExtractor) case `min`: - return min(valueExtractor) + return min(numericValueExtractor) case `max`: - return max(valueExtractor) + return max(numericValueExtractor) case `list`: - return list(valueExtractor) + return list(anyValueExtractor) default: - console.log(aggExpr) throw new UnsupportedAggregateFunctionError(aggExpr.name) } } From 532f38c56edb92769c604d1266f30a84be9717d9 Mon Sep 17 00:00:00 2001 From: Uzini <43294422+Uziniii@users.noreply.github.com> Date: Tue, 2 Sep 2025 17:40:01 +0200 Subject: [PATCH 08/10] typo: list aggregate comment and valueExtractor comment --- packages/db-ivm/src/operators/groupBy.ts | 3 ++- packages/db/src/query/compiler/group-by.ts | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/packages/db-ivm/src/operators/groupBy.ts b/packages/db-ivm/src/operators/groupBy.ts index 723d3668..071c098c 100644 --- a/packages/db-ivm/src/operators/groupBy.ts +++ b/packages/db-ivm/src/operators/groupBy.ts @@ -344,7 +344,8 @@ export function mode( } /** - * Creates a string aggregate function + * Creates a list aggregate function that collects all values into an array + * @param valueExtractor Function to extract a value from each data entry */ export function list( valueExtractor: (value: T) => V = (v) => v as unknown as V diff --git a/packages/db/src/query/compiler/group-by.ts b/packages/db/src/query/compiler/group-by.ts index b02f6ded..147b71eb 100644 --- a/packages/db/src/query/compiler/group-by.ts +++ b/packages/db/src/query/compiler/group-by.ts @@ -342,7 +342,7 @@ function getAggregateFunction(aggExpr: Aggregate) { // Pre-compile the value extractor expression const compiledExpr = compileExpression(aggExpr.args[0]!) - // Create a value extractor function for the expression to aggregate + // Create a number only value extractor function for the expression to aggregate const numericValueExtractor = ([, namespacedRow]: [ string, NamespacedRow, @@ -352,6 +352,7 @@ function getAggregateFunction(aggExpr: Aggregate) { return typeof value === `number` ? value : value != null ? Number(value) : 0 } + // Create a generic value extractor function for non-numeric aggregates const anyValueExtractor = ([, namespacedRow]: [string, NamespacedRow]) => { return compiledExpr(namespacedRow) } From 71beb42d78237968330159a5fa5813a25462950b Mon Sep 17 00:00:00 2001 From: Uzini <43294422+Uziniii@users.noreply.github.com> Date: Tue, 2 Sep 2025 18:02:41 +0200 Subject: [PATCH 09/10] fix: improved list test --- packages/db-ivm/tests/operators/groupBy.test.ts | 2 +- packages/db/tests/query/group-by.test.ts | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/packages/db-ivm/tests/operators/groupBy.test.ts b/packages/db-ivm/tests/operators/groupBy.test.ts index 5461f915..43377a03 100644 --- a/packages/db-ivm/tests/operators/groupBy.test.ts +++ b/packages/db-ivm/tests/operators/groupBy.test.ts @@ -132,7 +132,7 @@ describe(`Operators`, () => { expect(result).toEqual(expectedResult) }) - test(`with single stringAgg aggregate`, () => { + test(`with single list aggregate`, () => { const graph = new D2() const input = graph.newInput<{ productId: number diff --git a/packages/db/tests/query/group-by.test.ts b/packages/db/tests/query/group-by.test.ts index 6dec248a..a6772012 100644 --- a/packages/db/tests/query/group-by.test.ts +++ b/packages/db/tests/query/group-by.test.ts @@ -190,6 +190,7 @@ function createGroupByTests(autoIndex: `off` | `eager`): void { .groupBy(({ orders }) => orders.status) .select(({ orders }) => ({ status: orders.status, + customer_ids: list(orders.customer_id), total_amount: sum(orders.amount), order_count: count(orders.id), avg_amount: avg(orders.amount), @@ -201,6 +202,7 @@ function createGroupByTests(autoIndex: `off` | `eager`): void { // Completed orders: 1, 2, 4, 7 (amounts: 100, 200, 300, 400) const completed = statusSummary.get(`completed`) expect(completed?.status).toBe(`completed`) + expect(completed?.customer_ids).toEqual([1, 1, 2, 1]) expect(completed?.total_amount).toBe(1000) expect(completed?.order_count).toBe(4) expect(completed?.avg_amount).toBe(250) @@ -208,6 +210,7 @@ function createGroupByTests(autoIndex: `off` | `eager`): void { // Pending orders: 3, 5 (amounts: 150, 250) const pending = statusSummary.get(`pending`) expect(pending?.status).toBe(`pending`) + expect(pending?.customer_ids).toEqual([2, 3]) expect(pending?.total_amount).toBe(400) expect(pending?.order_count).toBe(2) expect(pending?.avg_amount).toBe(200) @@ -215,6 +218,7 @@ function createGroupByTests(autoIndex: `off` | `eager`): void { // Cancelled orders: 6 (amount: 75) const cancelled = statusSummary.get(`cancelled`) expect(cancelled?.status).toBe(`cancelled`) + expect(cancelled?.customer_ids).toEqual([3]) expect(cancelled?.total_amount).toBe(75) expect(cancelled?.order_count).toBe(1) expect(cancelled?.avg_amount).toBe(75) @@ -229,7 +233,6 @@ function createGroupByTests(autoIndex: `off` | `eager`): void { .groupBy(({ orders }) => orders.product_category) .select(({ orders }) => ({ product_category: orders.product_category, - product_ids: list(orders), total_quantity: sum(orders.quantity), order_count: count(orders.id), total_amount: sum(orders.amount), @@ -240,7 +243,6 @@ function createGroupByTests(autoIndex: `off` | `eager`): void { // Electronics: orders 1, 2, 4, 6 (quantities: 2, 1, 1, 1) const electronics = categorySummary.get(`electronics`) - console.log(electronics) expect(electronics?.product_category).toBe(`electronics`) expect(electronics?.total_quantity).toBe(5) expect(electronics?.order_count).toBe(4) From c5a4ca3c2ed00769b6d8880b8b0c5867ed3b375b Mon Sep 17 00:00:00 2001 From: Uzini <43294422+Uziniii@users.noreply.github.com> Date: Wed, 3 Sep 2025 10:56:43 +0200 Subject: [PATCH 10/10] typo: value extractor name --- packages/db/src/query/compiler/group-by.ts | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/packages/db/src/query/compiler/group-by.ts b/packages/db/src/query/compiler/group-by.ts index 147b71eb..ea76d1c0 100644 --- a/packages/db/src/query/compiler/group-by.ts +++ b/packages/db/src/query/compiler/group-by.ts @@ -343,34 +343,31 @@ function getAggregateFunction(aggExpr: Aggregate) { const compiledExpr = compileExpression(aggExpr.args[0]!) // Create a number only value extractor function for the expression to aggregate - const numericValueExtractor = ([, namespacedRow]: [ - string, - NamespacedRow, - ]) => { + const numberExtractor = ([, namespacedRow]: [string, NamespacedRow]) => { const value = compiledExpr(namespacedRow) // Ensure we return a number for numeric aggregate functions return typeof value === `number` ? value : value != null ? Number(value) : 0 } // Create a generic value extractor function for non-numeric aggregates - const anyValueExtractor = ([, namespacedRow]: [string, NamespacedRow]) => { + const valueExtractor = ([, namespacedRow]: [string, NamespacedRow]) => { return compiledExpr(namespacedRow) } // Return the appropriate aggregate function switch (aggExpr.name.toLowerCase()) { case `sum`: - return sum(numericValueExtractor) + return sum(numberExtractor) case `count`: return count() // count() doesn't need a value extractor case `avg`: - return avg(numericValueExtractor) + return avg(numberExtractor) case `min`: - return min(numericValueExtractor) + return min(numberExtractor) case `max`: - return max(numericValueExtractor) + return max(numberExtractor) case `list`: - return list(anyValueExtractor) + return list(valueExtractor) default: throw new UnsupportedAggregateFunctionError(aggExpr.name) }