Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(udf): switch to the latest arrow-udf versions #16619

Merged
merged 32 commits into from
May 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
ee1a12b
refactor array arrow conversion
wangrunji0408 Apr 30, 2024
c37dfc7
fix usages
wangrunji0408 Apr 30, 2024
7ef80fe
fix unit test
wangrunji0408 May 6, 2024
c218388
fix clippy
wangrunji0408 May 6, 2024
f44db24
add missing data type
wangrunji0408 May 6, 2024
5341ee4
fix unit test
wangrunji0408 May 6, 2024
3c6e87a
fix udtf
wangrunji0408 May 7, 2024
9ff4625
remove java udf sdk
wangrunji0408 Apr 29, 2024
19b036c
move java udf example to e2e_test
wangrunji0408 Apr 29, 2024
2526b40
remove python udf sdk
wangrunji0408 Apr 29, 2024
8ea1b1b
remove risingwave_udf crate
wangrunji0408 Apr 29, 2024
a94ee09
migrate to arrow-udf-flight
wangrunji0408 Apr 29, 2024
d8805eb
Merge branch 'wrj/arrow' into wrj/udf
wangrunji0408 May 7, 2024
b1f3cf6
introduce NewUdfArrowConvert
wangrunji0408 May 7, 2024
2abf399
Merge branch 'wrj/arrow' into wrj/udf
wangrunji0408 May 7, 2024
2b7e3c3
fix arrow conversion for the new UDF protocol
wangrunji0408 May 7, 2024
7c8bce9
update python udf test
wangrunji0408 May 7, 2024
b66a4f6
use arrow-udf-flight v0.1
wangrunji0408 May 7, 2024
61667fc
revert dns resolution
wangrunji0408 May 7, 2024
d94df06
fix clippy
wangrunji0408 May 7, 2024
ec49c16
Merge commit '0422eab57aa522daf0e7a9618a9a51c35924b8b2' into wrj/arrow
wangrunji0408 May 7, 2024
3e17bcd
Merge branch 'wrj/arrow' into wrj/udf
wangrunji0408 May 7, 2024
c7bbe2f
remove python udf sdk unit test from ci
wangrunji0408 May 8, 2024
5a786ad
fix udf test in ci
wangrunji0408 May 8, 2024
f73fb82
use java sdk from maven repository
wangrunji0408 May 8, 2024
7f46c76
Merge remote-tracking branch 'origin/main' into wrj/udf
wangrunji0408 May 8, 2024
f543906
fix e2e test
wangrunji0408 May 9, 2024
e38e2b8
fix decimal output
wangrunji0408 May 9, 2024
9b46f05
add link for backward compatibility
wangrunji0408 May 9, 2024
bdc6770
fix: handle error column
wangrunji0408 May 10, 2024
2970441
update arrow-udf-js-deno to fix decimal output
wangrunji0408 May 10, 2024
a35ac1c
fix error ui slt
wangrunji0408 May 10, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
317 changes: 152 additions & 165 deletions Cargo.lock

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ members = [
"src/expr/core",
"src/expr/impl",
"src/expr/macro",
"src/expr/udf",
"src/frontend",
"src/frontend/macro",
"src/frontend/planner_test",
Expand Down Expand Up @@ -139,10 +138,11 @@ arrow-flight = "50"
arrow-select = "50"
arrow-ord = "50"
arrow-row = "50"
arrow-udf-js = "0.1"
arrow-udf-js-deno = { git = "https://github.com/risingwavelabs/arrow-udf.git", rev = "23fe0dd" }
arrow-udf-wasm = { version = "0.2.1", features = ["build"] }
arrow-udf-python = { git = "https://github.com/risingwavelabs/arrow-udf.git", rev = "6c32f71" }
arrow-udf-js = "0.2"
arrow-udf-js-deno = { git = "https://github.com/risingwavelabs/arrow-udf.git", rev = "fa36365" }
arrow-udf-wasm = { version = "0.2.2", features = ["build"] }
arrow-udf-python = "0.1"
arrow-udf-flight = "0.1"
arrow-array-deltalake = { package = "arrow-array", version = "48.0.1" }
arrow-buffer-deltalake = { package = "arrow-buffer", version = "48.0.1" }
arrow-cast-deltalake = { package = "arrow-cast", version = "48.0.1" }
Expand Down
10 changes: 7 additions & 3 deletions ci/scripts/build-other.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@ cd java
mvn -B package -Dmaven.test.skip=true
mvn -B install -Dmaven.test.skip=true --pl java-binding-integration-test --am
mvn dependency:copy-dependencies --no-transfer-progress --pl java-binding-integration-test
mvn -B test --pl udf
cd ..

echo "--- Build Java UDF"
cd e2e_test/udf/java
mvn -B package
cd ../../..

echo "--- Build rust binary for java binding integration test"
cargo build -p risingwave_java_binding --bin data-chunk-payload-generator --bin data-chunk-payload-convert-generator

Expand All @@ -30,9 +34,9 @@ tar --zstd -cf java-binding-integration-test.tar.zst bin java/java-binding-integ

echo "--- Upload Java artifacts"
cp java/connector-node/assembly/target/risingwave-connector-1.0.0.tar.gz ./risingwave-connector.tar.gz
cp java/udf-example/target/risingwave-udf-example.jar ./risingwave-udf-example.jar
cp e2e_test/udf/java/target/risingwave-udf-example.jar ./udf.jar
cp e2e_test/udf/wasm/target/wasm32-wasi/release/udf.wasm udf.wasm
buildkite-agent artifact upload ./risingwave-connector.tar.gz
buildkite-agent artifact upload ./risingwave-udf-example.jar
buildkite-agent artifact upload ./java-binding-integration-test.tar.zst
buildkite-agent artifact upload ./udf.jar
buildkite-agent artifact upload ./udf.wasm
5 changes: 3 additions & 2 deletions ci/scripts/run-e2e-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ download-and-decompress-artifact e2e_test_generated ./
download-and-decompress-artifact risingwave_e2e_extended_mode_test-"$profile" target/debug/
mkdir -p e2e_test/udf/wasm/target/wasm32-wasi/release/
buildkite-agent artifact download udf.wasm e2e_test/udf/wasm/target/wasm32-wasi/release/
buildkite-agent artifact download risingwave-udf-example.jar ./
buildkite-agent artifact download udf.jar ./
mv target/debug/risingwave_e2e_extended_mode_test-"$profile" target/debug/risingwave_e2e_extended_mode_test

chmod +x ./target/debug/risingwave_e2e_extended_mode_test
Expand Down Expand Up @@ -105,6 +105,7 @@ echo "--- e2e, $mode, Apache Superset"
sqllogictest -p 4566 -d dev './e2e_test/superset/*.slt' --junit "batch-${profile}"

echo "--- e2e, $mode, external python udf"
python3 -m pip install --break-system-packages arrow-udf==0.2.1
python3 e2e_test/udf/test.py &
sleep 1
sqllogictest -p 4566 -d dev './e2e_test/udf/external_udf.slt'
Expand All @@ -117,7 +118,7 @@ sqllogictest -p 4566 -d dev './e2e_test/udf/always_retry_python.slt'
# sqllogictest -p 4566 -d dev './e2e_test/udf/retry_python.slt'

echo "--- e2e, $mode, external java udf"
java -jar risingwave-udf-example.jar &
java -jar udf.jar &
sleep 1
sqllogictest -p 4566 -d dev './e2e_test/udf/external_udf.slt'
pkill java
Expand Down
5 changes: 0 additions & 5 deletions ci/scripts/run-unit-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,6 @@ set -euo pipefail

REPO_ROOT=${PWD}

echo "+++ Run python UDF SDK unit tests"
cd "${REPO_ROOT}"/src/expr/udf/python
python3 -m pytest
cd "${REPO_ROOT}"

echo "+++ Run unit tests"
# use tee to disable progress bar
NEXTEST_PROFILE=ci cargo nextest run --features failpoints,sync_point --workspace --exclude risingwave_simulation
6 changes: 4 additions & 2 deletions e2e_test/error_ui/simple/main.slt
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ create function int_42() returns int as int_42 using link '555.0.0.1:8815';
----
db error: ERROR: Failed to run the query

Caused by:
Flight service error: invalid address: 555.0.0.1:8815, err: failed to parse address: http://555.0.0.1:8815: invalid IPv4 address
Caused by these errors (recent errors listed first):
1: Expr error
2: UDF error
3: Flight service error: invalid address: 555.0.0.1:8815, err: failed to parse address: http://555.0.0.1:8815: invalid IPv4 address


statement error
Expand Down
2 changes: 1 addition & 1 deletion e2e_test/udf/external_udf.slt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Before running this test:
# python3 e2e_test/udf/test.py
# or:
# cd java/udf-example && mvn package && java -jar target/risingwave-udf-example.jar
# cd e2e_test/udf/java && mvn package && java -jar target/risingwave-udf-example.jar

# Create a function.
statement ok
Expand Down
File renamed without changes.
12 changes: 2 additions & 10 deletions java/udf-example/pom.xml → e2e_test/udf/java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,9 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<!-- remove this section if you want to build the project independently -->
<parent>
<groupId>com.risingwave</groupId>
<artifactId>risingwave-java-root</artifactId>
<version>0.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

<groupId>com.risingwave</groupId>
<artifactId>risingwave-udf-example</artifactId>
<version>0.1.1-SNAPSHOT</version>
<version>0.1.0-SNAPSHOT</version>

<name>udf-example</name>
<url>https://docs.risingwave.com/docs/current/udf-java</url>
Expand All @@ -31,7 +23,7 @@
<dependency>
<groupId>com.risingwave</groupId>
<artifactId>risingwave-udf</artifactId>
<version>0.1.3-SNAPSHOT</version>
<version>0.2.0</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@

public class UdfExample {
public static void main(String[] args) throws IOException {
try (var server = new UdfServer("0.0.0.0", 8815)) {
try (var server = new UdfServer("localhost", 8815)) {
server.addFunction("int_42", new Int42());
server.addFunction("float_to_decimal", new FloatToDecimal());
server.addFunction("sleep", new Sleep());
Expand Down
3 changes: 2 additions & 1 deletion e2e_test/udf/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
flask
waitress
waitress
arrow_udf==0.2.1
170 changes: 118 additions & 52 deletions e2e_test/udf/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@
from typing import Iterator, List, Optional, Tuple, Any
from decimal import Decimal

sys.path.append("src/expr/udf/python") # noqa

from risingwave.udf import udf, udtf, UdfServer
from arrow_udf import udf, udtf, UdfServer


@udf(input_types=[], result_type="INT")
Expand All @@ -47,13 +45,21 @@ def gcd3(x: int, y: int, z: int) -> int:
return gcd(gcd(x, y), z)


@udf(input_types=["BYTEA"], result_type="STRUCT<VARCHAR, VARCHAR, SMALLINT, SMALLINT>")
@udf(
input_types=["BYTEA"],
result_type="STRUCT<src_addr: VARCHAR, dst_addr: VARCHAR, src_port: SMALLINT, dst_port: SMALLINT>",
)
def extract_tcp_info(tcp_packet: bytes):
src_addr, dst_addr = struct.unpack("!4s4s", tcp_packet[12:20])
src_port, dst_port = struct.unpack("!HH", tcp_packet[20:24])
src_addr = socket.inet_ntoa(src_addr)
dst_addr = socket.inet_ntoa(dst_addr)
return src_addr, dst_addr, src_port, dst_port
return {
"src_addr": src_addr,
"dst_addr": dst_addr,
"src_port": src_port,
"dst_port": dst_port,
}


@udtf(input_types="INT", result_types="INT")
Expand Down Expand Up @@ -84,7 +90,7 @@ def hex_to_dec(hex: Optional[str]) -> Optional[Decimal]:
return dec


@udf(input_types=["FLOAT8"], result_type="DECIMAL")
@udf(input_types=["FLOAT64"], result_type="DECIMAL")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this considered a breaking change that should be notable for existing users?

Copy link
Contributor Author

@wangrunji0408 wangrunji0408 May 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it is a breaking change, but only for the new version (arrow-udf v0.2). Existing users will not be affected unless they try to upgrade to the new version.

def float_to_decimal(f: float) -> Decimal:
return Decimal(f)

Expand Down Expand Up @@ -120,21 +126,49 @@ def jsonb_array_identity(list: List[Any]) -> List[Any]:
return list


@udf(input_types="STRUCT<JSONB[], INT>", result_type="STRUCT<JSONB[], INT>")
@udf(
input_types="STRUCT<v: JSONB[], len: INT>",
result_type="STRUCT<v: JSONB[], len: INT>",
)
def jsonb_array_struct_identity(v: Tuple[List[Any], int]) -> Tuple[List[Any], int]:
return v


ALL_TYPES = "BOOLEAN,SMALLINT,INT,BIGINT,FLOAT4,FLOAT8,DECIMAL,DATE,TIME,TIMESTAMP,INTERVAL,VARCHAR,BYTEA,JSONB".split(
","
) + [
"STRUCT<INT,INT>"
]


@udf(
input_types=ALL_TYPES,
result_type=f"struct<{','.join(ALL_TYPES)}>",
input_types=[
"boolean",
"int16",
"int32",
"int64",
"float32",
"float64",
"decimal",
"date32",
"time64",
"timestamp",
"interval",
"string",
"binary",
"json",
"struct<f1:int, f2:int>",
],
result_type="""struct<
boolean: boolean,
int16: int16,
int32: int32,
int64: int64,
float32: float32,
float64: float64,
decimal: decimal,
date32: date32,
time64: time64,
timestamp: timestamp,
interval: interval,
string: string,
binary: binary,
json: json,
struct: struct<f1:int, f2:int>,
>""",
)
def return_all(
bool,
Expand All @@ -153,28 +187,60 @@ def return_all(
jsonb,
struct,
):
return (
bool,
i16,
i32,
i64,
f32,
f64,
decimal,
date,
time,
timestamp,
interval,
varchar,
bytea,
jsonb,
struct,
)
return {
"boolean": bool,
"int16": i16,
"int32": i32,
"int64": i64,
"float32": f32,
"float64": f64,
"decimal": decimal,
"date32": date,
"time64": time,
"timestamp": timestamp,
"interval": interval,
"string": varchar,
"binary": bytea,
"json": jsonb,
"struct": struct,
}


@udf(
input_types=[t + "[]" for t in ALL_TYPES],
result_type=f"struct<{','.join(t + '[]' for t in ALL_TYPES)}>",
input_types=[
"boolean[]",
"int16[]",
"int32[]",
"int64[]",
"float32[]",
"float64[]",
"decimal[]",
"date32[]",
"time64[]",
"timestamp[]",
"interval[]",
"string[]",
"binary[]",
"json[]",
"struct<f1:int, f2:int>[]",
],
result_type="""struct<
boolean: boolean[],
int16: int16[],
int32: int32[],
int64: int64[],
float32: float32[],
float64: float64[],
decimal: decimal[],
date32: date32[],
time64: time64[],
timestamp: timestamp[],
interval: interval[],
string: string[],
binary: binary[],
json: json[],
struct: struct<f1:int, f2:int>[],
>""",
)
def return_all_arrays(
bool,
Expand All @@ -193,23 +259,23 @@ def return_all_arrays(
jsonb,
struct,
):
return (
bool,
i16,
i32,
i64,
f32,
f64,
decimal,
date,
time,
timestamp,
interval,
varchar,
bytea,
jsonb,
struct,
)
return {
"boolean": bool,
"int16": i16,
"int32": i32,
"int64": i64,
"float32": f32,
"float64": f64,
"decimal": decimal,
"date32": date,
"time64": time,
"timestamp": timestamp,
"interval": interval,
"string": varchar,
"binary": bytea,
"json": jsonb,
"struct": struct,
}


if __name__ == "__main__":
Expand Down
2 changes: 1 addition & 1 deletion e2e_test/udf/wasm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ edition = "2021"
crate-type = ["cdylib"]

[dependencies]
arrow-udf = "0.2"
arrow-udf = "0.3"
genawaiter = "0.99"
rust_decimal = "1"
serde_json = "1"
6 changes: 0 additions & 6 deletions java/dev.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,3 @@ Config with the following. It may work.
"java.format.settings.profile": "Android"
}
```

## Deploy UDF Library to Maven

```sh
mvn clean deploy --pl udf --am
```
Loading
Loading