From 44b5d1907c12308021502a025e583e3364273f4f Mon Sep 17 00:00:00 2001 From: Andrew Plaza Date: Thu, 23 Oct 2025 16:03:57 -0400 Subject: [PATCH 1/2] scaffolding --- xmtp_api_d14n/src/protocol/extractors.rs | 2 + .../src/protocol/extractors/cursor.rs | 64 +++++++++++++++++++ xmtp_api_d14n/src/protocol/mod.rs | 8 +++ xmtp_api_d14n/src/protocol/resolution.rs | 10 +++ .../src/protocol/resolution/network.rs | 0 xmtp_api_d14n/src/protocol/sort.rs | 3 + xmtp_api_d14n/src/protocol/sort/casual.rs | 0 xmtp_api_d14n/src/protocol/sort/timestamp.rs | 14 ++++ xmtp_api_d14n/src/protocol/traits.rs | 6 ++ .../protocol/traits/dependency_resolution.rs | 30 +++++++++ xmtp_api_d14n/src/protocol/traits/sort.rs | 6 +- xmtp_api_d14n/src/protocol/types.rs | 3 + .../src/protocol/types/missing_envelope.rs | 17 +++++ .../originator_id_refresh_state.rs | 2 +- xmtp_proto/src/types/cursor.rs | 4 +- 15 files changed, 162 insertions(+), 7 deletions(-) create mode 100644 xmtp_api_d14n/src/protocol/extractors/cursor.rs create mode 100644 xmtp_api_d14n/src/protocol/resolution.rs create mode 100644 xmtp_api_d14n/src/protocol/resolution/network.rs create mode 100644 xmtp_api_d14n/src/protocol/sort.rs create mode 100644 xmtp_api_d14n/src/protocol/sort/casual.rs create mode 100644 xmtp_api_d14n/src/protocol/sort/timestamp.rs create mode 100644 xmtp_api_d14n/src/protocol/traits/dependency_resolution.rs create mode 100644 xmtp_api_d14n/src/protocol/types.rs create mode 100644 xmtp_api_d14n/src/protocol/types/missing_envelope.rs diff --git a/xmtp_api_d14n/src/protocol/extractors.rs b/xmtp_api_d14n/src/protocol/extractors.rs index 3e70a65663..762bb01d45 100644 --- a/xmtp_api_d14n/src/protocol/extractors.rs +++ b/xmtp_api_d14n/src/protocol/extractors.rs @@ -21,6 +21,8 @@ mod topics; pub use topics::*; mod data; pub use data::*; +mod cursor; +pub use cursor::*; #[cfg(test)] pub mod test_utils; diff --git a/xmtp_api_d14n/src/protocol/extractors/cursor.rs b/xmtp_api_d14n/src/protocol/extractors/cursor.rs new file mode 100644 index 0000000000..e3d2ae2755 --- /dev/null +++ b/xmtp_api_d14n/src/protocol/extractors/cursor.rs @@ -0,0 +1,64 @@ +//! Extractor for a envelope [`Cursor`](xmtp_proto::types::Cursor) +//! useful for verifing a message has been read or maybe duplicates. +use xmtp_proto::ConversionError; +use xmtp_proto::mls_v1::welcome_message::WelcomePointer as V3WelcomePointer; +use xmtp_proto::types::Cursor; +use xmtp_proto::xmtp::mls::api::v1::{ + group_message::V1 as V3GroupMessage, welcome_message::V1 as V3WelcomeMessage, +}; +use xmtp_proto::xmtp::xmtpv4::envelopes::UnsignedOriginatorEnvelope; + +use crate::protocol::{EnvelopeVisitor, Extractor}; + +/// Extract Cursor from Envelopes +#[derive(Default, Clone, Debug)] +pub struct CursorExtractor { + cursor: Option, +} + +impl CursorExtractor { + pub fn new() -> Self { + Default::default() + } +} + +impl Extractor for CursorExtractor { + type Output = Result; + + fn get(self) -> Self::Output { + self.cursor.ok_or(ConversionError::Missing { + item: "cursor", + r#type: std::any::type_name::(), + }) + } +} + +impl EnvelopeVisitor<'_> for CursorExtractor { + type Error = ConversionError; + + fn visit_unsigned_originator( + &mut self, + e: &UnsignedOriginatorEnvelope, + ) -> Result<(), Self::Error> { + self.cursor = Some(Cursor { + sequence_id: e.originator_sequence_id, + originator_id: e.originator_node_id, + }); + Ok(()) + } + + fn visit_v3_group_message(&mut self, m: &V3GroupMessage) -> Result<(), Self::Error> { + self.cursor = Some(Cursor::v3_messages(m.id)); + Ok(()) + } + + fn visit_v3_welcome_message(&mut self, m: &V3WelcomeMessage) -> Result<(), Self::Error> { + self.cursor = Some(Cursor::v3_welcomes(m.id)); + Ok(()) + } + + fn visit_v3_welcome_pointer(&mut self, m: &V3WelcomePointer) -> Result<(), Self::Error> { + self.cursor = Some(Cursor::v3_welcomes(m.id)); + Ok(()) + } +} diff --git a/xmtp_api_d14n/src/protocol/mod.rs b/xmtp_api_d14n/src/protocol/mod.rs index 7f3106a4ce..46676ea740 100644 --- a/xmtp_api_d14n/src/protocol/mod.rs +++ b/xmtp_api_d14n/src/protocol/mod.rs @@ -10,3 +10,11 @@ mod in_memory_cursor_store; pub use in_memory_cursor_store::*; mod impls; + +mod resolution; +// pub use resolution::*; + +mod sort; +// pub use sort::*; + +pub mod types; diff --git a/xmtp_api_d14n/src/protocol/resolution.rs b/xmtp_api_d14n/src/protocol/resolution.rs new file mode 100644 index 0000000000..d64e0e37d5 --- /dev/null +++ b/xmtp_api_d14n/src/protocol/resolution.rs @@ -0,0 +1,10 @@ +//! Implementations for Dependency Resolution strategies [XIP](https://github.com/xmtp/XIPs/blob/main/XIPs/xip-49-decentralized-backend.md#335-cross-originator-message-ordering) +//! +//! Possible Implementation of Dependency Resolution Strategies: +//! - keep retrying same query and error forever after and finish after some backoff +//! - query the originator that the mesasge is stored on. +//! - file misbehavior report if originator message came from is unresponsive +//! - dont resolve dependencies at all +//! - query random originators for the dependency +//! - round robin query for dependency +//! - etc. diff --git a/xmtp_api_d14n/src/protocol/resolution/network.rs b/xmtp_api_d14n/src/protocol/resolution/network.rs new file mode 100644 index 0000000000..e69de29bb2 diff --git a/xmtp_api_d14n/src/protocol/sort.rs b/xmtp_api_d14n/src/protocol/sort.rs new file mode 100644 index 0000000000..e9fdffc2a2 --- /dev/null +++ b/xmtp_api_d14n/src/protocol/sort.rs @@ -0,0 +1,3 @@ +//! Sorting Implelementations on Envelope Collections [XIP](https://github.com/xmtp/XIPs/blob/main/XIPs/xip-49-decentralized-backend.md#335-cross-originator-message-ordering) +mod casual; +mod timestamp; diff --git a/xmtp_api_d14n/src/protocol/sort/casual.rs b/xmtp_api_d14n/src/protocol/sort/casual.rs new file mode 100644 index 0000000000..e69de29bb2 diff --git a/xmtp_api_d14n/src/protocol/sort/timestamp.rs b/xmtp_api_d14n/src/protocol/sort/timestamp.rs new file mode 100644 index 0000000000..a86a6aa60b --- /dev/null +++ b/xmtp_api_d14n/src/protocol/sort/timestamp.rs @@ -0,0 +1,14 @@ +use crate::protocol::{ProtocolEnvelope, Sort}; + +pub struct TimestampSort<'a, E> { + envelopes: &'a mut [E], +} + +impl<'a, E> Sort for TimestampSort<'a, E> +where + E: ProtocolEnvelope<'a>, +{ + fn sort(&mut self, topic_cursor: usize) { + todo!() + } +} diff --git a/xmtp_api_d14n/src/protocol/traits.rs b/xmtp_api_d14n/src/protocol/traits.rs index 21eda8fd34..c8644ede59 100644 --- a/xmtp_api_d14n/src/protocol/traits.rs +++ b/xmtp_api_d14n/src/protocol/traits.rs @@ -46,6 +46,12 @@ pub use vector_clock::*; mod full_api; pub use full_api::*; +mod dependency_resolution; +pub use dependency_resolution::*; + +mod sort; +pub use sort::*; + #[derive(thiserror::Error, Debug)] pub enum EnvelopeError { #[error(transparent)] diff --git a/xmtp_api_d14n/src/protocol/traits/dependency_resolution.rs b/xmtp_api_d14n/src/protocol/traits/dependency_resolution.rs new file mode 100644 index 0000000000..1235004f40 --- /dev/null +++ b/xmtp_api_d14n/src/protocol/traits/dependency_resolution.rs @@ -0,0 +1,30 @@ +use xmtp_common::{MaybeSend, MaybeSync}; + +use crate::protocol::{Envelope, EnvelopeError, types::MissingEnvelope}; + +#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] +pub trait ResolveDependencies: MaybeSend + MaybeSync { + type ResolvedEnvelope: Envelope<'static> + MaybeSend + MaybeSync; + /// Resolve dependencies, starting with a list of dependencies. Should try to resolve + /// all dependents after `dependency`, if `Dependency` is missing as well. + /// * Once resolved, these dependencies may have missing dependencies of their own. + /// # Returns + /// * `Vec`: The list of envelopes which were resolved. + async fn resolve( + &mut self, + missing: Vec, + ) -> Result, EnvelopeError>; +} + +/// A resolver that does not even attempt to try and get dependencies +pub struct NoopResolver; + +#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] +impl ResolveDependencies for NoopResolver { + type ResolvedEnvelope = (); + async fn resolve(&mut self, _: Vec) -> Result, EnvelopeError> { + Ok(vec![]) + } +} diff --git a/xmtp_api_d14n/src/protocol/traits/sort.rs b/xmtp_api_d14n/src/protocol/traits/sort.rs index 45f3cc1ac9..e133f5c243 100644 --- a/xmtp_api_d14n/src/protocol/traits/sort.rs +++ b/xmtp_api_d14n/src/protocol/traits/sort.rs @@ -2,8 +2,6 @@ /// dependencies, and by-originator. /// [XIP, cross-originator sorting](https://github.com/xmtp/XIPs/blob/main/XIPs/xip-49-decentralized-backend.md#335-cross-originator-message-ordering) pub trait Sort { - /// Sort envelopes by timestamp in-place - fn timestamp_sort(&mut self); - /// Casually Sort envelopes in-place - fn casual_sort(&mut self, topic_cursor: usize); + /// Sort envelopes in-place + fn sort(&mut self, topic_cursor: usize); } diff --git a/xmtp_api_d14n/src/protocol/types.rs b/xmtp_api_d14n/src/protocol/types.rs new file mode 100644 index 0000000000..50a36b4909 --- /dev/null +++ b/xmtp_api_d14n/src/protocol/types.rs @@ -0,0 +1,3 @@ +//! Types specific to the xmtp d14n implementation +mod missing_envelope; +pub use missing_envelope::*; diff --git a/xmtp_api_d14n/src/protocol/types/missing_envelope.rs b/xmtp_api_d14n/src/protocol/types/missing_envelope.rs new file mode 100644 index 0000000000..3168cffa2e --- /dev/null +++ b/xmtp_api_d14n/src/protocol/types/missing_envelope.rs @@ -0,0 +1,17 @@ +use xmtp_proto::types::{Cursor, Topic}; + +/// An envelope that is depended on by another envelope, +/// but is missing from our local database or from a network call +/// see [`ResolveDependencies`](crate::protocol::ResolveDependencies) and +/// [`Sort`](crate::protocol::Sort) +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub struct MissingEnvelope { + topic: Topic, + cursor: Cursor, +} + +impl MissingEnvelope { + pub fn new(topic: Topic, cursor: Cursor) -> Self { + Self { topic, cursor } + } +} diff --git a/xmtp_db/src/encrypted_store/migration_test/originator_id_refresh_state.rs b/xmtp_db/src/encrypted_store/migration_test/originator_id_refresh_state.rs index a35c3c704a..6b8ce89a66 100644 --- a/xmtp_db/src/encrypted_store/migration_test/originator_id_refresh_state.rs +++ b/xmtp_db/src/encrypted_store/migration_test/originator_id_refresh_state.rs @@ -285,7 +285,7 @@ async fn up_welcome_unchanged() { .unwrap(); assert_eq!( welcome_cursor, - Cursor::welcomes(100), + Cursor::v3_welcomes(100), "Welcome entry should remain unchanged" ); } diff --git a/xmtp_proto/src/types/cursor.rs b/xmtp_proto/src/types/cursor.rs index db7814d9f5..d6fc45e6c2 100644 --- a/xmtp_proto/src/types/cursor.rs +++ b/xmtp_proto/src/types/cursor.rs @@ -29,7 +29,7 @@ impl Cursor { } } - pub const fn welcomes(sequence_id: u64) -> Self { + pub const fn v3_welcomes(sequence_id: u64) -> Self { Self { sequence_id, originator_id: Originators::WELCOME_MESSAGES, @@ -89,7 +89,7 @@ mod test { #[rstest] #[case(Cursor::commit_log(100), 100, Originators::REMOTE_COMMIT_LOG)] - #[case(Cursor::welcomes(200), 200, Originators::WELCOME_MESSAGES)] + #[case(Cursor::v3_welcomes(200), 200, Originators::WELCOME_MESSAGES)] #[case(Cursor::v3_messages(300), 300, Originators::APPLICATION_MESSAGES)] #[case(Cursor::installations(400), 400, Originators::INSTALLATIONS)] #[case(Cursor::mls_commits(500), 500, Originators::MLS_COMMITS)] From 7e857bd26f513eae4b30b03e1c0cc619b8f6d3ff Mon Sep 17 00:00:00 2001 From: Andrew Plaza Date: Mon, 27 Oct 2025 10:42:53 -0400 Subject: [PATCH 2/2] add timestamp to `Envelope` trait and sort according to it --- Cargo.lock | 6 +- Cargo.toml | 1 + xmtp_api_d14n/Cargo.toml | 7 + .../protocol/sort/timestamp.txt | 7 + xmtp_api_d14n/src/protocol/extractors.rs | 2 + .../src/protocol/extractors/timestamp.rs | 57 +++++++++ xmtp_api_d14n/src/protocol/mod.rs | 3 +- xmtp_api_d14n/src/protocol/sort.rs | 4 +- xmtp_api_d14n/src/protocol/sort/casual.rs | 0 xmtp_api_d14n/src/protocol/sort/timestamp.rs | 121 +++++++++++++++++- .../src/protocol/traits/envelopes.rs | 20 +++ .../src/protocol/traits/extractor.rs | 16 +++ xmtp_api_d14n/src/protocol/traits/sort.rs | 2 +- 13 files changed, 234 insertions(+), 12 deletions(-) create mode 100644 xmtp_api_d14n/proptest-regressions/protocol/sort/timestamp.txt create mode 100644 xmtp_api_d14n/src/protocol/extractors/timestamp.rs delete mode 100644 xmtp_api_d14n/src/protocol/sort/casual.rs diff --git a/Cargo.lock b/Cargo.lock index 45a07a70d1..04a11683be 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5154,14 +5154,13 @@ dependencies = [ [[package]] name = "proptest" -version = "1.7.0" +version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6fcdab19deb5195a31cf7726a210015ff1496ba1464fd42cb4f537b8b01b471f" +checksum = "bee689443a2bd0a16ab0348b52ee43e3b2d1b1f931c8aa5c9f8de4c86fbe8c40" dependencies = [ "bit-set", "bit-vec", "bitflags 2.9.4", - "lazy_static", "num-traits", "rand 0.9.2", "rand_chacha 0.9.0", @@ -8401,6 +8400,7 @@ dependencies = [ "openmls_rust_crypto", "parking_lot 0.12.4", "pin-project-lite", + "proptest", "prost", "rstest 0.26.1", "thiserror 2.0.16", diff --git a/Cargo.toml b/Cargo.toml index 758fc11fec..aa18907d48 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -108,6 +108,7 @@ parking_lot = "0.12.3" pbjson = "0.8" pbjson-types = "0.8" pin-project-lite = "0.2" +proptest = { version = "1.9", default-features = false } prost = { version = "0.14", default-features = false } prost-types = { version = "0.14", default-features = false } # updating crates are blocked on https://github.com/dalek-cryptography/curve25519-dalek/pull/729 diff --git a/xmtp_api_d14n/Cargo.toml b/xmtp_api_d14n/Cargo.toml index 5170d92766..d39b80c1b9 100644 --- a/xmtp_api_d14n/Cargo.toml +++ b/xmtp_api_d14n/Cargo.toml @@ -56,11 +56,18 @@ xmtp_proto = { workspace = true, features = ["test-utils"] } [target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies] ctor.workspace = true +proptest = { workspace = true, features = [ + "std", + "fork", + "timeout", + "bit-set", +] } [target.'cfg(target_arch = "wasm32")'.dev-dependencies] wasm-bindgen-test.workspace = true # needed for rstest futures-timer = { workspace = true, features = ["wasm-bindgen"] } +proptest = { workspace = true, features = ["std"] } [features] diff --git a/xmtp_api_d14n/proptest-regressions/protocol/sort/timestamp.txt b/xmtp_api_d14n/proptest-regressions/protocol/sort/timestamp.txt new file mode 100644 index 0000000000..0d78a91392 --- /dev/null +++ b/xmtp_api_d14n/proptest-regressions/protocol/sort/timestamp.txt @@ -0,0 +1,7 @@ +# Seeds for failure cases proptest has generated in the past. It is +# automatically read and these particular cases re-run before any +# novel cases are generated. +# +# It is recommended to check this file in to source control so that +# everyone who runs the test benefits from these saved cases. +cc 783ebaf4bd78f8d0bb95fa2dbd7c798aee6abfb3d504dbc7ac1f7724fd1bb326 # shrinks to mut envelopes = [TestEnvelope { time: Some(0) }, TestEnvelope { time: Some(-1) }] diff --git a/xmtp_api_d14n/src/protocol/extractors.rs b/xmtp_api_d14n/src/protocol/extractors.rs index 762bb01d45..56400eaede 100644 --- a/xmtp_api_d14n/src/protocol/extractors.rs +++ b/xmtp_api_d14n/src/protocol/extractors.rs @@ -23,6 +23,8 @@ mod data; pub use data::*; mod cursor; pub use cursor::*; +mod timestamp; +pub use timestamp::*; #[cfg(test)] pub mod test_utils; diff --git a/xmtp_api_d14n/src/protocol/extractors/timestamp.rs b/xmtp_api_d14n/src/protocol/extractors/timestamp.rs new file mode 100644 index 0000000000..2b32603472 --- /dev/null +++ b/xmtp_api_d14n/src/protocol/extractors/timestamp.rs @@ -0,0 +1,57 @@ +//! Extractor for an MLS Data field +//! useful for verifing a message has been read or maybe duplicates. +use chrono::Utc; +use xmtp_proto::ConversionError; +use xmtp_proto::mls_v1::welcome_message::WelcomePointer; +use xmtp_proto::xmtp::mls::api::v1::{ + group_message::V1 as V3GroupMessage, welcome_message::V1 as V3WelcomeMessage, +}; +use xmtp_proto::xmtp::xmtpv4::envelopes::UnsignedOriginatorEnvelope; + +use crate::protocol::{EnvelopeVisitor, Extractor}; + +/// Extract Mls Data from Envelopes +#[derive(Default, Clone, Debug)] +pub struct TimestampExtractor { + time: Option, +} + +impl TimestampExtractor { + pub fn new() -> Self { + Default::default() + } +} + +impl Extractor for TimestampExtractor { + type Output = Option>; + + fn get(self) -> Self::Output { + self.time.map(chrono::DateTime::from_timestamp_nanos) + } +} + +impl EnvelopeVisitor<'_> for TimestampExtractor { + type Error = ConversionError; + + fn visit_unsigned_originator( + &mut self, + e: &UnsignedOriginatorEnvelope, + ) -> Result<(), Self::Error> { + self.time = Some(e.originator_ns); + Ok(()) + } + + fn visit_v3_group_message(&mut self, message: &V3GroupMessage) -> Result<(), Self::Error> { + self.time = Some(message.created_ns as i64); + Ok(()) + } + + fn visit_v3_welcome_message(&mut self, message: &V3WelcomeMessage) -> Result<(), Self::Error> { + self.time = Some(message.created_ns as i64); + Ok(()) + } + fn visit_v3_welcome_pointer(&mut self, ptr: &WelcomePointer) -> Result<(), Self::Error> { + self.time = Some(ptr.created_ns as i64); + Ok(()) + } +} diff --git a/xmtp_api_d14n/src/protocol/mod.rs b/xmtp_api_d14n/src/protocol/mod.rs index 46676ea740..168ae56d8e 100644 --- a/xmtp_api_d14n/src/protocol/mod.rs +++ b/xmtp_api_d14n/src/protocol/mod.rs @@ -14,7 +14,6 @@ mod impls; mod resolution; // pub use resolution::*; -mod sort; -// pub use sort::*; +pub mod sort; pub mod types; diff --git a/xmtp_api_d14n/src/protocol/sort.rs b/xmtp_api_d14n/src/protocol/sort.rs index e9fdffc2a2..3ebdfb9d44 100644 --- a/xmtp_api_d14n/src/protocol/sort.rs +++ b/xmtp_api_d14n/src/protocol/sort.rs @@ -1,3 +1,5 @@ //! Sorting Implelementations on Envelope Collections [XIP](https://github.com/xmtp/XIPs/blob/main/XIPs/xip-49-decentralized-backend.md#335-cross-originator-message-ordering) -mod casual; +// mod causal; +// pub use causal::*; mod timestamp; +pub use timestamp::*; diff --git a/xmtp_api_d14n/src/protocol/sort/casual.rs b/xmtp_api_d14n/src/protocol/sort/casual.rs deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/xmtp_api_d14n/src/protocol/sort/timestamp.rs b/xmtp_api_d14n/src/protocol/sort/timestamp.rs index a86a6aa60b..c03c40fe5f 100644 --- a/xmtp_api_d14n/src/protocol/sort/timestamp.rs +++ b/xmtp_api_d14n/src/protocol/sort/timestamp.rs @@ -1,14 +1,125 @@ -use crate::protocol::{ProtocolEnvelope, Sort}; +use crate::protocol::{Envelope, Sort}; pub struct TimestampSort<'a, E> { envelopes: &'a mut [E], } -impl<'a, E> Sort for TimestampSort<'a, E> +impl<'b, 'a, E> Sort for TimestampSort<'b, E> where - E: ProtocolEnvelope<'a>, + E: Envelope<'a>, { - fn sort(&mut self, topic_cursor: usize) { - todo!() + fn sort(mut self) { + let envelopes = &mut self.envelopes; + // we can only sort envelopes which have a timestamp + envelopes.sort_unstable_by_key(|e| e.timestamp()); + } +} + +/// Sorts Envelopes by server-side Timestamp in ascending order +/// * for d14n this will sort envelopes by +/// [`originator_ns`](xmtp_proto::xmtp::xmtpv4::envelopes::UnsignedOriginatorEnvelope::originator_ns) +/// * for v3 this will sort by created_ns on GroupMessage, WelcomeMessage, or WelcomePointer +/// overall, sorts according to the timestamp extracted by +/// [`TimestampExtractor`](crate::protocol::TimestampExtractor) +/// +/// If a timestamp does not have a cursor (extractor return [`Option::None`]) it is +/// sorted according to [`Ord`], [impl](https://doc.rust-lang.org/src/core/option.rs.html#2341) +/// This sort will never return any missing envelopes. +pub fn timestamp<'b, 'a: 'b, E: Envelope<'a>>(envelopes: &'b mut [E]) -> impl Sort { + TimestampSort { envelopes } +} + +#[cfg(test)] +mod tests { + use crate::protocol::sort; + use chrono::Utc; + use proptest::prelude::*; + use xmtp_common::Generate; + use xmtp_proto::xmtp::xmtpv4::envelopes::ClientEnvelope; + use xmtp_proto::xmtp::xmtpv4::envelopes::client_envelope::Payload; + + use super::*; + + #[derive(Debug)] + struct TestEnvelope { + time: Option>, + } + + impl TestEnvelope { + fn new(time: i64) -> Self { + Self { + time: Some(chrono::DateTime::from_timestamp_nanos(time)), + } + } + } + + impl Generate for TestEnvelope { + fn generate() -> Self { + TestEnvelope { + time: Some(chrono::DateTime::from_timestamp_nanos( + xmtp_common::rand_i64(), + )), + } + } + } + + impl Envelope<'_> for TestEnvelope { + fn topic(&self) -> Result { + unreachable!() + } + + fn payload(&self) -> Result { + unreachable!() + } + + fn timestamp(&self) -> Option> { + self.time + } + + fn client_envelope(&self) -> Result { + unreachable!() + } + + fn group_message( + &self, + ) -> Result, crate::protocol::EnvelopeError> + { + unreachable!() + } + + fn welcome_message( + &self, + ) -> Result, crate::protocol::EnvelopeError> + { + unreachable!() + } + + fn consume(&self, _extractor: E) -> Result + where + Self: Sized, + for<'a> crate::protocol::EnvelopeError: + From<>::Error>, + for<'a> E: crate::protocol::EnvelopeVisitor<'a> + crate::protocol::Extractor, + { + unreachable!() + } + } + + prop_compose! { + fn envelope_all_some()(id in 1..u32::MAX) -> TestEnvelope { + TestEnvelope::new(id as i64) + } + } + + fn is_sorted(sorted: &[TestEnvelope]) -> bool { + sorted.is_sorted_by_key(|e| e.time) + } + + #[xmtp_common::test] + fn sorts_by_timestamp() { + proptest!(|(mut envelopes in prop::collection::vec(envelope_all_some(), 0 .. 100))| { + sort::timestamp(&mut envelopes).sort(); + assert!(is_sorted(&envelopes), "envelopes were not sorted") + }); } } diff --git a/xmtp_api_d14n/src/protocol/traits/envelopes.rs b/xmtp_api_d14n/src/protocol/traits/envelopes.rs index 2ba27afdd2..187b369aeb 100644 --- a/xmtp_api_d14n/src/protocol/traits/envelopes.rs +++ b/xmtp_api_d14n/src/protocol/traits/envelopes.rs @@ -1,4 +1,8 @@ //! Traits representing un-processed (extracted) and processed (extracted) protobuf types +use chrono::Utc; + +use crate::protocol::TimestampExtractor; + use super::*; /// An low-level envelope from the network gRPC interface /* @@ -18,6 +22,9 @@ pub trait ProtocolEnvelope<'env> { fn get_nested(&self) -> Result, ConversionError>; } +//TODO: https://github.com/xmtp/libxmtp/issues/2691 +// will improve usage of timestamp/sorting/resolution, so that earlier +// networking layers do not deserialize more than necessary. /// Represents a Single High-Level Envelope /// An [`Envelope`] is a [`ProtocolEnvelope`] with some [`Extractor`](super::Extractor) /// applied to it. @@ -32,6 +39,8 @@ pub trait Envelope<'env> { fn topic(&self) -> Result; /// Extract the payload for this envelope fn payload(&self) -> Result; + /// Get the timestamp of this envelope + fn timestamp(&self) -> Option>; /// Extract the client envelope (envelope containing message payload & AAD, if any) for this /// envelope. fn client_envelope(&self) -> Result; @@ -65,6 +74,17 @@ where Ok(extractor.get()?) } + // TODO: Currently the only "unexpected" way for this to fail + // would be a deserialization error, or if timestamp is + // > 2262 A.D. + // Deserializing/failing earlier: https://github.com/xmtp/libxmtp/issues/2691 + // would encode more invariants into these extractor types + fn timestamp(&self) -> Option> { + let mut extractor = TimestampExtractor::default(); + self.accept(&mut extractor).ok()?; + extractor.maybe_get() + } + fn client_envelope(&self) -> Result { // ensures we only recurse the proto data structure once. let mut extractor = (TopicExtractor::new(), PayloadExtractor::new()); diff --git a/xmtp_api_d14n/src/protocol/traits/extractor.rs b/xmtp_api_d14n/src/protocol/traits/extractor.rs index 3d3fb5ce7a..8783116560 100644 --- a/xmtp_api_d14n/src/protocol/traits/extractor.rs +++ b/xmtp_api_d14n/src/protocol/traits/extractor.rs @@ -32,3 +32,19 @@ where self.get() } } + +pub trait MaybeExtractor: Extractor> { + type Value; + fn maybe_get(self) -> Option; +} + +impl MaybeExtractor for T +where + T: Extractor>, +{ + type Value = V; + + fn maybe_get(self) -> Option { + self.get() + } +} diff --git a/xmtp_api_d14n/src/protocol/traits/sort.rs b/xmtp_api_d14n/src/protocol/traits/sort.rs index e133f5c243..a8dbdb09e5 100644 --- a/xmtp_api_d14n/src/protocol/traits/sort.rs +++ b/xmtp_api_d14n/src/protocol/traits/sort.rs @@ -3,5 +3,5 @@ /// [XIP, cross-originator sorting](https://github.com/xmtp/XIPs/blob/main/XIPs/xip-49-decentralized-backend.md#335-cross-originator-message-ordering) pub trait Sort { /// Sort envelopes in-place - fn sort(&mut self, topic_cursor: usize); + fn sort(self); }