Skip to content

Commit

Permalink
test(integration-test): enhance postgres-cdc test case (#9982)
Browse files Browse the repository at this point in the history
  • Loading branch information
StrikeW authored May 24, 2023
1 parent 7f3bedf commit b707303
Show file tree
Hide file tree
Showing 13 changed files with 112 additions and 24 deletions.
10 changes: 9 additions & 1 deletion ci/scripts/integration-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,15 @@ echo "--- run Demos"
python3 run_demos.py --case ${case} --format ${format}

echo "--- check if the ingestion is successful"
# extract the type of upstream source, e.g. mysql, postgres, etc.
# (demo case directories are named "<upstream>-<suffix>", e.g. postgres-cdc)
upstream=$(echo "${case}" | cut -d'-' -f 1)
if [ "${upstream}" == "mysql" ]; then
  # the CI image does not ship a mysql client; check_data.py may need it
  echo "install mysql"
  sudo yum install -y mysql
fi

# password for psql connections made by check_data.py (upstream Postgres & RisingWave)
export PGPASSWORD=123456
python3 check_data.py "${case}" "${upstream}"

echo "--- clean Demos"
python3 clean_demos.py --case "${case}"
1 change: 1 addition & 0 deletions integration_tests/mysql-cdc/cdc_check
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
lineitem_rw
2 changes: 1 addition & 1 deletion integration_tests/mysql-cdc/create_mv.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
CREATE MATERIALIZED VIEW lineitem_count AS
CREATE MATERIALIZED VIEW lineitem_rw_count AS
SELECT
COUNT(*) as cnt
FROM
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/mysql-cdc/data_check
Original file line number Diff line number Diff line change
@@ -1 +1 @@
lineitem_count
lineitem_rw_count
6 changes: 3 additions & 3 deletions integration_tests/mysql-cdc/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,12 @@ services:
extends:
file: ../../docker/docker-compose.yml
service: connector-node
# TPC-H data generator: loads scale-factor-1 lineitem data into upstream MySQL
# so the CDC pipeline has a large (~6M row) table to ingest.
datagen_tpch:
  image: ghcr.io/risingwavelabs/go-tpc:v0.1
  depends_on:
    - mysql
  # --threads 4 parallelizes the load; sf 1 is ~1 GB of TPC-H data
  command: tpch prepare --sf 1 --threads 4 -H mysql -U root -p '123456' -D mydb -P 3306
  container_name: datagen_tpch
  restart: on-failure
volumes:
compute-node-0:
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/mysql-cdc/query.sql
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
SELECT
*
FROM
lineitem_count
lineitem_rw_count
LIMIT
10;
1 change: 1 addition & 0 deletions integration_tests/postgres-cdc/cdc_check
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
orders_rw
9 changes: 8 additions & 1 deletion integration_tests/postgres-cdc/create_mv.sql
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ FROM
GROUP BY
city;

-- Join on a Kafka stream and a CDC table
CREATE MATERIALIZED VIEW nexmark_q8 AS
SELECT
P.id,
Expand All @@ -25,4 +26,10 @@ FROM
seller,
window_start,
window_end
) A ON P.id = A.seller;
) A ON P.id = A.seller;

-- Row count of the CDC table; polled by scripts/check_data.py (check_cdc_table)
-- to decide when the upstream snapshot has finished loading into RisingWave.
CREATE MATERIALIZED VIEW orders_rw_count AS
SELECT
COUNT(*) as cnt
FROM
orders_rw;
25 changes: 24 additions & 1 deletion integration_tests/postgres-cdc/create_source.sql
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,27 @@ SELECT
seller,
category
FROM
t_auction;
t_auction;

-- CDC table mirroring the upstream Postgres `public.orders` table (TPC-H schema).
-- The column list is expected to match the upstream table; O_ORDERKEY is the
-- upstream primary key -- TODO confirm against postgres_prepare.sql.
CREATE TABLE orders_rw (
O_ORDERKEY BIGINT,
O_CUSTKEY BIGINT,
O_ORDERSTATUS VARCHAR,
O_TOTALPRICE DECIMAL,
O_ORDERDATE DATE,
O_ORDERPRIORITY VARCHAR,
O_CLERK VARCHAR,
O_SHIPPRIORITY BIGINT,
O_COMMENT VARCHAR,
PRIMARY KEY (O_ORDERKEY)
) WITH (
-- connection parameters match the `postgres` service in docker-compose.yml
connector = 'postgres-cdc',
hostname = 'postgres',
port = '5432',
username = 'myuser',
password = '123456',
database.name = 'mydb',
schema.name = 'public',
table.name = 'orders',
-- dedicated replication slot for this table's logical replication stream
slot.name = 'orders'
);
12 changes: 9 additions & 3 deletions integration_tests/postgres-cdc/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ services:
# Use this command to connect to the DB from outside the container:
# docker exec postgres psql --username=myuser --dbname=mydb
postgres:
image: postgres
image: postgres:14-alpine
environment:
- POSTGRES_USER=myuser
- POSTGRES_PASSWORD=123456
Expand Down Expand Up @@ -67,15 +67,21 @@ services:
- "./postgres_prepare.sql:/postgres_prepare.sql"
container_name: postgres_prepare
restart: on-failure
# TPC-H data generator: loads scale-factor-1 data into upstream Postgres so the
# CDC pipeline has a large table to ingest.
datagen_tpch:
  image: ghcr.io/risingwavelabs/go-tpc:v0.1
  depends_on: [postgres]
  command: tpch prepare --sf 1 --threads 4 -d postgres -U myuser -p '123456' -H postgres -D mydb -P 5432 --conn-params sslmode=disable
  container_name: datagen_tpch
  restart: on-failure
# Nexmark event generator: feeds the Kafka side of the stream/CDC join (nexmark_q8).
datagen_kafka:
  build: ../datagen
  depends_on: [message_queue]
  command:
    - /bin/sh
    - -c
    - /datagen --mode nexmark --qps 2 kafka --brokers message_queue:29092
  restart: always
  container_name: datagen_kafka
message_queue:
extends:
file: ../../docker/docker-compose.yml
Expand Down
11 changes: 10 additions & 1 deletion integration_tests/postgres-cdc/query.sql
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
SELECT
*
FROM
person
LIMIT
10;

SELECT
*
FROM
city_population
LIMIT
10;
10;

SELECT * FROM orders_rw_count LIMIT 10;
53 changes: 43 additions & 10 deletions integration_tests/scripts/check_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,34 +16,67 @@
def create_mv(rel: str):
    """Create a materialized view `<rel>_mv` mirroring relation *rel* in RisingWave.

    Raises:
        Exception: if *rel* already contains "_mv" (would produce a confusing
            double-suffixed view name and likely indicates a mis-written
            data_check manifest).
    """
    if "_mv" in rel:
        # NOTE: fixed grammar of the error message ("contains" -> "contain")
        raise Exception('relation "{}" must not contain "_mv"'.format(rel))
    run_psql("CREATE MATERIALIZED VIEW {0}_mv AS SELECT * FROM {0}".format(rel))


def check_mv(rel: str):
    """Assert that the `<rel>_mv` view created by create_mv holds at least one row."""
    raw = run_psql("SELECT COUNT(*) FROM {}_mv".format(rel))
    row_count = int(raw.decode('utf8').strip())
    print("{} rows in {}".format(row_count, rel))
    assert row_count >= 1


def check_cdc_table(rel: str):
    """Wait until ingestion of CDC table *rel* settles, then sanity-check its size.

    Polls the `<rel>_count` materialized view (created by the demo's SQL files)
    once a minute until the count stops growing between two consecutive polls,
    then asserts a minimum row volume was loaded.
    """
    print("Wait for all upstream data to be available in RisingWave")
    count_query = "SELECT * FROM {}_count".format(rel)

    def fetch_count() -> int:
        # psql --tuples-only output: a single numeric value plus whitespace
        return int(run_psql(count_query).decode('utf8').strip())

    previous = 0
    current = fetch_count()
    while current > previous:
        print("Current row count: {}".format(current))
        previous = current
        time.sleep(60)
        current = fetch_count()

    # Querying the upstream DB directly with `mysql`/`psql` was observed to get
    # stuck (possibly due to the weak CPU/disk of the CI spot instance), so we
    # only verify the row count approximately instead of an exact comparison.
    print("All upstream data (roughly) has been loaded into RisingWave: {}".format(previous))
    assert previous >= 200000


def run_psql(sql):
    """Execute *sql* against the local RisingWave frontend and return raw stdout bytes."""
    print("Running SQL: {} on RisingWave".format(sql))
    # --tuples-only strips headers/footers so callers can parse the value directly
    command = [
        "psql",
        "-h", "localhost",
        "-p", "4566",
        "-d", "dev",
        "-U", "root",
        "--tuples-only",
        "-c", sql,
    ]
    return subprocess.check_output(command)


# --- entry point -------------------------------------------------------------
# Usage: check_data.py <demo> [<upstream>]
# <upstream> (mysql, postgres, ...) is passed by ci/scripts/integration-tests.sh.
demo = sys.argv[1]
# Default to "" instead of raising a bare IndexError: the skip paths below (and
# demos without a cdc_check manifest) never need the upstream argument, and older
# callers invoked this script with the demo name only.
upstream = sys.argv[2] if len(sys.argv) > 2 else ""
if demo in ['docker', 'iceberg-sink']:
    print('Skip for running test for `%s`' % demo)
    sys.exit(0)

file_dir = dirname(abspath(__file__))
project_dir = dirname(file_dir)
demo_dir = os.path.join(project_dir, demo)

# 1) Generic check: build a `_mv` view over each relation listed in `data_check`
#    (comma-separated) and assert each has at least one row.
data_check_file = os.path.join(demo_dir, 'data_check')
with open(data_check_file) as f:
    relations = f.read().strip().split(",")
for rel in relations:
    create_mv(rel)
time.sleep(20)  # give the new MVs time to backfill before counting
for rel in relations:
    check_mv(rel)

# 2) CDC check: only for demos that ship a `cdc_check` manifest; waits for the
#    full upstream snapshot to be ingested.
cdc_check_file = os.path.join(demo_dir, 'cdc_check')
if not os.path.exists(cdc_check_file):
    print("Skip cdc check for {}".format(demo))
    sys.exit(0)

with open(cdc_check_file) as f:
    print("Check cdc table with upstream {}".format(upstream))
    relations = f.read().strip().split(",")
    for rel in relations:
        check_cdc_table(rel)
2 changes: 1 addition & 1 deletion integration_tests/scripts/run_demos.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@


def run_sql_file(f: str, dir: str):
print("Running SQL file: {}".format(f))
print("Running SQL file: {} on RisingWave".format(f))
# ON_ERROR_STOP=1 will let psql return error code when the query fails.
# https://stackoverflow.com/questions/37072245/check-return-status-of-psql-command-in-unix-shell-scripting
proc = subprocess.run(["psql", "-h", "localhost", "-p", "4566",
Expand Down

0 comments on commit b707303

Please sign in to comment.