Skip to content

Commit

Permalink
feat(udf): Add deno as UDF language (#16263)
Browse files Browse the repository at this point in the history
Co-authored-by: Runji Wang <[email protected]>
  • Loading branch information
bakjos and wangrunji0408 authored Apr 18, 2024
1 parent 0c464ab commit 3daa160
Show file tree
Hide file tree
Showing 32 changed files with 2,659 additions and 216 deletions.
1,940 changes: 1,733 additions & 207 deletions Cargo.lock

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ arrow-select = "50"
arrow-ord = "50"
arrow-row = "50"
arrow-udf-js = "0.1"
arrow-udf-js-deno = { git = "https://github.com/risingwavelabs/arrow-udf.git", rev = "23fe0dd" }
arrow-udf-wasm = { version = "0.2.1", features = ["build"] }
arrow-udf-python = { git = "https://github.com/risingwavelabs/arrow-udf.git", rev = "6c32f71" }
arrow-array-deltalake = { package = "arrow-array", version = "48.0.1" }
Expand Down Expand Up @@ -315,6 +316,15 @@ futures-timer = { git = "https://github.com/madsim-rs/futures-timer.git", rev =
etcd-client = { git = "https://github.com/risingwavelabs/etcd-client.git", rev = "4e84d40" }
# todo(wcy-fdu): remove this patch fork after opendal release a new version to apply azure workload identity change.
reqsign = { git = "https://github.com/wcy-fdu/reqsign.git", rev = "002ee2a" }
# patch to remove preserve_order from serde_json
deno_core = { git = "https://github.com/bakjos/deno_core", rev = "9b241c6" }
# patch to user reqwest 0.12.2
deno_fetch = { git = "https://github.com/bakjos/deno", rev = "787a232" }
deno_http = { git = "https://github.com/bakjos/deno", rev = "787a232" }
deno_net = { git = "https://github.com/bakjos/deno", rev = "787a232" }
deno_tls = { git = "https://github.com/bakjos/deno", rev = "787a232" }
deno_web = { git = "https://github.com/bakjos/deno", rev = "787a232" }
deno_websocket = { git = "https://github.com/bakjos/deno", rev = "787a232" }

[workspace.metadata.dylint]
libraries = [{ path = "./lints" }]
6 changes: 6 additions & 0 deletions Makefile.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ is_release = get_env ENABLE_RELEASE_PROFILE
is_not_release = not ${is_release}
is_dynamic_linking = get_env ENABLE_DYNAMIC_LINKING
is_hummock_trace = get_env ENABLE_HUMMOCK_TRACE
is_deno_udf_enabled = get_env ENABLE_DENO_UDF
is_python_udf_enabled = get_env ENABLE_PYTHON_UDF
if ${is_sanitizer_enabled}
Expand All @@ -58,6 +59,11 @@ else
set_env RISINGWAVE_FEATURE_FLAGS "--features rw-static-link"
end
if ${is_deno_udf_enabled}
flags = get_env RISINGWAVE_FEATURE_FLAGS
set_env RISINGWAVE_FEATURE_FLAGS "${flags} --features embedded-deno-udf"
end
if ${is_python_udf_enabled}
flags = get_env RISINGWAVE_FEATURE_FLAGS
set_env RISINGWAVE_FEATURE_FLAGS "${flags} --features embedded-python-udf"
Expand Down
1 change: 1 addition & 0 deletions ci/scripts/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ cargo build \
-p risingwave_compaction_test \
-p risingwave_e2e_extended_mode_test \
"${RISINGWAVE_FEATURE_FLAGS[@]}" \
--features embedded-deno-udf \
--features embedded-python-udf \
--profile "$profile"

Expand Down
2 changes: 2 additions & 0 deletions ci/scripts/run-e2e-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,12 @@ sqllogictest -p 4566 -d dev './e2e_test/udf/external_udf.slt'
pkill java

echo "--- e2e, $mode, embedded udf"
python3 -m pip install --break-system-packages flask waitress
sqllogictest -p 4566 -d dev './e2e_test/udf/wasm_udf.slt'
sqllogictest -p 4566 -d dev './e2e_test/udf/rust_udf.slt'
sqllogictest -p 4566 -d dev './e2e_test/udf/js_udf.slt'
sqllogictest -p 4566 -d dev './e2e_test/udf/python_udf.slt'
sqllogictest -p 4566 -d dev './e2e_test/udf/deno_udf.slt'

echo "--- Kill cluster"
cluster_stop
Expand Down
2 changes: 1 addition & 1 deletion docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ WORKDIR /risingwave
ENV ENABLE_BUILD_DASHBOARD=1

RUN cargo fetch && \
cargo build -p risingwave_cmd_all --release --features "rw-static-link" --features embedded-python-udf && \
cargo build -p risingwave_cmd_all --release --features "rw-static-link" --features embedded-deno-udf --features embedded-python-udf && \
mkdir -p /risingwave/bin && \
mv /risingwave/target/release/risingwave /risingwave/bin/ && \
mv /risingwave/target/release/risingwave.dwp /risingwave/bin/ && \
Expand Down
2 changes: 1 addition & 1 deletion docker/Dockerfile.hdfs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ ENV JAVA_HOME ${JAVA_HOME_PATH}
ENV LD_LIBRARY_PATH ${JAVA_HOME_PATH}/lib/server:${LD_LIBRARY_PATH}

RUN cargo fetch && \
cargo build -p risingwave_cmd_all --release -p risingwave_object_store --features hdfs-backend --features "rw-static-link" --features embedded-python-udf && \
cargo build -p risingwave_cmd_all --release -p risingwave_object_store --features hdfs-backend --features "rw-static-link" --features embedded-deno-udf --features embedded-python-udf && \
mkdir -p /risingwave/bin && \
mv /risingwave/target/release/risingwave /risingwave/bin/ && \
mv /risingwave/target/release/risingwave.dwp /risingwave/bin/ && \
Expand Down
233 changes: 233 additions & 0 deletions e2e_test/udf/deno_udf.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
statement ok
CREATE FUNCTION gcd(a int, b int) RETURNS int LANGUAGE javascript RUNTIME deno AS $$
if(a == null || b == null) {
return null;
}
while (b != 0) {
let t = b;
b = a % b;
a = t;
}
return a;
$$;

query I
select gcd(25, 15);
----
5

statement ok
drop function gcd;

statement ok
create function decimal_add(a decimal, b decimal) returns decimal language javascript RUNTIME deno as $$
return a.add(b);
$$;

query R
select decimal_add(1.11, 2.22);
----
3.33

statement ok
drop function decimal_add;


statement ok
create function to_string(a boolean, b smallint, c int, d bigint, e real, f float, g decimal, h varchar, i bytea, j jsonb) returns varchar language javascript RUNTIME deno as $$
return a.toString() + b.toString() + c.toString() + d.toString() + e.toString() + f.toString() + g.toString() + h.toString() + i.toString() + JSON.stringify(j);
$$;

query T
select to_string(false, 1::smallint, 2, 3, 4.5, 6.7, 8.9, 'abc', '\x010203', '{"key": 1}');
----
false1234.56.78.9abc1,2,3{"key":1}

statement ok
drop function to_string;

# show data types in javascript
statement ok
create function js_typeof(a boolean, b smallint, c int, d bigint, e real, f float, g decimal, h varchar, i bytea, j jsonb) returns jsonb language javascript RUNTIME deno as $$
return {
boolean: typeof a,
smallint: typeof b,
int: typeof c,
bigint: typeof d,
real: typeof e,
float: typeof f,
decimal: typeof g,
varchar: typeof h,
bytea: typeof i,
jsonb: typeof j,
};
$$;

query T
select js_typeof(false, 1::smallint, 2, 3, 4.5, 6.7, 8.9, 'abc', '\x010203', '{"key": 1}');
----
{"bigint": "bigint", "boolean": "boolean", "bytea": "object", "decimal": "object", "float": "number", "int": "number", "jsonb": "object", "real": "number", "smallint": "number", "varchar": "string"}

statement ok
drop function js_typeof;

statement ok
create function return_all(a boolean, b smallint, c int, d bigint, e real, f float, g decimal, h varchar, i bytea, j jsonb, s struct<f1 int, f2 int>)
returns struct<a boolean, b smallint, c int, d bigint, e real, f float, g decimal, h varchar, i bytea, j jsonb, s struct<f1 int, f2 int>>
language javascript runtime deno as $$
return {a,b,c,d,e,f,g,h,i,j,s};
$$;

query T
select (return_all(
true,
1 ::smallint,
1,
1,
1,
1,
12345678901234567890.12345678,
'string',
'bytes',
'{"key":1}',
row(1, 2)::struct<f1 int, f2 int>
)).*;
----
t 1 1 1 1 1 12345678901234567890.12345678 string \x6279746573 {"key": 1} (1,2)

statement ok
drop function return_all;


statement ok
create function series(n int) returns table (x int) language javascript RUNTIME deno as $$
for(let i = 0; i < n; i++) {
yield i;
}
$$;

query I
select series(5);
----
0
1
2
3
4

statement ok
drop function series;


statement ok
create function split(s varchar) returns table (word varchar, length int) language javascript RUNTIME deno as $$
for(let word of s.split(' ')) {
yield { word: word, length: word.length };
}
$$;

query IT
select * from split('rising wave');
----
rising 6
wave 4

statement ok
drop function split;


statement ok
CREATE FUNCTION digest( t string ) RETURNS bytea LANGUAGE javascript RUNTIME deno AS $$
const subtle = crypto.subtle;
const key = await subtle.generateKey({
name: 'HMAC',
hash: 'SHA-256',
length: 256,
}, true, ['sign', 'verify']);
const enc = new TextEncoder();
const message = enc.encode(t);
const result = await subtle.sign({
name: 'HMAC',
}, key, message);
return result;
$$ ASYNC;

query I
select bit_length(digest('Hello'));
----
256

statement ok
drop function digest;

statement ok
CREATE FUNCTION delay_response()
RETURNS TABLE (x int) LANGUAGE javascript RUNTIME deno AS $$
const delayedResponses = {
delays: [50, 10, 15],
wait(delay) {
return new Promise((resolve) => {
setTimeout(resolve, delay);
});
},
async *[Symbol.asyncIterator]() {
for (const delay of this.delays) {
await this.wait(delay);
yield delay;
}
},
};
return delayedResponses;
$$ SYNC;

query I
select * FROM delay_response();
----
50
10
15

statement ok
drop function delay_response;

system ok
python3 e2e_test/udf/mock_server.py &

# wait for server to start
sleep 1s

statement ok
CREATE FUNCTION call_sse() RETURNS TABLE ( data struct<data struct<greetings string>>) LANGUAGE javascript RUNTIME deno USING LINK 'fs://e2e_test/udf/sse/bundled.table.js' SYNC;

query I
select * FROM call_sse();
----
(Hi)
(Bonjour)
(Hola)
(Ciao)
(Zdravo)

statement ok
drop function call_sse;

statement ok
CREATE FUNCTION fetch_api() RETURNS TABLE ( data struct< idx int>) LANGUAGE javascript RUNTIME deno AS $$
const response = await fetch('http://127.0.0.1:4200');
const resp = await response.json();
for (const r of resp.results) {
yield r;
}
$$ ASYNC GENERATOR;

query I
select * FROM fetch_api();
----
1
2

statement ok
drop function fetch_api;

system ok
pkill -9 python3
43 changes: 43 additions & 0 deletions e2e_test/udf/mock_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import json

from flask import Flask, Response, stream_with_context, jsonify

app = Flask(__name__)
def format_sse(data: str | None, event=None) -> str:
if data:
msg = f'data: {data}\n\n'
else:
msg = '\n'

if event is not None:
msg = f'event: {event}\n{msg}'

return msg

@app.route('/')
def home():
return jsonify({"results": [{"idx": 1}, {"idx": 2}]})

@app.route('/graphql/stream', methods=['POST'])
def stream():
print("sse stream called")
@stream_with_context
def eventStream():
messages = ["Hi", "Bonjour", "Hola", "Ciao", "Zdravo"]
for msg in messages:
data = {
"data": {
"greetings": msg
}
}
yield format_sse(json.dumps(data), "next")

yield format_sse(None, "complete")
return Response(eventStream(), mimetype="text/event-stream")

if __name__ == '__main__':
from waitress import serve
from werkzeug.serving import WSGIRequestHandler
WSGIRequestHandler.protocol_version = "HTTP/1.1"
serve(app, host="127.0.0.1", port=4200)
print("Server stopped.")
2 changes: 2 additions & 0 deletions e2e_test/udf/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
flask
waitress
Loading

0 comments on commit 3daa160

Please sign in to comment.