Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add storage queues #749

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
4 changes: 2 additions & 2 deletions packages/server/src/DirectConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ export class DirectConnection implements DirectConnectionInterface {

transaction(this.document)

await this.instance.storeDocumentHooks(this.document, {
await this.instance.onStoreDocument(this.document, {
Copy link
Collaborator Author

@georeith georeith Nov 16, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This means that DirectConnection saves will be debounced too, we can disable this by passing true as the final arg but I feel like this should be preferable if your debounce is for rate-limit reasons?

Is there a reason it wasn't part of the throttle previously?

clientsCount: this.document.getConnectionsCount(),
context: this.context,
document: this.document,
Expand All @@ -50,7 +50,7 @@ export class DirectConnection implements DirectConnectionInterface {

this.document?.removeDirectConnection()

await this.instance.storeDocumentHooks(this.document, {
await this.instance.onStoreDocument(this.document, {
clientsCount: this.document.getConnectionsCount(),
context: this.context,
document: this.document,
Expand Down
64 changes: 43 additions & 21 deletions packages/server/src/Hocuspocus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import {
AwarenessUpdate,
Configuration,
ConnectionConfiguration,
Extension,
HookName,
HookPayloadByName,
beforeBroadcastStatelessPayload,
Expand All @@ -43,6 +44,8 @@ export const defaultConfiguration = {
gcFilter: () => true,
},
unloadImmediately: true,
storageQueue: 'default',
storageQueues: { default: {} },
stopOnSignals: true,
}

Expand Down Expand Up @@ -92,6 +95,11 @@ export class Hocuspocus {
this.configuration = {
...this.configuration,
...configuration,
storageQueues: {
default: {},
...this.configuration.storageQueues,
...configuration.storageQueues,
},
}

this.configuration.extensions.sort((a, b) => {
Expand Down Expand Up @@ -129,6 +137,14 @@ export class Hocuspocus {
afterUnloadDocument: this.configuration.afterUnloadDocument,
onDisconnect: this.configuration.onDisconnect,
onDestroy: this.configuration.onDestroy,
storageQueue: this.configuration.storageQueue,
})

// create storage queues that are referenced by extensions but not pre-defined at the top level
this.configuration.extensions.forEach(extension => {
if (extension.storageQueue && !this.configuration.storageQueues[extension.storageQueue]) {
this.configuration.storageQueues[extension.storageQueue] = {}
}
})

this.hooks('onConfigure', {
Expand Down Expand Up @@ -354,14 +370,7 @@ export class Hocuspocus {
// Only run this if the document has finished loading earlier (i.e. not to persist the empty
// ydoc if the onLoadDocument hook returned an error)
if (!document.isLoading) {
this.debounce(
`onStoreDocument-${document.name}`,
() => {
this.storeDocumentHooks(document, hookPayload)
},
this.configuration.unloadImmediately ? 0 : this.configuration.debounce,
this.configuration.maxDebounce,
)
this.onStoreDocument(document, hookPayload, this.configuration.unloadImmediately)
} else {
// Remove document from memory immediately
this.unloadDocument(document)
Expand Down Expand Up @@ -402,14 +411,7 @@ export class Hocuspocus {
return
}

this.debounce(
`onStoreDocument-${document.name}`,
() => {
this.storeDocumentHooks(document, hookPayload)
},
this.configuration.debounce,
this.configuration.maxDebounce,
)
this.onStoreDocument(document, hookPayload)
}

/**
Expand Down Expand Up @@ -485,10 +487,30 @@ export class Hocuspocus {
return document
}

storeDocumentHooks(document: Document, hookPayload: onStoreDocumentPayload) {
return this.hooks('onStoreDocument', hookPayload)
onStoreDocument(document: Document, hookPayload: onStoreDocumentPayload, unloadImmediately = false) {
const promises = Object.entries(this.configuration.storageQueues).map(([queue, { debounce = this.configuration.debounce, maxDebounce = this.configuration.maxDebounce }]) => {
return this.debounce(
`onStoreDocument-${queue}-${document.name}`,
() => {
this.storeDocumentHooks(document, hookPayload, queue)
},
unloadImmediately ? 0 : debounce,
maxDebounce,
)
})

return Promise.all(promises)
}

storeDocumentHooks(document: Document, hookPayload: onStoreDocumentPayload, queue = 'default') {
const filter = (extension: Extension) => {
return (
extension.storageQueue === queue || (extension.storageQueue === undefined && queue === 'default')
)
}
return this.hooks('onStoreDocument', hookPayload, null, filter)
.then(() => {
this.hooks('afterStoreDocument', hookPayload).then(() => {
this.hooks('afterStoreDocument', hookPayload, null, filter).then(() => {
// Remove document from memory.

if (document.getConnectionsCount() > 0) {
Expand All @@ -511,7 +533,7 @@ export class Hocuspocus {
* Run the given hook on all configured extensions.
* Runs the given callback after each hook.
*/
hooks<T extends HookName>(name: T, payload: HookPayloadByName[T], callback: Function | null = null): Promise<any> {
hooks<T extends HookName>(name: T, payload: HookPayloadByName[T], callback: Function | null = null, filter?: (extension: Extension) => boolean): Promise<any> {
const { extensions } = this.configuration

// create a new `thenable` chain
Expand All @@ -520,7 +542,7 @@ export class Hocuspocus {

extensions
// get me all extensions which have the given hook
.filter(extension => typeof extension[name] === 'function')
.filter(extension => typeof extension[name] === 'function' && (filter?.(extension) ?? true))
// run through all the configured hooks
.forEach(extension => {
chain = chain
Expand Down
125 changes: 68 additions & 57 deletions packages/server/src/types.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
import {
IncomingHttpHeaders, IncomingMessage, ServerResponse,
} from 'http'
import { IncomingHttpHeaders, IncomingMessage, ServerResponse } from 'http'
import { URLSearchParams } from 'url'
import { Awareness } from 'y-protocols/awareness'
import Connection from './Connection.js'
Expand All @@ -27,55 +25,56 @@ export interface AwarenessUpdate {
}

export interface ConnectionConfiguration {
readOnly: boolean
requiresAuthentication: boolean
isAuthenticated: boolean
readOnly: boolean,
requiresAuthentication: boolean,
isAuthenticated: boolean,
}

export interface Extension {
priority?: number;
extensionName?: string;
onConfigure?(data: onConfigurePayload): Promise<any>;
onListen?(data: onListenPayload): Promise<any>;
onUpgrade?(data: onUpgradePayload): Promise<any>;
onConnect?(data: onConnectPayload): Promise<any>;
connected?(data: connectedPayload): Promise<any>;
onAuthenticate?(data: onAuthenticatePayload): Promise<any>;
onLoadDocument?(data: onLoadDocumentPayload): Promise<any>;
afterLoadDocument?(data: afterLoadDocumentPayload): Promise<any>;
beforeHandleMessage?(data: beforeHandleMessagePayload): Promise<any>;
beforeBroadcastStateless?(data: beforeBroadcastStatelessPayload): Promise<any>;
onStateless?(payload: onStatelessPayload): Promise<any>;
onChange?(data: onChangePayload): Promise<any>;
onStoreDocument?(data: onStoreDocumentPayload): Promise<any>;
afterStoreDocument?(data: afterStoreDocumentPayload): Promise<any>;
onAwarenessUpdate?(data: onAwarenessUpdatePayload): Promise<any>;
onRequest?(data: onRequestPayload): Promise<any>;
onDisconnect?(data: onDisconnectPayload): Promise<any>;
afterUnloadDocument?(data: afterUnloadDocumentPayload): Promise<any>;
onDestroy?(data: onDestroyPayload): Promise<any>;
priority?: number,
extensionName?: string,
storageQueue?: string,
onConfigure?(data: onConfigurePayload): Promise<any>,
onListen?(data: onListenPayload): Promise<any>,
onUpgrade?(data: onUpgradePayload): Promise<any>,
onConnect?(data: onConnectPayload): Promise<any>,
connected?(data: connectedPayload): Promise<any>,
onAuthenticate?(data: onAuthenticatePayload): Promise<any>,
onLoadDocument?(data: onLoadDocumentPayload): Promise<any>,
afterLoadDocument?(data: afterLoadDocumentPayload): Promise<any>,
beforeHandleMessage?(data: beforeHandleMessagePayload): Promise<any>,
beforeBroadcastStateless?(data: beforeBroadcastStatelessPayload): Promise<any>,
onStateless?(payload: onStatelessPayload): Promise<any>,
onChange?(data: onChangePayload): Promise<any>,
onStoreDocument?(data: onStoreDocumentPayload): Promise<any>,
afterStoreDocument?(data: afterStoreDocumentPayload): Promise<any>,
onAwarenessUpdate?(data: onAwarenessUpdatePayload): Promise<any>,
onRequest?(data: onRequestPayload): Promise<any>,
onDisconnect?(data: onDisconnectPayload): Promise<any>,
afterUnloadDocument?(data: afterUnloadDocumentPayload): Promise<any>,
onDestroy?(data: onDestroyPayload): Promise<any>,
}

export type HookName =
'onConfigure' |
'onListen' |
'onUpgrade' |
'onConnect' |
'connected' |
'onAuthenticate' |
'onLoadDocument' |
'afterLoadDocument' |
'beforeHandleMessage' |
'beforeBroadcastStateless' |
'onStateless' |
'onChange' |
'onStoreDocument' |
'afterStoreDocument' |
'onAwarenessUpdate' |
'onRequest' |
'onDisconnect' |
'afterUnloadDocument' |
'onDestroy'
| 'onConfigure'
| 'onListen'
| 'onUpgrade'
| 'onConnect'
| 'connected'
| 'onAuthenticate'
| 'onLoadDocument'
| 'afterLoadDocument'
| 'beforeHandleMessage'
| 'beforeBroadcastStateless'
| 'onStateless'
| 'onChange'
| 'onStoreDocument'
| 'afterStoreDocument'
| 'onAwarenessUpdate'
| 'onRequest'
| 'onDisconnect'
| 'afterUnloadDocument'
| 'onDestroy'

export type HookPayloadByName = {
onConfigure: onConfigurePayload,
Expand All @@ -98,6 +97,13 @@ export type HookPayloadByName = {
afterUnloadDocument: afterUnloadDocumentPayload,
onDestroy: onDestroyPayload,
}

export type StorageQueueConfigs = {
[key: string]: {
debounce?: number,
maxDebounce?: number,
},
}
export interface Configuration extends Extension {
/**
* A name for the instance, used for logging.
Expand Down Expand Up @@ -127,7 +133,7 @@ export interface Configuration extends Extension {
/**
* Makes sure to call `onStoreDocument` at least in the given amount of time (ms).
*/
maxDebounce: number
maxDebounce: number,
/**
* By default, the servers show a start screen. If passed false, the server will start quietly.
*/
Expand All @@ -153,9 +159,14 @@ export interface Configuration extends Extension {
*/
yDocOptions: {
gc: boolean, // enable or disable garbage collection (see https://github.com/yjs/yjs/blob/main/INTERNALS.md#deletions)
gcFilter: () => boolean, // will be called before garbage collecting ; return false to keep it
gcFilter: () => boolean, // will be called before garbage collecting , return false to keep it
},

/**
* Define specific debounce settings for each storage queue, allowing multiple extensions to store
* documents in different locations in parallel at different rates.
*/
storageQueues: StorageQueueConfigs,
}

export interface onStatelessPayload {
Expand Down Expand Up @@ -188,7 +199,7 @@ export interface onConnectPayload {
requestHeaders: IncomingHttpHeaders,
requestParameters: URLSearchParams,
socketId: string,
connection: ConnectionConfiguration
connection: ConnectionConfiguration,
}

// @todo Change 'connection' to 'connectionConfig', and 'connectionInstance' to 'connection' in next major release
Expand All @@ -202,7 +213,7 @@ export interface connectedPayload {
requestParameters: URLSearchParams,
socketId: string,
connection: ConnectionConfiguration,
connectionInstance: Connection
connectionInstance: Connection,
}

// @todo Change 'connection' to 'connectionConfig' in next major release
Expand All @@ -215,7 +226,7 @@ export interface onLoadDocumentPayload {
requestHeaders: IncomingHttpHeaders,
requestParameters: URLSearchParams,
socketId: string,
connection: ConnectionConfiguration
connection: ConnectionConfiguration,
}

// @todo Change 'connection' to 'connectionConfig' in next major release
Expand All @@ -228,7 +239,7 @@ export interface afterLoadDocumentPayload {
requestHeaders: IncomingHttpHeaders,
requestParameters: URLSearchParams,
socketId: string,
connection: ConnectionConfiguration
connection: ConnectionConfiguration,
}

export interface onChangePayload {
Expand All @@ -254,7 +265,7 @@ export interface beforeHandleMessagePayload {
requestParameters: URLSearchParams,
update: Uint8Array,
socketId: string,
connection: Connection
connection: Connection,
}

export interface beforeBroadcastStatelessPayload {
Expand Down Expand Up @@ -304,7 +315,7 @@ export interface fetchPayload {
requestHeaders: IncomingHttpHeaders,
requestParameters: URLSearchParams,
socketId: string,
connection: ConnectionConfiguration
connection: ConnectionConfiguration,
}

export interface storePayload extends onStoreDocumentPayload {
Expand Down Expand Up @@ -352,11 +363,11 @@ export interface onConfigurePayload {
}

export interface afterUnloadDocumentPayload {
instance: Hocuspocus;
documentName: string;
instance: Hocuspocus,
documentName: string,
}

export interface DirectConnection {
transact(transaction: (document: Document) => void): Promise<void>,
disconnect(): void
disconnect(): void,
}
Loading
Loading