Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions js/hang-demo/src/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

<h3>Other demos:</h3>
<ul>
<li><a href="watch-mse.html">Watch with MSE player (native video).</a></li>
<li><a href="publish.html">Publish a broadcast.</a></li>
<li><a href="meet.html">Watch a room of broadcasts.</a></li>
<li><a href="support.html">Check browser support.</a></li>
Expand Down
175 changes: 175 additions & 0 deletions js/hang-demo/src/mse/moq-client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
import * as Catalog from "@moq/hang/catalog";
import * as Moq from "@moq/lite";

export interface MoqClientConfig {
relayUrl: string;
broadcastName: string;
onData?: (data: Uint8Array, trackType: "video" | "audio") => void;
onCatalog?: (catalog: Catalog.Root) => void;
onError?: (error: Error) => void;
onConnected?: () => void;
onDisconnected?: () => void;
}

export class MoqClient {
private connection: Moq.Connection.Established | null = null;
private broadcast: Moq.Broadcast | null = null;
private config: MoqClientConfig;
private isRunning = false;

constructor(config: MoqClientConfig) {
this.config = config;
}

async connect(): Promise<void> {
try {
this.isRunning = true;

// Connect to relay using @moq/lite
this.connection = await Moq.Connection.connect(new URL(this.config.relayUrl));
console.log("[MoqClient] Connected to relay:", this.config.relayUrl);

this.config.onConnected?.();

// Subscribe to broadcast
this.broadcast = this.connection.consume(Moq.Path.from(this.config.broadcastName));
console.log("[MoqClient] Subscribed to broadcast:", this.config.broadcastName);

// First, fetch the catalog to get track names
const catalog = await this.fetchCatalog();

if (!catalog) {
console.warn("[MoqClient] No catalog received, using default track names");
// Fallback to default names
await this.subscribeToTracks("video0", "audio1");
} else {
this.config.onCatalog?.(catalog);

// Get track names from catalog
const videoTrackName = this.getVideoTrackName(catalog);
const audioTrackName = this.getAudioTrackName(catalog);

console.log("[MoqClient] Using tracks from catalog:", { videoTrackName, audioTrackName });

await this.subscribeToTracks(videoTrackName, audioTrackName);
}
} catch (error) {
console.error("[MoqClient] Connection error:", error);
this.config.onError?.(error as Error);
throw error;
}
}

private async fetchCatalog(): Promise<Catalog.Root | null> {
if (!this.broadcast) return null;

console.log("[MoqClient] Fetching catalog.json...");
const catalogTrack = this.broadcast.subscribe("catalog.json", 100);

try {
// Wait for catalog with timeout
const frame = await Promise.race([
catalogTrack.readFrame(),
new Promise<null>((resolve) => setTimeout(() => resolve(null), 5000)),
]);

if (!frame) {
console.warn("[MoqClient] Catalog fetch timed out");
return null;
}

// Use the catalog decode function from @moq/hang
const catalog = Catalog.decode(frame);
console.log("[MoqClient] Received catalog:", catalog);

return catalog;
} catch (error) {
console.warn("[MoqClient] Error fetching catalog:", error);
return null;
}
}
Comment on lines +63 to +90
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Catalog track resource may leak on timeout.

The catalogTrack subscription created on line 67 is never explicitly closed if the timeout occurs (line 73) or if an error is thrown (line 87). While the track may be closed when this.broadcast is closed later, it's better to explicitly clean up to avoid potential resource accumulation during reconnects.

🔎 Proposed fix
 private async fetchCatalog(): Promise<Catalog.Root | null> {
   if (!this.broadcast) return null;

   console.log("[MoqClient] Fetching catalog.json...");
   const catalogTrack = this.broadcast.subscribe("catalog.json", 100);

   try {
     // Wait for catalog with timeout
     const frame = await Promise.race([
       catalogTrack.readFrame(),
       new Promise<null>((resolve) => setTimeout(() => resolve(null), 5000)),
     ]);

     if (!frame) {
       console.warn("[MoqClient] Catalog fetch timed out");
+      catalogTrack.close?.();
       return null;
     }

     // Use the catalog decode function from @moq/hang
     const catalog = Catalog.decode(frame);
     console.log("[MoqClient] Received catalog:", catalog);

     return catalog;
   } catch (error) {
     console.warn("[MoqClient] Error fetching catalog:", error);
     return null;
+  } finally {
+    catalogTrack.close?.();
   }
 }
🤖 Prompt for AI Agents
In js/hang-demo/src/mse/moq-client.ts around lines 63 to 90, the catalogTrack
subscription created on line 67 is not cleaned up on the timeout path or when an
exception occurs; ensure you always close the subscription: wrap the await
Promise.race(...) and subsequent decode in a try/finally (or add explicit
cleanup in each return path) and call the track's close/unsubscribe method
(e.g., catalogTrack.close() or catalogTrack.unsubscribe() depending on the API)
in the finally block so the subscription is released on success, timeout, or
error.


private getVideoTrackName(catalog: Catalog.Root): string | null {
if (!catalog.video?.renditions) return null;

// The rendition key (e.g., "video0") IS the track name
const renditionKeys = Object.keys(catalog.video.renditions);
if (renditionKeys.length > 0) {
return renditionKeys[0];
}
return null;
}

private getAudioTrackName(catalog: Catalog.Root): string | null {
if (!catalog.audio?.renditions) return null;

// The rendition key (e.g., "audio1") IS the track name
const renditionKeys = Object.keys(catalog.audio.renditions);
if (renditionKeys.length > 0) {
return renditionKeys[0];
}
return null;
}

private async subscribeToTracks(videoTrackName: string | null, audioTrackName: string | null): Promise<void> {
if (!this.broadcast) return;

const promises: Promise<void>[] = [];

if (videoTrackName) {
const videoTrack = this.broadcast.subscribe(videoTrackName, 2);
console.log("[MoqClient] Subscribed to video track:", videoTrackName);
promises.push(this.processTrack(videoTrack, "video"));
}

if (audioTrackName) {
const audioTrack = this.broadcast.subscribe(audioTrackName, 2);
console.log("[MoqClient] Subscribed to audio track:", audioTrackName);
promises.push(this.processTrack(audioTrack, "audio"));
}

await Promise.all(promises);
}
Comment on lines +114 to +132
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Early rejection in subscribeToTracks could leave one track processing.

If processTrack for one track throws an error before the other starts processing, Promise.all will reject immediately, but the other processTrack call (if already started) will continue running in the background since errors are caught internally at line 154. While isRunning will eventually stop the loop when disconnect() is called, there's a window where one track could continue processing after subscribeToTracks has rejected.

This is a minor edge case but could cause unexpected behavior during error recovery.

🔎 Proposed fix

Consider using Promise.allSettled to ensure both tracks are properly tracked even if one fails:

-    await Promise.all(promises);
+    const results = await Promise.allSettled(promises);
+    const failures = results.filter(r => r.status === 'rejected');
+    if (failures.length > 0) {
+      throw new Error(`Track processing failed: ${failures.map(f => (f as PromiseRejectedResult).reason).join(', ')}`);
+    }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
private async subscribeToTracks(videoTrackName: string | null, audioTrackName: string | null): Promise<void> {
if (!this.broadcast) return;
const promises: Promise<void>[] = [];
if (videoTrackName) {
const videoTrack = this.broadcast.subscribe(videoTrackName, 2);
console.log("[MoqClient] Subscribed to video track:", videoTrackName);
promises.push(this.processTrack(videoTrack, "video"));
}
if (audioTrackName) {
const audioTrack = this.broadcast.subscribe(audioTrackName, 2);
console.log("[MoqClient] Subscribed to audio track:", audioTrackName);
promises.push(this.processTrack(audioTrack, "audio"));
}
await Promise.all(promises);
}
private async subscribeToTracks(videoTrackName: string | null, audioTrackName: string | null): Promise<void> {
if (!this.broadcast) return;
const promises: Promise<void>[] = [];
if (videoTrackName) {
const videoTrack = this.broadcast.subscribe(videoTrackName, 2);
console.log("[MoqClient] Subscribed to video track:", videoTrackName);
promises.push(this.processTrack(videoTrack, "video"));
}
if (audioTrackName) {
const audioTrack = this.broadcast.subscribe(audioTrackName, 2);
console.log("[MoqClient] Subscribed to audio track:", audioTrackName);
promises.push(this.processTrack(audioTrack, "audio"));
}
const results = await Promise.allSettled(promises);
const failures = results.filter(r => r.status === 'rejected');
if (failures.length > 0) {
throw new Error(`Track processing failed: ${failures.map(f => (f as PromiseRejectedResult).reason).join(', ')}`);
}
}
🤖 Prompt for AI Agents
In js/hang-demo/src/mse/moq-client.ts around lines 114 to 132, subscribeToTracks
uses Promise.all which can reject early and leave the other processTrack
running; change to use Promise.allSettled so both track processing promises are
awaited to completion, then examine the settled results to log or rethrow errors
as appropriate (e.g., if any promise rejected, surface a combined error or
rethrow the first failure) to ensure both tracks are tracked and cleaned up
before subscribeToTracks returns.


private async processTrack(track: Moq.Track, trackType: "video" | "audio"): Promise<void> {
console.log(`[MoqClient] Processing ${trackType} track:`, track.name);

try {
while (this.isRunning) {
const group = await track.nextGroup();
if (!group) {
console.log(`[MoqClient] ${trackType} track ended`);
break;
}

while (this.isRunning) {
const frame = await group.readFrame();
if (!frame) break;

// For segment mode, the frame IS the complete fMP4 segment
// No timestamp stripping needed
this.config.onData?.(frame, trackType);
}
}
} catch (error) {
if (this.isRunning) {
console.error(`[MoqClient] Error processing ${trackType} track:`, error);
this.config.onError?.(error as Error);
}
}
}
Comment on lines +134 to +160
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Potential race condition when setting isRunning = false during active processing.

In processTrack, the while (this.isRunning) checks on lines 138 and 145 could create a race where:

  1. Line 138 checks isRunning (true) and enters outer loop
  2. disconnect() is called, setting isRunning = false
  3. Line 139 awaits nextGroup() which could take time
  4. Line 145 checks isRunning but line 146 still awaits readFrame()

While the error handling at line 155 checks isRunning before logging, the loop could still process one more frame after disconnect is called. This is a minor timing issue but could cause confusion during debugging.

Consider checking isRunning immediately after each await:

🔎 Proposed fix
 private async processTrack(track: Moq.Track, trackType: "video" | "audio"): Promise<void> {
   console.log(`[MoqClient] Processing ${trackType} track:`, track.name);

   try {
     while (this.isRunning) {
       const group = await track.nextGroup();
+      if (!this.isRunning) break;
       if (!group) {
         console.log(`[MoqClient] ${trackType} track ended`);
         break;
       }

       while (this.isRunning) {
         const frame = await group.readFrame();
+        if (!this.isRunning) break;
         if (!frame) break;

         // For segment mode, the frame IS the complete fMP4 segment
         // No timestamp stripping needed
         this.config.onData?.(frame, trackType);
       }
     }
   } catch (error) {
     if (this.isRunning) {
       console.error(`[MoqClient] Error processing ${trackType} track:`, error);
       this.config.onError?.(error as Error);
     }
   }
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
private async processTrack(track: Moq.Track, trackType: "video" | "audio"): Promise<void> {
console.log(`[MoqClient] Processing ${trackType} track:`, track.name);
try {
while (this.isRunning) {
const group = await track.nextGroup();
if (!group) {
console.log(`[MoqClient] ${trackType} track ended`);
break;
}
while (this.isRunning) {
const frame = await group.readFrame();
if (!frame) break;
// For segment mode, the frame IS the complete fMP4 segment
// No timestamp stripping needed
this.config.onData?.(frame, trackType);
}
}
} catch (error) {
if (this.isRunning) {
console.error(`[MoqClient] Error processing ${trackType} track:`, error);
this.config.onError?.(error as Error);
}
}
}
private async processTrack(track: Moq.Track, trackType: "video" | "audio"): Promise<void> {
console.log(`[MoqClient] Processing ${trackType} track:`, track.name);
try {
while (this.isRunning) {
const group = await track.nextGroup();
if (!this.isRunning) break;
if (!group) {
console.log(`[MoqClient] ${trackType} track ended`);
break;
}
while (this.isRunning) {
const frame = await group.readFrame();
if (!this.isRunning) break;
if (!frame) break;
// For segment mode, the frame IS the complete fMP4 segment
// No timestamp stripping needed
this.config.onData?.(frame, trackType);
}
}
} catch (error) {
if (this.isRunning) {
console.error(`[MoqClient] Error processing ${trackType} track:`, error);
this.config.onError?.(error as Error);
}
}
}
🤖 Prompt for AI Agents
In js/hang-demo/src/mse/moq-client.ts around lines 134 to 160, processTrack can
continue processing a group/frame after disconnect() flips isRunning to false
because awaits aren't re-checked; after each await (after nextGroup() and after
readFrame()) immediately test this.isRunning and break/return if false so no
further frames are processed, and ensure the catch logging still guards on
isRunning to avoid spurious error reports.


disconnect(): void {
this.isRunning = false;
if (this.broadcast) {
this.broadcast.close();
this.broadcast = null;
}
if (this.connection) {
this.connection.close();
this.connection = null;
}
this.config.onDisconnected?.();
console.log("[MoqClient] Disconnected");
}
}
Loading
Loading