diff --git a/xmtp_api_d14n/src/protocol/impls/vector_clock.rs b/xmtp_api_d14n/src/protocol/impls/vector_clock.rs index a1548ef2cf..a66ecfd151 100644 --- a/xmtp_api_d14n/src/protocol/impls/vector_clock.rs +++ b/xmtp_api_d14n/src/protocol/impls/vector_clock.rs @@ -31,7 +31,7 @@ impl VectorClock for GlobalCursor { fn merge_least(&mut self, other: &Self) { for (&node, &seq) in other { - let entry = self.entry(node).or_insert(0); + let entry = self.entry(node).or_insert(seq); *entry = (*entry).min(seq); } } diff --git a/xmtp_api_d14n/src/protocol/resolve/network_backoff.rs b/xmtp_api_d14n/src/protocol/resolve/network_backoff.rs index 5c3ac9b5c1..5b771b5887 100644 --- a/xmtp_api_d14n/src/protocol/resolve/network_backoff.rs +++ b/xmtp_api_d14n/src/protocol/resolve/network_backoff.rs @@ -7,10 +7,9 @@ use crate::{ types::MissingEnvelope, }, }; -use derive_builder::Builder; use itertools::Itertools; use tracing::warn; -use xmtp_common::{ExponentialBackoff, Strategy}; +use xmtp_common::{ExponentialBackoff, RetryableError, Strategy}; use xmtp_configuration::MAX_PAGE_SIZE; use xmtp_proto::{ api::{Client, Query}, @@ -19,22 +18,26 @@ use xmtp_proto::{ }; /// try resolve d14n dependencies based on a backoff strategy -#[derive(Clone, Debug, Builder)] -#[builder(setter(strip_option), build_fn(error = "ResolutionError"))] +#[derive(Clone, Debug)] pub struct NetworkBackoffResolver { client: ApiClient, backoff: ExponentialBackoff, } -impl NetworkBackoffResolver { - pub fn builder() -> NetworkBackoffResolverBuilder { - NetworkBackoffResolverBuilder::default() +pub fn network_backoff(client: &ApiClient) -> NetworkBackoffResolver<&ApiClient> { + NetworkBackoffResolver { + client, + backoff: ExponentialBackoff::default(), } } #[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)] #[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] -impl ResolveDependencies for NetworkBackoffResolver { +impl ResolveDependencies for NetworkBackoffResolver +where + ApiClient: Client, + ::Error: RetryableError, +{ type ResolvedEnvelope = OriginatorEnvelope; /// Resolve dependencies, starting with a list of dependencies. Should try to resolve /// all dependents after `dependency`, if `Dependency` is missing as well. @@ -42,7 +45,7 @@ impl ResolveDependencies for NetworkBackoffResolver`: The list of envelopes which were resolved. async fn resolve( - &mut self, + &self, mut missing: HashSet, ) -> Result, ResolutionError> { let mut attempts = 0; @@ -72,7 +75,7 @@ impl ResolveDependencies for NetworkBackoffResolver), - #[error(transparent)] - Build(#[from] UninitializedFieldError), - #[error("Resolution failed to find all missing dependant envelopes")] - ResolutionFailed, -} - pub struct Resolved { pub envelopes: Vec, /// list of envelopes that could not be resolved with this strategy @@ -45,11 +31,26 @@ pub trait ResolveDependencies: MaybeSend + MaybeSync { /// # Returns /// * `Vec`: The list of envelopes which were resolved. async fn resolve( - &mut self, + &self, missing: HashSet, ) -> Result, ResolutionError>; } +#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] +impl ResolveDependencies for &T +where + T: ResolveDependencies, +{ + type ResolvedEnvelope = T::ResolvedEnvelope; + async fn resolve( + &self, + missing: HashSet, + ) -> Result, ResolutionError> { + ::resolve(*self, missing).await + } +} + /// A resolver that does not even attempt to try and get dependencies pub struct NoopResolver; @@ -57,13 +58,43 @@ pub struct NoopResolver; #[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] impl ResolveDependencies for NoopResolver { type ResolvedEnvelope = (); - async fn resolve( - &mut self, - m: HashSet, - ) -> Result, ResolutionError> { + async fn resolve(&self, m: HashSet) -> Result, ResolutionError> { Ok(Resolved { envelopes: vec![], unresolved: Some(m), }) } } + +#[derive(thiserror::Error, Debug)] +pub enum ResolutionError { + #[error(transparent)] + Envelope(#[from] EnvelopeError), + #[error(transparent)] + Body(#[from] BodyError), + #[error("{0}")] + Api(Box), + #[error(transparent)] + Build(#[from] UninitializedFieldError), + #[error("Resolution failed to find all missing dependant envelopes")] + ResolutionFailed, +} + +impl RetryableError for ResolutionError { + fn is_retryable(&self) -> bool { + use ResolutionError::*; + match self { + Envelope(e) => e.is_retryable(), + Body(b) => b.is_retryable(), + Api(a) => a.is_retryable(), + Build(_) => false, + ResolutionFailed => false, + } + } +} + +impl ResolutionError { + pub fn api(e: E) -> Self { + ResolutionError::Api(Box::new(e)) + } +} diff --git a/xmtp_api_d14n/src/queries/combinators.rs b/xmtp_api_d14n/src/queries/combinators.rs new file mode 100644 index 0000000000..a4137b804a --- /dev/null +++ b/xmtp_api_d14n/src/queries/combinators.rs @@ -0,0 +1,24 @@ +//! D14n-specific api combinators + +use xmtp_proto::{api::Endpoint, api_client::Paged, types::TopicCursor}; + +use crate::protocol::ResolveDependencies; + +mod ordered_query; + +pub trait D14nCombinatorExt: Endpoint { + fn ordered( + self, + resolver: R, + topic_cursor: TopicCursor, + ) -> ordered_query::OrderedQuery>::Output> + where + Self: Sized + Endpoint, + >::Output: Paged, + R: ResolveDependencies, + { + ordered_query::ordered(self, resolver, topic_cursor) + } +} + +impl D14nCombinatorExt for E where E: Endpoint {} diff --git a/xmtp_api_d14n/src/queries/combinators/ordered_query.rs b/xmtp_api_d14n/src/queries/combinators/ordered_query.rs new file mode 100644 index 0000000000..188378c8c9 --- /dev/null +++ b/xmtp_api_d14n/src/queries/combinators/ordered_query.rs @@ -0,0 +1,58 @@ +use std::marker::PhantomData; + +use xmtp_common::RetryableError; +use xmtp_proto::{ + api::{ApiClientError, Client, Query}, + api_client::Paged, + types::TopicCursor, +}; + +use crate::protocol::{Ordered, OrderedEnvelopeCollection, ProtocolEnvelope, ResolveDependencies}; + +pub struct OrderedQuery { + endpoint: E, + resolver: R, + topic_cursor: TopicCursor, + _marker: PhantomData, +} + +#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] +impl Query for OrderedQuery +where + E: Query, + C: Client, + C::Error: RetryableError, + R: ResolveDependencies::Message> + Clone, + T: Default + prost::Message + Paged + 'static, + for<'a> T::Message: ProtocolEnvelope<'a> + Clone, +{ + type Output = Vec; + async fn query(&mut self, client: &C) -> Result> { + let envelopes = Query::::query(&mut self.endpoint, client) + .await? + .messages(); + let mut ordering = Ordered::builder() + .envelopes(envelopes) + .resolver(&self.resolver) + // todo: maybe no clone here? + .topic_cursor(self.topic_cursor.clone()) + .build()?; + ordering.order().await.map_err(ApiClientError::other)?; + let (envelopes, _) = ordering.into_parts(); + Ok(envelopes) + } +} + +pub fn ordered( + endpoint: E, + resolver: R, + topic_cursor: TopicCursor, +) -> OrderedQuery { + OrderedQuery:: { + endpoint, + resolver, + topic_cursor, + _marker: PhantomData, + } +} diff --git a/xmtp_api_d14n/src/queries/d14n/mls.rs b/xmtp_api_d14n/src/queries/d14n/mls.rs index 446fd8c52f..1bba98e1e7 100644 --- a/xmtp_api_d14n/src/queries/d14n/mls.rs +++ b/xmtp_api_d14n/src/queries/d14n/mls.rs @@ -11,9 +11,11 @@ use crate::protocol::MessageMetadataExtractor; use crate::protocol::ProtocolEnvelope; use crate::protocol::SequencedExtractor; use crate::protocol::WelcomeMessageExtractor; +use crate::protocol::resolve; use crate::protocol::traits::Envelope; use crate::protocol::traits::EnvelopeCollection; use crate::protocol::traits::Extractor; +use crate::queries::D14nCombinatorExt; use xmtp_common::RetryableError; use xmtp_configuration::MAX_PAGE_SIZE; use xmtp_proto::api; @@ -25,11 +27,11 @@ use xmtp_proto::mls_v1::BatchQueryCommitLogResponse; use xmtp_proto::types::GroupId; use xmtp_proto::types::GroupMessageMetadata; use xmtp_proto::types::InstallationId; +use xmtp_proto::types::TopicCursor; use xmtp_proto::types::TopicKind; use xmtp_proto::types::WelcomeMessage; use xmtp_proto::xmtp::xmtpv4::envelopes::ClientEnvelope; use xmtp_proto::xmtp::xmtpv4::message_api::GetNewestEnvelopeResponse; -use xmtp_proto::xmtp::xmtpv4::message_api::QueryEnvelopesResponse; #[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)] #[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] @@ -122,16 +124,20 @@ where ) -> Result, Self::Error> { let topic = TopicKind::GroupMessagesV1.create(&group_id); let lcc = self.cursor_store.lowest_common_cursor(&[&topic])?; - let response: QueryEnvelopesResponse = QueryEnvelope::builder() + let mut topic_cursor = TopicCursor::default(); + topic_cursor.insert(topic.clone(), lcc.clone()); + let resolver = resolve::network_backoff(&self.client); + let response = QueryEnvelope::builder() .topic(topic) .last_seen(lcc) .limit(MAX_PAGE_SIZE) .build()? + .ordered(resolver, topic_cursor) .query(&self.client) .await?; let messages = SequencedExtractor::builder() - .envelopes(response.envelopes) + .envelopes(response) .build::() .get()?; Ok(messages diff --git a/xmtp_api_d14n/src/queries/mod.rs b/xmtp_api_d14n/src/queries/mod.rs index 60cc93a984..c7758a12dd 100644 --- a/xmtp_api_d14n/src/queries/mod.rs +++ b/xmtp_api_d14n/src/queries/mod.rs @@ -2,6 +2,7 @@ mod api_stats; mod boxed_streams; mod builder; mod client_bundle; +mod combinators; mod combined; mod d14n; mod stream; @@ -11,6 +12,7 @@ pub use api_stats::*; pub use boxed_streams::*; pub use builder::*; pub use client_bundle::*; +pub use combinators::*; pub use combined::*; pub use d14n::*; pub use stream::*; diff --git a/xmtp_proto/src/api_client/impls.rs b/xmtp_proto/src/api_client/impls.rs index d11dbe56f0..7c3bfecee4 100644 --- a/xmtp_proto/src/api_client/impls.rs +++ b/xmtp_proto/src/api_client/impls.rs @@ -1,6 +1,10 @@ use crate::{ mls_v1::QueryGroupMessagesResponse, types::{GroupId, GroupMessageMetadata, WelcomeMessage}, + xmtp::xmtpv4::{ + envelopes::OriginatorEnvelope, + message_api::{QueryEnvelopesResponse, SubscribeEnvelopesResponse}, + }, }; use super::*; @@ -27,6 +31,30 @@ impl Paged for QueryWelcomeMessagesResponse { } } +impl Paged for QueryEnvelopesResponse { + type Message = OriginatorEnvelope; + + fn info(&self) -> &Option { + &None + } + + fn messages(self) -> Vec { + self.envelopes + } +} + +impl Paged for SubscribeEnvelopesResponse { + type Message = OriginatorEnvelope; + + fn info(&self) -> &Option { + &None + } + + fn messages(self) -> Vec { + self.envelopes + } +} + impl std::fmt::Debug for AggregateStats { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { writeln!(f, "============ Api Stats ============")?; diff --git a/xmtp_proto/src/traits.rs b/xmtp_proto/src/traits.rs index 192c66060b..17016b10b7 100644 --- a/xmtp_proto/src/traits.rs +++ b/xmtp_proto/src/traits.rs @@ -106,6 +106,35 @@ pub trait Client: MaybeSend + MaybeSync { ) -> Result, ApiClientError>; } +#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] +impl Client for &T +where + T: Client, +{ + type Error = T::Error; + + type Stream = T::Stream; + + async fn request( + &self, + request: request::Builder, + path: PathAndQuery, + body: Bytes, + ) -> Result, ApiClientError> { + (**self).request(request, path, body).await + } + + async fn stream( + &self, + request: request::Builder, + path: http::uri::PathAndQuery, + body: Bytes, + ) -> Result, ApiClientError> { + (**self).stream(request, path, body).await + } +} + #[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)] #[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] impl Client for Box diff --git a/xmtp_proto/src/traits/error.rs b/xmtp_proto/src/traits/error.rs index 426dd3c2e8..26375c3e69 100644 --- a/xmtp_proto/src/traits/error.rs +++ b/xmtp_proto/src/traits/error.rs @@ -71,6 +71,12 @@ where } } +impl ApiClientError { + pub fn other(e: R) -> Self { + ApiClientError::Other(Box::new(e)) + } +} + impl RetryableError for ApiClientError where E: RetryableError + std::error::Error + 'static,