From 221486904408f035e09928d210040a10c64fb305 Mon Sep 17 00:00:00 2001 From: Runji Wang Date: Sun, 14 Jan 2024 00:32:55 +0800 Subject: [PATCH] support table function Signed-off-by: Runji Wang --- e2e_test/udf/js_udf.slt | 19 ++++++++++ .../core/src/table_function/user_defined.rs | 37 +++++++++++++++++-- 2 files changed, 52 insertions(+), 4 deletions(-) diff --git a/e2e_test/udf/js_udf.slt b/e2e_test/udf/js_udf.slt index 0a64dcd535cd9..44a103d3c02ee 100644 --- a/e2e_test/udf/js_udf.slt +++ b/e2e_test/udf/js_udf.slt @@ -21,6 +21,13 @@ create function to_string(a boolean, b smallint, c int, d bigint, e real, f floa return a.toString() + b.toString() + c.toString() + d.toString() + e.toString() + f.toString() + g.toString(); $$; +statement ok +create function series(n int) returns table (x int) language javascript as $$ + for(let i = 0; i < n; i++) { + yield i; + } +$$; + query I select int_42(); ---- @@ -36,6 +43,15 @@ select to_string(false, 1::smallint, 2, 3, 4.5, 6.7, 'abc'); ---- false1234.56.7abc +query I +select series(5); +---- +0 +1 +2 +3 +4 + statement ok drop function int_42; @@ -44,3 +60,6 @@ drop function gcd; statement ok drop function to_string; + +statement ok +drop function series; diff --git a/src/expr/core/src/table_function/user_defined.rs b/src/expr/core/src/table_function/user_defined.rs index ffe0ba39db86b..06383543ceb7b 100644 --- a/src/expr/core/src/table_function/user_defined.rs +++ b/src/expr/core/src/table_function/user_defined.rs @@ -16,6 +16,7 @@ use std::sync::Arc; use arrow_array::RecordBatch; use arrow_schema::{Field, Fields, Schema, SchemaRef}; +use arrow_udf_js::{CallMode, Runtime as JsRuntime}; use arrow_udf_wasm::Runtime as WasmRuntime; use cfg_or_panic::cfg_or_panic; use futures_util::stream; @@ -42,6 +43,7 @@ pub struct UserDefinedTableFunction { enum UdfImpl { External(Arc), Wasm(Arc), + JavaScript(JsRuntime), } #[async_trait::async_trait] @@ -70,6 +72,11 @@ impl UdfImpl { yield res?; } } + UdfImpl::JavaScript(runtime) => { + for res in runtime.call_table_function(identifier, &input, 1024)? { + yield res?; + } + } UdfImpl::Wasm(runtime) => { for res in runtime.call_table_function(identifier, &input)? { yield res?; @@ -177,9 +184,12 @@ pub fn new_user_defined(prost: &PbTableFunction, chunk_size: usize) -> Result()?, )); - let link = udtf.get_link()?; + let identifier = udtf.get_identifier()?; + let return_type = DataType::from(prost.get_return_type()?); + let client = match udtf.language.as_str() { "wasm" => { + let link = udtf.get_link()?; // Use `block_in_place` as an escape hatch to run async code here in sync context. // Calling `block_on` directly will panic. UdfImpl::Wasm(tokio::task::block_in_place(|| { @@ -187,16 +197,35 @@ pub fn new_user_defined(prost: &PbTableFunction, chunk_size: usize) -> Result { + let mut rt = JsRuntime::new()?; + let body = format!( + "export function* {}({}) {{ {} }}", + identifier, + udtf.arg_names.join(","), + udtf.get_body()? + ); + rt.add_function( + identifier, + arrow_schema::DataType::try_from(&return_type)?, + CallMode::CalledOnNullInput, + &body, + )?; + UdfImpl::JavaScript(rt) + } // connect to UDF service - _ => UdfImpl::External(crate::expr::expr_udf::get_or_create_flight_client(link)?), + _ => { + let link = udtf.get_link()?; + UdfImpl::External(crate::expr::expr_udf::get_or_create_flight_client(link)?) + } }; Ok(UserDefinedTableFunction { children: prost.args.iter().map(expr_build_from_prost).try_collect()?, - return_type: prost.return_type.as_ref().expect("no return type").into(), + return_type, arg_schema, client, - identifier: udtf.get_identifier()?.clone(), + identifier: identifier.clone(), chunk_size, } .boxed())