Skip to content

Commit b25e627

Browse files
committed
Consolidate Quic
1 parent e754038 commit b25e627

File tree

1 file changed

+87
-170
lines changed

1 file changed

+87
-170
lines changed

src/node/quic/active.rs

+87-170
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,17 @@ use std::convert::TryInto;
99

1010
use chrono::Utc;
1111

12+
use crate::node::Block;
1213
use postcard::*;
1314
use quinn::Connection as QuicConnection;
15+
use std::fmt::Debug;
1416
use std::result::Result;
1517
use tracing::*;
1618

17-
/// Quic implements the Interface trait
18-
// impl Interface for Quic {}
19-
20-
impl<T: Message + 'static> Node<Nonblocking, Quic, Active, T> {
21-
#[tracing::instrument(skip(self))]
22-
pub async fn publish(&self, val: T) -> Result<(), Error> {
19+
impl<T: Message + 'static, B: Block + Debug> Node<B, Quic, Active, T> {
20+
#[tracing::instrument(skip_all)]
21+
#[inline]
22+
async fn publish_internal(&self, val: T) -> Result<(), Error> {
2323
let packet = Msg::new(MsgType::Set, self.topic.clone(), val)
2424
.to_generic()?
2525
.as_bytes()?;
@@ -48,8 +48,9 @@ impl<T: Message + 'static> Node<Nonblocking, Quic, Active, T> {
4848
}
4949
}
5050

51-
#[tracing::instrument(skip(self))]
52-
pub async fn publish_msg(&self, msg: Msg<T>) -> Result<(), Error> {
51+
#[tracing::instrument(skip_all)]
52+
#[inline]
53+
async fn publish_msg_internal(&self, msg: Msg<T>) -> Result<(), Error> {
5354
let packet = msg.to_generic()?.as_bytes()?;
5455

5556
if let Some(connection) = &self.connection {
@@ -76,8 +77,10 @@ impl<T: Message + 'static> Node<Nonblocking, Quic, Active, T> {
7677
}
7778
}
7879

79-
pub async fn request(&self) -> Result<Msg<T>, Error> {
80-
let packet = GenericMsg::get::<T>(self.topic.clone()).as_bytes()?;
80+
#[tracing::instrument(skip_all)]
81+
#[inline]
82+
async fn request_nth_back_internal(&self, n: usize) -> Result<Msg<T>, Error> {
83+
let packet = GenericMsg::get_nth::<T>(self.topic.clone(), n).as_bytes()?;
8184

8285
let mut buf = self.buffer.lock().await;
8386

@@ -105,7 +108,9 @@ impl<T: Message + 'static> Node<Nonblocking, Quic, Active, T> {
105108
}
106109
}
107110

108-
pub async fn topics(&self) -> Result<Msg<Vec<String>>, Error> {
111+
#[tracing::instrument(skip_all)]
112+
#[inline]
113+
async fn topics_internal(&self) -> Result<Msg<Vec<String>>, Error> {
109114
let packet = GenericMsg::topics().as_bytes()?;
110115

111116
let mut buf = self.buffer.lock().await;
@@ -125,186 +130,98 @@ impl<T: Message + 'static> Node<Nonblocking, Quic, Active, T> {
125130
}
126131
}
127132

133+
impl<T: Message + 'static> Node<Nonblocking, Quic, Active, T> {
134+
#[tracing::instrument(skip_all)]
135+
#[inline]
136+
pub async fn publish(&self, val: T) -> Result<(), Error> {
137+
self.publish_internal(val).await?;
138+
Ok(())
139+
}
140+
141+
#[tracing::instrument(skip_all)]
142+
#[inline]
143+
pub async fn publish_msg(&self, msg: Msg<T>) -> Result<(), Error> {
144+
self.publish_msg_internal(msg).await?;
145+
Ok(())
146+
}
147+
148+
#[tracing::instrument(skip_all)]
149+
#[inline]
150+
pub async fn request(&self) -> Result<Msg<T>, Error> {
151+
let msg = self.request_nth_back_internal(0).await?;
152+
Ok(msg)
153+
}
154+
155+
#[tracing::instrument(skip_all)]
156+
#[inline]
157+
pub async fn topics(&self) -> Result<Msg<Vec<String>>, Error> {
158+
let msg = self.topics_internal().await?;
159+
Ok(msg)
160+
}
161+
}
162+
128163
//-----
129164

130165
use crate::node::network_config::Blocking;
131166

132167
impl<T: Message + 'static> Node<Blocking, Quic, Active, T> {
133-
#[tracing::instrument(skip(self))]
168+
#[tracing::instrument(skip_all)]
169+
#[inline]
134170
pub fn publish(&self, val: T) -> Result<(), Error> {
135-
let packet = Msg::new(MsgType::Set, &self.topic, val)
136-
.to_generic()?
137-
.as_bytes()?;
138-
139-
let handle = match &self.rt_handle {
140-
Some(handle) => handle,
141-
None => return Err(Error::HandleAccess),
142-
};
143-
144-
if let Some(connection) = &self.connection {
145-
handle.block_on(async {
146-
match connection.open_bi().await {
147-
Ok((mut send, _recv)) => {
148-
debug!("Node succesfully opened stream from connection");
149-
150-
if let Ok(()) = send.write_all(&packet).await {
151-
if let Ok(()) = send.finish().await {
152-
debug!("Node successfully wrote packet to stream");
153-
}
154-
} else {
155-
error!("Error writing packet to stream");
156-
}
157-
}
158-
Err(e) => {
159-
warn!("{:?}", e);
160-
}
161-
};
162-
171+
match &self.rt_handle {
172+
Some(handle) => handle.block_on(async {
173+
self.publish_internal(val).await?;
163174
Ok(())
164-
})
165-
} else {
166-
Err(Error::Quic(Connection))
175+
}),
176+
None => Err(Error::HandleAccess),
167177
}
168178
}
169179

170-
#[tracing::instrument(skip(self))]
180+
#[tracing::instrument(skip_all)]
181+
#[inline]
171182
pub fn publish_msg(&self, msg: Msg<T>) -> Result<(), Error> {
172-
let packet = msg.to_generic()?.as_bytes()?;
173-
174-
let handle = match &self.rt_handle {
175-
Some(handle) => handle,
176-
None => return Err(Error::HandleAccess),
177-
};
178-
179-
if let Some(connection) = &self.connection {
180-
handle.block_on(async {
181-
match connection.open_bi().await {
182-
Ok((mut send, _recv)) => {
183-
debug!("Node succesfully opened stream from connection");
184-
185-
if let Ok(()) = send.write_all(&packet).await {
186-
if let Ok(()) = send.finish().await {
187-
debug!("Node successfully wrote packet to stream");
188-
}
189-
} else {
190-
error!("Error writing packet to stream");
191-
}
192-
}
193-
Err(e) => {
194-
warn!("{:?}", e);
195-
}
196-
};
197-
183+
match &self.rt_handle {
184+
Some(handle) => handle.block_on(async {
185+
self.publish_msg_internal(msg).await?;
198186
Ok(())
199-
})
200-
} else {
201-
Err(Error::Quic(Connection))
187+
}),
188+
None => Err(Error::HandleAccess),
202189
}
203190
}
204191

192+
#[tracing::instrument(skip_all)]
193+
#[inline]
205194
pub fn request(&self) -> Result<Msg<T>, Error> {
206-
let packet = GenericMsg::get::<T>(self.topic.clone()).as_bytes()?;
207-
208-
let handle = match &self.rt_handle {
209-
Some(handle) => handle,
210-
None => return Err(Error::HandleAccess),
211-
};
212-
213-
handle.block_on(async {
214-
let mut buf = self.buffer.lock().await;
215-
216-
if let Some(connection) = self.connection.clone() {
217-
let (mut send, mut recv) = connection.open_bi().await?;
218-
debug!("Node succesfully opened stream from connection");
219-
send.write_all(&packet).await?;
220-
// send.finish().await.map_err(WriteError)?;
221-
222-
loop {
223-
match recv.read(&mut buf).await? {
224-
Some(0) => continue,
225-
Some(n) => {
226-
let bytes = &buf[..n];
227-
let generic = from_bytes::<GenericMsg>(bytes)?;
228-
let msg = generic.try_into()?;
229-
230-
return Ok(msg);
231-
}
232-
None => continue,
233-
}
234-
}
235-
} else {
236-
Err(Error::Quic(Connection))
237-
}
238-
})
195+
match &self.rt_handle {
196+
Some(handle) => handle.block_on(async {
197+
let msg = self.request_nth_back_internal(0).await?;
198+
Ok(msg)
199+
}),
200+
None => Err(Error::HandleAccess),
201+
}
239202
}
240203

204+
#[tracing::instrument(skip_all)]
205+
#[inline]
241206
pub fn request_nth_back(&self, n: usize) -> Result<Msg<T>, Error> {
242-
let packet = GenericMsg::get_nth::<T>(self.topic.clone(), n).as_bytes()?;
243-
244-
let handle = match &self.rt_handle {
245-
Some(handle) => handle,
246-
None => return Err(Error::HandleAccess),
247-
};
248-
249-
handle.block_on(async {
250-
let mut buf = self.buffer.lock().await;
251-
252-
if let Some(connection) = self.connection.clone() {
253-
let (mut send, mut recv) = connection.open_bi().await?;
254-
debug!("Node succesfully opened stream from connection");
255-
send.write_all(&packet).await?;
256-
// send.finish().await.map_err(WriteError)?;
257-
258-
loop {
259-
match recv.read(&mut buf).await? {
260-
Some(0) => continue,
261-
Some(n) => {
262-
let bytes = &buf[..n];
263-
let generic = from_bytes::<GenericMsg>(bytes)?;
264-
match generic.msg_type {
265-
MsgType::Result(result) => {
266-
if let Err(e) = result {
267-
return Err(e);
268-
}
269-
}
270-
_ => {
271-
let msg = generic.try_into()?;
272-
return Ok(msg);
273-
}
274-
}
275-
}
276-
None => continue,
277-
}
278-
}
279-
} else {
280-
Err(Error::Quic(Connection))
281-
}
282-
})
207+
match &self.rt_handle {
208+
Some(handle) => handle.block_on(async {
209+
let msg = self.request_nth_back_internal(n).await?;
210+
Ok(msg)
211+
}),
212+
None => Err(Error::HandleAccess),
213+
}
283214
}
284215

216+
#[tracing::instrument(skip_all)]
217+
#[inline]
285218
pub fn topics(&self) -> Result<Msg<Vec<String>>, Error> {
286-
let packet = GenericMsg::topics().as_bytes()?;
287-
288-
let handle = match &self.rt_handle {
289-
Some(handle) => handle,
290-
None => return Err(Error::HandleAccess),
291-
};
292-
293-
handle.block_on(async {
294-
let mut buf = self.buffer.lock().await;
295-
296-
let connection = self.connection.clone().ok_or(Connection)?;
297-
298-
let (mut send, mut recv) = connection.open_bi().await?;
299-
debug!("Node succesfully opened stream from connection");
300-
send.write_all(&packet).await?;
301-
send.finish().await?;
302-
303-
let n = recv.read(&mut buf).await?.ok_or(Connection)?;
304-
let bytes = &buf[..n];
305-
let reply = from_bytes::<GenericMsg>(bytes)?;
306-
let topics: Msg<Vec<String>> = reply.try_into()?;
307-
Ok(topics)
308-
})
219+
match &self.rt_handle {
220+
Some(handle) => handle.block_on(async {
221+
let msg = self.topics_internal().await?;
222+
Ok(msg)
223+
}),
224+
None => Err(Error::HandleAccess),
225+
}
309226
}
310227
}

0 commit comments

Comments
 (0)