test(batch): add iceberg source integration test (#15491)
chenzl25 authored Mar 7, 2024
1 parent dbace2d commit f36430e
Showing 15 changed files with 304 additions and 14 deletions.
1 change: 1 addition & 0 deletions ci/scripts/gen-integration-test-yaml.py
@@ -16,6 +16,7 @@
    'postgres-sink': ['json'],
    'iceberg-cdc': ['json'],
    'iceberg-sink': ['none'],
    'iceberg-source': ['none'],
    'twitter': ['json', 'protobuf'],
    'twitter-pulsar': ['json'],
    'debezium-mysql': ['json'],
2 changes: 0 additions & 2 deletions integration_tests/iceberg-sink2/python/main.py
@@ -102,7 +102,6 @@ def init_risingwave_mv(docker):
        );
        """
    ]

    rw_config = config['risingwave']
    with psycopg2.connect(database=rw_config['db'], user=rw_config['user'], host=rw_config['host'],
                          port=rw_config['port']) as conn:
@@ -127,7 +126,6 @@ def check_spark_table(docker):
    result = spark.sql(sql).collect()
    assert result[0][0] > 100, f"Inserted result is too small: {result[0][0]}, test failed"


def run_case(case):
    with DockerCompose(case) as docker:
        init_spark_table(docker)
15 changes: 15 additions & 0 deletions integration_tests/iceberg-source/README.md
@@ -0,0 +1,15 @@
# How to run the test

Run the following commands to run the test:

```bash
cd python
poetry update
poetry run python main.py
```

# How to override the RisingWave image version

```bash
export RW_IMAGE=<your version>
```
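For example, `export RW_IMAGE=risingwavelabs/risingwave:v1.7.0` (a hypothetical tag) would run the whole test against that image instead of the default one.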
16 changes: 16 additions & 0 deletions integration_tests/iceberg-source/docker/storage/config.ini
@@ -0,0 +1,16 @@
[risingwave]
db=dev
user=root
host=127.0.0.1
port=4566

[source]
connector = iceberg
s3.endpoint=http://minio-0:9301
s3.access.key = hummockadmin
s3.secret.key = hummockadmin
s3.region = ap-southeast-1
catalog.type = storage
warehouse.path = s3://icebergdata/demo
database.name=s1
table.name=t1
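The `[source]` section maps one-to-one onto the `WITH (...)` clause of the `CREATE SOURCE` statement the test issues (see `main.py` below). A minimal sketch of that rendering:

```python
# Sketch of how main.py renders the [source] section into SQL:
# every key/value pair becomes one WITH-clause parameter.
import configparser

config = configparser.ConfigParser()
config.read("config.ini")
params = ",\n    ".join(f"{k}='{v}'" for k, v in config["source"].items())
print(f"CREATE SOURCE iceberg_source\nWITH (\n    {params}\n);")
```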
75 changes: 75 additions & 0 deletions integration_tests/iceberg-source/docker/storage/docker-compose.yml
@@ -0,0 +1,75 @@
version: '3.8'

services:
  spark:
    depends_on:
      - minio-0
    image: ghcr.io/icelake-io/icelake-spark:0.1
    environment:
      - SPARK_HOME=/opt/spark
      - PYSPARK_PYTHON=/usr/bin/python3.9
      - PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/opt/spark/bin:/opt/spark/sbin
    user: root
    networks:
      iceberg_net:
    links:
      - minio-0:icebergdata.minio-0
    expose:
      - 15002
    healthcheck:
      test: netstat -ltn | grep -c 15002
      interval: 1s
      retries: 1200
    volumes:
      - ./spark-script:/spark-script
    entrypoint: ["/spark-script/spark-connect-server.sh"]

  risingwave-standalone:
    extends:
      file: ../../../../docker/docker-compose.yml
      service: risingwave-standalone
    healthcheck:
      test:
        - CMD-SHELL
        - bash -c 'printf \"GET / HTTP/1.1\n\n\" > /dev/tcp/127.0.0.1/4566; exit $$?;'
      interval: 1s
      timeout: 30s
    networks:
      iceberg_net:

  minio-0:
    extends:
      file: ../../../../docker/docker-compose.yml
      service: minio-0
    entrypoint: "
    /bin/sh -c '
    set -e
    mkdir -p \"/data/icebergdata/demo\"
    mkdir -p \"/data/hummock001\"
    /usr/bin/docker-entrypoint.sh \"$$0\" \"$$@\"
    '"
    networks:
      iceberg_net:

  etcd-0:
    extends:
      file: ../../../../docker/docker-compose.yml
      service: etcd-0
    networks:
      iceberg_net:

volumes:
  risingwave-standalone:
    external: false
  etcd-0:
    external: false
  minio-0:
    external: false

networks:
  iceberg_net:
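Two details worth noting: `docker-compose up -d --wait` (used by the test harness) blocks until these healthchecks report healthy, and the Spark check polls port 15002 once per second with up to 1200 retries, so the stack tolerates roughly 20 minutes of Spark startup time.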
21 changes: 21 additions & 0 deletions integration_tests/iceberg-source/docker/storage/spark-script/spark-connect-server.sh
@@ -0,0 +1,21 @@
#!/bin/bash

set -ex

JARS=$(find /opt/spark/deps -type f -name "*.jar" | tr '\n' ':')

/opt/spark/sbin/start-connect-server.sh \
  --master local[3] \
  --driver-class-path $JARS \
  --conf spark.driver.bindAddress=0.0.0.0 \
  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
  --conf spark.sql.catalog.demo=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.demo.type=hadoop \
  --conf spark.sql.catalog.demo.warehouse=s3a://icebergdata/demo \
  --conf spark.sql.catalog.demo.hadoop.fs.s3a.endpoint=http://minio-0:9301 \
  --conf spark.sql.catalog.demo.hadoop.fs.s3a.path.style.access=true \
  --conf spark.sql.catalog.demo.hadoop.fs.s3a.access.key=hummockadmin \
  --conf spark.sql.catalog.demo.hadoop.fs.s3a.secret.key=hummockadmin \
  --conf spark.sql.defaultCatalog=demo

tail -f /opt/spark/logs/spark*.out
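Once the Connect server is up, any Spark Connect client can reach it on port 15002. A minimal smoke test, assuming the endpoint resolves as `localhost:15002` (the integration test resolves the container IP instead; see `main.py` below):

```python
# Hypothetical smoke test for the Spark Connect server started above;
# the localhost address is an assumption, main.py uses the container IP.
from pyspark.sql import SparkSession

spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()
spark.sql("SHOW NAMESPACES").show()  # `demo` is the default catalog per the conf above
```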
119 changes: 119 additions & 0 deletions integration_tests/iceberg-source/python/main.py
@@ -0,0 +1,119 @@
import argparse
import subprocess
from pyspark.sql import SparkSession
import configparser
import psycopg2
import time


def read_config(filename):
    config = configparser.ConfigParser()
    config.read(filename)
    print({section: dict(config[section]) for section in config.sections()})
    return config


class DockerCompose(object):
    def __init__(self, case_name: str):
        self.case_name = case_name

    def case_dir(self):
        return f"../docker/{self.case_name}"

    def get_ip(self, container_name):
        return subprocess.check_output([
            "docker", "inspect", "-f", "{{range.NetworkSettings.Networks}}{{.IPAddress}}{{end}}",
            container_name], cwd=self.case_dir()).decode("utf-8").rstrip()

    def __enter__(self):
        subprocess.run(["docker-compose", "up", "-d", "--wait"], cwd=self.case_dir(), check=False)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        subprocess.run(["docker", "compose", "down", "-v", "--remove-orphans"], cwd=self.case_dir(),
                       capture_output=True,
                       check=True)


def init_spark_table(docker):
    spark_ip = docker.get_ip(f"{docker.case_name}-spark-1")
    url = f"sc://{spark_ip}:15002"
    print(f"Spark url is {url}")
    spark = SparkSession.builder.remote(url).getOrCreate()

    init_table_sqls = [
        "CREATE SCHEMA IF NOT EXISTS s1",
        "DROP TABLE IF EXISTS s1.t1",
        """
        CREATE TABLE s1.t1
        (
            id bigint,
            name string,
            distance bigint
        ) USING iceberg
        TBLPROPERTIES ('format-version'='2');
        """,
        """INSERT INTO s1.t1 VALUES (1, 'test', 100);"""
    ]

    for sql in init_table_sqls:
        print(f"Executing sql: {sql}")
        spark.sql(sql)


def init_risingwave_source(docker):
    config = read_config(f"{docker.case_dir()}/config.ini")

    source_config = config['source']
    source_param = ",\n".join([f"{k}='{v}'" for k, v in source_config.items()])

    sqls = [
        f"""
        CREATE SOURCE iceberg_source
        WITH (
            {source_param}
        );
        """
    ]

    rw_config = config['risingwave']
    with psycopg2.connect(database=rw_config['db'], user=rw_config['user'], host=rw_config['host'],
                          port=rw_config['port']) as conn:
        with conn.cursor() as cursor:
            for sql in sqls:
                print(f"Executing sql {sql}")
                cursor.execute(sql)


def check_risingwave_iceberg_source(docker):
    config = read_config(f"{docker.case_dir()}/config.ini")

    sqls = [
        "select count(*) from iceberg_source"
    ]

    rw_config = config['risingwave']
    with psycopg2.connect(database=rw_config['db'], user=rw_config['user'], host=rw_config['host'],
                          port=rw_config['port']) as conn:
        with conn.cursor() as cursor:
            for sql in sqls:
                print(f"Executing sql {sql}")
                # execute sql and collect result
                cursor.execute(sql)
                result = cursor.fetchall()
                assert result[0][0] == 1, f"Unexpected count: {result[0][0]}, test failed"


def run_case(case):
    with DockerCompose(case) as docker:
        init_spark_table(docker)
        init_risingwave_source(docker)
        print("Let risingwave run")
        time.sleep(5)
        check_risingwave_iceberg_source(docker)


if __name__ == "__main__":
    case_names = ["storage"]
    for case_name in case_names:
        print(f"Running test case: {case_name}")
        run_case(case_name)
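Note the end-to-end contract here: `init_spark_table` writes exactly one row through Spark, and `check_risingwave_iceberg_source` asserts that a batch read through the new source returns a count of 1, so any breakage in the Iceberg scan path fails the case.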
16 changes: 16 additions & 0 deletions integration_tests/iceberg-source/python/pyproject.toml
@@ -0,0 +1,16 @@
[tool.poetry]
name = "icelake-integration-tests"
version = "0.0.9"
description = ""
authors = ["Renjie Liu <[email protected]>"]
readme = "README.md"
packages = [{include = "icelake_integration_tests"}]

[tool.poetry.dependencies]
python = "^3.8"
pyspark = { version = "3.4.1", extras = ["sql", "connect"] }
psycopg2-binary = "^2.9"

[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
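The `connect` extra on `pyspark` matters here: it pulls in the Spark Connect client that `SparkSession.builder.remote` in `main.py` depends on.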
11 changes: 11 additions & 0 deletions integration_tests/iceberg-source/run.sh
@@ -0,0 +1,11 @@
#!/usr/bin/env bash

# Exits as soon as any line fails.
set -euox pipefail

"$HOME"/.local/bin/poetry --version
cd python
# Don't remove the `--quiet` option since poetry has a bug when printing output, see
# https://github.com/python-poetry/poetry/issues/3412
"$HOME"/.local/bin/poetry update --quiet
"$HOME"/.local/bin/poetry run python main.py
2 changes: 1 addition & 1 deletion integration_tests/scripts/check_data.py
@@ -109,7 +109,7 @@ def test_check(demo: str, upstream: str, need_data_check=True, need_sink_check=F

demo = sys.argv[1]
upstream = sys.argv[2] # mysql, postgres, etc. see scripts/integration_tests.sh
if demo in ['docker', 'iceberg-cdc', 'iceberg-sink']:
if demo in ['docker', 'iceberg-cdc', 'iceberg-sink', 'iceberg-source']:
    print('Skip running test for `%s`' % demo)
    sys.exit(0)

2 changes: 1 addition & 1 deletion integration_tests/scripts/clean_demos.py
@@ -25,7 +25,7 @@ def clean_demo(demo: str):
args = arg_parser.parse_args()
demo = args.case

if demo in ['iceberg-sink']:
if demo in ['iceberg-sink', 'iceberg-source']:
    print('Skip running test for `%s`' % demo)
    sys.exit(0)

10 changes: 10 additions & 0 deletions integration_tests/scripts/run_demos.py
@@ -66,6 +66,14 @@ def iceberg_sink_demo():
print("Running demo: iceberg-sink2")
subprocess.run(["bash", "./run.sh"], cwd=demo_dir, check=True)

def iceberg_source_demo():
    demo = "iceberg-source"
    file_dir = dirname(abspath(__file__))
    project_dir = dirname(file_dir)
    demo_dir = os.path.join(project_dir, demo)
    print("Running demo: iceberg-source")
    subprocess.run(["bash", "./run.sh"], cwd=demo_dir, check=True)


arg_parser = argparse.ArgumentParser(description="Run the demo")
arg_parser.add_argument(
@@ -86,5 +94,7 @@ def iceberg_sink_demo():
    iceberg_cdc_demo()
elif args.case == "iceberg-sink":
    iceberg_sink_demo()
elif args.case == "iceberg-source":
    iceberg_source_demo()
else:
    run_demo(args.case, args.format)
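The new `iceberg_source_demo()` ignores the data-format argument, consistent with the `'iceberg-source': ['none']` entry added to `gen-integration-test-yaml.py` above.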
2 changes: 1 addition & 1 deletion src/batch/src/executor/iceberg_scan.rs
@@ -41,7 +41,7 @@ use crate::executor::{DataChunk, Executor};
/// async fn test_iceberg_scan() {
///     let iceberg_scan_executor = IcebergScanExecutor::new(
///         IcebergConfig {
///             database_name: "demo_db".into(),
///             database_name: Some("demo_db".into()),
///             table_name: "demo_table".into(),
///             catalog_type: Some("storage".into()),
///             path: "s3a://hummock001/".into(),
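The only change in this file is to the doc-test: `IcebergConfig::database_name` is evidently now an `Option<String>`, so the example wraps the value in `Some(...)`.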