Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion common/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,19 @@ macro_rules! if_local {
)*}
}

/// Convenience macro to easily export items for d14n
#[macro_export]
macro_rules! if_test {
($($item:item)*) => {$(
#[cfg(any(test, feature = "test-utils"))]
$item
)*}
}

// cfg only test but not any extra test-utils features
#[macro_export]
macro_rules! if_only_test {
($($item:item)*) => {$(
#[cfg(test)]
$item
)*}
}
3 changes: 2 additions & 1 deletion common/src/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,8 @@ impl ExponentialBackoffBuilder {
duration: self.duration.unwrap_or(Duration::from_millis(25)),
max_jitter: self.max_jitter.unwrap_or(Duration::from_millis(25)),
multiplier: self.multiplier.unwrap_or(3),
..Default::default()
total_wait_max: self.total_wait_max.unwrap_or_default(),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this was making the builder not use a max wait. didn't effect anything because we construct the strategy directly rather than use builder

individual_wait_max: Default::default(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

individual_wait_max defaults to Duration::ZERO in build(), so backoff() clamps base waits to zero and only jitter remains. Consider defaulting it to the same as ExponentialBackoff::default() (30s) to keep expected timing.

Suggested change
individual_wait_max: Default::default(),
individual_wait_max: ExponentialBackoff::default().individual_wait_max,

🚀 Reply to ask Macroscope to explain or update this suggestion.

👍 Helpful? React to give us feedback.

}
}
}
Expand Down
2 changes: 1 addition & 1 deletion xmtp_api_d14n/src/protocol/impls/vector_clock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ impl VectorClock for GlobalCursor {

fn merge_least(&mut self, other: &Self) {
for (&node, &seq) in other {
let entry = self.entry(node).or_insert(0);
let entry = self.entry(node).or_insert(seq);
*entry = (*entry).min(seq);
}
}
Expand Down
2 changes: 2 additions & 0 deletions xmtp_api_d14n/src/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,5 @@ pub mod types;

mod order;
pub use order::*;

mod utils;
2 changes: 1 addition & 1 deletion xmtp_api_d14n/src/protocol/order.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ where
.collect::<HashSet<_>>();
// if the resolver fails to resolve some envelopes, ignore them.
// delete unresolved envelopes from missing envelopes list.
// cannot use retain directly b/c curosr returns Result<>.
// cannot use retain directly b/c cursor returns Result<>.
// see https://github.com/xmtp/libxmtp/issues/2691
// TODO:2691
let mut to_remove = HashSet::new();
Expand Down
117 changes: 98 additions & 19 deletions xmtp_api_d14n/src/protocol/resolve/network_backoff.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@ use crate::{
types::MissingEnvelope,
},
};
use derive_builder::Builder;
use itertools::Itertools;
use tracing::warn;
use xmtp_common::{ExponentialBackoff, Strategy};
use xmtp_common::{ExponentialBackoff, RetryableError, Strategy};
use xmtp_configuration::MAX_PAGE_SIZE;
use xmtp_proto::{
api::{Client, Query},
Expand All @@ -19,50 +18,49 @@ use xmtp_proto::{
};

/// try resolve d14n dependencies based on a backoff strategy
#[derive(Clone, Debug, Builder)]
#[builder(setter(strip_option), build_fn(error = "ResolutionError"))]
#[derive(Clone, Debug)]
pub struct NetworkBackoffResolver<ApiClient> {
client: ApiClient,
backoff: ExponentialBackoff,
}

impl<ApiClient: Clone> NetworkBackoffResolver<ApiClient> {
pub fn builder() -> NetworkBackoffResolverBuilder<ApiClient> {
NetworkBackoffResolverBuilder::default()
pub fn network_backoff<ApiClient>(client: &ApiClient) -> NetworkBackoffResolver<&ApiClient> {
NetworkBackoffResolver {
client,
backoff: ExponentialBackoff::default(),
}
}

#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
impl<ApiClient: Client> ResolveDependencies for NetworkBackoffResolver<ApiClient> {
impl<ApiClient> ResolveDependencies for NetworkBackoffResolver<ApiClient>
where
ApiClient: Client,
<ApiClient as Client>::Error: RetryableError,
{
type ResolvedEnvelope = OriginatorEnvelope;
/// Resolve dependencies, starting with a list of dependencies. Should try to resolve
/// all dependents after `dependency`, if `Dependency` is missing as well.
/// * Once resolved, these dependencies may have missing dependencies of their own.
/// # Returns
/// * `HashSet<Self::ResolvedEnvelope>`: The list of envelopes which were resolved.
async fn resolve(
&mut self,
&self,
mut missing: HashSet<MissingEnvelope>,
) -> Result<Resolved<Self::ResolvedEnvelope>, ResolutionError> {
let mut attempts = 0;
let time_spent = xmtp_common::time::Instant::now();
let mut resolved = Vec::new();
while !missing.is_empty() {
if let Some(wait_for) = self.backoff.backoff(attempts, time_spent) {
tracing::info!("waiting for {:?}", wait_for);
xmtp_common::time::sleep(wait_for).await;
attempts += 1;
} else {
missing.iter().for_each(|m| {
warn!(
"dropping missing dependency {} due to lack of resolution",
m
);
});
return Ok(Resolved {
envelopes: resolved,
unresolved: Some(missing),
warn!("dropping dependency {}, could not resolve", m);
});
break;
}
let (topics, lcc) = lcc(&missing);
let envelopes = QueryEnvelope::builder()
Expand All @@ -72,7 +70,7 @@ impl<ApiClient: Client> ResolveDependencies for NetworkBackoffResolver<ApiClient
.build()?
.query(&self.client)
.await
.map_err(|e| ResolutionError::Api(Box::new(e)))?
.map_err(ResolutionError::api)?
.envelopes;
let got = envelopes
.iter()
Expand All @@ -83,7 +81,7 @@ impl<ApiClient: Client> ResolveDependencies for NetworkBackoffResolver<ApiClient
}
Ok(Resolved {
envelopes: resolved,
unresolved: None,
unresolved: (!missing.is_empty()).then_some(missing),
})
}
}
Expand All @@ -110,3 +108,84 @@ fn lcc(missing: &HashSet<MissingEnvelope>) -> (Vec<Topic>, GlobalCursor) {
});
(topics, last_seen)
}

#[cfg(test)]
mod tests {
use super::*;
use crate::protocol::extractors::test_utils::TestEnvelopeBuilder;
use crate::protocol::utils::test;
use prost::Message;
use xmtp_proto::api::mock::MockNetworkClient;
use xmtp_proto::types::TopicKind;
use xmtp_proto::xmtp::xmtpv4::message_api::QueryEnvelopesResponse;

#[xmtp_common::test]
async fn test_resolve_all_found_immediately() {
let mut client = MockNetworkClient::new();
let topic = Topic::new(TopicKind::GroupMessagesV1, vec![1, 2, 3]);

let missing = test::create_missing_set(topic.clone(), vec![(1, 10), (2, 20)]);

let envelope1 = TestEnvelopeBuilder::new()
.with_originator_node_id(1)
.with_originator_sequence_id(10)
.build();
let envelope2 = TestEnvelopeBuilder::new()
.with_originator_node_id(2)
.with_originator_sequence_id(20)
.build();

let response = QueryEnvelopesResponse {
envelopes: vec![envelope1, envelope2],
};

client.expect_request().returning(move |_, _, _| {
let bytes = response.clone().encode_to_vec();
Ok(http::Response::new(bytes.into()))
});

let resolver = network_backoff(&client);
test::test_resolve_all_found_immediately(&resolver, missing, 2).await;
}

#[xmtp_common::test]
async fn test_resolve_partial_resolution() {
let mut client = MockNetworkClient::new();
let topic = Topic::new(TopicKind::GroupMessagesV1, vec![1, 2, 3]);

let missing = test::create_missing_set(topic.clone(), vec![(1, 10), (2, 20)]);
let expected_unresolved = test::create_missing_set(topic.clone(), vec![(2, 20)]);

// Only return one of the two requested envelopes
let envelope1 = TestEnvelopeBuilder::new()
.with_originator_node_id(1)
.with_originator_sequence_id(10)
.build();

client.expect_request().returning(move |_, _, _| {
let response = QueryEnvelopesResponse {
envelopes: vec![envelope1.clone()],
};
let bytes = response.encode_to_vec();
Ok(http::Response::new(bytes.into()))
});

let resolver = NetworkBackoffResolver {
client: &client,
// Use a backoff with very short timeout for testing
backoff: ExponentialBackoff::builder()
.total_wait_max(std::time::Duration::from_millis(10))
.build(),
};

test::test_resolve_partial_resolution(&resolver, missing, 1, expected_unresolved).await;
}

#[xmtp_common::test]
async fn test_resolve_empty_missing_set() {
let client = MockNetworkClient::new();
let resolver = network_backoff(&client);

test::test_resolve_empty_missing_set(&resolver).await;
}
}
Loading
Loading