Skip to content

Commit 1cbfbc6

Browse files
authored
feat(moq-native): support raw QUIC sessions with moql:// URLs (#578)
1 parent 0fb4823 commit 1cbfbc6

File tree

4 files changed

+110
-29
lines changed

4 files changed

+110
-29
lines changed

rs/hang-cli/src/server.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ use axum::http::StatusCode;
44
use axum::response::IntoResponse;
55
use axum::{http::Method, routing::get, Router};
66
use hang::{cmaf, moq_lite};
7-
use moq_native::web_transport_quinn;
87
use std::net::SocketAddr;
98
use std::path::PathBuf;
109
use tokio::io::AsyncRead;
@@ -66,11 +65,11 @@ async fn accept(
6665
#[tracing::instrument("session", skip_all, fields(id))]
6766
async fn run_session(
6867
id: u64,
69-
session: web_transport_quinn::Request,
68+
session: moq_native::Request,
7069
name: String,
7170
consumer: moq_lite::BroadcastConsumer,
7271
) -> anyhow::Result<()> {
73-
// Blindly accept the WebTransport session, regardless of the URL.
72+
// Blindly accept the session (WebTransport or QUIC), regardless of the URL.
7473
let session = session.ok().await.context("failed to accept session")?;
7574

7675
// Create an origin producer to publish to the broadcast.

rs/moq-native/src/client.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -185,10 +185,10 @@ impl Client {
185185
let connection = self.quic.connect_with(config, ip, &host)?.await?;
186186
tracing::Span::current().record("id", connection.stable_id());
187187

188-
let session = match url.scheme() {
189-
"https" => web_transport_quinn::Session::connect(connection, url).await?,
188+
let session = match alpn {
189+
web_transport_quinn::ALPN => web_transport_quinn::Session::connect(connection, url).await?,
190190
moq_lite::ALPN => web_transport_quinn::Session::raw(connection, url),
191-
_ => unreachable!(),
191+
_ => unreachable!("ALPN was checked above"),
192192
};
193193

194194
Ok(session)

rs/moq-native/src/server.rs

Lines changed: 90 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ use rustls::server::{ClientHello, ResolvesServerCert};
88
use rustls::sign::CertifiedKey;
99
use std::fs;
1010
use std::io::{self, Cursor, Read};
11+
use url::Url;
12+
use web_transport_quinn::http;
1113

1214
use futures::future::BoxFuture;
1315
use futures::stream::{FuturesUnordered, StreamExt};
@@ -77,7 +79,7 @@ impl ServerConfig {
7779

7880
pub struct Server {
7981
quic: quinn::Endpoint,
80-
accept: FuturesUnordered<BoxFuture<'static, anyhow::Result<web_transport_quinn::Request>>>,
82+
accept: FuturesUnordered<BoxFuture<'static, anyhow::Result<Request>>>,
8183
fingerprints: Vec<String>,
8284
}
8385

@@ -149,21 +151,25 @@ impl Server {
149151
&self.fingerprints
150152
}
151153

152-
/// Returns the next partially established WebTransport session.
154+
/// Returns the next partially established QUIC or WebTransport session.
153155
///
154-
/// This returns a [web_transport_quinn::Request] instead of a [web_transport_quinn::Session]
155-
/// so the connection can be rejected early on an invalid path.
156-
/// Call [web_transport_quinn::Request::ok] or [web_transport_quinn::Request::close] to complete the WebTransport handshake.
157-
pub async fn accept(&mut self) -> Option<web_transport_quinn::Request> {
156+
/// This returns a [Request] instead of a [web_transport_quinn::Session]
157+
/// so the connection can be rejected early on an invalid path or missing auth.
158+
///
159+
/// The [Request] is either a WebTransport or a raw QUIC request.
160+
/// Call [Request::ok] or [Request::close] to complete the handshake in case this is
161+
/// a WebTransport request.
162+
pub async fn accept(&mut self) -> Option<Request> {
158163
loop {
159164
tokio::select! {
160165
res = self.quic.accept() => {
161166
let conn = res?;
162167
self.accept.push(Self::accept_session(conn).boxed());
163168
}
164169
Some(res) = self.accept.next() => {
165-
if let Ok(session) = res {
166-
return Some(session)
170+
match res {
171+
Ok(session) => return Some(session),
172+
Err(err) => tracing::debug!(%err, "failed to accept session"),
167173
}
168174
}
169175
_ = tokio::signal::ctrl_c() => {
@@ -177,7 +183,7 @@ impl Server {
177183
}
178184
}
179185

180-
async fn accept_session(conn: quinn::Incoming) -> anyhow::Result<web_transport_quinn::Request> {
186+
async fn accept_session(conn: quinn::Incoming) -> anyhow::Result<Request> {
181187
let mut conn = conn.accept()?;
182188

183189
let handshake = conn
@@ -197,15 +203,17 @@ impl Server {
197203

198204
let span = tracing::Span::current();
199205
span.record("id", conn.stable_id()); // TODO can we get this earlier?
206+
tracing::debug!(%host, ip = %conn.remote_address(), %alpn, "accepted");
200207

201208
match alpn.as_str() {
202209
web_transport_quinn::ALPN => {
203210
// Wait for the CONNECT request.
204-
web_transport_quinn::Request::accept(conn)
211+
let request = web_transport_quinn::Request::accept(conn)
205212
.await
206-
.context("failed to receive WebTransport request")
213+
.context("failed to receive WebTransport request")?;
214+
Ok(Request::WebTransport(request))
207215
}
208-
// TODO hack in raw QUIC support again
216+
moq_lite::ALPN => Ok(Request::Quic(QuicRequest::accept(conn))),
209217
_ => anyhow::bail!("unsupported ALPN: {}", alpn),
210218
}
211219
}
@@ -219,6 +227,76 @@ impl Server {
219227
}
220228
}
221229

230+
pub enum Request {
231+
WebTransport(web_transport_quinn::Request),
232+
Quic(QuicRequest),
233+
}
234+
235+
impl Request {
236+
/// Reject the session, returning your favorite HTTP status code.
237+
pub async fn close(self, status: http::StatusCode) -> Result<(), quinn::WriteError> {
238+
match self {
239+
Self::WebTransport(request) => request.close(status).await,
240+
Self::Quic(request) => {
241+
request.close(status);
242+
Ok(())
243+
}
244+
}
245+
}
246+
247+
/// Accept the session.
248+
///
249+
/// For WebTransport, this completes the HTTP handshake (200 OK).
250+
/// For raw QUIC, this constructs a raw session.
251+
pub async fn ok(self) -> Result<web_transport_quinn::Session, quinn::WriteError> {
252+
match self {
253+
Request::WebTransport(request) => request.ok().await,
254+
Request::Quic(request) => Ok(request.ok()),
255+
}
256+
}
257+
258+
/// Returns the URL provided by the client.
259+
pub fn url(&self) -> &Url {
260+
match self {
261+
Request::WebTransport(request) => request.url(),
262+
Request::Quic(request) => request.url(),
263+
}
264+
}
265+
}
266+
267+
pub struct QuicRequest {
268+
connection: quinn::Connection,
269+
url: Url,
270+
}
271+
272+
impl QuicRequest {
273+
/// Accept a new QUIC session from a client.
274+
pub fn accept(connection: quinn::Connection) -> Self {
275+
let url: Url = format!("moql://{}", connection.remote_address())
276+
.parse()
277+
.expect("URL is valid");
278+
Self { connection, url }
279+
}
280+
281+
/// Accept the session, returning a 200 OK if using WebTransport.
282+
pub fn ok(self) -> web_transport_quinn::Session {
283+
web_transport_quinn::Session::raw(self.connection, self.url)
284+
}
285+
286+
/// Returns the URL provided by the client.
287+
pub fn url(&self) -> &Url {
288+
&self.url
289+
}
290+
291+
/// Reject the session with a status code.
292+
///
293+
/// The status code number will be used as the error code.
294+
pub fn close(self, status: http::StatusCode) {
295+
self.connection
296+
.close(status.as_u16().into(), status.as_str().as_bytes());
297+
}
298+
}
299+
222300
#[derive(Debug)]
223301
struct ServeCerts {
224302
certs: Vec<Arc<CertifiedKey>>,

rs/moq-relay/src/connection.rs

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,30 @@
11
use crate::{Auth, Cluster};
22

3-
use moq_native::web_transport_quinn;
3+
use moq_native::Request;
44

55
pub struct Connection {
66
pub id: u64,
7-
pub request: web_transport_quinn::Request,
7+
pub request: Request,
88
pub cluster: Cluster,
99
pub auth: Auth,
1010
}
1111

1212
impl Connection {
1313
#[tracing::instrument("conn", skip_all, fields(id = self.id))]
1414
pub async fn run(self) -> anyhow::Result<()> {
15-
// Extract the path and token from the URL.
16-
let path = self.request.url().path();
17-
let token = self
18-
.request
19-
.url()
20-
.query_pairs()
21-
.find(|(k, _)| k == "jwt")
22-
.map(|(_, v)| v.to_string());
23-
15+
let (path, token) = match &self.request {
16+
Request::WebTransport(request) => {
17+
// Extract the path and token from the URL.
18+
let path = request.url().path();
19+
let token = request
20+
.url()
21+
.query_pairs()
22+
.find(|(k, _)| k == "jwt")
23+
.map(|(_, v)| v.to_string());
24+
(path, token)
25+
}
26+
Request::Quic(_conn) => ("", None),
27+
};
2428
// Verify the URL before accepting the connection.
2529
let token = match self.auth.verify(path, token.as_deref()) {
2630
Ok(token) => token,

0 commit comments

Comments
 (0)