diff --git a/e2e_test/streaming/temporal_join/issue_15257.slt b/e2e_test/streaming/temporal_join/issue_15257.slt new file mode 100644 index 0000000000000..b376e8c68620e --- /dev/null +++ b/e2e_test/streaming/temporal_join/issue_15257.slt @@ -0,0 +1,31 @@ +statement ok +SET RW_IMPLICIT_FLUSH TO true; + +statement ok +create table stream(id1 int, a1 int, b1 int) APPEND ONLY; + +statement ok +create table version(id2 int, a2 int, b2 int, primary key (id2)); + +statement ok +create materialized view v as select id1, a1, id2, a2 from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() on id1 = id2 and a1 > a2; + +statement ok +insert into version values(1, 11, 111); + +statement ok +insert into stream values(1, 10, 111); + +query IIII rowsort +select * from v; +---- +1 10 NULL NULL + +statement ok +drop materialized view v; + +statement ok +drop table stream; + +statement ok +drop table version; diff --git a/e2e_test/streaming/temporal_join/temporal_join_multiple_rows.slt b/e2e_test/streaming/temporal_join/temporal_join_multiple_rows.slt new file mode 100644 index 0000000000000..6cdeb901c39cf --- /dev/null +++ b/e2e_test/streaming/temporal_join/temporal_join_multiple_rows.slt @@ -0,0 +1,109 @@ +# This suite tests cases where multiple rows are matched. + +statement ok +SET RW_IMPLICIT_FLUSH TO true; + +statement ok +create table stream(a1 int, b1 int) APPEND ONLY; + +statement ok +create table version(a2 int, b2 int); + +statement ok +create index idx on version (a2); + +statement ok +create materialized view v as +select a1, a2 from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() on a1 = a2; + +statement ok +insert into stream values + (1,1) +,(2,1) +; + + +statement ok +insert into version values + (1,1) +,(1,2) +,(1,3) +; + +statement ok +insert into stream values + (1,1) +,(2,1) +; + +query II rowsort +select a1, a2 from v; +---- +1 1 +1 1 +1 1 +1 NULL +2 NULL +2 NULL + +statement ok +drop materialized view v; + +statement ok +drop table stream; + +statement ok +drop table version; + +# Test non-equal conditions + +statement ok +create table stream(a1 int, b1 int) APPEND ONLY; + +statement ok +create table version(a2 int, b2 int); + +statement ok +create index idx on version (a2); + +statement ok +create materialized view v as +select a1, a2, b2 +from stream left join version FOR SYSTEM_TIME AS OF PROCTIME() + on a1 = a2 and b1 > b2; + +statement ok +insert into version values + (1,1) +,(1,2) +,(1,3) +; + +statement ok +insert into stream values + (1,0) +,(1,3) +,(1,6) +,(2,1) +; + + +query III rowsort +select a1, a2, b2 from v; +---- +1 1 1 +1 1 1 +1 1 2 +1 1 2 +1 1 3 +1 NULL NULL +2 NULL NULL + +statement ok +drop materialized view v; + +statement ok +drop table stream; + +statement ok +drop table version; diff --git a/e2e_test/streaming/temporal_join/temporal_join_non_loopup_cond.slt b/e2e_test/streaming/temporal_join/temporal_join_non_loopup_cond.slt index cf206bbea0a22..1a29d83615ec3 100644 --- a/e2e_test/streaming/temporal_join/temporal_join_non_loopup_cond.slt +++ b/e2e_test/streaming/temporal_join/temporal_join_non_loopup_cond.slt @@ -49,3 +49,32 @@ drop table stream; statement ok drop table version; + +statement ok +create table stream(id1 int, a1 int, b1 int) APPEND ONLY; + +statement ok +create table version(id2 int, a2 int, b2 int, primary key (id2)); + +statement ok +create materialized view v as select id1, a1, id2, a2 from stream join version FOR SYSTEM_TIME AS OF PROCTIME() on id1 = id2 and a1 > a2; + +statement ok +insert into version values (1, 12, 111); +
+statement ok +insert into stream values (1, 11, 111), (1, 13, 111); + +query IIII rowsort +select * from v; +---- +1 13 1 12 + +statement ok +drop materialized view v; + +statement ok +drop table stream; + +statement ok +drop table version; diff --git a/src/common/src/array/stream_chunk_builder.rs b/src/common/src/array/stream_chunk_builder.rs index 3c1391af3fa44..2d70c925d696f 100644 --- a/src/common/src/array/stream_chunk_builder.rs +++ b/src/common/src/array/stream_chunk_builder.rs @@ -14,6 +14,7 @@ use crate::array::stream_record::Record; use crate::array::{ArrayBuilderImpl, Op, StreamChunk}; +use crate::buffer::BitmapBuilder; use crate::row::Row; use crate::types::{DataType, DatumRef}; use crate::util::iter_util::ZipEqFast; @@ -26,6 +27,9 @@ pub struct StreamChunkBuilder { /// arrays in the data chunk to build column_builders: Vec<ArrayBuilderImpl>, + /// Visibility of the rows appended so far + vis_builder: BitmapBuilder, + /// Data types of columns data_types: Vec<DataType>, @@ -61,6 +65,7 @@ impl StreamChunkBuilder { ops, column_builders, data_types, + vis_builder: BitmapBuilder::default(), capacity: chunk_size, size: 0, } @@ -93,18 +98,34 @@ impl StreamChunkBuilder { &mut self, op: Op, iter: impl IntoIterator<Item = (usize, DatumRef<'a>)>, + ) -> Option<StreamChunk> { + self.append_iter_inner::<true>(op, iter) + } + + #[must_use] + fn append_iter_inner<'a, const VIS: bool>( + &mut self, + op: Op, + iter: impl IntoIterator<Item = (usize, DatumRef<'a>)>, ) -> Option<StreamChunk> { self.ops.push(op); for (i, datum) in iter { self.column_builders[i].append(datum); } + self.vis_builder.append(VIS); self.inc_size() } /// Append a row to the builder, return a chunk if the builder is full. #[must_use] pub fn append_row(&mut self, op: Op, row: impl Row) -> Option<StreamChunk> { - self.append_iter(op, row.iter().enumerate()) + self.append_iter_inner::<true>(op, row.iter().enumerate()) + } + + /// Append an invisible row to the builder, return a chunk if the builder is full. + #[must_use] + pub fn append_row_invisible(&mut self, op: Op, row: impl Row) -> Option<StreamChunk> { + self.append_iter_inner::<false>(op, row.iter().enumerate()) + } /// Append a record to the builder, return a chunk if the builder is full.
@@ -138,9 +159,12 @@ impl StreamChunkBuilder { .map(Into::into) .collect::<Vec<_>>(); - Some(StreamChunk::new( + let vis = std::mem::take(&mut self.vis_builder).finish(); + + Some(StreamChunk::with_visibility( std::mem::replace(&mut self.ops, Vec::with_capacity(self.capacity)), new_columns, + vis, )) } } diff --git a/src/stream/src/executor/temporal_join.rs b/src/stream/src/executor/temporal_join.rs index 42a8e7d81a04f..fca3a931acbb8 100644 --- a/src/stream/src/executor/temporal_join.rs +++ b/src/stream/src/executor/temporal_join.rs @@ -24,13 +24,12 @@ use futures::{pin_mut, StreamExt, TryStreamExt}; use futures_async_stream::{for_await, try_stream}; use local_stats_alloc::{SharedStatsAlloc, StatsAlloc}; use lru::DefaultHasher; -use risingwave_common::array::stream_chunk_builder::StreamChunkBuilder; -use risingwave_common::array::{ArrayImpl, Op, StreamChunk}; +use risingwave_common::array::{Op, StreamChunk}; use risingwave_common::buffer::BitmapBuilder; use risingwave_common::estimate_size::{EstimateSize, KvSize}; use risingwave_common::hash::{HashKey, NullBitmap}; use risingwave_common::row::{OwnedRow, Row, RowExt}; -use risingwave_common::types::{DataType, DatumRef}; +use risingwave_common::types::DataType; use risingwave_common::util::iter_util::ZipEqDebug; use risingwave_expr::expr::NonStrictExpression; use risingwave_hummock_sdk::{HummockEpoch, HummockReadEpoch}; @@ -310,6 +309,170 @@ async fn align_input(left: Executor, right: Executor) { } } +mod phase1 { + use futures_async_stream::try_stream; + use risingwave_common::array::stream_chunk_builder::StreamChunkBuilder; + use risingwave_common::array::{Op, StreamChunk}; + use risingwave_common::hash::{HashKey, NullBitmap}; + use risingwave_common::row::{self, Row, RowExt}; + use risingwave_common::types::{DataType, DatumRef}; + use risingwave_common::util::iter_util::ZipEqDebug; + use risingwave_hummock_sdk::HummockEpoch; + use risingwave_storage::StateStore; + + use super::{StreamExecutorError, TemporalSide}; + + pub(super) trait Phase1Evaluation { + /// Called when a matched row is found. + #[must_use = "consume chunk if produced"] + fn append_matched_row( + op: Op, + builder: &mut StreamChunkBuilder, + left_row: impl Row, + right_row: impl Row, + ) -> Option<StreamChunk>; + + /// Called when all matched rows of a join key are appended. + #[must_use = "consume chunk if produced"] + fn match_end( + builder: &mut StreamChunkBuilder, + op: Op, + left_row: impl Row, + right_size: usize, + matched: bool, + ) -> Option<StreamChunk>; + } + + pub(super) struct Inner; + pub(super) struct LeftOuter; + pub(super) struct LeftOuterWithCond; + + impl Phase1Evaluation for Inner { + fn append_matched_row( + op: Op, + builder: &mut StreamChunkBuilder, + left_row: impl Row, + right_row: impl Row, + ) -> Option<StreamChunk> { + builder.append_row(op, left_row.chain(right_row)) + } + + fn match_end( + _builder: &mut StreamChunkBuilder, + _op: Op, + _left_row: impl Row, + _right_size: usize, + _matched: bool, + ) -> Option<StreamChunk> { + None + } + } + + impl Phase1Evaluation for LeftOuter { + fn append_matched_row( + op: Op, + builder: &mut StreamChunkBuilder, + left_row: impl Row, + right_row: impl Row, + ) -> Option<StreamChunk> { + builder.append_row(op, left_row.chain(right_row)) + } + + fn match_end( + builder: &mut StreamChunkBuilder, + op: Op, + left_row: impl Row, + right_size: usize, + matched: bool, + ) -> Option<StreamChunk> { + if !matched { + // If no rows matched, a marker row should be inserted.
+ builder.append_row( + op, + left_row.chain(row::repeat_n(DatumRef::None, right_size)), + ) + } else { + None + } + } + } + + impl Phase1Evaluation for LeftOuterWithCond { + fn append_matched_row( + op: Op, + builder: &mut StreamChunkBuilder, + left_row: impl Row, + right_row: impl Row, + ) -> Option<StreamChunk> { + builder.append_row(op, left_row.chain(right_row)) + } + + fn match_end( + builder: &mut StreamChunkBuilder, + op: Op, + left_row: impl Row, + right_size: usize, + _matched: bool, + ) -> Option<StreamChunk> { + // A marker row should always be inserted and marked as invisible for the evaluation of the non-lookup filters. + // It will be converted back to visible in a later step if no matched row remains after all filters are evaluated. + builder.append_row_invisible( + op, + left_row.chain(row::repeat_n(DatumRef::None, right_size)), + ) + } + } + + #[try_stream(ok = StreamChunk, error = StreamExecutorError)] + #[allow(clippy::too_many_arguments)] + pub(super) async fn handle_chunk<'a, K: HashKey, S: StateStore, E: Phase1Evaluation>( + chunk_size: usize, + right_size: usize, + full_schema: Vec<DataType>, + epoch: HummockEpoch, + left_join_keys: &'a [usize], + right_table: &'a mut TemporalSide<K, S>, + null_matched: &'a K::Bitmap, + chunk: StreamChunk, + ) { + let mut builder = StreamChunkBuilder::new(chunk_size, full_schema); + let keys = K::build(left_join_keys, chunk.data_chunk())?; + let to_fetch_keys = chunk + .visibility() + .iter() + .zip_eq_debug(keys.iter()) + .filter_map(|(vis, key)| if vis { Some(key) } else { None }); + right_table + .fetch_or_promote_keys(to_fetch_keys, epoch) + .await?; + for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.into_iter()) { + let Some((op, left_row)) = r else { + continue; + }; + let mut matched = false; + if key.null_bitmap().is_subset(null_matched) + && let join_entry = right_table.force_peek(&key) + && !join_entry.is_empty() + { + matched = true; + for right_row in join_entry.cached.values() { + if let Some(chunk) = + E::append_matched_row(op, &mut builder, left_row, right_row) + { + yield chunk; + } + } + } + if let Some(chunk) = E::match_end(&mut builder, op, left_row, right_size, matched) { + yield chunk; + } + } + if let Some(chunk) = builder.take() { + yield chunk; + } + } +} + impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> TemporalJoinExecutor<K, S, T> { #[allow(clippy::too_many_arguments)] pub fn new( @@ -369,6 +532,17 @@ impl TemporalJoinExecutor } } + fn apply_indices_map(chunk: StreamChunk, indices: &[usize]) -> StreamChunk { + let (data_chunk, ops) = chunk.into_parts(); + let (columns, vis) = data_chunk.into_parts(); + let output_columns = indices + .iter() + .cloned() + .map(|idx| columns[idx].clone()) + .collect(); + StreamChunk::with_visibility(ops, output_columns, vis) + } + #[try_stream(ok = Message, error = StreamExecutorError)] async fn into_stream(mut self) { let right_size = self.right.schema().len(); @@ -387,6 +561,9 @@ impl TemporalJoinExecutor let mut prev_epoch = None; + let table_id_str = self.right_table.source.table_id().to_string(); + let actor_id_str = self.ctx.id.to_string(); + let fragment_id_str = self.ctx.fragment_id.to_string(); let full_schema: Vec<_> = self .left .schema() .data_types() .into_iter() .chain(self.right.schema().data_types().into_iter()) .collect(); - let table_id_str = self.right_table.source.table_id().to_string(); - let actor_id_str = self.ctx.id.to_string(); - let fragment_id_str = self.ctx.fragment_id.to_string(); #[for_await] for msg in align_input(self.left, self.right) { self.right_table.cache.evict(); @@ -412,117 +586,106 @@ impl TemporalJoinExecutor yield
Message::Watermark(watermark.with_idx(output_watermark_col_idx)); } InternalMessage::Chunk(chunk) => { - // Joined result without evaluating non-lookup conditions. - let st1 = { - #[try_stream] - async { - #[allow(unreachable_code)] - #[allow(clippy::diverging_sub_expression)] - if false { - return unreachable!("type hints only") as StreamExecutorResult<_>; - } - let mut builder = - StreamChunkBuilder::new(self.chunk_size, full_schema.clone()); - // The bitmap is aligned with `builder`. The bit is set if the record is matched. - // TODO: Consider adding the bitmap to `builder`. - let mut row_matched_bitmap_builder = - BitmapBuilder::with_capacity(self.chunk_size); - let epoch = - prev_epoch.expect("Chunk data should come after some barrier."); - let keys = K::build(&self.left_join_keys, chunk.data_chunk())?; - let to_fetch_keys = chunk - .visibility() - .iter() - .zip_eq_debug(keys.iter()) - .filter_map(|(vis, key)| if vis { Some(key) } else { None }); - self.right_table - .fetch_or_promote_keys(to_fetch_keys, epoch) - .await?; - for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.into_iter()) { - let Some((op, left_row)) = r else { - continue; - }; - if key.null_bitmap().is_subset(&null_matched) - && let join_entry = self.right_table.force_peek(&key) - && !join_entry.is_empty() - { - for right_row in join_entry.cached.values() { - row_matched_bitmap_builder.append(true); - if let Some(chunk) = - builder.append_row(op, left_row.chain(right_row)) - { - let row_matched = - std::mem::take(&mut row_matched_bitmap_builder) - .finish(); - yield (chunk, row_matched); - } - } - } else if T == JoinType::LeftOuter { - row_matched_bitmap_builder.append(false); - if let Some(chunk) = builder.append_row( - op, - left_row.chain(risingwave_common::row::repeat_n( - DatumRef::None, - right_size, - )), - ) { - let row_matched = - std::mem::take(&mut row_matched_bitmap_builder) - .finish(); - yield (chunk, row_matched); - } - } - } - if let Some(chunk) = builder.take() { - let row_matched = - std::mem::take(&mut row_matched_bitmap_builder).finish(); - yield (chunk, row_matched); - } - } - }; - - #[for_await] - for item in st1 { - let (chunk, row_matched) = item?; - // check non-lookup join conditions - if !row_matched.is_empty() - && let Some(ref cond) = self.condition - { - // All chunks are newly created in the previous phase, so no holes should exist. - assert!(chunk.visibility().all()); - // For non matched row, we shouldn't evaluate on it. - // So we treat `row_matched` as visibility here. 
- let chunk = chunk.clone_with_vis(row_matched.clone()); - let (data_chunk, ops) = chunk.into_parts(); - let filter = cond.eval_infallible(&data_chunk).await; - let ArrayImpl::Bool(bool_array) = &*filter else { - panic!("unmatched type: filter expr returns a non-null array"); + let epoch = prev_epoch.expect("Chunk data should come after some barrier."); + + let full_schema = full_schema.clone(); + + if T == JoinType::Inner { + let st1 = phase1::handle_chunk::<K, S, phase1::Inner>( + self.chunk_size, + right_size, + full_schema, + epoch, + &self.left_join_keys, + &mut self.right_table, + &null_matched, + chunk, + ); + #[for_await] + for chunk in st1 { + let chunk = chunk?; + let new_chunk = if let Some(ref cond) = self.condition { + let (data_chunk, ops) = chunk.into_parts(); + let passed_bitmap = cond.eval_infallible(&data_chunk).await; + let passed_bitmap = + Arc::unwrap_or_clone(passed_bitmap).into_bool().to_bitmap(); + let (columns, vis) = data_chunk.into_parts(); + let new_vis = vis & passed_bitmap; + StreamChunk::with_visibility(ops, columns, new_vis) + } else { + chunk }; - let new_vis = bool_array.to_bitmap() | (!row_matched); - let (columns, _) = data_chunk.into_parts(); - // apply output indices. - let output_columns = self - .output_indices - .iter() - .cloned() - .map(|idx| columns[idx].clone()) - .collect(); let new_chunk = - StreamChunk::with_visibility(ops, output_columns, new_vis); + Self::apply_indices_map(new_chunk, &self.output_indices); yield Message::Chunk(new_chunk); - } else { + } + } else if let Some(ref cond) = self.condition { + // Joined result without evaluating non-lookup conditions. + let st1 = phase1::handle_chunk::<K, S, phase1::LeftOuterWithCond>( + self.chunk_size, + right_size, + full_schema, + epoch, + &self.left_join_keys, + &mut self.right_table, + &null_matched, + chunk, + ); + let mut matched_count = 0usize; + #[for_await] + for chunk in st1 { + let chunk = chunk?; let (data_chunk, ops) = chunk.into_parts(); + let passed_bitmap = cond.eval_infallible(&data_chunk).await; + let passed_bitmap = + Arc::unwrap_or_clone(passed_bitmap).into_bool().to_bitmap(); let (columns, vis) = data_chunk.into_parts(); - let mut new_vis = BitmapBuilder::with_capacity(vis.len()); + for (passed, not_match_end) in + passed_bitmap.iter().zip_eq_debug(vis.iter()) + { + let is_match_end = !not_match_end; + let vis = if is_match_end && matched_count == 0 { + // Nothing matched, so the marker row should be visible. + true + } else if is_match_end { + // Reset the count for the next left row. + matched_count = 0; + // Rows were found, so the marker row should be invisible. + false + } else { + if passed { + matched_count += 1; + } + passed + }; + new_vis.append(vis); + } + let new_chunk = Self::apply_indices_map( + StreamChunk::with_visibility(ops, columns, new_vis.finish()), + &self.output_indices, + ); yield Message::Chunk(new_chunk); - }; + } + // The last row should always be a marker row, so the count must have been reset. + assert_eq!(matched_count, 0); + } else { + let st1 = phase1::handle_chunk::<K, S, phase1::LeftOuter>( + self.chunk_size, + right_size, + full_schema, + epoch, + &self.left_join_keys, + &mut self.right_table, + &null_matched, + chunk, + ); + #[for_await] + for chunk in st1 { + let chunk = chunk?; + let new_chunk = Self::apply_indices_map(chunk, &self.output_indices); + yield Message::Chunk(new_chunk); + } } } InternalMessage::Barrier(updates, barrier) => {
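For the LeftOuterWithCond branch above, the visibility fix-up can be read on its own: phase 1 emits every matched row as visible and appends one invisible NULL-padded marker row per left row, and the later pass keeps a matched row visible only if the non-lookup condition passed, while the marker row becomes visible only when nothing passed. Below is a minimal standalone sketch of that pass, assuming the bitmaps are materialized as bool slices; the helper name fixup_visibility and its slice-based signature are hypothetical, and the executor itself operates on Bitmap values inside the chunk stream instead.

fn fixup_visibility(passed: &[bool], is_marker: &[bool]) -> Vec<bool> {
    // Standalone sketch only: `passed` stands for the non-lookup condition bitmap and
    // `is_marker` for the negated phase-1 visibility, both simplified to slices here.
    // Count of rows for the current left row that matched the lookup key and also
    // passed the non-lookup condition.
    let mut matched_count = 0usize;
    passed
        .iter()
        .zip(is_marker.iter())
        .map(|(&passed, &is_marker)| {
            if is_marker {
                // The marker row closes the group of one left row: it stays visible
                // only if no matched row passed, and the counter is reset.
                let visible = matched_count == 0;
                matched_count = 0;
                visible
            } else {
                if passed {
                    matched_count += 1;
                }
                // A matched row stays visible only if the condition passed.
                passed
            }
        })
        .collect()
}

Checked against issue_15257.slt: the left row (1, 10, 111) finds one lookup match, so phase 1 yields the matched row followed by the marker (is_marker = [false, true]); the condition 10 > 11 does not pass on either row (passed = [false, false]), and the fix-up returns [false, true], leaving only the NULL-padded row visible, which is the expected result 1 10 NULL NULL.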