Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 6 additions & 16 deletions xmtp_api_d14n/src/endpoints/d14n/query_envelopes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use prost::Message;
use prost::bytes::Bytes;
use std::borrow::Cow;
use xmtp_proto::api::{BodyError, Endpoint};
use xmtp_proto::types::GlobalCursor;
use xmtp_proto::types::{GlobalCursor, Topic};
use xmtp_proto::xmtp::xmtpv4::message_api::QueryEnvelopesRequest;
use xmtp_proto::xmtp::xmtpv4::message_api::{EnvelopesQuery, QueryEnvelopesResponse};

Expand All @@ -12,7 +12,7 @@ use xmtp_proto::xmtp::xmtpv4::message_api::{EnvelopesQuery, QueryEnvelopesRespon
#[builder(build_fn(error = "BodyError"))]
pub struct QueryEnvelope {
#[builder(setter(each(name = "topic", into)))]
topics: Vec<Vec<u8>>,
topics: Vec<Topic>,
last_seen: GlobalCursor,
limit: u32,
}
Expand All @@ -32,7 +32,7 @@ impl Endpoint for QueryEnvelope {
fn body(&self) -> Result<Bytes, BodyError> {
let query = QueryEnvelopesRequest {
query: Some(EnvelopesQuery {
topics: self.topics.clone(),
topics: self.topics.iter().map(Topic::bytes).collect(),
originator_node_ids: vec![],
last_seen: Some(self.last_seen.clone().into()),
}),
Expand Down Expand Up @@ -78,7 +78,7 @@ impl Endpoint for QueryEnvelopes {
mod test {
use super::*;
use xmtp_api_grpc::{error::GrpcError, test::XmtpdClient};
use xmtp_proto::{api, prelude::*};
use xmtp_proto::{api, prelude::*, types::TopicKind};

#[xmtp_common::test]
fn test_file_descriptor() {
Expand Down Expand Up @@ -142,20 +142,10 @@ mod test {

let endpoint = QueryEnvelope::builder()
.last_seen(Default::default())
.topic(vec![])
.topic(TopicKind::GroupMessagesV1.create(vec![]))
.limit(0)
.build()
.unwrap();
let err = api::ignore(endpoint).query(&client).await.unwrap_err();
tracing::info!("{}", err);
// the request will fail b/c we're using dummy data but
// we just care if the endpoint is working
match err {
ApiClientError::<GrpcError>::ClientWithEndpoint {
source: GrpcError::Status(ref s),
..
} => assert!(s.message().contains("invalid topic"), "{}", err),
_ => panic!("request failed"),
}
api::ignore(endpoint).query(&client).await.unwrap();
}
}
3 changes: 2 additions & 1 deletion xmtp_api_d14n/src/middleware/read_write_client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ mod tests {

use xmtp_proto::{
api::{Query, mock::MockNetworkClient},
types::TopicKind,
xmtp::xmtpv4::envelopes::ClientEnvelope,
};
const FILTER: &str = "xmtp.xmtpv4.payer_api.PayerApi";
Expand Down Expand Up @@ -159,7 +160,7 @@ mod tests {
.times(1)
.returning(|_, _, _| Ok(http::Response::new(vec![].into())));
let mut e = QueryEnvelope::builder()
.topic(vec![])
.topic(TopicKind::GroupMessagesV1.create(vec![]))
.last_seen(Default::default())
.limit(0)
.build()?;
Expand Down
7 changes: 7 additions & 0 deletions xmtp_api_d14n/src/protocol/impls/vector_clock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@ impl VectorClock for GlobalCursor {
}
}

fn merge_least(&mut self, other: &Self) {
for (&node, &seq) in other {
let entry = self.entry(node).or_insert(0);
*entry = (*entry).min(seq);
}
}

fn compare(&self, other: &Self) -> ClockOrdering {
let all_nodes: HashSet<_> = self.keys().chain(other.keys()).collect();

Expand Down
58 changes: 44 additions & 14 deletions xmtp_api_d14n/src/protocol/order.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::collections::HashSet;

use crate::protocol::{
Envelope, EnvelopeError, OrderedEnvelopeCollection, ResolutionError, ResolveDependencies, Sort,
VectorClock, sort, types::MissingEnvelope,
Envelope, EnvelopeError, OrderedEnvelopeCollection, ResolutionError, ResolveDependencies,
Resolved, Sort, VectorClock, sort, types::MissingEnvelope,
};
use derive_builder::Builder;
use itertools::Itertools;
Expand All @@ -18,6 +20,12 @@ pub struct Ordered<T, R> {
topic_cursor: TopicCursor,
}

impl<T, R> Ordered<T, R> {
pub fn into_parts(self) -> (Vec<T>, TopicCursor) {
(self.envelopes, self.topic_cursor)
}
}

impl<T: Clone, R: Clone> Ordered<T, R> {
pub fn builder() -> OrderedBuilder<T, R> {
OrderedBuilder::default()
Expand All @@ -38,7 +46,7 @@ where
topic_cursor,
} = self;
sort::timestamp(envelopes).sort()?;
while let Some(missing) = sort::causal(envelopes, topic_cursor).sort()? {
while let Some(mut missing) = sort::causal(envelopes, topic_cursor).sort()? {
let cursors = missing
.iter()
.map(|e| {
Expand All @@ -49,22 +57,44 @@ where
Ok(need
.into_iter()
.map(|c| MissingEnvelope::new(topic.clone(), c))
.collect::<Vec<MissingEnvelope>>())
.collect::<HashSet<MissingEnvelope>>())
})
.flatten_ok()
.collect::<Result<Vec<MissingEnvelope>, EnvelopeError>>()?;
let resolved = match resolver.resolve(cursors).await {
// if resolution fails, drop the missing envelopes.
// in this case, we will not process any of those envelopes
// until the next query.
Err(ResolutionError::ResolutionFailed) => {
return Ok(());
.collect::<Result<HashSet<MissingEnvelope>, EnvelopeError>>()?;
let Resolved {
envelopes: resolved,
unresolved,
} = resolver.resolve(cursors).await?;
if resolved.is_empty() {
// if we cant resolve anything, break the loop
break;
}
if let Some(unresolved) = unresolved {
let unresolved = unresolved
.into_iter()
.map(|m| m.cursor)
.collect::<HashSet<_>>();
// if the resolver fails to resolve some envelopes, ignore them.
// delete unresolved envelopes from missing envelopes list.
// cannot use retain directly b/c curosr returns Result<>.
// see https://github.com/xmtp/libxmtp/issues/2691
// TODO:2691
let mut to_remove = HashSet::new();
for (i, m) in missing.iter().enumerate() {
if unresolved.contains(&m.cursor()?) {
to_remove.insert(i);
}
}
Err(e) => return Err(e),
Ok(r) => r,
};
let mut i = 0;
missing.retain(|_m| {
let could_not_resolve = to_remove.contains(&i);
i += 1;
!could_not_resolve
});
}
// apply missing before resolved, so that the resolved
// are applied to the topic cursor before the missing dependencies.
// todo: maybe `VecDeque` better here?
envelopes.splice(0..0, missing.into_iter());
envelopes.splice(0..0, resolved.into_iter());
sort::timestamp(envelopes).sort()?;
Expand Down
3 changes: 3 additions & 0 deletions xmtp_api_d14n/src/protocol/resolve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,6 @@
//! - query random originators for the dependency
//! - round robin query for dependency
//! - etc.

mod network_backoff;
pub use network_backoff::*;
Empty file.
112 changes: 112 additions & 0 deletions xmtp_api_d14n/src/protocol/resolve/network_backoff.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
use std::collections::HashSet;

use crate::{
d14n::QueryEnvelope,
protocol::{
Envelope, ResolutionError, ResolveDependencies, Resolved, VectorClock,
types::MissingEnvelope,
},
};
use derive_builder::Builder;
use itertools::Itertools;
use tracing::warn;
use xmtp_common::{ExponentialBackoff, Strategy};
use xmtp_configuration::MAX_PAGE_SIZE;
use xmtp_proto::{
api::{Client, Query},
types::{Cursor, GlobalCursor, Topic},
xmtp::xmtpv4::envelopes::OriginatorEnvelope,
};

/// try resolve d14n dependencies based on a backoff strategy
#[derive(Clone, Debug, Builder)]
#[builder(setter(strip_option), build_fn(error = "ResolutionError"))]
pub struct NetworkBackoffResolver<ApiClient> {
client: ApiClient,
backoff: ExponentialBackoff,
}

impl<ApiClient: Clone> NetworkBackoffResolver<ApiClient> {
pub fn builder() -> NetworkBackoffResolverBuilder<ApiClient> {
NetworkBackoffResolverBuilder::default()
}
}

#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
impl<ApiClient: Client> ResolveDependencies for NetworkBackoffResolver<ApiClient> {
type ResolvedEnvelope = OriginatorEnvelope;
/// Resolve dependencies, starting with a list of dependencies. Should try to resolve
/// all dependents after `dependency`, if `Dependency` is missing as well.
/// * Once resolved, these dependencies may have missing dependencies of their own.
/// # Returns
/// * `HashSet<Self::ResolvedEnvelope>`: The list of envelopes which were resolved.
async fn resolve(
&mut self,
mut missing: HashSet<MissingEnvelope>,
) -> Result<Resolved<Self::ResolvedEnvelope>, ResolutionError> {
let mut attempts = 0;
let time_spent = xmtp_common::time::Instant::now();
let mut resolved = Vec::new();
while !missing.is_empty() {
if let Some(wait_for) = self.backoff.backoff(attempts, time_spent) {
xmtp_common::time::sleep(wait_for).await;
attempts += 1;
} else {
missing.iter().for_each(|m| {
warn!(
"dropping missing dependency {} due to lack of resolution",
m
);
});
return Ok(Resolved {
envelopes: resolved,
unresolved: Some(missing),
});
}
let (topics, lcc) = lcc(&missing);
let envelopes = QueryEnvelope::builder()
.topics(topics)
.last_seen(lcc)
.limit(MAX_PAGE_SIZE)
.build()?
.query(&self.client)
.await
.map_err(|e| ResolutionError::Api(Box::new(e)))?
.envelopes;
let got = envelopes
.iter()
.map(|e| e.cursor())
.collect::<Result<HashSet<Cursor>, _>>()?;
missing.retain(|m| !got.contains(&m.cursor));
resolved.extend(envelopes);
}
Ok(Resolved {
envelopes: resolved,
unresolved: None,
})
}
}

/// Get the LCC and topics from a list of missing envelopes
fn lcc(missing: &HashSet<MissingEnvelope>) -> (Vec<Topic>, GlobalCursor) {
// get the lcc by first getting lowest Cursor
// per topic, then merging the global cursor of every topic into
// one.
let (topics, last_seen): (Vec<_>, Vec<GlobalCursor>) = missing
.iter()
.into_grouping_map_by(|m| m.topic.clone())
.fold(GlobalCursor::default(), |mut acc, _key, val| {
acc.apply_least(&val.cursor);
acc
})
.into_iter()
.unzip();
let last_seen = last_seen
.into_iter()
.fold(GlobalCursor::default(), |mut acc, clock| {
acc.merge_least(&clock);
acc
});
(topics, last_seen)
}
31 changes: 26 additions & 5 deletions xmtp_api_d14n/src/protocol/traits/dependency_resolution.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::error::Error;
use std::{collections::HashSet, error::Error};

use derive_builder::UninitializedFieldError;
use xmtp_common::{MaybeSend, MaybeSync};
Expand All @@ -20,6 +20,21 @@ pub enum ResolutionError {
ResolutionFailed,
}

pub struct Resolved<E> {
pub envelopes: Vec<E>,
/// list of envelopes that could not be resolved with this strategy
pub unresolved: Option<HashSet<MissingEnvelope>>,
}

impl<E> Resolved<E> {
pub fn new(envelopes: Vec<E>, unresolved: Option<HashSet<MissingEnvelope>>) -> Self {
Self {
envelopes,
unresolved,
}
}
}

#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
pub trait ResolveDependencies: MaybeSend + MaybeSync {
Expand All @@ -31,8 +46,8 @@ pub trait ResolveDependencies: MaybeSend + MaybeSync {
/// * `Vec<Self::ResolvedEnvelope>`: The list of envelopes which were resolved.
async fn resolve(
&mut self,
missing: Vec<MissingEnvelope>,
) -> Result<Vec<Self::ResolvedEnvelope>, ResolutionError>;
missing: HashSet<MissingEnvelope>,
) -> Result<Resolved<Self::ResolvedEnvelope>, ResolutionError>;
}

/// A resolver that does not even attempt to try and get dependencies
Expand All @@ -42,7 +57,13 @@ pub struct NoopResolver;
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
impl ResolveDependencies for NoopResolver {
type ResolvedEnvelope = ();
async fn resolve(&mut self, _: Vec<MissingEnvelope>) -> Result<Vec<()>, ResolutionError> {
Ok(vec![])
async fn resolve(
&mut self,
m: HashSet<MissingEnvelope>,
) -> Result<Resolved<()>, ResolutionError> {
Ok(Resolved {
envelopes: vec![],
unresolved: Some(m),
})
}
}
3 changes: 3 additions & 0 deletions xmtp_api_d14n/src/protocol/traits/vector_clock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ pub trait VectorClock {
/// Merges another clock into this one by taking the max ordering per node
fn merge(&mut self, other: &Self);

/// Merges another clock into this one by taking the min ordering per node
fn merge_least(&mut self, other: &Self);

/// Compares this clock to another to determine their relative ordering
fn compare(&self, other: &Self) -> ClockOrdering;

Expand Down
10 changes: 8 additions & 2 deletions xmtp_api_d14n/src/protocol/types/missing_envelope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,18 @@ use xmtp_proto::types::{Cursor, Topic};
/// [`Sort`](crate::protocol::Sort)
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct MissingEnvelope {
topic: Topic,
cursor: Cursor,
pub topic: Topic,
pub cursor: Cursor,
}

impl MissingEnvelope {
pub fn new(topic: Topic, cursor: Cursor) -> Self {
Self { topic, cursor }
}
}

impl std::fmt::Display for MissingEnvelope {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}/{}", self.cursor, self.topic)
}
}
Loading
Loading