diff --git a/src/connector/src/sink/iceberg/mod.rs b/src/connector/src/sink/iceberg/mod.rs index dc8cc77e010d1..ce4269851cec7 100644 --- a/src/connector/src/sink/iceberg/mod.rs +++ b/src/connector/src/sink/iceberg/mod.rs @@ -80,10 +80,13 @@ pub struct IcebergConfig { pub table_name: String, // Full name of table, must include schema name #[serde(rename = "database.name")] - pub database_name: Option, // Database name of table + pub database_name: Option, + // Database name of table + // Catalog name, can be omitted for storage catalog, but + // must be set for other catalogs. #[serde(rename = "catalog.name")] - pub catalog_name: String, // Catalog name. + pub catalog_name: Option, // Catalog type supported by iceberg, such as "storage", "rest". // If not set, we use "storage" as default. @@ -167,6 +170,12 @@ impl IcebergConfig { } } + if config.catalog_name.is_none() && config.catalog_type.as_deref() != Some("storage") { + return Err(SinkError::Config(anyhow!( + "catalog.name must be set for non-storage catalog" + ))); + } + // All configs start with "catalog." will be treated as java configs. config.java_catalog_props = values .iter() @@ -186,6 +195,13 @@ impl IcebergConfig { self.catalog_type.as_deref().unwrap_or("storage") } + fn catalog_name(&self) -> String { + self.catalog_name + .as_ref() + .map(|s| s.to_string()) + .unwrap_or_else(|| "risingwave".to_string()) + } + fn full_table_name(&self) -> Result { let ret = if let Some(database_name) = &self.database_name { TableIdentifier::new(vec![database_name, &self.table_name]) @@ -203,12 +219,12 @@ impl IcebergConfig { let catalog_type = self.catalog_type().to_string(); iceberg_configs.insert(CATALOG_TYPE.to_string(), catalog_type.clone()); - iceberg_configs.insert(CATALOG_NAME.to_string(), self.catalog_name.clone()); + iceberg_configs.insert(CATALOG_NAME.to_string(), self.catalog_name()); match catalog_type.as_str() { "storage" => { iceberg_configs.insert( - format!("iceberg.catalog.{}.warehouse", self.catalog_name.clone()), + format!("iceberg.catalog.{}.warehouse", self.catalog_name()), self.path.clone(), ); } @@ -216,10 +232,7 @@ impl IcebergConfig { let uri = self.uri.clone().ok_or_else(|| { SinkError::Iceberg(anyhow!("`catalog.uri` must be set in rest catalog")) })?; - iceberg_configs.insert( - format!("iceberg.catalog.{}.uri", self.catalog_name.clone()), - uri, - ); + iceberg_configs.insert(format!("iceberg.catalog.{}.uri", self.catalog_name()), uri); } _ => { return Err(SinkError::Iceberg(anyhow!( @@ -287,7 +300,7 @@ impl IcebergConfig { let catalog_type = self.catalog_type().to_string(); iceberg_configs.insert(CATALOG_TYPE.to_string(), catalog_type.clone()); - iceberg_configs.insert(CATALOG_NAME.to_string(), self.catalog_name.clone()); + iceberg_configs.insert(CATALOG_NAME.to_string(), self.catalog_name()); if let Some(region) = &self.region { iceberg_configs.insert( @@ -390,7 +403,7 @@ impl IcebergConfig { jni_catalog::JniCatalog::build( base_catalog_config, - &self.catalog_name, + self.catalog_name(), catalog_impl, java_catalog_props, ) @@ -398,7 +411,7 @@ impl IcebergConfig { "mock" => Ok(Arc::new(MockCatalog {})), _ => { bail!( - "Unsupported catalog type: {}, only support `storage`, `rest`, `hive`, `sql`, `glue`, `dynamodb`", + "Unsupported catalog type: {}, only support `storage`, `rest`, `hive`, `jdbc`", self.catalog_type() ) } @@ -1064,7 +1077,7 @@ mod test { force_append_only: false, table_name: "demo_table".to_string(), database_name: Some("demo_db".to_string()), - catalog_name: "demo".to_string(), + catalog_name: Some("demo".to_string()), catalog_type: Some("jdbc".to_string()), path: "s3://iceberg".to_string(), uri: Some("jdbc://postgresql://postgres:5432/iceberg".to_string()),