Commit

add tests

chenzl25 committed Mar 7, 2024
1 parent a3ef367 commit 91d4774
Showing 9 changed files with 287 additions and 45 deletions.
11 changes: 0 additions & 11 deletions integration_tests/iceberg-sink2/docker/storage/config.ini
@@ -16,15 +16,4 @@ catalog.type = storage
catalog.name = demo
warehouse.path = s3://icebergdata/demo
database.name=s1
table.name=t1

[source]
connector = iceberg
s3.endpoint=http://minio-0:9301
s3.access.key = hummockadmin
s3.secret.key = hummockadmin
s3.region = ap-southeast-1
catalog.type = storage
warehouse.path = s3://icebergdata/demo
database.name=s1
table.name=t1
34 changes: 0 additions & 34 deletions integration_tests/iceberg-sink2/python/main.py
@@ -102,19 +102,6 @@ def init_risingwave_mv(docker):
);
"""
]

if 'source' in config:
source_config = config['source']
source_param = ",\n".join([f"{k}='{v}'" for k, v in source_config.items()])
sqls.append(
f"""
CREATE SOURCE iceberg_source
WITH (
{source_param}
);
"""
)

rw_config = config['risingwave']
with psycopg2.connect(database=rw_config['db'], user=rw_config['user'], host=rw_config['host'],
port=rw_config['port']) as conn:
@@ -139,34 +126,13 @@ def check_spark_table(docker):
result = spark.sql(sql).collect()
assert result[0][0] > 100, f"Inserted result is too small: {result[0][0]}, test failed"

def check_risingwave_iceberg_source(docker):
config = read_config(f"{docker.case_dir()}/config.ini")

sqls = [
"select count(*) from iceberg_source"
]

rw_config = config['risingwave']
with psycopg2.connect(database=rw_config['db'], user=rw_config['user'], host=rw_config['host'],
port=rw_config['port']) as conn:
with conn.cursor() as cursor:
for sql in sqls:
print(f"Executing sql {sql}")
# execute sql and collect result
cursor.execute(sql)
result = cursor.fetchall()
assert result[0][0] > 100, f"Inserted result is too small: {result[0][0]}, test failed"


def run_case(case):
with DockerCompose(case) as docker:
init_spark_table(docker)
init_risingwave_mv(docker)
print("Let risingwave to run")
time.sleep(5)
check_spark_table(docker)
if case == "storage":
check_risingwave_iceberg_source(docker)


if __name__ == "__main__":
15 changes: 15 additions & 0 deletions integration_tests/iceberg-source/README.md
@@ -0,0 +1,15 @@
# How to run the test

Run the following commands to run the test:

```bash
cd python
poetry update
poetry run python main.py
```

# How to override the RisingWave image version

```bash
# e.g. export RW_IMAGE=risingwavelabs/risingwave:v1.7.0 (illustrative tag)
export RW_IMAGE=<your version>
```
30 changes: 30 additions & 0 deletions integration_tests/iceberg-source/docker/storage/config.ini
@@ -0,0 +1,30 @@
[risingwave]
db=dev
user=root
host=127.0.0.1
port=4566

[sink]
connector = iceberg
type=append-only
force_append_only = true
s3.endpoint=http://minio-0:9301
s3.access.key = hummockadmin
s3.secret.key = hummockadmin
s3.region = ap-southeast-1
catalog.type = storage
catalog.name = demo
warehouse.path = s3://icebergdata/demo
database.name=s1
table.name=t1

[source]
connector = iceberg
s3.endpoint=http://minio-0:9301
s3.access.key = hummockadmin
s3.secret.key = hummockadmin
s3.region = ap-southeast-1
catalog.type = storage
warehouse.path = s3://icebergdata/demo
database.name=s1
table.name=t1
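
For reference, `python/main.py` (added later in this diff) passes every key in this `[source]` section through verbatim as a connector property of a `CREATE SOURCE` statement. A minimal sketch of that flattening, assuming the file is read as `config.ini`:

```python
# Sketch of the config-to-SQL flattening done by init_risingwave_mv in main.py.
import configparser

config = configparser.ConfigParser()
config.read("config.ini")  # the [source] section shown above

props = ",\n".join(f"{k}='{v}'" for k, v in config["source"].items())
print(f"CREATE SOURCE iceberg_source\nWITH (\n{props}\n);")
```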
75 changes: 75 additions & 0 deletions integration_tests/iceberg-source/docker/storage/docker-compose.yml
@@ -0,0 +1,75 @@
version: '3.8'

services:
spark:
depends_on:
- minio-0
image: ghcr.io/icelake-io/icelake-spark:0.1
environment:
- SPARK_HOME=/opt/spark
- PYSPARK_PYTHON=/usr/bin/python3.9
- PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/opt/spark/bin:/opt/spark/sbin
user: root
networks:
iceberg_net:
links:
- minio-0:icebergdata.minio-0
expose:
- 15002
healthcheck:
test: netstat -ltn | grep -c 15002
interval: 1s
retries: 1200
volumes:
- ./spark-script:/spark-script
entrypoint: ["/spark-script/spark-connect-server.sh"]

risingwave-standalone:
extends:
file: ../../../../docker/docker-compose.yml
service: risingwave-standalone
healthcheck:
test:
- CMD-SHELL
- bash -c 'printf \"GET / HTTP/1.1\n\n\" > /dev/tcp/127.0.0.1/4566; exit $$?;'
interval: 1s
timeout: 30s
networks:
iceberg_net:

minio-0:
extends:
file: ../../../../docker/docker-compose.yml
service: minio-0
entrypoint: "
/bin/sh -c '
set -e
mkdir -p \"/data/icebergdata/demo\"
mkdir -p \"/data/hummock001\"
/usr/bin/docker-entrypoint.sh \"$$0\" \"$$@\"
'"
networks:
iceberg_net:

etcd-0:
extends:
file: ../../../../docker/docker-compose.yml
service: etcd-0
networks:
iceberg_net:

volumes:
risingwave-standalone:
external: false
etcd-0:
external: false
minio-0:
external: false

networks:
iceberg_net:
21 changes: 21 additions & 0 deletions integration_tests/iceberg-source/docker/storage/spark-script/spark-connect-server.sh
@@ -0,0 +1,21 @@
#!/bin/bash

set -ex

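# Gather every dependency jar (e.g. the Iceberg Spark runtime) into one driver classpath.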
JARS=$(find /opt/spark/deps -type f -name "*.jar" | tr '\n' ':')

/opt/spark/sbin/start-connect-server.sh \
--master local[3] \
--driver-class-path $JARS \
--conf spark.driver.bindAddress=0.0.0.0 \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
--conf spark.sql.catalog.demo=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.demo.type=hadoop \
--conf spark.sql.catalog.demo.warehouse=s3a://icebergdata/demo \
--conf spark.sql.catalog.demo.hadoop.fs.s3a.endpoint=http://minio-0:9301 \
--conf spark.sql.catalog.demo.hadoop.fs.s3a.path.style.access=true \
--conf spark.sql.catalog.demo.hadoop.fs.s3a.access.key=hummockadmin \
--conf spark.sql.catalog.demo.hadoop.fs.s3a.secret.key=hummockadmin \
--conf spark.sql.defaultCatalog=demo

tail -f /opt/spark/logs/spark*.out
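
Once the stack is up, the connect server can be sanity-checked from the host with a few lines of PySpark. A minimal sketch; the IP below is hypothetical, since the server is only exposed on the compose network (resolve the real container IP with `docker inspect`, as `python/main.py` does):

```python
from pyspark.sql import SparkSession

SPARK_IP = "172.20.0.2"  # hypothetical; look up the real container IP via `docker inspect`
spark = SparkSession.builder.remote(f"sc://{SPARK_IP}:15002").getOrCreate()
spark.sql("SELECT 1").show()  # succeeds once the connect server is serving
```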
119 changes: 119 additions & 0 deletions integration_tests/iceberg-source/python/main.py
@@ -0,0 +1,119 @@
import argparse
import subprocess
from pyspark.sql import SparkSession
import configparser
import psycopg2
import time


def read_config(filename):
config = configparser.ConfigParser()
config.read(filename)
print({section: dict(config[section]) for section in config.sections()})
return config


class DockerCompose(object):
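    """Context manager that brings a docker-compose test case up and tears it down."""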
def __init__(self, case_name: str):
self.case_name = case_name

def case_dir(self):
return f"../docker/{self.case_name}"

def get_ip(self, container_name):
return subprocess.check_output([
"docker", "inspect", "-f", "{{range.NetworkSettings.Networks}}{{.IPAddress}}{{end}}",
container_name], cwd=self.case_dir()).decode("utf-8").rstrip()

def __enter__(self):
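        # `--wait` blocks until container healthchecks pass; a non-zero exit is tolerated here.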
subprocess.run(["docker-compose", "up", "-d", "--wait"], cwd=self.case_dir(), check=False)
return self

def __exit__(self, exc_type, exc_val, exc_tb):
subprocess.run(["docker", "compose", "down", "-v", "--remove-orphans"], cwd=self.case_dir(),
capture_output=True,
check=True)


def init_spark_table(docker):
spark_ip = docker.get_ip(f"{docker.case_name}-spark-1")
url = f"sc://{spark_ip}:15002"
print(f"Spark url is {url}")
spark = SparkSession.builder.remote(url).getOrCreate()

init_table_sqls = [
"CREATE SCHEMA IF NOT EXISTS s1",
"DROP TABLE IF EXISTS s1.t1",
"""
CREATE TABLE s1.t1
(
id bigint,
name string,
distance bigint
) USING iceberg
TBLPROPERTIES ('format-version'='2');
""",
"""INSERT INTO s1.t1 VALUES (1, 'test', 100);"""
]

for sql in init_table_sqls:
print(f"Executing sql: {sql}")
spark.sql(sql)


def init_risingwave_mv(docker):
config = read_config(f"{docker.case_dir()}/config.ini")

source_config = config['source']
source_param = ",\n".join([f"{k}='{v}'" for k, v in source_config.items()])

sqls = [
f"""
CREATE SOURCE iceberg_source
WITH (
{source_param}
);
"""
]

rw_config = config['risingwave']
with psycopg2.connect(database=rw_config['db'], user=rw_config['user'], host=rw_config['host'],
port=rw_config['port']) as conn:
with conn.cursor() as cursor:
for sql in sqls:
print(f"Executing sql {sql}")
cursor.execute(sql)

def check_risingwave_iceberg_source(docker):
config = read_config(f"{docker.case_dir()}/config.ini")

sqls = [
"select count(*) from iceberg_source"
]

rw_config = config['risingwave']
with psycopg2.connect(database=rw_config['db'], user=rw_config['user'], host=rw_config['host'],
port=rw_config['port']) as conn:
with conn.cursor() as cursor:
for sql in sqls:
print(f"Executing sql {sql}")
# execute sql and collect result
cursor.execute(sql)
result = cursor.fetchall()
                assert result[0][0] == 1, f"Unexpected row count: {result[0][0]}, test failed"


def run_case(case):
with DockerCompose(case) as docker:
init_spark_table(docker)
init_risingwave_mv(docker)
print("Let risingwave to run")
time.sleep(5)
check_risingwave_iceberg_source(docker)


if __name__ == "__main__":
case_names = ["storage"]
for case_name in case_names:
print(f"Running test case: {case_name}")
run_case(case_name)
16 changes: 16 additions & 0 deletions integration_tests/iceberg-source/python/pyproject.toml
@@ -0,0 +1,16 @@
[tool.poetry]
name = "icelake-integration-tests"
version = "0.0.9"
description = ""
authors = ["Renjie Liu <[email protected]>"]
readme = "README.md"
packages = [{include = "icelake_integration_tests"}]

[tool.poetry.dependencies]
python = "^3.8"
pyspark = { version = "3.4.1", extras = ["sql", "connect"] }
psycopg2-binary = "^2.9"

[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
11 changes: 11 additions & 0 deletions integration_tests/iceberg-source/run.sh
@@ -0,0 +1,11 @@
#!/usr/bin/env bash

# Exits as soon as any line fails.
set -euox pipefail

"$HOME"/.local/bin/poetry --version
cd python
# Don't remove the `--quiet` option since poetry has a bug when printing output, see
# https://github.com/python-poetry/poetry/issues/3412
"$HOME"/.local/bin/poetry update --quiet
"$HOME"/.local/bin/poetry run python main.py
