Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ restate-utoipa = { path = "crates/utoipa" }
restate-vqueues = { path = "crates/vqueues" }
restate-wal-protocol = { path = "crates/wal-protocol" }
restate-worker = { path = "crates/worker" }
restate-ingestion-client = { path = "crates/ingestion-client" }

# this workspace-hack package is overridden by a patch below to use workspace-hack subdir when building in this repo
# outside this repo, the crates.io restate-workspace-hack (an empty package) will be used instead
Expand Down
24 changes: 9 additions & 15 deletions crates/bifrost/src/background_appender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,10 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::sync::Arc;

use futures::FutureExt;
use pin_project::pin_project;
use restate_types::logs::Record;
use tokio::sync::{Notify, mpsc, oneshot};
use tokio::sync::{mpsc, oneshot};
use tracing::{trace, warn};

use restate_core::{ShutdownError, TaskCenter, TaskHandle, cancellation_watcher};
Expand Down Expand Up @@ -130,8 +128,8 @@ where
batch.push(record);
notif_buffer.push(tx);
}
AppendOperation::Canary(notify) => {
notify.notify_one();
AppendOperation::Canary(tx) => {
notif_buffer.push(tx);
}
AppendOperation::MarkAsPreferred => {
appender.mark_as_preferred();
Expand Down Expand Up @@ -353,23 +351,19 @@ impl<T: StorageEncode> LogSender<T> {
Ok(CommitToken { rx })
}

/// Wait for previously enqueued records to be committed
///
/// Not cancellation safe. Every call will attempt to acquire capacity on the channel and send
/// a new message to the appender.
pub async fn notify_committed(&self) -> Result<(), EnqueueError<()>> {
/// Returns a [`CommitToken`] that is resolved once all previously enqueued records are committed.
pub async fn notify_committed(&self) -> Result<CommitToken, EnqueueError<()>> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe update the docs to reflect the new contract.

let Ok(permit) = self.tx.reserve().await else {
// channel is closed; this happens when the appender is draining or has already been
// drained
return Err(EnqueueError::Closed(()));
};

let notify = Arc::new(Notify::new());
let canary = AppendOperation::Canary(notify.clone());
let (tx, rx) = oneshot::channel();
let canary = AppendOperation::Canary(tx);
permit.send(canary);

notify.notified().await;
Ok(())
Ok(CommitToken { rx })
}

/// Marks this node as a preferred writer for the underlying log
Expand Down Expand Up @@ -422,7 +416,7 @@ enum AppendOperation {
EnqueueWithNotification(Record, oneshot::Sender<()>),
// A message denoting a request to be notified when it's processed by the appender.
// It's used to check if previously enqueued appends have been committed or not
Canary(Arc<Notify>),
Canary(oneshot::Sender<()>),
/// Lets bifrost know that this node is the preferred writer of this log
MarkAsPreferred,
/// Lets bifrost know that this node might not be the preferred writer of this log
Expand Down
30 changes: 30 additions & 0 deletions crates/ingestion-client/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
[package]
name = "restate-ingestion-client"
version.workspace = true
authors.workspace = true
edition.workspace = true
rust-version.workspace = true
license.workspace = true
publish = false

[dependencies]
arc-swap = { workspace = true }
dashmap = { workspace = true }
futures = { workspace = true }
pin-project = { workspace = true }
thiserror = { workspace = true }
tokio-stream = { workspace = true }
tokio-util = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }

restate-core = { workspace = true }
restate-types = { workspace = true }
restate-workspace-hack = { workspace = true }

[dev-dependencies]
bytes = { workspace = true }
googletest = { workspace = true }
test-log = { workspace = true }

restate-core = { workspace = true, features = ["test-util"] }
144 changes: 144 additions & 0 deletions crates/ingestion-client/src/chunks_size.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use tokio_stream::adapters::Fuse;
use tokio_stream::{Stream, StreamExt};

use core::pin::Pin;
use core::task::{Context, Poll};

/// Stream adapter that groups the items of an inner stream into `Vec` chunks.
///
/// Each item is weighed with `size_fn`; items are accumulated into the current chunk for as
/// long as the accumulated size stays within `cap`. A chunk is also emitted early whenever
/// the inner stream has no item ready, and an item that is larger than `cap` on its own is
/// emitted as a single-item chunk.
#[must_use = "streams do nothing unless polled"]
#[derive(Debug)]
#[pin_project::pin_project]
pub struct ChunksSize<F, S: Stream> {
/// Inner stream. It is fused so that it can still be polled safely after it has
/// returned `None` once.
#[pin]
stream: Fuse<S>,
/// Items buffered for the chunk that is currently being built.
items: Vec<S::Item>,
/// Accumulated `size_fn` value of the items currently held in `items`.
size: usize,
/// Maximum accumulated size of a chunk, in `size_fn` units (not a number of items).
// NOTE(review): the link below looks inherited from the futures-rs `Chunks` adapter, where
// `cap` is an item count; confirm that it is still relevant here.
cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475
/// Function computing the size that an item contributes to a chunk.
size_fn: F,
}

impl<F, S: Stream> ChunksSize<F, S>
where
    F: Fn(&S::Item) -> usize,
{
    /// Creates an adapter over `stream` that emits chunks whose accumulated size, as measured
    /// by `size_fn`, does not exceed `max_size`.
    ///
    /// The inner stream is fused, and the adapter starts out with an empty buffer.
    pub fn new(stream: S, max_size: usize, size_fn: F) -> Self {
        let stream = stream.fuse();
        Self {
            stream,
            items: Vec::new(),
            size: 0,
            cap: max_size,
            size_fn,
        }
    }

    /// Consumes the adapter and returns the items that were buffered but not yet emitted as a
    /// chunk.
    pub fn into_remainder(self) -> Vec<S::Item> {
        let Self { items, .. } = self;
        items
    }
}

impl<F, S: Stream> Stream for ChunksSize<F, S>
where
    F: Fn(&S::Item) -> usize,
{
    type Item = Vec<S::Item>;

    /// Produces the next chunk.
    ///
    /// Items are pulled from the inner stream for as long as it has items ready and they fit
    /// within the size cap. A chunk is emitted when:
    /// - the next item would push the accumulated size over the cap (that item starts the
    ///   next chunk, or is emitted alone if nothing is buffered),
    /// - the inner stream is pending while items are buffered, or
    /// - the inner stream has ended while items are buffered.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut me = self.as_mut().project();
        let cap = *me.cap;
        loop {
            match me.stream.as_mut().poll_next(cx) {
                // Nothing buffered and nothing ready: wait for the inner stream to wake us.
                Poll::Pending if me.items.is_empty() => return Poll::Pending,
                // Do not hold buffered items back while the inner stream is idle.
                Poll::Pending => {
                    *me.size = 0;
                    return Poll::Ready(Some(std::mem::take(me.items)));
                }
                Poll::Ready(Some(item)) => {
                    let item_size = (me.size_fn)(&item);

                    // `checked_add` makes an overflowing total count as "does not fit" rather
                    // than panicking (debug) or wrapping around (release).
                    let new_size = me
                        .size
                        .checked_add(item_size)
                        .filter(|total| *total <= cap);

                    match new_size {
                        Some(total) => {
                            *me.size = total;
                            me.items.push(item);
                        }
                        // An item that exceeds the cap on its own becomes a one-item chunk.
                        None if me.items.is_empty() => {
                            *me.size = 0;
                            return Poll::Ready(Some(vec![item]));
                        }
                        // Emit what is buffered; the new item starts the next chunk.
                        None => {
                            let items = std::mem::replace(me.items, vec![item]);
                            *me.size = item_size;
                            return Poll::Ready(Some(items));
                        }
                    }
                }
                Poll::Ready(None) => {
                    // Returning Some here is only correct because we fuse the inner stream.
                    *me.size = 0;
                    let last = if me.items.is_empty() {
                        None
                    } else {
                        Some(std::mem::take(me.items))
                    };

                    return Poll::Ready(last);
                }
            }
        }
    }

    /// Returns bounds on the number of chunks that are still to be emitted.
    ///
    /// The cap is expressed in `size_fn` units, so the number of remaining items says little
    /// about how many chunks they will form: they may all fit into a single chunk, or each
    /// may end up in a chunk of its own. The lower bound is therefore 1 if anything is
    /// buffered or guaranteed to arrive and 0 otherwise; the upper bound is one chunk per
    /// remaining item plus one for the currently buffered items.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let buffered_chunk = usize::from(!self.items.is_empty());
        let (inner_lower, inner_upper) = self.stream.size_hint();
        let lower = usize::from(buffered_chunk == 1 || inner_lower > 0);
        let upper = inner_upper.and_then(|x| x.checked_add(buffered_chunk));
        (lower, upper)
    }
}

#[cfg(test)]
mod tests {
    use std::task::Poll;

    use futures::{StreamExt, stream::poll_fn};
    use tokio_stream::iter;

    use super::ChunksSize;

    /// Items are grouped greedily while their summed size stays within the cap.
    #[tokio::test]
    async fn splits_into_size_bound_chunks() {
        let source = iter([2usize, 2, 3, 1]);
        let chunked = ChunksSize::new(source, 5, |n| *n);
        let chunks = chunked.collect::<Vec<Vec<usize>>>().await;

        assert_eq!(chunks, vec![vec![2, 2], vec![3, 1]]);
    }

    /// An item that exceeds the cap on its own is still emitted, as a single-item chunk.
    #[tokio::test]
    async fn emits_item_larger_than_cap_as_its_own_chunk() {
        let source = iter([10usize, 2]);
        let chunked = ChunksSize::new(source, 5, |n| *n);
        let chunks = chunked.collect::<Vec<Vec<usize>>>().await;

        assert_eq!(chunks, vec![vec![10], vec![2]]);
    }

    /// A pending inner stream flushes the buffered items even though the cap is not reached.
    #[tokio::test]
    async fn flushes_buffer_when_inner_stream_is_pending() {
        // Scripted inner stream: one item, then pending, then another item, then the end.
        let mut polls = 0;
        let source = poll_fn(move |_| {
            polls += 1;
            match polls {
                1 => Poll::Ready(Some(1usize)),
                2 => Poll::Pending,
                3 => Poll::Ready(Some(2usize)),
                _ => Poll::Ready(None),
            }
        });

        let chunked = ChunksSize::new(source, 10, |n| *n);
        let chunks = chunked.collect::<Vec<Vec<usize>>>().await;

        assert_eq!(chunks, vec![vec![1], vec![2]]);
    }
}
Loading
Loading