-
Notifications
You must be signed in to change notification settings - Fork 440
PipeWire implementation #938
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
… server and test client. Remove rstest use of case directive.
…e control when to release a listener instance.
…ted in cascade (improved tests execution time)
…run them sequentially anymore.
…ing requests/responses in async context. Fix instabilities when running all tests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi and thank you for this pull request. PipeWire is certainly the future and therefore this PR is very valuable.
Sadly, the time that I can dedicate to cpal and my free time OSS endeavours is very limited as of late. So I can't really review this PR in its depth. I glanced over the code and it doesn't look bad.
Would you be interested in me merging this and then the community figures out if there is issues/bugs? If you have the time to spare, I can make you a maintainer.
Hi thanks for your feedback, indeed this a huge PR. I'm still working on it but on my own repository at this moment, I was planning to return to this PR to include only the PipeWire integration into CPAL. I don't mind if you merge it now, that could be cool to have feedbacks. But I was thinking if this Pipewire client implementation could have it own repository under RustAudio organization. What do you think ? Either way, I would be glad to maintain it. |
Good question, if by overlap you mean about merge conflicts that will depend on which PR will be merged first. I need to do some rework before this PR will be merged. If you mean overlap between PulseAudio integration and PipeWire integration. I guess if you have a PipeWire server running on your system, you could use CPAL PulseAudio integration to connect to the PipeWire server through their pipewire-pulse daemon. I don't have any knowledge about how PA is working but to integrate PW to CPAL I needed to create an init phase to retrieve default configuration (sample rate, etc) and default devices configured by the session manager (WirePlumber). That could be interesting to know how your PA implementation behave when connected to pipewire-pulse daemon. If think the two implementations can coexists. If tomorrow I'm going to switch to Archlinux and setup a PA server, I'll be glad to find a PA integration into CPAL. |
New collaborator here and doing backlog grooming, when I came across this very nice and pretty sizable 😉 addition! From what I read this PR was 'bout good to merge but then it wasn't. When we agree that PulseAudio and PipeWire implementations can co-exist, and are both maintained, then what'd be the right next step? In any case it'd be great if you could resolve the conflicts - shouldn't be much - and get the CI to pass. To the point of a code review I am going to trigger a Copilot review in any case, hoping that will address major points, if any. Then based on the maturity of where we are, there's all sorts of directions we could take:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR introduces comprehensive PipeWire support for CPAL, adding a new audio backend to complement existing ALSA and JACK support on Linux. The implementation provides full audio device enumeration, stream management, and configuration capabilities through PipeWire's modern audio architecture.
Key changes include:
- New PipeWire host implementation with device discovery and stream creation
- Sample format conversion utilities between CPAL and PipeWire formats
- Updated examples and documentation to demonstrate PipeWire usage
Reviewed Changes
Copilot reviewed 12 out of 13 changed files in this pull request and generated 10 comments.
Show a summary per file
File | Description |
---|---|
src/platform/mod.rs | Adds PipeWire backend to platform host implementations |
src/host/mod.rs | Conditionally includes PipeWire module for supported platforms |
src/host/pipewire/*.rs | Core PipeWire implementation files (host, device, stream, utils) |
examples/*.rs | Updates beep and synth_tones examples to support PipeWire |
Cargo.toml | Adds PipeWire dependencies and feature flag |
.github/workflows/cpal.yml | Installs PipeWire development libraries in CI |
README.md | Documents PipeWire support and installation requirements |
}) | ||
.filter(move |result| result.is_ok()) | ||
.last(); | ||
sample_format.unwrap() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This code will panic if no supported sample format is found in the alternatives. Consider returning an error instead of unwrap().
sample_format.unwrap() | |
match sample_format { | |
Some(Ok(fmt)) => Ok(fmt), | |
_ => Err(BackendSpecificError { | |
description: "No supported sample format found in alternatives".to_string(), | |
}), | |
} |
Copilot uses AI. Check for mistakes.
fn from(value: (&StreamConfig, SampleFormat)) -> Self; | ||
} | ||
|
||
impl FromStreamConfigWithSampleFormat for AudioStreamInfo { | ||
fn from(value: (&StreamConfig, SampleFormat)) -> Self { | ||
Self { | ||
media_type: MediaType::Audio, | ||
media_subtype: MediaSubtype::Raw, | ||
sample_format: value.1.try_into().unwrap(), | ||
sample_rate: value.0.sample_rate.0, | ||
channels: value.0.channels as u32, | ||
position: AudioChannelPosition::default(), | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This unwrap() could panic if the sample format conversion fails. Consider handling the error properly instead of panicking.
fn from(value: (&StreamConfig, SampleFormat)) -> Self; | |
} | |
impl FromStreamConfigWithSampleFormat for AudioStreamInfo { | |
fn from(value: (&StreamConfig, SampleFormat)) -> Self { | |
Self { | |
media_type: MediaType::Audio, | |
media_subtype: MediaSubtype::Raw, | |
sample_format: value.1.try_into().unwrap(), | |
sample_rate: value.0.sample_rate.0, | |
channels: value.0.channels as u32, | |
position: AudioChannelPosition::default(), | |
} | |
fn from(value: (&StreamConfig, SampleFormat)) -> Result<Self, BackendSpecificError> where Self: Sized; | |
} | |
impl FromStreamConfigWithSampleFormat for AudioStreamInfo { | |
fn from(value: (&StreamConfig, SampleFormat)) -> Result<Self, BackendSpecificError> { | |
Ok(Self { | |
media_type: MediaType::Audio, | |
media_subtype: MediaSubtype::Raw, | |
sample_format: value.1.try_into()?, | |
sample_rate: value.0.sample_rate.0, | |
channels: value.0.channels as u32, | |
position: AudioChannelPosition::default(), | |
}) |
Copilot uses AI. Check for mistakes.
|
||
impl StreamTrait for Stream { | ||
fn play(&self) -> Result<(), PlayStreamError> { | ||
self.client.stream().connect(self.name.clone()).unwrap(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This unwrap() could panic if the stream connection fails. The function returns Result<(), PlayStreamError> but doesn't handle the potential error properly.
self.client.stream().connect(self.name.clone()).unwrap(); | |
self.client.stream().connect(self.name.clone())?; |
Copilot uses AI. Check for mistakes.
} | ||
|
||
fn pause(&self) -> Result<(), PauseStreamError> { | ||
self.client.stream().disconnect(self.name.clone()).unwrap(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This unwrap() could panic if the stream disconnection fails. The function returns Result<(), PauseStreamError> but doesn't handle the potential error properly.
self.client.stream().disconnect(self.name.clone()).unwrap(); | |
self.client.stream().disconnect(self.name.clone())?; |
Copilot uses AI. Check for mistakes.
|
||
impl Drop for Stream { | ||
fn drop(&mut self) { | ||
self.client.stream().delete(self.name.clone()).unwrap() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This unwrap() in Drop implementation could panic during cleanup. Consider logging the error instead of panicking, as panicking in Drop can cause unexpected behavior.
self.client.stream().delete(self.name.clone()).unwrap() | |
if let Err(e) = self.client.stream().delete(self.name.clone()) { | |
eprintln!("Failed to delete stream '{}': {:?}", self.name, e); | |
} |
Copilot uses AI. Check for mistakes.
impl Host { | ||
pub fn new() -> Result<Self, HostUnavailable> { | ||
let timeout = Duration::from_secs(30); | ||
let runtime = Arc::new(Runtime::new().unwrap()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This unwrap() could panic if the Tokio runtime creation fails. Consider handling this error and returning HostUnavailable instead.
let runtime = Arc::new(Runtime::new().unwrap()); | |
let runtime = Arc::new(Runtime::new().map_err(|_| HostUnavailable)?); |
Copilot uses AI. Check for mistakes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Runtime should not be tokio, but agnostic for apps which don't use tokio but other runtimes like async_std or smol.
let mut supported_configs = vec![]; | ||
for &sample_format in self.format.sample_format.alternatives.iter() { | ||
supported_configs.push(SupportedStreamConfigRange { | ||
channels: f.channels, | ||
min_sample_rate: SampleRate(self.format.sample_rate.minimum), | ||
max_sample_rate: SampleRate(self.format.sample_rate.maximum), | ||
buffer_size: f.buffer_size.clone(), | ||
sample_format: sample_format.try_into().unwrap(), | ||
}); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This unwrap() could panic if the sample format conversion fails. Consider handling the error gracefully instead of panicking.
let mut supported_configs = vec![]; | |
for &sample_format in self.format.sample_format.alternatives.iter() { | |
supported_configs.push(SupportedStreamConfigRange { | |
channels: f.channels, | |
min_sample_rate: SampleRate(self.format.sample_rate.minimum), | |
max_sample_rate: SampleRate(self.format.sample_rate.maximum), | |
buffer_size: f.buffer_size.clone(), | |
sample_format: sample_format.try_into().unwrap(), | |
}); | |
} | |
let supported_configs = self.format.sample_format.alternatives.iter().filter_map(|&sample_format| { | |
match sample_format.try_into() { | |
Ok(fmt) => Some(SupportedStreamConfigRange { | |
channels: f.channels, | |
min_sample_rate: SampleRate(self.format.sample_rate.minimum), | |
max_sample_rate: SampleRate(self.format.sample_rate.maximum), | |
buffer_size: f.buffer_size.clone(), | |
sample_format: fmt, | |
}), | |
Err(_) => None, | |
} | |
}).collect(); |
Copilot uses AI. Check for mistakes.
let mut data = data.unwrap(); | ||
data_callback(&mut data) | ||
} | ||
).unwrap(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This unwrap() could panic if stream creation fails. Consider propagating the error as BuildStreamError instead.
).unwrap(); | |
).map_err(|err| BuildStreamError::BackendSpecific { err: BackendSpecificError { description: err.to_string() } })?; |
Copilot uses AI. Check for mistakes.
), | ||
feature = "pipewire" | ||
))] | ||
pub(crate)mod pipewire; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing space between 'pub(crate)' and 'mod'. Should be 'pub(crate) mod pipewire;'.
pub(crate)mod pipewire; | |
pub(crate) mod pipewire; |
Copilot uses AI. Check for mistakes.
@@ -11,6 +11,8 @@ rust-version = "1.70" | |||
|
|||
[features] | |||
asio = ["asio-sys", "num-traits"] # Only available on Windows. See README for setup instructions. | |||
jack = ["dep:jack"] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The jack feature was already defined implicitly. This line appears to be redefining it, which could cause confusion. Verify if this is intentional.
jack = ["dep:jack"] |
Copilot uses AI. Check for mistakes.
Please don't make this depend on tokio. It should be runtime agnostic. Many projects use alternative async runtimes. |
Is there any reason to introduce async at all in CPAL? |
@roderickvd: I'll check the review this week-end. This feature already has a feature flag. I also moved the client implementation code to a separate repository to keep the current PR short. We could merge this PR and keep the PipeWire integration marked as experimental, as you suggested, until the client implementation is mature enough. That way we can collect feedback from users and developers, improve the implementation design, and iterate over time. If there is other suggestions, I'll be happy to hear your thoughts. I'll try to give you some context on why I used Tokio/async. PipeWire is designed as a client/server architecture. As a client, you need to use an event loop to catch events sent from the server, and that event loop runs on a separate thread. The implemented client is fully synchronous, but to consider the client initialized we need to wait for particular data (device nodes and properties, global settings) and for WirePlumber to initialize. This process is not linear or deterministic. The main idea was to use a short-lived thread, through Tokio, to wait for certain conditions during the initialization phase. The intent was only for internal purposes. I agree this was not the best design decision and it needs some refactoring. I hope this answers your question. Please let me know if you have any other questions I can clarify. |
I certainly don’t question your design decisions, except for introducing async which means introducing overhead and code bloat with questionable benifits in the uses I can see for the CPAL library. I have no issues with async when used where needed, I just don't see the need here, but maybe I'm wrong, and there are uses for CPAL in situations where async is beneficial? |
PipeWire implementation based on pipewire/pipewire-rs/doc and others draft PRs in this repository (thanks to them).
There is two additional crates, one for the PW client and one for SPA utils (used for some POD deserialization).
Client capabilities:
I updated some CPAL examples (enumerate and beep).
TODO (just some ideas):
I'm posting this PR as draft for now, if you have any suggestion/feedback or particular use cases, I'm glad to discuss it.
Note: You will need to install PipeWire development lib from your package manager. Having WirePlumber or PipeWire Media Session running. If your PW socket is located somewhere else you can define env vars: