Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/bifrost/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ restate-bifrost = {path = ".", default-features = false, features = ["local-logl
restate-core = { workspace = true, features = ["test-util"] }
restate-log-server = { workspace = true }
restate-rocksdb = { workspace = true }
restate-serde-util = { workspace = true }
restate-storage-api = { workspace = true }
restate-test-util = { workspace = true }
restate-types = { workspace = true, features = ["test-util"] }
Expand Down
28 changes: 28 additions & 0 deletions crates/bifrost/src/appender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::num::NonZeroUsize;
use std::sync::Arc;
use std::time::{Duration, Instant};

Expand Down Expand Up @@ -57,6 +58,11 @@ impl Appender {
}
}

/// The maximum permitted encoded size (in bytes) for an appended record
/// batch, read from the pinned bifrost configuration snapshot.
pub fn record_size_limit(&self) -> NonZeroUsize {
    let config = self.config.pinned();
    config.bifrost.record_size_limit()
}

/// Marks this node as a preferred writer for the underlying log
pub fn mark_as_preferred(&mut self) {
if self.preference_token.is_none() {
Expand All @@ -83,6 +89,16 @@ impl Appender {
body: impl Into<InputRecord<T>>,
) -> Result<Lsn> {
let body = body.into().into_record();
// Validate record sizes before attempting to append
let batch_size_bytes = body.estimated_encode_size();
let limit = self.record_size_limit();
if batch_size_bytes > limit.get() {
return Err(Error::BatchTooLarge {
batch_size_bytes,
limit,
});
}

self.append_batch_erased(Arc::new([body])).await
}

Expand All @@ -103,12 +119,23 @@ impl Appender {
batch: Vec<impl Into<InputRecord<T>>>,
) -> Result<Lsn> {
let batch: Arc<[_]> = batch.into_iter().map(|r| r.into().into_record()).collect();
let batch_size_bytes = batch.iter().map(|r| r.estimated_encode_size()).sum();
// Validate record sizes before attempting to append
let limit = self.record_size_limit();
if batch_size_bytes > limit.get() {
return Err(Error::BatchTooLarge {
batch_size_bytes,
limit,
});
}
self.append_batch_erased(batch).await
}

pub(crate) async fn append_batch_erased(&mut self, batch: Arc<[Record]>) -> Result<Lsn> {
self.bifrost_inner.fail_if_shutting_down()?;

let retry_iter = self.config.live_load().bifrost.append_retry_policy();

debug_assert!(retry_iter.max_attempts().is_none());
let mut retry_iter = retry_iter.into_iter();

Expand All @@ -123,6 +150,7 @@ impl Appender {
.bifrost
.auto_recovery_interval
.into();

let loglet = match self.loglet_cache.as_mut() {
None => self
.loglet_cache
Expand Down
Loading
Loading