diff --git a/Cargo.lock b/Cargo.lock index 12f7be03b7d2c..55c4f366b0b1e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1781,6 +1781,15 @@ version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "aca749d3d3f5b87a0d6100509879f9cf486ab510803a4a4e1001da1ff61c2bd6" +[[package]] +name = "convert_case" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec182b0ca2f35d8fc196cf3404988fd8b8c739a4d270ff118a398feb0cbec1ca" +dependencies = [ + "unicode-segmentation", +] + [[package]] name = "core-foundation" version = "0.9.3" @@ -2508,6 +2517,26 @@ dependencies = [ "syn 2.0.33", ] +[[package]] +name = "enum-display" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96d4df33d54dd1959d177a0e2c2f4e5a8637a3054aa56861ed7e173ad2043fe2" +dependencies = [ + "enum-display-macro", +] + +[[package]] +name = "enum-display-macro" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a0ce3a36047ede676eb0d2721d065beed8410cf4f113f489604d2971331cb378" +dependencies = [ + "convert_case", + "quote", + "syn 1.0.109", +] + [[package]] name = "enum-iterator" version = "1.4.1" @@ -3576,8 +3605,8 @@ dependencies = [ [[package]] name = "icelake" -version = "0.0.9" -source = "git+https://github.com/icelake-io/icelake?rev=a6790d17094754959e351fac1e11147e37643e97#a6790d17094754959e351fac1e11147e37643e97" +version = "0.0.10" +source = "git+https://github.com/icelake-io/icelake?rev=85ca0e57bf059b2e84a8bca531f9a8f3bc2f8dfd#85ca0e57bf059b2e84a8bca531f9a8f3bc2f8dfd" dependencies = [ "anyhow", "apache-avro 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -3591,21 +3620,28 @@ dependencies = [ "bitvec", "bytes", "chrono", + "csv", + "enum-display", "faster-hex", "futures", + "itertools 0.11.0", "log", + "murmur3", "once_cell", "opendal", "ordered-float 3.9.1", "parquet", "regex", + "reqwest", "rust_decimal", "serde", "serde_bytes", "serde_json", "serde_with 3.3.0", "tokio", + "toml 0.7.8", "url", + "urlencoding", "uuid", ] @@ -4473,6 +4509,12 @@ dependencies = [ "serde", ] +[[package]] +name = "murmur3" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9252111cf132ba0929b6f8e030cac2a24b507f3a4d6db6fb2896f27b354c714b" + [[package]] name = "mysql-common-derive" version = "0.30.2" diff --git a/Cargo.toml b/Cargo.toml index 5742b9efc3713..ccca32af0b9e8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -106,7 +106,7 @@ hashbrown = { version = "0.14.0", features = [ criterion = { version = "0.5", features = ["async_futures"] } tonic = { package = "madsim-tonic", version = "0.3.1" } tonic-build = { package = "madsim-tonic-build", version = "0.3.1" } -icelake = { git = "https://github.com/icelake-io/icelake", rev = "a6790d17094754959e351fac1e11147e37643e97" } +icelake = { git = "https://github.com/icelake-io/icelake", rev = "85ca0e57bf059b2e84a8bca531f9a8f3bc2f8dfd" } arrow-array = "46" arrow-schema = "46" arrow-buffer = "46" diff --git a/e2e_test/iceberg/iceberg_sink_v2.slt b/e2e_test/iceberg/iceberg_sink_v2.slt index 59af62b5a1b46..126ed4bbf4f4b 100644 --- a/e2e_test/iceberg/iceberg_sink_v2.slt +++ b/e2e_test/iceberg/iceberg_sink_v2.slt @@ -23,12 +23,14 @@ CREATE SINK s6 AS select * from mv6 WITH ( connector = 'iceberg', type = 'append-only', force_append_only = 'true', + database.name = 'demo', + table.name = 'demo_db.demo_table', + + catalog.type = 'storage', warehouse.path = 's3://icebergdata/demo', s3.endpoint = 'http://127.0.0.1:9301', - s3.access.key = 'hummockadmin', - s3.secret.key = 'hummockadmin', - database.name='demo_db', - table.name='demo_table' + s3.access.key = 'admin', + s3.secret.key = 'password' ); statement ok diff --git a/src/connector/src/sink/iceberg.rs b/src/connector/src/sink/iceberg.rs index 8d19cc0ad705c..0238c66d43788 100644 --- a/src/connector/src/sink/iceberg.rs +++ b/src/connector/src/sink/iceberg.rs @@ -14,17 +14,15 @@ use std::collections::HashMap; use std::fmt::Debug; -use std::sync::Arc; use anyhow::anyhow; use arrow_array::RecordBatch; use arrow_schema::{DataType as ArrowDataType, Schema as ArrowSchema}; use async_trait::async_trait; -use icelake::config::{TableConfig, TableConfigRef}; +use icelake::catalog::{load_catalog, CATALOG_NAME, CATALOG_TYPE}; use icelake::transaction::Transaction; use icelake::types::{data_file_from_json, data_file_to_json, DataFile}; -use icelake::Table; -use opendal::services::S3; +use icelake::{Table, TableIdentifier}; use risingwave_common::array::{Op, StreamChunk}; use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::Schema; @@ -35,7 +33,6 @@ use risingwave_pb::connector_service::SinkMetadata; use risingwave_rpc_client::ConnectorClient; use serde_derive::Deserialize; use serde_json::Value; -use url::Url; use super::{ Sink, SinkError, SinkWriter, SinkWriterParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, @@ -57,15 +54,29 @@ pub type RemoteIcebergConfig = RemoteConfig; #[serde(deny_unknown_fields)] pub struct IcebergConfig { #[serde(skip_serializing)] - pub connector: String, // Must be "kafka" here. + pub connector: String, // Must be "iceberg" here. pub r#type: String, // accept "append-only" or "upsert" #[serde(default, deserialize_with = "deserialize_bool_from_string")] pub force_append_only: bool, + #[serde(rename = "table.name")] + pub table_name: String, // Full name of table, must include schema name + + #[serde(rename = "database.name")] + pub database_name: String, // Use as catalog name. + + // Catalog type supported by iceberg, such as "storage", "rest". + // If not set, we use "storage" as default. + #[serde(rename = "catalog.type")] + pub catalog_type: Option, + #[serde(rename = "warehouse.path")] - pub path: String, + pub path: Option, // Path of iceberg warehouse, only applicable in storage catalog. + + #[serde(rename = "catalog.uri")] + pub uri: Option, // URI of iceberg catalog, only applicable in rest catalog. #[serde(rename = "s3.region")] pub region: Option, @@ -78,26 +89,12 @@ pub struct IcebergConfig { #[serde(rename = "s3.secret.key")] pub secret_key: String, - - #[serde(rename = "database.name")] - pub database_name: String, - - #[serde(rename = "table.name")] - pub table_name: String, - - #[serde(skip)] - pub iceberg_table_config: TableConfigRef, } impl IcebergConfig { pub fn from_hashmap(values: HashMap) -> Result { - let iceberg_table_config = - Arc::new(TableConfig::try_from(&values).map_err(|e| SinkError::Iceberg(anyhow!(e)))?); - let mut config = - serde_json::from_value::(serde_json::to_value(values).unwrap()) - .map_err(|e| SinkError::Config(anyhow!(e)))?; - - config.iceberg_table_config = iceberg_table_config; + let config = serde_json::from_value::(serde_json::to_value(values).unwrap()) + .map_err(|e| SinkError::Config(anyhow!(e)))?; if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT { return Err(SinkError::Config(anyhow!( @@ -108,21 +105,66 @@ impl IcebergConfig { ))); } - if config.endpoint.is_none() && config.region.is_none() { - return Err(SinkError::Config(anyhow!( - "You must fill either s3 region or s3 endpoint", - ))); + Ok(config) + } + + fn build_iceberg_configs(&self) -> HashMap { + let mut iceberg_configs = HashMap::new(); + iceberg_configs.insert( + CATALOG_TYPE.to_string(), + self.catalog_type + .as_deref() + .unwrap_or("storage") + .to_string(), + ); + iceberg_configs.insert( + CATALOG_NAME.to_string(), + self.database_name.clone().to_string(), + ); + if let Some(path) = &self.path { + iceberg_configs.insert( + format!("iceberg.catalog.{}.warehouse", self.database_name), + path.clone().to_string(), + ); } - Ok(config) + if let Some(uri) = &self.uri { + iceberg_configs.insert( + format!("iceberg.catalog.{}.uri", self.database_name), + uri.clone().to_string(), + ); + } + + if let Some(region) = &self.region { + iceberg_configs.insert( + "iceberg.catalog.table.io.region".to_string(), + region.clone().to_string(), + ); + } + + if let Some(endpoint) = &self.endpoint { + iceberg_configs.insert( + "iceberg.catalog.table.io.endpoint".to_string(), + endpoint.clone().to_string(), + ); + } + + iceberg_configs.insert( + "iceberg.catalog.table.io.access_key_id".to_string(), + self.access_key.clone().to_string(), + ); + iceberg_configs.insert( + "iceberg.catalog.table.io.secret_access_key".to_string(), + self.secret_key.clone().to_string(), + ); + + iceberg_configs } } pub struct IcebergSink { config: IcebergConfig, param: SinkParam, - table_root: String, - bucket_name: String, } impl Debug for IcebergSink { @@ -135,32 +177,17 @@ impl Debug for IcebergSink { impl IcebergSink { async fn create_table(&self) -> Result { - let mut builder = S3::default(); - - // Sink will not load config from file. - builder.disable_config_load(); - - builder - .root(&self.table_root) - .bucket(&self.bucket_name) - .access_key_id(&self.config.access_key) - .secret_access_key(&self.config.secret_key); - - if let Some(region) = &self.config.region { - builder.region(region); - } - - if let Some(endpoint) = &self.config.endpoint { - builder.endpoint(endpoint); - } + let catalog = load_catalog(&self.config.build_iceberg_configs()) + .await + .map_err(|e| SinkError::Iceberg(anyhow!("Unable to load iceberg catalog: {e}")))?; - let op = opendal::Operator::new(builder) - .map_err(|err| SinkError::Config(anyhow!("{err}")))? - .finish(); + let table_id = TableIdentifier::new(self.config.table_name.split('.')) + .map_err(|e| SinkError::Iceberg(anyhow!("Unable to parse table name: {e}")))?; - let table = Table::open_with_config(op, self.config.iceberg_table_config.clone()) + let table = catalog + .load_table(&table_id) .await - .map_err(|err| SinkError::Iceberg(anyhow!("Create table fail: {}", err)))?; + .map_err(|err| SinkError::Iceberg(anyhow!(err)))?; let sink_schema = self.param.schema(); let iceberg_schema = table @@ -176,42 +203,7 @@ impl IcebergSink { Ok(table) } - /// Parse bucket name and table root path. - /// - /// return (bucket name, table root path) - fn parse_bucket_and_root_from_path(config: &IcebergConfig) -> Result<(String, String)> { - let url = Url::parse(&config.path).map_err(|err| { - SinkError::Config(anyhow!( - "Fail to parse Invalid path: {}, caused by: {}", - &config.path, - err - )) - })?; - - let scheme = url.scheme(); - if scheme != "s3a" && scheme != "s3" && scheme != "s3n" { - return Err(SinkError::Config(anyhow!( - "Invalid path: {}, only support s3a,s3,s3n prefix", - &config.path - ))); - } - - let bucket = url - .host_str() - .ok_or_else(|| SinkError::Config(anyhow!("Invalid path: {}", &config.path)))?; - let root = url.path(); - - let table_root_path = if root.is_empty() { - format!("/{}/{}", config.database_name, config.table_name) - } else { - format!("{}/{}/{}", root, config.database_name, config.table_name) - }; - - Ok((bucket.to_string(), table_root_path)) - } - pub fn new(config: IcebergConfig, param: SinkParam) -> Result { - let (bucket_name, table_root) = Self::parse_bucket_and_root_from_path(&config)?; // TODO(ZENOTME): Only support append-only mode now. if !config.force_append_only { return Err(SinkError::Iceberg(anyhow!( @@ -219,12 +211,7 @@ impl IcebergSink { ))); } - Ok(Self { - config, - param, - table_root, - bucket_name, - }) + Ok(Self { config, param }) } } diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index 9588e00aa655a..988f316d178c9 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -438,7 +438,7 @@ impl SinkImpl { SinkImpl::ClickHouse(_) => "clickhouse", SinkImpl::Iceberg(_) => "iceberg", SinkImpl::Nats(_) => "nats", - SinkImpl::RemoteIceberg(_) => "iceberg", + SinkImpl::RemoteIceberg(_) => "iceberg_java", SinkImpl::TestSink(_) => "test", } }