-
Notifications
You must be signed in to change notification settings - Fork 116
Expand file tree
/
Copy pathapp.rs
More file actions
389 lines (334 loc) · 17 KB
/
app.rs
File metadata and controls
389 lines (334 loc) · 17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
use std::time::Duration;
use eyre::eyre;
use tokio::time::sleep;
use tracing::{debug, error, info};
use malachitebft_app_channel::app::engine::host::{HeightParams, Next};
use malachitebft_app_channel::app::streaming::StreamContent;
use malachitebft_app_channel::app::types::codec::Codec;
use malachitebft_app_channel::app::types::core::{Round, Validity};
use malachitebft_app_channel::app::types::sync::RawDecidedValue;
use malachitebft_app_channel::app::types::{LocallyProposedValue, ProposedValue};
use malachitebft_app_channel::{AppMsg, Channels, NetworkMsg};
use malachitebft_test::codec::json::JsonCodec;
use malachitebft_test::{Height, TestContext};
use crate::state::{decode_value, State};
/// Drives the application side of Malachite consensus.
///
/// Receives [`AppMsg`]s from `channels.consensus` in a loop and handles each one in turn,
/// answering on the message's reply channel. When proposing (`GetValue`) or restreaming
/// (`RestreamProposal`), it also publishes proposal parts on `channels.network`.
/// A failed reply send is only logged; it does not stop the loop.
///
/// # Errors
///
/// Returns an error if a store operation, value construction, proposal-part handling or
/// network send fails: these are propagated with `?` and end the loop. A failed `commit`
/// is *not* propagated; it is answered with a height restart instead. If the consensus
/// channel closes, `recv()` yields `None` and an error is returned as well.
///
/// # Panics
///
/// Panics if a decided certificate carries no commit signatures, or if the proposal found
/// for a restream request has a different value id than the one requested.
pub async fn run(state: &mut State, channels: &mut Channels<TestContext>) -> eyre::Result<()> {
    while let Some(msg) = channels.consensus.recv().await {
        match msg {
            // The first message to handle is the `ConsensusReady` message, signaling to the app
            // that Malachite is ready to start consensus
            AppMsg::ConsensusReady { reply } => {
                // Resume at the height following the highest decided value in the store,
                // or at height 1 if nothing has been decided yet.
                let start_height = state
                    .store
                    .max_decided_value_height()
                    .await
                    .map(|height| height.increment())
                    .unwrap_or_else(|| Height::new(1));

                info!(%start_height, "Consensus is ready");

                // NOTE(review): fixed 200 ms pause before replying. The reason is not visible
                // here (presumably to give the network/peers time to come up) — confirm.
                sleep(Duration::from_millis(200)).await;

                // We can simply respond by telling the engine to start consensus
                // at the next height, and provide it with the appropriate validator set
                // and timeouts for that height.
                let params = HeightParams {
                    validator_set: state.get_validator_set(start_height),
                    timeouts: state.get_timeouts(start_height),
                };

                if reply.send((start_height, params)).is_err() {
                    error!("Failed to send ConsensusReady reply");
                }
            }

            // The next message to handle is the `StartedRound` message, signaling to the app
            // that consensus has entered a new round (including the initial round 0)
            AppMsg::StartedRound {
                height,
                round,
                proposer,
                role,
                reply_value,
            } => {
                info!(%height, %round, %proposer, ?role, "Started round");

                // We can use that opportunity to update our internal state
                state.current_height = height;
                state.current_round = round;
                state.current_proposer = Some(proposer);

                // Proposal parts stored as "pending" for this height and round are processed
                // now. Each set is first removed from the pending store and then validated.
                // Valid sets are assembled into a value and stored as an undecided proposal.
                // Invalid sets are only logged, since they were already removed.
                let pending_parts = state
                    .store
                    .get_pending_proposal_parts(height, round)
                    .await?;
                info!(%height, %round, "Found {} pending proposal parts, validating...", pending_parts.len());

                for parts in &pending_parts {
                    // Remove the parts from pending
                    state
                        .store
                        .remove_pending_proposal_parts(parts.clone())
                        .await?;

                    match state.validate_proposal_parts(parts) {
                        Ok(()) => {
                            // Validation passed - convert to ProposedValue and move to undecided.
                            // NOTE(review): an assembly or store error here is propagated with
                            // `?` and terminates `run`, not just the handling of this proposal.
                            let value = State::assemble_value_from_parts(parts.clone())?;
                            state.store.store_undecided_proposal(value).await?;
                            info!(
                                height = %parts.height,
                                round = %parts.round,
                                proposer = %parts.proposer,
                                "Moved valid pending proposal to undecided after validation"
                            );
                        }
                        Err(error) => {
                            // Validation failed. The parts were already removed from pending
                            // above, so we only log the error.
                            error!(
                                height = %parts.height,
                                round = %parts.round,
                                proposer = %parts.proposer,
                                error = ?error,
                                "Removed invalid pending proposal"
                            );
                        }
                    }
                }

                // If we have already built or seen values for this height and round,
                // send them back to consensus. This may happen when we are restarting after a crash.
                let proposals = state.store.get_undecided_proposals(height, round).await?;
                info!(%height, %round, "Found {} undecided proposals", proposals.len());

                if reply_value.send(proposals).is_err() {
                    error!("Failed to send undecided proposals");
                }
            }

            // At some point, we may end up being the proposer for that round, and the engine
            // will then ask us for a value to propose to the other validators.
            AppMsg::GetValue {
                height,
                round,
                timeout: _,
                reply,
            } => {
                // NOTE: We can ignore the timeout as we are building the value right away.
                // If we were, say, reaping as many txes as possible from a mempool and executing
                // them, then we would need to respect the timeout and stop at a certain point.

                info!(%height, %round, "Consensus is requesting a value to propose");
                tracing::debug!(%height, %round, "Middleware: {:?}", state.ctx.middleware());

                // Here it is important that, if we have previously built a value for this height and round,
                // we send back the very same value.
                // In both branches the context's middleware receives the proposal mutably via
                // `on_propose_value`. The boolean argument is `true` for a previously built
                // value and `false` for a freshly built one.
                let proposal = match state.get_previously_built_value(height, round).await? {
                    Some(mut proposal) => {
                        state
                            .ctx
                            .middleware()
                            .on_propose_value(&state.ctx, &mut proposal, true);

                        proposal
                    }
                    None => {
                        // If we have not previously built a value for that very same height and round,
                        // we need to create a new value to propose and send it back to consensus.
                        let mut proposal = state.propose_value(height, round).await?;

                        state
                            .ctx
                            .middleware()
                            .on_propose_value(&state.ctx, &mut proposal, false);

                        proposal
                    }
                };

                // Send it to consensus (before streaming its parts to peers)
                if reply.send(proposal.clone()).is_err() {
                    error!("Failed to send GetValue reply");
                }

                // The POL round is always nil when we propose a newly built value.
                // See L15/L18 of the Tendermint algorithm.
                // NOTE(review): nil is also used here when re-sending a previously built value.
                let pol_round = Round::Nil;

                // Now what's left to do is to break down the value to propose into parts,
                // and send those parts over the network to our peers, for them to re-assemble the full value.
                for stream_message in state.stream_proposal(proposal, pol_round) {
                    debug!(%height, %round, "Streaming proposal part: {stream_message:?}");
                    channels
                        .network
                        .send(NetworkMsg::PublishProposalPart(stream_message))
                        .await?;
                }
            }

            // On the receiving end of these proposal parts (ie. when we are not the proposer),
            // we need to process these parts and re-assemble the full value.
            // To this end, we store each part that we receive and assemble the full value once we
            // have all its constituent parts. Then we send that value back to consensus for it to
            // consider and vote for or against it (ie. vote `nil`), depending on its validity.
            AppMsg::ReceivedProposalPart { from, part, reply } => {
                // Label describing this part, used for logging only.
                let part_type = match &part.content {
                    StreamContent::Data(part) => part.get_type(),
                    StreamContent::Fin => "end of stream",
                };

                debug!(%from, %part.sequence, part.type = %part_type, "Received proposal part");

                // Hand the part to the state and forward its result to consensus as-is
                // (presumably `Some(value)` only once the full value is assembled — confirm
                // in `State::received_proposal_part`).
                let proposed_value = state.received_proposal_part(from, part).await?;

                if reply.send(proposed_value).is_err() {
                    error!("Failed to send ReceivedProposalPart reply");
                }
            }

            // After some time, consensus will finally reach a decision on the value
            // to commit for the current height, and will notify the application,
            // providing it with a commit certificate which contains the ID of the value
            // that was decided on as well as the set of commits for that value,
            // ie. the precommits together with their (aggregated) signatures.
            AppMsg::Decided {
                certificate,
                extensions: _,
                reply,
            } => {
                info!(
                    height = %certificate.height,
                    round = %certificate.round,
                    value = %certificate.value_id,
                    "Consensus has decided on value, committing..."
                );

                // A decided certificate is expected to carry at least one commit signature.
                // An empty set is treated as an invariant violation and panics.
                assert!(!certificate.commit_signatures.is_empty());

                // When that happens, we store the decided value in our store
                match state.commit(certificate).await {
                    Ok(_) => {
                        // And then we instruct consensus to start the next height
                        // NOTE: `current_height` has already been incremented in `commit()`
                        let params = HeightParams {
                            validator_set: state.get_validator_set(state.current_height),
                            timeouts: state.get_timeouts(state.current_height),
                        };

                        if reply
                            .send(Next::Start(state.current_height, params))
                            .is_err()
                        {
                            error!("Failed to send StartHeight reply");
                        }
                    }
                    Err(e) => {
                        // Commit failed, restart the height.
                        // Presumably `commit()` did not increment `current_height` on failure,
                        // so this restarts the height that was just decided — confirm.
                        error!("Commit failed: {e}");
                        error!("Restarting height {}", state.current_height);

                        let params = HeightParams {
                            validator_set: state.get_validator_set(state.current_height),
                            timeouts: state.get_timeouts(state.current_height),
                        };

                        if reply
                            .send(Next::Restart(state.current_height, params))
                            .is_err()
                        {
                            error!("Failed to send RestartHeight reply");
                        }
                    }
                }

                // NOTE(review): fixed 500 ms pause after every decision. It presumably throttles
                // height progression in this example app — confirm before removing.
                sleep(Duration::from_millis(500)).await;
            }

            // It may happen that our node is lagging behind its peers. In that case,
            // a synchronization mechanism will automatically kick in to try and catch up to
            // our peers. When that happens, some of these peers will send us decided values
            // for the current height only (not for future heights). When the engine receives
            // such a value, it will forward it to the application to decode it from its wire format
            // and send back the decoded value to consensus.
            AppMsg::ProcessSyncedValue {
                height,
                round,
                proposer,
                value_bytes,
                reply,
            } => {
                info!(%height, %round, "Processing synced value");

                if let Some(value) = decode_value(value_bytes) {
                    // The decoded value is marked valid without further checks.
                    let proposal = ProposedValue {
                        height,
                        round,
                        valid_round: Round::Nil,
                        proposer,
                        value,
                        validity: Validity::Valid,
                    };

                    // TODO: We plan to add some validation here in the future.
                    state.store_synced_value(proposal.clone()).await?;

                    if reply.send(Some(proposal)).is_err() {
                        error!("Failed to send ProcessSyncedValue reply");
                    }
                } else {
                    // Undecodable bytes: report "no value" to consensus.
                    error!(%height, %round, "Failed to decode synced value");

                    if reply.send(None).is_err() {
                        error!("Failed to send ProcessSyncedValue reply");
                    }
                }
            }

            // If, on the other hand, we are not lagging behind but are instead asked by one of
            // our peers to help them catch up because they are the ones lagging behind,
            // then the engine might ask the application to provide it with the value
            // that was decided at some lower height. In that case, we fetch it from our store
            // and send it to consensus.
            AppMsg::GetDecidedValue { height, reply } => {
                info!(%height, "Received sync request for decided value");

                let decided_value = state.get_decided_value(height).await;
                info!(%height, "Found decided value: {decided_value:?}");

                // Encode the value with the JSON codec for the wire. An encoding failure is
                // logged and reported to consensus as `None`, like a missing value.
                let raw_decided_value = decided_value.and_then(|decided_value| {
                    match JsonCodec.encode(&decided_value.value) {
                        Ok(value_bytes) => Some(RawDecidedValue {
                            certificate: decided_value.certificate,
                            value_bytes,
                        }),
                        Err(e) => {
                            error!(%height, "Failed to encode decided value: {e}");
                            None
                        }
                    }
                });

                if reply.send(raw_decided_value).is_err() {
                    error!("Failed to send GetDecidedValue reply");
                }
            }

            // In order to figure out if we can help a peer that is lagging behind,
            // the engine may ask us for the height of the earliest available value in our store.
            AppMsg::GetHistoryMinHeight { reply } => {
                let min_height = state.get_earliest_height().await;

                if reply.send(min_height).is_err() {
                    error!("Failed to send GetHistoryMinHeight reply");
                }
            }

            // Consensus asks us to re-publish a proposal, identified by `value_id`, that we
            // already have in our store.
            AppMsg::RestreamProposal {
                height,
                round,
                valid_round,
                address: _,
                value_id,
            } => {
                info!(%height, %valid_round, "Restreaming existing proposal...");

                // Look for a proposal at `valid_round`, or at `round` when `valid_round` is nil
                // (it should already be stored)
                let proposal_round = if valid_round == Round::Nil {
                    round
                } else {
                    valid_round
                };

                // Look for a proposal for the given value_id at that round (should be already stored)
                let proposal = state
                    .store
                    .get_undecided_proposal(height, proposal_round, value_id)
                    .await?;

                // If no matching proposal is stored, nothing is published.
                if let Some(proposal) = proposal {
                    assert_eq!(proposal.value.id(), value_id);

                    // The value is restreamed as a proposal for `round`, with `valid_round`
                    // as its POL round.
                    let locally_proposed_value = LocallyProposedValue {
                        height,
                        round,
                        value: proposal.value,
                    };

                    for stream_message in state.stream_proposal(locally_proposed_value, valid_round)
                    {
                        debug!(%height, %valid_round, "Publishing proposal part: {stream_message:?}");
                        channels
                            .network
                            .send(NetworkMsg::PublishProposalPart(stream_message))
                            .await?;
                    }
                }
            }

            // This app does not produce vote extensions: always reply with none.
            AppMsg::ExtendVote { reply, .. } => {
                if reply.send(None).is_err() {
                    error!("Failed to send ExtendVote reply");
                }
            }

            // Every vote extension is accepted without inspection.
            AppMsg::VerifyVoteExtension { reply, .. } => {
                if reply.send(Ok(())).is_err() {
                    error!("Failed to send VerifyVoteExtension reply");
                }
            }
        }
    }

    // If we get here, it can only be because the channel we use to receive messages
    // from consensus has been closed, meaning that the consensus actor has died.
    // We can do nothing but return an error here.
    Err(eyre!("Consensus channel closed unexpectedly"))
}