From 46ee5674401b27ad4a725981f860ff360137faa8 Mon Sep 17 00:00:00 2001 From: Yuhao Su <31772373+yuhao-su@users.noreply.github.com> Date: Tue, 16 Jul 2024 15:32:20 -0500 Subject: [PATCH] feat(streaming): introduce nested loop temporal join executor (#16445) --- src/stream/src/executor/mod.rs | 3 +- .../src/executor/nested_loop_temporal_join.rs | 276 ++++++++++++++++++ src/stream/src/executor/temporal_join.rs | 55 ++-- 3 files changed, 308 insertions(+), 26 deletions(-) create mode 100644 src/stream/src/executor/nested_loop_temporal_join.rs diff --git a/src/stream/src/executor/mod.rs b/src/stream/src/executor/mod.rs index c111bf4ee99d..dc63e62b1d58 100644 --- a/src/stream/src/executor/mod.rs +++ b/src/stream/src/executor/mod.rs @@ -77,6 +77,7 @@ mod lookup; mod lookup_union; mod merge; mod mview; +mod nested_loop_temporal_join; mod no_op; mod now; mod over_window; @@ -142,7 +143,7 @@ pub use simple_agg::SimpleAggExecutor; pub use sink::SinkExecutor; pub use sort::*; pub use stateless_simple_agg::StatelessSimpleAggExecutor; -pub use temporal_join::*; +pub use temporal_join::TemporalJoinExecutor; pub use top_n::{ AppendOnlyGroupTopNExecutor, AppendOnlyTopNExecutor, GroupTopNExecutor, TopNExecutor, }; diff --git a/src/stream/src/executor/nested_loop_temporal_join.rs b/src/stream/src/executor/nested_loop_temporal_join.rs new file mode 100644 index 000000000000..0888d8981fc8 --- /dev/null +++ b/src/stream/src/executor/nested_loop_temporal_join.rs @@ -0,0 +1,276 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::sync::Arc; + +use futures::StreamExt; +use futures_async_stream::try_stream; +use risingwave_common::array::stream_chunk_builder::StreamChunkBuilder; +use risingwave_common::array::StreamChunk; +use risingwave_common::bitmap::BitmapBuilder; +use risingwave_common::types::DataType; +use risingwave_common::util::iter_util::ZipEqDebug; +use risingwave_expr::expr::NonStrictExpression; +use risingwave_hummock_sdk::{HummockEpoch, HummockReadEpoch}; +use risingwave_storage::store::PrefetchOptions; +use risingwave_storage::table::batch_table::storage_table::StorageTable; +use risingwave_storage::StateStore; + +use super::join::{JoinType, JoinTypePrimitive}; +use super::temporal_join::{align_input, apply_indices_map, phase1, InternalMessage}; +use super::{Execute, ExecutorInfo, Message, StreamExecutorError}; +use crate::common::metrics::MetricsInfo; +use crate::executor::join::builder::JoinStreamChunkBuilder; +use crate::executor::monitor::StreamingMetrics; +use crate::executor::{ActorContextRef, Executor}; + +pub struct NestedLoopTemporalJoinExecutor { + ctx: ActorContextRef, + #[allow(dead_code)] + info: ExecutorInfo, + left: Executor, + right: Executor, + right_table: TemporalSide, + condition: Option, + output_indices: Vec, + chunk_size: usize, + // TODO: update metrics + #[allow(dead_code)] + metrics: Arc, +} + +struct TemporalSide { + source: StorageTable, +} + +impl TemporalSide {} + +#[try_stream(ok = StreamChunk, error = StreamExecutorError)] +#[allow(clippy::too_many_arguments)] +async fn phase1_handle_chunk( + chunk_size: usize, + right_size: usize, + full_schema: Vec, + epoch: HummockEpoch, + right_table: &mut TemporalSide, + chunk: StreamChunk, +) { + let mut builder = StreamChunkBuilder::new(chunk_size, full_schema); + + for (op, left_row) in chunk.rows() { + let mut matched = false; + #[for_await] + for keyed_row in right_table + .source + .batch_iter( + HummockReadEpoch::NoWait(epoch), + false, + PrefetchOptions::prefetch_for_large_range_scan(), + ) + .await? + { + let keyed_row = keyed_row?; + let right_row = keyed_row.row(); + matched = true; + if let Some(chunk) = E::append_matched_row(op, &mut builder, left_row, right_row) { + yield chunk; + } + } + if let Some(chunk) = E::match_end(&mut builder, op, left_row, right_size, matched) { + yield chunk; + } + } + if let Some(chunk) = builder.take() { + yield chunk; + } +} + +impl NestedLoopTemporalJoinExecutor { + #[allow(clippy::too_many_arguments)] + #[expect(dead_code)] + pub fn new( + ctx: ActorContextRef, + info: ExecutorInfo, + left: Executor, + right: Executor, + table: StorageTable, + condition: Option, + output_indices: Vec, + metrics: Arc, + chunk_size: usize, + ) -> Self { + let _metrics_info = MetricsInfo::new( + metrics.clone(), + table.table_id().table_id, + ctx.id, + "nested loop temporal join", + ); + + Self { + ctx: ctx.clone(), + info, + left, + right, + right_table: TemporalSide { source: table }, + condition, + output_indices, + chunk_size, + metrics, + } + } + + #[try_stream(ok = Message, error = StreamExecutorError)] + async fn into_stream(mut self) { + let right_size = self.right.schema().len(); + + let (left_map, _right_map) = JoinStreamChunkBuilder::get_i2o_mapping( + &self.output_indices, + self.left.schema().len(), + right_size, + ); + + let left_to_output: HashMap = HashMap::from_iter(left_map.iter().cloned()); + + let mut prev_epoch = None; + + let full_schema: Vec<_> = self + .left + .schema() + .data_types() + .into_iter() + .chain(self.right.schema().data_types().into_iter()) + .collect(); + + #[for_await] + for msg in align_input::(self.left, self.right) { + match msg? { + InternalMessage::WaterMark(watermark) => { + let output_watermark_col_idx = *left_to_output.get(&watermark.col_idx).unwrap(); + yield Message::Watermark(watermark.with_idx(output_watermark_col_idx)); + } + InternalMessage::Chunk(chunk) => { + let epoch = prev_epoch.expect("Chunk data should come after some barrier."); + + let full_schema = full_schema.clone(); + + if T == JoinType::Inner { + let st1 = phase1_handle_chunk::( + self.chunk_size, + right_size, + full_schema, + epoch, + &mut self.right_table, + chunk, + ); + #[for_await] + for chunk in st1 { + let chunk = chunk?; + let new_chunk = if let Some(ref cond) = self.condition { + let (data_chunk, ops) = chunk.into_parts(); + let passed_bitmap = cond.eval_infallible(&data_chunk).await; + let passed_bitmap = + Arc::unwrap_or_clone(passed_bitmap).into_bool().to_bitmap(); + let (columns, vis) = data_chunk.into_parts(); + let new_vis = vis & passed_bitmap; + StreamChunk::with_visibility(ops, columns, new_vis) + } else { + chunk + }; + let new_chunk = apply_indices_map(new_chunk, &self.output_indices); + yield Message::Chunk(new_chunk); + } + } else if let Some(ref cond) = self.condition { + // Joined result without evaluating non-lookup conditions. + let st1 = phase1_handle_chunk::( + self.chunk_size, + right_size, + full_schema, + epoch, + &mut self.right_table, + chunk, + ); + let mut matched_count = 0usize; + #[for_await] + for chunk in st1 { + let chunk = chunk?; + let (data_chunk, ops) = chunk.into_parts(); + let passed_bitmap = cond.eval_infallible(&data_chunk).await; + let passed_bitmap = + Arc::unwrap_or_clone(passed_bitmap).into_bool().to_bitmap(); + let (columns, vis) = data_chunk.into_parts(); + let mut new_vis = BitmapBuilder::with_capacity(vis.len()); + for (passed, not_match_end) in + passed_bitmap.iter().zip_eq_debug(vis.iter()) + { + let is_match_end = !not_match_end; + let vis = if is_match_end && matched_count == 0 { + // Nothing is matched, so the marker row should be visible. + true + } else if is_match_end { + // reset the count + matched_count = 0; + // rows found, so the marker row should be invisible. + false + } else { + if passed { + matched_count += 1; + } + passed + }; + new_vis.append(vis); + } + let new_chunk = apply_indices_map( + StreamChunk::with_visibility(ops, columns, new_vis.finish()), + &self.output_indices, + ); + yield Message::Chunk(new_chunk); + } + // The last row should always be marker row, + assert_eq!(matched_count, 0); + } else { + let st1 = phase1_handle_chunk::( + self.chunk_size, + right_size, + full_schema, + epoch, + &mut self.right_table, + chunk, + ); + #[for_await] + for chunk in st1 { + let chunk = chunk?; + let new_chunk = apply_indices_map(chunk, &self.output_indices); + yield Message::Chunk(new_chunk); + } + } + } + InternalMessage::Barrier(chunk, barrier) => { + assert!(chunk.is_empty()); + if let Some(vnodes) = barrier.as_update_vnode_bitmap(self.ctx.id) { + let _vnodes = self.right_table.source.update_vnode_bitmap(vnodes.clone()); + } + prev_epoch = Some(barrier.epoch.curr); + yield Message::Barrier(barrier) + } + } + } + } +} + +impl Execute for NestedLoopTemporalJoinExecutor { + fn execute(self: Box) -> super::BoxedMessageStream { + self.into_stream().boxed() + } +} diff --git a/src/stream/src/executor/temporal_join.rs b/src/stream/src/executor/temporal_join.rs index db9f8c850ce6..8a994acb5e8d 100644 --- a/src/stream/src/executor/temporal_join.rs +++ b/src/stream/src/executor/temporal_join.rs @@ -200,7 +200,7 @@ impl TemporalSide { } } -enum InternalMessage { +pub(super) enum InternalMessage { Chunk(StreamChunk), Barrier(Vec, Barrier), WaterMark(Watermark), @@ -245,7 +245,7 @@ async fn internal_messages_until_barrier(stream: impl MessageStream, expected_ba // any number of `InternalMessage::Chunk(left_chunk)` and followed by // `InternalMessage::Barrier(right_chunks, barrier)`. #[try_stream(ok = InternalMessage, error = StreamExecutorError)] -async fn align_input(left: Executor, right: Executor) { +pub(super) async fn align_input(left: Executor, right: Executor) { let mut left = pin!(left.execute()); let mut right = pin!(right.execute()); // Keep producing intervals until stream exhaustion or errors. @@ -260,12 +260,18 @@ async fn align_input(left: Executor, right: Executor) { ); match combined.next().await { Some(Either::Left(Ok(Message::Chunk(c)))) => yield InternalMessage::Chunk(c), - Some(Either::Right(Ok(Message::Chunk(c)))) => right_chunks.push(c), + Some(Either::Right(Ok(Message::Chunk(c)))) => { + if YIELD_RIGHT_CHUNKS { + right_chunks.push(c); + } + } Some(Either::Left(Ok(Message::Barrier(b)))) => { let mut remain = chunks_until_barrier(right.by_ref(), b.clone()) .try_collect() .await?; - right_chunks.append(&mut remain); + if YIELD_RIGHT_CHUNKS { + right_chunks.append(&mut remain); + } yield InternalMessage::Barrier(right_chunks, b); break 'inner; } @@ -292,7 +298,18 @@ async fn align_input(left: Executor, right: Executor) { } } -mod phase1 { +pub(super) fn apply_indices_map(chunk: StreamChunk, indices: &[usize]) -> StreamChunk { + let (data_chunk, ops) = chunk.into_parts(); + let (columns, vis) = data_chunk.into_parts(); + let output_columns = indices + .iter() + .cloned() + .map(|idx| columns[idx].clone()) + .collect(); + StreamChunk::with_visibility(ops, output_columns, vis) +} + +pub(super) mod phase1 { use std::ops::Bound; use futures::{pin_mut, StreamExt}; @@ -310,7 +327,7 @@ mod phase1 { use crate::common::table::state_table::StateTable; use crate::executor::monitor::TemporalJoinMetrics; - pub(super) trait Phase1Evaluation { + pub trait Phase1Evaluation { /// Called when a matched row is found. #[must_use = "consume chunk if produced"] fn append_matched_row( @@ -331,9 +348,9 @@ mod phase1 { ) -> Option; } - pub(super) struct Inner; - pub(super) struct LeftOuter; - pub(super) struct LeftOuterWithCond; + pub struct Inner; + pub struct LeftOuter; + pub struct LeftOuterWithCond; impl Phase1Evaluation for Inner { fn append_matched_row( @@ -635,17 +652,6 @@ impl StreamChunk { - let (data_chunk, ops) = chunk.into_parts(); - let (columns, vis) = data_chunk.into_parts(); - let output_columns = indices - .iter() - .cloned() - .map(|idx| columns[idx].clone()) - .collect(); - StreamChunk::with_visibility(ops, output_columns, vis) - } - #[try_stream(ok = Message, error = StreamExecutorError)] async fn into_stream(mut self) { let right_size = self.right.schema().len(); @@ -682,7 +688,7 @@ impl(self.left, self.right) { self.right_table.cache.evict(); self.metrics .temporal_join_cached_entry_count @@ -725,8 +731,7 @@ impl