From 4bc3b28d7eac7e9c9700b76b41ce0fc5350069c1 Mon Sep 17 00:00:00 2001
From: xxchan
Date: Fri, 20 Sep 2024 13:50:39 +0800
Subject: [PATCH] refactor(iceberg): extract IcebergCommon config (#18600)

Signed-off-by: xxchan
---
 src/batch/src/executor/iceberg_scan.rs        |   7 +-
 src/connector/src/connector_common/common.rs  |  10 -
 .../iceberg/jni_catalog.rs                    |   0
 .../iceberg/mock_catalog.rs                   |   0
 .../src/connector_common/iceberg/mod.rs       | 595 ++++++++++++++++++
 .../iceberg/storage_catalog.rs                |   0
 src/connector/src/connector_common/mod.rs     |   9 +-
 .../src/connector_common/mqtt_common.rs       |   2 +-
 src/connector/src/sink/iceberg/mod.rs         | 568 ++---------------
 src/connector/src/source/iceberg/mod.rs       |  72 +--
 src/connector/src/with_options.rs             |   2 +-
 src/connector/src/with_options_test.rs        |  49 +-
 src/connector/with_options_sink.yaml          |  35 +-
 src/connector/with_options_source.yaml        |   6 +
 .../rw_catalog/rw_iceberg_files.rs            |   4 +-
 .../rw_catalog/rw_iceberg_snapshots.rs        |   4 +-
 src/frontend/src/handler/create_source.rs     |   8 +-
 .../src/optimizer/plan_node/batch_source.rs   |   2 +
 18 files changed, 737 insertions(+), 636 deletions(-)
 rename src/connector/src/{sink => connector_common}/iceberg/jni_catalog.rs (100%)
 rename src/connector/src/{sink => connector_common}/iceberg/mock_catalog.rs (100%)
 create mode 100644 src/connector/src/connector_common/iceberg/mod.rs
 rename src/connector/src/{sink => connector_common}/iceberg/storage_catalog.rs (100%)

diff --git a/src/batch/src/executor/iceberg_scan.rs b/src/batch/src/executor/iceberg_scan.rs
index 6272384435db9..8f5c625aaa3d2 100644
--- a/src/batch/src/executor/iceberg_scan.rs
+++ b/src/batch/src/executor/iceberg_scan.rs
@@ -28,7 +28,6 @@ use risingwave_common::catalog::{Field, Schema};
 use risingwave_common::row::{OwnedRow, Row};
 use risingwave_common::types::{DataType, ScalarRefImpl};
 use risingwave_common::util::iter_util::ZipEqFast;
-use risingwave_connector::sink::iceberg::IcebergConfig;
 use risingwave_connector::source::iceberg::{IcebergProperties, IcebergSplit};
 use risingwave_connector::source::{ConnectorProperties, SplitImpl, SplitMetaData};
 use risingwave_connector::WithOptionsSecResolved;
@@ -42,7 +41,7 @@ use crate::task::BatchTaskContext;
 static POSITION_DELETE_FILE_FILE_PATH_INDEX: usize = 0;
 static POSITION_DELETE_FILE_POS: usize = 1;
 pub struct IcebergScanExecutor {
-    iceberg_config: IcebergConfig,
+    iceberg_config: IcebergProperties,
     #[allow(dead_code)]
     snapshot_id: Option<i64>,
     table_meta: TableMetadata,
@@ -70,7 +69,7 @@ impl Executor for IcebergScanExecutor {
 
 impl IcebergScanExecutor {
     pub fn new(
-        iceberg_config: IcebergConfig,
+        iceberg_config: IcebergProperties,
        snapshot_id: Option<i64>,
        table_meta: TableMetadata,
        data_file_scan_tasks: Vec<FileScanTask>,
@@ -203,7 +202,7 @@ impl BoxedExecutorBuilder for IcebergScanExecutorBuilder {
        let iceberg_properties: IcebergProperties = *iceberg_properties;
        let split: IcebergSplit = split.clone();
        Ok(Box::new(IcebergScanExecutor::new(
-            iceberg_properties.to_iceberg_config(),
+            iceberg_properties,
            Some(split.snapshot_id),
            split.table_meta.deserialize(),
            split.files.into_iter().map(|x| x.deserialize()).collect(),
diff --git a/src/connector/src/connector_common/common.rs b/src/connector/src/connector_common/common.rs
index 9f4211aedd4d9..a492c2f7d2dcc 100644
--- a/src/connector/src/connector_common/common.rs
+++ b/src/connector/src/connector_common/common.rs
@@ -12,7 +12,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
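The executor hunk above can drop `to_iceberg_config()` because later hunks make the source's `IcebergProperties` and the sink's `IcebergConfig` both embed the shared `IcebergCommon` via `#[serde(flatten)]`. A minimal sketch of that serde pattern, with abbreviated stand-in structs rather than this patch's exact definitions (assumes the `serde` derive feature and `serde_json`):

use std::collections::BTreeMap;
use serde::Deserialize;

// Stand-in for the shared config; the real struct has more fields.
#[derive(Debug, Deserialize)]
struct IcebergCommon {
    #[serde(rename = "warehouse.path")]
    warehouse_path: String,
    #[serde(rename = "table.name")]
    table_name: String,
    #[serde(rename = "catalog.type")]
    catalog_type: Option<String>,
}

// Stand-in for a connector-specific config that embeds the common part.
#[derive(Debug, Deserialize)]
struct IcebergProperties {
    #[serde(flatten)]
    common: IcebergCommon,
    #[serde(rename = "catalog.jdbc.user")]
    jdbc_user: Option<String>,
}

fn main() -> serde_json::Result<()> {
    let opts: BTreeMap<String, String> = [
        ("warehouse.path", "s3://iceberg"),
        ("table.name", "public.demo_table"),
        ("catalog.jdbc.user", "admin"),
    ]
    .into_iter()
    .map(|(k, v)| (k.to_owned(), v.to_owned()))
    .collect();

    // `flatten` routes the dotted WITH-option keys into the embedded struct,
    // so source and sink configs can share one set of field definitions.
    let props: IcebergProperties = serde_json::from_value(serde_json::to_value(opts)?)?;
    assert_eq!(props.common.table_name, "public.demo_table");
    assert_eq!(props.jdbc_user.as_deref(), Some("admin"));
    println!("{props:?}");
    Ok(())
}

Because flattening keeps the external key names stable, the generated `with_options_sink.yaml` and `with_options_source.yaml` entries later in this patch change order and comments but not key spelling.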
-use std::borrow::Cow; use std::collections::BTreeMap; use std::io::Write; use std::time::Duration; @@ -38,8 +37,6 @@ use crate::deserialize_duration_from_string; use crate::error::ConnectorResult; use crate::sink::SinkError; use crate::source::nats::source::NatsOffset; -// The file describes the common abstractions for each connector and can be used in both source and -// sink. pub const PRIVATE_LINK_BROKER_REWRITE_MAP_KEY: &str = "broker.rewrite.endpoints"; pub const PRIVATE_LINK_TARGETS_KEY: &str = "privatelink.targets"; @@ -563,13 +560,6 @@ impl KinesisCommon { Ok(KinesisClient::from_conf(builder.build())) } } -#[derive(Debug, Deserialize)] -pub struct UpsertMessage<'a> { - #[serde(borrow)] - pub primary_key: Cow<'a, [u8]>, - #[serde(borrow)] - pub record: Cow<'a, [u8]>, -} #[serde_as] #[derive(Deserialize, Debug, Clone, WithOptions)] diff --git a/src/connector/src/sink/iceberg/jni_catalog.rs b/src/connector/src/connector_common/iceberg/jni_catalog.rs similarity index 100% rename from src/connector/src/sink/iceberg/jni_catalog.rs rename to src/connector/src/connector_common/iceberg/jni_catalog.rs diff --git a/src/connector/src/sink/iceberg/mock_catalog.rs b/src/connector/src/connector_common/iceberg/mock_catalog.rs similarity index 100% rename from src/connector/src/sink/iceberg/mock_catalog.rs rename to src/connector/src/connector_common/iceberg/mock_catalog.rs diff --git a/src/connector/src/connector_common/iceberg/mod.rs b/src/connector/src/connector_common/iceberg/mod.rs new file mode 100644 index 0000000000000..37d4e5e6f5a08 --- /dev/null +++ b/src/connector/src/connector_common/iceberg/mod.rs @@ -0,0 +1,595 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod jni_catalog; +mod mock_catalog; +mod storage_catalog; + +use std::collections::HashMap; +use std::sync::Arc; + +use anyhow::Context; +use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY}; +use icelake::catalog::{ + load_iceberg_base_catalog_config, BaseCatalogConfig, CATALOG_NAME, CATALOG_TYPE, +}; +use risingwave_common::bail; +use serde_derive::Deserialize; +use serde_with::serde_as; +use url::Url; +use with_options::WithOptions; + +use crate::error::ConnectorResult; + +#[serde_as] +#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)] +pub struct IcebergCommon { + // Catalog type supported by iceberg, such as "storage", "rest". + // If not set, we use "storage" as default. + #[serde(rename = "catalog.type")] + pub catalog_type: Option, + #[serde(rename = "s3.region")] + pub region: Option, + #[serde(rename = "s3.endpoint")] + pub endpoint: Option, + #[serde(rename = "s3.access.key")] + pub access_key: String, + #[serde(rename = "s3.secret.key")] + pub secret_key: String, + /// Path of iceberg warehouse, only applicable in storage catalog. + #[serde(rename = "warehouse.path")] + pub warehouse_path: String, + /// Catalog name, can be omitted for storage catalog, but + /// must be set for other catalogs. 
+ #[serde(rename = "catalog.name")] + pub catalog_name: Option, + /// URI of iceberg catalog, only applicable in rest catalog. + #[serde(rename = "catalog.uri")] + pub catalog_uri: Option, + #[serde(rename = "database.name")] + pub database_name: Option, + /// Full name of table, must include schema name. + #[serde(rename = "table.name")] + pub table_name: String, +} + +impl IcebergCommon { + pub fn catalog_type(&self) -> &str { + self.catalog_type.as_deref().unwrap_or("storage") + } + + pub fn catalog_name(&self) -> String { + self.catalog_name + .as_ref() + .map(|s| s.to_string()) + .unwrap_or_else(|| "risingwave".to_string()) + } + + /// For both V1 and V2. + fn build_jni_catalog_configs( + &self, + path_style_access: &Option, + java_catalog_props: &HashMap, + ) -> ConnectorResult<(BaseCatalogConfig, HashMap)> { + let mut iceberg_configs = HashMap::new(); + + let base_catalog_config = { + let catalog_type = self.catalog_type().to_string(); + + iceberg_configs.insert(CATALOG_TYPE.to_string(), catalog_type.clone()); + iceberg_configs.insert(CATALOG_NAME.to_string(), self.catalog_name()); + + if let Some(region) = &self.region { + // icelake + iceberg_configs.insert( + "iceberg.table.io.region".to_string(), + region.clone().to_string(), + ); + // iceberg-rust + iceberg_configs.insert( + ("iceberg.table.io.".to_string() + S3_REGION).to_string(), + region.clone().to_string(), + ); + } + + if let Some(endpoint) = &self.endpoint { + iceberg_configs.insert( + "iceberg.table.io.endpoint".to_string(), + endpoint.clone().to_string(), + ); + + // iceberg-rust + iceberg_configs.insert( + ("iceberg.table.io.".to_string() + S3_ENDPOINT).to_string(), + endpoint.clone().to_string(), + ); + } + + // icelake + iceberg_configs.insert( + "iceberg.table.io.access_key_id".to_string(), + self.access_key.clone().to_string(), + ); + iceberg_configs.insert( + "iceberg.table.io.secret_access_key".to_string(), + self.secret_key.clone().to_string(), + ); + + // iceberg-rust + iceberg_configs.insert( + ("iceberg.table.io.".to_string() + S3_ACCESS_KEY_ID).to_string(), + self.access_key.clone().to_string(), + ); + iceberg_configs.insert( + ("iceberg.table.io.".to_string() + S3_SECRET_ACCESS_KEY).to_string(), + self.secret_key.clone().to_string(), + ); + + let (bucket, _) = { + let url = Url::parse(&self.warehouse_path) + .with_context(|| format!("Invalid warehouse path: {}", self.warehouse_path))?; + let bucket = url + .host_str() + .with_context(|| { + format!( + "Invalid s3 path: {}, bucket is missing", + self.warehouse_path + ) + })? + .to_string(); + let root = url.path().trim_start_matches('/').to_string(); + (bucket, root) + }; + + iceberg_configs.insert("iceberg.table.io.bucket".to_string(), bucket); + // #TODO + // Support load config file + iceberg_configs.insert( + "iceberg.table.io.disable_config_load".to_string(), + "true".to_string(), + ); + + load_iceberg_base_catalog_config(&iceberg_configs)? 
+ }; + + // Prepare jni configs, for details please see https://iceberg.apache.org/docs/latest/aws/ + let mut java_catalog_configs = HashMap::new(); + { + if let Some(uri) = self.catalog_uri.as_deref() { + java_catalog_configs.insert("uri".to_string(), uri.to_string()); + } + + java_catalog_configs.insert("warehouse".to_string(), self.warehouse_path.clone()); + java_catalog_configs.extend(java_catalog_props.clone()); + + // Currently we only support s3, so let's set it to s3 + java_catalog_configs.insert( + "io-impl".to_string(), + "org.apache.iceberg.aws.s3.S3FileIO".to_string(), + ); + + if let Some(endpoint) = &self.endpoint { + java_catalog_configs + .insert("s3.endpoint".to_string(), endpoint.clone().to_string()); + } + + java_catalog_configs.insert( + "s3.access-key-id".to_string(), + self.access_key.clone().to_string(), + ); + java_catalog_configs.insert( + "s3.secret-access-key".to_string(), + self.secret_key.clone().to_string(), + ); + + if let Some(path_style_access) = path_style_access { + java_catalog_configs.insert( + "s3.path-style-access".to_string(), + path_style_access.to_string(), + ); + } + if matches!(self.catalog_type.as_deref(), Some("glue")) { + java_catalog_configs.insert( + "client.credentials-provider".to_string(), + "com.risingwave.connector.catalog.GlueCredentialProvider".to_string(), + ); + // Use S3 ak/sk and region as glue ak/sk and region by default. + // TODO: use different ak/sk and region for s3 and glue. + java_catalog_configs.insert( + "client.credentials-provider.glue.access-key-id".to_string(), + self.access_key.clone().to_string(), + ); + java_catalog_configs.insert( + "client.credentials-provider.glue.secret-access-key".to_string(), + self.secret_key.clone().to_string(), + ); + if let Some(region) = &self.region { + java_catalog_configs + .insert("client.region".to_string(), region.clone().to_string()); + java_catalog_configs.insert( + "glue.endpoint".to_string(), + format!("https://glue.{}.amazonaws.com", region), + ); + } + } + } + + Ok((base_catalog_config, java_catalog_configs)) + } +} + +/// icelake +mod v1 { + use icelake::catalog::{load_catalog, CatalogRef}; + use icelake::{Table, TableIdentifier}; + + use super::*; + + impl IcebergCommon { + pub fn full_table_name(&self) -> ConnectorResult { + let ret = if let Some(database_name) = &self.database_name { + TableIdentifier::new(vec![database_name, &self.table_name]) + } else { + TableIdentifier::new(vec![&self.table_name]) + }; + + Ok(ret.context("Failed to create table identifier")?) 
+ } + + fn build_iceberg_configs( + &self, + path_style_access: &Option, + ) -> ConnectorResult> { + let mut iceberg_configs = HashMap::new(); + + let catalog_type = self.catalog_type().to_string(); + + iceberg_configs.insert(CATALOG_TYPE.to_string(), catalog_type.clone()); + iceberg_configs.insert(CATALOG_NAME.to_string(), self.catalog_name()); + + match catalog_type.as_str() { + "storage" => { + iceberg_configs.insert( + format!("iceberg.catalog.{}.warehouse", self.catalog_name()), + self.warehouse_path.clone(), + ); + } + "rest" => { + let uri = self + .catalog_uri + .clone() + .with_context(|| "`catalog.uri` must be set in rest catalog".to_string())?; + iceberg_configs + .insert(format!("iceberg.catalog.{}.uri", self.catalog_name()), uri); + } + _ => { + bail!( + "Unsupported catalog type: {}, only support `storage` and `rest`", + catalog_type + ); + } + } + + if let Some(region) = &self.region { + iceberg_configs.insert( + "iceberg.table.io.region".to_string(), + region.clone().to_string(), + ); + } + + if let Some(endpoint) = &self.endpoint { + iceberg_configs.insert( + "iceberg.table.io.endpoint".to_string(), + endpoint.clone().to_string(), + ); + } + + iceberg_configs.insert( + "iceberg.table.io.access_key_id".to_string(), + self.access_key.clone().to_string(), + ); + iceberg_configs.insert( + "iceberg.table.io.secret_access_key".to_string(), + self.secret_key.clone().to_string(), + ); + if let Some(path_style_access) = path_style_access { + iceberg_configs.insert( + "iceberg.table.io.enable_virtual_host_style".to_string(), + (!path_style_access).to_string(), + ); + } + + let (bucket, root) = { + let url = Url::parse(&self.warehouse_path) + .with_context(|| format!("Invalid warehouse path: {}", self.warehouse_path))?; + let bucket = url + .host_str() + .with_context(|| { + format!( + "Invalid s3 path: {}, bucket is missing", + self.warehouse_path + ) + })? + .to_string(); + let root = url.path().trim_start_matches('/').to_string(); + (bucket, root) + }; + + iceberg_configs.insert("iceberg.table.io.bucket".to_string(), bucket); + + // Only storage catalog should set this. + if catalog_type == "storage" { + iceberg_configs.insert("iceberg.table.io.root".to_string(), root); + } + // #TODO + // Support load config file + iceberg_configs.insert( + "iceberg.table.io.disable_config_load".to_string(), + "true".to_string(), + ); + + Ok(iceberg_configs) + } + + /// TODO: remove the arguments and put them into `IcebergCommon`. Currently the handling in source and sink are different, so pass them separately to be safer. 
+ pub async fn create_catalog( + &self, + path_style_access: &Option, + java_catalog_props: &HashMap, + ) -> ConnectorResult { + match self.catalog_type() { + "storage" | "rest" => { + let iceberg_configs = self.build_iceberg_configs(path_style_access)?; + let catalog = load_catalog(&iceberg_configs).await?; + Ok(catalog) + } + catalog_type + if catalog_type == "hive" + || catalog_type == "jdbc" + || catalog_type == "glue" => + { + // Create java catalog + let (base_catalog_config, java_catalog_props) = + self.build_jni_catalog_configs(path_style_access, java_catalog_props)?; + let catalog_impl = match catalog_type { + "hive" => "org.apache.iceberg.hive.HiveCatalog", + "jdbc" => "org.apache.iceberg.jdbc.JdbcCatalog", + "glue" => "org.apache.iceberg.aws.glue.GlueCatalog", + _ => unreachable!(), + }; + + jni_catalog::JniCatalog::build_catalog( + base_catalog_config, + self.catalog_name(), + catalog_impl, + java_catalog_props, + ) + } + "mock" => Ok(Arc::new(mock_catalog::MockCatalog {})), + _ => { + bail!( + "Unsupported catalog type: {}, only support `storage`, `rest`, `hive`, `jdbc`, `glue`", + self.catalog_type() + ) + } + } + } + + /// TODO: remove the arguments and put them into `IcebergCommon`. Currently the handling in source and sink are different, so pass them separately to be safer. + pub async fn load_table( + &self, + path_style_access: &Option, + java_catalog_props: &HashMap, + ) -> ConnectorResult { + let catalog = self + .create_catalog(path_style_access, java_catalog_props) + .await + .context("Unable to load iceberg catalog")?; + + let table_id = self + .full_table_name() + .context("Unable to parse table name")?; + + catalog.load_table(&table_id).await.map_err(Into::into) + } + } +} + +/// iceberg-rust +mod v2 { + use iceberg::spec::TableMetadata; + use iceberg::table::Table as TableV2; + use iceberg::{Catalog as CatalogV2, TableIdent}; + use iceberg_catalog_glue::{AWS_ACCESS_KEY_ID, AWS_REGION_NAME, AWS_SECRET_ACCESS_KEY}; + + use super::*; + + impl IcebergCommon { + pub fn full_table_name_v2(&self) -> ConnectorResult { + let ret = if let Some(database_name) = &self.database_name { + TableIdent::from_strs(vec![database_name, &self.table_name]) + } else { + TableIdent::from_strs(vec![&self.table_name]) + }; + + Ok(ret.context("Failed to create table identifier")?) + } + + /// TODO: remove the arguments and put them into `IcebergCommon`. Currently the handling in source and sink are different, so pass them separately to be safer. 
+ pub async fn create_catalog_v2( + &self, + path_style_access: &Option, + java_catalog_props: &HashMap, + ) -> ConnectorResult> { + match self.catalog_type() { + "storage" => { + let config = storage_catalog::StorageCatalogConfig::builder() + .warehouse(self.warehouse_path.clone()) + .access_key(self.access_key.clone()) + .secret_key(self.secret_key.clone()) + .region(self.region.clone()) + .endpoint(self.endpoint.clone()) + .build(); + let catalog = storage_catalog::StorageCatalog::new(config)?; + Ok(Arc::new(catalog)) + } + "rest" => { + let mut iceberg_configs = HashMap::new(); + if let Some(region) = &self.region { + iceberg_configs.insert(S3_REGION.to_string(), region.clone().to_string()); + } + if let Some(endpoint) = &self.endpoint { + iceberg_configs + .insert(S3_ENDPOINT.to_string(), endpoint.clone().to_string()); + } + iceberg_configs.insert( + S3_ACCESS_KEY_ID.to_string(), + self.access_key.clone().to_string(), + ); + iceberg_configs.insert( + S3_SECRET_ACCESS_KEY.to_string(), + self.secret_key.clone().to_string(), + ); + let config = iceberg_catalog_rest::RestCatalogConfig::builder() + .uri(self.catalog_uri.clone().with_context(|| { + "`catalog.uri` must be set in rest catalog".to_string() + })?) + .props(iceberg_configs) + .build(); + let catalog = iceberg_catalog_rest::RestCatalog::new(config); + Ok(Arc::new(catalog)) + } + "glue" => { + let mut iceberg_configs = HashMap::new(); + // glue + if let Some(region) = &self.region { + iceberg_configs + .insert(AWS_REGION_NAME.to_string(), region.clone().to_string()); + } + iceberg_configs.insert( + AWS_ACCESS_KEY_ID.to_string(), + self.access_key.clone().to_string(), + ); + iceberg_configs.insert( + AWS_SECRET_ACCESS_KEY.to_string(), + self.secret_key.clone().to_string(), + ); + // s3 + if let Some(region) = &self.region { + iceberg_configs.insert(S3_REGION.to_string(), region.clone().to_string()); + } + if let Some(endpoint) = &self.endpoint { + iceberg_configs + .insert(S3_ENDPOINT.to_string(), endpoint.clone().to_string()); + } + iceberg_configs.insert( + S3_ACCESS_KEY_ID.to_string(), + self.access_key.clone().to_string(), + ); + iceberg_configs.insert( + S3_SECRET_ACCESS_KEY.to_string(), + self.secret_key.clone().to_string(), + ); + let config_builder = iceberg_catalog_glue::GlueCatalogConfig::builder() + .warehouse(self.warehouse_path.clone()) + .props(iceberg_configs); + let config = if let Some(uri) = self.catalog_uri.as_deref() { + config_builder.uri(uri.to_string()).build() + } else { + config_builder.build() + }; + let catalog = iceberg_catalog_glue::GlueCatalog::new(config).await?; + Ok(Arc::new(catalog)) + } + catalog_type if catalog_type == "hive" || catalog_type == "jdbc" => { + // Create java catalog + let (base_catalog_config, java_catalog_props) = + self.build_jni_catalog_configs(path_style_access, java_catalog_props)?; + let catalog_impl = match catalog_type { + "hive" => "org.apache.iceberg.hive.HiveCatalog", + "jdbc" => "org.apache.iceberg.jdbc.JdbcCatalog", + _ => unreachable!(), + }; + + jni_catalog::JniCatalog::build_catalog_v2( + base_catalog_config, + self.catalog_name(), + catalog_impl, + java_catalog_props, + ) + } + _ => { + bail!( + "Unsupported catalog type: {}, only support `storage`, `rest`, `hive`, `jdbc`, `glue`", + self.catalog_type() + ) + } + } + } + + /// TODO: remove the arguments and put them into `IcebergCommon`. Currently the handling in source and sink are different, so pass them separately to be safer. 
+        pub async fn load_table_v2(
+            &self,
+            path_style_access: &Option<bool>,
+            java_catalog_props: &HashMap<String, String>,
+        ) -> ConnectorResult<TableV2> {
+            let catalog = self
+                .create_catalog_v2(path_style_access, java_catalog_props)
+                .await
+                .context("Unable to load iceberg catalog")?;
+
+            let table_id = self
+                .full_table_name_v2()
+                .context("Unable to parse table name")?;
+
+            catalog.load_table(&table_id).await.map_err(Into::into)
+        }
+
+        pub async fn load_table_v2_with_metadata(
+            &self,
+            metadata: TableMetadata,
+            path_style_access: &Option<bool>,
+            java_catalog_props: &HashMap<String, String>,
+        ) -> ConnectorResult<TableV2> {
+            match self.catalog_type() {
+                "storage" => {
+                    let config = storage_catalog::StorageCatalogConfig::builder()
+                        .warehouse(self.warehouse_path.clone())
+                        .access_key(self.access_key.clone())
+                        .secret_key(self.secret_key.clone())
+                        .region(self.region.clone())
+                        .endpoint(self.endpoint.clone())
+                        .build();
+                    let storage_catalog = storage_catalog::StorageCatalog::new(config)?;
+
+                    let table_id = self
+                        .full_table_name_v2()
+                        .context("Unable to parse table name")?;
+
+                    Ok(iceberg::table::Table::builder()
+                        .metadata(metadata)
+                        .identifier(table_id)
+                        .file_io(storage_catalog.file_io().clone())
+                        // Only support readonly table for storage catalog now.
+                        .readonly(true)
+                        .build()?)
+                }
+                _ => {
+                    self.load_table_v2(path_style_access, java_catalog_props)
+                        .await
+                }
+            }
+        }
+    }
+}
diff --git a/src/connector/src/sink/iceberg/storage_catalog.rs b/src/connector/src/connector_common/iceberg/storage_catalog.rs
similarity index 100%
rename from src/connector/src/sink/iceberg/storage_catalog.rs
rename to src/connector/src/connector_common/iceberg/storage_catalog.rs
diff --git a/src/connector/src/connector_common/mod.rs b/src/connector/src/connector_common/mod.rs
index 4ec36ba78e0be..30d699198ccd4 100644
--- a/src/connector/src/connector_common/mod.rs
+++ b/src/connector/src/connector_common/mod.rs
@@ -12,12 +12,17 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-pub mod mqtt_common;
+//! Common parameters and utilities for both source and sink.
+
+mod mqtt_common;
 pub use mqtt_common::{MqttCommon, QualityOfService as MqttQualityOfService};
 
-pub mod common;
+mod common;
 pub use common::{
     AwsAuthProps, AwsPrivateLinkItem, KafkaCommon, KafkaPrivateLinkCommon, KinesisCommon,
     MongodbCommon, NatsCommon, PulsarCommon, PulsarOauthCommon, RdKafkaPropertiesCommon,
     PRIVATE_LINK_BROKER_REWRITE_MAP_KEY, PRIVATE_LINK_TARGETS_KEY,
 };
+
+mod iceberg;
+pub use iceberg::IcebergCommon;
diff --git a/src/connector/src/connector_common/mqtt_common.rs b/src/connector/src/connector_common/mqtt_common.rs
index c883e459a49e8..12b3123da237b 100644
--- a/src/connector/src/connector_common/mqtt_common.rs
+++ b/src/connector/src/connector_common/mqtt_common.rs
@@ -21,7 +21,7 @@ use serde_with::{serde_as, DisplayFromStr};
 use strum_macros::{Display, EnumString};
 use with_options::WithOptions;
 
-use crate::connector_common::common::{load_certs, load_private_key};
+use super::common::{load_certs, load_private_key};
 use crate::deserialize_bool_from_string;
 use crate::error::ConnectorResult;
 
diff --git a/src/connector/src/sink/iceberg/mod.rs b/src/connector/src/sink/iceberg/mod.rs
index 9e87694539f0c..a7d212e512fe5 100644
--- a/src/connector/src/sink/iceberg/mod.rs
+++ b/src/connector/src/sink/iceberg/mod.rs
@@ -12,10 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
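Both `build_jni_catalog_configs` and the v1 `build_iceberg_configs` in the new `IcebergCommon` derive the S3 bucket (and, for storage catalogs, the root prefix) from `warehouse.path` with `url::Url`. A standalone sketch of that split, assuming an `s3://bucket/prefix`-style URL and the `url` and `anyhow` crates:

use anyhow::Context;
use url::Url;

// Mirrors the bucket/root split in `IcebergCommon`: the URL host is the S3
// bucket; the remaining path, without its leading '/', is the warehouse root.
fn split_warehouse_path(path: &str) -> anyhow::Result<(String, String)> {
    let url = Url::parse(path).with_context(|| format!("Invalid warehouse path: {path}"))?;
    let bucket = url
        .host_str()
        .with_context(|| format!("Invalid s3 path: {path}, bucket is missing"))?
        .to_string();
    let root = url.path().trim_start_matches('/').to_string();
    Ok((bucket, root))
}

fn main() -> anyhow::Result<()> {
    let (bucket, root) = split_warehouse_path("s3://iceberg/warehouse")?;
    assert_eq!((bucket.as_str(), root.as_str()), ("iceberg", "warehouse"));
    Ok(())
}

For `s3://iceberg/warehouse` this yields bucket `iceberg` and root `warehouse`; per the comment in `build_iceberg_configs`, only the storage catalog consumes the root (`iceberg.table.io.root`).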
-mod jni_catalog; -mod mock_catalog; mod prometheus; -mod storage_catalog; use std::collections::{BTreeMap, HashMap}; use std::fmt::Debug; @@ -28,15 +25,8 @@ use arrow_schema_iceberg::{ DataType as ArrowDataType, Field as ArrowField, Fields, Schema as ArrowSchema, SchemaRef, }; use async_trait::async_trait; -use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY}; -use iceberg::spec::TableMetadata; -use iceberg::table::Table as TableV2; use iceberg::{Catalog as CatalogV2, NamespaceIdent, TableCreation, TableIdent}; -use iceberg_catalog_glue::{AWS_ACCESS_KEY_ID, AWS_REGION_NAME, AWS_SECRET_ACCESS_KEY}; -use icelake::catalog::{ - load_catalog, load_iceberg_base_catalog_config, BaseCatalogConfig, CatalogRef, CATALOG_NAME, - CATALOG_TYPE, -}; +use icelake::catalog::CatalogRef; use icelake::io_v2::input_wrapper::{DeltaWriter, RecordBatchWriter}; use icelake::io_v2::prometheus::{PrometheusWriterBuilder, WriterMetrics}; use icelake::io_v2::{ @@ -44,7 +34,7 @@ use icelake::io_v2::{ }; use icelake::transaction::Transaction; use icelake::types::{data_file_from_json, data_file_to_json, Any, DataFile}; -use icelake::{Table, TableIdentifier}; +use icelake::Table; use itertools::Itertools; use risingwave_common::array::arrow::{IcebergArrowConvert, IcebergCreateTableArrowConvert}; use risingwave_common::array::{Op, StreamChunk}; @@ -56,12 +46,9 @@ use risingwave_pb::connector_service::sink_metadata::SerializedMetadata; use risingwave_pb::connector_service::SinkMetadata; use serde_derive::Deserialize; use serde_with::{serde_as, DisplayFromStr}; -use storage_catalog::StorageCatalogConfig; use thiserror_ext::AsReport; -use url::Url; use with_options::WithOptions; -use self::mock_catalog::MockCatalog; use self::prometheus::monitored_base_file_writer::MonitoredBaseFileWriterBuilder; use self::prometheus::monitored_position_delete_writer::MonitoredPositionDeleteWriterBuilder; use super::decouple_checkpoint_log_sink::{ @@ -70,7 +57,7 @@ use super::decouple_checkpoint_log_sink::{ use super::{ Sink, SinkError, SinkWriterParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, }; -use crate::error::ConnectorResult; +use crate::connector_common::IcebergCommon; use crate::sink::coordinate::CoordinatedSinkWriter; use crate::sink::writer::SinkWriter; use crate::sink::{Result, SinkCommitCoordinator, SinkParam}; @@ -79,53 +66,18 @@ use crate::{ deserialize_optional_string_seq_from_string, }; -/// This iceberg sink is WIP. When it ready, we will change this name to "iceberg". pub const ICEBERG_SINK: &str = "iceberg"; #[serde_as] -#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions, Default)] +#[derive(Debug, Clone, PartialEq, Eq, Deserialize, WithOptions)] pub struct IcebergConfig { - pub connector: String, // Avoid deny unknown field. Must be "iceberg" - pub r#type: String, // accept "append-only" or "upsert" #[serde(default, deserialize_with = "deserialize_bool_from_string")] pub force_append_only: bool, - #[serde(rename = "table.name")] - pub table_name: String, // Full name of table, must include schema name - - #[serde(rename = "database.name")] - pub database_name: Option, - // Database name of table - - // Catalog name, can be omitted for storage catalog, but - // must be set for other catalogs. - #[serde(rename = "catalog.name")] - pub catalog_name: Option, - - // Catalog type supported by iceberg, such as "storage", "rest". - // If not set, we use "storage" as default. 
- #[serde(rename = "catalog.type")] - pub catalog_type: Option, - - #[serde(rename = "warehouse.path")] - pub path: String, // Path of iceberg warehouse, only applicable in storage catalog. - - #[serde(rename = "catalog.uri")] - pub uri: Option, // URI of iceberg catalog, only applicable in rest catalog. - - #[serde(rename = "s3.region")] - pub region: Option, - - #[serde(rename = "s3.endpoint")] - pub endpoint: Option, - - #[serde(rename = "s3.access.key")] - pub access_key: String, - - #[serde(rename = "s3.secret.key")] - pub secret_key: String, + #[serde(flatten)] + common: IcebergCommon, #[serde( rename = "s3.path.style.access", @@ -185,7 +137,9 @@ impl IcebergConfig { } } - if config.catalog_name.is_none() && config.catalog_type.as_deref() != Some("storage") { + if config.common.catalog_name.is_none() + && config.common.catalog_type.as_deref() != Some("storage") + { return Err(SinkError::Config(anyhow!( "catalog.name must be set for non-storage catalog" ))); @@ -213,466 +167,32 @@ impl IcebergConfig { } pub fn catalog_type(&self) -> &str { - self.catalog_type.as_deref().unwrap_or("storage") - } - - fn catalog_name(&self) -> String { - self.catalog_name - .as_ref() - .map(|s| s.to_string()) - .unwrap_or_else(|| "risingwave".to_string()) - } - - fn full_table_name(&self) -> Result { - let ret = if let Some(database_name) = &self.database_name { - TableIdentifier::new(vec![database_name, &self.table_name]) - } else { - TableIdentifier::new(vec![&self.table_name]) - }; - - ret.context("Failed to create table identifier") - .map_err(|e| SinkError::Iceberg(anyhow!(e))) - } - - fn build_iceberg_configs(&self) -> Result> { - let mut iceberg_configs = HashMap::new(); - - let catalog_type = self.catalog_type().to_string(); - - iceberg_configs.insert(CATALOG_TYPE.to_string(), catalog_type.clone()); - iceberg_configs.insert(CATALOG_NAME.to_string(), self.catalog_name()); - - match catalog_type.as_str() { - "storage" => { - iceberg_configs.insert( - format!("iceberg.catalog.{}.warehouse", self.catalog_name()), - self.path.clone(), - ); - } - "rest" => { - let uri = self.uri.clone().ok_or_else(|| { - SinkError::Iceberg(anyhow!("`catalog.uri` must be set in rest catalog")) - })?; - iceberg_configs.insert(format!("iceberg.catalog.{}.uri", self.catalog_name()), uri); - } - _ => { - return Err(SinkError::Iceberg(anyhow!( - "Unsupported catalog type: {}, only support `storage` and `rest`", - catalog_type - ))); - } - } - - if let Some(region) = &self.region { - iceberg_configs.insert( - "iceberg.table.io.region".to_string(), - region.clone().to_string(), - ); - } - - if let Some(endpoint) = &self.endpoint { - iceberg_configs.insert( - "iceberg.table.io.endpoint".to_string(), - endpoint.clone().to_string(), - ); - } - - iceberg_configs.insert( - "iceberg.table.io.access_key_id".to_string(), - self.access_key.clone().to_string(), - ); - iceberg_configs.insert( - "iceberg.table.io.secret_access_key".to_string(), - self.secret_key.clone().to_string(), - ); - if let Some(path_style_access) = self.path_style_access { - iceberg_configs.insert( - "iceberg.table.io.enable_virtual_host_style".to_string(), - (!path_style_access).to_string(), - ); - } - - let (bucket, root) = { - let url = Url::parse(&self.path).map_err(|e| SinkError::Iceberg(anyhow!(e)))?; - let bucket = url - .host_str() - .ok_or_else(|| { - SinkError::Iceberg(anyhow!("Invalid s3 path: {}, bucket is missing", self.path)) - })? 
- .to_string(); - let root = url.path().trim_start_matches('/').to_string(); - (bucket, root) - }; - - iceberg_configs.insert("iceberg.table.io.bucket".to_string(), bucket); - - // Only storage catalog should set this. - if catalog_type == "storage" { - iceberg_configs.insert("iceberg.table.io.root".to_string(), root); - } - // #TODO - // Support load config file - iceberg_configs.insert( - "iceberg.table.io.disable_config_load".to_string(), - "true".to_string(), - ); - - Ok(iceberg_configs) + self.common.catalog_type() } - fn build_jni_catalog_configs(&self) -> Result<(BaseCatalogConfig, HashMap)> { - let mut iceberg_configs = HashMap::new(); - - let base_catalog_config = { - let catalog_type = self.catalog_type().to_string(); - - iceberg_configs.insert(CATALOG_TYPE.to_string(), catalog_type.clone()); - iceberg_configs.insert(CATALOG_NAME.to_string(), self.catalog_name()); - - if let Some(region) = &self.region { - // icelake - iceberg_configs.insert( - "iceberg.table.io.region".to_string(), - region.clone().to_string(), - ); - // iceberg-rust - iceberg_configs.insert( - ("iceberg.table.io.".to_string() + S3_REGION).to_string(), - region.clone().to_string(), - ); - } - - if let Some(endpoint) = &self.endpoint { - iceberg_configs.insert( - "iceberg.table.io.endpoint".to_string(), - endpoint.clone().to_string(), - ); - - // iceberg-rust - iceberg_configs.insert( - ("iceberg.table.io.".to_string() + S3_ENDPOINT).to_string(), - endpoint.clone().to_string(), - ); - } - - // icelake - iceberg_configs.insert( - "iceberg.table.io.access_key_id".to_string(), - self.access_key.clone().to_string(), - ); - iceberg_configs.insert( - "iceberg.table.io.secret_access_key".to_string(), - self.secret_key.clone().to_string(), - ); - - // iceberg-rust - iceberg_configs.insert( - ("iceberg.table.io.".to_string() + S3_ACCESS_KEY_ID).to_string(), - self.access_key.clone().to_string(), - ); - iceberg_configs.insert( - ("iceberg.table.io.".to_string() + S3_SECRET_ACCESS_KEY).to_string(), - self.secret_key.clone().to_string(), - ); - - let (bucket, _) = { - let url = Url::parse(&self.path).map_err(|e| SinkError::Iceberg(anyhow!(e)))?; - let bucket = url - .host_str() - .ok_or_else(|| { - SinkError::Iceberg(anyhow!( - "Invalid s3 path: {}, bucket is missing", - self.path - )) - })? - .to_string(); - let root = url.path().trim_start_matches('/').to_string(); - (bucket, root) - }; - - iceberg_configs.insert("iceberg.table.io.bucket".to_string(), bucket); - // #TODO - // Support load config file - iceberg_configs.insert( - "iceberg.table.io.disable_config_load".to_string(), - "true".to_string(), - ); - - load_iceberg_base_catalog_config(&iceberg_configs)? 
- }; - - // Prepare jni configs, for details please see https://iceberg.apache.org/docs/latest/aws/ - let mut java_catalog_configs = HashMap::new(); - { - if let Some(uri) = self.uri.as_deref() { - java_catalog_configs.insert("uri".to_string(), uri.to_string()); - } - - java_catalog_configs.insert("warehouse".to_string(), self.path.clone()); - java_catalog_configs.extend(self.java_catalog_props.clone()); - - // Currently we only support s3, so let's set it to s3 - java_catalog_configs.insert( - "io-impl".to_string(), - "org.apache.iceberg.aws.s3.S3FileIO".to_string(), - ); - - if let Some(endpoint) = &self.endpoint { - java_catalog_configs - .insert("s3.endpoint".to_string(), endpoint.clone().to_string()); - } - - java_catalog_configs.insert( - "s3.access-key-id".to_string(), - self.access_key.clone().to_string(), - ); - java_catalog_configs.insert( - "s3.secret-access-key".to_string(), - self.secret_key.clone().to_string(), - ); - - if let Some(path_style_access) = self.path_style_access { - java_catalog_configs.insert( - "s3.path-style-access".to_string(), - path_style_access.to_string(), - ); - } - if matches!(self.catalog_type.as_deref(), Some("glue")) { - java_catalog_configs.insert( - "client.credentials-provider".to_string(), - "com.risingwave.connector.catalog.GlueCredentialProvider".to_string(), - ); - // Use S3 ak/sk and region as glue ak/sk and region by default. - // TODO: use different ak/sk and region for s3 and glue. - java_catalog_configs.insert( - "client.credentials-provider.glue.access-key-id".to_string(), - self.access_key.clone().to_string(), - ); - java_catalog_configs.insert( - "client.credentials-provider.glue.secret-access-key".to_string(), - self.secret_key.clone().to_string(), - ); - if let Some(region) = &self.region { - java_catalog_configs - .insert("client.region".to_string(), region.clone().to_string()); - java_catalog_configs.insert( - "glue.endpoint".to_string(), - format!("https://glue.{}.amazonaws.com", region), - ); - } - } - } - - Ok((base_catalog_config, java_catalog_configs)) - } - - async fn create_catalog(&self) -> ConnectorResult { - match self.catalog_type() { - "storage" | "rest" => { - let iceberg_configs = self.build_iceberg_configs()?; - let catalog = load_catalog(&iceberg_configs).await?; - Ok(catalog) - } - catalog_type - if catalog_type == "hive" || catalog_type == "jdbc" || catalog_type == "glue" => - { - // Create java catalog - let (base_catalog_config, java_catalog_props) = self.build_jni_catalog_configs()?; - let catalog_impl = match catalog_type { - "hive" => "org.apache.iceberg.hive.HiveCatalog", - "jdbc" => "org.apache.iceberg.jdbc.JdbcCatalog", - "glue" => "org.apache.iceberg.aws.glue.GlueCatalog", - _ => unreachable!(), - }; - - jni_catalog::JniCatalog::build_catalog( - base_catalog_config, - self.catalog_name(), - catalog_impl, - java_catalog_props, - ) - } - "mock" => Ok(Arc::new(MockCatalog {})), - _ => { - bail!( - "Unsupported catalog type: {}, only support `storage`, `rest`, `hive`, `jdbc`, `glue`", - self.catalog_type() - ) - } - } - } - - pub async fn load_table(&self) -> ConnectorResult
{ - let catalog = self - .create_catalog() + pub async fn create_catalog(&self) -> Result { + self.common + .create_catalog(&self.path_style_access, &self.java_catalog_props) .await - .context("Unable to load iceberg catalog")?; - - let table_id = self - .full_table_name() - .context("Unable to parse table name")?; - - catalog.load_table(&table_id).await.map_err(Into::into) + .map_err(Into::into) } -} -impl IcebergConfig { - fn full_table_name_v2(&self) -> Result { - let ret = if let Some(database_name) = &self.database_name { - TableIdent::from_strs(vec![database_name, &self.table_name]) - } else { - TableIdent::from_strs(vec![&self.table_name]) - }; - - ret.context("Failed to create table identifier") - .map_err(|e| SinkError::Iceberg(anyhow!(e))) - } - - async fn create_catalog_v2(&self) -> ConnectorResult> { - match self.catalog_type() { - "storage" => { - let config = StorageCatalogConfig::builder() - .warehouse(self.path.clone()) - .access_key(self.access_key.clone()) - .secret_key(self.secret_key.clone()) - .region(self.region.clone()) - .endpoint(self.endpoint.clone()) - .build(); - let catalog = storage_catalog::StorageCatalog::new(config)?; - Ok(Arc::new(catalog)) - } - "rest" => { - let mut iceberg_configs = HashMap::new(); - if let Some(region) = &self.region { - iceberg_configs.insert(S3_REGION.to_string(), region.clone().to_string()); - } - if let Some(endpoint) = &self.endpoint { - iceberg_configs.insert(S3_ENDPOINT.to_string(), endpoint.clone().to_string()); - } - iceberg_configs.insert( - S3_ACCESS_KEY_ID.to_string(), - self.access_key.clone().to_string(), - ); - iceberg_configs.insert( - S3_SECRET_ACCESS_KEY.to_string(), - self.secret_key.clone().to_string(), - ); - let config = iceberg_catalog_rest::RestCatalogConfig::builder() - .uri(self.uri.clone().ok_or_else(|| { - SinkError::Iceberg(anyhow!("`catalog.uri` must be set in rest catalog")) - })?) 
- .props(iceberg_configs) - .build(); - let catalog = iceberg_catalog_rest::RestCatalog::new(config); - Ok(Arc::new(catalog)) - } - "glue" => { - let mut iceberg_configs = HashMap::new(); - // glue - if let Some(region) = &self.region { - iceberg_configs.insert(AWS_REGION_NAME.to_string(), region.clone().to_string()); - } - iceberg_configs.insert( - AWS_ACCESS_KEY_ID.to_string(), - self.access_key.clone().to_string(), - ); - iceberg_configs.insert( - AWS_SECRET_ACCESS_KEY.to_string(), - self.secret_key.clone().to_string(), - ); - // s3 - if let Some(region) = &self.region { - iceberg_configs.insert(S3_REGION.to_string(), region.clone().to_string()); - } - if let Some(endpoint) = &self.endpoint { - iceberg_configs.insert(S3_ENDPOINT.to_string(), endpoint.clone().to_string()); - } - iceberg_configs.insert( - S3_ACCESS_KEY_ID.to_string(), - self.access_key.clone().to_string(), - ); - iceberg_configs.insert( - S3_SECRET_ACCESS_KEY.to_string(), - self.secret_key.clone().to_string(), - ); - let config_builder = iceberg_catalog_glue::GlueCatalogConfig::builder() - .warehouse(self.path.clone()) - .props(iceberg_configs); - let config = if let Some(uri) = self.uri.as_deref() { - config_builder.uri(uri.to_string()).build() - } else { - config_builder.build() - }; - let catalog = iceberg_catalog_glue::GlueCatalog::new(config).await?; - Ok(Arc::new(catalog)) - } - catalog_type if catalog_type == "hive" || catalog_type == "jdbc" => { - // Create java catalog - let (base_catalog_config, java_catalog_props) = self.build_jni_catalog_configs()?; - let catalog_impl = match catalog_type { - "hive" => "org.apache.iceberg.hive.HiveCatalog", - "jdbc" => "org.apache.iceberg.jdbc.JdbcCatalog", - _ => unreachable!(), - }; - - jni_catalog::JniCatalog::build_catalog_v2( - base_catalog_config, - self.catalog_name(), - catalog_impl, - java_catalog_props, - ) - } - _ => { - bail!( - "Unsupported catalog type: {}, only support `storage`, `rest`, `hive`, `jdbc`, `glue`", - self.catalog_type() - ) - } - } + pub async fn load_table(&self) -> Result
{ + self.common + .load_table(&self.path_style_access, &self.java_catalog_props) + .await + .map_err(Into::into) } - pub async fn load_table_v2(&self) -> ConnectorResult { - let catalog = self - .create_catalog_v2() + pub async fn create_catalog_v2(&self) -> Result> { + self.common + .create_catalog_v2(&self.path_style_access, &self.java_catalog_props) .await - .context("Unable to load iceberg catalog")?; - - let table_id = self - .full_table_name_v2() - .context("Unable to parse table name")?; - - catalog.load_table(&table_id).await.map_err(Into::into) + .map_err(Into::into) } - pub async fn load_table_v2_with_metadata( - &self, - metadata: TableMetadata, - ) -> ConnectorResult { - match self.catalog_type() { - "storage" => { - let config = StorageCatalogConfig::builder() - .warehouse(self.path.clone()) - .access_key(self.access_key.clone()) - .secret_key(self.secret_key.clone()) - .region(self.region.clone()) - .endpoint(self.endpoint.clone()) - .build(); - let storage_catalog = storage_catalog::StorageCatalog::new(config)?; - - let table_id = self - .full_table_name_v2() - .context("Unable to parse table name")?; - - Ok(iceberg::table::Table::builder() - .metadata(metadata) - .identifier(table_id) - .file_io(storage_catalog.file_io().clone()) - // Only support readonly table for storage catalog now. - .readonly(true) - .build()?) - } - _ => self.load_table_v2().await, - } + pub fn full_table_name_v2(&self) -> Result { + self.common.full_table_name_v2().map_err(Into::into) } } @@ -738,7 +258,7 @@ impl IcebergSink { .await .map_err(|e| SinkError::Iceberg(anyhow!(e)))? { - let namespace = if let Some(database_name) = &self.config.database_name { + let namespace = if let Some(database_name) = &self.config.common.database_name { NamespaceIdent::new(database_name.clone()) } else { bail!("database name must be set if you want to create table") @@ -767,16 +287,16 @@ impl IcebergSink { let location = { let mut names = namespace.clone().inner(); - names.push(self.config.table_name.to_string()); - if self.config.path.ends_with('/') { - format!("{}{}", self.config.path, names.join("/")) + names.push(self.config.common.table_name.to_string()); + if self.config.common.warehouse_path.ends_with('/') { + format!("{}{}", self.config.common.warehouse_path, names.join("/")) } else { - format!("{}/{}", self.config.path, names.join("/")) + format!("{}/{}", self.config.common.warehouse_path, names.join("/")) } }; let table_creation = TableCreation::builder() - .name(self.config.table_name.clone()) + .name(self.config.common.table_name.clone()) .schema(iceberg_schema) .location(location) .build(); @@ -1361,6 +881,7 @@ mod test { use risingwave_common::catalog::Field; + use crate::connector_common::IcebergCommon; use crate::sink::decouple_checkpoint_log_sink::DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE; use crate::sink::iceberg::IcebergConfig; use crate::source::DataType; @@ -1425,19 +946,20 @@ mod test { let iceberg_config = IcebergConfig::from_btreemap(values).unwrap(); let expected_iceberg_config = IcebergConfig { - connector: "iceberg".to_string(), + common: IcebergCommon { + warehouse_path: "s3://iceberg".to_string(), + catalog_uri: Some("jdbc://postgresql://postgres:5432/iceberg".to_string()), + region: Some("us-east-1".to_string()), + endpoint: Some("http://127.0.0.1:9301".to_string()), + access_key: "hummockadmin".to_string(), + secret_key: "hummockadmin".to_string(), + catalog_type: Some("jdbc".to_string()), + catalog_name: Some("demo".to_string()), + database_name: 
Some("demo_db".to_string()), + table_name: "demo_table".to_string(), + }, r#type: "upsert".to_string(), force_append_only: false, - table_name: "demo_table".to_string(), - database_name: Some("demo_db".to_string()), - catalog_name: Some("demo".to_string()), - catalog_type: Some("jdbc".to_string()), - path: "s3://iceberg".to_string(), - uri: Some("jdbc://postgresql://postgres:5432/iceberg".to_string()), - region: Some("us-east-1".to_string()), - endpoint: Some("http://127.0.0.1:9301".to_string()), - access_key: "hummockadmin".to_string(), - secret_key: "hummockadmin".to_string(), path_style_access: Some(true), primary_key: Some(vec!["v1".to_string()]), java_catalog_props: [("jdbc.user", "admin"), ("jdbc.password", "123456")] @@ -1451,7 +973,7 @@ mod test { assert_eq!(iceberg_config, expected_iceberg_config); assert_eq!( - &iceberg_config.full_table_name().unwrap().to_string(), + &iceberg_config.common.full_table_name().unwrap().to_string(), "demo_db.demo_table" ); } diff --git a/src/connector/src/source/iceberg/mod.rs b/src/connector/src/source/iceberg/mod.rs index 0b271d7e4875a..cdeab00649187 100644 --- a/src/connector/src/source/iceberg/mod.rs +++ b/src/connector/src/source/iceberg/mod.rs @@ -29,9 +29,9 @@ use risingwave_common::catalog::Schema; use risingwave_common::types::JsonbVal; use serde::{Deserialize, Serialize}; +use crate::connector_common::IcebergCommon; use crate::error::{ConnectorError, ConnectorResult}; use crate::parser::ParserConfig; -use crate::sink::iceberg::IcebergConfig; use crate::source::{ BoxChunkSourceStream, Column, SourceContextRef, SourceEnumeratorContextRef, SourceProperties, SplitEnumerator, SplitId, SplitMetaData, SplitReader, UnknownFields, @@ -39,30 +39,11 @@ use crate::source::{ pub const ICEBERG_CONNECTOR: &str = "iceberg"; -#[derive(Clone, Debug, Deserialize, PartialEq, with_options::WithOptions)] +#[derive(Clone, Debug, Deserialize, with_options::WithOptions)] pub struct IcebergProperties { - #[serde(rename = "catalog.type")] - pub catalog_type: Option, - #[serde(rename = "s3.region")] - pub region: Option, - #[serde(rename = "s3.endpoint")] - pub endpoint: Option, - #[serde(rename = "s3.access.key")] - pub s3_access: String, - #[serde(rename = "s3.secret.key")] - pub s3_secret: String, - #[serde(rename = "warehouse.path")] - pub warehouse_path: String, - // Catalog name, can be omitted for storage catalog, but - // must be set for other catalogs. - #[serde(rename = "catalog.name")] - pub catalog_name: Option, - #[serde(rename = "catalog.uri")] - pub catalog_uri: Option, // URI of iceberg catalog, only applicable in rest catalog. 
- #[serde(rename = "database.name")] - pub database_name: Option, - #[serde(rename = "table.name")] - pub table_name: String, + #[serde(flatten)] + pub common: IcebergCommon, + // For jdbc catalog #[serde(rename = "catalog.jdbc.user")] pub jdbc_user: Option, @@ -73,8 +54,10 @@ pub struct IcebergProperties { pub unknown_fields: HashMap, } +use iceberg::table::Table as TableV2; + impl IcebergProperties { - pub fn to_iceberg_config(&self) -> IcebergConfig { + pub async fn load_table_v2(&self) -> ConnectorResult { let mut java_catalog_props = HashMap::new(); if let Some(jdbc_user) = self.jdbc_user.clone() { java_catalog_props.insert("jdbc.user".to_string(), jdbc_user); @@ -82,20 +65,25 @@ impl IcebergProperties { if let Some(jdbc_password) = self.jdbc_password.clone() { java_catalog_props.insert("jdbc.password".to_string(), jdbc_password); } - IcebergConfig { - catalog_name: self.catalog_name.clone(), - database_name: self.database_name.clone(), - table_name: self.table_name.clone(), - catalog_type: self.catalog_type.clone(), - uri: self.catalog_uri.clone(), - path: self.warehouse_path.clone(), - endpoint: self.endpoint.clone(), - access_key: self.s3_access.clone(), - secret_key: self.s3_secret.clone(), - region: self.region.clone(), - java_catalog_props, - ..Default::default() + // TODO: support path_style_access and java_catalog_props for iceberg source + self.common.load_table_v2(&None, &java_catalog_props).await + } + + pub async fn load_table_v2_with_metadata( + &self, + table_meta: TableMetadata, + ) -> ConnectorResult { + let mut java_catalog_props = HashMap::new(); + if let Some(jdbc_user) = self.jdbc_user.clone() { + java_catalog_props.insert("jdbc.user".to_string(), jdbc_user); + } + if let Some(jdbc_password) = self.jdbc_password.clone() { + java_catalog_props.insert("jdbc.password".to_string(), jdbc_password); } + // TODO: support path_style_access and java_catalog_props for iceberg source + self.common + .load_table_v2_with_metadata(table_meta, &None, &java_catalog_props) + .await } } @@ -169,7 +157,7 @@ impl SplitMetaData for IcebergSplit { #[derive(Debug, Clone)] pub struct IcebergSplitEnumerator { - config: IcebergConfig, + config: IcebergProperties, } #[async_trait] @@ -181,10 +169,7 @@ impl SplitEnumerator for IcebergSplitEnumerator { properties: Self::Properties, _context: SourceEnumeratorContextRef, ) -> ConnectorResult { - let iceberg_config = properties.to_iceberg_config(); - Ok(Self { - config: iceberg_config, - }) + Ok(Self { config: properties }) } async fn list_splits(&mut self) -> ConnectorResult> { @@ -208,6 +193,7 @@ impl IcebergSplitEnumerator { if batch_parallelism == 0 { bail!("Batch parallelism is 0. 
Cannot split the iceberg files."); } + let table = self.config.load_table_v2().await?; let current_snapshot = table.metadata().current_snapshot(); diff --git a/src/connector/src/with_options.rs b/src/connector/src/with_options.rs index 065c9394b8a49..7476f5a10789c 100644 --- a/src/connector/src/with_options.rs +++ b/src/connector/src/with_options.rs @@ -66,7 +66,7 @@ impl WithOptions for i32 {} impl WithOptions for i64 {} impl WithOptions for f64 {} impl WithOptions for std::time::Duration {} -impl WithOptions for crate::connector_common::mqtt_common::QualityOfService {} +impl WithOptions for crate::connector_common::MqttQualityOfService {} impl WithOptions for crate::sink::kafka::CompressionCodec {} impl WithOptions for crate::source::filesystem::file_common::CompressionFormat {} impl WithOptions for nexmark::config::RateShape {} diff --git a/src/connector/src/with_options_test.rs b/src/connector/src/with_options_test.rs index 4eb4b52fe070d..61862a7e572dc 100644 --- a/src/connector/src/with_options_test.rs +++ b/src/connector/src/with_options_test.rs @@ -20,40 +20,42 @@ use itertools::Itertools; use quote::ToTokens; use serde::Serialize; use syn::{parse_file, Attribute, Field, Item, ItemFn, Lit, Meta, MetaNameValue, NestedMeta, Type}; +use walkdir::{DirEntry, WalkDir}; fn connector_crate_path() -> PathBuf { let connector_crate_path = env::var("CARGO_MANIFEST_DIR").unwrap(); Path::new(&connector_crate_path).to_path_buf() } -fn source_mod_path() -> PathBuf { - connector_crate_path().join("src").join("source") -} - -fn sink_mod_path() -> PathBuf { - connector_crate_path().join("src").join("sink") -} - -fn common_mod_path() -> PathBuf { - connector_crate_path() - .join("src") - .join("connector_common") - .join("common.rs") -} - -fn mqtt_common_mod_path() -> PathBuf { - connector_crate_path() - .join("src") - .join("connector_common") - .join("mqtt_common.rs") +fn common_files() -> impl IntoIterator> { + WalkDir::new( + connector_crate_path() + .join("src") + .join("connector_common") + .join("common.rs"), + ) + .into_iter() + .chain(WalkDir::new( + connector_crate_path() + .join("src") + .join("connector_common") + .join("mqtt_common.rs"), + )) + .chain(WalkDir::new( + connector_crate_path() + .join("src") + .join("connector_common") + .join("iceberg") + .join("mod.rs"), + )) } pub fn generate_with_options_yaml_source() -> String { - generate_with_options_yaml_inner(&source_mod_path()) + generate_with_options_yaml_inner(&connector_crate_path().join("src").join("source")) } pub fn generate_with_options_yaml_sink() -> String { - generate_with_options_yaml_inner(&sink_mod_path()) + generate_with_options_yaml_inner(&connector_crate_path().join("src").join("sink")) } /// Collect all structs with `#[derive(WithOptions)]` in the `.rs` files in `path` (plus `common.rs`), @@ -72,8 +74,7 @@ fn generate_with_options_yaml_inner(path: &Path) -> String { // Recursively list all the .rs files for entry in walkdir::WalkDir::new(path) .into_iter() - .chain(walkdir::WalkDir::new(common_mod_path())) - .chain(walkdir::WalkDir::new(mqtt_common_mod_path())) + .chain(common_files()) { let entry = entry.expect("Failed to read directory entry"); if entry.path().extension() == Some("rs".as_ref()) { diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml index 72e800e82a48b..347fa10f5ac92 100644 --- a/src/connector/with_options_sink.yaml +++ b/src/connector/with_options_sink.yaml @@ -284,9 +284,6 @@ GooglePubSubConfig: required: false IcebergConfig: fields: - - name: connector - 
field_type: String - required: true - name: r#type field_type: String required: true @@ -294,35 +291,41 @@ IcebergConfig: field_type: bool required: false default: Default::default - - name: table.name - field_type: String - required: true - - name: database.name + - name: catalog.type field_type: String required: false - - name: catalog.name + - name: s3.region field_type: String required: false - - name: catalog.type + - name: s3.endpoint field_type: String required: false + - name: s3.access.key + field_type: String + required: true + - name: s3.secret.key + field_type: String + required: true - name: warehouse.path field_type: String + comments: Path of iceberg warehouse, only applicable in storage catalog. required: true - - name: catalog.uri + - name: catalog.name field_type: String + comments: |- + Catalog name, can be omitted for storage catalog, but + must be set for other catalogs. required: false - - name: s3.region + - name: catalog.uri field_type: String + comments: URI of iceberg catalog, only applicable in rest catalog. required: false - - name: s3.endpoint + - name: database.name field_type: String required: false - - name: s3.access.key - field_type: String - required: true - - name: s3.secret.key + - name: table.name field_type: String + comments: Full name of table, must include schema name. required: true - name: s3.path.style.access field_type: bool diff --git a/src/connector/with_options_source.yaml b/src/connector/with_options_source.yaml index 20ee03949396f..50222280e8b1b 100644 --- a/src/connector/with_options_source.yaml +++ b/src/connector/with_options_source.yaml @@ -98,18 +98,24 @@ IcebergProperties: required: true - name: warehouse.path field_type: String + comments: Path of iceberg warehouse, only applicable in storage catalog. required: true - name: catalog.name field_type: String + comments: |- + Catalog name, can be omitted for storage catalog, but + must be set for other catalogs. required: false - name: catalog.uri field_type: String + comments: URI of iceberg catalog, only applicable in rest catalog. required: false - name: database.name field_type: String required: false - name: table.name field_type: String + comments: Full name of table, must include schema name. 
required: true - name: catalog.jdbc.user field_type: String diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_iceberg_files.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_iceberg_files.rs index b025723857b1e..2e6e6af6cae32 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_iceberg_files.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_iceberg_files.rs @@ -19,7 +19,6 @@ use futures::StreamExt; use iceberg::spec::ManifestList; use iceberg::table::Table; use risingwave_common::types::Fields; -use risingwave_connector::sink::iceberg::IcebergConfig; use risingwave_connector::source::ConnectorProperties; use risingwave_connector::WithPropertiesExt; use risingwave_frontend_macro::system_catalog; @@ -81,8 +80,7 @@ async fn read(reader: &SysCatalogReaderImpl) -> Result> { for (schema_name, source) in iceberg_sources { let config = ConnectorProperties::extract(source.with_properties.clone(), false)?; if let ConnectorProperties::Iceberg(iceberg_properties) = config { - let iceberg_config: IcebergConfig = iceberg_properties.to_iceberg_config(); - let table: Table = iceberg_config.load_table_v2().await?; + let table: Table = iceberg_properties.load_table_v2().await?; if let Some(snapshot) = table.metadata().current_snapshot() { let manifest_list: ManifestList = snapshot .load_manifest_list(table.file_io(), table.metadata()) diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_iceberg_snapshots.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_iceberg_snapshots.rs index 3c60236f96e66..8ef3229128373 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_iceberg_snapshots.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_iceberg_snapshots.rs @@ -18,7 +18,6 @@ use iceberg::table::Table; use jsonbb::{Value, ValueRef}; use risingwave_common::types::{Fields, JsonbVal, Timestamptz}; use risingwave_connector::error::ConnectorResult; -use risingwave_connector::sink::iceberg::IcebergConfig; use risingwave_connector::source::ConnectorProperties; use risingwave_connector::WithPropertiesExt; use risingwave_frontend_macro::system_catalog; @@ -60,8 +59,7 @@ async fn read(reader: &SysCatalogReaderImpl) -> Result> for (schema_name, source) in iceberg_sources { let config = ConnectorProperties::extract(source.with_properties.clone(), false)?; if let ConnectorProperties::Iceberg(iceberg_properties) = config { - let iceberg_config: IcebergConfig = iceberg_properties.to_iceberg_config(); - let table: Table = iceberg_config.load_table_v2().await?; + let table: Table = iceberg_properties.load_table_v2().await?; let snapshots: ConnectorResult> = table .metadata() diff --git a/src/frontend/src/handler/create_source.rs b/src/frontend/src/handler/create_source.rs index f1535fa769b28..a3f668124e4a0 100644 --- a/src/frontend/src/handler/create_source.rs +++ b/src/frontend/src/handler/create_source.rs @@ -42,7 +42,6 @@ use risingwave_connector::schema::schema_registry::{ name_strategy_from_str, SchemaRegistryAuth, SCHEMA_REGISTRY_PASSWORD, SCHEMA_REGISTRY_USERNAME, }; use risingwave_connector::schema::AWS_GLUE_SCHEMA_ARN_KEY; -use risingwave_connector::sink::iceberg::IcebergConfig; use risingwave_connector::source::cdc::{ CDC_AUTO_SCHEMA_CHANGE_KEY, CDC_SHARING_MODE_KEY, CDC_SNAPSHOT_BACKFILL, CDC_SNAPSHOT_MODE_KEY, CDC_TRANSACTIONAL_KEY, CDC_WAIT_FOR_STREAMING_START_TIMEOUT, CITUS_CDC_CONNECTOR, @@ -1351,8 +1350,7 @@ pub async fn extract_iceberg_columns( ) -> anyhow::Result> { let props = 
        ConnectorProperties::extract(with_properties.clone(), true)?;
     if let ConnectorProperties::Iceberg(properties) = props {
-        let iceberg_config: IcebergConfig = properties.to_iceberg_config();
-        let table = iceberg_config.load_table_v2().await?;
+        let table = properties.load_table_v2().await?;
         let iceberg_schema: arrow_schema_iceberg::Schema =
             iceberg::arrow::schema_to_arrow_schema(table.metadata().current_schema())?;
@@ -1394,8 +1392,6 @@ pub async fn check_iceberg_source(
         )));
     };
 
-    let iceberg_config = properties.to_iceberg_config();
-
     let schema = Schema {
         fields: columns
             .iter()
@@ -1404,7 +1400,7 @@
             .collect(),
     };
 
-    let table = iceberg_config.load_table_v2().await?;
+    let table = properties.load_table_v2().await?;
 
     let iceberg_schema =
         iceberg::arrow::schema_to_arrow_schema(table.metadata().current_schema())?;
diff --git a/src/frontend/src/optimizer/plan_node/batch_source.rs b/src/frontend/src/optimizer/plan_node/batch_source.rs
index fd33d2dba0035..bf77196e6eb3a 100644
--- a/src/frontend/src/optimizer/plan_node/batch_source.rs
+++ b/src/frontend/src/optimizer/plan_node/batch_source.rs
@@ -30,6 +30,8 @@ use crate::optimizer::plan_node::expr_visitable::ExprVisitable;
 use crate::optimizer::property::{Distribution, Order};
 
 /// [`BatchSource`] represents a table/connector source at the very beginning of the graph.
+///
+/// For supported batch connectors, see [`crate::scheduler::plan_fragmenter::SourceScanInfo`].
 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
 pub struct BatchSource {
     pub base: PlanBase<Batch>,
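After this patch, `extract_iceberg_columns` goes `IcebergProperties::load_table_v2` to `iceberg::arrow::schema_to_arrow_schema` to one column per Arrow field. A trimmed sketch of that final schema walk, using the plain `arrow-schema` crate (the patch uses the `arrow_schema_iceberg` re-export) and a toy column type; the real code maps types through `IcebergArrowConvert` into `ColumnDesc`/`ColumnCatalog`:

use arrow_schema::{DataType, Field, Schema};

// Toy stand-in for RisingWave's `ColumnDesc`; the real code converts Arrow
// types through `IcebergArrowConvert` instead of storing them directly.
#[derive(Debug)]
struct ColumnSketch {
    id: i32,
    name: String,
    data_type: DataType,
}

fn columns_from_arrow(schema: &Schema) -> Vec<ColumnSketch> {
    schema
        .fields()
        .iter()
        .enumerate()
        .map(|(i, field)| ColumnSketch {
            id: i as i32 + 1, // illustrative 1-based ordinal, not Iceberg's field id
            name: field.name().clone(),
            data_type: field.data_type().clone(),
        })
        .collect()
}

fn main() {
    let schema = Schema::new(vec![
        Field::new("v1", DataType::Int32, false),
        Field::new("v2", DataType::Utf8, true),
    ]);
    let cols = columns_from_arrow(&schema);
    assert_eq!(cols.len(), 2);
    assert_eq!(cols[1].name, "v2");
    println!("{cols:?}");
}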