Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,13 @@ export class EngineActorDriver implements ActorDriver {
#runnerStarted: PromiseWithResolvers<undefined> = promiseWithResolvers();
#runnerStopped: PromiseWithResolvers<undefined> = promiseWithResolvers();

// WebSocket message acknowledgment debouncing
#wsAckQueue: Map<
string,
{ requestIdBuf: ArrayBuffer; messageIndex: number }
> = new Map();
#wsAckFlushInterval?: NodeJS.Timeout;

constructor(
registryConfig: RegistryConfig,
runConfig: RunnerConfig,
Expand Down Expand Up @@ -284,6 +291,15 @@ export class EngineActorDriver implements ActorDriver {
namespace: runConfig.namespace,
runnerName: runConfig.runnerName,
});

// Start WebSocket ack flush interval
//
// Decreasing this reduces the amount of buffered messages on the
// gateway
//
// Gateway timeout configured to 30s
// https://github.com/rivet-dev/rivet/blob/222dae87e3efccaffa2b503de40ecf8afd4e31eb/engine/packages/pegboard-gateway/src/shared_state.rs#L17
this.#wsAckFlushInterval = setInterval(() => this.#flushWsAcks(), 1000);
}

async #loadActorHandler(actorId: string): Promise<ActorHandler> {
Expand All @@ -302,6 +318,19 @@ export class EngineActorDriver implements ActorDriver {
return handler.actor;
}

#flushWsAcks(): void {
if (this.#wsAckQueue.size === 0) return;

for (const {
requestIdBuf: requestId,
messageIndex: index,
} of this.#wsAckQueue.values()) {
this.#runner.sendWebsocketMessageAck(requestId, index);
}

this.#wsAckQueue.clear();
}

getContext(actorId: string): DriverContext {
return {};
}
Expand Down Expand Up @@ -554,13 +583,32 @@ export class EngineActorDriver implements ActorDriver {

invariant(event.rivetRequestId, "missing rivetRequestId");
invariant(event.rivetMessageIndex, "missing rivetMessageIndex");
this.#runner.sendWebsocketMessageAck(
event.rivetRequestId,
event.rivetMessageIndex,
);

// Track only the highest seen message index per request
// Convert ArrayBuffer to string for Map key
const currentEntry = this.#wsAckQueue.get(requestId);
if (currentEntry) {
if (event.rivetMessageIndex > currentEntry.messageIndex) {
currentEntry.messageIndex = event.rivetMessageIndex;
} else {
logger().warn({
msg: "received lower index than ack queue for message",
requestId,
queuedMessageIndex: currentEntry,
eventMessageIndex: event.rivetMessageIndex,
});
}
} else {
this.#wsAckQueue.set(requestId, {
requestIdBuf,
messageIndex: event.rivetMessageIndex,
});
}
});

websocket.addEventListener("close", (event) => {
// Flush any pending acks before closing
this.#flushWsAcks();
wsHandlerPromise.then((x) => x.onClose?.(event, wsContext));
});

Expand All @@ -575,6 +623,16 @@ export class EngineActorDriver implements ActorDriver {

async shutdownRunner(immediate: boolean): Promise<void> {
logger().info({ msg: "stopping engine actor driver" });

// Clear the ack flush interval
if (this.#wsAckFlushInterval) {
clearInterval(this.#wsAckFlushInterval);
this.#wsAckFlushInterval = undefined;
}

// Flush any remaining acks
this.#flushWsAcks();

await this.#runner.shutdown(immediate);
}

Expand Down
Loading