diff --git a/ballista/core/proto/ballista.proto b/ballista/core/proto/ballista.proto index a40e6f2d2..cb3c148b4 100644 --- a/ballista/core/proto/ballista.proto +++ b/ballista/core/proto/ballista.proto @@ -172,7 +172,7 @@ message TaskInputPartitions { message KeyValuePair { string key = 1; - string value = 2; + optional string value = 2; } message Action { @@ -458,10 +458,6 @@ message MultiTaskDefinition { repeated KeyValuePair props = 9; } -message SessionSettings { - repeated KeyValuePair configs = 1; -} - message JobSessionConfig { string session_id = 1; repeated KeyValuePair configs = 2; @@ -526,9 +522,8 @@ message ExecuteQueryParams { bytes logical_plan = 1; string sql = 2 [deprecated=true]; // I'd suggest to remove this, if SQL needed use `flight-sql` } - oneof optional_session_id { - string session_id = 3; - } + + optional string session_id = 3; repeated KeyValuePair settings = 4; } diff --git a/ballista/core/src/execution_plans/distributed_query.rs b/ballista/core/src/execution_plans/distributed_query.rs index dae4bb8ee..785d3b0cb 100644 --- a/ballista/core/src/execution_plans/distributed_query.rs +++ b/ballista/core/src/execution_plans/distributed_query.rs @@ -17,7 +17,6 @@ use crate::client::BallistaClient; use crate::config::BallistaConfig; -use crate::serde::protobuf::execute_query_params::OptionalSessionId; use crate::serde::protobuf::{ execute_query_params::Query, execute_query_result, job_status, scheduler_grpc_client::SchedulerGrpcClient, ExecuteQueryParams, GetJobStatusParams, @@ -218,7 +217,7 @@ impl ExecutionPlan for DistributedQueryExec { .map( |datafusion::config::ConfigEntry { key, value, .. }| KeyValuePair { key: key.to_owned(), - value: value.clone().unwrap_or_else(|| String::from("")), + value: value.clone(), }, ) .collect(); @@ -226,9 +225,7 @@ impl ExecutionPlan for DistributedQueryExec { let query = ExecuteQueryParams { query: Some(Query::LogicalPlan(buf)), settings, - optional_session_id: Some(OptionalSessionId::SessionId( - self.session_id.clone(), - )), + session_id: Some(self.session_id.clone()), }; let stream = futures::stream::once( diff --git a/ballista/core/src/extension.rs b/ballista/core/src/extension.rs index 13892c1a0..bb43e93bd 100644 --- a/ballista/core/src/extension.rs +++ b/ballista/core/src/extension.rs @@ -287,60 +287,49 @@ impl SessionConfigHelperExt for SessionConfig { self.options() .entries() .iter() - .filter(|v| v.value.is_some()) - .map( - // TODO MM make `value` optional value - |datafusion::config::ConfigEntry { key, value, .. }| { - log::trace!( - "sending configuration key: `{}`, value`{:?}`", - key, - value - ); - KeyValuePair { - key: key.to_owned(), - value: value.clone().unwrap(), - } - }, - ) + .map(|datafusion::config::ConfigEntry { key, value, .. }| { + log::trace!("sending configuration key: `{}`, value`{:?}`", key, value); + KeyValuePair { + key: key.to_owned(), + value: value.clone(), + } + }) .collect() } fn update_from_key_value_pair(self, key_value_pairs: &[KeyValuePair]) -> Self { let mut s = self; - for KeyValuePair { key, value } in key_value_pairs { - log::trace!( - "setting up configuration key: `{}`, value: `{}`", - key, - value - ); - if let Err(e) = s.options_mut().set(key, value) { - // there is not much we can do about this error at the moment - log::debug!( - "could not set configuration key: `{}`, value: `{}`, reason: {}", - key, - value, - e.to_string() - ) - } - } + s.update_from_key_value_pair_mut(key_value_pairs); s } fn update_from_key_value_pair_mut(&mut self, key_value_pairs: &[KeyValuePair]) { for KeyValuePair { key, value } in key_value_pairs { - log::trace!( - "setting up configuration key : `{}`, value: `{}`", - key, - value - ); - if let Err(e) = self.options_mut().set(key, value) { - // there is not much we can do about this error at the moment - log::debug!( - "could not set configuration key: `{}`, value: `{}`, reason: {}", - key, - value, - e.to_string() - ) + match value { + Some(value) => { + log::trace!( + "setting up configuration key: `{}`, value: `{:?}`", + key, + value + ); + if let Err(e) = self.options_mut().set(key, value) { + // there is not much we can do about this error at the moment. + // it used to be warning but it gets very verbose + // as even datafusion properties can't be parsed + log::debug!( + "could not set configuration key: `{}`, value: `{:?}`, reason: {}", + key, + value, + e.to_string() + ) + } + } + None => { + log::trace!( + "can't set up configuration key: `{}`, as value is None", + key, + ) + } } } } diff --git a/ballista/core/src/serde/generated/ballista.rs b/ballista/core/src/serde/generated/ballista.rs index d61ef331e..d4faef825 100644 --- a/ballista/core/src/serde/generated/ballista.rs +++ b/ballista/core/src/serde/generated/ballista.rs @@ -249,8 +249,8 @@ pub struct TaskInputPartitions { pub struct KeyValuePair { #[prost(string, tag = "1")] pub key: ::prost::alloc::string::String, - #[prost(string, tag = "2")] - pub value: ::prost::alloc::string::String, + #[prost(string, optional, tag = "2")] + pub value: ::core::option::Option<::prost::alloc::string::String>, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct Action { @@ -708,11 +708,6 @@ pub struct MultiTaskDefinition { pub props: ::prost::alloc::vec::Vec, } #[derive(Clone, PartialEq, ::prost::Message)] -pub struct SessionSettings { - #[prost(message, repeated, tag = "1")] - pub configs: ::prost::alloc::vec::Vec, -} -#[derive(Clone, PartialEq, ::prost::Message)] pub struct JobSessionConfig { #[prost(string, tag = "1")] pub session_id: ::prost::alloc::string::String, @@ -789,14 +784,12 @@ pub struct UpdateTaskStatusResult { } #[derive(Clone, PartialEq, ::prost::Message)] pub struct ExecuteQueryParams { + #[prost(string, optional, tag = "3")] + pub session_id: ::core::option::Option<::prost::alloc::string::String>, #[prost(message, repeated, tag = "4")] pub settings: ::prost::alloc::vec::Vec, #[prost(oneof = "execute_query_params::Query", tags = "1, 2")] pub query: ::core::option::Option, - #[prost(oneof = "execute_query_params::OptionalSessionId", tags = "3")] - pub optional_session_id: ::core::option::Option< - execute_query_params::OptionalSessionId, - >, } /// Nested message and enum types in `ExecuteQueryParams`. pub mod execute_query_params { @@ -808,11 +801,6 @@ pub mod execute_query_params { #[prost(string, tag = "2")] Sql(::prost::alloc::string::String), } - #[derive(Clone, PartialEq, ::prost::Oneof)] - pub enum OptionalSessionId { - #[prost(string, tag = "3")] - SessionId(::prost::alloc::string::String), - } } #[derive(Clone, PartialEq, ::prost::Message)] pub struct CreateSessionParams { diff --git a/ballista/scheduler/src/scheduler_server/grpc.rs b/ballista/scheduler/src/scheduler_server/grpc.rs index 52cdc9857..02c21a884 100644 --- a/ballista/scheduler/src/scheduler_server/grpc.rs +++ b/ballista/scheduler/src/scheduler_server/grpc.rs @@ -18,7 +18,7 @@ use axum::extract::ConnectInfo; use ballista_core::config::BALLISTA_JOB_NAME; use ballista_core::extension::SessionConfigHelperExt; -use ballista_core::serde::protobuf::execute_query_params::{OptionalSessionId, Query}; +use ballista_core::serde::protobuf::execute_query_params::Query; use ballista_core::serde::protobuf::scheduler_grpc_server::SchedulerGrpc; use ballista_core::serde::protobuf::{ execute_query_failure_result, execute_query_result, AvailableTaskSlots, @@ -337,25 +337,28 @@ impl SchedulerGrpc let query_params = request.into_inner(); if let ExecuteQueryParams { query: Some(query), - optional_session_id, + session_id, settings, } = query_params { let job_name = settings .iter() .find(|s| s.key == BALLISTA_JOB_NAME) - .map(|s| s.value.clone()) - .unwrap_or_else(|| "None".to_string()); + .and_then(|s| s.value.clone()) + .unwrap_or_default(); - let (session_id, session_ctx) = match optional_session_id { - Some(OptionalSessionId::SessionId(session_id)) => { + let (session_id, session_ctx) = match session_id { + Some(session_id) => { match self.state.session_manager.get_session(&session_id).await { Ok(ctx) => { - // [SessionConfig] will be updated from received properties + // Update [SessionConfig] using received properties // TODO MM can we do something better here? // move this to update session and use .update_session(&session_params.session_id, &session_config) - // instead of get_session + // instead of get_session. + // + // also we should consider sending properties if/when changed rather than + // all properties every time let state = ctx.state_ref(); let mut state = state.write();