diff --git a/.gitignore b/.gitignore index a9fd35fae..f850519c1 100644 --- a/.gitignore +++ b/.gitignore @@ -24,6 +24,8 @@ node_modules *.hex *.jwk *.jwt +*.m3u8 +*.m4s # We're using bun (and sometimes deno) package-lock.json diff --git a/Cargo.lock b/Cargo.lock index d9834b6b0..dc48555c5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -112,6 +112,19 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "async-compression" +version = "0.4.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07a926debf178f2d355197f9caddb08e54a9329d44748034bba349c5848cb519" +dependencies = [ + "compression-codecs", + "compression-core", + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "atomic-waker" version = "1.1.2" @@ -408,6 +421,23 @@ dependencies = [ "memchr", ] +[[package]] +name = "compression-codecs" +version = "0.4.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34a3cbbb8b6eca96f3a5c4bf6938d5b27ced3675d69f95bb51948722870bc323" +dependencies = [ + "compression-core", + "flate2", + "memchr", +] + +[[package]] +name = "compression-core" +version = "0.4.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75984efb6ed102a0d42db99afb6c1948f0380d1d91808d5529916e6c08b49d8d" + [[package]] name = "concurrent-queue" version = "2.5.0" @@ -457,6 +487,15 @@ dependencies = [ "libc", ] +[[package]] +name = "crc32fast" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9481c1c90cbf2ac953f07c8d4a58aa3945c425b7185c9154d67a65e4230da511" +dependencies = [ + "cfg-if", +] + [[package]] name = "crossbeam-utils" version = "0.8.21" @@ -711,6 +750,16 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3a3076410a55c90011c298b04d0cfa770b00fa04e1e3c97d3f6c9de105a03844" +[[package]] +name = "flate2" +version = "1.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfe33edd8e85a12a67454e37f8c75e730830d83e313556ab9ebf9ee7fbeb3bfb" +dependencies = [ + "crc32fast", + "miniz_oxide", +] + [[package]] name = "fnv" version = "1.0.7" @@ -913,14 +962,17 @@ dependencies = [ "h264-parser", "hex", "lazy_static", + "m3u8-rs", "moq-lite", "moq-native", "mp4-atom", "num_enum", "regex", + "reqwest", "serde", "serde_json", "serde_with", + "strum", "thiserror 2.0.17", "tokio", "tracing", @@ -933,6 +985,7 @@ version = "0.7.0" dependencies = [ "anyhow", "axum", + "bytes", "clap", "hang", "hyper-serve", @@ -1060,6 +1113,23 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-rustls" +version = "0.27.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3c93eb611681b207e1fe55d5a71ecf91572ec8a6705cdb6857f7d8d5242cf58" +dependencies = [ + "http", + "hyper", + "hyper-util", + "rustls", + "rustls-pki-types", + "tokio", + "tokio-rustls", + "tower-service", + "webpki-roots", +] + [[package]] name = "hyper-serve" version = "0.6.2" @@ -1420,6 +1490,16 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154" +[[package]] +name = "m3u8-rs" +version = "5.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c1d7ba86f7ea62f17f4310c55e93244619ddc7dadfc7e565de1967e4e41e6e7" +dependencies = [ + "chrono", + "nom", +] + [[package]] name = "matchers" version = "0.2.0" @@ -1457,6 +1537,12 @@ dependencies = [ "unicase", ] +[[package]] +name = "minimal-lexical" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" + [[package]] name = "miniz_oxide" version = "0.8.9" @@ -1464,6 +1550,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1fa76a2c86f704bdb222d66965fb3d63269ce38518b83cb0575fca855ebb6316" dependencies = [ "adler2", + "simd-adler32", ] [[package]] @@ -1606,6 +1693,16 @@ dependencies = [ "tracing", ] +[[package]] +name = "nom" +version = "7.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a" +dependencies = [ + "memchr", + "minimal-lexical", +] + [[package]] name = "nu-ansi-term" version = "0.50.3" @@ -2164,16 +2261,21 @@ dependencies = [ "http-body", "http-body-util", "hyper", + "hyper-rustls", "hyper-util", "js-sys", "log", "percent-encoding", "pin-project-lite", + "quinn", + "rustls", + "rustls-pki-types", "serde", "serde_json", "serde_urlencoded", "sync_wrapper", "tokio", + "tokio-rustls", "tower 0.5.2", "tower-http", "tower-service", @@ -2181,6 +2283,7 @@ dependencies = [ "wasm-bindgen", "wasm-bindgen-futures", "web-sys", + "webpki-roots", ] [[package]] @@ -2621,6 +2724,12 @@ dependencies = [ "rand_core 0.6.4", ] +[[package]] +name = "simd-adler32" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e320a6c5ad31d271ad523dcf3ad13e2767ad8b1cb8f047f75a8aeaf8da139da2" + [[package]] name = "simple_asn1" version = "0.6.3" @@ -2689,6 +2798,27 @@ version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" +[[package]] +name = "strum" +version = "0.27.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af23d6f6c1a224baef9d3f61e287d2761385a5b88fdab4eb4c6f11aeb54c4bcf" +dependencies = [ + "strum_macros", +] + +[[package]] +name = "strum_macros" +version = "0.27.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7695ce3845ea4b33927c055a39dc438a45b059f7c1b3d91d38d10355fb8cbca7" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "subtle" version = "2.6.1" @@ -3007,6 +3137,7 @@ version = "0.6.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4e6559d53cc268e5031cd8429d05415bc4cb4aefc4aa5d6cc35fbf5b924a1f8" dependencies = [ + "async-compression", "bitflags", "bytes", "futures-core", @@ -3417,6 +3548,15 @@ dependencies = [ "rustls-pki-types", ] +[[package]] +name = "webpki-roots" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2878ef029c47c6e8cf779119f20fcf52bde7ad42a731b2a304bc221df17571e" +dependencies = [ + "rustls-pki-types", +] + [[package]] name = "winapi-util" version = "0.1.11" diff --git a/justfile b/justfile index b85757948..34aa43f11 100644 --- a/justfile +++ b/justfile @@ -144,8 +144,68 @@ pub name url="http://localhost:4443/anon" *args: -i "dev/{{name}}.fmp4" \ -c copy \ -f mp4 -movflags cmaf+separate_moof+delay_moov+skip_trailer+frag_every_frame \ - - | cargo run --bin hang -- publish --url "{{url}}" --name "{{name}}" {{args}} + - | cargo run --bin hang -- publish --url "{{url}}" --name "{{name}}" fmp4 {{args}} +# Generate and ingest an HLS stream from a video file. +pub-hls name relay="http://localhost:4443/anon": + #!/usr/bin/env bash + set -euo pipefail + + just download "{{name}}" + + INPUT="dev/{{name}}.mp4" + OUT_DIR="dev/{{name}}" + + rm -rf "$OUT_DIR" + mkdir -p "$OUT_DIR" + + echo ">>> Generating HLS stream to disk..." + + # Start ffmpeg in the background to generate HLS + ffmpeg -loglevel warning -re -stream_loop -1 -i "$INPUT" \ + -map 0:v:0 -map 0:v:0 -map 0:a:0 \ + -r 25 -preset veryfast -g 50 -keyint_min 50 -sc_threshold 0 \ + -c:v:0 libx264 -profile:v:0 high -level:v:0 4.1 -pix_fmt:v:0 yuv420p -tag:v:0 avc1 -bsf:v:0 dump_extra -b:v:0 4M -vf:0 "scale=1920:-2" \ + -c:v:1 libx264 -profile:v:1 high -level:v:1 4.1 -pix_fmt:v:1 yuv420p -tag:v:1 avc1 -bsf:v:1 dump_extra -b:v:1 300k -vf:1 "scale=256:-2" \ + -c:a aac -b:a 128k \ + -f hls \ + -hls_time 2 -hls_list_size 12 \ + -hls_flags independent_segments+delete_segments \ + -hls_segment_type fmp4 \ + -master_pl_name master.m3u8 \ + -var_stream_map "v:0,agroup:audio v:1,agroup:audio a:0,agroup:audio" \ + -hls_segment_filename "$OUT_DIR/v%v/segment_%09d.m4s" \ + "$OUT_DIR/v%v/stream.m3u8" & + + FFMPEG_PID=$! + + # Wait for master playlist to be generated + echo ">>> Waiting for HLS playlist generation..." + for i in {1..30}; do + if [ -f "$OUT_DIR/master.m3u8" ]; then + break + fi + sleep 0.5 + done + + if [ ! -f "$OUT_DIR/master.m3u8" ]; then + kill $FFMPEG_PID 2>/dev/null || true + echo "Error: master.m3u8 not generated in time" + exit 1 + fi + + echo ">>> Starting HLS ingest from disk: $OUT_DIR/master.m3u8" + + # Trap to clean up ffmpeg on exit + cleanup() { + echo "Shutting down..." + kill $FFMPEG_PID 2>/dev/null || true + exit 0 + } + trap cleanup SIGINT SIGTERM + + # Run hang to ingest from local files + cargo run --bin hang -- publish --url "{{relay}}" --name "{{name}}" hls --playlist "$OUT_DIR/master.m3u8" # Publish a video using H.264 Annex B format to the localhost relay server pub-h264 name url="http://localhost:4443/anon" *args: @@ -287,3 +347,45 @@ update: build: bun run --filter='*' build cargo build + +# Generate and serve an HLS stream from a video for testing pub-hls +serve-hls name port="8000": + #!/usr/bin/env bash + set -euo pipefail + + just download "{{name}}" + + INPUT="dev/{{name}}.mp4" + OUT_DIR="dev/{{name}}" + + rm -rf "$OUT_DIR" + mkdir -p "$OUT_DIR" + + echo ">>> Starting HLS stream generation..." + echo ">>> Master playlist: http://localhost:{{port}}/master.m3u8" + + cleanup() { + echo "Shutting down..." + kill $(jobs -p) 2>/dev/null || true + exit 0 + } + trap cleanup SIGINT SIGTERM + + ffmpeg -loglevel warning -re -stream_loop -1 -i "$INPUT" \ + -map 0:v:0 -map 0:v:0 -map 0:a:0 \ + -r 25 -preset veryfast -g 50 -keyint_min 50 -sc_threshold 0 \ + -c:v:0 libx264 -profile:v:0 high -level:v:0 4.1 -pix_fmt:v:0 yuv420p -tag:v:0 avc1 -bsf:v:0 dump_extra -b:v:0 4M -vf:0 "scale=1920:-2" \ + -c:v:1 libx264 -profile:v:1 high -level:v:1 4.1 -pix_fmt:v:1 yuv420p -tag:v:1 avc1 -bsf:v:1 dump_extra -b:v:1 300k -vf:1 "scale=256:-2" \ + -c:a aac -b:a 128k \ + -f hls \ + -hls_time 2 -hls_list_size 12 \ + -hls_flags independent_segments+delete_segments \ + -hls_segment_type fmp4 \ + -master_pl_name master.m3u8 \ + -var_stream_map "v:0,agroup:audio v:1,agroup:audio a:0,agroup:audio" \ + -hls_segment_filename "$OUT_DIR/v%v/segment_%09d.m4s" \ + "$OUT_DIR/v%v/stream.m3u8" & + + sleep 2 + echo ">>> HTTP server: http://localhost:{{port}}/" + cd "$OUT_DIR" && python3 -m http.server {{port}} diff --git a/rs/hang-cli/Cargo.toml b/rs/hang-cli/Cargo.toml index 709acef14..e036403b5 100644 --- a/rs/hang-cli/Cargo.toml +++ b/rs/hang-cli/Cargo.toml @@ -19,6 +19,7 @@ path = "src/main.rs" [dependencies] anyhow = { version = "1", features = ["backtrace"] } axum = { version = "0.8", features = ["tokio"] } +bytes = "1.10" clap = { version = "4", features = ["derive"] } hang = { workspace = true } hyper-serve = { version = "0.6", features = ["tls-rustls"] } diff --git a/rs/hang-cli/src/client.rs b/rs/hang-cli/src/client.rs index 6b57cf88a..e9af631a0 100644 --- a/rs/hang-cli/src/client.rs +++ b/rs/hang-cli/src/client.rs @@ -1,18 +1,9 @@ -use crate::import::Import; -use crate::ImportType; -use anyhow::Context; +use crate::Publish; + use hang::moq_lite; -use tokio::io::AsyncRead; use url::Url; -pub async fn client( - config: moq_native::ClientConfig, - url: Url, - name: String, - format: ImportType, - input: &mut T, -) -> anyhow::Result<()> { - let broadcast = moq_lite::Broadcast::produce(); +pub async fn client(config: moq_native::ClientConfig, url: Url, name: String, publish: Publish) -> anyhow::Result<()> { let client = config.init()?; tracing::info!(%url, %name, "connecting"); @@ -20,30 +11,19 @@ pub async fn client( // Create an origin producer to publish to the broadcast. let origin = moq_lite::Origin::produce(); + origin.producer.publish_broadcast(&name, publish.consume()); // Establish the connection, not providing a subscriber. let session = moq_lite::Session::connect(session, origin.consumer, None).await?; - let mut import = Import::new(broadcast.producer.into(), format); - import - .init_from(input) - .await - .context("failed to initialize from media stream")?; - - // Announce the broadcast as available once the catalog is ready. - origin.producer.publish_broadcast(&name, broadcast.consumer); - // Notify systemd that we're ready. let _ = sd_notify::notify(true, &[sd_notify::NotifyState::Ready]); tokio::select! { - res = import.read_from(input) => res, + res = publish.run() => res, res = session.closed() => res.map_err(Into::into), - _ = tokio::signal::ctrl_c() => { session.close(moq_lite::Error::Cancel); - - // Give it a chance to close. tokio::time::sleep(std::time::Duration::from_millis(100)).await; Ok(()) }, diff --git a/rs/hang-cli/src/import.rs b/rs/hang-cli/src/import.rs deleted file mode 100644 index 13bbfc166..000000000 --- a/rs/hang-cli/src/import.rs +++ /dev/null @@ -1,52 +0,0 @@ -use clap::ValueEnum; -use hang::{moq_lite::coding::BytesMut, BroadcastProducer}; -use tokio::io::{AsyncRead, AsyncReadExt}; - -#[derive(ValueEnum, Clone)] -pub enum ImportType { - AnnexB, - Cmaf, -} - -impl ImportType { - fn as_str(&self) -> &'static str { - match self { - ImportType::AnnexB => "annex-b", - ImportType::Cmaf => "cmaf", - } - } -} - -pub struct Import { - decoder: hang::import::Decoder, - buffer: BytesMut, -} - -impl Import { - pub fn new(broadcast: BroadcastProducer, format: ImportType) -> Self { - let decoder = hang::import::Decoder::new(broadcast, format.as_str()).expect("supported format"); - Self { - decoder, - buffer: BytesMut::new(), - } - } -} - -impl Import { - pub async fn init_from(&mut self, input: &mut T) -> anyhow::Result<()> { - while !self.decoder.is_initialized() && input.read_buf(&mut self.buffer).await? > 0 { - self.decoder.decode_stream(&mut self.buffer)?; - } - - Ok(()) - } - - pub async fn read_from(&mut self, input: &mut T) -> anyhow::Result<()> { - while input.read_buf(&mut self.buffer).await? > 0 { - self.decoder.decode_stream(&mut self.buffer)?; - } - - // Flush the final frame. - self.decoder.decode_frame(&mut self.buffer, None) - } -} diff --git a/rs/hang-cli/src/main.rs b/rs/hang-cli/src/main.rs index 126a2c6be..30bff20a0 100644 --- a/rs/hang-cli/src/main.rs +++ b/rs/hang-cli/src/main.rs @@ -1,14 +1,13 @@ mod client; -mod import; +mod publish; mod server; -use std::path::PathBuf; - use client::*; -use import::*; +use publish::*; use server::*; use clap::{Parser, Subcommand}; +use std::path::PathBuf; use url::Url; #[derive(Parser, Clone)] @@ -35,8 +34,8 @@ pub enum Command { dir: Option, /// The format of the input media. - #[arg(long, value_enum, default_value_t = ImportType::Cmaf)] - format: ImportType, + #[command(subcommand)] + format: PublishFormat, }, Publish { /// The MoQ client configuration. @@ -61,8 +60,8 @@ pub enum Command { name: String, /// The format of the input media. - #[arg(long, value_enum, default_value_t = ImportType::Cmaf)] - format: ImportType, + #[command(subcommand)] + format: PublishFormat, }, } @@ -71,18 +70,16 @@ async fn main() -> anyhow::Result<()> { let cli = Cli::parse(); cli.log.init(); + let mut publish = Publish::new(match &cli.command { + Command::Serve { format, .. } => format, + Command::Publish { format, .. } => format, + })?; + + // Initialize the broadcast from stdin before starting any client/server. + publish.init().await?; + match cli.command { - Command::Serve { - config, - dir, - name, - format, - } => server(config, name, dir, format, &mut tokio::io::stdin()).await, - Command::Publish { - config, - url, - name, - format, - } => client(config, url, name, format, &mut tokio::io::stdin()).await, + Command::Serve { config, dir, name, .. } => server(config, name, dir, publish).await, + Command::Publish { config, url, name, .. } => client(config, url, name, publish).await, } } diff --git a/rs/hang-cli/src/publish.rs b/rs/hang-cli/src/publish.rs new file mode 100644 index 000000000..eb12d8771 --- /dev/null +++ b/rs/hang-cli/src/publish.rs @@ -0,0 +1,102 @@ +use bytes::BytesMut; +use clap::Subcommand; +use hang::{ + import::{Decoder, DecoderFormat}, + moq_lite::BroadcastConsumer, + BroadcastProducer, +}; +use tokio::io::AsyncReadExt; + +#[derive(Subcommand, Clone)] +pub enum PublishFormat { + Avc3, + Fmp4, + // NOTE: No aac support because it needs framing. + Hls { + /// URL or file path of an HLS playlist to ingest. + #[arg(long)] + playlist: String, + }, +} + +enum PublishDecoder { + Decoder(Box), + Hls(Box), +} + +pub struct Publish { + decoder: PublishDecoder, + broadcast: BroadcastProducer, + buffer: BytesMut, +} + +impl Publish { + pub fn new(format: &PublishFormat) -> anyhow::Result { + let broadcast = BroadcastProducer::default(); + + let decoder = match format { + PublishFormat::Avc3 => { + let format = DecoderFormat::Avc3; + let stream = Decoder::new(broadcast.clone(), format); + PublishDecoder::Decoder(Box::new(stream)) + } + PublishFormat::Fmp4 => { + let format = DecoderFormat::Fmp4; + let stream = Decoder::new(broadcast.clone(), format); + PublishDecoder::Decoder(Box::new(stream)) + } + PublishFormat::Hls { playlist } => { + let hls = hang::import::Hls::new( + broadcast.clone(), + hang::import::HlsConfig { + playlist: playlist.clone(), + client: None, + }, + )?; + PublishDecoder::Hls(Box::new(hls)) + } + }; + + Ok(Self { + decoder, + buffer: BytesMut::new(), + broadcast, + }) + } + + pub fn consume(&self) -> BroadcastConsumer { + self.broadcast.consume() + } +} + +impl Publish { + pub async fn init(&mut self) -> anyhow::Result<()> { + match &mut self.decoder { + PublishDecoder::Decoder(decoder) => { + let mut input = tokio::io::stdin(); + + while !decoder.is_initialized() && input.read_buf(&mut self.buffer).await? > 0 { + decoder.decode_stream(&mut self.buffer)?; + } + } + PublishDecoder::Hls(decoder) => decoder.init().await?, + } + + Ok(()) + } + + pub async fn run(mut self) -> anyhow::Result<()> { + match &mut self.decoder { + PublishDecoder::Decoder(decoder) => { + let mut input = tokio::io::stdin(); + + while input.read_buf(&mut self.buffer).await? > 0 { + decoder.decode_stream(&mut self.buffer)?; + } + } + PublishDecoder::Hls(decoder) => decoder.run().await?, + } + + Ok(()) + } +} diff --git a/rs/hang-cli/src/server.rs b/rs/hang-cli/src/server.rs index 386e238c4..cb46d42ec 100644 --- a/rs/hang-cli/src/server.rs +++ b/rs/hang-cli/src/server.rs @@ -6,18 +6,16 @@ use axum::{http::Method, routing::get, Router}; use hang::moq_lite; use std::net::SocketAddr; use std::path::PathBuf; -use tokio::io::AsyncRead; use tower_http::cors::{Any, CorsLayer}; use tower_http::services::ServeDir; -use crate::import::{Import, ImportType}; +use crate::Publish; -pub async fn server( +pub async fn server( config: moq_native::ServerConfig, name: String, public: Option, - format: ImportType, - input: &mut T, + publish: Publish, ) -> anyhow::Result<()> { let mut listen = config.bind.unwrap_or("[::]:443".parse().unwrap()); listen = tokio::net::lookup_host(listen) @@ -32,17 +30,12 @@ pub async fn server( // TODO serve all of them so we can support multiple signature algorithms. let fingerprint = server.fingerprints().first().context("missing certificate")?.clone(); - let broadcast = moq_lite::Broadcast::produce(); - let mut import = Import::new(broadcast.producer.into(), format); - - import.init_from(input).await?; - // Notify systemd that we're ready. let _ = sd_notify::notify(true, &[sd_notify::NotifyState::Ready]); tokio::select! { - res = accept(server, name, broadcast.consumer) => res, - res = import.read_from(input) => res, + res = accept(server, name, publish.consume()) => res, + res = publish.run() => res, res = web(listen, fingerprint, public) => res, } } diff --git a/rs/hang/Cargo.toml b/rs/hang/Cargo.toml index d3659137c..4bef5ca69 100644 --- a/rs/hang/Cargo.toml +++ b/rs/hang/Cargo.toml @@ -20,16 +20,23 @@ futures = "0.3" h264-parser = "0.4.0" hex = "0.4" lazy_static = "1" +m3u8-rs = "5" moq-lite = { workspace = true, features = ["serde"] } mp4-atom = { version = "0.9.2", features = ["tokio", "bytes", "serde"] } num_enum = "0.7" regex = "1" +reqwest = { version = "0.12", default-features = false, features = [ + "rustls-tls", + "gzip", +] } serde = { workspace = true } serde_json = "1" serde_with = { version = "3", features = ["hex"] } +strum = { version = "0.27", features = ["derive"] } thiserror = "2" tokio = { workspace = true, features = ["macros"] } tracing = "0.1" +url = "2" [dependencies.derive_more] version = "2" @@ -38,4 +45,3 @@ features = ["from", "display", "debug"] [dev-dependencies] anyhow = "1" moq-native = { workspace = true } -url = "2" diff --git a/rs/hang/src/error.rs b/rs/hang/src/error.rs index cc27ea6a7..3b2afaf2b 100644 --- a/rs/hang/src/error.rs +++ b/rs/hang/src/error.rs @@ -65,6 +65,14 @@ pub enum Error { /// The timestamp of each keyframe must be monotonically increasing. #[error("timestamp went backwards")] TimestampBackwards, + + /// An error from the HTTP client. + #[error("http error: {0}")] + Http(Arc), + + /// Failed to parse a URL. + #[error("url parse error: {0}")] + Url(#[from] url::ParseError), } /// A Result type alias for hang operations. @@ -79,3 +87,9 @@ impl From for Error { Error::Json(Arc::new(err)) } } + +impl From for Error { + fn from(err: reqwest::Error) -> Self { + Error::Http(Arc::new(err)) + } +} diff --git a/rs/hang/src/import/aac.rs b/rs/hang/src/import/aac.rs index 2508529cb..065282374 100644 --- a/rs/hang/src/import/aac.rs +++ b/rs/hang/src/import/aac.rs @@ -8,11 +8,16 @@ use moq_lite as moq; pub struct Aac { broadcast: hang::BroadcastProducer, track: Option, + zero: Option, } impl Aac { pub fn new(broadcast: hang::BroadcastProducer) -> Self { - Self { broadcast, track: None } + Self { + broadcast, + track: None, + zero: None, + } } pub fn initialize(&mut self, buf: &mut T) -> anyhow::Result<()> { @@ -118,7 +123,8 @@ impl Aac { Ok(()) } - pub fn decode(&mut self, buf: &mut T, pts: hang::Timestamp) -> anyhow::Result<()> { + pub fn decode(&mut self, buf: &mut T, pts: Option) -> anyhow::Result<()> { + let pts = self.pts(pts)?; let track = self.track.as_mut().context("not initialized")?; // Create a BufList at chunk boundaries, potentially avoiding allocations. @@ -141,6 +147,15 @@ impl Aac { pub fn is_initialized(&self) -> bool { self.track.is_some() } + + fn pts(&mut self, hint: Option) -> anyhow::Result { + if let Some(pts) = hint { + return Ok(pts); + } + + let zero = self.zero.get_or_insert_with(tokio::time::Instant::now); + Ok(hang::Timestamp::from_micros(zero.elapsed().as_micros() as u64)?) + } } impl Drop for Aac { diff --git a/rs/hang/src/import/avc3.rs b/rs/hang/src/import/avc3.rs index e2cfe723e..87f301d6a 100644 --- a/rs/hang/src/import/avc3.rs +++ b/rs/hang/src/import/avc3.rs @@ -23,6 +23,9 @@ pub struct Avc3 { // The current frame being built. current: Frame, + + // Used to compute wall clock timestamps if needed. + zero: Option, } impl Avc3 { @@ -32,6 +35,7 @@ impl Avc3 { track: None, config: None, current: Default::default(), + zero: None, } } @@ -95,17 +99,33 @@ impl Avc3 { Ok(()) } + /// Initialize the decoder with SPS/PPS and other non-slice NALs. + pub fn initialize>(&mut self, buf: &mut T) -> anyhow::Result<()> { + let nals = NalIterator::new(buf); + + for nal in nals { + self.decode_nal(nal?, None)?; + } + + Ok(()) + } + /// Decode as much data as possible from the given buffer. /// /// Unlike [Self::decode_framed], this method needs the start code for the next frame. /// This means it works for streaming media (ex. stdin) but adds a frame of latency. - pub fn decode_stream>(&mut self, buf: &mut T, pts: hang::Timestamp) -> anyhow::Result<()> { - // Iterate over the NAL units in the buffer based on start codes. + pub fn decode_stream>( + &mut self, + buf: &mut T, + pts: Option, + ) -> anyhow::Result<()> { + let pts = self.pts(pts)?; + // Iterate over the NAL units in the buffer based on start codes. let nals = NalIterator::new(buf); for nal in nals { - self.decode_nal(nal?, pts)?; + self.decode_nal(nal?, Some(pts))?; } Ok(()) @@ -118,9 +138,15 @@ impl Avc3 { /// This can also be used when EOF is detected to flush the final frame. /// /// NOTE: The next decode will fail if it doesn't begin with a start code. - pub fn decode_frame>(&mut self, buf: &mut T, pts: hang::Timestamp) -> anyhow::Result<()> { + pub fn decode_frame>( + &mut self, + buf: &mut T, + pts: Option, + ) -> anyhow::Result<()> { + let pts = self.pts(pts)?; + // Decode any NALs at the start of the buffer. - self.decode_stream(buf, pts)?; + self.decode_stream(buf, Some(pts))?; // Make sure there's a start code at the start of the buffer. let start = after_start_code(buf.as_ref())?.context("missing start code")?; @@ -128,15 +154,15 @@ impl Avc3 { // Assume the rest of the buffer is a single NAL. let nal = buf.copy_to_bytes(buf.remaining()); - self.decode_nal(nal, pts)?; + self.decode_nal(nal, Some(pts))?; // Flush the frame if we read a slice. - self.maybe_start_frame(pts)?; + self.maybe_start_frame(Some(pts))?; Ok(()) } - fn decode_nal(&mut self, nal: Bytes, pts: hang::Timestamp) -> anyhow::Result<()> { + fn decode_nal(&mut self, nal: Bytes, pts: Option) -> anyhow::Result<()> { let header = nal.first().context("NAL unit is too short")?; let forbidden_zero_bit = (header >> 7) & 1; anyhow::ensure!(forbidden_zero_bit == 0, "forbidden zero bit is not zero"); @@ -186,13 +212,14 @@ impl Avc3 { Ok(()) } - fn maybe_start_frame(&mut self, pts: hang::Timestamp) -> anyhow::Result<()> { + fn maybe_start_frame(&mut self, pts: Option) -> anyhow::Result<()> { // If we haven't seen any slices, we shouldn't flush yet. if !self.current.contains_slice { return Ok(()); } let track = self.track.as_mut().context("expected SPS before any frames")?; + let pts = pts.context("missing timestamp")?; let payload = std::mem::take(&mut self.current.chunks); let frame = hang::Frame { @@ -212,6 +239,15 @@ impl Avc3 { pub fn is_initialized(&self) -> bool { self.track.is_some() } + + fn pts(&mut self, hint: Option) -> anyhow::Result { + if let Some(pts) = hint { + return Ok(pts); + } + + let zero = self.zero.get_or_insert_with(tokio::time::Instant::now); + Ok(hang::Timestamp::from_micros(zero.elapsed().as_micros() as u64)?) + } } impl Drop for Avc3 { diff --git a/rs/hang/src/import/decoder.rs b/rs/hang/src/import/decoder.rs index c9b4c3ac3..8c35f3a9d 100644 --- a/rs/hang/src/import/decoder.rs +++ b/rs/hang/src/import/decoder.rs @@ -4,6 +4,17 @@ use crate::{self as hang, import::Aac}; use super::{Avc3, Fmp4}; +#[derive(Clone, Copy, Debug, Eq, PartialEq, strum::EnumString)] +#[strum(serialize_all = "lowercase")] +pub enum DecoderFormat { + /// aka H264 with inline SPS/PPS + Avc3, + /// fMP4/CMAF container. + Fmp4, + /// Raw AAC frames (not ADTS). + Aac, +} + #[derive(derive_more::From)] enum DecoderKind { /// aka H264 with inline SPS/PPS @@ -19,31 +30,18 @@ enum DecoderKind { pub struct Decoder { // The decoder for the given format. decoder: DecoderKind, - - // Used for decoders that don't have timestamps in the stream. - zero: Option, } impl Decoder { - /// Create a new decoder with the given format, or `None` if the format is not supported. - pub fn new(broadcast: hang::BroadcastProducer, format: &str) -> Option { + /// Create a new decoder with the given format. + pub fn new(broadcast: hang::BroadcastProducer, format: DecoderFormat) -> Self { let decoder = match format { - "avc3" => Avc3::new(broadcast).into(), - "h264" => { - // NOTE: avc1 is unsupported, because the SPS/PPS are out-of-band. - tracing::warn!("'h264' format is deprecated, use 'avc3' instead"); - Avc3::new(broadcast).into() - } - "annex-b" => { - tracing::warn!("'annex-b' format is deprecated, use 'avc3' instead"); - Avc3::new(broadcast).into() - } - "fmp4" | "cmaf" => Box::new(Fmp4::new(broadcast)).into(), - "aac" => Aac::new(broadcast).into(), - _ => return None, + DecoderFormat::Avc3 => Avc3::new(broadcast).into(), + DecoderFormat::Fmp4 => Box::new(Fmp4::new(broadcast)).into(), + DecoderFormat::Aac => Aac::new(broadcast).into(), }; - Some(Self { decoder, zero: None }) + Self { decoder } } /// Initialize the decoder with the given buffer and populate the broadcast. @@ -53,13 +51,8 @@ impl Decoder { /// /// The buffer will be fully consumed, or an error will be returned. pub fn initialize>(&mut self, buf: &mut T) -> anyhow::Result<()> { - let mut pts = || { - self.zero = self.zero.or_else(|| Some(tokio::time::Instant::now())); - hang::Timestamp::from_micros(self.zero.unwrap().elapsed().as_micros() as u64) - }; - match &mut self.decoder { - DecoderKind::Avc3(decoder) => decoder.decode_frame(buf, pts()?)?, + DecoderKind::Avc3(decoder) => decoder.initialize(buf)?, DecoderKind::Fmp4(decoder) => decoder.decode(buf)?, DecoderKind::Aac(decoder) => decoder.initialize(buf)?, } @@ -83,18 +76,14 @@ impl Decoder { /// /// If the buffer is not fully consumed, more data is needed. pub fn decode_stream>(&mut self, buf: &mut T) -> anyhow::Result<()> { - // Make a function to compute the PTS timestamp only if needed by a decoder. - // We want to avoid calling Instant::now() if not needed. - let mut pts = || { - self.zero = self.zero.or_else(|| Some(tokio::time::Instant::now())); - hang::Timestamp::from_micros(self.zero.unwrap().elapsed().as_micros() as u64) - }; - match &mut self.decoder { - DecoderKind::Avc3(decoder) => decoder.decode_stream(buf, pts()?), - DecoderKind::Fmp4(decoder) => decoder.decode(buf), - DecoderKind::Aac(decoder) => decoder.decode(buf, pts()?), + DecoderKind::Avc3(decoder) => decoder.decode_stream(buf, None)?, + DecoderKind::Fmp4(decoder) => decoder.decode(buf)?, + // TODO Fix or make this more type safe. + DecoderKind::Aac(_) => anyhow::bail!("AAC does not support stream decoding"), } + + Ok(()) } /// Flush the decoder at a frame boundary. @@ -112,20 +101,10 @@ impl Decoder { buf: &mut T, pts: Option, ) -> anyhow::Result<()> { - // Make a function to compute the PTS timestamp only if needed by a decoder. - // We want to avoid calling Instant::now() if not needed. - let mut pts = || { - pts.or_else(|| { - self.zero = self.zero.or_else(|| Some(tokio::time::Instant::now())); - hang::Timestamp::from_micros(self.zero.unwrap().elapsed().as_micros() as u64).ok() - }) - .ok_or(crate::TimestampOverflow) - }; - match &mut self.decoder { - DecoderKind::Avc3(decoder) => decoder.decode_frame(buf, pts()?)?, + DecoderKind::Avc3(decoder) => decoder.decode_frame(buf, pts)?, DecoderKind::Fmp4(decoder) => decoder.decode(buf)?, - DecoderKind::Aac(decoder) => decoder.decode(buf, pts()?)?, + DecoderKind::Aac(decoder) => decoder.decode(buf, pts)?, } Ok(()) diff --git a/rs/hang/src/import/fmp4.rs b/rs/hang/src/import/fmp4.rs index f694af828..96d5f9432 100644 --- a/rs/hang/src/import/fmp4.rs +++ b/rs/hang/src/import/fmp4.rs @@ -1,6 +1,4 @@ -use crate::catalog::{ - AudioCodec, AudioConfig, Catalog, CatalogProducer, VideoCodec, VideoConfig, AAC, AV1, H264, H265, VP9, -}; +use crate::catalog::{AudioCodec, AudioConfig, CatalogProducer, VideoCodec, VideoConfig, AAC, AV1, H264, H265, VP9}; use crate::{self as hang, Timestamp}; use anyhow::Context; use bytes::{Buf, Bytes, BytesMut}; @@ -30,7 +28,8 @@ pub struct Fmp4 { // This `hang` variant includes a catalog. broadcast: hang::BroadcastProducer, - // The catalog being produced + // A clone of the broadcast's catalog for mutable access. + // This is the same underlying catalog (via Arc), just a separate binding. catalog: CatalogProducer, // A lookup to tracks in the broadcast @@ -51,14 +50,12 @@ impl Fmp4 { /// Create a new CMAF importer that will write to the given broadcast. /// /// The broadcast will be populated with tracks as they're discovered in the - /// fMP4 file and the catalog will be automatically generated. - pub fn new(mut broadcast: hang::BroadcastProducer) -> Self { - let catalog = Catalog::default().produce(); - broadcast.insert_track(catalog.consumer.track); - + /// fMP4 file. The catalog from the `hang::BroadcastProducer` is used automatically. + pub fn new(broadcast: hang::BroadcastProducer) -> Self { + let catalog = broadcast.catalog.clone(); Self { broadcast, - catalog: catalog.producer, + catalog, tracks: HashMap::default(), last_keyframe: HashMap::default(), moov: None, @@ -498,7 +495,7 @@ impl Fmp4 { impl Drop for Fmp4 { fn drop(&mut self) { - let mut catalog = self.catalog.lock(); + let mut catalog = self.broadcast.catalog.lock(); for track in self.tracks.values() { tracing::debug!(name = ?track.info.name, "ending track"); diff --git a/rs/hang/src/import/hls.rs b/rs/hang/src/import/hls.rs new file mode 100644 index 000000000..a3c54d4aa --- /dev/null +++ b/rs/hang/src/import/hls.rs @@ -0,0 +1,555 @@ +//! HLS (HTTP Live Streaming) ingest built on top of fMP4. +//! +//! This module provides reusable logic to ingest HLS master/media playlists and +//! feed their fMP4 segments into a `hang` broadcast. It is designed to be +//! independent of any particular HTTP client; callers provide an implementation +//! of [`Fetcher`] to perform the actual network I/O. + +use std::collections::hash_map::Entry; +use std::collections::HashMap; +use std::path::PathBuf; +use std::time::Duration; + +use anyhow::Context; +use bytes::Bytes; +use m3u8_rs::{ + AlternativeMedia, AlternativeMediaType, Map, MasterPlaylist, MediaPlaylist, MediaSegment, Resolution, VariantStream, +}; +use reqwest::Client; +use tokio::fs; +use tracing::{debug, info, warn}; +use url::Url; + +use crate::import::Fmp4; +use crate::BroadcastProducer; + +/// Configuration for the single-rendition HLS ingest loop. +#[derive(Clone)] +pub struct HlsConfig { + /// The master or media playlist URL or file path to ingest. + pub playlist: String, + + /// An optional HTTP client to use for fetching the playlist and segments. + /// If not provided, a default client will be created. + pub client: Option, +} + +impl HlsConfig { + pub fn new(playlist: String) -> Self { + Self { playlist, client: None } + } + + /// Parse the playlist string into a URL. + /// If it starts with http:// or https://, parse as URL. + /// Otherwise, treat as a file path and convert to file:// URL. + fn parse_playlist(&self) -> anyhow::Result { + if self.playlist.starts_with("http://") || self.playlist.starts_with("https://") { + Url::parse(&self.playlist).context("invalid playlist URL") + } else { + let path = PathBuf::from(&self.playlist); + let absolute = if path.is_absolute() { + path + } else { + std::env::current_dir()?.join(path) + }; + Url::from_file_path(&absolute).ok().context("invalid file path") + } + } +} + +/// Result of a single ingest step. +struct StepOutcome { + /// Number of media segments written during this step. + pub wrote_segments: usize, + /// Target segment duration (in seconds) from the playlist, if known. + pub target_duration: Option, +} + +/// HLS ingest that pulls an HLS media playlist and feeds the bytes into the fMP4 ingest. +/// +/// Provides `init()` to prime the ingest with initial segments, and `service()` +/// to run the continuous ingest loop. +pub struct Hls { + /// Broadcast that all CMAF importers write into. + broadcast: BroadcastProducer, + + /// fMP4 importers for each discovered video rendition. + /// Each importer feeds a separate MoQ track but shares the same catalog. + video_importers: Vec, + + /// fMP4 importer for the selected audio rendition, if any. + audio_importer: Option, + + client: Client, + /// Parsed base URL for the playlist (file:// or http(s)://). + base_url: Url, + /// All discovered video variants (one per HLS rendition). + video: Vec, + /// Optional audio track shared across variants. + audio: Option, +} + +#[derive(Debug, Clone, Copy)] +enum TrackKind { + Video(usize), + Audio, +} + +struct TrackState { + playlist: Url, + next_sequence: Option, + init_ready: bool, +} + +impl TrackState { + fn new(playlist: Url) -> Self { + Self { + playlist, + next_sequence: None, + init_ready: false, + } + } +} + +impl Hls { + /// Create a new HLS ingest that will write into the given broadcast. + pub fn new(broadcast: BroadcastProducer, cfg: HlsConfig) -> anyhow::Result { + let base_url = cfg.parse_playlist()?; + let client = cfg.client.unwrap_or_else(|| { + Client::builder() + .user_agent(concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"))) + .build() + .unwrap() + }); + Ok(Self { + broadcast, + video_importers: Vec::new(), + audio_importer: None, + client, + base_url, + video: Vec::new(), + audio: None, + }) + } + + /// Fetch the latest playlist, download the init segment, and prime the importer with a buffer of segments. + /// + /// Returns the number of segments buffered during initialization. + pub async fn init(&mut self) -> anyhow::Result<()> { + let buffered = self.prime().await?; + if buffered == 0 { + warn!("HLS playlist had no new segments during init step"); + } else { + info!(count = buffered, "buffered initial HLS segments"); + } + Ok(()) + } + + /// Run the ingest loop until cancelled. + pub async fn run(&mut self) -> anyhow::Result<()> { + loop { + let outcome = self.step().await?; + let delay = self.refresh_delay(outcome.target_duration, outcome.wrote_segments); + + debug!( + wrote = outcome.wrote_segments, + delay = ?delay, + "HLS ingest step complete" + ); + + tokio::time::sleep(delay).await; + } + } + + /// Internal: fetch the latest playlist, download the init segment, and buffer segments. + async fn prime(&mut self) -> anyhow::Result { + self.ensure_tracks().await?; + + let mut buffered = 0usize; + + // Prime all discovered video variants. + // + // Move the video track states out of `self` so we can safely mutate both + // the ingest and the tracks without running into borrow checker issues. + let video_tracks = std::mem::take(&mut self.video); + for (index, mut track) in video_tracks.into_iter().enumerate() { + let playlist = self.fetch_media_playlist(track.playlist.clone()).await?; + let count = self + .consume_segments(TrackKind::Video(index), &mut track, &playlist) + .await?; + buffered += count; + self.video.push(track); + } + + // Prime the shared audio track, if any. + if let Some(mut track) = self.audio.take() { + let playlist = self.fetch_media_playlist(track.playlist.clone()).await?; + let count = self.consume_segments(TrackKind::Audio, &mut track, &playlist).await?; + buffered += count; + self.audio = Some(track); + } + + Ok(buffered) + } + + /// Perform a single ingest step for all active tracks. + /// + /// This fetches the current media playlists, consumes any fresh segments, + /// and returns how many segments were written along with the target + /// duration to guide scheduling of the next step. + async fn step(&mut self) -> anyhow::Result { + self.ensure_tracks().await?; + + let mut wrote = 0usize; + let mut target_duration = None; + + // Ingest a step from all active video variants. + let video_tracks = std::mem::take(&mut self.video); + for (index, mut track) in video_tracks.into_iter().enumerate() { + let playlist = self.fetch_media_playlist(track.playlist.clone()).await?; + // Use the first video's target duration as the base. + if target_duration.is_none() { + target_duration = Some(playlist.target_duration); + } + let count = self + .consume_segments(TrackKind::Video(index), &mut track, &playlist) + .await?; + wrote += count; + self.video.push(track); + } + + // Ingest from the shared audio track, if present. + if let Some(mut track) = self.audio.take() { + let playlist = self.fetch_media_playlist(track.playlist.clone()).await?; + if target_duration.is_none() { + target_duration = Some(playlist.target_duration); + } + let count = self.consume_segments(TrackKind::Audio, &mut track, &playlist).await?; + wrote += count; + self.audio = Some(track); + } + + Ok(StepOutcome { + wrote_segments: wrote, + target_duration, + }) + } + + /// Compute the delay before the next ingest step should run. + fn refresh_delay(&self, target_duration: Option, wrote_segments: usize) -> Duration { + let base = target_duration + .map(|dur| Duration::from_secs_f32(dur.max(0.5))) + .unwrap_or_else(|| Duration::from_millis(500)); + if wrote_segments == 0 { + return base / 2; + } + + base + } + + async fn fetch_media_playlist(&self, url: Url) -> anyhow::Result { + let body = self.fetch_bytes(url).await?; + + // Nom errors take ownership of the input, so we need to stringify any error messages. + let playlist = m3u8_rs::parse_media_playlist_res(&body) + .map_err(|e| anyhow::anyhow!("failed to parse media playlist: {}", e))?; + + Ok(playlist) + } + + async fn ensure_tracks(&mut self) -> anyhow::Result<()> { + // Tracks already discovered. + if !self.video.is_empty() { + return Ok(()); + } + + let body = self.fetch_bytes(self.base_url.clone()).await?; + if let Ok((_, master)) = m3u8_rs::parse_master_playlist(&body) { + let variants = select_variants(&master); + anyhow::ensure!(!variants.is_empty(), "no usable variants found in master playlist"); + + // Create a video track state for every usable variant. + for variant in &variants { + let video_url = resolve_uri(&self.base_url, &variant.uri)?; + self.video.push(TrackState::new(video_url)); + } + + // Choose an audio rendition based on the first variant with an audio group. + if let Some(group_id) = variants.iter().find_map(|v| v.audio.as_deref()) { + if let Some(audio_tag) = select_audio(&master, group_id) { + if let Some(uri) = &audio_tag.uri { + let audio_url = resolve_uri(&self.base_url, uri)?; + self.audio = Some(TrackState::new(audio_url)); + } else { + warn!(%group_id, "audio rendition missing URI"); + } + } else { + warn!(%group_id, "audio group not found in master playlist"); + } + } + + let audio_url = self.audio.as_ref().map(|a| a.playlist.to_string()); + info!( + video_variants = variants.len(), + audio = audio_url.as_deref().unwrap_or("none"), + "selected master playlist renditions" + ); + + return Ok(()); + } + + // Fallback: treat the provided URL as a single media playlist. + self.video.push(TrackState::new(self.base_url.clone())); + Ok(()) + } + + async fn consume_segments( + &mut self, + kind: TrackKind, + track: &mut TrackState, + playlist: &MediaPlaylist, + ) -> anyhow::Result { + self.ensure_init_segment(kind, track, playlist).await?; + + // Skip segments we've already seen + let skip = track.next_sequence.unwrap_or(0).saturating_sub(playlist.media_sequence) as usize; + let base_seq = playlist.media_sequence + skip as u64; + for (i, segment) in playlist.segments[skip..].iter().enumerate() { + self.push_segment(kind, track, segment, base_seq + i as u64).await?; + } + let consumed = playlist.segments.len() - skip; + + if consumed == 0 { + debug!(?kind, "no fresh HLS segments available"); + } + + Ok(consumed) + } + + async fn ensure_init_segment( + &mut self, + kind: TrackKind, + track: &mut TrackState, + playlist: &MediaPlaylist, + ) -> anyhow::Result<()> { + if track.init_ready { + return Ok(()); + } + + let map = self.find_map(playlist).context("playlist missing EXT-X-MAP")?; + + let url = resolve_uri(&track.playlist, &map.uri)?; + let mut bytes = self.fetch_bytes(url).await?; + let importer = match kind { + TrackKind::Video(index) => self.ensure_video_importer_for(index), + TrackKind::Audio => self.ensure_audio_importer(), + }; + + importer.decode(&mut bytes).context("init segment parse error")?; + + anyhow::ensure!(bytes.is_empty(), "init segment was not fully consumed"); + anyhow::ensure!( + importer.is_initialized(), + "init segment did not initialize the importer" + ); + + track.init_ready = true; + info!(?kind, "loaded HLS init segment"); + Ok(()) + } + + async fn push_segment( + &mut self, + kind: TrackKind, + track: &mut TrackState, + segment: &MediaSegment, + sequence: u64, + ) -> anyhow::Result<()> { + anyhow::ensure!(!segment.uri.is_empty(), "encountered segment with empty URI"); + + let url = resolve_uri(&track.playlist, &segment.uri)?; + let mut bytes = self.fetch_bytes(url).await?; + + let importer = match kind { + TrackKind::Video(index) => self.ensure_video_importer_for(index), + TrackKind::Audio => self.ensure_audio_importer(), + }; + + importer.decode(&mut bytes).context("failed to parse media segment")?; + track.next_sequence = Some(sequence + 1); + + Ok(()) + } + + fn find_map<'a>(&self, playlist: &'a MediaPlaylist) -> Option<&'a Map> { + playlist.segments.iter().find_map(|segment| segment.map.as_ref()) + } + + async fn fetch_bytes(&self, url: Url) -> anyhow::Result { + if url.scheme() == "file" { + let path = url.to_file_path().ok().context("invalid file URL")?; + let bytes = fs::read(&path).await.context("failed to read file")?; + Ok(Bytes::from(bytes)) + } else { + let response = self.client.get(url).send().await?; + let response = response.error_for_status()?; + let bytes = response.bytes().await.context("failed to read response body")?; + Ok(bytes) + } + } + + /// Create or retrieve the fMP4 importer for a specific video rendition. + /// + /// Each video variant gets its own importer so that their tracks remain + /// independent while still contributing to the same shared catalog. + fn ensure_video_importer_for(&mut self, index: usize) -> &mut Fmp4 { + while self.video_importers.len() <= index { + let importer = Fmp4::new(self.broadcast.clone()); + self.video_importers.push(importer); + } + + self.video_importers.get_mut(index).unwrap() + } + + /// Create or retrieve the fMP4 importer for the audio rendition. + fn ensure_audio_importer(&mut self) -> &mut Fmp4 { + self.audio_importer + .get_or_insert_with(|| Fmp4::new(self.broadcast.clone())) + } + + #[cfg(test)] + fn has_video_importer(&self) -> bool { + !self.video_importers.is_empty() + } + + #[cfg(test)] + fn has_audio_importer(&self) -> bool { + self.audio_importer.is_some() + } +} + +fn select_audio<'a>(master: &'a MasterPlaylist, group_id: &str) -> Option<&'a AlternativeMedia> { + let mut first = None; + let mut default = None; + + for alternative in master + .alternatives + .iter() + .filter(|alt| alt.media_type == AlternativeMediaType::Audio && alt.group_id == group_id) + { + if first.is_none() { + first = Some(alternative); + } + if alternative.default { + default = Some(alternative); + break; + } + } + + default.or(first) +} + +fn select_variants(master: &MasterPlaylist) -> Vec<&VariantStream> { + // Helper to extract the first video codec token from the CODECS attribute. + fn first_video_codec(variant: &VariantStream) -> Option<&str> { + let codecs = variant.codecs.as_deref()?; + codecs.split(',').map(|s| s.trim()).find(|s| !s.is_empty()) + } + + // Map codec strings into a coarse "family" so we can prefer H.264 over others. + fn codec_family(codec: &str) -> Option<&'static str> { + if codec.starts_with("avc1.") || codec.starts_with("avc3.") { + Some("h264") + } else { + None + } + } + + // Consider only non-i-frame variants with a URI and a known codec family. + let candidates: Vec<(&VariantStream, &str, &str)> = master + .variants + .iter() + .filter(|variant| !variant.is_i_frame && !variant.uri.is_empty()) + .filter_map(|variant| { + let codec = first_video_codec(variant)?; + let family = codec_family(codec)?; + Some((variant, codec, family)) + }) + .collect(); + + if candidates.is_empty() { + return Vec::new(); + } + + // Prefer families in this order, falling back to the first available. + const FAMILY_PREFERENCE: &[&str] = &["h264"]; + + let families_present: Vec<&str> = candidates.iter().map(|(_, _, fam)| *fam).collect(); + + let target_family = FAMILY_PREFERENCE + .iter() + .find(|fav| families_present.iter().any(|fam| fam == *fav)) + .copied() + .unwrap_or(families_present[0]); + + // Keep only variants in the chosen family. + let family_variants: Vec<&VariantStream> = candidates + .into_iter() + .filter(|(_, _, fam)| *fam == target_family) + .map(|(variant, _, _)| variant) + .collect(); + + // Deduplicate by resolution, keeping the lowest-bandwidth variant for each size. + let mut by_resolution: HashMap, &VariantStream> = HashMap::new(); + + for variant in family_variants { + let key = variant.resolution; + let bandwidth = variant.average_bandwidth.unwrap_or(variant.bandwidth); + + match by_resolution.entry(key) { + Entry::Vacant(entry) => { + entry.insert(variant); + } + Entry::Occupied(mut entry) => { + let existing = entry.get(); + let existing_bw = existing.average_bandwidth.unwrap_or(existing.bandwidth); + if bandwidth < existing_bw { + entry.insert(variant); + } + } + } + } + + by_resolution.values().cloned().collect() +} + +fn resolve_uri(base: &Url, value: &str) -> std::result::Result { + if let Ok(url) = Url::parse(value) { + return Ok(url); + } + + base.join(value) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn hls_config_new_sets_fields() { + let url = "https://example.com/stream.m3u8".to_string(); + let cfg = HlsConfig::new(url.clone()); + assert_eq!(cfg.playlist, url); + } + + #[test] + fn hls_ingest_starts_without_importers() { + let broadcast = moq_lite::Broadcast::produce().producer.into(); + let url = "https://example.com/master.m3u8".to_string(); + let cfg = HlsConfig::new(url); + let hls = Hls::new(broadcast, cfg).unwrap(); + + assert!(!hls.has_video_importer()); + assert!(!hls.has_audio_importer()); + } +} diff --git a/rs/hang/src/import/mod.rs b/rs/hang/src/import/mod.rs index 8bd1555dc..763e8bc67 100644 --- a/rs/hang/src/import/mod.rs +++ b/rs/hang/src/import/mod.rs @@ -2,8 +2,10 @@ mod aac; mod avc3; mod decoder; mod fmp4; +mod hls; pub use aac::*; pub use avc3::*; pub use decoder::*; pub use fmp4::*; +pub use hls::*; diff --git a/rs/hang/src/import/stream.rs b/rs/hang/src/import/stream.rs deleted file mode 100644 index 8b1378917..000000000 --- a/rs/hang/src/import/stream.rs +++ /dev/null @@ -1 +0,0 @@ - diff --git a/rs/hang/src/lib.rs b/rs/hang/src/lib.rs index 7888e3a0a..b38b0dc55 100644 --- a/rs/hang/src/lib.rs +++ b/rs/hang/src/lib.rs @@ -11,7 +11,8 @@ //! - **Catalog**: A list of available tracks and their metadata. //! - **Codec support**: Integration with common audio/video codecs //! - **Container**: A simple timestamped container format. -//! - **CMAF Import**: Convert a fMP4 file into a hang broadcast. +//! - **fMP4 Import**: Convert a fMP4 file into a hang broadcast. +//! - **HLS Import**: Reusable HLS/fMP4 ingest logic via [`hls`]. //! mod error; diff --git a/rs/libmoq/src/publish.rs b/rs/libmoq/src/publish.rs index 6b3dbd840..27747780a 100644 --- a/rs/libmoq/src/publish.rs +++ b/rs/libmoq/src/publish.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::{str::FromStr, sync::Arc}; use moq_lite::coding::Buf; @@ -31,8 +31,10 @@ impl Publish { pub fn media_ordered(&mut self, broadcast: Id, format: &str, mut init: &[u8]) -> Result { let broadcast = self.broadcasts.get(broadcast).ok_or(Error::NotFound)?; - let mut decoder = hang::import::Decoder::new(broadcast.clone(), format) - .ok_or_else(|| Error::UnknownFormat(format.to_string()))?; + + let format = + hang::import::DecoderFormat::from_str(format).map_err(|_| Error::UnknownFormat(format.to_string()))?; + let mut decoder = hang::import::Decoder::new(broadcast.clone(), format); decoder .initialize(&mut init)