diff --git a/e2e_test/iceberg/test_case/iceberg_select_empty_table.slt b/e2e_test/iceberg/test_case/iceberg_select_empty_table.slt index 832a7b781f7fb..ddfc07220f2b3 100644 --- a/e2e_test/iceberg/test_case/iceberg_select_empty_table.slt +++ b/e2e_test/iceberg/test_case/iceberg_select_empty_table.slt @@ -36,6 +36,7 @@ WITH ( s3.region = 'us-east-1', s3.access.key = 'hummockadmin', s3.secret.key = 'hummockadmin', + s3.path.style.access = 'true', catalog.type = 'storage', warehouse.path = 's3a://icebergdata/demo', database.name = 'demo_db', diff --git a/src/connector/src/connector_common/iceberg/mod.rs b/src/connector/src/connector_common/iceberg/mod.rs index 37d4e5e6f5a08..bf96e474eee80 100644 --- a/src/connector/src/connector_common/iceberg/mod.rs +++ b/src/connector/src/connector_common/iceberg/mod.rs @@ -15,7 +15,6 @@ mod jni_catalog; mod mock_catalog; mod storage_catalog; - use std::collections::HashMap; use std::sync::Arc; @@ -30,6 +29,7 @@ use serde_with::serde_as; use url::Url; use with_options::WithOptions; +use crate::deserialize_optional_bool_from_string; use crate::error::ConnectorResult; #[serde_as] @@ -62,6 +62,13 @@ pub struct IcebergCommon { /// Full name of table, must include schema name. #[serde(rename = "table.name")] pub table_name: String, + + #[serde( + rename = "s3.path.style.access", + default, + deserialize_with = "deserialize_optional_bool_from_string" + )] + pub path_style_access: Option, } impl IcebergCommon { @@ -79,7 +86,6 @@ impl IcebergCommon { /// For both V1 and V2. fn build_jni_catalog_configs( &self, - path_style_access: &Option, java_catalog_props: &HashMap, ) -> ConnectorResult<(BaseCatalogConfig, HashMap)> { let mut iceberg_configs = HashMap::new(); @@ -193,7 +199,7 @@ impl IcebergCommon { self.secret_key.clone().to_string(), ); - if let Some(path_style_access) = path_style_access { + if let Some(path_style_access) = self.path_style_access { java_catalog_configs.insert( "s3.path-style-access".to_string(), path_style_access.to_string(), @@ -247,10 +253,7 @@ mod v1 { Ok(ret.context("Failed to create table identifier")?) } - fn build_iceberg_configs( - &self, - path_style_access: &Option, - ) -> ConnectorResult> { + fn build_iceberg_configs(&self) -> ConnectorResult> { let mut iceberg_configs = HashMap::new(); let catalog_type = self.catalog_type().to_string(); @@ -303,7 +306,7 @@ mod v1 { "iceberg.table.io.secret_access_key".to_string(), self.secret_key.clone().to_string(), ); - if let Some(path_style_access) = path_style_access { + if let Some(path_style_access) = self.path_style_access { iceberg_configs.insert( "iceberg.table.io.enable_virtual_host_style".to_string(), (!path_style_access).to_string(), @@ -345,12 +348,11 @@ mod v1 { /// TODO: remove the arguments and put them into `IcebergCommon`. Currently the handling in source and sink are different, so pass them separately to be safer. pub async fn create_catalog( &self, - path_style_access: &Option, java_catalog_props: &HashMap, ) -> ConnectorResult { match self.catalog_type() { "storage" | "rest" => { - let iceberg_configs = self.build_iceberg_configs(path_style_access)?; + let iceberg_configs = self.build_iceberg_configs()?; let catalog = load_catalog(&iceberg_configs).await?; Ok(catalog) } @@ -361,7 +363,7 @@ mod v1 { { // Create java catalog let (base_catalog_config, java_catalog_props) = - self.build_jni_catalog_configs(path_style_access, java_catalog_props)?; + self.build_jni_catalog_configs(java_catalog_props)?; let catalog_impl = match catalog_type { "hive" => "org.apache.iceberg.hive.HiveCatalog", "jdbc" => "org.apache.iceberg.jdbc.JdbcCatalog", @@ -389,11 +391,10 @@ mod v1 { /// TODO: remove the arguments and put them into `IcebergCommon`. Currently the handling in source and sink are different, so pass them separately to be safer. pub async fn load_table( &self, - path_style_access: &Option, java_catalog_props: &HashMap, ) -> ConnectorResult { let catalog = self - .create_catalog(path_style_access, java_catalog_props) + .create_catalog(java_catalog_props) .await .context("Unable to load iceberg catalog")?; @@ -429,7 +430,6 @@ mod v2 { /// TODO: remove the arguments and put them into `IcebergCommon`. Currently the handling in source and sink are different, so pass them separately to be safer. pub async fn create_catalog_v2( &self, - path_style_access: &Option, java_catalog_props: &HashMap, ) -> ConnectorResult> { match self.catalog_type() { @@ -515,7 +515,7 @@ mod v2 { catalog_type if catalog_type == "hive" || catalog_type == "jdbc" => { // Create java catalog let (base_catalog_config, java_catalog_props) = - self.build_jni_catalog_configs(path_style_access, java_catalog_props)?; + self.build_jni_catalog_configs(java_catalog_props)?; let catalog_impl = match catalog_type { "hive" => "org.apache.iceberg.hive.HiveCatalog", "jdbc" => "org.apache.iceberg.jdbc.JdbcCatalog", @@ -541,11 +541,10 @@ mod v2 { /// TODO: remove the arguments and put them into `IcebergCommon`. Currently the handling in source and sink are different, so pass them separately to be safer. pub async fn load_table_v2( &self, - path_style_access: &Option, java_catalog_props: &HashMap, ) -> ConnectorResult { let catalog = self - .create_catalog_v2(path_style_access, java_catalog_props) + .create_catalog_v2(java_catalog_props) .await .context("Unable to load iceberg catalog")?; @@ -559,7 +558,6 @@ mod v2 { pub async fn load_table_v2_with_metadata( &self, metadata: TableMetadata, - path_style_access: &Option, java_catalog_props: &HashMap, ) -> ConnectorResult { match self.catalog_type() { @@ -585,10 +583,7 @@ mod v2 { .readonly(true) .build()?) } - _ => { - self.load_table_v2(path_style_access, java_catalog_props) - .await - } + _ => self.load_table_v2(java_catalog_props).await, } } } diff --git a/src/connector/src/parser/protobuf/recursive.rs b/src/connector/src/parser/protobuf/recursive.rs new file mode 100644 index 0000000000000..dc367eb5f70cd --- /dev/null +++ b/src/connector/src/parser/protobuf/recursive.rs @@ -0,0 +1,157 @@ +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ComplexRecursiveMessage { + #[prost(string, tag = "1")] + pub node_name: ::prost::alloc::string::String, + #[prost(int32, tag = "2")] + pub node_id: i32, + #[prost(message, repeated, tag = "3")] + pub attributes: ::prost::alloc::vec::Vec, + #[prost(message, optional, tag = "4")] + pub parent: ::core::option::Option, + #[prost(message, repeated, tag = "5")] + pub children: ::prost::alloc::vec::Vec, +} +/// Nested message and enum types in `ComplexRecursiveMessage`. +pub mod complex_recursive_message { + #[allow(clippy::derive_partial_eq_without_eq)] + #[derive(Clone, PartialEq, ::prost::Message)] + pub struct Attributes { + #[prost(string, tag = "1")] + pub key: ::prost::alloc::string::String, + #[prost(string, tag = "2")] + pub value: ::prost::alloc::string::String, + } + #[allow(clippy::derive_partial_eq_without_eq)] + #[derive(Clone, PartialEq, ::prost::Message)] + pub struct Parent { + #[prost(string, tag = "1")] + pub parent_name: ::prost::alloc::string::String, + #[prost(int32, tag = "2")] + pub parent_id: i32, + #[prost(message, repeated, tag = "3")] + pub siblings: ::prost::alloc::vec::Vec, + } +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct AllTypes { + /// standard types + #[prost(double, tag = "1")] + pub double_field: f64, + #[prost(float, tag = "2")] + pub float_field: f32, + #[prost(int32, tag = "3")] + pub int32_field: i32, + #[prost(int64, tag = "4")] + pub int64_field: i64, + #[prost(uint32, tag = "5")] + pub uint32_field: u32, + #[prost(uint64, tag = "6")] + pub uint64_field: u64, + #[prost(sint32, tag = "7")] + pub sint32_field: i32, + #[prost(sint64, tag = "8")] + pub sint64_field: i64, + #[prost(fixed32, tag = "9")] + pub fixed32_field: u32, + #[prost(fixed64, tag = "10")] + pub fixed64_field: u64, + #[prost(sfixed32, tag = "11")] + pub sfixed32_field: i32, + #[prost(sfixed64, tag = "12")] + pub sfixed64_field: i64, + #[prost(bool, tag = "13")] + pub bool_field: bool, + #[prost(string, tag = "14")] + pub string_field: ::prost::alloc::string::String, + #[prost(bytes = "vec", tag = "15")] + pub bytes_field: ::prost::alloc::vec::Vec, + #[prost(enumeration = "all_types::EnumType", tag = "16")] + pub enum_field: i32, + #[prost(message, optional, tag = "17")] + pub nested_message_field: ::core::option::Option, + /// repeated field + #[prost(int32, repeated, tag = "18")] + pub repeated_int_field: ::prost::alloc::vec::Vec, + /// timestamp + #[prost(message, optional, tag = "23")] + pub timestamp_field: ::core::option::Option<::prost_types::Timestamp>, + /// duration + #[prost(message, optional, tag = "24")] + pub duration_field: ::core::option::Option<::prost_types::Duration>, + /// any + #[prost(message, optional, tag = "25")] + pub any_field: ::core::option::Option<::prost_types::Any>, + /// wrapper types + #[prost(message, optional, tag = "27")] + pub int32_value_field: ::core::option::Option, + #[prost(message, optional, tag = "28")] + pub string_value_field: ::core::option::Option<::prost::alloc::string::String>, + /// oneof field + #[prost(oneof = "all_types::ExampleOneof", tags = "19, 20, 21")] + pub example_oneof: ::core::option::Option, +} +/// Nested message and enum types in `AllTypes`. +pub mod all_types { + /// nested message + #[allow(clippy::derive_partial_eq_without_eq)] + #[derive(Clone, PartialEq, ::prost::Message)] + pub struct NestedMessage { + #[prost(int32, tag = "1")] + pub id: i32, + #[prost(string, tag = "2")] + pub name: ::prost::alloc::string::String, + } + /// enum + #[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + Hash, + PartialOrd, + Ord, + ::prost::Enumeration + )] + #[repr(i32)] + pub enum EnumType { + Default = 0, + Option1 = 1, + Option2 = 2, + } + impl EnumType { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + EnumType::Default => "DEFAULT", + EnumType::Option1 => "OPTION1", + EnumType::Option2 => "OPTION2", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "DEFAULT" => Some(Self::Default), + "OPTION1" => Some(Self::Option1), + "OPTION2" => Some(Self::Option2), + _ => None, + } + } + } + /// oneof field + #[allow(clippy::derive_partial_eq_without_eq)] + #[derive(Clone, PartialEq, ::prost::Oneof)] + pub enum ExampleOneof { + #[prost(string, tag = "19")] + OneofString(::prost::alloc::string::String), + #[prost(int32, tag = "20")] + OneofInt32(i32), + #[prost(enumeration = "EnumType", tag = "21")] + OneofEnum(i32), + } +} diff --git a/src/connector/src/sink/iceberg/mod.rs b/src/connector/src/sink/iceberg/mod.rs index d19ea5a3c0f84..e2f00afd5b525 100644 --- a/src/connector/src/sink/iceberg/mod.rs +++ b/src/connector/src/sink/iceberg/mod.rs @@ -64,10 +64,7 @@ use crate::connector_common::IcebergCommon; use crate::sink::coordinate::CoordinatedSinkWriter; use crate::sink::writer::SinkWriter; use crate::sink::{Result, SinkCommitCoordinator, SinkParam}; -use crate::{ - deserialize_bool_from_string, deserialize_optional_bool_from_string, - deserialize_optional_string_seq_from_string, -}; +use crate::{deserialize_bool_from_string, deserialize_optional_string_seq_from_string}; pub const ICEBERG_SINK: &str = "iceberg"; @@ -82,13 +79,6 @@ pub struct IcebergConfig { #[serde(flatten)] common: IcebergCommon, - #[serde( - rename = "s3.path.style.access", - default, - deserialize_with = "deserialize_optional_bool_from_string" - )] - pub path_style_access: Option, - #[serde( rename = "primary_key", default, @@ -175,21 +165,21 @@ impl IcebergConfig { pub async fn create_catalog(&self) -> Result { self.common - .create_catalog(&self.path_style_access, &self.java_catalog_props) + .create_catalog(&self.java_catalog_props) .await .map_err(Into::into) } pub async fn load_table(&self) -> Result
{ self.common - .load_table(&self.path_style_access, &self.java_catalog_props) + .load_table(&self.java_catalog_props) .await .map_err(Into::into) } pub async fn create_catalog_v2(&self) -> Result> { self.common - .create_catalog_v2(&self.path_style_access, &self.java_catalog_props) + .create_catalog_v2(&self.java_catalog_props) .await .map_err(Into::into) } @@ -1018,10 +1008,10 @@ mod test { catalog_name: Some("demo".to_string()), database_name: Some("demo_db".to_string()), table_name: "demo_table".to_string(), + path_style_access: Some(true), }, r#type: "upsert".to_string(), force_append_only: false, - path_style_access: Some(true), primary_key: Some(vec!["v1".to_string()]), java_catalog_props: [("jdbc.user", "admin"), ("jdbc.password", "123456")] .into_iter() diff --git a/src/connector/src/source/iceberg/mod.rs b/src/connector/src/source/iceberg/mod.rs index cdeab00649187..60a26e43e1d31 100644 --- a/src/connector/src/source/iceberg/mod.rs +++ b/src/connector/src/source/iceberg/mod.rs @@ -65,8 +65,8 @@ impl IcebergProperties { if let Some(jdbc_password) = self.jdbc_password.clone() { java_catalog_props.insert("jdbc.password".to_string(), jdbc_password); } - // TODO: support path_style_access and java_catalog_props for iceberg source - self.common.load_table_v2(&None, &java_catalog_props).await + // TODO: support java_catalog_props for iceberg source + self.common.load_table_v2(&java_catalog_props).await } pub async fn load_table_v2_with_metadata( @@ -82,7 +82,7 @@ impl IcebergProperties { } // TODO: support path_style_access and java_catalog_props for iceberg source self.common - .load_table_v2_with_metadata(table_meta, &None, &java_catalog_props) + .load_table_v2_with_metadata(table_meta, &java_catalog_props) .await } } diff --git a/src/connector/with_options_source.yaml b/src/connector/with_options_source.yaml index c4d9604a8f053..dc8d31d281be9 100644 --- a/src/connector/with_options_source.yaml +++ b/src/connector/with_options_source.yaml @@ -117,6 +117,10 @@ IcebergProperties: field_type: String comments: Full name of table, must include schema name. required: true + - name: s3.path.style.access + field_type: bool + required: false + default: Default::default - name: catalog.jdbc.user field_type: String required: false