Skip to content

Commit

Permalink
test case of approx_percentile alone
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Jul 21, 2024
1 parent 802a5f8 commit 8c6ea39
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 30 deletions.
16 changes: 8 additions & 8 deletions src/frontend/planner_test/tests/testdata/output/agg.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1867,14 +1867,14 @@
CREATE TABLE t (v1 int);
SELECT approx_percentile(0.5, 0.01) WITHIN GROUP (order by v1) from t;
logical_plan: |-
LogicalProject { exprs: [approx_percentile($expr1 order_by(t.v1 ASC))] }
└─LogicalAgg { aggs: [approx_percentile($expr1 order_by(t.v1 ASC))] }
└─LogicalProject { exprs: [t.v1::Float64 as $expr1, t.v1] }
LogicalProject { exprs: [approx_percentile($expr1)] }
└─LogicalAgg { aggs: [approx_percentile($expr1)] }
└─LogicalProject { exprs: [t.v1::Float64 as $expr1] }
└─LogicalScan { table: t, columns: [t.v1, t._row_id] }
stream_plan: |-
StreamMaterialize { columns: [approx_percentile], stream_key: [], pk_columns: [], pk_conflict: NoCheck }
└─StreamProject { exprs: [approx_percentile($expr1 order_by(t.v1 ASC))] }
└─StreamSimpleAgg { aggs: [approx_percentile($expr1 order_by(t.v1 ASC)), count] }
└─StreamExchange { dist: Single }
└─StreamProject { exprs: [t.v1::Float64 as $expr1, t.v1, t._row_id] }
StreamMaterialize { columns: [approx_percentile], stream_key: [approx_percentile], pk_columns: [approx_percentile], pk_conflict: NoCheck }
└─StreamExchange { dist: HashShard(approx_percentile) }
└─StreamGlobalApproxPercentile
└─StreamLocalApproxPercentile { percentile_col: $expr1, quantile: 0.5:Decimal, relative_error: 0.01:Decimal }
└─StreamProject { exprs: [t.v1::Float64 as $expr1, t._row_id] }
└─StreamTableScan { table: t, columns: [t.v1, t._row_id], stream_scan_type: ArrangementBackfill, stream_key: [t._row_id], pk: [_row_id], dist: UpstreamHashShard(t._row_id) }
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_node/generic/agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ impl<PlanRef: GenericPlanRef> Agg<PlanRef> {
self.agg_calls.iter().all(|c| {
matches!(c.agg_kind, agg_kinds::single_value_state!())
|| (matches!(c.agg_kind, agg_kinds::single_value_state_iff_in_append_only!() if stream_input_append_only))
|| (matches!(c.agg_kind, AggKind::ApproxPercentile))
})
}

Expand Down
76 changes: 63 additions & 13 deletions src/frontend/src/optimizer/plan_node/logical_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,39 @@ impl LogicalAgg {
/// Generate plan for stateless 2-phase streaming agg.
/// Should only be used iff input is distributed. Input must be converted to stream form.
fn gen_stateless_two_phase_streaming_agg_plan(&self, stream_input: PlanRef) -> Result<PlanRef> {
println!("generating stateless simple agg plan");
debug_assert!(self.group_key().is_empty());
let mut core = self.core.clone();
core.input = stream_input;

// First, handle approx percentile.
let has_approx_percentile = self
.agg_calls()
.iter()
.any(|agg_call| agg_call.agg_kind == AggKind::ApproxPercentile);
let approx_percentile_info = if has_approx_percentile {
let (
approx_percentile_agg_call,
non_approx_percentile_agg_calls,
lhs_mapping,
rhs_mapping,
) = self.extract_approx_percentile();
if non_approx_percentile_agg_calls.is_empty() {
let approx_percentile_agg: PlanRef =
self.build_approx_percentile_agg(stream_input.clone(), &approx_percentile_agg_call);
return Ok(approx_percentile_agg);
} else {
let stream_input: PlanRef = StreamShare::new_from_input(stream_input).into();
let approx_percentile_agg: PlanRef =
self.build_approx_percentile_agg(stream_input.clone(), &approx_percentile_agg_call);

core.input = stream_input;
core.agg_calls = non_approx_percentile_agg_calls;
Some((approx_percentile_agg, lhs_mapping, rhs_mapping))
}
} else {
core.input = stream_input;
None
};
let local_agg = StreamStatelessSimpleAgg::new(core);
let exchange =
RequiredDist::single().enforce_if_not_satisfies(local_agg.into(), &Order::any())?;
Expand All @@ -78,7 +108,17 @@ impl LogicalAgg {
IndexSet::empty(),
exchange,
));
Ok(global_agg.into())
if let Some((approx_percentile_agg, lhs_mapping, rhs_mapping)) = approx_percentile_info {
let keyed_merge = StreamKeyedMerge::new(
global_agg.into(),
approx_percentile_agg,
lhs_mapping,
rhs_mapping,
);
Ok(keyed_merge.into())
} else {
Ok(global_agg.into())
}
}

/// Generate plan for stateless/stateful 2-phase streaming agg.
Expand Down Expand Up @@ -198,15 +238,14 @@ impl LogicalAgg {
} else {
self.core.can_two_phase_agg()
});

// Handle 2-phase approx_percentile aggregation.
if self
.agg_calls()
.iter()
.any(|agg_call| agg_call.agg_kind == AggKind::ApproxPercentile)
{
return self.gen_approx_percentile_plan(stream_input, ctx);
}
// // Handle 2-phase approx_percentile aggregation.
// if self
// .agg_calls()
// .iter()
// .any(|agg_call| agg_call.agg_kind == AggKind::ApproxPercentile)
// {
// return self.gen_two_phase_approx_percentile_plan(stream_input, ctx);
// }

// Stateless 2-phase simple agg
// can be applied on stateless simple agg calls,
Expand Down Expand Up @@ -277,7 +316,7 @@ impl LogicalAgg {
// for the agg step, we need to count by bucket id, and not just dispatch a single record
// downstream. So we need a custom executor for this logic.
// TODO: Ban distinct agg?
fn gen_approx_percentile_plan(
fn gen_two_phase_approx_percentile_plan(
&self,
stream_input: PlanRef,
ctx: &mut ToStreamContext,
Expand All @@ -295,14 +334,16 @@ impl LogicalAgg {
self.extract_approx_percentile();
let approx_percentile_agg =
self.build_approx_percentile_agg(shared_input.clone(), &approx_percentile_agg_call);
let simple_agg_without_approx_percentile = Agg::new(
let simple_agg_without_approx_percentile: PlanRef = Agg::new(
non_approx_percentile_agg_calls,
self.group_key().clone(),
shared_input,
)
.with_grouping_sets(self.grouping_sets().clone())
.with_enable_two_phase(self.core().enable_two_phase)
.into();
let simple_agg_without_approx_percentile =
simple_agg_without_approx_percentile.to_stream(ctx)?;
let agg_merge = StreamKeyedMerge::new(
simple_agg_without_approx_percentile,
approx_percentile_agg,
Expand Down Expand Up @@ -1246,6 +1287,15 @@ impl ToStream for LogicalAgg {
},
final_agg.agg_calls().len(),
)
} else if let Some(approx_percentile_agg) = plan.as_stream_global_approx_percentile() {
if eowc {
return Err(ErrorCode::InvalidInputSyntax(
"`EMIT ON WINDOW CLOSE` cannot be used for aggregation without `GROUP BY`"
.to_string(),
)
.into());
}
(plan.clone(), 1)
} else {
panic!("the root PlanNode must be either StreamHashAgg or StreamSimpleAgg");
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use fixedbitset::FixedBitSet;
use pretty_xmlish::XmlNode;
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::types::DataType;
Expand All @@ -38,15 +39,16 @@ pub struct StreamGlobalApproxPercentile {
impl StreamGlobalApproxPercentile {
pub fn new(input: PlanRef, approx_percentile_agg_call: &PlanAggCall) -> Self {
let schema = Schema::new(vec![Field::new("approx_percentile", DataType::Float64)]);
let watermark_columns = FixedBitSet::with_capacity(1);
let base = PlanBase::new_stream(
input.ctx(),
schema,
input.stream_key().map(|k| k.to_vec()),
Some(vec![0]),
input.functional_dependency().clone(),
input.distribution().clone(),
input.append_only(),
input.emit_on_window_close(),
input.watermark_columns().clone(),
watermark_columns,
input.columns_monotonicity().clone(),
);
Self { base, input }
Expand Down Expand Up @@ -92,6 +94,5 @@ impl ExprRewritable for StreamGlobalApproxPercentile {

impl ExprVisitable for StreamGlobalApproxPercentile {
fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
unimplemented!()
}
}
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_node/stream_keyed_merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ impl StreamKeyedMerge {
lhs_mapping: ColIndexMapping,
rhs_mapping: ColIndexMapping,
) -> Self {
// FIXME: schema is wrong.
let base = PlanBase::new_stream(
lhs_input.ctx(),
lhs_input.schema().clone(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use fixedbitset::FixedBitSet;
use pretty_xmlish::{Pretty, XmlNode};
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::types::DataType;
Expand Down Expand Up @@ -41,7 +42,6 @@ pub struct StreamLocalApproxPercentile {
quantile: Literal,
relative_error: Literal,
percentile_col: InputRef,
order_type: OrderType,
}

impl StreamLocalApproxPercentile {
Expand All @@ -50,6 +50,8 @@ impl StreamLocalApproxPercentile {
Field::new("bucket_id", DataType::Int64),
Field::new("count", DataType::Int64),
]);
// FIXME(kwannoel): How does watermark work with FixedBitSet
let watermark_columns = FixedBitSet::with_capacity(2);
let base = PlanBase::new_stream(
input.ctx(),
schema,
Expand All @@ -58,7 +60,7 @@ impl StreamLocalApproxPercentile {
input.distribution().clone(),
input.append_only(),
input.emit_on_window_close(),
input.watermark_columns().clone(),
watermark_columns,
input.columns_monotonicity().clone(),
);
Self {
Expand All @@ -67,7 +69,6 @@ impl StreamLocalApproxPercentile {
quantile: approx_percentile_agg_call.direct_args[0].clone(),
relative_error: approx_percentile_agg_call.direct_args[1].clone(),
percentile_col: approx_percentile_agg_call.inputs[0].clone(),
order_type: approx_percentile_agg_call.order_by[0].order_type,
}
}
}
Expand All @@ -84,7 +85,6 @@ impl Distill for StreamLocalApproxPercentile {
));
out.push(("quantile", Pretty::debug(&self.quantile)));
out.push(("relative_error", Pretty::debug(&self.relative_error)));
out.push(("order_type", Pretty::display(&self.order_type)));
if let Some(ow) = watermark_pretty(self.base.watermark_columns(), self.schema()) {
out.push(("output_watermarks", ow));
}
Expand All @@ -104,7 +104,6 @@ impl PlanTreeNodeUnary for StreamLocalApproxPercentile {
quantile: self.quantile.clone(),
relative_error: self.relative_error.clone(),
percentile_col: self.percentile_col.clone(),
order_type: self.order_type,
}
}
}
Expand All @@ -129,6 +128,5 @@ impl ExprRewritable for StreamLocalApproxPercentile {

impl ExprVisitable for StreamLocalApproxPercentile {
fn visit_exprs(&self, v: &mut dyn ExprVisitor) {
unimplemented!()
}
}

0 comments on commit 8c6ea39

Please sign in to comment.