Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion xmtp_api_d14n/src/protocol/impls/vector_clock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ impl VectorClock for GlobalCursor {

fn merge_least(&mut self, other: &Self) {
for (&node, &seq) in other {
let entry = self.entry(node).or_insert(0);
let entry = self.entry(node).or_insert(seq);
*entry = (*entry).min(seq);
}
}
Expand Down
23 changes: 13 additions & 10 deletions xmtp_api_d14n/src/protocol/resolve/network_backoff.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@ use crate::{
types::MissingEnvelope,
},
};
use derive_builder::Builder;
use itertools::Itertools;
use tracing::warn;
use xmtp_common::{ExponentialBackoff, Strategy};
use xmtp_common::{ExponentialBackoff, RetryableError, Strategy};
use xmtp_configuration::MAX_PAGE_SIZE;
use xmtp_proto::{
api::{Client, Query},
Expand All @@ -19,30 +18,34 @@ use xmtp_proto::{
};

/// try resolve d14n dependencies based on a backoff strategy
#[derive(Clone, Debug, Builder)]
#[builder(setter(strip_option), build_fn(error = "ResolutionError"))]
Copy link
Contributor Author

@insipx insipx Nov 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this builder was a bit annoying b/c it required more Clone in trait args so i got rid of it in favor of a standalone function. Using default exponential backoff for now anyway, so only one argument

#[derive(Clone, Debug)]
pub struct NetworkBackoffResolver<ApiClient> {
client: ApiClient,
backoff: ExponentialBackoff,
}

impl<ApiClient: Clone> NetworkBackoffResolver<ApiClient> {
pub fn builder() -> NetworkBackoffResolverBuilder<ApiClient> {
NetworkBackoffResolverBuilder::default()
pub fn network_backoff<ApiClient>(client: &ApiClient) -> NetworkBackoffResolver<&ApiClient> {
NetworkBackoffResolver {
client,
backoff: ExponentialBackoff::default(),
}
}

#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
impl<ApiClient: Client> ResolveDependencies for NetworkBackoffResolver<ApiClient> {
impl<ApiClient> ResolveDependencies for NetworkBackoffResolver<ApiClient>
where
ApiClient: Client,
<ApiClient as Client>::Error: RetryableError,
{
type ResolvedEnvelope = OriginatorEnvelope;
/// Resolve dependencies, starting with a list of dependencies. Should try to resolve
/// all dependents after `dependency`, if `Dependency` is missing as well.
/// * Once resolved, these dependencies may have missing dependencies of their own.
/// # Returns
/// * `HashSet<Self::ResolvedEnvelope>`: The list of envelopes which were resolved.
async fn resolve(
&mut self,
&self,
mut missing: HashSet<MissingEnvelope>,
) -> Result<Resolved<Self::ResolvedEnvelope>, ResolutionError> {
let mut attempts = 0;
Expand Down Expand Up @@ -72,7 +75,7 @@ impl<ApiClient: Client> ResolveDependencies for NetworkBackoffResolver<ApiClient
.build()?
.query(&self.client)
.await
.map_err(|e| ResolutionError::Api(Box::new(e)))?
.map_err(ResolutionError::api)?
.envelopes;
let got = envelopes
.iter()
Expand Down
73 changes: 52 additions & 21 deletions xmtp_api_d14n/src/protocol/traits/dependency_resolution.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,11 @@
use std::{collections::HashSet, error::Error};
use std::collections::HashSet;

use derive_builder::UninitializedFieldError;
use xmtp_common::{MaybeSend, MaybeSync};
use xmtp_common::{MaybeSend, MaybeSync, RetryableError};
use xmtp_proto::api::BodyError;

use crate::protocol::{Envelope, EnvelopeError, types::MissingEnvelope};

#[derive(thiserror::Error, Debug)]
pub enum ResolutionError {
#[error(transparent)]
Envelope(#[from] EnvelopeError),
#[error(transparent)]
Body(#[from] BodyError),
#[error("{0}")]
Api(Box<dyn Error>),
#[error(transparent)]
Build(#[from] UninitializedFieldError),
#[error("Resolution failed to find all missing dependant envelopes")]
ResolutionFailed,
}

pub struct Resolved<E> {
pub envelopes: Vec<E>,
/// list of envelopes that could not be resolved with this strategy
Expand All @@ -45,25 +31,70 @@ pub trait ResolveDependencies: MaybeSend + MaybeSync {
/// # Returns
/// * `Vec<Self::ResolvedEnvelope>`: The list of envelopes which were resolved.
async fn resolve(
&mut self,
&self,
missing: HashSet<MissingEnvelope>,
) -> Result<Resolved<Self::ResolvedEnvelope>, ResolutionError>;
}

#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
impl<T> ResolveDependencies for &T
where
T: ResolveDependencies,
{
type ResolvedEnvelope = T::ResolvedEnvelope;
async fn resolve(
&self,
missing: HashSet<MissingEnvelope>,
) -> Result<Resolved<Self::ResolvedEnvelope>, ResolutionError> {
<T as ResolveDependencies>::resolve(*self, missing).await
}
}

/// A resolver that does not even attempt to try and get dependencies
pub struct NoopResolver;

#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
impl ResolveDependencies for NoopResolver {
type ResolvedEnvelope = ();
async fn resolve(
&mut self,
m: HashSet<MissingEnvelope>,
) -> Result<Resolved<()>, ResolutionError> {
async fn resolve(&self, m: HashSet<MissingEnvelope>) -> Result<Resolved<()>, ResolutionError> {
Ok(Resolved {
envelopes: vec![],
unresolved: Some(m),
})
}
}

#[derive(thiserror::Error, Debug)]
pub enum ResolutionError {
#[error(transparent)]
Envelope(#[from] EnvelopeError),
#[error(transparent)]
Body(#[from] BodyError),
#[error("{0}")]
Api(Box<dyn RetryableError>),
#[error(transparent)]
Build(#[from] UninitializedFieldError),
#[error("Resolution failed to find all missing dependant envelopes")]
ResolutionFailed,
}

impl RetryableError for ResolutionError {
fn is_retryable(&self) -> bool {
use ResolutionError::*;
match self {
Envelope(e) => e.is_retryable(),
Body(b) => b.is_retryable(),
Api(a) => a.is_retryable(),
Build(_) => false,
ResolutionFailed => false,
}
}
}

impl ResolutionError {
pub fn api<E: RetryableError + 'static>(e: E) -> Self {
ResolutionError::Api(Box::new(e))
}
}
24 changes: 24 additions & 0 deletions xmtp_api_d14n/src/queries/combinators.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
//! D14n-specific api combinators

use xmtp_proto::{api::Endpoint, api_client::Paged, types::TopicCursor};

use crate::protocol::ResolveDependencies;

mod ordered_query;

pub trait D14nCombinatorExt<S>: Endpoint<S> {
fn ordered<R>(
self,
resolver: R,
topic_cursor: TopicCursor,
) -> ordered_query::OrderedQuery<Self, R, <Self as Endpoint<S>>::Output>
where
Self: Sized + Endpoint<S>,
<Self as Endpoint<S>>::Output: Paged,
R: ResolveDependencies,
{
ordered_query::ordered(self, resolver, topic_cursor)
}
}

impl<S, E> D14nCombinatorExt<S> for E where E: Endpoint<S> {}
58 changes: 58 additions & 0 deletions xmtp_api_d14n/src/queries/combinators/ordered_query.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use std::marker::PhantomData;

use xmtp_common::RetryableError;
use xmtp_proto::{
api::{ApiClientError, Client, Query},
api_client::Paged,
types::TopicCursor,
};

use crate::protocol::{Ordered, OrderedEnvelopeCollection, ProtocolEnvelope, ResolveDependencies};

pub struct OrderedQuery<E, R, T> {
endpoint: E,
resolver: R,
topic_cursor: TopicCursor,
_marker: PhantomData<T>,
}

#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
impl<E, C, R, T> Query<C> for OrderedQuery<E, R, T>
where
E: Query<C, Output = T>,
C: Client,
C::Error: RetryableError,
R: ResolveDependencies<ResolvedEnvelope = <T as Paged>::Message> + Clone,
T: Default + prost::Message + Paged + 'static,
for<'a> T::Message: ProtocolEnvelope<'a> + Clone,
{
type Output = Vec<T::Message>;
async fn query(&mut self, client: &C) -> Result<Self::Output, ApiClientError<C::Error>> {
let envelopes = Query::<C>::query(&mut self.endpoint, client)
.await?
.messages();
let mut ordering = Ordered::builder()
.envelopes(envelopes)
.resolver(&self.resolver)
// todo: maybe no clone here?
.topic_cursor(self.topic_cursor.clone())
.build()?;
ordering.order().await.map_err(ApiClientError::other)?;
let (envelopes, _) = ordering.into_parts();
Ok(envelopes)
}
}

pub fn ordered<E, R, T>(
endpoint: E,
resolver: R,
topic_cursor: TopicCursor,
) -> OrderedQuery<E, R, T> {
OrderedQuery::<E, R, T> {
endpoint,
resolver,
topic_cursor,
_marker: PhantomData,
}
}
12 changes: 9 additions & 3 deletions xmtp_api_d14n/src/queries/d14n/mls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ use crate::protocol::MessageMetadataExtractor;
use crate::protocol::ProtocolEnvelope;
use crate::protocol::SequencedExtractor;
use crate::protocol::WelcomeMessageExtractor;
use crate::protocol::resolve;
use crate::protocol::traits::Envelope;
use crate::protocol::traits::EnvelopeCollection;
use crate::protocol::traits::Extractor;
use crate::queries::D14nCombinatorExt;
use xmtp_common::RetryableError;
use xmtp_configuration::MAX_PAGE_SIZE;
use xmtp_proto::api;
Expand All @@ -25,11 +27,11 @@ use xmtp_proto::mls_v1::BatchQueryCommitLogResponse;
use xmtp_proto::types::GroupId;
use xmtp_proto::types::GroupMessageMetadata;
use xmtp_proto::types::InstallationId;
use xmtp_proto::types::TopicCursor;
use xmtp_proto::types::TopicKind;
use xmtp_proto::types::WelcomeMessage;
use xmtp_proto::xmtp::xmtpv4::envelopes::ClientEnvelope;
use xmtp_proto::xmtp::xmtpv4::message_api::GetNewestEnvelopeResponse;
use xmtp_proto::xmtp::xmtpv4::message_api::QueryEnvelopesResponse;

#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
Expand Down Expand Up @@ -122,16 +124,20 @@ where
) -> Result<Vec<xmtp_proto::types::GroupMessage>, Self::Error> {
let topic = TopicKind::GroupMessagesV1.create(&group_id);
let lcc = self.cursor_store.lowest_common_cursor(&[&topic])?;
let response: QueryEnvelopesResponse = QueryEnvelope::builder()
let mut topic_cursor = TopicCursor::default();
topic_cursor.insert(topic.clone(), lcc.clone());
let resolver = resolve::network_backoff(&self.client);
let response = QueryEnvelope::builder()
.topic(topic)
.last_seen(lcc)
.limit(MAX_PAGE_SIZE)
.build()?
.ordered(resolver, topic_cursor)
.query(&self.client)
.await?;

let messages = SequencedExtractor::builder()
.envelopes(response.envelopes)
.envelopes(response)
.build::<GroupMessageExtractor>()
.get()?;
Ok(messages
Expand Down
2 changes: 2 additions & 0 deletions xmtp_api_d14n/src/queries/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ mod api_stats;
mod boxed_streams;
mod builder;
mod client_bundle;
mod combinators;
mod combined;
mod d14n;
mod stream;
Expand All @@ -11,6 +12,7 @@ pub use api_stats::*;
pub use boxed_streams::*;
pub use builder::*;
pub use client_bundle::*;
pub use combinators::*;
pub use combined::*;
pub use d14n::*;
pub use stream::*;
Expand Down
28 changes: 28 additions & 0 deletions xmtp_proto/src/api_client/impls.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
use crate::{
mls_v1::QueryGroupMessagesResponse,
types::{GroupId, GroupMessageMetadata, WelcomeMessage},
xmtp::xmtpv4::{
envelopes::OriginatorEnvelope,
message_api::{QueryEnvelopesResponse, SubscribeEnvelopesResponse},
},
};

use super::*;
Expand All @@ -27,6 +31,30 @@ impl Paged for QueryWelcomeMessagesResponse {
}
}

impl Paged for QueryEnvelopesResponse {
type Message = OriginatorEnvelope;

fn info(&self) -> &Option<PagingInfo> {
&None
}

fn messages(self) -> Vec<Self::Message> {
self.envelopes
}
}

impl Paged for SubscribeEnvelopesResponse {
type Message = OriginatorEnvelope;

fn info(&self) -> &Option<PagingInfo> {
&None
}

fn messages(self) -> Vec<Self::Message> {
self.envelopes
}
}

impl std::fmt::Debug for AggregateStats {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
writeln!(f, "============ Api Stats ============")?;
Expand Down
Loading
Loading