From 48a606d988c0942b68a379d2b68180ae4a974f04 Mon Sep 17 00:00:00 2001 From: ZENOTME Date: Thu, 28 Sep 2023 08:33:20 +0800 Subject: [PATCH] refine sink test for iceberg --- ci/scripts/e2e-iceberg-sink-v2-test.sh | 3 +- e2e_test/iceberg/config.ini | 3 - e2e_test/iceberg/data.csv | 5 - e2e_test/iceberg/main.py | 114 ++++++++++-------- e2e_test/iceberg/pyproject.toml | 2 +- .../iceberg_sink_append_only.slt} | 0 .../iceberg/test_case/iceberg_sink_upsert.slt | 45 +++++++ .../test_case/no_partition_append_only.toml | 38 ++++++ .../test_case/no_partition_upsert.toml | 34 ++++++ 9 files changed, 182 insertions(+), 62 deletions(-) delete mode 100644 e2e_test/iceberg/data.csv rename e2e_test/iceberg/{iceberg_sink_v2.slt => test_case/iceberg_sink_append_only.slt} (100%) create mode 100644 e2e_test/iceberg/test_case/iceberg_sink_upsert.slt create mode 100644 e2e_test/iceberg/test_case/no_partition_append_only.toml create mode 100644 e2e_test/iceberg/test_case/no_partition_upsert.toml diff --git a/ci/scripts/e2e-iceberg-sink-v2-test.sh b/ci/scripts/e2e-iceberg-sink-v2-test.sh index 83c0d187d6b3b..c365e417319c2 100755 --- a/ci/scripts/e2e-iceberg-sink-v2-test.sh +++ b/ci/scripts/e2e-iceberg-sink-v2-test.sh @@ -38,7 +38,8 @@ bash ./start_spark_connect_server.sh # Don't remove the `--quiet` option since poetry has a bug when printing output, see # https://github.com/python-poetry/poetry/issues/3412 "$HOME"/.local/bin/poetry update --quiet -"$HOME"/.local/bin/poetry run python main.py +"$HOME"/.local/bin/poetry run python main.py -t ./test_case/no_partition_append_only.toml +"$HOME"/.local/bin/poetry run python main.py -t ./test_case/no_partition_upsert.toml echo "--- Kill cluster" diff --git a/e2e_test/iceberg/config.ini b/e2e_test/iceberg/config.ini index 6fa1ffbdc6832..bd95eddc5b80e 100644 --- a/e2e_test/iceberg/config.ini +++ b/e2e_test/iceberg/config.ini @@ -1,6 +1,3 @@ -[default] -result = data.csv - [spark] url=sc://localhost:15002 diff --git a/e2e_test/iceberg/data.csv b/e2e_test/iceberg/data.csv deleted file mode 100644 index 77ad8f16dbc9d..0000000000000 --- a/e2e_test/iceberg/data.csv +++ /dev/null @@ -1,5 +0,0 @@ -1,1,1000,1.1,1.11,1-1,true,2022-03-11,2022-03-11 01:00:00+00:00,2022-03-11 01:00:00 -2,2,2000,2.2,2.22,2-2,false,2022-03-12,2022-03-12 02:00:00+00:00,2022-03-12 02:00:00 -3,3,3000,3.3,3.33,3-3,true,2022-03-13,2022-03-13 03:00:00+00:00,2022-03-13 03:00:00 -4,4,4000,4.4,4.44,4-4,false,2022-03-14,2022-03-14 04:00:00+00:00,2022-03-14 04:00:00 -5,5,5000,5.5,5.55,5-5,true,2022-03-15,2022-03-15 05:00:00+00:00,2022-03-15 05:00:00 diff --git a/e2e_test/iceberg/main.py b/e2e_test/iceberg/main.py index 304962ef08ccc..fa07aa367a9b3 100644 --- a/e2e_test/iceberg/main.py +++ b/e2e_test/iceberg/main.py @@ -1,9 +1,11 @@ from pyspark.sql import SparkSession +import argparse import configparser import subprocess import csv import unittest import time +import tomli as toml from datetime import date from datetime import datetime from datetime import timezone @@ -23,25 +25,6 @@ def strtots(v): g_spark = None -init_table_sqls = [ - "CREATE SCHEMA IF NOT EXISTS demo_db", - "DROP TABLE IF EXISTS demo_db.demo_table", - """ - CREATE TABLE demo_db.demo_table ( - id long, - v_int int, - v_long long, - v_float float, - v_double double, - v_varchar string, - v_bool boolean, - v_date date, - v_timestamp timestamp, - v_ts_ntz timestamp_ntz - ) TBLPROPERTIES ('format-version'='2'); - """, -] - def get_spark(args): spark_config = args['spark'] @@ -52,54 +35,81 @@ def get_spark(args): return g_spark -def 
init_iceberg_table(args): +def init_iceberg_table(args,init_sqls): spark = get_spark(args) - for sql in init_table_sqls: + for sql in init_sqls: print(f"Executing sql: {sql}") spark.sql(sql) -def init_risingwave_mv(args): +def init_risingwave_mv(args,slt): rw_config = args['risingwave'] - cmd = f"sqllogictest -p {rw_config['port']} -d {rw_config['db']} iceberg_sink_v2.slt" + cmd = f"sqllogictest -p {rw_config['port']} -d {rw_config['db']} {slt}" print(f"Command line is [{cmd}]") subprocess.run(cmd, shell=True, check=True) - time.sleep(60) + time.sleep(10) -def verify_result(args): - sql = "SELECT * FROM demo_db.demo_table ORDER BY id ASC" +def verify_result(args,verify_sql,verify_schema,verify_data): tc = unittest.TestCase() - print(f"Executing sql: {sql}") + print(f"Executing sql: {verify_sql}") spark = get_spark(args) - df = spark.sql(sql).collect() + df = spark.sql(verify_sql).collect() for row in df: print(row) - - with open(args['default']['result'], newline='') as csv_file: - csv_result = list(csv.reader(csv_file)) - for (row1, row2) in zip(df, csv_result): - print(f"Row1: {row1}, row 2: {row2}") - tc.assertEqual(row1[0], int(row2[0])) - tc.assertEqual(row1[1], int(row2[1])) - tc.assertEqual(row1[2], int(row2[2])) - tc.assertEqual(round(row1[3], 5), round(float(row2[3]), 5)) - tc.assertEqual(round(row1[4], 5), round(float(row2[4]), 5)) - tc.assertEqual(row1[5], row2[5]) - tc.assertEqual(row1[6], strtobool(row2[6])) - tc.assertEqual(row1[7], strtodate(row2[7])) - tc.assertEqual(row1[8].astimezone(timezone.utc).replace(tzinfo=None), strtots(row2[8])) - tc.assertEqual(row1[9], datetime.fromisoformat(row2[9])) - - tc.assertEqual(len(df), len(csv_result)) - + rows = verify_data.splitlines() + tc.assertEqual(len(df), len(rows)) + for (row1, row2) in zip(df, rows): + print(f"Row1: {row1}, Row 2: {row2}") + row2 = row2.split(',') + for idx, ty in enumerate(verify_schema): + if ty == "int" or ty == "long": + tc.assertEqual(row1[idx], int(row2[idx])) + elif ty == "float" or ty == "double": + tc.assertEqual(round(row1[idx], 5), round(float(row2[idx]), 5)) + elif ty == "boolean": + tc.assertEqual(row1[idx], strtobool(row2[idx])) + elif ty == "date": + tc.assertEqual(row1[idx], strtodate(row2[idx])) + elif ty == "timestamp": + tc.assertEqual(row1[idx].astimezone(timezone.utc).replace(tzinfo=None), strtots(row2[idx])) + elif ty == "timestamp_ntz": + tc.assertEqual(row1[idx], datetime.fromisoformat(row2[idx])) + elif ty == "string": + tc.assertEqual(row1[idx], row2[idx]) + else: + tc.fail(f"Unsupported type {ty}") + +def drop_table(args,drop_sqls): + spark = get_spark(args) + for sql in drop_sqls: + print(f"Executing sql: {sql}") + spark.sql(sql) if __name__ == "__main__": - config = configparser.ConfigParser() - config.read("config.ini") - print({section: dict(config[section]) for section in config.sections()}) - init_iceberg_table(config) - init_risingwave_mv(config) - verify_result(config) + parser = argparse.ArgumentParser(description="Test script for iceberg") + parser.add_argument("-t", dest="test_case", type=str, help="Test case file") + with open(parser.parse_args().test_case,"rb") as test_case: + test_case = toml.load(test_case) + # Extract content from testcase + init_sqls = test_case['init_sqls'] + print(f"init_sqls:{init_sqls}") + slt = test_case['slt'] + print(f"slt:{slt}") + verify_schema = test_case['verify_schema'] + print(f"verify_schema:{verify_schema}") + verify_sql = test_case['verify_sql'] + print(f"verify_sql:{verify_sql}") + verify_data = test_case['verify_data'] + drop_sqls 
 = test_case['drop_sqls']
+
+        config = configparser.ConfigParser()
+        config.read("config.ini")
+        print({section: dict(config[section]) for section in config.sections()})
+
+        init_iceberg_table(config, init_sqls)
+        init_risingwave_mv(config, slt)
+        verify_result(config, verify_sql, verify_schema, verify_data)
+        drop_table(config, drop_sqls)
diff --git a/e2e_test/iceberg/pyproject.toml b/e2e_test/iceberg/pyproject.toml
index 7b19ed7b044f5..d13be72277592 100644
--- a/e2e_test/iceberg/pyproject.toml
+++ b/e2e_test/iceberg/pyproject.toml
@@ -7,7 +7,7 @@ authors = ["risingwavelabs"]
 [tool.poetry.dependencies]
 python = "^3.10"
 pyspark = { version = "3.4.1", extras = ["sql", "connect"] }
-
+tomli = "2.0"
 
 
 [build-system]
diff --git a/e2e_test/iceberg/iceberg_sink_v2.slt b/e2e_test/iceberg/test_case/iceberg_sink_append_only.slt
similarity index 100%
rename from e2e_test/iceberg/iceberg_sink_v2.slt
rename to e2e_test/iceberg/test_case/iceberg_sink_append_only.slt
diff --git a/e2e_test/iceberg/test_case/iceberg_sink_upsert.slt b/e2e_test/iceberg/test_case/iceberg_sink_upsert.slt
new file mode 100644
index 0000000000000..646a39cc08e28
--- /dev/null
+++ b/e2e_test/iceberg/test_case/iceberg_sink_upsert.slt
@@ -0,0 +1,45 @@
+statement ok
+SET streaming_parallelism = 4;
+
+statement ok
+CREATE TABLE t6 (id int, v1 int primary key, v2 bigint, v3 varchar);
+
+statement ok
+CREATE MATERIALIZED VIEW mv6 AS SELECT * FROM t6;
+
+statement ok
+CREATE SINK s6 AS SELECT mv6.id AS id, mv6.v1 AS v1, mv6.v2 AS v2, mv6.v3 AS v3 FROM mv6 WITH (
+    connector = 'iceberg',
+    type = 'upsert',
+    force_append_only = 'false',
+    database.name = 'demo',
+    table.name = 'demo_db.demo_table',
+    catalog.type = 'storage',
+    warehouse.path = 's3://icebergdata/demo',
+    s3.endpoint = 'http://127.0.0.1:9301',
+    s3.region = 'us-east-1',
+    s3.access.key = 'hummockadmin',
+    s3.secret.key = 'hummockadmin',
+    primary_key = 'v1'
+);
+
+statement ok
+INSERT INTO t6 VALUES (1, 1, 2, '1-2'), (1, 2, 2, '2-2'), (1, 3, 2, '3-2'), (1, 5, 2, '5-2'), (1, 8, 2, '8-2'), (1, 13, 2, '13-2'), (1, 21, 2, '21-2');
+
+statement ok
+FLUSH;
+
+statement ok
+INSERT INTO t6 VALUES (1, 1, 50, '1-50');
+
+statement ok
+FLUSH;
+
+statement ok
+DROP SINK s6;
+
+statement ok
+DROP MATERIALIZED VIEW mv6;
+
+statement ok
+DROP TABLE t6;
diff --git a/e2e_test/iceberg/test_case/no_partition_append_only.toml b/e2e_test/iceberg/test_case/no_partition_append_only.toml
new file mode 100644
index 0000000000000..211407644abec
--- /dev/null
+++ b/e2e_test/iceberg/test_case/no_partition_append_only.toml
@@ -0,0 +1,38 @@
+init_sqls = [
+    'CREATE SCHEMA IF NOT EXISTS demo_db',
+    'DROP TABLE IF EXISTS demo_db.demo_table',
+    '''
+    CREATE TABLE demo_db.demo_table (
+        id long,
+        v_int int,
+        v_long long,
+        v_float float,
+        v_double double,
+        v_varchar string,
+        v_bool boolean,
+        v_date date,
+        v_timestamp timestamp,
+        v_ts_ntz timestamp_ntz
+    ) TBLPROPERTIES ('format-version'='2');
+    '''
+]
+
+slt = 'test_case/iceberg_sink_append_only.slt'
+
+verify_schema = ['long', 'int', 'long', 'float', 'double', 'string', 'boolean', 'date', 'timestamp', 'timestamp_ntz']
+
+verify_sql = 'SELECT * FROM demo_db.demo_table ORDER BY id ASC'
+
+
+verify_data = """
+1,1,1000,1.1,1.11,1-1,true,2022-03-11,2022-03-11 01:00:00+00:00,2022-03-11 01:00:00
+2,2,2000,2.2,2.22,2-2,false,2022-03-12,2022-03-12 02:00:00+00:00,2022-03-12 02:00:00
+3,3,3000,3.3,3.33,3-3,true,2022-03-13,2022-03-13 03:00:00+00:00,2022-03-13 03:00:00
+4,4,4000,4.4,4.44,4-4,false,2022-03-14,2022-03-14 04:00:00+00:00,2022-03-14 04:00:00
+5,5,5000,5.5,5.55,5-5,true,2022-03-15,2022-03-15 05:00:00+00:00,2022-03-15 05:00:00
+"""
+
+drop_sqls = [
+    'DROP TABLE IF EXISTS demo_db.demo_table',
+    'DROP SCHEMA IF EXISTS demo_db'
+]
diff --git a/e2e_test/iceberg/test_case/no_partition_upsert.toml b/e2e_test/iceberg/test_case/no_partition_upsert.toml
new file mode 100644
index 0000000000000..0e0215d37465d
--- /dev/null
+++ b/e2e_test/iceberg/test_case/no_partition_upsert.toml
@@ -0,0 +1,34 @@
+init_sqls = [
+    'CREATE SCHEMA IF NOT EXISTS demo_db',
+    'DROP TABLE IF EXISTS demo_db.demo_table',
+    '''
+    CREATE TABLE demo_db.demo_table (
+        id int,
+        v1 int,
+        v2 long,
+        v3 string
+    ) USING iceberg
+    TBLPROPERTIES ('format-version'='2');
+    '''
+]
+
+slt = 'test_case/iceberg_sink_upsert.slt'
+
+verify_schema = ['int', 'int', 'long', 'string']
+
+verify_sql = 'SELECT * FROM demo_db.demo_table ORDER BY id, v1 ASC'
+
+verify_data = """
+1,1,50,1-50
+1,2,2,2-2
+1,3,2,3-2
+1,5,2,5-2
+1,8,2,8-2
+1,13,2,13-2
+1,21,2,21-2
+"""
+
+drop_sqls = [
+    'DROP TABLE IF EXISTS demo_db.demo_table',
+    'DROP SCHEMA IF EXISTS demo_db'
+]
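
For reference, the refactored harness runs once per TOML test case, mirroring the CI invocation in ci/scripts/e2e-iceberg-sink-v2-test.sh above. A minimal local sketch, assuming a running RisingWave cluster and the Spark Connect server started by start_spark_connect_server.sh, and assuming the commands are executed from e2e_test/iceberg so that config.ini resolves:

    "$HOME"/.local/bin/poetry update --quiet
    "$HOME"/.local/bin/poetry run python main.py -t ./test_case/no_partition_append_only.toml
    "$HOME"/.local/bin/poetry run python main.py -t ./test_case/no_partition_upsert.toml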