From 8abba17d37ee3c38fc70acb6d8492dc4fe1a4ae4 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 11 Dec 2023 08:08:57 -0700 Subject: [PATCH] Make max encoding message size configurable --- ballista/executor/executor_config_spec.toml | 6 ++++++ ballista/executor/src/bin/main.rs | 1 + ballista/executor/src/executor_process.rs | 2 ++ ballista/executor/src/executor_server.rs | 3 +++ ballista/scheduler/scheduler_config_spec.toml | 6 ++++++ ballista/scheduler/src/bin/main.rs | 1 + ballista/scheduler/src/config.rs | 8 ++++++++ ballista/scheduler/src/scheduler_process.rs | 3 +++ 8 files changed, 30 insertions(+) diff --git a/ballista/executor/executor_config_spec.toml b/ballista/executor/executor_config_spec.toml index 8bed5ac6d..209069de1 100644 --- a/ballista/executor/executor_config_spec.toml +++ b/ballista/executor/executor_config_spec.toml @@ -132,6 +132,12 @@ type = "u32" default = "16777216" doc = "The maximum size of a decoded message at the grpc server side. Default: 16MB" +[[param]] +name = "grpc_server_max_encoding_message_size" +type = "u32" +default = "16777216" +doc = "The maximum size of an encoded message at the grpc server side. Default: 16MB" + [[param]] name = "executor_heartbeat_interval_seconds" type = "u64" diff --git a/ballista/executor/src/bin/main.rs b/ballista/executor/src/bin/main.rs index 52bf63488..a7ba8e36e 100644 --- a/ballista/executor/src/bin/main.rs +++ b/ballista/executor/src/bin/main.rs @@ -79,6 +79,7 @@ async fn main() -> Result<()> { job_data_ttl_seconds: opt.job_data_ttl_seconds, job_data_clean_up_interval_seconds: opt.job_data_clean_up_interval_seconds, grpc_server_max_decoding_message_size: opt.grpc_server_max_decoding_message_size, + grpc_server_max_encoding_message_size: opt.grpc_server_max_encoding_message_size, executor_heartbeat_interval_seconds: opt.executor_heartbeat_interval_seconds, data_cache_policy: opt.data_cache_policy, cache_dir: opt.cache_dir, diff --git a/ballista/executor/src/executor_process.rs b/ballista/executor/src/executor_process.rs index 4672dffb4..d3b569782 100644 --- a/ballista/executor/src/executor_process.rs +++ b/ballista/executor/src/executor_process.rs @@ -96,6 +96,8 @@ pub struct ExecutorProcessConfig { pub cache_io_concurrency: u32, /// The maximum size of a decoded message at the grpc server side. pub grpc_server_max_decoding_message_size: u32, + /// The maximum size of an encoded message at the grpc server side. + pub grpc_server_max_encoding_message_size: u32, pub executor_heartbeat_interval_seconds: u64, /// Optional execution engine to use to execute physical plans, will default to /// DataFusion if none is provided. diff --git a/ballista/executor/src/executor_server.rs b/ballista/executor/src/executor_server.rs index c046558bf..90f888cc0 100644 --- a/ballista/executor/src/executor_server.rs +++ b/ballista/executor/src/executor_server.rs @@ -112,6 +112,9 @@ pub async fn startup( BALLISTA_VERSION, addr ); let server = ExecutorGrpcServer::new(executor_server.clone()) + .max_encoding_message_size( + config.grpc_server_max_encoding_message_size as usize, + ) .max_decoding_message_size( config.grpc_server_max_decoding_message_size as usize, ); diff --git a/ballista/scheduler/scheduler_config_spec.toml b/ballista/scheduler/scheduler_config_spec.toml index c9f5154a2..857212fe8 100644 --- a/ballista/scheduler/scheduler_config_spec.toml +++ b/ballista/scheduler/scheduler_config_spec.toml @@ -171,6 +171,12 @@ type = "u32" default = "16777216" doc = "The maximum size of a decoded message at the grpc server side. Default: 16MB" +[[param]] +name = "grpc_server_max_encoding_message_size" +type = "u32" +default = "16777216" +doc = "The maximum size of an encoded message at the grpc server side. Default: 16MB" + [[param]] name = "executor_timeout_seconds" type = "u64" diff --git a/ballista/scheduler/src/bin/main.rs b/ballista/scheduler/src/bin/main.rs index 46ac7cc9a..dbdfc7474 100644 --- a/ballista/scheduler/src/bin/main.rs +++ b/ballista/scheduler/src/bin/main.rs @@ -155,6 +155,7 @@ async fn main() -> Result<()> { scheduler_event_expected_processing_duration: opt .scheduler_event_expected_processing_duration, grpc_server_max_decoding_message_size: opt.grpc_server_max_decoding_message_size, + grpc_server_max_encoding_message_size: opt.grpc_server_max_encoding_message_size, executor_timeout_seconds: opt.executor_timeout_seconds, expire_dead_executor_interval_seconds: opt.expire_dead_executor_interval_seconds, }; diff --git a/ballista/scheduler/src/config.rs b/ballista/scheduler/src/config.rs index 7a0c10c4a..38ae5ae01 100644 --- a/ballista/scheduler/src/config.rs +++ b/ballista/scheduler/src/config.rs @@ -56,6 +56,8 @@ pub struct SchedulerConfig { pub scheduler_event_expected_processing_duration: u64, /// The maximum size of a decoded message at the grpc server side. pub grpc_server_max_decoding_message_size: u32, + /// The maximum size of an encoded message at the grpc server side. + pub grpc_server_max_encoding_message_size: u32, /// The executor timeout in seconds. It should be longer than executor's heartbeat intervals. pub executor_timeout_seconds: u64, /// The interval to check expired or dead executors @@ -79,6 +81,7 @@ impl Default for SchedulerConfig { executor_termination_grace_period: 0, scheduler_event_expected_processing_duration: 0, grpc_server_max_decoding_message_size: 16777216, + grpc_server_max_encoding_message_size: 16777216, executor_timeout_seconds: 180, expire_dead_executor_interval_seconds: 15, } @@ -167,6 +170,11 @@ impl SchedulerConfig { self.grpc_server_max_decoding_message_size = value; self } + + pub fn with_grpc_server_max_encoding_message_size(mut self, value: u32) -> Self { + self.grpc_server_max_encoding_message_size = value; + self + } } #[derive(Clone, Debug)] diff --git a/ballista/scheduler/src/scheduler_process.rs b/ballista/scheduler/src/scheduler_process.rs index 64ea30725..6bcaaec5c 100644 --- a/ballista/scheduler/src/scheduler_process.rs +++ b/ballista/scheduler/src/scheduler_process.rs @@ -75,6 +75,9 @@ pub async fn start_server( let config = &scheduler_server.state.config; let scheduler_grpc_server = SchedulerGrpcServer::new(scheduler_server.clone()) + .max_encoding_message_size( + config.grpc_server_max_encoding_message_size as usize, + ) .max_decoding_message_size( config.grpc_server_max_decoding_message_size as usize, );