Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/admin/src/cluster_controller/grpc_svc_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use restate_types::logs::metadata::{Logs, SegmentIndex};
use restate_types::logs::{LogId, Lsn, SequenceNumber};
use restate_types::metadata::{GlobalMetadata, Precondition};
use restate_types::metadata_store::keys::{NODES_CONFIG_KEY, partition_processor_epoch_key};
use restate_types::net::connect_opts::GrpcConnectionOptions;
use restate_types::net::partition_processor_manager::Snapshot;
use restate_types::nodes_config::{NodesConfiguration, Role};
use restate_types::partitions::PartitionTable;
Expand Down Expand Up @@ -80,8 +81,7 @@ impl ClusterCtrlSvcHandler {

pub fn into_server(self, config: &NetworkingOptions) -> ClusterCtrlSvcServer<Self> {
let server = ClusterCtrlSvcServer::new(self)
.max_decoding_message_size(config.max_message_size.as_usize())
.max_encoding_message_size(config.max_message_size.as_usize())
.max_decoding_message_size(config.message_size_limit().get())
// note: the order of those calls defines the priority
.accept_compressed(CompressionEncoding::Zstd)
.accept_compressed(CompressionEncoding::Gzip);
Expand Down
30 changes: 23 additions & 7 deletions crates/cli-util/src/opts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,17 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::num::NonZeroUsize;
use std::time::Duration;

use clap::{Args, ValueEnum};
use clap_verbosity_flag::{LogLevel, VerbosityFilter};
use cling::Collect;

use restate_types::net::connect_opts::{CommonClientConnectionOptions, GrpcConnectionOptions};
use restate_types::config::DEFAULT_MESSAGE_SIZE_LIMIT;
use restate_types::net::connect_opts::{
CommonClientConnectionOptions, GrpcConnectionOptions, MESSAGE_SIZE_OVERHEAD,
};

const DEFAULT_CONNECT_TIMEOUT: u64 = 3_000;
const DEFAULT_REQUEST_TIMEOUT: u64 = 13_000;
Expand Down Expand Up @@ -79,7 +83,7 @@ pub(crate) struct ConfirmMode {
pub yes: bool,
}

#[derive(Args, Clone, Default)]
#[derive(Args, Clone)]
pub struct NetworkOpts {
/// Connection timeout for network calls, in milliseconds.
#[arg(long, default_value_t = DEFAULT_CONNECT_TIMEOUT, global = true)]
Expand All @@ -91,14 +95,26 @@ pub struct NetworkOpts {
/// insecure
#[arg[long = "insecure-skip-tls-verify", default_value_t = false, global = true]]
pub insecure_skip_tls_verify: bool,
/// Limits the maximum size of a grpc message.
#[arg[long, default_value_t = 10*1024*1024, global = true, hide = true]]
pub max_message_size: usize,
/// Sets the maximum size of a network messages.
#[arg[long, default_value_t = DEFAULT_MESSAGE_SIZE_LIMIT, global = true, hide = true]]
pub message_size_limit: NonZeroUsize,
}

impl Default for NetworkOpts {
fn default() -> Self {
Self {
connect_timeout: DEFAULT_CONNECT_TIMEOUT,
request_timeout: DEFAULT_REQUEST_TIMEOUT,
insecure_skip_tls_verify: false,
message_size_limit: DEFAULT_MESSAGE_SIZE_LIMIT,
}
}
}

impl GrpcConnectionOptions for NetworkOpts {
fn max_message_size(&self) -> usize {
self.max_message_size
fn message_size_limit(&self) -> NonZeroUsize {
self.message_size_limit
.saturating_add(MESSAGE_SIZE_OVERHEAD)
}
}

Expand Down
10 changes: 5 additions & 5 deletions crates/core/src/network/grpc/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ use tracing::{debug, warn};

use restate_types::config::{Configuration, NetworkingOptions};
use restate_types::net::address::{AdvertisedAddress, GrpcPort, ListenerPort, PeerNetAddress};
use restate_types::net::connect_opts::GrpcConnectionOptions;

use super::MAX_MESSAGE_SIZE;
use crate::network::grpc::DEFAULT_GRPC_COMPRESSION;
use crate::network::protobuf::core_node_svc::core_node_svc_client::CoreNodeSvcClient;
use crate::network::protobuf::network::Message;
Expand Down Expand Up @@ -52,19 +52,19 @@ impl TransportConnect for GrpcConnector {
};

debug!("Connecting to {} at {}", destination, address);
let channel = create_channel(address, swimlane, &Configuration::pinned().networking);
let networking = &Configuration::pinned().networking;
let channel = create_channel(address, swimlane, networking);

// Establish the connection
let client = CoreNodeSvcClient::new(channel)
.max_encoding_message_size(MAX_MESSAGE_SIZE)
.max_decoding_message_size(MAX_MESSAGE_SIZE)
.max_decoding_message_size(networking.message_size_limit().get())
// note: the order of those calls defines the priority
.accept_compressed(CompressionEncoding::Zstd)
.accept_compressed(CompressionEncoding::Gzip);
// Apply send compression only if compression is enabled. Note that this doesn't impact the
// "receive" compression. The receive compression is always applied if the peer compresses
// its send stream.
let mut client = if Configuration::pinned().networking.disable_compression {
let mut client = if networking.disable_compression {
client
} else {
client.send_compressed(DEFAULT_GRPC_COMPRESSION)
Expand Down
7 changes: 2 additions & 5 deletions crates/core/src/network/grpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,10 @@
mod connector;
mod svc_handler;

pub use connector::GrpcConnector;
pub use svc_handler::CoreNodeSvcHandler;
use tonic::codec::CompressionEncoding;

/// The maximum size for a grpc message for core networking service.
/// This impacts the buffer limit for prost codec.
pub const MAX_MESSAGE_SIZE: usize = 32 * 1024 * 1024;
pub use connector::GrpcConnector;
pub use svc_handler::CoreNodeSvcHandler;

/// Default send compression for grpc clients
pub const DEFAULT_GRPC_COMPRESSION: CompressionEncoding = CompressionEncoding::Zstd;
6 changes: 2 additions & 4 deletions crates/core/src/network/grpc/svc_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use tonic::{Request, Response, Status, Streaming};
use tracing::warn;

use restate_types::config::NetworkingOptions;
use restate_types::net::connect_opts::GrpcConnectionOptions;

use crate::network::ConnectionManager;
use crate::network::protobuf::core_node_svc::core_node_svc_server::{
Expand All @@ -23,8 +24,6 @@ use crate::network::protobuf::core_node_svc::core_node_svc_server::{
use crate::network::protobuf::core_node_svc::{RpcRequest, RpcResponse};
use crate::network::protobuf::network::Message;

use super::MAX_MESSAGE_SIZE;

pub struct CoreNodeSvcHandler {
connections: ConnectionManager,
}
Expand All @@ -36,8 +35,7 @@ impl CoreNodeSvcHandler {

pub fn into_server(self, config: &NetworkingOptions) -> CoreNodeSvcServer<Self> {
let server = CoreNodeSvcServer::new(self)
.max_decoding_message_size(MAX_MESSAGE_SIZE)
.max_encoding_message_size(MAX_MESSAGE_SIZE)
.max_decoding_message_size(config.message_size_limit().get())
// note: the order of those calls defines the priority
.accept_compressed(CompressionEncoding::Zstd)
.accept_compressed(CompressionEncoding::Gzip);
Expand Down
6 changes: 2 additions & 4 deletions crates/core/src/protobuf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ pub mod cluster_ctrl_svc {
connection_options: &O,
) -> cluster_ctrl_svc_client::ClusterCtrlSvcClient<tonic::transport::Channel> {
cluster_ctrl_svc_client::ClusterCtrlSvcClient::new(channel)
.max_encoding_message_size(connection_options.max_message_size())
.max_decoding_message_size(connection_options.max_message_size())
.max_decoding_message_size(connection_options.message_size_limit().get())
// note: the order of those calls defines the priority
.accept_compressed(tonic::codec::CompressionEncoding::Zstd)
.accept_compressed(tonic::codec::CompressionEncoding::Gzip)
Expand Down Expand Up @@ -69,8 +68,7 @@ pub mod node_ctl_svc {
connection_options: &O,
) -> NodeCtlSvcClient<Channel> {
node_ctl_svc_client::NodeCtlSvcClient::new(channel)
.max_encoding_message_size(connection_options.max_message_size())
.max_decoding_message_size(connection_options.max_message_size())
.max_decoding_message_size(connection_options.message_size_limit().get())
// note: the order of those calls defines the priority
.accept_compressed(CompressionEncoding::Zstd)
.accept_compressed(CompressionEncoding::Gzip)
Expand Down
9 changes: 5 additions & 4 deletions crates/invoker-impl/src/invocation_task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use super::Notification;
use std::collections::HashSet;
use std::convert::Infallible;
use std::iter::Empty;
use std::num::NonZeroUsize;
use std::pin::Pin;
use std::task::{Context, Poll, ready};
use std::time::{Duration, Instant};
Expand Down Expand Up @@ -145,8 +146,8 @@ pub(super) struct InvocationTask<IR, EE, DMR> {
inactivity_timeout: Duration,
abort_timeout: Duration,
disable_eager_state: bool,
message_size_warning: usize,
message_size_limit: Option<usize>,
message_size_warning: NonZeroUsize,
message_size_limit: NonZeroUsize,
retry_count_since_last_stored_entry: u32,

// Invoker tx/rx
Expand Down Expand Up @@ -207,8 +208,8 @@ where
default_inactivity_timeout: Duration,
default_abort_timeout: Duration,
disable_eager_state: bool,
message_size_warning: usize,
message_size_limit: Option<usize>,
message_size_warning: NonZeroUsize,
message_size_limit: NonZeroUsize,
retry_count_since_last_stored_entry: u32,
invocation_reader: IR,
entry_enricher: EE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
// by the Apache License, Version 2.0.

use std::collections::HashSet;
use std::num::NonZeroUsize;
use std::ops::Deref;
use std::pin::Pin;
use std::task::{Context, Poll, ready};
Expand Down Expand Up @@ -1152,8 +1153,8 @@ impl<S> DecoderStream<S> {
fn new(
inner: S,
service_protocol_version: ServiceProtocolVersion,
message_size_warning: usize,
message_size_limit: Option<usize>,
message_size_warning: NonZeroUsize,
message_size_limit: NonZeroUsize,
) -> Self {
Self {
inner,
Expand Down
10 changes: 5 additions & 5 deletions crates/invoker-impl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ where
opts.inactivity_timeout.into(),
opts.abort_timeout.into(),
opts.disable_eager_state,
opts.message_size_warning.get(),
opts.message_size_warning.as_non_zero_usize(),
opts.message_size_limit(),
retry_count_since_last_stored_entry,
storage_reader,
Expand Down Expand Up @@ -1881,7 +1881,7 @@ mod tests {
.inactivity_timeout(FriendlyDuration::ZERO)
.abort_timeout(FriendlyDuration::ZERO)
.disable_eager_state(false)
.message_size_warning(NonZeroUsize::new(1024).unwrap())
.message_size_warning(NonZeroUsize::new(1024).unwrap().into())
.message_size_limit(None)
.build()
.unwrap();
Expand Down Expand Up @@ -1951,7 +1951,7 @@ mod tests {
.inactivity_timeout(FriendlyDuration::ZERO)
.abort_timeout(FriendlyDuration::ZERO)
.disable_eager_state(false)
.message_size_warning(NonZeroUsize::new(1024).unwrap())
.message_size_warning(NonZeroUsize::new(1024).unwrap().into())
.message_size_limit(None)
.build()
.unwrap();
Expand Down Expand Up @@ -2066,7 +2066,7 @@ mod tests {
.inactivity_timeout(FriendlyDuration::ZERO)
.abort_timeout(FriendlyDuration::ZERO)
.disable_eager_state(false)
.message_size_warning(NonZeroUsize::new(1024).unwrap())
.message_size_warning(NonZeroUsize::new(1024).unwrap().into())
.message_size_limit(None)
.build()
.unwrap();
Expand Down Expand Up @@ -2143,7 +2143,7 @@ mod tests {
.inactivity_timeout(FriendlyDuration::ZERO)
.abort_timeout(FriendlyDuration::ZERO)
.disable_eager_state(false)
.message_size_warning(NonZeroUsize::new(1024).unwrap())
.message_size_warning(NonZeroUsize::new(1024).unwrap().into())
.message_size_limit(None)
.build()
.unwrap();
Expand Down
14 changes: 5 additions & 9 deletions crates/log-server-grpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,18 @@ pub const FILE_DESCRIPTOR_SET: &[u8] =

/// Creates a new ClusterCtrlSvcClient with appropriate configuration
#[cfg(feature = "grpc-client")]
pub fn new_log_server_client(
pub fn new_log_server_client<O: restate_types::net::connect_opts::GrpcConnectionOptions>(
channel: tonic::transport::Channel,
connection_options: &O,
) -> log_server_svc_client::LogServerSvcClient<tonic::transport::Channel> {
/// The maximum size for a grpc message for core networking service.
/// This impacts the buffer limit for prost codec.
pub const MAX_MESSAGE_SIZE: usize = 32 * 1024 * 1024;

use tonic::codec::CompressionEncoding;
/// Default send compression for grpc clients
pub const DEFAULT_GRPC_COMPRESSION: CompressionEncoding = CompressionEncoding::Zstd;

log_server_svc_client::LogServerSvcClient::new(channel)
.max_decoding_message_size(MAX_MESSAGE_SIZE)
.max_encoding_message_size(MAX_MESSAGE_SIZE)
.max_decoding_message_size(connection_options.message_size_limit().get())
// note: the order of those calls defines the priority
.accept_compressed(tonic::codec::CompressionEncoding::Zstd)
.accept_compressed(tonic::codec::CompressionEncoding::Gzip)
.accept_compressed(CompressionEncoding::Zstd)
.accept_compressed(CompressionEncoding::Gzip)
.send_compressed(DEFAULT_GRPC_COMPRESSION)
}
5 changes: 2 additions & 3 deletions crates/log-server/src/grpc_svc_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ use async_trait::async_trait;
use tonic::codec::CompressionEncoding;
use tonic::{Request, Response, Status};

use restate_core::network::grpc::MAX_MESSAGE_SIZE;
use restate_types::config::NetworkingOptions;
use restate_types::logs::{LogletId, LogletOffset, SequenceNumber};
use restate_types::net::connect_opts::GrpcConnectionOptions;
use restate_types::net::log_server::{GetDigest, LogServerResponseHeader, LogletInfo};

use crate::logstore::LogStore;
Expand Down Expand Up @@ -42,8 +42,7 @@ where

pub fn into_server(self, config: &NetworkingOptions) -> LogServerSvcServer<Self> {
let server = LogServerSvcServer::new(self)
.max_decoding_message_size(MAX_MESSAGE_SIZE)
.max_encoding_message_size(MAX_MESSAGE_SIZE)
.max_decoding_message_size(config.message_size_limit().get())
// note: the order of those calls defines the priority
.accept_compressed(CompressionEncoding::Zstd)
.accept_compressed(CompressionEncoding::Gzip);
Expand Down
8 changes: 4 additions & 4 deletions crates/log-server/src/protobuf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ pub const FILE_DESCRIPTOR_SET: &[u8] =

/// Creates a new ClusterCtrlSvcClient with appropriate configuration
#[cfg(feature = "clients")]
pub fn new_log_server_client(
pub fn new_log_server_client<O: restate_types::net::connect_opts::GrpcConnectionOptions>(
channel: tonic::transport::Channel,
connection_options: &O,
) -> log_server_svc_client::LogServerSvcClient<tonic::transport::Channel> {
use restate_core::network::grpc::{DEFAULT_GRPC_COMPRESSION, MAX_MESSAGE_SIZE};
use restate_core::network::grpc::DEFAULT_GRPC_COMPRESSION;
log_server_svc_client::LogServerSvcClient::new(channel)
.max_decoding_message_size(MAX_MESSAGE_SIZE)
.max_encoding_message_size(MAX_MESSAGE_SIZE)
.max_decoding_message_size(connection_options.message_size_limit().get())
// note: the order of those calls defines the priority
.accept_compressed(tonic::codec::CompressionEncoding::Zstd)
.accept_compressed(tonic::codec::CompressionEncoding::Gzip)
Expand Down
3 changes: 1 addition & 2 deletions crates/metadata-server-grpc/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ where
pub const DEFAULT_GRPC_COMPRESSION: CompressionEncoding = CompressionEncoding::Zstd;

metadata_server_svc_client::MetadataServerSvcClient::new(channel)
.max_decoding_message_size(connection_options.max_message_size())
.max_encoding_message_size(connection_options.max_message_size())
.max_decoding_message_size(connection_options.message_size_limit().get())
// note: the order of those calls defines the priority
.accept_compressed(CompressionEncoding::Zstd)
.accept_compressed(CompressionEncoding::Gzip)
Expand Down
4 changes: 2 additions & 2 deletions crates/metadata-server/src/grpc/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use restate_types::config::{Configuration, NetworkingOptions};
use restate_types::errors::ConversionError;
use restate_types::metadata::Precondition;
use restate_types::metadata_store::keys::NODES_CONFIG_KEY;
use restate_types::net::connect_opts::GrpcConnectionOptions;
use restate_types::nodes_config::NodesConfiguration;
use restate_types::storage::StorageCodec;

Expand Down Expand Up @@ -71,8 +72,7 @@ impl MetadataServerHandler {

pub fn into_server(self, config: &NetworkingOptions) -> MetadataServerSvcServer<Self> {
let server = MetadataServerSvcServer::new(self)
.max_decoding_message_size(config.max_message_size.as_usize())
.max_encoding_message_size(config.max_message_size.as_usize())
.max_decoding_message_size(config.message_size_limit().get())
// note: the order of those calls defines the priority
.accept_compressed(CompressionEncoding::Zstd)
.accept_compressed(CompressionEncoding::Gzip);
Expand Down
4 changes: 2 additions & 2 deletions crates/metadata-server/src/raft/network/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use tonic::{Request, Response, Status, Streaming};

use restate_types::PlainNodeId;
use restate_types::config::NetworkingOptions;
use restate_types::net::connect_opts::GrpcConnectionOptions;

use super::MetadataServerNetworkSvcServer;
use crate::raft::network::connection_manager::ConnectionError;
Expand Down Expand Up @@ -46,8 +47,7 @@ impl<M> MetadataServerNetworkHandler<M> {

pub fn into_server(self, config: &NetworkingOptions) -> MetadataServerNetworkSvcServer<Self> {
let server = MetadataServerNetworkSvcServer::new(self)
.max_decoding_message_size(config.max_message_size.as_usize())
.max_encoding_message_size(config.max_message_size.as_usize())
.max_decoding_message_size(config.message_size_limit().get())
// note: the order of those calls defines the priority
.accept_compressed(CompressionEncoding::Zstd)
.accept_compressed(CompressionEncoding::Gzip);
Expand Down
2 changes: 1 addition & 1 deletion crates/metadata-store/src/metadata_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,7 @@ impl MetadataStoreClient {
value: &VersionedValue,
) -> Result<(), MetadataSizeHardLimitError> {
let config = &Configuration::pinned().common.metadata_client;
let grp_msg_size_limit = config.max_message_size() as f64;
let grp_msg_size_limit = config.message_size_limit().get() as f64;

let soft_limit = (grp_msg_size_limit * METADATA_SIZE_SOFT_LIMIT) as usize;
let hard_limit = (grp_msg_size_limit * METADATA_SIZE_HARD_LIMIT) as usize;
Expand Down
Loading
Loading