-
Notifications
You must be signed in to change notification settings - Fork 596
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge branch 'main' into feat-sink-kafka-avro-registry
- Loading branch information
Showing
44 changed files
with
986 additions
and
232 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -74,4 +74,7 @@ simulation-it-test.tar.zst | |
# hummock-trace | ||
.trace | ||
|
||
# spark binary | ||
e2e_test/iceberg/spark-*-bin* | ||
|
||
**/poetry.lock |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,91 @@ | ||
#!/usr/bin/env bash | ||
|
||
# Exits as soon as any line fails. | ||
set -euo pipefail | ||
|
||
source ci/scripts/common.sh | ||
|
||
# prepare environment | ||
export CONNECTOR_RPC_ENDPOINT="localhost:50051" | ||
export CONNECTOR_LIBS_PATH="./connector-node/libs" | ||
|
||
while getopts 'p:' opt; do | ||
case ${opt} in | ||
p ) | ||
profile=$OPTARG | ||
;; | ||
\? ) | ||
echo "Invalid Option: -$OPTARG" 1>&2 | ||
exit 1 | ||
;; | ||
: ) | ||
echo "Invalid option: $OPTARG requires an argument" 1>&2 | ||
;; | ||
esac | ||
done | ||
shift $((OPTIND -1)) | ||
|
||
download_and_prepare_rw "$profile" source | ||
|
||
echo "--- Download connector node package" | ||
buildkite-agent artifact download risingwave-connector.tar.gz ./ | ||
mkdir ./connector-node | ||
tar xf ./risingwave-connector.tar.gz -C ./connector-node | ||
|
||
echo "--- e2e, ci-1cn-1fe, iceberg cdc" | ||
|
||
node_port=50051 | ||
node_timeout=10 | ||
|
||
wait_for_connector_node_start() { | ||
start_time=$(date +%s) | ||
while : | ||
do | ||
if nc -z localhost $node_port; then | ||
echo "Port $node_port is listened! Connector Node is up!" | ||
break | ||
fi | ||
|
||
current_time=$(date +%s) | ||
elapsed_time=$((current_time - start_time)) | ||
if [ $elapsed_time -ge $node_timeout ]; then | ||
echo "Timeout waiting for port $node_port to be listened!" | ||
exit 1 | ||
fi | ||
sleep 0.1 | ||
done | ||
sleep 2 | ||
} | ||
|
||
echo "--- starting risingwave cluster with connector node" | ||
|
||
RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \ | ||
cargo make ci-start ci-1cn-1fe-with-recovery | ||
./connector-node/start-service.sh -p $node_port > .risingwave/log/connector-node.log 2>&1 & | ||
echo "waiting for connector node to start" | ||
wait_for_connector_node_start | ||
|
||
# prepare minio iceberg sink | ||
echo "--- preparing iceberg" | ||
.risingwave/bin/mcli -C .risingwave/config/mcli mb hummock-minio/icebergdata | ||
|
||
cd e2e_test/iceberg | ||
bash ./start_spark_connect_server.sh | ||
|
||
# Don't remove the `--quiet` option since poetry has a bug when printing output, see | ||
# https://github.com/python-poetry/poetry/issues/3412 | ||
"$HOME"/.local/bin/poetry update --quiet | ||
|
||
# 1. import data to mysql | ||
mysql --host=mysql --port=3306 -u root -p123456 < ./test_case/cdc/mysql_cdc.sql | ||
|
||
# 2. create table and sink | ||
"$HOME"/.local/bin/poetry run python main.py -t ./test_case/cdc/no_partition_cdc_init.toml | ||
|
||
# 3. insert new data to mysql | ||
mysql --host=mysql --port=3306 -u root -p123456 < ./test_case/cdc/mysql_cdc_insert.sql | ||
|
||
sleep 20 | ||
|
||
# 4. check change | ||
"$HOME"/.local/bin/poetry run python main.py -t ./test_case/cdc/no_partition_cdc.toml |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
# CDC source basic test | ||
|
||
# enable cdc backfill in ci | ||
statement ok | ||
set cdc_backfill='true'; | ||
|
||
statement ok | ||
create table products ( id INT, | ||
name STRING, | ||
description STRING, | ||
PRIMARY KEY (id) | ||
) with ( | ||
connector = 'mysql-cdc', | ||
hostname = 'mysql', | ||
port = '3306', | ||
username = 'root', | ||
password = '123456', | ||
database.name = 'my@db', | ||
table.name = 'products', | ||
server.id = '5085' | ||
); | ||
|
||
|
||
statement ok | ||
CREATE SINK s1 AS select * from products WITH ( | ||
connector = 'iceberg', | ||
type = 'upsert', | ||
force_append_only = 'false', | ||
database.name = 'demo', | ||
table.name = 'demo_db.demo_table', | ||
catalog.type = 'storage', | ||
warehouse.path = 's3://icebergdata/demo', | ||
s3.endpoint = 'http://127.0.0.1:9301', | ||
s3.region = 'us-east-1', | ||
s3.access.key = 'hummockadmin', | ||
s3.secret.key = 'hummockadmin', | ||
primary_key = 'id' | ||
); | ||
|
||
query I | ||
select count(*) from products; | ||
---- | ||
8 | ||
|
||
statement ok | ||
flush; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
DROP DATABASE IF EXISTS `my@db`; | ||
CREATE DATABASE `my@db`; | ||
|
||
USE `my@db`; | ||
|
||
CREATE TABLE products ( | ||
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, | ||
name VARCHAR(255) NOT NULL, | ||
description VARCHAR(512) | ||
); | ||
|
||
ALTER TABLE products AUTO_INCREMENT = 101; | ||
|
||
INSERT INTO products VALUES (default,"101","101"), | ||
(default,"102","102"), | ||
(default,"103","103"), | ||
(default,"104","104"), | ||
(default,"105","105"), | ||
(default,"106","106"), | ||
(default,"107","107"), | ||
(default,"108","108") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
USE `my@db`; | ||
|
||
INSERT INTO products VALUES (default,"109","109"), | ||
(default,"110","110"), | ||
(default,"111","111"), | ||
(default,"112","112"), | ||
(default,"113","113"); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
init_sqls = [] | ||
|
||
slt = '' | ||
|
||
verify_schema = ['int','string','string'] | ||
|
||
verify_sql = 'SELECT * FROM demo_db.demo_table ORDER BY id ASC' | ||
|
||
verify_data = """ | ||
101,101,101 | ||
102,102,102 | ||
103,103,103 | ||
104,104,104 | ||
105,105,105 | ||
106,106,106 | ||
107,107,107 | ||
108,108,108 | ||
109,109,109 | ||
110,110,110 | ||
111,111,111 | ||
112,112,112 | ||
113,113,113 | ||
""" | ||
|
||
drop_sqls = [] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
init_sqls = [ | ||
'CREATE SCHEMA IF NOT EXISTS demo_db', | ||
'DROP TABLE IF EXISTS demo_db.demo_table', | ||
''' | ||
CREATE TABLE demo_db.demo_table ( | ||
id int, | ||
name string, | ||
description string | ||
) USING iceberg | ||
TBLPROPERTIES ('format-version'='2'); | ||
''' | ||
] | ||
|
||
slt = 'test_case/cdc/load.slt' | ||
|
||
verify_schema = ['int','string','string'] | ||
|
||
verify_sql = 'SELECT * FROM demo_db.demo_table ORDER BY id ASC' | ||
|
||
verify_data = """ | ||
101,101,101 | ||
102,102,102 | ||
103,103,103 | ||
104,104,104 | ||
105,105,105 | ||
106,106,106 | ||
107,107,107 | ||
108,108,108 | ||
""" | ||
|
||
drop_sqls = [] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
# Iceberg CDC Integration Tests | ||
`mysql -> rw -> iceberg` | ||
|
||
# How to run | ||
./run_test.sh |
Oops, something went wrong.