From 9997c8081b8d1e876ecd19791526d9a22719c1c5 Mon Sep 17 00:00:00 2001
From: Muhamad Awad
Date: Wed, 17 Dec 2025 12:52:27 +0100
Subject: [PATCH 1/2] [Ingest] Use bilrost encoding for ingestion message

Summary:
Switch the ingestion RPC messages from the default serde-based wire codec to
bilrost encoding.
---
 crates/types/src/net/ingest.rs | 19 ++++++++++++-------
 1 file changed, 12 insertions(+), 7 deletions(-)

diff --git a/crates/types/src/net/ingest.rs b/crates/types/src/net/ingest.rs
index b18d7f4ef1..47d88ad0e5 100644
--- a/crates/types/src/net/ingest.rs
+++ b/crates/types/src/net/ingest.rs
@@ -13,15 +13,19 @@ use std::sync::Arc;
 use bytes::{Bytes, BytesMut};
 use metrics::Key;
 
+use restate_encoding::ArcedSlice;
+
 use crate::identifiers::PartitionId;
 use crate::logs::{HasRecordKeys, Keys};
 use crate::net::partition_processor::PartitionLeaderService;
-use crate::net::{RpcRequest, bilrost_wire_codec, default_wire_codec, define_rpc};
+use crate::net::{RpcRequest, bilrost_wire_codec, define_rpc};
 use crate::storage::{StorageCodec, StorageEncode};
 
-#[derive(Debug, Eq, PartialEq, Clone, serde::Serialize, serde::Deserialize)]
+#[derive(Debug, Eq, PartialEq, Clone, bilrost::Message)]
 pub struct IngestRecord {
+    #[bilrost(1)]
     pub keys: Keys,
+    #[bilrost(2)]
     pub record: Bytes,
 }
 
@@ -50,8 +54,9 @@ impl HasRecordKeys for IngestRecord {
     }
 }
 
-#[derive(Debug, serde::Serialize, serde::Deserialize)]
+#[derive(Debug, bilrost::Message)]
 pub struct IngestRequest {
+    #[bilrost(tag(1), encoding(ArcedSlice))]
     pub records: Arc<[IngestRecord]>,
 }
 
@@ -69,8 +74,7 @@ impl From<Vec<IngestRecord>> for IngestRequest {
     }
 }
 
-// todo(azmy): Use bilrost (depends on the payload)
-default_wire_codec!(IngestRequest);
+bilrost_wire_codec!(IngestRequest);
 
 #[derive(Debug, Clone, bilrost::Oneof, bilrost::Message)]
 pub enum ResponseStatus {
@@ -107,12 +111,13 @@ define_rpc! {
     @service=PartitionLeaderService,
 }
 
-#[derive(Debug, serde::Serialize, serde::Deserialize)]
+#[derive(Debug, bilrost::Message)]
 pub struct ReceivedIngestRequest {
+    #[bilrost(tag(1), encoding(packed))]
     pub records: Vec<IngestRecord>,
 }
 
-default_wire_codec!(ReceivedIngestRequest);
+bilrost_wire_codec!(ReceivedIngestRequest);
 
 /// The [`ReceivedIngestRequest`] uses the same TYPE
 /// as [`IngestRequest`] to be able to directly decode
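
Note for reviewers unfamiliar with bilrost: the derive pins explicit field tags, so the wire format no longer depends on field order or names the way the previous serde-based codec could. Below is a minimal, self-contained round-trip sketch. It assumes bilrost mirrors prost's `encode_to_vec`/`decode` surface, and `Item` is a hypothetical stand-in for `IngestRecord`, whose real field types (`Keys`, `Bytes`) live in restate.

```rust
use bilrost::{Message, OwnedMessage};

// Hypothetical stand-in for IngestRecord: tags 1 and 2 pin the wire format.
#[derive(Debug, PartialEq, Message)]
struct Item {
    #[bilrost(1)]
    keys: Vec<u64>,
    #[bilrost(2)]
    record: Vec<u8>,
}

fn main() {
    let item = Item {
        keys: vec![1, 2],
        record: b"payload".to_vec(),
    };
    // Encode to bytes and decode back; tags, not field order, drive the format.
    let wire = item.encode_to_vec();
    let back = Item::decode(wire.as_slice()).expect("round-trip decode");
    assert_eq!(item, back);
}
```
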
From 88c812deaad9ca44968ddd7e4eec4ac9c4442125 Mon Sep 17 00:00:00 2001
From: Muhamad Awad
Date: Wed, 17 Dec 2025 12:52:27 +0100
Subject: [PATCH 2/2] [Cleaner] remove the cleaner's external bifrost writer

Summary:
This PR makes sure the cleaner no longer performs external bifrost writes.
Instead, the cleaner exposes an effect stream that can be handled directly
by the PP event loop.
---
 crates/worker/src/partition/cleaner.rs        | 200 +++++++-----------
 .../src/partition/leadership/leader_state.rs  |  72 +++----
 crates/worker/src/partition/leadership/mod.rs |  14 +-
 .../src/partition/state_machine/actions.rs    |   6 -
 4 files changed, 112 insertions(+), 180 deletions(-)

diff --git a/crates/worker/src/partition/cleaner.rs b/crates/worker/src/partition/cleaner.rs
index 200ee93345..5c19486717 100644
--- a/crates/worker/src/partition/cleaner.rs
+++ b/crates/worker/src/partition/cleaner.rs
@@ -9,28 +9,48 @@
 // by the Apache License, Version 2.0.
 
 use std::ops::RangeInclusive;
-use std::sync::Arc;
 use std::time::{Duration, SystemTime};
 
 use anyhow::Context;
-use futures::StreamExt;
+use futures::{Stream, StreamExt};
+use tokio::sync::mpsc::{self, Sender};
 use tokio::time::{Instant, MissedTickBehavior};
+use tokio_stream::wrappers::ReceiverStream;
 use tracing::{debug, instrument, warn};
 
-use restate_bifrost::Bifrost;
-use restate_core::cancellation_watcher;
+use restate_core::{ShutdownError, TaskCenter, TaskHandle, TaskId, TaskKind, cancellation_watcher};
 use restate_storage_api::invocation_status_table::{InvocationStatus, ScanInvocationStatusTable};
-use restate_types::identifiers::WithPartitionKey;
-use restate_types::identifiers::{LeaderEpoch, PartitionKey};
-use restate_types::invocation::PurgeInvocationRequest;
+use restate_types::identifiers::PartitionKey;
+use restate_types::identifiers::{InvocationId, PartitionId};
 use restate_types::retries::with_jitter;
-use restate_wal_protocol::{Command, Destination, Envelope, Header, Source};
+
+const CLEANER_EFFECT_QUEUE_SIZE: usize = 10;
+
+#[derive(Debug, Clone)]
+pub enum CleanerEffect {
+    PurgeInvocation(InvocationId),
+    PurgeJournal(InvocationId),
+}
+
+pub(super) struct CleanerHandle {
+    task_id: TaskId,
+    rx: ReceiverStream<CleanerEffect>,
+}
+
+impl CleanerHandle {
+    pub fn stop(self) -> Option<TaskHandle<()>> {
+        TaskCenter::cancel_task(self.task_id)
+    }
+
+    pub fn effects(&mut self) -> impl Stream<Item = CleanerEffect> {
+        &mut self.rx
+    }
+}
 
 pub(super) struct Cleaner<Storage> {
-    leader_epoch: LeaderEpoch,
+    partition_id: PartitionId,
     partition_key_range: RangeInclusive<PartitionKey>,
     storage: Storage,
-    bifrost: Bifrost,
     cleanup_interval: Duration,
 }
 
@@ -39,53 +59,54 @@
 where
     Storage: ScanInvocationStatusTable + Send + Sync + 'static,
 {
     pub(super) fn new(
-        leader_epoch: LeaderEpoch,
         storage: Storage,
-        bifrost: Bifrost,
+        partition_id: PartitionId,
         partition_key_range: RangeInclusive<PartitionKey>,
         cleanup_interval: Duration,
     ) -> Self {
         Self {
-            leader_epoch,
+            partition_id,
             partition_key_range,
             storage,
-            bifrost,
             cleanup_interval,
         }
     }
 
-    #[instrument(skip_all)]
-    pub(super) async fn run(self) -> anyhow::Result<()> {
-        let Self {
-            leader_epoch,
-            partition_key_range,
-            storage,
-            bifrost,
-            cleanup_interval,
-        } = self;
+    pub(super) fn start(self) -> Result<CleanerHandle, ShutdownError> {
+        let (tx, rx) = mpsc::channel(CLEANER_EFFECT_QUEUE_SIZE);
+        let task_id = TaskCenter::spawn_child(TaskKind::Cleaner, "cleaner", self.run(tx))?;
 
-        debug!(?cleanup_interval, "Running cleaner");
+        Ok(CleanerHandle {
+            task_id,
+            rx: ReceiverStream::new(rx),
+        })
+    }
 
-        let bifrost_envelope_source = Source::Processor {
-            partition_id: None,
-            partition_key: None,
-            leader_epoch,
-        };
+    #[instrument(skip_all)]
+    async fn run(self, tx: Sender<CleanerEffect>) -> anyhow::Result<()> {
+        debug!(
+            partition_id=%self.partition_id,
+            cleanup_interval=?self.cleanup_interval,
+            "Running cleaner"
+        );
 
         // the cleaner is currently quite an expensive scan and we don't strictly need to do it on startup, so we will wait
         // for 20-40% of the interval (so, 12-24 minutes by default) before doing the first one
-        let initial_wait = with_jitter(cleanup_interval.mul_f32(0.2), 1.0);
+        let initial_wait = with_jitter(self.cleanup_interval.mul_f32(0.2), 1.0);
         // the first tick will fire after initial_wait
         let mut interval =
-            tokio::time::interval_at(Instant::now() + initial_wait, cleanup_interval);
+            tokio::time::interval_at(Instant::now() + initial_wait, self.cleanup_interval);
         interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
 
         loop {
             tokio::select! {
                _ = interval.tick() => {
-                    if let Err(e) = Self::do_cleanup(&storage, &bifrost, partition_key_range.clone(), &bifrost_envelope_source).await {
-                        warn!("Error when trying to cleanup completed invocations: {e:?}");
+                    if let Err(e) = self.do_cleanup(&tx).await {
+                        warn!(
+                            partition_id=%self.partition_id,
+                            "Error when trying to cleanup completed invocations: {e:?}"
+                        );
                     }
                 },
                 _ = cancellation_watcher() => {
@@ -99,15 +120,12 @@ where
         Ok(())
     }
 
-    pub(super) async fn do_cleanup(
-        storage: &Storage,
-        bifrost: &Bifrost,
-        partition_key_range: RangeInclusive<PartitionKey>,
-        bifrost_envelope_source: &Source,
-    ) -> anyhow::Result<()> {
-        debug!("Executing completed invocations cleanup");
+    pub(super) async fn do_cleanup(&self, tx: &Sender<CleanerEffect>) -> anyhow::Result<()> {
+        debug!(partition_id=%self.partition_id, "Starting invocation cleanup");
 
-        let invocations_stream = storage.scan_invocation_statuses(partition_key_range)?;
+        let invocations_stream = self
+            .storage
+            .scan_invocation_statuses(self.partition_key_range.clone())?;
         tokio::pin!(invocations_stream);
 
         while let Some((invocation_id, invocation_status)) = invocations_stream
@@ -132,24 +150,9 @@ where
                 .checked_add(completed_invocation.completion_retention_duration)
                 && now >= status_expiration_time
             {
-                restate_bifrost::append_to_bifrost(
-                    bifrost,
-                    Arc::new(Envelope {
-                        header: Header {
-                            source: bifrost_envelope_source.clone(),
-                            dest: Destination::Processor {
-                                partition_key: invocation_id.partition_key(),
-                                dedup: None,
-                            },
-                        },
-                        command: Command::PurgeInvocation(PurgeInvocationRequest {
-                            invocation_id,
-                            response_sink: None,
-                        }),
-                    }),
-                )
-                .await
-                .context("Cannot append to bifrost purge invocation")?;
+                tx.send(CleanerEffect::PurgeInvocation(invocation_id))
+                    .await
+                    .context("Cannot send the purge invocation effect")?;
                 continue;
             }
 
@@ -165,24 +168,9 @@ where
             };
 
             if now >= journal_expiration_time {
-                restate_bifrost::append_to_bifrost(
-                    bifrost,
-                    Arc::new(Envelope {
-                        header: Header {
-                            source: bifrost_envelope_source.clone(),
-                            dest: Destination::Processor {
-                                partition_key: invocation_id.partition_key(),
-                                dedup: None,
-                            },
-                        },
-                        command: Command::PurgeJournal(PurgeInvocationRequest {
-                            invocation_id,
-                            response_sink: None,
-                        }),
-                    }),
-                )
-                .await
-                .context("Cannot append to bifrost purge journal")?;
+                tx.send(CleanerEffect::PurgeJournal(invocation_id))
+                    .await
+                    .context("Cannot send the purge journal effect")?;
                 continue;
             }
         }
@@ -198,16 +186,13 @@ mod tests {
     use futures::{Stream, stream};
     use googletest::prelude::*;
 
-    use restate_core::{Metadata, TaskCenter, TaskKind, TestCoreEnvBuilder};
     use restate_storage_api::StorageError;
     use restate_storage_api::invocation_status_table::{
         CompletedInvocation, InFlightInvocationMetadata, InvocationStatus,
         InvokedInvocationStatusLite, JournalMetadata, ScanInvocationStatusTable,
     };
     use restate_storage_api::protobuf_types::v1::lazy::InvocationStatusV2Lazy;
-    use restate_types::Version;
     use restate_types::identifiers::{InvocationId, InvocationUuid};
-    use restate_types::partition_table::{FindPartition, PartitionTable};
     use test_log::test;
 
    #[allow(dead_code)]
@@ -256,15 +241,6 @@ mod tests {
     // Start paused makes sure the timer is immediately fired
     #[test(restate_core::test(start_paused = true))]
     pub async fn cleanup_works() {
-        let env = TestCoreEnvBuilder::with_incoming_only_connector()
-            .set_partition_table(PartitionTable::with_equally_sized_partitions(
-                Version::MIN,
-                1,
-            ))
-            .build()
-            .await;
-        let bifrost = Bifrost::init_in_memory(env.metadata_writer).await;
-
         let expired_invocation =
             InvocationId::from_parts(PartitionKey::MIN, InvocationUuid::mock_random());
         let expired_journal =
@@ -315,50 +291,26 @@ mod tests {
             ),
         ]);
 
-        TaskCenter::spawn(
-            TaskKind::Cleaner,
-            "cleaner",
-            Cleaner::new(
-                LeaderEpoch::INITIAL,
-                mock_storage,
-                bifrost.clone(),
-                RangeInclusive::new(PartitionKey::MIN, PartitionKey::MAX),
-                Duration::from_secs(1),
-            )
-            .run(),
+        let mut handle = Cleaner::new(
+            mock_storage,
+            0.into(),
+            RangeInclusive::new(PartitionKey::MIN, PartitionKey::MAX),
+            Duration::from_secs(1),
         )
+        .start()
         .unwrap();
 
         // cleanup will run after around 200ms
-        tokio::time::sleep(Duration::from_secs(1)).await;
+        tokio::time::advance(Duration::from_secs(1)).await;
 
-        // All the invocation ids were created with same partition keys, hence same partition id.
-        let partition_id = Metadata::with_current(|m| {
-            m.partition_table_snapshot()
-                .find_partition_id(expired_invocation.partition_key())
-        })
-        .unwrap();
-
-        let log_entries: Vec<_> = bifrost
-            .read_all(partition_id.into())
-            .await
-            .unwrap()
-            .into_iter()
-            .map(|e| e.try_decode::<Envelope>().unwrap().unwrap().command)
-            .collect();
+        let received: Vec<_> = handle.effects().ready_chunks(10).next().await.unwrap();
 
         assert_that!(
-            log_entries,
+            received,
             all!(
                 len(eq(2)),
-                contains(pat!(Command::PurgeInvocation(pat!(
-                    PurgeInvocationRequest {
-                        invocation_id: eq(expired_invocation),
-                    }
-                )))),
-                contains(pat!(Command::PurgeJournal(pat!(PurgeInvocationRequest {
-                    invocation_id: eq(expired_journal),
-                })))),
+                contains(pat!(CleanerEffect::PurgeInvocation(eq(expired_invocation)))),
+                contains(pat!(CleanerEffect::PurgeJournal(eq(expired_journal))))
             )
         );
     }
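
The heart of this patch is the effect channel: instead of appending envelopes to bifrost, the cleaner pushes `CleanerEffect`s into a bounded mpsc channel, and the handle exposes the receiving side as a stream. A self-contained sketch of that pattern, with a hypothetical `Effect` type and only tokio/tokio-stream/futures, mirroring the shape of `CLEANER_EFFECT_QUEUE_SIZE`, `start()`, and `effects()` above:

```rust
use futures::StreamExt;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;

#[derive(Debug)]
enum Effect {
    Purge(u64),
}

#[tokio::main]
async fn main() {
    // Bounded queue: if the consumer falls behind, send().await waits,
    // applying backpressure (cf. CLEANER_EFFECT_QUEUE_SIZE in the patch).
    let (tx, rx) = mpsc::channel(10);

    tokio::spawn(async move {
        for id in 0..3u64 {
            // In the patch this is tx.send(CleanerEffect::PurgeInvocation(..)).
            tx.send(Effect::Purge(id)).await.expect("receiver alive");
        }
    });

    // ReceiverStream adapts the channel into a Stream, which an event loop
    // can merge with its other effect sources.
    let mut effects = ReceiverStream::new(rx);
    while let Some(effect) = effects.next().await {
        println!("handling {effect:?}");
    }
}
```

The bounded channel doubles as flow control: if the event loop stops draining effects, the cleaner's `send().await` parks instead of accumulating unbounded purge work.
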
diff --git a/crates/worker/src/partition/leadership/leader_state.rs b/crates/worker/src/partition/leadership/leader_state.rs
index 500b202492..842009956d 100644
--- a/crates/worker/src/partition/leadership/leader_state.rs
+++ b/crates/worker/src/partition/leadership/leader_state.rs
@@ -8,14 +8,12 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.
 
+use std::collections::HashMap;
 use std::collections::hash_map::Entry;
-use std::collections::{HashMap, VecDeque};
-use std::future;
 use std::future::Future;
 use std::ops::RangeInclusive;
 use std::pin::Pin;
 use std::task::{Context, Poll, ready};
-use std::time::{Duration, SystemTime};
 
 use futures::future::OptionFuture;
 use futures::stream::FuturesUnordered;
@@ -31,25 +29,24 @@
 use restate_futures_util::concurrency::Permit;
 use restate_partition_store::{PartitionDb, PartitionStore};
 use restate_storage_api::vqueue_table::EntryCard;
 use restate_types::identifiers::{
-    InvocationId, LeaderEpoch, PartitionId, PartitionKey, PartitionProcessorRpcRequestId,
-    WithPartitionKey,
+    LeaderEpoch, PartitionId, PartitionKey, PartitionProcessorRpcRequestId, WithPartitionKey,
 };
+use restate_types::invocation::PurgeInvocationRequest;
 use restate_types::invocation::client::{InvocationOutput, SubmittedInvocationNotification};
 use restate_types::logs::Keys;
 use restate_types::net::ingest::IngestRecord;
 use restate_types::net::partition_processor::{
     PartitionProcessorRpcError, PartitionProcessorRpcResponse,
 };
-use restate_types::time::MillisSinceEpoch;
 use restate_types::{SemanticRestateVersion, Version, Versioned};
 use restate_vqueues::VQueueEvent;
 use restate_vqueues::{SchedulerService, VQueuesMeta, scheduler};
 use restate_wal_protocol::control::UpsertSchema;
-use restate_wal_protocol::timer::TimerKeyValue;
 use restate_wal_protocol::vqueues::Assignment;
 use restate_wal_protocol::{Command, vqueues};
 
 use crate::metric_definitions::{PARTITION_HANDLE_LEADER_ACTIONS, USAGE_LEADER_ACTION_COUNT};
+use crate::partition::cleaner::{CleanerEffect, CleanerHandle};
 use crate::partition::invoker_storage_reader::InvokerStorageReader;
 use crate::partition::leadership::self_proposer::SelfProposer;
 use crate::partition::leadership::{ActionEffect, Error, InvokerStream, TimerService};
 
@@ -86,8 +83,7 @@ pub struct LeaderState {
     invoker_stream: InvokerStream,
     shuffle_stream: ReceiverStream<shuffle::OutboxTruncation>,
     schema_stream: WatchStream<Schema>,
-    pub pending_cleanup_timers_to_schedule: VecDeque<(InvocationId, Duration)>,
-    cleaner_task_id: TaskId,
+    cleaner_handle: CleanerHandle,
     trimmer_task_id: TaskId,
     durability_tracker: DurabilityTracker,
 }
@@ -99,7 +95,7 @@ impl LeaderState {
         leader_epoch: LeaderEpoch,
         partition_key_range: RangeInclusive<PartitionKey>,
         shuffle_task_handle: TaskHandle<anyhow::Result<()>>,
-        cleaner_task_id: TaskId,
+        cleaner_handle: CleanerHandle,
         trimmer_task_id: TaskId,
         shuffle_hint_tx: HintSender,
         timer_service: TimerService,
@@ -114,7 +110,7 @@ impl LeaderState {
             leader_epoch,
             partition_key_range,
             shuffle_task_handle: Some(shuffle_task_handle),
-            cleaner_task_id,
+            cleaner_handle,
             trimmer_task_id,
             shuffle_hint_tx,
             schema_stream: Metadata::with_current(|m| {
@@ -128,7 +124,6 @@ impl LeaderState {
             awaiting_rpc_self_propose: Default::default(),
             invoker_stream: invoker_rx,
             shuffle_stream: ReceiverStream::new(shuffle_rx),
-            pending_cleanup_timers_to_schedule: Default::default(),
             durability_tracker,
         }
     }
@@ -176,22 +171,11 @@ impl LeaderState {
         let invoker_stream = (&mut self.invoker_stream).map(ActionEffect::Invoker);
         let shuffle_stream = (&mut self.shuffle_stream).map(ActionEffect::Shuffle);
+        let cleaner_stream = self.cleaner_handle.effects().map(ActionEffect::Cleaner);
+
         let dur_tracker_stream =
             (&mut self.durability_tracker).map(ActionEffect::PartitionMaintenance);
 
-        let action_effects_stream = stream::unfold(
-            &mut self.pending_cleanup_timers_to_schedule,
-            |pending_cleanup_timers_to_schedule| {
-                let result = pending_cleanup_timers_to_schedule.pop_front();
-                future::ready(result.map(|(invocation_id, duration)| {
-                    (
-                        ActionEffect::ScheduleCleanupTimer(invocation_id, duration),
-                        pending_cleanup_timers_to_schedule,
-                    )
-                }))
-            },
-        )
-        .fuse();
 
         let awaiting_rpc_self_propose_stream = (&mut self.awaiting_rpc_self_propose)
             .map(|_| ActionEffect::AwaitingRpcSelfProposeDone);
 
@@ -200,7 +184,7 @@ impl LeaderState {
             invoker_stream,
             shuffle_stream,
             timer_stream,
-            action_effects_stream,
+            cleaner_stream,
             awaiting_rpc_self_propose_stream,
             dur_tracker_stream,
             schema_stream
@@ -249,7 +233,7 @@ impl LeaderState {
         // re-use of the self proposer
         self.self_proposer.mark_as_non_leader().await;
 
-        let cleaner_handle = OptionFuture::from(TaskCenter::cancel_task(self.cleaner_task_id));
+        let cleaner_handle = OptionFuture::from(self.cleaner_handle.stop());
 
         // We don't really care about waiting for the trimmer to finish cancelling
         TaskCenter::cancel_task(self.trimmer_task_id);
@@ -384,15 +368,26 @@ impl LeaderState {
                     .propose(timer.invocation_id().partition_key(), Command::Timer(timer))
                     .await?;
             }
-            ActionEffect::ScheduleCleanupTimer(invocation_id, duration) => {
-                self.self_proposer
-                    .propose(
-                        invocation_id.partition_key(),
-                        Command::ScheduleTimer(TimerKeyValue::clean_invocation_status(
-                            MillisSinceEpoch::from(SystemTime::now() + duration),
+            ActionEffect::Cleaner(effect) => {
+                let (invocation_id, cmd) = match effect {
+                    CleanerEffect::PurgeJournal(invocation_id) => (
                         invocation_id,
-                        )),
-                    )
+                        Command::PurgeJournal(PurgeInvocationRequest {
+                            invocation_id,
+                            response_sink: None,
+                        }),
+                    ),
+                    CleanerEffect::PurgeInvocation(invocation_id) => (
+                        invocation_id,
+                        Command::PurgeInvocation(PurgeInvocationRequest {
+                            invocation_id,
+                            response_sink: None,
+                        }),
+                    ),
+                };
+
+                self.self_proposer
+                    .propose(invocation_id.partition_key(), cmd)
                     .await?;
             }
             ActionEffect::UpsertSchema(schema) => {
@@ -606,13 +601,6 @@ impl LeaderState {
                     )));
                 }
             }
-            Action::ScheduleInvocationStatusCleanup {
-                invocation_id,
-                retention,
-            } => {
-                self.pending_cleanup_timers_to_schedule
-                    .push_back((invocation_id, retention));
-            }
             Action::ForwardNotification {
                 invocation_id,
                 notification,
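
With the pending-timers queue gone, `run` now just maps each source into `ActionEffect` and merges the cleaner stream with the others. The real code uses restate's own stream plumbing; below is a toy reduction of the merge idea, with a hypothetical two-variant enum and `futures::stream::select_all` standing in for whatever combinator the event loop actually uses:

```rust
use futures::stream::{self, StreamExt, select_all};

#[derive(Debug)]
enum ActionEffect {
    Timer(u32),
    Cleaner(u32),
}

#[tokio::main]
async fn main() {
    // Map each source into the shared effect enum, then merge, so a single
    // loop polls all sources; this mirrors cleaner_stream joining the merge.
    let timers = stream::iter([1u32, 2]).map(ActionEffect::Timer).boxed();
    let cleaner = stream::iter([7u32]).map(ActionEffect::Cleaner).boxed();

    let mut effects = select_all([timers, cleaner]);
    while let Some(effect) = effects.next().await {
        // The real loop turns each effect into a Command proposal.
        println!("handling {effect:?}");
    }
}
```
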
diff --git a/crates/worker/src/partition/leadership/mod.rs b/crates/worker/src/partition/leadership/mod.rs
index fcce25dc2f..a166717cf4 100644
--- a/crates/worker/src/partition/leadership/mod.rs
+++ b/crates/worker/src/partition/leadership/mod.rs
@@ -46,8 +46,8 @@ use restate_types::GenerationalNodeId;
 use restate_types::cluster::cluster_state::RunMode;
 use restate_types::config::Configuration;
 use restate_types::errors::GenericError;
-use restate_types::identifiers::{InvocationId, PartitionKey, PartitionProcessorRpcRequestId};
 use restate_types::identifiers::{LeaderEpoch, PartitionLeaderEpoch};
+use restate_types::identifiers::{PartitionKey, PartitionProcessorRpcRequestId};
 use restate_types::message::MessageIndex;
 use restate_types::net::ingest::IngestRecord;
 use restate_types::net::partition_processor::{
@@ -63,7 +63,7 @@ use restate_wal_protocol::Command;
 use restate_wal_protocol::control::{AnnounceLeader, PartitionDurability};
 use restate_wal_protocol::timer::TimerKeyValue;
 
-use crate::partition::cleaner::Cleaner;
+use crate::partition::cleaner::{self, Cleaner};
 use crate::partition::invoker_storage_reader::InvokerStorageReader;
 use crate::partition::leadership::leader_state::LeaderState;
 use crate::partition::leadership::self_proposer::SelfProposer;
@@ -131,7 +131,7 @@ pub(crate) enum ActionEffect {
     Invoker(Box<restate_invoker_api::Effect>),
     Shuffle(shuffle::OutboxTruncation),
     Timer(TimerKeyValue),
-    ScheduleCleanupTimer(InvocationId, Duration),
+    Cleaner(cleaner::CleanerEffect),
     PartitionMaintenance(PartitionDurability),
     UpsertSchema(Schema),
     AwaitingRpcSelfProposeDone,
@@ -421,15 +421,13 @@
             TaskCenter::spawn_unmanaged(TaskKind::Shuffle, "shuffle", shuffle.run())?;
 
         let cleaner = Cleaner::new(
-            *leader_epoch,
             partition_store.clone(),
-            self.bifrost.clone(),
+            self.partition.partition_id,
             self.partition.key_range.clone(),
             config.worker.cleanup_interval(),
         );
 
-        let cleaner_task_id =
-            TaskCenter::spawn_child(TaskKind::Cleaner, "cleaner", cleaner.run())?;
+        let cleaner_handle = cleaner.start()?;
 
         let trimmer_task_id = LogTrimmer::spawn(
             self.bifrost.clone(),
@@ -458,7 +456,7 @@
             *leader_epoch,
             self.partition.key_range.clone(),
             shuffle_task_handle,
-            cleaner_task_id,
+            cleaner_handle,
             trimmer_task_id,
             shuffle_hint_tx,
             timer_service,
diff --git a/crates/worker/src/partition/state_machine/actions.rs b/crates/worker/src/partition/state_machine/actions.rs
index d077fb16dc..b902e2ea29 100644
--- a/crates/worker/src/partition/state_machine/actions.rs
+++ b/crates/worker/src/partition/state_machine/actions.rs
@@ -8,8 +8,6 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.
 
-use std::time::Duration;
-
 use restate_invoker_api::InvokeInputJournal;
 use restate_storage_api::outbox_table::OutboxMessage;
 use restate_storage_api::timer_table::TimerKey;
@@ -86,10 +84,6 @@ pub enum Action {
         /// otherwise the invocation was previously submitted.
         is_new_invocation: bool,
     },
-    ScheduleInvocationStatusCleanup {
-        invocation_id: InvocationId,
-        retention: Duration,
-    },
     ForwardKillResponse {
         request_id: PartitionProcessorRpcRequestId,
         response: KillInvocationResponse,
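
One test-side detail worth calling out: `cleanup_works` switched from `tokio::time::sleep` to `tokio::time::advance`, which works because the test runs with `start_paused = true`. A standalone illustration of that interaction (a sketch assuming tokio with the `macros` and `test-util` features; the numbers mirror the test's ~200ms initial wait and 1s advance):

```rust
use std::time::Duration;
use tokio::time::{self, Instant};

// With a paused clock, time only moves when the program advances it, so the
// one-second "wait" below completes instantly and deterministically.
#[tokio::main(flavor = "current_thread", start_paused = true)]
async fn main() {
    let start = Instant::now();
    let mut interval =
        time::interval_at(start + Duration::from_millis(200), Duration::from_secs(1));

    time::advance(Duration::from_secs(1)).await; // no wall-clock waiting
    interval.tick().await; // deadline already passed: fires immediately

    assert!(start.elapsed() >= Duration::from_secs(1));
}
```
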