diff --git a/Cargo.lock b/Cargo.lock index a4242195b..ec44bbc0b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1272,6 +1272,16 @@ version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "135b12329e5e3ce057a9f972339ea52bc954fe1e9358ef27f95e89716fbc5424" +[[package]] +name = "humantime-serde" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57a3db5ea5923d99402c94e9feb261dc5ee9b4efa158b0315f788cf549cc200c" +dependencies = [ + "humantime", + "serde", +] + [[package]] name = "hyper" version = "1.8.1" @@ -1785,6 +1795,8 @@ dependencies = [ "console-subscriber", "futures", "hex", + "humantime", + "humantime-serde", "moq-lite", "parking_lot", "quinn", @@ -1802,6 +1814,7 @@ dependencies = [ "tracing-subscriber", "url", "web-transport-quinn", + "web-transport-ws", ] [[package]] diff --git a/rs/hang-cli/src/client.rs b/rs/hang-cli/src/client.rs index ac8de46b2..950f13420 100644 --- a/rs/hang-cli/src/client.rs +++ b/rs/hang-cli/src/client.rs @@ -6,15 +6,14 @@ use url::Url; pub async fn client(config: moq_native::ClientConfig, url: Url, name: String, publish: Publish) -> anyhow::Result<()> { let client = config.init()?; - tracing::info!(%url, %name, "connecting"); - let session = client.connect(url).await?; - // Create an origin producer to publish to the broadcast. let origin = moq_lite::Origin::produce(); origin.producer.publish_broadcast(&name, publish.consume()); + tracing::info!(%url, %name, "connecting"); + // Establish the connection, not providing a subscriber. - let session = moq_lite::Session::connect(session, origin.consumer, None).await?; + let session = client.connect_with_fallback(url, origin.consumer, None).await?; #[cfg(unix)] // Notify systemd that we're ready. diff --git a/rs/hang-cli/src/server.rs b/rs/hang-cli/src/server.rs index 4cd93c96e..84a9bf6bd 100644 --- a/rs/hang-cli/src/server.rs +++ b/rs/hang-cli/src/server.rs @@ -74,16 +74,12 @@ async fn run_session( name: String, consumer: moq_lite::BroadcastConsumer, ) -> anyhow::Result<()> { - // Blindly accept the session (WebTransport or QUIC), regardless of the URL. - let session = session.ok().await.context("failed to accept session")?; - // Create an origin producer to publish to the broadcast. let origin = moq_lite::Origin::produce(); origin.producer.publish_broadcast(&name, consumer); - let session = moq_lite::Session::accept(session, origin.consumer, None) - .await - .context("failed to accept session")?; + // Blindly accept the session (WebTransport or QUIC), regardless of the URL. + let session = session.accept(origin.consumer, None).await?; tracing::info!(id, "accepted session"); diff --git a/rs/hang/examples/video.rs b/rs/hang/examples/video.rs index 9ad11e9b2..5132db605 100644 --- a/rs/hang/examples/video.rs +++ b/rs/hang/examples/video.rs @@ -31,12 +31,10 @@ async fn run_session(origin: moq_lite::OriginConsumer) -> anyhow::Result<()> { // The "anon" path is usually configured to bypass authentication; be careful! let url = url::Url::parse("https://cdn.moq.dev/anon/video-example").unwrap(); - // Establish a WebTransport/QUIC connection. - let connection = client.connect(url).await?; - - // Perform the MoQ handshake. + // Establish a WebTransport/QUIC connection and MoQ handshake. // None means we're not consuming anything from the session, otherwise we would provide an OriginProducer. - let session = moq_lite::Session::connect(connection, origin, None).await?; + // Optional: Use connect_with_fallback if you also want to support WebSocket. + let session = client.connect(url, origin, None).await?; // Wait until the session is closed. session.closed().await.map_err(Into::into) diff --git a/rs/libmoq/src/session.rs b/rs/libmoq/src/session.rs index 5632e69d6..980608f3e 100644 --- a/rs/libmoq/src/session.rs +++ b/rs/libmoq/src/session.rs @@ -47,8 +47,10 @@ impl Session { ) -> Result<(), Error> { let config = moq_native::ClientConfig::default(); let client = config.init().map_err(|err| Error::Connect(Arc::new(err)))?; - let connection = client.connect(url).await.map_err(|err| Error::Connect(Arc::new(err)))?; - let session = moq_lite::Session::connect(connection, publish, consume).await?; + let session = client + .connect(url, publish, consume) + .await + .map_err(|err| Error::Connect(Arc::new(err)))?; callback.call(()); session.closed().await?; diff --git a/rs/moq-clock/src/main.rs b/rs/moq-clock/src/main.rs index e548f0145..6f0174638 100644 --- a/rs/moq-clock/src/main.rs +++ b/rs/moq-clock/src/main.rs @@ -48,32 +48,32 @@ async fn main() -> anyhow::Result<()> { tracing::info!(url = ?config.url, "connecting to server"); - let session = client.connect(config.url).await?; - let track = Track { name: config.track, priority: 0, }; + let origin = moq_lite::Origin::produce(); + match config.role { Command::Publish => { let mut broadcast = moq_lite::Broadcast::produce(); let track = broadcast.producer.create_track(track); let clock = clock::Publisher::new(track); - let origin = moq_lite::Origin::produce(); origin.producer.publish_broadcast(&config.broadcast, broadcast.consumer); - let session = moq_lite::Session::connect(session, origin.consumer, None).await?; + let session = client + .connect_with_fallback(config.url, Some(origin.consumer), None) + .await?; tokio::select! { - res = session.closed() => res.map_err(Into::into), + res = session.closed() => res.context("session closed"), _ = clock.run() => Ok(()), } } Command::Subscribe => { - let origin = moq_lite::Origin::produce(); - let session = moq_lite::Session::connect(session, None, Some(origin.producer)).await?; + let session = client.connect_with_fallback(config.url, None, origin.producer).await?; // NOTE: We could just call `session.consume_broadcast(&config.broadcast)` instead, // However that won't work with IETF MoQ and the current OriginConsumer API the moment. diff --git a/rs/moq-lite/src/session.rs b/rs/moq-lite/src/session.rs index 5f8f4cf0b..cf57d12c9 100644 --- a/rs/moq-lite/src/session.rs +++ b/rs/moq-lite/src/session.rs @@ -1,12 +1,12 @@ -use std::sync::Arc; +use std::{future::Future, pin::Pin, sync::Arc}; use crate::{ coding::{self, Decode, Encode, Stream}, ietf, lite, setup, Error, OriginConsumer, OriginProducer, }; -pub struct Session { - session: S, +pub struct Session { + session: Arc, } /// The versions of MoQ that are supported by this implementation. @@ -21,16 +21,18 @@ pub const VERSIONS: [coding::Version; 3] = [ /// The ALPN strings for supported versions. pub const ALPNS: [&str; 2] = [lite::ALPN, ietf::ALPN]; -impl Session { - fn new(session: S) -> Self { - Self { session } +impl Session { + fn new(session: S) -> Self { + Self { + session: Arc::new(session), + } } /// Perform the MoQ handshake as a client, negotiating the version. /// /// Publishing is performed with [OriginConsumer] and subscribing with [OriginProducer]. /// The connection remains active until the session is closed. - pub async fn connect( + pub async fn connect( session: S, publish: impl Into>, subscribe: impl Into>, @@ -91,7 +93,7 @@ impl Session { /// /// Publishing is performed with [OriginConsumer] and subscribing with [OriginProducer]. /// The connection remains active until the session is closed. - pub async fn accept( + pub async fn accept( session: S, publish: impl Into>, subscribe: impl Into>, @@ -163,6 +165,23 @@ impl Session { /// Block until the transport session is closed. // TODO Remove the Result the next time we make a breaking change. pub async fn closed(&self) -> Result<(), Error> { - Err(Error::Transport(Arc::new(self.session.closed().await))) + let err = self.session.closed().await; + Err(Error::Transport(err)) + } +} + +// We use a wrapper type that is dyn-compatible to remove the generic bounds from Session. +trait SessionInner: Send + Sync { + fn close(&self, code: u32, reason: &str); + fn closed(&self) -> Pin> + Send + '_>>; +} + +impl SessionInner for S { + fn close(&self, code: u32, reason: &str) { + S::close(self, code, reason); + } + + fn closed(&self) -> Pin> + Send + '_>> { + Box::pin(async move { Arc::new(S::closed(self).await) as Arc }) } } diff --git a/rs/moq-native/Cargo.toml b/rs/moq-native/Cargo.toml index 39d940272..93b644f0a 100644 --- a/rs/moq-native/Cargo.toml +++ b/rs/moq-native/Cargo.toml @@ -23,6 +23,8 @@ clap = { version = "4", features = ["derive", "env"] } console-subscriber = { version = "0.5", optional = true } futures = "0.3" hex = "0.4" +humantime = "2.3" +humantime-serde = "1.1" moq-lite = { workspace = true } parking_lot = { version = "0.12", features = ["deadlock_detection"] } @@ -45,6 +47,7 @@ tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } url = "2" web-transport-quinn = { workspace = true } +web-transport-ws = { workspace = true } [dev-dependencies] anyhow = "1" diff --git a/rs/moq-native/examples/chat.rs b/rs/moq-native/examples/chat.rs index 333812c77..0f794afc6 100644 --- a/rs/moq-native/examples/chat.rs +++ b/rs/moq-native/examples/chat.rs @@ -30,12 +30,10 @@ async fn run_session(origin: moq_lite::OriginConsumer) -> anyhow::Result<()> { // The "anon" path is usually configured to bypass authentication; be careful! let url = url::Url::parse("https://cdn.moq.dev/anon/chat-example").unwrap(); - // Establish a WebTransport/QUIC connection. - let connection = client.connect(url).await?; - - // Perform the MoQ handshake. - // None means we're not consuming anything from the session, otherwise we would provide an OriginProducer. - let session = moq_lite::Session::connect(connection, origin, None).await?; + // Establish a WebTransport/QUIC connection and MoQ handshake. + // Optional: You could do this as two separate steps, but this is more convenient. + // Optional: Use connect_with_fallback if you also want to support WebSocket too. + let session = client.connect(url, origin, None).await?; // Wait until the session is closed. session.closed().await.map_err(Into::into) diff --git a/rs/moq-native/src/client.rs b/rs/moq-native/src/client.rs index 2eb36d5e2..083a5d0cd 100644 --- a/rs/moq-native/src/client.rs +++ b/rs/moq-native/src/client.rs @@ -2,9 +2,15 @@ use crate::crypto; use anyhow::Context; use rustls::pki_types::{CertificateDer, ServerName, UnixTime}; use rustls::RootCertStore; +use std::collections::HashSet; use std::path::PathBuf; +use std::sync::{LazyLock, Mutex}; use std::{fs, io, net, sync::Arc, time}; use url::Url; +use web_transport_ws::{tokio_tungstenite, tungstenite}; + +// Track servers (hostname:port) where WebSocket won the race, so we won't give QUIC a headstart next time +static WEBSOCKET_WON: LazyLock>> = LazyLock::new(|| Mutex::new(HashSet::new())); #[derive(Clone, Default, Debug, clap::Args, serde::Serialize, serde::Deserialize)] #[serde(default, deny_unknown_fields)] @@ -30,6 +36,23 @@ pub struct ClientTls { pub disable_verify: Option, } +#[derive(Clone, Default, Debug, clap::Args, serde::Serialize, serde::Deserialize)] +#[serde(default, deny_unknown_fields)] +pub struct ClientWebSocket { + /// Delay in milliseconds before attempting WebSocket fallback (default: 200) + /// If WebSocket won the previous race for a given server, this will be 0. + #[arg( + id = "websocket-delay", + long = "websocket-delay", + env = "MOQ_CLIENT_WEBSOCKET_DELAY", + default_value = "200ms", + value_parser = humantime::parse_duration, + )] + #[serde(with = "humantime_serde")] + #[serde(skip_serializing_if = "Option::is_none")] + pub delay: Option, +} + #[derive(Clone, Debug, clap::Parser, serde::Serialize, serde::Deserialize)] #[serde(deny_unknown_fields, default)] pub struct ClientConfig { @@ -45,6 +68,10 @@ pub struct ClientConfig { #[command(flatten)] #[serde(default)] pub tls: ClientTls, + + #[command(flatten)] + #[serde(default)] + pub websocket: ClientWebSocket, } impl Default for ClientConfig { @@ -52,6 +79,7 @@ impl Default for ClientConfig { Self { bind: "[::]:0".parse().unwrap(), tls: ClientTls::default(), + websocket: ClientWebSocket::default(), } } } @@ -67,6 +95,7 @@ pub struct Client { pub quic: quinn::Endpoint, pub tls: rustls::ClientConfig, pub transport: Arc, + pub websocket_delay: Option, } impl Client { @@ -135,10 +164,67 @@ impl Client { let quic = quinn::Endpoint::new(endpoint_config, None, socket, runtime).context("failed to create QUIC endpoint")?; - Ok(Self { quic, tls, transport }) + Ok(Self { + quic, + tls, + transport, + websocket_delay: config.websocket.delay, + }) + } + + /// Establish a WebTransport/QUIC connection followed by a MoQ handshake. + pub async fn connect( + &self, + url: Url, + publish: impl Into>, + subscribe: impl Into>, + ) -> anyhow::Result { + let session = self.connect_quic(url).await?; + let session = moq_lite::Session::connect(session, publish, subscribe).await?; + Ok(session) + } + + /// Establish a WebTransport/QUIC connection or a WebSocket connection, whichever is available first. + /// + /// Establishes a MoQ handshake on the winning transport. + pub async fn connect_with_fallback( + &self, + url: Url, + publish: impl Into>, + subscribe: impl Into>, + ) -> anyhow::Result { + // Create futures for both possible protocols + let quic_url = url.clone(); + let quic_handle = async { + match self.connect_quic(quic_url).await { + Ok(session) => Some(session), + Err(err) => { + tracing::warn!(%err, "QUIC connection failed"); + None + } + } + }; + + let ws_handle = async { + match self.connect_websocket(url).await { + Ok(session) => Some(session), + Err(err) => { + tracing::warn!(%err, "WebSocket connection failed"); + None + } + } + }; + + // Race the connection futures + Ok(tokio::select! { + Some(quic) = quic_handle => moq_lite::Session::connect(quic, publish, subscribe).await?, + Some(ws) = ws_handle => moq_lite::Session::connect(ws, publish, subscribe).await?, + // If both attempts fail, return an error + else => anyhow::bail!("failed to connect to server"), + }) } - pub async fn connect(&self, mut url: Url) -> anyhow::Result { + async fn connect_quic(&self, mut url: Url) -> anyhow::Result { let mut config = self.tls.clone(); let host = url.host().context("invalid DNS name")?.to_string(); @@ -203,6 +289,64 @@ impl Client { Ok(session) } + + async fn connect_websocket(&self, mut url: Url) -> anyhow::Result { + let host = url.host_str().context("missing hostname")?.to_string(); + let port = url.port().unwrap_or_else(|| match url.scheme() { + "https" | "wss" | "moql" | "moqt" => 443, + "http" | "ws" => 80, + _ => 443, + }); + let key = (host, port); + + // Apply a small penalty to WebSocket to improve odds for QUIC to connect first, + // unless we've already had to fall back to WebSockets for this server. + // TODO if let chain + match self.websocket_delay { + Some(delay) if !WEBSOCKET_WON.lock().unwrap().contains(&key) => { + tokio::time::sleep(delay).await; + tracing::debug!(%url, delay_ms = %delay.as_millis(), "QUIC not yet connected, attempting WebSocket fallback"); + } + _ => {} + } + + // Convert URL scheme: http:// -> ws://, https:// -> wss:// + match url.scheme() { + "http" => { + url.set_scheme("ws").expect("failed to set scheme"); + } + "https" | "moql" | "moqt" => { + url.set_scheme("wss").expect("failed to set scheme"); + } + "ws" | "wss" => {} + _ => anyhow::bail!("unsupported URL scheme for WebSocket: {}", url.scheme()), + }; + + tracing::debug!(%url, "connecting via WebSocket"); + + // Connect using tokio-tungstenite + let (ws_stream, _response) = tokio_tungstenite::connect_async_with_config( + url.as_str(), + Some(tungstenite::protocol::WebSocketConfig { + max_message_size: Some(64 << 20), // 64 MB + max_frame_size: Some(16 << 20), // 16 MB + accept_unmasked_frames: false, + ..Default::default() + }), + false, // disable_nagle + ) + .await + .context("failed to connect WebSocket")?; + + // Wrap WebSocket in WebTransport compatibility layer + // Similar to what the relay does: web_transport_ws::Session::new(socket, true) + let session = web_transport_ws::Session::new(ws_stream, false); + + tracing::warn!(%url, "using WebSocket fallback"); + WEBSOCKET_WON.lock().unwrap().insert(key); + + Ok(session) + } } #[derive(Debug)] diff --git a/rs/moq-native/src/server.rs b/rs/moq-native/src/server.rs index 6d8845d18..e2b168fa1 100644 --- a/rs/moq-native/src/server.rs +++ b/rs/moq-native/src/server.rs @@ -245,7 +245,7 @@ pub enum Request { impl Request { /// Reject the session, returning your favorite HTTP status code. - pub async fn close(self, status: http::StatusCode) -> Result<(), ServerError> { + pub async fn reject(self, status: http::StatusCode) -> Result<(), ServerError> { match self { Self::WebTransport(request) => request.close(status).await, Self::Quic(request) => { @@ -255,15 +255,18 @@ impl Request { } } - /// Accept the session. - /// - /// For WebTransport, this completes the HTTP handshake (200 OK). - /// For raw QUIC, this constructs a raw session. - pub async fn ok(self) -> Result { - match self { - Request::WebTransport(request) => request.ok().await, - Request::Quic(request) => Ok(request.ok()), - } + /// Accept the session, performing rest of the MoQ handshake. + pub async fn accept( + self, + publish: impl Into>, + subscribe: impl Into>, + ) -> anyhow::Result { + let session = match self { + Request::WebTransport(request) => request.ok().await?, + Request::Quic(request) => request.ok(), + }; + let session = moq_lite::Session::accept(session, publish, subscribe).await?; + Ok(session) } /// Returns the URL provided by the client. diff --git a/rs/moq-relay/src/cluster.rs b/rs/moq-relay/src/cluster.rs index bd91fceb7..4f35fffb4 100644 --- a/rs/moq-relay/src/cluster.rs +++ b/rs/moq-relay/src/cluster.rs @@ -264,18 +264,14 @@ impl Cluster { tracing::info!(%url, "connecting to remote"); // Connect to the remote node. - let conn = self - .client - .connect(url.clone()) - .await - .context("failed to connect to remote")?; - let publish = Some(self.primary.consumer.consume()); let subscribe = Some(self.secondary.producer.clone()); - let session = moq_lite::Session::connect(conn, publish, subscribe) + let session = self + .client + .connect(url.clone(), publish, subscribe) .await - .context("failed to establish session")?; + .context("failed to connect to remote")?; session.closed().await.map_err(Into::into) } diff --git a/rs/moq-relay/src/connection.rs b/rs/moq-relay/src/connection.rs index 3c010e532..07c4007eb 100644 --- a/rs/moq-relay/src/connection.rs +++ b/rs/moq-relay/src/connection.rs @@ -29,7 +29,7 @@ impl Connection { let token = match self.auth.verify(path, token.as_deref()) { Ok(token) => token, Err(err) => { - let _ = self.request.close(err.clone().into()).await; + let _ = self.request.reject(err.clone().into()).await; return Err(err.into()); } }; @@ -51,12 +51,10 @@ impl Connection { } // Accept the connection. - let session = self.request.ok().await?; - // NOTE: subscribe and publish seem backwards because of how relays work. // We publish the tracks the client is allowed to subscribe to. // We subscribe to the tracks the client is allowed to publish. - let session = moq_lite::Session::accept(session, subscribe, publish).await?; + let session = self.request.accept(subscribe, publish).await?; // Wait until the session is closed. session.closed().await.map_err(Into::into)