Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions bindings_wasm/src/content_types/text.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ impl From<xmtp_mls::messages::decoded_message::Text> for TextContent {
}
}

#[wasm_bindgen(js_name = "encodeText")]
#[wasm_bindgen(js_name = "encodeXmtpText")]
pub fn encode_text(text: String) -> Result<Uint8Array, JsError> {
// Use TextCodec to encode the text
let encoded = TextCodec::encode(text).map_err(|e| JsError::new(&format!("{}", e)))?;
Expand All @@ -33,7 +33,10 @@ pub fn encode_text(text: String) -> Result<Uint8Array, JsError> {
Ok(Uint8Array::from(buf.as_slice()))
}

#[wasm_bindgen(js_name = "decodeText")]
// `decode` conflicts with some wasm function in bindgen/tests/somewhere
// breaking `bindings_wasm` tests
// PR: https://github.com/xmtp/libxmtp/pull/2863
#[wasm_bindgen(js_name = "decodeXmtpText")]
pub fn decode_text(bytes: Uint8Array) -> Result<String, JsError> {
// Decode bytes into EncodedContent
let encoded_content = EncodedContent::decode(bytes.to_vec().as_slice())
Expand Down
3 changes: 2 additions & 1 deletion nix/libxmtp.nix
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
, vscode-extensions
, lldb
, wasm-tools
, rr
, ...
}:
let
Expand Down Expand Up @@ -117,5 +118,5 @@ mkShell {
++ lib.optionals isDarwin [
darwin.cctools
]
++ lib.optionals isLinux [ cargo-llvm-cov ];
++ lib.optionals isLinux [ cargo-llvm-cov rr ];
}
1 change: 1 addition & 0 deletions xmtp_db/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ wasm-bindgen = { workspace = true }


[dev-dependencies]
futures.workspace = true
futures-timer.workspace = true
mockall = { workspace = true }
rstest.workspace = true
Expand Down
161 changes: 161 additions & 0 deletions xmtp_db/src/encrypted_store/group_intent.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
use std::collections::HashMap;

use derive_builder::Builder;
use diesel::{
backend::Backend,
connection::DefaultLoadingMode,
deserialize::{self, FromSql, FromSqlRow},
expression::AsExpression,
prelude::*,
serialize::{self, IsNull, Output, ToSql},
sql_types::Integer,
};
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use xmtp_common::fmt;
use xmtp_proto::types::Cursor;
Expand All @@ -20,6 +24,12 @@ use super::{
use crate::{
Delete, NotFound, StorageError, group_message::QueryGroupMessage, impl_fetch, impl_store,
};

mod error;
mod types;
pub use error::*;
pub use types::*;

pub type ID = i32;

#[repr(i32)]
Expand Down Expand Up @@ -210,6 +220,12 @@ pub trait QueryGroupIntent {
payload_hash: &[u8],
) -> Result<Option<StoredGroupIntent>, StorageError>;

/// find the commit message refresh state for each intent payload hash
fn find_dependant_commits<P: AsRef<[u8]>>(
&self,
payload_hashes: &[P],
) -> Result<HashMap<PayloadHash, IntentDependency>, StorageError>;

fn increment_intent_publish_attempt_count(&self, intent_id: ID) -> Result<(), StorageError>;

fn set_group_intent_error_and_fail_msg(
Expand Down Expand Up @@ -283,6 +299,13 @@ where
(**self).find_group_intent_by_payload_hash(payload_hash)
}

fn find_dependant_commits<P: AsRef<[u8]>>(
&self,
payload_hashes: &[P],
) -> Result<HashMap<PayloadHash, IntentDependency>, StorageError> {
(**self).find_dependant_commits(payload_hashes)
}

fn increment_intent_publish_attempt_count(&self, intent_id: ID) -> Result<(), StorageError> {
(**self).increment_intent_publish_attempt_count(intent_id)
}
Expand Down Expand Up @@ -485,6 +508,75 @@ impl<C: ConnectionExt> QueryGroupIntent for DbConnection<C> {
Ok(result)
}

/// Find the commit message refresh state for each intent by payload hash.
/// Returns a map from payload hash to a vector of dependencies (one per originator).
fn find_dependant_commits<P: AsRef<[u8]>>(
&self,
payload_hashes: &[P],
) -> Result<HashMap<PayloadHash, IntentDependency>, StorageError> {
use super::schema::refresh_state;
use crate::encrypted_store::refresh_state::EntityKind;

let hashes = payload_hashes
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Empty payload_hashes leads to eq_any([])IN () on SQLite, causing a runtime error. Consider returning an empty map early when payload_hashes is empty.

+         if payload_hashes.is_empty() {
+             return Ok(HashMap::new());
+         }

🚀 Reply to ask Macroscope to explain or update this suggestion.

👍 Helpful? React to give us feedback.

.iter()
.map(|h| PayloadHashRef::from(h.as_ref()));

// Query all dependencies in a single database call
let map: HashMap<PayloadHash, Vec<IntentDependency>> = self.raw_query_read(|conn| {
dsl::group_intents
.filter(dsl::payload_hash.eq_any(hashes))
.inner_join(
refresh_state::table.on(refresh_state::entity_id
.eq(dsl::group_id)
.and(refresh_state::entity_kind.eq(EntityKind::CommitMessage))),
)
.select((
dsl::payload_hash.assume_not_null(),
refresh_state::sequence_id,
refresh_state::originator_id,
dsl::group_id,
))
.load_iter::<(Vec<u8>, i64, i32, Vec<u8>), DefaultLoadingMode>(conn)?
.map_ok(|(hash, sequence_id, originator_id, group_id)| {
(
PayloadHash::from(hash),
IntentDependency {
cursor: Cursor {
sequence_id: sequence_id as u64,
originator_id: originator_id as u32,
},
group_id: group_id.into(),
},
)
})
.process_results(|iter| iter.into_grouping_map().collect())
})?;

let map = map
.into_iter()
.map(|(hash, mut d)| {
if d.len() > 1 {
return Err(GroupIntentError::MoreThanOneDependency {
payload_hash: hash.clone(),
cursors: d.iter().map(|d| d.cursor).collect(),
group_id: d[0].group_id.clone(),
}
.into());
}

// this should be impossible since the sql query wouldnt return anything for
// an empty payload hash.
let dep = d
.pop()
.ok_or_else(|| GroupIntentError::NoDependencyFound { hash: hash.clone() })
.map_err(StorageError::from)?;
Ok::<_, StorageError>((hash, dep))
})
.try_collect()?;

Ok(map)
}

fn increment_intent_publish_attempt_count(&self, intent_id: ID) -> Result<(), StorageError> {
self.raw_query_write(|conn| {
diesel::update(dsl::group_intents)
Expand Down Expand Up @@ -922,4 +1014,73 @@ pub(crate) mod tests {
assert_eq!(intent.publish_attempts, 2);
})
}

#[xmtp_common::test]
fn test_find_dependant_commits() {
use crate::encrypted_store::refresh_state::{EntityKind, QueryRefreshState};

let group_id = rand_vec::<24>();
let payload_hash1 = rand_vec::<24>();
let payload_hash2 = rand_vec::<24>();

with_connection(|conn| {
insert_group(conn, group_id.clone());
NewGroupIntent::new(
IntentKind::SendMessage,
group_id.clone(),
rand_vec::<24>(),
false,
)
.store(conn)
.unwrap();

let intent1 = find_first_intent(conn, group_id.clone());
conn.set_group_intent_published(intent1.id, &payload_hash1, None, None, 1)
.unwrap();

NewGroupIntent::new(
IntentKind::KeyUpdate,
group_id.clone(),
rand_vec::<24>(),
false,
)
.store(conn)
.unwrap();
let intents = conn
.find_group_intents(group_id.clone(), None, None)
.unwrap();
let intent2 = intents.iter().find(|i| i.id != intent1.id).unwrap();
conn.set_group_intent_published(intent2.id, &payload_hash2, None, None, 1)
.unwrap();

conn.update_cursor(
group_id.clone(),
EntityKind::CommitMessage,
Cursor {
sequence_id: 100,
originator_id: 42,
},
)
.unwrap();

let result = conn
.find_dependant_commits(&[&payload_hash1, &payload_hash2])
.unwrap();

assert_eq!(result.len(), 2);
let dep1 = result
.get(&PayloadHash::from(payload_hash1.clone()))
.unwrap();
assert_eq!(dep1.cursor.sequence_id, 100);
assert_eq!(dep1.cursor.originator_id, 42);
assert_eq!(dep1.group_id.as_ref(), &group_id);

let dep2 = result
.get(&PayloadHash::from(payload_hash2.clone()))
.unwrap();
assert_eq!(dep2.cursor.sequence_id, 100);
assert_eq!(dep2.cursor.originator_id, 42);
assert_eq!(dep2.group_id.as_ref(), &group_id);
})
}
}
30 changes: 30 additions & 0 deletions xmtp_db/src/encrypted_store/group_intent/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
use thiserror::Error;
use xmtp_common::RetryableError;
use xmtp_proto::types::{CursorList, GroupId};

use crate::group_intent::PayloadHash;

#[derive(Debug, Error)]
pub enum GroupIntentError {
#[error(
"intent {} for group {group_id} has invalid dependencies={}. one message cannot have more than 1 dependency in same epoch",
hex::encode(payload_hash),
cursors
)]
MoreThanOneDependency {
payload_hash: PayloadHash,
cursors: CursorList,
group_id: GroupId,
},
#[error("intent with hash {hash} has no known dependencies")]
NoDependencyFound { hash: PayloadHash },
}

impl RetryableError for GroupIntentError {
fn is_retryable(&self) -> bool {
match self {
Self::MoreThanOneDependency { .. } => true,
Self::NoDependencyFound { .. } => true,
}
}
}
72 changes: 72 additions & 0 deletions xmtp_db/src/encrypted_store/group_intent/types.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
use std::borrow::Cow;
use std::fmt::Debug;
use std::fmt::Display;
use std::ops::Deref;

use diesel::expression::AsExpression;
use diesel::serialize;
use diesel::serialize::Output;
use diesel::serialize::ToSql;
use diesel::sql_types::Binary;
use diesel::sqlite::Sqlite;
use xmtp_proto::types::Cursor;
use xmtp_proto::types::GroupId;

#[derive(Debug, PartialEq, Clone)]
pub struct IntentDependency {
pub cursor: Cursor,
pub group_id: GroupId,
}

pub type PayloadHash = PayloadHashRef<'static>;

#[derive(Hash, Clone, Eq, PartialEq, AsExpression)]
#[diesel(sql_type = Binary)]
pub struct PayloadHashRef<'a>(Cow<'a, [u8]>);

impl Deref for PayloadHash {
type Target = [u8];
fn deref(&self) -> &Self::Target {
&self.0
}
}

impl<T> AsRef<T> for PayloadHash
where
T: ?Sized,
<PayloadHash as Deref>::Target: AsRef<T>,
{
fn as_ref(&self) -> &T {
self.deref().as_ref()
}
}

impl ToSql<Binary, Sqlite> for PayloadHashRef<'_> {
fn to_sql<'b>(&'b self, out: &mut Output<'b, '_, Sqlite>) -> serialize::Result {
<Cow<'_, [u8]> as ToSql<Binary, Sqlite>>::to_sql(&self.0, out)
}
}

impl<'a> Debug for PayloadHashRef<'a> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", hex::encode(&self.0))
}
}

impl<'a> Display for PayloadHashRef<'a> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", hex::encode(&self.0))
}
}

impl From<Vec<u8>> for PayloadHash {
fn from(value: Vec<u8>) -> PayloadHash {
PayloadHashRef(Cow::from(value))
}
}

impl<'a> From<&'a [u8]> for PayloadHashRef<'a> {
fn from(value: &'a [u8]) -> Self {
PayloadHashRef(Cow::from(value))
}
}
5 changes: 5 additions & 0 deletions xmtp_db/src/errors.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use diesel::result::DatabaseErrorKind;
use thiserror::Error;

use crate::group_intent::GroupIntentError;

use super::{
refresh_state::EntityKind,
sql_key_store::{self, SqlKeyStoreError},
Expand Down Expand Up @@ -42,6 +44,8 @@ pub enum StorageError {
Connection(#[from] crate::ConnectionError),
#[error("HMAC key must be 42 bytes")]
InvalidHmacLength,
#[error(transparent)]
GroupIntent(#[from] GroupIntentError),
}

impl From<std::convert::Infallible> for StorageError {
Expand Down Expand Up @@ -151,6 +155,7 @@ impl RetryableError for StorageError {
Self::OpenMlsStorage(storage) => retryable!(storage),
Self::Platform(p) => retryable!(p),
Self::Connection(e) => retryable!(e),
Self::GroupIntent(e) => retryable!(e),
Self::MigrationError(_)
| Self::Conversion(_)
| Self::NotFound(_)
Expand Down
6 changes: 6 additions & 0 deletions xmtp_db/src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,12 @@ mock! {
payload_hash: &[u8],
) -> Result<Option<crate::group_intent::StoredGroupIntent>, StorageError>;

#[mockall::concretize]
fn find_dependant_commits<P: AsRef<[u8]>>(
&self,
payload_hashes: &[P],
) -> Result<HashMap<crate::group_intent::PayloadHash, crate::group_intent::IntentDependency>, StorageError>;

fn increment_intent_publish_attempt_count(
&self,
intent_id: crate::group_intent::ID,
Expand Down
Loading
Loading