diff --git a/js/hang/package.json b/js/hang/package.json index 976a6d9a4..68d8e3235 100644 --- a/js/hang/package.json +++ b/js/hang/package.json @@ -35,19 +35,21 @@ "scripts": { "build": "rimraf dist && tsc -b && tsx ../scripts/package.ts", "check": "tsc --noEmit", - "release": "tsx ../scripts/release.ts" + "release": "tsx ../scripts/release.ts", + "test": "vitest" }, "dependencies": { + "@huggingface/transformers": "^3.7.2", "@kixelated/moq": "workspace:^", "@kixelated/signals": "workspace:^", - "zod": "^4.1.3", + "async-mutex": "^0.5.0", "comlink": "^4.4.2", - "@huggingface/transformers": "^3.7.2", - "async-mutex": "^0.5.0" + "zod": "^4.1.3" }, "devDependencies": { "@types/audioworklet": "^0.0.77", "@typescript/lib-dom": "npm:@types/web@^0.0.241", - "fast-glob": "^3.3.3" + "fast-glob": "^3.3.3", + "vitest": "^3.2.4" } } diff --git a/js/hang/src/connection.ts b/js/hang/src/connection.ts index 12f0d32a3..fb8675b84 100644 --- a/js/hang/src/connection.ts +++ b/js/hang/src/connection.ts @@ -1,5 +1,6 @@ import * as Moq from "@kixelated/moq"; import { Effect, Signal } from "@kixelated/signals"; +import type * as Time from "./time"; export type ConnectionProps = { // The URL of the relay server. @@ -11,11 +12,11 @@ export type ConnectionProps = { // The delay in milliseconds before reconnecting. // default: 1000 - delay?: DOMHighResTimeStamp; + delay?: Time.Milli; // The maximum delay in milliseconds. // default: 30000 - maxDelay?: number; + maxDelay?: Time.Milli; }; export type ConnectionStatus = "connecting" | "connected" | "disconnected" | "unsupported"; @@ -26,11 +27,11 @@ export class Connection { established = new Signal(undefined); readonly reload: boolean; - readonly delay: number; - readonly maxDelay: number; + readonly delay: Time.Milli; + readonly maxDelay: Time.Milli; signals = new Effect(); - #delay: number; + #delay: Time.Milli; // Increased by 1 each time to trigger a reload. #tick = new Signal(0); @@ -38,8 +39,8 @@ export class Connection { constructor(props?: ConnectionProps) { this.url = Signal.from(props?.url); this.reload = props?.reload ?? true; - this.delay = props?.delay ?? 1000; - this.maxDelay = props?.maxDelay ?? 30000; + this.delay = props?.delay ?? (1000 as Time.Milli); + this.maxDelay = props?.maxDelay ?? (30000 as Time.Milli); this.#delay = this.delay; @@ -91,7 +92,7 @@ export class Connection { effect.timer(() => this.#tick.set((prev) => Math.max(prev, tick)), this.#delay); // Exponential backoff. - this.#delay = Math.min(this.#delay * 2, this.maxDelay); + this.#delay = Math.min(this.#delay * 2, this.maxDelay) as Time.Milli; } } }); diff --git a/js/hang/src/frame.test.ts b/js/hang/src/frame.test.ts new file mode 100644 index 000000000..07ad5abfb --- /dev/null +++ b/js/hang/src/frame.test.ts @@ -0,0 +1,28 @@ +import * as Moq from "@kixelated/moq"; +import { describe, expect, it } from "vitest"; +import { Consumer, Producer } from "./frame"; +import * as Time from "./time"; + +describe("Consumer", () => { + it("should close cleanly", async () => { + // Create a broadcast and track + const broadcast = new Moq.BroadcastProducer(); + const track = broadcast.createTrack("test"); + const producer = new Producer(track); + + // Create consumer from the track + const consumer = new Consumer(track.consume()); + + producer.encode(new Uint8Array([1]), Time.Micro.fromMilli(1 as Time.Milli), true); + producer.close(); + + // Close consumer before trying to decode + consumer.close(); + + const frame = await consumer.decode(); + expect(frame).toBeUndefined(); + + // Clean up + broadcast.close(); + }); +}); diff --git a/js/hang/src/frame.ts b/js/hang/src/frame.ts index c3940c1a6..b6fc6a339 100644 --- a/js/hang/src/frame.ts +++ b/js/hang/src/frame.ts @@ -1,12 +1,24 @@ import type * as Moq from "@kixelated/moq"; +import { Effect } from "@kixelated/signals"; +import * as Time from "./time"; export interface Source { byteLength: number; copyTo(buffer: Uint8Array): void; } -export function encode(source: Uint8Array | Source, timestamp: number): Uint8Array { +export interface Frame { + data: Uint8Array; + timestamp: Time.Micro; + keyframe: boolean; + group: number; +} + +export function encode(source: Uint8Array | Source, timestamp: Time.Micro): Uint8Array { + // TODO switch over to u64 for simplicity. The varint uses 8 bytes anyway after 18 minutes lul. + // TODO Don't encode into one buffer. Write the header/payload separately to avoid reallocating. const data = new Uint8Array(8 + (source instanceof Uint8Array ? source.byteLength : source.byteLength)); + const size = setVint53(data, timestamp).byteLength; if (source instanceof Uint8Array) { data.set(source, size); @@ -17,8 +29,9 @@ export function encode(source: Uint8Array | Source, timestamp: number): Uint8Arr } // NOTE: A keyframe is always the first frame in a group, so it's not encoded on the wire. -export function decode(buffer: Uint8Array): { data: Uint8Array; timestamp: number } { - const [timestamp, data] = getVint53(buffer); +export function decode(buffer: Uint8Array): { data: Uint8Array; timestamp: Time.Micro } { + const [us, data] = getVint53(buffer); + const timestamp = us as Time.Micro; return { timestamp, data }; } @@ -30,7 +43,7 @@ export class Producer { this.#track = track; } - encode(data: Uint8Array | Source, timestamp: number, keyframe: boolean) { + encode(data: Uint8Array | Source, timestamp: Time.Micro, keyframe: boolean) { if (keyframe) { this.#group?.close(); this.#group = this.#track.appendGroup(); @@ -41,29 +54,208 @@ export class Producer { this.#group?.writeFrame(encode(data, timestamp)); } - consume(): Consumer { - return new Consumer(this.#track.consume()); - } - close() { this.#track.close(); this.#group?.close(); } } +export interface ConsumerProps { + // Target latency in milliseconds (default: 0) + latency?: Time.Milli; +} + export class Consumer { #track: Moq.TrackConsumer; + #latency: Time.Micro; + #groups: Moq.GroupConsumer[] = []; + #active = 0; // the active group sequence number + #frames: Frame[] = []; + #prev?: Time.Micro; - constructor(track: Moq.TrackConsumer) { + // Wake up the consumer when a new frame is available. + #notify?: () => void; + + #signals = new Effect(); + + constructor(track: Moq.TrackConsumer, props?: ConsumerProps) { this.#track = track; + this.#latency = Time.Micro.fromMilli(props?.latency ?? Time.Milli.zero); + this.#signals.spawn(this.#run.bind(this)); + } + + async #run(cancel: Promise) { + // Start fetching groups in the background + for (;;) { + const group = await Promise.race([this.#track.nextGroup(), cancel]); + if (!group) break; + + if (group.sequence < this.#active) { + console.warn(`skipping old group: ${group.sequence} < ${this.#active}`); + // Skip old groups. + group.close(); + continue; + } + + // Insert into #groups based on the group sequence number (ascending). + // This is used to cancel old groups. + this.#groups.push(group.clone()); + this.#groups.sort((a, b) => a.sequence - b.sequence); + + // Start buffering frames from this group + this.#signals.spawn(this.#runGroup.bind(this, group)); + } + } + + async #runGroup(group: Moq.GroupConsumer, cancel: Promise) { + try { + let keyframe = true; + + for (;;) { + const next = await Promise.race([group.readFrame(), cancel]); + if (!next) break; + + const { data, timestamp } = decode(next); + const frame = { + data, + timestamp, + keyframe, + group: group.sequence, + }; + + keyframe = false; + + // Store frames in buffer in order. + if (this.#frames.length === 0 || frame.timestamp >= this.#frames[this.#frames.length - 1].timestamp) { + // Already sorted; most of the time we just append to the end. + this.#frames.push(frame); + } else { + console.warn( + `frame out of order: ${frame.timestamp} < ${this.#frames[this.#frames.length - 1].timestamp}`, + ); + this.#frames.push(frame); + this.#frames.sort((a, b) => a.timestamp - b.timestamp); + } + + if (this.#frames.at(0)?.group === this.#active) { + if (this.#notify) { + this.#notify(); + this.#notify = undefined; + } + } else { + // Check for latency violations + this.#checkLatency(); + } + } + + group.close(); + } finally { + if (group.sequence === this.#active) { + // Advance to the next group. + // We don't use #skipTo because we don't want to drop the last frames. + this.#active += 1; + + if (this.#notify && this.#frames.at(0)?.group === this.#active) { + this.#notify(); + this.#notify = undefined; + } + } + } + } + + #checkLatency() { + if (this.#frames.length < 2) return; + + // Check if we have at least #latency frames in the queue. + const first = this.#frames[0]; + const last = this.#frames[this.#frames.length - 1]; + + const latency = last.timestamp - first.timestamp; + if (latency < this.#latency) return; + + // Skip to the next group + const nextFrame = this.#frames.find((f) => f.group > this.#active); + if (!nextFrame) return; // Within the same group, ignore for now + + if (this.#prev) { + console.warn(`latency violation: ${Math.round(latency / 1000)}ms buffered`); + console.warn(`skipping ahead: ${Math.round((nextFrame.timestamp - this.#prev) / 1000)}ms`); + } + + this.#skipTo(nextFrame.group); + } + + #skipTo(groupId: number) { + this.#active = groupId; + + // Skip old groups. + while (this.#groups.length > 0 && this.#groups[0].sequence < this.#active) { + this.#groups.shift()?.close(); + } + + // Skip old frames. + let dropped = 0; + while (this.#frames.length > 0 && this.#frames[0].group < this.#active) { + dropped++; + this.#frames.shift(); + } + + if (dropped > 0) { + console.warn(`dropped ${dropped} frames while skipping`); + } + + if (this.#notify && this.#frames.at(0)?.group === this.#active) { + this.#notify(); + this.#notify = undefined; + } } - async decode(): Promise<{ data: Uint8Array; timestamp: number; keyframe: boolean } | undefined> { - const next = await this.#track.nextFrame(); - if (!next) return undefined; + async decode(): Promise { + for (;;) { + // Check if we have frames from the active group + if (this.#frames.length > 0) { + if (this.#frames[0].group === this.#active) { + const next = this.#frames.shift(); + this.#prev = next?.timestamp; + return next; + } + + // We have frames but not from the active group + // Check if we should move to the next group + const nextGroupFrames = this.#frames.filter((f) => f.group > this.#active); + if (nextGroupFrames.length > 0) { + // Move to the next group + const nextGroup = Math.min(...nextGroupFrames.map((f) => f.group)); + this.#skipTo(nextGroup); + continue; + } + } + + if (this.#notify) { + throw new Error("multiple calls to decode not supported"); + } + + const wait = new Promise((resolve) => { + this.#notify = resolve; + }).then(() => true); + + if (!(await Promise.race([wait, this.#signals.closed()]))) { + this.#notify = undefined; + // Consumer was closed while waiting for a new frame. + return undefined; + } + } + } + + close(): void { + this.#signals.close(); + + for (const group of this.#groups) { + group.close(); + } - const { timestamp, data } = decode(next.data); - return { data, timestamp, keyframe: next.frame === 0 }; + this.#groups = []; + this.#frames = []; } } diff --git a/js/hang/src/publish/audio/captions.ts b/js/hang/src/publish/audio/captions.ts index 508355aa0..5a977424f 100644 --- a/js/hang/src/publish/audio/captions.ts +++ b/js/hang/src/publish/audio/captions.ts @@ -2,6 +2,7 @@ import * as Moq from "@kixelated/moq"; import { Effect, Signal } from "@kixelated/signals"; import type * as Catalog from "../../catalog"; import { u8 } from "../../catalog"; +import type * as Time from "../../time"; import type { Audio } from "."; import type { Request, Result } from "./captions-worker"; import CaptureWorklet from "./capture-worklet?worker&url"; @@ -11,7 +12,7 @@ export type CaptionsProps = { transcribe?: boolean; // Captions are cleared after this many milliseconds. (10s default) - ttl?: DOMHighResTimeStamp; + ttl?: Time.Milli; }; export class Captions { @@ -25,13 +26,13 @@ export class Captions { signals = new Effect(); - #ttl: DOMHighResTimeStamp; + #ttl: Time.Milli; #track = new Moq.TrackProducer("captions.txt", 1); constructor(audio: Audio, props?: CaptionsProps) { this.audio = audio; - this.#ttl = props?.ttl ?? 10000; + this.#ttl = props?.ttl ?? (10000 as Time.Milli); this.enabled = Signal.from(props?.enabled ?? false); this.signals.effect(this.#run.bind(this)); diff --git a/js/hang/src/publish/audio/capture-worklet.ts b/js/hang/src/publish/audio/capture-worklet.ts index 81a75d3c7..080f4b68e 100644 --- a/js/hang/src/publish/audio/capture-worklet.ts +++ b/js/hang/src/publish/audio/capture-worklet.ts @@ -1,3 +1,4 @@ +import * as Time from "../../time"; import type { AudioFrame } from "./capture"; class Capture extends AudioWorkletProcessor { @@ -9,8 +10,11 @@ class Capture extends AudioWorkletProcessor { const channels = input[0]; if (channels.length === 0) return true; // TODO: No input hooked up? + // Convert sample count to microseconds + const timestamp = Time.Micro.fromSecond((this.#sampleCount / sampleRate) as Time.Second); + const msg: AudioFrame = { - timestamp: this.#sampleCount, + timestamp, channels, }; diff --git a/js/hang/src/publish/audio/capture.ts b/js/hang/src/publish/audio/capture.ts index b64a3cc17..3e6feb878 100644 --- a/js/hang/src/publish/audio/capture.ts +++ b/js/hang/src/publish/audio/capture.ts @@ -1,4 +1,6 @@ +import type * as Time from "../../time"; + export interface AudioFrame { - timestamp: number; + timestamp: Time.Micro; channels: Float32Array[]; } diff --git a/js/hang/src/publish/audio/index.ts b/js/hang/src/publish/audio/index.ts index f11142985..3f6b98ed9 100644 --- a/js/hang/src/publish/audio/index.ts +++ b/js/hang/src/publish/audio/index.ts @@ -3,6 +3,7 @@ import { Effect, type Getter, Signal } from "@kixelated/signals"; import type * as Catalog from "../../catalog"; import { u8, u53 } from "../../catalog/integers"; import * as Frame from "../../frame"; +import * as Time from "../../time"; import { Captions, type CaptionsProps } from "./captions"; import type * as Capture from "./capture"; @@ -55,7 +56,7 @@ export type AudioProps = { // The size of each group. Larger groups mean fewer drops but the viewer can fall further behind. // NOTE: Each frame is always flushed to the network immediately. - maxLatency?: DOMHighResTimeStamp; + maxLatency?: Time.Milli; }; export class Audio { @@ -66,7 +67,7 @@ export class Audio { volume: Signal; captions: Captions; speaking: Speaking; - maxLatency: DOMHighResTimeStamp; + maxLatency: Time.Milli; source: Signal; @@ -93,7 +94,7 @@ export class Audio { this.captions = new Captions(this, props?.captions); this.muted = Signal.from(props?.muted ?? false); this.volume = Signal.from(props?.volume ?? 1); - this.maxLatency = props?.maxLatency ?? 100; // Default is a group every 100ms + this.maxLatency = props?.maxLatency ?? (100 as Time.Milli); // Default is a group every 100ms this.#signals.effect(this.#runSource.bind(this)); this.#signals.effect(this.#runGain.bind(this)); @@ -180,12 +181,13 @@ export class Audio { numberOfChannels: u53(settings.channelCount), // TODO configurable bitrate: u53(settings.channelCount * 32_000), + // TODO there's a bunch of advanced Opus settings that we should use. }; let group: Moq.GroupProducer = this.#track.appendGroup(); effect.cleanup(() => group.close()); - let groupTimestamp = 0; + let groupTimestamp = 0 as Time.Micro; const encoder = new AudioEncoder({ output: (frame) => { @@ -193,13 +195,13 @@ export class Audio { throw new Error("only key frames are supported"); } - if (frame.timestamp - groupTimestamp >= 1000 * this.maxLatency) { + if (frame.timestamp - groupTimestamp >= Time.Micro.fromMilli(this.maxLatency)) { group.close(); group = this.#track.appendGroup(); - groupTimestamp = frame.timestamp; + groupTimestamp = frame.timestamp as Time.Micro; } - const buffer = Frame.encode(frame, frame.timestamp); + const buffer = Frame.encode(frame, frame.timestamp as Time.Micro); group.writeFrame(buffer); }, error: (err) => { @@ -232,7 +234,7 @@ export class Audio { sampleRate: worklet.context.sampleRate, numberOfFrames: channels[0].length, numberOfChannels: channels.length, - timestamp: (1_000_000 * data.timestamp) / worklet.context.sampleRate, + timestamp: data.timestamp, data: joined, transfer: [joined.buffer], }); diff --git a/js/hang/src/publish/video/index.ts b/js/hang/src/publish/video/index.ts index f36515113..c1fdf4f61 100644 --- a/js/hang/src/publish/video/index.ts +++ b/js/hang/src/publish/video/index.ts @@ -3,6 +3,7 @@ import { Effect, type Getter, Signal } from "@kixelated/signals"; import type * as Catalog from "../../catalog"; import { u8, u53 } from "../../catalog/integers"; import * as Frame from "../../frame"; +import * as Time from "../../time"; import { isFirefox } from "../../util/hacks"; import * as Hex from "../../util/hex"; import { Detection, type DetectionProps } from "./detection"; @@ -13,7 +14,7 @@ export * from "./detection"; export type Source = VideoStreamTrack; // Create a group every 2 seconds -const GOP_DURATION_US = 2 * 1000 * 1000; +const GOP_DURATION = Time.Micro.fromSecond(2 as Time.Second); // Stronger typing for the MediaStreamTrack interface. export interface VideoStreamTrack extends MediaStreamTrack { @@ -104,7 +105,7 @@ export class Video { let group: Moq.GroupProducer | undefined; effect.cleanup(() => group?.close()); - let groupTimestamp = 0; + let groupTimestamp = 0 as Time.Micro; const encoder = new VideoEncoder({ output: (frame: EncodedVideoChunk, metadata?: EncodedVideoChunkMetadata) => { @@ -113,14 +114,14 @@ export class Video { } if (frame.type === "key") { - groupTimestamp = frame.timestamp; + groupTimestamp = frame.timestamp as Time.Micro; group?.close(); group = this.#track.appendGroup(); } else if (!group) { throw new Error("no keyframe"); } - const buffer = Frame.encode(frame, frame.timestamp); + const buffer = Frame.encode(frame, frame.timestamp as Time.Micro); group?.writeFrame(buffer); }, error: (err: Error) => { @@ -147,9 +148,9 @@ export class Video { while (frame) { // Force a keyframe if this is the first frame (no group yet), or GOP elapsed. - const keyFrame = !group || groupTimestamp + GOP_DURATION_US <= frame.timestamp; + const keyFrame = !group || groupTimestamp + GOP_DURATION <= frame.timestamp; if (keyFrame) { - groupTimestamp = frame.timestamp; + groupTimestamp = frame.timestamp as Time.Micro; } this.frame.set((prev) => { diff --git a/js/hang/src/publish/video/polyfill.ts b/js/hang/src/publish/video/polyfill.ts index 135260575..1a054dc4f 100644 --- a/js/hang/src/publish/video/polyfill.ts +++ b/js/hang/src/publish/video/polyfill.ts @@ -1,3 +1,4 @@ +import * as Time from "../../time"; import type { VideoStreamTrack } from "."; // Firefox doesn't support MediaStreamTrackProcessor so we need to use a polyfill. @@ -20,7 +21,7 @@ export function VideoTrackProcessor(track: VideoStreamTrack): ReadableStream requestAnimationFrame(r)); continue; @@ -56,7 +57,7 @@ export function VideoTrackProcessor(track: VideoStreamTrack): ReadableStream (us * 1_000) as Nano, + fromMilli: (ms: Milli): Nano => (ms * 1_000_000) as Nano, + fromSecond: (s: Second): Nano => (s * 1_000_000_000) as Nano, + toMicro: (ns: Nano): Micro => (ns / 1_000) as Micro, + toMilli: (ns: Nano): Milli => (ns / 1_000_000) as Milli, + toSecond: (ns: Nano): Second => (ns / 1_000_000_000) as Second, +} as const; + +export type Micro = number & { readonly _brand: "micro" }; + +export const Micro = { + zero: 0 as Micro, + fromNano: (ns: Nano): Micro => (ns / 1_000) as Micro, + fromMilli: (ms: Milli): Micro => (ms * 1_000) as Micro, + fromSecond: (s: Second): Micro => (s * 1_000_000) as Micro, + toNano: (us: Micro): Nano => (us * 1_000) as Nano, + toMilli: (us: Micro): Milli => (us / 1_000) as Milli, + toSecond: (us: Micro): Second => (us / 1_000_000) as Second, +} as const; + +export type Milli = number & { readonly _brand: "milli" }; + +export const Milli = { + zero: 0 as Milli, + fromNano: (ns: Nano): Milli => (ns / 1_000_000) as Milli, + fromMicro: (us: Micro): Milli => (us / 1_000) as Milli, + fromSecond: (s: Second): Milli => (s * 1_000) as Milli, + toNano: (ms: Milli): Nano => (ms * 1_000_000) as Nano, + toMicro: (ms: Milli): Micro => (ms * 1_000) as Micro, + toSecond: (ms: Milli): Second => (ms / 1_000) as Second, +} as const; + +export type Second = number & { readonly _brand: "second" }; + +export const Second = { + zero: 0 as Second, + fromNano: (ns: Nano): Second => (ns / 1_000_000_000) as Second, + fromMicro: (us: Micro): Second => (us / 1_000_000) as Second, + fromMilli: (ms: Milli): Second => (ms / 1_000) as Second, + toNano: (s: Second): Nano => (s * 1_000_000_000) as Nano, + toMicro: (s: Second): Micro => (s * 1_000_000) as Micro, + toMilli: (s: Second): Milli => (s * 1_000) as Milli, +} as const; diff --git a/js/hang/src/watch/audio/emitter.ts b/js/hang/src/watch/audio/emitter.ts index 427dee191..0f0d4c592 100644 --- a/js/hang/src/watch/audio/emitter.ts +++ b/js/hang/src/watch/audio/emitter.ts @@ -40,13 +40,16 @@ export class AudioEmitter { if (muted) { this.#unmuteVolume = this.volume.peek() || 0.5; this.volume.set(0); - this.source.enabled.set(false); } else { this.volume.set(this.#unmuteVolume); - this.source.enabled.set(true); } }); + this.#signals.effect((effect) => { + const enabled = !effect.get(this.paused) && !effect.get(this.muted); + this.source.enabled.set(enabled); + }); + // Set unmute when the volume is non-zero. this.#signals.effect((effect) => { const volume = effect.get(this.volume); @@ -60,10 +63,17 @@ export class AudioEmitter { const gain = new GainNode(root.context, { gain: effect.get(this.volume) }); root.connect(gain); - gain.connect(root.context.destination); // speakers - effect.cleanup(() => gain.disconnect()); - effect.set(this.#gain, gain); + + effect.effect(() => { + // We only connect/disconnect when enabled to save power. + // Otherwise the worklet keeps running in the background returning 0s. + const enabled = effect.get(this.source.enabled); + if (!enabled) return; + + gain.connect(root.context.destination); // speakers + effect.cleanup(() => gain.disconnect()); + }); }); this.#signals.effect((effect) => { @@ -81,10 +91,6 @@ export class AudioEmitter { gain.gain.exponentialRampToValueAtTime(volume, gain.context.currentTime + FADE_TIME); } }); - - this.#signals.effect((effect) => { - this.source.enabled.set(!effect.get(this.paused)); - }); } close() { diff --git a/js/hang/src/watch/audio/index.ts b/js/hang/src/watch/audio/index.ts index 5e132bcc6..4d9163811 100644 --- a/js/hang/src/watch/audio/index.ts +++ b/js/hang/src/watch/audio/index.ts @@ -2,6 +2,7 @@ import type * as Moq from "@kixelated/moq"; import { Effect, type Getter, Signal } from "@kixelated/signals"; import type * as Catalog from "../../catalog"; import * as Frame from "../../frame"; +import type * as Time from "../../time"; import * as Hex from "../../util/hex"; import type * as Render from "./render"; @@ -10,12 +11,17 @@ export * from "./emitter"; import { Captions, type CaptionsProps } from "./captions"; import { Speaking, type SpeakingProps } from "./speaking"; +// We want some extra overhead to avoid starving the render worklet. +// The default Opus frame duration is 20ms. +// TODO: Put it in the catalog so we don't have to guess. +const JITTER_UNDERHEAD = 25 as Time.Milli; + export type AudioProps = { // Enable to download the audio track. enabled?: boolean | Signal; // The latency hint to use for the AudioContext. - latency?: DOMHighResTimeStamp; + latency?: Time.Milli; // Enable to download the captions track. captions?: CaptionsProps; @@ -35,10 +41,12 @@ export class Audio { enabled: Signal; info = new Signal(undefined); + #context = new Signal(undefined); + readonly context: Getter = this.#context; + // The root of the audio graph, which can be used for custom visualizations. - // You can access the audio context via `root.context`. #worklet = new Signal(undefined); - // Downcast to AudioNode so it matches Publish. + // Downcast to AudioNode so it matches Publish.Audio readonly root = this.#worklet as Getter; #sampleRate = new Signal(undefined); @@ -47,11 +55,8 @@ export class Audio { captions: Captions; speaking: Speaking; - // Not a signal because it updates constantly. - #buffered: DOMHighResTimeStamp = 0; - // Not a signal because I'm lazy. - readonly latency: DOMHighResTimeStamp; + readonly latency: Time.Milli; #signals = new Effect(); @@ -63,7 +68,7 @@ export class Audio { this.broadcast = broadcast; this.catalog = catalog; this.enabled = Signal.from(props?.enabled ?? false); - this.latency = props?.latency ?? 100; // TODO Reduce this once fMP4 stuttering is fixed. + this.latency = props?.latency ?? (100 as Time.Milli); // TODO Reduce this once fMP4 stuttering is fixed. this.captions = new Captions(broadcast, this.info, props?.captions); this.speaking = new Speaking(broadcast, this.info, props?.speaking); @@ -72,13 +77,19 @@ export class Audio { }); this.#signals.effect(this.#runWorklet.bind(this)); + this.#signals.effect(this.#runEnabled.bind(this)); this.#signals.effect(this.#runDecoder.bind(this)); } #runWorklet(effect: Effect): void { - const enabled = effect.get(this.enabled); + // It takes a second or so to initialize the AudioContext/AudioWorklet, so do it even if disabled. + // This is less efficient for video-only playback but makes muting/unmuting instant. + + //const enabled = effect.get(this.enabled); + //if (!enabled) return; + const info = effect.get(this.info); - if (!enabled || !info) return; + if (!info) return; const sampleRate = info.config.sampleRate; const channelCount = info.config.numberOfChannels; @@ -87,9 +98,11 @@ export class Audio { // This way we can process the audio for visualizations. const context = new AudioContext({ - latencyHint: "interactive", + latencyHint: "interactive", // We don't use real-time because of the jitter buffer. sampleRate, }); + effect.set(this.#context, context); + effect.cleanup(() => context.close()); effect.spawn(async () => { @@ -103,28 +116,30 @@ export class Audio { }); effect.cleanup(() => worklet.disconnect()); - // Listen for buffer status updates (optional, for monitoring) - worklet.port.onmessage = (event: MessageEvent) => { - const { type, available } = event.data; - if (type === "status") { - this.#buffered = (1000 * available) / sampleRate; - } - }; - effect.cleanup(() => { - worklet.port.onmessage = null; - }); - - worklet.port.postMessage({ + const init: Render.Init = { type: "init", - sampleRate, - channelCount, + rate: sampleRate, + channels: channelCount, latency: this.latency, - }); + }; + worklet.port.postMessage(init); effect.set(this.#worklet, worklet); }); } + #runEnabled(effect: Effect): void { + const enabled = effect.get(this.enabled); + if (!enabled) return; + + const context = effect.get(this.#context); + if (!context) return; + + context.resume(); + + // NOTE: You should disconnect/reconnect the worklet to save power when disabled. + } + #runDecoder(effect: Effect): void { const enabled = effect.get(this.enabled); if (!enabled) return; @@ -152,29 +167,31 @@ export class Audio { description, }); + // Create consumer with slightly less latency than the render worklet to avoid underflowing. + const consumer = new Frame.Consumer(sub, { + latency: Math.max(this.latency - JITTER_UNDERHEAD, 0) as Time.Milli, + }); + effect.cleanup(() => consumer.close()); + effect.spawn(async (cancel) => { - try { - for (;;) { - const frame = await Promise.race([sub.readFrame(), cancel]); - if (!frame) break; - - const decoded = Frame.decode(frame); - - const chunk = new EncodedAudioChunk({ - type: "key", - data: decoded.data, - timestamp: decoded.timestamp, - }); - - decoder.decode(chunk); - } - } catch (error) { - console.warn("audio subscription error", error); + for (;;) { + const frame = await Promise.race([consumer.decode(), cancel]); + if (!frame) break; + + const chunk = new EncodedAudioChunk({ + type: frame.keyframe ? "key" : "delta", + data: frame.data, + timestamp: frame.timestamp, + }); + + decoder.decode(chunk); } }); } #emit(sample: AudioData) { + const timestamp = sample.timestamp as Time.Micro; + const worklet = this.#worklet.peek(); if (!worklet) { // We're probably in the process of closing. @@ -192,7 +209,7 @@ export class Audio { const msg: Render.Data = { type: "data", data: channelData, - timestamp: sample.timestamp, + timestamp, }; // Send audio data to worklet via postMessage @@ -210,8 +227,4 @@ export class Audio { this.captions.close(); this.speaking.close(); } - - get buffered() { - return this.#buffered; - } } diff --git a/js/hang/src/watch/audio/render-worklet.ts b/js/hang/src/watch/audio/render-worklet.ts index 174f51208..b9db0c955 100644 --- a/js/hang/src/watch/audio/render-worklet.ts +++ b/js/hang/src/watch/audio/render-worklet.ts @@ -1,13 +1,9 @@ -import type { Data, Init, Message, Status } from "./render"; +import type { Message } from "./render"; +import { AudioRingBuffer } from "./ring-buffer"; class Render extends AudioWorkletProcessor { - #buffer: Float32Array[] = []; - - #writeIndex = 0; - #readIndex = 0; - - // Wait for the buffer to be refilled before outputting. - #refill = true; + #buffer?: AudioRingBuffer; + #underflow = 0; constructor() { super(); @@ -16,9 +12,12 @@ class Render extends AudioWorkletProcessor { this.port.onmessage = (event: MessageEvent) => { const { type } = event.data; if (type === "init") { - this.#handleInit(event.data); + console.debug(`init: ${event.data.latency}`); + this.#buffer = new AudioRingBuffer(event.data); + this.#underflow = 0; } else if (type === "data") { - this.#handleData(event.data); + if (!this.#buffer) throw new Error("buffer not initialized"); + this.#buffer.write(event.data.timestamp, event.data.data); } else { const exhaustive: never = type; throw new Error(`unknown message type: ${exhaustive}`); @@ -26,94 +25,19 @@ class Render extends AudioWorkletProcessor { }; } - #handleInit(init: Init) { - // Sanity checks - if (init.channelCount === 0) throw new Error("invalid channels"); - if (init.sampleRate === 0) throw new Error("invalid sample rate"); - if (init.latency === 0) throw new Error("invalid latency"); - - if (this.#buffer.length > 0) return; // Already initialized - - const samples = Math.ceil((init.sampleRate * init.latency) / 1000); - - // Initialize circular buffer for each channel - this.#buffer = []; - for (let i = 0; i < init.channelCount; i++) { - this.#buffer[i] = new Float32Array(samples); - } - } - - #handleData(sample: Data) { - if (this.#buffer.length === 0) throw new Error("not initialized"); - - const samples = sample.data[0].length; - - // Discard old samples from the front to prevent an overflow. - const discard = this.#writeIndex - this.#readIndex + samples - this.#buffer[0].length; - if (discard >= 0) { - this.#refill = false; - this.#readIndex += discard; - } - - // Write new samples to buffer - for (let channel = 0; channel < Math.min(this.#buffer.length, sample.data.length); channel++) { - const src = sample.data[channel]; - const dst = this.#buffer[channel]; - - for (let i = 0; i < samples; i++) { - const writePos = (this.#writeIndex + i) % dst.length; - dst[writePos] = src[i]; - } - } - - this.#writeIndex += samples; - } - - #advance(samples: number) { - this.#readIndex += samples; - - if (this.#readIndex >= this.#buffer[0].length) { - this.#readIndex -= this.#buffer[0].length; - this.#writeIndex -= this.#buffer[0].length; - } - } - process(_inputs: Float32Array[][], outputs: Float32Array[][], _parameters: Record) { const output = outputs[0]; + const samplesRead = this.#buffer?.read(output) ?? 0; - // Not initialized yet, output silence - if (this.#buffer.length === 0 || output.length === 0) return true; - if (this.#refill) return true; - - // No data available, output silence - const samples = Math.min(this.#writeIndex - this.#readIndex, output[0].length); - if (samples === 0) return true; - - for (let channel = 0; channel < output.length; channel++) { - const dst = output[channel]; - const src = this.#buffer[channel]; - - for (let i = 0; i < samples; i++) { - const readPos = (this.#readIndex + i) % src.length; - dst[i] = src[readPos]; - } + if (samplesRead < output[0].length) { + this.#underflow += output[0].length - samplesRead; + } else if (this.#underflow > 0 && this.#buffer) { + console.warn(`audio underflow: ${Math.round((1000 * this.#underflow) / this.#buffer.rate)}ms`); + this.#underflow = 0; } - this.#advance(samples); - - // Send buffer status back to main thread for monitoring - this.post({ - type: "status", - available: this.#writeIndex - this.#readIndex, - utilization: (this.#writeIndex - this.#readIndex) / this.#buffer[0].length, - }); - return true; } - - private post(status: Status) { - this.port.postMessage(status); - } } registerProcessor("render", Render); diff --git a/js/hang/src/watch/audio/render.ts b/js/hang/src/watch/audio/render.ts index 14845497b..a2123fe3d 100644 --- a/js/hang/src/watch/audio/render.ts +++ b/js/hang/src/watch/audio/render.ts @@ -1,20 +1,16 @@ +import type * as Time from "../../time"; + export type Message = Init | Data; export interface Data { type: "data"; data: Float32Array[]; - timestamp: number; + timestamp: Time.Micro; } export interface Init { type: "init"; - sampleRate: number; - channelCount: number; - latency: DOMHighResTimeStamp; -} - -export interface Status { - type: "status"; - available: number; - utilization: number; + rate: number; + channels: number; + latency: Time.Milli; } diff --git a/js/hang/src/watch/audio/ring-buffer.test.ts b/js/hang/src/watch/audio/ring-buffer.test.ts new file mode 100644 index 000000000..6faf92187 --- /dev/null +++ b/js/hang/src/watch/audio/ring-buffer.test.ts @@ -0,0 +1,388 @@ +import { describe, expect, it } from "vitest"; +import * as Time from "../../time"; +import { AudioRingBuffer } from "./ring-buffer"; + +function read(buffer: AudioRingBuffer, samples: number, channelCount = 2): Float32Array[] { + const output: Float32Array[] = []; + for (let i = 0; i < channelCount; i++) { + output.push(new Float32Array(samples)); + } + const samplesRead = buffer.read(output); + // Return the output arrays with only the read samples + if (samplesRead < samples) { + return output.map((channel) => channel.slice(0, samplesRead)); + } + return output; +} + +function write( + buffer: AudioRingBuffer, + timestamp: Time.Milli, + samples: number, + props?: { channels?: number; value?: number }, +): void { + const channelCount = props?.channels ?? buffer.channels; + const data: Float32Array[] = []; + for (let i = 0; i < channelCount; i++) { + const channel = new Float32Array(samples); + channel.fill(props?.value ?? 1.0); + data.push(channel); + } + buffer.write(Time.Micro.fromMilli(timestamp), data); +} + +describe("initialization", () => { + it("should initialize with valid parameters", () => { + const buffer = new AudioRingBuffer({ rate: 48000, channels: 2, latency: 100 as Time.Milli }); + + expect(buffer.capacity).toBe(4800); // 48000 * 0.1 + expect(buffer.length).toBe(0); + }); + + it("should throw on invalid channel count", () => { + expect(() => new AudioRingBuffer({ rate: 48000, channels: 0, latency: 100 as Time.Milli })).toThrow( + /invalid channels/, + ); + }); + + it("should throw on invalid sample rate", () => { + expect(() => new AudioRingBuffer({ rate: 0, channels: 2, latency: 100 as Time.Milli })).toThrow( + /invalid sample rate/, + ); + }); + + it("should throw on invalid latency", () => { + expect(() => new AudioRingBuffer({ rate: 48000, channels: 2, latency: 0 as Time.Milli })).toThrow( + /invalid latency/, + ); + }); +}); + +describe("writing data", () => { + it("should write continuous data", () => { + const buffer = new AudioRingBuffer({ rate: 1000, channels: 2, latency: 100 as Time.Milli }); + + // Write 10 samples at timestamp 0 + write(buffer, 0 as Time.Milli, 10, { channels: 2, value: 1.0 }); + expect(buffer.length).toBe(10); + + // Write 10 more samples at timestamp 10ms + write(buffer, 10 as Time.Milli, 10, { channels: 2, value: 2.0 }); + expect(buffer.length).toBe(20); + }); + + it("should handle gaps by filling with zeros", () => { + const buffer = new AudioRingBuffer({ rate: 1000, channels: 2, latency: 100 as Time.Milli }); // 100 samples buffer + + // Write at timestamp 0 + write(buffer, 0 as Time.Milli, 10, { channels: 2, value: 1.0 }); + + // Write at timestamp 20ms (sample 20), creating a 10 sample gap + write(buffer, 20 as Time.Milli, 10, { channels: 2, value: 2.0 }); + + // Should have filled the gap with zeros + expect(buffer.length).toBe(30); // 10 + 10 (gap) + 10 + + // Exit refill mode by filling buffer + write(buffer, 30 as Time.Milli, 70, { channels: 2, value: 0.0 }); + expect(buffer.refilling).toBe(false); + + // Read and verify the gap was filled with zeros + const output = read(buffer, 30, 2); + expect(output[0].length).toBe(30); + + // First 10 samples should be 1.0 + for (let i = 0; i < 10; i++) { + expect(output[0][i]).toBe(1.0); + expect(output[1][i]).toBe(1.0); + } + // Next 10 samples should be 0 (the gap) + for (let i = 10; i < 20; i++) { + expect(output[0][i]).toBe(0); + expect(output[1][i]).toBe(0); + } + // Last 10 samples should be 2.0 + for (let i = 20; i < 30; i++) { + expect(output[0][i]).toBe(2.0); + expect(output[1][i]).toBe(2.0); + } + }); + + it("should handle late-arriving samples (out-of-order writes)", () => { + const buffer = new AudioRingBuffer({ rate: 1000, channels: 1, latency: 100 as Time.Milli }); + + // Fill buffer to exit refill mode + write(buffer, 0 as Time.Milli, 100, { channels: 1, value: 0.0 }); + expect(buffer.refilling).toBe(false); + + // Read 50 samples to advance read pointer to 50 + read(buffer, 50, 1); + + // Write at timestamp 120ms (sample 120) - this creates a gap from 100-120 + write(buffer, 120 as Time.Milli, 10, { channels: 1, value: 1.0 }); + + // Now write data that fills part of the gap at timestamp 110ms (sample 110) + // This should work because readIndex is at 50, so sample 110 is still ahead + write(buffer, 110 as Time.Milli, 10, { channels: 1, value: 2.0 }); + + // We should have samples from 50-99 (original), gap 100-109 (zeros), 110-119 (2.0), 120-129 (1.0) + expect(buffer.length).toBe(80); // 130 - 50 + + // Skip the old samples and gap + read(buffer, 60, 1); // Read samples 50-109 + + // Read and verify the out-of-order writes + const output = read(buffer, 20, 1); + expect(output[0].length).toBe(20); + + // First 10 samples should be 2.0 (the late-arriving data at 110-119) + for (let i = 0; i < 10; i++) { + expect(output[0][i]).toBe(2.0); + } + // Next 10 samples should be 1.0 (the earlier write at 120-129) + for (let i = 10; i < 20; i++) { + expect(output[0][i]).toBe(1.0); + } + }); + + it("should discard samples that are too old", () => { + const buffer = new AudioRingBuffer({ rate: 1000, channels: 2, latency: 100 as Time.Milli }); + + // Exit refill mode by filling buffer + write(buffer, 0 as Time.Milli, 100, { channels: 2, value: 0.0 }); + expect(buffer.refilling).toBe(false); + + // Read 60 samples, readIndex now at 60 + read(buffer, 60, 2); + + // Write 50 new samples at timestamp 100 + write(buffer, 100 as Time.Milli, 50, { channels: 2, value: 1.0 }); + expect(buffer.length).toBe(90); // 150 - 60 + + // Read 10 more samples, readIndex now at 70 + read(buffer, 10, 2); + expect(buffer.length).toBe(80); // 150 - 70 + + // Try to write data that's before the read index (at sample 50, which is before 70) + write(buffer, 50 as Time.Milli, 5, { channels: 2, value: 2.0 }); // These should be ignored + + // Available should still be 80 (unchanged because old samples were discarded) + expect(buffer.length).toBe(80); + }); + + it("should not throw when writing to buffer", () => { + const buffer = new AudioRingBuffer({ rate: 1000, channels: 2, latency: 100 as Time.Milli }); + // The current implementation doesn't require initialization + expect(() => write(buffer, 0 as Time.Milli, 10, { channels: 2, value: 0.0 })).not.toThrow(); + }); +}); + +describe("reading data", () => { + it("should read available data", () => { + const buffer = new AudioRingBuffer({ rate: 1000, channels: 2, latency: 100 as Time.Milli }); + + // Exit refill mode by filling the buffer + write(buffer, 0 as Time.Milli, 100, { channels: 2, value: 0.0 }); + // Buffer should now be out of refill mode + expect(buffer.refilling).toBe(false); + + // Read some samples to make room (readIndex at 80) + read(buffer, 80, 2); + expect(buffer.length).toBe(20); // 100 - 80 + + // Write 20 samples at the current position + write(buffer, 100 as Time.Milli, 20, { channels: 2, value: 1.5 }); + expect(buffer.length).toBe(40); // 120 - 80 + + // First read the remaining old samples (80-99) + const output1 = read(buffer, 20, 2); + expect(output1[0].length).toBe(20); + for (let channel = 0; channel < 2; channel++) { + for (let i = 0; i < 20; i++) { + expect(output1[channel][i]).toBe(0.0); + } + } + + // Now read the new samples (100-109) + const output2 = read(buffer, 10, 2); + expect(output2[0].length).toBe(10); + expect(buffer.length).toBe(10); // 120 - 110 + + // Verify the new data + for (let channel = 0; channel < 2; channel++) { + for (let i = 0; i < 10; i++) { + expect(output2[channel][i]).toBe(1.5); + } + } + }); + + it("should handle partial reads", () => { + const buffer = new AudioRingBuffer({ rate: 1000, channels: 2, latency: 100 as Time.Milli }); + + // Exit refill mode by filling the buffer + write(buffer, 0 as Time.Milli, 100, { channels: 2, value: 0.0 }); + expect(buffer.refilling).toBe(false); + + // Read some to make room (readIndex at 80) + read(buffer, 80, 2); + + // Write 20 samples + write(buffer, 100 as Time.Milli, 20, { channels: 2, value: 1.0 }); + expect(buffer.length).toBe(40); // 120 - 80 + + // Try to read 50 samples (only 40 available) + const output = read(buffer, 50, 2); + + expect(output[0].length).toBe(40); + expect(buffer.length).toBe(0); + }); + + it("should return 0 when no data available", () => { + const buffer = new AudioRingBuffer({ rate: 1000, channels: 2, latency: 100 as Time.Milli }); + + const output = read(buffer, 10, 2); + expect(output[0].length).toBe(0); + }); + + it("should return 0 when not initialized", () => { + const buffer = new AudioRingBuffer({ rate: 1000, channels: 2, latency: 100 as Time.Milli }); + const output = read(buffer, 10, 2); + expect(output[0].length).toBe(0); + }); +}); + +describe("refill behavior", () => { + it("should start in refill mode", () => { + const buffer = new AudioRingBuffer({ rate: 1000, channels: 2, latency: 100 as Time.Milli }); + expect(buffer.refilling).toBe(true); + + // Should not output anything in refill mode + write(buffer, 0 as Time.Milli, 50, { channels: 2, value: 1.0 }); + const output = read(buffer, 10, 2); + expect(output[0].length).toBe(0); + }); + + it("should exit refill mode when buffer is full", () => { + const buffer = new AudioRingBuffer({ rate: 1000, channels: 2, latency: 100 as Time.Milli }); + + // Fill the buffer completely + write(buffer, 0 as Time.Milli, 100, { channels: 2, value: 1.0 }); + + // Write more data to trigger overflow handling + write(buffer, 10 as Time.Milli, 50, { channels: 2, value: 2.0 }); // This should exit refill mode + + expect(buffer.refilling).toBe(false); + + // Now we should be able to read + const output = read(buffer, 10, 2); + expect(output[0].length).toBe(10); + }); +}); + +describe("ring buffer wrapping", () => { + it("should wrap around when buffer is full", () => { + const buffer = new AudioRingBuffer({ rate: 1000, channels: 1, latency: 100 as Time.Milli }); + + // Fill the buffer + write(buffer, 0 as Time.Milli, 100, { channels: 1, value: 1.0 }); + expect(buffer.refilling).toBe(false); + + // Read 50 samples to make room (readIndex at 50) + const output1 = read(buffer, 50, 1); + expect(output1[0].length).toBe(50); + + // Write 50 more samples at timestamp 100 (fills from sample 100-149) + write(buffer, 100 as Time.Milli, 50, { channels: 1, value: 2.0 }); + + // Now we have 100 samples available (50-149) + expect(buffer.length).toBe(100); + + // Write 50 more samples at timestamp 150, this will wrap around + write(buffer, 150 as Time.Milli, 50, { channels: 1, value: 3.0 }); + + // Should still have 100 samples (buffer is at capacity) + expect(buffer.length).toBe(100); + + // Read all 100 samples + const output2 = read(buffer, 100, 1); + expect(output2[0].length).toBe(100); + + // First 50 should be 2.0, next 50 should be 3.0 + for (let i = 0; i < 50; i++) { + expect(output2[0][i]).toBe(2.0); + } + for (let i = 50; i < 100; i++) { + expect(output2[0][i]).toBe(3.0); + } + }); +}); + +describe("multi-channel handling", () => { + it("should handle stereo data correctly", () => { + const buffer = new AudioRingBuffer({ rate: 1000, channels: 2, latency: 100 as Time.Milli }); + + // Exit refill mode by filling buffer + write(buffer, 0 as Time.Milli, 100, { channels: 2, value: 0.5 }); + expect(buffer.refilling).toBe(false); + + // Read some to make room + read(buffer, 80, 2); + + // Write stereo data with same value for both channels + write(buffer, 100 as Time.Milli, 20, { channels: 2, value: 1.5 }); + + // Read and verify + const output = read(buffer, 20, 2); + expect(output[0].length).toBe(20); + expect(output[1].length).toBe(20); + + // Both channels should have the same data + for (let i = 0; i < 20; i++) { + expect(output[0][i]).toBe(0.5); + expect(output[1][i]).toBe(0.5); + } + + // Read the next batch + const output2 = read(buffer, 20, 2); + for (let i = 0; i < 20; i++) { + expect(output2[0][i]).toBe(1.5); + expect(output2[1][i]).toBe(1.5); + } + }); +}); + +describe("edge cases", () => { + it("should throw when output array has wrong channel count", () => { + const buffer = new AudioRingBuffer({ rate: 1000, channels: 2, latency: 100 as Time.Milli }); + write(buffer, 0 as Time.Milli, 50, { channels: 2, value: 1.0 }); + + const output: Float32Array[] = []; + // Current implementation throws when channel count doesn't match + expect(() => buffer.read(output)).toThrow(/wrong number of channels/); + }); + + it("should handle zero-length output buffers", () => { + const buffer = new AudioRingBuffer({ rate: 1000, channels: 2, latency: 100 as Time.Milli }); + write(buffer, 0 as Time.Milli, 50, { channels: 2, value: 1.0 }); + + const output = [new Float32Array(0), new Float32Array(0)]; + const samplesRead = buffer.read(output); + expect(samplesRead).toBe(0); + }); + + it("should handle fractional timestamps", () => { + const buffer = new AudioRingBuffer({ rate: 1000, channels: 2, latency: 100 as Time.Milli }); + + // Exit refill mode first + write(buffer, 0 as Time.Milli, 100, { channels: 2, value: 0.0 }); + write(buffer, 10 as Time.Milli, 10, { channels: 2, value: 0.0 }); // This exits refill mode + read(buffer, 110, 2); + + // Write with fractional timestamp that rounds + write(buffer, 1105 as Time.Milli, 10, { channels: 2, value: 1.0 }); // 110.5 samples, rounds to 111 + write(buffer, 1204 as Time.Milli, 10, { channels: 2, value: 2.0 }); // 120.4 samples, rounds to 120 + + const output = read(buffer, 20, 2); + expect(output[0].length).toBeGreaterThan(0); + }); +}); diff --git a/js/hang/src/watch/audio/ring-buffer.ts b/js/hang/src/watch/audio/ring-buffer.ts new file mode 100644 index 000000000..dc0aa48da --- /dev/null +++ b/js/hang/src/watch/audio/ring-buffer.ts @@ -0,0 +1,126 @@ +import * as Time from "../../time"; + +export class AudioRingBuffer { + #buffer: Float32Array[]; + #writeIndex = 0; + #readIndex = 0; + + readonly rate: number; + readonly channels: number; + #refill = true; + + constructor(props: { rate: number; channels: number; latency: Time.Milli }) { + if (props.channels <= 0) throw new Error("invalid channels"); + if (props.rate <= 0) throw new Error("invalid sample rate"); + if (props.latency <= 0) throw new Error("invalid latency"); + + const samples = Math.ceil(props.rate * Time.Second.fromMilli(props.latency)); + if (samples === 0) throw new Error("empty buffer"); + + this.rate = props.rate; + this.channels = props.channels; + + this.#buffer = []; + for (let i = 0; i < this.channels; i++) { + this.#buffer[i] = new Float32Array(samples); + } + } + + get refilling(): boolean { + return this.#refill; + } + + get length(): number { + return this.#writeIndex - this.#readIndex; + } + + get capacity(): number { + return this.#buffer[0]?.length; + } + + write(timestamp: Time.Micro, data: Float32Array[]): void { + if (data.length !== this.channels) throw new Error("wrong number of channels"); + + let start = Math.round(Time.Second.fromMicro(timestamp) * this.rate); + let samples = data[0].length; + + // Ignore samples that are too old (before the read index) + let offset = this.#readIndex - start; + if (offset > samples) { + // All samples are too old, ignore them + return; + } else if (offset > 0) { + // Some samples are too old, skip them + samples -= offset; + start += offset; + } else { + offset = 0; + } + + const end = start + samples; + + // Check if we need to discard old samples to prevent overflow + const overflow = end - this.#readIndex - this.#buffer[0].length; + if (overflow >= 0) { + // Discard old samples and exit refill mode + this.#refill = false; + this.#readIndex += overflow; + } + + // Fill gaps with zeros if there's a discontinuity + if (start > this.#writeIndex) { + const gapSize = Math.min(start - this.#writeIndex, this.#buffer[0].length); + if (gapSize === 1) { + console.warn("floating point inaccuracy detected"); + } + + for (let channel = 0; channel < this.channels; channel++) { + const dst = this.#buffer[channel]; + for (let i = 0; i < gapSize; i++) { + const writePos = (this.#writeIndex + i) % dst.length; + dst[writePos] = 0; + } + } + } + + // Write the actual samples + for (let channel = 0; channel < this.channels; channel++) { + const src = data[channel]; + const dst = this.#buffer[channel]; + if (src.length !== samples) throw new Error("mismatching number of samples"); + + for (let i = 0; i < samples; i++) { + const writePos = (start + i) % dst.length; + dst[writePos] = src[offset + i]; + } + } + + // Update write index, but only if we're moving forward + if (end > this.#writeIndex) { + this.#writeIndex = end; + } + } + + read(output: Float32Array[]): number { + if (output.length !== this.channels) throw new Error("wrong number of channels"); + if (this.#refill) return 0; + + const samples = Math.min(this.#writeIndex - this.#readIndex, output[0].length); + if (samples === 0) return 0; + + for (let channel = 0; channel < this.channels; channel++) { + const dst = output[channel]; + const src = this.#buffer[channel]; + + if (dst.length !== output[0].length) throw new Error("mismatching number of samples"); + + for (let i = 0; i < samples; i++) { + const readPos = (this.#readIndex + i) % src.length; + dst[i] = src[readPos]; + } + } + + this.#readIndex += samples; + return samples; + } +} diff --git a/js/moq/src/group.ts b/js/moq/src/group.ts index da28d71a7..1e2b8de24 100644 --- a/js/moq/src/group.ts +++ b/js/moq/src/group.ts @@ -77,7 +77,7 @@ export class GroupProducer { */ export class GroupConsumer { /** The unique identifier for this reader */ - readonly id: number; + readonly sequence: number; #frames: WatchConsumer; #index = 0; @@ -90,7 +90,7 @@ export class GroupConsumer { * @internal */ constructor(frames: WatchConsumer, id: number) { - this.id = id; + this.sequence = id; this.#frames = frames; } @@ -138,7 +138,7 @@ export class GroupConsumer { * @returns A new GroupConsumer instance */ clone(): GroupConsumer { - return new GroupConsumer(this.#frames.clone(), this.id); + return new GroupConsumer(this.#frames.clone(), this.sequence); } get index() { diff --git a/js/moq/src/ietf/publisher.ts b/js/moq/src/ietf/publisher.ts index 63fdfd875..73adeb1d1 100644 --- a/js/moq/src/ietf/publisher.ts +++ b/js/moq/src/ietf/publisher.ts @@ -151,7 +151,7 @@ export class Publisher { const header = new Group( subscribeId, trackAlias, - group.id, + group.sequence, 0, // publisherPriority ); await header.encode(stream); diff --git a/js/moq/src/lite/publisher.ts b/js/moq/src/lite/publisher.ts index 9854c1d4a..8f9302d4f 100644 --- a/js/moq/src/lite/publisher.ts +++ b/js/moq/src/lite/publisher.ts @@ -201,7 +201,7 @@ export class Publisher { * @internal */ async #runGroup(sub: bigint, group: GroupConsumer) { - const msg = new Group(sub, group.id); + const msg = new Group(sub, group.sequence); try { const stream = await Writer.open(this.#quic); await stream.u8(0); diff --git a/js/moq/src/lite/subscriber.ts b/js/moq/src/lite/subscriber.ts index 735206df2..28bb77648 100644 --- a/js/moq/src/lite/subscriber.ts +++ b/js/moq/src/lite/subscriber.ts @@ -154,7 +154,10 @@ export class Subscriber { async runGroup(group: Group, stream: Reader) { const subscribe = this.#subscribes.get(group.subscribe); if (!subscribe) { - console.warn(`unknown subscription: id=${group.subscribe}`); + if (group.subscribe >= this.#subscribeNext) { + throw new Error(`unknown subscription: id=${group.subscribe}`); + } + return; } diff --git a/js/moq/src/track.test.ts b/js/moq/src/track.test.ts index 07220739f..c54314173 100644 --- a/js/moq/src/track.test.ts +++ b/js/moq/src/track.test.ts @@ -18,9 +18,9 @@ test("track clone", async () => { const group1B = await consumerB.nextGroup(); const group1C = await consumerC.nextGroup(); - assert.strictEqual(group1A?.id, group1.id); - assert.strictEqual(group1B?.id, group1.id); - assert.strictEqual(group1C?.id, group1.id); + assert.strictEqual(group1A?.sequence, group1.id); + assert.strictEqual(group1B?.sequence, group1.id); + assert.strictEqual(group1C?.sequence, group1.id); // Append a new group, everybody gets it const group2 = producer.appendGroup(); @@ -29,16 +29,16 @@ test("track clone", async () => { const group2B = await consumerB.nextGroup(); const group2C = await consumerC.nextGroup(); - assert.strictEqual(group2A?.id, group2.id); - assert.strictEqual(group2B?.id, group2.id); - assert.strictEqual(group2C?.id, group2.id); + assert.strictEqual(group2A?.sequence, group2.id); + assert.strictEqual(group2B?.sequence, group2.id); + assert.strictEqual(group2C?.sequence, group2.id); // Clone the reader after we appended that group. // This new reader gets the most recent group but that's it. const consumerD = consumerA.clone(); const group2D = await consumerD.nextGroup(); - assert.strictEqual(group2D?.id, group2.id); + assert.strictEqual(group2D?.sequence, group2.id); // Everybody gets the new group const group3 = producer.appendGroup(); @@ -48,10 +48,10 @@ test("track clone", async () => { const group3C = await consumerC.nextGroup(); const group3D = await consumerD.nextGroup(); - assert.strictEqual(group3A?.id, group3.id); - assert.strictEqual(group3B?.id, group3.id); - assert.strictEqual(group3C?.id, group3.id); - assert.strictEqual(group3D?.id, group3.id); + assert.strictEqual(group3A?.sequence, group3.id); + assert.strictEqual(group3B?.sequence, group3.id); + assert.strictEqual(group3C?.sequence, group3.id); + assert.strictEqual(group3D?.sequence, group3.id); // It's okay to close readers. consumerA.close(); @@ -64,14 +64,14 @@ test("track clone", async () => { const group4C = await consumerC.nextGroup(); const group4D = await consumerD.nextGroup(); - assert.strictEqual(group4A?.id, undefined); - assert.strictEqual(group4B?.id, undefined); - assert.strictEqual(group4C?.id, group4.id); - assert.strictEqual(group4D?.id, group4.id); + assert.strictEqual(group4A?.sequence, undefined); + assert.strictEqual(group4B?.sequence, undefined); + assert.strictEqual(group4C?.sequence, group4.id); + assert.strictEqual(group4D?.sequence, group4.id); const consumerE = consumerC.clone(); const group4E = await consumerE.nextGroup(); - assert.strictEqual(group4E?.id, group4.id); + assert.strictEqual(group4E?.sequence, group4.id); }); test("track group cloning", async () => { @@ -88,8 +88,8 @@ test("track group cloning", async () => { const groupA = await consumerA.nextGroup(); const groupB = await consumerB.nextGroup(); - assert.strictEqual(groupA?.id, group.id); - assert.strictEqual(groupB?.id, group.id); + assert.strictEqual(groupA?.sequence, group.id); + assert.strictEqual(groupB?.sequence, group.id); const frame1A = await groupA.readFrame(); const frame1B = await groupB.readFrame(); diff --git a/js/moq/src/track.ts b/js/moq/src/track.ts index 4238f567b..fbd919109 100644 --- a/js/moq/src/track.ts +++ b/js/moq/src/track.ts @@ -49,12 +49,12 @@ export class TrackProducer { * @param group - The group to insert */ insertGroup(group: GroupConsumer) { - if (group.id < (this.#next ?? 0)) { + if (group.sequence < (this.#next ?? 0)) { group.close(); return; } - this.#next = group.id + 1; + this.#next = group.sequence + 1; this.#latest.update((latest) => { latest?.close(); return group; @@ -210,12 +210,12 @@ export class TrackConsumer { this.#nextFrame = this.#currentGroup?.readFrame(); // Return the frame and increment the frame index. - return { group: this.#currentGroup?.id, frame: this.#currentFrame++, data: next.frame }; + return { group: this.#currentGroup?.sequence, frame: this.#currentFrame++, data: next.frame }; } this.#nextGroup = this.#groups.next((group) => !!group).then((group) => group?.clone()); - if (this.#currentGroup && this.#currentGroup.id >= next.group.id) { + if (this.#currentGroup && this.#currentGroup.sequence >= next.group.sequence) { // Skip this old group. next.group.close(); continue; diff --git a/js/pnpm-lock.yaml b/js/pnpm-lock.yaml index 93e0a08bd..5e957ead2 100644 --- a/js/pnpm-lock.yaml +++ b/js/pnpm-lock.yaml @@ -67,6 +67,9 @@ importers: fast-glob: specifier: ^3.3.3 version: 3.3.3 + vitest: + specifier: ^3.2.4 + version: 3.2.4(@types/node@24.3.0)(jiti@2.5.1)(lightningcss@1.30.1)(terser@5.43.1)(tsx@4.20.5) hang-demo: dependencies: @@ -902,6 +905,12 @@ packages: '@types/audioworklet@0.0.77': resolution: {integrity: sha512-aPQ0DurtnDRWO3qvu8iK1R0aDlKS7sDOWx7MVhMWkGTmlyfnf50GPKYJlKdq8UP159ntPiqPJ5XZ/2v1R0V0Bw==} + '@types/chai@5.2.2': + resolution: {integrity: sha512-8kB30R7Hwqf40JPiKhVzodJs2Qc1ZJ5zuT3uzw5Hq/dhNCl3G3l83jfpdI1e20BP348+fV7VIL/+FxaXkqBmWg==} + + '@types/deep-eql@4.0.2': + resolution: {integrity: sha512-c9h9dVVMigMPc4bwTvC5dxqtqJZwQPePsWjPlpSOnojbor6pGqdk541lfA7AqFQr5pB1BRdq0juY9db81BwyFw==} + '@types/deno@2.3.0': resolution: {integrity: sha512-/4SyefQpKjwNKGkq9qG3Ln7MazfbWKvydyVFBnXzP5OQA4u1paoFtaOe1iHKycIWHHkhYag0lPxyheThV1ijzw==} @@ -917,6 +926,35 @@ packages: '@types/web@0.0.241': resolution: {integrity: sha512-aHz73musjS4z8Sm0wLaEj4lOaRxhKZRD4XerKL4rxNhhqmvthKkYzS0kHLEVsjOFRN2zo3gQAlyf0EKL1QYYQg==} + '@vitest/expect@3.2.4': + resolution: {integrity: sha512-Io0yyORnB6sikFlt8QW5K7slY4OjqNX9jmJQ02QDda8lyM6B5oNgVWoSoKPac8/kgnCUzuHQKrSLtu/uOqqrig==} + + '@vitest/mocker@3.2.4': + resolution: {integrity: sha512-46ryTE9RZO/rfDd7pEqFl7etuyzekzEhUbTW3BvmeO/BcCMEgq59BKhek3dXDWgAj4oMK6OZi+vRr1wPW6qjEQ==} + peerDependencies: + msw: ^2.4.9 + vite: ^5.0.0 || ^6.0.0 || ^7.0.0-0 + peerDependenciesMeta: + msw: + optional: true + vite: + optional: true + + '@vitest/pretty-format@3.2.4': + resolution: {integrity: sha512-IVNZik8IVRJRTr9fxlitMKeJeXFFFN0JaB9PHPGQ8NKQbGpfjlTx9zO4RefN8gp7eqjNy8nyK3NZmBzOPeIxtA==} + + '@vitest/runner@3.2.4': + resolution: {integrity: sha512-oukfKT9Mk41LreEW09vt45f8wx7DordoWUZMYdY/cyAk7w5TWkTRCNZYF7sX7n2wB7jyGAl74OxgwhPgKaqDMQ==} + + '@vitest/snapshot@3.2.4': + resolution: {integrity: sha512-dEYtS7qQP2CjU27QBC5oUOxLE/v5eLkGqPE0ZKEIDGMs4vKWe7IjgLOeauHsR0D5YuuycGRO5oSRXnwnmA78fQ==} + + '@vitest/spy@3.2.4': + resolution: {integrity: sha512-vAfasCOe6AIK70iP5UD11Ac4siNUNJ9i/9PZ3NKx07sG6sUxeag1LWdNrMWeKKYBLlzuK+Gn65Yd5nyL6ds+nw==} + + '@vitest/utils@3.2.4': + resolution: {integrity: sha512-fB2V0JFrQSMsCo9HiSq3Ezpdv4iYaXRG1Sx8edX3MwxfyNn83mKiGzOcH+Fkxt4MHxr3y42fQi1oeAInqgX2QA==} + acorn@8.15.0: resolution: {integrity: sha512-NZyJarBfL7nWwIq+FDL6Zp/yHEhePMNnnJ0y3qfieCrmNvYct8uvtiV41UvlSe6apAfk0fY1FbWx+NwfmpvtTg==} engines: {node: '>=0.4.0'} @@ -949,6 +987,10 @@ packages: resolution: {integrity: sha512-tLkvA81vQG/XqE2mjDkGQHoOINtMHtysSnemrmoGe6PydDPMRbVugqyk4A6V/WDWEfm3l+0d8anA9r8cv/5Jaw==} engines: {node: '>=12'} + assertion-error@2.0.1: + resolution: {integrity: sha512-Izi8RQcffqCeNVgFigKli1ssklIbpHnCYc6AknXGYoB6grJqyeby7jv12JUQgmTAnIDnbck1uxksT4dzN3PWBA==} + engines: {node: '>=12'} + async-mutex@0.5.0: resolution: {integrity: sha512-1A94B18jkJ3DYq284ohPxoXbfTA5HsQ7/Mf4DEhcyLx3Bz27Rh59iScbB6EPiP+B+joue6YCxcMXSbFC1tZKwA==} @@ -977,13 +1019,25 @@ packages: buffer-from@1.1.2: resolution: {integrity: sha512-E+XQCRwSbaaiChtv6k6Dwgc+bx+Bs6vuKJHHl5kox/BaKbhiXzqQOwK4cO22yElGp2OCmjwVhT3HmxgyPGnJfQ==} + cac@6.7.14: + resolution: {integrity: sha512-b6Ilus+c3RrdDk+JhLKUAQfzzgLEPy6wcXqS7f/xe1EETvsDP6GORG7SFuOs6cID5YkqchW/LXZbX5bc8j7ZcQ==} + engines: {node: '>=8'} + camel-case@4.1.2: resolution: {integrity: sha512-gxGWBrTT1JuMx6R+o5PTXMmUnhnVzLQ9SNutD4YqKtI6ap897t3tKECYla6gCWEkplXnlNybEkZg9GEGxKFCgw==} + chai@5.3.3: + resolution: {integrity: sha512-4zNhdJD/iOjSH0A05ea+Ke6MU5mmpQcbQsSOkgdaUMJ9zTlDTD/GYlwohmIE2u0gaxHYiVHEn1Fw9mZ/ktJWgw==} + engines: {node: '>=18'} + chalk@4.1.2: resolution: {integrity: sha512-oKnbhFyRIXpUuez8iBMmyEa4nbj4IOQyuhc/wy9kY7/WVPcwIO9VA668Pu8RkO7+0G76SLROeyw9CpQ061i4mA==} engines: {node: '>=10'} + check-error@2.1.1: + resolution: {integrity: sha512-OAlb+T7V4Op9OwdkjmguYRqncdlx5JiofwOAUkmTF+jNdHwzTaTs4sRAGpzLF3oOz5xAyDGrPgeIDFQmDOTiJw==} + engines: {node: '>= 16'} + chownr@3.0.0: resolution: {integrity: sha512-+IxzY9BZOQd/XuYPRmrvEVjF/nqj5kgT4kEq7VofrDoM1MxoRjEWkrCC3EtLi59TVawxTAn+orJwFQcrqEN1+g==} engines: {node: '>=18'} @@ -1071,6 +1125,19 @@ packages: csstype@3.1.3: resolution: {integrity: sha512-M1uQkMl8rQK/szD0LNhtqxIPLpimGm8sOBwU7lLnCpSbTyY3yeU1Vc7l4KT5zT4s/yOxHH5O7tIuuLOCnLADRw==} + debug@4.4.1: + resolution: {integrity: sha512-KcKCqiftBJcZr++7ykoDIEwSa3XWowTfNPo92BYxjXiyYEVrUQh2aLyhxBCwww+heortUFxEJYcRzosstTEBYQ==} + engines: {node: '>=6.0'} + peerDependencies: + supports-color: '*' + peerDependenciesMeta: + supports-color: + optional: true + + deep-eql@5.0.2: + resolution: {integrity: sha512-h5k/5U50IJJFpzfL6nO9jaaumfjO/f2NjK/oYB2Djzm4p9L+3T9qWpZqZ2hAbLPuuYq9wrU08WQyBTL5GbPk5Q==} + engines: {node: '>=6'} + define-data-property@1.1.4: resolution: {integrity: sha512-rBMvIzlpA8v6E+SJZoo++HAYqsLrkg7MSfIinMPFhmkorw7X+dOXVJQs+QT69zGkzMyfDnIMN2Wid1+NbL3T+A==} engines: {node: '>= 0.4'} @@ -1147,6 +1214,9 @@ packages: resolution: {integrity: sha512-Zf5H2Kxt2xjTvbJvP2ZWLEICxA6j+hAmMzIlypy4xcBg1vKVnx89Wy0GbS+kf5cwCVFFzdCFh2XSCFNULS6csw==} engines: {node: '>= 0.4'} + es-module-lexer@1.7.0: + resolution: {integrity: sha512-jEQoCwk8hyb2AZziIOLhDqpm5+2ww5uIE6lkO/6jcOCusfk6LhMHpXXfBLXTZ7Ydyt0j4VoUQv6uGNYbdW+kBA==} + es6-error@4.1.1: resolution: {integrity: sha512-Um/+FxMr9CISWh0bi5Zv0iOD+4cFh5qLeks1qhAopKVAJw3drgKbKySikp7wGhDL0HPeaja0P5ULZrxLkniUVg==} @@ -1170,6 +1240,13 @@ packages: estree-walker@2.0.2: resolution: {integrity: sha512-Rfkk/Mp/DL7JVje3u18FxFujQlTNR2q6QfMSMB7AvCBx91NGj/ba3kCfza0f6dVDbw7YlRf/nDrn7pQrCCyQ/w==} + estree-walker@3.0.3: + resolution: {integrity: sha512-7RUKfXgSMMkzt6ZuXmqapOurLGPPfgj6l9uRZ7lRGolvk0y2yocc35LdcxKC5PQZdn2DMqioAQ2NoWcrTKmm6g==} + + expect-type@1.2.2: + resolution: {integrity: sha512-JhFGDVJ7tmDJItKhYgJCGLOWjuK9vPxiXoUFLwLDc99NlmklilbiQJwoctZtt13+xMw91MCk/REan6MWHqDjyA==} + engines: {node: '>=12.0.0'} + fast-glob@3.3.3: resolution: {integrity: sha512-7MptL8U0cqcFdzIzwOTHoilX9x5BrNqye7Z/LuC7kCMRio1EMSyqRK3BEAUD7sXRq4iT4AzTVuZdhgQ2TCvYLg==} engines: {node: '>=8.6.0'} @@ -1321,6 +1398,9 @@ packages: jose@6.1.0: resolution: {integrity: sha512-TTQJyoEoKcC1lscpVDCSsVgYzUDg/0Bt3WE//WiTPK6uOCQC2KZS4MpugbMWt/zyjkopgZoXhZuCi00gLudfUA==} + js-tokens@9.0.1: + resolution: {integrity: sha512-mxa9E9ITFOt0ban3j6L5MpjwegGz6lBQmM1IJkWeBZGcMxto50+eWdjC/52xDbS2vy0k7vIMK0Fe2wfL9OQSpQ==} + js-yaml@4.1.0: resolution: {integrity: sha512-wpxZs9NoxZaJESJGIZTyDEaYpl0FKSA+FB9aJiyemKhMwkxQg63h4T1KJgUGHpTqPDNRcmmYLugrRjJlBtWvRA==} hasBin: true @@ -1419,6 +1499,9 @@ packages: long@5.3.2: resolution: {integrity: sha512-mNAgZ1GmyNhD7AuqnTG3/VQ26o760+ZYBPKjPvugO8+nLbYfX6TVpJPseBvopbdY+qpZ/lKUnmEc1LeZYS3QAA==} + loupe@3.2.1: + resolution: {integrity: sha512-CdzqowRJCeLU72bHvWqwRBBlLcMEtIvGrlvef74kMnV2AolS9Y8xUv1I0U/MNAWMhBlKIoyuEgoJ0t/bbwHbLQ==} + lower-case@2.0.2: resolution: {integrity: sha512-7fm3l3NAF9WfN6W3JOmf5drwpVqX78JtoGJ3A6W0a6ZnldM41w2fV5D490psKFTpMds8TJse/eHLFFsNHHjHgg==} @@ -1469,6 +1552,9 @@ packages: engines: {node: '>=10'} hasBin: true + ms@2.1.3: + resolution: {integrity: sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA==} + nanoid@3.3.11: resolution: {integrity: sha512-N8SpfPUnUp1bK+PMYW8qSWdl9U+wwNWI4QKxOYDy9JAro3WMX7p2OeVRF9v+347pnakNevPmiHhNmZ2HbFA76w==} engines: {node: ^10 || ^12 || ^13.7 || ^14 || >=15.0.1} @@ -1555,6 +1641,13 @@ packages: pathe@0.2.0: resolution: {integrity: sha512-sTitTPYnn23esFR3RlqYBWn4c45WGeLcsKzQiUpXJAyfcWkolvlYpV8FLo7JishK946oQwMFUCHXQ9AjGPKExw==} + pathe@2.0.3: + resolution: {integrity: sha512-WUjGcAqP1gQacoQe+OBJsFA7Ld4DyXuUIjZ5cc75cLHvJ7dtNsTugphxIADwspS+AraAUePCKrSVtPLFj/F88w==} + + pathval@2.0.1: + resolution: {integrity: sha512-//nshmD55c46FuFw26xV/xFAaB5HF9Xdap7HJBBnrKdAd6/GxDBaNA1870O79+9ueg61cZLSVc+OaFlfmObYVQ==} + engines: {node: '>= 14.16'} + picocolors@1.1.1: resolution: {integrity: sha512-xceH2snhtb5M9liqDsmEw56le376mTZkEX/jEb/RxNFyegNul7eNslCXP9FDj/Lcu0X8KEyMceP2ntpaHrDEVA==} @@ -1661,6 +1754,9 @@ packages: resolution: {integrity: sha512-ObmnIF4hXNg1BqhnHmgbDETF8dLPCggZWBjkQfhZpbszZnYur5DUljTcCHii5LC3J5E0yeO/1LIMyH+UvHQgyw==} engines: {node: '>= 0.4'} + siginfo@2.0.0: + resolution: {integrity: sha512-ybx0WO1/8bSBLEWXZvEd7gMW3Sn3JFlW3TvX1nREbDLRNQNaeNN8WK0meBwPdAaOI7TtRRRJn/Es1zhrrCHu7g==} + signal-exit@4.1.0: resolution: {integrity: sha512-bzyZ1e88w9O1iNJbKnOlvYTrWPDl46O1bG0D3XInv+9tkPrxrN8jUUTiFlDkkmKWgn1M6CfIA13SuGqOa9Korw==} engines: {node: '>=14'} @@ -1693,6 +1789,12 @@ packages: sprintf-js@1.1.3: resolution: {integrity: sha512-Oo+0REFV59/rz3gfJNKQiBlwfHaSESl1pcGyABQsnnIfWOFt6JNj5gCog2U6MLZ//IGYD+nA8nI+mTShREReaA==} + stackback@0.0.2: + resolution: {integrity: sha512-1XMJE5fQo1jGH6Y/7ebnwPOBEkIEnT4QF32d5R1+VXdXveM0IBMJt8zfaxX1P3QhVwrYe+576+jkANtSS2mBbw==} + + std-env@3.9.0: + resolution: {integrity: sha512-UGvjygr6F6tpH7o2qyqR6QYpwraIjKSdtzyBdyytFOHmPZY917kwdwLG0RbOjWOnKmnm3PeHjaoLLMie7kPLQw==} + string-width@4.2.3: resolution: {integrity: sha512-wKyQRQpjJ0sIp62ErSZdGsjMJWsap5oRNihHhu6G7JVO/9jIB6UyevL+tXuOqrng8j/cxKTWyWUwvSTriiZz/g==} engines: {node: '>=8'} @@ -1713,6 +1815,9 @@ packages: resolution: {integrity: sha512-4X2FR3UwhNUE9G49aIsJW5hRRR3GXGTBTZRMfv568O60ojM8HcWjV/VxAxCDW3SUND33O6ZY66ZuRcdkj73q2g==} engines: {node: '>=14.16'} + strip-literal@3.0.0: + resolution: {integrity: sha512-TcccoMhJOM3OebGhSBEmp3UZ2SfDMZUEBdRA/9ynfLi8yYajyWX3JiXArcJt4Umh4vISpspkQIY8ZZoCqjbviA==} + supports-color@7.2.0: resolution: {integrity: sha512-qpCAvRl9stuOHveKsn7HncJRvv501qIacKzQlO/+Lwxc9+0q2wLyv4Dfvt80/DPn2pqOBsJdDiogXGR9+OvwRw==} engines: {node: '>=8'} @@ -1737,10 +1842,28 @@ packages: engines: {node: '>=10'} hasBin: true + tinybench@2.9.0: + resolution: {integrity: sha512-0+DUvqWMValLmha6lr4kD8iAMK1HzV0/aKnCtWb9v9641TnP/MFb7Pc2bxoxQjTXAErryXVgUOfv2YqNllqGeg==} + + tinyexec@0.3.2: + resolution: {integrity: sha512-KQQR9yN7R5+OSwaK0XQoj22pwHoTlgYqmUscPYoknOoWCWfj/5/ABTMRi69FrKU5ffPVh5QcFikpWJI/P1ocHA==} + tinyglobby@0.2.14: resolution: {integrity: sha512-tX5e7OM1HnYr2+a2C/4V0htOcSQcoSTH9KgJnVvNm5zm/cyEWKJ7j7YutsH9CxMdtOkkLFy2AHrMci9IM8IPZQ==} engines: {node: '>=12.0.0'} + tinypool@1.1.1: + resolution: {integrity: sha512-Zba82s87IFq9A9XmjiX5uZA/ARWDrB03OHlq+Vw1fSdt0I+4/Kutwy8BP4Y/y/aORMo61FQ0vIb5j44vSo5Pkg==} + engines: {node: ^18.0.0 || >=20.0.0} + + tinyrainbow@2.0.0: + resolution: {integrity: sha512-op4nsTR47R6p0vMUUoYl/a+ljLFVtlfaXkLQmqfLR1qHma1h/ysYk4hEXZ880bf2CYgTskvTa/e196Vd5dDQXw==} + engines: {node: '>=14.0.0'} + + tinyspy@4.0.3: + resolution: {integrity: sha512-t2T/WLB2WRgZ9EpE4jgPJ9w+i66UZfDc8wHh0xrwiRNN+UwH98GIJkTeZqX9rg0i0ptwzqW+uYeIF0T4F8LR7A==} + engines: {node: '>=14.0.0'} + to-regex-range@5.0.1: resolution: {integrity: sha512-65P7iz6X5yEr1cwcgvQxbbIw7Uk3gOy5dIdtZ4rDveLqhrdJP+Li/Hx6tyK0NEb+2GCyneCMJiGqrADCSNk8sQ==} engines: {node: '>=8.0'} @@ -1776,6 +1899,11 @@ packages: util-deprecate@1.0.2: resolution: {integrity: sha512-EPD5q1uXyFxJpCrLnCc1nHnq3gOa6DZBocAIiI2TaSCA7VCJ1UJDMagCzIkXNsUYfD1daK//LTEQ8xiIbrHtcw==} + vite-node@3.2.4: + resolution: {integrity: sha512-EbKSKh+bh1E1IFxeO0pg1n4dvoOTt0UDiXMd/qn++r98+jPO1xtJilvXldeuQ8giIB5IkpjCgMleHMNEsGH6pg==} + engines: {node: ^18.0.0 || ^20.0.0 || >=22.0.0} + hasBin: true + vite-plugin-html@3.2.2: resolution: {integrity: sha512-vb9C9kcdzcIo/Oc3CLZVS03dL5pDlOFuhGlZYDCJ840BhWl/0nGeZWf3Qy7NlOayscY4Cm/QRgULCQkEZige5Q==} peerDependencies: @@ -1821,6 +1949,34 @@ packages: yaml: optional: true + vitest@3.2.4: + resolution: {integrity: sha512-LUCP5ev3GURDysTWiP47wRRUpLKMOfPh+yKTx3kVIEiu5KOMeqzpnYNsKyOoVrULivR8tLcks4+lga33Whn90A==} + engines: {node: ^18.0.0 || ^20.0.0 || >=22.0.0} + hasBin: true + peerDependencies: + '@edge-runtime/vm': '*' + '@types/debug': ^4.1.12 + '@types/node': ^18.0.0 || ^20.0.0 || >=22.0.0 + '@vitest/browser': 3.2.4 + '@vitest/ui': 3.2.4 + happy-dom: '*' + jsdom: '*' + peerDependenciesMeta: + '@edge-runtime/vm': + optional: true + '@types/debug': + optional: true + '@types/node': + optional: true + '@vitest/browser': + optional: true + '@vitest/ui': + optional: true + happy-dom: + optional: true + jsdom: + optional: true + walk-up-path@4.0.0: resolution: {integrity: sha512-3hu+tD8YzSLGuFYtPRb48vdhKMi0KQV5sn+uWr8+7dMEq/2G/dtLrdDinkLjqq5TIbIBjYJ4Ax/n3YiaW7QM8A==} engines: {node: 20 || >=22} @@ -1830,6 +1986,11 @@ packages: engines: {node: '>= 8'} hasBin: true + why-is-node-running@2.3.0: + resolution: {integrity: sha512-hUrmaWBdVDcxvYqnyh09zunKzROWjbZTiNy8dBEjkS7ehEDQibXJ7XvlmtbwuTclUiIyN+CyXQD4Vmko8fNm8w==} + engines: {node: '>=8'} + hasBin: true + wrap-ansi@7.0.0: resolution: {integrity: sha512-YVGIj2kamLSTxw6NsZjoBxfSwsn0ycdesmc4p+Q21c5zPuZ1pl+NfxVdxPtdHvmNVOQ6XSYG4AUtyt/Fi7D16Q==} engines: {node: '>=10'} @@ -2402,6 +2563,12 @@ snapshots: '@types/audioworklet@0.0.77': {} + '@types/chai@5.2.2': + dependencies: + '@types/deep-eql': 4.0.2 + + '@types/deep-eql@4.0.2': {} + '@types/deno@2.3.0': {} '@types/estree@1.0.8': {} @@ -2416,6 +2583,48 @@ snapshots: '@types/web@0.0.241': {} + '@vitest/expect@3.2.4': + dependencies: + '@types/chai': 5.2.2 + '@vitest/spy': 3.2.4 + '@vitest/utils': 3.2.4 + chai: 5.3.3 + tinyrainbow: 2.0.0 + + '@vitest/mocker@3.2.4(vite@6.3.5(@types/node@24.3.0)(jiti@2.5.1)(lightningcss@1.30.1)(terser@5.43.1)(tsx@4.20.5))': + dependencies: + '@vitest/spy': 3.2.4 + estree-walker: 3.0.3 + magic-string: 0.30.18 + optionalDependencies: + vite: 6.3.5(@types/node@24.3.0)(jiti@2.5.1)(lightningcss@1.30.1)(terser@5.43.1)(tsx@4.20.5) + + '@vitest/pretty-format@3.2.4': + dependencies: + tinyrainbow: 2.0.0 + + '@vitest/runner@3.2.4': + dependencies: + '@vitest/utils': 3.2.4 + pathe: 2.0.3 + strip-literal: 3.0.0 + + '@vitest/snapshot@3.2.4': + dependencies: + '@vitest/pretty-format': 3.2.4 + magic-string: 0.30.18 + pathe: 2.0.3 + + '@vitest/spy@3.2.4': + dependencies: + tinyspy: 4.0.3 + + '@vitest/utils@3.2.4': + dependencies: + '@vitest/pretty-format': 3.2.4 + loupe: 3.2.1 + tinyrainbow: 2.0.0 + acorn@8.15.0: {} aggregate-error@4.0.1: @@ -2437,6 +2646,8 @@ snapshots: arrify@3.0.0: {} + assertion-error@2.0.1: {} + async-mutex@0.5.0: dependencies: tslib: 2.8.1 @@ -2459,16 +2670,28 @@ snapshots: buffer-from@1.1.2: {} + cac@6.7.14: {} + camel-case@4.1.2: dependencies: pascal-case: 3.1.2 tslib: 2.8.1 + chai@5.3.3: + dependencies: + assertion-error: 2.0.1 + check-error: 2.1.1 + deep-eql: 5.0.2 + loupe: 3.2.1 + pathval: 2.0.1 + chalk@4.1.2: dependencies: ansi-styles: 4.3.0 supports-color: 7.2.0 + check-error@2.1.1: {} + chownr@3.0.0: {} clean-css@5.3.3: @@ -2564,6 +2787,12 @@ snapshots: csstype@3.1.3: {} + debug@4.4.1: + dependencies: + ms: 2.1.3 + + deep-eql@5.0.2: {} + define-data-property@1.1.4: dependencies: es-define-property: 1.0.1 @@ -2634,6 +2863,8 @@ snapshots: es-errors@1.3.0: {} + es-module-lexer@1.7.0: {} + es6-error@4.1.1: {} esbuild@0.25.9: @@ -2673,6 +2904,12 @@ snapshots: estree-walker@2.0.2: {} + estree-walker@3.0.3: + dependencies: + '@types/estree': 1.0.8 + + expect-type@1.2.2: {} + fast-glob@3.3.3: dependencies: '@nodelib/fs.stat': 2.0.5 @@ -2820,6 +3057,8 @@ snapshots: jose@6.1.0: {} + js-tokens@9.0.1: {} + js-yaml@4.1.0: dependencies: argparse: 2.0.1 @@ -2905,6 +3144,8 @@ snapshots: long@5.3.2: {} + loupe@3.2.1: {} + lower-case@2.0.2: dependencies: tslib: 2.8.1 @@ -2946,6 +3187,8 @@ snapshots: mkdirp@3.0.1: {} + ms@2.1.3: {} + nanoid@3.3.11: {} napi-postinstall@0.3.3: {} @@ -3050,6 +3293,10 @@ snapshots: pathe@0.2.0: {} + pathe@2.0.3: {} + + pathval@2.0.1: {} + picocolors@1.1.1: {} picomatch@2.3.1: {} @@ -3196,6 +3443,8 @@ snapshots: shell-quote@1.8.3: {} + siginfo@2.0.0: {} + signal-exit@4.1.0: {} simple-swizzle@0.2.2: @@ -3223,6 +3472,10 @@ snapshots: sprintf-js@1.1.3: {} + stackback@0.0.2: {} + + std-env@3.9.0: {} + string-width@4.2.3: dependencies: emoji-regex: 8.0.0 @@ -3245,6 +3498,10 @@ snapshots: strip-json-comments@5.0.2: {} + strip-literal@3.0.0: + dependencies: + js-tokens: 9.0.1 + supports-color@7.2.0: dependencies: has-flag: 4.0.0 @@ -3273,11 +3530,21 @@ snapshots: commander: 2.20.3 source-map-support: 0.5.21 + tinybench@2.9.0: {} + + tinyexec@0.3.2: {} + tinyglobby@0.2.14: dependencies: fdir: 6.5.0(picomatch@4.0.3) picomatch: 4.0.3 + tinypool@1.1.1: {} + + tinyrainbow@2.0.0: {} + + tinyspy@4.0.3: {} + to-regex-range@5.0.1: dependencies: is-number: 7.0.0 @@ -3303,6 +3570,27 @@ snapshots: util-deprecate@1.0.2: {} + vite-node@3.2.4(@types/node@24.3.0)(jiti@2.5.1)(lightningcss@1.30.1)(terser@5.43.1)(tsx@4.20.5): + dependencies: + cac: 6.7.14 + debug: 4.4.1 + es-module-lexer: 1.7.0 + pathe: 2.0.3 + vite: 6.3.5(@types/node@24.3.0)(jiti@2.5.1)(lightningcss@1.30.1)(terser@5.43.1)(tsx@4.20.5) + transitivePeerDependencies: + - '@types/node' + - jiti + - less + - lightningcss + - sass + - sass-embedded + - stylus + - sugarss + - supports-color + - terser + - tsx + - yaml + vite-plugin-html@3.2.2(vite@6.3.5(@types/node@24.3.0)(jiti@2.5.1)(lightningcss@1.30.1)(terser@5.43.1)(tsx@4.20.5)): dependencies: '@rollup/pluginutils': 4.2.1 @@ -3335,12 +3623,58 @@ snapshots: terser: 5.43.1 tsx: 4.20.5 + vitest@3.2.4(@types/node@24.3.0)(jiti@2.5.1)(lightningcss@1.30.1)(terser@5.43.1)(tsx@4.20.5): + dependencies: + '@types/chai': 5.2.2 + '@vitest/expect': 3.2.4 + '@vitest/mocker': 3.2.4(vite@6.3.5(@types/node@24.3.0)(jiti@2.5.1)(lightningcss@1.30.1)(terser@5.43.1)(tsx@4.20.5)) + '@vitest/pretty-format': 3.2.4 + '@vitest/runner': 3.2.4 + '@vitest/snapshot': 3.2.4 + '@vitest/spy': 3.2.4 + '@vitest/utils': 3.2.4 + chai: 5.3.3 + debug: 4.4.1 + expect-type: 1.2.2 + magic-string: 0.30.18 + pathe: 2.0.3 + picomatch: 4.0.3 + std-env: 3.9.0 + tinybench: 2.9.0 + tinyexec: 0.3.2 + tinyglobby: 0.2.14 + tinypool: 1.1.1 + tinyrainbow: 2.0.0 + vite: 6.3.5(@types/node@24.3.0)(jiti@2.5.1)(lightningcss@1.30.1)(terser@5.43.1)(tsx@4.20.5) + vite-node: 3.2.4(@types/node@24.3.0)(jiti@2.5.1)(lightningcss@1.30.1)(terser@5.43.1)(tsx@4.20.5) + why-is-node-running: 2.3.0 + optionalDependencies: + '@types/node': 24.3.0 + transitivePeerDependencies: + - jiti + - less + - lightningcss + - msw + - sass + - sass-embedded + - stylus + - sugarss + - supports-color + - terser + - tsx + - yaml + walk-up-path@4.0.0: {} which@2.0.2: dependencies: isexe: 2.0.0 + why-is-node-running@2.3.0: + dependencies: + siginfo: 2.0.0 + stackback: 0.0.2 + wrap-ansi@7.0.0: dependencies: ansi-styles: 4.3.0 diff --git a/js/signals/src/index.ts b/js/signals/src/index.ts index da74cba80..28e30bdb4 100644 --- a/js/signals/src/index.ts +++ b/js/signals/src/index.ts @@ -102,6 +102,9 @@ export class Effect { #stop!: () => void; #stopped: Promise; + #close!: () => void; + #closed: Promise; + // If a function is provided, it will be run with the effect as an argument. constructor(fn?: (effect: Effect) => void) { if (DEV) { @@ -119,6 +122,10 @@ export class Effect { this.#stop = resolve; }); + this.#closed = new Promise((resolve) => { + this.#close = resolve; + }); + if (fn) { this.#schedule(); } @@ -430,6 +437,7 @@ export class Effect { return; } + this.#close(); this.#stop(); for (const fn of this.#dispose) fn(); @@ -444,4 +452,8 @@ export class Effect { Effect.#finalizer.unregister(this); } } + + async closed() { + await this.#closed; + } }