Feat(e2e): support multiple aggregators in the e2e tests #2378

Open
wants to merge 30 commits into base: main

Changes from all commits
30 commits
7b05361
feat(e2e): update e2e test command line arguments
jpraynaud Mar 10, 2025
99f5881
feat(e2e): 'MithrilInfrastructure' supports multiple aggregators
jpraynaud Mar 11, 2025
19b4507
refactor(e2e): implement interior mutability for 'Aggregator', 'RunOn…
jpraynaud Mar 12, 2025
6e4b6b9
refactor(relay): support for dialing to peer in relays of e2e test
jpraynaud Mar 13, 2025
4e22900
refactor(e2e): enhance 'MithrilInfrastructure' support for multiple a…
jpraynaud Mar 13, 2025
6906938
feat(e2e): 'RunOnly' supports multiple aggregators
jpraynaud Mar 13, 2025
7e3ac58
feat(e2e): 'Spec' supports multiple aggregators
jpraynaud Mar 13, 2025
7ce4531
feat(e2e): runner supports multiple aggregators
jpraynaud Mar 13, 2025
59233ef
refactor(e2e): enhance naming of aggregators and associated relays
jpraynaud Mar 14, 2025
932360d
refactor(e2e): enhance assertions checks
jpraynaud Mar 14, 2025
40f32af
fix(common): enhance Certificate display implementation
jpraynaud Mar 17, 2025
e9e0d2f
fix(aggregator): integration test for slave uses evolving Mithril sta…
jpraynaud Mar 17, 2025
ed50cc8
fix(common): avoid too low stake in random stake distribution
jpraynaud Mar 17, 2025
b6e18ea
fix(aggregator): slave signer registration stabilization
jpraynaud Mar 17, 2025
f7c754c
feat(relay): implement signer relay modes
jpraynaud Mar 18, 2025
1c7f425
refactor(e2e): use signer relay modes in e2e test
jpraynaud Mar 18, 2025
80c2021
refactor(ci): update e2e tests in CI to use the signer relay modes
jpraynaud Mar 18, 2025
bd12065
refactor(e2e): better naming for aggregators in e2e tests
jpraynaud Mar 18, 2025
8bd1e56
fix(e2e): delegate stakes only from the first aggregator
jpraynaud Mar 18, 2025
be5f9ab
refactor(e2e): remove distinction master/slave aggregator
jpraynaud Mar 19, 2025
4731590
fix(e2e): make genesis bootstrap error retryable
jpraynaud Mar 19, 2025
cad83fb
fixup: feat: update state machine to support slave aggregator mode
jpraynaud Mar 20, 2025
0b21674
fix(e2e): flakiness in the genesis bootstrap of slave aggregators
jpraynaud Mar 20, 2025
42b4d01
refactor(e2e): enhance assertions logs with aggregator name
jpraynaud Mar 20, 2025
752292f
fix(ci): wrong format for next era in some e2e scenarios
jpraynaud Mar 21, 2025
124810c
fix(e2e): era switch done on multiple aggregators
jpraynaud Mar 21, 2025
33ddc53
refactor(aggregator): simplify slave aggregator integration test
jpraynaud Mar 21, 2025
366f910
refactor(e2e): better parameter handling with clap
jpraynaud Mar 21, 2025
618f376
chore(e2e): apply review comments
jpraynaud Mar 21, 2025
9e47514
wip(ci): DO NOT MERGE
jpraynaud Mar 21, 2025
25 changes: 18 additions & 7 deletions .github/workflows/ci.yml
@@ -294,23 +294,34 @@ jobs:
strategy:
fail-fast: false
matrix:
mode: ["std"]
mode: ["master-slave"]
era: ${{ fromJSON(needs.build-ubuntu-X64.outputs.eras) }}
next_era: [""]
cardano_node_version: ["10.1.3", "10.1.4", "10.2.1"]
hard_fork_latest_era_at_epoch: [0]
run_id: ["#1", "#2"]
extra_args: [""]
run_id: ["#1", "#2", "#3", "#4", "#5", "#6", "#7", "#8", "#9", "#10"]
extra_args:
[
"--number-of-aggregators=2 --use-relays --relay-signer-registration-mode=passthrough --relay-signature-registration-mode=p2p",
]

include:
# Include a test for the P2P mode
- mode: "p2p"
# Include a test for partial decentralization with master/slave signer registration and P2P signature registration
- mode: "master-slave"
era: ${{ fromJSON(needs.build-ubuntu-X64.outputs.eras)[0] }}
next_era: ""
cardano_node_version: "10.1.4"
hard_fork_latest_era_at_epoch: 0
run_id: "#1"
extra_args: "--number-of-aggregators=2 --use-relays --relay-signer-registration-mode=passthrough --relay-signature-registration-mode=p2p"
# Include a test for full decentralization with P2P signer registration and P2P signature registration
- mode: "decentralized"
era: ${{ fromJSON(needs.build-ubuntu-X64.outputs.eras)[0] }}
next_era: [""]
next_era: ""
cardano_node_version: "10.1.4"
hard_fork_latest_era_at_epoch: 0
run_id: "#1"
extra_args: "--use-p2p-network"
extra_args: "--number-of-aggregators=2 --use-relays --relay-signer-registration-mode=p2p --relay-signature-registration-mode=p2p"
# Include a test for the era switch without regenesis
- mode: "std"
era: ${{ fromJSON(needs.build-ubuntu-X64.outputs.eras)[0] }}
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

16 changes: 8 additions & 8 deletions mithril-aggregator/src/runtime/state_machine.rs
@@ -143,7 +143,6 @@ impl AggregatorRuntime {
info!(self.logger, "→ Trying to transition to READY"; "last_time_point" => ?last_time_point);

let can_try_transition_from_idle_to_ready = if self.config.is_slave {
println!("Checking if slave aggregator is at the same epoch as master");
self.runner
.is_slave_aggregator_at_same_epoch_as_master(&last_time_point)
.await?
@@ -265,18 +264,19 @@ impl AggregatorRuntime {
self.runner
.update_stake_distribution(&new_time_point)
.await?;
if self.config.is_slave {
self.runner
.synchronize_slave_aggregator_signer_registration()
.await?;
}
self.runner.inform_new_epoch(new_time_point.epoch).await?;

self.runner.upkeep(new_time_point.epoch).await?;
self.runner
.open_signer_registration_round(&new_time_point)
.await?;
self.runner.update_epoch_settings().await?;
if self.config.is_slave {
self.runner
.synchronize_slave_aggregator_signer_registration()
.await?;
// Needed to recompute epoch data for the next signing round on the slave
self.runner.inform_new_epoch(new_time_point.epoch).await?;
}
Comment on lines 272 to +279
@Alenar Alenar (Collaborator) Mar 21, 2025

Can you explain how this change helps to stabilize the e2e tests? I'm quite puzzled by the fact that we need to call runner.inform_new_epoch twice.

From what I understand, this doesn't impact the methods called between the inform_new_epoch calls:

  • the runner.upkeep call should not be impacted
  • open_signer_registration_round does nothing on the slave
  • update_epoch_settings should not be impacted, as the data registered by the epoch service (protocol parameters and transactions signing config) doesn't depend on the master aggregator

The functional impacts should be:

  • the epoch service will expose an incorrect list of next_signers in the interval between the two inform_new_epoch calls
  • the epoch service will be ready earlier, since a first inform_epoch call will be done without needing a round trip to the master aggregator

Is the last point the problem on a fast network? Maybe the synchronizer should be able to "edit" the next signers in the epoch_service instead?
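
For concreteness, a rough sketch of that alternative, in which the synchronizer patches the next signers directly instead of triggering a second inform_new_epoch call. Both the signers return value and the update_next_signers method are hypothetical, not part of this PR:

if self.config.is_slave {
    // Hypothetical: the synchronization step returns the signers fetched from
    // the master aggregator for the synchronization epoch
    let next_signers = self
        .runner
        .synchronize_slave_aggregator_signer_registration()
        .await?;
    // Hypothetical method: patch the epoch service in place, avoiding the second
    // inform_new_epoch call and the window with an incorrect next_signers list
    self.runner.update_next_signers(next_signers).await?;
}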

self.runner.precompute_epoch_data().await?;
}

@@ -940,7 +940,7 @@ mod tests {
runner
.expect_inform_new_epoch()
.with(predicate::eq(new_time_point_clone.clone().epoch))
.once()
.times(2)
Collaborator

Why do we need to change the number of calls?
The modification on the state_machine seems to concern only the slave mode.
Does it mean we are running a slave?
The test name says that it is a master: "idle_new_epoch_detected_and_master_has_transitioned_to_epoch"

.returning(|_| Ok(()));
runner
.expect_update_epoch_settings()
449 changes: 304 additions & 145 deletions mithril-aggregator/tests/create_certificate_slave.rs
Collaborator

This test is really heavy to read and to execute (16s on the GitHub Actions CI).
Can we simplify it? Here are some ideas:

  • We could bootstrap the master genesis certificate immediately and only start the slave afterward, since this is already tested in genesis_to_signing
  • Is the last epoch (7) really needed? It seems to only repeat the check done on epoch 6

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion mithril-common/src/entities/certificate.rs
@@ -157,7 +157,7 @@ impl Debug for Certificate {
true => debug
.field(
"aggregate_verification_key",
&format_args!("{:?}", self.aggregate_verification_key),
&format_args!("{:?}", self.aggregate_verification_key.to_json_hex()),
)
.field("signature", &format_args!("{:?}", self.signature))
.finish(),
2 changes: 1 addition & 1 deletion mithril-common/src/entities/epoch.rs
@@ -38,7 +38,7 @@ impl Epoch {
pub const CARDANO_STAKE_DISTRIBUTION_SNAPSHOT_OFFSET: u64 = 2;

/// The epoch offset used to retrieve the epoch at which a signer has registered to the master aggregator.
pub const SIGNER_MASTER_SYNCHRONIZATION_OFFSET: u64 = 1;
pub const SIGNER_MASTER_SYNCHRONIZATION_OFFSET: u64 = 0;

/// Computes a new Epoch by applying an epoch offset.
///
1 change: 1 addition & 0 deletions mithril-common/src/protocol/multi_signer.rs
@@ -171,6 +171,7 @@ mod test {
.with_signers(1)
.with_stake_distribution(StakeDistributionGenerationMethod::RandomDistribution {
seed: [3u8; 32],
min_stake: 1,
})
.build(),
);
1 change: 1 addition & 0 deletions mithril-common/src/protocol/signer_builder.rs
@@ -218,6 +218,7 @@ mod test {
.with_stake_distribution(
crate::test_utils::StakeDistributionGenerationMethod::RandomDistribution {
seed: [4u8; 32],
min_stake: 1,
},
)
.build();
13 changes: 10 additions & 3 deletions mithril-common/src/test_utils/fixture_builder.rs
@@ -29,7 +29,10 @@ impl Default for MithrilFixtureBuilder {
enable_signers_certification: true,
number_of_signers: 5,
stake_distribution_generation_method:
StakeDistributionGenerationMethod::RandomDistribution { seed: [0u8; 32] },
StakeDistributionGenerationMethod::RandomDistribution {
seed: [0u8; 32],
min_stake: 1,
},
party_id_seed: [0u8; 32],
}
}
@@ -41,6 +44,8 @@ pub enum StakeDistributionGenerationMethod {
RandomDistribution {
/// The randomizer seed
seed: [u8; 32],
/// The minimum stake
min_stake: Stake,
},

/// Use a custom stake distribution
@@ -106,13 +111,13 @@ impl MithrilFixtureBuilder {
let signers_party_ids = self.generate_party_ids();

match &self.stake_distribution_generation_method {
StakeDistributionGenerationMethod::RandomDistribution { seed } => {
StakeDistributionGenerationMethod::RandomDistribution { seed, min_stake } => {
let mut stake_rng = ChaCha20Rng::from_seed(*seed);

signers_party_ids
.into_iter()
.map(|party_id| {
let stake = 1 + stake_rng.next_u64() % 999;
let stake = min_stake + stake_rng.next_u64() % 999;
(party_id, stake)
})
.collect::<Vec<_>>()
@@ -246,6 +251,7 @@ mod tests {
let result = MithrilFixtureBuilder::default()
.with_stake_distribution(StakeDistributionGenerationMethod::RandomDistribution {
seed: [0u8; 32],
min_stake: 1,
})
.with_signers(4)
.build();
@@ -275,6 +281,7 @@ mod tests {
let result = MithrilFixtureBuilder::default()
.with_stake_distribution(StakeDistributionGenerationMethod::RandomDistribution {
seed: [0u8; 32],
min_stake: 1,
})
.with_signers(5)
.build();
1 change: 1 addition & 0 deletions mithril-relay/Cargo.toml
@@ -48,6 +48,7 @@ slog = { version = "2.7.0", features = [
] }
slog-async = "2.8.0"
slog-bunyan = "2.5.0"
strum = { version = "0.26.3", features = ["derive"] }
thiserror = "2.0.11"
tokio = { version = "1.43.0", features = ["full"] }
warp = "0.3.7"
14 changes: 13 additions & 1 deletion mithril-relay/src/commands/signer.rs
@@ -6,7 +6,7 @@ use mithril_common::StdResult;
use slog::error;

use super::CommandContext;
use crate::SignerRelay;
use crate::{SignerRelay, SignerRelayMode};

#[derive(Parser, Debug, Clone)]
pub struct SignerCommand {
@@ -26,6 +26,14 @@ pub struct SignerCommand {
#[clap(long, env = "AGGREGATOR_ENDPOINT")]
aggregator_endpoint: String,

/// Signer registration relay mode
#[clap(value_enum, env = "SIGNER_REGISTRATION_MODE", default_value_t = SignerRelayMode::Passthrough)]
signer_registration_mode: SignerRelayMode,

/// Signature registration relay mode
#[clap(value_enum, env = "SIGNATURE_REGISTRATION_MODE", default_value_t = SignerRelayMode::P2P)]
signature_registration_mode: SignerRelayMode,

/// Interval at which a signer registration should be repeated in milliseconds (defaults to 1 hour)
#[clap(long, env = "SIGNER_REPEATER_DELAY", default_value_t = 3_600 * 1_000)]
signer_repeater_delay: u64,
@@ -38,12 +46,16 @@ impl SignerCommand {
let server_port = self.server_port.to_owned();
let dial_to = self.dial_to.to_owned();
let addr: Multiaddr = format!("/ip4/0.0.0.0/tcp/{}", self.listen_port).parse()?;
let signer_registration_mode = &self.signer_registration_mode;
let signature_registration_mode = &self.signature_registration_mode;
let aggregator_endpoint = self.aggregator_endpoint.to_owned();
let signer_repeater_delay = Duration::from_millis(self.signer_repeater_delay);

let mut relay = SignerRelay::start(
&addr,
&server_port,
signer_registration_mode,
signature_registration_mode,
&aggregator_endpoint,
&signer_repeater_delay,
logger,
1 change: 1 addition & 0 deletions mithril-relay/src/lib.rs
@@ -12,6 +12,7 @@ pub use commands::RelayCommands;
pub use relay::AggregatorRelay;
pub use relay::PassiveRelay;
pub use relay::SignerRelay;
pub use relay::SignerRelayMode;

/// The P2P topic names used by Mithril
pub mod mithril_p2p_topic {
1 change: 1 addition & 0 deletions mithril-relay/src/relay/mod.rs
@@ -5,3 +5,4 @@ mod signer;
pub use aggregator::AggregatorRelay;
pub use passive::PassiveRelay;
pub use signer::SignerRelay;
pub use signer::SignerRelayMode;
172 changes: 126 additions & 46 deletions mithril-relay/src/relay/signer.rs
Original file line number Diff line number Diff line change
@@ -2,6 +2,7 @@ use crate::{
p2p::{Peer, PeerEvent},
repeater::MessageRepeater,
};
use clap::ValueEnum;
use libp2p::Multiaddr;
use mithril_common::{
logging::LoggerExtensions,
@@ -11,9 +12,37 @@ use mithril_common::{
};
use slog::{debug, info, Logger};
use std::{net::SocketAddr, sync::Arc, time::Duration};
use strum::Display;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use warp::Filter;

/// Signer relay mode
///
/// The relay mode defines how the relay will behave when it receives a message
#[derive(Debug, Clone, Display, PartialEq, Eq, ValueEnum)]
#[strum(serialize_all = "mixed_case")]
pub enum SignerRelayMode {
/// Passthrough relay mode
///
/// In this mode, the relay will only call the aggregator with the message received
Passthrough,
/// P2P relay mode
///
/// In this mode, the relay will publish the message received to the P2P network
P2P,
}

struct HTTPServerConfiguration<'a> {
server_port: &'a u16,
signer_registration_mode: SignerRelayMode,
signature_registration_mode: SignerRelayMode,
aggregator_endpoint: &'a str,
signer_tx: UnboundedSender<RegisterSignerMessage>,
signature_tx: UnboundedSender<RegisterSignatureMessage>,
signer_repeater: Arc<MessageRepeater<RegisterSignerMessage>>,
logger: &'a Logger,
}

/// A relay for a Mithril signer
pub struct SignerRelay {
server: TestHttpServer,
@@ -29,12 +58,14 @@ impl SignerRelay {
pub async fn start(
address: &Multiaddr,
server_port: &u16,
signer_registration_mode: &SignerRelayMode,
signature_registration_mode: &SignerRelayMode,
aggregator_endpoint: &str,
signer_repeater_delay: &Duration,
logger: &Logger,
) -> StdResult<Self> {
let relay_logger = logger.new_with_component_name::<Self>();
debug!(relay_logger, "Starting...");
debug!(relay_logger, "Starting..."; "signer_registration_mode" => ?signer_registration_mode, "signature_registration_mode" => ?signature_registration_mode);
let (signature_tx, signature_rx) = unbounded_channel::<RegisterSignatureMessage>();
let (signer_tx, signer_rx) = unbounded_channel::<RegisterSignerMessage>();
let signer_repeater = Arc::new(MessageRepeater::new(
@@ -43,14 +74,16 @@ impl SignerRelay {
logger,
));
let peer = Peer::new(address).start().await?;
let server = Self::start_http_server(
let server = Self::start_http_server(&HTTPServerConfiguration {
server_port,
signer_registration_mode: signer_registration_mode.to_owned(),
signature_registration_mode: signature_registration_mode.to_owned(),
aggregator_endpoint,
signer_tx,
signature_tx,
signer_repeater.clone(),
logger,
)
signer_tx: signer_tx.clone(),
signature_tx: signature_tx.clone(),
signer_repeater: signer_repeater.clone(),
logger: &relay_logger,
})
.await;
info!(relay_logger, "Listening on"; "address" => ?server.address());

@@ -64,44 +97,55 @@ impl SignerRelay {
})
}

async fn start_http_server(
server_port: &u16,
aggregator_endpoint: &str,
signer_tx: UnboundedSender<RegisterSignerMessage>,
signature_tx: UnboundedSender<RegisterSignatureMessage>,
signer_repeater: Arc<MessageRepeater<RegisterSignerMessage>>,
logger: &Logger,
) -> TestHttpServer {
let server_logger = logger.new_with_name("http_server");
async fn start_http_server(configuration: &HTTPServerConfiguration<'_>) -> TestHttpServer {
let server_logger = configuration.logger.new_with_name("http_server");
test_http_server_with_socket_address(
warp::path::end()
.and(warp::get())
.and(middlewares::with_logger(&server_logger))
.and(middlewares::with_aggregator_endpoint(
aggregator_endpoint.to_string(),
configuration.aggregator_endpoint.to_string(),
))
.and_then(handlers::aggregator_features_handler)
.or(warp::path("register-signatures")
.and(warp::post())
.and(warp::body::json())
.and(middlewares::with_signer_relay_mode(
configuration.signature_registration_mode.clone(),
))
.and(middlewares::with_aggregator_endpoint(
configuration.aggregator_endpoint.to_string(),
))
.and(middlewares::with_logger(&server_logger))
.and(middlewares::with_transmitter(signature_tx))
.and(middlewares::with_transmitter(
configuration.signature_tx.clone(),
))
.and_then(handlers::register_signatures_handler))
.or(warp::path("register-signer")
.and(warp::post())
.and(warp::body::json())
.and(middlewares::with_signer_relay_mode(
configuration.signer_registration_mode.clone(),
))
.and(middlewares::with_aggregator_endpoint(
configuration.aggregator_endpoint.to_string(),
))
.and(middlewares::with_logger(&server_logger))
.and(middlewares::with_transmitter(signer_tx))
.and(middlewares::with_repeater(signer_repeater.clone()))
.and(middlewares::with_transmitter(
configuration.signer_tx.clone(),
))
.and(middlewares::with_repeater(
configuration.signer_repeater.clone(),
))
.and_then(handlers::register_signer_handler))
.or(warp::path("epoch-settings")
.and(warp::get())
.and(middlewares::with_logger(&server_logger))
.and(middlewares::with_aggregator_endpoint(
aggregator_endpoint.to_string(),
configuration.aggregator_endpoint.to_string(),
))
.and_then(handlers::epoch_settings_handler)),
([0, 0, 0, 0], *server_port).into(),
([0, 0, 0, 0], *configuration.server_port).into(),
)
}

@@ -173,6 +217,8 @@ mod middlewares {

use crate::repeater::MessageRepeater;

use super::SignerRelayMode;

pub fn with_logger(
logger: &slog::Logger,
) -> impl Filter<Extract = (slog::Logger,), Error = Infallible> + Clone {
@@ -197,6 +243,12 @@ mod middlewares {
) -> impl Filter<Extract = (String,), Error = Infallible> + Clone {
warp::any().map(move || aggregator_endpoint.clone())
}

pub fn with_signer_relay_mode(
signer_relay_mode: SignerRelayMode,
) -> impl Filter<Extract = (SignerRelayMode,), Error = Infallible> + Clone {
warp::any().map(move || signer_relay_mode.clone())
}
}

mod handlers {
@@ -205,10 +257,12 @@ mod handlers {
use slog::{debug, Logger};
use std::{convert::Infallible, sync::Arc};
use tokio::sync::mpsc::UnboundedSender;
use warp::http::StatusCode;
use warp::{http::StatusCode, reply::WithStatus};

use crate::repeater;

use super::SignerRelayMode;

pub async fn aggregator_features_handler(
logger: Logger,
aggregator_endpoint: String,
@@ -223,40 +277,66 @@ mod handlers {

pub async fn register_signer_handler(
register_signer_message: RegisterSignerMessage,
signer_relay_mode: SignerRelayMode,
aggregator_endpoint: String,
logger: Logger,
tx: UnboundedSender<RegisterSignerMessage>,
repeater: Arc<repeater::MessageRepeater<RegisterSignerMessage>>,
) -> Result<impl warp::Reply, Infallible> {
debug!(logger, "Serve HTTP route /register-signer"; "register_signer_message" => #?register_signer_message);

repeater.set_message(register_signer_message.clone()).await;
match tx.send(register_signer_message) {
Ok(_) => Ok(Box::new(warp::reply::with_status(
"".to_string(),
StatusCode::CREATED,
))),
Err(err) => Ok(Box::new(warp::reply::with_status(
format!("{err:?}"),
StatusCode::INTERNAL_SERVER_ERROR,
))),
debug!(logger, "Serve HTTP route /register-signer"; "signer_relay_mode" => ?signer_relay_mode, "register_signer_message" => #?register_signer_message,);
match signer_relay_mode {
SignerRelayMode::P2P => {
repeater.set_message(register_signer_message.clone()).await;
match tx.send(register_signer_message) {
Collaborator

The match on tx.send seems to be purely technical (except perhaps the StatusCode returned on success), and we could extract it into a function.
It would simplify the handler and help readers understand faster what it does.
The new function could be reused for register_signatures_handler.
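
A minimal sketch of such a helper, assuming it sits in the handlers module where the channel and warp types are already in scope; the name forward_to_channel is an assumption, not code from this PR:

use tokio::sync::mpsc::UnboundedSender;
use warp::{http::StatusCode, reply::WithStatus};

fn forward_to_channel<M: std::fmt::Debug>(
    tx: &UnboundedSender<M>,
    message: M,
) -> Box<WithStatus<String>> {
    match tx.send(message) {
        // Message queued for publication to the P2P network: reply 201 Created
        Ok(_) => Box::new(warp::reply::with_status(
            String::new(),
            StatusCode::CREATED,
        )),
        // Channel closed: surface the error as a 500
        Err(err) => Box::new(warp::reply::with_status(
            format!("{err:?}"),
            StatusCode::INTERNAL_SERVER_ERROR,
        )),
    }
}

The P2P branches of register_signer_handler and register_signatures_handler could then reduce to Ok(forward_to_channel(&tx, message)), keeping only the repeater call specific to signer registrations.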

Ok(_) => Ok(Box::new(warp::reply::with_status(
"".to_string(),
StatusCode::CREATED,
))),
Err(err) => Ok(Box::new(warp::reply::with_status(
format!("{err:?}"),
StatusCode::INTERNAL_SERVER_ERROR,
))),
}
}
SignerRelayMode::Passthrough => {
Collaborator

There is no test in this file for this code.
Are tests useless here?
Is it tested elsewhere?

let response = reqwest::Client::new()
.post(format!("{aggregator_endpoint}/register-signer"))
.json(&register_signer_message)
.send()
.await;
reply_response(logger, response).await
}
}
}

pub async fn register_signatures_handler(
register_signature_message: RegisterSignatureMessage,
signer_relay_mode: SignerRelayMode,
aggregator_endpoint: String,
logger: Logger,
tx: UnboundedSender<RegisterSignatureMessage>,
) -> Result<impl warp::Reply, Infallible> {
debug!(logger, "Serve HTTP route /register-signatures"; "register_signature_message" => #?register_signature_message);
match tx.send(register_signature_message) {
Ok(_) => Ok(Box::new(warp::reply::with_status(
"".to_string(),
StatusCode::CREATED,
))),
Err(err) => Ok(Box::new(warp::reply::with_status(
format!("{err:?}"),
StatusCode::INTERNAL_SERVER_ERROR,
))),
debug!(logger, "Serve HTTP route /register-signatures"; "signer_relay_mode" => ?signer_relay_mode, "register_signature_message" => #?register_signature_message);

match signer_relay_mode {
SignerRelayMode::P2P => match tx.send(register_signature_message) {
Collaborator

Extract the match on tx.send into a function (same remark as above).

Ok(_) => Ok(Box::new(warp::reply::with_status(
"".to_string(),
StatusCode::CREATED,
))),
Err(err) => Ok(Box::new(warp::reply::with_status(
format!("{err:?}"),
StatusCode::INTERNAL_SERVER_ERROR,
))),
},
SignerRelayMode::Passthrough => {
let response = reqwest::Client::new()
.post(format!("{aggregator_endpoint}/register-signatures"))
.json(&register_signature_message)
.send()
.await;
reply_response(logger, response).await
}
}
}

@@ -275,7 +355,7 @@ mod handlers {
pub async fn reply_response(
logger: Logger,
response: Result<Response, Error>,
) -> Result<impl warp::Reply, Infallible> {
) -> Result<Box<WithStatus<String>>, Infallible> {
match response {
Ok(response) => match StatusCode::from_u16(response.status().into()) {
Ok(status) => match response.text().await {
6 changes: 5 additions & 1 deletion mithril-relay/tests/register_signer_signature.rs
@@ -4,7 +4,7 @@ use libp2p::{gossipsub, Multiaddr};
use mithril_common::messages::{RegisterSignatureMessage, RegisterSignerMessage};
use mithril_relay::{
p2p::{BroadcastMessage, PeerBehaviourEvent, PeerEvent},
PassiveRelay, SignerRelay,
PassiveRelay, SignerRelay, SignerRelayMode,
};
use reqwest::StatusCode;
use slog::{Drain, Level, Logger};
@@ -35,11 +35,15 @@ async fn should_receive_registrations_from_signers_when_subscribed_to_pubsub() {
let total_peers = 1 + total_p2p_client;
let addr: Multiaddr = "/ip4/0.0.0.0/tcp/0".parse().unwrap();
let server_port = 0;
let signer_registration_mode = SignerRelayMode::P2P;
let signature_registration_mode = SignerRelayMode::P2P;
let aggregator_endpoint = "http://0.0.0.0:1234".to_string();
let signer_repeater_delay = Duration::from_secs(100);
let mut signer_relay = SignerRelay::start(
&addr,
&server_port,
&signer_registration_mode,
&signature_registration_mode,
&aggregator_endpoint,
&signer_repeater_delay,
&logger,
1 change: 1 addition & 0 deletions mithril-test-lab/mithril-end-to-end/Cargo.toml
@@ -21,6 +21,7 @@ clap = { version = "4.5.28", features = ["derive"] }
indicatif = { version = "0.17.11", features = ["tokio"] }
mithril-common = { path = "../../mithril-common", features = ["full"] }
mithril-doc = { path = "../../internal/mithril-doc" }
mithril-relay = { path = "../../mithril-relay" }
Collaborator

I'm puzzled by making the e2e runner depend on other binary crates: is keeping the argument configuration always correct worth the burden of a dependency as heavy as this one? And why limit it to the relay only? We should then do it for the aggregator and signer too, making the build of the e2e far heavier and longer for little gain.
I would rather not have the dependency and copy the enum into the e2e.
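
If the dependency were dropped, a local duplicate could be as small as the sketch below; the module location and the extra derives are assumptions, only the two variants and the clap ValueEnum derive come from this PR:

use clap::ValueEnum;

/// Local copy of the relay's signer relay mode, kept in the e2e crate
#[derive(Debug, Clone, Copy, PartialEq, Eq, ValueEnum)]
pub enum SignerRelayMode {
    /// Forward received messages directly to the aggregator over HTTP
    Passthrough,
    /// Publish received messages to the P2P network
    P2P,
}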

reqwest = { version = "0.12.12", features = ["json"] }
serde = { version = "1.0.217", features = ["derive"] }
serde_json = "1.0.138"
201 changes: 131 additions & 70 deletions mithril-test-lab/mithril-end-to-end/src/assertions/check.rs

Large diffs are not rendered by default.

46 changes: 28 additions & 18 deletions mithril-test-lab/mithril-end-to-end/src/assertions/exec.rs
@@ -5,36 +5,45 @@ use mithril_common::entities::{Epoch, ProtocolParameters};
use mithril_common::StdResult;
use slog_scope::info;

pub async fn bootstrap_genesis_certificate(aggregator: &mut Aggregator) -> StdResult<()> {
info!("Bootstrap genesis certificate");

info!("> stopping aggregator");
pub async fn bootstrap_genesis_certificate(aggregator: &Aggregator) -> StdResult<()> {
info!("Bootstrap genesis certificate"; "aggregator" => &aggregator.name());

// A slave aggregator needs to wait a few cycles of the state machine to be able to bootstrap
// This should be removed when the aggregator is able to synchronize its certificate chain from another aggregator
if !aggregator.is_first() {
tokio::time::sleep(std::time::Duration::from_millis(
2 * aggregator.mithril_run_interval() as u64,
))
.await;
}

info!("> stopping aggregator"; "aggregator" => &aggregator.name());
aggregator.stop().await?;
info!("> bootstrapping genesis using signers registered two epochs ago...");
info!("> bootstrapping genesis using signers registered two epochs ago..."; "aggregator" => &aggregator.name());
aggregator.bootstrap_genesis().await?;
info!("> done, restarting aggregator");
aggregator.serve()?;
info!("> done, restarting aggregator"; "aggregator" => &aggregator.name());
aggregator.serve().await?;

Ok(())
}

pub async fn register_era_marker(
aggregator: &mut Aggregator,
aggregator: &Aggregator,
devnet: &Devnet,
mithril_era: &str,
era_epoch: Epoch,
) -> StdResult<()> {
info!("Register '{mithril_era}' era marker");
info!("Register '{mithril_era}' era marker"; "aggregator" => &aggregator.name());

info!("> generating era marker tx datum...");
info!("> generating era marker tx datum..."; "aggregator" => &aggregator.name());
let tx_datum_file_path = devnet
.artifacts_dir()
.join(PathBuf::from("era-tx-datum.txt".to_string()));
aggregator
.era_generate_tx_datum(&tx_datum_file_path, mithril_era, era_epoch)
.await?;

info!("> writing '{mithril_era}' era marker on the Cardano chain...");
info!("> writing '{mithril_era}' era marker on the Cardano chain..."; "aggregator" => &aggregator.name());
devnet.write_era_marker(&tx_datum_file_path).await?;

Ok(())
@@ -56,8 +65,8 @@ pub async fn transfer_funds(devnet: &Devnet) -> StdResult<()> {
Ok(())
}

pub async fn update_protocol_parameters(aggregator: &mut Aggregator) -> StdResult<()> {
info!("Update protocol parameters");
pub async fn update_protocol_parameters(aggregator: &Aggregator) -> StdResult<()> {
info!("Update protocol parameters"; "aggregator" => &aggregator.name());

info!("> stopping aggregator");
aggregator.stop().await?;
@@ -67,12 +76,13 @@ pub async fn update_protocol_parameters(aggregator: &mut Aggregator) -> StdResul
phi_f: 0.80,
};
info!(
"> updating protocol parameters to {:?}...",
protocol_parameters_new
"> updating protocol parameters to {protocol_parameters_new:?}..."; "aggregator" => &aggregator.name()
);
aggregator.set_protocol_parameters(&protocol_parameters_new);
info!("> done, restarting aggregator");
aggregator.serve()?;
aggregator
.set_protocol_parameters(&protocol_parameters_new)
.await;
info!("> done, restarting aggregator"; "aggregator" => &aggregator.name());
aggregator.serve().await?;

Ok(())
}
25 changes: 13 additions & 12 deletions mithril-test-lab/mithril-end-to-end/src/assertions/wait.rs
@@ -1,16 +1,17 @@
use crate::{attempt, utils::AttemptResult};
use crate::{attempt, utils::AttemptResult, Aggregator};
use anyhow::{anyhow, Context};
use mithril_common::{
chain_observer::ChainObserver, digesters::ImmutableFile, entities::Epoch,
messages::EpochSettingsMessage, StdResult,
};
use reqwest::StatusCode;
use slog_scope::{info, warn};
use std::{path::Path, sync::Arc, time::Duration};
use std::{sync::Arc, time::Duration};

pub async fn wait_for_enough_immutable(db_directory: &Path) -> StdResult<()> {
info!("Waiting that enough immutable have been written in the devnet");
pub async fn wait_for_enough_immutable(aggregator: &Aggregator) -> StdResult<()> {
info!("Waiting that enough immutable have been written in the devnet"; "aggregator" => aggregator.name());

let db_directory = aggregator.db_directory();
match attempt!(24, Duration::from_secs(5), {
match ImmutableFile::list_completed_in_dir(db_directory)
.with_context(|| {
@@ -34,9 +35,10 @@ pub async fn wait_for_enough_immutable(db_directory: &Path) -> StdResult<()> {
}
}

pub async fn wait_for_epoch_settings(aggregator_endpoint: &str) -> StdResult<EpochSettingsMessage> {
pub async fn wait_for_epoch_settings(aggregator: &Aggregator) -> StdResult<EpochSettingsMessage> {
let aggregator_endpoint = aggregator.endpoint();
let url = format!("{aggregator_endpoint}/epoch-settings");
info!("Waiting for the aggregator to expose epoch settings");
info!("Waiting for the aggregator to expose epoch settings"; "aggregator" => aggregator.name());

match attempt!(20, Duration::from_millis(1000), {
match reqwest::get(url.clone()).await {
@@ -50,10 +52,7 @@ pub async fn wait_for_epoch_settings(aggregator_endpoint: &str) -> StdResult<Epo
Ok(Some(epoch_settings))
}
s if s.is_server_error() => {
warn!(
"Server error while waiting for the Aggregator, http code: {}",
s
);
warn!( "Server error while waiting for the Aggregator, http code: {s}"; "aggregator" => aggregator.name());
Ok(None)
}
_ => Ok(None),
@@ -70,16 +69,18 @@ pub async fn wait_for_epoch_settings(aggregator_endpoint: &str) -> StdResult<Epo
}

pub async fn wait_for_target_epoch(
aggregator: &Aggregator,
chain_observer: Arc<dyn ChainObserver>,
target_epoch: Epoch,
wait_reason: String,
) -> StdResult<()> {
info!(
"Waiting for the cardano network to be at the target epoch: {}", wait_reason;
"aggregator" => aggregator.name(),
"target_epoch" => ?target_epoch
);

match attempt!(90, Duration::from_millis(1000), {
match attempt!(450, Duration::from_millis(200), {
match chain_observer
.get_current_epoch()
.await
@@ -96,7 +97,7 @@ pub async fn wait_for_target_epoch(
}
}) {
AttemptResult::Ok(_) => {
info!("Target epoch reached!"; "target_epoch" => ?target_epoch);
info!("Target epoch reached!"; "aggregator" => aggregator.name(), "target_epoch" => ?target_epoch);
Ok(())
}
AttemptResult::Err(error) => Err(error),
200 changes: 127 additions & 73 deletions mithril-test-lab/mithril-end-to-end/src/end_to_end_spec.rs
@@ -1,22 +1,29 @@
use crate::assertions;
use crate::MithrilInfrastructure;
use std::sync::Arc;

use anyhow::anyhow;
use tokio::sync::RwLock;
use tokio::task::JoinSet;

use mithril_common::{
chain_observer::ChainObserver,
entities::{Epoch, SignedEntityTypeDiscriminants},
StdResult,
};

pub struct Spec<'a> {
pub infrastructure: &'a mut MithrilInfrastructure,
use crate::{assertions, Aggregator, MithrilInfrastructure};

pub struct Spec {
pub infrastructure: Arc<RwLock<Option<MithrilInfrastructure>>>,
Collaborator

The fact that main.rs uses an Arc<RwLock<Option<_>>> to handle the infrastructure lifecycle should not appear here; instead we should have an Arc<MithrilInfrastructure> (see the sketch after the struct).

is_signing_cardano_transactions: bool,
is_signing_cardano_stake_distribution: bool,
is_signing_cardano_database: bool,
next_era: Option<String>,
regenesis_on_era_switch: bool,
}
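
A sketch of the shape the reviewer suggests, assuming main.rs could hand out a plain Arc once the infrastructure is started; this is not code from the PR:

use std::sync::Arc;

use crate::MithrilInfrastructure;

pub struct Spec {
    // Shared handle to the already-started infrastructure; no RwLock<Option<_>> needed here
    pub infrastructure: Arc<MithrilInfrastructure>,
    is_signing_cardano_transactions: bool,
    is_signing_cardano_stake_distribution: bool,
    is_signing_cardano_database: bool,
    next_era: Option<String>,
    regenesis_on_era_switch: bool,
}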

impl<'a> Spec<'a> {
impl Spec {
pub fn new(
infrastructure: &'a mut MithrilInfrastructure,
infrastructure: Arc<RwLock<Option<MithrilInfrastructure>>>,
signed_entity_types: Vec<String>,
next_era: Option<String>,
regenesis_on_era_switch: bool,
@@ -43,224 +50,271 @@ impl<'a> Spec<'a> {
}
}

pub async fn run(&mut self) -> StdResult<()> {
let aggregator_endpoint = self.infrastructure.aggregator().endpoint();
assertions::wait_for_enough_immutable(self.infrastructure.aggregator().db_directory())
.await?;
let start_epoch = self
.infrastructure
.chain_observer()
.get_current_epoch()
.await?
.unwrap_or_default();
pub async fn run(self) -> StdResult<()> {
let mut join_set = JoinSet::new();
let spec = Arc::new(self);
let infrastructure_guard = spec.infrastructure.read().await;
let infrastructure = infrastructure_guard
.as_ref()
.ok_or(anyhow!("No infrastructure found"))?;
let aggregators = infrastructure_guard
.as_ref()
.ok_or(anyhow!("No infrastructure found"))?
.aggregators();

// Transfer some funds on the devnet to have some Cardano transactions to sign.
// This step needs to be executed early in the process so that the transactions are available
// for signing in the penultimate immutable chunk before the end of the test.
// As we get closer to the tip of the chain when signing, we'll be able to relax this constraint.
assertions::transfer_funds(self.infrastructure.devnet()).await?;
assertions::transfer_funds(infrastructure.devnet()).await?;

for index in 0..aggregators.len() {
let spec_clone = spec.clone();
join_set.spawn(async move {
let infrastructure_guard = spec_clone.infrastructure.read().await;
let infrastructure = infrastructure_guard
.as_ref()
.ok_or(anyhow!("No infrastructure found"))?;

spec_clone
.run_scenario(
infrastructure.aggregator(index),
infrastructure.chain_observer(index),
infrastructure,
)
.await
});
}

while let Some(res) = join_set.join_next().await {
res??;
}

Ok(())
}

pub async fn run_scenario(
&self,
aggregator: &Aggregator,
chain_observer: Arc<dyn ChainObserver>,
infrastructure: &MithrilInfrastructure,
) -> StdResult<()> {
assertions::wait_for_enough_immutable(aggregator).await?;
let start_epoch = chain_observer
.get_current_epoch()
.await?
.unwrap_or_default();

// Wait 4 epochs after start epoch for the aggregator to be able to bootstrap a genesis certificate
let mut target_epoch = start_epoch + 4;
assertions::wait_for_target_epoch(
self.infrastructure.chain_observer(),
aggregator,
chain_observer.clone(),
target_epoch,
"minimal epoch for the aggregator to be able to bootstrap genesis certificate"
.to_string(),
)
.await?;
assertions::bootstrap_genesis_certificate(self.infrastructure.aggregator_mut()).await?;
assertions::wait_for_epoch_settings(&aggregator_endpoint).await?;
assertions::bootstrap_genesis_certificate(aggregator).await?;
assertions::wait_for_epoch_settings(aggregator).await?;

// Wait 2 epochs before changing stake distribution, so that we use at least one original stake distribution
target_epoch += 2;
assertions::wait_for_target_epoch(
self.infrastructure.chain_observer(),
aggregator,
chain_observer.clone(),
target_epoch,
"epoch after which the stake distribution will change".to_string(),
)
.await?;
let delegation_round = 1;
assertions::delegate_stakes_to_pools(self.infrastructure.devnet(), delegation_round)
.await?;

if aggregator.is_first() {
// Delegate some stakes to pools
let delegation_round = 1;
assertions::delegate_stakes_to_pools(infrastructure.devnet(), delegation_round).await?;
}

// Wait 2 epochs before changing protocol parameters
target_epoch += 2;
assertions::wait_for_target_epoch(
self.infrastructure.chain_observer(),
aggregator,
chain_observer.clone(),
target_epoch,
"epoch after which the protocol parameters will change".to_string(),
)
.await?;
assertions::update_protocol_parameters(self.infrastructure.aggregator_mut()).await?;
assertions::update_protocol_parameters(aggregator).await?;

// Wait 6 epochs after protocol parameters update, so that we make sure that we use new protocol parameters as well as new stake distribution a few times
target_epoch += 6;
assertions::wait_for_target_epoch(
self.infrastructure.chain_observer(),
aggregator,
chain_observer.clone(),
target_epoch,
"epoch after which the certificate chain will be long enough to catch most common troubles with stake distribution and protocol parameters".to_string(),
)
.await?;

// Verify that artifacts are produced and signed correctly
let mut target_epoch = self.verify_artifacts_production(target_epoch).await?;
let mut target_epoch = self
.verify_artifacts_production(target_epoch, aggregator, infrastructure)
.await?;

// Verify that artifacts are produced and signed correctly after era switch
if let Some(next_era) = &self.next_era {
// Switch to next era
self.infrastructure
.register_switch_to_next_era(next_era)
.await?;
if aggregator.is_first() {
infrastructure.register_switch_to_next_era(next_era).await?;
}
target_epoch += 5;
assertions::wait_for_target_epoch(
self.infrastructure.chain_observer(),
aggregator,
chain_observer.clone(),
target_epoch,
"epoch after which the era switch will have triggered".to_string(),
)
.await?;

// Proceed to a re-genesis of the certificate chain
if self.regenesis_on_era_switch {
assertions::bootstrap_genesis_certificate(self.infrastructure.aggregator_mut())
.await?;
assertions::bootstrap_genesis_certificate(aggregator).await?;
target_epoch += 5;
assertions::wait_for_target_epoch(
self.infrastructure.chain_observer(),
aggregator,
chain_observer.clone(),
target_epoch,
"epoch after which the re-genesis on era switch will be completed".to_string(),
)
.await?;
}

// Verify that artifacts are produced and signed correctly
self.verify_artifacts_production(target_epoch).await?;
self.verify_artifacts_production(target_epoch, aggregator, infrastructure)
.await?;
}

Ok(())
}

async fn verify_artifacts_production(&self, target_epoch: Epoch) -> StdResult<Epoch> {
let aggregator_endpoint = self.infrastructure.aggregator().endpoint();
async fn verify_artifacts_production(
&self,
target_epoch: Epoch,
aggregator: &Aggregator,
infrastructure: &MithrilInfrastructure,
) -> StdResult<Epoch> {
let expected_epoch_min = target_epoch - 3;
// Verify that mithril stake distribution artifacts are produced and signed correctly
{
let hash =
assertions::assert_node_producing_mithril_stake_distribution(&aggregator_endpoint)
.await?;
assertions::assert_node_producing_mithril_stake_distribution(aggregator).await?;
let certificate_hash = assertions::assert_signer_is_signing_mithril_stake_distribution(
&aggregator_endpoint,
aggregator,
&hash,
expected_epoch_min,
)
.await?;
assertions::assert_is_creating_certificate_with_enough_signers(
&aggregator_endpoint,
aggregator,
&certificate_hash,
self.infrastructure.signers().len(),
infrastructure.signers().len(),
)
.await?;
let mut client = self.infrastructure.build_client()?;
let mut client = infrastructure.build_client(aggregator).await?;
assertions::assert_client_can_verify_mithril_stake_distribution(&mut client, &hash)
.await?;
}

// Verify that snapshot artifacts are produced and signed correctly
{
let digest = assertions::assert_node_producing_snapshot(&aggregator_endpoint).await?;
let digest = assertions::assert_node_producing_snapshot(aggregator).await?;
let certificate_hash = assertions::assert_signer_is_signing_snapshot(
&aggregator_endpoint,
aggregator,
&digest,
expected_epoch_min,
)
.await?;

assertions::assert_is_creating_certificate_with_enough_signers(
&aggregator_endpoint,
aggregator,
&certificate_hash,
self.infrastructure.signers().len(),
infrastructure.signers().len(),
)
.await?;

let mut client = self.infrastructure.build_client()?;
let mut client = infrastructure.build_client(aggregator).await?;
assertions::assert_client_can_verify_snapshot(&mut client, &digest).await?;
}

// Verify that Cardano database snapshot artifacts are produced and signed correctly
if self.is_signing_cardano_database {
let hash =
assertions::assert_node_producing_cardano_database_snapshot(&aggregator_endpoint)
.await?;
assertions::assert_node_producing_cardano_database_snapshot(aggregator).await?;
let certificate_hash = assertions::assert_signer_is_signing_cardano_database_snapshot(
&aggregator_endpoint,
aggregator,
&hash,
expected_epoch_min,
)
.await?;

assertions::assert_is_creating_certificate_with_enough_signers(
&aggregator_endpoint,
aggregator,
&certificate_hash,
self.infrastructure.signers().len(),
infrastructure.signers().len(),
)
.await?;

assertions::assert_node_producing_cardano_database_digests_map(&aggregator_endpoint)
.await?;
assertions::assert_node_producing_cardano_database_digests_map(aggregator).await?;

let mut client = self.infrastructure.build_client()?;
let mut client = infrastructure.build_client(aggregator).await?;
assertions::assert_client_can_verify_cardano_database(&mut client, &hash).await?;
}

// Verify that Cardano transactions artifacts are produced and signed correctly
if self.is_signing_cardano_transactions {
let hash = assertions::assert_node_producing_cardano_transactions(&aggregator_endpoint)
.await?;
let hash = assertions::assert_node_producing_cardano_transactions(aggregator).await?;
let certificate_hash = assertions::assert_signer_is_signing_cardano_transactions(
&aggregator_endpoint,
aggregator,
&hash,
expected_epoch_min,
)
.await?;

assertions::assert_is_creating_certificate_with_enough_signers(
&aggregator_endpoint,
aggregator,
&certificate_hash,
self.infrastructure.signers().len(),
infrastructure.signers().len(),
)
.await?;

let transaction_hashes = self
.infrastructure
let transaction_hashes = infrastructure
.devnet()
.mithril_payments_transaction_hashes()?;
let mut client = self.infrastructure.build_client()?;
let mut client = infrastructure.build_client(aggregator).await?;
assertions::assert_client_can_verify_transactions(&mut client, transaction_hashes)
.await?;
}

// Verify that Cardano stake distribution artifacts are produced and signed correctly
if self.is_signing_cardano_stake_distribution {
{
let (hash, epoch) = assertions::assert_node_producing_cardano_stake_distribution(
&aggregator_endpoint,
)
.await?;
let (hash, epoch) =
assertions::assert_node_producing_cardano_stake_distribution(aggregator)
.await?;
let certificate_hash =
assertions::assert_signer_is_signing_cardano_stake_distribution(
&aggregator_endpoint,
aggregator,
&hash,
expected_epoch_min,
)
.await?;
assertions::assert_is_creating_certificate_with_enough_signers(
&aggregator_endpoint,
aggregator,
&certificate_hash,
self.infrastructure.signers().len(),
infrastructure.signers().len(),
)
.await?;

let mut client = self.infrastructure.build_client()?;
let mut client = infrastructure.build_client(aggregator).await?;
assertions::assert_client_can_verify_cardano_stake_distribution(
&mut client,
&hash,
118 changes: 77 additions & 41 deletions mithril-test-lab/mithril-end-to-end/src/main.rs
@@ -1,5 +1,6 @@
use anyhow::{anyhow, Context};
use clap::{CommandFactory, Parser, Subcommand};
use mithril_relay::SignerRelayMode;
use slog::{Drain, Level, Logger};
use slog_scope::{error, info};
use std::{
@@ -12,7 +13,7 @@ use std::{
use thiserror::Error;
use tokio::{
signal::unix::{signal, SignalKind},
sync::Mutex,
sync::{Mutex, RwLock},
task::JoinSet,
};

@@ -51,9 +52,13 @@ pub struct Args {
#[clap(long, default_value = ".")]
bin_directory: PathBuf,

/// Number of Pool nodes in the devnet
#[clap(long, default_value_t = 3, value_parser = has_at_least_two_pool_nodes)]
number_of_pool_nodes: u8,
/// Number of aggregators
#[clap(long, default_value_t = 1, value_parser = clap::value_parser!(u8).range(1..))]
number_of_aggregators: u8,

/// Number of signers
#[clap(long, default_value_t = 2, value_parser = clap::value_parser!(u8).range(1..))]
number_of_signers: u8,

/// Length of a Cardano slot in the devnet (in s)
#[clap(long, default_value_t = 0.10)]
@@ -103,11 +108,19 @@ pub struct Args {
#[clap(long)]
run_only: bool,

/// Enable P2P network mode
/// Use Mithril relays
#[clap(long)]
use_p2p_network: bool,
use_relays: bool,

/// Signer registration relay mode (used only when 'use_relays' is set)
#[clap(long, value_enum, default_value_t = SignerRelayMode::Passthrough)]
relay_signer_registration_mode: SignerRelayMode,

/// Enable P2P passive relays in P2P mode
/// Signature registration relay mode (used only when 'use_relays' is set)
#[clap(long, value_enum, default_value_t = SignerRelayMode::P2P)]
relay_signature_registration_mode: SignerRelayMode,

/// Enable P2P passive relays in P2P mode (used only when 'use_relays' is set)
#[clap(long, default_value = "true")]
use_p2p_passive_relays: bool,

@@ -135,17 +148,15 @@ impl Args {
_ => Level::Trace,
}
}
}

fn has_at_least_two_pool_nodes(s: &str) -> Result<u8, String> {
let number_of_pool_nodes: u8 = s.parse().map_err(|_| format!("`{}` isn't a number", s))?;
if number_of_pool_nodes >= 2 {
Ok(number_of_pool_nodes)
} else {
Err(format!(
"At least two pool nodes are required (one for the aggregator, one for at least one \
signer), number given: {s}",
))
fn validate(&self) -> StdResult<()> {
if !self.use_relays && self.number_of_aggregators >= 2 {
return Err(anyhow!(
"The 'use_relays' parameter must be activated to run more than one aggregator"
));
}

Ok(())
}
}

@@ -190,16 +201,21 @@ async fn main_exec() -> StdResult<()> {
};
let artifacts_dir = {
let path = work_dir.join("artifacts");
fs::create_dir(&path).expect("Artifacts dir creation failure");
fs::create_dir(&path).with_context(|| "Artifacts dir creation failure")?;
path
};
let store_dir = {
let path = work_dir.join("stores");
fs::create_dir(&path).with_context(|| "Stores dir creation failure")?;
path
};

let mut app = App::new();
let mut app_stopper = AppStopper::new(&app);
let mut join_set = JoinSet::new();
with_gracefull_shutdown(&mut join_set);
with_graceful_shutdown(&mut join_set);

join_set.spawn(async move { app.run(args, work_dir, artifacts_dir).await });
join_set.spawn(async move { app.run(args, work_dir, store_dir, artifacts_dir).await });

let res = match join_set.join_next().await {
Some(Ok(tasks_result)) => tasks_result,
@@ -274,27 +290,27 @@ impl From<StdResult<()>> for AppResult {

struct App {
devnet: Arc<Mutex<Option<Devnet>>>,
infrastructure: Arc<Mutex<Option<MithrilInfrastructure>>>,
infrastructure: Arc<RwLock<Option<MithrilInfrastructure>>>,
}

impl App {
fn new() -> Self {
Self {
devnet: Arc::new(Mutex::new(None)),
infrastructure: Arc::new(Mutex::new(None)),
infrastructure: Arc::new(RwLock::new(None)),
}
}

async fn tail_logs(&self) {
if let Some(infrastructure) = self.infrastructure.lock().await.as_ref() {
if let Some(infrastructure) = self.infrastructure.read().await.as_ref() {
let _ = infrastructure.tail_logs(40).await.inspect_err(|e| {
error!("Failed to tail logs: {}", e);
});
}
}

async fn last_error_in_logs(&self) {
if let Some(infrastructure) = self.infrastructure.lock().await.as_ref() {
if let Some(infrastructure) = self.infrastructure.read().await.as_ref() {
let _ = infrastructure.last_error_in_logs(1).await.inspect_err(|e| {
error!("Failed to grep error in logs: {}", e);
});
@@ -305,17 +321,22 @@ impl App {
&mut self,
args: Args,
work_dir: PathBuf,
store_dir: PathBuf,
artifacts_dir: PathBuf,
) -> StdResult<()> {
let server_port = 8080;
args.validate()?;
let run_only_mode = args.run_only;
let use_p2p_network_mode = args.use_p2p_network;
let use_relays = args.use_relays;
let relay_signer_registration_mode = args.relay_signer_registration_mode;
let relay_signature_registration_mode = args.relay_signature_registration_mode;

let use_p2p_passive_relays = args.use_p2p_passive_relays;

let devnet = Devnet::bootstrap(&DevnetBootstrapArgs {
devnet_scripts_dir: args.devnet_scripts_directory,
artifacts_target_dir: work_dir.join("devnet"),
number_of_pool_nodes: args.number_of_pool_nodes,
number_of_pool_nodes: args.number_of_aggregators + args.number_of_signers,
cardano_slot_length: args.cardano_slot_length,
cardano_epoch_length: args.cardano_epoch_length,
cardano_node_version: args.cardano_node_version.to_owned(),
@@ -325,40 +346,43 @@ impl App {
.await?;
*self.devnet.lock().await = Some(devnet.clone());

let mut infrastructure = MithrilInfrastructure::start(&MithrilInfrastructureConfig {
let infrastructure = MithrilInfrastructure::start(&MithrilInfrastructureConfig {
number_of_aggregators: args.number_of_aggregators,
number_of_signers: args.number_of_signers,
server_port,
devnet: devnet.clone(),
artifacts_dir,
work_dir,
store_dir,
artifacts_dir,
bin_dir: args.bin_directory,
cardano_node_version: args.cardano_node_version,
mithril_run_interval: args.mithril_run_interval,
mithril_era: args.mithril_era,
mithril_era_reader_adapter: args.mithril_era_reader_adapter,
signed_entity_types: args.signed_entity_types.clone(),
run_only_mode,
use_p2p_network_mode,
use_relays,
relay_signer_registration_mode,
relay_signature_registration_mode,
use_p2p_passive_relays,
use_era_specific_work_dir: args.mithril_next_era.is_some(),
})
.await?;
*self.infrastructure.write().await = Some(infrastructure);

let runner: StdResult<()> = match run_only_mode {
true => {
let mut run_only = RunOnly::new(&mut infrastructure);
run_only.start().await
}
true => RunOnly::new(self.infrastructure.clone()).run().await,
false => {
let mut spec = Spec::new(
&mut infrastructure,
Spec::new(
self.infrastructure.clone(),
args.signed_entity_types,
args.mithril_next_era,
args.mithril_era_regenesis_on_switch,
);
spec.run().await
)
.run()
.await
}
};
*self.infrastructure.lock().await = Some(infrastructure);

match runner.with_context(|| "Mithril End to End test failed") {
Ok(()) if run_only_mode => loop {
@@ -377,7 +401,7 @@ impl App {

struct AppStopper {
devnet: Arc<Mutex<Option<Devnet>>>,
infrastructure: Arc<Mutex<Option<MithrilInfrastructure>>>,
infrastructure: Arc<RwLock<Option<MithrilInfrastructure>>>,
}

impl AppStopper {
@@ -389,7 +413,7 @@ impl AppStopper {
}

pub async fn stop(&mut self) {
if let Some(infrastructure) = self.infrastructure.lock().await.as_mut() {
if let Some(infrastructure) = self.infrastructure.write().await.as_mut() {
let _ = infrastructure.stop_nodes().await.inspect_err(|e| {
error!("Failed to stop nodes: {}", e);
});
@@ -422,7 +446,7 @@ fn create_workdir_if_not_exist_clean_otherwise(work_dir: &Path) {
#[error("Signal received: `{0}`")]
pub struct SignalError(pub String);

fn with_gracefull_shutdown(join_set: &mut JoinSet<StdResult<()>>) {
fn with_graceful_shutdown(join_set: &mut JoinSet<StdResult<()>>) {
join_set.spawn(async move {
let mut sigterm = signal(SignalKind::terminate()).expect("Failed to create SIGTERM signal");
sigterm.recv().await;
@@ -487,4 +511,16 @@ mod tests {
AppResult::Cancelled(_)
));
}

#[test]
fn args_fails_validation() {
let args = Args::parse_from(["", "--number-of-aggregators", "2"]);
args.validate().expect_err(
"validate should fail with more than one aggregator if p2p network is not used",
);

let args = Args::parse_from(["", "--use-relays", "--number-of-aggregators", "2"]);
args.validate()
.expect("validate should succeed with more than one aggregator if p2p network is used");
}
}
130 changes: 92 additions & 38 deletions mithril-test-lab/mithril-end-to-end/src/mithril/aggregator.rs
@@ -1,7 +1,7 @@
use crate::utils::MithrilCommand;
use crate::{
PoolNode, DEVNET_MAGIC_ID, ERA_MARKERS_SECRET_KEY, ERA_MARKERS_VERIFICATION_KEY,
GENESIS_SECRET_KEY, GENESIS_VERIFICATION_KEY,
PoolNode, RetryableDevnetError, DEVNET_MAGIC_ID, ERA_MARKERS_SECRET_KEY,
ERA_MARKERS_VERIFICATION_KEY, GENESIS_SECRET_KEY, GENESIS_VERIFICATION_KEY,
};
use anyhow::{anyhow, Context};
use mithril_common::era::SupportedEra;
@@ -10,15 +10,20 @@ use slog_scope::info;
use std::cmp;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use tokio::process::Child;
use tokio::sync::RwLock;

#[derive(Debug)]
pub struct AggregatorConfig<'a> {
pub index: usize,
pub name: &'a str,
pub server_port: u64,
pub pool_node: &'a PoolNode,
pub cardano_cli_path: &'a Path,
pub work_dir: &'a Path,
pub store_dir: &'a Path,
pub artifacts_dir: &'a Path,
pub bin_dir: &'a Path,
pub cardano_node_version: &'a str,
@@ -28,14 +33,18 @@ pub struct AggregatorConfig<'a> {
pub mithril_era_marker_address: &'a str,
pub signed_entity_types: &'a [String],
pub chain_observer_type: &'a str,
pub master_aggregator_endpoint: &'a Option<String>,
}

#[derive(Debug)]
pub struct Aggregator {
index: usize,
name_suffix: String,
server_port: u64,
db_directory: PathBuf,
command: MithrilCommand,
process: Option<Child>,
mithril_run_interval: u32,
command: Arc<RwLock<MithrilCommand>>,
process: RwLock<Option<Child>>,
}

impl Aggregator {
@@ -57,7 +66,7 @@ impl Aggregator {
let signed_entity_types = aggregator_config.signed_entity_types.join(",");
let mithril_run_interval = format!("{}", aggregator_config.mithril_run_interval);
let public_server_url = format!("http://localhost:{server_port_parameter}/aggregator");
let env = HashMap::from([
let mut env = HashMap::from([
("NETWORK", "devnet"),
("RUN_INTERVAL", &mithril_run_interval),
("SERVER_IP", "0.0.0.0"),
@@ -71,7 +80,10 @@ impl Aggregator {
aggregator_config.artifacts_dir.to_str().unwrap(),
),
("NETWORK_MAGIC", &magic_id),
("DATA_STORES_DIRECTORY", "./stores/aggregator"),
(
"DATA_STORES_DIRECTORY",
aggregator_config.store_dir.to_str().unwrap(),
),
(
"CARDANO_NODE_SOCKET_PATH",
aggregator_config.pool_node.socket_path.to_str().unwrap(),
@@ -103,6 +115,9 @@ impl Aggregator {
("CARDANO_TRANSACTIONS_SIGNING_CONFIG__STEP", "15"),
("PERSIST_USAGE_REPORT_INTERVAL_IN_SECONDS", "3"),
]);
if let Some(master_aggregator_endpoint) = aggregator_config.master_aggregator_endpoint {
env.insert("MASTER_AGGREGATOR_ENDPOINT", master_aggregator_endpoint);
}
let args = vec![
"--db-directory",
aggregator_config.pool_node.db_path.to_str().unwrap(),
@@ -118,22 +133,44 @@ impl Aggregator {
)?;

Ok(Self {
index: aggregator_config.index,
name_suffix: aggregator_config.name.to_string(),
server_port: aggregator_config.server_port,
db_directory: aggregator_config.pool_node.db_path.clone(),
command,
process: None,
mithril_run_interval: aggregator_config.mithril_run_interval,
command: Arc::new(RwLock::new(command)),
process: RwLock::new(None),
})
}

pub fn name_suffix(index: usize) -> String {
format!("{}", index + 1)
}

pub fn copy_configuration(other: &Aggregator) -> Self {
Self {
index: other.index,
name_suffix: other.name_suffix.clone(),
server_port: other.server_port,
db_directory: other.db_directory.clone(),
mithril_run_interval: other.mithril_run_interval,
command: other.command.clone(),
process: None,
process: RwLock::new(None),
}
}

pub fn is_first(&self) -> bool {
self.index == 0
}

pub fn index(&self) -> usize {
self.index
}

pub fn name(&self) -> String {
format!("mithril-aggregator-{}", self.name_suffix)
}

pub fn endpoint(&self) -> String {
format!("http://localhost:{}/aggregator", &self.server_port)
}
@@ -142,16 +179,23 @@ impl Aggregator {
&self.db_directory
}

pub fn serve(&mut self) -> StdResult<()> {
self.process = Some(self.command.start(&["serve".to_string()])?);
pub fn mithril_run_interval(&self) -> u32 {
self.mithril_run_interval
}

pub async fn serve(&self) -> StdResult<()> {
let mut command = self.command.write().await;
command.set_log_name(&format!("mithril-aggregator-{}", self.name_suffix));
let mut process = self.process.write().await;
*process = Some(command.start(&["serve".to_string()])?);
Ok(())
}

pub async fn bootstrap_genesis(&mut self) -> StdResult<()> {
pub async fn bootstrap_genesis(&self) -> StdResult<()> {
// Clone the command so we can alter it without affecting the original
let mut command = self.command.clone();
let process_name = "mithril-aggregator-genesis-bootstrap";
command.set_log_name(process_name);
let mut command = self.command.write().await;
let command_name = &format!("mithril-aggregator-genesis-bootstrap-{}", self.name_suffix);
command.set_log_name(command_name);

let exit_status = command
.start(&["genesis".to_string(), "bootstrap".to_string()])?
@@ -162,7 +206,7 @@ impl Aggregator {
if exit_status.success() {
Ok(())
} else {
self.command.tail_logs(Some(process_name), 40).await?;
command.tail_logs(Some(command_name), 40).await?;

Err(match exit_status.code() {
Some(c) => {
@@ -172,23 +216,25 @@ impl Aggregator {
anyhow!("`mithril-aggregator genesis bootstrap` was terminated with a signal")
}
})
.map_err(|e| anyhow!(RetryableDevnetError(e.to_string())))
}
}

pub async fn stop(&mut self) -> StdResult<()> {
if let Some(process) = self.process.as_mut() {
info!("Stopping aggregator");
process
pub async fn stop(&self) -> StdResult<()> {
let mut process = self.process.write().await;
if let Some(mut process_running) = process.take() {
info!("Stopping {}", self.name());
process_running
.kill()
.await
.with_context(|| "Could not kill aggregator")?;
self.process = None;
*process = None;
}
Ok(())
}

pub async fn era_generate_tx_datum(
&mut self,
&self,
target_path: &Path,
mithril_era: &str,
next_era_activation_epoch: entities::Epoch,
@@ -219,8 +265,8 @@ impl Aggregator {
args.push(next_era_epoch.to_string());
}

let exit_status = self
.command
let mut command = self.command.write().await;
let exit_status = command
.start(&args)?
.wait()
.await
@@ -242,45 +288,53 @@ impl Aggregator {
}
}

pub fn set_protocol_parameters(&mut self, protocol_parameters: &entities::ProtocolParameters) {
self.command.set_env_var(
pub async fn set_protocol_parameters(
&self,
protocol_parameters: &entities::ProtocolParameters,
) {
let mut command = self.command.write().await;
command.set_env_var(
"PROTOCOL_PARAMETERS__K",
&format!("{}", protocol_parameters.k),
);
self.command.set_env_var(
command.set_env_var(
"PROTOCOL_PARAMETERS__M",
&format!("{}", protocol_parameters.m),
);
self.command.set_env_var(
command.set_env_var(
"PROTOCOL_PARAMETERS__PHI_F",
&format!("{}", protocol_parameters.phi_f),
);
}

pub fn set_mock_cardano_cli_file_path(
&mut self,
pub async fn set_mock_cardano_cli_file_path(
&self,
stake_distribution_file: &Path,
epoch_file_path: &Path,
) {
self.command.set_env_var(
let mut command = self.command.write().await;
command.set_env_var(
"MOCK_STAKE_DISTRIBUTION_FILE",
stake_distribution_file.to_str().unwrap(),
);
self.command
.set_env_var("MOCK_EPOCH_FILE", epoch_file_path.to_str().unwrap());
command.set_env_var("MOCK_EPOCH_FILE", epoch_file_path.to_str().unwrap());
}

/// Change the run interval of the aggregator state machine (default: 400ms)
pub fn change_run_interval(&mut self, interval: Duration) {
self.command
.set_env_var("RUN_INTERVAL", &format!("{}", interval.as_millis()))
pub async fn change_run_interval(&self, interval: Duration) {
let mut command = self.command.write().await;
command.set_env_var("RUN_INTERVAL", &format!("{}", interval.as_millis()))
}

pub async fn tail_logs(&self, number_of_line: u64) -> StdResult<()> {
self.command.tail_logs(None, number_of_line).await
let command = self.command.write().await;
command.tail_logs(Some(&self.name()), number_of_line).await
}

pub async fn last_error_in_logs(&self, number_of_error: u64) -> StdResult<()> {
self.command.last_error_in_logs(None, number_of_error).await
let command = self.command.write().await;
command
.last_error_in_logs(Some(&self.name()), number_of_error)
.await
}
}
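
The switch from &mut self to &self on serve, stop and bootstrap_genesis works because the command now lives behind Arc<RwLock<..>> and the process handle behind its own RwLock, so an aggregator produced by copy_configuration shares the same command but owns a fresh process slot. A minimal standalone sketch of that sharing pattern (MithrilCommand and Child replaced by a String and a u32; these are placeholders, not the real types):

// Standalone sketch of the interior-mutability pattern used above: two handles
// share one command behind Arc<RwLock<..>>, but each owns its process slot.
use std::sync::Arc;
use tokio::sync::RwLock;

struct Handle {
    command: Arc<RwLock<String>>,   // shared, mutable through &self
    process: RwLock<Option<u32>>,   // per-handle state (PID placeholder)
}

impl Handle {
    fn copy_configuration(other: &Handle) -> Self {
        Self {
            command: other.command.clone(), // same underlying command
            process: RwLock::new(None),     // fresh process slot
        }
    }

    async fn set_log_name(&self, name: &str) {
        let mut command = self.command.write().await;
        *command = name.to_string();
    }
}

#[tokio::main]
async fn main() {
    let aggregator = Handle {
        command: Arc::new(RwLock::new("mithril-aggregator-1".to_string())),
        process: RwLock::new(None),
    };
    let genesis = Handle::copy_configuration(&aggregator);
    genesis
        .set_log_name("mithril-aggregator-genesis-bootstrap-1")
        .await;

    // Both handles observe the rename because the command itself is shared.
    assert_eq!(
        *aggregator.command.read().await,
        "mithril-aggregator-genesis-bootstrap-1"
    );
    // `process` is per-handle, so starting one does not touch the other.
    *genesis.process.write().await = Some(42);
    assert!(aggregator.process.read().await.is_none());
}

Because the command is shared, a genesis bootstrap renames it for every handle, which is presumably why serve() now re-applies the aggregator's own log name before starting.
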
398 changes: 272 additions & 126 deletions mithril-test-lab/mithril-end-to-end/src/mithril/infrastructure.rs

Large diffs are not rendered by default.

mithril-test-lab/mithril-end-to-end/src/mithril/relay_aggregator.rs
@@ -6,29 +6,36 @@ use tokio::process::Child;

#[derive(Debug)]
pub struct RelayAggregator {
name_suffix: String,
listen_port: u64,
command: MithrilCommand,
process: Option<Child>,
}

impl RelayAggregator {
pub fn new(
name: String,
listen_port: u64,
dial_to: Option<String>,
aggregator_endpoint: &str,
work_dir: &Path,
bin_dir: &Path,
) -> StdResult<Self> {
let listen_port_str = format!("{listen_port}");
let env = HashMap::from([
let mut env = HashMap::from([
("LISTEN_PORT", listen_port_str.as_str()),
("AGGREGATOR_ENDPOINT", aggregator_endpoint),
]);
if let Some(dial_to) = &dial_to {
env.insert("DIAL_TO", dial_to);
}
let args = vec!["-vvv", "aggregator"];

let mut command = MithrilCommand::new("mithril-relay", work_dir, bin_dir, env, &args)?;
command.set_log_name("mithril-relay-aggregator");
command.set_log_name(&format!("mithril-relay-aggregator-{}", name,));

Ok(Self {
name_suffix: name,
listen_port,
command,
process: None,
@@ -39,14 +46,21 @@ impl RelayAggregator {
format!("/ip4/127.0.0.1/tcp/{}", self.listen_port)
}

pub fn name_suffix(&self) -> String {
self.name_suffix.clone()
}

pub fn start(&mut self) -> StdResult<()> {
self.process = Some(self.command.start(&[])?);
Ok(())
}

pub async fn tail_logs(&self, number_of_line: u64) -> StdResult<()> {
self.command
.tail_logs(Some("mithril-relay-aggregator"), number_of_line)
.tail_logs(
Some(&format!("mithril-relay-aggregator-{}", self.name_suffix())),
number_of_line,
)
.await
}
}
10 changes: 5 additions & 5 deletions mithril-test-lab/mithril-end-to-end/src/mithril/relay_passive.rs
@@ -15,16 +15,16 @@ pub struct RelayPassive {
impl RelayPassive {
pub fn new(
listen_port: u64,
dial_to: String,
dial_to: Option<String>,
relay_id: String,
work_dir: &Path,
bin_dir: &Path,
) -> StdResult<Self> {
let listen_port_str = format!("{listen_port}");
let env = HashMap::from([
("LISTEN_PORT", listen_port_str.as_str()),
("DIAL_TO", &dial_to),
]);
let mut env = HashMap::from([("LISTEN_PORT", listen_port_str.as_str())]);
if let Some(dial_to) = &dial_to {
env.insert("DIAL_TO", dial_to);
}
let args = vec!["-vvv", "passive"];

let mut command = MithrilCommand::new("mithril-relay", work_dir, bin_dir, env, &args)?;
63 changes: 44 additions & 19 deletions mithril-test-lab/mithril-end-to-end/src/mithril/relay_signer.rs
@@ -1,10 +1,23 @@
use crate::utils::MithrilCommand;
use mithril_common::entities::PartyId;
use mithril_common::StdResult;
use mithril_relay::SignerRelayMode;
use std::collections::HashMap;
use std::path::Path;
use tokio::process::Child;

pub struct RelaySignerConfiguration<'a> {
pub listen_port: u64,
pub server_port: u64,
pub dial_to: Option<String>,
pub relay_signer_registration_mode: SignerRelayMode,
pub relay_signature_registration_mode: SignerRelayMode,
pub aggregator_endpoint: &'a str,
pub party_id: PartyId,
pub work_dir: &'a Path,
pub bin_dir: &'a Path,
}

#[derive(Debug)]
pub struct RelaySigner {
listen_port: u64,
@@ -15,33 +28,45 @@ pub struct RelaySigner {
}

impl RelaySigner {
pub fn new(
listen_port: u64,
server_port: u64,
dial_to: String,
aggregator_endpoint: &str,
party_id: PartyId,
work_dir: &Path,
bin_dir: &Path,
) -> StdResult<Self> {
let listen_port_str = format!("{listen_port}");
let server_port_str = format!("{server_port}");
let env = HashMap::from([
pub fn new(configuration: &RelaySignerConfiguration) -> StdResult<Self> {
let listen_port_str = format!("{}", configuration.listen_port);
let server_port_str = format!("{}", configuration.server_port);
let relay_signer_registration_mode =
configuration.relay_signer_registration_mode.to_string();
let relay_signature_registration_mode =
configuration.relay_signature_registration_mode.to_string();
let mut env = HashMap::from([
("LISTEN_PORT", listen_port_str.as_str()),
("SERVER_PORT", server_port_str.as_str()),
("AGGREGATOR_ENDPOINT", aggregator_endpoint),
("DIAL_TO", &dial_to),
("AGGREGATOR_ENDPOINT", configuration.aggregator_endpoint),
("SIGNER_REPEATER_DELAY", "100"),
(
"SIGNER_REGISTRATION_MODE",
relay_signer_registration_mode.as_str(),
),
(
"SIGNATURE_REGISTRATION_MODE",
relay_signature_registration_mode.as_str(),
),
]);
if let Some(dial_to) = &configuration.dial_to {
env.insert("DIAL_TO", dial_to);
}
let args = vec!["-vvv", "signer"];

let mut command = MithrilCommand::new("mithril-relay", work_dir, bin_dir, env, &args)?;
command.set_log_name(format!("mithril-relay-signer-{party_id}").as_str());
let mut command = MithrilCommand::new(
"mithril-relay",
configuration.work_dir,
configuration.bin_dir,
env,
&args,
)?;
command.set_log_name(format!("mithril-relay-signer-{}", configuration.party_id).as_str());

Ok(Self {
listen_port,
server_port,
party_id,
listen_port: configuration.listen_port,
server_port: configuration.server_port,
party_id: configuration.party_id.to_owned(),
command,
process: None,
})
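
With the new RelaySignerConfiguration, callers bundle the two relay modes and the optional dial target instead of passing positional arguments. A hedged construction sketch; the crate-root re-exports and the SignerRelayMode variant names (P2P, Passthrough) are assumptions about the mithril-relay API, and ports, endpoint, party id and paths are placeholders:

// Sketch of building the new RelaySignerConfiguration. The crate-root re-exports and
// the SignerRelayMode variants (P2P, Passthrough) are assumptions about the API;
// ports, endpoint, party id and paths are placeholders.
use std::path::Path;

use mithril_common::StdResult;
use mithril_relay::SignerRelayMode;

use crate::{RelaySigner, RelaySignerConfiguration};

fn spawn_relay_signer(work_dir: &Path, bin_dir: &Path) -> StdResult<RelaySigner> {
    let configuration = RelaySignerConfiguration {
        listen_port: 6060,
        server_port: 6061,
        // Optional peer to dial at startup; None lets the relay start without dialing.
        dial_to: Some("/ip4/127.0.0.1/tcp/6050".to_string()),
        relay_signer_registration_mode: SignerRelayMode::P2P,
        relay_signature_registration_mode: SignerRelayMode::Passthrough,
        aggregator_endpoint: "http://localhost:8080/aggregator",
        party_id: "pool-node-1".to_string(),
        work_dir,
        bin_dir,
    };

    RelaySigner::new(&configuration)
}

Bundling the knobs in one struct keeps RelaySigner::new readable now that signer registration and signature routing can be configured independently.
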
7 changes: 5 additions & 2 deletions mithril-test-lab/mithril-end-to-end/src/mithril/signer.rs
@@ -16,6 +16,7 @@ pub struct SignerConfig<'a> {
pub pool_node: &'a PoolNode,
pub cardano_cli_path: &'a Path,
pub work_dir: &'a Path,
pub store_dir: &'a Path,
pub bin_dir: &'a Path,
pub mithril_run_interval: u32,
pub mithril_era: &'a str,
@@ -36,7 +37,6 @@ impl Signer {
pub fn new(signer_config: &SignerConfig) -> StdResult<Self> {
let party_id = signer_config.pool_node.party_id()?;
let magic_id = DEVNET_MAGIC_ID.to_string();
let data_stores_path = format!("./stores/signer-{party_id}");
let era_reader_adapter_params =
if signer_config.mithril_era_reader_adapter == "cardano-chain" {
format!(
@@ -58,7 +58,10 @@ impl Signer {
"DB_DIRECTORY",
signer_config.pool_node.db_path.to_str().unwrap(),
),
("DATA_STORES_DIRECTORY", &data_stores_path),
(
"DATA_STORES_DIRECTORY",
signer_config.store_dir.to_str().unwrap(),
),
("STORE_RETENTION_LIMIT", "10"),
("NETWORK_MAGIC", &magic_id),
(
81 changes: 63 additions & 18 deletions mithril-test-lab/mithril-end-to-end/src/run_only.rs
@@ -1,41 +1,86 @@
use crate::assertions;
use crate::MithrilInfrastructure;
use std::sync::Arc;

use anyhow::anyhow;
use mithril_common::chain_observer::ChainObserver;
use tokio::sync::RwLock;
use tokio::task::JoinSet;

use mithril_common::StdResult;

pub struct RunOnly<'a> {
pub infrastructure: &'a mut MithrilInfrastructure,
use crate::{assertions, Aggregator, MithrilInfrastructure};

pub struct RunOnly {
pub infrastructure: Arc<RwLock<Option<MithrilInfrastructure>>>,
}

impl<'a> RunOnly<'a> {
pub fn new(infrastructure: &'a mut MithrilInfrastructure) -> Self {
impl RunOnly {
pub fn new(infrastructure: Arc<RwLock<Option<MithrilInfrastructure>>>) -> Self {
Self { infrastructure }
}

pub async fn start(&mut self) -> StdResult<()> {
let aggregator_endpoint = self.infrastructure.aggregator().endpoint();
assertions::wait_for_enough_immutable(self.infrastructure.aggregator().db_directory())
.await?;
let start_epoch = self
.infrastructure
.chain_observer()
pub async fn run(self) -> StdResult<()> {
let run_only = Arc::new(self);
let mut join_set = JoinSet::new();
let infrastructure_guard = run_only.infrastructure.read().await;
let aggregators = infrastructure_guard
.as_ref()
.ok_or(anyhow!("No infrastructure found"))?
.aggregators();

for index in 0..aggregators.len() {
let run_only_clone = run_only.clone();
join_set.spawn(async move {
let infrastructure_guard = run_only_clone.infrastructure.read().await;
let infrastructure = infrastructure_guard
.as_ref()
.ok_or(anyhow!("No infrastructure found"))?;

run_only_clone
.start_aggregator(
infrastructure.aggregator(index),
infrastructure.chain_observer(index),
infrastructure,
)
.await
});
}

while let Some(res) = join_set.join_next().await {
res??;
}

Ok(())
}

pub async fn start_aggregator(
&self,
aggregator: &Aggregator,
chain_observer: Arc<dyn ChainObserver>,
infrastructure: &MithrilInfrastructure,
) -> StdResult<()> {
assertions::wait_for_enough_immutable(aggregator).await?;
let start_epoch = chain_observer
.get_current_epoch()
.await?
.unwrap_or_default();

// Wait 3 epochs after start epoch for the aggregator to be able to bootstrap a genesis certificate
let target_epoch = start_epoch + 3;
assertions::wait_for_target_epoch(
self.infrastructure.chain_observer(),
aggregator,
chain_observer,
target_epoch,
"minimal epoch for the aggregator to be able to bootstrap genesis certificate"
.to_string(),
)
.await?;
assertions::bootstrap_genesis_certificate(self.infrastructure.aggregator_mut()).await?;
assertions::wait_for_epoch_settings(&aggregator_endpoint).await?;
assertions::bootstrap_genesis_certificate(aggregator).await?;
assertions::wait_for_epoch_settings(aggregator).await?;

// Transfer some funds on the devnet to have some Cardano transactions to sign
assertions::transfer_funds(self.infrastructure.devnet()).await?;
if aggregator.is_first() {
// Transfer some funds on the devnet to have some Cardano transactions to sign
assertions::transfer_funds(infrastructure.devnet()).await?;
}

Ok(())
}
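
RunOnly::run above fans out one task per aggregator over the shared infrastructure and then drains the JoinSet; the double question mark in res?? surfaces a JoinError (panic or cancellation) first and the task's own StdResult second. A standalone sketch of that fan-out/fan-in shape:

// Standalone sketch of the fan-out/fan-in shape used in RunOnly::run: one task per
// index over shared state, errors surfaced through `res??` (JoinError first, then
// the task's own Result).
use std::sync::Arc;

use anyhow::{anyhow, Result};
use tokio::task::JoinSet;

#[tokio::main]
async fn main() -> Result<()> {
    let aggregator_names = Arc::new(vec!["mithril-aggregator-1", "mithril-aggregator-2"]);
    let mut join_set = JoinSet::new();

    for index in 0..aggregator_names.len() {
        let names = aggregator_names.clone();
        join_set.spawn(async move {
            let name = names
                .get(index)
                .ok_or(anyhow!("no aggregator at index {index}"))?;
            println!("starting {name}");
            Ok::<(), anyhow::Error>(())
        });
    }

    // `res` is Result<Result<(), anyhow::Error>, JoinError>: the outer `?` catches
    // panics/cancellations, the inner `?` propagates the task's own error.
    while let Some(res) = join_set.join_next().await {
        res??;
    }

    Ok(())
}
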
Original file line number Diff line number Diff line change
@@ -19,10 +19,13 @@ pub async fn bootstrap_aggregator(
let chain_observer_type = "cardano-cli";

let mut aggregator = Aggregator::new(&AggregatorConfig {
index: 0,
name: "genesis",
server_port: args.server_port as u64,
pool_node: &args.pool_node,
cardano_cli_path: &args.cardano_cli_path,
work_dir: &args.work_dir,
store_dir: &args.work_dir,
artifacts_dir: &args.work_dir,
bin_dir: &args.bin_dir,
cardano_node_version: "1.2.3",
@@ -32,6 +35,7 @@ pub async fn bootstrap_aggregator(
mithril_era_reader_adapter: "dummy",
signed_entity_types: &signed_entity_types,
chain_observer_type,
master_aggregator_endpoint: &None,
})
.unwrap();

@@ -40,18 +44,24 @@ pub async fn bootstrap_aggregator(

// Extremely large interval since, for the two following starts, only the http_server part
// of the aggregator is relevant as we need to send signer registrations.
aggregator.change_run_interval(Duration::from_secs(20000));
aggregator.set_mock_cardano_cli_file_path(
&args.mock_stake_distribution_file_path(),
&args.mock_epoch_file_path(),
);
aggregator.set_protocol_parameters(&signers_fixture.protocol_parameters());
aggregator
.change_run_interval(Duration::from_secs(20000))
.await;
aggregator
.set_mock_cardano_cli_file_path(
&args.mock_stake_distribution_file_path(),
&args.mock_epoch_file_path(),
)
.await;
aggregator
.set_protocol_parameters(&signers_fixture.protocol_parameters())
.await;

info!(
">> Starting the aggregator with a large run interval to call the http_server\
without being bothered by the state machine cycles"
);
aggregator.serve().unwrap();
aggregator.serve().await.unwrap();
wait::for_aggregator_http_server_to_start(&aggregator, Duration::from_secs(10)).await?;

restart_aggregator_and_move_one_epoch_forward(&mut aggregator, current_epoch, args).await?;
@@ -95,16 +105,16 @@ pub async fn bootstrap_aggregator(

{
info!(">> Compute genesis certificate");
let mut genesis_aggregator = Aggregator::copy_configuration(&aggregator);
let genesis_aggregator = Aggregator::copy_configuration(&aggregator);
genesis_aggregator
.bootstrap_genesis()
.await
.expect("Genesis aggregator should be able to bootstrap genesis");
}

info!(">> Restart aggregator with a normal run interval");
aggregator.change_run_interval(Duration::from_secs(3));
aggregator.serve().unwrap();
aggregator.change_run_interval(Duration::from_secs(3)).await;
aggregator.serve().await.unwrap();

wait::for_aggregator_http_server_to_start(&aggregator, Duration::from_secs(10)).await?;

@@ -125,7 +135,7 @@ async fn restart_aggregator_and_move_one_epoch_forward(
fake_chain::set_epoch(&args.mock_epoch_file_path(), *current_epoch);

info!(">> Restarting the aggregator with a large run interval");
aggregator.serve().unwrap();
aggregator.serve().await.unwrap();
wait::for_aggregator_http_server_to_start(aggregator, Duration::from_secs(10)).await?;
wait::for_epoch_settings_at_epoch(aggregator, Duration::from_secs(10), *current_epoch).await?;
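
The wait::for_* helpers used throughout this bootstrap follow a poll-until-deadline pattern. A generic sketch of that pattern; the probe closure, polling period and error message are placeholders, not the actual helpers:

// Generic poll-until-deadline sketch in the spirit of the wait::for_* helpers above.
// The probe closure, polling period and timeout values are placeholders.
use std::future::Future;
use std::time::Duration;

use anyhow::anyhow;
use mithril_common::StdResult;
use tokio::time::{sleep, Instant};

async fn wait_until<F, Fut>(description: &str, timeout: Duration, mut probe: F) -> StdResult<()>
where
    F: FnMut() -> Fut,
    Fut: Future<Output = bool>,
{
    let deadline = Instant::now() + timeout;
    while Instant::now() < deadline {
        if probe().await {
            return Ok(());
        }
        sleep(Duration::from_millis(200)).await;
    }

    Err(anyhow!("timeout after {timeout:?} waiting for {description}"))
}

A caller would, for instance, probe the aggregator HTTP endpoint until it answers before asserting on epoch settings.
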