Skip to content

Commit

Permalink
refactor(udf): remove function_type field previously used by Deno U…
Browse files Browse the repository at this point in the history
…DF (#19404)

Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc authored Nov 17, 2024
1 parent 0519b3f commit 94154c0
Show file tree
Hide file tree
Showing 19 changed files with 75 additions and 166 deletions.
6 changes: 3 additions & 3 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -301,10 +301,10 @@ message Function {
// The zstd-compressed binary of the function.
optional bytes compressed_binary = 17;
bool always_retry_on_network_error = 16;
// The runtime used when javascript is used as the language. Could be "quickjs" or "deno".
// The runtime selected when multiple runtimes are available for the language. Now is not used.
optional string runtime = 18;
// The function type, which is used to execute the function. Could be "sync", "async", "generator" or "async_generator"
optional string function_type = 19;
reserved 19;
reserved "function_type";

oneof kind {
ScalarFunction scalar = 11;
Expand Down
9 changes: 5 additions & 4 deletions proto/expr.proto
Original file line number Diff line number Diff line change
Expand Up @@ -610,10 +610,10 @@ message UserDefinedFunction {
// - If `language` is `rust` or `wasm`, the zstd-compressed wasm binary.
optional bytes compressed_binary = 10;
bool always_retry_on_network_error = 9;
// The runtime used when javascript is used as the language. Could be "quickjs" or "deno".
// The runtime selected when multiple runtimes are available for the language. Now is not used.
optional string runtime = 11;
// The function type, which is used to execute the function. Could be "sync", "async", "generator" or "async_generator"
optional string function_type = 12;
reserved 12;
reserved "function_type";
}

// Additional information for user defined table/aggregate functions.
Expand All @@ -627,5 +627,6 @@ message UserDefinedFunctionMetadata {
optional string body = 7;
optional bytes compressed_binary = 10;
optional string runtime = 11;
optional string function_type = 12;
reserved 12;
reserved "function_type";
}
1 change: 0 additions & 1 deletion src/expr/core/src/aggregate/user_defined.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,6 @@ pub fn new_user_defined(
arg_names: &udf.arg_names,
return_type,
always_retry_on_network_error: false,
function_type: udf.function_type.as_deref(),
})
.context("failed to build UDF runtime")?;

Expand Down
1 change: 0 additions & 1 deletion src/expr/core/src/expr/expr_udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,6 @@ impl Build for UserDefinedFunction {
arg_names: &udf.arg_names,
return_type: &return_type,
always_retry_on_network_error: udf.always_retry_on_network_error,
function_type: udf.function_type.as_deref(),
})
.context("failed to build UDF runtime")?;

Expand Down
1 change: 0 additions & 1 deletion src/expr/core/src/sig/udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ pub struct UdfOptions<'a> {
pub arg_names: &'a [String],
pub return_type: &'a DataType,
pub always_retry_on_network_error: bool,
pub function_type: Option<&'a str>,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EnumAsInner)]
Expand Down
1 change: 0 additions & 1 deletion src/expr/core/src/table_function/user_defined.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,6 @@ pub fn new_user_defined(prost: &PbTableFunction, chunk_size: usize) -> Result<Bo
arg_names: &udf.arg_names,
return_type: &return_type,
always_retry_on_network_error: false,
function_type: udf.function_type.as_deref(),
})
.context("failed to build UDF runtime")?;

Expand Down
9 changes: 3 additions & 6 deletions src/frontend/src/catalog/function_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,12 @@ pub struct FunctionCatalog {
pub arg_types: Vec<DataType>,
pub return_type: DataType,
pub language: String,
pub runtime: Option<String>,
pub identifier: Option<String>,
pub body: Option<String>,
pub link: Option<String>,
pub compressed_binary: Option<Vec<u8>>,
pub always_retry_on_network_error: bool,
pub function_type: Option<String>,
pub runtime: Option<String>,
}

#[derive(Clone, Display, PartialEq, Eq, Hash, Debug, EnumAsInner)]
Expand Down Expand Up @@ -71,13 +70,12 @@ impl From<&PbFunction> for FunctionCatalog {
arg_types: prost.arg_types.iter().map(|arg| arg.into()).collect(),
return_type: prost.return_type.as_ref().expect("no return type").into(),
language: prost.language.clone(),
runtime: prost.runtime.clone(),
identifier: prost.identifier.clone(),
body: prost.body.clone(),
link: prost.link.clone(),
compressed_binary: prost.compressed_binary.clone(),
always_retry_on_network_error: prost.always_retry_on_network_error,
function_type: prost.function_type.clone(),
runtime: prost.runtime.clone(),
}
}
}
Expand All @@ -89,12 +87,11 @@ impl From<&FunctionCatalog> for PbUserDefinedFunctionMetadata {
arg_types: c.arg_types.iter().map(|t| t.to_protobuf()).collect(),
return_type: Some(c.return_type.to_protobuf()),
language: c.language.clone(),
runtime: c.runtime.clone(),
link: c.link.clone(),
identifier: c.identifier.clone(),
body: c.body.clone(),
compressed_binary: c.compressed_binary.clone(),
function_type: c.function_type.clone(),
runtime: c.runtime.clone(),
}
}
}
Expand Down
10 changes: 4 additions & 6 deletions src/frontend/src/expr/user_defined_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,21 +47,20 @@ impl UserDefinedFunction {
let catalog = FunctionCatalog {
// FIXME(yuhao): function id is not in udf proto.
id: FunctionId::placeholder(),
name: udf.get_name().clone(),
name: udf.name.clone(),
// FIXME(yuhao): owner is not in udf proto.
owner: u32::MAX - 1,
kind: FunctionKind::Scalar,
arg_names: udf.arg_names.clone(),
arg_types,
return_type,
language: udf.get_language().clone(),
language: udf.language.clone(),
runtime: udf.runtime.clone(),
identifier: udf.identifier.clone(),
body: udf.body.clone(),
link: udf.link.clone(),
compressed_binary: udf.compressed_binary.clone(),
always_retry_on_network_error: udf.always_retry_on_network_error,
function_type: udf.function_type.clone(),
runtime: udf.runtime.clone(),
};

Ok(Self {
Expand Down Expand Up @@ -93,13 +92,12 @@ impl Expr for UserDefinedFunction {
.map(|t| t.to_protobuf())
.collect(),
language: self.catalog.language.clone(),
runtime: self.catalog.runtime.clone(),
identifier: self.catalog.identifier.clone(),
link: self.catalog.link.clone(),
body: self.catalog.body.clone(),
compressed_binary: self.catalog.compressed_binary.clone(),
always_retry_on_network_error: self.catalog.always_retry_on_network_error,
function_type: self.catalog.function_type.clone(),
runtime: self.catalog.runtime.clone(),
})),
}
}
Expand Down
20 changes: 11 additions & 9 deletions src/frontend/src/handler/create_aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,16 @@ pub async fn handle_create_aggregate(
None => return Err(ErrorCode::InvalidParameterValue("no language".into()).into()),
};

let runtime = match params.runtime {
Some(_) => {
return Err(ErrorCode::InvalidParameterValue(
"runtime selection is currently not supported".to_string(),
)
.into());
}
None => None,
};

let return_type = bind_data_type(&returns)?;

let mut arg_names = vec![];
Expand Down Expand Up @@ -94,13 +104,6 @@ pub async fn handle_create_aggregate(
}
_ => None,
};
let function_type = match params.function_type {
Some(CreateFunctionType::Sync) => Some("sync".to_string()),
Some(CreateFunctionType::Async) => Some("async".to_string()),
Some(CreateFunctionType::Generator) => Some("generator".to_string()),
Some(CreateFunctionType::AsyncGenerator) => Some("async_generator".to_string()),
None => None,
};

let create_fn = risingwave_expr::sig::find_udf_impl(&language, None, link)?.create_fn;
let output = create_fn(CreateFunctionOptions {
Expand All @@ -124,14 +127,13 @@ pub async fn handle_create_aggregate(
arg_types: arg_types.into_iter().map(|t| t.into()).collect(),
return_type: Some(return_type.into()),
language,
runtime,
identifier: Some(output.identifier),
link: link.map(|s| s.to_string()),
body: output.body,
compressed_binary: output.compressed_binary,
owner: session.user_id(),
always_retry_on_network_error: false,
runtime: None,
function_type,
};

let catalog_writer = session.catalog_writer()?;
Expand Down
24 changes: 6 additions & 18 deletions src/frontend/src/handler/create_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,11 @@ pub async fn handle_create_function(
};

let runtime = match params.runtime {
Some(runtime) => {
if language == "javascript" {
Some(runtime.real_value())
} else {
return Err(ErrorCode::InvalidParameterValue(
"runtime is only supported for javascript".to_string(),
)
.into());
}
Some(_) => {
return Err(ErrorCode::InvalidParameterValue(
"runtime selection is currently not supported".to_string(),
)
.into());
}
None => None,
};
Expand Down Expand Up @@ -141,13 +137,6 @@ pub async fn handle_create_function(
}
_ => None,
};
let function_type = match params.function_type {
Some(CreateFunctionType::Sync) => Some("sync".to_string()),
Some(CreateFunctionType::Async) => Some("async".to_string()),
Some(CreateFunctionType::Generator) => Some("generator".to_string()),
Some(CreateFunctionType::AsyncGenerator) => Some("async_generator".to_string()),
None => None,
};

let create_fn =
risingwave_expr::sig::find_udf_impl(&language, runtime.as_deref(), link)?.create_fn;
Expand Down Expand Up @@ -176,6 +165,7 @@ pub async fn handle_create_function(
arg_types: arg_types.into_iter().map(|t| t.into()).collect(),
return_type: Some(return_type.into()),
language,
runtime,
identifier: Some(output.identifier),
link: link.map(|s| s.to_string()),
body: output.body,
Expand All @@ -184,8 +174,6 @@ pub async fn handle_create_function(
always_retry_on_network_error: with_options
.always_retry_on_network_error
.unwrap_or_default(),
runtime,
function_type,
};

let catalog_writer = session.catalog_writer()?;
Expand Down
3 changes: 1 addition & 2 deletions src/frontend/src/handler/create_sql_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,14 +336,13 @@ pub async fn handle_create_sql_function(
arg_types: arg_types.into_iter().map(|t| t.into()).collect(),
return_type: Some(return_type.into()),
language,
runtime: None,
identifier: None,
body: Some(body),
compressed_binary: None,
link: None,
owner: session.user_id(),
always_retry_on_network_error: false,
runtime: None,
function_type: None,
};

let catalog_writer = session.catalog_writer()?;
Expand Down
2 changes: 2 additions & 0 deletions src/meta/model/migration/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ mod m20240820_081248_add_time_travel_per_table_epoch;
mod m20240911_083152_variable_vnode_count;
mod m20241016_065621_hummock_gc_history;
mod m20241025_062548_singleton_vnode_count;
mod m20241115_085007_remove_function_type;
mod utils;

pub struct Migrator;
Expand Down Expand Up @@ -86,6 +87,7 @@ impl MigratorTrait for Migrator {
Box::new(m20240911_083152_variable_vnode_count::Migration),
Box::new(m20241016_065621_hummock_gc_history::Migration),
Box::new(m20241025_062548_singleton_vnode_count::Migration),
Box::new(m20241115_085007_remove_function_type::Migration),
]
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use sea_orm_migration::prelude::*;

#[derive(DeriveMigrationName)]
pub struct Migration;

#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.alter_table(
Table::alter()
.table(Function::Table)
.drop_column(Function::FunctionType)
.to_owned(),
)
.await
}

async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.alter_table(
Table::alter()
.table(Function::Table)
.add_column(ColumnDef::new(Function::FunctionType).string())
.to_owned(),
)
.await
}
}

#[derive(DeriveIden)]
enum Function {
Table,
FunctionType,
}
6 changes: 2 additions & 4 deletions src/meta/model/src/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,13 @@ pub struct Model {
pub arg_types: DataTypeArray,
pub return_type: DataType,
pub language: String,
pub runtime: Option<String>,
pub link: Option<String>,
pub identifier: Option<String>,
pub body: Option<String>,
pub compressed_binary: Option<Vec<u8>>,
pub kind: FunctionKind,
pub always_retry_on_network_error: bool,
pub runtime: Option<String>,
pub function_type: Option<String>,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
Expand Down Expand Up @@ -101,14 +100,13 @@ impl From<PbFunction> for ActiveModel {
arg_types: Set(DataTypeArray::from(function.arg_types)),
return_type: Set(DataType::from(&function.return_type.unwrap())),
language: Set(function.language),
runtime: Set(function.runtime),
link: Set(function.link),
identifier: Set(function.identifier),
body: Set(function.body),
compressed_binary: Set(function.compressed_binary),
kind: Set(function.kind.unwrap().into()),
always_retry_on_network_error: Set(function.always_retry_on_network_error),
runtime: Set(function.runtime),
function_type: Set(function.function_type),
}
}
}
3 changes: 1 addition & 2 deletions src/meta/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,14 +357,13 @@ impl From<ObjectModel<function::Model>> for PbFunction {
arg_types: value.0.arg_types.to_protobuf(),
return_type: Some(value.0.return_type.to_protobuf()),
language: value.0.language,
runtime: value.0.runtime,
link: value.0.link,
identifier: value.0.identifier,
body: value.0.body,
compressed_binary: value.0.compressed_binary,
kind: Some(value.0.kind.into()),
always_retry_on_network_error: value.0.always_retry_on_network_error,
runtime: value.0.runtime,
function_type: value.0.function_type,
}
}
}
Loading

0 comments on commit 94154c0

Please sign in to comment.