Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix build by upgrading dependencies (incl hyper 1.0) #118

Merged
merged 5 commits into from
Feb 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
728 changes: 418 additions & 310 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ keywords = ["ssh", "share", "terminal", "collaborative"]
[workspace.dependencies]
anyhow = "1.0.62"
clap = { version = "4.5.17", features = ["derive", "env"] }
prost = "0.12.6"
prost = "0.13.4"
rand = "0.8.5"
serde = { version = "1.0.188", features = ["derive", "rc"] }
sshx-core = { version = "0.3.1", path = "crates/sshx-core" }
tokio = { version = "1.40.0", features = ["full"] }
tokio-stream = { version = "0.1.14", features = ["sync"] }
tonic = { version = "0.11.0", features = ["tls", "tls-webpki-roots"] }
tonic = { version = "0.12.3", features = ["tls", "tls-webpki-roots"] }
tracing = "0.1.37"
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }

Expand Down
2 changes: 1 addition & 1 deletion crates/sshx-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@ serde.workspace = true
tonic.workspace = true

[build-dependencies]
tonic-build = "0.11.0"
tonic-build = "0.12.3"
2 changes: 1 addition & 1 deletion crates/sshx-core/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::configure()
.file_descriptor_set_path(descriptor_path)
.bytes(["."])
.compile(&["proto/sshx.proto"], &["proto/"])?;
.compile_protos(&["proto/sshx.proto"], &["proto/"])?;
Ok(())
}
18 changes: 9 additions & 9 deletions crates/sshx-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,36 +13,36 @@ edition = "2021"
anyhow.workspace = true
async-channel = "1.9.0"
async-stream = "0.3.5"
axum = { version = "0.6.20", features = ["ws"] }
axum = { version = "0.8.1", features = ["http2", "ws"] }
base64 = "0.21.4"
bytes = { version = "1.5.0", features = ["serde"] }
ciborium = "0.2.1"
clap.workspace = true
dashmap = "5.5.3"
deadpool = "0.10.0"
deadpool-redis = "0.13.0"
deadpool = "0.12.2"
deadpool-redis = "0.18.0"
futures-util = { version = "0.3.28", features = ["sink"] }
hmac = "0.12.1"
hyper = { version = "0.14.27", features = ["full"] }
hyper = { version = "1.6.0", features = ["full"] }
parking_lot = "0.12.1"
prost.workspace = true
rand.workspace = true
redis = { version = "0.23.3", features = ["tokio-rustls-comp", "tls-rustls-webpki-roots"] }
redis = { version = "0.27.6", features = ["tokio-rustls-comp", "tls-rustls-webpki-roots"] }
serde.workspace = true
sha2 = "0.10.7"
sshx-core.workspace = true
subtle = "2.5.0"
tokio.workspace = true
tokio-stream.workspace = true
tokio-tungstenite = "0.20.0"
tokio-tungstenite = "0.26.1"
tonic.workspace = true
tonic-reflection = "0.11.0"
tonic-reflection = "0.12.3"
tower = { version = "0.4.13", features = ["steer"] }
tower-http = { version = "0.4.4", features = ["fs", "redirect", "trace"] }
tower-http = { version = "0.6.2", features = ["fs", "redirect", "trace"] }
tracing.workspace = true
tracing-subscriber.workspace = true
zstd = "0.12.4"

[dev-dependencies]
reqwest = { version = "0.11.20", default-features = false, features = ["rustls-tls"] }
reqwest = { version = "0.12.12", default-features = false, features = ["rustls-tls"] }
sshx = { path = "../sshx" }
24 changes: 19 additions & 5 deletions crates/sshx-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@
#![forbid(unsafe_code)]
#![warn(missing_docs)]

use std::{net::SocketAddr, sync::Arc};
use std::{fmt::Debug, net::SocketAddr, sync::Arc};

use anyhow::Result;
use hyper::server::conn::AddrIncoming;
use axum::serve::{Listener, ListenerExt};
use tokio::net::TcpListener;
use tracing::debug;
use utils::Shutdown;

use crate::state::ServerState;
Expand Down Expand Up @@ -65,7 +67,11 @@ impl Server {
}

/// Run the application server, listening on a stream of connections.
pub async fn listen(&self, incoming: AddrIncoming) -> Result<()> {
pub async fn listen<L>(&self, listener: L) -> Result<()>
where
L: Listener,
L::Addr: Debug,
{
let state = self.state.clone();
let terminated = self.shutdown.wait();
tokio::spawn(async move {
Expand All @@ -79,12 +85,20 @@ impl Server {
}
});

listen::start_server(self.state(), incoming, self.shutdown.wait()).await
listen::start_server(self.state(), listener, self.shutdown.wait()).await
}

/// Convenience function to call [`Server::listen`] bound to a TCP address.
///
/// This also sets `TCP_NODELAY` on the incoming connections for performance
/// reasons, as a reasonable default.
pub async fn bind(&self, addr: &SocketAddr) -> Result<()> {
self.listen(AddrIncoming::bind(addr)?).await
let listener = TcpListener::bind(addr).await?.tap_io(|tcp_stream| {
if let Err(err) = tcp_stream.set_nodelay(true) {
debug!("failed to set TCP_NODELAY on incoming connection: {err:#}");
}
});
self.listen(listener).await
}

/// Send a graceful shutdown signal to the server.
Expand Down
56 changes: 24 additions & 32 deletions crates/sshx-server/src/listen.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
use std::{error::Error as StdError, future::Future, sync::Arc};
use std::{fmt::Debug, future::Future, sync::Arc};

use anyhow::Result;
use axum::body::HttpBody;
use hyper::{
header::CONTENT_TYPE,
server::{conn::AddrIncoming, Server as HyperServer},
service::make_service_fn,
Body, Request,
};
use axum::body::Body;
use axum::serve::Listener;
use hyper::{header::CONTENT_TYPE, Request};
use sshx_core::proto::{sshx_service_server::SshxServiceServer, FILE_DESCRIPTOR_SET};
use tonic::transport::Server as TonicServer;
use tower::{steer::Steer, ServiceBuilder, ServiceExt};
use tonic::service::Routes as TonicRoutes;
use tower::{make::Shared, steer::Steer, ServiceExt};
use tower_http::trace::TraceLayer;

use crate::{grpc::GrpcServer, web, ServerState};
Expand All @@ -19,33 +15,34 @@ use crate::{grpc::GrpcServer, web, ServerState};
///
/// This internal method is responsible for multiplexing the HTTP and gRPC
/// servers onto a single, consolidated `hyper` service.
pub(crate) async fn start_server(
pub(crate) async fn start_server<L>(
state: Arc<ServerState>,
incoming: AddrIncoming,
signal: impl Future<Output = ()>,
) -> Result<()> {
type BoxError = Box<dyn StdError + Send + Sync>;

listener: L,
signal: impl Future<Output = ()> + Send + 'static,
) -> Result<()>
where
L: Listener,
L::Addr: Debug,
{
let http_service = web::app()
.with_state(state.clone())
.layer(TraceLayer::new_for_http())
.map_response(|r| r.map(|b| b.map_err(BoxError::from).boxed_unsync()))
.map_err(BoxError::from)
.into_service()
.boxed_clone();

let grpc_service = TonicServer::builder()
let grpc_service = TonicRoutes::default()
.add_service(SshxServiceServer::new(GrpcServer::new(state)))
.add_service(
tonic_reflection::server::Builder::configure()
.register_encoded_file_descriptor_set(FILE_DESCRIPTOR_SET)
.build()?,
.build_v1()?,
)
.into_service();

let grpc_service = ServiceBuilder::new()
.into_axum_router()
.layer(TraceLayer::new_for_grpc())
.service(grpc_service)
.map_response(|r| r.map(|b| b.map_err(BoxError::from).boxed_unsync()))
.into_service()
// This type conversion is necessary because Tonic 0.12 uses Axum 0.7, so its `axum::Router`
// and `axum::Body` are based on an older `axum_core` version.
.map_response(|r| r.map(Body::new))
.boxed_clone();

let svc = Steer::new(
Expand All @@ -58,14 +55,9 @@ pub(crate) async fn start_server(
}
},
);
let make_svc = make_service_fn(move |_| {
let svc = svc.clone();
async { Ok::<_, std::convert::Infallible>(svc) }
});
let make_svc = Shared::new(svc);

HyperServer::builder(incoming)
.tcp_nodelay(true)
.serve(make_svc)
axum::serve(listener, make_svc)
.with_graceful_shutdown(signal)
.await?;

Expand Down
19 changes: 14 additions & 5 deletions crates/sshx-server/src/state/mesh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
use std::{pin::pin, sync::Arc, time::Duration};

use anyhow::Result;
use deadpool::managed::Manager;
use redis::AsyncCommands;
use tokio::time;
use tokio_stream::{Stream, StreamExt};
Expand All @@ -19,7 +18,7 @@ const STORAGE_EXPIRY: Duration = Duration::from_secs(300);

fn set_opts() -> redis::SetOptions {
redis::SetOptions::default()
.with_expiration(redis::SetExpiry::PX(STORAGE_EXPIRY.as_millis() as usize))
.with_expiration(redis::SetExpiry::PX(STORAGE_EXPIRY.as_millis() as u64))
}

/// Communication with a distributed mesh of sshx server nodes.
Expand All @@ -33,6 +32,7 @@ fn set_opts() -> redis::SetOptions {
#[derive(Clone)]
pub struct StorageMesh {
redis: deadpool_redis::Pool,
redis_pubsub: redis::Client,
host: Option<String>,
}

Expand All @@ -46,8 +46,18 @@ impl StorageMesh {
.runtime(deadpool_redis::Runtime::Tokio1)
.build()?;

// Separate `redis::Client` just for pub/sub connections.
//
// At time of writing, deadpool-redis has not been updated to support the new
// pub/sub client APIs in Rust. This is a temporary workaround that creates a
// new Redis client on the side, bypassing the connection pool.
//
// Reference: https://github.com/deadpool-rs/deadpool/issues/226
let redis_pubsub = redis::Client::open(redis_url)?;

Ok(Self {
redis,
redis_pubsub,
host: host.map(|s| s.to_string()),
})
}
Expand Down Expand Up @@ -161,15 +171,14 @@ impl StorageMesh {

loop {
// Requires an owned, non-pool connection for ownership reasons.
let conn = match self.redis.manager().create().await {
Ok(conn) => conn,
let mut pubsub = match self.redis_pubsub.get_async_pubsub().await {
Ok(pubsub) => pubsub,
Err(err) => {
error!(?err, "failed to connect to redis for pub/sub");
time::sleep(Duration::from_secs(5)).await;
continue;
}
};
let mut pubsub = conn.into_pubsub();
if let Err(err) = pubsub.subscribe(format!("transfers:{host}")).await {
error!(?err, "failed to subscribe to transfers");
time::sleep(Duration::from_secs(1)).await;
Expand Down
2 changes: 1 addition & 1 deletion crates/sshx-server/src/web.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,5 @@ pub fn app() -> Router<Arc<ServerState>> {

/// Routes for the backend web API server.
fn backend() -> Router<Arc<ServerState>> {
Router::new().route("/s/:name", get(socket::get_session_ws))
Router::new().route("/s/{name}", get(socket::get_session_ws))
}
10 changes: 5 additions & 5 deletions crates/sshx-server/src/web/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ async fn handle_socket(socket: &mut WebSocket, session: Arc<Session>) -> Result<
async fn send(socket: &mut WebSocket, msg: WsServer) -> Result<()> {
let mut buf = Vec::new();
ciborium::ser::into_writer(&msg, &mut buf)?;
socket.send(Message::Binary(buf)).await?;
socket.send(Message::Binary(Bytes::from(buf))).await?;
Ok(())
}

Expand Down Expand Up @@ -265,12 +265,12 @@ async fn proxy_redirect(socket: &mut WebSocket, host: &str, name: &str) -> Resul
tokio::select! {
Some(client_msg) = socket.recv() => {
let msg = match client_msg {
Ok(Message::Text(s)) => Some(TMessage::Text(s)),
Ok(Message::Text(s)) => Some(TMessage::Text(s.as_str().into())),
Ok(Message::Binary(b)) => Some(TMessage::Binary(b)),
Ok(Message::Close(frame)) => {
let frame = frame.map(|frame| TCloseFrame {
code: frame.code.into(),
reason: frame.reason,
reason: frame.reason.as_str().into(),
});
Some(TMessage::Close(frame))
}
Expand All @@ -285,12 +285,12 @@ async fn proxy_redirect(socket: &mut WebSocket, host: &str, name: &str) -> Resul
}
Some(server_msg) = upstream.next() => {
let msg = match server_msg {
Ok(TMessage::Text(s)) => Some(Message::Text(s)),
Ok(TMessage::Text(s)) => Some(Message::Text(s.as_str().into())),
Ok(TMessage::Binary(b)) => Some(Message::Binary(b)),
Ok(TMessage::Close(frame)) => {
let frame = frame.map(|frame| CloseFrame {
code: frame.code.into(),
reason: frame.reason,
reason: frame.reason.as_str().into(),
});
Some(Message::Close(frame))
}
Expand Down
11 changes: 7 additions & 4 deletions crates/sshx-server/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ use std::sync::Arc;
use std::time::Duration;

use anyhow::{ensure, Result};
use axum::serve::ListenerExt;
use futures_util::{SinkExt, StreamExt};
use hyper::{server::conn::AddrIncoming, StatusCode};
use hyper::StatusCode;
use sshx::encrypt::Encrypt;
use sshx_core::proto::sshx_service_client::SshxServiceClient;
use sshx_core::{Sid, Uid};
Expand Down Expand Up @@ -34,12 +35,14 @@ impl TestServer {
let listener = TcpListener::bind("[::1]:0").await.unwrap();
let local_addr = listener.local_addr().unwrap();

let incoming = AddrIncoming::from_listener(listener).unwrap();
let server = Arc::new(Server::new(Default::default()).unwrap());
{
let server = Arc::clone(&server);
let listener = listener.tap_io(|tcp_stream| {
_ = tcp_stream.set_nodelay(true);
});
tokio::spawn(async move {
server.listen(incoming).await.unwrap();
server.listen(listener).await.unwrap();
});
}

Expand Down Expand Up @@ -124,7 +127,7 @@ impl ClientSocket {
pub async fn send(&mut self, msg: WsClient) {
let mut buf = Vec::new();
ciborium::ser::into_writer(&msg, &mut buf).unwrap();
self.inner.send(Message::Binary(buf)).await.unwrap();
self.inner.send(Message::Binary(buf.into())).await.unwrap();
}

pub async fn send_input(&mut self, id: Sid, data: &[u8]) {
Expand Down