From 7630583f13f0a0ec181fb8845796b66080fe55af Mon Sep 17 00:00:00 2001 From: Andrew Plaza Date: Wed, 12 Nov 2025 23:22:15 +0000 Subject: [PATCH 1/5] add envelope ordering strategy From 292dde5074ce263f87a4e5b89971b1a287064a1e Mon Sep 17 00:00:00 2001 From: Andrew Plaza Date: Thu, 13 Nov 2025 16:40:14 +0000 Subject: [PATCH 2/5] network backoff resolution strategy From ae330f1c836167fcca79105296bd12a679d45842 Mon Sep 17 00:00:00 2001 From: Andrew Plaza Date: Fri, 14 Nov 2025 00:05:16 +0000 Subject: [PATCH 3/5] add ordered query combinator --- .../src/protocol/impls/vector_clock.rs | 2 +- .../src/protocol/resolve/network_backoff.rs | 23 +++--- .../protocol/traits/dependency_resolution.rs | 73 +++++++++++++------ xmtp_api_d14n/src/queries/combinators.rs | 24 ++++++ .../src/queries/combinators/ordered_query.rs | 58 +++++++++++++++ xmtp_api_d14n/src/queries/d14n/mls.rs | 12 ++- xmtp_api_d14n/src/queries/mod.rs | 2 + xmtp_proto/src/api_client/impls.rs | 28 +++++++ xmtp_proto/src/traits.rs | 29 ++++++++ xmtp_proto/src/traits/error.rs | 6 ++ 10 files changed, 222 insertions(+), 35 deletions(-) create mode 100644 xmtp_api_d14n/src/queries/combinators.rs create mode 100644 xmtp_api_d14n/src/queries/combinators/ordered_query.rs diff --git a/xmtp_api_d14n/src/protocol/impls/vector_clock.rs b/xmtp_api_d14n/src/protocol/impls/vector_clock.rs index a1548ef2cf..a66ecfd151 100644 --- a/xmtp_api_d14n/src/protocol/impls/vector_clock.rs +++ b/xmtp_api_d14n/src/protocol/impls/vector_clock.rs @@ -31,7 +31,7 @@ impl VectorClock for GlobalCursor { fn merge_least(&mut self, other: &Self) { for (&node, &seq) in other { - let entry = self.entry(node).or_insert(0); + let entry = self.entry(node).or_insert(seq); *entry = (*entry).min(seq); } } diff --git a/xmtp_api_d14n/src/protocol/resolve/network_backoff.rs b/xmtp_api_d14n/src/protocol/resolve/network_backoff.rs index 5c3ac9b5c1..5b771b5887 100644 --- a/xmtp_api_d14n/src/protocol/resolve/network_backoff.rs +++ b/xmtp_api_d14n/src/protocol/resolve/network_backoff.rs @@ -7,10 +7,9 @@ use crate::{ types::MissingEnvelope, }, }; -use derive_builder::Builder; use itertools::Itertools; use tracing::warn; -use xmtp_common::{ExponentialBackoff, Strategy}; +use xmtp_common::{ExponentialBackoff, RetryableError, Strategy}; use xmtp_configuration::MAX_PAGE_SIZE; use xmtp_proto::{ api::{Client, Query}, @@ -19,22 +18,26 @@ use xmtp_proto::{ }; /// try resolve d14n dependencies based on a backoff strategy -#[derive(Clone, Debug, Builder)] -#[builder(setter(strip_option), build_fn(error = "ResolutionError"))] +#[derive(Clone, Debug)] pub struct NetworkBackoffResolver { client: ApiClient, backoff: ExponentialBackoff, } -impl NetworkBackoffResolver { - pub fn builder() -> NetworkBackoffResolverBuilder { - NetworkBackoffResolverBuilder::default() +pub fn network_backoff(client: &ApiClient) -> NetworkBackoffResolver<&ApiClient> { + NetworkBackoffResolver { + client, + backoff: ExponentialBackoff::default(), } } #[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)] #[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] -impl ResolveDependencies for NetworkBackoffResolver { +impl ResolveDependencies for NetworkBackoffResolver +where + ApiClient: Client, + ::Error: RetryableError, +{ type ResolvedEnvelope = OriginatorEnvelope; /// Resolve dependencies, starting with a list of dependencies. Should try to resolve /// all dependents after `dependency`, if `Dependency` is missing as well. @@ -42,7 +45,7 @@ impl ResolveDependencies for NetworkBackoffResolver`: The list of envelopes which were resolved. async fn resolve( - &mut self, + &self, mut missing: HashSet, ) -> Result, ResolutionError> { let mut attempts = 0; @@ -72,7 +75,7 @@ impl ResolveDependencies for NetworkBackoffResolver), - #[error(transparent)] - Build(#[from] UninitializedFieldError), - #[error("Resolution failed to find all missing dependant envelopes")] - ResolutionFailed, -} - pub struct Resolved { pub envelopes: Vec, /// list of envelopes that could not be resolved with this strategy @@ -45,11 +31,26 @@ pub trait ResolveDependencies: MaybeSend + MaybeSync { /// # Returns /// * `Vec`: The list of envelopes which were resolved. async fn resolve( - &mut self, + &self, missing: HashSet, ) -> Result, ResolutionError>; } +#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] +impl ResolveDependencies for &T +where + T: ResolveDependencies, +{ + type ResolvedEnvelope = T::ResolvedEnvelope; + async fn resolve( + &self, + missing: HashSet, + ) -> Result, ResolutionError> { + ::resolve(*self, missing).await + } +} + /// A resolver that does not even attempt to try and get dependencies pub struct NoopResolver; @@ -57,13 +58,43 @@ pub struct NoopResolver; #[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] impl ResolveDependencies for NoopResolver { type ResolvedEnvelope = (); - async fn resolve( - &mut self, - m: HashSet, - ) -> Result, ResolutionError> { + async fn resolve(&self, m: HashSet) -> Result, ResolutionError> { Ok(Resolved { envelopes: vec![], unresolved: Some(m), }) } } + +#[derive(thiserror::Error, Debug)] +pub enum ResolutionError { + #[error(transparent)] + Envelope(#[from] EnvelopeError), + #[error(transparent)] + Body(#[from] BodyError), + #[error("{0}")] + Api(Box), + #[error(transparent)] + Build(#[from] UninitializedFieldError), + #[error("Resolution failed to find all missing dependant envelopes")] + ResolutionFailed, +} + +impl RetryableError for ResolutionError { + fn is_retryable(&self) -> bool { + use ResolutionError::*; + match self { + Envelope(e) => e.is_retryable(), + Body(b) => b.is_retryable(), + Api(a) => a.is_retryable(), + Build(_) => false, + ResolutionFailed => false, + } + } +} + +impl ResolutionError { + pub fn api(e: E) -> Self { + ResolutionError::Api(Box::new(e)) + } +} diff --git a/xmtp_api_d14n/src/queries/combinators.rs b/xmtp_api_d14n/src/queries/combinators.rs new file mode 100644 index 0000000000..a4137b804a --- /dev/null +++ b/xmtp_api_d14n/src/queries/combinators.rs @@ -0,0 +1,24 @@ +//! D14n-specific api combinators + +use xmtp_proto::{api::Endpoint, api_client::Paged, types::TopicCursor}; + +use crate::protocol::ResolveDependencies; + +mod ordered_query; + +pub trait D14nCombinatorExt: Endpoint { + fn ordered( + self, + resolver: R, + topic_cursor: TopicCursor, + ) -> ordered_query::OrderedQuery>::Output> + where + Self: Sized + Endpoint, + >::Output: Paged, + R: ResolveDependencies, + { + ordered_query::ordered(self, resolver, topic_cursor) + } +} + +impl D14nCombinatorExt for E where E: Endpoint {} diff --git a/xmtp_api_d14n/src/queries/combinators/ordered_query.rs b/xmtp_api_d14n/src/queries/combinators/ordered_query.rs new file mode 100644 index 0000000000..188378c8c9 --- /dev/null +++ b/xmtp_api_d14n/src/queries/combinators/ordered_query.rs @@ -0,0 +1,58 @@ +use std::marker::PhantomData; + +use xmtp_common::RetryableError; +use xmtp_proto::{ + api::{ApiClientError, Client, Query}, + api_client::Paged, + types::TopicCursor, +}; + +use crate::protocol::{Ordered, OrderedEnvelopeCollection, ProtocolEnvelope, ResolveDependencies}; + +pub struct OrderedQuery { + endpoint: E, + resolver: R, + topic_cursor: TopicCursor, + _marker: PhantomData, +} + +#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] +impl Query for OrderedQuery +where + E: Query, + C: Client, + C::Error: RetryableError, + R: ResolveDependencies::Message> + Clone, + T: Default + prost::Message + Paged + 'static, + for<'a> T::Message: ProtocolEnvelope<'a> + Clone, +{ + type Output = Vec; + async fn query(&mut self, client: &C) -> Result> { + let envelopes = Query::::query(&mut self.endpoint, client) + .await? + .messages(); + let mut ordering = Ordered::builder() + .envelopes(envelopes) + .resolver(&self.resolver) + // todo: maybe no clone here? + .topic_cursor(self.topic_cursor.clone()) + .build()?; + ordering.order().await.map_err(ApiClientError::other)?; + let (envelopes, _) = ordering.into_parts(); + Ok(envelopes) + } +} + +pub fn ordered( + endpoint: E, + resolver: R, + topic_cursor: TopicCursor, +) -> OrderedQuery { + OrderedQuery:: { + endpoint, + resolver, + topic_cursor, + _marker: PhantomData, + } +} diff --git a/xmtp_api_d14n/src/queries/d14n/mls.rs b/xmtp_api_d14n/src/queries/d14n/mls.rs index 446fd8c52f..1bba98e1e7 100644 --- a/xmtp_api_d14n/src/queries/d14n/mls.rs +++ b/xmtp_api_d14n/src/queries/d14n/mls.rs @@ -11,9 +11,11 @@ use crate::protocol::MessageMetadataExtractor; use crate::protocol::ProtocolEnvelope; use crate::protocol::SequencedExtractor; use crate::protocol::WelcomeMessageExtractor; +use crate::protocol::resolve; use crate::protocol::traits::Envelope; use crate::protocol::traits::EnvelopeCollection; use crate::protocol::traits::Extractor; +use crate::queries::D14nCombinatorExt; use xmtp_common::RetryableError; use xmtp_configuration::MAX_PAGE_SIZE; use xmtp_proto::api; @@ -25,11 +27,11 @@ use xmtp_proto::mls_v1::BatchQueryCommitLogResponse; use xmtp_proto::types::GroupId; use xmtp_proto::types::GroupMessageMetadata; use xmtp_proto::types::InstallationId; +use xmtp_proto::types::TopicCursor; use xmtp_proto::types::TopicKind; use xmtp_proto::types::WelcomeMessage; use xmtp_proto::xmtp::xmtpv4::envelopes::ClientEnvelope; use xmtp_proto::xmtp::xmtpv4::message_api::GetNewestEnvelopeResponse; -use xmtp_proto::xmtp::xmtpv4::message_api::QueryEnvelopesResponse; #[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)] #[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] @@ -122,16 +124,20 @@ where ) -> Result, Self::Error> { let topic = TopicKind::GroupMessagesV1.create(&group_id); let lcc = self.cursor_store.lowest_common_cursor(&[&topic])?; - let response: QueryEnvelopesResponse = QueryEnvelope::builder() + let mut topic_cursor = TopicCursor::default(); + topic_cursor.insert(topic.clone(), lcc.clone()); + let resolver = resolve::network_backoff(&self.client); + let response = QueryEnvelope::builder() .topic(topic) .last_seen(lcc) .limit(MAX_PAGE_SIZE) .build()? + .ordered(resolver, topic_cursor) .query(&self.client) .await?; let messages = SequencedExtractor::builder() - .envelopes(response.envelopes) + .envelopes(response) .build::() .get()?; Ok(messages diff --git a/xmtp_api_d14n/src/queries/mod.rs b/xmtp_api_d14n/src/queries/mod.rs index 60cc93a984..c7758a12dd 100644 --- a/xmtp_api_d14n/src/queries/mod.rs +++ b/xmtp_api_d14n/src/queries/mod.rs @@ -2,6 +2,7 @@ mod api_stats; mod boxed_streams; mod builder; mod client_bundle; +mod combinators; mod combined; mod d14n; mod stream; @@ -11,6 +12,7 @@ pub use api_stats::*; pub use boxed_streams::*; pub use builder::*; pub use client_bundle::*; +pub use combinators::*; pub use combined::*; pub use d14n::*; pub use stream::*; diff --git a/xmtp_proto/src/api_client/impls.rs b/xmtp_proto/src/api_client/impls.rs index d11dbe56f0..7c3bfecee4 100644 --- a/xmtp_proto/src/api_client/impls.rs +++ b/xmtp_proto/src/api_client/impls.rs @@ -1,6 +1,10 @@ use crate::{ mls_v1::QueryGroupMessagesResponse, types::{GroupId, GroupMessageMetadata, WelcomeMessage}, + xmtp::xmtpv4::{ + envelopes::OriginatorEnvelope, + message_api::{QueryEnvelopesResponse, SubscribeEnvelopesResponse}, + }, }; use super::*; @@ -27,6 +31,30 @@ impl Paged for QueryWelcomeMessagesResponse { } } +impl Paged for QueryEnvelopesResponse { + type Message = OriginatorEnvelope; + + fn info(&self) -> &Option { + &None + } + + fn messages(self) -> Vec { + self.envelopes + } +} + +impl Paged for SubscribeEnvelopesResponse { + type Message = OriginatorEnvelope; + + fn info(&self) -> &Option { + &None + } + + fn messages(self) -> Vec { + self.envelopes + } +} + impl std::fmt::Debug for AggregateStats { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { writeln!(f, "============ Api Stats ============")?; diff --git a/xmtp_proto/src/traits.rs b/xmtp_proto/src/traits.rs index 192c66060b..17016b10b7 100644 --- a/xmtp_proto/src/traits.rs +++ b/xmtp_proto/src/traits.rs @@ -106,6 +106,35 @@ pub trait Client: MaybeSend + MaybeSync { ) -> Result, ApiClientError>; } +#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] +impl Client for &T +where + T: Client, +{ + type Error = T::Error; + + type Stream = T::Stream; + + async fn request( + &self, + request: request::Builder, + path: PathAndQuery, + body: Bytes, + ) -> Result, ApiClientError> { + (**self).request(request, path, body).await + } + + async fn stream( + &self, + request: request::Builder, + path: http::uri::PathAndQuery, + body: Bytes, + ) -> Result, ApiClientError> { + (**self).stream(request, path, body).await + } +} + #[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)] #[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] impl Client for Box diff --git a/xmtp_proto/src/traits/error.rs b/xmtp_proto/src/traits/error.rs index 426dd3c2e8..26375c3e69 100644 --- a/xmtp_proto/src/traits/error.rs +++ b/xmtp_proto/src/traits/error.rs @@ -71,6 +71,12 @@ where } } +impl ApiClientError { + pub fn other(e: R) -> Self { + ApiClientError::Other(Box::new(e)) + } +} + impl RetryableError for ApiClientError where E: RetryableError + std::error::Error + 'static, From cfc4edce6ebcc0bea672dcfdb280a1c80e78074f Mon Sep 17 00:00:00 2001 From: Andrew Plaza Date: Mon, 17 Nov 2025 17:38:04 +0000 Subject: [PATCH 4/5] move envelope dependency graph generation into utils submodule --- xmtp_api_d14n/src/protocol/mod.rs | 2 + xmtp_api_d14n/src/protocol/order.rs | 2 +- xmtp_api_d14n/src/protocol/sort/causal.rs | 184 +----------------- xmtp_api_d14n/src/protocol/utils/mod.rs | 3 + xmtp_api_d14n/src/protocol/utils/test.rs | 5 + .../src/protocol/utils/test/props.rs | 103 ++++++++++ .../src/protocol/utils/test/test_envelope.rs | 77 ++++++++ 7 files changed, 196 insertions(+), 180 deletions(-) create mode 100644 xmtp_api_d14n/src/protocol/utils/mod.rs create mode 100644 xmtp_api_d14n/src/protocol/utils/test.rs create mode 100644 xmtp_api_d14n/src/protocol/utils/test/props.rs create mode 100644 xmtp_api_d14n/src/protocol/utils/test/test_envelope.rs diff --git a/xmtp_api_d14n/src/protocol/mod.rs b/xmtp_api_d14n/src/protocol/mod.rs index d89204efc7..0491070f99 100644 --- a/xmtp_api_d14n/src/protocol/mod.rs +++ b/xmtp_api_d14n/src/protocol/mod.rs @@ -19,3 +19,5 @@ pub mod types; mod order; pub use order::*; + +mod utils; diff --git a/xmtp_api_d14n/src/protocol/order.rs b/xmtp_api_d14n/src/protocol/order.rs index 9723109aa1..d89280f655 100644 --- a/xmtp_api_d14n/src/protocol/order.rs +++ b/xmtp_api_d14n/src/protocol/order.rs @@ -76,7 +76,7 @@ where .collect::>(); // if the resolver fails to resolve some envelopes, ignore them. // delete unresolved envelopes from missing envelopes list. - // cannot use retain directly b/c curosr returns Result<>. + // cannot use retain directly b/c cursor returns Result<>. // see https://github.com/xmtp/libxmtp/issues/2691 // TODO:2691 let mut to_remove = HashSet::new(); diff --git a/xmtp_api_d14n/src/protocol/sort/causal.rs b/xmtp_api_d14n/src/protocol/sort/causal.rs index 652f08abab..dccb46112d 100644 --- a/xmtp_api_d14n/src/protocol/sort/causal.rs +++ b/xmtp_api_d14n/src/protocol/sort/causal.rs @@ -1,3 +1,4 @@ +use crate::protocol::sort; use xmtp_proto::types::TopicCursor; use crate::protocol::{ApplyCursor, Envelope, EnvelopeError, Sort, VectorClock}; @@ -62,188 +63,13 @@ pub fn causal<'b, 'a: 'b, E: Envelope<'a>>( #[cfg(test)] mod tests { - use crate::protocol::sort; - use chrono::Utc; - use itertools::Itertools; + use crate::protocol::utils::test::{ + EnvelopesWithMissing, TestEnvelope, depends_on_one, missing_dependencies, + sorted_dependencies, + }; use proptest::prelude::*; - use proptest::sample::subsequence; - use std::sync::LazyLock; - use xmtp_proto::types::{Cursor, GlobalCursor, OriginatorId, SequenceId, Topic, TopicKind}; - use xmtp_proto::xmtp::xmtpv4::envelopes::ClientEnvelope; - use xmtp_proto::xmtp::xmtpv4::envelopes::client_envelope::Payload; use super::*; - - static TOPIC: LazyLock = - LazyLock::new(|| Topic::new(TopicKind::GroupMessagesV1, vec![0, 1, 2])); - - #[derive(Clone, Debug, PartialEq)] - struct TestEnvelope { - cursor: Cursor, - depends_on: GlobalCursor, - } - - impl TestEnvelope { - fn has_dependency_on(&self, other: &TestEnvelope) -> bool { - let originator = other.cursor.originator_id; - let depends_on_sid = self.depends_on.get(&originator); - depends_on_sid == other.cursor.sequence_id - } - } - - impl std::fmt::Display for TestEnvelope { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "cursor {} depends on {}", &self.cursor, &self.depends_on) - } - } - - impl Envelope<'_> for TestEnvelope { - fn topic(&self) -> Result { - Ok(TOPIC.clone()) - } - - fn payload(&self) -> Result { - unreachable!() - } - - fn timestamp(&self) -> Option> { - unreachable!() - } - - fn client_envelope(&self) -> Result { - unreachable!() - } - - fn group_message( - &self, - ) -> Result, crate::protocol::EnvelopeError> - { - unreachable!() - } - - fn welcome_message( - &self, - ) -> Result, crate::protocol::EnvelopeError> - { - unreachable!() - } - - fn consume(&self, _extractor: E) -> Result - where - Self: Sized, - for<'a> crate::protocol::EnvelopeError: - From<>::Error>, - for<'a> E: crate::protocol::EnvelopeVisitor<'a> + crate::protocol::Extractor, - { - unreachable!() - } - - fn cursor(&self) -> Result { - Ok(self.cursor) - } - - fn depends_on(&self) -> Result, EnvelopeError> { - Ok(Some(self.depends_on.clone())) - } - } - - // Advance the clock for a given originator - fn advance_clock(base: &GlobalCursor, originator: &OriginatorId) -> SequenceId { - base.get(originator) + 1 - } - - /// always creates a _sorted_ list of dependencies - fn sorted_dependencies( - length: usize, - originators: Vec, - ) -> impl Strategy> { - let init = Just(Vec::::new()).boxed(); - - (0..length).fold(init, move |acc_strategy, _| { - let originators = originators.clone(); - - acc_strategy - .prop_flat_map(move |envelopes| { - let originators = originators.clone(); - let envelopes_len = envelopes.len(); - - // Pick an originator for this envelope - prop::sample::select(originators).prop_flat_map(move |originator| { - let envelopes_clone = envelopes.clone(); - - // Choose 0 or more previous envelopes to depend on - prop::sample::subsequence(envelopes_clone.clone(), 0..=envelopes_len) - .prop_map(move |envelopes_subset| { - let mut envelopes = envelopes_clone.clone(); - let total_clock: GlobalCursor = envelopes_clone - .iter() - .map(|e| (e.cursor.originator_id, e.cursor.sequence_id)) - .into_grouping_map() - .max() - .into(); - let mut base = GlobalCursor::default(); - envelopes_clone - .iter() - .filter(|e| e.cursor.originator_id == originator) - .map(|e| e.cursor) - .for_each(|c| base.apply(&c)); - - let new_clock = if envelopes_subset.is_empty() { - // must inherit dependencies of earlier sequence ids from - // same originator id - base - } else { - for cursor in envelopes_subset.iter().map(|e| &e.cursor) { - base.apply(cursor); - } - base - }; - - // Advance clock for this originator - let sequence_id = advance_clock(&total_clock, &originator); - - envelopes.push(TestEnvelope { - cursor: Cursor { - originator_id: originator, - sequence_id, - }, - depends_on: new_clock, - }); - envelopes - }) - }) - }) - .boxed() - }) - } - - #[derive(Debug, Clone)] - struct EnvelopesWithMissing { - removed: Vec, - envelopes: Vec, - } - // higher order composition - // creates dependencies then randomly removes some - prop_compose! { - fn missing_dependencies(length: usize, originators: Vec)(envelopes_o in sorted_dependencies(length, originators))(remove in subsequence(envelopes_o.clone(), 0..=envelopes_o.len()), mut envelopes in Just(envelopes_o)) -> EnvelopesWithMissing { - envelopes.retain(|e| !remove.contains(e)); - EnvelopesWithMissing { - removed: remove, - envelopes, - - } - } - } - - fn depends_on_one(missing: &TestEnvelope, removed: &[TestEnvelope]) -> bool { - for envelope in removed { - if missing.has_dependency_on(envelope) { - return true; - } - } - false - } - fn assert_sorted(sorted: &[TestEnvelope], missing: &[TestEnvelope], removed: &[TestEnvelope]) { let mut missing_and_removed = removed.to_vec(); missing_and_removed.extend(missing.to_vec().iter().cloned()); diff --git a/xmtp_api_d14n/src/protocol/utils/mod.rs b/xmtp_api_d14n/src/protocol/utils/mod.rs new file mode 100644 index 0000000000..60a913e5d0 --- /dev/null +++ b/xmtp_api_d14n/src/protocol/utils/mod.rs @@ -0,0 +1,3 @@ +xmtp_common::if_test! { + pub mod test; +} diff --git a/xmtp_api_d14n/src/protocol/utils/test.rs b/xmtp_api_d14n/src/protocol/utils/test.rs new file mode 100644 index 0000000000..e17ff48247 --- /dev/null +++ b/xmtp_api_d14n/src/protocol/utils/test.rs @@ -0,0 +1,5 @@ +mod test_envelope; +pub use test_envelope::*; + +mod props; +pub use props::*; diff --git a/xmtp_api_d14n/src/protocol/utils/test/props.rs b/xmtp_api_d14n/src/protocol/utils/test/props.rs new file mode 100644 index 0000000000..40b9b2e145 --- /dev/null +++ b/xmtp_api_d14n/src/protocol/utils/test/props.rs @@ -0,0 +1,103 @@ +use super::TestEnvelope; +use itertools::Itertools; +use proptest::prelude::*; +use proptest::sample::subsequence; +use xmtp_proto::types::{Cursor, GlobalCursor, OriginatorId, SequenceId}; + +// Advance the clock for a given originator +fn advance_clock(base: &GlobalCursor, originator: &OriginatorId) -> SequenceId { + base.get(originator) + 1 +} + +/// always creates a _sorted_ list of dependencies +pub fn sorted_dependencies( + length: usize, + originators: Vec, +) -> impl Strategy> { + let init = Just(Vec::::new()).boxed(); + + (0..length).fold(init, move |acc_strategy, _| { + let originators = originators.clone(); + + acc_strategy + .prop_flat_map(move |envelopes| { + let originators = originators.clone(); + let envelopes_len = envelopes.len(); + + // Pick an originator for this envelope + prop::sample::select(originators).prop_flat_map(move |originator| { + let envelopes_clone = envelopes.clone(); + + // Choose 0 or more previous envelopes to depend on + prop::sample::subsequence(envelopes_clone.clone(), 0..=envelopes_len).prop_map( + move |envelopes_subset| { + let mut envelopes = envelopes_clone.clone(); + let total_clock: GlobalCursor = envelopes_clone + .iter() + .map(|e| (e.cursor.originator_id, e.cursor.sequence_id)) + .into_grouping_map() + .max() + .into(); + let mut base = GlobalCursor::default(); + envelopes_clone + .iter() + .filter(|e| e.cursor.originator_id == originator) + .map(|e| e.cursor) + .for_each(|c| base.apply(&c)); + + let new_clock = if envelopes_subset.is_empty() { + // must inherit dependencies of earlier sequence ids from + // same originator id + base + } else { + for cursor in envelopes_subset.iter().map(|e| &e.cursor) { + base.apply(cursor); + } + base + }; + + // Advance clock for this originator + let sequence_id = advance_clock(&total_clock, &originator); + + envelopes.push(TestEnvelope { + cursor: Cursor { + originator_id: originator, + sequence_id, + }, + depends_on: new_clock, + }); + envelopes + }, + ) + }) + }) + .boxed() + }) +} + +#[derive(Debug, Clone)] +pub struct EnvelopesWithMissing { + pub removed: Vec, + pub envelopes: Vec, +} + +prop_compose! { + pub fn missing_dependencies(length: usize, originators: Vec)(envelopes_o in sorted_dependencies(length, originators))(remove in subsequence(envelopes_o.clone(), 0..=envelopes_o.len()), mut envelopes in Just(envelopes_o)) -> EnvelopesWithMissing { + envelopes.retain(|e| !remove.contains(e)); + EnvelopesWithMissing { + removed: remove, + envelopes, + + } + } +} + +/// check if `missing` depends on any envelope in `removed` +pub fn depends_on_one(missing: &TestEnvelope, removed: &[TestEnvelope]) -> bool { + for envelope in removed { + if missing.has_dependency_on(envelope) { + return true; + } + } + false +} diff --git a/xmtp_api_d14n/src/protocol/utils/test/test_envelope.rs b/xmtp_api_d14n/src/protocol/utils/test/test_envelope.rs new file mode 100644 index 0000000000..f7cc503a50 --- /dev/null +++ b/xmtp_api_d14n/src/protocol/utils/test/test_envelope.rs @@ -0,0 +1,77 @@ +use crate::protocol::{Envelope, EnvelopeError}; +use chrono::Utc; +use std::sync::LazyLock; +use xmtp_proto::types::{Cursor, GlobalCursor, Topic, TopicKind}; +use xmtp_proto::xmtp::xmtpv4::envelopes::ClientEnvelope; +use xmtp_proto::xmtp::xmtpv4::envelopes::client_envelope::Payload; + +static TOPIC: LazyLock = + LazyLock::new(|| Topic::new(TopicKind::GroupMessagesV1, vec![0, 1, 2])); + +#[derive(Clone, Debug, PartialEq)] +pub struct TestEnvelope { + pub cursor: Cursor, + pub depends_on: GlobalCursor, +} + +impl TestEnvelope { + pub fn has_dependency_on(&self, other: &TestEnvelope) -> bool { + let originator = other.cursor.originator_id; + let depends_on_sid = self.depends_on.get(&originator); + depends_on_sid == other.cursor.sequence_id + } +} + +impl std::fmt::Display for TestEnvelope { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "cursor {} depends on {}", &self.cursor, &self.depends_on) + } +} + +impl Envelope<'_> for TestEnvelope { + fn topic(&self) -> Result { + Ok(TOPIC.clone()) + } + + fn payload(&self) -> Result { + unreachable!() + } + + fn timestamp(&self) -> Option> { + unreachable!() + } + + fn client_envelope(&self) -> Result { + unreachable!() + } + + fn group_message( + &self, + ) -> Result, crate::protocol::EnvelopeError> { + unreachable!() + } + + fn welcome_message( + &self, + ) -> Result, crate::protocol::EnvelopeError> { + unreachable!() + } + + fn consume(&self, _extractor: E) -> Result + where + Self: Sized, + for<'a> crate::protocol::EnvelopeError: + From<>::Error>, + for<'a> E: crate::protocol::EnvelopeVisitor<'a> + crate::protocol::Extractor, + { + unreachable!() + } + + fn cursor(&self) -> Result { + Ok(self.cursor) + } + + fn depends_on(&self) -> Result, EnvelopeError> { + Ok(Some(self.depends_on.clone())) + } +} From b293d24e24731ca3ec52e74490b09097d4d34264 Mon Sep 17 00:00:00 2001 From: Andrew Plaza Date: Mon, 17 Nov 2025 18:39:12 +0000 Subject: [PATCH 5/5] dependency resolution tests --- common/src/macros.rs | 10 +- common/src/retry.rs | 3 +- .../src/protocol/resolve/network_backoff.rs | 94 ++++++++++- xmtp_api_d14n/src/protocol/sort/causal.rs | 2 +- xmtp_api_d14n/src/protocol/utils/mod.rs | 2 +- xmtp_api_d14n/src/protocol/utils/test.rs | 3 + .../utils/test/dependency_resolution_test.rs | 158 ++++++++++++++++++ 7 files changed, 259 insertions(+), 13 deletions(-) create mode 100644 xmtp_api_d14n/src/protocol/utils/test/dependency_resolution_test.rs diff --git a/common/src/macros.rs b/common/src/macros.rs index 65565a1d6b..8004b0a8a6 100644 --- a/common/src/macros.rs +++ b/common/src/macros.rs @@ -76,7 +76,6 @@ macro_rules! if_local { )*} } -/// Convenience macro to easily export items for d14n #[macro_export] macro_rules! if_test { ($($item:item)*) => {$( @@ -84,3 +83,12 @@ macro_rules! if_test { $item )*} } + +// cfg only test but not any extra test-utils features +#[macro_export] +macro_rules! if_only_test { + ($($item:item)*) => {$( + #[cfg(test)] + $item + )*} +} diff --git a/common/src/retry.rs b/common/src/retry.rs index 7314894ba9..0ed448e196 100644 --- a/common/src/retry.rs +++ b/common/src/retry.rs @@ -191,7 +191,8 @@ impl ExponentialBackoffBuilder { duration: self.duration.unwrap_or(Duration::from_millis(25)), max_jitter: self.max_jitter.unwrap_or(Duration::from_millis(25)), multiplier: self.multiplier.unwrap_or(3), - ..Default::default() + total_wait_max: self.total_wait_max.unwrap_or_default(), + individual_wait_max: Default::default(), } } } diff --git a/xmtp_api_d14n/src/protocol/resolve/network_backoff.rs b/xmtp_api_d14n/src/protocol/resolve/network_backoff.rs index 5b771b5887..bca90ee4ae 100644 --- a/xmtp_api_d14n/src/protocol/resolve/network_backoff.rs +++ b/xmtp_api_d14n/src/protocol/resolve/network_backoff.rs @@ -53,19 +53,14 @@ where let mut resolved = Vec::new(); while !missing.is_empty() { if let Some(wait_for) = self.backoff.backoff(attempts, time_spent) { + tracing::info!("waiting for {:?}", wait_for); xmtp_common::time::sleep(wait_for).await; attempts += 1; } else { missing.iter().for_each(|m| { - warn!( - "dropping missing dependency {} due to lack of resolution", - m - ); - }); - return Ok(Resolved { - envelopes: resolved, - unresolved: Some(missing), + warn!("dropping dependency {}, could not resolve", m); }); + break; } let (topics, lcc) = lcc(&missing); let envelopes = QueryEnvelope::builder() @@ -86,7 +81,7 @@ where } Ok(Resolved { envelopes: resolved, - unresolved: None, + unresolved: (!missing.is_empty()).then_some(missing), }) } } @@ -113,3 +108,84 @@ fn lcc(missing: &HashSet) -> (Vec, GlobalCursor) { }); (topics, last_seen) } + +#[cfg(test)] +mod tests { + use super::*; + use crate::protocol::extractors::test_utils::TestEnvelopeBuilder; + use crate::protocol::utils::test; + use prost::Message; + use xmtp_proto::api::mock::MockNetworkClient; + use xmtp_proto::types::TopicKind; + use xmtp_proto::xmtp::xmtpv4::message_api::QueryEnvelopesResponse; + + #[xmtp_common::test] + async fn test_resolve_all_found_immediately() { + let mut client = MockNetworkClient::new(); + let topic = Topic::new(TopicKind::GroupMessagesV1, vec![1, 2, 3]); + + let missing = test::create_missing_set(topic.clone(), vec![(1, 10), (2, 20)]); + + let envelope1 = TestEnvelopeBuilder::new() + .with_originator_node_id(1) + .with_originator_sequence_id(10) + .build(); + let envelope2 = TestEnvelopeBuilder::new() + .with_originator_node_id(2) + .with_originator_sequence_id(20) + .build(); + + let response = QueryEnvelopesResponse { + envelopes: vec![envelope1, envelope2], + }; + + client.expect_request().returning(move |_, _, _| { + let bytes = response.clone().encode_to_vec(); + Ok(http::Response::new(bytes.into())) + }); + + let resolver = network_backoff(&client); + test::test_resolve_all_found_immediately(&resolver, missing, 2).await; + } + + #[xmtp_common::test] + async fn test_resolve_partial_resolution() { + let mut client = MockNetworkClient::new(); + let topic = Topic::new(TopicKind::GroupMessagesV1, vec![1, 2, 3]); + + let missing = test::create_missing_set(topic.clone(), vec![(1, 10), (2, 20)]); + let expected_unresolved = test::create_missing_set(topic.clone(), vec![(2, 20)]); + + // Only return one of the two requested envelopes + let envelope1 = TestEnvelopeBuilder::new() + .with_originator_node_id(1) + .with_originator_sequence_id(10) + .build(); + + client.expect_request().returning(move |_, _, _| { + let response = QueryEnvelopesResponse { + envelopes: vec![envelope1.clone()], + }; + let bytes = response.encode_to_vec(); + Ok(http::Response::new(bytes.into())) + }); + + let resolver = NetworkBackoffResolver { + client: &client, + // Use a backoff with very short timeout for testing + backoff: ExponentialBackoff::builder() + .total_wait_max(std::time::Duration::from_millis(10)) + .build(), + }; + + test::test_resolve_partial_resolution(&resolver, missing, 1, expected_unresolved).await; + } + + #[xmtp_common::test] + async fn test_resolve_empty_missing_set() { + let client = MockNetworkClient::new(); + let resolver = network_backoff(&client); + + test::test_resolve_empty_missing_set(&resolver).await; + } +} diff --git a/xmtp_api_d14n/src/protocol/sort/causal.rs b/xmtp_api_d14n/src/protocol/sort/causal.rs index dccb46112d..43c1b87cc5 100644 --- a/xmtp_api_d14n/src/protocol/sort/causal.rs +++ b/xmtp_api_d14n/src/protocol/sort/causal.rs @@ -1,4 +1,3 @@ -use crate::protocol::sort; use xmtp_proto::types::TopicCursor; use crate::protocol::{ApplyCursor, Envelope, EnvelopeError, Sort, VectorClock}; @@ -63,6 +62,7 @@ pub fn causal<'b, 'a: 'b, E: Envelope<'a>>( #[cfg(test)] mod tests { + use crate::protocol::sort; use crate::protocol::utils::test::{ EnvelopesWithMissing, TestEnvelope, depends_on_one, missing_dependencies, sorted_dependencies, diff --git a/xmtp_api_d14n/src/protocol/utils/mod.rs b/xmtp_api_d14n/src/protocol/utils/mod.rs index 60a913e5d0..17818716c1 100644 --- a/xmtp_api_d14n/src/protocol/utils/mod.rs +++ b/xmtp_api_d14n/src/protocol/utils/mod.rs @@ -1,3 +1,3 @@ -xmtp_common::if_test! { +xmtp_common::if_only_test! { pub mod test; } diff --git a/xmtp_api_d14n/src/protocol/utils/test.rs b/xmtp_api_d14n/src/protocol/utils/test.rs index e17ff48247..c4f11d9bd3 100644 --- a/xmtp_api_d14n/src/protocol/utils/test.rs +++ b/xmtp_api_d14n/src/protocol/utils/test.rs @@ -3,3 +3,6 @@ pub use test_envelope::*; mod props; pub use props::*; + +mod dependency_resolution_test; +pub use dependency_resolution_test::*; diff --git a/xmtp_api_d14n/src/protocol/utils/test/dependency_resolution_test.rs b/xmtp_api_d14n/src/protocol/utils/test/dependency_resolution_test.rs new file mode 100644 index 0000000000..91caa2e4ef --- /dev/null +++ b/xmtp_api_d14n/src/protocol/utils/test/dependency_resolution_test.rs @@ -0,0 +1,158 @@ +use std::collections::HashSet; + +use crate::protocol::{ResolveDependencies, types::MissingEnvelope}; +use xmtp_proto::types::{Cursor, Topic}; + +/// Test that all missing envelopes are found immediately on the first resolution attempt. +/// +/// This function verifies that when all requested envelopes are available, +/// the resolver returns them all in a single resolution pass with no unresolved items. +/// +/// # Type Parameters +/// * `R` - A resolver that implements `ResolveDependencies` +/// +/// # Arguments +/// * `resolver` - The dependency resolver to test +/// * `missing` - Set of missing envelopes to resolve +/// * `expected_count` - Number of envelopes expected to be resolved +/// +/// # Panics +/// * If resolution fails +/// * If the number of resolved envelopes doesn't match `expected_count` +/// * If there are any unresolved envelopes remaining +pub async fn test_resolve_all_found_immediately( + resolver: &R, + missing: HashSet, + expected_count: usize, +) where + R: ResolveDependencies, +{ + let result = resolver.resolve(missing).await; + + assert!(result.is_ok(), "Resolution should succeed"); + let resolved = result.unwrap(); + assert_eq!( + resolved.envelopes.len(), + expected_count, + "Should resolve exactly {} envelopes", + expected_count + ); + assert!( + resolved.unresolved.is_none() || resolved.unresolved.as_ref().unwrap().is_empty(), + "Should have no unresolved envelopes" + ); +} + +/// Test that the resolver handles partial resolution correctly. +/// +/// This function verifies that when only some of the requested envelopes are available, +/// the resolver returns what it can find and correctly reports the unresolved items. +/// +/// # Type Parameters +/// * `R` - A resolver that implements `ResolveDependencies` +/// +/// # Arguments +/// * `resolver` - The dependency resolver to test +/// * `missing` - Set of missing envelopes to resolve +/// * `expected_resolved_count` - Number of envelopes expected to be resolved +/// * `expected_unresolved` - Expected set of unresolved missing envelopes +/// +/// # Panics +/// * If resolution fails +/// * If the number of resolved envelopes doesn't match `expected_resolved_count` +/// * If there are no unresolved envelopes when some are expected +/// * If the unresolved set doesn't match `expected_unresolved` +pub async fn test_resolve_partial_resolution( + resolver: &R, + missing: HashSet, + expected_resolved_count: usize, + expected_unresolved: HashSet, +) where + R: ResolveDependencies, +{ + let result = resolver.resolve(missing).await; + + assert!(result.is_ok(), "Resolution should succeed"); + let resolved = result.unwrap(); + + assert_eq!( + resolved.envelopes.len(), + expected_resolved_count, + "Should resolve exactly {} envelopes", + expected_resolved_count + ); + + assert!( + resolved.unresolved.is_some(), + "Should have unresolved envelopes" + ); + + let unresolved = resolved.unresolved.unwrap(); + assert_eq!( + unresolved.len(), + expected_unresolved.len(), + "Unresolved count should match" + ); + + for expected in &expected_unresolved { + assert!( + unresolved.contains(expected), + "Should contain unresolved envelope {:?}", + expected + ); + } +} + +/// Test that the resolver handles an empty missing set correctly. +/// +/// This function verifies that when no envelopes need to be resolved, +/// the resolver returns an empty result without errors. +/// +/// # Type Parameters +/// * `R` - A resolver that implements `ResolveDependencies` +/// +/// # Arguments +/// * `resolver` - The dependency resolver to test +/// +/// # Panics +/// * If resolution fails +/// * If any envelopes are returned +/// * If there are any unresolved envelopes +pub async fn test_resolve_empty_missing_set(resolver: &R) +where + R: ResolveDependencies, +{ + let missing = HashSet::new(); + let result = resolver.resolve(missing).await; + + assert!(result.is_ok(), "Resolution should succeed with empty set"); + let resolved = result.unwrap(); + assert!(resolved.envelopes.is_empty(), "Should resolve no envelopes"); + assert!( + resolved.unresolved.is_none() || resolved.unresolved.as_ref().unwrap().is_empty(), + "Should have no unresolved envelopes" + ); +} + +/// Helper function to create a set of missing envelopes for testing. +/// +/// # Arguments +/// * `topic` - The topic for the envelopes +/// * `cursors` - List of (originator_id, sequence_id) pairs +/// +/// # Returns +/// A `HashSet` of `MissingEnvelope` instances +pub fn create_missing_set(topic: Topic, cursors: Vec<(u32, u64)>) -> HashSet { + cursors + .into_iter() + .map(|(originator_id, sequence_id)| { + MissingEnvelope::new( + topic.clone(), + Cursor { + originator_id, + sequence_id, + }, + ) + }) + .collect() +}