-
Notifications
You must be signed in to change notification settings - Fork 591
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(sql-udf): support basic anonymous sql udf #14139
Changes from 6 commits
6e3f5d4
bf57014
16780b4
fa8fc7a
5b5c82d
456d6fd
94ebbda
22ca504
7084a6c
fa0c44b
2dde82e
831a7b2
d5f6b7f
7e821f2
8074636
44f8d46
a356566
1c9b8ab
2618a0b
922c1e7
cfb0a62
9ccaa07
c6d677b
88b00f1
53e53ff
d6c3d0d
199f04f
d601ac8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,105 @@ | ||
statement ok | ||
SET RW_IMPLICIT_FLUSH TO true; | ||
|
||
# Create an anonymous function with double dollar as clause | ||
statement ok | ||
create function add(INT, INT) returns int as $$select $1 + $2$$ language sql; | ||
|
||
# Create an anonymous function with single quote as clause | ||
statement ok | ||
create function sub(INT, INT) returns int as 'select $1 - $2' language sql; | ||
|
||
# Currently we can only support constant calling convention | ||
xzhseh marked this conversation as resolved.
Show resolved
Hide resolved
|
||
statement ok | ||
create function add_sub_binding() returns int as 'select add(1, 1) + sub(2, 2)' language sql; | ||
|
||
# Call the defined sql udf | ||
query I | ||
select add(1, -1); | ||
---- | ||
0 | ||
|
||
query I | ||
select sub(1, 1); | ||
---- | ||
0 | ||
|
||
query I | ||
select add_sub_binding(); | ||
---- | ||
2 | ||
|
||
query III | ||
select add(1, -1), sub(1, 1), add_sub_binding(); | ||
---- | ||
0 0 2 | ||
|
||
# Create a mock table | ||
statement ok | ||
create table t1 (c1 INT, c2 INT); | ||
|
||
# Insert some data into the mock table | ||
statement ok | ||
insert into t1 values (1, 1), (2, 2), (3, 3), (4, 4), (5, 5); | ||
|
||
query III | ||
select sub(c1, c2), c1, c2, add(c1, c2) from t1 order by c1 asc; | ||
---- | ||
0 1 1 2 | ||
0 2 2 4 | ||
0 3 3 6 | ||
0 4 4 8 | ||
0 5 5 10 | ||
|
||
# Invalid function body syntax | ||
statement error | ||
create function add_error(INT, INT) returns int as $$select $1 + $2 +$$ language sql; | ||
xzhseh marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
# Multiple type interleaving sql udf | ||
statement ok | ||
create function add_sub(INT, FLOAT, INT) returns FLOAT as $$select -$1 + $2 - $3$$ language sql; | ||
|
||
# Note: need EXPLICIT type cast in order to call the multiple types interleaving sql udf | ||
xzhseh marked this conversation as resolved.
Show resolved
Hide resolved
|
||
query I | ||
select add_sub(1::INT, 5.1415926::FLOAT, 1::INT); | ||
---- | ||
3.1415926 | ||
|
||
query III | ||
select add(1, -1), sub(1, 1), add_sub(1::INT, 5.1415926::FLOAT, 1::INT); | ||
---- | ||
0 0 3.1415926 | ||
|
||
# Create another mock table | ||
statement ok | ||
create table t2 (c1 INT, c2 FLOAT, c3 INT); | ||
|
||
statement ok | ||
insert into t2 values (1, 3.14, 2), (2, 4.44, 5), (20, 10.30, 02); | ||
|
||
query IIIII | ||
select c1, c2, c3, add(c1, c3), sub(c1, c3), add_sub(c1::INT, c2::FLOAT, c3::INT) from t2 order by c1 asc; | ||
---- | ||
1 3.14 2 3 -1 0.14000000000000012 | ||
2 4.44 5 7 -3 -2.5599999999999996 | ||
20 10.3 2 22 18 -11.7 | ||
|
||
# Drop the functions | ||
statement ok | ||
drop function add; | ||
|
||
statement ok | ||
drop function sub; | ||
|
||
statement ok | ||
drop function add_sub_binding; | ||
|
||
statement ok | ||
drop function add_sub; | ||
|
||
# Drop the mock table | ||
statement ok | ||
drop table t1; | ||
|
||
statement ok | ||
drop table t2; |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,7 +15,7 @@ | |
use std::collections::HashMap; | ||
use std::iter::once; | ||
use std::str::FromStr; | ||
use std::sync::LazyLock; | ||
use std::sync::{Arc, LazyLock}; | ||
|
||
use bk_tree::{metrics, BKTree}; | ||
use itertools::Itertools; | ||
|
@@ -30,13 +30,14 @@ use risingwave_expr::window_function::{ | |
Frame, FrameBound, FrameBounds, FrameExclusion, WindowFuncKind, | ||
}; | ||
use risingwave_sqlparser::ast::{ | ||
self, Function, FunctionArg, FunctionArgExpr, Ident, WindowFrameBound, WindowFrameExclusion, | ||
WindowFrameUnits, WindowSpec, | ||
self, Expr as AstExpr, Function, FunctionArg, FunctionArgExpr, Ident, SelectItem, SetExpr, | ||
Statement, WindowFrameBound, WindowFrameExclusion, WindowFrameUnits, WindowSpec, | ||
}; | ||
use thiserror_ext::AsReport; | ||
|
||
use crate::binder::bind_context::Clause; | ||
use crate::binder::{Binder, BoundQuery, BoundSetExpr}; | ||
use crate::catalog::function_catalog::FunctionCatalog; | ||
use crate::expr::{ | ||
AggCall, Expr, ExprImpl, ExprType, FunctionCall, FunctionCallWithLambda, Literal, Now, OrderBy, | ||
Subquery, SubqueryKind, TableFunction, TableFunctionType, UserDefinedFunction, WindowFunction, | ||
|
@@ -117,6 +118,9 @@ impl Binder { | |
return self.bind_array_transform(f); | ||
} | ||
|
||
// Used later in sql udf expression evaluation | ||
let args = f.args.clone(); | ||
|
||
let inputs = f | ||
.args | ||
.into_iter() | ||
|
@@ -149,6 +153,53 @@ impl Binder { | |
return Ok(TableFunction::new(function_type, inputs)?.into()); | ||
} | ||
|
||
/// TODO: add name related logic | ||
fn create_udf_context( | ||
binder: &mut Binder, | ||
args: &[FunctionArg], | ||
_catalog: &Arc<FunctionCatalog>, | ||
) { | ||
binder.udf_context = args | ||
xzhseh marked this conversation as resolved.
Show resolved
Hide resolved
|
||
.iter() | ||
.enumerate() | ||
.map(|(i, current_arg)| { | ||
if let FunctionArg::Unnamed(arg) = current_arg { | ||
let FunctionArgExpr::Expr(e) = arg else { | ||
panic!("invalid syntax"); | ||
xzhseh marked this conversation as resolved.
Show resolved
Hide resolved
|
||
}; | ||
// if catalog.arg_names.is_some() { | ||
// panic!("invalid syntax"); | ||
// } | ||
return ("$".to_string() + &(i + 1).to_string(), e.clone()); | ||
xzhseh marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
panic!("invalid syntax"); | ||
}) | ||
.collect() | ||
} | ||
|
||
fn extract_udf_expression(ast: Vec<Statement>) -> AstExpr { | ||
// Extract the expression out | ||
let Statement::Query(query) = ast | ||
.into_iter() | ||
.exactly_one() | ||
.expect("sql udf should contain only one statement") | ||
else { | ||
unreachable!("sql udf should contain a query statement"); | ||
xzhseh marked this conversation as resolved.
Show resolved
Hide resolved
|
||
}; | ||
let SetExpr::Select(select) = query.body else { | ||
panic!("Invalid syntax"); | ||
}; | ||
let projection = select.projection; | ||
let SelectItem::UnnamedExpr(expr) = projection | ||
.into_iter() | ||
.exactly_one() | ||
.expect("`projection` should contain only one `SelectItem`") | ||
else { | ||
unreachable!("`projection` should contain only one `SelectItem`"); | ||
}; | ||
expr | ||
} | ||
|
||
// user defined function | ||
// TODO: resolve schema name https://github.com/risingwavelabs/risingwave/issues/12422 | ||
if let Ok(schema) = self.first_valid_schema() | ||
|
@@ -158,13 +209,22 @@ impl Binder { | |
) | ||
{ | ||
use crate::catalog::function_catalog::FunctionKind::*; | ||
match &func.kind { | ||
Scalar { .. } => return Ok(UserDefinedFunction::new(func.clone(), inputs).into()), | ||
Table { .. } => { | ||
self.ensure_table_function_allowed()?; | ||
return Ok(TableFunction::new_user_defined(func.clone(), inputs).into()); | ||
if func.language == "sql" { | ||
// This represents the current user defined function is `language sql` | ||
let ast = risingwave_sqlparser::parser::Parser::parse_sql(func.identifier.as_str()).unwrap(); | ||
xzhseh marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// The actual inline logic | ||
create_udf_context(self, &args, &Arc::clone(func)); | ||
return self.bind_expr(extract_udf_expression(ast)); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I'm not sure if we should create a new binder to do the inline. Or at least we should clean the `udf_context` afterwards. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I prefer the latter solution; creating a new binder to do the inline may add extra cost and would be hard to integrate with the ongoing binding process in the main routine 🤔️? |
||
} else { | ||
debug_assert!(func.language == "python" || func.language == "java", "only `python` and `java` are currently supported for general udf"); | ||
match &func.kind { | ||
Scalar { .. } => return Ok(UserDefinedFunction::new(func.clone(), inputs).into()), | ||
Table { .. } => { | ||
self.ensure_table_function_allowed()?; | ||
return Ok(TableFunction::new_user_defined(func.clone(), inputs).into()); | ||
} | ||
Aggregate => todo!("support UDAF"), | ||
} | ||
Aggregate => todo!("support UDAF"), | ||
} | ||
} | ||
|
||
|
@@ -1216,7 +1276,7 @@ impl Binder { | |
static FUNCTIONS_BKTREE: LazyLock<BKTree<&str>> = LazyLock::new(|| { | ||
let mut tree = BKTree::new(metrics::Levenshtein); | ||
|
||
// TODO: Also hint other functinos, e,g, Agg or UDF. | ||
// TODO: Also hint other functions, e.g., Agg or UDF. | ||
for k in HANDLES.keys() { | ||
tree.add(*k); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
QQ: Can prepared statements work correctly? And are there any tests?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It might be surprising but I couldn't find any e2e tests for the prepared statement itself. 😕
#14141
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think so. Under the current implementation, the `udf_context` will be cleared immediately after `bind_function`, which ensures it will not interleave with the parameter bindings of prepare-related statements (e.g., those triggered by third-party drivers).