Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/grumpy-frogs-hear.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@tanstack/trailbase-db-collection': patch
---

add syncMode with tests
117 changes: 95 additions & 22 deletions packages/trailbase-db-collection/src/trailbase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,9 @@ export function trailBaseCollectionOptions<
TKey extends string | number = string | number,
>(
config: TrailBaseCollectionConfig<TItem, TRecord, TKey>,
): CollectionConfig<TItem, TKey> & { utils: TrailBaseCollectionUtils } {
): CollectionConfig<TItem, TKey> & {
utils: TrailBaseCollectionUtils
} {
const getKey = config.getKey

const parse = (record: TRecord) =>
Expand All @@ -123,8 +125,13 @@ export function trailBaseCollectionOptions<
const serialIns = (item: TItem) =>
convert<TItem, TRecord>(config.serialize, item)

const abortController = new AbortController()

const seenIds = new Store(new Map<string, number>())

const internalSyncMode = (config as any).syncMode ?? `eager`
let fullSyncCompleted = false

const awaitIds = (
ids: Array<string>,
timeout: number = 120 * 1000,
Expand All @@ -136,30 +143,26 @@ export function trailBaseCollectionOptions<
}

return new Promise<void>((resolve, reject) => {
const timeoutId = setTimeout(() => {
unsubscribe()
reject(new TimeoutWaitingForIdsError(ids.toString()))
}, timeout)
const onAbort = () => {
clearTimeout(timeoutId)
reject(new TimeoutWaitingForIdsError(`Aborted while waiting for ids`))
}

abortController.signal.addEventListener(`abort`, onAbort)

const timeoutId = setTimeout(() => reject(new TimeoutWaitingForIdsError(ids.toString())), timeout)

const unsubscribe = seenIds.subscribe((value) => {
if (completed(value.currentVal)) {
clearTimeout(timeoutId)
abortController.signal.removeEventListener(`abort`, onAbort)
unsubscribe()
resolve()
}
})
})
}

let eventReader: ReadableStreamDefaultReader<Event> | undefined
const cancelEventReader = () => {
if (eventReader) {
eventReader.cancel()
eventReader.releaseLock()
eventReader = undefined
}
}

type SyncParams = Parameters<SyncConfig<TItem, TKey>[`sync`]>[0]
const sync = {
sync: (params: SyncParams) => {
Expand Down Expand Up @@ -211,8 +214,13 @@ export function trailBaseCollectionOptions<
const { done, value: event } = await reader.read()

if (done || !event) {
reader.releaseLock()
eventReader = undefined
try {
if ((reader as any).locked) {
reader.releaseLock()
}
} catch {
// ignore if already released
}
return
}

Expand Down Expand Up @@ -244,21 +252,41 @@ export function trailBaseCollectionOptions<

async function start() {
const eventStream = await config.recordApi.subscribe(`*`)
const reader = (eventReader = eventStream.getReader())
const reader = eventStream.getReader()

// Start listening for subscriptions first. Otherwise, we'd risk a gap
// between the initial fetch and starting to listen.
listen(reader)

try {
await initialFetch()
// Eager mode: perform initial fetch to populate everything
if (internalSyncMode === `eager`) {
await initialFetch()
fullSyncCompleted = true
}
} catch (e) {
cancelEventReader()
abortController.abort()
throw e
} finally {
// Mark ready both if everything went well or if there's an error to
// avoid blocking apps waiting for `.preload()` to finish.
// In on-demand/progressive mode we mark ready immediately after listener starts
// to allow queries to drive snapshots via `loadSubset`.
markReady()
// If progressive, start the background full sync after we've marked ready
if (internalSyncMode === `progressive`) {
// Defer background sync to avoid racing with preload assertions
setTimeout(() => {
void (async () => {
try {
await initialFetch()
fullSyncCompleted = true
} catch (e) {
console.error(`TrailBase progressive full sync failed`, e)
}
})()
}, 0)
}
}

// Lastly, start a periodic cleanup task that will be removed when the
Expand All @@ -281,13 +309,58 @@ export function trailBaseCollectionOptions<
})
}, 120 * 1000)

reader.closed.finally(() => clearInterval(periodicCleanupTask))
const onAbort = () => {
clearInterval(periodicCleanupTask)
// It's safe to call cancel and releaseLock even if the stream is already closed.
reader.cancel().catch(() => { /* ignore */ })
try {
reader.releaseLock()
} catch {
/* ignore */
}
}

abortController.signal.addEventListener(`abort`, onAbort)
reader.closed.finally(() => {
abortController.signal.removeEventListener(`abort`, onAbort)
clearInterval(periodicCleanupTask)
})
}

start()

// Eager mode doesn't need subset loading
if (internalSyncMode === `eager`) {
return
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at electric, it seems to do the opposite, i.e. fetch erverything in eager mode:

await stream.requestSnapshot(snapshotParams)

}

const loadSubset = async (opts: { limit?: number } = {}) => {
const limit = opts.limit ?? 256
const response = await config.recordApi.list({ pagination: { limit } })
const records = (response?.records ?? [])

if (records.length > 0) {
begin()
for (const item of records) {
write({ type: `insert`, value: parse(item) })
}
commit()
}
}

return {
loadSubset,
getSyncMetadata: () => ({
syncMode: internalSyncMode,
fullSyncComplete: fullSyncCompleted,
} as const),
}
},
// Expose the getSyncMetadata function
getSyncMetadata: undefined,
getSyncMetadata: () => ({
syncMode: internalSyncMode,
fullSyncComplete: fullSyncCompleted,
} as const),
}

return {
Expand Down Expand Up @@ -352,7 +425,7 @@ export function trailBaseCollectionOptions<
await awaitIds(ids)
},
utils: {
cancel: cancelEventReader,
cancel: () => abortController.abort(),
},
}
}
Loading