diff --git a/e2e_test/iceberg/test_case/iceberg_select_empty_table.slt b/e2e_test/iceberg/test_case/iceberg_select_empty_table.slt index 832a7b781f7fb..ddfc07220f2b3 100644 --- a/e2e_test/iceberg/test_case/iceberg_select_empty_table.slt +++ b/e2e_test/iceberg/test_case/iceberg_select_empty_table.slt @@ -36,6 +36,7 @@ WITH ( s3.region = 'us-east-1', s3.access.key = 'hummockadmin', s3.secret.key = 'hummockadmin', + s3.path.style.access = 'true', catalog.type = 'storage', warehouse.path = 's3a://icebergdata/demo', database.name = 'demo_db', diff --git a/src/connector/src/connector_common/iceberg/mod.rs b/src/connector/src/connector_common/iceberg/mod.rs index 37d4e5e6f5a08..bf96e474eee80 100644 --- a/src/connector/src/connector_common/iceberg/mod.rs +++ b/src/connector/src/connector_common/iceberg/mod.rs @@ -15,7 +15,6 @@ mod jni_catalog; mod mock_catalog; mod storage_catalog; - use std::collections::HashMap; use std::sync::Arc; @@ -30,6 +29,7 @@ use serde_with::serde_as; use url::Url; use with_options::WithOptions; +use crate::deserialize_optional_bool_from_string; use crate::error::ConnectorResult; #[serde_as] @@ -62,6 +62,13 @@ pub struct IcebergCommon { /// Full name of table, must include schema name. #[serde(rename = "table.name")] pub table_name: String, + + #[serde( + rename = "s3.path.style.access", + default, + deserialize_with = "deserialize_optional_bool_from_string" + )] + pub path_style_access: Option, } impl IcebergCommon { @@ -79,7 +86,6 @@ impl IcebergCommon { /// For both V1 and V2. 
fn build_jni_catalog_configs( &self, - path_style_access: &Option, java_catalog_props: &HashMap, ) -> ConnectorResult<(BaseCatalogConfig, HashMap)> { let mut iceberg_configs = HashMap::new(); @@ -193,7 +199,7 @@ impl IcebergCommon { self.secret_key.clone().to_string(), ); - if let Some(path_style_access) = path_style_access { + if let Some(path_style_access) = self.path_style_access { java_catalog_configs.insert( "s3.path-style-access".to_string(), path_style_access.to_string(), @@ -247,10 +253,7 @@ mod v1 { Ok(ret.context("Failed to create table identifier")?) } - fn build_iceberg_configs( - &self, - path_style_access: &Option, - ) -> ConnectorResult> { + fn build_iceberg_configs(&self) -> ConnectorResult> { let mut iceberg_configs = HashMap::new(); let catalog_type = self.catalog_type().to_string(); @@ -303,7 +306,7 @@ mod v1 { "iceberg.table.io.secret_access_key".to_string(), self.secret_key.clone().to_string(), ); - if let Some(path_style_access) = path_style_access { + if let Some(path_style_access) = self.path_style_access { iceberg_configs.insert( "iceberg.table.io.enable_virtual_host_style".to_string(), (!path_style_access).to_string(), @@ -345,12 +348,11 @@ mod v1 { /// TODO: remove the arguments and put them into `IcebergCommon`. Currently the handling in source and sink are different, so pass them separately to be safer. 
pub async fn create_catalog( &self, - path_style_access: &Option, java_catalog_props: &HashMap, ) -> ConnectorResult { match self.catalog_type() { "storage" | "rest" => { - let iceberg_configs = self.build_iceberg_configs(path_style_access)?; + let iceberg_configs = self.build_iceberg_configs()?; let catalog = load_catalog(&iceberg_configs).await?; Ok(catalog) } @@ -361,7 +363,7 @@ mod v1 { { // Create java catalog let (base_catalog_config, java_catalog_props) = - self.build_jni_catalog_configs(path_style_access, java_catalog_props)?; + self.build_jni_catalog_configs(java_catalog_props)?; let catalog_impl = match catalog_type { "hive" => "org.apache.iceberg.hive.HiveCatalog", "jdbc" => "org.apache.iceberg.jdbc.JdbcCatalog", @@ -389,11 +391,10 @@ mod v1 { /// TODO: remove the arguments and put them into `IcebergCommon`. Currently the handling in source and sink are different, so pass them separately to be safer. pub async fn load_table( &self, - path_style_access: &Option, java_catalog_props: &HashMap, ) -> ConnectorResult { let catalog = self - .create_catalog(path_style_access, java_catalog_props) + .create_catalog(java_catalog_props) .await .context("Unable to load iceberg catalog")?; @@ -429,7 +430,6 @@ mod v2 { /// TODO: remove the arguments and put them into `IcebergCommon`. Currently the handling in source and sink are different, so pass them separately to be safer. 
pub async fn create_catalog_v2( &self, - path_style_access: &Option, java_catalog_props: &HashMap, ) -> ConnectorResult> { match self.catalog_type() { @@ -515,7 +515,7 @@ mod v2 { catalog_type if catalog_type == "hive" || catalog_type == "jdbc" => { // Create java catalog let (base_catalog_config, java_catalog_props) = - self.build_jni_catalog_configs(path_style_access, java_catalog_props)?; + self.build_jni_catalog_configs(java_catalog_props)?; let catalog_impl = match catalog_type { "hive" => "org.apache.iceberg.hive.HiveCatalog", "jdbc" => "org.apache.iceberg.jdbc.JdbcCatalog", @@ -541,11 +541,10 @@ mod v2 { /// TODO: remove the arguments and put them into `IcebergCommon`. Currently the handling in source and sink are different, so pass them separately to be safer. pub async fn load_table_v2( &self, - path_style_access: &Option, java_catalog_props: &HashMap, ) -> ConnectorResult { let catalog = self - .create_catalog_v2(path_style_access, java_catalog_props) + .create_catalog_v2(java_catalog_props) .await .context("Unable to load iceberg catalog")?; @@ -559,7 +558,6 @@ mod v2 { pub async fn load_table_v2_with_metadata( &self, metadata: TableMetadata, - path_style_access: &Option, java_catalog_props: &HashMap, ) -> ConnectorResult { match self.catalog_type() { @@ -585,10 +583,7 @@ mod v2 { .readonly(true) .build()?) 
} - _ => { - self.load_table_v2(path_style_access, java_catalog_props) - .await - } + _ => self.load_table_v2(java_catalog_props).await, } } } diff --git a/src/connector/src/sink/iceberg/mod.rs b/src/connector/src/sink/iceberg/mod.rs index d19ea5a3c0f84..e2f00afd5b525 100644 --- a/src/connector/src/sink/iceberg/mod.rs +++ b/src/connector/src/sink/iceberg/mod.rs @@ -64,10 +64,7 @@ use crate::connector_common::IcebergCommon; use crate::sink::coordinate::CoordinatedSinkWriter; use crate::sink::writer::SinkWriter; use crate::sink::{Result, SinkCommitCoordinator, SinkParam}; -use crate::{ - deserialize_bool_from_string, deserialize_optional_bool_from_string, - deserialize_optional_string_seq_from_string, -}; +use crate::{deserialize_bool_from_string, deserialize_optional_string_seq_from_string}; pub const ICEBERG_SINK: &str = "iceberg"; @@ -82,13 +79,6 @@ pub struct IcebergConfig { #[serde(flatten)] common: IcebergCommon, - #[serde( - rename = "s3.path.style.access", - default, - deserialize_with = "deserialize_optional_bool_from_string" - )] - pub path_style_access: Option, - #[serde( rename = "primary_key", default, @@ -175,21 +165,21 @@ impl IcebergConfig { pub async fn create_catalog(&self) -> Result { self.common - .create_catalog(&self.path_style_access, &self.java_catalog_props) + .create_catalog(&self.java_catalog_props) .await .map_err(Into::into) } pub async fn load_table(&self) -> Result
{ self.common - .load_table(&self.path_style_access, &self.java_catalog_props) + .load_table(&self.java_catalog_props) .await .map_err(Into::into) } pub async fn create_catalog_v2(&self) -> Result> { self.common - .create_catalog_v2(&self.path_style_access, &self.java_catalog_props) + .create_catalog_v2(&self.java_catalog_props) .await .map_err(Into::into) } @@ -1018,10 +1008,10 @@ mod test { catalog_name: Some("demo".to_string()), database_name: Some("demo_db".to_string()), table_name: "demo_table".to_string(), + path_style_access: Some(true), }, r#type: "upsert".to_string(), force_append_only: false, - path_style_access: Some(true), primary_key: Some(vec!["v1".to_string()]), java_catalog_props: [("jdbc.user", "admin"), ("jdbc.password", "123456")] .into_iter() diff --git a/src/connector/src/source/iceberg/mod.rs b/src/connector/src/source/iceberg/mod.rs index cdeab00649187..60a26e43e1d31 100644 --- a/src/connector/src/source/iceberg/mod.rs +++ b/src/connector/src/source/iceberg/mod.rs @@ -65,8 +65,8 @@ impl IcebergProperties { if let Some(jdbc_password) = self.jdbc_password.clone() { java_catalog_props.insert("jdbc.password".to_string(), jdbc_password); } - // TODO: support path_style_access and java_catalog_props for iceberg source - self.common.load_table_v2(&None, &java_catalog_props).await + // TODO: support java_catalog_props for iceberg source + self.common.load_table_v2(&java_catalog_props).await } pub async fn load_table_v2_with_metadata( @@ -82,7 +82,7 @@ impl IcebergProperties { } - // TODO: support path_style_access and java_catalog_props for iceberg source + // TODO: support java_catalog_props for iceberg source self.common - .load_table_v2_with_metadata(table_meta, &None, &java_catalog_props) + .load_table_v2_with_metadata(table_meta, &java_catalog_props) .await } } diff --git a/src/connector/with_options_source.yaml b/src/connector/with_options_source.yaml index c4d9604a8f053..dc8d31d281be9 100644 --- a/src/connector/with_options_source.yaml +++ b/src/connector/with_options_source.yaml @@ -117,6 +117,10 @@ 
IcebergProperties: field_type: String comments: Full name of table, must include schema name. required: true + - name: s3.path.style.access + field_type: bool + required: false + default: Default::default - name: catalog.jdbc.user field_type: String required: false