Skip to content

Commit

Permalink
reorganize bind_function
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc committed Aug 22, 2024
1 parent c00589f commit 8ab2ce5
Show file tree
Hide file tree
Showing 4 changed files with 359 additions and 222 deletions.
69 changes: 31 additions & 38 deletions src/frontend/src/binder/expr/function/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use itertools::Itertools;
use risingwave_common::bail_not_implemented;
use risingwave_common::types::{DataType, ScalarImpl};
use risingwave_expr::aggregate::{agg_kinds, AggKind, PbAggKind};
use risingwave_sqlparser::ast::{Function, FunctionArgExpr};
use risingwave_sqlparser::ast::{self, FunctionArgExpr};

use crate::binder::Clause;
use crate::error::{ErrorCode, Result};
Expand Down Expand Up @@ -48,21 +48,22 @@ impl Binder {

pub(super) fn bind_aggregate_function(
&mut self,
f: Function,
kind: AggKind,
distinct: bool,
args: Vec<ExprImpl>,
order_by: Vec<ast::OrderByExpr>,
within_group: Option<Box<ast::OrderByExpr>>,
filter: Option<Box<ast::Expr>>,
) -> Result<ExprImpl> {
self.ensure_aggregate_allowed()?;

let distinct = f.arg_list.distinct;
let filter_expr = f.filter.clone();

let (direct_args, args, order_by) = if matches!(kind, agg_kinds::ordered_set!()) {
self.bind_ordered_set_agg(f, kind.clone())?
self.bind_ordered_set_agg(&kind, distinct, args, order_by, within_group)?
} else {
self.bind_normal_agg(f, kind.clone())?
self.bind_normal_agg(&kind, distinct, args, order_by, within_group)?
};

let filter = match filter_expr {
let filter = match filter {
Some(filter) => {
let mut clause = Some(Clause::Filter);
std::mem::swap(&mut self.context.clause, &mut clause);
Expand Down Expand Up @@ -96,50 +97,47 @@ impl Binder {

fn bind_ordered_set_agg(
&mut self,
f: Function,
kind: AggKind,
kind: &AggKind,
distinct: bool,
args: Vec<ExprImpl>,
order_by: Vec<ast::OrderByExpr>,
within_group: Option<Box<ast::OrderByExpr>>,
) -> Result<(Vec<Literal>, Vec<ExprImpl>, OrderBy)> {
// Syntax:
// aggregate_name ( [ expression [ , ... ] ] ) WITHIN GROUP ( order_by_clause ) [ FILTER
// ( WHERE filter_clause ) ]

assert!(matches!(kind, agg_kinds::ordered_set!()));

if !f.arg_list.order_by.is_empty() {
if !order_by.is_empty() {
return Err(ErrorCode::InvalidInputSyntax(format!(
"ORDER BY is not allowed for ordered-set aggregation `{}`",
"`ORDER BY` is not allowed for ordered-set aggregation `{}`",
kind
))
.into());
}
if f.arg_list.distinct {
if distinct {
return Err(ErrorCode::InvalidInputSyntax(format!(
"DISTINCT is not allowed for ordered-set aggregation `{}`",
"`DISTINCT` is not allowed for ordered-set aggregation `{}`",
kind
))
.into());
}

let within_group = *f.within_group.ok_or_else(|| {
let within_group = *within_group.ok_or_else(|| {
ErrorCode::InvalidInputSyntax(format!(
"WITHIN GROUP is expected for ordered-set aggregation `{}`",
"`WITHIN GROUP` is expected for ordered-set aggregation `{}`",
kind
))
})?;

let mut direct_args: Vec<_> = f
.arg_list
.args
.into_iter()
.map(|arg| self.bind_function_arg(arg))
.flatten_ok()
.try_collect()?;
let mut direct_args = args;
let mut args =
self.bind_function_expr_arg(FunctionArgExpr::Expr(within_group.expr.clone()))?;
let order_by = OrderBy::new(vec![self.bind_order_by_expr(within_group)?]);

// check signature and do implicit cast
match (&kind, direct_args.len(), args.as_mut_slice()) {
match (kind, direct_args.len(), args.as_mut_slice()) {
(AggKind::Builtin(PbAggKind::PercentileCont | PbAggKind::PercentileDisc), 1, [arg]) => {
let fraction = &mut direct_args[0];
decimal_to_float64(fraction, &kind)?;
Expand Down Expand Up @@ -198,8 +196,11 @@ impl Binder {

fn bind_normal_agg(
&mut self,
f: Function,
kind: AggKind,
kind: &AggKind,
distinct: bool,
args: Vec<ExprImpl>,
order_by: Vec<ast::OrderByExpr>,
within_group: Option<Box<ast::OrderByExpr>>,
) -> Result<(Vec<Literal>, Vec<ExprImpl>, OrderBy)> {
// Syntax:
// aggregate_name (expression [ , ... ] [ order_by_clause ] ) [ FILTER ( WHERE
Expand All @@ -212,30 +213,22 @@ impl Binder {

assert!(!matches!(kind, agg_kinds::ordered_set!()));

if f.within_group.is_some() {
if within_group.is_some() {
return Err(ErrorCode::InvalidInputSyntax(format!(
"WITHIN GROUP is not allowed for non-ordered-set aggregation `{}`",
"`WITHIN GROUP` is not allowed for non-ordered-set aggregation `{}`",
kind
))
.into());
}

let args: Vec<_> = f
.arg_list
.args
.iter()
.map(|arg| self.bind_function_arg(arg.clone()))
.flatten_ok()
.try_collect()?;
let order_by = OrderBy::new(
f.arg_list
.order_by
order_by
.into_iter()
.map(|e| self.bind_order_by_expr(e))
.try_collect()?,
);

if f.arg_list.distinct {
if distinct {
if matches!(
kind,
AggKind::Builtin(PbAggKind::ApproxCountDistinct)
Expand Down
Loading

0 comments on commit 8ab2ce5

Please sign in to comment.