Skip to content

Commit 67ceff1

Browse files
committed
Hub adapter: replace HTTP polling with WebSocket connection
- New ws.ts: WebSocket client with auth, reconnect backoff, ping/pong keepalive - monitor.ts: switched from pollHubMessages to connectHubWebSocket - poll.ts: kept for fallback but no longer imported Hub server pushes messages in real-time over the WebSocket. No more polling = no more Cloudflare rate limiting.
1 parent 174096e commit 67ceff1

2 files changed

Lines changed: 136 additions & 3 deletions

File tree

extensions/hub/src/monitor.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
import { createLoggerBackedRuntime, type RuntimeEnv } from "openclaw/plugin-sdk";
22
import { resolveHubAccount } from "./accounts.js";
33
import { handleHubInbound } from "./inbound.js";
4-
import { pollHubMessages } from "./poll.js";
54
import { getHubRuntime } from "./runtime.js";
65
import { sendMessageHub } from "./send.js";
76
import type { CoreConfig, HubInboundMessage } from "./types.js";
7+
import { connectHubWebSocket } from "./ws.js";
88

99
export type HubMonitorOptions = {
1010
accountId?: string;
@@ -43,11 +43,13 @@ export async function monitorHubProvider(opts: HubMonitorOptions): Promise<{ sto
4343
: ac.signal;
4444

4545
// Start polling in the background (fire and forget).
46-
pollHubMessages({
46+
connectHubWebSocket({
4747
url: account.url,
4848
agentId: account.agentId,
4949
secret: account.secret,
50-
pollTimeoutSec: account.pollTimeoutSec,
50+
onConnected: () => {
51+
logger.info(`[${account.accountId}] Hub WebSocket connected`);
52+
},
5153
abortSignal: combinedSignal,
5254
onMessages: async (messages: HubInboundMessage[]) => {
5355
for (const message of messages) {

extensions/hub/src/ws.ts

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
import { setTimeout } from "node:timers/promises";
2+
import type { HubInboundMessage } from "./types.js";
3+
4+
export type WsHubOptions = {
5+
url: string;
6+
agentId: string;
7+
secret: string;
8+
abortSignal?: AbortSignal;
9+
onMessages: (messages: HubInboundMessage[]) => void | Promise<void>;
10+
onError?: (error: Error) => void;
11+
onConnected?: () => void;
12+
};
13+
14+
function httpToWs(url: string): string {
15+
return url.replace(/^http/, "ws");
16+
}
17+
18+
function computeReconnectDelay(attempt: number): number {
19+
const initialMs = 1_000;
20+
const maxMs = 60_000;
21+
const base = Math.min(initialMs * 2 ** attempt, maxMs);
22+
const jitter = base * 0.3 * (Math.random() * 2 - 1);
23+
return Math.max(initialMs, Math.round(base + jitter));
24+
}
25+
26+
export async function connectHubWebSocket(opts: WsHubOptions): Promise<void> {
27+
const { url, agentId, secret, abortSignal, onMessages, onError, onConnected } = opts;
28+
let attempt = 0;
29+
30+
while (!abortSignal?.aborted) {
31+
try {
32+
const wsUrl = `${httpToWs(url)}/agents/${encodeURIComponent(agentId)}/ws`;
33+
const ws = new WebSocket(wsUrl);
34+
35+
await new Promise<void>((resolve, reject) => {
36+
const cleanup = () => {
37+
ws.removeEventListener("open", onOpen);
38+
ws.removeEventListener("error", onErr);
39+
ws.removeEventListener("close", onClose);
40+
ws.removeEventListener("message", onMsg);
41+
};
42+
43+
const onOpen = () => {
44+
ws.send(JSON.stringify({ secret }));
45+
};
46+
47+
const onErr = () => {
48+
cleanup();
49+
reject(new Error("WebSocket error"));
50+
};
51+
52+
const onClose = () => {
53+
cleanup();
54+
resolve();
55+
};
56+
57+
const onMsg = async (ev: MessageEvent) => {
58+
try {
59+
const data = JSON.parse(String(ev.data));
60+
61+
if (data.type === "auth" && data.ok) {
62+
attempt = 0;
63+
onConnected?.();
64+
return;
65+
}
66+
67+
if (!data.ok && data.error) {
68+
cleanup();
69+
reject(new Error(`Hub auth failed: ${data.error}`));
70+
return;
71+
}
72+
73+
if (data.type === "message" && data.data) {
74+
const msg: HubInboundMessage = {
75+
messageId: data.data.messageId || `hub-${crypto.randomUUID()}`,
76+
from: data.data.from,
77+
text: data.data.text,
78+
timestamp:
79+
typeof data.data.timestamp === "number"
80+
? data.data.timestamp
81+
: typeof data.data.timestamp === "string"
82+
? new Date(data.data.timestamp).getTime() || Date.now()
83+
: Date.now(),
84+
};
85+
await onMessages([msg]);
86+
}
87+
} catch (err) {
88+
onError?.(err instanceof Error ? err : new Error(String(err)));
89+
}
90+
};
91+
92+
ws.addEventListener("open", onOpen);
93+
ws.addEventListener("error", onErr);
94+
ws.addEventListener("close", onClose);
95+
ws.addEventListener("message", onMsg);
96+
97+
const pingInterval = setInterval(() => {
98+
if (ws.readyState === WebSocket.OPEN) {
99+
ws.send(JSON.stringify({ type: "ping" }));
100+
}
101+
}, 25_000);
102+
103+
abortSignal?.addEventListener(
104+
"abort",
105+
() => {
106+
clearInterval(pingInterval);
107+
cleanup();
108+
ws.close();
109+
resolve();
110+
},
111+
{ once: true },
112+
);
113+
114+
ws.addEventListener("close", () => clearInterval(pingInterval), { once: true });
115+
});
116+
} catch (err) {
117+
if (abortSignal?.aborted) break;
118+
const error = err instanceof Error ? err : new Error(String(err));
119+
onError?.(error);
120+
}
121+
122+
if (abortSignal?.aborted) break;
123+
const delay = computeReconnectDelay(attempt);
124+
attempt++;
125+
try {
126+
await setTimeout(delay, undefined, { signal: abortSignal });
127+
} catch {
128+
break;
129+
}
130+
}
131+
}

0 commit comments

Comments
 (0)