Skip to content

Conversation

@kixelated
Copy link
Collaborator

@kixelated kixelated commented Sep 2, 2025

Supports inserting into the worklet buffer out-of-order. Previously, it would append each sample without accounting for gaps, causing artifacts and the buffer to slowly deplete.

Now receives groups/frames out of order. Previously, it would use nextFrame to receive everything in order, canceling late frames immediately. Now it will wait up to latency for them to arrive before moving on.

The jitter buffer class should be reused for video. There's a noticeable desync right now because the audio latency is 100ms+ while the video latency is 1 frame.

Summary by CodeRabbit

  • New Features

    • Typed time units and time-safe APIs; latency-aware consumer buffering and reordering; public AudioContext getter.
  • Improvements

    • More accurate timestamps and GOP/grouping for audio/video; audio ring buffer for smoother playback and reduced underflow; group identifier renamed to "sequence"; effect lifecycle can be awaited.
  • Tests

    • Added frame-closure and comprehensive audio ring-buffer tests.
  • Chores

    • Added test script and new runtime/dev dependencies.

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Sep 2, 2025

Caution

Review failed

The pull request is closed.

Walkthrough

Adds a typed Time module and propagates branded Milli/Micro types across the codebase; overhauls frame encoding and Consumer into a latency-aware, grouped buffer with background prefetching; replaces worklet buffering with an AudioRingBuffer and adds tests; renames MoQ group idsequence; updates connection/backoff typing; adds package deps and test script; adds Effect.closed lifecycle.

Changes

Cohort / File(s) Summary
Time units module
js/hang/src/time.ts
Adds branded time types (Nano/Micro/Milli/Second) with conversion helpers and zero constants.
Frame pipeline overhaul
js/hang/src/frame.ts, js/hang/src/frame.test.ts
Adds Frame interface; encode/decode and Producer/Consumer APIs use Time.Micro; Consumer rewritten to background-prefetch, group-buffer, latency-aware decode; new ConsumerProps; unit test added.
Audio publish (input/encoding)
js/hang/src/publish/audio/index.ts, js/hang/src/publish/audio/capture.ts, js/hang/src/publish/audio/capture-worklet.ts, js/hang/src/publish/audio/captions.ts, js/hang/src/publish/video/index.ts, js/hang/src/publish/video/polyfill.ts
Standardizes timestamps/latency types to Time.Micro/Time.Milli; computes microsecond timestamps from samples; updates GOP/grouping and captions TTL typing.
Audio watch/render pipeline
js/hang/src/watch/audio/index.ts, js/hang/src/watch/audio/emitter.ts, js/hang/src/watch/audio/render.ts, js/hang/src/watch/audio/render-worklet.ts, js/hang/src/watch/audio/ring-buffer.ts, js/hang/src/watch/audio/ring-buffer.test.ts
Reworks watch audio: adds context getter and latency-typed props; replaces manual buffering with AudioRingBuffer; changes worklet Init/Data shapes and timestamp types to Time.Milli/Time.Micro; adds ring buffer implementation and tests; gates destination connection by enabled state.
Connection/backoff typing
js/hang/src/connection.ts
Converts delay and maxDelay to Time.Milli, updates defaults and exponential-backoff casts.
MoQ group sequencing
js/moq/src/group.ts, js/moq/src/track.ts, js/moq/src/track.test.ts, js/moq/src/ietf/publisher.ts, js/moq/src/lite/publisher.ts, js/moq/src/lite/subscriber.ts
Renames Group idsequence; updates producers/consumers, header usage, tests, and makes subscriber throw on unknown future subscription ids.
Signals Effect lifecycle
js/signals/src/index.ts
Adds Effect close lifecycle wiring and new async closed() method to await completion.
Package config
js/hang/package.json
Adds test script (vitest), adds vitest devDependency and runtime deps @huggingface/transformers, async-mutex, zod.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  actor Producer
  participant Track as Moq.Track
  participant Consumer as Frame.Consumer
  participant Buffer as Groups/Frames Buffer
  participant Decoder as Watch.Audio.Decoder

  Producer->>Track: push encoded frame (timestamp: Time.Micro, group)
  Track-->>Consumer: frames stream (grouped)
  Consumer->>Buffer: prefetch & store (run/runGroup)
  loop consumer loop
    Decoder->>Consumer: decode()
    alt frame available (within latency)
      Consumer-->>Decoder: Frame (from active group)
    else latency exceeded
      Consumer->>Buffer: skipTo(next group)
      Consumer-->>Decoder: Frame (next keyframe group)
    end
  end
Loading
sequenceDiagram
  autonumber
  participant Watcher as Watch.Audio
  participant Worklet as AudioWorkletProcessor
  participant RB as AudioRingBuffer

  Watcher->>Worklet: postMessage Init { rate, channels, latency: Time.Milli }
  Worklet->>RB: construct(rate, channels, latency)
  loop streaming
    Watcher->>Worklet: postMessage Data(timestamp: Time.Micro, Float32Array[])
    Worklet->>RB: write(timestamp, data)
  end
  loop audio process
    Worklet->>RB: read(output)
    alt underflow
      Worklet-->>Worklet: increment underflow, warn once
    end
  end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

Poem

I nibble Micro-ticks, count Milli hops in tune,
Frames line up in burrows beneath the silvery moon.
Sequence changed its nametag, buffers hum in ring,
Latency kept snug — I twirl, my ears take wing. 🐇✨


📜 Recent review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 908460e and de3997c.

📒 Files selected for processing (1)
  • js/hang/src/frame.ts (4 hunks)
✨ Finishing Touches
  • 📝 Generate Docstrings
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch audio-jitter

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

CodeRabbit Commands (Invoked using PR/Issue comments)

Type @coderabbitai help to get the list of available commands.

Other keywords and placeholders

  • Add @coderabbitai ignore or @coderabbit ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Status, Documentation and Community

  • Visit our Status Page to check the current availability of CodeRabbit.
  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 8

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
js/moq/src/track.ts (1)

35-41: Use sequence instead of deprecated id when advancing #next.

GroupProducer was migrated to sequence. Using group.id can break compilation or desync #next.

Apply:

-    this.#next = group.id + 1;
+    this.#next = group.sequence + 1;
js/hang/src/publish/audio/index.ts (1)

232-240: AudioDataInit does not support a 'transfer' field (TypeScript compile error).

AudioDataInit only accepts format, sampleRate, numberOfFrames, numberOfChannels, timestamp, and data. Remove transfer to satisfy DOM WebCodecs types.

 const frame = new AudioData({
   format: "f32-planar",
   sampleRate: worklet.context.sampleRate,
   numberOfFrames: channels[0].length,
   numberOfChannels: channels.length,
   timestamp: data.timestamp,
   data: joined,
-  transfer: [joined.buffer],
 });
🧹 Nitpick comments (29)
js/moq/src/group.ts (1)

80-96: Align constructor param with renamed field; consider compatibility alias

Rename the param to sequence to avoid confusion and keep API coherent. Optionally add a deprecated id getter for backward compatibility if any external code still reads id.

Apply within this hunk:

-	constructor(frames: WatchConsumer<Uint8Array[]>, id: number) {
-		this.sequence = id;
+	constructor(frames: WatchConsumer<Uint8Array[]>, sequence: number) {
+		this.sequence = sequence;
 		this.#frames = frames;
 	}

Outside this hunk (optional alias):

// Deprecated: remove in a future major
get id(): number { return this.sequence }
js/moq/src/track.test.ts (1)

21-24: Assertions updated to sequence — good; tiny clarity nit

LGTM across all updates. Optional: where groupX is expected to be absent (Lines 67-70), assert.strictEqual(group4A, undefined) and assert.strictEqual(group4B, undefined) reads a bit clearer than probing ?.sequence.

Also applies to: 32-34, 41-41, 51-54, 67-70, 74-74, 91-92

js/moq/src/track.ts (2)

212-214: Avoid optional chaining; #currentGroup is guaranteed here.

You throw if #currentGroup is undefined above, so the ?. is unnecessary.

-                return { group: this.#currentGroup?.sequence, frame: this.#currentFrame++, data: next.frame };
+                return { group: this.#currentGroup.sequence, frame: this.#currentFrame++, data: next.frame };

216-222: Clarify duplicate/older group skip semantics.

>= is intentional to dedupe equal or older sequences. Consider a brief comment to prevent future regressions.

-            if (this.#currentGroup && this.#currentGroup.sequence >= next.group.sequence) {
+            // Skip if not strictly newer (dedupe equal or older sequences).
+            if (this.#currentGroup && this.#currentGroup.sequence >= next.group.sequence) {
js/signals/src/index.ts (1)

456-458: Return the promise directly (avoid extra microtask, add explicit type).

No need for an async wrapper here; return #closed with a concrete return type.

-  async closed() {
-    await this.#closed;
-  }
+  closed(): Promise<void> {
+    return this.#closed;
+  }
js/hang/package.json (2)

8-22: Export the time module for public consumption.

Downstream callers currently rely on relative paths. Consider adding an export for stability:

   "exports": {
     ".": "./src/index.ts",
+    "./time": "./src/time.ts",
     "./publish": "./src/publish/index.ts",
     "./publish/element": "./src/publish/element.ts",
     "./watch": "./src/watch/index.ts",

42-48: Make @huggingface/transformers an optional peer and switch to dynamic imports

  • In package.json: move "@huggingface/transformers": "^3.7.2" to peerDependencies and add peerDependenciesMeta with "optional": true.
  • In src/publish/audio/captions-worker.ts, speaking-worker.ts, and video/detection-worker.ts: replace top-level static imports with dynamic await import("@huggingface/transformers") inside your init logic, and throw a clear runtime error if the module isn’t installed.
js/hang/src/time.ts (1)

1-47: Optional: add safe “coerce” helpers and freeze the unit objects.

  • Provide coerce(n: number): <Unit> to avoid scattered as <Unit> casts.
  • Freeze the objects to prevent accidental runtime mutation.

Example (pattern only):

 export const Milli = {
   zero: 0 as Milli,
+  coerce: (n: number): Milli => n as Milli,
   fromNano: (ns: Nano): Milli => (ns / 1_000_000) as Milli,
   ...
-} as const;
+} as const satisfies Readonly<Record<string, unknown>>;
+Object.freeze(Milli);
js/hang/src/publish/audio/captions.ts (2)

35-36: Prefer value-level conversion over literal cast for the default TTL.

Use the unit helpers for readability and future refactors. Requires switching to a value import.

-import type * as Time from "../../time";
+import * as Time from "../../time";
@@
-    this.#ttl = props?.ttl ?? (10000 as Time.Milli);
+    this.#ttl = props?.ttl ?? Time.Milli.fromSecond(10 as Time.Second);

51-59: Clear catalog on cleanup to avoid stale state after disable/teardown.

Right now the track is removed but catalog remains set.

     this.audio.broadcast.insertTrack(this.#track.consume());
-    effect.cleanup(() => this.audio.broadcast.removeTrack(this.#track.name));
+    effect.cleanup(() => {
+      this.audio.broadcast.removeTrack(this.#track.name);
+      this.catalog.set(undefined);
+    });
js/hang/src/publish/video/polyfill.ts (1)

52-55: Keep time units branded through the frame pacing check.

Avoid mixing branded/unbranded numbers; compute the min frame delta with the Time API to maintain Milli throughout.

Apply:

-const now = performance.now() as Time.Milli;
-if (now - last < 1000 / frameRate) {
+const now = performance.now() as Time.Milli;
+const minDelta = Time.Milli.fromSecond((1 / frameRate) as Time.Second);
+if ((now - last) < minDelta) {
js/hang/src/publish/audio/capture-worklet.ts (1)

13-18: Timestamp derivation looks good; consider integer microseconds.

WebCodecs timestamps are integer µs; rounding avoids fractional drift from FP math.

-// Convert sample count to microseconds
-const timestamp = Time.Micro.fromSecond((this.#sampleCount / sampleRate) as Time.Second);
+// Convert sample count to integer microseconds
+const timestamp = Math.round(
+  Time.Micro.fromSecond((this.#sampleCount / sampleRate) as Time.Second) as number
+) as Time.Micro;
js/hang/src/connection.ts (2)

95-96: Add jitter to exponential backoff to prevent reconnection stampedes.

Randomizing the delay reduces herd effects when many clients reconnect simultaneously.

-// Exponential backoff.
-this.#delay = Math.min(this.#delay * 2, this.maxDelay) as Time.Milli;
+// Exponential backoff with jitter (±20%)
+const base = Math.min(this.#delay * 2, this.maxDelay);
+const jittered = base * (0.8 + Math.random() * 0.4);
+this.#delay = Math.min(jittered, this.maxDelay) as Time.Milli;

42-45: Clamp initial delay to maxDelay.

Guard against props configuring delay > maxDelay.

-this.#delay = this.delay;
+this.#delay = Math.min(this.delay, this.maxDelay) as Time.Milli;
js/hang/src/frame.test.ts (1)

6-28: Solid sanity test; consider adding a latency/jitter scenario later.

This verifies close semantics; a follow-up could assert decode ordering under out-of-order group arrivals.

js/hang/src/publish/video/index.ts (1)

151-154: Preserve the Micro brand in arithmetic (type-level nit).

groupTimestamp + GOP_DURATION erases the Micro brand to number. Cast or add a helper to keep the brand for clearer intent.

Apply:

-				const keyFrame = !group || groupTimestamp + GOP_DURATION <= frame.timestamp;
+				const due = (groupTimestamp + GOP_DURATION) as Time.Micro;
+				const keyFrame = !group || due <= frame.timestamp;
js/hang/src/watch/audio/render-worklet.ts (2)

19-21: Drop or queue data messages received before init instead of throwing.

Throwing inside the worklet can terminate processing. Prefer warn-and-drop for resilience.

-      } else if (type === "data") {
-        if (!this.#buffer) throw new Error("buffer not initialized");
-        this.#buffer.write(event.data.timestamp, event.data.data);
+      } else if (type === "data") {
+        if (!this.#buffer) {
+          console.warn("dropping audio data: buffer not initialized");
+          return;
+        }
+        this.#buffer.write(event.data.timestamp, event.data.data);

15-17: Include units in init log.

Small clarity boost; makes logs actionable when tuning latency.

-        console.debug(`init: ${event.data.latency}`);
+        console.debug(`init: latency=${event.data.latency}ms, rate=${event.data.rate}, channels=${event.data.channels}`);
js/hang/src/watch/audio/ring-buffer.test.ts (3)

373-384: Fix misleading comments about units in fractional timestamp test.

At rate=1000 Hz, 1105 ms is 1.105 s → 1105 samples (not 110.5). Comments should reflect ms→samples correctly.

-  // Write with fractional timestamp that rounds
-  write(buffer, 1105 as Time.Milli, 10, { channels: 2, value: 1.0 }); // 110.5 samples, rounds to 111
-  write(buffer, 1204 as Time.Milli, 10, { channels: 2, value: 2.0 }); // 120.4 samples, rounds to 120
+  // Write with fractional timestamp: ms→samples at 1kHz (1105ms→1105 samples, 1204ms→1204 samples)
+  write(buffer, 1105 as Time.Milli, 10, { channels: 2, value: 1.0 });
+  write(buffer, 1204 as Time.Milli, 10, { channels: 2, value: 2.0 });

34-59: Add tests for negative constructor inputs (rate/channels/latency).

Constructor only rejects zero; negative values should also throw to avoid undefined behavior.

Proposed additions:

 describe("initialization", () => {
   it("should initialize with valid parameters", () => {
@@
   it("should throw on invalid latency", () => {
     expect(() => new AudioRingBuffer({ rate: 48000, channels: 2, latency: 0 as Time.Milli })).toThrow(
       /invalid latency/,
     );
   });
+
+  it("should throw on negative parameters", () => {
+    expect(() => new AudioRingBuffer({ rate: -1, channels: 2, latency: 100 as Time.Milli })).toThrow();
+    expect(() => new AudioRingBuffer({ rate: 48000, channels: -2, latency: 100 as Time.Milli })).toThrow();
+    expect(() => new AudioRingBuffer({ rate: 48000, channels: 2, latency: (-1) as Time.Milli })).toThrow();
+  });
 });

Want me to open a follow-up to add these?


354-372: Add a test for mismatched per-channel lengths.

write should reject channel arrays of differing lengths.

Proposed test:

 describe("edge cases", () => {
   it("should throw when output array has wrong channel count", () => {
@@
   });
 
+  it("should throw when input channel lengths differ", () => {
+    const buffer = new AudioRingBuffer({ rate: 1000, channels: 2, latency: 100 as Time.Milli });
+    const data: Float32Array[] = [new Float32Array(10), new Float32Array(8)];
+    expect(() => buffer.write(Time.Micro.fromMilli(0 as Time.Milli), data)).toThrow(/channel length mismatch/);
+  });

I can wire this once the runtime check is added in the class.

js/hang/src/watch/audio/ring-buffer.ts (3)

70-84: Optional: replace per-sample zeroing with typed-array fill using two spans.

Reduces loop overhead, especially for bigger gaps, while preserving wrap semantics.

     if (start > this.#writeIndex) {
-      const gapSize = Math.min(start - this.#writeIndex, this.#buffer[0].length);
+      const cap = this.#buffer[0].length;
+      const gapSize = Math.min(start - this.#writeIndex, cap);
       if (gapSize === 1) {
         console.warn("floating point inaccuracy detected");
       }
 
       for (let channel = 0; channel < this.channels; channel++) {
-        const dst = this.#buffer[channel];
-        for (let i = 0; i < gapSize; i++) {
-          const writePos = (this.#writeIndex + i) % dst.length;
-          dst[writePos] = 0;
-        }
+        const dst = this.#buffer[channel];
+        const head = this.#writeIndex % cap;
+        const first = Math.min(gapSize, cap - head);
+        dst.fill(0, head, head + first);
+        if (first < gapSize) dst.fill(0, 0, gapSize - first);
       }
     }

86-101: Optional: fast-path contiguous writes with TypedArray.set and wrap handling.

This reduces per-sample overhead for common non-wrapping writes.

-    for (let channel = 0; channel < this.channels; channel++) {
-      const src = data[channel];
-      const dst = this.#buffer[channel];
-
-      for (let i = 0; i < samples; i++) {
-        const writePos = (start + i) % dst.length;
-        dst[writePos] = src[offset + i];
-      }
-    }
+    for (let channel = 0; channel < this.channels; channel++) {
+      const src = data[channel];
+      const dst = this.#buffer[channel];
+      const cap = dst.length;
+      const head = start % cap;
+      const first = Math.min(samples, cap - head);
+      dst.set(src.subarray(offset, offset + first), head);
+      if (first < samples) {
+        const remaining = samples - first;
+        dst.set(src.subarray(offset + first, offset + samples), 0);
+      }
+    }

103-122: Optional: fast-path reads with TypedArray.set and wrap handling.

Symmetric optimization for read.

-    for (let channel = 0; channel < this.channels; channel++) {
-      const dst = output[channel];
-      const src = this.#buffer[channel];
-
-      for (let i = 0; i < samples; i++) {
-        const readPos = (this.#readIndex + i) % src.length;
-        dst[i] = src[readPos];
-      }
-    }
+    for (let channel = 0; channel < this.channels; channel++) {
+      const dst = output[channel];
+      const src = this.#buffer[channel];
+      const cap = src.length;
+      const head = this.#readIndex % cap;
+      const first = Math.min(samples, cap - head);
+      dst.set(src.subarray(head, head + first), 0);
+      if (first < samples) {
+        dst.set(src.subarray(0, samples - first), first);
+      }
+    }
js/hang/src/frame.ts (3)

17-29: Simplify encode: remove redundant instanceof checks and precompute payload length

Uint8Array already has byteLength. This reduces duplication and clarifies bounds.

-export function encode(source: Uint8Array | Source, timestamp: Time.Micro): Uint8Array {
+export function encode(source: Uint8Array | Source, timestamp: Time.Micro): Uint8Array {
   // TODO switch over to u64 for simplicity. The varint uses 8 bytes anyway after 18 minutes lul.
   // TODO Don't encode into one buffer. Write the header/payload separately to avoid reallocating.
-  const data = new Uint8Array(8 + (source instanceof Uint8Array ? source.byteLength : source.byteLength));
+  const payloadLen = source.byteLength;
+  const data = new Uint8Array(8 + payloadLen);
 
   const size = setVint53(data, timestamp).byteLength;
   if (source instanceof Uint8Array) {
     data.set(source, size);
   } else {
     source.copyTo(data.subarray(size));
   }
-  return data.subarray(0, (source instanceof Uint8Array ? source.byteLength : source.byteLength) + size);
+  return data.subarray(0, payloadLen + size);
 }

274-277: Use unsigned read for 2-byte QUIC varint branch

getUint16 avoids unnecessary sign handling; the mask remains correct.

-  } else if (size === 2) {
-    v = view.getInt16(0) & 0x3fff;
+  } else if (size === 2) {
+    v = view.getUint16(0) & 0x3fff;

128-138: Avoid full-array sort on occasional out-of-order frames

For small buffers it's fine, but consider binary insertion to keep O(log n) insert vs O(n log n) resort.

If you see frequent reordering, replace the push+sort path with binary insertion:

function insertByTimestamp(frames: Frame[], f: Frame) {
  let lo = 0, hi = frames.length;
  while (lo < hi) {
    const mid = (lo + hi) >> 1;
    if (frames[mid].timestamp <= f.timestamp) lo = mid + 1;
    else hi = mid;
  }
  frames.splice(lo, 0, f);
}

And here:

-  this.#frames.push(frame);
-  this.#frames.sort((a, b) => a.timestamp - b.timestamp);
+  insertByTimestamp(this.#frames, frame);
js/hang/src/watch/audio/index.ts (2)

84-107: Creating AudioContext when disabled: acceptable trade-off; ensure resume is awaited when enabling

Right now resume() is called without awaiting; if you need determinism before decoding/connecting, wrap in an async spawn and await the promise.

-#runEnabled(effect: Effect): void {
+#runEnabled(effect: Effect): void {
   const enabled = effect.get(this.enabled);
   if (!enabled) return;
   const context = effect.get(this.#context);
   if (!context) return;
-  context.resume();
+  effect.spawn(async () => { await context.resume(); });
 }

14-18: Name nit: UNDERHEAD → HEADROOM

Minor terminology polish; “headroom” is the common term.

-const JITTER_UNDERHEAD = 25 as Time.Milli;
+const JITTER_HEADROOM = 25 as Time.Milli;
...
-  latency: Math.max(this.latency - JITTER_UNDERHEAD, 0) as Time.Milli,
+  latency: Math.max(this.latency - JITTER_HEADROOM, 0) as Time.Milli,
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between dd3cffc and 8f5448f.

⛔ Files ignored due to path filters (1)
  • js/pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (24)
  • js/hang/package.json (1 hunks)
  • js/hang/src/connection.ts (4 hunks)
  • js/hang/src/frame.test.ts (1 hunks)
  • js/hang/src/frame.ts (4 hunks)
  • js/hang/src/publish/audio/captions.ts (3 hunks)
  • js/hang/src/publish/audio/capture-worklet.ts (2 hunks)
  • js/hang/src/publish/audio/capture.ts (1 hunks)
  • js/hang/src/publish/audio/index.ts (6 hunks)
  • js/hang/src/publish/video/index.ts (5 hunks)
  • js/hang/src/publish/video/polyfill.ts (3 hunks)
  • js/hang/src/time.ts (1 hunks)
  • js/hang/src/watch/audio/emitter.ts (1 hunks)
  • js/hang/src/watch/audio/index.ts (10 hunks)
  • js/hang/src/watch/audio/render-worklet.ts (2 hunks)
  • js/hang/src/watch/audio/render.ts (1 hunks)
  • js/hang/src/watch/audio/ring-buffer.test.ts (1 hunks)
  • js/hang/src/watch/audio/ring-buffer.ts (1 hunks)
  • js/moq/src/group.ts (3 hunks)
  • js/moq/src/ietf/publisher.ts (1 hunks)
  • js/moq/src/lite/publisher.ts (1 hunks)
  • js/moq/src/lite/subscriber.ts (1 hunks)
  • js/moq/src/track.test.ts (5 hunks)
  • js/moq/src/track.ts (2 hunks)
  • js/signals/src/index.ts (4 hunks)
🧰 Additional context used
📓 Path-based instructions (2)
{rs,js}/hang/**

📄 CodeRabbit inference engine (CLAUDE.md)

All media logic belongs in the hang layer (rs/hang and js/hang)

Files:

  • js/hang/src/frame.test.ts
  • js/hang/src/publish/video/index.ts
  • js/hang/src/publish/audio/captions.ts
  • js/hang/src/publish/audio/capture.ts
  • js/hang/src/watch/audio/ring-buffer.test.ts
  • js/hang/src/publish/audio/capture-worklet.ts
  • js/hang/src/time.ts
  • js/hang/src/watch/audio/emitter.ts
  • js/hang/src/publish/video/polyfill.ts
  • js/hang/src/frame.ts
  • js/hang/src/watch/audio/ring-buffer.ts
  • js/hang/package.json
  • js/hang/src/connection.ts
  • js/hang/src/watch/audio/render-worklet.ts
  • js/hang/src/publish/audio/index.ts
  • js/hang/src/watch/audio/render.ts
  • js/hang/src/watch/audio/index.ts
js/**

📄 CodeRabbit inference engine (CLAUDE.md)

Place JS/TS packages under the js/ directory

Files:

  • js/hang/src/frame.test.ts
  • js/hang/src/publish/video/index.ts
  • js/moq/src/lite/publisher.ts
  • js/moq/src/ietf/publisher.ts
  • js/hang/src/publish/audio/captions.ts
  • js/moq/src/lite/subscriber.ts
  • js/hang/src/publish/audio/capture.ts
  • js/moq/src/track.test.ts
  • js/hang/src/watch/audio/ring-buffer.test.ts
  • js/hang/src/publish/audio/capture-worklet.ts
  • js/hang/src/time.ts
  • js/hang/src/watch/audio/emitter.ts
  • js/moq/src/track.ts
  • js/signals/src/index.ts
  • js/hang/src/publish/video/polyfill.ts
  • js/hang/src/frame.ts
  • js/hang/src/watch/audio/ring-buffer.ts
  • js/hang/package.json
  • js/hang/src/connection.ts
  • js/hang/src/watch/audio/render-worklet.ts
  • js/hang/src/publish/audio/index.ts
  • js/moq/src/group.ts
  • js/hang/src/watch/audio/render.ts
  • js/hang/src/watch/audio/index.ts
🧬 Code graph analysis (20)
js/hang/src/frame.test.ts (2)
js/hang/src/frame.ts (2)
  • Producer (38-61)
  • Consumer (68-257)
js/hang/src/time.ts (2)
  • Milli (25-25)
  • Milli (27-35)
js/hang/src/publish/video/index.ts (2)
js/hang/src/time.ts (4)
  • Second (37-37)
  • Second (39-47)
  • Micro (13-13)
  • Micro (15-23)
js/hang/src/frame.ts (2)
  • group (110-161)
  • Frame (10-15)
js/moq/src/lite/publisher.ts (2)
js/moq/src/lite/group.ts (1)
  • Group (3-32)
js/hang/src/frame.ts (1)
  • group (110-161)
js/moq/src/ietf/publisher.ts (1)
js/hang/src/frame.ts (1)
  • group (110-161)
js/hang/src/publish/audio/captions.ts (3)
js/hang/src/time.ts (2)
  • Milli (25-25)
  • Milli (27-35)
js/hang/src/watch/audio/index.ts (1)
  • Audio (38-230)
js/hang/src/publish/audio/index.ts (1)
  • Audio (62-280)
js/moq/src/lite/subscriber.ts (1)
js/hang/src/frame.ts (1)
  • group (110-161)
js/hang/src/publish/audio/capture.ts (1)
js/hang/src/time.ts (2)
  • Micro (13-13)
  • Micro (15-23)
js/moq/src/track.test.ts (1)
js/hang/src/frame.ts (1)
  • group (110-161)
js/hang/src/watch/audio/ring-buffer.test.ts (2)
js/hang/src/watch/audio/ring-buffer.ts (3)
  • read (103-122)
  • AudioRingBuffer (3-123)
  • write (41-101)
js/hang/src/time.ts (2)
  • Milli (25-25)
  • Milli (27-35)
js/hang/src/publish/audio/capture-worklet.ts (2)
js/hang/src/time.ts (2)
  • Second (37-37)
  • Second (39-47)
js/hang/src/publish/audio/capture.ts (1)
  • AudioFrame (3-6)
js/hang/src/watch/audio/emitter.ts (2)
js/hang/src/watch/audio/index.ts (3)
  • effect (84-129)
  • effect (131-141)
  • effect (143-190)
js/signals/src/index.ts (1)
  • effect (310-320)
js/moq/src/track.ts (1)
js/hang/src/frame.ts (1)
  • group (110-161)
js/hang/src/publish/video/polyfill.ts (1)
js/hang/src/time.ts (2)
  • Milli (25-25)
  • Milli (27-35)
js/hang/src/frame.ts (4)
js/hang/src/time.ts (4)
  • Micro (13-13)
  • Micro (15-23)
  • Milli (25-25)
  • Milli (27-35)
js/moq/src/track.ts (3)
  • TrackConsumer (143-322)
  • nextFrame (199-232)
  • nextGroup (179-192)
js/moq/src/group.ts (1)
  • GroupConsumer (78-147)
js/signals/src/index.ts (1)
  • Effect (88-459)
js/hang/src/watch/audio/ring-buffer.ts (1)
js/hang/src/time.ts (4)
  • Milli (25-25)
  • Milli (27-35)
  • Micro (13-13)
  • Micro (15-23)
js/hang/src/connection.ts (1)
js/hang/src/time.ts (2)
  • Milli (25-25)
  • Milli (27-35)
js/hang/src/watch/audio/render-worklet.ts (1)
js/hang/src/watch/audio/ring-buffer.ts (1)
  • AudioRingBuffer (3-123)
js/hang/src/publish/audio/index.ts (2)
js/hang/src/time.ts (4)
  • Milli (25-25)
  • Milli (27-35)
  • Micro (13-13)
  • Micro (15-23)
js/hang/src/frame.ts (2)
  • group (110-161)
  • Frame (10-15)
js/hang/src/watch/audio/render.ts (1)
js/hang/src/time.ts (4)
  • Micro (13-13)
  • Micro (15-23)
  • Milli (25-25)
  • Milli (27-35)
js/hang/src/watch/audio/index.ts (3)
js/hang/src/time.ts (4)
  • Milli (25-25)
  • Milli (27-35)
  • Micro (13-13)
  • Micro (15-23)
js/hang/src/watch/audio/render.ts (1)
  • Init (11-16)
js/hang/src/frame.ts (1)
  • Frame (10-15)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Check
🔇 Additional comments (23)
js/moq/src/group.ts (1)

141-142: Clone uses sequence correctly

Looks consistent with the rename; no issues.

js/moq/src/ietf/publisher.ts (1)

151-156: Switched to group.sequence in header — good

Matches the id→sequence rename and wire expectations.

js/moq/src/lite/publisher.ts (1)

203-205: Using group.sequence in lite Group message — good

Consistent with the new identifier semantics.

js/moq/src/track.ts (1)

52-58: Sequence-based dedupe on insert looks correct.

Dropping older groups and advancing #next from group.sequence + 1 is consistent with the new ordering model.

js/signals/src/index.ts (3)

105-107: Good addition: explicit closed lifecycle hooks.

The private resolver + promise pair cleanly models “closed” state for external awaiters.


125-127: Constructor wiring looks correct.

Resolver is set once and is idempotent under guarded close().


440-440: Right call order in close().

Resolving #closed before cleanup lets waiters observe completion deterministically.

js/hang/package.json (1)

39-40: Vitest script wiring LGTM.

Script name and devDependency align; nothing else to change here.

js/hang/src/time.ts (1)

1-47: Solid, minimal unit branding with correct scale factors.

Conversions read clearly; branding prevents unit mix-ups.

js/hang/src/publish/audio/capture.ts (2)

1-5: Type upgrade to Time.Micro is correct.

Interface now encodes timestamp units explicitly; aligns with encoder usage.


1-5: All timestamp producers emit microseconds via Time.Micro conversions; no changes required.

js/hang/src/publish/audio/captions.ts (2)

15-16: TTL typed as Time.Milli: good change.

This documents the unit and avoids accidental seconds vs ms mistakes.


65-66: Timer unit match is fine.

Passing Time.Milli to Effect.timer(ms: number) is type-safe with branded numbers; no change needed.

js/hang/src/publish/video/polyfill.ts (1)

60-61: Correct unit conversion to microseconds.

Using Time.Micro.fromMilli(last) fixes the previous ms→µs conversion bug for VideoFrame timestamps.

js/hang/src/publish/audio/index.ts (2)

96-99: Typed default for maxLatency looks good.

Defaulting to 100ms as Time.Milli aligns with the new branded time model.


198-205: Group rotation based on typed latency is correct.

Comparing frame.timestamp against Time.Micro.fromMilli(this.maxLatency) is consistent and avoids unit bugs.

js/hang/src/publish/video/index.ts (2)

16-18: Typed GOP duration looks good.

Switch to Time.Micro.fromSecond strengthens units without changing semantics.


108-118: LGTM on timestamp branding and frame encode usage.

Casting WebCodecs timestamps to Time.Micro at boundaries is consistent and contained.

Also applies to: 124-125

js/hang/src/watch/audio/render.ts (2)

1-1: Type-only Time import is appropriate.

Keeps the module tree lean and clarifies branding usage.


11-16: Init/Data shape changes verified. All watch/audio call sites—including init in index.ts, worklet handling in render-worklet.ts, and ring-buffer tests—use rate, channels, latency: Time.Milli, and timestamp: Time.Micro with no leftover sampleRate/channelCount or DOMHighResTimeStamp usage.

js/hang/src/watch/audio/ring-buffer.ts (1)

17-18: Time.Second API and import confirmed
Time.Second.fromMilli and .fromMicro exist and are correctly imported; no change required.

js/hang/src/watch/audio/index.ts (2)

170-175: Good: Consumer latency headroom to protect render buffer

Using Math.max(this.latency - JITTER_UNDERHEAD, 0) is a sensible hedge to prevent underflow. Nice.


119-126: worklet init shape matches new fields
I confirmed there are no remaining sampleRate or channelCount references in render-worklet.ts or any Render.Init consumers.

Comment on lines 151 to 160
group.close();
} finally {
if (group.sequence === this.#active) {
this.#active += 1;
if (this.#notify && this.#frames.at(0)?.group === this.#active) {
this.#notify();
this.#notify = undefined;
}
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Prevent decode from stalling when a group ends; drop stale frames and notify via skipTo

If the active group's reader finishes while frames from that group remain buffered, #active is incremented but those old frames aren't dropped. If no newer-group frames arrive, decode() can wait forever with this.#frames[0].group < this.#active. Call #skipTo(this.#active) in finally to prune stale frames, close old group clones, and wake any waiter.

Apply this diff:

       if (group.sequence === this.#active) {
         this.#active += 1;
-        if (this.#notify && this.#frames.at(0)?.group === this.#active) {
-          this.#notify();
-          this.#notify = undefined;
-        }
+        // Drop stale frames/groups from the previous active group and notify if unblocked.
+        this.#skipTo(this.#active);
       }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
group.close();
} finally {
if (group.sequence === this.#active) {
this.#active += 1;
if (this.#notify && this.#frames.at(0)?.group === this.#active) {
this.#notify();
this.#notify = undefined;
}
}
}
group.close();
} finally {
if (group.sequence === this.#active) {
this.#active += 1;
- if (this.#notify && this.#frames.at(0)?.group === this.#active) {
- this.#notify();
- this.#notify = undefined;
// Drop stale frames/groups from the previous active group and notify if unblocked.
this.#skipTo(this.#active);
}
}
🤖 Prompt for AI Agents
In js/hang/src/frame.ts around lines 151 to 160, after the branch that
increments this.#active in the finally block, call this.#skipTo(this.#active) to
prune any buffered frames whose group is now stale, ensure any cloned/old group
resources are closed, and trigger/wake any awaiting decode() by notifying via
skipTo; place the call after updating this.#active (and after clearing
this.#notify) so stale frames are removed and waiting decoders are awakened.

Comment on lines 60 to +64
const gain = new GainNode(root.context, { gain: effect.get(this.volume) });
root.connect(gain);

gain.connect(root.context.destination); // speakers
effect.cleanup(() => gain.disconnect());

effect.set(this.#gain, gain);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Ensure disconnection on disable and on teardown (don’t rely solely on nested cleanup).

Add an explicit root.disconnect(gain) on outer cleanup, and defensively disconnect when disabled so a toggle to false disconnects immediately even if cleanup ordering changes.

Apply:

 			const gain = new GainNode(root.context, { gain: effect.get(this.volume) });
 			root.connect(gain);
 
 			effect.set(this.#gain, gain);
+			effect.cleanup(() => {
+				try { root.disconnect(gain); } catch {}
+			});
 
-			effect.effect(() => {
-				// We only connect/disconnect when enabled to save power.
-				// Otherwise the worklet keeps running in the background returning 0s.
-				const enabled = effect.get(this.source.enabled);
-				if (!enabled) return;
-
-				gain.connect(root.context.destination); // speakers
-				effect.cleanup(() => gain.disconnect());
-			});
+			effect.effect((inner) => {
+				// Only connect when enabled to save power.
+				const enabled = inner.get(this.source.enabled);
+				if (enabled) {
+					gain.connect(root.context.destination); // speakers
+				} else {
+					try { gain.disconnect(); } catch {}
+				}
+				inner.cleanup(() => {
+					try { gain.disconnect(); } catch {}
+				});
+			});

Also applies to: 65-73

🤖 Prompt for AI Agents
In js/hang/src/watch/audio/emitter.ts around lines 60-64 (and similarly for
65-73), the GainNode is connected to root but not explicitly disconnected on
outer cleanup or immediately when disabled; add explicit disconnects: on the
outer/teardown cleanup call root.disconnect(gain) (guarding that gain exists),
and when toggling disabled set ensure you defensively disconnect the gain from
root and from any other connections (guarding null/undefined) and remove/clear
the stored reference so toggling to false disconnects immediately even if nested
cleanup ordering changes.

Comment on lines 28 to +36
process(_inputs: Float32Array[][], outputs: Float32Array[][], _parameters: Record<string, Float32Array>) {
const output = outputs[0];
const samplesRead = this.#buffer?.read(output) ?? 0;

// Not initialized yet, output silence
if (this.#buffer.length === 0 || output.length === 0) return true;
if (this.#refill) return true;

// No data available, output silence
const samples = Math.min(this.#writeIndex - this.#readIndex, output[0].length);
if (samples === 0) return true;

for (let channel = 0; channel < output.length; channel++) {
const dst = output[channel];
const src = this.#buffer[channel];

for (let i = 0; i < samples; i++) {
const readPos = (this.#readIndex + i) % src.length;
dst[i] = src[readPos];
}
if (samplesRead < output[0].length) {
this.#underflow += output[0].length - samplesRead;
} else if (this.#underflow > 0 && this.#buffer) {
console.warn(`audio underflow: ${Math.round((1000 * this.#underflow) / this.#buffer.rate)}ms`);
this.#underflow = 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Zero-fill remainder of the output when partial reads occur (prevents garbage audio).

Read may return fewer samples than the render quantum; the remainder of the channel buffers should be explicitly zeroed. Also guard when this.#buffer is not yet initialized to output silence for the whole frame.

   process(_inputs: Float32Array[][], outputs: Float32Array[][], _parameters: Record<string, Float32Array>) {
     const output = outputs[0];
-    const samplesRead = this.#buffer?.read(output) ?? 0;
+    const frame = output?.[0]?.length ?? 0;
+    let samplesRead = 0;
+    if (frame > 0) {
+      if (this.#buffer) {
+        samplesRead = this.#buffer.read(output);
+      }
+      // Zero-fill remainder to avoid stale data
+      for (let ch = 0; ch < output.length; ch++) {
+        if (samplesRead < frame) output[ch].fill(0, samplesRead);
+      }
+    }
 
-    if (samplesRead < output[0].length) {
-      this.#underflow += output[0].length - samplesRead;
+    if (frame > 0 && samplesRead < frame) {
+      this.#underflow += frame - samplesRead;
     } else if (this.#underflow > 0 && this.#buffer) {
       console.warn(`audio underflow: ${Math.round((1000 * this.#underflow) / this.#buffer.rate)}ms`);
       this.#underflow = 0;
     }
 
     return true;
   }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
process(_inputs: Float32Array[][], outputs: Float32Array[][], _parameters: Record<string, Float32Array>) {
const output = outputs[0];
const samplesRead = this.#buffer?.read(output) ?? 0;
// Not initialized yet, output silence
if (this.#buffer.length === 0 || output.length === 0) return true;
if (this.#refill) return true;
// No data available, output silence
const samples = Math.min(this.#writeIndex - this.#readIndex, output[0].length);
if (samples === 0) return true;
for (let channel = 0; channel < output.length; channel++) {
const dst = output[channel];
const src = this.#buffer[channel];
for (let i = 0; i < samples; i++) {
const readPos = (this.#readIndex + i) % src.length;
dst[i] = src[readPos];
}
if (samplesRead < output[0].length) {
this.#underflow += output[0].length - samplesRead;
} else if (this.#underflow > 0 && this.#buffer) {
console.warn(`audio underflow: ${Math.round((1000 * this.#underflow) / this.#buffer.rate)}ms`);
this.#underflow = 0;
process(_inputs: Float32Array[][], outputs: Float32Array[][], _parameters: Record<string, Float32Array>) {
const output = outputs[0];
const frame = output?.[0]?.length ?? 0;
let samplesRead = 0;
if (frame > 0) {
if (this.#buffer) {
samplesRead = this.#buffer.read(output);
}
// Zero-fill any unwritten samples to avoid stale audio data
for (let ch = 0; ch < output.length; ch++) {
if (samplesRead < frame) {
output[ch].fill(0, samplesRead);
}
}
}
if (frame > 0 && samplesRead < frame) {
this.#underflow += frame - samplesRead;
} else if (this.#underflow > 0 && this.#buffer) {
console.warn(`audio underflow: ${Math.round((1000 * this.#underflow) / this.#buffer.rate)}ms`);
this.#underflow = 0;
}
return true;
}
🤖 Prompt for AI Agents
In js/hang/src/watch/audio/render-worklet.ts around lines 28 to 36, partial
reads can leave the remainder of the output channels uninitialized (and when
this.#buffer is undefined the whole frame must be silence). Change the process
method so that if this.#buffer is falsy you zero all samples in every output
channel for the full render quantum; otherwise capture samplesRead and when
samplesRead < channelLength explicitly zero the remaining samples for every
output channel, and only update/compute underflow when this.#buffer existed; use
tight loops over channels and indices to fill zeros.

Comment on lines +157 to +160
if (group.subscribe >= this.#subscribeNext) {
throw new Error(`unknown subscription: id=${group.subscribe}`);
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Throwing on unknown future subscription may crash caller; fail stream defensively or downgrade to warn

If the caller doesn’t catch this, it can tear down the task/event loop. Prefer resetting/closing the offending stream or logging-and-returning. If a hard-fail is intended (protocol violation), ensure the invoker catches and tears down the connection cleanly.

Suggested soft-fail within this hunk:

-			if (group.subscribe >= this.#subscribeNext) {
-				throw new Error(`unknown subscription: id=${group.subscribe}`);
-			}
+			if (group.subscribe >= this.#subscribeNext) {
+				console.warn(`unknown subscription: id=${group.subscribe} (next=${this.#subscribeNext})`);
+				return;
+			}

To verify handling, confirm all runGroup(...) call sites catch and handle errors:


🏁 Script executed:

#!/bin/bash
# Find runGroup invocations and show surrounding context
rg -nP -C3 '\brunGroup\s*\(' --type=ts js/

Length of output: 2845


Convert hard throw to soft-fail in subscriber.ts
runGroup calls in publisher.ts and connection.ts aren’t wrapped in try/catch—an uncaught throw here leads to an unhandled rejection. Replace the throw with a warning and early return:

@@ js/moq/src/lite/subscriber.ts:157-160
-   if (group.subscribe >= this.#subscribeNext) {
-     throw new Error(`unknown subscription: id=${group.subscribe}`);
-   }
+   if (group.subscribe >= this.#subscribeNext) {
+     console.warn(`unknown subscription: id=${group.subscribe} (next=${this.#subscribeNext})`);
+     return;
+   }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if (group.subscribe >= this.#subscribeNext) {
throw new Error(`unknown subscription: id=${group.subscribe}`);
}
if (group.subscribe >= this.#subscribeNext) {
console.warn(`unknown subscription: id=${group.subscribe} (next=${this.#subscribeNext})`);
return;
}
🤖 Prompt for AI Agents
In js/moq/src/lite/subscriber.ts around lines 157 to 160, replace the hard throw
when group.subscribe >= this.#subscribeNext with a soft-fail: log a warning that
includes the subscription id (and any relevant group/context) and then return
early (e.g., return false or undefined) instead of throwing, so callers of
runGroup won't receive an unhandled rejection; ensure the warning uses the
existing logger (or console.warn if none) and keep the method's return
consistent with callers' expectations.

@kixelated kixelated enabled auto-merge (squash) September 2, 2025 23:28
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (5)
js/hang/src/watch/audio/emitter.ts (3)

48-51: Good: single writer for source.enabled eliminates flapping.

Deriving enabled from paused and muted centralizes ownership and fixes prior race/toggle loops.


63-66: Add outer cleanup to disconnect root→gain to prevent graph leaks.

Disconnect on teardown regardless of inner state; keeps the graph clean when root changes or emitter closes.

       const gain = new GainNode(root.context, { gain: effect.get(this.volume) });
       root.connect(gain);
 
       effect.set(this.#gain, gain);
+      effect.cleanup(() => {
+        try { root.disconnect(gain); } catch {}
+      });

68-76: Bug: nested effect uses the outer Effect for get/cleanup; disconnect never runs on re-evaluation.

Use the inner Effect instance so cleanup runs when enabled flips; also disconnect when disabled to avoid double-connections/amplification.

-      effect.effect(() => {
+      effect.effect((inner) => {
         // We only connect/disconnect when enabled to save power.
         // Otherwise the worklet keeps running in the background returning 0s.
-        const enabled = effect.get(this.source.enabled);
-        if (!enabled) return;
+        const enabled = inner.get(this.source.enabled);
+        if (!enabled) {
+          try { gain.disconnect(); } catch {}
+          return;
+        }
 
         gain.connect(root.context.destination); // speakers
-        effect.cleanup(() => gain.disconnect());
+        inner.cleanup(() => {
+          try { gain.disconnect(); } catch {}
+        });
       });
js/hang/src/frame.ts (2)

176-186: Precedence bug in latency logging is fixed

Parenthesized difference before division; logged ms are now correct.


110-164: Fix active-group advancement and drop stale frames via skipTo (condition inverted)

Currently increments #active when group.sequence !== #active, which is wrong. We should advance only when the active group finishes, then prune stale frames/groups and notify. As-is, this can both stall decode and incorrectly skip groups.

-		} finally {
-			if (group.sequence !== this.#active) {
-				// Advance to the next group.
-				// We don't use #skipTo because we don't want to drop the last frames.
-				this.#active += 1;
-
-				if (this.#notify && this.#frames.at(0)?.group === this.#active) {
-					this.#notify();
-					this.#notify = undefined;
-				}
-			}
-		}
+		} finally {
+			if (group.sequence === this.#active) {
+				// Advance and prune any stale frames/groups from the finished active group.
+				this.#active += 1;
+				this.#skipTo(this.#active);
+			}
+		}
🧹 Nitpick comments (5)
js/hang/src/watch/audio/emitter.ts (1)

35-36: Decouple paused default from muted (safer default).

Defaulting paused to muted can stop downloads when the user only intends mute-for-visualizations.

-    this.paused = Signal.from(props?.paused ?? props?.muted ?? false);
+    this.paused = Signal.from(props?.paused ?? false);
js/hang/src/frame.ts (4)

17-29: Simplify buffer sizing and clean up comment tone

Redundant ternaries and a casual tone in a TODO. Tighten both.

-	// TODO switch over to u64 for simplicity. The varint uses 8 bytes anyway after 18 minutes lul.
+	// TODO: Consider u64; QUIC varint becomes 8 bytes after ~18 minutes anyway.
-	const data = new Uint8Array(8 + (source instanceof Uint8Array ? source.byteLength : source.byteLength));
+	const payloadLen = source.byteLength;
+	const data = new Uint8Array(8 + payloadLen);
@@
-	return data.subarray(0, (source instanceof Uint8Array ? source.byteLength : source.byteLength) + size);
+	return data.subarray(0, payloadLen + size);

100-107: Avoid full-array sort on every push

this.#groups.push(...); this.#groups.sort(...) is O(n log n) per insertion. Consider binary insertion to keep it O(log n) compare + O(n) move, or maintain a deque indexed by sequence. Low priority unless groups are large.


128-139: Potential unbounded growth of #frames

If producers outpace consumption and latency never trips, #frames can grow without bound. Consider a cap (by count or time window) and a drop policy (oldest by group/timestamp) with metrics.


250-259: Confirm lifecycle ownership of TrackConsumer

Consumer.close() doesn’t close #track. If callers expect Consumer to own the underlying TrackConsumer, this may leak. Either close it here or document that callers must close TrackConsumer themselves.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 8f5448f and 908460e.

📒 Files selected for processing (3)
  • js/hang/src/frame.ts (4 hunks)
  • js/hang/src/watch/audio/emitter.ts (2 hunks)
  • js/hang/src/watch/audio/ring-buffer.ts (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • js/hang/src/watch/audio/ring-buffer.ts
🧰 Additional context used
📓 Path-based instructions (2)
{rs,js}/hang/**

📄 CodeRabbit inference engine (CLAUDE.md)

All media logic belongs in the hang layer (rs/hang and js/hang)

Files:

  • js/hang/src/watch/audio/emitter.ts
  • js/hang/src/frame.ts
js/**

📄 CodeRabbit inference engine (CLAUDE.md)

Place JS/TS packages under the js/ directory

Files:

  • js/hang/src/watch/audio/emitter.ts
  • js/hang/src/frame.ts
🧬 Code graph analysis (2)
js/hang/src/watch/audio/emitter.ts (2)
js/hang/src/watch/audio/index.ts (3)
  • effect (84-129)
  • effect (131-141)
  • effect (143-190)
js/signals/src/index.ts (1)
  • effect (310-320)
js/hang/src/frame.ts (4)
js/hang/src/time.ts (4)
  • Micro (13-13)
  • Micro (15-23)
  • Milli (25-25)
  • Milli (27-35)
js/moq/src/track.ts (3)
  • TrackConsumer (143-322)
  • nextFrame (199-232)
  • nextGroup (179-192)
js/moq/src/group.ts (1)
  • GroupConsumer (78-147)
js/signals/src/index.ts (1)
  • Effect (88-459)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Check
🔇 Additional comments (1)
js/hang/src/frame.ts (1)

32-35: Typed decode looks good

Return type branded with Time.Micro is consistent with encode.

Comment on lines +223 to +231
// We have frames but not from the active group
// Check if we should move to the next group
const nextGroupFrames = this.#frames.filter((f) => f.group > this.#active);
if (nextGroupFrames.length > 0) {
// Move to the next group
const nextGroup = Math.min(...nextGroupFrames.map((f) => f.group));
this.#skipTo(nextGroup);
continue;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Honor latency policy; don’t skip just because newer-group frames exist

This unconditionally drops the remainder of the active group whenever any newer group is buffered, ignoring #latency. Defer to #checkLatency() (which enforces the policy) and only skip when it decides to.

-				// We have frames but not from the active group
-				// Check if we should move to the next group
-				const nextGroupFrames = this.#frames.filter((f) => f.group > this.#active);
-				if (nextGroupFrames.length > 0) {
-					// Move to the next group
-					const nextGroup = Math.min(...nextGroupFrames.map((f) => f.group));
-					this.#skipTo(nextGroup);
-					continue;
-				}
+				// Not from the active group; enforce latency policy before considering skips.
+				this.#checkLatency();
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// We have frames but not from the active group
// Check if we should move to the next group
const nextGroupFrames = this.#frames.filter((f) => f.group > this.#active);
if (nextGroupFrames.length > 0) {
// Move to the next group
const nextGroup = Math.min(...nextGroupFrames.map((f) => f.group));
this.#skipTo(nextGroup);
continue;
}
// Not from the active group; enforce latency policy before considering skips.
this.#checkLatency();
🤖 Prompt for AI Agents
In js/hang/src/frame.ts around lines 223 to 231, the code currently
unconditionally advances to the next group when any newer-group frames are
buffered, which ignores the latency policy; change it to consult the latency
policy first by calling this.#checkLatency() and only call
this.#skipTo(nextGroup) when that check returns true. Compute nextGroup as
before from nextGroupFrames, but replace the unconditional skip with an if
(this.#checkLatency()) { this.#skipTo(nextGroup); } else { break/return/wait
(i.e., stop dropping the rest of the active group) } so that skipping occurs
only when the latency policy allows it.

@kixelated kixelated merged commit ffb4434 into main Sep 2, 2025
3 checks passed
@kixelated kixelated deleted the audio-jitter branch September 2, 2025 23:30
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants