diff --git a/Cargo.lock b/Cargo.lock index 56e8f5e446..e651351f67 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7420,6 +7420,7 @@ dependencies = [ "base64 0.22.1", "bytes", "derive_more", + "futures", "metrics", "opentelemetry", "opentelemetry_sdk", @@ -7427,6 +7428,7 @@ dependencies = [ "rdkafka", "restate-bifrost", "restate-core", + "restate-ingestion-client", "restate-storage-api", "restate-timer-queue", "restate-tracing-instrumentation", @@ -7438,6 +7440,7 @@ dependencies = [ "tokio", "tracing", "tracing-opentelemetry", + "xxhash-rust", ] [[package]] @@ -7762,6 +7765,7 @@ dependencies = [ "restate-bifrost", "restate-core", "restate-futures-util", + "restate-ingestion-client", "restate-ingress-http", "restate-log-server", "restate-metadata-providers", @@ -7775,6 +7779,7 @@ dependencies = [ "restate-storage-query-datafusion", "restate-tracing-instrumentation", "restate-types", + "restate-wal-protocol", "restate-worker", "restate-workspace-hack", "rust-rocksdb", @@ -8398,6 +8403,7 @@ dependencies = [ "restate-core", "restate-errors", "restate-futures-util", + "restate-ingestion-client", "restate-ingress-http", "restate-ingress-kafka", "restate-invoker-api", @@ -8570,6 +8576,7 @@ dependencies = [ "typenum", "ulid", "uuid", + "xxhash-rust", "zerocopy 0.8.31", "zeroize", "zstd 0.13.2", diff --git a/crates/encoding/src/common.rs b/crates/encoding/src/common.rs index ad24714d49..0c6d7b7ed0 100644 --- a/crates/encoding/src/common.rs +++ b/crates/encoding/src/common.rs @@ -8,9 +8,55 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use bilrost::{ + DecodeErrorKind, + encoding::{EmptyState, ForOverwrite, Proxiable}, +}; use restate_encoding_derive::BilrostNewType; -use crate::NetSerde; +use crate::{NetSerde, bilrost_encodings::RestateEncoding}; + +struct U128Tag; + +impl Proxiable for u128 { + type Proxy = (u64, u64); + + fn encode_proxy(&self) -> Self::Proxy { + ((*self >> 64) as u64, *self as u64) + } + + fn decode_proxy(&mut self, proxy: Self::Proxy) -> Result<(), DecodeErrorKind> { + *self = (proxy.0 as u128) << 64 | proxy.1 as u128; + Ok(()) + } +} + +impl ForOverwrite for () { + fn for_overwrite() -> u128 { + 0 + } +} + +impl EmptyState for () { + fn empty() -> u128 { + 0 + } + + fn is_empty(val: &u128) -> bool { + *val == 0 + } + + fn clear(val: &mut u128) { + *val = 0; + } +} + +bilrost::delegate_proxied_encoding!( + use encoding (::bilrost::encoding::General) + to encode proxied type (u128) + using proxy tag (U128Tag) + with encoding (RestateEncoding) +); /// A Bilrost compatible U128 type. 
#[derive(Debug, Clone, Copy, PartialEq, Eq, BilrostNewType)] diff --git a/crates/ingress-kafka/Cargo.toml b/crates/ingress-kafka/Cargo.toml index c72409f053..f1a83a4964 100644 --- a/crates/ingress-kafka/Cargo.toml +++ b/crates/ingress-kafka/Cargo.toml @@ -20,6 +20,7 @@ restate-workspace-hack = { workspace = true } restate-bifrost = { workspace = true } restate-core = { workspace = true } +restate-ingestion-client = { workspace = true } restate-storage-api = { workspace = true } restate-timer-queue = { workspace = true } restate-types = { workspace = true } @@ -30,6 +31,7 @@ anyhow = { workspace = true } base64 = { workspace = true } bytes = { workspace = true } derive_more = { workspace = true } +futures = { workspace = true } metrics = { workspace = true } opentelemetry = { workspace = true } opentelemetry_sdk = { workspace = true } @@ -44,8 +46,9 @@ thiserror = { workspace = true } tokio = { workspace = true, features = ["sync", "rt"] } tracing = { workspace = true } tracing-opentelemetry = { workspace = true } +xxhash-rust = { workspace = true, features = ["xxh3", "std"] } [dev-dependencies] restate-types = { workspace = true, features = ["test-util"] } -base64 = { workspace = true } \ No newline at end of file +base64 = { workspace = true } diff --git a/crates/ingress-kafka/src/builder.rs b/crates/ingress-kafka/src/builder.rs new file mode 100644 index 0000000000..f3cc96c8b2 --- /dev/null +++ b/crates/ingress-kafka/src/builder.rs @@ -0,0 +1,311 @@ +// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. 
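For reference, the delegate_proxied_encoding! block added to crates/encoding/src/common.rs above represents a u128 as a (high, low) pair of u64s. A minimal, dependency-free sketch of that round trip (plain functions, no bilrost involved; encode_u128 and decode_u128 are illustrative names, not part of the crate):

fn encode_u128(v: u128) -> (u64, u64) {
    // High half is the top 64 bits, low half is the truncating cast.
    ((v >> 64) as u64, v as u64)
}

fn decode_u128((hi, lo): (u64, u64)) -> u128 {
    // Shift the high half back into place and OR in the low half.
    ((hi as u128) << 64) | lo as u128
}

fn main() {
    let v = 0x0123_4567_89ab_cdef_fedc_ba98_7654_3210_u128;
    assert_eq!(decode_u128(encode_u128(v)), v);
    assert_eq!(encode_u128(1), (0, 1));
}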
+ +use std::borrow::Borrow; + +use anyhow::bail; +use base64::Engine; +use bytes::Bytes; +use opentelemetry::propagation::{Extractor, TextMapPropagator}; +use opentelemetry::trace::{Span, SpanContext, TraceContextExt}; +use opentelemetry_sdk::propagation::TraceContextPropagator; +use rdkafka::Message; +use rdkafka::message::BorrowedMessage; +use tracing::{info_span, trace}; + +use restate_storage_api::deduplication_table::DedupInformation; +use restate_types::identifiers::{InvocationId, WithPartitionKey, partitioner}; +use restate_types::invocation::{Header, InvocationTarget, ServiceInvocation, SpanRelation}; +use restate_types::live::Live; +use restate_types::schema::Schema; +use restate_types::schema::invocation_target::{DeploymentStatus, InvocationTargetResolver}; +use restate_types::schema::subscriptions::{EventInvocationTargetTemplate, Sink, Subscription}; +use restate_wal_protocol::{Command, Destination, Envelope, Source}; + +use crate::Error; + +#[derive(Clone)] +pub struct EnvelopeBuilder { + subscription: Subscription, + schema: Live, + // avoids creating a new string for each invocation + subscription_id: String, +} + +impl EnvelopeBuilder { + pub fn new(subscription: Subscription, schema: Live) -> Self { + Self { + subscription_id: subscription.id().to_string(), + subscription, + schema, + } + } + + pub fn subscription(&self) -> &Subscription { + &self.subscription + } + + pub fn build( + &mut self, + producer_id: u128, + consumer_group_id: &str, + msg: BorrowedMessage<'_>, + ) -> Result { + // Prepare ingress span + let ingress_span = info_span!( + "kafka_ingress_consume", + otel.name = "kafka_ingress_consume", + messaging.system = "kafka", + messaging.operation = "receive", + messaging.source.name = msg.topic(), + messaging.source.partition = msg.partition(), + messaging.destination.name = %self.subscription.sink(), + restate.subscription.id = %self.subscription.id(), + messaging.consumer.group.name = consumer_group_id + ); + + trace!(parent: &ingress_span, "Building Kafka ingress request"); + + let key = if let Some(k) = msg.key() { + Bytes::copy_from_slice(k) + } else { + Bytes::default() + }; + let payload = if let Some(p) = msg.payload() { + Bytes::copy_from_slice(p) + } else { + Bytes::default() + }; + + let headers = Self::generate_events_attributes(&msg, &self.subscription_id); + let dedup = DedupInformation::producer(producer_id, msg.offset() as u64); + + let invocation = InvocationBuilder::create( + &self.subscription, + producer_id, + self.schema.live_load(), + key, + payload, + headers, + consumer_group_id, + msg.topic(), + msg.partition(), + msg.offset(), + ) + .map_err(|cause| Error::Event { + subscription: self.subscription_id.clone(), + topic: msg.topic().to_string(), + partition: msg.partition(), + offset: msg.offset(), + cause, + })?; + + Ok(self.wrap_service_invocation_in_envelope(invocation, dedup)) + } + + fn wrap_service_invocation_in_envelope( + &self, + service_invocation: Box, + dedup_information: DedupInformation, + ) -> Envelope { + let header = restate_wal_protocol::Header { + source: Source::Ingress {}, + dest: Destination::Processor { + partition_key: service_invocation.partition_key(), + dedup: Some(dedup_information), + }, + }; + + Envelope::new(header, Command::Invoke(service_invocation)) + } + + fn generate_events_attributes(msg: &impl Message, subscription_id: &str) -> Vec
{ + let mut headers = Vec::with_capacity(6); + headers.push(Header::new("kafka.offset", msg.offset().to_string())); + headers.push(Header::new("kafka.topic", msg.topic())); + headers.push(Header::new("kafka.partition", msg.partition().to_string())); + if let Some(timestamp) = msg.timestamp().to_millis() { + headers.push(Header::new("kafka.timestamp", timestamp.to_string())); + } + headers.push(Header::new("restate.subscription.id", subscription_id)); + + if let Some(key) = msg.key() { + headers.push(Header::new( + "kafka.key", + &*base64::prelude::BASE64_URL_SAFE.encode(key), + )); + } + + headers + } +} + +#[derive(Debug)] +pub struct InvocationBuilder; + +impl InvocationBuilder { + #[allow(clippy::too_many_arguments)] + pub fn create( + subscription: &Subscription, + producer_id: u128, + schema: &Schema, + key: Bytes, + payload: Bytes, + headers: Vec, + consumer_group_id: &str, + topic: &str, + partition: i32, + offset: i64, + ) -> Result, anyhow::Error> { + let Sink::Invocation { + event_invocation_target_template, + } = subscription.sink(); + + let invocation_target = match event_invocation_target_template { + EventInvocationTargetTemplate::Service { name, handler } => { + InvocationTarget::service(name.clone(), handler.clone()) + } + EventInvocationTargetTemplate::VirtualObject { + name, + handler, + handler_ty, + } => InvocationTarget::virtual_object( + name.clone(), + std::str::from_utf8(&key) + .map_err(|e| anyhow::anyhow!("The Kafka record key must be valid UTF-8: {e}"))? + .to_owned(), + handler.clone(), + *handler_ty, + ), + EventInvocationTargetTemplate::Workflow { + name, + handler, + handler_ty, + } => InvocationTarget::workflow( + name.clone(), + std::str::from_utf8(&key) + .map_err(|e| anyhow::anyhow!("The Kafka record key must be valid UTF-8: {e}"))? + .to_owned(), + handler.clone(), + *handler_ty, + ), + }; + + // Compute the retention values + let target = schema + .resolve_latest_invocation_target( + invocation_target.service_name(), + invocation_target.handler_name(), + ) + .ok_or_else(|| anyhow::anyhow!("Service and handler are not registered"))?; + + if let DeploymentStatus::Deprecated(dp_id) = target.deployment_status { + bail!( + "the service {} is exposed by the deprecated deployment {dp_id}, please upgrade the SDK.", + invocation_target.service_name() + ) + } + + let invocation_retention = target.compute_retention(false); + + let seed = KafkaPartitionKeySeed { + producer: &producer_id, + offset: &offset, + }; + + let invocation_id = InvocationId::generate_or_else(&invocation_target, None, || { + partitioner::HashPartitioner::compute_partition_key(seed) + }); + + // Figure out tracing span + let ingress_span_context = prepare_tracing_span( + &invocation_id, + &invocation_target, + &headers, + consumer_group_id, + topic, + partition as i64, + offset, + ); + + // Finally generate service invocation + let mut service_invocation = Box::new(ServiceInvocation::initialize( + invocation_id, + invocation_target, + restate_types::invocation::Source::Subscription(subscription.id()), + )); + service_invocation.with_related_span(SpanRelation::parent(ingress_span_context)); + service_invocation.argument = payload; + service_invocation.headers = headers; + service_invocation.with_retention(invocation_retention); + + Ok(service_invocation) + } +} + +#[derive(Hash)] +/// Hashable seed that yields a deterministic partition key for service invocations, keeping +/// identical invocations on the same partition for deduplication. 
+struct KafkaPartitionKeySeed<'a> { + producer: &'a u128, + offset: &'a i64, +} + +#[allow(clippy::too_many_arguments)] +pub(crate) fn prepare_tracing_span( + invocation_id: &InvocationId, + invocation_target: &InvocationTarget, + headers: &[restate_types::invocation::Header], + consumer_group_name: &str, + topic: &str, + partition: i64, + offset: i64, +) -> SpanContext { + let tracing_context = TraceContextPropagator::new().extract(&HeaderExtractor(headers)); + let inbound_span = tracing_context.span(); + + let relation = if inbound_span.span_context().is_valid() { + SpanRelation::parent(inbound_span.span_context()) + } else { + SpanRelation::None + }; + + let span = restate_tracing_instrumentation::info_invocation_span!( + relation = relation, + prefix = "ingress_kafka", + id = invocation_id, + target = invocation_target, + tags = ( + messaging.system = "kafka", + messaging.consumer.group.name = consumer_group_name.to_owned(), + messaging.operation.type = "process", + messaging.kafka.offset = offset, + messaging.source.partition.id = partition, + messaging.source.name = topic.to_owned() + ) + ); + + span.span_context().clone() +} + +struct HeaderExtractor<'a>(pub &'a [restate_types::invocation::Header]); + +impl Extractor for HeaderExtractor<'_> { + fn get(&self, key: &str) -> Option<&str> { + self.0 + .iter() + .find(|h| h.name.eq_ignore_ascii_case(key)) + .map(|value| value.value.borrow()) + } + + fn keys(&self) -> Vec<&str> { + self.0.iter().map(|h| h.name.borrow()).collect::>() + } +} diff --git a/crates/ingress-kafka/src/consumer_task.rs b/crates/ingress-kafka/src/consumer_task.rs index f3b1300f86..bbde429b9c 100644 --- a/crates/ingress-kafka/src/consumer_task.rs +++ b/crates/ingress-kafka/src/consumer_task.rs @@ -8,238 +8,75 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. 
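The HeaderExtractor at the end of builder.rs above adapts Restate invocation headers to OpenTelemetry's Extractor trait so a W3C traceparent published by the Kafka producer is picked up for span parenting. A self-contained sketch of that pattern, assuming a stand-in KvHeader type instead of restate_types::invocation::Header and an arbitrary sample traceparent value:

use opentelemetry::propagation::{Extractor, TextMapPropagator};
use opentelemetry::trace::TraceContextExt;
use opentelemetry_sdk::propagation::TraceContextPropagator;

struct KvHeader {
    name: String,
    value: String,
}

struct KvExtractor<'a>(&'a [KvHeader]);

impl Extractor for KvExtractor<'_> {
    // Case-insensitive lookup, mirroring the HeaderExtractor in builder.rs.
    fn get(&self, key: &str) -> Option<&str> {
        self.0
            .iter()
            .find(|h| h.name.eq_ignore_ascii_case(key))
            .map(|h| h.value.as_str())
    }

    fn keys(&self) -> Vec<&str> {
        self.0.iter().map(|h| h.name.as_str()).collect()
    }
}

fn main() {
    let headers = vec![KvHeader {
        name: "traceparent".into(),
        value: "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01".into(),
    }];
    let ctx = TraceContextPropagator::new().extract(&KvExtractor(&headers));
    // A well-formed traceparent yields a valid remote span context.
    assert!(ctx.span().span_context().is_valid());
}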
-use std::collections::HashMap; +use std::collections::{HashMap, VecDeque}; use std::fmt; +use std::hash::Hash; use std::sync::{Arc, OnceLock, Weak}; +use std::time::Duration; -use crate::dispatcher::{DispatchKafkaEvent, KafkaIngressDispatcher, KafkaIngressEvent}; -use crate::metric_definitions::{KAFKA_INGRESS_CONSUMER_LAG, KAFKA_INGRESS_REQUESTS}; -use base64::Engine; -use bytes::Bytes; +use anyhow::Context; +use futures::StreamExt; +use futures::future::OptionFuture; use metrics::{counter, gauge}; use rdkafka::consumer::stream_consumer::StreamPartitionQueue; use rdkafka::consumer::{ BaseConsumer, CommitMode, Consumer, ConsumerContext, Rebalance, StreamConsumer, }; use rdkafka::error::KafkaError; -use rdkafka::message::BorrowedMessage; use rdkafka::topic_partition_list::TopicPartitionListElem; use rdkafka::types::RDKafkaErrorCode; use rdkafka::{ClientConfig, ClientContext, Message, Statistics}; -use restate_core::{TaskCenter, TaskHandle, TaskKind, task_center}; -use restate_types::invocation::Header; -use restate_types::live::Live; -use restate_types::message::MessageIndex; -use restate_types::schema::Schema; -use restate_types::schema::subscriptions::{EventInvocationTargetTemplate, Sink, Subscription}; -use tokio::sync::{mpsc, oneshot}; -use tracing::{Instrument, debug, info, info_span, warn}; - -#[derive(Debug, thiserror::Error)] -pub enum Error { - #[error(transparent)] - Kafka(#[from] KafkaError), - #[error( - "error processing message topic {topic} partition {partition} offset {offset}: {cause}" - )] - Event { - topic: String, - partition: i32, - offset: i64, - #[source] - cause: anyhow::Error, - }, - #[error("ingress dispatcher channel is closed")] - IngressDispatcherClosed, - #[error( - "received a message on the main partition queue for topic {0} partition {1} despite partitioned queues" - )] - UnexpectedMainQueueMessage(String, i32), -} - -type MessageConsumer = StreamConsumer; - -#[derive(Debug, Hash)] -pub struct KafkaDeduplicationId { - consumer_group: String, - topic: String, - partition: i32, -} - -impl fmt::Display for KafkaDeduplicationId { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!( - f, - "{}-{}-{}", - self.consumer_group, self.topic, self.partition - ) - } -} -impl KafkaDeduplicationId { - pub(crate) fn requires_proxying(subscription: &Subscription) -> bool { - // Service event receiver requires proxying because we don't want to scatter deduplication ids (kafka topic/partition offsets) in all the Restate partitions. - matches!( - subscription.sink(), - Sink::Invocation { - event_invocation_target_template: EventInvocationTargetTemplate::Service { .. 
} - }, - ) - } -} - -#[derive(Clone)] -pub struct MessageSender { - subscription: Subscription, - dispatcher: KafkaIngressDispatcher, - schema: Live, - - subscription_id: String, - ingress_request_counter: metrics::Counter, -} - -impl MessageSender { - pub fn new( - subscription: Subscription, - dispatcher: KafkaIngressDispatcher, - schema: Live, - ) -> Self { - Self { - subscription_id: subscription.id().to_string(), - ingress_request_counter: counter!( - KAFKA_INGRESS_REQUESTS, - "subscription" => subscription.id().to_string() - ), - subscription, - dispatcher, - schema, - } - } - - async fn send(&self, consumer_group_id: &str, msg: BorrowedMessage<'_>) -> Result<(), Error> { - // Prepare ingress span - let ingress_span = info_span!( - "kafka_ingress_consume", - otel.name = "kafka_ingress_consume", - messaging.system = "kafka", - messaging.operation = "receive", - messaging.source.name = msg.topic(), - messaging.destination.name = %self.subscription.sink(), - restate.subscription.id = %self.subscription.id(), - messaging.consumer.group.name = consumer_group_id - ); - info!(parent: &ingress_span, "Processing Kafka ingress request"); - - let key = if let Some(k) = msg.key() { - Bytes::copy_from_slice(k) - } else { - Bytes::default() - }; - let payload = if let Some(p) = msg.payload() { - Bytes::copy_from_slice(p) - } else { - Bytes::default() - }; - let headers = Self::generate_events_attributes(&msg, &self.subscription_id); - - let (deduplication_id, deduplication_index) = - Self::generate_deduplication_id(consumer_group_id, &msg); - let req = KafkaIngressEvent::new( - &self.subscription, - self.schema.pinned(), - key, - payload, - deduplication_id, - deduplication_index, - headers, - consumer_group_id, - msg.topic(), - msg.partition(), - msg.offset(), - ) - .map_err(|cause| Error::Event { - topic: msg.topic().to_string(), - partition: msg.partition(), - offset: msg.offset(), - cause, - })?; - - self.ingress_request_counter.increment(1); - - self.dispatcher - .dispatch_kafka_event(req) - .instrument(ingress_span) - .await - .map_err(|_| Error::IngressDispatcherClosed)?; - Ok(()) - } - - fn generate_events_attributes(msg: &impl Message, subscription_id: &str) -> Vec
{ - let mut headers = Vec::with_capacity(6); - headers.push(Header::new("kafka.offset", msg.offset().to_string())); - headers.push(Header::new("kafka.topic", msg.topic())); - headers.push(Header::new("kafka.partition", msg.partition().to_string())); - if let Some(timestamp) = msg.timestamp().to_millis() { - headers.push(Header::new("kafka.timestamp", timestamp.to_string())); - } - headers.push(Header::new( - "restate.subscription.id".to_string(), - subscription_id, - )); - - if let Some(key) = msg.key() { - headers.push(Header::new( - "kafka.key", - &*base64::prelude::BASE64_URL_SAFE.encode(key), - )); - } - - headers - } +use restate_core::network::{NetworkSender, Swimlane, TransportConnect}; +use restate_core::{Metadata, TaskCenter, TaskHandle, TaskKind, task_center}; +use restate_ingestion_client::{IngestionClient, IngestionError, RecordCommit}; +use restate_types::identifiers::partitioner::HashPartitioner; +use restate_types::identifiers::{SubscriptionId, WithPartitionKey}; +use restate_types::net::ingest::{DedupSequenceNrQueryRequest, ProducerId, ResponseStatus}; +use restate_types::partitions::FindPartition; +use restate_types::retries::RetryPolicy; +use restate_types::schema::subscriptions::{EventInvocationTargetTemplate, Sink}; +use restate_wal_protocol::Envelope; +use tokio::sync::{mpsc, oneshot}; +use tracing::{debug, info, instrument, trace, warn}; - fn generate_deduplication_id( - consumer_group: &str, - msg: &impl Message, - ) -> (KafkaDeduplicationId, MessageIndex) { - ( - KafkaDeduplicationId { - consumer_group: consumer_group.to_owned(), - topic: msg.topic().to_owned(), - partition: msg.partition(), - }, - msg.offset() as u64, - ) - } +use crate::Error; +use crate::builder::EnvelopeBuilder; +use crate::metric_definitions::{KAFKA_INGRESS_CONSUMER_LAG, KAFKA_INGRESS_REQUESTS}; - fn update_consumer_stats(&self, stats: Statistics) { - for topic in stats.topics { - for partition in topic.1.partitions { - let lag = partition.1.consumer_lag as f64; - gauge!( - KAFKA_INGRESS_CONSUMER_LAG, - "subscription" => self.subscription.id().to_string(), - "topic" => topic.0.to_string(), - "partition" => partition.0.to_string() - ) - .set(lag); - } +impl From for Error { + fn from(value: IngestionError) -> Self { + match value { + IngestionError::Closed => Self::IngestionClosed, + IngestionError::PartitionTableError(err) => Self::PartitionTableError(err), } } } +type MessageConsumer = StreamConsumer>; #[derive(Clone)] -pub struct ConsumerTask { +pub struct ConsumerTask { client_config: ClientConfig, topics: Vec, - sender: MessageSender, + ingestion: IngestionClient, + builder: EnvelopeBuilder, } -impl ConsumerTask { - pub fn new(client_config: ClientConfig, topics: Vec, sender: MessageSender) -> Self { +impl ConsumerTask +where + T: TransportConnect, +{ + pub fn new( + client_config: ClientConfig, + topics: Vec, + ingestion: IngestionClient, + builder: EnvelopeBuilder, + ) -> Self { Self { client_config, topics, - sender, + ingestion, + builder, } } @@ -251,7 +88,7 @@ impl ConsumerTask { .expect("group.id must be set") .to_string(); debug!( - restate.subscription.id = %self.sender.subscription.id(), + restate.subscription.id = %self.builder.subscription().id(), messaging.consumer.group.name = consumer_group_id, "Starting consumer for topics {:?} with configuration {:?}", self.topics, self.client_config @@ -264,10 +101,11 @@ impl ConsumerTask { consumer: OnceLock::new(), topic_partition_tasks: parking_lot::Mutex::new(HashMap::new()), failures_tx, - sender: self.sender.clone(), + ingestion: 
self.ingestion.clone(), + builder: self.builder.clone(), consumer_group_id, }; - let consumer: Arc = + let consumer: Arc> = Arc::new(self.client_config.create_with_context(rebalance_context)?); // this OnceLock dance is needed because the rebalance callbacks don't get a handle on the consumer, // which is strange because practically everything you'd want to do with them involves the consumer. @@ -303,9 +141,9 @@ impl ConsumerTask { } #[derive(derive_more::Deref)] -struct ConsumerDrop(Arc); +struct ConsumerDrop(Arc>); -impl Drop for ConsumerDrop { +impl Drop for ConsumerDrop { fn drop(&mut self) { debug!( "Stopping consumer with id {}", @@ -332,18 +170,33 @@ impl fmt::Display for TopicPartition { } } -struct RebalanceContext { +struct RebalanceContext { task_center_handle: task_center::Handle, - consumer: OnceLock>, + consumer: OnceLock>>, topic_partition_tasks: parking_lot::Mutex>, failures_tx: mpsc::UnboundedSender, - sender: MessageSender, + ingestion: IngestionClient, + builder: EnvelopeBuilder, consumer_group_id: String, } -impl ClientContext for RebalanceContext { +impl ClientContext for RebalanceContext +where + T: TransportConnect, +{ fn stats(&self, statistics: Statistics) { - self.sender.update_consumer_stats(statistics); + for topic in statistics.topics { + for partition in topic.1.partitions { + let lag = partition.1.consumer_lag as f64; + gauge!( + KAFKA_INGRESS_CONSUMER_LAG, + "subscription" => self.builder.subscription().id().to_string(), + "topic" => topic.0.to_string(), + "partition" => partition.0.to_string() + ) + .set(lag); + } + } } } @@ -358,7 +211,10 @@ impl ClientContext for RebalanceContext { // and their queues are destroyed. Split partition queues will stop working in this case. We should ensure // that they are not polled again after the assign. Then there will be a further rebalance callback after the revoke // and we will set up new split partition streams before the assign. -impl ConsumerContext for RebalanceContext { +impl ConsumerContext for RebalanceContext +where + T: TransportConnect, +{ fn pre_rebalance(&self, _base_consumer: &BaseConsumer, rebalance: &Rebalance<'_>) { let mut topic_partition_tasks = self.topic_partition_tasks.lock(); let consumer = self @@ -375,6 +231,12 @@ impl ConsumerContext for RebalanceContext { Rebalance::Assign(partitions) if partitions.count() > 0 => { for partition in partitions.elements() { let partition: TopicPartition = partition.into(); + info!( + subscription = %self.builder.subscription().id(), + topic = %partition.0, + partition = %partition.1, + "Assigned kafka partition" + ); if let Some(task_id) = topic_partition_tasks.remove(&partition) { // This probably implies a problem in our assumptions, because librdkafka shouldn't be assigning us a partition again without having revoked it. 
@@ -387,8 +249,9 @@ impl ConsumerContext for RebalanceContext { match consumer.split_partition_queue(&partition.0, partition.1) { Some(queue) => { - let task = topic_partition_queue_consumption_loop( - self.sender.clone(), + let task = TopicPartitionConsumptionTask::new( + self.ingestion.clone(), + self.builder.clone(), partition.clone(), queue, Arc::clone(&consumer), @@ -399,7 +262,7 @@ impl ConsumerContext for RebalanceContext { if let Ok(task_handle) = self.task_center_handle.spawn_unmanaged( TaskKind::Ingress, "kafka-partition-ingest", - task, + task.run(), ) { topic_partition_tasks.insert(partition, AbortOnDrop(task_handle)); } else { @@ -418,7 +281,14 @@ impl ConsumerContext for RebalanceContext { } Rebalance::Revoke(partitions) if partitions.count() > 0 => { for partition in partitions.elements() { - let partition = partition.into(); + let partition: TopicPartition = partition.into(); + info!( + subscription = %self.builder.subscription().id(), + topic = %partition.0, + partition = %partition.1, + "Revoked kafka partition" + ); + match topic_partition_tasks.remove(&partition) { Some(task_id) => { debug!( @@ -457,36 +327,256 @@ impl Drop for AbortOnDrop { } } -async fn topic_partition_queue_consumption_loop( - sender: MessageSender, +struct TopicPartitionConsumptionTask +where + T: TransportConnect, + C: ConsumerContext, +{ + ingestion: IngestionClient, + builder: EnvelopeBuilder, topic_partition: TopicPartition, - topic_partition_consumer: StreamPartitionQueue, - consumer: Arc, + topic_partition_consumer: StreamPartitionQueue, + consumer: Arc>, consumer_group_id: String, failed: mpsc::UnboundedSender, -) { - debug!( - restate.subscription.id = %sender.subscription.id(), - messaging.consumer.group.name = consumer_group_id, - "Starting topic '{}' partition '{}' consumption loop", - topic_partition.0, - topic_partition.1 - ); - // this future will be aborted when the partition is no longer needed, so any exit is a failure - let err = loop { - let res = topic_partition_consumer.recv().await; - let msg = match res { - Ok(msg) => msg, - Err(err) => break err.into(), +} + +impl TopicPartitionConsumptionTask +where + T: TransportConnect, + C: ConsumerContext, +{ + fn new( + ingestion: IngestionClient, + builder: EnvelopeBuilder, + topic_partition: TopicPartition, + topic_partition_consumer: StreamPartitionQueue, + consumer: Arc>, + consumer_group_id: String, + failed: mpsc::UnboundedSender, + ) -> Self { + Self { + ingestion, + builder, + topic_partition, + topic_partition_consumer, + consumer, + consumer_group_id, + failed, + } + } + + async fn run(mut self) { + // this future will be aborted when the partition is no longer needed, so any exit is a failure + let err = match self.run_inner().await { + Err(err) => err, + Ok(_) => Error::UnexpectedConsumptionTaskExited { + subscription: self.builder.subscription().id().to_string(), + topic: self.topic_partition.0, + partition: self.topic_partition.1, + }, }; - let offset = msg.offset(); - if let Err(err) = sender.send(&consumer_group_id, msg).await { - break err; + + _ = self.failed.send(err); + } + + /// query the legacy dedup information for this consumption task. + async fn legacy_dedup_offset(&self) -> Option { + if !matches!( + self.builder.subscription().sink(), + Sink::Invocation { + event_invocation_target_template: EventInvocationTargetTemplate::Service { .. 
} + } + ) { + // legacy dedup is only valid for services which used to + // be proxied + return None; } - if let Err(err) = consumer.store_offset(&topic_partition.0, topic_partition.1, offset) { - break err.into(); + + #[derive(Hash)] + pub struct LegacyKafkaDeduplicationId<'a> { + consumer_group: &'a str, + topic: &'a str, + partition: i32, + } + + // producer id constructed as {consumer-group}-{topic}-{kafka-partition} + let legacy_producer_id = format!( + "{}-{}-{}", + self.consumer_group_id, self.topic_partition.0, self.topic_partition.1 + ); + + let proxy_partition_key = + HashPartitioner::compute_partition_key(&LegacyKafkaDeduplicationId { + consumer_group: &self.consumer_group_id, + topic: &self.topic_partition.0, + partition: self.topic_partition.1, + }); + + RetryPolicy::exponential( + Duration::from_millis(50), + 2.0, + None, + Some(Duration::from_secs(1)), + ) + .retry_with_inspect( + || async { + let partition_id = Metadata::with_current(|m| { + m.partition_table_ref() + .find_partition_id(proxy_partition_key) + })?; + + let node_id = self + .ingestion + .partition_routing() + .get_node_by_partition(partition_id) + .with_context(|| { + format!("cannot lookup node id for partition id {partition_id}") + })?; + + // we use long timeout of 5 seconds in case partition processor is catching up + let response = self + .ingestion + .networking() + .call_rpc( + node_id, + Swimlane::General, + DedupSequenceNrQueryRequest { + producer_id: ProducerId::String(legacy_producer_id.clone()), + }, + Some(partition_id.into()), + Some(Duration::from_secs(5)), + ) + .await + .with_context(|| { + format!( + "failed to query legacy dedup \ + sequence number for producer '{legacy_producer_id}' \ + from node {node_id}" + ) + })?; + + match response.status { + ResponseStatus::Ack => Ok(response.sequence_number), + status => Err(anyhow::anyhow!( + "failed to query latest dedup \ + sequence number from node {node_id} for '{legacy_producer_id}': {status:?}" + )), + } + }, + |attempts, err| { + if attempts >= 10 { + warn!("Failed to query legacy dedup information: {err:#} .. retrying"); + } else { + debug!("Failed to query legacy dedup information: {err:#} .. retrying"); + } + }, + ) + .await + .expect("tries forever") + } + + #[instrument(skip(self), fields( + restate.subscription.id = %self.builder.subscription().id(), + topic=%self.topic_partition.0, + kafka_partition=%self.topic_partition.1, + consumer_group=%self.consumer_group_id) + )] + async fn run_inner(&mut self) -> Result<(), Error> { + debug!("Starting topic consumption loop"); + + let legacy_dedup_offset = self.legacy_dedup_offset().await; + debug!("Legacy dedup offset: {legacy_dedup_offset:?}",); + + let producer_id = dedup_producer_id( + &self.builder.subscription().id(), + &self.consumer_group_id, + &self.topic_partition.0, + self.topic_partition.1, + ); + + let ingress_request_counter = counter!( + KAFKA_INGRESS_REQUESTS, + "subscription" => self.builder.subscription().id().to_string(), + "topic" => self.topic_partition.0.to_string(), + "partition" => self.topic_partition.1.to_string(), + ); + + let mut inflight = VecDeque::new(); + + let mut consumer_stream = self.topic_partition_consumer.stream(); + loop { + tokio::select! 
{ + biased; + Some(committed) = Self::head_committed(&mut inflight) => { + _ = inflight.pop_front().expect("to exist"); + let offset = committed.map_err(|_| Error::IngestionClosed)?; + + ingress_request_counter.increment(1); + trace!( + offset=%offset, + "Store kafka offset", + ); + + self.consumer.store_offset(&self.topic_partition.0, self.topic_partition.1, offset)?; + }, + Some(received) = consumer_stream.next() => { + let msg = received?; + let offset = msg.offset(); + if legacy_dedup_offset.is_some_and(|dedup_offset| offset as u64 <= dedup_offset) { + // skip duplicated messages. Any gap should be small. + debug!( + offset=%offset, + "Skipping kafka message (dedup)" + ); + self.consumer.store_offset(&self.topic_partition.0, self.topic_partition.1, offset)?; + continue; + } + + trace!( + offset=%offset, + "Ingesting kafka message" + ); + + let envelope = self.builder.build(producer_id, &self.consumer_group_id, msg)?; + + let commit_token = self + .ingestion + .ingest(envelope.partition_key(), envelope) + .await? + .map(|_| offset); + + inflight.push_back(commit_token); + } + } } - }; + } - _ = failed.send(err); + #[inline] + pub fn head_committed( + inflight: &mut VecDeque>, + ) -> OptionFuture<&mut RecordCommit> { + OptionFuture::from(inflight.front_mut()) + } +} + +// Do not change. Changing this hasher will create new producer-id which can +// cause duplicates +fn dedup_producer_id( + subscription: &SubscriptionId, + consumer_group: &str, + topic: &str, + partition: i32, +) -> u128 { + let mut hasher = xxhash_rust::xxh3::Xxh3::new(); + + subscription.hash(&mut hasher); + '\0'.hash(&mut hasher); + consumer_group.hash(&mut hasher); + '\0'.hash(&mut hasher); + topic.hash(&mut hasher); + '\0'.hash(&mut hasher); + partition.hash(&mut hasher); + + hasher.digest128() } diff --git a/crates/ingress-kafka/src/legacy/consumer_task.rs b/crates/ingress-kafka/src/legacy/consumer_task.rs new file mode 100644 index 0000000000..d1f0ba9af5 --- /dev/null +++ b/crates/ingress-kafka/src/legacy/consumer_task.rs @@ -0,0 +1,691 @@ +// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. 
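dedup_producer_id above derives a stable 128-bit producer id from (subscription id, consumer group, topic, partition) with xxh3. The sketch below re-derives it with &str in place of SubscriptionId, so the concrete digests differ from production values; what it demonstrates is the property the deduplication path relies on: identical inputs always hash to the same id, and changing any field yields a different one. producer_id and the sample inputs are illustrative.

use std::hash::Hash;

// The field order and the '\0' separators must match dedup_producer_id
// byte-for-byte; a different hashing scheme means a different producer id,
// which would break deduplication across versions.
fn producer_id(subscription: &str, consumer_group: &str, topic: &str, partition: i32) -> u128 {
    let mut hasher = xxhash_rust::xxh3::Xxh3::new();
    subscription.hash(&mut hasher);
    '\0'.hash(&mut hasher);
    consumer_group.hash(&mut hasher);
    '\0'.hash(&mut hasher);
    topic.hash(&mut hasher);
    '\0'.hash(&mut hasher);
    partition.hash(&mut hasher);
    hasher.digest128()
}

fn main() {
    let a = producer_id("sub_1", "group-a", "orders", 0);
    let b = producer_id("sub_1", "group-a", "orders", 0);
    let c = producer_id("sub_1", "group-a", "orders", 1);
    assert_eq!(a, b, "same inputs always map to the same producer id");
    assert_ne!(a, c, "a different Kafka partition gets its own producer id");
}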
+ +use std::collections::HashMap; +use std::fmt; +use std::hash::Hash; +use std::sync::{Arc, OnceLock, Weak}; +use std::time::Duration; + +use crate::Error; +use crate::legacy::dispatcher::{DispatchKafkaEvent, KafkaIngressDispatcher, KafkaIngressEvent}; +use crate::legacy::metric_definitions::{KAFKA_INGRESS_CONSUMER_LAG, KAFKA_INGRESS_REQUESTS}; +use anyhow::Context; +use base64::Engine; +use bytes::Bytes; +use futures::StreamExt; +use futures::stream::FuturesUnordered; +use metrics::{counter, gauge}; +use rdkafka::consumer::stream_consumer::StreamPartitionQueue; +use rdkafka::consumer::{ + BaseConsumer, CommitMode, Consumer, ConsumerContext, Rebalance, StreamConsumer, +}; +use rdkafka::error::KafkaError; +use rdkafka::message::BorrowedMessage; +use rdkafka::topic_partition_list::TopicPartitionListElem; +use rdkafka::types::RDKafkaErrorCode; +use rdkafka::{ClientConfig, ClientContext, Message, Statistics}; +use restate_core::network::{ + NetworkSender, Networking, RpcError, RpcReplyError, Swimlane, TransportConnect, +}; +use restate_core::partitions::PartitionRouting; +use restate_core::{Metadata, TaskCenter, TaskHandle, TaskKind, task_center}; +use restate_types::identifiers::{PartitionId, SubscriptionId}; +use restate_types::invocation::Header; +use restate_types::live::Live; +use restate_types::message::MessageIndex; +use restate_types::net::ingest::{DedupSequenceNrQueryRequest, ProducerId, ResponseStatus}; +use restate_types::retries::RetryPolicy; +use restate_types::schema::Schema; +use restate_types::schema::subscriptions::{EventInvocationTargetTemplate, Sink, Subscription}; +use tokio::sync::{mpsc, oneshot}; +use tracing::{Instrument, debug, info_span, trace, warn}; + +type MessageConsumer = StreamConsumer>; + +#[derive(Debug, Hash)] +pub struct KafkaDeduplicationId { + consumer_group: String, + topic: String, + partition: i32, +} + +impl fmt::Display for KafkaDeduplicationId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "{}-{}-{}", + self.consumer_group, self.topic, self.partition + ) + } +} + +impl KafkaDeduplicationId { + pub(crate) fn requires_proxying(subscription: &Subscription) -> bool { + // Service event receiver requires proxying because we don't want to scatter deduplication ids (kafka topic/partition offsets) in all the Restate partitions. + matches!( + subscription.sink(), + Sink::Invocation { + event_invocation_target_template: EventInvocationTargetTemplate::Service { .. 
} + }, + ) + } +} + +#[derive(Clone)] +pub struct MessageSender { + subscription: Subscription, + dispatcher: KafkaIngressDispatcher, + schema: Live, + + subscription_id: String, + ingress_request_counter: metrics::Counter, +} + +impl MessageSender { + pub fn new( + subscription: Subscription, + dispatcher: KafkaIngressDispatcher, + schema: Live, + ) -> Self { + Self { + subscription_id: subscription.id().to_string(), + ingress_request_counter: counter!( + KAFKA_INGRESS_REQUESTS, + "subscription" => subscription.id().to_string() + ), + subscription, + dispatcher, + schema, + } + } + + async fn send(&self, consumer_group_id: &str, msg: BorrowedMessage<'_>) -> Result<(), Error> { + // Prepare ingress span + let ingress_span = info_span!( + "kafka_ingress_consume", + otel.name = "kafka_ingress_consume", + messaging.system = "kafka", + messaging.operation = "receive", + messaging.source.name = msg.topic(), + messaging.destination.name = %self.subscription.sink(), + restate.subscription.id = %self.subscription.id(), + messaging.consumer.group.name = consumer_group_id + ); + trace!(parent: &ingress_span, "Processing Kafka ingress request"); + + let key = if let Some(k) = msg.key() { + Bytes::copy_from_slice(k) + } else { + Bytes::default() + }; + let payload = if let Some(p) = msg.payload() { + Bytes::copy_from_slice(p) + } else { + Bytes::default() + }; + let headers = Self::generate_events_attributes(&msg, &self.subscription_id); + + let (deduplication_id, deduplication_index) = + Self::generate_deduplication_id(consumer_group_id, &msg); + let req = KafkaIngressEvent::new( + &self.subscription, + self.schema.pinned(), + key, + payload, + deduplication_id, + deduplication_index, + headers, + consumer_group_id, + msg.topic(), + msg.partition(), + msg.offset(), + ) + .map_err(|cause| Error::Event { + subscription: self.subscription_id.clone(), + topic: msg.topic().to_string(), + partition: msg.partition(), + offset: msg.offset(), + cause, + })?; + + self.ingress_request_counter.increment(1); + + self.dispatcher + .dispatch_kafka_event(req) + .instrument(ingress_span) + .await + .map_err(|_| Error::IngestionClosed)?; + Ok(()) + } + + fn generate_events_attributes(msg: &impl Message, subscription_id: &str) -> Vec
{ + let mut headers = Vec::with_capacity(6); + headers.push(Header::new("kafka.offset", msg.offset().to_string())); + headers.push(Header::new("kafka.topic", msg.topic())); + headers.push(Header::new("kafka.partition", msg.partition().to_string())); + if let Some(timestamp) = msg.timestamp().to_millis() { + headers.push(Header::new("kafka.timestamp", timestamp.to_string())); + } + headers.push(Header::new("restate.subscription.id", subscription_id)); + + if let Some(key) = msg.key() { + headers.push(Header::new( + "kafka.key", + &*base64::prelude::BASE64_URL_SAFE.encode(key), + )); + } + + headers + } + + fn generate_deduplication_id( + consumer_group: &str, + msg: &impl Message, + ) -> (KafkaDeduplicationId, MessageIndex) { + ( + KafkaDeduplicationId { + consumer_group: consumer_group.to_owned(), + topic: msg.topic().to_owned(), + partition: msg.partition(), + }, + msg.offset() as u64, + ) + } + + fn update_consumer_stats(&self, stats: Statistics) { + for topic in stats.topics { + for partition in topic.1.partitions { + let lag = partition.1.consumer_lag as f64; + gauge!( + KAFKA_INGRESS_CONSUMER_LAG, + "subscription" => self.subscription.id().to_string(), + "topic" => topic.0.to_string(), + "partition" => partition.0.to_string() + ) + .set(lag); + } + } + } +} + +#[derive(Clone)] +pub struct ConsumerTask { + networking: Networking, + partition_routing: PartitionRouting, + client_config: ClientConfig, + topics: Vec, + sender: MessageSender, +} + +impl ConsumerTask +where + T: TransportConnect, +{ + pub fn new( + networking: Networking, + partition_routing: PartitionRouting, + client_config: ClientConfig, + topics: Vec, + sender: MessageSender, + ) -> Self { + Self { + networking, + partition_routing, + client_config, + topics, + sender, + } + } + + pub async fn run(self, mut rx: oneshot::Receiver<()>) -> Result<(), Error> { + // Create the consumer and subscribe to the topic + let consumer_group_id = self + .client_config + .get("group.id") + .expect("group.id must be set") + .to_string(); + debug!( + restate.subscription.id = %self.sender.subscription.id(), + messaging.consumer.group.name = consumer_group_id, + "Starting consumer for topics {:?} with configuration {:?}", + self.topics, self.client_config + ); + + let (failures_tx, failures_rx) = mpsc::unbounded_channel(); + + let rebalance_context = RebalanceContext { + networking: self.networking.clone(), + partition_routing: self.partition_routing.clone(), + task_center_handle: TaskCenter::current(), + consumer: OnceLock::new(), + topic_partition_tasks: parking_lot::Mutex::new(HashMap::new()), + failures_tx, + sender: self.sender.clone(), + consumer_group_id, + }; + let consumer: Arc> = + Arc::new(self.client_config.create_with_context(rebalance_context)?); + // this OnceLock dance is needed because the rebalance callbacks don't get a handle on the consumer, + // which is strange because practically everything you'd want to do with them involves the consumer. + _ = consumer.context().consumer.set(Arc::downgrade(&consumer)); + + // ensure partitioned tasks are cancelled when this function exits/stops being polled + let consumer = ConsumerDrop(consumer); + + let topics: Vec<&str> = self.topics.iter().map(|x| &**x).collect(); + consumer.subscribe(&topics)?; + + let mut failures_rx = std::pin::pin!(failures_rx); + + tokio::select! 
{ + // we have to poll the main consumer for callbacks to be processed, but we expect to only see messages on the partitioned queues + res = consumer.recv() => { + match res { + // We shouldn't see any messages on the main consumer loop, because we split the queues into partitioned queues before they + // are ever assigned. Messages here should be treated as a bug in our assumptions. + Ok(msg) => Err(Error::UnexpectedMainQueueMessage(msg.topic().into(), msg.partition())), + Err(e) => Err(e.into()), + } + } + // watch for errors in the partitioned consumers - they should only ever abort, not return errors + Some(err) = failures_rx.recv() => { + Err(err) + } + _ = &mut rx => { + Ok(()) + } + } + } +} + +#[derive(derive_more::Deref)] +struct ConsumerDrop(Arc>) +where + T: TransportConnect; + +impl Drop for ConsumerDrop +where + T: TransportConnect, +{ + fn drop(&mut self) { + debug!( + "Stopping consumer with id {}", + self.context().consumer_group_id + ); + + // we have to clear this because the partitioned tasks themselves hold a reference to MessageConsumer + self.context().topic_partition_tasks.lock().clear(); + } +} + +#[derive(Debug, Clone, Hash, PartialEq, Eq)] +struct TopicPartition(String, i32); + +impl<'a> From> for TopicPartition { + fn from(value: TopicPartitionListElem<'a>) -> Self { + Self(value.topic().into(), value.partition()) + } +} + +impl fmt::Display for TopicPartition { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}/{}", self.0, self.1) + } +} + +struct RebalanceContext +where + T: TransportConnect, +{ + networking: Networking, + partition_routing: PartitionRouting, + task_center_handle: task_center::Handle, + consumer: OnceLock>>, + topic_partition_tasks: parking_lot::Mutex>, + failures_tx: mpsc::UnboundedSender, + sender: MessageSender, + consumer_group_id: String, +} + +impl ClientContext for RebalanceContext +where + T: TransportConnect, +{ + fn stats(&self, statistics: Statistics) { + self.sender.update_consumer_stats(statistics); + } +} + +// This callback is called synchronously with the poll of the main queue, so we don't want to block here. +// Once the pre balance steps finish assign() will be called. If we have not split at this point, +// then queues will be created defaulting to forward to the main loop - which we don't want. +// However, if we have split the partition before assign is called, the queue will be created +// with a flag RD_KAFKA_Q_F_FWD_APP and this flag will ensure that the queue will not be sent to the +// main loop. Therefore its critical that the splits happen synchronously before the pre_rebalance ends. +// +// On non-cooperative rebalance during assign all the existing partitions are revoked, +// and their queues are destroyed. Split partition queues will stop working in this case. We should ensure +// that they are not polled again after the assign. Then there will be a further rebalance callback after the revoke +// and we will set up new split partition streams before the assign. 
+impl ConsumerContext for RebalanceContext +where + T: TransportConnect, +{ + fn pre_rebalance(&self, _base_consumer: &BaseConsumer, rebalance: &Rebalance<'_>) { + let mut topic_partition_tasks = self.topic_partition_tasks.lock(); + let consumer = self + .consumer + .get() + .expect("consumer must have been set in context at rebalance time"); + + let Some(consumer) = consumer.upgrade() else { + // if the consumer has been dropped, we don't need to maintain tasks any more + return; + }; + + match rebalance { + Rebalance::Assign(partitions) if partitions.count() > 0 => { + for partition in partitions.elements() { + let partition: TopicPartition = partition.into(); + + if let Some(task_id) = topic_partition_tasks.remove(&partition) { + // This probably implies a problem in our assumptions, because librdkafka shouldn't be assigning us a partition again without having revoked it. + // However its fair to assume that the existing partitioned consumer is now invalid. + warn!( + "Kafka informed us of an assigned partition {partition} which we already consider assigned, cancelling the existing partitioned consumer" + ); + drop(task_id); + } + + match consumer.split_partition_queue(&partition.0, partition.1) { + Some(queue) => { + let task = topic_partition_queue_consumption_loop( + self.networking.clone(), + self.partition_routing.clone(), + self.sender.clone(), + partition.clone(), + queue, + Arc::clone(&consumer), + self.consumer_group_id.clone(), + self.failures_tx.clone(), + ); + + if let Ok(task_handle) = self.task_center_handle.spawn_unmanaged( + TaskKind::Ingress, + "kafka-partition-ingest", + task, + ) { + topic_partition_tasks.insert(partition, AbortOnDrop(task_handle)); + } else { + // shutting down + return; + } + } + None => { + warn!( + "Invalid partition {partition} given to us in rebalance, ignoring it" + ); + continue; + } + } + } + } + Rebalance::Revoke(partitions) if partitions.count() > 0 => { + for partition in partitions.elements() { + let partition = partition.into(); + match topic_partition_tasks.remove(&partition) { + Some(task_id) => { + debug!( + "Stopping partitioned consumer for partition {partition} due to rebalance" + ); + // The partitioned queue will not be polled again. + // It might be mid-poll right now, but if so its result will not be sent anywhere. + drop(task_id); + } + None => warn!( + "Kafka informed us of a revoked partition {partition} which we had no consumer task for" + ), + } + } + + match consumer.commit_consumer_state(CommitMode::Async) { + Ok(_) | Err(KafkaError::ConsumerCommit(RDKafkaErrorCode::NoOffset)) => { + // Success + } + Err(error) => warn!("Failed to commit the current consumer state: {error}"), + } + } + // called with empty partitions; important to not call .elements() as this panics apparently. 
+ // unclear why we are called with no partitions + Rebalance::Assign(_) | Rebalance::Revoke(_) => {} + Rebalance::Error(_) => {} + } + } +} + +struct AbortOnDrop(TaskHandle<()>); + +impl Drop for AbortOnDrop { + fn drop(&mut self) { + self.0.abort(); + } +} + +#[allow(clippy::too_many_arguments)] +async fn topic_partition_queue_consumption_loop( + networking: Networking, + partition_routing: PartitionRouting, + sender: MessageSender, + topic_partition: TopicPartition, + topic_partition_consumer: StreamPartitionQueue, + consumer: Arc>, + consumer_group_id: String, + failed: mpsc::UnboundedSender, +) { + let producer_id = new_style_dedup_producer_id( + sender.subscription.id(), + &consumer_group_id, + &topic_partition.0, + topic_partition.1, + ); + + let dedup_offset = query_dedup_offset(networking, partition_routing, producer_id).await; + + debug!( + topic=%topic_partition.0, + kafka_partition=%topic_partition.1, + consumer_group=%consumer_group_id, + "Forward compatibility dedup offset {dedup_offset:?}" + ); + + debug!( + restate.subscription.id = %sender.subscription.id(), + messaging.consumer.group.name = consumer_group_id, + "Starting topic '{}' partition '{}' consumption loop", + topic_partition.0, + topic_partition.1 + ); + // this future will be aborted when the partition is no longer needed, so any exit is a failure + let err = loop { + let res = topic_partition_consumer.recv().await; + let msg = match res { + Ok(msg) => msg, + Err(err) => break err.into(), + }; + let offset = msg.offset(); + + if dedup_offset.is_some_and(|dedup_offset| offset as u64 <= dedup_offset) { + debug!( + topic=%topic_partition.0, + kafka_partition=%topic_partition.1, + offset=%offset, + consumer_group=%consumer_group_id, + "skipping kafka message (dedup)" + ); + + if let Err(err) = consumer.store_offset(&topic_partition.0, topic_partition.1, offset) { + break err.into(); + } + + continue; + } + + if let Err(err) = sender.send(&consumer_group_id, msg).await { + break err; + } + + if let Err(err) = consumer.store_offset(&topic_partition.0, topic_partition.1, offset) { + break err.into(); + } + }; + + _ = failed.send(err); +} + +fn new_style_dedup_producer_id( + subscription: SubscriptionId, + consumer_group: &str, + topic: &str, + partition: i32, +) -> u128 { + let mut hasher = xxhash_rust::xxh3::Xxh3::new(); + + subscription.hash(&mut hasher); + '\0'.hash(&mut hasher); + consumer_group.hash(&mut hasher); + '\0'.hash(&mut hasher); + topic.hash(&mut hasher); + '\0'.hash(&mut hasher); + partition.hash(&mut hasher); + + hasher.digest128() +} + +async fn query_dedup_offset( + networking: Networking, + partition_routing: PartitionRouting, + producer_id: u128, +) -> Option { + // now we need to scatter-gather this producer id to all + // partitions. Then take the 'max' as the last processed + // message offset. 
+ + let partition_table = Metadata::with_current(|m| m.partition_table_snapshot()); + let mut fut = FuturesUnordered::new(); + + for partition_id in partition_table.iter_ids() { + fut.push(query_partition_dedup_offset( + networking.clone(), + partition_routing.clone(), + producer_id, + *partition_id, + None, + )); + } + + let retry_policy = RetryPolicy::exponential( + Duration::from_millis(100), + 2.0, + None, + Some(Duration::from_secs(1)), + ); + + let mut backoff = HashMap::new(); + + let mut max_offset = None; + while let Some((partition_id, result)) = fut.next().await { + let (err_count, retry_iter) = backoff + .entry(partition_id) + .or_insert_with(|| (0, retry_policy.clone().into_iter())); + + let offset = match result { + Ok(offset) => offset, + Err(err) => { + *err_count += 1; + if *err_count >= 10 { + warn!( + "Error while looking up latest dedup info for {partition_id}: {err} .. retrying" + ); + } else { + debug!( + "Error while looking up latest dedup info for {partition_id}: {err} .. retrying" + ); + } + + fut.push(query_partition_dedup_offset( + networking.clone(), + partition_routing.clone(), + producer_id, + partition_id, + retry_iter.next(), + )); + + continue; + } + }; + + max_offset = max_offset.max(offset) + } + + max_offset +} + +async fn query_partition_dedup_offset( + networking: Networking, + partition_routing: PartitionRouting, + producer_id: u128, + partition_id: PartitionId, + delay: Option, +) -> (PartitionId, anyhow::Result>) { + if let Some(delay) = delay { + tokio::time::sleep(delay).await; + } + + let result = async { + let node_id = partition_routing + .get_node_by_partition(partition_id) + .with_context(|| format!("cannot find node for partition {partition_id}"))?; + + let result = networking + .call_rpc( + node_id, + Swimlane::General, + DedupSequenceNrQueryRequest { + producer_id: ProducerId::Numeric(producer_id), + }, + Some(partition_id.into()), + None, + ) + .await; + + let result = match result { + Ok(result) => result, + Err(RpcError::Receive(RpcReplyError::MessageUnrecognized)) => { + // MessageUnrecognized indicates a mixed v1.5/v1.6 deployment where some nodes lack the query endpoint. + // Returning None is fine because legacy ingestion remains enabled by default. + // Once the cluster finishes rolling to v1.6 or higher, the endpoint should exist and this branch should not trigger. 
+ return Ok(None); + } + Err(err) => anyhow::bail!(err), + }; + + match result.status { + ResponseStatus::Ack => Ok(result.sequence_number), + status => anyhow::bail!( + "failed to query latest dedup sequence number for partition \ + {partition_id} and producer '{producer_id}' on node {node_id}: {status:?}" + ), + } + } + .await; + + (partition_id, result) +} diff --git a/crates/ingress-kafka/src/dispatcher.rs b/crates/ingress-kafka/src/legacy/dispatcher.rs similarity index 99% rename from crates/ingress-kafka/src/dispatcher.rs rename to crates/ingress-kafka/src/legacy/dispatcher.rs index 558c23b448..3858c40cbf 100644 --- a/crates/ingress-kafka/src/dispatcher.rs +++ b/crates/ingress-kafka/src/legacy/dispatcher.rs @@ -15,7 +15,7 @@ use opentelemetry::trace::{Span, SpanContext, TraceContextExt}; use opentelemetry_sdk::propagation::TraceContextPropagator; use std::borrow::Borrow; use std::sync::Arc; -use tracing::debug; +use tracing::trace; use restate_bifrost::Bifrost; use restate_storage_api::deduplication_table::DedupInformation; @@ -29,7 +29,7 @@ use restate_types::schema::invocation_target::{DeploymentStatus, InvocationTarge use restate_types::schema::subscriptions::{EventInvocationTargetTemplate, Sink, Subscription}; use restate_wal_protocol::{Command, Destination, Envelope, Header, Source}; -use crate::consumer_task::KafkaDeduplicationId; +use crate::legacy::consumer_task::KafkaDeduplicationId; #[derive(Debug)] pub struct KafkaIngressEvent { @@ -202,7 +202,7 @@ impl DispatchKafkaEvent for KafkaIngressDispatcher { let (log_id, lsn) = restate_bifrost::append_to_bifrost(&self.bifrost, Arc::new(envelope)).await?; - debug!( + trace!( log_id = %log_id, lsn = %lsn, "Ingress request written to bifrost" diff --git a/crates/ingress-kafka/src/legacy/metric_definitions.rs b/crates/ingress-kafka/src/legacy/metric_definitions.rs new file mode 100644 index 0000000000..0ce679fe71 --- /dev/null +++ b/crates/ingress-kafka/src/legacy/metric_definitions.rs @@ -0,0 +1,27 @@ +// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use metrics::{Unit, describe_counter, describe_gauge}; + +pub const KAFKA_INGRESS_REQUESTS: &str = "restate.kafka_ingress.requests.total"; +pub const KAFKA_INGRESS_CONSUMER_LAG: &str = "restate.kafka_ingress.consumer.lag"; + +pub(crate) fn describe_metrics() { + describe_counter!( + KAFKA_INGRESS_REQUESTS, + Unit::Count, + "Number of Kafka ingress requests" + ); + describe_gauge!( + KAFKA_INGRESS_CONSUMER_LAG, + Unit::Count, + "Kafka Consumer Lag per partition" + ); +} diff --git a/crates/ingress-kafka/src/legacy/mod.rs b/crates/ingress-kafka/src/legacy/mod.rs new file mode 100644 index 0000000000..126d9ca798 --- /dev/null +++ b/crates/ingress-kafka/src/legacy/mod.rs @@ -0,0 +1,16 @@ +// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. 
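query_dedup_offset above fans a DedupSequenceNrQueryRequest out to every partition and keeps the maximum reported sequence number. A stripped-down sketch of that gather step (no networking and no per-partition retries; max_dedup_offset is an illustrative name and the vector of options stands in for the RPC results):

use futures::stream::{FuturesUnordered, StreamExt};

// Every partition reports an Option<u64>; the overall dedup offset is the
// maximum. Option's ordering treats None as the smallest value, so partitions
// that have never seen this producer do not affect the result.
async fn max_dedup_offset(per_partition: Vec<Option<u64>>) -> Option<u64> {
    let mut pending: FuturesUnordered<_> = per_partition
        .into_iter()
        .map(|offset| async move { offset }) // stand-in for the real RPC future
        .collect();

    let mut max_offset = None;
    while let Some(offset) = pending.next().await {
        max_offset = max_offset.max(offset);
    }
    max_offset
}

fn main() {
    let result =
        futures::executor::block_on(max_dedup_offset(vec![None, Some(3), Some(42), None]));
    assert_eq!(result, Some(42));
}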
+ +mod consumer_task; +mod dispatcher; +mod metric_definitions; +mod subscription_controller; + +pub use subscription_controller::Service; diff --git a/crates/ingress-kafka/src/legacy/subscription_controller.rs b/crates/ingress-kafka/src/legacy/subscription_controller.rs new file mode 100644 index 0000000000..3654710a6c --- /dev/null +++ b/crates/ingress-kafka/src/legacy/subscription_controller.rs @@ -0,0 +1,416 @@ +// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use anyhow::Context; +use restate_core::network::{Networking, TransportConnect}; +use restate_core::partitions::PartitionRouting; +use std::collections::HashSet; + +use restate_bifrost::Bifrost; +use restate_core::cancellation_watcher; +use restate_types::config::IngressOptions; +use restate_types::identifiers::SubscriptionId; +use restate_types::live::{Live, LiveLoad}; +use restate_types::retries::RetryPolicy; +use restate_types::schema::Schema; +use restate_types::schema::subscriptions::{Source, Subscription}; +use std::time::Duration; +use tokio::sync::mpsc; +use tracing::warn; + +use crate::legacy::consumer_task::{self, MessageSender}; +use crate::legacy::dispatcher::KafkaIngressDispatcher; +use crate::legacy::metric_definitions; +use crate::legacy::subscription_controller::task_orchestrator::TaskOrchestrator; +use crate::{Command, SubscriptionCommandReceiver, SubscriptionCommandSender}; + +// For simplicity of the current implementation, this currently lives in this module +// In future versions, we should either pull this out in a separate process, or generify it and move it to the worker, or an ad-hoc module +pub struct Service { + networking: Networking, + partition_routing: PartitionRouting, + dispatcher: KafkaIngressDispatcher, + schema: Live, + + commands_tx: SubscriptionCommandSender, + commands_rx: SubscriptionCommandReceiver, +} + +impl Service +where + T: TransportConnect, +{ + pub fn new( + networking: Networking, + partition_routing: PartitionRouting, + bifrost: Bifrost, + schema: Live, + ) -> Self { + metric_definitions::describe_metrics(); + let (commands_tx, commands_rx) = mpsc::channel(10); + + Service { + networking, + partition_routing, + dispatcher: KafkaIngressDispatcher::new(bifrost), + schema, + commands_tx, + commands_rx, + } + } + + pub fn create_command_sender(&self) -> SubscriptionCommandSender { + self.commands_tx.clone() + } + + pub async fn run( + mut self, + mut updateable_config: impl LiveLoad, + ) -> anyhow::Result<()> { + let shutdown = cancellation_watcher(); + tokio::pin!(shutdown); + + let mut task_orchestrator = TaskOrchestrator::new(RetryPolicy::exponential( + Duration::from_millis(200), + 2.0, + None, + Some(Duration::from_secs(10)), + )); + + // NOTE: Configuration is pinned to a certain snapshot until we support adding/removing + // subscriptions dynamically from config + let options = &updateable_config.live_load(); + + loop { + tokio::select! 
{ + Some(cmd) = self.commands_rx.recv() => { + match cmd { + Command::StartSubscription(sub) => if let Err(e) = self.handle_start_subscription(options, sub, &mut task_orchestrator) { + warn!("Error when starting a subscription: {e:?}"); + break; + }, + Command::StopSubscription(sub_id) => self.handle_stop_subscription(sub_id, &mut task_orchestrator), + Command::UpdateSubscriptions(subscriptions) => if let Err(e) = self.handle_update_subscriptions(options, subscriptions, &mut task_orchestrator) { + warn!("Error when updating subscriptions: {e:?}"); + break; + }, + } + } + _ = task_orchestrator.poll(), if !task_orchestrator.is_empty() => {}, + _ = &mut shutdown => { + break; + } + } + } + + // Wait for consumers to shutdown + task_orchestrator.shutdown().await; + Ok(()) + } + + fn handle_start_subscription( + &mut self, + options: &IngressOptions, + subscription: Subscription, + task_orchestrator: &mut TaskOrchestrator, + ) -> anyhow::Result<()> { + let mut client_config = rdkafka::ClientConfig::new(); + // enabling probing for the ca certificates if the user does not specify anything else + client_config.set("https.ca.location", "probe"); + + let Source::Kafka { cluster, topic, .. } = subscription.source(); + + // Copy cluster options and subscription metadata into client_config + let cluster_options = options + .get_kafka_cluster(cluster) + .with_context(|| format!("KafkaOptions is expected to contain the cluster '{}'. This might happen if you registered a subscription with a cluster name, but this cluster is not available anymore in the configuration. Configured Kafka clusters: {:?}", cluster, options.available_kafka_clusters()))?; + + client_config.set("metadata.broker.list", cluster_options.brokers.join(",")); + for (k, v) in cluster_options.additional_options.clone() { + client_config.set(k, v); + } + for (k, v) in subscription.metadata() { + client_config.set(k, v); + } + + // Options required by the business logic of our consumer, + // see ConsumerTask::run + client_config.set("enable.auto.commit", "true"); + client_config.set("enable.auto.offset.store", "false"); + + let subscription_id = subscription.id(); + + // Create the consumer task + let consumer_task = consumer_task::ConsumerTask::new( + self.networking.clone(), + self.partition_routing.clone(), + client_config, + vec![topic.to_string()], + MessageSender::new(subscription, self.dispatcher.clone(), self.schema.clone()), + ); + + task_orchestrator.start(subscription_id, consumer_task); + + Ok(()) + } + + fn handle_stop_subscription( + &mut self, + subscription_id: SubscriptionId, + task_orchestrator: &mut TaskOrchestrator, + ) { + task_orchestrator.stop(subscription_id); + } + + fn handle_update_subscriptions( + &mut self, + options: &IngressOptions, + subscriptions: Vec, + task_orchestrator: &mut TaskOrchestrator, + ) -> anyhow::Result<()> { + let mut running_subscriptions: HashSet<_> = + task_orchestrator.running_subscriptions().cloned().collect(); + + for subscription in subscriptions { + if !running_subscriptions.contains(&subscription.id()) { + self.handle_start_subscription(options, subscription, task_orchestrator)?; + } else { + running_subscriptions.remove(&subscription.id()); + } + } + + for subscription_id in running_subscriptions { + self.handle_stop_subscription(subscription_id, task_orchestrator); + } + Ok(()) + } +} + +mod task_orchestrator { + use crate::Error; + use crate::legacy::consumer_task; + use restate_core::network::TransportConnect; + use restate_core::{TaskCenterFutureExt, TaskKind}; + use 
restate_timer_queue::TimerQueue; + use restate_types::identifiers::SubscriptionId; + use restate_types::retries::{RetryIter, RetryPolicy}; + use std::collections::HashMap; + use std::time::SystemTime; + use tokio::sync::oneshot; + use tokio::task; + use tokio::task::{JoinError, JoinSet}; + use tracing::{debug, warn}; + + struct TaskState { + // We use this to restart the consumer task in case of a failure + consumer_task_clone: consumer_task::ConsumerTask, + task_state_inner: TaskStateInner, + retry_iter: RetryIter<'static>, + } + + enum TaskStateInner { + Running { + task_id: task::Id, + _close_ch: oneshot::Sender<()>, + }, + WaitingRetryTimer, + } + + pub(super) struct TaskOrchestrator { + retry_policy: RetryPolicy, + running_tasks_to_subscriptions: HashMap, + subscription_id_to_task_state: HashMap>, + tasks: JoinSet>, + timer_queue: TimerQueue, + } + + impl TaskOrchestrator + where + T: TransportConnect, + { + pub(super) fn new(retry_policy: RetryPolicy) -> Self { + Self { + retry_policy, + running_tasks_to_subscriptions: HashMap::default(), + subscription_id_to_task_state: HashMap::default(), + tasks: JoinSet::default(), + timer_queue: TimerQueue::default(), + } + } + + pub(super) async fn poll(&mut self) { + tokio::select! { + Some(res) = self.tasks.join_next_with_id(), if !self.tasks.is_empty() => { + self.handle_task_closed(res); + }, + timer = self.timer_queue.await_timer(), if !self.timer_queue.is_empty() => { + self.handle_timer_fired(timer.into_inner()); + } + } + } + + pub(super) fn is_empty(&self) -> bool { + self.tasks.is_empty() && self.timer_queue.is_empty() + } + + fn handle_task_closed(&mut self, result: Result<(task::Id, Result<(), Error>), JoinError>) { + let task_id = match result { + Ok((id, _)) => id, + Err(ref err) => err.id(), + }; + + let subscription_id = if let Some(subscription_id) = + self.running_tasks_to_subscriptions.remove(&task_id) + { + subscription_id + } else { + match result { + Ok((_, Ok(_))) => {} // the normal case; a removed subscription should exit cleanly + Ok((_, Err(e))) => { + warn!( + "Consumer task for removed subscription unexpectedly returned error: {e}" + ); + } + Err(e) => { + warn!("Consumer task for removed subscription unexpectedly panicked: {e}"); + } + } + // no need to retry a subscription we don't care about any more + return; + }; + + match result { + Ok((_, Ok(_))) => { + warn!("Consumer task for subscription {subscription_id} unexpectedly closed"); + } + Ok((_, Err(e))) => { + warn!( + "Consumer task for subscription {subscription_id} unexpectedly returned error: {e}" + ); + } + Err(e) => { + warn!( + "Consumer task for subscription {subscription_id} unexpectedly panicked: {e}" + ); + } + }; + + let task_state = self + .subscription_id_to_task_state + .get_mut(&subscription_id) + .expect("There must be a task state to start the retry timer"); + task_state.task_state_inner = TaskStateInner::WaitingRetryTimer; + if let Some(next_timer) = task_state.retry_iter.next() { + self.timer_queue + .sleep_until(SystemTime::now() + next_timer, subscription_id); + } else { + warn!( + "Not going to retry consumer task for subscription {subscription_id} because retry limit exhausted." + ); + self.subscription_id_to_task_state.remove(&subscription_id); + } + } + + fn handle_timer_fired(&mut self, subscription_id: SubscriptionId) { + match self.subscription_id_to_task_state.get(&subscription_id) { + Some(TaskState { + task_state_inner: TaskStateInner::Running { .. }, + .. 
+ }) => { + // Timer fired for a subscription task that is already running + return; + } + None => { + // Timer fired for a subscription that was removed + return; + } + _ => {} + }; + + let TaskState { + consumer_task_clone, + .. + } = self + .subscription_id_to_task_state + .remove(&subscription_id) + .expect("Checked in the previous match statement"); + self.start(subscription_id, consumer_task_clone); + } + + pub(super) fn start( + &mut self, + subscription_id: SubscriptionId, + consumer_task_clone: consumer_task::ConsumerTask, + ) { + // Shutdown old task, if any + if let Some(task_state) = self.subscription_id_to_task_state.remove(&subscription_id) { + // Shutdown the old task + if let TaskStateInner::Running { task_id, .. } = task_state.task_state_inner { + self.running_tasks_to_subscriptions.remove(&task_id); + } + } + + // Prepare shutdown channel + let (tx, rx) = oneshot::channel(); + + debug!( + "Spawning the consumer task for subscription id {}", + subscription_id + ); + let task_id = self + .tasks + .build_task() + .name("kafka-consumer") + .spawn({ + let consumer_task_clone = consumer_task_clone.clone(); + consumer_task_clone + .run(rx) + .in_current_tc_as_task(TaskKind::Kafka, "kafka-consumer-task") + }) + .expect("to spawn kafka consumer task") + .id(); + + self.running_tasks_to_subscriptions + .insert(task_id, subscription_id); + self.subscription_id_to_task_state.insert( + subscription_id, + TaskState { + consumer_task_clone: consumer_task_clone.clone(), + task_state_inner: TaskStateInner::Running { + task_id, + _close_ch: tx, + }, + retry_iter: self.retry_policy.clone().into_iter(), + }, + ); + } + + pub(super) fn stop(&mut self, subscription_id: SubscriptionId) { + if let Some(TaskState { + task_state_inner: TaskStateInner::Running { task_id, .. }, + .. + }) = self.subscription_id_to_task_state.remove(&subscription_id) + { + self.running_tasks_to_subscriptions.remove(&task_id); + } + } + + pub(super) async fn shutdown(&mut self) { + self.subscription_id_to_task_state.clear(); + // This will close all the channels + self.running_tasks_to_subscriptions.clear(); + self.tasks.shutdown().await; + } + + pub(super) fn running_subscriptions(&self) -> impl Iterator { + self.subscription_id_to_task_state.keys() + } + } +} diff --git a/crates/ingress-kafka/src/lib.rs b/crates/ingress-kafka/src/lib.rs index 9709e51bd0..02d88cf50e 100644 --- a/crates/ingress-kafka/src/lib.rs +++ b/crates/ingress-kafka/src/lib.rs @@ -8,14 +8,123 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. 
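
The orchestrator above restarts a failed consumer task by pulling the next delay from its per-subscription RetryIter and arming the timer queue; once the iterator is exhausted the subscription is dropped instead of retried. A small illustration of that retry-iterator pattern, assuming RetryIter yields Duration values via Iterator as its usage above suggests:

```rust
use std::time::Duration;

use restate_types::retries::RetryPolicy;

// Illustrative only: walk the restart delays a repeatedly failing consumer
// task would see under the exponential policy configured by the service.
fn main() {
    let policy = RetryPolicy::exponential(
        Duration::from_millis(200),    // first restart delay
        2.0,                           // growth factor
        None,                          // no attempt limit
        Some(Duration::from_secs(10)), // delays are capped at 10 seconds
    );

    for (attempt, delay) in policy.into_iter().take(8).enumerate() {
        println!("restart #{} after {delay:?}", attempt + 1);
    }
}
```
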
+mod builder; mod consumer_task; -mod dispatcher; +mod legacy; mod metric_definitions; mod subscription_controller; +use rdkafka::error::KafkaError; use tokio::sync::mpsc; -pub use subscription_controller::{Command, Error, Service}; +use restate_bifrost::Bifrost; +use restate_core::network::TransportConnect; +use restate_ingestion_client::IngestionClient; +use restate_types::{ + config::{Configuration, IngressOptions}, + identifiers::SubscriptionId, + live::{Live, LiveLoad}, + partitions::PartitionTableError, + schema::{Schema, subscriptions::Subscription}, +}; +use restate_wal_protocol::Envelope; +use tracing::info; + +#[derive(Debug)] +pub enum Command { + StartSubscription(Subscription), + StopSubscription(SubscriptionId), + UpdateSubscriptions(Vec), +} pub type SubscriptionCommandSender = mpsc::Sender; pub type SubscriptionCommandReceiver = mpsc::Receiver; + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error(transparent)] + Kafka(#[from] KafkaError), + #[error( + "error processing message subscription {subscription} topic {topic} partition {partition} offset {offset}: {cause}" + )] + Event { + subscription: String, + topic: String, + partition: i32, + offset: i64, + #[source] + cause: anyhow::Error, + }, + #[error("ingress stream is closed")] + IngestionClosed, + #[error(transparent)] + PartitionTableError(#[from] PartitionTableError), + #[error( + "received a message on the main partition queue for topic {0} partition {1} despite partitioned queues" + )] + UnexpectedMainQueueMessage(String, i32), + #[error( + "consumption task exited unexpectedly for subscription '{subscription}', topic: {topic} and partition: {partition}" + )] + UnexpectedConsumptionTaskExited { + subscription: String, + topic: String, + partition: i32, + }, +} + +enum ServiceInner { + Legacy(legacy::Service), + IngestionClient(subscription_controller::Service), +} + +pub struct Service { + inner: ServiceInner, +} + +impl Service +where + T: TransportConnect, +{ + pub fn new( + bifrost: Bifrost, + ingestion: IngestionClient, + schema: Live, + ) -> Self { + let batch_ingestion = Configuration::pinned() + .common + .experimental_kafka_batch_ingestion; + + let inner = if batch_ingestion { + info!("Using kafka experimental batch ingestion mechanism"); + ServiceInner::IngestionClient(subscription_controller::Service::new(ingestion, schema)) + } else { + info!("Using kafka legacy ingestion mechanism"); + ServiceInner::Legacy(legacy::Service::new( + ingestion.networking().clone(), + ingestion.partition_routing().clone(), + bifrost, + schema, + )) + }; + + Self { inner } + } + + pub fn create_command_sender(&self) -> SubscriptionCommandSender { + match &self.inner { + ServiceInner::Legacy(svc) => svc.create_command_sender(), + ServiceInner::IngestionClient(svc) => svc.create_command_sender(), + } + } + + pub async fn run( + self, + updateable_config: impl LiveLoad, + ) -> anyhow::Result<()> { + match self.inner { + ServiceInner::Legacy(svc) => svc.run(updateable_config).await, + ServiceInner::IngestionClient(svc) => svc.run(updateable_config).await, + } + } +} diff --git a/crates/ingress-kafka/src/subscription_controller.rs b/crates/ingress-kafka/src/subscription_controller.rs index 0b6a3023b9..9fc6b4ab80 100644 --- a/crates/ingress-kafka/src/subscription_controller.rs +++ b/crates/ingress-kafka/src/subscription_controller.rs @@ -8,56 +8,48 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. 
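
The reworked lib.rs keeps one public surface (Command, SubscriptionCommandSender, Service) regardless of whether the legacy path or the ingestion-client path is selected from experimental_kafka_batch_ingestion. A hedged usage sketch of that surface; the helper function is hypothetical:

```rust
use restate_ingress_kafka::{Command, SubscriptionCommandSender};
use restate_types::schema::subscriptions::Subscription;

// Hypothetical helper: callers drive subscriptions through the command
// channel and never observe which inner implementation was chosen.
async fn start_subscription(
    commands: &SubscriptionCommandSender,
    subscription: Subscription,
) -> anyhow::Result<()> {
    commands
        .send(Command::StartSubscription(subscription))
        .await
        .map_err(|_| anyhow::anyhow!("kafka subscription command channel closed"))
}
```
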
-use super::consumer_task::MessageSender; -use super::*; use std::collections::HashSet; +use std::time::Duration; -use crate::dispatcher::KafkaIngressDispatcher; -use crate::subscription_controller::task_orchestrator::TaskOrchestrator; use anyhow::Context; -use rdkafka::error::KafkaError; -use restate_bifrost::Bifrost; +use restate_wal_protocol::Envelope; +use tokio::sync::mpsc; +use tracing::warn; + use restate_core::cancellation_watcher; +use restate_core::network::TransportConnect; +use restate_ingestion_client::IngestionClient; use restate_types::config::IngressOptions; use restate_types::identifiers::SubscriptionId; use restate_types::live::{Live, LiveLoad}; use restate_types::retries::RetryPolicy; use restate_types::schema::Schema; use restate_types::schema::subscriptions::{Source, Subscription}; -use std::time::Duration; -use tokio::sync::mpsc; -use tracing::warn; - -#[derive(Debug)] -pub enum Command { - StartSubscription(Subscription), - StopSubscription(SubscriptionId), - UpdateSubscriptions(Vec), -} -#[derive(Debug, thiserror::Error)] -pub enum Error { - #[error(transparent)] - Kafka(#[from] KafkaError), -} +use super::*; +use crate::builder::EnvelopeBuilder; +use crate::subscription_controller::task_orchestrator::TaskOrchestrator; // For simplicity of the current implementation, this currently lives in this module // In future versions, we should either pull this out in a separate process, or generify it and move it to the worker, or an ad-hoc module -pub struct Service { - dispatcher: KafkaIngressDispatcher, +pub struct Service { + ingestion: IngestionClient, schema: Live, commands_tx: SubscriptionCommandSender, commands_rx: SubscriptionCommandReceiver, } -impl Service { - pub fn new(bifrost: Bifrost, schema: Live) -> Service { +impl Service +where + T: TransportConnect, +{ + pub fn new(ingestion: IngestionClient, schema: Live) -> Self { metric_definitions::describe_metrics(); let (commands_tx, commands_rx) = mpsc::channel(10); Service { - dispatcher: KafkaIngressDispatcher::new(bifrost), + ingestion, schema, commands_tx, commands_rx, @@ -117,7 +109,7 @@ impl Service { &mut self, options: &IngressOptions, subscription: Subscription, - task_orchestrator: &mut TaskOrchestrator, + task_orchestrator: &mut TaskOrchestrator, ) -> anyhow::Result<()> { let mut client_config = rdkafka::ClientConfig::new(); // enabling probing for the ca certificates if the user does not specify anything else @@ -149,7 +141,8 @@ impl Service { let consumer_task = consumer_task::ConsumerTask::new( client_config, vec![topic.to_string()], - MessageSender::new(subscription, self.dispatcher.clone(), self.schema.clone()), + self.ingestion.clone(), + EnvelopeBuilder::new(subscription, self.schema.clone()), ); task_orchestrator.start(subscription_id, consumer_task); @@ -160,7 +153,7 @@ impl Service { fn handle_stop_subscription( &mut self, subscription_id: SubscriptionId, - task_orchestrator: &mut TaskOrchestrator, + task_orchestrator: &mut TaskOrchestrator, ) { task_orchestrator.stop(subscription_id); } @@ -169,7 +162,7 @@ impl Service { &mut self, options: &IngressOptions, subscriptions: Vec, - task_orchestrator: &mut TaskOrchestrator, + task_orchestrator: &mut TaskOrchestrator, ) -> anyhow::Result<()> { let mut running_subscriptions: HashSet<_> = task_orchestrator.running_subscriptions().cloned().collect(); @@ -191,6 +184,7 @@ impl Service { mod task_orchestrator { use crate::consumer_task; + use restate_core::network::TransportConnect; use restate_core::{TaskCenterFutureExt, TaskKind}; use 
restate_timer_queue::TimerQueue; use restate_types::identifiers::SubscriptionId; @@ -202,9 +196,9 @@ mod task_orchestrator { use tokio::task::{JoinError, JoinSet}; use tracing::{debug, warn}; - struct TaskState { + struct TaskState { // We use this to restart the consumer task in case of a failure - consumer_task_clone: consumer_task::ConsumerTask, + consumer_task_clone: consumer_task::ConsumerTask, task_state_inner: TaskStateInner, retry_iter: RetryIter<'static>, } @@ -217,15 +211,18 @@ mod task_orchestrator { WaitingRetryTimer, } - pub(super) struct TaskOrchestrator { + pub(super) struct TaskOrchestrator { retry_policy: RetryPolicy, running_tasks_to_subscriptions: HashMap, - subscription_id_to_task_state: HashMap, - tasks: JoinSet>, + subscription_id_to_task_state: HashMap>, + tasks: JoinSet>, timer_queue: TimerQueue, } - impl TaskOrchestrator { + impl TaskOrchestrator + where + T: TransportConnect, + { pub(super) fn new(retry_policy: RetryPolicy) -> Self { Self { retry_policy, @@ -253,7 +250,7 @@ mod task_orchestrator { fn handle_task_closed( &mut self, - result: Result<(task::Id, Result<(), consumer_task::Error>), JoinError>, + result: Result<(task::Id, Result<(), crate::Error>), JoinError>, ) { let task_id = match result { Ok((id, _)) => id, @@ -341,7 +338,7 @@ mod task_orchestrator { pub(super) fn start( &mut self, subscription_id: SubscriptionId, - consumer_task_clone: consumer_task::ConsumerTask, + consumer_task_clone: consumer_task::ConsumerTask, ) { // Shutdown old task, if any if let Some(task_state) = self.subscription_id_to_task_state.remove(&subscription_id) { diff --git a/crates/node/Cargo.toml b/crates/node/Cargo.toml index 430fce7d5c..c25fccea62 100644 --- a/crates/node/Cargo.toml +++ b/crates/node/Cargo.toml @@ -31,6 +31,7 @@ restate-admin = { workspace = true, features = ["storage-query"]} restate-bifrost = { workspace = true, features = ["local-loglet", "replicated-loglet"] } restate-core = { workspace = true } restate-futures-util = { workspace = true } +restate-ingestion-client = { workspace = true } restate-ingress-http = { workspace = true } restate-log-server = { workspace = true } restate-metadata-providers = { workspace = true } @@ -44,6 +45,7 @@ restate-service-protocol = { workspace = true, features = ["discovery"] } restate-storage-query-datafusion = { workspace = true } restate-tracing-instrumentation = { workspace = true, features = ["prometheus"] } restate-types = { workspace = true, features = ["clap"] } +restate-wal-protocol = { workspace = true } restate-worker = { workspace = true } ahash = { workspace = true } diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs index 513de1de35..6bdb125cd1 100644 --- a/crates/node/src/lib.rs +++ b/crates/node/src/lib.rs @@ -25,12 +25,13 @@ use tracing::{debug, info, trace, warn}; use codederror::CodedError; use restate_bifrost::BifrostService; use restate_core::network::{ - GrpcConnector, MessageRouterBuilder, NetworkServerBuilder, Networking, + GrpcConnector, MessageRouterBuilder, NetworkServerBuilder, Networking, Swimlane, }; use restate_core::partitions::PartitionRouting; use restate_core::{Metadata, MetadataKind, MetadataWriter, TaskKind}; use restate_core::{MetadataBuilder, MetadataManager, TaskCenter, spawn_metadata_manager}; use restate_futures_util::overdue::OverdueLoggingExt; +use restate_ingestion_client::{IngestionClient, SessionOptions}; use restate_log_server::LogServerService; use restate_metadata_server::{ BoxedMetadataServer, MetadataServer, MetadataStoreClient, ReadModifyWriteError, @@ -133,7 
+134,7 @@ pub struct Node { metadata_server_role: Option, failure_detector: FailureDetector>, admin_role: Option>, - worker_role: Option, + worker_role: Option>, ingress_role: Option>, log_server: Option, networking: Networking, @@ -256,6 +257,23 @@ impl Node { None }; + // initialize the ingestion client. + let ingestion_client = IngestionClient::new( + networking.clone(), + Metadata::with_current(|m| m.updateable_partition_table()), + PartitionRouting::new(replica_set_states.clone(), TaskCenter::current()), + config + .ingress + .ingestion + .inflight_memory_budget + .as_non_zero_usize(), + Some(SessionOptions { + batch_size: config.ingress.ingestion.request_batch_size.as_usize(), + connection_retry_policy: config.ingress.ingestion.connection_retry_policy.clone(), + swimlane: Swimlane::IngressData, + }), + ); + let worker_role = if config.has_role(Role::Worker) { Some( WorkerRole::create( @@ -265,6 +283,7 @@ impl Node { partition_store_manager.clone(), networking.clone(), bifrost_svc.handle(), + ingestion_client, metadata_manager.writer(), ) .await?, diff --git a/crates/node/src/roles/worker.rs b/crates/node/src/roles/worker.rs index 44deabb9ba..a7bdaf2712 100644 --- a/crates/node/src/roles/worker.rs +++ b/crates/node/src/roles/worker.rs @@ -19,11 +19,13 @@ use restate_core::network::Networking; use restate_core::network::TransportConnect; use restate_core::worker_api::ProcessorsManagerHandle; use restate_core::{MetadataWriter, TaskCenter}; +use restate_ingestion_client::IngestionClient; use restate_partition_store::PartitionStoreManager; use restate_storage_query_datafusion::context::QueryContext; use restate_types::health::HealthStatus; use restate_types::partitions::state::PartitionReplicaSetStates; use restate_types::protobuf::common::WorkerStatus; +use restate_wal_protocol::Envelope; use restate_worker::Worker; #[derive(Debug, thiserror::Error, CodedError)] @@ -36,18 +38,23 @@ pub enum WorkerRoleBuildError { ), } -pub struct WorkerRole { - worker: Worker, +pub struct WorkerRole { + worker: Worker, } -impl WorkerRole { - pub async fn create( +impl WorkerRole +where + T: TransportConnect, +{ + #[allow(clippy::too_many_arguments)] + pub async fn create( health_status: HealthStatus, replica_set_states: PartitionReplicaSetStates, router_builder: &mut MessageRouterBuilder, partition_store_manager: Arc, networking: Networking, bifrost: Bifrost, + ingestion_client: IngestionClient, metadata_writer: MetadataWriter, ) -> Result { let worker = Worker::create( @@ -56,6 +63,7 @@ impl WorkerRole { partition_store_manager, networking, bifrost, + ingestion_client, router_builder, metadata_writer, ) diff --git a/crates/partition-store/src/keys.rs b/crates/partition-store/src/keys.rs index 1bcb082044..4324f3a367 100644 --- a/crates/partition-store/src/keys.rs +++ b/crates/partition-store/src/keys.rs @@ -487,6 +487,21 @@ impl KeyCodec for PaddedPartitionId { } } +impl KeyCodec for u128 { + fn encode(&self, target: &mut B) { + // store u128 in big-endian order to support byte-wise increment operation. See `crate::scan::try_increment`. + target.put_u128(*self); + } + + fn decode(source: &mut B) -> crate::Result { + Ok(source.get_u128()) + } + + fn serialized_length(&self) -> usize { + 16 + } +} + impl KeyCodec for UniqueTimestamp { fn encode(&self, target: &mut B) { // store u64 in big-endian order to support byte-wise increment operation. See `crate::scan::try_increment`. 
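
The new KeyCodec impl stores u128 dedup keys in big-endian so that the lexicographic byte order of the encoded keys matches numeric order, which is what byte-wise key increment during scans depends on. A standalone illustration of that property (not part of the change):

```rust
use bytes::{Buf, BufMut, BytesMut};

// Encode a u128 the same way the partition-store key codec does.
fn encode_u128(v: u128) -> BytesMut {
    let mut buf = BytesMut::with_capacity(16);
    buf.put_u128(v); // BufMut::put_u128 writes big-endian bytes
    buf
}

fn main() {
    let a = encode_u128(1);
    let b = encode_u128(256);
    // Byte-wise comparison of the encodings agrees with numeric order.
    assert!(&a[..] < &b[..]);
    // Round trip back through Buf.
    assert_eq!((&a[..]).get_u128(), 1);
}
```
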
@@ -640,6 +655,10 @@ impl KeyCodec for ProducerId { target.put_u8(1); KeyCodec::encode(i, target) } + ProducerId::Producer(i) => { + target.put_u8(2); + KeyCodec::encode(&u128::from(*i), target) + } } } @@ -650,6 +669,7 @@ impl KeyCodec for ProducerId { ProducerId::Partition(padded.into()) } 1 => ProducerId::Other(KeyCodec::decode(source)?), + 2 => ProducerId::Producer(u128::decode(source)?.into()), i => { return Err(StorageError::Generic(anyhow!( "Unexpected wrong discriminator for SequenceNumberSource: {}", @@ -663,6 +683,7 @@ impl KeyCodec for ProducerId { 1 + match self { ProducerId::Partition(p) => KeyCodec::serialized_length(&PaddedPartitionId::from(*p)), ProducerId::Other(i) => KeyCodec::serialized_length(i), + ProducerId::Producer(i) => KeyCodec::serialized_length(&u128::from(*i)), } } } diff --git a/crates/storage-api/src/deduplication_table/mod.rs b/crates/storage-api/src/deduplication_table/mod.rs index a8bc3479a7..ac1d2df1ef 100644 --- a/crates/storage-api/src/deduplication_table/mod.rs +++ b/crates/storage-api/src/deduplication_table/mod.rs @@ -14,6 +14,7 @@ use bytestring::ByteString; use restate_types::identifiers::{LeaderEpoch, PartitionId}; use restate_types::message::MessageIndex; +use serde::{Deserialize, Serialize}; use crate::Result; use crate::protobuf_types::PartitionStoreProtobufValue; @@ -45,6 +46,13 @@ impl DedupInformation { sequence_number: DedupSequenceNumber::Sn(sequence_number), } } + + pub fn producer(producer_id: u128, message_index: MessageIndex) -> Self { + DedupInformation { + producer_id: ProducerId::Producer(producer_id.into()), + sequence_number: DedupSequenceNumber::Sn(message_index), + } + } } static SELF_PRODUCER: ByteString = ByteString::from_static("SELF"); @@ -53,6 +61,7 @@ static SELF_PRODUCER: ByteString = ByteString::from_static("SELF"); pub enum ProducerId { Partition(PartitionId), Other(ByteString), + Producer(U128), } impl ProducerId { @@ -140,3 +149,40 @@ pub trait WriteDeduplicationTable { dedup_sequence_number: &DedupSequenceNumber, ) -> Result<()>; } + +// Flexbuffers does not support u128 so we need to +// make this representation for it. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +pub struct U128 { + h: u64, + l: u64, +} + +impl From for U128 { + fn from(value: u128) -> Self { + Self { + h: (value >> 64) as u64, + l: value as u64, + } + } +} + +impl From for u128 { + fn from(value: U128) -> Self { + let v = (value.h as u128) << 64; + v | (value.l as u128) + } +} + +#[cfg(test)] +mod test { + use crate::deduplication_table::U128; + + #[test] + fn test_u128() { + let x = u128::MAX; + let y = U128::from(x); + let z = u128::from(y); + assert_eq!(x, z); + } +} diff --git a/crates/types/src/config/common.rs b/crates/types/src/config/common.rs index 58e7d0eb29..84ec189a38 100644 --- a/crates/types/src/config/common.rs +++ b/crates/types/src/config/common.rs @@ -459,6 +459,18 @@ pub struct CommonOptions { /// unless you are trying to recover a cluster from previous synchronization-related issues. #[serde(default)] hlc_max_drift: FriendlyDuration, + + /// # Experimental Kafka batch ingestion + /// + /// Use the new experimental kafka ingestion path which leverages batching + /// for a faster kafka ingestion. + /// + /// Set to `true` to enable the experimental ingestion mechanism. + /// + /// The legacy path will be removed in v1.7. + /// + /// Defaults to `false` in v1.6. 
+ pub experimental_kafka_batch_ingestion: bool, } serde_with::with_prefix!(pub prefix_tokio_console "tokio_console_"); @@ -684,6 +696,7 @@ impl Default for CommonOptions { gossip: GossipOptions::default(), experimental_enable_vqueues: false, hlc_max_drift: FriendlyDuration::from_millis(5000), + experimental_kafka_batch_ingestion: false, } } } @@ -1066,6 +1079,62 @@ impl Default for TracingOptions { } } +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] +#[serde(rename_all = "kebab-case")] +#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] +#[cfg_attr( + feature = "schemars", + schemars( + title = "Ingestion Options", + description = "Options for ingestion client" + ) +)] +pub struct IngestionOptions { + /// # Inflight Memory Budget + /// + /// Maximum total size of in-flight ingestion requests in bytes. + /// Tune this to your workload so there are enough unpersisted + /// requests for efficient batching without exhausting memory. + /// + /// Defaults to 1 MiB. + pub inflight_memory_budget: NonZeroByteCount, + + /// # Connection retry policy + /// + /// Retry policy for the ingestion client. It must allow unlimited + /// retries; if configured with a cap, the client falls back to + /// retrying every 2 seconds. + pub connection_retry_policy: RetryPolicy, + + /// # Request Batch Size + /// + /// Maximum size of a single ingestion request batch. + /// Tune to keep enough requests per batch for + /// throughput; overly large batches can increase tail latency. + /// + /// Defaults to 50 KiB. + pub request_batch_size: NonZeroByteCount, +} + +impl Default for IngestionOptions { + fn default() -> Self { + Self { + inflight_memory_budget: NonZeroByteCount::new( + NonZeroUsize::new(1024 * 1024).expect("non zero"), + ), //1 MiB + connection_retry_policy: RetryPolicy::exponential( + Duration::from_millis(10), + 2.0, + None, + Some(Duration::from_secs(2)), + ), + request_batch_size: NonZeroByteCount::new( + NonZeroUsize::new(50 * 1024).expect("non zero"), + ), + } + } +} + #[cfg(test)] mod tests { use std::str::FromStr; diff --git a/crates/types/src/config/ingress.rs b/crates/types/src/config/ingress.rs index e9e5c781bb..f729beb24b 100644 --- a/crates/types/src/config/ingress.rs +++ b/crates/types/src/config/ingress.rs @@ -13,6 +13,7 @@ use std::num::NonZeroUsize; use serde::{Deserialize, Serialize}; use tokio::sync::Semaphore; +use crate::config::IngestionOptions; use crate::net::address::{AdvertisedAddress, BindAddress, HttpIngressPort}; use crate::net::listener::AddressBook; @@ -43,6 +44,12 @@ pub struct IngressOptions { /// Ingress endpoint that the Web UI should use to interact with. #[serde(default, skip_serializing_if = "Option::is_none")] advertised_ingress_endpoint: Option>, + + /// # Ingestion Options + /// + /// Settings for the ingestion client + /// Currently only used by the Kafka ingress and the admin API. + pub ingestion: IngestionOptions, } impl IngressOptions { diff --git a/crates/types/src/identifiers.rs b/crates/types/src/identifiers.rs index af97fdd460..d4ef656b43 100644 --- a/crates/types/src/identifiers.rs +++ b/crates/types/src/identifiers.rs @@ -450,7 +450,24 @@ pub trait WithInvocationId { pub type EncodedInvocationId = [u8; InvocationId::RAW_BYTES_LEN]; impl InvocationId { + /// Creates a new [`InvocationId`], using a deterministic partition key from the invocation + /// target/idempotency key when available; otherwise a random partition key is used. 
pub fn generate(invocation_target: &InvocationTarget, idempotency_key: Option<&str>) -> Self { + Self::generate_or_else(invocation_target, idempotency_key, || { + rand::rng().next_u64() + }) + } + + /// Creates a new [`InvocationId`]; prefers a deterministic partition key from the invocation + /// target/idempotency key, otherwise uses the provided fallback function to produce one. + pub fn generate_or_else( + invocation_target: &InvocationTarget, + idempotency_key: Option<&str>, + f: F, + ) -> Self + where + F: FnOnce() -> PartitionKey, + { // --- Partition key generation let partition_key = // Either try to generate the deterministic partition key, if possible @@ -459,7 +476,7 @@ impl InvocationId { idempotency_key, ) // If no deterministic partition key can be generated, just pick a random number - .unwrap_or_else(|| rand::rng().next_u64()); + .unwrap_or_else(f); // --- Invocation UUID generation InvocationId::from_parts( diff --git a/crates/types/src/net/ingest.rs b/crates/types/src/net/ingest.rs index 47d88ad0e5..87d9667423 100644 --- a/crates/types/src/net/ingest.rs +++ b/crates/types/src/net/ingest.rs @@ -13,10 +13,11 @@ use std::sync::Arc; use bytes::{Bytes, BytesMut}; use metrics::Key; -use restate_encoding::ArcedSlice; +use restate_encoding::{ArcedSlice, RestateEncoding}; use crate::identifiers::PartitionId; use crate::logs::{HasRecordKeys, Keys}; +use crate::message::MessageIndex; use crate::net::partition_processor::PartitionLeaderService; use crate::net::{RpcRequest, bilrost_wire_codec, define_rpc}; use crate::storage::{StorageCodec, StorageEncode}; @@ -127,3 +128,48 @@ impl RpcRequest for ReceivedIngestRequest { type Response = IngestResponse; type Service = PartitionLeaderService; } + +// The following messages are used by the kafka ingress +// only during the migration from using string based +// producer ids to number producer ids. +// added on version v1.6.0 +// todo(azmy): deprecate + +/// Query the sequence number associated with +/// the deduplication key + +#[derive(Debug, Clone, bilrost::Oneof)] +pub enum ProducerId { + Unknown, + #[bilrost(1)] + String(String), + #[bilrost(tag(2), encoding(RestateEncoding))] + Numeric(u128), +} + +#[derive(Debug, Clone, bilrost::Message)] +pub struct DedupSequenceNrQueryRequest { + // ProducerId of the deduplication information + // required. + #[bilrost(oneof(1, 2))] + pub producer_id: ProducerId, +} + +bilrost_wire_codec!(DedupSequenceNrQueryRequest); + +/// Last sequence number recorded by the partition processor +#[derive(Debug, Clone, bilrost::Message)] +pub struct DedupSequenceNrQueryResponse { + #[bilrost(1)] + pub status: ResponseStatus, + #[bilrost(2)] + pub sequence_number: Option, +} + +bilrost_wire_codec!(DedupSequenceNrQueryResponse); + +define_rpc! { + @request = DedupSequenceNrQueryRequest, + @response = DedupSequenceNrQueryResponse, + @service = PartitionLeaderService, +} diff --git a/crates/types/src/retries.rs b/crates/types/src/retries.rs index 8c288f0db8..af4dcb0fcf 100644 --- a/crates/types/src/retries.rs +++ b/crates/types/src/retries.rs @@ -174,6 +174,36 @@ impl RetryPolicy { } } + /// Retry the provided closure respecting this retry policy. + /// Calls inspect with the number of attempts and the error on each failed try, + /// except for the last retry. 
+ pub async fn retry_with_inspect( + self, + mut operation: F, + mut inspect: I, + ) -> Result + where + F: FnMut() -> Fut, + Fut: Future>, + I: FnMut(u32, E), + { + let mut retry_iter = self.into_iter(); + let mut attempts = 0; + loop { + attempts += 1; + match (operation().await, retry_iter.next()) { + (Ok(res), _) => return Ok(res), + (Err(e), None) => { + return Err(e); + } + (Err(e), Some(timer)) => { + inspect(attempts, e); + tokio::time::sleep(timer).await; + } + } + } + } + /// Retry the provided closure respecting this retry policy and the retry condition. pub async fn retry_if( self, diff --git a/crates/worker/Cargo.toml b/crates/worker/Cargo.toml index fba8aa29ed..148e0023ae 100644 --- a/crates/worker/Cargo.toml +++ b/crates/worker/Cargo.toml @@ -29,6 +29,7 @@ restate-errors = { workspace = true } restate-futures-util = { workspace = true } restate-ingress-http = { workspace = true } restate-ingress-kafka = { workspace = true } +restate-ingestion-client = { workspace = true } restate-invoker-api = { workspace = true } restate-invoker-impl = { workspace = true } restate-metadata-server = { workspace = true } diff --git a/crates/worker/src/lib.rs b/crates/worker/src/lib.rs index 572dd83914..dd077cc68d 100644 --- a/crates/worker/src/lib.rs +++ b/crates/worker/src/lib.rs @@ -34,6 +34,7 @@ use restate_core::partitions::PartitionRouting; use restate_core::worker_api::ProcessorsManagerHandle; use restate_core::{Metadata, TaskKind}; use restate_core::{MetadataWriter, TaskCenter}; +use restate_ingestion_client::IngestionClient; use restate_ingress_kafka::Service as IngressKafkaService; use restate_invoker_impl::InvokerHandle as InvokerChannelServiceHandle; use restate_partition_store::snapshots::SnapshotRepository; @@ -51,6 +52,7 @@ use restate_types::health::HealthStatus; use restate_types::partitions::state::PartitionReplicaSetStates; use restate_types::protobuf::common::WorkerStatus; use restate_types::schema::subscriptions::SubscriptionResolver; +use restate_wal_protocol::Envelope; use crate::partition::invoker_storage_reader::InvokerStorageReader; use crate::partition_processor_manager::PartitionProcessorManager; @@ -91,21 +93,26 @@ pub enum BuildError { SnapshotRepository(#[from] anyhow::Error), } -pub struct Worker { +pub struct Worker { storage_query_context: QueryContext, datafusion_remote_scanner: RemoteQueryScannerServer, - ingress_kafka: IngressKafkaService, + ingress_kafka: IngressKafkaService, subscription_controller_handle: SubscriptionControllerHandle, partition_processor_manager: PartitionProcessorManager, } -impl Worker { - pub async fn create( +impl Worker +where + T: TransportConnect, +{ + #[allow(clippy::too_many_arguments)] + pub async fn create( health_status: HealthStatus, replica_set_states: PartitionReplicaSetStates, partition_store_manager: Arc, networking: Networking, bifrost: Bifrost, + ingestion_client: IngestionClient, router_builder: &mut MessageRouterBuilder, metadata_writer: MetadataWriter, ) -> Result { @@ -122,7 +129,9 @@ impl Worker { let schema = metadata.updateable_schema(); // ingress_kafka - let ingress_kafka = IngressKafkaService::new(bifrost.clone(), schema.clone()); + let ingress_kafka = + IngressKafkaService::new(bifrost.clone(), ingestion_client.clone(), schema.clone()); + let subscription_controller_handle = SubscriptionControllerHandle::new(ingress_kafka.create_command_sender()); diff --git a/crates/worker/src/partition/mod.rs b/crates/worker/src/partition/mod.rs index ad5df06917..aabf0e0976 100644 --- a/crates/worker/src/partition/mod.rs 
+++ b/crates/worker/src/partition/mod.rs @@ -32,7 +32,9 @@ use tracing::{Span, debug, error, info, instrument, trace, warn}; use restate_bifrost::loglet::FindTailOptions; use restate_bifrost::{Bifrost, LogEntry, MaybeRecord}; use restate_core::network::{Incoming, Oneshot, Reciprocal, Rpc, ServiceMessage, Verdict}; -use restate_core::{Metadata, ShutdownError, cancellation_watcher, my_node_id}; +use restate_core::{ + Metadata, ShutdownError, TaskCenter, TaskKind, cancellation_watcher, my_node_id, +}; use restate_invoker_api::capacity::InvokerCapacity; use restate_partition_store::{PartitionStore, PartitionStoreTransaction}; use restate_storage_api::deduplication_table::{ @@ -47,12 +49,15 @@ use restate_types::cluster::cluster_state::{PartitionProcessorStatus, ReplayStat use restate_types::config::Configuration; use restate_types::identifiers::LeaderEpoch; use restate_types::logs::{KeyFilter, Lsn, Record, SequenceNumber}; -use restate_types::net::RpcRequest; -use restate_types::net::ingest::{ReceivedIngestRequest, ResponseStatus}; +use restate_types::net::ingest::{ + DedupSequenceNrQueryRequest, DedupSequenceNrQueryResponse, ReceivedIngestRequest, + ResponseStatus, +}; use restate_types::net::partition_processor::{ PartitionLeaderService, PartitionProcessorRpcError, PartitionProcessorRpcRequest, PartitionProcessorRpcResponse, }; +use restate_types::net::{RpcRequest, ingest}; use restate_types::partitions::state::PartitionReplicaSetStates; use restate_types::retries::{RetryPolicy, with_jitter}; use restate_types::schema::Schema; @@ -167,6 +172,8 @@ where trim_queue.clone(), ); + let last_applied_log_lsn_watch = watch::Sender::new(Lsn::INVALID); + Ok(PartitionProcessor { partition_id_str, leadership_state, @@ -179,6 +186,7 @@ where status, replica_set_states, trim_queue, + last_applied_log_lsn_watch, }) } @@ -228,6 +236,8 @@ pub struct PartitionProcessor { partition_store: PartitionStore, trim_queue: TrimQueue, + + last_applied_log_lsn_watch: watch::Sender, } #[derive(Debug, thiserror::Error)] @@ -283,6 +293,16 @@ struct LsnEnvelope { pub envelope: Arc, } +/// OrderedOperations are scheduled operations that +/// will only get executed once the partition read up to +/// the bifrost tail that was found once the operation +/// was submitted. +enum OrderedOp { + QueryLegacyDedupSn { + request: Incoming>, + }, +} + impl PartitionProcessor where InvokerSender: restate_invoker_api::InvokerHandle> + Clone, @@ -342,6 +362,10 @@ where .await? .unwrap_or(Lsn::INVALID); + self.last_applied_log_lsn_watch + .send_replace(last_applied_lsn); + let last_applied_lsn_watch = self.last_applied_log_lsn_watch.subscribe(); + let log_id = self.partition_store.partition().log_id(); let partition_id = self.partition_store.partition_id(); let my_node = my_node_id().as_plain(); @@ -478,7 +502,7 @@ where self.status.effective_mode = self.leadership_state.effective_mode(); } Some(msg) = self.network_leader_svc_rx.recv() => { - self.on_rpc(msg, &mut partition_store, live_schemas.live_load()).await; + self.on_rpc(msg, &mut partition_store, live_schemas.live_load(), &last_applied_lsn_watch).await; } _ = status_update_timer.tick() => { if durable_lsn_watch.has_changed().map_err(|e| ProcessorError::Other(e.into()))? { @@ -536,6 +560,9 @@ where // commit all changes so far, this is important so that the actuators see all changes // when becoming leader. 
transaction.commit().await?; + // Notify all lsn watchers that the lsn has been committed + + self.last_applied_log_lsn_watch.send_replace(lsn); // We can ignore all actions collected so far because as a new leader we have to instruct the // actuators afresh. @@ -569,6 +596,10 @@ where // Commit our changes and notify actuators about actions if we are the leader transaction.commit().await?; + // Notify all lsn watchers that the lsn has been committed + if let Some(lsn) = &self.status.last_applied_log_lsn { + self.last_applied_log_lsn_watch.send_replace(*lsn); + } self.leadership_state.handle_actions(action_collector.drain(..), vqueues.view())?; }, result = self.leadership_state.run(&self.state_machine, vqueues.view()) => { @@ -629,6 +660,7 @@ where msg: ServiceMessage, partition_store: &mut PartitionStore, schemas: &Schema, + last_applied_lsn_watch: &watch::Receiver, ) { match msg { ServiceMessage::Rpc(msg) if msg.msg_type() == PartitionProcessorRpcRequest::TYPE => { @@ -641,12 +673,103 @@ where ServiceMessage::Rpc(msg) if msg.msg_type() == ReceivedIngestRequest::TYPE => { self.on_pp_ingest_request(msg.into_typed()).await; } + ServiceMessage::Rpc(msg) if msg.msg_type() == DedupSequenceNrQueryRequest::TYPE => { + self.wait_for_tail_then( + last_applied_lsn_watch, + OrderedOp::QueryLegacyDedupSn { + request: msg.into_typed(), + }, + ); + } msg => { msg.fail(Verdict::MessageUnrecognized); } } } + async fn on_ordered_op(partition_store: &mut PartitionStore, op: OrderedOp) { + match op { + OrderedOp::QueryLegacyDedupSn { request } => { + Self::on_dedup_sn_query(partition_store, request).await; + } + } + } + + fn wait_for_tail_then( + &self, + last_applied_lsn_watch: &watch::Receiver, + ordered_op: OrderedOp, + ) { + let bifrost = self.bifrost.clone(); + let log_id = self.partition_store.partition().log_id(); + let mut last_applied_lsn_watch = last_applied_lsn_watch.clone(); + let mut partition_store = self.partition_store.clone(); + + _ = TaskCenter::current().spawn_child( + TaskKind::Disposable, + "ordered-operation", + async move { + let tail = bifrost + .find_tail(log_id, FindTailOptions::ConsistentRead) + .await?; + let wait_for = tail.offset().prev(); + last_applied_lsn_watch.wait_for(|v| v >= &wait_for).await?; + Self::on_ordered_op(&mut partition_store, ordered_op).await; + Ok(()) + }, + ); + } + + /// Used mainly by kafka-ingress to query old style dedup information + /// during the migration to the new u128 based producer id introduced with v1.6. 
+ async fn on_dedup_sn_query( + partition_store: &mut PartitionStore, + msg: Incoming>, + ) { + let (tx, body) = msg.split(); + let producer_id = match body.producer_id { + ingest::ProducerId::Unknown => { + tx.send(DedupSequenceNrQueryResponse { + status: ResponseStatus::Internal { + msg: "missing producer id".into(), + }, + sequence_number: None, + }); + return; + } + ingest::ProducerId::String(v) => ProducerId::Other(v.into()), + ingest::ProducerId::Numeric(v) => ProducerId::Producer(v.into()), + }; + + match partition_store + .get_dedup_sequence_number(&producer_id) + .await + { + Ok(result) => { + let sequence_number = result.and_then(|v| { + if let DedupSequenceNumber::Sn(sn) = v { + Some(sn) + } else { + None + } + }); + + tx.send(DedupSequenceNrQueryResponse { + status: ResponseStatus::Ack, + sequence_number, + }); + } + Err(err) => { + tx.send(DedupSequenceNrQueryResponse { + status: ResponseStatus::Internal { + msg: err.to_string(), + }, + sequence_number: None, + }); + } + } + } + async fn on_pp_ingest_request(&mut self, msg: Incoming>) { let (reciprocal, request) = msg.split(); histogram!( diff --git a/workspace-hack/Cargo.toml b/workspace-hack/Cargo.toml index ea764c1d00..822642b2f0 100644 --- a/workspace-hack/Cargo.toml +++ b/workspace-hack/Cargo.toml @@ -138,6 +138,7 @@ tracing-subscriber = { version = "0.3", features = ["env-filter", "json", "parki typenum = { version = "1", default-features = false, features = ["const-generics"] } ulid = { version = "1", features = ["serde"] } uuid = { version = "1", features = ["js", "serde", "v4", "v7"] } +xxhash-rust = { version = "0.8", default-features = false, features = ["std", "xxh3"] } zerocopy = { version = "0.8", default-features = false, features = ["derive", "simd"] } zeroize = { version = "1", features = ["zeroize_derive"] } zstd = { version = "0.13" } @@ -269,6 +270,7 @@ tracing-subscriber = { version = "0.3", features = ["env-filter", "json", "parki typenum = { version = "1", default-features = false, features = ["const-generics"] } ulid = { version = "1", features = ["serde"] } uuid = { version = "1", features = ["js", "serde", "v4", "v7"] } +xxhash-rust = { version = "0.8", default-features = false, features = ["std", "xxh3"] } zerocopy = { version = "0.8", default-features = false, features = ["derive", "simd"] } zeroize = { version = "1", features = ["zeroize_derive"] } zstd = { version = "0.13" }
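
Taken together, the DedupSequenceNrQuery messages and the partition-processor handler above let the Kafka ingress read back the last sequence number recorded under a legacy string producer id before it switches to the new numeric ids. A sketch of the querying side; only the request/response shapes come from this diff, and the routing closure is a hypothetical stand-in for the real RPC plumbing:

```rust
use restate_types::message::MessageIndex;
use restate_types::net::ingest::{
    DedupSequenceNrQueryRequest, DedupSequenceNrQueryResponse, ProducerId, ResponseStatus,
};

// Ask the partition leader for the dedup sequence number stored under the
// legacy string producer id, so ingestion can resume from that point.
async fn legacy_dedup_seed<F, Fut>(
    legacy_producer_id: String,
    send_to_partition_leader: F, // hypothetical routing/RPC helper
) -> anyhow::Result<Option<MessageIndex>>
where
    F: FnOnce(DedupSequenceNrQueryRequest) -> Fut,
    Fut: std::future::Future<Output = anyhow::Result<DedupSequenceNrQueryResponse>>,
{
    let response = send_to_partition_leader(DedupSequenceNrQueryRequest {
        producer_id: ProducerId::String(legacy_producer_id),
    })
    .await?;

    match response.status {
        ResponseStatus::Ack => Ok(response.sequence_number),
        status => anyhow::bail!("dedup sequence number query not acknowledged: {status:?}"),
    }
}
```
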