Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/bifrost/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ restate-bifrost = {path = ".", default-features = false, features = ["local-logl
restate-core = { workspace = true, features = ["test-util"] }
restate-log-server = { workspace = true }
restate-rocksdb = { workspace = true }
restate-serde-util = { workspace = true }
restate-storage-api = { workspace = true }
restate-test-util = { workspace = true }
restate-types = { workspace = true, features = ["test-util"] }
Expand Down
28 changes: 28 additions & 0 deletions crates/bifrost/src/appender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::num::NonZeroUsize;
use std::sync::Arc;
use std::time::{Duration, Instant};

Expand Down Expand Up @@ -57,6 +58,11 @@ impl Appender {
}
}

/// Reports the record size limit that applies to this appender.
///
/// The value is obtained from the `bifrost` section of the pinned configuration.
pub fn record_size_limit(&self) -> NonZeroUsize {
    let pinned = self.config.pinned();
    pinned.bifrost.record_size_limit()
}

/// Marks this node as a preferred writer for the underlying log
pub fn mark_as_preferred(&mut self) {
if self.preference_token.is_none() {
Expand All @@ -83,6 +89,16 @@ impl Appender {
body: impl Into<InputRecord<T>>,
) -> Result<Lsn> {
let body = body.into().into_record();
// Validate record sizes before attempting to append
let batch_size_bytes = body.estimated_encode_size();
let limit = self.record_size_limit();
if batch_size_bytes > limit.get() {
return Err(Error::BatchTooLarge {
batch_size_bytes,
limit,
});
}

self.append_batch_erased(Arc::new([body])).await
}

Expand All @@ -103,12 +119,23 @@ impl Appender {
batch: Vec<impl Into<InputRecord<T>>>,
) -> Result<Lsn> {
let batch: Arc<[_]> = batch.into_iter().map(|r| r.into().into_record()).collect();
let batch_size_bytes = batch.iter().map(|r| r.estimated_encode_size()).sum();
// Validate record sizes before attempting to append
let limit = self.record_size_limit();
if batch_size_bytes > limit.get() {
return Err(Error::BatchTooLarge {
batch_size_bytes,
limit,
});
}
self.append_batch_erased(batch).await
}

pub(crate) async fn append_batch_erased(&mut self, batch: Arc<[Record]>) -> Result<Lsn> {
self.bifrost_inner.fail_if_shutting_down()?;

let retry_iter = self.config.live_load().bifrost.append_retry_policy();

debug_assert!(retry_iter.max_attempts().is_none());
let mut retry_iter = retry_iter.into_iter();

Expand All @@ -123,6 +150,7 @@ impl Appender {
.bifrost
.auto_recovery_interval
.into();

let loglet = match self.loglet_cache.as_mut() {
None => self
.loglet_cache
Expand Down
71 changes: 53 additions & 18 deletions crates/bifrost/src/background_appender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::num::NonZeroUsize;

use bytes::BytesMut;
use futures::FutureExt;
use pin_project::pin_project;
use restate_types::logs::Record;
Expand Down Expand Up @@ -57,6 +60,7 @@ where
/// behaviour to the owner of [`AppenderHandle`] to drain or drop when appropriate.
pub fn start(self, name: &'static str) -> Result<AppenderHandle<T>, ShutdownError> {
let (tx, rx) = tokio::sync::mpsc::channel(self.queue_capacity);
let record_size_limit = self.appender.record_size_limit();

let handle = TaskCenter::spawn_unmanaged_child(
restate_core::TaskKind::BifrostAppender,
Expand All @@ -67,7 +71,9 @@ where
Ok(AppenderHandle {
inner_handle: Some(handle),
sender: Some(LogSender {
arena: BytesMut::default(),
tx,
record_size_limit,
_phantom: std::marker::PhantomData,
}),
})
Expand Down Expand Up @@ -209,8 +215,8 @@ impl<T> AppenderHandle<T> {
}

/// If you need an owned LogSender, clone this.
pub fn sender(&self) -> &LogSender<T> {
self.sender.as_ref().unwrap()
pub fn sender(&mut self) -> &mut LogSender<T> {
self.sender.as_mut().unwrap()
}

/// Waits for the underlying appender task to finish.
Expand All @@ -227,16 +233,39 @@ impl<T> AppenderHandle<T> {
}
}

/// Sending half used to enqueue records for the background appender.
///
/// Bundles the channel over which `AppendOperation`s are sent, a scratch buffer
/// that is passed to `ensure_encoded` when a record is enqueued, and the size
/// limit that each enqueued record is checked against.
// NOTE(review): a hand-written `Clone` impl for this type also appears nearby; the
// derive and the manual impl cannot both apply. Presumably this derive is the
// removed side of the diff - confirm.
#[derive(Clone)]
pub struct LogSender<T> {
// Scratch buffer handed to `ensure_encoded` by the enqueue methods.
arena: BytesMut,
// Channel over which `AppendOperation`s are sent.
tx: tokio::sync::mpsc::Sender<AppendOperation>,
// Upper bound compared against a record's `estimated_encode_size()`.
record_size_limit: NonZeroUsize,
// Ties the sender to the record type `T` without storing a value of it.
_phantom: std::marker::PhantomData<T>,
}

/// Manual `Clone` so that no `T: Clone` bound is required.
impl<T> Clone for LogSender<T> {
    /// Produces a sender that shares the channel and size limit of `self`.
    ///
    /// The clone starts with a fresh, default `arena`; the buffer contents of
    /// `self` are not copied.
    fn clone(&self) -> Self {
        let Self {
            tx,
            record_size_limit,
            ..
        } = self;

        Self {
            arena: BytesMut::default(),
            tx: tx.clone(),
            record_size_limit: *record_size_limit,
            _phantom: std::marker::PhantomData,
        }
    }
}

impl<T: StorageEncode> LogSender<T> {
/// Verifies that `record` fits within this sender's record size limit.
///
/// Returns `EnqueueError::RecordTooLarge` carrying the estimated size and the
/// limit when the record's estimated encoded size exceeds the limit, and
/// `Ok(())` otherwise.
fn check_record_size<E>(&self, record: &Record) -> Result<(), EnqueueError<E>> {
    let limit = self.record_size_limit;
    match record.estimated_encode_size() {
        record_size if record_size > limit.get() => {
            Err(EnqueueError::RecordTooLarge { record_size, limit })
        }
        _ => Ok(()),
    }
}

/// Attempt to enqueue a record to the appender. Returns immediately if the
/// appender is pushing back or if the appender is draining or drained.
pub fn try_enqueue<A>(&self, record: A) -> Result<(), EnqueueError<A>>
pub fn try_enqueue<A>(&mut self, record: A) -> Result<(), EnqueueError<A>>
where
A: Into<InputRecord<T>>,
{
Expand All @@ -246,14 +275,15 @@ impl<T: StorageEncode> LogSender<T> {
Err(mpsc::error::TrySendError::Closed(_)) => return Err(EnqueueError::Closed(record)),
};

let record = record.into().into_record();
let record = record.into().into_record().ensure_encoded(&mut self.arena);
self.check_record_size(&record)?;
permit.send(AppendOperation::Enqueue(record));
Ok(())
}

/// Enqueues an append and returns a commit token
pub fn try_enqueue_with_notification<A>(
&self,
&mut self,
record: A,
) -> Result<CommitToken, EnqueueError<A>>
where
Expand All @@ -266,21 +296,23 @@ impl<T: StorageEncode> LogSender<T> {
};

let (tx, rx) = oneshot::channel();
let record = record.into().into_record();
let record = record.into().into_record().ensure_encoded(&mut self.arena);
self.check_record_size(&record)?;
permit.send(AppendOperation::EnqueueWithNotification(record, tx));
Ok(CommitToken { rx })
}

/// Waits for capacity on the channel and returns an error if the appender is
/// draining or drained.
pub async fn enqueue<A>(&self, record: A) -> Result<(), EnqueueError<A>>
pub async fn enqueue<A>(&mut self, record: A) -> Result<(), EnqueueError<A>>
where
A: Into<InputRecord<T>>,
{
let Ok(permit) = self.tx.reserve().await else {
return Err(EnqueueError::Closed(record));
};
let record = record.into().into_record();
let record = record.into().into_record().ensure_encoded(&mut self.arena);
self.check_record_size(&record)?;
permit.send(AppendOperation::Enqueue(record));

Ok(())
Expand All @@ -291,7 +323,7 @@ impl<T: StorageEncode> LogSender<T> {
///
/// Attempts to enqueue all records in the iterator. This will immediately return if there is
/// no capacity in the channel to enqueue _all_ records.
pub fn try_enqueue_many<I, A>(&self, records: I) -> Result<(), EnqueueError<I>>
pub fn try_enqueue_many<I, A>(&mut self, records: I) -> Result<(), EnqueueError<I>>
where
I: Iterator<Item = A> + ExactSizeIterator,
A: Into<InputRecord<T>>,
Expand All @@ -303,7 +335,9 @@ impl<T: StorageEncode> LogSender<T> {
};

for (permit, record) in std::iter::zip(permits, records) {
permit.send(AppendOperation::Enqueue(record.into().into_record()));
let record = record.into().into_record().ensure_encoded(&mut self.arena);
self.check_record_size(&record)?;
permit.send(AppendOperation::Enqueue(record));
}
Ok(())
}
Expand All @@ -313,7 +347,7 @@ impl<T: StorageEncode> LogSender<T> {
///
/// The method is cancel safe in the sense that if enqueue_many is used in a `tokio::select!`,
/// no records are enqueued if another branch completed.
pub async fn enqueue_many<I, A>(&self, records: I) -> Result<(), EnqueueError<I>>
pub async fn enqueue_many<I, A>(&mut self, records: I) -> Result<(), EnqueueError<I>>
where
I: Iterator<Item = A> + ExactSizeIterator,
A: Into<InputRecord<T>>,
Expand All @@ -323,7 +357,9 @@ impl<T: StorageEncode> LogSender<T> {
};

for (permit, record) in std::iter::zip(permits, records) {
permit.send(AppendOperation::Enqueue(record.into().into_record()));
let record = record.into().into_record().ensure_encoded(&mut self.arena);
self.check_record_size(&record)?;
permit.send(AppendOperation::Enqueue(record));
}

Ok(())
Expand All @@ -332,7 +368,7 @@ impl<T: StorageEncode> LogSender<T> {
/// Enqueues a record and returns a [`CommitToken`] future that's resolved when the record is
/// committed.
pub async fn enqueue_with_notification<A>(
&self,
&mut self,
record: A,
) -> Result<CommitToken, EnqueueError<A>>
where
Expand All @@ -343,10 +379,9 @@ impl<T: StorageEncode> LogSender<T> {
};

let (tx, rx) = oneshot::channel();
permit.send(AppendOperation::EnqueueWithNotification(
record.into().into_record(),
tx,
));
let record = record.into().into_record().ensure_encoded(&mut self.arena);
self.check_record_size(&record)?;
permit.send(AppendOperation::EnqueueWithNotification(record, tx));

Ok(CommitToken { rx })
}
Expand Down
Loading
Loading