Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions common/src/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,8 @@ impl ExponentialBackoffBuilder {
duration: self.duration.unwrap_or(Duration::from_millis(25)),
max_jitter: self.max_jitter.unwrap_or(Duration::from_millis(25)),
multiplier: self.multiplier.unwrap_or(3),
total_wait_max: self.total_wait_max.unwrap_or_default(),
individual_wait_max: Default::default(),
total_wait_max: self.total_wait_max.unwrap_or(Duration::from_secs(120)),
individual_wait_max: Duration::from_secs(30),
}
}
}
Expand Down
6 changes: 6 additions & 0 deletions nix/libxmtp.nix
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@
, xmtp
, omnix
, toxiproxy
, rr
, vscode-extensions
, lldb
, ...
}:
let
Expand Down Expand Up @@ -95,6 +98,9 @@ mkShell {
binaryen
wasm-pack
binaryen
rr
vscode-extensions.vadimcn.vscode-lldb
lldb

# Protobuf
buf
Expand Down
9 changes: 9 additions & 0 deletions xmtp_api_d14n/proptest-regressions/protocol/order.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Seeds for failure cases proptest has generated in the past. It is
# automatically read and these particular cases re-run before any
# novel cases are generated.
#
# It is recommended to check this file in to source control so that
# everyone who runs the test benefits from these saved cases.
cc c3efa58ec48f8da2087da00734e7b8477b98ad2f1519988add8e5f840964525f # shrinks to env_with_missing = EnvelopesWithMissing { removed: [TestEnvelope { cursor: Cursor { sequence_id: 2, originator_id: 20 }, depends_on: GlobalCursor { inner: {20: 1} } }, TestEnvelope { cursor: Cursor { sequence_id: 1, originator_id: 40 }, depends_on: GlobalCursor { inner: {20: 2, 10: 2} } }, TestEnvelope { cursor: Cursor { sequence_id: 2, originator_id: 40 }, depends_on: GlobalCursor { inner: {40: 1, 20: 1, 10: 1} } }, TestEnvelope { cursor: Cursor { sequence_id: 3, originator_id: 10 }, depends_on: GlobalCursor { inner: {40: 2, 10: 2, 20: 2} } }, TestEnvelope { cursor: Cursor { sequence_id: 3, originator_id: 20 }, depends_on: GlobalCursor { inner: {40: 3, 20: 2, 10: 6, 30: 2} } }], envelopes: [TestEnvelope { cursor: Cursor { sequence_id: 1, originator_id: 20 }, depends_on: GlobalCursor { inner: {} } }, TestEnvelope { cursor: Cursor { sequence_id: 1, originator_id: 10 }, depends_on: GlobalCursor { inner: {20: 2} } }, TestEnvelope { cursor: Cursor { sequence_id: 2, originator_id: 10 }, depends_on: GlobalCursor { inner: {10: 1, 20: 2} } }, TestEnvelope { cursor: Cursor { sequence_id: 4, originator_id: 10 }, depends_on: GlobalCursor { inner: {10: 3, 20: 1} } }, TestEnvelope { cursor: Cursor { sequence_id: 1, originator_id: 30 }, depends_on: GlobalCursor { inner: {} } }, TestEnvelope { cursor: Cursor { sequence_id: 3, originator_id: 40 }, depends_on: GlobalCursor { inner: {10: 3, 30: 1, 40: 2, 20: 2} } }, TestEnvelope { cursor: Cursor { sequence_id: 5, originator_id: 10 }, depends_on: GlobalCursor { inner: {30: 1, 20: 2, 10: 4, 40: 3} } }, TestEnvelope { cursor: Cursor { sequence_id: 2, originator_id: 30 }, depends_on: GlobalCursor { inner: {10: 5, 40: 3, 30: 1, 20: 2} } }, TestEnvelope { cursor: Cursor { sequence_id: 6, originator_id: 10 }, depends_on: GlobalCursor { inner: {10: 5, 20: 2, 40: 2} } }, TestEnvelope { cursor: Cursor { sequence_id: 4, originator_id: 20 }, depends_on: GlobalCursor { inner: {10: 6, 20: 3, 30: 2, 40: 3} } }] }
cc 4cb58b61e81291b31590cc6ca29df9cf8737a6f2fcc33c20b6021fa991355a31 # shrinks to env_with_missing = EnvelopesWithMissing { removed: [TestEnvelope { cursor: Cursor { sequence_id: 1, originator_id: 40 }, depends_on: GlobalCursor { inner: {} } }, TestEnvelope { cursor: Cursor { sequence_id: 3, originator_id: 10 }, depends_on: GlobalCursor { inner: {10: 2, 40: 3} } }, TestEnvelope { cursor: Cursor { sequence_id: 4, originator_id: 40 }, depends_on: GlobalCursor { inner: {10: 3, 40: 3} } }, TestEnvelope { cursor: Cursor { sequence_id: 3, originator_id: 20 }, depends_on: GlobalCursor { inner: {10: 3, 20: 2, 40: 1} } }, TestEnvelope { cursor: Cursor { sequence_id: 4, originator_id: 20 }, depends_on: GlobalCursor { inner: {20: 3} } }], envelopes: [TestEnvelope { cursor: Cursor { sequence_id: 2, originator_id: 40 }, depends_on: GlobalCursor { inner: {40: 1} } }, TestEnvelope { cursor: Cursor { sequence_id: 1, originator_id: 10 }, depends_on: GlobalCursor { inner: {40: 2} } }, TestEnvelope { cursor: Cursor { sequence_id: 2, originator_id: 10 }, depends_on: GlobalCursor { inner: {10: 1} } }, TestEnvelope { cursor: Cursor { sequence_id: 3, originator_id: 40 }, depends_on: GlobalCursor { inner: {40: 2, 10: 1} } }, TestEnvelope { cursor: Cursor { sequence_id: 1, originator_id: 20 }, depends_on: GlobalCursor { inner: {40: 4, 10: 3} } }, TestEnvelope { cursor: Cursor { sequence_id: 1, originator_id: 30 }, depends_on: GlobalCursor { inner: {20: 1, 40: 4, 10: 3} } }, TestEnvelope { cursor: Cursor { sequence_id: 2, originator_id: 20 }, depends_on: GlobalCursor { inner: {20: 1, 10: 3, 40: 4, 30: 1} } }, TestEnvelope { cursor: Cursor { sequence_id: 4, originator_id: 10 }, depends_on: GlobalCursor { inner: {10: 3, 20: 1, 40: 2} } }, TestEnvelope { cursor: Cursor { sequence_id: 5, originator_id: 10 }, depends_on: GlobalCursor { inner: {20: 3, 10: 4, 40: 4} } }, TestEnvelope { cursor: Cursor { sequence_id: 5, originator_id: 20 }, depends_on: GlobalCursor { inner: {30: 1, 20: 4, 10: 5, 40: 2} } }] }
cc 0ab5e000585326d1342cb92385d095a09cb4fc3c9af67b5e16807bddf0017f75 # shrinks to env_with_missing = EnvelopesWithMissing { removed: [TestEnvelope { cursor: Cursor { sequence_id: 1, originator_id: 10 }, depends_on: GlobalCursor { inner: {} } }, TestEnvelope { cursor: Cursor { sequence_id: 2, originator_id: 40 }, depends_on: GlobalCursor { inner: {40: 1, 10: 2, 30: 1} } }, TestEnvelope { cursor: Cursor { sequence_id: 2, originator_id: 30 }, depends_on: GlobalCursor { inner: {10: 2, 40: 2, 30: 1} } }, TestEnvelope { cursor: Cursor { sequence_id: 3, originator_id: 10 }, depends_on: GlobalCursor { inner: {10: 2} } }, TestEnvelope { cursor: Cursor { sequence_id: 4, originator_id: 10 }, depends_on: GlobalCursor { inner: {10: 3, 30: 1} } }, TestEnvelope { cursor: Cursor { sequence_id: 4, originator_id: 30 }, depends_on: GlobalCursor { inner: {20: 1, 10: 4, 30: 3, 40: 2} } }, TestEnvelope { cursor: Cursor { sequence_id: 6, originator_id: 10 }, depends_on: GlobalCursor { inner: {40: 3, 20: 1, 10: 5, 30: 4} } }, TestEnvelope { cursor: Cursor { sequence_id: 4, originator_id: 40 }, depends_on: GlobalCursor { inner: {10: 4, 30: 2, 40: 3, 20: 1} } }], envelopes: [TestEnvelope { cursor: Cursor { sequence_id: 1, originator_id: 40 }, depends_on: GlobalCursor { inner: {10: 1} } }, TestEnvelope { cursor: Cursor { sequence_id: 1, originator_id: 30 }, depends_on: GlobalCursor { inner: {40: 1} } }, TestEnvelope { cursor: Cursor { sequence_id: 2, originator_id: 10 }, depends_on: GlobalCursor { inner: {40: 1, 30: 1, 10: 1} } }, TestEnvelope { cursor: Cursor { sequence_id: 3, originator_id: 30 }, depends_on: GlobalCursor { inner: {30: 2} } }, TestEnvelope { cursor: Cursor { sequence_id: 1, originator_id: 20 }, depends_on: GlobalCursor { inner: {40: 2, 10: 4, 30: 3} } }, TestEnvelope { cursor: Cursor { sequence_id: 5, originator_id: 10 }, depends_on: GlobalCursor { inner: {20: 1, 40: 2, 10: 4, 30: 4} } }, TestEnvelope { cursor: Cursor { sequence_id: 3, originator_id: 40 }, depends_on: GlobalCursor { inner: {20: 1, 40: 2, 30: 3} } }] }
13 changes: 13 additions & 0 deletions xmtp_api_d14n/src/protocol/impls/vector_clock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,16 @@ impl VectorClock for GlobalCursor {
*entry = (*entry).max(*seq)
}
}

#[cfg(test)]
mod tests {
use super::*;

#[xmtp_common::test]
fn dominates_empty() {
let empty = GlobalCursor::default();
let mut not_empty = GlobalCursor::default();
not_empty.insert(1, 1);
assert!(not_empty.dominates(&empty));
}
}
145 changes: 141 additions & 4 deletions xmtp_api_d14n/src/protocol/order.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ where
.flatten_ok()
.collect::<Result<HashSet<MissingEnvelope>, EnvelopeError>>()?;
let Resolved {
envelopes: resolved,
resolved,
unresolved,
} = resolver.resolve(cursors).await?;
if resolved.is_empty() {
Expand All @@ -86,10 +86,11 @@ where
}
}
let mut i = 0;
// or, retain all resolved envelopes
missing.retain(|_m| {
let could_not_resolve = to_remove.contains(&i);
let resolved = to_remove.contains(&i);
i += 1;
!could_not_resolve
!resolved
});
}
// apply missing before resolved, so that the resolved
Expand All @@ -103,4 +104,140 @@ where
}
}

// TODO: tests
#[cfg(test)]
mod test {
use super::*;
use crate::protocol::utils::test::{EnvelopesWithMissing, TestEnvelope, missing_dependencies};
use futures::FutureExt;
use proptest::{prelude::*, sample::subsequence};
use xmtp_proto::types::OriginatorId;

// Simple mock resolver that holds available envelopes to resolve
#[derive(Clone, Debug)]
struct MockResolver {
available: Vec<TestEnvelope>,
unavailable: Vec<TestEnvelope>,
}

#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
impl ResolveDependencies for MockResolver {
type ResolvedEnvelope = TestEnvelope;

async fn resolve(
&self,
missing: HashSet<MissingEnvelope>,
) -> Result<Resolved<TestEnvelope>, ResolutionError> {
// Return envelopes that match the missing set
let resolved = self
.available
.iter()
.filter(|env| {
let cursor = env.cursor().unwrap();
let topic = env.topic().unwrap();
missing.contains(&MissingEnvelope::new(topic, cursor))
})
.cloned()
.collect::<Vec<_>>();

Ok(Resolved::new(resolved, None))
}
}

prop_compose! {
pub fn resolvable_dependencies(length: usize, originators: Vec<OriginatorId>)
(envelopes in missing_dependencies(length, originators))
(available in subsequence(envelopes.removed.clone(), envelopes.removed.len()), envelopes in Just(envelopes))
-> EnvelopesWithResolver {
let mut unavailable = envelopes.removed.clone();
unavailable.retain(|e| !available.contains(e));
EnvelopesWithResolver {
missing: envelopes,
resolver: MockResolver {
available,
unavailable
}
}
}
}

#[derive(Debug, Clone)]
struct EnvelopesWithResolver {
missing: EnvelopesWithMissing,
resolver: MockResolver,
}
proptest! {
#[xmtp_common::test]
fn orders_with_unresolvable_dependencies(
envelopes in resolvable_dependencies(30, vec![10, 20, 30, 40, 50, 60])
) {
let EnvelopesWithResolver {
missing,
resolver
} = envelopes;

let (available, unavailable) = (resolver.available.clone(), resolver.unavailable.clone());
let mut ordered = Ordered::builder()
.envelopes(missing.envelopes)
.resolver(resolver)
.topic_cursor(TopicCursor::default())
.build()
.unwrap();

// Perform ordering - some dependencies cannot be resolved
ordered.order().now_or_never()
.expect("Future should complete immediately")
.unwrap();

let (result, mut topic_cursor) = ordered.into_parts();

// Check that no envelope in the result depends on an unavailable removed envelope
for envelope in &result {
let depends_on = envelope.depends_on().unwrap().unwrap_or_default();
let topic = envelope.topic().unwrap();
let topic_clock = topic_cursor.get_or_default(&topic);

// If this envelope's dependencies are satisfied by the topic cursor,
// it should not depend on any unavailable envelopes
if topic_clock.dominates(&depends_on) {
for unavailable_env in &unavailable {
prop_assert!(
!envelope.has_dependency_on(unavailable_env),
"Envelope with satisfied dependencies should not depend on unavailable envelope. \
Envelope: {}, Unavailable: {}",
envelope,
unavailable_env
);
}
} else {
panic!("topic clock should always be complete at conclusion of ordering. {} does not dominate envelope {} depending on {}", topic_clock, envelope.cursor, depends_on);
}
}

// Verify that envelopes which were made available are in the result
// (unless they themselves depend on unavailable envelopes)
for available_env in &available {
//
if available_env.has_dependency_on_any(&unavailable) { continue; }
// none of the envelopes have a dependency on this one, so resolver wont care
if result.iter().all(|e| !e.has_dependency_on(available_env)) { continue; }
prop_assert!(
result.iter().any(|e| e == available_env),
"Result does not contain {}", available_env
);
// If it's in the result, verify its dependencies are satisfied
let depends_on = available_env.depends_on().unwrap().unwrap_or_default();
let topic = available_env.topic().unwrap();
let topic_clock = topic_cursor.get_or_default(&topic);

prop_assert!(
topic_clock.dominates(&depends_on),
"Available envelope in result should have satisfied dependencies. \
Envelope: {}, Topic clock: {}",
available_env,
topic_clock
);
}
}
}
}
2 changes: 1 addition & 1 deletion xmtp_api_d14n/src/protocol/resolve/network_backoff.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ where
resolved.extend(envelopes);
}
Ok(Resolved {
envelopes: resolved,
resolved,
unresolved: (!missing.is_empty()).then_some(missing),
})
}
Expand Down
6 changes: 3 additions & 3 deletions xmtp_api_d14n/src/protocol/traits/dependency_resolution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@ use xmtp_proto::api::BodyError;
use crate::protocol::{Envelope, EnvelopeError, types::MissingEnvelope};

pub struct Resolved<E> {
pub envelopes: Vec<E>,
pub resolved: Vec<E>,
/// list of envelopes that could not be resolved with this strategy
pub unresolved: Option<HashSet<MissingEnvelope>>,
}

impl<E> Resolved<E> {
pub fn new(envelopes: Vec<E>, unresolved: Option<HashSet<MissingEnvelope>>) -> Self {
Self {
envelopes,
resolved: envelopes,
unresolved,
}
}
Expand Down Expand Up @@ -60,7 +60,7 @@ impl ResolveDependencies for NoopResolver {
type ResolvedEnvelope = ();
async fn resolve(&self, m: HashSet<MissingEnvelope>) -> Result<Resolved<()>, ResolutionError> {
Ok(Resolved {
envelopes: vec![],
resolved: vec![],
unresolved: Some(m),
})
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub async fn test_resolve_all_found_immediately<R>(
assert!(result.is_ok(), "Resolution should succeed");
let resolved = result.unwrap();
assert_eq!(
resolved.envelopes.len(),
resolved.resolved.len(),
expected_count,
"Should resolve exactly {} envelopes",
expected_count
Expand Down Expand Up @@ -76,7 +76,7 @@ pub async fn test_resolve_partial_resolution<R>(
let resolved = result.unwrap();

assert_eq!(
resolved.envelopes.len(),
resolved.resolved.len(),
expected_resolved_count,
"Should resolve exactly {} envelopes",
expected_resolved_count
Expand Down Expand Up @@ -127,7 +127,7 @@ where

assert!(result.is_ok(), "Resolution should succeed with empty set");
let resolved = result.unwrap();
assert!(resolved.envelopes.is_empty(), "Should resolve no envelopes");
assert!(resolved.resolved.is_empty(), "Should resolve no envelopes");
assert!(
resolved.unresolved.is_none() || resolved.unresolved.as_ref().unwrap().is_empty(),
"Should have no unresolved envelopes"
Expand Down
10 changes: 9 additions & 1 deletion xmtp_api_d14n/src/protocol/utils/test/test_envelope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ impl TestEnvelope {
let depends_on_sid = self.depends_on.get(&originator);
depends_on_sid == other.cursor.sequence_id
}

pub fn has_dependency_on_any(&self, other: &[TestEnvelope]) -> bool {
other.iter().any(|e| self.has_dependency_on(e))
}
}

impl std::fmt::Display for TestEnvelope {
Expand All @@ -38,7 +42,11 @@ impl Envelope<'_> for TestEnvelope {
}

fn timestamp(&self) -> Option<chrono::DateTime<Utc>> {
unreachable!()
// Create a deterministic timestamp based on originator_id and sequence_id
// This ensures envelopes can be sorted by timestamp in tests
let nanos = (self.cursor.originator_id as i64 * 1_000_000_000)
+ (self.cursor.sequence_id as i64 * 1000);
chrono::DateTime::from_timestamp_nanos(nanos).into()
}

fn client_envelope(&self) -> Result<ClientEnvelope, crate::protocol::EnvelopeError> {
Expand Down
2 changes: 2 additions & 0 deletions xmtp_proto/src/api_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ impl<T> XmtpApi for T where T: XmtpMlsClient + XmtpIdentityClient + ?Sized {}

/// Trait which for protobuf-generated type
/// which can be paged.
/// Paged implementation indicates a response
/// that returns a collection of envelopes
pub trait Paged: MaybeSend + MaybeSync {
type Message: MaybeSend + MaybeSync;
fn info(&self) -> &Option<PagingInfo>;
Expand Down
2 changes: 1 addition & 1 deletion xmtp_proto/src/traits/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub(super) async fn request<C: Client>(
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
impl<Q, C> Query<C> for Q
where
Q: QueryRaw<C> + Endpoint,
Q: Endpoint,
C: Client,
<Q as Endpoint>::Output: Default + prost::Message + 'static,
{
Expand Down
Loading