diff --git a/Cargo.toml b/Cargo.toml index f8e46de5e..6f34fa069 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,18 +29,18 @@ members = [ resolver = "2" [workspace.dependencies] -arrow = { version = "46.0.0" } -arrow-flight = { version = "46.0.0", features = ["flight-sql-experimental"] } -arrow-schema = { version = "46.0.0", default-features = false } +arrow = { version = "47.0.0" } +arrow-flight = { version = "47.0.0", features = ["flight-sql-experimental"] } +arrow-schema = { version = "47.0.0", default-features = false } configure_me = { version = "0.4.0" } configure_me_codegen = { version = "0.4.4" } -datafusion = "31.0.0" -datafusion-cli = "31.0.0" -datafusion-proto = "31.0.0" +datafusion = "32.0.0" +datafusion-cli = "32.0.0" +datafusion-proto = "32.0.0" object_store = "0.7.0" -sqlparser = "0.37.0" -tonic = { version = "0.9" } -tonic-build = { version = "0.9", default-features = false, features = [ +sqlparser = "0.38.0" +tonic = { version = "0.10" } +tonic-build = { version = "0.10", default-features = false, features = [ "transport", "prost" ] } diff --git a/ballista-cli/src/main.rs b/ballista-cli/src/main.rs index 3f8f9ba87..677504291 100644 --- a/ballista-cli/src/main.rs +++ b/ballista-cli/src/main.rs @@ -23,6 +23,7 @@ use ballista_cli::{ exec, print_format::PrintFormat, print_options::PrintOptions, BALLISTA_CLI_VERSION, }; use clap::Parser; +use datafusion_cli::print_options::MaxRows; use mimalloc::MiMalloc; #[global_allocator] @@ -133,6 +134,7 @@ pub async fn main() -> Result<()> { let mut print_options = PrintOptions { format: args.format, quiet: args.quiet, + maxrows: MaxRows::Unlimited, }; let files = args.file; diff --git a/ballista/client/src/context.rs b/ballista/client/src/context.rs index a0671acf2..76c8d439f 100644 --- a/ballista/client/src/context.rs +++ b/ballista/client/src/context.rs @@ -357,7 +357,7 @@ impl BallistaContext { // the show tables、 show columns sql can not run at scheduler because the tables is store at client if is_show { let state = self.state.lock(); - ctx = Arc::new(SessionContext::with_config( + ctx = Arc::new(SessionContext::new_with_config( SessionConfig::new().with_information_schema( state.config.default_with_information_schema(), ), diff --git a/ballista/core/Cargo.toml b/ballista/core/Cargo.toml index 40239ae48..31a5d4959 100644 --- a/ballista/core/Cargo.toml +++ b/ballista/core/Cargo.toml @@ -66,8 +66,8 @@ once_cell = "1.9.0" parking_lot = "0.12" parse_arg = "0.1.3" -prost = "0.11" -prost-types = "0.11" +prost = "0.12" +prost-types = "0.12" rand = "0.8" serde = { version = "1", features = ["derive"] } sqlparser = { workspace = true } diff --git a/ballista/core/src/serde/generated/ballista.rs b/ballista/core/src/serde/generated/ballista.rs index 3a98a28b3..13a17f7ec 100644 --- a/ballista/core/src/serde/generated/ballista.rs +++ b/ballista/core/src/serde/generated/ballista.rs @@ -2025,7 +2025,9 @@ pub mod scheduler_grpc_server { request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { (*inner).poll_work(request).await }; + let fut = async move { + ::poll_work(&inner, request).await + }; Box::pin(fut) } } @@ -2070,7 +2072,8 @@ pub mod scheduler_grpc_server { ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - (*inner).register_executor(request).await + ::register_executor(&inner, request) + .await }; Box::pin(fut) } @@ -2116,7 +2119,11 @@ pub mod scheduler_grpc_server { ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - (*inner).heart_beat_from_executor(request).await + ::heart_beat_from_executor( + &inner, + request, + ) + .await }; Box::pin(fut) } @@ -2162,7 +2169,8 @@ pub mod scheduler_grpc_server { ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - (*inner).update_task_status(request).await + ::update_task_status(&inner, request) + .await }; Box::pin(fut) } @@ -2208,7 +2216,8 @@ pub mod scheduler_grpc_server { ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - (*inner).get_file_metadata(request).await + ::get_file_metadata(&inner, request) + .await }; Box::pin(fut) } @@ -2254,7 +2263,7 @@ pub mod scheduler_grpc_server { ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - (*inner).create_session(request).await + ::create_session(&inner, request).await }; Box::pin(fut) } @@ -2300,7 +2309,7 @@ pub mod scheduler_grpc_server { ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - (*inner).update_session(request).await + ::update_session(&inner, request).await }; Box::pin(fut) } @@ -2346,7 +2355,7 @@ pub mod scheduler_grpc_server { ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - (*inner).remove_session(request).await + ::remove_session(&inner, request).await }; Box::pin(fut) } @@ -2392,7 +2401,7 @@ pub mod scheduler_grpc_server { ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - (*inner).execute_query(request).await + ::execute_query(&inner, request).await }; Box::pin(fut) } @@ -2438,7 +2447,7 @@ pub mod scheduler_grpc_server { ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - (*inner).get_job_status(request).await + ::get_job_status(&inner, request).await }; Box::pin(fut) } @@ -2484,7 +2493,8 @@ pub mod scheduler_grpc_server { ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - (*inner).executor_stopped(request).await + ::executor_stopped(&inner, request) + .await }; Box::pin(fut) } @@ -2529,7 +2539,9 @@ pub mod scheduler_grpc_server { request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { (*inner).cancel_job(request).await }; + let fut = async move { + ::cancel_job(&inner, request).await + }; Box::pin(fut) } } @@ -2574,7 +2586,7 @@ pub mod scheduler_grpc_server { ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - (*inner).clean_job_data(request).await + ::clean_job_data(&inner, request).await }; Box::pin(fut) } @@ -2782,7 +2794,9 @@ pub mod executor_grpc_server { request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { (*inner).launch_task(request).await }; + let fut = async move { + ::launch_task(&inner, request).await + }; Box::pin(fut) } } @@ -2827,7 +2841,8 @@ pub mod executor_grpc_server { ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - (*inner).launch_multi_task(request).await + ::launch_multi_task(&inner, request) + .await }; Box::pin(fut) } @@ -2873,7 +2888,7 @@ pub mod executor_grpc_server { ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - (*inner).stop_executor(request).await + ::stop_executor(&inner, request).await }; Box::pin(fut) } @@ -2919,7 +2934,7 @@ pub mod executor_grpc_server { ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - (*inner).cancel_tasks(request).await + ::cancel_tasks(&inner, request).await }; Box::pin(fut) } @@ -2965,7 +2980,7 @@ pub mod executor_grpc_server { ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - (*inner).remove_job_data(request).await + ::remove_job_data(&inner, request).await }; Box::pin(fut) } diff --git a/ballista/core/src/serde/scheduler/to_proto.rs b/ballista/core/src/serde/scheduler/to_proto.rs index 6ceb1dd6e..82977e3b1 100644 --- a/ballista/core/src/serde/scheduler/to_proto.rs +++ b/ballista/core/src/serde/scheduler/to_proto.rs @@ -158,12 +158,18 @@ impl TryInto for &MetricValue { }), MetricValue::StartTimestamp(timestamp) => Ok(protobuf::OperatorMetric { metric: Some(operator_metric::Metric::StartTimestamp( - timestamp.value().map(|m| m.timestamp_nanos()).unwrap_or(0), + timestamp + .value() + .and_then(|m| m.timestamp_nanos_opt()) + .unwrap_or(0), )), }), MetricValue::EndTimestamp(timestamp) => Ok(protobuf::OperatorMetric { metric: Some(operator_metric::Metric::EndTimestamp( - timestamp.value().map(|m| m.timestamp_nanos()).unwrap_or(0), + timestamp + .value() + .and_then(|m| m.timestamp_nanos_opt()) + .unwrap_or(0), )), }), } diff --git a/ballista/core/src/utils.rs b/ballista/core/src/utils.rs index 7a99ae1d6..3252ad72f 100644 --- a/ballista/core/src/utils.rs +++ b/ballista/core/src/utils.rs @@ -59,7 +59,7 @@ use tonic::transport::{Channel, Error, Server}; /// Default session builder using the provided configuration pub fn default_session_builder(config: SessionConfig) -> SessionState { - SessionState::with_config_rt( + SessionState::new_with_config_rt( config, Arc::new( RuntimeEnv::new(with_object_store_registry(RuntimeConfig::default())) @@ -244,7 +244,7 @@ pub fn create_df_ctx_with_ballista_query_planner( let session_config = SessionConfig::new() .with_target_partitions(config.default_shuffle_partitions()) .with_information_schema(true); - let mut session_state = SessionState::with_config_rt( + let mut session_state = SessionState::new_with_config_rt( session_config, Arc::new( RuntimeEnv::new(with_object_store_registry(RuntimeConfig::default())) @@ -254,7 +254,7 @@ pub fn create_df_ctx_with_ballista_query_planner( .with_query_planner(planner); session_state = session_state.with_session_id(session_id); // the SessionContext created here is the client side context, but the session_id is from server side. - SessionContext::with_state(session_state) + SessionContext::new_with_state(session_state) } pub struct BallistaQueryPlanner { diff --git a/ballista/executor/src/executor_process.rs b/ballista/executor/src/executor_process.rs index 9d2279812..4672dffb4 100644 --- a/ballista/executor/src/executor_process.rs +++ b/ballista/executor/src/executor_process.rs @@ -220,6 +220,7 @@ pub async fn start_executor_process(opt: Arc) -> Result<( Some(Arc::new(RuntimeEnv { memory_pool: runtime.memory_pool.clone(), disk_manager: runtime.disk_manager.clone(), + cache_manager: runtime.cache_manager.clone(), object_store_registry: registry, })) } else { diff --git a/ballista/executor/src/executor_server.rs b/ballista/executor/src/executor_server.rs index 57223c546..c046558bf 100644 --- a/ballista/executor/src/executor_server.rs +++ b/ballista/executor/src/executor_server.rs @@ -509,7 +509,7 @@ impl TaskRunnerPool TaskRunnerPool { let task_status_vec = curator_task_status_map .entry(task_status.scheduler_id) - .or_insert_with(Vec::new); + .or_default(); task_status_vec.push(task_status.task_status); fetched_task_num += 1; } diff --git a/ballista/scheduler/Cargo.toml b/ballista/scheduler/Cargo.toml index c7ee94bc2..470195ced 100644 --- a/ballista/scheduler/Cargo.toml +++ b/ballista/scheduler/Cargo.toml @@ -47,7 +47,7 @@ arrow-flight = { workspace = true } async-recursion = "1.0.0" async-trait = "0.1.41" ballista-core = { path = "../core", version = "0.11.0", features = ["s3"] } -base64 = { version = "0.13", default-features = false } +base64 = { version = "0.13" } clap = { version = "3", features = ["derive", "cargo"] } configure_me = { workspace = true } dashmap = "5.4.0" @@ -67,8 +67,8 @@ once_cell = { version = "1.16.0", optional = true } parking_lot = "0.12" parse_arg = "0.1.3" prometheus = { version = "0.13", features = ["process"], optional = true } -prost = "0.11" -prost-types = { version = "0.11.0" } +prost = "0.12" +prost-types = { version = "0.12.0" } rand = "0.8" serde = { version = "1", features = ["derive"] } sled_package = { package = "sled", version = "0.34", optional = true } diff --git a/ballista/scheduler/src/flight_sql.rs b/ballista/scheduler/src/flight_sql.rs index 942930a17..23a863e23 100644 --- a/ballista/scheduler/src/flight_sql.rs +++ b/ballista/scheduler/src/flight_sql.rs @@ -17,7 +17,7 @@ use arrow_flight::flight_descriptor::DescriptorType; use arrow_flight::flight_service_server::FlightService; -use arrow_flight::sql::server::FlightSqlService; +use arrow_flight::sql::server::{FlightSqlService, PeekableFlightDataStream}; use arrow_flight::sql::{ ActionBeginSavepointRequest, ActionBeginSavepointResult, ActionBeginTransactionRequest, ActionBeginTransactionResult, @@ -35,6 +35,7 @@ use arrow_flight::{ Action, FlightData, FlightDescriptor, FlightEndpoint, FlightInfo, HandshakeRequest, HandshakeResponse, Location, Ticket, }; +use futures::Stream; use log::{debug, error, warn}; use std::convert::TryFrom; use std::pin::Pin; @@ -71,7 +72,6 @@ use prost::Message; use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::time::sleep; use tokio_stream::wrappers::ReceiverStream; -use tonic::codegen::futures_core::Stream; use tonic::metadata::MetadataValue; use uuid::Uuid; @@ -846,7 +846,7 @@ impl FlightSqlService for FlightSqlServiceImpl { async fn do_put_statement_update( &self, _ticket: CommandStatementUpdate, - _request: Request>, + _request: Request, ) -> Result { debug!("do_put_statement_update"); Err(Status::unimplemented("Implement do_put_statement_update")) @@ -854,7 +854,7 @@ impl FlightSqlService for FlightSqlServiceImpl { async fn do_put_prepared_statement_query( &self, _query: CommandPreparedStatementQuery, - _request: Request>, + _request: Request, ) -> Result::DoPutStream>, Status> { debug!("do_put_prepared_statement_query"); Err(Status::unimplemented( @@ -864,7 +864,7 @@ impl FlightSqlService for FlightSqlServiceImpl { async fn do_put_prepared_statement_update( &self, handle: CommandPreparedStatementUpdate, - request: Request>, + request: Request, ) -> Result { debug!("do_put_prepared_statement_update"); let ctx = self.get_ctx(&request)?; @@ -927,7 +927,7 @@ impl FlightSqlService for FlightSqlServiceImpl { async fn do_put_substrait_plan( &self, _query: CommandStatementSubstraitPlan, - _request: Request>, + _request: Request, ) -> Result { debug!("do_put_substrait_plan"); Err(Status::unimplemented("Implement do_put_substrait_plan")) diff --git a/ballista/scheduler/src/state/execution_graph.rs b/ballista/scheduler/src/state/execution_graph.rs index ba1bbd2bb..70b2659f9 100644 --- a/ballista/scheduler/src/state/execution_graph.rs +++ b/ballista/scheduler/src/state/execution_graph.rs @@ -278,8 +278,7 @@ impl ExecutionGraph { let mut job_task_statuses: HashMap> = HashMap::new(); for task_status in task_statuses { let stage_id = task_status.stage_id as usize; - let stage_task_statuses = - job_task_statuses.entry(stage_id).or_insert_with(Vec::new); + let stage_task_statuses = job_task_statuses.entry(stage_id).or_default(); stage_task_statuses.push(task_status); } @@ -345,7 +344,7 @@ impl ExecutionGraph { )) => { let failed_attempts = failed_stage_attempts .entry(stage_id) - .or_insert_with(HashSet::new); + .or_default(); failed_attempts.insert(task_stage_attempt_num); if failed_attempts.len() < max_stage_failures { let map_stage_id = @@ -382,7 +381,7 @@ impl ExecutionGraph { let missing_inputs = resubmit_successful_stages .entry(map_stage_id) - .or_insert_with(HashSet::new); + .or_default(); missing_inputs.extend(removed_map_partitions); warn!("Need to resubmit the current running Stage {} and its map Stage {} due to FetchPartitionError from task {}", stage_id, map_stage_id, task_identity) @@ -544,7 +543,7 @@ impl ExecutionGraph { let missing_inputs = reset_running_stages .entry(map_stage_id) - .or_insert_with(HashSet::new); + .or_default(); missing_inputs.extend(removed_map_partitions); warn!("Need to reset the current running Stage {} due to late come FetchPartitionError from its parent stage {} of task {}", map_stage_id, stage_id, task_identity); diff --git a/ballista/scheduler/src/state/execution_graph_dot.rs b/ballista/scheduler/src/state/execution_graph_dot.rs index 028c0bd2e..0cd6837f0 100644 --- a/ballista/scheduler/src/state/execution_graph_dot.rs +++ b/ballista/scheduler/src/state/execution_graph_dot.rs @@ -630,7 +630,7 @@ filter_expr="] .options_mut() .optimizer .enable_round_robin_repartition = false; - let ctx = SessionContext::with_config(config); + let ctx = SessionContext::new_with_config(config); let schema = Arc::new(Schema::new(vec![ Field::new("a", DataType::UInt32, false), Field::new("b", DataType::UInt32, false), @@ -657,7 +657,7 @@ filter_expr="] .options_mut() .optimizer .enable_round_robin_repartition = false; - let ctx = SessionContext::with_config(config); + let ctx = SessionContext::new_with_config(config); let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::UInt32, false)])); let table = Arc::new(MemTable::try_new(schema.clone(), vec![])?); diff --git a/ballista/scheduler/src/state/executor_manager.rs b/ballista/scheduler/src/state/executor_manager.rs index eed1dba04..702d38ead 100644 --- a/ballista/scheduler/src/state/executor_manager.rs +++ b/ballista/scheduler/src/state/executor_manager.rs @@ -103,9 +103,7 @@ impl ExecutorManager { Default::default(); for task_info in tasks { - let infos = tasks_to_cancel - .entry(task_info.executor_id) - .or_insert_with(Vec::new); + let infos = tasks_to_cancel.entry(task_info.executor_id).or_default(); infos.push(protobuf::RunningTaskInfo { task_id: task_info.task_id as u32, job_id: task_info.job_id, diff --git a/ballista/scheduler/src/state/session_manager.rs b/ballista/scheduler/src/state/session_manager.rs index d1e0afce7..4cf9d83fe 100644 --- a/ballista/scheduler/src/state/session_manager.rs +++ b/ballista/scheduler/src/state/session_manager.rs @@ -81,5 +81,5 @@ pub fn create_datafusion_context( ) .set_bool("datafusion.optimizer.enable_round_robin_repartition", false); let session_state = session_builder(config); - Arc::new(SessionContext::with_state(session_state)) + Arc::new(SessionContext::new_with_state(session_state)) } diff --git a/ballista/scheduler/src/state/task_manager.rs b/ballista/scheduler/src/state/task_manager.rs index 864bf7994..02b908616 100644 --- a/ballista/scheduler/src/state/task_manager.rs +++ b/ballista/scheduler/src/state/task_manager.rs @@ -304,7 +304,7 @@ impl TaskManager for status in task_status { trace!("Task Update\n{:?}", status); let job_id = status.job_id.clone(); - let job_task_statuses = job_updates.entry(job_id).or_insert_with(Vec::new); + let job_task_statuses = job_updates.entry(job_id).or_default(); job_task_statuses.push(status); } @@ -695,7 +695,6 @@ impl TaskManager let state = self.state.clone(); tokio::spawn(async move { - let job_id = job_id; tokio::time::sleep(Duration::from_secs(clean_up_interval)).await; if let Err(err) = state.remove_job(&job_id).await { error!("Failed to delete job {job_id}: {err:?}"); diff --git a/ballista/scheduler/src/test_utils.rs b/ballista/scheduler/src/test_utils.rs index 051b41ad4..f1253c929 100644 --- a/ballista/scheduler/src/test_utils.rs +++ b/ballista/scheduler/src/test_utils.rs @@ -129,7 +129,7 @@ pub fn test_cluster_context() -> BallistaCluster { pub async fn datafusion_test_context(path: &str) -> Result { let default_shuffle_partitions = 2; let config = SessionConfig::new().with_target_partitions(default_shuffle_partitions); - let ctx = SessionContext::with_config(config); + let ctx = SessionContext::new_with_config(config); for table in TPCH_TABLES { let schema = get_tpch_schema(table); let options = CsvReadOptions::new() @@ -791,7 +791,7 @@ pub async fn test_aggregation_plan_with_job_id( job_id: &str, ) -> ExecutionGraph { let config = SessionConfig::new().with_target_partitions(partition); - let ctx = Arc::new(SessionContext::with_config(config)); + let ctx = Arc::new(SessionContext::new_with_config(config)); let session_state = ctx.state(); let schema = Schema::new(vec![ @@ -823,7 +823,7 @@ pub async fn test_aggregation_plan_with_job_id( pub async fn test_two_aggregations_plan(partition: usize) -> ExecutionGraph { let config = SessionConfig::new().with_target_partitions(partition); - let ctx = Arc::new(SessionContext::with_config(config)); + let ctx = Arc::new(SessionContext::new_with_config(config)); let session_state = ctx.state(); let schema = Schema::new(vec![ @@ -858,7 +858,7 @@ pub async fn test_two_aggregations_plan(partition: usize) -> ExecutionGraph { pub async fn test_coalesce_plan(partition: usize) -> ExecutionGraph { let config = SessionConfig::new().with_target_partitions(partition); - let ctx = Arc::new(SessionContext::with_config(config)); + let ctx = Arc::new(SessionContext::new_with_config(config)); let session_state = ctx.state(); let schema = Schema::new(vec![ @@ -889,7 +889,7 @@ pub async fn test_join_plan(partition: usize) -> ExecutionGraph { .options_mut() .optimizer .enable_round_robin_repartition = false; - let ctx = Arc::new(SessionContext::with_config(config)); + let ctx = Arc::new(SessionContext::new_with_config(config)); let session_state = ctx.state(); let schema = Schema::new(vec![ @@ -938,7 +938,7 @@ pub async fn test_join_plan(partition: usize) -> ExecutionGraph { pub async fn test_union_all_plan(partition: usize) -> ExecutionGraph { let config = SessionConfig::new().with_target_partitions(partition); - let ctx = Arc::new(SessionContext::with_config(config)); + let ctx = Arc::new(SessionContext::new_with_config(config)); let session_state = ctx.state(); let logical_plan = ctx @@ -970,7 +970,7 @@ pub async fn test_union_all_plan(partition: usize) -> ExecutionGraph { pub async fn test_union_plan(partition: usize) -> ExecutionGraph { let config = SessionConfig::new().with_target_partitions(partition); - let ctx = Arc::new(SessionContext::with_config(config)); + let ctx = Arc::new(SessionContext::new_with_config(config)); let session_state = ctx.state(); let logical_plan = ctx diff --git a/benchmarks/src/bin/nyctaxi.rs b/benchmarks/src/bin/nyctaxi.rs index cd0cb994c..ce0f6dd49 100644 --- a/benchmarks/src/bin/nyctaxi.rs +++ b/benchmarks/src/bin/nyctaxi.rs @@ -72,7 +72,7 @@ async fn main() -> Result<()> { let config = SessionConfig::new() .with_target_partitions(opt.partitions) .with_batch_size(opt.batch_size); - let mut ctx = SessionContext::with_config(config); + let mut ctx = SessionContext::new_with_config(config); let path = opt.path.to_str().unwrap(); diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs index 60e6ee6d3..189719c3e 100644 --- a/benchmarks/src/bin/tpch.rs +++ b/benchmarks/src/bin/tpch.rs @@ -289,7 +289,7 @@ async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result Result<()> { .file_extension(".tbl"); let config = SessionConfig::new().with_batch_size(opt.batch_size); - let ctx = SessionContext::with_config(config); + let ctx = SessionContext::new_with_config(config); let session_state = ctx.state(); // build plan to read the TBL file @@ -1526,7 +1526,7 @@ mod tests { let config = SessionConfig::new() .with_target_partitions(1) .with_batch_size(10); - let ctx = SessionContext::with_config(config); + let ctx = SessionContext::new_with_config(config); for &table in TABLES { let schema = get_schema(table); @@ -1592,7 +1592,7 @@ mod ballista_round_trip { let config = SessionConfig::new() .with_target_partitions(1) .with_batch_size(10); - let ctx = SessionContext::with_config(config); + let ctx = SessionContext::new_with_config(config); let session_state = ctx.state(); let codec: BallistaCodec< datafusion_proto::protobuf::LogicalPlanNode, @@ -1648,7 +1648,7 @@ mod ballista_round_trip { let config = SessionConfig::new() .with_target_partitions(1) .with_batch_size(10); - let ctx = SessionContext::with_config(config); + let ctx = SessionContext::new_with_config(config); let session_state = ctx.state(); let codec: BallistaCodec< datafusion_proto::protobuf::LogicalPlanNode, diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 0224cc63a..f7db10f4f 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -38,12 +38,12 @@ ballista = { path = "../ballista/client", version = "0.11.0" } datafusion = { workspace = true } futures = "0.3" num_cpus = "1.13.0" -prost = "0.11" +prost = "0.12" tokio = { version = "1.0", features = [ "macros", "rt", "rt-multi-thread", "sync", - "parking_lot", + "parking_lot" ] } -tonic = "0.9" +tonic = "0.10"