From 71d81704ec1c3ff7fb9375fd111a0aab3b3c5f6b Mon Sep 17 00:00:00 2001 From: TennyZhuang Date: Mon, 18 Sep 2023 15:58:26 +0800 Subject: [PATCH] refactor(expr): allow defining functions in frontend (#12287) Signed-off-by: TennyZhuang Co-authored-by: zwang28 <70626450+zwang28@users.noreply.github.com> Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> --- proto/expr.proto | 3 ++ src/batch/src/executor/sort_over_window.rs | 5 +- src/expr/macro/src/gen.rs | 48 +++++++++---------- src/expr/src/lib.rs | 6 ++- .../src/window_function/state/aggregate.rs | 2 +- src/expr/src/window_function/state/buffer.rs | 4 +- src/expr/src/window_function/state/mod.rs | 3 +- .../src/window_function/state/row_number.rs | 4 +- src/frontend/src/binder/expr/function.rs | 4 +- .../expr/function_impl/col_description.rs} | 12 ++++- .../src/expr/function_impl}/mod.rs | 2 +- src/frontend/src/expr/mod.rs | 1 + src/frontend/src/expr/pure.rs | 3 +- src/frontend/src/expr/window_function.rs | 2 +- .../plan_node/generic/over_window.rs | 2 +- .../plan_node/logical_over_window.rs | 2 +- .../rule/over_window_to_agg_and_join_rule.rs | 2 +- .../rule/over_window_to_topn_rule.rs | 2 +- src/stream/src/from_proto/eowc_over_window.rs | 2 +- src/stream/src/from_proto/over_window.rs | 2 +- .../integration_tests/eowc_over_window.rs | 2 +- .../tests/integration_tests/over_window.rs | 2 +- 22 files changed, 65 insertions(+), 50 deletions(-) rename src/{expr/src/function/mod.rs => frontend/src/expr/function_impl/col_description.rs} (62%) rename src/{expr/src/function/window => frontend/src/expr/function_impl}/mod.rs (94%) diff --git a/proto/expr.proto b/proto/expr.proto index a0e655aaee0f7..7d24241850918 100644 --- a/proto/expr.proto +++ b/proto/expr.proto @@ -224,6 +224,9 @@ message ExprNode { PG_SLEEP = 2024; PG_SLEEP_FOR = 2025; PG_SLEEP_UNTIL = 2026; + + // Adminitration functions + COL_DESCRIPTION = 2100; } Type function_type = 1; data.DataType return_type = 3; diff --git a/src/batch/src/executor/sort_over_window.rs b/src/batch/src/executor/sort_over_window.rs index f12ebc2452384..c8b6c7ef9388c 100644 --- a/src/batch/src/executor/sort_over_window.rs +++ b/src/batch/src/executor/sort_over_window.rs @@ -19,8 +19,9 @@ use risingwave_common::error::{Result, RwError}; use risingwave_common::row::{OwnedRow, Row, RowExt}; use risingwave_common::util::chunk_coalesce::DataChunkBuilder; use risingwave_common::util::iter_util::ZipEqFast; -use risingwave_expr::function::window::WindowFuncCall; -use risingwave_expr::window_function::{create_window_state, StateKey, WindowStates}; +use risingwave_expr::window_function::{ + create_window_state, StateKey, WindowFuncCall, WindowStates, +}; use risingwave_pb::batch_plan::plan_node::NodeBody; use super::{BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder}; diff --git a/src/expr/macro/src/gen.rs b/src/expr/macro/src/gen.rs index 56464f2bf2809..77026581492c2 100644 --- a/src/expr/macro/src/gen.rs +++ b/src/expr/macro/src/gen.rs @@ -71,7 +71,7 @@ impl FunctionAttr { let pb_type = format_ident!("{}", utils::to_camel_case(&name)); let ctor_name = format_ident!("{}", self.ident_name()); - let descriptor_type = quote! { crate::sig::func::FuncSign }; + let descriptor_type = quote! { risingwave_expr::sig::func::FuncSign }; let build_fn = if build_fn { let name = format_ident!("{}", user_fn.name); quote! { #name } @@ -80,10 +80,10 @@ impl FunctionAttr { }; let deprecated = self.deprecated; Ok(quote! { - #[ctor::ctor] + #[risingwave_expr::ctor] fn #ctor_name() { use risingwave_common::types::{DataType, DataTypeName}; - unsafe { crate::sig::func::_register(#descriptor_type { + unsafe { risingwave_expr::sig::func::_register(#descriptor_type { func: risingwave_pb::expr::expr_node::Type::#pb_type, inputs_type: &[#(#args),*], variadic: #variadic, @@ -203,7 +203,7 @@ impl FunctionAttr { let #prebuilt_inputs = match &#prebuilt_inputs { Some(s) => s.as_scalar_ref_impl().try_into()?, // the function should always return null if any const argument is null - None => return Ok(Box::new(crate::expr::LiteralExpression::new( + None => return Ok(Box::new(risingwave_expr::expr::LiteralExpression::new( return_type, None, ))), @@ -217,8 +217,8 @@ impl FunctionAttr { // ensure the number of children matches the number of arguments let check_children = match variadic { - true => quote! { crate::ensure!(children.len() >= #num_args); }, - false => quote! { crate::ensure!(children.len() == #num_args); }, + true => quote! { risingwave_expr::ensure!(children.len() >= #num_args); }, + false => quote! { risingwave_expr::ensure!(children.len() == #num_args); }, }; // evaluate variadic arguments in `eval` @@ -407,8 +407,8 @@ impl FunctionAttr { }; Ok(quote! { - |return_type: DataType, children: Vec| - -> crate::Result + |return_type: DataType, children: Vec| + -> risingwave_expr::Result { use std::sync::Arc; use risingwave_common::array::*; @@ -418,8 +418,8 @@ impl FunctionAttr { use risingwave_common::util::iter_util::ZipEqFast; use itertools::multizip; - use crate::expr::{Context, BoxedExpression}; - use crate::Result; + use risingwave_expr::expr::{Context, BoxedExpression}; + use risingwave_expr::Result; #check_children let prebuilt_arg = #prebuild_const; @@ -435,7 +435,7 @@ impl FunctionAttr { prebuilt_arg: #prebuilt_arg_type, } #[async_trait::async_trait] - impl crate::expr::Expression for #struct_name { + impl risingwave_expr::expr::Expression for #struct_name { fn return_type(&self) -> DataType { self.context.return_type.clone() } @@ -497,7 +497,7 @@ impl FunctionAttr { false => format_ident!("{}", self.ident_name()), true => format_ident!("{}_append_only", self.ident_name()), }; - let descriptor_type = quote! { crate::sig::agg::AggFuncSig }; + let descriptor_type = quote! { risingwave_expr::sig::agg::AggFuncSig }; let build_fn = if build_fn { let name = format_ident!("{}", user_fn.as_fn().name); quote! { #name } @@ -505,11 +505,11 @@ impl FunctionAttr { self.generate_agg_build_fn(user_fn)? }; Ok(quote! { - #[ctor::ctor] + #[risingwave_expr::ctor] fn #ctor_name() { use risingwave_common::types::{DataType, DataTypeName}; - unsafe { crate::sig::agg::_register(#descriptor_type { - func: crate::agg::AggKind::#pb_type, + unsafe { risingwave_expr::sig::agg::_register(#descriptor_type { + func: risingwave_expr::agg::AggKind::#pb_type, inputs_type: &[#(#args),*], state_type: #state_type, ret_type: #ret, @@ -678,8 +678,8 @@ impl FunctionAttr { use risingwave_common::buffer::Bitmap; use risingwave_common::estimate_size::EstimateSize; - use crate::Result; - use crate::agg::AggregateState; + use risingwave_expr::Result; + use risingwave_expr::agg::AggregateState; #[derive(Clone)] struct Agg { @@ -688,7 +688,7 @@ impl FunctionAttr { } #[async_trait::async_trait] - impl crate::agg::AggregateFunction for Agg { + impl risingwave_expr::agg::AggregateFunction for Agg { fn return_type(&self) -> DataType { self.return_type.clone() } @@ -780,7 +780,7 @@ impl FunctionAttr { let pb_type = format_ident!("{}", utils::to_camel_case(&name)); let ctor_name = format_ident!("{}", self.ident_name()); - let descriptor_type = quote! { crate::sig::table_function::FuncSign }; + let descriptor_type = quote! { risingwave_expr::sig::table_function::FuncSign }; let build_fn = if build_fn { let name = format_ident!("{}", user_fn.name); quote! { #name } @@ -800,10 +800,10 @@ impl FunctionAttr { quote! { |_| Ok(#ty) } }; Ok(quote! { - #[ctor::ctor] + #[risingwave_expr::ctor] fn #ctor_name() { use risingwave_common::types::{DataType, DataTypeName}; - unsafe { crate::sig::table_function::_register(#descriptor_type { + unsafe { risingwave_expr::sig::table_function::_register(#descriptor_type { func: risingwave_pb::expr::table_function::Type::#pb_type, inputs_type: &[#(#args),*], ret_type: #ret, @@ -910,7 +910,7 @@ impl FunctionAttr { use risingwave_common::util::iter_util::ZipEqFast; use itertools::multizip; - crate::ensure!(children.len() == #num_args); + risingwave_expr::ensure!(children.len() == #num_args); let mut iter = children.into_iter(); #(let #all_child = iter.next().unwrap();)* #( @@ -918,7 +918,7 @@ impl FunctionAttr { let #const_child = match &#const_child { Some(s) => s.as_scalar_ref_impl().try_into()?, // the function should always return empty if any const argument is null - None => return Ok(crate::table_function::empty(return_type)), + None => return Ok(risingwave_expr::table_function::empty(return_type)), }; )* @@ -930,7 +930,7 @@ impl FunctionAttr { prebuilt_arg: #prebuilt_arg_type, } #[async_trait::async_trait] - impl crate::table_function::TableFunction for #struct_name { + impl risingwave_expr::table_function::TableFunction for #struct_name { fn return_type(&self) -> DataType { self.return_type.clone() } diff --git a/src/expr/src/lib.rs b/src/expr/src/lib.rs index ee4cea38e4bb5..c8f2e432f79af 100644 --- a/src/expr/src/lib.rs +++ b/src/expr/src/lib.rs @@ -24,15 +24,17 @@ #![feature(test)] #![feature(arc_unwrap_or_clone)] +extern crate self as risingwave_expr; + pub mod agg; mod error; pub mod expr; -pub mod function; pub mod sig; pub mod table_function; pub mod vector_op; pub mod window_function; +pub use ctor::ctor; pub use error::{ExprError, Result}; -use risingwave_common::{bail, ensure}; +pub use risingwave_common::{bail, ensure}; pub use risingwave_expr_macro::*; diff --git a/src/expr/src/window_function/state/aggregate.rs b/src/expr/src/window_function/state/aggregate.rs index b67d42042bc91..749baf7784343 100644 --- a/src/expr/src/window_function/state/aggregate.rs +++ b/src/expr/src/window_function/state/aggregate.rs @@ -25,7 +25,7 @@ use smallvec::SmallVec; use super::buffer::WindowBuffer; use super::{StateEvictHint, StateKey, StatePos, WindowState}; use crate::agg::{build_append_only, AggArgs, AggCall, BoxedAggregateFunction}; -use crate::function::window::{WindowFuncCall, WindowFuncKind}; +use crate::window_function::{WindowFuncCall, WindowFuncKind}; use crate::Result; pub struct AggregateState { diff --git a/src/expr/src/window_function/state/buffer.rs b/src/expr/src/window_function/state/buffer.rs index 97f68b18375b2..a375c7bfec225 100644 --- a/src/expr/src/window_function/state/buffer.rs +++ b/src/expr/src/window_function/state/buffer.rs @@ -17,7 +17,7 @@ use std::ops::Range; use either::Either; -use crate::function::window::{Frame, FrameBounds, FrameExclusion}; +use crate::window_function::{Frame, FrameBounds, FrameExclusion}; struct Entry { key: K, @@ -238,7 +238,7 @@ mod tests { use itertools::Itertools; use super::*; - use crate::function::window::{Frame, FrameBound}; + use crate::window_function::{Frame, FrameBound}; #[test] fn test_rows_frame_unbounded_preceding_to_current_row() { diff --git a/src/expr/src/window_function/state/mod.rs b/src/expr/src/window_function/state/mod.rs index fe6e47b016d40..977a04b2a7a70 100644 --- a/src/expr/src/window_function/state/mod.rs +++ b/src/expr/src/window_function/state/mod.rs @@ -20,8 +20,7 @@ use risingwave_common::types::{Datum, DefaultOrdered}; use risingwave_common::util::memcmp_encoding::MemcmpEncoded; use smallvec::SmallVec; -use super::WindowFuncCall; -use crate::function::window::WindowFuncKind; +use super::{WindowFuncCall, WindowFuncKind}; use crate::sig::FuncSigDebug; use crate::{ExprError, Result}; diff --git a/src/expr/src/window_function/state/row_number.rs b/src/expr/src/window_function/state/row_number.rs index 01b713cbc1196..7abf30fc5f5c7 100644 --- a/src/expr/src/window_function/state/row_number.rs +++ b/src/expr/src/window_function/state/row_number.rs @@ -18,7 +18,7 @@ use risingwave_common::types::Datum; use smallvec::SmallVec; use super::{StateEvictHint, StateKey, StatePos, WindowState}; -use crate::function::window::WindowFuncCall; +use crate::window_function::WindowFuncCall; use crate::Result; #[derive(EstimateSize)] @@ -83,7 +83,7 @@ mod tests { use super::*; use crate::agg::AggArgs; - use crate::function::window::{Frame, FrameBound, WindowFuncKind}; + use crate::window_function::{Frame, FrameBound, WindowFuncKind}; fn create_state_key(pk: i64) -> StateKey { StateKey { diff --git a/src/frontend/src/binder/expr/function.rs b/src/frontend/src/binder/expr/function.rs index 44c79a1620c79..1146050e8dde4 100644 --- a/src/frontend/src/binder/expr/function.rs +++ b/src/frontend/src/binder/expr/function.rs @@ -26,7 +26,7 @@ use risingwave_common::session_config::USER_NAME_WILD_CARD; use risingwave_common::types::{DataType, ScalarImpl, Timestamptz}; use risingwave_common::{GIT_SHA, RW_VERSION}; use risingwave_expr::agg::{agg_kinds, AggKind}; -use risingwave_expr::function::window::{ +use risingwave_expr::window_function::{ Frame, FrameBound, FrameBounds, FrameExclusion, WindowFuncKind, }; use risingwave_sqlparser::ast::{ @@ -1127,7 +1127,7 @@ impl Binder { // TODO: really implement them. // https://www.postgresql.org/docs/9.5/functions-info.html#FUNCTIONS-INFO-COMMENT-TABLE // WARN: Hacked in [`Binder::bind_function`]!!! - ("col_description", raw_literal(ExprImpl::literal_varchar("".to_string()))), + ("col_description", raw_call(ExprType::ColDescription)), ("obj_description", raw_literal(ExprImpl::literal_varchar("".to_string()))), ("shobj_description", raw_literal(ExprImpl::literal_varchar("".to_string()))), ("pg_is_in_recovery", raw_literal(ExprImpl::literal_bool(false))), diff --git a/src/expr/src/function/mod.rs b/src/frontend/src/expr/function_impl/col_description.rs similarity index 62% rename from src/expr/src/function/mod.rs rename to src/frontend/src/expr/function_impl/col_description.rs index ed628e99398ca..8b6458eba5d30 100644 --- a/src/expr/src/function/mod.rs +++ b/src/frontend/src/expr/function_impl/col_description.rs @@ -12,6 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub mod window; +use std::fmt::Write; -// TODO(rc): this module is to be removed +use risingwave_expr::{function, ExprError}; + +#[function("col_description(varchar, int32) -> varchar")] +fn col_description(_name: &str, _col: i32, writer: &mut impl Write) -> Result<(), ExprError> { + // TODO: Currently we don't support `COMMENT` statement, so we just return empty string. + writer.write_str("").unwrap(); + + Ok(()) +} diff --git a/src/expr/src/function/window/mod.rs b/src/frontend/src/expr/function_impl/mod.rs similarity index 94% rename from src/expr/src/function/window/mod.rs rename to src/frontend/src/expr/function_impl/mod.rs index add145718c948..33d402b4bb6af 100644 --- a/src/expr/src/function/window/mod.rs +++ b/src/frontend/src/expr/function_impl/mod.rs @@ -12,4 +12,4 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub use crate::window_function::*; +mod col_description; diff --git a/src/frontend/src/expr/mod.rs b/src/frontend/src/expr/mod.rs index d999fcbe4c1e8..11fa86d682b7b 100644 --- a/src/frontend/src/expr/mod.rs +++ b/src/frontend/src/expr/mod.rs @@ -46,6 +46,7 @@ pub use order_by_expr::{OrderBy, OrderByExpr}; mod expr_mutator; mod expr_rewriter; mod expr_visitor; +mod function_impl; mod session_timezone; mod type_inference; mod utils; diff --git a/src/frontend/src/expr/pure.rs b/src/frontend/src/expr/pure.rs index cea3b9eb46bc2..91e92ee651205 100644 --- a/src/frontend/src/expr/pure.rs +++ b/src/frontend/src/expr/pure.rs @@ -212,7 +212,8 @@ impl ExprVisitor for ImpureAnalyzer { | expr_node::Type::Proctime | expr_node::Type::PgSleep | expr_node::Type::PgSleepFor - | expr_node::Type::PgSleepUntil => true, + | expr_node::Type::PgSleepUntil + | expr_node::Type::ColDescription => true, } } } diff --git a/src/frontend/src/expr/window_function.rs b/src/frontend/src/expr/window_function.rs index 62f961515cdd0..371a00dc6b62a 100644 --- a/src/frontend/src/expr/window_function.rs +++ b/src/frontend/src/expr/window_function.rs @@ -15,7 +15,7 @@ use itertools::Itertools; use risingwave_common::error::{ErrorCode, RwError}; use risingwave_common::types::DataType; -use risingwave_expr::function::window::{Frame, WindowFuncKind}; +use risingwave_expr::window_function::{Frame, WindowFuncKind}; use super::{AggCall, Expr, ExprImpl, OrderBy, RwResult}; diff --git a/src/frontend/src/optimizer/plan_node/generic/over_window.rs b/src/frontend/src/optimizer/plan_node/generic/over_window.rs index 5f7b0705fba26..c148711698a24 100644 --- a/src/frontend/src/optimizer/plan_node/generic/over_window.rs +++ b/src/frontend/src/optimizer/plan_node/generic/over_window.rs @@ -18,7 +18,7 @@ use risingwave_common::catalog::{Field, Schema}; use risingwave_common::types::DataType; use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_common::util::sort_util::{ColumnOrder, ColumnOrderDisplay}; -use risingwave_expr::function::window::{Frame, WindowFuncKind}; +use risingwave_expr::window_function::{Frame, WindowFuncKind}; use risingwave_pb::expr::PbWindowFunction; use super::{DistillUnit, GenericPlanNode, GenericPlanRef}; diff --git a/src/frontend/src/optimizer/plan_node/logical_over_window.rs b/src/frontend/src/optimizer/plan_node/logical_over_window.rs index b2057f28e05fc..ed74f379cf4ba 100644 --- a/src/frontend/src/optimizer/plan_node/logical_over_window.rs +++ b/src/frontend/src/optimizer/plan_node/logical_over_window.rs @@ -18,7 +18,7 @@ use risingwave_common::error::{ErrorCode, Result, RwError}; use risingwave_common::types::{DataType, Datum, ScalarImpl}; use risingwave_common::util::sort_util::{ColumnOrder, OrderType}; use risingwave_expr::agg::AggKind; -use risingwave_expr::function::window::{Frame, FrameBound, WindowFuncKind}; +use risingwave_expr::window_function::{Frame, FrameBound, WindowFuncKind}; use super::generic::{GenericPlanRef, OverWindow, PlanWindowFunction, ProjectBuilder}; use super::utils::impl_distill_by_unit; diff --git a/src/frontend/src/optimizer/rule/over_window_to_agg_and_join_rule.rs b/src/frontend/src/optimizer/rule/over_window_to_agg_and_join_rule.rs index b9587650f8726..dbf3e9809675c 100644 --- a/src/frontend/src/optimizer/rule/over_window_to_agg_and_join_rule.rs +++ b/src/frontend/src/optimizer/rule/over_window_to_agg_and_join_rule.rs @@ -13,7 +13,7 @@ // limitations under the License. use itertools::Itertools; -use risingwave_expr::function::window::WindowFuncKind; +use risingwave_expr::window_function::WindowFuncKind; use risingwave_pb::expr::expr_node::Type; use risingwave_pb::plan_common::JoinType; diff --git a/src/frontend/src/optimizer/rule/over_window_to_topn_rule.rs b/src/frontend/src/optimizer/rule/over_window_to_topn_rule.rs index 297522a41c8c9..dfb6963c7fb4f 100644 --- a/src/frontend/src/optimizer/rule/over_window_to_topn_rule.rs +++ b/src/frontend/src/optimizer/rule/over_window_to_topn_rule.rs @@ -14,7 +14,7 @@ use fixedbitset::FixedBitSet; use risingwave_common::types::DataType; -use risingwave_expr::function::window::WindowFuncKind; +use risingwave_expr::window_function::WindowFuncKind; use super::Rule; use crate::expr::{collect_input_refs, ExprImpl, ExprType}; diff --git a/src/stream/src/from_proto/eowc_over_window.rs b/src/stream/src/from_proto/eowc_over_window.rs index 0cd0060a40b68..bcee0736ae30f 100644 --- a/src/stream/src/from_proto/eowc_over_window.rs +++ b/src/stream/src/from_proto/eowc_over_window.rs @@ -14,7 +14,7 @@ use std::sync::Arc; -use risingwave_expr::function::window::WindowFuncCall; +use risingwave_expr::window_function::WindowFuncCall; use risingwave_pb::stream_plan::PbEowcOverWindowNode; use risingwave_storage::StateStore; diff --git a/src/stream/src/from_proto/over_window.rs b/src/stream/src/from_proto/over_window.rs index 7d139ca3f74db..e18e753caf126 100644 --- a/src/stream/src/from_proto/over_window.rs +++ b/src/stream/src/from_proto/over_window.rs @@ -16,7 +16,7 @@ use std::sync::Arc; use risingwave_common::session_config::OverWindowCachePolicy; use risingwave_common::util::sort_util::ColumnOrder; -use risingwave_expr::function::window::WindowFuncCall; +use risingwave_expr::window_function::WindowFuncCall; use risingwave_pb::stream_plan::PbOverWindowNode; use risingwave_storage::StateStore; diff --git a/src/stream/tests/integration_tests/eowc_over_window.rs b/src/stream/tests/integration_tests/eowc_over_window.rs index d7d788680af55..35cc4954aff45 100644 --- a/src/stream/tests/integration_tests/eowc_over_window.rs +++ b/src/stream/tests/integration_tests/eowc_over_window.rs @@ -13,7 +13,7 @@ // limitations under the License. use risingwave_expr::agg::{AggArgs, AggKind}; -use risingwave_expr::function::window::{Frame, FrameBound, WindowFuncCall, WindowFuncKind}; +use risingwave_expr::window_function::{Frame, FrameBound, WindowFuncCall, WindowFuncKind}; use risingwave_stream::executor::{EowcOverWindowExecutor, EowcOverWindowExecutorArgs}; use crate::prelude::*; diff --git a/src/stream/tests/integration_tests/over_window.rs b/src/stream/tests/integration_tests/over_window.rs index 4b7b53aaae31b..2377ce12e9147 100644 --- a/src/stream/tests/integration_tests/over_window.rs +++ b/src/stream/tests/integration_tests/over_window.rs @@ -14,7 +14,7 @@ use risingwave_common::session_config::OverWindowCachePolicy; use risingwave_expr::agg::{AggArgs, AggKind}; -use risingwave_expr::function::window::{ +use risingwave_expr::window_function::{ Frame, FrameBound, FrameExclusion, WindowFuncCall, WindowFuncKind, }; use risingwave_stream::executor::monitor::StreamingMetrics;