Skip to content

Commit 8607a32

Browse files
committed
Add note about transport cueing to design notes, use GenericStore methods in Quic
1 parent ce50a75 commit 8607a32

File tree

2 files changed

+37
-99
lines changed

2 files changed

+37
-99
lines changed

design/notes.md

+2-3
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
11
## Meadow Design Notes
22

3-
- Meadow currently relies on transport-layer
4-
5-
63
Each message is serialized into two types of messages:
74
1. `Msg<T>`, which is a strongly-typed variant that is primarily used in the user-facing APIs.
85
2. `GenericMsg`, which has structural overlap with `Msg<T>`, but carries its data payload as an vector of bytes `Vec<u8>`.
@@ -38,3 +35,5 @@ For all message types, the `Node` operates by:
3835
- Reply: Send the created message
3936

4037
At any point during these operations, a failure can be had, which will be in the form of `meadow::Error` enum. This error type is serializable, and so can be included in `Msg` types. As a result, a failure of any of the `Host`-side actions will result in a `MsgType::Error(e)`-based `GenericMsg` being sent back to the `Node`, which is responsible for propagating this message.
38+
39+
Meadow cues off of transport-layer level guarantees for if the results of Host-side actions should be communicated back to their originating Node. This means that both `Node<Quic>` and `Node<Tcp>` expect that the Host will generate a `MsgType::Result` that acts as an `ACK` on the requested operation, even if the operation does not inherently require a response (i.e. `MsgType::Set` doesn't inherently expect a return value). Conversely, `Node<Udp>` does *not* expect and `ACK`, in keeping with the

src/host/quic.rs

+35-96
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use crate::error::{
22
Error, HostOperation,
33
Quic::{self, *},
44
};
5+
use crate::host::GenericStore;
56
use crate::prelude::*;
67
use futures_util::lock::Mutex;
78
use futures_util::StreamExt;
@@ -72,7 +73,7 @@ pub fn read_certs_from_file(
7273
}
7374
}
7475

75-
pub async fn process_quic(stream: (SendStream, RecvStream), db: sled::Db, buf: &mut [u8]) {
76+
pub async fn process_quic(stream: (SendStream, RecvStream), mut db: sled::Db, buf: &mut [u8]) {
7677
let (mut tx, mut rx) = stream;
7778

7879
if let Ok(Some(n)) = rx.read(buf).await {
@@ -85,95 +86,59 @@ pub async fn process_quic(stream: (SendStream, RecvStream), db: sled::Db, buf: &
8586
}
8687
};
8788
info!("{:?}", &msg);
89+
8890
match msg.msg_type {
8991
MsgType::Result(result) => {
9092
if let Err(e) = result {
9193
error!("Received {}", e);
9294
}
9395
}
9496
MsgType::Set => {
95-
let tree = db
96-
.open_tree(msg.topic.as_bytes())
97-
.expect("Error opening tree");
98-
99-
let db_result = match tree.insert(msg.timestamp.to_string(), bytes) {
100-
Ok(_prev_msg) => crate::error::HostOperation::SUCCESS, //"SUCCESS".to_string(),
101-
Err(_e) => {
102-
error!("{:?}", _e);
103-
crate::error::HostOperation::FAILURE
104-
}
105-
};
106-
107-
if let Ok(bytes) = postcard::to_allocvec(&db_result) {
108-
for _ in 0..10 {
109-
match tx.write(&bytes).await {
110-
Ok(_n) => {
111-
break;
112-
}
113-
Err(e) => {
114-
error!("{}", e);
115-
continue;
116-
}
117-
}
97+
let response = GenericMsg::result(db.insert_generic(msg));
98+
if let Ok(return_bytes) = response.as_bytes() {
99+
if let Err(e) = tx.write(&return_bytes).await {
100+
error!("{}", e);
118101
}
119102
}
120103
}
121104
MsgType::Get => {
122-
let tree = db
123-
.open_tree(msg.topic.as_bytes())
124-
.expect("Error opening tree");
125-
126-
let return_bytes = match tree.last() {
127-
Ok(Some(msg)) => msg.1,
128-
_ => {
129-
let e: String = format!("Error: no topic \"{}\" exists", &msg.topic);
130-
error!("{}", &e);
131-
e.as_bytes().into()
132-
}
105+
let response = match db.get_generic_nth(&msg.topic, 0) {
106+
Ok(g) => g,
107+
Err(e) => GenericMsg::result(Err(e)),
133108
};
134-
135-
match tx.write(&return_bytes).await {
136-
Ok(_n) => {}
137-
Err(e) => {
109+
if let Ok(return_bytes) = response.as_bytes() {
110+
if let Err(e) = tx.write(&return_bytes).await {
138111
error!("{}", e);
139112
}
140113
}
141114
}
142115
MsgType::GetNth(n) => {
143-
let tree = db
144-
.open_tree(msg.topic.as_bytes())
145-
.expect("Error opening tree");
146-
147-
match tree.iter().nth_back(n) {
148-
Some(topic) => {
149-
let return_bytes = match topic {
150-
Ok((_timestamp, bytes)) => bytes,
151-
Err(e) => {
152-
let e: String =
153-
format!("Error: no topic \"{}\" exists", &msg.topic);
154-
error!("{}", &e);
155-
e.as_bytes().into()
156-
}
157-
};
158-
159-
match tx.write(&return_bytes).await {
160-
Ok(_n) => {}
161-
Err(e) => {
162-
error!("{}", e);
163-
}
164-
}
116+
let response = match db.get_generic_nth(&msg.topic, n) {
117+
Ok(g) => g,
118+
Err(e) => GenericMsg::result(Err(e)),
119+
};
120+
if let Ok(return_bytes) = response.as_bytes() {
121+
if let Err(e) = tx.write(&return_bytes).await {
122+
error!("{}", e);
165123
}
166-
None => {
167-
let e: String = format!("Error: no topic \"{}\" exists", &msg.topic);
168-
error!("{}", &e);
169-
170-
match tx.write(&e.as_bytes()).await {
171-
Ok(_n) => {}
172-
Err(e) => {
173-
error!("{}", e);
174-
}
124+
}
125+
}
126+
MsgType::Topics => {
127+
let response = match db.topics() {
128+
Ok(mut topics) => {
129+
topics.sort();
130+
let msg = Msg::new(MsgType::Topics, "", topics);
131+
match msg.to_generic() {
132+
Ok(msg) => msg,
133+
Err(e) => GenericMsg::result(Err(e)),
175134
}
176135
}
136+
Err(e) => GenericMsg::result(Err(e)),
137+
};
138+
if let Ok(return_bytes) = response.as_bytes() {
139+
if let Err(e) = tx.write(&return_bytes).await {
140+
error!("{}", e);
141+
}
177142
}
178143
}
179144
MsgType::Subscribe => {
@@ -203,32 +168,6 @@ pub async fn process_quic(stream: (SendStream, RecvStream), db: sled::Db, buf: &
203168
sleep(rate).await;
204169
}
205170
}
206-
MsgType::Topics => {
207-
let names = db.tree_names();
208-
let mut strings = Vec::new();
209-
for name in names {
210-
if let Ok(name) = std::str::from_utf8(&name[..]) {
211-
strings.push(name.to_string());
212-
}
213-
}
214-
// Remove default sled tree name
215-
let index = strings
216-
.iter()
217-
.position(|x| *x == "__sled__default")
218-
.unwrap();
219-
strings.remove(index);
220-
strings.sort();
221-
if let Ok(data) = to_allocvec(&strings) {
222-
let mut packet = GenericMsg::topics();
223-
packet.set_data(data);
224-
225-
if let Ok(bytes) = to_allocvec(&packet) {
226-
if let Err(e) = tx.write(&bytes).await {
227-
error!("Error sending data back on QUIC/TOPICS: {:?}", e);
228-
}
229-
}
230-
}
231-
}
232171
}
233172
}
234173
}

0 commit comments

Comments
 (0)