diff --git a/packages/server/src/DirectConnection.ts b/packages/server/src/DirectConnection.ts index a4cebef24..7edce6351 100644 --- a/packages/server/src/DirectConnection.ts +++ b/packages/server/src/DirectConnection.ts @@ -32,7 +32,7 @@ export class DirectConnection implements DirectConnectionInterface { transaction(this.document) - await this.instance.storeDocumentHooks(this.document, { + await this.instance.onStoreDocument(this.document, { clientsCount: this.document.getConnectionsCount(), context: this.context, document: this.document, @@ -50,7 +50,7 @@ export class DirectConnection implements DirectConnectionInterface { this.document?.removeDirectConnection() - await this.instance.storeDocumentHooks(this.document, { + await this.instance.onStoreDocument(this.document, { clientsCount: this.document.getConnectionsCount(), context: this.context, document: this.document, diff --git a/packages/server/src/Hocuspocus.ts b/packages/server/src/Hocuspocus.ts index 2ede61e10..f4e931f59 100644 --- a/packages/server/src/Hocuspocus.ts +++ b/packages/server/src/Hocuspocus.ts @@ -19,6 +19,7 @@ import { AwarenessUpdate, Configuration, ConnectionConfiguration, + Extension, HookName, HookPayloadByName, beforeBroadcastStatelessPayload, @@ -43,6 +44,8 @@ export const defaultConfiguration = { gcFilter: () => true, }, unloadImmediately: true, + storageQueue: 'default', + storageQueues: { default: {} }, stopOnSignals: true, } @@ -92,6 +95,11 @@ export class Hocuspocus { this.configuration = { ...this.configuration, ...configuration, + storageQueues: { + default: {}, + ...this.configuration.storageQueues, + ...configuration.storageQueues, + }, } this.configuration.extensions.sort((a, b) => { @@ -129,6 +137,14 @@ export class Hocuspocus { afterUnloadDocument: this.configuration.afterUnloadDocument, onDisconnect: this.configuration.onDisconnect, onDestroy: this.configuration.onDestroy, + storageQueue: this.configuration.storageQueue, + }) + + // create storage queues that are referenced by extensions but not pre-defined at the top level + this.configuration.extensions.forEach(extension => { + if (extension.storageQueue && !this.configuration.storageQueues[extension.storageQueue]) { + this.configuration.storageQueues[extension.storageQueue] = {} + } }) this.hooks('onConfigure', { @@ -354,14 +370,7 @@ export class Hocuspocus { // Only run this if the document has finished loading earlier (i.e. not to persist the empty // ydoc if the onLoadDocument hook returned an error) if (!document.isLoading) { - this.debounce( - `onStoreDocument-${document.name}`, - () => { - this.storeDocumentHooks(document, hookPayload) - }, - this.configuration.unloadImmediately ? 0 : this.configuration.debounce, - this.configuration.maxDebounce, - ) + this.onStoreDocument(document, hookPayload, this.configuration.unloadImmediately) } else { // Remove document from memory immediately this.unloadDocument(document) @@ -402,14 +411,7 @@ export class Hocuspocus { return } - this.debounce( - `onStoreDocument-${document.name}`, - () => { - this.storeDocumentHooks(document, hookPayload) - }, - this.configuration.debounce, - this.configuration.maxDebounce, - ) + this.onStoreDocument(document, hookPayload) } /** @@ -485,10 +487,27 @@ export class Hocuspocus { return document } - storeDocumentHooks(document: Document, hookPayload: onStoreDocumentPayload) { - return this.hooks('onStoreDocument', hookPayload) + onStoreDocument(document: Document, hookPayload: onStoreDocumentPayload, unloadImmediately = false) { + const promises = Object.entries(this.configuration.storageQueues).map(([queue, { debounce = this.configuration.debounce, maxDebounce = this.configuration.maxDebounce }]) => { + return this.debounce( + `onStoreDocument-${queue}-${document.name}`, + () => this.storeDocumentHooks(document, hookPayload, queue), + unloadImmediately ? 0 : debounce, + maxDebounce, + ) + }) + return Promise.all(promises) + } + + storeDocumentHooks(document: Document, hookPayload: onStoreDocumentPayload, queue = 'default') { + const filter = (extension: Extension) => { + return ( + extension.storageQueue === queue || (extension.storageQueue === undefined && queue === 'default') + ) + } + return this.hooks('onStoreDocument', hookPayload, null, filter) .then(() => { - this.hooks('afterStoreDocument', hookPayload).then(() => { + this.hooks('afterStoreDocument', hookPayload, null, filter).then(() => { // Remove document from memory. if (document.getConnectionsCount() > 0) { @@ -511,7 +530,7 @@ export class Hocuspocus { * Run the given hook on all configured extensions. * Runs the given callback after each hook. */ - hooks(name: T, payload: HookPayloadByName[T], callback: Function | null = null): Promise { + hooks(name: T, payload: HookPayloadByName[T], callback: Function | null = null, filter?: (extension: Extension) => boolean): Promise { const { extensions } = this.configuration // create a new `thenable` chain @@ -520,7 +539,7 @@ export class Hocuspocus { extensions // get me all extensions which have the given hook - .filter(extension => typeof extension[name] === 'function') + .filter(extension => typeof extension[name] === 'function' && (filter?.(extension) ?? true)) // run through all the configured hooks .forEach(extension => { chain = chain diff --git a/packages/server/src/types.ts b/packages/server/src/types.ts index cd2b663a8..5e26ac2e5 100644 --- a/packages/server/src/types.ts +++ b/packages/server/src/types.ts @@ -1,6 +1,4 @@ -import { - IncomingHttpHeaders, IncomingMessage, ServerResponse, -} from 'http' +import { IncomingHttpHeaders, IncomingMessage, ServerResponse } from 'http' import { URLSearchParams } from 'url' import { Awareness } from 'y-protocols/awareness' import Connection from './Connection.js' @@ -27,55 +25,56 @@ export interface AwarenessUpdate { } export interface ConnectionConfiguration { - readOnly: boolean - requiresAuthentication: boolean - isAuthenticated: boolean + readOnly: boolean, + requiresAuthentication: boolean, + isAuthenticated: boolean, } export interface Extension { - priority?: number; - extensionName?: string; - onConfigure?(data: onConfigurePayload): Promise; - onListen?(data: onListenPayload): Promise; - onUpgrade?(data: onUpgradePayload): Promise; - onConnect?(data: onConnectPayload): Promise; - connected?(data: connectedPayload): Promise; - onAuthenticate?(data: onAuthenticatePayload): Promise; - onLoadDocument?(data: onLoadDocumentPayload): Promise; - afterLoadDocument?(data: afterLoadDocumentPayload): Promise; - beforeHandleMessage?(data: beforeHandleMessagePayload): Promise; - beforeBroadcastStateless?(data: beforeBroadcastStatelessPayload): Promise; - onStateless?(payload: onStatelessPayload): Promise; - onChange?(data: onChangePayload): Promise; - onStoreDocument?(data: onStoreDocumentPayload): Promise; - afterStoreDocument?(data: afterStoreDocumentPayload): Promise; - onAwarenessUpdate?(data: onAwarenessUpdatePayload): Promise; - onRequest?(data: onRequestPayload): Promise; - onDisconnect?(data: onDisconnectPayload): Promise; - afterUnloadDocument?(data: afterUnloadDocumentPayload): Promise; - onDestroy?(data: onDestroyPayload): Promise; + priority?: number, + extensionName?: string, + storageQueue?: string, + onConfigure?(data: onConfigurePayload): Promise, + onListen?(data: onListenPayload): Promise, + onUpgrade?(data: onUpgradePayload): Promise, + onConnect?(data: onConnectPayload): Promise, + connected?(data: connectedPayload): Promise, + onAuthenticate?(data: onAuthenticatePayload): Promise, + onLoadDocument?(data: onLoadDocumentPayload): Promise, + afterLoadDocument?(data: afterLoadDocumentPayload): Promise, + beforeHandleMessage?(data: beforeHandleMessagePayload): Promise, + beforeBroadcastStateless?(data: beforeBroadcastStatelessPayload): Promise, + onStateless?(payload: onStatelessPayload): Promise, + onChange?(data: onChangePayload): Promise, + onStoreDocument?(data: onStoreDocumentPayload): Promise, + afterStoreDocument?(data: afterStoreDocumentPayload): Promise, + onAwarenessUpdate?(data: onAwarenessUpdatePayload): Promise, + onRequest?(data: onRequestPayload): Promise, + onDisconnect?(data: onDisconnectPayload): Promise, + afterUnloadDocument?(data: afterUnloadDocumentPayload): Promise, + onDestroy?(data: onDestroyPayload): Promise, } export type HookName = - 'onConfigure' | - 'onListen' | - 'onUpgrade' | - 'onConnect' | - 'connected' | - 'onAuthenticate' | - 'onLoadDocument' | - 'afterLoadDocument' | - 'beforeHandleMessage' | - 'beforeBroadcastStateless' | - 'onStateless' | - 'onChange' | - 'onStoreDocument' | - 'afterStoreDocument' | - 'onAwarenessUpdate' | - 'onRequest' | - 'onDisconnect' | - 'afterUnloadDocument' | - 'onDestroy' + | 'onConfigure' + | 'onListen' + | 'onUpgrade' + | 'onConnect' + | 'connected' + | 'onAuthenticate' + | 'onLoadDocument' + | 'afterLoadDocument' + | 'beforeHandleMessage' + | 'beforeBroadcastStateless' + | 'onStateless' + | 'onChange' + | 'onStoreDocument' + | 'afterStoreDocument' + | 'onAwarenessUpdate' + | 'onRequest' + | 'onDisconnect' + | 'afterUnloadDocument' + | 'onDestroy' export type HookPayloadByName = { onConfigure: onConfigurePayload, @@ -98,6 +97,13 @@ export type HookPayloadByName = { afterUnloadDocument: afterUnloadDocumentPayload, onDestroy: onDestroyPayload, } + +export type StorageQueueConfigs = { + [key: string]: { + debounce?: number, + maxDebounce?: number, + }, +} export interface Configuration extends Extension { /** * A name for the instance, used for logging. @@ -127,7 +133,7 @@ export interface Configuration extends Extension { /** * Makes sure to call `onStoreDocument` at least in the given amount of time (ms). */ - maxDebounce: number + maxDebounce: number, /** * By default, the servers show a start screen. If passed false, the server will start quietly. */ @@ -153,9 +159,14 @@ export interface Configuration extends Extension { */ yDocOptions: { gc: boolean, // enable or disable garbage collection (see https://github.com/yjs/yjs/blob/main/INTERNALS.md#deletions) - gcFilter: () => boolean, // will be called before garbage collecting ; return false to keep it + gcFilter: () => boolean, // will be called before garbage collecting , return false to keep it }, + /** + * Define specific debounce settings for each storage queue, allowing multiple extensions to store + * documents in different locations in parallel at different rates. + */ + storageQueues: StorageQueueConfigs, } export interface onStatelessPayload { @@ -188,7 +199,7 @@ export interface onConnectPayload { requestHeaders: IncomingHttpHeaders, requestParameters: URLSearchParams, socketId: string, - connection: ConnectionConfiguration + connection: ConnectionConfiguration, } // @todo Change 'connection' to 'connectionConfig', and 'connectionInstance' to 'connection' in next major release @@ -202,7 +213,7 @@ export interface connectedPayload { requestParameters: URLSearchParams, socketId: string, connection: ConnectionConfiguration, - connectionInstance: Connection + connectionInstance: Connection, } // @todo Change 'connection' to 'connectionConfig' in next major release @@ -215,7 +226,7 @@ export interface onLoadDocumentPayload { requestHeaders: IncomingHttpHeaders, requestParameters: URLSearchParams, socketId: string, - connection: ConnectionConfiguration + connection: ConnectionConfiguration, } // @todo Change 'connection' to 'connectionConfig' in next major release @@ -228,7 +239,7 @@ export interface afterLoadDocumentPayload { requestHeaders: IncomingHttpHeaders, requestParameters: URLSearchParams, socketId: string, - connection: ConnectionConfiguration + connection: ConnectionConfiguration, } export interface onChangePayload { @@ -254,7 +265,7 @@ export interface beforeHandleMessagePayload { requestParameters: URLSearchParams, update: Uint8Array, socketId: string, - connection: Connection + connection: Connection, } export interface beforeBroadcastStatelessPayload { @@ -304,7 +315,7 @@ export interface fetchPayload { requestHeaders: IncomingHttpHeaders, requestParameters: URLSearchParams, socketId: string, - connection: ConnectionConfiguration + connection: ConnectionConfiguration, } export interface storePayload extends onStoreDocumentPayload { @@ -352,11 +363,11 @@ export interface onConfigurePayload { } export interface afterUnloadDocumentPayload { - instance: Hocuspocus; - documentName: string; + instance: Hocuspocus, + documentName: string, } export interface DirectConnection { transact(transaction: (document: Document) => void): Promise, - disconnect(): void + disconnect(): void, } diff --git a/packages/server/src/util/debounce.ts b/packages/server/src/util/debounce.ts index 0c467e7ac..53bccf667 100644 --- a/packages/server/src/util/debounce.ts +++ b/packages/server/src/util/debounce.ts @@ -1,21 +1,36 @@ export const useDebounce = () => { const timers: Map, + resolve: (value: unknown) => void, start: number }> = new Map() + /** + * All calls to the function within a given debounce window will recieve the same promise that + * resolves when the debounced function has resolved. + */ const debounce = ( id: string, func: Function, debounce: number, maxDebounce: number, ) => { + // default function to satisfy typescript + let newResolve: (value: unknown) => void = () => {} + const newPromise = new Promise(resolve => { + newResolve = resolve + }) const old = timers.get(id) const start = old?.start || Date.now() + const promise = old?.promise || newPromise + const resolve = old?.resolve || newResolve - const run = () => { + const run = async () => { timers.delete(id) - return func() + const result = await func() + resolve(result) + return result } if (old?.timeout) { @@ -33,7 +48,10 @@ export const useDebounce = () => { timers.set(id, { start, timeout: setTimeout(run, debounce), + promise, + resolve, }) + return promise } return debounce diff --git a/tests/server/afterStoreDocument.ts b/tests/server/afterStoreDocument.ts index 57068f48e..49f505ae7 100644 --- a/tests/server/afterStoreDocument.ts +++ b/tests/server/afterStoreDocument.ts @@ -1,5 +1,6 @@ import test from 'ava' -import { newHocuspocus, newHocuspocusProvider } from '../utils/index.js' +import { assertThrottledCallback, createPromiseWithResolve, createStorageQueueExtension } from 'tests/utils/storeDocument.js' +import { newHocuspocus, newHocuspocusProvider, newHocuspocusProviderWebsocket } from '../utils/index.js' test('calls the afterStoreDocument hook', async t => { await new Promise(async resolve => { @@ -43,3 +44,69 @@ test('executes afterStoreDocument callback from a custom extension', async t => }) }) }) + +test('executes afterStoreDocument individually for each storageQueue', async t => { + let startTime = 0 + const [a1Promise, a1Resolve] = createPromiseWithResolve() + const [a2Promise, a2Resolve] = createPromiseWithResolve() + const [b1Promise, b1Resolve] = createPromiseWithResolve() + const [b2Promise, b2Resolve] = createPromiseWithResolve() + const [default1Promise, default1Resolve] = createPromiseWithResolve() + const [default2Promise, default2Resolve] = createPromiseWithResolve() + + function assertAfterStoreDocumentThrottled(minTime: number, maxTime: number, resolve: () => void, extensionName = 'default') { + assertThrottledCallback(t, startTime, minTime, maxTime, resolve, 'afterStoreDocument', extensionName) + } + + function createAfterStoreDocumentExtension(extensionName: string, storageQueue: string, debounceMin: number, debounceMax: number, resolve: () => void) { + return createStorageQueueExtension( + extensionName, + storageQueue, + { + async afterStoreDocument() { + assertAfterStoreDocumentThrottled(debounceMin, debounceMax, resolve, extensionName) + }, + }, + ) + } + + const extensions = [ + createAfterStoreDocumentExtension('a1', 'a', 0, 500, a1Resolve), + createAfterStoreDocumentExtension('a2', 'a', 0, 500, a2Resolve), + createAfterStoreDocumentExtension('b1', 'b', 500, 1000, b1Resolve), + createAfterStoreDocumentExtension('b2', 'b', 500, 1000, b2Resolve), + createAfterStoreDocumentExtension('default1', 'default1', 1000, 1500, default1Resolve), + ] + + const server = await newHocuspocus({ + unloadImmediately: false, + debounce: 1000, + maxDebounce: 1500, + async afterStoreDocument() { + assertAfterStoreDocumentThrottled(1000, 1500, default2Resolve, 'default2') + }, + extensions, + storageQueues: { + a: { + debounce: 0, + maxDebounce: 500, + }, + b: { + debounce: 500, + maxDebounce: 100, + }, + }, + }) + + const socket = newHocuspocusProviderWebsocket(server) + + newHocuspocusProvider(server, { + websocketProvider: socket, + onSynced() { + startTime = Date.now() + socket.destroy() + }, + }) + + await Promise.all([a1Promise, a2Promise, b1Promise, b2Promise, default1Promise, default2Promise]) +}) diff --git a/tests/server/onStoreDocument.ts b/tests/server/onStoreDocument.ts index 32634e448..b634eb4a5 100644 --- a/tests/server/onStoreDocument.ts +++ b/tests/server/onStoreDocument.ts @@ -1,5 +1,6 @@ import test from 'ava' import { onStoreDocumentPayload } from '@hocuspocus/server' +import { assertThrottledCallback, createPromiseWithResolve, createStorageQueueExtension } from 'tests/utils/storeDocument.js' import { newHocuspocus, newHocuspocusProvider, newHocuspocusProviderWebsocket, sleep, } from '../utils/index.js' @@ -414,3 +415,69 @@ test('waits before calling onStoreDocument after the last user disconnects when }) }) }) + +test('storageQueues throttle individually of each other', async t => { + let startTime = 0 + const [a1Promise, a1Resolve] = createPromiseWithResolve() + const [a2Promise, a2Resolve] = createPromiseWithResolve() + const [b1Promise, b1Resolve] = createPromiseWithResolve() + const [b2Promise, b2Resolve] = createPromiseWithResolve() + const [default1Promise, default1Resolve] = createPromiseWithResolve() + const [default2Promise, default2Resolve] = createPromiseWithResolve() + + function assertOnStoreDocumentThrottled(minTime: number, maxTime: number, resolve: () => void, extensionName = 'default') { + assertThrottledCallback(t, startTime, minTime, maxTime, resolve, 'onStoreDocument', extensionName) + } + + function createOnStoreDocumentExtension(extensionName: string, storageQueue: string, debounceMin: number, debounceMax: number, resolve: () => void) { + return createStorageQueueExtension( + extensionName, + storageQueue, + { + async onStoreDocument() { + assertOnStoreDocumentThrottled(debounceMin, debounceMax, resolve, extensionName) + }, + }, + ) + } + + const extensions = [ + createOnStoreDocumentExtension('a1', 'a', 0, 500, a1Resolve), + createOnStoreDocumentExtension('a2', 'a', 0, 500, a2Resolve), + createOnStoreDocumentExtension('b1', 'b', 500, 1000, b1Resolve), + createOnStoreDocumentExtension('b2', 'b', 500, 1000, b2Resolve), + createOnStoreDocumentExtension('default1', 'default1', 1000, 1500, default1Resolve), + ] + + const server = await newHocuspocus({ + unloadImmediately: false, + debounce: 1000, + maxDebounce: 1500, + async onStoreDocument() { + assertOnStoreDocumentThrottled(1000, 1500, default2Resolve, 'default2') + }, + extensions, + storageQueues: { + a: { + debounce: 0, + maxDebounce: 500, + }, + b: { + debounce: 500, + maxDebounce: 100, + }, + }, + }) + + const socket = newHocuspocusProviderWebsocket(server) + + newHocuspocusProvider(server, { + websocketProvider: socket, + onSynced() { + startTime = Date.now() + socket.destroy() + }, + }) + + await Promise.all([a1Promise, a2Promise, b1Promise, b2Promise, default1Promise, default2Promise]) +}) diff --git a/tests/utils/storeDocument.ts b/tests/utils/storeDocument.ts new file mode 100644 index 000000000..74b9a367a --- /dev/null +++ b/tests/utils/storeDocument.ts @@ -0,0 +1,50 @@ +import { Extension } from '@hocuspocus/server' +import { ExecutionContext } from 'ava' + +export function createPromiseWithResolve(): [Promise, () => void] { + let resolve: () => void = () => {} + const promise = new Promise(r => { + resolve = r + }) + return [promise, resolve] +} + +export function assertThrottledCallback( + t: ExecutionContext, + startTime: number, + minTime: number, + maxTime: number, + resolve: () => void, + callbackName: string, + extensionName = 'default', +) { + const endTime = Date.now() + const totalTime = endTime - startTime + if (startTime === 0) { + t.fail('startTime not set') + } else if (totalTime < minTime) { + t.fail( + `did not wait ${minTime}ms to call ${callbackName} (${totalTime}ms) (extension ${extensionName})`, + ) + } else if (totalTime > maxTime) { + t.fail( + `waited longer than ${maxTime}ms to call ${callbackName} (${totalTime}ms) (extension ${extensionName})`, + ) + } else { + t.pass(extensionName) + } + resolve() +} + +export function createStorageQueueExtension( + extensionName: string, + storageQueue: string, + extension: Partial = {}, +) { + return { + extensionName, + storageQueue, + async onStoreDocument() {}, + ...extension, + } +}