test(sink): add deltalake-sink compatible test (#16448)
xuefengze authored Apr 23, 2024
1 parent cad5d4b commit e113445
Showing 9 changed files with 96 additions and 22 deletions.
13 changes: 12 additions & 1 deletion integration_tests/deltalake-sink/create_sink.sql
@@ -7,4 +7,15 @@ with (
     s3.access.key = 'hummockadmin',
     s3.secret.key = 'hummockadmin',
     s3.endpoint = 'http://minio-0:9301'
-);
+);
+
+create sink data_types_sink from data_types
+with (
+    connector = 'deltalake',
+    type = 'append-only',
+    force_append_only='true',
+    location = 's3a://deltalake/data_types',
+    s3.access.key = 'hummockadmin',
+    s3.secret.key = 'hummockadmin',
+    s3.endpoint = 'http://minio-0:9301'
+);
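
The sink DDL above can be replayed from any Postgres-compatible client. Below is a minimal sketch, assuming RisingWave's default frontend endpoint (localhost:4566, user root, database dev) and the psycopg2 driver; these connection defaults are not part of this commit.

import psycopg2

# Run the statements from create_sink.sql against RisingWave.
# Connection parameters are assumed defaults, not taken from this commit.
conn = psycopg2.connect(host="localhost", port=4566, user="root", dbname="dev")
conn.autocommit = True
with conn.cursor() as cur:
    cur.execute(open("integration_tests/deltalake-sink/create_sink.sql").read())
conn.close()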
34 changes: 33 additions & 1 deletion integration_tests/deltalake-sink/create_source.sql

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion integration_tests/deltalake-sink/docker-compose.yml
@@ -40,4 +40,4 @@ volumes:
     external: false
   prometheus-0:
     external: false
-name: risingwave-compose
+name: risingwave-compose
2 changes: 1 addition & 1 deletion integration_tests/deltalake-sink/prepare.sh
100644 → 100755
@@ -4,4 +4,4 @@ set -euo pipefail
 
 # build minio dir and create table
 docker compose exec minio-0 mkdir /data/deltalake
-docker compose exec spark bash /spark-script/run-sql-file.sh create-table
+docker compose exec spark bash /spark-script/run-sql-file.sh create-table
35 changes: 20 additions & 15 deletions integration_tests/deltalake-sink/sink_check.py
@@ -1,30 +1,35 @@
 import subprocess
 from time import sleep
 
 sleep(60)
+def run_query(file):
+    with open(query_output_file_name, 'w') as f:
+        subprocess.run(
+            ["docker", "compose", "exec", "spark", "bash", "/spark-script/run-sql-file.sh", file],
+            check=True, stdout=f)
+
+
 
-query_sql = open("spark-script/query-table.sql").read()
-
-print("querying deltalake with sql: %s" % query_sql)
 
 query_output_file_name = "query_output.txt"
 
-query_output_file = open(query_output_file_name, "wb")
-
-subprocess.run(
-    ["docker", "compose", "exec", "spark", "bash", "/spark-script/run-sql-file.sh", "query-table"],
-    check=True, stdout=query_output_file)
-query_output_file.close()
-
+run_query('query-table')
 with open(query_output_file_name, 'r') as file:
     all_lines = file.readlines()
-
-last_three_lines = all_lines[-3:]
-
-print("result", last_three_lines)
+    last_three_lines = all_lines[-3:]
+    print("result", last_three_lines)
+    line1, line2, line3 = last_three_lines
+    assert line1.strip() == '1\ta'
+    assert line2.strip() == '2\tb'
+    assert line3.strip() == '3\tc'
 
-line1, line2, line3 = last_three_lines
-
-assert line1.strip() == '1\ta'
-assert line2.strip() == '2\tb'
-assert line3.strip() == '3\tc'
+run_query('data-types-query')
+with open(query_output_file_name, 'r') as f:
+    all_lines = f.readlines()
+    last_line = all_lines[-1]
+    print("rows of data_types: ", last_line)
+    assert last_line.strip() == '3'
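
The check still gates on a fixed sleep(60) before the first query. A hedged alternative is to poll the count query until the expected row count appears; the helper name, timeout, and poll interval below are illustrative assumptions, not part of this commit.

import subprocess
import time

def wait_for_rows(expected, timeout=120, interval=5):
    # Re-run the Spark count query until its last output line equals the
    # expected row count, or give up after the timeout.
    deadline = time.time() + timeout
    while time.time() < deadline:
        out = subprocess.run(
            ["docker", "compose", "exec", "spark", "bash",
             "/spark-script/run-sql-file.sh", "data-types-query"],
            check=True, capture_output=True, text=True).stdout
        lines = out.strip().splitlines()
        if lines and lines[-1].strip() == str(expected):
            return
        time.sleep(interval)
    raise TimeoutError(f"sink did not reach {expected} rows within {timeout}s")

wait_for_rows(3)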
27 changes: 26 additions & 1 deletion integration_tests/deltalake-sink/spark-script/create-table.sql
@@ -1 +1,26 @@
-create table delta.`s3a://deltalake/delta`(id int, name string) using delta
+create table delta.`s3a://deltalake/delta`(id int, name string) using delta;
+
+create table delta.`s3a://deltalake/data_types`(
+    types_id INT,
+    c_boolean boolean,
+    c_smallint short,
+    c_integer integer,
+    c_bigint long,
+    c_decimal decimal(28),
+    c_real float,
+    c_double_precision double,
+    c_varchar string,
+    c_date date,
+    c_timestamptz timestamp,
+    c_boolean_array ARRAY<boolean>,
+    c_smallint_array ARRAY<short>,
+    c_integer_array ARRAY<integer>,
+    c_bigint_array ARRAY<long>,
+    c_decimal_array ARRAY<decimal(28)>,
+    c_real_array ARRAY<float>,
+    c_double_precision_array ARRAY<double>,
+    c_varchar_array ARRAY<string>,
+    c_date_array ARRAY<date>,
+    c_timestamptz_array ARRAY<timestamp>,
+    c_struct STRUCT<s_int:INTEGER, s_varchar:string>
+) using delta;
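
The Delta-side schema can also be inspected without going through Spark. Below is a sketch using the delta-rs Python package (deltalake), assuming it is installed on the host and MinIO's port 9301 is reachable there; the package choice and storage-option keys are assumptions, while the credentials and endpoint mirror the compose setup.

from deltalake import DeltaTable

# Read the data_types table metadata straight from MinIO via delta-rs.
# Credentials/endpoint mirror the compose file; everything else is assumed.
storage_options = {
    "AWS_ACCESS_KEY_ID": "hummockadmin",
    "AWS_SECRET_ACCESS_KEY": "hummockadmin",
    "AWS_ENDPOINT_URL": "http://localhost:9301",
    "AWS_REGION": "us-east-1",
    "AWS_ALLOW_HTTP": "true",
}
dt = DeltaTable("s3://deltalake/data_types", storage_options=storage_options)
print(dt.schema().to_pyarrow())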
1 change: 1 addition & 0 deletions integration_tests/deltalake-sink/spark-script/data-types-query.sql
@@ -0,0 +1 @@
+SELECT count(*) from delta.`s3a://deltalake/data_types`;
2 changes: 1 addition & 1 deletion integration_tests/deltalake-sink/spark-script/query-table.sql
@@ -1 +1 @@
-SELECT * from delta.`s3a://deltalake/delta` order by id;
+SELECT * from delta.`s3a://deltalake/delta` order by id;
2 changes: 1 addition & 1 deletion integration_tests/deltalake-sink/spark-script/run-sql-file.sh
@@ -8,4 +8,4 @@ set -ex
   --conf 'spark.hadoop.fs.s3a.secret.key=hummockadmin' \
   --conf 'spark.hadoop.fs.s3a.endpoint=http://minio-0:9301' \
   --conf 'spark.hadoop.fs.s3a.path.style.access=true' \
-  -f /spark-script/$1.sql
+  -f /spark-script/$1.sql
