Skip to content

Commit

Permalink
refactor(udf): switch to the latest arrow-udf versions (#16619)
Browse files Browse the repository at this point in the history
Signed-off-by: Runji Wang <[email protected]>
  • Loading branch information
wangrunji0408 authored May 11, 2024
1 parent ffb8a0a commit 228b9e8
Show file tree
Hide file tree
Showing 57 changed files with 741 additions and 4,008 deletions.
317 changes: 152 additions & 165 deletions Cargo.lock

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ members = [
"src/expr/core",
"src/expr/impl",
"src/expr/macro",
"src/expr/udf",
"src/frontend",
"src/frontend/macro",
"src/frontend/planner_test",
Expand Down Expand Up @@ -139,10 +138,11 @@ arrow-flight = "50"
arrow-select = "50"
arrow-ord = "50"
arrow-row = "50"
arrow-udf-js = "0.1"
arrow-udf-js-deno = { git = "https://github.com/risingwavelabs/arrow-udf.git", rev = "23fe0dd" }
arrow-udf-wasm = { version = "0.2.1", features = ["build"] }
arrow-udf-python = { git = "https://github.com/risingwavelabs/arrow-udf.git", rev = "6c32f71" }
arrow-udf-js = "0.2"
arrow-udf-js-deno = { git = "https://github.com/risingwavelabs/arrow-udf.git", rev = "fa36365" }
arrow-udf-wasm = { version = "0.2.2", features = ["build"] }
arrow-udf-python = "0.1"
arrow-udf-flight = "0.1"
arrow-array-deltalake = { package = "arrow-array", version = "48.0.1" }
arrow-buffer-deltalake = { package = "arrow-buffer", version = "48.0.1" }
arrow-cast-deltalake = { package = "arrow-cast", version = "48.0.1" }
Expand Down
10 changes: 7 additions & 3 deletions ci/scripts/build-other.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@ cd java
mvn -B package -Dmaven.test.skip=true
mvn -B install -Dmaven.test.skip=true --pl java-binding-integration-test --am
mvn dependency:copy-dependencies --no-transfer-progress --pl java-binding-integration-test
mvn -B test --pl udf
cd ..

echo "--- Build Java UDF"
cd e2e_test/udf/java
mvn -B package
cd ../../..

echo "--- Build rust binary for java binding integration test"
cargo build -p risingwave_java_binding --bin data-chunk-payload-generator --bin data-chunk-payload-convert-generator

Expand All @@ -30,9 +34,9 @@ tar --zstd -cf java-binding-integration-test.tar.zst bin java/java-binding-integ

echo "--- Upload Java artifacts"
cp java/connector-node/assembly/target/risingwave-connector-1.0.0.tar.gz ./risingwave-connector.tar.gz
cp java/udf-example/target/risingwave-udf-example.jar ./risingwave-udf-example.jar
cp e2e_test/udf/java/target/risingwave-udf-example.jar ./udf.jar
cp e2e_test/udf/wasm/target/wasm32-wasi/release/udf.wasm udf.wasm
buildkite-agent artifact upload ./risingwave-connector.tar.gz
buildkite-agent artifact upload ./risingwave-udf-example.jar
buildkite-agent artifact upload ./java-binding-integration-test.tar.zst
buildkite-agent artifact upload ./udf.jar
buildkite-agent artifact upload ./udf.wasm
5 changes: 3 additions & 2 deletions ci/scripts/run-e2e-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ download-and-decompress-artifact e2e_test_generated ./
download-and-decompress-artifact risingwave_e2e_extended_mode_test-"$profile" target/debug/
mkdir -p e2e_test/udf/wasm/target/wasm32-wasi/release/
buildkite-agent artifact download udf.wasm e2e_test/udf/wasm/target/wasm32-wasi/release/
buildkite-agent artifact download risingwave-udf-example.jar ./
buildkite-agent artifact download udf.jar ./
mv target/debug/risingwave_e2e_extended_mode_test-"$profile" target/debug/risingwave_e2e_extended_mode_test

chmod +x ./target/debug/risingwave_e2e_extended_mode_test
Expand Down Expand Up @@ -105,6 +105,7 @@ echo "--- e2e, $mode, Apache Superset"
sqllogictest -p 4566 -d dev './e2e_test/superset/*.slt' --junit "batch-${profile}"

echo "--- e2e, $mode, external python udf"
python3 -m pip install --break-system-packages arrow-udf==0.2.1
python3 e2e_test/udf/test.py &
sleep 1
sqllogictest -p 4566 -d dev './e2e_test/udf/external_udf.slt'
Expand All @@ -117,7 +118,7 @@ sqllogictest -p 4566 -d dev './e2e_test/udf/always_retry_python.slt'
# sqllogictest -p 4566 -d dev './e2e_test/udf/retry_python.slt'

echo "--- e2e, $mode, external java udf"
java -jar risingwave-udf-example.jar &
java -jar udf.jar &
sleep 1
sqllogictest -p 4566 -d dev './e2e_test/udf/external_udf.slt'
pkill java
Expand Down
5 changes: 0 additions & 5 deletions ci/scripts/run-unit-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,6 @@ set -euo pipefail

REPO_ROOT=${PWD}

echo "+++ Run python UDF SDK unit tests"
cd "${REPO_ROOT}"/src/expr/udf/python
python3 -m pytest
cd "${REPO_ROOT}"

echo "+++ Run unit tests"
# use tee to disable progress bar
NEXTEST_PROFILE=ci cargo nextest run --features failpoints,sync_point --workspace --exclude risingwave_simulation
6 changes: 4 additions & 2 deletions e2e_test/error_ui/simple/main.slt
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ create function int_42() returns int as int_42 using link '555.0.0.1:8815';
----
db error: ERROR: Failed to run the query

Caused by:
Flight service error: invalid address: 555.0.0.1:8815, err: failed to parse address: http://555.0.0.1:8815: invalid IPv4 address
Caused by these errors (recent errors listed first):
1: Expr error
2: UDF error
3: Flight service error: invalid address: 555.0.0.1:8815, err: failed to parse address: http://555.0.0.1:8815: invalid IPv4 address


statement error
Expand Down
2 changes: 1 addition & 1 deletion e2e_test/udf/external_udf.slt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Before running this test:
# python3 e2e_test/udf/test.py
# or:
# cd java/udf-example && mvn package && java -jar target/risingwave-udf-example.jar
# cd e2e_test/udf/java && mvn package && java -jar target/risingwave-udf-example.jar

# Create a function.
statement ok
Expand Down
File renamed without changes.
12 changes: 2 additions & 10 deletions java/udf-example/pom.xml → e2e_test/udf/java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,9 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<!-- remove this section if you want to build the project independently -->
<parent>
<groupId>com.risingwave</groupId>
<artifactId>risingwave-java-root</artifactId>
<version>0.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

<groupId>com.risingwave</groupId>
<artifactId>risingwave-udf-example</artifactId>
<version>0.1.1-SNAPSHOT</version>
<version>0.1.0-SNAPSHOT</version>

<name>udf-example</name>
<url>https://docs.risingwave.com/docs/current/udf-java</url>
Expand All @@ -31,7 +23,7 @@
<dependency>
<groupId>com.risingwave</groupId>
<artifactId>risingwave-udf</artifactId>
<version>0.1.3-SNAPSHOT</version>
<version>0.2.0</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@

public class UdfExample {
public static void main(String[] args) throws IOException {
try (var server = new UdfServer("0.0.0.0", 8815)) {
try (var server = new UdfServer("localhost", 8815)) {
server.addFunction("int_42", new Int42());
server.addFunction("float_to_decimal", new FloatToDecimal());
server.addFunction("sleep", new Sleep());
Expand Down
3 changes: 2 additions & 1 deletion e2e_test/udf/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
flask
waitress
waitress
arrow_udf==0.2.1
170 changes: 118 additions & 52 deletions e2e_test/udf/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@
from typing import Iterator, List, Optional, Tuple, Any
from decimal import Decimal

sys.path.append("src/expr/udf/python") # noqa

from risingwave.udf import udf, udtf, UdfServer
from arrow_udf import udf, udtf, UdfServer


@udf(input_types=[], result_type="INT")
Expand All @@ -47,13 +45,21 @@ def gcd3(x: int, y: int, z: int) -> int:
return gcd(gcd(x, y), z)


@udf(input_types=["BYTEA"], result_type="STRUCT<VARCHAR, VARCHAR, SMALLINT, SMALLINT>")
@udf(
input_types=["BYTEA"],
result_type="STRUCT<src_addr: VARCHAR, dst_addr: VARCHAR, src_port: SMALLINT, dst_port: SMALLINT>",
)
def extract_tcp_info(tcp_packet: bytes):
src_addr, dst_addr = struct.unpack("!4s4s", tcp_packet[12:20])
src_port, dst_port = struct.unpack("!HH", tcp_packet[20:24])
src_addr = socket.inet_ntoa(src_addr)
dst_addr = socket.inet_ntoa(dst_addr)
return src_addr, dst_addr, src_port, dst_port
return {
"src_addr": src_addr,
"dst_addr": dst_addr,
"src_port": src_port,
"dst_port": dst_port,
}


@udtf(input_types="INT", result_types="INT")
Expand Down Expand Up @@ -84,7 +90,7 @@ def hex_to_dec(hex: Optional[str]) -> Optional[Decimal]:
return dec


@udf(input_types=["FLOAT8"], result_type="DECIMAL")
@udf(input_types=["FLOAT64"], result_type="DECIMAL")
def float_to_decimal(f: float) -> Decimal:
return Decimal(f)

Expand Down Expand Up @@ -120,21 +126,49 @@ def jsonb_array_identity(list: List[Any]) -> List[Any]:
return list


@udf(input_types="STRUCT<JSONB[], INT>", result_type="STRUCT<JSONB[], INT>")
@udf(
input_types="STRUCT<v: JSONB[], len: INT>",
result_type="STRUCT<v: JSONB[], len: INT>",
)
def jsonb_array_struct_identity(v: Tuple[List[Any], int]) -> Tuple[List[Any], int]:
return v


ALL_TYPES = "BOOLEAN,SMALLINT,INT,BIGINT,FLOAT4,FLOAT8,DECIMAL,DATE,TIME,TIMESTAMP,INTERVAL,VARCHAR,BYTEA,JSONB".split(
","
) + [
"STRUCT<INT,INT>"
]


@udf(
input_types=ALL_TYPES,
result_type=f"struct<{','.join(ALL_TYPES)}>",
input_types=[
"boolean",
"int16",
"int32",
"int64",
"float32",
"float64",
"decimal",
"date32",
"time64",
"timestamp",
"interval",
"string",
"binary",
"json",
"struct<f1:int, f2:int>",
],
result_type="""struct<
boolean: boolean,
int16: int16,
int32: int32,
int64: int64,
float32: float32,
float64: float64,
decimal: decimal,
date32: date32,
time64: time64,
timestamp: timestamp,
interval: interval,
string: string,
binary: binary,
json: json,
struct: struct<f1:int, f2:int>,
>""",
)
def return_all(
bool,
Expand All @@ -153,28 +187,60 @@ def return_all(
jsonb,
struct,
):
return (
bool,
i16,
i32,
i64,
f32,
f64,
decimal,
date,
time,
timestamp,
interval,
varchar,
bytea,
jsonb,
struct,
)
return {
"boolean": bool,
"int16": i16,
"int32": i32,
"int64": i64,
"float32": f32,
"float64": f64,
"decimal": decimal,
"date32": date,
"time64": time,
"timestamp": timestamp,
"interval": interval,
"string": varchar,
"binary": bytea,
"json": jsonb,
"struct": struct,
}


@udf(
input_types=[t + "[]" for t in ALL_TYPES],
result_type=f"struct<{','.join(t + '[]' for t in ALL_TYPES)}>",
input_types=[
"boolean[]",
"int16[]",
"int32[]",
"int64[]",
"float32[]",
"float64[]",
"decimal[]",
"date32[]",
"time64[]",
"timestamp[]",
"interval[]",
"string[]",
"binary[]",
"json[]",
"struct<f1:int, f2:int>[]",
],
result_type="""struct<
boolean: boolean[],
int16: int16[],
int32: int32[],
int64: int64[],
float32: float32[],
float64: float64[],
decimal: decimal[],
date32: date32[],
time64: time64[],
timestamp: timestamp[],
interval: interval[],
string: string[],
binary: binary[],
json: json[],
struct: struct<f1:int, f2:int>[],
>""",
)
def return_all_arrays(
bool,
Expand All @@ -193,23 +259,23 @@ def return_all_arrays(
jsonb,
struct,
):
return (
bool,
i16,
i32,
i64,
f32,
f64,
decimal,
date,
time,
timestamp,
interval,
varchar,
bytea,
jsonb,
struct,
)
return {
"boolean": bool,
"int16": i16,
"int32": i32,
"int64": i64,
"float32": f32,
"float64": f64,
"decimal": decimal,
"date32": date,
"time64": time,
"timestamp": timestamp,
"interval": interval,
"string": varchar,
"binary": bytea,
"json": jsonb,
"struct": struct,
}


if __name__ == "__main__":
Expand Down
2 changes: 1 addition & 1 deletion e2e_test/udf/wasm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ edition = "2021"
crate-type = ["cdylib"]

[dependencies]
arrow-udf = "0.2"
arrow-udf = "0.3"
genawaiter = "0.99"
rust_decimal = "1"
serde_json = "1"
6 changes: 0 additions & 6 deletions java/dev.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,3 @@ Config with the following. It may work.
"java.format.settings.profile": "Android"
}
```

## Deploy UDF Library to Maven

```sh
mvn clean deploy --pl udf --am
```
Loading

0 comments on commit 228b9e8

Please sign in to comment.