From 0595aa1d901b2f99811ba1e7e40161b395ddd6a4 Mon Sep 17 00:00:00 2001 From: ka-weihe Date: Mon, 19 Aug 2024 18:44:08 +0200 Subject: [PATCH] feat(connector): add support for path-style access in Iceberg sink connector (#17747) Co-authored-by: ka-weihe Co-authored-by: lmatz --- src/connector/src/sink/iceberg/mod.rs | 22 +++++++++++++++++++++- src/connector/with_options_sink.yaml | 4 ++++ 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/src/connector/src/sink/iceberg/mod.rs b/src/connector/src/sink/iceberg/mod.rs index 540fea13b6c03..2274b5805d81d 100644 --- a/src/connector/src/sink/iceberg/mod.rs +++ b/src/connector/src/sink/iceberg/mod.rs @@ -123,6 +123,13 @@ pub struct IcebergConfig { #[serde(rename = "s3.secret.key")] pub secret_key: String, + #[serde( + rename = "s3.path.style.access", + default, + deserialize_with = "deserialize_bool_from_string" + )] + pub path_style_access: bool, + #[serde( rename = "primary_key", default, @@ -270,6 +277,10 @@ impl IcebergConfig { "iceberg.table.io.secret_access_key".to_string(), self.secret_key.clone().to_string(), ); + iceberg_configs.insert( + "iceberg.table.io.enable_virtual_host_style".to_string(), + (!self.path_style_access).to_string(), + ); let (bucket, root) = { let url = Url::parse(&self.path).map_err(|e| SinkError::Iceberg(anyhow!(e)))?; @@ -409,7 +420,10 @@ impl IcebergConfig { "s3.secret-access-key".to_string(), self.secret_key.clone().to_string(), ); - + java_catalog_configs.insert( + "s3.path-style-access".to_string(), + self.path_style_access.to_string(), + ); if matches!(self.catalog_type.as_deref(), Some("glue")) { java_catalog_configs.insert( "client.credentials-provider".to_string(), @@ -1286,6 +1300,7 @@ mod test { ("s3.endpoint", "http://127.0.0.1:9301"), ("s3.access.key", "hummockadmin"), ("s3.secret.key", "hummockadmin"), + ("s3.path.style.access", "true"), ("s3.region", "us-east-1"), ("catalog.type", "jdbc"), ("catalog.name", "demo"), @@ -1315,6 +1330,7 @@ mod test { endpoint: Some("http://127.0.0.1:9301".to_string()), access_key: "hummockadmin".to_string(), secret_key: "hummockadmin".to_string(), + path_style_access: true, primary_key: Some(vec!["v1".to_string()]), java_catalog_props: [("jdbc.user", "admin"), ("jdbc.password", "123456")] .into_iter() @@ -1350,6 +1366,7 @@ mod test { ("s3.access.key", "hummockadmin"), ("s3.secret.key", "hummockadmin"), ("s3.region", "us-east-1"), + ("s3.path.style.access", "true"), ("catalog.name", "demo"), ("catalog.type", "storage"), ("warehouse.path", "s3://icebergdata/demo"), @@ -1374,6 +1391,7 @@ mod test { ("s3.access.key", "hummockadmin"), ("s3.secret.key", "hummockadmin"), ("s3.region", "us-east-1"), + ("s3.path.style.access", "true"), ("catalog.name", "demo"), ("catalog.type", "rest"), ("catalog.uri", "http://192.168.167.4:8181"), @@ -1399,6 +1417,7 @@ mod test { ("s3.access.key", "hummockadmin"), ("s3.secret.key", "hummockadmin"), ("s3.region", "us-east-1"), + ("s3.path.style.access", "true"), ("catalog.name", "demo"), ("catalog.type", "jdbc"), ("catalog.uri", "jdbc:postgresql://localhost:5432/iceberg"), @@ -1426,6 +1445,7 @@ mod test { ("s3.access.key", "hummockadmin"), ("s3.secret.key", "hummockadmin"), ("s3.region", "us-east-1"), + ("s3.path.style.access", "true"), ("catalog.name", "demo"), ("catalog.type", "hive"), ("catalog.uri", "thrift://localhost:9083"), diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml index 653acaadaaaf1..36fa2559ab1af 100644 --- a/src/connector/with_options_sink.yaml +++ b/src/connector/with_options_sink.yaml @@ -272,6 +272,10 @@ IcebergConfig: - name: s3.secret.key field_type: String required: true + - name: s3.path.style.access + field_type: bool + required: false + default: Default::default - name: primary_key field_type: Vec required: false