Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 13 additions & 2 deletions crates/types/src/identifiers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,15 +349,26 @@ impl From<InvocationUuid> for opentelemetry::trace::SpanId {
/// Services are isolated by key. This means that there cannot be two concurrent
/// invocations for the same service instance (service name, key).
#[derive(
Eq, Hash, PartialEq, PartialOrd, Ord, Clone, Debug, serde::Serialize, serde::Deserialize,
Eq,
Hash,
PartialEq,
PartialOrd,
Ord,
Clone,
Debug,
serde::Serialize,
serde::Deserialize,
bilrost::Message,
)]
pub struct ServiceId {
// TODO rename this to KeyedServiceId. This type can be used only by keyed service types (virtual objects and workflows)
/// Identifies the grpc service
#[bilrost(1)]
pub service_name: ByteString,
/// Identifies the service instance for the given service name
#[bilrost(2)]
pub key: ByteString,

#[bilrost(3)]
partition_key: PartitionKey,
}

Expand Down
23 changes: 12 additions & 11 deletions crates/types/src/invocation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,18 @@

pub mod client;

use std::borrow::Cow;
use std::hash::Hash;
use std::ops::Deref;
use std::str::FromStr;
use std::time::Duration;
use std::{cmp, fmt};

use bytes::Bytes;
use bytestring::ByteString;
use opentelemetry::trace::{SpanContext, SpanId, TraceFlags, TraceState};
use serde_with::{DisplayFromStr, FromInto, serde_as};

use crate::errors::InvocationError;
use crate::identifiers::{
DeploymentId, EntryIndex, IdempotencyId, InvocationId, PartitionKey,
Expand All @@ -21,17 +33,6 @@ use crate::journal_v2::{CompletionId, GetInvocationOutputResult, Signal};
use crate::time::MillisSinceEpoch;
use crate::{GenerationalNodeId, RestateVersion};

use bytes::Bytes;
use bytestring::ByteString;
use opentelemetry::trace::{SpanContext, SpanId, TraceFlags, TraceState};
use serde_with::{DisplayFromStr, FromInto, serde_as};
use std::borrow::Cow;
use std::hash::Hash;
use std::ops::Deref;
use std::str::FromStr;
use std::time::Duration;
use std::{cmp, fmt};

// Re-exporting opentelemetry [`TraceId`] to avoid having to import opentelemetry in all crates.
pub use opentelemetry::trace::TraceId;

Expand Down
5 changes: 4 additions & 1 deletion crates/types/src/state_mut.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,14 @@ use crate::identifiers::ServiceId;
/// ExternalStateMutation
///
/// represents an external request to mutate a user's state.
#[derive(Debug, Clone, Eq, PartialEq, serde::Serialize, serde::Deserialize)]
#[derive(Debug, Clone, Eq, PartialEq, serde::Serialize, serde::Deserialize, bilrost::Message)]
pub struct ExternalStateMutation {
#[bilrost(1)]
pub service_id: ServiceId,
#[bilrost(2)]
pub version: Option<String>,
// flexbuffers only supports string-keyed maps :-( --> so we store it as vector of kv pairs
#[bilrost(3)]
#[serde_as(as = "serde_with::Seq<(_, _)>")]
pub state: HashMap<Bytes, Bytes>,
}
Expand Down
38 changes: 37 additions & 1 deletion crates/types/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ use crate::journal_v2::raw::{RawEntry, RawEntryError, TryFromEntry};
use crate::journal_v2::{Decoder, EntryMetadata, EntryType};
use crate::time::MillisSinceEpoch;

pub use tracing;

#[derive(Debug, thiserror::Error)]
pub enum StorageEncodeError {
#[error("encoding failed: {0}")]
Expand Down Expand Up @@ -187,7 +189,7 @@ macro_rules! flexbuffers_storage_encode_decode {
Self: Sized,
{
$crate::storage::decode::decode_serde(buf, kind).map_err(|err| {
::tracing::error!(%err, "{} decode failure (decoding {})", kind, stringify!($name));
$crate::storage::tracing::error!(%err, "{} decode failure (decoding {})", kind, stringify!($name));
err
})

Expand All @@ -196,6 +198,40 @@ macro_rules! flexbuffers_storage_encode_decode {
};
}

/// Implements the [`StorageEncode`] and [`StorageDecode`] by encoding/decoding the implementing
/// type using [`bilrost`].
#[macro_export]
macro_rules! bilrost_storage_encode_decode {
($name:tt) => {
impl $crate::storage::StorageEncode for $name {
fn default_codec(&self) -> $crate::storage::StorageCodecKind {
$crate::storage::StorageCodecKind::Bilrost
}

fn encode(
&self,
buf: &mut ::bytes::BytesMut,
) -> Result<(), $crate::storage::StorageEncodeError> {
bytes::BufMut::put(buf, $crate::storage::encode::encode_bilrost(self));
Ok(())
}
}

impl $crate::storage::StorageDecode for $name {
fn decode<B: ::bytes::Buf>(
buf: &mut B,
kind: $crate::storage::StorageCodecKind,
) -> Result<Self, $crate::storage::StorageDecodeError>
where
Self: Sized,
{
debug_assert_eq!(kind, $crate::storage::StorageCodecKind::Bilrost);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I'd suggest we make this an assert_eq

$crate::storage::decode::decode_bilrost(buf)
}
}
};
}

/// A polymorphic container of a buffer or a cached storage-encodeable object
#[derive(Clone, derive_more::Debug, BilrostAs)]
#[bilrost_as(dto::PolyBytes)]
Expand Down
2 changes: 1 addition & 1 deletion crates/wal-protocol/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ serde = ["dep:serde", "enum-map/serde", "bytestring/serde", "restate-invoker-api

[dependencies]
restate-workspace-hack = { workspace = true }

restate-invoker-api = { workspace = true }
restate-storage-api = { workspace = true }
restate-types = { workspace = true }
Expand All @@ -22,6 +21,7 @@ anyhow = { workspace = true }
bytes = { workspace = true }
bytestring = { workspace = true }
bilrost = { workspace = true }
derive_more = { workspace = true }
enum-map = { workspace = true }
serde = { workspace = true, optional = true }
strum = { workspace = true }
Expand Down
52 changes: 49 additions & 3 deletions crates/wal-protocol/src/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,38 @@
use std::ops::RangeInclusive;

use restate_types::identifiers::{LeaderEpoch, PartitionId, PartitionKey};
use restate_types::logs::{Keys, Lsn};
use restate_types::logs::{HasRecordKeys, Keys, Lsn};
use restate_types::schema::Schema;
use restate_types::time::MillisSinceEpoch;
use restate_types::{GenerationalNodeId, SemanticRestateVersion};
use restate_types::{
GenerationalNodeId, SemanticRestateVersion, bilrost_storage_encode_decode,
flexbuffers_storage_encode_decode,
};

/// Announcing a new leader. This message can be written by any component to make the specified
/// partition processor the leader.
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq, bilrost::Message)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct AnnounceLeader {
/// Sender of the announce leader message.
///
/// This became non-optional in v1.5. Noting that it has always been set in previous versions,
/// it's safe to assume that it's always set.
#[bilrost(1)]
pub node_id: GenerationalNodeId,
#[bilrost(2)]
pub leader_epoch: LeaderEpoch,
#[bilrost(3)]
pub partition_key_range: RangeInclusive<PartitionKey>,
}

bilrost_storage_encode_decode!(AnnounceLeader);

impl HasRecordKeys for AnnounceLeader {
fn record_keys(&self) -> Keys {
Keys::RangeInclusive(self.partition_key_range.clone())
}
}
/// A version barrier to fence off state machine changes that require a certain minimum
/// version of restate server.
///
Expand All @@ -40,12 +53,23 @@ pub struct AnnounceLeader {
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct VersionBarrier {
/// The minimum version required (inclusive) to progress after this barrier.
#[bilrost(1)]
pub version: SemanticRestateVersion,
/// A human-readable reason for why this barrier exists.
#[bilrost(2)]
pub human_reason: Option<String>,
#[bilrost(3)]
pub partition_key_range: Keys,
}

bilrost_storage_encode_decode!(VersionBarrier);

impl HasRecordKeys for VersionBarrier {
fn record_keys(&self) -> Keys {
self.partition_key_range.clone()
}
}

/// Updates the `PARTITION_DURABILITY` FSM variable to the given value. Note that durability
/// only applies to partitions with the same `partition_id`. At replay time, the partition will
/// ignore updates that are not targeted to its own ID.
Expand All @@ -56,12 +80,26 @@ pub struct VersionBarrier {
#[derive(Debug, Clone, PartialEq, Eq, bilrost::Message)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct PartitionDurability {
#[bilrost(1)]
pub partition_id: PartitionId,
/// The partition has applied this LSN durably to the replica-set and/or has been
/// persisted in a snapshot in the snapshot repository.
#[bilrost(2)]
pub durable_point: Lsn,
/// Timestamp which the durability point was updated
#[bilrost(3)]
pub modification_time: MillisSinceEpoch,
/// partition key range
#[bilrost(4)]
pub partition_key_range: Keys,
}

bilrost_storage_encode_decode!(PartitionDurability);

impl HasRecordKeys for PartitionDurability {
fn record_keys(&self) -> Keys {
self.partition_key_range.clone()
}
}

/// Consistently store schema across partition replicas.
Expand All @@ -73,3 +111,11 @@ pub struct UpsertSchema {
pub partition_key_range: Keys,
pub schema: Schema,
}

flexbuffers_storage_encode_decode!(UpsertSchema);

impl HasRecordKeys for UpsertSchema {
fn record_keys(&self) -> Keys {
self.partition_key_range.clone()
}
}
Loading