forked from moq-dev/moq
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathconnection.rs
More file actions
64 lines (56 loc) · 2.12 KB
/
connection.rs
File metadata and controls
64 lines (56 loc) · 2.12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
use crate::{Auth, Cluster};
use moq_native::Request;
pub struct Connection {
pub id: u64,
pub request: Request,
pub cluster: Cluster,
pub auth: Auth,
}
impl Connection {
#[tracing::instrument("conn", skip_all, fields(id = self.id))]
pub async fn run(self) -> anyhow::Result<()> {
let (path, token) = match &self.request {
Request::WebTransport(request) => {
// Extract the path and token from the URL.
let path = request.url().path();
let token = request
.url()
.query_pairs()
.find(|(k, _)| k == "jwt")
.map(|(_, v)| v.to_string());
(path, token)
}
Request::Quic(_conn) => ("", None),
};
// Verify the URL before accepting the connection.
let token = match self.auth.verify(path, token.as_deref()) {
Ok(token) => token,
Err(err) => {
let _ = self.request.close(err.clone().into()).await;
return Err(err.into());
}
};
let publish = self.cluster.publisher(&token);
let subscribe = self.cluster.subscriber(&token);
match (&publish, &subscribe) {
(Some(publish), Some(subscribe)) => {
tracing::info!(root = %token.root, publish = %publish.allowed().map(|p| p.as_str()).collect::<Vec<_>>().join(","), subscribe = %subscribe.allowed().map(|p| p.as_str()).collect::<Vec<_>>().join(","), "session accepted");
}
(Some(publish), None) => {
tracing::info!(root = %token.root, publish = %publish.allowed().map(|p| p.as_str()).collect::<Vec<_>>().join(","), "publisher accepted");
}
(None, Some(subscribe)) => {
tracing::info!(root = %token.root, subscribe = %subscribe.allowed().map(|p| p.as_str()).collect::<Vec<_>>().join(","), "subscriber accepted")
}
_ => anyhow::bail!("invalid session; no allowed paths"),
}
// Accept the connection.
let session = self.request.ok().await?;
// NOTE: subscribe and publish seem backwards because of how relays work.
// We publish the tracks the client is allowed to subscribe to.
// We subscribe to the tracks the client is allowed to publish.
let session = moq_lite::Session::accept(session, subscribe, publish).await?;
// Wait until the session is closed.
Err(session.closed().await.into())
}
}