From f5f5701ab5f7cfe084190eac4fabfe1d381d0e49 Mon Sep 17 00:00:00 2001 From: Noel Kwan <47273164+kwannoel@users.noreply.github.com> Date: Thu, 8 Aug 2024 18:37:25 +0800 Subject: [PATCH] feat(stream): support row merge (a.k.a keyed merge) (#17930) --- .../two_phase_approx_percentile_merge.slt | 93 +++++++++ proto/stream_plan.proto | 6 + .../tests/testdata/output/agg.yaml | 35 ++-- .../src/optimizer/plan_node/logical_agg.rs | 40 ++-- src/frontend/src/optimizer/plan_node/mod.rs | 8 +- .../optimizer/plan_node/stream_exchange.rs | 1 + ...eam_keyed_merge.rs => stream_row_merge.rs} | 26 ++- src/stream/src/executor/mod.rs | 3 + src/stream/src/executor/row_merge.rs | 189 ++++++++++++++++++ src/stream/src/from_proto/mod.rs | 4 + src/stream/src/from_proto/row_merge.rs | 45 +++++ 11 files changed, 397 insertions(+), 53 deletions(-) create mode 100644 e2e_test/streaming/aggregate/two_phase_approx_percentile_merge.slt rename src/frontend/src/optimizer/plan_node/{stream_keyed_merge.rs => stream_row_merge.rs} (87%) create mode 100644 src/stream/src/executor/row_merge.rs create mode 100644 src/stream/src/from_proto/row_merge.rs diff --git a/e2e_test/streaming/aggregate/two_phase_approx_percentile_merge.slt b/e2e_test/streaming/aggregate/two_phase_approx_percentile_merge.slt new file mode 100644 index 000000000000..e168767b7b9b --- /dev/null +++ b/e2e_test/streaming/aggregate/two_phase_approx_percentile_merge.slt @@ -0,0 +1,93 @@ +# Single phase approx percentile +statement ok +create table t(p_col double, grp_col int); + +statement ok +insert into t select a, 1 from generate_series(-1000, 1000) t(a); + +statement ok +flush; + +query I +select + percentile_cont(0.01) within group (order by p_col) as p01, + percentile_cont(0.1) within group (order by p_col) as p10, + percentile_cont(0.5) within group (order by p_col) as p50, + percentile_cont(0.9) within group (order by p_col) as p90, + percentile_cont(0.99) within group (order by p_col) as p99 +from t; +---- +-980 -800 0 800 980 + +statement ok +create materialized view m1 as + select + approx_percentile(0.01, 0.01) within group (order by p_col) as p01, + approx_percentile(0.1, 0.01) within group (order by p_col) as p10, + approx_percentile(0.5, 0.01) within group (order by p_col) as p50, + approx_percentile(0.9, 0.01) within group (order by p_col) as p90, + approx_percentile(0.99, 0.01) within group (order by p_col) as p99 + from t; + +query I +select * from m1; +---- +-982.5779489474152 -804.4614206837127 0 804.4614206837127 982.5779489474152 + +# Test state encode / decode +onlyif can-use-recover +statement ok +recover; + +onlyif can-use-recover +sleep 10s + +query I +select * from m1; +---- +-982.5779489474152 -804.4614206837127 0 804.4614206837127 982.5779489474152 + +# Test state encode / decode +onlyif can-use-recover +statement ok +recover; + +onlyif can-use-recover +sleep 10s + +query I +select * from m1; +---- +-982.5779489474152 -804.4614206837127 0 804.4614206837127 982.5779489474152 + +# Test 0= 2; - core.input = if needs_keyed_merge { - // If there's keyed merge, we need to share the input. + core.input = if needs_row_merge { + // If there's row merge, we need to share the input. 
+        core.input = if needs_row_merge {
+            // If there's row merge, we need to share the input.
             StreamShare::new_from_input(stream_input.clone()).into()
         } else {
             stream_input
@@ -102,6 +102,12 @@ impl LogicalAgg {
             self.build_approx_percentile_aggs(core.input.clone(), &approx_percentile_agg_calls)?;
 
         // ====== Handle normal aggs
+        if core.agg_calls.is_empty() {
+            if let Some(approx_percentile) = approx_percentile {
+                return Ok(approx_percentile);
+            };
+            bail!("expected at least one agg call");
+        }
         let total_agg_calls = core
             .agg_calls
             .iter()
@@ -118,14 +124,14 @@ impl LogicalAgg {
 
         // ====== Merge approx percentile and normal aggs
         if let Some(approx_percentile) = approx_percentile {
-            if needs_keyed_merge {
-                let keyed_merge = StreamKeyedMerge::new(
+            if needs_row_merge {
+                let row_merge = StreamRowMerge::new(
                     approx_percentile,
                     global_agg.into(),
                     approx_percentile_col_mapping,
                     non_approx_percentile_col_mapping,
                 )?;
-                Ok(keyed_merge.into())
+                Ok(row_merge.into())
             } else {
                 Ok(approx_percentile)
             }
@@ -345,15 +351,15 @@ impl LogicalAgg {
     }
 
     /// If only 1 approx percentile, just return it.
-    /// Otherwise build a tree of approx percentile with `KeyedMerge`.
+    /// Otherwise build a tree of approx percentile with `RowMerge`.
     /// e.g.
     /// ApproxPercentile(col1, 0.5) as x,
     /// ApproxPercentile(col2, 0.5) as y,
     /// ApproxPercentile(col3, 0.5) as z
     /// will be built as
-    ///       `KeyedMerge`
+    ///       `RowMerge`
     ///       /        \
-    ///  `KeyedMerge`   z
+    ///  `RowMerge`     z
     ///  /        \
     /// x          y
@@ -374,14 +380,14 @@
         let mut acc = iter.next().unwrap();
         for (current_size, plan) in iter.enumerate().map(|(i, p)| (i + 1, p)) {
             let new_size = current_size + 1;
-            let keyed_merge = StreamKeyedMerge::new(
+            let row_merge = StreamRowMerge::new(
                 acc,
                 plan,
                 ColIndexMapping::identity_or_none(current_size, new_size),
                 ColIndexMapping::new(vec![Some(current_size)], new_size),
             )
-            .expect("failed to build keyed merge");
-            acc = keyed_merge.into();
+            .expect("failed to build row merge");
+            acc = row_merge.into();
         }
         Ok(Some(acc))
     }
@@ -1312,7 +1318,7 @@ impl ToStream for LogicalAgg {
                     .into());
                 }
                 (plan.clone(), 1)
-            } else if let Some(stream_keyed_merge) = plan.as_stream_keyed_merge() {
+            } else if let Some(stream_row_merge) = plan.as_stream_row_merge() {
                 if eowc {
                     return Err(ErrorCode::InvalidInputSyntax(
                         "`EMIT ON WINDOW CLOSE` cannot be used for aggregation without `GROUP BY`"
@@ -1320,9 +1326,9 @@
                     )
                     .into());
                 }
-                (plan.clone(), stream_keyed_merge.base.schema().len())
+                (plan.clone(), stream_row_merge.base.schema().len())
             } else {
-                panic!("the root PlanNode must be StreamHashAgg, StreamSimpleAgg, StreamGlobalApproxPercentile, or StreamKeyedMerge");
+                panic!("the root PlanNode must be StreamHashAgg, StreamSimpleAgg, StreamGlobalApproxPercentile, or StreamRowMerge");
             };
 
         if self.agg_calls().len() == n_final_agg_calls {
diff --git a/src/frontend/src/optimizer/plan_node/mod.rs b/src/frontend/src/optimizer/plan_node/mod.rs
index 2cf7e67dd2b6..db1200de2a27 100644
--- a/src/frontend/src/optimizer/plan_node/mod.rs
+++ b/src/frontend/src/optimizer/plan_node/mod.rs
@@ -898,7 +898,6 @@ mod stream_group_topn;
 mod stream_hash_agg;
 mod stream_hash_join;
 mod stream_hop_window;
-mod stream_keyed_merge;
 mod stream_local_approx_percentile;
 mod stream_materialize;
 mod stream_now;
@@ -906,6 +905,7 @@ mod stream_over_window;
 mod stream_project;
 mod stream_project_set;
 mod stream_row_id_gen;
+mod stream_row_merge;
 mod stream_simple_agg;
 mod stream_sink;
 mod stream_sort;
@@ -1010,7 +1010,6 @@ pub use stream_group_topn::StreamGroupTopN;
 pub use stream_hash_agg::StreamHashAgg;
 pub use stream_hash_join::StreamHashJoin;
 pub use stream_hop_window::StreamHopWindow;
-pub use stream_keyed_merge::StreamKeyedMerge;
 pub use stream_local_approx_percentile::StreamLocalApproxPercentile;
 pub use stream_materialize::StreamMaterialize;
 pub use stream_now::StreamNow;
@@ -1018,6 +1017,7 @@ pub use stream_over_window::StreamOverWindow;
 pub use stream_project::StreamProject;
 pub use stream_project_set::StreamProjectSet;
 pub use stream_row_id_gen::StreamRowIdGen;
+pub use stream_row_merge::StreamRowMerge;
 pub use stream_share::StreamShare;
 pub use stream_simple_agg::StreamSimpleAgg;
 pub use stream_sink::{IcebergPartitionInfo, PartitionComputeInfo, StreamSink};
@@ -1158,7 +1158,7 @@ macro_rules! for_all_plan_nodes {
             , { Stream, ChangeLog }
             , { Stream, GlobalApproxPercentile }
             , { Stream, LocalApproxPercentile }
-            , { Stream, KeyedMerge }
+            , { Stream, RowMerge }
         }
     };
 }
@@ -1287,7 +1287,7 @@ macro_rules! for_stream_plan_nodes {
             , { Stream, ChangeLog }
             , { Stream, GlobalApproxPercentile }
             , { Stream, LocalApproxPercentile }
-            , { Stream, KeyedMerge }
+            , { Stream, RowMerge }
         }
     };
 }
diff --git a/src/frontend/src/optimizer/plan_node/stream_exchange.rs b/src/frontend/src/optimizer/plan_node/stream_exchange.rs
index 878e34d577b3..d42f9a9392b4 100644
--- a/src/frontend/src/optimizer/plan_node/stream_exchange.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_exchange.rs
@@ -43,6 +43,7 @@ impl StreamExchange {
         } else {
             MonotonicityMap::new()
         };
+        assert!(!input.schema().is_empty());
         let base = PlanBase::new_stream(
             input.ctx(),
             input.schema().clone(),
diff --git a/src/frontend/src/optimizer/plan_node/stream_keyed_merge.rs b/src/frontend/src/optimizer/plan_node/stream_row_merge.rs
similarity index 87%
rename from src/frontend/src/optimizer/plan_node/stream_keyed_merge.rs
rename to src/frontend/src/optimizer/plan_node/stream_row_merge.rs
index e84a2c9dd9b0..f4c2c894fdf6 100644
--- a/src/frontend/src/optimizer/plan_node/stream_keyed_merge.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_row_merge.rs
@@ -32,12 +32,12 @@ use crate::optimizer::plan_node::{
 use crate::stream_fragmenter::BuildFragmentGraphState;
 use crate::PlanRef;
 
-/// `StreamKeyedMerge` is used for merging two streams with the same stream key and distribution.
+/// `StreamRowMerge` is used for merging two streams with the same stream key and distribution.
 /// It will buffer the outputs from its input streams until we receive a barrier.
 /// On receiving a barrier, it will `Project` their outputs according
 /// to the provided `lhs_mapping` and `rhs_mapping`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct StreamKeyedMerge { +pub struct StreamRowMerge { pub base: PlanBase, pub lhs_input: PlanRef, pub rhs_input: PlanRef, @@ -47,7 +47,7 @@ pub struct StreamKeyedMerge { pub rhs_mapping: ColIndexMapping, } -impl StreamKeyedMerge { +impl StreamRowMerge { pub fn new( lhs_input: PlanRef, rhs_input: PlanRef, @@ -77,6 +77,7 @@ impl StreamKeyedMerge { } } let schema = Schema::new(schema_fields); + assert!(!schema.is_empty()); let watermark_columns = FixedBitSet::with_capacity(schema.fields.len()); let base = PlanBase::new_stream( @@ -100,7 +101,7 @@ impl StreamKeyedMerge { } } -impl Distill for StreamKeyedMerge { +impl Distill for StreamRowMerge { fn distill<'a>(&self) -> XmlNode<'a> { let mut out = Vec::with_capacity(1); @@ -109,11 +110,11 @@ impl Distill for StreamKeyedMerge { let e = Pretty::Array(self.base.schema().fields().iter().map(f).collect()); out = vec![("output", e)]; } - childless_record("StreamKeyedMerge", out) + childless_record("StreamRowMerge", out) } } -impl PlanTreeNodeBinary for StreamKeyedMerge { +impl PlanTreeNodeBinary for StreamRowMerge { fn left(&self) -> PlanRef { self.lhs_input.clone() } @@ -133,15 +134,18 @@ impl PlanTreeNodeBinary for StreamKeyedMerge { } } -impl_plan_tree_node_for_binary! { StreamKeyedMerge } +impl_plan_tree_node_for_binary! { StreamRowMerge } -impl StreamNode for StreamKeyedMerge { +impl StreamNode for StreamRowMerge { fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody { - todo!() + PbNodeBody::RowMerge(risingwave_pb::stream_plan::RowMergeNode { + lhs_mapping: Some(self.lhs_mapping.to_protobuf()), + rhs_mapping: Some(self.rhs_mapping.to_protobuf()), + }) } } -impl ExprRewritable for StreamKeyedMerge { +impl ExprRewritable for StreamRowMerge { fn has_rewritable_expr(&self) -> bool { false } @@ -151,6 +155,6 @@ impl ExprRewritable for StreamKeyedMerge { } } -impl ExprVisitable for StreamKeyedMerge { +impl ExprVisitable for StreamRowMerge { fn visit_exprs(&self, _v: &mut dyn ExprVisitor) {} } diff --git a/src/stream/src/executor/mod.rs b/src/stream/src/executor/mod.rs index 93eaabded85a..cfa8e895ad16 100644 --- a/src/stream/src/executor/mod.rs +++ b/src/stream/src/executor/mod.rs @@ -105,6 +105,8 @@ mod wrapper; mod approx_percentile; +mod row_merge; + #[cfg(test)] mod integration_tests; pub mod test_utils; @@ -143,6 +145,7 @@ pub use project_set::*; pub use rearranged_chain::RearrangedChainExecutor; pub use receiver::ReceiverExecutor; use risingwave_pb::source::{ConnectorSplit, ConnectorSplits}; +pub use row_merge::RowMergeExecutor; pub use simple_agg::SimpleAggExecutor; pub use sink::SinkExecutor; pub use sort::*; diff --git a/src/stream/src/executor/row_merge.rs b/src/stream/src/executor/row_merge.rs new file mode 100644 index 000000000000..b19b9bdcca8c --- /dev/null +++ b/src/stream/src/executor/row_merge.rs @@ -0,0 +1,189 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+use risingwave_common::bail;
+use risingwave_common::types::ToOwnedDatum;
+use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
+use risingwave_common::util::column_index_mapping::ColIndexMapping;
+
+use super::barrier_align::*;
+use crate::executor::prelude::*;
+
+pub struct RowMergeExecutor {
+    ctx: ActorContextRef,
+    pub lhs_input: Executor,
+    pub rhs_input: Executor,
+    /// Maps input from the lhs to the output.
+    pub lhs_mapping: ColIndexMapping,
+    /// Maps input from the rhs to the output.
+    pub rhs_mapping: ColIndexMapping,
+    /// Output schema
+    pub schema: Schema,
+}
+
+impl RowMergeExecutor {
+    pub fn new(
+        ctx: ActorContextRef,
+        lhs_input: Executor,
+        rhs_input: Executor,
+        lhs_mapping: ColIndexMapping,
+        rhs_mapping: ColIndexMapping,
+        schema: Schema,
+    ) -> Self {
+        Self {
+            ctx,
+            lhs_input,
+            rhs_input,
+            lhs_mapping,
+            rhs_mapping,
+            schema,
+        }
+    }
+
+    #[try_stream(ok = Message, error = StreamExecutorError)]
+    pub async fn execute_inner(self) {
+        let lhs_mapping = self.lhs_mapping;
+        let rhs_mapping = self.rhs_mapping;
+        let data_types = self
+            .schema
+            .fields()
+            .iter()
+            .map(|f| f.data_type())
+            .collect::<Vec<_>>();
+
+        {
+            let mut lhs_buffer: Vec<StreamChunk> = Vec::with_capacity(1);
+            let mut rhs_buffer: Vec<StreamChunk> = Vec::with_capacity(1);
+            let aligned_stream = barrier_align(
+                self.lhs_input.execute(),
+                self.rhs_input.execute(),
+                self.ctx.id,
+                self.ctx.fragment_id,
+                self.ctx.streaming_metrics.clone(),
+                "RowMerge",
+            );
+            pin_mut!(aligned_stream);
+            #[for_await]
+            for message in aligned_stream {
+                match message? {
+                    AlignedMessage::Left(chunk) => {
+                        lhs_buffer.push(chunk);
+                    }
+                    AlignedMessage::Right(chunk) => {
+                        rhs_buffer.push(chunk);
+                    }
+                    AlignedMessage::Barrier(barrier) => {
+                        if lhs_buffer.is_empty() && rhs_buffer.is_empty() {
+                            yield Message::Barrier(barrier);
+                            continue;
+                        }
+                        #[for_await]
+                        for output in Self::flush_buffers(
+                            &data_types,
+                            &lhs_mapping,
+                            &rhs_mapping,
+                            &mut lhs_buffer,
+                            &mut rhs_buffer,
+                        ) {
+                            yield output?;
+                        }
+                        yield Message::Barrier(barrier);
+                    }
+                    AlignedMessage::WatermarkLeft(watermark) => {
+                        tracing::warn!("unexpected watermark from left stream: {:?}", watermark);
+                    }
+                    AlignedMessage::WatermarkRight(watermark) => {
+                        tracing::warn!("unexpected watermark from right stream: {:?}", watermark);
+                    }
+                }
+            }
+        }
+    }
+
+    #[try_stream(ok = Message, error = StreamExecutorError)]
+    async fn flush_buffers<'a>(
+        data_types: &'a [DataType],
+        lhs_mapping: &'a ColIndexMapping,
+        rhs_mapping: &'a ColIndexMapping,
+        lhs_buffer: &'a mut Vec<StreamChunk>,
+        rhs_buffer: &'a mut Vec<StreamChunk>,
+    ) {
+        if lhs_buffer.is_empty() {
+            bail!("lhs buffer should not be empty");
+        };
+        if rhs_buffer.is_empty() {
+            bail!("rhs buffer should not be empty");
+        };
+
+        for lhs_chunk in lhs_buffer.drain(..) {
+            for rhs_chunk in rhs_buffer.drain(..) {
+                yield Self::build_chunk(
+                    data_types,
+                    lhs_mapping,
+                    rhs_mapping,
+                    lhs_chunk.clone(),
+                    rhs_chunk,
+                )?;
+            }
+        }
+    }
+
+    fn build_chunk(
+        data_types: &[DataType],
+        lhs_mapping: &ColIndexMapping,
+        rhs_mapping: &ColIndexMapping,
+        lhs_chunk: StreamChunk,
+        rhs_chunk: StreamChunk,
+    ) -> Result<Message, StreamExecutorError> {
+        if !(1..=2).contains(&lhs_chunk.cardinality()) {
+            bail!("lhs chunk cardinality should be 1 or 2");
+        }
+        if !(1..=2).contains(&rhs_chunk.cardinality()) {
+            bail!("rhs chunk cardinality should be 1 or 2");
+        }
+        if lhs_chunk.cardinality() != rhs_chunk.cardinality() {
+            bail!("lhs and rhs chunk cardinality should be the same");
+        }
+        let cardinality = lhs_chunk.cardinality();
+        let mut ops = Vec::with_capacity(cardinality);
+        let mut merged_rows = vec![vec![Datum::None; data_types.len()]; cardinality];
+        for (i, (op, lhs_row)) in lhs_chunk.rows().enumerate() {
+            ops.push(op);
+            for (j, d) in lhs_row.iter().enumerate() {
+                let out_index = lhs_mapping.map(j);
+                merged_rows[i][out_index] = d.to_owned_datum();
+            }
+        }
+
+        for (i, (_, rhs_row)) in rhs_chunk.rows().enumerate() {
+            for (j, d) in rhs_row.iter().enumerate() {
+                let out_index = rhs_mapping.map(j);
+                merged_rows[i][out_index] = d.to_owned_datum();
+            }
+        }
+        let mut builder = DataChunkBuilder::new(data_types.to_vec(), cardinality);
+        for row in merged_rows {
+            if let Some(chunk) = builder.append_one_row(&row[..]) {
+                return Ok(Message::Chunk(StreamChunk::from_parts(ops, chunk)));
+            }
+        }
+        bail!("builder should have yielded a chunk")
+    }
+}
+
+impl Execute for RowMergeExecutor {
+    fn execute(self: Box<Self>) -> BoxedMessageStream {
+        self.execute_inner().boxed()
+    }
+}
diff --git a/src/stream/src/from_proto/mod.rs b/src/stream/src/from_proto/mod.rs
index 5b54619fb932..6f185695eadf 100644
--- a/src/stream/src/from_proto/mod.rs
+++ b/src/stream/src/from_proto/mod.rs
@@ -53,6 +53,8 @@ mod union;
 mod values;
 mod watermark_filter;
 
+mod row_merge;
+
 mod approx_percentile;
 
 // import for submodules
@@ -86,6 +88,7 @@ use self::over_window::*;
 use self::project::*;
 use self::project_set::*;
 use self::row_id_gen::RowIdGenExecutorBuilder;
+use self::row_merge::*;
 use self::simple_agg::*;
 use self::sink::*;
 use self::sort::*;
@@ -181,5 +184,6 @@ pub async fn create_executor(
         NodeBody::Changelog => ChangeLogExecutorBuilder,
         NodeBody::GlobalApproxPercentile => GlobalApproxPercentileExecutorBuilder,
         NodeBody::LocalApproxPercentile => LocalApproxPercentileExecutorBuilder,
+        NodeBody::RowMerge => RowMergeExecutorBuilder,
     }
 }
diff --git a/src/stream/src/from_proto/row_merge.rs b/src/stream/src/from_proto/row_merge.rs
new file mode 100644
index 000000000000..41e3620d4000
--- /dev/null
+++ b/src/stream/src/from_proto/row_merge.rs
@@ -0,0 +1,45 @@
+// Copyright 2024 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+ +use risingwave_common::util::column_index_mapping::ColIndexMapping; +use risingwave_pb::stream_plan::RowMergeNode; + +use crate::executor::RowMergeExecutor; +use crate::from_proto::*; + +pub struct RowMergeExecutorBuilder; + +impl ExecutorBuilder for RowMergeExecutorBuilder { + type Node = RowMergeNode; + + async fn new_boxed_executor( + params: ExecutorParams, + node: &Self::Node, + _store: impl StateStore, + ) -> StreamResult { + let [lhs_input, rhs_input]: [_; 2] = params.input.try_into().unwrap(); + let lhs_mapping = ColIndexMapping::from_protobuf(node.lhs_mapping.as_ref().unwrap()); + let rhs_mapping = ColIndexMapping::from_protobuf(node.rhs_mapping.as_ref().unwrap()); + + let exec = RowMergeExecutor::new( + params.actor_context, + lhs_input, + rhs_input, + lhs_mapping, + rhs_mapping, + params.info.schema.clone(), + ); + Ok(Executor::new(params.info, exec.boxed())) + } +}
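
Note for readers of this patch: the standalone sketch below is not part of the commit. It restates the heart of `RowMergeExecutor::build_chunk` on plain vectors, showing how each side's `ColIndexMapping` sends input column j to a fixed output slot so that one lhs row and one rhs row are stitched into a single wide output row. `MockMapping` and the simplified `Datum` alias are hypothetical stand-ins for the `risingwave_common` types.

// Standalone sketch of RowMerge's column stitching. `MockMapping` is a
// hypothetical stand-in for risingwave_common's ColIndexMapping, and Datum
// is simplified to Option<i64> for illustration.
struct MockMapping {
    /// `targets[i]` is the output column index for input column `i`.
    targets: Vec<usize>,
}

impl MockMapping {
    fn map(&self, i: usize) -> usize {
        self.targets[i]
    }
}

type Datum = Option<i64>;

/// Merge one lhs row and one rhs row into a single output row, mirroring
/// the two inner loops of `RowMergeExecutor::build_chunk`: every input
/// column is copied into the output slot its mapping points at.
fn merge_row(
    out_width: usize,
    lhs_mapping: &MockMapping,
    rhs_mapping: &MockMapping,
    lhs_row: &[Datum],
    rhs_row: &[Datum],
) -> Vec<Datum> {
    let mut merged = vec![None; out_width];
    for (j, d) in lhs_row.iter().enumerate() {
        merged[lhs_mapping.map(j)] = *d;
    }
    for (j, d) in rhs_row.iter().enumerate() {
        merged[rhs_mapping.map(j)] = *d;
    }
    merged
}

fn main() {
    // lhs owns output columns 0 and 2 (e.g. the approx percentile results);
    // rhs owns output column 1 (e.g. a normal agg result).
    let lhs = MockMapping { targets: vec![0, 2] };
    let rhs = MockMapping { targets: vec![1] };
    let out = merge_row(3, &lhs, &rhs, &[Some(10), Some(30)], &[Some(20)]);
    assert_eq!(out, vec![Some(10), Some(20), Some(30)]);
}

Because both inputs share the same stream key and distribution, row i of the buffered lhs chunk and row i of the rhs chunk describe the same key, which is why the executor can zip the two chunks positionally once barrier alignment has flushed both buffers.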
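A second sketch, also not from the commit, shows the shape of the left-deep fold in `build_approx_percentile_aggs`: at each step the accumulated plan keeps output columns `0..current_size` via an identity mapping, while the next single-column approx-percentile plan is appended at index `current_size`, mirroring `ColIndexMapping::identity_or_none(current_size, new_size)` and `ColIndexMapping::new(vec![Some(current_size)], new_size)`. The mappings are modeled as plain index vectors and column names stand in for plan schemas, so this is only an illustration of the index bookkeeping.

// Hypothetical model of the fold that builds the RowMerge tree for N
// approx-percentile columns; string names stand in for plan schemas.

/// Models identity_or_none(n, m): columns 0..n keep their indices.
fn identity_mapping(n: usize) -> Vec<usize> {
    (0..n).collect()
}

/// Models new(vec![Some(k)], m): the single input column lands at index k.
fn single_column_mapping(k: usize) -> Vec<usize> {
    vec![k]
}

fn main() {
    // Three single-column approx-percentile plans: x, y, z.
    let plans = ["x", "y", "z"];
    let mut iter = plans.iter();
    // The accumulated output columns, seeded with the first plan.
    let mut acc = vec![iter.next().unwrap().to_string()];
    for (current_size, plan) in iter.enumerate().map(|(i, p)| (i + 1, p)) {
        let new_size = current_size + 1;
        let lhs = identity_mapping(current_size); // acc keeps indices 0..current_size
        let rhs = single_column_mapping(current_size); // newcomer appended at the end
        let mut merged = vec![String::new(); new_size];
        for (j, &out) in lhs.iter().enumerate() {
            merged[out] = acc[j].clone();
        }
        merged[rhs[0]] = plan.to_string();
        acc = merged;
    }
    // Left-deep: ((x merge y) merge z) yields output columns [x, y, z].
    assert_eq!(acc, vec!["x", "y", "z"]);
}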