feat: Introduce jni catalog to support all catalogs
liurenjie1024 committed Jan 17, 2024
1 parent 8a542ad commit 8c3f4b7
Showing 28 changed files with 1,062 additions and 242 deletions.
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -144,7 +144,7 @@ deltalake = { git = "https://github.com/risingwavelabs/delta-rs", rev = "5c2dccd
"s3-no-concurrent-write",
] }
parquet = "49"
-thiserror-ext = "0.0.11"
+thiserror-ext = "0.0.10"
tikv-jemalloc-ctl = { git = "https://github.com/risingwavelabs/jemallocator.git", rev = "64a2d9" }
tikv-jemallocator = { git = "https://github.com/risingwavelabs/jemallocator.git", features = [
"profiling",
1 change: 1 addition & 0 deletions e2e_test/iceberg/start_spark_connect_server.sh
@@ -16,6 +16,7 @@ tar -xzf $SPARK_FILE --no-same-owner
--master local[3] \
--conf spark.driver.bindAddress=0.0.0.0 \
--conf spark.sql.catalog.demo=org.apache.iceberg.spark.SparkCatalog \
+--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
--conf spark.sql.catalog.demo.type=hadoop \
--conf spark.sql.catalog.demo.warehouse=s3a://icebergdata/demo \
--conf spark.sql.catalog.demo.hadoop.fs.s3a.endpoint=http://127.0.0.1:9301 \
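
The spark.sql.extensions setting added above loads Iceberg's SQL extensions into the Spark Connect session used by the e2e tests; those extensions provide Iceberg-specific SQL such as CALL stored procedures and Iceberg's ALTER TABLE clauses. A minimal sketch of the kind of statements this enables against the demo catalog configured in this script (the statements themselves are illustrative, not taken from the tests):

    -- Illustrative only: SQL that relies on IcebergSparkSessionExtensions.
    CALL demo.system.rewrite_data_files(table => 'demo_db.demo_table');
    ALTER TABLE demo_db.demo_table WRITE ORDERED BY id;
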
6 changes: 4 additions & 2 deletions e2e_test/iceberg/test_case/cdc/load.slt
@@ -24,8 +24,8 @@ CREATE SINK s1 AS select * from products WITH (
connector = 'iceberg',
type = 'upsert',
force_append_only = 'false',
-database.name = 'demo',
-table.name = 'demo_db.demo_table',
+database.name = 'demo_db',
+table.name = 'demo_table',
catalog.type = 'storage',
warehouse.path = 's3://icebergdata/demo',
s3.endpoint = 'http://127.0.0.1:9301',
@@ -38,6 +38,8 @@ CREATE SINK s1 AS select * from products WITH (
statement ok
flush;

+sleep 20s

query I
select count(*) from products;
----
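
Taken together, the change above splits the namespace out of table.name: database.name now carries the Iceberg namespace and table.name only the table itself. A hedged reconstruction of the sink definition after this change, using only the options visible in the hunk (the remaining options sit outside the diff context and are assumed unchanged):

    CREATE SINK s1 AS SELECT * FROM products WITH (
        connector = 'iceberg',
        type = 'upsert',
        force_append_only = 'false',
        database.name = 'demo_db',   -- previously 'demo'
        table.name = 'demo_table',   -- previously 'demo_db.demo_table'
        catalog.type = 'storage',
        warehouse.path = 's3://icebergdata/demo',
        s3.endpoint = 'http://127.0.0.1:9301'
        -- s3 credentials, region and the primary key option follow in the
        -- original file, outside the shown diff context
    );
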
4 changes: 2 additions & 2 deletions e2e_test/iceberg/test_case/iceberg_sink_append_only.slt
@@ -23,8 +23,8 @@ CREATE SINK s6 AS select * from mv6 WITH (
connector = 'iceberg',
type = 'append-only',
force_append_only = 'true',
-database.name = 'demo',
-table.name = 'demo_db.demo_table',
+database.name = 'demo_db',
+table.name = 'demo_table',
catalog.type = 'storage',
warehouse.path = 's3://icebergdata/demo',
s3.endpoint = 'http://127.0.0.1:9301',
4 changes: 2 additions & 2 deletions e2e_test/iceberg/test_case/iceberg_sink_upsert.slt
@@ -12,8 +12,8 @@ CREATE SINK s6 AS select mv6.id as id, mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3
connector = 'iceberg',
type = 'upsert',
force_append_only = 'false',
-database.name = 'demo',
-table.name = 'demo_db.demo_table',
+database.name = 'demo_db',
+table.name = 'demo_table',
catalog.type = 'storage',
warehouse.path = 's3://icebergdata/demo',
s3.endpoint = 'http://127.0.0.1:9301',
2 changes: 1 addition & 1 deletion e2e_test/iceberg/test_case/no_partition_append_only.toml
@@ -13,7 +13,7 @@ init_sqls = [
v_date date,
v_timestamp timestamp,
v_ts_ntz timestamp_ntz
-) TBLPROPERTIES ('format-version'='2');
+) USING iceberg TBLPROPERTIES ('format-version'='2');
'''
]

4 changes: 2 additions & 2 deletions e2e_test/sink/iceberg_sink.slt
@@ -15,8 +15,8 @@ CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3 from mv6 WITH
s3.secret.key = 'hummockadmin',
s3.region = 'us-east-1',
catalog.type = 'storage',
-database.name='demo',
-table.name='demo_db.demo_table'
+database.name='demo_db',
+table.name='demo_table'
);

statement ok
23 changes: 23 additions & 0 deletions integration_tests/iceberg-sink2/docker/rest/config.ini
@@ -0,0 +1,23 @@
[default]
aws_key=hummockadmin
aws_secret=hummockadmin

[risingwave]
db=dev
user=root
host=127.0.0.1
port=4566

[sink]
connector = iceberg
type=append-only
force_append_only = true
catalog.type = rest
catalog.uri = http://rest:8181
warehouse.path = s3://icebergdata/demo/s1/t1
s3.endpoint=http://minio-0:9301
s3.access.key = hummockadmin
s3.secret.key = hummockadmin
s3.region = ap-southeast-1
database.name=s1
table.name=s1.t1
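
RisingWave does not read this [sink] section directly; the Python driver added later in this commit (integration_tests/iceberg-sink2/python/main.py) joins the key/value pairs into the WITH clause of a CREATE SINK over a datagen source named bid. Assuming the keys are passed through verbatim, the generated statement is roughly this sketch:

    CREATE SINK s1
    AS SELECT * FROM bid
    WITH (
        connector='iceberg',
        type='append-only',
        force_append_only='true',
        catalog.type='rest',
        catalog.uri='http://rest:8181',
        warehouse.path='s3://icebergdata/demo/s1/t1',
        s3.endpoint='http://minio-0:9301',
        s3.access.key='hummockadmin',
        s3.secret.key='hummockadmin',
        s3.region='ap-southeast-1',
        database.name='s1',
        table.name='s1.t1'
    );
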
99 changes: 99 additions & 0 deletions integration_tests/iceberg-sink2/docker/rest/docker-compose.yml
@@ -0,0 +1,99 @@
version: '3.8'

services:
rest:
image: tabulario/iceberg-rest:0.6.0
environment:
- AWS_ACCESS_KEY_ID=hummockadmin
- AWS_SECRET_ACCESS_KEY=hummockadmin
- AWS_REGION=us-east-1
- CATALOG_CATALOG__IMPL=org.apache.iceberg.jdbc.JdbcCatalog
- CATALOG_URI=jdbc:sqlite:file:/tmp/iceberg_rest_mode=memory
- CATALOG_WAREHOUSE=s3://icebergdata/demo
- CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
- CATALOG_S3_ENDPOINT=http://minio-0:9301
depends_on:
- minio-0
networks:
- iceberg_net
links:
- minio-0:icebergdata.minio-0
expose:
- 8181

spark:
depends_on:
- minio-0
- rest
image: ghcr.io/icelake-io/icelake-spark:latest
environment:
- AWS_ACCESS_KEY_ID=hummockadmin
- AWS_SECRET_ACCESS_KEY=hummockadmin
- AWS_REGION=us-east-1
- SPARK_HOME=/opt/spark
- PYSPARK_PYTHON=/usr/bin/python3.9
- PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/opt/spark/bin:/opt/spark/sbin
user: root
networks:
iceberg_net:
links:
- minio-0:icebergdata.minio-0
expose:
- 15002
healthcheck:
test: netstat -ltn | grep -c 15002
interval: 1s
retries: 1200
volumes:
- ./spark-script:/spark-script
entrypoint: ["/spark-script/spark-connect-server.sh"]

risingwave-standalone:
extends:
file: ../../../../docker/docker-compose.yml
service: risingwave-standalone
healthcheck:
test:
- CMD-SHELL
- bash -c 'printf \"GET / HTTP/1.1\n\n\" > /dev/tcp/127.0.0.1/4566; exit $$?;'
interval: 1s
timeout: 30s
networks:
iceberg_net:

minio-0:
extends:
file: ../../../../docker/docker-compose.yml
service: minio-0
entrypoint: "
/bin/sh -c '
set -e
mkdir -p \"/data/icebergdata/demo\"
mkdir -p \"/data/hummock001\"
/usr/bin/docker-entrypoint.sh \"$$0\" \"$$@\"
'"
networks:
iceberg_net:

etcd-0:
extends:
file: ../../../../docker/docker-compose.yml
service: etcd-0
networks:
iceberg_net:

volumes:
risingwave-standalone:
external: false
etcd-0:
external: false
minio-0:
external: false

networks:
iceberg_net:
21 changes: 21 additions & 0 deletions integration_tests/iceberg-sink2/docker/rest/spark-script/spark-connect-server.sh
@@ -0,0 +1,21 @@
#!/bin/bash

set -ex

JARS=$(find /opt/spark/deps -type f -name "*.jar" | tr '\n' ':')

/opt/spark/sbin/start-connect-server.sh \
--master local[3] \
--driver-class-path $JARS \
--conf spark.driver.bindAddress=0.0.0.0 \
--conf spark.sql.catalog.demo=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
--conf spark.sql.catalog.demo.catalog-impl=org.apache.iceberg.rest.RESTCatalog \
--conf spark.sql.catalog.demo.uri=http://rest:8181 \
--conf spark.sql.catalog.demo.s3.endpoint=http://minio-0:9301 \
--conf spark.sql.catalog.demo.s3.path.style.access=true \
--conf spark.sql.catalog.demo.s3.access.key=hummockadmin \
--conf spark.sql.catalog.demo.s3.secret.key=hummockadmin \
--conf spark.sql.defaultCatalog=demo

tail -f /opt/spark/logs/spark*.out
Empty file.
136 changes: 136 additions & 0 deletions integration_tests/iceberg-sink2/python/main.py
@@ -0,0 +1,136 @@
import argparse
import subprocess
from pyspark.sql import SparkSession
import configparser
import psycopg2
import time


def case_dir(case_name):
return f"../docker/{case_name}"


def start_docker(case_name):
subprocess.run(["docker-compose", "up", "-d", "--wait"], cwd=case_dir(case_name), check=False)


def stop_docker(case_name):
subprocess.run(["docker", "compose", "down", "-v", "--remove-orphans"], cwd=case_dir(case_name),
capture_output=True,
check=True)


def get_ip(case_name, container_name):
return subprocess.check_output(["docker", "inspect", "-f", "{{range.NetworkSettings.Networks}}{{.IPAddress}}{{"
"end}}",
container_name], cwd=case_dir(case_name)).decode("utf-8").rstrip()


def init_spark_table(case_name):
spark_ip = get_ip(case_name, f"{case_name}-spark-1")
url = f"sc://{spark_ip}:15002"
print(f"Spark url is {url}")
spark = SparkSession.builder.remote(url).getOrCreate()

init_table_sqls = [
"CREATE SCHEMA IF NOT EXISTS s1",
"DROP TABLE IF EXISTS s1.t1",
"""
CREATE TABLE s1.t1
(
id bigint,
name string,
distance bigint
) USING iceberg
TBLPROPERTIES ('format-version'='2');
""",
"""INSERT INTO s1.t1 VALUES (1, 'test', 100);"""
]

for sql in init_table_sqls:
print(f"Executing sql: {sql}")
spark.sql(sql)


def init_risingwave_mv(config):
aws_key = config['default']['aws_key']
aws_secret = config['default']['aws_secret']

sink_config = config['sink']
sink_param = ",\n".join([f"{k}='{v}'" for k, v in sink_config.items()])
sqls = [
"set streaming_parallelism = 4",
"""
CREATE SOURCE bid (
"id" BIGINT,
"name" VARCHAR,
"distance" BIGINT,
) with (
connector = 'datagen',
datagen.split.num = '4',
fields.id.kind = 'random',
fields.id.min = '0',
fields.id.max = '1000000',
fields.id.seed = '100',
fields.name.kind = 'random',
fields.name.length = '15',
fields.name.seed = '100',
fields.distance.kind = 'random',
fields.distance.min = '0',
fields.distance.max = '100000',
fields.distance.seed = '100',
datagen.rows.per.second = '500'
) FORMAT PLAIN ENCODE JSON;
""",
f"""
CREATE SINK s1
AS SELECT * FROM bid
WITH (
{sink_param}
);
"""
]

rw_config = config['risingwave']
with psycopg2.connect(database=rw_config['db'], user=rw_config['user'], host=rw_config['host'],
port=rw_config['port']) as conn:
with conn.cursor() as cursor:
for sql in sqls:
print(f"Executing sql {sql}")
cursor.execute(sql)


def check_spark_table(case_name):
spark_ip = get_ip(case_name, f"{case_name}-spark-1")
url = f"sc://{spark_ip}:15002"
print(f"Spark url is {url}")
spark = SparkSession.builder.remote(url).getOrCreate()

sqls = [
"SELECT COUNT(*) FROM s1.t1"
]

for sql in sqls:
print(f"Executing sql: {sql}")
result = spark.sql(sql).collect()
print(f"Result is {result}")


if __name__ == "__main__":
case_name = "rest"
config = configparser.ConfigParser()
config.read(f"{case_dir(case_name)}/config.ini")
print({section: dict(config[section]) for section in config.sections()})
start_docker(case_name)
print("Waiting for docker to be ready")
init_spark_table(case_name)
init_risingwave_mv(config)
print("Let risingwave to run")
time.sleep(3)
check_spark_table(case_name)
stop_docker(case_name)