Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions bindings_wasm/src/conversation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -808,6 +808,7 @@ mod tests {
originator_id: 0,
sequence_id: 0,
expire_at_ns: None,
published_in_epoch: None,
};
crate::to_value(&stored_message).unwrap();
}
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- This file should undo anything in `up.sql`
ALTER TABLE group_messages DROP COLUMN published_in_epoch;
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- Your SQL goes here
ALTER TABLE group_messages ADD COLUMN published_in_epoch BIGINT;

5 changes: 5 additions & 0 deletions xmtp_db/src/encrypted_store/group_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ pub struct StoredGroupMessage {
pub inserted_at_ns: i64,
/// Timestamp (in NS) after which the message must be deleted
pub expire_at_ns: Option<i64>,
/// the epoch this message was published in
pub published_in_epoch: Option<i64>,
}

impl StoredGroupMessage {
Expand Down Expand Up @@ -108,6 +110,8 @@ struct NewStoredGroupMessage {
pub sequence_id: i64,
// inserted_at_ns is NOT included - let database set it
pub expire_at_ns: Option<i64>,
/// the epoch this message was published in
pub published_in_epoch: Option<i64>,
}

impl From<&StoredGroupMessage> for NewStoredGroupMessage {
Expand All @@ -129,6 +133,7 @@ impl From<&StoredGroupMessage> for NewStoredGroupMessage {
originator_id: msg.originator_id,
sequence_id: msg.sequence_id,
expire_at_ns: msg.expire_at_ns,
published_in_epoch: msg.published_in_epoch,
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions xmtp_db/src/encrypted_store/group_message/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ impl TryFrom<GroupMessageSave> for StoredGroupMessage {
.unwrap_or(Originators::APPLICATION_MESSAGES.into()),
expire_at_ns: None,
inserted_at_ns: 0, // Will be set by database
published_in_epoch: value.published_in_epoch,
})
}
}
Expand Down Expand Up @@ -109,6 +110,7 @@ impl From<StoredGroupMessage> for GroupMessageSave {
reference_id: value.reference_id,
sequence_id: Some(value.sequence_id),
originator_id: Some(value.originator_id),
published_in_epoch: value.published_in_epoch,
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ fn generate_message_with_cursor(
sequence_id,
originator_id,
expire_at_ns: None,
published_in_epoch: None,
}
}

Expand Down
2 changes: 2 additions & 0 deletions xmtp_db/src/encrypted_store/group_message/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pub(crate) fn generate_message(
originator_id: 0,
expire_at_ns,
inserted_at_ns: 0, // Will be set by database
published_in_epoch: None,
}
}

Expand Down Expand Up @@ -608,6 +609,7 @@ pub(crate) fn generate_message_with_reference<C: ConnectionExt>(
originator_id: 0,
expire_at_ns: None,
inserted_at_ns: 0, // Will be set by database
published_in_epoch: None,
};
message.store(conn).unwrap();
message
Expand Down
1 change: 1 addition & 0 deletions xmtp_db/src/encrypted_store/schema_gen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ diesel::table! {
sequence_id -> BigInt,
inserted_at_ns -> BigInt,
expire_at_ns -> Nullable<BigInt>,
published_in_epoch -> Nullable<BigInt>,
}
}

Expand Down
1 change: 1 addition & 0 deletions xmtp_mls/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -797,6 +797,7 @@ where
originator_id: conversation_item.originator_id?,
expire_at_ns: None, //Question: do we need to include this in conversation last message?
inserted_at_ns: 0, // Not used for conversation list display
published_in_epoch: None
});
if msg.is_none() {
tracing::warn!("tried listing message, but message had missing fields so it was skipped");
Expand Down
2 changes: 2 additions & 0 deletions xmtp_mls/src/groups/message_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ mod tests {
originator_id: Originators::APPLICATION_MESSAGES.into(),
expire_at_ns: None,
inserted_at_ns: 0,
published_in_epoch: None,
}
}

Expand Down Expand Up @@ -132,6 +133,7 @@ mod tests {
originator_id: Originators::APPLICATION_MESSAGES.into(),
expire_at_ns: None,
inserted_at_ns: 0,
published_in_epoch: None,
}
}

Expand Down
9 changes: 8 additions & 1 deletion xmtp_mls/src/groups/mls_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -744,6 +744,8 @@ where
);

if let Some((staged_commit, validated_commit)) = commit {
let before_commit_applied_epoch = mls_group.epoch().as_u64();

tracing::info!(
"[{}] merging pending commit for intent {}",
self.context.inbox_id(),
Expand Down Expand Up @@ -799,6 +801,7 @@ where
envelope_timestamp_ns as u64,
*cursor,
storage,
before_commit_applied_epoch,
)
.map_err(|err| IntentResolutionError {
processing_error: err,
Expand Down Expand Up @@ -1083,6 +1086,7 @@ where
originator_id: cursor.originator_id as i64,
expire_at_ns: Self::get_message_expire_at_ns(mls_group),
inserted_at_ns: 0, // Will be set by database
published_in_epoch: Some(msg_epoch as i64),
};
message.store_or_ignore(&storage.db())?;
// make sure internal id is on return type after its stored successfully
Expand Down Expand Up @@ -1132,7 +1136,7 @@ where
let staged_commit = *staged_commit;
let validated_commit =
validated_commit.expect("Needs to be present when this is a staged commit");

let before_commit_apply_epoch = mls_group.epoch().as_u64();
tracing::info!(
inbox_id = self.context.inbox_id(),
sender_inbox_id = sender_inbox_id,
Expand Down Expand Up @@ -1195,6 +1199,7 @@ where
envelope_timestamp_ns as u64,
*cursor,
storage,
before_commit_apply_epoch,
)?;

// remove left/removed members from the pending_remove list
Expand Down Expand Up @@ -1953,6 +1958,7 @@ where
timestamp_ns: u64,
cursor: Cursor,
storage: &impl XmtpMlsStorageProvider,
published_in_epoch: u64,
) -> Result<Option<StoredGroupMessage>, GroupMessageProcessingError> {
if validated_commit.is_empty() {
return Ok(None);
Expand Down Expand Up @@ -2012,6 +2018,7 @@ where
originator_id: cursor.originator_id as i64,
expire_at_ns: None,
inserted_at_ns: 0, // Will be set by database
published_in_epoch: Some(published_in_epoch as i64),
};
msg.store_or_ignore(&storage.db())?;
Ok(Some(msg))
Expand Down
2 changes: 2 additions & 0 deletions xmtp_mls/src/groups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -721,6 +721,8 @@ where
originator_id: 0,
expire_at_ns: None,
inserted_at_ns: 0, // Will be set by database
// we haven't published it yet
published_in_epoch: None,
};
group_message.store(&self.context.db())?;

Expand Down
3 changes: 3 additions & 0 deletions xmtp_mls/src/groups/welcomes/xmtp_welcome.rs
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,9 @@ where
originator_id: Originators::MLS_COMMITS as i64,
expire_at_ns: None,
inserted_at_ns: 0, // Will be set by database
// when we get a welcome we join in the epoch we send messages
// so the commit the commit the welcome created is the _previous_ epoch
published_in_epoch: Some(mls_group.epoch().as_u64() as i64 - 1),
};

added_msg.store_or_ignore(&db)?;
Expand Down
1 change: 1 addition & 0 deletions xmtp_mls/src/test/mock/generate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,5 +128,6 @@ pub fn generate_stored_msg(cursor: Cursor, group_id: Vec<u8>) -> StoredGroupMess
originator_id: cursor.originator_id as i64,
expire_at_ns: None,
inserted_at_ns: 0,
published_in_epoch: None,
}
}
2 changes: 1 addition & 1 deletion xmtp_proto/proto_version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
c1b814649fcd7c87326c0da2c2af635ee571ce97
c1d3a3510bb1e91f937b250b52e98cf76f7e774e
Binary file modified xmtp_proto/src/gen/proto_descriptor.bin
Binary file not shown.
2 changes: 2 additions & 0 deletions xmtp_proto/src/gen/xmtp.device_sync.message_backup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ pub struct GroupMessageSave {
pub sequence_id: ::core::option::Option<i64>,
#[prost(int64, optional, tag = "15")]
pub originator_id: ::core::option::Option<i64>,
#[prost(int64, optional, tag = "16")]
pub published_in_epoch: ::core::option::Option<i64>,
}
impl ::prost::Name for GroupMessageSave {
const NAME: &'static str = "GroupMessageSave";
Expand Down
22 changes: 22 additions & 0 deletions xmtp_proto/src/gen/xmtp.device_sync.message_backup.serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,9 @@ impl serde::Serialize for GroupMessageSave {
if self.originator_id.is_some() {
len += 1;
}
if self.published_in_epoch.is_some() {
len += 1;
}
let mut struct_ser = serializer.serialize_struct("xmtp.device_sync.message_backup.GroupMessageSave", len)?;
if !self.id.is_empty() {
#[allow(clippy::needless_borrow)]
Expand Down Expand Up @@ -368,6 +371,11 @@ impl serde::Serialize for GroupMessageSave {
#[allow(clippy::needless_borrows_for_generic_args)]
struct_ser.serialize_field("originator_id", ToString::to_string(&v).as_str())?;
}
if let Some(v) = self.published_in_epoch.as_ref() {
#[allow(clippy::needless_borrow)]
#[allow(clippy::needless_borrows_for_generic_args)]
struct_ser.serialize_field("published_in_epoch", ToString::to_string(&v).as_str())?;
}
struct_ser.end()
}
}
Expand Down Expand Up @@ -406,6 +414,8 @@ impl<'de> serde::Deserialize<'de> for GroupMessageSave {
"sequenceId",
"originator_id",
"originatorId",
"published_in_epoch",
"publishedInEpoch",
];

#[allow(clippy::enum_variant_names)]
Expand All @@ -425,6 +435,7 @@ impl<'de> serde::Deserialize<'de> for GroupMessageSave {
ReferenceId,
SequenceId,
OriginatorId,
PublishedInEpoch,
__SkipField__,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
Expand Down Expand Up @@ -462,6 +473,7 @@ impl<'de> serde::Deserialize<'de> for GroupMessageSave {
"referenceId" | "reference_id" => Ok(GeneratedField::ReferenceId),
"sequenceId" | "sequence_id" => Ok(GeneratedField::SequenceId),
"originatorId" | "originator_id" => Ok(GeneratedField::OriginatorId),
"publishedInEpoch" | "published_in_epoch" => Ok(GeneratedField::PublishedInEpoch),
_ => Ok(GeneratedField::__SkipField__),
}
}
Expand Down Expand Up @@ -496,6 +508,7 @@ impl<'de> serde::Deserialize<'de> for GroupMessageSave {
let mut reference_id__ = None;
let mut sequence_id__ = None;
let mut originator_id__ = None;
let mut published_in_epoch__ = None;
while let Some(k) = map_.next_key()? {
match k {
GeneratedField::Id => {
Expand Down Expand Up @@ -608,6 +621,14 @@ impl<'de> serde::Deserialize<'de> for GroupMessageSave {
map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| x.0)
;
}
GeneratedField::PublishedInEpoch => {
if published_in_epoch__.is_some() {
return Err(serde::de::Error::duplicate_field("publishedInEpoch"));
}
published_in_epoch__ =
map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| x.0)
;
}
GeneratedField::__SkipField__ => {
let _ = map_.next_value::<serde::de::IgnoredAny>()?;
}
Expand All @@ -629,6 +650,7 @@ impl<'de> serde::Deserialize<'de> for GroupMessageSave {
reference_id: reference_id__,
sequence_id: sequence_id__,
originator_id: originator_id__,
published_in_epoch: published_in_epoch__,
})
}
}
Expand Down
100 changes: 50 additions & 50 deletions xmtp_proto/src/gen/xmtp.message_api.v1.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,54 @@
// This file is @generated by prost-build.
/// Token is used by clients to prove to the nodes
/// that they are serving a specific wallet.
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
pub struct Token {
/// identity key signed by a wallet
#[prost(message, optional, tag = "1")]
pub identity_key: ::core::option::Option<super::super::message_contents::PublicKey>,
/// encoded bytes of AuthData
#[prost(bytes = "vec", tag = "2")]
pub auth_data_bytes: ::prost::alloc::vec::Vec<u8>,
/// identity key signature of AuthData bytes
#[prost(message, optional, tag = "3")]
pub auth_data_signature: ::core::option::Option<
super::super::message_contents::Signature,
>,
}
impl ::prost::Name for Token {
const NAME: &'static str = "Token";
const PACKAGE: &'static str = "xmtp.message_api.v1";
fn full_name() -> ::prost::alloc::string::String {
"xmtp.message_api.v1.Token".into()
}
fn type_url() -> ::prost::alloc::string::String {
Comment on lines +2 to +24
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What can be done so that generation of these are stable and don't move around when we add more code?

Copy link
Contributor Author

@insipx insipx Nov 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we could try using https://docs.rs/tonic/latest/tonic/macro.include_proto.html

This would generate the protos into the rust OUT_DIR. the downside is first compile will be a bit longer as it needs to clone xmtp/proto and googleapis to generate the protos. but it will delete the entire gen/ directory, which will create smaller PRs.

at that point, i would argue for moving xmtp_proto, or at least just the "proto" part into xmtp/proto, then renaming xmtp_proto to xmtp_types or xmtp_primitives or something. Then we could get rid of the companion PR pattern and proto generation could just be a cargo update which makes versioning clearer (rather than ad-hoc commit refs we store in txt, cargo would take care of it in the lockfile)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we open an issue with prost. My guess is they're using some hash that is dependent on the contents of the protos and that's why things change around when a single field is added.

"/xmtp.message_api.v1.Token".into()
}
}
/// AuthData carries token parameters that are authenticated
/// by the identity key signature.
/// It is embedded in the Token structure as bytes
/// so that the bytes don't need to be reconstructed
/// to verify the token signature.
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
pub struct AuthData {
/// address of the wallet
#[prost(string, tag = "1")]
pub wallet_addr: ::prost::alloc::string::String,
/// time when the token was generated/signed
#[prost(uint64, tag = "2")]
pub created_ns: u64,
}
impl ::prost::Name for AuthData {
const NAME: &'static str = "AuthData";
const PACKAGE: &'static str = "xmtp.message_api.v1";
fn full_name() -> ::prost::alloc::string::String {
"xmtp.message_api.v1.AuthData".into()
}
fn type_url() -> ::prost::alloc::string::String {
"/xmtp.message_api.v1.AuthData".into()
}
}
/// This is based off of the go-waku Index type, but with the
/// receiverTime and pubsubTopic removed for simplicity.
/// Both removed fields are optional
Expand Down Expand Up @@ -718,53 +768,3 @@ pub mod message_api_server {
const NAME: &'static str = SERVICE_NAME;
}
}
/// Token is used by clients to prove to the nodes
/// that they are serving a specific wallet.
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
pub struct Token {
/// identity key signed by a wallet
#[prost(message, optional, tag = "1")]
pub identity_key: ::core::option::Option<super::super::message_contents::PublicKey>,
/// encoded bytes of AuthData
#[prost(bytes = "vec", tag = "2")]
pub auth_data_bytes: ::prost::alloc::vec::Vec<u8>,
/// identity key signature of AuthData bytes
#[prost(message, optional, tag = "3")]
pub auth_data_signature: ::core::option::Option<
super::super::message_contents::Signature,
>,
}
impl ::prost::Name for Token {
const NAME: &'static str = "Token";
const PACKAGE: &'static str = "xmtp.message_api.v1";
fn full_name() -> ::prost::alloc::string::String {
"xmtp.message_api.v1.Token".into()
}
fn type_url() -> ::prost::alloc::string::String {
"/xmtp.message_api.v1.Token".into()
}
}
/// AuthData carries token parameters that are authenticated
/// by the identity key signature.
/// It is embedded in the Token structure as bytes
/// so that the bytes don't need to be reconstructed
/// to verify the token signature.
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
pub struct AuthData {
/// address of the wallet
#[prost(string, tag = "1")]
pub wallet_addr: ::prost::alloc::string::String,
/// time when the token was generated/signed
#[prost(uint64, tag = "2")]
pub created_ns: u64,
}
impl ::prost::Name for AuthData {
const NAME: &'static str = "AuthData";
const PACKAGE: &'static str = "xmtp.message_api.v1";
fn full_name() -> ::prost::alloc::string::String {
"xmtp.message_api.v1.AuthData".into()
}
fn type_url() -> ::prost::alloc::string::String {
"/xmtp.message_api.v1.AuthData".into()
}
}
Loading
Loading