doc(delta-lake): add demo for delta lake sink (#11087)
Co-authored-by: Xinhao Xu <[email protected]>
Co-authored-by: xxhZs <[email protected]>
3 people authored Dec 26, 2023
1 parent 070bd0c commit 5a312a2
Showing 12 changed files with 136 additions and 0 deletions.
1 change: 1 addition & 0 deletions ci/scripts/gen-integration-test-yaml.py
@@ -35,6 +35,7 @@
'nats': ['json'],
'doris-sink': ['json'],
'starrocks-sink': ['json'],
'deltalake-sink': ['json'],
}

def gen_pipeline_steps():
1 change: 1 addition & 0 deletions ci/scripts/notify.py
@@ -58,6 +58,7 @@
"vector-json": ["tao"],
"doris-sink": ["xinhao"],
"starrocks-sink": ["xinhao"],
"deltalake-sink": ["xinhao"],
}

def get_failed_tests(get_test_status, test_map):
25 changes: 25 additions & 0 deletions integration_tests/deltalake-sink/README.md
@@ -0,0 +1,25 @@
# Demo: Sinking to Delta Lake

In this demo, we create an append-only table named `source`, insert a few rows into it, and sink the data to a downstream Delta Lake table stored on MinIO.

1. Launch the cluster via Docker Compose:
```
docker compose up -d
```

2. Create a Delta Lake table on MinIO:
```
docker compose exec minio-0 mkdir /data/deltalake
docker compose exec spark bash /spark-script/run-sql-file.sh create-table
```

3. Execute the SQL files in sequence (see the example below):
- create_source.sql
- create_sink.sql
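
A minimal way to run these two files from the host, assuming a local `psql` client and the default connection settings of the `risingwave-standalone` service (host `localhost`, port `4566`, database `dev`, user `root`):
```
psql -h localhost -p 4566 -d dev -U root -f create_source.sql
psql -h localhost -p 4566 -d dev -U root -f create_sink.sql
```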

4. Query the Delta Lake table. The following command selects all records, ordered by id:
```
docker compose exec spark bash /spark-script/run-sql-file.sh query-table
```
10 changes: 10 additions & 0 deletions integration_tests/deltalake-sink/create_sink.sql
@@ -0,0 +1,10 @@
create sink delta_lake_sink from source
with (
connector = 'deltalake',
type = 'append-only',
force_append_only = 'true',
location = 's3a://deltalake/delta',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
s3.endpoint = 'http://minio-0:9301'
);
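To confirm that the sink above was registered, a quick check from the host, under the same assumed connection settings as in the README (host `localhost`, port `4566`, database `dev`, user `root`):
```
psql -h localhost -p 4566 -d dev -U root -c 'SHOW SINKS;'
```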
3 changes: 3 additions & 0 deletions integration_tests/deltalake-sink/create_source.sql
@@ -0,0 +1,3 @@
CREATE TABLE source (id int, name varchar);

INSERT INTO source VALUES (1, 'a'), (2, 'b'), (3, 'c');
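After this file runs, the inserted rows can be checked directly in RisingWave, again under the assumed default connection settings:
```
psql -h localhost -p 4566 -d dev -U root -c 'SELECT * FROM source ORDER BY id;'
```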
43 changes: 43 additions & 0 deletions integration_tests/deltalake-sink/docker-compose.yml
@@ -0,0 +1,43 @@
---
version: "3"
services:
  spark:
    image: apache/spark:3.3.1
    command: tail -f /dev/null
    depends_on:
      - minio-0
    volumes:
      - "./spark-script:/spark-script"
    container_name: spark
  risingwave-standalone:
    extends:
      file: ../../docker/docker-compose.yml
      service: risingwave-standalone
  etcd-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: etcd-0
  grafana-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: grafana-0
  minio-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: minio-0
  prometheus-0:
    extends:
      file: ../../docker/docker-compose.yml
      service: prometheus-0
volumes:
  compute-node-0:
    external: false
  etcd-0:
    external: false
  grafana-0:
    external: false
  minio-0:
    external: false
  prometheus-0:
    external: false
name: risingwave-compose
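All RisingWave services are extended from the shared compose file, so a quick way to verify that the stack is up before running the SQL steps (service names as defined above):
```
docker compose ps   # spark, risingwave-standalone, minio-0, etc. should be running
```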
7 changes: 7 additions & 0 deletions integration_tests/deltalake-sink/prepare.sh
@@ -0,0 +1,7 @@
#!/bin/bash

set -euo pipefail

# build minio dir and create table
docker compose exec minio-0 mkdir /data/deltalake
docker compose exec spark bash /spark-script/run-sql-file.sh create-table
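prepare.sh bundles the MinIO and Spark setup from step 2 of the README; a sketch of how it can be invoked, assuming the containers are already up and the working directory is this demo directory:
```
cd integration_tests/deltalake-sink
bash prepare.sh
```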
30 changes: 30 additions & 0 deletions integration_tests/deltalake-sink/sink_check.py
@@ -0,0 +1,30 @@
import subprocess
from time import sleep

# Give the sink some time to flush data from RisingWave into the Delta Lake table.
sleep(60)

with open("spark-script/query-table.sql") as f:
    query_sql = f.read()

print("querying deltalake with sql: %s" % query_sql)

query_output_file_name = "query_output.txt"

# Run the Spark SQL query inside the spark container and capture its stdout.
with open(query_output_file_name, "wb") as query_output_file:
    subprocess.run(
        ["docker", "compose", "exec", "spark", "bash", "/spark-script/run-sql-file.sh", "query-table"],
        check=True, stdout=query_output_file)

with open(query_output_file_name, 'r') as file:
    all_lines = file.readlines()

# The query result is printed last; the final three lines should be the rows written by the sink.
last_three_lines = all_lines[-3:]

print("result", last_three_lines)

line1, line2, line3 = last_three_lines

assert line1.strip() == '1\ta'
assert line2.strip() == '2\tb'
assert line3.strip() == '3\tc'
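The check script shells out to `docker compose`, so it is meant to be run from this demo directory after the source and sink have been created; note the fixed 60-second wait before it queries Spark:
```
python3 sink_check.py
```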
3 changes: 3 additions & 0 deletions integration_tests/deltalake-sink/spark-script/.gitignore
@@ -0,0 +1,3 @@
derby.log
metastore_db
.ivy2
1 change: 1 addition & 0 deletions integration_tests/deltalake-sink/spark-script/create-table.sql
@@ -0,0 +1 @@
create table delta.`s3a://deltalake/delta`(id int, name string) using delta
1 change: 1 addition & 0 deletions integration_tests/deltalake-sink/spark-script/query-table.sql
@@ -0,0 +1 @@
SELECT * FROM delta.`s3a://deltalake/delta` ORDER BY id;
11 changes: 11 additions & 0 deletions integration_tests/deltalake-sink/spark-script/run-sql-file.sh
@@ -0,0 +1,11 @@
set -ex

/opt/spark/bin/spark-sql --packages io.delta:delta-core_2.12:2.2.0,org.apache.hadoop:hadoop-aws:3.3.2 \
    --conf "spark.driver.extraJavaOptions=-Divy.cache.dir=/tmp -Divy.home=/tmp" \
    --conf 'spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension' \
    --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog' \
    --conf 'spark.hadoop.fs.s3a.access.key=hummockadmin' \
    --conf 'spark.hadoop.fs.s3a.secret.key=hummockadmin' \
    --conf 'spark.hadoop.fs.s3a.endpoint=http://minio-0:9301' \
    --conf 'spark.hadoop.fs.s3a.path.style.access=true' \
    -f "/spark-script/$1.sql"

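The script takes the base name of a SQL file under `/spark-script` as its only argument; the two invocations used in this demo are:
```
docker compose exec spark bash /spark-script/run-sql-file.sh create-table
docker compose exec spark bash /spark-script/run-sql-file.sh query-table
```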