Skip to content

Commit 6c39b4b

Browse files
committed
reduce boilerplate on response channels
1 parent 25368e6 commit 6c39b4b

File tree

4 files changed

+176
-143
lines changed

4 files changed

+176
-143
lines changed

websockets/chat-actorless/src/command.rs

Lines changed: 0 additions & 32 deletions
This file was deleted.

websockets/chat-actorless/src/handler.rs

Lines changed: 23 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,9 @@ use futures_util::{
55
future::{select, Either},
66
StreamExt as _,
77
};
8-
use tokio::{
9-
pin,
10-
sync::{
11-
mpsc::{self, UnboundedSender},
12-
oneshot,
13-
},
14-
time::interval,
15-
};
8+
use tokio::{pin, sync::mpsc, time::interval};
169

17-
use crate::{Command, ConnId, RoomId};
10+
use crate::{ChatServerHandle, ConnId};
1811

1912
/// How often heartbeat pings are sent
2013
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
@@ -25,27 +18,20 @@ const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);
2518
/// Echo text & binary messages received from the client, respond to ping messages, and monitor
2619
/// connection health to detect network issues and free up resources.
2720
pub async fn chat_ws(
28-
server_tx: UnboundedSender<Command>,
21+
chat_server: ChatServerHandle,
2922
mut session: actix_ws::Session,
3023
mut msg_stream: actix_ws::MessageStream,
3124
) {
3225
log::info!("connected");
3326

3427
let mut name = None;
35-
let mut room = "main".to_owned();
3628
let mut last_heartbeat = Instant::now();
3729
let mut interval = interval(HEARTBEAT_INTERVAL);
3830

3931
let (conn_tx, mut conn_rx) = mpsc::unbounded_channel();
40-
let (res_tx, res_rx) = oneshot::channel();
4132

4233
// unwrap: chat server is not dropped before the HTTP server
43-
server_tx
44-
.send(Command::Connect { conn_tx, res_tx })
45-
.unwrap();
46-
47-
// unwrap: chat server does not drop our response channel
48-
let conn_id = res_rx.await.unwrap();
34+
let conn_id = chat_server.connect(conn_tx).await;
4935

5036
let close_reason = loop {
5137
// most of the futures we process need to be stack-pinned to work with select()
@@ -56,6 +42,7 @@ pub async fn chat_ws(
5642
let msg_rx = conn_rx.recv();
5743
pin!(msg_rx);
5844

45+
// TODO: nested select is pretty gross for readability on the match
5946
let messages = select(msg_stream.next(), msg_rx);
6047
pin!(messages);
6148

@@ -76,15 +63,8 @@ pub async fn chat_ws(
7663
}
7764

7865
Message::Text(text) => {
79-
process_text_msg(
80-
&server_tx,
81-
&mut session,
82-
&text,
83-
conn_id,
84-
&mut room,
85-
&mut name,
86-
)
87-
.await;
66+
process_text_msg(&chat_server, &mut session, &text, conn_id, &mut name)
67+
.await;
8868
}
8969

9070
Message::Binary(_bin) => {
@@ -113,8 +93,10 @@ pub async fn chat_ws(
11393
session.text(chat_msg).await.unwrap();
11494
}
11595

116-
// all connection's msg senders were dropped
117-
Either::Left((Either::Right((None, _)), _)) => unreachable!(),
96+
// all connection's message senders were dropped
97+
Either::Left((Either::Right((None, _)), _)) => unreachable!(
98+
"all connection message senders were dropped; chat server may have panicked"
99+
),
118100

119101
// heartbeat internal tick
120102
Either::Right((_inst, _)) => {
@@ -132,16 +114,17 @@ pub async fn chat_ws(
132114
};
133115
};
134116

117+
chat_server.disconnect(conn_id);
118+
135119
// attempt to close connection gracefully
136120
let _ = session.close(close_reason).await;
137121
}
138122

139123
async fn process_text_msg(
140-
server_tx: &UnboundedSender<Command>,
124+
chat_server: &ChatServerHandle,
141125
session: &mut actix_ws::Session,
142126
text: &str,
143127
conn: ConnId,
144-
room: &mut RoomId,
145128
name: &mut Option<String>,
146129
) {
147130
// strip leading and trailing whitespace (spaces, newlines, etc.)
@@ -154,46 +137,32 @@ async fn process_text_msg(
154137
// unwrap: we have guaranteed non-zero string length already
155138
match cmd_args.next().unwrap() {
156139
"/list" => {
157-
// Send ListRooms message to chat server and wait for
158-
// response
159-
log::info!("List rooms");
140+
log::info!("conn {conn}: listing rooms");
160141

161-
let (res_tx, res_rx) = oneshot::channel();
162-
server_tx.send(Command::List { res_tx }).unwrap();
163-
// unwrap: chat server does not drop our response channel
164-
let rooms = res_rx.await.unwrap();
142+
let rooms = chat_server.list_rooms().await;
165143

166144
for room in rooms {
167145
session.text(room).await.unwrap();
168146
}
169147
}
170148

171149
"/join" => match cmd_args.next() {
172-
Some(room_id) => {
173-
*room = room_id.to_owned();
150+
Some(room) => {
151+
log::info!("conn {conn}: joining room {room}");
174152

175-
let (res_tx, res_rx) = oneshot::channel();
153+
chat_server.join_room(conn, room).await;
176154

177-
server_tx
178-
.send(Command::Join {
179-
conn,
180-
room: room.clone(),
181-
res_tx,
182-
})
183-
.unwrap();
184-
185-
// unwrap: chat server does not drop our response channel
186-
res_rx.await.unwrap();
187-
188-
session.text(format!("joined {room_id}")).await.unwrap();
155+
session.text(format!("joined {room}")).await.unwrap();
189156
}
157+
190158
None => {
191159
session.text("!!! room name is required").await.unwrap();
192160
}
193161
},
194162

195163
"/name" => match cmd_args.next() {
196164
Some(new_name) => {
165+
log::info!("conn {conn}: setting name to: {new_name}");
197166
name.replace(new_name.to_owned());
198167
}
199168
None => {
@@ -215,20 +184,6 @@ async fn process_text_msg(
215184
None => msg.to_owned(),
216185
};
217186

218-
let (res_tx, res_rx) = oneshot::channel();
219-
220-
// send message to chat server
221-
server_tx
222-
.send(Command::Message {
223-
msg,
224-
room: room.clone(),
225-
skip: conn,
226-
res_tx,
227-
})
228-
// unwrap: chat server is not dropped before the HTTP server
229-
.unwrap();
230-
231-
// unwrap: chat server does not drop our response channel
232-
res_rx.await.unwrap();
187+
chat_server.send_message(conn, msg).await
233188
}
234189
}

websockets/chat-actorless/src/main.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,14 @@
55
use actix_files::NamedFile;
66
use actix_web::{middleware, web, App, Error, HttpRequest, HttpResponse, HttpServer, Responder};
77
use tokio::{
8-
sync::mpsc::UnboundedSender,
98
task::{spawn, spawn_local},
109
try_join,
1110
};
1211

13-
mod command;
1412
mod handler;
1513
mod server;
1614

17-
pub use self::command::Command;
18-
pub use self::server::ChatServer;
15+
pub use self::server::{ChatServer, ChatServerHandle};
1916

2017
/// Connection ID.
2118
pub type ConnId = usize;
@@ -34,12 +31,16 @@ async fn index() -> impl Responder {
3431
async fn chat_ws(
3532
req: HttpRequest,
3633
stream: web::Payload,
37-
server_tx: web::Data<UnboundedSender<Command>>,
34+
chat_server: web::Data<ChatServerHandle>,
3835
) -> Result<HttpResponse, Error> {
3936
let (res, session, msg_stream) = actix_ws::handle(&req, stream)?;
4037

4138
// spawn websocket handler (and don't await it) so that the response is returned immediately
42-
spawn_local(handler::chat_ws((**server_tx).clone(), session, msg_stream));
39+
spawn_local(handler::chat_ws(
40+
(**chat_server).clone(),
41+
session,
42+
msg_stream,
43+
));
4344

4445
Ok(res)
4546
}

0 commit comments

Comments
 (0)