19 changes: 12 additions & 7 deletions crates/types/src/net/ingest.rs
@@ -13,15 +13,19 @@ use std::sync::Arc;
 use bytes::{Bytes, BytesMut};
 use metrics::Key;
 
+use restate_encoding::ArcedSlice;
+
 use crate::identifiers::PartitionId;
 use crate::logs::{HasRecordKeys, Keys};
 use crate::net::partition_processor::PartitionLeaderService;
-use crate::net::{RpcRequest, bilrost_wire_codec, default_wire_codec, define_rpc};
+use crate::net::{RpcRequest, bilrost_wire_codec, define_rpc};
 use crate::storage::{StorageCodec, StorageEncode};
 
-#[derive(Debug, Eq, PartialEq, Clone, serde::Serialize, serde::Deserialize)]
+#[derive(Debug, Eq, PartialEq, Clone, bilrost::Message)]
 pub struct IngestRecord {
+    #[bilrost(1)]
     pub keys: Keys,
+    #[bilrost(2)]
     pub record: Bytes,
 }
 
@@ -50,8 +54,9 @@ impl HasRecordKeys for IngestRecord {
     }
 }
 
-#[derive(Debug, serde::Serialize, serde::Deserialize)]
+#[derive(Debug, bilrost::Message)]
 pub struct IngestRequest {
+    #[bilrost(tag(1), encoding(ArcedSlice<packed>))]
     pub records: Arc<[IngestRecord]>,
 }
 
@@ -69,8 +74,7 @@ impl From<Arc<[IngestRecord]>> for IngestRequest {
     }
 }
 
-// todo(azmy): Use bilrost (depends on the payload)
-default_wire_codec!(IngestRequest);
+bilrost_wire_codec!(IngestRequest);
 
 #[derive(Debug, Clone, bilrost::Oneof, bilrost::Message)]
 pub enum ResponseStatus {
@@ -107,12 +111,13 @@ define_rpc! {
     @service=PartitionLeaderService,
 }
 
-#[derive(Debug, serde::Serialize, serde::Deserialize)]
+#[derive(Debug, bilrost::Message)]
 pub struct ReceivedIngestRequest {
+    #[bilrost(tag(1), encoding(packed))]
     pub records: Vec<IngestRecord>,
 }
 
-default_wire_codec!(ReceivedIngestRequest);
+bilrost_wire_codec!(ReceivedIngestRequest);
 
 /// The [`ReceivedIngestRequest`] uses the same TYPE
 /// as [`IngestRequest`] to be able to directly decode
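The truncated doc comment above states the invariant this file relies on: `IngestRequest` (sender side, `Arc<[IngestRecord]>`) and `ReceivedIngestRequest` (receiver side, `Vec<IngestRecord>`) must encode to the same bilrost wire shape, since both now carry a single packed field under tag 1. A minimal round-trip sketch of that invariant is below; it assumes bilrost's prost-style `encode_to_vec`/`decode` API and the module paths shown, and is not a test from this PR.

```rust
use std::sync::Arc;

use bilrost::{Message, OwnedMessage};
use bytes::Bytes;

use restate_types::logs::Keys;
use restate_types::net::ingest::{IngestRecord, IngestRequest, ReceivedIngestRequest};

#[test]
fn ingest_request_decodes_as_received_ingest_request() {
    let request = IngestRequest {
        records: Arc::from([IngestRecord {
            keys: Keys::None, // assumed variant; any key set would do
            record: Bytes::from_static(b"payload"),
        }]),
    };

    // Encode with the sender-side type...
    let bytes = request.encode_to_vec();

    // ...and decode with the receiver-side type. This only holds while the
    // #[bilrost(...)] tags and encodings of the two structs stay in sync.
    let received = ReceivedIngestRequest::decode(bytes.as_slice()).expect("layouts must match");
    assert_eq!(received.records.len(), 1);
}
```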
200 changes: 76 additions & 124 deletions crates/worker/src/partition/cleaner.rs
@@ -9,28 +9,48 @@
 // by the Apache License, Version 2.0.
 
 use std::ops::RangeInclusive;
-use std::sync::Arc;
 use std::time::{Duration, SystemTime};
 
 use anyhow::Context;
-use futures::StreamExt;
+use futures::{Stream, StreamExt};
+use tokio::sync::mpsc::{self, Sender};
 use tokio::time::{Instant, MissedTickBehavior};
+use tokio_stream::wrappers::ReceiverStream;
 use tracing::{debug, instrument, warn};
 
-use restate_bifrost::Bifrost;
-use restate_core::cancellation_watcher;
+use restate_core::{ShutdownError, TaskCenter, TaskHandle, TaskId, TaskKind, cancellation_watcher};
 use restate_storage_api::invocation_status_table::{InvocationStatus, ScanInvocationStatusTable};
-use restate_types::identifiers::WithPartitionKey;
-use restate_types::identifiers::{LeaderEpoch, PartitionKey};
-use restate_types::invocation::PurgeInvocationRequest;
+use restate_types::identifiers::PartitionKey;
+use restate_types::identifiers::{InvocationId, PartitionId};
 use restate_types::retries::with_jitter;
-use restate_wal_protocol::{Command, Destination, Envelope, Header, Source};
 
+const CLEANER_EFFECT_QUEUE_SIZE: usize = 10;
+
+#[derive(Debug, Clone)]
+pub enum CleanerEffect {
+    PurgeInvocation(InvocationId),
+    PurgeJournal(InvocationId),
+}
+
+pub(super) struct CleanerHandle {
+    task_id: TaskId,
+    rx: ReceiverStream<CleanerEffect>,
+}
+
+impl CleanerHandle {
+    pub fn stop(self) -> Option<TaskHandle<()>> {
+        TaskCenter::cancel_task(self.task_id)
+    }
+
+    pub fn effects(&mut self) -> impl Stream<Item = CleanerEffect> {
+        &mut self.rx
+    }
+}
+
 pub(super) struct Cleaner<Storage> {
-    leader_epoch: LeaderEpoch,
+    partition_id: PartitionId,
     partition_key_range: RangeInclusive<PartitionKey>,
     storage: Storage,
-    bifrost: Bifrost,
     cleanup_interval: Duration,
 }
 
@@ -39,53 +59,54 @@ where
     Storage: ScanInvocationStatusTable + Send + Sync + 'static,
 {
     pub(super) fn new(
-        leader_epoch: LeaderEpoch,
         storage: Storage,
-        bifrost: Bifrost,
+        partition_id: PartitionId,
         partition_key_range: RangeInclusive<PartitionKey>,
         cleanup_interval: Duration,
     ) -> Self {
         Self {
-            leader_epoch,
+            partition_id,
             partition_key_range,
             storage,
-            bifrost,
            cleanup_interval,
         }
     }
 
-    #[instrument(skip_all)]
-    pub(super) async fn run(self) -> anyhow::Result<()> {
-        let Self {
-            leader_epoch,
-            partition_key_range,
-            storage,
-            bifrost,
-            cleanup_interval,
-        } = self;
+    pub(super) fn start(self) -> Result<CleanerHandle, ShutdownError> {
+        let (tx, rx) = mpsc::channel(CLEANER_EFFECT_QUEUE_SIZE);
+        let task_id = TaskCenter::spawn_child(TaskKind::Cleaner, "cleaner", self.run(tx))?;
 
-        debug!(?cleanup_interval, "Running cleaner");
+        Ok(CleanerHandle {
+            task_id,
+            rx: ReceiverStream::new(rx),
+        })
+    }
 
-        let bifrost_envelope_source = Source::Processor {
-            partition_id: None,
-            partition_key: None,
-            leader_epoch,
-        };
+    #[instrument(skip_all)]
+    async fn run(self, tx: Sender<CleanerEffect>) -> anyhow::Result<()> {
+        debug!(
+            partition_id=%self.partition_id,
+            cleanup_interval=?self.cleanup_interval,
+            "Running cleaner"
+        );
 
         // the cleaner is currently quite an expensive scan and we don't strictly need to do it on startup, so we will wait
         // for 20-40% of the interval (so, 12-24 minutes by default) before doing the first one
-        let initial_wait = with_jitter(cleanup_interval.mul_f32(0.2), 1.0);
+        let initial_wait = with_jitter(self.cleanup_interval.mul_f32(0.2), 1.0);
 
         // the first tick will fire after initial_wait
         let mut interval =
-            tokio::time::interval_at(Instant::now() + initial_wait, cleanup_interval);
+            tokio::time::interval_at(Instant::now() + initial_wait, self.cleanup_interval);
         interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
 
         loop {
             tokio::select! {
                 _ = interval.tick() => {
-                    if let Err(e) = Self::do_cleanup(&storage, &bifrost, partition_key_range.clone(), &bifrost_envelope_source).await {
-                        warn!("Error when trying to cleanup completed invocations: {e:?}");
+                    if let Err(e) = self.do_cleanup(&tx).await {
+                        warn!(
+                            partition_id=%self.partition_id,
+                            "Error when trying to cleanup completed invocations: {e:?}"
+                        );
                     }
                 },
                 _ = cancellation_watcher() => {
@@ -99,15 +120,12 @@ where
         Ok(())
     }
 
-    pub(super) async fn do_cleanup(
-        storage: &Storage,
-        bifrost: &Bifrost,
-        partition_key_range: RangeInclusive<PartitionKey>,
-        bifrost_envelope_source: &Source,
-    ) -> anyhow::Result<()> {
-        debug!("Executing completed invocations cleanup");
+    pub(super) async fn do_cleanup(&self, tx: &Sender<CleanerEffect>) -> anyhow::Result<()> {
+        debug!(partition_id=%self.partition_id, "Starting invocation cleanup");
 
-        let invocations_stream = storage.scan_invocation_statuses(partition_key_range)?;
+        let invocations_stream = self
+            .storage
+            .scan_invocation_statuses(self.partition_key_range.clone())?;
         tokio::pin!(invocations_stream);
 
         while let Some((invocation_id, invocation_status)) = invocations_stream
@@ -132,24 +150,9 @@ where
                 .checked_add(completed_invocation.completion_retention_duration)
                 && now >= status_expiration_time
             {
-                restate_bifrost::append_to_bifrost(
-                    bifrost,
-                    Arc::new(Envelope {
-                        header: Header {
-                            source: bifrost_envelope_source.clone(),
-                            dest: Destination::Processor {
-                                partition_key: invocation_id.partition_key(),
-                                dedup: None,
-                            },
-                        },
-                        command: Command::PurgeInvocation(PurgeInvocationRequest {
-                            invocation_id,
-                            response_sink: None,
-                        }),
-                    }),
-                )
-                .await
-                .context("Cannot append to bifrost purge invocation")?;
+                tx.send(CleanerEffect::PurgeInvocation(invocation_id))
+                    .await
+                    .context("Cannot send purge invocation effect")?;
                 continue;
             }
 
@@ -165,24 +168,9 @@ where
             };
 
             if now >= journal_expiration_time {
-                restate_bifrost::append_to_bifrost(
-                    bifrost,
-                    Arc::new(Envelope {
-                        header: Header {
-                            source: bifrost_envelope_source.clone(),
-                            dest: Destination::Processor {
-                                partition_key: invocation_id.partition_key(),
-                                dedup: None,
-                            },
-                        },
-                        command: Command::PurgeJournal(PurgeInvocationRequest {
-                            invocation_id,
-                            response_sink: None,
-                        }),
-                    }),
-                )
-                .await
-                .context("Cannot append to bifrost purge journal")?;
+                tx.send(CleanerEffect::PurgeJournal(invocation_id))
+                    .await
+                    .context("Cannot send purge journal effect")?;
                 continue;
             }
         }
@@ -198,16 +186,13 @@ mod tests {
 
     use futures::{Stream, stream};
     use googletest::prelude::*;
-    use restate_core::{Metadata, TaskCenter, TaskKind, TestCoreEnvBuilder};
     use restate_storage_api::StorageError;
     use restate_storage_api::invocation_status_table::{
         CompletedInvocation, InFlightInvocationMetadata, InvocationStatus,
         InvokedInvocationStatusLite, JournalMetadata, ScanInvocationStatusTable,
     };
     use restate_storage_api::protobuf_types::v1::lazy::InvocationStatusV2Lazy;
-    use restate_types::Version;
     use restate_types::identifiers::{InvocationId, InvocationUuid};
-    use restate_types::partition_table::{FindPartition, PartitionTable};
     use test_log::test;
 
     #[allow(dead_code)]
@@ -256,15 +241,6 @@ mod tests {
     // Start paused makes sure the timer is immediately fired
     #[test(restate_core::test(start_paused = true))]
    pub async fn cleanup_works() {
-        let env = TestCoreEnvBuilder::with_incoming_only_connector()
-            .set_partition_table(PartitionTable::with_equally_sized_partitions(
-                Version::MIN,
-                1,
-            ))
-            .build()
-            .await;
-        let bifrost = Bifrost::init_in_memory(env.metadata_writer).await;
-
         let expired_invocation =
             InvocationId::from_parts(PartitionKey::MIN, InvocationUuid::mock_random());
         let expired_journal =
@@ -315,50 +291,26 @@ mod tests {
             ),
         ]);
 
-        TaskCenter::spawn(
-            TaskKind::Cleaner,
-            "cleaner",
-            Cleaner::new(
-                LeaderEpoch::INITIAL,
-                mock_storage,
-                bifrost.clone(),
-                RangeInclusive::new(PartitionKey::MIN, PartitionKey::MAX),
-                Duration::from_secs(1),
-            )
-            .run(),
-        )
-        .unwrap();
+        let mut handle = Cleaner::new(
+            mock_storage,
+            0.into(),
+            RangeInclusive::new(PartitionKey::MIN, PartitionKey::MAX),
+            Duration::from_secs(1),
+        )
+        .start()
+        .unwrap();
 
         // cleanup will run after around 200ms
-        tokio::time::sleep(Duration::from_secs(1)).await;
+        tokio::time::advance(Duration::from_secs(1)).await;
 
-        // All the invocation ids were created with same partition keys, hence same partition id.
-        let partition_id = Metadata::with_current(|m| {
-            m.partition_table_snapshot()
-                .find_partition_id(expired_invocation.partition_key())
-        })
-        .unwrap();
-
-        let log_entries: Vec<_> = bifrost
-            .read_all(partition_id.into())
-            .await
-            .unwrap()
-            .into_iter()
-            .map(|e| e.try_decode::<Envelope>().unwrap().unwrap().command)
-            .collect();
+        let received: Vec<_> = handle.effects().ready_chunks(10).next().await.unwrap();
 
         assert_that!(
-            log_entries,
+            received,
            all!(
                len(eq(2)),
-                contains(pat!(Command::PurgeInvocation(pat!(
-                    PurgeInvocationRequest {
-                        invocation_id: eq(expired_invocation),
-                    }
-                )))),
-                contains(pat!(Command::PurgeJournal(pat!(PurgeInvocationRequest {
-                    invocation_id: eq(expired_journal),
-                })))),
+                contains(pat!(CleanerEffect::PurgeInvocation(eq(expired_invocation)))),
+                contains(pat!(CleanerEffect::PurgeJournal(eq(expired_journal))))
            )
        );
    }
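With the bifrost appends removed from the cleaner, whoever owns the `CleanerHandle` must now turn `CleanerEffect`s back into log commands. The sketch below reconstructs that consumer side from the envelope-building code this diff deletes; the function name, its parameters, and the call site are assumptions, not code from this PR.

```rust
use std::sync::Arc;

use anyhow::Context as _;
use futures::StreamExt;

use restate_bifrost::Bifrost;
use restate_types::identifiers::WithPartitionKey;
use restate_types::invocation::PurgeInvocationRequest;
use restate_wal_protocol::{Command, Destination, Envelope, Header, Source};

// Hypothetical consumer living next to the cleaner module: drain cleaner
// effects and append the corresponding purge commands to bifrost, as the
// cleaner itself used to do before this change.
async fn forward_cleaner_effects(
    mut handle: CleanerHandle,
    bifrost: Bifrost,
    source: Source, // e.g. Source::Processor { leader_epoch, .. }, built by the leader
) -> anyhow::Result<()> {
    let mut effects = std::pin::pin!(handle.effects());
    while let Some(effect) = effects.next().await {
        let (invocation_id, command) = match effect {
            CleanerEffect::PurgeInvocation(id) => (
                id,
                Command::PurgeInvocation(PurgeInvocationRequest {
                    invocation_id: id,
                    response_sink: None,
                }),
            ),
            CleanerEffect::PurgeJournal(id) => (
                id,
                Command::PurgeJournal(PurgeInvocationRequest {
                    invocation_id: id,
                    response_sink: None,
                }),
            ),
        };
        // Same envelope shape the cleaner used to build inline.
        restate_bifrost::append_to_bifrost(
            &bifrost,
            Arc::new(Envelope {
                header: Header {
                    source: source.clone(),
                    dest: Destination::Processor {
                        partition_key: invocation_id.partition_key(),
                        dedup: None,
                    },
                },
                command,
            }),
        )
        .await
        .context("Cannot append purge command to bifrost")?;
    }
    Ok(())
}
```

The design choice is that the cleaner only detects expired invocations and journals; appending to the log stays with the leadership code, which is why the `restate_bifrost` and `restate_wal_protocol` dependencies disappear from this file.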