Skip to content

Commit 93c7540

Browse files
authored
TQ: Add support for "Expunge" messages (#8874)
When a node requests a share for an old configuration it will receive an `Expunged` message if the share request receiving node has a later committed configuration where the node is not a member. When the `Expunged` message is receieved, the expunged node persists this fact and stops replying to peer messages. We also fix a bug in the test where nexus wasn't actually committing configurations at all. So far invariants really slow the test down. On my machine checking the invariants after every applied event makes the test take ~120s. Without invariant checking, it takes about 14s. I definitely want to add more invariants for correctness, but maybe not check them when they aren't applicable. Some may become postconditions on certain events instead.
1 parent b7c2392 commit 93c7540

File tree

6 files changed

+226
-19
lines changed

6 files changed

+226
-19
lines changed

trust-quorum/src/lib.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,9 @@ pub use node::{Node, NodeDiff};
3939
// public only for docs.
4040
pub use node_ctx::NodeHandlerCtx;
4141
pub use node_ctx::{NodeCallerCtx, NodeCommonCtx, NodeCtx, NodeCtxDiff};
42-
pub use persistent_state::{PersistentState, PersistentStateSummary};
42+
pub use persistent_state::{
43+
ExpungedMetadata, PersistentState, PersistentStateSummary,
44+
};
4345

4446
#[derive(
4547
Debug,

trust-quorum/src/messages.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ pub enum PeerMsgKind {
6565
LrtqShare(LrtqShare),
6666

6767
/// Inform a node that it is no longer part of the trust quorum as of the
68-
/// given epoch
68+
/// given epoch, which the responder knows is commmitted.
6969
Expunged(Epoch),
7070

7171
/// Inform a node that it is utilizing an old committed onfiguration and

trust-quorum/src/node.rs

Lines changed: 132 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ use crate::validators::{
2020
MismatchedRackIdError, ReconfigurationError, ValidatedReconfigureMsg,
2121
};
2222
use crate::{
23-
Alarm, Configuration, CoordinatorState, Epoch, NodeHandlerCtx, PlatformId,
24-
messages::*,
23+
Alarm, Configuration, CoordinatorState, Epoch, ExpungedMetadata,
24+
NodeHandlerCtx, PlatformId, messages::*,
2525
};
2626
use daft::{Diffable, Leaf};
2727
use gfss::shamir::Share;
@@ -101,9 +101,10 @@ impl Node {
101101
};
102102

103103
if let Some(kcs) = &self.key_share_computer {
104-
// We know from our `ValidatedReconfigureMsg` that we haven't seen a newer
105-
// configuration and we have the correct last committed configuration. Therefore if we are computing a key share,
106-
// we must be doing it for a stale commit and should cancel it.
104+
// We know from our `ValidatedReconfigureMsg` that we haven't seen
105+
// a newer configuration and we have the correct last committed
106+
// configuration. Therefore if we are computing a key share, we must
107+
// be doing it for a stale commit and should cancel it.
107108
//
108109
// I don't think it's actually possible to hit this condition, but
109110
// we check anyway.
@@ -139,6 +140,19 @@ impl Node {
139140
{
140141
let ps = ctx.persistent_state();
141142

143+
if let Some(expunged) = &ps.expunged {
144+
error!(
145+
self.log,
146+
"Commit attempted on expunged node";
147+
"expunged_epoch" => %expunged.epoch,
148+
"expunging_node" => %expunged.from
149+
);
150+
return Err(CommitError::Expunged {
151+
epoch: expunged.epoch,
152+
from: expunged.from.clone(),
153+
});
154+
}
155+
142156
// If we have a configuration the rack id must match the one from
143157
// Nexus
144158
if let Some(ps_rack_id) = ps.rack_id() {
@@ -243,13 +257,26 @@ impl Node {
243257
from: PlatformId,
244258
msg: PeerMsg,
245259
) {
260+
if ctx.persistent_state().is_expunged() {
261+
warn!(
262+
self.log,
263+
"Received message while expunged. Dropping.";
264+
"from" => %from,
265+
"msg" => msg.kind.name()
266+
);
267+
return;
268+
}
269+
246270
if let Some(rack_id) = ctx.persistent_state().rack_id() {
247271
if rack_id != msg.rack_id {
248-
error!(self.log, "Mismatched rack id";
249-
"from" => %from,
250-
"msg" => msg.kind.name(),
251-
"expected" => %rack_id,
252-
"got" => %msg.rack_id);
272+
error!(
273+
self.log,
274+
"Mismatched rack id";
275+
"from" => %from,
276+
"msg" => msg.kind.name(),
277+
"expected" => %rack_id,
278+
"got" => %msg.rack_id
279+
);
253280
return;
254281
}
255282
}
@@ -269,6 +296,9 @@ impl Node {
269296
PeerMsgKind::CommitAdvance(config) => {
270297
self.handle_commit_advance(ctx, from, config)
271298
}
299+
PeerMsgKind::Expunged(epoch) => {
300+
self.handle_expunged(ctx, from, epoch);
301+
}
272302
_ => todo!(
273303
"cannot handle message variant yet - not implemented: {msg:?}"
274304
),
@@ -308,6 +338,85 @@ impl Node {
308338
}
309339
}
310340

341+
fn handle_expunged(
342+
&mut self,
343+
ctx: &mut impl NodeHandlerCtx,
344+
from: PlatformId,
345+
epoch: Epoch,
346+
) {
347+
if let Some(config) = ctx.persistent_state().latest_config() {
348+
if epoch < config.epoch {
349+
// It's possible, but unlikely, that we were expunged at `epoch`
350+
// and later re-added to the trust-quorum, but the reply to
351+
// an old message is still floating in the network. This is
352+
// especially unlikely since, we should really have restarted
353+
// sprockets connections in this case. In any event, the race
354+
// condition exists at the protocol level, and so we handle it.
355+
if config.members.contains_key(ctx.platform_id()) {
356+
let m = concat!(
357+
"Received Expunged message for old epoch. ",
358+
"We must have been re-added as a trust-quorum member."
359+
);
360+
warn!(
361+
self.log,
362+
"{m}";
363+
"from" => %from,
364+
"received_epoch" => %epoch,
365+
"epoch" => %config.epoch
366+
);
367+
}
368+
return;
369+
} else if epoch > config.epoch {
370+
let m = concat!(
371+
"Received Expunged message for newer epoch. ",
372+
"Recording expungement in persistent state."
373+
);
374+
warn!(
375+
self.log,
376+
"{m}";
377+
"from" => %from,
378+
"received_epoch" => %epoch,
379+
"epoch" => %config.epoch
380+
);
381+
// Intentionally fall through
382+
} else {
383+
let m = concat!(
384+
"Received Expunged message for latest known epoch. ",
385+
"Recording expungement in persistent state."
386+
);
387+
warn!(
388+
self.log,
389+
"{m}";
390+
"from" => %from,
391+
"received_epoch" => %epoch,
392+
"epoch" => %config.epoch
393+
);
394+
// Intentionally fall through
395+
}
396+
397+
// Perform the actual expunge
398+
ctx.update_persistent_state(|ps| {
399+
ps.expunged = Some(ExpungedMetadata { epoch, from });
400+
true
401+
});
402+
403+
// Stop coordinating and computing a key share
404+
self.coordinator_state = None;
405+
self.key_share_computer = None;
406+
} else {
407+
let m = concat!(
408+
"Received Expunge message, but we have no configurations. ",
409+
"We must have been factory reset already."
410+
);
411+
error!(
412+
self.log,
413+
"{m}";
414+
"from" => %from,
415+
"received_epoch" => %epoch
416+
);
417+
}
418+
}
419+
311420
fn handle_commit_advance(
312421
&mut self,
313422
ctx: &mut impl NodeHandlerCtx,
@@ -469,7 +578,10 @@ impl Node {
469578
%latest_committed_config.epoch,
470579
"requested_epoch" => %epoch
471580
);
472-
// TODO: Send an expunged message
581+
ctx.send(
582+
from,
583+
PeerMsgKind::Expunged(latest_committed_config.epoch),
584+
);
473585
return;
474586
}
475587
info!(
@@ -499,7 +611,13 @@ impl Node {
499611
"from" => %from,
500612
"epoch" => %epoch
501613
);
502-
// TODO: Send an expunged message
614+
// Technically, this node does not yet know that the
615+
// configuration at `epoch` has been committed. However,
616+
// requesting nodes only ask for key shares when they know that
617+
// the configuration has been committed. Therefore, rather than
618+
// introduce a new message such as `NotAMember`, we inform the
619+
// requesting node that they have been expunged.
620+
ctx.send(from, PeerMsgKind::Expunged(epoch));
503621
return;
504622
}
505623
}
@@ -720,6 +838,8 @@ pub enum CommitError {
720838
),
721839
#[error("cannot commit: not prepared for epoch {0}")]
722840
NotPrepared(Epoch),
841+
#[error("cannot commit: expunged at epoch {epoch} by {from}")]
842+
Expunged { epoch: Epoch, from: PlatformId },
723843
}
724844

725845
#[cfg(test)]

trust-quorum/src/persistent_state.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ pub struct PersistentState {
3131

3232
// Has the node been informed that it is no longer part of the trust quorum?
3333
//
34-
// If at any time this gets set, than the it remains true for the lifetime
34+
// If at any time this gets set, then the it remains true for the lifetime
3535
// of the node. The sled corresponding to the node must be factory reset by
3636
// wiping its storage.
3737
pub expunged: Option<ExpungedMetadata>,
@@ -62,11 +62,13 @@ impl PersistentState {
6262
self.lrtq.is_some() && self.latest_committed_epoch().is_none()
6363
}
6464

65-
// Are there any committed configurations or lrtq data?
65+
/// Are there any committed configurations or lrtq data?
6666
pub fn is_uninitialized(&self) -> bool {
6767
self.lrtq.is_none() && self.latest_committed_epoch().is_none()
6868
}
6969

70+
/// The latest configuration that we know about, regardless of whether it
71+
/// has been committed.
7072
pub fn latest_config(&self) -> Option<&Configuration> {
7173
self.configs.iter().last()
7274
}
@@ -108,6 +110,11 @@ impl PersistentState {
108110
pub fn has_prepared(&self, epoch: Epoch) -> bool {
109111
self.configs.contains_key(&epoch) && self.shares.contains_key(&epoch)
110112
}
113+
114+
/// Has this node been expunged?
115+
pub fn is_expunged(&self) -> bool {
116+
self.expunged.is_some()
117+
}
111118
}
112119

113120
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]

trust-quorum/test-utils/src/state.rs

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use crate::nexus::{
1010
use crate::{Event, member_universe};
1111
use daft::{BTreeMapDiff, BTreeSetDiff, Diffable, Leaf};
1212
use iddqd::IdOrdMap;
13-
use slog::Logger;
13+
use slog::{Logger, info};
1414
use std::collections::{BTreeMap, BTreeSet};
1515
use std::fmt::Display;
1616
use trust_quorum::{
@@ -280,6 +280,11 @@ impl TqState {
280280
NexusReply::AckedPreparesFromCoordinator { epoch, acks } => {
281281
if epoch == latest_config.epoch {
282282
latest_config.prepared_members.extend(acks);
283+
284+
if latest_config.can_commit() {
285+
drop(latest_config);
286+
self.nexus_commit();
287+
}
283288
}
284289
}
285290
NexusReply::CommitAck { from, epoch } => {
@@ -347,6 +352,47 @@ impl TqState {
347352
self.send_reconfigure_msg();
348353
self.send_envelopes_from_coordinator();
349354
}
355+
356+
// Commit at nexus when preparing
357+
fn nexus_commit(&mut self) {
358+
let mut latest_config = self.nexus.latest_config_mut();
359+
info!(
360+
self.log,
361+
"nexus committed";
362+
"epoch" => %latest_config.epoch,
363+
"coordinator" => %latest_config.coordinator
364+
);
365+
366+
latest_config.op = NexusOp::Committed;
367+
368+
let new_members = latest_config.members.clone();
369+
let new_epoch = latest_config.epoch;
370+
371+
// Expunge any removed nodes from the last committed configuration
372+
if let Some(last_committed_epoch) = latest_config.last_committed_epoch {
373+
// Release our mutable borrow
374+
drop(latest_config);
375+
376+
let last_committed_config = self
377+
.nexus
378+
.configs
379+
.get(&last_committed_epoch)
380+
.expect("config exists");
381+
382+
let expunged =
383+
last_committed_config.members.difference(&new_members).cloned();
384+
385+
for e in expunged {
386+
info!(
387+
self.log,
388+
"expunged node";
389+
"epoch" => %new_epoch,
390+
"platform_id" => %e
391+
);
392+
self.expunged.insert(e);
393+
}
394+
}
395+
}
350396
}
351397

352398
/// Broken out of `TqState` to alleviate borrow checker woes
@@ -528,7 +574,7 @@ fn display_nexus_state_diff(
528574
f: &mut std::fmt::Formatter<'_>,
529575
) -> std::fmt::Result {
530576
if diff.configs.modified().count() != 0 {
531-
writeln!(f, " nexus state changed:")?;
577+
writeln!(f, "nexus state changed:")?;
532578
}
533579

534580
// Nexus configs can only be added or modified

trust-quorum/tests/cluster.rs

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,7 @@ impl TestState {
309309
for s in removed_nodes {
310310
// The same selection can be chosen more than once. so we
311311
// must add the extra check rather than shrinking the length
312-
// of the `removed_nodes` iterator with `take`.;
312+
// of the `removed_nodes` iterator with `take`.
313313
if nodes_to_remove.len() == max_nodes_to_remove {
314314
break;
315315
}
@@ -398,6 +398,38 @@ impl TestState {
398398
self.invariant_nodes_have_committed_if_nexus_has_acks()?;
399399
self.invariant_nodes_not_coordinating_and_computing_key_share_simultaneously()?;
400400
self.invariant_no_alarms()?;
401+
self.invariant_expunged_nodes_have_actually_been_expunged()?;
402+
Ok(())
403+
}
404+
405+
/// For all expunged nodes ensure that either:
406+
/// * they know they are expunged
407+
/// * have a latest committed configuration where they are still a member
408+
/// * have no committed configurations
409+
fn invariant_expunged_nodes_have_actually_been_expunged(
410+
&self,
411+
) -> Result<(), TestCaseError> {
412+
for id in &self.tq_state.expunged {
413+
let (_, ctx) =
414+
self.tq_state.sut.nodes.get(id).expect("node exists");
415+
let ps = ctx.persistent_state();
416+
if ps.is_expunged() {
417+
continue;
418+
}
419+
if let Some(config) = ps.latest_committed_configuration() {
420+
let nexus_config = self
421+
.tq_state
422+
.nexus
423+
.configs
424+
.get(&config.epoch)
425+
.expect("config exists");
426+
prop_assert!(config.members.contains_key(ctx.platform_id()));
427+
prop_assert!(nexus_config.members.contains(ctx.platform_id()));
428+
} else {
429+
continue;
430+
}
431+
}
432+
401433
Ok(())
402434
}
403435

0 commit comments

Comments
 (0)