feat: Introduce jni catalog to support all catalogs #14264

Merged (2 commits) on Jan 18, 2024

Changes from all commits
1 change: 1 addition & 0 deletions e2e_test/iceberg/start_spark_connect_server.sh
@@ -16,6 +16,7 @@ tar -xzf $SPARK_FILE --no-same-owner
--master local[3] \
--conf spark.driver.bindAddress=0.0.0.0 \
--conf spark.sql.catalog.demo=org.apache.iceberg.spark.SparkCatalog \
+--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
--conf spark.sql.catalog.demo.type=hadoop \
--conf spark.sql.catalog.demo.warehouse=s3a://icebergdata/demo \
--conf spark.sql.catalog.demo.hadoop.fs.s3a.endpoint=http://127.0.0.1:9301 \
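For context, the spark.sql.extensions line added here loads Iceberg's SQL extensions into the Spark Connect server; statements such as Iceberg's stored procedures (CALL ...) are only parsed when these extensions are active. A minimal sketch of what that enables from a client, assuming the connect server started by this script listens on the default Spark Connect port 15002:

from pyspark.sql import SparkSession

# Assumption: host and port of the e2e Spark Connect server; adjust to the harness.
spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()

# CALLing an Iceberg maintenance procedure requires IcebergSparkSessionExtensions,
# which the configuration line added above turns on.
spark.sql("CALL demo.system.rewrite_data_files(table => 'demo_db.demo_table')").show()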
6 changes: 4 additions & 2 deletions e2e_test/iceberg/test_case/cdc/load.slt
@@ -24,8 +24,8 @@ CREATE SINK s1 AS select * from products WITH (
connector = 'iceberg',
type = 'upsert',
force_append_only = 'false',
-database.name = 'demo',
-table.name = 'demo_db.demo_table',
+database.name = 'demo_db',
+table.name = 'demo_table',
catalog.type = 'storage',
warehouse.path = 's3://icebergdata/demo',
s3.endpoint = 'http://127.0.0.1:9301',
@@ -38,6 +38,8 @@ CREATE SINK s1 AS select * from products WITH (
statement ok
flush;

+sleep 20s

query I
select count(*) from products;
----
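Two adjustments in this test case: database.name now carries only the Iceberg namespace (demo_db) while table.name is the bare table name, and the added sleep 20s gives the CDC pipeline and the Iceberg sink time to catch up before verification. Under the demo catalog configured for the e2e Spark server above, the sink target is demo.demo_db.demo_table; a small sketch of the Spark-side cross-check, again assuming the local connect endpoint:

from pyspark.sql import SparkSession

# Assumption: the e2e harness' Spark Connect server is reachable locally on 15002.
spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()

# catalog.namespace.table, i.e. the sink's database.name plus table.name.
spark.sql("SELECT count(*) FROM demo.demo_db.demo_table").show()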
4 changes: 2 additions & 2 deletions e2e_test/iceberg/test_case/iceberg_sink_append_only.slt
@@ -23,8 +23,8 @@ CREATE SINK s6 AS select * from mv6 WITH (
connector = 'iceberg',
type = 'append-only',
force_append_only = 'true',
-database.name = 'demo',
-table.name = 'demo_db.demo_table',
+database.name = 'demo_db',
+table.name = 'demo_table',
catalog.type = 'storage',
warehouse.path = 's3://icebergdata/demo',
s3.endpoint = 'http://127.0.0.1:9301',
4 changes: 2 additions & 2 deletions e2e_test/iceberg/test_case/iceberg_sink_upsert.slt
@@ -12,8 +12,8 @@ CREATE SINK s6 AS select mv6.id as id, mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3
connector = 'iceberg',
type = 'upsert',
force_append_only = 'false',
-database.name = 'demo',
-table.name = 'demo_db.demo_table',
+database.name = 'demo_db',
+table.name = 'demo_table',
catalog.type = 'storage',
warehouse.path = 's3://icebergdata/demo',
s3.endpoint = 'http://127.0.0.1:9301',
2 changes: 1 addition & 1 deletion e2e_test/iceberg/test_case/no_partition_append_only.toml
@@ -13,7 +13,7 @@ init_sqls = [
v_date date,
v_timestamp timestamp,
v_ts_ntz timestamp_ntz
-) TBLPROPERTIES ('format-version'='2');
+) USING iceberg TBLPROPERTIES ('format-version'='2');
'''
]

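The added USING iceberg clause pins the table provider explicitly, so the verification table is always created as an Iceberg format-version 2 table instead of depending on the catalog's default provider.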
4 changes: 2 additions & 2 deletions e2e_test/sink/iceberg_sink.slt
@@ -15,8 +15,8 @@ CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3 from mv6 WITH
s3.secret.key = 'hummockadmin',
s3.region = 'us-east-1',
catalog.type = 'storage',
-database.name='demo',
-table.name='demo_db.demo_table'
+database.name='demo_db',
+table.name='demo_table'
);

statement ok
23 changes: 23 additions & 0 deletions integration_tests/iceberg-sink2/docker/rest/config.ini
@@ -0,0 +1,23 @@
[default]
aws_key=hummockadmin
aws_secret=hummockadmin

[risingwave]
db=dev
user=root
host=127.0.0.1
port=4566

[sink]
connector = iceberg
type=append-only
force_append_only = true
catalog.type = rest
catalog.uri = http://rest:8181
warehouse.path = s3://icebergdata/demo/s1/t1
s3.endpoint=http://minio-0:9301
s3.access.key = hummockadmin
s3.secret.key = hummockadmin
s3.region = ap-southeast-1
database.name=s1
table.name=s1.t1
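This file is consumed by the Python driver shown later in the diff rather than by RisingWave directly: the [risingwave] block feeds the psycopg2 connection, and every key in [sink] is expanded into the WITH clause of a CREATE SINK. A small sketch of that expansion, mirroring the join done in main.py (sink name s1 and source bid come from the driver):

import configparser

# Render the [sink] section the way the test driver does: each key/value pair
# becomes one option of the CREATE SINK ... WITH (...) clause.
config = configparser.ConfigParser()
config.read("config.ini")
sink_param = ",\n    ".join(f"{k}='{v}'" for k, v in config["sink"].items())
print(f"CREATE SINK s1 AS SELECT * FROM bid WITH (\n    {sink_param}\n);")

The rendered statement is the REST-catalog counterpart of the storage-catalog sinks in the e2e slt files above, with catalog.type = rest and catalog.uri pointing at the rest service defined in the compose file below.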
99 changes: 99 additions & 0 deletions integration_tests/iceberg-sink2/docker/rest/docker-compose.yml
@@ -0,0 +1,99 @@
version: '3.8'

services:
  rest:
    image: tabulario/iceberg-rest:0.6.0
    environment:
      - AWS_ACCESS_KEY_ID=hummockadmin
      - AWS_SECRET_ACCESS_KEY=hummockadmin
      - AWS_REGION=us-east-1
      - CATALOG_CATOLOG__IMPL=org.apache.iceberg.jdbc.JdbcCatalog
      - CATALOG_URI=jdbc:sqlite:file:/tmp/iceberg_rest_mode=memory
      - CATALOG_WAREHOUSE=s3://icebergdata/demo
      - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
      - CATALOG_S3_ENDPOINT=http://minio-0:9301
    depends_on:
      - minio-0
    networks:
      - iceberg_net
    links:
      - minio-0:icebergdata.minio-0
    expose:
      - 8181

  spark:
    depends_on:
      - minio-0
      - rest
    image: ghcr.io/icelake-io/icelake-spark:latest
    environment:
      - AWS_ACCESS_KEY_ID=hummockadmin
      - AWS_SECRET_ACCESS_KEY=hummockadmin
      - AWS_REGION=us-east-1
      - SPARK_HOME=/opt/spark
      - PYSPARK_PYTHON=/usr/bin/python3.9
      - PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/opt/spark/bin:/opt/spark/sbin
    user: root
    networks:
      iceberg_net:
    links:
      - minio-0:icebergdata.minio-0
    expose:
      - 15002
    healthcheck:
      test: netstat -ltn | grep -c 15002
      interval: 1s
      retries: 1200
    volumes:
      - ./spark-script:/spark-script
    entrypoint: ["/spark-script/spark-connect-server.sh"]

  risingwave-standalone:
    extends:
      file: ../../../../docker/docker-compose.yml
      service: risingwave-standalone
    healthcheck:
      test:
        - CMD-SHELL
        - bash -c 'printf \"GET / HTTP/1.1\n\n\" > /dev/tcp/127.0.0.1/4566; exit $$?;'
      interval: 1s
      timeout: 30s
    networks:
      iceberg_net:

  minio-0:
    extends:
      file: ../../../../docker/docker-compose.yml
      service: minio-0
    entrypoint: "

    /bin/sh -c '

    set -e

    mkdir -p \"/data/icebergdata/demo\"
    mkdir -p \"/data/hummock001\"

    /usr/bin/docker-entrypoint.sh \"$$0\" \"$$@\"

    '"
    networks:
      iceberg_net:

  etcd-0:
    extends:
      file: ../../../../docker/docker-compose.yml
      service: etcd-0
    networks:
      iceberg_net:

volumes:
  risingwave-standalone:
    external: false
  etcd-0:
    external: false
  minio-0:
    external: false

networks:
  iceberg_net:
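A note on the rest service: the tabulario/iceberg-rest image builds its catalog configuration from CATALOG_-prefixed environment variables; its documented convention appears to be that the prefix is stripped, a double underscore becomes a dash, and a single underscore becomes a dot. A tiny sketch of that mapping (the helper name is hypothetical, for illustration only):

# Illustrative mapping of the REST catalog image's CATALOG_* convention
# (assumption based on the image's documentation, not part of this PR).
def env_to_catalog_property(name: str) -> str:
    return name.removeprefix("CATALOG_").replace("__", "-").replace("_", ".").lower()

print(env_to_catalog_property("CATALOG_IO__IMPL"))     # io-impl
print(env_to_catalog_property("CATALOG_S3_ENDPOINT"))  # s3.endpoint
print(env_to_catalog_property("CATALOG_WAREHOUSE"))    # warehouse

Under that convention CATALOG_CATOLOG__IMPL maps to catolog-impl rather than catalog-impl, so the JDBC catalog in use likely comes from the image's default configuration rather than from this variable.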
21 changes: 21 additions & 0 deletions integration_tests/iceberg-sink2/docker/rest/spark-script/spark-connect-server.sh
@@ -0,0 +1,21 @@
#!/bin/bash

set -ex

JARS=$(find /opt/spark/deps -type f -name "*.jar" | tr '\n' ':')

/opt/spark/sbin/start-connect-server.sh \
--master local[3] \
--driver-class-path $JARS \
--conf spark.driver.bindAddress=0.0.0.0 \
--conf spark.sql.catalog.demo=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
--conf spark.sql.catalog.demo.catalog-impl=org.apache.iceberg.rest.RESTCatalog \
--conf spark.sql.catalog.demo.uri=http://rest:8181 \
--conf spark.sql.catalog.demo.s3.endpoint=http://minio-0:9301 \
--conf spark.sql.catalog.demo.s3.path.style.access=true \
--conf spark.sql.catalog.demo.s3.access.key=hummockadmin \
--conf spark.sql.catalog.demo.s3.secret.key=hummockadmin \
--conf spark.sql.defaultCatalog=demo

tail -f /opt/spark/logs/spark*.out
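Compared with the e2e script at the top of this diff, which wires the demo catalog as a Hadoop catalog (type=hadoop plus a warehouse path), this connect server backs the same demo catalog with the REST catalog service from the compose file (catalog-impl=org.apache.iceberg.rest.RESTCatalog and uri=http://rest:8181), so Spark and the RisingWave sink configured with catalog.type = rest resolve tables through the same metadata service.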
Empty file.
136 changes: 136 additions & 0 deletions integration_tests/iceberg-sink2/python/main.py
@@ -0,0 +1,136 @@
import argparse
import subprocess
from pyspark.sql import SparkSession
import configparser
import psycopg2
import time


def case_dir(case_name):
    return f"../docker/{case_name}"


def start_docker(case_name):
    subprocess.run(["docker-compose", "up", "-d", "--wait"], cwd=case_dir(case_name), check=False)


def stop_docker(case_name):
    subprocess.run(["docker", "compose", "down", "-v", "--remove-orphans"], cwd=case_dir(case_name),
                   capture_output=True,
                   check=True)


def get_ip(case_name, container_name):
    return subprocess.check_output(["docker", "inspect", "-f", "{{range.NetworkSettings.Networks}}{{.IPAddress}}{{"
                                    "end}}",
                                    container_name], cwd=case_dir(case_name)).decode("utf-8").rstrip()


def init_spark_table(case_name):
    spark_ip = get_ip(case_dir(case_name), f"{case_name}-spark-1")
    url = f"sc://{spark_ip}:15002"
    print(f"Spark url is {url}")
    spark = SparkSession.builder.remote(url).getOrCreate()

    init_table_sqls = [
        "CREATE SCHEMA IF NOT EXISTS s1",
        "DROP TABLE IF EXISTS s1.t1",
        """
        CREATE TABLE s1.t1
        (
            id bigint,
            name string,
            distance bigint
        ) USING iceberg
        TBLPROPERTIES ('format-version'='2');
        """,
        """INSERT INTO s1.t1 VALUES (1, 'test', 100);"""
    ]

    for sql in init_table_sqls:
        print(f"Executing sql: {sql}")
        spark.sql(sql)


def init_risingwave_mv(config):
    aws_key = config['default']['aws_key']
    aws_secret = config['default']['aws_secret']

    sink_config = config['sink']
    sink_param = ",\n".join([f"{k}='{v}'" for k, v in sink_config.items()])
    sqls = [
        "set streaming_parallelism = 4",
        """
        CREATE SOURCE bid (
            "id" BIGINT,
            "name" VARCHAR,
            "distance" BIGINT,
        ) with (
            connector = 'datagen',
            datagen.split.num = '4',


            fields.id.kind = 'random',
            fields.id.min = '0',
            fields.id.max = '1000000',
            fields.id.seed = '100',

            fields.name.kind = 'random',
            fields.name.length = '15',
            fields.name.seed = '100',

            fields.distance.kind = 'random',
            fields.distance.min = '0',
            fields.distance.max = '100000',
            fields.distance.seed = '100',

            datagen.rows.per.second = '500'
        ) FORMAT PLAIN ENCODE JSON;
        """,
        f"""
        CREATE SINK s1
        AS SELECT * FROM bid
        WITH (
            {sink_param}
        );
        """
    ]

    rw_config = config['risingwave']
    with psycopg2.connect(database=rw_config['db'], user=rw_config['user'], host=rw_config['host'],
                          port=rw_config['port']) as conn:
        with conn.cursor() as cursor:
            for sql in sqls:
                print(f"Executing sql {sql}")
                cursor.execute(sql)


def check_spark_table(case_name):
    spark_ip = get_ip(case_dir(case_name), f"{case_name}-spark-1")
    url = f"sc://{spark_ip}:15002"
    print(f"Spark url is {url}")
    spark = SparkSession.builder.remote(url).getOrCreate()

    sqls = [
        "SELECT COUNT(*) FROM s1.t1"
    ]

    for sql in sqls:
        print(f"Executing sql: {sql}")
        result = spark.sql(sql).collect()
        print(f"Result is {result}")


if __name__ == "__main__":
    case_name = "rest"
    config = configparser.ConfigParser()
    config.read(f"{case_dir(case_name)}/config.ini")
    print({section: dict(config[section]) for section in config.sections()})
    start_docker(case_name)
    print("Waiting for docker to be ready")
    init_spark_table(case_name)
    init_risingwave_mv(config)
    print("Let risingwave to run")
    time.sleep(3)
    check_spark_table(case_name)
    stop_docker(case_name)
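As a usage note, the driver assumes it is run from integration_tests/iceberg-sink2/python (so that ../docker/rest resolves), with docker compose available and with pyspark (with Spark Connect support) and psycopg2 installed. It brings the compose stack up, creates s1.t1 through Spark, issues the datagen source and the CREATE SINK built from config.ini against RisingWave, waits briefly, reads the row count back through Spark, and tears the stack down.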