Skip to content

Commit 0161345

Browse files
committed
Upgrade to winnow 0.6
1 parent f47da53 commit 0161345

File tree

3 files changed

+168
-11
lines changed

3 files changed

+168
-11
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ anyhow = "1.0"
1818
futures = "0.3"
1919
tokio = { version = "1", features = ["net"] }
2020
tokio-util = { version = "0.7", features = ["codec"] }
21-
winnow = "0.5"
21+
winnow = "0.6"
2222

2323
[dev-dependencies]
2424
tokio = { version = "1", features = ["time", "macros", "rt-multi-thread"] }

src/client.rs

Lines changed: 159 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
1-
use bytes::BytesMut;
1+
use bytes::{Buf, BytesMut};
22
use futures::prelude::*;
33
use futures::sink::SinkExt;
44

55
use tokio::net::TcpStream;
66
use tokio_util::codec::{Decoder, Encoder, Framed};
77
use winnow::error::ErrMode;
8+
use winnow::stream::Offset;
9+
use winnow::Partial;
810

911
pub type ClientTransport = Framed<TcpStream, ClientCodec>;
1012

@@ -75,11 +77,14 @@ impl Decoder for ClientCodec {
7577
type Error = anyhow::Error;
7678

7779
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>> {
78-
let item = match frame::parse_frame(&mut &src[..]) {
80+
let buf = &mut Partial::new(src.chunk());
81+
let item = match frame::parse_frame(buf) {
7982
Ok(frame) => Message::<FromServer>::from_frame(frame),
8083
Err(ErrMode::Incomplete(_)) => return Ok(None),
8184
Err(e) => bail!("Parse failed: {:?}", e),
8285
};
86+
let len = buf.offset_from(&Partial::new(src.chunk()));
87+
src.advance(len);
8388
item.map(Some)
8489
}
8590
}
@@ -96,3 +101,155 @@ impl Encoder<Message<ToServer>> for ClientCodec {
96101
Ok(())
97102
}
98103
}
104+
105+
#[cfg(test)]
106+
mod tests {
107+
use std::time::Duration;
108+
109+
use futures::{future::ok, SinkExt, StreamExt, TryStreamExt};
110+
111+
use crate::{client, FromServer, ToServer};
112+
113+
// Test to send a message
114+
#[tokio::test]
115+
#[ignore]
116+
async fn test_session() {
117+
let mut conn = crate::client::connect(
118+
"localhost:61613",
119+
"/",
120+
//None,None
121+
Some("artemis".to_string()),
122+
Some("artemis".to_string()),
123+
)
124+
.await
125+
.expect("Default connection to localhost");
126+
let msg = crate::Message {
127+
content: ToServer::Send {
128+
destination: "/test/a".to_string(),
129+
transaction: None,
130+
headers: Some(vec![("header-a".to_string(), "value-a".to_string())]),
131+
body: Some("This is a test message".as_bytes().to_vec()),
132+
},
133+
extra_headers: vec![],
134+
};
135+
conn.send(msg).await.expect("Send a");
136+
let msg = crate::Message {
137+
content: ToServer::Send {
138+
destination: "/test/b".to_string(),
139+
transaction: None,
140+
headers: Some(vec![("header-b".to_string(), "value-b".to_string())]),
141+
body: Some("This is a another test message".as_bytes().to_vec()),
142+
},
143+
extra_headers: vec![],
144+
};
145+
conn.send(msg).await.expect("Send b");
146+
}
147+
148+
// Test to recieve a message
149+
#[tokio::test]
150+
#[ignore]
151+
async fn test_subscribe() {
152+
let sub_msg = crate::client::subscribe("/test/a", "tjo");
153+
let mut conn = crate::client::connect(
154+
"localhost:61613",
155+
"/",
156+
//None,None
157+
Some("artemis".to_string()),
158+
Some("artemis".to_string()),
159+
)
160+
.await
161+
.expect("Default connection to localhost");
162+
conn.send(sub_msg).await.expect("Send subscribe");
163+
let (_sink, stream) = conn.split();
164+
165+
let mut cnt = 0;
166+
let _ = stream
167+
.try_for_each(|item| {
168+
println!("==== {cnt}");
169+
cnt += 1;
170+
if let FromServer::Message { body, .. } = item.content {
171+
println!(
172+
"Message received: {:?}",
173+
String::from_utf8_lossy(&body.unwrap())
174+
);
175+
} else {
176+
println!("{:?}", item);
177+
}
178+
ok(())
179+
})
180+
.await;
181+
}
182+
183+
// Test to send and recieve message
184+
#[tokio::test]
185+
#[ignore]
186+
async fn test_send_subscribe() {
187+
let conn = client::connect(
188+
"127.0.0.1:61613",
189+
"/".to_string(),
190+
"artemis".to_string().into(),
191+
"artemis".to_string().into(),
192+
)
193+
.await
194+
.expect("Connect");
195+
196+
tokio::time::sleep(Duration::from_millis(200)).await;
197+
198+
let (mut sink, stream) = conn.split();
199+
200+
let fut1 = async move {
201+
sink.send(client::subscribe("rusty", "myid")).await?;
202+
println!("Subscribe sent");
203+
204+
tokio::time::sleep(Duration::from_millis(200)).await;
205+
206+
sink.send(
207+
ToServer::Send {
208+
destination: "rusty".into(),
209+
transaction: None,
210+
headers: None,
211+
body: Some(b"Hello there rustaceans!".to_vec()),
212+
}
213+
.into(),
214+
)
215+
.await?;
216+
println!("Message sent");
217+
218+
tokio::time::sleep(Duration::from_millis(200)).await;
219+
220+
sink.send(ToServer::Unsubscribe { id: "myid".into() }.into())
221+
.await?;
222+
println!("Unsubscribe sent");
223+
224+
tokio::time::sleep(Duration::from_millis(200)).await;
225+
226+
tokio::time::sleep(Duration::from_secs(1)).await;
227+
sink.send(ToServer::Disconnect { receipt: None }.into())
228+
.await?;
229+
println!("Disconnect sent");
230+
231+
Ok(())
232+
};
233+
234+
// Listen from the main thread. Once the Disconnect message is sent from
235+
// the sender thread, the server will disconnect the client and the future
236+
// will resolve, ending the program
237+
let fut2 = stream.try_for_each(|item| {
238+
if let FromServer::Message { body, .. } = item.content {
239+
println!(
240+
"Message received: {:?}",
241+
String::from_utf8_lossy(&body.unwrap())
242+
);
243+
} else {
244+
// println!("{:?}", item);
245+
}
246+
ok(())
247+
});
248+
249+
futures::future::select(Box::pin(fut1), Box::pin(fut2))
250+
.await
251+
.factor_first()
252+
.0
253+
.expect("Select");
254+
}
255+
}

src/frame.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use winnow::{
55
ascii::{alpha1, line_ending, till_line_ending},
66
combinator::{delimited, opt, repeat, separated_pair, terminated, trace},
77
token::{literal, take, take_till, take_until},
8-
PResult, Parser,
8+
PResult, Parser, Partial,
99
};
1010

1111
use std::borrow::Cow;
@@ -101,7 +101,7 @@ fn is_empty_slice(s: &[u8]) -> Option<&[u8]> {
101101
}
102102
}
103103

104-
pub(crate) fn parse_frame<'a>(input: &mut &'a [u8]) -> PResult<Frame<'a>> {
104+
pub fn parse_frame<'a>(input: &mut Partial<&'a [u8]>) -> PResult<Frame<'a>> {
105105
let (command, headers): (_, Vec<_>) = trace(
106106
"parse_frame",
107107
(
@@ -129,7 +129,7 @@ pub(crate) fn parse_frame<'a>(input: &mut &'a [u8]) -> PResult<Frame<'a>> {
129129
})
130130
}
131131

132-
fn parse_header<'a>(input: &mut &'a [u8]) -> PResult<Header<'a>> {
132+
pub fn parse_header<'a>(input: &mut Partial<&'a [u8]>) -> PResult<Header<'a>> {
133133
trace(
134134
"parse_header",
135135
separated_pair(
@@ -546,7 +546,7 @@ login:user
546546
heart-beat:6,7
547547
passcode:password\n\n\x00"
548548
.to_vec();
549-
let frame = parse_frame(&mut data.as_slice()).unwrap();
549+
let frame = parse_frame(&mut Partial::new(data.as_slice())).unwrap();
550550
let headers_expect: Vec<(&[u8], &[u8])> = vec![
551551
(&b"accept-version"[..], &b"1.2"[..]),
552552
(b"host", b"datafeeds.here.co.uk"),
@@ -569,7 +569,7 @@ accept-version:1.2
569569
host:datafeeds.here.co.uk
570570
login:user
571571
passcode:password\n\n\x00";
572-
let frame = parse_frame(&mut data.as_slice()).unwrap();
572+
let frame = parse_frame(&mut Partial::new(data.as_slice())).unwrap();
573573
eprintln!("Frame: {frame:?}");
574574
let headers_expect: Vec<(&[u8], &[u8])> = vec![
575575
(&b"accept-version"[..], &b"1.2"[..]),
@@ -587,7 +587,7 @@ passcode:password\n\n\x00";
587587
/// https://stomp.github.io/stomp-specification-1.2.html#DISCONNECT
588588
fn parse_and_serialize_client_disconnect() {
589589
let data = b"DISCONNECT\nreceipt:77\n\n\x00";
590-
let frame = parse_frame(&mut data.as_slice()).unwrap();
590+
let frame = parse_frame(&mut Partial::new(data.as_slice())).unwrap();
591591
let headers_expect: Vec<(&[u8], &[u8])> = vec![(b"receipt", b"77")];
592592

593593
assert_eq!(frame.command, b"DISCONNECT");
@@ -605,7 +605,7 @@ passcode:password\n\n\x00";
605605
let body = b"this body contains no nulls \n and \n newlines OK?";
606606
data.extend_from_slice(body);
607607
data.extend_from_slice(b"\x00");
608-
let frame = parse_frame(&mut data.as_slice()).unwrap();
608+
let frame = parse_frame(&mut Partial::new(data.as_slice())).unwrap();
609609
let headers_expect: Vec<(&[u8], &[u8])> = vec![(&b"destination"[..], &b"/queue/a"[..])];
610610
assert_eq!(frame.command, b"SEND");
611611
parse_and_serialize_to_server(&data, frame, headers_expect, Some(body));
@@ -623,7 +623,7 @@ passcode:password\n\n\x00";
623623
let body = "this body contains \x00 nulls \n and \r\n newlines \x00 OK?";
624624
let rest = format!("content-length:{}\n\n{}\x00", body.len(), body);
625625
data.extend_from_slice(rest.as_bytes());
626-
let frame = parse_frame(&mut data.as_slice()).unwrap();
626+
let frame = parse_frame(&mut Partial::new(data.as_slice())).unwrap();
627627
let headers_expect: Vec<(&[u8], &[u8])> = vec![
628628
(&b"destination"[..], &b"/queue/a"[..]),
629629
(b"content-type", b"text/html;charset=utf-8"),

0 commit comments

Comments
 (0)