diff --git a/e2e_test/iceberg/test_case/cdc/load.slt b/e2e_test/iceberg/test_case/cdc/load.slt index 2ac8ab2d61f25..de9c4dc1766c6 100644 --- a/e2e_test/iceberg/test_case/cdc/load.slt +++ b/e2e_test/iceberg/test_case/cdc/load.slt @@ -24,8 +24,8 @@ CREATE SINK s1 AS select * from products WITH ( connector = 'iceberg', type = 'upsert', force_append_only = 'false', - database.name = 'demo', - table.name = 'demo_db.demo_table', + database.name = 'demo_db', + table.name = 'demo_table', catalog.type = 'storage', warehouse.path = 's3://icebergdata/demo', s3.endpoint = 'http://127.0.0.1:9301', diff --git a/e2e_test/iceberg/test_case/iceberg_sink_append_only.slt b/e2e_test/iceberg/test_case/iceberg_sink_append_only.slt index 5f847eaa30a7e..dff6737057363 100644 --- a/e2e_test/iceberg/test_case/iceberg_sink_append_only.slt +++ b/e2e_test/iceberg/test_case/iceberg_sink_append_only.slt @@ -23,8 +23,8 @@ CREATE SINK s6 AS select * from mv6 WITH ( connector = 'iceberg', type = 'append-only', force_append_only = 'true', - database.name = 'demo', - table.name = 'demo_db.demo_table', + database.name = 'demo_db', + table.name = 'demo_table', catalog.type = 'storage', warehouse.path = 's3://icebergdata/demo', s3.endpoint = 'http://127.0.0.1:9301', diff --git a/e2e_test/iceberg/test_case/iceberg_sink_upsert.slt b/e2e_test/iceberg/test_case/iceberg_sink_upsert.slt index 646a39cc08e28..2e8ce54e1c742 100644 --- a/e2e_test/iceberg/test_case/iceberg_sink_upsert.slt +++ b/e2e_test/iceberg/test_case/iceberg_sink_upsert.slt @@ -12,8 +12,8 @@ CREATE SINK s6 AS select mv6.id as id, mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3 connector = 'iceberg', type = 'upsert', force_append_only = 'false', - database.name = 'demo', - table.name = 'demo_db.demo_table', + database.name = 'demo_db', + table.name = 'demo_table', catalog.type = 'storage', warehouse.path = 's3://icebergdata/demo', s3.endpoint = 'http://127.0.0.1:9301', diff --git a/e2e_test/sink/iceberg_sink.slt b/e2e_test/sink/iceberg_sink.slt index 9e7bef8d12239..dbc3163b70585 100644 --- a/e2e_test/sink/iceberg_sink.slt +++ b/e2e_test/sink/iceberg_sink.slt @@ -15,8 +15,8 @@ CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3 from mv6 WITH s3.secret.key = 'hummockadmin', s3.region = 'us-east-1', catalog.type = 'storage', - database.name='demo', - table.name='demo_db.demo_table' + database.name='demo_db', + table.name='demo_table' ); statement ok diff --git a/src/connector/src/sink/iceberg/mod.rs b/src/connector/src/sink/iceberg/mod.rs index a54ac28f8c53f..4412388314cf5 100644 --- a/src/connector/src/sink/iceberg/mod.rs +++ b/src/connector/src/sink/iceberg/mod.rs @@ -61,6 +61,8 @@ use crate::sink::{Result, SinkCommitCoordinator, SinkParam}; /// This iceberg sink is WIP. When it ready, we will change this name to "iceberg". pub const ICEBERG_SINK: &str = "iceberg"; +static RW_CATALOG_NAME: &str = "risingwave"; + #[derive(Debug, Clone, Deserialize, WithOptions)] #[serde(deny_unknown_fields)] pub struct IcebergConfig { @@ -75,7 +77,7 @@ pub struct IcebergConfig { pub table_name: String, // Full name of table, must include schema name #[serde(rename = "database.name")] - pub database_name: String, // Use as catalog name. + pub database_name: String, // Database name of table // Catalog type supported by iceberg, such as "storage", "rest". // If not set, we use "storage" as default. @@ -181,12 +183,12 @@ impl IcebergConfig { let catalog_type = self.catalog_type().to_string(); iceberg_configs.insert(CATALOG_TYPE.to_string(), catalog_type.clone()); - iceberg_configs.insert(CATALOG_NAME.to_string(), "risingwave".to_string()); + iceberg_configs.insert(CATALOG_NAME.to_string(), RW_CATALOG_NAME.to_string()); match catalog_type.as_str() { "storage" => { iceberg_configs.insert( - format!("iceberg.catalog.{}.warehouse", self.database_name), + format!("iceberg.catalog.{}.warehouse", RW_CATALOG_NAME), self.path.clone(), ); } @@ -194,7 +196,7 @@ impl IcebergConfig { let uri = self.uri.clone().ok_or_else(|| { SinkError::Iceberg(anyhow!("`catalog.uri` must be set in rest catalog")) })?; - iceberg_configs.insert(format!("iceberg.catalog.{}.uri", self.database_name), uri); + iceberg_configs.insert(format!("iceberg.catalog.{}.uri", RW_CATALOG_NAME), uri); } _ => { return Err(SinkError::Iceberg(anyhow!(