Skip to content

Commit

Permalink
support table function
Browse files: browse the repository at this point in the history.
Signed-off-by: Runji Wang <[email protected]>
  • Loading branch information
wangrunji0408 committed Jan 13, 2024
1 parent bad4644 commit 2214869
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 4 deletions.
19 changes: 19 additions & 0 deletions e2e_test/udf/js_udf.slt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@ create function to_string(a boolean, b smallint, c int, d bigint, e real, f floa
return a.toString() + b.toString() + c.toString() + d.toString() + e.toString() + f.toString() + g.toString();
$$;

statement ok
create function series(n int) returns table (x int) language javascript as $$
for(let i = 0; i < n; i++) {
yield i;
}
$$;

query I
select int_42();
----
Expand All @@ -36,6 +43,15 @@ select to_string(false, 1::smallint, 2, 3, 4.5, 6.7, 'abc');
----
false1234.56.7abc

query I
select series(5);
----
0
1
2
3
4

statement ok
drop function int_42;

Expand All @@ -44,3 +60,6 @@ drop function gcd;

statement ok
drop function to_string;

statement ok
drop function series;
37 changes: 33 additions & 4 deletions src/expr/core/src/table_function/user_defined.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::sync::Arc;

use arrow_array::RecordBatch;
use arrow_schema::{Field, Fields, Schema, SchemaRef};
use arrow_udf_js::{CallMode, Runtime as JsRuntime};
use arrow_udf_wasm::Runtime as WasmRuntime;
use cfg_or_panic::cfg_or_panic;
use futures_util::stream;
Expand All @@ -42,6 +43,7 @@ pub struct UserDefinedTableFunction {
/// Execution backend for a user-defined table function.
///
/// `new_user_defined` picks exactly one variant per function, keyed on the
/// function's declared language string.
enum UdfImpl {
/// Client for an external UDF service, obtained through
/// `get_or_create_flight_client`. This is the fallback for every language
/// other than `"wasm"` and `"javascript"`.
External(Arc<ArrowFlightUdfClient>),
/// WebAssembly runtime (`arrow_udf_wasm::Runtime`) held behind an `Arc`,
/// obtained through `get_or_create_wasm_runtime`.
Wasm(Arc<WasmRuntime>),
/// JavaScript runtime (`arrow_udf_js::Runtime`). Unlike the other variants
/// it is stored directly, not behind an `Arc`: a fresh runtime is created
/// for each table function and the function body is registered in it.
JavaScript(JsRuntime),
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -70,6 +72,11 @@ impl UdfImpl {
yield res?;
}
}
UdfImpl::JavaScript(runtime) => {
for res in runtime.call_table_function(identifier, &input, 1024)? {
yield res?;
}
}
UdfImpl::Wasm(runtime) => {
for res in runtime.call_table_function(identifier, &input)? {
yield res?;
Expand Down Expand Up @@ -177,26 +184,48 @@ pub fn new_user_defined(prost: &PbTableFunction, chunk_size: usize) -> Result<Bo
.try_collect::<_, Fields, _>()?,
));

let link = udtf.get_link()?;
let identifier = udtf.get_identifier()?;
let return_type = DataType::from(prost.get_return_type()?);

let client = match udtf.language.as_str() {
"wasm" => {
let link = udtf.get_link()?;
// Use `block_in_place` as an escape hatch to run async code here in sync context.
// Calling `block_on` directly will panic.
UdfImpl::Wasm(tokio::task::block_in_place(|| {
tokio::runtime::Handle::current()
.block_on(crate::expr::expr_udf::get_or_create_wasm_runtime(link))
})?)
}
"javascript" => {
let mut rt = JsRuntime::new()?;
let body = format!(
"export function* {}({}) {{ {} }}",
identifier,
udtf.arg_names.join(","),
udtf.get_body()?
);
rt.add_function(
identifier,
arrow_schema::DataType::try_from(&return_type)?,
CallMode::CalledOnNullInput,
&body,
)?;
UdfImpl::JavaScript(rt)
}
// connect to UDF service
_ => UdfImpl::External(crate::expr::expr_udf::get_or_create_flight_client(link)?),
_ => {
let link = udtf.get_link()?;
UdfImpl::External(crate::expr::expr_udf::get_or_create_flight_client(link)?)
}
};

Ok(UserDefinedTableFunction {
children: prost.args.iter().map(expr_build_from_prost).try_collect()?,
return_type: prost.return_type.as_ref().expect("no return type").into(),
return_type,
arg_schema,
client,
identifier: udtf.get_identifier()?.clone(),
identifier: identifier.clone(),
chunk_size,
}
.boxed())
Expand Down

0 comments on commit 2214869

Please sign in to comment.