diff --git a/bindings_wasm/src/content_types/text.rs b/bindings_wasm/src/content_types/text.rs index d76ec54945..aef5674dac 100644 --- a/bindings_wasm/src/content_types/text.rs +++ b/bindings_wasm/src/content_types/text.rs @@ -19,7 +19,7 @@ impl From for TextContent { } } -#[wasm_bindgen(js_name = "encodeText")] +#[wasm_bindgen(js_name = "encodeXmtpText")] pub fn encode_text(text: String) -> Result { // Use TextCodec to encode the text let encoded = TextCodec::encode(text).map_err(|e| JsError::new(&format!("{}", e)))?; @@ -33,7 +33,10 @@ pub fn encode_text(text: String) -> Result { Ok(Uint8Array::from(buf.as_slice())) } -#[wasm_bindgen(js_name = "decodeText")] +// `decode` conflicts with some wasm function in bindgen/tests/somewhere +// breaking `bindings_wasm` tests +// PR: https://github.com/xmtp/libxmtp/pull/2863 +#[wasm_bindgen(js_name = "decodeXmtpText")] pub fn decode_text(bytes: Uint8Array) -> Result { // Decode bytes into EncodedContent let encoded_content = EncodedContent::decode(bytes.to_vec().as_slice()) diff --git a/nix/libxmtp.nix b/nix/libxmtp.nix index e096404069..4fdc867545 100644 --- a/nix/libxmtp.nix +++ b/nix/libxmtp.nix @@ -42,6 +42,7 @@ , vscode-extensions , lldb , wasm-tools +, rr , ... }: let @@ -117,5 +118,5 @@ mkShell { ++ lib.optionals isDarwin [ darwin.cctools ] - ++ lib.optionals isLinux [ cargo-llvm-cov ]; + ++ lib.optionals isLinux [ cargo-llvm-cov rr ]; } diff --git a/xmtp_db/Cargo.toml b/xmtp_db/Cargo.toml index 6bcf60a733..09018274f4 100644 --- a/xmtp_db/Cargo.toml +++ b/xmtp_db/Cargo.toml @@ -75,6 +75,7 @@ wasm-bindgen = { workspace = true } [dev-dependencies] +futures.workspace = true futures-timer.workspace = true mockall = { workspace = true } rstest.workspace = true diff --git a/xmtp_db/src/encrypted_store/group_intent.rs b/xmtp_db/src/encrypted_store/group_intent.rs index 75c6b82105..951f2d8c3e 100644 --- a/xmtp_db/src/encrypted_store/group_intent.rs +++ b/xmtp_db/src/encrypted_store/group_intent.rs @@ -1,12 +1,16 @@ +use std::collections::HashMap; + use derive_builder::Builder; use diesel::{ backend::Backend, + connection::DefaultLoadingMode, deserialize::{self, FromSql, FromSqlRow}, expression::AsExpression, prelude::*, serialize::{self, IsNull, Output, ToSql}, sql_types::Integer, }; +use itertools::Itertools; use serde::{Deserialize, Serialize}; use xmtp_common::fmt; use xmtp_proto::types::Cursor; @@ -20,6 +24,12 @@ use super::{ use crate::{ Delete, NotFound, StorageError, group_message::QueryGroupMessage, impl_fetch, impl_store, }; + +mod error; +mod types; +pub use error::*; +pub use types::*; + pub type ID = i32; #[repr(i32)] @@ -210,6 +220,12 @@ pub trait QueryGroupIntent { payload_hash: &[u8], ) -> Result, StorageError>; + /// find the commit message refresh state for each intent payload hash + fn find_dependant_commits>( + &self, + payload_hashes: &[P], + ) -> Result, StorageError>; + fn increment_intent_publish_attempt_count(&self, intent_id: ID) -> Result<(), StorageError>; fn set_group_intent_error_and_fail_msg( @@ -283,6 +299,13 @@ where (**self).find_group_intent_by_payload_hash(payload_hash) } + fn find_dependant_commits>( + &self, + payload_hashes: &[P], + ) -> Result, StorageError> { + (**self).find_dependant_commits(payload_hashes) + } + fn increment_intent_publish_attempt_count(&self, intent_id: ID) -> Result<(), StorageError> { (**self).increment_intent_publish_attempt_count(intent_id) } @@ -485,6 +508,75 @@ impl QueryGroupIntent for DbConnection { Ok(result) } + /// Find the commit message refresh state for each intent by payload hash. + /// Returns a map from payload hash to a vector of dependencies (one per originator). + fn find_dependant_commits>( + &self, + payload_hashes: &[P], + ) -> Result, StorageError> { + use super::schema::refresh_state; + use crate::encrypted_store::refresh_state::EntityKind; + + let hashes = payload_hashes + .iter() + .map(|h| PayloadHashRef::from(h.as_ref())); + + // Query all dependencies in a single database call + let map: HashMap> = self.raw_query_read(|conn| { + dsl::group_intents + .filter(dsl::payload_hash.eq_any(hashes)) + .inner_join( + refresh_state::table.on(refresh_state::entity_id + .eq(dsl::group_id) + .and(refresh_state::entity_kind.eq(EntityKind::CommitMessage))), + ) + .select(( + dsl::payload_hash.assume_not_null(), + refresh_state::sequence_id, + refresh_state::originator_id, + dsl::group_id, + )) + .load_iter::<(Vec, i64, i32, Vec), DefaultLoadingMode>(conn)? + .map_ok(|(hash, sequence_id, originator_id, group_id)| { + ( + PayloadHash::from(hash), + IntentDependency { + cursor: Cursor { + sequence_id: sequence_id as u64, + originator_id: originator_id as u32, + }, + group_id: group_id.into(), + }, + ) + }) + .process_results(|iter| iter.into_grouping_map().collect()) + })?; + + let map = map + .into_iter() + .map(|(hash, mut d)| { + if d.len() > 1 { + return Err(GroupIntentError::MoreThanOneDependency { + payload_hash: hash.clone(), + cursors: d.iter().map(|d| d.cursor).collect(), + group_id: d[0].group_id.clone(), + } + .into()); + } + + // this should be impossible since the sql query wouldnt return anything for + // an empty payload hash. + let dep = d + .pop() + .ok_or_else(|| GroupIntentError::NoDependencyFound { hash: hash.clone() }) + .map_err(StorageError::from)?; + Ok::<_, StorageError>((hash, dep)) + }) + .try_collect()?; + + Ok(map) + } + fn increment_intent_publish_attempt_count(&self, intent_id: ID) -> Result<(), StorageError> { self.raw_query_write(|conn| { diesel::update(dsl::group_intents) @@ -922,4 +1014,73 @@ pub(crate) mod tests { assert_eq!(intent.publish_attempts, 2); }) } + + #[xmtp_common::test] + fn test_find_dependant_commits() { + use crate::encrypted_store::refresh_state::{EntityKind, QueryRefreshState}; + + let group_id = rand_vec::<24>(); + let payload_hash1 = rand_vec::<24>(); + let payload_hash2 = rand_vec::<24>(); + + with_connection(|conn| { + insert_group(conn, group_id.clone()); + NewGroupIntent::new( + IntentKind::SendMessage, + group_id.clone(), + rand_vec::<24>(), + false, + ) + .store(conn) + .unwrap(); + + let intent1 = find_first_intent(conn, group_id.clone()); + conn.set_group_intent_published(intent1.id, &payload_hash1, None, None, 1) + .unwrap(); + + NewGroupIntent::new( + IntentKind::KeyUpdate, + group_id.clone(), + rand_vec::<24>(), + false, + ) + .store(conn) + .unwrap(); + let intents = conn + .find_group_intents(group_id.clone(), None, None) + .unwrap(); + let intent2 = intents.iter().find(|i| i.id != intent1.id).unwrap(); + conn.set_group_intent_published(intent2.id, &payload_hash2, None, None, 1) + .unwrap(); + + conn.update_cursor( + group_id.clone(), + EntityKind::CommitMessage, + Cursor { + sequence_id: 100, + originator_id: 42, + }, + ) + .unwrap(); + + let result = conn + .find_dependant_commits(&[&payload_hash1, &payload_hash2]) + .unwrap(); + + assert_eq!(result.len(), 2); + let dep1 = result + .get(&PayloadHash::from(payload_hash1.clone())) + .unwrap(); + assert_eq!(dep1.cursor.sequence_id, 100); + assert_eq!(dep1.cursor.originator_id, 42); + assert_eq!(dep1.group_id.as_ref(), &group_id); + + let dep2 = result + .get(&PayloadHash::from(payload_hash2.clone())) + .unwrap(); + assert_eq!(dep2.cursor.sequence_id, 100); + assert_eq!(dep2.cursor.originator_id, 42); + assert_eq!(dep2.group_id.as_ref(), &group_id); + }) + } } diff --git a/xmtp_db/src/encrypted_store/group_intent/error.rs b/xmtp_db/src/encrypted_store/group_intent/error.rs new file mode 100644 index 0000000000..0d20541ee0 --- /dev/null +++ b/xmtp_db/src/encrypted_store/group_intent/error.rs @@ -0,0 +1,30 @@ +use thiserror::Error; +use xmtp_common::RetryableError; +use xmtp_proto::types::{CursorList, GroupId}; + +use crate::group_intent::PayloadHash; + +#[derive(Debug, Error)] +pub enum GroupIntentError { + #[error( + "intent {} for group {group_id} has invalid dependencies={}. one message cannot have more than 1 dependency in same epoch", + hex::encode(payload_hash), + cursors + )] + MoreThanOneDependency { + payload_hash: PayloadHash, + cursors: CursorList, + group_id: GroupId, + }, + #[error("intent with hash {hash} has no known dependencies")] + NoDependencyFound { hash: PayloadHash }, +} + +impl RetryableError for GroupIntentError { + fn is_retryable(&self) -> bool { + match self { + Self::MoreThanOneDependency { .. } => true, + Self::NoDependencyFound { .. } => true, + } + } +} diff --git a/xmtp_db/src/encrypted_store/group_intent/types.rs b/xmtp_db/src/encrypted_store/group_intent/types.rs new file mode 100644 index 0000000000..e33c4f4491 --- /dev/null +++ b/xmtp_db/src/encrypted_store/group_intent/types.rs @@ -0,0 +1,72 @@ +use std::borrow::Cow; +use std::fmt::Debug; +use std::fmt::Display; +use std::ops::Deref; + +use diesel::expression::AsExpression; +use diesel::serialize; +use diesel::serialize::Output; +use diesel::serialize::ToSql; +use diesel::sql_types::Binary; +use diesel::sqlite::Sqlite; +use xmtp_proto::types::Cursor; +use xmtp_proto::types::GroupId; + +#[derive(Debug, PartialEq, Clone)] +pub struct IntentDependency { + pub cursor: Cursor, + pub group_id: GroupId, +} + +pub type PayloadHash = PayloadHashRef<'static>; + +#[derive(Hash, Clone, Eq, PartialEq, AsExpression)] +#[diesel(sql_type = Binary)] +pub struct PayloadHashRef<'a>(Cow<'a, [u8]>); + +impl Deref for PayloadHash { + type Target = [u8]; + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl AsRef for PayloadHash +where + T: ?Sized, + ::Target: AsRef, +{ + fn as_ref(&self) -> &T { + self.deref().as_ref() + } +} + +impl ToSql for PayloadHashRef<'_> { + fn to_sql<'b>(&'b self, out: &mut Output<'b, '_, Sqlite>) -> serialize::Result { + as ToSql>::to_sql(&self.0, out) + } +} + +impl<'a> Debug for PayloadHashRef<'a> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", hex::encode(&self.0)) + } +} + +impl<'a> Display for PayloadHashRef<'a> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", hex::encode(&self.0)) + } +} + +impl From> for PayloadHash { + fn from(value: Vec) -> PayloadHash { + PayloadHashRef(Cow::from(value)) + } +} + +impl<'a> From<&'a [u8]> for PayloadHashRef<'a> { + fn from(value: &'a [u8]) -> Self { + PayloadHashRef(Cow::from(value)) + } +} diff --git a/xmtp_db/src/errors.rs b/xmtp_db/src/errors.rs index 55449ab35c..4d0ed4cb62 100644 --- a/xmtp_db/src/errors.rs +++ b/xmtp_db/src/errors.rs @@ -1,6 +1,8 @@ use diesel::result::DatabaseErrorKind; use thiserror::Error; +use crate::group_intent::GroupIntentError; + use super::{ refresh_state::EntityKind, sql_key_store::{self, SqlKeyStoreError}, @@ -42,6 +44,8 @@ pub enum StorageError { Connection(#[from] crate::ConnectionError), #[error("HMAC key must be 42 bytes")] InvalidHmacLength, + #[error(transparent)] + GroupIntent(#[from] GroupIntentError), } impl From for StorageError { @@ -151,6 +155,7 @@ impl RetryableError for StorageError { Self::OpenMlsStorage(storage) => retryable!(storage), Self::Platform(p) => retryable!(p), Self::Connection(e) => retryable!(e), + Self::GroupIntent(e) => retryable!(e), Self::MigrationError(_) | Self::Conversion(_) | Self::NotFound(_) diff --git a/xmtp_db/src/mock.rs b/xmtp_db/src/mock.rs index 64d58c7c36..0b36428f0f 100644 --- a/xmtp_db/src/mock.rs +++ b/xmtp_db/src/mock.rs @@ -320,6 +320,12 @@ mock! { payload_hash: &[u8], ) -> Result, StorageError>; + #[mockall::concretize] + fn find_dependant_commits>( + &self, + payload_hashes: &[P], + ) -> Result, StorageError>; + fn increment_intent_publish_attempt_count( &self, intent_id: crate::group_intent::ID, diff --git a/xmtp_proto/src/types.rs b/xmtp_proto/src/types.rs index df885e9e5d..8e4bc88510 100644 --- a/xmtp_proto/src/types.rs +++ b/xmtp_proto/src/types.rs @@ -2,6 +2,7 @@ mod api_identifier; mod app_version; mod cursor; +mod cursor_list; mod global_cursor; mod group_message; mod ids; @@ -12,6 +13,7 @@ mod welcome_message; pub use api_identifier::*; pub use app_version::*; pub use cursor::*; +pub use cursor_list::*; pub use global_cursor::*; pub use group_message::*; pub use ids::*; diff --git a/xmtp_proto/src/types/cursor_list.rs b/xmtp_proto/src/types/cursor_list.rs new file mode 100644 index 0000000000..3c1c02de08 --- /dev/null +++ b/xmtp_proto/src/types/cursor_list.rs @@ -0,0 +1,47 @@ +//! A list of cursors +//! this is mostly for readable the Display implementation + +use std::ops::Deref; + +use crate::types::Cursor; + +/// A owned list of [`Cursor`] +#[derive(Debug, Clone)] +pub struct CursorList { + inner: Vec, +} + +impl CursorList { + pub fn new(cursors: Vec) -> Self { + Self { inner: cursors } + } +} + +impl From> for CursorList { + fn from(value: Vec) -> CursorList { + CursorList { inner: value } + } +} + +impl Deref for CursorList { + type Target = [Cursor]; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl std::fmt::Display for CursorList { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + for c in &self.inner { + write!(f, "{}", c)?; + } + Ok(()) + } +} + +impl FromIterator for CursorList { + fn from_iter>(iter: T) -> Self { + CursorList::new(Vec::from_iter(iter)) + } +}