From fccfd4068fdb05a0bd2117799a913e8414a86264 Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Fri, 12 Apr 2024 12:35:02 +0800 Subject: [PATCH 01/14] init --- proto/stream_plan.proto | 4 + .../src/optimizer/plan_node/logical_join.rs | 7 - .../plan_node/stream_temporal_join.rs | 91 +++++++++- src/stream/src/executor/temporal_join.rs | 171 +++++++++++++----- src/stream/src/from_proto/temporal_join.rs | 59 +++++- 5 files changed, 277 insertions(+), 55 deletions(-) diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index b4393153b57a8..849da32d3ba96 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -435,6 +435,10 @@ message TemporalJoinNode { plan_common.StorageTableDesc table_desc = 7; // The output indices of the lookup side table repeated uint32 table_output_indices = 8; + // The state table used for non-append-only temporal join. + catalog.Table output_table = 9; + // The pk (`join_key` + `left_pk` + `right_pk`) of the output_table. + repeated uint32 output_table_pk_indices = 10; } message DynamicFilterNode { diff --git a/src/frontend/src/optimizer/plan_node/logical_join.rs b/src/frontend/src/optimizer/plan_node/logical_join.rs index 47c5238bde2ba..571efee542c2b 100644 --- a/src/frontend/src/optimizer/plan_node/logical_join.rs +++ b/src/frontend/src/optimizer/plan_node/logical_join.rs @@ -1071,13 +1071,6 @@ impl LogicalJoin { // Enforce a shuffle for the temporal join LHS to let the scheduler be able to schedule the join fragment together with the RHS with a `no_shuffle` exchange. let left = required_dist.enforce(left, &Order::any()); - if !left.append_only() { - return Err(RwError::from(ErrorCode::NotSupported( - "Temporal join requires an append-only left input".into(), - "Please ensure your left input is append-only".into(), - ))); - } - // Extract the predicate from logical scan. Only pure scan is supported. let (new_scan, scan_predicate, project_expr) = logical_scan.predicate_pull_up(); // Construct output column to require column mapping diff --git a/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs b/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs index ecbdba1b32265..8ed3df1dbdf8e 100644 --- a/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs +++ b/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs @@ -14,6 +14,8 @@ use itertools::Itertools; use pretty_xmlish::{Pretty, XmlNode}; +use risingwave_common::catalog::{Field, Schema}; +use risingwave_common::util::sort_util::OrderType; use risingwave_pb::plan_common::JoinType; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::TemporalJoinNode; @@ -27,25 +29,27 @@ use super::{generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeBinary}; use crate::expr::{Expr, ExprRewriter, ExprVisitor}; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; use crate::optimizer::plan_node::plan_tree_node::PlanTreeNodeUnary; -use crate::optimizer::plan_node::utils::IndicesDisplay; +use crate::optimizer::plan_node::utils::{IndicesDisplay, TableCatalogBuilder}; use crate::optimizer::plan_node::{ EqJoinPredicate, EqJoinPredicateDisplay, StreamExchange, StreamTableScan, TryToStreamPb, }; use crate::scheduler::SchedulerResult; use crate::stream_fragmenter::BuildFragmentGraphState; use crate::utils::ColIndexMappingRewriteExt; +use crate::TableCatalog; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct StreamTemporalJoin { pub base: PlanBase, core: generic::Join, eq_join_predicate: EqJoinPredicate, + append_only: bool, } impl StreamTemporalJoin { pub fn new(core: generic::Join, eq_join_predicate: EqJoinPredicate) -> Self { assert!(core.join_type == JoinType::Inner || core.join_type == JoinType::LeftOuter); - assert!(core.left.append_only()); + let append_only = core.left.append_only(); let right = core.right.clone(); let exchange: &StreamExchange = right .as_stream_exchange() @@ -79,6 +83,7 @@ impl StreamTemporalJoin { base, core, eq_join_predicate, + append_only, } } @@ -90,6 +95,77 @@ impl StreamTemporalJoin { pub fn eq_join_predicate(&self) -> &EqJoinPredicate { &self.eq_join_predicate } + + /// Return output table catalog and its `pk_indices`. + /// (`join_key` + `left_pk` + `right_pk`) -> (`right_table_schema` + `join_key` + `left_pk`) + /// + /// Write pattern: + /// for each output row (with insert op), construct the output table pk and insert the row. + /// Read pattern: + /// for each left input row (with delete op), construct pk prefix (`join_key` + `left_pk`) to fetch rows. + pub fn infer_output_table_catalog( + &self, + right_scan: &StreamTableScan, + ) -> (TableCatalog, Vec) { + let internal_table_dist_keys = self + .base + .distribution() + .dist_column_indices() + .into_iter() + .cloned() + .collect_vec(); + + let left_eq_indexes = self.eq_join_predicate.left_eq_indexes(); + let read_prefix_len_hint = left_eq_indexes.len() + self.left().stream_key().unwrap().len(); + + // let l2o = self.core.l2i_col_mapping().composite(&self.core.i2o_col_mapping()); + // // Since temporal join could only be inner and left join, left join key and left pk must be present in join output schema. + // let join_key = l2o.try_map_all(self.eq_join_predicate().left_eq_indexes()).unwrap(); + // let left_pk = l2o.try_map_all(self.left().stream_key().unwrap().iter().cloned()).unwrap(); + // let temporal_join_stream_key = self.core.stream_key().unwrap(); + // // reorder the temporal join stream key by placing join_key and left_pk at the beginning which is equivalent to join_key + left_pk + right_pk. + // let mut pk_indices = join_key.into_iter().chain(left_pk.into_iter()).collect_vec(); + // for idx in temporal_join_stream_key { + // if !pk_indices.contains(&idx) { + // pk_indices.push(idx); + // } + // } + + // Build internal table + let mut internal_table_catalog_builder = TableCatalogBuilder::default(); + // Add right table fields + let right_table_desc = right_scan.core().table_desc.clone(); + for field in right_table_desc + .columns + .iter() + .map(|col| Field::from_with_table_name_prefix(col, &right_scan.core().table_name)) + { + internal_table_catalog_builder.add_column(&field); + } + // Add join_key + left_pk + for field in left_eq_indexes + .iter() + .chain(self.core.left.stream_key().unwrap()) + .map(|idx| &self.core.left.schema().fields()[*idx]) + { + internal_table_catalog_builder.add_column(field); + } + + let mut pk_indices = vec![]; + pk_indices.extend( + right_table_desc.columns.len()..(right_table_desc.columns.len() + read_prefix_len_hint), + ); + pk_indices.extend(right_table_desc.stream_key.clone()); + + pk_indices.iter().for_each(|idx| { + internal_table_catalog_builder.add_order_column(*idx, OrderType::ascending()) + }); + + ( + internal_table_catalog_builder.build(internal_table_dist_keys, read_prefix_len_hint), + pk_indices, + ) + } } impl Distill for StreamTemporalJoin { @@ -97,6 +173,7 @@ impl Distill for StreamTemporalJoin { let verbose = self.base.ctx().is_explain_verbose(); let mut vec = Vec::with_capacity(if verbose { 3 } else { 2 }); vec.push(("type", Pretty::debug(&self.core.join_type))); + vec.push(("append_only", Pretty::debug(&self.append_only))); let concat_schema = self.core.concat_schema(); vec.push(( @@ -161,6 +238,10 @@ impl TryToStreamPb for StreamTemporalJoin { .as_stream_table_scan() .expect("should be a stream table scan"); + let (output_table, pk_indices) = self.infer_output_table_catalog(scan); + + let pk_indices = pk_indices.iter().map(|idx| *idx as u32).collect_vec(); + Ok(NodeBody::TemporalJoin(TemporalJoinNode { join_type: self.core.join_type as i32, left_key: left_jk_indices_prost, @@ -174,6 +255,12 @@ impl TryToStreamPb for StreamTemporalJoin { output_indices: self.core.output_indices.iter().map(|&x| x as u32).collect(), table_desc: Some(scan.core().table_desc.try_to_protobuf()?), table_output_indices: scan.core().output_col_idx.iter().map(|&i| i as _).collect(), + output_table: if self.append_only { + Some(output_table.to_internal_table_prost()) + } else { + None + }, + output_table_pk_indices: if self.append_only { pk_indices } else { vec![] }, })) } } diff --git a/src/stream/src/executor/temporal_join.rs b/src/stream/src/executor/temporal_join.rs index 1d374189cb5e0..27f6a743665dc 100644 --- a/src/stream/src/executor/temporal_join.rs +++ b/src/stream/src/executor/temporal_join.rs @@ -45,12 +45,18 @@ use super::{ }; use crate::cache::{cache_may_stale, new_with_hasher_in, ManagedLruCache}; use crate::common::metrics::MetricsInfo; +use crate::common::table::state_table::StateTable; use crate::executor::join::builder::JoinStreamChunkBuilder; use crate::executor::monitor::StreamingMetrics; use crate::executor::{ActorContextRef, Executor, Watermark}; use crate::task::AtomicU64Ref; -pub struct TemporalJoinExecutor { +pub struct TemporalJoinExecutor< + K: HashKey, + S: StateStore, + const T: JoinTypePrimitive, + const A: bool, +> { ctx: ActorContextRef, #[allow(dead_code)] info: ExecutorInfo, @@ -63,9 +69,8 @@ pub struct TemporalJoinExecutor, output_indices: Vec, chunk_size: usize, - // TODO: update metrics - #[allow(dead_code)] - metrics: Arc, + output_table: Option>, + output_table_pk_indices: Vec, } #[derive(Default)] @@ -425,7 +430,13 @@ mod phase1 { #[try_stream(ok = StreamChunk, error = StreamExecutorError)] #[allow(clippy::too_many_arguments)] - pub(super) async fn handle_chunk<'a, K: HashKey, S: StateStore, E: Phase1Evaluation>( + pub(super) async fn handle_chunk< + 'a, + K: HashKey, + S: StateStore, + E: Phase1Evaluation, + const A: bool, + >( chunk_size: usize, right_size: usize, full_schema: Vec, @@ -435,45 +446,120 @@ mod phase1 { null_matched: &'a K::Bitmap, chunk: StreamChunk, ) { - let mut builder = StreamChunkBuilder::new(chunk_size, full_schema); - let keys = K::build_many(left_join_keys, chunk.data_chunk()); - let to_fetch_keys = chunk - .visibility() - .iter() - .zip_eq_debug(keys.iter()) - .filter_map(|(vis, key)| if vis { Some(key) } else { None }); - right_table - .fetch_or_promote_keys(to_fetch_keys, epoch) - .await?; - for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.into_iter()) { - let Some((op, left_row)) = r else { - continue; - }; - let mut matched = false; - if key.null_bitmap().is_subset(null_matched) - && let join_entry = right_table.force_peek(&key) - && !join_entry.is_empty() - { - matched = true; - for right_row in join_entry.cached.values() { - if let Some(chunk) = - E::append_matched_row(op, &mut builder, left_row, right_row) - { - yield chunk; + // Append-only temporal join + if A == true { + let mut builder = StreamChunkBuilder::new(chunk_size, full_schema); + let keys = K::build_many(left_join_keys, chunk.data_chunk()); + let to_fetch_keys = chunk + .visibility() + .iter() + .zip_eq_debug(keys.iter()) + .filter_map(|(vis, key)| if vis { Some(key) } else { None }); + right_table + .fetch_or_promote_keys(to_fetch_keys, epoch) + .await?; + for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.into_iter()) { + let Some((op, left_row)) = r else { + continue; + }; + let mut matched = false; + if key.null_bitmap().is_subset(null_matched) + && let join_entry = right_table.force_peek(&key) + && !join_entry.is_empty() + { + matched = true; + for right_row in join_entry.cached.values() { + if let Some(chunk) = + E::append_matched_row(op, &mut builder, left_row, right_row) + { + yield chunk; + } } } } - if let Some(chunk) = E::match_end(&mut builder, op, left_row, right_size, matched) { + if let Some(chunk) = builder.take() { + yield chunk; + } + } else { + // Non-append-only temporal join + let mut builder = StreamChunkBuilder::new(chunk_size, full_schema); + let keys = K::build_many(left_join_keys, chunk.data_chunk()); + let to_fetch_keys = chunk + .visibility() + .iter() + .zip_eq_debug(keys.iter()) + .zip_eq_debug(chunk.ops()) + .filter_map(|((vis, key), op)| { + if vis { + match op { + Op::Insert | Op::UpdateInsert => Some(key), + Op::Delete | Op::UpdateDelete => None, + } + } else { + None + } + }); + right_table + .fetch_or_promote_keys(to_fetch_keys, epoch) + .await?; + for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.into_iter()) { + let Some((op, left_row)) = r else { + continue; + }; + + match op { + Op::Insert | Op::UpdateInsert => { + let mut matched = false; + if key.null_bitmap().is_subset(null_matched) + && let join_entry = right_table.force_peek(&key) + && !join_entry.is_empty() + { + matched = true; + for right_row in join_entry.cached.values() { + if let Some(chunk) = + E::append_matched_row(op, &mut builder, left_row, right_row) + { + yield chunk; + } + } + } + if let Some(chunk) = + E::match_end(&mut builder, op, left_row, right_size, matched) + { + yield chunk; + } + } + Op::Delete | Op::UpdateDelete => { + todo!() + // let mut matched = false; + // if key.null_bitmap().is_subset(null_matched) + // && let output_rows = output_table.fetch(&key) + // { + // matched = true; + // for output_row in output_rows { + // if let Some(chunk) = + // E::append_matched_row(op, &mut builder, left_row, right_row) + // { + // yield chunk; + // } + // } + // } + // if let Some(chunk) = E::match_end(&mut builder, op, left_row, right_size, matched) { + // yield chunk; + // } + } + } + } + if let Some(chunk) = builder.take() { yield chunk; } - } - if let Some(chunk) = builder.take() { - yield chunk; } } } -impl TemporalJoinExecutor { +impl + TemporalJoinExecutor +{ #[allow(clippy::too_many_arguments)] pub fn new( ctx: ActorContextRef, @@ -492,6 +578,8 @@ impl TemporalJoinExecutor metrics: Arc, chunk_size: usize, join_key_data_types: Vec, + output_table: Option>, + output_table_pk_indices: Vec, ) -> Self { let alloc = StatsAlloc::new(Global).shared(); @@ -528,7 +616,8 @@ impl TemporalJoinExecutor condition, output_indices, chunk_size, - metrics, + output_table, + output_table_pk_indices, } } @@ -591,7 +680,7 @@ impl TemporalJoinExecutor let full_schema = full_schema.clone(); if T == JoinType::Inner { - let st1 = phase1::handle_chunk::( + let st1 = phase1::handle_chunk::( self.chunk_size, right_size, full_schema, @@ -621,7 +710,7 @@ impl TemporalJoinExecutor } } else if let Some(ref cond) = self.condition { // Joined result without evaluating non-lookup conditions. - let st1 = phase1::handle_chunk::( + let st1 = phase1::handle_chunk::( self.chunk_size, right_size, full_schema, @@ -670,7 +759,7 @@ impl TemporalJoinExecutor // The last row should always be marker row, assert_eq!(matched_count, 0); } else { - let st1 = phase1::handle_chunk::( + let st1 = phase1::handle_chunk::( self.chunk_size, right_size, full_schema, @@ -710,8 +799,8 @@ impl TemporalJoinExecutor } } -impl Execute - for TemporalJoinExecutor +impl Execute + for TemporalJoinExecutor { fn execute(self: Box) -> super::BoxedMessageStream { self.into_stream().boxed() diff --git a/src/stream/src/from_proto/temporal_join.rs b/src/stream/src/from_proto/temporal_join.rs index 15badec97e5cc..f2976361f7d53 100644 --- a/src/stream/src/from_proto/temporal_join.rs +++ b/src/stream/src/from_proto/temporal_join.rs @@ -22,6 +22,7 @@ use risingwave_pb::plan_common::{JoinType as JoinTypeProto, StorageTableDesc}; use risingwave_storage::table::batch_table::storage_table::StorageTable; use super::*; +use crate::common::table::state_table::StateTable; use crate::executor::monitor::StreamingMetrics; use crate::executor::{ActorContextRef, JoinType, TemporalJoinExecutor}; use crate::task::AtomicU64Ref; @@ -45,9 +46,9 @@ impl ExecutorBuilder for TemporalJoinExecutorBuilder { .collect_vec(); StorageTable::new_partial( - store, + store.clone(), column_ids, - params.vnode_bitmap.map(Into::into), + params.vnode_bitmap.clone().map(Into::into), table_desc, ) }; @@ -99,6 +100,33 @@ impl ExecutorBuilder for TemporalJoinExecutorBuilder { .map(|idx| source_l.schema().fields[*idx].data_type()) .collect_vec(); + let output_table_pk_indices = node + .get_output_table_pk_indices() + .iter() + .map(|&x| x as usize) + .collect_vec(); + + let output_table = node.get_output_table(); + let output_table = match output_table { + Ok(output_table) => { + let vnodes = Arc::new( + params + .vnode_bitmap + .expect("vnodes not set for temporal join"), + ); + Some( + StateTable::from_table_catalog( + output_table, + store.clone(), + Some(vnodes.clone()), + ) + .await, + ) + } + Err(_) => None, + }; + let append_only = output_table.is_none(); + let dispatcher_args = TemporalJoinExecutorDispatcherArgs { ctx: params.actor_context, info: params.info.clone(), @@ -117,6 +145,9 @@ impl ExecutorBuilder for TemporalJoinExecutorBuilder { metrics: params.executor_stats, join_type_proto: node.get_join_type()?, join_key_data_types, + output_table, + output_table_pk_indices, + append_only, }; Ok((params.info, dispatcher_args.dispatch()?).into()) @@ -141,6 +172,9 @@ struct TemporalJoinExecutorDispatcherArgs { metrics: Arc, join_type_proto: JoinTypeProto, join_key_data_types: Vec, + output_table: Option>, + output_table_pk_indices: Vec, + append_only: bool, } impl HashKeyDispatcher for TemporalJoinExecutorDispatcherArgs { @@ -149,11 +183,12 @@ impl HashKeyDispatcher for TemporalJoinExecutorDispatcherArgs fn dispatch_impl(self) -> Self::Output { /// This macro helps to fill the const generic type parameter. macro_rules! build { - ($join_type:ident) => { + ($join_type:ident, $append_only:ident) => { Ok(Box::new(TemporalJoinExecutor::< K, S, { JoinType::$join_type }, + { $append_only }, >::new( self.ctx, self.info, @@ -171,12 +206,26 @@ impl HashKeyDispatcher for TemporalJoinExecutorDispatcherArgs self.metrics, self.chunk_size, self.join_key_data_types, + self.output_table, + self.output_table_pk_indices, ))) }; } match self.join_type_proto { - JoinTypeProto::Inner => build!(Inner), - JoinTypeProto::LeftOuter => build!(LeftOuter), + JoinTypeProto::Inner => { + if self.append_only { + build!(Inner, true) + } else { + build!(Inner, false) + } + } + JoinTypeProto::LeftOuter => { + if self.append_only { + build!(LeftOuter, true) + } else { + build!(LeftOuter, false) + } + } _ => unreachable!(), } } From 10004cc9b858fd486d642db5b5a7b3e2b527ab29 Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Fri, 12 Apr 2024 15:42:19 +0800 Subject: [PATCH 02/14] only work for inner join --- proto/stream_plan.proto | 4 +- src/common/src/util/stream_graph_visitor.rs | 3 + .../plan_node/stream_temporal_join.rs | 73 ++++--------- src/stream/src/executor/temporal_join.rs | 103 ++++++++++++++---- src/stream/src/from_proto/temporal_join.rs | 31 ++---- 5 files changed, 113 insertions(+), 101 deletions(-) diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index 849da32d3ba96..e194396a0a024 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -436,9 +436,7 @@ message TemporalJoinNode { // The output indices of the lookup side table repeated uint32 table_output_indices = 8; // The state table used for non-append-only temporal join. - catalog.Table output_table = 9; - // The pk (`join_key` + `left_pk` + `right_pk`) of the output_table. - repeated uint32 output_table_pk_indices = 10; + catalog.Table memo_table = 9; } message DynamicFilterNode { diff --git a/src/common/src/util/stream_graph_visitor.rs b/src/common/src/util/stream_graph_visitor.rs index faab5ddab2ee0..973933b296c6c 100644 --- a/src/common/src/util/stream_graph_visitor.rs +++ b/src/common/src/util/stream_graph_visitor.rs @@ -109,6 +109,9 @@ pub fn visit_stream_node_tables_inner( always!(node.right_table, "HashJoinRight"); always!(node.right_degree_table, "HashJoinDegreeRight"); } + NodeBody::TemporalJoin(node) => { + optional!(node.memo_table, "TemporalJoinMemo"); + } NodeBody::DynamicFilter(node) => { if node.condition_always_relax { always!(node.left_table, "DynamicFilterLeftNotSatisfy"); diff --git a/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs b/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs index 8ed3df1dbdf8e..6d4a1cc9b74a1 100644 --- a/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs +++ b/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs @@ -14,7 +14,6 @@ use itertools::Itertools; use pretty_xmlish::{Pretty, XmlNode}; -use risingwave_common::catalog::{Field, Schema}; use risingwave_common::util::sort_util::OrderType; use risingwave_pb::plan_common::JoinType; use risingwave_pb::stream_plan::stream_node::NodeBody; @@ -28,6 +27,7 @@ use super::utils::{childless_record, watermark_pretty, Distill}; use super::{generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeBinary}; use crate::expr::{Expr, ExprRewriter, ExprVisitor}; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; +use crate::optimizer::plan_node::generic::GenericPlanNode; use crate::optimizer::plan_node::plan_tree_node::PlanTreeNodeUnary; use crate::optimizer::plan_node::utils::{IndicesDisplay, TableCatalogBuilder}; use crate::optimizer::plan_node::{ @@ -96,51 +96,23 @@ impl StreamTemporalJoin { &self.eq_join_predicate } - /// Return output table catalog and its `pk_indices`. - /// (`join_key` + `left_pk` + `right_pk`) -> (`right_table_schema` + `join_key` + `left_pk`) + /// Return memo-table catalog and its `pk_indices`. + /// (`join_key` + `left_pk` + `right_pk`) -> (`right_scan_schema` + `join_key` + `left_pk`) /// /// Write pattern: /// for each output row (with insert op), construct the output table pk and insert the row. /// Read pattern: /// for each left input row (with delete op), construct pk prefix (`join_key` + `left_pk`) to fetch rows. - pub fn infer_output_table_catalog( - &self, - right_scan: &StreamTableScan, - ) -> (TableCatalog, Vec) { - let internal_table_dist_keys = self - .base - .distribution() - .dist_column_indices() - .into_iter() - .cloned() - .collect_vec(); - + pub fn infer_memo_table_catalog(&self, right_scan: &StreamTableScan) -> TableCatalog { let left_eq_indexes = self.eq_join_predicate.left_eq_indexes(); let read_prefix_len_hint = left_eq_indexes.len() + self.left().stream_key().unwrap().len(); - // let l2o = self.core.l2i_col_mapping().composite(&self.core.i2o_col_mapping()); - // // Since temporal join could only be inner and left join, left join key and left pk must be present in join output schema. - // let join_key = l2o.try_map_all(self.eq_join_predicate().left_eq_indexes()).unwrap(); - // let left_pk = l2o.try_map_all(self.left().stream_key().unwrap().iter().cloned()).unwrap(); - // let temporal_join_stream_key = self.core.stream_key().unwrap(); - // // reorder the temporal join stream key by placing join_key and left_pk at the beginning which is equivalent to join_key + left_pk + right_pk. - // let mut pk_indices = join_key.into_iter().chain(left_pk.into_iter()).collect_vec(); - // for idx in temporal_join_stream_key { - // if !pk_indices.contains(&idx) { - // pk_indices.push(idx); - // } - // } - // Build internal table let mut internal_table_catalog_builder = TableCatalogBuilder::default(); // Add right table fields - let right_table_desc = right_scan.core().table_desc.clone(); - for field in right_table_desc - .columns - .iter() - .map(|col| Field::from_with_table_name_prefix(col, &right_scan.core().table_name)) - { - internal_table_catalog_builder.add_column(&field); + let right_scan_schema = right_scan.core().schema(); + for field in right_scan_schema.fields() { + internal_table_catalog_builder.add_column(field); } // Add join_key + left_pk for field in left_eq_indexes @@ -152,19 +124,19 @@ impl StreamTemporalJoin { } let mut pk_indices = vec![]; - pk_indices.extend( - right_table_desc.columns.len()..(right_table_desc.columns.len() + read_prefix_len_hint), - ); - pk_indices.extend(right_table_desc.stream_key.clone()); + pk_indices + .extend(right_scan_schema.len()..(right_scan_schema.len() + read_prefix_len_hint)); + pk_indices.extend(right_scan.stream_key().unwrap()); pk_indices.iter().for_each(|idx| { internal_table_catalog_builder.add_order_column(*idx, OrderType::ascending()) }); - ( - internal_table_catalog_builder.build(internal_table_dist_keys, read_prefix_len_hint), - pk_indices, - ) + let internal_table_dist_keys = (right_scan_schema.len() + ..(right_scan_schema.len() + left_eq_indexes.len())) + .into_iter() + .collect(); + internal_table_catalog_builder.build(internal_table_dist_keys, read_prefix_len_hint) } } @@ -219,7 +191,7 @@ impl_plan_tree_node_for_binary! { StreamTemporalJoin } impl TryToStreamPb for StreamTemporalJoin { fn try_to_stream_prost_body( &self, - _state: &mut BuildFragmentGraphState, + state: &mut BuildFragmentGraphState, ) -> SchedulerResult { let left_jk_indices = self.eq_join_predicate.left_eq_indexes(); let right_jk_indices = self.eq_join_predicate.right_eq_indexes(); @@ -238,10 +210,6 @@ impl TryToStreamPb for StreamTemporalJoin { .as_stream_table_scan() .expect("should be a stream table scan"); - let (output_table, pk_indices) = self.infer_output_table_catalog(scan); - - let pk_indices = pk_indices.iter().map(|idx| *idx as u32).collect_vec(); - Ok(NodeBody::TemporalJoin(TemporalJoinNode { join_type: self.core.join_type as i32, left_key: left_jk_indices_prost, @@ -255,12 +223,13 @@ impl TryToStreamPb for StreamTemporalJoin { output_indices: self.core.output_indices.iter().map(|&x| x as u32).collect(), table_desc: Some(scan.core().table_desc.try_to_protobuf()?), table_output_indices: scan.core().output_col_idx.iter().map(|&i| i as _).collect(), - output_table: if self.append_only { - Some(output_table.to_internal_table_prost()) - } else { + memo_table: if self.append_only { None + } else { + let mut memo_table = self.infer_memo_table_catalog(scan); + memo_table = memo_table.with_id(state.gen_table_id_wrapped()); + Some(memo_table.to_internal_table_prost()) }, - output_table_pk_indices: if self.append_only { pk_indices } else { vec![] }, })) } } diff --git a/src/stream/src/executor/temporal_join.rs b/src/stream/src/executor/temporal_join.rs index 27f6a743665dc..ca0877ccf43c2 100644 --- a/src/stream/src/executor/temporal_join.rs +++ b/src/stream/src/executor/temporal_join.rs @@ -22,6 +22,7 @@ use either::Either; use futures::stream::{self, PollNext}; use futures::{pin_mut, StreamExt, TryStreamExt}; use futures_async_stream::{for_await, try_stream}; +use itertools::Itertools; use local_stats_alloc::{SharedStatsAlloc, StatsAlloc}; use lru::DefaultHasher; use risingwave_common::array::{Op, StreamChunk}; @@ -69,8 +70,7 @@ pub struct TemporalJoinExecutor< condition: Option, output_indices: Vec, chunk_size: usize, - output_table: Option>, - output_table_pk_indices: Vec, + memo_table: Option>, } #[derive(Default)] @@ -315,17 +315,21 @@ async fn align_input(left: Executor, right: Executor) { } mod phase1 { + use std::ops::Bound; + + use futures::{pin_mut, StreamExt}; use futures_async_stream::try_stream; use risingwave_common::array::stream_chunk_builder::StreamChunkBuilder; use risingwave_common::array::{Op, StreamChunk}; use risingwave_common::hash::{HashKey, NullBitmap}; - use risingwave_common::row::{self, Row, RowExt}; + use risingwave_common::row::{self, OwnedRow, Row, RowExt}; use risingwave_common::types::{DataType, DatumRef}; use risingwave_common::util::iter_util::ZipEqDebug; use risingwave_hummock_sdk::HummockEpoch; use risingwave_storage::StateStore; use super::{StreamExecutorError, TemporalSide}; + use crate::common::table::state_table::StateTable; pub(super) trait Phase1Evaluation { /// Called when a matched row is found. @@ -443,6 +447,8 @@ mod phase1 { epoch: HummockEpoch, left_join_keys: &'a [usize], right_table: &'a mut TemporalSide, + memo_table_lookup_prefix: &'a [usize], + memo_table: &'a mut Option>, null_matched: &'a K::Bitmap, chunk: StreamChunk, ) { @@ -476,6 +482,9 @@ mod phase1 { } } } + if let Some(chunk) = E::match_end(&mut builder, op, left_row, right_size, matched) { + yield chunk; + } } if let Some(chunk) = builder.take() { yield chunk; @@ -484,6 +493,7 @@ mod phase1 { // Non-append-only temporal join let mut builder = StreamChunkBuilder::new(chunk_size, full_schema); let keys = K::build_many(left_join_keys, chunk.data_chunk()); + let memo_table = memo_table.as_mut().unwrap(); let to_fetch_keys = chunk .visibility() .iter() @@ -516,6 +526,10 @@ mod phase1 { { matched = true; for right_row in join_entry.cached.values() { + let right_row: OwnedRow = right_row.clone(); + memo_table.insert(right_row.clone().chain( + left_row.project(memo_table_lookup_prefix).into_owned_row(), + )); if let Some(chunk) = E::append_matched_row(op, &mut builder, left_row, right_row) { @@ -530,23 +544,39 @@ mod phase1 { } } Op::Delete | Op::UpdateDelete => { - todo!() - // let mut matched = false; - // if key.null_bitmap().is_subset(null_matched) - // && let output_rows = output_table.fetch(&key) - // { - // matched = true; - // for output_row in output_rows { - // if let Some(chunk) = - // E::append_matched_row(op, &mut builder, left_row, right_row) - // { - // yield chunk; - // } - // } - // } - // if let Some(chunk) = E::match_end(&mut builder, op, left_row, right_size, matched) { - // yield chunk; - // } + let mut memo_rows_to_delete = vec![]; + let mut matched = false; + if key.null_bitmap().is_subset(null_matched) { + let sub_range: &(Bound, Bound) = + &(Bound::Unbounded, Bound::Unbounded); + let prefix = left_row.project(memo_table_lookup_prefix); + let state_table_iter = memo_table + .iter_with_prefix(prefix, sub_range, Default::default()) + .await?; + pin_mut!(state_table_iter); + + matched = true; + while let Some(memo_row) = state_table_iter.next().await { + let memo_row = memo_row?.into_owned_row(); + memo_rows_to_delete.push(memo_row.clone()); + if let Some(chunk) = E::append_matched_row( + op, + &mut builder, + left_row, + memo_row.slice(0..right_size), + ) { + yield chunk; + } + } + } + for memo_row in memo_rows_to_delete { + memo_table.delete(memo_row); + } + if let Some(chunk) = + E::match_end(&mut builder, op, left_row, right_size, matched) + { + yield chunk; + } } } } @@ -578,8 +608,7 @@ impl metrics: Arc, chunk_size: usize, join_key_data_types: Vec, - output_table: Option>, - output_table_pk_indices: Vec, + memo_table: Option>, ) -> Self { let alloc = StatsAlloc::new(Global).shared(); @@ -616,8 +645,7 @@ impl condition, output_indices, chunk_size, - output_table, - output_table_pk_indices, + memo_table, } } @@ -644,7 +672,14 @@ impl let left_to_output: HashMap = HashMap::from_iter(left_map.iter().cloned()); + let left_stream_key_indices = self.left.pk_indices().to_vec(); let right_stream_key_indices = self.right.pk_indices().to_vec(); + let memo_table_lookup_prefix = self + .left_join_keys + .iter() + .cloned() + .chain(left_stream_key_indices) + .collect_vec(); let null_matched = K::Bitmap::from_bool_vec(self.null_safe); @@ -661,6 +696,8 @@ impl .chain(self.right.schema().data_types().into_iter()) .collect(); + let mut wait_first_barrier = true; + #[for_await] for msg in align_input(self.left, self.right) { self.right_table.cache.evict(); @@ -687,6 +724,8 @@ impl epoch, &self.left_join_keys, &mut self.right_table, + &memo_table_lookup_prefix, + &mut self.memo_table, &null_matched, chunk, ); @@ -717,6 +756,8 @@ impl epoch, &self.left_join_keys, &mut self.right_table, + &memo_table_lookup_prefix, + &mut self.memo_table, &null_matched, chunk, ); @@ -766,6 +807,8 @@ impl epoch, &self.left_join_keys, &mut self.right_table, + &memo_table_lookup_prefix, + &mut self.memo_table, &null_matched, chunk, ); @@ -778,6 +821,18 @@ impl } } InternalMessage::Barrier(updates, barrier) => { + if A == false { + if wait_first_barrier { + wait_first_barrier = false; + self.memo_table.as_mut().unwrap().init_epoch(barrier.epoch); + } else { + self.memo_table + .as_mut() + .unwrap() + .commit(barrier.epoch) + .await?; + } + } if let Some(vnodes) = barrier.as_update_vnode_bitmap(self.ctx.id) { let prev_vnodes = self.right_table.source.update_vnode_bitmap(vnodes.clone()); diff --git a/src/stream/src/from_proto/temporal_join.rs b/src/stream/src/from_proto/temporal_join.rs index f2976361f7d53..805049a296883 100644 --- a/src/stream/src/from_proto/temporal_join.rs +++ b/src/stream/src/from_proto/temporal_join.rs @@ -100,32 +100,22 @@ impl ExecutorBuilder for TemporalJoinExecutorBuilder { .map(|idx| source_l.schema().fields[*idx].data_type()) .collect_vec(); - let output_table_pk_indices = node - .get_output_table_pk_indices() - .iter() - .map(|&x| x as usize) - .collect_vec(); - - let output_table = node.get_output_table(); - let output_table = match output_table { - Ok(output_table) => { + let memo_table = node.get_memo_table(); + let memo_table = match memo_table { + Ok(memo_table) => { let vnodes = Arc::new( params .vnode_bitmap .expect("vnodes not set for temporal join"), ); Some( - StateTable::from_table_catalog( - output_table, - store.clone(), - Some(vnodes.clone()), - ) - .await, + StateTable::from_table_catalog(memo_table, store.clone(), Some(vnodes.clone())) + .await, ) } Err(_) => None, }; - let append_only = output_table.is_none(); + let append_only = memo_table.is_none(); let dispatcher_args = TemporalJoinExecutorDispatcherArgs { ctx: params.actor_context, @@ -145,8 +135,7 @@ impl ExecutorBuilder for TemporalJoinExecutorBuilder { metrics: params.executor_stats, join_type_proto: node.get_join_type()?, join_key_data_types, - output_table, - output_table_pk_indices, + memo_table, append_only, }; @@ -172,8 +161,7 @@ struct TemporalJoinExecutorDispatcherArgs { metrics: Arc, join_type_proto: JoinTypeProto, join_key_data_types: Vec, - output_table: Option>, - output_table_pk_indices: Vec, + memo_table: Option>, append_only: bool, } @@ -206,8 +194,7 @@ impl HashKeyDispatcher for TemporalJoinExecutorDispatcherArgs self.metrics, self.chunk_size, self.join_key_data_types, - self.output_table, - self.output_table_pk_indices, + self.memo_table, ))) }; } From 2ab58c78b67d6e5702a0b07f56f22b1cbe1ae20d Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Fri, 12 Apr 2024 16:32:03 +0800 Subject: [PATCH 03/14] add test and fix bug --- .../{ => append_only}/issue_15257.slt | 0 .../{ => append_only}/temporal_join.slt | 0 .../temporal_join_multiple_rows.slt | 0 .../temporal_join_non_loopup_cond.slt | 0 .../temporal_join_watermark.slt | 0 .../temporal_join_with_index.slt | 0 .../non_append_only/temporal_join.slt | 47 +++++++++++++++++++ .../plan_node/stream_temporal_join.rs | 6 +-- src/stream/src/executor/temporal_join.rs | 8 ++-- 9 files changed, 54 insertions(+), 7 deletions(-) rename e2e_test/streaming/temporal_join/{ => append_only}/issue_15257.slt (100%) rename e2e_test/streaming/temporal_join/{ => append_only}/temporal_join.slt (100%) rename e2e_test/streaming/temporal_join/{ => append_only}/temporal_join_multiple_rows.slt (100%) rename e2e_test/streaming/temporal_join/{ => append_only}/temporal_join_non_loopup_cond.slt (100%) rename e2e_test/streaming/temporal_join/{ => append_only}/temporal_join_watermark.slt (100%) rename e2e_test/streaming/temporal_join/{ => append_only}/temporal_join_with_index.slt (100%) create mode 100644 e2e_test/streaming/temporal_join/non_append_only/temporal_join.slt diff --git a/e2e_test/streaming/temporal_join/issue_15257.slt b/e2e_test/streaming/temporal_join/append_only/issue_15257.slt similarity index 100% rename from e2e_test/streaming/temporal_join/issue_15257.slt rename to e2e_test/streaming/temporal_join/append_only/issue_15257.slt diff --git a/e2e_test/streaming/temporal_join/temporal_join.slt b/e2e_test/streaming/temporal_join/append_only/temporal_join.slt similarity index 100% rename from e2e_test/streaming/temporal_join/temporal_join.slt rename to e2e_test/streaming/temporal_join/append_only/temporal_join.slt diff --git a/e2e_test/streaming/temporal_join/temporal_join_multiple_rows.slt b/e2e_test/streaming/temporal_join/append_only/temporal_join_multiple_rows.slt similarity index 100% rename from e2e_test/streaming/temporal_join/temporal_join_multiple_rows.slt rename to e2e_test/streaming/temporal_join/append_only/temporal_join_multiple_rows.slt diff --git a/e2e_test/streaming/temporal_join/temporal_join_non_loopup_cond.slt b/e2e_test/streaming/temporal_join/append_only/temporal_join_non_loopup_cond.slt similarity index 100% rename from e2e_test/streaming/temporal_join/temporal_join_non_loopup_cond.slt rename to e2e_test/streaming/temporal_join/append_only/temporal_join_non_loopup_cond.slt diff --git a/e2e_test/streaming/temporal_join/temporal_join_watermark.slt b/e2e_test/streaming/temporal_join/append_only/temporal_join_watermark.slt similarity index 100% rename from e2e_test/streaming/temporal_join/temporal_join_watermark.slt rename to e2e_test/streaming/temporal_join/append_only/temporal_join_watermark.slt diff --git a/e2e_test/streaming/temporal_join/temporal_join_with_index.slt b/e2e_test/streaming/temporal_join/append_only/temporal_join_with_index.slt similarity index 100% rename from e2e_test/streaming/temporal_join/temporal_join_with_index.slt rename to e2e_test/streaming/temporal_join/append_only/temporal_join_with_index.slt diff --git a/e2e_test/streaming/temporal_join/non_append_only/temporal_join.slt b/e2e_test/streaming/temporal_join/non_append_only/temporal_join.slt new file mode 100644 index 0000000000000..b98658b6e8d2e --- /dev/null +++ b/e2e_test/streaming/temporal_join/non_append_only/temporal_join.slt @@ -0,0 +1,47 @@ +statement ok +SET RW_IMPLICIT_FLUSH TO true; + +statement ok +create table stream(id1 int, a1 int, b1 int); + +statement ok +create table version(id2 int, a2 int, b2 int, primary key (id2)); + +statement ok +create materialized view v as select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() on id1 = id2 + +statement ok +insert into stream values(1, 11, 111); + +statement ok +insert into version values(1, 11, 111); + +statement ok +insert into stream values(1, 11, 111); + +statement ok +delete from version; + +query IIII rowsort +select * from v; +---- +1 11 1 11 +1 11 NULL NULL + +statement ok +update stream set a1 = 22, b1 = 222 + +query IIII rowsort +select * from v; +---- +1 22 NULL NULL +1 22 NULL NULL + +statement ok +drop materialized view v; + +statement ok +drop table stream; + +statement ok +drop table version; diff --git a/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs b/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs index 6d4a1cc9b74a1..fbf1f42861552 100644 --- a/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs +++ b/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs @@ -132,10 +132,8 @@ impl StreamTemporalJoin { internal_table_catalog_builder.add_order_column(*idx, OrderType::ascending()) }); - let internal_table_dist_keys = (right_scan_schema.len() - ..(right_scan_schema.len() + left_eq_indexes.len())) - .into_iter() - .collect(); + let internal_table_dist_keys = + (right_scan_schema.len()..(right_scan_schema.len() + left_eq_indexes.len())).collect(); internal_table_catalog_builder.build(internal_table_dist_keys, read_prefix_len_hint) } } diff --git a/src/stream/src/executor/temporal_join.rs b/src/stream/src/executor/temporal_join.rs index ca0877ccf43c2..eb28a30a040e2 100644 --- a/src/stream/src/executor/temporal_join.rs +++ b/src/stream/src/executor/temporal_join.rs @@ -453,7 +453,7 @@ mod phase1 { chunk: StreamChunk, ) { // Append-only temporal join - if A == true { + if A { let mut builder = StreamChunkBuilder::new(chunk_size, full_schema); let keys = K::build_many(left_join_keys, chunk.data_chunk()); let to_fetch_keys = chunk @@ -527,6 +527,7 @@ mod phase1 { matched = true; for right_row in join_entry.cached.values() { let right_row: OwnedRow = right_row.clone(); + // Insert into memo table memo_table.insert(right_row.clone().chain( left_row.project(memo_table_lookup_prefix).into_owned_row(), )); @@ -555,8 +556,8 @@ mod phase1 { .await?; pin_mut!(state_table_iter); - matched = true; while let Some(memo_row) = state_table_iter.next().await { + matched = true; let memo_row = memo_row?.into_owned_row(); memo_rows_to_delete.push(memo_row.clone()); if let Some(chunk) = E::append_matched_row( @@ -570,6 +571,7 @@ mod phase1 { } } for memo_row in memo_rows_to_delete { + // Delete from memo table memo_table.delete(memo_row); } if let Some(chunk) = @@ -821,7 +823,7 @@ impl } } InternalMessage::Barrier(updates, barrier) => { - if A == false { + if !A { if wait_first_barrier { wait_first_barrier = false; self.memo_table.as_mut().unwrap().init_epoch(barrier.epoch); From b780bcef967a4565ca754b7f752479b46d3dacd5 Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Fri, 12 Apr 2024 17:23:34 +0800 Subject: [PATCH 04/14] reduce op from update to delete and insert --- .../temporal_join_non_lookup_cond.slt | 100 ++++++++++++++++++ src/stream/src/executor/temporal_join.rs | 8 +- 2 files changed, 104 insertions(+), 4 deletions(-) create mode 100644 e2e_test/streaming/temporal_join/non_append_only/temporal_join_non_lookup_cond.slt diff --git a/e2e_test/streaming/temporal_join/non_append_only/temporal_join_non_lookup_cond.slt b/e2e_test/streaming/temporal_join/non_append_only/temporal_join_non_lookup_cond.slt new file mode 100644 index 0000000000000..2c0cf094db998 --- /dev/null +++ b/e2e_test/streaming/temporal_join/non_append_only/temporal_join_non_lookup_cond.slt @@ -0,0 +1,100 @@ +statement ok +SET RW_IMPLICIT_FLUSH TO true; + +statement ok +create table stream(id1 int, a1 int, b1 int); + +statement ok +create table version(id2 int, a2 int, b2 int, primary key (id2)); + +statement ok +create materialized view v as select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() on id1 = id2 and a1 > a2; + +statement ok +insert into stream values(1, 11, 111); + +statement ok +insert into version values(1, 12, 111); + +statement ok +insert into stream values(1, 13, 111); + +statement ok +delete from version; + +query IIII rowsort +select * from v; +---- +1 11 NULL NULL +1 13 1 12 + +statement ok +delete from stream; + +statement ok +insert into version values(2, 22, 222); + +statement ok +insert into stream values(2, 23, 222); + +query IIII rowsort +select * from v; +---- +2 23 2 22 + +statement ok +delete from stream; + +query IIII rowsort +select * from v; +---- + + +statement ok +drop materialized view v; + +statement ok +drop table stream; + +statement ok +drop table version; + +statement ok +create table stream(id1 int, a1 int, b1 int); + +statement ok +create table version(id2 int, a2 int, b2 int, primary key (id2)); + +statement ok +create materialized view v as select id1, a1, id2, a2 from stream join version FOR SYSTEM_TIME AS OF PROCTIME() on id1 = id2 and a1 > a2; + +statement ok +insert into version values (1, 12, 111), (2, 12, 111); + +statement ok +insert into stream values (1, 11, 111), (2, 13, 111); + +query IIII rowsort +select * from v; +---- +2 13 2 12 + +statement ok +update stream set a1 = 222, b1 = 333 where id1 = 1; + +statement ok +update stream set a1 = 2, b1 = 3 where id1 = 2; + +query IIII rowsort +select * from v; +---- +1 222 1 12 + +statement ok +drop materialized view v; + +statement ok +drop table stream; + +statement ok +drop table version; diff --git a/src/stream/src/executor/temporal_join.rs b/src/stream/src/executor/temporal_join.rs index eb28a30a040e2..79cbbba95e6d3 100644 --- a/src/stream/src/executor/temporal_join.rs +++ b/src/stream/src/executor/temporal_join.rs @@ -532,14 +532,14 @@ mod phase1 { left_row.project(memo_table_lookup_prefix).into_owned_row(), )); if let Some(chunk) = - E::append_matched_row(op, &mut builder, left_row, right_row) + E::append_matched_row(Op::Insert, &mut builder, left_row, right_row) { yield chunk; } } } if let Some(chunk) = - E::match_end(&mut builder, op, left_row, right_size, matched) + E::match_end(&mut builder, Op::Insert, left_row, right_size, matched) { yield chunk; } @@ -561,7 +561,7 @@ mod phase1 { let memo_row = memo_row?.into_owned_row(); memo_rows_to_delete.push(memo_row.clone()); if let Some(chunk) = E::append_matched_row( - op, + Op::Delete, &mut builder, left_row, memo_row.slice(0..right_size), @@ -575,7 +575,7 @@ mod phase1 { memo_table.delete(memo_row); } if let Some(chunk) = - E::match_end(&mut builder, op, left_row, right_size, matched) + E::match_end(&mut builder, Op::Delete, left_row, right_size, matched) { yield chunk; } From a5dfd4825d678edf722be874fccc0c752a359970 Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Fri, 12 Apr 2024 17:33:50 +0800 Subject: [PATCH 05/14] add tests --- .../temporal_join_with_index.slt | 44 +++++++++++++++++++ 1 file changed, 44 insertions(+) create mode 100644 e2e_test/streaming/temporal_join/non_append_only/temporal_join_with_index.slt diff --git a/e2e_test/streaming/temporal_join/non_append_only/temporal_join_with_index.slt b/e2e_test/streaming/temporal_join/non_append_only/temporal_join_with_index.slt new file mode 100644 index 0000000000000..e9e7e59a483d2 --- /dev/null +++ b/e2e_test/streaming/temporal_join/non_append_only/temporal_join_with_index.slt @@ -0,0 +1,44 @@ +statement ok +SET RW_IMPLICIT_FLUSH TO true; + +statement ok +create table stream(id1 int, a1 int, b1 int); + +statement ok +create table version(id2 int, a2 int, b2 int, primary key (id2)); + +statement ok +create index idx on version (a2); + +statement ok +create materialized view v as select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() on b1 = b2 and a1 = a2; + +statement ok +insert into version values(1, 11, 111); + +statement ok +insert into stream values(1, 11, 111); + +query IIII rowsort +select * from v; +---- +1 11 1 11 + +statement ok +update stream set a1 = 22 where id1 = 1; + +query IIII rowsort +select * from v; +---- +1 22 NULL NULL + +statement ok +drop materialized view v; + +statement ok +drop table stream; + +statement ok +drop table version; + + From 0c56ff38857623923011a1659c6ff76f0e6712be Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Fri, 12 Apr 2024 17:40:37 +0800 Subject: [PATCH 06/14] fix planner tests --- .../tests/testdata/input/temporal_join.yaml | 2 +- .../tests/testdata/output/nexmark.yaml | 6 +-- .../testdata/output/temporal_filter.yaml | 2 +- .../tests/testdata/output/temporal_join.yaml | 47 ++++++++++--------- 4 files changed, 31 insertions(+), 26 deletions(-) diff --git a/src/frontend/planner_test/tests/testdata/input/temporal_join.yaml b/src/frontend/planner_test/tests/testdata/input/temporal_join.yaml index 500e4ae2c2984..dd9db2b17a7d7 100644 --- a/src/frontend/planner_test/tests/testdata/input/temporal_join.yaml +++ b/src/frontend/planner_test/tests/testdata/input/temporal_join.yaml @@ -47,7 +47,7 @@ create table version(id2 int, a2 int, b2 int, primary key (id2)); select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() on id1 = id2 where a2 < 10; expected_outputs: - - stream_error + - stream_plan - name: Temporal join type test sql: | create table stream(id1 int, a1 int, b1 int); diff --git a/src/frontend/planner_test/tests/testdata/output/nexmark.yaml b/src/frontend/planner_test/tests/testdata/output/nexmark.yaml index 08916d1539c81..dcdd34ed2c153 100644 --- a/src/frontend/planner_test/tests/testdata/output/nexmark.yaml +++ b/src/frontend/planner_test/tests/testdata/output/nexmark.yaml @@ -1002,7 +1002,7 @@ ON mod(B.auction, 10000) = S.key sink_plan: |- StreamSink { type: append-only, columns: [auction, bidder, price, date_time, value, bid._row_id(hidden), $expr10018(hidden), side_input.key(hidden)] } - └─StreamTemporalJoin { type: Inner, predicate: $expr1 = side_input.key, output: [bid.auction, bid.bidder, bid.price, bid.date_time, side_input.value, bid._row_id, $expr1, side_input.key] } + └─StreamTemporalJoin { type: Inner, append_only: true, predicate: $expr1 = side_input.key, output: [bid.auction, bid.bidder, bid.price, bid.date_time, side_input.value, bid._row_id, $expr1, side_input.key] } ├─StreamExchange { dist: HashShard($expr1) } │ └─StreamProject { exprs: [bid.auction, bid.bidder, bid.price, bid.date_time, (bid.auction % 10000:Int32) as $expr1, bid._row_id] } │ └─StreamTableScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.date_time, bid._row_id], stream_scan_type: ArrangementBackfill, stream_key: [bid._row_id], pk: [_row_id], dist: UpstreamHashShard(bid._row_id) } @@ -1011,7 +1011,7 @@ stream_plan: |- StreamMaterialize { columns: [auction, bidder, price, date_time, value, bid._row_id(hidden), $expr1(hidden), side_input.key(hidden)], stream_key: [bid._row_id, $expr1], pk_columns: [bid._row_id, $expr1], pk_conflict: NoCheck } └─StreamExchange { dist: HashShard(bid._row_id, $expr1) } - └─StreamTemporalJoin { type: Inner, predicate: $expr1 = side_input.key, output: [bid.auction, bid.bidder, bid.price, bid.date_time, side_input.value, bid._row_id, $expr1, side_input.key] } + └─StreamTemporalJoin { type: Inner, append_only: true, predicate: $expr1 = side_input.key, output: [bid.auction, bid.bidder, bid.price, bid.date_time, side_input.value, bid._row_id, $expr1, side_input.key] } ├─StreamExchange { dist: HashShard($expr1) } │ └─StreamProject { exprs: [bid.auction, bid.bidder, bid.price, bid.date_time, (bid.auction % 10000:Int32) as $expr1, bid._row_id] } │ └─StreamTableScan { table: bid, columns: [bid.auction, bid.bidder, bid.price, bid.date_time, bid._row_id], stream_scan_type: ArrangementBackfill, stream_key: [bid._row_id], pk: [_row_id], dist: UpstreamHashShard(bid._row_id) } @@ -1024,7 +1024,7 @@ └── StreamExchange Hash([5, 6]) from 1 Fragment 1 - StreamTemporalJoin { type: Inner, predicate: $expr1 = side_input.key, output: [bid.auction, bid.bidder, bid.price, bid.date_time, side_input.value, bid._row_id, $expr1, side_input.key] } + StreamTemporalJoin { type: Inner, append_only: true, predicate: $expr1 = side_input.key, output: [bid.auction, bid.bidder, bid.price, bid.date_time, side_input.value, bid._row_id, $expr1, side_input.key] } ├── StreamExchange Hash([4]) from 2 └── StreamExchange NoShuffle from 3 diff --git a/src/frontend/planner_test/tests/testdata/output/temporal_filter.yaml b/src/frontend/planner_test/tests/testdata/output/temporal_filter.yaml index fd5f6aec627f2..7bbd43ce3c35c 100644 --- a/src/frontend/planner_test/tests/testdata/output/temporal_filter.yaml +++ b/src/frontend/planner_test/tests/testdata/output/temporal_filter.yaml @@ -333,7 +333,7 @@ StreamMaterialize { columns: [id1, a1, id2, v1, stream._row_id(hidden)], stream_key: [stream._row_id, id1], pk_columns: [stream._row_id, id1], pk_conflict: NoCheck, watermark_columns: [v1] } └─StreamProject { exprs: [stream.id1, stream.a1, version.id2, stream.v1, stream._row_id], output_watermarks: [stream.v1] } └─StreamDynamicFilter { predicate: (stream.v1 > now), output_watermarks: [stream.v1], output: [stream.id1, stream.a1, stream.v1, version.id2, stream._row_id], cleaned_by_watermark: true } - ├─StreamTemporalJoin { type: LeftOuter, predicate: stream.id1 = version.id2, output: [stream.id1, stream.a1, stream.v1, version.id2, stream._row_id] } + ├─StreamTemporalJoin { type: LeftOuter, append_only: true, predicate: stream.id1 = version.id2, output: [stream.id1, stream.a1, stream.v1, version.id2, stream._row_id] } │ ├─StreamExchange { dist: HashShard(stream.id1) } │ │ └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream.v1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) } │ └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(version.id2) } diff --git a/src/frontend/planner_test/tests/testdata/output/temporal_join.yaml b/src/frontend/planner_test/tests/testdata/output/temporal_join.yaml index ebf7af980d23b..58b0622759e45 100644 --- a/src/frontend/planner_test/tests/testdata/output/temporal_join.yaml +++ b/src/frontend/planner_test/tests/testdata/output/temporal_join.yaml @@ -7,7 +7,7 @@ stream_plan: |- StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden)], stream_key: [stream._row_id, id1], pk_columns: [stream._row_id, id1], pk_conflict: NoCheck } └─StreamExchange { dist: HashShard(stream.id1, stream._row_id) } - └─StreamTemporalJoin { type: LeftOuter, predicate: stream.id1 = version.id2, output: [stream.id1, stream.a1, version.id2, version.a2, stream._row_id] } + └─StreamTemporalJoin { type: LeftOuter, append_only: true, predicate: stream.id1 = version.id2, output: [stream.id1, stream.a1, version.id2, version.a2, stream._row_id] } ├─StreamExchange { dist: HashShard(stream.id1) } │ └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) } └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(version.id2) } @@ -23,7 +23,7 @@ stream_plan: |- StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden)], stream_key: [stream._row_id, id1], pk_columns: [stream._row_id, id1], pk_conflict: NoCheck } └─StreamExchange { dist: HashShard(stream.id1, stream._row_id) } - └─StreamTemporalJoin { type: Inner, predicate: stream.id1 = version.id2 AND (version.a2 < 10:Int32), output: [stream.id1, stream.a1, version.id2, version.a2, stream._row_id] } + └─StreamTemporalJoin { type: Inner, append_only: true, predicate: stream.id1 = version.id2 AND (version.a2 < 10:Int32), output: [stream.id1, stream.a1, version.id2, version.a2, stream._row_id] } ├─StreamExchange { dist: HashShard(stream.id1) } │ └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) } └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(version.id2) } @@ -36,7 +36,7 @@ stream_plan: |- StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden)], stream_key: [stream._row_id, id1], pk_columns: [stream._row_id, id1], pk_conflict: NoCheck } └─StreamExchange { dist: HashShard(stream.id1, stream._row_id) } - └─StreamTemporalJoin { type: Inner, predicate: stream.id1 = version.id2 AND (version.a2 < 10:Int32), output: [stream.id1, stream.a1, version.id2, version.a2, stream._row_id] } + └─StreamTemporalJoin { type: Inner, append_only: true, predicate: stream.id1 = version.id2 AND (version.a2 < 10:Int32), output: [stream.id1, stream.a1, version.id2, version.a2, stream._row_id] } ├─StreamExchange { dist: HashShard(stream.id1) } │ └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) } └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(version.id2) } @@ -49,7 +49,7 @@ stream_plan: |- StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden)], stream_key: [stream._row_id, id1, a1], pk_columns: [stream._row_id, id1, a1], pk_conflict: NoCheck } └─StreamExchange { dist: HashShard(stream.id1, stream.a1, stream._row_id) } - └─StreamTemporalJoin { type: Inner, predicate: stream.id1 = version.id2 AND stream.a1 = version.a2 AND (version.b2 <> version.a2), output: [stream.id1, stream.a1, version.id2, version.a2, stream._row_id] } + └─StreamTemporalJoin { type: Inner, append_only: true, predicate: stream.id1 = version.id2 AND stream.a1 = version.a2 AND (version.b2 <> version.a2), output: [stream.id1, stream.a1, version.id2, version.a2, stream._row_id] } ├─StreamExchange { dist: HashShard(stream.id1, stream.a1) } │ └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) } └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(version.id2, version.a2) } @@ -65,7 +65,7 @@ └─StreamSimpleAgg [append_only] { aggs: [sum0(count), count] } └─StreamExchange { dist: Single } └─StreamStatelessSimpleAgg { aggs: [count] } - └─StreamTemporalJoin { type: Inner, predicate: stream.id1 = version.id2 AND (version.a2 < 10:Int32), output: [stream._row_id, stream.id1, version.id2] } + └─StreamTemporalJoin { type: Inner, append_only: true, predicate: stream.id1 = version.id2 AND (version.a2 < 10:Int32), output: [stream._row_id, stream.id1, version.id2] } ├─StreamExchange { dist: HashShard(stream.id1) } │ └─StreamTableScan { table: stream, columns: [stream.id1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) } └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(version.id2) } @@ -83,9 +83,14 @@ create table stream(id1 int, a1 int, b1 int); create table version(id2 int, a2 int, b2 int, primary key (id2)); select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() on id1 = id2 where a2 < 10; - stream_error: |- - Not supported: Temporal join requires an append-only left input - HINT: Please ensure your left input is append-only + stream_plan: |- + StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden)], stream_key: [stream._row_id, id1], pk_columns: [stream._row_id, id1], pk_conflict: NoCheck } + └─StreamExchange { dist: HashShard(stream.id1, stream._row_id) } + └─StreamTemporalJoin { type: Inner, append_only: false, predicate: stream.id1 = version.id2 AND (version.a2 < 10:Int32), output: [stream.id1, stream.a1, version.id2, version.a2, stream._row_id] } + ├─StreamExchange { dist: HashShard(stream.id1) } + │ └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) } + └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(version.id2) } + └─StreamTableScan { table: version, columns: [version.id2, version.a2], stream_scan_type: UpstreamOnly, stream_key: [version.id2], pk: [id2], dist: UpstreamHashShard(version.id2) } - name: Temporal join type test sql: | create table stream(id1 int, a1 int, b1 int); @@ -106,9 +111,9 @@ stream_plan: |- StreamMaterialize { columns: [k, x1, x2, a1, b1, stream._row_id(hidden), version2.k(hidden)], stream_key: [stream._row_id, k], pk_columns: [stream._row_id, k], pk_conflict: NoCheck } └─StreamExchange { dist: HashShard(stream.k, stream._row_id) } - └─StreamTemporalJoin { type: Inner, predicate: stream.k = version2.k, output: [stream.k, version1.x1, version2.x2, stream.a1, stream.b1, stream._row_id, version2.k] } + └─StreamTemporalJoin { type: Inner, append_only: true, predicate: stream.k = version2.k, output: [stream.k, version1.x1, version2.x2, stream.a1, stream.b1, stream._row_id, version2.k] } ├─StreamExchange { dist: HashShard(stream.k) } - │ └─StreamTemporalJoin { type: Inner, predicate: stream.k = version1.k, output: [stream.k, stream.a1, stream.b1, version1.x1, stream._row_id, version1.k] } + │ └─StreamTemporalJoin { type: Inner, append_only: true, predicate: stream.k = version1.k, output: [stream.k, stream.a1, stream.b1, version1.x1, stream._row_id, version1.k] } │ ├─StreamExchange { dist: HashShard(stream.k) } │ │ └─StreamFilter { predicate: (stream.a1 < 10:Int32) } │ │ └─StreamTableScan { table: stream, columns: [stream.k, stream.a1, stream.b1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) } @@ -128,9 +133,9 @@ stream_plan: |- StreamMaterialize { columns: [id1, x1, id2, x2, a1, b1, stream._row_id(hidden), version2.id2(hidden)], stream_key: [stream._row_id, id1, id2], pk_columns: [stream._row_id, id1, id2], pk_conflict: NoCheck } └─StreamExchange { dist: HashShard(stream.id1, stream.id2, stream._row_id) } - └─StreamTemporalJoin { type: Inner, predicate: stream.id2 = version2.id2, output: [stream.id1, version1.x1, stream.id2, version2.x2, stream.a1, stream.b1, stream._row_id, version2.id2] } + └─StreamTemporalJoin { type: Inner, append_only: true, predicate: stream.id2 = version2.id2, output: [stream.id1, version1.x1, stream.id2, version2.x2, stream.a1, stream.b1, stream._row_id, version2.id2] } ├─StreamExchange { dist: HashShard(stream.id2) } - │ └─StreamTemporalJoin { type: Inner, predicate: stream.id1 = version1.id1, output: [stream.id1, stream.id2, stream.a1, stream.b1, version1.x1, stream._row_id, version1.id1] } + │ └─StreamTemporalJoin { type: Inner, append_only: true, predicate: stream.id1 = version1.id1, output: [stream.id1, stream.id2, stream.a1, stream.b1, version1.x1, stream._row_id, version1.id1] } │ ├─StreamExchange { dist: HashShard(stream.id1) } │ │ └─StreamFilter { predicate: (stream.a1 < 10:Int32) } │ │ └─StreamTableScan { table: stream, columns: [stream.id1, stream.id2, stream.a1, stream.b1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) } @@ -150,9 +155,9 @@ stream_plan: |- StreamMaterialize { columns: [id1, x1, id2, x2, a1, b1, stream._row_id(hidden), version2.id2(hidden)], stream_key: [stream._row_id, id1, id2], pk_columns: [stream._row_id, id1, id2], pk_conflict: NoCheck } └─StreamExchange { dist: HashShard(stream.id1, stream.id2, stream._row_id) } - └─StreamTemporalJoin { type: Inner, predicate: stream.id2 = version2.id2, output: [stream.id1, version1.x1, stream.id2, version2.x2, stream.a1, stream.b1, stream._row_id, version2.id2] } + └─StreamTemporalJoin { type: Inner, append_only: true, predicate: stream.id2 = version2.id2, output: [stream.id1, version1.x1, stream.id2, version2.x2, stream.a1, stream.b1, stream._row_id, version2.id2] } ├─StreamExchange { dist: HashShard(stream.id2) } - │ └─StreamTemporalJoin { type: Inner, predicate: stream.id1 = version1.id1, output: [stream.id1, stream.id2, stream.a1, stream.b1, version1.x1, stream._row_id, version1.id1] } + │ └─StreamTemporalJoin { type: Inner, append_only: true, predicate: stream.id1 = version1.id1, output: [stream.id1, stream.id2, stream.a1, stream.b1, version1.x1, stream._row_id, version1.id1] } │ ├─StreamExchange { dist: HashShard(stream.id1) } │ │ └─StreamFilter { predicate: (stream.a1 < 10:Int32) } │ │ └─StreamTableScan { table: stream, columns: [stream.id1, stream.id2, stream.a1, stream.b1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) } @@ -169,7 +174,7 @@ stream_plan: |- StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden), stream.b1(hidden)], stream_key: [stream._row_id, id2, a1, stream.b1], pk_columns: [stream._row_id, id2, a1, stream.b1], pk_conflict: NoCheck } └─StreamExchange { dist: HashShard(stream.a1, idx2.id2, stream._row_id, stream.b1) } - └─StreamTemporalJoin { type: LeftOuter, predicate: stream.a1 = idx2.a2 AND stream.b1 = idx2.b2, output: [stream.id1, stream.a1, idx2.id2, idx2.a2, stream._row_id, stream.b1] } + └─StreamTemporalJoin { type: LeftOuter, append_only: true, predicate: stream.a1 = idx2.a2 AND stream.b1 = idx2.b2, output: [stream.id1, stream.a1, idx2.id2, idx2.a2, stream._row_id, stream.b1] } ├─StreamExchange { dist: HashShard(stream.a1) } │ └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream.b1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) } └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(idx2.a2) } @@ -183,7 +188,7 @@ stream_plan: |- StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden), stream.b1(hidden)], stream_key: [stream._row_id, id2, a1, stream.b1], pk_columns: [stream._row_id, id2, a1, stream.b1], pk_conflict: NoCheck } └─StreamExchange { dist: HashShard(stream.a1, idx2.id2, stream._row_id, stream.b1) } - └─StreamTemporalJoin { type: LeftOuter, predicate: stream.a1 = idx2.a2 AND stream.b1 = idx2.b2, output: [stream.id1, stream.a1, idx2.id2, idx2.a2, stream._row_id, stream.b1] } + └─StreamTemporalJoin { type: LeftOuter, append_only: true, predicate: stream.a1 = idx2.a2 AND stream.b1 = idx2.b2, output: [stream.id1, stream.a1, idx2.id2, idx2.a2, stream._row_id, stream.b1] } ├─StreamExchange { dist: HashShard(stream.a1) } │ └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream.b1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) } └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(idx2.a2) } @@ -197,7 +202,7 @@ stream_plan: |- StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden), stream.b1(hidden)], stream_key: [stream._row_id, id2, stream.b1, a1], pk_columns: [stream._row_id, id2, stream.b1, a1], pk_conflict: NoCheck } └─StreamExchange { dist: HashShard(stream.a1, idx2.id2, stream._row_id, stream.b1) } - └─StreamTemporalJoin { type: LeftOuter, predicate: stream.b1 = idx2.b2 AND (stream.a1 = idx2.a2), output: [stream.id1, stream.a1, idx2.id2, idx2.a2, stream._row_id, stream.b1] } + └─StreamTemporalJoin { type: LeftOuter, append_only: true, predicate: stream.b1 = idx2.b2 AND (stream.a1 = idx2.a2), output: [stream.id1, stream.a1, idx2.id2, idx2.a2, stream._row_id, stream.b1] } ├─StreamExchange { dist: HashShard(stream.b1) } │ └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream.b1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) } └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(idx2.b2) } @@ -209,7 +214,7 @@ select * from t left join v FOR SYSTEM_TIME AS OF PROCTIME() on a = count; stream_plan: |- StreamMaterialize { columns: [a, count, t._row_id(hidden), $expr1(hidden)], stream_key: [t._row_id, $expr1], pk_columns: [t._row_id, $expr1], pk_conflict: NoCheck } - └─StreamTemporalJoin { type: LeftOuter, predicate: AND ($expr1 = v.count), output: [t.a, v.count, t._row_id, $expr1] } + └─StreamTemporalJoin { type: LeftOuter, append_only: true, predicate: AND ($expr1 = v.count), output: [t.a, v.count, t._row_id, $expr1] } ├─StreamExchange { dist: Single } │ └─StreamProject { exprs: [t.a, t.a::Int64 as $expr1, t._row_id] } │ └─StreamTableScan { table: t, columns: [t.a, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } @@ -224,7 +229,7 @@ stream_plan: |- StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden), stream.b1(hidden)], stream_key: [stream._row_id, id2, a1, stream.b1], pk_columns: [stream._row_id, id2, a1, stream.b1], pk_conflict: NoCheck } └─StreamExchange { dist: HashShard(stream.a1, idx.id2, stream._row_id, stream.b1) } - └─StreamTemporalJoin { type: LeftOuter, predicate: stream.a1 = idx.a2 AND stream.b1 = idx.b2, output: [stream.id1, stream.a1, idx.id2, idx.a2, stream._row_id, stream.b1] } + └─StreamTemporalJoin { type: LeftOuter, append_only: true, predicate: stream.a1 = idx.a2 AND stream.b1 = idx.b2, output: [stream.id1, stream.a1, idx.id2, idx.a2, stream._row_id, stream.b1] } ├─StreamExchange { dist: HashShard(stream.a1) } │ └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream.b1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) } └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(idx.a2) } @@ -239,7 +244,7 @@ stream_plan: |- StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden), stream.b1(hidden)], stream_key: [stream._row_id, id2, a1, stream.b1], pk_columns: [stream._row_id, id2, a1, stream.b1], pk_conflict: NoCheck } └─StreamExchange { dist: HashShard(stream.a1, idx2.id2, stream._row_id, stream.b1) } - └─StreamTemporalJoin { type: LeftOuter, predicate: stream.a1 = idx2.a2 AND stream.b1 = idx2.b2, output: [stream.id1, stream.a1, idx2.id2, idx2.a2, stream._row_id, stream.b1] } + └─StreamTemporalJoin { type: LeftOuter, append_only: true, predicate: stream.a1 = idx2.a2 AND stream.b1 = idx2.b2, output: [stream.id1, stream.a1, idx2.id2, idx2.a2, stream._row_id, stream.b1] } ├─StreamExchange { dist: HashShard(stream.a1) } │ └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream.b1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) } └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(idx2.a2) } @@ -255,7 +260,7 @@ stream_plan: |- StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden), stream.b1(hidden), stream.c1(hidden)], stream_key: [stream._row_id, id1, a1, stream.b1, stream.c1], pk_columns: [stream._row_id, id1, a1, stream.b1, stream.c1], pk_conflict: NoCheck } └─StreamExchange { dist: HashShard(stream.id1, stream.a1, stream._row_id, stream.b1, stream.c1) } - └─StreamTemporalJoin { type: LeftOuter, predicate: stream.id1 = version.id2 AND (stream.a1 = version.a2) AND (stream.b1 = version.b2) AND (stream.c1 = version.c2), output: [stream.id1, stream.a1, version.id2, version.a2, stream._row_id, stream.b1, stream.c1] } + └─StreamTemporalJoin { type: LeftOuter, append_only: true, predicate: stream.id1 = version.id2 AND (stream.a1 = version.a2) AND (stream.b1 = version.b2) AND (stream.c1 = version.c2), output: [stream.id1, stream.a1, version.id2, version.a2, stream._row_id, stream.b1, stream.c1] } ├─StreamExchange { dist: HashShard(stream.id1) } │ └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream.b1, stream.c1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) } └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(version.id2) } From 897d55974cee0291b9839027246372fff6a5a352 Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Fri, 12 Apr 2024 17:44:40 +0800 Subject: [PATCH 07/14] refactor tests --- .../tests/testdata/input/temporal_join.yaml | 24 +++++----- .../tests/testdata/output/temporal_join.yaml | 46 +++++++++---------- 2 files changed, 35 insertions(+), 35 deletions(-) diff --git a/src/frontend/planner_test/tests/testdata/input/temporal_join.yaml b/src/frontend/planner_test/tests/testdata/input/temporal_join.yaml index dd9db2b17a7d7..7bf9769b4c431 100644 --- a/src/frontend/planner_test/tests/testdata/input/temporal_join.yaml +++ b/src/frontend/planner_test/tests/testdata/input/temporal_join.yaml @@ -43,7 +43,7 @@ - stream_error - name: Temporal join append only test sql: | - create table stream(id1 int, a1 int, b1 int); + create table stream(id1 int, a1 int, b1 int) append only; create table version(id2 int, a2 int, b2 int, primary key (id2)); select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() on id1 = id2 where a2 < 10; expected_outputs: @@ -57,7 +57,7 @@ - stream_error - name: multi-way temporal join with the same key sql: | - create table stream(k int, a1 int, b1 int) APPEND ONLY; + create table stream(k int, a1 int, b1 int); create table version1(k int, x1 int, y2 int, primary key (k)); create table version2(k int, x2 int, y2 int, primary key (k)); select stream.k, x1, x2, a1, b1 @@ -68,7 +68,7 @@ - stream_plan - name: multi-way temporal join with different keys sql: | - create table stream(id1 int, id2 int, a1 int, b1 int) APPEND ONLY; + create table stream(id1 int, id2 int, a1 int, b1 int); create table version1(id1 int, x1 int, y2 int, primary key (id1)); create table version2(id2 int, x2 int, y2 int, primary key (id2)); select stream.id1, x1, stream.id2, x2, a1, b1 @@ -79,7 +79,7 @@ - stream_plan - name: multi-way temporal join with different keys sql: | - create table stream(id1 int, id2 int, a1 int, b1 int) APPEND ONLY; + create table stream(id1 int, id2 int, a1 int, b1 int); create table version1(id1 int, x1 int, y2 int, primary key (id1)); create table version2(id2 int, x2 int, y2 int, primary key (id2)); select stream.id1, x1, stream.id2, x2, a1, b1 @@ -90,7 +90,7 @@ - stream_plan - name: temporal join with an index (distribution key size = 1) sql: | - create table stream(id1 int, a1 int, b1 int) APPEND ONLY; + create table stream(id1 int, a1 int, b1 int); create table version(id2 int, a2 int, b2 int, primary key (id2)); create index idx2 on version (a2, b2) distributed by (a2); select id1, a1, id2, a2 from stream left join idx2 FOR SYSTEM_TIME AS OF PROCTIME() on a1 = a2 and b1 = b2; @@ -98,7 +98,7 @@ - stream_plan - name: temporal join with an index (distribution key size = 2) sql: | - create table stream(id1 int, a1 int, b1 int) APPEND ONLY; + create table stream(id1 int, a1 int, b1 int); create table version(id2 int, a2 int, b2 int, primary key (id2)); create index idx2 on version (a2, b2); select id1, a1, id2, a2 from stream left join idx2 FOR SYSTEM_TIME AS OF PROCTIME() on a1 = a2 and b1 = b2; @@ -106,7 +106,7 @@ - stream_plan - name: temporal join with an index (index column size = 1) sql: | - create table stream(id1 int, a1 int, b1 int) APPEND ONLY; + create table stream(id1 int, a1 int, b1 int); create table version(id2 int, a2 int, b2 int, primary key (id2)); create index idx2 on version (b2); select id1, a1, id2, a2 from stream left join idx2 FOR SYSTEM_TIME AS OF PROCTIME() on a1 = a2 and b1 = b2; @@ -114,14 +114,14 @@ - stream_plan - name: temporal join with singleton table sql: | - create table t (a int) append only; + create table t (a int); create materialized view v as select count(*) from t; select * from t left join v FOR SYSTEM_TIME AS OF PROCTIME() on a = count; expected_outputs: - stream_plan - name: index selection for temporal join (with one index). sql: | - create table stream(id1 int, a1 int, b1 int) APPEND ONLY; + create table stream(id1 int, a1 int, b1 int); create table version(id2 int, a2 int, b2 int, primary key (id2)); create index idx on version (a2, b2); select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() on a1 = a2 and b1 = b2; @@ -129,7 +129,7 @@ - stream_plan - name: index selection for temporal join (with two indexes) and should choose the index with a longer prefix.. sql: | - create table stream(id1 int, a1 int, b1 int) APPEND ONLY; + create table stream(id1 int, a1 int, b1 int); create table version(id2 int, a2 int, b2 int, primary key (id2)); create index idx1 on version (a2); create index idx2 on version (a2, b2); @@ -138,7 +138,7 @@ - stream_plan - name: index selection for temporal join (with three indexes) and should choose primary table. sql: | - create table stream(id1 int, a1 int, b1 int, c1 int) APPEND ONLY; + create table stream(id1 int, a1 int, b1 int, c1 int); create table version(id2 int, a2 int, b2 int, c2 int, primary key (id2)); create index idx1 on version (a2); create index idx2 on version (b2); @@ -148,7 +148,7 @@ - stream_plan - name: index selection for temporal join (two index) and no one matches. sql: | - create table stream(id1 int, a1 int, b1 int, c1 int) APPEND ONLY; + create table stream(id1 int, a1 int, b1 int, c1 int); create table version(id2 int, a2 int, b2 int, c2 int, primary key (id2)); create index idx1 on version (a2); create index idx2 on version (a2, b2); diff --git a/src/frontend/planner_test/tests/testdata/output/temporal_join.yaml b/src/frontend/planner_test/tests/testdata/output/temporal_join.yaml index 58b0622759e45..aa8887e98bef6 100644 --- a/src/frontend/planner_test/tests/testdata/output/temporal_join.yaml +++ b/src/frontend/planner_test/tests/testdata/output/temporal_join.yaml @@ -80,13 +80,13 @@ HINT: Please add the primary key of the lookup table to the join condition and remove any other conditions - name: Temporal join append only test sql: | - create table stream(id1 int, a1 int, b1 int); + create table stream(id1 int, a1 int, b1 int) append only; create table version(id2 int, a2 int, b2 int, primary key (id2)); select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() on id1 = id2 where a2 < 10; stream_plan: |- StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden)], stream_key: [stream._row_id, id1], pk_columns: [stream._row_id, id1], pk_conflict: NoCheck } └─StreamExchange { dist: HashShard(stream.id1, stream._row_id) } - └─StreamTemporalJoin { type: Inner, append_only: false, predicate: stream.id1 = version.id2 AND (version.a2 < 10:Int32), output: [stream.id1, stream.a1, version.id2, version.a2, stream._row_id] } + └─StreamTemporalJoin { type: Inner, append_only: true, predicate: stream.id1 = version.id2 AND (version.a2 < 10:Int32), output: [stream.id1, stream.a1, version.id2, version.a2, stream._row_id] } ├─StreamExchange { dist: HashShard(stream.id1) } │ └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) } └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(version.id2) } @@ -101,7 +101,7 @@ HINT: please check your temporal join syntax e.g. consider removing the right outer join if it is being used. - name: multi-way temporal join with the same key sql: | - create table stream(k int, a1 int, b1 int) APPEND ONLY; + create table stream(k int, a1 int, b1 int); create table version1(k int, x1 int, y2 int, primary key (k)); create table version2(k int, x2 int, y2 int, primary key (k)); select stream.k, x1, x2, a1, b1 @@ -113,7 +113,7 @@ └─StreamExchange { dist: HashShard(stream.k, stream._row_id) } └─StreamTemporalJoin { type: Inner, append_only: true, predicate: stream.k = version2.k, output: [stream.k, version1.x1, version2.x2, stream.a1, stream.b1, stream._row_id, version2.k] } ├─StreamExchange { dist: HashShard(stream.k) } - │ └─StreamTemporalJoin { type: Inner, append_only: true, predicate: stream.k = version1.k, output: [stream.k, stream.a1, stream.b1, version1.x1, stream._row_id, version1.k] } + │ └─StreamTemporalJoin { type: Inner, append_only: false, predicate: stream.k = version1.k, output: [stream.k, stream.a1, stream.b1, version1.x1, stream._row_id, version1.k] } │ ├─StreamExchange { dist: HashShard(stream.k) } │ │ └─StreamFilter { predicate: (stream.a1 < 10:Int32) } │ │ └─StreamTableScan { table: stream, columns: [stream.k, stream.a1, stream.b1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) } @@ -123,7 +123,7 @@ └─StreamTableScan { table: version2, columns: [version2.k, version2.x2], stream_scan_type: UpstreamOnly, stream_key: [version2.k], pk: [k], dist: UpstreamHashShard(version2.k) } - name: multi-way temporal join with different keys sql: | - create table stream(id1 int, id2 int, a1 int, b1 int) APPEND ONLY; + create table stream(id1 int, id2 int, a1 int, b1 int); create table version1(id1 int, x1 int, y2 int, primary key (id1)); create table version2(id2 int, x2 int, y2 int, primary key (id2)); select stream.id1, x1, stream.id2, x2, a1, b1 @@ -135,7 +135,7 @@ └─StreamExchange { dist: HashShard(stream.id1, stream.id2, stream._row_id) } └─StreamTemporalJoin { type: Inner, append_only: true, predicate: stream.id2 = version2.id2, output: [stream.id1, version1.x1, stream.id2, version2.x2, stream.a1, stream.b1, stream._row_id, version2.id2] } ├─StreamExchange { dist: HashShard(stream.id2) } - │ └─StreamTemporalJoin { type: Inner, append_only: true, predicate: stream.id1 = version1.id1, output: [stream.id1, stream.id2, stream.a1, stream.b1, version1.x1, stream._row_id, version1.id1] } + │ └─StreamTemporalJoin { type: Inner, append_only: false, predicate: stream.id1 = version1.id1, output: [stream.id1, stream.id2, stream.a1, stream.b1, version1.x1, stream._row_id, version1.id1] } │ ├─StreamExchange { dist: HashShard(stream.id1) } │ │ └─StreamFilter { predicate: (stream.a1 < 10:Int32) } │ │ └─StreamTableScan { table: stream, columns: [stream.id1, stream.id2, stream.a1, stream.b1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) } @@ -145,7 +145,7 @@ └─StreamTableScan { table: version2, columns: [version2.id2, version2.x2], stream_scan_type: UpstreamOnly, stream_key: [version2.id2], pk: [id2], dist: UpstreamHashShard(version2.id2) } - name: multi-way temporal join with different keys sql: | - create table stream(id1 int, id2 int, a1 int, b1 int) APPEND ONLY; + create table stream(id1 int, id2 int, a1 int, b1 int); create table version1(id1 int, x1 int, y2 int, primary key (id1)); create table version2(id2 int, x2 int, y2 int, primary key (id2)); select stream.id1, x1, stream.id2, x2, a1, b1 @@ -157,7 +157,7 @@ └─StreamExchange { dist: HashShard(stream.id1, stream.id2, stream._row_id) } └─StreamTemporalJoin { type: Inner, append_only: true, predicate: stream.id2 = version2.id2, output: [stream.id1, version1.x1, stream.id2, version2.x2, stream.a1, stream.b1, stream._row_id, version2.id2] } ├─StreamExchange { dist: HashShard(stream.id2) } - │ └─StreamTemporalJoin { type: Inner, append_only: true, predicate: stream.id1 = version1.id1, output: [stream.id1, stream.id2, stream.a1, stream.b1, version1.x1, stream._row_id, version1.id1] } + │ └─StreamTemporalJoin { type: Inner, append_only: false, predicate: stream.id1 = version1.id1, output: [stream.id1, stream.id2, stream.a1, stream.b1, version1.x1, stream._row_id, version1.id1] } │ ├─StreamExchange { dist: HashShard(stream.id1) } │ │ └─StreamFilter { predicate: (stream.a1 < 10:Int32) } │ │ └─StreamTableScan { table: stream, columns: [stream.id1, stream.id2, stream.a1, stream.b1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) } @@ -167,54 +167,54 @@ └─StreamTableScan { table: version2, columns: [version2.id2, version2.x2], stream_scan_type: UpstreamOnly, stream_key: [version2.id2], pk: [id2], dist: UpstreamHashShard(version2.id2) } - name: temporal join with an index (distribution key size = 1) sql: | - create table stream(id1 int, a1 int, b1 int) APPEND ONLY; + create table stream(id1 int, a1 int, b1 int); create table version(id2 int, a2 int, b2 int, primary key (id2)); create index idx2 on version (a2, b2) distributed by (a2); select id1, a1, id2, a2 from stream left join idx2 FOR SYSTEM_TIME AS OF PROCTIME() on a1 = a2 and b1 = b2; stream_plan: |- StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden), stream.b1(hidden)], stream_key: [stream._row_id, id2, a1, stream.b1], pk_columns: [stream._row_id, id2, a1, stream.b1], pk_conflict: NoCheck } └─StreamExchange { dist: HashShard(stream.a1, idx2.id2, stream._row_id, stream.b1) } - └─StreamTemporalJoin { type: LeftOuter, append_only: true, predicate: stream.a1 = idx2.a2 AND stream.b1 = idx2.b2, output: [stream.id1, stream.a1, idx2.id2, idx2.a2, stream._row_id, stream.b1] } + └─StreamTemporalJoin { type: LeftOuter, append_only: false, predicate: stream.a1 = idx2.a2 AND stream.b1 = idx2.b2, output: [stream.id1, stream.a1, idx2.id2, idx2.a2, stream._row_id, stream.b1] } ├─StreamExchange { dist: HashShard(stream.a1) } │ └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream.b1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) } └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(idx2.a2) } └─StreamTableScan { table: idx2, columns: [idx2.a2, idx2.b2, idx2.id2], stream_scan_type: UpstreamOnly, stream_key: [idx2.id2], pk: [a2, b2, id2], dist: UpstreamHashShard(idx2.a2) } - name: temporal join with an index (distribution key size = 2) sql: | - create table stream(id1 int, a1 int, b1 int) APPEND ONLY; + create table stream(id1 int, a1 int, b1 int); create table version(id2 int, a2 int, b2 int, primary key (id2)); create index idx2 on version (a2, b2); select id1, a1, id2, a2 from stream left join idx2 FOR SYSTEM_TIME AS OF PROCTIME() on a1 = a2 and b1 = b2; stream_plan: |- StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden), stream.b1(hidden)], stream_key: [stream._row_id, id2, a1, stream.b1], pk_columns: [stream._row_id, id2, a1, stream.b1], pk_conflict: NoCheck } └─StreamExchange { dist: HashShard(stream.a1, idx2.id2, stream._row_id, stream.b1) } - └─StreamTemporalJoin { type: LeftOuter, append_only: true, predicate: stream.a1 = idx2.a2 AND stream.b1 = idx2.b2, output: [stream.id1, stream.a1, idx2.id2, idx2.a2, stream._row_id, stream.b1] } + └─StreamTemporalJoin { type: LeftOuter, append_only: false, predicate: stream.a1 = idx2.a2 AND stream.b1 = idx2.b2, output: [stream.id1, stream.a1, idx2.id2, idx2.a2, stream._row_id, stream.b1] } ├─StreamExchange { dist: HashShard(stream.a1) } │ └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream.b1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) } └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(idx2.a2) } └─StreamTableScan { table: idx2, columns: [idx2.a2, idx2.b2, idx2.id2], stream_scan_type: UpstreamOnly, stream_key: [idx2.id2], pk: [a2, b2, id2], dist: UpstreamHashShard(idx2.a2) } - name: temporal join with an index (index column size = 1) sql: | - create table stream(id1 int, a1 int, b1 int) APPEND ONLY; + create table stream(id1 int, a1 int, b1 int); create table version(id2 int, a2 int, b2 int, primary key (id2)); create index idx2 on version (b2); select id1, a1, id2, a2 from stream left join idx2 FOR SYSTEM_TIME AS OF PROCTIME() on a1 = a2 and b1 = b2; stream_plan: |- StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden), stream.b1(hidden)], stream_key: [stream._row_id, id2, stream.b1, a1], pk_columns: [stream._row_id, id2, stream.b1, a1], pk_conflict: NoCheck } └─StreamExchange { dist: HashShard(stream.a1, idx2.id2, stream._row_id, stream.b1) } - └─StreamTemporalJoin { type: LeftOuter, append_only: true, predicate: stream.b1 = idx2.b2 AND (stream.a1 = idx2.a2), output: [stream.id1, stream.a1, idx2.id2, idx2.a2, stream._row_id, stream.b1] } + └─StreamTemporalJoin { type: LeftOuter, append_only: false, predicate: stream.b1 = idx2.b2 AND (stream.a1 = idx2.a2), output: [stream.id1, stream.a1, idx2.id2, idx2.a2, stream._row_id, stream.b1] } ├─StreamExchange { dist: HashShard(stream.b1) } │ └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream.b1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) } └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(idx2.b2) } └─StreamTableScan { table: idx2, columns: [idx2.b2, idx2.id2, idx2.a2], stream_scan_type: UpstreamOnly, stream_key: [idx2.id2], pk: [b2, id2], dist: UpstreamHashShard(idx2.b2) } - name: temporal join with singleton table sql: | - create table t (a int) append only; + create table t (a int); create materialized view v as select count(*) from t; select * from t left join v FOR SYSTEM_TIME AS OF PROCTIME() on a = count; stream_plan: |- StreamMaterialize { columns: [a, count, t._row_id(hidden), $expr1(hidden)], stream_key: [t._row_id, $expr1], pk_columns: [t._row_id, $expr1], pk_conflict: NoCheck } - └─StreamTemporalJoin { type: LeftOuter, append_only: true, predicate: AND ($expr1 = v.count), output: [t.a, v.count, t._row_id, $expr1] } + └─StreamTemporalJoin { type: LeftOuter, append_only: false, predicate: AND ($expr1 = v.count), output: [t.a, v.count, t._row_id, $expr1] } ├─StreamExchange { dist: Single } │ └─StreamProject { exprs: [t.a, t.a::Int64 as $expr1, t._row_id] } │ └─StreamTableScan { table: t, columns: [t.a, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) } @@ -222,21 +222,21 @@ └─StreamTableScan { table: v, columns: [v.count], stream_scan_type: UpstreamOnly, stream_key: [], pk: [], dist: Single } - name: index selection for temporal join (with one index). sql: | - create table stream(id1 int, a1 int, b1 int) APPEND ONLY; + create table stream(id1 int, a1 int, b1 int); create table version(id2 int, a2 int, b2 int, primary key (id2)); create index idx on version (a2, b2); select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() on a1 = a2 and b1 = b2; stream_plan: |- StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden), stream.b1(hidden)], stream_key: [stream._row_id, id2, a1, stream.b1], pk_columns: [stream._row_id, id2, a1, stream.b1], pk_conflict: NoCheck } └─StreamExchange { dist: HashShard(stream.a1, idx.id2, stream._row_id, stream.b1) } - └─StreamTemporalJoin { type: LeftOuter, append_only: true, predicate: stream.a1 = idx.a2 AND stream.b1 = idx.b2, output: [stream.id1, stream.a1, idx.id2, idx.a2, stream._row_id, stream.b1] } + └─StreamTemporalJoin { type: LeftOuter, append_only: false, predicate: stream.a1 = idx.a2 AND stream.b1 = idx.b2, output: [stream.id1, stream.a1, idx.id2, idx.a2, stream._row_id, stream.b1] } ├─StreamExchange { dist: HashShard(stream.a1) } │ └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream.b1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) } └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(idx.a2) } └─StreamTableScan { table: idx, columns: [idx.id2, idx.a2, idx.b2], stream_scan_type: UpstreamOnly, stream_key: [idx.id2], pk: [a2, b2, id2], dist: UpstreamHashShard(idx.a2) } - name: index selection for temporal join (with two indexes) and should choose the index with a longer prefix.. sql: | - create table stream(id1 int, a1 int, b1 int) APPEND ONLY; + create table stream(id1 int, a1 int, b1 int); create table version(id2 int, a2 int, b2 int, primary key (id2)); create index idx1 on version (a2); create index idx2 on version (a2, b2); @@ -244,14 +244,14 @@ stream_plan: |- StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden), stream.b1(hidden)], stream_key: [stream._row_id, id2, a1, stream.b1], pk_columns: [stream._row_id, id2, a1, stream.b1], pk_conflict: NoCheck } └─StreamExchange { dist: HashShard(stream.a1, idx2.id2, stream._row_id, stream.b1) } - └─StreamTemporalJoin { type: LeftOuter, append_only: true, predicate: stream.a1 = idx2.a2 AND stream.b1 = idx2.b2, output: [stream.id1, stream.a1, idx2.id2, idx2.a2, stream._row_id, stream.b1] } + └─StreamTemporalJoin { type: LeftOuter, append_only: false, predicate: stream.a1 = idx2.a2 AND stream.b1 = idx2.b2, output: [stream.id1, stream.a1, idx2.id2, idx2.a2, stream._row_id, stream.b1] } ├─StreamExchange { dist: HashShard(stream.a1) } │ └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream.b1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) } └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(idx2.a2) } └─StreamTableScan { table: idx2, columns: [idx2.id2, idx2.a2, idx2.b2], stream_scan_type: UpstreamOnly, stream_key: [idx2.id2], pk: [a2, b2, id2], dist: UpstreamHashShard(idx2.a2) } - name: index selection for temporal join (with three indexes) and should choose primary table. sql: | - create table stream(id1 int, a1 int, b1 int, c1 int) APPEND ONLY; + create table stream(id1 int, a1 int, b1 int, c1 int); create table version(id2 int, a2 int, b2 int, c2 int, primary key (id2)); create index idx1 on version (a2); create index idx2 on version (b2); @@ -260,14 +260,14 @@ stream_plan: |- StreamMaterialize { columns: [id1, a1, id2, a2, stream._row_id(hidden), stream.b1(hidden), stream.c1(hidden)], stream_key: [stream._row_id, id1, a1, stream.b1, stream.c1], pk_columns: [stream._row_id, id1, a1, stream.b1, stream.c1], pk_conflict: NoCheck } └─StreamExchange { dist: HashShard(stream.id1, stream.a1, stream._row_id, stream.b1, stream.c1) } - └─StreamTemporalJoin { type: LeftOuter, append_only: true, predicate: stream.id1 = version.id2 AND (stream.a1 = version.a2) AND (stream.b1 = version.b2) AND (stream.c1 = version.c2), output: [stream.id1, stream.a1, version.id2, version.a2, stream._row_id, stream.b1, stream.c1] } + └─StreamTemporalJoin { type: LeftOuter, append_only: false, predicate: stream.id1 = version.id2 AND (stream.a1 = version.a2) AND (stream.b1 = version.b2) AND (stream.c1 = version.c2), output: [stream.id1, stream.a1, version.id2, version.a2, stream._row_id, stream.b1, stream.c1] } ├─StreamExchange { dist: HashShard(stream.id1) } │ └─StreamTableScan { table: stream, columns: [stream.id1, stream.a1, stream.b1, stream.c1, stream._row_id], stream_scan_type: ArrangementBackfill, stream_key: [stream._row_id], pk: [_row_id], dist: UpstreamHashShard(stream._row_id) } └─StreamExchange [no_shuffle] { dist: UpstreamHashShard(version.id2) } └─StreamTableScan { table: version, columns: [version.id2, version.a2, version.b2, version.c2], stream_scan_type: UpstreamOnly, stream_key: [version.id2], pk: [id2], dist: UpstreamHashShard(version.id2) } - name: index selection for temporal join (two index) and no one matches. sql: | - create table stream(id1 int, a1 int, b1 int, c1 int) APPEND ONLY; + create table stream(id1 int, a1 int, b1 int, c1 int); create table version(id2 int, a2 int, b2 int, c2 int, primary key (id2)); create index idx1 on version (a2); create index idx2 on version (a2, b2); From 0c4b0c1ea1b082bfd4c5a17ece50a39a89114278 Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Mon, 15 Apr 2024 13:13:22 +0800 Subject: [PATCH 08/14] fmt --- src/stream/src/executor/temporal_join.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/stream/src/executor/temporal_join.rs b/src/stream/src/executor/temporal_join.rs index 79cbbba95e6d3..35ba70d7eb940 100644 --- a/src/stream/src/executor/temporal_join.rs +++ b/src/stream/src/executor/temporal_join.rs @@ -531,9 +531,12 @@ mod phase1 { memo_table.insert(right_row.clone().chain( left_row.project(memo_table_lookup_prefix).into_owned_row(), )); - if let Some(chunk) = - E::append_matched_row(Op::Insert, &mut builder, left_row, right_row) - { + if let Some(chunk) = E::append_matched_row( + Op::Insert, + &mut builder, + left_row, + right_row, + ) { yield chunk; } } From 5636d34f1afade91716cb7eaf28aa44c8a21aede Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Mon, 15 Apr 2024 14:10:27 +0800 Subject: [PATCH 09/14] refactor --- src/stream/src/executor/temporal_join.rs | 112 ++++++++++------------- 1 file changed, 47 insertions(+), 65 deletions(-) diff --git a/src/stream/src/executor/temporal_join.rs b/src/stream/src/executor/temporal_join.rs index 35ba70d7eb940..f1df78c706389 100644 --- a/src/stream/src/executor/temporal_join.rs +++ b/src/stream/src/executor/temporal_join.rs @@ -452,23 +452,37 @@ mod phase1 { null_matched: &'a K::Bitmap, chunk: StreamChunk, ) { - // Append-only temporal join - if A { - let mut builder = StreamChunkBuilder::new(chunk_size, full_schema); - let keys = K::build_many(left_join_keys, chunk.data_chunk()); - let to_fetch_keys = chunk - .visibility() - .iter() - .zip_eq_debug(keys.iter()) - .filter_map(|(vis, key)| if vis { Some(key) } else { None }); - right_table - .fetch_or_promote_keys(to_fetch_keys, epoch) - .await?; - for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.into_iter()) { - let Some((op, left_row)) = r else { - continue; - }; - let mut matched = false; + let mut builder = StreamChunkBuilder::new(chunk_size, full_schema); + let keys = K::build_many(left_join_keys, chunk.data_chunk()); + let memo_table = memo_table.as_mut().unwrap(); + let to_fetch_keys = chunk + .visibility() + .iter() + .zip_eq_debug(keys.iter()) + .zip_eq_debug(chunk.ops()) + .filter_map(|((vis, key), op)| { + if vis { + match op { + Op::Insert | Op::UpdateInsert => Some(key), + Op::Delete | Op::UpdateDelete => None, + } + } else { + None + } + }); + right_table + .fetch_or_promote_keys(to_fetch_keys, epoch) + .await?; + + for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.into_iter()) { + let Some((op, left_row)) = r else { + continue; + }; + + let mut matched = false; + + if A { + // Append-only temporal join if key.null_bitmap().is_subset(null_matched) && let join_entry = right_table.force_peek(&key) && !join_entry.is_empty() @@ -482,44 +496,10 @@ mod phase1 { } } } - if let Some(chunk) = E::match_end(&mut builder, op, left_row, right_size, matched) { - yield chunk; - } - } - if let Some(chunk) = builder.take() { - yield chunk; - } - } else { - // Non-append-only temporal join - let mut builder = StreamChunkBuilder::new(chunk_size, full_schema); - let keys = K::build_many(left_join_keys, chunk.data_chunk()); - let memo_table = memo_table.as_mut().unwrap(); - let to_fetch_keys = chunk - .visibility() - .iter() - .zip_eq_debug(keys.iter()) - .zip_eq_debug(chunk.ops()) - .filter_map(|((vis, key), op)| { - if vis { - match op { - Op::Insert | Op::UpdateInsert => Some(key), - Op::Delete | Op::UpdateDelete => None, - } - } else { - None - } - }); - right_table - .fetch_or_promote_keys(to_fetch_keys, epoch) - .await?; - for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.into_iter()) { - let Some((op, left_row)) = r else { - continue; - }; - + } else { + // Non-append-only temporal join match op { Op::Insert | Op::UpdateInsert => { - let mut matched = false; if key.null_bitmap().is_subset(null_matched) && let join_entry = right_table.force_peek(&key) && !join_entry.is_empty() @@ -541,15 +521,9 @@ mod phase1 { } } } - if let Some(chunk) = - E::match_end(&mut builder, Op::Insert, left_row, right_size, matched) - { - yield chunk; - } } Op::Delete | Op::UpdateDelete => { let mut memo_rows_to_delete = vec![]; - let mut matched = false; if key.null_bitmap().is_subset(null_matched) { let sub_range: &(Bound, Bound) = &(Bound::Unbounded, Bound::Unbounded); @@ -577,18 +551,26 @@ mod phase1 { // Delete from memo table memo_table.delete(memo_row); } - if let Some(chunk) = - E::match_end(&mut builder, Op::Delete, left_row, right_size, matched) - { - yield chunk; - } } } } - if let Some(chunk) = builder.take() { + if let Some(chunk) = E::match_end( + &mut builder, + match op { + Op::Insert | Op::UpdateInsert => Op::Insert, + Op::Delete | Op::UpdateDelete => Op::Delete, + }, + left_row, + right_size, + matched, + ) { yield chunk; } } + + if let Some(chunk) = builder.take() { + yield chunk; + } } } From 5d23f62040549937f28a9b9aed4535a0ef4f7efd Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Mon, 15 Apr 2024 14:17:23 +0800 Subject: [PATCH 10/14] fix --- src/stream/src/executor/temporal_join.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stream/src/executor/temporal_join.rs b/src/stream/src/executor/temporal_join.rs index f1df78c706389..f0b71819cfb76 100644 --- a/src/stream/src/executor/temporal_join.rs +++ b/src/stream/src/executor/temporal_join.rs @@ -454,7 +454,6 @@ mod phase1 { ) { let mut builder = StreamChunkBuilder::new(chunk_size, full_schema); let keys = K::build_many(left_join_keys, chunk.data_chunk()); - let memo_table = memo_table.as_mut().unwrap(); let to_fetch_keys = chunk .visibility() .iter() @@ -498,6 +497,7 @@ mod phase1 { } } else { // Non-append-only temporal join + let memo_table = memo_table.as_mut().unwrap(); match op { Op::Insert | Op::UpdateInsert => { if key.null_bitmap().is_subset(null_matched) From a5aefda95b3677b60155f739551b08adfba2ddd9 Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Mon, 15 Apr 2024 15:18:27 +0800 Subject: [PATCH 11/14] add comments --- .../plan_node/stream_temporal_join.rs | 4 ++-- src/stream/src/executor/temporal_join.rs | 23 ++++++++++++++++--- 2 files changed, 22 insertions(+), 5 deletions(-) diff --git a/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs b/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs index fbf1f42861552..0367d12f6a25a 100644 --- a/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs +++ b/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs @@ -100,9 +100,9 @@ impl StreamTemporalJoin { /// (`join_key` + `left_pk` + `right_pk`) -> (`right_scan_schema` + `join_key` + `left_pk`) /// /// Write pattern: - /// for each output row (with insert op), construct the output table pk and insert the row. + /// for each left input row (with insert op), construct the memo table pk and insert the row into the memo table. /// Read pattern: - /// for each left input row (with delete op), construct pk prefix (`join_key` + `left_pk`) to fetch rows. + /// for each left input row (with delete op), construct pk prefix (`join_key` + `left_pk`) to fetch rows and delete them from the memo table. pub fn infer_memo_table_catalog(&self, right_scan: &StreamTableScan) -> TableCatalog { let left_eq_indexes = self.eq_join_predicate.left_eq_indexes(); let read_prefix_len_hint = left_eq_indexes.len() + self.left().stream_key().unwrap().len(); diff --git a/src/stream/src/executor/temporal_join.rs b/src/stream/src/executor/temporal_join.rs index f0b71819cfb76..ac9b2cc251865 100644 --- a/src/stream/src/executor/temporal_join.rs +++ b/src/stream/src/executor/temporal_join.rs @@ -461,9 +461,14 @@ mod phase1 { .zip_eq_debug(chunk.ops()) .filter_map(|((vis, key), op)| { if vis { - match op { - Op::Insert | Op::UpdateInsert => Some(key), - Op::Delete | Op::UpdateDelete => None, + if A { + assert_eq!(*op, Op::Insert); + Some(key) + } else { + match op { + Op::Insert | Op::UpdateInsert => Some(key), + Op::Delete | Op::UpdateDelete => None, + } } } else { None @@ -497,6 +502,18 @@ mod phase1 { } } else { // Non-append-only temporal join + // The memo-table pk and columns: + // (`join_key` + `left_pk` + `right_pk`) -> (`right_scan_schema` + `join_key` + `left_pk`) + // + // Write pattern: + // for each left input row (with insert op), construct the memo table pk and insert the row into the memo table. + // Read pattern: + // for each left input row (with delete op), construct pk prefix (`join_key` + `left_pk`) to fetch rows and delete them from the memo table. + // + // Temporal join supports inner join and left outer join, additionally, it could contain other conditions. + // Surprisingly, we could handle them in a unified way with memo table. + // The memo table would persist rows fetched from the right table and appending the `join_key` and `left_pk` from the left row. + // The null rows generated by outer join and the other condition somehow is a stateless operation which means we can handle them without the memo table. let memo_table = memo_table.as_mut().unwrap(); match op { Op::Insert | Op::UpdateInsert => { From 3f09ccf1ad3c5cddc35b978f7772b85bd2162752 Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Wed, 17 Apr 2024 15:53:46 +0800 Subject: [PATCH 12/14] fix and add more tests --- .../temporal_join_with_index2.slt | 44 +++++++++++++++++++ .../temporal_join_with_index3.slt | 44 +++++++++++++++++++ .../plan_node/stream_temporal_join.rs | 9 +++- 3 files changed, 96 insertions(+), 1 deletion(-) create mode 100644 e2e_test/streaming/temporal_join/non_append_only/temporal_join_with_index2.slt create mode 100644 e2e_test/streaming/temporal_join/non_append_only/temporal_join_with_index3.slt diff --git a/e2e_test/streaming/temporal_join/non_append_only/temporal_join_with_index2.slt b/e2e_test/streaming/temporal_join/non_append_only/temporal_join_with_index2.slt new file mode 100644 index 0000000000000..c560c135f07cf --- /dev/null +++ b/e2e_test/streaming/temporal_join/non_append_only/temporal_join_with_index2.slt @@ -0,0 +1,44 @@ +statement ok +SET RW_IMPLICIT_FLUSH TO true; + +statement ok +create table stream(id1 int, a1 int, b1 int); + +statement ok +create table version(id2 int, a2 int, b2 int, primary key (id2)); + +statement ok +create index idx on version(a2, b2) distributed by (a2); + +statement ok +create materialized view v as select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() on b1 = b2 and a1 = a2; + +statement ok +insert into version values(1, 11, 111); + +statement ok +insert into stream values(1, 11, 111); + +query IIII rowsort +select * from v; +---- +1 11 1 11 + +statement ok +update stream set a1 = 22 where id1 = 1; + +query IIII rowsort +select * from v; +---- +1 22 NULL NULL + +statement ok +drop materialized view v; + +statement ok +drop table stream; + +statement ok +drop table version; + + diff --git a/e2e_test/streaming/temporal_join/non_append_only/temporal_join_with_index3.slt b/e2e_test/streaming/temporal_join/non_append_only/temporal_join_with_index3.slt new file mode 100644 index 0000000000000..d9c52e2db8a3a --- /dev/null +++ b/e2e_test/streaming/temporal_join/non_append_only/temporal_join_with_index3.slt @@ -0,0 +1,44 @@ +statement ok +SET RW_IMPLICIT_FLUSH TO true; + +statement ok +create table stream(id1 int, a1 int, b1 int); + +statement ok +create table version(id2 int, a2 int, b2 int, primary key (id2)); + +statement ok +create index idx on version(a2, b2) distributed by (a2, b2); + +statement ok +create materialized view v as select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() on b1 = b2 and a1 = a2; + +statement ok +insert into version values(1, 11, 111); + +statement ok +insert into stream values(1, 11, 111); + +query IIII rowsort +select * from v; +---- +1 11 1 11 + +statement ok +update stream set a1 = 22 where id1 = 1; + +query IIII rowsort +select * from v; +---- +1 22 NULL NULL + +statement ok +drop materialized view v; + +statement ok +drop table stream; + +statement ok +drop table version; + + diff --git a/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs b/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs index 0367d12f6a25a..58d3220c52b63 100644 --- a/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs +++ b/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs @@ -101,6 +101,7 @@ impl StreamTemporalJoin { /// /// Write pattern: /// for each left input row (with insert op), construct the memo table pk and insert the row into the memo table. + /// /// Read pattern: /// for each left input row (with delete op), construct pk prefix (`join_key` + `left_pk`) to fetch rows and delete them from the memo table. pub fn infer_memo_table_catalog(&self, right_scan: &StreamTableScan) -> TableCatalog { @@ -132,8 +133,14 @@ impl StreamTemporalJoin { internal_table_catalog_builder.add_order_column(*idx, OrderType::ascending()) }); + let dist_key_len = right_scan + .core() + .distribution_key() + .map(|keys| keys.len()) + .unwrap_or(0); + let internal_table_dist_keys = - (right_scan_schema.len()..(right_scan_schema.len() + left_eq_indexes.len())).collect(); + (right_scan_schema.len()..(right_scan_schema.len() + dist_key_len)).collect(); internal_table_catalog_builder.build(internal_table_dist_keys, read_prefix_len_hint) } } From 6da30e7eb3ecff788b843f45abfd8fb416bbb028 Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Thu, 18 Apr 2024 16:34:46 +0800 Subject: [PATCH 13/14] refine --- proto/stream_plan.proto | 2 +- src/stream/src/executor/temporal_join.rs | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index e194396a0a024..89c0521378972 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -436,7 +436,7 @@ message TemporalJoinNode { // The output indices of the lookup side table repeated uint32 table_output_indices = 8; // The state table used for non-append-only temporal join. - catalog.Table memo_table = 9; + optional catalog.Table memo_table = 9; } message DynamicFilterNode { diff --git a/src/stream/src/executor/temporal_join.rs b/src/stream/src/executor/temporal_join.rs index ac9b2cc251865..f365706317a90 100644 --- a/src/stream/src/executor/temporal_join.rs +++ b/src/stream/src/executor/temporal_join.rs @@ -56,7 +56,7 @@ pub struct TemporalJoinExecutor< K: HashKey, S: StateStore, const T: JoinTypePrimitive, - const A: bool, + const APPEND_ONLY: bool, > { ctx: ActorContextRef, #[allow(dead_code)] @@ -439,7 +439,7 @@ mod phase1 { K: HashKey, S: StateStore, E: Phase1Evaluation, - const A: bool, + const APPEND_ONLY: bool, >( chunk_size: usize, right_size: usize, @@ -461,7 +461,7 @@ mod phase1 { .zip_eq_debug(chunk.ops()) .filter_map(|((vis, key), op)| { if vis { - if A { + if APPEND_ONLY { assert_eq!(*op, Op::Insert); Some(key) } else { @@ -485,7 +485,7 @@ mod phase1 { let mut matched = false; - if A { + if APPEND_ONLY { // Append-only temporal join if key.null_bitmap().is_subset(null_matched) && let join_entry = right_table.force_peek(&key) From 673f3fe4ee9675f791e02009599dd27a03a95dd3 Mon Sep 17 00:00:00 2001 From: Dylan Chen Date: Thu, 18 Apr 2024 16:57:32 +0800 Subject: [PATCH 14/14] add notive for non-append-only temporal join --- .../plan_node/stream_temporal_join.rs | 4 ++++ .../plan_visitor/temporal_join_validator.rs | 19 ++++++++++++++++--- 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs b/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs index 58d3220c52b63..ce8753b9ddbc8 100644 --- a/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs +++ b/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs @@ -96,6 +96,10 @@ impl StreamTemporalJoin { &self.eq_join_predicate } + pub fn append_only(&self) -> bool { + self.append_only + } + /// Return memo-table catalog and its `pk_indices`. /// (`join_key` + `left_pk` + `right_pk`) -> (`right_scan_schema` + `join_key` + `left_pk`) /// diff --git a/src/frontend/src/optimizer/plan_visitor/temporal_join_validator.rs b/src/frontend/src/optimizer/plan_visitor/temporal_join_validator.rs index 2f8d6b3fc89b2..cbdd2c695ad83 100644 --- a/src/frontend/src/optimizer/plan_visitor/temporal_join_validator.rs +++ b/src/frontend/src/optimizer/plan_visitor/temporal_join_validator.rs @@ -15,6 +15,7 @@ use risingwave_sqlparser::ast::AsOf; use super::{DefaultBehavior, Merge}; +use crate::optimizer::plan_node::generic::GenericPlanRef; use crate::optimizer::plan_node::{ BatchSeqScan, LogicalScan, PlanTreeNodeBinary, StreamTableScan, StreamTemporalJoin, }; @@ -22,12 +23,21 @@ use crate::optimizer::plan_visitor::PlanVisitor; use crate::PlanRef; #[derive(Debug, Clone, Default)] -pub struct TemporalJoinValidator {} +pub struct TemporalJoinValidator { + found_non_append_only_temporal_join: bool, +} impl TemporalJoinValidator { pub fn exist_dangling_temporal_scan(plan: PlanRef) -> bool { - let mut decider = TemporalJoinValidator {}; - decider.visit(plan) + let mut decider = TemporalJoinValidator { + found_non_append_only_temporal_join: false, + }; + let ctx = plan.ctx(); + let has_dangling_temporal_scan = decider.visit(plan); + if decider.found_non_append_only_temporal_join { + ctx.session_ctx().notice_to_user("A non-append-only temporal join is used in the query. It would introduce a additional memo-table comparing to append-only temporal join."); + } + has_dangling_temporal_scan } } @@ -53,6 +63,9 @@ impl PlanVisitor for TemporalJoinValidator { } fn visit_stream_temporal_join(&mut self, stream_temporal_join: &StreamTemporalJoin) -> bool { + if !stream_temporal_join.append_only() { + self.found_non_append_only_temporal_join = true; + } self.visit(stream_temporal_join.left()) } }