diff --git a/crates/partition-store/src/deduplication_table/mod.rs b/crates/partition-store/src/deduplication_table/mod.rs index e6238e0431..957f9a9c2c 100644 --- a/crates/partition-store/src/deduplication_table/mod.rs +++ b/crates/partition-store/src/deduplication_table/mod.rs @@ -35,7 +35,7 @@ fn get_dedup_sequence_number( .partition_id(partition_id.into()) .producer_id(producer_id.clone()); - storage.get_value(key) + storage.get_value_proto(key) } impl ReadDeduplicationTable for PartitionStore { @@ -66,6 +66,6 @@ impl WriteDeduplicationTable for PartitionStoreTransaction<'_> { .partition_id(self.partition_id().into()) .producer_id(producer_id); - self.put_kv(key, dedup_sequence_number) + self.put_kv_proto(key, dedup_sequence_number) } } diff --git a/crates/partition-store/src/fsm_table/mod.rs b/crates/partition-store/src/fsm_table/mod.rs index d2cd599c2c..8fc3ac0ac9 100644 --- a/crates/partition-store/src/fsm_table/mod.rs +++ b/crates/partition-store/src/fsm_table/mod.rs @@ -17,6 +17,7 @@ use restate_types::SemanticRestateVersion; use restate_types::identifiers::PartitionId; use restate_types::logs::Lsn; use restate_types::message::MessageIndex; +use restate_types::schema::Schema; use crate::TableKind::PartitionStateMachine; use crate::keys::{KeyKind, define_table_key}; @@ -37,7 +38,10 @@ pub(crate) mod fsm_variable { pub(crate) const PARTITION_DURABILITY: u64 = 4; /// Schema versions are represented as a strictly monotonically increasing number. - pub(crate) const SCHEMA_VERSION: u64 = 5; + /// This represents the partition storage schema version, not the user services schema. + pub(crate) const STORAGE_VERSION: u64 = 5; + + pub(crate) const SERVICES_SCHEMA_METADATA: u64 = 6; } fn get( @@ -51,7 +55,7 @@ where let key = PartitionStateMachineKey::default() .partition_id(partition_id.into()) .state_id(state_id); - storage.get_value(key) + storage.get_value_proto(key) } /// Forces a read from persistent storage, bypassing memtables and block cache. 
@@ -78,7 +82,7 @@ fn put( let key = PartitionStateMachineKey::default() .partition_id(partition_id.into()) .state_id(state_id); - storage.put_kv(key, state_value) + storage.put_kv_proto(key, state_value) } pub async fn get_locally_durable_lsn(partition_store: &mut PartitionStore) -> Result> { @@ -90,15 +94,15 @@ pub async fn get_locally_durable_lsn(partition_store: &mut PartitionStore) -> Re .map(|opt| opt.map(|seq_number| Lsn::from(u64::from(seq_number)))) } -pub(crate) async fn get_schema_version( +pub(crate) async fn get_storage_version( storage: &mut S, partition_id: PartitionId, ) -> Result { - get::(storage, partition_id, fsm_variable::SCHEMA_VERSION) + get::(storage, partition_id, fsm_variable::STORAGE_VERSION) .map(|opt| opt.map(|s| s.0 as u16).unwrap_or_default()) } -pub(crate) async fn put_schema_version( +pub(crate) async fn put_storage_version( storage: &mut S, partition_id: PartitionId, last_executed_migration: u16, @@ -106,7 +110,7 @@ pub(crate) async fn put_schema_version( put( storage, partition_id, - fsm_variable::SCHEMA_VERSION, + fsm_variable::STORAGE_VERSION, &SequenceNumber::from(last_executed_migration as u64), ) } @@ -143,6 +147,13 @@ impl ReadFsmTable for PartitionStore { fsm_variable::PARTITION_DURABILITY, ) } + + async fn get_schema(&mut self) -> Result> { + let key = PartitionStateMachineKey::default() + .partition_id(self.partition_id().into()) + .state_id(fsm_variable::SERVICES_SCHEMA_METADATA); + self.get_value_storage_codec(key) + } } impl WriteFsmTable for PartitionStoreTransaction<'_> { @@ -190,4 +201,11 @@ impl WriteFsmTable for PartitionStoreTransaction<'_> { durability, ) } + + fn put_schema(&mut self, schema: &Schema) -> Result<()> { + let key = PartitionStateMachineKey::default() + .partition_id(self.partition_id().into()) + .state_id(fsm_variable::SERVICES_SCHEMA_METADATA); + self.put_kv_storage_codec(key, schema) + } } diff --git a/crates/partition-store/src/idempotency_table/mod.rs 
b/crates/partition-store/src/idempotency_table/mod.rs index ae9465618d..a25b450d95 100644 --- a/crates/partition-store/src/idempotency_table/mod.rs +++ b/crates/partition-store/src/idempotency_table/mod.rs @@ -57,7 +57,7 @@ fn get_idempotency_metadata( storage: &mut S, idempotency_id: &IdempotencyId, ) -> Result> { - storage.get_value(create_key(idempotency_id)) + storage.get_value_proto(create_key(idempotency_id)) } fn put_idempotency_metadata( @@ -65,7 +65,7 @@ fn put_idempotency_metadata( idempotency_id: &IdempotencyId, metadata: &IdempotencyMetadata, ) -> Result<()> { - storage.put_kv(create_key(idempotency_id), metadata) + storage.put_kv_proto(create_key(idempotency_id), metadata) } fn delete_idempotency_metadata( diff --git a/crates/partition-store/src/inbox_table/mod.rs b/crates/partition-store/src/inbox_table/mod.rs index fe8ade94ef..aa907ae856 100644 --- a/crates/partition-store/src/inbox_table/mod.rs +++ b/crates/partition-store/src/inbox_table/mod.rs @@ -156,7 +156,7 @@ impl WriteInboxTable for PartitionStoreTransaction<'_> { .service_key(service_id.key.clone()) .sequence_number(inbox_sequence_number); - self.put_kv(key, inbox_entry) + self.put_kv_proto(key, inbox_entry) } fn delete_inbox_entry(&mut self, service_id: &ServiceId, sequence_number: u64) -> Result<()> { diff --git a/crates/partition-store/src/invocation_status_table/mod.rs b/crates/partition-store/src/invocation_status_table/mod.rs index 8e3c95939d..9b42c83324 100644 --- a/crates/partition-store/src/invocation_status_table/mod.rs +++ b/crates/partition-store/src/invocation_status_table/mod.rs @@ -75,7 +75,7 @@ fn put_invocation_status( ) -> Result<()> { match status { InvocationStatus::Free => storage.delete_key(&create_invocation_status_key(invocation_id)), - _ => storage.put_kv(create_invocation_status_key(invocation_id), status), + _ => storage.put_kv_proto(create_invocation_status_key(invocation_id), status), } } @@ -86,7 +86,7 @@ fn get_invocation_status( let _x = 
RocksDbPerfGuard::new("get-invocation-status"); storage - .get_value::<_, InvocationStatus>(create_invocation_status_key(invocation_id)) + .get_value_proto::<_, InvocationStatus>(create_invocation_status_key(invocation_id)) .map(|value| { if let Some(invocation_status) = value { invocation_status diff --git a/crates/partition-store/src/journal_events/mod.rs b/crates/partition-store/src/journal_events/mod.rs index ad51339aad..5fb8939192 100644 --- a/crates/partition-store/src/journal_events/mod.rs +++ b/crates/partition-store/src/journal_events/mod.rs @@ -80,7 +80,7 @@ fn put_journal_event( lsn: u64, ) -> Result<()> { let (event_ty, event_value) = event.event.into_inner(); - storage.put_kv( + storage.put_kv_proto( write_journal_event_key( invocation_id, event_ty as u8, diff --git a/crates/partition-store/src/journal_table/mod.rs b/crates/partition-store/src/journal_table/mod.rs index 11c6b32109..8fc01482f4 100644 --- a/crates/partition-store/src/journal_table/mod.rs +++ b/crates/partition-store/src/journal_table/mod.rs @@ -56,7 +56,7 @@ fn put_journal_entry( ) -> Result<()> { let key = write_journal_entry_key(invocation_id, journal_index); - storage.put_kv(key, journal_entry) + storage.put_kv_proto(key, journal_entry) } fn get_journal_entry( @@ -66,7 +66,7 @@ fn get_journal_entry( ) -> Result> { let key = write_journal_entry_key(invocation_id, journal_index); - storage.get_value(key) + storage.get_value_proto(key) } fn get_journal( diff --git a/crates/partition-store/src/journal_table_v2/mod.rs b/crates/partition-store/src/journal_table_v2/mod.rs index de715ec9e8..b52fc5e7b5 100644 --- a/crates/partition-store/src/journal_table_v2/mod.rs +++ b/crates/partition-store/src/journal_table_v2/mod.rs @@ -81,7 +81,7 @@ fn put_journal_entry( related_completion_ids: &[CompletionId], ) -> Result<()> { if let RawEntry::Notification(notification) = &journal_entry.inner { - storage.put_kv( + storage.put_kv_proto( JournalNotificationIdToNotificationIndexKey::default() 
.partition_key(invocation_id.partition_key()) .invocation_uuid(invocation_id.invocation_uuid()) @@ -90,7 +90,7 @@ fn put_journal_entry( )?; } else if let RawEntry::Command(_) = &journal_entry.inner { for completion_id in related_completion_ids { - storage.put_kv( + storage.put_kv_proto( JournalCompletionIdToCommandIndexKey::default() .partition_key(invocation_id.partition_key()) .invocation_uuid(invocation_id.invocation_uuid()) @@ -100,7 +100,7 @@ fn put_journal_entry( } } - storage.put_kv( + storage.put_kv_proto( write_journal_entry_key(invocation_id, journal_index), &StoredEntry(journal_entry.clone()), ) @@ -112,7 +112,7 @@ fn get_journal_entry( journal_index: u32, ) -> Result> { let key = write_journal_entry_key(invocation_id, journal_index); - let opt: Option = storage.get_value(key)?; + let opt: Option = storage.get_value_proto(key)?; Ok(opt.map(|e| e.0)) } @@ -251,7 +251,7 @@ fn get_command_by_completion_id( .partition_key(invocation_id.partition_key()) .invocation_uuid(invocation_id.invocation_uuid()) .completion_id(completion_id); - let opt: Option = storage.get_value(completion_id_to_command_index)?; + let opt: Option = storage.get_value_proto(completion_id_to_command_index)?; if opt.is_none() { return Ok(None); } @@ -259,7 +259,7 @@ fn get_command_by_completion_id( // Now access the entry let journal_index = opt.unwrap().0; let key = write_journal_entry_key(&invocation_id, journal_index); - let opt: Option = storage.get_value(key)?; + let opt: Option = storage.get_value_proto(key)?; if opt.is_none() { return Ok(None); } @@ -287,7 +287,9 @@ fn has_completion( .partition_key(invocation_id.partition_key()) .invocation_uuid(invocation_id.invocation_uuid()) .notification_id(NotificationId::CompletionId(completion_id)); - Ok(storage.get_value::<_, JournalEntryIndex>(key)?.is_some()) + Ok(storage + .get_value_proto::<_, JournalEntryIndex>(key)? 
+ .is_some()) } impl ReadJournalTable for PartitionStore { diff --git a/crates/partition-store/src/outbox_table/mod.rs b/crates/partition-store/src/outbox_table/mod.rs index b4f22a1037..bea39e8e2b 100644 --- a/crates/partition-store/src/outbox_table/mod.rs +++ b/crates/partition-store/src/outbox_table/mod.rs @@ -39,7 +39,7 @@ fn add_message( .partition_id(partition_id.into()) .message_index(message_index); - storage.put_kv(key, outbox_message) + storage.put_kv_proto(key, outbox_message) } fn get_outbox_head_seq_number( @@ -103,7 +103,7 @@ fn get_outbox_message( .partition_id(partition_id.into()) .message_index(sequence_number); - storage.get_value(outbox_key) + storage.get_value_proto(outbox_key) } fn truncate_outbox( diff --git a/crates/partition-store/src/partition_store.rs b/crates/partition-store/src/partition_store.rs index b7b621c967..371cb13bc0 100644 --- a/crates/partition-store/src/partition_store.rs +++ b/crates/partition-store/src/partition_store.rs @@ -38,8 +38,10 @@ use restate_types::identifiers::{PartitionId, PartitionKey, SnapshotId, WithPart use restate_types::logs::Lsn; use restate_types::partitions::Partition; use restate_types::storage::StorageCodec; +use restate_types::storage::StorageDecode; +use restate_types::storage::StorageEncode; -use crate::fsm_table::{get_locally_durable_lsn, get_schema_version, put_schema_version}; +use crate::fsm_table::{get_locally_durable_lsn, get_storage_version, put_storage_version}; use crate::keys::KeyKind; use crate::keys::TableKey; use crate::migrations::{LATEST_VERSION, SchemaVersion}; @@ -679,7 +681,7 @@ impl PartitionStore { pub async fn verify_and_run_migrations(&mut self) -> Result<()> { let mut schema_version: SchemaVersion = - get_schema_version(self, self.partition_id()).await?.into(); + get_storage_version(self, self.partition_id()).await?.into(); if schema_version != LATEST_VERSION { // We need to run some migrations! 
debug!( @@ -687,7 +689,7 @@ impl PartitionStore { schema_version, LATEST_VERSION ); schema_version = schema_version.run_all_migrations(self).await?; - put_schema_version(self, self.partition_id(), schema_version as u16).await?; + put_storage_version(self, self.partition_id(), schema_version as u16).await?; } Ok(()) @@ -1071,7 +1073,19 @@ pub(crate) trait StorageAccess { } #[inline] - fn put_kv( + fn put_kv_proto( + &mut self, + key: K, + value: &V, + ) -> Result<()> { + self.put_kv_storage_codec( + key, + &ProtobufStorageWrapper::(value.clone().into()), + ) + } + + #[inline] + fn put_kv_storage_codec( &mut self, key: K, value: &V, @@ -1081,11 +1095,7 @@ pub(crate) trait StorageAccess { let key_buffer = key_buffer.split(); let value_buffer = self.cleared_value_buffer_mut(0); - StorageCodec::encode( - &ProtobufStorageWrapper::(value.clone().into()), - value_buffer, - ) - .map_err(|e| StorageError::Generic(e.into()))?; + StorageCodec::encode(value, value_buffer).map_err(|e| StorageError::Generic(e.into()))?; let value_buffer = value_buffer.split(); self.put_cf(K::TABLE, key_buffer, value_buffer) @@ -1101,29 +1111,39 @@ pub(crate) trait StorageAccess { } #[inline] - fn get_value(&mut self, key: K) -> Result> + fn get_value_proto(&mut self, key: K) -> Result> where K: TableKey, V: PartitionStoreProtobufValue, <::ProtobufType as TryInto>::Error: Into, + { + let value: Option> = + self.get_value_storage_codec(key)?; + + value + .map(|v| v.0.try_into()) + .transpose() + .map_err(|err| StorageError::Conversion(err.into())) + } + + #[inline] + fn get_value_storage_codec(&mut self, key: K) -> Result> + where + K: TableKey, + V: StorageDecode, { let mut buf = self.cleared_key_buffer_mut(key.serialized_length()); key.serialize_to(&mut buf); let buf = buf.split(); - match self.get(K::TABLE, &buf) { - Ok(value) => { - let slice = value.as_ref().map(|v| v.as_ref()); - - if let Some(mut slice) = slice { - Ok(Some(V::decode(&mut slice)?)) - } else { - Ok(None) - } - } - Err(err) => 
Err(err), - } + self.get(K::TABLE, &buf)? + .map(|value| { + let mut slice = value.as_ref(); + StorageCodec::decode(&mut slice) + }) + .transpose() + .map_err(|err| StorageError::Generic(err.into())) } /// Forces a read from persistent storage, bypassing memtables and block cache. diff --git a/crates/partition-store/src/promise_table/mod.rs b/crates/partition-store/src/promise_table/mod.rs index ee77e36768..726ada4daa 100644 --- a/crates/partition-store/src/promise_table/mod.rs +++ b/crates/partition-store/src/promise_table/mod.rs @@ -53,7 +53,7 @@ fn get_promise( key: &ByteString, ) -> Result> { let _x = RocksDbPerfGuard::new("get-promise"); - storage.get_value(create_key(service_id, key)) + storage.get_value_proto(create_key(service_id, key)) } fn put_promise( @@ -62,7 +62,7 @@ fn put_promise( key: &ByteString, metadata: &Promise, ) -> Result<()> { - storage.put_kv(create_key(service_id, key), metadata) + storage.put_kv_proto(create_key(service_id, key), metadata) } fn delete_all_promises(storage: &mut S, service_id: &ServiceId) -> Result<()> { diff --git a/crates/partition-store/src/service_status_table/mod.rs b/crates/partition-store/src/service_status_table/mod.rs index 2b774c5972..be91398bff 100644 --- a/crates/partition-store/src/service_status_table/mod.rs +++ b/crates/partition-store/src/service_status_table/mod.rs @@ -54,7 +54,7 @@ fn put_virtual_object_status( if *status == VirtualObjectStatus::Unlocked { storage.delete_key(&key) } else { - storage.put_kv(key, status) + storage.put_kv_proto(key, status) } } @@ -69,7 +69,7 @@ fn get_virtual_object_status( .service_key(service_id.key.clone()); storage - .get_value(key) + .get_value_proto(key) .map(|value| value.unwrap_or(VirtualObjectStatus::Unlocked)) } diff --git a/crates/partition-store/src/tests/invocation_status_table_test/mod.rs b/crates/partition-store/src/tests/invocation_status_table_test/mod.rs index 3f2ab8696d..baa3482d8a 100644 --- 
a/crates/partition-store/src/tests/invocation_status_table_test/mod.rs +++ b/crates/partition-store/src/tests/invocation_status_table_test/mod.rs @@ -33,7 +33,7 @@ use restate_types::invocation::{ }; use restate_types::time::MillisSinceEpoch; -use crate::fsm_table::get_schema_version; +use crate::fsm_table::get_storage_version; use crate::invocation_status_table::{InvocationStatusKey, InvocationStatusKeyV1}; use crate::migrations::{LATEST_VERSION, SchemaVersion}; use crate::partition_store::StorageAccess; @@ -211,7 +211,7 @@ async fn test_migration() { // Let's mock the old invocation statuses let mut txn = rocksdb.transaction(); for (invocation_id, status) in &mocked_invocations { - txn.put_kv( + txn.put_kv_proto( InvocationStatusKeyV1::default() .partition_key(invocation_id.partition_key()) .invocation_uuid(invocation_id.invocation_uuid()), @@ -226,7 +226,7 @@ async fn test_migration() { let partition_id = rocksdb.partition_id(); assert_eq!( SchemaVersion::from( - get_schema_version(&mut rocksdb, partition_id) + get_storage_version(&mut rocksdb, partition_id) .await .unwrap() ), diff --git a/crates/partition-store/src/timer_table/mod.rs b/crates/partition-store/src/timer_table/mod.rs index 5f6b9bfbfe..2bd3252e61 100644 --- a/crates/partition-store/src/timer_table/mod.rs +++ b/crates/partition-store/src/timer_table/mod.rs @@ -145,7 +145,7 @@ fn add_timer( ) -> Result<()> { let key = write_timer_key(partition_id, key); - storage.put_kv(key, timer) + storage.put_kv_proto(key, timer) } fn delete_timer( diff --git a/crates/storage-api/src/fsm_table/mod.rs b/crates/storage-api/src/fsm_table/mod.rs index 6da9c1097b..acdc6f2b5d 100644 --- a/crates/storage-api/src/fsm_table/mod.rs +++ b/crates/storage-api/src/fsm_table/mod.rs @@ -13,6 +13,7 @@ use std::future::Future; use restate_types::SemanticRestateVersion; use restate_types::logs::Lsn; use restate_types::message::MessageIndex; +use restate_types::schema::Schema; use restate_types::time::MillisSinceEpoch; use 
crate::Result; @@ -32,6 +33,8 @@ pub trait ReadFsmTable { fn get_partition_durability( &mut self, ) -> impl Future>> + Send + '_; + + fn get_schema(&mut self) -> impl Future>> + Send + '_; } pub trait WriteFsmTable { @@ -44,6 +47,8 @@ pub trait WriteFsmTable { fn put_min_restate_version(&mut self, version: &SemanticRestateVersion) -> Result<()>; fn put_partition_durability(&mut self, durability: &PartitionDurability) -> Result<()>; + + fn put_schema(&mut self, schema: &Schema) -> Result<()>; } #[derive(Debug, Clone, Copy, derive_more::From, derive_more::Into)]