Skip to content

Commit 73e9986

Browse files
kixelateddavegullo
andauthored
libmoq consume API (#777)
Co-authored-by: Dave Gullo <[email protected]>
1 parent 30c28b8 commit 73e9986

File tree

31 files changed

+1386
-437
lines changed

31 files changed

+1386
-437
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

deno.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rs/hang/examples/video.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ fn create_track(broadcast: &mut moq_lite::BroadcastProducer) -> hang::TrackProdu
7676

7777
// Create a map of video renditions
7878
// Multiple renditions allow the viewer to choose based on their capabilities
79-
let mut renditions = std::collections::HashMap::new();
79+
let mut renditions = std::collections::BTreeMap::new();
8080
renditions.insert(video_track.name.clone(), video_config);
8181

8282
// Create the video catalog entry with the renditions

rs/hang/src/catalog/audio/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ mod codec;
44
pub use aac::*;
55
pub use codec::*;
66

7-
use std::collections::HashMap;
7+
use std::collections::BTreeMap;
88

99
use bytes::Bytes;
1010

@@ -21,7 +21,8 @@ use serde_with::{hex::Hex, DisplayFromStr};
2121
pub struct Audio {
2222
/// A map of track name to rendition configuration.
2323
/// This is not an array so it will work with JSON Merge Patch.
24-
pub renditions: HashMap<String, AudioConfig>,
24+
/// We use a BTreeMap so keys are sorted alphabetically for *some* deterministic behavior.
25+
pub renditions: BTreeMap<String, AudioConfig>,
2526

2627
/// The priority of the audio track, relative to other tracks in the broadcast.
2728
pub priority: u8,

rs/hang/src/catalog/root.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize};
77

88
use crate::catalog::{Audio, AudioConfig, Chat, Track, User, Video, VideoConfig};
99
use crate::Result;
10-
use moq_lite::Produce;
10+
use moq_lite::Pair;
1111

1212
/// A catalog track, created by a broadcaster to describe the tracks available in a broadcast.
1313
#[serde_with::serde_as]
@@ -83,10 +83,10 @@ impl Catalog {
8383
}
8484

8585
/// Produce a catalog track that describes the available media tracks.
86-
pub fn produce(self) -> Produce<CatalogProducer, CatalogConsumer> {
86+
pub fn produce(self) -> Pair<CatalogProducer, CatalogConsumer> {
8787
let track = Catalog::default_track().produce();
8888

89-
Produce {
89+
Pair {
9090
producer: CatalogProducer::new(track.producer, self),
9191
consumer: track.consumer.into(),
9292
}
@@ -267,14 +267,14 @@ impl From<moq_lite::TrackConsumer> for CatalogConsumer {
267267

268268
#[cfg(test)]
269269
mod test {
270+
use std::collections::BTreeMap;
271+
270272
use crate::catalog::{AudioCodec::Opus, AudioConfig, VideoConfig, H264};
271273

272274
use super::*;
273275

274276
#[test]
275277
fn simple() {
276-
use std::collections::HashMap;
277-
278278
let mut encoded = r#"{
279279
"video": {
280280
"renditions": {
@@ -304,7 +304,7 @@ mod test {
304304

305305
encoded.retain(|c| !c.is_whitespace());
306306

307-
let mut video_renditions = HashMap::new();
307+
let mut video_renditions = BTreeMap::new();
308308
video_renditions.insert(
309309
"video".to_string(),
310310
VideoConfig {
@@ -326,7 +326,7 @@ mod test {
326326
},
327327
);
328328

329-
let mut audio_renditions = HashMap::new();
329+
let mut audio_renditions = BTreeMap::new();
330330
audio_renditions.insert(
331331
"audio".to_string(),
332332
AudioConfig {

rs/hang/src/catalog/video/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ pub use h264::*;
1010
pub use h265::*;
1111
pub use vp9::*;
1212

13-
use std::collections::HashMap;
13+
use std::collections::BTreeMap;
1414

1515
use bytes::Bytes;
1616
use serde::{Deserialize, Serialize};
@@ -27,7 +27,8 @@ use serde_with::{hex::Hex, DisplayFromStr};
2727
pub struct Video {
2828
/// A map of track name to rendition configuration.
2929
/// This is not an array in order for it to work with JSON Merge Patch.
30-
pub renditions: HashMap<String, VideoConfig>,
30+
/// We use a BTreeMap so keys are sorted alphabetically for *some* deterministic behavior.
31+
pub renditions: BTreeMap<String, VideoConfig>,
3132

3233
/// The priority of the video track, relative to other tracks in the broadcast.
3334
pub priority: u8,

rs/hang/src/model/broadcast.rs

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,10 @@ use std::{
33
sync::{atomic, Arc},
44
};
55

6-
use crate::catalog::{Catalog, CatalogProducer};
6+
use crate::{
7+
catalog::{Catalog, CatalogConsumer, CatalogProducer},
8+
TrackConsumer,
9+
};
710

811
#[derive(Clone)]
912
pub struct BroadcastProducer {
@@ -32,6 +35,12 @@ impl BroadcastProducer {
3235
}
3336
}
3437

38+
impl Default for BroadcastProducer {
39+
fn default() -> Self {
40+
Self::new(moq_lite::BroadcastProducer::default())
41+
}
42+
}
43+
3544
impl Deref for BroadcastProducer {
3645
type Target = moq_lite::BroadcastProducer;
3746

@@ -58,4 +67,33 @@ impl From<BroadcastProducer> for moq_lite::BroadcastProducer {
5867
}
5968
}
6069

61-
// TODO BroadcastConsumer
70+
#[derive(Clone)]
71+
pub struct BroadcastConsumer {
72+
pub inner: moq_lite::BroadcastConsumer,
73+
pub catalog: CatalogConsumer,
74+
}
75+
76+
impl BroadcastConsumer {
77+
pub fn new(inner: moq_lite::BroadcastConsumer) -> Self {
78+
let catalog = inner.subscribe_track(&Catalog::default_track()).into();
79+
Self { inner, catalog }
80+
}
81+
82+
pub fn subscribe(&self, track: &moq_lite::Track, latency: std::time::Duration) -> TrackConsumer {
83+
TrackConsumer::new(self.inner.subscribe_track(track), latency)
84+
}
85+
}
86+
87+
impl Deref for BroadcastConsumer {
88+
type Target = moq_lite::BroadcastConsumer;
89+
90+
fn deref(&self) -> &Self::Target {
91+
&self.inner
92+
}
93+
}
94+
95+
impl From<moq_lite::BroadcastConsumer> for BroadcastConsumer {
96+
fn from(inner: moq_lite::BroadcastConsumer) -> Self {
97+
Self::new(inner)
98+
}
99+
}

rs/hang/src/model/frame.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
1-
use buf_list::BufList;
21
use derive_more::Debug;
32

43
use crate::Timestamp;
54

5+
pub use buf_list::BufList;
6+
67
/// A media frame with a timestamp and codec-specific payload.
78
///
89
/// Frames are the fundamental unit of media data in hang. Each frame contains:

rs/hang/src/model/track.rs

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,8 @@ impl TrackProducer {
9191
///
9292
/// Multiple consumers can be created from the same producer, each receiving
9393
/// a copy of all data written to the track.
94-
pub fn consume(&self) -> TrackConsumer {
95-
TrackConsumer::new(self.inner.consume())
94+
pub fn consume(&self, max_latency: std::time::Duration) -> TrackConsumer {
95+
TrackConsumer::new(self.inner.consume(), max_latency)
9696
}
9797
}
9898

@@ -132,18 +132,18 @@ pub struct TrackConsumer {
132132
max_timestamp: Timestamp,
133133

134134
// The maximum buffer size before skipping a group.
135-
latency: std::time::Duration,
135+
max_latency: std::time::Duration,
136136
}
137137

138138
impl TrackConsumer {
139139
/// Create a new TrackConsumer wrapping the given moq-lite consumer.
140-
pub fn new(inner: moq_lite::TrackConsumer) -> Self {
140+
pub fn new(inner: moq_lite::TrackConsumer, max_latency: std::time::Duration) -> Self {
141141
Self {
142142
inner,
143143
current: None,
144144
pending: VecDeque::new(),
145145
max_timestamp: Timestamp::default(),
146-
latency: std::time::Duration::ZERO,
146+
max_latency,
147147
}
148148
}
149149

@@ -155,7 +155,7 @@ impl TrackConsumer {
155155
///
156156
/// Returns `None` when the track has ended.
157157
pub async fn read_frame(&mut self) -> Result<Option<Frame>, Error> {
158-
let latency = self.latency.try_into()?;
158+
let latency = self.max_latency.try_into()?;
159159
loop {
160160
let cutoff = self
161161
.max_timestamp
@@ -209,7 +209,7 @@ impl TrackConsumer {
209209
},
210210
Some((index, timestamp)) = buffering.next() => {
211211
if self.current.is_some() {
212-
tracing::debug!(old = ?self.max_timestamp, new = ?timestamp, buffer = ?self.latency, "skipping slow group");
212+
tracing::debug!(old = ?self.max_timestamp, new = ?timestamp, buffer = ?self.max_latency, "skipping slow group");
213213
}
214214

215215
drop(buffering);
@@ -228,9 +228,9 @@ impl TrackConsumer {
228228

229229
/// Set the maximum latency tolerance for this consumer.
230230
///
231-
/// Groups with timestamps older than `max_timestamp - latency` will be skipped.
232-
pub fn set_latency(&mut self, max: std::time::Duration) {
233-
self.latency = max;
231+
/// Groups with timestamps older than `max_timestamp - max_latency` will be skipped.
232+
pub fn set_max_latency(&mut self, max: std::time::Duration) {
233+
self.max_latency = max;
234234
}
235235

236236
/// Wait until the track is closed.
@@ -239,12 +239,6 @@ impl TrackConsumer {
239239
}
240240
}
241241

242-
impl From<moq_lite::TrackConsumer> for TrackConsumer {
243-
fn from(inner: moq_lite::TrackConsumer) -> Self {
244-
Self::new(inner)
245-
}
246-
}
247-
248242
impl From<TrackConsumer> for moq_lite::TrackConsumer {
249243
fn from(inner: TrackConsumer) -> Self {
250244
inner.inner

rs/libmoq/CMakeLists.txt

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,17 @@ endif()
4444
file(MAKE_DIRECTORY ${RUST_DIR})
4545
file(MAKE_DIRECTORY ${RUST_TARGET_DIR}/include)
4646

47-
# Create imported library target
48-
add_library(moq STATIC IMPORTED GLOBAL)
49-
set_target_properties(moq PROPERTIES
50-
IMPORTED_LOCATION ${RUST_LIB}
51-
INTERFACE_INCLUDE_DIRECTORIES ${RUST_TARGET_DIR}/include
52-
)
47+
# Create interface library target that wraps the Rust static library
48+
add_library(moq INTERFACE)
49+
target_include_directories(moq INTERFACE ${RUST_TARGET_DIR}/include)
50+
51+
# Force absolute path to be used in linker command
52+
target_link_options(moq INTERFACE "$<LINK_ONLY:${RUST_LIB}>")
53+
54+
# Link required system frameworks on macOS
55+
if(APPLE)
56+
target_link_options(moq INTERFACE "LINKER:-framework,CoreFoundation" "LINKER:-framework,Security")
57+
endif()
5358

5459
if(BUILD_RUST_LIB)
5560
add_dependencies(moq rust_build)
@@ -58,6 +63,7 @@ endif()
5863
if(PROJECT_IS_TOP_LEVEL)
5964
include(GNUInstallDirs)
6065

66+
# Install header and library files
6167
install(FILES ${RUST_HEADER}
6268
DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}
6369
)
@@ -66,25 +72,14 @@ if(PROJECT_IS_TOP_LEVEL)
6672
DESTINATION ${CMAKE_INSTALL_LIBDIR}
6773
)
6874

69-
install(TARGETS moq
70-
EXPORT moq-targets
71-
LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR}
72-
ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR}
73-
RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR}
74-
)
75-
76-
install(EXPORT moq-targets
77-
FILE moq-targets.cmake
78-
NAMESPACE moq::
79-
DESTINATION ${CMAKE_INSTALL_LIBDIR}/cmake/moq
80-
)
81-
75+
# Generate and install CMake package config files
8276
include(CMakePackageConfigHelpers)
8377

8478
configure_package_config_file(
8579
${CMAKE_CURRENT_SOURCE_DIR}/cmake/moq-config.cmake.in
8680
${CMAKE_CURRENT_BINARY_DIR}/moq-config.cmake
8781
INSTALL_DESTINATION ${CMAKE_INSTALL_LIBDIR}/cmake/moq
82+
PATH_VARS CMAKE_INSTALL_LIBDIR CMAKE_INSTALL_INCLUDEDIR
8883
)
8984

9085
write_basic_package_version_file(

0 commit comments

Comments
 (0)