diff --git a/Cargo.lock b/Cargo.lock index 809317af63..8d8cd904c3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7047,6 +7047,7 @@ dependencies = [ "restate-log-server", "restate-metadata-store", "restate-rocksdb", + "restate-serde-util", "restate-storage-api", "restate-test-util", "restate-types", diff --git a/crates/bifrost/Cargo.toml b/crates/bifrost/Cargo.toml index b7b2edad2b..1953316684 100644 --- a/crates/bifrost/Cargo.toml +++ b/crates/bifrost/Cargo.toml @@ -56,6 +56,7 @@ restate-bifrost = {path = ".", default-features = false, features = ["local-logl restate-core = { workspace = true, features = ["test-util"] } restate-log-server = { workspace = true } restate-rocksdb = { workspace = true } +restate-serde-util = { workspace = true } restate-storage-api = { workspace = true } restate-test-util = { workspace = true } restate-types = { workspace = true, features = ["test-util"] } diff --git a/crates/bifrost/src/appender.rs b/crates/bifrost/src/appender.rs index b75ece4445..dbeb46cbd9 100644 --- a/crates/bifrost/src/appender.rs +++ b/crates/bifrost/src/appender.rs @@ -8,6 +8,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use std::num::NonZeroUsize; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -57,6 +58,11 @@ impl Appender { } } + /// Returns the record size limit for this appender. 
+ pub fn record_size_limit(&self) -> NonZeroUsize { + self.config.pinned().bifrost.record_size_limit() + } + /// Marks this node as a preferred writer for the underlying log pub fn mark_as_preferred(&mut self) { if self.preference_token.is_none() { @@ -83,6 +89,16 @@ impl Appender { body: impl Into>, ) -> Result { let body = body.into().into_record(); + // Validate the record's estimated encoded size against the limit before attempting to append + let batch_size_bytes = body.estimated_encode_size(); + let limit = self.record_size_limit(); + if batch_size_bytes > limit.get() { + return Err(Error::BatchTooLarge { + batch_size_bytes, + limit, + }); + } + self.append_batch_erased(Arc::new([body])).await } @@ -103,12 +119,23 @@ impl Appender { batch: Vec>>, ) -> Result { let batch: Arc<[_]> = batch.into_iter().map(|r| r.into().into_record()).collect(); + let batch_size_bytes = batch.iter().map(|r| r.estimated_encode_size()).sum(); + // Validate the batch's total estimated size (summed over all records) against the limit before attempting to append + let limit = self.record_size_limit(); + if batch_size_bytes > limit.get() { + return Err(Error::BatchTooLarge { + batch_size_bytes, + limit, + }); + } self.append_batch_erased(batch).await } pub(crate) async fn append_batch_erased(&mut self, batch: Arc<[Record]>) -> Result { self.bifrost_inner.fail_if_shutting_down()?; + let retry_iter = self.config.live_load().bifrost.append_retry_policy(); + debug_assert!(retry_iter.max_attempts().is_none()); let mut retry_iter = retry_iter.into_iter(); @@ -123,6 +150,7 @@ impl Appender { .bifrost .auto_recovery_interval .into(); + let loglet = match self.loglet_cache.as_mut() { None => self .loglet_cache diff --git a/crates/bifrost/src/background_appender.rs b/crates/bifrost/src/background_appender.rs index 5d8142d524..cbc0d9fd1a 100644 --- a/crates/bifrost/src/background_appender.rs +++ b/crates/bifrost/src/background_appender.rs @@ -8,6 +8,8 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. 
+use std::num::NonZeroUsize; + use futures::FutureExt; use pin_project::pin_project; use restate_types::logs::Record; @@ -57,6 +59,7 @@ where /// behaviour to the owner of [`AppenderHandle`] to drain or drop when appropriate. pub fn start(self, name: &'static str) -> Result, ShutdownError> { let (tx, rx) = tokio::sync::mpsc::channel(self.queue_capacity); + let record_size_limit = self.appender.record_size_limit(); let handle = TaskCenter::spawn_unmanaged_child( restate_core::TaskKind::BifrostAppender, @@ -68,6 +71,7 @@ where inner_handle: Some(handle), sender: Some(LogSender { tx, + record_size_limit, _phantom: std::marker::PhantomData, }), }) @@ -230,10 +234,22 @@ impl AppenderHandle { #[derive(Clone)] pub struct LogSender { tx: tokio::sync::mpsc::Sender, + record_size_limit: NonZeroUsize, _phantom: std::marker::PhantomData, } impl LogSender { + fn check_record_size(&self, record: &Record) -> Result<(), EnqueueError> { + let record_size = record.estimated_encode_size(); + if record_size > self.record_size_limit.get() { + return Err(EnqueueError::RecordTooLarge { + record_size, + limit: self.record_size_limit, + }); + } + Ok(()) + } + /// Attempt to enqueue a record to the appender. Returns immediately if the /// appender is pushing back or if the appender is draining or drained. 
pub fn try_enqueue(&self, record: A) -> Result<(), EnqueueError> @@ -247,6 +263,7 @@ impl LogSender { }; let record = record.into().into_record(); + self.check_record_size(&record)?; permit.send(AppendOperation::Enqueue(record)); Ok(()) } @@ -267,6 +284,7 @@ impl LogSender { let (tx, rx) = oneshot::channel(); let record = record.into().into_record(); + self.check_record_size(&record)?; permit.send(AppendOperation::EnqueueWithNotification(record, tx)); Ok(CommitToken { rx }) } @@ -281,6 +299,7 @@ impl LogSender { return Err(EnqueueError::Closed(record)); }; let record = record.into().into_record(); + self.check_record_size(&record)?; permit.send(AppendOperation::Enqueue(record)); Ok(()) @@ -303,7 +322,9 @@ impl LogSender { }; for (permit, record) in std::iter::zip(permits, records) { - permit.send(AppendOperation::Enqueue(record.into().into_record())); + let record = record.into().into_record(); + self.check_record_size(&record)?; + permit.send(AppendOperation::Enqueue(record)); } Ok(()) } @@ -323,7 +344,9 @@ impl LogSender { }; for (permit, record) in std::iter::zip(permits, records) { - permit.send(AppendOperation::Enqueue(record.into().into_record())); + let record = record.into().into_record(); + self.check_record_size(&record)?; + permit.send(AppendOperation::Enqueue(record)); } Ok(()) @@ -343,10 +366,9 @@ impl LogSender { }; let (tx, rx) = oneshot::channel(); - permit.send(AppendOperation::EnqueueWithNotification( - record.into().into_record(), - tx, - )); + let record = record.into().into_record(); + self.check_record_size(&record)?; + permit.send(AppendOperation::EnqueueWithNotification(record, tx)); Ok(CommitToken { rx }) } diff --git a/crates/bifrost/src/bifrost.rs b/crates/bifrost/src/bifrost.rs index 21691cbadb..9bfe1c596b 100644 --- a/crates/bifrost/src/bifrost.rs +++ b/crates/bifrost/src/bifrost.rs @@ -758,6 +758,7 @@ impl Drop for PreferenceToken { mod tests { use super::*; + use std::num::NonZeroUsize; use std::sync::atomic::AtomicUsize; use 
futures::StreamExt; @@ -770,14 +771,21 @@ mod tests { use restate_core::TestCoreEnvBuilder; use restate_core::{TaskCenter, TaskKind, TestCoreEnv}; use restate_rocksdb::RocksDbManager; + use restate_types::config::set_current_config; use restate_types::logs::SequenceNumber; use restate_types::logs::metadata::{SegmentIndex, new_single_node_loglet_params}; use restate_types::metadata::Precondition; use restate_types::partition_table::PartitionTable; use restate_types::{Version, Versioned}; + use crate::error::EnqueueError; use crate::providers::memory_loglet::{self}; + // Helper to create a small byte count for testing + fn small_byte_limit(bytes: usize) -> restate_serde_util::NonZeroByteCount { + restate_serde_util::NonZeroByteCount::new(NonZeroUsize::new(bytes).unwrap()) + } + #[restate_core::test] #[traced_test] async fn test_append_smoke() -> googletest::Result<()> { @@ -1287,4 +1295,134 @@ mod tests { RocksDbManager::get().shutdown().await; Ok(()) } + + #[restate_core::test] + async fn test_append_record_too_large() -> googletest::Result<()> { + // Set up a configuration with a small record size limit. + // Note: The estimated_encode_size for a typed record (e.g., String) is ~2KB constant, + // plus overhead for Keys and NanosSinceEpoch. We set the limit to 1KB to ensure + // the check triggers. 
+ let mut config = restate_types::config::Configuration::default(); + config.networking.message_size_limit = small_byte_limit(1024); // 1KB limit + let config = config.apply_cascading_values(); + set_current_config(config); + + let env = TestCoreEnv::create_with_single_node(1, 1).await; + let bifrost = Bifrost::init_in_memory(env.metadata_writer).await; + + // Get the configured record size limit + let record_size_limit = restate_types::config::Configuration::pinned() + .bifrost + .record_size_limit(); + assert_eq!(record_size_limit.get(), 1024); + + // Any record will have an estimated size of ~2KB+ due to the constant estimate + // for PolyBytes::Typed, which exceeds our 1KB limit + let payload = "test"; + + let appender = bifrost.create_appender(LogId::new(0), ErrorRecoveryStrategy::Wait)?; + + // Verify the appender has the correct limit + assert_eq!(appender.record_size_limit().get(), 1024); + + // Attempting to append should fail with Error::BatchTooLarge + let mut appender = appender; + let result = appender.append(payload).await; + + assert_that!( + result, + pat!(Err(pat!(Error::BatchTooLarge { + batch_size_bytes: gt(1024), + limit: eq(record_size_limit), + }))) + ); + + Ok(()) + } + + #[restate_core::test] + async fn test_background_appender_record_too_large() -> googletest::Result<()> { + // Set up configuration with a small record size limit (100 bytes). + // Note: The estimated_encode_size for any typed record is ~2KB constant, + // so even "small" strings will exceed the 100 byte limit. 
+ let mut config = restate_types::config::Configuration::default(); + config.networking.message_size_limit = small_byte_limit(100); + // Apply cascading values to propagate the networking limit to bifrost + let config = config.apply_cascading_values(); + set_current_config(config); + + let env = TestCoreEnv::create_with_single_node(1, 1).await; + let bifrost = Bifrost::init_in_memory(env.metadata_writer).await; + + let background_appender: crate::BackgroundAppender = bifrost + .create_background_appender(LogId::new(0), ErrorRecoveryStrategy::Wait, 10, 10)?; + + let handle = background_appender.start("test-appender")?; + let sender = handle.sender(); + + // Any record will have an estimated size of ~2KB due to PolyBytes::Typed constant estimate + let payload = "test".to_string(); + + // try_enqueue should fail with RecordTooLarge + let result = sender.try_enqueue(payload.clone()); + assert_that!( + result, + pat!(Err(pat!(EnqueueError::RecordTooLarge { + record_size: gt(100), + limit: eq(NonZeroUsize::new(100).unwrap()), + }))) + ); + + // enqueue (async) should also fail with RecordTooLarge + let result = sender.enqueue(payload.clone()).await; + assert_that!( + result, + pat!(Err(pat!(EnqueueError::RecordTooLarge { + record_size: gt(100), + limit: eq(NonZeroUsize::new(100).unwrap()), + }))) + ); + + // try_enqueue_with_notification should also fail + let result = sender.try_enqueue_with_notification(payload.clone()); + assert!(matches!( + result, + Err(EnqueueError::RecordTooLarge { + record_size, + limit, + }) if record_size > 100 && limit.get() == 100 + )); + + // Drain the appender (nothing should have been enqueued) + handle.drain().await?; + + Ok(()) + } + + #[restate_core::test] + async fn test_background_appender_record_within_limit() -> googletest::Result<()> { + // Set up configuration with a large enough record size limit (10KB) to allow records + let mut config = restate_types::config::Configuration::default(); + config.networking.message_size_limit = 
small_byte_limit(10 * 1024); // 10KB + let config = config.apply_cascading_values(); + set_current_config(config); + + let env = TestCoreEnv::create_with_single_node(1, 1).await; + let bifrost = Bifrost::init_in_memory(env.metadata_writer).await; + + let background_appender: crate::BackgroundAppender = bifrost + .create_background_appender(LogId::new(0), ErrorRecoveryStrategy::Wait, 10, 10)?; + + let handle = background_appender.start("test-appender")?; + let sender = handle.sender(); + + // With a 10KB limit, the ~2KB estimated record should succeed + let payload = "test".to_string(); + sender.enqueue(payload).await?; + + // Drain and wait for commit + handle.drain().await?; + + Ok(()) + } } diff --git a/crates/bifrost/src/error.rs b/crates/bifrost/src/error.rs index c0824e6b9c..5879a8c7c6 100644 --- a/crates/bifrost/src/error.rs +++ b/crates/bifrost/src/error.rs @@ -8,6 +8,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. 
+use std::num::NonZeroUsize; use std::sync::Arc; use restate_core::{ShutdownError, SyncError}; @@ -43,6 +44,11 @@ pub enum Error { AdminError(#[from] AdminError), #[error(transparent)] MetadataStoreError(#[from] Arc), + #[error("record batch too large: {batch_size_bytes} bytes exceeds limit of {limit} bytes")] + BatchTooLarge { + batch_size_bytes: usize, + limit: NonZeroUsize, + }, #[error("{0}")] Other(String), } @@ -53,6 +59,11 @@ pub enum EnqueueError { Full(T), #[error("appender is draining, closed, or crashed")] Closed(T), + #[error("record too large: {record_size} bytes exceeds limit of {limit} bytes")] + RecordTooLarge { + record_size: usize, + limit: NonZeroUsize, + }, } #[derive(Clone, Debug, thiserror::Error)] diff --git a/crates/types/src/config/bifrost.rs b/crates/types/src/config/bifrost.rs index 9c300b4530..d3360158c3 100644 --- a/crates/types/src/config/bifrost.rs +++ b/crates/types/src/config/bifrost.rs @@ -20,9 +20,11 @@ use restate_serde_util::{ByteCount, NonZeroByteCount}; use restate_time_util::{FriendlyDuration, NonZeroFriendlyDuration}; use crate::logs::metadata::{NodeSetSize, ProviderKind}; +use crate::net::connect_opts::MESSAGE_SIZE_OVERHEAD; use crate::retries::RetryPolicy; -use super::{CommonOptions, RocksDbOptions, RocksDbOptionsBuilder}; +use super::networking::DEFAULT_MESSAGE_SIZE_LIMIT; +use super::{CommonOptions, NetworkingOptions, RocksDbOptions, RocksDbOptionsBuilder}; /// # Bifrost options #[serde_as] @@ -86,6 +88,20 @@ pub struct BifrostOptions { /// of replicas, or for other reasons. #[cfg_attr(feature = "schemars", schemars(with = "String"))] pub disable_auto_improvement: bool, + + /// # Record size limit + /// + /// Maximum size of a single record that can be appended to Bifrost. + /// If a record exceeds this limit, the append operation will fail immediately. + /// + /// If unset, defaults to `networking.message-size-limit`. 
If set, it will be clamped at + /// the value of `networking.message-size-limit` since larger records cannot be transmitted + /// over the cluster internal network. + #[serde(skip_serializing_if = "Option::is_none")] + // Hide the configuration until record size estimation is implemented; currently this option is + // not effective due to the fixed size estimation of typed records. + #[cfg_attr(feature = "schemars", schemars(skip))] + record_size_limit: Option, } impl BifrostOptions { @@ -99,9 +115,34 @@ impl BifrostOptions { ) } + /// Returns the effective record size limit, falling back to `DEFAULT_MESSAGE_SIZE_LIMIT` when no value is set; outside of `test`/`test-util` builds, half of `MESSAGE_SIZE_OVERHEAD` (rounded up) is added on top. + pub fn record_size_limit(&self) -> NonZeroUsize { + let limit = self + .record_size_limit + .map(|v| v.as_non_zero_usize()) + .unwrap_or(DEFAULT_MESSAGE_SIZE_LIMIT); + + if cfg!(any(test, feature = "test-util")) { + // In tests, return the limit as-is without any extra overhead allowance + limit + } else { + // Allow half of MESSAGE_SIZE_OVERHEAD (rounded up) on top of the limit to account for the record envelope + limit.saturating_add(MESSAGE_SIZE_OVERHEAD.div_ceil(2)) + } + } + pub fn apply_common(&mut self, common: &CommonOptions) { self.local.apply_common(common); } + + /// Clamps the record size limit to `networking.message_size_limit`, or sets it to that value if unset. 
+ pub(crate) fn set_derived_values(&mut self, networking: &NetworkingOptions) { + self.record_size_limit = Some( + self.record_size_limit + .map(|limit| limit.min(networking.message_size_limit)) + .unwrap_or(networking.message_size_limit), + ); + } } impl Default for BifrostOptions { @@ -122,6 +163,7 @@ impl Default for BifrostOptions { seal_retry_interval: NonZeroFriendlyDuration::from_secs_unchecked(2), record_cache_memory_size: ByteCount::from(250u64 * 1024 * 1024), // 250 MiB disable_auto_improvement: false, + record_size_limit: None, } } } diff --git a/crates/types/src/config/mod.rs b/crates/types/src/config/mod.rs index 0e89978383..cb236a8b11 100644 --- a/crates/types/src/config/mod.rs +++ b/crates/types/src/config/mod.rs @@ -195,6 +195,7 @@ impl Configuration { .set_derived_values(&config.networking) .unwrap(); config.worker.set_derived_values(&config.networking); + config.bifrost.set_derived_values(&config.networking); config.admin.set_derived_values(&config.common); config.ingress.set_derived_values(&config.common); config @@ -208,6 +209,7 @@ impl Configuration { .set_derived_values(&config.networking) .unwrap(); config.worker.set_derived_values(&config.networking); + config.bifrost.set_derived_values(&config.networking); config.admin.set_derived_values(&config.common); config.ingress.set_derived_values(&config.common); config @@ -260,6 +262,7 @@ impl Configuration { pub fn apply_cascading_values(mut self) -> Self { self.worker.storage.apply_common(&self.common); self.bifrost.apply_common(&self.common); + self.bifrost.set_derived_values(&self.networking); self.metadata_server.apply_common(&self.common); self.log_server.apply_common(&self.common); self diff --git a/crates/worker/src/partition/leadership/mod.rs b/crates/worker/src/partition/leadership/mod.rs index ed13f73162..dd6fab9c4a 100644 --- a/crates/worker/src/partition/leadership/mod.rs +++ b/crates/worker/src/partition/leadership/mod.rs @@ -93,8 +93,8 @@ pub(crate) enum Error { Decode(#[from] 
StorageDecodeError), #[error(transparent)] Shutdown(#[from] ShutdownError), - #[error("error when self proposing")] - SelfProposer, + #[error("error when self proposing: {0}")] + SelfProposer(String), #[error("task '{name}' failed: {cause}")] TaskFailed { name: &'static str, diff --git a/crates/worker/src/partition/leadership/self_proposer.rs b/crates/worker/src/partition/leadership/self_proposer.rs index cb1e445e3a..79717085e6 100644 --- a/crates/worker/src/partition/leadership/self_proposer.rs +++ b/crates/worker/src/partition/leadership/self_proposer.rs @@ -106,7 +106,7 @@ impl SelfProposer { .sender() .enqueue_many(envelopes) .await - .map_err(|_| Error::SelfProposer)?; + .map_err(|e| Error::SelfProposer(e.to_string()))?; // update the sequence number range for the next batch self.epoch_sequence_number = EpochSequenceNumber { @@ -129,7 +129,7 @@ impl SelfProposer { .sender() .enqueue(Arc::new(envelope)) .await - .map_err(|_| Error::SelfProposer)?; + .map_err(|e| Error::SelfProposer(e.to_string()))?; Ok(()) } @@ -146,7 +146,7 @@ impl SelfProposer { .sender() .enqueue_with_notification(Arc::new(envelope)) .await - .map_err(|_| Error::SelfProposer)?; + .map_err(|e| Error::SelfProposer(e.to_string()))?; Ok(commit_token) } @@ -166,7 +166,7 @@ impl SelfProposer { // sender // .enqueue_many(records) // .await - // .map_err(|_| Error::SelfProposer)?; + // .map_err(|e| Error::SelfProposer(e.to_string()))?; // // so instead we do this. @@ -184,13 +184,13 @@ impl SelfProposer { sender .enqueue(input) .await - .map_err(|_| Error::SelfProposer)?; + .map_err(|e| Error::SelfProposer(e.to_string()))?; } sender .notify_committed() .await - .map_err(|_| Error::SelfProposer) + .map_err(|e| Error::SelfProposer(e.to_string())) } fn create_header(&mut self, partition_key: PartitionKey) -> Header {