diff --git a/Cargo.lock b/Cargo.lock index a22408f5c4..4a8db973ee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7330,6 +7330,27 @@ dependencies = [ "tower 0.5.2", ] +[[package]] +name = "restate-ingestion-client" +version = "1.6.0-dev" +dependencies = [ + "arc-swap", + "bytes", + "dashmap", + "futures", + "googletest", + "pin-project", + "restate-core", + "restate-types", + "restate-workspace-hack", + "test-log", + "thiserror 2.0.17", + "tokio", + "tokio-stream", + "tokio-util", + "tracing", +] + [[package]] name = "restate-ingress-http" version = "1.6.0-dev" diff --git a/Cargo.toml b/Cargo.toml index 3845eace17..3d0b307c40 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -86,6 +86,7 @@ restate-utoipa = { path = "crates/utoipa" } restate-vqueues = { path = "crates/vqueues" } restate-wal-protocol = { path = "crates/wal-protocol" } restate-worker = { path = "crates/worker" } +restate-ingestion-client = { path = "crates/ingestion-client" } # this workspace-hack package is overridden by a patch below to use workspace-hack subdir when building in this repo # outside this repo, the crates.io restate-workspace-hack (an empty package) will be used instead diff --git a/crates/bifrost/src/background_appender.rs b/crates/bifrost/src/background_appender.rs index 5ad0bbe1db..5d8142d524 100644 --- a/crates/bifrost/src/background_appender.rs +++ b/crates/bifrost/src/background_appender.rs @@ -8,12 +8,10 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use std::sync::Arc; - use futures::FutureExt; use pin_project::pin_project; use restate_types::logs::Record; -use tokio::sync::{Notify, mpsc, oneshot}; +use tokio::sync::{mpsc, oneshot}; use tracing::{trace, warn}; use restate_core::{ShutdownError, TaskCenter, TaskHandle, cancellation_watcher}; @@ -130,8 +128,8 @@ where batch.push(record); notif_buffer.push(tx); } - AppendOperation::Canary(notify) => { - notify.notify_one(); + AppendOperation::Canary(tx) => { + notif_buffer.push(tx); } AppendOperation::MarkAsPreferred => { appender.mark_as_preferred(); @@ -353,23 +351,19 @@ impl LogSender { Ok(CommitToken { rx }) } - /// Wait for previously enqueued records to be committed - /// - /// Not cancellation safe. Every call will attempt to acquire capacity on the channel and send - /// a new message to the appender. - pub async fn notify_committed(&self) -> Result<(), EnqueueError<()>> { + /// Returns a [`CommitToken`] that is resolved once all previously enqueued records are committed. + pub async fn notify_committed(&self) -> Result> { let Ok(permit) = self.tx.reserve().await else { // channel is closed, this should happen the appender is draining or has been darained // already return Err(EnqueueError::Closed(())); }; - let notify = Arc::new(Notify::new()); - let canary = AppendOperation::Canary(notify.clone()); + let (tx, rx) = oneshot::channel(); + let canary = AppendOperation::Canary(tx); permit.send(canary); - notify.notified().await; - Ok(()) + Ok(CommitToken { rx }) } /// Marks this node as a preferred writer for the underlying log @@ -422,7 +416,7 @@ enum AppendOperation { EnqueueWithNotification(Record, oneshot::Sender<()>), // A message denoting a request to be notified when it's processed by the appender. 
// It's used to check if previously enqueued appends have been committed or not - Canary(Arc), + Canary(oneshot::Sender<()>), /// Let's bifrost know that this node is the preferred writer of this log MarkAsPreferred, /// Let's bifrost know that this node might not be the preferred writer of this log diff --git a/crates/bifrost/src/record.rs b/crates/bifrost/src/record.rs index aba7d1aed6..48a71d7136 100644 --- a/crates/bifrost/src/record.rs +++ b/crates/bifrost/src/record.rs @@ -12,6 +12,7 @@ use core::str; use std::marker::PhantomData; use std::sync::Arc; +use bytes::Bytes; use restate_types::logs::{BodyWithKeys, HasRecordKeys, Keys, Lsn, Record}; use restate_types::logs::{LogletOffset, SequenceNumber}; use restate_types::storage::{PolyBytes, StorageDecode, StorageDecodeError, StorageEncode}; @@ -200,29 +201,19 @@ pub struct Gap { pub to: S, } +#[derive(Clone)] pub struct InputRecord { created_at: NanosSinceEpoch, keys: Keys, - body: Arc, + body: PolyBytes, _phantom: PhantomData, } -impl Clone for InputRecord { - fn clone(&self) -> Self { - Self { - created_at: self.created_at, - keys: self.keys.clone(), - body: Arc::clone(&self.body), - _phantom: self._phantom, - } - } -} - // This is a zero-cost transformation. The type is erased at runtime, but the underlying // layout is identical. impl InputRecord { pub fn into_record(self) -> Record { - Record::from_parts(self.created_at, self.keys, PolyBytes::Typed(self.body)) + Record::from_parts(self.created_at, self.keys, self.body) } } @@ -231,7 +222,24 @@ impl InputRecord { Self { created_at, keys, - body, + body: PolyBytes::Typed(body), + _phantom: PhantomData, + } + } + + /// Builds an [`InputRecord`] directly from raw bytes without validating the payload. + /// + /// # Safety + /// Caller must guarantee the bytes are a correctly storage-encoded `T`. 
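+    ///
+    /// A hedged usage sketch (not part of this change): `keys` and `encoded` are assumed to exist,
+    /// and `encoded` must have been produced by `StorageCodec::encode` for the same type, e.g. the
+    /// wal-protocol `Envelope` used by the self proposer.
+    ///
+    /// ```ignore
+    /// let record = unsafe {
+    ///     InputRecord::<Envelope>::from_bytes_unchecked(NanosSinceEpoch::now(), keys, encoded)
+    /// };
+    /// // the body is carried through as PolyBytes::Bytes, so no re-encoding happens here
+    /// let raw: Record = record.into_record();
+    /// ```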
+ pub unsafe fn from_bytes_unchecked( + created_at: NanosSinceEpoch, + keys: Keys, + body: Bytes, + ) -> Self { + Self { + created_at, + keys, + body: PolyBytes::Bytes(body), _phantom: PhantomData, } } @@ -246,7 +254,7 @@ impl From> for InputRecord { InputRecord { created_at: NanosSinceEpoch::now(), keys: val.record_keys(), - body: val, + body: PolyBytes::Typed(val), _phantom: PhantomData, } } @@ -257,7 +265,7 @@ impl From for InputRecord { InputRecord { created_at: NanosSinceEpoch::now(), keys: Keys::None, - body: Arc::new(val), + body: PolyBytes::Typed(Arc::new(val)), _phantom: PhantomData, } } @@ -268,7 +276,7 @@ impl From<&str> for InputRecord { InputRecord { created_at: NanosSinceEpoch::now(), keys: Keys::None, - body: Arc::new(String::from(val)), + body: PolyBytes::Typed(Arc::new(String::from(val))), _phantom: PhantomData, } } @@ -279,7 +287,7 @@ impl From> for InputRecord { InputRecord { created_at: NanosSinceEpoch::now(), keys: val.record_keys(), - body: Arc::new(val.into_inner()), + body: PolyBytes::Typed(Arc::new(val.into_inner())), _phantom: PhantomData, } } diff --git a/crates/core/src/worker_api/partition_processor_rpc_client.rs b/crates/core/src/worker_api/partition_processor_rpc_client.rs index cc8ba97d70..4c35e95c25 100644 --- a/crates/core/src/worker_api/partition_processor_rpc_client.rs +++ b/crates/core/src/worker_api/partition_processor_rpc_client.rs @@ -71,10 +71,6 @@ pub enum RpcErrorKind { Busy, #[error("internal error: {0}")] Internal(String), - #[error("partition processor starting")] - Starting, - #[error("partition processor stopping")] - Stopping, } impl PartitionProcessorInvocationClientError { @@ -106,10 +102,8 @@ impl RpcError { match self.source { RpcErrorKind::Connect(_) | RpcErrorKind::NotLeader - | RpcErrorKind::Starting | RpcErrorKind::Busy - | RpcErrorKind::SendFailed - | RpcErrorKind::Stopping => { + | RpcErrorKind::SendFailed => { // These are pre-flight error that we can distinguish, // and for which we know for certain that no message was proposed yet to the log. 
true @@ -143,7 +137,7 @@ impl From for RpcErrorKind { RpcReplyError::ServiceNotFound | RpcReplyError::SortCodeNotFound => Self::NotLeader, RpcReplyError::LoadShedding => Self::Busy, RpcReplyError::ServiceNotReady => Self::Busy, - RpcReplyError::ServiceStopped => Self::Stopping, + RpcReplyError::ServiceStopped => Self::NotLeader, } } } @@ -154,8 +148,6 @@ impl From for RpcErrorKind { PartitionProcessorRpcError::NotLeader(_) => RpcErrorKind::NotLeader, PartitionProcessorRpcError::LostLeadership(_) => RpcErrorKind::LostLeadership, PartitionProcessorRpcError::Internal(msg) => RpcErrorKind::Internal(msg), - PartitionProcessorRpcError::Starting => RpcErrorKind::Starting, - PartitionProcessorRpcError::Stopping => RpcErrorKind::Stopping, } } } diff --git a/crates/ingestion-client/Cargo.toml b/crates/ingestion-client/Cargo.toml new file mode 100644 index 0000000000..b868750615 --- /dev/null +++ b/crates/ingestion-client/Cargo.toml @@ -0,0 +1,30 @@ +[package] +name = "restate-ingestion-client" +version.workspace = true +authors.workspace = true +edition.workspace = true +rust-version.workspace = true +license.workspace = true +publish = false + +[dependencies] +arc-swap = { workspace = true } +dashmap = { workspace = true } +futures = { workspace = true } +pin-project = { workspace = true } +thiserror = { workspace = true } +tokio-stream = { workspace = true } +tokio-util = { workspace = true } +tokio = { workspace = true } +tracing = { workspace = true } + +restate-core = { workspace = true } +restate-types = { workspace = true } +restate-workspace-hack = { workspace = true } + +[dev-dependencies] +bytes = { workspace = true } +googletest = { workspace = true } +test-log = { workspace = true } + +restate-core = { workspace = true, features = ["test-util"] } diff --git a/crates/ingestion-client/src/chunks_size.rs b/crates/ingestion-client/src/chunks_size.rs new file mode 100644 index 0000000000..7a7ad03686 --- /dev/null +++ b/crates/ingestion-client/src/chunks_size.rs @@ -0,0 +1,144 @@ +// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use tokio_stream::adapters::Fuse; +use tokio_stream::{Stream, StreamExt}; + +use core::pin::Pin; +use core::task::{Context, Poll}; + +#[must_use = "streams do nothing unless polled"] +#[derive(Debug)] +#[pin_project::pin_project] +pub struct ChunksSize { + #[pin] + stream: Fuse, + items: Vec, + size: usize, + cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475 + size_fn: F, +} + +impl ChunksSize +where + F: Fn(&S::Item) -> usize, +{ + pub fn new(stream: S, max_size: usize, size_fn: F) -> Self { + ChunksSize { + stream: stream.fuse(), + items: Vec::default(), + size: 0, + cap: max_size, + size_fn, + } + } + + /// Drains the buffered items, returning them without waiting for the timeout or capacity limit. 
+ pub fn into_remainder(self) -> Vec { + self.items + } +} + +impl Stream for ChunksSize +where + F: Fn(&S::Item) -> usize, +{ + type Item = Vec; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut me = self.as_mut().project(); + loop { + match me.stream.as_mut().poll_next(cx) { + Poll::Pending if me.items.is_empty() => return Poll::Pending, + Poll::Pending => { + *me.size = 0; + return Poll::Ready(Some(std::mem::take(me.items))); + } + Poll::Ready(Some(item)) => { + let item_size = (me.size_fn)(&item); + + if *me.size + item_size <= *me.cap { + *me.size += item_size; + me.items.push(item); + } else if me.items.is_empty() { + *me.size = 0; + return Poll::Ready(Some(vec![item])); + } else { + let items = std::mem::replace(me.items, vec![item]); + *me.size = item_size; + return Poll::Ready(Some(items)); + } + } + Poll::Ready(None) => { + // Returning Some here is only correct because we fuse the inner stream. + let last = if me.items.is_empty() { + None + } else { + Some(std::mem::take(me.items)) + }; + + return Poll::Ready(last); + } + } + } + } + + fn size_hint(&self) -> (usize, Option) { + let chunk_len = if self.items.is_empty() { 0 } else { 1 }; + let (lower, upper) = self.stream.size_hint(); + let lower = (lower / self.cap).saturating_add(chunk_len); + let upper = upper.and_then(|x| x.checked_add(chunk_len)); + (lower, upper) + } +} + +#[cfg(test)] +mod tests { + use std::task::Poll; + + use futures::{StreamExt, stream::poll_fn}; + use tokio_stream::iter; + + use super::ChunksSize; + + #[tokio::test] + async fn splits_into_size_bound_chunks() { + let stream = iter([2usize, 2, 3, 1]); + let chunks: Vec> = ChunksSize::new(stream, 5, |item| *item).collect().await; + + assert_eq!(chunks, vec![vec![2, 2], vec![3, 1]]); + } + + #[tokio::test] + async fn emits_item_larger_than_cap_as_its_own_chunk() { + let stream = iter([10usize, 2]); + let chunks: Vec> = ChunksSize::new(stream, 5, |item| *item).collect().await; + + assert_eq!(chunks, vec![vec![10], vec![2]]); + } + + #[tokio::test] + async fn flushes_buffer_when_inner_stream_is_pending() { + let mut state = 0; + let stream = poll_fn(move |_| { + state += 1; + match state { + 1 => Poll::Ready(Some(1usize)), + 2 => Poll::Pending, + 3 => Poll::Ready(Some(2usize)), + _ => Poll::Ready(None), + } + }); + + let chunks: Vec> = ChunksSize::new(stream, 10, |item| *item).collect().await; + + assert_eq!(chunks, vec![vec![1], vec![2]]); + } +} diff --git a/crates/ingestion-client/src/client.rs b/crates/ingestion-client/src/client.rs new file mode 100644 index 0000000000..705edcbf0c --- /dev/null +++ b/crates/ingestion-client/src/client.rs @@ -0,0 +1,444 @@ +// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. 
+ +use std::{marker::PhantomData, num::NonZeroUsize, sync::Arc, task::Poll}; + +use futures::{FutureExt, future::BoxFuture, ready}; +use tokio::sync::{AcquireError, OwnedSemaphorePermit, Semaphore}; + +use restate_core::{ + network::{Networking, TransportConnect}, + partitions::PartitionRouting, +}; +use restate_types::{ + identifiers::PartitionKey, + live::Live, + logs::{HasRecordKeys, Keys}, + net::ingest::IngestRecord, + partitions::{FindPartition, PartitionTable, PartitionTableError}, + storage::StorageEncode, +}; + +use crate::{ + RecordCommit, SessionOptions, + session::{SessionHandle, SessionManager}, +}; + +/// Errors that can be observed when interacting with the ingestion facade. +#[derive(Debug, thiserror::Error)] +pub enum IngestionError { + #[error("Ingestion closed")] + Closed, + #[error(transparent)] + PartitionTableError(#[from] PartitionTableError), +} + +/// High-level ingestion entry point that allocates permits and hands out session handles per partition. +/// [`IngestionClient`] can be cloned and shared across different routines. All users will share the same budget +/// and underlying partition sessions. +#[derive(Clone)] +pub struct IngestionClient { + manager: SessionManager, + partition_table: Live, + // memory budget for inflight invocations. + permits: Arc, + memory_budget: NonZeroUsize, + _phantom: PhantomData, +} + +impl IngestionClient { + /// Builds a new ingestion facade with the provided networking stack, partition metadata, and + /// budget (in bytes) for inflight records. + pub fn new( + networking: Networking, + partition_table: Live, + partition_routing: PartitionRouting, + memory_budget: NonZeroUsize, + opts: Option, + ) -> Self { + Self { + manager: SessionManager::new(networking, partition_routing, opts), + partition_table, + permits: Arc::new(Semaphore::new(memory_budget.get())), + memory_budget, + _phantom: PhantomData, + } + } +} + +impl IngestionClient +where + T: TransportConnect, + V: StorageEncode, +{ + pub fn partition_routing(&self) -> &PartitionRouting { + self.manager.partition_routing() + } + + pub fn partition_table(&self) -> &Live { + &self.partition_table + } + + pub fn networking(&self) -> &Networking { + self.manager.networking() + } + + /// Ingest a record with `partition_key`. + #[must_use] + pub fn ingest( + &self, + partition_key: PartitionKey, + record: impl Into>, + ) -> IngestFuture { + let record = record.into().into_record(); + + let budget = record.estimate_size().min(self.memory_budget.get()); + + let partition_id = match self + .partition_table + .pinned() + .find_partition_id(partition_key) + { + Ok(partition_id) => partition_id, + Err(err) => return IngestFuture::error(err.into()), + }; + + let handle = self.manager.get(partition_id); + + let acquire = self.permits.clone().acquire_many_owned(budget as u32); + + IngestFuture::awaiting_permits(record, handle, acquire) + } + + /// Once closed, calls to ingest will return [`IngestionError::Closed`]. + /// Inflight records might still get committed. 
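+    ///
+    /// A minimal sketch of the client lifecycle, assuming a `client: IngestionClient<_, MyRecord>`
+    /// built elsewhere (`MyRecord` is a placeholder):
+    ///
+    /// ```ignore
+    /// let commit = client.ingest(partition_key, record).await?; // permit acquired, record handed to the partition session
+    /// commit.await?;                                            // resolves once the record is committed
+    /// client.close();                                           // subsequent ingest() calls fail with IngestionError::Closed
+    /// ```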
+ pub fn close(&self) { + self.permits.close(); + self.manager.close(); + } +} + +/// Future returned by [`IngestionClient::ingest`] +#[pin_project::pin_project(project=IngestFutureStateProj)] +enum IngestFutureState { + Error { + err: Option, + }, + AwaitingPermit { + record: Option, + handle: SessionHandle, + acquire: BoxFuture<'static, Result>, + }, + Done, +} + +#[pin_project::pin_project] +pub struct IngestFuture { + #[pin] + state: IngestFutureState, +} + +impl IngestFuture { + /// create a "ready" ingestion future that will resolve to error + fn error(err: IngestionError) -> Self { + IngestFuture { + state: IngestFutureState::Error { err: Some(err) }, + } + } + + /// create a pending ingestion future that will eventually resolve to + /// [`RecordCommit`] or error + fn awaiting_permits(record: IngestRecord, handle: SessionHandle, acquire: F) -> Self + where + F: Future> + Send + 'static, + { + Self { + state: IngestFutureState::AwaitingPermit { + record: Some(record), + handle, + acquire: acquire.boxed(), + }, + } + } +} + +impl Future for IngestFuture { + type Output = Result; + + fn poll( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + let mut this = self.project(); + let result = match this.state.as_mut().project() { + IngestFutureStateProj::Error { err } => Poll::Ready(Err(err.take().unwrap())), + IngestFutureStateProj::AwaitingPermit { + record, + handle, + acquire, + } => { + let output = match ready!(acquire.as_mut().poll(cx)) { + Ok(permit) => { + let record = record.take().unwrap(); + handle + .ingest(permit, record) + .map_err(|_| IngestionError::Closed) + } + Err(_) => Err(IngestionError::Closed), + }; + + Poll::Ready(output) + } + IngestFutureStateProj::Done => { + panic!("polled IngestFuture after completion"); + } + }; + + this.state.set(IngestFutureState::Done); + result + } +} + +pub struct InputRecord { + keys: Keys, + record: T, +} + +impl InputRecord +where + T: StorageEncode, +{ + fn into_record(self) -> IngestRecord { + IngestRecord::from_parts(self.keys, self.record) + } +} + +impl From for InputRecord +where + T: HasRecordKeys + StorageEncode, +{ + fn from(value: T) -> Self { + InputRecord { + keys: value.record_keys(), + record: value, + } + } +} + +impl InputRecord { + #[cfg(test)] + fn from_str(s: impl Into) -> Self { + InputRecord { + keys: Keys::None, + record: s.into(), + } + } +} + +#[cfg(test)] +mod test { + use std::{num::NonZeroUsize, time::Duration}; + + use futures::{FutureExt, StreamExt}; + use googletest::prelude::*; + use test_log::test; + + use restate_core::{ + Metadata, TaskCenter, TestCoreEnvBuilder, + network::{ + BackPressureMode, FailingConnector, Incoming, Rpc, ServiceMessage, ServiceStream, + }, + partitions::PartitionRouting, + }; + use restate_types::{ + Version, + identifiers::{LeaderEpoch, PartitionId}, + net::{ + self, RpcRequest, + ingest::{ReceivedIngestRequest, ResponseStatus}, + partition_processor::PartitionLeaderService, + }, + partitions::{ + PartitionTable, + state::{LeadershipState, PartitionReplicaSetStates}, + }, + }; + + use crate::{CancelledError, IngestionClient, SessionOptions, client::InputRecord}; + + async fn init_env( + batch_size: usize, + ) -> ( + ServiceStream, + IngestionClient, + ) { + let mut builder = TestCoreEnvBuilder::with_incoming_only_connector() + .add_mock_nodes_config() + .set_partition_table(PartitionTable::with_equally_sized_partitions( + Version::MIN, + 4, + )); + + let partition_replica_set_states = PartitionReplicaSetStates::default(); + for i in 
0..4 { + partition_replica_set_states.note_observed_leader( + i.into(), + LeadershipState { + current_leader: builder.my_node_id, + current_leader_epoch: LeaderEpoch::INITIAL, + }, + ); + } + + let svc = builder + .router_builder + .register_service::( + 10, + BackPressureMode::PushBack, + ); + + let incoming = svc.start(); + + let env = builder.build().await; + let client = IngestionClient::new( + env.networking, + env.metadata.updateable_partition_table(), + PartitionRouting::new(partition_replica_set_states, TaskCenter::current()), + NonZeroUsize::new(10 * 1024 * 1024).unwrap(), // 10MB + SessionOptions { + batch_size, + ..Default::default() + } + .into(), + ); + + (incoming, client) + } + + async fn must_next( + recv: &mut ServiceStream, + ) -> Incoming> { + let Some(ServiceMessage::Rpc(msg)) = recv.next().await else { + panic!("stream closed"); + }; + + assert_eq!(msg.msg_type(), ReceivedIngestRequest::TYPE); + msg.into_typed() + } + + #[test(restate_core::test)] + async fn test_client_single_record() { + let (mut incoming, client) = init_env(10).await; + + let commit = client + .ingest(0, InputRecord::from_str("hello world")) + .await + .unwrap(); + + let msg = must_next(&mut incoming).await; + let (rx, body) = msg.split(); + assert_that!( + body.records, + all!( + len(eq(1)), + contains(eq(InputRecord::from_str("hello world").into_record())) + ) + ); + + rx.send(ResponseStatus::Ack.into()); + + commit.await.expect("to resolve"); + } + + #[test(restate_core::test)] + async fn test_client_single_record_retry() { + let (mut incoming, client) = init_env(10).await; + + let mut commit = client + .ingest(0, InputRecord::from_str("hello world")) + .await + .unwrap(); + + let msg = must_next(&mut incoming).await; + let (rx, _) = msg.split(); + rx.send(ResponseStatus::NotLeader { of: 0.into() }.into()); + + assert!((&mut commit).now_or_never().is_none()); + + // ingestion will retry automatically so we must receive another message + let msg = must_next(&mut incoming).await; + let (rx, body) = msg.split(); + assert_that!( + body.records, + all!( + len(eq(1)), + contains(eq(InputRecord::from_str("hello world").into_record())) + ) + ); + // lets acknowledge it this time + rx.send(ResponseStatus::Ack.into()); + + commit.await.expect("to resolve"); + } + + #[test(restate_core::test)] + async fn test_client_close() { + let (_, client) = init_env(10).await; + + let commit = client + .ingest(0, InputRecord::from_str("hello world")) + .await + .unwrap(); + + client.close(); + + assert!(matches!(commit.await, Err(CancelledError))); + } + + #[test(restate_core::test(start_paused = true))] + async fn test_client_dispatch() { + let (mut incoming, client) = init_env(10).await; + + let pt = Metadata::with_current(|p| p.partition_table_snapshot()); + + for p in 0..4 { + let partition_id = PartitionId::from(p); + let partition = pt.get(&partition_id).unwrap(); + client + .ingest( + *partition.key_range.start(), + InputRecord::from_str(format!("partition {p}")), + ) + .await + .unwrap(); + } + + tokio::time::advance(Duration::from_millis(10)).await; // batch timeout + + // what happens is that we still get 4 different messages because each targets + // a single partition. 
+ let mut received = vec![]; + for _ in 0..4 { + let msg = must_next(&mut incoming).await; + received.push(msg.sort_code()); + } + + assert_that!( + received, + all!( + len(eq(4)), //4 messages for 4 partitions + contains(eq(Some(0))), + contains(eq(Some(1))), + contains(eq(Some(2))), + contains(eq(Some(3))), + ) + ); + } +} diff --git a/crates/ingestion-client/src/lib.rs b/crates/ingestion-client/src/lib.rs new file mode 100644 index 0000000000..3d9de384e1 --- /dev/null +++ b/crates/ingestion-client/src/lib.rs @@ -0,0 +1,16 @@ +// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +mod chunks_size; +mod client; +mod session; + +pub use client::{IngestFuture, IngestionClient, IngestionError}; +pub use session::{CancelledError, RecordCommit, SessionClosed, SessionOptions}; diff --git a/crates/ingestion-client/src/session.rs b/crates/ingestion-client/src/session.rs new file mode 100644 index 0000000000..c3f9d9243b --- /dev/null +++ b/crates/ingestion-client/src/session.rs @@ -0,0 +1,535 @@ +// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use std::{collections::VecDeque, sync::Arc, time::Duration}; + +use dashmap::DashMap; +use futures::{FutureExt, StreamExt, future::OptionFuture, ready}; +use tokio::sync::{OwnedSemaphorePermit, mpsc, oneshot}; +use tokio_stream::wrappers::UnboundedReceiverStream; +use tokio_util::sync::CancellationToken; +use tracing::{debug, trace}; + +use restate_core::{ + TaskCenter, TaskKind, + network::{ + ConnectError, Connection, ConnectionClosed, NetworkSender, Networking, ReplyRx, Swimlane, + TransportConnect, + }, + partitions::PartitionRouting, +}; +use restate_types::{ + identifiers::PartitionId, + net::ingest::{IngestRecord, IngestRequest, IngestResponse, ResponseStatus}, + retries::RetryPolicy, +}; + +use crate::chunks_size::ChunksSize; + +/// Error returned when attempting to use a session that has already been closed. +#[derive(Clone, Copy, Debug, thiserror::Error)] +#[error("Partition session is closed")] +pub struct SessionClosed; + +/// Commitment failures that can be observed when waiting on [`RecordCommit`]. +#[derive(Debug, Clone, Copy, thiserror::Error)] +#[error("commit cancelled")] +pub struct CancelledError; + +/// Future that is resolved to the commit result +/// A [`CommitError::Cancelled`] might be returned +/// if [`IngestionClient`] is closed while record is in +/// flight. This does not guarantee that the record +/// was not processed or committed. 
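+/// A hedged sketch of awaiting the commit (the error type is [`CancelledError`]; `commit` is
+/// assumed to come from [`IngestionClient::ingest`] or `SessionHandle::ingest`):
+///
+/// ```ignore
+/// match commit.await {
+///     Ok(()) => { /* the batch containing this record was acknowledged as committed */ }
+///     Err(CancelledError) => { /* closed mid-flight; the record may or may not have been committed */ }
+/// }
+/// ```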
+#[pin_project::pin_project] +pub struct RecordCommit { + v: Option, + #[pin] + rx: oneshot::Receiver>, +} + +impl Future for RecordCommit +where + V: Send + 'static, +{ + type Output = Result; + + fn poll( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + let mut this = self.project(); + match ready!(this.rx.poll_unpin(cx)) { + Ok(result) => std::task::Poll::Ready(result.map(|_| { + this.v + .take() + .expect("future should not be polled after completion") + })), + Err(_) => std::task::Poll::Ready(Err(CancelledError)), + } + } +} + +impl RecordCommit { + fn new(permit: OwnedSemaphorePermit) -> (Self, RecordCommitResolver) { + let (tx, rx) = oneshot::channel(); + ( + Self { v: Some(()), rx }, + RecordCommitResolver { + tx, + _permit: permit, + }, + ) + } +} + +impl RecordCommit { + pub fn map(self, f: F) -> RecordCommit + where + F: FnOnce(V) -> T, + { + let RecordCommit { v, rx } = self; + RecordCommit { v: v.map(f), rx } + } +} + +struct RecordCommitResolver { + tx: oneshot::Sender>, + _permit: OwnedSemaphorePermit, +} + +impl RecordCommitResolver { + /// Resolve the [`RecordCommit`] to committed. + pub fn committed(self) { + let _ = self.tx.send(Ok(())); + } + + /// explicitly cancel the RecordCommit + /// If resolver is dropped, the RecordCommit + /// will resolve to [`CommitError::Cancelled`] + #[allow(dead_code)] + pub fn cancelled(self) { + let _ = self.tx.send(Err(CancelledError)); + } +} + +struct IngestionBatch { + records: Arc<[IngestRecord]>, + resolvers: Vec, + + reply_rx: Option>, +} + +impl IngestionBatch { + fn new(batch: impl IntoIterator) -> Self { + let (resolvers, records): (Vec<_>, Vec<_>) = batch.into_iter().unzip(); + let records: Arc<[IngestRecord]> = Arc::from(records); + + Self { + records, + resolvers, + reply_rx: None, + } + } + + /// Marks every tracked record in the batch as committed. + fn committed(self) { + for resolver in self.resolvers { + resolver.committed(); + } + } + + fn len(&self) -> usize { + self.records.len() + } +} + +/// Tunable parameters for batching and networking behaviour of partition sessions. +#[derive(Debug, Clone)] +pub struct SessionOptions { + /// Maximum batch size in `bytes` + pub batch_size: usize, + /// Connection retry policy + /// Retry policy must be infinite (retries forever) + /// If not, the retry will fallback to 2 seconds intervals + pub connection_retry_policy: RetryPolicy, + /// Connection swimlane + pub swimlane: Swimlane, +} + +impl Default for SessionOptions { + fn default() -> Self { + Self { + // The default batch size of 50KB is to avoid + // overwhelming the PP on the hot path. + batch_size: 50 * 1024, // 50 KB + swimlane: Swimlane::IngressData, + connection_retry_policy: RetryPolicy::exponential( + Duration::from_millis(10), + 2.0, + None, + Some(Duration::from_secs(1)), + ), + } + } +} + +/// Cloneable sender that enqueues records for a specific partition session. +#[derive(Clone)] +pub struct SessionHandle { + tx: mpsc::UnboundedSender<(RecordCommitResolver, IngestRecord)>, +} + +impl SessionHandle { + /// Enqueues an ingest request along with the owned permit and returns a future tracking commit outcome. 
+ pub fn ingest( + &self, + permit: OwnedSemaphorePermit, + record: IngestRecord, + ) -> Result { + let (commit, resolver) = RecordCommit::new(permit); + self.tx + .send((resolver, record)) + .map_err(|_| SessionClosed)?; + + Ok(commit) + } +} + +enum SessionState { + Connecting, + Connected { connection: Connection }, + Shutdown, +} + +/// Background task that drives the lifecycle of a single partition connection. +pub struct PartitionSession { + partition: PartitionId, + partition_routing: PartitionRouting, + networking: Networking, + opts: SessionOptions, + rx: UnboundedReceiverStream<(RecordCommitResolver, IngestRecord)>, + tx: mpsc::UnboundedSender<(RecordCommitResolver, IngestRecord)>, + inflight: VecDeque, +} + +impl PartitionSession { + fn new( + networking: Networking, + partition_routing: PartitionRouting, + partition: PartitionId, + opts: SessionOptions, + ) -> Self { + let (tx, rx) = mpsc::unbounded_channel(); + let rx = UnboundedReceiverStream::new(rx); + + Self { + partition, + partition_routing, + networking, + opts, + inflight: Default::default(), + rx, + tx, + } + } + + /// Returns a handle that can be used by callers to enqueue new records. + pub fn handle(&self) -> SessionHandle { + SessionHandle { + tx: self.tx.clone(), + } + } +} + +impl PartitionSession +where + T: TransportConnect, +{ + /// Runs the session state machine until shut down, reacting to cancellation and connection errors. + pub async fn start(self, cancellation: CancellationToken) { + debug!( + partition_id = %self.partition, + "Starting ingestion partition session", + ); + + cancellation.run_until_cancelled(self.run_inner()).await; + } + + /// Runs the session state machine until shut down, reacting to cancellation and connection errors. + async fn run_inner(mut self) { + let mut state = SessionState::Connecting; + debug!( + partition_id = %self.partition, + "Starting ingestion partition session", + ); + + loop { + state = match state { + SessionState::Connecting => { + let mut retry = self.opts.connection_retry_policy.iter(); + loop { + match self.connect().await { + Some(state) => break state, + None => { + // retry + // this assumes that retry policy is infinite. If it's not it falls back + // to a fixed 2 seconds sleep between retries + tokio::time::sleep(retry.next().unwrap_or(Duration::from_secs(2))) + .await; + } + } + } + } + SessionState::Connected { connection } => self.connected(connection).await, + SessionState::Shutdown => { + self.rx.close(); + break; + } + } + } + } + + async fn connect(&self) -> Option { + let node_id = self + .partition_routing + .get_node_by_partition(self.partition)?; + + let result = self + .networking + .get_connection(node_id, self.opts.swimlane) + .await; + + match result { + Ok(connection) => { + debug!("Connection established to node {node_id}"); + Some(SessionState::Connected { connection }) + } + Err(ConnectError::Shutdown(_)) => Some(SessionState::Shutdown), + Err(err) => { + debug!("Failed to connect to node {node_id}: {err}"); + None + } + } + } + + /// Re-sends all inflight batches after a connection is restored. + async fn replay(&mut self, connection: &Connection) -> Result<(), ConnectionClosed> { + // todo(azmy): to avoid all the inflight batches again and waste traffic + // maybe test the connection first by sending an empty batch and wait for response + // before proceeding? 
+ + let total = self.inflight.iter().fold(0, |v, i| v + i.len()); + trace!( + partition = %self.partition, + batches = self.inflight.len(), + records = total, + "Replaying inflight records after connection was restored" + ); + + for batch in self.inflight.iter_mut() { + let Some(permit) = connection.reserve().await else { + return Err(ConnectionClosed); + }; + + // resend batch + let reply_rx = permit + .send_rpc( + IngestRequest::from(Arc::clone(&batch.records)), + Some(self.partition.into()), + ) + .expect("encoding version to match"); + batch.reply_rx = Some(reply_rx); + } + + Ok(()) + } + + async fn connected(&mut self, connection: Connection) -> SessionState { + if self.replay(&connection).await.is_err() { + return SessionState::Connecting; + } + + let mut chunked = ChunksSize::new(&mut self.rx, self.opts.batch_size, |(_, item)| { + item.estimate_size() + }); + + let state = loop { + let head: OptionFuture<_> = self + .inflight + .front_mut() + .and_then(|batch| batch.reply_rx.as_mut()) + .into(); + + tokio::select! { + _ = connection.closed() => { + break SessionState::Connecting; + } + Some(batch) = chunked.next() => { + let batch = IngestionBatch::new(batch); + let records = Arc::clone(&batch.records); + + self.inflight.push_back(batch); + + let Some(permit) = connection.reserve().await else { + break SessionState::Connecting; + }; + + trace!("Sending ingest batch, len: {}", records.len()); + let reply_rx = permit + .send_rpc(IngestRequest::from(records), Some(self.partition.into())) + .expect("encoding version to match"); + + self.inflight.back_mut().expect("to exist").reply_rx = Some(reply_rx); + } + Some(result) = head => { + match result.map(|r|r.status) { + Ok(ResponseStatus::Ack) => { + let batch = self.inflight.pop_front().expect("not empty"); + batch.committed(); + } + Ok(response) => { + // Handle any other response code as a connection loss + // and retry all inflight batches. + debug!("Ingestion response from {}: {:?}", connection.peer(), response); + break SessionState::Connecting; + } + Err(err) => { + // we can assume that for any error + // we need to retry all the inflight batches. + // special case for load shedding we could + // throttle the stream a little bit then + // speed up over a period of time. + + debug!("Ingestion error from {}: {}", connection.peer(), err); + break SessionState::Connecting; + } + } + } + } + }; + + // state == Connecting + assert!(matches!(state, SessionState::Connecting)); + + // don't lose the buffered batch + let remainder = chunked.into_remainder(); + if !remainder.is_empty() { + self.inflight.push_back(IngestionBatch::new(remainder)); + } + + state + } +} + +struct SessionManagerInner { + networking: Networking, + partition_routing: PartitionRouting, + opts: SessionOptions, + // Since ingestion sessions are started on demand + // we make sure we decouple the session cancellation + // from the initiating task. Hence the session manager + // keep it's own cancellation token that is passed to + // all the sessions. + cancellation: CancellationToken, + handles: DashMap, +} + +impl SessionManagerInner +where + T: TransportConnect, +{ + /// Gets or start a new session to partition with given partition id. + /// It guarantees that only one session is started per partition id. 
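+    ///
+    /// Illustrative sketch: repeated lookups for the same partition return handles to the one
+    /// cached session, so only the first call spawns a background task.
+    ///
+    /// ```ignore
+    /// let a = manager.get(PartitionId::from(0)); // spawns the "ingestion-partition-session" task
+    /// let b = manager.get(PartitionId::from(0)); // served from the cache, no extra task or channel
+    /// ```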
+ pub fn get(&self, id: PartitionId) -> SessionHandle { + self.handles + .entry(id) + .or_insert_with(|| { + let session = PartitionSession::new( + self.networking.clone(), + self.partition_routing.clone(), + id, + self.opts.clone(), + ); + + let handle = session.handle(); + + let cancellation = self.cancellation.child_token(); + let _ = TaskCenter::spawn( + TaskKind::Background, + "ingestion-partition-session", + async move { + session.start(cancellation).await; + Ok(()) + }, + ); + + handle + }) + .value() + .clone() + } +} + +impl Drop for SessionManagerInner { + fn drop(&mut self) { + self.cancellation.cancel(); + } +} + +/// Manager that owns all partition sessions and caches their handles. +#[derive(Clone)] +pub struct SessionManager { + inner: Arc>, +} + +impl SessionManager { + /// Creates a new session manager with optional overrides for session behaviour. + pub fn new( + networking: Networking, + partition_routing: PartitionRouting, + opts: Option, + ) -> Self { + let inner = SessionManagerInner { + networking, + partition_routing, + opts: opts.unwrap_or_default(), + handles: Default::default(), + cancellation: CancellationToken::new(), + }; + + Self { + inner: Arc::new(inner), + } + } + + pub fn partition_routing(&self) -> &PartitionRouting { + &self.inner.partition_routing + } + + pub fn networking(&self) -> &Networking { + &self.inner.networking + } +} + +impl SessionManager +where + T: TransportConnect, +{ + /// Returns a handle to the session for the given partition, creating it if needed. + pub fn get(&self, id: PartitionId) -> SessionHandle { + self.inner.get(id) + } + + /// Signals all sessions to shut down and prevents new work from being scheduled. + pub fn close(&self) { + self.inner.cancellation.cancel(); + } +} diff --git a/crates/types/src/net/ingest.rs b/crates/types/src/net/ingest.rs new file mode 100644 index 0000000000..b18d7f4ef1 --- /dev/null +++ b/crates/types/src/net/ingest.rs @@ -0,0 +1,124 @@ +// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. 
+ +use std::sync::Arc; + +use bytes::{Bytes, BytesMut}; +use metrics::Key; + +use crate::identifiers::PartitionId; +use crate::logs::{HasRecordKeys, Keys}; +use crate::net::partition_processor::PartitionLeaderService; +use crate::net::{RpcRequest, bilrost_wire_codec, default_wire_codec, define_rpc}; +use crate::storage::{StorageCodec, StorageEncode}; + +#[derive(Debug, Eq, PartialEq, Clone, serde::Serialize, serde::Deserialize)] +pub struct IngestRecord { + pub keys: Keys, + pub record: Bytes, +} + +impl IngestRecord { + pub fn estimate_size(&self) -> usize { + size_of::() + self.record.len() + } + + pub fn from_parts(keys: Keys, record: T) -> Self + where + T: StorageEncode, + { + let mut buf = BytesMut::new(); + StorageCodec::encode(&record, &mut buf).expect("encode to pass"); + + Self { + keys, + record: buf.freeze(), + } + } +} + +impl HasRecordKeys for IngestRecord { + fn record_keys(&self) -> Keys { + self.keys.clone() + } +} + +#[derive(Debug, serde::Serialize, serde::Deserialize)] +pub struct IngestRequest { + pub records: Arc<[IngestRecord]>, +} + +impl IngestRequest { + pub fn estimate_size(&self) -> usize { + self.records + .iter() + .fold(0, |size, item| size + item.estimate_size()) + } +} + +impl From> for IngestRequest { + fn from(records: Arc<[IngestRecord]>) -> Self { + Self { records } + } +} + +// todo(azmy): Use bilrost (depends on the payload) +default_wire_codec!(IngestRequest); + +#[derive(Debug, Clone, bilrost::Oneof, bilrost::Message)] +pub enum ResponseStatus { + Unknown, + #[bilrost(tag = 1, message)] + Ack, + #[bilrost(tag = 2, message)] + NotLeader { + of: PartitionId, + }, + #[bilrost(tag = 3, message)] + Internal { + msg: String, + }, +} + +#[derive(Debug, Clone, bilrost::Message)] +pub struct IngestResponse { + #[bilrost(1)] + pub status: ResponseStatus, +} + +impl From for IngestResponse { + fn from(status: ResponseStatus) -> Self { + Self { status } + } +} + +bilrost_wire_codec!(IngestResponse); + +define_rpc! { + @request=IngestRequest, + @response=IngestResponse, + @service=PartitionLeaderService, +} + +#[derive(Debug, serde::Serialize, serde::Deserialize)] +pub struct ReceivedIngestRequest { + pub records: Vec, +} + +default_wire_codec!(ReceivedIngestRequest); + +/// The [`ReceivedIngestRequest`] uses the same TYPE +/// as [`IngestRequest`] to be able to directly decode +/// received RPC messages to this type. 
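+/// A hedged sketch of the receiving side (this mirrors the handling added in
+/// `crates/worker/src/partition/mod.rs`):
+///
+/// ```ignore
+/// // the untyped RPC message decodes directly into the owned receiver-side form
+/// let request = msg.into_typed::<ReceivedIngestRequest>();
+/// ```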
+impl RpcRequest for ReceivedIngestRequest { + const TYPE: &str = stringify!(IngestRequest); + type Response = IngestResponse; + type Service = PartitionLeaderService; +} diff --git a/crates/types/src/net/mod.rs b/crates/types/src/net/mod.rs index 463a0e5646..e861254fa0 100644 --- a/crates/types/src/net/mod.rs +++ b/crates/types/src/net/mod.rs @@ -11,6 +11,7 @@ pub mod address; pub mod codec; pub mod connect_opts; +pub mod ingest; pub mod listener; pub mod log_server; pub mod metadata; diff --git a/crates/types/src/net/partition_processor.rs b/crates/types/src/net/partition_processor.rs index 8ebf4569c1..e555ea9c36 100644 --- a/crates/types/src/net/partition_processor.rs +++ b/crates/types/src/net/partition_processor.rs @@ -142,10 +142,6 @@ pub enum PartitionProcessorRpcError { //Busy, #[error("internal error: {0}")] Internal(String), - #[error("partition processor starting")] - Starting, - #[error("partition processor stopping")] - Stopping, } impl PartitionProcessorRpcError { @@ -153,9 +149,7 @@ impl PartitionProcessorRpcError { match self { PartitionProcessorRpcError::NotLeader(_) => true, PartitionProcessorRpcError::LostLeadership(_) => true, - PartitionProcessorRpcError::Stopping => true, PartitionProcessorRpcError::Internal(_) => false, - PartitionProcessorRpcError::Starting => false, } } } diff --git a/crates/worker/src/metric_definitions.rs b/crates/worker/src/metric_definitions.rs index 4330554f91..5351e1b8e7 100644 --- a/crates/worker/src/metric_definitions.rs +++ b/crates/worker/src/metric_definitions.rs @@ -33,6 +33,9 @@ pub const PARTITION_IS_EFFECTIVE_LEADER: &str = "restate.partition.is_effective_ pub const PARTITION_RECORD_COMMITTED_TO_READ_LATENCY_SECONDS: &str = "restate.partition.record_committed_to_read_latency.seconds"; +pub const PARTITION_INGESTION_REQUEST_LEN: &str = "restate.partition.ingest.request.len"; +pub const PARTITION_INGESTION_REQUEST_SIZE: &str = "restate.partition.ingest.request.size.bytes"; + pub(crate) fn describe_metrics() { describe_gauge!( PARTITION_BLOCKED_FLARE, @@ -97,4 +100,16 @@ pub(crate) fn describe_metrics() { Unit::Count, "Number of records between last applied lsn and the log tail" ); + + describe_histogram!( + PARTITION_INGESTION_REQUEST_LEN, + Unit::Count, + "Number of records in a single ingestion request" + ); + + describe_histogram!( + PARTITION_INGESTION_REQUEST_SIZE, + Unit::Bytes, + "Total size of records in a single ingestion request" + ); } diff --git a/crates/worker/src/partition/leadership/leader_state.rs b/crates/worker/src/partition/leadership/leader_state.rs index 4a87029269..500b202492 100644 --- a/crates/worker/src/partition/leadership/leader_state.rs +++ b/crates/worker/src/partition/leadership/leader_state.rs @@ -36,6 +36,7 @@ use restate_types::identifiers::{ }; use restate_types::invocation::client::{InvocationOutput, SubmittedInvocationNotification}; use restate_types::logs::Keys; +use restate_types::net::ingest::IngestRecord; use restate_types::net::partition_processor::{ PartitionProcessorRpcError, PartitionProcessorRpcResponse, }; @@ -466,14 +467,35 @@ impl LeaderState { Ok(commit_token) => { self.awaiting_rpc_self_propose.push(SelfAppendFuture::new( commit_token, - success_response, - reciprocal, + |result: Result<(), PartitionProcessorRpcError>| { + reciprocal.send(result.map(|_| success_response)); + }, )); } Err(e) => reciprocal.send(Err(PartitionProcessorRpcError::Internal(e.to_string()))), } } + pub async fn propose_many_with_callback( + &mut self, + records: impl ExactSizeIterator, + callback: F, + ) where + 
F: FnOnce(Result<(), PartitionProcessorRpcError>) + Send + Sync + 'static, + { + match self + .self_proposer + .propose_many_with_notification(records) + .await + { + Ok(commit_token) => { + self.awaiting_rpc_self_propose + .push(SelfAppendFuture::new(commit_token, callback)); + } + Err(e) => callback(Err(PartitionProcessorRpcError::Internal(e.to_string()))), + } + } + pub fn handle_actions( &mut self, invoker_tx: &mut impl restate_invoker_api::InvokerHandle>, @@ -702,42 +724,72 @@ impl LeaderState { } } +trait CallbackInner: Send + Sync + 'static { + fn call(self: Box, result: Result<(), PartitionProcessorRpcError>); +} + +impl CallbackInner for F +where + F: FnOnce(Result<(), PartitionProcessorRpcError>) + Send + Sync + 'static, +{ + fn call(self: Box, result: Result<(), PartitionProcessorRpcError>) { + self(result) + } +} + +struct Callback { + inner: Box, +} + +impl Callback { + fn call(self, result: Result<(), PartitionProcessorRpcError>) { + self.inner.call(result); + } +} + +impl From for Callback +where + I: CallbackInner, +{ + fn from(value: I) -> Self { + Self { + inner: Box::new(value), + } + } +} + struct SelfAppendFuture { commit_token: CommitToken, - response: Option<(PartitionProcessorRpcResponse, RpcReciprocal)>, + callback: Option, } impl SelfAppendFuture { - fn new( - commit_token: CommitToken, - success_response: PartitionProcessorRpcResponse, - response_reciprocal: RpcReciprocal, - ) -> Self { + fn new(commit_token: CommitToken, callback: impl Into) -> Self { Self { commit_token, - response: Some((success_response, response_reciprocal)), + callback: Some(callback.into()), } } fn fail_with_internal(&mut self) { - if let Some((_, reciprocal)) = self.response.take() { - reciprocal.send(Err(PartitionProcessorRpcError::Internal( + if let Some(callback) = self.callback.take() { + callback.call(Err(PartitionProcessorRpcError::Internal( "error when proposing to bifrost".to_string(), ))); } } fn fail_with_lost_leadership(&mut self, this_partition_id: PartitionId) { - if let Some((_, reciprocal)) = self.response.take() { - reciprocal.send(Err(PartitionProcessorRpcError::LostLeadership( + if let Some(callback) = self.callback.take() { + callback.call(Err(PartitionProcessorRpcError::LostLeadership( this_partition_id, ))); } } fn succeed_with_appended(&mut self) { - if let Some((success_response, reciprocal)) = self.response.take() { - reciprocal.send(Ok(success_response)); + if let Some(callback) = self.callback.take() { + callback.call(Ok(())) } } } diff --git a/crates/worker/src/partition/leadership/mod.rs b/crates/worker/src/partition/leadership/mod.rs index 01c133a945..fcce25dc2f 100644 --- a/crates/worker/src/partition/leadership/mod.rs +++ b/crates/worker/src/partition/leadership/mod.rs @@ -49,6 +49,7 @@ use restate_types::errors::GenericError; use restate_types::identifiers::{InvocationId, PartitionKey, PartitionProcessorRpcRequestId}; use restate_types::identifiers::{LeaderEpoch, PartitionLeaderEpoch}; use restate_types::message::MessageIndex; +use restate_types::net::ingest::IngestRecord; use restate_types::net::partition_processor::{ PartitionProcessorRpcError, PartitionProcessorRpcResponse, }; @@ -56,7 +57,7 @@ use restate_types::partitions::Partition; use restate_types::partitions::state::PartitionReplicaSetStates; use restate_types::retries::with_jitter; use restate_types::schema::Schema; -use restate_types::storage::StorageEncodeError; +use restate_types::storage::{StorageDecodeError, StorageEncodeError}; use restate_vqueues::{SchedulerService, VQueuesMeta, 
VQueuesMetaMut}; use restate_wal_protocol::Command; use restate_wal_protocol::control::{AnnounceLeader, PartitionDurability}; @@ -86,7 +87,9 @@ pub(crate) enum Error { #[error("failed writing to bifrost: {0}")] Bifrost(#[from] restate_bifrost::Error), #[error("failed serializing payload: {0}")] - Codec(#[from] StorageEncodeError), + Encode(#[from] StorageEncodeError), + #[error("failed deserializing payload: {0}")] + Decode(#[from] StorageDecodeError), #[error(transparent)] Shutdown(#[from] ShutdownError), #[error("error when self proposing")] @@ -647,6 +650,26 @@ impl LeadershipState { } } } + + /// propose to this partition + pub async fn propose_many_with_callback( + &mut self, + records: impl ExactSizeIterator, + callback: F, + ) where + F: FnOnce(Result<(), PartitionProcessorRpcError>) + Send + Sync + 'static, + { + match &mut self.state { + State::Follower | State::Candidate { .. } => callback(Err( + PartitionProcessorRpcError::NotLeader(self.partition.partition_id), + )), + State::Leader(leader_state) => { + leader_state + .propose_many_with_callback(records, callback) + .await; + } + } + } } #[derive(Debug, derive_more::From)] struct TimerReader(PartitionStore); diff --git a/crates/worker/src/partition/leadership/self_proposer.rs b/crates/worker/src/partition/leadership/self_proposer.rs index eea7bc2ba9..cb1e445e3a 100644 --- a/crates/worker/src/partition/leadership/self_proposer.rs +++ b/crates/worker/src/partition/leadership/self_proposer.rs @@ -12,9 +12,11 @@ use std::sync::Arc; use futures::never::Never; -use restate_bifrost::{Bifrost, CommitToken, ErrorRecoveryStrategy}; +use restate_bifrost::{Bifrost, CommitToken, ErrorRecoveryStrategy, InputRecord}; use restate_storage_api::deduplication_table::{DedupInformation, EpochSequenceNumber}; -use restate_types::{identifiers::PartitionKey, logs::LogId}; +use restate_types::{ + identifiers::PartitionKey, logs::LogId, net::ingest::IngestRecord, time::NanosSinceEpoch, +}; use restate_wal_protocol::{Command, Destination, Envelope, Header, Source}; use crate::partition::leadership::Error; @@ -149,6 +151,48 @@ impl SelfProposer { Ok(commit_token) } + pub async fn propose_many_with_notification( + &mut self, + records: impl ExactSizeIterator, + ) -> Result where { + let sender = self.bifrost_appender.sender(); + + // This should ideally be implemented + // by using `sender.enqueue_many` + // but since we have no guarantee over the + // underlying channel size a `reserve_many()` might + // return a misleading Closed error + // + // sender + // .enqueue_many(records) + // .await + // .map_err(|_| Error::SelfProposer)?; + // + // so instead we do this. + + for record in records { + // Skip decoding the envelope; build the InputRecord directly from the raw bytes. + // The ingestion client should only handle payloads of type Envelope. 
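+            // Safety assumption: `IngestRecord::from_parts` produced `record.record` via
+            // `StorageCodec::encode`, so re-wrapping the raw bytes as a typed record is sound.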
+ let input = unsafe { + InputRecord::from_bytes_unchecked( + NanosSinceEpoch::now(), + record.keys, + record.record, + ) + }; + + sender + .enqueue(input) + .await + .map_err(|_| Error::SelfProposer)?; + } + + sender + .notify_committed() + .await + .map_err(|_| Error::SelfProposer) + } + fn create_header(&mut self, partition_key: PartitionKey) -> Header { let esn = self.epoch_sequence_number; self.epoch_sequence_number = self.epoch_sequence_number.next(); diff --git a/crates/worker/src/partition/mod.rs b/crates/worker/src/partition/mod.rs index 12f74241d0..ad5df06917 100644 --- a/crates/worker/src/partition/mod.rs +++ b/crates/worker/src/partition/mod.rs @@ -31,7 +31,7 @@ use tracing::{Span, debug, error, info, instrument, trace, warn}; use restate_bifrost::loglet::FindTailOptions; use restate_bifrost::{Bifrost, LogEntry, MaybeRecord}; -use restate_core::network::{Oneshot, Reciprocal, ServiceMessage, Verdict}; +use restate_core::network::{Incoming, Oneshot, Reciprocal, Rpc, ServiceMessage, Verdict}; use restate_core::{Metadata, ShutdownError, cancellation_watcher, my_node_id}; use restate_invoker_api::capacity::InvokerCapacity; use restate_partition_store::{PartitionStore, PartitionStoreTransaction}; @@ -48,6 +48,7 @@ use restate_types::config::Configuration; use restate_types::identifiers::LeaderEpoch; use restate_types::logs::{KeyFilter, Lsn, Record, SequenceNumber}; use restate_types::net::RpcRequest; +use restate_types::net::ingest::{ReceivedIngestRequest, ResponseStatus}; use restate_types::net::partition_processor::{ PartitionLeaderService, PartitionProcessorRpcError, PartitionProcessorRpcRequest, PartitionProcessorRpcResponse, @@ -64,7 +65,8 @@ use restate_wal_protocol::{Command, Destination, Envelope, Header}; use self::leadership::trim_queue::TrimQueue; use crate::metric_definitions::{ - PARTITION_BLOCKED_FLARE, PARTITION_LABEL, PARTITION_RECORD_COMMITTED_TO_READ_LATENCY_SECONDS, + PARTITION_BLOCKED_FLARE, PARTITION_INGESTION_REQUEST_LEN, PARTITION_INGESTION_REQUEST_SIZE, + PARTITION_LABEL, PARTITION_RECORD_COMMITTED_TO_READ_LATENCY_SECONDS, }; use crate::partition::invoker_storage_reader::InvokerStorageReader; use crate::partition::leadership::LeadershipState; @@ -476,15 +478,7 @@ where self.status.effective_mode = self.leadership_state.effective_mode(); } Some(msg) = self.network_leader_svc_rx.recv() => { - match msg { - ServiceMessage::Rpc(msg) if msg.msg_type() == PartitionProcessorRpcRequest::TYPE => { - let msg = msg.into_typed::(); - // note: split() decodes the payload - let (response_tx, body) = msg.split(); - self.on_rpc(response_tx, body, &mut partition_store, live_schemas.live_load()).await; - } - msg => { msg.fail(Verdict::MessageUnrecognized); } - } + self.on_rpc(msg, &mut partition_store, live_schemas.live_load()).await; } _ = status_update_timer.tick() => { if durable_lsn_watch.has_changed().map_err(|e| ProcessorError::Other(e.into()))? 
{ @@ -613,7 +607,7 @@ where Ok(()) } - async fn on_rpc( + async fn on_pp_rpc_request( &mut self, response_tx: Reciprocal< Oneshot>, @@ -629,6 +623,61 @@ where ) .await; } + + async fn on_rpc( + &mut self, + msg: ServiceMessage, + partition_store: &mut PartitionStore, + schemas: &Schema, + ) { + match msg { + ServiceMessage::Rpc(msg) if msg.msg_type() == PartitionProcessorRpcRequest::TYPE => { + let msg = msg.into_typed::(); + // note: split() decodes the payload + let (response_tx, body) = msg.split(); + self.on_pp_rpc_request(response_tx, body, partition_store, schemas) + .await; + } + ServiceMessage::Rpc(msg) if msg.msg_type() == ReceivedIngestRequest::TYPE => { + self.on_pp_ingest_request(msg.into_typed()).await; + } + msg => { + msg.fail(Verdict::MessageUnrecognized); + } + } + } + + async fn on_pp_ingest_request(&mut self, msg: Incoming>) { + let (reciprocal, request) = msg.split(); + histogram!( + PARTITION_INGESTION_REQUEST_LEN, PARTITION_LABEL => self.partition_id_str.clone() + ) + .record(request.records.len() as f64); + + histogram!( + PARTITION_INGESTION_REQUEST_SIZE, PARTITION_LABEL => self.partition_id_str.clone() + ) + .record(request.records.iter().fold(0, |s, r| s + r.estimate_size()) as f64); + + self.leadership_state + .propose_many_with_callback( + request.records.into_iter(), + |result: Result<(), PartitionProcessorRpcError>| match result { + Ok(_) => reciprocal.send(ResponseStatus::Ack.into()), + Err(err) => match err { + PartitionProcessorRpcError::NotLeader(id) + | PartitionProcessorRpcError::LostLeadership(id) => { + reciprocal.send(ResponseStatus::NotLeader { of: id }.into()) + } + PartitionProcessorRpcError::Internal(msg) => { + reciprocal.send(ResponseStatus::Internal { msg }.into()) + } + }, + }, + ) + .await; + } + async fn maybe_advance<'a>( &mut self, maybe_record: LogEntry, diff --git a/docs/dev/bilrost-migration-guidelines.md b/docs/dev/bilrost-migration-guidelines.md index 9939bc1a75..b2d9dfc56e 100644 --- a/docs/dev/bilrost-migration-guidelines.md +++ b/docs/dev/bilrost-migration-guidelines.md @@ -108,6 +108,9 @@ This works completely different from above. It’s mainly because fat enums tran - You also need to have an `EmptyState` an empty variant that carries no data, otherwise you need your field to also become `Option` (that’s where the similarity with numeric enums ends) - Your enum must derive `bilrost::Oneof` - Each variant must carry **EXACTLY** one value. Only **ONE** empty enum variant can show up at the top of the enum to work as the “empty state” AND it must be `untagged` +- If a variant contains more than one value, it must have the `message` attribute +- An enum that derives `bilrost::Oneof` can also derive `bilrost::Message`. In that case, it can be used as + an property in a `Message` struct without the `oneof(..)` attribute. This is very handy for reusability ```rust // Derives Oneof @@ -131,6 +134,11 @@ enum MyFatEnum { ThirdVariant { name: String, }, + #[bilrost(tag=5, message)] + FourthVariant { + name: String, + age: u32 + }, /// This one will not work // FourthVariant(u64, u64) /// Neither this @@ -175,19 +183,30 @@ struct MyOtherStructure{ } ``` -To work around this issue, your fat enums must be wrapped. 
in it’s own mesage +To work around this issue, your fat can also derive `bilrost::Message` ```rust - -#[derive(bilrost::Message)] -struct MyFatEnumWrapper(#[bilrost(oneof(2,3,4))] MyFatEnum); +// Derives Oneof +#[derive(bilrost::Oneof, bilrost::Message)] +enum MyFatEnum { + Unknown, + #[bilrost(2)] + FirstVariant { + name: String, + }, + #[bilrost(tag=3, message)] + SecondVariant { + name: String, + age: u32 + }, +} // then use it normally as a sub-message struct MyStruct { #[bilrost(1)] id: String, #[bilrost(2)] - fat: MyFatEnumWrapper, + fat: MyFatEnum, } struct MyOtherStructure{ @@ -195,10 +214,10 @@ struct MyOtherStructure{ id: String, #[bilrost(2)] value: SomeMessage - // This is now possible because MyFatEnumWrapper - // is just it's own sub-message + // This is now possible because MyFatEnum + // derives bilrost::Message #[bilrost(3)] - fat: MyFatEnumWrapper, + fat: MyFatEnum, } ```