Skip to content

Commit

Permalink
add plan node for two-phase simple agg
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Jul 18, 2024
1 parent e720d8e commit d40caaf
Show file tree
Hide file tree
Showing 5 changed files with 325 additions and 5 deletions.
75 changes: 70 additions & 5 deletions src/frontend/src/optimizer/plan_node/logical_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ use risingwave_expr::aggregate::{agg_kinds, AggKind};
use super::generic::{self, Agg, GenericPlanRef, PlanAggCall, ProjectBuilder};
use super::utils::impl_distill_by_unit;
use super::{
BatchHashAgg, BatchSimpleAgg, ColPrunable, ExprRewritable, Logical, PlanBase, PlanRef,
PlanTreeNodeUnary, PredicatePushdown, StreamHashAgg, StreamProject, StreamSimpleAgg,
StreamStatelessSimpleAgg, ToBatch, ToStream,
BatchHashAgg, BatchSimpleAgg, ColPrunable, ExprRewritable, Logical, LogicalShare, PlanBase,
PlanRef, PlanTreeNodeUnary, PredicatePushdown, StreamHashAgg, StreamProject, StreamShare,
StreamSimpleAgg, StreamStatelessSimpleAgg, ToBatch, ToStream,
};
use crate::error::{ErrorCode, Result, RwError};
use crate::expr::{
Expand All @@ -33,6 +33,9 @@ use crate::expr::{
};
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::generic::GenericPlanNode;
use crate::optimizer::plan_node::stream_global_approx_percentile::StreamGlobalApproxPercentile;
use crate::optimizer::plan_node::stream_keyed_merge::StreamKeyedMerge;
use crate::optimizer::plan_node::stream_local_approx_percentile::StreamLocalApproxPercentile;
use crate::optimizer::plan_node::{
gen_filter_and_pushdown, BatchSortAgg, ColumnPruningContext, LogicalDedup, LogicalProject,
PredicatePushdownContext, RewriteStreamContext, ToStreamContext,
Expand Down Expand Up @@ -167,7 +170,11 @@ impl LogicalAgg {
}

/// Generates distributed stream plan.
fn gen_dist_stream_agg_plan(&self, stream_input: PlanRef) -> Result<PlanRef> {
fn gen_dist_stream_agg_plan(
&self,
stream_input: PlanRef,
ctx: &mut ToStreamContext,
) -> Result<PlanRef> {
use super::stream::prelude::*;

let input_dist = stream_input.distribution();
Expand All @@ -192,6 +199,15 @@ impl LogicalAgg {
self.core.can_two_phase_agg()
});

// Handle 2-phase approx_percentile aggregation.
if self
.agg_calls()
.iter()
.any(|agg_call| agg_call.agg_kind == AggKind::ApproxPercentile)
{
return self.gen_approx_percentile_plan(stream_input, ctx);
}

// Stateless 2-phase simple agg
// can be applied on stateless simple agg calls,
// with input distributed by [`Distribution::AnyShard`]
Expand Down Expand Up @@ -242,6 +258,55 @@ impl LogicalAgg {
}
}

/// Builds the two-phase stream plan for a simple aggregation that contains an
/// `approx_percentile` call.
///
/// ------- REWRITES -------
///
/// ```text
/// SimpleAgg { agg_calls: [count(*), approx_percentile(quantile, relative_error, v1), sum(v1)] }
///   -> Input
/// ```
///
/// -------- INTO ----------
///
/// ```text
/// KeyedMerge { agg_calls: [count(*), approx_percentile(quantile, relative_error, v1), sum(v1)] }
///   -> SimpleAgg { agg_calls: [count(*)] }
///     -> StreamShare { id: 0 }
///       -> Input
///   -> GlobalApproxPercentile { args: [bucket_id, count] }
///     -> LocalApproxPercentile { args: [relative_error, ] }
///       -> StreamShare { id: 0 }
///         -> Input
/// ```
///
/// # Errors
///
/// Returns [`ErrorCode::NotSupported`] when the aggregation has a non-empty group key, and
/// propagates any error from converting the shared input or rewriting the simple agg.
// NOTE(kwannoel): We can't split LocalApproxPercentile into Project + Agg, because
// for the agg step, we need to count by bucket id, and not just dispatch a single record
// downstream. So we need a custom executor for this logic.
fn gen_approx_percentile_plan(
    &self,
    stream_input: PlanRef,
    ctx: &mut ToStreamContext,
) -> Result<PlanRef> {
    // Only the ungrouped (simple agg) shape is handled; reject GROUP BY up front.
    if !self.group_key().is_empty() {
        return Err(ErrorCode::NotSupported(
            "ApproxPercentile aggregation with GROUP BY is not supported".into(),
            "try to remove GROUP BY".into(),
        )
        .into());
    }

    // Both branches of the merge read from the same shared input.
    // NOTE(review): `stream_input` is wrapped in a `LogicalShare` and converted with
    // `to_stream` again — confirm this is the intended way to share a stream plan.
    let shared_input = LogicalShare::new(stream_input).to_stream(ctx)?;

    // Left branch: the simple agg with the approx_percentile call rewritten out.
    // The column mappings are not consumed by the merge node yet, hence the `_` prefix.
    let (rewritten_simple_agg, _lhs_mapping, _rhs_mapping) =
        Self::rewrite_simple_agg_for_approx_percentile(shared_input.clone())?;

    // Right branch: local approx percentile feeding the global one.
    let rewritten_approx_percentile = StreamLocalApproxPercentile::new(shared_input);
    let global_approx_percentile =
        StreamGlobalApproxPercentile::new(rewritten_approx_percentile.into());

    // Merge the two branches back into a single plan.
    let agg_merge =
        StreamKeyedMerge::new(rewritten_simple_agg, global_approx_percentile.into());
    Ok(agg_merge.into())
}

/// Returns rewritten simple agg + Mapping of the lhs and rhs indices for keyed merge.
///
/// The result is a triple `(plan, lhs mapping, rhs mapping)`.
///
/// # Panics
///
/// Not implemented yet: the body is `todo!()`, so every call panics and `input` is unused.
fn rewrite_simple_agg_for_approx_percentile(
input: PlanRef,
) -> Result<(PlanRef, ColIndexMapping, ColIndexMapping)> {
todo!()
}

/// Returns a shared reference to this node's `core` field (an `Agg<PlanRef>`).
pub fn core(&self) -> &Agg<PlanRef> {
&self.core
}
Expand Down Expand Up @@ -1111,7 +1176,7 @@ impl ToStream for LogicalAgg {
return logical_dedup.to_stream(ctx);
}

let plan = self.gen_dist_stream_agg_plan(stream_input)?;
let plan = self.gen_dist_stream_agg_plan(stream_input, ctx)?;

let (plan, n_final_agg_calls) = if let Some(final_agg) = plan.as_stream_simple_agg() {
if eowc {
Expand Down
12 changes: 12 additions & 0 deletions src/frontend/src/optimizer/plan_node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -887,10 +887,13 @@ mod stream_exchange;
mod stream_expand;
mod stream_filter;
mod stream_fs_fetch;
mod stream_global_approx_percentile;
mod stream_group_topn;
mod stream_hash_agg;
mod stream_hash_join;
mod stream_hop_window;
mod stream_keyed_merge;
mod stream_local_approx_percentile;
mod stream_materialize;
mod stream_now;
mod stream_over_window;
Expand Down Expand Up @@ -990,10 +993,13 @@ pub use stream_exchange::StreamExchange;
pub use stream_expand::StreamExpand;
pub use stream_filter::StreamFilter;
pub use stream_fs_fetch::StreamFsFetch;
pub use stream_global_approx_percentile::StreamGlobalApproxPercentile;
pub use stream_group_topn::StreamGroupTopN;
pub use stream_hash_agg::StreamHashAgg;
pub use stream_hash_join::StreamHashJoin;
pub use stream_hop_window::StreamHopWindow;
pub use stream_keyed_merge::StreamKeyedMerge;
pub use stream_local_approx_percentile::StreamLocalApproxPercentile;
pub use stream_materialize::StreamMaterialize;
pub use stream_now::StreamNow;
pub use stream_over_window::StreamOverWindow;
Expand Down Expand Up @@ -1134,6 +1140,9 @@ macro_rules! for_all_plan_nodes {
, { Stream, EowcSort }
, { Stream, OverWindow }
, { Stream, FsFetch }
, { Stream, GlobalApproxPercentile }
, { Stream, LocalApproxPercentile }
, { Stream, KeyedMerge }
}
};
}
Expand Down Expand Up @@ -1256,6 +1265,9 @@ macro_rules! for_stream_plan_nodes {
, { Stream, EowcSort }
, { Stream, OverWindow }
, { Stream, FsFetch }
, { Stream, GlobalApproxPercentile }
, { Stream, LocalApproxPercentile }
, { Stream, KeyedMerge }
}
};
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use pretty_xmlish::XmlNode;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;

use crate::expr::{ExprRewriter, ExprVisitor};
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::utils::{childless_record, watermark_pretty, Distill};
use crate::optimizer::plan_node::{
ExprRewritable, PlanBase, PlanTreeNodeUnary, Stream, StreamHopWindow, StreamKeyedMerge,
StreamNode,
};
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::PlanRef;

/// Stream plan node `StreamGlobalApproxPercentile`.
///
/// At present it carries only the stream plan base.
// NOTE(review): no input plan is stored here, although `new` receives one and
// `PlanTreeNodeUnary::input` has to return one — confirm the intended field layout.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamGlobalApproxPercentile {
/// Plan base for this stream node.
pub base: PlanBase<Stream>,
}

impl StreamGlobalApproxPercentile {
/// Creates the node for the given `input` plan.
///
/// # Panics
///
/// Not implemented yet: `base` is produced by `todo!()`, so calling this panics.
/// `input` is currently not used.
pub fn new(input: PlanRef) -> Self {
Self { base: todo!() }
}
}

impl Distill for StreamGlobalApproxPercentile {
/// Renders this node as an `XmlNode`.
///
/// Not implemented yet: calling this panics via `todo!()`.
fn distill<'a>(&self) -> XmlNode<'a> {
todo!()
}
}

impl PlanTreeNodeUnary for StreamGlobalApproxPercentile {
/// Returns the single input plan of this node.
///
/// Not implemented yet: calling this panics via `todo!()`.
// NOTE(review): the struct has no input field to return — confirm where the input
// is meant to live.
fn input(&self) -> PlanRef {
todo!()
}

/// Returns a copy of this node placed on top of `input`.
///
/// Not implemented yet: calling this panics via `todo!()`; `input` is unused.
fn clone_with_input(&self, input: PlanRef) -> Self {
todo!()
}
}

impl_plan_tree_node_for_unary! {StreamGlobalApproxPercentile}

impl StreamNode for StreamGlobalApproxPercentile {
/// Produces the protobuf node body (`PbNodeBody`) for this plan node.
///
/// Not implemented yet: calling this panics via `todo!()`.
fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
todo!()
}
}

impl ExprRewritable for StreamGlobalApproxPercentile {
/// Reports whether this node holds expressions that a rewriter can change.
///
/// Not implemented yet: calling this panics via `todo!()`.
fn has_rewritable_expr(&self) -> bool {
todo!()
}

/// Returns this node with its expressions passed through `_rewriter`.
///
/// Not implemented yet: calling this panics via `todo!()`.
fn rewrite_exprs(&self, _rewriter: &mut dyn ExprRewriter) -> PlanRef {
todo!()
}
}

impl ExprVisitable for StreamGlobalApproxPercentile {
    /// Visits the expressions held by this node with the given visitor.
    ///
    /// Not implemented yet: calling this panics via `todo!()`.
    // The visitor is underscore-prefixed, like the other stubbed parameters in this
    // file (`_state`, `_rewriter`), so the placeholder does not raise an
    // unused-variable warning.
    fn visit_exprs(&self, _v: &mut dyn ExprVisitor) {
        todo!()
    }
}
88 changes: 88 additions & 0 deletions src/frontend/src/optimizer/plan_node/stream_keyed_merge.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use pretty_xmlish::XmlNode;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::HopWindowNode;

use crate::expr::{ExprRewriter, ExprVisitor};
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::utils::Distill;
use crate::optimizer::plan_node::{
ExprRewritable, PlanBase, PlanTreeNodeBinary, Stream, StreamHopWindow,
StreamLocalApproxPercentile, StreamNode,
};
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::PlanRef;

/// Binary stream plan node `StreamKeyedMerge`: holds a left and a right input plan
/// together with the stream plan base.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamKeyedMerge {
/// Left-hand input plan.
pub lhs_input: PlanRef,
/// Right-hand input plan.
pub rhs_input: PlanRef,
/// Plan base for this stream node.
pub base: PlanBase<Stream>,
}

impl StreamKeyedMerge {
/// Creates a merge node over `lhs_input` and `rhs_input`.
///
/// # Panics
///
/// Not implemented yet: `base` is produced by `todo!()`, so calling this panics
/// while building the struct.
pub fn new(lhs_input: PlanRef, rhs_input: PlanRef) -> Self {
Self {
lhs_input,
rhs_input,
base: todo!(),
}
}
}

impl Distill for StreamKeyedMerge {
/// Renders this node as an `XmlNode`.
///
/// Not implemented yet: calling this panics via `todo!()`.
fn distill<'a>(&self) -> XmlNode<'a> {
todo!()
}
}

impl PlanTreeNodeBinary for StreamKeyedMerge {
    /// Returns the left-hand input plan (a clone of `lhs_input`).
    fn left(&self) -> PlanRef {
        self.lhs_input.clone()
    }

    /// Returns the right-hand input plan (a clone of `rhs_input`).
    fn right(&self) -> PlanRef {
        self.rhs_input.clone()
    }

    /// Builds a new merge node over `left` and `right`.
    ///
    /// Delegates to [`StreamKeyedMerge::new`] so the plan base is derived in one place;
    /// it therefore shares that constructor's behaviour, including any panic it raises
    /// while the constructor is still a stub.
    fn clone_with_left_right(&self, left: PlanRef, right: PlanRef) -> Self {
        Self::new(left, right)
    }
}

impl_plan_tree_node_for_binary! { StreamKeyedMerge }

impl StreamNode for StreamKeyedMerge {
/// Produces the protobuf node body (`PbNodeBody`) for this plan node.
///
/// Not implemented yet: calling this panics via `todo!()`.
fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
todo!()
}
}

impl ExprRewritable for StreamKeyedMerge {
/// Reports whether this node holds expressions that a rewriter can change.
///
/// Not implemented yet: calling this panics via `todo!()`.
fn has_rewritable_expr(&self) -> bool {
todo!()
}

/// Returns this node with its expressions passed through `_rewriter`.
///
/// Not implemented yet: calling this panics via `todo!()`.
fn rewrite_exprs(&self, _rewriter: &mut dyn ExprRewriter) -> PlanRef {
todo!()
}
}

impl ExprVisitable for StreamKeyedMerge {
    /// Visits the expressions held by this node with the given visitor.
    ///
    /// Not implemented yet: calling this panics via `todo!()`.
    // The visitor is underscore-prefixed, like the other stubbed parameters in this
    // file (`_state`, `_rewriter`), so the placeholder does not raise an
    // unused-variable warning.
    fn visit_exprs(&self, _v: &mut dyn ExprVisitor) {
        todo!()
    }
}
Loading

0 comments on commit d40caaf

Please sign in to comment.