Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `Blue` noise generator uses uniform instead of Gaussian noise for better performance.
- `Gaussian` noise generator has standard deviation of 0.6 for perceptual equivalence.
- `Velvet` noise generator takes density in Hz as `usize` instead of `f32`.
- Upgrade `cpal` to v0.17.
- Upgraded `cpal` to v0.17.
- Clarified `Source::current_span_len()` contract documentation.
- Improved queue, mixer and sample rate conversion performance.

## Version [0.21.1] (2025-07-14)

Expand Down
26 changes: 13 additions & 13 deletions src/conversions/sample_rate.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::common::{ChannelCount, SampleRate};
use crate::{math, Sample};
use num_rational::Ratio;
use std::collections::VecDeque;
use std::mem;

/// Iterator that converts from a certain sample rate to another.
Expand All @@ -27,7 +28,7 @@ where
/// This counter is incremented (modulo `to`) every time the iterator is called.
next_output_span_pos_in_chunk: u32,
/// The buffer containing the samples waiting to be output.
output_buffer: Vec<I::Item>,
output_buffer: VecDeque<I::Item>,
}

impl<I> SampleRateConverter<I>
Expand Down Expand Up @@ -81,7 +82,10 @@ where
next_output_span_pos_in_chunk: 0,
current_span: first_samples,
next_frame: next_samples,
output_buffer: Vec::with_capacity(num_channels.get() as usize - 1),
// Capacity: worst case is upsampling where we buffer multiple frames worth of samples.
output_buffer: VecDeque::with_capacity(
(to as f32 / from as f32).ceil() as usize * num_channels.get() as usize,
),
}
}

Expand Down Expand Up @@ -126,8 +130,8 @@ where
}

// Short circuit if there are some samples waiting.
if !self.output_buffer.is_empty() {
return Some(self.output_buffer.remove(0));
if let Some(sample) = self.output_buffer.pop_front() {
return Some(sample);
}

// The span we are going to return from this function will be a linear interpolation
Expand Down Expand Up @@ -172,7 +176,7 @@ where
if off == 0 {
result = Some(sample);
} else {
self.output_buffer.push(sample);
self.output_buffer.push_back(sample);
}
}

Expand All @@ -183,14 +187,10 @@ where
result
} else {
// draining `self.current_span`
if !self.current_span.is_empty() {
let r = Some(self.current_span.remove(0));
mem::swap(&mut self.output_buffer, &mut self.current_span);
self.current_span.clear();
r
} else {
None
}
let mut current_span = self.current_span.drain(..);
let r = current_span.next()?;
self.output_buffer.extend(current_span);
Some(r)
}
}

Expand Down
59 changes: 25 additions & 34 deletions src/mixer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,14 @@
use crate::common::{ChannelCount, SampleRate};
use crate::source::{SeekError, Source, UniformSourceIterator};
use crate::Sample;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use std::time::Duration;

#[cfg(feature = "crossbeam-channel")]
use crossbeam_channel::{unbounded as channel, Receiver, Sender};
#[cfg(not(feature = "crossbeam-channel"))]
use std::sync::mpsc::{channel, Receiver, Sender};

/// Builds a new mixer.
///
/// You can choose the characteristics of the output thanks to this constructor. All the sounds
Expand All @@ -19,9 +23,10 @@ use std::time::Duration;
/// As a result, input sources added to the mixer later might not be forwarded to the sink.
/// Add `Zero` source to prevent detaching the mixer from sink.
pub fn mixer(channels: ChannelCount, sample_rate: SampleRate) -> (Mixer, MixerSource) {
let (tx, rx) = channel();

let input = Mixer(Arc::new(Inner {
has_pending: AtomicBool::new(false),
pending_sources: Mutex::new(Vec::new()),
pending_tx: tx,
channels,
sample_rate,
}));
Expand All @@ -31,7 +36,7 @@ pub fn mixer(channels: ChannelCount, sample_rate: SampleRate) -> (Mixer, MixerSo
input: input.clone(),
sample_count: 0,
still_pending: vec![],
still_current: vec![],
pending_rx: rx,
};

(input, output)
Expand All @@ -42,8 +47,7 @@ pub fn mixer(channels: ChannelCount, sample_rate: SampleRate) -> (Mixer, MixerSo
pub struct Mixer(Arc<Inner>);

struct Inner {
has_pending: AtomicBool,
pending_sources: Mutex<Vec<Box<dyn Source + Send>>>,
pending_tx: Sender<Box<dyn Source + Send>>,
channels: ChannelCount,
sample_rate: SampleRate,
}
Expand All @@ -57,12 +61,8 @@ impl Mixer {
{
let uniform_source =
UniformSourceIterator::new(source, self.0.channels, self.0.sample_rate);
self.0
.pending_sources
.lock()
.unwrap()
.push(Box::new(uniform_source) as Box<_>);
self.0.has_pending.store(true, Ordering::SeqCst); // TODO: can we relax this ordering?
// Ignore send errors (channel dropped means MixerSource was dropped)
let _ = self.0.pending_tx.send(Box::new(uniform_source));
}
}

Expand All @@ -80,8 +80,8 @@ pub struct MixerSource {
// A temporary vec used in start_pending_sources.
still_pending: Vec<Box<dyn Source + Send>>,

// A temporary vec used in sum_current_sources.
still_current: Vec<Box<dyn Source + Send>>,
// Receiver for pending sources from the channel.
pending_rx: Receiver<Box<dyn Source + Send>>,
}

impl Source for MixerSource {
Expand Down Expand Up @@ -118,9 +118,7 @@ impl Iterator for MixerSource {

#[inline]
fn next(&mut self) -> Option<Self::Item> {
if self.input.0.has_pending.load(Ordering::SeqCst) {
self.start_pending_sources();
}
self.start_pending_sources();

self.sample_count += 1;

Expand All @@ -145,9 +143,7 @@ impl MixerSource {
// in-step with the modulo of the samples produced so far. Otherwise, the
// sound will play on the wrong channels, e.g. left / right will be reversed.
fn start_pending_sources(&mut self) {
let mut pending = self.input.0.pending_sources.lock().unwrap(); // TODO: relax ordering?

for source in pending.drain(..) {
while let Ok(source) = self.pending_rx.try_recv() {
let in_step = self
.sample_count
.is_multiple_of(source.channels().get() as usize);
Expand All @@ -158,24 +154,19 @@ impl MixerSource {
self.still_pending.push(source);
}
}
std::mem::swap(&mut self.still_pending, &mut pending);

let has_pending = !pending.is_empty();
self.input
.0
.has_pending
.store(has_pending, Ordering::SeqCst); // TODO: relax ordering?
}

fn sum_current_sources(&mut self) -> Sample {
let mut sum = 0.0;
for mut source in self.current_sources.drain(..) {
if let Some(value) = source.next() {
sum += value;
self.still_current.push(source);
self.current_sources.retain_mut(|source| {
match source.next() {
Some(value) => {
sum += value;
true // Keep this source
}
None => false, // Remove exhausted source
}
}
std::mem::swap(&mut self.still_current, &mut self.current_sources);
});

sum
}
Expand Down
19 changes: 10 additions & 9 deletions src/queue.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Queue that plays sounds one after the other.

use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
Expand All @@ -26,7 +27,7 @@ use std::sync::mpsc::{channel, Receiver, Sender};
///
pub fn queue(keep_alive_if_empty: bool) -> (Arc<SourcesQueueInput>, SourcesQueueOutput) {
let input = Arc::new(SourcesQueueInput {
next_sounds: Mutex::new(Vec::new()),
next_sounds: Mutex::new(VecDeque::new()),
keep_alive_if_empty: AtomicBool::new(keep_alive_if_empty),
});

Expand All @@ -48,7 +49,7 @@ type SignalDone = Option<Sender<()>>;

/// The input of the queue.
pub struct SourcesQueueInput {
next_sounds: Mutex<Vec<(Sound, SignalDone)>>,
next_sounds: Mutex<VecDeque<(Sound, SignalDone)>>,

// See constructor.
keep_alive_if_empty: AtomicBool,
Expand All @@ -64,7 +65,7 @@ impl SourcesQueueInput {
self.next_sounds
.lock()
.unwrap()
.push((Box::new(source) as Box<_>, None));
.push_back((Box::new(source) as Box<_>, None));
}

/// Adds a new source to the end of the queue.
Expand All @@ -81,7 +82,7 @@ impl SourcesQueueInput {
self.next_sounds
.lock()
.unwrap()
.push((Box::new(source) as Box<_>, Some(tx)));
.push_back((Box::new(source) as Box<_>, Some(tx)));
rx
}

Expand Down Expand Up @@ -175,7 +176,7 @@ impl Source for SourcesQueueOutput {
// - After append: the appended source while playing
// - With keep_alive: Zero (silence) while playing
self.current.channels()
} else if let Some((next, _)) = self.input.next_sounds.lock().unwrap().first() {
} else if let Some((next, _)) = self.input.next_sounds.lock().unwrap().front() {
// Current source exhausted, peek at next queued source
// This is critical: UniformSourceIterator queries metadata during append,
// before any samples are pulled. We must report the next source's metadata.
Expand All @@ -194,7 +195,7 @@ impl Source for SourcesQueueOutput {
if !self.current.is_exhausted() {
// Current source is active (producing samples)
self.current.sample_rate()
} else if let Some((next, _)) = self.input.next_sounds.lock().unwrap().first() {
} else if let Some((next, _)) = self.input.next_sounds.lock().unwrap().front() {
// Current source exhausted, peek at next queued source
// This prevents wrong resampling setup in UniformSourceIterator
next.sample_rate()
Expand Down Expand Up @@ -282,7 +283,9 @@ impl SourcesQueueOutput {
let (next, signal_after_end) = {
let mut next = self.input.next_sounds.lock().unwrap();

if next.is_empty() {
if let Some(next) = next.pop_front() {
next
} else {
let channels = self.current.channels();
let silence = Box::new(Zero::new_samples(
channels,
Expand All @@ -295,8 +298,6 @@ impl SourcesQueueOutput {
} else {
return Err(());
}
} else {
next.remove(0)
}
};

Expand Down