Skip to content

Commit

Permalink
feat: default instance for executor configuration (#1147)
Browse files Browse the repository at this point in the history
and clean up scheduler configuration.
  • Loading branch information
milenkovicm authored Dec 7, 2024
1 parent cc2ddcb commit 3af9ae0
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 25 deletions.
6 changes: 4 additions & 2 deletions ballista/core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,8 +252,9 @@ impl datafusion::config::ConfigExtension for BallistaConfig {

// an enum used to configure the scheduler policy
// needs to be visible to code generated by configure_me
#[derive(Clone, ValueEnum, Copy, Debug, serde::Deserialize)]
#[derive(Clone, ValueEnum, Copy, Debug, serde::Deserialize, Default)]
pub enum TaskSchedulingPolicy {
#[default]
PullStaged,
PushStaged,
}
Expand All @@ -274,11 +275,12 @@ impl parse_arg::ParseArgFromStr for TaskSchedulingPolicy {

// an enum used to configure the log rolling policy
// needs to be visible to code generated by configure_me
#[derive(Clone, ValueEnum, Copy, Debug, serde::Deserialize)]
#[derive(Clone, ValueEnum, Copy, Debug, serde::Deserialize, Default)]
pub enum LogRotationPolicy {
Minutely,
Hourly,
Daily,
#[default]
Never,
}

Expand Down
13 changes: 7 additions & 6 deletions ballista/executor/src/bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,17 +59,18 @@ async fn main() -> Result<()> {
// File layer
if let Some(log_dir) = &config.log_dir {
let log_file = match config.log_rotation_policy {
LogRotationPolicy::Minutely => {
tracing_appender::rolling::minutely(log_dir, &config.log_file_name_prefix)
}
LogRotationPolicy::Minutely => tracing_appender::rolling::minutely(
log_dir,
config.log_file_name_prefix(),
),
LogRotationPolicy::Hourly => {
tracing_appender::rolling::hourly(log_dir, &config.log_file_name_prefix)
tracing_appender::rolling::hourly(log_dir, config.log_file_name_prefix())
}
LogRotationPolicy::Daily => {
tracing_appender::rolling::daily(log_dir, &config.log_file_name_prefix)
tracing_appender::rolling::daily(log_dir, config.log_file_name_prefix())
}
LogRotationPolicy::Never => {
tracing_appender::rolling::never(log_dir, &config.log_file_name_prefix)
tracing_appender::rolling::never(log_dir, config.log_file_name_prefix())
}
};

Expand Down
9 changes: 0 additions & 9 deletions ballista/executor/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,6 @@ impl TryFrom<Config> for ExecutorProcessConfig {
type Error = BallistaError;

fn try_from(opt: Config) -> Result<Self, Self::Error> {
let log_file_name_prefix = format!(
"executor_{}_{}",
opt.external_host
.clone()
.unwrap_or_else(|| "localhost".to_string()),
opt.bind_port
);

Ok(ExecutorProcessConfig {
special_mod_log_level: opt.log_level_setting,
external_host: opt.external_host,
Expand All @@ -48,7 +40,6 @@ impl TryFrom<Config> for ExecutorProcessConfig {
task_scheduling_policy: opt.task_scheduling_policy,
work_dir: opt.work_dir,
log_dir: opt.log_dir,
log_file_name_prefix,
log_rotation_policy: opt.log_rotation_policy,
print_thread_info: opt.print_thread_info,
job_data_ttl_seconds: opt.job_data_ttl_seconds,
Expand Down
45 changes: 44 additions & 1 deletion ballista/executor/src/executor_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ pub struct ExecutorProcessConfig {
pub work_dir: Option<String>,
pub special_mod_log_level: String,
pub print_thread_info: bool,
pub log_file_name_prefix: String,
pub log_rotation_policy: LogRotationPolicy,
pub job_data_ttl_seconds: u64,
pub job_data_clean_up_interval_seconds: u64,
Expand All @@ -105,6 +104,50 @@ pub struct ExecutorProcessConfig {
pub override_physical_codec: Option<Arc<dyn PhysicalExtensionCodec>>,
}

impl ExecutorProcessConfig {
pub fn log_file_name_prefix(&self) -> String {
format!(
"executor_{}_{}",
self.external_host
.clone()
.unwrap_or_else(|| "localhost".to_string()),
self.port
)
}
}

impl Default for ExecutorProcessConfig {
fn default() -> Self {
Self {
bind_host: "127.0.0.1".into(),
external_host: None,
port: 50051,
grpc_port: 50052,
scheduler_host: "localhost".into(),
scheduler_port: 50050,
scheduler_connect_timeout_seconds: 0,
concurrent_tasks: std::thread::available_parallelism().unwrap().get(),
task_scheduling_policy: Default::default(),
log_dir: None,
work_dir: None,
special_mod_log_level: "INFO".into(),
print_thread_info: true,
log_rotation_policy: Default::default(),
job_data_ttl_seconds: 604800,
job_data_clean_up_interval_seconds: 0,
grpc_max_decoding_message_size: 16777216,
grpc_max_encoding_message_size: 16777216,
executor_heartbeat_interval_seconds: 60,
override_execution_engine: None,
override_function_registry: None,
override_runtime_producer: None,
override_config_producer: None,
override_logical_codec: None,
override_physical_codec: None,
}
}
}

pub async fn start_executor_process(opt: Arc<ExecutorProcessConfig>) -> Result<()> {
let addr = format!("{}:{}", opt.bind_host, opt.port);
let addr = addr
Expand Down
16 changes: 9 additions & 7 deletions ballista/scheduler/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,15 +135,15 @@ impl Default for SchedulerConfig {
fn default() -> Self {
Self {
namespace: String::default(),
external_host: "localhost".to_string(),
external_host: "localhost".into(),
bind_port: 50050,
scheduling_policy: TaskSchedulingPolicy::PullStaged,
scheduling_policy: Default::default(),
event_loop_buffer_size: 10000,
task_distribution: TaskDistributionPolicy::Bias,
task_distribution: Default::default(),
finished_job_data_clean_up_interval_seconds: 300,
finished_job_state_clean_up_interval_seconds: 3600,
advertise_flight_sql_endpoint: None,
cluster_storage: ClusterStorageConfig::Memory,
cluster_storage: Default::default(),
job_resubmit_interval_ms: None,
executor_termination_grace_period: 0,
scheduler_event_expected_processing_duration: 0,
Expand Down Expand Up @@ -248,8 +248,9 @@ impl SchedulerConfig {
}
}

#[derive(Clone, Debug)]
#[derive(Clone, Debug, Default)]
pub enum ClusterStorageConfig {
#[default]
Memory,
}

Expand Down Expand Up @@ -285,10 +286,11 @@ impl parse_arg::ParseArgFromStr for TaskDistribution {
}
}

#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone, Copy, Default)]
pub enum TaskDistributionPolicy {
/// Eagerly assign tasks to executor slots. This will assign as many task slots per executor
/// as are currently available
#[default]
Bias,
/// Distribute tasks evenly across executors. This will try and iterate through available executors
/// and assign one task to each executor until all tasks are assigned.
Expand Down Expand Up @@ -332,7 +334,7 @@ impl TryFrom<Config> for SchedulerConfig {
finished_job_state_clean_up_interval_seconds: opt
.finished_job_state_clean_up_interval_seconds,
advertise_flight_sql_endpoint: opt.advertise_flight_sql_endpoint,
cluster_storage: ClusterStorageConfig::Memory,
cluster_storage: Default::default(),
job_resubmit_interval_ms: (opt.job_resubmit_interval_ms > 0)
.then_some(opt.job_resubmit_interval_ms),
executor_termination_grace_period: opt.executor_termination_grace_period,
Expand Down

0 comments on commit 3af9ae0

Please sign in to comment.