From 94154c03ff7d80076b9e4e10da416ce85a620ab4 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Mon, 18 Nov 2024 00:59:18 +0800 Subject: [PATCH] refactor(udf): remove `function_type` field previously used by Deno UDF (#19404) Signed-off-by: Richard Chien --- proto/catalog.proto | 6 +- proto/expr.proto | 9 +-- src/expr/core/src/aggregate/user_defined.rs | 1 - src/expr/core/src/expr/expr_udf.rs | 1 - src/expr/core/src/sig/udf.rs | 1 - .../core/src/table_function/user_defined.rs | 1 - src/frontend/src/catalog/function_catalog.rs | 9 +-- .../src/expr/user_defined_function.rs | 10 ++-- src/frontend/src/handler/create_aggregate.rs | 20 ++++--- src/frontend/src/handler/create_function.rs | 24 ++------ .../src/handler/create_sql_function.rs | 3 +- src/meta/model/migration/src/lib.rs | 2 + .../m20241115_085007_remove_function_type.rs | 35 ++++++++++++ src/meta/model/src/function.rs | 6 +- src/meta/src/controller/mod.rs | 3 +- src/sqlparser/src/ast/mod.rs | 55 +------------------ src/sqlparser/src/keywords.rs | 2 - src/sqlparser/src/parser.rs | 28 ---------- src/sqlparser/tests/sqlparser_postgres.rs | 25 --------- 19 files changed, 75 insertions(+), 166 deletions(-) create mode 100644 src/meta/model/migration/src/m20241115_085007_remove_function_type.rs diff --git a/proto/catalog.proto b/proto/catalog.proto index 5383104e9c0f..9bb8e62fd4a1 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -301,10 +301,10 @@ message Function { // The zstd-compressed binary of the function. optional bytes compressed_binary = 17; bool always_retry_on_network_error = 16; - // The runtime used when javascript is used as the language. Could be "quickjs" or "deno". + // The runtime selected when multiple runtimes are available for the language. Now is not used. optional string runtime = 18; - // The function type, which is used to execute the function. Could be "sync", "async", "generator" or "async_generator" - optional string function_type = 19; + reserved 19; + reserved "function_type"; oneof kind { ScalarFunction scalar = 11; diff --git a/proto/expr.proto b/proto/expr.proto index 533084351284..c6d8e2082fa7 100644 --- a/proto/expr.proto +++ b/proto/expr.proto @@ -610,10 +610,10 @@ message UserDefinedFunction { // - If `language` is `rust` or `wasm`, the zstd-compressed wasm binary. optional bytes compressed_binary = 10; bool always_retry_on_network_error = 9; - // The runtime used when javascript is used as the language. Could be "quickjs" or "deno". + // The runtime selected when multiple runtimes are available for the language. Now is not used. optional string runtime = 11; - // The function type, which is used to execute the function. Could be "sync", "async", "generator" or "async_generator" - optional string function_type = 12; + reserved 12; + reserved "function_type"; } // Additional information for user defined table/aggregate functions. @@ -627,5 +627,6 @@ message UserDefinedFunctionMetadata { optional string body = 7; optional bytes compressed_binary = 10; optional string runtime = 11; - optional string function_type = 12; + reserved 12; + reserved "function_type"; } diff --git a/src/expr/core/src/aggregate/user_defined.rs b/src/expr/core/src/aggregate/user_defined.rs index 2f4fdc5f9f9c..cba83d4f439e 100644 --- a/src/expr/core/src/aggregate/user_defined.rs +++ b/src/expr/core/src/aggregate/user_defined.rs @@ -138,7 +138,6 @@ pub fn new_user_defined( arg_names: &udf.arg_names, return_type, always_retry_on_network_error: false, - function_type: udf.function_type.as_deref(), }) .context("failed to build UDF runtime")?; diff --git a/src/expr/core/src/expr/expr_udf.rs b/src/expr/core/src/expr/expr_udf.rs index 6ae27dabb245..41c9257036e7 100644 --- a/src/expr/core/src/expr/expr_udf.rs +++ b/src/expr/core/src/expr/expr_udf.rs @@ -185,7 +185,6 @@ impl Build for UserDefinedFunction { arg_names: &udf.arg_names, return_type: &return_type, always_retry_on_network_error: udf.always_retry_on_network_error, - function_type: udf.function_type.as_deref(), }) .context("failed to build UDF runtime")?; diff --git a/src/expr/core/src/sig/udf.rs b/src/expr/core/src/sig/udf.rs index 047879b9192b..e8aa1c3efdf1 100644 --- a/src/expr/core/src/sig/udf.rs +++ b/src/expr/core/src/sig/udf.rs @@ -105,7 +105,6 @@ pub struct UdfOptions<'a> { pub arg_names: &'a [String], pub return_type: &'a DataType, pub always_retry_on_network_error: bool, - pub function_type: Option<&'a str>, } #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EnumAsInner)] diff --git a/src/expr/core/src/table_function/user_defined.rs b/src/expr/core/src/table_function/user_defined.rs index b490e9b023af..826369666944 100644 --- a/src/expr/core/src/table_function/user_defined.rs +++ b/src/expr/core/src/table_function/user_defined.rs @@ -140,7 +140,6 @@ pub fn new_user_defined(prost: &PbTableFunction, chunk_size: usize) -> Result, pub return_type: DataType, pub language: String, + pub runtime: Option, pub identifier: Option, pub body: Option, pub link: Option, pub compressed_binary: Option>, pub always_retry_on_network_error: bool, - pub function_type: Option, - pub runtime: Option, } #[derive(Clone, Display, PartialEq, Eq, Hash, Debug, EnumAsInner)] @@ -71,13 +70,12 @@ impl From<&PbFunction> for FunctionCatalog { arg_types: prost.arg_types.iter().map(|arg| arg.into()).collect(), return_type: prost.return_type.as_ref().expect("no return type").into(), language: prost.language.clone(), + runtime: prost.runtime.clone(), identifier: prost.identifier.clone(), body: prost.body.clone(), link: prost.link.clone(), compressed_binary: prost.compressed_binary.clone(), always_retry_on_network_error: prost.always_retry_on_network_error, - function_type: prost.function_type.clone(), - runtime: prost.runtime.clone(), } } } @@ -89,12 +87,11 @@ impl From<&FunctionCatalog> for PbUserDefinedFunctionMetadata { arg_types: c.arg_types.iter().map(|t| t.to_protobuf()).collect(), return_type: Some(c.return_type.to_protobuf()), language: c.language.clone(), + runtime: c.runtime.clone(), link: c.link.clone(), identifier: c.identifier.clone(), body: c.body.clone(), compressed_binary: c.compressed_binary.clone(), - function_type: c.function_type.clone(), - runtime: c.runtime.clone(), } } } diff --git a/src/frontend/src/expr/user_defined_function.rs b/src/frontend/src/expr/user_defined_function.rs index 44abfa1859c4..084fe7387d76 100644 --- a/src/frontend/src/expr/user_defined_function.rs +++ b/src/frontend/src/expr/user_defined_function.rs @@ -47,21 +47,20 @@ impl UserDefinedFunction { let catalog = FunctionCatalog { // FIXME(yuhao): function id is not in udf proto. id: FunctionId::placeholder(), - name: udf.get_name().clone(), + name: udf.name.clone(), // FIXME(yuhao): owner is not in udf proto. owner: u32::MAX - 1, kind: FunctionKind::Scalar, arg_names: udf.arg_names.clone(), arg_types, return_type, - language: udf.get_language().clone(), + language: udf.language.clone(), + runtime: udf.runtime.clone(), identifier: udf.identifier.clone(), body: udf.body.clone(), link: udf.link.clone(), compressed_binary: udf.compressed_binary.clone(), always_retry_on_network_error: udf.always_retry_on_network_error, - function_type: udf.function_type.clone(), - runtime: udf.runtime.clone(), }; Ok(Self { @@ -93,13 +92,12 @@ impl Expr for UserDefinedFunction { .map(|t| t.to_protobuf()) .collect(), language: self.catalog.language.clone(), + runtime: self.catalog.runtime.clone(), identifier: self.catalog.identifier.clone(), link: self.catalog.link.clone(), body: self.catalog.body.clone(), compressed_binary: self.catalog.compressed_binary.clone(), always_retry_on_network_error: self.catalog.always_retry_on_network_error, - function_type: self.catalog.function_type.clone(), - runtime: self.catalog.runtime.clone(), })), } } diff --git a/src/frontend/src/handler/create_aggregate.rs b/src/frontend/src/handler/create_aggregate.rs index b9b8a391eaff..fe7460ff0997 100644 --- a/src/frontend/src/handler/create_aggregate.rs +++ b/src/frontend/src/handler/create_aggregate.rs @@ -52,6 +52,16 @@ pub async fn handle_create_aggregate( None => return Err(ErrorCode::InvalidParameterValue("no language".into()).into()), }; + let runtime = match params.runtime { + Some(_) => { + return Err(ErrorCode::InvalidParameterValue( + "runtime selection is currently not supported".to_string(), + ) + .into()); + } + None => None, + }; + let return_type = bind_data_type(&returns)?; let mut arg_names = vec![]; @@ -94,13 +104,6 @@ pub async fn handle_create_aggregate( } _ => None, }; - let function_type = match params.function_type { - Some(CreateFunctionType::Sync) => Some("sync".to_string()), - Some(CreateFunctionType::Async) => Some("async".to_string()), - Some(CreateFunctionType::Generator) => Some("generator".to_string()), - Some(CreateFunctionType::AsyncGenerator) => Some("async_generator".to_string()), - None => None, - }; let create_fn = risingwave_expr::sig::find_udf_impl(&language, None, link)?.create_fn; let output = create_fn(CreateFunctionOptions { @@ -124,14 +127,13 @@ pub async fn handle_create_aggregate( arg_types: arg_types.into_iter().map(|t| t.into()).collect(), return_type: Some(return_type.into()), language, + runtime, identifier: Some(output.identifier), link: link.map(|s| s.to_string()), body: output.body, compressed_binary: output.compressed_binary, owner: session.user_id(), always_retry_on_network_error: false, - runtime: None, - function_type, }; let catalog_writer = session.catalog_writer()?; diff --git a/src/frontend/src/handler/create_function.rs b/src/frontend/src/handler/create_function.rs index ccd83a13ed81..b81d2b4514ed 100644 --- a/src/frontend/src/handler/create_function.rs +++ b/src/frontend/src/handler/create_function.rs @@ -60,15 +60,11 @@ pub async fn handle_create_function( }; let runtime = match params.runtime { - Some(runtime) => { - if language == "javascript" { - Some(runtime.real_value()) - } else { - return Err(ErrorCode::InvalidParameterValue( - "runtime is only supported for javascript".to_string(), - ) - .into()); - } + Some(_) => { + return Err(ErrorCode::InvalidParameterValue( + "runtime selection is currently not supported".to_string(), + ) + .into()); } None => None, }; @@ -141,13 +137,6 @@ pub async fn handle_create_function( } _ => None, }; - let function_type = match params.function_type { - Some(CreateFunctionType::Sync) => Some("sync".to_string()), - Some(CreateFunctionType::Async) => Some("async".to_string()), - Some(CreateFunctionType::Generator) => Some("generator".to_string()), - Some(CreateFunctionType::AsyncGenerator) => Some("async_generator".to_string()), - None => None, - }; let create_fn = risingwave_expr::sig::find_udf_impl(&language, runtime.as_deref(), link)?.create_fn; @@ -176,6 +165,7 @@ pub async fn handle_create_function( arg_types: arg_types.into_iter().map(|t| t.into()).collect(), return_type: Some(return_type.into()), language, + runtime, identifier: Some(output.identifier), link: link.map(|s| s.to_string()), body: output.body, @@ -184,8 +174,6 @@ pub async fn handle_create_function( always_retry_on_network_error: with_options .always_retry_on_network_error .unwrap_or_default(), - runtime, - function_type, }; let catalog_writer = session.catalog_writer()?; diff --git a/src/frontend/src/handler/create_sql_function.rs b/src/frontend/src/handler/create_sql_function.rs index 9b5d34c34abe..c733f603a3c4 100644 --- a/src/frontend/src/handler/create_sql_function.rs +++ b/src/frontend/src/handler/create_sql_function.rs @@ -336,14 +336,13 @@ pub async fn handle_create_sql_function( arg_types: arg_types.into_iter().map(|t| t.into()).collect(), return_type: Some(return_type.into()), language, + runtime: None, identifier: None, body: Some(body), compressed_binary: None, link: None, owner: session.user_id(), always_retry_on_network_error: false, - runtime: None, - function_type: None, }; let catalog_writer = session.catalog_writer()?; diff --git a/src/meta/model/migration/src/lib.rs b/src/meta/model/migration/src/lib.rs index b84a29891eee..cdf3b496b045 100644 --- a/src/meta/model/migration/src/lib.rs +++ b/src/meta/model/migration/src/lib.rs @@ -24,6 +24,7 @@ mod m20240820_081248_add_time_travel_per_table_epoch; mod m20240911_083152_variable_vnode_count; mod m20241016_065621_hummock_gc_history; mod m20241025_062548_singleton_vnode_count; +mod m20241115_085007_remove_function_type; mod utils; pub struct Migrator; @@ -86,6 +87,7 @@ impl MigratorTrait for Migrator { Box::new(m20240911_083152_variable_vnode_count::Migration), Box::new(m20241016_065621_hummock_gc_history::Migration), Box::new(m20241025_062548_singleton_vnode_count::Migration), + Box::new(m20241115_085007_remove_function_type::Migration), ] } } diff --git a/src/meta/model/migration/src/m20241115_085007_remove_function_type.rs b/src/meta/model/migration/src/m20241115_085007_remove_function_type.rs new file mode 100644 index 000000000000..b74382991c88 --- /dev/null +++ b/src/meta/model/migration/src/m20241115_085007_remove_function_type.rs @@ -0,0 +1,35 @@ +use sea_orm_migration::prelude::*; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .alter_table( + Table::alter() + .table(Function::Table) + .drop_column(Function::FunctionType) + .to_owned(), + ) + .await + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .alter_table( + Table::alter() + .table(Function::Table) + .add_column(ColumnDef::new(Function::FunctionType).string()) + .to_owned(), + ) + .await + } +} + +#[derive(DeriveIden)] +enum Function { + Table, + FunctionType, +} diff --git a/src/meta/model/src/function.rs b/src/meta/model/src/function.rs index 0fea52c6c348..48e9812999d6 100644 --- a/src/meta/model/src/function.rs +++ b/src/meta/model/src/function.rs @@ -42,14 +42,13 @@ pub struct Model { pub arg_types: DataTypeArray, pub return_type: DataType, pub language: String, + pub runtime: Option, pub link: Option, pub identifier: Option, pub body: Option, pub compressed_binary: Option>, pub kind: FunctionKind, pub always_retry_on_network_error: bool, - pub runtime: Option, - pub function_type: Option, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] @@ -101,14 +100,13 @@ impl From for ActiveModel { arg_types: Set(DataTypeArray::from(function.arg_types)), return_type: Set(DataType::from(&function.return_type.unwrap())), language: Set(function.language), + runtime: Set(function.runtime), link: Set(function.link), identifier: Set(function.identifier), body: Set(function.body), compressed_binary: Set(function.compressed_binary), kind: Set(function.kind.unwrap().into()), always_retry_on_network_error: Set(function.always_retry_on_network_error), - runtime: Set(function.runtime), - function_type: Set(function.function_type), } } } diff --git a/src/meta/src/controller/mod.rs b/src/meta/src/controller/mod.rs index c7cf45daad9e..0c64461ab2ee 100644 --- a/src/meta/src/controller/mod.rs +++ b/src/meta/src/controller/mod.rs @@ -357,14 +357,13 @@ impl From> for PbFunction { arg_types: value.0.arg_types.to_protobuf(), return_type: Some(value.0.return_type.to_protobuf()), language: value.0.language, + runtime: value.0.runtime, link: value.0.link, identifier: value.0.identifier, body: value.0.body, compressed_binary: value.0.compressed_binary, kind: Some(value.0.kind.into()), always_retry_on_network_error: value.0.always_retry_on_network_error, - runtime: value.0.runtime, - function_type: value.0.function_type, } } } diff --git a/src/sqlparser/src/ast/mod.rs b/src/sqlparser/src/ast/mod.rs index 9c989db50e97..3460465c2a6a 100644 --- a/src/sqlparser/src/ast/mod.rs +++ b/src/sqlparser/src/ast/mod.rs @@ -3098,8 +3098,6 @@ pub struct CreateFunctionBody { pub return_: Option, /// USING ... pub using: Option, - - pub function_type: Option, } impl fmt::Display for CreateFunctionBody { @@ -3122,9 +3120,6 @@ impl fmt::Display for CreateFunctionBody { if let Some(using) = &self.using { write!(f, " {using}")?; } - if let Some(function_type) = &self.function_type { - write!(f, " {function_type}")?; - } Ok(()) } } @@ -3197,26 +3192,6 @@ impl fmt::Display for CreateFunctionUsing { } } -#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Ord, Hash)] -#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] -pub enum CreateFunctionType { - Sync, - Async, - Generator, - AsyncGenerator, -} - -impl fmt::Display for CreateFunctionType { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - CreateFunctionType::Sync => write!(f, "SYNC"), - CreateFunctionType::Async => write!(f, "ASYNC"), - CreateFunctionType::Generator => write!(f, "SYNC GENERATOR"), - CreateFunctionType::AsyncGenerator => write!(f, "ASYNC GENERATOR"), - } - } -} - #[derive(Debug, Clone, PartialEq, Eq, Hash)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub enum SetVariableValue { @@ -3460,12 +3435,11 @@ mod tests { returns: Some(CreateFunctionReturns::Value(DataType::Int)), params: CreateFunctionBody { language: Some(Ident::new_unchecked("python")), + runtime: None, behavior: Some(FunctionBehavior::Immutable), as_: Some(FunctionDefinition::SingleQuotedDef("SELECT 1".to_string())), return_: None, using: None, - runtime: None, - function_type: None, }, with_options: CreateFunctionWithOptions { always_retry_on_network_error: None, @@ -3483,12 +3457,11 @@ mod tests { returns: Some(CreateFunctionReturns::Value(DataType::Int)), params: CreateFunctionBody { language: Some(Ident::new_unchecked("python")), + runtime: None, behavior: Some(FunctionBehavior::Immutable), as_: Some(FunctionDefinition::SingleQuotedDef("SELECT 1".to_string())), return_: None, using: None, - runtime: None, - function_type: None, }, with_options: CreateFunctionWithOptions { always_retry_on_network_error: Some(true), @@ -3498,29 +3471,5 @@ mod tests { "CREATE FUNCTION foo(INT) RETURNS INT LANGUAGE python IMMUTABLE AS 'SELECT 1' WITH ( ALWAYS_RETRY_NETWORK_ERRORS = true )", format!("{}", create_function) ); - - let create_function = Statement::CreateFunction { - temporary: false, - or_replace: false, - name: ObjectName(vec![Ident::new_unchecked("foo")]), - args: Some(vec![OperateFunctionArg::unnamed(DataType::Int)]), - returns: Some(CreateFunctionReturns::Value(DataType::Int)), - params: CreateFunctionBody { - language: Some(Ident::new_unchecked("javascript")), - behavior: None, - as_: Some(FunctionDefinition::SingleQuotedDef("SELECT 1".to_string())), - return_: None, - using: None, - runtime: Some(Ident::new_unchecked("deno")), - function_type: Some(CreateFunctionType::AsyncGenerator), - }, - with_options: CreateFunctionWithOptions { - always_retry_on_network_error: None, - }, - }; - assert_eq!( - "CREATE FUNCTION foo(INT) RETURNS INT LANGUAGE javascript RUNTIME deno AS 'SELECT 1' ASYNC GENERATOR", - format!("{}", create_function) - ); } } diff --git a/src/sqlparser/src/keywords.rs b/src/sqlparser/src/keywords.rs index ead0bec453f0..d3b196d3bcb0 100644 --- a/src/sqlparser/src/keywords.rs +++ b/src/sqlparser/src/keywords.rs @@ -90,7 +90,6 @@ define_keywords!( ASENSITIVE, ASOF, ASYMMETRIC, - ASYNC, AT, ATOMIC, AUTHORIZATION, @@ -252,7 +251,6 @@ define_keywords!( FUNCTIONS, FUSION, GAP, - GENERATOR, GET, GLOBAL, GRANT, diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs index d39a2c1a8f64..f93d41aeed2c 100644 --- a/src/sqlparser/src/parser.rs +++ b/src/sqlparser/src/parser.rs @@ -2365,15 +2365,6 @@ impl Parser<'_> { } else if self.parse_keyword(Keyword::USING) { ensure_not_set(&body.using, "USING")?; body.using = Some(self.parse_create_function_using()?); - } else if self.parse_keyword(Keyword::SYNC) { - ensure_not_set(&body.function_type, "SYNC | ASYNC")?; - body.function_type = Some(self.parse_function_type(false, false)?); - } else if self.parse_keyword(Keyword::ASYNC) { - ensure_not_set(&body.function_type, "SYNC | ASYNC")?; - body.function_type = Some(self.parse_function_type(true, false)?); - } else if self.parse_keyword(Keyword::GENERATOR) { - ensure_not_set(&body.function_type, "SYNC | ASYNC")?; - body.function_type = Some(self.parse_function_type(false, true)?); } else { return Ok(body); } @@ -2396,25 +2387,6 @@ impl Parser<'_> { } } - fn parse_function_type( - &mut self, - is_async: bool, - is_generator: bool, - ) -> PResult { - let is_generator = if is_generator { - true - } else { - self.parse_keyword(Keyword::GENERATOR) - }; - - match (is_async, is_generator) { - (false, false) => Ok(CreateFunctionType::Sync), - (true, false) => Ok(CreateFunctionType::Async), - (false, true) => Ok(CreateFunctionType::Generator), - (true, true) => Ok(CreateFunctionType::AsyncGenerator), - } - } - // CREATE USER name [ [ WITH ] option [ ... ] ] // where option can be: // SUPERUSER | NOSUPERUSER diff --git a/src/sqlparser/tests/sqlparser_postgres.rs b/src/sqlparser/tests/sqlparser_postgres.rs index 311b2ba213c4..549920d1c758 100644 --- a/src/sqlparser/tests/sqlparser_postgres.rs +++ b/src/sqlparser/tests/sqlparser_postgres.rs @@ -874,31 +874,6 @@ fn parse_create_function() { with_options: Default::default(), } ); - - let sql = "CREATE FUNCTION add(INT, INT) RETURNS INT LANGUAGE SQL IMMUTABLE AS 'select $1 + $2;' ASYNC"; - assert_eq!( - verified_stmt(sql), - Statement::CreateFunction { - or_replace: false, - temporary: false, - name: ObjectName(vec![Ident::new_unchecked("add")]), - args: Some(vec![ - OperateFunctionArg::unnamed(DataType::Int), - OperateFunctionArg::unnamed(DataType::Int), - ]), - returns: Some(CreateFunctionReturns::Value(DataType::Int)), - params: CreateFunctionBody { - language: Some("SQL".into()), - behavior: Some(FunctionBehavior::Immutable), - as_: Some(FunctionDefinition::SingleQuotedDef( - "select $1 + $2;".into() - )), - function_type: Some(CreateFunctionType::Async), - ..Default::default() - }, - with_options: Default::default(), - } - ); } #[test]