Skip to content

Commit

Permalink
add iceberg cdc test
Browse files Browse the repository at this point in the history
  • Loading branch information
ZENOTME committed Oct 19, 2023
1 parent 495ab39 commit 8a04f19
Show file tree
Hide file tree
Showing 9 changed files with 244 additions and 3 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,7 @@ simulation-it-test.tar.zst
# hummock-trace
.trace

# spark binary
e2e_test/iceberg/spark-*-bin*

**/poetry.lock
91 changes: 91 additions & 0 deletions ci/scripts/e2e-iceberg-cdc.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
#!/usr/bin/env bash

# Prelude of the iceberg CDC end-to-end test: parses the command line,
# downloads the RisingWave build and the connector node package, and defines
# the port/timeout used when waiting for the connector node.
#
# Usage: ci/scripts/e2e-iceberg-cdc.sh -p <profile>

# Exits as soon as any line fails.
set -euo pipefail

source ci/scripts/common.sh

# prepare environment
export CONNECTOR_RPC_ENDPOINT="localhost:50051"
export CONNECTOR_LIBS_PATH="./connector-node/libs"

# Initialised up front so that a missing -p is reported with a clear message
# instead of tripping `set -u` on an unbound variable further down.
profile=""

# The leading ':' puts getopts into silent mode. Only in that mode does it
# (a) report a missing argument through the ':' branch and (b) leave the
# offending option character in OPTARG; in the default mode OPTARG is unset
# for invalid options, which aborts under `set -u` before the message prints.
while getopts ':p:' opt; do
    case ${opt} in
        p )
            profile=$OPTARG
            ;;
        \? )
            echo "Invalid Option: -$OPTARG" 1>&2
            exit 1
            ;;
        : )
            echo "Invalid option: $OPTARG requires an argument" 1>&2
            exit 1
            ;;
    esac
done
shift $((OPTIND -1))

if [ -z "$profile" ]; then
    echo "Missing required option: -p <profile>" 1>&2
    exit 1
fi

download_and_prepare_rw "$profile" source

echo "--- Download connector node package"
buildkite-agent artifact download risingwave-connector.tar.gz ./
# -p: do not fail when the directory is left over from a previous run.
mkdir -p ./connector-node
tar xf ./risingwave-connector.tar.gz -C ./connector-node

echo "--- e2e, ci-1cn-1fe, iceberg cdc"

# TCP port the connector node is started on, and how long (in seconds) to
# wait for it to start listening.
node_port=50051
node_timeout=10

# Polls localhost:$node_port until something accepts TCP connections there.
#
# Globals read:
#   node_port    - port to probe
#   node_timeout - maximum time to wait, in whole seconds
# Outputs:
#   a status line on stdout (port is up, or timeout)
# Exit status:
#   returns 0 once the port is reachable; terminates the calling shell with
#   status 1 when the timeout elapses first.
wait_for_connector_node_start() {
    # Keep the bookkeeping variables out of the script's global scope.
    local start_time current_time elapsed_time

    start_time=$(date +%s)
    while :
    do
        if nc -z localhost "$node_port"; then
            echo "Port $node_port is listened! Connector Node is up!"
            break
        fi

        current_time=$(date +%s)
        elapsed_time=$((current_time - start_time))
        if [ "$elapsed_time" -ge "$node_timeout" ]; then
            echo "Timeout waiting for port $node_port to be listened!"
            exit 1
        fi
        sleep 0.1
    done
    # Extra pause after the port opens - presumably to let the service finish
    # initialising before it is used; TODO confirm it is still needed.
    sleep 2
}

echo "--- starting risingwave cluster with connector node"

# Bring the cluster up first, then launch the connector node in the
# background and block until its port is reachable.
RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" cargo make ci-start ci-1cn-1fe-with-recovery
./connector-node/start-service.sh -p "$node_port" > .risingwave/log/connector-node.log 2>&1 &
echo "waiting for connector node to start"
wait_for_connector_node_start

# prepare minio iceberg sink
echo "--- preparing iceberg"
.risingwave/bin/mcli -C .risingwave/config/mcli mb hummock-minio/icebergdata

# Everything below uses paths relative to the iceberg test directory.
cd e2e_test/iceberg
bash ./start_spark_connect_server.sh

poetry_bin="$HOME/.local/bin/poetry"

# Pipe one SQL file into the MySQL server of the test environment.
load_into_mysql() {
    mysql --host=mysql --port=3306 -u root -p123456 < "$1"
}

# Run main.py with the given TOML test description.
run_test_case() {
    "$poetry_bin" run python main.py -t "$1"
}

# Don't remove the `--quiet` option since poetry has a bug when printing output, see
# https://github.com/python-poetry/poetry/issues/3412
"$poetry_bin" update --quiet

# 1. import data to mysql
load_into_mysql ./test_case/cdc/mysql_cdc.sql

# 2. create table and sink
run_test_case ./test_case/cdc/no_partition_cdc_init.toml

# 3. insert new data to mysql
load_into_mysql ./test_case/cdc/mysql_cdc_insert.sql

# Fixed wait before verifying - presumably to let the new rows reach the
# iceberg table; confirm the duration if the test turns out flaky.
sleep 20

# 4. check change
run_test_case ./test_case/cdc/no_partition_cdc.toml
15 changes: 15 additions & 0 deletions ci/workflows/pull-request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,21 @@ steps:
timeout_in_minutes: 10
retry: *auto-retry

# Iceberg CDC end-to-end test step.
# Opt-in: only runs on pull requests that carry the label named in `if`
# (the label name says "iceberg-sink-tests", so this step is gated by it too).
- label: "end-to-end iceberg cdc test"
if: build.pull_request.labels includes "ci/run-e2e-iceberg-sink-tests"
# -p selects the build profile passed to the script.
command: "ci/scripts/e2e-iceberg-cdc.sh -p ci-dev"
depends_on:
- "build"
- "build-other"
plugins:
# Runs the command inside the `sink-test-env` service of ci/docker-compose.yml.
- docker-compose#v4.9.0:
run: sink-test-env
config: ci/docker-compose.yml
mount-buildkite-agent: true
- ./ci/plugins/upload-failure-logs
# NOTE(review): the script contains fixed sleeps; confirm 10 minutes leaves enough headroom.
timeout_in_minutes: 10
retry: *auto-retry

- label: "end-to-end pulsar sink test"
if: build.pull_request.labels includes "ci/run-e2e-pulsar-sink-tests"
command: "ci/scripts/e2e-pulsar-sink-test.sh -p ci-dev"
Expand Down
8 changes: 5 additions & 3 deletions e2e_test/iceberg/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,16 @@ def init_iceberg_table(args,init_sqls):
spark.sql(sql)


def init_risingwave_mv(args,slt):
def execute_slt(args,slt):
if slt is None or slt == "":
return
rw_config = args['risingwave']
cmd = f"sqllogictest -p {rw_config['port']} -d {rw_config['db']} {slt}"
print(f"Command line is [{cmd}]")
subprocess.run(cmd,
shell=True,
check=True)
time.sleep(10)
time.sleep(30)


def verify_result(args,verify_sql,verify_schema,verify_data):
Expand Down Expand Up @@ -110,6 +112,6 @@ def drop_table(args,drop_sqls):
print({section: dict(config[section]) for section in config.sections()})

init_iceberg_table(config,init_sqls)
init_risingwave_mv(config,slt)
execute_slt(config,slt)
verify_result(config,verify_sql,verify_schema,verify_data)
drop_table(config,drop_sqls)
46 changes: 46 additions & 0 deletions e2e_test/iceberg/test_case/cdc/load.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# CDC source basic test
#
# Creates a MySQL-CDC backed table, attaches an upsert iceberg sink to it and
# checks that the rows seeded in MySQL are visible in the table.

# enable cdc backfill in ci
statement ok
SET cdc_backfill = 'true';

# Table fed by the `products` table of the MySQL database `my@db`.
statement ok
CREATE TABLE products (
    id INT,
    name STRING,
    description STRING,
    PRIMARY KEY (id)
) WITH (
    connector = 'mysql-cdc',
    hostname = 'mysql',
    port = '3306',
    username = 'root',
    password = '123456',
    database.name = 'my@db',
    table.name = 'products',
    server.id = '5085'
);


# Upsert sink writing every column of `products` to the iceberg table
# demo_db.demo_table, keyed by id.
statement ok
CREATE SINK s1 AS
SELECT * FROM products
WITH (
    connector = 'iceberg',
    type = 'upsert',
    force_append_only = 'false',
    database.name = 'demo',
    table.name = 'demo_db.demo_table',
    catalog.type = 'storage',
    warehouse.path = 's3://icebergdata/demo',
    s3.endpoint = 'http://127.0.0.1:9301',
    s3.region = 'us-east-1',
    s3.access.key = 'hummockadmin',
    s3.secret.key = 'hummockadmin',
    primary_key = 'id'
);

# The MySQL seed script inserts eight rows.
query I
SELECT count(*) FROM products;
----
8

statement ok
FLUSH;
21 changes: 21 additions & 0 deletions e2e_test/iceberg/test_case/cdc/mysql_cdc.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
-- Seed data for the iceberg CDC end-to-end test.
-- Recreates the `my@db` database from scratch so the script can be re-run,
-- then loads the eight initial product rows.
DROP DATABASE IF EXISTS `my@db`;
CREATE DATABASE `my@db`;

USE `my@db`;

CREATE TABLE products (
    id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    description VARCHAR(512)
);

-- Start the generated ids at 101; the test's expected data is keyed 101..108.
ALTER TABLE products AUTO_INCREMENT = 101;

-- Explicit column list: the statement keeps working if columns are added or
-- reordered. Single-quoted literals: double quotes denote identifiers when
-- the server runs with the ANSI_QUOTES SQL mode.
INSERT INTO products (id, name, description) VALUES
    (DEFAULT, '101', '101'),
    (DEFAULT, '102', '102'),
    (DEFAULT, '103', '103'),
    (DEFAULT, '104', '104'),
    (DEFAULT, '105', '105'),
    (DEFAULT, '106', '106'),
    (DEFAULT, '107', '107'),
    (DEFAULT, '108', '108');
7 changes: 7 additions & 0 deletions e2e_test/iceberg/test_case/cdc/mysql_cdc_insert.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- Second batch of product rows for the iceberg CDC end-to-end test.
-- Run after the seed script; the ids come from AUTO_INCREMENT and the test's
-- final verification expects them to be 109..113.
USE `my@db`;

-- Explicit column list and single-quoted literals: independent of column
-- order and of the ANSI_QUOTES SQL mode.
INSERT INTO products (id, name, description) VALUES
    (DEFAULT, '109', '109'),
    (DEFAULT, '110', '110'),
    (DEFAULT, '111', '111'),
    (DEFAULT, '112', '112'),
    (DEFAULT, '113', '113');
25 changes: 25 additions & 0 deletions e2e_test/iceberg/test_case/cdc/no_partition_cdc.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Final verification stage of the iceberg CDC test ("4. check change" in
# ci/scripts/e2e-iceberg-cdc.sh). It creates nothing and only checks the
# contents of the iceberg table after the second MySQL insert batch.

# No iceberg-side setup in this stage.
init_sqls = []

# Empty on purpose: main.py skips the sqllogictest step when this is "".
slt = ''

# Column types of the rows returned by verify_sql, in order.
verify_schema = ['int','string','string']

# ORDER BY id makes the comparison with verify_data deterministic.
verify_sql = 'SELECT * FROM demo_db.demo_table ORDER BY id ASC'

# Expected rows, one per line, comma-separated: the eight seed rows (101..108)
# followed by the five rows added by mysql_cdc_insert.sql (109..113).
verify_data = """
101,101,101
102,102,102
103,103,103
104,104,104
105,105,105
106,106,106
107,107,107
108,108,108
109,109,109
110,110,110
111,111,111
112,112,112
113,113,113
"""

# Nothing is dropped at the end of this stage.
drop_sqls = []
31 changes: 31 additions & 0 deletions e2e_test/iceberg/test_case/cdc/no_partition_cdc_init.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Initial stage of the iceberg CDC test ("2. create table and sink" in
# ci/scripts/e2e-iceberg-cdc.sh): creates the iceberg table, runs the
# sqllogictest file that sets up the CDC table and sink, and verifies the
# seed rows.

# Statements main.py passes to spark.sql, in order, to (re)create the target
# iceberg table.
# NOTE(review): 'format-version'='2' is presumably needed because the sink
# writes in upsert mode - confirm.
init_sqls = [
'CREATE SCHEMA IF NOT EXISTS demo_db',
'DROP TABLE IF EXISTS demo_db.demo_table',
'''
CREATE TABLE demo_db.demo_table (
id int,
name string,
description string
) USING iceberg
TBLPROPERTIES ('format-version'='2');
'''
]

# sqllogictest file run against RisingWave; the path is relative to
# e2e_test/iceberg, the directory the CI script changes into.
slt = 'test_case/cdc/load.slt'

# Column types of the rows returned by verify_sql, in order.
verify_schema = ['int','string','string']

# ORDER BY id makes the comparison with verify_data deterministic.
verify_sql = 'SELECT * FROM demo_db.demo_table ORDER BY id ASC'

# Expected rows, one per line, comma-separated: the eight rows seeded by
# mysql_cdc.sql.
verify_data = """
101,101,101
102,102,102
103,103,103
104,104,104
105,105,105
106,106,106
107,107,107
108,108,108
"""

# Left empty: the table is kept so the later verification stage can read it.
drop_sqls = []

0 comments on commit 8a04f19

Please sign in to comment.