Skip to content

Commit b189c34

Browse files
author
Leonidas Loucas
committed
test: Add first test, and timeout so we dont hang on bad test
1 parent 4d2f7da commit b189c34

File tree

4 files changed

+130
-4
lines changed

4 files changed

+130
-4
lines changed

rust/lib/srpc/client/Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,4 +43,6 @@ path = "examples/rust_client_example2.rs"
4343
required-features = []
4444

4545
[dev-dependencies]
46-
tokio = { version = "1", features = ["full", "macros"] }
46+
rstest = "0.23.0"
47+
test-log = { version = "0.2.16", features = ["trace", "color"] }
48+
tokio = { version = "1", features = ["full", "macros", "test-util"] }

rust/lib/srpc/client/src/lib.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use std::error::Error;
66
use std::fmt;
77
use std::pin::Pin;
88
use std::sync::Arc;
9-
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader};
9+
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, BufReader};
1010
use tokio::net::TcpStream;
1111
use tokio::sync::{mpsc, Mutex};
1212
use tokio::time::{timeout, Duration};
@@ -15,6 +15,8 @@ use tokio_util::codec::{FramedRead, LinesCodec};
1515
use tracing::debug;
1616

1717
mod chunk_limiter;
18+
#[cfg(test)]
19+
mod tests;
1820

1921
// Custom error type
2022
#[derive(Debug)]
@@ -40,13 +42,19 @@ pub struct ClientConfig {
4042
pub struct ReceiveOptions {
4143
channel_buffer_size: usize,
4244
max_chunk_size: usize,
45+
read_next_line_duration: Duration,
4346
}
4447

4548
impl ReceiveOptions {
46-
pub fn new(channel_buffer_size: usize, max_chunk_size: usize) -> Self {
49+
pub fn new(
50+
channel_buffer_size: usize,
51+
max_chunk_size: usize,
52+
read_next_line_duration: Duration,
53+
) -> Self {
4754
ReceiveOptions {
4855
channel_buffer_size,
4956
max_chunk_size,
57+
read_next_line_duration,
5058
}
5159
}
5260
}
@@ -56,6 +64,7 @@ impl Default for ReceiveOptions {
5664
ReceiveOptions {
5765
channel_buffer_size: 100,
5866
max_chunk_size: 16384,
67+
read_next_line_duration: Duration::from_secs(10),
5968
}
6069
}
6170
}
@@ -251,14 +260,15 @@ where
251260
let stream = Arc::clone(&self.stream);
252261
let (tx, rx) = mpsc::channel(opts.channel_buffer_size);
253262
let max_chunk_size = opts.max_chunk_size;
263+
let read_next_line_duration = opts.read_next_line_duration;
254264

255265
tokio::spawn(async move {
256266
let mut guard = stream.lock().await;
257267
let limited_reader = ChunkLimiter::new(&mut *guard, max_chunk_size);
258268
let buf_reader = BufReader::new(limited_reader);
259269
let mut framed = FramedRead::new(buf_reader, LinesCodec::new());
260270

261-
while let Some(line_res) = framed.next().await {
271+
while let Ok(Some(line_res)) = timeout(read_next_line_duration, framed.next()).await {
262272
let line_res = line_res.map_err(|e| Box::new(e) as Box<dyn Error + Send>);
263273

264274
match line_res {

rust/lib/srpc/client/src/tests.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
mod lib;
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
use std::{error::Error, num::NonZeroU8};
2+
3+
use crate::{ClientConfig, ConnectedClient, ReceiveOptions};
4+
5+
use rstest::rstest;
6+
use tokio::{
7+
io::{duplex, AsyncReadExt, AsyncWriteExt, DuplexStream},
8+
sync::mpsc,
9+
};
10+
11+
fn setup_test_client() -> (ConnectedClient<DuplexStream>, DuplexStream) {
12+
let (client_stream, server_stream) = duplex(1024);
13+
14+
let config = ClientConfig::new("example.com", 443, "/", "", "");
15+
(ConnectedClient::new(config, client_stream), server_stream)
16+
}
17+
18+
fn n_message(num: NonZeroU8) -> impl FnMut(&str) -> bool {
19+
let mut seen = 0;
20+
move |_msg: &str| {
21+
if seen + 1 == num.get() {
22+
false
23+
} else {
24+
seen += 1;
25+
true
26+
}
27+
}
28+
}
29+
30+
fn one_message() -> impl FnMut(&str) -> bool {
31+
n_message(NonZeroU8::new(1).unwrap())
32+
}
33+
34+
async fn check_message(
35+
server_message: &str,
36+
rx: &mut mpsc::Receiver<Result<String, Box<dyn Error + Send>>>,
37+
) {
38+
if let Some(Ok(received_msg)) = rx.recv().await {
39+
assert_eq!(received_msg, server_message.trim());
40+
} else {
41+
panic!("Did not receive expected message from server");
42+
}
43+
}
44+
45+
async fn check_server(
46+
client_message: &str,
47+
server_stream: &mut DuplexStream,
48+
) -> Result<(), Box<dyn Error>> {
49+
let mut server_buf = vec![0u8; client_message.len()];
50+
server_stream.read_exact(&mut server_buf).await?;
51+
assert_eq!(&server_buf, client_message.as_bytes());
52+
Ok(())
53+
}
54+
55+
#[test_log::test(rstest)]
56+
#[tokio::test(start_paused = true)]
57+
async fn test_connected_client_send_and_receive() -> Result<(), Box<dyn Error>> {
58+
let (connected_client, mut server_stream) = setup_test_client();
59+
60+
let client_message = "Hello from client\n";
61+
connected_client.send_message(client_message).await?;
62+
63+
check_server(client_message, &mut server_stream).await?;
64+
65+
let server_message = "Hello from server\n";
66+
server_stream.write_all(server_message.as_bytes()).await?;
67+
68+
let should_continue = one_message();
69+
70+
let opts = ReceiveOptions::default();
71+
let mut rx = connected_client
72+
.receive_message(false, should_continue, &opts)
73+
.await?;
74+
75+
check_message(server_message, &mut rx).await;
76+
77+
Ok(())
78+
}
79+
80+
#[test_log::test(rstest)]
81+
#[tokio::test(start_paused = true)]
82+
async fn test_connected_client_send_and_receive_stream() -> Result<(), Box<dyn Error>> {
83+
let (connected_client, mut server_stream) = setup_test_client();
84+
85+
let client_message = "Hello from client\n";
86+
connected_client.send_message(client_message).await?;
87+
88+
check_server(client_message, &mut server_stream).await?;
89+
90+
server_stream.write_all("\n".as_bytes()).await?;
91+
92+
let should_continue = one_message();
93+
94+
let opts = ReceiveOptions::default();
95+
let mut rx = connected_client
96+
.receive_message(true, should_continue, &opts)
97+
.await?;
98+
99+
check_message("", &mut rx).await;
100+
101+
server_stream.write_all("first\n".as_bytes()).await?;
102+
103+
server_stream.write_all("second\n".as_bytes()).await?;
104+
105+
let should_continue = n_message(NonZeroU8::new(2).unwrap());
106+
let mut rx = connected_client
107+
.receive_message(false, should_continue, &opts)
108+
.await?;
109+
110+
check_message("first", &mut rx).await;
111+
check_message("second", &mut rx).await;
112+
Ok(())
113+
}

0 commit comments

Comments
 (0)