From dee9849a74a723a274de7be7e4ba9cf8f4c7f0e5 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Wed, 24 Jul 2024 11:22:46 +0800 Subject: [PATCH] docs for stream keyed merge --- src/frontend/src/optimizer/plan_node/stream_keyed_merge.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/frontend/src/optimizer/plan_node/stream_keyed_merge.rs b/src/frontend/src/optimizer/plan_node/stream_keyed_merge.rs index 2f09d2267f18c..e84a2c9dd9b00 100644 --- a/src/frontend/src/optimizer/plan_node/stream_keyed_merge.rs +++ b/src/frontend/src/optimizer/plan_node/stream_keyed_merge.rs @@ -32,6 +32,10 @@ use crate::optimizer::plan_node::{ use crate::stream_fragmenter::BuildFragmentGraphState; use crate::PlanRef; +/// `StreamKeyedMerge` is used for merging two streams with the same stream key and distribution. +/// It will buffer the outputs from its input streams until we receive a barrier. +/// On receiving a barrier, it will `Project` their outputs according +/// to the provided `lhs_mapping` and `rhs_mapping`. #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct StreamKeyedMerge { pub base: PlanBase, @@ -51,6 +55,8 @@ impl StreamKeyedMerge { rhs_mapping: ColIndexMapping, ) -> Result { assert_eq!(lhs_mapping.target_size(), rhs_mapping.target_size()); + assert_eq!(lhs_input.distribution(), rhs_input.distribution()); + assert_eq!(lhs_input.stream_key(), rhs_input.stream_key()); let mut schema_fields = Vec::with_capacity(lhs_mapping.target_size()); let o2i_lhs = lhs_mapping .inverse()