diff --git a/installinator/src/dispatch.rs b/installinator/src/dispatch.rs index aff9cc256b0..469ebeaaab5 100644 --- a/installinator/src/dispatch.rs +++ b/installinator/src/dispatch.rs @@ -91,7 +91,7 @@ struct DebugDiscoverOpts { impl DebugDiscoverOpts { async fn exec(self, log: &slog::Logger) -> Result<()> { - let peers = FetchArtifactBackend::new( + let backend = FetchArtifactBackend::new( log, Box::new(HttpFetchBackend::new( &log, @@ -99,7 +99,7 @@ impl DebugDiscoverOpts { )), Duration::from_secs(10), ); - println!("discovered peers: {}", peers.display()); + println!("discovered peers: {}", backend.peers().display()); Ok(()) } } diff --git a/installinator/src/fetch.rs b/installinator/src/fetch.rs index 3901f38c990..7c040ae069f 100644 --- a/installinator/src/fetch.rs +++ b/installinator/src/fetch.rs @@ -15,7 +15,6 @@ use installinator_client::ClientError; use installinator_common::{ InstallinatorProgressMetadata, StepContext, StepProgress, }; -use itertools::Itertools; use tokio::{sync::mpsc, time::Instant}; use tufaceous_artifact::ArtifactHashId; use update_engine::events::ProgressUnits; @@ -23,7 +22,7 @@ use update_engine::events::ProgressUnits; use crate::{ artifact::ArtifactClient, errors::{ArtifactFetchError, DiscoverPeersError, HttpError}, - peers::PeerAddress, + peers::{PeerAddress, PeerAddresses}, }; /// A fetched artifact. @@ -55,7 +54,7 @@ impl FetchedArtifact { let mut attempt = 0; loop { attempt += 1; - let peers = match discover_fn().await { + let fetch_backend = match discover_fn().await { Ok(peers) => peers, Err(DiscoverPeersError::Retry(error)) => { slog::warn!( @@ -78,13 +77,14 @@ impl FetchedArtifact { } }; + let peers = fetch_backend.peers(); slog::info!( log, "discovered {} peers: [{}]", - peers.peer_count(), + peers.len(), peers.display(), ); - match peers.fetch_artifact(&cx, artifact_hash_id).await { + match fetch_backend.fetch_artifact(&cx, artifact_hash_id).await { Some((peer, artifact)) => { return Ok(Self { attempt, peer, artifact }); } @@ -95,7 +95,7 @@ impl FetchedArtifact { ); cx.send_progress(StepProgress::retry(format!( "unable to fetch artifact from any of {} peers, retrying", - peers.peer_count(), + peers.len(), ))) .await; tokio::time::sleep(RETRY_DELAY).await; @@ -146,7 +146,7 @@ impl FetchArtifactBackend { ) -> Option<(PeerAddress, BufList)> { // TODO: do we want a check phase that happens before the download? let peers = self.peers(); - let mut remaining_peers = self.peer_count(); + let mut remaining_peers = peers.len(); let log = self.log.new( slog::o!("artifact_hash_id" => format!("{artifact_hash_id:?}")), @@ -154,7 +154,7 @@ impl FetchArtifactBackend { slog::debug!(log, "start fetch from peers"; "remaining_peers" => remaining_peers); - for peer in peers { + for &peer in peers.peers() { remaining_peers -= 1; slog::debug!( @@ -188,18 +188,10 @@ impl FetchArtifactBackend { None } - pub(crate) fn peers(&self) -> impl Iterator<Item = PeerAddress> + '_ { + pub(crate) fn peers(&self) -> &PeerAddresses { self.imp.peers() } - pub(crate) fn peer_count(&self) -> usize { - self.imp.peer_count() - } - - pub(crate) fn display(&self) -> impl fmt::Display { - self.peers().join(", ") - } - async fn fetch_from_peer( &self, cx: &StepContext, @@ -307,10 +299,16 @@ impl FetchArtifactBackend { } } +/// Backend implementation for fetching artifacts. +/// +/// Note: While [`crate::reporter::ReportProgressImpl`] is a persistent +/// structure, a new `FetchArtifactImpl` is generated separately each time +/// discovery occurs. We should align this with `ReportProgressImpl` in the +/// future, though we'd need some way of looking up delay information in +/// `mock_peers`. #[async_trait] pub(crate) trait FetchArtifactImpl: fmt::Debug + Send + Sync { - fn peers(&self) -> Box<dyn Iterator<Item = PeerAddress> + Send + '_>; - fn peer_count(&self) -> usize; + fn peers(&self) -> &PeerAddresses; /// Returns (size, receiver) on success, and an error on failure. async fn fetch_from_peer_impl( @@ -329,11 +327,11 @@ pub(crate) type FetchReceiver = mpsc::Receiver<Result<Bytes, ClientError>>; #[derive(Clone, Debug)] pub(crate) struct HttpFetchBackend { log: slog::Logger, - peers: Vec<PeerAddress>, + peers: PeerAddresses, } impl HttpFetchBackend { - pub(crate) fn new(log: &slog::Logger, peers: Vec<PeerAddress>) -> Self { + pub(crate) fn new(log: &slog::Logger, peers: PeerAddresses) -> Self { let log = log.new(slog::o!("component" => "HttpPeers")); Self { log, peers } } @@ -341,12 +339,8 @@ impl HttpFetchBackend { #[async_trait] impl FetchArtifactImpl for HttpFetchBackend { - fn peers(&self) -> Box<dyn Iterator<Item = PeerAddress> + Send + '_> { - Box::new(self.peers.iter().copied()) - } - - fn peer_count(&self) -> usize { - self.peers.len() + fn peers(&self) -> &PeerAddresses { + &self.peers } async fn fetch_from_peer_impl( diff --git a/installinator/src/mock_peers.rs b/installinator/src/mock_peers.rs index 87c99253365..6bd62bacacd 100644 --- a/installinator/src/mock_peers.rs +++ b/installinator/src/mock_peers.rs @@ -28,7 +28,7 @@ use uuid::Uuid; use crate::{ errors::{DiscoverPeersError, HttpError}, fetch::{FetchArtifactImpl, FetchReceiver}, - peers::PeerAddress, + peers::{PeerAddress, PeerAddresses}, reporter::ReportProgressImpl, }; @@ -143,10 +143,10 @@ impl MockPeersUniverse { .then(|| (*addr, peer.clone())) }) .collect(); - Ok(MockFetchBackend { - artifact: self.artifact.clone(), + Ok(MockFetchBackend::new( + self.artifact.clone(), selected_peers, - }) + )) } AttemptBitmap::Failure => { bail!( @@ -177,20 +177,27 @@ struct MockFetchBackend { artifact: Bytes, // Peers within the universe that have been selected selected_peers: BTreeMap<PeerAddress, MockPeer>, + // selected_peers keys stored in a suitable form for the + // FetchArtifactImpl trait + peer_addresses: PeerAddresses, } impl MockFetchBackend { - fn get(&self, peer: PeerAddress) -> Option<&MockPeer> { - self.selected_peers.get(&peer) + fn new( + artifact: Bytes, + selected_peers: BTreeMap<PeerAddress, MockPeer>, + ) -> Self { + let peer_addresses = selected_peers.keys().copied().collect(); + Self { artifact, selected_peers, peer_addresses } } - fn peers(&self) -> impl Iterator<Item = (&PeerAddress, &MockPeer)> + '_ { - self.selected_peers.iter() + fn get(&self, peer: PeerAddress) -> Option<&MockPeer> { + self.selected_peers.get(&peer) } /// Returns the peer that can return the entire dataset within the timeout. fn successful_peer(&self, timeout: Duration) -> Option<PeerAddress> { - self.peers() + self.selected_peers.iter() .filter_map(|(addr, peer)| { if peer.artifact != self.artifact { // We don't handle the case where the peer returns the wrong artifact yet. @@ -235,12 +242,8 @@ impl MockFetchBackend { #[async_trait] impl FetchArtifactImpl for MockFetchBackend { - fn peers(&self) -> Box<dyn Iterator<Item = PeerAddress> + Send + '_> { - Box::new(self.selected_peers.keys().copied()) - } - - fn peer_count(&self) -> usize { - self.selected_peers.len() + fn peers(&self) -> &PeerAddresses { + &self.peer_addresses } async fn fetch_from_peer_impl( @@ -478,8 +481,10 @@ impl MockProgressBackend { impl ReportProgressImpl for MockProgressBackend { async fn discover_peers( &self, - ) -> Result<Vec<PeerAddress>, DiscoverPeersError> { - Ok(vec![Self::VALID_PEER, Self::INVALID_PEER, Self::UNRESPONSIVE_PEER]) + ) -> Result<PeerAddresses, DiscoverPeersError> { + Ok([Self::VALID_PEER, Self::INVALID_PEER, Self::UNRESPONSIVE_PEER] + .into_iter() + .collect()) } async fn report_progress_impl( diff --git a/installinator/src/peers.rs b/installinator/src/peers.rs index 867302baafb..dfb2c71495e 100644 --- a/installinator/src/peers.rs +++ b/installinator/src/peers.rs @@ -3,6 +3,7 @@ // file, You can obtain one at https://mozilla.org/MPL/2.0/. use std::{ + collections::BTreeSet, fmt, net::{AddrParseError, IpAddr, SocketAddr}, str::FromStr, @@ -31,7 +32,7 @@ impl DiscoveryMechanism { pub(crate) async fn discover_peers( &self, log: &slog::Logger, - ) -> Result<Vec<PeerAddress>, DiscoverPeersError> { + ) -> Result<PeerAddresses, DiscoverPeersError> { let peers = match self { Self::Bootstrap => { // Note: we do not abort this process and instead keep retrying @@ -61,7 +62,7 @@ impl DiscoveryMechanism { }) .collect() } - Self::List(peers) => peers.clone(), + Self::List(peers) => peers.iter().copied().collect(), }; Ok(peers) @@ -100,6 +101,32 @@ impl FromStr for DiscoveryMechanism { } } +#[derive(Clone, Debug)] +pub(crate) struct PeerAddresses { + peers: BTreeSet<PeerAddress>, +} + +impl PeerAddresses { + pub(crate) fn peers(&self) -> &BTreeSet<PeerAddress> { + &self.peers + } + + pub(crate) fn len(&self) -> usize { + self.peers.len() + } + + pub(crate) fn display(&self) -> impl fmt::Display { + self.peers().iter().join(", ") + } +} + +impl FromIterator<PeerAddress> for PeerAddresses { + fn from_iter<I: IntoIterator<Item = PeerAddress>>(iter: I) -> Self { + let peers = iter.into_iter().collect::<BTreeSet<_>>(); + Self { peers } + } +} + #[derive(Clone, Copy, Debug, PartialEq, PartialOrd, Ord, Eq)] #[cfg_attr(test, derive(test_strategy::Arbitrary))] pub(crate) struct PeerAddress { diff --git a/installinator/src/reporter.rs b/installinator/src/reporter.rs index 00ece7e4bc4..a1f81db9929 100644 --- a/installinator/src/reporter.rs +++ b/installinator/src/reporter.rs @@ -19,7 +19,7 @@ use uuid::Uuid; use crate::{ artifact::ArtifactClient, errors::DiscoverPeersError, - peers::{DiscoveryMechanism, PeerAddress}, + peers::{DiscoveryMechanism, PeerAddress, PeerAddresses}, }; #[derive(Debug)] @@ -129,17 +129,18 @@ impl ProgressReporter { // that if two servers both say, only one of them will // deterministically get the update. Need to decide post-PVT1. let last_reported = report.last_seen; - let results: Vec<_> = futures::stream::iter(peers) - .map(|peer| { - report_backend.send_report_to_peer( - peer, - update_id, - report.clone(), - ) - }) - .buffer_unordered(8) - .collect() - .await; + let results: Vec<_> = + futures::stream::iter(peers.peers().iter().copied()) + .map(|peer| { + report_backend.send_report_to_peer( + peer, + update_id, + report.clone(), + ) + }) + .buffer_unordered(8) + .collect() + .await; if results.iter().any(|res| res.is_ok()) { Some(last_reported) @@ -188,7 +189,7 @@ impl ReportProgressBackend { pub(crate) async fn discover_peers( &self, - ) -> Result<Vec<PeerAddress>, DiscoverPeersError> { + ) -> Result<PeerAddresses, DiscoverPeersError> { let log = self.log.new(slog::o!("task" => "discover_peers")); slog::debug!(log, "discovering peers"); @@ -255,9 +256,8 @@ pub(crate) enum SendReportStatus { #[async_trait] pub(crate) trait ReportProgressImpl: fmt::Debug + Send + Sync { - async fn discover_peers( - &self, - ) -> Result<Vec<PeerAddress>, DiscoverPeersError>; + async fn discover_peers(&self) + -> Result<PeerAddresses, DiscoverPeersError>; async fn report_progress_impl( &self, @@ -287,7 +287,7 @@ impl HttpProgressBackend { impl ReportProgressImpl for HttpProgressBackend { async fn discover_peers( &self, - ) -> Result<Vec<PeerAddress>, DiscoverPeersError> { + ) -> Result<PeerAddresses, DiscoverPeersError> { self.discovery.discover_peers(&self.log).await }