Skip to content

Commit 53e0055

Browse files
authored
Implement server recovery and peer routing (#15)
* Add snapshot support and Redis client * Add simple snapshotting test * Server discovery, peer routing, and recovery * Add immediate persistence and inactive expiry * Decrease shell data snapshot to 32 KiB * Enable Redis TLS (a couple duplicate dependencies) * Edit background sync to respect sync_now()
1 parent 85e3d07 commit 53e0055

30 files changed

+1029
-275
lines changed

Cargo.lock

+165-103
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ rand = "0.8.5"
1010
serde = { version = "1.0.188", features = ["derive", "rc"] }
1111
tokio = { version = "1.32.0", features = ["full"] }
1212
tokio-stream = { version = "0.1.14", features = ["sync"] }
13-
tonic = { version = "0.10.0", features = ["tls", "tls-roots"] }
13+
tonic = { version = "0.10.0", features = ["tls", "tls-webpki-roots"] }
1414
tracing = "0.1.37"
1515
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
1616

Dockerfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,4 @@ FROM alpine:latest
1616
WORKDIR /root
1717
COPY --from=frontend /usr/src/app/build build
1818
COPY --from=backend /home/rust/src/target/release/sshx-server .
19-
CMD ["./sshx-server", "--host"]
19+
CMD ["./sshx-server", "--listen", "::"]

README.md

+8-2
Original file line numberDiff line numberDiff line change
@@ -68,13 +68,19 @@ This will compile the `sshx` binary and place it in your `~/.cargo/bin` folder.
6868

6969
### Workflow
7070

71+
First, start service containers for development.
72+
73+
```shell
74+
docker compose up -d
75+
```
76+
7177
Install [Rust 1.70+](https://www.rust-lang.org/),
7278
[Node v18](https://nodejs.org/), [NPM v9](https://www.npmjs.com/), and
7379
[mprocs](https://github.com/pvolok/mprocs). Then, run
7480

7581
```shell
76-
$ npm install
77-
$ mprocs
82+
npm install
83+
mprocs
7884
```
7985

8086
This will compile and start the server, an instance of the client, and the web

compose.yaml

+12
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
# Services used by sshx for development. These listen on ports 126XX, to reduce the chance that they
2+
# conflict with other processes.
3+
#
4+
# You can start them with `docker compose up -d`.
5+
6+
services:
7+
redis:
8+
image: bitnami/redis:7.2
9+
environment:
10+
- ALLOW_EMPTY_PASSWORD=yes
11+
ports:
12+
- 127.0.0.1:12601:6379

crates/sshx-core/build.rs

+1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
44
let descriptor_path = PathBuf::from(env::var("OUT_DIR").unwrap()).join("sshx.bin");
55
tonic_build::configure()
66
.file_descriptor_set_path(descriptor_path)
7+
.bytes(["."])
78
.compile(&["proto/sshx.proto"], &["proto/"])?;
89
Ok(())
910
}

crates/sshx-core/proto/sshx.proto

+28-10
Original file line numberDiff line numberDiff line change
@@ -64,22 +64,22 @@ message NewShell {
6464
// Bidirectional streaming update from the client.
6565
message ClientUpdate {
6666
oneof client_message {
67-
string hello = 1; // First stream message: "name,token".
68-
TerminalData data = 2; // Stream data from the terminal.
67+
string hello = 1; // First stream message: "name,token".
68+
TerminalData data = 2; // Stream data from the terminal.
6969
NewShell created_shell = 3; // Acknowledge that a new shell was created.
70-
uint32 closed_shell = 4; // Acknowledge that a shell was closed.
70+
uint32 closed_shell = 4; // Acknowledge that a shell was closed.
7171
string error = 15;
7272
}
7373
}
7474

7575
// Bidirectional streaming update from the server.
7676
message ServerUpdate {
7777
oneof server_message {
78-
TerminalInput input = 1; // Remote input bytes, received from the user.
79-
NewShell create_shell = 2; // ID of a new shell.
80-
uint32 close_shell = 3; // ID of a shell to close.
81-
SequenceNumbers sync = 4; // Periodic sequence number sync.
82-
TerminalSize resize = 5; // Resize a terminal window.
78+
TerminalInput input = 1; // Remote input bytes, received from the user.
79+
NewShell create_shell = 2; // ID of a new shell.
80+
uint32 close_shell = 3; // ID of a shell to close.
81+
SequenceNumbers sync = 4; // Periodic sequence number sync.
82+
TerminalSize resize = 5; // Resize a terminal window.
8383
string error = 15;
8484
}
8585
}
@@ -91,6 +91,24 @@ message CloseRequest {
9191
}
9292

9393
// Server response to closing a session.
94-
message CloseResponse {
95-
bool exists = 1; // True if the session was found and closed.
94+
message CloseResponse {}
95+
96+
// Snapshot of a session, used to restore state for persistence across servers.
97+
message SerializedSession {
98+
bytes encrypted_zeros = 1;
99+
map<uint32, SerializedShell> shells = 2;
100+
uint32 next_sid = 3;
101+
uint32 next_uid = 4;
102+
}
103+
104+
message SerializedShell {
105+
uint64 seqnum = 1;
106+
repeated bytes data = 2;
107+
uint64 chunk_offset = 3;
108+
uint64 byte_offset = 4;
109+
bool closed = 5;
110+
int32 winsize_x = 6;
111+
int32 winsize_y = 7;
112+
uint32 winsize_rows = 8;
113+
uint32 winsize_cols = 9;
96114
}

crates/sshx-core/src/lib.rs

+14
Original file line numberDiff line numberDiff line change
@@ -76,4 +76,18 @@ impl IdCounter {
7676
pub fn next_uid(&self) -> Uid {
7777
Uid(self.next_uid.fetch_add(1, Ordering::Relaxed))
7878
}
79+
80+
/// Return the current internal values of the counter.
81+
pub fn get_current_values(&self) -> (Sid, Uid) {
82+
(
83+
Sid(self.next_sid.load(Ordering::Relaxed)),
84+
Uid(self.next_uid.load(Ordering::Relaxed)),
85+
)
86+
}
87+
88+
/// Set the internal values of the counter.
89+
pub fn set_current_values(&self, sid: Sid, uid: Uid) {
90+
self.next_sid.store(sid.0, Ordering::Relaxed);
91+
self.next_uid.store(uid.0, Ordering::Relaxed);
92+
}
7993
}

crates/sshx-server/Cargo.toml

+8-3
Original file line numberDiff line numberDiff line change
@@ -13,24 +13,29 @@ bytes = { version = "1.5.0", features = ["serde"] }
1313
ciborium = "0.2.1"
1414
clap.workspace = true
1515
dashmap = "5.5.3"
16+
deadpool = "0.10.0"
17+
deadpool-redis = "0.13.0"
18+
futures-util = { version = "0.3.28", features = ["sink"] }
1619
hmac = "0.12.1"
1720
hyper = { version = "0.14.27", features = ["full"] }
1821
parking_lot = "0.12.1"
22+
prost.workspace = true
1923
rand.workspace = true
24+
redis = { version = "0.23.3", features = ["tokio-rustls-comp", "tls-rustls-webpki-roots"] }
2025
serde.workspace = true
2126
sha2 = "0.10.7"
2227
sshx-core = { path = "../sshx-core" }
2328
tokio.workspace = true
2429
tokio-stream.workspace = true
30+
tokio-tungstenite = "0.20.0"
2531
tonic.workspace = true
2632
tonic-reflection = "0.10.0"
2733
tower = { version = "0.4.13", features = ["steer"] }
2834
tower-http = { version = "0.4.4", features = ["fs", "redirect", "trace"] }
2935
tracing.workspace = true
3036
tracing-subscriber.workspace = true
37+
zstd = "0.12.4"
3138

3239
[dev-dependencies]
33-
futures-util = { version = "0.3.28", features = ["sink"] }
34-
reqwest = "0.11.20"
40+
reqwest = { version = "0.11.20", default-features = false, features = ["rustls-tls"] }
3541
sshx = { path = "../sshx" }
36-
tokio-tungstenite = "0.20.0"

crates/sshx-server/src/grpc.rs

+24-28
Original file line numberDiff line numberDiff line change
@@ -40,28 +40,23 @@ impl SshxService for GrpcServer {
4040
type ChannelStream = ReceiverStream<Result<ServerUpdate, Status>>;
4141

4242
async fn open(&self, request: Request<OpenRequest>) -> RR<OpenResponse> {
43-
use dashmap::mapref::entry::Entry::*;
44-
4543
let request = request.into_inner();
46-
let origin = match &self.0.override_origin {
47-
Some(origin) => origin.clone(),
48-
None => request.origin,
49-
};
44+
let origin = self.0.override_origin().unwrap_or(request.origin);
5045
if origin.is_empty() {
5146
return Err(Status::invalid_argument("origin is empty"));
5247
}
5348
let name = rand_alphanumeric(10);
5449
info!(%name, "creating new session");
55-
match self.0.store.entry(name.clone()) {
56-
Occupied(_) => return Err(Status::already_exists("generated duplicate ID")),
57-
Vacant(v) => {
50+
match self.0.lookup(&name) {
51+
Some(_) => return Err(Status::already_exists("generated duplicate ID")),
52+
None => {
5853
let metadata = Metadata {
59-
encrypted_zeros: request.encrypted_zeros.into(),
54+
encrypted_zeros: request.encrypted_zeros,
6055
};
61-
v.insert(Session::new(metadata).into());
56+
self.0.insert(&name, Arc::new(Session::new(metadata)));
6257
}
6358
};
64-
let token = self.0.mac.clone().chain_update(&name).finalize();
59+
let token = self.0.mac().chain_update(&name).finalize();
6560
let url = format!("{origin}/s/{name}");
6661
Ok(Response::new(OpenResponse {
6762
name,
@@ -81,14 +76,18 @@ impl SshxService for GrpcServer {
8176
let (name, token) = hello
8277
.split_once(',')
8378
.ok_or_else(|| Status::invalid_argument("missing name and token"))?;
84-
validate_token(&self.0.mac, name, token)?;
79+
validate_token(self.0.mac(), name, token)?;
8580
name.to_string()
8681
}
8782
_ => return Err(Status::invalid_argument("invalid first message")),
8883
};
89-
let session = match self.0.store.get(&session_name) {
90-
Some(session) => Arc::clone(&session),
91-
None => return Err(Status::not_found("session not found")),
84+
let session = match self.0.backend_connect(&session_name).await {
85+
Ok(Some(session)) => session,
86+
Ok(None) => return Err(Status::not_found("session not found")),
87+
Err(err) => {
88+
error!(?err, "failed to connect to backend session");
89+
return Err(Status::internal(err.to_string()));
90+
}
9291
};
9392

9493
// We now spawn an asynchronous task that sends updates to the client. Note that
@@ -106,22 +105,19 @@ impl SshxService for GrpcServer {
106105

107106
async fn close(&self, request: Request<CloseRequest>) -> RR<CloseResponse> {
108107
let request = request.into_inner();
109-
validate_token(&self.0.mac, &request.name, &request.token)?;
110-
let exists = match self.0.store.remove(&request.name) {
111-
Some((_, session)) => {
112-
session.shutdown();
113-
true
114-
}
115-
None => false,
116-
};
117-
Ok(Response::new(CloseResponse { exists }))
108+
validate_token(self.0.mac(), &request.name, &request.token)?;
109+
if let Err(err) = self.0.close_session(&request.name).await {
110+
error!(?err, "failed to close session");
111+
return Err(Status::internal(err.to_string()));
112+
}
113+
Ok(Response::new(CloseResponse {}))
118114
}
119115
}
120116

121117
/// Validate the client token for a session.
122-
fn validate_token(mac: &(impl Mac + Clone), name: &str, token: &str) -> Result<(), Status> {
118+
fn validate_token(mac: impl Mac, name: &str, token: &str) -> Result<(), Status> {
123119
if let Ok(token) = BASE64_STANDARD.decode(token) {
124-
if mac.clone().chain_update(name).verify_slice(&token).is_ok() {
120+
if mac.chain_update(name).verify_slice(&token).is_ok() {
125121
return Ok(());
126122
}
127123
}
@@ -182,7 +178,7 @@ async fn handle_update(tx: &ServerTx, session: &Session, update: ClientUpdate) -
182178
return send_err(tx, "unexpected hello".into()).await;
183179
}
184180
Some(ClientMessage::Data(data)) => {
185-
if let Err(err) = session.add_data(Sid(data.id), data.data.into(), data.seq) {
181+
if let Err(err) = session.add_data(Sid(data.id), data.data, data.seq) {
186182
return send_err(tx, format!("add data: {:?}", err)).await;
187183
}
188184
}

crates/sshx-server/src/lib.rs

+26-40
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,15 @@
1515
use std::{net::SocketAddr, sync::Arc};
1616

1717
use anyhow::Result;
18-
use dashmap::DashMap;
19-
use hmac::{Hmac, Mac as _};
2018
use hyper::server::conn::AddrIncoming;
21-
use session::Session;
22-
use sha2::Sha256;
23-
use sshx_core::rand_alphanumeric;
2419
use utils::Shutdown;
2520

21+
use crate::state::ServerState;
22+
2623
pub mod grpc;
2724
mod listen;
2825
pub mod session;
26+
pub mod state;
2927
pub mod utils;
3028
pub mod web;
3129

@@ -38,30 +36,12 @@ pub struct ServerOptions {
3836

3937
/// Override the origin returned for the Open() RPC.
4038
pub override_origin: Option<String>,
41-
}
42-
43-
/// Shared state object for global server logic.
44-
pub struct ServerState {
45-
/// Message authentication code for signing tokens.
46-
pub mac: Hmac<Sha256>,
47-
48-
/// Override the origin returned for the Open() RPC.
49-
pub override_origin: Option<String>,
5039

51-
/// A concurrent map of session IDs to session objects.
52-
pub store: DashMap<String, Arc<Session>>,
53-
}
40+
/// URL of the Redis server that stores session data.
41+
pub redis_url: Option<String>,
5442

55-
impl ServerState {
56-
/// Create an empty server state using the given secret.
57-
pub fn new(options: ServerOptions) -> Self {
58-
let secret = options.secret.unwrap_or_else(|| rand_alphanumeric(22));
59-
Self {
60-
mac: Hmac::new_from_slice(secret.as_bytes()).unwrap(),
61-
override_origin: options.override_origin,
62-
store: DashMap::new(),
63-
}
64-
}
43+
/// Hostname of this server, if running multiple servers.
44+
pub host: Option<String>,
6545
}
6646

6747
/// Stateful object that manages the sshx server, with graceful termination.
@@ -72,26 +52,34 @@ pub struct Server {
7252

7353
impl Server {
7454
/// Create a new application server, but do not listen for connections yet.
75-
pub fn new(options: ServerOptions) -> Self {
76-
Self {
77-
state: Arc::new(ServerState::new(options)),
55+
pub fn new(options: ServerOptions) -> Result<Self> {
56+
Ok(Self {
57+
state: Arc::new(ServerState::new(options)?),
7858
shutdown: Shutdown::new(),
79-
}
59+
})
8060
}
8161

8262
/// Returns the server's state object.
8363
pub fn state(&self) -> Arc<ServerState> {
8464
Arc::clone(&self.state)
8565
}
8666

87-
/// Returns a future that resolves when the server is terminated.
88-
async fn terminated(&self) {
89-
self.shutdown.wait().await
90-
}
91-
9267
/// Run the application server, listening on a stream of connections.
9368
pub async fn listen(&self, incoming: AddrIncoming) -> Result<()> {
94-
listen::start_server(self.state(), incoming, self.terminated()).await
69+
let state = self.state.clone();
70+
let terminated = self.shutdown.wait();
71+
tokio::spawn(async move {
72+
let background_tasks = futures_util::future::join(
73+
state.listen_for_transfers(),
74+
state.close_old_sessions(),
75+
);
76+
tokio::select! {
77+
_ = terminated => {}
78+
_ = background_tasks => {}
79+
}
80+
});
81+
82+
listen::start_server(self.state(), incoming, self.shutdown.wait()).await
9583
}
9684

9785
/// Convenience function to call [`Server::listen`] bound to a TCP address.
@@ -104,8 +92,6 @@ impl Server {
10492
// Stop receiving new network connections.
10593
self.shutdown.shutdown();
10694
// Terminate each of the existing sessions.
107-
for entry in &self.state.store {
108-
entry.value().shutdown();
109-
}
95+
self.state.shutdown();
11096
}
11197
}

0 commit comments

Comments
 (0)