Skip to content

Commit bc1294e

Browse files
authored
Reconnect with exponential backoff flag: enableReconnect (#507)
#507
1 parent 226d7d0 commit bc1294e

7 files changed

Lines changed: 430 additions & 35 deletions

File tree

README.md

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,14 +133,44 @@ import WebSocket from 'ws'
133133
useWebSocketImplementation(WebSocket)
134134
```
135135

136-
You can enable regular pings of connected relays with the `enablePing` option. This will set up a heartbeat that closes the websocket if it doesn't receive a response in time. Some platforms don't report websocket disconnections due to network issues, and enabling this can increase reliability.
136+
#### enablePing
137+
138+
You can enable regular pings of connected relays with the `enablePing` option. This will set up a heartbeat that closes the websocket if it doesn't receive a response in time. Some platforms, like Node.js, don't report websocket disconnections due to network issues, and enabling this can increase the reliability of the `onclose` event.
137139

138140
```js
139141
import { SimplePool } from 'nostr-tools/pool'
140142

141143
const pool = new SimplePool({ enablePing: true })
142144
```
143145

146+
#### enableReconnect
147+
148+
You can also enable automatic reconnection with the `enableReconnect` option. This will make the pool try to reconnect to relays with an exponential backoff delay if the connection is lost unexpectedly.
149+
150+
```js
151+
import { SimplePool } from 'nostr-tools/pool'
152+
153+
const pool = new SimplePool({ enableReconnect: true })
154+
```
155+
156+
Using both `enablePing: true` and `enableReconnect: true` is recommended as it will improve the reliability and timeliness of the reconnection (at the expense of slighly higher bandwidth due to the ping messages).
157+
158+
```js
159+
// on Node.js
160+
const pool = new SimplePool({ enablePing: true, enableReconnect: true })
161+
```
162+
163+
The `enableReconnect` option can also be a callback function which will receive the current subscription filters and should return a new set of filters. This is useful if you want to modify the subscription on reconnect, for example, to update the `since` parameter to fetch only new events.
164+
165+
```js
166+
const pool = new SimplePool({
167+
enableReconnect: (filters) => {
168+
const newSince = Math.floor(Date.now() / 1000)
169+
return filters.map(filter => ({ ...filter, since: newSince }))
170+
}
171+
})
172+
```
173+
144174
### Parsing references (mentions) from a content based on NIP-27
145175

146176
```js

abstract-pool.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ export class AbstractSimplePool {
3333

3434
public verifyEvent: Nostr['verifyEvent']
3535
public enablePing: boolean | undefined
36+
public enableReconnect: boolean | ((filters: Filter[]) => Filter[]) | undefined
3637
public trustedRelayURLs: Set<string> = new Set()
3738

3839
private _WebSocket?: typeof WebSocket
@@ -41,6 +42,7 @@ export class AbstractSimplePool {
4142
this.verifyEvent = opts.verifyEvent
4243
this._WebSocket = opts.websocketImplementation
4344
this.enablePing = opts.enablePing
45+
this.enableReconnect = opts.enableReconnect
4446
}
4547

4648
async ensureRelay(url: string, params?: { connectionTimeout?: number }): Promise<AbstractRelay> {
@@ -52,9 +54,12 @@ export class AbstractSimplePool {
5254
verifyEvent: this.trustedRelayURLs.has(url) ? alwaysTrue : this.verifyEvent,
5355
websocketImplementation: this._WebSocket,
5456
enablePing: this.enablePing,
57+
enableReconnect: this.enableReconnect,
5558
})
5659
relay.onclose = () => {
57-
this.relays.delete(url)
60+
if (relay && !relay.enableReconnect) {
61+
this.relays.delete(url)
62+
}
5863
}
5964
if (params?.connectionTimeout) relay.connectionTimeout = params.connectionTimeout
6065
this.relays.set(url, relay)

abstract-relay.ts

Lines changed: 76 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ export type AbstractRelayConstructorOptions = {
1616
verifyEvent: Nostr['verifyEvent']
1717
websocketImplementation?: typeof WebSocket
1818
enablePing?: boolean
19+
enableReconnect?: boolean | ((filters: Filter[]) => Filter[])
1920
}
2021

2122
export class SendingOnClosedConnection extends Error {
@@ -37,9 +38,15 @@ export class AbstractRelay {
3738
public publishTimeout: number = 4400
3839
public pingFrequency: number = 20000
3940
public pingTimeout: number = 20000
41+
public resubscribeBackoff: number[] = [10000, 10000, 10000, 20000, 20000, 30000, 60000]
4042
public openSubs: Map<string, Subscription> = new Map()
4143
public enablePing: boolean | undefined
44+
public enableReconnect: boolean | ((filters: Filter[]) => Filter[])
4245
private connectionTimeoutHandle: ReturnType<typeof setTimeout> | undefined
46+
private reconnectTimeoutHandle: ReturnType<typeof setTimeout> | undefined
47+
private pingTimeoutHandle: ReturnType<typeof setTimeout> | undefined
48+
private reconnectAttempts: number = 0
49+
private closedIntentionally: boolean = false
4350

4451
private connectionPromise: Promise<void> | undefined
4552
private openCountRequests = new Map<string, CountResolver>()
@@ -59,6 +66,7 @@ export class AbstractRelay {
5966
this.verifyEvent = opts.verifyEvent
6067
this._WebSocket = opts.websocketImplementation || WebSocket
6168
this.enablePing = opts.enablePing
69+
this.enableReconnect = opts.enableReconnect || false
6270
}
6371

6472
static async connect(url: string, opts: AbstractRelayConstructorOptions): Promise<AbstractRelay> {
@@ -88,6 +96,40 @@ export class AbstractRelay {
8896
return this._connected
8997
}
9098

99+
private async reconnect(): Promise<void> {
100+
const backoff = this.resubscribeBackoff[Math.min(this.reconnectAttempts, this.resubscribeBackoff.length - 1)]
101+
this.reconnectAttempts++
102+
103+
this.reconnectTimeoutHandle = setTimeout(async () => {
104+
try {
105+
await this.connect()
106+
} catch (err) {
107+
// this will be called again through onclose/onerror
108+
}
109+
}, backoff)
110+
}
111+
112+
private handleHardClose(reason: string) {
113+
if (this.pingTimeoutHandle) {
114+
clearTimeout(this.pingTimeoutHandle)
115+
this.pingTimeoutHandle = undefined
116+
}
117+
118+
this._connected = false
119+
this.connectionPromise = undefined
120+
121+
const wasIntentional = this.closedIntentionally
122+
this.closedIntentionally = false // reset for next time
123+
124+
this.onclose?.()
125+
126+
if (this.enableReconnect && !wasIntentional) {
127+
this.reconnect()
128+
} else {
129+
this.closeAllSubscriptions(reason)
130+
}
131+
}
132+
91133
public async connect(): Promise<void> {
92134
if (this.connectionPromise) return this.connectionPromise
93135

@@ -110,8 +152,23 @@ export class AbstractRelay {
110152
}
111153

112154
this.ws.onopen = () => {
155+
if (this.reconnectTimeoutHandle) {
156+
clearTimeout(this.reconnectTimeoutHandle)
157+
this.reconnectTimeoutHandle = undefined
158+
}
113159
clearTimeout(this.connectionTimeoutHandle)
114160
this._connected = true
161+
this.reconnectAttempts = 0
162+
163+
// resubscribe to all open subscriptions
164+
for (const sub of this.openSubs.values()) {
165+
sub.eosed = false
166+
if (typeof this.enableReconnect === 'function') {
167+
sub.filters = this.enableReconnect(sub.filters)
168+
}
169+
sub.fire()
170+
}
171+
115172
if (this.enablePing) {
116173
this.pingpong()
117174
}
@@ -121,19 +178,13 @@ export class AbstractRelay {
121178
this.ws.onerror = ev => {
122179
clearTimeout(this.connectionTimeoutHandle)
123180
reject((ev as any).message || 'websocket error')
124-
this._connected = false
125-
this.connectionPromise = undefined
126-
this.onclose?.()
127-
this.closeAllSubscriptions('relay connection errored')
181+
this.handleHardClose('relay connection errored')
128182
}
129183

130184
this.ws.onclose = ev => {
131185
clearTimeout(this.connectionTimeoutHandle)
132186
reject((ev as any).message || 'websocket closed')
133-
this._connected = false
134-
this.connectionPromise = undefined
135-
this.onclose?.()
136-
this.closeAllSubscriptions('relay connection closed')
187+
this.handleHardClose('relay connection closed')
137188
}
138189

139190
this.ws.onmessage = this._onmessage.bind(this)
@@ -145,7 +196,7 @@ export class AbstractRelay {
145196
private async waitForPingPong() {
146197
return new Promise((res, err) => {
147198
// listen for pong
148-
;(this.ws && this.ws.on && this.ws.on('pong', () => res(true))) || err("ws can't listen for pong")
199+
this.ws && this.ws.on ? this.ws.on('pong', () => res(true)) : err("ws can't listen for pong")
149200
// send a ping
150201
this.ws && this.ws.ping && this.ws.ping()
151202
})
@@ -178,13 +229,12 @@ export class AbstractRelay {
178229
])
179230
if (result) {
180231
// schedule another pingpong
181-
setTimeout(() => this.pingpong(), this.pingFrequency)
232+
this.pingTimeoutHandle = setTimeout(() => this.pingpong(), this.pingFrequency)
182233
} else {
183234
// pingpong closing socket
184-
this.closeAllSubscriptions('pingpong timed out')
185-
this._connected = false
186-
this.onclose?.()
187-
this.ws?.close()
235+
if (this.ws?.readyState === WebSocket.OPEN) {
236+
this.ws?.close()
237+
}
188238
}
189239
}
190240
}
@@ -372,10 +422,21 @@ export class AbstractRelay {
372422
}
373423

374424
public close() {
425+
this.closedIntentionally = true
426+
if (this.reconnectTimeoutHandle) {
427+
clearTimeout(this.reconnectTimeoutHandle)
428+
this.reconnectTimeoutHandle = undefined
429+
}
430+
if (this.pingTimeoutHandle) {
431+
clearTimeout(this.pingTimeoutHandle)
432+
this.pingTimeoutHandle = undefined
433+
}
375434
this.closeAllSubscriptions('relay connection closed by us')
376435
this._connected = false
377436
this.onclose?.()
378-
this.ws?.close()
437+
if (this.ws?.readyState === WebSocket.OPEN) {
438+
this.ws?.close()
439+
}
379440
}
380441

381442
// this is the function assigned to this.ws.onmessage

pool.test.ts

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,120 @@ test('ping-pong timeout in pool', async () => {
253253
expect(closed).toBeTrue()
254254
})
255255

256+
test('reconnect on disconnect in pool', async () => {
257+
const mockRelay = mockRelays[0]
258+
pool = new SimplePool({ enablePing: true, enableReconnect: true })
259+
const relay = await pool.ensureRelay(mockRelay.url)
260+
relay.pingTimeout = 50
261+
relay.pingFrequency = 50
262+
relay.resubscribeBackoff = [50, 100]
263+
264+
let closes = 0
265+
relay.onclose = () => {
266+
closes++
267+
}
268+
269+
expect(relay.connected).toBeTrue()
270+
271+
// wait for the first ping to succeed
272+
await new Promise(resolve => setTimeout(resolve, 75))
273+
expect(closes).toBe(0)
274+
275+
// now make it unresponsive
276+
mockRelay.unresponsive = true
277+
278+
// wait for the second ping to fail, which will trigger a close
279+
await new Promise(resolve => {
280+
const interval = setInterval(() => {
281+
if (closes > 0) {
282+
clearInterval(interval)
283+
resolve(null)
284+
}
285+
}, 10)
286+
})
287+
expect(closes).toBe(1)
288+
expect(relay.connected).toBeFalse()
289+
290+
// now make it responsive again
291+
mockRelay.unresponsive = false
292+
293+
// wait for reconnect
294+
await new Promise(resolve => {
295+
const interval = setInterval(() => {
296+
if (relay.connected) {
297+
clearInterval(interval)
298+
resolve(null)
299+
}
300+
}, 10)
301+
})
302+
303+
expect(relay.connected).toBeTrue()
304+
expect(closes).toBe(1)
305+
})
306+
307+
test('reconnect with filter update in pool', async () => {
308+
const mockRelay = mockRelays[0]
309+
const newSince = Math.floor(Date.now() / 1000)
310+
pool = new SimplePool({
311+
enablePing: true,
312+
enableReconnect: filters => {
313+
return filters.map(f => ({ ...f, since: newSince }))
314+
},
315+
})
316+
const relay = await pool.ensureRelay(mockRelay.url)
317+
relay.pingTimeout = 50
318+
relay.pingFrequency = 50
319+
relay.resubscribeBackoff = [50, 100]
320+
321+
let closes = 0
322+
relay.onclose = () => {
323+
closes++
324+
}
325+
326+
expect(relay.connected).toBeTrue()
327+
328+
const sub = relay.subscribe([{ kinds: [1], since: 0 }], { onevent: () => {} })
329+
expect(sub.filters[0].since).toBe(0)
330+
331+
// wait for the first ping to succeed
332+
await new Promise(resolve => setTimeout(resolve, 75))
333+
expect(closes).toBe(0)
334+
335+
// now make it unresponsive
336+
mockRelay.unresponsive = true
337+
338+
// wait for the second ping to fail, which will trigger a close
339+
await new Promise(resolve => {
340+
const interval = setInterval(() => {
341+
if (closes > 0) {
342+
clearInterval(interval)
343+
resolve(null)
344+
}
345+
}, 10)
346+
})
347+
expect(closes).toBe(1)
348+
expect(relay.connected).toBeFalse()
349+
350+
// now make it responsive again
351+
mockRelay.unresponsive = false
352+
353+
// wait for reconnect
354+
await new Promise(resolve => {
355+
const interval = setInterval(() => {
356+
if (relay.connected) {
357+
clearInterval(interval)
358+
resolve(null)
359+
}
360+
}, 10)
361+
})
362+
363+
expect(relay.connected).toBeTrue()
364+
expect(closes).toBe(1)
365+
366+
// check if filter was updated
367+
expect(sub.filters[0].since).toBe(newSince)
368+
})
369+
256370
test('track relays when publishing', async () => {
257371
let event1 = finalizeEvent(
258372
{

pool.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
/* global WebSocket */
22

33
import { verifyEvent } from './pure.ts'
4-
import { AbstractSimplePool } from './abstract-pool.ts'
4+
import { AbstractSimplePool, type AbstractPoolConstructorOptions } from './abstract-pool.ts'
55

66
var _WebSocket: typeof WebSocket
77

@@ -14,7 +14,7 @@ export function useWebSocketImplementation(websocketImplementation: any) {
1414
}
1515

1616
export class SimplePool extends AbstractSimplePool {
17-
constructor(options?: { enablePing?: boolean }) {
17+
constructor(options?: Pick<AbstractPoolConstructorOptions, 'enablePing' | 'enableReconnect'>) {
1818
super({ verifyEvent, websocketImplementation: _WebSocket, ...options })
1919
}
2020
}

0 commit comments

Comments
 (0)