diff --git a/README.md b/README.md index 6151c228..de8cdd64 100644 --- a/README.md +++ b/README.md @@ -144,6 +144,7 @@ TanStack DB provides several collection types to support different backend integ - **`@tanstack/query-db-collection`** - Collections backed by [TanStack Query](https://tanstack.com/query) for REST APIs and GraphQL endpoints - **`@tanstack/electric-db-collection`** - Real-time sync collections powered by [ElectricSQL](https://electric-sql.com) for live database synchronization - **`@tanstack/trailbase-db-collection`** - Collections for [TrailBase](https://trailbase.io) backend integration +- **`@tanstack/rxdb-db-collection`** - Collections backed by [RxDB](https://rxdb.info), a client-side database with replication support and local persistence made for local-first apps. ## Framework integrations @@ -157,7 +158,7 @@ TanStack DB integrates with React & Vue with more on the way! ```bash npm install @tanstack/react-db # Optional: for specific collection types -npm install @tanstack/electric-db-collection @tanstack/query-db-collection +npm install @tanstack/electric-db-collection @tanstack/query-db-collection @tanstack/trailbase-db-collection @tanstack/rxdb-db-collection ``` Other framework integrations are in progress. diff --git a/docs/collections/rxdb-collection.md b/docs/collections/rxdb-collection.md new file mode 100644 index 00000000..09a9470e --- /dev/null +++ b/docs/collections/rxdb-collection.md @@ -0,0 +1,132 @@ +--- +title: RxDB Collection +--- + +# RxDB Collection + +RxDB collections provide seamless integration between TanStack DB and [RxDB](https://rxdb.info), enabling automatic synchronization between your in-memory TanStack DB collections and RxDB's local-first database. Giving you offline-ready persistence, and powerful sync capabilities with a wide range of backends. + + +## Overview + +The `@tanstack/rxdb-db-collection` package allows you to create collections that: +- Automatically mirror the state of an underlying RxDB collection +- Reactively update when RxDB documents change +- Support optimistic mutations with rollback on error +- Provide persistence handlers to keep RxDB in sync with TanStack DB transactions +- Sync across browser tabs - changes in one tab are reflected in RxDB and TanStack DB collections in all tabs +- Use one of RxDB's [storage engines](https://rxdb.info/rx-storage.html). +- Work with RxDB's [replication features](https://rxdb.info/replication.html) for offline-first and sync scenarios +- Leverage RxDB's [replication plugins](https://rxdb.info/replication.html) to sync with CouchDB, MongoDB, Supabase, REST APIs, GraphQL, WebRTC (P2P) and more. + + +## 1. Installation + +```bash +npm install @tanstack/rxdb-db-collection rxdb @tanstack/db +``` + + +### 2. Create an RxDatabase and RxCollection + +```ts +import { createRxDatabase, addRxPlugin } from 'rxdb/plugins/core' + +/** + * Here we use the localstorage based storage for RxDB. + * RxDB has a wide range of storages based on Dexie.js, IndexedDB, SQLite and more. + */ +import { getRxStorageLocalstorage } from 'rxdb/plugins/storage-localstorage' + +// add json-schema validation (optional) +import { wrappedValidateAjvStorage } from 'rxdb/plugins/validate-ajv'; + +// Enable dev mode (optional, recommended during development) +import { RxDBDevModePlugin } from 'rxdb/plugins/dev-mode' +addRxPlugin(RxDBDevModePlugin) + +type Todo = { id: string; text: string; completed: boolean } + +const db = await createRxDatabase({ + name: 'my-todos', + storage: wrappedValidateAjvStorage({ + storage: getRxStorageLocalstorage() + }) +}) + +await db.addCollections({ + todos: { + schema: { + title: 'todos', + version: 0, + type: 'object', + primaryKey: 'id', + properties: { + id: { type: 'string', maxLength: 100 }, + text: { type: 'string' }, + completed: { type: 'boolean' }, + }, + required: ['id', 'text', 'completed'], + }, + }, +}) +``` + + +### 3. (optional) sync with a backend +```ts +import { replicateRxCollection } from 'rxdb/plugins/replication' +const replicationState = replicateRxCollection({ + collection: db.todos, + pull: { handler: myPullHandler }, + push: { handler: myPushHandler }, +}) +``` + +### 4. Wrap the RxDB collection with TanStack DB + +```ts +import { createCollection } from '@tanstack/db' +import { rxdbCollectionOptions } from '@tanstack/rxdb-db-collection' + +const todosCollection = createCollection( + rxdbCollectionOptions({ + rxCollection: myDatabase.todos, + startSync: true, // start ingesting RxDB data immediately + }) +) +``` + + +Now `todosCollection` is a reactive TanStack DB collection driven by RxDB: + +- Writes via `todosCollection.insert/update/delete` persist to RxDB. +- Direct writes in RxDB (or via replication) flow into the TanStack collection via change streams. + + + +## Configuration Options + +The `rxdbCollectionOptions` function accepts the following options: + +### Required + +- `rxCollection`: The underlying [RxDB collection](https://rxdb.info/rx-collection.html) + +### Optional + +- `id`: Unique identifier for the collection +- `schema`: Schema for validating items. RxDB already has schema validation but having additional validation on the TanStack DB side can help to unify error handling between different tanstack collections. +- `startSync`: Whether to start syncing immediately (default: true) +- `onInsert, onUpdate, onDelete`: Override default persistence handlers. By default, TanStack DB writes are persisted to RxDB using bulkUpsert, patch, and bulkRemove. +- `syncBatchSize`: The maximum number of documents fetched per batch during the initial sync from RxDB into TanStack DB (default: 1000). Larger values reduce round trips but use more memory; smaller values are lighter but may increase query calls. Note that this only affects the initial sync. Ongoing live updates are streamed one by one via RxDB's change feed. + + + +## Syncing with Backends + +Replication and sync in RxDB run independently of TanStack DB. You set up replication directly on your RxCollection using RxDB's replication plugins (for CouchDB, GraphQL, WebRTC, REST APIs, etc.). + +When replication runs, it pulls and pushes changes to the backend and applies them to the RxDB collection. Since the TanStack DB integration subscribes to the RxDB change stream, any changes applied by replication are automatically reflected in your TanStack DB collection. + +This separation of concerns means you configure replication entirely in RxDB, and TanStack DB automatically benefits: your TanStack collections always stay up to date with whatever sync strategy you choose. diff --git a/docs/config.json b/docs/config.json index 90a7225d..155ef849 100644 --- a/docs/config.json +++ b/docs/config.json @@ -88,6 +88,10 @@ { "label": "Electric Collection", "to": "collections/electric-collection" + }, + { + "label": "RxDB Collection", + "to": "collections/rxdb-collection" } ] }, @@ -137,6 +141,14 @@ { "label": "queryCollectionOptions", "to": "reference/query-db-collection/functions/querycollectionoptions" + }, + { + "label": "RxDB DB Collection", + "to": "reference/rxdb-db-collection/index" + }, + { + "label": "rxdbCollectionOptions", + "to": "reference/rxdb-db-collection/functions/rxdbcollectionoptions" } ], "frameworks": [ diff --git a/docs/guides/collection-options-creator.md b/docs/guides/collection-options-creator.md index 6dfe2bac..058bb7b2 100644 --- a/docs/guides/collection-options-creator.md +++ b/docs/guides/collection-options-creator.md @@ -17,7 +17,7 @@ Collection options creators follow a consistent pattern: ## When to Create a Custom Collection You should create a custom collection when: -- You have a dedicated sync engine (like ElectricSQL, Trailbase, Firebase, or a custom WebSocket solution) +- You have a dedicated sync engine (like ElectricSQL, Trailbase, Firebase, RxDB or a custom WebSocket solution) - You need specific sync behaviors that aren't covered by the query collection - You want to integrate with a backend that has its own sync protocol @@ -331,6 +331,7 @@ For complete, production-ready examples, see the collection packages in the TanS - **[@tanstack/query-collection](https://github.com/TanStack/db/tree/main/packages/query-collection)** - Pattern A: User-provided handlers with full refetch strategy - **[@tanstack/trailbase-collection](https://github.com/TanStack/db/tree/main/packages/trailbase-collection)** - Pattern B: Built-in handlers with ID-based tracking - **[@tanstack/electric-collection](https://github.com/TanStack/db/tree/main/packages/electric-collection)** - Pattern A: Transaction ID tracking with complex sync protocols +- **[@tanstack/rxdb-collection](https://github.com/TanStack/db/tree/main/packages/rxdb-collection)** - Pattern B: Built-in handlers that bridge [RxDB](https://rxdb.info) change streams into TanStack DB's sync lifecycle ### Key Lessons from Production Collections @@ -349,6 +350,11 @@ For complete, production-ready examples, see the collection packages in the TanS - Demonstrates advanced deduplication techniques - Shows how to wrap user handlers with sync coordination +**From RxDB Collection:** +- Uses RxDB's built-in queries and change streams +- Uses `RxCollection.$` to subscribe to inserts/updates/deletes and forward them to TanStack DB with begin-write-commit +- Implements built-in mutation handlers (onInsert, onUpdate, onDelete) that call RxDB APIs (bulkUpsert, incrementalPatch, bulkRemove) + ## Complete Example: WebSocket Collection Here's a complete example of a WebSocket-based collection options creator that demonstrates the full round-trip flow: diff --git a/docs/installation.md b/docs/installation.md index 8948c028..dfe4c59f 100644 --- a/docs/installation.md +++ b/docs/installation.md @@ -85,3 +85,14 @@ npm install @tanstack/trailbase-db-collection ``` Use `trailBaseCollectionOptions` to sync records from TrailBase's Record APIs with built-in subscription support. + +### RxDB Collection + +For offline-first apps and local persistence with [RxDB](https://rxdb.info): + +```sh +npm install @tanstack/rxdb-db-collection +``` + +Use `rxdbCollectionOptions` to bridge an [RxDB collection](https://rxdb.info/rx-collection.html) into TanStack DB. +This gives you reactive TanStack DB collections backed by RxDB's powerful local-first database, replication, and conflict handling features. diff --git a/docs/overview.md b/docs/overview.md index 0f99db21..6f1cb893 100644 --- a/docs/overview.md +++ b/docs/overview.md @@ -154,8 +154,9 @@ There are a number of built-in collection types: 1. [`QueryCollection`](#querycollection) to load data into collections using [TanStack Query](https://tanstack.com/query) 2. [`ElectricCollection`](#electriccollection) to sync data into collections using [ElectricSQL](https://electric-sql.com) 3. [`TrailBaseCollection`](#trailbasecollection) to sync data into collections using [TrailBase](https://trailbase.io) -4. [`LocalStorageCollection`](#localstoragecollection) for small amounts of local-only state that syncs across browser tabs -5. [`LocalOnlyCollection`](#localonlycollection) for in-memory client data or UI state +4. [`RxDBCollection`](#rxdbcollection) to integrate with [RxDB](https://rxdb.info) for local persistence and sync +5. [`LocalStorageCollection`](#localstoragecollection) for small amounts of local-only state that syncs across browser tabs +6. [`LocalOnlyCollection`](#localonlycollection) for in-memory client data or UI state You can also use: @@ -301,6 +302,52 @@ This collection requires the following TrailBase-specific options: A new collections doesn't start syncing until you call `collection.preload()` or you query it. +#### `RxDBCollection` + +[RxDB](https://rxdb.info) is a client-side database for JavaScript apps with replication, conflict resolution, and offline-first features. +Use `rxdbCollectionOptions` from `@tanstack/rxdb-db-collection` to integrate an RxDB collection with TanStack DB: + +```ts +import { createCollection } from "@tanstack/react-db" +import { rxdbCollectionOptions } from "@tanstack/rxdb-db-collection" +import { createRxDatabase } from "rxdb" + +const db = await createRxDatabase({ + name: "mydb", + storage: getRxStorageMemory(), +}) +await db.addCollections({ + todos: { + schema: { + version: 0, + primaryKey: "id", + type: "object", + properties: { + id: { type: "string", maxLength: 100 }, + text: { type: "string" }, + completed: { type: "boolean" }, + }, + }, + }, +}) + +// Wrap the RxDB collection with TanStack DB +export const todoCollection = createCollection( + rxdbCollectionOptions({ + rxCollection: db.todos, + startSync: true + }) +) +``` + +With this integration: + +- TanStack DB subscribes to RxDB's change streams and reflects updates, deletes, and inserts in real-time. +- You get local-first sync when RxDB replication is configured. +- Mutation handlers (onInsert, onUpdate, onDelete) are implemented using RxDB's APIs (bulkUpsert, incrementalPatch, bulkRemove). + +This makes RxDB a great choice for apps that need local-first storage, replication, or peer-to-peer sync combined with TanStack DB's live queries and transaction lifecycle. + #### `LocalStorageCollection` localStorage collections store small amounts of local-only state that persists across browser sessions and syncs across browser tabs in real-time. All data is stored under a single localStorage key and automatically synchronized using storage events. diff --git a/packages/rxdb-db-collection/CHANGELOG.md b/packages/rxdb-db-collection/CHANGELOG.md new file mode 100644 index 00000000..67220f79 --- /dev/null +++ b/packages/rxdb-db-collection/CHANGELOG.md @@ -0,0 +1,5 @@ +# @tanstack/rxdb-db-collection + +## 0.0.0 + +- Initial Release diff --git a/packages/rxdb-db-collection/package.json b/packages/rxdb-db-collection/package.json new file mode 100644 index 00000000..a2a2606c --- /dev/null +++ b/packages/rxdb-db-collection/package.json @@ -0,0 +1,71 @@ +{ + "name": "@tanstack/rxdb-db-collection", + "description": "RxDB collection for TanStack DB", + "version": "0.1.4", + "dependencies": { + "rxdb": "16.17.2", + "@standard-schema/spec": "^1.0.0", + "@tanstack/db": "workspace:*", + "@tanstack/store": "^0.7.0", + "debug": "^4.4.1" + }, + "devDependencies": { + "@types/debug": "^4.1.12", + "@vitest/coverage-istanbul": "^3.0.9" + }, + "exports": { + ".": { + "import": { + "types": "./dist/esm/index.d.ts", + "default": "./dist/esm/index.js" + }, + "require": { + "types": "./dist/cjs/index.d.cts", + "default": "./dist/cjs/index.cjs" + } + }, + "./package.json": "./package.json" + }, + "files": [ + "dist", + "src" + ], + "main": "dist/cjs/index.cjs", + "module": "dist/esm/index.js", + "packageManager": "pnpm@10.6.3", + "peerDependencies": { + "rxdb": ">=16.17.2", + "typescript": ">=4.7" + }, + "author": "Kyle Mathews", + "license": "MIT", + "repository": { + "type": "git", + "url": "https://github.com/TanStack/db.git", + "directory": "packages/rxdb-db-collection" + }, + "homepage": "https://tanstack.com/db", + "keywords": [ + "rxdb", + "nosql", + "realtime", + "local-first", + "sync-engine", + "sync", + "replication", + "opfs", + "indexeddb", + "localstorage", + "optimistic", + "typescript" + ], + "scripts": { + "build": "vite build", + "dev": "vite build --watch", + "lint": "eslint . --fix", + "test": "npx vitest --run" + }, + "sideEffects": false, + "type": "module", + "types": "dist/esm/index.d.ts" +} diff --git a/packages/rxdb-db-collection/src/helper.ts b/packages/rxdb-db-collection/src/helper.ts new file mode 100644 index 00000000..57678bb3 --- /dev/null +++ b/packages/rxdb-db-collection/src/helper.ts @@ -0,0 +1,16 @@ +const RESERVED_RXDB_FIELDS = new Set([ + '_rev', + '_deleted', + '_attachments', + '_meta', +]) + +export function stripRxdbFields>(obj: T): T { + if (!obj) return obj + const out: any = Array.isArray(obj) ? [] : {} + for (const k of Object.keys(obj)) { + if (RESERVED_RXDB_FIELDS.has(k)) continue + out[k] = obj[k] + } + return out as T +} diff --git a/packages/rxdb-db-collection/src/index.ts b/packages/rxdb-db-collection/src/index.ts new file mode 100644 index 00000000..4d3f5981 --- /dev/null +++ b/packages/rxdb-db-collection/src/index.ts @@ -0,0 +1,2 @@ +export * from "./rxdb" +export * from "./helper" diff --git a/packages/rxdb-db-collection/src/rxdb.ts b/packages/rxdb-db-collection/src/rxdb.ts new file mode 100644 index 00000000..0f1ee64d --- /dev/null +++ b/packages/rxdb-db-collection/src/rxdb.ts @@ -0,0 +1,303 @@ +import { + FilledMangoQuery, + RxCollection, + RxDocumentData, + clone, + ensureNotFalsy, + getFromMapOrCreate, + lastOfArray, + prepareQuery, + rxStorageWriteErrorToRxError +} from "rxdb/plugins/core" +import type { Subscription } from 'rxjs' + +import DebugModule from "debug" +import type { + CollectionConfig, + ResolveType, + SyncConfig, +} from "@tanstack/db" +import type { StandardSchemaV1 } from "@standard-schema/spec" +import { stripRxdbFields } from './helper' + +const debug = DebugModule.debug(`ts/db:rxdb`) + + +/** + * Used in tests to ensure proper cleanup + */ +export const OPEN_RXDB_SUBSCRIPTIONS = new WeakMap>() + + +/** + * Configuration interface for RxDB collection options + * @template TExplicit - The explicit type of items in the collection (highest priority). Use the document type of your RxCollection here. + * @template TSchema - The schema type for validation and type inference (second priority) + * + * @remarks + * Type resolution follows a priority order: + * 1. If you provide an explicit type via generic parameter, it will be used + * 2. If no explicit type is provided but a schema is, the schema's output type will be inferred + * + * You should provide EITHER an explicit type OR a schema, but not both, as they would conflict. + * Notice that primary keys in RxDB must always be a string. + */ +export type RxDBCollectionConfig< + TExplicit extends object = Record, + TSchema extends StandardSchemaV1 = never +> = Omit< + CollectionConfig< + ResolveType, + string, + TSchema + >, + 'insert' | 'update' | 'delete' | 'getKey' | 'sync' +> & { + /** + * The RxCollection from a RxDB Database instance. + */ + rxCollection: RxCollection + + /** + * The maximum number of documents to read from the RxDB collection + * in a single batch during the initial sync between RxDB and the + * in-memory TanStack DB collection. + * + * @remarks + * - Defaults to `1000` if not specified. + * - Larger values reduce the number of round trips to the storage + * engine but increase memory usage per batch. + * - Smaller values may lower memory usage and allow earlier + * streaming of initial results, at the cost of more query calls. + * + * Adjust this depending on your expected collection size and + * performance characteristics of the chosen RxDB storage adapter. + */ + syncBatchSize?: number +} + +/** + * Creates RxDB collection options for use with a standard Collection + * + * @template TExplicit - The explicit type of items in the collection (highest priority) + * @template TSchema - The schema type for validation and type inference (second priority) + * @param config - Configuration options for the RxDB collection + * @returns Collection options with utilities + */ +export function rxdbCollectionOptions< + TExplicit extends object = Record, + TSchema extends StandardSchemaV1 = never +>( + config: RxDBCollectionConfig +) { + type Row = ResolveType; + type Key = string; // because RxDB primary keys must be strings + + const { ...restConfig } = config + const rxCollection = config.rxCollection + + // "getKey" + const primaryPath = rxCollection.schema.primaryPath + const getKey: CollectionConfig['getKey'] = (item) => { + const key: string = (item as any)[primaryPath] as string + return key + } + + /** + * "sync" + * Notice that this describes the Sync between the local RxDB collection + * and the in-memory tanstack-db collection. + * It is not about sync between a client and a server! + */ + type SyncParams = Parameters['sync']>[0] + const sync: SyncConfig = { + sync: (params: SyncParams) => { + const { begin, write, commit, markReady } = params + + let ready = false + async function initialFetch() { + /** + * RxDB stores a last-write-time + * which can be used to "sort" document writes, + * so for initial sync we iterate over that. + */ + let cursor: RxDocumentData | undefined = undefined + const syncBatchSize = config.syncBatchSize ? config.syncBatchSize : 1000 + begin() + + while (!ready) { + let query: FilledMangoQuery + if (cursor) { + query = { + selector: { + $or: [ + { '_meta.lwt': { $gt: (cursor._meta.lwt as number) } }, + { + '_meta.lwt': cursor._meta.lwt, + [primaryPath]: { + $gt: cursor[primaryPath] + }, + } + ] + } as any, + sort: [ + { '_meta.lwt': 'asc' }, + { [primaryPath]: 'asc' } as any + ], + limit: syncBatchSize, + skip: 0 + } + } else { + query = { + selector: {}, + sort: [ + { '_meta.lwt': 'asc' }, + { [primaryPath]: 'asc' } as any + ], + limit: syncBatchSize, + skip: 0 + } + } + + /** + * Instead of doing a RxCollection.query(), + * we directly query the storage engine of the RxCollection so we do not use the + * RxCollection document cache because it likely wont be used anyway + * since most queries will run directly on the tanstack-db side. + */ + const preparedQuery = prepareQuery( + rxCollection.storageInstance.schema, + query + ); + const result = await rxCollection.storageInstance.query(preparedQuery) + const docs = result.documents + + cursor = lastOfArray(docs) + if (docs.length === 0) { + ready = true + break; + } + + docs.forEach(d => { + write({ + type: 'insert', + value: stripRxdbFields(clone(d)) as any + }) + }) + + } + commit() + } + + type WriteMessage = Parameters[0] + const buffer: WriteMessage[] = [] + const queue = (msg: WriteMessage) => { + if (!ready) { + buffer.push(msg) + return + } + begin() + write(msg as any) + commit() + } + + let sub: Subscription + async function startOngoingFetch() { + // Subscribe early and buffer live changes during initial load and ongoing + sub = rxCollection.$.subscribe((ev) => { + const cur = stripRxdbFields(clone(ev.documentData as Row)) + switch (ev.operation) { + case 'INSERT': + if (cur) queue({ type: 'insert', value: cur }) + break + case 'UPDATE': + if (cur) queue({ type: 'update', value: cur }) + break + case 'DELETE': + queue({ type: 'delete', value: cur }) + break + } + }) + + const subs = getFromMapOrCreate( + OPEN_RXDB_SUBSCRIPTIONS, + rxCollection, + () => new Set() + ) + subs.add(sub) + } + + + async function start() { + startOngoingFetch() + await initialFetch(); + + if (buffer.length) { + begin() + for (const msg of buffer) write(msg as any) + commit() + buffer.length = 0 + } + + markReady() + } + + + start() + + return () => { + const subs = getFromMapOrCreate( + OPEN_RXDB_SUBSCRIPTIONS, + rxCollection, + () => new Set() + ) + subs.delete(sub) + sub.unsubscribe() + } + }, + // Expose the getSyncMetadata function + getSyncMetadata: undefined, + } + + const collectionConfig: CollectionConfig> = { + ...restConfig, + getKey, + sync, + onInsert: async (params) => { + debug("insert", params) + const newItems = params.transaction.mutations.map(m => m.modified) + return rxCollection.bulkUpsert(newItems as any).then(result => { + if (result.error.length > 0) { + throw rxStorageWriteErrorToRxError(ensureNotFalsy(result.error[0])) + } + return result.success + }) + }, + onUpdate: async (params) => { + debug("update", params) + const mutations = params.transaction.mutations.filter(m => m.type === 'update') + + for (const mutation of mutations) { + const newValue = stripRxdbFields(mutation.modified) + const id = (newValue as any)[primaryPath] + const doc = await rxCollection.findOne(id).exec() + if (!doc) { + continue + } + await doc.incrementalPatch(newValue as any) + } + }, + onDelete: async (params) => { + debug("delete", params) + const mutations = params.transaction.mutations.filter(m => m.type === 'delete') + const ids = mutations.map(mutation => (mutation.original as any)[primaryPath]) + return rxCollection.bulkRemove(ids).then(result => { + if (result.error.length > 0) { + throw result.error + } + return result.success + }) + } + } + return collectionConfig; +} diff --git a/packages/rxdb-db-collection/tests/rxdb.test.ts b/packages/rxdb-db-collection/tests/rxdb.test.ts new file mode 100644 index 00000000..40ee231f --- /dev/null +++ b/packages/rxdb-db-collection/tests/rxdb.test.ts @@ -0,0 +1,290 @@ +import { describe, expect, it, vi } from "vitest" +import { + createCollection, +} from "@tanstack/db" +import { + OPEN_RXDB_SUBSCRIPTIONS, + RxDBCollectionConfig, + rxdbCollectionOptions +} from "../src/rxdb" +import type { + Collection, + UtilsRecord, +} from "@tanstack/db" +import type { StandardSchemaV1 } from "@standard-schema/spec" +import { + RxCollection, + addRxPlugin, + createRxDatabase, + getFromMapOrCreate +} from 'rxdb/plugins/core' +import { RxDBDevModePlugin } from 'rxdb/plugins/dev-mode' +import { getRxStorageMemory } from 'rxdb/plugins/storage-memory' +import { wrappedValidateAjvStorage } from 'rxdb/plugins/validate-ajv' + +// Mock the ShapeStream module +const mockSubscribe = vi.fn() +const mockStream = { + subscribe: mockSubscribe, +} + +type TestDocType = { + id: string + name: string +} +type RxCollections = { test: RxCollection }; + +// Helper to advance timers and allow microtasks to flush +const flushPromises = () => new Promise((resolve) => setTimeout(resolve, 0)) + +describe(`RxDB Integration`, () => { + addRxPlugin(RxDBDevModePlugin) + let collection: Collection< + any, + string | number, + UtilsRecord, + StandardSchemaV1, + any + >; + + let dbNameId = 0; + function getTestData(amount: number): Array { + return new Array(amount).fill(0).map((_v, i) => { + return { + id: (i + 1) + '', + name: 'Item ' + (i + 1) + } + }) + } + async function createTestState( + initialDocs: TestDocType[] = [], + config: Partial> = {} + ) { + const db = await createRxDatabase({ + name: 'my-rxdb-' + (dbNameId++), + storage: wrappedValidateAjvStorage({ + storage: getRxStorageMemory() + }) + }); + const collections = await db.addCollections({ + test: { + schema: { + version: 0, + type: 'object', + primaryKey: 'id', + properties: { + id: { + type: 'string', + maxLength: 100 + }, + name: { + type: 'string', + maxLength: 9 + } + } + } + } + }); + const rxCollection: RxCollection = collections.test; + if (initialDocs.length > 0) { + const insertResult = await rxCollection.bulkInsert(initialDocs) + expect(insertResult.error.length).toBe(0) + } + + const options = rxdbCollectionOptions({ + rxCollection: rxCollection, + startSync: true, + /** + * In tests we use a small batch size + * to ensure iteration works. + */ + syncBatchSize: 10, + ...config + }) + + collection = createCollection(options) + await collection.stateWhenReady() + + return { + collection, + rxCollection, + db + } + } + + describe('sync', () => { + it(`should initialize and fetch initial data`, async () => { + const initialItems = getTestData(2) + + const { collection, db } = await createTestState(initialItems); + + // Verify the collection state contains our items + expect(collection.size).toBe(initialItems.length) + expect(collection.get(`1`)).toEqual(initialItems[0]) + expect(collection.get(`2`)).toEqual(initialItems[1]) + + // Verify the synced data + expect(collection.syncedData.size).toBe(initialItems.length) + expect(collection.syncedData.get(`1`)).toEqual(initialItems[0]) + expect(collection.syncedData.get(`2`)).toEqual(initialItems[1]) + + await db.remove() + }) + + it('should initialize and fetch initial data with many documents', async () => { + const docsAmount = 25; // > 10 to force multiple batches + const initialItems = getTestData(docsAmount); + const { collection, db } = await createTestState(initialItems); + + // All docs should be present after initial sync + expect(collection.size).toBe(docsAmount); + expect(collection.syncedData.size).toBe(docsAmount); + + // Spot-check a few positions + expect(collection.get('1')).toEqual({ id: '1', name: 'Item 1' }); + expect(collection.get('10')).toEqual({ id: '10', name: 'Item 10' }); + expect(collection.get('11')).toEqual({ id: '11', name: 'Item 11' }); + expect(collection.get('25')).toEqual({ id: '25', name: 'Item 25' }); + + // Ensure no gaps + for (let i = 1; i <= docsAmount; i++) { + expect(collection.has(String(i))).toBe(true); + } + + await db.remove() + }) + + it(`should update the collection when RxDB changes data`, async () => { + const initialItems = getTestData(2) + + const { collection, rxCollection, db } = await createTestState(initialItems); + + + // inserts + const doc = await rxCollection.insert({ id: '3', name: 'inserted' }) + expect(collection.get(`3`).name).toEqual('inserted') + + // updates + await doc.getLatest().patch({ name: 'updated' }) + expect(collection.get(`3`).name).toEqual('updated') + + + // deletes + await doc.getLatest().remove() + expect(collection.get(`3`)).toEqual(undefined) + + await db.remove() + }) + + it(`should update RxDB when the collection changes data`, async () => { + const initialItems = getTestData(2) + + const { collection, rxCollection, db } = await createTestState(initialItems); + + + // inserts + const tx = collection.insert({ id: `3`, name: `inserted` }) + await tx.isPersisted.promise + let doc = await rxCollection.findOne('3').exec(true) + expect(doc.name).toEqual('inserted') + + // updates + collection.update( + '3', + d => { + d.name = 'updated' + } + ) + expect(collection.get(`3`).name).toEqual('updated') + await collection.stateWhenReady() + await rxCollection.database.requestIdlePromise() + doc = await rxCollection.findOne('3').exec(true) + expect(doc.name).toEqual('updated') + + + // deletes + collection.delete('3') + await rxCollection.database.requestIdlePromise() + const mustNotBeFound = await rxCollection.findOne('3').exec() + expect(mustNotBeFound).toEqual(null) + + await db.remove() + }) + }); + + describe(`lifecycle management`, () => { + it(`should call unsubscribe when collection is cleaned up`, async () => { + const { collection, rxCollection, db } = await createTestState(); + + await collection.cleanup() + + const subs = getFromMapOrCreate( + OPEN_RXDB_SUBSCRIPTIONS, + rxCollection, + () => new Set() + ) + expect(subs.size).toEqual(0) + + + await db.remove() + }) + + it(`should restart sync when collection is accessed after cleanup`, async () => { + const initialItems = getTestData(2) + const { collection, rxCollection, db } = await createTestState(initialItems); + + await collection.cleanup() + await flushPromises() + expect(collection.status).toBe(`cleaned-up`) + + // insert into RxDB while cleaned-up + await rxCollection.insert({ id: '3', name: 'Item 3' }) + + // Access collection data to restart sync + const unsubscribe = collection.subscribeChanges(() => { }) + + await collection.toArrayWhenReady() + expect(collection.get(`3`).name).toEqual('Item 3') + + + unsubscribe() + await db.remove() + }) + }) + + describe('error handling', () => { + it('should rollback the transaction on invalid data that does not match the RxCollection schema', async () => { + const initialItems = getTestData(2) + const { collection, db } = await createTestState(initialItems); + + // INSERT + await expect(async () => { + const tx = await collection.insert({ + id: '3', + name: 'invalid', + foo: 'bar' + }) + await tx.isPersisted.promise + }).rejects.toThrow(/schema validation error/) + expect(collection.has('3')).toBe(false) + + // UPDATE + await expect(async () => { + const tx = await collection.update( + '2', + d => { + d.name = 'invalid' + d.foo = 'bar' + } + ) + await tx.isPersisted.promise + }).rejects.toThrow(/schema validation error/) + expect(collection.get('2').name).toBe('Item 2') + + + await db.remove() + }) + }) + + +}); diff --git a/packages/rxdb-db-collection/tsconfig.docs.json b/packages/rxdb-db-collection/tsconfig.docs.json new file mode 100644 index 00000000..5a73feb0 --- /dev/null +++ b/packages/rxdb-db-collection/tsconfig.docs.json @@ -0,0 +1,9 @@ +{ + "extends": "./tsconfig.json", + "compilerOptions": { + "paths": { + "@tanstack/db": ["../db/src"] + } + }, + "include": ["src"] +} diff --git a/packages/rxdb-db-collection/tsconfig.json b/packages/rxdb-db-collection/tsconfig.json new file mode 100644 index 00000000..7e586bab --- /dev/null +++ b/packages/rxdb-db-collection/tsconfig.json @@ -0,0 +1,20 @@ +{ + "extends": "../../tsconfig.json", + "compilerOptions": { + "target": "ES2020", + "module": "ESNext", + "moduleResolution": "Bundler", + "declaration": true, + "outDir": "dist", + "strict": true, + "esModuleInterop": true, + "skipLibCheck": true, + "forceConsistentCasingInFileNames": true, + "jsx": "react", + "paths": { + "@tanstack/store": ["../store/src"] + } + }, + "include": ["src", "tests", "vite.config.ts"], + "exclude": ["node_modules", "dist"] +} diff --git a/packages/rxdb-db-collection/vite.config.ts b/packages/rxdb-db-collection/vite.config.ts new file mode 100644 index 00000000..c7968f28 --- /dev/null +++ b/packages/rxdb-db-collection/vite.config.ts @@ -0,0 +1,21 @@ +import { defineConfig, mergeConfig } from "vitest/config" +import { tanstackViteConfig } from "@tanstack/config/vite" +import packageJson from "./package.json" + +const config = defineConfig({ + test: { + name: packageJson.name, + dir: `./tests`, + environment: `jsdom`, + coverage: { enabled: true, provider: `istanbul`, include: [`src/**/*`] }, + typecheck: { enabled: true }, + }, +}) + +export default mergeConfig( + config, + tanstackViteConfig({ + entry: `./src/index.ts`, + srcDir: `./src`, + }) +)