diff --git a/crates/admin/src/cluster_controller/grpc_svc_handler.rs b/crates/admin/src/cluster_controller/grpc_svc_handler.rs index 39f293eac8..358fa6c9be 100644 --- a/crates/admin/src/cluster_controller/grpc_svc_handler.rs +++ b/crates/admin/src/cluster_controller/grpc_svc_handler.rs @@ -39,6 +39,7 @@ use restate_types::logs::metadata::{Logs, SegmentIndex}; use restate_types::logs::{LogId, Lsn, SequenceNumber}; use restate_types::metadata::{GlobalMetadata, Precondition}; use restate_types::metadata_store::keys::{NODES_CONFIG_KEY, partition_processor_epoch_key}; +use restate_types::net::connect_opts::GrpcConnectionOptions; use restate_types::net::partition_processor_manager::Snapshot; use restate_types::nodes_config::{NodesConfiguration, Role}; use restate_types::partitions::PartitionTable; @@ -80,8 +81,7 @@ impl ClusterCtrlSvcHandler { pub fn into_server(self, config: &NetworkingOptions) -> ClusterCtrlSvcServer { let server = ClusterCtrlSvcServer::new(self) - .max_decoding_message_size(config.max_message_size.as_usize()) - .max_encoding_message_size(config.max_message_size.as_usize()) + .max_decoding_message_size(config.message_size_limit().get()) // note: the order of those calls defines the priority .accept_compressed(CompressionEncoding::Zstd) .accept_compressed(CompressionEncoding::Gzip); diff --git a/crates/cli-util/src/opts.rs b/crates/cli-util/src/opts.rs index 12ce9f6f5d..e1c46fceec 100644 --- a/crates/cli-util/src/opts.rs +++ b/crates/cli-util/src/opts.rs @@ -8,13 +8,17 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use std::num::NonZeroUsize; use std::time::Duration; use clap::{Args, ValueEnum}; use clap_verbosity_flag::{LogLevel, VerbosityFilter}; use cling::Collect; -use restate_types::net::connect_opts::{CommonClientConnectionOptions, GrpcConnectionOptions}; +use restate_types::config::DEFAULT_MESSAGE_SIZE_LIMIT; +use restate_types::net::connect_opts::{ + CommonClientConnectionOptions, GrpcConnectionOptions, MESSAGE_SIZE_OVERHEAD, +}; const DEFAULT_CONNECT_TIMEOUT: u64 = 3_000; const DEFAULT_REQUEST_TIMEOUT: u64 = 13_000; @@ -79,7 +83,7 @@ pub(crate) struct ConfirmMode { pub yes: bool, } -#[derive(Args, Clone, Default)] +#[derive(Args, Clone)] pub struct NetworkOpts { /// Connection timeout for network calls, in milliseconds. #[arg(long, default_value_t = DEFAULT_CONNECT_TIMEOUT, global = true)] @@ -91,14 +95,26 @@ pub struct NetworkOpts { /// insecure #[arg[long = "insecure-skip-tls-verify", default_value_t = false, global = true]] pub insecure_skip_tls_verify: bool, - /// Limits the maximum size of a grpc message. - #[arg[long, default_value_t = 10*1024*1024, global = true, hide = true]] - pub max_message_size: usize, + /// Sets the maximum size of a network messages. + #[arg[long, default_value_t = DEFAULT_MESSAGE_SIZE_LIMIT, global = true, hide = true]] + pub message_size_limit: NonZeroUsize, +} + +impl Default for NetworkOpts { + fn default() -> Self { + Self { + connect_timeout: DEFAULT_CONNECT_TIMEOUT, + request_timeout: DEFAULT_REQUEST_TIMEOUT, + insecure_skip_tls_verify: false, + message_size_limit: DEFAULT_MESSAGE_SIZE_LIMIT, + } + } } impl GrpcConnectionOptions for NetworkOpts { - fn max_message_size(&self) -> usize { - self.max_message_size + fn message_size_limit(&self) -> NonZeroUsize { + self.message_size_limit + .saturating_add(MESSAGE_SIZE_OVERHEAD) } } diff --git a/crates/core/src/network/grpc/connector.rs b/crates/core/src/network/grpc/connector.rs index cba4c1cf26..9e7767308c 100644 --- a/crates/core/src/network/grpc/connector.rs +++ b/crates/core/src/network/grpc/connector.rs @@ -21,8 +21,8 @@ use tracing::{debug, warn}; use restate_types::config::{Configuration, NetworkingOptions}; use restate_types::net::address::{AdvertisedAddress, GrpcPort, ListenerPort, PeerNetAddress}; +use restate_types::net::connect_opts::GrpcConnectionOptions; -use super::MAX_MESSAGE_SIZE; use crate::network::grpc::DEFAULT_GRPC_COMPRESSION; use crate::network::protobuf::core_node_svc::core_node_svc_client::CoreNodeSvcClient; use crate::network::protobuf::network::Message; @@ -52,19 +52,19 @@ impl TransportConnect for GrpcConnector { }; debug!("Connecting to {} at {}", destination, address); - let channel = create_channel(address, swimlane, &Configuration::pinned().networking); + let networking = &Configuration::pinned().networking; + let channel = create_channel(address, swimlane, networking); // Establish the connection let client = CoreNodeSvcClient::new(channel) - .max_encoding_message_size(MAX_MESSAGE_SIZE) - .max_decoding_message_size(MAX_MESSAGE_SIZE) + .max_decoding_message_size(networking.message_size_limit().get()) // note: the order of those calls defines the priority .accept_compressed(CompressionEncoding::Zstd) .accept_compressed(CompressionEncoding::Gzip); // Apply send compression only if compression is enabled. Note that this doesn't impact the // "receive" compression. The receive compression is always applied if the peer compresses // its send stream. - let mut client = if Configuration::pinned().networking.disable_compression { + let mut client = if networking.disable_compression { client } else { client.send_compressed(DEFAULT_GRPC_COMPRESSION) diff --git a/crates/core/src/network/grpc/mod.rs b/crates/core/src/network/grpc/mod.rs index 36b09149e0..4547738e81 100644 --- a/crates/core/src/network/grpc/mod.rs +++ b/crates/core/src/network/grpc/mod.rs @@ -11,13 +11,10 @@ mod connector; mod svc_handler; -pub use connector::GrpcConnector; -pub use svc_handler::CoreNodeSvcHandler; use tonic::codec::CompressionEncoding; -/// The maximum size for a grpc message for core networking service. -/// This impacts the buffer limit for prost codec. -pub const MAX_MESSAGE_SIZE: usize = 32 * 1024 * 1024; +pub use connector::GrpcConnector; +pub use svc_handler::CoreNodeSvcHandler; /// Default send compression for grpc clients pub const DEFAULT_GRPC_COMPRESSION: CompressionEncoding = CompressionEncoding::Zstd; diff --git a/crates/core/src/network/grpc/svc_handler.rs b/crates/core/src/network/grpc/svc_handler.rs index e5c5d8adfa..7a0bacd29f 100644 --- a/crates/core/src/network/grpc/svc_handler.rs +++ b/crates/core/src/network/grpc/svc_handler.rs @@ -15,6 +15,7 @@ use tonic::{Request, Response, Status, Streaming}; use tracing::warn; use restate_types::config::NetworkingOptions; +use restate_types::net::connect_opts::GrpcConnectionOptions; use crate::network::ConnectionManager; use crate::network::protobuf::core_node_svc::core_node_svc_server::{ @@ -23,8 +24,6 @@ use crate::network::protobuf::core_node_svc::core_node_svc_server::{ use crate::network::protobuf::core_node_svc::{RpcRequest, RpcResponse}; use crate::network::protobuf::network::Message; -use super::MAX_MESSAGE_SIZE; - pub struct CoreNodeSvcHandler { connections: ConnectionManager, } @@ -36,8 +35,7 @@ impl CoreNodeSvcHandler { pub fn into_server(self, config: &NetworkingOptions) -> CoreNodeSvcServer { let server = CoreNodeSvcServer::new(self) - .max_decoding_message_size(MAX_MESSAGE_SIZE) - .max_encoding_message_size(MAX_MESSAGE_SIZE) + .max_decoding_message_size(config.message_size_limit().get()) // note: the order of those calls defines the priority .accept_compressed(CompressionEncoding::Zstd) .accept_compressed(CompressionEncoding::Gzip); diff --git a/crates/core/src/protobuf.rs b/crates/core/src/protobuf.rs index 0773bda8ba..42d897f8d4 100644 --- a/crates/core/src/protobuf.rs +++ b/crates/core/src/protobuf.rs @@ -22,8 +22,7 @@ pub mod cluster_ctrl_svc { connection_options: &O, ) -> cluster_ctrl_svc_client::ClusterCtrlSvcClient { cluster_ctrl_svc_client::ClusterCtrlSvcClient::new(channel) - .max_encoding_message_size(connection_options.max_message_size()) - .max_decoding_message_size(connection_options.max_message_size()) + .max_decoding_message_size(connection_options.message_size_limit().get()) // note: the order of those calls defines the priority .accept_compressed(tonic::codec::CompressionEncoding::Zstd) .accept_compressed(tonic::codec::CompressionEncoding::Gzip) @@ -69,8 +68,7 @@ pub mod node_ctl_svc { connection_options: &O, ) -> NodeCtlSvcClient { node_ctl_svc_client::NodeCtlSvcClient::new(channel) - .max_encoding_message_size(connection_options.max_message_size()) - .max_decoding_message_size(connection_options.max_message_size()) + .max_decoding_message_size(connection_options.message_size_limit().get()) // note: the order of those calls defines the priority .accept_compressed(CompressionEncoding::Zstd) .accept_compressed(CompressionEncoding::Gzip) diff --git a/crates/invoker-impl/src/invocation_task/mod.rs b/crates/invoker-impl/src/invocation_task/mod.rs index 48aafbf857..7c804e0913 100644 --- a/crates/invoker-impl/src/invocation_task/mod.rs +++ b/crates/invoker-impl/src/invocation_task/mod.rs @@ -16,6 +16,7 @@ use super::Notification; use std::collections::HashSet; use std::convert::Infallible; use std::iter::Empty; +use std::num::NonZeroUsize; use std::pin::Pin; use std::task::{Context, Poll, ready}; use std::time::{Duration, Instant}; @@ -145,8 +146,8 @@ pub(super) struct InvocationTask { inactivity_timeout: Duration, abort_timeout: Duration, disable_eager_state: bool, - message_size_warning: usize, - message_size_limit: Option, + message_size_warning: NonZeroUsize, + message_size_limit: NonZeroUsize, retry_count_since_last_stored_entry: u32, // Invoker tx/rx @@ -207,8 +208,8 @@ where default_inactivity_timeout: Duration, default_abort_timeout: Duration, disable_eager_state: bool, - message_size_warning: usize, - message_size_limit: Option, + message_size_warning: NonZeroUsize, + message_size_limit: NonZeroUsize, retry_count_since_last_stored_entry: u32, invocation_reader: IR, entry_enricher: EE, diff --git a/crates/invoker-impl/src/invocation_task/service_protocol_runner_v4.rs b/crates/invoker-impl/src/invocation_task/service_protocol_runner_v4.rs index e0b8f3f586..5e55e96d6c 100644 --- a/crates/invoker-impl/src/invocation_task/service_protocol_runner_v4.rs +++ b/crates/invoker-impl/src/invocation_task/service_protocol_runner_v4.rs @@ -9,6 +9,7 @@ // by the Apache License, Version 2.0. use std::collections::HashSet; +use std::num::NonZeroUsize; use std::ops::Deref; use std::pin::Pin; use std::task::{Context, Poll, ready}; @@ -1152,8 +1153,8 @@ impl DecoderStream { fn new( inner: S, service_protocol_version: ServiceProtocolVersion, - message_size_warning: usize, - message_size_limit: Option, + message_size_warning: NonZeroUsize, + message_size_limit: NonZeroUsize, ) -> Self { Self { inner, diff --git a/crates/invoker-impl/src/lib.rs b/crates/invoker-impl/src/lib.rs index 58f3e297ae..7ae2c13abf 100644 --- a/crates/invoker-impl/src/lib.rs +++ b/crates/invoker-impl/src/lib.rs @@ -146,7 +146,7 @@ where opts.inactivity_timeout.into(), opts.abort_timeout.into(), opts.disable_eager_state, - opts.message_size_warning.get(), + opts.message_size_warning.as_non_zero_usize(), opts.message_size_limit(), retry_count_since_last_stored_entry, storage_reader, @@ -1881,7 +1881,7 @@ mod tests { .inactivity_timeout(FriendlyDuration::ZERO) .abort_timeout(FriendlyDuration::ZERO) .disable_eager_state(false) - .message_size_warning(NonZeroUsize::new(1024).unwrap()) + .message_size_warning(NonZeroUsize::new(1024).unwrap().into()) .message_size_limit(None) .build() .unwrap(); @@ -1951,7 +1951,7 @@ mod tests { .inactivity_timeout(FriendlyDuration::ZERO) .abort_timeout(FriendlyDuration::ZERO) .disable_eager_state(false) - .message_size_warning(NonZeroUsize::new(1024).unwrap()) + .message_size_warning(NonZeroUsize::new(1024).unwrap().into()) .message_size_limit(None) .build() .unwrap(); @@ -2066,7 +2066,7 @@ mod tests { .inactivity_timeout(FriendlyDuration::ZERO) .abort_timeout(FriendlyDuration::ZERO) .disable_eager_state(false) - .message_size_warning(NonZeroUsize::new(1024).unwrap()) + .message_size_warning(NonZeroUsize::new(1024).unwrap().into()) .message_size_limit(None) .build() .unwrap(); @@ -2143,7 +2143,7 @@ mod tests { .inactivity_timeout(FriendlyDuration::ZERO) .abort_timeout(FriendlyDuration::ZERO) .disable_eager_state(false) - .message_size_warning(NonZeroUsize::new(1024).unwrap()) + .message_size_warning(NonZeroUsize::new(1024).unwrap().into()) .message_size_limit(None) .build() .unwrap(); diff --git a/crates/log-server-grpc/src/lib.rs b/crates/log-server-grpc/src/lib.rs index 4e689652ec..516d32091e 100644 --- a/crates/log-server-grpc/src/lib.rs +++ b/crates/log-server-grpc/src/lib.rs @@ -15,22 +15,18 @@ pub const FILE_DESCRIPTOR_SET: &[u8] = /// Creates a new ClusterCtrlSvcClient with appropriate configuration #[cfg(feature = "grpc-client")] -pub fn new_log_server_client( +pub fn new_log_server_client( channel: tonic::transport::Channel, + connection_options: &O, ) -> log_server_svc_client::LogServerSvcClient { - /// The maximum size for a grpc message for core networking service. - /// This impacts the buffer limit for prost codec. - pub const MAX_MESSAGE_SIZE: usize = 32 * 1024 * 1024; - use tonic::codec::CompressionEncoding; /// Default send compression for grpc clients pub const DEFAULT_GRPC_COMPRESSION: CompressionEncoding = CompressionEncoding::Zstd; log_server_svc_client::LogServerSvcClient::new(channel) - .max_decoding_message_size(MAX_MESSAGE_SIZE) - .max_encoding_message_size(MAX_MESSAGE_SIZE) + .max_decoding_message_size(connection_options.message_size_limit().get()) // note: the order of those calls defines the priority - .accept_compressed(tonic::codec::CompressionEncoding::Zstd) - .accept_compressed(tonic::codec::CompressionEncoding::Gzip) + .accept_compressed(CompressionEncoding::Zstd) + .accept_compressed(CompressionEncoding::Gzip) .send_compressed(DEFAULT_GRPC_COMPRESSION) } diff --git a/crates/log-server/src/grpc_svc_handler.rs b/crates/log-server/src/grpc_svc_handler.rs index c9d84c8284..b02c0259f8 100644 --- a/crates/log-server/src/grpc_svc_handler.rs +++ b/crates/log-server/src/grpc_svc_handler.rs @@ -12,9 +12,9 @@ use async_trait::async_trait; use tonic::codec::CompressionEncoding; use tonic::{Request, Response, Status}; -use restate_core::network::grpc::MAX_MESSAGE_SIZE; use restate_types::config::NetworkingOptions; use restate_types::logs::{LogletId, LogletOffset, SequenceNumber}; +use restate_types::net::connect_opts::GrpcConnectionOptions; use restate_types::net::log_server::{GetDigest, LogServerResponseHeader, LogletInfo}; use crate::logstore::LogStore; @@ -42,8 +42,7 @@ where pub fn into_server(self, config: &NetworkingOptions) -> LogServerSvcServer { let server = LogServerSvcServer::new(self) - .max_decoding_message_size(MAX_MESSAGE_SIZE) - .max_encoding_message_size(MAX_MESSAGE_SIZE) + .max_decoding_message_size(config.message_size_limit().get()) // note: the order of those calls defines the priority .accept_compressed(CompressionEncoding::Zstd) .accept_compressed(CompressionEncoding::Gzip); diff --git a/crates/log-server/src/protobuf.rs b/crates/log-server/src/protobuf.rs index ab93ef2b57..78facfd154 100644 --- a/crates/log-server/src/protobuf.rs +++ b/crates/log-server/src/protobuf.rs @@ -15,13 +15,13 @@ pub const FILE_DESCRIPTOR_SET: &[u8] = /// Creates a new ClusterCtrlSvcClient with appropriate configuration #[cfg(feature = "clients")] -pub fn new_log_server_client( +pub fn new_log_server_client( channel: tonic::transport::Channel, + connection_options: &O, ) -> log_server_svc_client::LogServerSvcClient { - use restate_core::network::grpc::{DEFAULT_GRPC_COMPRESSION, MAX_MESSAGE_SIZE}; + use restate_core::network::grpc::DEFAULT_GRPC_COMPRESSION; log_server_svc_client::LogServerSvcClient::new(channel) - .max_decoding_message_size(MAX_MESSAGE_SIZE) - .max_encoding_message_size(MAX_MESSAGE_SIZE) + .max_decoding_message_size(connection_options.message_size_limit().get()) // note: the order of those calls defines the priority .accept_compressed(tonic::codec::CompressionEncoding::Zstd) .accept_compressed(tonic::codec::CompressionEncoding::Gzip) diff --git a/crates/metadata-server-grpc/src/grpc.rs b/crates/metadata-server-grpc/src/grpc.rs index 0881170d17..ea24300731 100644 --- a/crates/metadata-server-grpc/src/grpc.rs +++ b/crates/metadata-server-grpc/src/grpc.rs @@ -28,8 +28,7 @@ where pub const DEFAULT_GRPC_COMPRESSION: CompressionEncoding = CompressionEncoding::Zstd; metadata_server_svc_client::MetadataServerSvcClient::new(channel) - .max_decoding_message_size(connection_options.max_message_size()) - .max_encoding_message_size(connection_options.max_message_size()) + .max_decoding_message_size(connection_options.message_size_limit().get()) // note: the order of those calls defines the priority .accept_compressed(CompressionEncoding::Zstd) .accept_compressed(CompressionEncoding::Gzip) diff --git a/crates/metadata-server/src/grpc/handler.rs b/crates/metadata-server/src/grpc/handler.rs index de3a10fee5..44f60d5c5d 100644 --- a/crates/metadata-server/src/grpc/handler.rs +++ b/crates/metadata-server/src/grpc/handler.rs @@ -30,6 +30,7 @@ use restate_types::config::{Configuration, NetworkingOptions}; use restate_types::errors::ConversionError; use restate_types::metadata::Precondition; use restate_types::metadata_store::keys::NODES_CONFIG_KEY; +use restate_types::net::connect_opts::GrpcConnectionOptions; use restate_types::nodes_config::NodesConfiguration; use restate_types::storage::StorageCodec; @@ -71,8 +72,7 @@ impl MetadataServerHandler { pub fn into_server(self, config: &NetworkingOptions) -> MetadataServerSvcServer { let server = MetadataServerSvcServer::new(self) - .max_decoding_message_size(config.max_message_size.as_usize()) - .max_encoding_message_size(config.max_message_size.as_usize()) + .max_decoding_message_size(config.message_size_limit().get()) // note: the order of those calls defines the priority .accept_compressed(CompressionEncoding::Zstd) .accept_compressed(CompressionEncoding::Gzip); diff --git a/crates/metadata-server/src/raft/network/handler.rs b/crates/metadata-server/src/raft/network/handler.rs index 336cb00e36..467a0172a2 100644 --- a/crates/metadata-server/src/raft/network/handler.rs +++ b/crates/metadata-server/src/raft/network/handler.rs @@ -19,6 +19,7 @@ use tonic::{Request, Response, Status, Streaming}; use restate_types::PlainNodeId; use restate_types::config::NetworkingOptions; +use restate_types::net::connect_opts::GrpcConnectionOptions; use super::MetadataServerNetworkSvcServer; use crate::raft::network::connection_manager::ConnectionError; @@ -46,8 +47,7 @@ impl MetadataServerNetworkHandler { pub fn into_server(self, config: &NetworkingOptions) -> MetadataServerNetworkSvcServer { let server = MetadataServerNetworkSvcServer::new(self) - .max_decoding_message_size(config.max_message_size.as_usize()) - .max_encoding_message_size(config.max_message_size.as_usize()) + .max_decoding_message_size(config.message_size_limit().get()) // note: the order of those calls defines the priority .accept_compressed(CompressionEncoding::Zstd) .accept_compressed(CompressionEncoding::Gzip); diff --git a/crates/metadata-store/src/metadata_store.rs b/crates/metadata-store/src/metadata_store.rs index 7508d816dd..5de26e15fa 100644 --- a/crates/metadata-store/src/metadata_store.rs +++ b/crates/metadata-store/src/metadata_store.rs @@ -514,7 +514,7 @@ impl MetadataStoreClient { value: &VersionedValue, ) -> Result<(), MetadataSizeHardLimitError> { let config = &Configuration::pinned().common.metadata_client; - let grp_msg_size_limit = config.max_message_size() as f64; + let grp_msg_size_limit = config.message_size_limit().get() as f64; let soft_limit = (grp_msg_size_limit * METADATA_SIZE_SOFT_LIMIT) as usize; let hard_limit = (grp_msg_size_limit * METADATA_SIZE_HARD_LIMIT) as usize; diff --git a/crates/metadata-store/src/protobuf.rs b/crates/metadata-store/src/protobuf.rs index e58007a1bb..53bc7b8b67 100644 --- a/crates/metadata-store/src/protobuf.rs +++ b/crates/metadata-store/src/protobuf.rs @@ -42,8 +42,7 @@ pub mod metadata_proxy_svc { options: &O, ) -> MetadataProxySvcClient { MetadataProxySvcClient::new(connection_options) - .max_encoding_message_size(options.max_message_size()) - .max_decoding_message_size(options.max_message_size()) + .max_decoding_message_size(options.message_size_limit().get()) // note: the order of those calls defines the priority .accept_compressed(CompressionEncoding::Zstd) .accept_compressed(CompressionEncoding::Gzip) diff --git a/crates/node/src/network_server/grpc_svc_handler.rs b/crates/node/src/network_server/grpc_svc_handler.rs index 344a5c6058..864968f2e8 100644 --- a/crates/node/src/network_server/grpc_svc_handler.rs +++ b/crates/node/src/network_server/grpc_svc_handler.rs @@ -23,6 +23,7 @@ use restate_metadata_store::protobuf::metadata_proxy_svc::metadata_proxy_svc_ser use restate_metadata_store::protobuf::metadata_proxy_svc::{ DeleteRequest, GetRequest, GetResponse, GetVersionResponse, PutRequest, }; +use restate_types::net::connect_opts::GrpcConnectionOptions; use restate_core::network::net_util::{DNSResolution, create_tonic_channel}; use restate_core::protobuf::node_ctl_svc::node_ctl_svc_server::{NodeCtlSvc, NodeCtlSvcServer}; @@ -56,8 +57,7 @@ impl NodeCtlSvcHandler { pub fn into_server(self, config: &NetworkingOptions) -> NodeCtlSvcServer { let server = NodeCtlSvcServer::new(self) - .max_decoding_message_size(config.max_message_size.as_usize()) - .max_encoding_message_size(config.max_message_size.as_usize()) + .max_decoding_message_size(config.message_size_limit().get()) // note: the order of those calls defines the priority .accept_compressed(CompressionEncoding::Zstd) .accept_compressed(CompressionEncoding::Gzip); @@ -274,9 +274,8 @@ impl MetadataProxySvcHandler { pub fn into_server(self, config: &NetworkingOptions) -> MetadataProxySvcServer { let server = MetadataProxySvcServer::new(self) + .max_decoding_message_size(config.message_size_limit().get()) // note: the order of those calls defines the priority - .max_decoding_message_size(config.max_message_size.as_usize()) - .max_encoding_message_size(config.max_message_size.as_usize()) .accept_compressed(CompressionEncoding::Zstd) .accept_compressed(CompressionEncoding::Gzip); if config.disable_compression { diff --git a/crates/serde-util/src/byte_count.rs b/crates/serde-util/src/byte_count.rs index eabcd1e36e..95b3b1f20f 100644 --- a/crates/serde-util/src/byte_count.rs +++ b/crates/serde-util/src/byte_count.rs @@ -73,17 +73,17 @@ impl schemars::JsonSchema for ByteCount { } impl ByteCount { - pub fn new(value: u64) -> Self { + pub const fn new(value: u64) -> Self { Self(value) } } impl ByteCount { - pub fn new(value: NonZeroUsize) -> Self { - Self(usize::from(value) as u64) + pub const fn new(value: NonZeroUsize) -> Self { + Self(value.get() as u64) } - pub fn as_non_zero_usize(&self) -> NonZeroUsize { + pub const fn as_non_zero_usize(&self) -> NonZeroUsize { NonZeroUsize::new(self.0 as usize).expect("ByteCount is not zero") } } @@ -95,11 +95,11 @@ impl Display for ByteCount { } impl ByteCount { - pub fn as_u64(&self) -> u64 { + pub const fn as_u64(&self) -> u64 { self.0 } - pub fn as_usize(&self) -> usize { + pub const fn as_usize(&self) -> usize { self.0 as usize } } diff --git a/crates/service-protocol-v4/src/message_codec/encoding.rs b/crates/service-protocol-v4/src/message_codec/encoding.rs index 8f317cb12f..cb8e243ff1 100644 --- a/crates/service-protocol-v4/src/message_codec/encoding.rs +++ b/crates/service-protocol-v4/src/message_codec/encoding.rs @@ -12,6 +12,7 @@ use super::UnknownMessageType; use super::*; use std::mem; +use std::num::NonZeroUsize; use bytes::{Buf, BufMut, Bytes, BytesMut}; use bytes_utils::SegmentedBuf; @@ -29,7 +30,7 @@ pub enum EncodingError { UnknownMessageType(#[from] UnknownMessageType), #[error("hit message size limit: {0} >= {1}")] #[code(restate_errors::RT0003)] - MessageSizeLimit(usize, usize), + MessageSizeLimit(usize, NonZeroUsize), } // --- Input message encoder @@ -107,15 +108,15 @@ fn generate_header(msg: &Message) -> MessageHeader { pub struct Decoder { buf: SegmentedBuf, state: DecoderState, - message_size_warning: usize, - message_size_limit: usize, + message_size_warning: NonZeroUsize, + message_size_limit: NonZeroUsize, } impl Decoder { pub fn new( service_protocol_version: ServiceProtocolVersion, - message_size_warning: usize, - message_size_limit: Option, + message_size_warning: NonZeroUsize, + message_size_limit: NonZeroUsize, ) -> Self { assert_ne!( service_protocol_version, @@ -126,7 +127,7 @@ impl Decoder { buf: SegmentedBuf::new(), state: DecoderState::WaitingHeader, message_size_warning, - message_size_limit: message_size_limit.unwrap_or(usize::MAX), + message_size_limit, } } @@ -177,8 +178,8 @@ impl DecoderState { fn decode( &mut self, mut buf: impl Buf, - message_size_warning: usize, - message_size_limit: usize, + message_size_warning: NonZeroUsize, + message_size_limit: NonZeroUsize, ) -> Result, EncodingError> { let mut res = None; @@ -188,7 +189,7 @@ impl DecoderState { let message_length = usize::try_from(header.frame_length()).expect("u32 must convert into usize"); - if message_length >= message_size_warning { + if message_length >= message_size_warning.get() { warn!( "Message size warning for '{:?}': {} >= {}. \ Generating very large messages can make the system unstable if configured with too little memory. \ @@ -198,7 +199,7 @@ impl DecoderState { ByteCount::from(message_size_warning), ); } - if message_length >= message_size_limit { + if message_length >= message_size_limit.get() { return Err(EncodingError::MessageSizeLimit( message_length, message_size_limit, @@ -230,7 +231,11 @@ mod tests { #[test] fn fill_decoder_with_several_messages() { let mut encoder = Encoder::new(ServiceProtocolVersion::V1); - let mut decoder = Decoder::new(ServiceProtocolVersion::V1, usize::MAX, None); + let mut decoder = Decoder::new( + ServiceProtocolVersion::V1, + NonZeroUsize::MAX, + NonZeroUsize::MAX, + ); let expected_msg_0 = Message::new_start_message( "key".into(), @@ -284,7 +289,11 @@ mod tests { fn partial_decoding_test(split_index: usize) { let mut encoder = Encoder::new(ServiceProtocolVersion::V1); - let mut decoder = Decoder::new(ServiceProtocolVersion::V1, usize::MAX, None); + let mut decoder = Decoder::new( + ServiceProtocolVersion::V1, + NonZeroUsize::MAX, + NonZeroUsize::MAX, + ); let expected_msg = Message::InputCommand(Bytes::from_static(b"123")); let expected_msg_encoded = encoder.encode(expected_msg.clone()); @@ -305,8 +314,8 @@ mod tests { fn hit_message_size_limit() { let mut decoder = Decoder::new( ServiceProtocolVersion::V1, - (u8::MAX / 2) as usize, - Some(u8::MAX as usize), + NonZeroUsize::new((u8::MAX / 2) as usize).unwrap(), + NonZeroUsize::new(u8::MAX as usize).unwrap(), ); let mut encoder = Encoder::new(ServiceProtocolVersion::V1); @@ -319,6 +328,6 @@ mod tests { EncodingError::MessageSizeLimit(msg_size, limit) = decoder.consume_next().unwrap_err() ); assert_eq!(msg_size, expected_msg_size); - assert_eq!(limit, u8::MAX as usize) + assert_eq!(limit, NonZeroUsize::new(u8::MAX as usize).unwrap()) } } diff --git a/crates/service-protocol/src/message/encoding.rs b/crates/service-protocol/src/message/encoding.rs index 9b36f85f52..8422f76963 100644 --- a/crates/service-protocol/src/message/encoding.rs +++ b/crates/service-protocol/src/message/encoding.rs @@ -9,6 +9,7 @@ // by the Apache License, Version 2.0. use std::mem; +use std::num::NonZeroUsize; use bytes::{Buf, BufMut, Bytes, BytesMut}; use bytes_utils::SegmentedBuf; @@ -30,7 +31,7 @@ pub enum EncodingError { UnknownMessageType(#[from] UnknownMessageType), #[error("hit message size limit: {0} >= {1}")] #[code(restate_errors::RT0003)] - MessageSizeLimit(usize, usize), + MessageSizeLimit(usize, NonZeroUsize), } // --- Input message encoder @@ -126,15 +127,15 @@ fn encode_msg(msg: &ProtocolMessage, buf: &mut impl BufMut) -> Result<(), prost: pub struct Decoder { buf: SegmentedBuf, state: DecoderState, - message_size_warning: usize, - message_size_limit: usize, + message_size_warning: NonZeroUsize, + message_size_limit: NonZeroUsize, } impl Decoder { pub fn new( service_protocol_version: ServiceProtocolVersion, - message_size_warning: usize, - message_size_limit: Option, + message_size_warning: NonZeroUsize, + message_size_limit: NonZeroUsize, ) -> Self { assert_ne!( service_protocol_version, @@ -145,7 +146,7 @@ impl Decoder { buf: SegmentedBuf::new(), state: DecoderState::WaitingHeader, message_size_warning, - message_size_limit: message_size_limit.unwrap_or(usize::MAX), + message_size_limit, } } @@ -198,8 +199,8 @@ impl DecoderState { fn decode( &mut self, mut buf: impl Buf, - message_size_warning: usize, - message_size_limit: usize, + message_size_warning: NonZeroUsize, + message_size_limit: NonZeroUsize, ) -> Result, EncodingError> { let mut res = None; @@ -209,7 +210,7 @@ impl DecoderState { let message_length = usize::try_from(header.frame_length()).expect("u32 must convert into usize"); - if message_length >= message_size_warning { + if message_length >= message_size_warning.get() { warn!( "Message size warning for '{:?}': {} >= {}. \ Generating very large messages can make the system unstable if configured with too little memory. \ @@ -219,7 +220,7 @@ impl DecoderState { ByteCount::from(message_size_warning), ); } - if message_length >= message_size_limit { + if message_length >= message_size_limit.get() { return Err(EncodingError::MessageSizeLimit( message_length, message_size_limit, @@ -383,7 +384,11 @@ mod tests { #[test] fn fill_decoder_with_several_messages() { let mut encoder = Encoder::new(ServiceProtocolVersion::V1); - let mut decoder = Decoder::new(ServiceProtocolVersion::V1, usize::MAX, None); + let mut decoder = Decoder::new( + ServiceProtocolVersion::V1, + NonZeroUsize::MAX, + NonZeroUsize::MAX, + ); let expected_msg_0 = ProtocolMessage::new_start_message( "key".into(), @@ -440,7 +445,11 @@ mod tests { fn partial_decoding_test(split_index: usize) { let mut encoder = Encoder::new(ServiceProtocolVersion::V1); - let mut decoder = Decoder::new(ServiceProtocolVersion::V1, usize::MAX, None); + let mut decoder = Decoder::new( + ServiceProtocolVersion::V1, + NonZeroUsize::MAX, + NonZeroUsize::MAX, + ); let expected_msg: ProtocolMessage = ProtobufRawEntryCodec::serialize_as_input_entry( vec![], @@ -467,8 +476,8 @@ mod tests { fn hit_message_size_limit() { let mut decoder = Decoder::new( ServiceProtocolVersion::V1, - (u8::MAX / 2) as usize, - Some(u8::MAX as usize), + NonZeroUsize::new((u8::MAX / 2) as usize).unwrap(), + NonZeroUsize::new(u8::MAX as usize).unwrap(), ); let mut encoder = Encoder::new(ServiceProtocolVersion::V1); @@ -487,6 +496,6 @@ mod tests { EncodingError::MessageSizeLimit(msg_size, limit) = decoder.consume_next().unwrap_err() ); assert_eq!(msg_size, expected_msg_size); - assert_eq!(limit, u8::MAX as usize) + assert_eq!(limit, NonZeroUsize::new(u8::MAX as usize).unwrap()) } } diff --git a/crates/types/src/config/common.rs b/crates/types/src/config/common.rs index ccc2f4d387..4c23b58828 100644 --- a/crates/types/src/config/common.rs +++ b/crates/types/src/config/common.rs @@ -22,8 +22,8 @@ use restate_serde_util::{NonZeroByteCount, SerdeableHeaderHashMap}; use restate_time_util::{FriendlyDuration, NonZeroFriendlyDuration}; use super::{ - AwsLambdaOptions, GossipOptions, HttpOptions, InvalidConfigurationError, ObjectStoreOptions, - PerfStatsLevel, RocksDbOptions, + AwsLambdaOptions, DEFAULT_MESSAGE_SIZE_LIMIT, GossipOptions, HttpOptions, + InvalidConfigurationError, ObjectStoreOptions, PerfStatsLevel, RocksDbOptions, }; use crate::PlainNodeId; use crate::config::NetworkingOptions; @@ -803,27 +803,30 @@ pub struct MetadataClientOptions { /// Backoff policy used by the metadata client when it encounters concurrent modifications. pub backoff_policy: RetryPolicy, - /// # Max Grpc Message Size + /// # Metadata Network Message Size /// - /// Limits the maximum size of a grpc message. + /// Maximum size of network messages that metadata client can receive from a metadata server. /// - /// Default: `10MB` - #[cfg_attr(feature = "schemars", schemars(skip))] + /// If unset, defaults to `networking.message-size-limit`. If set, it will be clamped at + /// the value of `networking.message-size-limit` since larger messages cannot be transmitted + /// over the cluster internal network. #[serde(skip_serializing_if = "Option::is_none")] - pub max_message_size: Option, + pub message_size_limit: Option, } impl MetadataClientOptions { pub(crate) fn merge(&mut self, network_options: &NetworkingOptions) { - if self.max_message_size.is_none() { - self.max_message_size = Some(network_options.max_message_size); - } + self.message_size_limit = Some( + self.message_size_limit + .map(|limit| limit.min(network_options.message_size_limit)) + .unwrap_or(network_options.message_size_limit), + ); } - pub fn max_message_size(&self) -> usize { - self.max_message_size - .map(|v| v.as_usize()) - .unwrap_or(10 * 1024 * 1024) + pub fn message_size_limit(&self) -> NonZeroUsize { + self.message_size_limit + .map(|v| v.as_non_zero_usize()) + .unwrap_or(DEFAULT_MESSAGE_SIZE_LIMIT) } } @@ -841,7 +844,7 @@ impl Default for MetadataClientOptions { Some(10), Some(Duration::from_millis(1000)), ), - max_message_size: None, + message_size_limit: None, } } } diff --git a/crates/types/src/config/mod.rs b/crates/types/src/config/mod.rs index f33d760105..0e89978383 100644 --- a/crates/types/src/config/mod.rs +++ b/crates/types/src/config/mod.rs @@ -194,6 +194,7 @@ impl Configuration { .common .set_derived_values(&config.networking) .unwrap(); + config.worker.set_derived_values(&config.networking); config.admin.set_derived_values(&config.common); config.ingress.set_derived_values(&config.common); config @@ -206,6 +207,7 @@ impl Configuration { .common .set_derived_values(&config.networking) .unwrap(); + config.worker.set_derived_values(&config.networking); config.admin.set_derived_values(&config.common); config.ingress.set_derived_values(&config.common); config diff --git a/crates/types/src/config/networking.rs b/crates/types/src/config/networking.rs index a4b996040f..d932cab82d 100644 --- a/crates/types/src/config/networking.rs +++ b/crates/types/src/config/networking.rs @@ -14,10 +14,14 @@ use std::time::Duration; use restate_serde_util::NonZeroByteCount; use restate_time_util::NonZeroFriendlyDuration; -use crate::retries::RetryPolicy; use serde::{Deserialize, Serialize}; use serde_with::serde_as; +use crate::retries::RetryPolicy; + +/// The default maximum size for messages (32 MiB). +pub const DEFAULT_MESSAGE_SIZE_LIMIT: NonZeroUsize = NonZeroUsize::new(32 * 1024 * 1024).unwrap(); + /// # Networking options /// /// Common network configuration options for communicating with Restate cluster nodes. Note that @@ -73,21 +77,26 @@ pub struct NetworkingOptions { /// Maximum theoretical value is 2^31-1 (2 GiB - 1), but we will sanitize this value to 500 MiB. data_stream_window_size: NonZeroByteCount, - // The network fabric gRPC server ignores `max_message_size`; it uses its own cap defined in crates/core/src/network/grpc/mod.rs. - // This setting is honored by metadata-server, metadata-proxy, the node ctl service, and other clients. - // - /// # Max Grpc Message Size + /// # Networking Message Size Limit /// - /// Limits the maximum size of a grpc message. + /// Maximum size of a message that can be sent or received over the network. + /// This applies to communication between Restate cluster nodes, as well as + /// between Restate servers and external tools such as CLI and management APIs. /// - /// Default: `10MB` - #[serde(default = "default_max_message_size")] - #[cfg_attr(feature = "schemars", schemars(skip))] - pub max_message_size: NonZeroByteCount, + /// Default: `32MiB` + #[serde( + default = "default_message_size_limit", + skip_serializing_if = "is_default_message_size_limit" + )] + pub message_size_limit: NonZeroByteCount, +} + +const fn default_message_size_limit() -> NonZeroByteCount { + NonZeroByteCount::new(DEFAULT_MESSAGE_SIZE_LIMIT) } -fn default_max_message_size() -> NonZeroByteCount { - NonZeroByteCount::new(NonZeroUsize::new(10 * 1024 * 1024).expect("Non zero number")) +fn is_default_message_size_limit(value: &NonZeroByteCount) -> bool { + value.as_non_zero_usize() == DEFAULT_MESSAGE_SIZE_LIMIT } impl NetworkingOptions { pub fn stream_window_size(&self) -> u32 { @@ -121,7 +130,7 @@ impl Default for NetworkingOptions { data_stream_window_size: NonZeroByteCount::new( NonZeroUsize::new(2 * 1024 * 1024).expect("Non zero number"), ), - max_message_size: default_max_message_size(), + message_size_limit: default_message_size_limit(), } } } diff --git a/crates/types/src/config/worker.rs b/crates/types/src/config/worker.rs index 0368c0003a..4345e39840 100644 --- a/crates/types/src/config/worker.rs +++ b/crates/types/src/config/worker.rs @@ -19,7 +19,10 @@ use tracing::warn; use restate_serde_util::NonZeroByteCount; use restate_time_util::{FriendlyDuration, NonZeroFriendlyDuration}; -use super::{CommonOptions, ObjectStoreOptions, RocksDbOptions, RocksDbOptionsBuilder}; +use super::{ + CommonOptions, DEFAULT_MESSAGE_SIZE_LIMIT, NetworkingOptions, ObjectStoreOptions, + RocksDbOptions, RocksDbOptionsBuilder, +}; use crate::config::IngestionOptions; use crate::identifiers::PartitionId; use crate::rate::Rate; @@ -106,6 +109,11 @@ pub struct WorkerOptions { } impl WorkerOptions { + /// set networking-derived values if they are not configured to reduce verbose configurations + pub fn set_derived_values(&mut self, opts: &NetworkingOptions) { + self.invoker.merge(opts); + } + pub fn internal_queue_length(&self) -> usize { self.internal_queue_length.into() } @@ -266,16 +274,18 @@ pub struct InvokerOptions { /// # Message size warning /// /// Threshold to log a warning in case protocol messages coming from a service are larger than the specified amount. - #[serde_as(as = "NonZeroByteCount")] - #[cfg_attr(feature = "schemars", schemars(with = "NonZeroByteCount"))] - pub message_size_warning: NonZeroUsize, + pub message_size_warning: NonZeroByteCount, /// # Message size limit /// - /// Threshold to fail the invocation in case protocol messages coming from a service are larger than the specified amount. - #[serde_as(as = "Option")] - #[cfg_attr(feature = "schemars", schemars(with = "Option"))] - message_size_limit: Option, + /// Maximum size of journal messages that can be received from a service. If a service sends a message + /// larger than this limit, the invocation will fail. + /// + /// If unset, defaults to `networking.message-size-limit`. If set, it will be clamped at + /// the value of `networking.message-size-limit` since larger messages cannot be transmitted + /// over the cluster internal network. + #[serde(skip_serializing_if = "Option::is_none")] + message_size_limit: Option, /// # Temporary directory /// @@ -342,8 +352,18 @@ impl InvokerOptions { self.in_memory_queue_length_limit.into() } - pub fn message_size_limit(&self) -> Option { - self.message_size_limit.map(Into::into) + pub fn message_size_limit(&self) -> NonZeroUsize { + self.message_size_limit + .map(|v| v.as_non_zero_usize()) + .unwrap_or(DEFAULT_MESSAGE_SIZE_LIMIT) + } + + pub(crate) fn merge(&mut self, opts: &NetworkingOptions) { + self.message_size_limit = Some( + self.message_size_limit + .map(|limit| limit.min(opts.message_size_limit)) + .unwrap_or(opts.message_size_limit), + ); } } @@ -353,7 +373,9 @@ impl Default for InvokerOptions { in_memory_queue_length_limit: NonZeroUsize::new(66_049).unwrap(), inactivity_timeout: FriendlyDuration::new(DEFAULT_INACTIVITY_TIMEOUT), abort_timeout: FriendlyDuration::new(DEFAULT_ABORT_TIMEOUT), - message_size_warning: NonZeroUsize::new(10 * 1024 * 1024).unwrap(), // 10MiB + message_size_warning: NonZeroByteCount::new( + NonZeroUsize::new(10 * 1024 * 1024).unwrap(), + ), message_size_limit: None, tmp_dir: None, concurrent_invocations_limit: Some(NonZeroUsize::new(1000).expect("is non zero")), diff --git a/crates/types/src/config_loader.rs b/crates/types/src/config_loader.rs index 6b96ba71bc..9ba69d81d4 100644 --- a/crates/types/src/config_loader.rs +++ b/crates/types/src/config_loader.rs @@ -81,6 +81,7 @@ impl ConfigLoader { config.common.set_derived_values(&config.networking)?; config.admin.set_derived_values(&config.common); config.ingress.set_derived_values(&config.common); + config.worker.set_derived_values(&config.networking); if self.metadata_migration_mode { // In metadata migration mode we keep only Admin and MetadataServer roles that were diff --git a/crates/types/src/net/connect_opts.rs b/crates/types/src/net/connect_opts.rs index d93edd813b..e71d4cad24 100644 --- a/crates/types/src/net/connect_opts.rs +++ b/crates/types/src/net/connect_opts.rs @@ -8,14 +8,23 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use std::num::NonZeroUsize; use std::sync::Arc; use std::time::Duration; use crate::config::{MetadataClientOptions, NetworkingOptions}; +/// Overhead added to user-facing max_message_size. +/// +/// This accounts for message wrapping (headers, envelopes, encoding overhead) +/// when messages are transmitted over the internal message fabric. +pub const MESSAGE_SIZE_OVERHEAD: usize = 1024 * 1024; // 1 MiB + pub trait GrpcConnectionOptions { - /// Gets the maximum message size for grpc servers and clients. - fn max_message_size(&self) -> usize; + /// The maximum message size for tonic gRPC configuration. + /// Implementations should add [`MESSAGE_SIZE_OVERHEAD`] to account for message + /// wrapping when transmitted over the internal network. + fn message_size_limit(&self) -> NonZeroUsize; } /// Helper trait to extract common client connection options from different configuration types. pub trait CommonClientConnectionOptions: GrpcConnectionOptions { @@ -27,8 +36,8 @@ pub trait CommonClientConnectionOptions: GrpcConnectionOptions { } impl GrpcConnectionOptions for &T { - fn max_message_size(&self) -> usize { - (*self).max_message_size() + fn message_size_limit(&self) -> NonZeroUsize { + (*self).message_size_limit() } } @@ -61,8 +70,8 @@ impl GrpcConnectionOptions for Arc where T: GrpcConnectionOptions, { - fn max_message_size(&self) -> usize { - (**self).max_message_size() + fn message_size_limit(&self) -> NonZeroUsize { + (**self).message_size_limit() } } @@ -92,8 +101,11 @@ where } impl GrpcConnectionOptions for NetworkingOptions { - fn max_message_size(&self) -> usize { - self.max_message_size.as_usize() + #[inline] + fn message_size_limit(&self) -> NonZeroUsize { + self.message_size_limit + .as_non_zero_usize() + .saturating_add(MESSAGE_SIZE_OVERHEAD) } } @@ -120,8 +132,8 @@ impl CommonClientConnectionOptions for NetworkingOptions { } impl GrpcConnectionOptions for MetadataClientOptions { - fn max_message_size(&self) -> usize { - Self::max_message_size(self) + fn message_size_limit(&self) -> NonZeroUsize { + Self::message_size_limit(self).saturating_add(MESSAGE_SIZE_OVERHEAD) } } diff --git a/lite/src/lib.rs b/lite/src/lib.rs index 63e2a66dbe..6205b9ab0b 100644 --- a/lite/src/lib.rs +++ b/lite/src/lib.rs @@ -201,6 +201,7 @@ impl Restate { config.common.set_derived_values(&config.networking)?; config.ingress.set_derived_values(&config.common); config.admin.set_derived_values(&config.common); + config.worker.set_derived_values(&config.networking); let config = config.apply_cascading_values(); config.validate()?; diff --git a/tools/mock-service-endpoint/src/handler.rs b/tools/mock-service-endpoint/src/handler.rs index 578130d129..22cf1412e7 100644 --- a/tools/mock-service-endpoint/src/handler.rs +++ b/tools/mock-service-endpoint/src/handler.rs @@ -10,6 +10,7 @@ use std::convert::Infallible; use std::fmt::{Display, Formatter}; +use std::num::NonZeroUsize; use std::str::FromStr; use async_stream::{stream, try_stream}; @@ -83,7 +84,11 @@ pub async fn serve( }; let req_body = BodyStream::new(req_body); - let mut decoder = Decoder::new(ServiceProtocolVersion::V5, usize::MAX, None); + let mut decoder = Decoder::new( + ServiceProtocolVersion::V5, + NonZeroUsize::MAX, + NonZeroUsize::MAX, + ); let mut encoder = Encoder::new(ServiceProtocolVersion::V5); let incoming = stream! { diff --git a/tools/restatectl/src/commands/replicated_loglet/digest.rs b/tools/restatectl/src/commands/replicated_loglet/digest.rs index 17bbb049da..6e864a3a4d 100644 --- a/tools/restatectl/src/commands/replicated_loglet/digest.rs +++ b/tools/restatectl/src/commands/replicated_loglet/digest.rs @@ -14,8 +14,8 @@ use cling::prelude::*; use tracing::{info, warn}; use restate_cli_util::_comfy_table::{Cell, Color, Table}; -use restate_cli_util::c_println; use restate_cli_util::ui::console::StyledTable; +use restate_cli_util::{CliContext, c_println}; use restate_log_server_grpc::{GetDigestRequest, GetLogletInfoRequest, new_log_server_client}; use restate_types::PlainNodeId; use restate_types::logs::TailOffsetWatch; @@ -83,7 +83,7 @@ async fn get_digest(connection: &ConnectionInfo, opts: &DigestOpts) -> anyhow::R // get loglet info let mut loglet_infos: HashMap = HashMap::default(); for (node_id, channel) in nodeset_channels.iter() { - let mut client = new_log_server_client(channel.clone()); + let mut client = new_log_server_client(channel.clone(), &CliContext::get().network); let Ok(Some(loglet_info)) = client .get_loglet_info(GetLogletInfoRequest { loglet_id: opts.loglet_id.into(), @@ -131,7 +131,7 @@ async fn get_digest(connection: &ConnectionInfo, opts: &DigestOpts) -> anyhow::R from_offset, to_offset, }; - let mut client = new_log_server_client(channel.clone()); + let mut client = new_log_server_client(channel.clone(), &CliContext::get().network); let digest = match client.get_digest(req).await { Ok(response) => response.into_inner().digest.expect("always set by servers"), Err(err) => { diff --git a/tools/restatectl/src/commands/replicated_loglet/info.rs b/tools/restatectl/src/commands/replicated_loglet/info.rs index 7c74e73a16..9ec7272117 100644 --- a/tools/restatectl/src/commands/replicated_loglet/info.rs +++ b/tools/restatectl/src/commands/replicated_loglet/info.rs @@ -14,9 +14,9 @@ use cling::prelude::*; use tracing::warn; use restate_cli_util::_comfy_table::{Attribute, Cell, Color, Table}; -use restate_cli_util::c_println; use restate_cli_util::ui::console::{Styled, StyledTable}; use restate_cli_util::ui::stylesheet::Style; +use restate_cli_util::{CliContext, c_println}; use restate_log_server_grpc::{GetLogletInfoRequest, new_log_server_client}; use restate_types::PlainNodeId; use restate_types::logs::LogletId; @@ -124,7 +124,10 @@ async fn get_info(connection: &ConnectionInfo, opts: &InfoOpts) -> anyhow::Resul ); continue; } - let mut client = new_log_server_client(grpc_channel(node.address.clone())); + let mut client = new_log_server_client( + grpc_channel(node.address.clone()), + &CliContext::get().network, + ); let Ok(Some(loglet_info)) = client .get_loglet_info(GetLogletInfoRequest { loglet_id: opts.loglet_id.into(), diff --git a/tools/service-protocol-wireshark-dissector/src/lib.rs b/tools/service-protocol-wireshark-dissector/src/lib.rs index 40e7471273..5825c25496 100644 --- a/tools/service-protocol-wireshark-dissector/src/lib.rs +++ b/tools/service-protocol-wireshark-dissector/src/lib.rs @@ -14,6 +14,7 @@ use mlua::{Table, Value}; use restate_service_protocol_v4::message_codec::{Decoder, Message, MessageType}; use restate_time_util::DurationExt; use restate_types::service_protocol::ServiceProtocolVersion; +use std::num::NonZeroUsize; use std::time::Duration; #[derive(Debug, thiserror::Error)] @@ -32,7 +33,11 @@ fn decode_packages(lua: &Lua, buf_lua: Value) -> LuaResult { // We should store it somewhere, but right now wireshark doesn't support conversations in lua api // so we just keep it simple and assume all messages are self contained within the same http data frame // https://ask.wireshark.org/question/11650/lua-wireshark-dissector-combine-data-from-2-udp-packets - let mut dec = Decoder::new(ServiceProtocolVersion::V4, usize::MAX, None); + let mut dec = Decoder::new( + ServiceProtocolVersion::V4, + NonZeroUsize::MAX, + NonZeroUsize::MAX, + ); // Convert the buffer and push it to the decoder let buf = match buf_lua {