diff --git a/src/frontend/src/optimizer/plan_node/logical_agg.rs b/src/frontend/src/optimizer/plan_node/logical_agg.rs index 6956d61cc971d..ed24b88927479 100644 --- a/src/frontend/src/optimizer/plan_node/logical_agg.rs +++ b/src/frontend/src/optimizer/plan_node/logical_agg.rs @@ -303,8 +303,12 @@ impl LogicalAgg { .with_grouping_sets(self.grouping_sets().clone()) .with_enable_two_phase(self.core().enable_two_phase) .into(); - let agg_merge = - StreamKeyedMerge::new(simple_agg_without_approx_percentile, approx_percentile_agg); + let agg_merge = StreamKeyedMerge::new( + simple_agg_without_approx_percentile, + approx_percentile_agg, + lhs_mapping, + rhs_mapping, + ); Ok(agg_merge.into()) } diff --git a/src/frontend/src/optimizer/plan_node/stream_keyed_merge.rs b/src/frontend/src/optimizer/plan_node/stream_keyed_merge.rs index ed726dda3befe..c917f0c5522b2 100644 --- a/src/frontend/src/optimizer/plan_node/stream_keyed_merge.rs +++ b/src/frontend/src/optimizer/plan_node/stream_keyed_merge.rs @@ -12,13 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. -use pretty_xmlish::XmlNode; +use pretty_xmlish::{Pretty, XmlNode}; +use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_pb::stream_plan::stream_node::PbNodeBody; use risingwave_pb::stream_plan::HopWindowNode; use crate::expr::{ExprRewriter, ExprVisitor}; use crate::optimizer::plan_node::expr_visitable::ExprVisitable; -use crate::optimizer::plan_node::utils::Distill; +use crate::optimizer::plan_node::generic::PhysicalPlanRef; +use crate::optimizer::plan_node::stream::StreamPlanRef; +use crate::optimizer::plan_node::utils::{childless_record, Distill}; use crate::optimizer::plan_node::{ ExprRewritable, PlanBase, PlanTreeNodeBinary, Stream, StreamHopWindow, StreamLocalApproxPercentile, StreamNode, @@ -28,38 +31,69 @@ use crate::PlanRef; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct StreamKeyedMerge { + pub base: PlanBase, pub lhs_input: PlanRef, pub rhs_input: PlanRef, - pub base: PlanBase, + /// Maps input from the lhs to the output. + pub lhs_mapping: ColIndexMapping, + /// Maps input from the rhs to the output. + pub rhs_mapping: ColIndexMapping, } impl StreamKeyedMerge { - pub fn new(lhs_input: PlanRef, rhs_input: PlanRef) -> Self { + pub fn new( + lhs_input: PlanRef, + rhs_input: PlanRef, + lhs_mapping: ColIndexMapping, + rhs_mapping: ColIndexMapping, + ) -> Self { + let base = PlanBase::new_stream( + lhs_input.ctx(), + lhs_input.schema().clone(), + lhs_input.stream_key().map(|k| k.to_vec()), + lhs_input.functional_dependency().clone(), + lhs_input.distribution().clone(), + lhs_input.append_only(), + lhs_input.emit_on_window_close(), + lhs_input.watermark_columns().clone(), + lhs_input.columns_monotonicity().clone(), + ); Self { + base, lhs_input, rhs_input, - base: todo!(), + lhs_mapping, + rhs_mapping, } } } impl Distill for StreamKeyedMerge { fn distill<'a>(&self) -> XmlNode<'a> { - todo!() + let mut out = Vec::with_capacity(2); + out.push(("lhs_col_mapping", Pretty::debug(&self.lhs_mapping))); + out.push(("rhs_col_mapping", Pretty::debug(&self.rhs_mapping))); + childless_record("StreamKeyedMerge", out) } } impl PlanTreeNodeBinary for StreamKeyedMerge { fn left(&self) -> PlanRef { - todo!() + self.lhs_input.clone() } fn right(&self) -> PlanRef { - todo!() + self.rhs_input.clone() } fn clone_with_left_right(&self, left: PlanRef, right: PlanRef) -> Self { - todo!() + Self { + base: self.base.clone(), + lhs_input: left, + rhs_input: right, + lhs_mapping: self.lhs_mapping.clone(), + rhs_mapping: self.rhs_mapping.clone(), + } } } @@ -73,16 +107,16 @@ impl StreamNode for StreamKeyedMerge { impl ExprRewritable for StreamKeyedMerge { fn has_rewritable_expr(&self) -> bool { - todo!() + false } fn rewrite_exprs(&self, _rewriter: &mut dyn ExprRewriter) -> PlanRef { - todo!() + unimplemented!() } } impl ExprVisitable for StreamKeyedMerge { fn visit_exprs(&self, v: &mut dyn ExprVisitor) { - todo!() + unimplemented!() } }