From 5a312a206def916902ec901e106da0293db3dc30 Mon Sep 17 00:00:00 2001
From: William Wen <44139337+wenym1@users.noreply.github.com>
Date: Tue, 26 Dec 2023 19:22:43 +0800
Subject: [PATCH] doc(delta-lake): add demo for delta lake sink (#11087)

Co-authored-by: Xinhao Xu <84456268+xxhZs@users.noreply.github.com>
Co-authored-by: xxhZs <1060434431@qq.com>
---
 ci/scripts/gen-integration-test-yaml.py      |  1 +
 ci/scripts/notify.py                         |  1 +
 integration_tests/deltalake-sink/README.md   | 33 +++++++++++++++
 .../deltalake-sink/create_sink.sql           | 10 +++++
 .../deltalake-sink/create_source.sql         |  3 ++
 .../deltalake-sink/docker-compose.yml        | 43 +++++++++++++++++++
 integration_tests/deltalake-sink/prepare.sh  |  7 +++
 .../deltalake-sink/sink_check.py             | 33 +++++++++++++++
 .../deltalake-sink/spark-script/.gitignore   |  3 ++
 .../spark-script/create-table.sql            |  1 +
 .../spark-script/query-table.sql             |  1 +
 .../spark-script/run-sql-file.sh             | 11 +++++
 12 files changed, 147 insertions(+)
 create mode 100644 integration_tests/deltalake-sink/README.md
 create mode 100644 integration_tests/deltalake-sink/create_sink.sql
 create mode 100644 integration_tests/deltalake-sink/create_source.sql
 create mode 100644 integration_tests/deltalake-sink/docker-compose.yml
 create mode 100644 integration_tests/deltalake-sink/prepare.sh
 create mode 100644 integration_tests/deltalake-sink/sink_check.py
 create mode 100644 integration_tests/deltalake-sink/spark-script/.gitignore
 create mode 100644 integration_tests/deltalake-sink/spark-script/create-table.sql
 create mode 100644 integration_tests/deltalake-sink/spark-script/query-table.sql
 create mode 100644 integration_tests/deltalake-sink/spark-script/run-sql-file.sh

diff --git a/ci/scripts/gen-integration-test-yaml.py b/ci/scripts/gen-integration-test-yaml.py
index 9ca065f856e89..13022a06d5ebe 100644
--- a/ci/scripts/gen-integration-test-yaml.py
+++ b/ci/scripts/gen-integration-test-yaml.py
@@ -35,6 +35,7 @@
     'nats': ['json'],
     'doris-sink': ['json'],
     'starrocks-sink': ['json'],
+    'deltalake-sink': ['json'],
 }

 def gen_pipeline_steps():
diff --git a/ci/scripts/notify.py b/ci/scripts/notify.py
index 1e8e8580e9082..2f68733b26022 100755
--- a/ci/scripts/notify.py
+++ b/ci/scripts/notify.py
@@ -58,6 +58,7 @@
     "vector-json": ["tao"],
     "doris-sink": ["xinhao"],
     "starrocks-sink": ["xinhao"],
+    "deltalake-sink": ["xinhao"],
 }

 def get_failed_tests(get_test_status, test_map):
diff --git a/integration_tests/deltalake-sink/README.md b/integration_tests/deltalake-sink/README.md
new file mode 100644
index 0000000000000..0dd34cab2ff70
--- /dev/null
+++ b/integration_tests/deltalake-sink/README.md
@@ -0,0 +1,33 @@
+# Demo: Sinking to Delta Lake
+
+In this demo, we create a source table in RisingWave, insert a few rows of
+data, and sink the data from the table to a downstream Delta Lake table
+stored on MinIO.
+
+1. Launch the cluster via Docker Compose
+```
+docker compose up -d
+```
+
+2. Create a Delta Lake table on MinIO
+```
+docker compose exec minio-0 mkdir /data/deltalake
+docker compose exec spark bash /spark-script/run-sql-file.sh create-table
+```
+
+3. Execute the SQL files against RisingWave in sequence:
+    - create_source.sql
+    - create_sink.sql
+
+4. Query the Delta Lake table. The following command returns all records, ordered by id.
+```
+docker compose exec spark bash /spark-script/run-sql-file.sh query-table
+```
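+
+The query should return the three rows inserted by create_source.sql
+(tab-separated output; exact formatting may vary across Spark versions):
+```
+1	a
+2	b
+3	c
+```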
\ No newline at end of file
diff --git a/integration_tests/deltalake-sink/create_sink.sql b/integration_tests/deltalake-sink/create_sink.sql
new file mode 100644
index 0000000000000..c7dab6ef5dd9e
--- /dev/null
+++ b/integration_tests/deltalake-sink/create_sink.sql
@@ -0,0 +1,10 @@
+create sink delta_lake_sink from source
+with (
+    connector = 'deltalake',
+    type = 'append-only',
+    force_append_only = 'true',
+    location = 's3a://deltalake/delta',
+    s3.access.key = 'hummockadmin',
+    s3.secret.key = 'hummockadmin',
+    s3.endpoint = 'http://minio-0:9301'
+);
\ No newline at end of file
diff --git a/integration_tests/deltalake-sink/create_source.sql b/integration_tests/deltalake-sink/create_source.sql
new file mode 100644
index 0000000000000..d78b460512b8b
--- /dev/null
+++ b/integration_tests/deltalake-sink/create_source.sql
@@ -0,0 +1,3 @@
+CREATE TABLE source (id int, name varchar);
+
+INSERT INTO source VALUES (1, 'a'), (2, 'b'), (3, 'c');
\ No newline at end of file
diff --git a/integration_tests/deltalake-sink/docker-compose.yml b/integration_tests/deltalake-sink/docker-compose.yml
new file mode 100644
index 0000000000000..8e9a20533f25e
--- /dev/null
+++ b/integration_tests/deltalake-sink/docker-compose.yml
@@ -0,0 +1,43 @@
+---
+version: "3"
+services:
+  spark:
+    image: apache/spark:3.3.1
+    command: tail -f /dev/null
+    depends_on:
+      - minio-0
+    volumes:
+      - "./spark-script:/spark-script"
+    container_name: spark
+  risingwave-standalone:
+    extends:
+      file: ../../docker/docker-compose.yml
+      service: risingwave-standalone
+  etcd-0:
+    extends:
+      file: ../../docker/docker-compose.yml
+      service: etcd-0
+  grafana-0:
+    extends:
+      file: ../../docker/docker-compose.yml
+      service: grafana-0
+  minio-0:
+    extends:
+      file: ../../docker/docker-compose.yml
+      service: minio-0
+  prometheus-0:
+    extends:
+      file: ../../docker/docker-compose.yml
+      service: prometheus-0
+volumes:
+  compute-node-0:
+    external: false
+  etcd-0:
+    external: false
+  grafana-0:
+    external: false
+  minio-0:
+    external: false
+  prometheus-0:
+    external: false
+name: risingwave-compose
\ No newline at end of file
diff --git a/integration_tests/deltalake-sink/prepare.sh b/integration_tests/deltalake-sink/prepare.sh
new file mode 100644
index 0000000000000..419edbb9b794f
--- /dev/null
+++ b/integration_tests/deltalake-sink/prepare.sh
@@ -0,0 +1,7 @@
+#!/bin/bash
+
+set -euo pipefail
+
+# create the MinIO directory and the Delta Lake table
+docker compose exec minio-0 mkdir /data/deltalake
+docker compose exec spark bash /spark-script/run-sql-file.sh create-table
\ No newline at end of file
diff --git a/integration_tests/deltalake-sink/sink_check.py b/integration_tests/deltalake-sink/sink_check.py
new file mode 100644
index 0000000000000..b3796d2ca56ee
--- /dev/null
+++ b/integration_tests/deltalake-sink/sink_check.py
@@ -0,0 +1,33 @@
+import subprocess
+from time import sleep
+
+sleep(60)
+
+query_sql = open("spark-script/query-table.sql").read()
+
+print("querying deltalake with sql: %s" % query_sql)
+
+query_output_file_name = "query_output.txt"
+
+query_output_file = open(query_output_file_name, "wb")
+
+subprocess.run(
+    ["docker", "compose", "exec", "spark", "bash", "/spark-script/run-sql-file.sh", "query-table"],
+    check=True, stdout=query_output_file)
+query_output_file.close()
+
+with open(query_output_file_name, 'r') as file:
+    all_lines = file.readlines()
+
+last_three_lines = all_lines[-3:]
+
+print("result", last_three_lines)
+
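+# The sink is append-only and query-table.sql orders by id, so the last
+# three lines of spark-sql's tab-separated output should be exactly the
+# rows inserted by create_source.sql.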
+line1, line2, line3 = last_three_lines
+
+assert line1.strip() == '1\ta'
+assert line2.strip() == '2\tb'
+assert line3.strip() == '3\tc'
\ No newline at end of file
diff --git a/integration_tests/deltalake-sink/spark-script/.gitignore b/integration_tests/deltalake-sink/spark-script/.gitignore
new file mode 100644
index 0000000000000..2af1a65665298
--- /dev/null
+++ b/integration_tests/deltalake-sink/spark-script/.gitignore
@@ -0,0 +1,3 @@
+derby.log
+metastore_db
+.ivy2
\ No newline at end of file
diff --git a/integration_tests/deltalake-sink/spark-script/create-table.sql b/integration_tests/deltalake-sink/spark-script/create-table.sql
new file mode 100644
index 0000000000000..edd31d08f3ebc
--- /dev/null
+++ b/integration_tests/deltalake-sink/spark-script/create-table.sql
@@ -0,0 +1 @@
+create table delta.`s3a://deltalake/delta` (id int, name string) using delta
\ No newline at end of file
diff --git a/integration_tests/deltalake-sink/spark-script/query-table.sql b/integration_tests/deltalake-sink/spark-script/query-table.sql
new file mode 100644
index 0000000000000..bdd1dea729836
--- /dev/null
+++ b/integration_tests/deltalake-sink/spark-script/query-table.sql
@@ -0,0 +1 @@
+SELECT * FROM delta.`s3a://deltalake/delta` ORDER BY id;
\ No newline at end of file
diff --git a/integration_tests/deltalake-sink/spark-script/run-sql-file.sh b/integration_tests/deltalake-sink/spark-script/run-sql-file.sh
new file mode 100644
index 0000000000000..58132bcfafa0a
--- /dev/null
+++ b/integration_tests/deltalake-sink/spark-script/run-sql-file.sh
@@ -0,0 +1,11 @@
+set -ex
+
+/opt/spark/bin/spark-sql --packages io.delta:delta-core_2.12:2.2.0,org.apache.hadoop:hadoop-aws:3.3.2 \
+    --conf "spark.driver.extraJavaOptions=-Divy.cache.dir=/tmp -Divy.home=/tmp" \
+    --conf 'spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension' \
+    --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog' \
+    --conf 'spark.hadoop.fs.s3a.access.key=hummockadmin' \
+    --conf 'spark.hadoop.fs.s3a.secret.key=hummockadmin' \
+    --conf 'spark.hadoop.fs.s3a.endpoint=http://minio-0:9301' \
+    --conf 'spark.hadoop.fs.s3a.path.style.access=true' \
+    -f "/spark-script/$1.sql"
\ No newline at end of file
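
To run the demo end to end without typing each step, the flow above can be scripted (a sketch using only the files in this patch; it assumes the RisingWave frontend from the shared docker-compose file listens on localhost:4566 with database `dev` and user `root`, the usual defaults):

```
docker compose up -d
bash prepare.sh        # creates the MinIO dir and the Delta Lake table
psql -h localhost -p 4566 -d dev -U root -f create_source.sql
psql -h localhost -p 4566 -d dev -U root -f create_sink.sql
python3 sink_check.py  # waits, queries via spark-sql, and checks the three rows
```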