Skip to content

Commit

Permalink
rename MergeProject to RowMerge
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Aug 6, 2024
1 parent 1b2282f commit 6772b51
Show file tree
Hide file tree
Showing 8 changed files with 44 additions and 44 deletions.
4 changes: 2 additions & 2 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -802,7 +802,7 @@ message GlobalApproxPercentileNode {
catalog.Table count_state_table = 4;
}

message MergeProjectNode {
message RowMergeNode {
catalog.ColIndexMapping lhs_mapping = 1;
catalog.ColIndexMapping rhs_mapping = 2;
}
Expand Down Expand Up @@ -854,7 +854,7 @@ message StreamNode {
ChangeLogNode changelog = 143;
LocalApproxPercentileNode local_approx_percentile = 144;
GlobalApproxPercentileNode global_approx_percentile = 145;
MergeProjectNode merge_project = 146;
RowMergeNode row_merge = 146;
}
// The id for the operator. This is local per mview.
// TODO: should better be a uint32.
Expand Down
26 changes: 13 additions & 13 deletions src/frontend/src/optimizer/plan_node/logical_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::generic::GenericPlanNode;
use crate::optimizer::plan_node::stream_global_approx_percentile::StreamGlobalApproxPercentile;
use crate::optimizer::plan_node::stream_local_approx_percentile::StreamLocalApproxPercentile;
use crate::optimizer::plan_node::stream_merge_project::StreamMergeProject;
use crate::optimizer::plan_node::stream_row_merge::StreamRowMerge;
use crate::optimizer::plan_node::{
gen_filter_and_pushdown, BatchSortAgg, ColumnPruningContext, LogicalDedup, LogicalProject,
PredicatePushdownContext, RewriteStreamContext, ToStreamContext,
Expand Down Expand Up @@ -87,11 +87,11 @@ impl LogicalAgg {
col_mapping: approx_percentile_col_mapping,
} = approx;

let needs_merge_project = (!non_approx_percentile_agg_calls.is_empty()
let needs_row_merge = (!non_approx_percentile_agg_calls.is_empty()
&& !approx_percentile_agg_calls.is_empty())
|| approx_percentile_agg_calls.len() >= 2;
core.input = if needs_merge_project {
// If there's merge project, we need to share the input.
core.input = if needs_row_merge {
// If there's row merge, we need to share the input.
StreamShare::new_from_input(stream_input.clone()).into()
} else {
stream_input
Expand Down Expand Up @@ -124,14 +124,14 @@ impl LogicalAgg {

// ====== Merge approx percentile and normal aggs
if let Some(approx_percentile) = approx_percentile {
if needs_merge_project {
let merge_project = StreamMergeProject::new(
if needs_row_merge {
let row_merge = StreamRowMerge::new(
approx_percentile,
global_agg.into(),
approx_percentile_col_mapping,
non_approx_percentile_col_mapping,
)?;
Ok(merge_project.into())
Ok(row_merge.into())
} else {
Ok(approx_percentile)
}
Expand Down Expand Up @@ -380,14 +380,14 @@ impl LogicalAgg {
let mut acc = iter.next().unwrap();
for (current_size, plan) in iter.enumerate().map(|(i, p)| (i + 1, p)) {
let new_size = current_size + 1;
let merge_project = StreamMergeProject::new(
let row_merge = StreamRowMerge::new(
acc,
plan,
ColIndexMapping::identity_or_none(current_size, new_size),
ColIndexMapping::new(vec![Some(current_size)], new_size),
)
.expect("failed to build merge project");
acc = merge_project.into();
.expect("failed to build row merge");
acc = row_merge.into();
}
Ok(Some(acc))
}
Expand Down Expand Up @@ -1318,17 +1318,17 @@ impl ToStream for LogicalAgg {
.into());
}
(plan.clone(), 1)
} else if let Some(stream_merge_project) = plan.as_stream_merge_project() {
} else if let Some(stream_row_merge) = plan.as_stream_row_merge() {
if eowc {
return Err(ErrorCode::InvalidInputSyntax(
"`EMIT ON WINDOW CLOSE` cannot be used for aggregation without `GROUP BY`"
.to_string(),
)
.into());
}
(plan.clone(), stream_merge_project.base.schema().len())
(plan.clone(), stream_row_merge.base.schema().len())
} else {
panic!("the root PlanNode must be StreamHashAgg, StreamSimpleAgg, StreamGlobalApproxPercentile, or StreamMergeProject");
panic!("the root PlanNode must be StreamHashAgg, StreamSimpleAgg, StreamGlobalApproxPercentile, or StreamRowMerge");
};

if self.agg_calls().len() == n_final_agg_calls {
Expand Down
8 changes: 4 additions & 4 deletions src/frontend/src/optimizer/plan_node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -900,7 +900,7 @@ mod stream_hash_join;
mod stream_hop_window;
mod stream_local_approx_percentile;
mod stream_materialize;
mod stream_merge_project;
mod stream_row_merge;
mod stream_now;
mod stream_over_window;
mod stream_project;
Expand Down Expand Up @@ -1012,7 +1012,7 @@ pub use stream_hash_join::StreamHashJoin;
pub use stream_hop_window::StreamHopWindow;
pub use stream_local_approx_percentile::StreamLocalApproxPercentile;
pub use stream_materialize::StreamMaterialize;
pub use stream_merge_project::StreamMergeProject;
pub use stream_row_merge::StreamRowMerge;
pub use stream_now::StreamNow;
pub use stream_over_window::StreamOverWindow;
pub use stream_project::StreamProject;
Expand Down Expand Up @@ -1158,7 +1158,7 @@ macro_rules! for_all_plan_nodes {
, { Stream, ChangeLog }
, { Stream, GlobalApproxPercentile }
, { Stream, LocalApproxPercentile }
, { Stream, MergeProject }
, { Stream, RowMerge }
}
};
}
Expand Down Expand Up @@ -1287,7 +1287,7 @@ macro_rules! for_stream_plan_nodes {
, { Stream, ChangeLog }
, { Stream, GlobalApproxPercentile }
, { Stream, LocalApproxPercentile }
, { Stream, MergeProject }
, { Stream, RowMerge }
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ use crate::optimizer::plan_node::{
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::PlanRef;

/// `StreamMergeProject` is used for merging two streams with the same stream key and distribution.
/// `StreamRowMerge` is used for merging two streams with the same stream key and distribution.
/// It will buffer the outputs from its input streams until we receive a barrier.
/// On receiving a barrier, it will `Project` their outputs according
/// to the provided `lhs_mapping` and `rhs_mapping`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamMergeProject {
pub struct StreamRowMerge {
pub base: PlanBase<Stream>,
pub lhs_input: PlanRef,
pub rhs_input: PlanRef,
Expand All @@ -47,7 +47,7 @@ pub struct StreamMergeProject {
pub rhs_mapping: ColIndexMapping,
}

impl StreamMergeProject {
impl StreamRowMerge {
pub fn new(
lhs_input: PlanRef,
rhs_input: PlanRef,
Expand Down Expand Up @@ -101,7 +101,7 @@ impl StreamMergeProject {
}
}

impl Distill for StreamMergeProject {
impl Distill for StreamRowMerge {
fn distill<'a>(&self) -> XmlNode<'a> {
let mut out = Vec::with_capacity(1);

Expand All @@ -110,11 +110,11 @@ impl Distill for StreamMergeProject {
let e = Pretty::Array(self.base.schema().fields().iter().map(f).collect());
out = vec![("output", e)];
}
childless_record("StreamMergeProject", out)
childless_record("StreamRowMerge", out)
}
}

impl PlanTreeNodeBinary for StreamMergeProject {
impl PlanTreeNodeBinary for StreamRowMerge {
fn left(&self) -> PlanRef {
self.lhs_input.clone()
}
Expand All @@ -134,18 +134,18 @@ impl PlanTreeNodeBinary for StreamMergeProject {
}
}

impl_plan_tree_node_for_binary! { StreamMergeProject }
impl_plan_tree_node_for_binary! { StreamRowMerge }

impl StreamNode for StreamMergeProject {
impl StreamNode for StreamRowMerge {
fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
PbNodeBody::MergeProject(risingwave_pb::stream_plan::MergeProjectNode {
PbNodeBody::RowMerge(risingwave_pb::stream_plan::RowMergeNode {
lhs_mapping: Some(self.lhs_mapping.to_protobuf()),
rhs_mapping: Some(self.rhs_mapping.to_protobuf()),
})
}
}

impl ExprRewritable for StreamMergeProject {
impl ExprRewritable for StreamRowMerge {
fn has_rewritable_expr(&self) -> bool {
false
}
Expand All @@ -155,6 +155,6 @@ impl ExprRewritable for StreamMergeProject {
}
}

impl ExprVisitable for StreamMergeProject {
impl ExprVisitable for StreamRowMerge {
fn visit_exprs(&self, _v: &mut dyn ExprVisitor) {}
}
4 changes: 2 additions & 2 deletions src/stream/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ mod wrapper;

mod approx_percentile;

mod merge_project;
mod row_merge;

#[cfg(test)]
mod integration_tests;
Expand Down Expand Up @@ -136,7 +136,7 @@ pub use join::JoinType;
pub use lookup::*;
pub use lookup_union::LookupUnionExecutor;
pub use merge::MergeExecutor;
pub use merge_project::MergeProjectExecutor;
pub use row_merge::RowMergeExecutor;
pub use mview::*;
pub use no_op::NoOpExecutor;
pub use now::*;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use risingwave_common::util::column_index_mapping::ColIndexMapping;
use super::barrier_align::*;
use crate::executor::prelude::*;

pub struct MergeProjectExecutor {
pub struct RowMergeExecutor {
ctx: ActorContextRef,
pub lhs_input: Executor,
pub rhs_input: Executor,
Expand All @@ -32,7 +32,7 @@ pub struct MergeProjectExecutor {
pub schema: Schema,
}

impl MergeProjectExecutor {
impl RowMergeExecutor {
pub fn new(
ctx: ActorContextRef,
lhs_input: Executor,
Expand Down Expand Up @@ -146,7 +146,7 @@ impl MergeProjectExecutor {
}
}

impl Execute for MergeProjectExecutor {
impl Execute for RowMergeExecutor {
fn execute(self: Box<Self>) -> BoxedMessageStream {
self.execute_inner().boxed()
}
Expand Down
6 changes: 3 additions & 3 deletions src/stream/src/from_proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ mod union;
mod values;
mod watermark_filter;

mod merge_project;
mod row_merge;

mod approx_percentile;

Expand Down Expand Up @@ -81,7 +81,7 @@ use self::hop_window::*;
use self::lookup::*;
use self::lookup_union::*;
use self::merge::*;
use self::merge_project::*;
use self::row_merge::*;
use self::mview::*;
use self::no_op::*;
use self::now::NowExecutorBuilder;
Expand Down Expand Up @@ -184,6 +184,6 @@ pub async fn create_executor(
NodeBody::Changelog => ChangeLogExecutorBuilder,
NodeBody::GlobalApproxPercentile => GlobalApproxPercentileExecutorBuilder,
NodeBody::LocalApproxPercentile => LocalApproxPercentileExecutorBuilder,
NodeBody::MergeProject => MergeProjectExecutorBuilder,
NodeBody::RowMerge => RowMergeExecutorBuilder,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@
// limitations under the License.

use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_pb::stream_plan::MergeProjectNode;
use risingwave_pb::stream_plan::RowMergeNode;

use crate::executor::MergeProjectExecutor;
use crate::executor::RowMergeExecutor;
use crate::from_proto::*;

pub struct MergeProjectExecutorBuilder;
pub struct RowMergeExecutorBuilder;

impl ExecutorBuilder for MergeProjectExecutorBuilder {
type Node = MergeProjectNode;
impl ExecutorBuilder for RowMergeExecutorBuilder {
type Node = RowMergeNode;

async fn new_boxed_executor(
params: ExecutorParams,
Expand All @@ -32,7 +32,7 @@ impl ExecutorBuilder for MergeProjectExecutorBuilder {
let lhs_mapping = ColIndexMapping::from_protobuf(node.lhs_mapping.as_ref().unwrap());
let rhs_mapping = ColIndexMapping::from_protobuf(node.rhs_mapping.as_ref().unwrap());

let exec = MergeProjectExecutor::new(
let exec = RowMergeExecutor::new(
params.actor_context,
lhs_input,
rhs_input,
Expand Down

0 comments on commit 6772b51

Please sign in to comment.