Skip to content

Commit 7809811

Browse files
authored
[6/n] simplify FetchArtifactImpl interface (#8040)
1 parent 20e3202 commit 7809811

File tree

5 files changed

+91
-65
lines changed

5 files changed

+91
-65
lines changed

installinator/src/dispatch.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,15 +91,15 @@ struct DebugDiscoverOpts {
9191

9292
impl DebugDiscoverOpts {
9393
async fn exec(self, log: &slog::Logger) -> Result<()> {
94-
let peers = FetchArtifactBackend::new(
94+
let backend = FetchArtifactBackend::new(
9595
log,
9696
Box::new(HttpFetchBackend::new(
9797
&log,
9898
self.opts.mechanism.discover_peers(&log).await?,
9999
)),
100100
Duration::from_secs(10),
101101
);
102-
println!("discovered peers: {}", peers.display());
102+
println!("discovered peers: {}", backend.peers().display());
103103
Ok(())
104104
}
105105
}

installinator/src/fetch.rs

Lines changed: 21 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,14 @@ use installinator_client::ClientError;
1515
use installinator_common::{
1616
InstallinatorProgressMetadata, StepContext, StepProgress,
1717
};
18-
use itertools::Itertools;
1918
use tokio::{sync::mpsc, time::Instant};
2019
use tufaceous_artifact::ArtifactHashId;
2120
use update_engine::events::ProgressUnits;
2221

2322
use crate::{
2423
artifact::ArtifactClient,
2524
errors::{ArtifactFetchError, DiscoverPeersError, HttpError},
26-
peers::PeerAddress,
25+
peers::{PeerAddress, PeerAddresses},
2726
};
2827

2928
/// A fetched artifact.
@@ -55,7 +54,7 @@ impl FetchedArtifact {
5554
let mut attempt = 0;
5655
loop {
5756
attempt += 1;
58-
let peers = match discover_fn().await {
57+
let fetch_backend = match discover_fn().await {
5958
Ok(peers) => peers,
6059
Err(DiscoverPeersError::Retry(error)) => {
6160
slog::warn!(
@@ -78,13 +77,14 @@ impl FetchedArtifact {
7877
}
7978
};
8079

80+
let peers = fetch_backend.peers();
8181
slog::info!(
8282
log,
8383
"discovered {} peers: [{}]",
84-
peers.peer_count(),
84+
peers.len(),
8585
peers.display(),
8686
);
87-
match peers.fetch_artifact(&cx, artifact_hash_id).await {
87+
match fetch_backend.fetch_artifact(&cx, artifact_hash_id).await {
8888
Some((peer, artifact)) => {
8989
return Ok(Self { attempt, peer, artifact });
9090
}
@@ -95,7 +95,7 @@ impl FetchedArtifact {
9595
);
9696
cx.send_progress(StepProgress::retry(format!(
9797
"unable to fetch artifact from any of {} peers, retrying",
98-
peers.peer_count(),
98+
peers.len(),
9999
)))
100100
.await;
101101
tokio::time::sleep(RETRY_DELAY).await;
@@ -146,15 +146,15 @@ impl FetchArtifactBackend {
146146
) -> Option<(PeerAddress, BufList)> {
147147
// TODO: do we want a check phase that happens before the download?
148148
let peers = self.peers();
149-
let mut remaining_peers = self.peer_count();
149+
let mut remaining_peers = peers.len();
150150

151151
let log = self.log.new(
152152
slog::o!("artifact_hash_id" => format!("{artifact_hash_id:?}")),
153153
);
154154

155155
slog::debug!(log, "start fetch from peers"; "remaining_peers" => remaining_peers);
156156

157-
for peer in peers {
157+
for &peer in peers.peers() {
158158
remaining_peers -= 1;
159159

160160
slog::debug!(
@@ -188,18 +188,10 @@ impl FetchArtifactBackend {
188188
None
189189
}
190190

191-
pub(crate) fn peers(&self) -> impl Iterator<Item = PeerAddress> + '_ {
191+
pub(crate) fn peers(&self) -> &PeerAddresses {
192192
self.imp.peers()
193193
}
194194

195-
pub(crate) fn peer_count(&self) -> usize {
196-
self.imp.peer_count()
197-
}
198-
199-
pub(crate) fn display(&self) -> impl fmt::Display {
200-
self.peers().join(", ")
201-
}
202-
203195
async fn fetch_from_peer(
204196
&self,
205197
cx: &StepContext,
@@ -307,10 +299,16 @@ impl FetchArtifactBackend {
307299
}
308300
}
309301

302+
/// Backend implementation for fetching artifacts.
303+
///
304+
/// Note: While [`crate::reporter::ReportProgressImpl`] is a persistent
305+
/// structure, a new `FetchArtifactImpl` is generated separately each time
306+
/// discovery occurs. We should align this with `ReportProgressImpl` in the
307+
/// future, though we'd need some way of looking up delay information in
308+
/// `mock_peers`.
310309
#[async_trait]
311310
pub(crate) trait FetchArtifactImpl: fmt::Debug + Send + Sync {
312-
fn peers(&self) -> Box<dyn Iterator<Item = PeerAddress> + Send + '_>;
313-
fn peer_count(&self) -> usize;
311+
fn peers(&self) -> &PeerAddresses;
314312

315313
/// Returns (size, receiver) on success, and an error on failure.
316314
async fn fetch_from_peer_impl(
@@ -329,24 +327,20 @@ pub(crate) type FetchReceiver = mpsc::Receiver<Result<Bytes, ClientError>>;
329327
#[derive(Clone, Debug)]
330328
pub(crate) struct HttpFetchBackend {
331329
log: slog::Logger,
332-
peers: Vec<PeerAddress>,
330+
peers: PeerAddresses,
333331
}
334332

335333
impl HttpFetchBackend {
336-
pub(crate) fn new(log: &slog::Logger, peers: Vec<PeerAddress>) -> Self {
334+
pub(crate) fn new(log: &slog::Logger, peers: PeerAddresses) -> Self {
337335
let log = log.new(slog::o!("component" => "HttpPeers"));
338336
Self { log, peers }
339337
}
340338
}
341339

342340
#[async_trait]
343341
impl FetchArtifactImpl for HttpFetchBackend {
344-
fn peers(&self) -> Box<dyn Iterator<Item = PeerAddress> + Send + '_> {
345-
Box::new(self.peers.iter().copied())
346-
}
347-
348-
fn peer_count(&self) -> usize {
349-
self.peers.len()
342+
fn peers(&self) -> &PeerAddresses {
343+
&self.peers
350344
}
351345

352346
async fn fetch_from_peer_impl(

installinator/src/mock_peers.rs

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use uuid::Uuid;
2828
use crate::{
2929
errors::{DiscoverPeersError, HttpError},
3030
fetch::{FetchArtifactImpl, FetchReceiver},
31-
peers::PeerAddress,
31+
peers::{PeerAddress, PeerAddresses},
3232
reporter::ReportProgressImpl,
3333
};
3434

@@ -143,10 +143,10 @@ impl MockPeersUniverse {
143143
.then(|| (*addr, peer.clone()))
144144
})
145145
.collect();
146-
Ok(MockFetchBackend {
147-
artifact: self.artifact.clone(),
146+
Ok(MockFetchBackend::new(
147+
self.artifact.clone(),
148148
selected_peers,
149-
})
149+
))
150150
}
151151
AttemptBitmap::Failure => {
152152
bail!(
@@ -177,20 +177,27 @@ struct MockFetchBackend {
177177
artifact: Bytes,
178178
// Peers within the universe that have been selected
179179
selected_peers: BTreeMap<PeerAddress, MockPeer>,
180+
// selected_peers keys stored in a suitable form for the
181+
// FetchArtifactImpl trait
182+
peer_addresses: PeerAddresses,
180183
}
181184

182185
impl MockFetchBackend {
183-
fn get(&self, peer: PeerAddress) -> Option<&MockPeer> {
184-
self.selected_peers.get(&peer)
186+
fn new(
187+
artifact: Bytes,
188+
selected_peers: BTreeMap<PeerAddress, MockPeer>,
189+
) -> Self {
190+
let peer_addresses = selected_peers.keys().copied().collect();
191+
Self { artifact, selected_peers, peer_addresses }
185192
}
186193

187-
fn peers(&self) -> impl Iterator<Item = (&PeerAddress, &MockPeer)> + '_ {
188-
self.selected_peers.iter()
194+
fn get(&self, peer: PeerAddress) -> Option<&MockPeer> {
195+
self.selected_peers.get(&peer)
189196
}
190197

191198
/// Returns the peer that can return the entire dataset within the timeout.
192199
fn successful_peer(&self, timeout: Duration) -> Option<PeerAddress> {
193-
self.peers()
200+
self.selected_peers.iter()
194201
.filter_map(|(addr, peer)| {
195202
if peer.artifact != self.artifact {
196203
// We don't handle the case where the peer returns the wrong artifact yet.
@@ -235,12 +242,8 @@ impl MockFetchBackend {
235242

236243
#[async_trait]
237244
impl FetchArtifactImpl for MockFetchBackend {
238-
fn peers(&self) -> Box<dyn Iterator<Item = PeerAddress> + Send + '_> {
239-
Box::new(self.selected_peers.keys().copied())
240-
}
241-
242-
fn peer_count(&self) -> usize {
243-
self.selected_peers.len()
245+
fn peers(&self) -> &PeerAddresses {
246+
&self.peer_addresses
244247
}
245248

246249
async fn fetch_from_peer_impl(
@@ -478,8 +481,10 @@ impl MockProgressBackend {
478481
impl ReportProgressImpl for MockProgressBackend {
479482
async fn discover_peers(
480483
&self,
481-
) -> Result<Vec<PeerAddress>, DiscoverPeersError> {
482-
Ok(vec![Self::VALID_PEER, Self::INVALID_PEER, Self::UNRESPONSIVE_PEER])
484+
) -> Result<PeerAddresses, DiscoverPeersError> {
485+
Ok([Self::VALID_PEER, Self::INVALID_PEER, Self::UNRESPONSIVE_PEER]
486+
.into_iter()
487+
.collect())
483488
}
484489

485490
async fn report_progress_impl(

installinator/src/peers.rs

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
44

55
use std::{
6+
collections::BTreeSet,
67
fmt,
78
net::{AddrParseError, IpAddr, SocketAddr},
89
str::FromStr,
@@ -31,7 +32,7 @@ impl DiscoveryMechanism {
3132
pub(crate) async fn discover_peers(
3233
&self,
3334
log: &slog::Logger,
34-
) -> Result<Vec<PeerAddress>, DiscoverPeersError> {
35+
) -> Result<PeerAddresses, DiscoverPeersError> {
3536
let peers = match self {
3637
Self::Bootstrap => {
3738
// Note: we do not abort this process and instead keep retrying
@@ -61,7 +62,7 @@ impl DiscoveryMechanism {
6162
})
6263
.collect()
6364
}
64-
Self::List(peers) => peers.clone(),
65+
Self::List(peers) => peers.iter().copied().collect(),
6566
};
6667

6768
Ok(peers)
@@ -100,6 +101,32 @@ impl FromStr for DiscoveryMechanism {
100101
}
101102
}
102103

104+
#[derive(Clone, Debug)]
105+
pub(crate) struct PeerAddresses {
106+
peers: BTreeSet<PeerAddress>,
107+
}
108+
109+
impl PeerAddresses {
110+
pub(crate) fn peers(&self) -> &BTreeSet<PeerAddress> {
111+
&self.peers
112+
}
113+
114+
pub(crate) fn len(&self) -> usize {
115+
self.peers.len()
116+
}
117+
118+
pub(crate) fn display(&self) -> impl fmt::Display {
119+
self.peers().iter().join(", ")
120+
}
121+
}
122+
123+
impl FromIterator<PeerAddress> for PeerAddresses {
124+
fn from_iter<I: IntoIterator<Item = PeerAddress>>(iter: I) -> Self {
125+
let peers = iter.into_iter().collect::<BTreeSet<_>>();
126+
Self { peers }
127+
}
128+
}
129+
103130
#[derive(Clone, Copy, Debug, PartialEq, PartialOrd, Ord, Eq)]
104131
#[cfg_attr(test, derive(test_strategy::Arbitrary))]
105132
pub(crate) struct PeerAddress {

installinator/src/reporter.rs

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use uuid::Uuid;
1919
use crate::{
2020
artifact::ArtifactClient,
2121
errors::DiscoverPeersError,
22-
peers::{DiscoveryMechanism, PeerAddress},
22+
peers::{DiscoveryMechanism, PeerAddress, PeerAddresses},
2323
};
2424

2525
#[derive(Debug)]
@@ -129,17 +129,18 @@ impl ProgressReporter {
129129
// that if two servers both say, only one of them will
130130
// deterministically get the update. Need to decide post-PVT1.
131131
let last_reported = report.last_seen;
132-
let results: Vec<_> = futures::stream::iter(peers)
133-
.map(|peer| {
134-
report_backend.send_report_to_peer(
135-
peer,
136-
update_id,
137-
report.clone(),
138-
)
139-
})
140-
.buffer_unordered(8)
141-
.collect()
142-
.await;
132+
let results: Vec<_> =
133+
futures::stream::iter(peers.peers().iter().copied())
134+
.map(|peer| {
135+
report_backend.send_report_to_peer(
136+
peer,
137+
update_id,
138+
report.clone(),
139+
)
140+
})
141+
.buffer_unordered(8)
142+
.collect()
143+
.await;
143144

144145
if results.iter().any(|res| res.is_ok()) {
145146
Some(last_reported)
@@ -188,7 +189,7 @@ impl ReportProgressBackend {
188189

189190
pub(crate) async fn discover_peers(
190191
&self,
191-
) -> Result<Vec<PeerAddress>, DiscoverPeersError> {
192+
) -> Result<PeerAddresses, DiscoverPeersError> {
192193
let log = self.log.new(slog::o!("task" => "discover_peers"));
193194
slog::debug!(log, "discovering peers");
194195

@@ -255,9 +256,8 @@ pub(crate) enum SendReportStatus {
255256

256257
#[async_trait]
257258
pub(crate) trait ReportProgressImpl: fmt::Debug + Send + Sync {
258-
async fn discover_peers(
259-
&self,
260-
) -> Result<Vec<PeerAddress>, DiscoverPeersError>;
259+
async fn discover_peers(&self)
260+
-> Result<PeerAddresses, DiscoverPeersError>;
261261

262262
async fn report_progress_impl(
263263
&self,
@@ -287,7 +287,7 @@ impl HttpProgressBackend {
287287
impl ReportProgressImpl for HttpProgressBackend {
288288
async fn discover_peers(
289289
&self,
290-
) -> Result<Vec<PeerAddress>, DiscoverPeersError> {
290+
) -> Result<PeerAddresses, DiscoverPeersError> {
291291
self.discovery.discover_peers(&self.log).await
292292
}
293293

0 commit comments

Comments
 (0)