Skip to content

Commit

Permalink
LogicalOverAgg::to_stream
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc committed May 6, 2023
1 parent 8454ed5 commit 3054713
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 3 deletions.
4 changes: 4 additions & 0 deletions src/frontend/src/optimizer/plan_node/generic/over_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,10 @@ impl<PlanRef: GenericPlanRef> OverWindow<PlanRef> {
}
}

pub fn input_len(&self) -> usize {
self.input.schema().len()
}

pub fn output_len(&self) -> usize {
self.input.schema().len() + self.window_functions.len()
}
Expand Down
56 changes: 53 additions & 3 deletions src/frontend/src/optimizer/plan_node/logical_over_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,19 @@ use fixedbitset::FixedBitSet;
use itertools::Itertools;
use risingwave_common::error::{ErrorCode, Result};
use risingwave_common::types::DataType;
use risingwave_common::util::sort_util::ColumnOrder;
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_expr::function::window::{Frame, FrameBound, WindowFuncKind};

use super::generic::{OverWindow, PlanWindowFunction};
use super::{
gen_filter_and_pushdown, ColPrunable, ExprRewritable, LogicalProject, PlanBase, PlanRef,
PlanTreeNodeUnary, PredicatePushdown, ToBatch, ToStream,
PlanTreeNodeUnary, PredicatePushdown, StreamEowcOverWindow, StreamSort, ToBatch, ToStream,
};
use crate::expr::{Expr, ExprImpl, InputRef, WindowFunction};
use crate::optimizer::plan_node::{
ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, ToStreamContext,
};
use crate::optimizer::property::{Order, RequiredDist};
use crate::utils::{ColIndexMapping, Condition};

/// `LogicalOverAgg` performs `OVER` window aggregates ([`WindowFunction`]) to its input.
Expand Down Expand Up @@ -355,7 +356,56 @@ impl ToBatch for LogicalOverAgg {
}

impl ToStream for LogicalOverAgg {
fn to_stream(&self, _ctx: &mut ToStreamContext) -> Result<PlanRef> {
fn to_stream(&self, ctx: &mut ToStreamContext) -> Result<PlanRef> {
let stream_input = self.core.input.to_stream(ctx)?;
stream_input.watermark_columns();

if ctx.emit_on_window_close() {
if !self.core.funcs_have_same_partition_and_order() {
return Err(ErrorCode::InvalidInputSyntax(
"All window functions must have the same PARTITION BY and ORDER BY".to_string(),
)
.into());
}

let order_by = &self.window_functions()[0].order_by;
if order_by.len() != 1
|| !stream_input
.watermark_columns()
.contains(order_by[0].column_index)
|| order_by[0].order_type != OrderType::ascending()
{
return Err(ErrorCode::InvalidInputSyntax(
"Only support window functions order by single watermark column in ascending order"
.to_string(),
)
.into());
}
let order_key_index = order_by[0].column_index;

let partition_key_indices = self.window_functions()[0]
.partition_by
.iter()
.map(|e| e.index())
.collect_vec();
if partition_key_indices.is_empty() {
return Err(ErrorCode::NotImplemented(
"Window function with empty PARTITION BY is not supported yet".to_string(),
None.into(),
)
.into());
}

let sort_input =
RequiredDist::shard_by_key(stream_input.schema().len(), &partition_key_indices)
.enforce_if_not_satisfies(stream_input, &Order::any())?;
let sort = StreamSort::new(sort_input, order_key_index);

let mut logical = self.core.clone();
logical.input = sort.into();
return Ok(StreamEowcOverWindow::new(logical).into());
}

Err(ErrorCode::NotImplemented("OverAgg to stream".to_string(), 9124.into()).into())
}

Expand Down

0 comments on commit 3054713

Please sign in to comment.