Skip to content

Commit

Permalink
support s3 path style for iceberg source
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 committed Oct 24, 2024
1 parent 707a0d2 commit 78ede6a
Show file tree
Hide file tree
Showing 6 changed files with 187 additions and 40 deletions.
1 change: 1 addition & 0 deletions e2e_test/iceberg/test_case/iceberg_select_empty_table.slt
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ WITH (
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
s3.path.style.access = 'true',
catalog.type = 'storage',
warehouse.path = 's3a://icebergdata/demo',
database.name = 'demo_db',
Expand Down
39 changes: 17 additions & 22 deletions src/connector/src/connector_common/iceberg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
mod jni_catalog;
mod mock_catalog;
mod storage_catalog;

use std::collections::HashMap;
use std::sync::Arc;

Expand All @@ -30,6 +29,7 @@ use serde_with::serde_as;
use url::Url;
use with_options::WithOptions;

use crate::deserialize_optional_bool_from_string;
use crate::error::ConnectorResult;

#[serde_as]
Expand Down Expand Up @@ -62,6 +62,13 @@ pub struct IcebergCommon {
/// Full name of table, must include schema name.
#[serde(rename = "table.name")]
pub table_name: String,

#[serde(
rename = "s3.path.style.access",
default,
deserialize_with = "deserialize_optional_bool_from_string"
)]
pub path_style_access: Option<bool>,
}

impl IcebergCommon {
Expand All @@ -79,7 +86,6 @@ impl IcebergCommon {
/// For both V1 and V2.
fn build_jni_catalog_configs(
&self,
path_style_access: &Option<bool>,
java_catalog_props: &HashMap<String, String>,
) -> ConnectorResult<(BaseCatalogConfig, HashMap<String, String>)> {
let mut iceberg_configs = HashMap::new();
Expand Down Expand Up @@ -193,7 +199,7 @@ impl IcebergCommon {
self.secret_key.clone().to_string(),
);

if let Some(path_style_access) = path_style_access {
if let Some(path_style_access) = self.path_style_access {
java_catalog_configs.insert(
"s3.path-style-access".to_string(),
path_style_access.to_string(),
Expand Down Expand Up @@ -247,10 +253,7 @@ mod v1 {
Ok(ret.context("Failed to create table identifier")?)
}

fn build_iceberg_configs(
&self,
path_style_access: &Option<bool>,
) -> ConnectorResult<HashMap<String, String>> {
fn build_iceberg_configs(&self) -> ConnectorResult<HashMap<String, String>> {
let mut iceberg_configs = HashMap::new();

let catalog_type = self.catalog_type().to_string();
Expand Down Expand Up @@ -303,7 +306,7 @@ mod v1 {
"iceberg.table.io.secret_access_key".to_string(),
self.secret_key.clone().to_string(),
);
if let Some(path_style_access) = path_style_access {
if let Some(path_style_access) = self.path_style_access {
iceberg_configs.insert(
"iceberg.table.io.enable_virtual_host_style".to_string(),
(!path_style_access).to_string(),
Expand Down Expand Up @@ -345,12 +348,11 @@ mod v1 {
/// TODO: remove the arguments and put them into `IcebergCommon`. Currently the handling in source and sink are different, so pass them separately to be safer.
pub async fn create_catalog(
&self,
path_style_access: &Option<bool>,
java_catalog_props: &HashMap<String, String>,
) -> ConnectorResult<CatalogRef> {
match self.catalog_type() {
"storage" | "rest" => {
let iceberg_configs = self.build_iceberg_configs(path_style_access)?;
let iceberg_configs = self.build_iceberg_configs()?;
let catalog = load_catalog(&iceberg_configs).await?;
Ok(catalog)
}
Expand All @@ -361,7 +363,7 @@ mod v1 {
{
// Create java catalog
let (base_catalog_config, java_catalog_props) =
self.build_jni_catalog_configs(path_style_access, java_catalog_props)?;
self.build_jni_catalog_configs(java_catalog_props)?;
let catalog_impl = match catalog_type {
"hive" => "org.apache.iceberg.hive.HiveCatalog",
"jdbc" => "org.apache.iceberg.jdbc.JdbcCatalog",
Expand Down Expand Up @@ -389,11 +391,10 @@ mod v1 {
/// TODO: remove the arguments and put them into `IcebergCommon`. Currently the handling in source and sink are different, so pass them separately to be safer.
pub async fn load_table(
&self,
path_style_access: &Option<bool>,
java_catalog_props: &HashMap<String, String>,
) -> ConnectorResult<Table> {
let catalog = self
.create_catalog(path_style_access, java_catalog_props)
.create_catalog(java_catalog_props)
.await
.context("Unable to load iceberg catalog")?;

Expand Down Expand Up @@ -429,7 +430,6 @@ mod v2 {
/// TODO: remove the arguments and put them into `IcebergCommon`. Currently the handling in source and sink are different, so pass them separately to be safer.
pub async fn create_catalog_v2(
&self,
path_style_access: &Option<bool>,
java_catalog_props: &HashMap<String, String>,
) -> ConnectorResult<Arc<dyn CatalogV2>> {
match self.catalog_type() {
Expand Down Expand Up @@ -515,7 +515,7 @@ mod v2 {
catalog_type if catalog_type == "hive" || catalog_type == "jdbc" => {
// Create java catalog
let (base_catalog_config, java_catalog_props) =
self.build_jni_catalog_configs(path_style_access, java_catalog_props)?;
self.build_jni_catalog_configs(java_catalog_props)?;
let catalog_impl = match catalog_type {
"hive" => "org.apache.iceberg.hive.HiveCatalog",
"jdbc" => "org.apache.iceberg.jdbc.JdbcCatalog",
Expand All @@ -541,11 +541,10 @@ mod v2 {
/// TODO: remove the arguments and put them into `IcebergCommon`. Currently the handling in source and sink are different, so pass them separately to be safer.
pub async fn load_table_v2(
&self,
path_style_access: &Option<bool>,
java_catalog_props: &HashMap<String, String>,
) -> ConnectorResult<TableV2> {
let catalog = self
.create_catalog_v2(path_style_access, java_catalog_props)
.create_catalog_v2(java_catalog_props)
.await
.context("Unable to load iceberg catalog")?;

Expand All @@ -559,7 +558,6 @@ mod v2 {
pub async fn load_table_v2_with_metadata(
&self,
metadata: TableMetadata,
path_style_access: &Option<bool>,
java_catalog_props: &HashMap<String, String>,
) -> ConnectorResult<TableV2> {
match self.catalog_type() {
Expand All @@ -585,10 +583,7 @@ mod v2 {
.readonly(true)
.build()?)
}
_ => {
self.load_table_v2(path_style_access, java_catalog_props)
.await
}
_ => self.load_table_v2(java_catalog_props).await,
}
}
}
Expand Down
157 changes: 157 additions & 0 deletions src/connector/src/parser/protobuf/recursive.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ComplexRecursiveMessage {
#[prost(string, tag = "1")]
pub node_name: ::prost::alloc::string::String,
#[prost(int32, tag = "2")]
pub node_id: i32,
#[prost(message, repeated, tag = "3")]
pub attributes: ::prost::alloc::vec::Vec<complex_recursive_message::Attributes>,
#[prost(message, optional, tag = "4")]
pub parent: ::core::option::Option<complex_recursive_message::Parent>,
#[prost(message, repeated, tag = "5")]
pub children: ::prost::alloc::vec::Vec<ComplexRecursiveMessage>,
}
/// Nested message and enum types in `ComplexRecursiveMessage`.
pub mod complex_recursive_message {
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Attributes {
#[prost(string, tag = "1")]
pub key: ::prost::alloc::string::String,
#[prost(string, tag = "2")]
pub value: ::prost::alloc::string::String,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Parent {
#[prost(string, tag = "1")]
pub parent_name: ::prost::alloc::string::String,
#[prost(int32, tag = "2")]
pub parent_id: i32,
#[prost(message, repeated, tag = "3")]
pub siblings: ::prost::alloc::vec::Vec<super::ComplexRecursiveMessage>,
}
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct AllTypes {
/// standard types
#[prost(double, tag = "1")]
pub double_field: f64,
#[prost(float, tag = "2")]
pub float_field: f32,
#[prost(int32, tag = "3")]
pub int32_field: i32,
#[prost(int64, tag = "4")]
pub int64_field: i64,
#[prost(uint32, tag = "5")]
pub uint32_field: u32,
#[prost(uint64, tag = "6")]
pub uint64_field: u64,
#[prost(sint32, tag = "7")]
pub sint32_field: i32,
#[prost(sint64, tag = "8")]
pub sint64_field: i64,
#[prost(fixed32, tag = "9")]
pub fixed32_field: u32,
#[prost(fixed64, tag = "10")]
pub fixed64_field: u64,
#[prost(sfixed32, tag = "11")]
pub sfixed32_field: i32,
#[prost(sfixed64, tag = "12")]
pub sfixed64_field: i64,
#[prost(bool, tag = "13")]
pub bool_field: bool,
#[prost(string, tag = "14")]
pub string_field: ::prost::alloc::string::String,
#[prost(bytes = "vec", tag = "15")]
pub bytes_field: ::prost::alloc::vec::Vec<u8>,
#[prost(enumeration = "all_types::EnumType", tag = "16")]
pub enum_field: i32,
#[prost(message, optional, tag = "17")]
pub nested_message_field: ::core::option::Option<all_types::NestedMessage>,
/// repeated field
#[prost(int32, repeated, tag = "18")]
pub repeated_int_field: ::prost::alloc::vec::Vec<i32>,
/// timestamp
#[prost(message, optional, tag = "23")]
pub timestamp_field: ::core::option::Option<::prost_types::Timestamp>,
/// duration
#[prost(message, optional, tag = "24")]
pub duration_field: ::core::option::Option<::prost_types::Duration>,
/// any
#[prost(message, optional, tag = "25")]
pub any_field: ::core::option::Option<::prost_types::Any>,
/// wrapper types
#[prost(message, optional, tag = "27")]
pub int32_value_field: ::core::option::Option<i32>,
#[prost(message, optional, tag = "28")]
pub string_value_field: ::core::option::Option<::prost::alloc::string::String>,
/// oneof field
#[prost(oneof = "all_types::ExampleOneof", tags = "19, 20, 21")]
pub example_oneof: ::core::option::Option<all_types::ExampleOneof>,
}
/// Nested message and enum types in `AllTypes`.
pub mod all_types {
/// nested message
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct NestedMessage {
#[prost(int32, tag = "1")]
pub id: i32,
#[prost(string, tag = "2")]
pub name: ::prost::alloc::string::String,
}
/// enum
#[derive(
Clone,
Copy,
Debug,
PartialEq,
Eq,
Hash,
PartialOrd,
Ord,
::prost::Enumeration
)]
#[repr(i32)]
pub enum EnumType {
Default = 0,
Option1 = 1,
Option2 = 2,
}
impl EnumType {
/// String value of the enum field names used in the ProtoBuf definition.
///
/// The values are not transformed in any way and thus are considered stable
/// (if the ProtoBuf definition does not change) and safe for programmatic use.
pub fn as_str_name(&self) -> &'static str {
match self {
EnumType::Default => "DEFAULT",
EnumType::Option1 => "OPTION1",
EnumType::Option2 => "OPTION2",
}
}
/// Creates an enum from field names used in the ProtoBuf definition.
pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
match value {
"DEFAULT" => Some(Self::Default),
"OPTION1" => Some(Self::Option1),
"OPTION2" => Some(Self::Option2),
_ => None,
}
}
}
/// oneof field
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum ExampleOneof {
#[prost(string, tag = "19")]
OneofString(::prost::alloc::string::String),
#[prost(int32, tag = "20")]
OneofInt32(i32),
#[prost(enumeration = "EnumType", tag = "21")]
OneofEnum(i32),
}
}
20 changes: 5 additions & 15 deletions src/connector/src/sink/iceberg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,7 @@ use crate::connector_common::IcebergCommon;
use crate::sink::coordinate::CoordinatedSinkWriter;
use crate::sink::writer::SinkWriter;
use crate::sink::{Result, SinkCommitCoordinator, SinkParam};
use crate::{
deserialize_bool_from_string, deserialize_optional_bool_from_string,
deserialize_optional_string_seq_from_string,
};
use crate::{deserialize_bool_from_string, deserialize_optional_string_seq_from_string};

pub const ICEBERG_SINK: &str = "iceberg";

Expand All @@ -82,13 +79,6 @@ pub struct IcebergConfig {
#[serde(flatten)]
common: IcebergCommon,

#[serde(
rename = "s3.path.style.access",
default,
deserialize_with = "deserialize_optional_bool_from_string"
)]
pub path_style_access: Option<bool>,

#[serde(
rename = "primary_key",
default,
Expand Down Expand Up @@ -175,21 +165,21 @@ impl IcebergConfig {

pub async fn create_catalog(&self) -> Result<CatalogRef> {
self.common
.create_catalog(&self.path_style_access, &self.java_catalog_props)
.create_catalog(&self.java_catalog_props)
.await
.map_err(Into::into)
}

pub async fn load_table(&self) -> Result<Table> {
self.common
.load_table(&self.path_style_access, &self.java_catalog_props)
.load_table(&self.java_catalog_props)
.await
.map_err(Into::into)
}

pub async fn create_catalog_v2(&self) -> Result<Arc<dyn CatalogV2>> {
self.common
.create_catalog_v2(&self.path_style_access, &self.java_catalog_props)
.create_catalog_v2(&self.java_catalog_props)
.await
.map_err(Into::into)
}
Expand Down Expand Up @@ -1018,10 +1008,10 @@ mod test {
catalog_name: Some("demo".to_string()),
database_name: Some("demo_db".to_string()),
table_name: "demo_table".to_string(),
path_style_access: Some(true),
},
r#type: "upsert".to_string(),
force_append_only: false,
path_style_access: Some(true),
primary_key: Some(vec!["v1".to_string()]),
java_catalog_props: [("jdbc.user", "admin"), ("jdbc.password", "123456")]
.into_iter()
Expand Down
Loading

0 comments on commit 78ede6a

Please sign in to comment.