From c8ad011046aa84b3065d40c1f1ae1aabac78f731 Mon Sep 17 00:00:00 2001
From: Ahmed Farghal
Date: Thu, 8 Jan 2026 10:28:02 +0000
Subject: [PATCH] [Bifrost] Introduce bifrost.record-size-limit (hidden)

Add a record size limit check at append time in Bifrost to validate that
individual records do not exceed a configured maximum size.

Important note: this configuration option is not effective until record size
estimation is implemented. At the moment, all typed records are assumed to be
2048 bytes in size, which makes this check useless for now. Nevertheless, the
check will be useful once record size estimation lands.

Changes:
- Add `bifrost.record-size-limit` configuration option that defaults to
  `networking.message-size-limit` (32 MiB) and is clamped to that value
- Add `BatchTooLarge`/`RecordTooLarge` error variants that report when a batch
  or a record is too large, depending on whether you're using `Appender` or
  `BackgroundAppender`
- Add record size validation to all `LogSender` enqueue methods in
  `BackgroundAppender` to fail fast at enqueue time

This prevents oversized records from being written to the log, which could
cause issues during replication and network transmission.

Part of #4130, #4132
---
 Cargo.lock                                    |   1 +
 crates/bifrost/Cargo.toml                     |   1 +
 crates/bifrost/src/appender.rs                |  28 ++++
 crates/bifrost/src/background_appender.rs     |  34 ++++-
 crates/bifrost/src/bifrost.rs                 | 138 ++++++++++++++++++
 crates/bifrost/src/error.rs                   |  11 ++
 crates/types/src/config/bifrost.rs            |  44 +++++-
 crates/types/src/config/mod.rs                |   3 +
 crates/worker/src/partition/leadership/mod.rs |   4 +-
 .../src/partition/leadership/self_proposer.rs |  12 +-
 10 files changed, 261 insertions(+), 15 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 809317af63..8d8cd904c3 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -7047,6 +7047,7 @@ dependencies = [
  "restate-log-server",
  "restate-metadata-store",
  "restate-rocksdb",
+ "restate-serde-util",
  "restate-storage-api",
  "restate-test-util",
  "restate-types",
diff --git a/crates/bifrost/Cargo.toml b/crates/bifrost/Cargo.toml
index b7b2edad2b..1953316684 100644
--- a/crates/bifrost/Cargo.toml
+++ b/crates/bifrost/Cargo.toml
@@ -56,6 +56,7 @@ restate-bifrost = {path = ".", default-features = false, features = ["local-logl
 restate-core = { workspace = true, features = ["test-util"] }
 restate-log-server = { workspace = true }
 restate-rocksdb = { workspace = true }
+restate-serde-util = { workspace = true }
 restate-storage-api = { workspace = true }
 restate-test-util = { workspace = true }
 restate-types = { workspace = true, features = ["test-util"] }
diff --git a/crates/bifrost/src/appender.rs b/crates/bifrost/src/appender.rs
index b75ece4445..dbeb46cbd9 100644
--- a/crates/bifrost/src/appender.rs
+++ b/crates/bifrost/src/appender.rs
@@ -8,6 +8,7 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.
 
+use std::num::NonZeroUsize;
 use std::sync::Arc;
 use std::time::{Duration, Instant};
 
@@ -57,6 +58,11 @@ impl Appender {
         }
     }
 
+    /// Returns the record size limit for this appender.
+    pub fn record_size_limit(&self) -> NonZeroUsize {
+        self.config.pinned().bifrost.record_size_limit()
+    }
+
     /// Marks this node as a preferred writer for the underlying log
     pub fn mark_as_preferred(&mut self) {
         if self.preference_token.is_none() {
@@ -83,6 +89,16 @@ impl Appender {
         body: impl Into<InputRecord<T>>,
     ) -> Result<Lsn> {
         let body = body.into().into_record();
+        // Validate the record size before attempting to append
+        let batch_size_bytes = body.estimated_encode_size();
+        let limit = self.record_size_limit();
+        if batch_size_bytes > limit.get() {
+            return Err(Error::BatchTooLarge {
+                batch_size_bytes,
+                limit,
+            });
+        }
+
         self.append_batch_erased(Arc::new([body])).await
     }
 
@@ -103,12 +119,23 @@ impl Appender {
         batch: Vec<impl Into<InputRecord<T>>>,
     ) -> Result<Lsn> {
         let batch: Arc<[_]> = batch.into_iter().map(|r| r.into().into_record()).collect();
+        // Validate the total batch size before attempting to append
+        let batch_size_bytes = batch.iter().map(|r| r.estimated_encode_size()).sum();
+        let limit = self.record_size_limit();
+        if batch_size_bytes > limit.get() {
+            return Err(Error::BatchTooLarge {
+                batch_size_bytes,
+                limit,
+            });
+        }
         self.append_batch_erased(batch).await
     }
 
     pub(crate) async fn append_batch_erased(&mut self, batch: Arc<[Record]>) -> Result<Lsn> {
         self.bifrost_inner.fail_if_shutting_down()?;
+
         let retry_iter = self.config.live_load().bifrost.append_retry_policy();
+        debug_assert!(retry_iter.max_attempts().is_none());
         let mut retry_iter = retry_iter.into_iter();
@@ -123,6 +150,7 @@ impl Appender {
             .bifrost
             .auto_recovery_interval
             .into();
+
         let loglet = match self.loglet_cache.as_mut() {
             None => self
                 .loglet_cache
diff --git a/crates/bifrost/src/background_appender.rs b/crates/bifrost/src/background_appender.rs
index 5d8142d524..cbc0d9fd1a 100644
--- a/crates/bifrost/src/background_appender.rs
+++ b/crates/bifrost/src/background_appender.rs
@@ -8,6 +8,8 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.
 
+use std::num::NonZeroUsize;
+
 use futures::FutureExt;
 use pin_project::pin_project;
 use restate_types::logs::Record;
@@ -57,6 +59,7 @@ where
     /// behaviour to the owner of [`AppenderHandle`] to drain or drop when appropriate.
     pub fn start(self, name: &'static str) -> Result<AppenderHandle<T>, ShutdownError> {
         let (tx, rx) = tokio::sync::mpsc::channel(self.queue_capacity);
+        let record_size_limit = self.appender.record_size_limit();
 
         let handle = TaskCenter::spawn_unmanaged_child(
             restate_core::TaskKind::BifrostAppender,
@@ -68,6 +71,7 @@
             inner_handle: Some(handle),
             sender: Some(LogSender {
                 tx,
+                record_size_limit,
                 _phantom: std::marker::PhantomData,
             }),
         })
@@ -230,10 +234,22 @@ impl AppenderHandle {
 #[derive(Clone)]
 pub struct LogSender<T> {
     tx: tokio::sync::mpsc::Sender<AppendOperation>,
+    record_size_limit: NonZeroUsize,
     _phantom: std::marker::PhantomData<T>,
 }
 
 impl<T: StorageEncode> LogSender<T> {
+    fn check_record_size<A>(&self, record: &Record) -> Result<(), EnqueueError<A>> {
+        let record_size = record.estimated_encode_size();
+        if record_size > self.record_size_limit.get() {
+            return Err(EnqueueError::RecordTooLarge {
+                record_size,
+                limit: self.record_size_limit,
+            });
+        }
+        Ok(())
+    }
+
     /// Attempt to enqueue a record to the appender. Returns immediately if the
     /// appender is pushing back or if the appender is draining or drained.
    pub fn try_enqueue<A>(&self, record: A) -> Result<(), EnqueueError<A>>
@@ -247,6 +263,7 @@ impl LogSender {
         };
 
         let record = record.into().into_record();
+        self.check_record_size(&record)?;
         permit.send(AppendOperation::Enqueue(record));
         Ok(())
     }
@@ -267,6 +284,7 @@
         let (tx, rx) = oneshot::channel();
 
         let record = record.into().into_record();
+        self.check_record_size(&record)?;
         permit.send(AppendOperation::EnqueueWithNotification(record, tx));
         Ok(CommitToken { rx })
     }
@@ -281,6 +299,7 @@
             return Err(EnqueueError::Closed(record));
         };
         let record = record.into().into_record();
+        self.check_record_size(&record)?;
         permit.send(AppendOperation::Enqueue(record));
 
         Ok(())
@@ -303,7 +322,9 @@
         };
 
         for (permit, record) in std::iter::zip(permits, records) {
-            permit.send(AppendOperation::Enqueue(record.into().into_record()));
+            let record = record.into().into_record();
+            self.check_record_size(&record)?;
+            permit.send(AppendOperation::Enqueue(record));
         }
         Ok(())
     }
@@ -323,7 +344,9 @@
         };
 
         for (permit, record) in std::iter::zip(permits, records) {
-            permit.send(AppendOperation::Enqueue(record.into().into_record()));
+            let record = record.into().into_record();
+            self.check_record_size(&record)?;
+            permit.send(AppendOperation::Enqueue(record));
         }
 
         Ok(())
@@ -343,10 +366,9 @@
         };
 
         let (tx, rx) = oneshot::channel();
-        permit.send(AppendOperation::EnqueueWithNotification(
-            record.into().into_record(),
-            tx,
-        ));
+        let record = record.into().into_record();
+        self.check_record_size(&record)?;
+        permit.send(AppendOperation::EnqueueWithNotification(record, tx));
 
         Ok(CommitToken { rx })
     }
diff --git a/crates/bifrost/src/bifrost.rs b/crates/bifrost/src/bifrost.rs
index 21691cbadb..9bfe1c596b 100644
--- a/crates/bifrost/src/bifrost.rs
+++ b/crates/bifrost/src/bifrost.rs
@@ -758,6 +758,7 @@ impl Drop for PreferenceToken {
 mod tests {
     use super::*;
 
+    use std::num::NonZeroUsize;
     use std::sync::atomic::AtomicUsize;
 
     use futures::StreamExt;
@@ -770,14 +771,21 @@ mod tests {
     use restate_core::TestCoreEnvBuilder;
     use restate_core::{TaskCenter, TaskKind, TestCoreEnv};
     use restate_rocksdb::RocksDbManager;
+    use restate_types::config::set_current_config;
     use restate_types::logs::SequenceNumber;
     use restate_types::logs::metadata::{SegmentIndex, new_single_node_loglet_params};
     use restate_types::metadata::Precondition;
     use restate_types::partition_table::PartitionTable;
     use restate_types::{Version, Versioned};
 
+    use crate::error::EnqueueError;
     use crate::providers::memory_loglet::{self};
 
+    // Helper to create a small byte count for testing
+    fn small_byte_limit(bytes: usize) -> restate_serde_util::NonZeroByteCount {
+        restate_serde_util::NonZeroByteCount::new(NonZeroUsize::new(bytes).unwrap())
+    }
+
    #[restate_core::test]
    #[traced_test]
    async fn test_append_smoke() -> googletest::Result<()> {
@@ -1287,4 +1295,134 @@ mod tests {
         RocksDbManager::get().shutdown().await;
         Ok(())
     }
+
+    #[restate_core::test]
+    async fn test_append_record_too_large() -> googletest::Result<()> {
+        // Set up a configuration with a small record size limit.
+        // Note: the estimated_encode_size for a typed record (e.g., String) is a ~2KB constant,
+        // plus overhead for Keys and NanosSinceEpoch. We set the limit to 1KB to ensure
+        // the check triggers.
+        let mut config = restate_types::config::Configuration::default();
+        config.networking.message_size_limit = small_byte_limit(1024); // 1KB limit
+        let config = config.apply_cascading_values();
+        set_current_config(config);
+
+        let env = TestCoreEnv::create_with_single_node(1, 1).await;
+        let bifrost = Bifrost::init_in_memory(env.metadata_writer).await;
+
+        // Get the configured record size limit
+        let record_size_limit = restate_types::config::Configuration::pinned()
+            .bifrost
+            .record_size_limit();
+        assert_eq!(record_size_limit.get(), 1024);
+
+        // Any record will have an estimated size of ~2KB+ due to the constant estimate
+        // for PolyBytes::Typed, which exceeds our 1KB limit
+        let payload = "test";
+
+        let mut appender = bifrost.create_appender(LogId::new(0), ErrorRecoveryStrategy::Wait)?;
+
+        // Verify the appender has the correct limit
+        assert_eq!(appender.record_size_limit().get(), 1024);
+
+        // Attempting to append should fail with BatchTooLarge (the Appender treats
+        // a single record as a batch of one)
+        let result = appender.append(payload).await;
+
+        assert_that!(
+            result,
+            pat!(Err(pat!(Error::BatchTooLarge {
+                batch_size_bytes: gt(1024),
+                limit: eq(record_size_limit),
+            })))
+        );
+
+        Ok(())
+    }
+
+    #[restate_core::test]
+    async fn test_background_appender_record_too_large() -> googletest::Result<()> {
+        // Set up configuration with a small record size limit (100 bytes).
+        // Note: the estimated_encode_size for any typed record is a ~2KB constant,
+        // so even "small" strings will exceed the 100 byte limit.
+        let mut config = restate_types::config::Configuration::default();
+        config.networking.message_size_limit = small_byte_limit(100);
+        // Apply cascading values to propagate the networking limit to bifrost
+        let config = config.apply_cascading_values();
+        set_current_config(config);
+
+        let env = TestCoreEnv::create_with_single_node(1, 1).await;
+        let bifrost = Bifrost::init_in_memory(env.metadata_writer).await;
+
+        let background_appender: crate::BackgroundAppender<String> = bifrost
+            .create_background_appender(LogId::new(0), ErrorRecoveryStrategy::Wait, 10, 10)?;
+
+        let handle = background_appender.start("test-appender")?;
+        let sender = handle.sender();
+
+        // Any record will have an estimated size of ~2KB due to the PolyBytes::Typed constant estimate
+        let payload = "test".to_string();
+
+        // try_enqueue should fail with RecordTooLarge
+        let result = sender.try_enqueue(payload.clone());
+        assert_that!(
+            result,
+            pat!(Err(pat!(EnqueueError::RecordTooLarge {
+                record_size: gt(100),
+                limit: eq(NonZeroUsize::new(100).unwrap()),
+            })))
+        );
+
+        // enqueue (async) should also fail with RecordTooLarge
+        let result = sender.enqueue(payload.clone()).await;
+        assert_that!(
+            result,
+            pat!(Err(pat!(EnqueueError::RecordTooLarge {
+                record_size: gt(100),
+                limit: eq(NonZeroUsize::new(100).unwrap()),
+            })))
+        );
+
+        // try_enqueue_with_notification should also fail
+        let result = sender.try_enqueue_with_notification(payload.clone());
+        assert!(matches!(
+            result,
+            Err(EnqueueError::RecordTooLarge {
+                record_size,
+                limit,
+            }) if record_size > 100 && limit.get() == 100
+        ));
+
+        // Drain the appender (nothing should have been enqueued)
+        handle.drain().await?;
+
+        Ok(())
+    }
+
+    #[restate_core::test]
+    async fn test_background_appender_record_within_limit() -> googletest::Result<()> {
+        // Set up configuration with a record size limit (10KB) large enough to allow records
+        let mut config = restate_types::config::Configuration::default();
+        config.networking.message_size_limit = small_byte_limit(10 * 1024); // 10KB
+        let config = config.apply_cascading_values();
+        set_current_config(config);
+
+        let env = TestCoreEnv::create_with_single_node(1, 1).await;
+        let bifrost = Bifrost::init_in_memory(env.metadata_writer).await;
+
+        let background_appender: crate::BackgroundAppender<String> = bifrost
+            .create_background_appender(LogId::new(0), ErrorRecoveryStrategy::Wait, 10, 10)?;
+
+        let handle = background_appender.start("test-appender")?;
+        let sender = handle.sender();
+
+        // With a 10KB limit, the ~2KB estimated record should succeed
+        let payload = "test".to_string();
+        sender.enqueue(payload).await?;
+
+        // Drain and wait for commit
+        handle.drain().await?;
+
+        Ok(())
+    }
 }
diff --git a/crates/bifrost/src/error.rs b/crates/bifrost/src/error.rs
index c0824e6b9c..5879a8c7c6 100644
--- a/crates/bifrost/src/error.rs
+++ b/crates/bifrost/src/error.rs
@@ -8,6 +8,7 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.
 
+use std::num::NonZeroUsize;
 use std::sync::Arc;
 
 use restate_core::{ShutdownError, SyncError};
@@ -43,6 +44,11 @@ pub enum Error {
     AdminError(#[from] AdminError),
     #[error(transparent)]
     MetadataStoreError(#[from] Arc),
+    #[error("record batch too large: {batch_size_bytes} bytes exceeds limit of {limit} bytes")]
+    BatchTooLarge {
+        batch_size_bytes: usize,
+        limit: NonZeroUsize,
+    },
     #[error("{0}")]
     Other(String),
 }
@@ -53,6 +59,11 @@ pub enum EnqueueError<T> {
     Full(T),
     #[error("appender is draining, closed, or crashed")]
     Closed(T),
+    #[error("record too large: {record_size} bytes exceeds limit of {limit} bytes")]
+    RecordTooLarge {
+        record_size: usize,
+        limit: NonZeroUsize,
+    },
 }
 
 #[derive(Clone, Debug, thiserror::Error)]
diff --git a/crates/types/src/config/bifrost.rs b/crates/types/src/config/bifrost.rs
index 9c300b4530..d3360158c3 100644
--- a/crates/types/src/config/bifrost.rs
+++ b/crates/types/src/config/bifrost.rs
@@ -20,9 +20,11 @@ use restate_serde_util::{ByteCount, NonZeroByteCount};
 use restate_time_util::{FriendlyDuration, NonZeroFriendlyDuration};
 
 use crate::logs::metadata::{NodeSetSize, ProviderKind};
+use crate::net::connect_opts::MESSAGE_SIZE_OVERHEAD;
 use crate::retries::RetryPolicy;
 
-use super::{CommonOptions, RocksDbOptions, RocksDbOptionsBuilder};
+use super::networking::DEFAULT_MESSAGE_SIZE_LIMIT;
+use super::{CommonOptions, NetworkingOptions, RocksDbOptions, RocksDbOptionsBuilder};
 
 /// # Bifrost options
 #[serde_as]
@@ -86,6 +88,20 @@ pub struct BifrostOptions {
     /// of replicas, or for other reasons.
     #[cfg_attr(feature = "schemars", schemars(with = "String"))]
     pub disable_auto_improvement: bool,
+
+    /// # Record size limit
+    ///
+    /// Maximum size of a single record that can be appended to Bifrost.
+    /// If a record exceeds this limit, the append operation will fail immediately.
+    ///
+    /// If unset, defaults to `networking.message-size-limit`. If set, it will be clamped to
+    /// the value of `networking.message-size-limit`, since larger records cannot be transmitted
+    /// over the cluster's internal network.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    // Hide the configuration until record size estimation is implemented; currently this option
+    // is not effective due to the fixed size estimation of typed records.
+    #[cfg_attr(feature = "schemars", schemars(skip))]
+    record_size_limit: Option<NonZeroByteCount>,
 }
 
 impl BifrostOptions {
@@ -99,9 +115,34 @@ impl BifrostOptions {
         )
     }
 
+    /// Returns the record size limit, defaulting to `networking.message-size-limit` if not set.
+    pub fn record_size_limit(&self) -> NonZeroUsize {
+        let limit = self
+            .record_size_limit
+            .map(|v| v.as_non_zero_usize())
+            .unwrap_or(DEFAULT_MESSAGE_SIZE_LIMIT);
+
+        if cfg!(any(test, feature = "test-util")) {
+            // In tests, use the configured limit as-is so tests can assert exact values
+            limit
+        } else {
+            // Allow a bit of headroom beyond the configured limit to account for the
+            // record envelope
+            limit.saturating_add(MESSAGE_SIZE_OVERHEAD.div_ceil(2))
+        }
+    }
+
     pub fn apply_common(&mut self, common: &CommonOptions) {
         self.local.apply_common(common);
     }
+
+    /// Clamps the record size limit to the networking message size limit.
+    pub(crate) fn set_derived_values(&mut self, networking: &NetworkingOptions) {
+        self.record_size_limit = Some(
+            self.record_size_limit
+                .map(|limit| limit.min(networking.message_size_limit))
+                .unwrap_or(networking.message_size_limit),
+        );
+    }
 }
 
 impl Default for BifrostOptions {
@@ -122,6 +163,7 @@ impl Default for BifrostOptions {
             seal_retry_interval: NonZeroFriendlyDuration::from_secs_unchecked(2),
             record_cache_memory_size: ByteCount::from(250u64 * 1024 * 1024), // 250 MiB
             disable_auto_improvement: false,
+            record_size_limit: None,
         }
     }
 }
diff --git a/crates/types/src/config/mod.rs b/crates/types/src/config/mod.rs
index 0e89978383..cb236a8b11 100644
--- a/crates/types/src/config/mod.rs
+++ b/crates/types/src/config/mod.rs
@@ -195,6 +195,7 @@ impl Configuration {
             .set_derived_values(&config.networking)
             .unwrap();
         config.worker.set_derived_values(&config.networking);
+        config.bifrost.set_derived_values(&config.networking);
         config.admin.set_derived_values(&config.common);
         config.ingress.set_derived_values(&config.common);
         config
@@ -208,6 +209,7 @@ impl Configuration {
             .set_derived_values(&config.networking)
             .unwrap();
         config.worker.set_derived_values(&config.networking);
+        config.bifrost.set_derived_values(&config.networking);
         config.admin.set_derived_values(&config.common);
         config.ingress.set_derived_values(&config.common);
         config
@@ -260,6 +262,7 @@ impl Configuration {
     pub fn apply_cascading_values(mut self) -> Self {
         self.worker.storage.apply_common(&self.common);
         self.bifrost.apply_common(&self.common);
+        self.bifrost.set_derived_values(&self.networking);
         self.metadata_server.apply_common(&self.common);
         self.log_server.apply_common(&self.common);
         self
diff --git a/crates/worker/src/partition/leadership/mod.rs b/crates/worker/src/partition/leadership/mod.rs
index ed13f73162..dd6fab9c4a 100644
--- a/crates/worker/src/partition/leadership/mod.rs
+++ b/crates/worker/src/partition/leadership/mod.rs
@@ -93,8 +93,8 @@ pub(crate) enum Error {
     Decode(#[from] StorageDecodeError),
     #[error(transparent)]
     Shutdown(#[from] ShutdownError),
-    #[error("error when self proposing")]
-    SelfProposer,
+    #[error("error when self proposing: {0}")]
+    SelfProposer(String),
     #[error("task '{name}' failed: {cause}")]
     TaskFailed {
         name: &'static str,
diff --git a/crates/worker/src/partition/leadership/self_proposer.rs b/crates/worker/src/partition/leadership/self_proposer.rs
index cb1e445e3a..79717085e6 100644
--- a/crates/worker/src/partition/leadership/self_proposer.rs
+++ b/crates/worker/src/partition/leadership/self_proposer.rs
@@ -106,7 +106,7 @@ impl SelfProposer {
             .sender()
             .enqueue_many(envelopes)
             .await
-            .map_err(|_| Error::SelfProposer)?;
+            .map_err(|e| Error::SelfProposer(e.to_string()))?;
 
         // update the sequence number range for the next batch
         self.epoch_sequence_number = EpochSequenceNumber {
@@ -129,7 +129,7 @@ impl SelfProposer {
             .sender()
             .enqueue(Arc::new(envelope))
             .await
-            .map_err(|_| Error::SelfProposer)?;
+ .map_err(|e| Error::SelfProposer(e.to_string()))?; Ok(()) } @@ -146,7 +146,7 @@ impl SelfProposer { .sender() .enqueue_with_notification(Arc::new(envelope)) .await - .map_err(|_| Error::SelfProposer)?; + .map_err(|e| Error::SelfProposer(e.to_string()))?; Ok(commit_token) } @@ -166,7 +166,7 @@ impl SelfProposer { // sender // .enqueue_many(records) // .await - // .map_err(|_| Error::SelfProposer)?; + // .map_err(|e| Error::SelfProposer(e.to_string()))?; // // so instead we do this. @@ -184,13 +184,13 @@ impl SelfProposer { sender .enqueue(input) .await - .map_err(|_| Error::SelfProposer)?; + .map_err(|e| Error::SelfProposer(e.to_string()))?; } sender .notify_committed() .await - .map_err(|_| Error::SelfProposer) + .map_err(|e| Error::SelfProposer(e.to_string())) } fn create_header(&mut self, partition_key: PartitionKey) -> Header {
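
Usage sketch (illustrative only, not part of this patch): how a caller might
react to the new size errors. `Appender::append` and `Error::BatchTooLarge`
come from this change; the function itself and the assumption that `Appender`,
`Error`, and `Lsn` are reachable at these paths are hypothetical. Note that for
the `BackgroundAppender` path, `EnqueueError::RecordTooLarge` plays the
analogous role and, unlike `Full`/`Closed`, does not return the record.

    // Hypothetical caller-side handling; both size errors are permanent for a
    // given payload, so retrying without shrinking the record cannot succeed.
    async fn append_or_reject(
        appender: &mut restate_bifrost::Appender,
        payload: String,
    ) -> anyhow::Result<restate_types::logs::Lsn> {
        match appender.append(payload).await {
            // Record fits under bifrost.record-size-limit and was committed.
            Ok(lsn) => Ok(lsn),
            // Reject (or split the batch upstream); the Appender treats a
            // single record as a batch of one.
            Err(restate_bifrost::Error::BatchTooLarge { batch_size_bytes, limit }) => {
                anyhow::bail!("payload of {batch_size_bytes} bytes exceeds limit of {limit} bytes")
            }
            Err(other) => Err(other.into()),
        }
    }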