diff --git a/ci/scripts/gen-integration-test-yaml.py b/ci/scripts/gen-integration-test-yaml.py
index 8f39ab6edb180..8dbe604aed409 100644
--- a/ci/scripts/gen-integration-test-yaml.py
+++ b/ci/scripts/gen-integration-test-yaml.py
@@ -16,6 +16,7 @@
     'postgres-sink': ['json'],
     'iceberg-cdc': ['json'],
     'iceberg-sink': ['none'],
+    'iceberg-source': ['none'],
     'twitter': ['json', 'protobuf'],
     'twitter-pulsar': ['json'],
     'debezium-mysql': ['json'],
diff --git a/integration_tests/iceberg-sink2/python/main.py b/integration_tests/iceberg-sink2/python/main.py
index ff0c90d4cf752..97b5d52d8f517 100644
--- a/integration_tests/iceberg-sink2/python/main.py
+++ b/integration_tests/iceberg-sink2/python/main.py
@@ -102,7 +102,6 @@ def init_risingwave_mv(docker):
         );
         """
     ]
-
     rw_config = config['risingwave']
     with psycopg2.connect(database=rw_config['db'], user=rw_config['user'], host=rw_config['host'],
                           port=rw_config['port']) as conn:
@@ -127,7 +126,6 @@ def check_spark_table(docker):
     result = spark.sql(sql).collect()
     assert result[0][0] > 100, f"Inserted result is too small: {result[0][0]}, test failed"
 
-
 def run_case(case):
     with DockerCompose(case) as docker:
         init_spark_table(docker)
diff --git a/integration_tests/iceberg-source/README.md b/integration_tests/iceberg-source/README.md
new file mode 100644
index 0000000000000..496111fca71fe
--- /dev/null
+++ b/integration_tests/iceberg-source/README.md
@@ -0,0 +1,15 @@
+# How to run the test
+
+Run the following commands to run the test:
+
+```bash
+cd python
+poetry update
+poetry run python main.py
+```
+
+# How to override the RisingWave image version
+
+```bash
+export RW_IMAGE=
+```
\ No newline at end of file
diff --git a/integration_tests/iceberg-source/docker/storage/config.ini b/integration_tests/iceberg-source/docker/storage/config.ini
new file mode 100644
index 0000000000000..dd795fd3ef684
--- /dev/null
+++ b/integration_tests/iceberg-source/docker/storage/config.ini
@@ -0,0 +1,16 @@
+[risingwave]
+db=dev
+user=root
+host=127.0.0.1
+port=4566
+
+[source]
+connector = iceberg
+s3.endpoint=http://minio-0:9301
+s3.access.key = hummockadmin
+s3.secret.key = hummockadmin
+s3.region = ap-southeast-1
+catalog.type = storage
+warehouse.path = s3://icebergdata/demo
+database.name=s1
+table.name=t1
\ No newline at end of file
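For reference, the test harness added below (`main.py`) joins the `[source]` section of this `config.ini` into a `CREATE SOURCE` statement. The result is roughly the following sketch; `main.py` emits the options verbatim as `key='value'` pairs, so the exact whitespace differs:

```sql
CREATE SOURCE iceberg_source
WITH (
    connector = 'iceberg',
    s3.endpoint = 'http://minio-0:9301',
    s3.access.key = 'hummockadmin',
    s3.secret.key = 'hummockadmin',
    s3.region = 'ap-southeast-1',
    catalog.type = 'storage',
    warehouse.path = 's3://icebergdata/demo',
    database.name = 's1',
    table.name = 't1'
);
```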
entrypoint: " + + /bin/sh -c ' + + set -e + + mkdir -p \"/data/icebergdata/demo\" + mkdir -p \"/data/hummock001\" + + /usr/bin/docker-entrypoint.sh \"$$0\" \"$$@\" + + '" + networks: + iceberg_net: + + etcd-0: + extends: + file: ../../../../docker/docker-compose.yml + service: etcd-0 + networks: + iceberg_net: + +volumes: + risingwave-standalone: + external: false + etcd-0: + external: false + minio-0: + external: false + +networks: + iceberg_net: \ No newline at end of file diff --git a/integration_tests/iceberg-source/docker/storage/spark-script/spark-connect-server.sh b/integration_tests/iceberg-source/docker/storage/spark-script/spark-connect-server.sh new file mode 100755 index 0000000000000..d37ed983fc236 --- /dev/null +++ b/integration_tests/iceberg-source/docker/storage/spark-script/spark-connect-server.sh @@ -0,0 +1,21 @@ +#!/bin/bash + +set -ex + +JARS=$(find /opt/spark/deps -type f -name "*.jar" | tr '\n' ':') + +/opt/spark/sbin/start-connect-server.sh \ + --master local[3] \ + --driver-class-path $JARS \ + --conf spark.driver.bindAddress=0.0.0.0 \ + --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \ + --conf spark.sql.catalog.demo=org.apache.iceberg.spark.SparkCatalog \ + --conf spark.sql.catalog.demo.type=hadoop \ + --conf spark.sql.catalog.demo.warehouse=s3a://icebergdata/demo \ + --conf spark.sql.catalog.demo.hadoop.fs.s3a.endpoint=http://minio-0:9301 \ + --conf spark.sql.catalog.demo.hadoop.fs.s3a.path.style.access=true \ + --conf spark.sql.catalog.demo.hadoop.fs.s3a.access.key=hummockadmin \ + --conf spark.sql.catalog.demo.hadoop.fs.s3a.secret.key=hummockadmin \ + --conf spark.sql.defaultCatalog=demo + +tail -f /opt/spark/logs/spark*.out \ No newline at end of file diff --git a/integration_tests/iceberg-source/python/main.py b/integration_tests/iceberg-source/python/main.py new file mode 100644 index 0000000000000..fd2ebcbe5f8c5 --- /dev/null +++ b/integration_tests/iceberg-source/python/main.py @@ -0,0 +1,119 @@ +import argparse +import subprocess +from pyspark.sql import SparkSession +import configparser +import psycopg2 +import time + + +def read_config(filename): + config = configparser.ConfigParser() + config.read(filename) + print({section: dict(config[section]) for section in config.sections()}) + return config + + +class DockerCompose(object): + def __init__(self, case_name: str): + self.case_name = case_name + + def case_dir(self): + return f"../docker/{self.case_name}" + + def get_ip(self, container_name): + return subprocess.check_output([ + "docker", "inspect", "-f", "{{range.NetworkSettings.Networks}}{{.IPAddress}}{{end}}", + container_name], cwd=self.case_dir()).decode("utf-8").rstrip() + + def __enter__(self): + subprocess.run(["docker-compose", "up", "-d", "--wait"], cwd=self.case_dir(), check=False) + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + subprocess.run(["docker", "compose", "down", "-v", "--remove-orphans"], cwd=self.case_dir(), + capture_output=True, + check=True) + + +def init_spark_table(docker): + spark_ip = docker.get_ip(f"{docker.case_name}-spark-1") + url = f"sc://{spark_ip}:15002" + print(f"Spark url is {url}") + spark = SparkSession.builder.remote(url).getOrCreate() + + init_table_sqls = [ + "CREATE SCHEMA IF NOT EXISTS s1", + "DROP TABLE IF EXISTS s1.t1", + """ + CREATE TABLE s1.t1 + ( + id bigint, + name string, + distance bigint + ) USING iceberg + TBLPROPERTIES ('format-version'='2'); + """, + """INSERT INTO s1.t1 VALUES (1, 'test', 100);""" + ] + + for sql in 
diff --git a/integration_tests/iceberg-source/python/main.py b/integration_tests/iceberg-source/python/main.py
new file mode 100644
index 0000000000000..fd2ebcbe5f8c5
--- /dev/null
+++ b/integration_tests/iceberg-source/python/main.py
@@ -0,0 +1,119 @@
+import argparse
+import subprocess
+from pyspark.sql import SparkSession
+import configparser
+import psycopg2
+import time
+
+
+def read_config(filename):
+    config = configparser.ConfigParser()
+    config.read(filename)
+    print({section: dict(config[section]) for section in config.sections()})
+    return config
+
+
+class DockerCompose(object):
+    def __init__(self, case_name: str):
+        self.case_name = case_name
+
+    def case_dir(self):
+        return f"../docker/{self.case_name}"
+
+    def get_ip(self, container_name):
+        return subprocess.check_output([
+            "docker", "inspect", "-f", "{{range.NetworkSettings.Networks}}{{.IPAddress}}{{end}}",
+            container_name], cwd=self.case_dir()).decode("utf-8").rstrip()
+
+    def __enter__(self):
+        subprocess.run(["docker-compose", "up", "-d", "--wait"], cwd=self.case_dir(), check=False)
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        subprocess.run(["docker", "compose", "down", "-v", "--remove-orphans"], cwd=self.case_dir(),
+                       capture_output=True,
+                       check=True)
+
+
+def init_spark_table(docker):
+    spark_ip = docker.get_ip(f"{docker.case_name}-spark-1")
+    url = f"sc://{spark_ip}:15002"
+    print(f"Spark url is {url}")
+    spark = SparkSession.builder.remote(url).getOrCreate()
+
+    init_table_sqls = [
+        "CREATE SCHEMA IF NOT EXISTS s1",
+        "DROP TABLE IF EXISTS s1.t1",
+        """
+        CREATE TABLE s1.t1
+        (
+            id bigint,
+            name string,
+            distance bigint
+        ) USING iceberg
+        TBLPROPERTIES ('format-version'='2');
+        """,
+        """INSERT INTO s1.t1 VALUES (1, 'test', 100);"""
+    ]
+
+    for sql in init_table_sqls:
+        print(f"Executing sql: {sql}")
+        spark.sql(sql)
+
+
+def init_risingwave_source(docker):
+    config = read_config(f"{docker.case_dir()}/config.ini")
+
+    source_config = config['source']
+    source_param = ",\n".join([f"{k}='{v}'" for k, v in source_config.items()])
+
+    sqls = [
+        f"""
+        CREATE SOURCE iceberg_source
+        WITH (
+            {source_param}
+        );
+        """
+    ]
+
+    rw_config = config['risingwave']
+    with psycopg2.connect(database=rw_config['db'], user=rw_config['user'], host=rw_config['host'],
+                          port=rw_config['port']) as conn:
+        with conn.cursor() as cursor:
+            for sql in sqls:
+                print(f"Executing sql {sql}")
+                cursor.execute(sql)
+
+def check_risingwave_iceberg_source(docker):
+    config = read_config(f"{docker.case_dir()}/config.ini")
+
+    sqls = [
+        "select count(*) from iceberg_source"
+    ]
+
+    rw_config = config['risingwave']
+    with psycopg2.connect(database=rw_config['db'], user=rw_config['user'], host=rw_config['host'],
+                          port=rw_config['port']) as conn:
+        with conn.cursor() as cursor:
+            for sql in sqls:
+                print(f"Executing sql {sql}")
+                # Execute the query and collect the result.
+                cursor.execute(sql)
+                result = cursor.fetchall()
+                assert result[0][0] == 1, f"Unexpected query result: {result[0][0]}, test failed"
+
+
+def run_case(case):
+    with DockerCompose(case) as docker:
+        init_spark_table(docker)
+        init_risingwave_source(docker)
+        print("Let risingwave run for a while")
+        time.sleep(5)
+        check_risingwave_iceberg_source(docker)
+
+
+if __name__ == "__main__":
+    case_names = ["storage"]
+    for case_name in case_names:
+        print(f"Running test case: {case_name}")
+        run_case(case_name)
diff --git a/integration_tests/iceberg-source/python/pyproject.toml b/integration_tests/iceberg-source/python/pyproject.toml
new file mode 100644
index 0000000000000..6a8e06b62215e
--- /dev/null
+++ b/integration_tests/iceberg-source/python/pyproject.toml
@@ -0,0 +1,16 @@
+[tool.poetry]
+name = "icelake-integration-tests"
+version = "0.0.9"
+description = ""
+authors = ["Renjie Liu"]
+readme = "README.md"
+packages = [{include = "icelake_integration_tests"}]
+
+[tool.poetry.dependencies]
+python = "^3.8"
+pyspark = { version = "3.4.1", extras = ["sql", "connect"] }
+psycopg2-binary = "^2.9"
+
+[build-system]
+requires = ["poetry-core"]
+build-backend = "poetry.core.masonry.api"
diff --git a/integration_tests/iceberg-source/run.sh b/integration_tests/iceberg-source/run.sh
new file mode 100755
index 0000000000000..d58973f6c7c8f
--- /dev/null
+++ b/integration_tests/iceberg-source/run.sh
@@ -0,0 +1,11 @@
+#!/usr/bin/env bash
+
+# Exits as soon as any line fails.
+set -euox pipefail
+
+"$HOME"/.local/bin/poetry --version
+cd python
+# Don't remove the `--quiet` option since poetry has a bug when printing output, see
+# https://github.com/python-poetry/poetry/issues/3412
+"$HOME"/.local/bin/poetry update --quiet
+"$HOME"/.local/bin/poetry run python main.py
\ No newline at end of file
diff --git a/integration_tests/scripts/check_data.py b/integration_tests/scripts/check_data.py
index 62b887e1b724e..d165bed4ea6af 100644
--- a/integration_tests/scripts/check_data.py
+++ b/integration_tests/scripts/check_data.py
@@ -109,7 +109,7 @@ def test_check(demo: str, upstream: str, need_data_check=True, need_sink_check=False):
 
 demo = sys.argv[1]
 upstream = sys.argv[2]  # mysql, postgres, etc. see scripts/integration_tests.sh
-if demo in ['docker', 'iceberg-cdc', 'iceberg-sink']:
+if demo in ['docker', 'iceberg-cdc', 'iceberg-sink', 'iceberg-source']:
     print('Skip for running test for `%s`' % demo)
     sys.exit(0)
 
diff --git a/integration_tests/scripts/clean_demos.py b/integration_tests/scripts/clean_demos.py
index 2a884832395fa..322c97d4a3591 100644
--- a/integration_tests/scripts/clean_demos.py
+++ b/integration_tests/scripts/clean_demos.py
@@ -25,7 +25,7 @@ def clean_demo(demo: str):
 
 args = arg_parser.parse_args()
 demo = args.case
-if demo in ['iceberg-sink']:
+if demo in ['iceberg-sink', 'iceberg-source']:
     print('Skip for running test for `%s`' % demo)
     sys.exit(0)
 
diff --git a/integration_tests/scripts/run_demos.py b/integration_tests/scripts/run_demos.py
index d6fd4ecbed9c3..b18e016c5a116 100644
--- a/integration_tests/scripts/run_demos.py
+++ b/integration_tests/scripts/run_demos.py
@@ -66,6 +66,14 @@ def iceberg_sink_demo():
     print("Running demo: iceberg-sink2")
     subprocess.run(["bash", "./run.sh"], cwd=demo_dir, check=True)
 
+def iceberg_source_demo():
+    demo = "iceberg-source"
+    file_dir = dirname(abspath(__file__))
+    project_dir = dirname(file_dir)
+    demo_dir = os.path.join(project_dir, demo)
+    print("Running demo: iceberg-source")
+    subprocess.run(["bash", "./run.sh"], cwd=demo_dir, check=True)
+
 arg_parser = argparse.ArgumentParser(description="Run the demo")
 
 arg_parser.add_argument(
@@ -86,5 +94,7 @@ def iceberg_sink_demo():
     iceberg_cdc_demo()
 elif args.case == "iceberg-sink":
     iceberg_sink_demo()
+elif args.case == "iceberg-source":
+    iceberg_source_demo()
 else:
     run_demo(args.case, args.format)
diff --git a/src/batch/src/executor/iceberg_scan.rs b/src/batch/src/executor/iceberg_scan.rs
index 9c24e554c8e1f..e20b1aaedb8fe 100644
--- a/src/batch/src/executor/iceberg_scan.rs
+++ b/src/batch/src/executor/iceberg_scan.rs
@@ -41,7 +41,7 @@ use crate::executor::{DataChunk, Executor};
 /// async fn test_iceberg_scan() {
 ///     let iceberg_scan_executor = IcebergScanExecutor::new(
 ///         IcebergConfig {
-///             database_name: "demo_db".into(),
+///             database_name: Some("demo_db".into()),
 ///             table_name: "demo_table".into(),
 ///             catalog_type: Some("storage".into()),
 ///             path: "s3a://hummock001/".into(),
+ #[serde(rename = "catalog.name")] + pub catalog_name: Option, #[serde(rename = "database.name")] - pub database_name: String, + pub database_name: Option, #[serde(rename = "table.name")] pub table_name: String, @@ -58,14 +62,15 @@ pub struct IcebergProperties { impl IcebergProperties { pub fn to_iceberg_config(&self) -> IcebergConfig { IcebergConfig { - database_name: Some(self.database_name.clone()), + catalog_name: self.catalog_name.clone(), + database_name: self.database_name.clone(), table_name: self.table_name.clone(), - catalog_type: Some(self.catalog_type.clone()), + catalog_type: self.catalog_type.clone(), path: self.warehouse_path.clone(), endpoint: Some(self.endpoint.clone()), access_key: self.s3_access.clone(), secret_key: self.s3_secret.clone(), - region: Some(self.region_name.clone()), + region: self.region.clone(), ..Default::default() } } diff --git a/src/connector/with_options_source.yaml b/src/connector/with_options_source.yaml index dec3cf6a8941a..a36557c0d7ee1 100644 --- a/src/connector/with_options_source.yaml +++ b/src/connector/with_options_source.yaml @@ -37,10 +37,10 @@ IcebergProperties: fields: - name: catalog.type field_type: String - required: true + required: false - name: s3.region field_type: String - required: true + required: false - name: s3.endpoint field_type: String required: false @@ -56,9 +56,12 @@ IcebergProperties: - name: warehouse.path field_type: String required: true + - name: catalog.name + field_type: String + required: false - name: database.name field_type: String - required: true + required: false - name: table.name field_type: String required: true