1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions crates/bifrost/Cargo.toml
@@ -56,6 +56,7 @@ restate-bifrost = {path = ".", default-features = false, features = ["local-logl
restate-core = { workspace = true, features = ["test-util"] }
restate-log-server = { workspace = true }
restate-rocksdb = { workspace = true }
restate-serde-util = { workspace = true }
restate-storage-api = { workspace = true }
restate-test-util = { workspace = true }
restate-types = { workspace = true, features = ["test-util"] }
28 changes: 28 additions & 0 deletions crates/bifrost/src/appender.rs
@@ -8,6 +8,7 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::num::NonZeroUsize;
use std::sync::Arc;
use std::time::{Duration, Instant};

@@ -57,6 +58,11 @@ impl Appender {
}
}

/// Returns the record size limit for this appender.
pub fn record_size_limit(&self) -> NonZeroUsize {
self.config.pinned().bifrost.record_size_limit()
}

/// Marks this node as a preferred writer for the underlying log
pub fn mark_as_preferred(&mut self) {
if self.preference_token.is_none() {
@@ -83,6 +89,16 @@ impl Appender {
body: impl Into<InputRecord<T>>,
) -> Result<Lsn> {
let body = body.into().into_record();
// Validate the record size before attempting to append
let batch_size_bytes = body.estimated_encode_size();
let limit = self.record_size_limit();
if batch_size_bytes > limit.get() {
return Err(Error::BatchTooLarge {
batch_size_bytes,
limit,
});
}

self.append_batch_erased(Arc::new([body])).await
}

@@ -103,12 +119,23 @@ impl Appender {
batch: Vec<impl Into<InputRecord<T>>>,
) -> Result<Lsn> {
let batch: Arc<[_]> = batch.into_iter().map(|r| r.into().into_record()).collect();
// Validate record sizes before attempting to append
let batch_size_bytes = batch.iter().map(|r| r.estimated_encode_size()).sum();
let limit = self.record_size_limit();
if batch_size_bytes > limit.get() {
return Err(Error::BatchTooLarge {
batch_size_bytes,
limit,
});
}
self.append_batch_erased(batch).await
}

pub(crate) async fn append_batch_erased(&mut self, batch: Arc<[Record]>) -> Result<Lsn> {
self.bifrost_inner.fail_if_shutting_down()?;

let retry_iter = self.config.live_load().bifrost.append_retry_policy();

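// The append retry policy is expected to be unbounded; appends keep retrying
// until they succeed or the node shuts down.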
debug_assert!(retry_iter.max_attempts().is_none());
let mut retry_iter = retry_iter.into_iter();

@@ -123,6 +150,7 @@
.bifrost
.auto_recovery_interval
.into();

let loglet = match self.loglet_cache.as_mut() {
None => self
.loglet_cache
34 changes: 28 additions & 6 deletions crates/bifrost/src/background_appender.rs
@@ -8,6 +8,8 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::num::NonZeroUsize;

use futures::FutureExt;
use pin_project::pin_project;
use restate_types::logs::Record;
@@ -57,6 +59,7 @@
/// behaviour to the owner of [`AppenderHandle`] to drain or drop when appropriate.
pub fn start(self, name: &'static str) -> Result<AppenderHandle<T>, ShutdownError> {
let (tx, rx) = tokio::sync::mpsc::channel(self.queue_capacity);
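// Capture the configured record size limit once so the sender half can
// reject oversized records before they ever reach the appender task.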
let record_size_limit = self.appender.record_size_limit();

let handle = TaskCenter::spawn_unmanaged_child(
restate_core::TaskKind::BifrostAppender,
@@ -68,6 +71,7 @@
inner_handle: Some(handle),
sender: Some(LogSender {
tx,
record_size_limit,
_phantom: std::marker::PhantomData,
}),
})
Expand Down Expand Up @@ -230,10 +234,22 @@ impl<T> AppenderHandle<T> {
#[derive(Clone)]
pub struct LogSender<T> {
tx: tokio::sync::mpsc::Sender<AppendOperation>,
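/// Maximum estimated encoded size, in bytes, accepted for a single record.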
record_size_limit: NonZeroUsize,
_phantom: std::marker::PhantomData<T>,
}

impl<T: StorageEncode> LogSender<T> {
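/// Checks a record's estimated encoded size against the configured limit,
/// returning `EnqueueError::RecordTooLarge` if it would exceed it.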
fn check_record_size<E>(&self, record: &Record) -> Result<(), EnqueueError<E>> {
let record_size = record.estimated_encode_size();
if record_size > self.record_size_limit.get() {
return Err(EnqueueError::RecordTooLarge {
record_size,
limit: self.record_size_limit,
});
}
Ok(())
}

/// Attempt to enqueue a record to the appender. Returns immediately if the
/// appender is pushing back or if the appender is draining or drained.
pub fn try_enqueue<A>(&self, record: A) -> Result<(), EnqueueError<A>>
@@ -247,6 +263,7 @@ impl<T: StorageEncode> LogSender<T> {
};

let record = record.into().into_record();
self.check_record_size(&record)?;
permit.send(AppendOperation::Enqueue(record));
Ok(())
}
@@ -267,6 +284,7 @@ impl<T: StorageEncode> LogSender<T> {

let (tx, rx) = oneshot::channel();
let record = record.into().into_record();
self.check_record_size(&record)?;
permit.send(AppendOperation::EnqueueWithNotification(record, tx));
Ok(CommitToken { rx })
}
@@ -281,6 +299,7 @@ impl<T: StorageEncode> LogSender<T> {
return Err(EnqueueError::Closed(record));
};
let record = record.into().into_record();
self.check_record_size(&record)?;
permit.send(AppendOperation::Enqueue(record));

Ok(())
@@ -303,7 +322,9 @@ impl<T: StorageEncode> LogSender<T> {
};

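// Note: if a record partway through the batch exceeds the limit, the
// records before it have already been enqueued.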
for (permit, record) in std::iter::zip(permits, records) {
permit.send(AppendOperation::Enqueue(record.into().into_record()));
let record = record.into().into_record();
self.check_record_size(&record)?;
permit.send(AppendOperation::Enqueue(record));
}
Ok(())
}
@@ -323,7 +344,9 @@ impl<T: StorageEncode> LogSender<T> {
};

for (permit, record) in std::iter::zip(permits, records) {
permit.send(AppendOperation::Enqueue(record.into().into_record()));
let record = record.into().into_record();
self.check_record_size(&record)?;
permit.send(AppendOperation::Enqueue(record));
}

Ok(())
@@ -343,10 +366,9 @@ impl<T: StorageEncode> LogSender<T> {
};

let (tx, rx) = oneshot::channel();
permit.send(AppendOperation::EnqueueWithNotification(
record.into().into_record(),
tx,
));
let record = record.into().into_record();
self.check_record_size(&record)?;
permit.send(AppendOperation::EnqueueWithNotification(record, tx));

Ok(CommitToken { rx })
}
138 changes: 138 additions & 0 deletions crates/bifrost/src/bifrost.rs
@@ -758,6 +758,7 @@ impl Drop for PreferenceToken {
mod tests {
use super::*;

use std::num::NonZeroUsize;
use std::sync::atomic::AtomicUsize;

use futures::StreamExt;
@@ -770,14 +771,21 @@ mod tests {
use restate_core::TestCoreEnvBuilder;
use restate_core::{TaskCenter, TaskKind, TestCoreEnv};
use restate_rocksdb::RocksDbManager;
use restate_types::config::set_current_config;
use restate_types::logs::SequenceNumber;
use restate_types::logs::metadata::{SegmentIndex, new_single_node_loglet_params};
use restate_types::metadata::Precondition;
use restate_types::partition_table::PartitionTable;
use restate_types::{Version, Versioned};

use crate::error::EnqueueError;
use crate::providers::memory_loglet::{self};

// Helper to create a small byte count for testing
fn small_byte_limit(bytes: usize) -> restate_serde_util::NonZeroByteCount {
restate_serde_util::NonZeroByteCount::new(NonZeroUsize::new(bytes).unwrap())
}

#[restate_core::test]
#[traced_test]
async fn test_append_smoke() -> googletest::Result<()> {
@@ -1287,4 +1295,134 @@
RocksDbManager::get().shutdown().await;
Ok(())
}

#[restate_core::test]
async fn test_append_record_too_large() -> googletest::Result<()> {
// Set up a configuration with a small record size limit.
// Note: The estimated_encode_size for a typed record (e.g., String) is ~2KB constant,
// plus overhead for Keys and NanosSinceEpoch. We set the limit to 1KB to ensure
// the check triggers.
let mut config = restate_types::config::Configuration::default();
config.networking.message_size_limit = small_byte_limit(1024); // 1KB limit
let config = config.apply_cascading_values();
set_current_config(config);

let env = TestCoreEnv::create_with_single_node(1, 1).await;
let bifrost = Bifrost::init_in_memory(env.metadata_writer).await;

// Get the configured record size limit
let record_size_limit = restate_types::config::Configuration::pinned()
.bifrost
.record_size_limit();
assert_eq!(record_size_limit.get(), 1024);

// Any record will have an estimated size of ~2KB+ due to the constant estimate
// for PolyBytes::Typed, which exceeds our 1KB limit
let payload = "test";

let mut appender = bifrost.create_appender(LogId::new(0), ErrorRecoveryStrategy::Wait)?;

// Verify the appender has the correct limit
assert_eq!(appender.record_size_limit().get(), 1024);

// Attempting to append should fail with BatchTooLarge
let result = appender.append(payload).await;

assert_that!(
result,
pat!(Err(pat!(Error::BatchTooLarge {
batch_size_bytes: gt(1024),
limit: eq(record_size_limit),
})))
);

Ok(())
}

#[restate_core::test]
async fn test_background_appender_record_too_large() -> googletest::Result<()> {
// Set up configuration with a small record size limit (100 bytes).
// Note: The estimated_encode_size for any typed record is ~2KB constant,
// so even "small" strings will exceed the 100 byte limit.
let mut config = restate_types::config::Configuration::default();
config.networking.message_size_limit = small_byte_limit(100);
// Apply cascading values to propagate the networking limit to bifrost
let config = config.apply_cascading_values();
set_current_config(config);

let env = TestCoreEnv::create_with_single_node(1, 1).await;
let bifrost = Bifrost::init_in_memory(env.metadata_writer).await;

let background_appender: crate::BackgroundAppender<String> = bifrost
.create_background_appender(LogId::new(0), ErrorRecoveryStrategy::Wait, 10, 10)?;

let handle = background_appender.start("test-appender")?;
let sender = handle.sender();

// Any record will have an estimated size of ~2KB due to PolyBytes::Typed constant estimate
let payload = "test".to_string();

// try_enqueue should fail with RecordTooLarge
let result = sender.try_enqueue(payload.clone());
assert_that!(
result,
pat!(Err(pat!(EnqueueError::RecordTooLarge {
record_size: gt(100),
limit: eq(NonZeroUsize::new(100).unwrap()),
})))
);

// enqueue (async) should also fail with RecordTooLarge
let result = sender.enqueue(payload.clone()).await;
assert_that!(
result,
pat!(Err(pat!(EnqueueError::RecordTooLarge {
record_size: gt(100),
limit: eq(NonZeroUsize::new(100).unwrap()),
})))
);

// try_enqueue_with_notification should also fail
let result = sender.try_enqueue_with_notification(payload.clone());
assert!(matches!(
result,
Err(EnqueueError::RecordTooLarge {
record_size,
limit,
}) if record_size > 100 && limit.get() == 100
));

// Drain the appender (nothing should have been enqueued)
handle.drain().await?;

Ok(())
}

#[restate_core::test]
async fn test_background_appender_record_within_limit() -> googletest::Result<()> {
// Set up configuration with a large enough record size limit (10KB) to allow records
let mut config = restate_types::config::Configuration::default();
config.networking.message_size_limit = small_byte_limit(10 * 1024); // 10KB
let config = config.apply_cascading_values();
set_current_config(config);

let env = TestCoreEnv::create_with_single_node(1, 1).await;
let bifrost = Bifrost::init_in_memory(env.metadata_writer).await;

let background_appender: crate::BackgroundAppender<String> = bifrost
.create_background_appender(LogId::new(0), ErrorRecoveryStrategy::Wait, 10, 10)?;

let handle = background_appender.start("test-appender")?;
let sender = handle.sender();

// With a 10KB limit, the ~2KB estimated record should succeed
let payload = "test".to_string();
sender.enqueue(payload).await?;

// Drain and wait for commit
handle.drain().await?;

Ok(())
}
}
11 changes: 11 additions & 0 deletions crates/bifrost/src/error.rs
@@ -8,6 +8,7 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::num::NonZeroUsize;
use std::sync::Arc;

use restate_core::{ShutdownError, SyncError};
@@ -43,6 +44,11 @@ pub enum Error {
AdminError(#[from] AdminError),
#[error(transparent)]
MetadataStoreError(#[from] Arc<ReadWriteError>),
#[error("record batch too large: {batch_size_bytes} bytes exceeds limit of {limit} bytes")]
BatchTooLarge {
batch_size_bytes: usize,
limit: NonZeroUsize,
},
#[error("{0}")]
Other(String),
}
@@ -53,6 +59,11 @@ pub enum EnqueueError<T> {
Full(T),
#[error("appender is draining, closed, or crashed")]
Closed(T),
#[error("record too large: {record_size} bytes exceeds limit of {limit} bytes")]
RecordTooLarge {
record_size: usize,
limit: NonZeroUsize,
},
}

#[derive(Clone, Debug, thiserror::Error)]