Skip to content
This repository was archived by the owner on Oct 31, 2024. It is now read-only.

Commit e81d338

Browse files
committed
feat: switch tce-lib action to spawn tasks
Signed-off-by: Simon Paitrault <simon.paitrault@gmail.com>
1 parent 5b6ddb8 commit e81d338

File tree

3 files changed

+121
-304
lines changed

3 files changed

+121
-304
lines changed

crates/topos-tce/src/app_context/api.rs

Lines changed: 72 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use crate::AppContext;
22
use std::collections::HashMap;
3+
use tokio::spawn;
34
use topos_core::uci::{Certificate, SubnetId};
45
use topos_metrics::CERTIFICATE_DELIVERY_LATENCY;
56
use topos_tce_api::RuntimeError;
@@ -20,79 +21,82 @@ impl AppContext {
2021
self.delivery_latency
2122
.insert(certificate.id, CERTIFICATE_DELIVERY_LATENCY.start_timer());
2223

23-
_ = match self
24-
.validator_store
25-
.insert_pending_certificate(&certificate)
26-
.await
27-
{
28-
Ok(Some(pending_id)) => {
29-
let certificate_id = certificate.id;
30-
debug!(
31-
"Certificate {} from subnet {} has been inserted into pending pool",
32-
certificate_id, certificate.source_subnet_id
33-
);
24+
let validator_store = self.validator_store.clone();
25+
let double_echo = self.tce_cli.get_double_echo_channel();
3426

35-
if self
36-
.tce_cli
37-
.get_double_echo_channel()
38-
.send(DoubleEchoCommand::Broadcast {
39-
need_gossip: true,
40-
cert: *certificate,
41-
pending_id,
42-
})
43-
.await
44-
.is_err()
45-
{
46-
error!(
47-
"Unable to send DoubleEchoCommand::Broadcast command to double \
48-
echo for {}",
49-
certificate_id
27+
spawn(async move {
28+
_ = match validator_store
29+
.insert_pending_certificate(&certificate)
30+
.await
31+
{
32+
Ok(Some(pending_id)) => {
33+
let certificate_id = certificate.id;
34+
debug!(
35+
"Certificate {} from subnet {} has been inserted into pending pool",
36+
certificate_id, certificate.source_subnet_id
5037
);
5138

52-
sender.send(Err(RuntimeError::CommunicationError(
53-
"Unable to send DoubleEchoCommand::Broadcast command to double \
54-
echo"
55-
.to_string(),
56-
)))
57-
} else {
58-
sender.send(Ok(PendingResult::InPending(pending_id)))
39+
if double_echo
40+
.send(DoubleEchoCommand::Broadcast {
41+
need_gossip: true,
42+
cert: *certificate,
43+
pending_id,
44+
})
45+
.await
46+
.is_err()
47+
{
48+
error!(
49+
"Unable to send DoubleEchoCommand::Broadcast command to \
50+
double echo for {}",
51+
certificate_id
52+
);
53+
54+
sender.send(Err(RuntimeError::CommunicationError(
55+
"Unable to send DoubleEchoCommand::Broadcast command to \
56+
double echo"
57+
.to_string(),
58+
)))
59+
} else {
60+
sender.send(Ok(PendingResult::InPending(pending_id)))
61+
}
5962
}
60-
}
61-
Ok(None) => {
62-
debug!(
63-
"Certificate {} from subnet {} has been inserted into precedence pool \
64-
waiting for {}",
65-
certificate.id, certificate.source_subnet_id, certificate.prev_id
66-
);
67-
sender.send(Ok(PendingResult::AwaitPrecedence))
68-
}
69-
Err(StorageError::InternalStorage(
70-
InternalStorageError::CertificateAlreadyPending,
71-
)) => {
72-
debug!(
73-
"Certificate {} has already been added to the pending pool, skipping",
74-
certificate.id
75-
);
76-
sender.send(Ok(PendingResult::AlreadyPending))
77-
}
78-
Err(StorageError::InternalStorage(
79-
InternalStorageError::CertificateAlreadyExists,
80-
)) => {
81-
debug!(
82-
"Certificate {} has already been delivered, skipping",
83-
certificate.id
84-
);
85-
sender.send(Ok(PendingResult::AlreadyDelivered))
86-
}
87-
Err(error) => {
88-
error!(
89-
"Unable to insert pending certificate {}: {}",
90-
certificate.id, error
91-
);
63+
Ok(None) => {
64+
debug!(
65+
"Certificate {} from subnet {} has been inserted into precedence \
66+
pool waiting for {}",
67+
certificate.id, certificate.source_subnet_id, certificate.prev_id
68+
);
69+
sender.send(Ok(PendingResult::AwaitPrecedence))
70+
}
71+
Err(StorageError::InternalStorage(
72+
InternalStorageError::CertificateAlreadyPending,
73+
)) => {
74+
debug!(
75+
"Certificate {} has already been added to the pending pool, \
76+
skipping",
77+
certificate.id
78+
);
79+
sender.send(Ok(PendingResult::AlreadyPending))
80+
}
81+
Err(StorageError::InternalStorage(
82+
InternalStorageError::CertificateAlreadyExists,
83+
)) => {
84+
debug!(
85+
"Certificate {} has already been delivered, skipping",
86+
certificate.id
87+
);
88+
sender.send(Ok(PendingResult::AlreadyDelivered))
89+
}
90+
Err(error) => {
91+
error!(
92+
"Unable to insert pending certificate {}: {}",
93+
certificate.id, error
94+
);
9295

93-
sender.send(Err(error.into()))
94-
}
95-
};
96+
sender.send(Err(error.into()))
97+
}
98+
};
99+
});
96100
}
97101

98102
ApiEvent::GetSourceHead { subnet_id, sender } => {
Lines changed: 3 additions & 194 deletions
Original file line numberDiff line numberDiff line change
@@ -1,204 +1,13 @@
1-
use prost::Message;
2-
use std::collections::hash_map;
3-
use topos_tce_storage::errors::{InternalStorageError, StorageError};
4-
5-
use tokio::spawn;
6-
7-
use topos_metrics::CERTIFICATE_DELIVERY_LATENCY;
81
use topos_p2p::Event as NetEvent;
9-
use topos_tce_broadcast::DoubleEchoCommand;
10-
use tracing::{debug, error, info, trace};
11-
12-
use topos_core::api::grpc::tce::v1::{double_echo_request, DoubleEchoRequest, Echo, Gossip, Ready};
13-
use topos_core::uci;
2+
use tracing::warn;
143

154
use crate::AppContext;
165

176
impl AppContext {
187
pub async fn on_net_event(&mut self, evt: NetEvent) {
19-
trace!(
8+
warn!(
209
"on_net_event: peer: {} event {:?}",
21-
&self.network_client.local_peer_id,
22-
&evt
10+
&self.network_client.local_peer_id, &evt
2311
);
24-
25-
if let NetEvent::Gossip { data, from } = evt {
26-
if let Ok(DoubleEchoRequest {
27-
request: Some(double_echo_request),
28-
}) = DoubleEchoRequest::decode(&data[..])
29-
{
30-
match double_echo_request {
31-
double_echo_request::Request::Gossip(Gossip {
32-
certificate: Some(certificate),
33-
}) => match uci::Certificate::try_from(certificate) {
34-
Ok(cert) => {
35-
if let hash_map::Entry::Vacant(entry) =
36-
self.delivery_latency.entry(cert.id)
37-
{
38-
entry.insert(CERTIFICATE_DELIVERY_LATENCY.start_timer());
39-
}
40-
info!(
41-
"Received certificate {} from GossipSub from {}",
42-
cert.id, from
43-
);
44-
45-
match self.validator_store.insert_pending_certificate(&cert).await {
46-
Ok(Some(pending_id)) => {
47-
let certificate_id = cert.id;
48-
debug!(
49-
"Certificate {} has been inserted into pending pool",
50-
certificate_id
51-
);
52-
53-
if self
54-
.tce_cli
55-
.get_double_echo_channel()
56-
.send(DoubleEchoCommand::Broadcast {
57-
need_gossip: false,
58-
cert,
59-
pending_id,
60-
})
61-
.await
62-
.is_err()
63-
{
64-
error!(
65-
"Unable to send DoubleEchoCommand::Broadcast command \
66-
to double echo for {}",
67-
certificate_id
68-
);
69-
}
70-
}
71-
72-
Ok(None) => {
73-
debug!(
74-
"Certificate {} from subnet {} has been inserted into \
75-
precedence pool waiting for {}",
76-
cert.id, cert.source_subnet_id, cert.prev_id
77-
);
78-
}
79-
Err(StorageError::InternalStorage(
80-
InternalStorageError::CertificateAlreadyPending,
81-
)) => {
82-
debug!(
83-
"Certificate {} has been already added to the pending \
84-
pool, skipping",
85-
cert.id
86-
);
87-
}
88-
Err(StorageError::InternalStorage(
89-
InternalStorageError::CertificateAlreadyExists,
90-
)) => {
91-
debug!(
92-
"Certificate {} has been already delivered, skipping",
93-
cert.id
94-
);
95-
}
96-
Err(error) => {
97-
error!(
98-
"Unable to insert pending certificate {}: {}",
99-
cert.id, error
100-
);
101-
}
102-
}
103-
}
104-
Err(e) => {
105-
error!("Failed to parse the received Certificate: {e}");
106-
}
107-
},
108-
double_echo_request::Request::Echo(Echo {
109-
certificate_id: Some(certificate_id),
110-
signature: Some(signature),
111-
validator_id: Some(validator_id),
112-
}) => {
113-
let channel = self.tce_cli.get_double_echo_channel();
114-
spawn(async move {
115-
let certificate_id = certificate_id.clone().try_into().map_err(|e| {
116-
error!(
117-
"Failed to parse the CertificateId {certificate_id} from \
118-
Echo: {e}"
119-
);
120-
e
121-
});
122-
let validator_id = validator_id.clone().try_into().map_err(|e| {
123-
error!(
124-
"Failed to parse the ValidatorId {validator_id} from Echo: {e}"
125-
);
126-
e
127-
});
128-
129-
if let (Ok(certificate_id), Ok(validator_id)) =
130-
(certificate_id, validator_id)
131-
{
132-
trace!(
133-
"Received Echo message, certificate_id: {certificate_id}, \
134-
validator_id: {validator_id} from: {from}",
135-
certificate_id = certificate_id,
136-
validator_id = validator_id
137-
);
138-
139-
if let Err(e) = channel
140-
.send(DoubleEchoCommand::Echo {
141-
signature: signature.into(),
142-
certificate_id,
143-
validator_id,
144-
})
145-
.await
146-
{
147-
error!("Unable to pass received Echo message: {:?}", e);
148-
}
149-
} else {
150-
error!("Unable to process Echo message due to invalid data");
151-
}
152-
});
153-
}
154-
double_echo_request::Request::Ready(Ready {
155-
certificate_id: Some(certificate_id),
156-
signature: Some(signature),
157-
validator_id: Some(validator_id),
158-
}) => {
159-
let channel = self.tce_cli.get_double_echo_channel();
160-
spawn(async move {
161-
let certificate_id = certificate_id.clone().try_into().map_err(|e| {
162-
error!(
163-
"Failed to parse the CertificateId {certificate_id} from \
164-
Ready: {e}"
165-
);
166-
e
167-
});
168-
let validator_id = validator_id.clone().try_into().map_err(|e| {
169-
error!(
170-
"Failed to parse the ValidatorId {validator_id} from Ready: \
171-
{e}"
172-
);
173-
e
174-
});
175-
if let (Ok(certificate_id), Ok(validator_id)) =
176-
(certificate_id, validator_id)
177-
{
178-
trace!(
179-
"Received Ready message, certificate_id: {certificate_id}, \
180-
validator_id: {validator_id} from: {from}",
181-
certificate_id = certificate_id,
182-
validator_id = validator_id
183-
);
184-
if let Err(e) = channel
185-
.send(DoubleEchoCommand::Ready {
186-
signature: signature.into(),
187-
certificate_id,
188-
validator_id,
189-
})
190-
.await
191-
{
192-
error!("Unable to pass received Ready message: {:?}", e);
193-
}
194-
} else {
195-
error!("Unable to process Ready message due to invalid data");
196-
}
197-
});
198-
}
199-
_ => {}
200-
}
201-
}
202-
}
20312
}
20413
}

0 commit comments

Comments
 (0)