Skip to content

Commit

Permalink
Support joining updating data
Browse files Browse the repository at this point in the history
Change the JoinWithExpiration stream node to operate on updating data.
  • Loading branch information
jbeisen committed Nov 22, 2023
1 parent e4eed97 commit 248d94d
Show file tree
Hide file tree
Showing 8 changed files with 1,011 additions and 330 deletions.
80 changes: 71 additions & 9 deletions arroyo-datastream/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1789,27 +1789,89 @@ impl Program {

let in_k = parse_type(&inputs[0].weight().key);
let in_t1 = parse_type(&inputs[0].weight().value);

let (t1_updating, in_t1_inner) = match extract_container_type("UpdatingData", &in_t1) {
Some(t) => (true, t),
None => (false, in_t1.clone()),
};

let in_t2 = parse_type(&inputs[1].weight().value);

let (t2_updating, in_t2_inner) = match extract_container_type("UpdatingData", &in_t2) {
Some(t) => (true, t),
None => (false, in_t2.clone()),
};

let left_expiration = duration_to_syn_expr(*left_expiration);
let right_expiration = duration_to_syn_expr(*right_expiration);
match join_type {
arroyo_types::JoinType::Inner => quote!{
Box::new(arroyo_worker::operators::join_with_expiration::

match (join_type, t1_updating, t2_updating) {
(JoinType::Inner, false, false) => quote! {
Box::new(arroyo_worker::operators::joiners::
inner_join::<#in_k, #in_t1, #in_t2>(#left_expiration, #right_expiration))
},
arroyo_types::JoinType::Left => quote!{
Box::new(arroyo_worker::operators::join_with_expiration::
(JoinType::Inner, true, false) => quote! {
Box::new(arroyo_worker::operators::joiners::
inner_join_left_updating::<#in_k, #in_t1_inner, #in_t2>(#left_expiration, #right_expiration))
},
(JoinType::Inner, false, true) => quote! {
Box::new(arroyo_worker::operators::joiners::
inner_join_right_updating::<#in_k, #in_t1, #in_t2_inner>(#left_expiration, #right_expiration))
},
(JoinType::Inner, true, true) => quote! {
Box::new(arroyo_worker::operators::joiners::
inner_join_both_updating::<#in_k, #in_t1_inner, #in_t2_inner>(#left_expiration, #right_expiration))
},
(JoinType::Left, false, false) => quote! {
Box::new(arroyo_worker::operators::joiners::
left_join::<#in_k, #in_t1, #in_t2>(#left_expiration, #right_expiration))
},
arroyo_types::JoinType::Right => quote!{
Box::new(arroyo_worker::operators::join_with_expiration::
(JoinType::Left, true, false) => quote! {
Box::new(arroyo_worker::operators::joiners::
left_join_left_updating::<#in_k, #in_t1_inner, #in_t2>(#left_expiration, #right_expiration))
},
(JoinType::Left, false, true) => quote! {
Box::new(arroyo_worker::operators::joiners::
left_join_right_updating::<#in_k, #in_t1, #in_t2_inner>(#left_expiration, #right_expiration))
},
(JoinType::Left, true, true) => quote! {
Box::new(arroyo_worker::operators::joiners::
left_join_both_updating::<#in_k, #in_t1_inner, #in_t2_inner>(#left_expiration, #right_expiration))
},
(JoinType::Right, false, false) => quote! {
Box::new(arroyo_worker::operators::joiners::
right_join::<#in_k, #in_t1, #in_t2>(#left_expiration, #right_expiration))
},
arroyo_types::JoinType::Full => quote!{
Box::new(arroyo_worker::operators::join_with_expiration::
(JoinType::Right, true, false) => quote! {
Box::new(arroyo_worker::operators::joiners::
right_join_left_updating::<#in_k, #in_t1_inner, #in_t2>(#left_expiration, #right_expiration))
},
(JoinType::Right, false, true) => quote! {
Box::new(arroyo_worker::operators::joiners::
right_join_right_updating::<#in_k, #in_t1, #in_t2_inner>(#left_expiration, #right_expiration))
},
(JoinType::Right, true, true) => quote! {
Box::new(arroyo_worker::operators::joiners::
right_join_both_updating::<#in_k, #in_t1_inner, #in_t2_inner>(#left_expiration, #right_expiration))
},
(JoinType::Full, false, false) => quote! {
Box::new(arroyo_worker::operators::joiners::
full_join::<#in_k, #in_t1, #in_t2>(#left_expiration, #right_expiration))
},
(JoinType::Full, true, false) => quote! {
Box::new(arroyo_worker::operators::joiners::
full_join_left_updating::<#in_k, #in_t1_inner, #in_t2>(#left_expiration, #right_expiration))
},
(JoinType::Full, false, true) => quote! {
Box::new(arroyo_worker::operators::joiners::
full_join_right_updating::<#in_k, #in_t1, #in_t2_inner>(#left_expiration, #right_expiration))
},
(JoinType::Full, true, true) => quote! {
Box::new(arroyo_worker::operators::joiners::
full_join_both_updating::<#in_k, #in_t1_inner, #in_t2_inner>(#left_expiration, #right_expiration))
},
}

},
Operator::UpdatingOperator { name, expression } => {
let expr : syn::Expr = parse_str(expression).expect(expression);
Expand Down
5 changes: 3 additions & 2 deletions arroyo-sql/src/code_gen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,9 +227,10 @@ impl JoinPairContext {
let merge_expr = code_generator.generate(self);
let left_ident = self.left_ident();
let right_ident = self.right_ident();

parse_quote!({
let #left_ident = &record.value.0;
let #right_ident = &record.value.1;
let (#left_ident, #right_ident) = &record.value.unwrap_append().clone();

arroyo_types::Record {
timestamp: record.timestamp.clone(),
key: None,
Expand Down
9 changes: 6 additions & 3 deletions arroyo-sql/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,12 @@ pub struct JoinOperator {
pub join_type: JoinType,
}

/// Records, for each side of a join, whether that input is updating.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct InputsUpdating {
/// Whether the left join input is updating.
pub left: bool,
/// Whether the right join input is updating.
pub right: bool,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum JoinType {
/// Inner Join
Expand Down Expand Up @@ -805,9 +811,6 @@ impl<'a> SqlPipelineBuilder<'a> {
fn insert_join(&mut self, join: &datafusion_expr::logical_plan::Join) -> Result<SqlOperator> {
let left_input = self.insert_sql_plan(&join.left)?;
let right_input = self.insert_sql_plan(&join.right)?;
if left_input.is_updating() || right_input.is_updating() {
bail!("don't support joins with updating inputs");
}
match join.join_constraint {
JoinConstraint::On => {}
JoinConstraint::Using => bail!("don't support 'using' in joins"),
Expand Down
93 changes: 57 additions & 36 deletions arroyo-sql/src/plan_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use quote::{quote, ToTokens};
use syn::{parse_quote, parse_str, Type};

use crate::expressions::AggregateComputation;
use crate::pipeline::InputsUpdating;
use crate::{
code_gen::{
BinAggregatingContext, CodeGenerator, CombiningContext, JoinListsContext, JoinPairContext,
Expand Down Expand Up @@ -66,7 +67,7 @@ pub enum PlanOperator {
join_type: JoinType,
},
JoinListMerge(JoinType, StructPair),
JoinPairMerge(JoinType, StructPair),
JoinPairMerge(JoinType, StructPair, InputsUpdating),
Flatten,
// TODO: figure out naming of various things called 'window'
WindowFunction(WindowFunctionOperator),
Expand Down Expand Up @@ -327,7 +328,7 @@ impl PlanNode {
PlanOperator::InstantJoin => "instant_join".to_string(),
PlanOperator::JoinWithExpiration { .. } => "join_with_expiration".to_string(),
PlanOperator::JoinListMerge(_, _) => "join_list_merge".to_string(),
PlanOperator::JoinPairMerge(_, _) => "join_pair_merge".to_string(),
PlanOperator::JoinPairMerge(_, _, _) => "join_pair_merge".to_string(),
PlanOperator::Flatten => "flatten".to_string(),
PlanOperator::WindowFunction { .. } => "window_function".to_string(),
PlanOperator::StreamOperator(name, _) => name.to_string(),
Expand Down Expand Up @@ -455,23 +456,20 @@ impl PlanNode {
let record_expression = context.compile_list_merge_record_expression(join_type);
MethodCompiler::record_expression_operator("join_list_merge", record_expression)
}
PlanOperator::JoinPairMerge(join_type, struct_pair) => {
PlanOperator::JoinPairMerge(join_type, struct_pair, inputs_updating) => {
let context =
JoinPairContext::new(struct_pair.left.clone(), struct_pair.right.clone());
match join_type {
JoinType::Inner => {
let record_expression =
context.compile_pair_merge_record_expression(join_type);
MethodCompiler::record_expression_operator("join_merge", record_expression)
}
JoinType::Left | JoinType::Right | JoinType::Full => {
let value_expression =
context.compile_updating_pair_merge_value_expression(join_type);
MethodCompiler::value_updating_operator(
"updating_join_merge",
value_expression,
)
}

if !matches!(join_type, JoinType::Inner)
|| inputs_updating.left
|| inputs_updating.right
{
let value_expression =
context.compile_updating_pair_merge_value_expression(join_type);
MethodCompiler::value_updating_operator("updating_join_merge", value_expression)
} else {
let record_expression = context.compile_pair_merge_record_expression(join_type);
MethodCompiler::record_expression_operator("join_merge", record_expression)
}
}

Expand Down Expand Up @@ -848,7 +846,7 @@ impl PlanNode {
output_types.extend(self.output_type.get_all_types());
// TODO: populate types only created within operators.
match &self.operator {
PlanOperator::JoinPairMerge(join_type, StructPair { left, right })
PlanOperator::JoinPairMerge(join_type, StructPair { left, right }, ..)
| PlanOperator::JoinListMerge(join_type, StructPair { left, right }) => {
output_types.insert(join_type.join_struct_type(left, right));
}
Expand Down Expand Up @@ -932,7 +930,9 @@ impl PlanType {
let left_type = left_value.get_type();
let right_type = right_value.get_type();
match join_type {
JoinType::Inner => parse_quote!((#left_type,#right_type)),
JoinType::Inner => {
parse_quote!(arroyo_types::UpdatingData<(#left_type,#right_type)>)
}
JoinType::Left => {
parse_quote!(arroyo_types::UpdatingData<(#left_type,Option<#right_type>)>)
}
Expand Down Expand Up @@ -1135,6 +1135,20 @@ impl PlanType {
/// Returns `true` when this plan type is the `PlanType::Updating` variant.
pub(crate) fn is_updating(&self) -> bool {
matches!(self, PlanType::Updating(_))
}

/// Builds the keyed plan type for a key/value struct pair.
///
/// Returns `PlanType::Keyed { key, value }` built from the given structs,
/// wrapped in `PlanType::Updating` when `updating` is `true`.
fn for_defs(key_struct: StructDef, value_struct: StructDef, updating: bool) -> PlanType {
    // Both structs are received by value, so they are moved into the result
    // rather than cloned.
    let keyed = PlanType::Keyed {
        key: key_struct,
        value: value_struct,
    };

    if updating {
        PlanType::Updating(Box::new(keyed))
    } else {
        keyed
    }
}
}

#[derive(Debug)]
Expand Down Expand Up @@ -1225,7 +1239,7 @@ impl PlanGraph {
PlanOperator::InstantJoin => {}
PlanOperator::JoinWithExpiration { .. } => {}
PlanOperator::JoinListMerge(_, _) => {}
PlanOperator::JoinPairMerge(_, _) => {}
PlanOperator::JoinPairMerge(_, _, _) => {}
PlanOperator::Flatten => {}
PlanOperator::WindowFunction(w) => {
w.order_by
Expand Down Expand Up @@ -1483,29 +1497,31 @@ impl PlanGraph {
// right now left and right either both have or don't have windows.
let has_window = left.has_window();
let join_type = join_operator.join_type;
let left_updating = left.is_updating();
let right_updating = right.is_updating();
let left_index = self.add_sql_operator(*left);
let right_index = self.add_sql_operator(*right);

let key_struct = join_operator.left_key.output_struct();

let inputs_updating = InputsUpdating {
left: left_updating,
right: right_updating,
};

let left_key_operator =
PlanOperator::RecordTransform(RecordTransform::KeyProjection(join_operator.left_key));
let right_key_operator =
PlanOperator::RecordTransform(RecordTransform::KeyProjection(join_operator.right_key));

let left_key_index = self.insert_operator(
left_key_operator,
PlanType::Keyed {
key: key_struct.clone(),
value: left_type.clone(),
},
PlanType::for_defs(key_struct.clone(), left_type.clone(), left_updating),
);

let right_key_index = self.insert_operator(
right_key_operator,
PlanType::Keyed {
key: key_struct.clone(),
value: right_type.clone(),
},
PlanType::for_defs(key_struct.clone(), right_type.clone(), right_updating),
);

let left_key_edge = PlanEdge {
Expand Down Expand Up @@ -1536,6 +1552,7 @@ impl PlanGraph {
left_type,
right_type,
join_type,
inputs_updating,
)
}
}
Expand Down Expand Up @@ -1596,6 +1613,7 @@ impl PlanGraph {

flatten_index
}

fn add_join_with_expiration(
&mut self,
left_index: NodeIndex,
Expand All @@ -1604,6 +1622,7 @@ impl PlanGraph {
left_struct: StructDef,
right_struct: StructDef,
join_type: JoinType,
inputs_updating: InputsUpdating,
) -> NodeIndex {
let join_node = PlanOperator::JoinWithExpiration {
left_expiration: Duration::from_secs(24 * 60 * 60),
Expand Down Expand Up @@ -1636,16 +1655,18 @@ impl PlanGraph {
left: left_struct,
right: right_struct,
},
inputs_updating.clone(),
);
let merge_output_type = match join_type {
JoinType::Inner => PlanType::Unkeyed(merge_type),
JoinType::Left | JoinType::Right | JoinType::Full => {
PlanType::Updating(Box::new(PlanType::Keyed {
key: key_struct,
value: merge_type,
}))
}

let updating = inputs_updating.left || inputs_updating.right;
let merge_output_type = match (join_type, updating) {
(JoinType::Inner, false) => PlanType::Unkeyed(merge_type),
_ => PlanType::Updating(Box::new(PlanType::Keyed {
key: key_struct,
value: merge_type,
})),
};

let merge_index = self.insert_operator(merge_operator, merge_output_type);

let merge_edge = PlanEdge {
Expand Down
7 changes: 7 additions & 0 deletions arroyo-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,13 @@ impl<T: Data> UpdatingData<T> {
UpdatingData::Append(t) => t.clone(),
}
}

/// Borrows the payload of an `UpdatingData::Append`.
///
/// # Panics
///
/// Panics if `self` is any variant other than `Append`.
pub fn unwrap_append(&self) -> &T {
    if let UpdatingData::Append(inner) = self {
        inner
    } else {
        panic!("UpdatingData is not an append")
    }
}
}

#[derive(Clone, Encode, Decode, Debug, Serialize, Deserialize, PartialEq)]
Expand Down
Loading

0 comments on commit 248d94d

Please sign in to comment.