diff --git a/rs/hang-cli/src/server.rs b/rs/hang-cli/src/server.rs index c1aff5f4d..32b356465 100644 --- a/rs/hang-cli/src/server.rs +++ b/rs/hang-cli/src/server.rs @@ -4,7 +4,6 @@ use axum::http::StatusCode; use axum::response::IntoResponse; use axum::{http::Method, routing::get, Router}; use hang::{cmaf, moq_lite}; -use moq_native::web_transport_quinn; use std::net::SocketAddr; use std::path::PathBuf; use tokio::io::AsyncRead; @@ -66,11 +65,11 @@ async fn accept( #[tracing::instrument("session", skip_all, fields(id))] async fn run_session( id: u64, - session: web_transport_quinn::Request, + session: moq_native::Request, name: String, consumer: moq_lite::BroadcastConsumer, ) -> anyhow::Result<()> { - // Blindly accept the WebTransport session, regardless of the URL. + // Blindly accept the session (WebTransport or QUIC), regardless of the URL. let session = session.ok().await.context("failed to accept session")?; // Create an origin producer to publish to the broadcast. diff --git a/rs/moq-native/src/client.rs b/rs/moq-native/src/client.rs index 54e79489e..713b6ab8f 100644 --- a/rs/moq-native/src/client.rs +++ b/rs/moq-native/src/client.rs @@ -185,10 +185,10 @@ impl Client { let connection = self.quic.connect_with(config, ip, &host)?.await?; tracing::Span::current().record("id", connection.stable_id()); - let session = match url.scheme() { - "https" => web_transport_quinn::Session::connect(connection, url).await?, + let session = match alpn { + web_transport_quinn::ALPN => web_transport_quinn::Session::connect(connection, url).await?, moq_lite::ALPN => web_transport_quinn::Session::raw(connection, url), - _ => unreachable!(), + _ => unreachable!("ALPN was checked above"), }; Ok(session) diff --git a/rs/moq-native/src/server.rs b/rs/moq-native/src/server.rs index 0eae7a798..ed334cbb0 100644 --- a/rs/moq-native/src/server.rs +++ b/rs/moq-native/src/server.rs @@ -8,6 +8,8 @@ use rustls::server::{ClientHello, ResolvesServerCert}; use rustls::sign::CertifiedKey; use std::fs; use std::io::{self, Cursor, Read}; +use url::Url; +use web_transport_quinn::http; use futures::future::BoxFuture; use futures::stream::{FuturesUnordered, StreamExt}; @@ -77,7 +79,7 @@ impl ServerConfig { pub struct Server { quic: quinn::Endpoint, - accept: FuturesUnordered>>, + accept: FuturesUnordered>>, fingerprints: Vec, } @@ -149,12 +151,15 @@ impl Server { &self.fingerprints } - /// Returns the next partially established WebTransport session. + /// Returns the next partially established QUIC or WebTransport session. /// - /// This returns a [web_transport_quinn::Request] instead of a [web_transport_quinn::Session] - /// so the connection can be rejected early on an invalid path. - /// Call [web_transport_quinn::Request::ok] or [web_transport_quinn::Request::close] to complete the WebTransport handshake. - pub async fn accept(&mut self) -> Option { + /// This returns a [Request] instead of a [web_transport_quinn::Session] + /// so the connection can be rejected early on an invalid path or missing auth. + /// + /// The [Request] is either a WebTransport or a raw QUIC request. + /// Call [Request::ok] or [Request::close] to complete the handshake in case this is + /// a WebTransport request. + pub async fn accept(&mut self) -> Option { loop { tokio::select! { res = self.quic.accept() => { @@ -162,8 +167,9 @@ impl Server { self.accept.push(Self::accept_session(conn).boxed()); } Some(res) = self.accept.next() => { - if let Ok(session) = res { - return Some(session) + match res { + Ok(session) => return Some(session), + Err(err) => tracing::debug!(%err, "failed to accept session"), } } _ = tokio::signal::ctrl_c() => { @@ -177,7 +183,7 @@ impl Server { } } - async fn accept_session(conn: quinn::Incoming) -> anyhow::Result { + async fn accept_session(conn: quinn::Incoming) -> anyhow::Result { let mut conn = conn.accept()?; let handshake = conn @@ -197,15 +203,17 @@ impl Server { let span = tracing::Span::current(); span.record("id", conn.stable_id()); // TODO can we get this earlier? + tracing::debug!(%host, ip = %conn.remote_address(), %alpn, "accepted"); match alpn.as_str() { web_transport_quinn::ALPN => { // Wait for the CONNECT request. - web_transport_quinn::Request::accept(conn) + let request = web_transport_quinn::Request::accept(conn) .await - .context("failed to receive WebTransport request") + .context("failed to receive WebTransport request")?; + Ok(Request::WebTransport(request)) } - // TODO hack in raw QUIC support again + moq_lite::ALPN => Ok(Request::Quic(QuicRequest::accept(conn))), _ => anyhow::bail!("unsupported ALPN: {}", alpn), } } @@ -219,6 +227,76 @@ impl Server { } } +pub enum Request { + WebTransport(web_transport_quinn::Request), + Quic(QuicRequest), +} + +impl Request { + /// Reject the session, returning your favorite HTTP status code. + pub async fn close(self, status: http::StatusCode) -> Result<(), quinn::WriteError> { + match self { + Self::WebTransport(request) => request.close(status).await, + Self::Quic(request) => { + request.close(status); + Ok(()) + } + } + } + + /// Accept the session. + /// + /// For WebTransport, this completes the HTTP handshake (200 OK). + /// For raw QUIC, this constructs a raw session. + pub async fn ok(self) -> Result { + match self { + Request::WebTransport(request) => request.ok().await, + Request::Quic(request) => Ok(request.ok()), + } + } + + /// Returns the URL provided by the client. + pub fn url(&self) -> &Url { + match self { + Request::WebTransport(request) => request.url(), + Request::Quic(request) => request.url(), + } + } +} + +pub struct QuicRequest { + connection: quinn::Connection, + url: Url, +} + +impl QuicRequest { + /// Accept a new QUIC session from a client. + pub fn accept(connection: quinn::Connection) -> Self { + let url: Url = format!("moql://{}", connection.remote_address()) + .parse() + .expect("URL is valid"); + Self { connection, url } + } + + /// Accept the session, returning a 200 OK if using WebTransport. + pub fn ok(self) -> web_transport_quinn::Session { + web_transport_quinn::Session::raw(self.connection, self.url) + } + + /// Returns the URL provided by the client. + pub fn url(&self) -> &Url { + &self.url + } + + /// Reject the session with a status code. + /// + /// The status code number will be used as the error code. + pub fn close(self, status: http::StatusCode) { + self.connection + .close(status.as_u16().into(), status.as_str().as_bytes()); + } +} + #[derive(Debug)] struct ServeCerts { certs: Vec>, diff --git a/rs/moq-relay/src/connection.rs b/rs/moq-relay/src/connection.rs index eefa6bbfd..efa0dc00c 100644 --- a/rs/moq-relay/src/connection.rs +++ b/rs/moq-relay/src/connection.rs @@ -1,10 +1,10 @@ use crate::{Auth, Cluster}; -use moq_native::web_transport_quinn; +use moq_native::Request; pub struct Connection { pub id: u64, - pub request: web_transport_quinn::Request, + pub request: Request, pub cluster: Cluster, pub auth: Auth, } @@ -12,15 +12,19 @@ pub struct Connection { impl Connection { #[tracing::instrument("conn", skip_all, fields(id = self.id))] pub async fn run(self) -> anyhow::Result<()> { - // Extract the path and token from the URL. - let path = self.request.url().path(); - let token = self - .request - .url() - .query_pairs() - .find(|(k, _)| k == "jwt") - .map(|(_, v)| v.to_string()); - + let (path, token) = match &self.request { + Request::WebTransport(request) => { + // Extract the path and token from the URL. + let path = request.url().path(); + let token = request + .url() + .query_pairs() + .find(|(k, _)| k == "jwt") + .map(|(_, v)| v.to_string()); + (path, token) + } + Request::Quic(_conn) => ("", None), + }; // Verify the URL before accepting the connection. let token = match self.auth.verify(path, token.as_deref()) { Ok(token) => token,