Skip to content

Commit d9775a7

Browse files
authored
Fix build by upgrading dependencies (incl hyper 1.0) (#118)
* Fix build by upgrading dependencies (incl hyper 1.0) * Fix the hyper 1.0 stuff * Fix tungstenite-axum compatibility issues * Fix more awful issues in listen.rs * Fix build
1 parent 34189bf commit d9775a7

File tree

11 files changed

+501
-375
lines changed

11 files changed

+501
-375
lines changed

Cargo.lock

+418-310
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+2-2
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,13 @@ keywords = ["ssh", "share", "terminal", "collaborative"]
1414
[workspace.dependencies]
1515
anyhow = "1.0.62"
1616
clap = { version = "4.5.17", features = ["derive", "env"] }
17-
prost = "0.12.6"
17+
prost = "0.13.4"
1818
rand = "0.8.5"
1919
serde = { version = "1.0.188", features = ["derive", "rc"] }
2020
sshx-core = { version = "0.3.1", path = "crates/sshx-core" }
2121
tokio = { version = "1.40.0", features = ["full"] }
2222
tokio-stream = { version = "0.1.14", features = ["sync"] }
23-
tonic = { version = "0.11.0", features = ["tls", "tls-webpki-roots"] }
23+
tonic = { version = "0.12.3", features = ["tls", "tls-webpki-roots"] }
2424
tracing = "0.1.37"
2525
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
2626

crates/sshx-core/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,4 @@ serde.workspace = true
1616
tonic.workspace = true
1717

1818
[build-dependencies]
19-
tonic-build = "0.11.0"
19+
tonic-build = "0.12.3"

crates/sshx-core/build.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,6 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
55
tonic_build::configure()
66
.file_descriptor_set_path(descriptor_path)
77
.bytes(["."])
8-
.compile(&["proto/sshx.proto"], &["proto/"])?;
8+
.compile_protos(&["proto/sshx.proto"], &["proto/"])?;
99
Ok(())
1010
}

crates/sshx-server/Cargo.toml

+9-9
Original file line numberDiff line numberDiff line change
@@ -13,36 +13,36 @@ edition = "2021"
1313
anyhow.workspace = true
1414
async-channel = "1.9.0"
1515
async-stream = "0.3.5"
16-
axum = { version = "0.6.20", features = ["ws"] }
16+
axum = { version = "0.8.1", features = ["http2", "ws"] }
1717
base64 = "0.21.4"
1818
bytes = { version = "1.5.0", features = ["serde"] }
1919
ciborium = "0.2.1"
2020
clap.workspace = true
2121
dashmap = "5.5.3"
22-
deadpool = "0.10.0"
23-
deadpool-redis = "0.13.0"
22+
deadpool = "0.12.2"
23+
deadpool-redis = "0.18.0"
2424
futures-util = { version = "0.3.28", features = ["sink"] }
2525
hmac = "0.12.1"
26-
hyper = { version = "0.14.27", features = ["full"] }
26+
hyper = { version = "1.6.0", features = ["full"] }
2727
parking_lot = "0.12.1"
2828
prost.workspace = true
2929
rand.workspace = true
30-
redis = { version = "0.23.3", features = ["tokio-rustls-comp", "tls-rustls-webpki-roots"] }
30+
redis = { version = "0.27.6", features = ["tokio-rustls-comp", "tls-rustls-webpki-roots"] }
3131
serde.workspace = true
3232
sha2 = "0.10.7"
3333
sshx-core.workspace = true
3434
subtle = "2.5.0"
3535
tokio.workspace = true
3636
tokio-stream.workspace = true
37-
tokio-tungstenite = "0.20.0"
37+
tokio-tungstenite = "0.26.1"
3838
tonic.workspace = true
39-
tonic-reflection = "0.11.0"
39+
tonic-reflection = "0.12.3"
4040
tower = { version = "0.4.13", features = ["steer"] }
41-
tower-http = { version = "0.4.4", features = ["fs", "redirect", "trace"] }
41+
tower-http = { version = "0.6.2", features = ["fs", "redirect", "trace"] }
4242
tracing.workspace = true
4343
tracing-subscriber.workspace = true
4444
zstd = "0.12.4"
4545

4646
[dev-dependencies]
47-
reqwest = { version = "0.11.20", default-features = false, features = ["rustls-tls"] }
47+
reqwest = { version = "0.12.12", default-features = false, features = ["rustls-tls"] }
4848
sshx = { path = "../sshx" }

crates/sshx-server/src/lib.rs

+19-5
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,12 @@
1212
#![forbid(unsafe_code)]
1313
#![warn(missing_docs)]
1414

15-
use std::{net::SocketAddr, sync::Arc};
15+
use std::{fmt::Debug, net::SocketAddr, sync::Arc};
1616

1717
use anyhow::Result;
18-
use hyper::server::conn::AddrIncoming;
18+
use axum::serve::{Listener, ListenerExt};
19+
use tokio::net::TcpListener;
20+
use tracing::debug;
1921
use utils::Shutdown;
2022

2123
use crate::state::ServerState;
@@ -65,7 +67,11 @@ impl Server {
6567
}
6668

6769
/// Run the application server, listening on a stream of connections.
68-
pub async fn listen(&self, incoming: AddrIncoming) -> Result<()> {
70+
pub async fn listen<L>(&self, listener: L) -> Result<()>
71+
where
72+
L: Listener,
73+
L::Addr: Debug,
74+
{
6975
let state = self.state.clone();
7076
let terminated = self.shutdown.wait();
7177
tokio::spawn(async move {
@@ -79,12 +85,20 @@ impl Server {
7985
}
8086
});
8187

82-
listen::start_server(self.state(), incoming, self.shutdown.wait()).await
88+
listen::start_server(self.state(), listener, self.shutdown.wait()).await
8389
}
8490

8591
/// Convenience function to call [`Server::listen`] bound to a TCP address.
92+
///
93+
/// This also sets `TCP_NODELAY` on the incoming connections for performance
94+
/// reasons, as a reasonable default.
8695
pub async fn bind(&self, addr: &SocketAddr) -> Result<()> {
87-
self.listen(AddrIncoming::bind(addr)?).await
96+
let listener = TcpListener::bind(addr).await?.tap_io(|tcp_stream| {
97+
if let Err(err) = tcp_stream.set_nodelay(true) {
98+
debug!("failed to set TCP_NODELAY on incoming connection: {err:#}");
99+
}
100+
});
101+
self.listen(listener).await
88102
}
89103

90104
/// Send a graceful shutdown signal to the server.

crates/sshx-server/src/listen.rs

+24-32
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,12 @@
1-
use std::{error::Error as StdError, future::Future, sync::Arc};
1+
use std::{fmt::Debug, future::Future, sync::Arc};
22

33
use anyhow::Result;
4-
use axum::body::HttpBody;
5-
use hyper::{
6-
header::CONTENT_TYPE,
7-
server::{conn::AddrIncoming, Server as HyperServer},
8-
service::make_service_fn,
9-
Body, Request,
10-
};
4+
use axum::body::Body;
5+
use axum::serve::Listener;
6+
use hyper::{header::CONTENT_TYPE, Request};
117
use sshx_core::proto::{sshx_service_server::SshxServiceServer, FILE_DESCRIPTOR_SET};
12-
use tonic::transport::Server as TonicServer;
13-
use tower::{steer::Steer, ServiceBuilder, ServiceExt};
8+
use tonic::service::Routes as TonicRoutes;
9+
use tower::{make::Shared, steer::Steer, ServiceExt};
1410
use tower_http::trace::TraceLayer;
1511

1612
use crate::{grpc::GrpcServer, web, ServerState};
@@ -19,33 +15,34 @@ use crate::{grpc::GrpcServer, web, ServerState};
1915
///
2016
/// This internal method is responsible for multiplexing the HTTP and gRPC
2117
/// servers onto a single, consolidated `hyper` service.
22-
pub(crate) async fn start_server(
18+
pub(crate) async fn start_server<L>(
2319
state: Arc<ServerState>,
24-
incoming: AddrIncoming,
25-
signal: impl Future<Output = ()>,
26-
) -> Result<()> {
27-
type BoxError = Box<dyn StdError + Send + Sync>;
28-
20+
listener: L,
21+
signal: impl Future<Output = ()> + Send + 'static,
22+
) -> Result<()>
23+
where
24+
L: Listener,
25+
L::Addr: Debug,
26+
{
2927
let http_service = web::app()
3028
.with_state(state.clone())
3129
.layer(TraceLayer::new_for_http())
32-
.map_response(|r| r.map(|b| b.map_err(BoxError::from).boxed_unsync()))
33-
.map_err(BoxError::from)
30+
.into_service()
3431
.boxed_clone();
3532

36-
let grpc_service = TonicServer::builder()
33+
let grpc_service = TonicRoutes::default()
3734
.add_service(SshxServiceServer::new(GrpcServer::new(state)))
3835
.add_service(
3936
tonic_reflection::server::Builder::configure()
4037
.register_encoded_file_descriptor_set(FILE_DESCRIPTOR_SET)
41-
.build()?,
38+
.build_v1()?,
4239
)
43-
.into_service();
44-
45-
let grpc_service = ServiceBuilder::new()
40+
.into_axum_router()
4641
.layer(TraceLayer::new_for_grpc())
47-
.service(grpc_service)
48-
.map_response(|r| r.map(|b| b.map_err(BoxError::from).boxed_unsync()))
42+
.into_service()
43+
// This type conversion is necessary because Tonic 0.12 uses Axum 0.7, so its `axum::Router`
44+
// and `axum::Body` are based on an older `axum_core` version.
45+
.map_response(|r| r.map(Body::new))
4946
.boxed_clone();
5047

5148
let svc = Steer::new(
@@ -58,14 +55,9 @@ pub(crate) async fn start_server(
5855
}
5956
},
6057
);
61-
let make_svc = make_service_fn(move |_| {
62-
let svc = svc.clone();
63-
async { Ok::<_, std::convert::Infallible>(svc) }
64-
});
58+
let make_svc = Shared::new(svc);
6559

66-
HyperServer::builder(incoming)
67-
.tcp_nodelay(true)
68-
.serve(make_svc)
60+
axum::serve(listener, make_svc)
6961
.with_graceful_shutdown(signal)
7062
.await?;
7163

crates/sshx-server/src/state/mesh.rs

+14-5
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
use std::{pin::pin, sync::Arc, time::Duration};
44

55
use anyhow::Result;
6-
use deadpool::managed::Manager;
76
use redis::AsyncCommands;
87
use tokio::time;
98
use tokio_stream::{Stream, StreamExt};
@@ -19,7 +18,7 @@ const STORAGE_EXPIRY: Duration = Duration::from_secs(300);
1918

2019
fn set_opts() -> redis::SetOptions {
2120
redis::SetOptions::default()
22-
.with_expiration(redis::SetExpiry::PX(STORAGE_EXPIRY.as_millis() as usize))
21+
.with_expiration(redis::SetExpiry::PX(STORAGE_EXPIRY.as_millis() as u64))
2322
}
2423

2524
/// Communication with a distributed mesh of sshx server nodes.
@@ -33,6 +32,7 @@ fn set_opts() -> redis::SetOptions {
3332
#[derive(Clone)]
3433
pub struct StorageMesh {
3534
redis: deadpool_redis::Pool,
35+
redis_pubsub: redis::Client,
3636
host: Option<String>,
3737
}
3838

@@ -46,8 +46,18 @@ impl StorageMesh {
4646
.runtime(deadpool_redis::Runtime::Tokio1)
4747
.build()?;
4848

49+
// Separate `redis::Client` just for pub/sub connections.
50+
//
51+
// At time of writing, deadpool-redis has not been updated to support the new
52+
// pub/sub client APIs in Rust. This is a temporary workaround that creates a
53+
// new Redis client on the side, bypassing the connection pool.
54+
//
55+
// Reference: https://github.com/deadpool-rs/deadpool/issues/226
56+
let redis_pubsub = redis::Client::open(redis_url)?;
57+
4958
Ok(Self {
5059
redis,
60+
redis_pubsub,
5161
host: host.map(|s| s.to_string()),
5262
})
5363
}
@@ -161,15 +171,14 @@ impl StorageMesh {
161171

162172
loop {
163173
// Requires an owned, non-pool connection for ownership reasons.
164-
let conn = match self.redis.manager().create().await {
165-
Ok(conn) => conn,
174+
let mut pubsub = match self.redis_pubsub.get_async_pubsub().await {
175+
Ok(pubsub) => pubsub,
166176
Err(err) => {
167177
error!(?err, "failed to connect to redis for pub/sub");
168178
time::sleep(Duration::from_secs(5)).await;
169179
continue;
170180
}
171181
};
172-
let mut pubsub = conn.into_pubsub();
173182
if let Err(err) = pubsub.subscribe(format!("transfers:{host}")).await {
174183
error!(?err, "failed to subscribe to transfers");
175184
time::sleep(Duration::from_secs(1)).await;

crates/sshx-server/src/web.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -30,5 +30,5 @@ pub fn app() -> Router<Arc<ServerState>> {
3030

3131
/// Routes for the backend web API server.
3232
fn backend() -> Router<Arc<ServerState>> {
33-
Router::new().route("/s/:name", get(socket::get_session_ws))
33+
Router::new().route("/s/{name}", get(socket::get_session_ws))
3434
}

crates/sshx-server/src/web/socket.rs

+5-5
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ async fn handle_socket(socket: &mut WebSocket, session: Arc<Session>) -> Result<
7575
async fn send(socket: &mut WebSocket, msg: WsServer) -> Result<()> {
7676
let mut buf = Vec::new();
7777
ciborium::ser::into_writer(&msg, &mut buf)?;
78-
socket.send(Message::Binary(buf)).await?;
78+
socket.send(Message::Binary(Bytes::from(buf))).await?;
7979
Ok(())
8080
}
8181

@@ -265,12 +265,12 @@ async fn proxy_redirect(socket: &mut WebSocket, host: &str, name: &str) -> Resul
265265
tokio::select! {
266266
Some(client_msg) = socket.recv() => {
267267
let msg = match client_msg {
268-
Ok(Message::Text(s)) => Some(TMessage::Text(s)),
268+
Ok(Message::Text(s)) => Some(TMessage::Text(s.as_str().into())),
269269
Ok(Message::Binary(b)) => Some(TMessage::Binary(b)),
270270
Ok(Message::Close(frame)) => {
271271
let frame = frame.map(|frame| TCloseFrame {
272272
code: frame.code.into(),
273-
reason: frame.reason,
273+
reason: frame.reason.as_str().into(),
274274
});
275275
Some(TMessage::Close(frame))
276276
}
@@ -285,12 +285,12 @@ async fn proxy_redirect(socket: &mut WebSocket, host: &str, name: &str) -> Resul
285285
}
286286
Some(server_msg) = upstream.next() => {
287287
let msg = match server_msg {
288-
Ok(TMessage::Text(s)) => Some(Message::Text(s)),
288+
Ok(TMessage::Text(s)) => Some(Message::Text(s.as_str().into())),
289289
Ok(TMessage::Binary(b)) => Some(Message::Binary(b)),
290290
Ok(TMessage::Close(frame)) => {
291291
let frame = frame.map(|frame| CloseFrame {
292292
code: frame.code.into(),
293-
reason: frame.reason,
293+
reason: frame.reason.as_str().into(),
294294
});
295295
Some(Message::Close(frame))
296296
}

crates/sshx-server/tests/common/mod.rs

+7-4
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,9 @@ use std::sync::Arc;
44
use std::time::Duration;
55

66
use anyhow::{ensure, Result};
7+
use axum::serve::ListenerExt;
78
use futures_util::{SinkExt, StreamExt};
8-
use hyper::{server::conn::AddrIncoming, StatusCode};
9+
use hyper::StatusCode;
910
use sshx::encrypt::Encrypt;
1011
use sshx_core::proto::sshx_service_client::SshxServiceClient;
1112
use sshx_core::{Sid, Uid};
@@ -34,12 +35,14 @@ impl TestServer {
3435
let listener = TcpListener::bind("[::1]:0").await.unwrap();
3536
let local_addr = listener.local_addr().unwrap();
3637

37-
let incoming = AddrIncoming::from_listener(listener).unwrap();
3838
let server = Arc::new(Server::new(Default::default()).unwrap());
3939
{
4040
let server = Arc::clone(&server);
41+
let listener = listener.tap_io(|tcp_stream| {
42+
_ = tcp_stream.set_nodelay(true);
43+
});
4144
tokio::spawn(async move {
42-
server.listen(incoming).await.unwrap();
45+
server.listen(listener).await.unwrap();
4346
});
4447
}
4548

@@ -124,7 +127,7 @@ impl ClientSocket {
124127
pub async fn send(&mut self, msg: WsClient) {
125128
let mut buf = Vec::new();
126129
ciborium::ser::into_writer(&msg, &mut buf).unwrap();
127-
self.inner.send(Message::Binary(buf)).await.unwrap();
130+
self.inner.send(Message::Binary(buf.into())).await.unwrap();
128131
}
129132

130133
pub async fn send_input(&mut self, id: Sid, data: &[u8]) {

0 commit comments

Comments
 (0)