Skip to content

Commit

Permalink
make session optional
Browse files Browse the repository at this point in the history
  • Loading branch information
milenkovicm committed Dec 3, 2024
1 parent c28ccaf commit f1d4625
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 80 deletions.
11 changes: 3 additions & 8 deletions ballista/core/proto/ballista.proto
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ message TaskInputPartitions {

message KeyValuePair {
string key = 1;
string value = 2;
optional string value = 2;
}

message Action {
Expand Down Expand Up @@ -458,10 +458,6 @@ message MultiTaskDefinition {
repeated KeyValuePair props = 9;
}

message SessionSettings {
repeated KeyValuePair configs = 1;
}

message JobSessionConfig {
string session_id = 1;
repeated KeyValuePair configs = 2;
Expand Down Expand Up @@ -526,9 +522,8 @@ message ExecuteQueryParams {
bytes logical_plan = 1;
string sql = 2 [deprecated=true]; // I'd suggest to remove this, if SQL needed use `flight-sql`
}
oneof optional_session_id {
string session_id = 3;
}

optional string session_id = 3;
repeated KeyValuePair settings = 4;
}

Expand Down
7 changes: 2 additions & 5 deletions ballista/core/src/execution_plans/distributed_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

use crate::client::BallistaClient;
use crate::config::BallistaConfig;
use crate::serde::protobuf::execute_query_params::OptionalSessionId;
use crate::serde::protobuf::{
execute_query_params::Query, execute_query_result, job_status,
scheduler_grpc_client::SchedulerGrpcClient, ExecuteQueryParams, GetJobStatusParams,
Expand Down Expand Up @@ -218,17 +217,15 @@ impl<T: 'static + AsLogicalPlan> ExecutionPlan for DistributedQueryExec<T> {
.map(
|datafusion::config::ConfigEntry { key, value, .. }| KeyValuePair {
key: key.to_owned(),
value: value.clone().unwrap_or_else(|| String::from("")),
value: value.clone(),
},
)
.collect();

let query = ExecuteQueryParams {
query: Some(Query::LogicalPlan(buf)),
settings,
optional_session_id: Some(OptionalSessionId::SessionId(
self.session_id.clone(),
)),
session_id: Some(self.session_id.clone()),
};

let stream = futures::stream::once(
Expand Down
77 changes: 33 additions & 44 deletions ballista/core/src/extension.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,60 +287,49 @@ impl SessionConfigHelperExt for SessionConfig {
self.options()
.entries()
.iter()
.filter(|v| v.value.is_some())
.map(
// TODO MM make `value` optional value
|datafusion::config::ConfigEntry { key, value, .. }| {
log::trace!(
"sending configuration key: `{}`, value`{:?}`",
key,
value
);
KeyValuePair {
key: key.to_owned(),
value: value.clone().unwrap(),
}
},
)
.map(|datafusion::config::ConfigEntry { key, value, .. }| {
log::trace!("sending configuration key: `{}`, value`{:?}`", key, value);
KeyValuePair {
key: key.to_owned(),
value: value.clone(),
}
})
.collect()
}

fn update_from_key_value_pair(self, key_value_pairs: &[KeyValuePair]) -> Self {
let mut s = self;
for KeyValuePair { key, value } in key_value_pairs {
log::trace!(
"setting up configuration key: `{}`, value: `{}`",
key,
value
);
if let Err(e) = s.options_mut().set(key, value) {
// there is not much we can do about this error at the moment
log::debug!(
"could not set configuration key: `{}`, value: `{}`, reason: {}",
key,
value,
e.to_string()
)
}
}
s.update_from_key_value_pair_mut(key_value_pairs);
s
}

fn update_from_key_value_pair_mut(&mut self, key_value_pairs: &[KeyValuePair]) {
for KeyValuePair { key, value } in key_value_pairs {
log::trace!(
"setting up configuration key : `{}`, value: `{}`",
key,
value
);
if let Err(e) = self.options_mut().set(key, value) {
// there is not much we can do about this error at the moment
log::debug!(
"could not set configuration key: `{}`, value: `{}`, reason: {}",
key,
value,
e.to_string()
)
match value {
Some(value) => {
log::trace!(
"setting up configuration key: `{}`, value: `{:?}`",
key,
value
);
if let Err(e) = self.options_mut().set(key, value) {
// there is not much we can do about this error at the moment.
// it used to be warning but it gets very verbose
// as even datafusion properties can't be parsed
log::debug!(
"could not set configuration key: `{}`, value: `{:?}`, reason: {}",
key,
value,
e.to_string()
)
}
}
None => {
log::trace!(
"can't set up configuration key: `{}`, as value is None",
key,
)
}
}
}
}
Expand Down
20 changes: 4 additions & 16 deletions ballista/core/src/serde/generated/ballista.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,8 @@ pub struct TaskInputPartitions {
pub struct KeyValuePair {
#[prost(string, tag = "1")]
pub key: ::prost::alloc::string::String,
#[prost(string, tag = "2")]
pub value: ::prost::alloc::string::String,
#[prost(string, optional, tag = "2")]
pub value: ::core::option::Option<::prost::alloc::string::String>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Action {
Expand Down Expand Up @@ -708,11 +708,6 @@ pub struct MultiTaskDefinition {
pub props: ::prost::alloc::vec::Vec<KeyValuePair>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SessionSettings {
#[prost(message, repeated, tag = "1")]
pub configs: ::prost::alloc::vec::Vec<KeyValuePair>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct JobSessionConfig {
#[prost(string, tag = "1")]
pub session_id: ::prost::alloc::string::String,
Expand Down Expand Up @@ -789,14 +784,12 @@ pub struct UpdateTaskStatusResult {
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ExecuteQueryParams {
#[prost(string, optional, tag = "3")]
pub session_id: ::core::option::Option<::prost::alloc::string::String>,
#[prost(message, repeated, tag = "4")]
pub settings: ::prost::alloc::vec::Vec<KeyValuePair>,
#[prost(oneof = "execute_query_params::Query", tags = "1, 2")]
pub query: ::core::option::Option<execute_query_params::Query>,
#[prost(oneof = "execute_query_params::OptionalSessionId", tags = "3")]
pub optional_session_id: ::core::option::Option<
execute_query_params::OptionalSessionId,
>,
}
/// Nested message and enum types in `ExecuteQueryParams`.
pub mod execute_query_params {
Expand All @@ -808,11 +801,6 @@ pub mod execute_query_params {
#[prost(string, tag = "2")]
Sql(::prost::alloc::string::String),
}
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum OptionalSessionId {
#[prost(string, tag = "3")]
SessionId(::prost::alloc::string::String),
}
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct CreateSessionParams {
Expand Down
17 changes: 10 additions & 7 deletions ballista/scheduler/src/scheduler_server/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
use axum::extract::ConnectInfo;
use ballista_core::config::BALLISTA_JOB_NAME;
use ballista_core::extension::SessionConfigHelperExt;
use ballista_core::serde::protobuf::execute_query_params::{OptionalSessionId, Query};
use ballista_core::serde::protobuf::execute_query_params::Query;
use ballista_core::serde::protobuf::scheduler_grpc_server::SchedulerGrpc;
use ballista_core::serde::protobuf::{
execute_query_failure_result, execute_query_result, AvailableTaskSlots,
Expand Down Expand Up @@ -337,25 +337,28 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
let query_params = request.into_inner();
if let ExecuteQueryParams {
query: Some(query),
optional_session_id,
session_id,
settings,
} = query_params
{
let job_name = settings
.iter()
.find(|s| s.key == BALLISTA_JOB_NAME)
.map(|s| s.value.clone())
.and_then(|s| s.value.clone())
.unwrap_or_else(|| "None".to_string());

let (session_id, session_ctx) = match optional_session_id {
Some(OptionalSessionId::SessionId(session_id)) => {
let (session_id, session_ctx) = match session_id {
Some(session_id) => {
match self.state.session_manager.get_session(&session_id).await {
Ok(ctx) => {
// [SessionConfig] will be updated from received properties
// Update [SessionConfig] using received properties

// TODO MM can we do something better here?
// move this to update session and use .update_session(&session_params.session_id, &session_config)
// instead of get_session
// instead of get_session.
//
// also we should consider sending properties if/when changed rather than
// all properties every time

let state = ctx.state_ref();
let mut state = state.write();
Expand Down

0 comments on commit f1d4625

Please sign in to comment.