feat(batch): support jdbc catalog for iceberg source (#15551)
chenzl25 authored Mar 8, 2024
1 parent 3276e58 commit b4d8d16
Showing 6 changed files with 161 additions and 1 deletion.
20 changes: 20 additions & 0 deletions integration_tests/iceberg-source/docker/jdbc/config.ini
@@ -0,0 +1,20 @@
[risingwave]
db=dev
user=root
host=127.0.0.1
port=4566

[source]
connector = iceberg
warehouse.path = s3://icebergdata/demo
s3.endpoint=http://minio-0:9301
s3.access.key = hummockadmin
s3.secret.key = hummockadmin
s3.region = ap-southeast-1
catalog.name = demo
catalog.type = jdbc
catalog.uri = jdbc:postgresql://postgres:5432/iceberg
catalog.jdbc.user = admin
catalog.jdbc.password = 123456
database.name=s1
table.name=t1
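
The `[source]` keys above mirror the `WITH` options of a RisingWave `CREATE SOURCE` statement. As a rough illustration of how a test harness might turn this ini file into DDL (the `source_ddl` helper and the `iceberg_source` name are hypothetical, not part of this PR):

import configparser

def source_ddl(ini_path: str, source_name: str = "iceberg_source") -> str:
    """Render the [source] section of a config.ini as CREATE SOURCE DDL."""
    cfg = configparser.ConfigParser()
    cfg.read(ini_path)
    # Each key/value pair becomes one WITH option, quoted as a SQL literal.
    options = ",\n    ".join(
        f"{key} = '{value}'" for key, value in cfg["source"].items()
    )
    return f"CREATE SOURCE IF NOT EXISTS {source_name} WITH (\n    {options}\n);"

if __name__ == "__main__":
    print(source_ddl("integration_tests/iceberg-source/docker/jdbc/config.ini"))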
96 changes: 96 additions & 0 deletions integration_tests/iceberg-source/docker/jdbc/docker-compose.yml
@@ -0,0 +1,96 @@
version: '3.8'

services:
  postgres:
    image: postgres:16.1
    environment:
      POSTGRES_USER: admin
      POSTGRES_PASSWORD: 123456
      POSTGRES_DB: iceberg
    expose:
      - 5432
    ports:
      - "5432:5432"
    networks:
      iceberg_net:

  spark:
    depends_on:
      - minio-0
      - postgres
    image: ghcr.io/icelake-io/icelake-spark:0.1
    environment:
      - AWS_ACCESS_KEY_ID=hummockadmin
      - AWS_SECRET_ACCESS_KEY=hummockadmin
      - AWS_REGION=us-east-1
      - SPARK_HOME=/opt/spark
      - PYSPARK_PYTHON=/usr/bin/python3.9
      - PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/opt/spark/bin:/opt/spark/sbin
    user: root
    networks:
      iceberg_net:
    links:
      - minio-0:icebergdata.minio-0
    expose:
      - 15002
    healthcheck:
      test: netstat -ltn | grep -c 15002
      interval: 1s
      retries: 1200
    volumes:
      - ./spark-script:/spark-script
    entrypoint: [ "/spark-script/spark-connect-server.sh" ]

  risingwave-standalone:
    extends:
      file: ../../../../docker/docker-compose.yml
      service: risingwave-standalone
    healthcheck:
      test:
        - CMD-SHELL
        - bash -c 'printf \"GET / HTTP/1.1\n\n\" > /dev/tcp/127.0.0.1/4566; exit $$?;'
      interval: 1s
      timeout: 30s
    environment:
      - AWS_REGION=us-east-1
    links:
      - minio-0:icebergdata.minio-0
    networks:
      iceberg_net:

  minio-0:
    extends:
      file: ../../../../docker/docker-compose.yml
      service: minio-0
    entrypoint: "
      /bin/sh -c '
      set -e
      mkdir -p \"/data/icebergdata/demo\"
      mkdir -p \"/data/hummock001\"
      /usr/bin/docker-entrypoint.sh \"$$0\" \"$$@\"
      '"
    networks:
      iceberg_net:

  etcd-0:
    extends:
      file: ../../../../docker/docker-compose.yml
      service: etcd-0
    networks:
      iceberg_net:

volumes:
  risingwave-standalone:
    external: false
  etcd-0:
    external: false
  minio-0:
    external: false

networks:
  iceberg_net:
    name: iceberg
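
Because the JDBC catalog is plain Postgres state, its contents can be inspected directly once Spark has created tables. A minimal sketch, assuming Iceberg's default JdbcCatalog layout (a table named iceberg_tables) and the credentials from the compose file; requires psycopg2 and the published 5432 port:

import psycopg2

# Connection parameters match the postgres service above.
conn = psycopg2.connect(
    host="localhost", port=5432, dbname="iceberg",
    user="admin", password="123456",
)
with conn, conn.cursor() as cur:
    # JdbcCatalog stores one row per Iceberg table, pointing at the
    # current metadata JSON in the warehouse (here, MinIO via s3://).
    cur.execute(
        "SELECT catalog_name, table_namespace, table_name, metadata_location "
        "FROM iceberg_tables"
    )
    for row in cur.fetchall():
        print(row)
conn.close()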
25 changes: 25 additions & 0 deletions integration_tests/iceberg-source/docker/jdbc/spark-script/spark-connect-server.sh
@@ -0,0 +1,25 @@
#!/bin/bash

set -ex

JARS=$(find /opt/spark/deps -type f -name "*.jar" | tr '\n' ':')

/opt/spark/sbin/start-connect-server.sh \
--master local[3] \
--driver-class-path $JARS \
--conf spark.driver.bindAddress=0.0.0.0 \
--conf spark.sql.catalog.demo=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
--conf spark.sql.catalog.demo.catalog-impl=org.apache.iceberg.jdbc.JdbcCatalog \
--conf spark.sql.catalog.demo.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
--conf spark.sql.catalog.demo.warehouse=s3://icebergdata/demo \
--conf spark.sql.catalog.demo.uri=jdbc:postgresql://postgres:5432/iceberg \
--conf spark.sql.catalog.demo.jdbc.user=admin \
--conf spark.sql.catalog.demo.jdbc.password=123456 \
--conf spark.sql.catalog.demo.s3.endpoint=http://minio-0:9301 \
--conf spark.sql.catalog.demo.s3.path.style.access=true \
--conf spark.sql.catalog.demo.s3.access.key=hummockadmin \
--conf spark.sql.catalog.demo.s3.secret.key=hummockadmin \
--conf spark.sql.defaultCatalog=demo

tail -f /opt/spark/logs/spark*.out
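
The script starts a Spark Connect server on the default port 15002, so a test driver can create and populate Iceberg tables without a local Spark install. A sketch of such a client, which must run on the iceberg network since the port is only exposed internally (pyspark >= 3.4; the table schema here is illustrative):

from pyspark.sql import SparkSession

# Attach to the Spark Connect server; "spark" is the compose service name.
spark = SparkSession.builder.remote("sc://spark:15002").getOrCreate()

# "demo" is the default catalog (spark.sql.defaultCatalog above), so the
# unqualified s1.t1 matches database.name/table.name in config.ini.
spark.sql("CREATE NAMESPACE IF NOT EXISTS s1")
spark.sql("CREATE TABLE IF NOT EXISTS s1.t1 (id BIGINT, name STRING) USING iceberg")
spark.sql("INSERT INTO s1.t1 VALUES (1, 'a'), (2, 'b')")
spark.sql("SELECT * FROM s1.t1").show()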
2 changes: 1 addition & 1 deletion integration_tests/iceberg-source/python/main.py
@@ -113,7 +113,7 @@ def run_case(case):
 
 
 if __name__ == "__main__":
-    case_names = ["hive", "rest", "storage"]
+    case_names = ["jdbc", "hive", "rest", "storage"]
     for case_name in case_names:
         print(f"Running test case: {case_name}")
         run_case(case_name)
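
For local debugging, a single case can presumably be run on its own, since run_case is a module-level function:

# Hypothetical direct invocation; mirrors the loop above.
from main import run_case

run_case("jdbc")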
13 changes: 13 additions & 0 deletions src/connector/src/source/iceberg/mod.rs
@@ -56,13 +56,25 @@ pub struct IcebergProperties {
     pub database_name: Option<String>,
     #[serde(rename = "table.name")]
     pub table_name: String,
+    // For jdbc catalog
+    #[serde(rename = "catalog.jdbc.user")]
+    pub jdbc_user: Option<String>,
+    #[serde(rename = "catalog.jdbc.password")]
+    pub jdbc_password: Option<String>,
 
     #[serde(flatten)]
     pub unknown_fields: HashMap<String, String>,
 }
 
 impl IcebergProperties {
     pub fn to_iceberg_config(&self) -> IcebergConfig {
+        let mut java_catalog_props = HashMap::new();
+        if let Some(jdbc_user) = self.jdbc_user.clone() {
+            java_catalog_props.insert("jdbc.user".to_string(), jdbc_user);
+        }
+        if let Some(jdbc_password) = self.jdbc_password.clone() {
+            java_catalog_props.insert("jdbc.password".to_string(), jdbc_password);
+        }
         IcebergConfig {
             catalog_name: self.catalog_name.clone(),
             database_name: self.database_name.clone(),
@@ -74,6 +86,7 @@ impl IcebergProperties {
             access_key: self.s3_access.clone(),
             secret_key: self.s3_secret.clone(),
             region: self.region.clone(),
+            java_catalog_props,
             ..Default::default()
         }
     }
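
In effect, the two new WITH options are forwarded to the Java catalog with their catalog. prefix stripped, which matches the jdbc.-prefixed property form Iceberg's JdbcCatalog accepts. The mapping is small enough to restate as a sketch (illustrative Python, not the shipped Rust):

def to_java_catalog_props(with_options: dict[str, str]) -> dict[str, str]:
    """Mirror to_iceberg_config: catalog.jdbc.* options become jdbc.* props."""
    props = {}
    for key in ("catalog.jdbc.user", "catalog.jdbc.password"):
        if key in with_options:
            props[key.removeprefix("catalog.")] = with_options[key]
    return props

assert to_java_catalog_props(
    {"catalog.jdbc.user": "admin", "catalog.jdbc.password": "123456"}
) == {"jdbc.user": "admin", "jdbc.password": "123456"}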
6 changes: 6 additions & 0 deletions src/connector/with_options_source.yaml
@@ -68,6 +68,12 @@ IcebergProperties:
   - name: table.name
     field_type: String
     required: true
+  - name: catalog.jdbc.user
+    field_type: String
+    required: false
+  - name: catalog.jdbc.password
+    field_type: String
+    required: false
 KafkaProperties:
   fields:
   - name: bytes.per.second
