diff --git a/Cargo.lock b/Cargo.lock index e0a426c1fd..2799b0164e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8238,6 +8238,7 @@ dependencies = [ "bilrost", "bytes", "bytestring", + "derive_more", "enum-map", "prost 0.14.1", "restate-invoker-api", diff --git a/crates/types/src/identifiers.rs b/crates/types/src/identifiers.rs index e4e850679f..79eea440f4 100644 --- a/crates/types/src/identifiers.rs +++ b/crates/types/src/identifiers.rs @@ -349,15 +349,26 @@ impl From for opentelemetry::trace::SpanId { /// Services are isolated by key. This means that there cannot be two concurrent /// invocations for the same service instance (service name, key). #[derive( - Eq, Hash, PartialEq, PartialOrd, Ord, Clone, Debug, serde::Serialize, serde::Deserialize, + Eq, + Hash, + PartialEq, + PartialOrd, + Ord, + Clone, + Debug, + serde::Serialize, + serde::Deserialize, + bilrost::Message, )] pub struct ServiceId { // TODO rename this to KeyedServiceId. This type can be used only by keyed service types (virtual objects and workflows) /// Identifies the grpc service + #[bilrost(1)] pub service_name: ByteString, /// Identifies the service instance for the given service name + #[bilrost(2)] pub key: ByteString, - + #[bilrost(3)] partition_key: PartitionKey, } diff --git a/crates/types/src/invocation/mod.rs b/crates/types/src/invocation/mod.rs index 90fc59722a..e3afbf65bd 100644 --- a/crates/types/src/invocation/mod.rs +++ b/crates/types/src/invocation/mod.rs @@ -12,6 +12,18 @@ pub mod client; +use std::borrow::Cow; +use std::hash::Hash; +use std::ops::Deref; +use std::str::FromStr; +use std::time::Duration; +use std::{cmp, fmt}; + +use bytes::Bytes; +use bytestring::ByteString; +use opentelemetry::trace::{SpanContext, SpanId, TraceFlags, TraceState}; +use serde_with::{DisplayFromStr, FromInto, serde_as}; + use crate::errors::InvocationError; use crate::identifiers::{ DeploymentId, EntryIndex, IdempotencyId, InvocationId, PartitionKey, @@ -21,17 +33,6 @@ use crate::journal_v2::{CompletionId, GetInvocationOutputResult, Signal}; use crate::time::MillisSinceEpoch; use crate::{GenerationalNodeId, RestateVersion}; -use bytes::Bytes; -use bytestring::ByteString; -use opentelemetry::trace::{SpanContext, SpanId, TraceFlags, TraceState}; -use serde_with::{DisplayFromStr, FromInto, serde_as}; -use std::borrow::Cow; -use std::hash::Hash; -use std::ops::Deref; -use std::str::FromStr; -use std::time::Duration; -use std::{cmp, fmt}; - // Re-exporting opentelemetry [`TraceId`] to avoid having to import opentelemetry in all crates. pub use opentelemetry::trace::TraceId; diff --git a/crates/types/src/state_mut.rs b/crates/types/src/state_mut.rs index 7a28be51cf..6426aca6e1 100644 --- a/crates/types/src/state_mut.rs +++ b/crates/types/src/state_mut.rs @@ -22,11 +22,14 @@ use crate::identifiers::ServiceId; /// ExternalStateMutation /// /// represents an external request to mutate a user's state. -#[derive(Debug, Clone, Eq, PartialEq, serde::Serialize, serde::Deserialize)] +#[derive(Debug, Clone, Eq, PartialEq, serde::Serialize, serde::Deserialize, bilrost::Message)] pub struct ExternalStateMutation { + #[bilrost(1)] pub service_id: ServiceId, + #[bilrost(2)] pub version: Option, // flexbuffers only supports string-keyed maps :-( --> so we store it as vector of kv pairs + #[bilrost(3)] #[serde_as(as = "serde_with::Seq<(_, _)>")] pub state: HashMap, } diff --git a/crates/types/src/storage.rs b/crates/types/src/storage.rs index e57e97c146..4cb7026b6e 100644 --- a/crates/types/src/storage.rs +++ b/crates/types/src/storage.rs @@ -27,6 +27,8 @@ use crate::journal_v2::raw::{RawEntry, RawEntryError, TryFromEntry}; use crate::journal_v2::{Decoder, EntryMetadata, EntryType}; use crate::time::MillisSinceEpoch; +pub use tracing; + #[derive(Debug, thiserror::Error)] pub enum StorageEncodeError { #[error("encoding failed: {0}")] @@ -187,7 +189,7 @@ macro_rules! flexbuffers_storage_encode_decode { Self: Sized, { $crate::storage::decode::decode_serde(buf, kind).map_err(|err| { - ::tracing::error!(%err, "{} decode failure (decoding {})", kind, stringify!($name)); + $crate::storage::tracing::error!(%err, "{} decode failure (decoding {})", kind, stringify!($name)); err }) @@ -196,6 +198,40 @@ macro_rules! flexbuffers_storage_encode_decode { }; } +/// Implements the [`StorageEncode`] and [`StorageDecode`] by encoding/decoding the implementing +/// type using [`bilrost`]. +#[macro_export] +macro_rules! bilrost_storage_encode_decode { + ($name:tt) => { + impl $crate::storage::StorageEncode for $name { + fn default_codec(&self) -> $crate::storage::StorageCodecKind { + $crate::storage::StorageCodecKind::Bilrost + } + + fn encode( + &self, + buf: &mut ::bytes::BytesMut, + ) -> Result<(), $crate::storage::StorageEncodeError> { + bytes::BufMut::put(buf, $crate::storage::encode::encode_bilrost(self)); + Ok(()) + } + } + + impl $crate::storage::StorageDecode for $name { + fn decode( + buf: &mut B, + kind: $crate::storage::StorageCodecKind, + ) -> Result + where + Self: Sized, + { + debug_assert_eq!(kind, $crate::storage::StorageCodecKind::Bilrost); + $crate::storage::decode::decode_bilrost(buf) + } + } + }; +} + /// A polymorphic container of a buffer or a cached storage-encodeable object #[derive(Clone, derive_more::Debug, BilrostAs)] #[bilrost_as(dto::PolyBytes)] diff --git a/crates/wal-protocol/Cargo.toml b/crates/wal-protocol/Cargo.toml index 82941fc1f2..2083ae1bd5 100644 --- a/crates/wal-protocol/Cargo.toml +++ b/crates/wal-protocol/Cargo.toml @@ -13,7 +13,6 @@ serde = ["dep:serde", "enum-map/serde", "bytestring/serde", "restate-invoker-api [dependencies] restate-workspace-hack = { workspace = true } - restate-invoker-api = { workspace = true } restate-storage-api = { workspace = true } restate-types = { workspace = true } @@ -22,6 +21,7 @@ anyhow = { workspace = true } bytes = { workspace = true } bytestring = { workspace = true } bilrost = { workspace = true } +derive_more = { workspace = true } enum-map = { workspace = true } serde = { workspace = true, optional = true } strum = { workspace = true } diff --git a/crates/wal-protocol/src/control.rs b/crates/wal-protocol/src/control.rs index 01bfa59368..3a9ad7a434 100644 --- a/crates/wal-protocol/src/control.rs +++ b/crates/wal-protocol/src/control.rs @@ -11,25 +11,38 @@ use std::ops::RangeInclusive; use restate_types::identifiers::{LeaderEpoch, PartitionId, PartitionKey}; -use restate_types::logs::{Keys, Lsn}; +use restate_types::logs::{HasRecordKeys, Keys, Lsn}; use restate_types::schema::Schema; use restate_types::time::MillisSinceEpoch; -use restate_types::{GenerationalNodeId, SemanticRestateVersion}; +use restate_types::{ + GenerationalNodeId, SemanticRestateVersion, bilrost_storage_encode_decode, + flexbuffers_storage_encode_decode, +}; /// Announcing a new leader. This message can be written by any component to make the specified /// partition processor the leader. -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, bilrost::Message)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] pub struct AnnounceLeader { /// Sender of the announce leader message. /// /// This became non-optional in v1.5. Noting that it has always been set in previous versions, /// it's safe to assume that it's always set. + #[bilrost(1)] pub node_id: GenerationalNodeId, + #[bilrost(2)] pub leader_epoch: LeaderEpoch, + #[bilrost(3)] pub partition_key_range: RangeInclusive, } +bilrost_storage_encode_decode!(AnnounceLeader); + +impl HasRecordKeys for AnnounceLeader { + fn record_keys(&self) -> Keys { + Keys::RangeInclusive(self.partition_key_range.clone()) + } +} /// A version barrier to fence off state machine changes that require a certain minimum /// version of restate server. /// @@ -40,12 +53,23 @@ pub struct AnnounceLeader { #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] pub struct VersionBarrier { /// The minimum version required (inclusive) to progress after this barrier. + #[bilrost(1)] pub version: SemanticRestateVersion, /// A human-readable reason for why this barrier exists. + #[bilrost(2)] pub human_reason: Option, + #[bilrost(3)] pub partition_key_range: Keys, } +bilrost_storage_encode_decode!(VersionBarrier); + +impl HasRecordKeys for VersionBarrier { + fn record_keys(&self) -> Keys { + self.partition_key_range.clone() + } +} + /// Updates the `PARTITION_DURABILITY` FSM variable to the given value. Note that durability /// only applies to partitions with the same `partition_id`. At replay time, the partition will /// ignore updates that are not targeted to its own ID. @@ -56,12 +80,26 @@ pub struct VersionBarrier { #[derive(Debug, Clone, PartialEq, Eq, bilrost::Message)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] pub struct PartitionDurability { + #[bilrost(1)] pub partition_id: PartitionId, /// The partition has applied this LSN durably to the replica-set and/or has been /// persisted in a snapshot in the snapshot repository. + #[bilrost(2)] pub durable_point: Lsn, /// Timestamp which the durability point was updated + #[bilrost(3)] pub modification_time: MillisSinceEpoch, + /// partition key range + #[bilrost(4)] + pub partition_key_range: Keys, +} + +bilrost_storage_encode_decode!(PartitionDurability); + +impl HasRecordKeys for PartitionDurability { + fn record_keys(&self) -> Keys { + self.partition_key_range.clone() + } } /// Consistently store schema across partition replicas. @@ -73,3 +111,11 @@ pub struct UpsertSchema { pub partition_key_range: Keys, pub schema: Schema, } + +flexbuffers_storage_encode_decode!(UpsertSchema); + +impl HasRecordKeys for UpsertSchema { + fn record_keys(&self) -> Keys { + self.partition_key_range.clone() + } +} diff --git a/crates/wal-protocol/src/lib.rs b/crates/wal-protocol/src/lib.rs index 43d9491006..867c908cce 100644 --- a/crates/wal-protocol/src/lib.rs +++ b/crates/wal-protocol/src/lib.rs @@ -8,244 +8,9 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use restate_storage_api::deduplication_table::DedupInformation; -use restate_types::identifiers::{LeaderEpoch, PartitionId, PartitionKey, WithPartitionKey}; -use restate_types::invocation::{ - AttachInvocationRequest, GetInvocationOutputResponse, InvocationResponse, - InvocationTermination, NotifySignalRequest, PurgeInvocationRequest, - RestartAsNewInvocationRequest, ResumeInvocationRequest, ServiceInvocation, -}; -use restate_types::logs::{self, HasRecordKeys, Keys, MatchKeyQuery}; -use restate_types::message::MessageIndex; -use restate_types::state_mut::ExternalStateMutation; - -use crate::control::{AnnounceLeader, UpsertSchema, VersionBarrier}; -use crate::timer::TimerKeyValue; - -use self::control::PartitionDurability; - pub mod control; pub mod timer; +mod v1; +pub mod v2; -/// The primary envelope for all messages in the system. -#[derive(Debug, Clone)] -#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] -pub struct Envelope { - pub header: Header, - pub command: Command, -} - -#[cfg(feature = "serde")] -restate_types::flexbuffers_storage_encode_decode!(Envelope); - -impl Envelope { - pub fn new(header: Header, command: Command) -> Self { - Self { header, command } - } -} - -/// Header is set on every message -#[derive(Debug, Clone, PartialEq, Eq)] -#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] -pub struct Header { - pub source: Source, - pub dest: Destination, -} - -/// Identifies the source of a message -#[derive(Debug, Clone, PartialEq, Eq)] -#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] -pub enum Source { - /// Message is sent from another partition processor - Processor { - /// if possible, this is used to reroute responses in case of splits/merges - /// v1.4 requires this to be set. - /// v1.5 Marked as `Option`. - /// v1.6 always set to `None`. - /// Will be removed in v1.7. - #[cfg_attr(feature = "serde", serde(default))] - partition_id: Option, - #[cfg_attr(feature = "serde", serde(default))] - partition_key: Option, - /// The current epoch of the partition leader. Readers should observe this to decide which - /// messages to accept. Readers should ignore messages coming from - /// epochs lower than the max observed for a given partition id. - leader_epoch: LeaderEpoch, - // Which node is this message from? - // First deprecation in v1.1, but since v1.5 we switched to Option and it's - // still being set to Some(v) to maintain compatibility with v1.4. - // - // v1.6 field is removed. -- Kept here for reference only. - // #[cfg_attr(feature = "serde", serde(default))] - // node_id: Option, - - // From v1.1 this is always set, but maintained to support rollback to v1.0. - // Deprecated(v1.5): It's set to Some(v) to maintain support for v1.4 but - // will be removed in v1.6. Commands that need the node-id of the sender should - // include the node-id in the command payload itself (e.g. in the [`AnnounceLeader`]) - // v1.6 field is removed. -- Kept here for reference only. - // #[cfg_attr(feature = "serde", serde(default))] - // generational_node_id: Option, - }, - /// Message is sent from an ingress node - Ingress { - // The identity of the sender node. Generational for fencing. Ingress is - // stateless, so we shouldn't respond to requests from older generation - // if a new generation is alive. - // - // Deprecated(v1.5): This field is set to Some(v) to maintain compatibility with v1.4. - // but will be removed in v1.6. - // v1.6 field is removed. -- Kept here for reference only. - // #[cfg_attr(feature = "serde", serde(default))] - // node_id: Option, - - // Last config version observed by sender. If this is a newer generation - // or an unknown ID, we might need to update our config. - // - // Deprecated(v1.5): This field is set to Some(v) to maintain compatibility with v1.4. - // but will be removed in v1.6. - // v1.6 field is removed. -- Kept here for reference only. - // #[cfg_attr(feature = "serde", serde(default))] - // nodes_config_version: Option, - }, - /// Message is sent from some control plane component (controller, cli, etc.) - ControlPlane { - // Reserved for future use. - }, -} - -/// Identifies the intended destination of the message -#[derive(Debug, Clone, PartialEq, Eq)] -#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] -pub enum Destination { - /// Message is sent to partition processor - Processor { - partition_key: PartitionKey, - #[cfg_attr(feature = "serde", serde(default))] - dedup: Option, - }, -} - -/// State machine input commands -#[derive(Debug, Clone, strum::EnumDiscriminants, strum::VariantNames)] -#[strum_discriminants(derive(strum::IntoStaticStr))] -#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] -pub enum Command { - /// Updates the `PARTITION_DURABILITY` FSM variable to the given value. - /// See [`PartitionDurability`] for more details. - /// - /// *Since v1.4.2* - UpdatePartitionDurability(PartitionDurability), - /// A version barrier to fence off state machine changes that require a certain minimum - /// version of restate server. - /// *Since v1.4.0* - VersionBarrier(VersionBarrier), - // -- Control-plane related events - AnnounceLeader(Box), - - // -- Partition processor commands - /// Manual patching of storage state - PatchState(ExternalStateMutation), - /// Terminate an ongoing invocation - TerminateInvocation(InvocationTermination), - /// Purge a completed invocation - PurgeInvocation(PurgeInvocationRequest), - /// Purge a completed invocation journal - PurgeJournal(PurgeInvocationRequest), - /// Start an invocation on this partition - Invoke(Box), - /// Truncate the message outbox up to, and including, the specified index. - TruncateOutbox(MessageIndex), - /// Proxy a service invocation through this partition processor, to reuse the deduplication id map. - ProxyThrough(Box), - /// Attach to an existing invocation - AttachInvocation(AttachInvocationRequest), - /// Resume an invocation - ResumeInvocation(ResumeInvocationRequest), - /// Restart as new invocation from prefix - RestartAsNewInvocation(RestartAsNewInvocationRequest), - - // -- Partition processor events for PP - /// Invoker is reporting effect(s) from an ongoing invocation. - InvokerEffect(Box), - /// Timer has fired - Timer(TimerKeyValue), - /// Schedule timer - ScheduleTimer(TimerKeyValue), - /// Another partition processor is reporting a response of an invocation we requested. - /// - /// KINDA DEPRECATED: When Journal Table V1 is removed, this command should be used only to reply to invocations. - /// Now it's abused for a bunch of other scenarios, like replying to get promise and get invocation output. - /// - /// For more details see `OnNotifyInvocationResponse`. - InvocationResponse(InvocationResponse), - - // -- New PP <-> PP commands using Journal V2 - /// Notify Get invocation output - NotifyGetInvocationOutputResponse(GetInvocationOutputResponse), - /// Notify a signal. - NotifySignal(NotifySignalRequest), - - /// Upsert schema for consistent schema across replicas - /// *Since v1.6.0 - UpsertSchema(UpsertSchema), -} - -impl Command { - pub fn name(&self) -> &'static str { - CommandDiscriminants::from(self).into() - } -} - -impl WithPartitionKey for Envelope { - fn partition_key(&self) -> PartitionKey { - match self.header.dest { - Destination::Processor { partition_key, .. } => partition_key, - } - } -} - -impl HasRecordKeys for Envelope { - fn record_keys(&self) -> logs::Keys { - match &self.command { - // the partition_key is used as key here since the command targets the partition by ID. - // Partitions will ignore this message at read time if the paritition ID (in body) - // does not match. Alternatively, we could use the partition key range or `Keys::None` - // but this would just be a waste of effort for readers after a partition has been - // split or if the log is shared between multiple partitions. - Command::UpdatePartitionDurability(_) => Keys::Single(self.partition_key()), - Command::VersionBarrier(barrier) => barrier.partition_key_range.clone(), - Command::AnnounceLeader(announce) => { - Keys::RangeInclusive(announce.partition_key_range.clone()) - } - Command::PatchState(mutation) => Keys::Single(mutation.service_id.partition_key()), - Command::TerminateInvocation(terminate) => { - Keys::Single(terminate.invocation_id.partition_key()) - } - Command::PurgeInvocation(purge) => Keys::Single(purge.invocation_id.partition_key()), - Command::PurgeJournal(purge) => Keys::Single(purge.invocation_id.partition_key()), - Command::Invoke(invoke) => Keys::Single(invoke.partition_key()), - // todo: Remove this, or pass the partition key range but filter based on partition-id - // on read if needed. - Command::TruncateOutbox(_) => Keys::Single(self.partition_key()), - Command::ProxyThrough(_) => Keys::Single(self.partition_key()), - Command::AttachInvocation(_) => Keys::Single(self.partition_key()), - Command::ResumeInvocation(req) => Keys::Single(req.partition_key()), - Command::RestartAsNewInvocation(req) => Keys::Single(req.partition_key()), - // todo: Handle journal entries that request cross-partition invocations - Command::InvokerEffect(effect) => Keys::Single(effect.invocation_id.partition_key()), - Command::Timer(timer) => Keys::Single(timer.invocation_id().partition_key()), - Command::ScheduleTimer(timer) => Keys::Single(timer.invocation_id().partition_key()), - Command::InvocationResponse(response) => Keys::Single(response.partition_key()), - Command::NotifySignal(sig) => Keys::Single(sig.partition_key()), - Command::NotifyGetInvocationOutputResponse(res) => Keys::Single(res.partition_key()), - Command::UpsertSchema(schema) => schema.partition_key_range.clone(), - } - } -} - -impl MatchKeyQuery for Envelope { - fn matches_key_query(&self, query: &logs::KeyFilter) -> bool { - self.record_keys().matches_key_query(query) - } -} +pub use v1::{Command, Destination, Envelope, Header, Source}; diff --git a/crates/wal-protocol/src/v1.rs b/crates/wal-protocol/src/v1.rs new file mode 100644 index 0000000000..66ff9f95fd --- /dev/null +++ b/crates/wal-protocol/src/v1.rs @@ -0,0 +1,246 @@ +// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use restate_storage_api::deduplication_table::DedupInformation; +use restate_types::identifiers::{LeaderEpoch, PartitionId, PartitionKey, WithPartitionKey}; +use restate_types::invocation::{ + AttachInvocationRequest, GetInvocationOutputResponse, InvocationResponse, + InvocationTermination, NotifySignalRequest, PurgeInvocationRequest, + RestartAsNewInvocationRequest, ResumeInvocationRequest, ServiceInvocation, +}; +use restate_types::logs::{self, HasRecordKeys, Keys, MatchKeyQuery}; +use restate_types::message::MessageIndex; +use restate_types::state_mut::ExternalStateMutation; + +use crate::control::{AnnounceLeader, PartitionDurability, UpsertSchema, VersionBarrier}; +use crate::timer::TimerKeyValue; + +/// The primary envelope for all messages in the system. +#[derive(Debug, Clone)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +pub struct Envelope { + pub header: Header, + pub command: Command, +} + +#[cfg(feature = "serde")] +restate_types::flexbuffers_storage_encode_decode!(Envelope); + +impl Envelope { + pub fn new(header: Header, command: Command) -> Self { + Self { header, command } + } +} + +/// Header is set on every message +#[derive(Debug, Clone, PartialEq, Eq)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +pub struct Header { + pub source: Source, + pub dest: Destination, +} + +/// Identifies the source of a message +#[derive(Debug, Clone, PartialEq, Eq)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +pub enum Source { + /// Message is sent from another partition processor + Processor { + /// if possible, this is used to reroute responses in case of splits/merges + /// v1.4 requires this to be set. + /// v1.5 Marked as `Option`. + /// v1.6 always set to `None`. + /// Will be removed in v1.7. + #[cfg_attr(feature = "serde", serde(default))] + partition_id: Option, + #[cfg_attr(feature = "serde", serde(default))] + partition_key: Option, + /// The current epoch of the partition leader. Readers should observe this to decide which + /// messages to accept. Readers should ignore messages coming from + /// epochs lower than the max observed for a given partition id. + leader_epoch: LeaderEpoch, + // Which node is this message from? + // First deprecation in v1.1, but since v1.5 we switched to Option and it's + // still being set to Some(v) to maintain compatibility with v1.4. + // + // v1.6 field is removed. -- Kept here for reference only. + // #[cfg_attr(feature = "serde", serde(default))] + // node_id: Option, + + // From v1.1 this is always set, but maintained to support rollback to v1.0. + // Deprecated(v1.5): It's set to Some(v) to maintain support for v1.4 but + // will be removed in v1.6. Commands that need the node-id of the sender should + // include the node-id in the command payload itself (e.g. in the [`AnnounceLeader`]) + // v1.6 field is removed. -- Kept here for reference only. + // #[cfg_attr(feature = "serde", serde(default))] + // generational_node_id: Option, + }, + /// Message is sent from an ingress node + Ingress { + // The identity of the sender node. Generational for fencing. Ingress is + // stateless, so we shouldn't respond to requests from older generation + // if a new generation is alive. + // + // Deprecated(v1.5): This field is set to Some(v) to maintain compatibility with v1.4. + // but will be removed in v1.6. + // v1.6 field is removed. -- Kept here for reference only. + // #[cfg_attr(feature = "serde", serde(default))] + // node_id: Option, + + // Last config version observed by sender. If this is a newer generation + // or an unknown ID, we might need to update our config. + // + // Deprecated(v1.5): This field is set to Some(v) to maintain compatibility with v1.4. + // but will be removed in v1.6. + // v1.6 field is removed. -- Kept here for reference only. + // #[cfg_attr(feature = "serde", serde(default))] + // nodes_config_version: Option, + }, + /// Message is sent from some control plane component (controller, cli, etc.) + ControlPlane { + // Reserved for future use. + }, +} + +/// Identifies the intended destination of the message +#[derive(Debug, Clone, PartialEq, Eq)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +pub enum Destination { + /// Message is sent to partition processor + Processor { + partition_key: PartitionKey, + #[cfg_attr(feature = "serde", serde(default))] + dedup: Option, + }, +} + +/// State machine input commands +#[derive(Debug, Clone, strum::EnumDiscriminants, strum::VariantNames)] +#[strum_discriminants(derive(strum::IntoStaticStr))] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +pub enum Command { + /// Updates the `PARTITION_DURABILITY` FSM variable to the given value. + /// See [`PartitionDurability`] for more details. + /// + /// *Since v1.4.2* + UpdatePartitionDurability(PartitionDurability), + /// A version barrier to fence off state machine changes that require a certain minimum + /// version of restate server. + /// *Since v1.4.0* + VersionBarrier(VersionBarrier), + // -- Control-plane related events + AnnounceLeader(Box), + + // -- Partition processor commands + /// Manual patching of storage state + PatchState(ExternalStateMutation), + /// Terminate an ongoing invocation + TerminateInvocation(InvocationTermination), + /// Purge a completed invocation + PurgeInvocation(PurgeInvocationRequest), + /// Purge a completed invocation journal + PurgeJournal(PurgeInvocationRequest), + /// Start an invocation on this partition + Invoke(Box), + /// Truncate the message outbox up to, and including, the specified index. + TruncateOutbox(MessageIndex), + /// Proxy a service invocation through this partition processor, to reuse the deduplication id map. + ProxyThrough(Box), + /// Attach to an existing invocation + AttachInvocation(AttachInvocationRequest), + /// Resume an invocation + ResumeInvocation(ResumeInvocationRequest), + /// Restart as new invocation from prefix + RestartAsNewInvocation(RestartAsNewInvocationRequest), + + // -- Partition processor events for PP + /// Invoker is reporting effect(s) from an ongoing invocation. + InvokerEffect(Box), + /// Timer has fired + Timer(TimerKeyValue), + /// Schedule timer + ScheduleTimer(TimerKeyValue), + /// Another partition processor is reporting a response of an invocation we requested. + /// + /// KINDA DEPRECATED: When Journal Table V1 is removed, this command should be used only to reply to invocations. + /// Now it's abused for a bunch of other scenarios, like replying to get promise and get invocation output. + /// + /// For more details see `OnNotifyInvocationResponse`. + InvocationResponse(InvocationResponse), + + // -- New PP <-> PP commands using Journal V2 + /// Notify Get invocation output + NotifyGetInvocationOutputResponse(GetInvocationOutputResponse), + /// Notify a signal. + NotifySignal(NotifySignalRequest), + + /// Upsert schema for consistent schema across replicas + /// *Since v1.6.0 + UpsertSchema(UpsertSchema), +} + +impl Command { + pub fn name(&self) -> &'static str { + CommandDiscriminants::from(self).into() + } +} + +impl WithPartitionKey for Envelope { + fn partition_key(&self) -> PartitionKey { + match self.header.dest { + Destination::Processor { partition_key, .. } => partition_key, + } + } +} + +impl HasRecordKeys for Envelope { + fn record_keys(&self) -> logs::Keys { + match &self.command { + // the partition_key is used as key here since the command targets the partition by ID. + // Partitions will ignore this message at read time if the paritition ID (in body) + // does not match. Alternatively, we could use the partition key range or `Keys::None` + // but this would just be a waste of effort for readers after a partition has been + // split or if the log is shared between multiple partitions. + Command::UpdatePartitionDurability(_) => Keys::Single(self.partition_key()), + Command::VersionBarrier(barrier) => barrier.partition_key_range.clone(), + Command::AnnounceLeader(announce) => { + Keys::RangeInclusive(announce.partition_key_range.clone()) + } + Command::PatchState(mutation) => Keys::Single(mutation.service_id.partition_key()), + Command::TerminateInvocation(terminate) => { + Keys::Single(terminate.invocation_id.partition_key()) + } + Command::PurgeInvocation(purge) => Keys::Single(purge.invocation_id.partition_key()), + Command::PurgeJournal(purge) => Keys::Single(purge.invocation_id.partition_key()), + Command::Invoke(invoke) => Keys::Single(invoke.partition_key()), + // todo: Remove this, or pass the partition key range but filter based on partition-id + // on read if needed. + Command::TruncateOutbox(_) => Keys::Single(self.partition_key()), + Command::ProxyThrough(_) => Keys::Single(self.partition_key()), + Command::AttachInvocation(req) => Keys::Single(req.partition_key()), + Command::ResumeInvocation(req) => Keys::Single(req.partition_key()), + Command::RestartAsNewInvocation(req) => Keys::Single(req.partition_key()), + // todo: Handle journal entries that request cross-partition invocations + Command::InvokerEffect(effect) => Keys::Single(effect.invocation_id.partition_key()), + Command::Timer(timer) => Keys::Single(timer.invocation_id().partition_key()), + Command::ScheduleTimer(timer) => Keys::Single(timer.invocation_id().partition_key()), + Command::InvocationResponse(response) => Keys::Single(response.partition_key()), + Command::NotifySignal(sig) => Keys::Single(sig.partition_key()), + Command::NotifyGetInvocationOutputResponse(res) => Keys::Single(res.partition_key()), + Command::UpsertSchema(schema) => schema.partition_key_range.clone(), + } + } +} + +impl MatchKeyQuery for Envelope { + fn matches_key_query(&self, query: &logs::KeyFilter) -> bool { + self.record_keys().matches_key_query(query) + } +} diff --git a/crates/wal-protocol/src/v2.rs b/crates/wal-protocol/src/v2.rs new file mode 100644 index 0000000000..da0450af80 --- /dev/null +++ b/crates/wal-protocol/src/v2.rs @@ -0,0 +1,509 @@ +// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use std::marker::PhantomData; +use std::sync::Arc; + +use bilrost::encoding::encoded_len_varint; +use bilrost::{Message, OwnedMessage}; +use bytes::{BufMut, BytesMut}; + +use restate_types::identifiers::{LeaderEpoch, PartitionId, PartitionKey}; +use restate_types::storage::{ + PolyBytes, StorageCodec, StorageCodecKind, StorageDecode, StorageDecodeError, StorageEncode, + StorageEncodeError, +}; + +use crate::v1; + +mod compatibility; +pub mod records; + +mod sealed { + pub trait Sealed {} +} + +/// Metadata that accompanies every WAL record and carries routing, deduplication, +/// and serialization details required to interpret the payload. +#[derive(Debug, Clone, bilrost::Message)] +pub struct Header { + #[bilrost(1)] + source: Source, + #[bilrost(3)] + dedup: Dedup, + #[bilrost(5)] + kind: RecordKind, +} + +impl Header { + pub fn source(&self) -> &Source { + &self.source + } + + pub fn dedup(&self) -> &Dedup { + &self.dedup + } + + pub fn kind(&self) -> RecordKind { + self.kind + } +} + +impl StorageDecode for Header { + fn decode( + buf: &mut B, + kind: StorageCodecKind, + ) -> Result + where + Self: Sized, + { + // we use custom encoding because it's the length delimited version + // of bilrost + debug_assert_eq!(kind, StorageCodecKind::Custom); + + Self::decode_length_delimited(buf) + .map_err(|err| StorageDecodeError::DecodeValue(err.into())) + } +} + +/// Outgoing envelope used when you are sending out records +/// over bifrost. +#[derive(Clone, derive_more::Deref)] +pub struct Envelope { + #[deref] + header: Header, + payload: PolyBytes, + _p: PhantomData, +} + +impl Envelope { + pub fn header(&self) -> &Header { + &self.header + } +} + +impl Envelope { + pub fn new(source: Source, dedup: Dedup, payload: R::Payload) -> Self { + Self { + header: Header { + source, + dedup, + kind: R::KIND, + }, + payload: PolyBytes::Typed(Arc::new(payload)), + _p: PhantomData, + } + } +} + +impl StorageEncode for Envelope { + fn default_codec(&self) -> StorageCodecKind { + StorageCodecKind::Custom + } + + fn encode(&self, buf: &mut BytesMut) -> Result<(), StorageEncodeError> { + let len = self.header.encoded_len(); + buf.reserve(encoded_len_varint(len as u64) + len); + + self.header + .encode_length_delimited(buf) + .map_err(|err| StorageEncodeError::EncodeValue(err.into()))?; + + match &self.payload { + PolyBytes::Bytes(bytes) => buf.put_slice(bytes), + PolyBytes::Typed(payload) => StorageCodec::encode(payload.as_ref(), buf)?, + } + + Ok(()) + } +} + +/// Marker type used with [`IncomingEnvelope`] to signal that the payload has not been +/// decoded into a typed record yet. +#[derive(Clone, Copy)] +pub struct Raw; + +impl StorageDecode for Envelope { + fn decode( + buf: &mut B, + kind: StorageCodecKind, + ) -> Result + where + Self: Sized, + { + match kind { + StorageCodecKind::FlexbuffersSerde => { + let envelope = v1::Envelope::decode(buf, kind)?; + Self::try_from(envelope).map_err(|err| StorageDecodeError::DecodeValue(err.into())) + } + StorageCodecKind::Custom => { + let header = StorageDecode::decode(buf, StorageCodecKind::Custom)?; + + Ok(Self { + header, + payload: PolyBytes::Bytes(buf.copy_to_bytes(buf.remaining())), + _p: PhantomData, + }) + } + _ => { + panic!("unsupported encoding"); + } + } + } +} + +impl Envelope { + /// Converts Raw Envelope into a Typed envelope. Panics + /// if the record kind does not match the M::KIND + pub fn into_typed(self) -> Envelope { + assert_eq!(self.header.kind, M::KIND); + + let Self { + header, payload, .. + } = self; + + Envelope { + header, + payload, + _p: PhantomData, + } + } +} + +impl Envelope +where + M::Payload: Clone, +{ + /// return the envelope payload + pub fn split(self) -> Result<(Header, M::Payload), StorageDecodeError> { + let payload = match self.payload { + PolyBytes::Bytes(mut bytes) => StorageCodec::decode(&mut bytes)?, + PolyBytes::Typed(typed) => { + let typed = typed.downcast_arc::().map_err(|_| { + StorageDecodeError::DecodeValue("Type mismatch. Original value in PolyBytes::Typed does not match requested type".into()) + })?; + + match Arc::try_unwrap(typed) { + Ok(value) => value, + Err(arc) => arc.as_ref().clone(), + } + } + }; + + Ok((self.header, payload)) + } + + pub fn into_inner(self) -> Result { + self.split().map(|v| v.1) + } +} + +impl Envelope { + pub fn into_raw(self) -> Envelope { + Envelope { + header: self.header, + payload: self.payload, + _p: PhantomData, + } + } +} + +/// Enumerates the logical categories of WAL records that the partition +/// processor understands. +#[derive(Debug, Clone, Copy, PartialEq, Eq, bilrost::Enumeration, strum::Display)] +pub enum RecordKind { + Unknown = 0, + + AnnounceLeader = 1, + /// A version barrier to fence off state machine changes that require a certain minimum + /// version of restate server. + /// *Since v1.4.0* + VersionBarrier = 2, + /// Updates the `PARTITION_DURABILITY` FSM variable to the given value. + /// See [`PartitionDurability`] for more details. + /// + /// *Since v1.4.2* + UpdatePartitionDurability = 3, + + // -- Partition processor commands + /// Manual patching of storage state + PatchState = 4, + /// Terminate an ongoing invocation + TerminateInvocation = 5, + /// Purge a completed invocation + PurgeInvocation = 6, + /// Purge a completed invocation journal + PurgeJournal = 7, + /// Start an invocation on this partition + Invoke = 8, + /// Truncate the message outbox up to, and including, the specified index. + TruncateOutbox = 9, + /// Proxy a service invocation through this partition processor, to reuse the deduplication id map. + ProxyThrough = 10, + /// Attach to an existing invocation + AttachInvocation = 11, + /// Resume an invocation + ResumeInvocation = 12, + /// Restart as new invocation from prefix + RestartAsNewInvocation = 13, + // -- Partition processor events for PP + /// Invoker is reporting effect(s) from an ongoing invocation. + InvokerEffect = 14, + /// Timer has fired + Timer = 15, + /// Schedule timer + ScheduleTimer = 16, + /// Another partition processor is reporting a response of an invocation we requested. + /// + /// KINDA DEPRECATED: When Journal Table V1 is removed, this command should be used only to reply to invocations. + /// Now it's abused for a bunch of other scenarios, like replying to get promise and get invocation output. + /// + /// For more details see `OnNotifyInvocationResponse`. + InvocationResponse = 17, + + // -- New PP <-> PP commands using Journal V2 + /// Notify Get invocation output + NotifyGetInvocationOutputResponse = 18, + /// Notify a signal. + NotifySignal = 19, + + /// UpsertSchema + UpsertSchema = 20, +} + +/// Identifies which subsystem produced a given WAL record. +#[derive(Debug, Clone, PartialEq, Eq, bilrost::Oneof, bilrost::Message)] +pub enum Source { + /// Message is sent from an ingress node + #[bilrost(empty)] + Ingress, + + /// Message is sent from some control plane component (controller, cli, etc.) + #[bilrost(tag = 1, message)] + ControlPlane, + + /// Message is sent from another partition processor + #[bilrost(tag = 2, message)] + Processor { + /// if possible, this is used to reroute responses in case of splits/merges + /// Marked as `Option` in v1.5. Note that v1.4 requires this to be set but as of v1.6 + /// this can be safely set to `None`. + #[bilrost(1)] + partition_id: Option, + #[bilrost(2)] + partition_key: Option, + /// The current epoch of the partition leader. Readers should observe this to decide which + /// messages to accept. Readers should ignore messages coming from + /// epochs lower than the max observed for a given partition id. + #[bilrost(3)] + leader_epoch: LeaderEpoch, + }, +} + +/// Specifies the deduplication strategy that allows receivers to discard +/// duplicate WAL records safely. +#[derive(Debug, Clone, PartialEq, Eq, Default, bilrost::Oneof, bilrost::Message)] +pub enum Dedup { + #[default] + None, + /// Sequence number to deduplicate messages sent by the same partition or a successor + /// of a previous partition (a successor partition will inherit the leader epoch of its + /// predecessor). + #[bilrost(tag(1), message)] + SelfProposal { + #[bilrost(0)] + leader_epoch: LeaderEpoch, + #[bilrost(1)] + seq: u64, + }, + /// Sequence number to deduplicate messages from a foreign partition. + #[bilrost(tag(2), message)] + ForeignPartition { + #[bilrost(0)] + partition: PartitionId, + #[bilrost(1)] + seq: u64, + }, + /// Sequence number to deduplicate messages from an arbitrary string prefix. + #[bilrost(tag(3), message)] + Arbitrary { + #[bilrost(0)] + prefix: String, + #[bilrost(1)] + seq: u64, + }, +} + +/// Marker trait implemented by strongly-typed representations of WAL record +/// payloads. +pub trait Record: sealed::Sealed + Send + Sync + Clone + Copy + Sized { + const KIND: RecordKind; + type Payload: StorageEncode + StorageDecode + 'static; + + /// Create an envelope with `this` record kind + /// given the header, keys and payload + fn new(source: Source, dedup: Dedup, payload: impl Into) -> Envelope + where + Self::Payload: StorageEncode, + { + Envelope::new(source, dedup, payload.into()) + } + + /// Creates a new test envelope. Shortcut for new(Source::Ingress, Dedup::None, payload) + // #[cfg(test)] + fn new_test(payload: impl Into) -> Envelope + where + Self::Payload: StorageEncode, + { + let record = Self::new(Source::Ingress, Dedup::None, payload); + record.into_raw() + } +} + +#[cfg(test)] +mod test { + + use bytes::BytesMut; + + use restate_types::{GenerationalNodeId, storage::StorageCodec}; + + use super::{Dedup, Header, Source, records}; + use crate::{ + control::AnnounceLeader, + v1, + v2::{Envelope, Raw, Record, RecordKind}, + }; + + #[test] + fn envelope_encode_decode() { + let payload = AnnounceLeader { + leader_epoch: 11.into(), + node_id: GenerationalNodeId::new(1, 3), + partition_key_range: 0..=u64::MAX, + }; + + let envelope = records::AnnounceLeader::new( + Source::Ingress, + Dedup::SelfProposal { + leader_epoch: 10.into(), + seq: 120, + }, + payload.clone(), + ); + + let mut buf = BytesMut::new(); + StorageCodec::encode(&envelope, &mut buf).expect("to encode"); + + let envelope: Envelope = StorageCodec::decode(&mut buf).expect("to decode"); + + assert_eq!(envelope.kind(), RecordKind::AnnounceLeader); + let typed = envelope.into_typed::(); + + let (_, loaded_payload) = typed.split().expect("to decode"); + + assert_eq!(payload, loaded_payload); + } + + #[test] + fn header_decode_discard_payload() { + let payload = AnnounceLeader { + leader_epoch: 11.into(), + node_id: GenerationalNodeId::new(1, 3), + partition_key_range: 0..=u64::MAX, + }; + + let envelope = records::AnnounceLeader::new( + Source::Ingress, + Dedup::SelfProposal { + leader_epoch: 10.into(), + seq: 120, + }, + payload.clone(), + ); + + let mut buf = BytesMut::new(); + StorageCodec::encode(&envelope, &mut buf).expect("to encode"); + + let mut slice = &buf[..]; + // decode header only and discard the rest of the envelope + let header: Header = StorageCodec::decode(&mut slice).expect("to decode"); + + assert_eq!(header.source, Source::Ingress); + + assert_eq!( + header.dedup, + Dedup::SelfProposal { + leader_epoch: 10.into(), + seq: 120 + } + ); + } + + #[test] + fn envelope_skip_encode() { + let payload = AnnounceLeader { + leader_epoch: 11.into(), + node_id: GenerationalNodeId::new(1, 3), + partition_key_range: 0..=u64::MAX, + }; + + let envelope = records::AnnounceLeader::new( + Source::Ingress, + Dedup::SelfProposal { + leader_epoch: 10.into(), + seq: 120, + }, + payload.clone(), + ); + + // assert_eq!(envelope.record_keys(), Keys::RangeInclusive(0..=u64::MAX)); + + let envelope = envelope.into_raw(); + + assert_eq!(envelope.kind(), RecordKind::AnnounceLeader); + let typed = envelope.into_typed::(); + + let (_, loaded_payload) = typed.split().expect("to decode"); + + assert_eq!(payload, loaded_payload); + } + + #[test] + fn envelope_from_to_v1() { + let payload = AnnounceLeader { + leader_epoch: 11.into(), + node_id: GenerationalNodeId::new(1, 3), + partition_key_range: 0..=u64::MAX, + }; + + let envelope = records::AnnounceLeader::new( + Source::Ingress, + Dedup::SelfProposal { + leader_epoch: 10.into(), + seq: 120, + }, + payload.clone(), + ); + + let env_v1: v1::Envelope = envelope.try_into().expect("to work"); + + let mut buf = BytesMut::new(); + StorageCodec::encode(&env_v1, &mut buf).expect("to encode"); + + let envelope: Envelope = StorageCodec::decode(&mut buf).expect("to decode"); + + assert_eq!(envelope.kind(), RecordKind::AnnounceLeader); + let typed = envelope.into_typed::(); + + let (_, loaded_payload) = typed.split().expect("to decode"); + + assert_eq!(payload, loaded_payload); + } +} diff --git a/crates/wal-protocol/src/v2/compatibility.rs b/crates/wal-protocol/src/v2/compatibility.rs new file mode 100644 index 0000000000..e1dd656e3c --- /dev/null +++ b/crates/wal-protocol/src/v2/compatibility.rs @@ -0,0 +1,507 @@ +// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use restate_storage_api::deduplication_table::{ + DedupInformation, DedupSequenceNumber, EpochSequenceNumber, ProducerId, +}; +use restate_types::{ + logs::{HasRecordKeys, Keys}, + storage::StorageDecodeError, +}; + +use super::{Raw, records}; +use crate::{ + v1, + v2::{ + self, Record, + records::{ProxyThroughRequest, TruncateOutboxRequest}, + }, +}; + +impl TryFrom for v2::Envelope { + type Error = anyhow::Error; + + fn try_from(value: v1::Envelope) -> Result { + let source = match value.header.source { + v1::Source::Ingress {} => v2::Source::Ingress, + v1::Source::Processor { + partition_id, + partition_key, + leader_epoch, + } => v2::Source::Processor { + partition_id, + partition_key, + leader_epoch, + }, + v1::Source::ControlPlane {} => v2::Source::ControlPlane, + }; + + let v1::Destination::Processor { + dedup, + partition_key, + } = value.header.dest; + + let dedup = match dedup { + None => v2::Dedup::None, + Some(info) => match (info.producer_id, info.sequence_number) { + (ProducerId::Partition(id), DedupSequenceNumber::Sn(seq)) => { + v2::Dedup::ForeignPartition { partition: id, seq } + } + (ProducerId::Partition(_), _) => anyhow::bail!("invalid deduplication information"), + (ProducerId::Other(_), DedupSequenceNumber::Esn(sn)) => v2::Dedup::SelfProposal { + leader_epoch: sn.leader_epoch, + seq: sn.sequence_number, + }, + (ProducerId::Other(prefix), DedupSequenceNumber::Sn(seq)) => v2::Dedup::Arbitrary { + prefix: prefix.into(), + seq, + }, + }, + }; + + let envelope = match value.command { + v1::Command::AnnounceLeader(payload) => { + records::AnnounceLeader::new(source, dedup, *payload).into_raw() + } + v1::Command::AttachInvocation(payload) => { + records::AttachInvocation::new(source, dedup, payload).into_raw() + } + v1::Command::InvocationResponse(payload) => { + records::InvocationResponse::new(source, dedup, payload).into_raw() + } + v1::Command::Invoke(payload) => records::Invoke::new(source, dedup, payload).into_raw(), + v1::Command::InvokerEffect(payload) => { + records::InvokerEffect::new(source, dedup, payload).into_raw() + } + v1::Command::NotifyGetInvocationOutputResponse(payload) => { + records::NotifyGetInvocationOutputResponse::new(source, dedup, payload).into_raw() + } + v1::Command::NotifySignal(payload) => { + records::NotifySignal::new(source, dedup, payload).into_raw() + } + v1::Command::PatchState(payload) => { + records::PatchState::new(source, dedup, payload).into_raw() + } + v1::Command::ProxyThrough(payload) => records::ProxyThrough::new( + source, + dedup, + ProxyThroughRequest { + invocation: payload.into(), + proxy_partition: Keys::Single(partition_key), + }, + ) + .into_raw(), + v1::Command::PurgeInvocation(payload) => { + records::PurgeInvocation::new(source, dedup, payload).into_raw() + } + v1::Command::PurgeJournal(payload) => { + records::PurgeJournal::new(source, dedup, payload).into_raw() + } + v1::Command::RestartAsNewInvocation(payload) => { + records::RestartAsNewInvocation::new(source, dedup, payload).into_raw() + } + v1::Command::ResumeInvocation(payload) => { + records::ResumeInvocation::new(source, dedup, payload).into_raw() + } + v1::Command::ScheduleTimer(payload) => { + records::ScheduleTimer::new(source, dedup, payload).into_raw() + } + v1::Command::TerminateInvocation(payload) => { + records::TerminateInvocation::new(source, dedup, payload).into_raw() + } + v1::Command::Timer(payload) => records::Timer::new(source, dedup, payload).into_raw(), + v1::Command::TruncateOutbox(payload) => records::TruncateOutbox::new( + source, + dedup, + TruncateOutboxRequest { + index: payload, + // this actually should be a key-range but v1 unfortunately + // only hold the "start" of the range. + // will be fixed in v2 + partition_key_range: Keys::Single(partition_key), + }, + ) + .into_raw(), + v1::Command::UpdatePartitionDurability(payload) => { + records::UpdatePartitionDurability::new(source, dedup, payload).into_raw() + } + v1::Command::UpsertSchema(payload) => { + records::UpsertSchema::new(source, dedup, payload).into_raw() + } + v1::Command::VersionBarrier(payload) => { + records::VersionBarrier::new(source, dedup, payload).into_raw() + } + }; + + Ok(envelope) + } +} + +impl From<(Keys, v2::Header)> for v1::Header { + fn from((keys, value): (Keys, v2::Header)) -> Self { + let source = match value.source { + v2::Source::Ingress => v1::Source::Ingress {}, + v2::Source::ControlPlane => v1::Source::ControlPlane {}, + v2::Source::Processor { + partition_id, + partition_key, + leader_epoch, + } => v1::Source::Processor { + partition_id, + partition_key, + leader_epoch, + }, + }; + + // this is only for backward compatibility + // but in reality the partition_id in the dest + // should never be used. Instead the associated + // record keys should. + let partition_key = match keys { + Keys::None => 0, + Keys::Single(pk) => pk, + Keys::Pair(pk, _) => pk, + Keys::RangeInclusive(range) => *range.start(), + }; + + v1::Header { + source, + dest: v1::Destination::Processor { + partition_key, + dedup: value.dedup.into(), + }, + } + } +} + +impl From for Option { + fn from(value: v2::Dedup) -> Self { + match value { + v2::Dedup::None => None, + v2::Dedup::SelfProposal { leader_epoch, seq } => { + Some(DedupInformation::self_proposal(EpochSequenceNumber { + leader_epoch, + sequence_number: seq, + })) + } + v2::Dedup::ForeignPartition { partition, seq } => { + Some(DedupInformation::cross_partition(partition, seq)) + } + v2::Dedup::Arbitrary { prefix, seq } => Some(DedupInformation::ingress(prefix, seq)), + } + } +} + +// compatibility from v2 to v1. We will keep writing v1 envelops +// until the following release. + +impl TryFrom> for v1::Envelope { + type Error = StorageDecodeError; + + fn try_from(value: v2::Envelope) -> Result { + let (header, inner) = value.split()?; + + let header = v1::Header::from((inner.record_keys(), header)); + + Ok(Self { + header, + command: v1::Command::AnnounceLeader(inner.into()), + }) + } +} + +impl TryFrom> for v1::Envelope { + type Error = StorageDecodeError; + + fn try_from(value: v2::Envelope) -> Result { + let (header, inner) = value.split()?; + + let header = v1::Header::from((inner.record_keys(), header)); + + Ok(Self { + header, + command: v1::Command::VersionBarrier(inner), + }) + } +} + +impl TryFrom> for v1::Envelope { + type Error = StorageDecodeError; + + fn try_from( + value: v2::Envelope, + ) -> Result { + let (header, inner) = value.split()?; + + let header = v1::Header::from((inner.record_keys(), header)); + + Ok(Self { + header, + command: v1::Command::UpdatePartitionDurability(inner), + }) + } +} + +impl TryFrom> for v1::Envelope { + type Error = StorageDecodeError; + + fn try_from(value: v2::Envelope) -> Result { + let (header, inner) = value.split()?; + + let header = v1::Header::from((inner.record_keys(), header)); + + Ok(Self { + header, + command: v1::Command::PatchState(inner.into()), + }) + } +} + +impl TryFrom> for v1::Envelope { + type Error = StorageDecodeError; + + fn try_from(value: v2::Envelope) -> Result { + let (header, inner) = value.split()?; + + let header = v1::Header::from((inner.record_keys(), header)); + + Ok(Self { + header, + command: v1::Command::TerminateInvocation(inner.into()), + }) + } +} + +impl TryFrom> for v1::Envelope { + type Error = StorageDecodeError; + + fn try_from(value: v2::Envelope) -> Result { + let (header, inner) = value.split()?; + + let header = v1::Header::from((inner.record_keys(), header)); + + Ok(Self { + header, + command: v1::Command::PurgeInvocation(inner.into()), + }) + } +} + +impl TryFrom> for v1::Envelope { + type Error = StorageDecodeError; + + fn try_from(value: v2::Envelope) -> Result { + let (header, inner) = value.split()?; + + let header = v1::Header::from((inner.record_keys(), header)); + + Ok(Self { + header, + command: v1::Command::PurgeJournal(inner.into()), + }) + } +} + +impl TryFrom> for v1::Envelope { + type Error = StorageDecodeError; + + fn try_from(value: v2::Envelope) -> Result { + let (header, inner) = value.split()?; + + let header = v1::Header::from((inner.record_keys(), header)); + + Ok(Self { + header, + command: v1::Command::Invoke(inner.into()), + }) + } +} + +impl TryFrom> for v1::Envelope { + type Error = StorageDecodeError; + + fn try_from(value: v2::Envelope) -> Result { + let (header, inner) = value.split()?; + + let header = v1::Header::from((inner.record_keys(), header)); + + Ok(Self { + header, + command: v1::Command::TruncateOutbox(inner.index), + }) + } +} + +impl TryFrom> for v1::Envelope { + type Error = StorageDecodeError; + + fn try_from(value: v2::Envelope) -> Result { + let (header, inner) = value.split()?; + + let header = v1::Header::from((inner.record_keys(), header)); + + Ok(Self { + header, + command: v1::Command::ProxyThrough(inner.invocation.into()), + }) + } +} + +impl TryFrom> for v1::Envelope { + type Error = StorageDecodeError; + + fn try_from(value: v2::Envelope) -> Result { + let (header, inner) = value.split()?; + + let header = v1::Header::from((inner.record_keys(), header)); + + Ok(Self { + header, + command: v1::Command::AttachInvocation(inner.into()), + }) + } +} + +impl TryFrom> for v1::Envelope { + type Error = StorageDecodeError; + + fn try_from(value: v2::Envelope) -> Result { + let (header, inner) = value.split()?; + + let header = v1::Header::from((inner.record_keys(), header)); + + Ok(Self { + header, + command: v1::Command::ResumeInvocation(inner.into()), + }) + } +} + +impl TryFrom> for v1::Envelope { + type Error = StorageDecodeError; + + fn try_from(value: v2::Envelope) -> Result { + let (header, inner) = value.split()?; + + let header = v1::Header::from((inner.record_keys(), header)); + + Ok(Self { + header, + command: v1::Command::RestartAsNewInvocation(inner.into()), + }) + } +} + +impl TryFrom> for v1::Envelope { + type Error = StorageDecodeError; + + fn try_from(value: v2::Envelope) -> Result { + let (header, inner) = value.split()?; + + let header = v1::Header::from((inner.record_keys(), header)); + + Ok(Self { + header, + command: v1::Command::InvokerEffect(inner.into()), + }) + } +} + +impl TryFrom> for v1::Envelope { + type Error = StorageDecodeError; + + fn try_from(value: v2::Envelope) -> Result { + let (header, inner) = value.split()?; + + let header = v1::Header::from((inner.record_keys(), header)); + + Ok(Self { + header, + command: v1::Command::Timer(inner.into()), + }) + } +} + +impl TryFrom> for v1::Envelope { + type Error = StorageDecodeError; + + fn try_from(value: v2::Envelope) -> Result { + let (header, inner) = value.split()?; + + let header = v1::Header::from((inner.record_keys(), header)); + + Ok(Self { + header, + command: v1::Command::ScheduleTimer(inner.into()), + }) + } +} + +impl TryFrom> for v1::Envelope { + type Error = StorageDecodeError; + + fn try_from(value: v2::Envelope) -> Result { + let (header, inner) = value.split()?; + + let header = v1::Header::from((inner.record_keys(), header)); + + Ok(Self { + header, + command: v1::Command::InvocationResponse(inner.into()), + }) + } +} + +impl TryFrom> for v1::Envelope { + type Error = StorageDecodeError; + + fn try_from( + value: v2::Envelope, + ) -> Result { + let (header, inner) = value.split()?; + + let header = v1::Header::from((inner.record_keys(), header)); + + Ok(Self { + header, + command: v1::Command::NotifyGetInvocationOutputResponse(inner.into()), + }) + } +} + +impl TryFrom> for v1::Envelope { + type Error = StorageDecodeError; + + fn try_from(value: v2::Envelope) -> Result { + let (header, inner) = value.split()?; + + let header = v1::Header::from((inner.record_keys(), header)); + + Ok(Self { + header, + command: v1::Command::NotifySignal(inner.into()), + }) + } +} + +impl TryFrom> for v1::Envelope { + type Error = StorageDecodeError; + + fn try_from(value: v2::Envelope) -> Result { + let (header, inner) = value.split()?; + + let header = v1::Header::from((inner.record_keys(), header)); + + Ok(Self { + header, + command: v1::Command::UpsertSchema(inner), + }) + } +} diff --git a/crates/wal-protocol/src/v2/records.rs b/crates/wal-protocol/src/v2/records.rs new file mode 100644 index 0000000000..7ce3bf7b6f --- /dev/null +++ b/crates/wal-protocol/src/v2/records.rs @@ -0,0 +1,431 @@ +// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use restate_types::{ + bilrost_storage_encode_decode, flexbuffers_storage_encode_decode, + identifiers::WithPartitionKey, + invocation, + logs::{HasRecordKeys, Keys}, + message::MessageIndex, + state_mut, +}; +use serde::{Deserialize, Serialize}; + +use super::sealed::Sealed; +use super::{Record, RecordKind}; +use crate::timer::{self}; + +// Create type wrappers to implement storage encode/decode +// and HasRecordKeys +#[derive( + Debug, + Clone, + Eq, + PartialEq, + bilrost::Message, + derive_more::Deref, + derive_more::Into, + derive_more::From, +)] +pub struct ExternalStateMutation(state_mut::ExternalStateMutation); +bilrost_storage_encode_decode!(ExternalStateMutation); + +impl HasRecordKeys for ExternalStateMutation { + fn record_keys(&self) -> Keys { + Keys::Single(self.0.service_id.partition_key()) + } +} + +#[derive( + Clone, + Eq, + PartialEq, + Serialize, + Deserialize, + derive_more::Deref, + derive_more::Into, + derive_more::From, +)] +pub struct InvocationTermination(invocation::InvocationTermination); +flexbuffers_storage_encode_decode!(InvocationTermination); + +impl HasRecordKeys for InvocationTermination { + fn record_keys(&self) -> Keys { + Keys::Single(self.invocation_id.partition_key()) + } +} + +#[derive( + Clone, + Eq, + PartialEq, + Serialize, + Deserialize, + derive_more::Deref, + derive_more::Into, + derive_more::From, +)] +pub struct PurgeInvocationRequest(invocation::PurgeInvocationRequest); +flexbuffers_storage_encode_decode!(PurgeInvocationRequest); + +impl HasRecordKeys for PurgeInvocationRequest { + fn record_keys(&self) -> Keys { + Keys::Single(self.invocation_id.partition_key()) + } +} + +#[derive( + Clone, + Eq, + PartialEq, + Serialize, + Deserialize, + derive_more::Deref, + derive_more::Into, + derive_more::From, +)] +pub struct ServiceInvocation(Box); + +flexbuffers_storage_encode_decode!(ServiceInvocation); + +impl HasRecordKeys for ServiceInvocation { + fn record_keys(&self) -> Keys { + Keys::Single(self.invocation_id.partition_key()) + } +} + +#[derive(Debug, Clone, bilrost::Message)] +pub struct TruncateOutboxRequest { + #[bilrost(1)] + pub index: MessageIndex, + + #[bilrost(2)] + pub partition_key_range: Keys, +} + +impl HasRecordKeys for TruncateOutboxRequest { + fn record_keys(&self) -> Keys { + self.partition_key_range.clone() + } +} + +bilrost_storage_encode_decode!(TruncateOutboxRequest); + +#[derive( + Clone, + Eq, + PartialEq, + Serialize, + Deserialize, + derive_more::Deref, + derive_more::Into, + derive_more::From, +)] +pub struct AttachInvocationRequest(invocation::AttachInvocationRequest); + +flexbuffers_storage_encode_decode!(AttachInvocationRequest); + +impl HasRecordKeys for AttachInvocationRequest { + fn record_keys(&self) -> Keys { + Keys::Single(self.partition_key()) + } +} + +#[derive( + Clone, + Eq, + PartialEq, + Serialize, + Deserialize, + derive_more::Deref, + derive_more::Into, + derive_more::From, +)] +pub struct ResumeInvocationRequest(invocation::ResumeInvocationRequest); + +flexbuffers_storage_encode_decode!(ResumeInvocationRequest); + +impl HasRecordKeys for ResumeInvocationRequest { + fn record_keys(&self) -> Keys { + Keys::Single(self.partition_key()) + } +} + +#[derive( + Clone, + Eq, + PartialEq, + Serialize, + Deserialize, + derive_more::Deref, + derive_more::Into, + derive_more::From, +)] +pub struct RestartAsNewInvocationRequest(invocation::RestartAsNewInvocationRequest); + +flexbuffers_storage_encode_decode!(RestartAsNewInvocationRequest); + +impl HasRecordKeys for RestartAsNewInvocationRequest { + fn record_keys(&self) -> Keys { + Keys::Single(self.partition_key()) + } +} + +#[derive( + Clone, + Eq, + PartialEq, + Serialize, + Deserialize, + derive_more::Deref, + derive_more::Into, + derive_more::From, +)] +pub struct Effect(Box); + +flexbuffers_storage_encode_decode!(Effect); + +impl HasRecordKeys for Effect { + fn record_keys(&self) -> restate_types::logs::Keys { + Keys::Single(self.invocation_id.partition_key()) + } +} + +#[derive( + Clone, + Eq, + PartialEq, + Serialize, + Deserialize, + derive_more::Deref, + derive_more::Into, + derive_more::From, +)] +pub struct TimerKeyValue(timer::TimerKeyValue); + +flexbuffers_storage_encode_decode!(TimerKeyValue); + +impl HasRecordKeys for TimerKeyValue { + fn record_keys(&self) -> Keys { + Keys::Single(self.invocation_id().partition_key()) + } +} + +#[derive( + Clone, + Eq, + PartialEq, + Serialize, + Deserialize, + derive_more::Deref, + derive_more::Into, + derive_more::From, +)] +pub struct InvocationResponsePayload(invocation::InvocationResponse); + +flexbuffers_storage_encode_decode!(InvocationResponsePayload); + +impl HasRecordKeys for InvocationResponsePayload { + fn record_keys(&self) -> Keys { + Keys::Single(self.partition_key()) + } +} + +#[derive( + Clone, + Eq, + PartialEq, + Serialize, + Deserialize, + derive_more::Deref, + derive_more::Into, + derive_more::From, +)] +pub struct GetInvocationOutputResponse(invocation::GetInvocationOutputResponse); + +flexbuffers_storage_encode_decode!(GetInvocationOutputResponse); + +impl HasRecordKeys for GetInvocationOutputResponse { + fn record_keys(&self) -> Keys { + Keys::Single(self.partition_key()) + } +} + +#[derive( + Clone, + Eq, + PartialEq, + Serialize, + Deserialize, + derive_more::Deref, + derive_more::Into, + derive_more::From, +)] +pub struct NotifySignalRequest(invocation::NotifySignalRequest); + +flexbuffers_storage_encode_decode!(NotifySignalRequest); + +impl HasRecordKeys for NotifySignalRequest { + fn record_keys(&self) -> Keys { + Keys::Single(self.partition_key()) + } +} + +#[derive(Clone, Serialize, Deserialize)] +pub struct ProxyThroughRequest { + pub invocation: ServiceInvocation, + + pub proxy_partition: Keys, +} + +flexbuffers_storage_encode_decode!(ProxyThroughRequest); + +impl HasRecordKeys for ProxyThroughRequest { + fn record_keys(&self) -> Keys { + self.proxy_partition.clone() + } +} + +// end types + +// define record types + +macro_rules! record { + {@name=$name:ident, @kind=$type:expr, @payload=$payload:path} => { + #[allow(dead_code)] + #[derive(Clone, Copy)] + pub struct $name; + impl Sealed for $name{} + impl Record for $name { + const KIND: RecordKind = $type; + type Payload = $payload; + } + }; + } + +record! { + @name=AnnounceLeader, + @kind=RecordKind::AnnounceLeader, + @payload=crate::control::AnnounceLeader +} + +record! { + @name=VersionBarrier, + @kind=RecordKind::VersionBarrier, + @payload=crate::control::VersionBarrier +} + +record! { + @name=UpdatePartitionDurability, + @kind=RecordKind::UpdatePartitionDurability, + @payload=crate::control::PartitionDurability +} + +record! { + @name=PatchState, + @kind=RecordKind::PatchState, + @payload=ExternalStateMutation +} + +record! { + @name=TerminateInvocation, + @kind=RecordKind::TerminateInvocation, + @payload=InvocationTermination +} + +record! { + @name=PurgeInvocation, + @kind=RecordKind::PurgeInvocation, + @payload=PurgeInvocationRequest +} + +record! { + @name=PurgeJournal, + @kind=RecordKind::PurgeJournal, + @payload=PurgeInvocationRequest +} + +record! { + @name=Invoke, + @kind=RecordKind::Invoke, + @payload=ServiceInvocation +} + +record! { + @name=TruncateOutbox, + @kind=RecordKind::TruncateOutbox, + @payload=TruncateOutboxRequest +} + +record! { + @name=ProxyThrough, + @kind=RecordKind::ProxyThrough, + @payload=ProxyThroughRequest +} + +record! { + @name=AttachInvocation, + @kind=RecordKind::AttachInvocation, + @payload=AttachInvocationRequest +} + +record! { + @name=ResumeInvocation, + @kind=RecordKind::ResumeInvocation, + @payload=ResumeInvocationRequest +} + +record! { + @name=RestartAsNewInvocation, + @kind=RecordKind::RestartAsNewInvocation, + @payload=RestartAsNewInvocationRequest +} + +record! { + @name=InvokerEffect, + @kind=RecordKind::InvokerEffect, + @payload=Effect +} + +record! { + @name=Timer, + @kind=RecordKind::Timer, + @payload=TimerKeyValue +} + +record! { + @name=ScheduleTimer, + @kind=RecordKind::ScheduleTimer, + @payload=TimerKeyValue +} + +record! { + @name=InvocationResponse, + @kind=RecordKind::InvocationResponse, + @payload=InvocationResponsePayload +} + +record! { + @name=NotifyGetInvocationOutputResponse, + @kind=RecordKind::NotifyGetInvocationOutputResponse, + @payload=GetInvocationOutputResponse +} + +record! { + @name=NotifySignal, + @kind=RecordKind::NotifySignal, + @payload=NotifySignalRequest +} + +record! { + @name=UpsertSchema, + @kind=RecordKind::UpsertSchema, + @payload=crate::control::UpsertSchema +} diff --git a/crates/worker/src/partition/leadership/durability_tracker.rs b/crates/worker/src/partition/leadership/durability_tracker.rs index a59200c4a8..886fc84d37 100644 --- a/crates/worker/src/partition/leadership/durability_tracker.rs +++ b/crates/worker/src/partition/leadership/durability_tracker.rs @@ -9,21 +9,22 @@ // by the Apache License, Version 2.0. use std::pin::Pin; +use std::sync::Arc; use std::task::Poll; use std::time::Duration; use futures::{Stream, StreamExt}; -use restate_partition_store::snapshots::ArchivedLsn; use tokio::sync::watch; use tokio::time::{Instant, MissedTickBehavior}; use tokio_stream::wrappers::{IntervalStream, WatchStream}; use tracing::{debug, warn}; use restate_core::Metadata; +use restate_partition_store::snapshots::ArchivedLsn; use restate_types::config::{Configuration, DurabilityMode}; -use restate_types::identifiers::PartitionId; -use restate_types::logs::{Lsn, SequenceNumber}; +use restate_types::logs::{Keys, Lsn, SequenceNumber}; use restate_types::nodes_config::Role; +use restate_types::partitions::Partition; use restate_types::partitions::state::PartitionReplicaSetStates; use restate_types::time::MillisSinceEpoch; use restate_wal_protocol::control::PartitionDurability; @@ -37,7 +38,7 @@ const WARN_PERIOD: Duration = Duration::from_secs(60); /// changed in the replica-set, but it'll react immediately to changes in the archived Lsn /// watch. pub struct DurabilityTracker { - partition_id: PartitionId, + partition: Arc, last_reported_durable_lsn: Lsn, replica_set_states: PartitionReplicaSetStates, archived_lsn_watch: WatchStream>, @@ -50,7 +51,7 @@ pub struct DurabilityTracker { impl DurabilityTracker { pub fn new( - partition_id: PartitionId, + partition: Arc, last_reported_durable_lsn: Option, replica_set_states: PartitionReplicaSetStates, archived_lsn_watch: watch::Receiver>, @@ -62,7 +63,7 @@ impl DurabilityTracker { let check_timer = IntervalStream::new(check_timer); Self { - partition_id, + partition, last_reported_durable_lsn: last_reported_durable_lsn.unwrap_or(Lsn::INVALID), replica_set_states, archived_lsn_watch: WatchStream::new(archived_lsn_watch), @@ -184,11 +185,11 @@ impl Stream for DurabilityTracker { } DurabilityMode::ReplicaSetOnly => self .replica_set_states - .get_min_durable_lsn(self.partition_id), + .get_min_durable_lsn(self.partition.partition_id), DurabilityMode::SnapshotAndReplicaSet => { let min_durable_lsn = self .replica_set_states - .get_min_durable_lsn(self.partition_id); + .get_min_durable_lsn(self.partition.partition_id); self.last_archived.min(min_durable_lsn) } // disabled until ad-hoc snapshot sharing is supported @@ -202,7 +203,7 @@ impl Stream for DurabilityTracker { DurabilityMode::Balanced => { let max_durable_lsn = self .replica_set_states - .get_max_durable_lsn(self.partition_id); + .get_max_durable_lsn(self.partition.partition_id); self.last_archived.min(max_durable_lsn) } }; @@ -213,15 +214,16 @@ impl Stream for DurabilityTracker { } let partition_durability = PartitionDurability { - partition_id: self.partition_id, + partition_id: self.partition.partition_id, durable_point: suggested, modification_time: MillisSinceEpoch::now(), + partition_key_range: Keys::RangeInclusive(self.partition.key_range.clone()), }; // We don't want to keep reporting the same durable Lsn over and over. self.last_reported_durable_lsn = suggested; debug!( - partition_id = %self.partition_id, + partition_id = %self.partition.partition_id, durability_mode = %durability_mode, "Reporting {suggested:?} as a durable point for partition" ); diff --git a/crates/worker/src/partition/leadership/mod.rs b/crates/worker/src/partition/leadership/mod.rs index b356807625..1269897e12 100644 --- a/crates/worker/src/partition/leadership/mod.rs +++ b/crates/worker/src/partition/leadership/mod.rs @@ -405,7 +405,7 @@ where .map(|d| d.durable_point); let durability_tracker = DurabilityTracker::new( - self.partition.partition_id, + Arc::clone(&self.partition), last_reported_durable_lsn, replica_set_states, partition_store.partition_db().watch_archived_lsn(),