Skip to content

Commit

Permalink
Make easier to create custom schedulers and executors (#1118)
Browse files Browse the repository at this point in the history
  • Loading branch information
milenkovicm authored Nov 22, 2024
1 parent 4e8c64b commit 169cadd
Show file tree
Hide file tree
Showing 21 changed files with 1,150 additions and 209 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ members = ["ballista-cli", "ballista/client", "ballista/core", "ballista/executo
resolver = "2"

[workspace.dependencies]
anyhow = "1"
arrow = { version = "53", features = ["ipc_compression"] }
arrow-flight = { version = "53", features = ["flight-sql-experimental"] }
clap = { version = "3", features = ["derive", "cargo"] }
Expand Down
2 changes: 1 addition & 1 deletion ballista/executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ path = "src/bin/main.rs"
default = ["mimalloc"]

[dependencies]
anyhow = "1"
anyhow = { workspace = true }
arrow = { workspace = true }
arrow-flight = { workspace = true }
async-trait = { workspace = true }
Expand Down
90 changes: 37 additions & 53 deletions ballista/executor/src/bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,15 @@
//! Ballista Rust executor binary.
use anyhow::Result;
use std::sync::Arc;

use ballista_core::config::LogRotationPolicy;
use ballista_core::print_version;
use ballista_executor::config::prelude::*;
use ballista_executor::executor_process::{
start_executor_process, ExecutorProcessConfig,
};
use config::prelude::*;

#[allow(unused_imports)]
#[macro_use]
extern crate configure_me;

#[allow(clippy::all, warnings)]
mod config {
// Ideally we would use the include_config macro from configure_me, but then we cannot use
// #[allow(clippy::all)] to silence clippy warnings from the generated code
include!(concat!(env!("OUT_DIR"), "/executor_configure_me_config.rs"));
}
use std::env;
use std::sync::Arc;
use tracing_subscriber::EnvFilter;

#[cfg(feature = "mimalloc")]
#[global_allocator]
Expand All @@ -53,46 +44,39 @@ async fn main() -> Result<()> {
std::process::exit(0);
}

let log_file_name_prefix = format!(
"executor_{}_{}",
opt.external_host
.clone()
.unwrap_or_else(|| "localhost".to_string()),
opt.bind_port
);
let config: ExecutorProcessConfig = opt.try_into()?;

let rust_log = env::var(EnvFilter::DEFAULT_ENV);
let log_filter =
EnvFilter::new(rust_log.unwrap_or(config.special_mod_log_level.clone()));

let tracing = tracing_subscriber::fmt()
.with_ansi(false)
.with_thread_names(config.print_thread_info)
.with_thread_ids(config.print_thread_info)
.with_env_filter(log_filter);

let config = ExecutorProcessConfig {
special_mod_log_level: opt.log_level_setting,
external_host: opt.external_host,
bind_host: opt.bind_host,
port: opt.bind_port,
grpc_port: opt.bind_grpc_port,
scheduler_host: opt.scheduler_host,
scheduler_port: opt.scheduler_port,
scheduler_connect_timeout_seconds: opt.scheduler_connect_timeout_seconds,
concurrent_tasks: opt.concurrent_tasks,
task_scheduling_policy: opt.task_scheduling_policy,
work_dir: opt.work_dir,
log_dir: opt.log_dir,
log_file_name_prefix,
log_rotation_policy: opt.log_rotation_policy,
print_thread_info: opt.print_thread_info,
job_data_ttl_seconds: opt.job_data_ttl_seconds,
job_data_clean_up_interval_seconds: opt.job_data_clean_up_interval_seconds,
grpc_max_decoding_message_size: opt.grpc_server_max_decoding_message_size,
grpc_max_encoding_message_size: opt.grpc_server_max_encoding_message_size,
executor_heartbeat_interval_seconds: opt.executor_heartbeat_interval_seconds,
data_cache_policy: opt.data_cache_policy,
cache_dir: opt.cache_dir,
cache_capacity: opt.cache_capacity,
cache_io_concurrency: opt.cache_io_concurrency,
execution_engine: None,
function_registry: None,
config_producer: None,
runtime_producer: None,
logical_codec: None,
physical_codec: None,
};
// File layer
if let Some(log_dir) = &config.log_dir {
let log_file = match config.log_rotation_policy {
LogRotationPolicy::Minutely => {
tracing_appender::rolling::minutely(log_dir, &config.log_file_name_prefix)
}
LogRotationPolicy::Hourly => {
tracing_appender::rolling::hourly(log_dir, &config.log_file_name_prefix)
}
LogRotationPolicy::Daily => {
tracing_appender::rolling::daily(log_dir, &config.log_file_name_prefix)
}
LogRotationPolicy::Never => {
tracing_appender::rolling::never(log_dir, &config.log_file_name_prefix)
}
};

tracing.with_writer(log_file).init();
} else {
tracing.init();
}

start_executor_process(Arc::new(config)).await
}
71 changes: 71 additions & 0 deletions ballista/executor/src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use ballista_core::error::BallistaError;

use crate::executor_process::ExecutorProcessConfig;

// Ideally we would use the include_config macro from configure_me, but then we cannot use
// #[allow(clippy::all)] to silence clippy warnings from the generated code
include!(concat!(env!("OUT_DIR"), "/executor_configure_me_config.rs"));

/// Builds an [`ExecutorProcessConfig`] from the generated [`Config`] options.
///
/// Each option is moved into its corresponding process-config field. The log
/// file name prefix is derived as `executor_<external_host>_<bind_port>`, with
/// `localhost` standing in when no external host was supplied. Every
/// `override_*` extension point is left as `None`.
impl TryFrom<Config> for ExecutorProcessConfig {
    type Error = BallistaError;

    /// Performs the conversion, consuming `opt`.
    ///
    /// # Errors
    ///
    /// As written, this conversion always returns `Ok`; the fallible signature
    /// is kept so callers can use `try_into()?`.
    fn try_from(opt: Config) -> Result<Self, Self::Error> {
        // Borrow the host rather than cloning it: it is only needed for
        // formatting here, and the owned value is moved into the struct below.
        let log_file_name_prefix = format!(
            "executor_{}_{}",
            opt.external_host.as_deref().unwrap_or("localhost"),
            opt.bind_port
        );

        Ok(ExecutorProcessConfig {
            special_mod_log_level: opt.log_level_setting,
            external_host: opt.external_host,
            bind_host: opt.bind_host,
            port: opt.bind_port,
            grpc_port: opt.bind_grpc_port,
            scheduler_host: opt.scheduler_host,
            scheduler_port: opt.scheduler_port,
            scheduler_connect_timeout_seconds: opt.scheduler_connect_timeout_seconds,
            concurrent_tasks: opt.concurrent_tasks,
            task_scheduling_policy: opt.task_scheduling_policy,
            work_dir: opt.work_dir,
            log_dir: opt.log_dir,
            log_file_name_prefix,
            log_rotation_policy: opt.log_rotation_policy,
            print_thread_info: opt.print_thread_info,
            job_data_ttl_seconds: opt.job_data_ttl_seconds,
            job_data_clean_up_interval_seconds: opt.job_data_clean_up_interval_seconds,
            grpc_max_decoding_message_size: opt.grpc_server_max_decoding_message_size,
            grpc_max_encoding_message_size: opt.grpc_server_max_encoding_message_size,
            executor_heartbeat_interval_seconds: opt.executor_heartbeat_interval_seconds,
            data_cache_policy: opt.data_cache_policy,
            cache_dir: opt.cache_dir,
            cache_capacity: opt.cache_capacity,
            cache_io_concurrency: opt.cache_io_concurrency,
            // Extension points are not configurable from the generated options;
            // they start unset.
            override_execution_engine: None,
            override_function_registry: None,
            override_config_producer: None,
            override_runtime_producer: None,
            override_logical_codec: None,
            override_physical_codec: None,
        })
    }
}
72 changes: 18 additions & 54 deletions ballista/executor/src/executor_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use std::net::SocketAddr;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::{Duration, Instant, UNIX_EPOCH};
use std::{env, io};

use anyhow::{Context, Result};
use arrow_flight::flight_service_server::FlightServiceServer;
Expand All @@ -37,7 +36,6 @@ use tokio::signal;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tokio::{fs, time};
use tracing_subscriber::EnvFilter;
use uuid::Uuid;

use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
Expand Down Expand Up @@ -98,57 +96,20 @@ pub struct ExecutorProcessConfig {
pub executor_heartbeat_interval_seconds: u64,
/// Optional execution engine to use to execute physical plans, will default to
/// DataFusion if none is provided.
pub execution_engine: Option<Arc<dyn ExecutionEngine>>,
pub override_execution_engine: Option<Arc<dyn ExecutionEngine>>,
/// Overrides default function registry
pub function_registry: Option<Arc<BallistaFunctionRegistry>>,
pub override_function_registry: Option<Arc<BallistaFunctionRegistry>>,
/// [RuntimeProducer] override option
pub runtime_producer: Option<RuntimeProducer>,
pub override_runtime_producer: Option<RuntimeProducer>,
/// [ConfigProducer] override option
pub config_producer: Option<ConfigProducer>,
pub override_config_producer: Option<ConfigProducer>,
/// [LogicalExtensionCodec] override option
pub logical_codec: Option<Arc<dyn LogicalExtensionCodec>>,
pub override_logical_codec: Option<Arc<dyn LogicalExtensionCodec>>,
/// [PhysicalExtensionCodec] override option
pub physical_codec: Option<Arc<dyn PhysicalExtensionCodec>>,
pub override_physical_codec: Option<Arc<dyn PhysicalExtensionCodec>>,
}

pub async fn start_executor_process(opt: Arc<ExecutorProcessConfig>) -> Result<()> {
let rust_log = env::var(EnvFilter::DEFAULT_ENV);
let log_filter =
EnvFilter::new(rust_log.unwrap_or(opt.special_mod_log_level.clone()));
// File layer
if let Some(log_dir) = opt.log_dir.clone() {
let log_file = match opt.log_rotation_policy {
LogRotationPolicy::Minutely => {
tracing_appender::rolling::minutely(log_dir, &opt.log_file_name_prefix)
}
LogRotationPolicy::Hourly => {
tracing_appender::rolling::hourly(log_dir, &opt.log_file_name_prefix)
}
LogRotationPolicy::Daily => {
tracing_appender::rolling::daily(log_dir, &opt.log_file_name_prefix)
}
LogRotationPolicy::Never => {
tracing_appender::rolling::never(log_dir, &opt.log_file_name_prefix)
}
};
tracing_subscriber::fmt()
.with_ansi(false)
.with_thread_names(opt.print_thread_info)
.with_thread_ids(opt.print_thread_info)
.with_writer(log_file)
.with_env_filter(log_filter)
.init();
} else {
// Console layer
tracing_subscriber::fmt()
.with_ansi(false)
.with_thread_names(opt.print_thread_info)
.with_thread_ids(opt.print_thread_info)
.with_writer(io::stdout)
.with_env_filter(log_filter)
.init();
}

let addr = format!("{}:{}", opt.bind_host, opt.port);
let addr = addr
.parse()
Expand Down Expand Up @@ -194,23 +155,26 @@ pub async fn start_executor_process(opt: Arc<ExecutorProcessConfig>) -> Result<(
// put them to session config
let metrics_collector = Arc::new(LoggingMetricsCollector::default());
let config_producer = opt
.config_producer
.override_config_producer
.clone()
.unwrap_or_else(|| Arc::new(default_config_producer));

let wd = work_dir.clone();
let runtime_producer: RuntimeProducer = Arc::new(move |_| {
let config = RuntimeConfig::new().with_temp_file_path(wd.clone());
Ok(Arc::new(RuntimeEnv::new(config)?))
});
let runtime_producer: RuntimeProducer =
opt.override_runtime_producer.clone().unwrap_or_else(|| {
Arc::new(move |_| {
let config = RuntimeConfig::new().with_temp_file_path(wd.clone());
Ok(Arc::new(RuntimeEnv::new(config)?))
})
});

let logical = opt
.logical_codec
.override_logical_codec
.clone()
.unwrap_or_else(|| Arc::new(BallistaLogicalExtensionCodec::default()));

let physical = opt
.physical_codec
.override_physical_codec
.clone()
.unwrap_or_else(|| Arc::new(BallistaPhysicalExtensionCodec::default()));

Expand All @@ -224,10 +188,10 @@ pub async fn start_executor_process(opt: Arc<ExecutorProcessConfig>) -> Result<(
&work_dir,
runtime_producer,
config_producer,
opt.function_registry.clone().unwrap_or_default(),
opt.override_function_registry.clone().unwrap_or_default(),
metrics_collector,
concurrent_tasks,
opt.execution_engine.clone(),
opt.override_execution_engine.clone(),
));

let connect_timeout = opt.scheduler_connect_timeout_seconds as u64;
Expand Down
1 change: 1 addition & 0 deletions ballista/executor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#![doc = include_str!("../README.md")]

pub mod collect;
pub mod config;
pub mod execution_engine;
pub mod execution_loop;
pub mod executor;
Expand Down
2 changes: 1 addition & 1 deletion ballista/scheduler/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ prometheus-metrics = ["prometheus", "once_cell"]
rest-api = []

[dependencies]
anyhow = "1"
anyhow = { workspace = true }
arrow-flight = { workspace = true }
async-trait = { workspace = true }
axum = "0.7.7"
Expand Down
4 changes: 2 additions & 2 deletions ballista/scheduler/scheduler_config_spec.toml
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,9 @@ doc = "Delayed interval for cleaning up finished job state. Default: 3600"

[[param]]
name = "task_distribution"
type = "ballista_scheduler::config::TaskDistribution"
type = "crate::config::TaskDistribution"
doc = "The policy of distributing tasks to available executor slots, possible values: bias, round-robin, consistent-hash. Default: bias"
default = "ballista_scheduler::config::TaskDistribution::Bias"
default = "crate::config::TaskDistribution::Bias"

[[param]]
name = "consistent_hash_num_replicas"
Expand Down
Loading

0 comments on commit 169cadd

Please sign in to comment.