Skip to content

Commit

Permalink
handle global approx percentile
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Jul 21, 2024
1 parent 8d37305 commit d7ce1fd
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,14 @@
// limitations under the License.

use pretty_xmlish::XmlNode;
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::types::DataType;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;

use crate::expr::{ExprRewriter, ExprVisitor};
use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::generic::{GenericPlanRef, PhysicalPlanRef};
use crate::optimizer::plan_node::stream::StreamPlanRef;
use crate::optimizer::plan_node::utils::{childless_record, watermark_pretty, Distill};
use crate::optimizer::plan_node::{
ExprRewritable, PlanAggCall, PlanBase, PlanTreeNodeUnary, Stream, StreamHopWindow,
Expand All @@ -34,26 +37,38 @@ pub struct StreamGlobalApproxPercentile {

impl StreamGlobalApproxPercentile {
pub fn new(input: PlanRef, approx_percentile_agg_call: &PlanAggCall) -> Self {
Self {
base: todo!(),
input,
}
let schema = Schema::new(vec![Field::new("approx_percentile", DataType::Float64)]);
let base = PlanBase::new_stream(
input.ctx(),
schema,
input.stream_key().map(|k| k.to_vec()),
input.functional_dependency().clone(),
input.distribution().clone(),
input.append_only(),
input.emit_on_window_close(),
input.watermark_columns().clone(),
input.columns_monotonicity().clone(),
);
Self { base, input }
}
}

impl Distill for StreamGlobalApproxPercentile {
fn distill<'a>(&self) -> XmlNode<'a> {
todo!()
childless_record("StreamGlobalApproxPercentile", vec![])
}
}

impl PlanTreeNodeUnary for StreamGlobalApproxPercentile {
fn input(&self) -> PlanRef {
todo!()
self.input.clone()
}

fn clone_with_input(&self, input: PlanRef) -> Self {
todo!()
Self {
base: self.base.clone(),
input,
}
}
}

Expand All @@ -67,16 +82,16 @@ impl StreamNode for StreamGlobalApproxPercentile {

impl ExprRewritable for StreamGlobalApproxPercentile {
fn has_rewritable_expr(&self) -> bool {
todo!()
false
}

fn rewrite_exprs(&self, _rewriter: &mut dyn ExprRewriter) -> PlanRef {
todo!()
unimplemented!()
}
}

impl ExprVisitable for StreamGlobalApproxPercentile {
fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
todo!()
unimplemented!()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,7 @@ impl StreamLocalApproxPercentile {

impl Distill for StreamLocalApproxPercentile {
fn distill<'a>(&self) -> XmlNode<'a> {
let mut out = Vec::with_capacity(4);
let output_type = DataType::Float64;
let mut out = Vec::with_capacity(5);
out.push((
"percentile_col",
Pretty::display(&InputRefDisplay {
Expand Down

0 comments on commit d7ce1fd

Please sign in to comment.