Skip to content

Commit

Permalink
refine sink test for iceberg
Browse files Browse the repository at this point in the history
  • Loading branch information
ZENOTME committed Sep 28, 2023
1 parent 3e94c08 commit 48a606d
Show file tree
Hide file tree
Showing 9 changed files with 182 additions and 62 deletions.
3 changes: 2 additions & 1 deletion ci/scripts/e2e-iceberg-sink-v2-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ bash ./start_spark_connect_server.sh
# Don't remove the `--quiet` option since poetry has a bug when printing output, see
# https://github.com/python-poetry/poetry/issues/3412
"$HOME"/.local/bin/poetry update --quiet
"$HOME"/.local/bin/poetry run python main.py
"$HOME"/.local/bin/poetry run python main.py -t ./test_case/no_partition_append_only.toml
"$HOME"/.local/bin/poetry run python main.py -t ./test_case/no_partition_upsert.toml


echo "--- Kill cluster"
Expand Down
3 changes: 0 additions & 3 deletions e2e_test/iceberg/config.ini
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
[default]
result = data.csv

[spark]
url=sc://localhost:15002

Expand Down
5 changes: 0 additions & 5 deletions e2e_test/iceberg/data.csv

This file was deleted.

114 changes: 62 additions & 52 deletions e2e_test/iceberg/main.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from pyspark.sql import SparkSession
import argparse
import configparser
import subprocess
import csv
import unittest
import time
import tomli as toml
from datetime import date
from datetime import datetime
from datetime import timezone
Expand All @@ -23,25 +25,6 @@ def strtots(v):

g_spark = None

init_table_sqls = [
"CREATE SCHEMA IF NOT EXISTS demo_db",
"DROP TABLE IF EXISTS demo_db.demo_table",
"""
CREATE TABLE demo_db.demo_table (
id long,
v_int int,
v_long long,
v_float float,
v_double double,
v_varchar string,
v_bool boolean,
v_date date,
v_timestamp timestamp,
v_ts_ntz timestamp_ntz
) TBLPROPERTIES ('format-version'='2');
""",
]


def get_spark(args):
spark_config = args['spark']
Expand All @@ -52,54 +35,81 @@ def get_spark(args):
return g_spark


def init_iceberg_table(args, init_sqls):
    """Run the test case's table-initialization SQL statements on Spark.

    Args:
        args: parsed config (configparser mapping) with a ['spark'] section;
            passed to get_spark to build or reuse the global Spark session.
        init_sqls: iterable of SQL statement strings executed in order
            (e.g. CREATE SCHEMA / DROP TABLE / CREATE TABLE from the TOML case).
    """
    spark = get_spark(args)
    for sql in init_sqls:
        # Echo each statement so CI logs show exactly what ran.
        print(f"Executing sql: {sql}")
        spark.sql(sql)


def init_risingwave_mv(args, slt):
    """Run a sqllogictest file against RisingWave to set up table/MV/sink.

    Args:
        args: parsed config with a ['risingwave'] section providing
            'port' and 'db' for the sqllogictest connection.
        slt: path to the .slt file to execute (from the TOML test case).

    Raises:
        subprocess.CalledProcessError: if sqllogictest exits non-zero
            (check=True).
    """
    rw_config = args['risingwave']
    cmd = f"sqllogictest -p {rw_config['port']} -d {rw_config['db']} {slt}"
    print(f"Command line is [{cmd}]")
    subprocess.run(cmd,
                   shell=True,
                   check=True)
    # Give the iceberg sink time to commit its data before verification.
    # NOTE(review): a fixed sleep is inherently racy -- TODO consider
    # polling the target table until the expected row count appears.
    time.sleep(10)


def verify_result(args, verify_sql, verify_schema, verify_data):
    """Query the sinked iceberg table via Spark and compare with expected rows.

    Args:
        args: parsed config; used to obtain the Spark session.
        verify_sql: SELECT statement with a deterministic ORDER BY so row
            order matches verify_data.
        verify_schema: per-column type names ('int', 'long', 'float',
            'double', 'boolean', 'date', 'timestamp', 'timestamp_ntz',
            'string') that select how each expected field is parsed before
            comparison.
        verify_data: expected rows as a multi-line string, one
            comma-separated row per line.

    Raises:
        AssertionError: on any row-count or cell mismatch, or on an
            unsupported type name in verify_schema.
    """
    tc = unittest.TestCase()
    print(f"Executing sql: {verify_sql}")
    spark = get_spark(args)
    df = spark.sql(verify_sql).collect()
    for row in df:
        print(row)

    rows = verify_data.splitlines()
    # Check counts first so a missing/extra row fails loudly instead of
    # being silently truncated by zip below.
    tc.assertEqual(len(df), len(rows))
    for (row1, row2) in zip(df, rows):
        print(f"Row1: {row1}, Row 2: {row2}")
        row2 = row2.split(',')
        for idx, ty in enumerate(verify_schema):
            if ty == "int" or ty == "long":
                tc.assertEqual(row1[idx], int(row2[idx]))
            elif ty == "float" or ty == "double":
                # Round to 5 places: sink and reader may differ in float
                # formatting/precision.
                tc.assertEqual(round(row1[idx], 5), round(float(row2[idx]), 5))
            elif ty == "boolean":
                tc.assertEqual(row1[idx], strtobool(row2[idx]))
            elif ty == "date":
                tc.assertEqual(row1[idx], strtodate(row2[idx]))
            elif ty == "timestamp":
                # Normalize the Spark value to naive UTC before comparing
                # with the parsed expected timestamp.
                tc.assertEqual(row1[idx].astimezone(timezone.utc).replace(tzinfo=None), strtots(row2[idx]))
            elif ty == "timestamp_ntz":
                tc.assertEqual(row1[idx], datetime.fromisoformat(row2[idx]))
            elif ty == "string":
                tc.assertEqual(row1[idx], row2[idx])
            else:
                tc.fail(f"Unsupported type {ty}")

def drop_table(args, drop_sqls):
    """Tear down test state by running each DROP statement on Spark.

    Args:
        args: parsed config; passed to get_spark for the session.
        drop_sqls: iterable of SQL statements (DROP TABLE / DROP SCHEMA)
            executed in order.
    """
    session = get_spark(args)
    for sql in drop_sqls:
        print(f"Executing sql: {sql}")
        session.sql(sql)

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Test script for iceberg")
    # required=True gives a clear argparse error instead of open(None)
    # raising TypeError when -t is omitted.
    parser.add_argument("-t", dest="test_case", type=str, required=True,
                        help="Test case file")
    cli_args = parser.parse_args()

    # tomli requires binary mode; keep the file handle and the parsed dict
    # in separate names (the original rebound one name to both).
    with open(cli_args.test_case, "rb") as test_case_file:
        test_case = toml.load(test_case_file)

    # Extract content from testcase
    init_sqls = test_case['init_sqls']
    print(f"init_sqls:{init_sqls}")
    slt = test_case['slt']
    print(f"slt:{slt}")
    verify_schema = test_case['verify_schema']
    print(f"verify_schema:{verify_schema}")
    verify_sql = test_case['verify_sql']
    print(f"verify_sql:{verify_sql}")
    verify_data = test_case['verify_data']
    drop_sqls = test_case['drop_sqls']

    config = configparser.ConfigParser()
    config.read("config.ini")
    print({section: dict(config[section]) for section in config.sections()})

    # Pipeline: create iceberg table -> run slt against RisingWave ->
    # verify sinked data via Spark -> drop the table/schema.
    init_iceberg_table(config, init_sqls)
    init_risingwave_mv(config, slt)
    verify_result(config, verify_sql, verify_schema, verify_data)
    drop_table(config, drop_sqls)
2 changes: 1 addition & 1 deletion e2e_test/iceberg/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ authors = ["risingwavelabs"]
[tool.poetry.dependencies]
python = "^3.10"
pyspark = { version = "3.4.1", extras = ["sql", "connect"] }

tomli = "2.0"


[build-system]
Expand Down
File renamed without changes.
45 changes: 45 additions & 0 deletions e2e_test/iceberg/test_case/iceberg_sink_upsert.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
statement ok
set streaming_parallelism=4;

statement ok
CREATE TABLE t6 (id int, v1 int primary key, v2 bigint, v3 varchar);

statement ok
CREATE MATERIALIZED VIEW mv6 AS SELECT * FROM t6;

statement ok
CREATE SINK s6 AS select mv6.id as id, mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3 from mv6 WITH (
connector = 'iceberg',
type = 'upsert',
force_append_only = 'false',
database.name = 'demo',
table.name = 'demo_db.demo_table',
catalog.type = 'storage',
warehouse.path = 's3://icebergdata/demo',
s3.endpoint = 'http://127.0.0.1:9301',
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
primary_key = 'v1'
);

statement ok
INSERT INTO t6 VALUES (1, 1, 2, '1-2'), (1, 2, 2, '2-2'), (1, 3, 2, '3-2'), (1, 5, 2, '5-2'), (1, 8, 2, '8-2'), (1, 13, 2, '13-2'), (1, 21, 2, '21-2');

statement ok
FLUSH;

statement ok
INSERT INTO t6 VALUES (1, 1, 50, '1-50');

statement ok
FLUSH;

statement ok
DROP SINK s6;

statement ok
DROP MATERIALIZED VIEW mv6;

statement ok
DROP TABLE t6;
38 changes: 38 additions & 0 deletions e2e_test/iceberg/test_case/no_partition_append_only.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
init_sqls = [
'CREATE SCHEMA IF NOT EXISTS demo_db',
'DROP TABLE IF EXISTS demo_db.demo_table',
'''
CREATE TABLE demo_db.demo_table (
id long,
v_int int,
v_long long,
v_float float,
v_double double,
v_varchar string,
v_bool boolean,
v_date date,
v_timestamp timestamp,
v_ts_ntz timestamp_ntz
) TBLPROPERTIES ('format-version'='2');
'''
]

slt = 'test_case/iceberg_sink_append_only.slt'

verify_schema = ['long', 'int', 'long', 'float', 'double', 'string', 'boolean', 'date', 'timestamp', 'timestamp_ntz']

verify_sql = 'SELECT * FROM demo_db.demo_table ORDER BY id ASC'


verify_data = """
1,1,1000,1.1,1.11,1-1,true,2022-03-11,2022-03-11 01:00:00+00:00,2022-03-11 01:00:00
2,2,2000,2.2,2.22,2-2,false,2022-03-12,2022-03-12 02:00:00+00:00,2022-03-12 02:00:00
3,3,3000,3.3,3.33,3-3,true,2022-03-13,2022-03-13 03:00:00+00:00,2022-03-13 03:00:00
4,4,4000,4.4,4.44,4-4,false,2022-03-14,2022-03-14 04:00:00+00:00,2022-03-14 04:00:00
5,5,5000,5.5,5.55,5-5,true,2022-03-15,2022-03-15 05:00:00+00:00,2022-03-15 05:00:00
"""

drop_sqls = [
'DROP TABLE IF EXISTS demo_db.demo_table',
'DROP SCHEMA IF EXISTS demo_db'
]
34 changes: 34 additions & 0 deletions e2e_test/iceberg/test_case/no_partition_upsert.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
init_sqls = [
'CREATE SCHEMA IF NOT EXISTS demo_db',
'DROP TABLE IF EXISTS demo_db.demo_table',
'''
CREATE TABLE demo_db.demo_table (
id int,
v1 int,
v2 long,
v3 string
) USING iceberg
TBLPROPERTIES ('format-version'='2');
'''
]

slt = 'test_case/iceberg_sink_upsert.slt'

verify_schema = ['int','int','long','string']

verify_sql = 'SELECT * FROM demo_db.demo_table ORDER BY id, v1 ASC'

verify_data = """
1,1,50,1-50
1,2,2,2-2
1,3,2,3-2
1,5,2,5-2
1,8,2,8-2
1,13,2,13-2
1,21,2,21-2
"""

drop_sqls = [
'DROP TABLE IF EXISTS demo_db.demo_table',
'DROP SCHEMA IF EXISTS demo_db'
]

0 comments on commit 48a606d

Please sign in to comment.