From 5d18e4efa1ddb0d0dfca3f890221e29e9c552b88 Mon Sep 17 00:00:00 2001 From: Renjie Liu Date: Thu, 14 Sep 2023 14:22:31 +0800 Subject: [PATCH 1/4] Change icelake version --- Cargo.lock | 46 +++++++++++++++++++++++++++++-- Cargo.toml | 2 +- src/connector/src/sink/iceberg.rs | 5 +++- 3 files changed, 49 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 12f7be03b7d2c..55c4f366b0b1e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1781,6 +1781,15 @@ version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "aca749d3d3f5b87a0d6100509879f9cf486ab510803a4a4e1001da1ff61c2bd6" +[[package]] +name = "convert_case" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec182b0ca2f35d8fc196cf3404988fd8b8c739a4d270ff118a398feb0cbec1ca" +dependencies = [ + "unicode-segmentation", +] + [[package]] name = "core-foundation" version = "0.9.3" @@ -2508,6 +2517,26 @@ dependencies = [ "syn 2.0.33", ] +[[package]] +name = "enum-display" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96d4df33d54dd1959d177a0e2c2f4e5a8637a3054aa56861ed7e173ad2043fe2" +dependencies = [ + "enum-display-macro", +] + +[[package]] +name = "enum-display-macro" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a0ce3a36047ede676eb0d2721d065beed8410cf4f113f489604d2971331cb378" +dependencies = [ + "convert_case", + "quote", + "syn 1.0.109", +] + [[package]] name = "enum-iterator" version = "1.4.1" @@ -3576,8 +3605,8 @@ dependencies = [ [[package]] name = "icelake" -version = "0.0.9" -source = "git+https://github.com/icelake-io/icelake?rev=a6790d17094754959e351fac1e11147e37643e97#a6790d17094754959e351fac1e11147e37643e97" +version = "0.0.10" +source = "git+https://github.com/icelake-io/icelake?rev=85ca0e57bf059b2e84a8bca531f9a8f3bc2f8dfd#85ca0e57bf059b2e84a8bca531f9a8f3bc2f8dfd" dependencies = [ "anyhow", "apache-avro 0.15.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -3591,21 +3620,28 @@ dependencies = [ "bitvec", "bytes", "chrono", + "csv", + "enum-display", "faster-hex", "futures", + "itertools 0.11.0", "log", + "murmur3", "once_cell", "opendal", "ordered-float 3.9.1", "parquet", "regex", + "reqwest", "rust_decimal", "serde", "serde_bytes", "serde_json", "serde_with 3.3.0", "tokio", + "toml 0.7.8", "url", + "urlencoding", "uuid", ] @@ -4473,6 +4509,12 @@ dependencies = [ "serde", ] +[[package]] +name = "murmur3" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9252111cf132ba0929b6f8e030cac2a24b507f3a4d6db6fb2896f27b354c714b" + [[package]] name = "mysql-common-derive" version = "0.30.2" diff --git a/Cargo.toml b/Cargo.toml index 5742b9efc3713..ccca32af0b9e8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -106,7 +106,7 @@ hashbrown = { version = "0.14.0", features = [ criterion = { version = "0.5", features = ["async_futures"] } tonic = { package = "madsim-tonic", version = "0.3.1" } tonic-build = { package = "madsim-tonic-build", version = "0.3.1" } -icelake = { git = "https://github.com/icelake-io/icelake", rev = "a6790d17094754959e351fac1e11147e37643e97" } +icelake = { git = "https://github.com/icelake-io/icelake", rev = "85ca0e57bf059b2e84a8bca531f9a8f3bc2f8dfd" } arrow-array = "46" arrow-schema = "46" arrow-buffer = "46" diff --git a/src/connector/src/sink/iceberg.rs b/src/connector/src/sink/iceberg.rs index 8d19cc0ad705c..07eee0472b9a6 100644 --- a/src/connector/src/sink/iceberg.rs +++ b/src/connector/src/sink/iceberg.rs @@ -57,13 +57,16 @@ pub type RemoteIcebergConfig = RemoteConfig; #[serde(deny_unknown_fields)] pub struct IcebergConfig { #[serde(skip_serializing)] - pub connector: String, // Must be "kafka" here. + pub connector: String, // Must be "iceberg" here. pub r#type: String, // accept "append-only" or "upsert" #[serde(default, deserialize_with = "deserialize_bool_from_string")] pub force_append_only: bool, + // Catalog type values: "storage", "rest" + pub catalog: String, + #[serde(rename = "warehouse.path")] pub path: String, From 99df9abb3aafb6157674b9dd6a59da3664d154d2 Mon Sep 17 00:00:00 2001 From: Renjie Liu Date: Thu, 14 Sep 2023 15:09:10 +0800 Subject: [PATCH 2/4] refactor: Use catalog api in icelake --- e2e_test/iceberg/iceberg_sink_v2.slt | 14 +-- src/connector/src/sink/iceberg.rs | 123 +++++---------------------- src/connector/src/sink/mod.rs | 2 +- 3 files changed, 30 insertions(+), 109 deletions(-) diff --git a/e2e_test/iceberg/iceberg_sink_v2.slt b/e2e_test/iceberg/iceberg_sink_v2.slt index 59af62b5a1b46..49f6d570532e2 100644 --- a/e2e_test/iceberg/iceberg_sink_v2.slt +++ b/e2e_test/iceberg/iceberg_sink_v2.slt @@ -23,12 +23,14 @@ CREATE SINK s6 AS select * from mv6 WITH ( connector = 'iceberg', type = 'append-only', force_append_only = 'true', - warehouse.path = 's3://icebergdata/demo', - s3.endpoint = 'http://127.0.0.1:9301', - s3.access.key = 'hummockadmin', - s3.secret.key = 'hummockadmin', - database.name='demo_db', - table.name='demo_table' + table.name = 'demo_db.demo_table', + + iceberg.catalog.type = 'storage', + iceberg.catalog.name = 'demo', + iceberg.catalog.demo.warehouse = 's3://icebergdata/demo', + iceberg.table.io.endpoint = 'http://127.0.0.1:9301', + iceberg.table.io.access_key_id = 'admin', + iceberg.table.io.secret_access_key = 'password' ); statement ok diff --git a/src/connector/src/sink/iceberg.rs b/src/connector/src/sink/iceberg.rs index 07eee0472b9a6..70fabe684b845 100644 --- a/src/connector/src/sink/iceberg.rs +++ b/src/connector/src/sink/iceberg.rs @@ -14,16 +14,16 @@ use std::collections::HashMap; use std::fmt::Debug; -use std::sync::Arc; use anyhow::anyhow; use arrow_array::RecordBatch; use arrow_schema::{DataType as ArrowDataType, Schema as ArrowSchema}; use async_trait::async_trait; -use icelake::config::{TableConfig, TableConfigRef}; +use icelake::catalog::load_catalog; use icelake::transaction::Transaction; use icelake::types::{data_file_from_json, data_file_to_json, DataFile}; -use icelake::Table; +use icelake::{Table, TableIdentifier}; +use itertools::Itertools; use opendal::services::S3; use risingwave_common::array::{Op, StreamChunk}; use risingwave_common::buffer::Bitmap; @@ -35,7 +35,6 @@ use risingwave_pb::connector_service::SinkMetadata; use risingwave_rpc_client::ConnectorClient; use serde_derive::Deserialize; use serde_json::Value; -use url::Url; use super::{ Sink, SinkError, SinkWriter, SinkWriterParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, @@ -64,43 +63,26 @@ pub struct IcebergConfig { #[serde(default, deserialize_with = "deserialize_bool_from_string")] pub force_append_only: bool, - // Catalog type values: "storage", "rest" - pub catalog: String, - - #[serde(rename = "warehouse.path")] - pub path: String, - - #[serde(rename = "s3.region")] - pub region: Option, - - #[serde(rename = "s3.endpoint")] - pub endpoint: Option, - - #[serde(rename = "s3.access.key")] - pub access_key: String, - - #[serde(rename = "s3.secret.key")] - pub secret_key: String, - - #[serde(rename = "database.name")] - pub database_name: String, - #[serde(rename = "table.name")] - pub table_name: String, + pub table_name: String, // Full name of table, must include schema name #[serde(skip)] - pub iceberg_table_config: TableConfigRef, + pub iceberg_configs: HashMap, } impl IcebergConfig { pub fn from_hashmap(values: HashMap) -> Result { - let iceberg_table_config = - Arc::new(TableConfig::try_from(&values).map_err(|e| SinkError::Iceberg(anyhow!(e)))?); + let iceberg_configs = values + .iter() + .filter(|(k, _v)| k.starts_with("iceberg.")) + .map(|(k, v)| (k.clone(), v.clone())) + .collect(); + let mut config = serde_json::from_value::(serde_json::to_value(values).unwrap()) .map_err(|e| SinkError::Config(anyhow!(e)))?; - config.iceberg_table_config = iceberg_table_config; + config.iceberg_configs = iceberg_configs; if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT { return Err(SinkError::Config(anyhow!( @@ -111,12 +93,6 @@ impl IcebergConfig { ))); } - if config.endpoint.is_none() && config.region.is_none() { - return Err(SinkError::Config(anyhow!( - "You must fill either s3 region or s3 endpoint", - ))); - } - Ok(config) } } @@ -124,8 +100,6 @@ impl IcebergConfig { pub struct IcebergSink { config: IcebergConfig, param: SinkParam, - table_root: String, - bucket_name: String, } impl Debug for IcebergSink { @@ -138,32 +112,17 @@ impl Debug for IcebergSink { impl IcebergSink { async fn create_table(&self) -> Result { - let mut builder = S3::default(); - - // Sink will not load config from file. - builder.disable_config_load(); - - builder - .root(&self.table_root) - .bucket(&self.bucket_name) - .access_key_id(&self.config.access_key) - .secret_access_key(&self.config.secret_key); - - if let Some(region) = &self.config.region { - builder.region(region); - } - - if let Some(endpoint) = &self.config.endpoint { - builder.endpoint(endpoint); - } + let catalog = load_catalog(&self.config.iceberg_configs) + .await + .map_err(|e| SinkError::Iceberg(anyhow!("Unable to load iceberg catalog: {e}")))?; - let op = opendal::Operator::new(builder) - .map_err(|err| SinkError::Config(anyhow!("{err}")))? - .finish(); + let table_id = TableIdentifier::new(self.config.table_name.split('.')) + .map_err(|e| SinkError::Iceberg(anyhow!("Unable to parse table name: {e}")))?; - let table = Table::open_with_config(op, self.config.iceberg_table_config.clone()) + let table = catalog + .load_table(&table_id) .await - .map_err(|err| SinkError::Iceberg(anyhow!("Create table fail: {}", err)))?; + .map_err(|err| SinkError::Iceberg(anyhow!(err)))?; let sink_schema = self.param.schema(); let iceberg_schema = table @@ -179,42 +138,7 @@ impl IcebergSink { Ok(table) } - /// Parse bucket name and table root path. - /// - /// return (bucket name, table root path) - fn parse_bucket_and_root_from_path(config: &IcebergConfig) -> Result<(String, String)> { - let url = Url::parse(&config.path).map_err(|err| { - SinkError::Config(anyhow!( - "Fail to parse Invalid path: {}, caused by: {}", - &config.path, - err - )) - })?; - - let scheme = url.scheme(); - if scheme != "s3a" && scheme != "s3" && scheme != "s3n" { - return Err(SinkError::Config(anyhow!( - "Invalid path: {}, only support s3a,s3,s3n prefix", - &config.path - ))); - } - - let bucket = url - .host_str() - .ok_or_else(|| SinkError::Config(anyhow!("Invalid path: {}", &config.path)))?; - let root = url.path(); - - let table_root_path = if root.is_empty() { - format!("/{}/{}", config.database_name, config.table_name) - } else { - format!("{}/{}/{}", root, config.database_name, config.table_name) - }; - - Ok((bucket.to_string(), table_root_path)) - } - pub fn new(config: IcebergConfig, param: SinkParam) -> Result { - let (bucket_name, table_root) = Self::parse_bucket_and_root_from_path(&config)?; // TODO(ZENOTME): Only support append-only mode now. if !config.force_append_only { return Err(SinkError::Iceberg(anyhow!( @@ -222,12 +146,7 @@ impl IcebergSink { ))); } - Ok(Self { - config, - param, - table_root, - bucket_name, - }) + Ok(Self { config, param }) } } diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index 9588e00aa655a..988f316d178c9 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -438,7 +438,7 @@ impl SinkImpl { SinkImpl::ClickHouse(_) => "clickhouse", SinkImpl::Iceberg(_) => "iceberg", SinkImpl::Nats(_) => "nats", - SinkImpl::RemoteIceberg(_) => "iceberg", + SinkImpl::RemoteIceberg(_) => "iceberg_java", SinkImpl::TestSink(_) => "test", } } From 0e151171a8fd044f229cde30c2cb234cf0f56888 Mon Sep 17 00:00:00 2001 From: Renjie Liu Date: Tue, 19 Sep 2023 18:35:22 +0800 Subject: [PATCH 3/4] Fix comment --- e2e_test/iceberg/iceberg_sink_v2.slt | 12 ++-- src/connector/src/sink/iceberg.rs | 91 ++++++++++++++++++++++------ 2 files changed, 80 insertions(+), 23 deletions(-) diff --git a/e2e_test/iceberg/iceberg_sink_v2.slt b/e2e_test/iceberg/iceberg_sink_v2.slt index 49f6d570532e2..126ed4bbf4f4b 100644 --- a/e2e_test/iceberg/iceberg_sink_v2.slt +++ b/e2e_test/iceberg/iceberg_sink_v2.slt @@ -23,14 +23,14 @@ CREATE SINK s6 AS select * from mv6 WITH ( connector = 'iceberg', type = 'append-only', force_append_only = 'true', + database.name = 'demo', table.name = 'demo_db.demo_table', - iceberg.catalog.type = 'storage', - iceberg.catalog.name = 'demo', - iceberg.catalog.demo.warehouse = 's3://icebergdata/demo', - iceberg.table.io.endpoint = 'http://127.0.0.1:9301', - iceberg.table.io.access_key_id = 'admin', - iceberg.table.io.secret_access_key = 'password' + catalog.type = 'storage', + warehouse.path = 's3://icebergdata/demo', + s3.endpoint = 'http://127.0.0.1:9301', + s3.access.key = 'admin', + s3.secret.key = 'password' ); statement ok diff --git a/src/connector/src/sink/iceberg.rs b/src/connector/src/sink/iceberg.rs index 70fabe684b845..6ba0aa225f1f6 100644 --- a/src/connector/src/sink/iceberg.rs +++ b/src/connector/src/sink/iceberg.rs @@ -19,12 +19,10 @@ use anyhow::anyhow; use arrow_array::RecordBatch; use arrow_schema::{DataType as ArrowDataType, Schema as ArrowSchema}; use async_trait::async_trait; -use icelake::catalog::load_catalog; +use icelake::catalog::{load_catalog, CATALOG_NAME, CATALOG_TYPE}; use icelake::transaction::Transaction; use icelake::types::{data_file_from_json, data_file_to_json, DataFile}; use icelake::{Table, TableIdentifier}; -use itertools::Itertools; -use opendal::services::S3; use risingwave_common::array::{Op, StreamChunk}; use risingwave_common::buffer::Bitmap; use risingwave_common::catalog::Schema; @@ -66,23 +64,35 @@ pub struct IcebergConfig { #[serde(rename = "table.name")] pub table_name: String, // Full name of table, must include schema name - #[serde(skip)] - pub iceberg_configs: HashMap, + #[serde(rename = "database.name")] + pub database_name: String, // Use as catalog name. + + #[serde(rename = "catalog.type")] + pub catalog_type: String, // Catalog type supported by iceberg, such as "storage", "rest" + + #[serde(rename = "warehouse.path")] + pub path: Option, // Path of iceberg warehouse, only applicable in storage catalog. + + #[serde(rename = "catalog.uri")] + pub uri: Option, // URI of iceberg catalog, only applicable in rest catalog. + + #[serde(rename = "s3.region")] + pub region: Option, + + #[serde(rename = "s3.endpoint")] + pub endpoint: Option, + + #[serde(rename = "s3.access.key")] + pub access_key: String, + + #[serde(rename = "s3.secret.key")] + pub secret_key: String, } impl IcebergConfig { pub fn from_hashmap(values: HashMap) -> Result { - let iceberg_configs = values - .iter() - .filter(|(k, _v)| k.starts_with("iceberg.")) - .map(|(k, v)| (k.clone(), v.clone())) - .collect(); - - let mut config = - serde_json::from_value::(serde_json::to_value(values).unwrap()) - .map_err(|e| SinkError::Config(anyhow!(e)))?; - - config.iceberg_configs = iceberg_configs; + let config = serde_json::from_value::(serde_json::to_value(values).unwrap()) + .map_err(|e| SinkError::Config(anyhow!(e)))?; if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT { return Err(SinkError::Config(anyhow!( @@ -95,6 +105,53 @@ impl IcebergConfig { Ok(config) } + + fn build_iceberg_configs(&self) -> HashMap { + let mut iceberg_configs = HashMap::new(); + iceberg_configs.insert(CATALOG_TYPE.to_string(), self.catalog_type.clone()); + iceberg_configs.insert( + CATALOG_NAME.to_string(), + self.database_name.clone().to_string(), + ); + if let Some(path) = &self.path { + iceberg_configs.insert( + format!("iceberg.catalog.{}.warehouse", self.database_name), + path.clone().to_string(), + ); + } + + if let Some(uri) = &self.uri { + iceberg_configs.insert( + format!("iceberg.catalog.{}.uri", self.database_name), + uri.clone().to_string(), + ); + } + + if let Some(region) = &self.region { + iceberg_configs.insert( + "iceberg.catalog.table.io.region".to_string(), + region.clone().to_string(), + ); + } + + if let Some(endpoint) = &self.endpoint { + iceberg_configs.insert( + "iceberg.catalog.table.io.endpoint".to_string(), + endpoint.clone().to_string(), + ); + } + + iceberg_configs.insert( + "iceberg.catalog.table.io.access_key_id".to_string(), + self.access_key.clone().to_string(), + ); + iceberg_configs.insert( + "iceberg.catalog.table.io.secret_access_key".to_string(), + self.secret_key.clone().to_string(), + ); + + iceberg_configs + } } pub struct IcebergSink { @@ -112,7 +169,7 @@ impl Debug for IcebergSink { impl IcebergSink { async fn create_table(&self) -> Result
{ - let catalog = load_catalog(&self.config.iceberg_configs) + let catalog = load_catalog(&self.config.build_iceberg_configs()) .await .map_err(|e| SinkError::Iceberg(anyhow!("Unable to load iceberg catalog: {e}")))?; From 7d91a2fb5cc87d11b07be78ff83f680cfa073117 Mon Sep 17 00:00:00 2001 From: Renjie Liu Date: Tue, 19 Sep 2023 19:49:39 +0800 Subject: [PATCH 4/4] fix breaking --- src/connector/src/sink/iceberg.rs | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/connector/src/sink/iceberg.rs b/src/connector/src/sink/iceberg.rs index 6ba0aa225f1f6..0238c66d43788 100644 --- a/src/connector/src/sink/iceberg.rs +++ b/src/connector/src/sink/iceberg.rs @@ -67,8 +67,10 @@ pub struct IcebergConfig { #[serde(rename = "database.name")] pub database_name: String, // Use as catalog name. + // Catalog type supported by iceberg, such as "storage", "rest". + // If not set, we use "storage" as default. #[serde(rename = "catalog.type")] - pub catalog_type: String, // Catalog type supported by iceberg, such as "storage", "rest" + pub catalog_type: Option, #[serde(rename = "warehouse.path")] pub path: Option, // Path of iceberg warehouse, only applicable in storage catalog. @@ -108,7 +110,13 @@ impl IcebergConfig { fn build_iceberg_configs(&self) -> HashMap { let mut iceberg_configs = HashMap::new(); - iceberg_configs.insert(CATALOG_TYPE.to_string(), self.catalog_type.clone()); + iceberg_configs.insert( + CATALOG_TYPE.to_string(), + self.catalog_type + .as_deref() + .unwrap_or("storage") + .to_string(), + ); iceberg_configs.insert( CATALOG_NAME.to_string(), self.database_name.clone().to_string(),