From 939dc7db4884cc1cf6555a1517aa7d6b5f1bbcd9 Mon Sep 17 00:00:00 2001 From: Roderick van Domburg Date: Sat, 3 Jan 2026 23:09:28 +0100 Subject: [PATCH 1/2] perf: improve mixer and sample rate conversion * Use VecDeque for output buffering and avoid O(n) remove(0) operations. * Replace Mutex+Atomic pending list with a channel and consume via try_recv. * Drain/retain_mut sources in-place to reduce allocations and copies. --- CHANGELOG.md | 3 +- src/conversions/sample_rate.rs | 26 +++++++-------- src/mixer.rs | 59 ++++++++++++++-------------------- 3 files changed, 40 insertions(+), 48 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6cb03c86..1c135826 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -48,8 +48,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `Blue` noise generator uses uniform instead of Gaussian noise for better performance. - `Gaussian` noise generator has standard deviation of 0.6 for perceptual equivalence. - `Velvet` noise generator takes density in Hz as `usize` instead of `f32`. -- Upgrade `cpal` to v0.17. +- Upgraded `cpal` to v0.17. - Clarified `Source::current_span_len()` contract documentation. +- Improved mixer and sample rate conversion performance. ## Version [0.21.1] (2025-07-14) diff --git a/src/conversions/sample_rate.rs b/src/conversions/sample_rate.rs index 0e58eeb4..6f55e821 100644 --- a/src/conversions/sample_rate.rs +++ b/src/conversions/sample_rate.rs @@ -1,6 +1,7 @@ use crate::common::{ChannelCount, SampleRate}; use crate::{math, Sample}; use num_rational::Ratio; +use std::collections::VecDeque; use std::mem; /// Iterator that converts from a certain sample rate to another. @@ -27,7 +28,7 @@ where /// This counter is incremented (modulo `to`) every time the iterator is called. next_output_span_pos_in_chunk: u32, /// The buffer containing the samples waiting to be output. - output_buffer: Vec, + output_buffer: VecDeque, } impl SampleRateConverter @@ -81,7 +82,10 @@ where next_output_span_pos_in_chunk: 0, current_span: first_samples, next_frame: next_samples, - output_buffer: Vec::with_capacity(num_channels.get() as usize - 1), + // Capacity: worst case is upsampling where we buffer multiple frames worth of samples. + output_buffer: VecDeque::with_capacity( + (to as f32 / from as f32).ceil() as usize * num_channels.get() as usize, + ), } } @@ -126,8 +130,8 @@ where } // Short circuit if there are some samples waiting. - if !self.output_buffer.is_empty() { - return Some(self.output_buffer.remove(0)); + if let Some(sample) = self.output_buffer.pop_front() { + return Some(sample); } // The span we are going to return from this function will be a linear interpolation @@ -172,7 +176,7 @@ where if off == 0 { result = Some(sample); } else { - self.output_buffer.push(sample); + self.output_buffer.push_back(sample); } } @@ -183,14 +187,10 @@ where result } else { // draining `self.current_span` - if !self.current_span.is_empty() { - let r = Some(self.current_span.remove(0)); - mem::swap(&mut self.output_buffer, &mut self.current_span); - self.current_span.clear(); - r - } else { - None - } + let mut current_span = self.current_span.drain(..); + let r = current_span.next()?; + self.output_buffer.extend(current_span); + Some(r) } } diff --git a/src/mixer.rs b/src/mixer.rs index 4f3241e8..70b7246c 100644 --- a/src/mixer.rs +++ b/src/mixer.rs @@ -3,10 +3,14 @@ use crate::common::{ChannelCount, SampleRate}; use crate::source::{SeekError, Source, UniformSourceIterator}; use crate::Sample; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use std::time::Duration; +#[cfg(feature = "crossbeam-channel")] +use crossbeam_channel::{unbounded as channel, Receiver, Sender}; +#[cfg(not(feature = "crossbeam-channel"))] +use std::sync::mpsc::{channel, Receiver, Sender}; + /// Builds a new mixer. /// /// You can choose the characteristics of the output thanks to this constructor. All the sounds @@ -19,9 +23,10 @@ use std::time::Duration; /// As a result, input sources added to the mixer later might not be forwarded to the sink. /// Add `Zero` source to prevent detaching the mixer from sink. pub fn mixer(channels: ChannelCount, sample_rate: SampleRate) -> (Mixer, MixerSource) { + let (tx, rx) = channel(); + let input = Mixer(Arc::new(Inner { - has_pending: AtomicBool::new(false), - pending_sources: Mutex::new(Vec::new()), + pending_tx: tx, channels, sample_rate, })); @@ -31,7 +36,7 @@ pub fn mixer(channels: ChannelCount, sample_rate: SampleRate) -> (Mixer, MixerSo input: input.clone(), sample_count: 0, still_pending: vec![], - still_current: vec![], + pending_rx: rx, }; (input, output) @@ -42,8 +47,7 @@ pub fn mixer(channels: ChannelCount, sample_rate: SampleRate) -> (Mixer, MixerSo pub struct Mixer(Arc); struct Inner { - has_pending: AtomicBool, - pending_sources: Mutex>>, + pending_tx: Sender>, channels: ChannelCount, sample_rate: SampleRate, } @@ -57,12 +61,8 @@ impl Mixer { { let uniform_source = UniformSourceIterator::new(source, self.0.channels, self.0.sample_rate); - self.0 - .pending_sources - .lock() - .unwrap() - .push(Box::new(uniform_source) as Box<_>); - self.0.has_pending.store(true, Ordering::SeqCst); // TODO: can we relax this ordering? + // Ignore send errors (channel dropped means MixerSource was dropped) + let _ = self.0.pending_tx.send(Box::new(uniform_source)); } } @@ -80,8 +80,8 @@ pub struct MixerSource { // A temporary vec used in start_pending_sources. still_pending: Vec>, - // A temporary vec used in sum_current_sources. - still_current: Vec>, + // Receiver for pending sources from the channel. + pending_rx: Receiver>, } impl Source for MixerSource { @@ -118,9 +118,7 @@ impl Iterator for MixerSource { #[inline] fn next(&mut self) -> Option { - if self.input.0.has_pending.load(Ordering::SeqCst) { - self.start_pending_sources(); - } + self.start_pending_sources(); self.sample_count += 1; @@ -145,9 +143,7 @@ impl MixerSource { // in-step with the modulo of the samples produced so far. Otherwise, the // sound will play on the wrong channels, e.g. left / right will be reversed. fn start_pending_sources(&mut self) { - let mut pending = self.input.0.pending_sources.lock().unwrap(); // TODO: relax ordering? - - for source in pending.drain(..) { + while let Ok(source) = self.pending_rx.try_recv() { let in_step = self .sample_count .is_multiple_of(source.channels().get() as usize); @@ -158,24 +154,19 @@ impl MixerSource { self.still_pending.push(source); } } - std::mem::swap(&mut self.still_pending, &mut pending); - - let has_pending = !pending.is_empty(); - self.input - .0 - .has_pending - .store(has_pending, Ordering::SeqCst); // TODO: relax ordering? } fn sum_current_sources(&mut self) -> Sample { let mut sum = 0.0; - for mut source in self.current_sources.drain(..) { - if let Some(value) = source.next() { - sum += value; - self.still_current.push(source); + self.current_sources.retain_mut(|source| { + match source.next() { + Some(value) => { + sum += value; + true // Keep this source + } + None => false, // Remove exhausted source } - } - std::mem::swap(&mut self.still_current, &mut self.current_sources); + }); sum } From a9b19a86d684a13848f613dd8a8dcfe35641f78b Mon Sep 17 00:00:00 2001 From: Roderick van Domburg Date: Sat, 3 Jan 2026 23:33:00 +0100 Subject: [PATCH 2/2] perf: use VecDeque for sources queue Replace the Vec used for next_sounds with VecDeque to avoid O(n) shifts when removing from the front. --- CHANGELOG.md | 2 +- src/queue.rs | 19 ++++++++++--------- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1c135826..8f18514e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -50,7 +50,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `Velvet` noise generator takes density in Hz as `usize` instead of `f32`. - Upgraded `cpal` to v0.17. - Clarified `Source::current_span_len()` contract documentation. -- Improved mixer and sample rate conversion performance. +- Improved queue, mixer and sample rate conversion performance. ## Version [0.21.1] (2025-07-14) diff --git a/src/queue.rs b/src/queue.rs index 3bf1695e..182e8f32 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -1,5 +1,6 @@ //! Queue that plays sounds one after the other. +use std::collections::VecDeque; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; use std::time::Duration; @@ -26,7 +27,7 @@ use std::sync::mpsc::{channel, Receiver, Sender}; /// pub fn queue(keep_alive_if_empty: bool) -> (Arc, SourcesQueueOutput) { let input = Arc::new(SourcesQueueInput { - next_sounds: Mutex::new(Vec::new()), + next_sounds: Mutex::new(VecDeque::new()), keep_alive_if_empty: AtomicBool::new(keep_alive_if_empty), }); @@ -48,7 +49,7 @@ type SignalDone = Option>; /// The input of the queue. pub struct SourcesQueueInput { - next_sounds: Mutex>, + next_sounds: Mutex>, // See constructor. keep_alive_if_empty: AtomicBool, @@ -64,7 +65,7 @@ impl SourcesQueueInput { self.next_sounds .lock() .unwrap() - .push((Box::new(source) as Box<_>, None)); + .push_back((Box::new(source) as Box<_>, None)); } /// Adds a new source to the end of the queue. @@ -81,7 +82,7 @@ impl SourcesQueueInput { self.next_sounds .lock() .unwrap() - .push((Box::new(source) as Box<_>, Some(tx))); + .push_back((Box::new(source) as Box<_>, Some(tx))); rx } @@ -175,7 +176,7 @@ impl Source for SourcesQueueOutput { // - After append: the appended source while playing // - With keep_alive: Zero (silence) while playing self.current.channels() - } else if let Some((next, _)) = self.input.next_sounds.lock().unwrap().first() { + } else if let Some((next, _)) = self.input.next_sounds.lock().unwrap().front() { // Current source exhausted, peek at next queued source // This is critical: UniformSourceIterator queries metadata during append, // before any samples are pulled. We must report the next source's metadata. @@ -194,7 +195,7 @@ impl Source for SourcesQueueOutput { if !self.current.is_exhausted() { // Current source is active (producing samples) self.current.sample_rate() - } else if let Some((next, _)) = self.input.next_sounds.lock().unwrap().first() { + } else if let Some((next, _)) = self.input.next_sounds.lock().unwrap().front() { // Current source exhausted, peek at next queued source // This prevents wrong resampling setup in UniformSourceIterator next.sample_rate() @@ -282,7 +283,9 @@ impl SourcesQueueOutput { let (next, signal_after_end) = { let mut next = self.input.next_sounds.lock().unwrap(); - if next.is_empty() { + if let Some(next) = next.pop_front() { + next + } else { let channels = self.current.channels(); let silence = Box::new(Zero::new_samples( channels, @@ -295,8 +298,6 @@ impl SourcesQueueOutput { } else { return Err(()); } - } else { - next.remove(0) } };