From a8a8e43d820ac0598a1551fa6d0b67434f3fb927 Mon Sep 17 00:00:00 2001
From: Rain <rain@oxide.computer>
Date: Wed, 23 Apr 2025 19:13:16 -0700
Subject: [PATCH 1/2] [spr] changes to main this commit is based on

Created using spr 1.3.6-beta.1

[skip ci]
---
 installinator/src/artifact.rs   |   2 +-
 installinator/src/dispatch.rs   |  48 ++--
 installinator/src/errors.rs     |   9 +-
 installinator/src/fetch.rs      | 359 ++++++++++++++++++++++++++
 installinator/src/lib.rs        |   1 +
 installinator/src/mock_peers.rs | 181 ++++++-------
 installinator/src/peers.rs      | 437 ++------------------------------
 installinator/src/reporter.rs   | 196 ++++++++++++--
 8 files changed, 666 insertions(+), 567 deletions(-)
 create mode 100644 installinator/src/fetch.rs

diff --git a/installinator/src/artifact.rs b/installinator/src/artifact.rs
index 44eb03d270c..30b70c4d010 100644
--- a/installinator/src/artifact.rs
+++ b/installinator/src/artifact.rs
@@ -14,7 +14,7 @@ use tokio::sync::mpsc;
 use tufaceous_artifact::{ArtifactHash, ArtifactHashId};
 use uuid::Uuid;
 
-use crate::{errors::HttpError, peers::FetchReceiver};
+use crate::{errors::HttpError, fetch::FetchReceiver};
 
 #[derive(Clone, Debug, Eq, PartialEq, Args)]
 pub(crate) struct ArtifactIdOpts {
diff --git a/installinator/src/dispatch.rs b/installinator/src/dispatch.rs
index ba3154b04a2..aff9cc256b0 100644
--- a/installinator/src/dispatch.rs
+++ b/installinator/src/dispatch.rs
@@ -23,10 +23,11 @@ use tufaceous_lib::ControlPlaneZoneImages;
 use update_engine::StepResult;
 
 use crate::{
+    ArtifactWriter, WriteDestination,
     artifact::ArtifactIdOpts,
-    peers::{DiscoveryMechanism, FetchedArtifact, Peers},
-    reporter::ProgressReporter,
-    write::{ArtifactWriter, WriteDestination},
+    fetch::{FetchArtifactBackend, FetchedArtifact, HttpFetchBackend},
+    peers::DiscoveryMechanism,
+    reporter::{HttpProgressBackend, ProgressReporter, ReportProgressBackend},
 };
 
 /// Installinator app.
@@ -90,9 +91,12 @@ struct DebugDiscoverOpts {
 
 impl DebugDiscoverOpts {
     async fn exec(self, log: &slog::Logger) -> Result<()> {
-        let peers = Peers::new(
+        let peers = FetchArtifactBackend::new(
             log,
-            self.opts.mechanism.discover_peers(log).await?,
+            Box::new(HttpFetchBackend::new(
+                &log,
+                self.opts.mechanism.discover_peers(&log).await?,
+            )),
             Duration::from_secs(10),
         );
         println!("discovered peers: {}", peers.display());
@@ -182,19 +186,14 @@ impl InstallOpts {
         let image_id = self.artifact_ids.resolve()?;
 
         let discovery = self.discover_opts.mechanism.clone();
-        let discovery_log = log.clone();
-        let (progress_reporter, event_sender) =
-            ProgressReporter::new(log, image_id.update_id, move || {
-                let log = discovery_log.clone();
-                let discovery = discovery.clone();
-                async move {
-                    Ok(Peers::new(
-                        &log,
-                        discovery.discover_peers(&log).await?,
-                        Duration::from_secs(10),
-                    ))
-                }
-            });
+        let (progress_reporter, event_sender) = ProgressReporter::new(
+            log,
+            image_id.update_id,
+            ReportProgressBackend::new(
+                log,
+                HttpProgressBackend::new(log, discovery),
+            ),
+        );
         let progress_handle = progress_reporter.start();
         let discovery = &self.discover_opts.mechanism;
 
@@ -234,7 +233,7 @@ impl InstallOpts {
                     )
                     .await?;
 
-                    let address = host_phase_2_artifact.addr;
+                    let address = host_phase_2_artifact.peer.address();
 
                     StepSuccess::new(host_phase_2_artifact)
                         .with_metadata(
@@ -273,7 +272,7 @@ impl InstallOpts {
                     )
                     .await?;
 
-                    let address = control_plane_artifact.addr;
+                    let address = control_plane_artifact.peer.address();
 
                     StepSuccess::new(control_plane_artifact)
                         .with_metadata(
@@ -493,9 +492,12 @@ async fn fetch_artifact(
         cx,
         &log,
         || async {
-            Ok(Peers::new(
+            Ok(FetchArtifactBackend::new(
                 &log,
-                discovery.discover_peers(&log).await?,
+                Box::new(HttpFetchBackend::new(
+                    &log,
+                    discovery.discover_peers(&log).await?,
+                )),
                 Duration::from_secs(10),
             ))
         },
@@ -508,7 +510,7 @@ async fn fetch_artifact(
         log,
         "fetched {} bytes from {}",
         artifact.artifact.num_bytes(),
-        artifact.addr,
+        artifact.peer,
     );
 
     Ok(artifact)
diff --git a/installinator/src/errors.rs b/installinator/src/errors.rs
index 7c04226dfda..adf298d73a5 100644
--- a/installinator/src/errors.rs
+++ b/installinator/src/errors.rs
@@ -33,8 +33,13 @@ pub(crate) enum DiscoverPeersError {
     #[allow(unused)]
     Retry(#[source] anyhow::Error),
 
-    #[error("failed to discover peers (no more retries left, will abort)")]
-    #[allow(unused)]
+    /// Abort further discovery.
+    ///
+    /// The installinator must keep retrying until it has completed, which is why
+    /// this variant does not exist outside of `cfg(test)` builds. However, we
+    /// do exercise some abort-related functionality in tests.
+    #[cfg(test)]
+    #[error("failed to discover peers (will abort)")]
     Abort(#[source] anyhow::Error),
 }
 
diff --git a/installinator/src/fetch.rs b/installinator/src/fetch.rs
new file mode 100644
index 00000000000..6659051b945
--- /dev/null
+++ b/installinator/src/fetch.rs
@@ -0,0 +1,359 @@
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at https://mozilla.org/MPL/2.0/.
+
+//! Logic to fetch artifacts from a peer.
+
+use std::{fmt, future::Future, time::Duration};
+
+use anyhow::Result;
+use async_trait::async_trait;
+use buf_list::BufList;
+use bytes::Bytes;
+use display_error_chain::DisplayErrorChain;
+use installinator_client::ClientError;
+use installinator_common::{
+    InstallinatorProgressMetadata, StepContext, StepProgress,
+};
+use itertools::Itertools;
+use tokio::{sync::mpsc, time::Instant};
+use tufaceous_artifact::ArtifactHashId;
+use update_engine::events::ProgressUnits;
+
+use crate::{
+    artifact::ArtifactClient,
+    errors::{ArtifactFetchError, DiscoverPeersError, HttpError},
+    peers::PeerAddress,
+};
+
+/// A fetched artifact.
+pub(crate) struct FetchedArtifact {
+    pub(crate) attempt: usize,
+    pub(crate) peer: PeerAddress,
+    pub(crate) artifact: BufList,
+}
+
+impl FetchedArtifact {
+    /// In a loop, discover peers, and fetch from them.
+    ///
+    /// If `discover_fn` returns [`DiscoverPeersError::Retry`], this function will retry. If it returns
+    /// `DiscoverPeersError::Abort` (test builds only), this function exits with the underlying error.
+    pub(crate) async fn loop_fetch_from_peers<F, Fut>(
+        cx: &StepContext,
+        log: &slog::Logger,
+        mut discover_fn: F,
+        artifact_hash_id: &ArtifactHashId,
+    ) -> Result<Self>
+    where
+        F: FnMut() -> Fut,
+        Fut: Future<Output = Result<FetchArtifactBackend, DiscoverPeersError>>,
+    {
+        // How long to sleep between retries if we fail to find a peer or fail
+        // to fetch an artifact from a found peer.
+        const RETRY_DELAY: Duration = Duration::from_secs(5);
+
+        let mut attempt = 0;
+        loop {
+            attempt += 1;
+            let peers = match discover_fn().await {
+                Ok(peers) => peers,
+                Err(DiscoverPeersError::Retry(error)) => {
+                    slog::warn!(
+                        log,
+                        "(attempt {attempt}) failed to discover peers, retrying: {}",
+                        DisplayErrorChain::new(
+                            AsRef::<dyn std::error::Error>::as_ref(&error)
+                        ),
+                    );
+                    cx.send_progress(StepProgress::retry(format!(
+                        "failed to discover peers: {error}"
+                    )))
+                    .await;
+                    tokio::time::sleep(RETRY_DELAY).await;
+                    continue;
+                }
+                #[cfg(test)]
+                Err(DiscoverPeersError::Abort(error)) => {
+                    return Err(error);
+                }
+            };
+
+            slog::info!(
+                log,
+                "discovered {} peers: [{}]",
+                peers.peer_count(),
+                peers.display(),
+            );
+            match peers.fetch_artifact(&cx, artifact_hash_id).await {
+                Some((peer, artifact)) => {
+                    return Ok(Self { attempt, peer, artifact });
+                }
+                None => {
+                    slog::warn!(
+                        log,
+                        "unable to fetch artifact from peers, retrying discovery",
+                    );
+                    cx.send_progress(StepProgress::retry(format!(
+                        "unable to fetch artifact from any of {} peers, retrying",
+                        peers.peer_count(),
+                    )))
+                    .await;
+                    tokio::time::sleep(RETRY_DELAY).await;
+                }
+            }
+        }
+    }
+}
+
+impl fmt::Debug for FetchedArtifact {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("FetchedArtifact")
+            .field("attempt", &self.attempt)
+            .field("peer", &self.peer)
+            .field(
+                "artifact",
+                &format!(
+                    "({} bytes in {} chunks)",
+                    self.artifact.num_bytes(),
+                    self.artifact.num_chunks()
+                ),
+            )
+            .finish()
+    }
+}
+
+#[derive(Debug)]
+pub(crate) struct FetchArtifactBackend {
+    log: slog::Logger,
+    imp: Box<dyn FetchArtifactImpl>,
+    timeout: Duration,
+}
+
+impl FetchArtifactBackend {
+    pub(crate) fn new(
+        log: &slog::Logger,
+        imp: Box<dyn FetchArtifactImpl>,
+        timeout: Duration,
+    ) -> Self {
+        let log = log.new(slog::o!("component" => "Peers"));
+        Self { log, imp, timeout }
+    }
+
+    pub(crate) async fn fetch_artifact(
+        &self,
+        cx: &StepContext,
+        artifact_hash_id: &ArtifactHashId,
+    ) -> Option<(PeerAddress, BufList)> {
+        // TODO: do we want a check phase that happens before the download?
+        let peers = self.peers();
+        let mut remaining_peers = self.peer_count();
+
+        let log = self.log.new(
+            slog::o!("artifact_hash_id" => format!("{artifact_hash_id:?}")),
+        );
+
+        slog::debug!(log, "start fetch from peers"; "remaining_peers" => remaining_peers);
+
+        for peer in peers {
+            remaining_peers -= 1;
+
+            slog::debug!(
+                log,
+                "start fetch from peer {peer:?}"; "remaining_peers" => remaining_peers,
+            );
+
+            // Attempt to download data from this peer.
+            let start = Instant::now();
+            match self.fetch_from_peer(cx, peer, artifact_hash_id).await {
+                Ok(artifact_bytes) => {
+                    let elapsed = start.elapsed();
+                    slog::info!(
+                        log,
+                        "fetched artifact from peer {peer} in {elapsed:?}"
+                    );
+                    return Some((peer, artifact_bytes));
+                }
+                Err(error) => {
+                    let elapsed = start.elapsed();
+                    slog::warn!(
+                        log,
+                        "error after {elapsed:?}: {}",
+                        DisplayErrorChain::new(&error);
+                        "remaining_peers" => remaining_peers,
+                    );
+                }
+            }
+        }
+
+        None
+    }
+
+    pub(crate) fn peers(&self) -> impl Iterator<Item = PeerAddress> + '_ {
+        self.imp.peers()
+    }
+
+    pub(crate) fn peer_count(&self) -> usize {
+        self.imp.peer_count()
+    }
+
+    pub(crate) fn display(&self) -> impl fmt::Display {
+        self.peers().join(", ")
+    }
+
+    async fn fetch_from_peer(
+        &self,
+        cx: &StepContext,
+        peer: PeerAddress,
+        artifact_hash_id: &ArtifactHashId,
+    ) -> Result<BufList, ArtifactFetchError> {
+        let log = self.log.new(slog::o!("peer" => peer.to_string()));
+
+        let (total_bytes, mut receiver) = match self
+            .imp
+            .fetch_from_peer_impl(peer, artifact_hash_id.clone())
+            .await
+        {
+            Ok(x) => x,
+            Err(error) => {
+                cx.send_progress(StepProgress::Reset {
+                    metadata: InstallinatorProgressMetadata::Download {
+                        peer: peer.address(),
+                    },
+                    message: error.to_string().into(),
+                })
+                .await;
+                return Err(ArtifactFetchError::HttpError {
+                    peer: peer.address(),
+                    error,
+                });
+            }
+        };
+
+        let mut artifact_bytes = BufList::new();
+        let mut downloaded_bytes = 0u64;
+        let metadata =
+            InstallinatorProgressMetadata::Download { peer: peer.address() };
+
+        loop {
+            match tokio::time::timeout(self.timeout, receiver.recv()).await {
+                Ok(Some(Ok(bytes))) => {
+                    slog::debug!(
+                        &log,
+                        "received chunk of {} bytes from peer",
+                        bytes.len()
+                    );
+                    downloaded_bytes += bytes.len() as u64;
+                    artifact_bytes.push_chunk(bytes);
+                    cx.send_progress(StepProgress::with_current_and_total(
+                        downloaded_bytes,
+                        total_bytes,
+                        ProgressUnits::BYTES,
+                        metadata.clone(),
+                    ))
+                    .await;
+                }
+                Ok(Some(Err(error))) => {
+                    slog::debug!(
+                        &log,
+                        "received error from peer, sending cancellation: {}",
+                        DisplayErrorChain::new(&error),
+                    );
+                    cx.send_progress(StepProgress::Reset {
+                        metadata: metadata.clone(),
+                        message: error.to_string().into(),
+                    })
+                    .await;
+                    return Err(ArtifactFetchError::HttpError {
+                        peer: peer.address(),
+                        error: error.into(),
+                    });
+                }
+                Ok(None) => {
+                    // The entire artifact has been downloaded.
+                    break;
+                }
+                Err(_) => {
+                    // The operation timed out.
+                    cx.send_progress(StepProgress::Reset {
+                        metadata,
+                        message: format!(
+                            "operation timed out ({:?})",
+                            self.timeout
+                        )
+                        .into(),
+                    })
+                    .await;
+                    return Err(ArtifactFetchError::Timeout {
+                        peer: peer.address(),
+                        timeout: self.timeout,
+                        bytes_fetched: artifact_bytes.num_bytes(),
+                    });
+                }
+            }
+        }
+
+        // Check that the artifact size matches the returned size.
+        if total_bytes != artifact_bytes.num_bytes() as u64 {
+            let error = ArtifactFetchError::SizeMismatch {
+                artifact_size: total_bytes,
+                downloaded_bytes,
+            };
+            cx.send_progress(StepProgress::reset(metadata, error.to_string()))
+                .await;
+            return Err(error);
+        }
+
+        Ok(artifact_bytes)
+    }
+}
+
+#[async_trait]
+pub(crate) trait FetchArtifactImpl: fmt::Debug + Send + Sync {
+    fn peers(&self) -> Box<dyn Iterator<Item = PeerAddress> + Send + '_>;
+    fn peer_count(&self) -> usize;
+
+    /// Returns (size, receiver) on success, and an error on failure.
+    async fn fetch_from_peer_impl(
+        &self,
+        peer: PeerAddress,
+        artifact_hash_id: ArtifactHashId,
+    ) -> Result<(u64, FetchReceiver), HttpError>;
+}
+
+/// The receive side of the channel over which artifact data is sent.
+pub(crate) type FetchReceiver = mpsc::Receiver<Result<Bytes, ClientError>>;
+
+/// A [`FetchArtifactImpl`] that uses HTTP to fetch artifacts from peers. This is the real implementation.
+#[derive(Clone, Debug)]
+pub(crate) struct HttpFetchBackend {
+    log: slog::Logger,
+    peers: Vec<PeerAddress>,
+}
+
+impl HttpFetchBackend {
+    pub(crate) fn new(log: &slog::Logger, peers: Vec<PeerAddress>) -> Self {
+        let log = log.new(slog::o!("component" => "HttpPeers"));
+        Self { log, peers }
+    }
+}
+
+#[async_trait]
+impl FetchArtifactImpl for HttpFetchBackend {
+    fn peers(&self) -> Box<dyn Iterator<Item = PeerAddress> + Send + '_> {
+        Box::new(self.peers.iter().copied())
+    }
+
+    fn peer_count(&self) -> usize {
+        self.peers.len()
+    }
+
+    async fn fetch_from_peer_impl(
+        &self,
+        peer: PeerAddress,
+        artifact_hash_id: ArtifactHashId,
+    ) -> Result<(u64, FetchReceiver), HttpError> {
+        // TODO: be able to fetch from sled-agent clients as well
+        let artifact_client = ArtifactClient::new(peer.address(), &self.log);
+        artifact_client.fetch(artifact_hash_id).await
+    }
+}
diff --git a/installinator/src/lib.rs b/installinator/src/lib.rs
index 3b1d768a7d1..5e126e823e3 100644
--- a/installinator/src/lib.rs
+++ b/installinator/src/lib.rs
@@ -7,6 +7,7 @@ mod async_temp_file;
 mod bootstrap;
 mod dispatch;
 mod errors;
+mod fetch;
 mod hardware;
 #[cfg(test)]
 mod mock_peers;
diff --git a/installinator/src/mock_peers.rs b/installinator/src/mock_peers.rs
index e2ff4981ee8..87c99253365 100644
--- a/installinator/src/mock_peers.rs
+++ b/installinator/src/mock_peers.rs
@@ -26,13 +26,15 @@ use tufaceous_artifact::ArtifactHashId;
 use uuid::Uuid;
 
 use crate::{
-    errors::HttpError,
-    peers::{FetchReceiver, PeersImpl},
+    errors::{DiscoverPeersError, HttpError},
+    fetch::{FetchArtifactImpl, FetchReceiver},
+    peers::PeerAddress,
+    reporter::ReportProgressImpl,
 };
 
 struct MockPeersUniverse {
     artifact: Bytes,
-    peers: BTreeMap<SocketAddr, MockPeer>,
+    peers: BTreeMap<PeerAddress, MockPeer>,
     attempt_bitmaps: Vec<AttemptBitmap>,
 }
 
@@ -49,7 +51,7 @@ impl fmt::Debug for MockPeersUniverse {
 impl MockPeersUniverse {
     fn new(
         artifact: Bytes,
-        peers: BTreeMap<SocketAddr, MockPeer>,
+        peers: BTreeMap<PeerAddress, MockPeer>,
         attempt_bitmaps: Vec<AttemptBitmap>,
     ) -> Self {
         assert!(peers.len() <= 32, "this test only supports up to 32 peers");
@@ -70,7 +72,7 @@ impl MockPeersUniverse {
         // being unique identifiers. This means that this code can use a BTreeMap rather than a
         // fancier structure like an IndexMap.
         let peers_strategy = prop::collection::btree_map(
-            any::<SocketAddr>(),
+            any::<PeerAddress>(),
             any::<MockResponse_>(),
             0..max_peer_count,
         );
@@ -82,7 +84,7 @@ impl MockPeersUniverse {
         (artifact_strategy, peers_strategy, attempt_bitmaps_strategy).prop_map(
             |(artifact, peers, attempt_bitmaps): (
                 Vec<u8>,
-                BTreeMap<SocketAddr, MockResponse_>,
+                BTreeMap<PeerAddress, MockResponse_>,
                 Vec<AttemptBitmap>,
             )| {
                 let artifact = Bytes::from(artifact);
@@ -107,7 +109,7 @@ impl MockPeersUniverse {
     fn expected_result(
         &self,
         timeout: Duration,
-    ) -> Result<(usize, SocketAddr), usize> {
+    ) -> Result<(usize, PeerAddress), usize> {
         self.attempts()
             .enumerate()
             .filter_map(|(attempt, peers)| {
@@ -126,7 +128,7 @@ impl MockPeersUniverse {
             })
     }
 
-    fn attempts(&self) -> impl Iterator<Item = Result<MockPeers>> + '_ {
+    fn attempts(&self) -> impl Iterator<Item = Result<MockFetchBackend>> + '_ {
         self.attempt_bitmaps.iter().enumerate().map(
             move |(i, &attempt_bitmap)| {
                 match attempt_bitmap {
@@ -141,7 +143,7 @@ impl MockPeersUniverse {
                                     .then(|| (*addr, peer.clone()))
                             })
                             .collect();
-                        Ok(MockPeers {
+                        Ok(MockFetchBackend {
                             artifact: self.artifact.clone(),
                             selected_peers,
                         })
@@ -171,23 +173,23 @@ enum AttemptBitmap {
 }
 
 #[derive(Debug)]
-struct MockPeers {
+struct MockFetchBackend {
     artifact: Bytes,
     // Peers within the universe that have been selected
-    selected_peers: BTreeMap<SocketAddr, MockPeer>,
+    selected_peers: BTreeMap<PeerAddress, MockPeer>,
 }
 
-impl MockPeers {
-    fn get(&self, addr: SocketAddr) -> Option<&MockPeer> {
-        self.selected_peers.get(&addr)
+impl MockFetchBackend {
+    fn get(&self, peer: PeerAddress) -> Option<&MockPeer> {
+        self.selected_peers.get(&peer)
     }
 
-    fn peers(&self) -> impl Iterator<Item = (&SocketAddr, &MockPeer)> + '_ {
+    fn peers(&self) -> impl Iterator<Item = (&PeerAddress, &MockPeer)> + '_ {
         self.selected_peers.iter()
     }
 
     /// Returns the peer that can return the entire dataset within the timeout.
-    fn successful_peer(&self, timeout: Duration) -> Option<SocketAddr> {
+    fn successful_peer(&self, timeout: Duration) -> Option<PeerAddress> {
         self.peers()
             .filter_map(|(addr, peer)| {
                 if peer.artifact != self.artifact {
@@ -232,8 +234,8 @@ impl MockPeers {
 }
 
 #[async_trait]
-impl PeersImpl for MockPeers {
-    fn peers(&self) -> Box<dyn Iterator<Item = SocketAddr> + Send + '_> {
+impl FetchArtifactImpl for MockFetchBackend {
+    fn peers(&self) -> Box<dyn Iterator<Item = PeerAddress> + Send + '_> {
         Box::new(self.selected_peers.keys().copied())
     }
 
@@ -243,7 +245,7 @@ impl PeersImpl for MockPeers {
 
     async fn fetch_from_peer_impl(
         &self,
-        peer: SocketAddr,
+        peer: PeerAddress,
         // We don't (yet) use the artifact ID in MockPeers
         _artifact_hash_id: ArtifactHashId,
     ) -> Result<(u64, FetchReceiver), HttpError> {
@@ -258,18 +260,6 @@ impl PeersImpl for MockPeers {
         // TODO: add tests to ensure an invalid artifact size is correctly detected
         Ok((artifact_size, receiver))
     }
-
-    async fn report_progress_impl(
-        &self,
-        _peer: SocketAddr,
-        _update_id: Uuid,
-        _report: EventReport,
-    ) -> Result<(), ClientError> {
-        panic!(
-            "this is currently unused -- at some point we'll want to \
-             unify this with MockReportPeers"
-        )
-    }
 }
 
 #[derive(Clone)]
@@ -462,69 +452,47 @@ impl ResponseAction_ {
 ///
 /// In the future, this will be combined with `MockPeers` so we can model.
 #[derive(Debug)]
-struct MockReportPeers {
+struct MockProgressBackend {
     update_id: Uuid,
     report_sender: mpsc::Sender<EventReport>,
 }
 
-impl MockReportPeers {
-    // SocketAddr::new is not a const fn in stable Rust as of this writing
-    fn valid_peer() -> SocketAddr {
-        SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)), 2000)
-    }
-
-    fn invalid_peer() -> SocketAddr {
-        SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 2)), 2000)
-    }
-
-    fn unresponsive_peer() -> SocketAddr {
-        SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 3)), 2000)
-    }
-
-    fn new(update_id: Uuid, report_sender: mpsc::Sender<EventReport>) -> Self {
-        Self { update_id, report_sender }
-    }
+impl MockProgressBackend {
+    const VALID_PEER: PeerAddress = PeerAddress::new(SocketAddr::new(
+        IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)),
+        2000,
+    ));
+
+    const INVALID_PEER: PeerAddress = PeerAddress::new(SocketAddr::new(
+        IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 2)),
+        2000,
+    ));
+
+    const UNRESPONSIVE_PEER: PeerAddress = PeerAddress::new(SocketAddr::new(
+        IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 3)),
+        2000,
+    ));
 }
 
 #[async_trait]
-impl PeersImpl for MockReportPeers {
-    fn peers(&self) -> Box<dyn Iterator<Item = SocketAddr> + Send + '_> {
-        Box::new(
-            [
-                Self::valid_peer(),
-                Self::invalid_peer(),
-                Self::unresponsive_peer(),
-            ]
-            .into_iter(),
-        )
-    }
-
-    fn peer_count(&self) -> usize {
-        3
-    }
-
-    async fn fetch_from_peer_impl(
+impl ReportProgressImpl for MockProgressBackend {
+    async fn discover_peers(
         &self,
-        _peer: SocketAddr,
-        _artifact_hash_id: ArtifactHashId,
-    ) -> Result<(u64, FetchReceiver), HttpError> {
-        unimplemented!(
-            "this should never be called -- \
-            eventually we'll want to unify this with MockPeers",
-        )
+    ) -> Result<Vec<PeerAddress>, DiscoverPeersError> {
+        Ok(vec![Self::VALID_PEER, Self::INVALID_PEER, Self::UNRESPONSIVE_PEER])
     }
 
     async fn report_progress_impl(
         &self,
-        peer: SocketAddr,
+        peer: PeerAddress,
         update_id: Uuid,
         report: EventReport,
     ) -> Result<(), ClientError> {
         assert_eq!(update_id, self.update_id, "update ID matches");
-        if peer == Self::valid_peer() {
+        if peer == Self::VALID_PEER {
             _ = self.report_sender.send(report).await;
             Ok(())
-        } else if peer == Self::invalid_peer() {
+        } else if peer == Self::INVALID_PEER {
             Err(ClientError::ErrorResponse(ResponseValue::new(
                 installinator_client::types::Error {
                     error_code: None,
@@ -534,7 +502,7 @@ impl PeersImpl for MockReportPeers {
                 StatusCode::UNPROCESSABLE_ENTITY,
                 Default::default(),
             )))
-        } else if peer == Self::unresponsive_peer() {
+        } else if peer == Self::UNRESPONSIVE_PEER {
             // The real implementation generates a reqwest::Error, which can't be
             // created outside of the reqwest library. Generate a different error.
             Err(ClientError::InvalidRequest("unresponsive peer".to_owned()))
@@ -548,8 +516,8 @@ mod tests {
     use super::*;
     use crate::{
         errors::DiscoverPeersError,
-        peers::{FetchedArtifact, Peers},
-        reporter::ProgressReporter,
+        fetch::{FetchArtifactBackend, FetchedArtifact},
+        reporter::{ProgressReporter, ReportProgressBackend},
         test_helpers::{dummy_artifact_hash_id, with_test_runtime},
     };
 
@@ -588,25 +556,14 @@ mod tests {
                 ReceiverStream::new(report_receiver).collect::<Vec<_>>().await
             });
 
-            let reporter_log = logctx.log.clone();
-
-            let (progress_reporter, event_sender) =
-                ProgressReporter::new(&logctx.log, update_id, move || {
-                    let reporter_log = reporter_log.clone();
-                    let report_sender = report_sender.clone();
-
-                    async move {
-                        Ok(Peers::new(
-                            &reporter_log,
-                            Box::new(MockReportPeers::new(
-                                update_id,
-                                report_sender,
-                            )),
-                            // The timeout is currently unused by broadcast_report.
-                            Duration::from_secs(10),
-                        ))
-                    }
-                });
+            let (progress_reporter, event_sender) = ProgressReporter::new(
+                &logctx.log,
+                update_id,
+                ReportProgressBackend::new(
+                    &logctx.log,
+                    MockProgressBackend { update_id, report_sender },
+                ),
+            );
             let progress_handle = progress_reporter.start();
 
             let engine = UpdateEngine::new(&logctx.log, event_sender);
@@ -620,7 +577,7 @@ mod tests {
                         let artifact =
                             fetch_artifact(&cx, &log, attempts, timeout)
                                 .await?;
-                        let address = artifact.addr;
+                        let address = artifact.peer.address();
                         StepSuccess::new(artifact)
                             .with_metadata(
                                 InstallinatorCompletionMetadata::Download {
@@ -652,21 +609,21 @@ mod tests {
             match (expected_result, fetched_artifact) {
                 (
                     Ok((expected_attempt, expected_addr)),
-                    Ok(FetchedArtifact { attempt, addr, mut artifact }),
+                    Ok(FetchedArtifact { attempt, peer, mut artifact }),
                 ) => {
                     assert_eq!(
                         expected_attempt, attempt,
                         "expected successful attempt is the same as actual attempt"
                     );
                     assert_eq!(
-                        expected_addr, addr,
+                        expected_addr, peer,
                         "expected successful peer is the same as actual peer"
                     );
                     let artifact = artifact.copy_to_bytes(artifact.num_bytes());
                     assert_eq!(
                         expected_artifact, artifact,
                         "correct artifact fetched from peer {}",
-                        addr,
+                        peer,
                     );
                 }
                 (Err(_), Err(_)) => {}
@@ -691,7 +648,7 @@ mod tests {
     async fn fetch_artifact(
         cx: &StepContext,
         log: &slog::Logger,
-        attempts: impl IntoIterator<Item = Result<MockPeers>>,
+        attempts: impl IntoIterator<Item = Result<MockFetchBackend>>,
         timeout: Duration,
     ) -> Result<FetchedArtifact> {
         let mut attempts = attempts.into_iter();
@@ -699,9 +656,11 @@ mod tests {
             cx,
             log,
             || match attempts.next() {
-                Some(Ok(peers)) => {
-                    future::ok(Peers::new(&log, Box::new(peers), timeout))
-                }
+                Some(Ok(peers)) => future::ok(FetchArtifactBackend::new(
+                    &log,
+                    Box::new(peers),
+                    timeout,
+                )),
                 Some(Err(error)) => {
                     future::err(DiscoverPeersError::Retry(error))
                 }
@@ -716,7 +675,7 @@ mod tests {
 
     fn assert_reports(
         reports: &[EventReport],
-        expected_result: Result<(usize, SocketAddr), usize>,
+        expected_result: Result<(usize, PeerAddress), usize>,
     ) {
         let all_step_events: Vec<_> =
             reports.iter().flat_map(|report| &report.step_events).collect();
@@ -740,7 +699,7 @@ mod tests {
     fn assert_success_events(
         all_step_events: Vec<&StepEvent>,
         expected_attempt: usize,
-        expected_addr: SocketAddr,
+        expected_peer: PeerAddress,
     ) {
         let mut saw_success = false;
 
@@ -753,7 +712,8 @@ mod tests {
                                 peer,
                             } => {
                                 assert_ne!(
-                                    *peer, expected_addr,
+                                    *peer,
+                                    expected_peer.address(),
                                     "peer cannot match since this is the last attempt"
                                 );
                             }
@@ -795,7 +755,8 @@ mod tests {
                             ..
                         } => {
                             assert_eq!(
-                                *address, expected_addr,
+                                *address,
+                                expected_peer.address(),
                                 "address matches expected"
                             );
                         }
diff --git a/installinator/src/peers.rs b/installinator/src/peers.rs
index d18353c7702..867302baafb 100644
--- a/installinator/src/peers.rs
+++ b/installinator/src/peers.rs
@@ -4,36 +4,17 @@
 
 use std::{
     fmt,
-    future::Future,
-    net::{IpAddr, SocketAddr},
+    net::{AddrParseError, IpAddr, SocketAddr},
     str::FromStr,
-    time::Duration,
 };
 
 use anyhow::{Result, bail};
-use async_trait::async_trait;
-use buf_list::BufList;
-use bytes::Bytes;
-use display_error_chain::DisplayErrorChain;
-use futures::{Stream, StreamExt};
-use installinator_client::ClientError;
-use installinator_common::{
-    EventReport, InstallinatorProgressMetadata, StepContext, StepProgress,
-};
 use itertools::Itertools;
 use omicron_common::address::BOOTSTRAP_ARTIFACT_PORT;
 use omicron_ddm_admin_client::Client as DdmAdminClient;
-use reqwest::StatusCode;
 use sled_hardware_types::underlay::BootstrapInterface;
-use tokio::{sync::mpsc, time::Instant};
-use tufaceous_artifact::ArtifactHashId;
-use update_engine::events::ProgressUnits;
-use uuid::Uuid;
 
-use crate::{
-    artifact::ArtifactClient,
-    errors::{ArtifactFetchError, DiscoverPeersError, HttpError},
-};
+use crate::errors::DiscoverPeersError;
 
 /// A chosen discovery mechanism for peers, passed in over the command line.
 #[derive(Clone, Debug)]
@@ -42,7 +23,7 @@ pub(crate) enum DiscoveryMechanism {
     Bootstrap,
 
     /// A list of peers is manually specified.
-    List(Vec<SocketAddr>),
+    List(Vec<PeerAddress>),
 }
 
 impl DiscoveryMechanism {
@@ -50,11 +31,12 @@ impl DiscoveryMechanism {
     pub(crate) async fn discover_peers(
         &self,
         log: &slog::Logger,
-    ) -> Result<Box<dyn PeersImpl>, DiscoverPeersError> {
+    ) -> Result<Vec<PeerAddress>, DiscoverPeersError> {
         let peers = match self {
             Self::Bootstrap => {
-                // XXX: consider adding aborts to this after a certain number of tries.
-
+                // Note: we do not abort this process and instead keep retrying
+                // forever. This attempts to ensure that we'll eventually find
+                // peers.
                 let ddm_admin_client =
                     DdmAdminClient::localhost(log).map_err(|err| {
                         DiscoverPeersError::Retry(anyhow::anyhow!(err))
@@ -72,17 +54,17 @@ impl DiscoveryMechanism {
                     })?;
                 addrs
                     .map(|addr| {
-                        SocketAddr::new(
+                        PeerAddress::new(SocketAddr::new(
                             IpAddr::V6(addr),
                             BOOTSTRAP_ARTIFACT_PORT,
-                        )
+                        ))
                     })
                     .collect()
             }
             Self::List(peers) => peers.clone(),
         };
 
-        Ok(Box::new(HttpPeers::new(log, peers)))
+        Ok(peers)
     }
 }
 
@@ -118,398 +100,33 @@ impl FromStr for DiscoveryMechanism {
     }
 }
 
-/// A fetched artifact.
-pub(crate) struct FetchedArtifact {
-    pub(crate) attempt: usize,
-    pub(crate) addr: SocketAddr,
-    pub(crate) artifact: BufList,
+#[derive(Clone, Copy, Debug, PartialEq, PartialOrd, Ord, Eq)]
+#[cfg_attr(test, derive(test_strategy::Arbitrary))]
+pub(crate) struct PeerAddress {
+    address: SocketAddr,
 }
 
-impl FetchedArtifact {
-    /// In a loop, discover peers, and fetch from them.
-    ///
-    /// If `discover_fn` returns [`DiscoverPeersError::Retry`], this function will retry. If it
-    /// returns `DiscoverPeersError::Abort`, this function will exit with the underlying error.
-    pub(crate) async fn loop_fetch_from_peers<F, Fut>(
-        cx: &StepContext,
-        log: &slog::Logger,
-        mut discover_fn: F,
-        artifact_hash_id: &ArtifactHashId,
-    ) -> Result<Self>
-    where
-        F: FnMut() -> Fut,
-        Fut: Future<Output = Result<Peers, DiscoverPeersError>>,
-    {
-        // How long to sleep between retries if we fail to find a peer or fail
-        // to fetch an artifact from a found peer.
-        const RETRY_DELAY: Duration = Duration::from_secs(5);
-
-        let mut attempt = 0;
-        loop {
-            attempt += 1;
-            let peers = match discover_fn().await {
-                Ok(peers) => peers,
-                Err(DiscoverPeersError::Retry(error)) => {
-                    slog::warn!(
-                        log,
-                        "(attempt {attempt}) failed to discover peers, retrying: {}",
-                        DisplayErrorChain::new(
-                            AsRef::<dyn std::error::Error>::as_ref(&error)
-                        ),
-                    );
-                    cx.send_progress(StepProgress::retry(format!(
-                        "failed to discover peers: {error}"
-                    )))
-                    .await;
-                    tokio::time::sleep(RETRY_DELAY).await;
-                    continue;
-                }
-                Err(DiscoverPeersError::Abort(error)) => {
-                    return Err(error);
-                }
-            };
-
-            slog::info!(
-                log,
-                "discovered {} peers: [{}]",
-                peers.peer_count(),
-                peers.display(),
-            );
-            match peers.fetch_artifact(&cx, artifact_hash_id).await {
-                Some((addr, artifact)) => {
-                    return Ok(Self { attempt, addr, artifact });
-                }
-                None => {
-                    slog::warn!(
-                        log,
-                        "unable to fetch artifact from peers, retrying discovery",
-                    );
-                    cx.send_progress(StepProgress::retry(format!(
-                        "unable to fetch artifact from any of {} peers, retrying",
-                        peers.peer_count(),
-                    )))
-                    .await;
-                    tokio::time::sleep(RETRY_DELAY).await;
-                }
-            }
-        }
+impl PeerAddress {
+    pub(crate) const fn new(address: SocketAddr) -> Self {
+        Self { address }
     }
-}
 
-impl fmt::Debug for FetchedArtifact {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        f.debug_struct("FetchedArtifact")
-            .field("attempt", &self.attempt)
-            .field("addr", &self.addr)
-            .field(
-                "artifact",
-                &format!(
-                    "({} bytes in {} chunks)",
-                    self.artifact.num_bytes(),
-                    self.artifact.num_chunks()
-                ),
-            )
-            .finish()
+    pub(crate) fn address(&self) -> SocketAddr {
+        self.address
     }
 }
 
-#[derive(Debug)]
-pub(crate) struct Peers {
-    log: slog::Logger,
-    imp: Box<dyn PeersImpl>,
-    timeout: Duration,
-}
-
-impl Peers {
-    pub(crate) fn new(
-        log: &slog::Logger,
-        imp: Box<dyn PeersImpl>,
-        timeout: Duration,
-    ) -> Self {
-        let log = log.new(slog::o!("component" => "Peers"));
-        Self { log, imp, timeout }
-    }
-
-    pub(crate) async fn fetch_artifact(
-        &self,
-        cx: &StepContext,
-        artifact_hash_id: &ArtifactHashId,
-    ) -> Option<(SocketAddr, BufList)> {
-        // TODO: do we want a check phase that happens before the download?
-        let peers = self.peers();
-        let mut remaining_peers = self.peer_count();
-
-        let log = self.log.new(
-            slog::o!("artifact_hash_id" => format!("{artifact_hash_id:?}")),
-        );
-
-        slog::debug!(log, "start fetch from peers"; "remaining_peers" => remaining_peers);
-
-        for peer in peers {
-            remaining_peers -= 1;
-
-            slog::debug!(
-                log,
-                "start fetch from peer {peer:?}"; "remaining_peers" => remaining_peers,
-            );
-
-            // Attempt to download data from this peer.
-            let start = Instant::now();
-            match self.fetch_from_peer(cx, peer, artifact_hash_id).await {
-                Ok(artifact_bytes) => {
-                    let elapsed = start.elapsed();
-                    slog::info!(
-                        log,
-                        "fetched artifact from peer {peer} in {elapsed:?}"
-                    );
-                    return Some((peer, artifact_bytes));
-                }
-                Err(error) => {
-                    let elapsed = start.elapsed();
-                    slog::warn!(
-                        log,
-                        "error after {elapsed:?}: {}",
-                        DisplayErrorChain::new(&error);
-                        "remaining_peers" => remaining_peers,
-                    );
-                }
-            }
-        }
-
-        None
-    }
-
-    pub(crate) fn peers(&self) -> impl Iterator<Item = SocketAddr> + '_ {
-        self.imp.peers()
-    }
-
-    pub(crate) fn peer_count(&self) -> usize {
-        self.imp.peer_count()
-    }
-
-    pub(crate) fn display(&self) -> impl fmt::Display {
-        self.peers().join(", ")
-    }
-
-    async fn fetch_from_peer(
-        &self,
-        cx: &StepContext,
-        peer: SocketAddr,
-        artifact_hash_id: &ArtifactHashId,
-    ) -> Result<BufList, ArtifactFetchError> {
-        let log = self.log.new(slog::o!("peer" => peer.to_string()));
-
-        let (total_bytes, mut receiver) = match self
-            .imp
-            .fetch_from_peer_impl(peer, artifact_hash_id.clone())
-            .await
-        {
-            Ok(x) => x,
-            Err(error) => {
-                cx.send_progress(StepProgress::Reset {
-                    metadata: InstallinatorProgressMetadata::Download { peer },
-                    message: error.to_string().into(),
-                })
-                .await;
-                return Err(ArtifactFetchError::HttpError { peer, error });
-            }
-        };
-
-        let mut artifact_bytes = BufList::new();
-        let mut downloaded_bytes = 0u64;
-        let metadata = InstallinatorProgressMetadata::Download { peer };
-
-        loop {
-            match tokio::time::timeout(self.timeout, receiver.recv()).await {
-                Ok(Some(Ok(bytes))) => {
-                    slog::debug!(
-                        &log,
-                        "received chunk of {} bytes from peer",
-                        bytes.len()
-                    );
-                    downloaded_bytes += bytes.len() as u64;
-                    artifact_bytes.push_chunk(bytes);
-                    cx.send_progress(StepProgress::with_current_and_total(
-                        downloaded_bytes,
-                        total_bytes,
-                        ProgressUnits::BYTES,
-                        metadata.clone(),
-                    ))
-                    .await;
-                }
-                Ok(Some(Err(error))) => {
-                    slog::debug!(
-                        &log,
-                        "received error from peer, sending cancellation: {}",
-                        DisplayErrorChain::new(&error),
-                    );
-                    cx.send_progress(StepProgress::Reset {
-                        metadata: metadata.clone(),
-                        message: error.to_string().into(),
-                    })
-                    .await;
-                    return Err(ArtifactFetchError::HttpError {
-                        peer,
-                        error: error.into(),
-                    });
-                }
-                Ok(None) => {
-                    // The entire artifact has been downloaded.
-                    break;
-                }
-                Err(_) => {
-                    // The operation timed out.
-                    cx.send_progress(StepProgress::Reset {
-                        metadata,
-                        message: format!(
-                            "operation timed out ({:?})",
-                            self.timeout
-                        )
-                        .into(),
-                    })
-                    .await;
-                    return Err(ArtifactFetchError::Timeout {
-                        peer,
-                        timeout: self.timeout,
-                        bytes_fetched: artifact_bytes.num_bytes(),
-                    });
-                }
-            }
-        }
-
-        // Check that the artifact size matches the returned size.
-        if total_bytes != artifact_bytes.num_bytes() as u64 {
-            let error = ArtifactFetchError::SizeMismatch {
-                artifact_size: total_bytes,
-                downloaded_bytes,
-            };
-            cx.send_progress(StepProgress::reset(metadata, error.to_string()))
-                .await;
-            return Err(error);
-        }
-
-        Ok(artifact_bytes)
-    }
-
-    pub(crate) fn broadcast_report(
-        &self,
-        update_id: Uuid,
-        report: EventReport,
-    ) -> impl Stream<Item = Result<(), ClientError>> + Send + '_ {
-        futures::stream::iter(self.peers())
-            .map(move |peer| {
-                let report = report.clone();
-                self.send_report_to_peer(peer, update_id, report)
-            })
-            .buffer_unordered(8)
-    }
-
-    async fn send_report_to_peer(
-        &self,
-        peer: SocketAddr,
-        update_id: Uuid,
-        report: EventReport,
-    ) -> Result<(), ClientError> {
-        let log = self.log.new(slog::o!("peer" => peer.to_string()));
-        // For each peer, report it to the network.
-        match self.imp.report_progress_impl(peer, update_id, report).await {
-            Ok(()) => Ok(()),
-            Err(err) => {
-                // Error 422 means that the server didn't accept the update ID.
-                if err.status() == Some(StatusCode::UNPROCESSABLE_ENTITY) {
-                    slog::debug!(
-                        log,
-                        "received HTTP 422 Unprocessable Entity \
-                         for update ID {update_id} (update ID unrecognized)",
-                    );
-                } else if err.status() == Some(StatusCode::GONE) {
-                    // XXX If we establish a 1:1 relationship
-                    // between a particular instance of wicketd and
-                    // installinator, 410 Gone can be used to abort
-                    // the update. But we don't have that kind of
-                    // relationship at the moment.
-                    slog::warn!(
-                        log,
-                        "received HTTP 410 Gone for update ID {update_id} \
-                         (receiver closed)",
-                    );
-                } else {
-                    slog::warn!(
-                        log,
-                        "received HTTP error code {:?} for update ID {update_id}",
-                        err.status()
-                    );
-                }
-                Err(err)
-            }
-        }
-    }
-}
-
-#[async_trait]
-pub(crate) trait PeersImpl: fmt::Debug + Send + Sync {
-    fn peers(&self) -> Box<dyn Iterator<Item = SocketAddr> + Send + '_>;
-    fn peer_count(&self) -> usize;
-
-    /// Returns (size, receiver) on success, and an error on failure.
-    async fn fetch_from_peer_impl(
-        &self,
-        peer: SocketAddr,
-        artifact_hash_id: ArtifactHashId,
-    ) -> Result<(u64, FetchReceiver), HttpError>;
-
-    async fn report_progress_impl(
-        &self,
-        peer: SocketAddr,
-        update_id: Uuid,
-        report: EventReport,
-    ) -> Result<(), ClientError>;
-}
-
-/// The send side of the channel over which data is sent.
-pub(crate) type FetchReceiver = mpsc::Receiver<Result<Bytes, ClientError>>;
-
-/// A [`PeersImpl`] that uses HTTP to fetch artifacts from peers. This is the real implementation.
-#[derive(Clone, Debug)]
-pub(crate) struct HttpPeers {
-    log: slog::Logger,
-    peers: Vec<SocketAddr>,
-}
-
-impl HttpPeers {
-    pub(crate) fn new(log: &slog::Logger, peers: Vec<SocketAddr>) -> Self {
-        let log = log.new(slog::o!("component" => "HttpPeers"));
-        Self { log, peers }
+impl fmt::Display for PeerAddress {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        self.address.fmt(f)
     }
 }
 
-#[async_trait]
-impl PeersImpl for HttpPeers {
-    fn peers(&self) -> Box<dyn Iterator<Item = SocketAddr> + Send + '_> {
-        Box::new(self.peers.iter().copied())
-    }
+impl FromStr for PeerAddress {
+    type Err = AddrParseError;
 
-    fn peer_count(&self) -> usize {
-        self.peers.len()
-    }
-
-    async fn fetch_from_peer_impl(
-        &self,
-        peer: SocketAddr,
-        artifact_hash_id: ArtifactHashId,
-    ) -> Result<(u64, FetchReceiver), HttpError> {
-        // TODO: be able to fetch from sled-agent clients as well
-        let artifact_client = ArtifactClient::new(peer, &self.log);
-        artifact_client.fetch(artifact_hash_id).await
-    }
-
-    async fn report_progress_impl(
-        &self,
-        peer: SocketAddr,
-        update_id: Uuid,
-        report: EventReport,
-    ) -> Result<(), ClientError> {
-        let artifact_client = ArtifactClient::new(peer, &self.log);
-        artifact_client.report_progress(update_id, report).await
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        let address = s.parse()?;
+        Ok(Self { address })
     }
 }
diff --git a/installinator/src/reporter.rs b/installinator/src/reporter.rs
index 47ff4a00625..73397e0759e 100644
--- a/installinator/src/reporter.rs
+++ b/installinator/src/reporter.rs
@@ -4,21 +4,29 @@
 
 //! Code to report events to the artifact server.
 
-use std::time::Duration;
+use std::{fmt, sync::Arc, time::Duration};
 
+use async_trait::async_trait;
 use display_error_chain::DisplayErrorChain;
-use futures::{Future, StreamExt};
-use installinator_common::{Event, EventBuffer};
+use futures::StreamExt;
+use http::StatusCode;
+use installinator_client::ClientError;
+use installinator_common::{Event, EventBuffer, EventReport};
 use tokio::{sync::mpsc, task::JoinHandle, time};
+use update_engine::AsError;
 use uuid::Uuid;
 
-use crate::{errors::DiscoverPeersError, peers::Peers};
+use crate::{
+    artifact::ArtifactClient,
+    errors::DiscoverPeersError,
+    peers::{DiscoveryMechanism, PeerAddress},
+};
 
 #[derive(Debug)]
-pub(crate) struct ProgressReporter<F> {
+pub(crate) struct ProgressReporter {
     log: slog::Logger,
     update_id: Uuid,
-    discover_fn: F,
+    report_backend: ReportProgressBackend,
     // Receives updates about progress and completion.
     event_receiver: mpsc::Receiver<Event>,
     buffer: EventBuffer,
@@ -26,27 +34,23 @@ pub(crate) struct ProgressReporter<F> {
     on_tick_task: Option<JoinHandle<Option<Option<usize>>>>,
 }
 
-impl<F, Fut> ProgressReporter<F>
-where
-    F: 'static + Clone + Send + FnMut() -> Fut,
-    Fut: Future<Output = Result<Peers, DiscoverPeersError>> + Send,
-{
+impl ProgressReporter {
     pub(crate) fn new(
         log: &slog::Logger,
         update_id: Uuid,
-        discover_fn: F,
+        report_backend: ReportProgressBackend,
     ) -> (Self, mpsc::Sender<Event>) {
         let (event_sender, event_receiver) = update_engine::channel();
         let ret = Self {
             log: log.new(slog::o!("component" => "EventReporter")),
             update_id,
-            discover_fn,
             event_receiver,
             // We have to keep max_low_priority low since a bigger number will
             // cause a payload that's too large.
             buffer: EventBuffer::new(8),
             last_reported: None,
             on_tick_task: None,
+            report_backend,
         };
         (ret, event_sender)
     }
@@ -90,23 +94,31 @@ where
 
     fn spawn_on_tick_task(&self) -> JoinHandle<Option<Option<usize>>> {
         let report = self.buffer.generate_report();
-        let mut discover_fn = self.discover_fn.clone();
         let update_id = self.update_id;
         let log = self.log.clone();
+        let report_backend = self.report_backend.clone();
 
         tokio::spawn(async move {
-            let peers = match (discover_fn)().await {
+            let peers = match report_backend.discover_peers().await {
                 Ok(peers) => peers,
-                Err(error) => {
-                    // Ignore DiscoverPeersError::Abort here because the
-                    // installinator must keep retrying.
+                Err(DiscoverPeersError::Retry(error)) => {
                     slog::warn!(
                         log,
                         "failed to discover peers: {}",
-                        DisplayErrorChain::new(&error),
+                        DisplayErrorChain::new(error.as_error()),
                     );
                     return None;
                 }
+                #[cfg(test)]
+                Err(DiscoverPeersError::Abort(_)) => {
+                    // This is not possible, since test implementations don't
+                    // currently generate Abort errors during reporter
+                    // discovery.
+                    unreachable!(
+                        "DiscoverPeersError::Abort is not generated for the \
+                         reporter by test implementations"
+                    )
+                }
             };
 
             // We could use StreamExt::any() here, but we're choosing to avoid
@@ -117,8 +129,18 @@ where
             // that if two servers both say, only one of them will
             // deterministically get the update. Need to decide post-PVT1.
             let last_reported = report.last_seen;
-            let results: Vec<_> =
-                peers.broadcast_report(update_id, report).collect().await;
+            let results: Vec<_> = futures::stream::iter(peers)
+                .map(|peer| {
+                    report_backend.send_report_to_peer(
+                        peer,
+                        update_id,
+                        report.clone(),
+                    )
+                })
+                .buffer_unordered(8)
+                .collect()
+                .await;
+
             if results.iter().any(|res| res.is_ok()) {
                 Some(last_reported)
             } else {
@@ -148,3 +170,135 @@ where
         self.on_tick_task = Some(self.spawn_on_tick_task());
     }
 }
+
+#[derive(Clone, Debug)]
+pub(crate) struct ReportProgressBackend {
+    log: slog::Logger,
+    imp: Arc<dyn ReportProgressImpl>,
+}
+
+impl ReportProgressBackend {
+    pub(crate) fn new<P: ReportProgressImpl + 'static>(
+        log: &slog::Logger,
+        imp: P,
+    ) -> Self {
+        let log = log.new(slog::o!("component" => "ReportProgressBackend"));
+        Self { log, imp: Arc::new(imp) }
+    }
+
+    pub(crate) async fn discover_peers(
+        &self,
+    ) -> Result<Vec<PeerAddress>, DiscoverPeersError> {
+        let log = self.log.new(slog::o!("task" => "discover_peers"));
+        slog::debug!(log, "discovering peers");
+
+        self.imp.discover_peers().await
+    }
+
+    pub(crate) async fn send_report_to_peer(
+        &self,
+        peer: PeerAddress,
+        update_id: Uuid,
+        report: EventReport,
+    ) -> Result<SendReportStatus, ClientError> {
+        let log = self.log.new(slog::o!("peer" => peer.to_string()));
+        // Send the report to this peer via the backend implementation.
+        match self.imp.report_progress_impl(peer, update_id, report).await {
+            Ok(()) => {
+                slog::debug!(log, "sent report to peer");
+                Ok(SendReportStatus::Processed)
+            }
+            Err(err) => {
+                // Error 422 means that the server didn't accept the update ID.
+                if err.status() == Some(StatusCode::UNPROCESSABLE_ENTITY) {
+                    slog::debug!(
+                        log,
+                        "received HTTP 422 Unprocessable Entity \
+                         for update ID {update_id} (returning UnknownUpdateId)",
+                    );
+                    Ok(SendReportStatus::UnknownUpdateId)
+                } else if err.status() == Some(StatusCode::GONE) {
+                    // XXX If we establish a 1:1 relationship between a
+                    // particular instance of wicketd and installinator, 410
+                    // Gone can be used to abort the update. But we don't have
+                    // that kind of relationship at the moment.
+                    slog::warn!(
+                        log,
+                        "received HTTP 410 Gone for update ID {update_id} \
+                         (peer closed, returning PeerFinished)",
+                    );
+                    Ok(SendReportStatus::PeerFinished)
+                } else {
+                    slog::warn!(
+                        log,
+                        "received HTTP error code {:?} for update ID {update_id}",
+                        err.status()
+                    );
+                    Err(err)
+                }
+            }
+        }
+    }
+}
+
+#[derive(Clone, Copy, Debug)]
+#[must_use]
+pub(crate) enum SendReportStatus {
+    /// The peer accepted the report.
+    Processed,
+
+    /// The peer did not recognize the update ID (HTTP 422).
+    UnknownUpdateId,
+
+    /// The peer indicated that it had finished processing all reports.
+    PeerFinished,
+}
+
+#[async_trait]
+pub(crate) trait ReportProgressImpl: fmt::Debug + Send + Sync {
+    async fn discover_peers(
+        &self,
+    ) -> Result<Vec<PeerAddress>, DiscoverPeersError>;
+
+    async fn report_progress_impl(
+        &self,
+        peer: PeerAddress,
+        update_id: Uuid,
+        report: EventReport,
+    ) -> Result<(), ClientError>;
+}
+
+#[derive(Debug)]
+pub(crate) struct HttpProgressBackend {
+    log: slog::Logger,
+    discovery: DiscoveryMechanism,
+}
+
+impl HttpProgressBackend {
+    pub(crate) fn new(
+        log: &slog::Logger,
+        discovery: DiscoveryMechanism,
+    ) -> Self {
+        let log = log.new(slog::o!("component" => "HttpProgressBackend"));
+        Self { log, discovery }
+    }
+}
+
+#[async_trait]
+impl ReportProgressImpl for HttpProgressBackend {
+    async fn discover_peers(
+        &self,
+    ) -> Result<Vec<PeerAddress>, DiscoverPeersError> {
+        self.discovery.discover_peers(&self.log).await
+    }
+
+    async fn report_progress_impl(
+        &self,
+        peer: PeerAddress,
+        update_id: Uuid,
+        report: EventReport,
+    ) -> Result<(), ClientError> {
+        let artifact_client = ArtifactClient::new(peer.address(), &self.log);
+        artifact_client.report_progress(update_id, report).await
+    }
+}

From 835c1032ea3241154f4c9c09249fa9ccf8ecac03 Mon Sep 17 00:00:00 2001
From: Rain <rain@oxide.computer>
Date: Wed, 23 Apr 2025 19:24:08 -0700
Subject: [PATCH 2/2] [spr] changes introduced through rebase

Created using spr 1.3.6-beta.1

[skip ci]
---
 installinator/src/fetch.rs | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/installinator/src/fetch.rs b/installinator/src/fetch.rs
index 6659051b945..3901f38c990 100644
--- a/installinator/src/fetch.rs
+++ b/installinator/src/fetch.rs
@@ -323,7 +323,9 @@ pub(crate) trait FetchArtifactImpl: fmt::Debug + Send + Sync {
 /// The send side of the channel over which data is sent.
 pub(crate) type FetchReceiver = mpsc::Receiver<Result<Bytes, ClientError>>;
 
-/// A [`PeersImpl`] that uses HTTP to fetch artifacts from peers. This is the real implementation.
+/// A [`FetchArtifactImpl`] that uses HTTP to fetch artifacts from peers.
+///
+/// This is the real implementation.
 #[derive(Clone, Debug)]
 pub(crate) struct HttpFetchBackend {
     log: slog::Logger,