-
Notifications
You must be signed in to change notification settings - Fork 136
Support WebSocket as a fallback #541
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Claude finished @kixelated's task —— View job Code Review Complete ✅I've thoroughly reviewed this PR and found one security issue that should be addressed: 🔒 Security Issue: Unsafe Memory OperationsIn async fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Result<Option<usize>, Self::Error> {
let dst = buf.chunk_mut();
let dst = unsafe { &mut *(dst as *mut _ as *mut [u8]) }; // ⚠️ UNSAFE
let size = match self.read(dst).await? {
Some(size) => size,
None => return Ok(None),
};
unsafe { buf.advance_mut(size) }; // ⚠️ UNSAFE
Ok(Some(size))
}Problem: The unsafe pointer cast Recommendation: Use safe alternatives like ✅ Positive FindingsThe rest of the code looks solid:
Overall AssessmentThis is a well-architected refactoring that successfully abstracts the transport layer. The unsafe memory operations should be addressed before merging to maintain memory safety guarantees. |
|
Caution Review failedThe pull request is closed. WalkthroughReplaces direct Changes
Sequence Diagram(s)sequenceDiagram
autonumber
actor Client
participant WebRelay as moq-relay (Axum)
participant Auth as Auth.verify(path, token)
participant Cluster as Cluster
participant Bridge as WS↔WebTransport Bridge
participant MQ as moq_lite
Client->>WebRelay: Upgrade /ws/{path}?jwt=...
WebRelay->>Auth: verify(path, token)
Auth-->>WebRelay: AuthToken | AuthError(401)
alt verified
WebRelay->>Cluster: publisher(token) / subscriber(token)
Cluster-->>WebRelay: OriginProducer / OriginConsumer
WebRelay->>Bridge: upgrade WS → WebTransport session
Bridge->>MQ: Session::accept(subscribe, publish)
MQ-->>Bridge: session running
Client-->>WebRelay: WS frames ↔ WebTransport frames
else failed
WebRelay-->>Client: 401 Unauthorized
end
sequenceDiagram
autonumber
actor App
participant Transport as web_transport_quinn | polyfill
participant Session as moq::Session<S>
participant Stream as moq::Stream<S>
App->>Transport: obtain session S (quinn or polyfill)
App->>Session: connect(session=S, publish?, subscribe?)
Session->>Transport: open_bi / accept_bi
Transport-->>Stream: send/recv streams
Stream-->>Session: Writer<S::Send>, Reader<S::Recv>
Session-->>App: Session<S>
note right of Session: Transport errors map to Error::Transport\nclose/finish → S.close(...)
Estimated code review effort🎯 5 (Critical) | ⏱️ ~120 minutes Possibly related PRs
Poem
📜 Recent review detailsConfiguration used: CodeRabbit UI Review profile: CHILL Plan: Pro 💡 Knowledge Base configuration:
You can enable these sources in your CodeRabbit configuration. ⛔ Files ignored due to path filters (2)
📒 Files selected for processing (24)
✨ Finishing Touches
🧪 Generate unit tests
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. CodeRabbit Commands (Invoked using PR/Issue comments)Type Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 5
🧹 Nitpick comments (21)
rs/moq-native/src/lib.rs (1)
11-11: Public re-export rename is fine; consider a temporary back-compat alias.The
web_transport_quinnalias is clear and consistent with the transport refactor. To ease downstream migration, consider also re-exporting the same module under the old name as a deprecated alias for one release.Apply this diff to add a transitional alias:
pub use moq_lite; pub use moq_lite::transport::quinn as web_transport_quinn; +// Deprecated: temporary back-compat alias. Prefer `web_transport_quinn`. +#[deprecated(note = "use `web_transport_quinn` instead")] +pub use moq_lite::transport::quinn as web_transport;rs/moq-native/src/server.rs (3)
107-111: Advertised ALPNs include moq_lite but accept path rejects it; align the two.You advertise both
web_transport_quinn::ALPNandmoq_lite::ALPN, but the accept path below rejects non-WebTransport sessions. Either handlemoq_lite::ALPNor stop advertising it for now to avoid confusing clients.Apply this diff if you want to stop advertising moq_lite temporarily:
tls.alpn_protocols = vec![ web_transport_quinn::ALPN.as_bytes().to_vec(), - moq_lite::ALPN.as_bytes().to_vec(), ];
189-196: Accept path only handles WebTransport; consider clearer failure or adding support later.Given the TODO, rejecting non-WebTransport is intentional. Consider logging at warn-level with ALPN to aid debugging, or reintroduce raw QUIC support when ready.
170-176: Avoid unwrap on downcast of handshake data.If a different crypto backend is used,
downcast::<quinn::crypto::rustls::HandshakeData>()could fail. Prefer returning a typed error instead ofunwrap().For example (outside the changed lines):
let handshake = conn .handshake_data() .await? .downcast::<quinn::crypto::rustls::HandshakeData>() .ok_or_else(|| anyhow::anyhow!("unexpected handshake backend"))?;rs/hang-cli/src/server.rs (1)
7-7: Prefer using moq_native’s re-export to avoid cross-crate type identity pitfalls.Using
moq_native::web_transport_quinnensures theRequesttype matches whatmoq_native::Server::accept()yields, even if cargo resolves different feature sets formoq_lite.Apply this diff:
-use moq_lite::transport::quinn; +use moq_native::web_transport_quinn as quinn;rs/moq/Cargo.toml (1)
32-32: Adopted workspace web-transport-quinn dep — consider feature-gating for future transports.If you plan to support multiple backends, making web-transport-quinn optional behind a feature (e.g., "transport-quinn") will decouple the generic transport layer from a specific impl and simplify future alternatives. This can be done by marking the dep optional and adding a default feature that includes it.
rs/moq/src/lib.rs (1)
24-25: New public transport module is a clean surface; consider a temporary compatibility re-export.If you want to ease migration for downstreams still using the old
web_transportpath, add a transitional re-export:You can add (outside the changed lines) near your public exports:
// Transitional alias for downstreams previously using `moq_lite::web_transport::*` pub use transport::quinn as web_transport;rs/moq-relay/src/connection.rs (1)
3-4: Alias name nit: prefer a shorter, unambiguous alias to avoid confusion with the crate name.
web_transport_quinnas an alias is a bit verbose and easy to conflate with the crate name. A shorter alias improves readability without changing behavior.Apply this diff:
- use moq_lite::transport::quinn as web_transport_quinn; - use web_transport_quinn::http; + use moq_lite::transport::quinn as wt; + use wt::http; @@ - pub request: web_transport_quinn::Request, + pub request: wt::Request,Also applies to: 8-8
rs/moq/src/transport/mod.rs (1)
1-6: Good module split and re-exports; consider documenting the alias and hiding impl details from docs.
- Add a brief doc comment clarifying that
quinnis the WebTransport-over-QUIC adapter.- Optionally hide
implsfrom docs to keep the public surface focused:#[doc(hidden)] mod impls;Example (outside the selected lines):
/// Transport abstractions and adapters. /// /// - `quinn`: WebTransport-over-QUIC adapter re-exported from the `web_transport_quinn` crate. #[doc(hidden)] mod impls;rs/moq/src/session/reader.rs (1)
20-28: Consider implementing the commented-outacceptmethod.The commented-out code suggests an unfinished implementation. If this functionality is needed, consider completing it or removing the dead code.
Would you like help implementing the
acceptmethod or should this dead code be removed?rs/moq/src/session/writer.rs (2)
17-29: Consider implementing or removing the commented-outopenmethod.Similar to Reader, there's commented-out code that should either be implemented or removed.
Would you like help implementing the
openmethod or should this dead code be removed?
54-57: Consider handling the error code from closed().The TODO comment indicates uncertainty about returning error codes. The
closed()method onSendStreamreturnsResult<Option<u8>, Self::Error>where the u8 represents an error code. Consider propagating this information.pub async fn close(&mut self) -> Result<(), Error> { self.stream.finish().map_err(|e| Error::Transport(Box::new(e)))?; - self.stream.closed().await.map_err(|e| Error::Transport(Box::new(e)))?; // TODO Return any error code? + // Wait for peer acknowledgment and check for error codes + if let Some(error_code) = self.stream.closed().await.map_err(|e| Error::Transport(Box::new(e)))? { + // The peer closed with an error code - you might want to handle this + tracing::debug!("Stream closed by peer with error code: {}", error_code); + } Ok(()) }rs/moq/src/transport/impls.rs (4)
38-39: Silent error suppression in set_priority.The
ok()call silently discards any errors fromset_priority. Consider logging errors or propagating them if the trait allows.fn set_priority(&mut self, order: i32) { - Self::set_priority(self, order).ok(); + if let Err(e) = Self::set_priority(self, order) { + tracing::debug!("Failed to set stream priority: {:?}", e); + } }
42-44: Silent error suppression in reset.Similar to set_priority, errors are silently discarded.
fn reset(&mut self, code: u32) { - Self::reset(self, code).ok(); + if let Err(e) = Self::reset(self, code) { + tracing::debug!("Failed to reset stream with code {}: {:?}", code, e); + } }
72-74: Silent error suppression in stop.Errors from stop are silently discarded.
fn stop(&mut self, code: u32) { - Self::stop(self, code).ok(); + if let Err(e) = Self::stop(self, code) { + tracing::debug!("Failed to stop stream with code {}: {:?}", code, e); + } }
76-88: Unsafe code requires careful review.The unsafe pointer manipulation appears correct - it's creating a mutable slice from
UninitSlice, reading into it, and then advancing the buffer. However, this pattern could benefit from additional safety documentation.async fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Result<Option<usize>, Self::Error> { let dst = buf.chunk_mut(); + // SAFETY: We're converting UninitSlice to a regular slice for the read operation. + // The bytes::BufMut contract ensures the memory is valid and properly aligned. + // We only advance the buffer by the actual number of bytes read. let dst = unsafe { &mut *(dst as *mut _ as *mut [u8]) }; let size = match self.read(dst).await? { Some(size) => size, None => return Ok(None), }; + // SAFETY: We're advancing by exactly the number of bytes that were written + // by the read operation above, which is within the buffer's capacity. unsafe { buf.advance_mut(size) }; Ok(Some(size)) }rs/moq/src/session/mod.rs (1)
24-38: Minor: avoid shadowingsession.Inside
new,connect, andaccept,sessionis re-bound multiple times. Consider renaming the constructedSession(e.g.,let this = Self { ... }) to avoid shadowing the transport handle. This slightly improves readability.- let this = Self { - transport: session.clone(), - }; + let this = Self { + transport: session.clone(), + }; ... - let session = Self::new(session, stream, publish.into(), subscribe.into()).await?; - Ok(session) + let sess = Self::new(session, stream, publish.into(), subscribe.into()).await?; + Ok(sess)rs/moq/src/transport/generic.rs (3)
65-76: Same+ Syncissue on send-stream futures and buffer bounds.
- It’s uncommon and unnecessary to require the returned futures from
write,write_buf, andclosedto beSync.- Requiring
B: Buf + Send + Syncforwrite_bufis also stricter than needed and may exclude useful types.Relax these constraints to improve compatibility.
- fn write(&mut self, buf: &[u8]) -> impl Future<Output = Result<usize, Self::Error>> + Send + Sync; + fn write(&mut self, buf: &[u8]) -> impl Future<Output = Result<usize, Self::Error>> + Send; - fn write_buf<B: Buf + Send + Sync>( + fn write_buf<B: Buf + Send>( &mut self, buf: &mut B, - ) -> impl Future<Output = Result<usize, Self::Error>> + Send + Sync; + ) -> impl Future<Output = Result<usize, Self::Error>> + Send; - fn closed(&mut self) -> impl Future<Output = Result<Option<u8>, Self::Error>> + Send + Sync; + fn closed(&mut self) -> impl Future<Output = Result<Option<u8>, Self::Error>> + Send;
91-94: Also relax+ Syncon RecvStream futures.For the same reasons as above, drop
+ Synconread_chunk.- fn read_chunk(&mut self, max_size: usize) - -> impl Future<Output = Result<Option<Bytes>, Self::Error>> + Send + Sync; + fn read_chunk(&mut self, max_size: usize) + -> impl Future<Output = Result<Option<Bytes>, Self::Error>> + Send;
5-8: Consider exposing an error code accessor.
ErrorCodecurrently has a TODO forcode(). Since the rest of the stack works with numeric error codes (e.g.,to_code()onError), consider adding afn code(&self) -> u32to standardize transport error mapping.rs/moq/src/session/publisher.rs (1)
247-273: Serving at most two groups with priority looks good; minor nit on logging.The two-group window and cancellation by dropping the future is a reasonable interim strategy. Minor nit: the “aborting group” log references
%old_sequenceafter it’s taken; consider logging the savedold_sequencevalue beforetake().- if let Some(old_sequence) = old_sequence.take() { - tracing::debug!(subscribe = %subscribe.id, track = %track.info.name, old = %old_sequence, %latest, "aborting group"); + if let Some(os) = old_sequence.take() { + tracing::debug!(subscribe = %subscribe.id, track = %track.info.name, old = %os, %latest, "aborting group"); old_group.take(); // Drop the future to cancel it. }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
⛔ Files ignored due to path filters (1)
rs/Cargo.lockis excluded by!**/*.lock
📒 Files selected for processing (20)
rs/Cargo.toml(1 hunks)rs/hang-cli/src/server.rs(2 hunks)rs/moq-native/Cargo.toml(0 hunks)rs/moq-native/src/client.rs(3 hunks)rs/moq-native/src/lib.rs(1 hunks)rs/moq-native/src/server.rs(3 hunks)rs/moq-relay/Cargo.toml(0 hunks)rs/moq-relay/src/connection.rs(1 hunks)rs/moq/Cargo.toml(1 hunks)rs/moq/src/error.rs(3 hunks)rs/moq/src/lib.rs(1 hunks)rs/moq/src/session/mod.rs(7 hunks)rs/moq/src/session/publisher.rs(12 hunks)rs/moq/src/session/reader.rs(3 hunks)rs/moq/src/session/stream.rs(2 hunks)rs/moq/src/session/subscriber.rs(12 hunks)rs/moq/src/session/writer.rs(2 hunks)rs/moq/src/transport/generic.rs(1 hunks)rs/moq/src/transport/impls.rs(1 hunks)rs/moq/src/transport/mod.rs(1 hunks)
💤 Files with no reviewable changes (2)
- rs/moq-native/Cargo.toml
- rs/moq-relay/Cargo.toml
🧰 Additional context used
📓 Path-based instructions (4)
rs/moq-relay/**
📄 CodeRabbit Inference Engine (CLAUDE.md)
The CDN/relay (rs/moq-relay) must not include application logic, media codecs, or track details
Files:
rs/moq-relay/src/connection.rs
rs/**/src/**/*.rs
📄 CodeRabbit Inference Engine (CLAUDE.md)
Rust tests should be integrated within source files (inline unit tests)
Files:
rs/moq-relay/src/connection.rsrs/moq-native/src/client.rsrs/moq-native/src/server.rsrs/moq/src/lib.rsrs/moq/src/transport/impls.rsrs/moq-native/src/lib.rsrs/hang-cli/src/server.rsrs/moq/src/transport/mod.rsrs/moq/src/session/stream.rsrs/moq/src/error.rsrs/moq/src/transport/generic.rsrs/moq/src/session/reader.rsrs/moq/src/session/subscriber.rsrs/moq/src/session/writer.rsrs/moq/src/session/mod.rsrs/moq/src/session/publisher.rs
rs/**
📄 CodeRabbit Inference Engine (CLAUDE.md)
Place Rust crates under the rs/ directory
Files:
rs/moq-relay/src/connection.rsrs/moq/Cargo.tomlrs/moq-native/src/client.rsrs/moq-native/src/server.rsrs/Cargo.tomlrs/moq/src/lib.rsrs/moq/src/transport/impls.rsrs/moq-native/src/lib.rsrs/hang-cli/src/server.rsrs/moq/src/transport/mod.rsrs/moq/src/session/stream.rsrs/moq/src/error.rsrs/moq/src/transport/generic.rsrs/moq/src/session/reader.rsrs/moq/src/session/subscriber.rsrs/moq/src/session/writer.rsrs/moq/src/session/mod.rsrs/moq/src/session/publisher.rs
rs/Cargo.toml
📄 CodeRabbit Inference Engine (CLAUDE.md)
Maintain the Rust workspace configuration in rs/Cargo.toml
Files:
rs/Cargo.toml
🧠 Learnings (1)
📚 Learning: 2025-08-13T21:30:34.980Z
Learnt from: CR
PR: kixelated/moq#0
File: CLAUDE.md:0-0
Timestamp: 2025-08-13T21:30:34.980Z
Learning: Applies to rs/Cargo.toml : Maintain the Rust workspace configuration in rs/Cargo.toml
Applied to files:
rs/moq/Cargo.toml
🧬 Code Graph Analysis (9)
rs/moq-native/src/client.rs (2)
rs/moq/src/session/mod.rs (1)
connect(75-84)rs/hang-cli/src/client.rs (1)
connect(23-53)
rs/moq/src/transport/impls.rs (4)
rs/moq/src/transport/generic.rs (15)
accept_uni(20-20)accept_bi(25-27)open_bi(30-30)open_uni(33-33)close(36-36)closed(39-39)closed(75-75)set_priority(57-57)reset(60-60)finish(63-63)write(66-66)write_buf(69-72)stop(83-83)read_buf(86-89)read_chunk(92-93)rs/moq/src/session/mod.rs (2)
close(150-152)closed(155-157)rs/moq/src/session/writer.rs (3)
close(54-58)closed(64-67)write(45-51)rs/moq/src/session/reader.rs (1)
closed(80-93)
rs/moq/src/session/stream.rs (1)
rs/moq/src/session/mod.rs (1)
accept(101-112)
rs/moq/src/transport/generic.rs (5)
rs/moq/src/transport/impls.rs (15)
accept_uni(10-12)accept_bi(14-16)open_bi(18-20)open_uni(22-24)close(26-28)closed(30-32)closed(60-66)set_priority(38-40)reset(42-44)finish(46-48)write(50-52)write_buf(54-58)stop(72-74)read_buf(76-88)read_chunk(90-94)rs/moq-native/src/server.rs (1)
close(204-206)rs/moq/src/session/mod.rs (2)
close(150-152)closed(155-157)rs/moq/src/session/writer.rs (3)
close(54-58)closed(64-67)write(45-51)rs/moq/src/session/reader.rs (1)
closed(80-93)
rs/moq/src/session/reader.rs (3)
rs/moq/src/session/writer.rs (2)
new(10-15)closed(64-67)rs/moq/src/transport/generic.rs (2)
closed(39-39)closed(75-75)rs/moq/src/transport/impls.rs (2)
closed(30-32)closed(60-66)
rs/moq/src/session/subscriber.rs (4)
rs/moq/src/session/mod.rs (2)
new(25-72)stream(145-145)rs/moq/src/session/reader.rs (1)
new(13-18)rs/moq/src/session/publisher.rs (3)
new(18-22)stream(50-50)stream(138-138)rs/moq/src/session/stream.rs (1)
open(10-18)
rs/moq/src/session/writer.rs (2)
rs/moq/src/transport/generic.rs (2)
write(66-66)close(36-36)rs/moq/src/transport/impls.rs (2)
write(50-52)close(26-28)
rs/moq/src/session/mod.rs (5)
rs/moq/src/session/publisher.rs (3)
new(18-22)stream(50-50)stream(138-138)rs/moq/src/session/subscriber.rs (3)
new(27-35)stream(101-101)stream(267-267)rs/moq/src/session/stream.rs (2)
open(10-18)accept(20-27)rs/moq/src/transport/generic.rs (3)
close(36-36)closed(39-39)closed(75-75)rs/moq/src/transport/impls.rs (3)
close(26-28)closed(30-32)closed(60-66)
rs/moq/src/session/publisher.rs (3)
rs/moq/src/session/mod.rs (3)
new(25-72)stream(145-145)accept(101-112)rs/moq/src/session/reader.rs (1)
new(13-18)rs/moq/src/session/stream.rs (1)
accept(20-27)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Check
🔇 Additional comments (43)
rs/moq-native/src/client.rs (2)
9-9: Import path change LGTM.Switching to the crate-local re-export (
super::web_transport_quinn) aligns with the new transport layering and keeps the dependency surface tight.
172-176: Bug: mixing URL scheme with ALPN constant; scheme match should use "moql".
moq_lite::ALPNis the ALPN identifier, not the URL scheme. Matchingurl.scheme()against it will fail when the scheme is "moql". Use the literal"moql"like you do above when selecting ALPN.Apply this diff:
let alpn = match url.scheme() { "https" => web_transport_quinn::ALPN, - "moql" => moq_lite::ALPN, + "moql" => moq_lite::ALPN, _ => anyhow::bail!("url scheme must be 'http', 'https', or 'moql'"), };(Note: No functional change here; just showing intended consistency. See the next hunk for the real bug fix.)
Likely an incorrect or invalid review comment.
rs/moq-native/src/server.rs (1)
17-17: Import path change LGTM.Using the crate-local
web_transport_quinnre-export keeps the server implementation aligned with the new transport surface.rs/hang-cli/src/server.rs (1)
66-72: Signature change LGTM.Updating
run_sessionto accept aquinn::Requestaligns with the transport refactor and the new accept path in moq-native’s server.rs/moq/src/error.rs (1)
85-105: Remove the manual Clone impl; derive Clone will do the right thing with Arc.Once
Transportholds anArc<dyn Error + Send + Sync>, the enum can deriveCloneand you can delete this manual implementation that drops error context.Apply this diff:
-impl Clone for Error { - fn clone(&self) -> Self { - match self { - Error::Transport(_) => Error::Transport(Box::new(std::io::Error::other("Transport error (cloned)"))), - Error::Decode(e) => Error::Decode(e.clone()), - Error::Version(v1, v2) => Error::Version(v1.clone(), v2.clone()), - Error::RequiredExtension(e) => Error::RequiredExtension(*e), - Error::UnexpectedStream(s) => Error::UnexpectedStream(*s), - Error::BoundsExceeded(b) => Error::BoundsExceeded(*b), - Error::Duplicate => Error::Duplicate, - Error::Cancel => Error::Cancel, - Error::Timeout => Error::Timeout, - Error::Old => Error::Old, - Error::App(code) => Error::App(*code), - Error::NotFound => Error::NotFound, - Error::WrongSize => Error::WrongSize, - Error::ProtocolViolation => Error::ProtocolViolation, - Error::Unauthorized => Error::Unauthorized, - } - } -} +// Manual Clone no longer needed; derived via `#[derive(Clone)]` above.Likely an incorrect or invalid review comment.
rs/Cargo.toml (1)
37-37: ✅ No stale references to the old crate remain
- No
web-transportdependencies in any Cargo.toml- No
web_transport::imports or path usages in Rust code- All code now imports and uses
web_transport_quinn::as expectedrs/moq/src/session/stream.rs (4)
1-2: LGTM! Clean import refactoring for transport abstraction.The imports are properly updated to use the new transport abstraction instead of the concrete web-transport dependency.
4-7: Well-designed generic abstraction over transport sessions.The struct is properly parameterized with the transport trait bound, and the associated types correctly leverage the session's stream types.
9-18: Consistent error handling with proper trait abstraction.The
openmethod correctly uses the generic session reference and maps transport errors appropriately.
20-27: Clean implementation of the accept pattern.The method properly handles bidirectional stream acceptance with consistent error mapping.
rs/moq/src/session/reader.rs (5)
5-10: Clean generic abstraction for Reader.The struct is properly parameterized over
transport::RecvStream, making it transport-agnostic.
12-18: LGTM! Constructor properly updated for generic type.The constructor correctly accepts the generic stream type.
40-49: Consistent error handling in decode method.The error mapping to
Error::Transportis properly implemented for the read_buf operation.
73-77: LGTM! Proper error mapping for read operations.The
read_chunkcall correctly maps transport errors.
80-93: closed() correctly handles stream closure as per RecvStream::read_buf semantics
Confirmed that allread_bufimplementations returnOk(None)when the stream is closed, matching the trait documentation. Theclosed()method insession/reader.rstherefore correctly detects closure and returnsOk(()). No changes needed.rs/moq/src/session/writer.rs (4)
1-8: Clean generic abstraction for Writer.The struct is properly parameterized over
transport::SendStreamwith appropriate visibility modifiers.
9-15: LGTM! Constructor properly handles generic stream type.The implementation correctly accepts and stores the generic stream.
36-40: Consistent error handling in encode method.The write_buf operation properly maps errors to
Error::Transport.
70-75: Good defensive programming with abort-on-drop.The Drop implementation ensures streams are properly cleaned up with an error code, preventing resource leaks.
rs/moq/src/session/subscriber.rs (8)
17-24: Well-structured generic Subscriber implementation.The struct is properly parameterized with the transport session trait bound.
26-35: LGTM! Constructor properly handles generic session.The implementation correctly accepts and stores the generic session type.
45-60: Consistent error handling in run_uni method.The accept_uni operation properly maps transport errors.
62-74: Good error handling pattern with stream abort.The method properly aborts the stream on error, ensuring clean resource management.
83-83: Session reference usage is correct.The change from mutable to immutable reference for Stream::open is appropriate since the transport trait methods take immutable self references.
192-194: Consistent error categorization for transport errors.The error handling properly treats both Cancel and Transport errors as cancellation scenarios.
Also applies to: 249-251
208-208: Consistent usage of immutable session reference.The Stream::open call correctly uses an immutable reference.
230-264: Well-structured generic method signatures.The recv_group method properly accepts the generic Reader type with appropriate lifetime bounds.
rs/moq/src/transport/impls.rs (5)
1-4: LGTM! Clean imports for the bridging layer.The imports correctly bring in the necessary traits and types.
5-33: Well-implemented Session trait for web_transport_quinn.The implementation correctly delegates to the underlying methods with proper async handling.
54-58: LGTM! Proper buffer management in write_buf.The implementation correctly advances the buffer after writing.
97-99: LGTM! Marker trait implementations.The ErrorCode trait implementations enable proper error type handling in the transport abstraction.
46-48: Investigate error mapping in finish():
Mapping every underlying error toWriteError::ClosedStreamobscures the actual cause. TheWriteErrorenum includes:
Stopped(VarInt)ConnectionLost(ConnectionError)ClosedStreamZeroRttRejectedBy collapsing all errors into
ClosedStream, you lose whether the stream was stopped, the connection was lost, or 0-RTT was rejected. Please confirm whether this coarse mapping is intentional. If you’d rather preserve the original error, you could simply propagate it:fn finish(&mut self) -> Result<(), Self::Error> { - Self::finish(self).map_err(|_| web_transport_quinn::WriteError::ClosedStream) + // Propagate the actual WriteError variant rather than overwriting it + Self::finish(self) }Or, if only certain cases should map to
ClosedStream, match on the error explicitly:Self::finish(self).map_err(|err| match err { web_transport_quinn::WriteError::ClosedStream => err, other => other, // or map specific variants as needed })Let me know if the intent is to collapse all errors, or if you’d prefer to preserve or selectively map them.
rs/moq/src/session/mod.rs (6)
20-22: Nice transport-agnostic Session struct.The generic parameter over
transport::Sessionand thetransportfield rename make the abstraction clear and decouple the module nicely from any concrete transport.
42-63: Spawn orchestration looks solid; close semantics are consistent.The
tokio::select!across session/publisher/subscriber and the close paths (0 = normal, 1 = transport terminated, error code otherwise) are clean and keep the transport invariant centralized. Good use ofweb_async::spawnfor cross-target compatibility.
74-84: Connect path correctly uses the generic Streamhandshake.Opening the control stream via
Stream::open(&session, ControlType::Session)and then delegating tonew(...)aligns with the transport abstraction.
101-113: Server accept path is clean and generic.The
acceptAPI and the handshake flow viaStream::accept(&session)andaccept_setup(...)look correct and transport-agnostic.
149-152: Close consumes self (good), but ensure reason lifetime is not retained.
err.to_string().as_bytes()passes a temporary slice. This is fine as long as implementations don't storereasonbeyond the call. Based on transport::impls calling through to the underlying transport synchronously, this should be safe.Would you like me to add a brief doc comment on
transport::Session::closeclarifying thatreasonis borrowed only for the duration of the call and must not be retained?
154-157:closedAPI shape is pragmatic.Returning
Error(instead ofResult) fromclosedmirrors the underlying transport’s “await the terminal error” semantics and keeps callers simple.rs/moq/src/session/publisher.rs (5)
12-22: Publisher generic over transport is clean.The
Publisher<S: transport::Session>refactor and constructor align well with the new transport traits. Defaulting the origin to a closed dummy is a nice ergonomic touch.
31-46: Control stream dispatch is straightforward and robust.Processing each control stream in arrival order to avoid cloning the origin is a pragmatic tradeoff, with clear logging on errors.
137-170: Double-check lifetimes: spawned task borrows data derived from the same stream.
subscribeis created viastream.reader.decode::<message::Subscribe>().await?and is then borrowed (&subscribe) inside the spawned task which also takes ownership ofstream. IfSubscribe<'_>holds references intostream’s internal buffer, this is likely safe (both are moved into the task), but it’s subtle.Please confirm that:
message::Subscribe<'_>only borrows fromstreamand won’t outlive it in the task, andstream.reader.closed()won’t re-use or invalidate the backing buffer in a way that would invalidatesubscribewhilerun_subscribeis reading it.If there’s any risk, we can snapshot the needed fields from
subscribeinto owned values before spawning.
276-321: Data stream send path is solid and transport-agnostic.Opening a uni stream, setting priority, writing
DataType::Group+ header then streaming frames/chunks with cancellation viaclosed()is clean and matches the transport traits well.
329-342: Priority calculation: behavior documented and tested; good to keep an eye on overflow edge.The 24-bit inversion scheme is well-documented with a targeted unit test. Given the known overflow after ~2^24 groups, adding a brief comment about the wrap behavior (already present) and perhaps a future TODO to clamp/round-robin same-priority tracks is enough for now.
| let session = match url.scheme() { | ||
| "https" => web_transport::quinn::Session::connect(connection, url).await?, | ||
| moq_lite::ALPN => web_transport::quinn::Session::raw(connection, url), | ||
| "https" => web_transport_quinn::Session::connect(connection, url).await?, | ||
| moq_lite::ALPN => web_transport_quinn::Session::raw(connection, url), | ||
| _ => unreachable!(), | ||
| }; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix scheme match arm: use "moql" literal instead of the ALPN constant.
Matching the URL scheme against moq_lite::ALPN is incorrect and will never match if ALPN != "moql". This breaks raw-QUIC connections.
Apply this diff:
let session = match url.scheme() {
"https" => web_transport_quinn::Session::connect(connection, url).await?,
- moq_lite::ALPN => web_transport_quinn::Session::raw(connection, url),
+ "moql" => web_transport_quinn::Session::raw(connection, url),
_ => unreachable!(),
};📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| let session = match url.scheme() { | |
| "https" => web_transport::quinn::Session::connect(connection, url).await?, | |
| moq_lite::ALPN => web_transport::quinn::Session::raw(connection, url), | |
| "https" => web_transport_quinn::Session::connect(connection, url).await?, | |
| moq_lite::ALPN => web_transport_quinn::Session::raw(connection, url), | |
| _ => unreachable!(), | |
| }; | |
| let session = match url.scheme() { | |
| "https" => web_transport_quinn::Session::connect(connection, url).await?, | |
| "moql" => web_transport_quinn::Session::raw(connection, url), | |
| _ => unreachable!(), | |
| }; |
🤖 Prompt for AI Agents
In rs/moq-native/src/client.rs around lines 191 to 195, the URL scheme match
uses moq_lite::ALPN which won't match the literal "moql" scheme; change the
match arm to test for the string literal "moql" instead of moq_lite::ALPN so
raw-QUIC connections are selected correctly (i.e., replace the moq_lite::ALPN
arm with "moql" => web_transport_quinn::Session::raw(connection, url), keeping
other arms unchanged).
| #[derive(thiserror::Error, Debug)] | ||
| pub enum Error { | ||
| #[error("webtransport error: {0}")] | ||
| WebTransport(#[from] web_transport::Error), | ||
| #[error("transport error: {0}")] | ||
| Transport(Box<dyn std::error::Error + Send + Sync>), | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Don’t lose inner transport error details when cloning; use Arc instead of Box.
The current manual Clone drops the underlying error by replacing it with a generic “Transport error (cloned)”. This harms debuggability. Store the transport error in an Arc<dyn Error + Send + Sync> so Clone is lossless and you can derive Clone for the enum.
Apply this diff:
-#[derive(thiserror::Error, Debug)]
+#[derive(thiserror::Error, Debug, Clone)]
pub enum Error {
- #[error("transport error: {0}")]
- Transport(Box<dyn std::error::Error + Send + Sync>),
+ #[error("transport error: {0}")]
+ Transport(Arc<dyn std::error::Error + Send + Sync>),Add the import (outside the changed lines):
use std::sync::Arc;🤖 Prompt for AI Agents
In rs/moq/src/error.rs around lines 4 to 8, the Transport variant currently
stores Box<dyn std::error::Error + Send + Sync> and a manual Clone that replaces
the inner error with a generic message; change the variant to store Arc<dyn
std::error::Error + Send + Sync> (add use std::sync::Arc at top), update the
enum derives to include Clone so cloning is lossless, and remove the manual
Clone implementation so the derived Clone clones the Arc-wrapped error instead.
rs/moq/src/error.rs
Outdated
| impl From<web_transport_quinn::SessionError> for Error { | ||
| fn from(err: web_transport_quinn::SessionError) -> Self { | ||
| Error::Transport(Box::new(err)) | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Update conversion to store errors in Arc.
With the Transport variant using Arc, keep conversions consistent.
Apply this diff:
impl From<web_transport_quinn::SessionError> for Error {
fn from(err: web_transport_quinn::SessionError) -> Self {
- Error::Transport(Box::new(err))
+ Error::Transport(Arc::new(err))
}
}Optional: If you want broader ergonomics, consider a blanket conversion (outside the changed lines):
impl<E> From<E> for Error
where
E: std::error::Error + Send + Sync + 'static,
{
fn from(err: E) -> Self {
Error::Transport(Arc::new(err))
}
}This coexists with the specific From impls (Decode, BoundsExceeded, etc.) and reduces the need for transport-specific mappings.
🤖 Prompt for AI Agents
In rs/moq/src/error.rs around lines 107 to 111, the
From<web_transport_quinn::SessionError> impl constructs Error::Transport with
Box::new(err) but the Transport variant now holds an Arc; change the conversion
to Error::Transport(Arc::new(err)) and ensure std::sync::Arc is imported at top
of the file (or fully qualify Arc::new). Optionally, after this change you can
add the suggested blanket impl<E> From<E> for Error where E: std::error::Error +
Send + Sync + 'static to reduce repetition, but the immediate fix is replacing
Box::new with Arc::new and importing Arc.
rs/moq/src/transport/generic.rs
Outdated
| /// Accept an incoming unidirectional stream | ||
| fn accept_uni(&self) -> impl Future<Output = Result<Self::RecvStream, Self::Error>> + Send + Sync; | ||
|
|
||
| /// Accept an incoming bidirectional stream | ||
| /// | ||
| /// Returning `None` implies the connection is closing or closed. | ||
| fn accept_bi( | ||
| &self, | ||
| ) -> impl Future<Output = Result<(Self::SendStream, Self::RecvStream), Self::Error>> + Send + Sync; | ||
|
|
||
| /// Poll the connection to create a new bidirectional stream. | ||
| fn open_bi(&self) -> impl Future<Output = Result<(Self::SendStream, Self::RecvStream), Self::Error>> + Send + Sync; | ||
|
|
||
| /// Poll the connection to create a new unidirectional stream. | ||
| fn open_uni(&self) -> impl Future<Output = Result<Self::SendStream, Self::Error>> + Send + Sync; | ||
|
|
||
| /// Close the connection immediately | ||
| fn close(&self, code: u32, reason: &[u8]); | ||
|
|
||
| /// Check if the connection is closed, returning the error if it is. | ||
| fn closed(&self) -> impl Future<Output = Self::Error> + Send + Sync; | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Overly strict + Sync on returned futures likely breaks implementations.
Most async fn futures are not Sync even when they are Send. Requiring + Sync on every returned future (e.g., accept_uni, accept_bi, open_bi, open_uni, closed) is unnecessary and will constrain or break implementations.
Recommend dropping + Sync on returned futures across the Session trait.
- fn accept_uni(&self) -> impl Future<Output = Result<Self::RecvStream, Self::Error>> + Send + Sync;
+ fn accept_uni(&self) -> impl Future<Output = Result<Self::RecvStream, Self::Error>> + Send;
- fn accept_bi(
+ fn accept_bi(
&self,
- ) -> impl Future<Output = Result<(Self::SendStream, Self::RecvStream), Self::Error>> + Send + Sync;
+ ) -> impl Future<Output = Result<(Self::SendStream, Self::RecvStream), Self::Error>> + Send;
- fn open_bi(&self) -> impl Future<Output = Result<(Self::SendStream, Self::RecvStream), Self::Error>> + Send + Sync;
+ fn open_bi(&self) -> impl Future<Output = Result<(Self::SendStream, Self::RecvStream), Self::Error>> + Send;
- fn open_uni(&self) -> impl Future<Output = Result<Self::SendStream, Self::Error>> + Send + Sync;
+ fn open_uni(&self) -> impl Future<Output = Result<Self::SendStream, Self::Error>> + Send;
- fn closed(&self) -> impl Future<Output = Self::Error> + Send + Sync;
+ fn closed(&self) -> impl Future<Output = Self::Error> + Send;📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| /// Accept an incoming unidirectional stream | |
| fn accept_uni(&self) -> impl Future<Output = Result<Self::RecvStream, Self::Error>> + Send + Sync; | |
| /// Accept an incoming bidirectional stream | |
| /// | |
| /// Returning `None` implies the connection is closing or closed. | |
| fn accept_bi( | |
| &self, | |
| ) -> impl Future<Output = Result<(Self::SendStream, Self::RecvStream), Self::Error>> + Send + Sync; | |
| /// Poll the connection to create a new bidirectional stream. | |
| fn open_bi(&self) -> impl Future<Output = Result<(Self::SendStream, Self::RecvStream), Self::Error>> + Send + Sync; | |
| /// Poll the connection to create a new unidirectional stream. | |
| fn open_uni(&self) -> impl Future<Output = Result<Self::SendStream, Self::Error>> + Send + Sync; | |
| /// Close the connection immediately | |
| fn close(&self, code: u32, reason: &[u8]); | |
| /// Check if the connection is closed, returning the error if it is. | |
| fn closed(&self) -> impl Future<Output = Self::Error> + Send + Sync; | |
| /// Accept an incoming unidirectional stream | |
| fn accept_uni(&self) -> impl Future<Output = Result<Self::RecvStream, Self::Error>> + Send; | |
| /// Accept an incoming bidirectional stream | |
| /// | |
| /// Returning `None` implies the connection is closing or closed. | |
| fn accept_bi( | |
| &self, | |
| ) -> impl Future<Output = Result<(Self::SendStream, Self::RecvStream), Self::Error>> + Send; | |
| /// Poll the connection to create a new bidirectional stream. | |
| fn open_bi(&self) -> impl Future<Output = Result<(Self::SendStream, Self::RecvStream), Self::Error>> + Send; | |
| /// Poll the connection to create a new unidirectional stream. | |
| fn open_uni(&self) -> impl Future<Output = Result<Self::SendStream, Self::Error>> + Send; | |
| /// Close the connection immediately | |
| fn close(&self, code: u32, reason: &[u8]); | |
| /// Check if the connection is closed, returning the error if it is. | |
| fn closed(&self) -> impl Future<Output = Self::Error> + Send; |
🤖 Prompt for AI Agents
In rs/moq/src/transport/generic.rs around lines 19 to 40, the trait methods
accept_uni, accept_bi, open_bi, open_uni, and closed currently require returned
futures to be `+ Send + Sync`; drop the `+ Sync` bound from the return types so
the signature uses `impl Future<Output = ...> + Send` (or just `impl
Future<Output = ...>` if Send is not required) for each method; update all
method signatures in this range accordingly and run cargo check to ensure no
remaining `Sync` constraints from callers or implementations.
rs/moq/src/transport/generic.rs
Outdated
| fn read_buf<B: BufMut + Send + Sync>( | ||
| &mut self, | ||
| buf: &mut B, | ||
| ) -> impl Future<Output = Result<Option<usize>, Self::Error>> + Send + Sync; | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Compile blocker: BufMut + Sync prevents using BytesMut and mismatches impls.
- The trait currently requires
B: BufMut + Send + Sync.bytes::BytesMut(commonly used for read buffers, e.g., insession::reader) is notSync, which makesRecvStream::read_bufunusable at call sites. - The concrete impl in
transport/impls.rsusesasync fn read_buf<B: BufMut>(...), which does not restateSend + Sync; that signature won’t match if the trait insists onSync.
Drop the Sync bound here. It’s unnecessary: the buffer is exclusively borrowed via &mut B and not shared across threads.
Apply this minimal fix:
- fn read_buf<B: BufMut + Send + Sync>(
+ fn read_buf<B: BufMut + Send>(
&mut self,
buf: &mut B,
) -> impl Future<Output = Result<Option<usize>, Self::Error>> + Send + Sync;📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| fn read_buf<B: BufMut + Send + Sync>( | |
| &mut self, | |
| buf: &mut B, | |
| ) -> impl Future<Output = Result<Option<usize>, Self::Error>> + Send + Sync; | |
| fn read_buf<B: BufMut + Send>( | |
| &mut self, | |
| buf: &mut B, | |
| ) -> impl Future<Output = Result<Option<usize>, Self::Error>> + Send + Sync; |
🤖 Prompt for AI Agents
In rs/moq/src/transport/generic.rs around lines 86 to 90, the generic bound for
read_buf currently requires B: BufMut + Send + Sync which prevents using
bytes::BytesMut and mismatches the concrete impls; remove the Sync bound so the
signature becomes B: BufMut + Send (or just B: BufMut if Send is not required by
callers) to match the async impls in transport/impls.rs and allow BytesMut
usage, and ensure any corresponding impl signatures and trait usages are updated
to the same bounds.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 11
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (7)
rs/moq/src/session/reader.rs (1)
66-74: read(max) no longer honors max; can over-read and break framingThe new implementation ignores max for underlying reads. If the transport returns a chunk larger than remain, run_frame will compute a negative remain and return WrongSize.
Apply this diff to honor max by refilling the internal buffer and slicing:
// Returns a non-zero chunk of data, or None if the stream is closed pub async fn read(&mut self, max: usize) -> Result<Option<Bytes>, Error> { if !self.buffer.is_empty() { let size = cmp::min(max, self.buffer.len()); let data = self.buffer.split_to(size).freeze(); return Ok(Some(data)); } - self.stream.read().await.map_err(|e| Error::Transport(e.into())) + // Refill the buffer from the stream, then return up to `max` bytes. + if self + .stream + .read_buf(&mut self.buffer) + .await + .map_err(|e| Error::Transport(std::sync::Arc::new(e)))? + .is_none() + { + return Ok(None); + } + + let size = cmp::min(max, self.buffer.len()); + Ok(Some(self.buffer.split_to(size).freeze()))rs/moq/src/session/subscriber.rs (1)
230-236: Preserve transport error details on cloneConverting the
Transportvariant ofErrorto hold anArc<TransportError>and derivingCloneensures that anyerr.clone()(e.g., in abort paths) retains the original, underlying error cause.Affected locations:
- rs/moq/src/session/subscriber.rs (around line 274)
if let Err(err) = res { frame.abort(err.clone()); return Err(err); }- rs/moq/src/model/frame.rs (line 167)
return Err(err.clone());Suggested changes:
- Change the
Error::Transport(TransportError)variant toError::Transport(Arc<TransportError>)- Add
#[derive(Clone)]toError(removing any manualCloneimpl)- Update all
err.clone()calls to use the derivedArc‐based clone (no deep copy, preserves full context)rs/moq/src/session/publisher.rs (1)
10-21: Add Clone bound to S to match usage (session.clone()).Publisher methods clone the session (Lines 146 and 253), but the generic bound only requires Session. Unless web_transport_generic::Session itself is Clone, this won’t compile. Add an explicit Clone bound for clarity and to avoid accidental regressions if the trait changes.
Apply this diff to add the bound:
-pub(super) struct Publisher<S: web_transport_generic::Session> { +pub(super) struct Publisher<S: web_transport_generic::Session + Clone> { session: S, origin: OriginConsumer, } -impl<S: web_transport_generic::Session> Publisher<S> { +impl<S> Publisher<S> +where + S: web_transport_generic::Session + Clone, +{ pub fn new(session: S, origin: Option<OriginConsumer>) -> Self { // Default to a dummy origin that is immediately closed. let origin = origin.unwrap_or_else(|| Origin::produce().consumer); Self { session, origin } }rs/moq-relay/src/auth.rs (3)
92-110: verify() control-flow and errors look good; tests need to align with new messagesThe new path/token split is clear. Note that callers/tests expecting old error strings will fail (e.g., “no token provided and no public path configured” vs “a token was expected”, and “token provided, but no key configured” vs “authentication is disabled”/the suggested clearer string). Update those tests to assert on variants, not substrings.
Apply these diffs to strengthen tests:
For “no token and no public path”:
- assert!(result.is_err()); - assert!(result - .unwrap_err() - .to_string() - .contains("no token provided and no public path configured")); + assert!(matches!(result, Err(AuthError::ExpectedToken)));For “token provided but no key configured”:
- assert!(result.is_err()); - assert!(result - .unwrap_err() - .to_string() - .contains("token provided, but no key configured")); + assert!(matches!(result, Err(AuthError::UnexpectedToken)));
236-243: Update this test to assert on AuthError::ExpectedTokenThe verify() error message has changed. Use variant matching rather than brittle substring checks.
- assert!(result.is_err()); - assert!(result - .unwrap_err() - .to_string() - .contains("no token provided and no public path configured")); + assert!(matches!(result, Err(AuthError::ExpectedToken)));
254-260: Update this test to assert on AuthError::UnexpectedTokenMessage text changed; assert on the variant.
- assert!(result.is_err()); - assert!(result - .unwrap_err() - .to_string() - .contains("token provided, but no key configured")); + assert!(matches!(result, Err(AuthError::UnexpectedToken)));rs/moq-relay/src/web.rs (1)
171-201: Validate track segment isn’t empty to avoid subscribing to a blank trackA trailing slash (e.g., /fetch/foo/) yields track == "". Add a guard to return 400 in that case.
let mut path: Vec<&str> = path.split("/").collect(); - let track = path.pop().unwrap().to_string(); + let track = path.pop().unwrap().to_string(); // We need at least a broadcast and a track. if path.is_empty() { return Err(StatusCode::BAD_REQUEST.into()); } + if track.is_empty() { + return Err(StatusCode::BAD_REQUEST.into()); + }
♻️ Duplicate comments (3)
rs/moq-native/src/client.rs (1)
189-193: Fix scheme match arm: use "moql" literal instead of the ALPN constant.Matching url.scheme() against moq_lite::ALPN is incorrect and will not fire if ALPN != "moql". This breaks raw-QUIC session selection.
Apply this diff:
- let session = match url.scheme() { - "https" => web_transport_quinn::Session::connect(connection, url).await?, - moq_lite::ALPN => web_transport_quinn::Session::raw(connection, url), - _ => unreachable!(), - }; + let session = match url.scheme() { + "https" => web_transport_quinn::Session::connect(connection, url).await?, + "moql" => web_transport_quinn::Session::raw(connection, url), + _ => unreachable!(), + };Run this quick check to ensure there are no other places comparing a URL scheme against moq_lite::ALPN:
#!/bin/bash set -euo pipefail echo "Occurrences of 'url.scheme()' matches that reference moq_lite::ALPN (should be none):" rg -n -C3 -P '(?s)match\s+url\.scheme\(\)\s*\{.*?moq_lite::ALPN' --type=rust || true echo "All matches on url.scheme() for review:" rg -n -C2 'match\s+url\.scheme\(\)' --type=rustrs/moq/src/error.rs (2)
4-7: Preserve transport error details: switch to Arc and derive CloneCloning currently drops the underlying transport error, harming debuggability. Store the error behind Arc and derive Clone to make cloning lossless.
Apply this diff:
-#[derive(thiserror::Error, Debug)] +#[derive(thiserror::Error, Debug, Clone)] pub enum Error { - #[error("transport error: {0}")] - Transport(Box<dyn std::error::Error + Send + Sync>), + #[error("transport error: {0}")] + Transport(std::sync::Arc<dyn std::error::Error + Send + Sync>),If you prefer a clean import, add this outside the shown hunk:
use std::sync::Arc;As a follow-up, update call sites that construct Error::Transport to wrap with Arc::new (see my inline suggestions in other files), or add a blanket From for Error (optional):
impl<E> From<E> for Error where E: std::error::Error + Send + Sync + 'static, { fn from(err: E) -> Self { Error::Transport(std::sync::Arc::new(err)) } }
85-105: Remove manual Clone impl that erases transport error contextThe manual Clone replaces Transport with a generic io::Error, losing root-cause details. With Transport using Arc, let the derive handle Clone.
Apply this diff:
-impl Clone for Error { - fn clone(&self) -> Self { - match self { - Error::Transport(_) => Error::Transport(Box::new(std::io::Error::other("Transport error (cloned)"))), - Error::Decode(e) => Error::Decode(e.clone()), - Error::Version(v1, v2) => Error::Version(v1.clone(), v2.clone()), - Error::RequiredExtension(e) => Error::RequiredExtension(*e), - Error::UnexpectedStream(s) => Error::UnexpectedStream(*s), - Error::BoundsExceeded(b) => Error::BoundsExceeded(*b), - Error::Duplicate => Error::Duplicate, - Error::Cancel => Error::Cancel, - Error::Timeout => Error::Timeout, - Error::Old => Error::Old, - Error::App(code) => Error::App(*code), - Error::NotFound => Error::NotFound, - Error::WrongSize => Error::WrongSize, - Error::ProtocolViolation => Error::ProtocolViolation, - Error::Unauthorized => Error::Unauthorized, - } - } -}
🧹 Nitpick comments (28)
rs/moq-native/src/server.rs (1)
186-195: Only WebTransport ALPN is accepted; consider either adding raw-QUIC handling or returning a more generic accept type.Currently, only web_transport_quinn::ALPN proceeds; all others bail. If you plan to support raw QUIC (moq_lite::ALPN), either:
- Add a branch to accept raw QUIC and return a corresponding handle, or
- Keep rejecting it and avoid advertising it (see previous comment), or
- Introduce a generic accept enum (e.g., enum Accept { WebTransport(Request), Raw(quinn::Connection) }) to model both paths explicitly.
rs/moq/src/coding/varint.rs (3)
206-215: Use bit shifts instead of pow for thresholds (micro-clarity and zero-cost)Shifts make the bounds more idiomatic and avoid calling pow at runtime. Not critical, but clearer.
Apply this diff:
- if self.0 < 2u64.pow(6) { + if self.0 < (1u64 << 6) { w.put_u8(self.0 as u8); - } else if self.0 < 2u64.pow(14) { + } else if self.0 < (1u64 << 14) { w.put_u16((0b01 << 14) | self.0 as u16); - } else if self.0 < 2u64.pow(30) { + } else if self.0 < (1u64 << 30) { w.put_u32((0b10 << 30) | self.0 as u32); } else { w.put_u64((0b11 << 62) | self.0); }
242-246: Potential usize truncation on 32-bit targets during decodeCasting u64 to usize with as can silently truncate on 32-bit. If 32-bit support matters, add a bounds check. If the project is 64-bit only, feel free to ignore.
Proposed guarded decode to avoid truncation:
- VarInt::decode(r).map(|v| v.into_inner() as usize) + let v = VarInt::decode(r)?; + let x = v.into_inner(); + if x > usize::MAX as u64 { + return Err(DecodeError::BoundsExceeded); + } + Ok(x as usize)If you prefer to keep conversions infallible only on 64-bit, consider gating the From for usize impl behind cfg(target_pointer_width = "64").
1-261: Add inline unit tests for round-trip and boundary valuesInline tests help guard the QUIC varint codec invariants (1/2/4/8 byte cases) and align with the repo’s guideline to keep Rust tests in-source.
Here’s a minimal tests module you can drop at the end of this file:
#[cfg(test)] mod tests { use super::*; use bytes::{BufMut, BytesMut}; fn roundtrip(x: u64) -> u64 { let v = VarInt::try_from(x).unwrap(); let mut buf = BytesMut::new(); v.encode(&mut buf); let mut r = &buf[..]; VarInt::decode(&mut r).unwrap().into_inner() } #[test] fn boundaries_roundtrip() { let cases: &[u64] = &[ 0, (1u64 << 6) - 1, // 1-byte max 1u64 << 6, // 2-byte min (1u64 << 14) - 1, // 2-byte max 1u64 << 14, // 4-byte min (1u64 << 30) - 1, // 4-byte max 1u64 << 30, // 8-byte min VarInt::MAX.into_inner(), // 8-byte max ]; for &x in cases { assert_eq!(roundtrip(x), x, "failed roundtrip for {}", x); } } #[test] fn encode_tags_are_correct() { // Spot-check the tag bits for each range. let mut buf = BytesMut::new(); VarInt::from_u32(0).encode(&mut buf); assert_eq!(buf.len(), 1); assert_eq!(buf[0] >> 6, 0b00); buf.clear(); VarInt::try_from((1u64 << 6)).unwrap().encode(&mut buf); assert_eq!(buf.len(), 2); assert_eq!(buf[0] >> 6, 0b01); buf.clear(); VarInt::try_from((1u64 << 14)).unwrap().encode(&mut buf); assert_eq!(buf.len(), 4); assert_eq!(buf[0] >> 6, 0b10); buf.clear(); VarInt::try_from((1u64 << 30)).unwrap().encode(&mut buf); assert_eq!(buf.len(), 8); assert_eq!(buf[0] >> 6, 0b11); } }rs/hang-cli/src/server.rs (3)
69-69: run_session is now tied to QUIC; consider a generic Request to align with the PR’s transport abstraction goalGiven the move toward a generic transport interface (and potential WebSocket compatibility), you could avoid hard-coding web_transport_quinn::Request here by depending on a generic Request type/trait if moq_native exposes one. This keeps hang-cli transport-agnostic without further edits later.
- If moq_native re-exports a transport-agnostic Request or trait (e.g., via web-transport-generic), prefer that.
- Otherwise, this is fine for a QUIC-only CLI; just confirm that this binary is intentionally QUIC-specific.
56-60: Avoid warning on normal session close; treat closure as Ok and log at inforun_session always returns Err(session.closed().await.into()), which makes the spawn wrapper log a warning on normal connection close. That’s noisy. Return Ok on normal close and keep warnings for actual failures.
Apply this diff:
@@ tokio::spawn(async move { if let Err(err) = run_session(id, session, name, consumer).await { tracing::warn!(%err, "failed to accept session"); } }); @@ - Err(session.closed().await.into()) + let reason = session.closed().await; + tracing::info!(id, ?reason, "session closed"); + Ok(())Also applies to: 86-87
108-109: Serve all fingerprints or return a clear error instead of panickingYou’re taking only the first fingerprint and using expect, which will panic if empty. Given the TODO to “serve all of them,” consider:
- Exposing /certificates.sha256 that returns all fingerprints (newline- or JSON-formatted).
- Returning 500 with a clear error if the vector is empty, instead of panicking.
I can draft a small handler to emit all fingerprints if helpful.
rs/moq/src/session/mod.rs (1)
24-28: Make Clone bound on S explicitnew() clones session multiple times; make S: Clone explicit on the impl to avoid accidental usage with non-cloneable sessions and improve error messages.
Apply this diff:
-impl<S: web_transport_generic::Session + Sync> Session<S> { +impl<S: web_transport_generic::Session + Sync + Clone> Session<S> {rs/moq/src/session/publisher.rs (5)
22-25: Consider driving origin.closed() to completion in run().There’s a TODO to block on origin.closed(). As-is, run() only drives run_bi() and can return while the origin is still producing. Consider joining on origin closure to avoid dangling tasks or premature session shutdown.
I can propose a select! wrapper if you want to fold origin.closed() into this entrypoint without changing call sites.
60-71: Collapse duplicate match arms for cancel/cancelled transport for readability.Both Error::Cancel and Error::Transport(_) log “announcing cancelled”. Combine them to reduce duplication.
Apply this diff:
- if let Err(err) = Self::run_announce(&mut stream, &mut origin, &prefix).await { - match &err { - Error::Cancel => { - tracing::debug!(prefix = %origin.absolute(prefix), "announcing cancelled"); - } - Error::Transport(_) => { - tracing::debug!(prefix = %origin.absolute(prefix), "announcing cancelled"); - } - err => { - tracing::warn!(%err, prefix = %origin.absolute(prefix), "announcing error"); - } - } + if let Err(err) = Self::run_announce(&mut stream, &mut origin, &prefix).await { + match &err { + Error::Cancel | Error::Transport(_) => { + tracing::debug!(prefix = %origin.absolute(prefix), "announcing cancelled"); + } + err => tracing::warn!(%err, prefix = %origin.absolute(prefix), "announcing error"), + }
148-156: Collapse duplicate match arms for cancel/cancelled transport.Same duplication here as in announce; compact it.
Apply this diff:
- if let Err(err) = Self::run_subscribe(session, &mut stream, &subscribe, broadcast).await { - match &err { - Error::Cancel => { - tracing::debug!(%id, broadcast = %absolute, %track, "subscribed cancelled") - } - // TODO better classify WebTransport errors. - Error::Transport(_) => { - tracing::debug!(%id, broadcast = %absolute, %track, "subscribed cancelled") - } - err => { - tracing::warn!(%err, %id, broadcast = %absolute, %track, "subscribed error") - } - } + if let Err(err) = Self::run_subscribe(session, &mut stream, &subscribe, broadcast).await { + match &err { + Error::Cancel | Error::Transport(_) => { + tracing::debug!(%id, broadcast = %absolute, %track, "subscribed cancelled") + } + err => tracing::warn!(%err, %id, broadcast = %absolute, %track, "subscribed error"), + }
200-273: Group service scheduling is careful and non-blocking; minor clarity nit.The two-slot pipeline (old/new) with biased select and cancellation by dropping futures is well thought out. Logging includes subscribe id, track, and sequence.
Consider a small comment clarifying that “biased” prioritizes completion of in-flight group sends to reduce HOL blocking, since that’s non-obvious to readers unfamiliar with tokio::select! semantics.
274-319: Avoid shadowing the name ‘stream’ in serve_group for readability.Shadowing the SendStream with Writer named “stream” makes later code harder to scan, especially since elsewhere stream refers to Stream
with writer/reader. Renaming improves clarity without functional change.Apply this refactor:
pub async fn serve_group( session: S, msg: message::Group, priority: i32, mut group: GroupConsumer, ) -> Result<(), Error> { // TODO add a way to open in priority order. - let mut stream = session.open_uni().await.map_err(|err| Error::Transport(err.into()))?; - stream.set_priority(priority); + let mut send = session.open_uni().await.map_err(|err| Error::Transport(err.into()))?; + send.set_priority(priority); - let mut stream = Writer::new(stream); - stream.encode(&message::DataType::Group).await?; - stream.encode(&msg).await?; + let mut writer = Writer::new(send); + writer.encode(&message::DataType::Group).await?; + writer.encode(&msg).await?; loop { let frame = tokio::select! { biased; - _ = stream.closed() => return Err(Error::Cancel), + _ = writer.closed() => return Err(Error::Cancel), frame = group.next_frame() => frame, }; let mut frame = match frame? { Some(frame) => frame, None => break, }; - stream.encode(&frame.info.size).await?; + writer.encode(&frame.info.size).await?; loop { let chunk = tokio::select! { biased; - _ = stream.closed() => return Err(Error::Cancel), + _ = writer.closed() => return Err(Error::Cancel), chunk = frame.read_chunk() => chunk, }; match chunk? { - Some(chunk) => stream.write(&chunk).await?, + Some(chunk) => writer.write(&chunk).await?, None => break, } } } - stream.close().await?; + writer.close().await?; Ok(()) }rs/moq-relay/cfg/leaf.toml (2)
15-19: Mirror the protocol clarification for the leaf web listener.Same suggestion as root: clarify whether this is HTTPS/WSS by changing the comment to “HTTP(S) and WebSocket (TCP) connections” to reflect TLS expectations.
Apply this small comment tweak:
[web] -# Listen for HTTP and WebSocket (TCP) connections on the given address. +# Listen for HTTP(S) and WebSocket (TCP) connections on the given address. # Defaults to disabled if not provided. listen = "[::]:4444"
6-10: Comment/address mismatch for server port.Comment says UDP:4443 but the configured listen is “[::]:4444”. Update the comment to prevent misconfiguration during ops.
[server] -# Listen for QUIC connections on UDP:4443 +# Listen for QUIC connections on UDP:4444 # Sometimes IPv6 causes issues; try 127.0.0.1:4444 instead. listen = "[::]:4444"rs/moq-relay/cfg/dev.toml (1)
18-22: Dev web listener looks good; consider clarifying TLS and keeping dev/public scopes isolated.
- Same minor comment tweak: “HTTP(S) and WebSocket (TCP)”.
- With
auth.public = ""(fully public), this is dev-only as intended. Ensure no dev config is accidentally used in prod.[web] -# Listen for HTTP and WebSocket (TCP) connections on the given address. +# Listen for HTTP(S) and WebSocket (TCP) connections on the given address. # Defaults to disabled if not provided. listen = "[::]:4443"rs/moq-relay/src/config.rs (1)
33-36: Web config defaults and flattening are appropriate; add a quick unit test for config parsing.The
#[serde(default)]and#[command(flatten)]are correct to make the web server optional. To guard against regressions, add an inline test verifying:
- Default:
web.listenis None.- TOML with
[web] listen="..."deserializes as expected.You can drop this into the same file:
#[cfg(test)] mod tests { use super::*; use toml; #[test] fn web_default_disabled() { let cfg: Config = toml::from_str(r#" [server] listen = "[::]:4443" "#).expect("parse"); assert!(cfg.web.listen.is_none()); } #[test] fn web_enabled_from_toml() { let cfg: Config = toml::from_str(r#" [server] listen = "[::]:4443" [web] listen = "[::]:4443" "#).expect("parse"); assert_eq!(cfg.web.listen.map(|a| a.to_string()).as_deref(), Some("[::]:4443")); } }rs/moq-relay/src/main.rs (2)
28-34: Only construct and spawn the web server when configured; also log its address.
Web::newcurrently assumes certificates exist (web.rs calls.first().expect("missing certificate")). Ifweb.listenis None or certs are missing, avoid constructing it and give a clear log. This also avoids starting an idle task when web is disabled.Apply:
- // Create a web server too. - let web = Web::new(WebState { - auth: auth.clone(), - cluster: cluster.clone(), - fingerprints, - config: config.web, - conn_id: Default::default(), - }); - - tokio::spawn(async move { - web.run().await.expect("failed to run web server"); - }); + // Optionally create a web server too. + if let Some(web_addr) = config.web.listen { + tracing::info!(%web_addr, "web: listening"); + let web = Web::new(WebState { + auth: auth.clone(), + cluster: cluster.clone(), + fingerprints, + config: Config { web: WebConfig { listen: Some(web_addr) }, ..config.clone() }.web, + conn_id: Default::default(), + }); + tokio::spawn(async move { + web.run().await.expect("failed to run web server"); + }); + } else { + tracing::info!("web: disabled"); + }Note: If
Configisn’tClone, constructWebStateusingconfig.webbefore this block and gate on it:- let web = Web::new(WebState { ... config: config.web, ... }); + let web_cfg = config.web.clone(); + if let Some(web_addr) = web_cfg.listen { + tracing::info!(%web_addr, "web: listening"); + let web = Web::new(WebState { + auth: auth.clone(), + cluster: cluster.clone(), + fingerprints, + config: web_cfg, + conn_id: Default::default(), + }); + tokio::spawn(async move { + web.run().await.expect("failed to run web server"); + }); + } else { + tracing::info!("web: disabled"); + }
36-38: Task panic on web server failure. Consider graceful error reporting.
expect("failed to run web server")will panic the task. Prefer logging and letting the QUIC server continue, or propagating a shutdown signal if web is critical.- web.run().await.expect("failed to run web server"); + if let Err(err) = web.run().await { + tracing::error!(%err, "web server stopped"); + }rs/moq-relay/src/cluster.rs (2)
69-82: Avoid unwrap() on with_root to prevent panic on bad token/rootwith_root(&token.root).unwrap() will panic if scoping fails (e.g., malformed/unsupported root). Since the method returns Option<...> or Result<...> (depending on moq_lite), prefer mapping failures to None so subscriber() can gracefully return no access rather than crash.
Apply one of the following diffs depending on with_root’s return type:
If with_root returns Option<_>:
- let subscribe_origin = subscribe_origin.producer.with_root(&token.root).unwrap(); - subscribe_origin.consume_only(&token.subscribe) + subscribe_origin + .producer + .with_root(&token.root) + .and_then(|p| p.consume_only(&token.subscribe))If with_root returns Result<_, _>:
- let subscribe_origin = subscribe_origin.producer.with_root(&token.root).unwrap(); - subscribe_origin.consume_only(&token.subscribe) + subscribe_origin + .producer + .with_root(&token.root) + .ok() + .and_then(|p| p.consume_only(&token.subscribe))
84-94: Mirror unwrap() removal in publisher()Same unwrap() concern as subscriber(). Return None when scoping fails instead of panicking.
If with_root returns Option<_>:
- let publish_origin = publish_origin.producer.with_root(&token.root).unwrap(); - publish_origin.publish_only(&token.publish) + publish_origin + .producer + .with_root(&token.root) + .and_then(|p| p.publish_only(&token.publish))If with_root returns Result<_, _>:
- let publish_origin = publish_origin.producer.with_root(&token.root).unwrap(); - publish_origin.publish_only(&token.publish) + publish_origin + .producer + .with_root(&token.root) + .ok() + .and_then(|p| p.publish_only(&token.publish))rs/moq-relay/src/connection.rs (3)
15-23: Consider supporting Authorization header as a fallback to query paramExtracting jwt from the query works for the polyfill. For future parity and better security posture, also consider checking an Authorization: Bearer header if present.
Example sketch:
- let token = self + let mut token = self .request .url() .query_pairs() .find(|(k, _)| k == "jwt") .map(|(_, v)| v.to_string()); + if token.is_none() { + if let Some(h) = self.request.headers().get(axum::http::header::AUTHORIZATION) { + if let Ok(s) = h.to_str() { + if let Some(bearer) = s.strip_prefix("Bearer ") { + token = Some(bearer.to_string()); + } + } + } + }
33-33: Tweak log: don’t log “session accepted” before we validate allowed pathsThis log fires before we ensure there’s at least some allowed publish/subscribe. Consider rewording (e.g., “claims parsed”) or moving the log after the match below.
- tracing::info!(token = ?token, "session accepted"); + tracing::info!(token = ?token, "claims parsed");
38-49: Prefer structured arrays over join() for allowed paths loggingCollecting to Vec and letting tracing format it preserves structure and avoids string joins.
- tracing::info!(root = %token.root, publish = %publish.allowed().map(|p| p.as_str()).collect::<Vec<_>>().join(","), subscribe = %subscribe.allowed().map(|p| p.as_str()).collect::<Vec<_>>().join(","), "session accepted"); + tracing::info!(root = %token.root, publish = ?publish.allowed().map(|p| p.as_str()).collect::<Vec<_>>(), subscribe = ?subscribe.allowed().map(|p| p.as_str()).collect::<Vec<_>>(), "session accepted"); } - (Some(publish), None) => { - tracing::info!(root = %token.root, publish = %publish.allowed().map(|p| p.as_str()).collect::<Vec<_>>().join(","), "publisher accepted"); + (Some(publish), None) => { + tracing::info!(root = %token.root, publish = ?publish.allowed().map(|p| p.as_str()).collect::<Vec<_>>(), "publisher accepted"); } - (None, Some(subscribe)) => { - tracing::info!(root = %token.root, subscribe = %subscribe.allowed().map(|p| p.as_str()).collect::<Vec<_>>().join(","), "subscriber accepted") + (None, Some(subscribe)) => { + tracing::info!(root = %token.root, subscribe = ?subscribe.allowed().map(|p| p.as_str()).collect::<Vec<_>>(), "subscriber accepted") }rs/moq-relay/src/auth.rs (2)
7-21: AuthError is a good move; consider clearer variant messagesThe explicit error enum improves ergonomics. Minor nit: UnexpectedToken’s message “authentication is disabled” can be confusing in contexts where a key simply isn’t configured. Consider clarifying it to “token provided but no key configured”.
- #[error("authentication is disabled")] + #[error("token provided but no key configured")] UnexpectedToken,
22-32: Consider including WWW-Authenticate header in 401 responsesAdding a WWW-Authenticate header helps clients understand auth requirements and aligns with RFC 6750 for bearer tokens.
impl axum::response::IntoResponse for AuthError { fn into_response(self) -> axum::response::Response { - http::StatusCode::UNAUTHORIZED.into_response() + let mut res = http::StatusCode::UNAUTHORIZED.into_response(); + // Hint clients to send a bearer token (adjust realm as appropriate) + res.headers_mut().insert( + axum::http::header::WWW_AUTHENTICATE, + axum::http::HeaderValue::from_static("Bearer realm=\"moq-relay\""), + ); + res } }rs/moq-relay/src/web.rs (1)
121-145: Session bridging looks correct; minor TODO on error swallowingThe axum↔tungstenite adapters intentionally squash errors to ConnectionClosed. Consider bubbling critical errors into logs or metrics to aid debugging.
You could log the axum::Error contents before mapping:
- .sink_map_err(|err| { - tracing::warn!(%err, "WebSocket error"); - tungstenite::Error::ConnectionClosed - }) + .sink_map_err(|err| { + tracing::warn!(%err, "WebSocket sink error"); + tungstenite::Error::ConnectionClosed + })js/justfile (1)
11-11: Switching to non-recursive installs is fine; consider locking CI installs.Using
pnpm installat the workspace root is correct and typically faster than-r install. For reproducible CI, consider adding--frozen-lockfilein thecheckandbuildrecipes so the lockfile is treated as the source of truth.Apply this diff to harden CI installs:
- pnpm install {{flags}} + pnpm install --frozen-lockfile {{flags}}Also applies to: 16-16, 36-36, 53-53
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
⛔ Files ignored due to path filters (2)
js/pnpm-lock.yamlis excluded by!**/pnpm-lock.yamlrs/Cargo.lockis excluded by!**/*.lock
📒 Files selected for processing (30)
js/hang/package.json(0 hunks)js/justfile(3 hunks)js/moq/package.json(1 hunks)js/moq/src/connection.ts(2 hunks)rs/Cargo.toml(2 hunks)rs/hang-cli/src/server.rs(2 hunks)rs/moq-native/Cargo.toml(1 hunks)rs/moq-native/src/client.rs(2 hunks)rs/moq-native/src/lib.rs(1 hunks)rs/moq-native/src/server.rs(2 hunks)rs/moq-relay/Cargo.toml(1 hunks)rs/moq-relay/cfg/dev.toml(1 hunks)rs/moq-relay/cfg/leaf.toml(1 hunks)rs/moq-relay/cfg/root.toml(1 hunks)rs/moq-relay/src/auth.rs(21 hunks)rs/moq-relay/src/cluster.rs(2 hunks)rs/moq-relay/src/config.rs(2 hunks)rs/moq-relay/src/connection.rs(1 hunks)rs/moq-relay/src/main.rs(1 hunks)rs/moq-relay/src/web.rs(4 hunks)rs/moq/Cargo.toml(1 hunks)rs/moq/src/coding/varint.rs(1 hunks)rs/moq/src/error.rs(3 hunks)rs/moq/src/lib.rs(1 hunks)rs/moq/src/session/mod.rs(6 hunks)rs/moq/src/session/publisher.rs(12 hunks)rs/moq/src/session/reader.rs(3 hunks)rs/moq/src/session/stream.rs(2 hunks)rs/moq/src/session/subscriber.rs(13 hunks)rs/moq/src/session/writer.rs(2 hunks)
💤 Files with no reviewable changes (1)
- js/hang/package.json
🚧 Files skipped from review as they are similar to previous changes (7)
- rs/moq-native/Cargo.toml
- rs/moq/src/lib.rs
- rs/moq-relay/Cargo.toml
- rs/moq-native/src/lib.rs
- rs/moq/Cargo.toml
- rs/Cargo.toml
- rs/moq/src/session/stream.rs
🧰 Additional context used
📓 Path-based instructions (4)
rs/**/src/**/*.rs
📄 CodeRabbit Inference Engine (CLAUDE.md)
Rust tests should be integrated within source files (inline unit tests)
Files:
rs/moq/src/coding/varint.rsrs/moq-relay/src/config.rsrs/moq-relay/src/cluster.rsrs/moq-relay/src/auth.rsrs/hang-cli/src/server.rsrs/moq/src/error.rsrs/moq-relay/src/main.rsrs/moq-native/src/client.rsrs/moq/src/session/subscriber.rsrs/moq-native/src/server.rsrs/moq/src/session/mod.rsrs/moq/src/session/reader.rsrs/moq-relay/src/web.rsrs/moq-relay/src/connection.rsrs/moq/src/session/publisher.rsrs/moq/src/session/writer.rs
rs/**
📄 CodeRabbit Inference Engine (CLAUDE.md)
Place Rust crates under the rs/ directory
Files:
rs/moq/src/coding/varint.rsrs/moq-relay/src/config.rsrs/moq-relay/cfg/dev.tomlrs/moq-relay/cfg/leaf.tomlrs/moq-relay/cfg/root.tomlrs/moq-relay/src/cluster.rsrs/moq-relay/src/auth.rsrs/hang-cli/src/server.rsrs/moq/src/error.rsrs/moq-relay/src/main.rsrs/moq-native/src/client.rsrs/moq/src/session/subscriber.rsrs/moq-native/src/server.rsrs/moq/src/session/mod.rsrs/moq/src/session/reader.rsrs/moq-relay/src/web.rsrs/moq-relay/src/connection.rsrs/moq/src/session/publisher.rsrs/moq/src/session/writer.rs
js/**
📄 CodeRabbit Inference Engine (CLAUDE.md)
Place JS/TS packages under the js/ directory
Files:
js/moq/package.jsonjs/justfilejs/moq/src/connection.ts
rs/moq-relay/**
📄 CodeRabbit Inference Engine (CLAUDE.md)
The CDN/relay (rs/moq-relay) must not include application logic, media codecs, or track details
Files:
rs/moq-relay/src/config.rsrs/moq-relay/cfg/dev.tomlrs/moq-relay/cfg/leaf.tomlrs/moq-relay/cfg/root.tomlrs/moq-relay/src/cluster.rsrs/moq-relay/src/auth.rsrs/moq-relay/src/main.rsrs/moq-relay/src/web.rsrs/moq-relay/src/connection.rs
🧬 Code Graph Analysis (11)
rs/moq-relay/src/auth.rs (1)
rs/moq/src/path.rs (10)
from(233-235)from(239-242)from(252-272)new(82-98)as_path(13-13)as_path(17-19)as_path(23-26)as_path(30-32)as_path(36-38)as_path(42-44)
rs/moq/src/error.rs (1)
rs/moq/src/session/mod.rs (1)
new(25-72)
rs/moq-relay/src/main.rs (3)
rs/moq-relay/src/web.rs (2)
new(60-77)new(218-220)rs/moq-relay/src/auth.rs (1)
new(70-88)rs/moq-relay/src/cluster.rs (1)
new(58-67)
rs/moq-native/src/client.rs (2)
rs/moq/src/session/mod.rs (1)
connect(75-84)rs/hang-cli/src/client.rs (1)
connect(23-53)
rs/moq/src/session/subscriber.rs (4)
rs/moq/src/session/mod.rs (2)
new(25-72)stream(144-144)rs/moq/src/session/writer.rs (1)
new(10-15)rs/moq/src/session/reader.rs (1)
new(13-18)rs/moq/src/session/stream.rs (1)
open(10-18)
rs/moq/src/session/mod.rs (3)
rs/moq/src/session/writer.rs (3)
new(10-15)close(51-54)closed(60-63)rs/moq/src/session/reader.rs (2)
new(13-18)closed(77-90)rs/moq/src/session/stream.rs (2)
open(10-18)accept(20-27)
rs/moq/src/session/reader.rs (1)
rs/moq/src/session/writer.rs (2)
new(10-15)closed(60-63)
rs/moq-relay/src/web.rs (5)
rs/moq-relay/src/cluster.rs (3)
get(96-101)new(58-67)run(103-139)rs/moq/src/session/mod.rs (3)
accept(101-112)new(25-72)close(149-151)rs/hang-cli/src/server.rs (3)
accept(39-64)server(14-37)publish(89-102)rs/moq-relay/src/auth.rs (1)
new(70-88)rs/moq-relay/src/connection.rs (1)
run(14-61)
rs/moq-relay/src/connection.rs (1)
rs/moq-relay/src/cluster.rs (1)
run(103-139)
rs/moq/src/session/publisher.rs (5)
rs/moq/src/session/mod.rs (3)
new(25-72)stream(144-144)accept(101-112)rs/moq/src/session/writer.rs (1)
new(10-15)rs/moq/src/session/reader.rs (1)
new(13-18)rs/moq/src/session/subscriber.rs (3)
new(27-35)stream(101-101)stream(267-267)rs/moq/src/session/stream.rs (1)
accept(20-27)
rs/moq/src/session/writer.rs (2)
rs/moq/src/session/mod.rs (3)
new(25-72)stream(144-144)close(149-151)rs/moq/src/session/reader.rs (1)
new(13-18)
🔇 Additional comments (41)
rs/moq-native/src/client.rs (1)
171-174: ALPN mapping for HTTPS is correct.Using web_transport_quinn::ALPN for the "https" scheme is the right selection given the migration to web-transport-quinn.
rs/moq/src/coding/varint.rs (1)
21-21: Adding Debug derive on VarInt is a safe, helpful winThis improves diagnostics without affecting behavior or invariants. The inner field remains private, so no exposure risk.
rs/hang-cli/src/server.rs (1)
7-7: Import migration verified: no stale references or unwanted transport crates
- No occurrences of the old
web_transport::quinnpath remain.- All code imports now use
web_transport_quinn::…(in moq-relay, moq-native, and hang-cli).- Cargo manifests only patch or depend on the new transport crates (
web-transport-quinn,web-transport-generic,web-transport-polyfill); there are no legacyweb-transportentries.Everything looks correct—no further action needed.
rs/moq/src/error.rs (1)
70-70: Code mapping looks consistentMapping Transport to code 4 is consistent with the refactor to a generic transport abstraction.
rs/moq/src/session/reader.rs (1)
7-14: LGTM on the generic RecvStream refactorThe Reader struct and impl bounds over web_transport_generic::RecvStream look clean and keep the same public surface.
rs/moq/src/session/writer.rs (1)
3-7: LGTM on generic SendStream Writer and drop semanticsThe generic Writer over SendStream and abort-on-drop behavior are consistent and desirable for MoQ control streams.
Also applies to: 9-16
rs/moq/src/session/subscriber.rs (1)
141-167: Spawn + state updates look soundThe announce/broadcast spawning and Lock usage avoids data races while keeping per-broadcast tasks isolated. No issues spotted.
rs/moq/src/session/mod.rs (1)
20-22: Generic Session refactor looks cohesiveThe genericization over web_transport_generic::Session and Stream
lifecycles, including connect/accept handshakes and background task orchestration, is consistent with the rest of the PR.Also applies to: 74-84, 100-112, 142-146
rs/moq/src/session/publisher.rs (7)
27-41: Switch to Stream::accept(&self.session) looks correct with generic session.
Accepting bidirectional streams by reference aligns with Stream::accept(session: &S). The control dispatch to Announce/Subscribe is clean.
47-80: Spawn + borrowing pattern for announce path looks sound.
Moving stream and the derived origin into the spawned task avoids clone costs and keeps lifetimes aligned with the stream buffer. Good use of a narrow mutable borrow for self.
82-133: Announce init and incremental updates logic is solid.
Nice use of try_announced() for synchronous init and then announced() for streaming updates. Suffix math and logging look correct.
135-168: Subscribe path spawn and error handling mirror announce; good symmetry.
Decoding Subscribe, capturing absolute path for logs, and deferring work to a spawned task is consistent and clean.
170-199: Responding with SubscribeOk before serving is correct; select! cancellation path is clear.
Returning SubscribeOk with negotiated priority, then racing between run_track and reader.closed() is straightforward. Writer close on exit is good.
322-351: Priority math is coherent and covered by unit tests.
Free function stream_priority has clear comments and tests validate expected packing. Using 24-bit sequence space and noting overflow behavior is helpful.
281-286: Ensure SendStream trait is in scope for set_priority (and Writer’s methods are available).
You’ve imported web_transport_generic::SendStream; that should bring set_priority into scope for the uni stream. If Writer relies on the same trait bounds for encode/close/closed, ensure its impls carry the SendStream bound. If not already enforced in writer.rs, add a where bound there.
I can scan the crate for Writer’s trait bounds and usages to confirm the constraints if you want a quick verification script.
rs/moq-relay/cfg/root.toml (1)
15-19: Clarify TLS Termination and Update Comment
The hyper-serve crate is built with the tls-rustls feature (per Cargo.toml), and you import hyper_serve::accept::DefaultAcceptor as the acceptor for hyper_serve::bind. In practice:
hyper_serve::bind(...)will terminate TLS using the same certificates you supplied via the globaltls.generateortls.certsettings (env vars MOQ_SERVER_TLS_GENERATE and MOQ_SERVER_TLS_CERT), not plain HTTP.
Please update the comment in
rs/moq-relay/cfg/root.toml:15–19
from:
[web]
# Listen for HTTP and WebSocket (TCP) connections on the given address.
listen = "[::]:4443"to something like:
[web]
# Listen for HTTPS and secure WebSocket (WSS) connections on the given address.
# TLS is terminated using the certificates configured via tls.generate/tls.cert above.
listen = "[::]:4443"This makes it clear that connections on 4443 are HTTPS/WSS, avoiding confusion around plain HTTP on a typically‐TLS port.
rs/moq-relay/src/config.rs (1)
4-4: Good integration of WebConfig into the root Config.
Importing WebConfig alongside AuthConfig and ClusterConfig is consistent with the existing config pattern.
rs/moq-relay/src/cluster.rs (2)
8-8: Import of AuthToken is correct and improves clarity
Using the concrete AuthToken type here makes the new per-token helpers explicit and discoverable.
69-95: Origin selection logic looks sound
- Cluster subscribers get primary only; external subscribers get combined.
- Cluster publishers publish to secondary; external publishers to primary.
This enforces the intended separation between local and cluster-originated broadcasts.
rs/moq-relay/src/connection.rs (3)
3-7: Dependency path/type update LGTM
Switching to moq_native::web_transport_quinn and aligning the Request type is consistent with the transport abstraction refactor.
51-58: Session acceptance flow LGTM
Accepting the WebTransport request first and then wiring publish/subscribe into moq_lite::Session::accept matches the new transport abstraction.
25-31: Verify external Request::close signature and semantics
We couldn’t locate the web_transport_quinn::Request::close implementation in this repo—it’s provided by the external web-transport-quinn crate. Please confirm:
- That
Request::closeexists on your version ofweb-transport-quinn. - Its parameter type matches what
err.clone().into()produces (e.g. au32orStatusCode). - It returns a
Futurewhose error type you handle correctly. - It implements a clean WebTransport session termination (not just a stream reset).
If any of these don’t hold, switch to the proper stream or session close/reset methods offered by web-transport-quinn.
rs/moq-relay/src/auth.rs (15)
115-121: Path/root reduction logic is correct and concise
Using Path::new(path) and strip_prefix against claims.root to derive the suffix, then shrinking permissions accordingly, matches the intended nested path semantics.
180-187: Anonymous access tests: adopt Path comparisons consistently
The updates to call verify with a path string look good and the assertions against Path/PathOwned are consistent. LGTM.
202-204: Fully public path test LGTM
Verifies that an empty public root allows any path while preserving empty (“full under new root”) permissions.
220-223: Wrong-prefix anonymous access: assertion matches new IncorrectRoot
The test’s error substring matches the new IncorrectRoot message. LGTM.
282-286: Basic JWT validation test LGTM
Confirms root and allowed prefixes are preserved for a matching path.
334-337: Restricted pub/sub test LGTM
Ensures restricted claims survive verification on the same root.
358-361: Read-only token test LGTM
Confirms empty publish rights are preserved.
382-385: Write-only token test LGTM
Confirms empty subscribe rights are preserved.
407-416: Claims reduction on more specific path LGTM
Verifies that connecting deeper within the root reduces the root while keeping “full under new root” semantics for empty prefixes.
436-443: Publish restrictions reduction LGTM
Correctly strips the deeper connection suffix from publish prefixes.
466-471: Subscribe restrictions reduction LGTM
Correctly strips the deeper connection suffix from subscribe prefixes.
494-509: Loss of access when connecting to a sibling subtree is validated
Tests demonstrate expected privilege loss. LGTM.
531-537: Nested paths reduction LGTM
Ensures nested prefixes are adjusted as intended.
540-546: Further nested reduction LGTM
Correctly demonstrates permission removal when moving into a subtree that doesn’t match the other prefix.
569-589: Preserving read-only and write-only semantics LGTM
Confirms unilateral permissions remain unilateral after reduction. Nicely covered.
rs/moq-relay/src/web.rs (4)
87-119: WebSocket upgrade and polyfill wiring LGTM
Selecting the “webtransport” subprotocol and deferring to the polyfill is clean. The id counter with Relaxed is sufficient for a monotonic tag.
148-168: serve_announced auth/origin flow LGTM
Verifies per-prefix access and returns active broadcast suffixes. Clear and minimal.
281-299: axum_to_tungstenite: OK with caveat
The mapping is fine. Note we drop the actual axum::Error details by converting to ConnectionClosed; we already warn above. Good enough for now.
301-319: tungstenite_to_axum adapter LGTM
Covers relevant message variants; Frame is unreachable as expected. Good.
| "dependencies": { | ||
| "@kixelated/web-transport-polyfill": "file:../../../web-transport-rs/web-transport-polyfill" | ||
| }, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Verification agent
🧩 Analysis chain
file: dependency to an out-of-tree path will break consumers and violates the js/ packaging guideline.
- The dependency points outside
js/(../../../web-transport-rs/web-transport-polyfill), which violates the guideline to place JS/TS packages underjs/. - Publishing
@kixelated/moqwith afile:dependency will fail for consumers not in this repo. This will also cause bundlers to fail resolution at build time when consuming the package from npm.
Prefer a workspace dependency (if the polyfill is part of the monorepo) or a semver dependency to a published package. If the polyfill is optional, consider marking it as an optional dependency or peer dependency and keeping the dynamic import, but it still must be resolvable at build time.
Run this to verify the package’s location and whether it’s a workspace member:
If it’s a workspace member, switch to a workspace dependency:
- "@kixelated/web-transport-polyfill": "file:../../../web-transport-rs/web-transport-polyfill"
+ "@kixelated/web-transport-polyfill": "workspace:*"🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "Searching for @kixelated/web-transport-polyfill..."
rg -nP --glob '!**/node_modules/**' '"name"\s*:\s*"@kixelated/web-transport-polyfill"' -C2 || {
echo "Package @kixelated/web-transport-polyfill not found. Consider adding it under js/ as a workspace package."
exit 1
}
echo
echo "Checking if js/moq is two levels deep and whether ../../../ resolves outside repo..."
# Count path segments to repo root by checking for .git at or above.
if [ -d .git ]; then
ROOT=$(pwd)
else
echo "Run from repo root. Exiting with non-zero."
exit 1
fi
# Show the relative path currently used and whether it exists
TARGET_REL="../../../web-transport-rs/web-transport-polyfill"
MOQ_DIR="js/moq"
if [ -d "$MOQ_DIR" ]; then
pushd "$MOQ_DIR" >/dev/null
echo "From $(pwd), checking $TARGET_REL"
if [ -d "$TARGET_REL" ]; then
echo "Path exists at: $(python - <<'PY'
import os
print(os.path.abspath("../../../web-transport-rs/web-transport-polyfill"))
PY
)"
else
echo "Path does not exist. The relative path is likely incorrect from js/moq."
exit 2
fi
popd >/dev/null
else
echo "js/moq directory not found at expected location."
exit 3
fi
echo
echo "If the polyfill is in-repo, ensure it is declared as a workspace. Searching for pnpm-workspace.yaml or workspaces..."
fd -H -a pnpm-workspace.yaml package.json | xargs -I {} sh -c 'echo "--- {} ---"; cat "{}"'Length of output: 453
Replace out-of-tree file: dependency with a workspace or published package
The file:../../../web-transport-rs/web-transport-polyfill entry in js/moq/package.json points outside the js/ directory and is not declared as a workspace member. This will break both local consumers (bundlers won’t resolve it) and published installs.
• The @kixelated/web-transport-polyfill package isn’t found in your workspace (no “name”: "@kixelated/web-transport-polyfill" under any js/ folder).
• file: dependencies to paths outside js/ violate our packaging guidelines and won’t resolve for npm consumers.
Mandatory refactor:
- If the polyfill lives in this monorepo, move its package under
js/, add it to your workspace (e.g. in pnpm-workspace.yaml or package.json “workspaces”), then depend on it via:"dependencies": {
- "@kixelated/web-transport-polyfill": "file:../../../web-transport-rs/web-transport-polyfill"
- "@kixelated/web-transport-polyfill": "workspace:*"
}
2. Otherwise, publish `@kixelated/web-transport-polyfill` to npm and switch to a semver dependency:
```diff
"dependencies": {
- "@kixelated/web-transport-polyfill": "file:../../../web-transport-rs/web-transport-polyfill"
+ "@kixelated/web-transport-polyfill": "^1.2.3"
}
- If the polyfill is optional, consider marking it as an
optionalDependencyorpeerDependencyand using a dynamic import—but it must still resolve at install/build time.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| "dependencies": { | |
| "@kixelated/web-transport-polyfill": "file:../../../web-transport-rs/web-transport-polyfill" | |
| }, | |
| "dependencies": { | |
| "@kixelated/web-transport-polyfill": "workspace:*" | |
| }, |
🤖 Prompt for AI Agents
In js/moq/package.json around lines 21 to 23, the dependency uses an out-of-tree
file: path ("file:../../../web-transport-rs/web-transport-polyfill") which is
not a workspace member and will break installs and bundlers; fix by either (A)
moving the web-transport-polyfill package into the repo under js/, add it to the
workspace (pnpm-workspace.yaml or package.json "workspaces") and change the
dependency to the workspace package name/version, or (B) publish
@kixelated/web-transport-polyfill to npm and replace the file: entry with a
semver dependency, or (C) if optional, mark it as optionalDependency or
peerDependency and load it dynamically—but ensure the package resolves at
install/build time and the package.json refers to the package name (not an
external file path).
js/moq/src/connection.ts
Outdated
| let polyfill: Promise<typeof import("@kixelated/web-transport-polyfill")>; | ||
| if (typeof globalThis !== "undefined" && !("WebTransport" in globalThis)) { | ||
| polyfill = import("@kixelated/web-transport-polyfill"); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Type-safety: allow polyfill to be undefined; improves narrowing and avoids “used before assignment” issues.
polyfill is only assigned when WebTransport is missing. Make the type Promise<...> | undefined and rely on narrowing. This also improves readability at the usage sites.
-let polyfill: Promise<typeof import("@kixelated/web-transport-polyfill")>;
+let polyfill: Promise<typeof import("@kixelated/web-transport-polyfill")> | undefined;Also applies to: 62-68
🤖 Prompt for AI Agents
In js/moq/src/connection.ts around lines 10-13 (and similarly at 62-68), the
variable `polyfill` is declared as a non-optional Promise but is only assigned
when WebTransport is missing, causing potential "used before assignment" and
narrowing issues; change the declaration to allow undefined (e.g.,
Promise<typeof import("@kixelated/web-transport-polyfill")> | undefined) and
update any usage sites to narrow/check for undefined before awaiting or
accessing it so TypeScript knows the value may be absent.
js/moq/src/connection.ts
Outdated
| // Only perform certificate fetch and URL rewrite when polyfill is not needed | ||
| // This is needed because WebTransport is a butt to work with in local development. | ||
| if (!polyfill && url.protocol === "http:") { | ||
| const fingerprintUrl = new URL(url); | ||
| fingerprintUrl.pathname = "/certificate.sha256"; | ||
| fingerprintUrl.search = ""; | ||
| console.warn(fingerprintUrl.toString(), "performing an insecure fingerprint fetch; use https:// in production"); | ||
|
|
||
| // Fetch the fingerprint from the server. | ||
| const fingerprint = await fetch(fingerprintUrl); | ||
| const fingerprintText = await fingerprint.text(); | ||
|
|
||
| options.serverCertificateHashes = [ | ||
| { | ||
| algorithm: "sha-256", | ||
| value: Hex.toBytes(fingerprintText), | ||
| }, | ||
| ]; | ||
|
|
||
| adjustedUrl = new URL(url); | ||
| adjustedUrl.protocol = "https:"; | ||
| } | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Verification agent
🧩 Analysis chain
Harden fingerprint fetch: check HTTP status and trim input before decoding.
fetch may fail or return non-200. Also, certificate files often include a newline; trim before Hex.toBytes to avoid parsing errors.
Apply this diff to improve robustness:
- // Fetch the fingerprint from the server.
- const fingerprint = await fetch(fingerprintUrl);
- const fingerprintText = await fingerprint.text();
-
- options.serverCertificateHashes = [
- {
- algorithm: "sha-256",
- value: Hex.toBytes(fingerprintText),
- },
- ];
+ // Fetch the fingerprint from the server.
+ const resp = await fetch(fingerprintUrl);
+ if (!resp.ok) {
+ throw new Error(`Failed to fetch certificate fingerprint: ${resp.status} ${resp.statusText}`);
+ }
+ const fingerprintText = (await resp.text()).trim();
+
+ options.serverCertificateHashes = [
+ {
+ algorithm: "sha-256",
+ value: Hex.toBytes(fingerprintText),
+ },
+ ];🏁 Script executed:
#!/bin/bash
# Show the fingerprint fetch snippet for verification
rg -nC3 "fetch(" js/moq/src/connection.tsLength of output: 106
🏁 Script executed:
#!/bin/bash
# Show surrounding context for the fingerprint fetch snippet
rg -nC5 "fingerprintUrl" js/moq/src/connection.ts
# Verify the fetch call with escaped parenthesis
rg -nC3 'fetch\(' js/moq/src/connection.tsLength of output: 1171
Harden fingerprint fetch: check HTTP status and trim response
To avoid silent failures or parse errors when fetching the certificate fingerprint, verify resp.ok before reading the body and trim any trailing newline or whitespace from the text.
Locations to update:
- js/moq/src/connection.ts, around lines 47–55
Proposed diff:
- // Fetch the fingerprint from the server.
- const fingerprint = await fetch(fingerprintUrl);
- const fingerprintText = await fingerprint.text();
-
- options.serverCertificateHashes = [
- {
- algorithm: "sha-256",
- value: Hex.toBytes(fingerprintText),
- },
- ];
+ // Fetch the fingerprint from the server.
+ const resp = await fetch(fingerprintUrl);
+ if (!resp.ok) {
+ throw new Error(`Failed to fetch certificate fingerprint: ${resp.status} ${resp.statusText}`);
+ }
+ const fingerprintText = (await resp.text()).trim();
+
+ options.serverCertificateHashes = [
+ {
+ algorithm: "sha-256",
+ value: Hex.toBytes(fingerprintText),
+ },
+ ];📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| // Only perform certificate fetch and URL rewrite when polyfill is not needed | |
| // This is needed because WebTransport is a butt to work with in local development. | |
| if (!polyfill && url.protocol === "http:") { | |
| const fingerprintUrl = new URL(url); | |
| fingerprintUrl.pathname = "/certificate.sha256"; | |
| fingerprintUrl.search = ""; | |
| console.warn(fingerprintUrl.toString(), "performing an insecure fingerprint fetch; use https:// in production"); | |
| // Fetch the fingerprint from the server. | |
| const fingerprint = await fetch(fingerprintUrl); | |
| const fingerprintText = await fingerprint.text(); | |
| options.serverCertificateHashes = [ | |
| { | |
| algorithm: "sha-256", | |
| value: Hex.toBytes(fingerprintText), | |
| }, | |
| ]; | |
| adjustedUrl = new URL(url); | |
| adjustedUrl.protocol = "https:"; | |
| } | |
| // Only perform certificate fetch and URL rewrite when polyfill is not needed | |
| // This is needed because WebTransport is a butt to work with in local development. | |
| if (!polyfill && url.protocol === "http:") { | |
| const fingerprintUrl = new URL(url); | |
| fingerprintUrl.pathname = "/certificate.sha256"; | |
| fingerprintUrl.search = ""; | |
| console.warn(fingerprintUrl.toString(), "performing an insecure fingerprint fetch; use https:// in production"); | |
| // Fetch the fingerprint from the server. | |
| const resp = await fetch(fingerprintUrl); | |
| if (!resp.ok) { | |
| throw new Error(`Failed to fetch certificate fingerprint: ${resp.status} ${resp.statusText}`); | |
| } | |
| const fingerprintText = (await resp.text()).trim(); | |
| options.serverCertificateHashes = [ | |
| { | |
| algorithm: "sha-256", | |
| value: Hex.toBytes(fingerprintText), | |
| }, | |
| ]; | |
| adjustedUrl = new URL(url); | |
| adjustedUrl.protocol = "https:"; | |
| } |
🤖 Prompt for AI Agents
In js/moq/src/connection.ts around lines 39 to 61, the fingerprint fetch must
verify the HTTP status and trim the response before parsing; after awaiting
fetch(fingerprintUrl) check resp.ok and throw or handle an error including
resp.status (so failures don't silently continue), then read the body with await
resp.text() and call .trim() to remove newlines/whitespace before passing the
value into Hex.toBytes and setting options.serverCertificateHashes; keep the
rest of the URL adjustment logic unchanged.
| tls.alpn_protocols = vec![ | ||
| web_transport::quinn::ALPN.as_bytes().to_vec(), | ||
| web_transport_quinn::ALPN.as_bytes().to_vec(), | ||
| moq_lite::ALPN.as_bytes().to_vec(), | ||
| ]; | ||
| tls.key_log = Arc::new(rustls::KeyLogFile::new()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Verification agent
🧩 Analysis chain
Do not advertise moq_lite::ALPN unless it’s actually handled.
The server advertises both web_transport_quinn::ALPN and moq_lite::ALPN, but accept_session rejects moq_lite::ALPN. This leads to successful TLS ALPN negotiation followed by an application-level bail, causing confusing failures.
If raw QUIC isn’t supported yet, remove moq_lite::ALPN from the advertised list to avoid false negotiation:
- tls.alpn_protocols = vec![
- web_transport_quinn::ALPN.as_bytes().to_vec(),
- moq_lite::ALPN.as_bytes().to_vec(),
- ];
+ tls.alpn_protocols = vec![
+ web_transport_quinn::ALPN.as_bytes().to_vec(),
+ // TODO: Re-add moq_lite::ALPN when raw QUIC support is implemented in accept_session.
+ ];Verify the inconsistency (advertised vs accepted ALPN) across the codebase:
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "Where server advertises ALPNs:"
rg -n -C2 'alpn_protocols\s*=\s*vec!\[' --type=rust
echo
echo "Where server accepts ALPNs:"
rg -n -C3 'match\s+alpn\.as_str\(\)' --type=rust
echo
echo "References to moq_lite::ALPN in server code:"
rg -n -C2 'moq_lite::ALPN' rs/moq-native/src/server.rsLength of output: 1763
Remove moq_lite::ALPN from advertised ALPNs until raw QUIC support is implemented
The server currently advertises both web_transport_quinn::ALPN and moq_lite::ALPN but only handles web_transport_quinn::ALPN in its ALPN match. A client negotiating moq_lite::ALPN will succeed in TLS but then be immediately rejected at the application layer.
• File rs/moq-native/src/server.rs, lines 105–109
- tls.alpn_protocols = vec![
- web_transport_quinn::ALPN.as_bytes().to_vec(),
- moq_lite::ALPN.as_bytes().to_vec(),
- ];
+ tls.alpn_protocols = vec![
+ web_transport_quinn::ALPN.as_bytes().to_vec(),
+ // TODO: re-add moq_lite::ALPN when raw QUIC support is implemented
+ ];Alternatively, if raw QUIC is ready to be supported, add a branch for moq_lite::ALPN in the ALPN match at line 186.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| tls.alpn_protocols = vec![ | |
| web_transport::quinn::ALPN.as_bytes().to_vec(), | |
| web_transport_quinn::ALPN.as_bytes().to_vec(), | |
| moq_lite::ALPN.as_bytes().to_vec(), | |
| ]; | |
| tls.key_log = Arc::new(rustls::KeyLogFile::new()); | |
| tls.alpn_protocols = vec![ | |
| web_transport_quinn::ALPN.as_bytes().to_vec(), | |
| // TODO: re-add moq_lite::ALPN when raw QUIC support is implemented | |
| ]; | |
| tls.key_log = Arc::new(rustls::KeyLogFile::new()); |
🤖 Prompt for AI Agents
In rs/moq-native/src/server.rs around lines 105–109, the TLS ALPN list
advertises both web_transport_quinn::ALPN and moq_lite::ALPN even though the
application only handles web_transport_quinn::ALPN (mismatch at the ALPN match
near line 186), causing clients negotiating moq_lite::ALPN to be TLS-successful
but rejected by the app; fix by removing moq_lite::ALPN from tls.alpn_protocols
so only web_transport_quinn::ALPN is advertised, or if raw QUIC support is
ready, add a handling branch in the ALPN match at line 186 to accept and route
moq_lite::ALPN paths (including any protocol-specific setup) so advertised ALPNs
match implemented logic.
| let app = Router::new() | ||
| .route("/certificate.sha256", get(fingerprint)) | ||
| .route( | ||
| "/announced", | ||
| get({ | ||
| let cluster = config.cluster.clone(); | ||
| move || serve_announced(Path("".to_string()), cluster.clone()) | ||
| }), | ||
| ) | ||
| .route( | ||
| "/announced/{*prefix}", | ||
| get({ | ||
| let cluster = config.cluster.clone(); | ||
| move |path| serve_announced(path, cluster) | ||
| }), | ||
| ) | ||
| .route( | ||
| "/fetch/{*path}", | ||
| get({ | ||
| let cluster = config.cluster.clone(); | ||
| move |path| serve_fetch(path, cluster) | ||
| }), | ||
| ) | ||
| .layer(CorsLayer::new().allow_origin(Any).allow_methods([Method::GET])); | ||
|
|
||
| let server = hyper_serve::bind(config.bind); | ||
| .route("/announced", get(serve_announced)) | ||
| .route("/announced/{*prefix}", get(serve_announced)) | ||
| .route("/fetch/{*path}", get(serve_fetch)) | ||
| .route("/ws/{*path}", any(serve_ws)) | ||
| .layer(CorsLayer::new().allow_origin(Any).allow_methods([Method::GET])) | ||
| .with_state(Arc::new(state)); | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/certificate.sha256 handler won’t compile as-is; needs a closure handler
Passing a String directly to get(...) is invalid; get expects a handler function. Use a closure that returns the fingerprint.
- let app = Router::new()
- .route("/certificate.sha256", get(fingerprint))
+ let fp = fingerprint.clone();
+ let app = Router::new()
+ .route("/certificate.sha256", get(move || async move { fp.clone() }))
.route("/announced", get(serve_announced))
.route("/announced/{*prefix}", get(serve_announced))
.route("/fetch/{*path}", get(serve_fetch))
.route("/ws/{*path}", any(serve_ws))
.layer(CorsLayer::new().allow_origin(Any).allow_methods([Method::GET]))
.with_state(Arc::new(state));📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| let app = Router::new() | |
| .route("/certificate.sha256", get(fingerprint)) | |
| .route( | |
| "/announced", | |
| get({ | |
| let cluster = config.cluster.clone(); | |
| move || serve_announced(Path("".to_string()), cluster.clone()) | |
| }), | |
| ) | |
| .route( | |
| "/announced/{*prefix}", | |
| get({ | |
| let cluster = config.cluster.clone(); | |
| move |path| serve_announced(path, cluster) | |
| }), | |
| ) | |
| .route( | |
| "/fetch/{*path}", | |
| get({ | |
| let cluster = config.cluster.clone(); | |
| move |path| serve_fetch(path, cluster) | |
| }), | |
| ) | |
| .layer(CorsLayer::new().allow_origin(Any).allow_methods([Method::GET])); | |
| let server = hyper_serve::bind(config.bind); | |
| .route("/announced", get(serve_announced)) | |
| .route("/announced/{*prefix}", get(serve_announced)) | |
| .route("/fetch/{*path}", get(serve_fetch)) | |
| .route("/ws/{*path}", any(serve_ws)) | |
| .layer(CorsLayer::new().allow_origin(Any).allow_methods([Method::GET])) | |
| .with_state(Arc::new(state)); | |
| let fp = fingerprint.clone(); | |
| let app = Router::new() | |
| .route("/certificate.sha256", get(move || async move { fp.clone() })) | |
| .route("/announced", get(serve_announced)) | |
| .route("/announced/{*prefix}", get(serve_announced)) | |
| .route("/fetch/{*path}", get(serve_fetch)) | |
| .route("/ws/{*path}", any(serve_ws)) | |
| .layer(CorsLayer::new().allow_origin(Any).allow_methods([Method::GET])) | |
| .with_state(Arc::new(state)); |
🤖 Prompt for AI Agents
In rs/moq-relay/src/web.rs around lines 66 to 74, the route for
"/certificate.sha256" is passing a String directly to get(...), which is invalid
because get expects a handler; change it to a closure-based handler that returns
the fingerprint (e.g., an async closure that captures or clones the String and
returns it as a response), or reference an existing async handler function that
returns the fingerprint wrapped as an axum response; ensure the closure's
signature is compatible with axum (async, returns impl IntoResponse) and that
any captured String is cloned or moved appropriately to satisfy lifetimes.
🛠️ Refactor suggestion
Route/matcher mismatch: serve_announced expects Path but “/announced” has no parameter
Calling GET /announced will fail extraction for Path. Either remove the non-parameter route or change the handler to accept an optional prefix and default to empty.
Option A: Use a single wildcard route and drop the no-param route.
- .route("/announced", get(serve_announced))
- .route("/announced/{*prefix}", get(serve_announced))
+ .route("/announced/*prefix", get(serve_announced))Option B: Make the handler accept Option and default to empty:
-async fn serve_announced(
- Path(prefix): Path<String>,
+async fn serve_announced(
+ Path(prefix): Path<Option<String>>,
Query(params): Query<Params>,
State(state): State<Arc<WebState>>,
) -> axum::response::Result<String> {
- let token = state.auth.verify(&prefix, params.jwt.as_deref())?;
+ let prefix = prefix.unwrap_or_default();
+ let token = state.auth.verify(&prefix, params.jwt.as_deref())?;And update the routes to only one wildcard route as in Option A.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| let app = Router::new() | |
| .route("/certificate.sha256", get(fingerprint)) | |
| .route( | |
| "/announced", | |
| get({ | |
| let cluster = config.cluster.clone(); | |
| move || serve_announced(Path("".to_string()), cluster.clone()) | |
| }), | |
| ) | |
| .route( | |
| "/announced/{*prefix}", | |
| get({ | |
| let cluster = config.cluster.clone(); | |
| move |path| serve_announced(path, cluster) | |
| }), | |
| ) | |
| .route( | |
| "/fetch/{*path}", | |
| get({ | |
| let cluster = config.cluster.clone(); | |
| move |path| serve_fetch(path, cluster) | |
| }), | |
| ) | |
| .layer(CorsLayer::new().allow_origin(Any).allow_methods([Method::GET])); | |
| let server = hyper_serve::bind(config.bind); | |
| .route("/announced", get(serve_announced)) | |
| .route("/announced/{*prefix}", get(serve_announced)) | |
| .route("/fetch/{*path}", get(serve_fetch)) | |
| .route("/ws/{*path}", any(serve_ws)) | |
| .layer(CorsLayer::new().allow_origin(Any).allow_methods([Method::GET])) | |
| .with_state(Arc::new(state)); | |
| let app = Router::new() | |
| .route("/certificate.sha256", get(fingerprint)) | |
| - .route("/announced", get(serve_announced)) | |
| .route("/announced/*prefix", get(serve_announced)) | |
| .route("/fetch/{*path}", get(serve_fetch)) | |
| .route("/ws/{*path}", any(serve_ws)) | |
| .layer(CorsLayer::new().allow_origin(Any).allow_methods([Method::GET])) | |
| .with_state(Arc::new(state)); |
🤖 Prompt for AI Agents
In rs/moq-relay/src/web.rs around lines 66 to 74, the route "/announced" is
registered but the handler serve_announced expects Path<String>, causing
extraction failures when no path parameter is present; fix by either (A)
removing the non-parameter route and keeping only the wildcard route
.route("/announced/{*prefix}", get(serve_announced)) so all requests map to the
same handler, or (B) change serve_announced to accept Option<String> (or
Path<Option<String>>) and treat None as an empty prefix, then register only the
wildcard route; implement one of these two changes and remove the duplicate
"/announced" registration so GET /announced no longer fails extraction.
| match res { | ||
| Err(Error::WebTransport(web_transport::Error::Session(_))) => { | ||
| Err(Error::Transport(_)) => { | ||
| tracing::info!("session terminated"); | ||
| session.close(1, ""); | ||
| } | ||
| Err(err) => { | ||
| tracing::warn!(%err, "session error"); | ||
| session.close(err.to_code(), &err.to_string()); | ||
| session.close(err.to_code(), err.to_string().as_ref()); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Verification agent
🧩 Analysis chain
Align Transport conversions with Arc and consider a blanket From
Update the remaining conversions to produce Error::Transport(Arc::new(e)) where applicable, and consider a blanket From to reduce repetition.
Apply this diff:
- Err(Error::Transport(_)) => {
+ Err(Error::Transport(_)) => {
tracing::info!("session terminated");
session.close(1, "");
}
Err(err) => {
tracing::warn!(%err, "session error");
- session.close(err.to_code(), err.to_string().as_ref());
+ session.close(err.to_code(), err.to_string().as_ref());
}And here:
- Error::Transport(self.transport.closed().await.into())
+ Error::Transport(std::sync::Arc::new(self.transport.closed().await))To ensure you catch all remaining e.into() usage that constructs Error::Transport with Box, run:
Also applies to: 80-84, 100-107, 149-156
🏁 Script executed:
#!/bin/bash
# Find all sites using Error::Transport(e.into())
rg -nP --type=rust 'Error::Transport\s*\(\s*[^)]*\.into\(\)\s*\)'Length of output: 1175
Align all Transport errors to use Arc and reduce repetition
We found eleven occurrences of Error::Transport(e.into()) across the session modules. Each should be updated to wrap the error in an Arc (and import it), for example:
- .map_err(|e| Error::Transport(e.into()))?
+ .map_err(|e| Error::Transport(std::sync::Arc::new(e)))?Occurrences:
- rs/moq/src/session/writer.rs:39, 46, 52, 61
- rs/moq/src/session/stream.rs:11, 21
- rs/moq/src/session/subscriber.rs:51
- rs/moq/src/session/publisher.rs:281
- rs/moq/src/session/reader.rs:44, 73, 83
Plus the site in mod.rs and:
- Error::Transport(self.transport.closed().await.into())
+ Error::Transport(std::sync::Arc::new(self.transport.closed().await))To DRY this pattern, consider adding a blanket From<E> impl:
use std::sync::Arc;
use std::error::Error as StdErr;
impl<E> From<E> for Error
where
E: Into<Arc<dyn StdErr + Send + Sync + 'static>>,
{
fn from(e: E) -> Self {
Error::Transport(e.into())
}
}With that in place you can write:
.map_err(Error::from)?and eliminate the repetitive closures.
🤖 Prompt for AI Agents
In rs/moq/src/session/mod.rs around lines 49 to 57, the match arm handling
transport errors must use Arc-wrapped errors consistently and remove repetitive
manual conversions; add an import for std::sync::Arc and a blanket From<E> for
Error (where E: Into<Arc<dyn std::error::Error + Send + Sync + 'static>>) that
constructs Error::Transport(e.into()), then replace occurrences of
Error::Transport(e.into()) (and other manual closures) across the listed session
files with Error::from or use .map_err(Error::from)? to keep code DRY and ensure
all transport errors are wrapped in Arc.
| if self | ||
| .stream | ||
| .read_buf(&mut self.buffer) | ||
| .await | ||
| .map_err(|e| Error::Transport(e.into()))? | ||
| .is_none() | ||
| { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Update Transport conversions to Arc for consistency with Error
If Error::Transport holds Arc, update conversions accordingly.
Apply this diff:
- if self
- .stream
- .read_buf(&mut self.buffer)
- .await
- .map_err(|e| Error::Transport(e.into()))?
- .is_none()
+ if self
+ .stream
+ .read_buf(&mut self.buffer)
+ .await
+ .map_err(|e| Error::Transport(std::sync::Arc::new(e)))?
+ .is_none()📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| if self | |
| .stream | |
| .read_buf(&mut self.buffer) | |
| .await | |
| .map_err(|e| Error::Transport(e.into()))? | |
| .is_none() | |
| { | |
| if self | |
| .stream | |
| .read_buf(&mut self.buffer) | |
| .await | |
| .map_err(|e| Error::Transport(std::sync::Arc::new(e)))? | |
| .is_none() | |
| { |
🤖 Prompt for AI Agents
In rs/moq/src/session/reader.rs around lines 40 to 46, the code maps transport
errors into Error::Transport but Error::Transport now wraps the inner transport
error in Arc; update the mapping to construct an Arc around the converted error
(e.g. map_err(|e| Error::Transport(Arc::new(e.into()))) ), add or ensure use of
std::sync::Arc is imported, and update any other transport-to-Error conversions
in this function to wrap with Arc similarly so types match.
| if self.buffer.is_empty() | ||
| && self | ||
| .stream | ||
| .read_buf(&mut self.buffer) | ||
| .await | ||
| .map_err(|e| Error::Transport(e.into()))? | ||
| .is_none() | ||
| { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Use Arc in closed() transport error mapping
Align with Error::Transport(Arc<_>).
- if self.buffer.is_empty()
- && self
- .stream
- .read_buf(&mut self.buffer)
- .await
- .map_err(|e| Error::Transport(e.into()))?
- .is_none()
+ if self.buffer.is_empty()
+ && self
+ .stream
+ .read_buf(&mut self.buffer)
+ .await
+ .map_err(|e| Error::Transport(std::sync::Arc::new(e)))?
+ .is_none()📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| if self.buffer.is_empty() | |
| && self | |
| .stream | |
| .read_buf(&mut self.buffer) | |
| .await | |
| .map_err(|e| Error::Transport(e.into()))? | |
| .is_none() | |
| { | |
| if self.buffer.is_empty() | |
| && self | |
| .stream | |
| .read_buf(&mut self.buffer) | |
| .await | |
| .map_err(|e| Error::Transport(std::sync::Arc::new(e)))? | |
| .is_none() | |
| { |
🤖 Prompt for AI Agents
In rs/moq/src/session/reader.rs around lines 78 to 85, the transport error
mapping uses Error::Transport(e.into()) but the Error::Transport variant expects
an Arc-wrapped error; change the mapping to wrap the converted error in an Arc
(e.g. Arc::new(e.into())) and add/use std::sync::Arc if not already imported so
the mapped error becomes Error::Transport(Arc::new(e.into())).
| async fn run_uni(self) -> Result<(), Error> { | ||
| loop { | ||
| let stream = Reader::accept(&mut self.session).await?; | ||
| let stream = self | ||
| .session | ||
| .accept_uni() | ||
| .await | ||
| .map_err(|err| Error::Transport(err.into()))?; | ||
|
|
||
| let stream = Reader::new(stream); | ||
| let this = self.clone(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Wrap transport errors with Arc
To align with Error::Transport(Arc<_>), update this conversion.
- let stream = self
- .session
- .accept_uni()
- .await
- .map_err(|err| Error::Transport(err.into()))?;
+ let stream = self
+ .session
+ .accept_uni()
+ .await
+ .map_err(|err| Error::Transport(std::sync::Arc::new(err)))?;🤖 Prompt for AI Agents
In rs/moq/src/session/subscriber.rs around lines 45 to 54, the map_err currently
converts transport errors to Error::Transport(err.into()) but the
Error::Transport variant expects an Arc-wrapped value; update the conversion to
wrap the transported error in an Arc (e.g.
Error::Transport(Arc::new(err.into()))) and ensure std::sync::Arc is imported or
referenced, so the error type matches Error::Transport(Arc<_>).
| self.stream | ||
| .write_buf(&mut self.buffer) | ||
| .await | ||
| .map_err(|e| Error::Transport(e.into()))?; | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Switch Transport conversions to Arc to match Error
Update all Error::Transport(e.into()) to wrap errors with Arc::new.
Apply this diff:
- self.stream
- .write_buf(&mut self.buffer)
- .await
- .map_err(|e| Error::Transport(e.into()))?;
+ self.stream
+ .write_buf(&mut self.buffer)
+ .await
+ .map_err(|e| Error::Transport(std::sync::Arc::new(e)))?;- self.stream.write(buf).await.map_err(|e| Error::Transport(e.into()))?;
+ self.stream
+ .write(buf)
+ .await
+ .map_err(|e| Error::Transport(std::sync::Arc::new(e)))?;- self.stream.finish().await.map_err(|e| Error::Transport(e.into()))?;
+ self.stream
+ .finish()
+ .await
+ .map_err(|e| Error::Transport(std::sync::Arc::new(e)))?;- self.stream.closed().await.map_err(|e| Error::Transport(e.into()))?;
+ self.stream
+ .closed()
+ .await
+ .map_err(|e| Error::Transport(std::sync::Arc::new(e)))?;Also applies to: 45-47, 52-53, 61-62
🤖 Prompt for AI Agents
In rs/moq/src/session/writer.rs around lines 36-40 (and also update the same
pattern at 45-47, 52-53, 61-62), replace occurrences of
Error::Transport(e.into()) with Error::Transport(Arc::new(e.into())) so the
transported error is wrapped in an Arc to match the Error enum variant; also
ensure std::sync::Arc is imported at the top of the file (add use
std::sync::Arc; if missing).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
♻️ Duplicate comments (2)
rs/moq/src/session/subscriber.rs (1)
47-53: Wrap transport errors in Arc to match Error::Transport(Arc<_>)Error::Transport expects an Arc-wrapped error; using
err.into()likely mismatches the enum variant and/or loses sharing semantics.Apply this diff:
- .await - .map_err(|err| Error::Transport(err.into()))?; + .await + .map_err(|err| Error::Transport(std::sync::Arc::new(err)))?;rs/moq/src/session/writer.rs (1)
36-39: Switch Transport conversions to Arc for consistency with Error::TransportTo align with the Error enum (Transport(Arc<_>)), wrap transport errors with Arc::new instead of using
.into().Apply these diffs:
self.stream .write_buf(&mut self.buffer) .await - .map_err(|e| Error::Transport(e.into()))?; + .map_err(|e| Error::Transport(std::sync::Arc::new(e)))?;- self.stream.write(buf).await.map_err(|e| Error::Transport(e.into()))?; + self.stream + .write(buf) + .await + .map_err(|e| Error::Transport(std::sync::Arc::new(e)))?;- self.stream.finish().await.map_err(|e| Error::Transport(e.into()))?; + self.stream + .finish() + .await + .map_err(|e| Error::Transport(std::sync::Arc::new(e)))?;- self.stream.closed().await.map_err(|e| Error::Transport(e.into()))?; + self.stream + .closed() + .await + .map_err(|e| Error::Transport(std::sync::Arc::new(e)))?;Also applies to: 46-46, 52-52, 61-61
🧹 Nitpick comments (1)
rs/moq/src/session/subscriber.rs (1)
141-142: Action Required: Remove or Populate the UnusedbroadcastsMap inSubscriberIt looks like
self.broadcastsis only ever removed (in run_broadcast, line 171) but never populated, making it dead state. You should either:• Remove the bookkeeping entirely
– Delete thebroadcastsfield from theSubscriberstruct inrs/moq/src/session/subscriber.rs
– Remove the line
rust self.broadcasts.lock().remove(&path);
at line 171• Or wire it up consistently
– Before spawning, insert the new broadcast into the map, e.g.:
rust self.broadcasts .lock() .insert(path.clone(), broadcast.producer.clone()); web_async::spawn(self.clone().run_broadcast(path, broadcast.producer));
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (3)
rs/moq/src/session/publisher.rs(12 hunks)rs/moq/src/session/subscriber.rs(13 hunks)rs/moq/src/session/writer.rs(2 hunks)
🧰 Additional context used
📓 Path-based instructions (2)
rs/**/src/**/*.rs
📄 CodeRabbit Inference Engine (CLAUDE.md)
Rust tests should be integrated within source files (inline unit tests)
Files:
rs/moq/src/session/writer.rsrs/moq/src/session/publisher.rsrs/moq/src/session/subscriber.rs
rs/**
📄 CodeRabbit Inference Engine (CLAUDE.md)
Place Rust crates under the rs/ directory
Files:
rs/moq/src/session/writer.rsrs/moq/src/session/publisher.rsrs/moq/src/session/subscriber.rs
🧬 Code Graph Analysis (3)
rs/moq/src/session/writer.rs (2)
rs/moq/src/session/mod.rs (2)
new(25-72)stream(144-144)rs/moq/src/session/reader.rs (1)
new(13-18)
rs/moq/src/session/publisher.rs (3)
rs/moq/src/session/writer.rs (2)
new(10-15)finish(51-54)rs/moq/src/session/mod.rs (3)
new(25-72)stream(144-144)accept(101-112)rs/moq/src/session/stream.rs (1)
accept(20-27)
rs/moq/src/session/subscriber.rs (3)
rs/moq/src/session/writer.rs (1)
new(10-15)rs/moq/src/session/reader.rs (1)
new(13-18)rs/moq/src/session/stream.rs (1)
open(10-18)
🔇 Additional comments (1)
rs/moq/src/session/publisher.rs (1)
337-340: LGTM: priority calculation test matches implementationThe test asserts match the stream_priority logic and document the inversion for Quinn priorities. Good coverage for boundary behavior.
| let mut stream = session.open_uni().await.map_err(|err| Error::Transport(err.into()))?; | ||
| stream.set_priority(priority); | ||
|
|
||
| let mut stream = Writer::new(stream); | ||
| stream.encode(&message::DataType::Group).await?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Wrap transport error from open_uni with Arc
Maintain consistency with Error::Transport(Arc<_>) by wrapping the error with Arc.
- let mut stream = session.open_uni().await.map_err(|err| Error::Transport(err.into()))?;
+ let mut stream = session
+ .open_uni()
+ .await
+ .map_err(|err| Error::Transport(std::sync::Arc::new(err)))?;📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| let mut stream = session.open_uni().await.map_err(|err| Error::Transport(err.into()))?; | |
| stream.set_priority(priority); | |
| let mut stream = Writer::new(stream); | |
| stream.encode(&message::DataType::Group).await?; | |
| let mut stream = session | |
| .open_uni() | |
| .await | |
| .map_err(|err| Error::Transport(std::sync::Arc::new(err)))?; | |
| stream.set_priority(priority); | |
| let mut stream = Writer::new(stream); | |
| stream.encode(&message::DataType::Group).await?; |
🤖 Prompt for AI Agents
In rs/moq/src/session/publisher.rs around lines 281 to 285, the map_err closure
wraps the transport error as Error::Transport(err.into()) but the
Error::Transport variant expects an Arc-wrapped error; change the mapping to
wrap the converted error in Arc (e.g. Error::Transport(Arc::new(err.into()))),
and add use std::sync::Arc if not already imported.
|
Going to make a new PR for fresh comments. |
Going to try making a WebSocket compatibility layer.
Summary by CodeRabbit
New Features
Refactor
Chores