feat(sink): support deltalake sink with rust sdk #13600

Merged · 35 commits from 13152-reimplement-delta-lake-sink-with-rust-sdk into main · Dec 14, 2023
e6bc0ee  add delta lake sink — xxhZs, Nov 21, 2023
013eede  fix — xxhZs, Nov 21, 2023
f0047ba  Merge branch 'main' into 13152-reimplement-delta-lake-sink-with-rust-sdk — xxhZs, Nov 23, 2023
2ff021f  fmt — xxhZs, Nov 23, 2023
7b85ea2  save — xxhZs, Nov 23, 2023
6fbbac3  fix cargo lock — xxhZs, Nov 24, 2023
c6a709c  fmt — xxhZs, Nov 28, 2023
1ddf1f9  fix — xxhZs, Nov 28, 2023
cdecda5  Merge branch 'main' into 13152-reimplement-delta-lake-sink-with-rust-sdk — xxhZs, Nov 28, 2023
f521372  Merge branch 'main' into 13152-reimplement-delta-lake-sink-with-rust-sdk — xxhZs, Nov 28, 2023
c890167  use separate arrow version for deltalake — wenym1, Dec 1, 2023
2a35a5e  Merge branch 'main' into 13152-reimplement-delta-lake-sink-with-rust-sdk — wenym1, Dec 1, 2023
0aa6785  fix timestamptz — xxhZs, Dec 1, 2023
9b7b543  use mod path to avoid macro — wenym1, Dec 1, 2023
e460901  add license and comment — wenym1, Dec 1, 2023
13ff6ed  Merge branch 'main' into 13152-reimplement-delta-lake-sink-with-rust-sdk — wenym1, Dec 4, 2023
c187f72  update test — wenym1, Dec 5, 2023
2217f61  add comment — wenym1, Dec 5, 2023
4c503fa  Merge branch 'main' into 13152-reimplement-delta-lake-sink-with-rust-sdk — wenym1, Dec 5, 2023
1aaf516  use expect — wenym1, Dec 5, 2023
e1a3814  save — xxhZs, Dec 5, 2023
76547e4  fix fmt — xxhZs, Dec 7, 2023
b0b1b87  add ci — xxhZs, Dec 7, 2023
50899c0  Merge branch 'main' into 13152-reimplement-delta-lake-sink-with-rust-sdk — xxhZs, Dec 7, 2023
10b1ba6  Empty commit — xxhZs, Dec 7, 2023
5a492a6  fix ci — xxhZs, Dec 7, 2023
435eb73  fix — xxhZs, Dec 8, 2023
cf96d64  ut timeout 20 -> 22 — xxhZs, Dec 12, 2023
4a83b8f  fix region — xxhZs, Dec 12, 2023
814dcc9  reduce compile time and binary size — wenym1, Dec 12, 2023
f9605ad  Merge branch 'main' into 13152-reimplement-delta-lake-sink-with-rust-sdk — wenym1, Dec 12, 2023
899070a  temp add hakari third party — wenym1, Dec 13, 2023
86625ab  use new delta-rs commit — wenym1, Dec 13, 2023
5c365a9  Merge branch 'main' into 13152-reimplement-delta-lake-sink-with-rust-sdk — xxhZs, Dec 14, 2023
a4023e5  fix ci — xxhZs, Dec 14, 2023
1,315 changes: 1,177 additions & 138 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions Cargo.toml
@@ -129,6 +129,12 @@ arrow-flight = "49"
arrow-select = "49"
arrow-ord = "49"
arrow-row = "49"
arrow-array-deltalake = { package = "arrow-array", version = "48.0.1" }
arrow-buffer-deltalake = { package = "arrow-buffer", version = "48.0.1" }
arrow-cast-deltalake = { package = "arrow-cast", version = "48.0.1" }
arrow-schema-deltalake = { package = "arrow-schema", version = "48.0.1" }
deltalake = { git = "https://github.com/delta-io/delta-rs", rev = "72505449e9538371fe5fda35d545dbd662facd07", features = ["s3","datafusion"] }
parquet = "49"
thiserror-ext = "0.0.10"
tikv-jemalloc-ctl = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9" }
tikv-jemallocator = { git = "https://github.com/risingwavelabs/jemallocator.git", features = [
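Note: the `arrow-*-deltalake` entries rely on Cargo's dependency renaming (`package = "arrow-array"`), which lets arrow 48 (needed by the pinned delta-rs revision) coexist with the arrow 49 used by the rest of the workspace. A minimal sketch of how a crate refers to such a renamed dependency — the example below is illustrative, not code from this PR:

```rust
// Minimal sketch (illustrative, not this PR's code): with
// `arrow-array-deltalake = { package = "arrow-array", version = "48.0.1" }`
// declared in Cargo.toml, the crate is referenced under its alias, so two
// incompatible arrow major versions can live in one dependency graph.
use arrow_array_deltalake::Int32Array; // resolves to arrow-array v48

fn main() {
    let col = Int32Array::from(vec![1, 2, 3]);
    println!("built an arrow-48 array with {} values", col.len());
}
```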
80 changes: 80 additions & 0 deletions ci/scripts/e2e-deltalake-sink-rust-test.sh
@@ -0,0 +1,80 @@
#!/usr/bin/env bash

# Exits as soon as any line fails.
set -euo pipefail

source ci/scripts/common.sh

# prepare environment
export CONNECTOR_LIBS_PATH="./connector-node/libs"

while getopts 'p:' opt; do
case ${opt} in
p )
profile=$OPTARG
;;
\? )
echo "Invalid Option: -$OPTARG" 1>&2
exit 1
;;
: )
echo "Invalid option: $OPTARG requires an argument" 1>&2
;;
esac
done
shift $((OPTIND -1))

download_and_prepare_rw "$profile" source

echo "--- Download connector node package"
buildkite-agent artifact download risingwave-connector.tar.gz ./
mkdir ./connector-node
tar xf ./risingwave-connector.tar.gz -C ./connector-node

echo "--- starting risingwave cluster"
mkdir -p .risingwave/log
cargo make ci-start ci-deltalake-test
sleep 1

# prepare minio deltalake sink
echo "--- preparing deltalake"
.risingwave/bin/mcli -C .risingwave/config/mcli mb hummock-minio/deltalake
wget https://ci-deps-dist.s3.amazonaws.com/spark-3.3.1-bin-hadoop3.tgz
tar -xf spark-3.3.1-bin-hadoop3.tgz --no-same-owner
DEPENDENCIES=io.delta:delta-core_2.12:2.2.0,org.apache.hadoop:hadoop-aws:3.3.2
spark-3.3.1-bin-hadoop3/bin/spark-sql --packages $DEPENDENCIES \
--conf 'spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog' \
--conf 'spark.hadoop.fs.s3a.access.key=hummockadmin' \
--conf 'spark.hadoop.fs.s3a.secret.key=hummockadmin' \
--conf 'spark.hadoop.fs.s3a.endpoint=http://127.0.0.1:9301' \
--conf 'spark.hadoop.fs.s3a.path.style.access=true' \
--S --e 'create table delta.`s3a://deltalake/deltalake-test`(v1 int, v2 short, v3 long, v4 float, v5 double, v6 string, v7 date, v8 Timestamp, v9 boolean) using delta;'


echo "--- testing sinks"
sqllogictest -p 4566 -d dev './e2e_test/sink/deltalake_rust_sink.slt'
sleep 1


spark-3.3.1-bin-hadoop3/bin/spark-sql --packages $DEPENDENCIES \
--conf 'spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog' \
--conf 'spark.hadoop.fs.s3a.access.key=hummockadmin' \
--conf 'spark.hadoop.fs.s3a.secret.key=hummockadmin' \
--conf 'spark.hadoop.fs.s3a.endpoint=http://localhost:9301' \
--conf 'spark.hadoop.fs.s3a.path.style.access=true' \
--S --e 'INSERT OVERWRITE DIRECTORY "./spark-output" USING CSV SELECT * FROM delta.`s3a://deltalake/deltalake-test`;'

# check sink destination using shell
if cat ./spark-output/*.csv | sort | awk -F "," '{
exit !($1 == 1 && $2 == 1 && $3 == 1 && $4 == 1.1 && $5 == 1.2 && $6 == "test" && $7 == "2013-01-01" && $8 == "2013-01-01T01:01:01.000Z" && $9 == "false"); }'; then
echo "DeltaLake sink check passed"
else
cat ./spark-output/*.csv
echo "The output is not as expected."
exit 1
fi

echo "--- Kill cluster"
cargo make ci-kill
21 changes: 20 additions & 1 deletion ci/workflows/main-cron.yml
@@ -665,7 +665,26 @@ steps:
timeout_in_minutes: 14
retry: *auto-retry

- label: "end-to-end clickhouse sink test"
- label: "end-to-end deltalake sink test"
key: "e2e-deltalake-sink-rust-tests"
command: "ci/scripts/e2e-deltalake-sink-rust-test.sh -p ci-release"
if: |
!(build.pull_request.labels includes "ci/main-cron/skip-ci") && build.env("CI_STEPS") == null
|| build.pull_request.labels includes "ci/run-e2e-deltalake-sink-rust-tests"
|| build.env("CI_STEPS") =~ /(^|,)e2e-deltalake-sink-rust-tests?(,|$$)/
depends_on:
- "build"
- "build-other"
plugins:
- docker-compose#v4.9.0:
run: sink-test-env
config: ci/docker-compose.yml
mount-buildkite-agent: true
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 10
retry: *auto-retry

- label: "end-to-end clickhouse sink test"
key: "e2e-clickhouse-sink-tests"
command: "ci/scripts/e2e-clickhouse-sink-test.sh -p ci-release"
if: |
15 changes: 15 additions & 0 deletions ci/workflows/pull-request.yml
@@ -279,6 +279,21 @@ steps:
timeout_in_minutes: 10
retry: *auto-retry

- label: "end-to-end deltalake sink test"
  if: build.pull_request.labels includes "ci/run-e2e-deltalake-sink-rust-tests" || build.env("CI_STEPS") =~ /(^|,)e2e-deltalake-sink-rust-tests?(,|$$)/
command: "ci/scripts/e2e-deltalake-sink-rust-test.sh -p ci-dev"
depends_on:
- "build"
- "build-other"
plugins:
- docker-compose#v4.9.0:
run: sink-test-env
config: ci/docker-compose.yml
mount-buildkite-agent: true
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 10
retry: *auto-retry

- label: "e2e java-binding test"
if: build.pull_request.labels includes "ci/run-java-binding-tests" || build.env("CI_STEPS") =~ /(^|,)java-binding-tests?(,|$$)/
command: "ci/scripts/java-binding-test.sh -p ci-dev"
33 changes: 33 additions & 0 deletions e2e_test/sink/deltalake_rust_sink.slt
@@ -0,0 +1,33 @@
statement ok
CREATE TABLE t6 (v1 int primary key, v2 smallint, v3 bigint, v4 real, v5 float, v6 varchar, v7 date, v8 timestamptz, v9 boolean);

statement ok
CREATE MATERIALIZED VIEW mv6 AS SELECT * FROM t6;

statement ok
create sink s6 as select * from mv6
with (
connector = 'deltalake_rust',
type = 'append-only',
force_append_only = 'true',
location = 's3a://deltalake/deltalake-test',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
s3.endpoint = 'http://127.0.0.1:9301',
s3.region='us-east-1'
);

statement ok
INSERT INTO t6 VALUES (1, 1, 1, 1.1, 1.2, 'test', '2013-01-01', '2013-01-01 01:01:01+00:00' , false);

statement ok
FLUSH;

statement ok
DROP SINK s6;

statement ok
DROP MATERIALIZED VIEW mv6;

statement ok
DROP TABLE t6;
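For context on how the `WITH` options above might reach the Delta table, here is a rough sketch against the delta-rs API this PR builds on. The mapping from the sink's `s3.*` options to delta-rs storage-option keys is an assumption for illustration, not this PR's sink code:

```rust
// Rough sketch (assumed mapping, not this PR's sink code): the sink's
// `location` and `s3.*` options roughly correspond to delta-rs storage
// options when opening the target table. The option keys follow
// delta-rs/object_store conventions and may vary across versions.
use std::collections::HashMap;

use deltalake::DeltaTableError;

#[tokio::main]
async fn main() -> Result<(), DeltaTableError> {
    let storage_options = HashMap::from([
        ("AWS_ACCESS_KEY_ID".to_string(), "hummockadmin".to_string()),
        ("AWS_SECRET_ACCESS_KEY".to_string(), "hummockadmin".to_string()),
        ("AWS_ENDPOINT_URL".to_string(), "http://127.0.0.1:9301".to_string()),
        ("AWS_REGION".to_string(), "us-east-1".to_string()),
        // The MinIO endpoint in this test speaks plain HTTP.
        ("AWS_ALLOW_HTTP".to_string(), "true".to_string()),
    ]);
    let table = deltalake::open_table_with_storage_options(
        "s3a://deltalake/deltalake-test",
        storage_options,
    )
    .await?;
    println!("opened Delta table at version {}", table.version());
    Ok(())
}
```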
10 changes: 10 additions & 0 deletions risedev.yml
@@ -768,6 +768,16 @@ profile:
- use: frontend
- use: compactor

ci-deltalake-test:
config-path: src/config/ci.toml
steps:
- use: minio
- use: meta-node
- use: compute-node
enable-tiered-cache: true
- use: frontend
- use: compactor

ci-clickhouse-test:
config-path: src/config/ci.toml
steps:
4 changes: 4 additions & 0 deletions src/common/Cargo.toml
@@ -17,9 +17,13 @@ normal = ["workspace-hack"]
anyhow = "1"
arc-swap = "1"
arrow-array = { workspace = true }
arrow-array-deltalake = { workspace = true }
arrow-buffer = { workspace = true }
arrow-buffer-deltalake = { workspace = true }
arrow-cast = { workspace = true }
arrow-cast-deltalake = { workspace = true }
arrow-schema = { workspace = true }
arrow-schema-deltalake = { workspace = true }
async-trait = "0.1"
auto_enums = "0.8"
auto_impl = "1"
26 changes: 26 additions & 0 deletions src/common/src/array/arrow/arrow_default.rs
@@ -0,0 +1,26 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! This is for the arrow dependencies named `arrow-xxx`, such as `arrow-array`, in the cargo workspace.
//!
//! This should be the default arrow version to be used in our system.
//!
//! The corresponding version of arrow is currently used by `udf` and the `iceberg` sink.

pub use arrow_impl::to_record_batch_with_schema;
use {arrow_array, arrow_buffer, arrow_cast, arrow_schema};

#[expect(clippy::duplicate_mod)]
#[path = "./arrow_impl.rs"]
mod arrow_impl;
28 changes: 28 additions & 0 deletions src/common/src/array/arrow/arrow_deltalake.rs
@@ -0,0 +1,28 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! This is for the arrow dependencies named `arrow-xxx-deltalake`, such as `arrow-array-deltalake`,
//! in the cargo workspace.
//!
//! The corresponding version of arrow is currently used by the `deltalake` sink.

pub use arrow_impl::to_record_batch_with_schema as to_deltalake_record_batch_with_schema;
use {
arrow_array_deltalake as arrow_array, arrow_buffer_deltalake as arrow_buffer,
arrow_cast_deltalake as arrow_cast, arrow_schema_deltalake as arrow_schema,
};

#[expect(clippy::duplicate_mod)]
#[path = "./arrow_impl.rs"]
mod arrow_impl;
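Together with `arrow_default.rs` above, this file compiles the shared `arrow_impl.rs` source a second time via `#[path]` (hence the `clippy::duplicate_mod` expectation); the parent module's `use` statements decide which real arrow crates the shared code sees through `super::`. A minimal self-contained illustration of the pattern, with hypothetical module names:

```rust
// Hypothetical names; illustrates the include-one-impl-twice pattern above.
// Each wrapper binds a different backend to the same identifier, and the
// shared implementation reaches it through `super::`. In the real code the
// inner module body lives in a separate file pulled in with
// `#[path = "./arrow_impl.rs"] mod arrow_impl;`.
mod arrow_v49 {
    pub fn version() -> u32 { 49 }
}
mod arrow_v48 {
    pub fn version() -> u32 { 48 }
}

mod default_wrapper {
    use crate::arrow_v49 as arrow; // the "default" arrow

    pub mod shared_impl {
        pub fn backend_version() -> u32 {
            super::arrow::version()
        }
    }
}

mod deltalake_wrapper {
    use crate::arrow_v48 as arrow; // same impl body, deltalake's arrow

    pub mod shared_impl {
        pub fn backend_version() -> u32 {
            super::arrow::version()
        }
    }
}

fn main() {
    assert_eq!(default_wrapper::shared_impl::backend_version(), 49);
    assert_eq!(deltalake_wrapper::shared_impl::backend_version(), 48);
}
```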