Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .tool-versions
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
nodejs 22.13.1
pnpm 10.13.1
27 changes: 27 additions & 0 deletions packages/db-ivm/src/operators/groupBy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,32 @@ export function mode<T>(
}
}

/**
* Creates a collect aggregate function that gathers all values into an array
* This is similar to SQL's array_agg or GROUP_CONCAT
* @param valueExtractor Function to extract a value from each data entry
*/
export function collect<T, V = T>(
valueExtractor: (value: T) => V = (v) => v as unknown as V,
): AggregateFunction<T, Array<V>, Array<V>> {
return {
preMap: (data: T) => [valueExtractor(data)],
reduce: (values: Array<[Array<V>, number]>) => {
const allValues: Array<V> = []
for (const [valueArray, multiplicity] of values) {
for (const value of valueArray) {
// Add each value 'multiplicity' times for correct IVM semantics
for (let i = 0; i < multiplicity; i++) {
allValues.push(value)
}
}
}
return allValues
},
// No postMap - return the array directly
}
}

export const groupByOperators = {
sum,
count,
Expand All @@ -374,4 +400,5 @@ export const groupByOperators = {
max,
median,
mode,
collect,
}
35 changes: 35 additions & 0 deletions packages/db/src/query/builder/functions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,38 @@ export function max<T extends ExpressionLike>(arg: T): AggregateReturnType<T> {
return new Aggregate(`max`, [toExpression(arg)]) as AggregateReturnType<T>
}

/**
* String-typed min aggregate.
* Unlike min() which coerces to number, minStr() preserves string values
* for proper lexicographic comparison (e.g., ISO 8601 date strings).
*/
export function minStr<T extends ExpressionLike>(
arg: T,
): Aggregate<string | null | undefined> {
return new Aggregate(`minstr`, [toExpression(arg)])
}

/**
* String-typed max aggregate.
* Unlike max() which coerces to number, maxStr() preserves string values
* for proper lexicographic comparison (e.g., ISO 8601 date strings).
*/
export function maxStr<T extends ExpressionLike>(
arg: T,
): Aggregate<string | null | undefined> {
return new Aggregate(`maxstr`, [toExpression(arg)])
}

/**
* Collects all values in a group into an array
* Similar to SQL's array_agg or GROUP_CONCAT
*/
export function collect<T extends ExpressionLike>(
arg: T,
): Aggregate<Array<ExtractType<T>>> {
return new Aggregate(`collect`, [toExpression(arg)])
}

/**
* List of comparison function names that can be used with indexes
*/
Expand Down Expand Up @@ -373,6 +405,9 @@ export const operators = [
`sum`,
`min`,
`max`,
`minStr`,
`maxStr`,
`collect`,
] as const

export type OperatorName = (typeof operators)[number]
8 changes: 7 additions & 1 deletion packages/db/src/query/compiler/group-by.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import type {
} from '../ir.js'
import type { NamespacedAndKeyedStream, NamespacedRow } from '../../types.js'

const { sum, count, avg, min, max } = groupByOperators
const { sum, count, avg, min, max, collect } = groupByOperators

/**
* Interface for caching the mapping between GROUP BY expressions and SELECT expressions
Expand Down Expand Up @@ -379,6 +379,12 @@ function getAggregateFunction(aggExpr: Aggregate) {
return min(valueExtractorWithDate)
case `max`:
return max(valueExtractorWithDate)
case `minstr`:
return min(rawValueExtractor)
case `maxstr`:
return max(rawValueExtractor)
case `collect`:
return collect(rawValueExtractor)
default:
throw new UnsupportedAggregateFunctionError(aggExpr.name)
}
Expand Down
3 changes: 3 additions & 0 deletions packages/db/src/query/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ export {
sum,
min,
max,
minStr,
maxStr,
collect,
} from './builder/functions.js'

// Ref proxy utilities
Expand Down
238 changes: 238 additions & 0 deletions packages/db/tests/query/group-by.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { mockSyncCollectionOptions } from '../utils.js'
import {
and,
avg,
collect,
count,
eq,
gt,
Expand Down Expand Up @@ -1387,3 +1388,240 @@ describe(`Query GROUP BY Execution`, () => {
createGroupByTests(`off`)
createGroupByTests(`eager`)
})

describe(`Collect Aggregate Function`, () => {
let ordersCollection: ReturnType<typeof createOrdersCollection>

function createOrdersCollection(autoIndex: `off` | `eager` = `eager`) {
return createCollection(
mockSyncCollectionOptions<Order>({
id: `test-orders-collect`,
getKey: (order) => order.id,
initialData: sampleOrders,
autoIndex,
}),
)
}

beforeEach(() => {
ordersCollection = createOrdersCollection()
})

test(`collect gathers all values into an array`, () => {
const ordersByCustomer = createLiveQueryCollection({
startSync: true,
query: (q) =>
q
.from({ orders: ordersCollection })
.groupBy(({ orders }) => orders.customer_id)
.select(({ orders }) => ({
customer_id: orders.customer_id,
order_ids: collect(orders.id),
amounts: collect(orders.amount),
})),
})

expect(ordersByCustomer.size).toBe(3) // 3 customers

// Customer 1: orders 1, 2, 7 (amounts: 100, 200, 400)
const customer1 = ordersByCustomer.get(1)
expect(customer1).toBeDefined()
expect(customer1?.order_ids).toHaveLength(3)
expect(customer1?.order_ids).toEqual(expect.arrayContaining([1, 2, 7]))
expect(customer1?.amounts).toHaveLength(3)
expect(customer1?.amounts).toEqual(expect.arrayContaining([100, 200, 400]))

// Customer 2: orders 3, 4 (amounts: 150, 300)
const customer2 = ordersByCustomer.get(2)
expect(customer2).toBeDefined()
expect(customer2?.order_ids).toHaveLength(2)
expect(customer2?.order_ids).toEqual(expect.arrayContaining([3, 4]))
expect(customer2?.amounts).toEqual(expect.arrayContaining([150, 300]))

// Customer 3: orders 5, 6 (amounts: 250, 75)
const customer3 = ordersByCustomer.get(3)
expect(customer3).toBeDefined()
expect(customer3?.order_ids).toHaveLength(2)
expect(customer3?.order_ids).toEqual(expect.arrayContaining([5, 6]))
expect(customer3?.amounts).toEqual(expect.arrayContaining([250, 75]))
})

test(`collect works with string values`, () => {
const statusesByCustomer = createLiveQueryCollection({
startSync: true,
query: (q) =>
q
.from({ orders: ordersCollection })
.groupBy(({ orders }) => orders.customer_id)
.select(({ orders }) => ({
customer_id: orders.customer_id,
statuses: collect(orders.status),
})),
})

// Customer 1: all completed orders
const customer1 = statusesByCustomer.get(1)
expect(customer1?.statuses).toHaveLength(3)
expect(customer1?.statuses.every((s) => s === `completed`)).toBe(true)

// Customer 3: pending and cancelled
const customer3 = statusesByCustomer.get(3)
expect(customer3?.statuses).toHaveLength(2)
expect(customer3?.statuses).toEqual(
expect.arrayContaining([`pending`, `cancelled`]),
)
})

test(`collect combined with other aggregates`, () => {
const customerStats = createLiveQueryCollection({
startSync: true,
query: (q) =>
q
.from({ orders: ordersCollection })
.groupBy(({ orders }) => orders.customer_id)
.select(({ orders }) => ({
customer_id: orders.customer_id,
order_ids: collect(orders.id),
order_count: count(orders.id),
total_amount: sum(orders.amount),
amounts: collect(orders.amount),
})),
})

const customer1 = customerStats.get(1)
expect(customer1?.order_ids).toHaveLength(3)
expect(customer1?.order_count).toBe(3)
expect(customer1?.total_amount).toBe(700)
expect(customer1?.amounts).toHaveLength(3)
})

test(`collect with live updates - insert`, () => {
const ordersByCustomer = createLiveQueryCollection({
startSync: true,
query: (q) =>
q
.from({ orders: ordersCollection })
.groupBy(({ orders }) => orders.customer_id)
.select(({ orders }) => ({
customer_id: orders.customer_id,
order_ids: collect(orders.id),
})),
})

// Initial state
const initialCustomer1 = ordersByCustomer.get(1)
expect(initialCustomer1?.order_ids).toHaveLength(3)

// Insert new order for customer 1
const newOrder: Order = {
id: 8,
customer_id: 1,
amount: 500,
status: `completed`,
date: new Date(`2023-03-15`),
product_category: `electronics`,
quantity: 2,
discount: 0,
sales_rep_id: 1,
}

ordersCollection.utils.begin()
ordersCollection.utils.write({ type: `insert`, value: newOrder })
ordersCollection.utils.commit()

const updatedCustomer1 = ordersByCustomer.get(1)
expect(updatedCustomer1?.order_ids).toHaveLength(4)
expect(updatedCustomer1?.order_ids).toEqual(
expect.arrayContaining([1, 2, 7, 8]),
)
})

test(`collect with live updates - delete`, () => {
const ordersByCustomer = createLiveQueryCollection({
startSync: true,
query: (q) =>
q
.from({ orders: ordersCollection })
.groupBy(({ orders }) => orders.customer_id)
.select(({ orders }) => ({
customer_id: orders.customer_id,
order_ids: collect(orders.id),
})),
})

// Initial state
const initialCustomer3 = ordersByCustomer.get(3)
expect(initialCustomer3?.order_ids).toHaveLength(2)
expect(initialCustomer3?.order_ids).toEqual(expect.arrayContaining([5, 6]))

// Delete order 6
const orderToDelete = sampleOrders.find((o) => o.id === 6)!

ordersCollection.utils.begin()
ordersCollection.utils.write({ type: `delete`, value: orderToDelete })
ordersCollection.utils.commit()

const updatedCustomer3 = ordersByCustomer.get(3)
expect(updatedCustomer3?.order_ids).toHaveLength(1)
expect(updatedCustomer3?.order_ids).toEqual([5])
})

test(`collect with WHERE filter`, () => {
const completedOrdersByCustomer = createLiveQueryCollection({
startSync: true,
query: (q) =>
q
.from({ orders: ordersCollection })
.where(({ orders }) => eq(orders.status, `completed`))
.groupBy(({ orders }) => orders.customer_id)
.select(({ orders }) => ({
customer_id: orders.customer_id,
order_ids: collect(orders.id),
})),
})

// Customer 1: all 3 orders are completed
const customer1 = completedOrdersByCustomer.get(1)
expect(customer1?.order_ids).toHaveLength(3)

// Customer 2: only order 4 is completed
const customer2 = completedOrdersByCustomer.get(2)
expect(customer2?.order_ids).toHaveLength(1)
expect(customer2?.order_ids).toEqual([4])

// Customer 3: no completed orders
const customer3 = completedOrdersByCustomer.get(3)
expect(customer3).toBeUndefined()
})

test(`collect with multiple column grouping`, () => {
const ordersByStatusAndCategory = createLiveQueryCollection({
startSync: true,
query: (q) =>
q
.from({ orders: ordersCollection })
.groupBy(({ orders }) => [orders.status, orders.product_category])
.select(({ orders }) => ({
status: orders.status,
product_category: orders.product_category,
order_ids: collect(orders.id),
})),
})

// Completed electronics: orders 1, 2, 4
const completedElectronics = ordersByStatusAndCategory.get(
`["completed","electronics"]`,
)
expect(completedElectronics?.order_ids).toHaveLength(3)
expect(completedElectronics?.order_ids).toEqual(
expect.arrayContaining([1, 2, 4]),
)

// Completed books: order 7
const completedBooks = ordersByStatusAndCategory.get(
`["completed","books"]`,
)
expect(completedBooks?.order_ids).toHaveLength(1)
expect(completedBooks?.order_ids).toEqual([7])
})
})
Loading
Loading