diff --git a/Cargo.lock b/Cargo.lock index 090129a564581..f89c6646b2cca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -293,14 +293,14 @@ dependencies = [ [[package]] name = "arrow-arith" -version = "49.0.0" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34ccd45e217ffa6e53bbb0080990e77113bdd4e91ddb84e97b77649810bcf1a7" +checksum = "753abd0a5290c1bcade7c6623a556f7d1659c5f4148b140b5b63ce7bd1a45705" dependencies = [ - "arrow-array 49.0.0", - "arrow-buffer 49.0.0", - "arrow-data 49.0.0", - "arrow-schema 49.0.0", + "arrow-array 50.0.0", + "arrow-buffer 50.0.0", + "arrow-data 50.0.0", + "arrow-schema 50.0.0", "chrono", "half 2.3.1", "num", @@ -325,14 +325,14 @@ dependencies = [ [[package]] name = "arrow-array" -version = "49.0.0" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6bda9acea48b25123c08340f3a8ac361aa0f74469bb36f5ee9acf923fce23e9d" +checksum = "d390feeb7f21b78ec997a4081a025baef1e2e0d6069e181939b61864c9779609" dependencies = [ "ahash 0.8.6", - "arrow-buffer 49.0.0", - "arrow-data 49.0.0", - "arrow-schema 49.0.0", + "arrow-buffer 50.0.0", + "arrow-data 50.0.0", + "arrow-schema 50.0.0", "chrono", "half 2.3.1", "hashbrown 0.14.0", @@ -352,9 +352,9 @@ dependencies = [ [[package]] name = "arrow-buffer" -version = "49.0.0" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01a0fc21915b00fc6c2667b069c1b64bdd920982f426079bc4a7cab86822886c" +checksum = "69615b061701bcdffbc62756bc7e85c827d5290b472b580c972ebbbf690f5aa4" dependencies = [ "bytes", "half 2.3.1", @@ -381,15 +381,15 @@ dependencies = [ [[package]] name = "arrow-cast" -version = "49.0.0" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5dc0368ed618d509636c1e3cc20db1281148190a78f43519487b2daf07b63b4a" +checksum = "e448e5dd2f4113bf5b74a1f26531708f5edcacc77335b7066f9398f4bcf4cdef" dependencies = [ - "arrow-array 49.0.0", - "arrow-buffer 49.0.0", - "arrow-data 49.0.0", - "arrow-schema 49.0.0", - "arrow-select 49.0.0", + "arrow-array 50.0.0", + "arrow-buffer 50.0.0", + "arrow-data 50.0.0", + "arrow-schema 50.0.0", + "arrow-select 50.0.0", "base64 0.21.4", "chrono", "half 2.3.1", @@ -430,27 +430,27 @@ dependencies = [ [[package]] name = "arrow-data" -version = "49.0.0" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "907fafe280a3874474678c1858b9ca4cb7fd83fb8034ff5b6d6376205a08c634" +checksum = "67d644b91a162f3ad3135ce1184d0a31c28b816a581e08f29e8e9277a574c64e" dependencies = [ - "arrow-buffer 49.0.0", - "arrow-schema 49.0.0", + "arrow-buffer 50.0.0", + "arrow-schema 50.0.0", "half 2.3.1", "num", ] [[package]] name = "arrow-flight" -version = "49.0.0" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "624e0dcb6b5a7a06222bfd2be3f7e905ce849a6b714ec989f18cdba330c77d38" +checksum = "1d7f215461ad6346f2e4cc853e377d4e076d533e1ed78d327debe83023e3601f" dependencies = [ - "arrow-array 49.0.0", - "arrow-buffer 49.0.0", - "arrow-cast 49.0.0", - "arrow-ipc 49.0.0", - "arrow-schema 49.0.0", + "arrow-array 50.0.0", + "arrow-buffer 50.0.0", + "arrow-cast 50.0.0", + "arrow-ipc 50.0.0", + "arrow-schema 50.0.0", "base64 0.21.4", "bytes", "futures", @@ -476,15 +476,15 @@ dependencies = [ [[package]] name = "arrow-ipc" -version = "49.0.0" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "79a43d6808411886b8c7d4f6f7dd477029c1e77ffffffb7923555cc6579639cd" +checksum = "03dea5e79b48de6c2e04f03f62b0afea7105be7b77d134f6c5414868feefb80d" dependencies = [ - "arrow-array 49.0.0", - "arrow-buffer 49.0.0", - "arrow-cast 49.0.0", - "arrow-data 49.0.0", - "arrow-schema 49.0.0", + "arrow-array 50.0.0", + "arrow-buffer 50.0.0", + "arrow-cast 50.0.0", + "arrow-data 50.0.0", + "arrow-schema 50.0.0", "flatbuffers", ] @@ -525,15 +525,15 @@ dependencies = [ [[package]] name = "arrow-ord" -version = "49.0.0" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b23b0e53c0db57c6749997fd343d4c0354c994be7eca67152dd2bdb9a3e1bb4" +checksum = "1ed9630979034077982d8e74a942b7ac228f33dd93a93b615b4d02ad60c260be" dependencies = [ - "arrow-array 49.0.0", - "arrow-buffer 49.0.0", - "arrow-data 49.0.0", - "arrow-schema 49.0.0", - "arrow-select 49.0.0", + "arrow-array 50.0.0", + "arrow-buffer 50.0.0", + "arrow-data 50.0.0", + "arrow-schema 50.0.0", + "arrow-select 50.0.0", "half 2.3.1", "num", ] @@ -555,15 +555,15 @@ dependencies = [ [[package]] name = "arrow-row" -version = "49.0.0" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "361249898d2d6d4a6eeb7484be6ac74977e48da12a4dd81a708d620cc558117a" +checksum = "007035e17ae09c4e8993e4cb8b5b96edf0afb927cd38e2dff27189b274d83dcf" dependencies = [ "ahash 0.8.6", - "arrow-array 49.0.0", - "arrow-buffer 49.0.0", - "arrow-data 49.0.0", - "arrow-schema 49.0.0", + "arrow-array 50.0.0", + "arrow-buffer 50.0.0", + "arrow-data 50.0.0", + "arrow-schema 50.0.0", "half 2.3.1", "hashbrown 0.14.0", ] @@ -579,9 +579,9 @@ dependencies = [ [[package]] name = "arrow-schema" -version = "49.0.0" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09e28a5e781bf1b0f981333684ad13f5901f4cd2f20589eab7cf1797da8fc167" +checksum = "0ff3e9c01f7cd169379d269f926892d0e622a704960350d09d331be3ec9e0029" [[package]] name = "arrow-select" @@ -599,15 +599,15 @@ dependencies = [ [[package]] name = "arrow-select" -version = "49.0.0" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4f6208466590960efc1d2a7172bc4ff18a67d6e25c529381d7f96ddaf0dc4036" +checksum = "1ce20973c1912de6514348e064829e50947e35977bb9d7fb637dc99ea9ffd78c" dependencies = [ "ahash 0.8.6", - "arrow-array 49.0.0", - "arrow-buffer 49.0.0", - "arrow-data 49.0.0", - "arrow-schema 49.0.0", + "arrow-array 50.0.0", + "arrow-buffer 50.0.0", + "arrow-data 50.0.0", + "arrow-schema 50.0.0", "num", ] @@ -627,15 +627,28 @@ dependencies = [ "regex-syntax 0.8.0", ] +[[package]] +name = "arrow-udf-js" +version = "0.1.0" +source = "git+https://github.com/risingwavelabs/arrow-udf.git?rev=7ba1c22#7ba1c226fa2f7418a217ee064a19b90efeb7143c" +dependencies = [ + "anyhow", + "arrow-array 50.0.0", + "arrow-buffer 50.0.0", + "arrow-schema 50.0.0", + "rquickjs", +] + [[package]] name = "arrow-udf-wasm" version = "0.1.0" -source = "git+https://github.com/risingwavelabs/arrow-udf.git?rev=f9a9e0d#f9a9e0d41d1a4ae26a6d90ac8aebf2e38a0c8a55" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c45450e29b016fdc0ccbb22408daceead464a2178f7f8408886c7ca0c9e1aed" dependencies = [ "anyhow", - "arrow-array 49.0.0", - "arrow-ipc 49.0.0", - "arrow-schema 49.0.0", + "arrow-array 50.0.0", + "arrow-ipc 50.0.0", + "arrow-schema 50.0.0", "base64 0.21.4", "genawaiter", "lazy_static", @@ -5064,14 +5077,14 @@ source = "git+https://github.com/icelake-io/icelake?rev=32c0bbf242f5c47b1e743f10 dependencies = [ "anyhow", "apache-avro 0.17.0", - "arrow-arith 49.0.0", - "arrow-array 49.0.0", - "arrow-buffer 49.0.0", - "arrow-cast 49.0.0", - "arrow-ord 49.0.0", - "arrow-row 49.0.0", - "arrow-schema 49.0.0", - "arrow-select 49.0.0", + "arrow-arith 50.0.0", + "arrow-array 50.0.0", + "arrow-buffer 50.0.0", + "arrow-cast 50.0.0", + "arrow-ord 50.0.0", + "arrow-row 50.0.0", + "arrow-schema 50.0.0", + "arrow-select 50.0.0", "async-trait", "bitvec", "bytes", @@ -5088,7 +5101,7 @@ dependencies = [ "once_cell", "opendal", "ordered-float 3.9.1", - "parquet 49.0.0", + "parquet 50.0.0", "prometheus", "regex", "reqwest", @@ -7074,24 +7087,25 @@ dependencies = [ [[package]] name = "parquet" -version = "49.0.0" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "af88740a842787da39b3d69ce5fbf6fce97d20211d3b299fee0a0da6430c74d4" +checksum = "547b92ebf0c1177e3892f44c8f79757ee62e678d564a9834189725f2c5b7a750" dependencies = [ "ahash 0.8.6", - "arrow-array 49.0.0", - "arrow-buffer 49.0.0", - "arrow-cast 49.0.0", - "arrow-data 49.0.0", - "arrow-ipc 49.0.0", - "arrow-schema 49.0.0", - "arrow-select 49.0.0", + "arrow-array 50.0.0", + "arrow-buffer 50.0.0", + "arrow-cast 50.0.0", + "arrow-data 50.0.0", + "arrow-ipc 50.0.0", + "arrow-schema 50.0.0", + "arrow-select 50.0.0", "base64 0.21.4", "brotli", "bytes", "chrono", "flate2", "futures", + "half 2.3.1", "hashbrown 0.14.0", "lz4_flex", "num", @@ -7820,7 +7834,7 @@ checksum = "8bdf592881d821b83d471f8af290226c8d51402259e9bb5be7f9f8bdebbb11ac" dependencies = [ "bytes", "heck 0.4.1", - "itertools 0.11.0", + "itertools 0.10.5", "log", "multimap 0.8.3", "once_cell", @@ -7854,7 +7868,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "265baba7fabd416cf5078179f7d2cbeca4ce7a9041111900675ea7c4cb8a4c32" dependencies = [ "anyhow", - "itertools 0.11.0", + "itertools 0.10.5", "proc-macro2", "quote", "syn 2.0.48", @@ -8690,13 +8704,13 @@ dependencies = [ "anyhow", "arc-swap", "arrow-array 48.0.1", - "arrow-array 49.0.0", + "arrow-array 50.0.0", "arrow-buffer 48.0.1", - "arrow-buffer 49.0.0", + "arrow-buffer 50.0.0", "arrow-cast 48.0.1", - "arrow-cast 49.0.0", + "arrow-cast 50.0.0", "arrow-schema 48.0.1", - "arrow-schema 49.0.0", + "arrow-schema 50.0.0", "async-trait", "auto_enums", "auto_impl", @@ -8935,8 +8949,8 @@ version = "1.7.0-alpha" dependencies = [ "anyhow", "apache-avro 0.16.0", - "arrow-array 49.0.0", - "arrow-schema 49.0.0", + "arrow-array 50.0.0", + "arrow-schema 50.0.0", "async-nats", "async-trait", "auto_enums", @@ -9104,8 +9118,9 @@ name = "risingwave_expr" version = "1.7.0-alpha" dependencies = [ "anyhow", - "arrow-array 49.0.0", - "arrow-schema 49.0.0", + "arrow-array 50.0.0", + "arrow-schema 50.0.0", + "arrow-udf-js", "arrow-udf-wasm", "async-trait", "auto_impl", @@ -9145,7 +9160,7 @@ version = "1.7.0-alpha" dependencies = [ "aho-corasick", "anyhow", - "arrow-schema 49.0.0", + "arrow-schema 50.0.0", "async-trait", "auto_enums", "chrono", @@ -9194,7 +9209,7 @@ version = "1.7.0-alpha" dependencies = [ "anyhow", "arc-swap", - "arrow-schema 49.0.0", + "arrow-schema 50.0.0", "arrow-udf-wasm", "assert_matches", "async-recursion", @@ -9984,10 +9999,10 @@ dependencies = [ name = "risingwave_udf" version = "0.1.0" dependencies = [ - "arrow-array 49.0.0", + "arrow-array 50.0.0", "arrow-flight", - "arrow-schema 49.0.0", - "arrow-select 49.0.0", + "arrow-schema 50.0.0", + "arrow-select 50.0.0", "cfg-or-panic", "futures-util", "madsim-tokio", @@ -10062,6 +10077,33 @@ dependencies = [ "retain_mut", ] +[[package]] +name = "rquickjs" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0db265d331ae1b1a9fdb68466a8359bc9dcc5e78a9c323f790322f8442e005ac" +dependencies = [ + "rquickjs-core", +] + +[[package]] +name = "rquickjs-core" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48e51f2fc99917699385bfa290b776e712e414b222d7c2a9b2cd67b8e93585f3" +dependencies = [ + "rquickjs-sys", +] + +[[package]] +name = "rquickjs-sys" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "86b6865056bc4154c49bc8b2babd9232a8ba55dee4860fc74c789633aecad3ca" +dependencies = [ + "cc", +] + [[package]] name = "rsa" version = "0.9.2" @@ -13668,7 +13710,7 @@ dependencies = [ "hyper", "indexmap 1.9.3", "indexmap 2.0.0", - "itertools 0.11.0", + "itertools 0.10.5", "jni", "lazy_static", "lexical-core", diff --git a/Cargo.toml b/Cargo.toml index e0280fe3b9323..0bc9cab78b4e4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -124,16 +124,17 @@ prost = { version = "0.12" } icelake = { git = "https://github.com/icelake-io/icelake", rev = "32c0bbf242f5c47b1e743f10577012fe7436c770", features = [ "prometheus", ] } -arrow-array = "49" -arrow-arith = "49" -arrow-cast = "49" -arrow-schema = "49" -arrow-buffer = "49" -arrow-flight = "49" -arrow-select = "49" -arrow-ord = "49" -arrow-row = "49" -arrow-udf-wasm = { git = "https://github.com/risingwavelabs/arrow-udf.git", rev = "f9a9e0d" } +arrow-array = "50" +arrow-arith = "50" +arrow-cast = "50" +arrow-schema = "50" +arrow-buffer = "50" +arrow-flight = "50" +arrow-select = "50" +arrow-ord = "50" +arrow-row = "50" +arrow-udf-js = { git = "https://github.com/risingwavelabs/arrow-udf.git", rev = "7ba1c22" } +arrow-udf-wasm = "0.1" arrow-array-deltalake = { package = "arrow-array", version = "48.0.1" } arrow-buffer-deltalake = { package = "arrow-buffer", version = "48.0.1" } arrow-cast-deltalake = { package = "arrow-cast", version = "48.0.1" } @@ -143,7 +144,7 @@ arrow-schema-deltalake = { package = "arrow-schema", version = "48.0.1" } deltalake = { git = "https://github.com/risingwavelabs/delta-rs", rev = "5c2dccd4640490202ffe98adbd13b09cef8e007b", features = [ "s3-no-concurrent-write", ] } -parquet = "49" +parquet = "50" thiserror-ext = "0.0.11" tikv-jemalloc-ctl = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9" } tikv-jemallocator = { git = "https://github.com/risingwavelabs/jemallocator.git", features = [ diff --git a/e2e_test/udf/js_udf.slt b/e2e_test/udf/js_udf.slt new file mode 100644 index 0000000000000..260fd991f648f --- /dev/null +++ b/e2e_test/udf/js_udf.slt @@ -0,0 +1,154 @@ +statement ok +create function int_42() returns int language javascript as $$ + return 42; +$$; + +query I +select int_42(); +---- +42 + +statement ok +drop function int_42; + + +statement ok +create function gcd(a int, b int) returns int language javascript as $$ + // required before we support `RETURNS NULL ON NULL INPUT` + if(a == null || b == null) { + return null; + } + while (b != 0) { + let t = b; + b = a % b; + a = t; + } + return a; +$$; + +query I +select gcd(25, 15); +---- +5 + +statement ok +drop function gcd; + + +statement ok +create function decimal_add(a decimal, b decimal) returns decimal language javascript as $$ + return a + b; +$$; + +query R +select decimal_add(1.11, 2.22); +---- +3.33 + +statement ok +drop function decimal_add; + + +statement ok +create function to_string(a boolean, b smallint, c int, d bigint, e real, f float, g decimal, h varchar, i bytea, j jsonb) returns varchar language javascript as $$ + return a.toString() + b.toString() + c.toString() + d.toString() + e.toString() + f.toString() + g.toString() + h.toString() + i.toString() + JSON.stringify(j); +$$; + +query T +select to_string(false, 1::smallint, 2, 3, 4.5, 6.7, 8.9, 'abc', '\x010203', '{"key": 1}'); +---- +false1234.56.78.9abc1,2,3{"key":1} + +statement ok +drop function to_string; + + +# show data types in javascript +statement ok +create function js_typeof(a boolean, b smallint, c int, d bigint, e real, f float, g decimal, h varchar, i bytea, j jsonb) returns jsonb language javascript as $$ + return { + boolean: typeof a, + smallint: typeof b, + int: typeof c, + bigint: typeof d, + real: typeof e, + float: typeof f, + decimal: typeof g, + varchar: typeof h, + bytea: typeof i, + jsonb: typeof j, + }; +$$; + +query T +select js_typeof(false, 1::smallint, 2, 3, 4.5, 6.7, 8.9, 'abc', '\x010203', '{"key": 1}'); +---- +{"bigint": "number", "boolean": "boolean", "bytea": "object", "decimal": "bigdecimal", "float": "number", "int": "number", "jsonb": "object", "real": "number", "smallint": "number", "varchar": "string"} + +statement ok +drop function js_typeof; + + +statement ok +create function return_all(a boolean, b smallint, c int, d bigint, e real, f float, g decimal, h varchar, i bytea, j jsonb, s struct) +returns struct> +language javascript as $$ + return {a,b,c,d,e,f,g,h,i,j,s}; +$$; + +query T +select (return_all( + true, + 1 ::smallint, + 1, + 1, + 1, + 1, + 12345678901234567890.12345678, + 'string', + 'bytes', + '{"key":1}', + row(1, 2)::struct +)).*; +---- +t 1 1 1 1 1 12345678901234567890.12345678 string \x6279746573 {"key": 1} (1,2) + +statement ok +drop function return_all; + + +statement ok +create function series(n int) returns table (x int) language javascript as $$ + for(let i = 0; i < n; i++) { + yield i; + } +$$; + +query I +select series(5); +---- +0 +1 +2 +3 +4 + +statement ok +drop function series; + + +statement ok +create function split(s varchar) returns table (word varchar, length int) language javascript as $$ + for(let word of s.split(' ')) { + yield { word: word, length: word.length }; + } +$$; + +query IT +select * from split('rising wave'); +---- +rising 6 +wave 4 + +statement ok +drop function split; diff --git a/e2e_test/udf/wasm/Cargo.toml b/e2e_test/udf/wasm/Cargo.toml index 5e413a40e37b8..79d911279d63c 100644 --- a/e2e_test/udf/wasm/Cargo.toml +++ b/e2e_test/udf/wasm/Cargo.toml @@ -8,7 +8,7 @@ edition = "2021" crate-type = ["cdylib"] [dependencies] -arrow-udf = { git = "https://github.com/risingwavelabs/arrow-udf.git", rev = "f9a9e0d" } +arrow-udf = "0.1" genawaiter = "0.99" rust_decimal = "1" serde_json = "1" diff --git a/proto/catalog.proto b/proto/catalog.proto index ec7c68a3802ba..741a85d2d3aa9 100644 --- a/proto/catalog.proto +++ b/proto/catalog.proto @@ -213,11 +213,12 @@ message Function { uint32 database_id = 3; string name = 4; uint32 owner = 9; + repeated string arg_names = 15; repeated data.DataType arg_types = 5; data.DataType return_type = 6; string language = 7; - string link = 8; - string identifier = 10; + optional string link = 8; + optional string identifier = 10; optional string body = 14; oneof kind { diff --git a/proto/expr.proto b/proto/expr.proto index 9c6dd8e59fbfc..f62ee2936d115 100644 --- a/proto/expr.proto +++ b/proto/expr.proto @@ -471,21 +471,27 @@ message WindowFunction { message UserDefinedFunction { repeated ExprNode children = 1; string name = 2; + repeated string arg_names = 8; repeated data.DataType arg_types = 3; string language = 4; // For external UDF: the link to the external function service. // For WASM UDF: the link to the wasm binary file. - string link = 5; + optional string link = 5; // An unique identifier for the function. // For external UDF, it's the name of the function in the external function service. // For WASM UDF, it's the name of the function in the wasm binary file. - string identifier = 6; + // For JavaScript UDF, it's the name of the function. + optional string identifier = 6; + // For JavaScript UDF, it's the body of the function. + optional string body = 7; } // Additional information for user defined table functions. message UserDefinedTableFunction { + repeated string arg_names = 8; repeated data.DataType arg_types = 3; string language = 4; - string link = 5; - string identifier = 6; + optional string link = 5; + optional string identifier = 6; + optional string body = 7; } diff --git a/src/expr/core/Cargo.toml b/src/expr/core/Cargo.toml index ff1910e9a0182..773f27b120323 100644 --- a/src/expr/core/Cargo.toml +++ b/src/expr/core/Cargo.toml @@ -19,6 +19,7 @@ normal = ["workspace-hack", "ctor"] anyhow = "1" arrow-array = { workspace = true } arrow-schema = { workspace = true } +arrow-udf-js = { workspace = true } arrow-udf-wasm = { workspace = true } async-trait = "0.1" auto_impl = "1" diff --git a/src/expr/core/src/expr/expr_udf.rs b/src/expr/core/src/expr/expr_udf.rs index 60f04799838ba..260e5bb7a998d 100644 --- a/src/expr/core/src/expr/expr_udf.rs +++ b/src/expr/core/src/expr/expr_udf.rs @@ -20,6 +20,7 @@ use std::time::Duration; use anyhow::Context; use arrow_schema::{Field, Fields, Schema}; +use arrow_udf_js::{CallMode, Runtime as JsRuntime}; use arrow_udf_wasm::Runtime as WasmRuntime; use await_tree::InstrumentAwait; use cfg_or_panic::cfg_or_panic; @@ -61,6 +62,7 @@ const INITIAL_RETRY_COUNT: u8 = 16; enum UdfImpl { External(Arc), Wasm(Arc), + JavaScript(JsRuntime), } #[async_trait::async_trait] @@ -123,6 +125,7 @@ impl UserDefinedFunction { let output: arrow_array::RecordBatch = match &self.imp { UdfImpl::Wasm(runtime) => runtime.call(&self.identifier, &input)?, + UdfImpl::JavaScript(runtime) => runtime.call(&self.identifier, &input)?, UdfImpl::External(client) => { let disable_retry_count = self.disable_retry_count.load(Ordering::Relaxed); let result = if disable_retry_count != 0 { @@ -189,16 +192,36 @@ impl Build for UserDefinedFunction { let return_type = DataType::from(prost.get_return_type().unwrap()); let udf = prost.get_rex_node().unwrap().as_udf().unwrap(); + let identifier = udf.get_identifier()?; let imp = match udf.language.as_str() { "wasm" => { + let link = udf.get_link()?; // Use `block_in_place` as an escape hatch to run async code here in sync context. // Calling `block_on` directly will panic. UdfImpl::Wasm(tokio::task::block_in_place(|| { - tokio::runtime::Handle::current() - .block_on(get_or_create_wasm_runtime(&udf.link)) + tokio::runtime::Handle::current().block_on(get_or_create_wasm_runtime(link)) })?) } - _ => UdfImpl::External(get_or_create_flight_client(&udf.link)?), + "javascript" => { + let mut rt = JsRuntime::new()?; + let body = format!( + "export function {}({}) {{ {} }}", + identifier, + udf.arg_names.join(","), + udf.get_body()? + ); + rt.add_function( + identifier, + arrow_schema::DataType::try_from(&return_type)?, + CallMode::CalledOnNullInput, + &body, + )?; + UdfImpl::JavaScript(rt) + } + _ => { + let link = udf.get_link()?; + UdfImpl::External(get_or_create_flight_client(link)?) + } }; let arg_schema = Arc::new(Schema::new( @@ -222,8 +245,8 @@ impl Build for UserDefinedFunction { return_type, arg_schema, imp, - identifier: udf.identifier.clone(), - span: format!("udf_call({})", udf.identifier).into(), + identifier: identifier.clone(), + span: format!("udf_call({})", identifier).into(), disable_retry_count: AtomicU8::new(0), }) } diff --git a/src/expr/core/src/table_function/user_defined.rs b/src/expr/core/src/table_function/user_defined.rs index 83658026ed56d..06383543ceb7b 100644 --- a/src/expr/core/src/table_function/user_defined.rs +++ b/src/expr/core/src/table_function/user_defined.rs @@ -16,6 +16,7 @@ use std::sync::Arc; use arrow_array::RecordBatch; use arrow_schema::{Field, Fields, Schema, SchemaRef}; +use arrow_udf_js::{CallMode, Runtime as JsRuntime}; use arrow_udf_wasm::Runtime as WasmRuntime; use cfg_or_panic::cfg_or_panic; use futures_util::stream; @@ -42,6 +43,7 @@ pub struct UserDefinedTableFunction { enum UdfImpl { External(Arc), Wasm(Arc), + JavaScript(JsRuntime), } #[async_trait::async_trait] @@ -70,6 +72,11 @@ impl UdfImpl { yield res?; } } + UdfImpl::JavaScript(runtime) => { + for res in runtime.call_table_function(identifier, &input, 1024)? { + yield res?; + } + } UdfImpl::Wasm(runtime) => { for res in runtime.call_table_function(identifier, &input)? { yield res?; @@ -177,28 +184,48 @@ pub fn new_user_defined(prost: &PbTableFunction, chunk_size: usize) -> Result()?, )); + let identifier = udtf.get_identifier()?; + let return_type = DataType::from(prost.get_return_type()?); + let client = match udtf.language.as_str() { "wasm" => { + let link = udtf.get_link()?; // Use `block_in_place` as an escape hatch to run async code here in sync context. // Calling `block_on` directly will panic. UdfImpl::Wasm(tokio::task::block_in_place(|| { - tokio::runtime::Handle::current().block_on( - crate::expr::expr_udf::get_or_create_wasm_runtime(&udtf.link), - ) + tokio::runtime::Handle::current() + .block_on(crate::expr::expr_udf::get_or_create_wasm_runtime(link)) })?) } + "javascript" => { + let mut rt = JsRuntime::new()?; + let body = format!( + "export function* {}({}) {{ {} }}", + identifier, + udtf.arg_names.join(","), + udtf.get_body()? + ); + rt.add_function( + identifier, + arrow_schema::DataType::try_from(&return_type)?, + CallMode::CalledOnNullInput, + &body, + )?; + UdfImpl::JavaScript(rt) + } // connect to UDF service - _ => UdfImpl::External(crate::expr::expr_udf::get_or_create_flight_client( - &udtf.link, - )?), + _ => { + let link = udtf.get_link()?; + UdfImpl::External(crate::expr::expr_udf::get_or_create_flight_client(link)?) + } }; Ok(UserDefinedTableFunction { children: prost.args.iter().map(expr_build_from_prost).try_collect()?, - return_type: prost.return_type.as_ref().expect("no return type").into(), + return_type, arg_schema, client, - identifier: udtf.identifier.clone(), + identifier: identifier.clone(), chunk_size, } .boxed()) diff --git a/src/expr/udf/README-js.md b/src/expr/udf/README-js.md new file mode 100644 index 0000000000000..902bce4ef52ee --- /dev/null +++ b/src/expr/udf/README-js.md @@ -0,0 +1,83 @@ +# Use UDFs in JavaScript + +This article provides a step-by-step guide for defining JavaScript functions in RisingWave. + +JavaScript code is inlined in `CREATE FUNCTION` statement and then run on the embedded QuickJS virtual machine in RisingWave. It does not support access to external networks and is limited to computational tasks only. +Compared to other languages, JavaScript UDFs offer the easiest way to define UDFs in RisingWave. + +## Define your functions + +You can use the `CREATE FUNCTION` statement to create JavaScript UDFs. The syntax is as follows: + +```sql +CREATE FUNCTION function_name ( arg_name arg_type [, ...] ) + [ RETURNS return_type | RETURNS TABLE ( column_name column_type [, ...] ) ] + LANGUAGE javascript + AS [ $$ function_body $$ | 'function_body' ]; +``` + +The argument names you define can be used in the function body. For example: + +```sql +CREATE FUNCTION gcd(a int, b int) RETURNS int LANGUAGE javascript AS $$ + if(a == null || b == null) { + return null; + } + while (b != 0) { + let t = b; + b = a % b; + a = t; + } + return a; +$$; +``` + +The correspondence between SQL types and JavaScript types can be found in the [appendix table](#appendix-type-mapping). You need to ensure that the type of the return value is either `null` or consistent with the type in the `RETURNS` clause. + +If the function you define returns a table, you need to use the `yield` statement to return the data of each row. For example: + +```sql +CREATE FUNCTION series(n int) RETURNS TABLE (x int) LANGUAGE javascript AS $$ + for(let i = 0; i < n; i++) { + yield i; + } +$$; +``` + +## Use your functions + +Once the UDFs are created in RisingWave, you can use them in SQL queries just like any built-in functions. For example: + +```sql +SELECT gcd(25, 15); +SELECT * from series(5); +``` + +## Appendix: Type Mapping + +The following table shows the type mapping between SQL and JavaScript: + +| SQL Type | JS Type | Note | +| --------------------- | ------------- | --------------------- | +| boolean | boolean | | +| smallint | number | | +| int | number | | +| bigint | number | | +| real | number | | +| double precision | number | | +| decimal | BigDecimal | | +| date | | not supported yet | +| time | | not supported yet | +| timestamp | | not supported yet | +| timestamptz | | not supported yet | +| interval | | not supported yet | +| varchar | string | | +| bytea | Uint8Array | | +| jsonb | null, boolean, number, string, array or object | `JSON.parse(string)` | +| smallint[] | Int16Array | | +| int[] | Int32Array | | +| bigint[] | BigInt64Array | | +| real[] | Float32Array | | +| double precision[] | Float64Array | | +| others[] | array | | +| struct<..> | object | | diff --git a/src/frontend/src/catalog/function_catalog.rs b/src/frontend/src/catalog/function_catalog.rs index d0f037bcb47b5..96dbbe77c2a12 100644 --- a/src/frontend/src/catalog/function_catalog.rs +++ b/src/frontend/src/catalog/function_catalog.rs @@ -26,12 +26,13 @@ pub struct FunctionCatalog { pub name: String, pub owner: u32, pub kind: FunctionKind, + pub arg_names: Vec, pub arg_types: Vec, pub return_type: DataType, pub language: String, - pub identifier: String, + pub identifier: Option, pub body: Option, - pub link: String, + pub link: Option, } #[derive(Clone, Display, PartialEq, Eq, Hash, Debug)] @@ -60,6 +61,7 @@ impl From<&PbFunction> for FunctionCatalog { name: prost.name.clone(), owner: prost.owner, kind: prost.kind.as_ref().unwrap().into(), + arg_names: prost.arg_names.clone(), arg_types: prost.arg_types.iter().map(|arg| arg.into()).collect(), return_type: prost.return_type.as_ref().expect("no return type").into(), language: prost.language.clone(), diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_functions.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_functions.rs index 7e618b30ec623..da4f7de4f6438 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_functions.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_functions.rs @@ -70,7 +70,7 @@ impl SysCatalogReaderImpl { ))), Some(ScalarImpl::Int32(function.return_type.to_oid())), Some(ScalarImpl::Utf8(function.language.clone().into())), - Some(ScalarImpl::Utf8(function.link.clone().into())), + function.link.clone().map(|s| ScalarImpl::Utf8(s.into())), Some(ScalarImpl::Utf8( get_acl_items( &Object::FunctionId(function.id.function_id()), diff --git a/src/frontend/src/expr/table_function.rs b/src/frontend/src/expr/table_function.rs index 7fa8232736baa..e3000d0c245ab 100644 --- a/src/frontend/src/expr/table_function.rs +++ b/src/frontend/src/expr/table_function.rs @@ -73,10 +73,12 @@ impl TableFunction { .udtf_catalog .as_ref() .map(|c| UserDefinedTableFunctionPb { + arg_names: c.arg_names.clone(), arg_types: c.arg_types.iter().map(|t| t.to_protobuf()).collect(), language: c.language.clone(), link: c.link.clone(), identifier: c.identifier.clone(), + body: c.body.clone(), }), } } diff --git a/src/frontend/src/expr/user_defined_function.rs b/src/frontend/src/expr/user_defined_function.rs index 165774d1acb4b..0724b55254617 100644 --- a/src/frontend/src/expr/user_defined_function.rs +++ b/src/frontend/src/expr/user_defined_function.rs @@ -51,13 +51,13 @@ impl UserDefinedFunction { // FIXME(yuhao): owner is not in udf proto. owner: u32::MAX - 1, kind: FunctionKind::Scalar, + arg_names: udf.arg_names.clone(), arg_types, return_type, language: udf.get_language().clone(), - identifier: udf.get_identifier().clone(), - // TODO: Ensure if we need `body` here - body: None, - link: udf.get_link().clone(), + identifier: udf.identifier.clone(), + body: udf.body.clone(), + link: udf.link.clone(), }; Ok(Self { @@ -81,6 +81,7 @@ impl Expr for UserDefinedFunction { rex_node: Some(RexNode::Udf(UserDefinedFunction { children: self.args.iter().map(Expr::to_expr_proto).collect(), name: self.catalog.name.clone(), + arg_names: self.catalog.arg_names.clone(), arg_types: self .catalog .arg_types @@ -90,6 +91,7 @@ impl Expr for UserDefinedFunction { language: self.catalog.language.clone(), identifier: self.catalog.identifier.clone(), link: self.catalog.link.clone(), + body: self.catalog.body.clone(), })), } } diff --git a/src/frontend/src/handler/create_function.rs b/src/frontend/src/handler/create_function.rs index 64d5d615dce51..10a7fab06267d 100644 --- a/src/frontend/src/handler/create_function.rs +++ b/src/frontend/src/handler/create_function.rs @@ -54,7 +54,7 @@ pub async fn handle_create_function( Some(lang) => { let lang = lang.real_value().to_lowercase(); match &*lang { - "python" | "java" | "wasm" => lang, + "python" | "java" | "wasm" | "javascript" => lang, _ => { return Err(ErrorCode::InvalidParameterValue(format!( "language {} is not supported", @@ -96,12 +96,10 @@ pub async fn handle_create_function( } }; - let Some(using) = params.using else { - return Err(ErrorCode::InvalidParameterValue("USING must be specified".to_string()).into()); - }; - + let mut arg_names = vec![]; let mut arg_types = vec![]; for arg in args.unwrap_or_default() { + arg_names.push(arg.name.map_or("".to_string(), |n| n.real_value())); arg_types.push(bind_data_type(&arg.data_type)?); } @@ -124,12 +122,13 @@ pub async fn handle_create_function( return Err(CatalogError::Duplicated("function", name).into()); } - let link; let identifier; + let mut link = None; + let mut body = None; match language.as_str() { "python" | "java" | "" => { - let CreateFunctionUsing::Link(l) = using else { + let Some(CreateFunctionUsing::Link(l)) = params.using else { return Err(ErrorCode::InvalidParameterValue( "USING LINK must be specified".to_string(), ) @@ -141,11 +140,10 @@ pub async fn handle_create_function( ); }; identifier = id; - link = l; // check UDF server { - let client = ArrowFlightUdfClient::connect(&link) + let client = ArrowFlightUdfClient::connect(&l) .await .map_err(|e| anyhow!(e))?; /// A helper function to create a unnamed field from data type. @@ -171,6 +169,20 @@ pub async fn handle_create_function( .await .context("failed to check UDF signature")?; } + link = Some(l); + } + "javascript" => { + identifier = function_name.to_string(); + body = Some(match params.as_ { + Some(FunctionDefinition::SingleQuotedDef(s)) => s, + Some(FunctionDefinition::DoubleDollarDef(s)) => s, + _ => { + return Err(ErrorCode::InvalidParameterValue( + "AS must be specified".to_string(), + ) + .into()) + } + }); } "wasm" => { identifier = wasm_identifier( @@ -179,12 +191,17 @@ pub async fn handle_create_function( &return_type, matches!(kind, Kind::Table(_)), ); - + let Some(using) = params.using else { + return Err(ErrorCode::InvalidParameterValue( + "USING must be specified".to_string(), + ) + .into()); + }; link = match using { CreateFunctionUsing::Link(link) => { let runtime = get_or_create_wasm_runtime(&link).await?; check_wasm_function(&runtime, &identifier)?; - link + Some(link) } CreateFunctionUsing::Base64(encoded) => { // decode wasm binary from base64 @@ -205,7 +222,11 @@ pub async fn handle_create_function( ) .await?; - format!("{}/{}", system_params.wasm_storage_url(), object_name) + Some(format!( + "{}/{}", + system_params.wasm_storage_url(), + object_name + )) } }; } @@ -218,12 +239,13 @@ pub async fn handle_create_function( database_id, name: function_name, kind: Some(kind), + arg_names, arg_types: arg_types.into_iter().map(|t| t.into()).collect(), return_type: Some(return_type.into()), language, - identifier, - body: None, + identifier: Some(identifier), link, + body, owner: session.user_id(), }; diff --git a/src/frontend/src/handler/create_sql_function.rs b/src/frontend/src/handler/create_sql_function.rs index bbe504d779bfd..c0f80844351a9 100644 --- a/src/frontend/src/handler/create_sql_function.rs +++ b/src/frontend/src/handler/create_sql_function.rs @@ -113,8 +113,10 @@ pub async fn handle_create_sql_function( } }; + let mut arg_names = vec![]; let mut arg_types = vec![]; for arg in args.unwrap_or_default() { + arg_names.push(arg.name.map_or("".to_string(), |n| n.real_value())); arg_types.push(bind_data_type(&arg.data_type)?); } @@ -156,12 +158,13 @@ pub async fn handle_create_sql_function( database_id, name: function_name, kind: Some(kind), + arg_names, arg_types: arg_types.into_iter().map(|t| t.into()).collect(), return_type: Some(return_type.into()), language, - identifier: "".to_string(), + identifier: None, body: Some(body), - link: "".to_string(), + link: None, owner: session.user_id(), }; diff --git a/src/frontend/src/handler/show.rs b/src/frontend/src/handler/show.rs index 704a8ceb1bc00..d5addc3094e48 100644 --- a/src/frontend/src/handler/show.rs +++ b/src/frontend/src/handler/show.rs @@ -252,7 +252,7 @@ pub async fn handle_show_object( Some(t.arg_types.iter().map(|t| t.to_string()).join(", ").into()), Some(t.return_type.to_string().into()), Some(t.language.clone().into()), - Some(t.link.clone().into()), + t.link.clone().map(Into::into), ]) }) .collect_vec(); diff --git a/src/meta/model_v2/migration/src/m20230908_072257_init.rs b/src/meta/model_v2/migration/src/m20230908_072257_init.rs index bc9ce2b08c32b..bf8cb8c0fc1e1 100644 --- a/src/meta/model_v2/migration/src/m20230908_072257_init.rs +++ b/src/meta/model_v2/migration/src/m20230908_072257_init.rs @@ -703,11 +703,12 @@ impl MigrationTrait for Migration { .table(Function::Table) .col(ColumnDef::new(Function::FunctionId).integer().primary_key()) .col(ColumnDef::new(Function::Name).string().not_null()) + .col(ColumnDef::new(Function::ArgNames).json().not_null()) .col(ColumnDef::new(Function::ArgTypes).json().not_null()) .col(ColumnDef::new(Function::ReturnType).json().not_null()) .col(ColumnDef::new(Function::Language).string().not_null()) - .col(ColumnDef::new(Function::Link).string().not_null()) - .col(ColumnDef::new(Function::Identifier).string().not_null()) + .col(ColumnDef::new(Function::Link).string()) + .col(ColumnDef::new(Function::Identifier).string()) .col(ColumnDef::new(Function::Body).string()) .col(ColumnDef::new(Function::Kind).string().not_null()) .foreign_key( @@ -1095,6 +1096,7 @@ enum Function { Table, FunctionId, Name, + ArgNames, ArgTypes, ReturnType, Language, diff --git a/src/meta/model_v2/src/function.rs b/src/meta/model_v2/src/function.rs index 5976685893afb..ae68782a50fd1 100644 --- a/src/meta/model_v2/src/function.rs +++ b/src/meta/model_v2/src/function.rs @@ -36,11 +36,13 @@ pub struct Model { #[sea_orm(primary_key, auto_increment = false)] pub function_id: FunctionId, pub name: String, + // encode Vec as comma separated string + pub arg_names: String, pub arg_types: DataTypeArray, pub return_type: DataType, pub language: String, - pub link: String, - pub identifier: String, + pub link: Option, + pub identifier: Option, pub body: Option, pub kind: FunctionKind, } @@ -90,6 +92,7 @@ impl From for ActiveModel { Self { function_id: Set(function.id as _), name: Set(function.name), + arg_names: Set(function.arg_names.join(",")), arg_types: Set(DataTypeArray(function.arg_types)), return_type: Set(DataType(function.return_type.unwrap())), language: Set(function.language), diff --git a/src/meta/src/controller/mod.rs b/src/meta/src/controller/mod.rs index 8128981b12283..562dd1845376b 100644 --- a/src/meta/src/controller/mod.rs +++ b/src/meta/src/controller/mod.rs @@ -274,6 +274,12 @@ impl From> for PbFunction { database_id: value.1.database_id.unwrap() as _, name: value.0.name, owner: value.1.owner_id as _, + arg_names: value + .0 + .arg_names + .split(',') + .map(|s| s.to_string()) + .collect(), arg_types: value.0.arg_types.into_inner(), return_type: Some(value.0.return_type.into_inner()), language: value.0.language, diff --git a/src/workspace-hack/Cargo.toml b/src/workspace-hack/Cargo.toml index 8e7630d97f875..743e290899f12 100644 --- a/src/workspace-hack/Cargo.toml +++ b/src/workspace-hack/Cargo.toml @@ -66,7 +66,7 @@ hmac = { version = "0.12", default-features = false, features = ["reset"] } hyper = { version = "0.14", features = ["full"] } indexmap-dff4ba8e3ae991db = { package = "indexmap", version = "1", default-features = false, features = ["serde"] } indexmap-f595c2ba2a3f28df = { package = "indexmap", version = "2", features = ["serde"] } -itertools = { version = "0.11" } +itertools = { version = "0.10" } jni = { version = "0.21", features = ["invocation"] } lazy_static = { version = "1", default-features = false, features = ["spin_no_std"] } lexical-core = { version = "0.8", features = ["format"] } @@ -170,7 +170,7 @@ frunk_core = { version = "0.4", default-features = false, features = ["std"] } generic-array = { version = "0.14", default-features = false, features = ["more_lengths", "zeroize"] } hashbrown-582f2526e08bb6a0 = { package = "hashbrown", version = "0.14", features = ["nightly", "raw"] } indexmap-f595c2ba2a3f28df = { package = "indexmap", version = "2", features = ["serde"] } -itertools = { version = "0.11" } +itertools = { version = "0.10" } lazy_static = { version = "1", default-features = false, features = ["spin_no_std"] } libc = { version = "0.2", features = ["extra_traits"] } log = { version = "0.4", default-features = false, features = ["kv_unstable", "std"] }