Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(expr): allow defining functions in frontend #12287

Merged
merged 10 commits into from
Sep 18, 2023
3 changes: 3 additions & 0 deletions proto/expr.proto
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,9 @@ message ExprNode {
PG_SLEEP = 2024;
PG_SLEEP_FOR = 2025;
PG_SLEEP_UNTIL = 2026;

// Adminitration functions
COL_DESCRIPTION = 2100;
}
Type function_type = 1;
data.DataType return_type = 3;
Expand Down
5 changes: 3 additions & 2 deletions src/batch/src/executor/sort_over_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ use risingwave_common::error::{Result, RwError};
use risingwave_common::row::{OwnedRow, Row, RowExt};
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_expr::function::window::WindowFuncCall;
use risingwave_expr::window_function::{create_window_state, StateKey, WindowStates};
use risingwave_expr::window_function::{
create_window_state, StateKey, WindowFuncCall, WindowStates,
};
use risingwave_pb::batch_plan::plan_node::NodeBody;

use super::{BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder};
Expand Down
48 changes: 24 additions & 24 deletions src/expr/macro/src/gen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ impl FunctionAttr {

let pb_type = format_ident!("{}", utils::to_camel_case(&name));
let ctor_name = format_ident!("{}", self.ident_name());
let descriptor_type = quote! { crate::sig::func::FuncSign };
let descriptor_type = quote! { risingwave_expr::sig::func::FuncSign };
let build_fn = if build_fn {
let name = format_ident!("{}", user_fn.name);
quote! { #name }
Expand All @@ -80,10 +80,10 @@ impl FunctionAttr {
};
let deprecated = self.deprecated;
Ok(quote! {
#[ctor::ctor]
#[risingwave_expr::ctor]
fn #ctor_name() {
use risingwave_common::types::{DataType, DataTypeName};
unsafe { crate::sig::func::_register(#descriptor_type {
unsafe { risingwave_expr::sig::func::_register(#descriptor_type {
func: risingwave_pb::expr::expr_node::Type::#pb_type,
inputs_type: &[#(#args),*],
variadic: #variadic,
Expand Down Expand Up @@ -203,7 +203,7 @@ impl FunctionAttr {
let #prebuilt_inputs = match &#prebuilt_inputs {
Some(s) => s.as_scalar_ref_impl().try_into()?,
// the function should always return null if any const argument is null
None => return Ok(Box::new(crate::expr::LiteralExpression::new(
None => return Ok(Box::new(risingwave_expr::expr::LiteralExpression::new(
return_type,
None,
))),
Expand All @@ -217,8 +217,8 @@ impl FunctionAttr {

// ensure the number of children matches the number of arguments
let check_children = match variadic {
true => quote! { crate::ensure!(children.len() >= #num_args); },
false => quote! { crate::ensure!(children.len() == #num_args); },
true => quote! { risingwave_expr::ensure!(children.len() >= #num_args); },
false => quote! { risingwave_expr::ensure!(children.len() == #num_args); },
};

// evaluate variadic arguments in `eval`
Expand Down Expand Up @@ -407,8 +407,8 @@ impl FunctionAttr {
};

Ok(quote! {
|return_type: DataType, children: Vec<crate::expr::BoxedExpression>|
-> crate::Result<crate::expr::BoxedExpression>
|return_type: DataType, children: Vec<risingwave_expr::expr::BoxedExpression>|
-> risingwave_expr::Result<risingwave_expr::expr::BoxedExpression>
{
use std::sync::Arc;
use risingwave_common::array::*;
Expand All @@ -418,8 +418,8 @@ impl FunctionAttr {
use risingwave_common::util::iter_util::ZipEqFast;
use itertools::multizip;

use crate::expr::{Context, BoxedExpression};
use crate::Result;
use risingwave_expr::expr::{Context, BoxedExpression};
use risingwave_expr::Result;

#check_children
let prebuilt_arg = #prebuild_const;
Expand All @@ -435,7 +435,7 @@ impl FunctionAttr {
prebuilt_arg: #prebuilt_arg_type,
}
#[async_trait::async_trait]
impl crate::expr::Expression for #struct_name {
impl risingwave_expr::expr::Expression for #struct_name {
fn return_type(&self) -> DataType {
self.context.return_type.clone()
}
Expand Down Expand Up @@ -497,19 +497,19 @@ impl FunctionAttr {
false => format_ident!("{}", self.ident_name()),
true => format_ident!("{}_append_only", self.ident_name()),
};
let descriptor_type = quote! { crate::sig::agg::AggFuncSig };
let descriptor_type = quote! { risingwave_expr::sig::agg::AggFuncSig };
let build_fn = if build_fn {
let name = format_ident!("{}", user_fn.as_fn().name);
quote! { #name }
} else {
self.generate_agg_build_fn(user_fn)?
};
Ok(quote! {
#[ctor::ctor]
#[risingwave_expr::ctor]
fn #ctor_name() {
use risingwave_common::types::{DataType, DataTypeName};
unsafe { crate::sig::agg::_register(#descriptor_type {
func: crate::agg::AggKind::#pb_type,
unsafe { risingwave_expr::sig::agg::_register(#descriptor_type {
func: risingwave_expr::agg::AggKind::#pb_type,
inputs_type: &[#(#args),*],
state_type: #state_type,
ret_type: #ret,
Expand Down Expand Up @@ -678,8 +678,8 @@ impl FunctionAttr {
use risingwave_common::buffer::Bitmap;
use risingwave_common::estimate_size::EstimateSize;

use crate::Result;
use crate::agg::AggregateState;
use risingwave_expr::Result;
use risingwave_expr::agg::AggregateState;

#[derive(Clone)]
struct Agg {
Expand All @@ -688,7 +688,7 @@ impl FunctionAttr {
}

#[async_trait::async_trait]
impl crate::agg::AggregateFunction for Agg {
impl risingwave_expr::agg::AggregateFunction for Agg {
fn return_type(&self) -> DataType {
self.return_type.clone()
}
Expand Down Expand Up @@ -780,7 +780,7 @@ impl FunctionAttr {

let pb_type = format_ident!("{}", utils::to_camel_case(&name));
let ctor_name = format_ident!("{}", self.ident_name());
let descriptor_type = quote! { crate::sig::table_function::FuncSign };
let descriptor_type = quote! { risingwave_expr::sig::table_function::FuncSign };
let build_fn = if build_fn {
let name = format_ident!("{}", user_fn.name);
quote! { #name }
Expand All @@ -800,10 +800,10 @@ impl FunctionAttr {
quote! { |_| Ok(#ty) }
};
Ok(quote! {
#[ctor::ctor]
#[risingwave_expr::ctor]
fn #ctor_name() {
use risingwave_common::types::{DataType, DataTypeName};
unsafe { crate::sig::table_function::_register(#descriptor_type {
unsafe { risingwave_expr::sig::table_function::_register(#descriptor_type {
func: risingwave_pb::expr::table_function::Type::#pb_type,
inputs_type: &[#(#args),*],
ret_type: #ret,
Expand Down Expand Up @@ -910,15 +910,15 @@ impl FunctionAttr {
use risingwave_common::util::iter_util::ZipEqFast;
use itertools::multizip;

crate::ensure!(children.len() == #num_args);
risingwave_expr::ensure!(children.len() == #num_args);
let mut iter = children.into_iter();
#(let #all_child = iter.next().unwrap();)*
#(
let #const_child = #const_child.eval_const()?;
let #const_child = match &#const_child {
Some(s) => s.as_scalar_ref_impl().try_into()?,
// the function should always return empty if any const argument is null
None => return Ok(crate::table_function::empty(return_type)),
None => return Ok(risingwave_expr::table_function::empty(return_type)),
};
)*

Expand All @@ -930,7 +930,7 @@ impl FunctionAttr {
prebuilt_arg: #prebuilt_arg_type,
}
#[async_trait::async_trait]
impl crate::table_function::TableFunction for #struct_name {
impl risingwave_expr::table_function::TableFunction for #struct_name {
fn return_type(&self) -> DataType {
self.return_type.clone()
}
Expand Down
6 changes: 4 additions & 2 deletions src/expr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,17 @@
#![feature(test)]
#![feature(arc_unwrap_or_clone)]

extern crate self as risingwave_expr;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Today I learned. 🤯


pub mod agg;
mod error;
pub mod expr;
pub mod function;
pub mod sig;
pub mod table_function;
pub mod vector_op;
pub mod window_function;

pub use ctor::ctor;
pub use error::{ExprError, Result};
use risingwave_common::{bail, ensure};
pub use risingwave_common::{bail, ensure};
pub use risingwave_expr_macro::*;
2 changes: 1 addition & 1 deletion src/expr/src/window_function/state/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use smallvec::SmallVec;
use super::buffer::WindowBuffer;
use super::{StateEvictHint, StateKey, StatePos, WindowState};
use crate::agg::{build_append_only, AggArgs, AggCall, BoxedAggregateFunction};
use crate::function::window::{WindowFuncCall, WindowFuncKind};
use crate::window_function::{WindowFuncCall, WindowFuncKind};
use crate::Result;

pub struct AggregateState {
Expand Down
4 changes: 2 additions & 2 deletions src/expr/src/window_function/state/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::ops::Range;

use either::Either;

use crate::function::window::{Frame, FrameBounds, FrameExclusion};
use crate::window_function::{Frame, FrameBounds, FrameExclusion};

struct Entry<K: Ord, V> {
key: K,
Expand Down Expand Up @@ -238,7 +238,7 @@ mod tests {
use itertools::Itertools;

use super::*;
use crate::function::window::{Frame, FrameBound};
use crate::window_function::{Frame, FrameBound};

#[test]
fn test_rows_frame_unbounded_preceding_to_current_row() {
Expand Down
3 changes: 1 addition & 2 deletions src/expr/src/window_function/state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ use risingwave_common::types::{Datum, DefaultOrdered};
use risingwave_common::util::memcmp_encoding::MemcmpEncoded;
use smallvec::SmallVec;

use super::WindowFuncCall;
use crate::function::window::WindowFuncKind;
use super::{WindowFuncCall, WindowFuncKind};
use crate::sig::FuncSigDebug;
use crate::{ExprError, Result};

Expand Down
4 changes: 2 additions & 2 deletions src/expr/src/window_function/state/row_number.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use risingwave_common::types::Datum;
use smallvec::SmallVec;

use super::{StateEvictHint, StateKey, StatePos, WindowState};
use crate::function::window::WindowFuncCall;
use crate::window_function::WindowFuncCall;
use crate::Result;

#[derive(EstimateSize)]
Expand Down Expand Up @@ -83,7 +83,7 @@ mod tests {

use super::*;
use crate::agg::AggArgs;
use crate::function::window::{Frame, FrameBound, WindowFuncKind};
use crate::window_function::{Frame, FrameBound, WindowFuncKind};

fn create_state_key(pk: i64) -> StateKey {
StateKey {
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/binder/expr/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use risingwave_common::session_config::USER_NAME_WILD_CARD;
use risingwave_common::types::{DataType, ScalarImpl, Timestamptz};
use risingwave_common::{GIT_SHA, RW_VERSION};
use risingwave_expr::agg::{agg_kinds, AggKind};
use risingwave_expr::function::window::{
use risingwave_expr::window_function::{
Frame, FrameBound, FrameBounds, FrameExclusion, WindowFuncKind,
};
use risingwave_sqlparser::ast::{
Expand Down Expand Up @@ -1127,7 +1127,7 @@ impl Binder {
// TODO: really implement them.
// https://www.postgresql.org/docs/9.5/functions-info.html#FUNCTIONS-INFO-COMMENT-TABLE
// WARN: Hacked in [`Binder::bind_function`]!!!
("col_description", raw_literal(ExprImpl::literal_varchar("".to_string()))),
("col_description", raw_call(ExprType::ColDescription)),
("obj_description", raw_literal(ExprImpl::literal_varchar("".to_string()))),
("shobj_description", raw_literal(ExprImpl::literal_varchar("".to_string()))),
("pg_is_in_recovery", raw_literal(ExprImpl::literal_bool(false))),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod window;
use std::fmt::Write;

// TODO(rc): this module is to be removed
use risingwave_expr::{function, ExprError};

#[function("col_description(varchar, int32) -> varchar")]
fn col_description(_name: &str, _col: i32, writer: &mut impl Write) -> Result<(), ExprError> {
// TODO: Currently we don't support `COMMENT` statement, so we just return empty string.
writer.write_str("").unwrap();

Ok(())
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@
// See the License for the specific language governing permissions and
// limitations under the License.

pub use crate::window_function::*;
mod col_description;
1 change: 1 addition & 0 deletions src/frontend/src/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ pub use order_by_expr::{OrderBy, OrderByExpr};
mod expr_mutator;
mod expr_rewriter;
mod expr_visitor;
mod function_impl;
mod session_timezone;
mod type_inference;
mod utils;
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/expr/pure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,8 @@ impl ExprVisitor<bool> for ImpureAnalyzer {
| expr_node::Type::Proctime
| expr_node::Type::PgSleep
| expr_node::Type::PgSleepFor
| expr_node::Type::PgSleepUntil => true,
| expr_node::Type::PgSleepUntil
| expr_node::Type::ColDescription => true,
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/expr/window_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use itertools::Itertools;
use risingwave_common::error::{ErrorCode, RwError};
use risingwave_common::types::DataType;
use risingwave_expr::function::window::{Frame, WindowFuncKind};
use risingwave_expr::window_function::{Frame, WindowFuncKind};

use super::{AggCall, Expr, ExprImpl, OrderBy, RwResult};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use risingwave_common::catalog::{Field, Schema};
use risingwave_common::types::DataType;
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::sort_util::{ColumnOrder, ColumnOrderDisplay};
use risingwave_expr::function::window::{Frame, WindowFuncKind};
use risingwave_expr::window_function::{Frame, WindowFuncKind};
use risingwave_pb::expr::PbWindowFunction;

use super::{DistillUnit, GenericPlanNode, GenericPlanRef};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use risingwave_common::error::{ErrorCode, Result, RwError};
use risingwave_common::types::{DataType, Datum, ScalarImpl};
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_expr::agg::AggKind;
use risingwave_expr::function::window::{Frame, FrameBound, WindowFuncKind};
use risingwave_expr::window_function::{Frame, FrameBound, WindowFuncKind};

use super::generic::{GenericPlanRef, OverWindow, PlanWindowFunction, ProjectBuilder};
use super::utils::impl_distill_by_unit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

use itertools::Itertools;
use risingwave_expr::function::window::WindowFuncKind;
use risingwave_expr::window_function::WindowFuncKind;
use risingwave_pb::expr::expr_node::Type;
use risingwave_pb::plan_common::JoinType;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use fixedbitset::FixedBitSet;
use risingwave_common::types::DataType;
use risingwave_expr::function::window::WindowFuncKind;
use risingwave_expr::window_function::WindowFuncKind;

use super::Rule;
use crate::expr::{collect_input_refs, ExprImpl, ExprType};
Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/from_proto/eowc_over_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use std::sync::Arc;

use risingwave_expr::function::window::WindowFuncCall;
use risingwave_expr::window_function::WindowFuncCall;
use risingwave_pb::stream_plan::PbEowcOverWindowNode;
use risingwave_storage::StateStore;

Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/from_proto/over_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::sync::Arc;

use risingwave_common::session_config::OverWindowCachePolicy;
use risingwave_common::util::sort_util::ColumnOrder;
use risingwave_expr::function::window::WindowFuncCall;
use risingwave_expr::window_function::WindowFuncCall;
use risingwave_pb::stream_plan::PbOverWindowNode;
use risingwave_storage::StateStore;

Expand Down
2 changes: 1 addition & 1 deletion src/stream/tests/integration_tests/eowc_over_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

use risingwave_expr::agg::{AggArgs, AggKind};
use risingwave_expr::function::window::{Frame, FrameBound, WindowFuncCall, WindowFuncKind};
use risingwave_expr::window_function::{Frame, FrameBound, WindowFuncCall, WindowFuncKind};
use risingwave_stream::executor::{EowcOverWindowExecutor, EowcOverWindowExecutorArgs};

use crate::prelude::*;
Expand Down
Loading