diff --git a/Cargo.toml b/Cargo.toml index f8e46de5e..d51f26ee6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,18 +29,18 @@ members = [ resolver = "2" [workspace.dependencies] -arrow = { version = "46.0.0" } -arrow-flight = { version = "46.0.0", features = ["flight-sql-experimental"] } -arrow-schema = { version = "46.0.0", default-features = false } +arrow = { version = "47.0.0" } +arrow-flight = { version = "47.0.0", features = ["flight-sql-experimental"] } +arrow-schema = { version = "47.0.0", default-features = false } configure_me = { version = "0.4.0" } configure_me_codegen = { version = "0.4.4" } -datafusion = "31.0.0" -datafusion-cli = "31.0.0" -datafusion-proto = "31.0.0" +datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev="1cf808d0e081221cc31b5f92fc414de7c76cf5c4" } +datafusion-cli = { git = "https://github.com/apache/arrow-datafusion.git", rev="1cf808d0e081221cc31b5f92fc414de7c76cf5c4" } +datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev="1cf808d0e081221cc31b5f92fc414de7c76cf5c4" } object_store = "0.7.0" -sqlparser = "0.37.0" -tonic = { version = "0.9" } -tonic-build = { version = "0.9", default-features = false, features = [ +sqlparser = "0.38.0" +tonic = { version = "0.10" } +tonic-build = { version = "0.10", default-features = false, features = [ "transport", "prost" ] } diff --git a/ballista-cli/src/main.rs b/ballista-cli/src/main.rs index 3f8f9ba87..66c01441d 100644 --- a/ballista-cli/src/main.rs +++ b/ballista-cli/src/main.rs @@ -23,6 +23,7 @@ use ballista_cli::{ exec, print_format::PrintFormat, print_options::PrintOptions, BALLISTA_CLI_VERSION, }; use clap::Parser; +use datafusion_cli::print_options::MaxRows; use mimalloc::MiMalloc; #[global_allocator] @@ -82,6 +83,13 @@ struct Args { #[clap(long, help = "Ballista scheduler port")] port: Option, + #[clap( + long, + help = "The max number of rows to display for 'Table' format\n[default: 40] [possible values: numbers(0/10/...), inf(no limit)]", + default_value = "40" + )] + maxrows: MaxRows, + #[clap( short, long, @@ -133,6 +141,7 @@ pub async fn main() -> Result<()> { let mut print_options = PrintOptions { format: args.format, quiet: args.quiet, + maxrows: args.maxrows, }; let files = args.file; diff --git a/ballista/cache/Cargo.toml b/ballista/cache/Cargo.toml index e8ad7552f..80237ce39 100644 --- a/ballista/cache/Cargo.toml +++ b/ballista/cache/Cargo.toml @@ -23,7 +23,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -async-trait = "0.1.64" +async-trait = "0.1.73" futures = "0.3" hashbrown = "0.14" log = "0.4" diff --git a/ballista/client/src/context.rs b/ballista/client/src/context.rs index a0671acf2..76c8d439f 100644 --- a/ballista/client/src/context.rs +++ b/ballista/client/src/context.rs @@ -357,7 +357,7 @@ impl BallistaContext { // the show tables、 show columns sql can not run at scheduler because the tables is store at client if is_show { let state = self.state.lock(); - ctx = Arc::new(SessionContext::with_config( + ctx = Arc::new(SessionContext::new_with_config( SessionConfig::new().with_information_schema( state.config.default_with_information_schema(), ), diff --git a/ballista/core/Cargo.toml b/ballista/core/Cargo.toml index 40239ae48..8a00c078b 100644 --- a/ballista/core/Cargo.toml +++ b/ballista/core/Cargo.toml @@ -46,7 +46,7 @@ simd = ["datafusion/simd"] [dependencies] ahash = { version = "0.8", default-features = false } arrow-flight = { workspace = true } -async-trait = "0.1.41" +async-trait = "0.1.73" ballista-cache = { path = "../cache", version = "0.11.0" } bytes = "1.0" chrono = { version = "0.4", default-features = false } @@ -66,7 +66,7 @@ once_cell = "1.9.0" parking_lot = "0.12" parse_arg = "0.1.3" -prost = "0.11" +prost = "0.12" prost-types = "0.11" rand = "0.8" serde = { version = "1", features = ["derive"] } diff --git a/ballista/core/src/serde/generated/ballista.rs b/ballista/core/src/serde/generated/ballista.rs index 3a98a28b3..13a17f7ec 100644 --- a/ballista/core/src/serde/generated/ballista.rs +++ b/ballista/core/src/serde/generated/ballista.rs @@ -2025,7 +2025,9 @@ pub mod scheduler_grpc_server { request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { (*inner).poll_work(request).await }; + let fut = async move { + ::poll_work(&inner, request).await + }; Box::pin(fut) } } @@ -2070,7 +2072,8 @@ pub mod scheduler_grpc_server { ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - (*inner).register_executor(request).await + ::register_executor(&inner, request) + .await }; Box::pin(fut) } @@ -2116,7 +2119,11 @@ pub mod scheduler_grpc_server { ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - (*inner).heart_beat_from_executor(request).await + ::heart_beat_from_executor( + &inner, + request, + ) + .await }; Box::pin(fut) } @@ -2162,7 +2169,8 @@ pub mod scheduler_grpc_server { ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - (*inner).update_task_status(request).await + ::update_task_status(&inner, request) + .await }; Box::pin(fut) } @@ -2208,7 +2216,8 @@ pub mod scheduler_grpc_server { ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - (*inner).get_file_metadata(request).await + ::get_file_metadata(&inner, request) + .await }; Box::pin(fut) } @@ -2254,7 +2263,7 @@ pub mod scheduler_grpc_server { ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - (*inner).create_session(request).await + ::create_session(&inner, request).await }; Box::pin(fut) } @@ -2300,7 +2309,7 @@ pub mod scheduler_grpc_server { ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - (*inner).update_session(request).await + ::update_session(&inner, request).await }; Box::pin(fut) } @@ -2346,7 +2355,7 @@ pub mod scheduler_grpc_server { ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - (*inner).remove_session(request).await + ::remove_session(&inner, request).await }; Box::pin(fut) } @@ -2392,7 +2401,7 @@ pub mod scheduler_grpc_server { ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - (*inner).execute_query(request).await + ::execute_query(&inner, request).await }; Box::pin(fut) } @@ -2438,7 +2447,7 @@ pub mod scheduler_grpc_server { ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - (*inner).get_job_status(request).await + ::get_job_status(&inner, request).await }; Box::pin(fut) } @@ -2484,7 +2493,8 @@ pub mod scheduler_grpc_server { ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - (*inner).executor_stopped(request).await + ::executor_stopped(&inner, request) + .await }; Box::pin(fut) } @@ -2529,7 +2539,9 @@ pub mod scheduler_grpc_server { request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { (*inner).cancel_job(request).await }; + let fut = async move { + ::cancel_job(&inner, request).await + }; Box::pin(fut) } } @@ -2574,7 +2586,7 @@ pub mod scheduler_grpc_server { ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - (*inner).clean_job_data(request).await + ::clean_job_data(&inner, request).await }; Box::pin(fut) } @@ -2782,7 +2794,9 @@ pub mod executor_grpc_server { request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { (*inner).launch_task(request).await }; + let fut = async move { + ::launch_task(&inner, request).await + }; Box::pin(fut) } } @@ -2827,7 +2841,8 @@ pub mod executor_grpc_server { ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - (*inner).launch_multi_task(request).await + ::launch_multi_task(&inner, request) + .await }; Box::pin(fut) } @@ -2873,7 +2888,7 @@ pub mod executor_grpc_server { ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - (*inner).stop_executor(request).await + ::stop_executor(&inner, request).await }; Box::pin(fut) } @@ -2919,7 +2934,7 @@ pub mod executor_grpc_server { ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - (*inner).cancel_tasks(request).await + ::cancel_tasks(&inner, request).await }; Box::pin(fut) } @@ -2965,7 +2980,7 @@ pub mod executor_grpc_server { ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - (*inner).remove_job_data(request).await + ::remove_job_data(&inner, request).await }; Box::pin(fut) } diff --git a/ballista/core/src/serde/scheduler/to_proto.rs b/ballista/core/src/serde/scheduler/to_proto.rs index 6ceb1dd6e..b584fdf7c 100644 --- a/ballista/core/src/serde/scheduler/to_proto.rs +++ b/ballista/core/src/serde/scheduler/to_proto.rs @@ -158,12 +158,18 @@ impl TryInto for &MetricValue { }), MetricValue::StartTimestamp(timestamp) => Ok(protobuf::OperatorMetric { metric: Some(operator_metric::Metric::StartTimestamp( - timestamp.value().map(|m| m.timestamp_nanos()).unwrap_or(0), + timestamp + .value() + .map(|m| m.timestamp_nanos_opt().unwrap_or(0)) + .unwrap_or(0), )), }), MetricValue::EndTimestamp(timestamp) => Ok(protobuf::OperatorMetric { metric: Some(operator_metric::Metric::EndTimestamp( - timestamp.value().map(|m| m.timestamp_nanos()).unwrap_or(0), + timestamp + .value() + .map(|m| m.timestamp_nanos_opt().unwrap_or(0)) + .unwrap_or(0), )), }), } diff --git a/ballista/core/src/utils.rs b/ballista/core/src/utils.rs index 7a99ae1d6..3252ad72f 100644 --- a/ballista/core/src/utils.rs +++ b/ballista/core/src/utils.rs @@ -59,7 +59,7 @@ use tonic::transport::{Channel, Error, Server}; /// Default session builder using the provided configuration pub fn default_session_builder(config: SessionConfig) -> SessionState { - SessionState::with_config_rt( + SessionState::new_with_config_rt( config, Arc::new( RuntimeEnv::new(with_object_store_registry(RuntimeConfig::default())) @@ -244,7 +244,7 @@ pub fn create_df_ctx_with_ballista_query_planner( let session_config = SessionConfig::new() .with_target_partitions(config.default_shuffle_partitions()) .with_information_schema(true); - let mut session_state = SessionState::with_config_rt( + let mut session_state = SessionState::new_with_config_rt( session_config, Arc::new( RuntimeEnv::new(with_object_store_registry(RuntimeConfig::default())) @@ -254,7 +254,7 @@ pub fn create_df_ctx_with_ballista_query_planner( .with_query_planner(planner); session_state = session_state.with_session_id(session_id); // the SessionContext created here is the client side context, but the session_id is from server side. - SessionContext::with_state(session_state) + SessionContext::new_with_state(session_state) } pub struct BallistaQueryPlanner { diff --git a/ballista/executor/Cargo.toml b/ballista/executor/Cargo.toml index a645b433c..46aea5fa6 100644 --- a/ballista/executor/Cargo.toml +++ b/ballista/executor/Cargo.toml @@ -40,7 +40,7 @@ default = ["mimalloc"] anyhow = "1" arrow = { workspace = true } arrow-flight = { workspace = true } -async-trait = "0.1.41" +async-trait = "0.1.73" ballista-core = { path = "../core", version = "0.11.0", features = ["s3"] } chrono = { version = "0.4", default-features = false } configure_me = { workspace = true } diff --git a/ballista/executor/src/executor_process.rs b/ballista/executor/src/executor_process.rs index 9d2279812..0dd943ebb 100644 --- a/ballista/executor/src/executor_process.rs +++ b/ballista/executor/src/executor_process.rs @@ -25,6 +25,7 @@ use std::{env, io}; use anyhow::{Context, Result}; use arrow_flight::flight_service_server::FlightServiceServer; +use datafusion::execution::cache::cache_manager::CacheManager; use futures::stream::FuturesUnordered; use futures::StreamExt; use log::{error, info, warn}; @@ -199,6 +200,7 @@ pub async fn start_executor_process(opt: Arc) -> Result<( let cache_dir = opt.cache_dir.clone(); let cache_capacity = opt.cache_capacity; let cache_io_concurrency = opt.cache_io_concurrency; + let cache_manager = CacheManager::default(); let cache_layer = opt.data_cache_policy .map(|data_cache_policy| match data_cache_policy { @@ -221,6 +223,7 @@ pub async fn start_executor_process(opt: Arc) -> Result<( memory_pool: runtime.memory_pool.clone(), disk_manager: runtime.disk_manager.clone(), object_store_registry: registry, + cache_manager: Arc::new(cache_manager), })) } else { None diff --git a/ballista/scheduler/Cargo.toml b/ballista/scheduler/Cargo.toml index c7ee94bc2..b166c65ce 100644 --- a/ballista/scheduler/Cargo.toml +++ b/ballista/scheduler/Cargo.toml @@ -45,9 +45,9 @@ sled = ["sled_package", "tokio-stream"] anyhow = "1" arrow-flight = { workspace = true } async-recursion = "1.0.0" -async-trait = "0.1.41" +async-trait = "0.1.73" ballista-core = { path = "../core", version = "0.11.0", features = ["s3"] } -base64 = { version = "0.13", default-features = false } +base64 = { version = "0.21", default-features = false } clap = { version = "3", features = ["derive", "cargo"] } configure_me = { workspace = true } dashmap = "5.4.0" @@ -67,7 +67,7 @@ once_cell = { version = "1.16.0", optional = true } parking_lot = "0.12" parse_arg = "0.1.3" prometheus = { version = "0.13", features = ["process"], optional = true } -prost = "0.11" +prost = "0.12" prost-types = { version = "0.11.0" } rand = "0.8" serde = { version = "1", features = ["derive"] } diff --git a/ballista/scheduler/src/flight_sql.rs b/ballista/scheduler/src/flight_sql.rs index 942930a17..3f02e6bc0 100644 --- a/ballista/scheduler/src/flight_sql.rs +++ b/ballista/scheduler/src/flight_sql.rs @@ -17,7 +17,7 @@ use arrow_flight::flight_descriptor::DescriptorType; use arrow_flight::flight_service_server::FlightService; -use arrow_flight::sql::server::FlightSqlService; +use arrow_flight::sql::server::{FlightSqlService, PeekableFlightDataStream}; use arrow_flight::sql::{ ActionBeginSavepointRequest, ActionBeginSavepointResult, ActionBeginTransactionRequest, ActionBeginTransactionResult, @@ -35,6 +35,8 @@ use arrow_flight::{ Action, FlightData, FlightDescriptor, FlightEndpoint, FlightInfo, HandshakeRequest, HandshakeResponse, Location, Ticket, }; +use base64::Engine; +use futures::Stream; use log::{debug, error, warn}; use std::convert::TryFrom; use std::pin::Pin; @@ -71,7 +73,6 @@ use prost::Message; use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::time::sleep; use tokio_stream::wrappers::ReceiverStream; -use tonic::codegen::futures_core::Stream; use tonic::metadata::MetadataValue; use uuid::Uuid; @@ -502,9 +503,9 @@ impl FlightSqlService for FlightSqlServiceImpl { Err(Status::invalid_argument(format!( "Auth type not implemented: {authorization}" )))?; - } - let base64 = &authorization[basic.len()..]; - let bytes = base64::decode(base64) + }; + let bytes = base64::engine::general_purpose::STANDARD + .decode(&authorization[basic.len()..]) .map_err(|_| Status::invalid_argument("authorization not parsable"))?; let str = String::from_utf8(bytes) .map_err(|_| Status::invalid_argument("authorization not parsable"))?; @@ -846,7 +847,7 @@ impl FlightSqlService for FlightSqlServiceImpl { async fn do_put_statement_update( &self, _ticket: CommandStatementUpdate, - _request: Request>, + _request: tonic::Request, ) -> Result { debug!("do_put_statement_update"); Err(Status::unimplemented("Implement do_put_statement_update")) @@ -854,7 +855,7 @@ impl FlightSqlService for FlightSqlServiceImpl { async fn do_put_prepared_statement_query( &self, _query: CommandPreparedStatementQuery, - _request: Request>, + _request: tonic::Request, ) -> Result::DoPutStream>, Status> { debug!("do_put_prepared_statement_query"); Err(Status::unimplemented( @@ -864,7 +865,7 @@ impl FlightSqlService for FlightSqlServiceImpl { async fn do_put_prepared_statement_update( &self, handle: CommandPreparedStatementUpdate, - request: Request>, + request: tonic::Request, ) -> Result { debug!("do_put_prepared_statement_update"); let ctx = self.get_ctx(&request)?; @@ -927,7 +928,7 @@ impl FlightSqlService for FlightSqlServiceImpl { async fn do_put_substrait_plan( &self, _query: CommandStatementSubstraitPlan, - _request: Request>, + _request: tonic::Request, ) -> Result { debug!("do_put_substrait_plan"); Err(Status::unimplemented("Implement do_put_substrait_plan")) diff --git a/ballista/scheduler/src/state/execution_graph_dot.rs b/ballista/scheduler/src/state/execution_graph_dot.rs index 028c0bd2e..0cd6837f0 100644 --- a/ballista/scheduler/src/state/execution_graph_dot.rs +++ b/ballista/scheduler/src/state/execution_graph_dot.rs @@ -630,7 +630,7 @@ filter_expr="] .options_mut() .optimizer .enable_round_robin_repartition = false; - let ctx = SessionContext::with_config(config); + let ctx = SessionContext::new_with_config(config); let schema = Arc::new(Schema::new(vec![ Field::new("a", DataType::UInt32, false), Field::new("b", DataType::UInt32, false), @@ -657,7 +657,7 @@ filter_expr="] .options_mut() .optimizer .enable_round_robin_repartition = false; - let ctx = SessionContext::with_config(config); + let ctx = SessionContext::new_with_config(config); let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::UInt32, false)])); let table = Arc::new(MemTable::try_new(schema.clone(), vec![])?); diff --git a/ballista/scheduler/src/state/session_manager.rs b/ballista/scheduler/src/state/session_manager.rs index d1e0afce7..4cf9d83fe 100644 --- a/ballista/scheduler/src/state/session_manager.rs +++ b/ballista/scheduler/src/state/session_manager.rs @@ -81,5 +81,5 @@ pub fn create_datafusion_context( ) .set_bool("datafusion.optimizer.enable_round_robin_repartition", false); let session_state = session_builder(config); - Arc::new(SessionContext::with_state(session_state)) + Arc::new(SessionContext::new_with_state(session_state)) } diff --git a/ballista/scheduler/src/test_utils.rs b/ballista/scheduler/src/test_utils.rs index 051b41ad4..f1253c929 100644 --- a/ballista/scheduler/src/test_utils.rs +++ b/ballista/scheduler/src/test_utils.rs @@ -129,7 +129,7 @@ pub fn test_cluster_context() -> BallistaCluster { pub async fn datafusion_test_context(path: &str) -> Result { let default_shuffle_partitions = 2; let config = SessionConfig::new().with_target_partitions(default_shuffle_partitions); - let ctx = SessionContext::with_config(config); + let ctx = SessionContext::new_with_config(config); for table in TPCH_TABLES { let schema = get_tpch_schema(table); let options = CsvReadOptions::new() @@ -791,7 +791,7 @@ pub async fn test_aggregation_plan_with_job_id( job_id: &str, ) -> ExecutionGraph { let config = SessionConfig::new().with_target_partitions(partition); - let ctx = Arc::new(SessionContext::with_config(config)); + let ctx = Arc::new(SessionContext::new_with_config(config)); let session_state = ctx.state(); let schema = Schema::new(vec![ @@ -823,7 +823,7 @@ pub async fn test_aggregation_plan_with_job_id( pub async fn test_two_aggregations_plan(partition: usize) -> ExecutionGraph { let config = SessionConfig::new().with_target_partitions(partition); - let ctx = Arc::new(SessionContext::with_config(config)); + let ctx = Arc::new(SessionContext::new_with_config(config)); let session_state = ctx.state(); let schema = Schema::new(vec![ @@ -858,7 +858,7 @@ pub async fn test_two_aggregations_plan(partition: usize) -> ExecutionGraph { pub async fn test_coalesce_plan(partition: usize) -> ExecutionGraph { let config = SessionConfig::new().with_target_partitions(partition); - let ctx = Arc::new(SessionContext::with_config(config)); + let ctx = Arc::new(SessionContext::new_with_config(config)); let session_state = ctx.state(); let schema = Schema::new(vec![ @@ -889,7 +889,7 @@ pub async fn test_join_plan(partition: usize) -> ExecutionGraph { .options_mut() .optimizer .enable_round_robin_repartition = false; - let ctx = Arc::new(SessionContext::with_config(config)); + let ctx = Arc::new(SessionContext::new_with_config(config)); let session_state = ctx.state(); let schema = Schema::new(vec![ @@ -938,7 +938,7 @@ pub async fn test_join_plan(partition: usize) -> ExecutionGraph { pub async fn test_union_all_plan(partition: usize) -> ExecutionGraph { let config = SessionConfig::new().with_target_partitions(partition); - let ctx = Arc::new(SessionContext::with_config(config)); + let ctx = Arc::new(SessionContext::new_with_config(config)); let session_state = ctx.state(); let logical_plan = ctx @@ -970,7 +970,7 @@ pub async fn test_union_all_plan(partition: usize) -> ExecutionGraph { pub async fn test_union_plan(partition: usize) -> ExecutionGraph { let config = SessionConfig::new().with_target_partitions(partition); - let ctx = Arc::new(SessionContext::with_config(config)); + let ctx = Arc::new(SessionContext::new_with_config(config)); let session_state = ctx.state(); let logical_plan = ctx diff --git a/benchmarks/src/bin/nyctaxi.rs b/benchmarks/src/bin/nyctaxi.rs index cd0cb994c..ce0f6dd49 100644 --- a/benchmarks/src/bin/nyctaxi.rs +++ b/benchmarks/src/bin/nyctaxi.rs @@ -72,7 +72,7 @@ async fn main() -> Result<()> { let config = SessionConfig::new() .with_target_partitions(opt.partitions) .with_batch_size(opt.batch_size); - let mut ctx = SessionContext::with_config(config); + let mut ctx = SessionContext::new_with_config(config); let path = opt.path.to_str().unwrap(); diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs index 60e6ee6d3..189719c3e 100644 --- a/benchmarks/src/bin/tpch.rs +++ b/benchmarks/src/bin/tpch.rs @@ -289,7 +289,7 @@ async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result Result<()> { .file_extension(".tbl"); let config = SessionConfig::new().with_batch_size(opt.batch_size); - let ctx = SessionContext::with_config(config); + let ctx = SessionContext::new_with_config(config); let session_state = ctx.state(); // build plan to read the TBL file @@ -1526,7 +1526,7 @@ mod tests { let config = SessionConfig::new() .with_target_partitions(1) .with_batch_size(10); - let ctx = SessionContext::with_config(config); + let ctx = SessionContext::new_with_config(config); for &table in TABLES { let schema = get_schema(table); @@ -1592,7 +1592,7 @@ mod ballista_round_trip { let config = SessionConfig::new() .with_target_partitions(1) .with_batch_size(10); - let ctx = SessionContext::with_config(config); + let ctx = SessionContext::new_with_config(config); let session_state = ctx.state(); let codec: BallistaCodec< datafusion_proto::protobuf::LogicalPlanNode, @@ -1648,7 +1648,7 @@ mod ballista_round_trip { let config = SessionConfig::new() .with_target_partitions(1) .with_batch_size(10); - let ctx = SessionContext::with_config(config); + let ctx = SessionContext::new_with_config(config); let session_state = ctx.state(); let codec: BallistaCodec< datafusion_proto::protobuf::LogicalPlanNode, diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 0224cc63a..01e8a3f8f 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -38,7 +38,7 @@ ballista = { path = "../ballista/client", version = "0.11.0" } datafusion = { workspace = true } futures = "0.3" num_cpus = "1.13.0" -prost = "0.11" +prost = "0.12" tokio = { version = "1.0", features = [ "macros", "rt",