Skip to content

Commit d2d3986

Browse files
committed
add resolution strategy
1 parent 157a5d8 commit d2d3986

File tree

8 files changed

+84
-5
lines changed

8 files changed

+84
-5
lines changed

xmtp_api_d14n/src/protocol/mod.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,11 @@ pub use in_memory_cursor_store::*;
1111

1212
mod impls;
1313

14-
mod resolution;
15-
// pub use resolution::*;
14+
pub mod resolve;
1615

1716
pub mod sort;
1817

1918
pub mod types;
19+
20+
mod order;
21+
pub use order::*;
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
use crate::protocol::{
2+
Envelope, EnvelopeError, OrderedEnvelopeCollection, ResolveDependencies, Sort, sort,
3+
};
4+
use derive_builder::Builder;
5+
use xmtp_proto::types::TopicCursor;
6+
7+
/// Order dependencies of `Self` according to [XIP](https://github.com/xmtp/XIPs/blob/main/XIPs/xip-49-decentralized-backend.md#335-cross-originator-message-ordering)
8+
/// If dependencies are missing, this ordering will try to resolve them
9+
/// and re-apply resolved dependencies to the front of the envelope list
10+
/// construct this strategy with [`Ordered::builder`]
11+
#[derive(Debug, Clone, Builder)]
12+
#[builder(setter(strip_option), build_fn(error = "EnvelopeError"))]
13+
pub struct Ordered<T, R> {
14+
envelopes: Vec<T>,
15+
resolver: R,
16+
topic_cursor: TopicCursor,
17+
}
18+
19+
impl<T: Clone, R: Clone> Ordered<T, R> {
20+
pub fn builder() -> OrderedBuilder<T, R> {
21+
OrderedBuilder::default()
22+
}
23+
}
24+
25+
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
26+
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
27+
impl<T, R> OrderedEnvelopeCollection for Ordered<T, R>
28+
where
29+
T: Envelope<'static>,
30+
R: ResolveDependencies<ResolvedEnvelope = T>,
31+
{
32+
async fn order(&mut self) -> Result<(), EnvelopeError> {
33+
let Self {
34+
envelopes,
35+
resolver,
36+
topic_cursor,
37+
} = self;
38+
sort::timestamp(envelopes).sort()?;
39+
while let Some(missing) = sort::causal(envelopes, topic_cursor).sort()? {
40+
let cursors = missing
41+
.iter()
42+
.map(|e| e.cursor())
43+
.collect::<Result<Vec<_>, _>>()?;
44+
// try to resolve the missing dependencies
45+
let resolved = resolver.resolve(cursors).await?;
46+
// re-apply at start of vec
47+
envelopes.splice(0..0, resolved.into_iter());
48+
sort::timestamp(envelopes).sort()?;
49+
}
50+
Ok(())
51+
}
52+
}
File renamed without changes.

xmtp_api_d14n/src/protocol/traits.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use crate::protocol::SequencedExtractor;
66
use crate::protocol::V3GroupMessageExtractor;
77
use crate::protocol::V3WelcomeMessageExtractor;
88
use crate::protocol::WelcomeMessageExtractor;
9+
use derive_builder::UninitializedFieldError;
910
use itertools::Itertools;
1011
use xmtp_proto::types::GlobalCursor;
1112
use xmtp_proto::types::GroupMessage;
@@ -52,6 +53,9 @@ pub use dependency_resolution::*;
5253
mod sort;
5354
pub use sort::*;
5455

56+
mod ordered_collection;
57+
pub use ordered_collection::*;
58+
5559
#[derive(thiserror::Error, Debug)]
5660
pub enum EnvelopeError {
5761
#[error(transparent)]
@@ -62,6 +66,8 @@ pub enum EnvelopeError {
6266
TopicMismatch,
6367
#[error("Envelope not found")]
6468
NotFound(&'static str),
69+
#[error(transparent)]
70+
MissingBuilderField(#[from] UninitializedFieldError),
6571
// for extractors defined outside of this crate or
6672
// generic implementations like Tuples
6773
#[error("{0}")]
@@ -76,6 +82,7 @@ impl RetryableError for EnvelopeError {
7682
Self::TopicMismatch => false,
7783
Self::DynError(d) => retryable!(d),
7884
Self::NotFound(_) => false,
85+
Self::MissingBuilderField(_) => false,
7986
}
8087
}
8188
}

xmtp_api_d14n/src/protocol/traits/envelopes.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
//! Traits representing un-processed (extracted) and processed (extracted) protobuf types
22
use chrono::Utc;
3+
use xmtp_common::{MaybeSend, MaybeSync};
34
use xmtp_proto::types::Cursor;
45

56
use crate::protocol::{CursorExtractor, DependsOnExtractor, TimestampExtractor};
@@ -13,7 +14,7 @@ use super::*;
1314
* Theres a way to seal this trait implementation to
1415
* avoid external implementations which should be done.
1516
*/
16-
pub trait ProtocolEnvelope<'env>: std::fmt::Debug {
17+
pub trait ProtocolEnvelope<'env>: std::fmt::Debug + Send + Sync {
1718
type Nested<'a>
1819
where
1920
Self: 'a;
@@ -35,7 +36,7 @@ pub trait ProtocolEnvelope<'env>: std::fmt::Debug {
3536
/// a [`Cursor`](xmtp_proto::types::Cursor) per envelope.
3637
/// Likewise, Clients form the [`ClientEnvelope`] according to the [Client Node2Node Protocol](https://github.com/xmtp/XIPs/blob/main/XIPs/xip-49-decentralized-backend.md#332-envelopes)
3738
/// Client envelopes maintain a payload/topic with MLS and Client-specific duties.
38-
pub trait Envelope<'env>: std::fmt::Debug {
39+
pub trait Envelope<'env>: std::fmt::Debug + MaybeSend + MaybeSync {
3940
/// Extract the topic for this envelope
4041
fn topic(&self) -> Result<Topic, EnvelopeError>;
4142
/// Extract the cursor for this envelope
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
use xmtp_common::{MaybeSend, MaybeSync};
2+
3+
use crate::protocol::EnvelopeError;
4+
5+
// these functions are not on `EnvelopeCollection` to keep its object-safety simpler.
6+
// since dependency resolution requires `async fn`.
7+
/// A ordered envelope collection
8+
/// an `OrderedEnvelopeCollection` differs from [`Sort`](super::Sort)
9+
/// since it adds the inclusing of `async`, allowing
10+
/// an `OrderedEnvelopeCollection` to both
11+
/// [Sort](super::Sort) and [ResolveDependencies](super::ResolveDependencies)
12+
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
13+
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
14+
pub trait OrderedEnvelopeCollection: MaybeSend + MaybeSync {
15+
/// Order dependencies of `Self` according to [XIP](https://github.com/xmtp/XIPs/blob/main/XIPs/xip-49-decentralized-backend.md#335-cross-originator-message-ordering)
16+
async fn order(&mut self) -> Result<(), EnvelopeError>;
17+
}

xmtp_proto/src/types/topic_cursor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use std::{
66
use crate::types::{GlobalCursor, Topic};
77

88
/// A cursor that keeps a [`super::GlobalCursor`] for each topic it has seen.
9-
#[derive(Default, PartialEq)]
9+
#[derive(Default, Debug, PartialEq, Clone)]
1010
pub struct TopicCursor {
1111
inner: HashMap<Topic, GlobalCursor>,
1212
}

0 commit comments

Comments
 (0)