12 changes: 7 additions & 5 deletions js/hang/package.json
@@ -35,19 +35,21 @@
"scripts": {
"build": "rimraf dist && tsc -b && tsx ../scripts/package.ts",
"check": "tsc --noEmit",
"release": "tsx ../scripts/release.ts"
"release": "tsx ../scripts/release.ts",
"test": "vitest"
},
"dependencies": {
"@huggingface/transformers": "^3.7.2",
"@kixelated/moq": "workspace:^",
"@kixelated/signals": "workspace:^",
"zod": "^4.1.3",
"async-mutex": "^0.5.0",
"comlink": "^4.4.2",
"@huggingface/transformers": "^3.7.2",
"async-mutex": "^0.5.0"
"zod": "^4.1.3"
},
"devDependencies": {
"@types/audioworklet": "^0.0.77",
"@typescript/lib-dom": "npm:@types/web@^0.0.241",
"fast-glob": "^3.3.3"
"fast-glob": "^3.3.3",
"vitest": "^3.2.4"
}
}
17 changes: 9 additions & 8 deletions js/hang/src/connection.ts
@@ -1,5 +1,6 @@
import * as Moq from "@kixelated/moq";
import { Effect, Signal } from "@kixelated/signals";
import type * as Time from "./time";

export type ConnectionProps = {
// The URL of the relay server.
@@ -11,11 +12,11 @@ export type ConnectionProps = {

// The delay in milliseconds before reconnecting.
// default: 1000
delay?: DOMHighResTimeStamp;
delay?: Time.Milli;

// The maximum delay in milliseconds.
// default: 30000
maxDelay?: number;
maxDelay?: Time.Milli;
};

export type ConnectionStatus = "connecting" | "connected" | "disconnected" | "unsupported";
@@ -26,20 +27,20 @@ export class Connection {
established = new Signal<Moq.Connection | undefined>(undefined);

readonly reload: boolean;
readonly delay: number;
readonly maxDelay: number;
readonly delay: Time.Milli;
readonly maxDelay: Time.Milli;

signals = new Effect();
#delay: number;
#delay: Time.Milli;

// Increased by 1 each time to trigger a reload.
#tick = new Signal(0);

constructor(props?: ConnectionProps) {
this.url = Signal.from(props?.url);
this.reload = props?.reload ?? true;
this.delay = props?.delay ?? 1000;
this.maxDelay = props?.maxDelay ?? 30000;
this.delay = props?.delay ?? (1000 as Time.Milli);
this.maxDelay = props?.maxDelay ?? (30000 as Time.Milli);

this.#delay = this.delay;

@@ -91,7 +92,7 @@ export class Connection {
effect.timer(() => this.#tick.set((prev) => Math.max(prev, tick)), this.#delay);

// Exponential backoff.
this.#delay = Math.min(this.#delay * 2, this.maxDelay);
this.#delay = Math.min(this.#delay * 2, this.maxDelay) as Time.Milli;
}
}
});
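
The 1000 as Time.Milli and 30000 as Time.Milli casts above rely on the new Time module, which is not included in this diff. A minimal sketch of the branded-number pattern it presumably uses, together with the capped exponential backoff from the hunk above; the Milli brand and helper names here are assumptions, not the actual contents of ./time:

// Hypothetical sketch of a branded millisecond type (the real ./time module is not shown in this PR).
type Milli = number & { readonly __unit: "milliseconds" };

const milli = (value: number): Milli => value as Milli;

// Capped exponential backoff, mirroring the #delay update in the hunk above.
function nextDelay(current: Milli, max: Milli): Milli {
	return Math.min(current * 2, max) as Milli;
}

let delay = milli(1000);
const maxDelay = milli(30000);
delay = nextDelay(delay, maxDelay); // 2000, then 4000, ... capped at 30000 on repeated calls
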
28 changes: 28 additions & 0 deletions js/hang/src/frame.test.ts
@@ -0,0 +1,28 @@
import * as Moq from "@kixelated/moq";
import { describe, expect, it } from "vitest";
import { Consumer, Producer } from "./frame";
import * as Time from "./time";

describe("Consumer", () => {
it("should close cleanly", async () => {
// Create a broadcast and track
const broadcast = new Moq.BroadcastProducer();
const track = broadcast.createTrack("test");
const producer = new Producer(track);

// Create consumer from the track
const consumer = new Consumer(track.consume());

producer.encode(new Uint8Array([1]), Time.Micro.fromMilli(1 as Time.Milli), true);
producer.close();

// Close consumer before trying to decode
consumer.close();

const frame = await consumer.decode();
expect(frame).toBeUndefined();

// Clean up
broadcast.close();
});
});
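
The test above only exercises the early-close path. A companion happy-path sketch, placed in the same describe block and reusing its imports; it assumes decode() resolves once the first frame of the active group has been buffered:

	it("should round-trip a frame", async () => {
		const broadcast = new Moq.BroadcastProducer();
		const track = broadcast.createTrack("test");
		const producer = new Producer(track);
		const consumer = new Consumer(track.consume());

		// Keyframe at t = 1ms; the wire format stores microseconds.
		producer.encode(new Uint8Array([1, 2, 3]), Time.Micro.fromMilli(1 as Time.Milli), true);

		const frame = await consumer.decode();
		expect(frame?.data).toEqual(new Uint8Array([1, 2, 3]));
		expect(frame?.keyframe).toBe(true);

		consumer.close();
		producer.close();
		broadcast.close();
	});
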
220 changes: 206 additions & 14 deletions js/hang/src/frame.ts
@@ -1,12 +1,24 @@
import type * as Moq from "@kixelated/moq";
import { Effect } from "@kixelated/signals";
import * as Time from "./time";

export interface Source {
byteLength: number;
copyTo(buffer: Uint8Array): void;
}

export function encode(source: Uint8Array | Source, timestamp: number): Uint8Array {
export interface Frame {
data: Uint8Array;
timestamp: Time.Micro;
keyframe: boolean;
group: number;
}

export function encode(source: Uint8Array | Source, timestamp: Time.Micro): Uint8Array {
// TODO switch over to u64 for simplicity. The varint uses 8 bytes anyway after 18 minutes lul.
// TODO Don't encode into one buffer. Write the header/payload separately to avoid reallocating.
const data = new Uint8Array(8 + (source instanceof Uint8Array ? source.byteLength : source.byteLength));

const size = setVint53(data, timestamp).byteLength;
if (source instanceof Uint8Array) {
data.set(source, size);
@@ -17,8 +29,9 @@ export function encode(source: Uint8Array | Source, timestamp: number): Uint8Arr
}

// NOTE: A keyframe is always the first frame in a group, so it's not encoded on the wire.
export function decode(buffer: Uint8Array): { data: Uint8Array; timestamp: number } {
const [timestamp, data] = getVint53(buffer);
export function decode(buffer: Uint8Array): { data: Uint8Array; timestamp: Time.Micro } {
const [us, data] = getVint53(buffer);
const timestamp = us as Time.Micro;
return { timestamp, data };
}

@@ -30,7 +43,7 @@ export class Producer {
this.#track = track;
}

encode(data: Uint8Array | Source, timestamp: number, keyframe: boolean) {
encode(data: Uint8Array | Source, timestamp: Time.Micro, keyframe: boolean) {
if (keyframe) {
this.#group?.close();
this.#group = this.#track.appendGroup();
@@ -41,29 +54,208 @@ this.#group?.writeFrame(encode(data, timestamp));
this.#group?.writeFrame(encode(data, timestamp));
}

consume(): Consumer {
return new Consumer(this.#track.consume());
}

close() {
this.#track.close();
this.#group?.close();
}
}

export interface ConsumerProps {
// Target latency in milliseconds (default: 0)
latency?: Time.Milli;
}

export class Consumer {
#track: Moq.TrackConsumer;
#latency: Time.Micro;
#groups: Moq.GroupConsumer[] = [];
#active = 0; // the active group sequence number
#frames: Frame[] = [];
#prev?: Time.Micro;

constructor(track: Moq.TrackConsumer) {
// Wake up the consumer when a new frame is available.
#notify?: () => void;

#signals = new Effect();

constructor(track: Moq.TrackConsumer, props?: ConsumerProps) {
this.#track = track;
this.#latency = Time.Micro.fromMilli(props?.latency ?? Time.Milli.zero);
this.#signals.spawn(this.#run.bind(this));
}

async #run(cancel: Promise<void>) {
// Start fetching groups in the background
for (;;) {
const group = await Promise.race([this.#track.nextGroup(), cancel]);
if (!group) break;

if (group.sequence < this.#active) {
console.warn(`skipping old group: ${group.sequence} < ${this.#active}`);
// Skip old groups.
group.close();
continue;
}

// Insert into #groups based on the group sequence number (ascending).
// This is used to cancel old groups.
this.#groups.push(group.clone());
this.#groups.sort((a, b) => a.sequence - b.sequence);

// Start buffering frames from this group
this.#signals.spawn(this.#runGroup.bind(this, group));
}
}

async #runGroup(group: Moq.GroupConsumer, cancel: Promise<void>) {
try {
let keyframe = true;

for (;;) {
const next = await Promise.race([group.readFrame(), cancel]);
if (!next) break;

const { data, timestamp } = decode(next);
const frame = {
data,
timestamp,
keyframe,
group: group.sequence,
};

keyframe = false;

// Store frames in buffer in order.
if (this.#frames.length === 0 || frame.timestamp >= this.#frames[this.#frames.length - 1].timestamp) {
// Already sorted; most of the time we just append to the end.
this.#frames.push(frame);
} else {
console.warn(
`frame out of order: ${frame.timestamp} < ${this.#frames[this.#frames.length - 1].timestamp}`,
);
this.#frames.push(frame);
this.#frames.sort((a, b) => a.timestamp - b.timestamp);
}

if (this.#frames.at(0)?.group === this.#active) {
if (this.#notify) {
this.#notify();
this.#notify = undefined;
}
} else {
// Check for latency violations
this.#checkLatency();
}
}

group.close();
} finally {
if (group.sequence === this.#active) {
// Advance to the next group.
// We don't use #skipTo because we don't want to drop the last frames.
this.#active += 1;

if (this.#notify && this.#frames.at(0)?.group === this.#active) {
this.#notify();
this.#notify = undefined;
}
}
}
}

#checkLatency() {
if (this.#frames.length < 2) return;

// Check if we have at least #latency frames in the queue.
const first = this.#frames[0];
const last = this.#frames[this.#frames.length - 1];

const latency = last.timestamp - first.timestamp;
if (latency < this.#latency) return;

// Skip to the next group
const nextFrame = this.#frames.find((f) => f.group > this.#active);
if (!nextFrame) return; // Within the same group, ignore for now

if (this.#prev) {
console.warn(`latency violation: ${Math.round(latency / 1000)}ms buffered`);
console.warn(`skipping ahead: ${Math.round((nextFrame.timestamp - this.#prev) / 1000)}ms`);
}

this.#skipTo(nextFrame.group);
}

#skipTo(groupId: number) {
this.#active = groupId;

// Skip old groups.
while (this.#groups.length > 0 && this.#groups[0].sequence < this.#active) {
this.#groups.shift()?.close();
}

// Skip old frames.
let dropped = 0;
while (this.#frames.length > 0 && this.#frames[0].group < this.#active) {
dropped++;
this.#frames.shift();
}

if (dropped > 0) {
console.warn(`dropped ${dropped} frames while skipping`);
}

if (this.#notify && this.#frames.at(0)?.group === this.#active) {
this.#notify();
this.#notify = undefined;
}
}

async decode(): Promise<{ data: Uint8Array; timestamp: number; keyframe: boolean } | undefined> {
const next = await this.#track.nextFrame();
if (!next) return undefined;
async decode(): Promise<Frame | undefined> {
for (;;) {
// Check if we have frames from the active group
if (this.#frames.length > 0) {
if (this.#frames[0].group === this.#active) {
const next = this.#frames.shift();
this.#prev = next?.timestamp;
return next;
}

// We have frames but not from the active group
// Check if we should move to the next group
const nextGroupFrames = this.#frames.filter((f) => f.group > this.#active);
if (nextGroupFrames.length > 0) {
// Move to the next group
const nextGroup = Math.min(...nextGroupFrames.map((f) => f.group));
this.#skipTo(nextGroup);
continue;
}
Comment on lines +223 to +231

🛠️ Refactor suggestion

Honor latency policy; don’t skip just because newer-group frames exist

This unconditionally drops the remainder of the active group whenever any newer group is buffered, ignoring #latency. Defer to #checkLatency() (which enforces the policy) and only skip when it decides to.

-				// We have frames but not from the active group
-				// Check if we should move to the next group
-				const nextGroupFrames = this.#frames.filter((f) => f.group > this.#active);
-				if (nextGroupFrames.length > 0) {
-					// Move to the next group
-					const nextGroup = Math.min(...nextGroupFrames.map((f) => f.group));
-					this.#skipTo(nextGroup);
-					continue;
-				}
+				// Not from the active group; enforce latency policy before considering skips.
+				this.#checkLatency();
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// We have frames but not from the active group
// Check if we should move to the next group
const nextGroupFrames = this.#frames.filter((f) => f.group > this.#active);
if (nextGroupFrames.length > 0) {
// Move to the next group
const nextGroup = Math.min(...nextGroupFrames.map((f) => f.group));
this.#skipTo(nextGroup);
continue;
}
// Not from the active group; enforce latency policy before considering skips.
this.#checkLatency();
🤖 Prompt for AI Agents
In js/hang/src/frame.ts around lines 223 to 231, the code currently
unconditionally advances to the next group when any newer-group frames are
buffered, which ignores the latency policy; change it to consult the latency
policy first by calling this.#checkLatency() and only call
this.#skipTo(nextGroup) when that check returns true. Compute nextGroup as
before from nextGroupFrames, but replace the unconditional skip with an if
(this.#checkLatency()) { this.#skipTo(nextGroup); } else { break/return/wait
(i.e., stop dropping the rest of the active group) } so that skipping occurs
only when the latency policy allows it.
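
Note that the committable suggestion and the agent prompt differ slightly: the current #checkLatency() returns void and performs the skip itself, while the prompt assumes it returns a boolean. One hedged way to reconcile them is to have the check report a skip target instead of acting on it; the helper below is a sketch with assumed names, not code from this PR:

// Hypothetical shape: decide whether the latency budget forces a skip, and to which group.
interface BufferedFrame {
	timestamp: number; // microseconds, as produced by decode()
	group: number;
}

// latency is the target in microseconds; returns the group to skip to, or undefined.
function chooseSkipTarget(frames: BufferedFrame[], active: number, latency: number): number | undefined {
	if (frames.length < 2) return undefined;

	const buffered = frames[frames.length - 1].timestamp - frames[0].timestamp;
	if (buffered < latency) return undefined; // within budget: keep draining the active group

	// Only skip when a newer group is actually buffered.
	return frames.find((f) => f.group > active)?.group;
}

decode() would then call #skipTo only when a target comes back, and otherwise keep waiting on the active group.
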

}

if (this.#notify) {
throw new Error("multiple calls to decode not supported");
}

const wait = new Promise<void>((resolve) => {
this.#notify = resolve;
}).then(() => true);

if (!(await Promise.race([wait, this.#signals.closed()]))) {
this.#notify = undefined;
// Consumer was closed while waiting for a new frame.
return undefined;
}
}
}

close(): void {
this.#signals.close();

for (const group of this.#groups) {
group.close();
}

const { timestamp, data } = decode(next.data);
return { data, timestamp, keyframe: next.frame === 0 };
this.#groups = [];
this.#frames = [];
}
}
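
Taken together, the new Consumer buffers decoded frames per group and only drops data once the buffered duration exceeds its latency target. A usage sketch under assumptions: the TrackConsumer comes from elsewhere, the 100 ms target and the onFrame callback are illustrative, and the relative imports only resolve from inside this package:

import * as Moq from "@kixelated/moq";
import { Consumer } from "./frame";
import * as Time from "./time";

// Hypothetical playback loop: drain decoded frames until the track or consumer closes.
async function playback(
	track: Moq.TrackConsumer,
	onFrame: (data: Uint8Array, timestamp: Time.Micro) => void,
) {
	// latency defaults to 0, which skips ahead as soon as a newer group is buffered.
	const consumer = new Consumer(track, { latency: 100 as Time.Milli });
	try {
		for (;;) {
			const frame = await consumer.decode();
			if (!frame) break;
			onFrame(frame.data, frame.timestamp);
		}
	} finally {
		consumer.close();
	}
}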
