Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ restate-types = { path = "crates/types" }
restate-utoipa = { path = "crates/utoipa" }
restate-wal-protocol = { path = "crates/wal-protocol" }
restate-worker = { path = "crates/worker" }
restate-ingress-core = { path = "crates/ingress-core" }

# this workspace-hack package is overridden by a patch below to use workspace-hack subdir when building in this repo
# outside this repo, the crates.io restate-workspace-hack (an empty package) will be used instead
Expand Down
24 changes: 24 additions & 0 deletions crates/ingress-core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
[package]
name = "restate-ingress-core"
version.workspace = true
authors.workspace = true
edition.workspace = true
rust-version.workspace = true
license.workspace = true
repository.workspace = true
homepage.workspace = true
description.workspace = true

[dependencies]
arc-swap = { workspace = true }
dashmap = { workspace = true }
futures = { workspace = true }
pin-project-lite = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tokio-stream = { workspace = true }
tokio-util = { workspace = true }
tracing = { workspace = true }

restate-core = { workspace = true }
restate-types = { workspace = true }
93 changes: 93 additions & 0 deletions crates/ingress-core/src/chunks_timeout.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
use tokio::time::{Sleep, sleep};
use tokio_stream::adapters::Fuse;
use tokio_stream::{Stream, StreamExt};

use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll, ready};
use pin_project_lite::pin_project;
use std::time::Duration;

// This file is a copy from `tokio_stream` until PR https://github.com/tokio-rs/tokio/pull/7715 is released

pin_project! {
/// Stream returned by the [`chunks_timeout`](super::StreamExt::chunks_timeout) method.
#[must_use = "streams do nothing unless polled"]
#[derive(Debug)]
pub struct ChunksTimeout<S: Stream> {
#[pin]
stream: Fuse<S>,
#[pin]
deadline: Option<Sleep>,
duration: Duration,
items: Vec<S::Item>,
cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475
}
}

impl<S: Stream> ChunksTimeout<S> {
pub fn new(stream: S, max_size: usize, duration: Duration) -> Self {
ChunksTimeout {
stream: stream.fuse(),
deadline: None,
duration,
items: Vec::with_capacity(max_size),
cap: max_size,
}
}
/// Drains the buffered items, returning them without waiting for the timeout or capacity limit.
pub fn into_remainder(mut self: Pin<&mut Self>) -> Vec<S::Item> {
let me = self.as_mut().project();
std::mem::take(me.items)
}
}

impl<S: Stream> Stream for ChunksTimeout<S> {
type Item = Vec<S::Item>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut me = self.as_mut().project();
loop {
match me.stream.as_mut().poll_next(cx) {
Poll::Pending => break,
Poll::Ready(Some(item)) => {
if me.items.is_empty() {
me.deadline.set(Some(sleep(*me.duration)));
me.items.reserve_exact(*me.cap);
}
me.items.push(item);
if me.items.len() >= *me.cap {
return Poll::Ready(Some(std::mem::take(me.items)));
}
}
Poll::Ready(None) => {
// Returning Some here is only correct because we fuse the inner stream.
let last = if me.items.is_empty() {
None
} else {
Some(std::mem::take(me.items))
};

return Poll::Ready(last);
}
}
}

if !me.items.is_empty() {
if let Some(deadline) = me.deadline.as_pin_mut() {
ready!(deadline.poll(cx));
}
return Poll::Ready(Some(std::mem::take(me.items)));
}

Poll::Pending
}

fn size_hint(&self) -> (usize, Option<usize>) {
let chunk_len = if self.items.is_empty() { 0 } else { 1 };
let (lower, upper) = self.stream.size_hint();
let lower = (lower / self.cap).saturating_add(chunk_len);
let upper = upper.and_then(|x| x.checked_add(chunk_len));
(lower, upper)
}
}
134 changes: 134 additions & 0 deletions crates/ingress-core/src/ingress.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::{collections::HashMap, sync::Arc};

use tokio::sync::{OwnedSemaphorePermit, Semaphore};

use restate_core::{
network::{Networking, TransportConnect},
partitions::PartitionRouting,
};
use restate_types::{
identifiers::{PartitionId, PartitionKey},
live::Live,
net::ingress::IngestRecord,
partitions::{FindPartition, PartitionTable, PartitionTableError},
};

use crate::{
RecordCommit, SessionOptions,
session::{SessionHandle, SessionManager},
};

/// Errors that can be observed when interacting with the ingress facade.
#[derive(Debug, thiserror::Error)]
pub enum IngestionError {
#[error("Ingress closed")]
Closed,
#[error(transparent)]
PartitionTableError(#[from] PartitionTableError),
}

/// High-level ingress entry point that allocates permits and hands out session handles per partition.
#[derive(Clone)]
pub struct Ingress<T> {
manager: SessionManager<T>,
partition_table: Live<PartitionTable>,
// budget for inflight invocations.
// this should be a memory budget but it's
// not possible atm to compute the serialization
// size of an invocation.
permits: Arc<Semaphore>,

// session handles cache just to avoid
// cloning the handle on each ingest request
handles: HashMap<PartitionId, SessionHandle>,
}

impl<T> Ingress<T> {
/// Builds a new ingress facade with the provided networking stack, partition metadata, and memory
/// budget for inflight records.
pub fn new(
networking: Networking<T>,
partition_table: Live<PartitionTable>,
partition_routing: PartitionRouting,
budget: usize,
opts: Option<SessionOptions>,
) -> Self {
Self {
manager: SessionManager::new(networking, partition_routing, opts),
partition_table,
permits: Arc::new(Semaphore::new(budget)),
handles: HashMap::default(),
}
}
}

impl<T> Ingress<T>
where
T: TransportConnect,
{
/// Reserves an inflight slot and ties it to an [`IngressPermit`] that can ingest exactly one record.
pub async fn reserve(&mut self) -> Result<IngressPermit<'_, T>, IngestionError> {
let permit = self
.permits
.clone()
.acquire_owned()
.await
.map_err(|_| IngestionError::Closed)?;

Ok(IngressPermit {
permit,
ingress: self,
})
}

/// Once closed, calls to ingest will return [`IngestionError::Closed`].
/// Inflight records might still get committed.
pub fn close(&self) {
self.permits.close();
self.manager.close();
}
}

/// Permit that owns capacity for a single record ingest against an [`Ingress`] instance.
pub struct IngressPermit<'a, T> {
permit: OwnedSemaphorePermit,
ingress: &'a mut Ingress<T>,
}

impl<'a, T> IngressPermit<'a, T>
where
T: TransportConnect,
{
/// Sends a record to the partition derived from the supplied [`PartitionKey`], consuming the permit.
pub fn ingest(
self,
partition_key: PartitionKey,
record: impl Into<IngestRecord>,
) -> Result<RecordCommit, IngestionError> {
let partition_id = self
.ingress
.partition_table
.pinned()
.find_partition_id(partition_key)?;

let handle = self
.ingress
.handles
.entry(partition_id)
.or_insert_with(|| self.ingress.manager.get(partition_id));

handle
.ingest(self.permit, record.into())
.map_err(|_| IngestionError::Closed)
}
}
16 changes: 16 additions & 0 deletions crates/ingress-core/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

mod chunks_timeout;
mod ingress;
mod session;

pub use ingress::{IngestionError, Ingress, IngressPermit};
pub use session::{CommitError, RecordCommit, SessionOptions};
Loading
Loading