From 82dc1ee86a50f7a8a1855c5ccce969ee4627c050 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marko=20Milenkovi=C4=87?= Date: Fri, 6 Dec 2024 20:26:07 +0000 Subject: [PATCH] feat: default instance for executor configuration and clean up scheduler configuration. --- ballista/core/src/config.rs | 6 ++- ballista/executor/src/bin/main.rs | 13 ++++--- ballista/executor/src/config.rs | 9 ----- ballista/executor/src/executor_process.rs | 45 ++++++++++++++++++++++- ballista/scheduler/src/config.rs | 16 ++++---- 5 files changed, 64 insertions(+), 25 deletions(-) diff --git a/ballista/core/src/config.rs b/ballista/core/src/config.rs index 10f24c5d5..e00cd1153 100644 --- a/ballista/core/src/config.rs +++ b/ballista/core/src/config.rs @@ -252,8 +252,9 @@ impl datafusion::config::ConfigExtension for BallistaConfig { // an enum used to configure the scheduler policy // needs to be visible to code generated by configure_me -#[derive(Clone, ValueEnum, Copy, Debug, serde::Deserialize)] +#[derive(Clone, ValueEnum, Copy, Debug, serde::Deserialize, Default)] pub enum TaskSchedulingPolicy { + #[default] PullStaged, PushStaged, } @@ -274,11 +275,12 @@ impl parse_arg::ParseArgFromStr for TaskSchedulingPolicy { // an enum used to configure the log rolling policy // needs to be visible to code generated by configure_me -#[derive(Clone, ValueEnum, Copy, Debug, serde::Deserialize)] +#[derive(Clone, ValueEnum, Copy, Debug, serde::Deserialize, Default)] pub enum LogRotationPolicy { Minutely, Hourly, Daily, + #[default] Never, } diff --git a/ballista/executor/src/bin/main.rs b/ballista/executor/src/bin/main.rs index 5ef88e8bf..2ab1a90c7 100644 --- a/ballista/executor/src/bin/main.rs +++ b/ballista/executor/src/bin/main.rs @@ -59,17 +59,18 @@ async fn main() -> Result<()> { // File layer if let Some(log_dir) = &config.log_dir { let log_file = match config.log_rotation_policy { - LogRotationPolicy::Minutely => { - tracing_appender::rolling::minutely(log_dir, &config.log_file_name_prefix) - } + LogRotationPolicy::Minutely => tracing_appender::rolling::minutely( + log_dir, + config.log_file_name_prefix(), + ), LogRotationPolicy::Hourly => { - tracing_appender::rolling::hourly(log_dir, &config.log_file_name_prefix) + tracing_appender::rolling::hourly(log_dir, config.log_file_name_prefix()) } LogRotationPolicy::Daily => { - tracing_appender::rolling::daily(log_dir, &config.log_file_name_prefix) + tracing_appender::rolling::daily(log_dir, config.log_file_name_prefix()) } LogRotationPolicy::Never => { - tracing_appender::rolling::never(log_dir, &config.log_file_name_prefix) + tracing_appender::rolling::never(log_dir, config.log_file_name_prefix()) } }; diff --git a/ballista/executor/src/config.rs b/ballista/executor/src/config.rs index bbaf26700..65fa9d467 100644 --- a/ballista/executor/src/config.rs +++ b/ballista/executor/src/config.rs @@ -27,14 +27,6 @@ impl TryFrom for ExecutorProcessConfig { type Error = BallistaError; fn try_from(opt: Config) -> Result { - let log_file_name_prefix = format!( - "executor_{}_{}", - opt.external_host - .clone() - .unwrap_or_else(|| "localhost".to_string()), - opt.bind_port - ); - Ok(ExecutorProcessConfig { special_mod_log_level: opt.log_level_setting, external_host: opt.external_host, @@ -48,7 +40,6 @@ impl TryFrom for ExecutorProcessConfig { task_scheduling_policy: opt.task_scheduling_policy, work_dir: opt.work_dir, log_dir: opt.log_dir, - log_file_name_prefix, log_rotation_policy: opt.log_rotation_policy, print_thread_info: opt.print_thread_info, job_data_ttl_seconds: opt.job_data_ttl_seconds, diff --git a/ballista/executor/src/executor_process.rs b/ballista/executor/src/executor_process.rs index f87f71276..ed6902881 100644 --- a/ballista/executor/src/executor_process.rs +++ b/ballista/executor/src/executor_process.rs @@ -81,7 +81,6 @@ pub struct ExecutorProcessConfig { pub work_dir: Option, pub special_mod_log_level: String, pub print_thread_info: bool, - pub log_file_name_prefix: String, pub log_rotation_policy: LogRotationPolicy, pub job_data_ttl_seconds: u64, pub job_data_clean_up_interval_seconds: u64, @@ -105,6 +104,50 @@ pub struct ExecutorProcessConfig { pub override_physical_codec: Option>, } +impl ExecutorProcessConfig { + pub fn log_file_name_prefix(&self) -> String { + format!( + "executor_{}_{}", + self.external_host + .clone() + .unwrap_or_else(|| "localhost".to_string()), + self.port + ) + } +} + +impl Default for ExecutorProcessConfig { + fn default() -> Self { + Self { + bind_host: "127.0.0.1".into(), + external_host: None, + port: 50051, + grpc_port: 50052, + scheduler_host: "localhost".into(), + scheduler_port: 50050, + scheduler_connect_timeout_seconds: 0, + concurrent_tasks: std::thread::available_parallelism().unwrap().get(), + task_scheduling_policy: Default::default(), + log_dir: None, + work_dir: None, + special_mod_log_level: "INFO".into(), + print_thread_info: true, + log_rotation_policy: Default::default(), + job_data_ttl_seconds: 604800, + job_data_clean_up_interval_seconds: 0, + grpc_max_decoding_message_size: 16777216, + grpc_max_encoding_message_size: 16777216, + executor_heartbeat_interval_seconds: 60, + override_execution_engine: None, + override_function_registry: None, + override_runtime_producer: None, + override_config_producer: None, + override_logical_codec: None, + override_physical_codec: None, + } + } +} + pub async fn start_executor_process(opt: Arc) -> Result<()> { let addr = format!("{}:{}", opt.bind_host, opt.port); let addr = addr diff --git a/ballista/scheduler/src/config.rs b/ballista/scheduler/src/config.rs index 7bb85bd48..9ddb8b6e5 100644 --- a/ballista/scheduler/src/config.rs +++ b/ballista/scheduler/src/config.rs @@ -135,15 +135,15 @@ impl Default for SchedulerConfig { fn default() -> Self { Self { namespace: String::default(), - external_host: "localhost".to_string(), + external_host: "localhost".into(), bind_port: 50050, - scheduling_policy: TaskSchedulingPolicy::PullStaged, + scheduling_policy: Default::default(), event_loop_buffer_size: 10000, - task_distribution: TaskDistributionPolicy::Bias, + task_distribution: Default::default(), finished_job_data_clean_up_interval_seconds: 300, finished_job_state_clean_up_interval_seconds: 3600, advertise_flight_sql_endpoint: None, - cluster_storage: ClusterStorageConfig::Memory, + cluster_storage: Default::default(), job_resubmit_interval_ms: None, executor_termination_grace_period: 0, scheduler_event_expected_processing_duration: 0, @@ -248,8 +248,9 @@ impl SchedulerConfig { } } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Default)] pub enum ClusterStorageConfig { + #[default] Memory, } @@ -285,10 +286,11 @@ impl parse_arg::ParseArgFromStr for TaskDistribution { } } -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, Default)] pub enum TaskDistributionPolicy { /// Eagerly assign tasks to executor slots. This will assign as many task slots per executor /// as are currently available + #[default] Bias, /// Distribute tasks evenly across executors. This will try and iterate through available executors /// and assign one task to each executor until all tasks are assigned. @@ -332,7 +334,7 @@ impl TryFrom for SchedulerConfig { finished_job_state_clean_up_interval_seconds: opt .finished_job_state_clean_up_interval_seconds, advertise_flight_sql_endpoint: opt.advertise_flight_sql_endpoint, - cluster_storage: ClusterStorageConfig::Memory, + cluster_storage: Default::default(), job_resubmit_interval_ms: (opt.job_resubmit_interval_ms > 0) .then_some(opt.job_resubmit_interval_ms), executor_termination_grace_period: opt.executor_termination_grace_period,