Skip to content

Commit

Permalink
Fix ci
Browse files Browse the repository at this point in the history
  • Loading branch information
liurenjie1024 committed Jan 17, 2024
1 parent bcb7e38 commit 79ff68a
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 12 deletions.
4 changes: 2 additions & 2 deletions e2e_test/iceberg/test_case/cdc/load.slt
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ CREATE SINK s1 AS select * from products WITH (
connector = 'iceberg',
type = 'upsert',
force_append_only = 'false',
database.name = 'demo',
table.name = 'demo_db.demo_table',
database.name = 'demo_db',
table.name = 'demo_table',
catalog.type = 'storage',
warehouse.path = 's3://icebergdata/demo',
s3.endpoint = 'http://127.0.0.1:9301',
Expand Down
4 changes: 2 additions & 2 deletions e2e_test/iceberg/test_case/iceberg_sink_append_only.slt
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ CREATE SINK s6 AS select * from mv6 WITH (
connector = 'iceberg',
type = 'append-only',
force_append_only = 'true',
database.name = 'demo',
table.name = 'demo_db.demo_table',
database.name = 'demo_db',
table.name = 'demo_table',
catalog.type = 'storage',
warehouse.path = 's3://icebergdata/demo',
s3.endpoint = 'http://127.0.0.1:9301',
Expand Down
4 changes: 2 additions & 2 deletions e2e_test/iceberg/test_case/iceberg_sink_upsert.slt
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ CREATE SINK s6 AS select mv6.id as id, mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3
connector = 'iceberg',
type = 'upsert',
force_append_only = 'false',
database.name = 'demo',
table.name = 'demo_db.demo_table',
database.name = 'demo_db',
table.name = 'demo_table',
catalog.type = 'storage',
warehouse.path = 's3://icebergdata/demo',
s3.endpoint = 'http://127.0.0.1:9301',
Expand Down
4 changes: 2 additions & 2 deletions e2e_test/sink/iceberg_sink.slt
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ CREATE SINK s6 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3 from mv6 WITH
s3.secret.key = 'hummockadmin',
s3.region = 'us-east-1',
catalog.type = 'storage',
database.name='demo',
table.name='demo_db.demo_table'
database.name='demo_db',
table.name='demo_table'
);

statement ok
Expand Down
10 changes: 6 additions & 4 deletions src/connector/src/sink/iceberg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ use crate::sink::{Result, SinkCommitCoordinator, SinkParam};
/// This iceberg sink is WIP. When it ready, we will change this name to "iceberg".
pub const ICEBERG_SINK: &str = "iceberg";

static RW_CATALOG_NAME: &str = "risingwave";

#[derive(Debug, Clone, Deserialize, WithOptions)]
#[serde(deny_unknown_fields)]
pub struct IcebergConfig {
Expand All @@ -75,7 +77,7 @@ pub struct IcebergConfig {
pub table_name: String, // Full name of table, must include schema name

#[serde(rename = "database.name")]
pub database_name: String, // Use as catalog name.
pub database_name: String, // Database name of table

// Catalog type supported by iceberg, such as "storage", "rest".
// If not set, we use "storage" as default.
Expand Down Expand Up @@ -181,20 +183,20 @@ impl IcebergConfig {
let catalog_type = self.catalog_type().to_string();

iceberg_configs.insert(CATALOG_TYPE.to_string(), catalog_type.clone());
iceberg_configs.insert(CATALOG_NAME.to_string(), "risingwave".to_string());
iceberg_configs.insert(CATALOG_NAME.to_string(), RW_CATALOG_NAME.to_string());

match catalog_type.as_str() {
"storage" => {
iceberg_configs.insert(
format!("iceberg.catalog.{}.warehouse", self.database_name),
format!("iceberg.catalog.{}.warehouse", RW_CATALOG_NAME),
self.path.clone(),
);
}
"rest" => {
let uri = self.uri.clone().ok_or_else(|| {
SinkError::Iceberg(anyhow!("`catalog.uri` must be set in rest catalog"))
})?;
iceberg_configs.insert(format!("iceberg.catalog.{}.uri", self.database_name), uri);
iceberg_configs.insert(format!("iceberg.catalog.{}.uri", RW_CATALOG_NAME), uri);
}
_ => {
return Err(SinkError::Iceberg(anyhow!(
Expand Down

0 comments on commit 79ff68a

Please sign in to comment.