feat: support iceberg sink upsert mode (#12576)
Co-authored-by: ZENOTME <[email protected]>
ZENOTME authored Sep 28, 2023
1 parent 6f099ae commit f869d8b
Showing 16 changed files with 751 additions and 157 deletions.
58 changes: 54 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default.

4 changes: 3 additions & 1 deletion Cargo.toml
@@ -107,12 +107,14 @@ hashbrown = { version = "0.14.0", features = [
criterion = { version = "0.5", features = ["async_futures"] }
tonic = { package = "madsim-tonic", version = "0.3.1" }
tonic-build = { package = "madsim-tonic-build", version = "0.3.1" }
-icelake = { git = "https://github.com/icelake-io/icelake", rev = "166a36b1a40a64086db09a0e0f2ed6791cec548b" }
+icelake = { git = "https://github.com/icelake-io/icelake", rev = "72a65aed6ed7b3d529b311031c2c8d99650990e2" }
arrow-array = "46"
arrow-cast = "46"
arrow-schema = "46"
arrow-buffer = "46"
arrow-flight = "46"
+arrow-select = "46"
+arrow-ord = "46"
tikv-jemallocator = { git = "https://github.com/risingwavelabs/jemallocator.git", features = [
"profiling",
"stats",
3 changes: 2 additions & 1 deletion ci/scripts/e2e-iceberg-sink-v2-test.sh
@@ -38,7 +38,8 @@ bash ./start_spark_connect_server.sh
# Don't remove the `--quiet` option since poetry has a bug when printing output, see
# https://github.com/python-poetry/poetry/issues/3412
"$HOME"/.local/bin/poetry update --quiet
-"$HOME"/.local/bin/poetry run python main.py
+"$HOME"/.local/bin/poetry run python main.py -t ./test_case/no_partition_append_only.toml
+"$HOME"/.local/bin/poetry run python main.py -t ./test_case/no_partition_upsert.toml


echo "--- Kill cluster"
3 changes: 0 additions & 3 deletions e2e_test/iceberg/config.ini
@@ -1,6 +1,3 @@
-[default]
-result = data.csv

[spark]
url=sc://localhost:15002

5 changes: 0 additions & 5 deletions e2e_test/iceberg/data.csv

This file was deleted.

114 changes: 62 additions & 52 deletions e2e_test/iceberg/main.py
@@ -1,9 +1,11 @@
from pyspark.sql import SparkSession
+import argparse
import configparser
import subprocess
import csv
import unittest
import time
+import tomli as toml
from datetime import date
from datetime import datetime
from datetime import timezone
@@ -23,25 +25,6 @@ def strtots(v):

g_spark = None

-init_table_sqls = [
-    "CREATE SCHEMA IF NOT EXISTS demo_db",
-    "DROP TABLE IF EXISTS demo_db.demo_table",
-    """
-    CREATE TABLE demo_db.demo_table (
-        id long,
-        v_int int,
-        v_long long,
-        v_float float,
-        v_double double,
-        v_varchar string,
-        v_bool boolean,
-        v_date date,
-        v_timestamp timestamp,
-        v_ts_ntz timestamp_ntz
-    ) TBLPROPERTIES ('format-version'='2');
-    """,
-]


def get_spark(args):
    spark_config = args['spark']
@@ -52,54 +35,81 @@ def get_spark(args):
    return g_spark


-def init_iceberg_table(args):
+def init_iceberg_table(args,init_sqls):
    spark = get_spark(args)
-    for sql in init_table_sqls:
+    for sql in init_sqls:
        print(f"Executing sql: {sql}")
        spark.sql(sql)


-def init_risingwave_mv(args):
+def init_risingwave_mv(args,slt):
    rw_config = args['risingwave']
-    cmd = f"sqllogictest -p {rw_config['port']} -d {rw_config['db']} iceberg_sink_v2.slt"
+    cmd = f"sqllogictest -p {rw_config['port']} -d {rw_config['db']} {slt}"
    print(f"Command line is [{cmd}]")
    subprocess.run(cmd,
                   shell=True,
                   check=True)
-    time.sleep(60)
+    time.sleep(10)


-def verify_result(args):
-    sql = "SELECT * FROM demo_db.demo_table ORDER BY id ASC"
+def verify_result(args,verify_sql,verify_schema,verify_data):
    tc = unittest.TestCase()
-    print(f"Executing sql: {sql}")
+    print(f"Executing sql: {verify_sql}")
    spark = get_spark(args)
-    df = spark.sql(sql).collect()
+    df = spark.sql(verify_sql).collect()
    for row in df:
        print(row)

-    with open(args['default']['result'], newline='') as csv_file:
-        csv_result = list(csv.reader(csv_file))
-        for (row1, row2) in zip(df, csv_result):
-            print(f"Row1: {row1}, row 2: {row2}")
-            tc.assertEqual(row1[0], int(row2[0]))
-            tc.assertEqual(row1[1], int(row2[1]))
-            tc.assertEqual(row1[2], int(row2[2]))
-            tc.assertEqual(round(row1[3], 5), round(float(row2[3]), 5))
-            tc.assertEqual(round(row1[4], 5), round(float(row2[4]), 5))
-            tc.assertEqual(row1[5], row2[5])
-            tc.assertEqual(row1[6], strtobool(row2[6]))
-            tc.assertEqual(row1[7], strtodate(row2[7]))
-            tc.assertEqual(row1[8].astimezone(timezone.utc).replace(tzinfo=None), strtots(row2[8]))
-            tc.assertEqual(row1[9], datetime.fromisoformat(row2[9]))
-
-        tc.assertEqual(len(df), len(csv_result))
-
+    rows = verify_data.splitlines()
+    tc.assertEqual(len(df), len(rows))
+    for (row1, row2) in zip(df, rows):
+        print(f"Row1: {row1}, Row 2: {row2}")
+        row2 = row2.split(',')
+        for idx, ty in enumerate(verify_schema):
+            if ty == "int" or ty == "long":
+                tc.assertEqual(row1[idx], int(row2[idx]))
+            elif ty == "float" or ty == "double":
+                tc.assertEqual(round(row1[idx], 5), round(float(row2[idx]), 5))
+            elif ty == "boolean":
+                tc.assertEqual(row1[idx], strtobool(row2[idx]))
+            elif ty == "date":
+                tc.assertEqual(row1[idx], strtodate(row2[idx]))
+            elif ty == "timestamp":
+                tc.assertEqual(row1[idx].astimezone(timezone.utc).replace(tzinfo=None), strtots(row2[idx]))
+            elif ty == "timestamp_ntz":
+                tc.assertEqual(row1[idx], datetime.fromisoformat(row2[idx]))
+            elif ty == "string":
+                tc.assertEqual(row1[idx], row2[idx])
+            else:
+                tc.fail(f"Unsupported type {ty}")
+
+def drop_table(args,drop_sqls):
+    spark = get_spark(args)
+    for sql in drop_sqls:
+        print(f"Executing sql: {sql}")
+        spark.sql(sql)

if __name__ == "__main__":
-    config = configparser.ConfigParser()
-    config.read("config.ini")
-    print({section: dict(config[section]) for section in config.sections()})
-    init_iceberg_table(config)
-    init_risingwave_mv(config)
-    verify_result(config)
+    parser = argparse.ArgumentParser(description="Test script for iceberg")
+    parser.add_argument("-t", dest="test_case", type=str, help="Test case file")
+    with open(parser.parse_args().test_case,"rb") as test_case:
+        test_case = toml.load(test_case)
+        # Extract content from testcase
+        init_sqls = test_case['init_sqls']
+        print(f"init_sqls:{init_sqls}")
+        slt = test_case['slt']
+        print(f"slt:{slt}")
+        verify_schema = test_case['verify_schema']
+        print(f"verify_schema:{verify_schema}")
+        verify_sql = test_case['verify_sql']
+        print(f"verify_sql:{verify_sql}")
+        verify_data = test_case['verify_data']
+        drop_sqls = test_case['drop_sqls']
+
+        config = configparser.ConfigParser()
+        config.read("config.ini")
+        print({section: dict(config[section]) for section in config.sections()})
+
+        init_iceberg_table(config,init_sqls)
+        init_risingwave_mv(config,slt)
+        verify_result(config,verify_sql,verify_schema,verify_data)
+        drop_table(config,drop_sqls)
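
The refactored entry point above fixes the contract for test-case files: six required keys, all extracted before the init, sink, verify, and drop phases run. As a quick reference, a minimal skeleton of such a file might look like the following (a sketch only: the key names come from the code above, while the table name and values are placeholders, not part of this commit).

# Hypothetical skeleton of a test-case file as read by main.py above.
# Key names match the code; all values here are placeholders.
init_sqls = ['CREATE SCHEMA IF NOT EXISTS my_db']  # Spark SQL run before the test
slt = 'test_case/my_case.slt'  # sqllogictest script run against RisingWave
verify_schema = ['long', 'string']  # drives the typed per-column comparison
verify_sql = 'SELECT * FROM my_db.my_table ORDER BY id ASC'
verify_data = """
1,foo
2,bar
"""
drop_sqls = ['DROP SCHEMA IF EXISTS my_db']  # Spark SQL cleanup after verification

Each main.py -t <case> invocation runs the whole cycle, which is why the CI script above now calls the harness once per TOML file.
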
2 changes: 1 addition & 1 deletion e2e_test/iceberg/pyproject.toml
@@ -7,7 +7,7 @@ authors = ["risingwavelabs"]
[tool.poetry.dependencies]
python = "^3.10"
pyspark = { version = "3.4.1", extras = ["sql", "connect"] }

+tomli = "2.0"


[build-system]
File renamed without changes.
45 changes: 45 additions & 0 deletions e2e_test/iceberg/test_case/iceberg_sink_upsert.slt
@@ -0,0 +1,45 @@
statement ok
set streaming_parallelism=4;

statement ok
CREATE TABLE t6 (id int, v1 int primary key, v2 bigint, v3 varchar);

statement ok
CREATE MATERIALIZED VIEW mv6 AS SELECT * FROM t6;

statement ok
CREATE SINK s6 AS select mv6.id as id, mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3 from mv6 WITH (
    connector = 'iceberg',
    type = 'upsert',
    force_append_only = 'false',
    database.name = 'demo',
    table.name = 'demo_db.demo_table',
    catalog.type = 'storage',
    warehouse.path = 's3://icebergdata/demo',
    s3.endpoint = 'http://127.0.0.1:9301',
    s3.region = 'us-east-1',
    s3.access.key = 'hummockadmin',
    s3.secret.key = 'hummockadmin',
    primary_key = 'v1'
);

statement ok
INSERT INTO t6 VALUES (1, 1, 2, '1-2'), (1, 2, 2, '2-2'), (1, 3, 2, '3-2'), (1, 5, 2, '5-2'), (1, 8, 2, '8-2'), (1, 13, 2, '13-2'), (1, 21, 2, '21-2');

statement ok
FLUSH;

statement ok
INSERT INTO t6 VALUES (1, 1, 50, '1-50');

statement ok
FLUSH;

statement ok
DROP SINK s6;

statement ok
DROP MATERIALIZED VIEW mv6;

statement ok
DROP TABLE t6;
38 changes: 38 additions & 0 deletions e2e_test/iceberg/test_case/no_partition_append_only.toml
@@ -0,0 +1,38 @@
init_sqls = [
'CREATE SCHEMA IF NOT EXISTS demo_db',
'DROP TABLE IF EXISTS demo_db.demo_table',
'''
    CREATE TABLE demo_db.demo_table (
        id long,
        v_int int,
        v_long long,
        v_float float,
        v_double double,
        v_varchar string,
        v_bool boolean,
        v_date date,
        v_timestamp timestamp,
        v_ts_ntz timestamp_ntz
    ) TBLPROPERTIES ('format-version'='2');
'''
]

slt = 'test_case/iceberg_sink_append_only.slt'

verify_schema = ['long', 'int', 'long', 'float', 'double', 'string', 'boolean', 'date', 'timestamp', 'timestamp_ntz']

verify_sql = 'SELECT * FROM demo_db.demo_table ORDER BY id ASC'


verify_data = """
1,1,1000,1.1,1.11,1-1,true,2022-03-11,2022-03-11 01:00:00+00:00,2022-03-11 01:00:00
2,2,2000,2.2,2.22,2-2,false,2022-03-12,2022-03-12 02:00:00+00:00,2022-03-12 02:00:00
3,3,3000,3.3,3.33,3-3,true,2022-03-13,2022-03-13 03:00:00+00:00,2022-03-13 03:00:00
4,4,4000,4.4,4.44,4-4,false,2022-03-14,2022-03-14 04:00:00+00:00,2022-03-14 04:00:00
5,5,5000,5.5,5.55,5-5,true,2022-03-15,2022-03-15 05:00:00+00:00,2022-03-15 05:00:00
"""

drop_sqls = [
'DROP TABLE IF EXISTS demo_db.demo_table',
'DROP SCHEMA IF EXISTS demo_db'
]
(The remaining changed files are not rendered on this page.)
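
Among the files not shown is test_case/no_partition_upsert.toml, which the CI script above runs as the second test case. A hypothetical sketch of its shape, inferred from iceberg_sink_upsert.slt: since the sink declares primary_key = 'v1', the second INSERT should replace the earlier (1, 1, 2, '1-2') row, so verification would expect v1 = 1 to carry v2 = 50 and v3 = '1-50'. The actual committed file may differ.

# Hypothetical sketch of test_case/no_partition_upsert.toml; the real file
# is not rendered in this diff, and its exact contents may differ.
init_sqls = [
    'CREATE SCHEMA IF NOT EXISTS demo_db',
    'DROP TABLE IF EXISTS demo_db.demo_table',
    '''
    CREATE TABLE demo_db.demo_table (
        id int,
        v1 int,
        v2 long,
        v3 string
    ) TBLPROPERTIES ('format-version'='2');
    '''
]

slt = 'test_case/iceberg_sink_upsert.slt'

verify_schema = ['int', 'int', 'long', 'string']

verify_sql = 'SELECT * FROM demo_db.demo_table ORDER BY v1 ASC'

verify_data = """
1,1,50,1-50
1,2,2,2-2
1,3,2,3-2
1,5,2,5-2
1,8,2,8-2
1,13,2,13-2
1,21,2,21-2
"""

drop_sqls = [
    'DROP TABLE IF EXISTS demo_db.demo_table',
    'DROP SCHEMA IF EXISTS demo_db'
]
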
