Skip to content

Commit

Permalink
feat(iceberg): support s3 path style for iceberg source (#19111)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 authored Oct 25, 2024
1 parent ad0bf17 commit ca75c21
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 40 deletions.
1 change: 1 addition & 0 deletions e2e_test/iceberg/test_case/iceberg_select_empty_table.slt
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ WITH (
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin',
s3.path.style.access = 'true',
catalog.type = 'storage',
warehouse.path = 's3a://icebergdata/demo',
database.name = 'demo_db',
Expand Down
39 changes: 17 additions & 22 deletions src/connector/src/connector_common/iceberg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
mod jni_catalog;
mod mock_catalog;
mod storage_catalog;

use std::collections::HashMap;
use std::sync::Arc;

Expand All @@ -30,6 +29,7 @@ use serde_with::serde_as;
use url::Url;
use with_options::WithOptions;

use crate::deserialize_optional_bool_from_string;
use crate::error::ConnectorResult;

#[serde_as]
Expand Down Expand Up @@ -62,6 +62,13 @@ pub struct IcebergCommon {
/// Full name of table, must include schema name.
#[serde(rename = "table.name")]
pub table_name: String,

#[serde(
rename = "s3.path.style.access",
default,
deserialize_with = "deserialize_optional_bool_from_string"
)]
pub path_style_access: Option<bool>,
}

impl IcebergCommon {
Expand All @@ -79,7 +86,6 @@ impl IcebergCommon {
/// For both V1 and V2.
fn build_jni_catalog_configs(
&self,
path_style_access: &Option<bool>,
java_catalog_props: &HashMap<String, String>,
) -> ConnectorResult<(BaseCatalogConfig, HashMap<String, String>)> {
let mut iceberg_configs = HashMap::new();
Expand Down Expand Up @@ -193,7 +199,7 @@ impl IcebergCommon {
self.secret_key.clone().to_string(),
);

if let Some(path_style_access) = path_style_access {
if let Some(path_style_access) = self.path_style_access {
java_catalog_configs.insert(
"s3.path-style-access".to_string(),
path_style_access.to_string(),
Expand Down Expand Up @@ -247,10 +253,7 @@ mod v1 {
Ok(ret.context("Failed to create table identifier")?)
}

fn build_iceberg_configs(
&self,
path_style_access: &Option<bool>,
) -> ConnectorResult<HashMap<String, String>> {
fn build_iceberg_configs(&self) -> ConnectorResult<HashMap<String, String>> {
let mut iceberg_configs = HashMap::new();

let catalog_type = self.catalog_type().to_string();
Expand Down Expand Up @@ -303,7 +306,7 @@ mod v1 {
"iceberg.table.io.secret_access_key".to_string(),
self.secret_key.clone().to_string(),
);
if let Some(path_style_access) = path_style_access {
if let Some(path_style_access) = self.path_style_access {
iceberg_configs.insert(
"iceberg.table.io.enable_virtual_host_style".to_string(),
(!path_style_access).to_string(),
Expand Down Expand Up @@ -345,12 +348,11 @@ mod v1 {
/// TODO: remove the arguments and put them into `IcebergCommon`. Currently the handling in source and sink are different, so pass them separately to be safer.
pub async fn create_catalog(
&self,
path_style_access: &Option<bool>,
java_catalog_props: &HashMap<String, String>,
) -> ConnectorResult<CatalogRef> {
match self.catalog_type() {
"storage" | "rest" => {
let iceberg_configs = self.build_iceberg_configs(path_style_access)?;
let iceberg_configs = self.build_iceberg_configs()?;
let catalog = load_catalog(&iceberg_configs).await?;
Ok(catalog)
}
Expand All @@ -361,7 +363,7 @@ mod v1 {
{
// Create java catalog
let (base_catalog_config, java_catalog_props) =
self.build_jni_catalog_configs(path_style_access, java_catalog_props)?;
self.build_jni_catalog_configs(java_catalog_props)?;
let catalog_impl = match catalog_type {
"hive" => "org.apache.iceberg.hive.HiveCatalog",
"jdbc" => "org.apache.iceberg.jdbc.JdbcCatalog",
Expand Down Expand Up @@ -389,11 +391,10 @@ mod v1 {
/// TODO: remove the arguments and put them into `IcebergCommon`. Currently the handling in source and sink are different, so pass them separately to be safer.
pub async fn load_table(
&self,
path_style_access: &Option<bool>,
java_catalog_props: &HashMap<String, String>,
) -> ConnectorResult<Table> {
let catalog = self
.create_catalog(path_style_access, java_catalog_props)
.create_catalog(java_catalog_props)
.await
.context("Unable to load iceberg catalog")?;

Expand Down Expand Up @@ -429,7 +430,6 @@ mod v2 {
/// TODO: remove the arguments and put them into `IcebergCommon`. Currently the handling in source and sink are different, so pass them separately to be safer.
pub async fn create_catalog_v2(
&self,
path_style_access: &Option<bool>,
java_catalog_props: &HashMap<String, String>,
) -> ConnectorResult<Arc<dyn CatalogV2>> {
match self.catalog_type() {
Expand Down Expand Up @@ -515,7 +515,7 @@ mod v2 {
catalog_type if catalog_type == "hive" || catalog_type == "jdbc" => {
// Create java catalog
let (base_catalog_config, java_catalog_props) =
self.build_jni_catalog_configs(path_style_access, java_catalog_props)?;
self.build_jni_catalog_configs(java_catalog_props)?;
let catalog_impl = match catalog_type {
"hive" => "org.apache.iceberg.hive.HiveCatalog",
"jdbc" => "org.apache.iceberg.jdbc.JdbcCatalog",
Expand All @@ -541,11 +541,10 @@ mod v2 {
/// TODO: remove the arguments and put them into `IcebergCommon`. Currently the handling in source and sink are different, so pass them separately to be safer.
pub async fn load_table_v2(
&self,
path_style_access: &Option<bool>,
java_catalog_props: &HashMap<String, String>,
) -> ConnectorResult<TableV2> {
let catalog = self
.create_catalog_v2(path_style_access, java_catalog_props)
.create_catalog_v2(java_catalog_props)
.await
.context("Unable to load iceberg catalog")?;

Expand All @@ -559,7 +558,6 @@ mod v2 {
pub async fn load_table_v2_with_metadata(
&self,
metadata: TableMetadata,
path_style_access: &Option<bool>,
java_catalog_props: &HashMap<String, String>,
) -> ConnectorResult<TableV2> {
match self.catalog_type() {
Expand All @@ -585,10 +583,7 @@ mod v2 {
.readonly(true)
.build()?)
}
_ => {
self.load_table_v2(path_style_access, java_catalog_props)
.await
}
_ => self.load_table_v2(java_catalog_props).await,
}
}
}
Expand Down
20 changes: 5 additions & 15 deletions src/connector/src/sink/iceberg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,7 @@ use crate::connector_common::IcebergCommon;
use crate::sink::coordinate::CoordinatedSinkWriter;
use crate::sink::writer::SinkWriter;
use crate::sink::{Result, SinkCommitCoordinator, SinkParam};
use crate::{
deserialize_bool_from_string, deserialize_optional_bool_from_string,
deserialize_optional_string_seq_from_string,
};
use crate::{deserialize_bool_from_string, deserialize_optional_string_seq_from_string};

pub const ICEBERG_SINK: &str = "iceberg";

Expand All @@ -82,13 +79,6 @@ pub struct IcebergConfig {
#[serde(flatten)]
common: IcebergCommon,

#[serde(
rename = "s3.path.style.access",
default,
deserialize_with = "deserialize_optional_bool_from_string"
)]
pub path_style_access: Option<bool>,

#[serde(
rename = "primary_key",
default,
Expand Down Expand Up @@ -175,21 +165,21 @@ impl IcebergConfig {

pub async fn create_catalog(&self) -> Result<CatalogRef> {
self.common
.create_catalog(&self.path_style_access, &self.java_catalog_props)
.create_catalog(&self.java_catalog_props)
.await
.map_err(Into::into)
}

pub async fn load_table(&self) -> Result<Table> {
self.common
.load_table(&self.path_style_access, &self.java_catalog_props)
.load_table(&self.java_catalog_props)
.await
.map_err(Into::into)
}

pub async fn create_catalog_v2(&self) -> Result<Arc<dyn CatalogV2>> {
self.common
.create_catalog_v2(&self.path_style_access, &self.java_catalog_props)
.create_catalog_v2(&self.java_catalog_props)
.await
.map_err(Into::into)
}
Expand Down Expand Up @@ -1018,10 +1008,10 @@ mod test {
catalog_name: Some("demo".to_string()),
database_name: Some("demo_db".to_string()),
table_name: "demo_table".to_string(),
path_style_access: Some(true),
},
r#type: "upsert".to_string(),
force_append_only: false,
path_style_access: Some(true),
primary_key: Some(vec!["v1".to_string()]),
java_catalog_props: [("jdbc.user", "admin"), ("jdbc.password", "123456")]
.into_iter()
Expand Down
6 changes: 3 additions & 3 deletions src/connector/src/source/iceberg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ impl IcebergProperties {
if let Some(jdbc_password) = self.jdbc_password.clone() {
java_catalog_props.insert("jdbc.password".to_string(), jdbc_password);
}
// TODO: support path_style_access and java_catalog_props for iceberg source
self.common.load_table_v2(&None, &java_catalog_props).await
// TODO: support java_catalog_props for iceberg source
self.common.load_table_v2(&java_catalog_props).await
}

pub async fn load_table_v2_with_metadata(
Expand All @@ -82,7 +82,7 @@ impl IcebergProperties {
}
// TODO: support path_style_access and java_catalog_props for iceberg source
self.common
.load_table_v2_with_metadata(table_meta, &None, &java_catalog_props)
.load_table_v2_with_metadata(table_meta, &java_catalog_props)
.await
}
}
Expand Down
4 changes: 4 additions & 0 deletions src/connector/with_options_source.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ IcebergProperties:
field_type: String
comments: Full name of table, must include schema name.
required: true
- name: s3.path.style.access
field_type: bool
required: false
default: Default::default
- name: catalog.jdbc.user
field_type: String
required: false
Expand Down

0 comments on commit ca75c21

Please sign in to comment.