diff --git a/src/connector/src/connector_common/iceberg/mod.rs b/src/connector/src/connector_common/iceberg/mod.rs index bf96e474eee80..c7c2815ed1206 100644 --- a/src/connector/src/connector_common/iceberg/mod.rs +++ b/src/connector/src/connector_common/iceberg/mod.rs @@ -49,7 +49,7 @@ pub struct IcebergCommon { pub secret_key: String, /// Path of iceberg warehouse, only applicable in storage catalog. #[serde(rename = "warehouse.path")] - pub warehouse_path: String, + pub warehouse_path: Option, /// Catalog name, can be omitted for storage catalog, but /// must be set for other catalogs. #[serde(rename = "catalog.name")] @@ -142,23 +142,31 @@ impl IcebergCommon { self.secret_key.clone().to_string(), ); - let (bucket, _) = { - let url = Url::parse(&self.warehouse_path) - .with_context(|| format!("Invalid warehouse path: {}", self.warehouse_path))?; - let bucket = url - .host_str() - .with_context(|| { - format!( - "Invalid s3 path: {}, bucket is missing", - self.warehouse_path - ) - })? - .to_string(); - let root = url.path().trim_start_matches('/').to_string(); - (bucket, root) - }; + match &self.warehouse_path { + Some(warehouse_path) => { + let (bucket, _) = { + let url = Url::parse(warehouse_path).with_context(|| { + format!("Invalid warehouse path: {}", warehouse_path) + })?; + let bucket = url + .host_str() + .with_context(|| { + format!("Invalid s3 path: {}, bucket is missing", warehouse_path) + })? + .to_string(); + let root = url.path().trim_start_matches('/').to_string(); + (bucket, root) + }; + + iceberg_configs.insert("iceberg.table.io.bucket".to_string(), bucket); + } + None => { + if catalog_type != "rest" { + bail!("`warehouse.path` must be set in {} catalog", &catalog_type); + } + } + } - iceberg_configs.insert("iceberg.table.io.bucket".to_string(), bucket); // #TODO // Support load config file iceberg_configs.insert( @@ -176,7 +184,9 @@ impl IcebergCommon { java_catalog_configs.insert("uri".to_string(), uri.to_string()); } - java_catalog_configs.insert("warehouse".to_string(), self.warehouse_path.clone()); + if let Some(warehouse_path) = &self.warehouse_path { + java_catalog_configs.insert("warehouse".to_string(), warehouse_path.clone()); + } java_catalog_configs.extend(java_catalog_props.clone()); // Currently we only support s3, so let's set it to s3 @@ -237,6 +247,7 @@ impl IcebergCommon { /// icelake mod v1 { + use anyhow::anyhow; use icelake::catalog::{load_catalog, CatalogRef}; use icelake::{Table, TableIdentifier}; @@ -265,7 +276,9 @@ mod v1 { "storage" => { iceberg_configs.insert( format!("iceberg.catalog.{}.warehouse", self.catalog_name()), - self.warehouse_path.clone(), + self.warehouse_path.clone().ok_or_else(|| { + anyhow!("`warehouse.path` must be set in storage catalog") + })?, ); } "rest" => { @@ -313,28 +326,36 @@ mod v1 { ); } - let (bucket, root) = { - let url = Url::parse(&self.warehouse_path) - .with_context(|| format!("Invalid warehouse path: {}", self.warehouse_path))?; - let bucket = url - .host_str() - .with_context(|| { - format!( - "Invalid s3 path: {}, bucket is missing", - self.warehouse_path - ) - })? - .to_string(); - let root = url.path().trim_start_matches('/').to_string(); - (bucket, root) - }; + match &self.warehouse_path { + Some(warehouse_path) => { + let (bucket, root) = { + let url = Url::parse(warehouse_path).with_context(|| { + format!("Invalid warehouse path: {}", warehouse_path) + })?; + let bucket = url + .host_str() + .with_context(|| { + format!("Invalid s3 path: {}, bucket is missing", warehouse_path) + })? + .to_string(); + let root = url.path().trim_start_matches('/').to_string(); + (bucket, root) + }; - iceberg_configs.insert("iceberg.table.io.bucket".to_string(), bucket); + iceberg_configs.insert("iceberg.table.io.bucket".to_string(), bucket); - // Only storage catalog should set this. - if catalog_type == "storage" { - iceberg_configs.insert("iceberg.table.io.root".to_string(), root); + // Only storage catalog should set this. + if catalog_type == "storage" { + iceberg_configs.insert("iceberg.table.io.root".to_string(), root); + } + } + None => { + if catalog_type == "storage" { + bail!("`warehouse.path` must be set in storage catalog"); + } + } } + // #TODO // Support load config file iceberg_configs.insert( @@ -409,6 +430,7 @@ mod v1 { /// iceberg-rust mod v2 { + use anyhow::anyhow; use iceberg::spec::TableMetadata; use iceberg::table::Table as TableV2; use iceberg::{Catalog as CatalogV2, TableIdent}; @@ -435,7 +457,9 @@ mod v2 { match self.catalog_type() { "storage" => { let config = storage_catalog::StorageCatalogConfig::builder() - .warehouse(self.warehouse_path.clone()) + .warehouse(self.warehouse_path.clone().ok_or_else(|| { + anyhow!("`warehouse.path` must be set in storage catalog") + })?) .access_key(self.access_key.clone()) .secret_key(self.secret_key.clone()) .region(self.region.clone()) @@ -461,12 +485,18 @@ mod v2 { S3_SECRET_ACCESS_KEY.to_string(), self.secret_key.clone().to_string(), ); - let config = iceberg_catalog_rest::RestCatalogConfig::builder() + let config_builder = iceberg_catalog_rest::RestCatalogConfig::builder() .uri(self.catalog_uri.clone().with_context(|| { "`catalog.uri` must be set in rest catalog".to_string() })?) - .props(iceberg_configs) - .build(); + .props(iceberg_configs); + + let config = match &self.warehouse_path { + Some(warehouse_path) => { + config_builder.warehouse(warehouse_path.clone()).build() + } + None => config_builder.build(), + }; let catalog = iceberg_catalog_rest::RestCatalog::new(config); Ok(Arc::new(catalog)) } @@ -502,7 +532,9 @@ mod v2 { self.secret_key.clone().to_string(), ); let config_builder = iceberg_catalog_glue::GlueCatalogConfig::builder() - .warehouse(self.warehouse_path.clone()) + .warehouse(self.warehouse_path.clone().ok_or_else(|| { + anyhow!("`warehouse.path` must be set in glue catalog") + })?) .props(iceberg_configs); let config = if let Some(uri) = self.catalog_uri.as_deref() { config_builder.uri(uri.to_string()).build() @@ -563,7 +595,9 @@ mod v2 { match self.catalog_type() { "storage" => { let config = storage_catalog::StorageCatalogConfig::builder() - .warehouse(self.warehouse_path.clone()) + .warehouse(self.warehouse_path.clone().ok_or_else(|| { + anyhow!("`warehouse.path` must be set in storage catalog") + })?) .access_key(self.access_key.clone()) .secret_key(self.secret_key.clone()) .region(self.region.clone()) diff --git a/src/connector/src/sink/iceberg/mod.rs b/src/connector/src/sink/iceberg/mod.rs index e2f00afd5b525..0c878ae1ba6d6 100644 --- a/src/connector/src/sink/iceberg/mod.rs +++ b/src/connector/src/sink/iceberg/mod.rs @@ -281,18 +281,26 @@ impl IcebergSink { let location = { let mut names = namespace.clone().inner(); names.push(self.config.common.table_name.to_string()); - if self.config.common.warehouse_path.ends_with('/') { - format!("{}{}", self.config.common.warehouse_path, names.join("/")) - } else { - format!("{}/{}", self.config.common.warehouse_path, names.join("/")) + match &self.config.common.warehouse_path { + Some(warehouse_path) => { + if warehouse_path.ends_with('/') { + Some(format!("{}{}", warehouse_path, names.join("/"))) + } else { + Some(format!("{}/{}", warehouse_path, names.join("/"))) + } + } + None => None, } }; - let table_creation = TableCreation::builder() + let table_creation_builder = TableCreation::builder() .name(self.config.common.table_name.clone()) - .schema(iceberg_schema) - .location(location) - .build(); + .schema(iceberg_schema); + + let table_creation = match location { + Some(location) => table_creation_builder.location(location).build(), + None => table_creation_builder.build(), + }; catalog .create_table(&namespace, table_creation) @@ -998,7 +1006,7 @@ mod test { let expected_iceberg_config = IcebergConfig { common: IcebergCommon { - warehouse_path: "s3://iceberg".to_string(), + warehouse_path: Some("s3://iceberg".to_string()), catalog_uri: Some("jdbc://postgresql://postgres:5432/iceberg".to_string()), region: Some("us-east-1".to_string()), endpoint: Some("http://127.0.0.1:9301".to_string()), diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml index eb03288dfbcca..e9c0c79ec1c30 100644 --- a/src/connector/with_options_sink.yaml +++ b/src/connector/with_options_sink.yaml @@ -394,7 +394,7 @@ IcebergConfig: - name: warehouse.path field_type: String comments: Path of iceberg warehouse, only applicable in storage catalog. - required: true + required: false - name: catalog.name field_type: String comments: |- diff --git a/src/connector/with_options_source.yaml b/src/connector/with_options_source.yaml index dc8d31d281be9..972c027d8f0bd 100644 --- a/src/connector/with_options_source.yaml +++ b/src/connector/with_options_source.yaml @@ -99,7 +99,7 @@ IcebergProperties: - name: warehouse.path field_type: String comments: Path of iceberg warehouse, only applicable in storage catalog. - required: true + required: false - name: catalog.name field_type: String comments: |-