diff --git a/Cargo.lock b/Cargo.lock index 80bab660e..b30ba8380 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1373,7 +1373,7 @@ checksum = "f9fbbcab51052fe104eb5e5d351cf728d30a5be1fe14d9be8a3b097481fb97de" [[package]] name = "libmoq" -version = "0.1.2" +version = "0.2.0" dependencies = [ "anyhow", "cbindgen", diff --git a/deno.lock b/deno.lock index a3a2ebc8b..cd8c796ae 100644 --- a/deno.lock +++ b/deno.lock @@ -60,7 +60,7 @@ "packageJson": { "dependencies": [ "npm:@rollup/plugin-node-resolve@^16.0.3", - "npm:happy-dom@^13.3.5", + "npm:happy-dom@^20.0.11", "npm:rimraf@^6.0.1", "npm:rollup-plugin-esbuild@^6.2.1", "npm:rollup@^4.53.3", diff --git a/rs/hang/examples/video.rs b/rs/hang/examples/video.rs index f2f5604dc..9ad11e9b2 100644 --- a/rs/hang/examples/video.rs +++ b/rs/hang/examples/video.rs @@ -76,7 +76,7 @@ fn create_track(broadcast: &mut moq_lite::BroadcastProducer) -> hang::TrackProdu // Create a map of video renditions // Multiple renditions allow the viewer to choose based on their capabilities - let mut renditions = std::collections::HashMap::new(); + let mut renditions = std::collections::BTreeMap::new(); renditions.insert(video_track.name.clone(), video_config); // Create the video catalog entry with the renditions diff --git a/rs/hang/src/catalog/audio/mod.rs b/rs/hang/src/catalog/audio/mod.rs index 76a3772a6..7322bc307 100644 --- a/rs/hang/src/catalog/audio/mod.rs +++ b/rs/hang/src/catalog/audio/mod.rs @@ -4,7 +4,7 @@ mod codec; pub use aac::*; pub use codec::*; -use std::collections::HashMap; +use std::collections::BTreeMap; use bytes::Bytes; @@ -21,7 +21,8 @@ use serde_with::{hex::Hex, DisplayFromStr}; pub struct Audio { /// A map of track name to rendition configuration. /// This is not an array so it will work with JSON Merge Patch. - pub renditions: HashMap, + /// We use a BTreeMap so keys are sorted alphabetically for *some* deterministic behavior. + pub renditions: BTreeMap, /// The priority of the audio track, relative to other tracks in the broadcast. pub priority: u8, diff --git a/rs/hang/src/catalog/root.rs b/rs/hang/src/catalog/root.rs index 6d2840078..b9609c26c 100644 --- a/rs/hang/src/catalog/root.rs +++ b/rs/hang/src/catalog/root.rs @@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize}; use crate::catalog::{Audio, AudioConfig, Chat, Track, User, Video, VideoConfig}; use crate::Result; -use moq_lite::Produce; +use moq_lite::Pair; /// A catalog track, created by a broadcaster to describe the tracks available in a broadcast. #[serde_with::serde_as] @@ -83,10 +83,10 @@ impl Catalog { } /// Produce a catalog track that describes the available media tracks. - pub fn produce(self) -> Produce { + pub fn produce(self) -> Pair { let track = Catalog::default_track().produce(); - Produce { + Pair { producer: CatalogProducer::new(track.producer, self), consumer: track.consumer.into(), } @@ -267,14 +267,14 @@ impl From for CatalogConsumer { #[cfg(test)] mod test { + use std::collections::BTreeMap; + use crate::catalog::{AudioCodec::Opus, AudioConfig, VideoConfig, H264}; use super::*; #[test] fn simple() { - use std::collections::HashMap; - let mut encoded = r#"{ "video": { "renditions": { @@ -304,7 +304,7 @@ mod test { encoded.retain(|c| !c.is_whitespace()); - let mut video_renditions = HashMap::new(); + let mut video_renditions = BTreeMap::new(); video_renditions.insert( "video".to_string(), VideoConfig { @@ -326,7 +326,7 @@ mod test { }, ); - let mut audio_renditions = HashMap::new(); + let mut audio_renditions = BTreeMap::new(); audio_renditions.insert( "audio".to_string(), AudioConfig { diff --git a/rs/hang/src/catalog/video/mod.rs b/rs/hang/src/catalog/video/mod.rs index cd2466471..4af6de7f0 100644 --- a/rs/hang/src/catalog/video/mod.rs +++ b/rs/hang/src/catalog/video/mod.rs @@ -10,7 +10,7 @@ pub use h264::*; pub use h265::*; pub use vp9::*; -use std::collections::HashMap; +use std::collections::BTreeMap; use bytes::Bytes; use serde::{Deserialize, Serialize}; @@ -27,7 +27,8 @@ use serde_with::{hex::Hex, DisplayFromStr}; pub struct Video { /// A map of track name to rendition configuration. /// This is not an array in order for it to work with JSON Merge Patch. - pub renditions: HashMap, + /// We use a BTreeMap so keys are sorted alphabetically for *some* deterministic behavior. + pub renditions: BTreeMap, /// The priority of the video track, relative to other tracks in the broadcast. pub priority: u8, diff --git a/rs/hang/src/model/broadcast.rs b/rs/hang/src/model/broadcast.rs index 9d5b68727..62fb6eed4 100644 --- a/rs/hang/src/model/broadcast.rs +++ b/rs/hang/src/model/broadcast.rs @@ -3,7 +3,10 @@ use std::{ sync::{atomic, Arc}, }; -use crate::catalog::{Catalog, CatalogProducer}; +use crate::{ + catalog::{Catalog, CatalogConsumer, CatalogProducer}, + TrackConsumer, +}; #[derive(Clone)] pub struct BroadcastProducer { @@ -32,6 +35,12 @@ impl BroadcastProducer { } } +impl Default for BroadcastProducer { + fn default() -> Self { + Self::new(moq_lite::BroadcastProducer::default()) + } +} + impl Deref for BroadcastProducer { type Target = moq_lite::BroadcastProducer; @@ -58,4 +67,33 @@ impl From for moq_lite::BroadcastProducer { } } -// TODO BroadcastConsumer +#[derive(Clone)] +pub struct BroadcastConsumer { + pub inner: moq_lite::BroadcastConsumer, + pub catalog: CatalogConsumer, +} + +impl BroadcastConsumer { + pub fn new(inner: moq_lite::BroadcastConsumer) -> Self { + let catalog = inner.subscribe_track(&Catalog::default_track()).into(); + Self { inner, catalog } + } + + pub fn subscribe(&self, track: &moq_lite::Track, latency: std::time::Duration) -> TrackConsumer { + TrackConsumer::new(self.inner.subscribe_track(track), latency) + } +} + +impl Deref for BroadcastConsumer { + type Target = moq_lite::BroadcastConsumer; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl From for BroadcastConsumer { + fn from(inner: moq_lite::BroadcastConsumer) -> Self { + Self::new(inner) + } +} diff --git a/rs/hang/src/model/frame.rs b/rs/hang/src/model/frame.rs index d02b7d7ca..e7a0c1e69 100644 --- a/rs/hang/src/model/frame.rs +++ b/rs/hang/src/model/frame.rs @@ -1,8 +1,9 @@ -use buf_list::BufList; use derive_more::Debug; use crate::Timestamp; +pub use buf_list::BufList; + /// A media frame with a timestamp and codec-specific payload. /// /// Frames are the fundamental unit of media data in hang. Each frame contains: diff --git a/rs/hang/src/model/track.rs b/rs/hang/src/model/track.rs index 487ddde07..cd092571a 100644 --- a/rs/hang/src/model/track.rs +++ b/rs/hang/src/model/track.rs @@ -91,8 +91,8 @@ impl TrackProducer { /// /// Multiple consumers can be created from the same producer, each receiving /// a copy of all data written to the track. - pub fn consume(&self) -> TrackConsumer { - TrackConsumer::new(self.inner.consume()) + pub fn consume(&self, max_latency: std::time::Duration) -> TrackConsumer { + TrackConsumer::new(self.inner.consume(), max_latency) } } @@ -132,18 +132,18 @@ pub struct TrackConsumer { max_timestamp: Timestamp, // The maximum buffer size before skipping a group. - latency: std::time::Duration, + max_latency: std::time::Duration, } impl TrackConsumer { /// Create a new TrackConsumer wrapping the given moq-lite consumer. - pub fn new(inner: moq_lite::TrackConsumer) -> Self { + pub fn new(inner: moq_lite::TrackConsumer, max_latency: std::time::Duration) -> Self { Self { inner, current: None, pending: VecDeque::new(), max_timestamp: Timestamp::default(), - latency: std::time::Duration::ZERO, + max_latency, } } @@ -155,7 +155,7 @@ impl TrackConsumer { /// /// Returns `None` when the track has ended. pub async fn read_frame(&mut self) -> Result, Error> { - let latency = self.latency.try_into()?; + let latency = self.max_latency.try_into()?; loop { let cutoff = self .max_timestamp @@ -209,7 +209,7 @@ impl TrackConsumer { }, Some((index, timestamp)) = buffering.next() => { if self.current.is_some() { - tracing::debug!(old = ?self.max_timestamp, new = ?timestamp, buffer = ?self.latency, "skipping slow group"); + tracing::debug!(old = ?self.max_timestamp, new = ?timestamp, buffer = ?self.max_latency, "skipping slow group"); } drop(buffering); @@ -228,9 +228,9 @@ impl TrackConsumer { /// Set the maximum latency tolerance for this consumer. /// - /// Groups with timestamps older than `max_timestamp - latency` will be skipped. - pub fn set_latency(&mut self, max: std::time::Duration) { - self.latency = max; + /// Groups with timestamps older than `max_timestamp - max_latency` will be skipped. + pub fn set_max_latency(&mut self, max: std::time::Duration) { + self.max_latency = max; } /// Wait until the track is closed. @@ -239,12 +239,6 @@ impl TrackConsumer { } } -impl From for TrackConsumer { - fn from(inner: moq_lite::TrackConsumer) -> Self { - Self::new(inner) - } -} - impl From for moq_lite::TrackConsumer { fn from(inner: TrackConsumer) -> Self { inner.inner diff --git a/rs/libmoq/CMakeLists.txt b/rs/libmoq/CMakeLists.txt index 692435e5c..7957ae697 100644 --- a/rs/libmoq/CMakeLists.txt +++ b/rs/libmoq/CMakeLists.txt @@ -44,12 +44,17 @@ endif() file(MAKE_DIRECTORY ${RUST_DIR}) file(MAKE_DIRECTORY ${RUST_TARGET_DIR}/include) -# Create imported library target -add_library(moq STATIC IMPORTED GLOBAL) -set_target_properties(moq PROPERTIES - IMPORTED_LOCATION ${RUST_LIB} - INTERFACE_INCLUDE_DIRECTORIES ${RUST_TARGET_DIR}/include -) +# Create interface library target that wraps the Rust static library +add_library(moq INTERFACE) +target_include_directories(moq INTERFACE ${RUST_TARGET_DIR}/include) + +# Force absolute path to be used in linker command +target_link_options(moq INTERFACE "$") + +# Link required system frameworks on macOS +if(APPLE) + target_link_options(moq INTERFACE "LINKER:-framework,CoreFoundation" "LINKER:-framework,Security") +endif() if(BUILD_RUST_LIB) add_dependencies(moq rust_build) @@ -58,6 +63,7 @@ endif() if(PROJECT_IS_TOP_LEVEL) include(GNUInstallDirs) + # Install header and library files install(FILES ${RUST_HEADER} DESTINATION ${CMAKE_INSTALL_INCLUDEDIR} ) @@ -66,25 +72,14 @@ if(PROJECT_IS_TOP_LEVEL) DESTINATION ${CMAKE_INSTALL_LIBDIR} ) - install(TARGETS moq - EXPORT moq-targets - LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR} - ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR} - RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} - ) - - install(EXPORT moq-targets - FILE moq-targets.cmake - NAMESPACE moq:: - DESTINATION ${CMAKE_INSTALL_LIBDIR}/cmake/moq - ) - + # Generate and install CMake package config files include(CMakePackageConfigHelpers) configure_package_config_file( ${CMAKE_CURRENT_SOURCE_DIR}/cmake/moq-config.cmake.in ${CMAKE_CURRENT_BINARY_DIR}/moq-config.cmake INSTALL_DESTINATION ${CMAKE_INSTALL_LIBDIR}/cmake/moq + PATH_VARS CMAKE_INSTALL_LIBDIR CMAKE_INSTALL_INCLUDEDIR ) write_basic_package_version_file( diff --git a/rs/libmoq/Cargo.toml b/rs/libmoq/Cargo.toml index a41f6c95c..8d62a12f3 100644 --- a/rs/libmoq/Cargo.toml +++ b/rs/libmoq/Cargo.toml @@ -5,7 +5,7 @@ authors = ["Luke Curley ", "Brian Medley " repository = "https://github.com/moq-dev/moq" license = "MIT OR Apache-2.0" -version = "0.1.2" +version = "0.2.0" edition = "2021" keywords = ["quic", "http3", "webtransport", "media", "live"] diff --git a/rs/libmoq/build.rs b/rs/libmoq/build.rs index ba6067c32..6123f010b 100644 --- a/rs/libmoq/build.rs +++ b/rs/libmoq/build.rs @@ -24,9 +24,18 @@ fn main() { let pc_in = PathBuf::from(&crate_dir).join(format!("{}.pc.in", LIB_NAME)); let pc_out = target_dir.join(format!("{}.pc", LIB_NAME)); if let Ok(template) = fs::read_to_string(&pc_in) { + let target = env::var("TARGET").unwrap(); + let libs_private = if target.contains("apple") { + "-framework CoreFoundation -framework Security" + } else if target.contains("windows") { + "-lws2_32 -lbcrypt -luserenv -lntdll" + } else { + "-ldl -lm -lpthread" + }; + let content = template - .replace("@PREFIX@", "/usr/local") - .replace("@VERSION@", &version); + .replace("@VERSION@", &version) + .replace("@LIBS_PRIVATE@", libs_private); fs::write(&pc_out, content).expect("Failed to write pkg-config file"); } } diff --git a/rs/libmoq/moq.pc.in b/rs/libmoq/moq.pc.in index 03a11706d..0bbce69d9 100644 --- a/rs/libmoq/moq.pc.in +++ b/rs/libmoq/moq.pc.in @@ -1,4 +1,4 @@ -prefix=@PREFIX@ +prefix=${pcfiledir}/../.. exec_prefix=${prefix} libdir=${exec_prefix}/lib includedir=${prefix}/include @@ -7,4 +7,5 @@ Name: moq Description: Media over QUIC C Interface Version: @VERSION@ Libs: -L${libdir} -lmoq +Libs.private: -framework CoreFoundation -framework Security Cflags: -I${includedir} diff --git a/rs/libmoq/src/api.rs b/rs/libmoq/src/api.rs new file mode 100644 index 000000000..f8bf830ea --- /dev/null +++ b/rs/libmoq/src/api.rs @@ -0,0 +1,551 @@ +use crate::{ffi, Error, State}; + +use std::ffi::c_char; +use std::ffi::c_void; +use std::str::FromStr; + +use tracing::Level; + +/// Information about a video rendition in the catalog. +#[repr(C)] +#[allow(non_camel_case_types)] +pub struct moq_video_config { + /// The name of the track, NOT NULL terminated. + pub name: *const c_char, + pub name_len: usize, + + /// The codec of the track, NOT NULL terminated + pub codec: *const c_char, + pub codec_len: usize, + + /// The description of the track, or NULL if not used. + /// This is codec specific, for example H264: + /// - NULL: annex.b encoded + /// - Non-NULL: AVCC encoded + pub description: *const u8, + pub description_len: usize, + + /// The encoded width/height of the media, or NULL if not available + pub coded_width: *const u32, + pub coded_height: *const u32, +} + +/// Information about an audio rendition in the catalog. +#[repr(C)] +#[allow(non_camel_case_types)] +pub struct moq_audio_config { + /// The name of the track, NOT NULL terminated + pub name: *const c_char, + pub name_len: usize, + + /// The codec of the track, NOT NULL terminated + pub codec: *const c_char, + pub codec_len: usize, + + /// The description of the track, or NULL if not used. + pub description: *const u8, + pub description_len: usize, + + /// The sample rate of the track in Hz + pub sample_rate: u32, + + /// The number of channels in the track + pub channel_count: u32, +} + +/// Information about a frame of media. +#[repr(C)] +#[allow(non_camel_case_types)] +pub struct moq_frame { + /// The payload of the frame, or NULL/0 if the stream has ended + pub payload: *const u8, + pub payload_size: usize, + + /// The presentation timestamp of the frame in microseconds + pub timestamp_us: u64, + + /// Whether the frame is a keyframe, aka the start of a new group. + pub keyframe: bool, +} + +/// Information about a broadcast announced by an origin. +#[repr(C)] +#[allow(non_camel_case_types)] +pub struct moq_announced { + /// The path of the broadcast, NOT NULL terminated + pub path: *const c_char, + pub path_len: usize, + + /// Whether the broadcast is active or has ended + /// This MUST toggle between true and false over the lifetime of the broadcast + pub active: bool, +} + +/// Initialize the library with a log level. +/// +/// This should be called before any other functions. +/// The log_level is a string: "error", "warn", "info", "debug", "trace" +/// +/// Returns a zero on success, or a negative code on failure. +/// +/// # Safety +/// - The caller must ensure that level is a valid pointer to level_len bytes of data. +#[no_mangle] +pub unsafe extern "C" fn moq_log_level(level: *const c_char, level_len: usize) -> i32 { + ffi::enter(move || { + match unsafe { ffi::parse_str(level, level_len)? } { + "" => moq_native::Log::default(), + level => moq_native::Log { + level: Level::from_str(level)?, + }, + } + .init(); + + Ok(()) + }) +} + +/// Start establishing a connection to a MoQ server. +/// +/// Takes origin handles, which are used for publishing and consuming broadcasts respectively. +/// - Any broadcasts in `origin_publish` will be announced to the server. +/// - Any broadcasts announced by the server will be available in `origin_consume`. +/// - If an origin handle is 0, that functionality is completely disabled. +/// +/// This may be called multiple times to connect to different servers. +/// Origins can be shared across sessions, useful for fanout or relaying. +/// +/// Returns a non-zero handle to the session on success, or a negative code on (immediate) failure. +/// You should call [moq_session_close], even on error, to free up resources. +/// +/// The callback is called on success (status 0) and later when closed (status non-zero). +/// +/// # Safety +/// - The caller must ensure that url is a valid pointer to url_len bytes of data. +/// - The caller must ensure that `on_status` is valid until [moq_session_close] is called. +#[no_mangle] +pub unsafe extern "C" fn moq_session_connect( + url: *const c_char, + url_len: usize, + origin_publish: u32, + origin_consume: u32, + on_status: Option, + user_data: *mut c_void, +) -> i32 { + ffi::enter(move || { + let url = ffi::parse_url(url, url_len)?; + + let mut state = State::lock(); + let publish = ffi::parse_id_optional(origin_publish)? + .map(|id| state.origin.get(id)) + .transpose()? + .map(|origin: &moq_lite::OriginProducer| origin.consume()); + let consume = ffi::parse_id_optional(origin_consume)? + .map(|id| state.origin.get(id)) + .transpose()? + .cloned(); + + let on_status = ffi::OnStatus::new(user_data, on_status); + state.session.connect(url, publish, consume, on_status) + }) +} + +/// Close a connection to a MoQ server. +/// +/// Returns a zero on success, or a negative code on failure. +/// +/// The [moq_session_connect] `on_status` callback will be called with [Error::Closed]. +#[no_mangle] +pub extern "C" fn moq_session_close(session: u32) -> i32 { + ffi::enter(move || { + let session = ffi::parse_id(session)?; + State::lock().session.close(session) + }) +} + +/// Create an origin for publishing broadcasts. +/// +/// Origins contain any number of broadcasts addressed by path. +/// The same broadcast can be published to multiple origins under different paths. +/// +/// [moq_origin_announced] can be used to discover broadcasts published to this origin. +/// This is extremely useful for discovering what is available on the server to [moq_origin_consume]. +/// +/// Returns a non-zero handle to the origin on success. +#[no_mangle] +pub extern "C" fn moq_origin_create() -> i32 { + ffi::enter(move || State::lock().origin.create()) +} + +/// Publish a broadcast to an origin. +/// +/// The broadcast will be announced to any origin consumers, such as over the network. +/// +/// Returns a zero on success, or a negative code on failure. +/// +/// # Safety +/// - The caller must ensure that path is a valid pointer to path_len bytes of data. +#[no_mangle] +pub unsafe extern "C" fn moq_origin_publish(origin: u32, path: *const c_char, path_len: usize, broadcast: u32) -> i32 { + ffi::enter(move || { + let origin = ffi::parse_id(origin)?; + let path = unsafe { ffi::parse_str(path, path_len)? }; + let broadcast = ffi::parse_id(broadcast)?; + + let mut state = State::lock(); + let broadcast = state.publish.get(broadcast)?.consume(); + state.origin.publish(origin, path, broadcast) + }) +} + +/// Learn about all broadcasts published to an origin. +/// +/// The callback is called with an announced ID when a new broadcast is published. +/// +/// - [moq_origin_announced_info] is used to query information about the broadcast. +/// - [moq_origin_announced_close] is used to stop receiving announcements. +/// +/// Returns a non-zero handle on success, or a negative code on failure. +/// +/// # Safety +/// - The caller must ensure that `on_announce` is valid until [moq_origin_announced_close] is called. +#[no_mangle] +pub unsafe extern "C" fn moq_origin_announced( + origin: u32, + on_announce: Option, + user_data: *mut c_void, +) -> i32 { + ffi::enter(move || { + let origin = ffi::parse_id(origin)?; + let on_announce = ffi::OnStatus::new(user_data, on_announce); + State::lock().origin.announced(origin, on_announce) + }) +} + +/// Query information about a broadcast discovered by [moq_origin_announced]. +/// +/// The destination is filled with the broadcast information. +/// +/// Returns a zero on success, or a negative code on failure. +/// +/// # Safety +/// - The caller must ensure that `dst` is a valid pointer to a [moq_announced] struct. +#[no_mangle] +pub unsafe extern "C" fn moq_origin_announced_info(announced: u32, dst: *mut moq_announced) -> i32 { + ffi::enter(move || { + let announced = ffi::parse_id(announced)?; + let dst = dst.as_mut().ok_or(Error::InvalidPointer)?; + State::lock().origin.announced_info(announced, dst) + }) +} + +/// Stop receiving announcements for broadcasts published to an origin. +/// +/// Returns a zero on success, or a negative code on failure. +#[no_mangle] +pub extern "C" fn moq_origin_announced_close(announced: u32) -> i32 { + ffi::enter(move || { + let announced = ffi::parse_id(announced)?; + State::lock().origin.announced_close(announced) + }) +} + +/// Consume a broadcast from an origin by path. +/// +/// Returns a non-zero handle to the broadcast on success, or a negative code on failure. +/// +/// # Safety +/// - The caller must ensure that path is a valid pointer to path_len bytes of data. +#[no_mangle] +pub unsafe extern "C" fn moq_origin_consume(origin: u32, path: *const c_char, path_len: usize) -> i32 { + ffi::enter(move || { + let origin = ffi::parse_id(origin)?; + let path = unsafe { ffi::parse_str(path, path_len)? }; + + let mut state = State::lock(); + let broadcast = state.origin.consume(origin, path)?; + Ok(state.consume.start(broadcast.into())) + }) +} + +/// Close an origin and clean up its resources. +/// +/// Returns a zero on success, or a negative code on failure. +#[no_mangle] +pub extern "C" fn moq_origin_close(origin: u32) -> i32 { + ffi::enter(move || { + let origin = ffi::parse_id(origin)?; + State::lock().origin.close(origin) + }) +} + +/// Create a new broadcast for publishing media tracks. +/// +/// Returns a non-zero handle to the broadcast on success, or a negative code on failure. +#[no_mangle] +pub extern "C" fn moq_publish_create() -> i32 { + ffi::enter(move || State::lock().publish.create()) +} + +/// Close a broadcast and clean up its resources. +/// +/// Returns a zero on success, or a negative code on failure. +#[no_mangle] +pub extern "C" fn moq_publish_close(broadcast: u32) -> i32 { + ffi::enter(move || { + let broadcast = ffi::parse_id(broadcast)?; + State::lock().publish.close(broadcast) + }) +} + +/// Create a new media track for a broadcast +/// +/// All frames in [moq_publish_media_frame] must be written in decode order. +/// The `format` controls the encoding, both of `init` and frame payloads. +/// +/// Returns a non-zero handle to the track on success, or a negative code on failure. +/// +/// # Safety +/// - The caller must ensure that format is a valid pointer to format_len bytes of data. +/// - The caller must ensure that init is a valid pointer to init_size bytes of data. +#[no_mangle] +pub unsafe extern "C" fn moq_publish_media_ordered( + broadcast: u32, + format: *const c_char, + format_len: usize, + init: *const u8, + init_size: usize, +) -> i32 { + ffi::enter(move || { + let broadcast = ffi::parse_id(broadcast)?; + let format = unsafe { ffi::parse_str(format, format_len)? }; + let init = unsafe { ffi::parse_slice(init, init_size)? }; + + State::lock().publish.media_ordered(broadcast, format, init) + }) +} + +/// Remove a track from a broadcast. +/// +/// Returns a zero on success, or a negative code on failure. +#[no_mangle] +pub extern "C" fn moq_publish_media_close(export: u32) -> i32 { + ffi::enter(move || { + let export = ffi::parse_id(export)?; + State::lock().publish.media_close(export) + }) +} + +/// Write data to a track. +/// +/// The encoding of `data` depends on the track `format`. +/// The timestamp is in microseconds. +/// +/// Returns a zero on success, or a negative code on failure. +/// +/// # Safety +/// - The caller must ensure that payload is a valid pointer to payload_size bytes of data. +#[no_mangle] +pub unsafe extern "C" fn moq_publish_media_frame( + media: u32, + payload: *const u8, + payload_size: usize, + timestamp_us: u64, +) -> i32 { + ffi::enter(move || { + let media = ffi::parse_id(media)?; + let payload = unsafe { ffi::parse_slice(payload, payload_size)? }; + let timestamp = hang::Timestamp::from_micros(timestamp_us)?; + State::lock().publish.media_frame(media, payload, timestamp) + }) +} + +/// Create a catalog consumer for a broadcast. +/// +/// The callback is called with a catalog ID when a new catalog is available. +/// The catalog ID can be used to query video/audio track information. +/// +/// Returns a non-zero handle on success, or a negative code on failure. +/// +/// # Safety +/// - The caller must ensure that `on_catalog` is valid until [moq_consume_catalog_close] is called. +#[no_mangle] +pub unsafe extern "C" fn moq_consume_catalog( + broadcast: u32, + on_catalog: Option, + user_data: *mut c_void, +) -> i32 { + ffi::enter(move || { + let broadcast = ffi::parse_id(broadcast)?; + let on_catalog = ffi::OnStatus::new(user_data, on_catalog); + State::lock().consume.catalog(broadcast, on_catalog) + }) +} + +/// Close a catalog consumer and clean up its resources. +/// +/// Returns a zero on success, or a negative code on failure. +#[no_mangle] +pub extern "C" fn moq_consume_catalog_close(catalog: u32) -> i32 { + ffi::enter(move || { + let catalog = ffi::parse_id(catalog)?; + State::lock().consume.catalog_close(catalog) + }) +} + +/// Query information about a video track in a catalog. +/// +/// The destination is filled with the video track information. +/// +/// Returns a zero on success, or a negative code on failure. +/// +/// # Safety +/// - The caller must ensure that `dst` is a valid pointer to a [moq_video_config] struct. +/// - The caller must ensure that `dst` is not used after [moq_consume_catalog_close] is called. +#[no_mangle] +pub unsafe extern "C" fn moq_consume_video_config(catalog: u32, index: u32, dst: *mut moq_video_config) -> i32 { + ffi::enter(move || { + let catalog = ffi::parse_id(catalog)?; + let index = index as usize; + let dst = dst.as_mut().ok_or(Error::InvalidPointer)?; + State::lock().consume.video_config(catalog, index, dst) + }) +} + +/// Query information about an audio track in a catalog. +/// +/// The destination is filled with the audio track information. +/// +/// Returns a zero on success, or a negative code on failure. +/// +/// # Safety +/// - The caller must ensure that `dst` is a valid pointer to a [moq_audio_config] struct. +/// - The caller must ensure that `dst` is not used after [moq_consume_catalog_close] is called. +#[no_mangle] +pub unsafe extern "C" fn moq_consume_audio_config(catalog: u32, index: u32, dst: *mut moq_audio_config) -> i32 { + ffi::enter(move || { + let catalog = ffi::parse_id(catalog)?; + let index = index as usize; + let dst = dst.as_mut().ok_or(Error::InvalidPointer)?; + State::lock().consume.audio_config(catalog, index, dst) + }) +} + +/// Consume a video track from a broadcast, delivering frames in order. +/// +/// - `max_latency_ms` controls the maximum amount of buffering allowed before skipping a GoP. +/// - `on_frame` is called with a frame ID when a new frame is available. +/// +/// Returns a non-zero handle to the track on success, or a negative code on failure. +/// +/// # Safety +/// - The caller must ensure that `on_frame` is valid until [moq_consume_video_track_close] is called. +#[no_mangle] +pub unsafe extern "C" fn moq_consume_video_ordered( + broadcast: u32, + index: u32, + max_latency_ms: u64, + on_frame: Option, + user_data: *mut c_void, +) -> i32 { + ffi::enter(move || { + let broadcast = ffi::parse_id(broadcast)?; + let index = index as usize; + let max_latency = std::time::Duration::from_millis(max_latency_ms); + let on_frame = ffi::OnStatus::new(user_data, on_frame); + State::lock() + .consume + .video_ordered(broadcast, index, max_latency, on_frame) + }) +} + +/// Close a video track consumer and clean up its resources. +/// +/// Returns a zero on success, or a negative code on failure. +#[no_mangle] +pub extern "C" fn moq_consume_video_close(track: u32) -> i32 { + ffi::enter(move || { + let track = ffi::parse_id(track)?; + State::lock().consume.video_close(track) + }) +} + +/// Consume an audio track from a broadcast, emitting the frames in order. +/// +/// The callback is called with a frame ID when a new frame is available. +/// The `max_latency_ms` parameter controls how long to wait before skipping frames. +/// +/// Returns a non-zero handle to the track on success, or a negative code on failure. +/// +/// # Safety +/// - The caller must ensure that `on_frame` is valid until [moq_consume_audio_close] is called. +#[no_mangle] +pub unsafe extern "C" fn moq_consume_audio_ordered( + broadcast: u32, + index: u32, + max_latency_ms: u64, + on_frame: Option, + user_data: *mut c_void, +) -> i32 { + ffi::enter(move || { + let broadcast = ffi::parse_id(broadcast)?; + let index = index as usize; + let max_latency = std::time::Duration::from_millis(max_latency_ms); + let on_frame = ffi::OnStatus::new(user_data, on_frame); + State::lock() + .consume + .audio_ordered(broadcast, index, max_latency, on_frame) + }) +} + +/// Close an audio track consumer and clean up its resources. +/// +/// Returns a zero on success, or a negative code on failure. +#[no_mangle] +pub extern "C" fn moq_consume_audio_close(track: u32) -> i32 { + ffi::enter(move || { + let track = ffi::parse_id(track)?; + State::lock().consume.audio_close(track) + }) +} + +/// Get a chunk of a frame's payload. +/// +/// Frames may be split into multiple chunks. Call this multiple times with increasing +/// index values to get all chunks. The destination is filled with the frame chunk information. +/// +/// Returns a zero on success, or a negative code on failure. +/// +/// # Safety +/// - The caller must ensure that `dst` is a valid pointer to a [moq_frame] struct. +#[no_mangle] +pub unsafe extern "C" fn moq_consume_frame_chunk(frame: u32, index: u32, dst: *mut moq_frame) -> i32 { + ffi::enter(move || { + let frame = ffi::parse_id(frame)?; + let index = index as usize; + let dst = dst.as_mut().ok_or(Error::InvalidPointer)?; + State::lock().consume.frame_chunk(frame, index, dst) + }) +} + +/// Close a frame and clean up its resources. +/// +/// Returns a zero on success, or a negative code on failure. +#[no_mangle] +pub extern "C" fn moq_consume_frame_close(frame: u32) -> i32 { + ffi::enter(move || { + let frame = ffi::parse_id(frame)?; + State::lock().consume.frame_close(frame) + }) +} + +/// Close a broadcast consumer and clean up its resources. +/// +/// Returns a zero on success, or a negative code on failure. +#[no_mangle] +pub extern "C" fn moq_consume_close(consume: u32) -> i32 { + ffi::enter(move || { + let consume = ffi::parse_id(consume)?; + State::lock().consume.close(consume) + }) +} diff --git a/rs/libmoq/src/consume.rs b/rs/libmoq/src/consume.rs new file mode 100644 index 000000000..c477732e2 --- /dev/null +++ b/rs/libmoq/src/consume.rs @@ -0,0 +1,299 @@ +use std::ffi::c_char; + +use hang::TrackConsumer; +use moq_lite::coding::Buf; +use tokio::sync::oneshot; + +use crate::ffi::OnStatus; +use crate::{moq_audio_config, moq_frame, moq_video_config, Error, Id, NonZeroSlab, State}; + +struct ConsumeCatalog { + broadcast: hang::BroadcastConsumer, + + catalog: hang::catalog::Catalog, + + /// We need to store the codec information on the heap unfortunately. + audio_codec: Vec, + video_codec: Vec, +} + +#[derive(Default)] +pub struct Consume { + /// Active broadcast consumers. + broadcast: NonZeroSlab, + + /// Active catalog consumers and their broadcast references. + catalog: NonZeroSlab, + + /// Catalog consumer task cancellation channels. + catalog_task: NonZeroSlab>, + + /// Audio track consumer task cancellation channels. + audio_task: NonZeroSlab>, + + /// Video track consumer task cancellation channels. + video_task: NonZeroSlab>, + + /// Buffered frames ready for consumption. + frame: NonZeroSlab, +} + +impl Consume { + pub fn start(&mut self, broadcast: hang::BroadcastConsumer) -> Id { + self.broadcast.insert(broadcast) + } + + pub fn catalog(&mut self, broadcast: Id, mut on_catalog: OnStatus) -> Result { + let broadcast = self.broadcast.get(broadcast).ok_or(Error::NotFound)?.clone(); + + let channel = oneshot::channel(); + let id = self.catalog_task.insert(channel.0); + + tokio::spawn(async move { + let res = tokio::select! { + res = Self::run_catalog(broadcast, &mut on_catalog) => res, + _ = channel.1 => Ok(()), + }; + on_catalog.call(res); + + State::lock().consume.catalog_task.remove(id); + }); + + Ok(id) + } + + async fn run_catalog(mut broadcast: hang::BroadcastConsumer, on_catalog: &mut OnStatus) -> Result<(), Error> { + while let Some(catalog) = broadcast.catalog.next().await? { + // Unfortunately we need to store the codec information on the heap. + let audio_codec = catalog + .audio + .as_ref() + .map(|audio| { + audio + .renditions + .values() + .map(|config| config.codec.to_string()) + .collect() + }) + .unwrap_or_default(); + + let video_codec = catalog + .video + .as_ref() + .map(|video| { + video + .renditions + .values() + .map(|config| config.codec.to_string()) + .collect() + }) + .unwrap_or_default(); + + let catalog = ConsumeCatalog { + broadcast: broadcast.clone(), + catalog, + audio_codec, + video_codec, + }; + + let id = State::lock().consume.catalog.insert(catalog); + + // Important: Don't hold the mutex during this callback. + on_catalog.call(Ok(id)); + } + + Ok(()) + } + + pub fn video_config(&mut self, catalog: Id, index: usize, dst: &mut moq_video_config) -> Result<(), Error> { + let consume = self.catalog.get(catalog).ok_or(Error::NotFound)?; + + let video = consume.catalog.video.as_ref().ok_or(Error::NoIndex)?; + let (rendition, config) = video.renditions.iter().nth(index).ok_or(Error::NoIndex)?; + let codec = consume.video_codec.get(index).ok_or(Error::NoIndex)?; + + *dst = moq_video_config { + name: rendition.as_str().as_ptr() as *const c_char, + name_len: rendition.len(), + codec: codec.as_str().as_ptr() as *const c_char, + codec_len: codec.len(), + description: config + .description + .as_ref() + .map(|desc| desc.as_ptr()) + .unwrap_or(std::ptr::null()), + description_len: config.description.as_ref().map(|desc| desc.len()).unwrap_or(0), + coded_width: config + .coded_width + .as_ref() + .map(|width| width as *const u32) + .unwrap_or(std::ptr::null()), + coded_height: config + .coded_height + .as_ref() + .map(|height| height as *const u32) + .unwrap_or(std::ptr::null()), + }; + + Ok(()) + } + + pub fn audio_config(&mut self, catalog: Id, index: usize, dst: &mut moq_audio_config) -> Result<(), Error> { + let consume = self.catalog.get(catalog).ok_or(Error::NotFound)?; + + let audio = consume.catalog.audio.as_ref().ok_or(Error::NoIndex)?; + let (rendition, config) = audio.renditions.iter().nth(index).ok_or(Error::NoIndex)?; + let codec = consume.audio_codec.get(index).ok_or(Error::NoIndex)?; + + *dst = moq_audio_config { + name: rendition.as_str().as_ptr() as *const c_char, + name_len: rendition.len(), + codec: codec.as_str().as_ptr() as *const c_char, + codec_len: codec.len(), + description: config + .description + .as_ref() + .map(|desc| desc.as_ptr()) + .unwrap_or(std::ptr::null()), + description_len: config.description.as_ref().map(|desc| desc.len()).unwrap_or(0), + sample_rate: config.sample_rate, + channel_count: config.channel_count, + }; + + Ok(()) + } + + pub fn catalog_close(&mut self, catalog: Id) -> Result<(), Error> { + self.catalog.remove(catalog).ok_or(Error::NotFound)?; + Ok(()) + } + + pub fn video_ordered( + &mut self, + catalog: Id, + index: usize, + latency: std::time::Duration, + mut on_frame: OnStatus, + ) -> Result { + let consume = self.catalog.get(catalog).ok_or(Error::NotFound)?; + let video = consume.catalog.video.as_ref().ok_or(Error::NotFound)?; + let rendition = video.renditions.keys().nth(index).ok_or(Error::NotFound)?; + + let track = consume.broadcast.subscribe_track(&moq_lite::Track { + name: rendition.clone(), + priority: video.priority, + }); + let track = TrackConsumer::new(track, latency); + + let channel = oneshot::channel(); + let id = self.video_task.insert(channel.0); + + tokio::spawn(async move { + let res = tokio::select! { + res = Self::run_track(track, &mut on_frame) => res, + _ = channel.1 => Ok(()), + }; + on_frame.call(res); + + // Make sure we clean up the task on exit. + State::lock().consume.video_task.remove(id); + }); + + Ok(id) + } + + pub fn audio_ordered( + &mut self, + catalog: Id, + index: usize, + latency: std::time::Duration, + mut on_frame: OnStatus, + ) -> Result { + let consume = self.catalog.get(catalog).ok_or(Error::NotFound)?; + let audio = consume.catalog.audio.as_ref().ok_or(Error::NotFound)?; + let rendition = audio.renditions.keys().nth(index).ok_or(Error::NotFound)?; + + let track = consume.broadcast.subscribe_track(&moq_lite::Track { + name: rendition.clone(), + priority: audio.priority, + }); + let track = TrackConsumer::new(track, latency); + + let channel = oneshot::channel(); + let id = self.audio_task.insert(channel.0); + + tokio::spawn(async move { + let res = tokio::select! { + res = Self::run_track(track, &mut on_frame) => res, + _ = channel.1 => Ok(()), + }; + on_frame.call(res); + + // Make sure we clean up the task on exit. + State::lock().consume.audio_task.remove(id); + }); + + Ok(id) + } + + async fn run_track(mut track: TrackConsumer, on_frame: &mut OnStatus) -> Result<(), Error> { + while let Some(mut frame) = track.read_frame().await? { + // TODO add a chunking API so we don't have to (potentially) allocate a contiguous buffer for the frame. + let mut new_payload = hang::BufList::new(); + new_payload.push_chunk(if frame.payload.num_chunks() == 1 { + // We can avoid allocating + frame.payload.get_chunk(0).expect("frame has zero chunks").clone() + } else { + // We need to allocate + frame.payload.copy_to_bytes(frame.payload.num_bytes()) + }); + + let new_frame = hang::Frame { + payload: new_payload, + timestamp: frame.timestamp, + keyframe: frame.keyframe, + }; + + // Important: Don't hold the mutex during this callback. + let id = State::lock().consume.frame.insert(new_frame); + on_frame.call(Ok(id)); + } + + Ok(()) + } + + pub fn audio_close(&mut self, track: Id) -> Result<(), Error> { + self.audio_task.remove(track).ok_or(Error::NotFound)?; + Ok(()) + } + + pub fn video_close(&mut self, track: Id) -> Result<(), Error> { + self.video_task.remove(track).ok_or(Error::NotFound)?; + Ok(()) + } + + // NOTE: You're supposed to call this multiple times to get all of the chunks. + pub fn frame_chunk(&self, frame: Id, index: usize, dst: &mut moq_frame) -> Result<(), Error> { + let frame = self.frame.get(frame).ok_or(Error::NotFound)?; + let chunk = frame.payload.get_chunk(index).ok_or(Error::NoIndex)?; + + *dst = moq_frame { + payload: chunk.as_ptr(), + payload_size: chunk.len(), + timestamp_us: frame.timestamp.as_micros(), + keyframe: frame.keyframe, + }; + + Ok(()) + } + + pub fn frame_close(&mut self, frame: Id) -> Result<(), Error> { + self.frame.remove(frame).ok_or(Error::NotFound)?; + Ok(()) + } + + pub fn close(&mut self, consume: Id) -> Result<(), Error> { + self.broadcast.remove(consume).ok_or(Error::NotFound)?; + Ok(()) + } +} diff --git a/rs/libmoq/src/error.rs b/rs/libmoq/src/error.rs index 9d45fadf9..051dfebf9 100644 --- a/rs/libmoq/src/error.rs +++ b/rs/libmoq/src/error.rs @@ -2,54 +2,93 @@ use std::sync::Arc; use crate::ffi; +/// Status code returned by FFI functions. +/// +/// Negative values indicate errors, zero indicates success, +/// and positive values are valid resource handles. pub type Status = i32; +/// Error types that can occur in the FFI layer. +/// +/// Each error variant maps to a specific negative error code +/// returned to C callers. #[derive(Debug, thiserror::Error, Clone)] pub enum Error { + /// Resource was closed. #[error("closed")] Closed, + /// Error from the underlying MoQ protocol layer. #[error("moq error: {0}")] Moq(#[from] moq_lite::Error), + /// URL parsing error. #[error("url error: {0}")] Url(#[from] url::ParseError), + /// UTF-8 string validation error. #[error("utf8 error: {0}")] Utf8(#[from] std::str::Utf8Error), + /// Connection establishment error. #[error("connect error: {0}")] Connect(Arc), + /// Null or invalid pointer passed from C. #[error("invalid pointer")] InvalidPointer, + /// Invalid resource ID. #[error("invalid id")] InvalidId, + /// Resource not found. #[error("not found")] NotFound, + /// Unknown media format specified. #[error("unknown format: {0}")] UnknownFormat(String), + /// Media decoder initialization failed. #[error("init failed: {0}")] InitFailed(Arc), + /// Media frame decode failed. #[error("decode failed: {0}")] DecodeFailed(Arc), + /// Timestamp value overflow. #[error("timestamp overflow")] TimestampOverflow(#[from] hang::TimestampOverflow), + /// Log level parsing error. #[error("level error: {0}")] Level(Arc), + /// Invalid error code conversion. #[error("invalid code")] InvalidCode, + /// Panic occurred in Rust code. #[error("panic")] Panic, + + /// Session is offline. + #[error("offline")] + Offline, + + /// Error from the hang media layer. + #[error("hang error: {0}")] + Hang(#[from] hang::Error), + + /// Index out of bounds. + #[error("no index")] + NoIndex, + + /// Null byte found in C string. + #[error("nul error")] + NulError(#[from] std::ffi::NulError), } impl From for Error { @@ -77,6 +116,10 @@ impl ffi::ReturnCode for Error { Error::Level(_) => -14, Error::InvalidCode => -15, Error::Panic => -16, + Error::Offline => -17, + Error::Hang(_) => -18, + Error::NoIndex => -19, + Error::NulError(_) => -20, } } } diff --git a/rs/libmoq/src/ffi.rs b/rs/libmoq/src/ffi.rs index 4f6f6347a..cee6982cd 100644 --- a/rs/libmoq/src/ffi.rs +++ b/rs/libmoq/src/ffi.rs @@ -1,15 +1,61 @@ -use std::ffi::{c_char, c_void, CStr}; +use std::{ + ffi::{c_char, c_void}, + sync::{LazyLock, Mutex}, +}; use url::Url; use crate::{Error, Id}; -pub struct Callback { +pub static RUNTIME: LazyLock> = LazyLock::new(|| { + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + let handle = runtime.handle().clone(); + + std::thread::Builder::new() + .name("libmoq".into()) + .spawn(move || { + runtime.block_on(std::future::pending::<()>()); + }) + .expect("failed to spawn runtime thread"); + + Mutex::new(handle) +}); + +/// Runs the provided function in the runtime context. +/// Additionally, we convert the return code to a C-compatible return value. +/// +/// Uses a mutex to ensure Handle::enter() guards are dropped in LIFO order, +/// as required by tokio to avoid panics in multi-threaded FFI contexts. +pub fn enter C>(f: F) -> i32 { + // NOTE: I think we need a mutex because Handle::enter() needs to be dropped in LIFO order. + // If this starts to become a bottleneck, we might have to rethink our runtime model. + let handle = RUNTIME.lock().unwrap(); + let _guard = handle.enter(); + + match std::panic::catch_unwind(std::panic::AssertUnwindSafe(f)) { + Ok(ret) => ret.code(), + Err(_) => Error::Panic.code(), + } +} + +/// Wrapper for C callback functions with user data. +/// +/// Stores a function pointer and user data pointer to call C callbacks +/// from async Rust code. +pub struct OnStatus { user_data: *mut c_void, on_status: Option, } -impl Callback { +impl OnStatus { + /// Create a new callback wrapper. + /// + /// # Safety + /// - The caller must ensure user_data remains valid for the callback's lifetime. + /// - The callback function pointer must be valid if provided. pub unsafe fn new( user_data: *mut c_void, on_status: Option, @@ -17,6 +63,9 @@ impl Callback { Self { user_data, on_status } } + /// Invoke the callback with a result code. + /// + /// Using &mut avoids the need for Sync. pub fn call(&mut self, ret: C) { if let Some(on_status) = &self.on_status { on_status(self.user_data, ret.code()); @@ -24,16 +73,11 @@ impl Callback { } } -unsafe impl Send for Callback {} - -pub fn return_code C>(f: F) -> i32 { - match std::panic::catch_unwind(std::panic::AssertUnwindSafe(f)) { - Ok(ret) => ret.code(), - Err(_) => Error::Panic.code(), - } -} +unsafe impl Send for OnStatus {} +/// Types that can be converted to C-compatible return codes. pub trait ReturnCode { + /// Convert to an i32 status code. fn code(&self) -> i32; } @@ -98,34 +142,42 @@ impl ReturnCode for Id { } } -pub fn parse_id(id: i32) -> Result { +/// Parse an i32 handle into an Id. +pub fn parse_id(id: u32) -> Result { Id::try_from(id) } -pub fn parse_url(url: *const c_char) -> Result { - if url.is_null() { - return Err(Error::InvalidPointer); +/// Parse an optional i32 handle (0 = None) into an Option. +pub fn parse_id_optional(id: u32) -> Result, Error> { + match id { + 0 => Ok(None), + id => Ok(Some(parse_id(id)?)), } +} - let url = unsafe { CStr::from_ptr(url) }; - let url = url.to_str()?; +/// Parse a C string pointer into a Url. +pub fn parse_url(url: *const c_char, url_len: usize) -> Result { + let url = unsafe { parse_str(url, url_len)? }; Ok(Url::parse(url)?) } -/// # Safety +/// Parse a C string pointer into a &str. +/// +/// Returns an empty string if the pointer is null. /// +/// # Safety /// The caller must ensure that cstr is valid for 'a. -pub unsafe fn parse_str<'a>(cstr: *const c_char) -> Result<&'a str, Error> { - if cstr.is_null() { - return Ok(""); - } - - let string = unsafe { CStr::from_ptr(cstr) }; - Ok(string.to_str()?) +pub unsafe fn parse_str<'a>(cstr: *const c_char, cstr_len: usize) -> Result<&'a str, Error> { + let slice = parse_slice(cstr as *const u8, cstr_len)?; + let string = std::str::from_utf8(slice)?; + Ok(string) } -/// # Safety +/// Parse a raw pointer and size into a byte slice. +/// +/// Returns an empty slice if both pointer and size are zero. /// +/// # Safety /// The caller must ensure that data is valid for 'a. pub unsafe fn parse_slice<'a>(data: *const u8, size: usize) -> Result<&'a [u8], Error> { if data.is_null() { diff --git a/rs/libmoq/src/id.rs b/rs/libmoq/src/id.rs index 5b8e4b903..315b974e6 100644 --- a/rs/libmoq/src/id.rs +++ b/rs/libmoq/src/id.rs @@ -2,7 +2,11 @@ use std::num::NonZero; use crate::Error; -// Massive overkill, but it's fun. +/// Opaque resource identifier returned to C code. +/// +/// Non-zero u32 value that uniquely identifies resources like sessions, +/// origins, broadcasts, tracks, etc. Zero is reserved to indicate "none" +/// or optional parameters. #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub struct Id(NonZero); @@ -12,7 +16,10 @@ impl std::fmt::Display for Id { } } -// We purposely don't return 0 for slab IDs. +/// Slab allocator that only returns non-zero IDs. +/// +/// Wraps [slab::Slab] to ensure all IDs start from 1 instead of 0, +/// allowing 0 to represent null/none in the FFI layer. pub(crate) struct NonZeroSlab(slab::Slab); impl NonZeroSlab { @@ -22,6 +29,11 @@ impl NonZeroSlab { Id(unsafe { NonZero::new_unchecked(id) }) } + pub fn get(&self, id: Id) -> Option<&T> { + let id = (id.0.get() - 1) as usize; + self.0.get(id) + } + pub fn get_mut(&mut self, id: Id) -> Option<&mut T> { let id = (id.0.get() - 1) as usize; self.0.get_mut(id) diff --git a/rs/libmoq/src/lib.rs b/rs/libmoq/src/lib.rs index 03c358ff9..bd3efcbe6 100644 --- a/rs/libmoq/src/lib.rs +++ b/rs/libmoq/src/lib.rs @@ -1,167 +1,34 @@ +//! C FFI bindings for MoQ (Media over QUIC). +//! +//! This library provides a C-compatible API for working with MoQ broadcasts, +//! enabling real-time media delivery with low latency at scale. +//! +//! The API is organized around several key concepts: +//! - **Sessions**: Network connections to MoQ servers +//! - **Origins**: Collections of broadcasts that can be published or consumed +//! - **Broadcasts**: Container for media tracks +//! - **Tracks**: Individual audio or video streams +//! - **Frames**: Individual media samples with timestamps +//! +//! All functions return negative error codes on failure, or non-negative values on success. +//! Most resources are managed through opaque integer handles that must be explicitly closed. + +mod api; +mod consume; mod error; mod ffi; mod id; +mod origin; +mod publish; +mod session; mod state; +pub use api::*; pub use error::*; pub use id::*; -use state::*; -use std::ffi::c_void; -use std::os::raw::c_char; -use std::str::FromStr; - -use tracing::Level; - -/// Initialize the library with a log level. -/// -/// This should be called before any other functions. -/// The log_level is a string: "error", "warn", "info", "debug", "trace" -/// -/// Returns a zero on success, or a negative code on failure. -/// -/// # Safety -/// - The caller must ensure that level is a valid null-terminated C string. -#[no_mangle] -pub unsafe extern "C" fn moq_log_level(level: *const c_char) -> i32 { - ffi::return_code(move || { - match ffi::parse_str(level)? { - "" => moq_native::Log::default(), - level => moq_native::Log { - level: Level::from_str(level)?, - }, - } - .init(); - - Ok(()) - }) -} - -/// Start establishing a connection to a MoQ server. -/// -/// This may be called multiple times to connect to different servers. -/// Broadcast may be published before or after the connection is established. -/// -/// Returns a non-zero handle to the session on success, or a negative code on (immediate) failure. -/// You should call [moq_session_close], even on error, to free up resources. -/// -/// The callback is called on success (status 0) and later when closed (status non-zero). -/// -/// # Safety -/// - The caller must ensure that url is a valid null-terminated C string. -/// - The caller must ensure that callback is a valid function pointer, or null. -/// - The caller must ensure that user_data is a valid pointer. -#[no_mangle] -pub unsafe extern "C" fn moq_session_connect( - url: *const c_char, - callback: Option, - user_data: *mut c_void, -) -> i32 { - ffi::return_code(move || { - let url = ffi::parse_url(url)?; - let callback = ffi::Callback::new(user_data, callback); - State::lock().session_connect(url, callback) - }) -} - -/// Close a connection to a MoQ server. -/// -/// Returns a zero on success, or a negative code on failure. -/// -/// The [moq_session_connect] callback will be called with [Error::Closed]. -#[no_mangle] -pub extern "C" fn moq_session_close(id: i32) -> i32 { - ffi::return_code(move || { - let id = ffi::parse_id(id)?; - State::lock().session_close(id) - }) -} - -/// Create a new broadcast; a collection of tracks. -/// -/// Returns a non-zero handle to the broadcast on success. -#[no_mangle] -pub extern "C" fn moq_broadcast_create() -> i32 { - ffi::return_code(move || State::lock().create_broadcast()) -} - -/// Remove a broadcast and all its tracks. -/// -/// Returns a zero on success, or a negative code on failure. -#[no_mangle] -pub extern "C" fn moq_broadcast_close(id: i32) -> i32 { - ffi::return_code(move || { - let id = ffi::parse_id(id)?; - State::lock().remove_broadcast(id) - }) -} - -/// Publish the broadcast to the indicated session with the given path. -/// -/// Returns a zero on success, or a negative code on failure. -/// The same broadcast may be published to multiple connections. -/// -/// # Safety -/// - The caller must ensure that path is a valid null-terminated C string, or null. -// TODO add an unpublish method. -#[no_mangle] -pub unsafe extern "C" fn moq_broadcast_publish(id: i32, session: i32, path: *const c_char) -> i32 { - ffi::return_code(move || { - let id = ffi::parse_id(id)?; - let session = ffi::parse_id(session)?; - let path = ffi::parse_str(path)?; - State::lock().publish_broadcast(id, session, path) - }) -} - -/// Create a new track for a broadcast. -/// -/// The encoding of `extra` depends on the `format`. -/// See [hang::import::Generic] for the available formats. -/// -/// Returns a non-zero handle to the track on success, or a negative code on failure. -/// -/// # Safety -/// - The caller must ensure that format is a valid null-terminated C string. -#[no_mangle] -pub unsafe extern "C" fn moq_track_create( - broadcast: i32, - format: *const c_char, - init: *const u8, - init_size: usize, -) -> i32 { - ffi::return_code(move || { - let broadcast = ffi::parse_id(broadcast)?; - let format = ffi::parse_str(format)?; - let init = ffi::parse_slice(init, init_size)?; - - State::lock().create_track(broadcast, format, init) - }) -} - -/// Remove a track from a broadcast. -#[no_mangle] -pub extern "C" fn moq_track_close(id: i32) -> i32 { - ffi::return_code(move || { - let id = ffi::parse_id(id)?; - State::lock().remove_track(id) - }) -} - -/// Write data to a track. -/// -/// The encoding of `data` depends on the track `format`. -/// The timestamp is in microseconds. -/// -/// Returns a zero on success, or a negative code on failure. -/// -/// # Safety -/// - The caller must ensure that data is a valid pointer, or null. -#[no_mangle] -pub unsafe extern "C" fn moq_track_write(id: i32, data: *const u8, data_size: usize, pts: u64) -> i32 { - ffi::return_code(move || { - let id = ffi::parse_id(id)?; - let data = ffi::parse_slice(data, data_size)?; - State::lock().write_track(id, data, pts) - }) -} +pub(crate) use consume::*; +pub(crate) use origin::*; +pub(crate) use publish::*; +pub(crate) use session::*; +pub(crate) use state::*; diff --git a/rs/libmoq/src/origin.rs b/rs/libmoq/src/origin.rs new file mode 100644 index 000000000..49227a4e9 --- /dev/null +++ b/rs/libmoq/src/origin.rs @@ -0,0 +1,99 @@ +use std::ffi::c_char; + +use tokio::sync::oneshot; + +use crate::ffi::OnStatus; +use crate::{moq_announced, Error, Id, NonZeroSlab, State}; + +/// Global state managing all active resources. +/// +/// Stores all sessions, origins, broadcasts, tracks, and frames in slab allocators, +/// returning opaque IDs to C callers. Also manages async tasks via oneshot channels +/// for cancellation. +// TODO split this up into separate structs/mutexes +#[derive(Default)] +pub struct Origin { + /// Active origin producers for publishing and consuming broadcasts. + active: NonZeroSlab, + + /// Broadcast announcement information (path, active status). + announced: NonZeroSlab<(String, bool)>, + + /// Announcement listener task cancellation channels. + announced_task: NonZeroSlab>, +} + +impl Origin { + pub fn create(&mut self) -> Id { + self.active.insert(moq_lite::OriginProducer::default()) + } + + pub fn get(&self, id: Id) -> Result<&moq_lite::OriginProducer, Error> { + self.active.get(id).ok_or(Error::NotFound) + } + + pub fn announced(&mut self, origin: Id, mut on_announce: OnStatus) -> Result { + let origin = self.active.get_mut(origin).ok_or(Error::NotFound)?; + let consumer = origin.consume(); + let channel = oneshot::channel(); + + tokio::spawn(async move { + let res = tokio::select! { + res = Self::run_announced(consumer, &mut on_announce) => res, + _ = channel.1 => Ok(()), + }; + on_announce.call(res); + }); + + let id = self.announced_task.insert(channel.0); + Ok(id) + } + + async fn run_announced(mut consumer: moq_lite::OriginConsumer, on_announce: &mut OnStatus) -> Result<(), Error> { + while let Some((path, broadcast)) = consumer.announced().await { + let id = State::lock() + .origin + .announced + .insert((path.to_string(), broadcast.is_some())); + on_announce.call(id); + } + + Ok(()) + } + + pub fn announced_info(&self, announced: Id, dst: &mut moq_announced) -> Result<(), Error> { + let announced = self.announced.get(announced).ok_or(Error::NotFound)?; + *dst = moq_announced { + path: announced.0.as_str().as_ptr() as *const c_char, + path_len: announced.0.len(), + active: announced.1, + }; + Ok(()) + } + + pub fn announced_close(&mut self, announced: Id) -> Result<(), Error> { + self.announced_task.remove(announced).ok_or(Error::NotFound)?; + Ok(()) + } + + pub fn consume(&mut self, origin: Id, path: P) -> Result { + let origin = self.active.get_mut(origin).ok_or(Error::NotFound)?; + origin.consume().consume_broadcast(path).ok_or(Error::NotFound) + } + + pub fn publish( + &mut self, + origin: Id, + path: P, + broadcast: moq_lite::BroadcastConsumer, + ) -> Result<(), Error> { + let origin = self.active.get_mut(origin).ok_or(Error::NotFound)?; + origin.publish_broadcast(path, broadcast); + Ok(()) + } + + pub fn close(&mut self, origin: Id) -> Result<(), Error> { + self.active.remove(origin).ok_or(Error::NotFound)?; + Ok(()) + } +} diff --git a/rs/libmoq/src/publish.rs b/rs/libmoq/src/publish.rs new file mode 100644 index 000000000..6b3dbd840 --- /dev/null +++ b/rs/libmoq/src/publish.rs @@ -0,0 +1,70 @@ +use std::sync::Arc; + +use moq_lite::coding::Buf; + +use crate::{Error, Id, NonZeroSlab}; + +#[derive(Default)] +pub struct Publish { + /// Active broadcast producers for publishing. + broadcasts: NonZeroSlab, + + /// Active media encoders/decoders for publishing. + media: NonZeroSlab, +} + +impl Publish { + pub fn create(&mut self) -> Result { + let broadcast = hang::BroadcastProducer::default(); + let id = self.broadcasts.insert(broadcast); + Ok(id) + } + + pub fn get(&self, id: Id) -> Result<&hang::BroadcastProducer, Error> { + self.broadcasts.get(id).ok_or(Error::NotFound) + } + + pub fn close(&mut self, broadcast: Id) -> Result<(), Error> { + self.broadcasts.remove(broadcast).ok_or(Error::NotFound)?; + Ok(()) + } + + pub fn media_ordered(&mut self, broadcast: Id, format: &str, mut init: &[u8]) -> Result { + let broadcast = self.broadcasts.get(broadcast).ok_or(Error::NotFound)?; + let mut decoder = hang::import::Decoder::new(broadcast.clone(), format) + .ok_or_else(|| Error::UnknownFormat(format.to_string()))?; + + decoder + .initialize(&mut init) + .map_err(|err| Error::InitFailed(Arc::new(err)))?; + if init.has_remaining() { + return Err(Error::InitFailed(Arc::new(anyhow::anyhow!( + "buffer was not fully consumed" + )))); + } + + let id = self.media.insert(decoder); + Ok(id) + } + + pub fn media_frame(&mut self, media: Id, mut data: &[u8], timestamp: hang::Timestamp) -> Result<(), Error> { + let media = self.media.get_mut(media).ok_or(Error::NotFound)?; + + media + .decode_frame(&mut data, Some(timestamp)) + .map_err(|err| Error::DecodeFailed(Arc::new(err)))?; + + if data.has_remaining() { + return Err(Error::DecodeFailed(Arc::new(anyhow::anyhow!( + "buffer was not fully consumed" + )))); + } + + Ok(()) + } + + pub fn media_close(&mut self, media: Id) -> Result<(), Error> { + self.media.remove(media).ok_or(Error::NotFound)?; + Ok(()) + } +} diff --git a/rs/libmoq/src/session.rs b/rs/libmoq/src/session.rs new file mode 100644 index 000000000..5632e69d6 --- /dev/null +++ b/rs/libmoq/src/session.rs @@ -0,0 +1,62 @@ +use std::sync::Arc; + +use tokio::sync::oneshot; +use url::Url; + +use crate::{ffi, Error, Id, NonZeroSlab, State}; + +#[derive(Default)] +pub struct Session { + /// Session task cancellation channels. + task: NonZeroSlab>, +} + +impl Session { + pub fn connect( + &mut self, + url: Url, + publish: Option, + consume: Option, + mut callback: ffi::OnStatus, + ) -> Result { + // Used just to notify when the session is removed from the map. + let closed = oneshot::channel(); + + let id = self.task.insert(closed.0); + tokio::spawn(async move { + let res = tokio::select! { + // No more receiver, which means [session_close] was called. + _ = closed.1 => Err(Error::Closed), + // The connection failed. + res = Self::connect_run(url, publish, consume, &mut callback) => res, + }; + callback.call(res); + + // Make sure we clean up the task on exit. + State::lock().session.task.remove(id); + }); + + Ok(id) + } + + async fn connect_run( + url: Url, + publish: Option, + consume: Option, + callback: &mut ffi::OnStatus, + ) -> Result<(), Error> { + let config = moq_native::ClientConfig::default(); + let client = config.init().map_err(|err| Error::Connect(Arc::new(err)))?; + let connection = client.connect(url).await.map_err(|err| Error::Connect(Arc::new(err)))?; + let session = moq_lite::Session::connect(connection, publish, consume).await?; + callback.call(()); + + session.closed().await?; + Ok(()) + } + + pub fn close(&mut self, id: Id) -> Result<(), Error> { + self.task.remove(id).ok_or(Error::NotFound)?; + Ok(()) + } +} diff --git a/rs/libmoq/src/state.rs b/rs/libmoq/src/state.rs index 55d8bae8e..29516c0bf 100644 --- a/rs/libmoq/src/state.rs +++ b/rs/libmoq/src/state.rs @@ -1,184 +1,28 @@ -use std::ops::{Deref, DerefMut}; -use std::sync::{Arc, LazyLock, Mutex, MutexGuard}; +use std::sync::{LazyLock, Mutex, MutexGuard}; -use tokio::sync::oneshot; -use url::Url; - -use crate::{ffi, Error, Id, NonZeroSlab}; - -struct Session { - // The collection of published broadcasts. - origin: moq_lite::OriginProducer, - - // A simple signal to notify the background task when closed. - #[allow(dead_code)] - closed: oneshot::Sender<()>, -} +use crate::{Consume, Origin, Publish, Session}; pub struct State { - // All sessions by ID. - sessions: NonZeroSlab, // TODO clean these up on error. - - // All broadcasts, indexed by an ID. - broadcasts: NonZeroSlab, - - // All tracks, indexed by an ID. - tracks: NonZeroSlab, -} - -pub struct StateGuard { - _runtime: tokio::runtime::EnterGuard<'static>, - state: MutexGuard<'static, State>, -} - -impl Deref for StateGuard { - type Target = State; - fn deref(&self) -> &Self::Target { - &self.state - } -} - -impl DerefMut for StateGuard { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.state - } + pub session: Session, + pub origin: Origin, + pub publish: Publish, + pub consume: Consume, } impl State { - pub fn lock() -> StateGuard { - let runtime = RUNTIME.enter(); - let state = STATE.lock().unwrap(); - StateGuard { - _runtime: runtime, - state, - } - } -} - -static RUNTIME: LazyLock = LazyLock::new(|| { - let runtime = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap(); - let handle = runtime.handle().clone(); - - std::thread::Builder::new() - .name("libmoq".into()) - .spawn(move || { - runtime.block_on(std::future::pending::<()>()); - }) - .expect("failed to spawn runtime thread"); - - handle -}); - -static STATE: LazyLock> = LazyLock::new(|| Mutex::new(State::new())); - -impl State { - fn new() -> Self { + pub fn new() -> Self { Self { - sessions: Default::default(), - broadcasts: Default::default(), - tracks: Default::default(), + session: Session::default(), + origin: Origin::default(), + publish: Publish::default(), + consume: Consume::default(), } } - pub fn session_connect(&mut self, url: Url, mut callback: ffi::Callback) -> Result { - let origin = moq_lite::Origin::produce(); - - // Used just to notify when the session is removed from the map. - let closed = oneshot::channel(); - - let id = self.sessions.insert(Session { - closed: closed.0, - origin: origin.producer, - }); - - tokio::spawn(async move { - let err = tokio::select! { - // No more receiver, which means [session_close] was called. - _ = closed.1 => Ok(()), - // The connection failed. - res = Self::session_connect_run(url, origin.consumer, &mut callback) => res, - } - .err() - .unwrap_or(Error::Closed); - - callback.call(err); - }); - - Ok(id) - } - - async fn session_connect_run( - url: Url, - origin: moq_lite::OriginConsumer, - callback: &mut ffi::Callback, - ) -> Result<(), Error> { - let config = moq_native::ClientConfig::default(); - let client = config.init().map_err(|err| Error::Connect(Arc::new(err)))?; - let connection = client.connect(url).await.map_err(|err| Error::Connect(Arc::new(err)))?; - let session = moq_lite::Session::connect(connection, origin, None).await?; - callback.call(()); - - session.closed().await?; - Ok(()) - } - - pub fn session_close(&mut self, id: Id) -> Result<(), Error> { - self.sessions.remove(id).ok_or(Error::NotFound)?; - Ok(()) - } - - pub fn publish_broadcast(&mut self, broadcast: Id, session: Id, path: P) -> Result<(), Error> { - let path = path.as_path(); - let broadcast = self.broadcasts.get_mut(broadcast).ok_or(Error::NotFound)?; - let session = self.sessions.get_mut(session).ok_or(Error::NotFound)?; - - session.origin.publish_broadcast(path, broadcast.consume()); - - Ok(()) - } - - pub fn create_broadcast(&mut self) -> Id { - let broadcast = moq_lite::Broadcast::produce(); - self.broadcasts.insert(broadcast.producer.into()) - } - - pub fn remove_broadcast(&mut self, broadcast: Id) -> Result<(), Error> { - self.broadcasts.remove(broadcast).ok_or(Error::NotFound)?; - Ok(()) - } - - pub fn create_track(&mut self, broadcast: Id, format: &str, init: &[u8]) -> Result { - let broadcast = self.broadcasts.get_mut(broadcast).ok_or(Error::NotFound)?; - let mut decoder = hang::import::Decoder::new(broadcast.clone(), format) - .ok_or_else(|| Error::UnknownFormat(format.to_string()))?; - - let mut temp = init; - decoder - .initialize(&mut temp) - .map_err(|err| Error::InitFailed(Arc::new(err)))?; - assert!(init.is_empty(), "buffer was not fully consumed"); - - let id = self.tracks.insert(decoder); - Ok(id) - } - - pub fn write_track(&mut self, track: Id, mut data: &[u8], pts: u64) -> Result<(), Error> { - let track = self.tracks.get_mut(track).ok_or(Error::NotFound)?; - - let pts = hang::Timestamp::from_micros(pts)?; - track - .decode_frame(&mut data, Some(pts)) - .map_err(|err| Error::DecodeFailed(Arc::new(err)))?; - assert!(data.is_empty(), "buffer was not fully consumed"); - - Ok(()) - } - - pub fn remove_track(&mut self, track: Id) -> Result<(), Error> { - self.tracks.remove(track).ok_or(Error::NotFound)?; - Ok(()) + pub fn lock<'a>() -> MutexGuard<'a, Self> { + STATE.lock().unwrap() } } + +/// Global shared state instance. +static STATE: LazyLock> = LazyLock::new(|| Mutex::new(State::new())); diff --git a/rs/moq-lite/src/model/broadcast.rs b/rs/moq-lite/src/model/broadcast.rs index d518981ab..311ff4717 100644 --- a/rs/moq-lite/src/model/broadcast.rs +++ b/rs/moq-lite/src/model/broadcast.rs @@ -7,12 +7,14 @@ use std::{ }, }; -use crate::{Error, Produce, TrackConsumer, TrackProducer}; +use crate::{Error, Pair, TrackConsumer, TrackProducer}; use tokio::sync::watch; use web_async::Lock; use super::Track; +pub type BroadcastPair = Pair; + struct State { // When explicitly publishing, we hold a reference to the consumer. // This prevents the track from being marked as "unused". @@ -29,10 +31,10 @@ pub struct Broadcast { } impl Broadcast { - pub fn produce() -> Produce { + pub fn produce() -> BroadcastPair { let producer = BroadcastProducer::new(); let consumer = producer.consume(); - Produce { producer, consumer } + BroadcastPair { producer, consumer } } } diff --git a/rs/moq-lite/src/model/frame.rs b/rs/moq-lite/src/model/frame.rs index 7d6547a6e..81645d0df 100644 --- a/rs/moq-lite/src/model/frame.rs +++ b/rs/moq-lite/src/model/frame.rs @@ -3,7 +3,9 @@ use std::future::Future; use bytes::{Bytes, BytesMut}; use tokio::sync::watch; -use crate::{Error, Produce, Result}; +use crate::{Error, Pair, Result}; + +pub type FramePair = Pair; #[derive(Clone, Debug)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] @@ -12,10 +14,10 @@ pub struct Frame { } impl Frame { - pub fn produce(self) -> Produce { + pub fn produce(self) -> FramePair { let producer = FrameProducer::new(self); let consumer = producer.consume(); - Produce { producer, consumer } + FramePair { producer, consumer } } } diff --git a/rs/moq-lite/src/model/group.rs b/rs/moq-lite/src/model/group.rs index a76e8ef56..72b37cef4 100644 --- a/rs/moq-lite/src/model/group.rs +++ b/rs/moq-lite/src/model/group.rs @@ -12,10 +12,12 @@ use std::future::Future; use bytes::Bytes; use tokio::sync::watch; -use crate::{Error, Produce, Result}; +use crate::{Error, Pair, Result}; use super::{Frame, FrameConsumer, FrameProducer}; +pub type GroupPair = Pair; + #[derive(Clone, Debug, Hash, Eq, PartialEq, Ord, PartialOrd)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] pub struct Group { @@ -23,10 +25,10 @@ pub struct Group { } impl Group { - pub fn produce(self) -> Produce { + pub fn produce(self) -> GroupPair { let producer = GroupProducer::new(self); let consumer = producer.consume(); - Produce { producer, consumer } + GroupPair { producer, consumer } } } diff --git a/rs/moq-lite/src/model/mod.rs b/rs/moq-lite/src/model/mod.rs index f867a6055..0f2f97aaa 100644 --- a/rs/moq-lite/src/model/mod.rs +++ b/rs/moq-lite/src/model/mod.rs @@ -2,12 +2,12 @@ mod broadcast; mod frame; mod group; mod origin; -mod produce; +mod pair; mod track; pub use broadcast::*; pub use frame::*; pub use group::*; pub use origin::*; -pub use produce::*; +pub use pair::*; pub use track::*; diff --git a/rs/moq-lite/src/model/origin.rs b/rs/moq-lite/src/model/origin.rs index b823eb474..05419c8f7 100644 --- a/rs/moq-lite/src/model/origin.rs +++ b/rs/moq-lite/src/model/origin.rs @@ -6,10 +6,12 @@ use tokio::sync::mpsc; use web_async::Lock; use super::BroadcastConsumer; -use crate::{AsPath, Path, PathOwned, Produce}; +use crate::{AsPath, Pair, Path, PathOwned}; static NEXT_CONSUMER_ID: AtomicU64 = AtomicU64::new(0); +pub type OriginPair = Pair; + #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] struct ConsumerId(u64); @@ -337,10 +339,10 @@ pub type OriginAnnounce = (PathOwned, Option); pub struct Origin {} impl Origin { - pub fn produce() -> Produce { + pub fn produce() -> OriginPair { let producer = OriginProducer::default(); let consumer = producer.consume(); - Produce { producer, consumer } + OriginPair { producer, consumer } } } diff --git a/rs/moq-lite/src/model/produce.rs b/rs/moq-lite/src/model/pair.rs similarity index 93% rename from rs/moq-lite/src/model/produce.rs rename to rs/moq-lite/src/model/pair.rs index c99ce2bf3..04db09bde 100644 --- a/rs/moq-lite/src/model/produce.rs +++ b/rs/moq-lite/src/model/pair.rs @@ -4,7 +4,7 @@ /// However when the number of references reaches zero, the other will receive a signal to close. /// A new consumer may be created at any time by calling [T::consume]. #[derive(Clone)] -pub struct Produce { +pub struct Pair { pub producer: P, pub consumer: C, } diff --git a/rs/moq-lite/src/model/track.rs b/rs/moq-lite/src/model/track.rs index cc639194f..97cf3ad77 100644 --- a/rs/moq-lite/src/model/track.rs +++ b/rs/moq-lite/src/model/track.rs @@ -14,12 +14,14 @@ use tokio::sync::watch; -use crate::{Error, Produce, Result}; +use crate::{Error, Pair, Result}; use super::{Group, GroupConsumer, GroupProducer}; use std::{cmp::Ordering, future::Future}; +pub type TrackPair = Pair; + #[derive(Clone, Debug, PartialEq, Eq)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] pub struct Track { @@ -35,10 +37,10 @@ impl Track { } } - pub fn produce(self) -> Produce { + pub fn produce(self) -> TrackPair { let producer = TrackProducer::new(self); let consumer = producer.consume(); - Produce { producer, consumer } + TrackPair { producer, consumer } } } diff --git a/rs/moq-relay/src/cluster.rs b/rs/moq-relay/src/cluster.rs index bd91fceb7..82362a608 100644 --- a/rs/moq-relay/src/cluster.rs +++ b/rs/moq-relay/src/cluster.rs @@ -57,16 +57,16 @@ pub struct Cluster { client: moq_native::Client, // Advertises ourselves as an origin to other nodes. - noop: moq_lite::Produce, + noop: moq_lite::Pair, // Broadcasts announced by local clients (users). - pub primary: Arc>, + pub primary: Arc>, // Broadcasts announced by remote servers (cluster). - pub secondary: Arc>, + pub secondary: Arc>, // Broadcasts announced by local clients and remote servers. - pub combined: Arc>, + pub combined: Arc>, } impl Cluster {