diff --git a/Cargo.lock b/Cargo.lock index b806567bf1..384fcd83ef 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7929,6 +7929,7 @@ dependencies = [ "bytestring", "enum-map", "prost 0.14.1", + "restate-encoding", "restate-invoker-api", "restate-storage-api", "restate-types", diff --git a/crates/encoding/src/bilrost_encodings/mod.rs b/crates/encoding/src/bilrost_encodings/mod.rs index 1ec3c82186..8cec053861 100644 --- a/crates/encoding/src/bilrost_encodings/mod.rs +++ b/crates/encoding/src/bilrost_encodings/mod.rs @@ -12,6 +12,7 @@ mod arc_encodings; mod nonzero; +mod phantom_data; mod range; pub mod display_from_str; diff --git a/crates/encoding/src/bilrost_encodings/phantom_data.rs b/crates/encoding/src/bilrost_encodings/phantom_data.rs new file mode 100644 index 0000000000..fbfa99ee87 --- /dev/null +++ b/crates/encoding/src/bilrost_encodings/phantom_data.rs @@ -0,0 +1,56 @@ +// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use std::marker::PhantomData; + +use bilrost::{ + DecodeErrorKind, + encoding::{EmptyState, ForOverwrite, Proxiable}, +}; + +use crate::bilrost_encodings::RestateEncoding; + +struct PhantomDataTag; + +impl Proxiable for PhantomData { + type Proxy = (); + + fn encode_proxy(&self) -> Self::Proxy {} + + fn decode_proxy(&mut self, _: Self::Proxy) -> Result<(), DecodeErrorKind> { + Ok(()) + } +} + +impl ForOverwrite> for () { + fn for_overwrite() -> PhantomData { + PhantomData + } +} + +impl EmptyState> for () { + fn empty() -> PhantomData { + PhantomData + } + + fn is_empty(_: &PhantomData) -> bool { + true + } + + fn clear(_: &mut PhantomData) {} +} + +bilrost::delegate_proxied_encoding!( + use encoding (::bilrost::encoding::General) + to encode proxied type (PhantomData) + using proxy tag (PhantomDataTag) + with encoding (RestateEncoding) + with generics (T) +); diff --git a/crates/invoker-api/src/effects.rs b/crates/invoker-api/src/effects.rs index ad0ccf0cb7..4e12bff18c 100644 --- a/crates/invoker-api/src/effects.rs +++ b/crates/invoker-api/src/effects.rs @@ -16,11 +16,11 @@ use restate_types::invocation::InvocationEpoch; use restate_types::journal::EntryIndex; use restate_types::journal::enriched::EnrichedRawEntry; use restate_types::journal_events::raw::RawEvent; -use restate_types::journal_v2; use restate_types::journal_v2::CommandIndex; use restate_types::journal_v2::raw::RawEntry; use restate_types::storage::{StoredRawEntry, StoredRawEntryHeader}; use restate_types::time::MillisSinceEpoch; +use restate_types::{flexbuffers_storage_encode_decode, journal_v2}; use std::collections::HashSet; #[derive(Debug, Clone, PartialEq, Eq)] @@ -35,6 +35,8 @@ pub struct Effect { pub kind: EffectKind, } +flexbuffers_storage_encode_decode!(Effect); + #[derive(Debug, Clone, PartialEq, Eq)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] // todo: fix this and box the large variant (EffectKind is 320 bytes) diff --git a/crates/types/src/identifiers.rs b/crates/types/src/identifiers.rs index 1bee3c5877..634c5e0ba3 100644 --- a/crates/types/src/identifiers.rs +++ b/crates/types/src/identifiers.rs @@ -349,15 +349,26 @@ impl From for opentelemetry::trace::SpanId { /// Services are isolated by key. This means that there cannot be two concurrent /// invocations for the same service instance (service name, key). #[derive( - Eq, Hash, PartialEq, PartialOrd, Ord, Clone, Debug, serde::Serialize, serde::Deserialize, + Eq, + Hash, + PartialEq, + PartialOrd, + Ord, + Clone, + Debug, + serde::Serialize, + serde::Deserialize, + bilrost::Message, )] pub struct ServiceId { // TODO rename this to KeyedServiceId. This type can be used only by keyed service types (virtual objects and workflows) /// Identifies the grpc service + #[bilrost(1)] pub service_name: ByteString, /// Identifies the service instance for the given service name + #[bilrost(2)] pub key: ByteString, - + #[bilrost(3)] partition_key: PartitionKey, } diff --git a/crates/types/src/invocation/mod.rs b/crates/types/src/invocation/mod.rs index 90fc59722a..17f4ecd2b1 100644 --- a/crates/types/src/invocation/mod.rs +++ b/crates/types/src/invocation/mod.rs @@ -19,7 +19,10 @@ use crate::identifiers::{ }; use crate::journal_v2::{CompletionId, GetInvocationOutputResult, Signal}; use crate::time::MillisSinceEpoch; -use crate::{GenerationalNodeId, RestateVersion}; +use crate::{ + GenerationalNodeId, RestateVersion, bilrost_storage_encode_decode, + flexbuffers_storage_encode_decode, +}; use bytes::Bytes; use bytestring::ByteString; @@ -435,6 +438,8 @@ pub struct ServiceInvocation { pub restate_version: RestateVersion, } +flexbuffers_storage_encode_decode!(ServiceInvocation); + #[derive(Debug, Copy, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] #[serde( from = "serde_hacks::SubmitNotificationSink", @@ -577,6 +582,8 @@ pub struct InvocationResponse { pub result: ResponseResult, } +flexbuffers_storage_encode_decode!(InvocationResponse); + impl WithInvocationId for InvocationResponse { fn invocation_id(&self) -> InvocationId { self.target.invocation_id() @@ -623,6 +630,8 @@ pub struct GetInvocationOutputResponse { pub result: GetInvocationOutputResult, } +bilrost_storage_encode_decode!(GetInvocationOutputResponse); + impl WithInvocationId for GetInvocationOutputResponse { fn invocation_id(&self) -> InvocationId { self.target.invocation_id() @@ -944,6 +953,8 @@ pub struct InvocationTermination { pub response_sink: Option, } +flexbuffers_storage_encode_decode!(InvocationTermination); + /// Flavor of the termination. Can be kill (hard stop) or graceful cancel. #[derive( Debug, Clone, Copy, Eq, PartialEq, serde::Serialize, serde::Deserialize, bilrost::Enumeration, @@ -963,6 +974,8 @@ pub struct PurgeInvocationRequest { pub response_sink: Option, } +flexbuffers_storage_encode_decode!(PurgeInvocationRequest); + impl WithInvocationId for PurgeInvocationRequest { fn invocation_id(&self) -> InvocationId { self.invocation_id @@ -979,6 +992,8 @@ pub struct ResumeInvocationRequest { pub response_sink: Option, } +flexbuffers_storage_encode_decode!(ResumeInvocationRequest); + impl WithInvocationId for ResumeInvocationRequest { fn invocation_id(&self) -> InvocationId { self.invocation_id @@ -1001,6 +1016,8 @@ pub struct RestartAsNewInvocationRequest { pub response_sink: Option, } +flexbuffers_storage_encode_decode!(RestartAsNewInvocationRequest); + impl WithInvocationId for RestartAsNewInvocationRequest { fn invocation_id(&self) -> InvocationId { self.invocation_id @@ -1320,6 +1337,8 @@ pub struct AttachInvocationRequest { pub response_sink: ServiceInvocationResponseSink, } +flexbuffers_storage_encode_decode!(AttachInvocationRequest); + impl WithPartitionKey for AttachInvocationRequest { fn partition_key(&self) -> PartitionKey { self.invocation_query.partition_key() @@ -1333,6 +1352,8 @@ pub struct NotifySignalRequest { pub signal: Signal, } +flexbuffers_storage_encode_decode!(NotifySignalRequest); + impl WithInvocationId for NotifySignalRequest { fn invocation_id(&self) -> InvocationId { self.invocation_id diff --git a/crates/types/src/message.rs b/crates/types/src/message.rs index 1bbcc49adc..b9e6897e8d 100644 --- a/crates/types/src/message.rs +++ b/crates/types/src/message.rs @@ -10,7 +10,7 @@ //! This module defines types used for the internal messaging between Restate components. -use crate::identifiers::PartitionId; +use crate::{bilrost_storage_encode_decode, identifiers::PartitionId}; /// Wrapper that extends a message with its target peer to which the message should be sent. pub type PartitionTarget = (PartitionId, Msg); @@ -29,3 +29,11 @@ pub enum AckKind { last_known_seq_number: MessageIndex, }, } + +#[derive(Debug, Clone, Copy, bilrost::Message)] +pub struct MessageIndexRecrod { + #[bilrost(1)] + pub index: MessageIndex, +} + +bilrost_storage_encode_decode!(MessageIndexRecrod); diff --git a/crates/types/src/state_mut.rs b/crates/types/src/state_mut.rs index 7a28be51cf..7a4c1dd8cf 100644 --- a/crates/types/src/state_mut.rs +++ b/crates/types/src/state_mut.rs @@ -16,21 +16,27 @@ use bytes::Bytes; use serde_with::serde_as; use sha2::{Digest, Sha256}; +use crate::bilrost_storage_encode_decode; use crate::identifiers::ServiceId; #[serde_as] /// ExternalStateMutation /// /// represents an external request to mutate a user's state. -#[derive(Debug, Clone, Eq, PartialEq, serde::Serialize, serde::Deserialize)] +#[derive(Debug, Clone, Eq, PartialEq, serde::Serialize, serde::Deserialize, bilrost::Message)] pub struct ExternalStateMutation { + #[bilrost(1)] pub service_id: ServiceId, + #[bilrost(2)] pub version: Option, // flexbuffers only supports string-keyed maps :-( --> so we store it as vector of kv pairs + #[bilrost(3)] #[serde_as(as = "serde_with::Seq<(_, _)>")] pub state: HashMap, } +bilrost_storage_encode_decode!(ExternalStateMutation); + /// # StateMutationVersion /// /// This type represents a user state version. This implementation hashes canonically the raw key-value diff --git a/crates/types/src/storage.rs b/crates/types/src/storage.rs index e57e97c146..4cb7026b6e 100644 --- a/crates/types/src/storage.rs +++ b/crates/types/src/storage.rs @@ -27,6 +27,8 @@ use crate::journal_v2::raw::{RawEntry, RawEntryError, TryFromEntry}; use crate::journal_v2::{Decoder, EntryMetadata, EntryType}; use crate::time::MillisSinceEpoch; +pub use tracing; + #[derive(Debug, thiserror::Error)] pub enum StorageEncodeError { #[error("encoding failed: {0}")] @@ -187,7 +189,7 @@ macro_rules! flexbuffers_storage_encode_decode { Self: Sized, { $crate::storage::decode::decode_serde(buf, kind).map_err(|err| { - ::tracing::error!(%err, "{} decode failure (decoding {})", kind, stringify!($name)); + $crate::storage::tracing::error!(%err, "{} decode failure (decoding {})", kind, stringify!($name)); err }) @@ -196,6 +198,40 @@ macro_rules! flexbuffers_storage_encode_decode { }; } +/// Implements the [`StorageEncode`] and [`StorageDecode`] by encoding/decoding the implementing +/// type using [`bilrost`]. +#[macro_export] +macro_rules! bilrost_storage_encode_decode { + ($name:tt) => { + impl $crate::storage::StorageEncode for $name { + fn default_codec(&self) -> $crate::storage::StorageCodecKind { + $crate::storage::StorageCodecKind::Bilrost + } + + fn encode( + &self, + buf: &mut ::bytes::BytesMut, + ) -> Result<(), $crate::storage::StorageEncodeError> { + bytes::BufMut::put(buf, $crate::storage::encode::encode_bilrost(self)); + Ok(()) + } + } + + impl $crate::storage::StorageDecode for $name { + fn decode( + buf: &mut B, + kind: $crate::storage::StorageCodecKind, + ) -> Result + where + Self: Sized, + { + debug_assert_eq!(kind, $crate::storage::StorageCodecKind::Bilrost); + $crate::storage::decode::decode_bilrost(buf) + } + } + }; +} + /// A polymorphic container of a buffer or a cached storage-encodeable object #[derive(Clone, derive_more::Debug, BilrostAs)] #[bilrost_as(dto::PolyBytes)] diff --git a/crates/wal-protocol/Cargo.toml b/crates/wal-protocol/Cargo.toml index 82941fc1f2..85cabe256e 100644 --- a/crates/wal-protocol/Cargo.toml +++ b/crates/wal-protocol/Cargo.toml @@ -13,7 +13,7 @@ serde = ["dep:serde", "enum-map/serde", "bytestring/serde", "restate-invoker-api [dependencies] restate-workspace-hack = { workspace = true } - +restate-encoding = { workspace = true } restate-invoker-api = { workspace = true } restate-storage-api = { workspace = true } restate-types = { workspace = true } diff --git a/crates/wal-protocol/src/control.rs b/crates/wal-protocol/src/control.rs index c51d642729..7d9dc10fbe 100644 --- a/crates/wal-protocol/src/control.rs +++ b/crates/wal-protocol/src/control.rs @@ -13,22 +13,26 @@ use std::ops::RangeInclusive; use restate_types::identifiers::{LeaderEpoch, PartitionId, PartitionKey}; use restate_types::logs::{Keys, Lsn}; use restate_types::time::MillisSinceEpoch; -use restate_types::{GenerationalNodeId, SemanticRestateVersion}; +use restate_types::{GenerationalNodeId, SemanticRestateVersion, bilrost_storage_encode_decode}; /// Announcing a new leader. This message can be written by any component to make the specified /// partition processor the leader. -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, bilrost::Message)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] pub struct AnnounceLeader { /// Sender of the announce leader message. /// /// This became non-optional in v1.5. Noting that it has always been set in previous versions, /// it's safe to assume that it's always set. + #[bilrost(1)] pub node_id: GenerationalNodeId, + #[bilrost(2)] pub leader_epoch: LeaderEpoch, + #[bilrost(3)] pub partition_key_range: RangeInclusive, } +bilrost_storage_encode_decode!(AnnounceLeader); /// A version barrier to fence off state machine changes that require a certain minimum /// version of restate server. /// @@ -39,12 +43,17 @@ pub struct AnnounceLeader { #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] pub struct VersionBarrier { /// The minimum version required (inclusive) to progress after this barrier. + #[bilrost(1)] pub version: SemanticRestateVersion, /// A human-readable reason for why this barrier exists. + #[bilrost(2)] pub human_reason: Option, + #[bilrost(3)] pub partition_key_range: Keys, } +bilrost_storage_encode_decode!(VersionBarrier); + /// Updates the `PARTITION_DURABILITY` FSM variable to the given value. Note that durability /// only applies to partitions with the same `partition_id`. At replay time, the partition will /// ignore updates that are not targeted to its own ID. @@ -55,10 +64,15 @@ pub struct VersionBarrier { #[derive(Debug, Clone, PartialEq, Eq, bilrost::Message)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] pub struct PartitionDurability { + #[bilrost(1)] pub partition_id: PartitionId, /// The partition has applied this LSN durably to the replica-set and/or has been /// persisted in a snapshot in the snapshot repository. + #[bilrost(2)] pub durable_point: Lsn, /// Timestamp which the durability point was updated + #[bilrost(3)] pub modification_time: MillisSinceEpoch, } + +bilrost_storage_encode_decode!(PartitionDurability); diff --git a/crates/wal-protocol/src/lib.rs b/crates/wal-protocol/src/lib.rs index fd9f29482e..867c908cce 100644 --- a/crates/wal-protocol/src/lib.rs +++ b/crates/wal-protocol/src/lib.rs @@ -8,631 +8,9 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use restate_storage_api::deduplication_table::DedupInformation; -use restate_types::identifiers::{LeaderEpoch, PartitionId, PartitionKey, WithPartitionKey}; -use restate_types::invocation::{ - AttachInvocationRequest, GetInvocationOutputResponse, InvocationResponse, - InvocationTermination, NotifySignalRequest, PurgeInvocationRequest, - RestartAsNewInvocationRequest, ResumeInvocationRequest, ServiceInvocation, -}; -use restate_types::logs; -use restate_types::logs::{HasRecordKeys, Keys, MatchKeyQuery}; -use restate_types::message::MessageIndex; -use restate_types::state_mut::ExternalStateMutation; - -use crate::control::{AnnounceLeader, VersionBarrier}; -use crate::timer::TimerKeyValue; - -use self::control::PartitionDurability; - pub mod control; pub mod timer; +mod v1; +pub mod v2; -/// The primary envelope for all messages in the system. -#[derive(Debug, Clone, PartialEq, Eq)] -#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] -pub struct Envelope { - pub header: Header, - pub command: Command, -} - -impl Envelope { - pub fn new(header: Header, command: Command) -> Self { - Self { header, command } - } -} - -/// Header is set on every message -#[derive(Debug, Clone, PartialEq, Eq)] -#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] -pub struct Header { - pub source: Source, - pub dest: Destination, -} - -/// Identifies the source of a message -#[derive(Debug, Clone, PartialEq, Eq)] -#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] -pub enum Source { - /// Message is sent from another partition processor - Processor { - /// if possible, this is used to reroute responses in case of splits/merges - /// v1.4 requires this to be set. - /// v1.5 Marked as `Option`. - /// v1.6 always set to `None`. - /// Will be removed in v1.7. - #[cfg_attr(feature = "serde", serde(default))] - partition_id: Option, - #[cfg_attr(feature = "serde", serde(default))] - partition_key: Option, - /// The current epoch of the partition leader. Readers should observe this to decide which - /// messages to accept. Readers should ignore messages coming from - /// epochs lower than the max observed for a given partition id. - leader_epoch: LeaderEpoch, - // Which node is this message from? - // First deprecation in v1.1, but since v1.5 we switched to Option and it's - // still being set to Some(v) to maintain compatibility with v1.4. - // - // v1.6 field is removed. -- Kept here for reference only. - // #[cfg_attr(feature = "serde", serde(default))] - // node_id: Option, - - // From v1.1 this is always set, but maintained to support rollback to v1.0. - // Deprecated(v1.5): It's set to Some(v) to maintain support for v1.4 but - // will be removed in v1.6. Commands that need the node-id of the sender should - // include the node-id in the command payload itself (e.g. in the [`AnnounceLeader`]) - // v1.6 field is removed. -- Kept here for reference only. - // #[cfg_attr(feature = "serde", serde(default))] - // generational_node_id: Option, - }, - /// Message is sent from an ingress node - Ingress { - // The identity of the sender node. Generational for fencing. Ingress is - // stateless, so we shouldn't respond to requests from older generation - // if a new generation is alive. - // - // Deprecated(v1.5): This field is set to Some(v) to maintain compatibility with v1.4. - // but will be removed in v1.6. - // v1.6 field is removed. -- Kept here for reference only. - // #[cfg_attr(feature = "serde", serde(default))] - // node_id: Option, - - // Last config version observed by sender. If this is a newer generation - // or an unknown ID, we might need to update our config. - // - // Deprecated(v1.5): This field is set to Some(v) to maintain compatibility with v1.4. - // but will be removed in v1.6. - // v1.6 field is removed. -- Kept here for reference only. - // #[cfg_attr(feature = "serde", serde(default))] - // nodes_config_version: Option, - }, - /// Message is sent from some control plane component (controller, cli, etc.) - ControlPlane { - // Reserved for future use. - }, -} - -/// Identifies the intended destination of the message -#[derive(Debug, Clone, PartialEq, Eq)] -#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] -pub enum Destination { - /// Message is sent to partition processor - Processor { - partition_key: PartitionKey, - #[cfg_attr(feature = "serde", serde(default))] - dedup: Option, - }, -} - -/// State machine input commands -#[derive(Debug, Clone, PartialEq, Eq, strum::EnumDiscriminants, strum::VariantNames)] -#[strum_discriminants(derive(strum::IntoStaticStr))] -#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] -pub enum Command { - /// Updates the `PARTITION_DURABILITY` FSM variable to the given value. - /// See [`PartitionDurability`] for more details. - /// - /// *Since v1.4.2* - UpdatePartitionDurability(PartitionDurability), - /// A version barrier to fence off state machine changes that require a certain minimum - /// version of restate server. - /// *Since v1.4.0* - VersionBarrier(VersionBarrier), - // -- Control-plane related events - AnnounceLeader(Box), - - // -- Partition processor commands - /// Manual patching of storage state - PatchState(ExternalStateMutation), - /// Terminate an ongoing invocation - TerminateInvocation(InvocationTermination), - /// Purge a completed invocation - PurgeInvocation(PurgeInvocationRequest), - /// Purge a completed invocation journal - PurgeJournal(PurgeInvocationRequest), - /// Start an invocation on this partition - Invoke(Box), - /// Truncate the message outbox up to, and including, the specified index. - TruncateOutbox(MessageIndex), - /// Proxy a service invocation through this partition processor, to reuse the deduplication id map. - ProxyThrough(Box), - /// Attach to an existing invocation - AttachInvocation(AttachInvocationRequest), - /// Resume an invocation - ResumeInvocation(ResumeInvocationRequest), - /// Restart as new invocation from prefix - RestartAsNewInvocation(RestartAsNewInvocationRequest), - - // -- Partition processor events for PP - /// Invoker is reporting effect(s) from an ongoing invocation. - InvokerEffect(Box), - /// Timer has fired - Timer(TimerKeyValue), - /// Schedule timer - ScheduleTimer(TimerKeyValue), - /// Another partition processor is reporting a response of an invocation we requested. - /// - /// KINDA DEPRECATED: When Journal Table V1 is removed, this command should be used only to reply to invocations. - /// Now it's abused for a bunch of other scenarios, like replying to get promise and get invocation output. - /// - /// For more details see `OnNotifyInvocationResponse`. - InvocationResponse(InvocationResponse), - - // -- New PP <-> PP commands using Journal V2 - /// Notify Get invocation output - NotifyGetInvocationOutputResponse(GetInvocationOutputResponse), - /// Notify a signal. - NotifySignal(NotifySignalRequest), -} - -impl Command { - pub fn name(&self) -> &'static str { - CommandDiscriminants::from(self).into() - } -} - -impl WithPartitionKey for Envelope { - fn partition_key(&self) -> PartitionKey { - match self.header.dest { - Destination::Processor { partition_key, .. } => partition_key, - } - } -} - -impl HasRecordKeys for Envelope { - fn record_keys(&self) -> logs::Keys { - match &self.command { - // the partition_key is used as key here since the command targets the partition by ID. - // Partitions will ignore this message at read time if the paritition ID (in body) - // does not match. Alternatively, we could use the partition key range or `Keys::None` - // but this would just be a waste of effort for readers after a partition has been - // split or if the log is shared between multiple partitions. - Command::UpdatePartitionDurability(_) => Keys::Single(self.partition_key()), - Command::VersionBarrier(barrier) => barrier.partition_key_range.clone(), - Command::AnnounceLeader(announce) => { - Keys::RangeInclusive(announce.partition_key_range.clone()) - } - Command::PatchState(mutation) => Keys::Single(mutation.service_id.partition_key()), - Command::TerminateInvocation(terminate) => { - Keys::Single(terminate.invocation_id.partition_key()) - } - Command::PurgeInvocation(purge) => Keys::Single(purge.invocation_id.partition_key()), - Command::PurgeJournal(purge) => Keys::Single(purge.invocation_id.partition_key()), - Command::Invoke(invoke) => Keys::Single(invoke.partition_key()), - // todo: Remove this, or pass the partition key range but filter based on partition-id - // on read if needed. - Command::TruncateOutbox(_) => Keys::Single(self.partition_key()), - Command::ProxyThrough(_) => Keys::Single(self.partition_key()), - Command::AttachInvocation(_) => Keys::Single(self.partition_key()), - Command::ResumeInvocation(req) => Keys::Single(req.partition_key()), - Command::RestartAsNewInvocation(req) => Keys::Single(req.partition_key()), - // todo: Handle journal entries that request cross-partition invocations - Command::InvokerEffect(effect) => Keys::Single(effect.invocation_id.partition_key()), - Command::Timer(timer) => Keys::Single(timer.invocation_id().partition_key()), - Command::ScheduleTimer(timer) => Keys::Single(timer.invocation_id().partition_key()), - Command::InvocationResponse(response) => Keys::Single(response.partition_key()), - Command::NotifySignal(sig) => Keys::Single(sig.partition_key()), - Command::NotifyGetInvocationOutputResponse(res) => Keys::Single(res.partition_key()), - } - } -} - -impl MatchKeyQuery for Envelope { - fn matches_key_query(&self, query: &logs::KeyFilter) -> bool { - self.record_keys().matches_key_query(query) - } -} - -#[cfg(feature = "serde")] -mod envelope { - use bilrost::{Message, OwnedMessage}; - use bytes::{Buf, Bytes, BytesMut}; - - use restate_storage_api::protobuf_types::v1 as protobuf; - use restate_types::storage::decode::{decode_bilrost, decode_serde}; - use restate_types::storage::encode::{encode_bilrost, encode_serde}; - use restate_types::storage::{ - StorageCodecKind, StorageDecode, StorageDecodeError, StorageEncode, StorageEncodeError, - }; - - use crate::Command; - - impl StorageEncode for crate::Envelope { - fn encode(&self, buf: &mut BytesMut) -> Result<(), StorageEncodeError> { - use bytes::BufMut; - match self.default_codec() { - StorageCodecKind::FlexbuffersSerde => encode_serde(self, buf, self.default_codec()), - StorageCodecKind::Custom => { - buf.put_slice(&encode(self)?); - Ok(()) - } - _ => unreachable!("developer error"), - } - } - - fn default_codec(&self) -> StorageCodecKind { - // todo: Could be changed in v1.6 - StorageCodecKind::FlexbuffersSerde - } - } - - impl StorageDecode for crate::Envelope { - fn decode( - buf: &mut B, - kind: StorageCodecKind, - ) -> Result - where - Self: Sized, - { - match kind { - StorageCodecKind::Json - | StorageCodecKind::BincodeSerde - | StorageCodecKind::FlexbuffersSerde => decode_serde(buf, kind).map_err(|err| { - tracing::error!(%err, "{} decode failure (decoding Envelope)", kind); - err - }), - StorageCodecKind::LengthPrefixedRawBytes - | StorageCodecKind::Protobuf - | StorageCodecKind::Bilrost => Err(StorageDecodeError::UnsupportedCodecKind(kind)), - StorageCodecKind::Custom => decode(buf), - } - } - } - - #[derive(Debug, thiserror::Error)] - enum DecodeError { - #[error("missing field codec")] - MissingFieldCodec, - #[error("unknown command kind")] - UnknownCommandKind, - #[error("unexpected codec kind {0}")] - UnexpectedCodec(StorageCodecKind), - } - - impl From for StorageDecodeError { - fn from(value: DecodeError) -> Self { - Self::DecodeValue(value.into()) - } - } - - #[derive(PartialEq, Eq, bilrost::Enumeration)] - enum CommandKind { - Unknown = 0, - AnnounceLeader = 1, // flexbuffers - PatchState = 2, // protobuf - TerminateInvocation = 3, // flexbuffers - PurgeInvocation = 4, // flexbuffers - Invoke = 5, // protobuf - TruncateOutbox = 6, // flexbuffers - ProxyThrough = 7, // protobuf - AttachInvocation = 8, // protobuf - InvokerEffect = 9, // flexbuffers - Timer = 10, // flexbuffers - ScheduleTimer = 11, // flexbuffers - InvocationResponse = 12, // protobuf - NotifyGetInvocationOutputResponse = 13, // bilrost - NotifySignal = 14, // protobuf - PurgeJournal = 15, // flexbuffers - VersionBarrier = 16, // bilrost - UpdatePartitionDurability = 17, // bilrost - ResumeInvocation = 18, // flexbuffers - RestartAsNewInvocation = 19, // flexbuffers - } - - #[derive(bilrost::Message)] - struct Field { - #[bilrost(1)] - codec: Option, - #[bilrost(2)] - bytes: Bytes, - } - - impl Field { - fn encode_serde( - codec: StorageCodecKind, - value: &T, - ) -> Result { - let mut buf = BytesMut::new(); - encode_serde(value, &mut buf, codec)?; - - Ok(Self { - codec: Some(codec), - bytes: buf.freeze(), - }) - } - - fn encode_bilrost(value: &T) -> Result { - Ok(Self { - codec: Some(StorageCodecKind::Bilrost), - bytes: encode_bilrost(value), - }) - } - - fn encode_protobuf(value: &T) -> Result { - let mut buf = BytesMut::new(); - value - .encode(&mut buf) - .map_err(|err| StorageEncodeError::EncodeValue(err.into()))?; - - Ok(Self { - codec: Some(StorageCodecKind::Protobuf), - bytes: buf.freeze(), - }) - } - - fn decode_serde(mut self) -> Result { - let codec = self.codec()?; - if !matches!( - codec, - StorageCodecKind::Json - | StorageCodecKind::FlexbuffersSerde - | StorageCodecKind::BincodeSerde - ) { - return Err(StorageDecodeError::UnsupportedCodecKind(codec)); - } - - decode_serde( - &mut self.bytes, - self.codec.ok_or(DecodeError::MissingFieldCodec)?, - ) - } - - fn decode_bilrost(mut self) -> Result { - let codec = self.codec()?; - if codec != StorageCodecKind::Bilrost { - return Err(StorageDecodeError::UnsupportedCodecKind(codec)); - } - - decode_bilrost(&mut self.bytes) - } - - fn decode_protobuf(self) -> Result { - let codec = self.codec()?; - if codec != StorageCodecKind::Protobuf { - return Err(StorageDecodeError::UnsupportedCodecKind(codec)); - } - - T::decode(self.bytes).map_err(|err| StorageDecodeError::DecodeValue(err.into())) - } - - fn codec(&self) -> Result { - self.codec.ok_or(DecodeError::MissingFieldCodec) - } - } - - #[derive(bilrost::Message)] - struct Envelope { - #[bilrost(1)] - header: Field, - #[bilrost(2)] - command_kind: CommandKind, - #[bilrost(3)] - command: Field, - } - - macro_rules! codec_or_error { - ($field:expr, $expected:path) => {{ - let codec = $field.codec()?; - if !matches!(codec, $expected) { - return Err(DecodeError::UnexpectedCodec(codec).into()); - } - }}; - } - - pub fn encode(envelope: &super::Envelope) -> Result { - // todo(azmy): avoid clone? this will require change to `From` implementation - let (command_kind, command) = match &envelope.command { - Command::UpdatePartitionDurability(value) => ( - CommandKind::UpdatePartitionDurability, - Field::encode_bilrost(value), - ), - Command::VersionBarrier(value) => { - (CommandKind::VersionBarrier, Field::encode_bilrost(value)) - } - Command::AnnounceLeader(value) => ( - CommandKind::AnnounceLeader, - Field::encode_serde(StorageCodecKind::FlexbuffersSerde, value), - ), - Command::PatchState(value) => { - let value = protobuf::StateMutation::from(value.clone()); - (CommandKind::PatchState, Field::encode_protobuf(&value)) - } - Command::TerminateInvocation(value) => ( - CommandKind::TerminateInvocation, - Field::encode_serde(StorageCodecKind::FlexbuffersSerde, value), - ), - Command::PurgeInvocation(value) => ( - CommandKind::PurgeInvocation, - Field::encode_serde(StorageCodecKind::FlexbuffersSerde, value), - ), - Command::ResumeInvocation(value) => ( - CommandKind::ResumeInvocation, - Field::encode_serde(StorageCodecKind::FlexbuffersSerde, value), - ), - Command::RestartAsNewInvocation(value) => ( - CommandKind::RestartAsNewInvocation, - Field::encode_serde(StorageCodecKind::FlexbuffersSerde, value), - ), - Command::PurgeJournal(value) => ( - CommandKind::PurgeJournal, - Field::encode_serde(StorageCodecKind::FlexbuffersSerde, value), - ), - Command::Invoke(value) => { - let value = protobuf::ServiceInvocation::from(value.as_ref()); - (CommandKind::Invoke, Field::encode_protobuf(&value)) - } - Command::TruncateOutbox(value) => ( - CommandKind::TruncateOutbox, - Field::encode_serde(StorageCodecKind::FlexbuffersSerde, value), - ), - Command::ProxyThrough(value) => { - let value = protobuf::ServiceInvocation::from(value.as_ref()); - (CommandKind::ProxyThrough, Field::encode_protobuf(&value)) - } - Command::AttachInvocation(value) => { - let value = protobuf::outbox_message::AttachInvocationRequest::from(value.clone()); - ( - CommandKind::AttachInvocation, - Field::encode_protobuf(&value), - ) - } - Command::InvokerEffect(value) => ( - CommandKind::InvokerEffect, - Field::encode_serde(StorageCodecKind::FlexbuffersSerde, value), - ), - Command::Timer(value) => ( - CommandKind::Timer, - Field::encode_serde(StorageCodecKind::FlexbuffersSerde, value), - ), - Command::ScheduleTimer(value) => ( - CommandKind::ScheduleTimer, - Field::encode_serde(StorageCodecKind::FlexbuffersSerde, value), - ), - Command::InvocationResponse(value) => { - let value = - protobuf::outbox_message::OutboxServiceInvocationResponse::from(value.clone()); - ( - CommandKind::InvocationResponse, - Field::encode_protobuf(&value), - ) - } - Command::NotifyGetInvocationOutputResponse(value) => ( - CommandKind::NotifyGetInvocationOutputResponse, - Field::encode_bilrost(value), - ), - Command::NotifySignal(value) => { - let value = protobuf::outbox_message::NotifySignal::from(value.clone()); - (CommandKind::NotifySignal, Field::encode_protobuf(&value)) - } - }; - - let dto = Envelope { - header: Field::encode_serde(StorageCodecKind::FlexbuffersSerde, &envelope.header)?, - command_kind, - command: command?, - }; - - Ok(dto.encode_contiguous().into_vec().into()) - } - - pub fn decode(buf: B) -> Result { - let envelope = - Envelope::decode(buf).map_err(|err| StorageDecodeError::DecodeValue(err.into()))?; - - // header is encoded with serde - codec_or_error!(envelope.header, StorageCodecKind::FlexbuffersSerde); - let header = envelope.header.decode_serde::()?; - - let command = match envelope.command_kind { - CommandKind::Unknown => return Err(DecodeError::UnknownCommandKind.into()), - CommandKind::UpdatePartitionDurability => { - codec_or_error!(envelope.command, StorageCodecKind::Bilrost); - Command::UpdatePartitionDurability(envelope.command.decode_bilrost()?) - } - CommandKind::VersionBarrier => { - codec_or_error!(envelope.command, StorageCodecKind::Bilrost); - Command::VersionBarrier(envelope.command.decode_bilrost()?) - } - CommandKind::AnnounceLeader => { - codec_or_error!(envelope.command, StorageCodecKind::FlexbuffersSerde); - Command::AnnounceLeader(envelope.command.decode_serde()?) - } - CommandKind::PatchState => { - codec_or_error!(envelope.command, StorageCodecKind::Protobuf); - let value: protobuf::StateMutation = envelope.command.decode_protobuf()?; - Command::PatchState(value.try_into()?) - } - CommandKind::TerminateInvocation => { - codec_or_error!(envelope.command, StorageCodecKind::FlexbuffersSerde); - Command::TerminateInvocation(envelope.command.decode_serde()?) - } - CommandKind::PurgeInvocation => { - codec_or_error!(envelope.command, StorageCodecKind::FlexbuffersSerde); - Command::PurgeInvocation(envelope.command.decode_serde()?) - } - CommandKind::ResumeInvocation => { - codec_or_error!(envelope.command, StorageCodecKind::FlexbuffersSerde); - Command::ResumeInvocation(envelope.command.decode_serde()?) - } - CommandKind::RestartAsNewInvocation => { - codec_or_error!(envelope.command, StorageCodecKind::FlexbuffersSerde); - Command::RestartAsNewInvocation(envelope.command.decode_serde()?) - } - CommandKind::PurgeJournal => { - codec_or_error!(envelope.command, StorageCodecKind::FlexbuffersSerde); - Command::PurgeJournal(envelope.command.decode_serde()?) - } - CommandKind::Invoke => { - codec_or_error!(envelope.command, StorageCodecKind::Protobuf); - let value: protobuf::ServiceInvocation = envelope.command.decode_protobuf()?; - Command::Invoke(Box::new(value.try_into()?)) - } - CommandKind::TruncateOutbox => { - codec_or_error!(envelope.command, StorageCodecKind::FlexbuffersSerde); - Command::TruncateOutbox(envelope.command.decode_serde()?) - } - CommandKind::ProxyThrough => { - codec_or_error!(envelope.command, StorageCodecKind::Protobuf); - let value: protobuf::ServiceInvocation = envelope.command.decode_protobuf()?; - Command::ProxyThrough(Box::new(value.try_into()?)) - } - CommandKind::AttachInvocation => { - codec_or_error!(envelope.command, StorageCodecKind::Protobuf); - let value: protobuf::outbox_message::AttachInvocationRequest = - envelope.command.decode_protobuf()?; - Command::AttachInvocation(value.try_into()?) - } - CommandKind::InvokerEffect => { - codec_or_error!(envelope.command, StorageCodecKind::FlexbuffersSerde); - Command::InvokerEffect(envelope.command.decode_serde()?) - } - CommandKind::Timer => { - codec_or_error!(envelope.command, StorageCodecKind::FlexbuffersSerde); - Command::Timer(envelope.command.decode_serde()?) - } - CommandKind::ScheduleTimer => { - codec_or_error!(envelope.command, StorageCodecKind::FlexbuffersSerde); - Command::ScheduleTimer(envelope.command.decode_serde()?) - } - CommandKind::InvocationResponse => { - codec_or_error!(envelope.command, StorageCodecKind::Protobuf); - let value: protobuf::outbox_message::OutboxServiceInvocationResponse = - envelope.command.decode_protobuf()?; - Command::InvocationResponse(value.try_into()?) - } - CommandKind::NotifyGetInvocationOutputResponse => { - codec_or_error!(envelope.command, StorageCodecKind::Bilrost); - Command::NotifyGetInvocationOutputResponse(envelope.command.decode_bilrost()?) - } - CommandKind::NotifySignal => { - codec_or_error!(envelope.command, StorageCodecKind::Protobuf); - let value: protobuf::outbox_message::NotifySignal = - envelope.command.decode_protobuf()?; - - Command::NotifySignal(value.try_into()?) - } - }; - - Ok(super::Envelope { header, command }) - } -} +pub use v1::{Command, Destination, Envelope, Header, Source}; diff --git a/crates/wal-protocol/src/timer.rs b/crates/wal-protocol/src/timer.rs index 99b70ef801..115178a93c 100644 --- a/crates/wal-protocol/src/timer.rs +++ b/crates/wal-protocol/src/timer.rs @@ -9,6 +9,7 @@ // by the Apache License, Version 2.0. use restate_storage_api::timer_table::{Timer, TimerKey, TimerKeyKind}; +use restate_types::flexbuffers_storage_encode_decode; use restate_types::identifiers::{EntryIndex, InvocationId}; use restate_types::invocation::{InvocationEpoch, ServiceInvocation}; use restate_types::time::MillisSinceEpoch; @@ -23,6 +24,8 @@ pub struct TimerKeyValue { value: Timer, } +flexbuffers_storage_encode_decode!(TimerKeyValue); + impl TimerKeyValue { pub fn new(timer_key: TimerKey, value: Timer) -> Self { Self { timer_key, value } diff --git a/crates/wal-protocol/src/v1.rs b/crates/wal-protocol/src/v1.rs new file mode 100644 index 0000000000..2f7bb26eba --- /dev/null +++ b/crates/wal-protocol/src/v1.rs @@ -0,0 +1,633 @@ +// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use restate_storage_api::deduplication_table::DedupInformation; +use restate_types::identifiers::{LeaderEpoch, PartitionId, PartitionKey, WithPartitionKey}; +use restate_types::invocation::{ + AttachInvocationRequest, GetInvocationOutputResponse, InvocationResponse, + InvocationTermination, NotifySignalRequest, PurgeInvocationRequest, + RestartAsNewInvocationRequest, ResumeInvocationRequest, ServiceInvocation, +}; +use restate_types::logs; +use restate_types::logs::{HasRecordKeys, Keys, MatchKeyQuery}; +use restate_types::message::MessageIndex; +use restate_types::state_mut::ExternalStateMutation; + +use crate::control::{AnnounceLeader, PartitionDurability, VersionBarrier}; +use crate::timer::TimerKeyValue; + +/// The primary envelope for all messages in the system. +#[derive(Debug, Clone, PartialEq, Eq)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +pub struct Envelope { + pub header: Header, + pub command: Command, +} + +impl Envelope { + pub fn new(header: Header, command: Command) -> Self { + Self { header, command } + } +} + +/// Header is set on every message +#[derive(Debug, Clone, PartialEq, Eq)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +pub struct Header { + pub source: Source, + pub dest: Destination, +} + +/// Identifies the source of a message +#[derive(Debug, Clone, PartialEq, Eq)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +pub enum Source { + /// Message is sent from another partition processor + Processor { + /// if possible, this is used to reroute responses in case of splits/merges + /// v1.4 requires this to be set. + /// v1.5 Marked as `Option`. + /// v1.6 always set to `None`. + /// Will be removed in v1.7. + #[cfg_attr(feature = "serde", serde(default))] + partition_id: Option, + #[cfg_attr(feature = "serde", serde(default))] + partition_key: Option, + /// The current epoch of the partition leader. Readers should observe this to decide which + /// messages to accept. Readers should ignore messages coming from + /// epochs lower than the max observed for a given partition id. + leader_epoch: LeaderEpoch, + // Which node is this message from? + // First deprecation in v1.1, but since v1.5 we switched to Option and it's + // still being set to Some(v) to maintain compatibility with v1.4. + // + // v1.6 field is removed. -- Kept here for reference only. + // #[cfg_attr(feature = "serde", serde(default))] + // node_id: Option, + + // From v1.1 this is always set, but maintained to support rollback to v1.0. + // Deprecated(v1.5): It's set to Some(v) to maintain support for v1.4 but + // will be removed in v1.6. Commands that need the node-id of the sender should + // include the node-id in the command payload itself (e.g. in the [`AnnounceLeader`]) + // v1.6 field is removed. -- Kept here for reference only. + // #[cfg_attr(feature = "serde", serde(default))] + // generational_node_id: Option, + }, + /// Message is sent from an ingress node + Ingress { + // The identity of the sender node. Generational for fencing. Ingress is + // stateless, so we shouldn't respond to requests from older generation + // if a new generation is alive. + // + // Deprecated(v1.5): This field is set to Some(v) to maintain compatibility with v1.4. + // but will be removed in v1.6. + // v1.6 field is removed. -- Kept here for reference only. + // #[cfg_attr(feature = "serde", serde(default))] + // node_id: Option, + + // Last config version observed by sender. If this is a newer generation + // or an unknown ID, we might need to update our config. + // + // Deprecated(v1.5): This field is set to Some(v) to maintain compatibility with v1.4. + // but will be removed in v1.6. + // v1.6 field is removed. -- Kept here for reference only. + // #[cfg_attr(feature = "serde", serde(default))] + // nodes_config_version: Option, + }, + /// Message is sent from some control plane component (controller, cli, etc.) + ControlPlane { + // Reserved for future use. + }, +} + +/// Identifies the intended destination of the message +#[derive(Debug, Clone, PartialEq, Eq)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +pub enum Destination { + /// Message is sent to partition processor + Processor { + partition_key: PartitionKey, + #[cfg_attr(feature = "serde", serde(default))] + dedup: Option, + }, +} + +/// State machine input commands +#[derive(Debug, Clone, PartialEq, Eq, strum::EnumDiscriminants, strum::VariantNames)] +#[strum_discriminants(derive(strum::IntoStaticStr))] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +pub enum Command { + /// Updates the `PARTITION_DURABILITY` FSM variable to the given value. + /// See [`PartitionDurability`] for more details. + /// + /// *Since v1.4.2* + UpdatePartitionDurability(PartitionDurability), + /// A version barrier to fence off state machine changes that require a certain minimum + /// version of restate server. + /// *Since v1.4.0* + VersionBarrier(VersionBarrier), + // -- Control-plane related events + AnnounceLeader(Box), + + // -- Partition processor commands + /// Manual patching of storage state + PatchState(ExternalStateMutation), + /// Terminate an ongoing invocation + TerminateInvocation(InvocationTermination), + /// Purge a completed invocation + PurgeInvocation(PurgeInvocationRequest), + /// Purge a completed invocation journal + PurgeJournal(PurgeInvocationRequest), + /// Start an invocation on this partition + Invoke(Box), + /// Truncate the message outbox up to, and including, the specified index. + TruncateOutbox(MessageIndex), + /// Proxy a service invocation through this partition processor, to reuse the deduplication id map. + ProxyThrough(Box), + /// Attach to an existing invocation + AttachInvocation(AttachInvocationRequest), + /// Resume an invocation + ResumeInvocation(ResumeInvocationRequest), + /// Restart as new invocation from prefix + RestartAsNewInvocation(RestartAsNewInvocationRequest), + + // -- Partition processor events for PP + /// Invoker is reporting effect(s) from an ongoing invocation. + InvokerEffect(Box), + /// Timer has fired + Timer(TimerKeyValue), + /// Schedule timer + ScheduleTimer(TimerKeyValue), + /// Another partition processor is reporting a response of an invocation we requested. + /// + /// KINDA DEPRECATED: When Journal Table V1 is removed, this command should be used only to reply to invocations. + /// Now it's abused for a bunch of other scenarios, like replying to get promise and get invocation output. + /// + /// For more details see `OnNotifyInvocationResponse`. + InvocationResponse(InvocationResponse), + + // -- New PP <-> PP commands using Journal V2 + /// Notify Get invocation output + NotifyGetInvocationOutputResponse(GetInvocationOutputResponse), + /// Notify a signal. + NotifySignal(NotifySignalRequest), +} + +impl Command { + pub fn name(&self) -> &'static str { + CommandDiscriminants::from(self).into() + } +} + +impl WithPartitionKey for Envelope { + fn partition_key(&self) -> PartitionKey { + match self.header.dest { + Destination::Processor { partition_key, .. } => partition_key, + } + } +} + +impl HasRecordKeys for Envelope { + fn record_keys(&self) -> logs::Keys { + match &self.command { + // the partition_key is used as key here since the command targets the partition by ID. + // Partitions will ignore this message at read time if the paritition ID (in body) + // does not match. Alternatively, we could use the partition key range or `Keys::None` + // but this would just be a waste of effort for readers after a partition has been + // split or if the log is shared between multiple partitions. + Command::UpdatePartitionDurability(_) => Keys::Single(self.partition_key()), + Command::VersionBarrier(barrier) => barrier.partition_key_range.clone(), + Command::AnnounceLeader(announce) => { + Keys::RangeInclusive(announce.partition_key_range.clone()) + } + Command::PatchState(mutation) => Keys::Single(mutation.service_id.partition_key()), + Command::TerminateInvocation(terminate) => { + Keys::Single(terminate.invocation_id.partition_key()) + } + Command::PurgeInvocation(purge) => Keys::Single(purge.invocation_id.partition_key()), + Command::PurgeJournal(purge) => Keys::Single(purge.invocation_id.partition_key()), + Command::Invoke(invoke) => Keys::Single(invoke.partition_key()), + // todo: Remove this, or pass the partition key range but filter based on partition-id + // on read if needed. + Command::TruncateOutbox(_) => Keys::Single(self.partition_key()), + Command::ProxyThrough(_) => Keys::Single(self.partition_key()), + Command::AttachInvocation(_) => Keys::Single(self.partition_key()), + Command::ResumeInvocation(req) => Keys::Single(req.partition_key()), + Command::RestartAsNewInvocation(req) => Keys::Single(req.partition_key()), + // todo: Handle journal entries that request cross-partition invocations + Command::InvokerEffect(effect) => Keys::Single(effect.invocation_id.partition_key()), + Command::Timer(timer) => Keys::Single(timer.invocation_id().partition_key()), + Command::ScheduleTimer(timer) => Keys::Single(timer.invocation_id().partition_key()), + Command::InvocationResponse(response) => Keys::Single(response.partition_key()), + Command::NotifySignal(sig) => Keys::Single(sig.partition_key()), + Command::NotifyGetInvocationOutputResponse(res) => Keys::Single(res.partition_key()), + } + } +} + +impl MatchKeyQuery for Envelope { + fn matches_key_query(&self, query: &logs::KeyFilter) -> bool { + self.record_keys().matches_key_query(query) + } +} + +#[cfg(feature = "serde")] +mod envelope { + use bilrost::{Message, OwnedMessage}; + use bytes::{Buf, Bytes, BytesMut}; + + use restate_storage_api::protobuf_types::v1 as protobuf; + use restate_types::storage::decode::{decode_bilrost, decode_serde}; + use restate_types::storage::encode::{encode_bilrost, encode_serde}; + use restate_types::storage::{ + StorageCodecKind, StorageDecode, StorageDecodeError, StorageEncode, StorageEncodeError, + }; + + use crate::Command; + + impl StorageEncode for crate::Envelope { + fn encode(&self, buf: &mut BytesMut) -> Result<(), StorageEncodeError> { + use bytes::BufMut; + match self.default_codec() { + StorageCodecKind::FlexbuffersSerde => encode_serde(self, buf, self.default_codec()), + StorageCodecKind::Custom => { + buf.put_slice(&encode(self)?); + Ok(()) + } + _ => unreachable!("developer error"), + } + } + + fn default_codec(&self) -> StorageCodecKind { + // todo: Could be changed in v1.6 + StorageCodecKind::FlexbuffersSerde + } + } + + impl StorageDecode for crate::Envelope { + fn decode( + buf: &mut B, + kind: StorageCodecKind, + ) -> Result + where + Self: Sized, + { + match kind { + StorageCodecKind::Json + | StorageCodecKind::BincodeSerde + | StorageCodecKind::FlexbuffersSerde => decode_serde(buf, kind).map_err(|err| { + tracing::error!(%err, "{} decode failure (decoding Envelope)", kind); + err + }), + StorageCodecKind::LengthPrefixedRawBytes + | StorageCodecKind::Protobuf + | StorageCodecKind::Bilrost => Err(StorageDecodeError::UnsupportedCodecKind(kind)), + StorageCodecKind::Custom => decode(buf), + } + } + } + + #[derive(Debug, thiserror::Error)] + enum DecodeError { + #[error("missing field codec")] + MissingFieldCodec, + #[error("unknown command kind")] + UnknownCommandKind, + #[error("unexpected codec kind {0}")] + UnexpectedCodec(StorageCodecKind), + } + + impl From for StorageDecodeError { + fn from(value: DecodeError) -> Self { + Self::DecodeValue(value.into()) + } + } + + #[derive(PartialEq, Eq, bilrost::Enumeration)] + enum CommandKind { + Unknown = 0, + AnnounceLeader = 1, // flexbuffers + PatchState = 2, // protobuf + TerminateInvocation = 3, // flexbuffers + PurgeInvocation = 4, // flexbuffers + Invoke = 5, // protobuf + TruncateOutbox = 6, // flexbuffers + ProxyThrough = 7, // protobuf + AttachInvocation = 8, // protobuf + InvokerEffect = 9, // flexbuffers + Timer = 10, // flexbuffers + ScheduleTimer = 11, // flexbuffers + InvocationResponse = 12, // protobuf + NotifyGetInvocationOutputResponse = 13, // bilrost + NotifySignal = 14, // protobuf + PurgeJournal = 15, // flexbuffers + VersionBarrier = 16, // bilrost + UpdatePartitionDurability = 17, // bilrost + ResumeInvocation = 18, // flexbuffers + RestartAsNewInvocation = 19, // flexbuffers + } + + #[derive(bilrost::Message)] + struct Field { + #[bilrost(1)] + codec: Option, + #[bilrost(2)] + bytes: Bytes, + } + + impl Field { + fn encode_serde( + codec: StorageCodecKind, + value: &T, + ) -> Result { + let mut buf = BytesMut::new(); + encode_serde(value, &mut buf, codec)?; + + Ok(Self { + codec: Some(codec), + bytes: buf.freeze(), + }) + } + + fn encode_bilrost(value: &T) -> Result { + Ok(Self { + codec: Some(StorageCodecKind::Bilrost), + bytes: encode_bilrost(value), + }) + } + + fn encode_protobuf(value: &T) -> Result { + let mut buf = BytesMut::new(); + value + .encode(&mut buf) + .map_err(|err| StorageEncodeError::EncodeValue(err.into()))?; + + Ok(Self { + codec: Some(StorageCodecKind::Protobuf), + bytes: buf.freeze(), + }) + } + + fn decode_serde(mut self) -> Result { + let codec = self.codec()?; + if !matches!( + codec, + StorageCodecKind::Json + | StorageCodecKind::FlexbuffersSerde + | StorageCodecKind::BincodeSerde + ) { + return Err(StorageDecodeError::UnsupportedCodecKind(codec)); + } + + decode_serde( + &mut self.bytes, + self.codec.ok_or(DecodeError::MissingFieldCodec)?, + ) + } + + fn decode_bilrost(mut self) -> Result { + let codec = self.codec()?; + if codec != StorageCodecKind::Bilrost { + return Err(StorageDecodeError::UnsupportedCodecKind(codec)); + } + + decode_bilrost(&mut self.bytes) + } + + fn decode_protobuf(self) -> Result { + let codec = self.codec()?; + if codec != StorageCodecKind::Protobuf { + return Err(StorageDecodeError::UnsupportedCodecKind(codec)); + } + + T::decode(self.bytes).map_err(|err| StorageDecodeError::DecodeValue(err.into())) + } + + fn codec(&self) -> Result { + self.codec.ok_or(DecodeError::MissingFieldCodec) + } + } + + #[derive(bilrost::Message)] + struct Envelope { + #[bilrost(1)] + header: Field, + #[bilrost(2)] + command_kind: CommandKind, + #[bilrost(3)] + command: Field, + } + + macro_rules! codec_or_error { + ($field:expr, $expected:path) => {{ + let codec = $field.codec()?; + if !matches!(codec, $expected) { + return Err(DecodeError::UnexpectedCodec(codec).into()); + } + }}; + } + + pub fn encode(envelope: &super::Envelope) -> Result { + // todo(azmy): avoid clone? this will require change to `From` implementation + let (command_kind, command) = match &envelope.command { + Command::UpdatePartitionDurability(value) => ( + CommandKind::UpdatePartitionDurability, + Field::encode_bilrost(value), + ), + Command::VersionBarrier(value) => { + (CommandKind::VersionBarrier, Field::encode_bilrost(value)) + } + Command::AnnounceLeader(value) => ( + CommandKind::AnnounceLeader, + Field::encode_serde(StorageCodecKind::FlexbuffersSerde, value), + ), + Command::PatchState(value) => { + let value = protobuf::StateMutation::from(value.clone()); + (CommandKind::PatchState, Field::encode_protobuf(&value)) + } + Command::TerminateInvocation(value) => ( + CommandKind::TerminateInvocation, + Field::encode_serde(StorageCodecKind::FlexbuffersSerde, value), + ), + Command::PurgeInvocation(value) => ( + CommandKind::PurgeInvocation, + Field::encode_serde(StorageCodecKind::FlexbuffersSerde, value), + ), + Command::ResumeInvocation(value) => ( + CommandKind::ResumeInvocation, + Field::encode_serde(StorageCodecKind::FlexbuffersSerde, value), + ), + Command::RestartAsNewInvocation(value) => ( + CommandKind::RestartAsNewInvocation, + Field::encode_serde(StorageCodecKind::FlexbuffersSerde, value), + ), + Command::PurgeJournal(value) => ( + CommandKind::PurgeJournal, + Field::encode_serde(StorageCodecKind::FlexbuffersSerde, value), + ), + Command::Invoke(value) => { + let value = protobuf::ServiceInvocation::from(value.as_ref()); + (CommandKind::Invoke, Field::encode_protobuf(&value)) + } + Command::TruncateOutbox(value) => ( + CommandKind::TruncateOutbox, + Field::encode_serde(StorageCodecKind::FlexbuffersSerde, value), + ), + Command::ProxyThrough(value) => { + let value = protobuf::ServiceInvocation::from(value.as_ref()); + (CommandKind::ProxyThrough, Field::encode_protobuf(&value)) + } + Command::AttachInvocation(value) => { + let value = protobuf::outbox_message::AttachInvocationRequest::from(value.clone()); + ( + CommandKind::AttachInvocation, + Field::encode_protobuf(&value), + ) + } + Command::InvokerEffect(value) => ( + CommandKind::InvokerEffect, + Field::encode_serde(StorageCodecKind::FlexbuffersSerde, value), + ), + Command::Timer(value) => ( + CommandKind::Timer, + Field::encode_serde(StorageCodecKind::FlexbuffersSerde, value), + ), + Command::ScheduleTimer(value) => ( + CommandKind::ScheduleTimer, + Field::encode_serde(StorageCodecKind::FlexbuffersSerde, value), + ), + Command::InvocationResponse(value) => { + let value = + protobuf::outbox_message::OutboxServiceInvocationResponse::from(value.clone()); + ( + CommandKind::InvocationResponse, + Field::encode_protobuf(&value), + ) + } + Command::NotifyGetInvocationOutputResponse(value) => ( + CommandKind::NotifyGetInvocationOutputResponse, + Field::encode_bilrost(value), + ), + Command::NotifySignal(value) => { + let value = protobuf::outbox_message::NotifySignal::from(value.clone()); + (CommandKind::NotifySignal, Field::encode_protobuf(&value)) + } + }; + + let dto = Envelope { + header: Field::encode_serde(StorageCodecKind::FlexbuffersSerde, &envelope.header)?, + command_kind, + command: command?, + }; + + Ok(dto.encode_contiguous().into_vec().into()) + } + + pub fn decode(buf: B) -> Result { + let envelope = + Envelope::decode(buf).map_err(|err| StorageDecodeError::DecodeValue(err.into()))?; + + // header is encoded with serde + codec_or_error!(envelope.header, StorageCodecKind::FlexbuffersSerde); + let header = envelope.header.decode_serde::()?; + + let command = match envelope.command_kind { + CommandKind::Unknown => return Err(DecodeError::UnknownCommandKind.into()), + CommandKind::UpdatePartitionDurability => { + codec_or_error!(envelope.command, StorageCodecKind::Bilrost); + Command::UpdatePartitionDurability(envelope.command.decode_bilrost()?) + } + CommandKind::VersionBarrier => { + codec_or_error!(envelope.command, StorageCodecKind::Bilrost); + Command::VersionBarrier(envelope.command.decode_bilrost()?) + } + CommandKind::AnnounceLeader => { + codec_or_error!(envelope.command, StorageCodecKind::FlexbuffersSerde); + Command::AnnounceLeader(envelope.command.decode_serde()?) + } + CommandKind::PatchState => { + codec_or_error!(envelope.command, StorageCodecKind::Protobuf); + let value: protobuf::StateMutation = envelope.command.decode_protobuf()?; + Command::PatchState(value.try_into()?) + } + CommandKind::TerminateInvocation => { + codec_or_error!(envelope.command, StorageCodecKind::FlexbuffersSerde); + Command::TerminateInvocation(envelope.command.decode_serde()?) + } + CommandKind::PurgeInvocation => { + codec_or_error!(envelope.command, StorageCodecKind::FlexbuffersSerde); + Command::PurgeInvocation(envelope.command.decode_serde()?) + } + CommandKind::ResumeInvocation => { + codec_or_error!(envelope.command, StorageCodecKind::FlexbuffersSerde); + Command::ResumeInvocation(envelope.command.decode_serde()?) + } + CommandKind::RestartAsNewInvocation => { + codec_or_error!(envelope.command, StorageCodecKind::FlexbuffersSerde); + Command::RestartAsNewInvocation(envelope.command.decode_serde()?) + } + CommandKind::PurgeJournal => { + codec_or_error!(envelope.command, StorageCodecKind::FlexbuffersSerde); + Command::PurgeJournal(envelope.command.decode_serde()?) + } + CommandKind::Invoke => { + codec_or_error!(envelope.command, StorageCodecKind::Protobuf); + let value: protobuf::ServiceInvocation = envelope.command.decode_protobuf()?; + Command::Invoke(Box::new(value.try_into()?)) + } + CommandKind::TruncateOutbox => { + codec_or_error!(envelope.command, StorageCodecKind::FlexbuffersSerde); + Command::TruncateOutbox(envelope.command.decode_serde()?) + } + CommandKind::ProxyThrough => { + codec_or_error!(envelope.command, StorageCodecKind::Protobuf); + let value: protobuf::ServiceInvocation = envelope.command.decode_protobuf()?; + Command::ProxyThrough(Box::new(value.try_into()?)) + } + CommandKind::AttachInvocation => { + codec_or_error!(envelope.command, StorageCodecKind::Protobuf); + let value: protobuf::outbox_message::AttachInvocationRequest = + envelope.command.decode_protobuf()?; + Command::AttachInvocation(value.try_into()?) + } + CommandKind::InvokerEffect => { + codec_or_error!(envelope.command, StorageCodecKind::FlexbuffersSerde); + Command::InvokerEffect(envelope.command.decode_serde()?) + } + CommandKind::Timer => { + codec_or_error!(envelope.command, StorageCodecKind::FlexbuffersSerde); + Command::Timer(envelope.command.decode_serde()?) + } + CommandKind::ScheduleTimer => { + codec_or_error!(envelope.command, StorageCodecKind::FlexbuffersSerde); + Command::ScheduleTimer(envelope.command.decode_serde()?) + } + CommandKind::InvocationResponse => { + codec_or_error!(envelope.command, StorageCodecKind::Protobuf); + let value: protobuf::outbox_message::OutboxServiceInvocationResponse = + envelope.command.decode_protobuf()?; + Command::InvocationResponse(value.try_into()?) + } + CommandKind::NotifyGetInvocationOutputResponse => { + codec_or_error!(envelope.command, StorageCodecKind::Bilrost); + Command::NotifyGetInvocationOutputResponse(envelope.command.decode_bilrost()?) + } + CommandKind::NotifySignal => { + codec_or_error!(envelope.command, StorageCodecKind::Protobuf); + let value: protobuf::outbox_message::NotifySignal = + envelope.command.decode_protobuf()?; + + Command::NotifySignal(value.try_into()?) + } + }; + + Ok(super::Envelope { header, command }) + } +} diff --git a/crates/wal-protocol/src/v2.rs b/crates/wal-protocol/src/v2.rs new file mode 100644 index 0000000000..cac4cbc2a3 --- /dev/null +++ b/crates/wal-protocol/src/v2.rs @@ -0,0 +1,781 @@ +// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use std::marker::PhantomData; + +use bilrost::encoding::encoded_len_varint; +use bilrost::{Message, OwnedMessage}; +use bytes::{BufMut, Bytes, BytesMut}; + +use restate_types::identifiers::{LeaderEpoch, PartitionId, PartitionKey, WithPartitionKey}; +use restate_types::logs::{HasRecordKeys, Keys}; +use restate_types::storage::{ + StorageCodecKind, StorageDecode, StorageDecodeError, StorageEncode, StorageEncodeError, +}; + +const ENCODING_VERSION: u8 = 1; + +#[derive(Debug, Clone, bilrost::Message)] +struct EnvelopeInner { + #[bilrost(1)] + header: Header, + #[bilrost(2)] + keys: Keys, + #[bilrost(3)] + kind: RecordKind, + #[bilrost(4)] + encoding: Option, +} + +/// The primary envelope for all messages in the system. +#[derive(Debug, Clone)] +pub struct Envelope { + inner: EnvelopeInner, + payload: Bytes, + phantom: PhantomData, +} + +impl StorageEncode for Envelope { + fn default_codec(&self) -> StorageCodecKind { + StorageCodecKind::Custom + } + + fn encode(&self, buf: &mut BytesMut) -> Result<(), StorageEncodeError> { + buf.put_u8(ENCODING_VERSION); + + let len = self.inner.encoded_len(); + buf.reserve(encoded_len_varint(len as u64) + len + self.payload.len()); + + self.inner + .encode_length_delimited(buf) + .map_err(|err| StorageEncodeError::EncodeValue(err.into()))?; + + buf.put(&self.payload[..]); + Ok(()) + } +} + +impl StorageDecode for Envelope { + fn decode( + buf: &mut B, + kind: StorageCodecKind, + ) -> Result + where + Self: Sized, + { + match kind { + StorageCodecKind::FlexbuffersSerde => { + todo!("implement loading from envelop V1") + } + StorageCodecKind::Custom => { + let version = buf.get_u8(); + if version != ENCODING_VERSION { + return Err(StorageDecodeError::DecodeValue( + anyhow::anyhow!("Unknown envelope encoding version {version}").into(), + )); + } + + let inner = EnvelopeInner::decode_length_delimited(&mut *buf) + .map_err(|err| StorageDecodeError::DecodeValue(err.into()))?; + + Ok(Self { + inner, + payload: buf.copy_to_bytes(buf.remaining()), + phantom: PhantomData, + }) + } + _ => { + panic!("unsupported encoding"); + } + } + } +} + +impl HasRecordKeys for Envelope { + fn record_keys(&self) -> Keys { + self.inner.keys.clone() + } +} + +impl WithPartitionKey for Envelope { + fn partition_key(&self) -> PartitionKey { + match self.header().dest { + Destination::None => unimplemented!("expect destinationt to be set"), + Destination::Processor { partition_key, .. } => partition_key, + } + } +} + +impl Envelope { + #[inline] + pub fn record_type(&self) -> RecordKind { + self.inner.kind + } + + #[inline] + pub fn header(&self) -> &Header { + &self.inner.header + } + + pub fn kind(&self) -> RecordKind { + self.inner.kind + } +} + +/// Tag for untyped Envelope +// #[derive(Copy, Clone, Default, PartialEq, Eq, bilrost::Message)] +pub struct Raw; + +impl Envelope { + /// Convers Raw Envelope into a Typed envelope. Panics + /// if the record kind does not match the M::KIND + pub fn into_typed(self) -> Envelope { + assert_eq!(self.inner.kind, M::KIND); + + let Self { + inner, + payload, + phantom: _, + } = self; + + Envelope { + inner, + payload, + phantom: PhantomData, + } + } +} + +impl Envelope { + /// Create a new typed envelope + pub fn create( + header: Header, + record_keys: Keys, + payload: M::Payload, + ) -> Result + where + M::Payload: StorageEncode, + { + let mut buf = BytesMut::new(); + payload.encode(&mut buf)?; + + let inner = EnvelopeInner { + header, + keys: record_keys, + kind: M::KIND, + encoding: payload.default_codec().into(), + }; + + Ok(Self { + inner, + payload: buf.freeze(), + phantom: PhantomData, + }) + } + + /// return the envelope payload + pub fn payload(&mut self) -> Result { + M::Payload::decode( + &mut self.payload, + self.inner.encoding.expect("encoding to be set"), + ) + } + + pub fn into_raw(self) -> Envelope { + self.into() + } +} + +/// It's always safe to go back to Raw Envelope +impl From> for Envelope { + fn from(value: Envelope) -> Self { + let Envelope { inner, payload, .. } = value; + + Self { + inner, + payload, + phantom: PhantomData, + } + } +} + +/// Header is set on every message +#[derive(Debug, Clone, bilrost::Message)] +pub struct Header { + #[bilrost(1)] + pub source: Source, + + #[bilrost(2)] + pub dest: Destination, + + #[bilrost(3)] + pub dedup: Dedup, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, bilrost::Enumeration)] +pub enum RecordKind { + Unknown = 0, + + AnnounceLeader = 1, + /// A version barrier to fence off state machine changes that require a certain minimum + /// version of restate server. + /// *Since v1.4.0* + VersionBarrier = 2, + /// Updates the `PARTITION_DURABILITY` FSM variable to the given value. + /// See [`PartitionDurability`] for more details. + /// + /// *Since v1.4.2* + UpdatePartitionDurability = 3, + + // -- Partition processor commands + /// Manual patching of storage state + PatchState = 4, + /// Terminate an ongoing invocation + TerminateInvocation = 5, + /// Purge a completed invocation + PurgeInvocation = 6, + /// Purge a completed invocation journal + PurgeJournal = 7, + /// Start an invocation on this partition + Invoke = 8, + /// Truncate the message outbox up to, and including, the specified index. + TruncateOutbox = 9, + /// Proxy a service invocation through this partition processor, to reuse the deduplication id map. + ProxyThrough = 10, + /// Attach to an existing invocation + AttachInvocation = 11, + /// Resume an invocation + ResumeInvocation = 12, + /// Restart as new invocation from prefix + RestartAsNewInvocation = 13, + // -- Partition processor events for PP + /// Invoker is reporting effect(s) from an ongoing invocation. + InvokerEffect = 14, + /// Timer has fired + Timer = 15, + /// Schedule timer + ScheduleTimer = 16, + /// Another partition processor is reporting a response of an invocation we requested. + /// + /// KINDA DEPRECATED: When Journal Table V1 is removed, this command should be used only to reply to invocations. + /// Now it's abused for a bunch of other scenarios, like replying to get promise and get invocation output. + /// + /// For more details see `OnNotifyInvocationResponse`. + InvocationResponse = 17, + + // -- New PP <-> PP commands using Journal V2 + /// Notify Get invocation output + NotifyGetInvocationOutputResponse = 18, + /// Notify a signal. + NotifySignal = 19, +} + +#[derive(Debug, Clone, bilrost::Message)] +pub struct RawRecord { + #[bilrost(1)] + kind: RecordKind, + #[bilrost(2)] + encoding: Option, + #[bilrost(3)] + data: Bytes, +} + +/// Identifies the source of a message +#[derive(Debug, Clone, bilrost::Oneof, bilrost::Message)] +pub enum Source { + #[bilrost(empty)] + None, + + /// Message is sent from an ingress node + #[bilrost(tag = 1, message)] + Ingress, + + /// Message is sent from some control plane component (controller, cli, etc.) + #[bilrost(tag = 2, message)] + ControlPlane, + + /// Message is sent from another partition processor + #[bilrost(tag = 3, message)] + Processor { + /// if possible, this is used to reroute responses in case of splits/merges + /// Marked as `Option` in v1.5. Note that v1.4 requires this to be set but as of v1.6 + /// this can be safely set to `None`. + #[bilrost(1)] + partition_id: Option, + #[bilrost(2)] + partition_key: Option, + /// The current epoch of the partition leader. Readers should observe this to decide which + /// messages to accept. Readers should ignore messages coming from + /// epochs lower than the max observed for a given partition id. + #[bilrost(3)] + leader_epoch: LeaderEpoch, + }, +} + +/// Identifies the intended destination of the message +#[derive(Debug, Clone, bilrost::Oneof, bilrost::Message)] +pub enum Destination { + #[bilrost(empty)] + None, + + /// Message is sent to partition processor + #[bilrost(tag = 1, message)] + Processor { partition_key: PartitionKey }, +} + +#[derive(Debug, Clone, PartialEq, Eq, Default, bilrost::Oneof, bilrost::Message)] +pub enum Dedup { + #[default] + None, + /// Sequence number to deduplicate messages sent by the same partition or a successor + /// of a previous partition (a successor partition will inherit the leader epoch of its + /// predecessor). + #[bilrost(tag(1), message)] + SelfProposal { + #[bilrost(0)] + leader_epoch: LeaderEpoch, + #[bilrost(1)] + seq: u64, + }, + /// Sequence number to deduplicate messages from a foreign partition. + #[bilrost(tag(2), message)] + ForeignPartition { + #[bilrost(0)] + partition: PartitionId, + #[bilrost(1)] + seq: u64, + }, + /// Sequence number to deduplicate messages from an arbitrary string prefix. + #[bilrost(tag(3), message)] + Arbitrary { + #[bilrost(0)] + prefix: String, + #[bilrost(1)] + seq: u64, + }, +} + +pub trait Record: sealed::Sealed + Sized { + const KIND: RecordKind; + type Payload: StorageDecode + 'static; + + /// Create an envelope with `this` record kind + /// given the header, keys and payload + fn envelope( + header: Header, + record_keys: Keys, + payload: Self::Payload, + ) -> Result, StorageEncodeError> + where + Self::Payload: StorageEncode, + { + Envelope::create(header, record_keys, payload) + } +} + +mod sealed { + pub trait Sealed {} +} + +pub mod records { + use restate_types::{ + invocation::{ + AttachInvocationRequest, GetInvocationOutputResponse, InvocationTermination, + NotifySignalRequest, PurgeInvocationRequest, RestartAsNewInvocationRequest, + ResumeInvocationRequest, ServiceInvocation, + }, + message::MessageIndexRecrod, + state_mut::ExternalStateMutation, + }; + + use super::sealed::Sealed; + use super::{Record, RecordKind}; + use crate::timer::TimerKeyValue; + + macro_rules! record { + {@name=$name:ident, @kind=$type:expr, @payload=$payload:path} => { + #[allow(dead_code)] + pub struct $name; + impl Sealed for $name{} + impl Record for $name { + const KIND: RecordKind = $type; + type Payload = $payload; + } + }; + } + + record! { + @name=AnnounceLeader, + @kind=RecordKind::AnnounceLeader, + @payload=crate::control::AnnounceLeader + } + + record! { + @name=VersionBarrier, + @kind=RecordKind::VersionBarrier, + @payload=crate::control::VersionBarrier + } + + record! { + @name=UpdatePartitionDurability, + @kind=RecordKind::UpdatePartitionDurability, + @payload=crate::control::PartitionDurability + } + + record! { + @name=PatchState, + @kind=RecordKind::PatchState, + @payload=ExternalStateMutation + } + + record! { + @name=TerminateInvocation, + @kind=RecordKind::TerminateInvocation, + @payload=InvocationTermination + } + + record! { + @name=PurgeInvocation, + @kind=RecordKind::PurgeInvocation, + @payload=PurgeInvocationRequest + } + + record! { + @name=PurgeJournal, + @kind=RecordKind::PurgeJournal, + @payload=PurgeInvocationRequest + } + + record! { + @name=Invoke, + @kind=RecordKind::Invoke, + @payload=ServiceInvocation + } + + record! { + @name=TruncateOutbox, + @kind=RecordKind::TruncateOutbox, + @payload=MessageIndexRecrod + } + + record! { + @name=ProxyThrough, + @kind=RecordKind::ProxyThrough, + @payload=ServiceInvocation + } + + record! { + @name=AttachInvocation, + @kind=RecordKind::AttachInvocation, + @payload=AttachInvocationRequest + } + + record! { + @name=ResumeInvocation, + @kind=RecordKind::ResumeInvocation, + @payload=ResumeInvocationRequest + } + + record! { + @name=RestartAsNewInvocation, + @kind=RecordKind::RestartAsNewInvocation, + @payload=RestartAsNewInvocationRequest + } + + record! { + @name=InvokerEffect, + @kind=RecordKind::InvokerEffect, + @payload=restate_invoker_api::Effect + } + + record! { + @name=Timer, + @kind=RecordKind::Timer, + @payload=TimerKeyValue + } + + record! { + @name=ScheduleTimer, + @kind=RecordKind::ScheduleTimer, + @payload=TimerKeyValue + } + + record! { + @name=InvocationResponse, + @kind=RecordKind::InvocationResponse, + @payload=restate_types::invocation::InvocationResponse + } + + record! { + @name=NotifyGetInvocationOutputResponse, + @kind=RecordKind::NotifyGetInvocationOutputResponse, + @payload=GetInvocationOutputResponse + } + + record! { + @name=NotifySignal, + @kind=RecordKind::NotifySignal, + @payload=NotifySignalRequest + } +} + +mod compatibility { + /// Compatibility module with v1. We probably can never drop this + /// code unless we are absolutely sure there is no more records + /// ever exited that are still using v1 + use anyhow::Context; + use bytes::Buf; + + use restate_storage_api::deduplication_table::{DedupInformation, EpochSequenceNumber}; + use restate_types::storage::{StorageCodecKind, StorageDecode, StorageDecodeError}; + + use super::{ + Dedup, Destination, Envelope, EnvelopeInner, Header, Raw, Record, RecordKind, Source, + records, + }; + use crate::v1; + + fn decode_payload( + buf: &mut B, + kind: StorageCodecKind, + ) -> Result { + ::decode(buf, kind) + } + + impl TryFrom
for v1::Header { + type Error = anyhow::Error; + + fn try_from(value: Header) -> Result { + let Header { + source, + dest, + dedup, + } = value; + + let source = match source { + Source::None => anyhow::bail!("Missing envelope header source"), + Source::Ingress => v1::Source::Ingress {}, + Source::ControlPlane => v1::Source::ControlPlane {}, + Source::Processor { + partition_id, + partition_key, + leader_epoch, + } => v1::Source::Processor { + partition_id, + partition_key, + leader_epoch, + }, + }; + + let dedup = match dedup { + Dedup::None => None, + Dedup::SelfProposal { leader_epoch, seq } => { + Some(DedupInformation::self_proposal(EpochSequenceNumber { + leader_epoch, + sequence_number: seq, + })) + } + Dedup::ForeignPartition { partition, seq } => { + Some(DedupInformation::cross_partition(partition, seq)) + } + Dedup::Arbitrary { prefix, seq } => Some(DedupInformation::ingress(prefix, seq)), + }; + + let dest = match dest { + Destination::None => anyhow::bail!("Missing envelope header destination"), + Destination::Processor { partition_key } => v1::Destination::Processor { + partition_key, + dedup, + }, + }; + + Ok(v1::Header { source, dest }) + } + } + + impl TryFrom> for v1::Envelope { + type Error = anyhow::Error; + + fn try_from(value: Envelope) -> Result { + let Envelope { + inner: + EnvelopeInner { + encoding, + header, + keys: _, + kind, + }, + mut payload, + .. + } = value; + + // todo: create a bilrost helpder for required fields so it failes + // during decoding. + let encoding = encoding.context("missing encoding")?; + + let command = match kind { + RecordKind::Unknown => anyhow::bail!("Unknown record kind"), + RecordKind::AnnounceLeader => { + let value = + decode_payload::(&mut payload, encoding)?; + v1::Command::AnnounceLeader(value.into()) + } + RecordKind::VersionBarrier => { + let value = + decode_payload::(&mut payload, encoding)?; + v1::Command::VersionBarrier(value) + } + RecordKind::UpdatePartitionDurability => { + let value = decode_payload::( + &mut payload, + encoding, + )?; + v1::Command::UpdatePartitionDurability(value) + } + RecordKind::PatchState => { + let value = decode_payload::(&mut payload, encoding)?; + v1::Command::PatchState(value) + } + RecordKind::TerminateInvocation => { + let value = + decode_payload::(&mut payload, encoding)?; + v1::Command::TerminateInvocation(value) + } + RecordKind::PurgeInvocation => { + let value = + decode_payload::(&mut payload, encoding)?; + v1::Command::PurgeInvocation(value) + } + RecordKind::PurgeJournal => { + let value = decode_payload::(&mut payload, encoding)?; + v1::Command::PurgeJournal(value) + } + RecordKind::Invoke => { + let value = decode_payload::(&mut payload, encoding)?; + v1::Command::Invoke(value.into()) + } + RecordKind::TruncateOutbox => { + let value = + decode_payload::(&mut payload, encoding)?; + v1::Command::TruncateOutbox(value.index) + } + RecordKind::ProxyThrough => { + let value = decode_payload::(&mut payload, encoding)?; + v1::Command::ProxyThrough(value.into()) + } + RecordKind::AttachInvocation => { + let value = + decode_payload::(&mut payload, encoding)?; + v1::Command::AttachInvocation(value) + } + RecordKind::ResumeInvocation => { + let value = + decode_payload::(&mut payload, encoding)?; + v1::Command::ResumeInvocation(value) + } + RecordKind::RestartAsNewInvocation => { + let value = decode_payload::( + &mut payload, + encoding, + )?; + v1::Command::RestartAsNewInvocation(value) + } + RecordKind::InvokerEffect => { + let value = + decode_payload::(&mut payload, encoding)?; + v1::Command::InvokerEffect(value.into()) + } + RecordKind::Timer => { + let value = decode_payload::(&mut payload, encoding)?; + v1::Command::Timer(value) + } + RecordKind::ScheduleTimer => { + let value = + decode_payload::(&mut payload, encoding)?; + v1::Command::ScheduleTimer(value) + } + RecordKind::InvocationResponse => { + let value = + decode_payload::(&mut payload, encoding)?; + v1::Command::InvocationResponse(value) + } + + RecordKind::NotifyGetInvocationOutputResponse => { + let value = decode_payload::( + &mut payload, + encoding, + )?; + v1::Command::NotifyGetInvocationOutputResponse(value) + } + + RecordKind::NotifySignal => { + let value = decode_payload::(&mut payload, encoding)?; + v1::Command::NotifySignal(value) + } + }; + + Ok(v1::Envelope::new(header.try_into()?, command)) + } + } +} + +#[cfg(test)] +mod test { + + use bytes::BytesMut; + + use restate_types::{ + GenerationalNodeId, + logs::Keys, + storage::{StorageCodecKind, StorageDecode, StorageEncode}, + }; + + use super::{Dedup, Destination, Envelope, Header, Source, records}; + use crate::{ + control::AnnounceLeader, + v2::{Raw, Record}, + }; + + #[test] + fn envelope_encode_decode() { + let header = Header { + dedup: Dedup::SelfProposal { + leader_epoch: 10.into(), + seq: 120, + }, + dest: Destination::Processor { + partition_key: 1234, + }, + source: Source::Ingress, + }; + + let payload = AnnounceLeader { + leader_epoch: 11.into(), + node_id: GenerationalNodeId::new(1, 3), + partition_key_range: 0..=u64::MAX, + }; + + let envelope = + records::AnnounceLeader::envelope(header, Keys::Single(1000), payload.clone()) + .expect("to work"); + + let mut buf = BytesMut::new(); + envelope.encode(&mut buf).expect("to encode"); + + let envelope = + Envelope::::decode(&mut buf, StorageCodecKind::Custom).expect("to decode"); + + let mut typed = envelope.into_typed::(); + + let loaded_payload = typed.payload().expect("to decode"); + + assert_eq!(payload, loaded_payload); + } +}