diff --git a/e2e_test/iceberg/iceberg_sink_v2.slt b/e2e_test/iceberg/iceberg_sink_v2.slt index 49f6d570532e2..126ed4bbf4f4b 100644 --- a/e2e_test/iceberg/iceberg_sink_v2.slt +++ b/e2e_test/iceberg/iceberg_sink_v2.slt @@ -23,14 +23,14 @@ CREATE SINK s6 AS select * from mv6 WITH ( connector = 'iceberg', type = 'append-only', force_append_only = 'true', + database.name = 'demo', table.name = 'demo_db.demo_table', - iceberg.catalog.type = 'storage', - iceberg.catalog.name = 'demo', - iceberg.catalog.demo.warehouse = 's3://icebergdata/demo', - iceberg.table.io.endpoint = 'http://127.0.0.1:9301', - iceberg.table.io.access_key_id = 'admin', - iceberg.table.io.secret_access_key = 'password' + catalog.type = 'storage', + warehouse.path = 's3://icebergdata/demo', + s3.endpoint = 'http://127.0.0.1:9301', + s3.access.key = 'admin', + s3.secret.key = 'password' ); statement ok diff --git a/src/connector/src/sink/iceberg.rs b/src/connector/src/sink/iceberg.rs index 70fabe684b845..6ba0aa225f1f6 100644 --- a/src/connector/src/sink/iceberg.rs +++ b/src/connector/src/sink/iceberg.rs @@ -19,12 +19,10 @@ use anyhow::anyhow; use arrow_array::RecordBatch; use arrow_schema::{DataType as ArrowDataType, Schema as ArrowSchema}; use async_trait::async_trait; -use icelake::catalog::load_catalog; +use icelake::catalog::{load_catalog, CATALOG_NAME, CATALOG_TYPE}; use icelake::transaction::Transaction; use icelake::types::{data_file_from_json, data_file_to_json, DataFile}; use icelake::{Table, TableIdentifier}; -use itertools::Itertools; -use opendal::services::S3; use risingwave_common::array::{Op, StreamChunk}; use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::Schema; @@ -66,23 +64,35 @@ pub struct IcebergConfig { #[serde(rename = "table.name")] pub table_name: String, // Full name of table, must include schema name - #[serde(skip)] - pub iceberg_configs: HashMap, + #[serde(rename = "database.name")] + pub database_name: String, // Use as catalog name. + + #[serde(rename = "catalog.type")] + pub catalog_type: String, // Catalog type supported by iceberg, such as "storage", "rest" + + #[serde(rename = "warehouse.path")] + pub path: Option, // Path of iceberg warehouse, only applicable in storage catalog. + + #[serde(rename = "catalog.uri")] + pub uri: Option, // URI of iceberg catalog, only applicable in rest catalog. + + #[serde(rename = "s3.region")] + pub region: Option, + + #[serde(rename = "s3.endpoint")] + pub endpoint: Option, + + #[serde(rename = "s3.access.key")] + pub access_key: String, + + #[serde(rename = "s3.secret.key")] + pub secret_key: String, } impl IcebergConfig { pub fn from_hashmap(values: HashMap) -> Result { - let iceberg_configs = values - .iter() - .filter(|(k, _v)| k.starts_with("iceberg.")) - .map(|(k, v)| (k.clone(), v.clone())) - .collect(); - - let mut config = - serde_json::from_value::(serde_json::to_value(values).unwrap()) - .map_err(|e| SinkError::Config(anyhow!(e)))?; - - config.iceberg_configs = iceberg_configs; + let config = serde_json::from_value::(serde_json::to_value(values).unwrap()) + .map_err(|e| SinkError::Config(anyhow!(e)))?; if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT { return Err(SinkError::Config(anyhow!( @@ -95,6 +105,53 @@ impl IcebergConfig { Ok(config) } + + fn build_iceberg_configs(&self) -> HashMap { + let mut iceberg_configs = HashMap::new(); + iceberg_configs.insert(CATALOG_TYPE.to_string(), self.catalog_type.clone()); + iceberg_configs.insert( + CATALOG_NAME.to_string(), + self.database_name.clone().to_string(), + ); + if let Some(path) = &self.path { + iceberg_configs.insert( + format!("iceberg.catalog.{}.warehouse", self.database_name), + path.clone().to_string(), + ); + } + + if let Some(uri) = &self.uri { + iceberg_configs.insert( + format!("iceberg.catalog.{}.uri", self.database_name), + uri.clone().to_string(), + ); + } + + if let Some(region) = &self.region { + iceberg_configs.insert( + "iceberg.catalog.table.io.region".to_string(), + region.clone().to_string(), + ); + } + + if let Some(endpoint) = &self.endpoint { + iceberg_configs.insert( + "iceberg.catalog.table.io.endpoint".to_string(), + endpoint.clone().to_string(), + ); + } + + iceberg_configs.insert( + "iceberg.catalog.table.io.access_key_id".to_string(), + self.access_key.clone().to_string(), + ); + iceberg_configs.insert( + "iceberg.catalog.table.io.secret_access_key".to_string(), + self.secret_key.clone().to_string(), + ); + + iceberg_configs + } } pub struct IcebergSink { @@ -112,7 +169,7 @@ impl Debug for IcebergSink { impl IcebergSink { async fn create_table(&self) -> Result { - let catalog = load_catalog(&self.config.iceberg_configs) + let catalog = load_catalog(&self.config.build_iceberg_configs()) .await .map_err(|e| SinkError::Iceberg(anyhow!("Unable to load iceberg catalog: {e}")))?;