Skip to content

Commit

Permalink
Finish
Browse files Browse the repository at this point in the history
  • Loading branch information
liurenjie1024 committed Feb 22, 2024
1 parent 20f6958 commit eece175
Show file tree
Hide file tree
Showing 19 changed files with 134 additions and 231 deletions.
1 change: 1 addition & 0 deletions ci/scripts/gen-integration-test-yaml.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
'mysql-sink': ['json'],
'postgres-sink': ['json'],
'iceberg-cdc': ['json'],
'iceberg-sink': ['none'],
'twitter': ['json', 'protobuf'],
'twitter-pulsar': ['json'],
'debezium-mysql': ['json'],
Expand Down
2 changes: 1 addition & 1 deletion docker/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
---
version: "3"
x-image: &image
image: ${RW_IMAGE:-ghcr.io/risingwavelabs/risingwave:git-9d3a3a58a182ed60ed1ff4ecc02a6c6489ace0a3}
image: ${RW_IMAGE:-risingwavelabs/risingwave:v1.6.1}
services:
risingwave-standalone:
<<: *image
Expand Down
20 changes: 14 additions & 6 deletions integration_tests/iceberg-sink2/README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
Use following steps to run:
# How to run the test

1. ./risedev d full-iceberg-bench
2. cd docker; docker compose up -d
3. poetry update
4. poetry run python init.py
5. poetry run python check.py
Run the following commands to run the test:

```bash
cd python
poetry update
poetry run python main.py
```

# How to override risingwave image version:

Check warning on line 11 in integration_tests/iceberg-sink2/README.md

View workflow job for this annotation

GitHub Actions / Spell Check with Typos

"iamge" should be "image".

```bash
export RW_IMAGE=<your version>
```
28 changes: 0 additions & 28 deletions integration_tests/iceberg-sink2/docker/docker-compose.yml

This file was deleted.

4 changes: 0 additions & 4 deletions integration_tests/iceberg-sink2/docker/hive/config.ini
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
[default]
aws_key=hummockadmin
aws_secret=hummockadmin

[risingwave]
db=dev
user=root
Expand Down
4 changes: 0 additions & 4 deletions integration_tests/iceberg-sink2/docker/jdbc/config.ini
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
[default]
aws_key=hummockadmin
aws_secret=hummockadmin

[risingwave]
db=dev
user=root
Expand Down
4 changes: 0 additions & 4 deletions integration_tests/iceberg-sink2/docker/rest/config.ini
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
[default]
aws_key=hummockadmin
aws_secret=hummockadmin

[risingwave]
db=dev
user=root
Expand Down

This file was deleted.

18 changes: 0 additions & 18 deletions integration_tests/iceberg-sink2/docker/spark-script/init-table.sql

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

4 changes: 0 additions & 4 deletions integration_tests/iceberg-sink2/docker/storage/config.ini
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
[default]
aws_key=hummockadmin
aws_secret=hummockadmin

[risingwave]
db=dev
user=root
Expand Down
77 changes: 41 additions & 36 deletions integration_tests/iceberg-sink2/python/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,37 @@
import time


def case_dir(case_name):
return f"../docker/{case_name}"
def read_config(filename):
    """Parse an INI config file, echo its contents for debugging, and return it.

    Args:
        filename: path to the INI file (e.g. ``../docker/rest/config.ini``).

    Returns:
        The populated ``configparser.ConfigParser`` instance.
    """
    parser = configparser.ConfigParser()
    parser.read(filename)
    # Dump every section as a plain dict so the CI log shows what was loaded.
    dumped = {name: dict(parser[name]) for name in parser.sections()}
    print(dumped)
    return parser


def start_docker(case_name):
subprocess.run(["docker-compose", "up", "-d", "--wait"], cwd=case_dir(case_name), check=False)
class DockerCompose(object):
    """Context manager for one test case's docker-compose environment.

    On entry it brings the compose project under ``../docker/<case_name>`` up;
    on exit it tears the project down again, removing volumes and orphans.
    (The superseded free functions ``stop_docker`` and ``get_ip`` that this
    class replaces have been removed.)
    """

    def __init__(self, case_name: str):
        # Name of the test case; also the directory name under ../docker/.
        self.case_name = case_name

    def case_dir(self) -> str:
        """Return the compose project directory for this case."""
        return f"../docker/{self.case_name}"

    def get_ip(self, container_name: str) -> str:
        """Resolve a container's IP address on its compose network.

        Uses ``docker inspect`` with a Go template; assumes the container is
        attached to exactly one network — TODO confirm for multi-network setups.
        """
        return subprocess.check_output([
            "docker", "inspect", "-f",
            "{{range.NetworkSettings.Networks}}{{.IPAddress}}{{end}}",
            container_name], cwd=self.case_dir()).decode("utf-8").rstrip()

    def __enter__(self):
        # check=False: bring-up warnings are tolerated here; real failures
        # surface later when the test talks to the containers.
        subprocess.run(["docker-compose", "up", "-d", "--wait"],
                       cwd=self.case_dir(), check=False)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # check=True on teardown so a broken cleanup fails the test run.
        subprocess.run(["docker", "compose", "down", "-v", "--remove-orphans"],
                       cwd=self.case_dir(),
                       capture_output=True,
                       check=True)


def init_spark_table(case_name):
spark_ip = get_ip(case_dir(case_name), f"{case_name}-spark-1")
def init_spark_table(docker):
spark_ip = docker.get_ip(f"{docker.case_name}-spark-1")
url = f"sc://{spark_ip}:15002"
print(f"Spark url is {url}")
spark = SparkSession.builder.remote(url).getOrCreate()
Expand All @@ -52,10 +61,8 @@ def init_spark_table(case_name):
spark.sql(sql)


def init_risingwave_mv(config):
aws_key = config['default']['aws_key']
aws_secret = config['default']['aws_secret']

def init_risingwave_mv(docker):
config = read_config(f"{docker.case_dir()}/config.ini")
sink_config = config['sink']
sink_param = ",\n".join([f"{k}='{v}'" for k, v in sink_config.items()])
sqls = [
Expand Down Expand Up @@ -105,8 +112,8 @@ def init_risingwave_mv(config):
cursor.execute(sql)


def check_spark_table(case_name):
spark_ip = get_ip(case_dir(case_name), f"{case_name}-spark-1")
def check_spark_table(docker):
spark_ip = docker.get_ip(f"{docker.case_name}-spark-1")
url = f"sc://{spark_ip}:15002"
print(f"Spark url is {url}")
spark = SparkSession.builder.remote(url).getOrCreate()
Expand All @@ -118,22 +125,20 @@ def check_spark_table(case_name):
for sql in sqls:
print(f"Executing sql: {sql}")
result = spark.sql(sql).collect()
print(f"Result is {result}")
assert result[0][0] > 100, f"Inserted result is too small: {result[0][0]}, test failed"


def run_case(case):
    """Run one end-to-end sink test: bring the environment up, load data,
    sink it through risingwave, verify via Spark, then tear everything down.

    Args:
        case: test case name (``rest``, ``storage``, ``jdbc`` or ``hive``).
    """
    docker = DockerCompose(case)
    with docker:
        init_spark_table(docker)
        init_risingwave_mv(docker)
        # Give risingwave a moment to flush rows into the iceberg sink.
        print("Let risingwave to run")
        time.sleep(5)
        check_spark_table(docker)


if __name__ == "__main__":
    # Run every catalog variant in sequence; each case has its own compose
    # project under ../docker/. (The old single-case driver that read the
    # config and started/stopped docker inline was replaced by run_case.)
    case_names = ["rest", "storage", "jdbc", "hive"]
    for case_name in case_names:
        print(f"Running test case: {case_name}")
        run_case(case_name)
Loading

0 comments on commit eece175

Please sign in to comment.