diff --git a/Cargo.lock b/Cargo.lock index 355e31028d..0bc2606a65 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3652,15 +3652,16 @@ dependencies = [ [[package]] name = "http-body-util" -version = "0.1.2" +version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "793429d76616a256bcb62c2a2ec2bed781c8307e797e2598c50010f2bee2544f" +checksum = "b021d93e26becf5dc7e1b75b1bed1fd93124b374ceb73f43d4d4eafec896a64a" dependencies = [ "bytes", - "futures-util", + "futures-core", "http 1.3.1", "http-body 1.0.1", "pin-project-lite", + "tokio", ] [[package]] @@ -7158,6 +7159,7 @@ dependencies = [ "http 1.3.1", "http-body 1.0.1", "http-body-util", + "indexmap 2.11.4", "itertools 0.14.0", "metrics", "opentelemetry", diff --git a/Cargo.toml b/Cargo.toml index d9e5d6efc2..1484970f7d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -147,7 +147,7 @@ googletest = { version = "0.10", features = ["anyhow"] } hostname = { version = "0.4.0" } http = "1.3.1" http-body = "1.0.1" -http-body-util = "0.1.2" +http-body-util = { version = "0.1.3", features = ["channel"] } http-serde = { version = "2.1.1" } humantime = "2.1.0" hyper = { version = "1.6.0", default-features = false } diff --git a/crates/invoker-impl/Cargo.toml b/crates/invoker-impl/Cargo.toml index 794323f11a..1d955ba484 100644 --- a/crates/invoker-impl/Cargo.toml +++ b/crates/invoker-impl/Cargo.toml @@ -33,6 +33,7 @@ codederror = { workspace = true } dashmap = { workspace = true } futures = { workspace = true } gardal = { workspace = true , features = ["async"]} +indexmap = { workspace = true } http = { workspace = true } http-body = { workspace = true } http-body-util = { workspace = true } diff --git a/crates/invoker-impl/src/invocation_state_machine.rs b/crates/invoker-impl/src/invocation_state_machine.rs index c4dcab63cd..7b7ef52ffc 100644 --- a/crates/invoker-impl/src/invocation_state_machine.rs +++ b/crates/invoker-impl/src/invocation_state_machine.rs @@ -499,6 +499,18 @@ impl 
InvocationStateMachine { true } + /// Returns true if it was an open bidi stream, thus this suspend had some effect. + pub(super) fn suspend(&mut self) -> bool { + if let AttemptState::InFlight { + notifications_tx, .. + } = &mut self.invocation_state + { + // Close notifications_tx to trigger suspension + return notifications_tx.take().is_some_and(|tx| !tx.is_closed()); + } + false + } + pub(crate) fn should_emit_transient_error_event( &mut self, new_error_event: &TransientErrorEvent, @@ -566,6 +578,26 @@ mod tests { pub(crate) fn is_waiting_retry(&self) -> bool { matches!(self.invocation_state, AttemptState::WaitingRetry { .. }) } + + pub(crate) fn in_flight_with_notifications_tx_closed(&self) -> bool { + matches!( + self.invocation_state, + AttemptState::InFlight { + notifications_tx: None, + .. + } + ) + } + + pub(crate) fn in_flight_with_notifications_tx_open(&self) -> bool { + matches!( + &self.invocation_state, + AttemptState::InFlight { + notifications_tx: Some(_), + .. + } + ) + } } #[test] diff --git a/crates/invoker-impl/src/invocation_task/mod.rs b/crates/invoker-impl/src/invocation_task/mod.rs index b251afedcf..5e30b8e386 100644 --- a/crates/invoker-impl/src/invocation_task/mod.rs +++ b/crates/invoker-impl/src/invocation_task/mod.rs @@ -129,10 +129,9 @@ impl From for InvocationTaskOutputInner { } } -type InvokerBodyStream = - http_body_util::StreamBody, Infallible>>>; +type InvokerBodyStream = http_body_util::channel::Channel; -type InvokerRequestStreamSender = mpsc::Sender, Infallible>>; +type InvokerRequestStreamSender = http_body_util::channel::Sender; /// Represents an open invocation stream pub(super) struct InvocationTask { diff --git a/crates/invoker-impl/src/invocation_task/service_protocol_runner.rs b/crates/invoker-impl/src/invocation_task/service_protocol_runner.rs index d29da38734..fea7c1a559 100644 --- a/crates/invoker-impl/src/invocation_task/service_protocol_runner.rs +++ 
b/crates/invoker-impl/src/invocation_task/service_protocol_runner.rs @@ -16,6 +16,7 @@ use futures::{Stream, StreamExt}; use http::uri::PathAndQuery; use http::{HeaderMap, HeaderName, HeaderValue, StatusCode}; use http_body::Frame; +use http_body_util::Channel; use opentelemetry::trace::TraceFlags; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; @@ -216,8 +217,7 @@ where parent_span_context: &ServiceInvocationSpanContext, ) -> (InvokerRequestStreamSender, Request) { // Just an arbitrary buffering size - let (http_stream_tx, http_stream_rx) = mpsc::channel(10); - let req_body = InvokerBodyStream::new(ReceiverStream::new(http_stream_rx)); + let (http_stream_tx, req_body) = Channel::new(10); let service_protocol_header_value = service_protocol_version_to_header_value(service_protocol_version); @@ -458,7 +458,7 @@ where trace!(restate.protocol.message = ?msg, "Sending message"); let buf = self.encoder.encode(msg); - if http_stream_tx.send(Ok(Frame::data(buf))).await.is_err() { + if http_stream_tx.send(Frame::data(buf)).await.is_err() { return Err(InvokerError::UnexpectedClosedRequestStream); }; Ok(()) diff --git a/crates/invoker-impl/src/invocation_task/service_protocol_runner_v4.rs b/crates/invoker-impl/src/invocation_task/service_protocol_runner_v4.rs index e0b8f3f586..60a4fde093 100644 --- a/crates/invoker-impl/src/invocation_task/service_protocol_runner_v4.rs +++ b/crates/invoker-impl/src/invocation_task/service_protocol_runner_v4.rs @@ -9,6 +9,7 @@ // by the Apache License, Version 2.0. 
use std::collections::HashSet; +use std::convert::Infallible; use std::ops::Deref; use std::pin::Pin; use std::task::{Context, Poll, ready}; @@ -21,6 +22,8 @@ use gardal::futures::StreamExt as GardalStreamExt; use http::uri::PathAndQuery; use http::{HeaderMap, HeaderName, HeaderValue, StatusCode}; use http_body::Frame; +use http_body_util::Channel; +use http_body_util::channel::Sender; use opentelemetry::trace::TraceFlags; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; @@ -232,8 +235,7 @@ where parent_span_context: &ServiceInvocationSpanContext, ) -> (InvokerRequestStreamSender, Request) { // Just an arbitrary buffering size - let (http_stream_tx, http_stream_rx) = mpsc::channel(10); - let req_body = InvokerBodyStream::new(ReceiverStream::new(http_stream_rx)); + let (http_stream_tx, req_body) = Channel::new(10); let service_protocol_header_value = service_protocol_version_to_header_value(service_protocol_version); @@ -388,6 +390,7 @@ where crate::shortcircuit!(self.write(&mut http_stream_tx, Message::new_command_ack(entry_index)).await); }, None => { + drop(http_stream_tx); // Completion channel is closed, // the invoker main loop won't send completions anymore. // Response stream might still be open though. @@ -407,6 +410,7 @@ where } }, _ = tokio::time::sleep(self.invocation_task.inactivity_timeout) => { + drop(http_stream_tx); debug!("Inactivity detected, going to suspend invocation"); // Just return. This will drop the invoker_rx and http_stream_tx, // closing the request stream and the invoker input channel. 
@@ -516,7 +520,7 @@ where trace!(restate.protocol.message = ?msg, "Sending message"); let buf = self.encoder.encode(msg); - if http_stream_tx.send(Ok(Frame::data(buf))).await.is_err() { + if http_stream_tx.send(Frame::data(buf)).await.is_err() { return Err(InvokerError::UnexpectedClosedRequestStream); }; Ok(()) @@ -531,7 +535,7 @@ where trace!(restate.protocol.message = ?ty, "Sending message"); let buf = self.encoder.encode_raw(ty, buf); - if http_stream_tx.send(Ok(Frame::data(buf))).await.is_err() { + if http_stream_tx.send(Frame::data(buf)).await.is_err() { return Err(InvokerError::UnexpectedClosedRequestStream); }; Ok(()) diff --git a/crates/invoker-impl/src/lib.rs b/crates/invoker-impl/src/lib.rs index 2d1b9c8f66..a46b422380 100644 --- a/crates/invoker-impl/src/lib.rs +++ b/crates/invoker-impl/src/lib.rs @@ -17,6 +17,12 @@ mod quota; mod state_machine_manager; mod status_store; +use futures::StreamExt; +use futures::stream::Peekable; +use gardal::futures::ThrottledStream; +use gardal::{PaddedAtomicSharedStorage, StreamExt as GardalStreamExt, TokioClock}; +use metrics::counter; +use restate_time_util::DurationExt; use std::collections::{HashMap, HashSet}; use std::future::Future; use std::io::ErrorKind; @@ -25,12 +31,6 @@ use std::path::PathBuf; use std::pin::Pin; use std::time::SystemTime; use std::{cmp, panic}; - -use futures::StreamExt; -use gardal::futures::ThrottledStream; -use gardal::{PaddedAtomicSharedStorage, StreamExt as GardalStreamExt, TokioClock}; -use metrics::counter; -use restate_time_util::DurationExt; use tokio::sync::mpsc; use tokio::task::{AbortHandle, JoinSet}; use tracing::{debug, trace, warn}; @@ -340,7 +340,7 @@ where let mut segmented_input_queue = match SegmentQueue::init(tmp_dir.clone(), in_memory_limit) .await { - Ok(queue) => std::pin::pin!(queue.throttle(invocation_token_bucket)), + Ok(queue) => std::pin::pin!(queue.throttle(invocation_token_bucket).peekable()), Err(e) if e.kind() == ErrorKind::PermissionDenied => { warn!( "Could 
not initialize the invoker spill queue, permission denied to write the directory '{}'\n\ @@ -408,14 +408,17 @@ where Schemas: InvocationTargetResolver, { // Returns true if we should execute another step, false if we should stop executing steps + #[allow(clippy::type_complexity)] async fn step( &mut self, options: &InvokerOptions, mut segmented_input_queue: Pin< - &mut ThrottledStream< - SegmentQueue>, - PaddedAtomicSharedStorage, - TokioClock, + &mut Peekable< + ThrottledStream< + SegmentQueue>, + PaddedAtomicSharedStorage, + TokioClock, + >, >, >, mut shutdown: Pin<&mut F>, @@ -423,142 +426,416 @@ where where F: Future, { - tokio::select! { - Some(cmd) = self.status_rx.recv() => { - let keys = cmd.payload(); - let statuses = self - .invocation_state_machine_manager - .registered_partitions_with_keys(keys.clone()) - .flat_map(|partition| self.status_store.status_for_partition(partition)) - .filter(|status| keys.contains(&status.invocation_id().partition_key())) - .collect(); + if self.quota.is_slot_available() { + tokio::select! 
{ + Some(cmd) = self.status_rx.recv() => { + let keys = cmd.payload(); + let statuses = self + .invocation_state_machine_manager + .registered_partitions_with_keys(keys.clone()) + .flat_map(|partition| self.status_store.status_for_partition(partition)) + .filter(|status| keys.contains(&status.invocation_id().partition_key())) + .collect(); - let _ = cmd.reply(statuses); - }, + let _ = cmd.reply(statuses); + }, - Some(input_message) = self.input_rx.recv() => { - match input_message { - // --- Spillable queue loading/offloading - InputCommand::Invoke(invoke_command) => { - counter!(INVOKER_ENQUEUE, "partition_id" => invoke_command.partition.0.to_string()).increment(1); - segmented_input_queue.inner_pin_mut().enqueue(invoke_command).await; - }, - // --- Other commands (they don't go through the segment queue) - InputCommand::RegisterPartition { partition, partition_key_range, storage_reader, sender, } => { - self.handle_register_partition(partition, partition_key_range, - storage_reader, sender); - }, - InputCommand::Abort { partition, invocation_id, invocation_epoch } => { - self.handle_abort_invocation(partition, invocation_id,invocation_epoch); - } - InputCommand::RetryNow { partition, invocation_id, invocation_epoch } => { - self.handle_retry_now_invocation(options, partition, invocation_id,invocation_epoch); - } - InputCommand::Pause { partition, invocation_id, invocation_epoch } => { - self.handle_pause_invocation( partition, invocation_id,invocation_epoch).await; - } - InputCommand::AbortAllPartition { partition } => { - self.handle_abort_partition(partition); + Some(input_message) = self.input_rx.recv() => { + match input_message { + // --- Spillable queue loading/offloading + InputCommand::Invoke(invoke_command) => { + counter!(INVOKER_ENQUEUE, "partition_id" => invoke_command.partition.0.to_string()).increment(1); + segmented_input_queue.as_mut().get_pin_mut().inner_pin_mut().enqueue(invoke_command).await; + }, + // --- Other commands (they don't go through 
the segment queue) + InputCommand::RegisterPartition { partition, partition_key_range, storage_reader, sender, } => { + self.handle_register_partition(partition, partition_key_range, storage_reader, sender); + }, + InputCommand::Abort { partition, invocation_id, invocation_epoch } => { + self.handle_abort_invocation(partition, invocation_id,invocation_epoch); + } + InputCommand::RetryNow { partition, invocation_id, invocation_epoch } => { + self.handle_retry_now_invocation(options, partition, invocation_id,invocation_epoch); + } + InputCommand::Pause { partition, invocation_id, invocation_epoch } => { + self.handle_pause_invocation( partition, invocation_id,invocation_epoch).await; + } + InputCommand::AbortAllPartition { partition } => { + self.handle_abort_partition(partition); + } + InputCommand::Completion { partition, invocation_id, completion } => { + self.handle_completion(partition, invocation_id, completion); + }, + InputCommand::Notification { partition, invocation_id, invocation_epoch, notification } => { + self.handle_notification(options, partition, invocation_id,invocation_epoch, notification); + }, + InputCommand::StoredCommandAck { partition, invocation_id, invocation_epoch, command_index } => { + self.handle_stored_command_ack(options, partition, invocation_id,invocation_epoch, command_index); + } } - InputCommand::Completion { partition, invocation_id, completion } => { - self.handle_completion(partition, invocation_id, completion); - }, - InputCommand::Notification { partition, invocation_id, invocation_epoch, notification } => { - self.handle_notification(options, partition, invocation_id,invocation_epoch, notification); - }, - InputCommand::StoredCommandAck { partition, invocation_id, invocation_epoch, command_index } => { - self.handle_stored_command_ack(options, partition, invocation_id,invocation_epoch, command_index); + }, + Some(invoke_input_command) = segmented_input_queue.next() => { + self.handle_invoke(options, 
invoke_input_command.partition, invoke_input_command.invocation_id, invoke_input_command.invocation_epoch, invoke_input_command.invocation_target, invoke_input_command.journal); + }, + Some(invocation_task_msg) = self.invocation_tasks_rx.recv() => { + let InvocationTaskOutput { + invocation_id, + partition, + invocation_epoch, + inner + } = invocation_task_msg; + match inner { + InvocationTaskOutputInner::PinnedDeployment(deployment_metadata, has_changed) => { + self.handle_pinned_deployment( + partition, + invocation_id, + invocation_epoch, + deployment_metadata, + has_changed, + ) + } + InvocationTaskOutputInner::ServerHeaderReceived(x_restate_server_header) => { + self.handle_server_header_received( + partition, + invocation_id, + invocation_epoch, + x_restate_server_header + ) + } + InvocationTaskOutputInner::NewEntry {entry_index, entry, requires_ack} => { + self.handle_new_entry( + partition, + invocation_id, + invocation_epoch, + entry_index, + *entry, + requires_ack + ).await + }, + InvocationTaskOutputInner::NewNotificationProposal { notification } => { + self.handle_new_notification_proposal( + partition, + invocation_id, + invocation_epoch, + notification + ).await + }, + InvocationTaskOutputInner::Closed => { + self.handle_invocation_task_closed(partition, invocation_id, invocation_epoch).await + }, + InvocationTaskOutputInner::Failed(e) => { + self.handle_invocation_task_failed(partition, invocation_id, invocation_epoch, e).await + }, + InvocationTaskOutputInner::Suspended(indexes) => { + self.handle_invocation_task_suspended(partition, invocation_id, invocation_epoch, indexes).await + } + InvocationTaskOutputInner::NewCommand { command, command_index, requires_ack } => { + self.handle_new_command( + partition, + invocation_id, + invocation_epoch, + command_index, + command, + requires_ack + ).await + } + InvocationTaskOutputInner::SuspendedV2(notification_ids) => { + self.handle_invocation_task_suspended_v2(partition, invocation_id, invocation_epoch, 
notification_ids).await + } + }; + }, + timer = self.retry_timers.await_timer() => { + let (partition, fid, invocation_epoch) = timer.into_inner(); + self.handle_retry_timer_fired(options, partition, fid, invocation_epoch); + }, + Some(invocation_task_result) = self.invocation_tasks.join_next() => { + if let Err(err) = invocation_task_result { + // Propagate panics coming from invocation tasks. + if err.is_panic() { + panic::resume_unwind(err.into_panic()); + } } + // Other errors are cancellations caused by us (e.g. after AbortAllPartition), + // hence we can ignore them. } - }, - Some(invoke_input_command) = segmented_input_queue.next(), if !segmented_input_queue.inner().is_empty() && self.quota.is_slot_available() => { - self.handle_invoke(options, invoke_input_command.partition, invoke_input_command.invocation_id, invoke_input_command.invocation_epoch, invoke_input_command.invocation_target, invoke_input_command.journal); - }, - Some(invocation_task_msg) = self.invocation_tasks_rx.recv() => { - let InvocationTaskOutput { - invocation_id, - partition, - invocation_epoch, - inner - } = invocation_task_msg; - match inner { - InvocationTaskOutputInner::PinnedDeployment(deployment_metadata, has_changed) => { - self.handle_pinned_deployment( - partition, - invocation_id, - invocation_epoch, - deployment_metadata, - has_changed, - ) - } - InvocationTaskOutputInner::ServerHeaderReceived(x_restate_server_header) => { - self.handle_server_header_received( - partition, - invocation_id, - invocation_epoch, - x_restate_server_header - ) - } - InvocationTaskOutputInner::NewEntry {entry_index, entry, requires_ack} => { - self.handle_new_entry( - partition, - invocation_id, - invocation_epoch, - entry_index, - *entry, - requires_ack - ).await - }, - InvocationTaskOutputInner::NewNotificationProposal { notification } => { - self.handle_new_notification_proposal( - partition, - invocation_id, - invocation_epoch, - notification - ).await - }, - InvocationTaskOutputInner::Closed 
=> { - self.handle_invocation_task_closed(partition, invocation_id, invocation_epoch).await - }, - InvocationTaskOutputInner::Failed(e) => { - self.handle_invocation_task_failed(partition, invocation_id, invocation_epoch, e).await - }, - InvocationTaskOutputInner::Suspended(indexes) => { - self.handle_invocation_task_suspended(partition, invocation_id, invocation_epoch, indexes).await + _ = &mut shutdown => { + debug!("Shutting down the invoker"); + self.handle_shutdown(); + return false; + } + } + } else if self.quota.is_evictable() { + tokio::select! { + Some(cmd) = self.status_rx.recv() => { + let keys = cmd.payload(); + let statuses = self + .invocation_state_machine_manager + .registered_partitions_with_keys(keys.clone()) + .flat_map(|partition| self.status_store.status_for_partition(partition)) + .filter(|status| keys.contains(&status.invocation_id().partition_key())) + .collect(); + + let _ = cmd.reply(statuses); + }, + + Some(input_message) = self.input_rx.recv() => { + match input_message { + // --- Spillable queue loading/offloading + InputCommand::Invoke(invoke_command) => { + counter!(INVOKER_ENQUEUE, "partition_id" => invoke_command.partition.0.to_string()).increment(1); + segmented_input_queue.get_pin_mut().inner_pin_mut().enqueue(invoke_command).await; + }, + // --- Other commands (they don't go through the segment queue) + InputCommand::RegisterPartition { partition, partition_key_range, storage_reader, sender, } => { + self.handle_register_partition(partition, partition_key_range, storage_reader, sender); + }, + InputCommand::Abort { partition, invocation_id, invocation_epoch } => { + self.handle_abort_invocation(partition, invocation_id,invocation_epoch); + } + InputCommand::RetryNow { partition, invocation_id, invocation_epoch } => { + self.handle_retry_now_invocation(options, partition, invocation_id,invocation_epoch); + } + InputCommand::Pause { partition, invocation_id, invocation_epoch } => { + self.handle_pause_invocation( partition, 
invocation_id,invocation_epoch).await; + } + InputCommand::AbortAllPartition { partition } => { + self.handle_abort_partition(partition); + } + InputCommand::Completion { partition, invocation_id, completion } => { + self.handle_completion(partition, invocation_id, completion); + }, + InputCommand::Notification { partition, invocation_id, invocation_epoch, notification } => { + self.handle_notification(options, partition, invocation_id,invocation_epoch, notification); + }, + InputCommand::StoredCommandAck { partition, invocation_id, invocation_epoch, command_index } => { + self.handle_stored_command_ack(options, partition, invocation_id,invocation_epoch, command_index); + } } - InvocationTaskOutputInner::NewCommand { command, command_index, requires_ack } => { - self.handle_new_command( - partition, - invocation_id, - invocation_epoch, - command_index, - command, - requires_ack - ).await + }, + Some(_) = segmented_input_queue.as_mut().peek() => { + self.handle_eviction(options); + }, + Some(invocation_task_msg) = self.invocation_tasks_rx.recv() => { + let InvocationTaskOutput { + invocation_id, + partition, + invocation_epoch, + inner + } = invocation_task_msg; + match inner { + InvocationTaskOutputInner::PinnedDeployment(deployment_metadata, has_changed) => { + self.handle_pinned_deployment( + partition, + invocation_id, + invocation_epoch, + deployment_metadata, + has_changed, + ) + } + InvocationTaskOutputInner::ServerHeaderReceived(x_restate_server_header) => { + self.handle_server_header_received( + partition, + invocation_id, + invocation_epoch, + x_restate_server_header + ) + } + InvocationTaskOutputInner::NewEntry {entry_index, entry, requires_ack} => { + self.handle_new_entry( + partition, + invocation_id, + invocation_epoch, + entry_index, + *entry, + requires_ack + ).await + }, + InvocationTaskOutputInner::NewNotificationProposal { notification } => { + self.handle_new_notification_proposal( + partition, + invocation_id, + invocation_epoch, + 
notification + ).await + }, + InvocationTaskOutputInner::Closed => { + self.handle_invocation_task_closed(partition, invocation_id, invocation_epoch).await + }, + InvocationTaskOutputInner::Failed(e) => { + self.handle_invocation_task_failed(partition, invocation_id, invocation_epoch, e).await + }, + InvocationTaskOutputInner::Suspended(indexes) => { + self.handle_invocation_task_suspended(partition, invocation_id, invocation_epoch, indexes).await + } + InvocationTaskOutputInner::NewCommand { command, command_index, requires_ack } => { + self.handle_new_command( + partition, + invocation_id, + invocation_epoch, + command_index, + command, + requires_ack + ).await + } + InvocationTaskOutputInner::SuspendedV2(notification_ids) => { + self.handle_invocation_task_suspended_v2(partition, invocation_id, invocation_epoch, notification_ids).await + } + }; + }, + timer = self.retry_timers.await_timer() => { + let (partition, fid, invocation_epoch) = timer.into_inner(); + self.handle_retry_timer_fired(options, partition, fid, invocation_epoch); + }, + Some(invocation_task_result) = self.invocation_tasks.join_next() => { + if let Err(err) = invocation_task_result { + // Propagate panics coming from invocation tasks. + if err.is_panic() { + panic::resume_unwind(err.into_panic()); + } } - InvocationTaskOutputInner::SuspendedV2(notification_ids) => { - self.handle_invocation_task_suspended_v2(partition, invocation_id, invocation_epoch, notification_ids).await + // Other errors are cancellations caused by us (e.g. after AbortAllPartition), + // hence we can ignore them. + } + _ = &mut shutdown => { + debug!("Shutting down the invoker"); + self.handle_shutdown(); + return false; + } + } + } else { + tokio::select! 
{ + Some(cmd) = self.status_rx.recv() => { + let keys = cmd.payload(); + let statuses = self + .invocation_state_machine_manager + .registered_partitions_with_keys(keys.clone()) + .flat_map(|partition| self.status_store.status_for_partition(partition)) + .filter(|status| keys.contains(&status.invocation_id().partition_key())) + .collect(); + + let _ = cmd.reply(statuses); + }, + + Some(input_message) = self.input_rx.recv() => { + match input_message { + // --- Spillable queue loading/offloading + InputCommand::Invoke(invoke_command) => { + counter!(INVOKER_ENQUEUE, "partition_id" => invoke_command.partition.0.to_string()).increment(1); + segmented_input_queue.as_mut().get_pin_mut().inner_pin_mut().enqueue(invoke_command).await; + }, + // --- Other commands (they don't go through the segment queue) + InputCommand::RegisterPartition { partition, partition_key_range, storage_reader, sender, } => { + self.handle_register_partition(partition, partition_key_range, storage_reader, sender); + }, + InputCommand::Abort { partition, invocation_id, invocation_epoch } => { + self.handle_abort_invocation(partition, invocation_id,invocation_epoch); + } + InputCommand::RetryNow { partition, invocation_id, invocation_epoch } => { + self.handle_retry_now_invocation(options, partition, invocation_id,invocation_epoch); + } + InputCommand::Pause { partition, invocation_id, invocation_epoch } => { + self.handle_pause_invocation( partition, invocation_id,invocation_epoch).await; + } + InputCommand::AbortAllPartition { partition } => { + self.handle_abort_partition(partition); + } + InputCommand::Completion { partition, invocation_id, completion } => { + self.handle_completion(partition, invocation_id, completion); + }, + InputCommand::Notification { partition, invocation_id, invocation_epoch, notification } => { + self.handle_notification(options, partition, invocation_id,invocation_epoch, notification); + }, + InputCommand::StoredCommandAck { partition, invocation_id, invocation_epoch, 
command_index } => { + self.handle_stored_command_ack(options, partition, invocation_id,invocation_epoch, command_index); + } } - }; - }, - timer = self.retry_timers.await_timer() => { - let (partition, fid, invocation_epoch) = timer.into_inner(); - self.handle_retry_timer_fired(options, partition, fid, invocation_epoch); - }, - Some(invocation_task_result) = self.invocation_tasks.join_next() => { - if let Err(err) = invocation_task_result { - // Propagate panics coming from invocation tasks. - if err.is_panic() { - panic::resume_unwind(err.into_panic()); + }, + Some(invocation_task_msg) = self.invocation_tasks_rx.recv() => { + let InvocationTaskOutput { + invocation_id, + partition, + invocation_epoch, + inner + } = invocation_task_msg; + match inner { + InvocationTaskOutputInner::PinnedDeployment(deployment_metadata, has_changed) => { + self.handle_pinned_deployment( + partition, + invocation_id, + invocation_epoch, + deployment_metadata, + has_changed, + ) + } + InvocationTaskOutputInner::ServerHeaderReceived(x_restate_server_header) => { + self.handle_server_header_received( + partition, + invocation_id, + invocation_epoch, + x_restate_server_header + ) + } + InvocationTaskOutputInner::NewEntry {entry_index, entry, requires_ack} => { + self.handle_new_entry( + partition, + invocation_id, + invocation_epoch, + entry_index, + *entry, + requires_ack + ).await + }, + InvocationTaskOutputInner::NewNotificationProposal { notification } => { + self.handle_new_notification_proposal( + partition, + invocation_id, + invocation_epoch, + notification + ).await + }, + InvocationTaskOutputInner::Closed => { + self.handle_invocation_task_closed(partition, invocation_id, invocation_epoch).await + }, + InvocationTaskOutputInner::Failed(e) => { + self.handle_invocation_task_failed(partition, invocation_id, invocation_epoch, e).await + }, + InvocationTaskOutputInner::Suspended(indexes) => { + self.handle_invocation_task_suspended(partition, invocation_id, invocation_epoch, 
indexes).await + } + InvocationTaskOutputInner::NewCommand { command, command_index, requires_ack } => { + self.handle_new_command( + partition, + invocation_id, + invocation_epoch, + command_index, + command, + requires_ack + ).await + } + InvocationTaskOutputInner::SuspendedV2(notification_ids) => { + self.handle_invocation_task_suspended_v2(partition, invocation_id, invocation_epoch, notification_ids).await + } + }; + }, + timer = self.retry_timers.await_timer() => { + let (partition, fid, invocation_epoch) = timer.into_inner(); + self.handle_retry_timer_fired(options, partition, fid, invocation_epoch); + }, + Some(invocation_task_result) = self.invocation_tasks.join_next() => { + if let Err(err) = invocation_task_result { + // Propagate panics coming from invocation tasks. + if err.is_panic() { + panic::resume_unwind(err.into_panic()); + } } + // Other errors are cancellations caused by us (e.g. after AbortAllPartition), + // hence we can ignore them. + } + _ = &mut shutdown => { + debug!("Shutting down the invoker"); + self.handle_shutdown(); + return false; } - // Other errors are cancellations caused by us (e.g. after AbortAllPartition), - // hence we can ignore them. 
- } - _ = &mut shutdown => { - debug!("Shutting down the invoker"); - self.handle_shutdown(); - return false; } } // Execute next loop @@ -676,6 +953,34 @@ where } } + #[instrument(level = "trace", skip_all)] + fn handle_eviction(&mut self, options: &InvokerOptions) { + if let Some(concurrency_limit) = options.concurrent_invocations_limit() { + let how_many_to_evict = concurrency_limit.div_ceil(10); + let max_iterations = concurrency_limit.div_ceil(5); + let mut evicted_count = 0; + for (id, ism) in self + .invocation_state_machine_manager + .invocations_from_older_to_newer() + .take(max_iterations) + { + trace!( + restate.invocation.id = %id, + "Evicting invocation", + ); + if ism.suspend() { + evicted_count += 1; + } + if evicted_count >= how_many_to_evict { + break; + } + } + // Even if we didn't evict all of them, + // still wait for these to complete. + self.quota.evict(how_many_to_evict); + } + } + #[instrument( level = "trace", skip_all, @@ -1651,6 +1956,7 @@ mod tests { use super::*; use std::future::{pending, ready}; + use std::mem::ManuallyDrop; use std::num::NonZeroUsize; use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; @@ -1994,11 +2300,15 @@ mod tests { .disable_eager_state(false) .message_size_warning(NonZeroUsize::new(1024).unwrap()) .message_size_limit(None) + .concurrent_invocations_limit(Some(NonZeroUsize::new(1).unwrap())) .build() .unwrap(); - let mut segment_queue = - std::pin::pin!(SegmentQueue::new(tempdir().unwrap().keep(), 1024).throttle(None)); + let mut segment_queue = std::pin::pin!( + SegmentQueue::new(tempdir().unwrap().keep(), 1024) + .throttle(None) + .peekable() + ); let cancel_token = CancellationToken::new(); let shutdown = cancel_token.cancelled(); @@ -2008,7 +2318,10 @@ mod tests { let invocation_id_2 = InvocationId::mock_random(); let (_invoker_tx, _status_tx, mut service_inner) = ServiceInner::mock( - |_, _, _, _, _, _, _| ready(()), + |_, _, _, _, _, rx, _| { + let _ = ManuallyDrop::new(rx); + ready(()) + 
}, MockSchemas( // fixed amount of retries so that an invocation eventually completes with a failure Some(RetryPolicy::fixed_delay(Duration::ZERO, Some(1))), @@ -2021,6 +2334,7 @@ mod tests { // Enqueue sid_1 and sid_2 segment_queue .as_mut() + .get_pin_mut() .inner_pin_mut() .enqueue(Box::new(InvokeCommand { partition: MOCK_PARTITION, @@ -2032,6 +2346,7 @@ mod tests { .await; segment_queue .as_mut() + .get_pin_mut() .inner_pin_mut() .enqueue(Box::new(InvokeCommand { partition: MOCK_PARTITION, @@ -2058,6 +2373,7 @@ mod tests { .in_flight() ); assert!(!service_inner.quota.is_slot_available()); + assert!(service_inner.quota.is_evictable()); // Step again to remove sid_1 from task queue. This should not invoke sid_2! assert!( @@ -2072,6 +2388,7 @@ mod tests { .is_none() ); assert!(!service_inner.quota.is_slot_available()); + assert!(!service_inner.quota.is_evictable()); // Send the close signal service_inner @@ -2080,11 +2397,12 @@ mod tests { // Slot should be available again assert!(service_inner.quota.is_slot_available()); + assert!(!service_inner.quota.is_evictable()); // Step now should invoke sid_2 assert!( service_inner - .step(&invoker_options, segment_queue.as_mut(), shutdown.as_mut(),) + .step(&invoker_options, segment_queue.as_mut(), shutdown.as_mut()) .await ); assert!( @@ -3115,4 +3433,106 @@ mod tests { }) ); } + + mod eviction { + use super::*; + + use test_log::test; + + #[test(restate_core::test)] + async fn works() { + const CONCURRENCY_LIMIT: usize = 2; + let invoker_options = InvokerOptionsBuilder::default() + .inactivity_timeout(FriendlyDuration::ZERO) + .abort_timeout(FriendlyDuration::ZERO) + .concurrent_invocations_limit(Some(NonZeroUsize::new(CONCURRENCY_LIMIT).unwrap())) + .build() + .unwrap(); + + let invocation_id_1 = InvocationId::mock_random(); + let invocation_id_2 = InvocationId::mock_random(); + let invocation_id_3 = InvocationId::mock_random(); + + // The mock task leaks `rx` via ManuallyDrop instead of dropping it (presumably so the matching sender is not observed as closed) + let (_, _status_tx, 
mut service_inner) = ServiceInner::mock( + |_, _, _, _, _, rx, _| { + let _ = ManuallyDrop::new(rx); + ready(()) + }, + MockSchemas::default(), + Some(CONCURRENCY_LIMIT), + ); + let _effects_rx = service_inner.register_mock_partition(EmptyStorageReader); + + // Start the first two invocations. + for invocation_id in [invocation_id_1, invocation_id_2] { + service_inner.handle_invoke( + &invoker_options, + MOCK_PARTITION, + invocation_id, + 0, + InvocationTarget::mock_virtual_object(), + InvokeInputJournal::NoCachedJournal, + ); + } + + // Now we should be in an evictable state + assert!(!service_inner.quota.is_slot_available()); + assert!(service_inner.quota.is_evictable()); + + // Let's evict + service_inner.handle_eviction(&invoker_options); + + // When evicting, we just request suspension, let's check that. We should have done it only for invocation_id_1 + assert!( + service_inner + .invocation_state_machine_manager + .resolve_invocation(MOCK_PARTITION, &invocation_id_1) + .unwrap() + .1 + .in_flight_with_notifications_tx_closed() + ); + + // invocation_id_2 should be left untouched + assert!( + service_inner + .invocation_state_machine_manager + .resolve_invocation(MOCK_PARTITION, &invocation_id_2) + .unwrap() + .1 + .in_flight_with_notifications_tx_open() + ); + + // We're not in evictable state, and we also don't have quota. We gotta wait for suspension. 
+ assert!(!service_inner.quota.is_evictable()); + assert!(!service_inner.quota.is_slot_available()); + + service_inner + .handle_invocation_task_suspended_v2( + MOCK_PARTITION, + invocation_id_1, + 0, + HashSet::from([NotificationId::for_completion(17)]), + ) + .await; + + // The suspension freed a slot: we're not evictable (eviction requires zero available slots), and a slot is available again + assert!(!service_inner.quota.is_evictable()); + assert!(service_inner.quota.is_slot_available()); + + // Let's go with invocation_id_3 + service_inner.handle_invoke( + &invoker_options, + MOCK_PARTITION, + invocation_id_3, + 0, + InvocationTarget::mock_virtual_object(), + InvokeInputJournal::NoCachedJournal, + ); + + // We're again evictable + assert!(service_inner.quota.is_evictable()); + assert!(!service_inner.quota.is_slot_available()); + } + } } diff --git a/crates/invoker-impl/src/quota.rs b/crates/invoker-impl/src/quota.rs index e73787175e..328b5c708e 100644 --- a/crates/invoker-impl/src/quota.rs +++ b/crates/invoker-impl/src/quota.rs @@ -18,7 +18,10 @@ use crate::{ #[derive(Debug)] enum InvokerConcurrencyQuotaInner { Unlimited, - Limited { available_slots: usize }, + Limited { + available_slots: usize, + evicted_waiting: usize, + }, } #[derive(Debug)] @@ -36,7 +39,10 @@ impl InvokerConcurrencyQuota { .set(available_slots as f64); gauge!(INVOKER_AVAILABLE_SLOTS, "invoker_id" => ID_LOOKUP.get(invoker_id)) .set(available_slots as f64); - InvokerConcurrencyQuotaInner::Limited { available_slots } + InvokerConcurrencyQuotaInner::Limited { + available_slots, + evicted_waiting: 0, + } } None => { gauge!(INVOKER_CONCURRENCY_LIMIT, "invoker_id" => ID_LOOKUP.get(invoker_id)) @@ -50,18 +56,43 @@ impl InvokerConcurrencyQuota { Self { inner, invoker_id } } + pub(super) fn is_evictable(&self) -> bool { + match &self.inner { + InvokerConcurrencyQuotaInner::Unlimited => false, + InvokerConcurrencyQuotaInner::Limited { + available_slots, + evicted_waiting, + } => *available_slots == 0 && *evicted_waiting == 0, + } + } + + pub(super) fn 
evict(&mut self, evicted: usize) { + match &mut self.inner { + InvokerConcurrencyQuotaInner::Unlimited => {} + InvokerConcurrencyQuotaInner::Limited { + evicted_waiting, .. + } => *evicted_waiting += evicted, + } + } + pub(super) fn is_slot_available(&self) -> bool { match &self.inner { InvokerConcurrencyQuotaInner::Unlimited => true, - InvokerConcurrencyQuotaInner::Limited { available_slots } => *available_slots > 0, + InvokerConcurrencyQuotaInner::Limited { + available_slots, .. + } => *available_slots > 0, } } pub(super) fn unreserve_slot(&mut self) { match &mut self.inner { InvokerConcurrencyQuotaInner::Unlimited => {} - InvokerConcurrencyQuotaInner::Limited { available_slots } => { + InvokerConcurrencyQuotaInner::Limited { + available_slots, + evicted_waiting, + } => { *available_slots += 1; + *evicted_waiting = evicted_waiting.saturating_sub(1); gauge!( INVOKER_AVAILABLE_SLOTS, "invoker_id" => @@ -76,7 +107,9 @@ impl InvokerConcurrencyQuota { assert!(self.is_slot_available()); match &mut self.inner { InvokerConcurrencyQuotaInner::Unlimited => {} - InvokerConcurrencyQuotaInner::Limited { available_slots } => { + InvokerConcurrencyQuotaInner::Limited { + available_slots, .. + } => { *available_slots -= 1; gauge!( INVOKER_AVAILABLE_SLOTS, @@ -92,7 +125,9 @@ impl InvokerConcurrencyQuota { pub(super) fn available_slots(&self) -> usize { match self.inner { InvokerConcurrencyQuotaInner::Unlimited => usize::MAX, - InvokerConcurrencyQuotaInner::Limited { available_slots } => available_slots, + InvokerConcurrencyQuotaInner::Limited { + available_slots, .. + } => available_slots, } } } diff --git a/crates/invoker-impl/src/state_machine_manager.rs b/crates/invoker-impl/src/state_machine_manager.rs index 8138fe6905..efa8d74b64 100644 --- a/crates/invoker-impl/src/state_machine_manager.rs +++ b/crates/invoker-impl/src/state_machine_manager.rs @@ -9,11 +9,12 @@ // by the Apache License, Version 2.0. 
use super::*; -use std::ops::RangeInclusive; - +use indexmap::IndexMap; use restate_invoker_api::Effect; use restate_invoker_api::invocation_reader::InvocationReader; use restate_types::identifiers::PartitionKey; +use std::iter::empty; +use std::ops::RangeInclusive; /// Tree of [InvocationStateMachine] held by the [Service]. #[derive(Debug)] @@ -199,4 +200,17 @@ where ) -> Option<&mut PartitionInvocationStateMachineCoordinator> { self.partitions.get_mut(&partition) } + + #[inline] + pub(super) fn invocations_from_older_to_newer( + &mut self, + ) -> impl Iterator { + // this is based on the assumption that there is only one partition per invoker (which is always the case at the moment). + let partition_map = self.partitions.iter_mut().next(); + if let Some(partition) = partition_map { + itertools::Either::Left(partition.1.invocation_state_machines.iter_mut()) + } else { + itertools::Either::Right(empty()) + } + } } diff --git a/crates/types/src/config/worker.rs b/crates/types/src/config/worker.rs index d9e8a7db6a..eca17f80ee 100644 --- a/crates/types/src/config/worker.rs +++ b/crates/types/src/config/worker.rs @@ -211,7 +211,7 @@ pub enum DurabilityMode { } pub const DEFAULT_INACTIVITY_TIMEOUT: Duration = Duration::from_secs(60); -pub const DEFAULT_ABORT_TIMEOUT: Duration = Duration::from_secs(60); +pub const DEFAULT_ABORT_TIMEOUT: Duration = Duration::from_secs(10 * 60); /// # Invoker options #[serde_as] diff --git a/scripts/metrics-compose-setup/README.md b/scripts/metrics-compose-setup/README.md new file mode 100644 index 0000000000..f7cb1ddc59 --- /dev/null +++ b/scripts/metrics-compose-setup/README.md @@ -0,0 +1 @@ +# Docker compose Prometheus - Grafana setup for local testing diff --git a/scripts/metrics-compose-setup/docker-compose.yaml b/scripts/metrics-compose-setup/docker-compose.yaml new file mode 100644 index 0000000000..bd2cee7617 --- /dev/null +++ b/scripts/metrics-compose-setup/docker-compose.yaml @@ -0,0 +1,30 @@ +services: + prometheus: + image: 
docker.io/prom/prometheus:latest + command: + - '--config.file=/prometheus.yml' + restart: unless-stopped + network_mode: "host" + volumes: + - ./prometheus.yml:/prometheus.yml:Z,ro + + json_exporter: + image: quay.io/prometheuscommunity/json-exporter:latest + container_name: json_exporter + network_mode: "host" + volumes: + - ./json-exporter.yml:/config/config.yml:Z,ro + command: + - '--config.file=/config/config.yml' + - '--web.listen-address=:7979' + + grafana: + image: docker.io/grafana/grafana:latest + restart: unless-stopped + network_mode: "host" + environment: + - GF_SECURITY_ADMIN_USER=admin + - GF_SECURITY_ADMIN_PASSWORD=admin + volumes: + - ./grafana/provisioning:/etc/grafana/provisioning:Z,rw + - ./grafana/dashboards:/var/lib/grafana/dashboards:Z,rw \ No newline at end of file diff --git a/scripts/metrics-compose-setup/grafana/dashboards/dashboard.json b/scripts/metrics-compose-setup/grafana/dashboards/dashboard.json new file mode 100644 index 0000000000..fbb63e50b0 --- /dev/null +++ b/scripts/metrics-compose-setup/grafana/dashboards/dashboard.json @@ -0,0 +1,315 @@ +{ + "annotations": { + "list": [ + { + "builtIn": 1, + "datasource": { + "type": "grafana", + "uid": "-- Grafana --" + }, + "enable": true, + "hide": true, + "iconColor": "rgba(0, 211, 255, 1)", + "name": "Annotations & Alerts", + "type": "dashboard" + } + ] + }, + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 0, + "id": 1, + "links": [], + "panels": [ + { + "datasource": { + "default": true, + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": 
false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [ + { + "matcher": { + "id": "byName", + "options": "{instance=\"host.containers.internal:5122\", job=\"restate\"}" + }, + "properties": [ + { + "id": "color", + "value": { + "fixedColor": "#00000000", + "mode": "fixed" + } + } + ] + } + ] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 0 + }, + "id": 1, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "11.5.2", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "disableTextWrap": false, + "editorMode": "builder", + "expr": "rate(restate_partition_apply_command_seconds_count{command=\"Invoke\"}[$__rate_interval])", + "fullMetaSearch": false, + "includeNullMetadata": true, + "instant": false, + "legendFormat": "__auto", + "range": true, + "refId": "A", + "useBackend": false + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "disableTextWrap": false, + "editorMode": "builder", + "expr": "rate(restate_invoker_task_duration_seconds_count[$__rate_interval])", + "fullMetaSearch": false, + "hide": false, + "includeNullMetadata": true, + "instant": false, + "legendFormat": "__auto", + "range": true, + "refId": "B", + "useBackend": false + } + ], + "title": "Panel Title", + "type": "timeseries" + }, + { + "datasource": { + "default": true, + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { 
+ "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [ + { + "matcher": { + "id": "byName", + "options": "{instance=\"host.containers.internal:5122\", job=\"restate\"}" + }, + "properties": [ + { + "id": "color", + "value": { + "fixedColor": "#00000000", + "mode": "fixed" + } + } + ] + } + ] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 8 + }, + "id": 2, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "11.5.2", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "disableTextWrap": false, + "editorMode": "builder", + "expr": "rate(restate_partition_apply_command_seconds_count{command=\"Invoke\"}[$__rate_interval])", + "fullMetaSearch": false, + "hide": false, + "includeNullMetadata": true, + "instant": false, + "legendFormat": "__auto", + "range": true, + "refId": "PP ingestion", + "useBackend": false + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "disableTextWrap": false, + "editorMode": "builder", 
+ "expr": "rate(restate_kafka_ingress_requests_total[$__rate_interval])", + "fullMetaSearch": false, + "hide": false, + "includeNullMetadata": true, + "instant": false, + "legendFormat": "__auto", + "range": true, + "refId": "Kafka Ingress ingestion", + "useBackend": false + } + ], + "title": "Panel Title", + "type": "timeseries" + } + ], + "preload": false, + "refresh": "", + "schemaVersion": 40, + "tags": [], + "templating": { + "list": [] + }, + "time": { + "from": "now-15m", + "to": "now" + }, + "timepicker": {}, + "timezone": "browser", + "title": "New dashboard", + "uid": "bdy29e1mhn1fka", + "version": 3, + "weekStart": "" +} \ No newline at end of file diff --git a/scripts/metrics-compose-setup/grafana/dashboards/invoker_concurrency.json b/scripts/metrics-compose-setup/grafana/dashboards/invoker_concurrency.json new file mode 100644 index 0000000000..164428edad --- /dev/null +++ b/scripts/metrics-compose-setup/grafana/dashboards/invoker_concurrency.json @@ -0,0 +1,675 @@ +{ + "annotations": { + "list": [ + { + "builtIn": 1, + "datasource": { + "type": "grafana", + "uid": "-- Grafana --" + }, + "enable": true, + "hide": true, + "iconColor": "rgba(0, 211, 255, 1)", + "name": "Annotations & Alerts", + "type": "dashboard" + } + ] + }, + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 0, + "id": 2, + "links": [], + "panels": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" 
+ }, + "showPoints": "auto", + "showValues": false, + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": 0 + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "Bps" + }, + "overrides": [] + }, + "gridPos": { + "h": 7, + "w": 24, + "x": 0, + "y": 0 + }, + "id": 6, + "interval": "1s", + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "12.2.1", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "builder", + "exemplar": false, + "expr": "rate(restate_rocksdb_bytes_written_total[$__rate_interval])", + "hide": false, + "instant": false, + "legendFormat": "Writes {{db}}", + "range": true, + "refId": "A" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "builder", + "expr": "rate(restate_rocksdb_bytes_read_total[$__rate_interval])", + "hide": false, + "instant": false, + "legendFormat": "Reads {{db}}", + "range": true, + "refId": "C" + } + ], + "title": "Rocksdb reads and writes", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + 
"type": "linear" + }, + "showPoints": "auto", + "showValues": false, + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": 0 + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "s" + }, + "overrides": [] + }, + "gridPos": { + "h": 6, + "w": 24, + "x": 0, + "y": 7 + }, + "id": 4, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "12.2.1", + "targets": [ + { + "editorMode": "builder", + "expr": "restate_invoker_task_duration_seconds", + "legendFormat": "Quantile {{quantile}}", + "range": true, + "refId": "A" + } + ], + "title": "Invoker task duration", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "showValues": false, + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": 0 + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [ + { + "__systemRef": "hideSeriesFrom", + "matcher": { + "id": 
"byNames", + "options": { + "mode": "exclude", + "names": [ + "invoked", + "suspended" + ], + "prefix": "All except:", + "readOnly": true + } + }, + "properties": [ + { + "id": "custom.hideFrom", + "value": { + "legend": false, + "tooltip": true, + "viz": true + } + } + ] + } + ] + }, + "gridPos": { + "h": 9, + "w": 24, + "x": 0, + "y": 13 + }, + "id": 2, + "interval": "1s", + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "12.2.1", + "targets": [ + { + "editorMode": "builder", + "expr": "restate_invocations_total{status!=\"completed\"}", + "legendFormat": "{{status}}", + "range": true, + "refId": "A" + } + ], + "title": "Invocation count (through SQL)", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "showValues": false, + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": 0 + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 14, + "x": 0, + "y": 22 + }, + "id": 1, + "interval": "1s", + "options": { + "legend": { + "calcs": [], + 
"displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "12.2.1", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "builder", + "expr": "restate_invoker_available_slots", + "legendFormat": "Invoker available slots", + "range": true, + "refId": "A" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "editorMode": "builder", + "expr": "vertx_http_server_active_requests{method=\"POST\"}", + "hide": false, + "instant": false, + "legendFormat": "__auto", + "range": true, + "refId": "B" + } + ], + "title": "Requests and invoker slots", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "showValues": false, + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": 0 + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 10, + "x": 14, + "y": 22 + }, + "id": 7, + "interval": "1s", + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + 
"tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "12.2.1", + "targets": [ + { + "editorMode": "builder", + "expr": "jvm_threads_live_threads", + "legendFormat": "__auto", + "range": true, + "refId": "A" + } + ], + "title": "JVM Live threads", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "showValues": false, + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": 0 + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 5, + "w": 24, + "x": 0, + "y": 30 + }, + "id": 3, + "interval": "1s", + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "12.2.1", + "targets": [ + { + "editorMode": "builder", + "expr": "restate_partition_applied_lsn_lag", + "legendFormat": "{{partition}}", + "range": true, + "refId": "A" + } + ], + "title": "PP Lag", + "type": "timeseries" + } + ], + "preload": false, + "refresh": "auto", + "schemaVersion": 42, + "tags": [], + "templating": { + "list": [] + }, + "time": { + "from": 
"now-3m", + "to": "now" + }, + "timepicker": {}, + "timezone": "browser", + "title": "Invoker concurrency dashboard", + "uid": "ad4ktvw", + "version": 1 +} \ No newline at end of file diff --git a/scripts/metrics-compose-setup/grafana/provisioning/dashboards/dashboards.yml b/scripts/metrics-compose-setup/grafana/provisioning/dashboards/dashboards.yml new file mode 100644 index 0000000000..a26d4a30f3 --- /dev/null +++ b/scripts/metrics-compose-setup/grafana/provisioning/dashboards/dashboards.yml @@ -0,0 +1,11 @@ +apiVersion: 1 + +providers: + - name: 'default' + orgId: 1 + folder: '' + type: file + disableDeletion: false + updateIntervalSeconds: 10 + options: + path: /var/lib/grafana/dashboards \ No newline at end of file diff --git a/scripts/metrics-compose-setup/grafana/provisioning/datasources/datasource.yml b/scripts/metrics-compose-setup/grafana/provisioning/datasources/datasource.yml new file mode 100644 index 0000000000..ad643f423b --- /dev/null +++ b/scripts/metrics-compose-setup/grafana/provisioning/datasources/datasource.yml @@ -0,0 +1,9 @@ +apiVersion: 1 + +datasources: + - name: Prometheus + type: prometheus + url: http://localhost:9090 + isDefault: true + access: proxy + editable: true diff --git a/scripts/metrics-compose-setup/greeter.lua b/scripts/metrics-compose-setup/greeter.lua new file mode 100644 index 0000000000..d5fce8074c --- /dev/null +++ b/scripts/metrics-compose-setup/greeter.lua @@ -0,0 +1,6 @@ +-- example HTTP POST script which demonstrates setting the +-- HTTP method, body, and adding a header + +wrk.method = "POST" +wrk.body = '{"name":"Francesco"}' +wrk.headers["Content-Type"] = "application/json" diff --git a/scripts/metrics-compose-setup/json-exporter.yml b/scripts/metrics-compose-setup/json-exporter.yml new file mode 100644 index 0000000000..51a1a15d3a --- /dev/null +++ b/scripts/metrics-compose-setup/json-exporter.yml @@ -0,0 +1,31 @@ +modules: + restate_query: + headers: + Content-Type: application/json + Accept: application/json 
+ body: + content: "{ \"query\": \"SELECT 'completed' AS status, COUNT(*) AS count FROM sys_invocation_status WHERE status = 'completed' UNION ALL SELECT 'suspended' AS status, COUNT(*) AS count FROM sys_invocation_status WHERE status = 'suspended' UNION ALL SELECT 'invoked' AS status, COUNT(*) AS count FROM sys_invocation_status WHERE status = 'invoked' UNION ALL SELECT 'paused' AS status, COUNT(*) AS count FROM sys_invocation_status WHERE status = 'paused';\" }" + + metrics: + - name: restate_invocations_total + help: "Number of invocations per status" + path: '{ .rows[?(@.status == "suspended")].count }' + labels: + status: suspended + - name: restate_invocations_total + help: "Number of invocations per status" + path: '{ .rows[?(@.status == "invoked")].count }' + labels: + status: invoked + - name: restate_invocations_total + help: "Number of invocations per status" + path: '{ .rows[?(@.status == "completed")].count }' + value_type: "gauge" + valueType: "gauge" + labels: + status: completed + - name: restate_invocations_total + help: "Number of invocations per status" + path: '{ .rows[?(@.status == "paused")].count }' + labels: + status: paused diff --git a/scripts/metrics-compose-setup/prometheus.yml b/scripts/metrics-compose-setup/prometheus.yml new file mode 100644 index 0000000000..0b68ccab96 --- /dev/null +++ b/scripts/metrics-compose-setup/prometheus.yml @@ -0,0 +1,41 @@ +global: + scrape_interval: 1s + scrape_timeout: 1s + evaluation_interval: 1s +scrape_configs: + - job_name: restate + honor_timestamps: true + scrape_interval: 1s + scrape_timeout: 1s + metrics_path: /metrics + scheme: http + static_configs: + - targets: + - localhost:5122 + + - job_name: service + honor_timestamps: true + scrape_interval: 1s + scrape_timeout: 1s + metrics_path: /metrics/vertx + scheme: http + static_configs: + - targets: + - localhost:9081 + + - job_name: 'restate_invocations_sql' + metrics_path: /probe + scrape_interval: 5s + scrape_timeout: 5s + params: + module: [ 
restate_query ] + static_configs: + - targets: + - 'http://localhost:9070/query' + relabel_configs: + - source_labels: [ __address__ ] + target_label: __param_target + - source_labels: [ __param_target ] + target_label: instance + - target_label: __address__ + replacement: localhost:7979 \ No newline at end of file