diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 01098c87275..73cb4e626b1 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -294,23 +294,34 @@ jobs: strategy: fail-fast: false matrix: - mode: ["std"] + mode: ["master-slave"] era: ${{ fromJSON(needs.build-ubuntu-X64.outputs.eras) }} next_era: [""] cardano_node_version: ["10.1.3", "10.1.4", "10.2.1"] hard_fork_latest_era_at_epoch: [0] - run_id: ["#1", "#2"] - extra_args: [""] + run_id: ["#1", "#2", "#3", "#4", "#5", "#6", "#7", "#8", "#9", "#10"] + extra_args: + [ + "--number-of-aggregators=2 --use-relays --relay-signer-registration-mode=passthrough --relay-signature-registration-mode=p2p", + ] include: - # Include a test for the P2P mode - - mode: "p2p" + # Include a test for partial decentralization with master/slave signer registration and P2P signature registration + - mode: "master-slave" era: ${{ fromJSON(needs.build-ubuntu-X64.outputs.eras)[0] }} next_era: [""] cardano_node_version: "10.2.1" hard_fork_latest_era_at_epoch: 0 run_id: "#1" - extra_args: "--use-p2p-network" + extra_args: "--number-of-aggregators=2 --use-relays --relay-signer-registration-mode=passthrough --relay-signature-registration-mode=p2p" + # Include a test for full dedentralization P2P signer registration and P2P signature registration + - mode: "decentralized" + era: ${{ fromJSON(needs.build-ubuntu-X64.outputs.eras)[0] }} + next_era: "" + cardano_node_version: "10.1.4" + hard_fork_latest_era_at_epoch: 0 + run_id: "#1" + extra_args: "--number-of-aggregators=2 --use-relays --relay-signer-registration-mode=p2p --relay-signature-registration-mode=p2p" # Include a test for the era switch without regenesis - mode: "std" era: ${{ fromJSON(needs.build-ubuntu-X64.outputs.eras)[0] }} diff --git a/Cargo.lock b/Cargo.lock index b8df062b4d1..01f243a176d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3945,6 +3945,7 @@ dependencies = [ "slog-bunyan", "slog-scope", "slog-term", + "strum", "thiserror 2.0.11", "tokio", "warp", diff --git a/mithril-aggregator/src/runtime/state_machine.rs b/mithril-aggregator/src/runtime/state_machine.rs index 4669b5e89c1..6921cd3ba14 100644 --- a/mithril-aggregator/src/runtime/state_machine.rs +++ b/mithril-aggregator/src/runtime/state_machine.rs @@ -143,7 +143,6 @@ impl AggregatorRuntime { info!(self.logger, "→ Trying to transition to READY"; "last_time_point" => ?last_time_point); let can_try_transition_from_idle_to_ready = if self.config.is_slave { - println!("Checking if slave aggregator is at the same epoch as master"); self.runner .is_slave_aggregator_at_same_epoch_as_master(&last_time_point) .await? @@ -265,18 +264,19 @@ impl AggregatorRuntime { self.runner .update_stake_distribution(&new_time_point) .await?; - if self.config.is_slave { - self.runner - .synchronize_slave_aggregator_signer_registration() - .await?; - } self.runner.inform_new_epoch(new_time_point.epoch).await?; - self.runner.upkeep(new_time_point.epoch).await?; self.runner .open_signer_registration_round(&new_time_point) .await?; self.runner.update_epoch_settings().await?; + if self.config.is_slave { + self.runner + .synchronize_slave_aggregator_signer_registration() + .await?; + // Needed to recompute epoch data for the next signing round on the slave + self.runner.inform_new_epoch(new_time_point.epoch).await?; + } self.runner.precompute_epoch_data().await?; } @@ -940,7 +940,7 @@ mod tests { runner .expect_inform_new_epoch() .with(predicate::eq(new_time_point_clone.clone().epoch)) - .once() + .times(2) .returning(|_| Ok(())); runner .expect_update_epoch_settings() diff --git a/mithril-aggregator/tests/create_certificate_slave.rs b/mithril-aggregator/tests/create_certificate_slave.rs index 68e70a3cb37..a54e01b5064 100644 --- a/mithril-aggregator/tests/create_certificate_slave.rs +++ b/mithril-aggregator/tests/create_certificate_slave.rs @@ -1,17 +1,83 @@ mod test_extensions; +use std::{collections::HashMap, ops::Range}; + use mithril_aggregator::Configuration; use mithril_common::{ entities::{ BlockNumber, CardanoTransactionsSigningConfig, ChainPoint, Epoch, ProtocolParameters, - SignedEntityType, SignedEntityTypeDiscriminants, SlotNumber, StakeDistribution, - StakeDistributionParty, TimePoint, + SignedEntityType, SignedEntityTypeDiscriminants, SlotNumber, StakeDistributionParty, + TimePoint, }, temp_dir, - test_utils::{MithrilFixtureBuilder, TempDir}, + test_utils::{ + MithrilFixture, MithrilFixtureBuilder, StakeDistributionGenerationMethod, TempDir, + }, }; use test_extensions::{utilities::get_test_dir, ExpectedCertificate, RuntimeTester}; +/// Epoch fixtures helps using the fixtures in the tests +struct EpochFixtures<'a> { + /// The fixture used for the registration of signers of the epoch + registering: &'a MithrilFixture, + /// The fixture used for the signing of the epoch + current_signing: Option<&'a MithrilFixture>, + /// The fixture used for the signing of the following epoch + next_signing: Option<&'a MithrilFixture>, +} + +/// Epoch fixtures map builders +struct EpochFixturesMapBuilder; + +impl EpochFixturesMapBuilder { + fn build_fixtures_sequence( + epochs_range: Range, + protocol_parameters: ProtocolParameters, + ) -> HashMap { + epochs_range + .map(|epoch| { + ( + Epoch(epoch as u64), + MithrilFixtureBuilder::default() + .with_signers(epoch) + .with_protocol_parameters(protocol_parameters.clone()) + .with_stake_distribution( + StakeDistributionGenerationMethod::RandomDistribution { + seed: [epoch as u8; 32], + min_stake: 10, + }, + ) + .build(), + ) + }) + .collect::>() + } + + fn build_epoch_fixtures_map( + fixtures: &HashMap, + ) -> HashMap> { + fixtures + .iter() + .map(|(Epoch(index), _fixture)| { + ( + Epoch(*index), + EpochFixtures { + registering: &fixtures[&Epoch(*index)], + next_signing: index + .checked_sub(1) + .map(Epoch) + .and_then(|e| fixtures.get(&e)), + current_signing: index + .checked_sub(2) + .map(Epoch) + .and_then(|e| fixtures.get(&e)), + }, + ) + }) + .collect::>() + } +} + #[tokio::test] async fn create_certificate_slave() { let protocol_parameters = ProtocolParameters { @@ -19,11 +85,9 @@ async fn create_certificate_slave() { m: 150, phi_f: 0.95, }; - let current_fixture = MithrilFixtureBuilder::default() - .with_signers(2) - .with_protocol_parameters(protocol_parameters.clone()) - .build(); - let current_avk = current_fixture.compute_and_encode_avk(); + let fixtures = + EpochFixturesMapBuilder::build_fixtures_sequence(1..10, protocol_parameters.clone()); + let epoch_fixtures_map = EpochFixturesMapBuilder::build_epoch_fixtures_map(&fixtures); let start_time_point = TimePoint { epoch: Epoch(1), immutable_file_number: 1, @@ -56,151 +120,194 @@ async fn create_certificate_slave() { comment!( "Epoch 1: - - the master aggregator bootstraps its genesis certificate - - the slave aggregator synchronizes signers from the master aggregator - - the slave aggregator stays in Idle state with an error as it doesn't have a genesis certificate yet - "); + - the master aggregator registers the first signers + - the master aggregator can't transition from 'Idle' to 'Ready' + - the slave aggregator can't transition from 'Idle' to 'Ready' + " + ); + let epoch_fixture = &epoch_fixtures_map[&Epoch(1)]; - comment!("Master: create signers & declare stake distribution"); + comment!("Master: update stake distribution source"); master_tester - .init_state_from_fixture(¤t_fixture) + .update_stake_distribution(epoch_fixture.registering.stake_distribution()) .await .unwrap(); - comment!("Slave: create signers & declare stake distribution"); + comment!("Slave: update stake distribution source"); slave_tester - .chain_observer - .set_signers(current_fixture.signers_with_stake()) - .await; - - comment!("Master: bootstrap the genesis certificate"); - master_tester - .register_genesis_certificate(¤t_fixture) + .update_stake_distribution(epoch_fixture.registering.stake_distribution()) .await .unwrap(); - assert_last_certificate_eq!( - master_tester, - ExpectedCertificate::new_genesis(Epoch(1), current_fixture.compute_and_encode_avk()) - ); - comment!("Master: start the runtime state machine"); - cycle!(master_tester, "ready"); - cycle!(master_tester, "signing"); + cycle_err!(master_tester, "idle"); comment!("Slave: start the runtime state machine"); cycle_err!(slave_tester, "idle"); - cycle_err!(slave_tester, "idle"); comment!("Master: register signers"); master_tester - .register_signers(¤t_fixture.signers_fixture()) + .register_signers(&epoch_fixture.registering.signers_fixture()) .await .unwrap(); - cycle_err!(master_tester, "signing"); + cycle_err!(master_tester, "idle"); comment!( "Epoch 2: - - the master aggregator produces a new certificate - - the slave aggregator synchronizes signers from the master aggregator - - the slave aggregator bootstraps its genesis certificate" + - the master aggregator creates its genesis certificate + - the master aggregator can't transition from 'Idle' to 'Ready' + - the slave aggregator can't transition from 'Idle' to 'Ready' + " ); + let epoch_fixture = &epoch_fixtures_map[&Epoch(2)]; + + comment!("Master: update stake distribution source"); + master_tester + .update_stake_distribution(epoch_fixture.registering.stake_distribution()) + .await + .unwrap(); + + comment!("Slave: update stake distribution source"); + slave_tester + .update_stake_distribution(epoch_fixture.registering.stake_distribution()) + .await + .unwrap(); comment!("Master: change the epoch"); master_tester.increase_epoch().await.unwrap(); - cycle!(master_tester, "idle"); - cycle!(master_tester, "ready"); + cycle_err!(master_tester, "idle"); comment!("Slave: change the epoch after master"); slave_tester.increase_epoch().await.unwrap(); cycle_err!(slave_tester, "idle"); - cycle_err!(slave_tester, "idle"); - comment!("Slave: bootstrap the genesis certificate"); + comment!("Master: register signers"); + master_tester + .register_signers(&epoch_fixture.registering.signers_fixture()) + .await + .unwrap(); + cycle_err!(master_tester, "idle"); + + comment!("Master: bootstrap the genesis certificate"); + master_tester + .register_genesis_certificate(epoch_fixture.next_signing.unwrap()) + .await + .unwrap(); + + assert_last_certificate_eq!( + master_tester, + ExpectedCertificate::new_genesis( + Epoch(2), + epoch_fixture.next_signing.unwrap().compute_and_encode_avk() + ) + ); + + comment!( + "Epoch 3: + - the master aggregator produces a new certificate + - the slave aggregator synchronizes signers from the master aggregator + - the slave aggregator can't transition from 'Idle' to 'Ready' + " + ); + let epoch_fixture = &epoch_fixtures_map[&Epoch(3)]; + + comment!("Master: update stake distribution source"); + master_tester + .update_stake_distribution(epoch_fixture.registering.stake_distribution()) + .await + .unwrap(); + + comment!("Slave: update stake distribution source"); slave_tester - .register_genesis_certificate(¤t_fixture) + .update_stake_distribution(epoch_fixture.registering.stake_distribution()) .await .unwrap(); + comment!("Master: change the epoch"); + master_tester.increase_epoch().await.unwrap(); + cycle!(master_tester, "ready"); + cycle!(master_tester, "signing"); + + comment!("Slave: change the epoch after master"); + slave_tester.increase_epoch().await.unwrap(); + cycle_err!(slave_tester, "idle"); + comment!("Master: register signers"); master_tester - .register_signers(¤t_fixture.signers_fixture()) + .register_signers(&epoch_fixture.registering.signers_fixture()) .await .unwrap(); - cycle!(master_tester, "signing"); comment!("Master: signers send their single signature"); master_tester .send_single_signatures( SignedEntityTypeDiscriminants::MithrilStakeDistribution, - ¤t_fixture.signers_fixture(), + &epoch_fixture.current_signing.unwrap().signers_fixture(), ) .await .unwrap(); + cycle!(master_tester, "ready"); comment!("Master: state machine should issue a certificate for the MithrilStakeDistribution"); - cycle!(master_tester, "ready"); + cycle!(master_tester, "signing"); + assert_last_certificate_eq!( master_tester, ExpectedCertificate::new( - Epoch(2), - StakeDistributionParty::from_signers(current_fixture.signers_with_stake()).as_slice(), - current_fixture.compute_and_encode_avk(), - SignedEntityType::MithrilStakeDistribution(Epoch(2)), - ExpectedCertificate::genesis_identifier(Epoch(1)), + Epoch(3), + StakeDistributionParty::from_signers( + epoch_fixture.current_signing.unwrap().signers_with_stake() + ) + .as_slice(), + epoch_fixture + .current_signing + .unwrap() + .compute_and_encode_avk(), + SignedEntityType::MithrilStakeDistribution(Epoch(3)), + ExpectedCertificate::genesis_identifier(Epoch(2)), ) ); - cycle!(master_tester, "signing"); + cycle_err!(master_tester, "signing"); comment!( - "Epoch 3: + "Epoch 4: - the master aggregator produces a new certificate - - the master aggregator stake distribution is updated - - the slave aggregator can't transition from 'Idle' to 'Ready' when the master aggregator has not transitioned to a new epoch - - the slave aggregator can transition from 'Idle' to 'Ready' when the master aggregator has transitioned to a new epoch - - the slave aggregator produces a new certificate - - the slave aggregator new certificate uses the same avk as the master aggregator's new certificate - - the slave aggregator stake distribution is updated - "); + - the slave aggregator synchronizes signers from the master aggregator + - the slave aggregator bootstraps its genesis certificate + - the slave aggregator can't transition from 'Idle' to 'Ready'" + ); + let epoch_fixture = &epoch_fixtures_map[&Epoch(4)]; - comment!("Master: update stake distribution"); - let following_fixture = { - let updated_stake_distribution = StakeDistribution::from_iter( - current_fixture - .signers_with_stake() - .into_iter() - .map(|s| (s.party_id, s.stake + 1000)), - ); - - master_tester - .update_stake_distribution(updated_stake_distribution) - .await - .unwrap() - }; - let following_avk = following_fixture.compute_and_encode_avk(); + comment!("Master: update stake distribution source"); + master_tester + .update_stake_distribution(epoch_fixture.registering.stake_distribution()) + .await + .unwrap(); - comment!("Slave: update stake distribution"); + comment!("Slave: update stake distribution source"); slave_tester - .update_stake_distribution(following_fixture.stake_distribution()) + .update_stake_distribution(epoch_fixture.registering.stake_distribution()) .await .unwrap(); - comment!("Slave: change the epoch before master"); - slave_tester.increase_epoch().await.unwrap(); - cycle!(slave_tester, "idle"); - cycle!(slave_tester, "idle"); - comment!("Master: change the epoch"); master_tester.increase_epoch().await.unwrap(); cycle!(master_tester, "idle"); cycle!(master_tester, "ready"); comment!("Slave: change the epoch after master"); - cycle!(slave_tester, "ready"); + slave_tester.increase_epoch().await.unwrap(); + cycle_err!(slave_tester, "idle"); + + comment!("Slave: bootstrap the genesis certificate"); + slave_tester + .register_genesis_certificate(epoch_fixture.next_signing.unwrap()) + .await + .unwrap(); comment!("Master: register signers"); master_tester - .register_signers(¤t_fixture.signers_fixture()) + .register_signers(&epoch_fixture.registering.signers_fixture()) .await .unwrap(); cycle!(master_tester, "signing"); @@ -209,52 +316,55 @@ async fn create_certificate_slave() { master_tester .send_single_signatures( SignedEntityTypeDiscriminants::MithrilStakeDistribution, - ¤t_fixture.signers_fixture(), - ) - .await - .unwrap(); - - comment!("Slave: signers send their single signature"); - cycle!(slave_tester, "signing"); - slave_tester - .send_single_signatures( - SignedEntityTypeDiscriminants::MithrilStakeDistribution, - ¤t_fixture.signers_fixture(), + &epoch_fixture.current_signing.unwrap().signers_fixture(), ) .await .unwrap(); comment!("Master: state machine should issue a certificate for the MithrilStakeDistribution"); cycle!(master_tester, "ready"); - let master_expected_certificate = ExpectedCertificate::new( - Epoch(3), - StakeDistributionParty::from_signers(current_fixture.signers_with_stake()).as_slice(), - current_fixture.compute_and_encode_avk(), - SignedEntityType::MithrilStakeDistribution(Epoch(3)), - ExpectedCertificate::identifier(&SignedEntityType::MithrilStakeDistribution(Epoch(2))), - ); - assert_last_certificate_eq!(master_tester, master_expected_certificate); - comment!("Slave: state machine should issue a certificate for the MithrilStakeDistribution"); - cycle!(slave_tester, "ready"); - let slave_expected_certificate = ExpectedCertificate::new( - Epoch(3), - StakeDistributionParty::from_signers(current_fixture.signers_with_stake()).as_slice(), - current_fixture.compute_and_encode_avk(), - SignedEntityType::MithrilStakeDistribution(Epoch(3)), - ExpectedCertificate::genesis_identifier(Epoch(2)), + assert_last_certificate_eq!( + master_tester, + ExpectedCertificate::new( + Epoch(4), + StakeDistributionParty::from_signers( + epoch_fixture.current_signing.unwrap().signers_with_stake() + ) + .as_slice(), + epoch_fixture + .current_signing + .unwrap() + .compute_and_encode_avk(), + SignedEntityType::MithrilStakeDistribution(Epoch(4)), + ExpectedCertificate::identifier(&SignedEntityType::MithrilStakeDistribution(Epoch(3))), + ) ); - assert_last_certificate_eq!(slave_tester, slave_expected_certificate); - let expected_avk = current_avk.clone(); - assert_eq!(expected_avk, master_expected_certificate.avk()); - assert_eq!(expected_avk, slave_expected_certificate.avk()); + cycle!(master_tester, "signing"); comment!( - "Epoch 4: + "Epoch 5: - the master aggregator produces a new certificate - the slave aggregator produces a new certificate - the slave aggregator new certificate uses the same avk as the master aggregator's new certificate "); + let epoch_fixture = &epoch_fixtures_map[&Epoch(5)]; + + comment!("Master: update stake distribution source"); + master_tester + .update_stake_distribution(epoch_fixture.registering.stake_distribution()) + .await + .unwrap(); + + comment!("Slave: update stake distribution source"); + slave_tester + .update_stake_distribution(epoch_fixture.registering.stake_distribution()) + .await + .unwrap(); + + comment!("Slave: change the epoch before master"); + slave_tester.increase_epoch().await.unwrap(); + cycle!(slave_tester, "idle"); comment!("Master: change the epoch"); master_tester.increase_epoch().await.unwrap(); @@ -262,13 +372,11 @@ async fn create_certificate_slave() { cycle!(master_tester, "ready"); comment!("Slave: change the epoch after master"); - slave_tester.increase_epoch().await.unwrap(); - cycle!(slave_tester, "idle"); cycle!(slave_tester, "ready"); comment!("Master: register signers"); master_tester - .register_signers(¤t_fixture.signers_fixture()) + .register_signers(&epoch_fixture.registering.signers_fixture()) .await .unwrap(); cycle!(master_tester, "signing"); @@ -277,7 +385,7 @@ async fn create_certificate_slave() { master_tester .send_single_signatures( SignedEntityTypeDiscriminants::MithrilStakeDistribution, - ¤t_fixture.signers_fixture(), + &epoch_fixture.current_signing.unwrap().signers_fixture(), ) .await .unwrap(); @@ -287,7 +395,7 @@ async fn create_certificate_slave() { slave_tester .send_single_signatures( SignedEntityTypeDiscriminants::MithrilStakeDistribution, - ¤t_fixture.signers_fixture(), + &epoch_fixture.current_signing.unwrap().signers_fixture(), ) .await .unwrap(); @@ -295,35 +403,63 @@ async fn create_certificate_slave() { comment!("Master: state machine should issue a certificate for the MithrilStakeDistribution"); cycle!(master_tester, "ready"); let master_expected_certificate = ExpectedCertificate::new( - Epoch(4), - StakeDistributionParty::from_signers(current_fixture.signers_with_stake()).as_slice(), - current_fixture.compute_and_encode_avk(), - SignedEntityType::MithrilStakeDistribution(Epoch(4)), - ExpectedCertificate::identifier(&SignedEntityType::MithrilStakeDistribution(Epoch(3))), + Epoch(5), + StakeDistributionParty::from_signers( + epoch_fixture.current_signing.unwrap().signers_with_stake(), + ) + .as_slice(), + epoch_fixture + .current_signing + .unwrap() + .compute_and_encode_avk(), + SignedEntityType::MithrilStakeDistribution(Epoch(5)), + ExpectedCertificate::identifier(&SignedEntityType::MithrilStakeDistribution(Epoch(4))), ); assert_last_certificate_eq!(master_tester, master_expected_certificate); comment!("Slave: state machine should issue a certificate for the MithrilStakeDistribution"); cycle!(slave_tester, "ready"); let slave_expected_certificate = ExpectedCertificate::new( - Epoch(4), - StakeDistributionParty::from_signers(current_fixture.signers_with_stake()).as_slice(), - current_fixture.compute_and_encode_avk(), - SignedEntityType::MithrilStakeDistribution(Epoch(4)), - ExpectedCertificate::identifier(&SignedEntityType::MithrilStakeDistribution(Epoch(3))), + Epoch(5), + StakeDistributionParty::from_signers( + epoch_fixture.current_signing.unwrap().signers_with_stake(), + ) + .as_slice(), + epoch_fixture + .current_signing + .unwrap() + .compute_and_encode_avk(), + SignedEntityType::MithrilStakeDistribution(Epoch(5)), + ExpectedCertificate::genesis_identifier(Epoch(4)), ); assert_last_certificate_eq!(slave_tester, slave_expected_certificate); - let expected_avk = current_avk; + let expected_avk = epoch_fixture + .current_signing + .unwrap() + .compute_and_encode_avk() + .clone(); assert_eq!(expected_avk, master_expected_certificate.avk()); assert_eq!(expected_avk, slave_expected_certificate.avk()); comment!( - "Epoch 5: - - the master aggregator produces a new certificate with the new stake distribution from epoch 3 - - the slave aggregator produces a new certificate with the new stake distribution from epoch 3 + "Epoch 6: + - the master aggregator produces a new certificate + - the slave aggregator produces a new certificate - the slave aggregator new certificate uses the same avk as the master aggregator's new certificate "); - let fixture = following_fixture; + let epoch_fixture = &epoch_fixtures_map[&Epoch(6)]; + + comment!("Master: update stake distribution source"); + master_tester + .update_stake_distribution(epoch_fixture.registering.stake_distribution()) + .await + .unwrap(); + + comment!("Slave: update stake distribution source"); + slave_tester + .update_stake_distribution(epoch_fixture.registering.stake_distribution()) + .await + .unwrap(); comment!("Master: change the epoch"); master_tester.increase_epoch().await.unwrap(); @@ -337,7 +473,7 @@ async fn create_certificate_slave() { comment!("Master: register signers"); master_tester - .register_signers(&fixture.signers_fixture()) + .register_signers(&epoch_fixture.registering.signers_fixture()) .await .unwrap(); cycle!(master_tester, "signing"); @@ -346,7 +482,7 @@ async fn create_certificate_slave() { master_tester .send_single_signatures( SignedEntityTypeDiscriminants::MithrilStakeDistribution, - &fixture.signers_fixture(), + &epoch_fixture.current_signing.unwrap().signers_fixture(), ) .await .unwrap(); @@ -356,7 +492,7 @@ async fn create_certificate_slave() { slave_tester .send_single_signatures( SignedEntityTypeDiscriminants::MithrilStakeDistribution, - &fixture.signers_fixture(), + &epoch_fixture.current_signing.unwrap().signers_fixture(), ) .await .unwrap(); @@ -364,25 +500,41 @@ async fn create_certificate_slave() { comment!("Master: state machine should issue a certificate for the MithrilStakeDistribution"); cycle!(master_tester, "ready"); let master_expected_certificate = ExpectedCertificate::new( - Epoch(5), - StakeDistributionParty::from_signers(fixture.signers_with_stake()).as_slice(), - fixture.compute_and_encode_avk(), - SignedEntityType::MithrilStakeDistribution(Epoch(5)), - ExpectedCertificate::identifier(&SignedEntityType::MithrilStakeDistribution(Epoch(4))), + Epoch(6), + StakeDistributionParty::from_signers( + epoch_fixture.current_signing.unwrap().signers_with_stake(), + ) + .as_slice(), + epoch_fixture + .current_signing + .unwrap() + .compute_and_encode_avk(), + SignedEntityType::MithrilStakeDistribution(Epoch(6)), + ExpectedCertificate::identifier(&SignedEntityType::MithrilStakeDistribution(Epoch(5))), ); assert_last_certificate_eq!(master_tester, master_expected_certificate); comment!("Slave: state machine should issue a certificate for the MithrilStakeDistribution"); cycle!(slave_tester, "ready"); let slave_expected_certificate = ExpectedCertificate::new( - Epoch(5), - StakeDistributionParty::from_signers(fixture.signers_with_stake()).as_slice(), - fixture.compute_and_encode_avk(), - SignedEntityType::MithrilStakeDistribution(Epoch(5)), - ExpectedCertificate::identifier(&SignedEntityType::MithrilStakeDistribution(Epoch(4))), + Epoch(6), + StakeDistributionParty::from_signers( + epoch_fixture.current_signing.unwrap().signers_with_stake(), + ) + .as_slice(), + epoch_fixture + .current_signing + .unwrap() + .compute_and_encode_avk(), + SignedEntityType::MithrilStakeDistribution(Epoch(6)), + ExpectedCertificate::identifier(&SignedEntityType::MithrilStakeDistribution(Epoch(5))), ); assert_last_certificate_eq!(slave_tester, slave_expected_certificate); - let expected_avk = following_avk; + let expected_avk = epoch_fixture + .current_signing + .unwrap() + .compute_and_encode_avk() + .clone(); assert_eq!(expected_avk, master_expected_certificate.avk()); assert_eq!(expected_avk, slave_expected_certificate.avk()); } diff --git a/mithril-common/src/entities/certificate.rs b/mithril-common/src/entities/certificate.rs index f94ce800fe6..c5214791b91 100644 --- a/mithril-common/src/entities/certificate.rs +++ b/mithril-common/src/entities/certificate.rs @@ -157,7 +157,7 @@ impl Debug for Certificate { true => debug .field( "aggregate_verification_key", - &format_args!("{:?}", self.aggregate_verification_key), + &format_args!("{:?}", self.aggregate_verification_key.to_json_hex()), ) .field("signature", &format_args!("{:?}", self.signature)) .finish(), diff --git a/mithril-common/src/entities/epoch.rs b/mithril-common/src/entities/epoch.rs index cbc9133087c..e2c7469d800 100644 --- a/mithril-common/src/entities/epoch.rs +++ b/mithril-common/src/entities/epoch.rs @@ -38,7 +38,7 @@ impl Epoch { pub const CARDANO_STAKE_DISTRIBUTION_SNAPSHOT_OFFSET: u64 = 2; /// The epoch offset used to retrieve the epoch at which a signer has registered to the master aggregator. - pub const SIGNER_MASTER_SYNCHRONIZATION_OFFSET: u64 = 1; + pub const SIGNER_MASTER_SYNCHRONIZATION_OFFSET: u64 = 0; /// Computes a new Epoch by applying an epoch offset. /// diff --git a/mithril-common/src/protocol/multi_signer.rs b/mithril-common/src/protocol/multi_signer.rs index 52c52240486..c1f11af44ef 100644 --- a/mithril-common/src/protocol/multi_signer.rs +++ b/mithril-common/src/protocol/multi_signer.rs @@ -171,6 +171,7 @@ mod test { .with_signers(1) .with_stake_distribution(StakeDistributionGenerationMethod::RandomDistribution { seed: [3u8; 32], + min_stake: 1, }) .build(), ); diff --git a/mithril-common/src/protocol/signer_builder.rs b/mithril-common/src/protocol/signer_builder.rs index 8bfeeb876e2..6e3c1080b69 100644 --- a/mithril-common/src/protocol/signer_builder.rs +++ b/mithril-common/src/protocol/signer_builder.rs @@ -218,6 +218,7 @@ mod test { .with_stake_distribution( crate::test_utils::StakeDistributionGenerationMethod::RandomDistribution { seed: [4u8; 32], + min_stake: 1, }, ) .build(); diff --git a/mithril-common/src/test_utils/fixture_builder.rs b/mithril-common/src/test_utils/fixture_builder.rs index 0e46b20ce1f..cbecfb5ae1d 100644 --- a/mithril-common/src/test_utils/fixture_builder.rs +++ b/mithril-common/src/test_utils/fixture_builder.rs @@ -29,7 +29,10 @@ impl Default for MithrilFixtureBuilder { enable_signers_certification: true, number_of_signers: 5, stake_distribution_generation_method: - StakeDistributionGenerationMethod::RandomDistribution { seed: [0u8; 32] }, + StakeDistributionGenerationMethod::RandomDistribution { + seed: [0u8; 32], + min_stake: 1, + }, party_id_seed: [0u8; 32], } } @@ -41,6 +44,8 @@ pub enum StakeDistributionGenerationMethod { RandomDistribution { /// The randomizer seed seed: [u8; 32], + /// The minimum stake + min_stake: Stake, }, /// Use a custom stake distribution @@ -106,13 +111,13 @@ impl MithrilFixtureBuilder { let signers_party_ids = self.generate_party_ids(); match &self.stake_distribution_generation_method { - StakeDistributionGenerationMethod::RandomDistribution { seed } => { + StakeDistributionGenerationMethod::RandomDistribution { seed, min_stake } => { let mut stake_rng = ChaCha20Rng::from_seed(*seed); signers_party_ids .into_iter() .map(|party_id| { - let stake = 1 + stake_rng.next_u64() % 999; + let stake = min_stake + stake_rng.next_u64() % 999; (party_id, stake) }) .collect::>() @@ -246,6 +251,7 @@ mod tests { let result = MithrilFixtureBuilder::default() .with_stake_distribution(StakeDistributionGenerationMethod::RandomDistribution { seed: [0u8; 32], + min_stake: 1, }) .with_signers(4) .build(); @@ -275,6 +281,7 @@ mod tests { let result = MithrilFixtureBuilder::default() .with_stake_distribution(StakeDistributionGenerationMethod::RandomDistribution { seed: [0u8; 32], + min_stake: 1, }) .with_signers(5) .build(); diff --git a/mithril-relay/Cargo.toml b/mithril-relay/Cargo.toml index f4298188c9e..a657716cb6b 100644 --- a/mithril-relay/Cargo.toml +++ b/mithril-relay/Cargo.toml @@ -48,6 +48,7 @@ slog = { version = "2.7.0", features = [ ] } slog-async = "2.8.0" slog-bunyan = "2.5.0" +strum = { version = "0.26.3", features = ["derive"] } thiserror = "2.0.11" tokio = { version = "1.43.0", features = ["full"] } warp = "0.3.7" diff --git a/mithril-relay/src/commands/signer.rs b/mithril-relay/src/commands/signer.rs index e54d968330e..7045f5c883c 100644 --- a/mithril-relay/src/commands/signer.rs +++ b/mithril-relay/src/commands/signer.rs @@ -6,7 +6,7 @@ use mithril_common::StdResult; use slog::error; use super::CommandContext; -use crate::SignerRelay; +use crate::{SignerRelay, SignerRelayMode}; #[derive(Parser, Debug, Clone)] pub struct SignerCommand { @@ -26,6 +26,14 @@ pub struct SignerCommand { #[clap(long, env = "AGGREGATOR_ENDPOINT")] aggregator_endpoint: String, + /// Signer registration relay mode + #[clap(value_enum, env = "SIGNER_REGISTRATION_MODE", default_value_t = SignerRelayMode::Passthrough)] + signer_registration_mode: SignerRelayMode, + + /// Signature registration relay mode + #[clap(value_enum, env = "SIGNATURE_REGISTRATION_MODE", default_value_t = SignerRelayMode::P2P)] + signature_registration_mode: SignerRelayMode, + /// Interval at which a signer registration should be repeated in milliseconds (defaults to 1 hour) #[clap(long, env = "SIGNER_REPEATER_DELAY", default_value_t = 3_600 * 1_000)] signer_repeater_delay: u64, @@ -38,12 +46,16 @@ impl SignerCommand { let server_port = self.server_port.to_owned(); let dial_to = self.dial_to.to_owned(); let addr: Multiaddr = format!("/ip4/0.0.0.0/tcp/{}", self.listen_port).parse()?; + let signer_registration_mode = &self.signer_registration_mode; + let signature_registration_mode = &self.signature_registration_mode; let aggregator_endpoint = self.aggregator_endpoint.to_owned(); let signer_repeater_delay = Duration::from_millis(self.signer_repeater_delay); let mut relay = SignerRelay::start( &addr, &server_port, + signer_registration_mode, + signature_registration_mode, &aggregator_endpoint, &signer_repeater_delay, logger, diff --git a/mithril-relay/src/lib.rs b/mithril-relay/src/lib.rs index a6ac2d52981..52e2ecc45e6 100644 --- a/mithril-relay/src/lib.rs +++ b/mithril-relay/src/lib.rs @@ -12,6 +12,7 @@ pub use commands::RelayCommands; pub use relay::AggregatorRelay; pub use relay::PassiveRelay; pub use relay::SignerRelay; +pub use relay::SignerRelayMode; /// The P2P topic names used by Mithril pub mod mithril_p2p_topic { diff --git a/mithril-relay/src/relay/mod.rs b/mithril-relay/src/relay/mod.rs index 0d177321999..3dffee0edb5 100644 --- a/mithril-relay/src/relay/mod.rs +++ b/mithril-relay/src/relay/mod.rs @@ -5,3 +5,4 @@ mod signer; pub use aggregator::AggregatorRelay; pub use passive::PassiveRelay; pub use signer::SignerRelay; +pub use signer::SignerRelayMode; diff --git a/mithril-relay/src/relay/signer.rs b/mithril-relay/src/relay/signer.rs index fc2ae98a037..f98f56c5d81 100644 --- a/mithril-relay/src/relay/signer.rs +++ b/mithril-relay/src/relay/signer.rs @@ -2,6 +2,7 @@ use crate::{ p2p::{Peer, PeerEvent}, repeater::MessageRepeater, }; +use clap::ValueEnum; use libp2p::Multiaddr; use mithril_common::{ logging::LoggerExtensions, @@ -11,9 +12,37 @@ use mithril_common::{ }; use slog::{debug, info, Logger}; use std::{net::SocketAddr, sync::Arc, time::Duration}; +use strum::Display; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; use warp::Filter; +/// Signer relay mode +/// +/// The relay mode defines how the relay will behave when it receives a message +#[derive(Debug, Clone, Display, PartialEq, Eq, ValueEnum)] +#[strum(serialize_all = "mixed_case")] +pub enum SignerRelayMode { + /// Passthrough relay mode + /// + /// In this mode, the relay will only call the aggregator with the message received + Passthrough, + /// P2P relay mode + /// + /// In this mode, the relay will publish the message received to the P2P network + P2P, +} + +struct HTTPServerConfiguration<'a> { + server_port: &'a u16, + signer_registration_mode: SignerRelayMode, + signature_registration_mode: SignerRelayMode, + aggregator_endpoint: &'a str, + signer_tx: UnboundedSender, + signature_tx: UnboundedSender, + signer_repeater: Arc>, + logger: &'a Logger, +} + /// A relay for a Mithril signer pub struct SignerRelay { server: TestHttpServer, @@ -29,12 +58,14 @@ impl SignerRelay { pub async fn start( address: &Multiaddr, server_port: &u16, + signer_registration_mode: &SignerRelayMode, + signature_registration_mode: &SignerRelayMode, aggregator_endpoint: &str, signer_repeater_delay: &Duration, logger: &Logger, ) -> StdResult { let relay_logger = logger.new_with_component_name::(); - debug!(relay_logger, "Starting..."); + debug!(relay_logger, "Starting..."; "signer_registration_mode" => ?signer_registration_mode, "signature_registration_mode" => ?signature_registration_mode); let (signature_tx, signature_rx) = unbounded_channel::(); let (signer_tx, signer_rx) = unbounded_channel::(); let signer_repeater = Arc::new(MessageRepeater::new( @@ -43,14 +74,16 @@ impl SignerRelay { logger, )); let peer = Peer::new(address).start().await?; - let server = Self::start_http_server( + let server = Self::start_http_server(&HTTPServerConfiguration { server_port, + signer_registration_mode: signer_registration_mode.to_owned(), + signature_registration_mode: signature_registration_mode.to_owned(), aggregator_endpoint, - signer_tx, - signature_tx, - signer_repeater.clone(), - logger, - ) + signer_tx: signer_tx.clone(), + signature_tx: signature_tx.clone(), + signer_repeater: signer_repeater.clone(), + logger: &relay_logger, + }) .await; info!(relay_logger, "Listening on"; "address" => ?server.address()); @@ -64,44 +97,55 @@ impl SignerRelay { }) } - async fn start_http_server( - server_port: &u16, - aggregator_endpoint: &str, - signer_tx: UnboundedSender, - signature_tx: UnboundedSender, - signer_repeater: Arc>, - logger: &Logger, - ) -> TestHttpServer { - let server_logger = logger.new_with_name("http_server"); + async fn start_http_server(configuration: &HTTPServerConfiguration<'_>) -> TestHttpServer { + let server_logger = configuration.logger.new_with_name("http_server"); test_http_server_with_socket_address( warp::path::end() .and(warp::get()) .and(middlewares::with_logger(&server_logger)) .and(middlewares::with_aggregator_endpoint( - aggregator_endpoint.to_string(), + configuration.aggregator_endpoint.to_string(), )) .and_then(handlers::aggregator_features_handler) .or(warp::path("register-signatures") .and(warp::post()) .and(warp::body::json()) + .and(middlewares::with_signer_relay_mode( + configuration.signature_registration_mode.clone(), + )) + .and(middlewares::with_aggregator_endpoint( + configuration.aggregator_endpoint.to_string(), + )) .and(middlewares::with_logger(&server_logger)) - .and(middlewares::with_transmitter(signature_tx)) + .and(middlewares::with_transmitter( + configuration.signature_tx.clone(), + )) .and_then(handlers::register_signatures_handler)) .or(warp::path("register-signer") .and(warp::post()) .and(warp::body::json()) + .and(middlewares::with_signer_relay_mode( + configuration.signer_registration_mode.clone(), + )) + .and(middlewares::with_aggregator_endpoint( + configuration.aggregator_endpoint.to_string(), + )) .and(middlewares::with_logger(&server_logger)) - .and(middlewares::with_transmitter(signer_tx)) - .and(middlewares::with_repeater(signer_repeater.clone())) + .and(middlewares::with_transmitter( + configuration.signer_tx.clone(), + )) + .and(middlewares::with_repeater( + configuration.signer_repeater.clone(), + )) .and_then(handlers::register_signer_handler)) .or(warp::path("epoch-settings") .and(warp::get()) .and(middlewares::with_logger(&server_logger)) .and(middlewares::with_aggregator_endpoint( - aggregator_endpoint.to_string(), + configuration.aggregator_endpoint.to_string(), )) .and_then(handlers::epoch_settings_handler)), - ([0, 0, 0, 0], *server_port).into(), + ([0, 0, 0, 0], *configuration.server_port).into(), ) } @@ -173,6 +217,8 @@ mod middlewares { use crate::repeater::MessageRepeater; + use super::SignerRelayMode; + pub fn with_logger( logger: &slog::Logger, ) -> impl Filter + Clone { @@ -197,6 +243,12 @@ mod middlewares { ) -> impl Filter + Clone { warp::any().map(move || aggregator_endpoint.clone()) } + + pub fn with_signer_relay_mode( + signer_relay_mode: SignerRelayMode, + ) -> impl Filter + Clone { + warp::any().map(move || signer_relay_mode.clone()) + } } mod handlers { @@ -204,11 +256,13 @@ mod handlers { use reqwest::{Error, Response}; use slog::{debug, Logger}; use std::{convert::Infallible, sync::Arc}; - use tokio::sync::mpsc::UnboundedSender; - use warp::http::StatusCode; + use tokio::sync::mpsc::{error::SendError, UnboundedSender}; + use warp::{http::StatusCode, reply::WithStatus}; use crate::repeater; + use super::SignerRelayMode; + pub async fn aggregator_features_handler( logger: Logger, aggregator_endpoint: String, @@ -223,32 +277,57 @@ mod handlers { pub async fn register_signer_handler( register_signer_message: RegisterSignerMessage, + signer_relay_mode: SignerRelayMode, + aggregator_endpoint: String, logger: Logger, tx: UnboundedSender, repeater: Arc>, ) -> Result { - debug!(logger, "Serve HTTP route /register-signer"; "register_signer_message" => #?register_signer_message); - - repeater.set_message(register_signer_message.clone()).await; - match tx.send(register_signer_message) { - Ok(_) => Ok(Box::new(warp::reply::with_status( - "".to_string(), - StatusCode::CREATED, - ))), - Err(err) => Ok(Box::new(warp::reply::with_status( - format!("{err:?}"), - StatusCode::INTERNAL_SERVER_ERROR, - ))), + debug!(logger, "Serve HTTP route /register-signer"; "signer_relay_mode" => ?signer_relay_mode, "register_signer_message" => #?register_signer_message,); + match signer_relay_mode { + SignerRelayMode::P2P => { + repeater.set_message(register_signer_message.clone()).await; + reply_response_from_tx_send_result(tx.send(register_signer_message)) + } + SignerRelayMode::Passthrough => { + let response = reqwest::Client::new() + .post(format!("{aggregator_endpoint}/register-signer")) + .json(®ister_signer_message) + .send() + .await; + reply_response(logger, response).await + } } } pub async fn register_signatures_handler( register_signature_message: RegisterSignatureMessage, + signer_relay_mode: SignerRelayMode, + aggregator_endpoint: String, logger: Logger, tx: UnboundedSender, ) -> Result { - debug!(logger, "Serve HTTP route /register-signatures"; "register_signature_message" => #?register_signature_message); - match tx.send(register_signature_message) { + debug!(logger, "Serve HTTP route /register-signatures"; "signer_relay_mode" => ?signer_relay_mode, "register_signature_message" => #?register_signature_message); + + match signer_relay_mode { + SignerRelayMode::P2P => { + reply_response_from_tx_send_result(tx.send(register_signature_message)) + } + SignerRelayMode::Passthrough => { + let response = reqwest::Client::new() + .post(format!("{aggregator_endpoint}/register-signatures")) + .json(®ister_signature_message) + .send() + .await; + reply_response(logger, response).await + } + } + } + + fn reply_response_from_tx_send_result( + result: Result<(), SendError>, + ) -> Result>, Infallible> { + match result { Ok(_) => Ok(Box::new(warp::reply::with_status( "".to_string(), StatusCode::CREATED, @@ -275,7 +354,7 @@ mod handlers { pub async fn reply_response( logger: Logger, response: Result, - ) -> Result { + ) -> Result>, Infallible> { match response { Ok(response) => match StatusCode::from_u16(response.status().into()) { Ok(status) => match response.text().await { diff --git a/mithril-relay/tests/register_signer_signature.rs b/mithril-relay/tests/register_signer_signature.rs index 958fde0b1ca..9be58788594 100644 --- a/mithril-relay/tests/register_signer_signature.rs +++ b/mithril-relay/tests/register_signer_signature.rs @@ -4,7 +4,7 @@ use libp2p::{gossipsub, Multiaddr}; use mithril_common::messages::{RegisterSignatureMessage, RegisterSignerMessage}; use mithril_relay::{ p2p::{BroadcastMessage, PeerBehaviourEvent, PeerEvent}, - PassiveRelay, SignerRelay, + PassiveRelay, SignerRelay, SignerRelayMode, }; use reqwest::StatusCode; use slog::{Drain, Level, Logger}; @@ -35,11 +35,15 @@ async fn should_receive_registrations_from_signers_when_subscribed_to_pubsub() { let total_peers = 1 + total_p2p_client; let addr: Multiaddr = "/ip4/0.0.0.0/tcp/0".parse().unwrap(); let server_port = 0; + let signer_registration_mode = SignerRelayMode::P2P; + let signature_registration_mode = SignerRelayMode::P2P; let aggregator_endpoint = "http://0.0.0.0:1234".to_string(); let signer_repeater_delay = Duration::from_secs(100); let mut signer_relay = SignerRelay::start( &addr, &server_port, + &signer_registration_mode, + &signature_registration_mode, &aggregator_endpoint, &signer_repeater_delay, &logger, diff --git a/mithril-test-lab/mithril-end-to-end/src/assertions/check.rs b/mithril-test-lab/mithril-end-to-end/src/assertions/check.rs index c09bf95c1dd..d73c4e828f8 100644 --- a/mithril-test-lab/mithril-end-to-end/src/assertions/check.rs +++ b/mithril-test-lab/mithril-end-to-end/src/assertions/check.rs @@ -18,7 +18,7 @@ use mithril_common::{ }; use crate::{ - attempt, utils::AttemptResult, CardanoDbCommand, CardanoDbV2Command, + attempt, utils::AttemptResult, Aggregator, CardanoDbCommand, CardanoDbV2Command, CardanoStakeDistributionCommand, CardanoTransactionCommand, Client, ClientCommand, MithrilStakeDistributionCommand, }; @@ -37,10 +37,13 @@ async fn get_json_response(url: String) -> StdResult StdResult { - let url = format!("{aggregator_endpoint}/artifact/mithril-stake-distributions"); - info!("Waiting for the aggregator to produce a mithril stake distribution"); + let url = format!( + "{}/artifact/mithril-stake-distributions", + aggregator.endpoint() + ); + info!("Waiting for the aggregator to produce a mithril stake distribution"; "aggregator" => &aggregator.name()); async fn fetch_last_mithril_stake_distribution_hash(url: String) -> StdResult> { match get_json_response::(url) @@ -49,7 +52,7 @@ pub async fn assert_node_producing_mithril_stake_distribution( { Ok([stake_distribution, ..]) => Ok(Some(stake_distribution.hash.clone())), Ok(&[]) => Ok(None), - Err(err) => Err(anyhow!("Invalid mithril stake distribution body : {err}",)), + Err(err) => Err(anyhow!("Invalid mithril stake distribution body: {err}",)), } } @@ -57,42 +60,51 @@ pub async fn assert_node_producing_mithril_stake_distribution( fetch_last_mithril_stake_distribution_hash(url.clone()).await }) { AttemptResult::Ok(hash) => { - info!("Aggregator produced a mithril stake distribution"; "hash" => &hash); + info!("Aggregator produced a mithril stake distribution"; "hash" => &hash, "aggregator" => &aggregator.name()); Ok(hash) } AttemptResult::Err(error) => Err(error), AttemptResult::Timeout() => Err(anyhow!( "Timeout exhausted assert_node_producing_mithril_stake_distribution, no response from `{url}`" )), - } + }.with_context(|| { + format!( + "Requesting aggregator `{}`", + aggregator.name() + ) + }) } pub async fn assert_signer_is_signing_mithril_stake_distribution( - aggregator_endpoint: &str, + aggregator: &Aggregator, hash: &str, expected_epoch_min: Epoch, ) -> StdResult { - let url = format!("{aggregator_endpoint}/artifact/mithril-stake-distribution/{hash}"); + let url = format!( + "{}/artifact/mithril-stake-distribution/{hash}", + aggregator.endpoint() + ); info!( "Asserting the aggregator is signing the mithril stake distribution message `{}` with an expected min epoch of `{}`", hash, - expected_epoch_min + expected_epoch_min; + "aggregator" => &aggregator.name() ); async fn fetch_mithril_stake_distribution_message( url: String, expected_epoch_min: Epoch, ) -> StdResult> { - match get_json_response::(url) + match get_json_response::(url.clone()) .await? { Ok(stake_distribution) => match stake_distribution.epoch { epoch if epoch >= expected_epoch_min => Ok(Some(stake_distribution)), epoch => Err(anyhow!( - "Minimum expected mithril stake distribution epoch not reached : {epoch} < {expected_epoch_min}" + "Minimum expected mithril stake distribution epoch not reached: {epoch} < {expected_epoch_min}" )), }, - Err(err) => Err(anyhow!("Invalid mithril stake distribution body : {err}",)), + Err(err) => Err(anyhow!("Invalid mithril stake distribution body: {err}",)), } } @@ -100,19 +112,24 @@ pub async fn assert_signer_is_signing_mithril_stake_distribution( fetch_mithril_stake_distribution_message(url.clone(), expected_epoch_min).await }) { AttemptResult::Ok(stake_distribution) => { - info!("Signer signed a mithril stake distribution"; "certificate_hash" => &stake_distribution.certificate_hash); + info!("Signer signed a mithril stake distribution"; "certificate_hash" => &stake_distribution.certificate_hash, "aggregator" => &aggregator.name()); Ok(stake_distribution.certificate_hash) } AttemptResult::Err(error) => Err(error), AttemptResult::Timeout() => Err(anyhow!( "Timeout exhausted assert_signer_is_signing_mithril_stake_distribution, no response from `{url}`" )), - } + }.with_context(|| { + format!( + "Requesting aggregator `{}`", + aggregator.name() + ) + }) } -pub async fn assert_node_producing_snapshot(aggregator_endpoint: &str) -> StdResult { - let url = format!("{aggregator_endpoint}/artifact/snapshots"); - info!("Waiting for the aggregator to produce a snapshot"); +pub async fn assert_node_producing_snapshot(aggregator: &Aggregator) -> StdResult { + let url = format!("{}/artifact/snapshots", aggregator.endpoint()); + info!("Waiting for the aggregator to produce a snapshot"; "aggregator" => &aggregator.name()); async fn fetch_last_snapshot_digest(url: String) -> StdResult> { match get_json_response::>(url) @@ -121,7 +138,7 @@ pub async fn assert_node_producing_snapshot(aggregator_endpoint: &str) -> StdRes { Ok([snapshot, ..]) => Ok(Some(snapshot.digest.clone())), Ok(&[]) => Ok(None), - Err(err) => Err(anyhow!("Invalid snapshot body : {err}",)), + Err(err) => Err(anyhow!("Invalid snapshot body: {err}",)), } } @@ -129,7 +146,7 @@ pub async fn assert_node_producing_snapshot(aggregator_endpoint: &str) -> StdRes fetch_last_snapshot_digest(url.clone()).await }) { AttemptResult::Ok(digest) => { - info!("Aggregator produced a snapshot"; "digest" => &digest); + info!("Aggregator produced a snapshot"; "digest" => &digest, "aggregator" => &aggregator.name()); Ok(digest) } AttemptResult::Err(error) => Err(error), @@ -137,18 +154,20 @@ pub async fn assert_node_producing_snapshot(aggregator_endpoint: &str) -> StdRes "Timeout exhausted assert_node_producing_snapshot, no response from `{url}`" )), } + .with_context(|| format!("Requesting aggregator `{}`", aggregator.name())) } pub async fn assert_signer_is_signing_snapshot( - aggregator_endpoint: &str, + aggregator: &Aggregator, digest: &str, expected_epoch_min: Epoch, ) -> StdResult { - let url = format!("{aggregator_endpoint}/artifact/snapshot/{digest}"); + let url = format!("{}/artifact/snapshot/{digest}", aggregator.endpoint()); info!( "Asserting the aggregator is signing the snapshot message `{}` with an expected min epoch of `{}`", digest, - expected_epoch_min + expected_epoch_min; + "aggregator" => &aggregator.name() ); async fn fetch_snapshot_message( @@ -159,7 +178,7 @@ pub async fn assert_signer_is_signing_snapshot( Ok(snapshot) => match snapshot.beacon.epoch { epoch if epoch >= expected_epoch_min => Ok(Some(snapshot)), epoch => Err(anyhow!( - "Minimum expected snapshot epoch not reached : {epoch} < {expected_epoch_min}" + "Minimum expected snapshot epoch not reached: {epoch} < {expected_epoch_min}" )), }, Err(err) => Err(anyhow!(err).context("Invalid snapshot body")), @@ -170,21 +189,24 @@ pub async fn assert_signer_is_signing_snapshot( fetch_snapshot_message(url.clone(), expected_epoch_min).await }) { AttemptResult::Ok(snapshot) => { - info!("Signer signed a snapshot"; "certificate_hash" => &snapshot.certificate_hash); + info!("Signer signed a snapshot"; "certificate_hash" => &snapshot.certificate_hash, "aggregator" => &aggregator.name()); Ok(snapshot.certificate_hash) } - AttemptResult::Err(error) => Err(error), + AttemptResult::Err(error) => { + Err(error).with_context(|| format!("Requesting aggregator `{}`", aggregator.name())) + } AttemptResult::Timeout() => Err(anyhow!( "Timeout exhausted assert_signer_is_signing_snapshot, no response from `{url}`" )), } + .with_context(|| format!("Requesting aggregator `{}`", aggregator.name())) } pub async fn assert_node_producing_cardano_database_snapshot( - aggregator_endpoint: &str, + aggregator: &Aggregator, ) -> StdResult { - let url = format!("{aggregator_endpoint}/artifact/cardano-database"); - info!("Waiting for the aggregator to produce a Cardano database snapshot"); + let url = format!("{}/artifact/cardano-database", aggregator.endpoint()); + info!("Waiting for the aggregator to produce a Cardano database snapshot"; "aggregator" => &aggregator.name()); async fn fetch_last_cardano_database_snapshot_hash(url: String) -> StdResult> { match get_json_response::(url) @@ -193,7 +215,7 @@ pub async fn assert_node_producing_cardano_database_snapshot( { Ok([cardano_database_snapshot, ..]) => Ok(Some(cardano_database_snapshot.hash.clone())), Ok(&[]) => Ok(None), - Err(err) => Err(anyhow!("Invalid Cardano database snapshot body : {err}",)), + Err(err) => Err(anyhow!("Invalid Cardano database snapshot body: {err}",)), } } @@ -201,7 +223,7 @@ pub async fn assert_node_producing_cardano_database_snapshot( fetch_last_cardano_database_snapshot_hash(url.clone()).await }) { AttemptResult::Ok(hash) => { - info!("Aggregator produced a Cardano database snapshot"; "hash" => &hash); + info!("Aggregator produced a Cardano database snapshot"; "hash" => &hash, "aggregator" => &aggregator.name()); Ok(hash) } AttemptResult::Err(error) => Err(error), @@ -209,18 +231,20 @@ pub async fn assert_node_producing_cardano_database_snapshot( "Timeout exhausted assert_node_producing_snapshot, no response from `{url}`" )), } + .with_context(|| format!("Requesting aggregator `{}`", aggregator.name())) } pub async fn assert_signer_is_signing_cardano_database_snapshot( - aggregator_endpoint: &str, + aggregator: &Aggregator, hash: &str, expected_epoch_min: Epoch, ) -> StdResult { - let url = format!("{aggregator_endpoint}/artifact/cardano-database/{hash}"); + let url = format!("{}/artifact/cardano-database/{hash}", aggregator.endpoint()); info!( "Asserting the aggregator is signing the Cardano database snapshot message `{}` with an expected min epoch of `{}`", hash, - expected_epoch_min + expected_epoch_min; + "aggregator" => &aggregator.name() ); async fn fetch_cardano_database_snapshot_message( @@ -233,7 +257,7 @@ pub async fn assert_signer_is_signing_cardano_database_snapshot( Ok(cardano_database_snapshot) => match cardano_database_snapshot.beacon.epoch { epoch if epoch >= expected_epoch_min => Ok(Some(cardano_database_snapshot)), epoch => Err(anyhow!( - "Minimum expected Cardano database snapshot epoch not reached : {epoch} < {expected_epoch_min}" + "Minimum expected Cardano database snapshot epoch not reached: {epoch} < {expected_epoch_min}" )), }, Err(err) => Err(anyhow!(err).context("Invalid Cardano database snapshot body")), @@ -244,7 +268,7 @@ pub async fn assert_signer_is_signing_cardano_database_snapshot( fetch_cardano_database_snapshot_message(url.clone(), expected_epoch_min).await }) { AttemptResult::Ok(snapshot) => { - info!("Signer signed a snapshot"; "certificate_hash" => &snapshot.certificate_hash); + info!("Signer signed a snapshot"; "certificate_hash" => &snapshot.certificate_hash, "aggregator" => &aggregator.name()); Ok(snapshot.certificate_hash) } AttemptResult::Err(error) => Err(error), @@ -252,13 +276,17 @@ pub async fn assert_signer_is_signing_cardano_database_snapshot( "Timeout exhausted assert_signer_is_signing_snapshot, no response from `{url}`" )), } + .with_context(|| format!("Requesting aggregator `{}`", aggregator.name())) } pub async fn assert_node_producing_cardano_database_digests_map( - aggregator_endpoint: &str, + aggregator: &Aggregator, ) -> StdResult> { - let url = format!("{aggregator_endpoint}/artifact/cardano-database/digests"); - info!("Waiting for the aggregator to produce a Cardano database digests map"); + let url = format!( + "{}/artifact/cardano-database/digests", + aggregator.endpoint() + ); + info!("Waiting for the aggregator to produce a Cardano database digests map"; "aggregator" => &aggregator.name()); async fn fetch_cardano_database_digests_map( url: String, @@ -274,7 +302,7 @@ pub async fn assert_node_producing_cardano_database_digests_map( .map(|item| (item.immutable_file_name.clone(), item.digest.clone())) .collect(), )), - Err(err) => Err(anyhow!("Invalid Cardano database digests map body : {err}",)), + Err(err) => Err(anyhow!("Invalid Cardano database digests map body: {err}",)), } } @@ -282,21 +310,26 @@ pub async fn assert_node_producing_cardano_database_digests_map( fetch_cardano_database_digests_map(url.clone()).await }) { AttemptResult::Ok(cardano_database_digests_map) => { - info!("Aggregator produced a Cardano database digests map"; "total_digests" => &cardano_database_digests_map.len()); + info!("Aggregator produced a Cardano database digests map"; "total_digests" => &cardano_database_digests_map.len(), "aggregator" => &aggregator.name()); Ok(cardano_database_digests_map) } AttemptResult::Err(error) => Err(error), AttemptResult::Timeout() => Err(anyhow!( "Timeout exhausted assert_node_producing_cardano_database_digests_map, no response from `{url}`" )), - } + }.with_context(|| { + format!( + "Requesting aggregator `{}`", + aggregator.name() + ) + }) } pub async fn assert_node_producing_cardano_transactions( - aggregator_endpoint: &str, + aggregator: &Aggregator, ) -> StdResult { - let url = format!("{aggregator_endpoint}/artifact/cardano-transactions"); - info!("Waiting for the aggregator to produce a Cardano transactions artifact"); + let url = format!("{}/artifact/cardano-transactions", aggregator.endpoint()); + info!("Waiting for the aggregator to produce a Cardano transactions artifact"; "aggregator" => &aggregator.name(), "aggregator" => &aggregator.name()); async fn fetch_last_cardano_transaction_snapshot_hash( url: String, @@ -307,9 +340,7 @@ pub async fn assert_node_producing_cardano_transactions( { Ok([artifact, ..]) => Ok(Some(artifact.hash.clone())), Ok(&[]) => Ok(None), - Err(err) => Err(anyhow!( - "Invalid Cardano transactions artifact body : {err}", - )), + Err(err) => Err(anyhow!("Invalid Cardano transactions artifact body: {err}",)), } } @@ -317,7 +348,7 @@ pub async fn assert_node_producing_cardano_transactions( fetch_last_cardano_transaction_snapshot_hash(url.clone()).await }) { AttemptResult::Ok(hash) => { - info!("Aggregator produced a Cardano transactions artifact"; "hash" => &hash); + info!("Aggregator produced a Cardano transactions artifact"; "hash" => &hash, "aggregator" => &aggregator.name()); Ok(hash) } AttemptResult::Err(error) => Err(error), @@ -325,18 +356,23 @@ pub async fn assert_node_producing_cardano_transactions( "Timeout exhausted assert_node_producing_cardano_transactions, no response from `{url}`" )), } + .with_context(|| format!("Requesting aggregator `{}`", aggregator.name())) } pub async fn assert_signer_is_signing_cardano_transactions( - aggregator_endpoint: &str, + aggregator: &Aggregator, hash: &str, expected_epoch_min: Epoch, ) -> StdResult { - let url = format!("{aggregator_endpoint}/artifact/cardano-transaction/{hash}"); + let url = format!( + "{}/artifact/cardano-transaction/{hash}", + aggregator.endpoint() + ); info!( "Asserting the aggregator is signing the Cardano transactions artifact `{}` with an expected min epoch of `{}`", hash, - expected_epoch_min + expected_epoch_min; + "aggregator" => &aggregator.name() ); async fn fetch_cardano_transaction_snapshot_message( @@ -347,7 +383,7 @@ pub async fn assert_signer_is_signing_cardano_transactions( Ok(artifact) => match artifact.epoch { epoch if epoch >= expected_epoch_min => Ok(Some(artifact)), epoch => Err(anyhow!( - "Minimum expected artifact epoch not reached : {epoch} < {expected_epoch_min}" + "Minimum expected artifact epoch not reached: {epoch} < {expected_epoch_min}" )), }, Err(err) => Err(anyhow!(err).context("Invalid Cardano transactions artifact body")), @@ -358,21 +394,29 @@ pub async fn assert_signer_is_signing_cardano_transactions( fetch_cardano_transaction_snapshot_message(url.clone(), expected_epoch_min).await }) { AttemptResult::Ok(artifact) => { - info!("Signer signed a Cardano transactions artifact"; "certificate_hash" => &artifact.certificate_hash); + info!("Signer signed a Cardano transactions artifact"; "certificate_hash" => &artifact.certificate_hash, "aggregator" => &aggregator.name()); Ok(artifact.certificate_hash) } AttemptResult::Err(error) => Err(error), AttemptResult::Timeout() => Err(anyhow!( "Timeout exhausted assert_signer_is_signing_cardano_transactions, no response from `{url}`" )), - } + }.with_context(|| { + format!( + "Requesting aggregator `{}`", + aggregator.name() + ) + }) } pub async fn assert_node_producing_cardano_stake_distribution( - aggregator_endpoint: &str, + aggregator: &Aggregator, ) -> StdResult<(String, Epoch)> { - let url = format!("{aggregator_endpoint}/artifact/cardano-stake-distributions"); - info!("Waiting for the aggregator to produce a Cardano stake distribution"); + let url = format!( + "{}/artifact/cardano-stake-distributions", + aggregator.endpoint() + ); + info!("Waiting for the aggregator to produce a Cardano stake distribution"; "aggregator" => &aggregator.name()); async fn fetch_last_cardano_stake_distribution_message( url: String, @@ -386,7 +430,7 @@ pub async fn assert_node_producing_cardano_stake_distribution( stake_distribution.epoch, ))), Ok(&[]) => Ok(None), - Err(err) => Err(anyhow!("Invalid Cardano stake distribution body : {err}",)), + Err(err) => Err(anyhow!("Invalid Cardano stake distribution body: {err}",)), } } @@ -394,26 +438,35 @@ pub async fn assert_node_producing_cardano_stake_distribution( fetch_last_cardano_stake_distribution_message(url.clone()).await }) { AttemptResult::Ok((hash, epoch)) => { - info!("Aggregator produced a Cardano stake distribution"; "hash" => &hash, "epoch" => #?epoch); + info!("Aggregator produced a Cardano stake distribution"; "hash" => &hash, "epoch" => #?epoch, "aggregator" => &aggregator.name()); Ok((hash, epoch)) } AttemptResult::Err(error) => Err(error), AttemptResult::Timeout() => Err(anyhow!( "Timeout exhausted assert_node_producing_cardano_stake_distribution, no response from `{url}`" )), - } + }.with_context(|| { + format!( + "Requesting aggregator `{}`", + aggregator.name() + ) + }) } pub async fn assert_signer_is_signing_cardano_stake_distribution( - aggregator_endpoint: &str, + aggregator: &Aggregator, hash: &str, expected_epoch_min: Epoch, ) -> StdResult { - let url = format!("{aggregator_endpoint}/artifact/cardano-stake-distribution/{hash}"); + let url = format!( + "{}/artifact/cardano-stake-distribution/{hash}", + aggregator.endpoint() + ); info!( "Asserting the aggregator is signing the Cardano stake distribution message `{}` with an expected min epoch of `{}`", hash, - expected_epoch_min + expected_epoch_min; + "aggregator" => &aggregator.name() ); async fn fetch_cardano_stake_distribution_message( @@ -426,7 +479,7 @@ pub async fn assert_signer_is_signing_cardano_stake_distribution( Ok(stake_distribution) => match stake_distribution.epoch { epoch if epoch >= expected_epoch_min => Ok(Some(stake_distribution)), epoch => Err(anyhow!( - "Minimum expected Cardano stake distribution epoch not reached : {epoch} < {expected_epoch_min}" + "Minimum expected Cardano stake distribution epoch not reached: {epoch} < {expected_epoch_min}" )), }, Err(err) => Err(anyhow!(err).context("Invalid Cardano stake distribution body",)), @@ -437,22 +490,28 @@ pub async fn assert_signer_is_signing_cardano_stake_distribution( fetch_cardano_stake_distribution_message(url.clone(), expected_epoch_min).await }) { AttemptResult::Ok(cardano_stake_distribution) => { - info!("Signer signed a Cardano stake distribution"; "certificate_hash" => &cardano_stake_distribution.certificate_hash); + info!("Signer signed a Cardano stake distribution"; "certificate_hash" => &cardano_stake_distribution.certificate_hash, "aggregator" => &aggregator.name()); Ok(cardano_stake_distribution.certificate_hash) } AttemptResult::Err(error) => Err(error), AttemptResult::Timeout() => Err(anyhow!( "Timeout exhausted assert_signer_is_signing_cardano_stake_distribution, no response from `{url}`" )), - } + }.with_context(|| { + format!( + "Requesting aggregator `{}`", + aggregator.name() + ) + }) } pub async fn assert_is_creating_certificate_with_enough_signers( - aggregator_endpoint: &str, + aggregator: &Aggregator, certificate_hash: &str, total_signers_expected: usize, ) -> StdResult<()> { - let url = format!("{aggregator_endpoint}/certificate/{certificate_hash}"); + let url = format!("{}/certificate/{certificate_hash}", aggregator.endpoint()); + info!("Waiting for the aggregator to create a certificate with enough signers"; "aggregator" => &aggregator.name()); async fn fetch_certificate_message(url: String) -> StdResult> { match get_json_response::(url).await? { @@ -470,7 +529,8 @@ pub async fn assert_is_creating_certificate_with_enough_signers( info!( "Certificate is signed by expected number of signers: {} >= {} ", certificate.metadata.signers.len(), - total_signers_expected + total_signers_expected ; + "aggregator" => &aggregator.name() ); Ok(()) } else { @@ -486,6 +546,7 @@ pub async fn assert_is_creating_certificate_with_enough_signers( "Timeout exhausted assert_is_creating_certificate, no response from `{url}`" )), } + .with_context(|| format!("Requesting aggregator `{}`", aggregator.name())) } pub async fn assert_client_can_verify_snapshot(client: &mut Client, digest: &str) -> StdResult<()> { diff --git a/mithril-test-lab/mithril-end-to-end/src/assertions/exec.rs b/mithril-test-lab/mithril-end-to-end/src/assertions/exec.rs index 4bab328576e..1a6a437acc2 100644 --- a/mithril-test-lab/mithril-end-to-end/src/assertions/exec.rs +++ b/mithril-test-lab/mithril-end-to-end/src/assertions/exec.rs @@ -5,28 +5,37 @@ use mithril_common::entities::{Epoch, ProtocolParameters}; use mithril_common::StdResult; use slog_scope::info; -pub async fn bootstrap_genesis_certificate(aggregator: &mut Aggregator) -> StdResult<()> { - info!("Bootstrap genesis certificate"); - - info!("> stopping aggregator"); +pub async fn bootstrap_genesis_certificate(aggregator: &Aggregator) -> StdResult<()> { + info!("Bootstrap genesis certificate"; "aggregator" => &aggregator.name()); + + // A slave aggregator needs to wait few cycles of the state machine to be able to bootstrap + // This should be removed when the aggregator is able to synchronize its certificate chain from another aggregator + if !aggregator.is_first() { + tokio::time::sleep(std::time::Duration::from_millis( + 2 * aggregator.mithril_run_interval() as u64, + )) + .await; + } + + info!("> stopping aggregator"; "aggregator" => &aggregator.name()); aggregator.stop().await?; - info!("> bootstrapping genesis using signers registered two epochs ago..."); + info!("> bootstrapping genesis using signers registered two epochs ago..."; "aggregator" => &aggregator.name()); aggregator.bootstrap_genesis().await?; - info!("> done, restarting aggregator"); - aggregator.serve()?; + info!("> done, restarting aggregator"; "aggregator" => &aggregator.name()); + aggregator.serve().await?; Ok(()) } pub async fn register_era_marker( - aggregator: &mut Aggregator, + aggregator: &Aggregator, devnet: &Devnet, mithril_era: &str, era_epoch: Epoch, ) -> StdResult<()> { - info!("Register '{mithril_era}' era marker"); + info!("Register '{mithril_era}' era marker"; "aggregator" => &aggregator.name()); - info!("> generating era marker tx datum..."); + info!("> generating era marker tx datum..."; "aggregator" => &aggregator.name()); let tx_datum_file_path = devnet .artifacts_dir() .join(PathBuf::from("era-tx-datum.txt".to_string())); @@ -34,7 +43,7 @@ pub async fn register_era_marker( .era_generate_tx_datum(&tx_datum_file_path, mithril_era, era_epoch) .await?; - info!("> writing '{mithril_era}' era marker on the Cardano chain..."); + info!("> writing '{mithril_era}' era marker on the Cardano chain..."; "aggregator" => &aggregator.name()); devnet.write_era_marker(&tx_datum_file_path).await?; Ok(()) @@ -56,8 +65,8 @@ pub async fn transfer_funds(devnet: &Devnet) -> StdResult<()> { Ok(()) } -pub async fn update_protocol_parameters(aggregator: &mut Aggregator) -> StdResult<()> { - info!("Update protocol parameters"); +pub async fn update_protocol_parameters(aggregator: &Aggregator) -> StdResult<()> { + info!("Update protocol parameters"; "aggregator" => &aggregator.name()); info!("> stopping aggregator"); aggregator.stop().await?; @@ -67,12 +76,13 @@ pub async fn update_protocol_parameters(aggregator: &mut Aggregator) -> StdResul phi_f: 0.80, }; info!( - "> updating protocol parameters to {:?}...", - protocol_parameters_new + "> updating protocol parameters to {protocol_parameters_new:?}..."; "aggregator" => &aggregator.name() ); - aggregator.set_protocol_parameters(&protocol_parameters_new); - info!("> done, restarting aggregator"); - aggregator.serve()?; + aggregator + .set_protocol_parameters(&protocol_parameters_new) + .await; + info!("> done, restarting aggregator"; "aggregator" => &aggregator.name()); + aggregator.serve().await?; Ok(()) } diff --git a/mithril-test-lab/mithril-end-to-end/src/assertions/wait.rs b/mithril-test-lab/mithril-end-to-end/src/assertions/wait.rs index 2177940be2b..9178a8b4f11 100644 --- a/mithril-test-lab/mithril-end-to-end/src/assertions/wait.rs +++ b/mithril-test-lab/mithril-end-to-end/src/assertions/wait.rs @@ -1,16 +1,16 @@ -use crate::{attempt, utils::AttemptResult}; +use crate::{attempt, utils::AttemptResult, Aggregator}; use anyhow::{anyhow, Context}; use mithril_common::{ - chain_observer::ChainObserver, digesters::ImmutableFile, entities::Epoch, - messages::EpochSettingsMessage, StdResult, + digesters::ImmutableFile, entities::Epoch, messages::EpochSettingsMessage, StdResult, }; use reqwest::StatusCode; use slog_scope::{info, warn}; -use std::{path::Path, sync::Arc, time::Duration}; +use std::time::Duration; -pub async fn wait_for_enough_immutable(db_directory: &Path) -> StdResult<()> { - info!("Waiting that enough immutable have been written in the devnet"); +pub async fn wait_for_enough_immutable(aggregator: &Aggregator) -> StdResult<()> { + info!("Waiting that enough immutable have been written in the devnet"; "aggregator" => aggregator.name()); + let db_directory = aggregator.db_directory(); match attempt!(24, Duration::from_secs(5), { match ImmutableFile::list_completed_in_dir(db_directory) .with_context(|| { @@ -34,9 +34,10 @@ pub async fn wait_for_enough_immutable(db_directory: &Path) -> StdResult<()> { } } -pub async fn wait_for_epoch_settings(aggregator_endpoint: &str) -> StdResult { +pub async fn wait_for_epoch_settings(aggregator: &Aggregator) -> StdResult { + let aggregator_endpoint = aggregator.endpoint(); let url = format!("{aggregator_endpoint}/epoch-settings"); - info!("Waiting for the aggregator to expose epoch settings"); + info!("Waiting for the aggregator to expose epoch settings"; "aggregator" => aggregator.name()); match attempt!(20, Duration::from_millis(1000), { match reqwest::get(url.clone()).await { @@ -50,10 +51,7 @@ pub async fn wait_for_epoch_settings(aggregator_endpoint: &str) -> StdResult { - warn!( - "Server error while waiting for the Aggregator, http code: {}", - s - ); + warn!( "Server error while waiting for the Aggregator, http code: {s}"; "aggregator" => aggregator.name()); Ok(None) } _ => Ok(None), @@ -69,18 +67,20 @@ pub async fn wait_for_epoch_settings(aggregator_endpoint: &str) -> StdResult, +pub async fn wait_for_aggregator_at_target_epoch( + aggregator: &Aggregator, target_epoch: Epoch, wait_reason: String, ) -> StdResult<()> { info!( "Waiting for the cardano network to be at the target epoch: {}", wait_reason; + "aggregator" => aggregator.name(), "target_epoch" => ?target_epoch ); - match attempt!(90, Duration::from_millis(1000), { - match chain_observer + match attempt!(450, Duration::from_millis(200), { + match aggregator + .chain_observer() .get_current_epoch() .await .with_context(|| "Could not query current epoch")? @@ -96,7 +96,7 @@ pub async fn wait_for_target_epoch( } }) { AttemptResult::Ok(_) => { - info!("Target epoch reached!"; "target_epoch" => ?target_epoch); + info!("Target epoch reached!"; "aggregator" => aggregator.name(), "target_epoch" => ?target_epoch); Ok(()) } AttemptResult::Err(error) => Err(error), diff --git a/mithril-test-lab/mithril-end-to-end/src/end_to_end_spec.rs b/mithril-test-lab/mithril-end-to-end/src/end_to_end_spec.rs index 4dc5e8a309a..397aee961d6 100644 --- a/mithril-test-lab/mithril-end-to-end/src/end_to_end_spec.rs +++ b/mithril-test-lab/mithril-end-to-end/src/end_to_end_spec.rs @@ -1,12 +1,16 @@ -use crate::assertions; -use crate::MithrilInfrastructure; +use std::sync::Arc; + +use tokio::task::JoinSet; + use mithril_common::{ entities::{Epoch, SignedEntityTypeDiscriminants}, StdResult, }; -pub struct Spec<'a> { - pub infrastructure: &'a mut MithrilInfrastructure, +use crate::{assertions, Aggregator, MithrilInfrastructure}; + +pub struct Spec { + pub infrastructure: Arc, is_signing_cardano_transactions: bool, is_signing_cardano_stake_distribution: bool, is_signing_cardano_database: bool, @@ -14,9 +18,9 @@ pub struct Spec<'a> { regenesis_on_era_switch: bool, } -impl<'a> Spec<'a> { +impl Spec { pub fn new( - infrastructure: &'a mut MithrilInfrastructure, + infrastructure: Arc, signed_entity_types: Vec, next_era: Option, regenesis_on_era_switch: bool, @@ -43,78 +47,106 @@ impl<'a> Spec<'a> { } } - pub async fn run(&mut self) -> StdResult<()> { - let aggregator_endpoint = self.infrastructure.aggregator().endpoint(); - assertions::wait_for_enough_immutable(self.infrastructure.aggregator().db_directory()) - .await?; - let start_epoch = self - .infrastructure - .chain_observer() - .get_current_epoch() - .await? - .unwrap_or_default(); + pub async fn run(self) -> StdResult<()> { + let mut join_set = JoinSet::new(); + let spec = Arc::new(self); // Transfer some funds on the devnet to have some Cardano transactions to sign. // This step needs to be executed early in the process so that the transactions are available // for signing in the penultimate immutable chunk before the end of the test. // As we get closer to the tip of the chain when signing, we'll be able to relax this constraint. - assertions::transfer_funds(self.infrastructure.devnet()).await?; + assertions::transfer_funds(spec.infrastructure.devnet()).await?; + + for index in 0..spec.infrastructure.aggregators().len() { + let spec_clone = spec.clone(); + join_set.spawn(async move { + let infrastructure = &spec_clone.infrastructure; + + spec_clone + .run_scenario(infrastructure.aggregator(index), infrastructure) + .await + }); + } + + while let Some(res) = join_set.join_next().await { + res??; + } + + Ok(()) + } + + pub async fn run_scenario( + &self, + aggregator: &Aggregator, + infrastructure: &MithrilInfrastructure, + ) -> StdResult<()> { + assertions::wait_for_enough_immutable(aggregator).await?; + let chain_observer = aggregator.chain_observer(); + let start_epoch = chain_observer + .get_current_epoch() + .await? + .unwrap_or_default(); // Wait 4 epochs after start epoch for the aggregator to be able to bootstrap a genesis certificate let mut target_epoch = start_epoch + 4; - assertions::wait_for_target_epoch( - self.infrastructure.chain_observer(), + assertions::wait_for_aggregator_at_target_epoch( + aggregator, target_epoch, "minimal epoch for the aggregator to be able to bootstrap genesis certificate" .to_string(), ) .await?; - assertions::bootstrap_genesis_certificate(self.infrastructure.aggregator_mut()).await?; - assertions::wait_for_epoch_settings(&aggregator_endpoint).await?; + assertions::bootstrap_genesis_certificate(aggregator).await?; + assertions::wait_for_epoch_settings(aggregator).await?; // Wait 2 epochs before changing stake distribution, so that we use at least one original stake distribution target_epoch += 2; - assertions::wait_for_target_epoch( - self.infrastructure.chain_observer(), + assertions::wait_for_aggregator_at_target_epoch( + aggregator, target_epoch, "epoch after which the stake distribution will change".to_string(), ) .await?; - let delegation_round = 1; - assertions::delegate_stakes_to_pools(self.infrastructure.devnet(), delegation_round) - .await?; + + if aggregator.is_first() { + // Delegate some stakes to pools + let delegation_round = 1; + assertions::delegate_stakes_to_pools(infrastructure.devnet(), delegation_round).await?; + } // Wait 2 epochs before changing protocol parameters target_epoch += 2; - assertions::wait_for_target_epoch( - self.infrastructure.chain_observer(), + assertions::wait_for_aggregator_at_target_epoch( + aggregator, target_epoch, "epoch after which the protocol parameters will change".to_string(), ) .await?; - assertions::update_protocol_parameters(self.infrastructure.aggregator_mut()).await?; + assertions::update_protocol_parameters(aggregator).await?; // Wait 6 epochs after protocol parameters update, so that we make sure that we use new protocol parameters as well as new stake distribution a few times target_epoch += 6; - assertions::wait_for_target_epoch( - self.infrastructure.chain_observer(), + assertions::wait_for_aggregator_at_target_epoch( + aggregator, target_epoch, "epoch after which the certificate chain will be long enough to catch most common troubles with stake distribution and protocol parameters".to_string(), ) .await?; // Verify that artifacts are produced and signed correctly - let mut target_epoch = self.verify_artifacts_production(target_epoch).await?; + let mut target_epoch = self + .verify_artifacts_production(target_epoch, aggregator, infrastructure) + .await?; // Verify that artifacts are produced and signed correctly after era switch if let Some(next_era) = &self.next_era { // Switch to next era - self.infrastructure - .register_switch_to_next_era(next_era) - .await?; + if aggregator.is_first() { + infrastructure.register_switch_to_next_era(next_era).await?; + } target_epoch += 5; - assertions::wait_for_target_epoch( - self.infrastructure.chain_observer(), + assertions::wait_for_aggregator_at_target_epoch( + aggregator, target_epoch, "epoch after which the era switch will have triggered".to_string(), ) @@ -122,11 +154,10 @@ impl<'a> Spec<'a> { // Proceed to a re-genesis of the certificate chain if self.regenesis_on_era_switch { - assertions::bootstrap_genesis_certificate(self.infrastructure.aggregator_mut()) - .await?; + assertions::bootstrap_genesis_certificate(aggregator).await?; target_epoch += 5; - assertions::wait_for_target_epoch( - self.infrastructure.chain_observer(), + assertions::wait_for_aggregator_at_target_epoch( + aggregator, target_epoch, "epoch after which the re-genesis on era switch will be completed".to_string(), ) @@ -134,107 +165,107 @@ impl<'a> Spec<'a> { } // Verify that artifacts are produced and signed correctly - self.verify_artifacts_production(target_epoch).await?; + self.verify_artifacts_production(target_epoch, aggregator, infrastructure) + .await?; } Ok(()) } - async fn verify_artifacts_production(&self, target_epoch: Epoch) -> StdResult { - let aggregator_endpoint = self.infrastructure.aggregator().endpoint(); + async fn verify_artifacts_production( + &self, + target_epoch: Epoch, + aggregator: &Aggregator, + infrastructure: &MithrilInfrastructure, + ) -> StdResult { let expected_epoch_min = target_epoch - 3; // Verify that mithril stake distribution artifacts are produced and signed correctly { let hash = - assertions::assert_node_producing_mithril_stake_distribution(&aggregator_endpoint) - .await?; + assertions::assert_node_producing_mithril_stake_distribution(aggregator).await?; let certificate_hash = assertions::assert_signer_is_signing_mithril_stake_distribution( - &aggregator_endpoint, + aggregator, &hash, expected_epoch_min, ) .await?; assertions::assert_is_creating_certificate_with_enough_signers( - &aggregator_endpoint, + aggregator, &certificate_hash, - self.infrastructure.signers().len(), + infrastructure.signers().len(), ) .await?; - let mut client = self.infrastructure.build_client()?; + let mut client = infrastructure.build_client(aggregator).await?; assertions::assert_client_can_verify_mithril_stake_distribution(&mut client, &hash) .await?; } // Verify that snapshot artifacts are produced and signed correctly { - let digest = assertions::assert_node_producing_snapshot(&aggregator_endpoint).await?; + let digest = assertions::assert_node_producing_snapshot(aggregator).await?; let certificate_hash = assertions::assert_signer_is_signing_snapshot( - &aggregator_endpoint, + aggregator, &digest, expected_epoch_min, ) .await?; assertions::assert_is_creating_certificate_with_enough_signers( - &aggregator_endpoint, + aggregator, &certificate_hash, - self.infrastructure.signers().len(), + infrastructure.signers().len(), ) .await?; - let mut client = self.infrastructure.build_client()?; + let mut client = infrastructure.build_client(aggregator).await?; assertions::assert_client_can_verify_snapshot(&mut client, &digest).await?; } // Verify that Cardano database snapshot artifacts are produced and signed correctly if self.is_signing_cardano_database { let hash = - assertions::assert_node_producing_cardano_database_snapshot(&aggregator_endpoint) - .await?; + assertions::assert_node_producing_cardano_database_snapshot(aggregator).await?; let certificate_hash = assertions::assert_signer_is_signing_cardano_database_snapshot( - &aggregator_endpoint, + aggregator, &hash, expected_epoch_min, ) .await?; assertions::assert_is_creating_certificate_with_enough_signers( - &aggregator_endpoint, + aggregator, &certificate_hash, - self.infrastructure.signers().len(), + infrastructure.signers().len(), ) .await?; - assertions::assert_node_producing_cardano_database_digests_map(&aggregator_endpoint) - .await?; + assertions::assert_node_producing_cardano_database_digests_map(aggregator).await?; - let mut client = self.infrastructure.build_client()?; + let mut client = infrastructure.build_client(aggregator).await?; assertions::assert_client_can_verify_cardano_database(&mut client, &hash).await?; } // Verify that Cardano transactions artifacts are produced and signed correctly if self.is_signing_cardano_transactions { - let hash = assertions::assert_node_producing_cardano_transactions(&aggregator_endpoint) - .await?; + let hash = assertions::assert_node_producing_cardano_transactions(aggregator).await?; let certificate_hash = assertions::assert_signer_is_signing_cardano_transactions( - &aggregator_endpoint, + aggregator, &hash, expected_epoch_min, ) .await?; assertions::assert_is_creating_certificate_with_enough_signers( - &aggregator_endpoint, + aggregator, &certificate_hash, - self.infrastructure.signers().len(), + infrastructure.signers().len(), ) .await?; - let transaction_hashes = self - .infrastructure + let transaction_hashes = infrastructure .devnet() .mithril_payments_transaction_hashes()?; - let mut client = self.infrastructure.build_client()?; + let mut client = infrastructure.build_client(aggregator).await?; assertions::assert_client_can_verify_transactions(&mut client, transaction_hashes) .await?; } @@ -242,25 +273,24 @@ impl<'a> Spec<'a> { // Verify that Cardano stake distribution artifacts are produced and signed correctly if self.is_signing_cardano_stake_distribution { { - let (hash, epoch) = assertions::assert_node_producing_cardano_stake_distribution( - &aggregator_endpoint, - ) - .await?; + let (hash, epoch) = + assertions::assert_node_producing_cardano_stake_distribution(aggregator) + .await?; let certificate_hash = assertions::assert_signer_is_signing_cardano_stake_distribution( - &aggregator_endpoint, + aggregator, &hash, expected_epoch_min, ) .await?; assertions::assert_is_creating_certificate_with_enough_signers( - &aggregator_endpoint, + aggregator, &certificate_hash, - self.infrastructure.signers().len(), + infrastructure.signers().len(), ) .await?; - let mut client = self.infrastructure.build_client()?; + let mut client = infrastructure.build_client(aggregator).await?; assertions::assert_client_can_verify_cardano_stake_distribution( &mut client, &hash, diff --git a/mithril-test-lab/mithril-end-to-end/src/main.rs b/mithril-test-lab/mithril-end-to-end/src/main.rs index 87898022b3e..ed8450159c1 100644 --- a/mithril-test-lab/mithril-end-to-end/src/main.rs +++ b/mithril-test-lab/mithril-end-to-end/src/main.rs @@ -51,9 +51,13 @@ pub struct Args { #[clap(long, default_value = ".")] bin_directory: PathBuf, - /// Number of Pool nodes in the devnet - #[clap(long, default_value_t = 3, value_parser = has_at_least_two_pool_nodes)] - number_of_pool_nodes: u8, + /// Number of aggregators + #[clap(long, default_value_t = 1, value_parser = clap::value_parser!(u8).range(1..))] + number_of_aggregators: u8, + + /// Number of signers + #[clap(long, default_value_t = 2, value_parser = clap::value_parser!(u8).range(1..))] + number_of_signers: u8, /// Length of a Cardano slot in the devnet (in s) #[clap(long, default_value_t = 0.10)] @@ -103,11 +107,19 @@ pub struct Args { #[clap(long)] run_only: bool, - /// Enable P2P network mode + /// Use Mithril relays #[clap(long)] - use_p2p_network: bool, + use_relays: bool, + + /// Signer registration relay mode (used only when 'use_relays' is set, can be 'passthrough' or 'p2p') + #[clap(long, default_value = "passthrough")] + relay_signer_registration_mode: String, - /// Enable P2P passive relays in P2P mode + /// Signature registration relay mode (used only when 'use_relays' is set, can be 'passthrough' or 'p2p') + #[clap(long, default_value = "p2p")] + relay_signature_registration_mode: String, + + /// Enable P2P passive relays in P2P mode (used only when 'use_relays' is set) #[clap(long, default_value = "true")] use_p2p_passive_relays: bool, @@ -135,17 +147,15 @@ impl Args { _ => Level::Trace, } } -} -fn has_at_least_two_pool_nodes(s: &str) -> Result { - let number_of_pool_nodes: u8 = s.parse().map_err(|_| format!("`{}` isn't a number", s))?; - if number_of_pool_nodes >= 2 { - Ok(number_of_pool_nodes) - } else { - Err(format!( - "At least two pool nodes are required (one for the aggregator, one for at least one \ - signer), number given: {s}", - )) + fn validate(&self) -> StdResult<()> { + if !self.use_relays && self.number_of_aggregators >= 2 { + return Err(anyhow!( + "The 'use_relays' parameter must be activated to run more than one aggregator" + )); + } + + Ok(()) } } @@ -190,16 +200,21 @@ async fn main_exec() -> StdResult<()> { }; let artifacts_dir = { let path = work_dir.join("artifacts"); - fs::create_dir(&path).expect("Artifacts dir creation failure"); + fs::create_dir(&path).with_context(|| "Artifacts dir creation failure")?; + path + }; + let store_dir = { + let path = work_dir.join("stores"); + fs::create_dir(&path).with_context(|| "Stores dir creation failure")?; path }; let mut app = App::new(); let mut app_stopper = AppStopper::new(&app); let mut join_set = JoinSet::new(); - with_gracefull_shutdown(&mut join_set); + with_graceful_shutdown(&mut join_set); - join_set.spawn(async move { app.run(args, work_dir, artifacts_dir).await }); + join_set.spawn(async move { app.run(args, work_dir, store_dir, artifacts_dir).await }); let res = match join_set.join_next().await { Some(Ok(tasks_result)) => tasks_result, @@ -274,7 +289,7 @@ impl From> for AppResult { struct App { devnet: Arc>>, - infrastructure: Arc>>, + infrastructure: Arc>>>, } impl App { @@ -305,17 +320,22 @@ impl App { &mut self, args: Args, work_dir: PathBuf, + store_dir: PathBuf, artifacts_dir: PathBuf, ) -> StdResult<()> { let server_port = 8080; + args.validate()?; let run_only_mode = args.run_only; - let use_p2p_network_mode = args.use_p2p_network; + let use_relays = args.use_relays; + let relay_signer_registration_mode = args.relay_signer_registration_mode; + let relay_signature_registration_mode = args.relay_signature_registration_mode; + let use_p2p_passive_relays = args.use_p2p_passive_relays; let devnet = Devnet::bootstrap(&DevnetBootstrapArgs { devnet_scripts_dir: args.devnet_scripts_directory, artifacts_target_dir: work_dir.join("devnet"), - number_of_pool_nodes: args.number_of_pool_nodes, + number_of_pool_nodes: args.number_of_aggregators + args.number_of_signers, cardano_slot_length: args.cardano_slot_length, cardano_epoch_length: args.cardano_epoch_length, cardano_node_version: args.cardano_node_version.to_owned(), @@ -325,40 +345,45 @@ impl App { .await?; *self.devnet.lock().await = Some(devnet.clone()); - let mut infrastructure = MithrilInfrastructure::start(&MithrilInfrastructureConfig { - server_port, - devnet: devnet.clone(), - artifacts_dir, - work_dir, - bin_dir: args.bin_directory, - cardano_node_version: args.cardano_node_version, - mithril_run_interval: args.mithril_run_interval, - mithril_era: args.mithril_era, - mithril_era_reader_adapter: args.mithril_era_reader_adapter, - signed_entity_types: args.signed_entity_types.clone(), - run_only_mode, - use_p2p_network_mode, - use_p2p_passive_relays, - use_era_specific_work_dir: args.mithril_next_era.is_some(), - }) - .await?; + let infrastructure = Arc::new( + MithrilInfrastructure::start(&MithrilInfrastructureConfig { + number_of_aggregators: args.number_of_aggregators, + number_of_signers: args.number_of_signers, + server_port, + devnet: devnet.clone(), + work_dir, + store_dir, + artifacts_dir, + bin_dir: args.bin_directory, + cardano_node_version: args.cardano_node_version, + mithril_run_interval: args.mithril_run_interval, + mithril_era: args.mithril_era, + mithril_era_reader_adapter: args.mithril_era_reader_adapter, + signed_entity_types: args.signed_entity_types.clone(), + run_only_mode, + use_relays, + relay_signer_registration_mode, + relay_signature_registration_mode, + use_p2p_passive_relays, + use_era_specific_work_dir: args.mithril_next_era.is_some(), + }) + .await?, + ); + *self.infrastructure.lock().await = Some(infrastructure.clone()); let runner: StdResult<()> = match run_only_mode { - true => { - let mut run_only = RunOnly::new(&mut infrastructure); - run_only.start().await - } + true => RunOnly::new(infrastructure).run().await, false => { - let mut spec = Spec::new( - &mut infrastructure, + Spec::new( + infrastructure, args.signed_entity_types, args.mithril_next_era, args.mithril_era_regenesis_on_switch, - ); - spec.run().await + ) + .run() + .await } }; - *self.infrastructure.lock().await = Some(infrastructure); match runner.with_context(|| "Mithril End to End test failed") { Ok(()) if run_only_mode => loop { @@ -377,7 +402,7 @@ impl App { struct AppStopper { devnet: Arc>>, - infrastructure: Arc>>, + infrastructure: Arc>>>, } impl AppStopper { @@ -422,7 +447,7 @@ fn create_workdir_if_not_exist_clean_otherwise(work_dir: &Path) { #[error("Signal received: `{0}`")] pub struct SignalError(pub String); -fn with_gracefull_shutdown(join_set: &mut JoinSet>) { +fn with_graceful_shutdown(join_set: &mut JoinSet>) { join_set.spawn(async move { let mut sigterm = signal(SignalKind::terminate()).expect("Failed to create SIGTERM signal"); sigterm.recv().await; @@ -487,4 +512,16 @@ mod tests { AppResult::Cancelled(_) )); } + + #[test] + fn args_fails_validation() { + let args = Args::parse_from(["", "--number-of-aggregators", "2"]); + args.validate().expect_err( + "validate should fail with more than one aggregator if p2p network is not used", + ); + + let args = Args::parse_from(["", "--use-relays", "--number-of-aggregators", "2"]); + args.validate() + .expect("validate should succeed with more than one aggregator if p2p network is used"); + } } diff --git a/mithril-test-lab/mithril-end-to-end/src/mithril/aggregator.rs b/mithril-test-lab/mithril-end-to-end/src/mithril/aggregator.rs index 2d8ddcfcd01..c76654c98b3 100644 --- a/mithril-test-lab/mithril-end-to-end/src/mithril/aggregator.rs +++ b/mithril-test-lab/mithril-end-to-end/src/mithril/aggregator.rs @@ -1,24 +1,31 @@ use crate::utils::MithrilCommand; use crate::{ - PoolNode, DEVNET_MAGIC_ID, ERA_MARKERS_SECRET_KEY, ERA_MARKERS_VERIFICATION_KEY, - GENESIS_SECRET_KEY, GENESIS_VERIFICATION_KEY, + PoolNode, RetryableDevnetError, DEVNET_MAGIC_ID, ERA_MARKERS_SECRET_KEY, + ERA_MARKERS_VERIFICATION_KEY, GENESIS_SECRET_KEY, GENESIS_VERIFICATION_KEY, }; use anyhow::{anyhow, Context}; +use mithril_common::chain_observer::{ChainObserver, PallasChainObserver}; use mithril_common::era::SupportedEra; -use mithril_common::{entities, StdResult}; +use mithril_common::{entities, CardanoNetwork, StdResult}; use slog_scope::info; use std::cmp; use std::collections::HashMap; +use std::fmt::Debug; use std::path::{Path, PathBuf}; +use std::sync::Arc; use std::time::Duration; use tokio::process::Child; +use tokio::sync::RwLock; #[derive(Debug)] pub struct AggregatorConfig<'a> { + pub index: usize, + pub name: &'a str, pub server_port: u64, pub pool_node: &'a PoolNode, pub cardano_cli_path: &'a Path, pub work_dir: &'a Path, + pub store_dir: &'a Path, pub artifacts_dir: &'a Path, pub bin_dir: &'a Path, pub cardano_node_version: &'a str, @@ -28,14 +35,18 @@ pub struct AggregatorConfig<'a> { pub mithril_era_marker_address: &'a str, pub signed_entity_types: &'a [String], pub chain_observer_type: &'a str, + pub master_aggregator_endpoint: &'a Option, } -#[derive(Debug)] pub struct Aggregator { + index: usize, + name_suffix: String, server_port: u64, db_directory: PathBuf, - command: MithrilCommand, - process: Option, + mithril_run_interval: u32, + command: Arc>, + process: RwLock>, + chain_observer: Arc, } impl Aggregator { @@ -57,7 +68,7 @@ impl Aggregator { let signed_entity_types = aggregator_config.signed_entity_types.join(","); let mithril_run_interval = format!("{}", aggregator_config.mithril_run_interval); let public_server_url = format!("http://localhost:{server_port_parameter}/aggregator"); - let env = HashMap::from([ + let mut env = HashMap::from([ ("NETWORK", "devnet"), ("RUN_INTERVAL", &mithril_run_interval), ("SERVER_IP", "0.0.0.0"), @@ -71,7 +82,10 @@ impl Aggregator { aggregator_config.artifacts_dir.to_str().unwrap(), ), ("NETWORK_MAGIC", &magic_id), - ("DATA_STORES_DIRECTORY", "./stores/aggregator"), + ( + "DATA_STORES_DIRECTORY", + aggregator_config.store_dir.to_str().unwrap(), + ), ( "CARDANO_NODE_SOCKET_PATH", aggregator_config.pool_node.socket_path.to_str().unwrap(), @@ -103,6 +117,9 @@ impl Aggregator { ("CARDANO_TRANSACTIONS_SIGNING_CONFIG__STEP", "15"), ("PERSIST_USAGE_REPORT_INTERVAL_IN_SECONDS", "3"), ]); + if let Some(master_aggregator_endpoint) = aggregator_config.master_aggregator_endpoint { + env.insert("MASTER_AGGREGATOR_ENDPOINT", master_aggregator_endpoint); + } let args = vec![ "--db-directory", aggregator_config.pool_node.db_path.to_str().unwrap(), @@ -116,24 +133,52 @@ impl Aggregator { env, &args, )?; + let chain_observer = Arc::new(PallasChainObserver::new( + &aggregator_config.pool_node.socket_path, + CardanoNetwork::DevNet(DEVNET_MAGIC_ID), + )); Ok(Self { + index: aggregator_config.index, + name_suffix: aggregator_config.name.to_string(), server_port: aggregator_config.server_port, db_directory: aggregator_config.pool_node.db_path.clone(), - command, - process: None, + mithril_run_interval: aggregator_config.mithril_run_interval, + command: Arc::new(RwLock::new(command)), + process: RwLock::new(None), + chain_observer, }) } + pub fn name_suffix(index: usize) -> String { + format!("{}", index + 1) + } + pub fn copy_configuration(other: &Aggregator) -> Self { Self { + index: other.index, + name_suffix: other.name_suffix.clone(), server_port: other.server_port, db_directory: other.db_directory.clone(), + mithril_run_interval: other.mithril_run_interval, command: other.command.clone(), - process: None, + process: RwLock::new(None), + chain_observer: other.chain_observer.clone(), } } + pub fn is_first(&self) -> bool { + self.index == 0 + } + + pub fn index(&self) -> usize { + self.index + } + + pub fn name(&self) -> String { + format!("mithril-aggregator-{}", self.name_suffix) + } + pub fn endpoint(&self) -> String { format!("http://localhost:{}/aggregator", &self.server_port) } @@ -142,16 +187,27 @@ impl Aggregator { &self.db_directory } - pub fn serve(&mut self) -> StdResult<()> { - self.process = Some(self.command.start(&["serve".to_string()])?); + pub fn mithril_run_interval(&self) -> u32 { + self.mithril_run_interval + } + + pub fn chain_observer(&self) -> Arc { + self.chain_observer.clone() + } + + pub async fn serve(&self) -> StdResult<()> { + let mut command = self.command.write().await; + command.set_log_name(&format!("mithril-aggregator-{}", self.name_suffix)); + let mut process = self.process.write().await; + *process = Some(command.start(&["serve".to_string()])?); Ok(()) } - pub async fn bootstrap_genesis(&mut self) -> StdResult<()> { + pub async fn bootstrap_genesis(&self) -> StdResult<()> { // Clone the command so we can alter it without affecting the original - let mut command = self.command.clone(); - let process_name = "mithril-aggregator-genesis-bootstrap"; - command.set_log_name(process_name); + let mut command = self.command.write().await; + let command_name = &format!("mithril-aggregator-genesis-bootstrap-{}", self.name_suffix,); + command.set_log_name(command_name); let exit_status = command .start(&["genesis".to_string(), "bootstrap".to_string()])? @@ -162,7 +218,7 @@ impl Aggregator { if exit_status.success() { Ok(()) } else { - self.command.tail_logs(Some(process_name), 40).await?; + command.tail_logs(Some(command_name), 40).await?; Err(match exit_status.code() { Some(c) => { @@ -172,23 +228,25 @@ impl Aggregator { anyhow!("`mithril-aggregator genesis bootstrap` was terminated with a signal") } }) + .map_err(|e| anyhow!(RetryableDevnetError(e.to_string()))) } } - pub async fn stop(&mut self) -> StdResult<()> { - if let Some(process) = self.process.as_mut() { - info!("Stopping aggregator"); - process + pub async fn stop(&self) -> StdResult<()> { + let mut process = self.process.write().await; + if let Some(mut process_running) = process.take() { + info!("Stopping {}", self.name()); + process_running .kill() .await .with_context(|| "Could not kill aggregator")?; - self.process = None; + *process = None; } Ok(()) } pub async fn era_generate_tx_datum( - &mut self, + &self, target_path: &Path, mithril_era: &str, next_era_activation_epoch: entities::Epoch, @@ -219,8 +277,8 @@ impl Aggregator { args.push(next_era_epoch.to_string()); } - let exit_status = self - .command + let mut command = self.command.write().await; + let exit_status = command .start(&args)? .wait() .await @@ -242,45 +300,53 @@ impl Aggregator { } } - pub fn set_protocol_parameters(&mut self, protocol_parameters: &entities::ProtocolParameters) { - self.command.set_env_var( + pub async fn set_protocol_parameters( + &self, + protocol_parameters: &entities::ProtocolParameters, + ) { + let mut command = self.command.write().await; + command.set_env_var( "PROTOCOL_PARAMETERS__K", &format!("{}", protocol_parameters.k), ); - self.command.set_env_var( + command.set_env_var( "PROTOCOL_PARAMETERS__M", &format!("{}", protocol_parameters.m), ); - self.command.set_env_var( + command.set_env_var( "PROTOCOL_PARAMETERS__PHI_F", &format!("{}", protocol_parameters.phi_f), ); } - pub fn set_mock_cardano_cli_file_path( - &mut self, + pub async fn set_mock_cardano_cli_file_path( + &self, stake_distribution_file: &Path, epoch_file_path: &Path, ) { - self.command.set_env_var( + let mut command = self.command.write().await; + command.set_env_var( "MOCK_STAKE_DISTRIBUTION_FILE", stake_distribution_file.to_str().unwrap(), ); - self.command - .set_env_var("MOCK_EPOCH_FILE", epoch_file_path.to_str().unwrap()); + command.set_env_var("MOCK_EPOCH_FILE", epoch_file_path.to_str().unwrap()); } /// Change the run interval of the aggregator state machine (default: 400ms) - pub fn change_run_interval(&mut self, interval: Duration) { - self.command - .set_env_var("RUN_INTERVAL", &format!("{}", interval.as_millis())) + pub async fn change_run_interval(&self, interval: Duration) { + let mut command = self.command.write().await; + command.set_env_var("RUN_INTERVAL", &format!("{}", interval.as_millis())) } pub async fn tail_logs(&self, number_of_line: u64) -> StdResult<()> { - self.command.tail_logs(None, number_of_line).await + let command = self.command.write().await; + command.tail_logs(Some(&self.name()), number_of_line).await } pub async fn last_error_in_logs(&self, number_of_error: u64) -> StdResult<()> { - self.command.last_error_in_logs(None, number_of_error).await + let command = self.command.write().await; + command + .last_error_in_logs(Some(&self.name()), number_of_error) + .await } } diff --git a/mithril-test-lab/mithril-end-to-end/src/mithril/infrastructure.rs b/mithril-test-lab/mithril-end-to-end/src/mithril/infrastructure.rs index b2f6a468f5c..791d7792fcb 100644 --- a/mithril-test-lab/mithril-end-to-end/src/mithril/infrastructure.rs +++ b/mithril-test-lab/mithril-end-to-end/src/mithril/infrastructure.rs @@ -1,24 +1,26 @@ -use anyhow::Context; +use crate::mithril::relay_signer::RelaySignerConfiguration; +use crate::{ + assertions, Aggregator, AggregatorConfig, Client, Devnet, PoolNode, RelayAggregator, + RelayPassive, RelaySigner, Signer, DEVNET_MAGIC_ID, +}; use mithril_common::chain_observer::{ChainObserver, PallasChainObserver}; use mithril_common::entities::{Epoch, PartyId, ProtocolParameters}; use mithril_common::{CardanoNetwork, StdResult}; use slog_scope::info; -use std::borrow::BorrowMut; use std::fs; use std::path::PathBuf; use std::sync::Arc; - -use crate::{ - assertions, Aggregator, AggregatorConfig, Client, Devnet, PoolNode, RelayAggregator, - RelayPassive, RelaySigner, Signer, DEVNET_MAGIC_ID, -}; +use tokio::sync::RwLock; use super::signer::SignerConfig; pub struct MithrilInfrastructureConfig { + pub number_of_aggregators: u8, + pub number_of_signers: u8, pub server_port: u64, pub devnet: Devnet, pub work_dir: PathBuf, + pub store_dir: PathBuf, pub artifacts_dir: PathBuf, pub bin_dir: PathBuf, pub cardano_node_version: String, @@ -27,23 +29,60 @@ pub struct MithrilInfrastructureConfig { pub mithril_era_reader_adapter: String, pub signed_entity_types: Vec, pub run_only_mode: bool, - pub use_p2p_network_mode: bool, + pub use_relays: bool, + pub relay_signer_registration_mode: String, + pub relay_signature_registration_mode: String, pub use_p2p_passive_relays: bool, pub use_era_specific_work_dir: bool, } +impl MithrilInfrastructureConfig { + pub fn has_master_slave_signer_registration(&self) -> bool { + if &self.relay_signer_registration_mode == "passthrough" { + self.number_of_aggregators > 1 + } else { + false + } + } + + #[cfg(test)] + pub fn dummy() -> Self { + Self { + number_of_aggregators: 1, + number_of_signers: 1, + server_port: 8080, + devnet: Devnet::default(), + work_dir: PathBuf::from("/tmp/work"), + store_dir: PathBuf::from("/tmp/store"), + artifacts_dir: PathBuf::from("/tmp/artifacts"), + bin_dir: PathBuf::from("/tmp/bin"), + cardano_node_version: "1.0.0".to_string(), + mithril_run_interval: 10, + mithril_era: "era1".to_string(), + mithril_era_reader_adapter: "adapter1".to_string(), + signed_entity_types: vec!["type1".to_string()], + run_only_mode: false, + use_relays: false, + relay_signer_registration_mode: "passthrough".to_string(), + relay_signature_registration_mode: "passthrough".to_string(), + use_p2p_passive_relays: false, + use_era_specific_work_dir: false, + } + } +} + pub struct MithrilInfrastructure { artifacts_dir: PathBuf, bin_dir: PathBuf, devnet: Devnet, - aggregator: Aggregator, + aggregators: Vec, signers: Vec, relay_aggregators: Vec, relay_signers: Vec, relay_passives: Vec, cardano_chain_observer: Arc, run_only_mode: bool, - current_era: String, + current_era: RwLock, era_reader_adapter: String, use_era_specific_work_dir: bool, } @@ -53,29 +92,44 @@ impl MithrilInfrastructure { let chain_observer_type = "pallas"; config.devnet.run().await?; let devnet_topology = config.devnet.topology(); - // Clap check that we always have at least 2 pools, no need to be defensive here - let aggregator_cardano_node = &devnet_topology.pool_nodes[0]; - let signer_cardano_nodes = &devnet_topology.pool_nodes[1..]; + let number_of_aggregators = config.number_of_aggregators as usize; + let number_of_signers = config.number_of_signers as usize; + let aggregator_cardano_nodes = &devnet_topology.pool_nodes[0..number_of_aggregators]; + let signer_cardano_nodes = &devnet_topology.pool_nodes + [number_of_aggregators..number_of_aggregators + number_of_signers]; let signer_party_ids = signer_cardano_nodes .iter() .map(|s| s.party_id()) .collect::>>()?; + let relay_signer_registration_mode = &config.relay_signer_registration_mode; + let relay_signature_registration_mode = &config.relay_signature_registration_mode; - let aggregator = - Self::start_aggregator(config, aggregator_cardano_node, chain_observer_type).await?; + let aggregators = + Self::start_aggregators(config, aggregator_cardano_nodes, chain_observer_type).await?; + let aggregator_endpoints = aggregators + .iter() + .map(|aggregator| aggregator.endpoint()) + .collect::>(); + let master_aggregator_endpoint = aggregator_endpoints[0].to_owned(); - let (relay_aggregators, relay_signers, relay_passives) = - Self::start_relays(config, aggregator.endpoint(), &signer_party_ids)?; + let (relay_aggregators, relay_signers, relay_passives) = Self::start_relays( + config, + &aggregator_endpoints, + &signer_party_ids, + relay_signer_registration_mode.to_owned(), + relay_signature_registration_mode.to_owned(), + )?; let signers = Self::start_signers( config, - aggregator.endpoint(), + master_aggregator_endpoint, signer_cardano_nodes, &relay_signers, - )?; + ) + .await?; let cardano_chain_observer = Arc::new(PallasChainObserver::new( - &aggregator_cardano_node.socket_path, + &aggregator_cardano_nodes[0].socket_path, CardanoNetwork::DevNet(DEVNET_MAGIC_ID), )); @@ -83,21 +137,21 @@ impl MithrilInfrastructure { bin_dir: config.bin_dir.to_path_buf(), artifacts_dir: config.artifacts_dir.to_path_buf(), devnet: config.devnet.clone(), - aggregator, + aggregators, signers, relay_aggregators, relay_signers, relay_passives, cardano_chain_observer, run_only_mode: config.run_only_mode, - current_era: config.mithril_era.clone(), + current_era: RwLock::new(config.mithril_era.clone()), era_reader_adapter: config.mithril_era_reader_adapter.clone(), use_era_specific_work_dir: config.use_era_specific_work_dir, }) } async fn register_startup_era( - aggregator: &mut Aggregator, + aggregator: &Aggregator, config: &MithrilInfrastructureConfig, ) -> StdResult<()> { let era_epoch = Epoch(0); @@ -114,143 +168,171 @@ impl MithrilInfrastructure { Ok(()) } - pub async fn register_switch_to_next_era(&mut self, next_era: &str) -> StdResult<()> { + pub async fn register_switch_to_next_era(&self, next_era: &str) -> StdResult<()> { let next_era_epoch = self - .chain_observer() + .cardano_chain_observer .get_current_epoch() .await? .unwrap_or_default() + 1; if self.era_reader_adapter == "cardano-chain" { - assertions::register_era_marker( - &mut self.aggregator, - &self.devnet, - next_era, - next_era_epoch, - ) - .await?; + let devnet = self.devnet.clone(); + assertions::register_era_marker(self.aggregator(0), &devnet, next_era, next_era_epoch) + .await?; } - self.current_era = next_era.to_owned(); + let mut current_era = self.current_era.write().await; + *current_era = next_era.to_owned(); Ok(()) } - async fn start_aggregator( + async fn start_aggregators( config: &MithrilInfrastructureConfig, - pool_node: &PoolNode, + pool_nodes: &[PoolNode], chain_observer_type: &str, - ) -> StdResult { - let aggregator_artifacts_directory = config.artifacts_dir.join("mithril-aggregator"); - if !aggregator_artifacts_directory.exists() { - fs::create_dir_all(&aggregator_artifacts_directory).with_context(|| { - format!( - "Could not create artifacts directory '{}'", - aggregator_artifacts_directory.display() - ) + ) -> StdResult> { + let mut aggregators = vec![]; + let mut master_aggregator_endpoint: Option = None; + for (index, pool_node) in pool_nodes.iter().enumerate() { + let aggregator_name = Aggregator::name_suffix(index); + let aggregator_artifacts_dir = config + .artifacts_dir + .join(format!("mithril-aggregator-{aggregator_name}")); + let aggregator_store_dir = config + .store_dir + .join(format!("aggregator-{aggregator_name}")); + let aggregator = Aggregator::new(&AggregatorConfig { + index, + name: &aggregator_name, + server_port: config.server_port + index as u64, + pool_node, + cardano_cli_path: &config.devnet.cardano_cli_path(), + work_dir: &config.work_dir, + store_dir: &aggregator_store_dir, + artifacts_dir: &aggregator_artifacts_dir, + bin_dir: &config.bin_dir, + cardano_node_version: &config.cardano_node_version, + mithril_run_interval: config.mithril_run_interval, + mithril_era: &config.mithril_era, + mithril_era_reader_adapter: &config.mithril_era_reader_adapter, + mithril_era_marker_address: &config.devnet.mithril_era_marker_address()?, + signed_entity_types: &config.signed_entity_types, + chain_observer_type, + master_aggregator_endpoint: &master_aggregator_endpoint.clone(), })?; + + aggregator + .set_protocol_parameters(&ProtocolParameters { + k: 75, + m: 105, + phi_f: 0.95, + }) + .await; + + if master_aggregator_endpoint.is_none() && config.has_master_slave_signer_registration() + { + master_aggregator_endpoint = Some(aggregator.endpoint()); + } + + aggregators.push(aggregator); + } + + Self::register_startup_era(&aggregators[0], config).await?; + + for aggregator in &aggregators { + aggregator.serve().await?; } - let mut aggregator = Aggregator::new(&AggregatorConfig { - server_port: config.server_port, - pool_node, - cardano_cli_path: &config.devnet.cardano_cli_path(), - work_dir: &config.work_dir, - artifacts_dir: &aggregator_artifacts_directory, - bin_dir: &config.bin_dir, - cardano_node_version: &config.cardano_node_version, - mithril_run_interval: config.mithril_run_interval, - mithril_era: &config.mithril_era, - mithril_era_reader_adapter: &config.mithril_era_reader_adapter, - mithril_era_marker_address: &config.devnet.mithril_era_marker_address()?, - signed_entity_types: &config.signed_entity_types, - chain_observer_type, - })?; - - aggregator.set_protocol_parameters(&ProtocolParameters { - k: 75, - m: 105, - phi_f: 0.95, - }); - - Self::register_startup_era(&mut aggregator, config).await?; - - aggregator.serve()?; - - Ok(aggregator) + Ok(aggregators) } fn start_relays( config: &MithrilInfrastructureConfig, - aggregator_endpoint: String, + aggregator_endpoints: &[String], signers_party_ids: &[PartyId], + relay_signer_registration_mode: String, + relay_signature_registration_mode: String, ) -> StdResult<(Vec, Vec, Vec)> { - if !config.use_p2p_network_mode { + if !config.use_relays { return Ok((vec![], vec![], vec![])); } let mut relay_aggregators: Vec = vec![]; let mut relay_signers: Vec = vec![]; let mut relay_passives: Vec = vec![]; + let master_aggregator_endpoint = &aggregator_endpoints[0]; info!("Starting the Mithril infrastructure in P2P mode (experimental)"); - let mut relay_aggregator = RelayAggregator::new( - config.server_port + 100, - &aggregator_endpoint, - &config.work_dir, - &config.bin_dir, - )?; - relay_aggregator.start()?; - - let mut relay_passive_id = 1; - if config.use_p2p_passive_relays { - let mut relay_passive_aggregator = RelayPassive::new( - config.server_port + 200, - relay_aggregator.peer_addr().to_owned(), - format!("{relay_passive_id}"), + let mut bootstrap_peer_addr = None; + for (index, aggregator_endpoint) in aggregator_endpoints.iter().enumerate() { + let mut relay_aggregator = RelayAggregator::new( + Aggregator::name_suffix(index), + config.server_port + index as u64 + 100, + bootstrap_peer_addr.clone(), + aggregator_endpoint, &config.work_dir, &config.bin_dir, )?; - relay_passive_aggregator.start()?; - relay_passives.push(relay_passive_aggregator); + if bootstrap_peer_addr.is_none() { + bootstrap_peer_addr = Some(relay_aggregator.peer_addr().to_owned()); + } + relay_aggregator.start()?; + relay_aggregators.push(relay_aggregator); } for (index, party_id) in signers_party_ids.iter().enumerate() { - let mut relay_signer = RelaySigner::new( - config.server_port + index as u64 + 300, - config.server_port + index as u64 + 400, - relay_aggregator.peer_addr().to_owned(), - &aggregator_endpoint, - party_id.clone(), - &config.work_dir, - &config.bin_dir, - )?; + let mut relay_signer = RelaySigner::new(&RelaySignerConfiguration { + listen_port: config.server_port + index as u64 + 200, + server_port: config.server_port + index as u64 + 300, + dial_to: bootstrap_peer_addr.clone(), + relay_signer_registration_mode: relay_signer_registration_mode.clone(), + relay_signature_registration_mode: relay_signature_registration_mode.clone(), + aggregator_endpoint: master_aggregator_endpoint, + party_id: party_id.clone(), + work_dir: &config.work_dir, + bin_dir: &config.bin_dir, + })?; relay_signer.start()?; - if config.use_p2p_passive_relays { + relay_signers.push(relay_signer); + } + + if config.use_p2p_passive_relays { + let mut relay_passive_id = 1; + for (index, _aggregator_endpoint) in aggregator_endpoints.iter().enumerate() { + let mut relay_passive_aggregator = RelayPassive::new( + config.server_port + index as u64 + 400, + bootstrap_peer_addr.clone(), + format!("{relay_passive_id}"), + &config.work_dir, + &config.bin_dir, + )?; + relay_passive_aggregator.start()?; + relay_passives.push(relay_passive_aggregator); relay_passive_id += 1; + } + + for (index, _party_id) in signers_party_ids.iter().enumerate() { let mut relay_passive_signer = RelayPassive::new( config.server_port + index as u64 + 500, - relay_signer.peer_addr().to_owned(), + bootstrap_peer_addr.clone(), format!("{relay_passive_id}"), &config.work_dir, &config.bin_dir, )?; relay_passive_signer.start()?; relay_passives.push(relay_passive_signer); + relay_passive_id += 1; } - - relay_signers.push(relay_signer); } - relay_aggregators.push(relay_aggregator); - Ok((relay_aggregators, relay_signers, relay_passives)) } - fn start_signers( + async fn start_signers( config: &MithrilInfrastructureConfig, - aggregator_endpoint: String, + master_aggregator_endpoint: String, pool_nodes: &[PoolNode], relay_signers: &[RelaySigner], ) -> StdResult> { @@ -261,18 +343,21 @@ impl MithrilInfrastructure { // Or 100% of signers otherwise let enable_certification = index % 2 == 0 || cfg!(not(feature = "allow_skip_signer_certification")); - let aggregator_endpoint = if config.use_p2p_network_mode { + let aggregator_endpoint = if config.use_relays { relay_signers[index].endpoint() } else { - aggregator_endpoint.clone() + master_aggregator_endpoint.clone() }; - let mut signer = Signer::new(&SignerConfig { + let signer = Signer::new(&SignerConfig { signer_number: index + 1, aggregator_endpoint, pool_node, cardano_cli_path: &config.devnet.cardano_cli_path(), work_dir: &config.work_dir, + store_dir: &config + .store_dir + .join(format!("signer-{}", pool_node.party_id()?)), bin_dir: &config.bin_dir, mithril_run_interval: config.mithril_run_interval, mithril_era: &config.mithril_era, @@ -280,7 +365,7 @@ impl MithrilInfrastructure { mithril_era_marker_address: &config.devnet.mithril_era_marker_address()?, enable_certification, })?; - signer.start()?; + signer.start().await?; signers.push(signer); } @@ -288,14 +373,16 @@ impl MithrilInfrastructure { Ok(signers) } - pub async fn stop_nodes(&mut self) -> StdResult<()> { - // Note: The aggregator should be stopped *last* since signers depends on it + pub async fn stop_nodes(&self) -> StdResult<()> { + // Note: The aggregators should be stopped *last* since signers depends on it info!("Stopping Mithril infrastructure"); - for signer in self.signers.as_mut_slice() { + for signer in &self.signers { signer.stop().await?; } - self.aggregator.stop().await?; + for aggregator in &self.aggregators { + aggregator.stop().await?; + } Ok(()) } @@ -304,22 +391,18 @@ impl MithrilInfrastructure { &self.devnet } - pub fn aggregator(&self) -> &Aggregator { - &self.aggregator + pub fn aggregators(&self) -> &[Aggregator] { + &self.aggregators } - pub fn aggregator_mut(&mut self) -> &mut Aggregator { - self.aggregator.borrow_mut() + pub fn aggregator(&self, index: usize) -> &Aggregator { + &self.aggregators[index] } pub fn signers(&self) -> &[Signer] { &self.signers } - pub fn signers_mut(&mut self) -> &mut [Signer] { - self.signers.as_mut_slice() - } - pub fn relay_aggregators(&self) -> &[RelayAggregator] { &self.relay_aggregators } @@ -336,11 +419,14 @@ impl MithrilInfrastructure { self.cardano_chain_observer.clone() } - pub fn build_client(&self) -> StdResult { + pub async fn build_client(&self, aggregator: &Aggregator) -> StdResult { let work_dir = { - let mut artifacts_dir = self.artifacts_dir.join("mithril-client"); + let mut artifacts_dir = self + .artifacts_dir + .join(format!("mithril-client-aggregator-{}", aggregator.name())); if self.use_era_specific_work_dir { - artifacts_dir = artifacts_dir.join(format!("era.{}", self.current_era)); + let current_era = self.current_era.read().await; + artifacts_dir = artifacts_dir.join(format!("era.{}", current_era)); } if !artifacts_dir.exists() { fs::create_dir_all(&artifacts_dir)?; @@ -349,7 +435,7 @@ impl MithrilInfrastructure { artifacts_dir }; - Client::new(self.aggregator.endpoint(), &work_dir, &self.bin_dir) + Client::new(aggregator.endpoint(), &work_dir, &self.bin_dir) } pub fn run_only_mode(&self) -> bool { @@ -357,7 +443,9 @@ impl MithrilInfrastructure { } pub async fn tail_logs(&self, number_of_line: u64) -> StdResult<()> { - self.aggregator().tail_logs(number_of_line).await?; + for aggregator in self.aggregators() { + aggregator.tail_logs(number_of_line).await?; + } for signer in self.signers() { signer.tail_logs(number_of_line).await?; } @@ -375,10 +463,50 @@ impl MithrilInfrastructure { } pub async fn last_error_in_logs(&self, number_of_error: u64) -> StdResult<()> { - self.aggregator() - .last_error_in_logs(number_of_error) - .await?; + for aggregator in self.aggregators() { + aggregator.last_error_in_logs(number_of_error).await?; + } Ok(()) } } + +#[cfg(test)] +mod tests { + use crate::MithrilInfrastructureConfig; + + #[test] + fn has_master_slave_signer_registration_succeeds() { + let config = MithrilInfrastructureConfig { + relay_signer_registration_mode: "passthrough".to_string(), + number_of_aggregators: 1, + ..MithrilInfrastructureConfig::dummy() + }; + + assert!(!config.has_master_slave_signer_registration()); + + let config = MithrilInfrastructureConfig { + relay_signer_registration_mode: "passthrough".to_string(), + number_of_aggregators: 2, + ..MithrilInfrastructureConfig::dummy() + }; + + assert!(config.has_master_slave_signer_registration()); + + let config = MithrilInfrastructureConfig { + relay_signer_registration_mode: "p2p".to_string(), + number_of_aggregators: 1, + ..MithrilInfrastructureConfig::dummy() + }; + + assert!(!config.has_master_slave_signer_registration()); + + let config = MithrilInfrastructureConfig { + relay_signer_registration_mode: "p2p".to_string(), + number_of_aggregators: 2, + ..MithrilInfrastructureConfig::dummy() + }; + + assert!(!config.has_master_slave_signer_registration()); + } +} diff --git a/mithril-test-lab/mithril-end-to-end/src/mithril/relay_aggregator.rs b/mithril-test-lab/mithril-end-to-end/src/mithril/relay_aggregator.rs index dcea189e2ba..f3b4bc2cf9c 100644 --- a/mithril-test-lab/mithril-end-to-end/src/mithril/relay_aggregator.rs +++ b/mithril-test-lab/mithril-end-to-end/src/mithril/relay_aggregator.rs @@ -6,6 +6,7 @@ use tokio::process::Child; #[derive(Debug)] pub struct RelayAggregator { + name_suffix: String, listen_port: u64, command: MithrilCommand, process: Option, @@ -13,22 +14,28 @@ pub struct RelayAggregator { impl RelayAggregator { pub fn new( + name: String, listen_port: u64, + dial_to: Option, aggregator_endpoint: &str, work_dir: &Path, bin_dir: &Path, ) -> StdResult { let listen_port_str = format!("{listen_port}"); - let env = HashMap::from([ + let mut env = HashMap::from([ ("LISTEN_PORT", listen_port_str.as_str()), ("AGGREGATOR_ENDPOINT", aggregator_endpoint), ]); + if let Some(dial_to) = &dial_to { + env.insert("DIAL_TO", dial_to); + } let args = vec!["-vvv", "aggregator"]; let mut command = MithrilCommand::new("mithril-relay", work_dir, bin_dir, env, &args)?; - command.set_log_name("mithril-relay-aggregator"); + command.set_log_name(&format!("mithril-relay-aggregator-{}", name,)); Ok(Self { + name_suffix: name, listen_port, command, process: None, @@ -39,6 +46,10 @@ impl RelayAggregator { format!("/ip4/127.0.0.1/tcp/{}", self.listen_port) } + pub fn name_suffix(&self) -> String { + self.name_suffix.clone() + } + pub fn start(&mut self) -> StdResult<()> { self.process = Some(self.command.start(&[])?); Ok(()) @@ -46,7 +57,10 @@ impl RelayAggregator { pub async fn tail_logs(&self, number_of_line: u64) -> StdResult<()> { self.command - .tail_logs(Some("mithril-relay-aggregator"), number_of_line) + .tail_logs( + Some(&format!("mithril-relay-aggregator-{}", self.name_suffix())), + number_of_line, + ) .await } } diff --git a/mithril-test-lab/mithril-end-to-end/src/mithril/relay_passive.rs b/mithril-test-lab/mithril-end-to-end/src/mithril/relay_passive.rs index e5f096cf29a..bd019d50a76 100644 --- a/mithril-test-lab/mithril-end-to-end/src/mithril/relay_passive.rs +++ b/mithril-test-lab/mithril-end-to-end/src/mithril/relay_passive.rs @@ -15,16 +15,16 @@ pub struct RelayPassive { impl RelayPassive { pub fn new( listen_port: u64, - dial_to: String, + dial_to: Option, relay_id: String, work_dir: &Path, bin_dir: &Path, ) -> StdResult { let listen_port_str = format!("{listen_port}"); - let env = HashMap::from([ - ("LISTEN_PORT", listen_port_str.as_str()), - ("DIAL_TO", &dial_to), - ]); + let mut env = HashMap::from([("LISTEN_PORT", listen_port_str.as_str())]); + if let Some(dial_to) = &dial_to { + env.insert("DIAL_TO", dial_to); + } let args = vec!["-vvv", "passive"]; let mut command = MithrilCommand::new("mithril-relay", work_dir, bin_dir, env, &args)?; diff --git a/mithril-test-lab/mithril-end-to-end/src/mithril/relay_signer.rs b/mithril-test-lab/mithril-end-to-end/src/mithril/relay_signer.rs index c4c22f77373..4f87f531fba 100644 --- a/mithril-test-lab/mithril-end-to-end/src/mithril/relay_signer.rs +++ b/mithril-test-lab/mithril-end-to-end/src/mithril/relay_signer.rs @@ -5,6 +5,18 @@ use std::collections::HashMap; use std::path::Path; use tokio::process::Child; +pub struct RelaySignerConfiguration<'a> { + pub listen_port: u64, + pub server_port: u64, + pub dial_to: Option, + pub relay_signer_registration_mode: String, + pub relay_signature_registration_mode: String, + pub aggregator_endpoint: &'a str, + pub party_id: PartyId, + pub work_dir: &'a Path, + pub bin_dir: &'a Path, +} + #[derive(Debug)] pub struct RelaySigner { listen_port: u64, @@ -15,33 +27,45 @@ pub struct RelaySigner { } impl RelaySigner { - pub fn new( - listen_port: u64, - server_port: u64, - dial_to: String, - aggregator_endpoint: &str, - party_id: PartyId, - work_dir: &Path, - bin_dir: &Path, - ) -> StdResult { - let listen_port_str = format!("{listen_port}"); - let server_port_str = format!("{server_port}"); - let env = HashMap::from([ + pub fn new(configuration: &RelaySignerConfiguration) -> StdResult { + let listen_port_str = format!("{}", configuration.listen_port); + let server_port_str = format!("{}", configuration.server_port); + let relay_signer_registration_mode = + configuration.relay_signer_registration_mode.to_string(); + let relay_signature_registration_mode = + configuration.relay_signature_registration_mode.to_string(); + let mut env = HashMap::from([ ("LISTEN_PORT", listen_port_str.as_str()), ("SERVER_PORT", server_port_str.as_str()), - ("AGGREGATOR_ENDPOINT", aggregator_endpoint), - ("DIAL_TO", &dial_to), + ("AGGREGATOR_ENDPOINT", configuration.aggregator_endpoint), ("SIGNER_REPEATER_DELAY", "100"), + ( + "SIGNER_REGISTRATION_MODE", + relay_signer_registration_mode.as_str(), + ), + ( + "SIGNATURE_REGISTRATION_MODE", + relay_signature_registration_mode.as_str(), + ), ]); + if let Some(dial_to) = &configuration.dial_to { + env.insert("DIAL_TO", dial_to); + } let args = vec!["-vvv", "signer"]; - let mut command = MithrilCommand::new("mithril-relay", work_dir, bin_dir, env, &args)?; - command.set_log_name(format!("mithril-relay-signer-{party_id}").as_str()); + let mut command = MithrilCommand::new( + "mithril-relay", + configuration.work_dir, + configuration.bin_dir, + env, + &args, + )?; + command.set_log_name(format!("mithril-relay-signer-{}", configuration.party_id).as_str()); Ok(Self { - listen_port, - server_port, - party_id, + listen_port: configuration.listen_port, + server_port: configuration.server_port, + party_id: configuration.party_id.to_owned(), command, process: None, }) diff --git a/mithril-test-lab/mithril-end-to-end/src/mithril/signer.rs b/mithril-test-lab/mithril-end-to-end/src/mithril/signer.rs index b91dee9ed66..53a8ad79c75 100644 --- a/mithril-test-lab/mithril-end-to-end/src/mithril/signer.rs +++ b/mithril-test-lab/mithril-end-to-end/src/mithril/signer.rs @@ -7,7 +7,9 @@ use mithril_common::StdResult; use slog_scope::info; use std::collections::HashMap; use std::path::Path; +use std::sync::Arc; use tokio::process::Child; +use tokio::sync::RwLock; #[derive(Debug)] pub struct SignerConfig<'a> { @@ -16,6 +18,7 @@ pub struct SignerConfig<'a> { pub pool_node: &'a PoolNode, pub cardano_cli_path: &'a Path, pub work_dir: &'a Path, + pub store_dir: &'a Path, pub bin_dir: &'a Path, pub mithril_run_interval: u32, pub mithril_era: &'a str, @@ -28,15 +31,14 @@ pub struct SignerConfig<'a> { pub struct Signer { name: String, party_id: PartyId, - command: MithrilCommand, - process: Option, + command: Arc>, + process: RwLock>, } impl Signer { pub fn new(signer_config: &SignerConfig) -> StdResult { let party_id = signer_config.pool_node.party_id()?; let magic_id = DEVNET_MAGIC_ID.to_string(); - let data_stores_path = format!("./stores/signer-{party_id}"); let era_reader_adapter_params = if signer_config.mithril_era_reader_adapter == "cardano-chain" { format!( @@ -58,7 +60,10 @@ impl Signer { "DB_DIRECTORY", signer_config.pool_node.db_path.to_str().unwrap(), ), - ("DATA_STORES_DIRECTORY", &data_stores_path), + ( + "DATA_STORES_DIRECTORY", + signer_config.store_dir.to_str().unwrap(), + ), ("STORE_RETENTION_LIMIT", "10"), ("NETWORK_MAGIC", &magic_id), ( @@ -112,30 +117,35 @@ impl Signer { Ok(Self { name, party_id, - command, - process: None, + command: Arc::new(RwLock::new(command)), + process: RwLock::new(None), }) } - pub fn start(&mut self) -> StdResult<()> { - self.process = Some(self.command.start(&[])?); + pub async fn start(&self) -> StdResult<()> { + let mut command = self.command.write().await; + let mut process = self.process.write().await; + *process = Some(command.start(&[])?); Ok(()) } - pub async fn stop(&mut self) -> StdResult<()> { - if let Some(process) = self.process.as_mut() { + pub async fn stop(&self) -> StdResult<()> { + let mut process_option = self.process.write().await; + if let Some(process) = process_option.as_mut() { let name = self.name.as_str(); info!("Stopping {name}"); process .kill() .await .with_context(|| "Could not kill signer")?; - self.process = None; + *process_option = None; } Ok(()) } pub async fn tail_logs(&self, number_of_line: u64) -> StdResult<()> { self.command + .read() + .await .tail_logs( Some(format!("mithril-signer-{}", self.party_id).as_str()), number_of_line, diff --git a/mithril-test-lab/mithril-end-to-end/src/run_only.rs b/mithril-test-lab/mithril-end-to-end/src/run_only.rs index 3fbd20b19d1..2103710139d 100644 --- a/mithril-test-lab/mithril-end-to-end/src/run_only.rs +++ b/mithril-test-lab/mithril-end-to-end/src/run_only.rs @@ -1,41 +1,70 @@ -use crate::assertions; -use crate::MithrilInfrastructure; +use std::sync::Arc; + +use tokio::task::JoinSet; + use mithril_common::StdResult; -pub struct RunOnly<'a> { - pub infrastructure: &'a mut MithrilInfrastructure, +use crate::{assertions, Aggregator, MithrilInfrastructure}; + +pub struct RunOnly { + pub infrastructure: Arc, } -impl<'a> RunOnly<'a> { - pub fn new(infrastructure: &'a mut MithrilInfrastructure) -> Self { +impl RunOnly { + pub fn new(infrastructure: Arc) -> Self { Self { infrastructure } } - pub async fn start(&mut self) -> StdResult<()> { - let aggregator_endpoint = self.infrastructure.aggregator().endpoint(); - assertions::wait_for_enough_immutable(self.infrastructure.aggregator().db_directory()) - .await?; - let start_epoch = self - .infrastructure - .chain_observer() + pub async fn run(self) -> StdResult<()> { + let run_only = Arc::new(self); + let mut join_set = JoinSet::new(); + + for index in 0..run_only.infrastructure.aggregators().len() { + let run_only_clone = run_only.clone(); + join_set.spawn(async move { + let infrastructure = &run_only_clone.infrastructure; + + run_only_clone + .bootstrap_aggregator(infrastructure.aggregator(index), infrastructure) + .await + }); + } + + while let Some(res) = join_set.join_next().await { + res??; + } + + Ok(()) + } + + pub async fn bootstrap_aggregator( + &self, + aggregator: &Aggregator, + infrastructure: &MithrilInfrastructure, + ) -> StdResult<()> { + assertions::wait_for_enough_immutable(aggregator).await?; + let chain_observer = aggregator.chain_observer(); + let start_epoch = chain_observer .get_current_epoch() .await? .unwrap_or_default(); // Wait 3 epochs after start epoch for the aggregator to be able to bootstrap a genesis certificate let target_epoch = start_epoch + 3; - assertions::wait_for_target_epoch( - self.infrastructure.chain_observer(), + assertions::wait_for_aggregator_at_target_epoch( + aggregator, target_epoch, "minimal epoch for the aggregator to be able to bootstrap genesis certificate" .to_string(), ) .await?; - assertions::bootstrap_genesis_certificate(self.infrastructure.aggregator_mut()).await?; - assertions::wait_for_epoch_settings(&aggregator_endpoint).await?; + assertions::bootstrap_genesis_certificate(aggregator).await?; + assertions::wait_for_epoch_settings(aggregator).await?; - // Transfer some funds on the devnet to have some Cardano transactions to sign - assertions::transfer_funds(self.infrastructure.devnet()).await?; + if aggregator.is_first() { + // Transfer some funds on the devnet to have some Cardano transactions to sign + assertions::transfer_funds(infrastructure.devnet()).await?; + } Ok(()) } diff --git a/mithril-test-lab/mithril-end-to-end/src/stress_test/aggregator_helpers.rs b/mithril-test-lab/mithril-end-to-end/src/stress_test/aggregator_helpers.rs index cf25ac1c9dd..9e23076443c 100644 --- a/mithril-test-lab/mithril-end-to-end/src/stress_test/aggregator_helpers.rs +++ b/mithril-test-lab/mithril-end-to-end/src/stress_test/aggregator_helpers.rs @@ -19,10 +19,13 @@ pub async fn bootstrap_aggregator( let chain_observer_type = "cardano-cli"; let mut aggregator = Aggregator::new(&AggregatorConfig { + index: 0, + name: "genesis", server_port: args.server_port as u64, pool_node: &args.pool_node, cardano_cli_path: &args.cardano_cli_path, work_dir: &args.work_dir, + store_dir: &args.work_dir, artifacts_dir: &args.work_dir, bin_dir: &args.bin_dir, cardano_node_version: "1.2.3", @@ -32,6 +35,7 @@ pub async fn bootstrap_aggregator( mithril_era_reader_adapter: "dummy", signed_entity_types: &signed_entity_types, chain_observer_type, + master_aggregator_endpoint: &None, }) .unwrap(); @@ -40,18 +44,24 @@ pub async fn bootstrap_aggregator( // Extremely large interval since, for the two following starts, only the http_server part // of the aggregator is relevant as we need to send signer registrations. - aggregator.change_run_interval(Duration::from_secs(20000)); - aggregator.set_mock_cardano_cli_file_path( - &args.mock_stake_distribution_file_path(), - &args.mock_epoch_file_path(), - ); - aggregator.set_protocol_parameters(&signers_fixture.protocol_parameters()); + aggregator + .change_run_interval(Duration::from_secs(20000)) + .await; + aggregator + .set_mock_cardano_cli_file_path( + &args.mock_stake_distribution_file_path(), + &args.mock_epoch_file_path(), + ) + .await; + aggregator + .set_protocol_parameters(&signers_fixture.protocol_parameters()) + .await; info!( ">> Starting the aggregator with a large run interval to call the http_server\ without being bothered by the state machine cycles" ); - aggregator.serve().unwrap(); + aggregator.serve().await.unwrap(); wait::for_aggregator_http_server_to_start(&aggregator, Duration::from_secs(10)).await?; restart_aggregator_and_move_one_epoch_forward(&mut aggregator, current_epoch, args).await?; @@ -95,7 +105,7 @@ pub async fn bootstrap_aggregator( { info!(">> Compute genesis certificate"); - let mut genesis_aggregator = Aggregator::copy_configuration(&aggregator); + let genesis_aggregator = Aggregator::copy_configuration(&aggregator); genesis_aggregator .bootstrap_genesis() .await @@ -103,8 +113,8 @@ pub async fn bootstrap_aggregator( } info!(">> Restart aggregator with a normal run interval"); - aggregator.change_run_interval(Duration::from_secs(3)); - aggregator.serve().unwrap(); + aggregator.change_run_interval(Duration::from_secs(3)).await; + aggregator.serve().await.unwrap(); wait::for_aggregator_http_server_to_start(&aggregator, Duration::from_secs(10)).await?; @@ -125,7 +135,7 @@ async fn restart_aggregator_and_move_one_epoch_forward( fake_chain::set_epoch(&args.mock_epoch_file_path(), *current_epoch); info!(">> Restarting the aggregator with a large run interval"); - aggregator.serve().unwrap(); + aggregator.serve().await.unwrap(); wait::for_aggregator_http_server_to_start(aggregator, Duration::from_secs(10)).await?; wait::for_epoch_settings_at_epoch(aggregator, Duration::from_secs(10), *current_epoch).await?;