Skip to content

Commit bd2576d

Browse files
committed
add resolution strategy
1 parent 9fc52ab commit bd2576d

File tree

10 files changed

+91
-23
lines changed

10 files changed

+91
-23
lines changed

xmtp_api_d14n/src/protocol/mod.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@ pub use in_memory_cursor_store::*;
1111

1212
mod impls;
1313

14-
mod resolution;
15-
// pub use resolution::*;
14+
pub mod resolve;
1615

1716
pub mod sort;
17+
18+
mod order;
19+
pub use order::*;
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
use crate::protocol::{
2+
Envelope, EnvelopeError, OrderedEnvelopeCollection, ResolveDependencies, Sort, sort,
3+
};
4+
use derive_builder::Builder;
5+
use xmtp_proto::types::TopicCursor;
6+
7+
#[derive(Debug, Clone, Builder)]
8+
#[builder(setter(strip_option), build_fn(error = "EnvelopeError"))]
9+
pub struct Ordered<T, R> {
10+
envelopes: Vec<T>,
11+
resolver: R,
12+
}
13+
14+
impl<T: Clone, R: Clone> Ordered<T, R> {
15+
pub fn builder() -> OrderedBuilder<T, R> {
16+
OrderedBuilder::default()
17+
}
18+
}
19+
20+
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
21+
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
22+
impl<T, R> OrderedEnvelopeCollection for Ordered<T, R>
23+
where
24+
T: Envelope<'static>,
25+
R: ResolveDependencies<ResolvedEnvelope = T>,
26+
{
27+
/// Sort dependencies of `Self` according to XIP
28+
/// TODO fill in docs
29+
async fn sort(&mut self) -> Result<(), EnvelopeError> {
30+
let mut topic_cursor = TopicCursor::default();
31+
{
32+
sort::timestamp(&mut self.envelopes).sort()?;
33+
}
34+
while let Some(missing) = sort::causal(&mut self.envelopes, &mut topic_cursor).sort()? {
35+
let cursors = missing
36+
.iter()
37+
.map(|e| e.cursor())
38+
.collect::<Result<Vec<_>, _>>()?;
39+
// try to resolve the missing dependencies
40+
let resolved = self.resolver.resolve(cursors).await?;
41+
// re-apply at start of vec
42+
self.envelopes.splice(0..0, resolved.into_iter());
43+
sort::timestamp(&mut self.envelopes).sort()?;
44+
}
45+
Ok(())
46+
}
47+
}
File renamed without changes.

xmtp_api_d14n/src/protocol/sort/causal.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ pub struct CausalSort<'a, E> {
77
topic_cursor: &'a mut TopicCursor,
88
}
99

10-
impl<'a, E: Envelope<'a>> Sort<Vec<E>> for CausalSort<'a, E> {
10+
impl<'b, 'a: 'b, E: Envelope<'a>> Sort<Vec<E>> for CausalSort<'b, E> {
1111
fn sort(self) -> Result<Option<Vec<E>>, EnvelopeError> {
1212
let mut i = 0;
1313

@@ -52,10 +52,10 @@ impl<'a, E: Envelope<'a>> Sort<Vec<E>> for CausalSort<'a, E> {
5252
/// * `envelopes`: the [`Envelope`]'s being sorted
5353
/// * `topic_cursor`: the cursor position of all known topics
5454
///
55-
pub fn causal<'a, E: Envelope<'a>>(
56-
envelopes: &'a mut Vec<E>,
57-
topic_cursor: &'a mut TopicCursor,
58-
) -> impl Sort<Vec<E>> + use<'a, E> {
55+
pub fn causal<'b, 'a: 'b, E: Envelope<'a>>(
56+
envelopes: &'b mut Vec<E>,
57+
topic_cursor: &'b mut TopicCursor,
58+
) -> impl Sort<Vec<E>> + use<'a, 'b, E> {
5959
CausalSort {
6060
envelopes,
6161
topic_cursor,

xmtp_api_d14n/src/protocol/sort/timestamp.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,11 @@ pub struct TimestampSort<'a, E> {
44
envelopes: &'a mut [E],
55
}
66

7-
impl<'a, E> Sort<&'a mut [E]> for TimestampSort<'a, E>
7+
impl<'b, 'a: 'b, E> Sort<()> for TimestampSort<'b, E>
88
where
99
E: Envelope<'a>,
1010
{
11-
fn sort(mut self) -> Result<Option<&'a mut [E]>, EnvelopeError> {
11+
fn sort(mut self) -> Result<Option<()>, EnvelopeError> {
1212
let envelopes = &mut self.envelopes;
1313
// we can only sort envelopes which have a timestamp
1414
envelopes.sort_by_key(|e| e.timestamp());
@@ -27,9 +27,7 @@ where
2727
/// If a timestamp does not have a cursor (extractor return [`Option::None`]) it is
2828
/// sorted according to [`Ord`], [impl](https://doc.rust-lang.org/src/core/option.rs.html#2341)
2929
/// This sort will never return any missing envelopes.
30-
pub fn timestamp<'a, E: Envelope<'a>>(
31-
envelopes: &'a mut [E],
32-
) -> impl Sort<&'a mut [E]> + use<'a, E> {
30+
pub fn timestamp<'b, 'a: 'b, E: Envelope<'a>>(envelopes: &'b mut [E]) -> impl Sort<()> {
3331
TimestampSort { envelopes }
3432
}
3533

xmtp_api_d14n/src/protocol/traits.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use crate::protocol::SequencedExtractor;
66
use crate::protocol::V3GroupMessageExtractor;
77
use crate::protocol::V3WelcomeMessageExtractor;
88
use crate::protocol::WelcomeMessageExtractor;
9+
use derive_builder::UninitializedFieldError;
910
use itertools::Itertools;
1011
use xmtp_proto::types::GlobalCursor;
1112
use xmtp_proto::types::GroupMessage;
@@ -52,6 +53,9 @@ pub use dependency_resolution::*;
5253
mod sort;
5354
pub use sort::*;
5455

56+
mod ordered_collection;
57+
pub use ordered_collection::*;
58+
5559
#[derive(thiserror::Error, Debug)]
5660
pub enum EnvelopeError {
5761
#[error(transparent)]
@@ -62,6 +66,8 @@ pub enum EnvelopeError {
6266
TopicMismatch,
6367
#[error("Envelope not found")]
6468
NotFound(&'static str),
69+
#[error(transparent)]
70+
MissingBuilderField(#[from] UninitializedFieldError),
6571
// for extractors defined outside of this crate or
6672
// generic implementations like Tuples
6773
#[error("{0}")]
@@ -76,6 +82,7 @@ impl RetryableError for EnvelopeError {
7682
Self::TopicMismatch => false,
7783
Self::DynError(d) => retryable!(d),
7884
Self::NotFound(_) => false,
85+
Self::MissingBuilderField(_) => false,
7986
}
8087
}
8188
}
Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
1+
use xmtp_common::{MaybeSend, MaybeSync};
12
use xmtp_proto::types::Cursor;
23

34
use crate::protocol::{Envelope, EnvelopeError};
45

5-
#[allow(async_fn_in_trait)]
6-
pub trait ResolveDependencies<'a> {
7-
type ResolvedEnvelope: Envelope<'a>;
6+
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
7+
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
8+
pub trait ResolveDependencies: MaybeSend + MaybeSync {
9+
type ResolvedEnvelope: Envelope<'static> + MaybeSend + MaybeSync;
810
/// Resolve dependencies, starting with a list of dependencies. Should try to resolve
911
/// all dependents after `dependency`, if `Dependency` is missing as well.
1012
/// * Once resolved, these dependencies may have missing dependencies of their own.
@@ -19,13 +21,11 @@ pub trait ResolveDependencies<'a> {
1921
/// A resolver that does not even attempt to try and get dependencies
2022
pub struct NoopResolver;
2123

22-
#[allow(async_fn_in_trait)]
23-
impl ResolveDependencies<'_> for NoopResolver {
24+
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
25+
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
26+
impl ResolveDependencies for NoopResolver {
2427
type ResolvedEnvelope = ();
25-
async fn resolve(
26-
&mut self,
27-
_: Vec<Cursor>,
28-
) -> Result<Vec<Self::ResolvedEnvelope>, EnvelopeError> {
28+
async fn resolve(&mut self, _: Vec<Cursor>) -> Result<Vec<()>, EnvelopeError> {
2929
Ok(vec![])
3030
}
3131
}

xmtp_api_d14n/src/protocol/traits/envelopes.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
//! Traits representing un-processed (extracted) and processed (extracted) protobuf types
22
use chrono::Utc;
3+
use xmtp_common::{MaybeSend, MaybeSync};
34
use xmtp_proto::types::Cursor;
45

56
use crate::protocol::{CursorExtractor, DependsOnExtractor, TimestampExtractor};
@@ -13,7 +14,7 @@ use super::*;
1314
* Theres a way to seal this trait implementation to
1415
* avoid external implementations which should be done.
1516
*/
16-
pub trait ProtocolEnvelope<'env>: std::fmt::Debug {
17+
pub trait ProtocolEnvelope<'env>: std::fmt::Debug + Send + Sync {
1718
type Nested<'a>
1819
where
1920
Self: 'a;
@@ -35,7 +36,7 @@ pub trait ProtocolEnvelope<'env>: std::fmt::Debug {
3536
/// a [`Cursor`](xmtp_proto::types::Cursor) per envelope.
3637
/// Likewise, Clients form the [`ClientEnvelope`] according to the [Client Node2Node Protocol](https://github.com/xmtp/XIPs/blob/main/XIPs/xip-49-decentralized-backend.md#332-envelopes)
3738
/// Client envelopes maintain a payload/topic with MLS and Client-specific duties.
38-
pub trait Envelope<'env>: std::fmt::Debug {
39+
pub trait Envelope<'env>: std::fmt::Debug + MaybeSend + MaybeSync {
3940
/// Extract the topic for this envelope
4041
fn topic(&self) -> Result<Topic, EnvelopeError>;
4142
/// Extract the cursor for this envelope
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
use xmtp_common::{MaybeSend, MaybeSync};
2+
3+
use crate::protocol::EnvelopeError;
4+
5+
// these functions are not on `EnvelopeCollection` to keep its object-safety simpler.
6+
// since dependency resolution requires `async fn`.
7+
/// A sorted envelope collection
8+
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
9+
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
10+
pub trait OrderedEnvelopeCollection: MaybeSend + MaybeSync {
11+
/// Sort dependencies of `Self` according to XIP
12+
async fn sort(&mut self) -> Result<(), EnvelopeError>;
13+
}

0 commit comments

Comments
 (0)