Skip to content

Conversation

@kixelated
Copy link
Collaborator

No description provided.

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Dec 15, 2025

Walkthrough

Renames moq-lite wrapper type Produce to Pair and updates all produce() return sites and type aliases. Replaces multiple HashMap fields with BTreeMap for deterministic ordering. Substantially restructures rs/libmoq: adds modules (api, origin, publish, consume, session, state), introduces a C ABI surface in api.rs and new runtime/ffi helpers in ffi.rs, expands Error variants and return-code mappings, and replaces prior state/runtime patterns with new global state and lock APIs. Changes track API (latencymax_latency), adjusts broadcast/producer/consumer structs, updates CMake/pkg-config/linker packaging, and bumps the crate version.

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Description check ⚠️ Warning No pull request description was provided by the author, only a title. The description is completely empty and therefore does not describe any part of the changeset. Add a description explaining the purpose of the consume API changes, key architectural decisions, and how it integrates with existing libmoq components.
✅ Passed checks (2 passed)
Check name Status Explanation
Title check ✅ Passed The title 'libmoq consume API' directly references the main change of introducing a consume API subsystem to libmoq, aligning with the substantial additions across multiple files.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch libmoq-consume

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 7

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
rs/libmoq/src/state.rs (1)

166-193: Replace assertions with proper error handling.

Lines 175 and 190 use assert! to check if buffers were fully consumed. In FFI-exposed library code, panics can crash the host application. Invalid or malformed input could trigger these assertions.

Consider returning an error instead:

-		assert!(init.is_empty(), "buffer was not fully consumed");
+		if !temp.is_empty() {
+			return Err(Error::InitFailed(Arc::new(anyhow::anyhow!("buffer was not fully consumed"))));
+		}

Apply similar changes to line 190.

🧹 Nitpick comments (5)
rs/libmoq/build.rs (1)

27-34: Consider more precise target matching.

The current target detection uses .contains() which could match unintended targets. For example, target.contains("windows") would match both x86_64-pc-windows-msvc and x86_64-pc-windows-gnu, which may need different linker flags.

Consider using more precise matching:

-		let target = env::var("TARGET").unwrap();
-		let libs_private = if target.contains("apple") {
-			"-framework CoreFoundation -framework Security"
-		} else if target.contains("windows") {
-			"-lws2_32 -lbcrypt -luserenv -lntdll"
-		} else {
-			"-ldl -lm -lpthread"
-		};
+		let target = env::var("TARGET").unwrap();
+		let libs_private = if target.contains("apple-darwin") || target.contains("apple-ios") {
+			"-framework CoreFoundation -framework Security"
+		} else if target.contains("windows-msvc") {
+			"-lws2_32 -lbcrypt -luserenv -lntdll"
+		} else if target.contains("windows-gnu") {
+			"-lws2_32 -lbcrypt -luserenv"
+		} else {
+			"-ldl -lm -lpthread"
+		};
rs/moq-lite/src/model/broadcast.rs (1)

184-184: Minor typo in existing comment.

Line 184 has "abitrary" instead of "arbitrary". While not introduced by this PR, you may want to fix it.

-/// Subscribe to abitrary broadcast/tracks.
+/// Subscribe to arbitrary broadcast/tracks.
rs/hang/src/catalog/root.rs (1)

256-261: Consider avoiding the redundant clone.

The catalog is cloned into self.current and then a reference is returned. Since Catalog::from_slice already produces an owned value, you could store it directly without cloning.

-				let catalog = Catalog::from_slice(&frame?)?;
-				self.current = Some(catalog.clone());
+				self.current = Some(Catalog::from_slice(&frame?)?);
 				return Ok(self.current.as_ref());
rs/libmoq/src/state.rs (2)

429-444: Consider using a multi-threaded runtime for better concurrency.

The current implementation uses a single-threaded runtime (new_current_thread()) which means all async tasks across all sessions share one thread. For a library managing multiple concurrent sessions, tracks, and catalog operations, this could become a bottleneck.

Consider using a multi-threaded runtime instead:

-	let runtime = tokio::runtime::Builder::new_current_thread()
+	let runtime = tokio::runtime::Builder::new_multi_thread()
+		.worker_threads(4)
+		.thread_name("libmoq-worker")
 		.enable_all()
 		.build()
 		.unwrap();

If single-threaded is intentional for simplicity or specific threading requirements, consider documenting the rationale.


339-364: LGTM, but track the TODO for chunking API.

The frame handling logic appropriately optimizes for the single-chunk case while handling multi-chunk payloads. The TODO comment on line 343 correctly identifies that a chunking API would avoid the allocation requirement for multi-chunk frames.

The TODO indicates a future improvement needed. Would you like me to open a tracking issue for implementing the chunking API?

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 41ac5f0 and 62eab2f.

📒 Files selected for processing (23)
  • rs/hang/examples/video.rs (1 hunks)
  • rs/hang/src/catalog/audio/mod.rs (2 hunks)
  • rs/hang/src/catalog/root.rs (7 hunks)
  • rs/hang/src/catalog/video/mod.rs (2 hunks)
  • rs/hang/src/model/broadcast.rs (3 hunks)
  • rs/hang/src/model/frame.rs (1 hunks)
  • rs/libmoq/CMakeLists.txt (2 hunks)
  • rs/libmoq/build.rs (1 hunks)
  • rs/libmoq/moq.pc.in (2 hunks)
  • rs/libmoq/src/api.rs (1 hunks)
  • rs/libmoq/src/error.rs (2 hunks)
  • rs/libmoq/src/ffi.rs (2 hunks)
  • rs/libmoq/src/id.rs (1 hunks)
  • rs/libmoq/src/lib.rs (1 hunks)
  • rs/libmoq/src/state.rs (2 hunks)
  • rs/moq-lite/src/model/broadcast.rs (2 hunks)
  • rs/moq-lite/src/model/frame.rs (2 hunks)
  • rs/moq-lite/src/model/group.rs (1 hunks)
  • rs/moq-lite/src/model/mod.rs (1 hunks)
  • rs/moq-lite/src/model/origin.rs (2 hunks)
  • rs/moq-lite/src/model/pair.rs (1 hunks)
  • rs/moq-lite/src/model/track.rs (2 hunks)
  • rs/moq-relay/src/cluster.rs (1 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.rs

📄 CodeRabbit inference engine (CLAUDE.md)

In Rust crates, tests should be integrated within source files using inline test modules

Files:

  • rs/moq-lite/src/model/pair.rs
  • rs/moq-lite/src/model/mod.rs
  • rs/moq-lite/src/model/frame.rs
  • rs/hang/src/model/frame.rs
  • rs/libmoq/build.rs
  • rs/libmoq/src/id.rs
  • rs/libmoq/src/ffi.rs
  • rs/hang/src/catalog/video/mod.rs
  • rs/moq-lite/src/model/track.rs
  • rs/moq-lite/src/model/broadcast.rs
  • rs/moq-lite/src/model/origin.rs
  • rs/hang/src/catalog/audio/mod.rs
  • rs/moq-lite/src/model/group.rs
  • rs/hang/examples/video.rs
  • rs/libmoq/src/error.rs
  • rs/moq-relay/src/cluster.rs
  • rs/hang/src/catalog/root.rs
  • rs/hang/src/model/broadcast.rs
  • rs/libmoq/src/lib.rs
  • rs/libmoq/src/api.rs
  • rs/libmoq/src/state.rs
🧠 Learnings (1)
📓 Common learnings
Learnt from: CR
Repo: moq-dev/moq PR: 0
File: CLAUDE.md:0-0
Timestamp: 2025-12-10T04:00:14.871Z
Learning: Core protocol implementation in the `moq` layer must be generic and not contain media-specific logic; CDN/relay does not know anything about media
🧬 Code graph analysis (6)
rs/moq-lite/src/model/frame.rs (4)
rs/moq-lite/src/model/broadcast.rs (2)
  • produce (34-38)
  • new (59-69)
rs/moq-lite/src/model/group.rs (2)
  • produce (28-32)
  • new (85-90)
rs/moq-lite/src/model/origin.rs (3)
  • produce (342-346)
  • new (19-21)
  • new (64-69)
rs/moq-lite/src/model/track.rs (1)
  • produce (40-44)
rs/moq-lite/src/model/track.rs (5)
rs/hang/src/catalog/root.rs (3)
  • produce (86-93)
  • new (151-156)
  • new (227-233)
rs/moq-lite/src/model/broadcast.rs (2)
  • produce (34-38)
  • new (59-69)
rs/moq-lite/src/model/frame.rs (2)
  • produce (17-21)
  • new (71-77)
rs/moq-lite/src/model/group.rs (2)
  • produce (28-32)
  • new (85-90)
rs/moq-lite/src/model/origin.rs (5)
  • produce (342-346)
  • new (19-21)
  • new (64-69)
  • new (114-120)
  • new (454-473)
rs/moq-lite/src/model/broadcast.rs (3)
rs/moq-lite/src/model/frame.rs (2)
  • produce (17-21)
  • new (71-77)
rs/moq-lite/src/model/group.rs (2)
  • produce (28-32)
  • new (85-90)
rs/moq-lite/src/model/origin.rs (5)
  • produce (342-346)
  • new (19-21)
  • new (64-69)
  • new (114-120)
  • new (454-473)
rs/hang/src/catalog/audio/mod.rs (1)
js/hang/src/catalog/audio.ts (1)
  • AudioConfig (55-55)
rs/libmoq/src/api.rs (1)
rs/libmoq/src/ffi.rs (13)
  • return_code (30-35)
  • parse_str (126-133)
  • code (38-38)
  • code (42-44)
  • code (48-50)
  • code (54-60)
  • code (64-69)
  • code (73-78)
  • code (82-87)
  • code (91-93)
  • code (97-99)
  • parse_url (113-121)
  • parse_slice (138-149)
rs/libmoq/src/state.rs (1)
rs/libmoq/src/id.rs (1)
  • default (72-74)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Check
🔇 Additional comments (37)
rs/hang/src/model/frame.rs (1)

5-6: Public BufList re‑export looks appropriate, be aware of API commitment

Exposing BufList via pub use is consistent with broadening the public model API and simplifies consumer imports. Just note this effectively commits your public surface to the concrete buf_list::BufList type for semver purposes (any future change would be a breaking change).

rs/libmoq/src/id.rs (1)

25-28: LGTM!

The new immutable get method mirrors the existing get_mut pattern correctly and provides symmetric API access.

rs/libmoq/src/ffi.rs (2)

7-28: LGTM!

The rename from Callback to OnStatus is clean and all associated implementations are updated consistently.


106-111: LGTM!

The parse_id_optional function correctly treats 0 as a sentinel for "no ID", following common FFI patterns for optional values.

rs/hang/src/model/broadcast.rs (4)

38-42: LGTM!

The Default implementation correctly delegates to new(), ensuring the catalog is always properly initialized.


52-56: LGTM!

The DerefMut implementation follows the standard pattern and complements the existing Deref trait.


70-98: LGTM!

The BroadcastConsumer struct is well-designed:

  • Automatically subscribes to the catalog track on construction
  • Provides convenient subscribe method for additional tracks
  • Implements appropriate traits (Deref, From) for ergonomic usage

58-68: LGTM!

The From trait implementations provide clean conversions between the wrapper and inner types.

rs/libmoq/CMakeLists.txt (3)

54-59: LGTM!

The Apple-specific framework linkage is correct and necessary for networking code on macOS. This change is consistent with the Libs.private additions in moq.pc.in.


68-75: Installation approach updated.

The change from target-based exports to direct file installation simplifies the packaging flow. This is appropriate for distributing pre-built Rust libraries.


77-97: LGTM!

The CMake package configuration setup follows modern best practices:

  • Uses configure_package_config_file with proper PATH_VARS
  • Generates version file with appropriate compatibility
  • Installs to standard location
rs/libmoq/moq.pc.in (2)

1-1: LGTM!

Using ${pcfiledir}/../.. makes the pkg-config file relocatable, which is a best practice for portable library installations.


10-10: No issue found.

The moq.pc.in file is a template, not a final pkg-config file. The build.rs script correctly generates platform-specific .pc files at build time by detecting the target platform and replacing the @LIBS_PRIVATE@ placeholder with the appropriate platform-specific libraries:

  • macOS: CoreFoundation and Security frameworks
  • Windows: ws2_32, bcrypt, userenv, ntdll
  • Linux: dl, m, pthread

Each platform receives only its own dependencies in the generated .pc file.

Likely an incorrect or invalid review comment.

rs/moq-lite/src/model/mod.rs (1)

5-5: LGTM!

The module rename from produce to pair is consistent with the struct rename and properly updates the re-export.

Also applies to: 12-12

rs/hang/examples/video.rs (1)

79-79: LGTM!

The switch to BTreeMap provides deterministic alphabetical ordering for renditions, which improves reproducibility and aligns with the broader catalog standardization across the PR.

rs/moq-lite/src/model/pair.rs (1)

7-10: LGTM!

The rename from Produce to Pair improves clarity while maintaining the same structure and behavior. The name "Pair" more accurately describes the struct's purpose as a producer-consumer tuple.

rs/hang/src/catalog/audio/mod.rs (1)

7-7: LGTM! BTreeMap provides deterministic ordering.

The switch from HashMap to BTreeMap ensures alphabetically-sorted keys, which aids testing and reproducibility. The O(log n) lookup cost is negligible for typical rendition counts.

Also applies to: 24-25

rs/moq-lite/src/model/track.rs (1)

17-17: LGTM! Consistent with the Pair refactor pattern.

The rename from Produce to Pair with the TrackPair type alias improves API clarity and aligns with the refactor applied across Frame, Group, Origin, and Broadcast.

Also applies to: 23-23, 40-43

rs/moq-lite/src/model/group.rs (1)

15-15: LGTM! Consistent refactor to Pair.

The changes align with the systematic refactor from Produce to Pair across the moq-lite model layer.

Also applies to: 19-19, 28-31

rs/hang/src/catalog/video/mod.rs (1)

13-13: LGTM! Consistent with audio catalog changes.

The switch to BTreeMap provides deterministic ordering for video renditions, matching the approach in the audio catalog.

Also applies to: 30-31

rs/moq-lite/src/model/origin.rs (1)

9-9: LGTM! Origin aligned with Pair refactor.

The changes follow the same pattern as other model types, with extensive test coverage validating the new API.

Also applies to: 13-13, 342-345

rs/moq-lite/src/model/frame.rs (1)

6-8: LGTM! Frame refactored to Pair.

Consistent with the systematic refactor across all model types.

Also applies to: 17-20

rs/moq-lite/src/model/broadcast.rs (1)

10-10: LGTM! Broadcast refactored to Pair.

The changes complete the systematic refactor from Produce to Pair across all moq-lite model types.

Also applies to: 16-16, 34-37

rs/libmoq/src/error.rs (2)

53-65: LGTM! New error variants properly extend FFI error handling.

The new variants (Offline, Hang, NoIndex, NulError) appropriately cover the expanded API surface. The #[from] derives for hang::Error and std::ffi::NulError enable ergonomic error propagation.


92-95: Return codes are consistent.

The new codes (-17 through -20) continue the existing numbering scheme and are properly mapped.

rs/moq-relay/src/cluster.rs (1)

60-70: LGTM! Consistent migration from Produce to Pair.

The type change from moq_lite::Produce to moq_lite::Pair is a straightforward rename that aligns with the broader refactoring across the repository. Access patterns (.producer, .consumer) remain unchanged.

rs/libmoq/src/lib.rs (1)

1-9: LGTM! Clean module organization.

The introduction of the api module with a wildcard re-export provides a clear separation between the public C FFI surface and internal implementation details. Keeping state private is appropriate.

rs/hang/src/catalog/root.rs (2)

86-93: LGTM! Consistent migration to Pair type.

The return type change aligns with the repository-wide refactor from Produce to Pair.


235-237: Good addition of state tracking.

The current() accessor provides convenient access to the last-received catalog without re-polling.

rs/libmoq/src/api.rs (6)

63-75: LGTM! Logging initialization with sensible defaults.

The empty string check provides a reasonable default behavior, and the Level::from_str handles the parsing with proper error propagation.


92-106: LGTM! Session connection with optional origins.

The use of ffi::parse_id_optional for origin_publish and origin_consume allows flexible session setup where either or both origins can be omitted.


35-51: LGTM! Frame and Announced structs are properly C-compatible.

These structs use only primitive types and raw pointers without Option wrappers, making them safe for FFI.


127-186: LGTM! Origin management functions follow consistent patterns.

The origin lifecycle functions properly validate IDs, parse strings safely, and delegate to the State singleton.


188-250: LGTM! Broadcast and media publishing API.

The publish functions correctly handle pointer validation and delegate to State methods.


357-371: LGTM! Frame and consume cleanup functions.

These cleanup functions follow the established pattern with proper ID validation.

rs/libmoq/src/state.rs (2)

1-32: LGTM!

The State struct organization with NonZeroSlab for entity management and oneshot channels for task coordination is well-structured.


400-427: Excellent StateGuard pattern.

The StateGuard pattern cleanly ensures the Tokio runtime context is active whenever State is accessed, which is essential for the tokio::spawn calls made from various State methods. The Deref/DerefMut implementation provides ergonomic access.

Comment on lines 269 to 277
#[no_mangle]
pub unsafe extern "C" fn moq_consume_catalog_video(catalog: i32, index: i32, dst: *mut VideoTrack) -> i32 {
ffi::return_code(move || {
let catalog = ffi::parse_id(catalog)?;
let index = index as usize;
let dst = dst.as_mut().ok_or(Error::InvalidPointer)?;
State::lock().consume_catalog_video(catalog, index, dst)
})
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Negative index values silently wrap to large usize.

Casting i32 to usize when index is negative will produce an unexpectedly large value, potentially causing out-of-bounds access or incorrect behavior in State::lock().consume_catalog_video.

Consider validating the index before conversion:

 	ffi::return_code(move || {
 		let catalog = ffi::parse_id(catalog)?;
-		let index = index as usize;
+		if index < 0 {
+			return Err(Error::InvalidId);
+		}
+		let index = index as usize;
 		let dst = dst.as_mut().ok_or(Error::InvalidPointer)?;
 		State::lock().consume_catalog_video(catalog, index, dst)
 	})

This same pattern applies to moq_consume_catalog_audio (line 283), moq_consume_video_track (line 307), moq_consume_audio_track (line 332), and moq_consume_frame_chunk (line 351).

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
#[no_mangle]
pub unsafe extern "C" fn moq_consume_catalog_video(catalog: i32, index: i32, dst: *mut VideoTrack) -> i32 {
ffi::return_code(move || {
let catalog = ffi::parse_id(catalog)?;
let index = index as usize;
let dst = dst.as_mut().ok_or(Error::InvalidPointer)?;
State::lock().consume_catalog_video(catalog, index, dst)
})
}
#[no_mangle]
pub unsafe extern "C" fn moq_consume_catalog_video(catalog: i32, index: i32, dst: *mut VideoTrack) -> i32 {
ffi::return_code(move || {
let catalog = ffi::parse_id(catalog)?;
if index < 0 {
return Err(Error::InvalidId);
}
let index = index as usize;
let dst = dst.as_mut().ok_or(Error::InvalidPointer)?;
State::lock().consume_catalog_video(catalog, index, dst)
})
}
🤖 Prompt for AI Agents
In rs/libmoq/src/api.rs around lines 269-277, the function casts an i32 index to
usize which allows negative values to wrap to huge usizes; validate the index
before conversion: check if index < 0 and return an appropriate error (e.g.
Error::InvalidArgument or similar) via the ffi::return_code path instead of
casting, otherwise safely cast to usize and proceed; make the same explicit
negative-index check and error-return change in the other functions mentioned
(moq_consume_catalog_audio at ~283, moq_consume_video_track at ~307,
moq_consume_audio_track at ~332, moq_consume_frame_chunk at ~351).

Comment on lines 114 to 116
let mut state = STATE.lock().unwrap();
let id = state.announced.insert((path.to_string(), broadcast.is_some()));
on_announce.call(id);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Blocking mutex held in async context.

Lines 114, 228, and 341 use std::sync::Mutex (blocking) within async tasks. Holding a blocking mutex lock in an async context can block the entire executor thread, preventing other tasks from making progress. This can lead to deadlocks or poor performance in async code.

Consider one of these solutions:

Solution 1: Use tokio::sync::Mutex for the global STATE:

-static STATE: LazyLock<Mutex<State>> = LazyLock::new(|| Mutex::new(State::default()));
+static STATE: LazyLock<tokio::sync::Mutex<State>> = LazyLock::new(|| tokio::sync::Mutex::new(State::default()));

Then update lock sites to use .await:

let mut state = STATE.lock().await;

Solution 2: Minimize critical sections and use channels to send data back to sync context instead of locking from async tasks.

Also applies to: 228-230, 341-342

🤖 Prompt for AI Agents
In rs/libmoq/src/state.rs around lines 114, 228 and 341, the code currently
locks a std::sync::Mutex inside async tasks which can block the async executor;
replace the global STATE mutex type with tokio::sync::Mutex (or another
async-aware lock) and update all lock sites to use .await (e.g. let mut state =
STATE.lock().await), ensuring any initialization (lazy_static/static) uses the
async Mutex type and adjusting synchronous call sites to avoid awaiting in
non-async contexts (refactor those callers or move sync work into a small
critical section or a dedicated sync thread); alternatively, if switching to an
async mutex isn’t appropriate, minimize the locked region and send updates via
an mpsc channel to a single synchronous owner to mutate STATE instead of holding
a blocking Mutex inside async tasks.

Comment on lines 122 to 128
pub fn origin_announced_info(&self, announced: Id, dst: &mut Announced) -> Result<(), Error> {
let announced = self.announced.get(announced).ok_or(Error::NotFound)?;
dst.path = announced.0.as_str().as_ptr() as *const c_char;
dst.path_len = announced.0.len();
dst.active = announced.1;
Ok(())
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# Check the actual code in state.rs around the mentioned lines
cd rs/libmoq && wc -l src/state.rs

Repository: moq-dev/moq

Length of output: 71


🏁 Script executed:

# Read the relevant sections of the file
cd rs/libmoq && sed -n '120,130p' src/state.rs && echo "---" && sed -n '230,270p' src/state.rs

Repository: moq-dev/moq

Length of output: 2170


🏁 Script executed:

# Look for FFI headers that document the Announced type
find . -type f \( -name "*.h" -o -name "*.hpp" \) | xargs grep -l "Announced\|VideoTrack\|AudioTrack" 2>/dev/null | head -5

Repository: moq-dev/moq

Length of output: 37


🏁 Script executed:

# Check if there are inline tests in state.rs
cd rs/libmoq && grep -n "#\[test\]\|#\[cfg(test)\]\|mod tests" src/state.rs

Repository: moq-dev/moq

Length of output: 37


🏁 Script executed:

# Look for any _close functions referenced in the file
cd rs/libmoq && grep -n "_close\|close(" src/state.rs | head -10

Repository: moq-dev/moq

Length of output: 818


🏁 Script executed:

# Look for FFI/C interface files
find rs/libmoq -type f \( -name "*.h" -o -name "*.c" -o -name "*.rs" \) | xargs grep -l "Announced\|VideoTrack\|AudioTrack" 2>/dev/null | head -10

Repository: moq-dev/moq

Length of output: 98


🏁 Script executed:

# Check if there's a lib.rs with public API exports
cd rs/libmoq && head -80 src/lib.rs | grep -A 50 "pub\|use"

Repository: moq-dev/moq

Length of output: 103


🏁 Script executed:

# Look for any FFI-related documentation or README
find rs/libmoq -type f \( -name "*.md" -o -name "README*" \) -exec cat {} \;

Repository: moq-dev/moq

Length of output: 2297


🏁 Script executed:

# Check the structure around the _close functions to understand lifetime semantics
cd rs/libmoq && sed -n '130,140p' src/state.rs

Repository: moq-dev/moq

Length of output: 533


🏁 Script executed:

# Check api.rs for FFI documentation
cd rs/libmoq && head -100 src/api.rs

Repository: moq-dev/moq

Length of output: 2589


🏁 Script executed:

# Search for any doc comments about pointer lifetime in state.rs
cd rs/libmoq && grep -B 5 -A 2 "origin_announced_info\|consume_catalog_video\|consume_catalog_audio" src/state.rs | head -40

Repository: moq-dev/moq

Length of output: 880


🏁 Script executed:

# Check if there are any safety comments or documentation about FFI pointers
cd rs/libmoq && rg "SAFETY|safety|lifetime|pointer" -A 2 -B 2 src/state.rs

Repository: moq-dev/moq

Length of output: 37


🏁 Script executed:

# Check if there are tests for state.rs elsewhere
find rs/libmoq -type f -name "*.rs" | xargs grep -l "state::" | xargs grep -l "#\[test\]"

Repository: moq-dev/moq

Length of output: 39


🏁 Script executed:

# Look at the actual lib.rs to see if there are FFI wrapper functions that document lifetime
cd rs/libmoq && grep -A 15 "origin_announced_info\|consume_catalog_video" src/lib.rs

Repository: moq-dev/moq

Length of output: 37


🏁 Script executed:

# Check if there's any documentation in the README about pointer lifetime
cd rs/libmoq && cat README.md | grep -i "lifetime\|pointer\|safety" -A 3

Repository: moq-dev/moq

Length of output: 37


🏁 Script executed:

# Check the entire api.rs file to see all documentation
cd rs/libmoq && cat src/api.rs

Repository: moq-dev/moq

Length of output: 10527


🏁 Script executed:

# Check if there's a specific FFI wrapper in lib.rs for these functions
cd rs/libmoq && cat src/lib.rs | head -200

Repository: moq-dev/moq

Length of output: 152


🏁 Script executed:

# Check if there are any tests for state.rs in a tests directory
find rs/libmoq -type f -name "*.rs" -path "*/tests/*" | xargs grep -l "state\|State" 2>/dev/null

Repository: moq-dev/moq

Length of output: 39


🏁 Script executed:

# Check the actual content around origin_announced_close to understand data removal
cd rs/libmoq && sed -n '130,140p' src/state.rs

Repository: moq-dev/moq

Length of output: 533


🏁 Script executed:

# Look at the slab structure to confirm data is removed on close
cd rs/libmoq && grep -B 5 -A 10 "announced_task.remove\|consume_catalog.remove" src/state.rs | head -30

Repository: moq-dev/moq

Length of output: 823


Add FFI safety documentation for pointer lifetimes and add inline tests to state.rs.

The FFI functions moq_origin_announced_info, moq_consume_catalog_video, and moq_consume_catalog_audio (lines 122-128, 236-268) return raw pointers to strings owned by State slabs. When the corresponding _close function removes the ID from the slab (lines 130, 270), those pointers become invalid. The FFI wrappers in api.rs must include a /// # Safety section documenting that callers must not use returned pointers after calling the corresponding close function.

Additionally, state.rs lacks inline test modules, which violates the project guideline that Rust tests should be integrated within source files.

Comment on lines 236 to 251
pub fn consume_catalog_video(&mut self, catalog: Id, index: usize, dst: &mut VideoTrack) -> Result<(), Error> {
let catalog = self.consume_catalog.get(catalog).ok_or(Error::NotFound)?;
let video = catalog.video.as_ref().ok_or(Error::NoIndex)?;
let (rendition, config) = video.renditions.iter().nth(index).ok_or(Error::NoIndex)?;

dst.name = rendition.as_str().as_ptr() as *const c_char;
dst.name_len = rendition.len();
dst.codec = rendition.as_str().as_ptr() as *const c_char;
dst.codec_len = rendition.len();
dst.description = config.description.as_ref().map(|desc| desc.as_ptr() as *const u8);
dst.description_len = config.description.as_ref().map(|desc| desc.len()).unwrap_or(0);
dst.coded_width = config.coded_width;
dst.coded_height = config.coded_height;

Ok(())
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# Search for VideoConfig struct definition
ast-grep --pattern $'struct VideoConfig {
  $$$
}'

Repository: moq-dev/moq

Length of output: 37


🏁 Script executed:

# Also search for the video.renditions field to understand the structure
rg "renditions" rs/libmoq/src/ -B 2 -A 2 --type rust

Repository: moq-dev/moq

Length of output: 1621


🏁 Script executed:

# Search for VideoConfig struct in all files
rg "struct VideoConfig" --type rust

Repository: moq-dev/moq

Length of output: 112


🏁 Script executed:

# Search for the video field and its type in Catalog struct
rg "video:" rs/libmoq/src/ --type rust -B 3 -A 1 | head -30

Repository: moq-dev/moq

Length of output: 37


🏁 Script executed:

# Look at imports and types at top of state.rs
head -50 rs/libmoq/src/state.rs

Repository: moq-dev/moq

Length of output: 1590


🏁 Script executed:

# Read the VideoConfig struct definition
cat rs/hang/src/catalog/video/mod.rs

Repository: moq-dev/moq

Length of output: 3583


🏁 Script executed:

# Find VideoCodec definition
rg "pub enum VideoCodec|pub struct VideoCodec" --type rust

Repository: moq-dev/moq

Length of output: 111


🏁 Script executed:

# Check the video codec module
cat rs/hang/src/catalog/video/codec.rs

Repository: moq-dev/moq

Length of output: 1567


Fix codec field assignment to use config.codec instead of rendition name.

Lines 243-244 incorrectly set dst.codec and dst.codec_len to the rendition name string. Since rendition is the BTreeMap key (track identifier like "high" or "low") and config contains the actual codec information, use config.codec.to_string() for the codec field instead.

🤖 Prompt for AI Agents
In rs/libmoq/src/state.rs around lines 236 to 251, the code incorrectly assigns
dst.codec and dst.codec_len from the rendition name; change those two
assignments to use the actual codec from config instead (e.g., set dst.codec to
config.codec.as_str().as_ptr() as *const c_char and dst.codec_len to
config.codec.len(), or if config.codec is owned call config.codec.as_ptr()/len
or config.codec.to_string().as_ptr()/len appropriately), leaving the other
fields (name, description, sizes) unchanged.

@davegullo
Copy link
Contributor

davegullo commented Dec 15, 2025

NOTE: I'm building this experimental plugin https://github.com/davegullo/obs-hang
against this branch, and managed to get it working with these changes:

obs-hang.diff.txt

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
rs/libmoq/src/state.rs (1)

179-183: Assertion checks wrong variable.

The assertion on line 183 checks init.is_empty(), but init is the original immutable slice. The mutable reference temp is what gets advanced by decoder.initialize(). This assertion will always fail if init is non-empty.

 		let mut temp = init;
 		decoder
 			.initialize(&mut temp)
 			.map_err(|err| Error::InitFailed(Arc::new(err)))?;
-		assert!(init.is_empty(), "buffer was not fully consumed");
+		assert!(temp.is_empty(), "buffer was not fully consumed");
♻️ Duplicate comments (6)
rs/libmoq/src/state.rs (5)

117-125: Blocking mutex in async context - minimized but still present.

The STATE.lock().unwrap() on line 120 is a blocking lock inside an async task. While the critical section is intentionally kept short (insert only) and the lock is released before calling the callback, this pattern can still block the executor thread momentarily.

The current approach is acceptable for short operations, but consider using tokio::sync::Mutex if contention increases.


130-136: Document pointer lifetime constraints.

The returned pointers (dst.path) become invalid after origin_announced_close removes the entry from the slab. Consider adding a doc comment clarifying this lifetime constraint for maintainers.


258-261: Codec field incorrectly set to rendition name.

Lines 260-261 set dst.codec and dst.codec_len from rendition, which is the track identifier (e.g., "high", "low"), not the actual codec. The codec information should come from config.codec.

 		dst.name = rendition.as_str().as_ptr() as *const c_char;
 		dst.name_len = rendition.len();
-		dst.codec = rendition.as_str().as_ptr() as *const c_char;
-		dst.codec_len = rendition.len();
+		let codec_str = config.codec.to_string();
+		// Note: This creates a temporary that will be dropped.
+		// Consider storing the codec string in the catalog or returning it differently.

The fix requires storing the codec string to ensure pointer validity. Consider caching the stringified codec in the catalog entry or using a different approach for FFI.


278-281: Same codec bug in audio track info.

Lines 280-281 incorrectly set the codec from the rendition name instead of config.codec. Same issue as consume_catalog_video.


369-369: Audio track task inserted into wrong slab.

Line 369 inserts the task into consume_video_task instead of consume_audio_task. This will cause consume_audio_track_close to fail to find and cancel the task.

-		let id = self.consume_video_task.insert(channel.0);
+		let id = self.consume_audio_task.insert(channel.0);
rs/libmoq/src/api.rs (1)

276-293: Negative index values silently wrap to large usize.

Lines 279, 289, 313, 338, and 357 cast i32 to usize without validating non-negative. Negative indices wrap to very large values, which will likely return NoIndex but the behavior is confusing.

Consider adding validation:

 	ffi::return_code(move || {
 		let catalog = ffi::parse_id(catalog)?;
+		if index < 0 {
+			return Err(Error::NoIndex);
+		}
 		let index = index as usize;
 		let dst = dst.as_mut().ok_or(Error::InvalidPointer)?;
 		State::lock().consume_catalog_video(catalog, index, dst)
 	})

Apply the same pattern to moq_consume_catalog_audio, moq_consume_video_track, moq_consume_audio_track, and moq_consume_frame_chunk.

🧹 Nitpick comments (3)
rs/libmoq/src/state.rs (1)

436-436: Consider adding inline tests.

Per coding guidelines, Rust crates should integrate tests within source files using inline test modules. This file lacks a #[cfg(test)] module.

rs/libmoq/src/api.rs (2)

137-184: Add safety documentation to unsafe FFI functions.

Functions moq_origin_publish, moq_origin_announced, and moq_origin_consume are marked unsafe but lack # Safety documentation explaining the caller's obligations (valid null-terminated strings, valid function pointers, valid user_data).


303-343: Add safety documentation.

moq_consume_video_track and moq_consume_audio_track are unsafe but lack # Safety sections documenting requirements for on_frame and user_data parameters.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 62eab2f and 6ed5895.

📒 Files selected for processing (2)
  • rs/libmoq/src/api.rs (1 hunks)
  • rs/libmoq/src/state.rs (2 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.rs

📄 CodeRabbit inference engine (CLAUDE.md)

In Rust crates, tests should be integrated within source files using inline test modules

Files:

  • rs/libmoq/src/state.rs
  • rs/libmoq/src/api.rs
🧠 Learnings (1)
📓 Common learnings
Learnt from: CR
Repo: moq-dev/moq PR: 0
File: CLAUDE.md:0-0
Timestamp: 2025-12-10T04:00:14.871Z
Learning: Core protocol implementation in the `moq` layer must be generic and not contain media-specific logic; CDN/relay does not know anything about media
🧬 Code graph analysis (2)
rs/libmoq/src/state.rs (2)
rs/moq-lite/src/model/origin.rs (5)
  • publish (144-168)
  • consume (170-173)
  • consume (401-403)
  • consume (494-496)
  • default (329-333)
rs/moq-lite/src/model/broadcast.rs (5)
  • consume (98-104)
  • closed (236-242)
  • closed (328-356)
  • default (53-55)
  • new (59-69)
rs/libmoq/src/api.rs (2)
rs/libmoq/src/ffi.rs (16)
  • return_code (30-35)
  • parse_str (126-133)
  • code (38-38)
  • code (42-44)
  • code (48-50)
  • code (54-60)
  • code (64-69)
  • code (73-78)
  • code (82-87)
  • code (91-93)
  • code (97-99)
  • parse_url (113-121)
  • parse_id_optional (106-111)
  • new (13-18)
  • parse_id (102-104)
  • parse_slice (138-149)
rs/libmoq/src/error.rs (1)
  • code (74-97)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Check
🔇 Additional comments (6)
rs/libmoq/src/state.rs (2)

39-69: LGTM!

The session connection pattern with oneshot channel for cancellation and callback-based status reporting is well-structured.


467-484: LGTM - dedicated runtime thread for FFI.

The pattern of spawning a dedicated thread for the tokio runtime with block_on(pending()) is appropriate for FFI scenarios where calls may come from arbitrary threads. The std::sync::Mutex choice is intentional for synchronous FFI access.

rs/libmoq/src/api.rs (4)

11-57: LGTM - FFI-safe struct definitions.

The structs correctly use raw pointers with documented null/0 semantics for optional fields. The previous Option FFI issues have been addressed.


59-81: LGTM!

Proper safety documentation and error handling for the log level initialization.


83-125: LGTM!

Session API is well-documented with clear safety requirements and proper handling of optional parameters via parse_id_optional.


363-377: LGTM!

Frame close and consume close functions follow the established pattern correctly.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
rs/hang/src/model/track.rs (1)

121-121: Update the documentation to reference the renamed method.

The documentation references set_latency, but this method has been renamed to set_max_buffer on line 232. This will result in a broken documentation link.

Apply this diff to fix the documentation:

-/// Use [`set_latency`](Self::set_latency) to configure the maximum acceptable delay.
+/// Use [`set_max_buffer`](Self::set_max_buffer) to configure the maximum acceptable delay.
rs/libmoq/src/state.rs (2)

193-197: Check the correct buffer after consumption.

Line 197 checks init.is_empty() but the decoder consumed bytes from temp. Since temp is the mutable slice passed to initialize, the assertion should verify that temp is empty.

Apply this diff:

 	let mut temp = init;
 	decoder
 		.initialize(&mut temp)
 		.map_err(|err| Error::InitFailed(Arc::new(err)))?;
-	assert!(init.is_empty(), "buffer was not fully consumed");
+	assert!(temp.is_empty(), "buffer was not fully consumed");

1-492: Add inline test module per coding guidelines.

The file lacks inline tests, which violates the project guideline that Rust crates should integrate tests within source files using inline test modules.

Add a test module at the end of the file:

+#[cfg(test)]
+mod tests {
+	use super::*;
+
+	#[test]
+	fn test_state_lock() {
+		let state = State::lock();
+		// Verify state can be locked and accessed
+		assert!(state.origin.len() == 0);
+	}
+
+	// Add more tests for state operations
+}

As per coding guidelines, tests should be integrated within source files.

♻️ Duplicate comments (3)
rs/libmoq/src/state.rs (3)

126-142: Blocking mutex held in async context causes executor stalls.

Lines 131-138 lock std::sync::Mutex (blocking) inside the async function run_origin_announced. This blocks the entire executor thread, preventing other tasks from progressing and can cause deadlocks.

Apply the solution from the previous review: replace the global STATE with tokio::sync::Mutex:

-static STATE: LazyLock<Mutex<State>> = LazyLock::new(|| Mutex::new(State::default()));
+static STATE: LazyLock<tokio::sync::Mutex<State>> = LazyLock::new(|| tokio::sync::Mutex::new(State::default()));

Then update all lock sites to use .await:

-			let id = STATE.lock().unwrap().announced.insert((path.to_string(), broadcast.is_some()));
+			let id = STATE.lock().await.announced.insert((path.to_string(), broadcast.is_some()));

Also applies to: 245-249, 395-396

Based on learnings, this issue was previously flagged but not yet resolved.


258-286: Incorrect codec field assignment uses rendition name instead of actual codec.

Lines 266-267 assign dst.codec and dst.codec_len from the rendition variable, which is the BTreeMap key (track identifier like "high" or "low"). The actual codec information is in config.codec.

Apply this diff to use the actual codec:

 	dst.name = rendition.as_str().as_ptr() as *const c_char;
 	dst.name_len = rendition.len();
-	dst.codec = rendition.as_str().as_ptr() as *const c_char;
-	dst.codec_len = rendition.len();
+	let codec_str = config.codec.to_string();
+	dst.codec = codec_str.as_ptr() as *const c_char;
+	dst.codec_len = codec_str.len();

Note: You'll need to ensure the codec string lifetime is managed appropriately, either by storing it in the State slab or documenting that the caller must copy it immediately.

Based on learnings, this issue was previously flagged but not yet resolved.


288-307: Same codec bug in audio track handling.

Lines 295-296 have the same issue as the video track: using rendition (the track name) instead of config.codec for the codec field.

Apply the same fix as for consume_catalog_video:

 	dst.name = rendition.as_str().as_ptr() as *const c_char;
 	dst.name_len = rendition.len();
-	dst.codec = rendition.as_str().as_ptr() as *const c_char;
-	dst.codec_len = rendition.len();
+	let codec_str = config.codec.to_string();
+	dst.codec = codec_str.as_ptr() as *const c_char;
+	dst.codec_len = codec_str.len();

Based on learnings, this issue was previously flagged but not yet resolved.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 6ed5895 and c5e4b24.

📒 Files selected for processing (9)
  • rs/hang/src/catalog/root.rs (5 hunks)
  • rs/hang/src/model/broadcast.rs (3 hunks)
  • rs/hang/src/model/track.rs (5 hunks)
  • rs/libmoq/src/api.rs (1 hunks)
  • rs/libmoq/src/error.rs (2 hunks)
  • rs/libmoq/src/ffi.rs (2 hunks)
  • rs/libmoq/src/id.rs (3 hunks)
  • rs/libmoq/src/lib.rs (1 hunks)
  • rs/libmoq/src/state.rs (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (4)
  • rs/hang/src/catalog/root.rs
  • rs/libmoq/src/id.rs
  • rs/libmoq/src/error.rs
  • rs/libmoq/src/api.rs
🧰 Additional context used
📓 Path-based instructions (1)
**/*.rs

📄 CodeRabbit inference engine (CLAUDE.md)

In Rust crates, tests should be integrated within source files using inline test modules

Files:

  • rs/hang/src/model/track.rs
  • rs/libmoq/src/lib.rs
  • rs/hang/src/model/broadcast.rs
  • rs/libmoq/src/ffi.rs
  • rs/libmoq/src/state.rs
🧬 Code graph analysis (3)
rs/hang/src/model/track.rs (3)
rs/hang/src/catalog/root.rs (3)
  • consume (167-169)
  • new (151-156)
  • new (226-228)
rs/hang/src/model/group.rs (2)
  • new (37-44)
  • max_timestamp (107-109)
rs/hang/src/model/timestamp.rs (1)
  • max (69-71)
rs/libmoq/src/ffi.rs (2)
rs/libmoq/src/error.rs (1)
  • code (101-124)
rs/libmoq/src/id.rs (3)
  • try_from (51-53)
  • try_from (59-61)
  • try_from (73-75)
rs/libmoq/src/state.rs (5)
rs/moq-lite/src/model/origin.rs (5)
  • publish (144-168)
  • consume (170-173)
  • consume (401-403)
  • consume (494-496)
  • default (329-333)
rs/moq-lite/src/model/frame.rs (2)
  • consume (100-106)
  • new (71-77)
rs/moq-lite/src/model/broadcast.rs (4)
  • consume (98-104)
  • closed (236-242)
  • closed (328-356)
  • default (53-55)
rs/moq-lite/src/model/track.rs (2)
  • consume (128-134)
  • closed (195-200)
rs/libmoq/src/ffi.rs (2)
  • new (22-27)
  • parse_slice (155-166)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Check
🔇 Additional comments (8)
rs/hang/src/model/track.rs (3)

135-147: LGTM!

The field rename from latency to max_buffer and the updated constructor signature are consistent with the broader API changes in this file.


158-158: LGTM!

The updated usage of self.max_buffer in the latency calculation is correct and consistent with the field rename.


212-212: LGTM!

Good attention to detail updating the debug log message to reflect the renamed field.

rs/libmoq/src/lib.rs (1)

1-24: LGTM! Well-organized FFI crate structure.

The module organization and documentation are clear. The re-exported API surface provides a clean interface for C consumers.

rs/hang/src/model/broadcast.rs (1)

6-99: LGTM! Clean integration of broadcast consumer with catalog.

The new BroadcastConsumer struct properly wraps the underlying moq_lite::BroadcastConsumer and provides convenient access to the catalog. The Default implementation and conversion traits follow Rust idioms correctly.

rs/libmoq/src/ffi.rs (1)

1-166: LGTM! Robust FFI utilities with proper error handling.

The FFI helpers correctly handle null pointers, UTF-8 validation, and overflow cases. The ReturnCode trait provides a clean abstraction for converting Rust results to C-compatible error codes, and panic handling in return_code prevents undefined behavior at the FFI boundary.

rs/libmoq/src/state.rs (2)

376-400: Efficient frame chunk handling with smart buffer reuse.

The implementation correctly avoids allocating a contiguous buffer when the frame has a single chunk (lines 380-386), falling back to allocation only when necessary. This is a good performance optimization for the common case.


470-492: Well-designed runtime and state initialization.

The global runtime initialization correctly spawns a dedicated background thread and keeps it alive with a pending future. The StateGuard wrapper elegantly ensures the tokio context is entered whenever the state is locked, enabling async operations from FFI boundaries.

Comment on lines 94 to 96
pub fn consume(&self, max_buffer: std::time::Duration) -> TrackConsumer {
TrackConsumer::new(self.inner.consume(), max_buffer)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

rg -nP --type=rust -C3 '\.consume\s*\(' .

Repository: moq-dev/moq

Length of output: 14894


Update call site in rs/hang/src/catalog/root.rs with the required max_buffer parameter.

The consume() method signature was changed to require a max_buffer: std::time::Duration parameter, but the call at line 168 in rs/hang/src/catalog/root.rs does not pass this argument:

pub fn consume(&self) -> CatalogConsumer {
    CatalogConsumer::new(self.track.consume())
}

This will fail to compile. The call must be updated to pass the required parameter.

🤖 Prompt for AI Agents
In rs/hang/src/catalog/root.rs around line 168, the call to self.track.consume()
must pass the new required max_buffer: std::time::Duration; update the public
API to accept and forward that Duration: change pub fn consume(&self) ->
CatalogConsumer to pub fn consume(&self, max_buffer: std::time::Duration) ->
CatalogConsumer and call CatalogConsumer::new(self.track.consume(max_buffer));
ensure any callers of Catalog::consume are updated accordingly to provide a
Duration.

Comment on lines 229 to 234
/// Set the maximum latency tolerance for this consumer.
///
/// Groups with timestamps older than `max_timestamp - latency` will be skipped.
pub fn set_latency(&mut self, max: std::time::Duration) {
self.latency = max;
/// Groups with timestamps older than `max_timestamp - max_buffer` will be skipped.
pub fn set_max_buffer(&mut self, max: std::time::Duration) {
self.max_buffer = max;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Update the documentation for consistent terminology.

The method has been renamed to set_max_buffer, but the first line of the documentation still says "maximum latency tolerance". For consistency with the method name and the rest of the codebase, this should be updated to use "buffer" terminology.

Apply this diff to improve consistency:

-	/// Set the maximum latency tolerance for this consumer.
+	/// Set the maximum buffer duration for this consumer.
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
/// Set the maximum latency tolerance for this consumer.
///
/// Groups with timestamps older than `max_timestamp - latency` will be skipped.
pub fn set_latency(&mut self, max: std::time::Duration) {
self.latency = max;
/// Groups with timestamps older than `max_timestamp - max_buffer` will be skipped.
pub fn set_max_buffer(&mut self, max: std::time::Duration) {
self.max_buffer = max;
}
/// Set the maximum buffer duration for this consumer.
///
/// Groups with timestamps older than `max_timestamp - max_buffer` will be skipped.
pub fn set_max_buffer(&mut self, max: std::time::Duration) {
self.max_buffer = max;
}
🤖 Prompt for AI Agents
In rs/hang/src/model/track.rs around lines 229 to 234, update the doc comment to
use "buffer" terminology instead of "maximum latency tolerance": change the
first line to something like "Set the maximum buffer for this consumer." and
adjust the second line to reference `max_timestamp - max_buffer` consistently
(e.g., "Groups with timestamps older than `max_timestamp - max_buffer` will be
skipped.") so the docs match the method name `set_max_buffer` and the rest of
the codebase.

@davegullo
Copy link
Contributor

🎉 validated https://github.com/davegullo/obs-hang
against the latest work!

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (1)
rs/libmoq/src/api.rs (1)

380-386: Negative index values silently wrap to large usize.

This issue was previously flagged and remains unaddressed. Casting negative i32 to usize produces unexpectedly large values (e.g., -1 becomes usize::MAX), potentially causing out-of-bounds access.

Add validation before each cast:

 pub unsafe extern "C" fn moq_consume_catalog_video(catalog: i32, index: i32, dst: *mut VideoTrack) -> i32 {
 	ffi::return_code(move || {
 		let catalog = ffi::parse_id(catalog)?;
+		if index < 0 {
+			return Err(Error::InvalidId);
+		}
 		let index = index as usize;
 		let dst = dst.as_mut().ok_or(Error::InvalidPointer)?;
 		CONSUME.lock().catalog_video(catalog, index, dst)
 	})
 }

Apply the same pattern to: moq_consume_catalog_audio, moq_consume_video_track, moq_consume_audio_track, and moq_consume_frame_chunk.

Alternatively, consider adding a helper like ffi::parse_index(i32) -> Result<usize, Error> to centralize this validation.

Also applies to: 399-405, 429-442, 466-479, 503-509

🧹 Nitpick comments (2)
rs/libmoq/src/api.rs (2)

324-328: Inconsistent parameter naming: export vs media.

The parameter is named export but the function is moq_publish_media_close. For consistency with moq_publish_media_init and moq_publish_media_frame which use media, consider:

-pub extern "C" fn moq_publish_media_close(export: i32) -> i32 {
+pub extern "C" fn moq_publish_media_close(media: i32) -> i32 {
 	ffi::return_code(move || {
-		let export = ffi::parse_id(export)?;
-		PUBLISH.lock().media_close(export)
+		let media = ffi::parse_id(media)?;
+		PUBLISH.lock().media_close(media)
 	})
 }

1-9: Consider adding inline tests for non-FFI logic.

Per coding guidelines, tests should be integrated within source files. While the FFI entry points themselves are challenging to unit test, helper logic or validation functions extracted from this module could benefit from inline tests.

For example, if you extract index validation as suggested:

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_parse_index_rejects_negative() {
        assert!(ffi::parse_index(-1).is_err());
    }

    #[test]
    fn test_parse_index_accepts_zero() {
        assert_eq!(ffi::parse_index(0).unwrap(), 0usize);
    }
}
📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c5e4b24 and 842ea30.

📒 Files selected for processing (8)
  • rs/libmoq/src/api.rs (1 hunks)
  • rs/libmoq/src/consume.rs (1 hunks)
  • rs/libmoq/src/lib.rs (1 hunks)
  • rs/libmoq/src/origin.rs (1 hunks)
  • rs/libmoq/src/publish.rs (1 hunks)
  • rs/libmoq/src/runtime.rs (1 hunks)
  • rs/libmoq/src/session.rs (1 hunks)
  • rs/libmoq/src/state.rs (0 hunks)
💤 Files with no reviewable changes (1)
  • rs/libmoq/src/state.rs
🚧 Files skipped from review as they are similar to previous changes (1)
  • rs/libmoq/src/lib.rs
🧰 Additional context used
📓 Path-based instructions (1)
**/*.rs

📄 CodeRabbit inference engine (CLAUDE.md)

In Rust crates, tests should be integrated within source files using inline test modules

Files:

  • rs/libmoq/src/runtime.rs
  • rs/libmoq/src/publish.rs
  • rs/libmoq/src/origin.rs
  • rs/libmoq/src/session.rs
  • rs/libmoq/src/consume.rs
  • rs/libmoq/src/api.rs
🧬 Code graph analysis (2)
rs/libmoq/src/publish.rs (4)
rs/libmoq/src/consume.rs (1)
  • close (288-291)
rs/libmoq/src/session.rs (1)
  • close (55-58)
rs/libmoq/src/ffi.rs (1)
  • parse_slice (155-166)
rs/hang/src/model/timestamp.rs (1)
  • from_micros (40-46)
rs/libmoq/src/origin.rs (4)
rs/libmoq/src/runtime.rs (2)
  • std (20-20)
  • new (34-38)
rs/libmoq/src/publish.rs (3)
  • create (15-19)
  • get (21-23)
  • close (25-28)
rs/libmoq/src/consume.rs (1)
  • close (288-291)
rs/libmoq/src/session.rs (1)
  • close (55-58)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Check
🔇 Additional comments (32)
rs/libmoq/src/session.rs (4)

8-12: LGTM!

The Session struct correctly uses NonZeroSlab for managing session task cancellation channels, consistent with the pattern used across other modules in this PR.


15-37: LGTM!

The connect method correctly implements the cancellation pattern using oneshot channels. The tokio::select! ensures either graceful cancellation or connection result propagation. The callback receives both intermediate success (line 49) and final status.


39-53: LGTM!

The async connection flow properly chains client initialization, connection, and session establishment with appropriate error wrapping. The intermediate callback.call(()) at line 49 correctly signals successful connection before awaiting session closure.


55-62: LGTM!

The close method correctly triggers cancellation by dropping the sender, and the global SESSION static follows the established RuntimeLock pattern used across the crate.

rs/libmoq/src/publish.rs (4)

5-12: LGTM!

The Publish struct correctly manages two resource pools for broadcast producers and media decoders using NonZeroSlab.


14-28: LGTM!

The create, get, and close methods follow the established pattern used across other modules in this PR.


45-57: LGTM!

The media_frame method correctly uses data (the mutable binding) for the consumption assertion at line 54. The unsafe FFI slice parsing is properly delegated to parse_slice which handles null checks.


59-66: LGTM!

The media_close method and global PUBLISH static follow the established patterns.

rs/libmoq/src/runtime.rs (3)

10-25: LGTM!

The runtime initialization pattern is sound: creating a current-thread runtime, moving it to a dedicated background thread, and returning the handle for use via enter(). The pending::<()>() future keeps the runtime alive indefinitely.


27-50: LGTM!

The RuntimeLock correctly ensures the Tokio runtime context is entered before acquiring the mutex, enabling async task spawning from FFI code. The lock().unwrap() on line 43 follows Rust conventions for mutex access where poisoning typically indicates an unrecoverable state.


52-73: LGTM!

The RuntimeGuard correctly maintains both the runtime enter guard and the mutex guard, ensuring the Tokio context remains active for the duration of the lock. The Deref implementations provide ergonomic access to the underlying data.

rs/libmoq/src/origin.rs (6)

14-24: LGTM!

The Origin struct correctly manages three separate slabs for origin producers, announcement records, and cancellation channels.


26-33: LGTM!

The create and get methods are straightforward. Note that create() returns Id directly while Publish::create() returns Result<Id, Error> - both are valid since neither can fail, but the consistency could be improved in future refactoring.


35-61: Resource management concern: announced entries are not cleaned up when task closes.

The announced() method returns a task id (stored in announced_task), while run_announced() inserts announcement records into announced. However, announced_close() only removes from announced_task, leaving announced entries orphaned. Consider whether these should be cleaned up, or if this is intentional (announcements persist for later announced_info calls).

If cleanup is needed, you may want to track which announcements belong to which task:

 pub fn announced_close(&mut self, announced: Id) -> Result<(), Error> {
 	self.announced_task.remove(announced).ok_or(Error::NotFound)?;
+	// TODO: Consider cleaning up associated announced entries
 	Ok(())
 }

63-71: LGTM!

The announced_info method correctly provides pointers to internal string data for FFI consumption. The lifetime of these pointers is tied to the announced entry - callers must not use them after calling announced_close.


73-97: LGTM!

The announced_close, consume, publish, and close methods follow the established patterns with consistent error handling.


100-101: LGTM!

The global ORIGIN static follows the RuntimeLock pattern established in the crate.

rs/libmoq/src/consume.rs (11)

10-18: LGTM!

The ConsumeCatalog internal struct appropriately stores the broadcast reference, catalog data, and heap-allocated codec strings needed for FFI pointer stability.


20-39: LGTM!

The Consume struct correctly separates resource management across multiple slabs for broadcasts, catalogs, task cancellation channels, and frames.


41-62: LGTM!

The start and catalog methods follow the established async task pattern with proper cancellation support via oneshot channels.


64-105: LGTM!

The run_catalog method correctly builds heap-allocated codec strings for FFI pointer stability and properly releases the mutex before invoking the callback.


107-163: LGTM!

The catalog_video and catalog_audio methods correctly populate FFI structs with pointers to internal data. The pointer lifetime is tied to the catalog entry.


165-168: LGTM!

The catalog_close method follows the established pattern.


170-199: LGTM!

The video_track method correctly sets up a track subscription with the catalog's priority and wraps it with the specified latency for consumption.


201-230: LGTM!

The audio_track method mirrors video_track for audio consumption with appropriate task management.


232-256: LGTM!

The run_track method efficiently handles frame payload: avoiding allocation for single-chunk frames while falling back to copying for multi-chunk payloads. The TODO at line 234 is a reasonable future improvement.


258-291: LGTM!

The close methods and frame_chunk accessor follow consistent patterns. The NOTE at line 268 correctly documents the intended multi-call usage for chunk retrieval.


294-295: LGTM!

The global CONSUME static follows the established RuntimeLock pattern.

rs/libmoq/src/api.rs (4)

11-31: FFI-safety issues from prior review have been addressed.

The Option types have been replaced with raw pointers (*const u8, *const u32) with documented null semantics. This is the correct approach for C interop.

Also applies to: 34-53


91-103: LGTM!

The logging initialization handles both empty string (default) and specific level parsing correctly, with proper error propagation.


163-273: Origin API functions are well-structured.

Consistent error handling, proper resource pairing (create/close), and clear safety documentation throughout.


348-368: Consume API callback registration and cleanup functions are well-implemented.

Proper callback lifecycle documentation, consistent on_status/on_frame/on_catalog patterns, and paired close functions for resource cleanup.

Also applies to: 408-417, 445-454, 482-491, 512-532

Tokio didn't work like I expected. I'm sure it's possible to have multiple
mutexes, but it's a premature optimization.
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

♻️ Duplicate comments (2)
rs/libmoq/src/state.rs (1)

40-44: ** Blocking mutex called from async contexts.**

Past review comments flagged that std::sync::Mutex is used within async tasks (e.g., consume.rs lines 98, 251; origin.rs line 54). While the critical sections are brief, blocking mutexes can still block the executor thread. The past review suggested using tokio::sync::Mutex or minimizing the locked region.

Based on past review comments.

rs/libmoq/src/api.rs (1)

389-396: ** Negative index values wrap to large usize.**

Line 392 casts i32 to usize without checking for negative values. Negative indices will wrap to very large values, potentially causing out-of-bounds access. This issue was previously flagged for this function and others (moq_consume_catalog_audio line 411, moq_consume_video_track line 447, moq_consume_audio_track line 484, moq_consume_frame_chunk line 515).

Based on past review comments.

🧹 Nitpick comments (1)
rs/libmoq/src/session.rs (1)

26-34: Consider updating the comment reference.

Line 28 references [session_close], but the actual method name is close (line 55). Update the comment for consistency.

Apply this diff:

-			// No more receiver, which means [session_close] was called.
+			// No more receiver, which means [close] was called.
📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 842ea30 and aa3ef89.

📒 Files selected for processing (9)
  • rs/libmoq/CMakeLists.txt (3 hunks)
  • rs/libmoq/src/api.rs (1 hunks)
  • rs/libmoq/src/consume.rs (1 hunks)
  • rs/libmoq/src/ffi.rs (2 hunks)
  • rs/libmoq/src/lib.rs (1 hunks)
  • rs/libmoq/src/origin.rs (1 hunks)
  • rs/libmoq/src/publish.rs (1 hunks)
  • rs/libmoq/src/session.rs (1 hunks)
  • rs/libmoq/src/state.rs (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • rs/libmoq/src/publish.rs
  • rs/libmoq/src/ffi.rs
🧰 Additional context used
📓 Path-based instructions (1)
**/*.rs

📄 CodeRabbit inference engine (CLAUDE.md)

In Rust crates, tests should be integrated within source files using inline test modules

Files:

  • rs/libmoq/src/lib.rs
  • rs/libmoq/src/session.rs
  • rs/libmoq/src/origin.rs
  • rs/libmoq/src/consume.rs
  • rs/libmoq/src/state.rs
  • rs/libmoq/src/api.rs
🧬 Code graph analysis (6)
rs/libmoq/src/lib.rs (2)
rs/libmoq/src/origin.rs (2)
  • consume (76-79)
  • publish (81-90)
rs/moq-lite/src/model/origin.rs (4)
  • consume (170-173)
  • consume (401-403)
  • consume (494-496)
  • publish (144-168)
rs/libmoq/src/session.rs (3)
rs/libmoq/src/ffi.rs (1)
  • new (22-27)
rs/libmoq/src/consume.rs (1)
  • close (288-291)
rs/libmoq/src/publish.rs (1)
  • close (27-30)
rs/libmoq/src/origin.rs (4)
rs/libmoq/src/state.rs (2)
  • std (27-27)
  • enter (42-44)
rs/libmoq/src/publish.rs (3)
  • create (17-21)
  • get (23-25)
  • close (27-30)
rs/libmoq/src/consume.rs (1)
  • close (288-291)
rs/libmoq/src/session.rs (1)
  • close (55-58)
rs/libmoq/src/consume.rs (5)
rs/libmoq/src/state.rs (2)
  • enter (42-44)
  • new (17-38)
rs/libmoq/src/origin.rs (2)
  • consume (76-79)
  • close (92-95)
rs/libmoq/src/ffi.rs (1)
  • new (22-27)
rs/libmoq/src/publish.rs (1)
  • close (27-30)
rs/libmoq/src/session.rs (1)
  • close (55-58)
rs/libmoq/src/state.rs (1)
rs/libmoq/src/ffi.rs (1)
  • new (22-27)
rs/libmoq/src/api.rs (3)
rs/libmoq/src/state.rs (3)
  • std (27-27)
  • enter (42-44)
  • new (17-38)
rs/libmoq/src/ffi.rs (14)
  • parse_str (133-137)
  • code (44-44)
  • code (48-50)
  • code (54-56)
  • code (60-66)
  • code (70-75)
  • code (79-84)
  • code (88-93)
  • code (97-99)
  • code (103-105)
  • parse_id_optional (114-119)
  • new (22-27)
  • parse_id (109-111)
  • parse_slice (145-156)
rs/libmoq/src/error.rs (1)
  • code (101-124)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Check
🔇 Additional comments (39)
rs/libmoq/src/session.rs (3)

8-12: LGTM!

The Session struct design is clean and appropriately encapsulates the task cancellation channels using a NonZeroSlab.


39-53: Verify callback timing semantics.

The callback is invoked with Ok(()) on line 49 after connection establishment, but before session.closed().await (line 51). If the session fails during closed(), the callback has already signaled success. Ensure this matches the intended contract where the callback signals "connection established" rather than "session completed successfully."


55-58: LGTM!

The close method follows the standard pattern used across the codebase (similar to consume.rs and publish.rs).

rs/libmoq/CMakeLists.txt (3)

47-57: LGTM!

The interface library approach correctly wraps the Rust static library and properly handles platform-specific linkage for macOS frameworks.


66-73: LGTM!

The installation configuration properly uses CMake standard directories and follows best practices.


75-95: LGTM!

The CMake package configuration is properly set up with PATH_VARS for relocatability and appropriate version compatibility.

rs/libmoq/src/origin.rs (6)

8-24: Acknowledge the TODO for future refactoring.

The Origin struct appropriately centralizes origin-related state. The TODO on line 13 notes the intention to split this into separate structs/mutexes in the future, which would improve concurrency.


27-33: LGTM!

The create and get methods follow the standard pattern used across the codebase.


35-50: LGTM!

The announced method properly spawns an async task with cancellation support via oneshot channels.


52-59: LGTM!

The async loop correctly avoids holding the mutex during the callback by completing the State::enter call before invoking on_announce.


76-90: LGTM!

The consume and publish methods properly delegate to the underlying OriginProducer with appropriate error handling.


92-95: LGTM!

The close method follows the standard pattern for resource cleanup.

rs/libmoq/src/lib.rs (1)

1-34: LGTM!

The module reorganization properly separates the public FFI API surface (api module) from internal implementation details while maintaining clear documentation of the library's structure.

rs/libmoq/src/state.rs (4)

8-14: Public fields in State struct.

The State struct exposes all fields as public, which is unusual. Since access is gated through the State::enter method that holds a mutex, concurrent access is still protected. This design trades encapsulation for simplicity in the enter closures.


17-38: LGTM!

The runtime initialization properly creates a dedicated thread for async operations, which is appropriate for an FFI library where the caller may not have an async runtime.


46-52: LGTM!

The run method properly sets the runtime context and handles panics defensively.


55-56: LGTM!

The global STATE singleton using LazyLock is thread-safe and appropriate for the FFI boundary.

rs/libmoq/src/consume.rs (13)

10-18: LGTM!

The ConsumeCatalog struct appropriately stores codec information on the heap to support FFI pointer requirements.


20-39: LGTM!

The Consume struct is well-organized with clear separation of concerns for different resource types.


42-62: LGTM!

The start and catalog methods follow established patterns for resource management and async task spawning.


64-105: LGTM!

The run_catalog helper properly collects codec information and avoids holding the mutex during the callback (line 100 comment).


107-138: LGTM!

The catalog_video method correctly populates the FFI struct with raw pointers, using null for absent optional values.


140-163: LGTM!

The catalog_audio method mirrors the video method with consistent pointer handling.


165-168: LGTM!

Standard cleanup pattern.


170-199: LGTM!

The video_track method properly creates a track consumer with latency configuration and spawns an async task.


201-230: LGTM!

The audio_track method consistently mirrors the video_track pattern.


232-256: LGTM with TODO noted.

The run_track helper optimizes frame payload handling to avoid allocation when possible (lines 236-242). The TODO on line 234 acknowledges that a chunking API would be beneficial.


258-266: LGTM!

Both track close methods follow the standard cleanup pattern.


268-281: LGTM!

The frame_chunk method correctly supports multi-chunk frames as documented in the comment.


283-291: LGTM!

Both close methods follow standard cleanup patterns.

rs/libmoq/src/api.rs (9)

9-82: LGTM!

The FFI struct definitions properly use raw pointers with null semantics for optional values, addressing previous review concerns.


84-106: LGTM!

The moq_log_level function properly validates input and initializes logging with appropriate safety documentation.


108-150: LGTM!

The moq_session_connect function properly handles optional origin parameters and delegates to the Session type with good safety documentation.


152-275: LGTM!

The origin-related FFI functions consistently use the State::enter pattern with appropriate safety documentation.


277-355: LGTM!

The publish-related FFI functions properly delegate to the Publish subsystem with appropriate input validation.


428-463: LGTM aside from index casting.

The video track consumption functions properly handle latency configuration. The index casting issue on line 447 is addressed in a separate comment.


465-500: LGTM aside from index casting.

The audio track consumption functions mirror the video track pattern. The index casting issue on line 484 is addressed in a separate comment.


502-530: LGTM aside from index casting.

The frame functions properly support chunked frame access. The index casting issue on line 515 is addressed in a separate comment.


532-541: LGTM!

The moq_consume_close function follows the standard cleanup pattern.

Comment on lines +61 to +69
pub fn announced_info(&self, announced: Id, dst: &mut moq_announced) -> Result<(), Error> {
let announced = self.announced.get(announced).ok_or(Error::NotFound)?;
*dst = moq_announced {
path: announced.0.as_str().as_ptr() as *const c_char,
path_len: announced.0.len(),
active: announced.1,
};
Ok(())
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Explore the moq repository structure and examine the relevant files
# First, find the file
fd -t f "origin.rs" rs/libmoq/src/ 2>/dev/null | head -20

# Count lines in origin.rs
if [ -f "rs/libmoq/src/origin.rs" ]; then
  wc -l rs/libmoq/src/origin.rs
fi

# List files in rs/libmoq/src/
ls -la rs/libmoq/src/ 2>/dev/null | head -30

Repository: moq-dev/moq

Length of output: 807


🏁 Script executed:

#!/bin/bash
# Read the full origin.rs file since it's only 96 lines
echo "=== CONTENT OF origin.rs ==="
cat -n rs/libmoq/src/origin.rs

echo -e "\n=== CHECKING FOR announced_close ==="
rg -A 5 "announced_close" rs/libmoq/src/origin.rs

echo -e "\n=== CHECKING FOR inline tests #[cfg(test)] ==="
rg "#\[cfg\(test\)\]" rs/libmoq/src/origin.rs

Repository: moq-dev/moq

Length of output: 4093


🏁 Script executed:

#!/bin/bash
# Search for moq_origin_announced_info in api.rs
echo "=== Searching for moq_origin_announced_info in api.rs ==="
rg -B 5 -A 15 "moq_origin_announced_info" rs/libmoq/src/api.rs

# Also search for announced_close FFI wrapper
echo -e "\n=== Searching for announced_close FFI wrapper ==="
rg -B 2 -A 10 "announced_close" rs/libmoq/src/api.rs

Repository: moq-dev/moq

Length of output: 3044


🏁 Script executed:

#!/bin/bash
# Double-check what fields are removed in announced_close
echo "=== Checking all remove/cleanup operations on announced entry ==="
rg "announced\.remove\|announced_task\.remove" rs/libmoq/src/origin.rs

# Check if announced entries are ever cleaned up elsewhere
echo -e "\n=== Check if announced slab is ever modified elsewhere ==="
rg "announced" rs/libmoq/src/origin.rs | grep -E "(remove|clear|delete)" 

# Verify the structure definitions
echo -e "\n=== Check NonZeroSlab and what happens on task removal ==="
rg -A 3 "struct Origin" rs/libmoq/src/origin.rs

# Check api.rs for complete safety documentation pattern
echo -e "\n=== Check complete FFI safety docs ==="
rg -B 3 "# Safety" rs/libmoq/src/api.rs | grep -A 3 "announced_info\|announced_close"

Repository: moq-dev/moq

Length of output: 484


🏁 Script executed:

#!/bin/bash
# Final verification: check how announced IDs are generated and used
echo "=== Tracing announced ID flow ==="
cat -n rs/libmoq/src/origin.rs | grep -E "(announced\.insert|announced_task\.insert|announced_close|announced_info)"

echo -e "\n=== Check the moq_announced struct definition ==="
rg -B 2 -A 6 "struct moq_announced" rs/libmoq/src/

echo -e "\n=== Verify announced entry persistence ==="
echo "Looking for any cleanup of announced slab entries..."
rg "announced" rs/libmoq/src/ | grep -E "clear|remove|drop" | grep -v "announced_task" | grep -v "announced_close"

Repository: moq-dev/moq

Length of output: 1080


🏁 Script executed:

#!/bin/bash
# Check if there's any validation preventing announced_info calls after announced_close
echo "=== Check moq_origin_announced_info for ID validation ==="
rg -B 2 -A 10 "fn moq_origin_announced_info" rs/libmoq/src/api.rs

# Check if the announced entry lifetime is documented anywhere
echo -e "\n=== Check if announced entry lifetime is documented ==="
rg -B 5 -A 5 "announced_info\|announced_close" rs/libmoq/src/api.rs | head -80

# Verify there are NO tests anywhere in origin.rs
echo -e "\n=== Final check: any #[test] or #[cfg(test)] ==="
grep -i "test\|#\[" rs/libmoq/src/origin.rs

Repository: moq-dev/moq

Length of output: 761


Add inline test module to origin.rs.

The announced_info method returns raw pointers to internal String data, but the FFI contract for pointer validity is undocumented. The raw pointers remain valid as long as the announced entry exists in the slab (which persists even after announced_close), but C callers have no way to know this. Add a #[cfg(test)] mod tests module to document and verify the expected behavior of announced entries through their lifecycle, and update the FFI safety documentation to clarify that returned pointers remain valid after task closure.

🤖 Prompt for AI Agents
In rs/libmoq/src/origin.rs around lines 61 to 69, add a #[cfg(test)] mod tests
that documents and verifies the FFI lifetime behavior of announced_info: write
tests that create an announced entry, call announced_info to obtain the
moq_announced (capture path pointer and len), assert those pointers point to the
expected bytes while the entry exists, call announced_close (or the API that
closes the task) and then assert the pointer remains valid and still references
the same data (because the slab persists), and assert cleanup only when the slab
entry is removed; also update the announced_info / module FFI safety doc comment
to explicitly state that returned path pointers reference internal String
storage that remains valid after task closure while the announced slab entry
persists and must not be freed by C callers.

Comment on lines +71 to +74
pub fn announced_close(&mut self, announced: Id) -> Result<(), Error> {
self.announced_task.remove(announced).ok_or(Error::NotFound)?;
Ok(())
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Read the origin.rs file to understand the structure and search for cleanup patterns
cat rs/libmoq/src/origin.rs | head -150

Repository: moq-dev/moq

Length of output: 3038


🏁 Script executed:

#!/bin/bash
# Search for announced.remove usage in the codebase
rg "announced\.remove" rs/libmoq/src/ -A 2 -B 2

Repository: moq-dev/moq

Length of output: 37


🏁 Script executed:

#!/bin/bash
# Search for cleanup patterns related to announced slab
rg "announced" rs/libmoq/src/origin.rs -n

Repository: moq-dev/moq

Length of output: 1197


Remove announced slab entries when closing announcements.

The announced_close method only removes the task cancellation channel from announced_task but does not remove the corresponding entries from the announced slab. Since run_announced inserts a new entry into the announced slab for each received announcement, these entries accumulate indefinitely with no cleanup path, causing unbounded memory growth. Call self.announced.remove(announced) in addition to removing from announced_task.

🤖 Prompt for AI Agents
In rs/libmoq/src/origin.rs around lines 71 to 74, announced_close currently only
removes the cancellation handle from announced_task and never cleans up the
corresponding entry in the announced slab; update the function to also call
self.announced.remove(announced) after removing from announced_task so the slab
entry is freed, and ensure you propagate or return Error::NotFound if either
removal indicates the id was missing (or otherwise handle the remove() return
value appropriately).

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

♻️ Duplicate comments (1)
rs/libmoq/src/api.rs (1)

400-407: Critical: Negative index values still not validated (unaddressed past review comment).

Line 403 casts i32 directly to usize without checking for negative values. This was flagged in a previous review but remains unaddressed. Negative values will wrap to very large usize values, potentially causing out-of-bounds access or incorrect behavior.

Add validation before casting:

 	State::enter(move |state| {
 		let catalog = ffi::parse_id(catalog)?;
+		if index < 0 {
+			return Err(Error::InvalidId);
+		}
 		let index = index as usize;
 		let dst = dst.as_mut().ok_or(Error::InvalidPointer)?;
 		state.consume.video_config(catalog, index, dst)
 	})

This same issue exists in:

  • moq_consume_audio_config (line 422)
  • moq_consume_video_track (line 447)
  • moq_consume_audio_track (line 484)
  • moq_consume_frame_chunk (line 515)

All of these require the same validation.

📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between aa3ef89 and 5f5a59a.

📒 Files selected for processing (2)
  • rs/libmoq/src/api.rs (1 hunks)
  • rs/libmoq/src/consume.rs (1 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.rs

📄 CodeRabbit inference engine (CLAUDE.md)

In Rust crates, tests should be integrated within source files using inline test modules

Files:

  • rs/libmoq/src/consume.rs
  • rs/libmoq/src/api.rs
🧠 Learnings (1)
📓 Common learnings
Learnt from: CR
Repo: moq-dev/moq PR: 0
File: CLAUDE.md:0-0
Timestamp: 2025-12-10T04:00:14.871Z
Learning: Core protocol implementation in the `moq` layer must be generic and not contain media-specific logic; CDN/relay does not know anything about media
🧬 Code graph analysis (2)
rs/libmoq/src/consume.rs (5)
rs/libmoq/src/state.rs (3)
  • std (27-27)
  • enter (42-44)
  • new (17-38)
rs/libmoq/src/origin.rs (2)
  • consume (76-79)
  • close (92-95)
rs/libmoq/src/ffi.rs (1)
  • new (22-27)
rs/libmoq/src/session.rs (1)
  • close (55-58)
rs/libmoq/src/publish.rs (1)
  • close (27-30)
rs/libmoq/src/api.rs (3)
rs/libmoq/src/state.rs (2)
  • std (27-27)
  • enter (42-44)
rs/libmoq/src/ffi.rs (14)
  • parse_str (133-137)
  • code (44-44)
  • code (48-50)
  • code (54-56)
  • code (60-66)
  • code (70-75)
  • code (79-84)
  • code (88-93)
  • code (97-99)
  • code (103-105)
  • parse_url (122-125)
  • parse_id_optional (114-119)
  • parse_id (109-111)
  • parse_slice (145-156)
rs/libmoq/src/error.rs (1)
  • code (101-124)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Check
🔇 Additional comments (4)
rs/libmoq/src/consume.rs (2)

107-138: Document lifetime constraints more explicitly.

The video_config and audio_config methods return raw pointers to internal heap-allocated strings (lines 117-118, 150-151) and catalog data. These pointers become invalid when catalog_close is called. While this is documented in the FFI layer (api.rs line 398), it's a common source of use-after-free bugs in C FFI.

Consider whether the API should copy string data to caller-provided buffers instead, or ensure the documentation at the FFI boundary is very prominent. You might also want to add runtime assertions in debug builds to detect use-after-free.

Also applies to: 140-163


232-256: Performance optimization opportunity documented in TODO.

The frame handling currently allocates a new BufList even when avoiding a full buffer copy (line 236-242). The TODO at line 234 correctly identifies this as a future optimization opportunity.

rs/libmoq/src/api.rs (2)

10-54: LGTM: FFI safety issues addressed.

The moq_video_config and moq_audio_config structs correctly use raw pointers instead of Option types for FFI safety, addressing the concerns from previous reviews. The null pointer semantics are clearly documented.


94-106: LGTM: FFI functions follow consistent patterns.

The FFI functions consistently use State::enter, proper error handling via ffi::return_code, input validation with ffi::parse_* helpers, and clear safety documentation. The API design is coherent and follows good practices for C interop.

Also applies to: 127-150, 158-163, 175-177, 281-283, 289-294, 306-320, 326-331, 343-355, 367-377, 383-388, 458-463, 495-500, 525-530, 536-541

Comment on lines 438 to 452
pub unsafe extern "C" fn moq_consume_video_track(
broadcast: i32,
index: i32,
max_buffer_ms: u64,
on_frame: Option<extern "C" fn(user_data: *mut c_void, frame: i32)>,
user_data: *mut c_void,
) -> i32 {
State::enter(move |state| {
let broadcast = ffi::parse_id(broadcast)?;
let index = index as usize;
let max_buffer = std::time::Duration::from_millis(max_buffer_ms);
let on_frame = ffi::OnStatus::new(user_data, on_frame);
state.consume.video_track(broadcast, index, max_buffer, on_frame)
})
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

Misleading parameter name: broadcast should be catalog.

The first parameter is named broadcast (line 439), but it's actually a catalog ID based on the implementation in consume.rs (line 170-176). This could confuse API users who might pass a broadcast ID instead of a catalog ID.

 pub unsafe extern "C" fn moq_consume_video_track(
-	broadcast: i32,
+	catalog: i32,
 	index: i32,
 	max_buffer_ms: u64,
 	on_frame: Option<extern "C" fn(user_data: *mut c_void, frame: i32)>,
 	user_data: *mut c_void,
 ) -> i32 {
 	State::enter(move |state| {
-		let broadcast = ffi::parse_id(broadcast)?;
+		let catalog = ffi::parse_id(catalog)?;
 		let index = index as usize;
 		let max_buffer = std::time::Duration::from_millis(max_buffer_ms);
 		let on_frame = ffi::OnStatus::new(user_data, on_frame);
-		state.consume.video_track(broadcast, index, max_buffer, on_frame)
+		state.consume.video_track(catalog, index, max_buffer, on_frame)
 	})
 }

The same issue exists in moq_consume_audio_track (lines 475-489).

Comment on lines 46 to 62
pub fn catalog(&mut self, broadcast: Id, mut on_catalog: OnStatus) -> Result<Id, Error> {
let broadcast = self.broadcast.get(broadcast).ok_or(Error::NotFound)?.clone();

let channel = oneshot::channel();

tokio::spawn(async move {
let res = tokio::select! {
res = Self::run_catalog(broadcast, &mut on_catalog) => res,
_ = channel.1 => Ok(()),
};
on_catalog.call(res);
});

let id = self.catalog_task.insert(channel.0);

Ok(id)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

Add a method to close the catalog subscription task.

The catalog method spawns a task and returns its ID (line 59), but there's no corresponding method to cancel this subscription and clean up the task from catalog_task. Users can close individual catalogs via catalog_close, but the subscription task continues running and producing new catalog IDs.

Add a method like catalog_subscription_close:

+	pub fn catalog_subscription_close(&mut self, task: Id) -> Result<(), Error> {
+		self.catalog_task.remove(task).ok_or(Error::NotFound)?;
+		Ok(())
+	}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
pub fn catalog(&mut self, broadcast: Id, mut on_catalog: OnStatus) -> Result<Id, Error> {
let broadcast = self.broadcast.get(broadcast).ok_or(Error::NotFound)?.clone();
let channel = oneshot::channel();
tokio::spawn(async move {
let res = tokio::select! {
res = Self::run_catalog(broadcast, &mut on_catalog) => res,
_ = channel.1 => Ok(()),
};
on_catalog.call(res);
});
let id = self.catalog_task.insert(channel.0);
Ok(id)
}
pub fn catalog(&mut self, broadcast: Id, mut on_catalog: OnStatus) -> Result<Id, Error> {
let broadcast = self.broadcast.get(broadcast).ok_or(Error::NotFound)?.clone();
let channel = oneshot::channel();
tokio::spawn(async move {
let res = tokio::select! {
res = Self::run_catalog(broadcast, &mut on_catalog) => res,
_ = channel.1 => Ok(()),
};
on_catalog.call(res);
});
let id = self.catalog_task.insert(channel.0);
Ok(id)
}
pub fn catalog_subscription_close(&mut self, task: Id) -> Result<(), Error> {
self.catalog_task.remove(task).ok_or(Error::NotFound)?;
Ok(())
}
🤖 Prompt for AI Agents
In rs/libmoq/src/consume.rs around lines 46 to 62, the spawned catalog
subscription task stores a oneshot::Sender in catalog_task but there is no
method to cancel that sender and remove it from catalog_task; add a new method
(e.g., pub fn catalog_subscription_close(&mut self, id: Id) -> Result<(),
Error>) that looks up and removes the sender from self.catalog_task (returning
Error::NotFound if absent) and then drops it (letting the receiver branch in the
spawned task complete), ensuring the task is signaled to stop and the entry is
cleaned up from the catalog_task map.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
rs/libmoq/Cargo.toml (1)

23-31: Fix broken workspace dependency references or specify explicit versions.

Lines 23-31 contain mixed dependency specifications: some use { workspace = true } (hang, moq-lite, moq-native, tokio) while others specify explicit versions. However, no workspace root Cargo.toml exists at rs/Cargo.toml. The workspace = true references will fail to resolve. Either create a workspace configuration in rs/Cargo.toml with [workspace.dependencies] definitions for all shared dependencies, or replace all { workspace = true } references with explicit version specifications.

♻️ Duplicate comments (4)
rs/libmoq/src/consume.rs (2)

46-62: Resource leak: Task slab entries not cleaned up on natural completion.

When the spawned task completes naturally (not via cancellation), the catalog_task slab entry is never removed. Additionally, there's no method to cancel the catalog subscription task.

This was flagged in a previous review. Consider:

  1. Adding a catalog_task_close method to allow explicit cancellation
  2. Having tasks clean up their slab entries on completion via State::enter

170-230: Same resource leak pattern as catalog.

The video_task and audio_task slab entries are not cleaned up when tasks complete naturally. This is the same issue flagged for the catalog function.

rs/libmoq/src/api.rs (2)

401-408: Negative index values silently wrap to large usize.

Casting i32 to usize when index is negative produces an unexpectedly large value, likely causing NoIndex errors but with confusing semantics.

Consider validating before conversion:

 	State::enter(move |state| {
 		let catalog = ffi::parse_id(catalog)?;
+		if index < 0 {
+			return Err(Error::NoIndex);
+		}
 		let index = index as usize;
 		let dst = dst.as_mut().ok_or(Error::InvalidPointer)?;
 		state.consume.video_config(catalog, index, dst)
 	})

This same issue exists in moq_consume_audio_config (line 423), moq_consume_video_ordered (line 448), moq_consume_audio_ordered (line 485), and moq_consume_frame_chunk (line 516).


439-453: Misleading parameter name: broadcast should be catalog.

The first parameter is named broadcast, but based on consume.rs line 170-177, this is actually a catalog ID. This could confuse API users.

Suggested fix:
 pub unsafe extern "C" fn moq_consume_video_ordered(
-	broadcast: i32,
+	catalog: i32,
 	index: i32,
 	max_latency_ms: u64,
 	on_frame: Option<extern "C" fn(user_data: *mut c_void, frame: i32)>,
 	user_data: *mut c_void,
 ) -> i32 {
 	State::enter(move |state| {
-		let broadcast = ffi::parse_id(broadcast)?;
+		let catalog = ffi::parse_id(catalog)?;
 		let index = index as usize;
 		let max_latency = std::time::Duration::from_millis(max_latency_ms);
 		let on_frame = ffi::OnStatus::new(user_data, on_frame);
-		state.consume.video_ordered(broadcast, index, max_latency, on_frame)
+		state.consume.video_ordered(catalog, index, max_latency, on_frame)
 	})
 }

The same issue exists in moq_consume_audio_ordered (lines 476-490).

🧹 Nitpick comments (1)
rs/libmoq/src/consume.rs (1)

232-256: LGTM!

Good optimization to avoid allocation for single-chunk frames. The TODO comment about a chunking API is noted.

Would you like me to open an issue to track the chunking API improvement mentioned in the TODO comment?

📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5f5a59a and c43f5ef.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (4)
  • rs/libmoq/Cargo.toml (1 hunks)
  • rs/libmoq/src/api.rs (1 hunks)
  • rs/libmoq/src/consume.rs (1 hunks)
  • rs/libmoq/src/publish.rs (1 hunks)
🧰 Additional context used
📓 Path-based instructions (2)
rs/**/Cargo.toml

📄 CodeRabbit inference engine (CLAUDE.md)

For Rust development, use the workspace configuration in rs/Cargo.toml

Files:

  • rs/libmoq/Cargo.toml
**/*.rs

📄 CodeRabbit inference engine (CLAUDE.md)

In Rust crates, tests should be integrated within source files using inline test modules

Files:

  • rs/libmoq/src/publish.rs
  • rs/libmoq/src/consume.rs
  • rs/libmoq/src/api.rs
🧬 Code graph analysis (3)
rs/libmoq/src/publish.rs (2)
rs/libmoq/src/consume.rs (1)
  • close (288-291)
rs/libmoq/src/session.rs (1)
  • close (55-58)
rs/libmoq/src/consume.rs (5)
rs/libmoq/src/state.rs (3)
  • std (27-27)
  • enter (42-44)
  • new (17-38)
rs/libmoq/src/origin.rs (2)
  • consume (76-79)
  • close (92-95)
rs/libmoq/src/ffi.rs (1)
  • new (22-27)
rs/libmoq/src/publish.rs (1)
  • close (27-30)
rs/libmoq/src/session.rs (1)
  • close (55-58)
rs/libmoq/src/api.rs (3)
rs/libmoq/src/state.rs (3)
  • std (27-27)
  • enter (42-44)
  • new (17-38)
rs/libmoq/src/ffi.rs (14)
  • parse_str (133-137)
  • code (44-44)
  • code (48-50)
  • code (54-56)
  • code (60-66)
  • code (70-75)
  • code (79-84)
  • code (88-93)
  • code (97-99)
  • code (103-105)
  • parse_url (122-125)
  • new (22-27)
  • parse_id (109-111)
  • parse_slice (145-156)
rs/libmoq/src/error.rs (1)
  • code (101-124)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Check
🔇 Additional comments (19)
rs/libmoq/src/publish.rs (5)

7-14: LGTM! Clean resource management design.

The struct cleanly separates broadcast producers from media decoders, following the same NonZeroSlab-based pattern used elsewhere in the codebase.


17-30: LGTM!

The create, get, and close methods follow the established pattern from other managers in the codebase (session.rs, consume.rs).


32-48: LGTM!

Buffer consumption validation using has_remaining() is correctly implemented. The error wrapping with Arc appropriately handles the non-Clone nature of anyhow::Error.


50-64: LGTM!

The frame decoding logic correctly validates buffer consumption and wraps errors consistently with media_ordered.


66-69: LGTM!

Consistent close pattern.

rs/libmoq/src/consume.rs (5)

10-18: LGTM!

The ConsumeCatalog struct appropriately stores heap-allocated codec strings to ensure stable pointers for FFI consumption.


20-39: LGTM!

Clear separation of broadcast consumers, catalog state, task cancellation channels, and buffered frames.


64-105: LGTM!

The async catalog processing correctly:

  • Extracts codec information to heap-allocated strings
  • Uses State::enter to safely insert catalog entries
  • Avoids holding the mutex during callbacks (per line 100 comment)

107-163: LGTM!

Both config functions correctly extract rendition information and fill FFI-safe structs. The pointer validity is tied to catalog lifetime, which is documented in the API safety notes.


258-291: LGTM!

All close and chunk access functions follow consistent patterns and properly handle missing resources.

rs/libmoq/src/api.rs (8)

9-31: LGTM! FFI-safe struct design.

The moq_video_config struct now uses raw pointers (*const u8, *const u32) instead of Option types, addressing the previous FFI compatibility concerns.


33-54: LGTM!

FFI-safe struct with raw pointer for optional description field.


56-82: LGTM!

Both structs are properly repr(C) for FFI compatibility.


84-106: LGTM!

Clean log level initialization with sensible default behavior for empty strings.


108-163: LGTM!

Session management functions properly handle optional origin handles and include appropriate safety documentation.


165-275: LGTM!

Origin management functions are consistent and well-documented.


277-356: LGTM!

Publish functions properly validate input and propagate errors through the FFI boundary.


503-542: LGTM!

Frame access and cleanup functions follow consistent patterns. The negative index issue at line 516 was already noted above.

rs/libmoq/Cargo.toml (1)

11-11: Version bump to 0.2.0 is appropriate for the API changes.

The minor version bump from 0.1.2 to 0.2.0 correctly signals breaking API changes (type renaming, module reorganization, public FFI surface changes) described in the PR summary. For pre-1.0 crates, this follows semantic versioning conventions.

@kixelated kixelated enabled auto-merge (squash) December 18, 2025 04:44
@kixelated kixelated disabled auto-merge December 18, 2025 04:48
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ad3031f and 453db4e.

⛔ Files ignored due to path filters (1)
  • deno.lock is excluded by !**/*.lock
📒 Files selected for processing (1)
  • rs/libmoq/src/ffi.rs (2 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.rs

📄 CodeRabbit inference engine (CLAUDE.md)

In Rust crates, tests should be integrated within source files using inline test modules

Files:

  • rs/libmoq/src/ffi.rs
🧬 Code graph analysis (1)
rs/libmoq/src/ffi.rs (2)
rs/libmoq/src/error.rs (1)
  • code (101-124)
rs/libmoq/src/id.rs (3)
  • try_from (51-53)
  • try_from (59-61)
  • try_from (73-75)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Check
🔇 Additional comments (3)
rs/libmoq/src/ffi.rs (3)

73-138: LGTM! Well-designed trait with comprehensive error handling.

The ReturnCode trait provides a clean abstraction for converting Rust types to C-compatible status codes. Key strengths:

  • Negative code validation (line 94) prevents conflicts with error codes
  • Graceful fallback to InvalidCode on conversion failures
  • Comprehensive coverage of return types needed for the FFI surface
  • Consistent error handling pattern across all implementations

140-188: Parsing utilities are well-implemented with proper safety.

The parsing functions correctly handle FFI concerns:

  • Proper unsafe block usage in parse_str (line 155) and parse_slice (line 186)
  • Good null pointer validation in parse_slice (lines 178-184)
  • UTF-8 validation in parse_str (line 167) prevents invalid string access
  • Lifetime annotations correctly tie parsed references to input validity

The convention of treating 0 as None in parse_id_optional (line 148) is reasonable for C FFI, though consider documenting this convention in the C API headers to make it explicit for callers.


29-37: Remove the TODO comment—the current pattern is correct for FFI bridges.

The code correctly implements a safe pattern for bridging synchronous C code to an asynchronous Tokio runtime. The concerns in the TODO are based on misunderstandings:

  1. RUNTIME is a LazyLock<Handle>, not the Runtime itself. The actual tokio::runtime::Runtime runs on a dedicated background thread executing block_on(pending()), so it never gets dropped during the program's lifetime.

  2. LazyLock initialization is thread-safe. LazyLock is thread-safe and can be used in statics; since initialization may be called from multiple threads, any dereferencing call will block if another initialization routine is currently running.

  3. Multiple threads calling RUNTIME.enter() is safe and by design. The runtime context is entered using thread-local variables to store the current runtime, and whenever inside the runtime context, methods like tokio::spawn will use the runtime whose context you are inside. This pattern is standard for FFI bridges and handles are specifically designed for sharing across threads.

  4. AssertUnwindSafe is appropriate here. At FFI boundaries, this is an acceptable way to prevent panics from crossing the FFI boundary into C code, assuming the closure doesn't hold unwind-unsafe state.

The code as written is correct. Remove the TODO and, if desired, add a brief comment explaining this is the standard pattern for synchronous FFI wrappers around Tokio async code.

Likely an incorrect or invalid review comment.

Comment on lines 10 to 25
pub static RUNTIME: LazyLock<tokio::runtime::Handle> = LazyLock::new(|| {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
let handle = runtime.handle().clone();

std::thread::Builder::new()
.name("libmoq".into())
.spawn(move || {
runtime.block_on(std::future::pending::<()>());
})
.expect("failed to spawn runtime thread");

handle
});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Handle initialization failures gracefully.

The .unwrap() on line 14 and .expect() on line 22 will panic if runtime creation or thread spawning fails. For a library consumed via C FFI, initialization panics can be difficult to debug and handle from the caller's perspective.

Consider returning a Result from a separate initialization function or using a more graceful fallback strategy.

Potential alternative approach

You could expose an explicit initialization function that returns an error code, allowing C callers to handle initialization failures:

// Example pattern (requires API changes):
static RUNTIME: OnceLock<tokio::runtime::Handle> = OnceLock::new();

#[no_mangle]
pub extern "C" fn libmoq_init() -> i32 {
    enter(|| {
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()?;
        let handle = runtime.handle().clone();
        
        std::thread::Builder::new()
            .name("libmoq".into())
            .spawn(move || {
                runtime.block_on(std::future::pending::<()>());
            })?;
        
        RUNTIME.set(handle).map_err(|_| Error::InitFailed("already initialized"))?;
        Ok(())
    })
}

Err(_) => Error::Panic.code(),
}
}
unsafe impl Send for OnStatus {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

Document the safety invariants for the Send implementation.

The unsafe impl Send for OnStatus allows OnStatus to be sent across threads, but raw pointers (*mut c_void) are not inherently Send. The safety of this implementation depends on:

  1. The C callback function pointer being thread-safe (typically true for C functions)
  2. The user_data pointer either being thread-safe or only accessed from the thread where the callback is invoked

Please add a safety comment above the Send impl explaining these invariants, similar to the pattern in the new constructor.

Suggested safety documentation
+// SAFETY: 
+// - C function pointers are thread-safe (they're just addresses)
+// - user_data is only dereferenced when calling the callback, and it's
+//   the caller's responsibility (via the `new` safety contract) to ensure
+//   user_data remains valid and can be safely accessed from the callback thread
 unsafe impl Send for OnStatus {}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
unsafe impl Send for OnStatus {}
// SAFETY:
// - C function pointers are thread-safe (they're just addresses)
// - user_data is only dereferenced when calling the callback, and it's
// the caller's responsibility (via the `new` safety contract) to ensure
// user_data remains valid and can be safely accessed from the callback thread
unsafe impl Send for OnStatus {}

And it makes you want to check for <0
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (2)
rs/libmoq/src/ffi.rs (2)

10-25: Initialization panics and threading concerns already flagged.

The use of .unwrap() and .expect() for runtime/thread creation, and the current_thread runtime's thread-safety limitations, have been identified in previous reviews. These remain valid concerns for a C FFI library where panics are difficult to handle gracefully.


71-71: Safety documentation for Send impl still needed.

As noted in a previous review, this unsafe impl Send should have a safety comment explaining why it's sound (C function pointers are thread-safe, and caller ensures user_data validity per the new contract).

🧹 Nitpick comments (2)
rs/libmoq/src/ffi.rs (1)

177-188: Redundant unsafe block inside unsafe function.

Line 186 has an unsafe block, but the function is already unsafe fn. While not incorrect, this adds visual noise.

🔎 Suggested fix
 pub unsafe fn parse_slice<'a>(data: *const u8, size: usize) -> Result<&'a [u8], Error> {
 	if data.is_null() {
 		if size == 0 {
 			return Ok(&[]);
 		}
 
 		return Err(Error::InvalidPointer);
 	}
 
-	let data = unsafe { std::slice::from_raw_parts(data, size) };
+	let data = std::slice::from_raw_parts(data, size);
 	Ok(data)
 }
rs/libmoq/src/api.rs (1)

1-549: Consider adding inline tests for non-FFI helper logic.

Per coding guidelines, Rust crates should include inline test modules. While testing extern "C" functions directly is challenging, any internal helper functions or parsing logic could benefit from unit tests. This could be deferred given the FFI-focused nature of this module.

📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 453db4e and cd6cdb6.

📒 Files selected for processing (2)
  • rs/libmoq/src/api.rs (1 hunks)
  • rs/libmoq/src/ffi.rs (2 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.rs

📄 CodeRabbit inference engine (CLAUDE.md)

In Rust crates, tests should be integrated within source files using inline test modules

Files:

  • rs/libmoq/src/ffi.rs
  • rs/libmoq/src/api.rs
🧬 Code graph analysis (2)
rs/libmoq/src/ffi.rs (2)
rs/libmoq/src/error.rs (1)
  • code (101-124)
rs/libmoq/src/id.rs (3)
  • try_from (51-53)
  • try_from (59-61)
  • try_from (73-75)
rs/libmoq/src/api.rs (3)
rs/libmoq/src/ffi.rs (16)
  • std (20-20)
  • enter (29-37)
  • parse_str (165-169)
  • code (76-76)
  • code (80-82)
  • code (86-88)
  • code (92-98)
  • code (102-107)
  • code (111-116)
  • code (120-125)
  • code (129-131)
  • code (135-137)
  • parse_url (154-157)
  • parse_id_optional (146-151)
  • parse_id (141-143)
  • parse_slice (177-188)
rs/libmoq/src/error.rs (1)
  • code (101-124)
rs/libmoq/src/state.rs (1)
  • lock (22-24)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Check
🔇 Additional comments (11)
rs/libmoq/src/ffi.rs (4)

27-37: Panic handling approach looks reasonable.

The use of catch_unwind with AssertUnwindSafe to convert panics into error codes is appropriate for FFI boundaries. The TODO comment acknowledges the multi-threaded context concern raised in previous reviews.


39-69: Callback wrapper design is sound.

The OnStatus wrapper correctly documents safety invariants for the new constructor, and using &mut self in call to avoid requiring Sync is a good design choice.


73-138: ReturnCode implementations are well-designed.

The trait provides clean separation between success (non-negative) and error (negative) codes. The guard against negative Ok(code) values in Result<i32, Error>::code() (line 94) is a good defensive measure to maintain the convention.


140-151: ID parsing helpers are correct.

Using u32 for handles and treating 0 as "no ID" is a clean convention. The delegation to Id::try_from properly returns Error::InvalidId for invalid values.

rs/libmoq/src/api.rs (7)

9-82: FFI-safe struct definitions look correct.

The repr(C) structs properly use raw pointers with separate length fields instead of Option types (addressing previous review feedback). The pointer+length pattern is appropriate for C interop.


84-106: Log level initialization is well-structured.

The function correctly uses ffi::enter for error handling, supports an empty string for defaults, and properly propagates parsing errors via ?.


108-151: Session connection handling is well-implemented.

The function correctly acquires a single lock and performs all operations atomically. Using parse_id_optional with 0 to disable publish/consume functionality is a clean API design.


280-359: Publish functions are correctly implemented.

All functions use single lock acquisition and proper error propagation. The timestamp conversion from microseconds using hang::Timestamp::from_micros correctly propagates errors via ?.


361-430: Catalog functions properly validate inputs.

Using u32 for index parameters (addressing previous feedback about negative values) and validating destination pointers with .as_mut().ok_or(Error::InvalidPointer) are correct patterns.


432-508: Video and audio consumption functions are well-structured.

The functions follow consistent patterns with proper ID parsing, duration conversion, and callback setup. Using u32 for indices eliminates the previous concern about negative value wrapping.


510-549: Frame handling and cleanup functions are correct.

These functions follow the established patterns consistently, with proper ID parsing and destination pointer validation.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (2)
rs/libmoq/src/ffi.rs (2)

10-25: Initialization panics remain unaddressed.

The .unwrap() on line 14 and .expect() on line 22 will still panic if runtime creation or thread spawning fails. For a library consumed via C FFI, these panics occur when LazyLock initializes on the first FFI call, making them difficult to debug from C code.

The previous review suggested exposing an explicit initialization function that returns an error code, allowing C callers to handle failures gracefully.


76-76: Add safety documentation for the Send implementation.

The unsafe impl Send for OnStatus still lacks safety documentation explaining why it's safe to send raw pointers across threads. The previous review provided a template that should be added.

🔎 Suggested safety documentation
+// SAFETY: 
+// - C function pointers are thread-safe (they're just addresses)
+// - user_data is only dereferenced when calling the callback, and it's
+//   the caller's responsibility (via the `new` safety contract) to ensure
+//   user_data remains valid and can be safely accessed from the callback thread
 unsafe impl Send for OnStatus {}
🧹 Nitpick comments (3)
rs/libmoq/src/ffi.rs (3)

27-42: Document the concurrency model and performance implications for C callers.

The mutex-based serialization ensures correctness but serializes all FFI calls. This is acknowledged in the comment on lines 33-34, but the concurrency model should be documented for C API users.

Consider adding documentation (e.g., in a header file or README) stating:

  • All FFI calls are serialized and thread-safe
  • Concurrent FFI calls from multiple threads will block on the mutex
  • If this becomes a bottleneck, callers should consider batching operations or calling from a single thread

145-156: Correct the doc comment inconsistency.

Lines 145 and 150 mention "i32 handle" in the doc comments, but both functions accept u32 parameters. The comments should be updated to match the actual parameter type.

🔎 Suggested fix
-/// Parse an i32 handle into an Id.
+/// Parse a u32 handle into an Id.
 pub fn parse_id(id: u32) -> Result<Id, Error> {
 	Id::try_from(id)
 }
 
-/// Parse an optional i32 handle (0 = None) into an Option<Id>.
+/// Parse an optional u32 handle (0 = None) into an Option<Id>.
 pub fn parse_id_optional(id: u32) -> Result<Option<Id>, Error> {
 	match id {
 		0 => Ok(None),
 		id => Ok(Some(parse_id(id)?)),
 	}
 }

164-174: Clarify the empty string documentation.

Line 166 states "Returns an empty string if the pointer is null," but this is only accurate when both the pointer is null AND cstr_len is 0. If the pointer is null but cstr_len is non-zero, the function returns Err(Error::InvalidPointer).

🔎 Suggested documentation clarification
 /// Parse a C string pointer into a &str.
 ///
-/// Returns an empty string if the pointer is null.
+/// Returns an empty string if the pointer is null and length is zero.
 ///
 /// # Safety
 /// The caller must ensure that cstr is valid for 'a.
 pub unsafe fn parse_str<'a>(cstr: *const c_char, cstr_len: usize) -> Result<&'a str, Error> {
📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between cd6cdb6 and e51ff8f.

📒 Files selected for processing (1)
  • rs/libmoq/src/ffi.rs (2 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.rs

📄 CodeRabbit inference engine (CLAUDE.md)

In Rust crates, tests should be integrated within source files using inline test modules

Files:

  • rs/libmoq/src/ffi.rs
🧬 Code graph analysis (1)
rs/libmoq/src/ffi.rs (2)
rs/libmoq/src/error.rs (1)
  • code (101-124)
rs/libmoq/src/id.rs (3)
  • try_from (51-53)
  • try_from (59-61)
  • try_from (73-75)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Check
🔇 Additional comments (3)
rs/libmoq/src/ffi.rs (3)

44-74: LGTM!

The OnStatus wrapper is well-designed:

  • Safety documentation clearly explains the caller's responsibilities
  • Using &mut self for call() avoids the need for Sync while maintaining safety
  • The Option for the callback function pointer allows for optional callbacks

78-143: ReturnCode implementations look solid.

The trait provides a clean abstraction for converting Rust types to C-compatible return codes. The implementations are consistent and handle overflow cases appropriately with Error::InvalidCode.

Note: Line 99 validates that Ok(code) values are non-negative, which enforces the convention that positive/zero codes indicate success and negative codes indicate errors.


158-193: String and URL parsing helpers are well-implemented.

The parsing functions properly handle:

  • Null pointer edge cases with appropriate error handling
  • Length-based parsing (avoiding C string null-termination issues)
  • UTF-8 validation
  • Safety documentation for unsafe functions

The pattern of accepting explicit lengths rather than relying on null-termination improves safety at the FFI boundary.

Comment on lines +27 to +42
/// Runs the provided function in the runtime context.
/// Additionally, we convert the return code to a C-compatible return value.
///
/// Uses a mutex to ensure Handle::enter() guards are dropped in LIFO order,
/// as required by tokio to avoid panics in multi-threaded FFI contexts.
pub fn enter<C: ReturnCode, F: FnOnce() -> C>(f: F) -> i32 {
// NOTE: I think we need a mutex because Handle::enter() needs to be dropped in LIFO order.
// If this starts to become a bottleneck, we might have to rethink our runtime model.
let handle = RUNTIME.lock().unwrap();
let _guard = handle.enter();

match std::panic::catch_unwind(std::panic::AssertUnwindSafe(f)) {
Ok(ret) => ret.code(),
Err(_) => Error::Panic.code(),
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

Add inline tests for the FFI infrastructure.

Per the coding guidelines, Rust crates should include inline test modules. The enter function is critical FFI infrastructure that handles runtime context, panic recovery, and return code conversion—all of which should have test coverage.

🔎 Example test module
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_enter_success() {
        let result = enter(|| Ok::<(), Error>(()));
        assert_eq!(result, 0);
    }

    #[test]
    fn test_enter_panic_handling() {
        let result = enter(|| -> Result<(), Error> {
            panic!("test panic");
        });
        assert_eq!(result, Error::Panic.code());
    }

    #[test]
    fn test_enter_concurrent_calls() {
        use std::sync::Arc;
        use std::thread;

        let handles: Vec<_> = (0..10)
            .map(|_| {
                thread::spawn(|| {
                    enter(|| Ok::<(), Error>(()))
                })
            })
            .collect();

        for handle in handles {
            assert_eq!(handle.join().unwrap(), 0);
        }
    }
}
🤖 Prompt for AI Agents
In rs/libmoq/src/ffi.rs around lines 27 to 42, there are no inline tests for the
critical enter function; add a #[cfg(test)] mod tests that imports super::* and
contains at least three tests: (1) test_enter_success that calls enter(||
Ok::<(), Error>(())) and asserts it returns 0, (2) test_enter_panic_handling
that calls enter with a closure that panics and asserts it returns
Error::Panic.code(), and (3) test_enter_concurrent_calls which spawns multiple
threads (e.g. 10), each calling enter(|| Ok::<(), Error>(())) and asserts each
join returns 0 to validate the mutex/LIFO behavior; ensure necessary std imports
(thread, sync::Arc if used) are added only inside the test module.

@kixelated kixelated enabled auto-merge (squash) December 18, 2025 05:09
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (2)
rs/libmoq/src/api.rs (2)

79-81: Consider validating the active state toggle requirement.

The comment states that active MUST toggle between true and false over the lifetime of the broadcast. However, there's no enforcement mechanism for this requirement at the API level. This could lead to incorrect usage by C clients.

Consider either:

  1. Adding validation logic to detect and error on invalid state transitions
  2. Softening the language to document expected behavior without requiring strict enforcement
  3. Documenting the consequences of not following this pattern

1-551: Consider adding inline test coverage.

This file contains no inline test module. According to the coding guidelines, "In Rust crates, tests should be integrated within source files using inline test modules."

While FFI layers are sometimes tested via integration tests or from C, consider adding a #[cfg(test)] module to test:

  • Error handling paths (invalid IDs, null pointers)
  • Parsing logic edge cases
  • Return code mappings
  • State management behavior

This would improve maintainability and catch regressions early.

Based on coding guidelines, inline tests are expected in Rust source files.

📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e51ff8f and 2cdf531.

📒 Files selected for processing (1)
  • rs/libmoq/src/api.rs (1 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.rs

📄 CodeRabbit inference engine (CLAUDE.md)

In Rust crates, tests should be integrated within source files using inline test modules

Files:

  • rs/libmoq/src/api.rs
🧬 Code graph analysis (1)
rs/libmoq/src/api.rs (4)
rs/libmoq/src/ffi.rs (15)
  • enter (32-42)
  • parse_str (170-174)
  • code (81-81)
  • code (85-87)
  • code (91-93)
  • code (97-103)
  • code (107-112)
  • code (116-121)
  • code (125-130)
  • code (134-136)
  • code (140-142)
  • parse_url (159-162)
  • parse_id_optional (151-156)
  • parse_id (146-148)
  • parse_slice (182-193)
rs/libmoq/src/error.rs (1)
  • code (101-124)
rs/libmoq/src/origin.rs (3)
  • publish (84-93)
  • consume (79-82)
  • announced (35-50)
rs/libmoq/src/consume.rs (1)
  • catalog (46-63)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Check
🔇 Additional comments (2)
rs/libmoq/src/api.rs (2)

138-149: Variable naming may be confusing in publish/consume setup.

Lines 139-142 create a publish variable by calling .consume() on an OriginProducer, which is semantically confusing. The variable name publish suggests production, but the method call suggests consumption.

Verify that this logic is correct and consider renaming for clarity, or add a comment explaining why calling consume() on the producer is the intended pattern for the publish path.


265-267: Race condition from double lock acquisition.

The code acquires State::lock() at line 265 and again at line 267. The first lock is released before the second is acquired, allowing another thread to modify state between operations. This could invalidate the broadcast reference or cause inconsistent state.

This issue was previously flagged but remains unresolved.

🔎 Apply this fix to hold the lock for both operations:
 pub unsafe extern "C" fn moq_origin_consume(origin: u32, path: *const c_char, path_len: usize) -> i32 {
 	ffi::enter(move || {
 		let origin = ffi::parse_id(origin)?;
 		let path = unsafe { ffi::parse_str(path, path_len)? };
 
-		let mut state = State::lock();
-		let broadcast = state.origin.consume(origin, path)?;
-		Ok(state.consume.start(broadcast.into()))
+		let mut state = State::lock();
+		let broadcast = state.origin.consume(origin, path)?;
+		Ok(state.consume.start(broadcast.into()))
 	})
 }

Likely an incorrect or invalid review comment.

@kixelated kixelated merged commit 73e9986 into main Dec 18, 2025
1 check passed
@kixelated kixelated deleted the libmoq-consume branch December 18, 2025 05:27
@moq-bot moq-bot bot mentioned this pull request Dec 18, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants