Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 3 additions & 4 deletions rs/hang-cli/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,14 @@ use url::Url;
pub async fn client(config: moq_native::ClientConfig, url: Url, name: String, publish: Publish) -> anyhow::Result<()> {
let client = config.init()?;

tracing::info!(%url, %name, "connecting");
let session = client.connect(url).await?;

// Create an origin producer to publish to the broadcast.
let origin = moq_lite::Origin::produce();
origin.producer.publish_broadcast(&name, publish.consume());

tracing::info!(%url, %name, "connecting");

// Establish the connection, not providing a subscriber.
let session = moq_lite::Session::connect(session, origin.consumer, None).await?;
let session = client.connect_with_fallback(url, origin.consumer, None).await?;

#[cfg(unix)]
// Notify systemd that we're ready.
Expand Down
8 changes: 2 additions & 6 deletions rs/hang-cli/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,16 +74,12 @@ async fn run_session(
name: String,
consumer: moq_lite::BroadcastConsumer,
) -> anyhow::Result<()> {
// Blindly accept the session (WebTransport or QUIC), regardless of the URL.
let session = session.ok().await.context("failed to accept session")?;

// Create an origin producer to publish to the broadcast.
let origin = moq_lite::Origin::produce();
origin.producer.publish_broadcast(&name, consumer);

let session = moq_lite::Session::accept(session, origin.consumer, None)
.await
.context("failed to accept session")?;
// Blindly accept the session (WebTransport or QUIC), regardless of the URL.
let session = session.accept(origin.consumer, None).await?;

tracing::info!(id, "accepted session");

Expand Down
8 changes: 3 additions & 5 deletions rs/hang/examples/video.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,10 @@ async fn run_session(origin: moq_lite::OriginConsumer) -> anyhow::Result<()> {
// The "anon" path is usually configured to bypass authentication; be careful!
let url = url::Url::parse("https://cdn.moq.dev/anon/video-example").unwrap();

// Establish a WebTransport/QUIC connection.
let connection = client.connect(url).await?;

// Perform the MoQ handshake.
// Establish a WebTransport/QUIC connection and MoQ handshake.
// None means we're not consuming anything from the session, otherwise we would provide an OriginProducer.
let session = moq_lite::Session::connect(connection, origin, None).await?;
// Optional: Use connect_with_fallback if you also want to support WebSocket.
let session = client.connect(url, origin, None).await?;

// Wait until the session is closed.
session.closed().await.map_err(Into::into)
Expand Down
6 changes: 4 additions & 2 deletions rs/libmoq/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,10 @@ impl Session {
) -> Result<(), Error> {
let config = moq_native::ClientConfig::default();
let client = config.init().map_err(|err| Error::Connect(Arc::new(err)))?;
let connection = client.connect(url).await.map_err(|err| Error::Connect(Arc::new(err)))?;
let session = moq_lite::Session::connect(connection, publish, consume).await?;
let session = client
.connect(url, publish, consume)
.await
.map_err(|err| Error::Connect(Arc::new(err)))?;
callback.call(());

session.closed().await?;
Expand Down
14 changes: 7 additions & 7 deletions rs/moq-clock/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,32 +48,32 @@ async fn main() -> anyhow::Result<()> {

tracing::info!(url = ?config.url, "connecting to server");

let session = client.connect(config.url).await?;

let track = Track {
name: config.track,
priority: 0,
};

let origin = moq_lite::Origin::produce();

match config.role {
Command::Publish => {
let mut broadcast = moq_lite::Broadcast::produce();
let track = broadcast.producer.create_track(track);
let clock = clock::Publisher::new(track);

let origin = moq_lite::Origin::produce();
origin.producer.publish_broadcast(&config.broadcast, broadcast.consumer);

let session = moq_lite::Session::connect(session, origin.consumer, None).await?;
let session = client
.connect_with_fallback(config.url, Some(origin.consumer), None)
.await?;

tokio::select! {
res = session.closed() => res.map_err(Into::into),
res = session.closed() => res.context("session closed"),
_ = clock.run() => Ok(()),
}
}
Command::Subscribe => {
let origin = moq_lite::Origin::produce();
let session = moq_lite::Session::connect(session, None, Some(origin.producer)).await?;
let session = client.connect_with_fallback(config.url, None, origin.producer).await?;

// NOTE: We could just call `session.consume_broadcast(&config.broadcast)` instead,
// However that won't work with IETF MoQ and the current OriginConsumer API the moment.
Expand Down
37 changes: 28 additions & 9 deletions rs/moq-lite/src/session.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use std::sync::Arc;
use std::{future::Future, pin::Pin, sync::Arc};

use crate::{
coding::{self, Decode, Encode, Stream},
ietf, lite, setup, Error, OriginConsumer, OriginProducer,
};

pub struct Session<S: web_transport_trait::Session> {
session: S,
pub struct Session {
session: Arc<dyn SessionInner>,
}

/// The versions of MoQ that are supported by this implementation.
Expand All @@ -21,16 +21,18 @@ pub const VERSIONS: [coding::Version; 3] = [
/// The ALPN strings for supported versions.
pub const ALPNS: [&str; 2] = [lite::ALPN, ietf::ALPN];

impl<S: web_transport_trait::Session> Session<S> {
fn new(session: S) -> Self {
Self { session }
impl Session {
fn new<S: web_transport_trait::Session>(session: S) -> Self {
Self {
session: Arc::new(session),
}
}

/// Perform the MoQ handshake as a client, negotiating the version.
///
/// Publishing is performed with [OriginConsumer] and subscribing with [OriginProducer].
/// The connection remains active until the session is closed.
pub async fn connect(
pub async fn connect<S: web_transport_trait::Session>(
session: S,
publish: impl Into<Option<OriginConsumer>>,
subscribe: impl Into<Option<OriginProducer>>,
Expand Down Expand Up @@ -91,7 +93,7 @@ impl<S: web_transport_trait::Session> Session<S> {
///
/// Publishing is performed with [OriginConsumer] and subscribing with [OriginProducer].
/// The connection remains active until the session is closed.
pub async fn accept(
pub async fn accept<S: web_transport_trait::Session>(
session: S,
publish: impl Into<Option<OriginConsumer>>,
subscribe: impl Into<Option<OriginProducer>>,
Expand Down Expand Up @@ -163,6 +165,23 @@ impl<S: web_transport_trait::Session> Session<S> {
/// Block until the transport session is closed.
// TODO Remove the Result the next time we make a breaking change.
pub async fn closed(&self) -> Result<(), Error> {
Err(Error::Transport(Arc::new(self.session.closed().await)))
let err = self.session.closed().await;
Err(Error::Transport(err))
}
}

// We use a wrapper type that is dyn-compatible to remove the generic bounds from Session.
trait SessionInner: Send + Sync {
fn close(&self, code: u32, reason: &str);
fn closed(&self) -> Pin<Box<dyn Future<Output = Arc<dyn crate::error::SendSyncError>> + Send + '_>>;
}

impl<S: web_transport_trait::Session> SessionInner for S {
fn close(&self, code: u32, reason: &str) {
S::close(self, code, reason);
}

fn closed(&self) -> Pin<Box<dyn Future<Output = Arc<dyn crate::error::SendSyncError>> + Send + '_>> {
Box::pin(async move { Arc::new(S::closed(self).await) as Arc<dyn crate::error::SendSyncError> })
}
}
3 changes: 3 additions & 0 deletions rs/moq-native/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ clap = { version = "4", features = ["derive", "env"] }
console-subscriber = { version = "0.5", optional = true }
futures = "0.3"
hex = "0.4"
humantime = "2.3"
humantime-serde = "1.1"
Comment on lines +26 to +27
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# Check the workspace configuration
head -30 rs/Cargo.toml

Repository: moq-dev/moq

Length of output: 127


🏁 Script executed:

# Read the full rs/moq-native/Cargo.toml to verify the dependencies
cat -n rs/moq-native/Cargo.toml

Repository: moq-dev/moq

Length of output: 2209


🏁 Script executed:

# Check if web-transport-ws is defined in workspace dependencies
grep -A 20 "\[workspace.dependencies\]" rs/Cargo.toml

Repository: moq-dev/moq

Length of output: 101


🏁 Script executed:

# Find all Cargo.toml files in the repository
find . -name "Cargo.toml" -type f

Repository: moq-dev/moq

Length of output: 296


🏁 Script executed:

# Check if there's a root Cargo.toml with workspace definition
git ls-files | grep -i cargo

Repository: moq-dev/moq

Length of output: 306


🏁 Script executed:

# Check the root Cargo.toml for workspace dependencies
cat -n Cargo.toml

Repository: moq-dev/moq

Length of output: 1003


🌐 Web query:

tokio-tungstenite latest version crate

💡 Result:

The latest released version is tokio-tungstenite 0.28.0. [1][2]

Sources:

  • docs.rs documentation for tokio-tungstenite (0.28.0). [1]
  • tokio-tungstenite GitHub repository / project pages. [2]

🌐 Web query:

humantime crate version 2.3 latest

💡 Result:

Yes — the latest published humantime release is 2.3.0 (current as of Jan 3, 2026). [1][2]

Sources:
[1] humantime 2.3.0 documentation (docs.rs).
[2] Fedora package listing showing humantime 2.3.0.


🌐 Web query:

humantime-serde crate version 1.1 latest

💡 Result:

The latest published version of the humantime-serde crate is 1.1.1. [1][2]

Sources:

  • docs.rs listing for humantime-serde (v1.1.1). [1]
  • Fedora/crates packaging information showing humantime-serde 1.1.1. [2]

Update tokio-tungstenite to the latest version.

While humantime (2.3.0) and web-transport-ws (workspace reference) are correctly pinned, tokio-tungstenite = "0.24" is outdated. The latest version is 0.28.0. Update to tokio-tungstenite = "0.28" to align with current releases and security updates.

humantime-serde = "1.1" could also be more specific (latest is 1.1.1), though the semantic versioning allows the current specification.

🤖 Prompt for AI Agents
In rs/moq-native/Cargo.toml around lines 26 to 27, update the outdated
dependency version: change tokio-tungstenite from "0.24" to "0.28" to use the
latest release (0.28.0), and optionally pin humantime-serde to "1.1.1" for
specificity; adjust the version strings in the Cargo.toml dependency entries
accordingly and run cargo update to refresh the lockfile.


moq-lite = { workspace = true }
parking_lot = { version = "0.12", features = ["deadlock_detection"] }
Expand All @@ -45,6 +47,7 @@ tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
url = "2"
web-transport-quinn = { workspace = true }
web-transport-ws = { workspace = true }

[dev-dependencies]
anyhow = "1"
Expand Down
10 changes: 4 additions & 6 deletions rs/moq-native/examples/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,10 @@ async fn run_session(origin: moq_lite::OriginConsumer) -> anyhow::Result<()> {
// The "anon" path is usually configured to bypass authentication; be careful!
let url = url::Url::parse("https://cdn.moq.dev/anon/chat-example").unwrap();

// Establish a WebTransport/QUIC connection.
let connection = client.connect(url).await?;

// Perform the MoQ handshake.
// None means we're not consuming anything from the session, otherwise we would provide an OriginProducer.
let session = moq_lite::Session::connect(connection, origin, None).await?;
// Establish a WebTransport/QUIC connection and MoQ handshake.
// Optional: You could do this as two separate steps, but this is more convenient.
// Optional: Use connect_with_fallback if you also want to support WebSocket too.
let session = client.connect(url, origin, None).await?;

// Wait until the session is closed.
session.closed().await.map_err(Into::into)
Expand Down
Loading
Loading