Skip to content

[6/n] simplify FetchArtifactImpl interface #8040

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions installinator/src/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,15 @@ struct DebugDiscoverOpts {

impl DebugDiscoverOpts {
async fn exec(self, log: &slog::Logger) -> Result<()> {
let peers = FetchArtifactBackend::new(
let backend = FetchArtifactBackend::new(
log,
Box::new(HttpFetchBackend::new(
&log,
self.opts.mechanism.discover_peers(&log).await?,
)),
Duration::from_secs(10),
);
println!("discovered peers: {}", peers.display());
println!("discovered peers: {}", backend.peers().display());
Ok(())
}
}
Expand Down
48 changes: 21 additions & 27 deletions installinator/src/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,14 @@ use installinator_client::ClientError;
use installinator_common::{
InstallinatorProgressMetadata, StepContext, StepProgress,
};
use itertools::Itertools;
use tokio::{sync::mpsc, time::Instant};
use tufaceous_artifact::ArtifactHashId;
use update_engine::events::ProgressUnits;

use crate::{
artifact::ArtifactClient,
errors::{ArtifactFetchError, DiscoverPeersError, HttpError},
peers::PeerAddress,
peers::{PeerAddress, PeerAddresses},
};

/// A fetched artifact.
Expand Down Expand Up @@ -55,7 +54,7 @@ impl FetchedArtifact {
let mut attempt = 0;
loop {
attempt += 1;
let peers = match discover_fn().await {
let fetch_backend = match discover_fn().await {
Ok(peers) => peers,
Err(DiscoverPeersError::Retry(error)) => {
slog::warn!(
Expand All @@ -78,13 +77,14 @@ impl FetchedArtifact {
}
};

let peers = fetch_backend.peers();
slog::info!(
log,
"discovered {} peers: [{}]",
peers.peer_count(),
peers.len(),
peers.display(),
);
match peers.fetch_artifact(&cx, artifact_hash_id).await {
match fetch_backend.fetch_artifact(&cx, artifact_hash_id).await {
Some((peer, artifact)) => {
return Ok(Self { attempt, peer, artifact });
}
Expand All @@ -95,7 +95,7 @@ impl FetchedArtifact {
);
cx.send_progress(StepProgress::retry(format!(
"unable to fetch artifact from any of {} peers, retrying",
peers.peer_count(),
peers.len(),
)))
.await;
tokio::time::sleep(RETRY_DELAY).await;
Expand Down Expand Up @@ -146,15 +146,15 @@ impl FetchArtifactBackend {
) -> Option<(PeerAddress, BufList)> {
// TODO: do we want a check phase that happens before the download?
let peers = self.peers();
let mut remaining_peers = self.peer_count();
let mut remaining_peers = peers.len();

let log = self.log.new(
slog::o!("artifact_hash_id" => format!("{artifact_hash_id:?}")),
);

slog::debug!(log, "start fetch from peers"; "remaining_peers" => remaining_peers);

for peer in peers {
for &peer in peers.peers() {
remaining_peers -= 1;

slog::debug!(
Expand Down Expand Up @@ -188,18 +188,10 @@ impl FetchArtifactBackend {
None
}

pub(crate) fn peers(&self) -> impl Iterator<Item = PeerAddress> + '_ {
pub(crate) fn peers(&self) -> &PeerAddresses {
self.imp.peers()
}

pub(crate) fn peer_count(&self) -> usize {
self.imp.peer_count()
}

pub(crate) fn display(&self) -> impl fmt::Display {
self.peers().join(", ")
}

async fn fetch_from_peer(
&self,
cx: &StepContext,
Expand Down Expand Up @@ -307,10 +299,16 @@ impl FetchArtifactBackend {
}
}

/// Backend implementation for fetching artifacts.
///
/// Note: While [`crate::reporter::ReportProgressImpl`] is a persistent
/// structure, a new `FetchArtifactImpl` is generated separately each time
/// discovery occurs. We should align this with `ReportProgressImpl` in the
/// future, though we'd need some way of looking up delay information in
/// `mock_peers`.
#[async_trait]
pub(crate) trait FetchArtifactImpl: fmt::Debug + Send + Sync {
fn peers(&self) -> Box<dyn Iterator<Item = PeerAddress> + Send + '_>;
fn peer_count(&self) -> usize;
fn peers(&self) -> &PeerAddresses;

/// Returns (size, receiver) on success, and an error on failure.
async fn fetch_from_peer_impl(
Expand All @@ -329,24 +327,20 @@ pub(crate) type FetchReceiver = mpsc::Receiver<Result<Bytes, ClientError>>;
#[derive(Clone, Debug)]
pub(crate) struct HttpFetchBackend {
log: slog::Logger,
peers: Vec<PeerAddress>,
peers: PeerAddresses,
}

impl HttpFetchBackend {
pub(crate) fn new(log: &slog::Logger, peers: Vec<PeerAddress>) -> Self {
pub(crate) fn new(log: &slog::Logger, peers: PeerAddresses) -> Self {
let log = log.new(slog::o!("component" => "HttpPeers"));
Self { log, peers }
}
}

#[async_trait]
impl FetchArtifactImpl for HttpFetchBackend {
fn peers(&self) -> Box<dyn Iterator<Item = PeerAddress> + Send + '_> {
Box::new(self.peers.iter().copied())
}

fn peer_count(&self) -> usize {
self.peers.len()
fn peers(&self) -> &PeerAddresses {
&self.peers
}

async fn fetch_from_peer_impl(
Expand Down
39 changes: 22 additions & 17 deletions installinator/src/mock_peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use uuid::Uuid;
use crate::{
errors::{DiscoverPeersError, HttpError},
fetch::{FetchArtifactImpl, FetchReceiver},
peers::PeerAddress,
peers::{PeerAddress, PeerAddresses},
reporter::ReportProgressImpl,
};

Expand Down Expand Up @@ -143,10 +143,10 @@ impl MockPeersUniverse {
.then(|| (*addr, peer.clone()))
})
.collect();
Ok(MockFetchBackend {
artifact: self.artifact.clone(),
Ok(MockFetchBackend::new(
self.artifact.clone(),
selected_peers,
})
))
}
AttemptBitmap::Failure => {
bail!(
Expand Down Expand Up @@ -177,20 +177,27 @@ struct MockFetchBackend {
artifact: Bytes,
// Peers within the universe that have been selected
selected_peers: BTreeMap<PeerAddress, MockPeer>,
// selected_peers keys stored in a suitable form for the
// FetchArtifactImpl trait
peer_addresses: PeerAddresses,
}

impl MockFetchBackend {
fn get(&self, peer: PeerAddress) -> Option<&MockPeer> {
self.selected_peers.get(&peer)
fn new(
artifact: Bytes,
selected_peers: BTreeMap<PeerAddress, MockPeer>,
) -> Self {
let peer_addresses = selected_peers.keys().copied().collect();
Self { artifact, selected_peers, peer_addresses }
}

fn peers(&self) -> impl Iterator<Item = (&PeerAddress, &MockPeer)> + '_ {
self.selected_peers.iter()
fn get(&self, peer: PeerAddress) -> Option<&MockPeer> {
self.selected_peers.get(&peer)
}

/// Returns the peer that can return the entire dataset within the timeout.
fn successful_peer(&self, timeout: Duration) -> Option<PeerAddress> {
self.peers()
self.selected_peers.iter()
.filter_map(|(addr, peer)| {
if peer.artifact != self.artifact {
// We don't handle the case where the peer returns the wrong artifact yet.
Expand Down Expand Up @@ -235,12 +242,8 @@ impl MockFetchBackend {

#[async_trait]
impl FetchArtifactImpl for MockFetchBackend {
fn peers(&self) -> Box<dyn Iterator<Item = PeerAddress> + Send + '_> {
Box::new(self.selected_peers.keys().copied())
}

fn peer_count(&self) -> usize {
self.selected_peers.len()
fn peers(&self) -> &PeerAddresses {
&self.peer_addresses
}

async fn fetch_from_peer_impl(
Expand Down Expand Up @@ -478,8 +481,10 @@ impl MockProgressBackend {
impl ReportProgressImpl for MockProgressBackend {
async fn discover_peers(
&self,
) -> Result<Vec<PeerAddress>, DiscoverPeersError> {
Ok(vec![Self::VALID_PEER, Self::INVALID_PEER, Self::UNRESPONSIVE_PEER])
) -> Result<PeerAddresses, DiscoverPeersError> {
Ok([Self::VALID_PEER, Self::INVALID_PEER, Self::UNRESPONSIVE_PEER]
.into_iter()
.collect())
}

async fn report_progress_impl(
Expand Down
31 changes: 29 additions & 2 deletions installinator/src/peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

use std::{
collections::BTreeSet,
fmt,
net::{AddrParseError, IpAddr, SocketAddr},
str::FromStr,
Expand Down Expand Up @@ -31,7 +32,7 @@ impl DiscoveryMechanism {
pub(crate) async fn discover_peers(
&self,
log: &slog::Logger,
) -> Result<Vec<PeerAddress>, DiscoverPeersError> {
) -> Result<PeerAddresses, DiscoverPeersError> {
let peers = match self {
Self::Bootstrap => {
// Note: we do not abort this process and instead keep retrying
Expand Down Expand Up @@ -61,7 +62,7 @@ impl DiscoveryMechanism {
})
.collect()
}
Self::List(peers) => peers.clone(),
Self::List(peers) => peers.iter().copied().collect(),
};

Ok(peers)
Expand Down Expand Up @@ -100,6 +101,32 @@ impl FromStr for DiscoveryMechanism {
}
}

#[derive(Clone, Debug)]
pub(crate) struct PeerAddresses {
peers: BTreeSet<PeerAddress>,
}

impl PeerAddresses {
pub(crate) fn peers(&self) -> &BTreeSet<PeerAddress> {
&self.peers
}

pub(crate) fn len(&self) -> usize {
self.peers.len()
}

pub(crate) fn display(&self) -> impl fmt::Display {
self.peers().iter().join(", ")
}
}

impl FromIterator<PeerAddress> for PeerAddresses {
fn from_iter<I: IntoIterator<Item = PeerAddress>>(iter: I) -> Self {
let peers = iter.into_iter().collect::<BTreeSet<_>>();
Self { peers }
}
}

#[derive(Clone, Copy, Debug, PartialEq, PartialOrd, Ord, Eq)]
#[cfg_attr(test, derive(test_strategy::Arbitrary))]
pub(crate) struct PeerAddress {
Expand Down
34 changes: 17 additions & 17 deletions installinator/src/reporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use uuid::Uuid;
use crate::{
artifact::ArtifactClient,
errors::DiscoverPeersError,
peers::{DiscoveryMechanism, PeerAddress},
peers::{DiscoveryMechanism, PeerAddress, PeerAddresses},
};

#[derive(Debug)]
Expand Down Expand Up @@ -129,17 +129,18 @@ impl ProgressReporter {
// that if two servers both say, only one of them will
// deterministically get the update. Need to decide post-PVT1.
let last_reported = report.last_seen;
let results: Vec<_> = futures::stream::iter(peers)
.map(|peer| {
report_backend.send_report_to_peer(
peer,
update_id,
report.clone(),
)
})
.buffer_unordered(8)
.collect()
.await;
let results: Vec<_> =
futures::stream::iter(peers.peers().iter().copied())
.map(|peer| {
report_backend.send_report_to_peer(
peer,
update_id,
report.clone(),
)
})
.buffer_unordered(8)
.collect()
.await;

if results.iter().any(|res| res.is_ok()) {
Some(last_reported)
Expand Down Expand Up @@ -188,7 +189,7 @@ impl ReportProgressBackend {

pub(crate) async fn discover_peers(
&self,
) -> Result<Vec<PeerAddress>, DiscoverPeersError> {
) -> Result<PeerAddresses, DiscoverPeersError> {
let log = self.log.new(slog::o!("task" => "discover_peers"));
slog::debug!(log, "discovering peers");

Expand Down Expand Up @@ -255,9 +256,8 @@ pub(crate) enum SendReportStatus {

#[async_trait]
pub(crate) trait ReportProgressImpl: fmt::Debug + Send + Sync {
async fn discover_peers(
&self,
) -> Result<Vec<PeerAddress>, DiscoverPeersError>;
async fn discover_peers(&self)
-> Result<PeerAddresses, DiscoverPeersError>;

async fn report_progress_impl(
&self,
Expand Down Expand Up @@ -287,7 +287,7 @@ impl HttpProgressBackend {
impl ReportProgressImpl for HttpProgressBackend {
async fn discover_peers(
&self,
) -> Result<Vec<PeerAddress>, DiscoverPeersError> {
) -> Result<PeerAddresses, DiscoverPeersError> {
self.discovery.discover_peers(&self.log).await
}

Expand Down
Loading