Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(expr): allow defining functions in frontend #12287

Merged
merged 10 commits into from
Sep 18, 2023
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ risingwave_compute = { path = "./src/compute" }
risingwave_ctl = { path = "./src/ctl" }
risingwave_connector = { path = "./src/connector" }
risingwave_expr = { path = "./src/expr" }
risingwave_expr_macro = { path = "./src/expr/macro" }
risingwave_frontend = { path = "./src/frontend" }
risingwave_hummock_sdk = { path = "./src/storage/hummock_sdk" }
risingwave_hummock_test = { path = "./src/storage/hummock_test" }
Expand Down
3 changes: 3 additions & 0 deletions proto/expr.proto
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,9 @@ message ExprNode {
VNODE = 1101;
// Non-deterministic functions
PROCTIME = 2023;

// Adminitration functions
COL_DESCRIPTION = 2100;
}
Type function_type = 1;
data.DataType return_type = 3;
Expand Down
40 changes: 20 additions & 20 deletions src/expr/macro/src/gen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ impl FunctionAttr {

let pb_type = format_ident!("{}", utils::to_camel_case(&name));
let ctor_name = format_ident!("{}", self.ident_name());
let descriptor_type = quote! { crate::sig::func::FuncSign };
let descriptor_type = quote! { risingwave_expr::sig::func::FuncSign };
let build_fn = if build_fn {
let name = format_ident!("{}", user_fn.name);
quote! { #name }
Expand All @@ -79,7 +79,7 @@ impl FunctionAttr {
#[ctor::ctor]
fn #ctor_name() {
use risingwave_common::types::{DataType, DataTypeName};
unsafe { crate::sig::func::_register(#descriptor_type {
unsafe { risingwave_expr::sig::func::_register(#descriptor_type {
func: risingwave_pb::expr::expr_node::Type::#pb_type,
inputs_type: &[#(#args),*],
ret_type: #ret,
Expand Down Expand Up @@ -199,7 +199,7 @@ impl FunctionAttr {
let #prebuilt_inputs = match &#prebuilt_inputs {
Some(s) => s.as_scalar_ref_impl().try_into()?,
// the function should always return null if any const argument is null
None => return Ok(Box::new(crate::expr::LiteralExpression::new(
None => return Ok(Box::new(risingwave_expr::expr::LiteralExpression::new(
return_type,
None,
))),
Expand Down Expand Up @@ -358,8 +358,8 @@ impl FunctionAttr {
};

Ok(quote! {
|return_type: DataType, children: Vec<crate::expr::BoxedExpression>|
-> crate::Result<crate::expr::BoxedExpression>
|return_type: DataType, children: Vec<risingwave_expr::expr::BoxedExpression>|
-> risingwave_expr::Result<risingwave_expr::expr::BoxedExpression>
{
use std::sync::Arc;
use risingwave_common::array::*;
Expand All @@ -369,10 +369,10 @@ impl FunctionAttr {
use risingwave_common::util::iter_util::ZipEqFast;
use itertools::multizip;

use crate::expr::{Context, BoxedExpression};
use crate::Result;
use risingwave_expr::expr::{Context, BoxedExpression};
use risingwave_expr::Result;

crate::ensure!(children.len() == #num_args);
risingwave_expr::ensure!(children.len() == #num_args);
let prebuilt_arg = #prebuild_const;
let context = Context {
return_type,
Expand All @@ -388,7 +388,7 @@ impl FunctionAttr {
prebuilt_arg: #prebuilt_arg_type,
}
#[async_trait::async_trait]
impl crate::expr::Expression for #struct_name {
impl risingwave_expr::expr::Expression for #struct_name {
fn return_type(&self) -> DataType {
self.context.return_type.clone()
}
Expand Down Expand Up @@ -436,7 +436,7 @@ impl FunctionAttr {

let pb_type = format_ident!("{}", utils::to_camel_case(&name));
let ctor_name = format_ident!("{}", self.ident_name());
let descriptor_type = quote! { crate::sig::agg::AggFuncSig };
let descriptor_type = quote! { risingwave_expr::sig::agg::AggFuncSig };
let build_fn = if build_fn {
let name = format_ident!("{}", user_fn.name);
quote! { #name }
Expand All @@ -447,8 +447,8 @@ impl FunctionAttr {
#[ctor::ctor]
fn #ctor_name() {
use risingwave_common::types::{DataType, DataTypeName};
unsafe { crate::sig::agg::_register(#descriptor_type {
func: crate::agg::AggKind::#pb_type,
unsafe { risingwave_expr::sig::agg::_register(#descriptor_type {
func: risingwave_expr::agg::AggKind::#pb_type,
inputs_type: &[#(#args),*],
ret_type: #ret,
build: #build_fn,
Expand Down Expand Up @@ -561,16 +561,16 @@ impl FunctionAttr {
use risingwave_common::buffer::Bitmap;
use risingwave_common::estimate_size::EstimateSize;

use crate::Result;
use crate::agg::AggregateState;
use risingwave_expr::Result;
use risingwave_expr::agg::AggregateState;

#[derive(Clone)]
struct Agg {
return_type: DataType,
}

#[async_trait::async_trait]
impl crate::agg::AggregateFunction for Agg {
impl risingwave_expr::agg::AggregateFunction for Agg {
fn return_type(&self) -> DataType {
self.return_type.clone()
}
Expand Down Expand Up @@ -664,7 +664,7 @@ impl FunctionAttr {

let pb_type = format_ident!("{}", utils::to_camel_case(&name));
let ctor_name = format_ident!("{}", self.ident_name());
let descriptor_type = quote! { crate::sig::table_function::FuncSign };
let descriptor_type = quote! { risingwave_expr::sig::table_function::FuncSign };
let build_fn = if build_fn {
let name = format_ident!("{}", user_fn.name);
quote! { #name }
Expand All @@ -687,7 +687,7 @@ impl FunctionAttr {
#[ctor::ctor]
fn #ctor_name() {
use risingwave_common::types::{DataType, DataTypeName};
unsafe { crate::sig::table_function::_register(#descriptor_type {
unsafe { risingwave_expr::sig::table_function::_register(#descriptor_type {
func: risingwave_pb::expr::table_function::Type::#pb_type,
inputs_type: &[#(#args),*],
ret_type: #ret,
Expand Down Expand Up @@ -794,15 +794,15 @@ impl FunctionAttr {
use risingwave_common::util::iter_util::ZipEqFast;
use itertools::multizip;

crate::ensure!(children.len() == #num_args);
risingwave_expr::ensure!(children.len() == #num_args);
let mut iter = children.into_iter();
#(let #all_child = iter.next().unwrap();)*
#(
let #const_child = #const_child.eval_const()?;
let #const_child = match &#const_child {
Some(s) => s.as_scalar_ref_impl().try_into()?,
// the function should always return empty if any const argument is null
None => return Ok(crate::table_function::empty(return_type)),
None => return Ok(risingwave_expr::table_function::empty(return_type)),
};
)*

Expand All @@ -814,7 +814,7 @@ impl FunctionAttr {
prebuilt_arg: #prebuilt_arg_type,
}
#[async_trait::async_trait]
impl crate::table_function::TableFunction for #struct_name {
impl risingwave_expr::table_function::TableFunction for #struct_name {
fn return_type(&self) -> DataType {
self.return_type.clone()
}
Expand Down
4 changes: 3 additions & 1 deletion src/expr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
#![feature(test)]
#![feature(arc_unwrap_or_clone)]

extern crate self as risingwave_expr;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Today I learned. 🤯


pub mod agg;
mod error;
pub mod expr;
Expand All @@ -34,5 +36,5 @@ pub mod vector_op;
pub mod window_function;

pub use error::{ExprError, Result};
use risingwave_common::{bail, ensure};
pub use risingwave_common::{bail, ensure};
pub use risingwave_expr_macro::*;
2 changes: 2 additions & 0 deletions src/frontend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ auto_enums = { version = "0.8", features = ["futures03"] }
bk-tree = "0.5.0"
bytes = "1"
clap = { version = "4", features = ["derive"] }
ctor = "0.2"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about re-exporting ctor::ctor from the expr crate?

downcast-rs = "1.2"
dyn-clone = "1.0.13"
easy-ext = "1"
Expand Down Expand Up @@ -54,6 +55,7 @@ risingwave_common = { workspace = true }
risingwave_common_service = { workspace = true }
risingwave_connector = { workspace = true }
risingwave_expr = { workspace = true }
risingwave_expr_macro = { workspace = true }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can re-export these macros from the expr crate so that we don't need to depend on the macro crate. (like #[tokio::main])

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No idea how to do that, we've already made risingwave_expr::function mod public.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can remove that empty module now to avoid name conflict. 🤣

risingwave_pb = { workspace = true }
risingwave_rpc_client = { workspace = true }
risingwave_source = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/binder/expr/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1127,7 +1127,7 @@ impl Binder {
// TODO: really implement them.
// https://www.postgresql.org/docs/9.5/functions-info.html#FUNCTIONS-INFO-COMMENT-TABLE
// WARN: Hacked in [`Binder::bind_function`]!!!
("col_description", raw_literal(ExprImpl::literal_varchar("".to_string()))),
("col_description", raw_call(ExprType::ColDescription)),
("obj_description", raw_literal(ExprImpl::literal_varchar("".to_string()))),
("shobj_description", raw_literal(ExprImpl::literal_varchar("".to_string()))),
("pg_is_in_recovery", raw_literal(ExprImpl::literal_bool(false))),
Expand Down
11 changes: 11 additions & 0 deletions src/frontend/src/expr/function/col_description.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
use std::fmt::Write;
TennyZhuang marked this conversation as resolved.
Show resolved Hide resolved

use risingwave_expr::ExprError;
use risingwave_expr_macro::function;

#[function("col_description(varchar, int32) -> varchar")]
fn col_description(_name: &str, _col: i32, writer: &mut impl Write) -> Result<(), ExprError> {
writer.write_str("").unwrap();
TennyZhuang marked this conversation as resolved.
Show resolved Hide resolved

Ok(())
}
1 change: 1 addition & 0 deletions src/frontend/src/expr/function/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
mod col_description;
TennyZhuang marked this conversation as resolved.
Show resolved Hide resolved
1 change: 1 addition & 0 deletions src/frontend/src/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ pub use order_by_expr::{OrderBy, OrderByExpr};
mod expr_mutator;
mod expr_rewriter;
mod expr_visitor;
mod function;
TennyZhuang marked this conversation as resolved.
Show resolved Hide resolved
mod session_timezone;
mod type_inference;
mod utils;
Expand Down
4 changes: 3 additions & 1 deletion src/frontend/src/expr/pure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,9 @@ impl ExprVisitor<bool> for ImpureAnalyzer {
x
}
// expression output is not deterministic
expr_node::Type::Vnode | expr_node::Type::Proctime => true,
expr_node::Type::Vnode
| expr_node::Type::Proctime
| expr_node::Type::ColDescription => true,
}
}
}
Expand Down