diff --git a/js/hang/package.json b/js/hang/package.json index 976a6d9a4..385dde6dd 100644 --- a/js/hang/package.json +++ b/js/hang/package.json @@ -28,7 +28,6 @@ ], "files": [ "./src", - "./dist", "README.md", "tsconfig.json" ], diff --git a/js/hang/src/connection.ts b/js/hang/src/connection.ts index 12f0d32a3..ede11e5f2 100644 --- a/js/hang/src/connection.ts +++ b/js/hang/src/connection.ts @@ -18,7 +18,7 @@ export type ConnectionProps = { maxDelay?: number; }; -export type ConnectionStatus = "connecting" | "connected" | "disconnected" | "unsupported"; +export type ConnectionStatus = "connecting" | "connected" | "disconnected"; export class Connection { url: Signal; @@ -43,12 +43,6 @@ export class Connection { this.#delay = this.delay; - if (typeof WebTransport === "undefined") { - console.warn("WebTransport is not supported"); - this.status.set("unsupported"); - return; - } - // Create a reactive root so cleanup is easier. this.signals.effect(this.#connect.bind(this)); } diff --git a/js/hang/src/support/element.ts b/js/hang/src/support/element.ts index 48acd56c2..afec0dcb6 100644 --- a/js/hang/src/support/element.ts +++ b/js/hang/src/support/element.ts @@ -285,7 +285,7 @@ export default class HangSupport extends HTMLElement { container.appendChild(col3Div); }; - addRow("WebTransport", "", binary(support.webtransport)); + addRow("WebTransport", "", partial(support.webtransport)); if (mode !== "core") { if (mode !== "watch") { diff --git a/js/hang/src/support/index.ts b/js/hang/src/support/index.ts index 953823c34..efe5b5eb1 100644 --- a/js/hang/src/support/index.ts +++ b/js/hang/src/support/index.ts @@ -22,7 +22,7 @@ export type Video = { }; export type Full = { - webtransport: boolean; + webtransport: Partial; audio: { capture: boolean; encoding: Audio | undefined; @@ -115,7 +115,7 @@ async function videoEncoderSupported(codec: keyof typeof CODECS) { export async function isSupported(): Promise { return { - webtransport: typeof WebTransport !== "undefined", + webtransport: typeof WebTransport !== "undefined" ? "full" : "partial", audio: { capture: typeof AudioWorkletNode !== "undefined", encoding: diff --git a/js/justfile b/js/justfile index 61154e4e7..44fc08851 100644 --- a/js/justfile +++ b/js/justfile @@ -8,12 +8,12 @@ default: # Run the web server web url='http://localhost:4443/anon': - pnpm -r i + pnpm i VITE_RELAY_URL="{{url}}" pnpm -r run dev # Run the CI checks check flags="": - pnpm -r install {{flags}} + pnpm install {{flags}} # Make sure Typescript compiles pnpm -r run check @@ -30,7 +30,7 @@ check flags="": # Automatically fix some issues. fix flags="": # Fix the JS packages - pnpm -r install {{flags}} + pnpm install {{flags}} # Format and lint pnpm exec biome check --fix @@ -44,5 +44,5 @@ upgrade: # Build the packages build flags="": - pnpm -r install {{flags}} + pnpm install {{flags}} pnpm -r run build diff --git a/js/moq/package.json b/js/moq/package.json index 324fd8467..942959561 100644 --- a/js/moq/package.json +++ b/js/moq/package.json @@ -20,7 +20,8 @@ "release": "tsx ../scripts/release.ts" }, "dependencies": { - "async-mutex": "^0.5.0" + "async-mutex": "^0.5.0", + "@kixelated/web-transport-ws": "^0.1" }, "peerDependencies": { "zod": "^4.0.0" diff --git a/js/moq/src/connection.ts b/js/moq/src/connection.ts index 4420f262e..01ef80dc0 100644 --- a/js/moq/src/connection.ts +++ b/js/moq/src/connection.ts @@ -6,6 +6,12 @@ import type * as Path from "./path"; import { Stream } from "./stream"; import * as Hex from "./util/hex"; +// Check if we need to load the WebSocket polyfill. +let polyfill: Promise; +if (typeof globalThis !== "undefined" && !("WebTransport" in globalThis)) { + polyfill = import("@kixelated/web-transport-ws"); +} + export interface Connection { readonly url: URL; @@ -26,33 +32,45 @@ export async function connect(url: URL): Promise { const options: WebTransportOptions = { allowPooling: false, congestionControl: "low-latency", - requireUnreliable: true, }; - let adjustedUrl = url; + let finalUrl = url; + + let quic: WebTransport; + if (polyfill) { + console.warn("Using web-transport-ws polyfill; user experience may suffer during congestion."); + const WebTransportWs = (await polyfill).default; + quic = new WebTransportWs(finalUrl, options); + } else { + // Only perform certificate fetch and URL rewrite when polyfill is not needed + // This is needed because WebTransport is a butt to work with in local development. + if (url.protocol === "http:") { + const fingerprintUrl = new URL(url); + fingerprintUrl.pathname = "/certificate.sha256"; + fingerprintUrl.search = ""; + console.warn( + fingerprintUrl.toString(), + "performing an insecure fingerprint fetch; use https:// in production", + ); - if (url.protocol === "http:") { - const fingerprintUrl = new URL(url); - fingerprintUrl.pathname = "/certificate.sha256"; - fingerprintUrl.search = ""; - console.warn(fingerprintUrl.toString(), "performing an insecure fingerprint fetch; use https:// in production"); + // Fetch the fingerprint from the server. + const fingerprint = await fetch(fingerprintUrl); + const fingerprintText = await fingerprint.text(); - // Fetch the fingerprint from the server. - const fingerprint = await fetch(fingerprintUrl); - const fingerprintText = await fingerprint.text(); + options.serverCertificateHashes = [ + { + algorithm: "sha-256", + value: Hex.toBytes(fingerprintText), + }, + ]; - options.serverCertificateHashes = [ - { - algorithm: "sha-256", - value: Hex.toBytes(fingerprintText), - }, - ]; + finalUrl = new URL(url); + finalUrl.protocol = "https:"; + } - adjustedUrl = new URL(url); - adjustedUrl.protocol = "https:"; + quic = new WebTransport(finalUrl, options); } - const quic = new WebTransport(adjustedUrl, options); await quic.ready; // moq-rs currently requires the ROLE extension to be set. @@ -75,10 +93,10 @@ export async function connect(url: URL): Promise { const server = await Lite.SessionServer.decode(stream.reader); if (server.version === Lite.CURRENT_VERSION) { console.debug("moq-lite session established"); - return new Lite.Connection(adjustedUrl, quic, stream); + return new Lite.Connection(finalUrl, quic, stream); } else if (server.version === Ietf.CURRENT_VERSION) { console.debug("moq-ietf session established"); - return new Ietf.Connection(adjustedUrl, quic, stream); + return new Ietf.Connection(finalUrl, quic, stream); } else { throw new Error(`unsupported server version: ${server.version.toString()}`); } diff --git a/js/moq/src/lite/connection.ts b/js/moq/src/lite/connection.ts index da0d36c22..9a99e0f77 100644 --- a/js/moq/src/lite/connection.ts +++ b/js/moq/src/lite/connection.ts @@ -112,14 +112,16 @@ export class Connection implements ConnectionInterface { } async #runSession() { - // Receive messages until the connection is closed. - for (;;) { - const msg = await SessionInfo.decodeMaybe(this.#session.reader); - if (!msg) break; - // TODO use the session info + try { + // Receive messages until the connection is closed. + for (;;) { + const msg = await SessionInfo.decodeMaybe(this.#session.reader); + if (!msg) break; + // TODO use the session info + } + } finally { + console.warn("session stream closed"); } - - console.warn("session stream closed"); } async #runBidis() { diff --git a/js/moq/src/stream.ts b/js/moq/src/stream.ts index 149911519..bc018a475 100644 --- a/js/moq/src/stream.ts +++ b/js/moq/src/stream.ts @@ -66,11 +66,19 @@ export class Reader { // Adds more data to the buffer, returning true if more data was added. async #fill(): Promise { - const result = await this.#reader?.read(); - if (!result || result.done) { + if (!this.#reader) { return false; } + const result = await this.#reader.read(); + if (result.done) { + return false; + } + + if (result.value.byteLength === 0) { + throw new Error("unexpected empty chunk"); + } + const buffer = new Uint8Array(result.value); if (this.#buffer.byteLength === 0) { diff --git a/js/pnpm-lock.yaml b/js/pnpm-lock.yaml index 93e0a08bd..097472df2 100644 --- a/js/pnpm-lock.yaml +++ b/js/pnpm-lock.yaml @@ -101,6 +101,9 @@ importers: moq: dependencies: + '@kixelated/web-transport-ws': + specifier: ^0.1 + version: 0.1.0 async-mutex: specifier: ^0.5.0 version: 0.5.0 @@ -549,6 +552,9 @@ packages: '@jridgewell/trace-mapping@0.3.30': resolution: {integrity: sha512-GQ7Nw5G2lTu/BtHTKfXhKHok2WGetd4XYcVKGx00SjAk8GMwgJM3zr6zORiPGuOE+/vkc90KtTosSSvaCjKb2Q==} + '@kixelated/web-transport-ws@0.1.0': + resolution: {integrity: sha512-HKxvAKSX/XSc8hyY7hAIpUCZ+QwsPHyWt0wpPIItLn5Wa4xwz/GPXHmTTGzFAf42xu+PfOsJOHLfRRJaXTUDLA==} + '@napi-rs/wasm-runtime@1.0.3': resolution: {integrity: sha512-rZxtMsLwjdXkMUGC3WwsPwLNVqVqnTJT6MNIB6e+5fhMcSCPP0AOsNWuMQ5mdCq6HNjs/ZeWAEchpqeprqBD2Q==} @@ -2139,6 +2145,8 @@ snapshots: '@jridgewell/resolve-uri': 3.1.2 '@jridgewell/sourcemap-codec': 1.5.5 + '@kixelated/web-transport-ws@0.1.0': {} + '@napi-rs/wasm-runtime@1.0.3': dependencies: '@emnapi/core': 1.5.0 diff --git a/js/tsconfig.json b/js/tsconfig.json index ba54be9c2..e0c858f01 100644 --- a/js/tsconfig.json +++ b/js/tsconfig.json @@ -6,9 +6,9 @@ "moduleResolution": "bundler", "declaration": true, - "declarationMap": true, + //"declarationMap": true, "isolatedModules": true, - "sourceMap": true, + //"sourceMap": true, // https://www.typescriptlang.org/tsconfig/#Type_Checking_6248 "allowUnreachableCode": false, diff --git a/rs/.cargo/config.toml b/rs/.cargo/config.toml deleted file mode 100644 index 84671750f..000000000 --- a/rs/.cargo/config.toml +++ /dev/null @@ -1,2 +0,0 @@ -[build] -rustflags = ["--cfg=web_sys_unstable_apis"] diff --git a/rs/Cargo.lock b/rs/Cargo.lock index a2122b3b9..3b6cdedf4 100644 --- a/rs/Cargo.lock +++ b/rs/Cargo.lock @@ -161,6 +161,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "021e862c184ae977658b36c4500f7feac3221ca5da43e3f25bd04ab6c79a29b5" dependencies = [ "axum-core", + "base64", "bytes", "form_urlencoded", "futures-util", @@ -180,8 +181,10 @@ dependencies = [ "serde_json", "serde_path_to_error", "serde_urlencoded", + "sha1", "sync_wrapper", "tokio", + "tokio-tungstenite 0.26.2", "tower 0.5.2", "tower-layer", "tower-service", @@ -258,12 +261,27 @@ version = "2.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34efbcccd345379ca2868b2b2c9d3782e9cc58ba87bc7d79d5b53d9c9ae6f25d" +[[package]] +name = "block-buffer" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71" +dependencies = [ + "generic-array", +] + [[package]] name = "bumpalo" version = "3.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43" +[[package]] +name = "byteorder" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" + [[package]] name = "bytes" version = "1.10.1" @@ -427,12 +445,31 @@ version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" +[[package]] +name = "cpufeatures" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59ed5838eebb26a2bb2e58f6d5b5316989ae9d08bab10e0e6d103e656d1b0280" +dependencies = [ + "libc", +] + [[package]] name = "crossbeam-utils" version = "0.8.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" +[[package]] +name = "crypto-common" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3" +dependencies = [ + "generic-array", + "typenum", +] + [[package]] name = "darling" version = "0.20.11" @@ -468,6 +505,12 @@ dependencies = [ "syn", ] +[[package]] +name = "data-encoding" +version = "2.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a2330da5de22e8a3cb63252ce2abb30116bf5265e89c0e01bc17015ce30a476" + [[package]] name = "deranged" version = "0.4.0" @@ -499,6 +542,16 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "digest" +version = "0.10.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" +dependencies = [ + "block-buffer", + "crypto-common", +] + [[package]] name = "displaydoc" version = "0.2.5" @@ -681,6 +734,16 @@ dependencies = [ "slab", ] +[[package]] +name = "generic-array" +version = "0.14.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a" +dependencies = [ + "typenum", + "version_check", +] + [[package]] name = "getrandom" version = "0.2.16" @@ -1347,7 +1410,7 @@ dependencies = [ "tokio", "tracing", "web-async", - "web-transport", + "web-transport-trait", ] [[package]] @@ -1373,7 +1436,7 @@ dependencies = [ "tracing", "tracing-subscriber", "url", - "web-transport", + "web-transport-quinn", "webpki", ] @@ -1385,6 +1448,7 @@ dependencies = [ "axum", "bytes", "clap", + "futures", "http-body", "hyper-serve", "moq-lite", @@ -1399,7 +1463,7 @@ dependencies = [ "tower-http", "tracing", "url", - "web-transport", + "web-transport-ws", ] [[package]] @@ -1751,7 +1815,7 @@ dependencies = [ "aws-lc-rs", "bytes", "getrandom 0.3.3", - "rand", + "rand 0.9.2", "ring", "rustc-hash 2.1.1", "rustls", @@ -1793,14 +1857,35 @@ version = "5.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" +[[package]] +name = "rand" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" +dependencies = [ + "libc", + "rand_chacha 0.3.1", + "rand_core 0.6.4", +] + [[package]] name = "rand" version = "0.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6db2770f06117d490610c7488547d543617b21bfa07796d7a12f6f1bd53850d1" dependencies = [ - "rand_chacha", - "rand_core", + "rand_chacha 0.9.0", + "rand_core 0.9.3", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core 0.6.4", ] [[package]] @@ -1810,7 +1895,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" dependencies = [ "ppv-lite86", - "rand_core", + "rand_core 0.9.3", +] + +[[package]] +name = "rand_core" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +dependencies = [ + "getrandom 0.2.16", ] [[package]] @@ -2247,6 +2341,17 @@ dependencies = [ "syn", ] +[[package]] +name = "sha1" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "sharded-slab" version = "0.1.7" @@ -2523,6 +2628,30 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-tungstenite" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edc5f74e248dc973e0dbb7b74c7e0d6fcc301c694ff50049504004ef4d0cdcd9" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite 0.24.0", +] + +[[package]] +name = "tokio-tungstenite" +version = "0.26.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a9daff607c6d2bf6c16fd681ccb7eecc83e4e2cdc1ca067ffaadfca5de7f084" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite 0.26.2", +] + [[package]] name = "tokio-util" version = "0.7.16" @@ -2716,6 +2845,47 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tungstenite" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18e5b8366ee7a95b16d32197d0b2604b43a0be89dc5fac9f8e96ccafbaedda8a" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http", + "httparse", + "log", + "rand 0.8.5", + "sha1", + "thiserror 1.0.69", + "utf-8", +] + +[[package]] +name = "tungstenite" +version = "0.26.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4793cb5e56680ecbb1d843515b23b6de9a75eb04b66643e256a396d43be33c13" +dependencies = [ + "bytes", + "data-encoding", + "http", + "httparse", + "log", + "rand 0.9.2", + "sha1", + "thiserror 2.0.16", + "utf-8", +] + +[[package]] +name = "typenum" +version = "1.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1dccffe3ce07af9386bfd29e80c0ab1a8205a2fc34e4bcd40364df902cfa8f3f" + [[package]] name = "unicase" version = "2.8.1" @@ -2758,6 +2928,12 @@ dependencies = [ "serde", ] +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + [[package]] name = "utf8_iter" version = "1.0.4" @@ -2776,6 +2952,12 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" +[[package]] +name = "version_check" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" + [[package]] name = "walkdir" version = "2.5.0" @@ -2892,18 +3074,6 @@ dependencies = [ "wasm-bindgen-futures", ] -[[package]] -name = "web-streams" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "15c4d5dbf19463c4b65e974303d453cc11991873c7a4a4953214f791d73303a2" -dependencies = [ - "thiserror 2.0.16", - "wasm-bindgen", - "wasm-bindgen-futures", - "web-sys", -] - [[package]] name = "web-sys" version = "0.3.77" @@ -2924,24 +3094,11 @@ dependencies = [ "wasm-bindgen", ] -[[package]] -name = "web-transport" -version = "0.9.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8dd16497e7d916fe90639b0b8553bdb3098d488f576c3a825da852ed0e630d59" -dependencies = [ - "bytes", - "thiserror 2.0.16", - "url", - "web-transport-quinn", - "web-transport-wasm", -] - [[package]] name = "web-transport-proto" -version = "0.2.6" +version = "0.2.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1814af4572856a29a2d29a56520e86fda994423043b70139ce98e5a32e0d91be" +checksum = "fb650c577c46254d16041c7fe0dc9901d9a42df3f46e77e9d05d1b3c17294b19" dependencies = [ "bytes", "http", @@ -2951,9 +3108,9 @@ dependencies = [ [[package]] name = "web-transport-quinn" -version = "0.7.3" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f678f219136b44edb0f264679f6668a41817ccb7cf9098256ff9a359cee8d010" +checksum = "aad296f2c132240811fa6783fbece2d90aada7b6b2081c22aeff533a4e695bd6" dependencies = [ "aws-lc-rs", "bytes", @@ -2968,22 +3125,31 @@ dependencies = [ "tokio", "url", "web-transport-proto", + "web-transport-trait", ] [[package]] -name = "web-transport-wasm" -version = "0.5.1" +name = "web-transport-trait" +version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b3ad9b5e49988cf8f4fd722c1b390bad43482b866ac7a2ba0f23280eba74c893" +checksum = "4850148841799c83f033f4dddccb219f1f097aff6db1bda5b0d3be69fefb32bd" dependencies = [ "bytes", - "js-sys", +] + +[[package]] +name = "web-transport-ws" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f7606375ad1582a0c60d3fe8f18e460a8e0cb6a31f37606a19b905c85b1329a" +dependencies = [ + "bytes", + "futures", "thiserror 2.0.16", - "url", - "wasm-bindgen", - "wasm-bindgen-futures", - "web-streams", - "web-sys", + "tokio", + "tokio-tungstenite 0.24.0", + "web-transport-proto", + "web-transport-trait", ] [[package]] diff --git a/rs/Cargo.toml b/rs/Cargo.toml index 284fb7bcc..91e3734c9 100644 --- a/rs/Cargo.toml +++ b/rs/Cargo.toml @@ -12,7 +12,6 @@ members = [ resolver = "2" [workspace.dependencies] - hang = { version = "0.5", path = "hang" } moq-lite = { version = "0.6", path = "moq" } moq-native = { version = "0.7", path = "moq-native" } @@ -21,4 +20,6 @@ moq-token = { version = "0.5", path = "moq-token" } serde = { version = "1", features = ["derive"] } tokio = "1.45" web-async = { version = "0.1.1", features = ["tracing"] } -web-transport = "0.9.4" +web-transport-quinn = { version = "0.8" } +web-transport-trait = { version = "0.1" } +web-transport-ws = { version = "0.1" } diff --git a/rs/hang-cli/src/server.rs b/rs/hang-cli/src/server.rs index ca56f72ea..c1aff5f4d 100644 --- a/rs/hang-cli/src/server.rs +++ b/rs/hang-cli/src/server.rs @@ -4,7 +4,7 @@ use axum::http::StatusCode; use axum::response::IntoResponse; use axum::{http::Method, routing::get, Router}; use hang::{cmaf, moq_lite}; -use moq_lite::web_transport; +use moq_native::web_transport_quinn; use std::net::SocketAddr; use std::path::PathBuf; use tokio::io::AsyncRead; @@ -66,7 +66,7 @@ async fn accept( #[tracing::instrument("session", skip_all, fields(id))] async fn run_session( id: u64, - session: web_transport::quinn::Request, + session: web_transport_quinn::Request, name: String, consumer: moq_lite::BroadcastConsumer, ) -> anyhow::Result<()> { diff --git a/rs/moq-native/Cargo.toml b/rs/moq-native/Cargo.toml index 623d766e2..63be5fdcc 100644 --- a/rs/moq-native/Cargo.toml +++ b/rs/moq-native/Cargo.toml @@ -12,6 +12,7 @@ keywords = ["quic", "http3", "webtransport", "media", "live"] categories = ["multimedia", "network-programming", "web-programming"] [dependencies] + anyhow = { version = "1", features = ["backtrace"] } clap = { version = "4", features = ["derive", "env"] } futures = "0.3" @@ -32,5 +33,5 @@ tokio = { workspace = true, features = ["full"] } tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } url = "2" -web-transport = { workspace = true } +web-transport-quinn = { workspace = true } webpki = "0.22" diff --git a/rs/moq-native/src/client.rs b/rs/moq-native/src/client.rs index c5b93fc10..eb3f65f4a 100644 --- a/rs/moq-native/src/client.rs +++ b/rs/moq-native/src/client.rs @@ -6,8 +6,6 @@ use std::path::PathBuf; use std::{fs, io, net, sync::Arc, time}; use url::Url; -use web_transport::quinn as web_transport_quinn; - #[derive(Clone, Default, Debug, clap::Args, serde::Serialize, serde::Deserialize)] #[serde(default, deny_unknown_fields)] pub struct ClientTls { @@ -170,7 +168,7 @@ impl Client { } let alpn = match url.scheme() { - "https" => web_transport::quinn::ALPN, + "https" => web_transport_quinn::ALPN, "moql" => moq_lite::ALPN, _ => anyhow::bail!("url scheme must be 'http', 'https', or 'moql'"), }; @@ -189,8 +187,8 @@ impl Client { tracing::Span::current().record("id", connection.stable_id()); let session = match url.scheme() { - "https" => web_transport::quinn::Session::connect(connection, url).await?, - moq_lite::ALPN => web_transport::quinn::Session::raw(connection, url), + "https" => web_transport_quinn::Session::connect(connection, url).await?, + moq_lite::ALPN => web_transport_quinn::Session::raw(connection, url), _ => unreachable!(), }; diff --git a/rs/moq-native/src/lib.rs b/rs/moq-native/src/lib.rs index fa38063e5..7c625b2b8 100644 --- a/rs/moq-native/src/lib.rs +++ b/rs/moq-native/src/lib.rs @@ -8,4 +8,4 @@ pub use server::*; // Re-export these crates. pub use moq_lite; -pub use web_transport; +pub use web_transport_quinn; diff --git a/rs/moq-native/src/server.rs b/rs/moq-native/src/server.rs index 023818e2a..75226fcc1 100644 --- a/rs/moq-native/src/server.rs +++ b/rs/moq-native/src/server.rs @@ -14,8 +14,6 @@ use futures::future::BoxFuture; use futures::stream::{FuturesUnordered, StreamExt}; use futures::FutureExt; -use web_transport::quinn as web_transport_quinn; - #[derive(clap::Args, Clone, Debug, serde::Serialize, serde::Deserialize)] #[serde(deny_unknown_fields)] pub struct ServerTlsCert { @@ -105,7 +103,7 @@ impl Server { .with_cert_resolver(Arc::new(serve)); tls.alpn_protocols = vec![ - web_transport::quinn::ALPN.as_bytes().to_vec(), + web_transport_quinn::ALPN.as_bytes().to_vec(), moq_lite::ALPN.as_bytes().to_vec(), ]; tls.key_log = Arc::new(rustls::KeyLogFile::new()); @@ -186,9 +184,9 @@ impl Server { span.record("id", conn.stable_id()); // TODO can we get this earlier? match alpn.as_str() { - web_transport::quinn::ALPN => { + web_transport_quinn::ALPN => { // Wait for the CONNECT request. - web_transport::quinn::Request::accept(conn) + web_transport_quinn::Request::accept(conn) .await .context("failed to receive WebTransport request") } diff --git a/rs/moq-relay/Cargo.toml b/rs/moq-relay/Cargo.toml index e45e6c92f..e65e61fb0 100644 --- a/rs/moq-relay/Cargo.toml +++ b/rs/moq-relay/Cargo.toml @@ -12,10 +12,12 @@ keywords = ["quic", "http3", "webtransport", "media", "live"] categories = ["multimedia", "network-programming", "web-programming"] [dependencies] + anyhow = { version = "1", features = ["backtrace"] } -axum = { version = "0.8", features = ["tokio"] } +axum = { version = "0.8", features = ["tokio", "ws"] } bytes = "1" clap = { version = "4", features = ["derive"] } +futures = "0.3" http-body = "1" hyper-serve = { version = "0.6", features = [ "tls-rustls", @@ -31,7 +33,7 @@ toml = "0.8" tower-http = { version = "0.6", features = ["cors"] } tracing = "0.1" url = { version = "2", features = ["serde"] } -web-transport = { workspace = true } +web-transport-ws = { workspace = true } [dev-dependencies] tempfile = "3" diff --git a/rs/moq-relay/cfg/dev.toml b/rs/moq-relay/cfg/dev.toml index 96074d544..b2d0257c5 100644 --- a/rs/moq-relay/cfg/dev.toml +++ b/rs/moq-relay/cfg/dev.toml @@ -15,6 +15,11 @@ listen = "[::]:4443" # This is used for local development, in conjunction with a fingerprint, or with TLS verification disabled. tls.generate = ["localhost"] +[web] +# Listen for HTTP and WebSocket (TCP) connections on the given address. +# Defaults to disabled if not provided. +listen = "[::]:4443" + # See root.toml and leaf.toml for auth and clustering examples. [auth] # Allow anonymous access to everything. diff --git a/rs/moq-relay/cfg/leaf.toml b/rs/moq-relay/cfg/leaf.toml index 04eed2f33..36188cff9 100644 --- a/rs/moq-relay/cfg/leaf.toml +++ b/rs/moq-relay/cfg/leaf.toml @@ -12,6 +12,11 @@ listen = "[::]:4444" # You should use a real certificate in production. tls.generate = ["localhost"] +[web] +# Listen for HTTP and WebSocket (TCP) connections on the given address. +# Defaults to disabled if not provided. +listen = "[::]:4444" + # This clustering scheme is very very simple for now. # # There is a root node that is used to connect leaf nodes together. diff --git a/rs/moq-relay/cfg/root.toml b/rs/moq-relay/cfg/root.toml index 103d68635..def36b02e 100644 --- a/rs/moq-relay/cfg/root.toml +++ b/rs/moq-relay/cfg/root.toml @@ -12,6 +12,11 @@ listen = "[::]:4443" # This is used for local development, in conjunction with a fingerprint, or with TLS verification disabled. tls.generate = ["localhost"] +[web] +# Listen for HTTP and WebSocket (TCP) connections on the given address. +# Defaults to disabled if not provided. +listen = "[::]:4443" + # In production, we would use a real certificate from something like Let's Encrypt. # Multiple certificates are supported; the first one that matches the SNI will be used. # [[server.tls.cert]] diff --git a/rs/moq-relay/src/auth.rs b/rs/moq-relay/src/auth.rs index 0f0cc447b..378e6d75f 100644 --- a/rs/moq-relay/src/auth.rs +++ b/rs/moq-relay/src/auth.rs @@ -1,9 +1,35 @@ use std::sync::Arc; -use anyhow::Context; +use axum::http; use moq_lite::{AsPath, Path, PathOwned}; use serde::{Deserialize, Serialize}; -use url::Url; + +#[derive(thiserror::Error, Debug, Clone)] +pub enum AuthError { + #[error("authentication is disabled")] + UnexpectedToken, + + #[error("a token was expected")] + ExpectedToken, + + #[error("failed to decode the token")] + DecodeFailed, + + #[error("the path does not match the root")] + IncorrectRoot, +} + +impl From for http::StatusCode { + fn from(_: AuthError) -> Self { + http::StatusCode::UNAUTHORIZED + } +} + +impl axum::response::IntoResponse for AuthError { + fn into_response(self) -> axum::response::Response { + http::StatusCode::UNAUTHORIZED.into_response() + } +} #[derive(clap::Args, Clone, Debug, Serialize, Deserialize, Default)] #[serde(default)] @@ -63,14 +89,14 @@ impl Auth { // Parse the token from the user provided URL, returning the claims if successful. // If no token is provided, then the claims will use the public path if it is set. - pub fn verify(&self, url: &Url) -> anyhow::Result { + pub fn verify(&self, path: &str, token: Option<&str>) -> Result { // Find the token in the query parameters. // ?jwt=... - let claims = if let Some((_, token)) = url.query_pairs().find(|(k, _)| k == "jwt") { + let claims = if let Some(token) = token { if let Some(key) = self.key.as_ref() { - key.decode(&token)? + key.decode(token).map_err(|_| AuthError::DecodeFailed)? } else { - anyhow::bail!("token provided, but no key configured"); + return Err(AuthError::UnexpectedToken); } } else if let Some(public) = &self.public { moq_token::Claims { @@ -80,17 +106,18 @@ impl Auth { ..Default::default() } } else { - anyhow::bail!("no token provided and no public path configured"); + return Err(AuthError::ExpectedToken); }; // Get the path from the URL, removing any leading or trailing slashes. // We will automatically add a trailing slash when joining the path with the subscribe/publish roots. - let root = Path::new(url.path()); + let root = Path::new(path); // Make sure the URL path matches the root path. - let suffix = root - .strip_prefix(&claims.root) - .context("path does not match the root")?; + let suffix = match root.strip_prefix(&claims.root) { + None => return Err(AuthError::IncorrectRoot), + Some(suffix) => suffix, + }; // If a more specific path is is provided, reduce the permissions. let subscribe = claims @@ -150,15 +177,13 @@ mod tests { })?; // Should succeed for anonymous path - let url = Url::parse("https://relay.example.com/anon")?; - let token = auth.verify(&url)?; + let token = auth.verify("/anon", None)?; assert_eq!(token.root, "anon".as_path()); assert_eq!(token.subscribe, vec!["".as_path()]); assert_eq!(token.publish, vec!["".as_path()]); // Should succeed for sub-paths under anonymous - let url = Url::parse("https://relay.example.com/anon/room/123")?; - let token = auth.verify(&url)?; + let token = auth.verify("/anon/room/123", None)?; assert_eq!(token.root, Path::new("anon/room/123").to_owned()); assert_eq!(token.subscribe, vec![Path::new("").to_owned()]); assert_eq!(token.publish, vec![Path::new("").to_owned()]); @@ -175,8 +200,7 @@ mod tests { })?; // Should succeed for any path - let url = Url::parse("https://relay.example.com/any/path")?; - let token = auth.verify(&url)?; + let token = auth.verify("/any/path", None)?; assert_eq!(token.root, Path::new("any/path").to_owned()); assert_eq!(token.subscribe, vec![Path::new("").to_owned()]); assert_eq!(token.publish, vec![Path::new("").to_owned()]); @@ -193,10 +217,8 @@ mod tests { })?; // Should fail for non-anonymous path - let url = Url::parse("https://relay.example.com/secret")?; - let result = auth.verify(&url); + let result = auth.verify("/secret", None); assert!(result.is_err()); - assert!(result.unwrap_err().to_string().contains("path does not match the root")); Ok(()) } @@ -210,13 +232,8 @@ mod tests { })?; // Should fail when no token and no public path - let url = Url::parse("https://relay.example.com/any/path")?; - let result = auth.verify(&url); + let result = auth.verify("/any/path", None); assert!(result.is_err()); - assert!(result - .unwrap_err() - .to_string() - .contains("no token provided and no public path configured")); Ok(()) } @@ -229,13 +246,8 @@ mod tests { })?; // Should fail when token provided but no key configured - let url = Url::parse("https://relay.example.com/any/path?jwt=fake-token")?; - let result = auth.verify(&url); + let result = auth.verify("/any/path", Some("fake-token")); assert!(result.is_err()); - assert!(result - .unwrap_err() - .to_string() - .contains("token provided, but no key configured")); Ok(()) } @@ -258,8 +270,7 @@ mod tests { let token = key.encode(&claims)?; // Should succeed with valid token and matching path - let url = Url::parse(&format!("https://relay.example.com/room/123?jwt={token}"))?; - let token = auth.verify(&url)?; + let token = auth.verify("/room/123", Some(&token))?; assert_eq!(token.root, "room/123".as_path()); assert_eq!(token.subscribe, vec!["".as_path()]); assert_eq!(token.publish, vec!["alice".as_path()]); @@ -285,10 +296,8 @@ mod tests { let token = key.encode(&claims)?; // Should fail when trying to access wrong path - let url = Url::parse(&format!("https://relay.example.com/secret?jwt={token}"))?; - let result = auth.verify(&url); + let result = auth.verify("/secret", Some(&token)); assert!(result.is_err()); - assert!(result.unwrap_err().to_string().contains("path does not match the root")); Ok(()) } @@ -311,8 +320,7 @@ mod tests { let token = key.encode(&claims)?; // Verify the restrictions are preserved - let url = Url::parse(&format!("https://relay.example.com/room/123?jwt={token}"))?; - let token = auth.verify(&url)?; + let token = auth.verify("/room/123", Some(&token))?; assert_eq!(token.root, "room/123".as_path()); assert_eq!(token.subscribe, vec!["bob".as_path()]); assert_eq!(token.publish, vec!["alice".as_path()]); @@ -337,8 +345,7 @@ mod tests { }; let token = key.encode(&claims)?; - let url = Url::parse(&format!("https://relay.example.com/room/123?jwt={token}"))?; - let token = auth.verify(&url)?; + let token = auth.verify("/room/123", Some(&token))?; assert_eq!(token.subscribe, vec!["".as_path()]); assert_eq!(token.publish, vec![]); @@ -362,8 +369,7 @@ mod tests { }; let token = key.encode(&claims)?; - let url = Url::parse(&format!("https://relay.example.com/room/123?jwt={token}"))?; - let token = auth.verify(&url)?; + let token = auth.verify("/room/123", Some(&token))?; assert_eq!(token.subscribe, vec![]); assert_eq!(token.publish, vec!["bob".as_path()]); @@ -388,8 +394,7 @@ mod tests { let token = key.encode(&claims)?; // Connect to more specific path room/123/alice - let url = Url::parse(&format!("https://relay.example.com/room/123/alice?jwt={token}"))?; - let token = auth.verify(&url)?; + let token = auth.verify("/room/123/alice", Some(&token))?; // Root should be updated to the more specific path assert_eq!(token.root, Path::new("room/123/alice")); @@ -418,8 +423,7 @@ mod tests { let token = key.encode(&claims)?; // Connect to room/123/alice - should remove alice prefix from publish - let url = Url::parse(&format!("https://relay.example.com/room/123/alice?jwt={token}"))?; - let token = auth.verify(&url)?; + let token = auth.verify("/room/123/alice", Some(&token))?; assert_eq!(token.root, "room/123/alice".as_path()); // Alice still can't subscribe to anything. @@ -448,8 +452,7 @@ mod tests { let token = key.encode(&claims)?; // Connect to room/123/bob - should remove bob prefix from subscribe - let url = Url::parse(&format!("https://relay.example.com/room/123/bob?jwt={token}"))?; - let token = auth.verify(&url)?; + let token = auth.verify("/room/123/bob", Some(&token))?; assert_eq!(token.root, "room/123/bob".as_path()); // bob prefix stripped, now can subscribe to everything under room/123/bob @@ -477,8 +480,7 @@ mod tests { let token = key.encode(&claims)?; // Connect to room/123/alice - loses ability to subscribe to bob - let url = Url::parse(&format!("https://relay.example.com/room/123/alice?jwt={token}"))?; - let verified = auth.verify(&url)?; + let verified = auth.verify("/room/123/alice", Some(&token))?; assert_eq!(verified.root, "room/123/alice".as_path()); // Can't subscribe to bob anymore (alice doesn't have bob prefix) @@ -487,8 +489,7 @@ mod tests { assert_eq!(verified.publish, vec!["".as_path()]); // Connect to room/123/bob - loses ability to publish to alice - let url = Url::parse(&format!("https://relay.example.com/room/123/bob?jwt={token}"))?; - let token = auth.verify(&url)?; + let token = auth.verify("/room/123/bob", Some(&token))?; assert_eq!(token.root, "room/123/bob".as_path()); // Can subscribe to everything under bob @@ -517,8 +518,7 @@ mod tests { let token = key.encode(&claims)?; // Connect to room/123/users - permissions should be reduced - let url = Url::parse(&format!("https://relay.example.com/room/123/users?jwt={token}"))?; - let verified = auth.verify(&url)?; + let verified = auth.verify("/room/123/users", Some(&token))?; assert_eq!(verified.root, "room/123/users".as_path()); // users prefix removed from paths @@ -526,8 +526,7 @@ mod tests { assert_eq!(verified.publish, vec!["alice/camera".as_path()]); // Connect to room/123/users/alice - further reduction - let url = Url::parse(&format!("https://relay.example.com/room/123/users/alice?jwt={token}"))?; - let token = auth.verify(&url)?; + let token = auth.verify("/room/123/users/alice", Some(&token))?; assert_eq!(token.root, "room/123/users/alice".as_path()); // Can't subscribe (alice doesn't have bob prefix) @@ -556,8 +555,7 @@ mod tests { let token = key.encode(&claims)?; // Connect to more specific path - let url = Url::parse(&format!("https://relay.example.com/room/123/alice?jwt={token}"))?; - let token = auth.verify(&url)?; + let token = auth.verify("/room/123/alice", Some(&token))?; // Should remain read-only assert_eq!(token.subscribe, vec!["".as_path()]); @@ -572,8 +570,7 @@ mod tests { }; let token = key.encode(&claims)?; - let url = Url::parse(&format!("https://relay.example.com/room/123/alice?jwt={token}"))?; - let verified = auth.verify(&url)?; + let verified = auth.verify("/room/123/alice", Some(&token))?; // Should remain write-only assert_eq!(verified.subscribe, vec![]); diff --git a/rs/moq-relay/src/cluster.rs b/rs/moq-relay/src/cluster.rs index 332ced91f..396b38741 100644 --- a/rs/moq-relay/src/cluster.rs +++ b/rs/moq-relay/src/cluster.rs @@ -5,6 +5,8 @@ use moq_lite::{AsPath, Broadcast, BroadcastConsumer, BroadcastProducer, Origin, use tracing::Instrument; use url::Url; +use crate::AuthToken; + #[serde_with::serde_as] #[derive(clap::Args, Clone, Debug, serde::Serialize, serde::Deserialize, Default)] #[serde_with::skip_serializing_none] @@ -64,6 +66,33 @@ impl Cluster { } } + // For a given auth token, return the origin that should be used for the session. + pub fn subscriber(&self, token: &AuthToken) -> Option { + // These broadcasts will be served to the session (when it subscribes). + // If this is a cluster node, then only publish our primary broadcasts. + // Otherwise publish everything. + let subscribe_origin = match token.cluster { + true => &self.primary, + false => &self.combined, + }; + + // Scope the origin to our root. + let subscribe_origin = subscribe_origin.producer.with_root(&token.root).unwrap(); + subscribe_origin.consume_only(&token.subscribe) + } + + pub fn publisher(&self, token: &AuthToken) -> Option { + // If this is a cluster node, then add its broadcasts to the secondary origin. + // That way we won't publish them to other cluster nodes. + let publish_origin = match token.cluster { + true => &self.secondary, + false => &self.primary, + }; + + let publish_origin = publish_origin.producer.with_root(&token.root).unwrap(); + publish_origin.publish_only(&token.publish) + } + pub fn get(&self, broadcast: &str) -> Option { self.primary .consumer diff --git a/rs/moq-relay/src/config.rs b/rs/moq-relay/src/config.rs index ab451f6a0..17e3e7ed4 100644 --- a/rs/moq-relay/src/config.rs +++ b/rs/moq-relay/src/config.rs @@ -1,7 +1,7 @@ use clap::Parser; use serde::{Deserialize, Serialize}; -use crate::{AuthConfig, ClusterConfig}; +use crate::{AuthConfig, ClusterConfig, WebConfig}; #[derive(Parser, Clone, Debug, Deserialize, Serialize)] #[serde(deny_unknown_fields)] @@ -30,6 +30,11 @@ pub struct Config { #[serde(default)] pub auth: AuthConfig, + /// Optionally run a HTTP and WebSocket server. + #[command(flatten)] + #[serde(default)] + pub web: WebConfig, + /// If provided, load the configuration from this file. #[serde(default)] pub file: Option, diff --git a/rs/moq-relay/src/connection.rs b/rs/moq-relay/src/connection.rs index 68e848f2a..b73864c37 100644 --- a/rs/moq-relay/src/connection.rs +++ b/rs/moq-relay/src/connection.rs @@ -1,10 +1,10 @@ use crate::{Auth, Cluster}; -use web_transport::quinn::http; +use moq_native::web_transport_quinn; pub struct Connection { pub id: u64, - pub request: web_transport::quinn::Request, + pub request: web_transport_quinn::Request, pub cluster: Cluster, pub auth: Auth, } @@ -12,55 +12,45 @@ pub struct Connection { impl Connection { #[tracing::instrument("conn", skip_all, fields(id = self.id))] pub async fn run(self) -> anyhow::Result<()> { + // Extract the path and token from the URL. + let path = self.request.url().path(); + let token = self + .request + .url() + .query_pairs() + .find(|(k, _)| k == "jwt") + .map(|(_, v)| v.to_string()); + // Verify the URL before accepting the connection. - let token = match self.auth.verify(self.request.url()) { + let token = match self.auth.verify(path, token.as_deref()) { Ok(token) => token, Err(err) => { - self.request.close(http::StatusCode::UNAUTHORIZED).await?; - return Err(err); + self.request.close(err.clone().into()).await?; + return Err(err.into()); } }; tracing::info!(token = ?token, "session accepted"); - // Accept the connection. - let session = self.request.ok().await?; - - // These broadcasts will be served to the session (when it subscribes). - // If this is a cluster node, then only publish our primary broadcasts. - // Otherwise publish everything. - let subscribe_origin = match token.cluster { - true => &self.cluster.primary, - false => &self.cluster.combined, - }; - - // Scope the origin to our root. - let subscribe_origin = subscribe_origin.producer.with_root(&token.root).unwrap(); - let subscribe = subscribe_origin.consume_only(&token.subscribe); + let publish = self.cluster.publisher(&token); + let subscribe = self.cluster.subscriber(&token); - // If this is a cluster node, then add its broadcasts to the secondary origin. - // That way we won't publish them to other cluster nodes. - let publish_origin = match token.cluster { - true => &self.cluster.secondary, - false => &self.cluster.primary, - }; - - let publish_origin = publish_origin.producer.with_root(&token.root).unwrap(); - let publish = publish_origin.publish_only(&token.publish); - - match (&subscribe, &publish) { - (Some(subscribe), Some(publish)) => { - tracing::info!(root = %token.root, subscribe = %subscribe.allowed().map(|p| p.as_str()).collect::>().join(","), publish = %publish.allowed().map(|p| p.as_str()).collect::>().join(","), "session accepted"); + match (&publish, &subscribe) { + (Some(publish), Some(subscribe)) => { + tracing::info!(root = %token.root, publish = %publish.allowed().map(|p| p.as_str()).collect::>().join(","), subscribe = %subscribe.allowed().map(|p| p.as_str()).collect::>().join(","), "session accepted"); } - (Some(subscribe), None) => { - tracing::info!(root = %token.root, subscribe = %subscribe.allowed().map(|p| p.as_str()).collect::>().join(","), "subscriber accepted"); + (Some(publish), None) => { + tracing::info!(root = %token.root, publish = %publish.allowed().map(|p| p.as_str()).collect::>().join(","), "publisher accepted"); } - (None, Some(publish)) => { - tracing::info!(root = %token.root, publish = %publish.allowed().map(|p| p.as_str()).collect::>().join(","), "publisher accepted") + (None, Some(subscribe)) => { + tracing::info!(root = %token.root, subscribe = %subscribe.allowed().map(|p| p.as_str()).collect::>().join(","), "subscriber accepted") } _ => anyhow::bail!("invalid session; no allowed paths"), } + // Accept the connection. + let session = self.request.ok().await?; + // NOTE: subscribe and publish seem backwards because of how relays work. // We publish the tracks the client is allowed to subscribe to. // We subscribe to the tracks the client is allowed to publish. diff --git a/rs/moq-relay/src/main.rs b/rs/moq-relay/src/main.rs index bc03c1e8f..37c2d0d92 100644 --- a/rs/moq-relay/src/main.rs +++ b/rs/moq-relay/src/main.rs @@ -25,10 +25,12 @@ async fn main() -> anyhow::Result<()> { tokio::spawn(async move { cloned.run().await.expect("cluster failed") }); // Create a web server too. - let web = Web::new(WebConfig { - bind: addr, - fingerprints, + let web = Web::new(WebState { + auth: auth.clone(), cluster: cluster.clone(), + fingerprints, + config: config.web, + conn_id: Default::default(), }); tokio::spawn(async move { diff --git a/rs/moq-relay/src/web.rs b/rs/moq-relay/src/web.rs index 49c788d69..46c6a1fed 100644 --- a/rs/moq-relay/src/web.rs +++ b/rs/moq-relay/src/web.rs @@ -1,82 +1,157 @@ +use futures::{SinkExt, StreamExt}; use std::{ net, pin::Pin, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }, task::{ready, Context, Poll}, }; +use web_transport_ws::tungstenite; use axum::{ body::Body, - extract::Path, + extract::{Path, Query, State, WebSocketUpgrade}, http::{Method, StatusCode}, response::{IntoResponse, Response}, - routing::get, + routing::{any, get}, Router, }; use bytes::Bytes; +use clap::Parser; use hyper_serve::accept::DefaultAcceptor; +use moq_lite::{OriginConsumer, OriginProducer}; +use serde::{Deserialize, Serialize}; use std::future::Future; use tower_http::cors::{Any, CorsLayer}; -use crate::Cluster; +use crate::{Auth, Cluster}; +#[derive(Debug, Deserialize)] +struct Params { + jwt: Option, +} + +#[derive(Parser, Clone, Default, Debug, Deserialize, Serialize)] +#[serde(deny_unknown_fields)] pub struct WebConfig { - pub bind: net::SocketAddr, - pub fingerprints: Vec, + /// Listen for HTTP and WebSocket connections on the given address. + /// Defaults to disabled if not provided. + #[arg(long = "web-listen", id = "web-listen", env = "MOQ_WEB_LISTEN")] + pub listen: Option, +} + +pub struct WebState { + pub auth: Auth, pub cluster: Cluster, + pub fingerprints: Vec, + pub config: WebConfig, + pub conn_id: AtomicU64, } // Run a HTTP server using Axum -// TODO remove this when Chrome adds support for self-signed certificates using WebTransport pub struct Web { app: Router, - server: hyper_serve::Server, + server: Option>, } impl Web { - pub fn new(config: WebConfig) -> Self { + pub fn new(state: WebState) -> Self { // Get the first certificate's fingerprint. // TODO serve all of them so we can support multiple signature algorithms. - let fingerprint = config.fingerprints.first().expect("missing certificate").clone(); + let fingerprint = state.fingerprints.first().expect("missing certificate").clone(); + let listen = state.config.listen; let app = Router::new() .route("/certificate.sha256", get(fingerprint)) - .route( - "/announced", - get({ - let cluster = config.cluster.clone(); - move || serve_announced(Path("".to_string()), cluster.clone()) - }), - ) - .route( - "/announced/{*prefix}", - get({ - let cluster = config.cluster.clone(); - move |path| serve_announced(path, cluster) - }), - ) - .route( - "/fetch/{*path}", - get({ - let cluster = config.cluster.clone(); - move |path| serve_fetch(path, cluster) - }), - ) - .layer(CorsLayer::new().allow_origin(Any).allow_methods([Method::GET])); - - let server = hyper_serve::bind(config.bind); + .route("/announced", get(serve_announced)) + .route("/announced/{*prefix}", get(serve_announced)) + .route("/fetch/{*path}", get(serve_fetch)) + .route("/{*path}", any(serve_ws)) + .layer(CorsLayer::new().allow_origin(Any).allow_methods([Method::GET])) + .with_state(Arc::new(state)); + let server = listen.map(hyper_serve::bind); Self { app, server } } pub async fn run(self) -> anyhow::Result<()> { - self.server.serve(self.app.into_make_service()).await?; + if let Some(server) = self.server { + server.serve(self.app.into_make_service()).await?; + } Ok(()) } } +async fn serve_ws( + ws: WebSocketUpgrade, + Path(path): Path, + Query(params): Query, + State(state): State>, +) -> axum::response::Result { + let ws = ws.protocols(["webtransport"]); + + let token = state.auth.verify(&path, params.jwt.as_deref())?; + let publish = state.cluster.publisher(&token); + let subscribe = state.cluster.subscriber(&token); + + if publish.is_none() && subscribe.is_none() { + // Bad token, we can't publish or subscribe. + return Err(StatusCode::UNAUTHORIZED.into()); + } + + Ok(ws.on_upgrade(async move |socket| { + let id = state.conn_id.fetch_add(1, Ordering::Relaxed); + + // Unfortuantely, we need to convert from Axum to Tungstenite. + // Axum uses Tungstenite internally, but it's not exposed to avoid semvar issues. + let socket = socket + .map(axum_to_tungstenite) + // TODO Figure out how to avoid swallowing errors. + .sink_map_err(|err| { + tracing::warn!(%err, "WebSocket error"); + tungstenite::Error::ConnectionClosed + }) + .with(tungstenite_to_axum); + let _ = handle_socket(id, socket, publish, subscribe).await; + })) +} + +#[tracing::instrument("ws", err, skip_all, fields(id = _id))] +async fn handle_socket( + _id: u64, + socket: T, + publish: Option, + subscribe: Option, +) -> anyhow::Result<()> +where + T: futures::Stream> + + futures::Sink + + Send + + Unpin + + 'static, +{ + tracing::info!("session accepted"); + + // Wrap the WebSocket in a WebTransport compatibility layer. + let ws = web_transport_ws::Session::new(socket, true); + + tracing::info!("connecting session"); + let session = moq_lite::Session::accept(ws, subscribe, publish).await?; + tracing::info!("web session connected"); + + Err(session.closed().await.into()) +} + /// Serve the announced broadcasts for a given prefix. -async fn serve_announced(Path(prefix): Path, cluster: Cluster) -> axum::response::Result { - let mut origin = match cluster.combined.consumer.consume_only(&[prefix.into()]) { +async fn serve_announced( + Path(prefix): Path, + Query(params): Query, + State(state): State>, +) -> axum::response::Result { + let token = state.auth.verify(&prefix, params.jwt.as_deref())?; + let mut origin = match state.cluster.subscriber(&token) { Some(origin) => origin, None => return Err(StatusCode::UNAUTHORIZED.into()), }; @@ -93,14 +168,27 @@ async fn serve_announced(Path(prefix): Path, cluster: Cluster) -> axum:: } /// Serve the latest group for a given track -async fn serve_fetch(Path(path): Path, cluster: Cluster) -> axum::response::Result { +async fn serve_fetch( + Path(path): Path, + Query(params): Query, + State(state): State>, +) -> axum::response::Result { + // The path containts a broadcast/track let mut path: Vec<&str> = path.split("/").collect(); - if path.len() < 2 { + let track = path.pop().unwrap().to_string(); + + // We need at least a broadcast and a track. + if path.is_empty() { return Err(StatusCode::BAD_REQUEST.into()); } - let track = path.pop().unwrap().to_string(); let broadcast = path.join("/"); + let token = state.auth.verify(&broadcast, params.jwt.as_deref())?; + + let origin = match state.cluster.subscriber(&token) { + Some(origin) => origin, + None => return Err(StatusCode::UNAUTHORIZED.into()), + }; tracing::info!(%broadcast, %track, "subscribing to track"); @@ -109,7 +197,7 @@ async fn serve_fetch(Path(path): Path, cluster: Cluster) -> axum::respon priority: 0, }; - let broadcast = cluster.get(&broadcast).ok_or(StatusCode::NOT_FOUND)?; + let broadcast = origin.consume_broadcast(&broadcast).ok_or(StatusCode::NOT_FOUND)?; let mut track = broadcast.subscribe_track(&track); let group = match track.next_group().await { @@ -187,3 +275,47 @@ impl IntoResponse for ServeGroupError { (StatusCode::INTERNAL_SERVER_ERROR, self.0.to_string()).into_response() } } + +// https://github.com/tokio-rs/axum/discussions/848#discussioncomment-11443587 + +#[allow(clippy::result_large_err)] +fn axum_to_tungstenite( + message: Result, +) -> Result { + match message { + Ok(msg) => Ok(match msg { + axum::extract::ws::Message::Text(text) => tungstenite::Message::Text(text.to_string()), + axum::extract::ws::Message::Binary(bin) => tungstenite::Message::Binary(bin.into()), + axum::extract::ws::Message::Ping(ping) => tungstenite::Message::Ping(ping.into()), + axum::extract::ws::Message::Pong(pong) => tungstenite::Message::Pong(pong.into()), + axum::extract::ws::Message::Close(close) => { + tungstenite::Message::Close(close.map(|c| tungstenite::protocol::CloseFrame { + code: c.code.into(), + reason: c.reason.to_string().into(), + })) + } + }), + Err(_err) => Err(tungstenite::Error::ConnectionClosed), + } +} + +#[allow(clippy::result_large_err)] +fn tungstenite_to_axum( + message: tungstenite::Message, +) -> Pin> + Send + Sync>> { + Box::pin(async move { + Ok(match message { + tungstenite::Message::Text(text) => axum::extract::ws::Message::Text(text.into()), + tungstenite::Message::Binary(bin) => axum::extract::ws::Message::Binary(bin.into()), + tungstenite::Message::Ping(ping) => axum::extract::ws::Message::Ping(ping.into()), + tungstenite::Message::Pong(pong) => axum::extract::ws::Message::Pong(pong.into()), + tungstenite::Message::Frame(_frame) => unreachable!(), + tungstenite::Message::Close(close) => { + axum::extract::ws::Message::Close(close.map(|c| axum::extract::ws::CloseFrame { + code: c.code.into(), + reason: c.reason.to_string().into(), + })) + } + }) + }) +} diff --git a/rs/moq/Cargo.toml b/rs/moq/Cargo.toml index beb1c99e5..d0742c7d8 100644 --- a/rs/moq/Cargo.toml +++ b/rs/moq/Cargo.toml @@ -29,4 +29,4 @@ tokio = { workspace = true, features = [ ] } tracing = "0.1" web-async = { workspace = true } -web-transport = { workspace = true } +web-transport-trait = { workspace = true } diff --git a/rs/moq/src/coding/varint.rs b/rs/moq/src/coding/varint.rs index 1c034a198..ef9415bc8 100644 --- a/rs/moq/src/coding/varint.rs +++ b/rs/moq/src/coding/varint.rs @@ -18,7 +18,7 @@ pub struct BoundsExceeded; /// Values of this type are suitable for encoding as QUIC variable-length integer. /// It would be neat if we could express to Rust that the top two bits are available for use as enum /// discriminants -#[derive(Default, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] +#[derive(Debug, Default, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] pub struct VarInt(u64); impl VarInt { diff --git a/rs/moq/src/error.rs b/rs/moq/src/error.rs index 8df201a52..cbcfc5008 100644 --- a/rs/moq/src/error.rs +++ b/rs/moq/src/error.rs @@ -1,10 +1,10 @@ use crate::{coding, message}; /// A list of possible errors that can occur during the session. -#[derive(thiserror::Error, Debug, Clone)] +#[derive(thiserror::Error, Debug)] pub enum Error { - #[error("webtransport error: {0}")] - WebTransport(#[from] web_transport::Error), + #[error("transport error: {0}")] + Transport(Box), #[error("decode error: {0}")] Decode(#[from] coding::DecodeError), @@ -67,7 +67,7 @@ impl Error { Self::RequiredExtension(_) => 1, Self::Old => 2, Self::Timeout => 3, - Self::WebTransport(_) => 4, + Self::Transport(_) => 4, Self::Decode(_) => 5, Self::Unauthorized => 6, Self::Version(..) => 9, @@ -82,4 +82,26 @@ impl Error { } } +impl Clone for Error { + fn clone(&self) -> Self { + match self { + Error::Transport(_) => Error::Transport(Box::new(std::io::Error::other("Transport error (cloned)"))), + Error::Decode(e) => Error::Decode(e.clone()), + Error::Version(v1, v2) => Error::Version(v1.clone(), v2.clone()), + Error::RequiredExtension(e) => Error::RequiredExtension(*e), + Error::UnexpectedStream(s) => Error::UnexpectedStream(*s), + Error::BoundsExceeded(b) => Error::BoundsExceeded(*b), + Error::Duplicate => Error::Duplicate, + Error::Cancel => Error::Cancel, + Error::Timeout => Error::Timeout, + Error::Old => Error::Old, + Error::App(code) => Error::App(*code), + Error::NotFound => Error::NotFound, + Error::WrongSize => Error::WrongSize, + Error::ProtocolViolation => Error::ProtocolViolation, + Error::Unauthorized => Error::Unauthorized, + } + } +} + pub type Result = std::result::Result; diff --git a/rs/moq/src/lib.rs b/rs/moq/src/lib.rs index 6cbe69e46..5b9636e30 100644 --- a/rs/moq/src/lib.rs +++ b/rs/moq/src/lib.rs @@ -21,6 +21,7 @@ mod session; pub mod coding; pub mod message; + pub use error::*; pub use model::*; pub use path::*; @@ -28,6 +29,3 @@ pub use session::*; /// The ALPN used when connecting via QUIC directly. pub const ALPN: &str = message::Alpn::CURRENT.0; - -/// Export the web_transport crate. -pub use web_transport; diff --git a/rs/moq/src/session/mod.rs b/rs/moq/src/session/mod.rs index aebd759b0..d9b2183a7 100644 --- a/rs/moq/src/session/mod.rs +++ b/rs/moq/src/session/mod.rs @@ -17,14 +17,14 @@ use writer::*; /// /// This simplifies the state machine and immediately rejects any subscriptions that don't match the origin prefix. /// You probably want to use [Session] unless you're writing a relay. -pub struct Session { - pub webtransport: web_transport::Session, +pub struct Session { + pub transport: S, } -impl Session { +impl Session { async fn new( - mut session: web_transport::Session, - stream: Stream, + session: S, + stream: Stream, // We will publish any local broadcasts from this origin. publish: Option, // We will consume any remote broadcasts, inserting them into this origin. @@ -34,7 +34,7 @@ impl Session { let subscriber = Subscriber::new(session.clone(), subscribe); let this = Self { - webtransport: session.clone(), + transport: session.clone(), }; let init = oneshot::channel(); @@ -47,13 +47,13 @@ impl Session { }; match res { - Err(Error::WebTransport(web_transport::Error::Session(_))) => { + Err(Error::Transport(_)) => { tracing::info!("session terminated"); session.close(1, ""); } Err(err) => { tracing::warn!(%err, "session error"); - session.close(err.to_code(), &err.to_string()); + session.close(err.to_code(), err.to_string().as_ref()); } _ => { tracing::info!("session closed"); @@ -73,18 +73,17 @@ impl Session { /// Perform the MoQ handshake as a client. pub async fn connect( - session: impl Into, + session: S, publish: impl Into>, subscribe: impl Into>, ) -> Result { - let mut session = session.into(); - let mut stream = Stream::open(&mut session, message::ControlType::Session).await?; + let mut stream = Stream::open(&session, message::ControlType::Session).await?; Self::connect_setup(&mut stream).await?; let session = Self::new(session, stream, publish.into(), subscribe.into()).await?; Ok(session) } - async fn connect_setup(setup: &mut Stream) -> Result<(), Error> { + async fn connect_setup(setup: &mut Stream) -> Result<(), Error> { let client = message::ClientSetup { versions: [message::Version::CURRENT].into(), extensions: Default::default(), @@ -99,17 +98,12 @@ impl Session { } /// Perform the MoQ handshake as a server - pub async fn accept< - T: Into, - P: Into>, - C: Into>, - >( - session: T, + pub async fn accept>, C: Into>>( + session: S, publish: P, subscribe: C, ) -> Result { - let mut session = session.into(); - let mut stream = Stream::accept(&mut session).await?; + let mut stream = Stream::accept(&session).await?; let kind = stream.reader.decode().await?; Self::accept_setup(kind, &mut stream).await?; @@ -117,13 +111,12 @@ impl Session { Ok(session) } - async fn accept_setup(kind: message::ControlType, control: &mut Stream) -> Result<(), Error> { + async fn accept_setup(kind: message::ControlType, control: &mut Stream) -> Result<(), Error> { if kind != message::ControlType::Session && kind != message::ControlType::ClientCompat { return Err(Error::UnexpectedStream(kind)); } let client: message::ClientSetup = control.reader.decode().await?; - if !client.versions.contains(&message::Version::CURRENT) { return Err(Error::Version(client.versions, [message::Version::CURRENT].into())); } @@ -147,18 +140,18 @@ impl Session { } // TODO do something useful with this - async fn run_session(mut stream: Stream) -> Result<(), Error> { + async fn run_session(mut stream: Stream) -> Result<(), Error> { while let Some(_info) = stream.reader.decode_maybe::().await? {} Err(Error::Cancel) } - /// Close the underlying WebTransport session. - pub fn close(mut self, err: Error) { - self.webtransport.close(err.to_code(), &err.to_string()); + /// Close the underlying transport session. + pub fn close(self, err: Error) { + self.transport.close(err.to_code(), err.to_string().as_ref()); } - /// Block until the WebTransport session is closed. + /// Block until the transport session is closed. pub async fn closed(&self) -> Error { - self.webtransport.closed().await.into() + Error::Transport(self.transport.closed().await.into()) } } diff --git a/rs/moq/src/session/publisher.rs b/rs/moq/src/session/publisher.rs index 6181471cf..9c241ab3d 100644 --- a/rs/moq/src/session/publisher.rs +++ b/rs/moq/src/session/publisher.rs @@ -1,4 +1,5 @@ use web_async::FuturesExt; +use web_transport_trait::SendStream; use crate::{ message, model::GroupConsumer, AsPath, BroadcastConsumer, Error, Origin, OriginConsumer, Track, TrackConsumer, @@ -6,13 +7,13 @@ use crate::{ use super::{Stream, Writer}; -pub(super) struct Publisher { - session: web_transport::Session, +pub(super) struct Publisher { + session: S, origin: OriginConsumer, } -impl Publisher { - pub fn new(session: web_transport::Session, origin: Option) -> Self { +impl Publisher { + pub fn new(session: S, origin: Option) -> Self { // Default to a dummy origin that is immediately closed. let origin = origin.unwrap_or_else(|| Origin::produce().consumer); Self { session, origin } @@ -25,7 +26,7 @@ impl Publisher { async fn run_bi(mut self) -> Result<(), Error> { loop { - let mut stream = Stream::accept(&mut self.session).await?; + let mut stream = Stream::accept(&self.session).await?; // To avoid cloning the origin, we process each control stream in received order. // This adds some head-of-line blocking but it delays an expensive clone. @@ -43,7 +44,7 @@ impl Publisher { } } - pub async fn recv_announce(&mut self, mut stream: Stream) -> Result<(), Error> { + pub async fn recv_announce(&mut self, mut stream: Stream) -> Result<(), Error> { let interest = stream.reader.decode::().await?; let prefix = interest.prefix.to_owned(); @@ -61,7 +62,7 @@ impl Publisher { Error::Cancel => { tracing::debug!(prefix = %origin.absolute(prefix), "announcing cancelled"); } - Error::WebTransport(_) => { + Error::Transport(_) => { tracing::debug!(prefix = %origin.absolute(prefix), "announcing cancelled"); } err => { @@ -78,7 +79,11 @@ impl Publisher { Ok(()) } - async fn run_announce(stream: &mut Stream, origin: &mut OriginConsumer, prefix: impl AsPath) -> Result<(), Error> { + async fn run_announce( + stream: &mut Stream, + origin: &mut OriginConsumer, + prefix: impl AsPath, + ) -> Result<(), Error> { let prefix = prefix.as_path(); let mut init = Vec::new(); @@ -120,14 +125,14 @@ impl Publisher { stream.writer.encode(&msg).await?; } }, - None => return stream.writer.close().await, + None => return stream.writer.finish().await, } } } } } - pub async fn recv_subscribe(&mut self, mut stream: Stream) -> Result<(), Error> { + pub async fn recv_subscribe(&mut self, mut stream: Stream) -> Result<(), Error> { let subscribe = stream.reader.decode::().await?; let id = subscribe.id; @@ -146,7 +151,7 @@ impl Publisher { tracing::debug!(%id, broadcast = %absolute, %track, "subscribed cancelled") } // TODO better classify WebTransport errors. - Error::WebTransport(_) => { + Error::Transport(_) => { tracing::debug!(%id, broadcast = %absolute, %track, "subscribed cancelled") } err => { @@ -163,8 +168,8 @@ impl Publisher { } async fn run_subscribe( - session: web_transport::Session, - stream: &mut Stream, + session: S, + stream: &mut Stream, subscribe: &message::Subscribe<'_>, consumer: Option, ) -> Result<(), Error> { @@ -189,14 +194,10 @@ impl Publisher { res = stream.reader.closed() => res?, } - stream.writer.close().await + stream.writer.finish().await } - async fn run_track( - session: web_transport::Session, - mut track: TrackConsumer, - subscribe: &message::Subscribe<'_>, - ) -> Result<(), Error> { + async fn run_track(session: S, mut track: TrackConsumer, subscribe: &message::Subscribe<'_>) -> Result<(), Error> { // TODO use a BTreeMap serve the latest N groups by sequence. // Until then, we'll implement N=2 manually. // Also, this is more complicated because we can't use tokio because of WASM. @@ -241,7 +242,7 @@ impl Publisher { continue; } - let priority = Self::stream_priority(track.info.priority, sequence); + let priority = stream_priority(track.info.priority, sequence); let msg = message::Group { subscribe: subscribe.id, sequence, @@ -257,6 +258,8 @@ impl Publisher { old_group.take(); // Drop the future to cancel it. } + assert!(old_group.is_none()); + if sequence >= *latest { old_group = new_group; old_sequence = new_sequence; @@ -271,14 +274,17 @@ impl Publisher { } pub async fn serve_group( - mut session: web_transport::Session, + session: S, msg: message::Group, priority: i32, mut group: GroupConsumer, ) -> Result<(), Error> { // TODO add a way to open in priority order. - let mut stream = Writer::open(&mut session, message::DataType::Group).await?; + let mut stream = session.open_uni().await.map_err(|err| Error::Transport(err.into()))?; stream.set_priority(priority); + + let mut stream = Writer::new(stream); + stream.encode(&message::DataType::Group).await?; stream.encode(&msg).await?; loop { @@ -293,6 +299,8 @@ impl Publisher { None => break, }; + tracing::trace!(size = %frame.info.size, "writing frame"); + stream.encode(&frame.info.size).await?; loop { @@ -307,22 +315,26 @@ impl Publisher { None => break, } } + + tracing::trace!(size = %frame.info.size, "wrote frame"); } - stream.close().await?; + stream.finish().await?; + + tracing::debug!(sequence = %msg.sequence, "finished group"); Ok(()) } +} - // Quinn takes a i32 priority. - // We do our best to distill 70 bits of information into 32 bits, but overflows will happen. - // Specifically, group sequence 2^24 will overflow and be incorrectly prioritized. - // But even with a group per frame, it will take ~6 days to reach that point. - // TODO The behavior when two tracks share the same priority is undefined. Should we round-robin? - fn stream_priority(track_priority: u8, group_sequence: u64) -> i32 { - let sequence = 0xFFFFFF - (group_sequence as u32 & 0xFFFFFF); - ((track_priority as i32) << 24) | sequence as i32 - } +// Quinn takes a i32 priority. +// We do our best to distill 70 bits of information into 32 bits, but overflows will happen. +// Specifically, group sequence 2^24 will overflow and be incorrectly prioritized. +// But even with a group per frame, it will take ~6 days to reach that point. +// TODO The behavior when two tracks share the same priority is undefined. Should we round-robin? +fn stream_priority(track_priority: u8, group_sequence: u64) -> i32 { + let sequence = 0xFFFFFF - (group_sequence as u32 & 0xFFFFFF); + ((track_priority as i32) << 24) | sequence as i32 } #[cfg(test)] @@ -330,9 +342,9 @@ mod test { use super::*; #[test] - fn stream_priority() { + fn priority() { let assert = |track_priority, group_sequence, expected| { - assert_eq!(Publisher::stream_priority(track_priority, group_sequence), expected); + assert_eq!(stream_priority(track_priority, group_sequence), expected); }; const U24: i32 = (1 << 24) - 1; diff --git a/rs/moq/src/session/reader.rs b/rs/moq/src/session/reader.rs index cbd7267a6..4b86da75b 100644 --- a/rs/moq/src/session/reader.rs +++ b/rs/moq/src/session/reader.rs @@ -4,23 +4,28 @@ use bytes::{Buf, Bytes, BytesMut}; use crate::{coding::*, Error}; -pub struct Reader { - stream: web_transport::RecvStream, +pub struct Reader { + stream: S, buffer: BytesMut, } -impl Reader { - pub fn new(stream: web_transport::RecvStream) -> Self { +impl Reader { + pub fn new(stream: S) -> Self { Self { stream, buffer: Default::default(), } } - pub async fn accept(session: &mut web_transport::Session) -> Result { + /* + pub async fn accept(session: &S) -> Result + where + S::RecvStream: T, + { let stream = session.accept_uni().await?; Ok(Self::new(stream)) } + */ pub async fn decode(&mut self) -> Result { loop { @@ -32,7 +37,13 @@ impl Reader { } Err(DecodeError::Short) => { // Try to read more data - if self.stream.read_buf(&mut self.buffer).await?.is_none() { + if self + .stream + .read_buf(&mut self.buffer) + .await + .map_err(|e| Error::Transport(e.into()))? + .is_none() + { // Stream closed while we still need more data return Err(Error::Decode(DecodeError::Short)); } @@ -59,12 +70,22 @@ impl Reader { return Ok(Some(data)); } - Ok(self.stream.read(max).await?) + self.stream + .read_chunk(max) + .await + .map_err(|e| Error::Transport(e.into())) } /// Wait until the stream is closed, erroring if there are any additional bytes. pub async fn closed(&mut self) -> Result<(), Error> { - if self.buffer.is_empty() && self.stream.read_buf(&mut self.buffer).await?.is_none() { + if self.buffer.is_empty() + && self + .stream + .read_buf(&mut self.buffer) + .await + .map_err(|e| Error::Transport(e.into()))? + .is_none() + { return Ok(()); } diff --git a/rs/moq/src/session/stream.rs b/rs/moq/src/session/stream.rs index 4b2b5fe40..93d98b94b 100644 --- a/rs/moq/src/session/stream.rs +++ b/rs/moq/src/session/stream.rs @@ -1,14 +1,14 @@ use super::{Reader, Writer}; use crate::{message, Error}; -pub(super) struct Stream { - pub writer: Writer, - pub reader: Reader, +pub(super) struct Stream { + pub writer: Writer, + pub reader: Reader, } -impl Stream { - pub async fn open(session: &mut web_transport::Session, typ: message::ControlType) -> Result { - let (send, recv) = session.open_bi().await?; +impl Stream { + pub async fn open(session: &S, typ: message::ControlType) -> Result { + let (send, recv) = session.open_bi().await.map_err(|err| Error::Transport(err.into()))?; let mut writer = Writer::new(send); let reader = Reader::new(recv); @@ -17,8 +17,8 @@ impl Stream { Ok(Stream { writer, reader }) } - pub async fn accept(session: &mut web_transport::Session) -> Result { - let (send, recv) = session.accept_bi().await?; + pub async fn accept(session: &S) -> Result { + let (send, recv) = session.accept_bi().await.map_err(|err| Error::Transport(err.into()))?; let writer = Writer::new(send); let reader = Reader::new(recv); diff --git a/rs/moq/src/session/subscriber.rs b/rs/moq/src/session/subscriber.rs index 2fd97d4c8..9e606ad96 100644 --- a/rs/moq/src/session/subscriber.rs +++ b/rs/moq/src/session/subscriber.rs @@ -9,13 +9,13 @@ use crate::{ }; use tokio::sync::oneshot; -use web_async::{spawn, Lock}; +use web_async::Lock; use super::{Reader, Stream}; #[derive(Clone)] -pub(super) struct Subscriber { - session: web_transport::Session, +pub(super) struct Subscriber { + session: S, origin: Option, broadcasts: Lock>, @@ -23,8 +23,8 @@ pub(super) struct Subscriber { next_id: Arc, } -impl Subscriber { - pub fn new(session: web_transport::Session, origin: Option) -> Self { +impl Subscriber { + pub fn new(session: S, origin: Option) -> Self { Self { session, origin, @@ -42,9 +42,15 @@ impl Subscriber { } } - async fn run_uni(mut self) -> Result<(), Error> { + async fn run_uni(self) -> Result<(), Error> { loop { - let stream = Reader::accept(&mut self.session).await?; + let stream = self + .session + .accept_uni() + .await + .map_err(|err| Error::Transport(err.into()))?; + + let stream = Reader::new(stream); let this = self.clone(); web_async::spawn(async move { @@ -53,7 +59,7 @@ impl Subscriber { } } - async fn run_uni_stream(mut self, mut stream: Reader) -> Result<(), Error> { + async fn run_uni_stream(mut self, mut stream: Reader) -> Result<(), Error> { let kind = stream.decode().await?; let res = match kind { @@ -74,7 +80,7 @@ impl Subscriber { return Ok(()); } - let mut stream = Stream::open(&mut self.session, message::ControlType::Announce).await?; + let mut stream = Stream::open(&self.session, message::ControlType::Announce).await?; tracing::trace!(root = %self.log_path(""), "announced start"); @@ -108,7 +114,7 @@ impl Subscriber { } // Close the stream when there's nothing more to announce. - stream.writer.close().await + stream.writer.finish().await } fn start_announce( @@ -132,7 +138,7 @@ impl Subscriber { .unwrap() .publish_broadcast(path.clone(), broadcast.consumer); - spawn(self.clone().run_broadcast(path, broadcast.producer)); + web_async::spawn(self.clone().run_broadcast(path, broadcast.producer)); Ok(()) } @@ -155,7 +161,7 @@ impl Subscriber { let mut this = self.clone(); let path = path.clone(); - spawn(async move { + web_async::spawn(async move { this.run_subscribe(id, path, track).await; this.subscribes.lock().remove(&id); }); @@ -183,7 +189,7 @@ impl Subscriber { }; match res { - Err(Error::Cancel) | Err(Error::WebTransport(_)) => { + Err(Error::Cancel) | Err(Error::Transport(_)) => { tracing::debug!(broadcast = %self.log_path(&broadcast), track = %track.info.name, id, "subscribe cancelled"); track.abort(Error::Cancel); } @@ -199,17 +205,17 @@ impl Subscriber { } async fn run_track(&mut self, msg: message::Subscribe<'_>) -> Result<(), Error> { - let mut stream = Stream::open(&mut self.session, message::ControlType::Subscribe).await?; + let mut stream = Stream::open(&self.session, message::ControlType::Subscribe).await?; if let Err(err) = self.run_track_stream(&mut stream, msg).await { stream.writer.abort(&err); return Err(err); } - stream.writer.close().await + stream.writer.finish().await } - async fn run_track_stream(&mut self, stream: &mut Stream, msg: message::Subscribe<'_>) -> Result<(), Error> { + async fn run_track_stream(&mut self, stream: &mut Stream, msg: message::Subscribe<'_>) -> Result<(), Error> { stream.writer.encode(&msg).await?; // TODO use the response correctly populate the track info @@ -221,7 +227,7 @@ impl Subscriber { Ok(()) } - pub async fn recv_group(&mut self, stream: &mut Reader) -> Result<(), Error> { + pub async fn recv_group(&mut self, stream: &mut Reader) -> Result<(), Error> { let group: message::Group = stream.decode().await?; let group = { @@ -240,7 +246,7 @@ impl Subscriber { }; match res { - Err(Error::Cancel) | Err(Error::WebTransport(_)) => { + Err(Error::Cancel) | Err(Error::Transport(_)) => { tracing::trace!(group = %group.info.sequence, "group cancelled"); group.abort(Error::Cancel); } @@ -257,7 +263,7 @@ impl Subscriber { Ok(()) } - async fn run_group(&mut self, stream: &mut Reader, mut group: GroupProducer) -> Result<(), Error> { + async fn run_group(&mut self, stream: &mut Reader, mut group: GroupProducer) -> Result<(), Error> { while let Some(size) = stream.decode_maybe::().await? { let frame = group.create_frame(Frame { size }); @@ -277,15 +283,19 @@ impl Subscriber { Ok(()) } - async fn run_frame(&mut self, stream: &mut Reader, mut frame: FrameProducer) -> Result<(), Error> { + async fn run_frame(&mut self, stream: &mut Reader, mut frame: FrameProducer) -> Result<(), Error> { let mut remain = frame.info.size; + tracing::trace!(size = %frame.info.size, "reading frame"); + while remain > 0 { let chunk = stream.read(remain as usize).await?.ok_or(Error::WrongSize)?; remain = remain.checked_sub(chunk.len() as u64).ok_or(Error::WrongSize)?; frame.write_chunk(chunk); } + tracing::trace!(size = %frame.info.size, "read frame"); + frame.close(); Ok(()) diff --git a/rs/moq/src/session/writer.rs b/rs/moq/src/session/writer.rs index a954bdd63..b0ad349af 100644 --- a/rs/moq/src/session/writer.rs +++ b/rs/moq/src/session/writer.rs @@ -1,20 +1,24 @@ -use crate::{coding::*, message, Error}; +use crate::{coding::*, Error}; -// A wrapper around a web_transport::SendStream that will reset on Drop -pub(super) struct Writer { - stream: web_transport::SendStream, +// A wrapper around a SendStream that will reset on Drop +pub(super) struct Writer { + stream: S, buffer: bytes::BytesMut, } -impl Writer { - pub fn new(stream: web_transport::SendStream) -> Self { +impl Writer { + pub fn new(stream: S) -> Self { Self { stream, buffer: Default::default(), } } - pub async fn open(session: &mut web_transport::Session, typ: message::DataType) -> Result { + /* + pub async fn open(session: &S, typ: message::DataType) -> Result + where + S::SendStream: T, + { let send = session.open_uni().await?; let mut writer = Self::new(send); @@ -22,31 +26,30 @@ impl Writer { Ok(writer) } + */ pub async fn encode(&mut self, msg: &T) -> Result<(), Error> { self.buffer.clear(); msg.encode(&mut self.buffer); while !self.buffer.is_empty() { - self.stream.write_buf(&mut self.buffer).await?; + self.stream + .write_buf(&mut self.buffer) + .await + .map_err(|e| Error::Transport(e.into()))?; } Ok(()) } pub async fn write(&mut self, buf: &[u8]) -> Result<(), Error> { - self.stream.write(buf).await?; // convert the error type + self.stream.write(buf).await.map_err(|e| Error::Transport(e.into()))?; Ok(()) } - pub fn set_priority(&mut self, priority: i32) { - self.stream.set_priority(priority); - } - /// A clean termination of the stream, waiting for the peer to close. - pub async fn close(&mut self) -> Result<(), Error> { - self.stream.finish()?; - self.stream.closed().await?; // TODO Return any error code? + pub async fn finish(&mut self) -> Result<(), Error> { + self.stream.finish().await.map_err(|e| Error::Transport(e.into()))?; Ok(()) } @@ -55,12 +58,12 @@ impl Writer { } pub async fn closed(&mut self) -> Result<(), Error> { - self.stream.closed().await?; + self.stream.closed().await.map_err(|e| Error::Transport(e.into()))?; Ok(()) } } -impl Drop for Writer { +impl Drop for Writer { fn drop(&mut self) { // Unlike the Quinn default, we abort the stream on drop. self.stream.reset(Error::Cancel.to_code());