Skip to content

Commit 840b2f1

Browse files
authored
feat: onceWhen function (#43)
* feat: implement `onceWhen` helper function * chore: add example usage docs * chore: add example usage docs
1 parent 16c9616 commit 840b2f1

File tree

3 files changed

+239
-0
lines changed

3 files changed

+239
-0
lines changed
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
import { EventEmitter } from 'node:events';
2+
import { onceWhen } from '../events';
3+
4+
describe('onceWhen tests', () => {
5+
test('should resolve when event is emitted and predicate matches', async () => {
6+
const emitter = new EventEmitter<{
7+
myTestEvent: [eventNumber: number, msg: string];
8+
}>();
9+
10+
setTimeout(() => {
11+
for (let i = 0; i <= 5; i++) {
12+
emitter.emit('myTestEvent', i, `Message ${i}`);
13+
}
14+
}, 10); // Emit after a delay
15+
16+
const [eventNumber, msg] = await onceWhen(emitter, 'myTestEvent', (eventNumber, msg) => {
17+
return eventNumber === 5;
18+
});
19+
expect(eventNumber).toBe(5);
20+
expect(msg).toBe('Message 5');
21+
22+
// Expect that the event listener was removed after onceWhen is finished
23+
expect(emitter.eventNames()).toStrictEqual([]);
24+
});
25+
26+
test('should reject if aborted immediately', async () => {
27+
const emitter = new EventEmitter<{
28+
myTestEvent: [eventNumber: number];
29+
}>();
30+
const controller = new AbortController();
31+
const abortReason = new Error('Test aborted');
32+
controller.abort(abortReason);
33+
await expect(
34+
onceWhen(emitter, 'myTestEvent', () => true, { signal: controller.signal })
35+
).rejects.toThrow(abortReason);
36+
37+
// Expect that the event listener was removed after onceWhen is finished
38+
expect(emitter.eventNames()).toStrictEqual([]);
39+
});
40+
41+
test('should reject if aborted before event is emitted', async () => {
42+
const emitter = new EventEmitter<{
43+
myTestEvent: [eventNumber: number];
44+
}>();
45+
const controller = new AbortController();
46+
const abortReason = new Error('Test aborted');
47+
// controller.abort(abortReason);
48+
setTimeout(() => {
49+
for (let i = 0; i <= 5; i++) {
50+
emitter.emit('myTestEvent', i);
51+
if (i === 3) {
52+
controller.abort(abortReason); // Abort after emitting some events
53+
}
54+
}
55+
}, 10); // Emit after a delay
56+
57+
let lastEventNumberSeen = 0;
58+
await expect(
59+
onceWhen(
60+
emitter,
61+
'myTestEvent',
62+
eventNumber => {
63+
lastEventNumberSeen = eventNumber;
64+
return false;
65+
},
66+
{ signal: controller.signal }
67+
)
68+
).rejects.toThrow(abortReason);
69+
70+
// Check that we saw events before the abort
71+
expect(lastEventNumberSeen).toBe(3);
72+
73+
// Expect that the event listener was removed after onceWhen is finished
74+
expect(emitter.eventNames()).toStrictEqual([]);
75+
});
76+
77+
test('should resolve if event is emitted before abort', async () => {
78+
const emitter = new EventEmitter<{
79+
myTestEvent: [eventNumber: number];
80+
}>();
81+
const controller = new AbortController();
82+
83+
setTimeout(() => {
84+
for (let i = 0; i <= 5; i++) {
85+
emitter.emit('myTestEvent', i);
86+
}
87+
controller.abort(); // Abort after emitting all events
88+
}, 10); // Emit after a delay
89+
90+
const [eventNumber] = await onceWhen(emitter, 'myTestEvent', eventNumber => eventNumber === 5, {
91+
signal: controller.signal,
92+
});
93+
expect(eventNumber).toBe(5);
94+
95+
// Expect that the event listener was removed after onceWhen is finished
96+
expect(emitter.eventNames()).toStrictEqual([]);
97+
});
98+
99+
test('should reject if predict function throws', async () => {
100+
const emitter = new EventEmitter<{
101+
myTestEvent: [eventNumber: number];
102+
}>();
103+
setTimeout(() => {
104+
for (let i = 0; i <= 5; i++) {
105+
emitter.emit('myTestEvent', i);
106+
}
107+
}, 10);
108+
109+
let lastEventNumberSeen = 0;
110+
const predictFunctionError = new Error('Predict function error');
111+
await expect(
112+
onceWhen(emitter, 'myTestEvent', eventNumber => {
113+
lastEventNumberSeen = eventNumber;
114+
if (eventNumber === 3) {
115+
throw predictFunctionError;
116+
}
117+
return false;
118+
})
119+
).rejects.toThrow(predictFunctionError);
120+
expect(lastEventNumberSeen).toBe(3);
121+
122+
// Expect that the event listener was removed after onceWhen is finished
123+
expect(emitter.eventNames()).toStrictEqual([]);
124+
});
125+
126+
test('abort signal test', async () => {
127+
const emitter = new EventEmitter<{ myEvent: [id: number, msg: string] }>();
128+
const signal = AbortSignal.timeout(10);
129+
setTimeout(() => emitter.emit('myEvent', 1, 'Hello'), 1000);
130+
const whenPromise = onceWhen(emitter, 'myEvent', id => id === 1, { signal });
131+
// This rejects because the signal is aborted before the event is emitted
132+
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
133+
await expect(whenPromise).rejects.toThrow(signal.reason);
134+
});
135+
});

src/helpers/events.ts

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
import { EventEmitter, addAbortListener } from 'node:events';
2+
3+
// This is a workaround for Node.js versions that do not support Symbol.dispose
4+
const DisposeSymbol: typeof Symbol.dispose = Symbol.dispose ?? Symbol.for('nodejs.dispose');
5+
6+
/**
7+
* Creates a Promise that resolves when the specified `eventName` is emitted by the `EventEmitter`
8+
* and the provided predicate returns `true` for the emitted arguments.
9+
*
10+
* Similar to [`events.once`](https://nodejs.org/api/events.html#eventsonceemitter-name-options),
11+
* but includes support for a predicate function to filter events. Only events for which
12+
* the predicate returns `true` will cause the Promise to resolve.
13+
*
14+
* The resolved value is an array of the arguments emitted with the event.
15+
*
16+
* Supports typed `EventEmitter`s and optional cancellation via `AbortSignal`.
17+
*
18+
* @example
19+
* ```ts
20+
* import { EventEmitter } from 'node:events';
21+
*
22+
* const emitter = new EventEmitter<{
23+
* myEvent: [id: number, msg: string];
24+
* }>();
25+
*
26+
* setTimeout(() => {
27+
* for (let i = 0; i <= 5; i++) {
28+
* emitter.emit('myEvent', i, `Message ${i}`);
29+
* }
30+
* }, 100);
31+
*
32+
* const [id, msg] = await onceWhen(emitter, 'myEvent', (id, msg) => id === 3);
33+
*
34+
* // outputs: "Received event with id: 3, message: Message 3"
35+
* console.log(`Received event with id: ${id}, message: ${msg}`);
36+
* ```
37+
*
38+
* @example
39+
* ```ts
40+
* import { EventEmitter } from 'node:events';
41+
*
42+
* const emitter = new EventEmitter<{ myEvent: [id: number, msg: string] }>();
43+
*
44+
* const signal = AbortSignal.timeout(10);
45+
*
46+
* setTimeout(() => emitter.emit('myEvent', 1, 'Hello'), 1000);
47+
*
48+
* const whenPromise = onceWhen(emitter, 'myEvent', id => id === 1, { signal });
49+
*
50+
* // This rejects because the signal is aborted before the event is emitted
51+
* await expect(whenPromise).rejects.toThrow(signal.reason);
52+
* ```
53+
*/
54+
export function onceWhen<
55+
EventMap extends Record<string, any[]> = Record<string, any[]>,
56+
K extends Extract<keyof EventMap, string> = Extract<keyof EventMap, string>
57+
>(
58+
emitter: EventEmitter<EventMap>,
59+
eventName: K,
60+
predicate: (...args: EventMap[K]) => boolean,
61+
options?: { signal?: AbortSignal }
62+
): Promise<EventMap[K]> {
63+
return new Promise((resolve, reject) => {
64+
// Immediate abort check
65+
if (options?.signal?.aborted) {
66+
reject((options.signal.reason as Error) ?? new Error('Aborted'));
67+
return;
68+
}
69+
70+
// Cleanup helper: remove both the event listener and the abort listener
71+
const cleanup = () => {
72+
// eslint-disable-next-line @typescript-eslint/no-use-before-define
73+
(emitter as EventEmitter).off(eventName, listener);
74+
// eslint-disable-next-line @typescript-eslint/no-use-before-define
75+
disposable?.[DisposeSymbol]();
76+
};
77+
78+
// Abort handler
79+
const onAbort = () => {
80+
cleanup();
81+
reject((options?.signal?.reason as Error) ?? new Error('Aborted'));
82+
};
83+
84+
// Our event listener that checks the predicate
85+
const listener = (...args: EventMap[K]) => {
86+
try {
87+
if (predicate(...args)) {
88+
cleanup();
89+
resolve(args);
90+
}
91+
} catch (err) {
92+
cleanup();
93+
reject(err as Error);
94+
return;
95+
}
96+
};
97+
98+
// Install the AbortSignal listener via Node’s helper
99+
const disposable = options?.signal ? addAbortListener(options.signal, onAbort) : undefined;
100+
101+
(emitter as EventEmitter).on(eventName, listener);
102+
});
103+
}

src/helpers/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,6 @@ export * from './iterators';
22
export * from './time';
33
export * from './values';
44
export * from './is-debugging';
5+
export * from './events';
56
export { WorkerThreadManager } from './worker-thread-manager';
67
export type { WorkerPoolModuleInterface } from './worker-thread-manager';

0 commit comments

Comments
 (0)