diff --git a/Cargo.lock b/Cargo.lock
index 417c1b6004..8885f09793 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -7119,6 +7119,23 @@ dependencies = [
  "tower 0.5.2",
 ]
 
+[[package]]
+name = "restate-ingress-core"
+version = "1.6.0-dev"
+dependencies = [
+ "arc-swap",
+ "dashmap",
+ "futures",
+ "pin-project-lite",
+ "restate-core",
+ "restate-types",
+ "thiserror 2.0.17",
+ "tokio",
+ "tokio-stream",
+ "tokio-util",
+ "tracing",
+]
+
 [[package]]
 name = "restate-ingress-http"
 version = "1.6.0-dev"
diff --git a/Cargo.toml b/Cargo.toml
index cae392d633..050d71154d 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -85,6 +85,7 @@
 restate-types = { path = "crates/types" }
 restate-utoipa = { path = "crates/utoipa" }
 restate-wal-protocol = { path = "crates/wal-protocol" }
 restate-worker = { path = "crates/worker" }
+restate-ingress-core = { path = "crates/ingress-core" }
 # this workspace-hack package is overridden by a patch below to use workspace-hack subdir when building in this repo
 # outside this repo, the crates.io restate-workspace-hack (an empty package) will be used instead
diff --git a/crates/ingress-core/Cargo.toml b/crates/ingress-core/Cargo.toml
new file mode 100644
index 0000000000..5a8da1174e
--- /dev/null
+++ b/crates/ingress-core/Cargo.toml
@@ -0,0 +1,24 @@
+[package]
+name = "restate-ingress-core"
+version.workspace = true
+authors.workspace = true
+edition.workspace = true
+rust-version.workspace = true
+license.workspace = true
+repository.workspace = true
+homepage.workspace = true
+description.workspace = true
+
+[dependencies]
+arc-swap = { workspace = true }
+dashmap = { workspace = true }
+futures = { workspace = true }
+pin-project-lite = { workspace = true }
+thiserror = { workspace = true }
+tokio = { workspace = true }
+tokio-stream = { workspace = true }
+tokio-util = { workspace = true }
+tracing = { workspace = true }
+
+restate-core = { workspace = true }
+restate-types = { workspace = true }
diff --git a/crates/ingress-core/src/chunks_timeout.rs b/crates/ingress-core/src/chunks_timeout.rs
new file mode 100644
index 0000000000..38a6e3f127
--- /dev/null
+++ b/crates/ingress-core/src/chunks_timeout.rs
@@ -0,0 +1,93 @@
+use tokio::time::{Sleep, sleep};
+use tokio_stream::adapters::Fuse;
+use tokio_stream::{Stream, StreamExt};
+
+use core::future::Future;
+use core::pin::Pin;
+use core::task::{Context, Poll, ready};
+use pin_project_lite::pin_project;
+use std::time::Duration;
+
+// This file is a copy from `tokio_stream` until PR https://github.com/tokio-rs/tokio/pull/7715 is released.
+
+pin_project! {
+    /// Stream returned by the [`chunks_timeout`](super::StreamExt::chunks_timeout) method.
+    #[must_use = "streams do nothing unless polled"]
+    #[derive(Debug)]
+    pub struct ChunksTimeout<S: Stream> {
+        #[pin]
+        stream: Fuse<S>,
+        #[pin]
+        deadline: Option<Sleep>,
+        duration: Duration,
+        items: Vec<S::Item>,
+        cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475
+    }
+}
+
+impl<S: Stream> ChunksTimeout<S> {
+    pub fn new(stream: S, max_size: usize, duration: Duration) -> Self {
+        ChunksTimeout {
+            stream: stream.fuse(),
+            deadline: None,
+            duration,
+            items: Vec::with_capacity(max_size),
+            cap: max_size,
+        }
+    }
+
+    /// Drains the buffered items, returning them without waiting for the timeout or the capacity limit.
+    pub fn into_remainder(mut self: Pin<&mut Self>) -> Vec<S::Item> {
+        let me = self.as_mut().project();
+        std::mem::take(me.items)
+    }
+}
+
+impl<S: Stream> Stream for ChunksTimeout<S> {
+    type Item = Vec<S::Item>;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let mut me = self.as_mut().project();
+        loop {
+            match me.stream.as_mut().poll_next(cx) {
+                Poll::Pending => break,
+                Poll::Ready(Some(item)) => {
+                    if me.items.is_empty() {
+                        me.deadline.set(Some(sleep(*me.duration)));
+                        me.items.reserve_exact(*me.cap);
+                    }
+                    me.items.push(item);
+                    if me.items.len() >= *me.cap {
+                        return Poll::Ready(Some(std::mem::take(me.items)));
+                    }
+                }
+                Poll::Ready(None) => {
+                    // Returning Some here is only correct because we fuse the inner stream.
+                    let last = if me.items.is_empty() {
+                        None
+                    } else {
+                        Some(std::mem::take(me.items))
+                    };
+
+                    return Poll::Ready(last);
+                }
+            }
+        }
+
+        if !me.items.is_empty() {
+            if let Some(deadline) = me.deadline.as_pin_mut() {
+                ready!(deadline.poll(cx));
+            }
+            return Poll::Ready(Some(std::mem::take(me.items)));
+        }
+
+        Poll::Pending
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        let chunk_len = if self.items.is_empty() { 0 } else { 1 };
+        let (lower, upper) = self.stream.size_hint();
+        let lower = (lower / self.cap).saturating_add(chunk_len);
+        let upper = upper.and_then(|x| x.checked_add(chunk_len));
+        (lower, upper)
+    }
+}
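A rough sketch of the batching behaviour this adapter provides. The adapter is private to the crate, so the `use` below only works inside `restate-ingress-core`; the chunk contents in the comment assume an input stream that is always ready:

    use std::time::Duration;
    use tokio_stream::StreamExt;

    use crate::chunks_timeout::ChunksTimeout; // module-private; illustration only

    async fn demo() {
        // Emit a chunk of up to 4 items, or whatever has accumulated after 50ms.
        let chunked = ChunksTimeout::new(tokio_stream::iter(0..10), 4, Duration::from_millis(50));
        tokio::pin!(chunked);

        while let Some(chunk) = chunked.next().await {
            // With an always-ready input this prints [0, 1, 2, 3], [4, 5, 6, 7], [8, 9].
            println!("{chunk:?}");
        }
    }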
diff --git a/crates/ingress-core/src/ingress.rs b/crates/ingress-core/src/ingress.rs
new file mode 100644
index 0000000000..09518d35da
--- /dev/null
+++ b/crates/ingress-core/src/ingress.rs
@@ -0,0 +1,134 @@
+// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH.
+// All rights reserved.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0.
+
+use std::{collections::HashMap, sync::Arc};
+
+use tokio::sync::{OwnedSemaphorePermit, Semaphore};
+
+use restate_core::{
+    network::{Networking, TransportConnect},
+    partitions::PartitionRouting,
+};
+use restate_types::{
+    identifiers::{PartitionId, PartitionKey},
+    live::Live,
+    net::ingress::IngestRecord,
+    partitions::{FindPartition, PartitionTable, PartitionTableError},
+};
+
+use crate::{
+    RecordCommit, SessionOptions,
+    session::{SessionHandle, SessionManager},
+};
+
+/// Errors that can be observed when interacting with the ingress facade.
+#[derive(Debug, thiserror::Error)]
+pub enum IngestionError {
+    #[error("Ingress closed")]
+    Closed,
+    #[error(transparent)]
+    PartitionTableError(#[from] PartitionTableError),
+}
+
+/// High-level ingress entry point that allocates permits and hands out session handles per partition.
+#[derive(Clone)]
+pub struct Ingress<T> {
+    manager: SessionManager<T>,
+    partition_table: Live<PartitionTable>,
+    // Budget for inflight invocations.
+    // Ideally this would be a memory budget, but it's currently not
+    // possible to compute the serialization size of an invocation.
+    permits: Arc<Semaphore>,
+
+    // Cache of session handles, kept so we don't have to fetch
+    // a handle from the session manager on every ingest request.
+    handles: HashMap<PartitionId, SessionHandle>,
+}
+
+impl<T> Ingress<T> {
+    /// Builds a new ingress facade with the provided networking stack, partition metadata, and
+    /// budget for inflight records.
+    pub fn new(
+        networking: Networking<T>,
+        partition_table: Live<PartitionTable>,
+        partition_routing: PartitionRouting,
+        budget: usize,
+        opts: Option<SessionOptions>,
+    ) -> Self {
+        Self {
+            manager: SessionManager::new(networking, partition_routing, opts),
+            partition_table,
+            permits: Arc::new(Semaphore::new(budget)),
+            handles: HashMap::default(),
+        }
+    }
+}
+
+impl<T> Ingress<T>
+where
+    T: TransportConnect,
+{
+    /// Reserves an inflight slot and ties it to an [`IngressPermit`] that can ingest exactly one record.
+    pub async fn reserve(&mut self) -> Result<IngressPermit<'_, T>, IngestionError> {
+        let permit = self
+            .permits
+            .clone()
+            .acquire_owned()
+            .await
+            .map_err(|_| IngestionError::Closed)?;
+
+        Ok(IngressPermit {
+            permit,
+            ingress: self,
+        })
+    }
+
+    /// Closes the ingress. Once closed, calls to ingest will return [`IngestionError::Closed`].
+    /// Inflight records might still get committed.
+    pub fn close(&self) {
+        self.permits.close();
+        self.manager.close();
+    }
+}
+
+/// Permit that owns capacity for a single record ingest against an [`Ingress`] instance.
+pub struct IngressPermit<'a, T> {
+    permit: OwnedSemaphorePermit,
+    ingress: &'a mut Ingress<T>,
+}
+
+impl<'a, T> IngressPermit<'a, T>
+where
+    T: TransportConnect,
+{
+    /// Sends a record to the partition derived from the supplied [`PartitionKey`], consuming the permit.
+    pub fn ingest(
+        self,
+        partition_key: PartitionKey,
+        record: impl Into<IngestRecord>,
+    ) -> Result<RecordCommit, IngestionError> {
+        let partition_id = self
+            .ingress
+            .partition_table
+            .pinned()
+            .find_partition_id(partition_key)?;
+
+        let handle = self
+            .ingress
+            .handles
+            .entry(partition_id)
+            .or_insert_with(|| self.ingress.manager.get(partition_id));
+
+        handle
+            .ingest(self.permit, record.into())
+            .map_err(|_| IngestionError::Closed)
+    }
+}
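A sketch of how a caller might wire this up and push a single record. The budget of 1024 inflight records, the batching values, and the use of `anyhow` are assumptions for illustration, not values taken from this change:

    use std::time::Duration;

    use restate_core::network::{Networking, TransportConnect};
    use restate_core::partitions::PartitionRouting;
    use restate_ingress_core::{Ingress, SessionOptions};
    use restate_types::identifiers::PartitionKey;
    use restate_types::live::Live;
    use restate_types::net::ingress::IngestRecord;
    use restate_types::partitions::PartitionTable;
    use restate_types::retries::RetryPolicy;

    async fn ingest_one<T: TransportConnect>(
        networking: Networking<T>,
        partition_table: Live<PartitionTable>,
        partition_routing: PartitionRouting,
        partition_key: PartitionKey,
        record: IngestRecord,
    ) -> anyhow::Result<()> {
        let opts = SessionOptions {
            batch_size: 500,
            batch_timeout: Duration::from_millis(50),
            connect_retry_policy: RetryPolicy::exponential(
                Duration::from_millis(25),
                2.0,
                None,                         // retry indefinitely
                Some(Duration::from_secs(2)), // cap the backoff
            ),
        };

        // Allow at most 1024 records in flight across all partitions (assumed value).
        let mut ingress = Ingress::new(networking, partition_table, partition_routing, 1024, Some(opts));

        // Waits until one of the inflight slots is free, then consumes it for one record.
        let permit = ingress.reserve().await?;
        let commit = permit.ingest(partition_key, record)?;

        // Resolves once the owning partition leader acknowledged the batch containing the record.
        commit.await?;
        Ok(())
    }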
diff --git a/crates/ingress-core/src/lib.rs b/crates/ingress-core/src/lib.rs
new file mode 100644
index 0000000000..65465b559b
--- /dev/null
+++ b/crates/ingress-core/src/lib.rs
@@ -0,0 +1,16 @@
+// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH.
+// All rights reserved.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0.
+
+mod chunks_timeout;
+mod ingress;
+mod session;
+
+pub use ingress::{IngestionError, Ingress, IngressPermit};
+pub use session::{CommitError, RecordCommit, SessionOptions};
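The session module added next resolves every ingested record to a `RecordCommit` future. As its docs note, a `Cancelled` result does not prove the record was dropped, so callers typically treat it (and timeouts) as "outcome unknown". A small sketch, with the five-second bound being an arbitrary choice:

    use std::time::Duration;

    use restate_ingress_core::{CommitError, RecordCommit};

    /// Returns true only when the record was definitely committed.
    async fn wait_for_commit(commit: RecordCommit) -> bool {
        match tokio::time::timeout(Duration::from_secs(5), commit).await {
            // The partition leader acknowledged the batch containing the record.
            Ok(Ok(())) => true,
            // Ingress was closed while the record was in flight; it may or may not have landed.
            Ok(Err(CommitError::Cancelled)) => false,
            // Still in flight after the deadline; outcome unknown.
            Err(_elapsed) => false,
        }
    }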
diff --git a/crates/ingress-core/src/session.rs b/crates/ingress-core/src/session.rs
new file mode 100644
index 0000000000..188fbfbb71
--- /dev/null
+++ b/crates/ingress-core/src/session.rs
@@ -0,0 +1,487 @@
+// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH.
+// All rights reserved.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0.
+
+use std::{
+    collections::{HashMap, VecDeque},
+    ops::Deref,
+    sync::{Arc, OnceLock},
+    time::Duration,
+};
+
+use arc_swap::ArcSwap;
+use dashmap::DashMap;
+use futures::{FutureExt, future::OptionFuture, ready};
+use tokio::sync::{OwnedSemaphorePermit, mpsc, oneshot};
+use tokio_stream::{StreamExt, wrappers::UnboundedReceiverStream};
+use tokio_util::sync::CancellationToken;
+use tracing::{debug, trace};
+
+use restate_core::{
+    TaskCenter, TaskKind, cancellation_token,
+    network::{
+        ConnectError, Connection, ConnectionClosed, NetworkSender, Networking, ReplyRx, Swimlane,
+        TransportConnect,
+    },
+    partitions::PartitionRouting,
+};
+use restate_types::{
+    identifiers::PartitionId,
+    net::ingress::{IngestRecord, IngestRequest, IngestResponse},
+    retries::{RetryIter, RetryPolicy},
+};
+
+use crate::chunks_timeout::ChunksTimeout;
+
+/// Error returned when attempting to use a session that has already been closed.
+#[derive(Clone, Copy, Debug, thiserror::Error)]
+#[error("Partition session is closed")]
+pub struct SessionClosed;
+
+/// Commitment failures that can be observed when waiting on [`RecordCommit`].
+#[derive(Debug, Clone, Copy, thiserror::Error)]
+pub enum CommitError {
+    #[error("commit cancelled")]
+    Cancelled,
+}
+
+/// Future that resolves to the commit result of a record.
+/// A [`CommitError::Cancelled`] might be returned if [`crate::Ingress`] is
+/// closed while the record is in flight. This does not guarantee that the
+/// record was not processed or committed.
+pub struct RecordCommit {
+    rx: oneshot::Receiver<Result<(), CommitError>>,
+}
+
+impl Future for RecordCommit {
+    type Output = Result<(), CommitError>;
+
+    fn poll(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Self::Output> {
+        match ready!(self.rx.poll_unpin(cx)) {
+            Ok(result) => std::task::Poll::Ready(result),
+            Err(_) => std::task::Poll::Ready(Err(CommitError::Cancelled)),
+        }
+    }
+}
+
+impl RecordCommit {
+    fn new() -> (Self, RecordCommitResolver) {
+        let (tx, rx) = oneshot::channel();
+        (Self { rx }, RecordCommitResolver { tx })
+    }
+}
+
+struct RecordCommitResolver {
+    tx: oneshot::Sender<Result<(), CommitError>>,
+}
+
+impl RecordCommitResolver {
+    /// Resolves the [`RecordCommit`] to committed.
+    pub fn committed(self) {
+        let _ = self.tx.send(Ok(()));
+    }
+
+    /// Explicitly cancels the [`RecordCommit`].
+    /// If the resolver is dropped instead, the [`RecordCommit`]
+    /// still resolves to [`CommitError::Cancelled`].
+    #[allow(dead_code)]
+    pub fn cancelled(self) {
+        let _ = self.tx.send(Err(CommitError::Cancelled));
+    }
+}
+
+struct IngressBatch {
+    records: Arc<[IngestRecord]>,
+    trackers: Vec<RecordTracker>,
+    reply_rx: Option<ReplyRx<IngestResponse>>,
+}
+
+impl IngressBatch {
+    /// Marks every tracked record in the batch as committed.
+    pub fn committed(self) {
+        for tracker in self.trackers {
+            tracker.resolver.committed();
+        }
+    }
+}
+/// Tunable parameters for the batching and networking behaviour of partition sessions.
+#[derive(Debug, Clone)]
+pub struct SessionOptions {
+    /// Maximum number of records per batch
+    pub batch_size: usize,
+    /// Maximum time to wait before a partial batch is sent
+    pub batch_timeout: Duration,
+    /// Connection retry policy
+    pub connect_retry_policy: RetryPolicy,
+}
+
+impl Default for SessionOptions {
+    fn default() -> Self {
+        Self {
+            batch_size: 250,
+            batch_timeout: Duration::from_millis(100),
+            connect_retry_policy: RetryPolicy::exponential(
+                Duration::from_millis(10),
+                2.0,
+                None,
+                Some(Duration::from_secs(1)),
+            ),
+        }
+    }
+}
+struct RecordTracker {
+    _permit: OwnedSemaphorePermit,
+    resolver: RecordCommitResolver,
+}
+
+/// Cloneable sender that enqueues records for a specific partition session.
+#[derive(Clone)]
+pub struct SessionHandle {
+    tx: mpsc::UnboundedSender<(RecordTracker, IngestRecord)>,
+}
+
+impl SessionHandle {
+    /// Enqueues an ingest request along with the owned permit and returns a future tracking the commit outcome.
+    pub fn ingest(
+        &self,
+        permit: OwnedSemaphorePermit,
+        record: IngestRecord,
+    ) -> Result<RecordCommit, SessionClosed> {
+        let (commit, resolver) = RecordCommit::new();
+        self.tx
+            .send((
+                RecordTracker {
+                    _permit: permit,
+                    resolver,
+                },
+                record,
+            ))
+            .map_err(|_| SessionClosed)?;
+
+        Ok(commit)
+    }
+}
+
+enum SessionState {
+    Connecting { retry: RetryIter<'static> },
+    Connected { connection: Connection },
+    Disconnected,
+    Shutdown,
+}
+
+/// Background task that drives the lifecycle of a single partition connection.
+pub struct PartitionSession<T> {
+    partition: PartitionId,
+    partition_routing: PartitionRouting,
+    networking: Networking<T>,
+    opts: SessionOptions,
+    rx: UnboundedReceiverStream<(RecordTracker, IngestRecord)>,
+    tx: mpsc::UnboundedSender<(RecordTracker, IngestRecord)>,
+    inflight: VecDeque<IngressBatch>,
+}
+
+impl<T> PartitionSession<T> {
+    fn new(
+        networking: Networking<T>,
+        partition_routing: PartitionRouting,
+        partition: PartitionId,
+        opts: SessionOptions,
+    ) -> Self {
+        let (tx, rx) = mpsc::unbounded_channel();
+        let rx = UnboundedReceiverStream::new(rx);
+
+        Self {
+            partition,
+            partition_routing,
+            networking,
+            opts,
+            inflight: Default::default(),
+            rx,
+            tx,
+        }
+    }
+
+    /// Returns a handle that can be used by callers to enqueue new records.
+    pub fn handle(&self) -> SessionHandle {
+        SessionHandle {
+            tx: self.tx.clone(),
+        }
+    }
+}
+
+impl<T> PartitionSession<T>
+where
+    T: TransportConnect,
+{
+    /// Runs the session state machine until shut down, reacting to cancellation and connection errors.
+    pub async fn start(mut self, ctx: CancellationToken) {
+        let mut state = SessionState::Connecting {
+            retry: self.opts.connect_retry_policy.clone().into_iter(),
+        };
+
+        debug!(
+            partition_id = %self.partition,
+            "Starting ingress partition session",
+        );
+
+        loop {
+            state = match state {
+                SessionState::Connecting { retry } => self.connect(&ctx, retry).await,
+                SessionState::Connected { connection } => self.connected(&ctx, connection).await,
+                SessionState::Disconnected => SessionState::Connecting {
+                    retry: self.opts.connect_retry_policy.clone().into_iter(),
+                },
+                SessionState::Shutdown => {
+                    self.rx.close();
+                    break;
+                }
+            }
+        }
+    }
+
+    async fn connect(
+        &self,
+        ctx: &CancellationToken,
+        mut retry: RetryIter<'static>,
+    ) -> SessionState {
+        let Some(node_id) = self.partition_routing.get_node_by_partition(self.partition) else {
+            tokio::time::sleep(retry.next().unwrap_or_else(|| Duration::from_secs(1))).await;
+
+            return SessionState::Connecting { retry };
+        };
+
+        tokio::select! {
+            result = self
+                .networking
+                .get_connection(node_id, Swimlane::IngressData) => {
+                match result {
+                    Ok(connection) => SessionState::Connected { connection },
+                    Err(ConnectError::Shutdown(_)) => SessionState::Shutdown,
+                    Err(err) => {
+                        debug!("Failed to connect to node {node_id}: {err}");
+                        tokio::time::sleep(retry.next().unwrap_or_else(|| Duration::from_secs(1))).await;
+                        SessionState::Connecting { retry }
+                    }
+                }
+            }
+            _ = ctx.cancelled() => {
+                SessionState::Shutdown
+            }
+        }
+    }
+
+    /// Re-sends all inflight batches after a connection is restored.
+    async fn replay(&mut self, connection: &Connection) -> Result<(), ConnectionClosed> {
+        // todo(azmy): to avoid resending all the inflight batches (and wasting traffic),
+        // maybe test the connection first by sending an empty batch and waiting for a
+        // response before proceeding?
+
+        for batch in self.inflight.iter_mut() {
+            let Some(permit) = connection.reserve().await else {
+                return Err(ConnectionClosed);
+            };
+
+            // resend the batch
+            let reply_rx = permit
+                .send_rpc(
+                    IngestRequest::from(Arc::clone(&batch.records)),
+                    Some(self.partition.into()),
+                )
+                .expect("encoding version to match");
+            batch.reply_rx = reply_rx.into();
+        }
+
+        Ok(())
+    }
+
+    async fn connected(&mut self, ctx: &CancellationToken, connection: Connection) -> SessionState {
+        if self.replay(&connection).await.is_err() {
+            return SessionState::Disconnected;
+        }
+
+        let chunked =
+            ChunksTimeout::new(&mut self.rx, self.opts.batch_size, self.opts.batch_timeout);
+        tokio::pin!(chunked);
+
+        let state = loop {
+            let head: OptionFuture<_> = self
+                .inflight
+                .front_mut()
+                .and_then(|batch| batch.reply_rx.as_mut())
+                .into();
+
+            tokio::select! {
+                Some(batch) = chunked.next() => {
+                    let (trackers, records): (Vec<_>, Vec<_>) = batch.into_iter().unzip();
+                    let batch: Arc<[IngestRecord]> = Arc::from(records);
+
+                    let Some(permit) = connection.reserve().await else {
+                        break SessionState::Disconnected;
+                    };
+
+                    trace!("Sending ingest batch, len: {}", batch.len());
+                    let reply_rx = permit
+                        .send_rpc(IngestRequest::from(Arc::clone(&batch)), Some(self.partition.into()))
+                        .expect("encoding version to match");
+
+                    let batch = IngressBatch { records: batch, trackers, reply_rx: Some(reply_rx) };
+                    self.inflight.push_back(batch);
+                }
+                Some(result) = head => {
+                    match result {
+                        Ok(IngestResponse::Ack) => {
+                            let batch = self.inflight.pop_front().expect("not empty");
+                            batch.committed();
+                        }
+                        Ok(response) => {
+                            // Treat any other response as a connection loss
+                            // and retry all inflight batches.
+                            debug!("Ingest response '{:?}'", response);
+                            break SessionState::Disconnected;
+                        }
+                        Err(_err) => {
+                            // We can assume that for any error we need to retry
+                            // all the inflight batches.
+                            // As a special case, for load shedding we could throttle
+                            // the stream a little and then speed back up over time.
+                            break SessionState::Disconnected;
+                        }
+                    }
+                }
+                _ = ctx.cancelled() => {
+                    // Relies on the inflight batches being drained and dropped to
+                    // notify callers that their records have been cancelled.
+                    return SessionState::Shutdown;
+                }
+            }
+        };
+
+        // don't lose the buffered batch
+        let remainder = chunked.into_remainder();
+        if !remainder.is_empty() {
+            let (trackers, records): (Vec<_>, Vec<_>) = remainder.into_iter().unzip();
+            let batch: Arc<[IngestRecord]> = Arc::from(records);
+            self.inflight.push_back(IngressBatch {
+                records: batch,
+                trackers,
+                reply_rx: None,
+            });
+        }
+
+        state
+    }
+}
+struct SessionManagerInner<T> {
+    networking: Networking<T>,
+    partition_routing: PartitionRouting,
+    opts: SessionOptions,
+    ctx: CancellationToken,
+    published: ArcSwap<HashMap<PartitionId, SessionHandle>>,
+    locks: DashMap<PartitionId, OnceLock<SessionHandle>>,
+}
+
+impl<T> SessionManagerInner<T>
+where
+    T: TransportConnect,
+{
+    /// Gets or starts a session for the partition with the given partition id.
+    pub fn get(&self, id: PartitionId) -> SessionHandle {
+        let inner = self.published.load();
+        match inner.get(&id) {
+            Some(handle) => handle.clone(),
+            None => {
+                let once = self.locks.entry(id).or_default();
+
+                let handle = once.get_or_init(|| {
+                    let session = PartitionSession::new(
+                        self.networking.clone(),
+                        self.partition_routing.clone(),
+                        id,
+                        self.opts.clone(),
+                    );
+
+                    let handle = session.handle();
+
+                    // todo(azmy): handle the spawn result
+                    let ctx = self.ctx.clone();
+                    let _ = TaskCenter::spawn_child(
+                        TaskKind::Background,
+                        "ingress-partition-session",
+                        async move {
+                            session.start(ctx).await;
+                            Ok(())
+                        },
+                    );
+
+                    handle
+                });
+
+                self.published.rcu(|current| {
+                    let mut current = current.deref().clone();
+                    current.entry(id).or_insert(handle.clone());
+                    current
+                });
+
+                handle.clone()
+            }
+        }
+    }
+}
+
+/// Manager that owns all partition sessions and caches their handles.
+#[derive(Clone)]
+pub struct SessionManager<T> {
+    inner: Arc<SessionManagerInner<T>>,
+}
+
+impl<T> SessionManager<T> {
+    /// Creates a new session manager with optional overrides for session behaviour.
+    pub fn new(
+        networking: Networking<T>,
+        partition_routing: PartitionRouting,
+        opts: Option<SessionOptions>,
+    ) -> Self {
+        let inner = SessionManagerInner {
+            networking,
+            partition_routing,
+            opts: opts.unwrap_or_default(),
+            published: Default::default(),
+            locks: Default::default(),
+            ctx: cancellation_token().child_token(),
+        };
+
+        Self {
+            inner: Arc::new(inner),
+        }
+    }
+}
+
+impl<T> SessionManager<T>
+where
+    T: TransportConnect,
+{
+    /// Returns a handle to the session for the given partition, creating it if needed.
+    pub fn get(&self, id: PartitionId) -> SessionHandle {
+        self.inner.get(id)
+    }
+
+    /// Signals all sessions to shut down and prevents new work from being scheduled.
+    pub fn close(&self) {
+        self.inner.ctx.cancel();
+    }
+}
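The lookup path in `SessionManagerInner::get` combines a read-mostly `ArcSwap` snapshot with a per-partition `OnceLock` so that each session is spawned exactly once while readers stay lock-free. A generic restatement of that pattern, stripped of the networking details (the `Cache` type and the `u64` key are placeholders, not part of this change):

    use std::{collections::HashMap, ops::Deref, sync::OnceLock};

    use arc_swap::ArcSwap;
    use dashmap::DashMap;

    /// Read-mostly cache: lock-free lookups via ArcSwap, per-key OnceLock for
    /// exactly-once initialization, then republish the enlarged map.
    struct Cache<V: Clone> {
        published: ArcSwap<HashMap<u64, V>>,
        locks: DashMap<u64, OnceLock<V>>,
    }

    impl<V: Clone> Cache<V> {
        fn get_or_init(&self, key: u64, init: impl FnOnce() -> V) -> V {
            // Fast path: the value is already in the published snapshot.
            if let Some(v) = self.published.load().get(&key) {
                return v.clone();
            }
            // Slow path: only one caller runs `init` for this key.
            let slot = self.locks.entry(key).or_default();
            let value = slot.get_or_init(init).clone();
            // Publish a new snapshot that includes the value.
            self.published.rcu(|current| {
                let mut next = current.deref().clone();
                next.entry(key).or_insert_with(|| value.clone());
                next
            });
            value
        }
    }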
diff --git a/crates/types/src/net/ingress.rs b/crates/types/src/net/ingress.rs
new file mode 100644
index 0000000000..7c3409055d
--- /dev/null
+++ b/crates/types/src/net/ingress.rs
@@ -0,0 +1,123 @@
+// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH.
+// All rights reserved.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0.
+
+use std::sync::Arc;
+
+use bytes::{Bytes, BytesMut};
+
+use crate::identifiers::PartitionId;
+use crate::logs::{HasRecordKeys, Keys};
+use crate::net::partition_processor::PartitionLeaderService;
+use crate::net::{RpcRequest, bilrost_wire_codec, default_wire_codec, define_rpc};
+use crate::storage::{StorageCodec, StorageCodecKind, StorageEncode, StorageEncodeError};
+
+#[derive(Debug, Default, serde::Serialize, serde::Deserialize)]
+pub enum IngestDedup {
+    #[default]
+    None,
+    Producer {
+        producer_id: u128,
+        offset: u64,
+    },
+}
+
+#[derive(Debug, serde::Serialize, serde::Deserialize)]
+pub struct IngestRecord {
+    pub keys: Keys,
+    pub record: Bytes,
+}
+
+impl IngestRecord {
+    pub fn from_parts<T>(keys: Keys, record: T) -> Self
+    where
+        T: StorageEncode,
+    {
+        let mut buf = BytesMut::new();
+        StorageCodec::encode(&record, &mut buf).expect("encode to pass");
+
+        Self {
+            keys,
+            record: buf.freeze(),
+        }
+    }
+}
+
+impl HasRecordKeys for IngestRecord {
+    fn record_keys(&self) -> Keys {
+        self.keys.clone()
+    }
+}
+
+impl StorageEncode for IngestRecord {
+    fn default_codec(&self) -> StorageCodecKind {
+        self.record.default_codec()
+    }
+
+    fn encode(&self, buf: &mut BytesMut) -> Result<(), StorageEncodeError> {
+        self.record.encode(buf)
+    }
+}
+
+#[derive(Debug, serde::Serialize, serde::Deserialize)]
+pub struct IngestRequest {
+    pub records: Arc<[IngestRecord]>,
+}
+
+impl From<Arc<[IngestRecord]>> for IngestRequest {
+    fn from(records: Arc<[IngestRecord]>) -> Self {
+        Self { records }
+    }
+}
+
+// todo(azmy): use bilrost (depends on the payload)
+default_wire_codec!(IngestRequest);
+
+#[derive(Debug, bilrost::Oneof, bilrost::Message)]
+pub enum IngestResponse {
+    Unknown,
+    #[bilrost(tag = 1, message)]
+    Ack,
+    #[bilrost(tag = 2, message)]
+    Starting,
+    #[bilrost(tag = 3, message)]
+    Stopping,
+    #[bilrost(tag = 4, message)]
+    NotLeader {
+        of: PartitionId,
+    },
+    #[bilrost(tag = 5, message)]
+    Internal {
+        msg: String,
+    },
+}
+
+bilrost_wire_codec!(IngestResponse);
+
+define_rpc! {
+    @request=IngestRequest,
+    @response=IngestResponse,
+    @service=PartitionLeaderService,
+}
+
+#[derive(Debug, serde::Serialize, serde::Deserialize)]
+pub struct ReceivedIngestRequest {
+    pub records: Vec<IngestRecord>,
+}
+
+default_wire_codec!(ReceivedIngestRequest);
+
+/// [`ReceivedIngestRequest`] uses the same `TYPE` as [`IngestRequest`]
+/// so that a received RPC message can be decoded directly into this type.
+impl RpcRequest for ReceivedIngestRequest {
+    const TYPE: &str = stringify!(IngestRequest);
+    type Response = IngestResponse;
+    type Service = PartitionLeaderService;
+}
diff --git a/crates/types/src/net/mod.rs b/crates/types/src/net/mod.rs
index 463a0e5646..ff48f85b69 100644
--- a/crates/types/src/net/mod.rs
+++ b/crates/types/src/net/mod.rs
@@ -11,6 +11,7 @@
 pub mod address;
 pub mod codec;
 pub mod connect_opts;
+pub mod ingress;
 pub mod listener;
 pub mod log_server;
 pub mod metadata;
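A sketch of how a producer might build an `IngestRecord` from a payload. The generic `payload` parameter and the use of `Keys::Single` for routing are assumptions for illustration, not requirements stated by this change:

    use restate_types::identifiers::PartitionKey;
    use restate_types::logs::Keys;
    use restate_types::net::ingress::IngestRecord;
    use restate_types::storage::StorageEncode;

    /// Encode `payload` with its default storage codec and key it to a single partition key.
    fn make_record<T: StorageEncode>(partition_key: PartitionKey, payload: T) -> IngestRecord {
        IngestRecord::from_parts(Keys::Single(partition_key), payload)
    }

The resulting record can then be handed to `IngressPermit::ingest`, which routes it to the partition owning `partition_key`.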