feat: Introduce jni catalog to support all catalogs (#14264)
Commit f1a138f (1 parent: 9aeec08)
Showing 26 changed files with 1,057 additions and 237 deletions.
integration_tests/iceberg-sink2/docker/rest/config.ini (23 additions, 0 deletions)
@@ -0,0 +1,23 @@
[default]
aws_key=hummockadmin
aws_secret=hummockadmin

[risingwave]
db=dev
user=root
host=127.0.0.1
port=4566

[sink]
connector = iceberg
type=append-only
force_append_only = true
catalog.type = rest
catalog.uri = http://rest:8181
warehouse.path = s3://icebergdata/demo/s1/t1
s3.endpoint=http://minio-0:9301
s3.access.key = hummockadmin
s3.secret.key = hummockadmin
s3.region = ap-southeast-1
database.name=s1
table.name=s1.t1
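For context, the [sink] section above is consumed verbatim by the Python test harness later in this commit: each key/value pair becomes one entry in the WITH clause of a RisingWave CREATE SINK statement. A minimal sketch of that mapping (the relative config path is an assumption; the bid/s1 names mirror this commit, and the snippet itself is illustrative rather than part of the diff):

import configparser

# Path assumed relative to integration_tests/iceberg-sink2; adjust as needed.
config = configparser.ConfigParser()
config.read("docker/rest/config.ini")

# Each [sink] key/value pair becomes one `key='value'` entry in the WITH clause.
sink_param = ",\n".join(f"{k}='{v}'" for k, v in config["sink"].items())
print(f"CREATE SINK s1 AS SELECT * FROM bid WITH (\n{sink_param}\n);")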
integration_tests/iceberg-sink2/docker/rest/docker-compose.yml (99 additions, 0 deletions)
@@ -0,0 +1,99 @@
version: '3.8'

services:
  # Iceberg REST catalog; table metadata is kept in a SQLite-backed JDBC catalog,
  # while data and metadata files go to the MinIO warehouse at s3://icebergdata/demo.
  rest:
    image: tabulario/iceberg-rest:0.6.0
    environment:
      - AWS_ACCESS_KEY_ID=hummockadmin
      - AWS_SECRET_ACCESS_KEY=hummockadmin
      - AWS_REGION=us-east-1
      - CATALOG_CATALOG__IMPL=org.apache.iceberg.jdbc.JdbcCatalog
      - CATALOG_URI=jdbc:sqlite:file:/tmp/iceberg_rest_mode=memory
      - CATALOG_WAREHOUSE=s3://icebergdata/demo
      - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
      - CATALOG_S3_ENDPOINT=http://minio-0:9301
    depends_on:
      - minio-0
    networks:
      - iceberg_net
    links:
      - minio-0:icebergdata.minio-0
    expose:
      - 8181

  # Spark with Spark Connect enabled; the test harness uses it to create and verify the Iceberg table.
  spark:
    depends_on:
      - minio-0
      - rest
    image: ghcr.io/icelake-io/icelake-spark:latest
    environment:
      - AWS_ACCESS_KEY_ID=hummockadmin
      - AWS_SECRET_ACCESS_KEY=hummockadmin
      - AWS_REGION=us-east-1
      - SPARK_HOME=/opt/spark
      - PYSPARK_PYTHON=/usr/bin/python3.9
      - PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/opt/spark/bin:/opt/spark/sbin
    user: root
    networks:
      iceberg_net:
    links:
      - minio-0:icebergdata.minio-0
    expose:
      - 15002
    healthcheck:
      test: netstat -ltn | grep -c 15002
      interval: 1s
      retries: 1200
    volumes:
      - ./spark-script:/spark-script
    entrypoint: ["/spark-script/spark-connect-server.sh"]

  # RisingWave under test, reusing the service definition from the main docker-compose file.
  risingwave-standalone:
    extends:
      file: ../../../../docker/docker-compose.yml
      service: risingwave-standalone
    healthcheck:
      test:
        - CMD-SHELL
        - bash -c 'printf \"GET / HTTP/1.1\n\n\" > /dev/tcp/127.0.0.1/4566; exit $$?;'
      interval: 1s
      timeout: 30s
    networks:
      iceberg_net:

  # MinIO provides the S3-compatible warehouse; the buckets are created before the server starts.
  minio-0:
    extends:
      file: ../../../../docker/docker-compose.yml
      service: minio-0
    entrypoint: "
    /bin/sh -c '
    set -e
    mkdir -p \"/data/icebergdata/demo\"
    mkdir -p \"/data/hummock001\"
    /usr/bin/docker-entrypoint.sh \"$$0\" \"$$@\"
    '"
    networks:
      iceberg_net:

  etcd-0:
    extends:
      file: ../../../../docker/docker-compose.yml
      service: etcd-0
    networks:
      iceberg_net:

volumes:
  risingwave-standalone:
    external: false
  etcd-0:
    external: false
  minio-0:
    external: false

networks:
  iceberg_net:
integration_tests/iceberg-sink2/docker/rest/spark-script/spark-connect-server.sh (21 additions, 0 deletions)
@@ -0,0 +1,21 @@
#!/bin/bash

set -ex

# Collect all jars under /opt/spark/deps onto the driver classpath.
JARS=$(find /opt/spark/deps -type f -name "*.jar" | tr '\n' ':')

# Start a Spark Connect server with the Iceberg REST catalog mounted as the default catalog "demo".
/opt/spark/sbin/start-connect-server.sh \
  --master local[3] \
  --driver-class-path $JARS \
  --conf spark.driver.bindAddress=0.0.0.0 \
  --conf spark.sql.catalog.demo=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
  --conf spark.sql.catalog.demo.catalog-impl=org.apache.iceberg.rest.RESTCatalog \
  --conf spark.sql.catalog.demo.uri=http://rest:8181 \
  --conf spark.sql.catalog.demo.s3.endpoint=http://minio-0:9301 \
  --conf spark.sql.catalog.demo.s3.path.style.access=true \
  --conf spark.sql.catalog.demo.s3.access.key=hummockadmin \
  --conf spark.sql.catalog.demo.s3.secret.key=hummockadmin \
  --conf spark.sql.defaultCatalog=demo

tail -f /opt/spark/logs/spark*.out
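This launcher exposes a Spark Connect endpoint on port 15002 with the REST catalog registered as the default catalog demo, so clients can drive Spark without a local installation. A minimal client sketch, assuming the server is reachable as localhost (the test harness below resolves the container IP with docker inspect instead):

from pyspark.sql import SparkSession

# Connect to the Spark Connect server started by spark-connect-server.sh.
spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()

# Tables in the "demo" REST catalog are addressed as <schema>.<table>;
# s1.t1 is the table the test harness creates.
spark.sql("SELECT COUNT(*) FROM s1.t1").show()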
integration_tests/iceberg-sink2/docker/spark-script/spark-connect-server.sh (empty file, mode changed 100644 → 100755)
New Python test script (136 additions; file path not shown in this view)
@@ -0,0 +1,136 @@
import argparse
import subprocess
from pyspark.sql import SparkSession
import configparser
import psycopg2
import time


def case_dir(case_name):
    return f"../docker/{case_name}"

def start_docker(case_name):
    subprocess.run(["docker-compose", "up", "-d", "--wait"], cwd=case_dir(case_name), check=False)


def stop_docker(case_name):
    subprocess.run(["docker", "compose", "down", "-v", "--remove-orphans"], cwd=case_dir(case_name),
                   capture_output=True,
                   check=True)


def get_ip(case_name, container_name):
    # Resolve the container's IP on the compose network via `docker inspect`.
    return subprocess.check_output(
        ["docker", "inspect", "-f", "{{range.NetworkSettings.Networks}}{{.IPAddress}}{{end}}", container_name],
        cwd=case_dir(case_name)).decode("utf-8").rstrip()

def init_spark_table(case_name):
    spark_ip = get_ip(case_name, f"{case_name}-spark-1")
    url = f"sc://{spark_ip}:15002"
    print(f"Spark url is {url}")
    spark = SparkSession.builder.remote(url).getOrCreate()

    init_table_sqls = [
        "CREATE SCHEMA IF NOT EXISTS s1",
        "DROP TABLE IF EXISTS s1.t1",
        """
        CREATE TABLE s1.t1
        (
            id bigint,
            name string,
            distance bigint
        ) USING iceberg
        TBLPROPERTIES ('format-version'='2');
        """,
        """INSERT INTO s1.t1 VALUES (1, 'test', 100);"""
    ]

    for sql in init_table_sqls:
        print(f"Executing sql: {sql}")
        spark.sql(sql)

def init_risingwave_mv(config):
    aws_key = config['default']['aws_key']
    aws_secret = config['default']['aws_secret']

    sink_config = config['sink']
    # Every key/value pair in [sink] becomes one entry of the CREATE SINK WITH clause.
    sink_param = ",\n".join([f"{k}='{v}'" for k, v in sink_config.items()])
    sqls = [
        "set streaming_parallelism = 4",
        """
        CREATE SOURCE bid (
            "id" BIGINT,
            "name" VARCHAR,
            "distance" BIGINT
        ) with (
            connector = 'datagen',
            datagen.split.num = '4',
            fields.id.kind = 'random',
            fields.id.min = '0',
            fields.id.max = '1000000',
            fields.id.seed = '100',
            fields.name.kind = 'random',
            fields.name.length = '15',
            fields.name.seed = '100',
            fields.distance.kind = 'random',
            fields.distance.min = '0',
            fields.distance.max = '100000',
            fields.distance.seed = '100',
            datagen.rows.per.second = '500'
        ) FORMAT PLAIN ENCODE JSON;
        """,
        f"""
        CREATE SINK s1
        AS SELECT * FROM bid
        WITH (
            {sink_param}
        );
        """
    ]

    rw_config = config['risingwave']
    with psycopg2.connect(database=rw_config['db'], user=rw_config['user'], host=rw_config['host'],
                          port=rw_config['port']) as conn:
        with conn.cursor() as cursor:
            for sql in sqls:
                print(f"Executing sql {sql}")
                cursor.execute(sql)

def check_spark_table(case_name):
    spark_ip = get_ip(case_name, f"{case_name}-spark-1")
    url = f"sc://{spark_ip}:15002"
    print(f"Spark url is {url}")
    spark = SparkSession.builder.remote(url).getOrCreate()

    sqls = [
        "SELECT COUNT(*) FROM s1.t1"
    ]

    for sql in sqls:
        print(f"Executing sql: {sql}")
        result = spark.sql(sql).collect()
        print(f"Result is {result}")

if __name__ == "__main__":
    case_name = "rest"
    config = configparser.ConfigParser()
    config.read(f"{case_dir(case_name)}/config.ini")
    print({section: dict(config[section]) for section in config.sections()})
    start_docker(case_name)
    print("Waiting for docker to be ready")
    init_spark_table(case_name)
    init_risingwave_mv(config)
    print("Let risingwave run for a while")
    time.sleep(3)
    check_spark_table(case_name)
    stop_docker(case_name)