Skip to content

Commit

Permalink
feat(expr): support implicit cast of arguments for table and aggregat…
Browse files Browse the repository at this point in the history
…e functions (#13545)

Signed-off-by: Runji Wang <[email protected]>
  • Loading branch information
wangrunji0408 authored Nov 28, 2023
1 parent ce73e80 commit c47b9ed
Show file tree
Hide file tree
Showing 22 changed files with 358 additions and 254 deletions.
54 changes: 27 additions & 27 deletions e2e_test/batch/types/jsonb.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -159,113 +159,113 @@ SELECT '{

# jsonb_array_elements
query T
select * from jsonb_array_elements('[1,true, [2,false]]'::jsonb);
select * from jsonb_array_elements('[1,true, [2,false]]');
----
1
true
[2, false]

statement error cannot extract elements
select * from jsonb_array_elements('null'::jsonb)
select * from jsonb_array_elements('null');

statement error cannot extract elements
select * from jsonb_array_elements('1'::jsonb)
select * from jsonb_array_elements('1');

statement error cannot extract elements
select * from jsonb_array_elements('"string"'::jsonb)
select * from jsonb_array_elements('"string"');

statement error cannot extract elements
select * from jsonb_array_elements('{}'::jsonb)
select * from jsonb_array_elements('{}');


# jsonb_array_elements_text
query T
select * from jsonb_array_elements_text('["foo", "bar"]'::jsonb);
select * from jsonb_array_elements_text('["foo", "bar"]');
----
foo
bar

statement error cannot extract elements
select * from jsonb_array_elements_text('null'::jsonb)
select * from jsonb_array_elements_text('null');

statement error cannot extract elements
select * from jsonb_array_elements_text('1'::jsonb)
select * from jsonb_array_elements_text('1');

statement error cannot extract elements
select * from jsonb_array_elements_text('"string"'::jsonb)
select * from jsonb_array_elements_text('"string"');

statement error cannot extract elements
select * from jsonb_array_elements_text('{}'::jsonb)
select * from jsonb_array_elements_text('{}');

# jsonb_object_keys
query T
select * from jsonb_object_keys('{"f1":"abc","f2":{"f3":"a", "f4":"b"}}'::jsonb);
select * from jsonb_object_keys('{"f1":"abc","f2":{"f3":"a", "f4":"b"}}');
----
f1
f2

statement error cannot call jsonb_object_keys
select * from jsonb_object_keys('null'::jsonb)
select * from jsonb_object_keys('null');

statement error cannot call jsonb_object_keys
select * from jsonb_object_keys('1'::jsonb)
select * from jsonb_object_keys('1');

statement error cannot call jsonb_object_keys
select * from jsonb_object_keys('"string"'::jsonb)
select * from jsonb_object_keys('"string"');

statement error cannot call jsonb_object_keys
select * from jsonb_object_keys('[]'::jsonb)
select * from jsonb_object_keys('[]');


# jsonb_each
query TT
select * from jsonb_each('{"a":"foo", "b":"bar"}'::jsonb);
select * from jsonb_each('{"a":"foo", "b":"bar"}');
----
a "foo"
b "bar"

query T
select jsonb_each('{"a":"foo", "b":"bar"}'::jsonb);
select jsonb_each('{"a":"foo", "b":"bar"}');
----
(a,"""foo""")
(b,"""bar""")

statement error cannot deconstruct
select * from jsonb_each('null'::jsonb)
select * from jsonb_each('null');

statement error cannot deconstruct
select * from jsonb_each('1'::jsonb)
select * from jsonb_each('1');

statement error cannot deconstruct
select * from jsonb_each('"string"'::jsonb)
select * from jsonb_each('"string"');

statement error cannot deconstruct
select * from jsonb_each('[]'::jsonb)
select * from jsonb_each('[]');

# jsonb_each_text
query TT
select * from jsonb_each_text('{"a":"foo", "b":"bar"}'::jsonb);
select * from jsonb_each_text('{"a":"foo", "b":"bar"}');
----
a foo
b bar

query T
select jsonb_each_text('{"a":"foo", "b":"bar"}'::jsonb);
select jsonb_each_text('{"a":"foo", "b":"bar"}');
----
(a,foo)
(b,bar)

statement error cannot deconstruct
select * from jsonb_each_text('null'::jsonb)
select * from jsonb_each_text('null');

statement error cannot deconstruct
select * from jsonb_each_text('1'::jsonb)
select * from jsonb_each_text('1');

statement error cannot deconstruct
select * from jsonb_each_text('"string"'::jsonb)
select * from jsonb_each_text('"string"');

statement error cannot deconstruct
select * from jsonb_each_text('[]'::jsonb)
select * from jsonb_each_text('[]');

query TTTTT
SELECT js,
Expand Down
2 changes: 2 additions & 0 deletions src/common/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ pub enum ErrorCode {
// Tips: Use this only if it's intended to reject the query
#[error("Not supported: {0}\nHINT: {1}")]
NotSupported(String, String),
#[error("function {0} does not exist")]
NoFunction(String),
#[error(transparent)]
IoError(#[from] IoError),
#[error("Storage error: {0}")]
Expand Down
22 changes: 12 additions & 10 deletions src/expr/core/src/aggregate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use risingwave_common::array::StreamChunk;
use risingwave_common::estimate_size::EstimateSize;
use risingwave_common::types::{DataType, Datum};

use crate::sig::FuncBuilder;
use crate::{ExprError, Result};

// aggregate definition
Expand Down Expand Up @@ -131,18 +132,16 @@ pub fn build_retractable(agg: &AggCall) -> Result<BoxedAggregateFunction> {
build(agg, false)
}

/// Build an `Aggregator` from `AggCall`.
/// Build an aggregate function.
///
/// If `prefer_append_only` is true, and both append-only and retractable implementations exist,
/// the append-only version will be used.
///
/// NOTE: This function ignores argument indices, `column_orders`, `filter` and `distinct` in
/// `AggCall`. Such operations should be done in batch or streaming executors.
pub fn build(agg: &AggCall, append_only: bool) -> Result<BoxedAggregateFunction> {
let desc = crate::sig::FUNCTION_REGISTRY
.get_aggregate(
agg.kind,
agg.args.arg_types(),
&agg.return_type,
append_only,
)
pub fn build(agg: &AggCall, prefer_append_only: bool) -> Result<BoxedAggregateFunction> {
let sig = crate::sig::FUNCTION_REGISTRY
.get(agg.kind, agg.args.arg_types(), &agg.return_type)
.ok_or_else(|| {
ExprError::UnsupportedFunction(format!(
"{}({}) -> {}",
Expand All @@ -152,5 +151,8 @@ pub fn build(agg: &AggCall, append_only: bool) -> Result<BoxedAggregateFunction>
))
})?;

desc.build_aggregate(agg)
if let FuncBuilder::Aggregate{ append_only: Some(f), .. } = sig.build && prefer_append_only {
return f(agg);
}
sig.build_aggregate(agg)
}
115 changes: 79 additions & 36 deletions src/expr/core/src/sig/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,42 @@ pub struct FunctionRegistry(HashMap<FuncName, Vec<FuncSign>>);
impl FunctionRegistry {
/// Inserts a function signature.
pub fn insert(&mut self, sig: FuncSign) {
self.0.entry(sig.name).or_default().push(sig)
let list = self.0.entry(sig.name).or_default();
if sig.is_aggregate() {
// merge retractable and append-only aggregate
if let Some(existing) = list
.iter_mut()
.find(|d| d.inputs_type == sig.inputs_type && d.ret_type == sig.ret_type)
{
let (
FuncBuilder::Aggregate {
retractable,
append_only,
retractable_state_type,
append_only_state_type,
},
FuncBuilder::Aggregate {
retractable: r1,
append_only: a1,
retractable_state_type: rs1,
append_only_state_type: as1,
},
) = (&mut existing.build, sig.build)
else {
panic!("expected aggregate function")
};
if let Some(f) = r1 {
*retractable = Some(f);
*retractable_state_type = rs1;
}
if let Some(f) = a1 {
*append_only = Some(f);
*append_only_state_type = as1;
}
return;
}
}
list.push(sig);
}

/// Returns a function signature with the same type, argument types and return type.
Expand All @@ -76,27 +111,8 @@ impl FunctionRegistry {
}
}

/// Returns a function signature with the given type, argument types, return type.
///
/// The `prefer_append_only` flag only works when both append-only and retractable version exist.
/// Otherwise, return the signature of the only version.
pub fn get_aggregate(
&self,
ty: AggregateFunctionType,
args: &[DataType],
ret: &DataType,
prefer_append_only: bool,
) -> Option<&FuncSign> {
let v = self.0.get(&ty.into())?;
let mut iter = v.iter().filter(|d| d.match_args_ret(args, ret));
if iter.clone().count() == 2 {
iter.find(|d| d.append_only == prefer_append_only)
} else {
iter.next()
}
}

/// Returns the return type for the given function and arguments.
/// Deprecated functions are excluded.
pub fn get_return_type(
&self,
name: impl Into<FuncName>,
Expand All @@ -109,7 +125,7 @@ impl FunctionRegistry {
.ok_or_else(|| ExprError::UnsupportedFunction(name.to_string()))?;
let sig = v
.iter()
.find(|d| d.match_args(args))
.find(|d| d.match_args(args) && !d.deprecated)
.ok_or_else(|| ExprError::UnsupportedFunction(name.to_string()))?;
(sig.type_infer)(args)
}
Expand Down Expand Up @@ -154,13 +170,6 @@ pub struct FuncSign {
/// Whether the function is deprecated and should not be used in the frontend.
/// For backward compatibility, it is still available in the backend.
pub deprecated: bool,

/// The state type of the aggregate function.
/// `None` means equal to the return type.
pub state_type: Option<DataType>,

/// Whether the aggregate function is append-only.
pub append_only: bool,
}

impl fmt::Debug for FuncSign {
Expand All @@ -182,9 +191,6 @@ impl fmt::Debug for FuncSign {
if self.name.is_table() { "setof " } else { "" },
self.ret_type,
)?;
if self.append_only {
write!(f, " [append-only]")?;
}
if self.deprecated {
write!(f, " [deprecated]")?;
}
Expand Down Expand Up @@ -235,6 +241,28 @@ impl FuncSign {
matches!(self.name, FuncName::Aggregate(_))
}

/// Returns true if the aggregate function is append-only.
pub const fn is_append_only(&self) -> bool {
matches!(
self.build,
FuncBuilder::Aggregate {
retractable: None,
..
}
)
}

/// Returns true if the aggregate function has a retractable version.
pub const fn is_retractable(&self) -> bool {
matches!(
self.build,
FuncBuilder::Aggregate {
retractable: Some(_),
..
}
)
}

/// Builds the scalar function.
pub fn build_scalar(
&self,
Expand All @@ -260,10 +288,15 @@ impl FuncSign {
}
}

/// Builds the aggregate function.
/// Builds the aggregate function. If both retractable and append-only versions exist, the
/// retractable version will be built.
pub fn build_aggregate(&self, agg: &AggCall) -> Result<BoxedAggregateFunction> {
match self.build {
FuncBuilder::Aggregate(f) => f(agg),
FuncBuilder::Aggregate {
retractable,
append_only,
..
} => retractable.or(append_only).unwrap()(agg),
_ => panic!("Expected an aggregate function"),
}
}
Expand Down Expand Up @@ -385,7 +418,7 @@ impl SigDataType {
}
}

#[derive(Clone, Copy)]
#[derive(Clone)]
pub enum FuncBuilder {
Scalar(fn(return_type: DataType, children: Vec<BoxedExpression>) -> Result<BoxedExpression>),
Table(
Expand All @@ -395,7 +428,17 @@ pub enum FuncBuilder {
children: Vec<BoxedExpression>,
) -> Result<BoxedTableFunction>,
),
Aggregate(fn(agg: &AggCall) -> Result<BoxedAggregateFunction>),
// An aggregate function may contain both or either one of retractable and append-only versions.
Aggregate {
retractable: Option<fn(agg: &AggCall) -> Result<BoxedAggregateFunction>>,
append_only: Option<fn(agg: &AggCall) -> Result<BoxedAggregateFunction>>,
/// The state type of the retractable aggregate function.
/// `None` means equal to the return type.
retractable_state_type: Option<DataType>,
/// The state type of the append-only aggregate function.
/// `None` means equal to the return type.
append_only_state_type: Option<DataType>,
},
}

/// Register a function into global registry.
Expand Down
9 changes: 2 additions & 7 deletions src/expr/core/src/window_function/state/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,11 @@ impl AggregateState {
direct_args: vec![],
};
let agg_func_sig = FUNCTION_REGISTRY
.get_aggregate(
agg_kind,
&arg_data_types,
&call.return_type,
false, // means prefer retractable version
)
.get(agg_kind, &arg_data_types, &call.return_type)
.expect("the agg func must exist");
let agg_func = agg_func_sig.build_aggregate(&agg_call)?;
let (agg_impl, enable_delta) =
if !agg_func_sig.append_only && call.frame.exclusion.is_no_others() {
if agg_func_sig.is_retractable() && call.frame.exclusion.is_no_others() {
let init_state = agg_func.create_state();
(AggImpl::Incremental(init_state), true)
} else {
Expand Down
Loading

0 comments on commit c47b9ed

Please sign in to comment.