From b4d8d1621bb9d60d8d173d8f0550c11cd5cf75e2 Mon Sep 17 00:00:00 2001
From: Dylan
Date: Fri, 8 Mar 2024 20:33:13 +0800
Subject: [PATCH] feat(batch): support jdbc catalog for iceberg source (#15551)

---
 .../iceberg-source/docker/jdbc/config.ini     | 20 ++++
 .../docker/jdbc/docker-compose.yml            | 96 +++++++++++++++++++
 .../jdbc/spark-script/spark-connect-server.sh | 25 +++++
 .../iceberg-source/python/main.py             |  2 +-
 src/connector/src/source/iceberg/mod.rs       | 13 +++
 src/connector/with_options_source.yaml        |  6 ++
 6 files changed, 161 insertions(+), 1 deletion(-)
 create mode 100644 integration_tests/iceberg-source/docker/jdbc/config.ini
 create mode 100644 integration_tests/iceberg-source/docker/jdbc/docker-compose.yml
 create mode 100755 integration_tests/iceberg-source/docker/jdbc/spark-script/spark-connect-server.sh

diff --git a/integration_tests/iceberg-source/docker/jdbc/config.ini b/integration_tests/iceberg-source/docker/jdbc/config.ini
new file mode 100644
index 0000000000000..afc13cf6b545e
--- /dev/null
+++ b/integration_tests/iceberg-source/docker/jdbc/config.ini
@@ -0,0 +1,20 @@
+[risingwave]
+db=dev
+user=root
+host=127.0.0.1
+port=4566
+
+[source]
+connector = iceberg
+warehouse.path = s3://icebergdata/demo
+s3.endpoint=http://minio-0:9301
+s3.access.key = hummockadmin
+s3.secret.key = hummockadmin
+s3.region = ap-southeast-1
+catalog.name = demo
+catalog.type = jdbc
+catalog.uri = jdbc:postgresql://postgres:5432/iceberg
+catalog.jdbc.user = admin
+catalog.jdbc.password = 123456
+database.name=s1
+table.name=t1
\ No newline at end of file
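Every key under [source] maps one-to-one onto a WITH option of the Iceberg source; the test driver in python/main.py reads a per-case config.ini like this one and assembles the corresponding CREATE SOURCE statement. A minimal sketch of that mapping, assuming Python's standard configparser (the source name iceberg_t1 and the exact statement shape are illustrative, not taken from the patch):

    import configparser

    # The [source] section of the config.ini above holds the connector
    # options, including the new catalog.jdbc.* credentials.
    config = configparser.ConfigParser()
    config.read("config.ini")

    with_opts = ",\n    ".join(
        f"{key} = '{value}'" for key, value in config["source"].items()
    )
    print(f"CREATE SOURCE iceberg_t1 WITH (\n    {with_opts}\n);")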
diff --git a/integration_tests/iceberg-source/docker/jdbc/docker-compose.yml b/integration_tests/iceberg-source/docker/jdbc/docker-compose.yml
new file mode 100644
index 0000000000000..3f2bb75479563
--- /dev/null
+++ b/integration_tests/iceberg-source/docker/jdbc/docker-compose.yml
@@ -0,0 +1,96 @@
+version: '3.8'
+
+services:
+  postgres:
+    image: postgres:16.1
+    environment:
+      POSTGRES_USER: admin
+      POSTGRES_PASSWORD: 123456
+      POSTGRES_DB: iceberg
+    expose:
+      - 5432
+    ports:
+      - "5432:5432"
+    networks:
+      iceberg_net:
+
+  spark:
+    depends_on:
+      - minio-0
+      - postgres
+    image: ghcr.io/icelake-io/icelake-spark:0.1
+    environment:
+      - AWS_ACCESS_KEY_ID=hummockadmin
+      - AWS_SECRET_ACCESS_KEY=hummockadmin
+      - AWS_REGION=us-east-1
+      - SPARK_HOME=/opt/spark
+      - PYSPARK_PYTHON=/usr/bin/python3.9
+      - PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/opt/spark/bin:/opt/spark/sbin
+    user: root
+    networks:
+      iceberg_net:
+    links:
+      - minio-0:icebergdata.minio-0
+    expose:
+      - 15002
+    healthcheck:
+      test: netstat -ltn | grep -c 15002
+      interval: 1s
+      retries: 1200
+    volumes:
+      - ./spark-script:/spark-script
+    entrypoint: [ "/spark-script/spark-connect-server.sh" ]
+
+  risingwave-standalone:
+    extends:
+      file: ../../../../docker/docker-compose.yml
+      service: risingwave-standalone
+    healthcheck:
+      test:
+        - CMD-SHELL
+        - bash -c 'printf \"GET / HTTP/1.1\n\n\" > /dev/tcp/127.0.0.1/4566; exit $$?;'
+      interval: 1s
+      timeout: 30s
+    environment:
+      - AWS_REGION=us-east-1
+    links:
+      - minio-0:icebergdata.minio-0
+    networks:
+      iceberg_net:
+
+  minio-0:
+    extends:
+      file: ../../../../docker/docker-compose.yml
+      service: minio-0
+    entrypoint: "
+    /bin/sh -c '
+
+    set -e
+
+    mkdir -p \"/data/icebergdata/demo\"
+    mkdir -p \"/data/hummock001\"
+
+    /usr/bin/docker-entrypoint.sh \"$$0\" \"$$@\"
+
+    '"
+    networks:
+      iceberg_net:
+
+  etcd-0:
+    extends:
+      file: ../../../../docker/docker-compose.yml
+      service: etcd-0
+    networks:
+      iceberg_net:
+
+volumes:
+  risingwave-standalone:
+    external: false
+  etcd-0:
+    external: false
+  minio-0:
+    external: false
+
+networks:
+  iceberg_net:
+    name: iceberg
\ No newline at end of file
diff --git a/integration_tests/iceberg-source/docker/jdbc/spark-script/spark-connect-server.sh b/integration_tests/iceberg-source/docker/jdbc/spark-script/spark-connect-server.sh
new file mode 100755
index 0000000000000..8c3f752dc6414
--- /dev/null
+++ b/integration_tests/iceberg-source/docker/jdbc/spark-script/spark-connect-server.sh
@@ -0,0 +1,25 @@
+#!/bin/bash
+
+set -ex
+
+JARS=$(find /opt/spark/deps -type f -name "*.jar" | tr '\n' ':')
+
+/opt/spark/sbin/start-connect-server.sh \
+  --master local[3] \
+  --driver-class-path $JARS \
+  --conf spark.driver.bindAddress=0.0.0.0 \
+  --conf spark.sql.catalog.demo=org.apache.iceberg.spark.SparkCatalog \
+  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
+  --conf spark.sql.catalog.demo.catalog-impl=org.apache.iceberg.jdbc.JdbcCatalog \
+  --conf spark.sql.catalog.demo.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
+  --conf spark.sql.catalog.demo.warehouse=s3://icebergdata/demo \
+  --conf spark.sql.catalog.demo.uri=jdbc:postgresql://postgres:5432/iceberg \
+  --conf spark.sql.catalog.demo.jdbc.user=admin \
+  --conf spark.sql.catalog.demo.jdbc.password=123456 \
+  --conf spark.sql.catalog.demo.s3.endpoint=http://minio-0:9301 \
+  --conf spark.sql.catalog.demo.s3.path.style.access=true \
+  --conf spark.sql.catalog.demo.s3.access.key=hummockadmin \
+  --conf spark.sql.catalog.demo.s3.secret.key=hummockadmin \
+  --conf spark.sql.defaultCatalog=demo
+
+tail -f /opt/spark/logs/spark*.out
\ No newline at end of file
diff --git a/integration_tests/iceberg-source/python/main.py b/integration_tests/iceberg-source/python/main.py
index bca0c828df1d5..ebfd6a6c468f5 100644
--- a/integration_tests/iceberg-source/python/main.py
+++ b/integration_tests/iceberg-source/python/main.py
@@ -113,7 +113,7 @@ def run_case(case):
 
 
 if __name__ == "__main__":
-    case_names = ["hive", "rest", "storage"]
+    case_names = ["jdbc", "hive", "rest", "storage"]
     for case_name in case_names:
         print(f"Running test case: {case_name}")
         run_case(case_name)
diff --git a/src/connector/src/source/iceberg/mod.rs b/src/connector/src/source/iceberg/mod.rs
index cee743a827c16..8b7f0e696e95d 100644
--- a/src/connector/src/source/iceberg/mod.rs
+++ b/src/connector/src/source/iceberg/mod.rs
@@ -56,6 +56,11 @@ pub struct IcebergProperties {
     pub database_name: Option<String>,
     #[serde(rename = "table.name")]
     pub table_name: String,
+    // For jdbc catalog
+    #[serde(rename = "catalog.jdbc.user")]
+    pub jdbc_user: Option<String>,
+    #[serde(rename = "catalog.jdbc.password")]
+    pub jdbc_password: Option<String>,
 
     #[serde(flatten)]
     pub unknown_fields: HashMap<String, String>,
@@ -63,6 +68,13 @@ pub struct IcebergProperties {
 
 impl IcebergProperties {
     pub fn to_iceberg_config(&self) -> IcebergConfig {
+        let mut java_catalog_props = HashMap::new();
+        if let Some(jdbc_user) = self.jdbc_user.clone() {
+            java_catalog_props.insert("jdbc.user".to_string(), jdbc_user);
+        }
+        if let Some(jdbc_password) = self.jdbc_password.clone() {
+            java_catalog_props.insert("jdbc.password".to_string(), jdbc_password);
+        }
         IcebergConfig {
             catalog_name: self.catalog_name.clone(),
             database_name: self.database_name.clone(),
@@ -74,6 +86,7 @@ impl IcebergProperties {
             access_key: self.s3_access.clone(),
             secret_key: self.s3_secret.clone(),
             region: self.region.clone(),
+            java_catalog_props,
             ..Default::default()
         }
     }
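The two new fields are deserialized from the catalog.jdbc.user and catalog.jdbc.password WITH options and handed to the Java catalog implementation with the leading catalog. prefix stripped, matching the jdbc.user / jdbc.password keys that the Spark script above sets via spark.sql.catalog.demo.jdbc.*. A Python sketch of the renaming done by to_iceberg_config() (function name illustrative; the Rust code forwards exactly these two keys):

    def java_catalog_props(jdbc_user=None, jdbc_password=None):
        # Only options that were actually set are forwarded, renamed
        # from catalog.jdbc.* to jdbc.*.
        props = {}
        if jdbc_user is not None:
            props["jdbc.user"] = jdbc_user
        if jdbc_password is not None:
            props["jdbc.password"] = jdbc_password
        return props

    assert java_catalog_props("admin", "123456") == {
        "jdbc.user": "admin",
        "jdbc.password": "123456",
    }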
diff --git a/src/connector/with_options_source.yaml b/src/connector/with_options_source.yaml
index abce71deba567..29055d068294c 100644
--- a/src/connector/with_options_source.yaml
+++ b/src/connector/with_options_source.yaml
@@ -68,6 +68,12 @@ IcebergProperties:
   - name: table.name
     field_type: String
     required: true
+  - name: catalog.jdbc.user
+    field_type: String
+    required: false
+  - name: catalog.jdbc.password
+    field_type: String
+    required: false
 KafkaProperties:
   fields:
   - name: bytes.per.second
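With the compose stack up and the table created through Spark, the new case can be smoke-tested against RisingWave's Postgres-compatible endpoint using the values from the [risingwave] section of config.ini (psycopg2 and the iceberg_t1 source name are assumptions about the test environment, not part of the patch):

    import psycopg2

    # Connect to risingwave-standalone (db=dev, user=root, port=4566).
    conn = psycopg2.connect(host="127.0.0.1", port=4566, dbname="dev", user="root")
    conn.autocommit = True
    with conn.cursor() as cur:
        # Batch-scan the Iceberg table registered in the JDBC catalog.
        cur.execute("SELECT count(*) FROM iceberg_t1")
        print(cur.fetchone())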