Commit 6203301

[Bifrost] BackgroundAppender serializes payload to ensure correct size estimation
Part of #4130, #4132
1 parent c8ad011 commit 6203301
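
In short: a record body held as PolyBytes::Typed cannot know its encoded size, so LogSender's record_size_limit check compared against a constant estimate (~2KB, per the old test comment removed below) rather than the actual payload. Serializing at enqueue time makes the estimate exact. A minimal sketch of the idea, with a stand-in Encodeable trait and an illustrative constant, not the crate's actual definitions:

use std::sync::Arc;

use bytes::Bytes;

// Stand-in for the crate's storage-encode trait.
trait Encodeable {}

enum PolyBytes {
    // Already serialized: the length is known exactly.
    Bytes(Bytes),
    // Typed only: the encoded size can merely be guessed.
    Typed(Arc<dyn Encodeable>),
    // Introduced by this commit: the typed value plus its serialized form.
    Both(Arc<dyn Encodeable>, Bytes),
}

impl PolyBytes {
    fn estimated_encode_size(&self) -> usize {
        match self {
            // Exact once the bytes exist.
            PolyBytes::Bytes(b) | PolyBytes::Both(_, b) => b.len(),
            // Constant guess (illustrative): wildly off for small payloads,
            // which is what the RecordTooLarge test below exercises.
            PolyBytes::Typed(_) => 2 * 1024,
        }
    }
}

fn main() {
    let small = PolyBytes::Bytes(Bytes::from_static(b"test"));
    assert_eq!(small.estimated_encode_size(), 4); // exact, not ~2KB
}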

8 files changed: +79 −93 lines changed

crates/bifrost/src/background_appender.rs

Lines changed: 28 additions & 15 deletions
@@ -10,6 +10,7 @@
 
 use std::num::NonZeroUsize;
 
+use bytes::BytesMut;
 use futures::FutureExt;
 use pin_project::pin_project;
 use restate_types::logs::Record;
@@ -70,6 +71,7 @@ where
         Ok(AppenderHandle {
             inner_handle: Some(handle),
             sender: Some(LogSender {
+                arena: BytesMut::default(),
                 tx,
                 record_size_limit,
                 _phantom: std::marker::PhantomData,
@@ -213,8 +215,8 @@ impl<T> AppenderHandle<T> {
     }
 
     /// If you need an owned LogSender, clone this.
-    pub fn sender(&self) -> &LogSender<T> {
-        self.sender.as_ref().unwrap()
+    pub fn sender(&mut self) -> &mut LogSender<T> {
+        self.sender.as_mut().unwrap()
     }
 
     /// Waits for the underlying appender task to finish.
@@ -231,13 +233,24 @@ impl<T> AppenderHandle<T> {
     }
 }
 
-#[derive(Clone)]
 pub struct LogSender<T> {
+    arena: BytesMut,
     tx: tokio::sync::mpsc::Sender<AppendOperation>,
     record_size_limit: NonZeroUsize,
     _phantom: std::marker::PhantomData<T>,
 }
 
+impl<T> Clone for LogSender<T> {
+    fn clone(&self) -> Self {
+        Self {
+            arena: BytesMut::default(),
+            tx: self.tx.clone(),
+            record_size_limit: self.record_size_limit,
+            _phantom: std::marker::PhantomData,
+        }
+    }
+}
+
 impl<T: StorageEncode> LogSender<T> {
     fn check_record_size<E>(&self, record: &Record) -> Result<(), EnqueueError<E>> {
         let record_size = record.estimated_encode_size();
@@ -252,7 +265,7 @@ impl<T: StorageEncode> LogSender<T> {
 
     /// Attempt to enqueue a record to the appender. Returns immediately if the
     /// appender is pushing back or if the appender is draining or drained.
-    pub fn try_enqueue<A>(&self, record: A) -> Result<(), EnqueueError<A>>
+    pub fn try_enqueue<A>(&mut self, record: A) -> Result<(), EnqueueError<A>>
     where
         A: Into<InputRecord<T>>,
     {
@@ -262,15 +275,15 @@ impl<T: StorageEncode> LogSender<T> {
             Err(mpsc::error::TrySendError::Closed(_)) => return Err(EnqueueError::Closed(record)),
         };
 
-        let record = record.into().into_record();
+        let record = record.into().into_record().ensure_encoded(&mut self.arena);
         self.check_record_size(&record)?;
         permit.send(AppendOperation::Enqueue(record));
         Ok(())
     }
 
     /// Enqueues an append and returns a commit token
     pub fn try_enqueue_with_notification<A>(
-        &self,
+        &mut self,
         record: A,
     ) -> Result<CommitToken, EnqueueError<A>>
     where
@@ -283,22 +296,22 @@ impl<T: StorageEncode> LogSender<T> {
         };
 
         let (tx, rx) = oneshot::channel();
-        let record = record.into().into_record();
+        let record = record.into().into_record().ensure_encoded(&mut self.arena);
         self.check_record_size(&record)?;
         permit.send(AppendOperation::EnqueueWithNotification(record, tx));
         Ok(CommitToken { rx })
     }
 
     /// Waits for capacity on the channel and returns an error if the appender is
     /// draining or drained.
-    pub async fn enqueue<A>(&self, record: A) -> Result<(), EnqueueError<A>>
+    pub async fn enqueue<A>(&mut self, record: A) -> Result<(), EnqueueError<A>>
    where
         A: Into<InputRecord<T>>,
     {
         let Ok(permit) = self.tx.reserve().await else {
             return Err(EnqueueError::Closed(record));
         };
-        let record = record.into().into_record();
+        let record = record.into().into_record().ensure_encoded(&mut self.arena);
         self.check_record_size(&record)?;
         permit.send(AppendOperation::Enqueue(record));
 
@@ -310,7 +323,7 @@ impl<T: StorageEncode> LogSender<T> {
     ///
     /// Attempts to enqueue all records in the iterator. This will immediately return if there is
     /// no capacity in the channel to enqueue _all_ records.
-    pub fn try_enqueue_many<I, A>(&self, records: I) -> Result<(), EnqueueError<I>>
+    pub fn try_enqueue_many<I, A>(&mut self, records: I) -> Result<(), EnqueueError<I>>
     where
         I: Iterator<Item = A> + ExactSizeIterator,
         A: Into<InputRecord<T>>,
@@ -322,7 +335,7 @@ impl<T: StorageEncode> LogSender<T> {
         };
 
         for (permit, record) in std::iter::zip(permits, records) {
-            let record = record.into().into_record();
+            let record = record.into().into_record().ensure_encoded(&mut self.arena);
            self.check_record_size(&record)?;
             permit.send(AppendOperation::Enqueue(record));
         }
@@ -334,7 +347,7 @@ impl<T: StorageEncode> LogSender<T> {
     ///
     /// The method is cancel safe in the sense that if enqueue_many is used in a `tokio::select!`,
     /// no records are enqueued if another branch completed.
-    pub async fn enqueue_many<I, A>(&self, records: I) -> Result<(), EnqueueError<I>>
+    pub async fn enqueue_many<I, A>(&mut self, records: I) -> Result<(), EnqueueError<I>>
     where
         I: Iterator<Item = A> + ExactSizeIterator,
         A: Into<InputRecord<T>>,
@@ -344,7 +357,7 @@ impl<T: StorageEncode> LogSender<T> {
         };
 
         for (permit, record) in std::iter::zip(permits, records) {
-            let record = record.into().into_record();
+            let record = record.into().into_record().ensure_encoded(&mut self.arena);
             self.check_record_size(&record)?;
             permit.send(AppendOperation::Enqueue(record));
         }
@@ -355,7 +368,7 @@ impl<T: StorageEncode> LogSender<T> {
     /// Enqueues a record and returns a [`CommitToken`] future that's resolved when the record is
     /// committed.
     pub async fn enqueue_with_notification<A>(
-        &self,
+        &mut self,
         record: A,
     ) -> Result<CommitToken, EnqueueError<A>>
     where
@@ -366,7 +379,7 @@ impl<T: StorageEncode> LogSender<T> {
         };
 
         let (tx, rx) = oneshot::channel();
-        let record = record.into().into_record();
+        let record = record.into().into_record().ensure_encoded(&mut self.arena);
         self.check_record_size(&record)?;
         permit.send(AppendOperation::EnqueueWithNotification(record, tx));
 
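For callers, the visible consequence of the changes above: a LogSender can no longer be shared behind &self, because every enqueue serializes into the sender's private arena. The enqueue* methods take &mut self, and Clone is hand-written so each clone starts with a fresh arena. A hypothetical caller sketch, not code from this commit and not compiled against the crate:

// `appender` is assumed to be a crate::BackgroundAppender<String>, as in
// the tests further down; "my-appender" is an arbitrary task name.
let mut handle = appender.start("my-appender")?;

// sender() now hands out &mut LogSender<String> so that enqueue* can
// serialize into the sender-owned BytesMut arena.
let sender = handle.sender();
sender.enqueue("hello".to_string()).await?;

// Cloning still yields an owned sender; per the Clone impl above, the
// clone gets an empty arena rather than sharing the original's buffer.
let mut owned = sender.clone();
owned.try_enqueue("world".to_string())?;
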
crates/bifrost/src/bifrost.rs

Lines changed: 4 additions & 37 deletions
@@ -141,28 +141,6 @@ impl Bifrost {
             .await
     }
 
-    /// Appends a batch of records to a log. The log id must exist, otherwise the
-    /// operation fails with [`Error::UnknownLogId`]. The returned Lsn is the Lsn of the last
-    /// record in this batch. This will only return after all records have been stored.
-    #[instrument(
-        level="trace",
-        skip(self, batch),
-        fields(
-            otel.name = "Bifrost: append_batch",
-        )
-    )]
-    pub async fn append_batch<T: StorageEncode>(
-        &self,
-        log_id: LogId,
-        error_recovery_strategy: ErrorRecoveryStrategy,
-        batch: Vec<impl Into<InputRecord<T>>>,
-    ) -> Result<Lsn> {
-        self.inner.fail_if_shutting_down()?;
-        self.inner
-            .append_batch(log_id, error_recovery_strategy, batch)
-            .await
-    }
-
     /// Read the next record from the LSN provided. The `from` indicates the LSN where we will
     /// start reading from. This means that the record returned will have a LSN that is equal
     /// or greater than `from`. If no records are committed yet at this LSN, this read operation
@@ -343,17 +321,6 @@ impl BifrostInner {
             .await
     }
 
-    pub async fn append_batch<T: StorageEncode>(
-        self: &Arc<Self>,
-        log_id: LogId,
-        error_recovery_strategy: ErrorRecoveryStrategy,
-        batch: Vec<impl Into<InputRecord<T>>>,
-    ) -> Result<Lsn> {
-        Appender::new(log_id, error_recovery_strategy, Arc::clone(self))
-            .append_batch(batch)
-            .await
-    }
-
     #[cfg(any(test, feature = "test-util"))]
     pub async fn read(
         self: &Arc<Self>,
@@ -1357,11 +1324,11 @@
         let background_appender: crate::BackgroundAppender<String> = bifrost
             .create_background_appender(LogId::new(0), ErrorRecoveryStrategy::Wait, 10, 10)?;
 
-        let handle = background_appender.start("test-appender")?;
+        let mut handle = background_appender.start("test-appender")?;
         let sender = handle.sender();
 
-        // Any record will have an estimated size of ~2KB due to PolyBytes::Typed constant estimate
-        let payload = "test".to_string();
+        // A string with 100 bytes
+        let payload = String::from_utf8(vec![b't'; 100]).unwrap();
 
         // try_enqueue should fail with RecordTooLarge
         let result = sender.try_enqueue(payload.clone());
@@ -1413,7 +1380,7 @@
         let background_appender: crate::BackgroundAppender<String> = bifrost
             .create_background_appender(LogId::new(0), ErrorRecoveryStrategy::Wait, 10, 10)?;
 
-        let handle = background_appender.start("test-appender")?;
+        let mut handle = background_appender.start("test-appender")?;
         let sender = handle.sender();
 
         // With a 10KB limit, the ~2KB estimated record should succeed

crates/bifrost/src/providers/local_loglet/record_format.rs

Lines changed: 2 additions & 0 deletions
@@ -124,6 +124,7 @@ fn write_legacy_payload(record: &Record, serde_buffer: &mut BytesMut) -> BytesMut
     // encoding the user payload.
     let body = match record.body() {
         PolyBytes::Bytes(raw_bytes) => raw_bytes.clone(),
+        PolyBytes::Both(_, raw_bytes) => raw_bytes.clone(),
         PolyBytes::Typed(encodeable) => {
             StorageCodec::encode_and_split(encodeable.deref(), serde_buffer)
                 .expect("record serde is infallible")
@@ -183,6 +184,7 @@ fn write_record(record: &Record, buf: &mut BytesMut) -> BytesMut {
     // serialize payload
     match record.body() {
         PolyBytes::Bytes(raw_bytes) => buf.put_slice(raw_bytes),
+        PolyBytes::Both(_, raw_bytes) => buf.put_slice(raw_bytes),
         PolyBytes::Typed(encodeable) => {
             StorageCodec::encode(encodeable.deref(), buf).expect("record serde is infallible")
         }

crates/bifrost/src/providers/replicated_loglet/sequencer.rs

Lines changed: 0 additions & 19 deletions
@@ -15,7 +15,6 @@ use std::sync::{
     atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering},
 };
 
-use bytes::BytesMut;
 use crossbeam_utils::CachePadded;
 use tokio::sync::Semaphore;
 use tokio_util::{sync::CancellationToken, task::TaskTracker};
@@ -284,31 +283,13 @@
                 .fetch_add(len, std::sync::atomic::Ordering::Relaxed),
         );
 
-        // Add to cache before we commit those records, we also do this to try and cache the
-        // records before we transform/serialize their payloads.
-        //
         // The records being in cache does not mean they are committed, all readers must respect
         // the result of find_tail() or the global_known_tail.
        self.record_cache
             .extend(*self.sequencer_shared_state.loglet_id(), offset, &payloads);
 
         let (loglet_commit, commit_resolver) = LogletCommit::deferred();
 
-        // Why not in bifrost's appender? because we don't need serialization in all loglet providers.
-        // In-memory loglet won't require this setup and it would be a waste of cpu-cycles to serialize.
-        // estimate total to allocate one big allocation for all payloads. This is not particularly
-        // useful with flexbuffers, but will be useful when Store/Record are representative as
-        // protobuf or flatbuffers.
-        let estimated_bufsize: usize = payloads
-            .iter()
-            .map(|record| record.estimated_encode_size())
-            .sum();
-        let mut buf = BytesMut::with_capacity(estimated_bufsize);
-        let payloads = payloads
-            .iter()
-            .map(|record| record.to_encoded(&mut buf))
-            .collect();
-
         let appender = SequencerAppender::new(
             Arc::clone(&self.sequencer_shared_state),
             self.networking.clone(),

crates/types/src/logs/record.rs

Lines changed: 9 additions & 8 deletions
@@ -46,14 +46,15 @@
         size_of::<Keys>() + size_of::<NanosSinceEpoch>() + self.body.estimated_encode_size()
     }
 
-    pub fn to_encoded(&self, buf: &mut BytesMut) -> Self {
-        let keys = self.keys.clone();
+    pub fn ensure_encoded(self, buf: &mut BytesMut) -> Self {
+        let keys = self.keys;
         let created_at = self.created_at;
-        let body = match &self.body {
-            PolyBytes::Bytes(bytes) => PolyBytes::Bytes(bytes.clone()),
+        let body = match self.body {
+            PolyBytes::Bytes(bytes) => PolyBytes::Bytes(bytes),
+            PolyBytes::Both(v, bytes) => PolyBytes::Both(v, bytes),
             PolyBytes::Typed(typed) => {
-                StorageCodec::encode(&**typed, buf).expect("serde is infallible");
-                PolyBytes::Bytes(buf.split().freeze())
+                StorageCodec::encode(&*typed, buf).expect("serde is infallible");
+                PolyBytes::Both(typed, buf.split().freeze())
             }
         };
         Self {
@@ -83,7 +84,7 @@
             let mut buf = std::io::Cursor::new(slice);
             StorageCodec::decode(&mut buf)?
         }
-        PolyBytes::Typed(value) => {
+        PolyBytes::Both(value, _) | PolyBytes::Typed(value) => {
            let target_arc: Arc<T> = value.downcast_arc().map_err(|_| {
                 StorageDecodeError::DecodeValue(
                     anyhow::anyhow!(
@@ -113,7 +114,7 @@
             let mut buf = std::io::Cursor::new(slice);
             Arc::new(StorageCodec::decode(&mut buf)?)
         }
-        PolyBytes::Typed(value) => {
+        PolyBytes::Typed(value) | PolyBytes::Both(value, _) => {
             value.downcast_arc().map_err(|_| {
                 StorageDecodeError::DecodeValue(
                     anyhow::anyhow!(
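
The renamed ensure_encoded now consumes the record, serializes a Typed body into the caller-provided BytesMut, and keeps both representations as PolyBytes::Both. The buf.split().freeze() idiom is what lets a single arena serve many records without reallocating; here is a self-contained sketch of just that pattern, with encode_into_arena as an illustrative stand-in for StorageCodec::encode:

use bytes::{BufMut, Bytes, BytesMut};

// Encode into a reusable arena, then hand out an immutable Bytes view of
// what was just written while the arena stays usable for the next record.
fn encode_into_arena(payload: &str, arena: &mut BytesMut) -> Bytes {
    arena.put_slice(payload.as_bytes()); // stand-in for the real codec
    arena.split().freeze()
}

fn main() {
    let mut arena = BytesMut::with_capacity(4096);
    let a = encode_into_arena("first record", &mut arena);
    let b = encode_into_arena("second record", &mut arena);
    assert_eq!(&a[..], b"first record");
    assert_eq!(&b[..], b"second record");
    assert!(arena.is_empty()); // emptied by split(), ready for reuse
}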

crates/types/src/logs/record_cache.rs

Lines changed: 26 additions & 8 deletions
@@ -8,12 +8,16 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.
 
+use std::sync::Arc;
+
 use moka::{
     ops::compute::Op,
     policy::EvictionPolicy,
     sync::{Cache, CacheBuilder},
 };
 
+use crate::storage::PolyBytes;
+
 use super::{LogletId, LogletOffset, Record, SequenceNumber};
 
 /// Unique record key across different loglets.
@@ -64,14 +68,28 @@
             let Some(existing) = existing else {
                 return Op::Put(record.clone());
             };
-            match (
-                existing.value().body().is_encoded(),
-                record.body().is_encoded(),
-            ) {
-                // both are encoded, we don't want to replace the existing value.
-                (true, true) | (false, false) | (false, true) => Op::Nop,
-                // replace the existing value if the new one is deserialized.
-                (true, false) => Op::Put(record.clone()),
+            match (existing.value().body(), record.body()) {
+                (PolyBytes::Bytes(_), PolyBytes::Bytes(_)) => Op::Nop,
+                (PolyBytes::Bytes(_), PolyBytes::Typed(_)) => Op::Put(record.clone()),
+                (PolyBytes::Bytes(_), PolyBytes::Both(typed, _)) => {
+                    // we only need to cache the typed value, let's repackage it.
+                    Op::Put(Record::from_parts(
+                        record.created_at(),
+                        record.keys().clone(),
+                        PolyBytes::Typed(Arc::clone(typed)),
+                    ))
+                }
+                // Shouldn't happen (we only cache Typed or Bytes), but let's handle it anyway.
+                (PolyBytes::Both(typed, _), _) =>
+                // repackage the existing value into Typed only
+                {
+                    Op::Put(Record::from_parts(
+                        existing.value().created_at(),
+                        existing.value().keys().clone(),
+                        PolyBytes::Typed(Arc::clone(typed)),
+                    ))
+                }
+                (PolyBytes::Typed(_), _) => Op::Nop,
             }
         });
     }
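
The net effect of the new match: the cache always prefers holding the deserialized value, and never keeps the encoded copy alongside it. A toy model of that preference order, with a bare enum standing in for the real PolyBytes (illustrative only; the real closure also distinguishes Op::Nop from Op::Put):

// Which representation a cached record body holds, in miniature.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Body {
    Bytes, // encoded only
    Typed, // deserialized only
    Both,  // both; repackaged to Typed before it is kept
}

// The representation the cache keeps after seeing `incoming`.
fn kept_after_merge(existing: Body, incoming: Body) -> Body {
    match (existing, incoming) {
        // A deserialized value is already the preferred form: keep it.
        (Body::Typed, _) => Body::Typed,
        // Upgrade encoded-only entries whenever a typed value shows up.
        (Body::Bytes, Body::Typed) | (Body::Bytes, Body::Both) => Body::Typed,
        // Both shouldn't be cached, but is repackaged defensively.
        (Body::Both, _) => Body::Typed,
        (Body::Bytes, Body::Bytes) => Body::Bytes,
    }
}

fn main() {
    assert_eq!(kept_after_merge(Body::Bytes, Body::Both), Body::Typed);
    assert_eq!(kept_after_merge(Body::Typed, Body::Bytes), Body::Typed);
    assert_eq!(kept_after_merge(Body::Bytes, Body::Bytes), Body::Bytes);
}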

0 commit comments