Skip to content

Commit

Permalink
feat(input): Support HLS streams (#242)
Browse files Browse the repository at this point in the history
This patch adds support for yt-dl streams with the protocol m3u8_native which includes sites like Soundcloud.

Closes: #241
  • Loading branch information
Erk- committed Jul 7, 2024
1 parent 2e68338 commit 8e92c49
Show file tree
Hide file tree
Showing 8 changed files with 187 additions and 18 deletions.
5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ serenity-voice-model = { optional = true, version = "0.2" }
simd-json = { features = ["serde_impl"], optional = true, version = "0.13" }
socket2 = { optional = true, version = "0.5" }
streamcatcher = { optional = true, version = "1" }
stream_lib = { optional = true, version = "0.4.1" }
symphonia = { default_features = false, optional = true, version = "0.5.2" }
symphonia-core = { optional = true, version = "0.5.2" }
tokio = { default-features = false, optional = true, version = "1.0" }
Expand Down Expand Up @@ -83,20 +84,22 @@ driver = [
"dep:async-trait",
"dep:audiopus",
"dep:byteorder",
"dep:bytes",
"dep:crypto_secretbox",
"dep:discortp",
"dep:reqwest",
"dep:flume",
"dep:nohash-hasher",
"dep:once_cell",
"dep:parking_lot",
"dep:rand",
"dep:reqwest",
"dep:ringbuf",
"dep:rubato",
"dep:rusty_pool",
"dep:serde-aux",
"dep:serenity-voice-model",
"dep:socket2",
"dep:stream_lib",
"dep:streamcatcher",
"dep:symphonia",
"dep:symphonia-core",
Expand Down
5 changes: 3 additions & 2 deletions src/driver/connection/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,9 @@ impl fmt::Display for Error {
Self::CryptoModeInvalid => write!(f, "server changed negotiated encryption mode"),
Self::CryptoModeUnavailable => write!(f, "server did not offer chosen encryption mode"),
Self::EndpointUrl => write!(f, "endpoint URL received from gateway was invalid"),
Self::IllegalDiscoveryResponse =>
write!(f, "IP discovery/NAT punching response was invalid"),
Self::IllegalDiscoveryResponse => {
write!(f, "IP discovery/NAT punching response was invalid")
},
Self::IllegalIp => write!(f, "IP discovery/NAT punching response had bad IP value"),
Self::Io(e) => e.fmt(f),
Self::Json(e) => e.fmt(f),
Expand Down
5 changes: 3 additions & 2 deletions src/driver/tasks/mixer/track.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ impl<'a> InternalTrack {
},
Ok(MixerInputResultMessage::Seek(parsed, rec, seek_res)) => {
match seek_res {
Ok(pos) =>
Ok(pos) => {
if let Some(time_base) = parsed.decoder.codec_params().time_base {
// Update track's position to match the actual timestamp the
// seek landed at.
Expand Down Expand Up @@ -282,7 +282,8 @@ impl<'a> InternalTrack {
SymphError::Unsupported("Track had no recorded time base.")
.into(),
))
},
}
},
Err(e) => Err(InputReadyingError::Seeking(e)),
}
},
Expand Down
10 changes: 6 additions & 4 deletions src/driver/test_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,10 +177,12 @@ impl DriverTestHandle {
OutputPacket::Empty => eprintln!("pkt: Nothing"),
OutputPacket::Rtp(p) => eprintln!("pkt: RTP[{}B]", p.len()),
OutputPacket::Raw(OutputMessage::Silent) => eprintln!("pkt: Raw-Silent"),
OutputPacket::Raw(OutputMessage::Passthrough(p)) =>
eprintln!("pkt: Raw-Passthrough[{}B]", p.len()),
OutputPacket::Raw(OutputMessage::Mixed(p)) =>
eprintln!("pkt: Raw-Mixed[{}B]", p.len()),
OutputPacket::Raw(OutputMessage::Passthrough(p)) => {
eprintln!("pkt: Raw-Passthrough[{}B]", p.len())
},
OutputPacket::Raw(OutputMessage::Mixed(p)) => {
eprintln!("pkt: Raw-Mixed[{}B]", p.len())
},
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/input/metadata/ytdl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub struct Output {
pub uploader: Option<String>,
pub url: String,
pub webpage_url: Option<String>,
pub protocol: Option<String>,
}

impl Output {
Expand Down
149 changes: 149 additions & 0 deletions src/input/sources/hls.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
use std::{
io::{ErrorKind as IoErrorKind, Result as IoResult, SeekFrom},
pin::Pin,
task::{Context, Poll},
};

use async_trait::async_trait;
use bytes::Bytes;
use futures::StreamExt;
use pin_project::pin_project;
use reqwest::{header::HeaderMap, Client};
use stream_lib::Event;
use symphonia_core::io::MediaSource;
use tokio::io::{AsyncRead, AsyncSeek, ReadBuf};
use tokio_util::io::StreamReader;

use crate::input::{
AsyncAdapterStream,
AsyncMediaSource,
AudioStream,
AudioStreamError,
Compose,
Input,
};

/// Lazy HLS stream
#[derive(Debug)]
pub struct HlsRequest {
/// HTTP client
client: Client,
/// URL of hls playlist
request: String,
/// Headers of the request
headers: HeaderMap,
}

impl HlsRequest {
#[must_use]
/// Create a lazy HLS request.
pub fn new(client: Client, request: String) -> Self {
Self::new_with_headers(client, request, HeaderMap::default())
}

#[must_use]
/// Create a lazy HTTP request.
pub fn new_with_headers(client: Client, request: String, headers: HeaderMap) -> Self {
HlsRequest {
client,
request,
headers,
}
}

fn create_stream(&mut self) -> Result<HlsStream, AudioStreamError> {
let request = self
.client
.get(&self.request)
.headers(self.headers.clone())
.build()
.map_err(|why| AudioStreamError::Fail(why.into()))?;

let hls = stream_lib::download_hls(self.client.clone(), request, None);

let stream = Box::new(StreamReader::new(hls.map(|ev| match ev {
Event::Bytes { bytes } => Ok(bytes),
Event::End => Ok(Bytes::new()),
Event::Error { error } => Err(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
error,
)),
})));

Ok(HlsStream { stream })
}
}

#[pin_project]
struct HlsStream {
#[pin]
stream: Box<dyn AsyncRead + Send + Sync + Unpin>,
}

impl AsyncRead for HlsStream {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<IoResult<()>> {
AsyncRead::poll_read(self.project().stream, cx, buf)
}
}

impl AsyncSeek for HlsStream {
fn start_seek(self: Pin<&mut Self>, _position: SeekFrom) -> IoResult<()> {
Err(IoErrorKind::Unsupported.into())
}

fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<IoResult<u64>> {
unreachable!()
}
}

#[async_trait]
impl AsyncMediaSource for HlsStream {
fn is_seekable(&self) -> bool {
false
}

async fn byte_len(&self) -> Option<u64> {
None
}

async fn try_resume(
&mut self,
_offset: u64,
) -> Result<Box<dyn AsyncMediaSource>, AudioStreamError> {
Err(AudioStreamError::Unsupported)
}
}

#[async_trait]
impl Compose for HlsRequest {
fn create(&mut self) -> Result<AudioStream<Box<dyn MediaSource>>, AudioStreamError> {
self.create_stream().map(|input| {
let stream = AsyncAdapterStream::new(Box::new(input), 64 * 1024);

AudioStream {
input: Box::new(stream) as Box<dyn MediaSource>,
hint: None,
}
})
}

async fn create_async(
&mut self,
) -> Result<AudioStream<Box<dyn MediaSource>>, AudioStreamError> {
Err(AudioStreamError::Unsupported)
}

fn should_create_async(&self) -> bool {
false
}
}

impl From<HlsRequest> for Input {
fn from(val: HlsRequest) -> Self {
Input::Lazy(Box::new(val))
}
}
3 changes: 2 additions & 1 deletion src/input/sources/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mod file;
mod hls;
mod http;
mod ytdl;

pub use self::{file::*, http::*, ytdl::*};
pub use self::{file::*, hls::*, http::*, ytdl::*};
27 changes: 19 additions & 8 deletions src/input/sources/ytdl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ use std::{error::Error, io::ErrorKind};
use symphonia_core::io::MediaSource;
use tokio::process::Command;

use super::HlsRequest;

const YOUTUBE_DL_COMMAND: &str = "yt-dlp";

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -194,14 +196,23 @@ impl Compose for YoutubeDl {
}));
}

let mut req = HttpRequest {
client: self.client.clone(),
request: result.url,
headers,
content_length: result.filesize,
};

req.create_async().await
#[allow(clippy::single_match_else)]
match result.protocol.as_deref() {
Some("m3u8_native") => {
let mut req =
HlsRequest::new_with_headers(self.client.clone(), result.url, headers);
req.create()
},
_ => {
let mut req = HttpRequest {
client: self.client.clone(),
request: result.url,
headers,
content_length: result.filesize,
};
req.create_async().await
},
}
}

fn should_create_async(&self) -> bool {
Expand Down

0 comments on commit 8e92c49

Please sign in to comment.