Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rs/hang/examples/video.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ fn create_track(broadcast: &mut moq_lite::BroadcastProducer) -> hang::TrackProdu

// Create a map of video renditions
// Multiple renditions allow the viewer to choose based on their capabilities
let mut renditions = std::collections::HashMap::new();
let mut renditions = std::collections::BTreeMap::new();
renditions.insert(video_track.name.clone(), video_config);

// Create the video catalog entry with the renditions
Expand Down
5 changes: 3 additions & 2 deletions rs/hang/src/catalog/audio/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ mod codec;
pub use aac::*;
pub use codec::*;

use std::collections::HashMap;
use std::collections::BTreeMap;

use bytes::Bytes;

Expand All @@ -21,7 +21,8 @@ use serde_with::{hex::Hex, DisplayFromStr};
pub struct Audio {
/// A map of track name to rendition configuration.
/// This is not an array so it will work with JSON Merge Patch.
pub renditions: HashMap<String, AudioConfig>,
/// We use a BTreeMap so keys are sorted alphabetically for *some* deterministic behavior.
pub renditions: BTreeMap<String, AudioConfig>,

/// The priority of the audio track, relative to other tracks in the broadcast.
pub priority: u8,
Expand Down
14 changes: 7 additions & 7 deletions rs/hang/src/catalog/root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize};

use crate::catalog::{Audio, AudioConfig, Chat, Track, User, Video, VideoConfig};
use crate::Result;
use moq_lite::Produce;
use moq_lite::Pair;

/// A catalog track, created by a broadcaster to describe the tracks available in a broadcast.
#[serde_with::serde_as]
Expand Down Expand Up @@ -83,10 +83,10 @@ impl Catalog {
}

/// Produce a catalog track that describes the available media tracks.
pub fn produce(self) -> Produce<CatalogProducer, CatalogConsumer> {
pub fn produce(self) -> Pair<CatalogProducer, CatalogConsumer> {
let track = Catalog::default_track().produce();

Produce {
Pair {
producer: CatalogProducer::new(track.producer, self),
consumer: track.consumer.into(),
}
Expand Down Expand Up @@ -267,14 +267,14 @@ impl From<moq_lite::TrackConsumer> for CatalogConsumer {

#[cfg(test)]
mod test {
use std::collections::BTreeMap;

use crate::catalog::{AudioCodec::Opus, AudioConfig, VideoConfig, H264};

use super::*;

#[test]
fn simple() {
use std::collections::HashMap;

let mut encoded = r#"{
"video": {
"renditions": {
Expand Down Expand Up @@ -304,7 +304,7 @@ mod test {

encoded.retain(|c| !c.is_whitespace());

let mut video_renditions = HashMap::new();
let mut video_renditions = BTreeMap::new();
video_renditions.insert(
"video".to_string(),
VideoConfig {
Expand All @@ -326,7 +326,7 @@ mod test {
},
);

let mut audio_renditions = HashMap::new();
let mut audio_renditions = BTreeMap::new();
audio_renditions.insert(
"audio".to_string(),
AudioConfig {
Expand Down
5 changes: 3 additions & 2 deletions rs/hang/src/catalog/video/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ pub use h264::*;
pub use h265::*;
pub use vp9::*;

use std::collections::HashMap;
use std::collections::BTreeMap;

use bytes::Bytes;
use serde::{Deserialize, Serialize};
Expand All @@ -27,7 +27,8 @@ use serde_with::{hex::Hex, DisplayFromStr};
pub struct Video {
/// A map of track name to rendition configuration.
/// This is not an array in order for it to work with JSON Merge Patch.
pub renditions: HashMap<String, VideoConfig>,
/// We use a BTreeMap so keys are sorted alphabetically for *some* deterministic behavior.
pub renditions: BTreeMap<String, VideoConfig>,

/// The priority of the video track, relative to other tracks in the broadcast.
pub priority: u8,
Expand Down
42 changes: 40 additions & 2 deletions rs/hang/src/model/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ use std::{
sync::{atomic, Arc},
};

use crate::catalog::{Catalog, CatalogProducer};
use crate::{
catalog::{Catalog, CatalogConsumer, CatalogProducer},
TrackConsumer,
};

#[derive(Clone)]
pub struct BroadcastProducer {
Expand Down Expand Up @@ -32,6 +35,12 @@ impl BroadcastProducer {
}
}

impl Default for BroadcastProducer {
fn default() -> Self {
Self::new(moq_lite::BroadcastProducer::default())
}
}

impl Deref for BroadcastProducer {
type Target = moq_lite::BroadcastProducer;

Expand All @@ -58,4 +67,33 @@ impl From<BroadcastProducer> for moq_lite::BroadcastProducer {
}
}

// TODO BroadcastConsumer
#[derive(Clone)]
pub struct BroadcastConsumer {
pub inner: moq_lite::BroadcastConsumer,
pub catalog: CatalogConsumer,
}

impl BroadcastConsumer {
pub fn new(inner: moq_lite::BroadcastConsumer) -> Self {
let catalog = inner.subscribe_track(&Catalog::default_track()).into();
Self { inner, catalog }
}

pub fn subscribe(&self, track: &moq_lite::Track, latency: std::time::Duration) -> TrackConsumer {
TrackConsumer::new(self.inner.subscribe_track(track), latency)
}
}

impl Deref for BroadcastConsumer {
type Target = moq_lite::BroadcastConsumer;

fn deref(&self) -> &Self::Target {
&self.inner
}
}

impl From<moq_lite::BroadcastConsumer> for BroadcastConsumer {
fn from(inner: moq_lite::BroadcastConsumer) -> Self {
Self::new(inner)
}
}
3 changes: 2 additions & 1 deletion rs/hang/src/model/frame.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use buf_list::BufList;
use derive_more::Debug;

use crate::Timestamp;

pub use buf_list::BufList;

/// A media frame with a timestamp and codec-specific payload.
///
/// Frames are the fundamental unit of media data in hang. Each frame contains:
Expand Down
26 changes: 10 additions & 16 deletions rs/hang/src/model/track.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ impl TrackProducer {
///
/// Multiple consumers can be created from the same producer, each receiving
/// a copy of all data written to the track.
pub fn consume(&self) -> TrackConsumer {
TrackConsumer::new(self.inner.consume())
pub fn consume(&self, max_buffer: std::time::Duration) -> TrackConsumer {
TrackConsumer::new(self.inner.consume(), max_buffer)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

rg -nP --type=rust -C3 '\.consume\s*\(' .

Repository: moq-dev/moq

Length of output: 14894


Update call site in rs/hang/src/catalog/root.rs with the required max_buffer parameter.

The consume() method signature was changed to require a max_buffer: std::time::Duration parameter, but the call at line 168 in rs/hang/src/catalog/root.rs does not pass this argument:

pub fn consume(&self) -> CatalogConsumer {
    CatalogConsumer::new(self.track.consume())
}

This will fail to compile. The call must be updated to pass the required parameter.

🤖 Prompt for AI Agents
In rs/hang/src/catalog/root.rs around line 168, the call to self.track.consume()
must pass the new required max_buffer: std::time::Duration; update the public
API to accept and forward that Duration: change pub fn consume(&self) ->
CatalogConsumer to pub fn consume(&self, max_buffer: std::time::Duration) ->
CatalogConsumer and call CatalogConsumer::new(self.track.consume(max_buffer));
ensure any callers of Catalog::consume are updated accordingly to provide a
Duration.

}

Expand Down Expand Up @@ -132,18 +132,18 @@ pub struct TrackConsumer {
max_timestamp: Timestamp,

// The maximum buffer size before skipping a group.
latency: std::time::Duration,
max_buffer: std::time::Duration,
}

impl TrackConsumer {
/// Create a new TrackConsumer wrapping the given moq-lite consumer.
pub fn new(inner: moq_lite::TrackConsumer) -> Self {
pub fn new(inner: moq_lite::TrackConsumer, max_buffer: std::time::Duration) -> Self {
Self {
inner,
current: None,
pending: VecDeque::new(),
max_timestamp: Timestamp::default(),
latency: std::time::Duration::ZERO,
max_buffer,
}
}

Expand All @@ -155,7 +155,7 @@ impl TrackConsumer {
///
/// Returns `None` when the track has ended.
pub async fn read_frame(&mut self) -> Result<Option<Frame>, Error> {
let latency = self.latency.try_into()?;
let latency = self.max_buffer.try_into()?;
loop {
let cutoff = self
.max_timestamp
Expand Down Expand Up @@ -209,7 +209,7 @@ impl TrackConsumer {
},
Some((index, timestamp)) = buffering.next() => {
if self.current.is_some() {
tracing::debug!(old = ?self.max_timestamp, new = ?timestamp, buffer = ?self.latency, "skipping slow group");
tracing::debug!(old = ?self.max_timestamp, new = ?timestamp, buffer = ?self.max_buffer, "skipping slow group");
}

drop(buffering);
Expand All @@ -228,9 +228,9 @@ impl TrackConsumer {

/// Set the maximum latency tolerance for this consumer.
///
/// Groups with timestamps older than `max_timestamp - latency` will be skipped.
pub fn set_latency(&mut self, max: std::time::Duration) {
self.latency = max;
/// Groups with timestamps older than `max_timestamp - max_buffer` will be skipped.
pub fn set_max_buffer(&mut self, max: std::time::Duration) {
self.max_buffer = max;
}
Comment on lines 229 to 234
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Update the documentation for consistent terminology.

The method has been renamed to set_max_buffer, but the first line of the documentation still says "maximum latency tolerance". For consistency with the method name and the rest of the codebase, this should be updated to use "buffer" terminology.

Apply this diff to improve consistency:

-	/// Set the maximum latency tolerance for this consumer.
+	/// Set the maximum buffer duration for this consumer.
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
/// Set the maximum latency tolerance for this consumer.
///
/// Groups with timestamps older than `max_timestamp - latency` will be skipped.
pub fn set_latency(&mut self, max: std::time::Duration) {
self.latency = max;
/// Groups with timestamps older than `max_timestamp - max_buffer` will be skipped.
pub fn set_max_buffer(&mut self, max: std::time::Duration) {
self.max_buffer = max;
}
/// Set the maximum buffer duration for this consumer.
///
/// Groups with timestamps older than `max_timestamp - max_buffer` will be skipped.
pub fn set_max_buffer(&mut self, max: std::time::Duration) {
self.max_buffer = max;
}
🤖 Prompt for AI Agents
In rs/hang/src/model/track.rs around lines 229 to 234, update the doc comment to
use "buffer" terminology instead of "maximum latency tolerance": change the
first line to something like "Set the maximum buffer for this consumer." and
adjust the second line to reference `max_timestamp - max_buffer` consistently
(e.g., "Groups with timestamps older than `max_timestamp - max_buffer` will be
skipped.") so the docs match the method name `set_max_buffer` and the rest of
the codebase.


/// Wait until the track is closed.
Expand All @@ -239,12 +239,6 @@ impl TrackConsumer {
}
}

impl From<moq_lite::TrackConsumer> for TrackConsumer {
fn from(inner: moq_lite::TrackConsumer) -> Self {
Self::new(inner)
}
}

impl From<TrackConsumer> for moq_lite::TrackConsumer {
fn from(inner: TrackConsumer) -> Self {
inner.inner
Expand Down
23 changes: 10 additions & 13 deletions rs/libmoq/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,21 @@ set_target_properties(moq PROPERTIES
INTERFACE_INCLUDE_DIRECTORIES ${RUST_TARGET_DIR}/include
)

# Link required system frameworks on macOS
if(APPLE)
set_property(TARGET moq APPEND PROPERTY
INTERFACE_LINK_LIBRARIES "-framework CoreFoundation" "-framework Security"
)
endif()

if(BUILD_RUST_LIB)
add_dependencies(moq rust_build)
endif()

if(PROJECT_IS_TOP_LEVEL)
include(GNUInstallDirs)

# Install header and library files
install(FILES ${RUST_HEADER}
DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}
)
Expand All @@ -66,25 +74,14 @@ if(PROJECT_IS_TOP_LEVEL)
DESTINATION ${CMAKE_INSTALL_LIBDIR}
)

install(TARGETS moq
EXPORT moq-targets
LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR}
ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR}
RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR}
)

install(EXPORT moq-targets
FILE moq-targets.cmake
NAMESPACE moq::
DESTINATION ${CMAKE_INSTALL_LIBDIR}/cmake/moq
)

# Generate and install CMake package config files
include(CMakePackageConfigHelpers)

configure_package_config_file(
${CMAKE_CURRENT_SOURCE_DIR}/cmake/moq-config.cmake.in
${CMAKE_CURRENT_BINARY_DIR}/moq-config.cmake
INSTALL_DESTINATION ${CMAKE_INSTALL_LIBDIR}/cmake/moq
PATH_VARS CMAKE_INSTALL_LIBDIR CMAKE_INSTALL_INCLUDEDIR
)

write_basic_package_version_file(
Expand Down
13 changes: 11 additions & 2 deletions rs/libmoq/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,18 @@ fn main() {
let pc_in = PathBuf::from(&crate_dir).join(format!("{}.pc.in", LIB_NAME));
let pc_out = target_dir.join(format!("{}.pc", LIB_NAME));
if let Ok(template) = fs::read_to_string(&pc_in) {
let target = env::var("TARGET").unwrap();
let libs_private = if target.contains("apple") {
"-framework CoreFoundation -framework Security"
} else if target.contains("windows") {
"-lws2_32 -lbcrypt -luserenv -lntdll"
} else {
"-ldl -lm -lpthread"
};

let content = template
.replace("@PREFIX@", "/usr/local")
.replace("@VERSION@", &version);
.replace("@VERSION@", &version)
.replace("@LIBS_PRIVATE@", libs_private);
fs::write(&pc_out, content).expect("Failed to write pkg-config file");
}
}
Expand Down
3 changes: 2 additions & 1 deletion rs/libmoq/moq.pc.in
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
prefix=@PREFIX@
prefix=${pcfiledir}/../..
exec_prefix=${prefix}
libdir=${exec_prefix}/lib
includedir=${prefix}/include
Expand All @@ -7,4 +7,5 @@ Name: moq
Description: Media over QUIC C Interface
Version: @VERSION@
Libs: -L${libdir} -lmoq
Libs.private: -framework CoreFoundation -framework Security
Cflags: -I${includedir}
Loading
Loading