diff --git a/common/src/retry.rs b/common/src/retry.rs index 0ed448e196..fab5a2111f 100644 --- a/common/src/retry.rs +++ b/common/src/retry.rs @@ -191,8 +191,8 @@ impl ExponentialBackoffBuilder { duration: self.duration.unwrap_or(Duration::from_millis(25)), max_jitter: self.max_jitter.unwrap_or(Duration::from_millis(25)), multiplier: self.multiplier.unwrap_or(3), - total_wait_max: self.total_wait_max.unwrap_or_default(), - individual_wait_max: Default::default(), + total_wait_max: self.total_wait_max.unwrap_or(Duration::from_secs(120)), + individual_wait_max: Duration::from_secs(30), } } } diff --git a/nix/libxmtp.nix b/nix/libxmtp.nix index 0055876529..4a908c0a53 100644 --- a/nix/libxmtp.nix +++ b/nix/libxmtp.nix @@ -40,6 +40,9 @@ , xmtp , omnix , toxiproxy +, rr +, vscode-extensions +, lldb , ... }: let @@ -95,6 +98,9 @@ mkShell { binaryen wasm-pack binaryen + rr + vscode-extensions.vadimcn.vscode-lldb + lldb # Protobuf buf diff --git a/xmtp_api_d14n/proptest-regressions/protocol/order.txt b/xmtp_api_d14n/proptest-regressions/protocol/order.txt new file mode 100644 index 0000000000..46ea91aa90 --- /dev/null +++ b/xmtp_api_d14n/proptest-regressions/protocol/order.txt @@ -0,0 +1,9 @@ +# Seeds for failure cases proptest has generated in the past. It is +# automatically read and these particular cases re-run before any +# novel cases are generated. +# +# It is recommended to check this file in to source control so that +# everyone who runs the test benefits from these saved cases. +cc c3efa58ec48f8da2087da00734e7b8477b98ad2f1519988add8e5f840964525f # shrinks to env_with_missing = EnvelopesWithMissing { removed: [TestEnvelope { cursor: Cursor { sequence_id: 2, originator_id: 20 }, depends_on: GlobalCursor { inner: {20: 1} } }, TestEnvelope { cursor: Cursor { sequence_id: 1, originator_id: 40 }, depends_on: GlobalCursor { inner: {20: 2, 10: 2} } }, TestEnvelope { cursor: Cursor { sequence_id: 2, originator_id: 40 }, depends_on: GlobalCursor { inner: {40: 1, 20: 1, 10: 1} } }, TestEnvelope { cursor: Cursor { sequence_id: 3, originator_id: 10 }, depends_on: GlobalCursor { inner: {40: 2, 10: 2, 20: 2} } }, TestEnvelope { cursor: Cursor { sequence_id: 3, originator_id: 20 }, depends_on: GlobalCursor { inner: {40: 3, 20: 2, 10: 6, 30: 2} } }], envelopes: [TestEnvelope { cursor: Cursor { sequence_id: 1, originator_id: 20 }, depends_on: GlobalCursor { inner: {} } }, TestEnvelope { cursor: Cursor { sequence_id: 1, originator_id: 10 }, depends_on: GlobalCursor { inner: {20: 2} } }, TestEnvelope { cursor: Cursor { sequence_id: 2, originator_id: 10 }, depends_on: GlobalCursor { inner: {10: 1, 20: 2} } }, TestEnvelope { cursor: Cursor { sequence_id: 4, originator_id: 10 }, depends_on: GlobalCursor { inner: {10: 3, 20: 1} } }, TestEnvelope { cursor: Cursor { sequence_id: 1, originator_id: 30 }, depends_on: GlobalCursor { inner: {} } }, TestEnvelope { cursor: Cursor { sequence_id: 3, originator_id: 40 }, depends_on: GlobalCursor { inner: {10: 3, 30: 1, 40: 2, 20: 2} } }, TestEnvelope { cursor: Cursor { sequence_id: 5, originator_id: 10 }, depends_on: GlobalCursor { inner: {30: 1, 20: 2, 10: 4, 40: 3} } }, TestEnvelope { cursor: Cursor { sequence_id: 2, originator_id: 30 }, depends_on: GlobalCursor { inner: {10: 5, 40: 3, 30: 1, 20: 2} } }, TestEnvelope { cursor: Cursor { sequence_id: 6, originator_id: 10 }, depends_on: GlobalCursor { inner: {10: 5, 20: 2, 40: 2} } }, TestEnvelope { cursor: Cursor { sequence_id: 4, originator_id: 20 }, depends_on: GlobalCursor { inner: {10: 6, 20: 3, 30: 2, 40: 3} } }] } +cc 4cb58b61e81291b31590cc6ca29df9cf8737a6f2fcc33c20b6021fa991355a31 # shrinks to env_with_missing = EnvelopesWithMissing { removed: [TestEnvelope { cursor: Cursor { sequence_id: 1, originator_id: 40 }, depends_on: GlobalCursor { inner: {} } }, TestEnvelope { cursor: Cursor { sequence_id: 3, originator_id: 10 }, depends_on: GlobalCursor { inner: {10: 2, 40: 3} } }, TestEnvelope { cursor: Cursor { sequence_id: 4, originator_id: 40 }, depends_on: GlobalCursor { inner: {10: 3, 40: 3} } }, TestEnvelope { cursor: Cursor { sequence_id: 3, originator_id: 20 }, depends_on: GlobalCursor { inner: {10: 3, 20: 2, 40: 1} } }, TestEnvelope { cursor: Cursor { sequence_id: 4, originator_id: 20 }, depends_on: GlobalCursor { inner: {20: 3} } }], envelopes: [TestEnvelope { cursor: Cursor { sequence_id: 2, originator_id: 40 }, depends_on: GlobalCursor { inner: {40: 1} } }, TestEnvelope { cursor: Cursor { sequence_id: 1, originator_id: 10 }, depends_on: GlobalCursor { inner: {40: 2} } }, TestEnvelope { cursor: Cursor { sequence_id: 2, originator_id: 10 }, depends_on: GlobalCursor { inner: {10: 1} } }, TestEnvelope { cursor: Cursor { sequence_id: 3, originator_id: 40 }, depends_on: GlobalCursor { inner: {40: 2, 10: 1} } }, TestEnvelope { cursor: Cursor { sequence_id: 1, originator_id: 20 }, depends_on: GlobalCursor { inner: {40: 4, 10: 3} } }, TestEnvelope { cursor: Cursor { sequence_id: 1, originator_id: 30 }, depends_on: GlobalCursor { inner: {20: 1, 40: 4, 10: 3} } }, TestEnvelope { cursor: Cursor { sequence_id: 2, originator_id: 20 }, depends_on: GlobalCursor { inner: {20: 1, 10: 3, 40: 4, 30: 1} } }, TestEnvelope { cursor: Cursor { sequence_id: 4, originator_id: 10 }, depends_on: GlobalCursor { inner: {10: 3, 20: 1, 40: 2} } }, TestEnvelope { cursor: Cursor { sequence_id: 5, originator_id: 10 }, depends_on: GlobalCursor { inner: {20: 3, 10: 4, 40: 4} } }, TestEnvelope { cursor: Cursor { sequence_id: 5, originator_id: 20 }, depends_on: GlobalCursor { inner: {30: 1, 20: 4, 10: 5, 40: 2} } }] } +cc 0ab5e000585326d1342cb92385d095a09cb4fc3c9af67b5e16807bddf0017f75 # shrinks to env_with_missing = EnvelopesWithMissing { removed: [TestEnvelope { cursor: Cursor { sequence_id: 1, originator_id: 10 }, depends_on: GlobalCursor { inner: {} } }, TestEnvelope { cursor: Cursor { sequence_id: 2, originator_id: 40 }, depends_on: GlobalCursor { inner: {40: 1, 10: 2, 30: 1} } }, TestEnvelope { cursor: Cursor { sequence_id: 2, originator_id: 30 }, depends_on: GlobalCursor { inner: {10: 2, 40: 2, 30: 1} } }, TestEnvelope { cursor: Cursor { sequence_id: 3, originator_id: 10 }, depends_on: GlobalCursor { inner: {10: 2} } }, TestEnvelope { cursor: Cursor { sequence_id: 4, originator_id: 10 }, depends_on: GlobalCursor { inner: {10: 3, 30: 1} } }, TestEnvelope { cursor: Cursor { sequence_id: 4, originator_id: 30 }, depends_on: GlobalCursor { inner: {20: 1, 10: 4, 30: 3, 40: 2} } }, TestEnvelope { cursor: Cursor { sequence_id: 6, originator_id: 10 }, depends_on: GlobalCursor { inner: {40: 3, 20: 1, 10: 5, 30: 4} } }, TestEnvelope { cursor: Cursor { sequence_id: 4, originator_id: 40 }, depends_on: GlobalCursor { inner: {10: 4, 30: 2, 40: 3, 20: 1} } }], envelopes: [TestEnvelope { cursor: Cursor { sequence_id: 1, originator_id: 40 }, depends_on: GlobalCursor { inner: {10: 1} } }, TestEnvelope { cursor: Cursor { sequence_id: 1, originator_id: 30 }, depends_on: GlobalCursor { inner: {40: 1} } }, TestEnvelope { cursor: Cursor { sequence_id: 2, originator_id: 10 }, depends_on: GlobalCursor { inner: {40: 1, 30: 1, 10: 1} } }, TestEnvelope { cursor: Cursor { sequence_id: 3, originator_id: 30 }, depends_on: GlobalCursor { inner: {30: 2} } }, TestEnvelope { cursor: Cursor { sequence_id: 1, originator_id: 20 }, depends_on: GlobalCursor { inner: {40: 2, 10: 4, 30: 3} } }, TestEnvelope { cursor: Cursor { sequence_id: 5, originator_id: 10 }, depends_on: GlobalCursor { inner: {20: 1, 40: 2, 10: 4, 30: 4} } }, TestEnvelope { cursor: Cursor { sequence_id: 3, originator_id: 40 }, depends_on: GlobalCursor { inner: {20: 1, 40: 2, 30: 3} } }] } diff --git a/xmtp_api_d14n/src/protocol/impls/vector_clock.rs b/xmtp_api_d14n/src/protocol/impls/vector_clock.rs index a66ecfd151..93912681d0 100644 --- a/xmtp_api_d14n/src/protocol/impls/vector_clock.rs +++ b/xmtp_api_d14n/src/protocol/impls/vector_clock.rs @@ -70,3 +70,16 @@ impl VectorClock for GlobalCursor { *entry = (*entry).max(*seq) } } + +#[cfg(test)] +mod tests { + use super::*; + + #[xmtp_common::test] + fn dominates_empty() { + let empty = GlobalCursor::default(); + let mut not_empty = GlobalCursor::default(); + not_empty.insert(1, 1); + assert!(not_empty.dominates(&empty)); + } +} diff --git a/xmtp_api_d14n/src/protocol/order.rs b/xmtp_api_d14n/src/protocol/order.rs index d89280f655..48e9da4b22 100644 --- a/xmtp_api_d14n/src/protocol/order.rs +++ b/xmtp_api_d14n/src/protocol/order.rs @@ -62,7 +62,7 @@ where .flatten_ok() .collect::, EnvelopeError>>()?; let Resolved { - envelopes: resolved, + resolved, unresolved, } = resolver.resolve(cursors).await?; if resolved.is_empty() { @@ -86,10 +86,11 @@ where } } let mut i = 0; + // or, retain all resolved envelopes missing.retain(|_m| { - let could_not_resolve = to_remove.contains(&i); + let resolved = to_remove.contains(&i); i += 1; - !could_not_resolve + !resolved }); } // apply missing before resolved, so that the resolved @@ -103,4 +104,140 @@ where } } -// TODO: tests +#[cfg(test)] +mod test { + use super::*; + use crate::protocol::utils::test::{EnvelopesWithMissing, TestEnvelope, missing_dependencies}; + use futures::FutureExt; + use proptest::{prelude::*, sample::subsequence}; + use xmtp_proto::types::OriginatorId; + + // Simple mock resolver that holds available envelopes to resolve + #[derive(Clone, Debug)] + struct MockResolver { + available: Vec, + unavailable: Vec, + } + + #[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)] + #[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] + impl ResolveDependencies for MockResolver { + type ResolvedEnvelope = TestEnvelope; + + async fn resolve( + &self, + missing: HashSet, + ) -> Result, ResolutionError> { + // Return envelopes that match the missing set + let resolved = self + .available + .iter() + .filter(|env| { + let cursor = env.cursor().unwrap(); + let topic = env.topic().unwrap(); + missing.contains(&MissingEnvelope::new(topic, cursor)) + }) + .cloned() + .collect::>(); + + Ok(Resolved::new(resolved, None)) + } + } + + prop_compose! { + pub fn resolvable_dependencies(length: usize, originators: Vec) + (envelopes in missing_dependencies(length, originators)) + (available in subsequence(envelopes.removed.clone(), envelopes.removed.len()), envelopes in Just(envelopes)) + -> EnvelopesWithResolver { + let mut unavailable = envelopes.removed.clone(); + unavailable.retain(|e| !available.contains(e)); + EnvelopesWithResolver { + missing: envelopes, + resolver: MockResolver { + available, + unavailable + } + } + } + } + + #[derive(Debug, Clone)] + struct EnvelopesWithResolver { + missing: EnvelopesWithMissing, + resolver: MockResolver, + } + proptest! { + #[xmtp_common::test] + fn orders_with_unresolvable_dependencies( + envelopes in resolvable_dependencies(30, vec![10, 20, 30, 40, 50, 60]) + ) { + let EnvelopesWithResolver { + missing, + resolver + } = envelopes; + + let (available, unavailable) = (resolver.available.clone(), resolver.unavailable.clone()); + let mut ordered = Ordered::builder() + .envelopes(missing.envelopes) + .resolver(resolver) + .topic_cursor(TopicCursor::default()) + .build() + .unwrap(); + + // Perform ordering - some dependencies cannot be resolved + ordered.order().now_or_never() + .expect("Future should complete immediately") + .unwrap(); + + let (result, mut topic_cursor) = ordered.into_parts(); + + // Check that no envelope in the result depends on an unavailable removed envelope + for envelope in &result { + let depends_on = envelope.depends_on().unwrap().unwrap_or_default(); + let topic = envelope.topic().unwrap(); + let topic_clock = topic_cursor.get_or_default(&topic); + + // If this envelope's dependencies are satisfied by the topic cursor, + // it should not depend on any unavailable envelopes + if topic_clock.dominates(&depends_on) { + for unavailable_env in &unavailable { + prop_assert!( + !envelope.has_dependency_on(unavailable_env), + "Envelope with satisfied dependencies should not depend on unavailable envelope. \ + Envelope: {}, Unavailable: {}", + envelope, + unavailable_env + ); + } + } else { + panic!("topic clock should always be complete at conclusion of ordering. {} does not dominate envelope {} depending on {}", topic_clock, envelope.cursor, depends_on); + } + } + + // Verify that envelopes which were made available are in the result + // (unless they themselves depend on unavailable envelopes) + for available_env in &available { + // + if available_env.has_dependency_on_any(&unavailable) { continue; } + // none of the envelopes have a dependency on this one, so resolver wont care + if result.iter().all(|e| !e.has_dependency_on(available_env)) { continue; } + prop_assert!( + result.iter().any(|e| e == available_env), + "Result does not contain {}", available_env + ); + // If it's in the result, verify its dependencies are satisfied + let depends_on = available_env.depends_on().unwrap().unwrap_or_default(); + let topic = available_env.topic().unwrap(); + let topic_clock = topic_cursor.get_or_default(&topic); + + prop_assert!( + topic_clock.dominates(&depends_on), + "Available envelope in result should have satisfied dependencies. \ + Envelope: {}, Topic clock: {}", + available_env, + topic_clock + ); + } + } + } +} diff --git a/xmtp_api_d14n/src/protocol/resolve/network_backoff.rs b/xmtp_api_d14n/src/protocol/resolve/network_backoff.rs index bca90ee4ae..d4c599a790 100644 --- a/xmtp_api_d14n/src/protocol/resolve/network_backoff.rs +++ b/xmtp_api_d14n/src/protocol/resolve/network_backoff.rs @@ -80,7 +80,7 @@ where resolved.extend(envelopes); } Ok(Resolved { - envelopes: resolved, + resolved, unresolved: (!missing.is_empty()).then_some(missing), }) } diff --git a/xmtp_api_d14n/src/protocol/traits/dependency_resolution.rs b/xmtp_api_d14n/src/protocol/traits/dependency_resolution.rs index e28836ebfa..c0c72ff19f 100644 --- a/xmtp_api_d14n/src/protocol/traits/dependency_resolution.rs +++ b/xmtp_api_d14n/src/protocol/traits/dependency_resolution.rs @@ -7,7 +7,7 @@ use xmtp_proto::api::BodyError; use crate::protocol::{Envelope, EnvelopeError, types::MissingEnvelope}; pub struct Resolved { - pub envelopes: Vec, + pub resolved: Vec, /// list of envelopes that could not be resolved with this strategy pub unresolved: Option>, } @@ -15,7 +15,7 @@ pub struct Resolved { impl Resolved { pub fn new(envelopes: Vec, unresolved: Option>) -> Self { Self { - envelopes, + resolved: envelopes, unresolved, } } @@ -60,7 +60,7 @@ impl ResolveDependencies for NoopResolver { type ResolvedEnvelope = (); async fn resolve(&self, m: HashSet) -> Result, ResolutionError> { Ok(Resolved { - envelopes: vec![], + resolved: vec![], unresolved: Some(m), }) } diff --git a/xmtp_api_d14n/src/protocol/utils/test/dependency_resolution_test.rs b/xmtp_api_d14n/src/protocol/utils/test/dependency_resolution_test.rs index 91caa2e4ef..9be374fabb 100644 --- a/xmtp_api_d14n/src/protocol/utils/test/dependency_resolution_test.rs +++ b/xmtp_api_d14n/src/protocol/utils/test/dependency_resolution_test.rs @@ -32,7 +32,7 @@ pub async fn test_resolve_all_found_immediately( assert!(result.is_ok(), "Resolution should succeed"); let resolved = result.unwrap(); assert_eq!( - resolved.envelopes.len(), + resolved.resolved.len(), expected_count, "Should resolve exactly {} envelopes", expected_count @@ -76,7 +76,7 @@ pub async fn test_resolve_partial_resolution( let resolved = result.unwrap(); assert_eq!( - resolved.envelopes.len(), + resolved.resolved.len(), expected_resolved_count, "Should resolve exactly {} envelopes", expected_resolved_count @@ -127,7 +127,7 @@ where assert!(result.is_ok(), "Resolution should succeed with empty set"); let resolved = result.unwrap(); - assert!(resolved.envelopes.is_empty(), "Should resolve no envelopes"); + assert!(resolved.resolved.is_empty(), "Should resolve no envelopes"); assert!( resolved.unresolved.is_none() || resolved.unresolved.as_ref().unwrap().is_empty(), "Should have no unresolved envelopes" diff --git a/xmtp_api_d14n/src/protocol/utils/test/test_envelope.rs b/xmtp_api_d14n/src/protocol/utils/test/test_envelope.rs index f7cc503a50..3d71f0e600 100644 --- a/xmtp_api_d14n/src/protocol/utils/test/test_envelope.rs +++ b/xmtp_api_d14n/src/protocol/utils/test/test_envelope.rs @@ -20,6 +20,10 @@ impl TestEnvelope { let depends_on_sid = self.depends_on.get(&originator); depends_on_sid == other.cursor.sequence_id } + + pub fn has_dependency_on_any(&self, other: &[TestEnvelope]) -> bool { + other.iter().any(|e| self.has_dependency_on(e)) + } } impl std::fmt::Display for TestEnvelope { @@ -38,7 +42,11 @@ impl Envelope<'_> for TestEnvelope { } fn timestamp(&self) -> Option> { - unreachable!() + // Create a deterministic timestamp based on originator_id and sequence_id + // This ensures envelopes can be sorted by timestamp in tests + let nanos = (self.cursor.originator_id as i64 * 1_000_000_000) + + (self.cursor.sequence_id as i64 * 1000); + chrono::DateTime::from_timestamp_nanos(nanos).into() } fn client_envelope(&self) -> Result { diff --git a/xmtp_proto/src/api_client.rs b/xmtp_proto/src/api_client.rs index 55ddb52b01..74f914cd77 100644 --- a/xmtp_proto/src/api_client.rs +++ b/xmtp_proto/src/api_client.rs @@ -87,6 +87,8 @@ impl XmtpApi for T where T: XmtpMlsClient + XmtpIdentityClient + ?Sized {} /// Trait which for protobuf-generated type /// which can be paged. +/// Paged implementation indicates a response +/// that returns a collection of envelopes pub trait Paged: MaybeSend + MaybeSync { type Message: MaybeSend + MaybeSync; fn info(&self) -> &Option; diff --git a/xmtp_proto/src/traits/query.rs b/xmtp_proto/src/traits/query.rs index e48acd80c3..6bc4b73c48 100644 --- a/xmtp_proto/src/traits/query.rs +++ b/xmtp_proto/src/traits/query.rs @@ -27,7 +27,7 @@ pub(super) async fn request( #[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] impl Query for Q where - Q: QueryRaw + Endpoint, + Q: Endpoint, C: Client, ::Output: Default + prost::Message + 'static, {