From 80c2c56cf00448dac3dae3e66a80c2b4450ad17b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Marko=20Milenkovi=C4=87?=
Date: Sat, 14 Dec 2024 13:29:25 +0000
Subject: [PATCH] chore: dependency cleanup (#1150)

---
 Cargo.toml                                    |   6 +-
 ballista-cli/Cargo.toml                       |   1 -
 ballista/client/Cargo.toml                    |   8 +-
 ballista/core/Cargo.toml                      |   9 +-
 ballista/core/src/config.rs                   |  25 +--
 ballista/core/src/diagram.rs                  | 148 ++++++++++++++++++
 ballista/core/src/error.rs                    |  48 +-----
 ballista/core/src/lib.rs                      |   5 +-
 ballista/core/src/utils.rs                    | 129 +--------------
 ballista/executor/Cargo.toml                  |  13 +-
 ballista/executor/build.rs                    |   8 +-
 ballista/executor/src/bin/main.rs             |   3 +-
 ballista/executor/src/config.rs               |   1 +
 ballista/executor/src/execution_loop.rs       |  20 ++-
 ballista/executor/src/executor.rs             |   4 +-
 ballista/executor/src/executor_process.rs     |  39 +++--
 ballista/executor/src/flight_service.rs       |  12 +-
 ballista/executor/src/lib.rs                  |   1 +
 ballista/scheduler/Cargo.toml                 |  22 ++-
 ballista/scheduler/build.rs                   |   4 +-
 ballista/scheduler/src/bin/main.rs            |  11 +-
 ballista/scheduler/src/cluster/memory.rs      |   3 +-
 ballista/scheduler/src/cluster/mod.rs         |   7 +-
 ballista/scheduler/src/config.rs              |  72 ++-------
 ballista/scheduler/src/scheduler_process.rs   |   8 +-
 .../scheduler_server/query_stage_scheduler.rs |   5 -
 ballista/scheduler/src/state/task_manager.rs  |   4 +-
 benchmarks/Cargo.toml                         |   1 -
 examples/Cargo.toml                           |   6 +-
 examples/examples/custom-executor.rs          |  36 ++---
 examples/examples/custom-scheduler.rs         |  46 +++---
 python/Cargo.toml                             |   7 +-
 python/src/cluster.rs                         |  10 +-
 33 files changed, 320 insertions(+), 402 deletions(-)
 create mode 100644 ballista/core/src/diagram.rs

diff --git a/Cargo.toml b/Cargo.toml
index a68222a2b..f92064585 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -21,7 +21,6 @@ members = ["ballista-cli", "ballista/client", "ballista/core", "ballista/executo
 resolver = "2"
 
 [workspace.dependencies]
-anyhow = "1"
 arrow = { version = "53", features = ["ipc_compression"] }
 arrow-flight = { version = "53", features = ["flight-sql-experimental"] }
 clap = { version = "4.5", features = ["derive", "cargo"] }
@@ -40,9 +39,9 @@ tonic-build = { version = "0.12", default-features = false, features = [
     "transport", "prost"
 ] }
-tracing = "0.1.36"
+tracing = "0.1"
 tracing-appender = "0.2.2"
-tracing-subscriber = { version = "0.3.15", features = ["env-filter"] }
+tracing-subscriber = { version = "0.3", features = ["env-filter"] }
 ctor = { version = "0.2" }
 mimalloc = { version = "0.1" }
@@ -58,7 +57,6 @@ dashmap = { version = "6.1" }
 async-trait = { version = "0.1.4" }
 serde = { version = "1.0" }
 tokio-stream = { version = "0.1" }
-parse_arg = { version = "0.1" }
 url = { version = "2.5" }
 
 # cargo build --profile release-lto
diff --git a/ballista-cli/Cargo.toml b/ballista-cli/Cargo.toml
index 2f1ddeb0f..9b527e56d 100644
--- a/ballista-cli/Cargo.toml
+++ b/ballista-cli/Cargo.toml
@@ -25,7 +25,6 @@ keywords = ["ballista", "cli"]
 license = "Apache-2.0"
 homepage = "https://github.com/apache/arrow-ballista"
 repository = "https://github.com/apache/arrow-ballista"
-rust-version = "1.72"
 readme = "README.md"
 
 [dependencies]
diff --git a/ballista/client/Cargo.toml b/ballista/client/Cargo.toml
index 9614412f8..e462367a6 100644
--- a/ballista/client/Cargo.toml
+++ b/ballista/client/Cargo.toml
@@ -25,7 +25,6 @@ repository = "https://github.com/apache/arrow-ballista"
 readme = "README.md"
 authors = ["Apache DataFusion <dev@datafusion.apache.org>"]
 edition = "2021"
-rust-version = "1.72"
 
 [dependencies]
 async-trait = { workspace = true }
@@ -33,11 +32,8 @@ ballista-core = { path = "../core", version = "0.12.0" }
 ballista-executor = { path = "../executor", version = "0.12.0", optional = true }
 ballista-scheduler = { path = "../scheduler", version = "0.12.0", optional = true }
 datafusion = { workspace = true }
-datafusion-proto = { workspace = true }
-futures = { workspace = true }
 log = { workspace = true }
-parking_lot = { workspace = true }
-tempfile = { workspace = true }
+
 tokio = { workspace = true }
 url = { workspace = true }
 
@@ -45,8 +41,10 @@ url = { workspace = true }
 ballista-executor = { path = "../executor", version = "0.12.0" }
 ballista-scheduler = { path = "../scheduler", version = "0.12.0" }
 ctor = { workspace = true }
+datafusion-proto = { workspace = true }
 env_logger = { workspace = true }
 rstest = { version = "0.23" }
+tempfile = { workspace = true }
 tonic = { workspace = true }
 
 [features]
diff --git a/ballista/core/Cargo.toml b/ballista/core/Cargo.toml
index 80a3d1028..1bf888582 100644
--- a/ballista/core/Cargo.toml
+++ b/ballista/core/Cargo.toml
@@ -34,25 +34,24 @@ exclude = ["*.proto"]
 rustc-args = ["--cfg", "docsrs"]
 
 [features]
+build-binary = ["configure_me", "clap"]
 docsrs = []
 # Used for testing ONLY: causes all values to hash to the same value (test for collisions)
 force_hash_collisions = ["datafusion/force_hash_collisions"]
-
 [dependencies]
 arrow-flight = { workspace = true }
 async-trait = { workspace = true }
 chrono = { version = "0.4", default-features = false }
-clap = { workspace = true }
+clap = { workspace = true, optional = true }
+configure_me = { workspace = true, optional = true }
 datafusion = { workspace = true }
 datafusion-proto = { workspace = true }
 datafusion-proto-common = { workspace = true }
 futures = { workspace = true }
-
 itertools = "0.13"
 log = { workspace = true }
 md-5 = { version = "^0.10.0" }
-parse_arg = { workspace = true }
 prost = { workspace = true }
 prost-types = { workspace = true }
 rand = { workspace = true }
@@ -66,5 +65,5 @@ url = { workspace = true }
 tempfile = { workspace = true }
 
 [build-dependencies]
-rustc_version = "0.4.0"
+rustc_version = "0.4.1"
 tonic-build = { workspace = true }
diff --git a/ballista/core/src/config.rs b/ballista/core/src/config.rs
index e00cd1153..cb7f7c5d7 100644
--- a/ballista/core/src/config.rs
+++ b/ballista/core/src/config.rs
@@ -18,8 +18,6 @@
 
 //! Ballista configuration
 
-use clap::ValueEnum;
-use core::fmt;
 use std::collections::HashMap;
 use std::result;
 
@@ -252,30 +250,33 @@ impl datafusion::config::ConfigExtension for BallistaConfig {
 
 // an enum used to configure the scheduler policy
 // needs to be visible to code generated by configure_me
-#[derive(Clone, ValueEnum, Copy, Debug, serde::Deserialize, Default)]
+#[derive(Clone, Copy, Debug, serde::Deserialize, Default)]
+#[cfg_attr(feature = "build-binary", derive(clap::ValueEnum))]
 pub enum TaskSchedulingPolicy {
     #[default]
     PullStaged,
     PushStaged,
 }
 
+#[cfg(feature = "build-binary")]
 impl std::str::FromStr for TaskSchedulingPolicy {
     type Err = String;
 
     fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
-        ValueEnum::from_str(s, true)
+        clap::ValueEnum::from_str(s, true)
     }
 }
-
-impl parse_arg::ParseArgFromStr for TaskSchedulingPolicy {
-    fn describe_type<W: fmt::Write>(mut writer: W) -> fmt::Result {
+#[cfg(feature = "build-binary")]
+impl configure_me::parse_arg::ParseArgFromStr for TaskSchedulingPolicy {
+    fn describe_type<W: core::fmt::Write>(mut writer: W) -> core::fmt::Result {
         write!(writer, "The scheduler policy for the scheduler")
     }
 }
 
 // an enum used to configure the log rolling policy
 // needs to be visible to code generated by configure_me
-#[derive(Clone, ValueEnum, Copy, Debug, serde::Deserialize, Default)]
+#[derive(Clone, Copy, Debug, serde::Deserialize, Default)]
+#[cfg_attr(feature = "build-binary", derive(clap::ValueEnum))]
 pub enum LogRotationPolicy {
     Minutely,
     Hourly,
@@ -284,16 +285,18 @@ pub enum LogRotationPolicy {
     Never,
 }
 
+#[cfg(feature = "build-binary")]
 impl std::str::FromStr for LogRotationPolicy {
     type Err = String;
 
     fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
-        ValueEnum::from_str(s, true)
+        clap::ValueEnum::from_str(s, true)
     }
 }
 
-impl parse_arg::ParseArgFromStr for LogRotationPolicy {
-    fn describe_type<W: fmt::Write>(mut writer: W) -> fmt::Result {
+#[cfg(feature = "build-binary")]
+impl configure_me::parse_arg::ParseArgFromStr for LogRotationPolicy {
+    fn describe_type<W: core::fmt::Write>(mut writer: W) -> core::fmt::Result {
         write!(writer, "The log rotation policy")
     }
 }
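Reviewer note on the gating above: deriving `clap::ValueEnum` behind `cfg_attr` means library consumers of ballista-core compile none of the CLI machinery, while `build-binary` builds keep case-insensitive parsing. A minimal, self-contained sketch of the same pattern (the `Mode` enum here is hypothetical, not part of this patch):

// Sketch only: `Mode` stands in for TaskSchedulingPolicy / LogRotationPolicy.
#[derive(Clone, Copy, Debug, Default)]
#[cfg_attr(feature = "build-binary", derive(clap::ValueEnum))]
pub enum Mode {
    #[default]
    Local,
    Distributed,
}

// Compiled only for binary builds, so library consumers never pull in clap.
#[cfg(feature = "build-binary")]
impl std::str::FromStr for Mode {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // `true` requests case-insensitive matching, as in config.rs above.
        clap::ValueEnum::from_str(s, true)
    }
}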
diff --git a/ballista/core/src/diagram.rs b/ballista/core/src/diagram.rs
new file mode 100644
index 000000000..9ef0da981
--- /dev/null
+++ b/ballista/core/src/diagram.rs
@@ -0,0 +1,148 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::error::Result;
+use crate::execution_plans::{ShuffleWriterExec, UnresolvedShuffleExec};
+
+use datafusion::datasource::physical_plan::{CsvExec, ParquetExec};
+use datafusion::physical_plan::aggregates::AggregateExec;
+use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
+use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
+use datafusion::physical_plan::filter::FilterExec;
+use datafusion::physical_plan::joins::HashJoinExec;
+use datafusion::physical_plan::projection::ProjectionExec;
+use datafusion::physical_plan::sorts::sort::SortExec;
+use datafusion::physical_plan::ExecutionPlan;
+use std::fs::File;
+use std::io::{BufWriter, Write};
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::Arc;
+
+pub fn produce_diagram(filename: &str, stages: &[Arc<ShuffleWriterExec>]) -> Result<()> {
+    let write_file = File::create(filename)?;
+    let mut w = BufWriter::new(&write_file);
+    writeln!(w, "digraph G {{")?;
+
+    // draw stages and entities
+    for stage in stages {
+        writeln!(w, "\tsubgraph cluster{} {{", stage.stage_id())?;
+        writeln!(w, "\t\tlabel = \"Stage {}\";", stage.stage_id())?;
+        let mut id = AtomicUsize::new(0);
+        build_exec_plan_diagram(
+            &mut w,
+            stage.children()[0].as_ref(),
+            stage.stage_id(),
+            &mut id,
+            true,
+        )?;
+        writeln!(w, "\t}}")?;
+    }
+
+    // draw relationships
+    for stage in stages {
+        let mut id = AtomicUsize::new(0);
+        build_exec_plan_diagram(
+            &mut w,
+            stage.children()[0].as_ref(),
+            stage.stage_id(),
+            &mut id,
+            false,
+        )?;
+    }
+
+    write!(w, "}}")?;
+    Ok(())
+}
+
+fn build_exec_plan_diagram(
+    w: &mut BufWriter<&File>,
+    plan: &dyn ExecutionPlan,
+    stage_id: usize,
+    id: &mut AtomicUsize,
+    draw_entity: bool,
+) -> Result<usize> {
+    let operator_str = if plan.as_any().downcast_ref::<AggregateExec>().is_some() {
+        "AggregateExec"
+    } else if plan.as_any().downcast_ref::<SortExec>().is_some() {
+        "SortExec"
+    } else if plan.as_any().downcast_ref::<ProjectionExec>().is_some() {
+        "ProjectionExec"
+    } else if plan.as_any().downcast_ref::<HashJoinExec>().is_some() {
+        "HashJoinExec"
+    } else if plan.as_any().downcast_ref::<ParquetExec>().is_some() {
+        "ParquetExec"
+    } else if plan.as_any().downcast_ref::<CsvExec>().is_some() {
+        "CsvExec"
+    } else if plan.as_any().downcast_ref::<FilterExec>().is_some() {
+        "FilterExec"
+    } else if plan.as_any().downcast_ref::<ShuffleWriterExec>().is_some() {
+        "ShuffleWriterExec"
+    } else if plan
+        .as_any()
+        .downcast_ref::<UnresolvedShuffleExec>()
+        .is_some()
+    {
+        "UnresolvedShuffleExec"
+    } else if plan
+        .as_any()
+        .downcast_ref::<CoalesceBatchesExec>()
+        .is_some()
+    {
+        "CoalesceBatchesExec"
+    } else if plan
+        .as_any()
+        .downcast_ref::<CoalescePartitionsExec>()
+        .is_some()
+    {
+        "CoalescePartitionsExec"
+    } else {
+        println!("Unknown: {plan:?}");
+        "Unknown"
+    };
+
+    let node_id = id.load(Ordering::SeqCst);
+    id.store(node_id + 1, Ordering::SeqCst);
+
+    if draw_entity {
+        writeln!(
+            w,
+            "\t\tstage_{stage_id}_exec_{node_id} [shape=box, label=\"{operator_str}\"];"
+        )?;
+    }
+    for child in plan.children() {
+        if let Some(shuffle) = child.as_any().downcast_ref::<UnresolvedShuffleExec>() {
+            if !draw_entity {
+                writeln!(
+                    w,
+                    "\tstage_{}_exec_1 -> stage_{}_exec_{};",
+                    shuffle.stage_id, stage_id, node_id
+                )?;
+            }
+        } else {
+            // relationships within same entity
+            let child_id =
+                build_exec_plan_diagram(w, child.as_ref(), stage_id, id, draw_entity)?;
+            if draw_entity {
+                writeln!(
+                    w,
+                    "\t\tstage_{stage_id}_exec_{child_id} -> stage_{stage_id}_exec_{node_id};"
+                )?;
+            }
+        }
+    }
+    Ok(node_id)
+}
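Usage of the relocated helper is unchanged by the move out of utils.rs; a sketch, assuming a slice of stage plans produced by Ballista's distributed planner:

// Hypothetical usage sketch; in real code `stages` comes from the planner.
use std::sync::Arc;

use ballista_core::diagram::produce_diagram;
use ballista_core::error::Result;
use ballista_core::execution_plans::ShuffleWriterExec;

fn write_plan_diagram(stages: &[Arc<ShuffleWriterExec>]) -> Result<()> {
    // Emits a Graphviz digraph with one `subgraph cluster` per query stage.
    produce_diagram("query_plan.dot", stages)?;
    // Render offline with e.g. `dot -Tsvg query_plan.dot > query_plan.svg`.
    Ok(())
}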
diff --git a/ballista/core/src/error.rs b/ballista/core/src/error.rs
index cbdd90a71..05a706cce 100644
--- a/ballista/core/src/error.rs
+++ b/ballista/core/src/error.rs
@@ -37,15 +37,11 @@ pub enum BallistaError {
     NotImplemented(String),
     General(String),
     Internal(String),
+    Configuration(String),
     ArrowError(ArrowError),
     DataFusionError(DataFusionError),
     SqlError(parser::ParserError),
     IoError(io::Error),
-    // ReqwestError(reqwest::Error),
-    // HttpError(http::Error),
-    // KubeAPIError(kube::error::Error),
-    // KubeAPIRequestError(k8s_openapi::RequestError),
-    // KubeAPIResponseError(k8s_openapi::ResponseError),
     TonicError(tonic::transport::Error),
     GrpcError(tonic::Status),
     GrpcConnectionError(String),
@@ -112,36 +108,6 @@ impl From for BallistaError {
     }
 }
 
-// impl From<reqwest::Error> for BallistaError {
-//     fn from(e: reqwest::Error) -> Self {
-//         BallistaError::ReqwestError(e)
-//     }
-// }
-//
-// impl From<http::Error> for BallistaError {
-//     fn from(e: http::Error) -> Self {
-//         BallistaError::HttpError(e)
-//     }
-// }
-
-// impl From<kube::error::Error> for BallistaError {
-//     fn from(e: kube::error::Error) -> Self {
-//         BallistaError::KubeAPIError(e)
-//     }
-// }
-
-// impl From<k8s_openapi::RequestError> for BallistaError {
-//     fn from(e: k8s_openapi::RequestError) -> Self {
-//         BallistaError::KubeAPIRequestError(e)
-//     }
-// }
-
-// impl From<k8s_openapi::ResponseError> for BallistaError {
-//     fn from(e: k8s_openapi::ResponseError) -> Self {
-//         BallistaError::KubeAPIResponseError(e)
-//     }
-// }
-
 impl From<tonic::transport::Error> for BallistaError {
     fn from(e: tonic::transport::Error) -> Self {
         BallistaError::TonicError(e)
@@ -191,15 +157,6 @@ impl Display for BallistaError {
             }
             BallistaError::SqlError(ref desc) => write!(f, "SQL error: {desc}"),
             BallistaError::IoError(ref desc) => write!(f, "IO error: {desc}"),
-            // BallistaError::ReqwestError(ref desc) => write!(f, "Reqwest error: {}", desc),
-            // BallistaError::HttpError(ref desc) => write!(f, "HTTP error: {}", desc),
-            // BallistaError::KubeAPIError(ref desc) => write!(f, "Kube API error: {}", desc),
-            // BallistaError::KubeAPIRequestError(ref desc) => {
-            //     write!(f, "KubeAPI request error: {}", desc)
-            // }
-            // BallistaError::KubeAPIResponseError(ref desc) => {
-            //     write!(f, "KubeAPI response error: {}", desc)
-            // }
             BallistaError::TonicError(desc) => write!(f, "Tonic error: {desc}"),
             BallistaError::GrpcError(desc) => write!(f, "Grpc error: {desc}"),
             BallistaError::GrpcConnectionError(desc) => {
@@ -220,6 +177,9 @@ impl Display for BallistaError {
                 )
             }
             BallistaError::Cancelled => write!(f, "Task cancelled"),
+            BallistaError::Configuration(desc) => {
+                write!(f, "Configuration error: {desc}")
+            }
         }
     }
 }
diff --git a/ballista/core/src/lib.rs b/ballista/core/src/lib.rs
index c2d92d353..7864d56ec 100644
--- a/ballista/core/src/lib.rs
+++ b/ballista/core/src/lib.rs
@@ -29,15 +29,14 @@ pub fn print_version() {
 pub mod client;
 pub mod config;
 pub mod consistent_hash;
+pub mod diagram;
 pub mod error;
 pub mod event_loop;
 pub mod execution_plans;
 pub mod extension;
 pub mod registry;
-pub mod utils;
-
-#[macro_use]
 pub mod serde;
+pub mod utils;
 
 ///
 /// [RuntimeProducer] is a factory which creates runtime [RuntimeEnv]
diff --git a/ballista/core/src/utils.rs b/ballista/core/src/utils.rs
index 1506c2bb5..14eeb9a21 100644
--- a/ballista/core/src/utils.rs
+++ b/ballista/core/src/utils.rs
@@ -17,9 +17,7 @@
 
 use crate::config::BallistaConfig;
 use crate::error::{BallistaError, Result};
-use crate::execution_plans::{
-    DistributedQueryExec, ShuffleWriterExec, UnresolvedShuffleExec,
-};
+use crate::execution_plans::DistributedQueryExec;
 use crate::extension::SessionConfigExt;
 use crate::serde::scheduler::PartitionStats;
 
@@ -32,29 +30,19 @@ use datafusion::arrow::ipc::writer::StreamWriter;
 use datafusion::arrow::ipc::CompressionType;
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::common::tree_node::{TreeNode, TreeNodeVisitor};
-use datafusion::datasource::physical_plan::{CsvExec, ParquetExec};
 use datafusion::error::DataFusionError;
 use datafusion::execution::context::{QueryPlanner, SessionConfig, SessionState};
 use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
 use datafusion::execution::session_state::SessionStateBuilder;
 use datafusion::logical_expr::{DdlStatement, LogicalPlan, TableScan};
-use datafusion::physical_plan::aggregates::AggregateExec;
-use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
-use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use datafusion::physical_plan::empty::EmptyExec;
-use datafusion::physical_plan::filter::FilterExec;
-use datafusion::physical_plan::joins::HashJoinExec;
 use datafusion::physical_plan::metrics::MetricsSet;
-use datafusion::physical_plan::projection::ProjectionExec;
-use datafusion::physical_plan::sorts::sort::SortExec;
 use datafusion::physical_plan::{metrics, ExecutionPlan, RecordBatchStream};
 use datafusion::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner};
 use datafusion_proto::logical_plan::{AsLogicalPlan, LogicalExtensionCodec};
 use futures::StreamExt;
 use log::error;
-use std::io::{BufWriter, Write};
 use std::marker::PhantomData;
-use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
 use std::time::{Duration, SystemTime, UNIX_EPOCH};
 use std::{fs::File, pin::Pin};
@@ -129,121 +117,6 @@ pub async fn collect_stream(
     Ok(batches)
 }
 
-pub fn produce_diagram(filename: &str, stages: &[Arc<ShuffleWriterExec>]) -> Result<()> {
-    let write_file = File::create(filename)?;
-    let mut w = BufWriter::new(&write_file);
-    writeln!(w, "digraph G {{")?;
-
-    // draw stages and entities
-    for stage in stages {
-        writeln!(w, "\tsubgraph cluster{} {{", stage.stage_id())?;
-        writeln!(w, "\t\tlabel = \"Stage {}\";", stage.stage_id())?;
-        let mut id = AtomicUsize::new(0);
-        build_exec_plan_diagram(
-            &mut w,
-            stage.children()[0].as_ref(),
-            stage.stage_id(),
-            &mut id,
-            true,
-        )?;
-        writeln!(w, "\t}}")?;
-    }
-
-    // draw relationships
-    for stage in stages {
-        let mut id = AtomicUsize::new(0);
-        build_exec_plan_diagram(
-            &mut w,
-            stage.children()[0].as_ref(),
-            stage.stage_id(),
-            &mut id,
-            false,
-        )?;
-    }
-
-    write!(w, "}}")?;
-    Ok(())
-}
-
-fn build_exec_plan_diagram(
-    w: &mut BufWriter<&File>,
-    plan: &dyn ExecutionPlan,
-    stage_id: usize,
-    id: &mut AtomicUsize,
-    draw_entity: bool,
-) -> Result<usize> {
-    let operator_str = if plan.as_any().downcast_ref::<AggregateExec>().is_some() {
-        "AggregateExec"
-    } else if plan.as_any().downcast_ref::<SortExec>().is_some() {
-        "SortExec"
-    } else if plan.as_any().downcast_ref::<ProjectionExec>().is_some() {
-        "ProjectionExec"
-    } else if plan.as_any().downcast_ref::<HashJoinExec>().is_some() {
-        "HashJoinExec"
-    } else if plan.as_any().downcast_ref::<ParquetExec>().is_some() {
-        "ParquetExec"
-    } else if plan.as_any().downcast_ref::<CsvExec>().is_some() {
-        "CsvExec"
-    } else if plan.as_any().downcast_ref::<FilterExec>().is_some() {
-        "FilterExec"
-    } else if plan.as_any().downcast_ref::<ShuffleWriterExec>().is_some() {
-        "ShuffleWriterExec"
-    } else if plan
-        .as_any()
-        .downcast_ref::<UnresolvedShuffleExec>()
-        .is_some()
-    {
-        "UnresolvedShuffleExec"
-    } else if plan
-        .as_any()
-        .downcast_ref::<CoalesceBatchesExec>()
-        .is_some()
-    {
-        "CoalesceBatchesExec"
-    } else if plan
-        .as_any()
-        .downcast_ref::<CoalescePartitionsExec>()
-        .is_some()
-    {
-        "CoalescePartitionsExec"
-    } else {
-        println!("Unknown: {plan:?}");
-        "Unknown"
-    };
-
-    let node_id = id.load(Ordering::SeqCst);
-    id.store(node_id + 1, Ordering::SeqCst);
-
-    if draw_entity {
-        writeln!(
-            w,
-            "\t\tstage_{stage_id}_exec_{node_id} [shape=box, label=\"{operator_str}\"];"
-        )?;
-    }
-    for child in plan.children() {
-        if let Some(shuffle) = child.as_any().downcast_ref::<UnresolvedShuffleExec>() {
-            if !draw_entity {
-                writeln!(
-                    w,
-                    "\tstage_{}_exec_1 -> stage_{}_exec_{};",
-                    shuffle.stage_id, stage_id, node_id
-                )?;
-            }
-        } else {
-            // relationships within same entity
-            let child_id =
-                build_exec_plan_diagram(w, child.as_ref(), stage_id, id, draw_entity)?;
-            if draw_entity {
-                writeln!(
-                    w,
-                    "\t\tstage_{stage_id}_exec_{child_id} -> stage_{stage_id}_exec_{node_id};"
-                )?;
-            }
-        }
-    }
-    Ok(node_id)
-}
-
 pub struct BallistaQueryPlanner<T: AsLogicalPlan> {
     scheduler_url: String,
     config: BallistaConfig,
diff --git a/ballista/executor/Cargo.toml b/ballista/executor/Cargo.toml
index abe256ebe..6a2dfa619 100644
--- a/ballista/executor/Cargo.toml
+++ b/ballista/executor/Cargo.toml
@@ -32,17 +32,18 @@ executor = "executor_config_spec.toml"
 [[bin]]
 name = "ballista-executor"
 path = "src/bin/main.rs"
+required-features = ["build-binary"]
 
 [features]
-default = ["mimalloc"]
+build-binary = ["configure_me", "tracing-subscriber", "tracing-appender", "tracing", "ballista-core/build-binary"]
+default = ["build-binary", "mimalloc"]
 
 [dependencies]
-anyhow = { workspace = true }
 arrow = { workspace = true }
 arrow-flight = { workspace = true }
 async-trait = { workspace = true }
 ballista-core = { path = "../core", version = "0.12.0" }
-configure_me = { workspace = true }
+configure_me = { workspace = true, optional = true }
 dashmap = { workspace = true }
 datafusion = { workspace = true }
 datafusion-proto = { workspace = true }
@@ -60,9 +61,9 @@ tokio = { workspace = true, features = [
 ] }
 tokio-stream = { workspace = true, features = ["net"] }
 tonic = { workspace = true }
-tracing = { workspace = true }
-tracing-appender = { workspace = true }
-tracing-subscriber = { workspace = true }
+tracing = { workspace = true, optional = true }
+tracing-appender = { workspace = true, optional = true }
+tracing-subscriber = { workspace = true, optional = true }
 uuid = { workspace = true }
 
 [dev-dependencies]
diff --git a/ballista/executor/build.rs b/ballista/executor/build.rs
index 7d2b9b87b..21ce2d8fe 100644
--- a/ballista/executor/build.rs
+++ b/ballista/executor/build.rs
@@ -15,10 +15,12 @@
 // specific language governing permissions and limitations
 // under the License.
 
-extern crate configure_me_codegen;
-
 fn main() -> Result<(), String> {
+    #[cfg(feature = "build-binary")]
     println!("cargo:rerun-if-changed=executor_config_spec.toml");
+    #[cfg(feature = "build-binary")]
     configure_me_codegen::build_script_auto()
-        .map_err(|e| format!("configure_me code generation failed: {e}"))
+        .map_err(|e| format!("configure_me code generation failed: {e}"))?;
+
+    Ok(())
 }
diff --git a/ballista/executor/src/bin/main.rs b/ballista/executor/src/bin/main.rs
index 2ab1a90c7..18abb9960 100644
--- a/ballista/executor/src/bin/main.rs
+++ b/ballista/executor/src/bin/main.rs
@@ -17,7 +17,6 @@
 
 //! Ballista Rust executor binary.
 
-use anyhow::Result;
 use ballista_core::config::LogRotationPolicy;
 use ballista_core::print_version;
 use ballista_executor::config::prelude::*;
@@ -33,7 +32,7 @@ use tracing_subscriber::EnvFilter;
 static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
 
 #[tokio::main]
-async fn main() -> Result<()> {
+async fn main() -> ballista_core::error::Result<()> {
     // parse command-line arguments
     let (opt, _remaining_args) =
         Config::including_optional_config_files(&["/etc/ballista/executor.toml"])
diff --git a/ballista/executor/src/config.rs b/ballista/executor/src/config.rs
index 65fa9d467..91b547327 100644
--- a/ballista/executor/src/config.rs
+++ b/ballista/executor/src/config.rs
@@ -21,6 +21,7 @@ use crate::executor_process::ExecutorProcessConfig;
 
 // Ideally we would use the include_config macro from configure_me, but then we cannot use
 // #[allow(clippy::all)] to silence clippy warnings from the generated code
+
 include!(concat!(env!("OUT_DIR"), "/executor_configure_me_config.rs"));
 
 impl TryFrom<Config> for ExecutorProcessConfig {
diff --git a/ballista/executor/src/execution_loop.rs b/ballista/executor/src/execution_loop.rs
index 649b366b4..2094425d7 100644
--- a/ballista/executor/src/execution_loop.rs
+++ b/ballista/executor/src/execution_loop.rs
@@ -77,16 +77,14 @@ pub async fn poll_loop
         let task_status: Vec<TaskStatus> = sample_tasks_status(&mut task_status_receiver).await;
 
-        let poll_work_result: anyhow::Result<
-            tonic::Response<PollWorkResult>,
-            tonic::Status,
-        > = scheduler
-            .poll_work(PollWorkParams {
-                metadata: Some(executor.metadata.clone()),
-                num_free_slots: available_task_slots.available_permits() as u32,
-                task_status,
-            })
-            .await;
+        let poll_work_result: Result<tonic::Response<PollWorkResult>, tonic::Status> =
+            scheduler
+                .poll_work(PollWorkParams {
+                    metadata: Some(executor.metadata.clone()),
+                    num_free_slots: available_task_slots.available_permits() as u32,
+                    task_status,
+                })
+                .await;
 
         match poll_work_result {
             Ok(result) => {
@@ -274,7 +272,7 @@ async fn sample_tasks_status(
 
     loop {
         match task_status_receiver.try_recv() {
-            anyhow::Result::Ok(status) => {
+            Result::Ok(status) => {
                 task_status.push(status);
             }
             Err(TryRecvError::Empty) => {
diff --git a/ballista/executor/src/executor.rs b/ballista/executor/src/executor.rs
index 1ebf3e56f..1b029e171 100644
--- a/ballista/executor/src/executor.rs
+++ b/ballista/executor/src/executor.rs
@@ -215,13 +215,13 @@ impl Executor {
 mod test {
     use crate::execution_engine::DefaultQueryStageExec;
     use crate::executor::Executor;
-    use arrow::datatypes::{Schema, SchemaRef};
-    use arrow::record_batch::RecordBatch;
     use ballista_core::execution_plans::ShuffleWriterExec;
     use ballista_core::serde::protobuf::ExecutorRegistration;
     use ballista_core::serde::scheduler::PartitionId;
     use ballista_core::utils::default_config_producer;
     use ballista_core::RuntimeProducer;
+    use datafusion::arrow::datatypes::{Schema, SchemaRef};
+    use datafusion::arrow::record_batch::RecordBatch;
     use datafusion::error::{DataFusionError, Result};
     use datafusion::execution::context::TaskContext;
diff --git a/ballista/executor/src/executor_process.rs b/ballista/executor/src/executor_process.rs
index ed6902881..e350f391e 100644
--- a/ballista/executor/src/executor_process.rs
+++ b/ballista/executor/src/executor_process.rs
@@ -22,7 +22,6 @@ use std::sync::atomic::Ordering;
 use std::sync::Arc;
 use std::time::{Duration, Instant, UNIX_EPOCH};
 
-use anyhow::{Context, Result};
 use arrow_flight::flight_service_server::FlightServiceServer;
 use ballista_core::registry::BallistaFunctionRegistry;
 use datafusion_proto::logical_plan::LogicalExtensionCodec;
@@ -148,11 +147,13 @@ impl Default for ExecutorProcessConfig {
     }
 }
 
-pub async fn start_executor_process(opt: Arc<ExecutorProcessConfig>) -> Result<()> {
+pub async fn start_executor_process(
+    opt: Arc<ExecutorProcessConfig>,
+) -> ballista_core::error::Result<()> {
     let addr = format!("{}:{}", opt.bind_host, opt.port);
-    let addr = addr
-        .parse()
-        .with_context(|| format!("Could not parse address: {addr}"))?;
+    let addr = addr.parse().map_err(|e: std::net::AddrParseError| {
+        BallistaError::Configuration(e.to_string())
+    })?;
 
     let scheduler_host = opt.scheduler_host.clone();
     let scheduler_port = opt.scheduler_port;
@@ -237,7 +238,11 @@ pub async fn start_executor_process(opt: Arc<ExecutorProcessConfig>) -> Result<(
     let connection = if connect_timeout == 0 {
         create_grpc_client_connection(scheduler_url)
             .await
-            .context("Could not connect to scheduler")
+            .map_err(|_| {
+                BallistaError::GrpcConnectionError(
+                    "Could not connect to scheduler".to_string(),
+                )
+            })
     } else {
         // this feature was added to support docker-compose so that we can have the executor
         // wait for the scheduler to start, or at least run for 10 seconds before failing so
@@ -249,8 +254,11 @@ pub async fn start_executor_process(opt: Arc<ExecutorProcessConfig>) -> Result<(
         {
             match create_grpc_client_connection(scheduler_url.clone())
                 .await
-                .context("Could not connect to scheduler")
-            {
+                .map_err(|_| {
+                    BallistaError::GrpcConnectionError(
+                        "Could not connect to scheduler".to_string(),
+                    )
+                }) {
                 Ok(conn) => {
                     info!("Connected to scheduler at {}", scheduler_url);
                     x = Some(conn);
@@ -268,8 +276,7 @@ pub async fn start_executor_process(opt: Arc<ExecutorProcessConfig>) -> Result<(
             Some(conn) => Ok(conn),
             _ => Err(BallistaError::General(format!(
                 "Timed out attempting to connect to scheduler at {scheduler_url}"
-            ))
-            .into()),
+            ))),
         }
     }?;
@@ -489,7 +496,10 @@ async fn check_services(
 
 /// This function will be scheduled periodically for cleanup of the job shuffle data left on the executor.
 /// Only directories will be checked and cleaned.
-async fn clean_shuffle_data_loop(work_dir: &str, seconds: u64) -> Result<()> {
+async fn clean_shuffle_data_loop(
+    work_dir: &str,
+    seconds: u64,
+) -> ballista_core::error::Result<()> {
     let mut dir = fs::read_dir(work_dir).await?;
     let mut to_deleted = Vec::new();
     while let Some(child) = dir.next_entry().await? {
@@ -527,7 +537,7 @@ async fn clean_shuffle_data_loop(work_dir: &str, seconds: u64) -> Result<()> {
 }
 
 /// This function will clean up all shuffle data on this executor
-async fn clean_all_shuffle_data(work_dir: &str) -> Result<()> {
+async fn clean_all_shuffle_data(work_dir: &str) -> ballista_core::error::Result<()> {
     let mut dir = fs::read_dir(work_dir).await?;
     let mut to_deleted = Vec::new();
     while let Some(child) = dir.next_entry().await? {
@@ -552,7 +562,10 @@ async fn clean_all_shuffle_data(work_dir: &str) -> Result<()> {
 
 /// Determines if a directory contains files newer than the cutoff time.
 /// If this returns true, the directory contains files newer than the cutoff time; it satisfies the TTL and should not be deleted.
-pub async fn satisfy_dir_ttl(dir: DirEntry, ttl_seconds: u64) -> Result<bool> {
+pub async fn satisfy_dir_ttl(
+    dir: DirEntry,
+    ttl_seconds: u64,
+) -> ballista_core::error::Result<bool> {
     let cutoff = get_time_before(ttl_seconds);
 
     let mut to_check = vec![dir];
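The `map_err` shape above is the recurring replacement for `anyhow::Context` throughout this patch. A standalone sketch of the pattern (`ConfigError` is a stand-in for `BallistaError`):

// Standalone sketch; `ConfigError` stands in for BallistaError.
use std::net::SocketAddr;

#[derive(Debug)]
enum ConfigError {
    Configuration(String),
}

fn parse_bind_addr(host: &str, port: u16) -> Result<SocketAddr, ConfigError> {
    let addr = format!("{host}:{port}");
    // Annotating the closure argument lets type inference resolve which
    // FromStr implementation `parse` should use, as executor_process.rs does.
    addr.parse().map_err(|e: std::net::AddrParseError| {
        ConfigError::Configuration(format!("Could not parse address {addr}: {e}"))
    })
}

fn main() {
    match parse_bind_addr("127.0.0.1", 50051) {
        Ok(addr) => println!("binding to {addr}"),
        Err(e) => eprintln!("{e:?}"),
    }
}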
diff --git a/ballista/executor/src/flight_service.rs b/ballista/executor/src/flight_service.rs
index a96a752c2..939b5a8f5 100644
--- a/ballista/executor/src/flight_service.rs
+++ b/ballista/executor/src/flight_service.rs
@@ -17,24 +17,24 @@
 
 //! Implementation of the Apache Arrow Flight protocol that wraps an executor.
 
-use arrow::ipc::reader::StreamReader;
+use datafusion::arrow::ipc::reader::StreamReader;
 use std::convert::TryFrom;
 use std::fs::File;
 use std::pin::Pin;
 
-use arrow::ipc::CompressionType;
 use arrow_flight::encode::FlightDataEncoderBuilder;
 use arrow_flight::error::FlightError;
 use ballista_core::error::BallistaError;
 use ballista_core::serde::decode_protobuf;
 use ballista_core::serde::scheduler::Action as BallistaAction;
+use datafusion::arrow::ipc::CompressionType;
 
-use arrow::ipc::writer::IpcWriteOptions;
 use arrow_flight::{
     flight_service_server::FlightService, Action, ActionType, Criteria, Empty,
     FlightData, FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse,
     PollInfo, PutResult, SchemaResult, Ticket,
 };
+use datafusion::arrow::ipc::writer::IpcWriteOptions;
 use datafusion::arrow::{error::ArrowError, record_batch::RecordBatch};
 use futures::{Stream, StreamExt, TryStreamExt};
 use log::{debug, info};
@@ -45,7 +45,6 @@ use tokio::{sync::mpsc::Sender, task};
 use tokio_stream::wrappers::ReceiverStream;
 use tonic::metadata::MetadataValue;
 use tonic::{Request, Response, Status, Streaming};
-use tracing::warn;
 
 /// Service implementing the Apache Arrow Flight Protocol
 #[derive(Clone)]
@@ -103,7 +102,10 @@ impl FlightService for BallistaFlightService {
         let schema = reader.schema();
         task::spawn_blocking(move || {
             if let Err(e) = read_partition(reader, tx) {
-                warn!(error = %e, "error streaming shuffle partition");
+                log::warn!(
+                    "error streaming shuffle partition: {}",
+                    e.to_string()
+                );
             }
         });
diff --git a/ballista/executor/src/lib.rs b/ballista/executor/src/lib.rs
index f0284cbdb..23e68f85c 100644
--- a/ballista/executor/src/lib.rs
+++ b/ballista/executor/src/lib.rs
@@ -18,6 +18,7 @@
 #![doc = include_str!("../README.md")]
 
 pub mod collect;
+#[cfg(feature = "build-binary")]
 pub mod config;
 pub mod execution_engine;
 pub mod execution_loop;
diff --git a/ballista/scheduler/Cargo.toml b/ballista/scheduler/Cargo.toml
index ad3e09636..fc3ca09a8 100644
--- a/ballista/scheduler/Cargo.toml
+++ b/ballista/scheduler/Cargo.toml
@@ -32,34 +32,33 @@ scheduler = "scheduler_config_spec.toml"
 [[bin]]
 name = "ballista-scheduler"
 path = "src/bin/main.rs"
+required-features = ["build-binary"]
 
 [features]
-default = []
-flight-sql = []
+build-binary = ["configure_me", "clap", "tracing-subscriber", "tracing-appender", "tracing", "ballista-core/build-binary"]
+default = ["build-binary"]
+flight-sql = ["base64"]
 keda-scaler = []
 prometheus-metrics = ["prometheus", "once_cell"]
 rest-api = []
 
 [dependencies]
-anyhow = { workspace = true }
 arrow-flight = { workspace = true }
 async-trait = { workspace = true }
 axum = "0.7.7"
 ballista-core = { path = "../core", version = "0.12.0" }
-base64 = { version = "0.22" }
-clap = { workspace = true }
-configure_me = { workspace = true }
+base64 = { version = "0.22", optional = true }
+clap = { workspace = true, optional = true }
+configure_me = { workspace = true, optional = true }
 dashmap = { workspace = true }
 datafusion = { workspace = true }
 datafusion-proto = { workspace = true }
 futures = { workspace = true }
-graphviz-rust = "0.9.0"
 http = "1.1"
 log = { workspace = true }
 object_store = { workspace = true }
 once_cell = { version = "1.16.0", optional = true }
 parking_lot = { workspace = true }
-parse_arg = { workspace = true }
 prometheus = { version = "0.13", features = ["process"], optional = true }
 prost = { workspace = true }
 prost-types = { workspace = true }
@@ -68,13 +67,12 @@ serde = { workspace = true, features = ["derive"] }
 tokio = { workspace = true, features = ["full"] }
 tokio-stream = { workspace = true, features = ["net"] }
 tonic = { workspace = true }
-tracing = { workspace = true }
-tracing-appender = { workspace = true }
-tracing-subscriber = { workspace = true }
+tracing = { workspace = true, optional = true }
+tracing-appender = { workspace = true, optional = true }
+tracing-subscriber = { workspace = true, optional = true }
 uuid = { workspace = true }
 
 [dev-dependencies]
-ballista-core = { path = "../core", version = "0.12.0" }
 
 [build-dependencies]
 configure_me_codegen = { workspace = true }
diff --git a/ballista/scheduler/build.rs b/ballista/scheduler/build.rs
index 5a3e00cc1..9f2f123f2 100644
--- a/ballista/scheduler/build.rs
+++ b/ballista/scheduler/build.rs
@@ -15,10 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
-extern crate configure_me_codegen;
-
 fn main() -> Result<(), String> {
+    #[cfg(feature = "build-binary")]
     println!("cargo:rerun-if-changed=scheduler_config_spec.toml");
+    #[cfg(feature = "build-binary")]
     configure_me_codegen::build_script_auto()
         .map_err(|e| format!("configure_me code generation failed: {e}"))?;
diff --git a/ballista/scheduler/src/bin/main.rs b/ballista/scheduler/src/bin/main.rs
index f6a063284..ea31810a9 100644
--- a/ballista/scheduler/src/bin/main.rs
+++ b/ballista/scheduler/src/bin/main.rs
@@ -17,8 +17,8 @@
 
 //! Ballista Rust scheduler binary.
 
-use anyhow::Result;
 use ballista_core::config::LogRotationPolicy;
+use ballista_core::error::BallistaError;
 use ballista_core::print_version;
 use ballista_scheduler::cluster::BallistaCluster;
 use ballista_scheduler::config::{Config, ResultExt};
@@ -27,7 +27,7 @@ use std::sync::Arc;
 use std::{env, io};
 use tracing_subscriber::EnvFilter;
 
-fn main() -> Result<()> {
+fn main() -> ballista_core::error::Result<()> {
     let runtime = tokio::runtime::Builder::new_multi_thread()
         .enable_io()
         .enable_time()
@@ -37,7 +37,7 @@ fn main() -> Result<()> {
     runtime.block_on(inner())
 }
 
-async fn inner() -> Result<()> {
+async fn inner() -> ballista_core::error::Result<()> {
     // parse options
     let (opt, _remaining_args) =
         Config::including_optional_config_files(&["/etc/ballista/scheduler.toml"])
@@ -85,7 +85,10 @@ async fn inner() -> Result<()> {
         tracing.init();
     }
     let addr = format!("{}:{}", opt.bind_host, opt.bind_port);
-    let addr = addr.parse()?;
+    let addr = addr.parse().map_err(|e: std::net::AddrParseError| {
+        BallistaError::Configuration(e.to_string())
+    })?;
+
     let config = opt.try_into()?;
     let cluster = BallistaCluster::new_from_config(&config).await?;
     start_server(cluster, addr, Arc::new(config)).await?;
diff --git a/ballista/scheduler/src/cluster/memory.rs b/ballista/scheduler/src/cluster/memory.rs
index c9eac5640..07f646b8c 100644
--- a/ballista/scheduler/src/cluster/memory.rs
+++ b/ballista/scheduler/src/cluster/memory.rs
@@ -37,7 +37,7 @@ use crate::scheduler_server::{timestamp_millis, timestamp_secs, SessionBuilder};
 use crate::state::session_manager::create_datafusion_context;
 use crate::state::task_manager::JobInfoCache;
 use ballista_core::serde::protobuf::job_status::Status;
-use log::{error, info, warn};
+use log::{debug, error, info, warn};
 
 use std::collections::{HashMap, HashSet};
 use std::ops::DerefMut;
@@ -45,7 +45,6 @@ use ballista_core::consistent_hash::node::Node;
 use datafusion::physical_plan::ExecutionPlan;
 use std::sync::Arc;
 use tokio::sync::{Mutex, MutexGuard};
-use tracing::debug;
 
 #[derive(Default)]
 pub struct InMemoryClusterState {
diff --git a/ballista/scheduler/src/cluster/mod.rs b/ballista/scheduler/src/cluster/mod.rs
index 94f86969e..c54b0ceae 100644
--- a/ballista/scheduler/src/cluster/mod.rs
+++ b/ballista/scheduler/src/cluster/mod.rs
@@ -16,7 +16,6 @@
 // under the License.
 
 use std::collections::{HashMap, HashSet};
-use std::fmt;
 use std::pin::Pin;
 use std::sync::Arc;
 
@@ -69,9 +68,9 @@ impl std::str::FromStr for ClusterStorage {
         ValueEnum::from_str(s, true)
     }
 }
-
-impl parse_arg::ParseArgFromStr for ClusterStorage {
-    fn describe_type<W: fmt::Write>(mut writer: W) -> fmt::Result {
+#[cfg(feature = "build-binary")]
+impl configure_me::parse_arg::ParseArgFromStr for ClusterStorage {
+    fn describe_type<W: std::fmt::Write>(mut writer: W) -> std::fmt::Result {
         write!(writer, "The cluster storage backend for the scheduler")
     }
 }
diff --git a/ballista/scheduler/src/config.rs b/ballista/scheduler/src/config.rs
index 10c6df1db..b221ecb65 100644
--- a/ballista/scheduler/src/config.rs
+++ b/ballista/scheduler/src/config.rs
@@ -19,12 +19,12 @@
 //! Ballista scheduler specific configuration
 
 use crate::SessionBuilder;
-use ballista_core::{config::TaskSchedulingPolicy, error::BallistaError, ConfigProducer};
-use clap::ValueEnum;
+use ballista_core::{config::TaskSchedulingPolicy, ConfigProducer};
 use datafusion_proto::logical_plan::LogicalExtensionCodec;
 use datafusion_proto::physical_plan::PhysicalExtensionCodec;
-use std::{fmt, sync::Arc};
+use std::sync::Arc;
 
+#[cfg(feature = "build-binary")]
 include!(concat!(
     env!("OUT_DIR"),
     "/scheduler_configure_me_config.rs"
@@ -83,57 +83,6 @@ pub struct SchedulerConfig {
     pub override_physical_codec: Option<Arc<dyn PhysicalExtensionCodec>>,
 }
 
-impl std::fmt::Debug for SchedulerConfig {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        f.debug_struct("SchedulerConfig")
-            .field("namespace", &self.namespace)
-            .field("external_host", &self.external_host)
-            .field("bind_port", &self.bind_port)
-            .field("bind_host", &self.bind_host)
-            .field("scheduling_policy", &self.scheduling_policy)
-            .field("event_loop_buffer_size", &self.event_loop_buffer_size)
-            .field("task_distribution", &self.task_distribution)
-            .field(
-                "finished_job_data_clean_up_interval_seconds",
-                &self.finished_job_data_clean_up_interval_seconds,
-            )
-            .field(
-                "finished_job_state_clean_up_interval_seconds",
-                &self.finished_job_state_clean_up_interval_seconds,
-            )
-            .field(
-                "advertise_flight_sql_endpoint",
-                &self.advertise_flight_sql_endpoint,
-            )
-            .field("job_resubmit_interval_ms", &self.job_resubmit_interval_ms)
-            .field("cluster_storage", &self.cluster_storage)
-            .field(
-                "executor_termination_grace_period",
-                &self.executor_termination_grace_period,
-            )
-            .field(
-                "scheduler_event_expected_processing_duration",
-                &self.scheduler_event_expected_processing_duration,
-            )
-            .field(
-                "grpc_server_max_decoding_message_size",
-                &self.grpc_server_max_decoding_message_size,
-            )
-            .field(
-                "grpc_server_max_encoding_message_size",
-                &self.grpc_server_max_encoding_message_size,
-            )
-            .field("executor_timeout_seconds", &self.executor_timeout_seconds)
-            .field(
-                "expire_dead_executor_interval_seconds",
-                &self.expire_dead_executor_interval_seconds,
-            )
-            .field("override_logical_codec", &self.override_logical_codec)
-            .field("override_physical_codec", &self.override_physical_codec)
-            .finish()
-    }
-}
-
 impl Default for SchedulerConfig {
     fn default() -> Self {
         Self {
@@ -261,7 +210,8 @@ pub enum ClusterStorageConfig {
 /// Policy of distributing tasks to available executor slots
 ///
 /// It needs to be visible to code generated by configure_me
-#[derive(Clone, ValueEnum, Copy, Debug, serde::Deserialize)]
+#[derive(Clone, Copy, Debug, serde::Deserialize)]
+#[cfg_attr(feature = "build-binary", derive(clap::ValueEnum))]
 pub enum TaskDistribution {
     /// Eagerly assign tasks to executor slots. This will assign as many task slots per executor
     /// as are currently available
@@ -276,16 +226,18 @@ pub enum TaskDistribution {
     ConsistentHash,
 }
 
+#[cfg(feature = "build-binary")]
 impl std::str::FromStr for TaskDistribution {
     type Err = String;
 
     fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
-        ValueEnum::from_str(s, true)
+        clap::ValueEnum::from_str(s, true)
     }
 }
 
-impl parse_arg::ParseArgFromStr for TaskDistribution {
-    fn describe_type<W: fmt::Write>(mut writer: W) -> fmt::Result {
+#[cfg(feature = "build-binary")]
+impl configure_me::parse_arg::ParseArgFromStr for TaskDistribution {
+    fn describe_type<W: std::fmt::Write>(mut writer: W) -> std::fmt::Result {
         write!(writer, "The executor slots policy for the scheduler")
     }
 }
@@ -308,9 +260,9 @@ pub enum TaskDistributionPolicy {
         tolerance: usize,
     },
 }
-
+#[cfg(feature = "build-binary")]
 impl TryFrom<Config> for SchedulerConfig {
-    type Error = BallistaError;
+    type Error = ballista_core::error::BallistaError;
 
     fn try_from(opt: Config) -> Result<Self, Self::Error> {
         let task_distribution = match opt.task_distribution {
diff --git a/ballista/scheduler/src/scheduler_process.rs b/ballista/scheduler/src/scheduler_process.rs
index 393b03b62..bf6d484f0 100644
--- a/ballista/scheduler/src/scheduler_process.rs
+++ b/ballista/scheduler/src/scheduler_process.rs
@@ -15,9 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use anyhow::{Error, Result};
 #[cfg(feature = "flight-sql")]
 use arrow_flight::flight_service_server::FlightServiceServer;
+use ballista_core::error::BallistaError;
 use ballista_core::serde::protobuf::scheduler_grpc_server::SchedulerGrpcServer;
 use ballista_core::serde::{
     BallistaCodec, BallistaLogicalExtensionCodec, BallistaPhysicalExtensionCodec,
@@ -43,7 +43,7 @@ pub async fn start_server(
     cluster: BallistaCluster,
     addr: SocketAddr,
     config: Arc<SchedulerConfig>,
-) -> Result<()> {
+) -> ballista_core::error::Result<()> {
     info!(
         "Ballista v{} Scheduler listening on {:?}",
         BALLISTA_VERSION, addr
@@ -109,9 +109,9 @@ pub async fn start_server(
 
     let listener = tokio::net::TcpListener::bind(&addr)
         .await
-        .map_err(Error::from)?;
+        .map_err(BallistaError::from)?;
 
     axum::serve(listener, final_route)
         .await
-        .map_err(Error::from)
+        .map_err(BallistaError::from)
 }
diff --git a/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs b/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
index c3f3e7eb8..b9b49c7fe 100644
--- a/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
+++ b/ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs
@@ -359,14 +359,9 @@ mod tests {
     use datafusion::test_util::scan_empty_with_partitions;
     use std::sync::Arc;
     use std::time::Duration;
-    use tracing_subscriber::EnvFilter;
 
     #[tokio::test]
     async fn test_pending_job_metric() -> Result<()> {
-        tracing_subscriber::fmt()
-            .with_env_filter(EnvFilter::from_default_env())
-            .init();
-
         let plan = test_plan(10);
 
         let metrics_collector = Arc::new(TestMetricsCollector::default());
diff --git a/ballista/scheduler/src/state/task_manager.rs b/ballista/scheduler/src/state/task_manager.rs
index 2e5b76b48..cc8442f2f 100644
--- a/ballista/scheduler/src/state/task_manager.rs
+++ b/ballista/scheduler/src/state/task_manager.rs
@@ -38,7 +38,7 @@ use dashmap::DashMap;
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion_proto::logical_plan::AsLogicalPlan;
 use datafusion_proto::physical_plan::AsExecutionPlan;
-use log::{debug, error, info, warn};
+use log::{debug, error, info, trace, warn};
 use rand::distributions::Alphanumeric;
 use rand::{thread_rng, Rng};
 use std::collections::{HashMap, HashSet};
@@ -48,8 +48,6 @@ use std::time::Duration;
 use std::time::{SystemTime, UNIX_EPOCH};
 use tokio::sync::RwLock;
 
-use tracing::trace;
-
 type ActiveJobCache = Arc<DashMap<String, JobInfoCache>>;
 
 // TODO move to configuration file
diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml
index 4701e9c3c..941ec8498 100644
--- a/benchmarks/Cargo.toml
+++ b/benchmarks/Cargo.toml
@@ -25,7 +25,6 @@ homepage = "https://github.com/apache/arrow-ballista"
 repository = "https://github.com/apache/arrow-ballista"
 license = "Apache-2.0"
 publish = false
-rust-version = "1.72"
 
 [features]
 ci = []
diff --git a/examples/Cargo.toml b/examples/Cargo.toml
index 65d9cd946..743ff8264 100644
--- a/examples/Cargo.toml
+++ b/examples/Cargo.toml
@@ -26,7 +26,6 @@ license = "Apache-2.0"
 keywords = ["arrow", "distributed", "query", "sql"]
 edition = "2021"
 publish = false
-rust-version = "1.72"
 
 [[example]]
 name = "standalone_sql"
@@ -34,11 +33,10 @@ path = "examples/standalone-sql.rs"
 required-features = ["ballista/standalone"]
 
 [dependencies]
-anyhow = { workspace = true }
 ballista = { path = "../ballista/client", version = "0.12.0" }
 ballista-core = { path = "../ballista/core", version = "0.12.0" }
-ballista-executor = { path = "../ballista/executor", version = "0.12.0" }
-ballista-scheduler = { path = "../ballista/scheduler", version = "0.12.0" }
+ballista-executor = { path = "../ballista/executor", version = "0.12.0", default-features = false }
+ballista-scheduler = { path = "../ballista/scheduler", version = "0.12.0", default-features = false }
 datafusion = { workspace = true }
 env_logger = { workspace = true }
 log = { workspace = true }
diff --git a/examples/examples/custom-executor.rs b/examples/examples/custom-executor.rs
index df3f7c241..534182121 100644
--- a/examples/examples/custom-executor.rs
+++ b/examples/examples/custom-executor.rs
@@ -15,11 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use anyhow::Result;
 use ballista_examples::object_store::{
     custom_runtime_env_with_s3_support, custom_session_config_with_s3_options,
 };
-use ballista_executor::config::prelude::*;
+
 use ballista_executor::executor_process::{
     start_executor_process, ExecutorProcessConfig,
 };
@@ -31,34 +30,23 @@ use std::sync::Arc;
 /// This example demonstrates how to create custom ballista executors.
 ///
 #[tokio::main]
-async fn main() -> Result<()> {
+async fn main() -> ballista_core::error::Result<()> {
     let _ = env_logger::builder()
         .filter_level(log::LevelFilter::Info)
         .is_test(true)
         .try_init();
 
-    let (opt, _remaining_args) =
-        Config::including_optional_config_files(&["/etc/ballista/executor.toml"])
-            .unwrap_or_exit();
-
-    if opt.version {
-        ballista_core::print_version();
-        std::process::exit(0);
-    }
-
-    let mut config: ExecutorProcessConfig = opt.try_into().unwrap();
-
-    // overriding default config producer with custom producer
-    // which has required S3 configuration options
-    config.override_config_producer =
-        Some(Arc::new(custom_session_config_with_s3_options));
-
-    // overriding default runtime producer with custom producer
-    // which knows how to create S3 connections
-    config.override_runtime_producer =
-        Some(Arc::new(|session_config: &SessionConfig| {
+    let config: ExecutorProcessConfig = ExecutorProcessConfig {
+        // overriding default config producer with custom producer
+        // which has required S3 configuration options
+        override_config_producer: Some(Arc::new(custom_session_config_with_s3_options)),
+        // overriding default runtime producer with custom producer
+        // which knows how to create S3 connections
+        override_runtime_producer: Some(Arc::new(|session_config: &SessionConfig| {
             custom_runtime_env_with_s3_support(session_config)
-        }));
+        })),
+        ..Default::default()
+    };
 
     start_executor_process(Arc::new(config)).await
 }
diff --git a/examples/examples/custom-scheduler.rs b/examples/examples/custom-scheduler.rs
index 30aeb3e3f..9783ae28e 100644
--- a/examples/examples/custom-scheduler.rs
+++ b/examples/examples/custom-scheduler.rs
@@ -15,52 +15,46 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use anyhow::Result;
-use ballista_core::print_version;
+use ballista_core::error::BallistaError;
 use ballista_examples::object_store::{
     custom_session_config_with_s3_options, custom_session_state_with_s3_support,
 };
 use ballista_scheduler::cluster::BallistaCluster;
-use ballista_scheduler::config::{Config, ResultExt, SchedulerConfig};
+use ballista_scheduler::config::SchedulerConfig;
 use ballista_scheduler::scheduler_process::start_server;
 use datafusion::prelude::SessionConfig;
+use std::net::AddrParseError;
 use std::sync::Arc;
 
 ///
 /// # Custom Ballista Scheduler
 ///
-/// This example demonstrates how to crate custom made ballista schedulers.
+/// This example demonstrates how to create custom ballista schedulers.
 ///
 #[tokio::main]
-async fn main() -> Result<()> {
+async fn main() -> ballista_core::error::Result<()> {
     let _ = env_logger::builder()
         .filter_level(log::LevelFilter::Info)
         .is_test(true)
         .try_init();
 
-    // parse options
-    let (opt, _remaining_args) =
-        Config::including_optional_config_files(&["/etc/ballista/scheduler.toml"])
-            .unwrap_or_exit();
+    let config: SchedulerConfig = SchedulerConfig {
+        // overriding default runtime producer with custom producer
+        // which knows how to create S3 connections
+        override_config_producer: Some(Arc::new(custom_session_config_with_s3_options)),
+        // overriding default session builder, which has custom session configuration
+        // runtime environment and session state.
+        override_session_builder: Some(Arc::new(|session_config: SessionConfig| {
+            custom_session_state_with_s3_support(session_config)
+        })),
+        ..Default::default()
+    };
 
-    if opt.version {
-        print_version();
-        std::process::exit(0);
-    }
+    let addr = format!("{}:{}", config.bind_host, config.bind_port);
+    let addr = addr
+        .parse()
+        .map_err(|e: AddrParseError| BallistaError::Configuration(e.to_string()))?;
 
-    let addr = format!("{}:{}", opt.bind_host, opt.bind_port);
-    let addr = addr.parse()?;
-    let mut config: SchedulerConfig = opt.try_into()?;
-
-    // overriding default runtime producer with custom producer
-    // which knows how to create S3 connections
-    config.override_config_producer =
-        Some(Arc::new(custom_session_config_with_s3_options));
-    // overriding default session builder, which has custom session configuration
-    // runtime environment and session state.
-    config.override_session_builder = Some(Arc::new(|session_config: SessionConfig| {
-        custom_session_state_with_s3_support(session_config)
-    }));
     let cluster = BallistaCluster::new_from_config(&config).await?;
 
     start_server(cluster, addr, Arc::new(config)).await?;
diff --git a/python/Cargo.toml b/python/Cargo.toml
index 747f330a9..f70838226 100644
--- a/python/Cargo.toml
+++ b/python/Cargo.toml
@@ -25,7 +25,6 @@ description = "Apache Arrow Ballista Python Client"
 readme = "README.md"
 license = "Apache-2.0"
 edition = "2021"
-rust-version = "1.72"
 include = ["/src", "/ballista", "/LICENSE.txt", "pyproject.toml", "Cargo.toml", "Cargo.lock"]
 publish = false
 
@@ -33,15 +32,15 @@ publish = false
 async-trait = "0.1.77"
 ballista = { path = "../ballista/client", version = "0.12.0" }
 ballista-core = { path = "../ballista/core", version = "0.12.0" }
-ballista-executor = { path = "../ballista/executor", version = "0.12.0" }
-ballista-scheduler = { path = "../ballista/scheduler", version = "0.12.0" }
+ballista-executor = { path = "../ballista/executor", version = "0.12.0", default-features = false }
+ballista-scheduler = { path = "../ballista/scheduler", version = "0.12.0", default-features = false }
 datafusion = { version = "42", features = ["pyarrow", "avro"] }
 datafusion-proto = { version = "42" }
 datafusion-python = { version = "42" }
 
 pyo3 = { version = "0.22", features = ["extension-module", "abi3", "abi3-py38"] }
 pyo3-log = "0.11.0"
-tokio = { version = "1.35", features = ["macros", "rt", "rt-multi-thread", "sync"] }
+tokio = { version = "1.42", features = ["macros", "rt", "rt-multi-thread", "sync"] }
 
 [lib]
 crate-type = ["cdylib"]
diff --git a/python/src/cluster.rs b/python/src/cluster.rs
index aa4260ce2..848fc4888 100644
--- a/python/src/cluster.rs
+++ b/python/src/cluster.rs
@@ -128,8 +128,9 @@ impl PyScheduler {
 
     pub fn __repr__(&self) -> String {
         format!(
-            "BallistaScheduler(config={:?}, listening= {})",
-            self.config,
+            "BallistaScheduler(listening address={}:{}, listening= {})",
+            self.config.bind_host,
+            self.config.bind_port,
             self.handle.is_some()
         )
     }
@@ -246,18 +247,19 @@ impl PyExecutor {
                 self.config.bind_host,
                 self.config.port,
                 self.config.scheduler_host,
-                self.config.scheduler_port
+                self.config.scheduler_port,
             ),
         }
     }
 
     pub fn __repr__(&self) -> String {
         format!(
-            "BallistaExecutor(address={}:{}, scheduler={}:{}, listening={})",
+            "BallistaExecutor(address={}:{}, scheduler={}:{}, concurrent_tasks={}, listening={})",
             self.config.bind_host,
             self.config.port,
             self.config.scheduler_host,
             self.config.scheduler_port,
+            self.config.concurrent_tasks,
             self.handle.is_some()
        )
    }