Skip to content

Commit

Permalink
upmerge
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove committed Dec 11, 2023
2 parents b1e250c + 4b3b35f commit c9f821d
Show file tree
Hide file tree
Showing 8 changed files with 26 additions and 2 deletions.
6 changes: 6 additions & 0 deletions ballista/executor/executor_config_spec.toml
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,12 @@ type = "u32"
default = "16777216"
doc = "The maximum size of a decoded message at the grpc server side. Default: 16MB"

[[param]]
name = "grpc_server_max_encoding_message_size"
type = "u32"
default = "16777216"
doc = "The maximum size of an encoded message at the grpc server side. Default: 16MB"

[[param]]
name = "executor_heartbeat_interval_seconds"
type = "u64"
Expand Down
1 change: 1 addition & 0 deletions ballista/executor/src/bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ async fn main() -> Result<()> {
job_data_ttl_seconds: opt.job_data_ttl_seconds,
job_data_clean_up_interval_seconds: opt.job_data_clean_up_interval_seconds,
grpc_server_max_decoding_message_size: opt.grpc_server_max_decoding_message_size,
grpc_server_max_encoding_message_size: opt.grpc_server_max_encoding_message_size,
executor_heartbeat_interval_seconds: opt.executor_heartbeat_interval_seconds,
data_cache_policy: opt.data_cache_policy,
cache_dir: opt.cache_dir,
Expand Down
2 changes: 2 additions & 0 deletions ballista/executor/src/executor_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ pub struct ExecutorProcessConfig {
pub cache_io_concurrency: u32,
/// The maximum size of a decoded message at the grpc server side.
pub grpc_server_max_decoding_message_size: u32,
/// The maximum size of an encoded message at the grpc server side.
pub grpc_server_max_encoding_message_size: u32,
pub executor_heartbeat_interval_seconds: u64,
/// Optional execution engine to use to execute physical plans, will default to
/// DataFusion if none is provided.
Expand Down
2 changes: 1 addition & 1 deletion ballista/executor/src/executor_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ pub async fn startup<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>(
);
let server = ExecutorGrpcServer::new(executor_server.clone())
.max_encoding_message_size(
config.grpc_server_max_decoding_message_size as usize,
config.grpc_server_max_encoding_message_size as usize,
)
.max_decoding_message_size(
config.grpc_server_max_decoding_message_size as usize,
Expand Down
6 changes: 6 additions & 0 deletions ballista/scheduler/scheduler_config_spec.toml
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,12 @@ type = "u32"
default = "16777216"
doc = "The maximum size of a decoded message at the grpc server side. Default: 16MB"

[[param]]
name = "grpc_server_max_encoding_message_size"
type = "u32"
default = "16777216"
doc = "The maximum size of an encoded message at the grpc server side. Default: 16MB"

[[param]]
name = "executor_timeout_seconds"
type = "u64"
Expand Down
1 change: 1 addition & 0 deletions ballista/scheduler/src/bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ async fn main() -> Result<()> {
scheduler_event_expected_processing_duration: opt
.scheduler_event_expected_processing_duration,
grpc_server_max_decoding_message_size: opt.grpc_server_max_decoding_message_size,
grpc_server_max_encoding_message_size: opt.grpc_server_max_encoding_message_size,
executor_timeout_seconds: opt.executor_timeout_seconds,
expire_dead_executor_interval_seconds: opt.expire_dead_executor_interval_seconds,
};
Expand Down
8 changes: 8 additions & 0 deletions ballista/scheduler/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ pub struct SchedulerConfig {
pub scheduler_event_expected_processing_duration: u64,
/// The maximum size of a decoded message at the grpc server side.
pub grpc_server_max_decoding_message_size: u32,
/// The maximum size of an encoded message at the grpc server side.
pub grpc_server_max_encoding_message_size: u32,
/// The executor timeout in seconds. It should be longer than executor's heartbeat intervals.
pub executor_timeout_seconds: u64,
/// The interval to check expired or dead executors
Expand All @@ -79,6 +81,7 @@ impl Default for SchedulerConfig {
executor_termination_grace_period: 0,
scheduler_event_expected_processing_duration: 0,
grpc_server_max_decoding_message_size: 16777216,
grpc_server_max_encoding_message_size: 16777216,
executor_timeout_seconds: 180,
expire_dead_executor_interval_seconds: 15,
}
Expand Down Expand Up @@ -167,6 +170,11 @@ impl SchedulerConfig {
self.grpc_server_max_decoding_message_size = value;
self
}

pub fn with_grpc_server_max_encoding_message_size(mut self, value: u32) -> Self {
self.grpc_server_max_encoding_message_size = value;
self
}
}

#[derive(Clone, Debug)]
Expand Down
2 changes: 1 addition & 1 deletion ballista/scheduler/src/scheduler_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ pub async fn start_server(
let scheduler_grpc_server =
SchedulerGrpcServer::new(scheduler_server.clone())
.max_encoding_message_size(
config.grpc_server_max_decoding_message_size as usize,
config.grpc_server_max_encoding_message_size as usize,
)
.max_decoding_message_size(
config.grpc_server_max_decoding_message_size as usize,
Expand Down

0 comments on commit c9f821d

Please sign in to comment.