
Conversation

@kixelated
Collaborator

#761, but with merge conflicts fixed.

@sebastianpiq I changed the pub-hls target so it's simpler (one ffmpeg command). I think there are still a lot of improvements that could be made, but this is a good start; let's merge it now to prevent more conflicts.

sebastianpiq and others added 23 commits December 8, 2025 14:31
@kixelated enabled auto-merge (squash) December 19, 2025 03:49
@coderabbitai
Contributor

coderabbitai bot commented Dec 19, 2025

Walkthrough

The PR adds HLS ingest and publish support and refactors format handling to typed enums. It replaces the previous Import/ImportType pipeline with a new Publish abstraction that supports Avc3, fMP4, and HLS, moves stream decoding into a publish module, and introduces an HLS importer that fetches and decodes playlists/segments. Decoder APIs were changed to use DecoderFormat and optional timestamps with a wall-clock fallback. The justfile targets and CLI commands were extended for HLS generation and serving, and several modules/files were added, removed, or renamed.

Pre-merge checks and finishing touches

✅ Passed checks (3 passed)
  • Title check: ✅ Passed. The title 'Add HLS import module' directly describes the main change: introducing a new HLS ingest module with full implementation, including the public HlsConfig and Hls structs, integration into the import system, and supporting infrastructure changes.
  • Description check: ✅ Passed. The description references PR #761 (merge conflicts fixed), mentions simplification of pub-hls to a single ffmpeg command, and notes this is a good starting point despite possible future improvements, all related to the HLS module addition work.
  • Docstring coverage: ✅ Passed. Docstring coverage is 100.00%, which is sufficient; the required threshold is 80.00%.
✨ Finishing touches
  • 📝 Generate docstrings
  • 🧪 Generate unit tests (beta)
    • Create PR with unit tests
    • Post copyable unit tests in a comment
    • Commit unit tests in branch feat/hls-ingest-module

Comment @coderabbitai help to get the list of available commands and usage tips.

@kixelated
Collaborator Author

Also, I added support for loading from disk instead of only over HTTP, so we don't need the Python simple HTTP server.

coderabbitai bot left a comment

Actionable comments posted: 8

🧹 Nitpick comments (8)
rs/hang-cli/Cargo.toml (1)

22-22: Consider using a broader version specifier for consistency.

The hang crate uses bytes = "1" while this crate pins to "1.10". For consistency across the workspace and to avoid potential version conflicts, consider using the same version specifier "1" or adding bytes to the workspace dependencies.

justfile (2)

199-208: Cleanup trap should be defined before starting ffmpeg.

The cleanup function and trap are set after starting ffmpeg. If the script is interrupted between starting ffmpeg (line 165) and setting the trap (line 205), ffmpeg would be orphaned.

🔎 Proposed fix: Move trap setup earlier
+	# Trap to clean up ffmpeg on exit
+	FFMPEG_PID=""
+	cleanup() {
+		echo "Shutting down..."
+		[ -n "$FFMPEG_PID" ] && kill $FFMPEG_PID 2>/dev/null || true
+		exit 0
+	}
+	trap cleanup SIGINT SIGTERM
+
 	# Start ffmpeg in the background to generate HLS
 	ffmpeg -loglevel warning -re -stream_loop -1 -i "$INPUT" \
 		...
 		"$OUT_DIR/v%v/stream.m3u8" &

 	FFMPEG_PID=$!

 	# Wait for master playlist to be generated
 	...
-
-	# Trap to clean up ffmpeg on exit
-	cleanup() {
-		echo "Shutting down..."
-		kill $FFMPEG_PID 2>/dev/null || true
-		exit 0
-	}
-	trap cleanup SIGINT SIGTERM

389-391: Consider dynamic wait for HLS segments.

The 2-second sleep assumes ffmpeg will have generated content by then. For consistency with pub-hls (which waits for master.m3u8), consider polling for the master playlist before starting the HTTP server.

rs/hang-cli/src/publish.rs (1)

76-80: Consider extracting the stdin decode loop to reduce duplication.

Both init() and run() contain similar stdin reading loops for the Decoder path. Consider extracting a helper method to reduce duplication.

🔎 Potential refactor
use tokio::io::AsyncReadExt;

async fn decode_stdin_loop(
    decoder: &mut Decoder,
    buffer: &mut BytesMut,
    stop_when_initialized: bool,
) -> anyhow::Result<()> {
    let mut input = tokio::io::stdin();
    while input.read_buf(buffer).await? > 0 {
        decoder.decode_stream(buffer)?;
        if stop_when_initialized && decoder.is_initialized() {
            break;
        }
    }
    Ok(())
}

Also applies to: 91-95

rs/hang/src/import/hls.rs (4)

148-162: Consider adding explicit cancellation support.

The run() method loops indefinitely with no explicit cancellation mechanism. While the caller can abort the task, consider accepting a CancellationToken or tokio::sync::watch::Receiver for graceful shutdown, especially for clean resource cleanup.
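
As a rough illustration, a run loop honoring a tokio_util CancellationToken could look like the sketch below; poll_once and the overall shape are hypothetical stand-ins rather than the module's actual API:

use tokio_util::sync::CancellationToken;

// Hypothetical run loop with graceful shutdown; poll_once stands in for one
// iteration of playlist/segment fetching.
async fn run(token: CancellationToken) -> anyhow::Result<()> {
    loop {
        tokio::select! {
            // Resolve cleanly when the caller cancels, so resources are
            // dropped in an orderly fashion instead of mid-await.
            _ = token.cancelled() => return Ok(()),
            res = poll_once() => res?,
        }
    }
}

async fn poll_once() -> anyhow::Result<()> {
    // Placeholder for one fetch/decode cycle.
    tokio::time::sleep(std::time::Duration::from_millis(500)).await;
    Ok(())
}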


361-382: Media segment bytes are not verified to be fully consumed.

The init segment check ensures bytes.is_empty() after decoding (line 350), but push_segment does not verify that all bytes were consumed. This inconsistency could mask parsing issues in media segments.

🔎 Suggested fix
 importer.decode(&mut bytes).context("failed to parse media segment")?;
+if !bytes.is_empty() {
+    warn!(remaining = bytes.len(), "media segment had unconsumed bytes");
+}
 track.next_sequence = Some(sequence + 1);

536-557: Test coverage is minimal for this module's complexity.

Consider adding tests for:

  • parse_playlist with various URL formats and file paths
  • select_variants with different master playlist configurations
  • select_audio edge cases (no default, missing group)
  • resolve_uri with absolute and relative URIs

The helper functions are pure and easy to unit test.
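
For example, a resolve_uri test could look like the sketch below; the signature (base Url plus URI string, returning a Url) is assumed and may need adjusting to the actual helper:

#[cfg(test)]
mod tests {
    use super::*;
    use url::Url;

    #[test]
    fn resolves_relative_and_absolute_uris() {
        let base = Url::parse("https://example.com/hls/master.m3u8").unwrap();

        // Relative URIs resolve against the playlist's directory.
        let rel = resolve_uri(&base, "v0/stream.m3u8").unwrap();
        assert_eq!(rel.as_str(), "https://example.com/hls/v0/stream.m3u8");

        // Absolute URIs pass through untouched.
        let abs = resolve_uri(&base, "https://cdn.example.com/seg.m4s").unwrap();
        assert_eq!(abs.as_str(), "https://cdn.example.com/seg.m4s");
    }
}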


407-414: Consider using resize_with for cleaner implementation.

The while loop pattern works but resize_with is more idiomatic for this use case.

🔎 Alternative implementation
 fn ensure_video_importer_for(&mut self, index: usize) -> &mut Fmp4 {
-    while self.video_importers.len() <= index {
-        let importer = Fmp4::new(self.broadcast.clone());
-        self.video_importers.push(importer);
-    }
-
-    self.video_importers.get_mut(index).unwrap()
+    if self.video_importers.len() <= index {
+        self.video_importers.resize_with(index + 1, || Fmp4::new(self.broadcast.clone()));
+    }
+    &mut self.video_importers[index]
 }
📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ca82289 and d951e4c.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (19)
  • .gitignore (1 hunks)
  • justfile (2 hunks)
  • rs/hang-cli/Cargo.toml (1 hunks)
  • rs/hang-cli/src/client.rs (1 hunks)
  • rs/hang-cli/src/import.rs (0 hunks)
  • rs/hang-cli/src/main.rs (4 hunks)
  • rs/hang-cli/src/publish.rs (1 hunks)
  • rs/hang-cli/src/server.rs (2 hunks)
  • rs/hang/Cargo.toml (1 hunks)
  • rs/hang/src/error.rs (2 hunks)
  • rs/hang/src/import/aac.rs (3 hunks)
  • rs/hang/src/import/avc3.rs (6 hunks)
  • rs/hang/src/import/decoder.rs (5 hunks)
  • rs/hang/src/import/fmp4.rs (4 hunks)
  • rs/hang/src/import/hls.rs (1 hunks)
  • rs/hang/src/import/mod.rs (1 hunks)
  • rs/hang/src/import/stream.rs (0 hunks)
  • rs/hang/src/lib.rs (1 hunks)
  • rs/libmoq/src/publish.rs (2 hunks)
💤 Files with no reviewable changes (2)
  • rs/hang-cli/src/import.rs
  • rs/hang/src/import/stream.rs
🧰 Additional context used
📓 Path-based instructions (2)
**/*.rs

📄 CodeRabbit inference engine (CLAUDE.md)

In Rust crates, tests should be integrated within source files using inline test modules

Files:

  • rs/hang/src/lib.rs
  • rs/hang/src/error.rs
  • rs/hang/src/import/mod.rs
  • rs/hang/src/import/fmp4.rs
  • rs/hang-cli/src/client.rs
  • rs/hang-cli/src/publish.rs
  • rs/hang-cli/src/main.rs
  • rs/hang-cli/src/server.rs
  • rs/hang/src/import/hls.rs
  • rs/hang/src/import/decoder.rs
  • rs/libmoq/src/publish.rs
  • rs/hang/src/import/avc3.rs
  • rs/hang/src/import/aac.rs
rs/**/Cargo.toml

📄 CodeRabbit inference engine (CLAUDE.md)

For Rust development, use the workspace configuration in rs/Cargo.toml

Files:

  • rs/hang/Cargo.toml
  • rs/hang-cli/Cargo.toml
🧠 Learnings (1)
📚 Learning: 2025-12-10T04:00:14.871Z
Learnt from: CR
Repo: moq-dev/moq PR: 0
File: CLAUDE.md:0-0
Timestamp: 2025-12-10T04:00:14.871Z
Learning: Applies to js/package.json : For JavaScript/TypeScript development, use bun workspaces with configuration in `js/package.json`

Applied to files:

  • .gitignore
🧬 Code graph analysis (7)
rs/hang/src/import/fmp4.rs (5)
rs/hang/src/catalog/root.rs (2)
  • new (151-156)
  • new (226-228)
rs/hang/src/model/track.rs (2)
  • new (30-36)
  • new (140-148)
rs/hang/src/import/aac.rs (1)
  • new (15-21)
rs/hang/src/import/avc3.rs (2)
  • new (32-40)
  • new (290-292)
rs/hang/src/import/decoder.rs (1)
  • new (37-45)
rs/hang-cli/src/publish.rs (5)
rs/hang/src/import/hls.rs (4)
  • new (38-40)
  • new (105-111)
  • new (116-133)
  • init (138-146)
rs/hang/src/model/broadcast.rs (3)
  • new (20-29)
  • new (77-80)
  • default (39-41)
rs/hang/src/import/aac.rs (1)
  • new (15-21)
rs/hang/src/import/avc3.rs (3)
  • new (32-40)
  • new (290-292)
  • init (42-100)
rs/hang/src/import/decoder.rs (1)
  • new (37-45)
rs/hang-cli/src/main.rs (3)
rs/hang-cli/src/server.rs (1)
  • server (14-41)
rs/hang-cli/src/client.rs (1)
  • client (6-31)
rs/hang-cli/src/publish.rs (1)
  • new (34-65)
rs/hang-cli/src/server.rs (1)
rs/hang-cli/src/publish.rs (2)
  • consume (67-69)
  • run (88-101)
rs/hang/src/import/decoder.rs (3)
rs/hang/src/import/fmp4.rs (1)
  • new (54-65)
rs/hang/src/import/aac.rs (2)
  • new (15-21)
  • pts (151-158)
rs/hang/src/import/avc3.rs (3)
  • new (32-40)
  • new (290-292)
  • pts (243-250)
rs/libmoq/src/publish.rs (1)
rs/hang/src/catalog/root.rs (1)
  • from_str (51-53)
rs/hang/src/import/aac.rs (2)
rs/hang/src/import/avc3.rs (2)
  • new (32-40)
  • new (290-292)
rs/hang/src/model/timestamp.rs (1)
  • from_micros (40-46)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Check
🔇 Additional comments (30)
rs/hang/src/error.rs (2)

68-75: LGTM!

The new error variants follow the established patterns in this enum: Http wraps in Arc for Clone compatibility (consistent with Json), and Url uses #[from] since url::ParseError is Clone.


90-95: LGTM!

The From<reqwest::Error> implementation correctly mirrors the From<serde_json::Error> pattern above.
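
Roughly, the pattern looks like the following sketch (variant names taken from this review, not the file's exact contents):

use std::sync::Arc;

#[derive(Clone, Debug)]
enum Error {
    // reqwest::Error is not Clone, so the Arc keeps Error cloneable.
    Http(Arc<reqwest::Error>),
    // url::ParseError is Clone and can be stored directly.
    Url(url::ParseError),
}

impl From<reqwest::Error> for Error {
    fn from(err: reqwest::Error) -> Self {
        Error::Http(Arc::new(err))
    }
}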

rs/hang/Cargo.toml (2)

28-31: Good use of minimal feature set for reqwest.

Using default-features = false with only rustls-tls and gzip is a good practice that avoids pulling in native dependencies and keeps the dependency tree minimal.


35-35: LGTM!

The strum crate with derive feature enables convenient enum-to-string conversions, and url is appropriately moved to regular dependencies to support URL parsing in the HLS import module.

Also applies to: 39-39

.gitignore (1)

27-28: LGTM!

Adding *.m3u8 and *.m4s patterns is appropriate for excluding HLS playlist and segment files generated during development, consistent with the existing media file patterns.

rs/hang/src/import/mod.rs (1)

5-5: LGTM!

The new hls module follows the established pattern for other import modules, with private module declaration and public glob re-export.

Also applies to: 11-11

rs/libmoq/src/publish.rs (1)

35-37: Good improvement: Type-safe format parsing.

The transition from string-based format handling to DecoderFormat::from_str provides compile-time type safety and centralizes format validation. The error mapping to Error::UnknownFormat is appropriate.

rs/hang-cli/src/main.rs (2)

73-79: Clean initialization flow.

The early Publish::new and publish.init().await pattern ensures the broadcast is initialized from stdin before starting network operations. This correctly separates media initialization from the server/client dispatch.


37-38: Good use of subcommand for format selection.

Using #[command(subcommand)] for PublishFormat allows format-specific arguments (e.g., --playlist for HLS) while maintaining a clean CLI structure.

Also applies to: 63-64

rs/hang/src/import/aac.rs (2)

151-158: Wall-clock timestamp fallback implementation looks good.

The pts() helper correctly implements a monotonic timestamp fallback using tokio::time::Instant, consistent with the pattern in avc3.rs. The lazy initialization of zero on first use is appropriate.

The as u64 cast from u128 is safe for practical stream durations (would only overflow after ~584,942 years).
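
A minimal sketch of the fallback pattern, assuming the importer stores an Option<Instant>; the names here are illustrative, not the actual fields:

use tokio::time::Instant;

struct WallClock {
    // Set lazily on the first frame so elapsed time starts at zero.
    zero: Option<Instant>,
}

impl WallClock {
    // Return the caller-provided timestamp, or microseconds elapsed since
    // the first call as a monotonic wall-clock fallback.
    fn pts(&mut self, hint: Option<u64>) -> u64 {
        match hint {
            Some(ts) => ts,
            None => {
                let zero = *self.zero.get_or_insert_with(Instant::now);
                zero.elapsed().as_micros() as u64
            }
        }
    }
}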


126-127: API change: Optional timestamp parameter.

The signature change from mandatory to Option<hang::Timestamp> enables HLS ingest and other sources that may not provide explicit timestamps. The internal fallback ensures frames always have valid timing.

rs/hang-cli/src/server.rs (2)

36-40: Clean concurrent task orchestration.

The tokio::select! correctly runs three concurrent tasks:

  • accept: handles incoming MoQ connections
  • publish.run(): reads stdin and decodes media (consumes self)
  • web: serves HTTP endpoints

The publish.consume() is called before publish.run() to obtain the BroadcastConsumer, which is then shared with connections.
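
A self-contained sketch of the same three-way select, with sleeps standing in for the accept loop, the publish loop, and the web server:

use std::time::Duration;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let accept = async {
        tokio::time::sleep(Duration::from_secs(1)).await;
        anyhow::Ok(())
    };
    let publish = async {
        tokio::time::sleep(Duration::from_secs(2)).await;
        anyhow::Ok(())
    };
    let web = async {
        tokio::time::sleep(Duration::from_secs(3)).await;
        anyhow::Ok(())
    };

    // Whichever task finishes (or fails) first ends the server.
    tokio::select! {
        res = accept => res?,
        res = publish => res?,
        res = web => res?,
    }
    Ok(())
}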


14-18: Signature simplification with Publish abstraction.

The migration from input-stream-based parameters to a single Publish instance encapsulates format handling and broadcast management cleanly.

rs/hang/src/import/fmp4.rs (3)

31-33: Comment accurately describes the Arc-based sharing.

The updated comment clearly explains that catalog is a clone of the broadcast's catalog (via Arc) for mutable access, not a separate catalog. This aligns well with the change in new() and ensures maintainers understand the shared ownership.


54-58: LGTM!

The constructor now correctly clones the broadcast's catalog instead of creating a separate one. This ensures catalog updates are visible across all consumers of the broadcast and aligns with the new centralized publishing flow.


496-507: Drop implementation correctly uses the shared catalog.

The cleanup logic properly accesses self.broadcast.catalog.lock() to remove tracks, ensuring consistency with the new catalog sharing approach.

rs/hang-cli/src/client.rs (2)

6-14: Clean refactoring to Publish-based flow.

The function signature and publish flow are well-structured. The Publish abstraction cleanly encapsulates format-specific logic, and publish.consume() provides the broadcast consumer for publishing.


22-30: LGTM!

The select! block correctly handles the three lifecycle paths: publish streaming, session closure, and graceful shutdown via ctrl-c. The 100ms delay before returning on ctrl-c allows the close message to be sent.

rs/hang/src/import/avc3.rs (5)

26-28: Consistent wall-clock timestamp pattern.

The zero field and its initialization follow the same pattern as Aac (see rs/hang/src/import/aac.rs:14-20), ensuring consistent behavior across decoders when no external timestamp is provided.

Also applies to: 38-38


102-111: New initialize method for pre-parsing SPS/PPS.

This method allows parsing configuration NALs before streaming begins, which is useful for HLS ingest where initialization data may be separate. Passing None to decode_nal is correct since no frames should be emitted during initialization.


117-132: LGTM!

The decode_stream method correctly resolves the timestamp early via pts() and then passes Some(pts) to decode_nal. This ensures consistent timestamp handling throughout the decode path.


215-222: Safe timestamp requirement when flushing frames.

The pts.context("missing timestamp")? at line 222 correctly ensures a timestamp is available when actually writing a frame. During initialize(), maybe_start_frame will return early at line 218 since contains_slice is false for non-slice NALs (SPS/PPS), avoiding the missing timestamp error.


243-250: Wall-clock fallback implementation.

The pts() helper correctly returns the provided hint or computes elapsed time from a lazily-initialized Instant. The as_micros() as u64 cast is safe for practical durations (overflow would require ~584,000 years of uptime).

rs/hang-cli/src/publish.rs (2)

10-20: Well-designed format enum with CLI integration.

The PublishFormat enum cleanly maps to CLI subcommands via clap::Subcommand. The comment on line 14 helpfully explains why AAC is excluded (requires framing).


72-86: Buffer correctly persists between init() and run().

The self.buffer field ensures any data read during init() but not consumed (e.g., partial NAL units) is available when run() continues. For HLS, the delegation to decoder.init() and decoder.run() is appropriate since HLS handles its own fetching.

One consideration: run() takes mut self, consuming the Publish instance—this prevents accidental reuse after the stream ends, which is good.

Also applies to: 88-101

rs/hang/src/import/decoder.rs (4)

7-16: Good use of typed enum with string parsing support.

The DecoderFormat enum with strum::EnumString provides type safety while still allowing string-based parsing if needed (e.g., from CLI or config). The lowercase serialization is a sensible default.
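
As a sketch of the pattern (variant names inferred from the PR description, not verified against the file):

use std::str::FromStr;
use strum::EnumString;

#[derive(Debug, Clone, Copy, PartialEq, Eq, EnumString)]
#[strum(serialize_all = "lowercase")]
enum DecoderFormat {
    Avc3,
    Fmp4,
    Hls,
}

fn main() {
    // EnumString derives FromStr, so lowercase names parse directly.
    assert_eq!(DecoderFormat::from_str("hls"), Ok(DecoderFormat::Hls));
    assert!(DecoderFormat::from_str("ogg").is_err());
}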


36-45: LGTM!

The constructor now uses the typed DecoderFormat enum, making the API more explicit and eliminating potential runtime errors from invalid format strings.


53-63: Avc3 now supports explicit initialization.

The initialize() method now correctly delegates to decoder.initialize(buf) for Avc3, allowing SPS/PPS to be parsed from a separate initialization segment (useful for HLS).


78-87: AAC stream decoding correctly returns an error.

AAC requires frame boundaries to decode properly, so bailing with an error for stream decoding is the correct behavior. The TODO comment suggests this could be made more type-safe in the future (e.g., via separate traits or enum variants).

rs/hang/src/import/hls.rs (1)

528-534: LGTM!

The URI resolution logic correctly handles both absolute and relative URIs, following HLS specification conventions.

h264-parser = "0.4.0"
hex = "0.4"
lazy_static = "1"
m3u8-rs = "5"

⚠️ Potential issue | 🔴 Critical

Update m3u8-rs to version 6.0.0.

Version 6.0.0 is the current latest stable version. Change the dependency to m3u8-rs = "6" to use a more recent release.

🤖 Prompt for AI Agents
In rs/hang/Cargo.toml around line 23 the dependency m3u8-rs is pinned to version
"5"; update the dependency declaration to use the newer stable release by
changing it to m3u8-rs = "6" so Cargo pulls version 6.0.0 (or the compatible 6.x
series).

Comment on lines +4 to +6
//! feed their fMP4 segments into a `hang` broadcast. It is designed to be
//! independent of any particular HTTP client; callers provide an implementation
//! of [`Fetcher`] to perform the actual network I/O.

⚠️ Potential issue | 🟡 Minor

Documentation mentions Fetcher trait that doesn't exist.

The module doc says "callers provide an implementation of Fetcher" but the actual implementation uses reqwest::Client directly via HlsConfig.client. Consider updating the documentation to reflect the actual design.

🤖 Prompt for AI Agents
In rs/hang/src/import/hls.rs around lines 4 to 6, the module doc incorrectly
references a non-existent `Fetcher` trait; update the documentation to match the
actual design by replacing the sentence that says "callers provide an
implementation of `Fetcher` to perform the actual network I/O" with a brief
description that callers supply an HTTP client via `HlsConfig.client` (e.g., a
`reqwest::Client`) or, if preferred, note that a custom client implementing the
expected interface can be provided on `HlsConfig`; keep the doc concise and
accurate to the current implementation.

Comment on lines +118 to +123
let client = cfg.client.unwrap_or_else(|| {
Client::builder()
.user_agent(concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION")))
.build()
.unwrap()
});

⚠️ Potential issue | 🟡 Minor

unwrap() on client build can panic in edge cases.

Client::builder().build() can fail if the TLS backend fails to initialize (e.g., missing system certificates). Consider propagating the error instead of panicking.

🔎 Suggested fix
 let client = cfg.client.unwrap_or_else(|| {
-    Client::builder()
+    Client::builder()
         .user_agent(concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION")))
         .build()
-        .unwrap()
+        .expect("failed to build HTTP client - TLS initialization error")
 });

Or better, propagate the error:

let client = match cfg.client {
    Some(c) => c,
    None => Client::builder()
        .user_agent(concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION")))
        .build()
        .context("failed to build HTTP client")?,
};
🤖 Prompt for AI Agents
In rs/hang/src/import/hls.rs around lines 118 to 123, the code unconditionally
calls .unwrap() on Client::builder().build(), which can panic if the HTTP/TLS
client fails to initialize; change this to propagate the error instead of
panicking by matching on cfg.client and returning the built client via ? (or
map_err/context) when build() fails, e.g., use Some(c) => c, None =>
Client::builder()...build().context("failed to build HTTP client")? so the
function returns a proper error rather than panic.

Comment on lines +174 to +182
let video_tracks = std::mem::take(&mut self.video);
for (index, mut track) in video_tracks.into_iter().enumerate() {
let playlist = self.fetch_media_playlist(track.playlist.clone()).await?;
let count = self
.consume_segments(TrackKind::Video(index), &mut track, &playlist)
.await?;
buffered += count;
self.video.push(track);
}

⚠️ Potential issue | 🟠 Major

Track state may be lost on error during iteration.

If consume_segments fails mid-iteration, processed tracks are pushed back to self.video, but remaining tracks from the original vector are lost. Consider using indexed iteration to avoid this issue.

🔎 Suggested approach
-let video_tracks = std::mem::take(&mut self.video);
-for (index, mut track) in video_tracks.into_iter().enumerate() {
-    let playlist = self.fetch_media_playlist(track.playlist.clone()).await?;
-    let count = self
-        .consume_segments(TrackKind::Video(index), &mut track, &playlist)
-        .await?;
-    buffered += count;
-    self.video.push(track);
-}
+for index in 0..self.video.len() {
+    let playlist_url = self.video[index].playlist.clone();
+    let playlist = self.fetch_media_playlist(playlist_url).await?;
+    let count = self
+        .consume_segments(TrackKind::Video(index), &mut self.video[index], &playlist)
+        .await?;
+    buffered += count;
+}

This requires adjusting consume_segments to accept a mutable reference from the vector directly rather than a moved value.

🤖 Prompt for AI Agents
In rs/hang/src/import/hls.rs around lines 174 to 182, the current loop takes
ownership of the entire self.video vector and if consume_segments returns an
error mid-iteration the unprocessed tracks are lost; change the loop to iterate
by index and call consume_segments with a mutable reference into self.video
(i.e., do not std::mem::take the vector), or alternatively restore unprocessed
items on error — update consume_segments signature to accept &mut Track (not a
moved Track) so it operates in-place and ensure you only advance the index after
a successful call so partial failures do not discard remaining tracks.

Comment on lines +395 to +398
} else {
let response = self.client.get(url).send().await?;
let response = response.error_for_status()?;
let bytes = response.bytes().await.context("failed to read response body")?;
Ok(bytes)
}

⚠️ Potential issue | 🟠 Major

HTTP requests have no timeout configured.

If the remote server is slow or unresponsive, requests will hang indefinitely. Consider configuring a timeout on the client or per-request.

🔎 Suggested fix at client creation
 let client = cfg.client.unwrap_or_else(|| {
     Client::builder()
         .user_agent(concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION")))
+        .timeout(Duration::from_secs(30))
+        .connect_timeout(Duration::from_secs(10))
         .build()
         .unwrap()
 });

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In rs/hang/src/import/hls.rs around lines 395 to 400, the HTTP request uses
self.client.get(...).send().await without any timeout and can hang indefinitely;
update the code to enforce a timeout by configuring the reqwest::Client at
creation (Client::builder().timeout(Duration::from_secs(...)).build() ) or apply
a per-request timeout (e.g., wrap the future with tokio::time::timeout or use
reqwest's per-request timeout if available), choose a sensible default timeout,
handle the timeout error by mapping it into the existing error flow (adding
context like "request timed out"), and ensure the client creation sites are
updated so self.client always has a timeout configured if you pick the
client-level approach.

Comment on lines +461 to +466
// Map codec strings into a coarse "family" so we can prefer H.264 over others.
fn codec_family(codec: &str) -> Option<&'static str> {
if codec.starts_with("avc1.") || codec.starts_with("avc3.") {
Some("h264")
} else {
None
}
}

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Search for codec references in the codebase
rg -n "hvc1|hev1|av01|vp09|dvh1" --type rust rs/hang/src/
# Also check the broader context of codec handling
rg -n "codec" --type rust rs/hang/src/import/ -A 2 -B 2 | head -100

Repository: moq-dev/moq

Length of output: 8292


🏁 Script executed:

#!/bin/bash
# Read the full file to understand codec_family and select_variants
cat -n rs/hang/src/import/hls.rs | sed -n '450,530p'

Repository: moq-dev/moq

Length of output: 3051


codec_family only recognizes H.264, silently filtering HEVC, VP9, and AV1 variants.

The function returns None for all codecs except avc1.* and avc3.*, causing variants with HEVC (hvc1/hev1), VP9 (vp09), or AV1 (av01) to be excluded during import without warning. This is inconsistent with the fmp4 module (same codebase), which fully supports these codecs. If this limitation is intentional, add logging or documentation to clarify why HLS streams with non-H.264 codecs are unsupported.
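
If broader support is intended later, the mapping could be extended along these lines (a sketch; the prefixes follow RFC 6381 codec strings):

fn codec_family(codec: &str) -> Option<&'static str> {
    if codec.starts_with("avc1.") || codec.starts_with("avc3.") {
        Some("h264")
    } else if codec.starts_with("hvc1.") || codec.starts_with("hev1.") {
        Some("h265")
    } else if codec.starts_with("av01.") {
        Some("av1")
    } else if codec.starts_with("vp09.") {
        Some("vp9")
    } else {
        // Log unknown codecs so variants are not dropped silently.
        None
    }
}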

Comment on lines +504 to +524
// Deduplicate by resolution, keeping the lowest-bandwidth variant for each size.
let mut by_resolution: HashMap<Option<Resolution>, &VariantStream> = HashMap::new();

for variant in family_variants {
let key = variant.resolution;
let bandwidth = variant.average_bandwidth.unwrap_or(variant.bandwidth);

match by_resolution.entry(key) {
Entry::Vacant(entry) => {
entry.insert(variant);
}
Entry::Occupied(mut entry) => {
let existing = entry.get();
let existing_bw = existing.average_bandwidth.unwrap_or(existing.bandwidth);
if bandwidth < existing_bw {
entry.insert(variant);
}
}
}
}

by_resolution.values().cloned().collect()
}

⚠️ Potential issue | 🟡 Minor

Variants without resolution share the same HashMap key.

Using Option<Resolution> as a key means all variants with resolution: None will collide, keeping only one. If the master playlist has multiple variants without explicit resolution (common for audio-only or some encoders), all but one will be lost.

🔎 Suggested consideration

Consider using a composite key that includes bandwidth or URI as a fallback when resolution is None, or handle the None case separately.
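
One possible shape, assuming each variant has a URI that can serve as a fallback key (Resolution is simplified to a width/height pair here):

#[derive(Hash, PartialEq, Eq)]
enum VariantKey {
    Resolution(u64, u64),
    Uri(String),
}

fn variant_key(resolution: Option<(u64, u64)>, uri: &str) -> VariantKey {
    match resolution {
        // Variants with an explicit resolution dedupe by size, as before.
        Some((w, h)) => VariantKey::Resolution(w, h),
        // Variants without one fall back to their URI, so distinct
        // variants never collide with each other.
        None => VariantKey::Uri(uri.to_string()),
    }
}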

Comment on lines +14 to +15
//! - **fMP4 Import**: Convert a fMP4 file into a hang broadcast.
//! - **HLS Import**: Reusable HLS/fMP4 ingest logic via [`hls`].

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash

# First, locate lib.rs and inspect its structure
echo "=== Inspecting rs/hang/src/lib.rs ==="
wc -l rs/hang/src/lib.rs

echo ""
echo "=== First 40 lines of lib.rs ==="
head -40 rs/hang/src/lib.rs

echo ""
echo "=== Checking for module declarations ==="
rg -n "pub mod|pub use" rs/hang/src/lib.rs | head -20

Repository: moq-dev/moq

Length of output: 1242


🏁 Script executed:

#!/bin/bash

echo "=== Files in rs/hang/src/import/ ==="
find rs/hang/src/import -type f -name "*.rs"

echo ""
echo "=== Checking import/mod.rs exports ==="
cat -n rs/hang/src/import/mod.rs 2>/dev/null || echo "Not found at that path"

Repository: moq-dev/moq

Length of output: 508


🏁 Script executed:

#!/bin/bash

# Check what's in hls module/file
echo "=== Checking hls.rs or hls/mod.rs ==="
if [ -f rs/hang/src/import/hls.rs ]; then
  echo "Found hls.rs"
  wc -l rs/hang/src/import/hls.rs
  head -30 rs/hang/src/import/hls.rs
elif [ -d rs/hang/src/import/hls ]; then
  echo "Found hls directory"
  cat -n rs/hang/src/import/hls/mod.rs | head -30
fi

Repository: moq-dev/moq

Length of output: 1111


🏁 Script executed:

#!/bin/bash

# Check if hls module defines a public struct/type Hls
echo "=== Searching for Hls types in import module ==="
rg -n "pub (struct|enum|type) Hls" rs/hang/src/import/

echo ""
echo "=== Checking how import module is re-exported in lib.rs ==="
rg -n "mod import|pub.*import" rs/hang/src/lib.rs

Repository: moq-dev/moq

Length of output: 281


(Several follow-up scripts re-ran the same file-location checks with find, git ls-files, and git show to confirm the crate layout before reaching the conclusion below.)


Use [import::Hls] or [import] for the documentation link.

The [hls] link cannot resolve in rustdoc because the hls module is private within import and not directly in scope at lib.rs. Link to the public Hls struct via import::Hls, or reference the import module directly.
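
Concretely, the bullet could read as follows, assuming Hls is re-exported at import::Hls:

//! - **fMP4 Import**: Convert a fMP4 file into a hang broadcast.
//! - **HLS Import**: Reusable HLS/fMP4 ingest logic via [`import::Hls`].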

🤖 Prompt for AI Agents
In rs/hang/src/lib.rs around lines 14 to 15, the rustdoc link [`hls`] is
unresolved because the hls module is private inside import; update the
documentation to reference a public item instead by replacing [`hls`] with
either [`import::Hls`] to link the public Hls struct or with [`import`] to link
the import module so the doc link resolves correctly.

@kixelated force-pushed the feat/hls-ingest-module branch from d951e4c to fcfa2ec on December 19, 2025 03:59
coderabbitai bot left a comment

Actionable comments posted: 2

♻️ Duplicate comments (6)
rs/hang/src/import/hls.rs (6)

1-6: Documentation references non-existent Fetcher trait.

The module doc mentions "callers provide an implementation of [Fetcher]" but the actual implementation uses reqwest::Client directly via HlsConfig.client. Update the documentation to reflect the actual design.


118-123: unwrap() on client build can panic if TLS fails.

Client::builder().build() can fail if the TLS backend fails to initialize (e.g., missing system certificates). Propagate the error instead of panicking.


174-182: Track state may be lost on error during iteration.

If consume_segments fails mid-iteration, processed tracks are pushed back to self.video, but remaining tracks from the iterator are lost. Use indexed iteration instead.


393-398: HTTP requests have no timeout configured.

If the remote server is slow or unresponsive, requests will hang indefinitely. Configure a timeout on the client or per-request.


459-466: codec_family only recognizes H.264, silently filtering other codecs.

The function returns None for HEVC (hvc1/hev1), VP9 (vp09), and AV1 (av01), causing variants with these codecs to be excluded during import without warning.


502-524: Variants without resolution share the same HashMap key.

Using Option<Resolution> as a key means all variants with resolution: None will collide, keeping only one. If the master playlist has multiple variants without explicit resolution, all but one will be lost.

🧹 Nitpick comments (1)
rs/hang/src/import/hls.rs (1)

534-555: Consider expanding test coverage (optional).

The current tests verify basic initialization. Consider adding tests for:

  • parse_playlist with various URL formats and file paths
  • select_variants and select_audio with sample master playlists
  • resolve_uri with relative and absolute URIs
📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between d951e4c and fcfa2ec.

📒 Files selected for processing (1)
  • rs/hang/src/import/hls.rs (1 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.rs

📄 CodeRabbit inference engine (CLAUDE.md)

In Rust crates, tests should be integrated within source files using inline test modules

Files:

  • rs/hang/src/import/hls.rs
🧬 Code graph analysis (1)
rs/hang/src/import/hls.rs (2)
rs/hang/src/import/decoder.rs (2)
  • new (37-45)
  • is_initialized (114-120)
rs/hang/src/import/avc3.rs (4)
  • new (32-40)
  • new (290-292)
  • init (42-100)
  • is_initialized (239-241)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Check
🔇 Additional comments (1)
rs/hang/src/import/hls.rs (1)

26-58: LGTM: Configuration and URL parsing logic.

The configuration struct and URL parsing method correctly handle both HTTP(S) URLs and file paths, with proper error context.

Comment on lines +314 to +320
// Skip segments we've already seen
let skip = track.next_sequence.unwrap_or(0).saturating_sub(playlist.media_sequence) as usize;
let base_seq = playlist.media_sequence + skip as u64;
for (i, segment) in playlist.segments[skip..].iter().enumerate() {
self.push_segment(kind, track, segment, base_seq + i as u64).await?;
}
let consumed = playlist.segments.len() - skip;

⚠️ Potential issue | 🟠 Major

Potential slice panic if skip exceeds segment count.

If next_sequence is ahead of the current playlist window (segments rotated out), skip can exceed playlist.segments.len(), causing a panic at line 317.

🔎 Proposed fix
 let skip = track.next_sequence.unwrap_or(0).saturating_sub(playlist.media_sequence) as usize;
+let skip = skip.min(playlist.segments.len());
 let base_seq = playlist.media_sequence + skip as u64;
 for (i, segment) in playlist.segments[skip..].iter().enumerate() {
🤖 Prompt for AI Agents
In rs/hang/src/import/hls.rs around lines 314 to 320, computing skip as
track.next_sequence... can produce a value larger than playlist.segments.len(),
causing a panic when slicing playlist.segments[skip..]; guard against that by
checking if skip >= playlist.segments.len() and handling it (either set skip =
playlist.segments.len() to result in no iteration, or return/continue early),
compute base_seq using the clamped skip, iterate only over the valid slice, and
compute consumed accordingly (e.g., consumed =
playlist.segments.len().saturating_sub(skip)). Ensure types match (usize/u64
conversions) and avoid panics from out-of-bounds slicing.

Comment on lines +388 to +399
async fn fetch_bytes(&self, url: Url) -> anyhow::Result<Bytes> {
if url.scheme() == "file" {
let path = url.to_file_path().ok().context("invalid file URL")?;
let bytes = fs::read(&path).await.context("failed to read file")?;
Ok(Bytes::from(bytes))
} else {
let response = self.client.get(url).send().await?;
let response = response.error_for_status()?;
let bytes = response.bytes().await.context("failed to read response body")?;
Ok(bytes)
}
}

⚠️ Potential issue | 🟠 Major

No size limits on file or HTTP reads could cause OOM.

Both fs::read() (line 391) and response.bytes() (line 396) load entire content into memory without size limits. A malicious or misconfigured source with huge files/responses could exhaust memory.

🔎 Suggested approach

Consider adding a maximum fetch size (e.g., 100 MB for video segments, 1 MB for playlists):

const MAX_SEGMENT_SIZE: u64 = 100 * 1024 * 1024; // 100 MB
const MAX_PLAYLIST_SIZE: u64 = 1024 * 1024; // 1 MB

async fn fetch_bytes(&self, url: Url) -> anyhow::Result<Bytes> {
    let bytes = if url.scheme() == "file" {
        let path = url.to_file_path().ok().context("invalid file URL")?;
        let metadata = fs::metadata(&path).await?;
        anyhow::ensure!(
            metadata.len() <= MAX_SEGMENT_SIZE,
            "file too large: {} bytes", metadata.len()
        );
        // .into() converts the Vec<u8> into Bytes so both branches match.
        fs::read(&path).await.context("failed to read file")?.into()
    } else {
        let response = self.client.get(url).send().await?;
        let response = response.error_for_status()?;
        
        if let Some(len) = response.content_length() {
            anyhow::ensure!(len <= MAX_SEGMENT_SIZE, "response too large: {} bytes", len);
        }
        
        response.bytes().await.context("failed to read response body")?
    };
    Ok(bytes)
}

You may want different limits for playlists vs. segments, which would require passing the expected content type to this method.

🤖 Prompt for AI Agents
In rs/hang/src/import/hls.rs around lines 388 to 399, both fs::read() and
response.bytes() load the entire resource into memory without bounds; add
explicit size checks and bounded reads to prevent OOM. Define constants for max
sizes (e.g. MAX_SEGMENT_SIZE, MAX_PLAYLIST_SIZE), and for file URLs check
fs::metadata().len() and return an error if it exceeds the appropriate limit
before reading; for HTTP, inspect response.content_length() and reject if it
exceeds the limit, and if Content-Length is missing, read the body as a stream
and accumulate up to the limit (fail if the stream exceeds the limit) rather
than calling bytes() unbounded. Ensure error messages include the observed size
and that callers can indicate whether they expect a playlist or segment to apply
the correct limit.

@kixelated merged commit c7fa332 into main Dec 19, 2025
1 check passed
@kixelated deleted the feat/hls-ingest-module branch December 19, 2025 04:13
@moq-bot mentioned this pull request Dec 19, 2025
@kixelated mentioned this pull request Dec 19, 2025