Skip to content

Commit

Permalink
Catalog name optional
Browse files Browse the repository at this point in the history
  • Loading branch information
liurenjie1024 committed Feb 27, 2024
1 parent 64e20c4 commit 490090c
Showing 1 changed file with 25 additions and 12 deletions.
37 changes: 25 additions & 12 deletions src/connector/src/sink/iceberg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,13 @@ pub struct IcebergConfig {
pub table_name: String, // Full name of table, must include schema name

#[serde(rename = "database.name")]
pub database_name: Option<String>, // Database name of table
pub database_name: Option<String>,
// Database name of table

// Catalog name, can be omitted for storage catalog, but
// must be set for other catalogs.
#[serde(rename = "catalog.name")]
pub catalog_name: String, // Catalog name.
pub catalog_name: Option<String>,

// Catalog type supported by iceberg, such as "storage", "rest".
// If not set, we use "storage" as default.
Expand Down Expand Up @@ -167,6 +170,12 @@ impl IcebergConfig {
}
}

if config.catalog_name.is_none() && config.catalog_type.as_deref() != Some("storage") {
return Err(SinkError::Config(anyhow!(
"catalog.name must be set for non-storage catalog"
)));
}

// All configs start with "catalog." will be treated as java configs.
config.java_catalog_props = values
.iter()
Expand All @@ -186,6 +195,13 @@ impl IcebergConfig {
self.catalog_type.as_deref().unwrap_or("storage")
}

fn catalog_name(&self) -> String {
self.catalog_name
.as_ref()
.map(|s| s.to_string())
.unwrap_or_else(|| "risingwave".to_string())
}

fn full_table_name(&self) -> Result<TableIdentifier> {
let ret = if let Some(database_name) = &self.database_name {
TableIdentifier::new(vec![database_name, &self.table_name])
Expand All @@ -203,23 +219,20 @@ impl IcebergConfig {
let catalog_type = self.catalog_type().to_string();

iceberg_configs.insert(CATALOG_TYPE.to_string(), catalog_type.clone());
iceberg_configs.insert(CATALOG_NAME.to_string(), self.catalog_name.clone());
iceberg_configs.insert(CATALOG_NAME.to_string(), self.catalog_name());

match catalog_type.as_str() {
"storage" => {
iceberg_configs.insert(
format!("iceberg.catalog.{}.warehouse", self.catalog_name.clone()),
format!("iceberg.catalog.{}.warehouse", self.catalog_name()),
self.path.clone(),
);
}
"rest" => {
let uri = self.uri.clone().ok_or_else(|| {
SinkError::Iceberg(anyhow!("`catalog.uri` must be set in rest catalog"))
})?;
iceberg_configs.insert(
format!("iceberg.catalog.{}.uri", self.catalog_name.clone()),
uri,
);
iceberg_configs.insert(format!("iceberg.catalog.{}.uri", self.catalog_name()), uri);
}
_ => {
return Err(SinkError::Iceberg(anyhow!(
Expand Down Expand Up @@ -287,7 +300,7 @@ impl IcebergConfig {
let catalog_type = self.catalog_type().to_string();

iceberg_configs.insert(CATALOG_TYPE.to_string(), catalog_type.clone());
iceberg_configs.insert(CATALOG_NAME.to_string(), self.catalog_name.clone());
iceberg_configs.insert(CATALOG_NAME.to_string(), self.catalog_name());

if let Some(region) = &self.region {
iceberg_configs.insert(
Expand Down Expand Up @@ -390,15 +403,15 @@ impl IcebergConfig {

jni_catalog::JniCatalog::build(
base_catalog_config,
&self.catalog_name,
self.catalog_name(),
catalog_impl,
java_catalog_props,
)
}
"mock" => Ok(Arc::new(MockCatalog {})),
_ => {
bail!(
"Unsupported catalog type: {}, only support `storage`, `rest`, `hive`, `sql`, `glue`, `dynamodb`",
"Unsupported catalog type: {}, only support `storage`, `rest`, `hive`, `jdbc`",
self.catalog_type()
)
}
Expand Down Expand Up @@ -1064,7 +1077,7 @@ mod test {
force_append_only: false,
table_name: "demo_table".to_string(),
database_name: Some("demo_db".to_string()),
catalog_name: "demo".to_string(),
catalog_name: Some("demo".to_string()),
catalog_type: Some("jdbc".to_string()),
path: "s3://iceberg".to_string(),
uri: Some("jdbc://postgresql://postgres:5432/iceberg".to_string()),
Expand Down

0 comments on commit 490090c

Please sign in to comment.