Skip to content

Commit

Permalink
feat(iceberg): make warehouse.path optional for iceberg rest catalog (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 authored Nov 14, 2024
1 parent c1435dd commit daed1f2
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 55 deletions.
122 changes: 78 additions & 44 deletions src/connector/src/connector_common/iceberg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ pub struct IcebergCommon {
pub secret_key: String,
/// Path of iceberg warehouse, only applicable in storage catalog.
#[serde(rename = "warehouse.path")]
pub warehouse_path: String,
pub warehouse_path: Option<String>,
/// Catalog name, can be omitted for storage catalog, but
/// must be set for other catalogs.
#[serde(rename = "catalog.name")]
Expand Down Expand Up @@ -142,23 +142,31 @@ impl IcebergCommon {
self.secret_key.clone().to_string(),
);

let (bucket, _) = {
let url = Url::parse(&self.warehouse_path)
.with_context(|| format!("Invalid warehouse path: {}", self.warehouse_path))?;
let bucket = url
.host_str()
.with_context(|| {
format!(
"Invalid s3 path: {}, bucket is missing",
self.warehouse_path
)
})?
.to_string();
let root = url.path().trim_start_matches('/').to_string();
(bucket, root)
};
match &self.warehouse_path {
Some(warehouse_path) => {
let (bucket, _) = {
let url = Url::parse(warehouse_path).with_context(|| {
format!("Invalid warehouse path: {}", warehouse_path)
})?;
let bucket = url
.host_str()
.with_context(|| {
format!("Invalid s3 path: {}, bucket is missing", warehouse_path)
})?
.to_string();
let root = url.path().trim_start_matches('/').to_string();
(bucket, root)
};

iceberg_configs.insert("iceberg.table.io.bucket".to_string(), bucket);
}
None => {
if catalog_type != "rest" {
bail!("`warehouse.path` must be set in {} catalog", &catalog_type);
}
}
}

iceberg_configs.insert("iceberg.table.io.bucket".to_string(), bucket);
// #TODO
// Support load config file
iceberg_configs.insert(
Expand All @@ -176,7 +184,9 @@ impl IcebergCommon {
java_catalog_configs.insert("uri".to_string(), uri.to_string());
}

java_catalog_configs.insert("warehouse".to_string(), self.warehouse_path.clone());
if let Some(warehouse_path) = &self.warehouse_path {
java_catalog_configs.insert("warehouse".to_string(), warehouse_path.clone());
}
java_catalog_configs.extend(java_catalog_props.clone());

// Currently we only support s3, so let's set it to s3
Expand Down Expand Up @@ -241,6 +251,7 @@ impl IcebergCommon {

/// icelake
mod v1 {
use anyhow::anyhow;
use icelake::catalog::{load_catalog, CatalogRef};
use icelake::{Table, TableIdentifier};

Expand Down Expand Up @@ -268,7 +279,9 @@ mod v1 {
"storage" => {
iceberg_configs.insert(
format!("iceberg.catalog.{}.warehouse", self.catalog_name()),
self.warehouse_path.clone(),
self.warehouse_path.clone().ok_or_else(|| {
anyhow!("`warehouse.path` must be set in storage catalog")
})?,
);
iceberg_configs.insert(CATALOG_TYPE.to_string(), "storage".into());
}
Expand Down Expand Up @@ -318,28 +331,36 @@ mod v1 {
);
}

let (bucket, root) = {
let url = Url::parse(&self.warehouse_path)
.with_context(|| format!("Invalid warehouse path: {}", self.warehouse_path))?;
let bucket = url
.host_str()
.with_context(|| {
format!(
"Invalid s3 path: {}, bucket is missing",
self.warehouse_path
)
})?
.to_string();
let root = url.path().trim_start_matches('/').to_string();
(bucket, root)
};
match &self.warehouse_path {
Some(warehouse_path) => {
let (bucket, root) = {
let url = Url::parse(warehouse_path).with_context(|| {
format!("Invalid warehouse path: {}", warehouse_path)
})?;
let bucket = url
.host_str()
.with_context(|| {
format!("Invalid s3 path: {}, bucket is missing", warehouse_path)
})?
.to_string();
let root = url.path().trim_start_matches('/').to_string();
(bucket, root)
};

iceberg_configs.insert("iceberg.table.io.bucket".to_string(), bucket);
iceberg_configs.insert("iceberg.table.io.bucket".to_string(), bucket);

// Only storage catalog should set this.
if catalog_type == "storage" {
iceberg_configs.insert("iceberg.table.io.root".to_string(), root);
// Only storage catalog should set this.
if catalog_type == "storage" {
iceberg_configs.insert("iceberg.table.io.root".to_string(), root);
}
}
None => {
if catalog_type == "storage" {
bail!("`warehouse.path` must be set in storage catalog");
}
}
}

// #TODO
// Support load config file
iceberg_configs.insert(
Expand Down Expand Up @@ -416,6 +437,7 @@ mod v1 {

/// iceberg-rust
mod v2 {
use anyhow::anyhow;
use iceberg::spec::TableMetadata;
use iceberg::table::Table as TableV2;
use iceberg::{Catalog as CatalogV2, TableIdent};
Expand All @@ -442,7 +464,9 @@ mod v2 {
match self.catalog_type() {
"storage" => {
let config = storage_catalog::StorageCatalogConfig::builder()
.warehouse(self.warehouse_path.clone())
.warehouse(self.warehouse_path.clone().ok_or_else(|| {
anyhow!("`warehouse.path` must be set in storage catalog")
})?)
.access_key(self.access_key.clone())
.secret_key(self.secret_key.clone())
.region(self.region.clone())
Expand All @@ -468,12 +492,18 @@ mod v2 {
S3_SECRET_ACCESS_KEY.to_string(),
self.secret_key.clone().to_string(),
);
let config = iceberg_catalog_rest::RestCatalogConfig::builder()
let config_builder = iceberg_catalog_rest::RestCatalogConfig::builder()
.uri(self.catalog_uri.clone().with_context(|| {
"`catalog.uri` must be set in rest catalog".to_string()
})?)
.props(iceberg_configs)
.build();
.props(iceberg_configs);

let config = match &self.warehouse_path {
Some(warehouse_path) => {
config_builder.warehouse(warehouse_path.clone()).build()
}
None => config_builder.build(),
};
let catalog = iceberg_catalog_rest::RestCatalog::new(config);
Ok(Arc::new(catalog))
}
Expand Down Expand Up @@ -509,7 +539,9 @@ mod v2 {
self.secret_key.clone().to_string(),
);
let config_builder = iceberg_catalog_glue::GlueCatalogConfig::builder()
.warehouse(self.warehouse_path.clone())
.warehouse(self.warehouse_path.clone().ok_or_else(|| {
anyhow!("`warehouse.path` must be set in glue catalog")
})?)
.props(iceberg_configs);
let config = if let Some(uri) = self.catalog_uri.as_deref() {
config_builder.uri(uri.to_string()).build()
Expand Down Expand Up @@ -575,7 +607,9 @@ mod v2 {
match self.catalog_type() {
"storage" => {
let config = storage_catalog::StorageCatalogConfig::builder()
.warehouse(self.warehouse_path.clone())
.warehouse(self.warehouse_path.clone().ok_or_else(|| {
anyhow!("`warehouse.path` must be set in storage catalog")
})?)
.access_key(self.access_key.clone())
.secret_key(self.secret_key.clone())
.region(self.region.clone())
Expand Down
26 changes: 17 additions & 9 deletions src/connector/src/sink/iceberg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,18 +281,26 @@ impl IcebergSink {
let location = {
let mut names = namespace.clone().inner();
names.push(self.config.common.table_name.to_string());
if self.config.common.warehouse_path.ends_with('/') {
format!("{}{}", self.config.common.warehouse_path, names.join("/"))
} else {
format!("{}/{}", self.config.common.warehouse_path, names.join("/"))
match &self.config.common.warehouse_path {
Some(warehouse_path) => {
if warehouse_path.ends_with('/') {
Some(format!("{}{}", warehouse_path, names.join("/")))
} else {
Some(format!("{}/{}", warehouse_path, names.join("/")))
}
}
None => None,
}
};

let table_creation = TableCreation::builder()
let table_creation_builder = TableCreation::builder()
.name(self.config.common.table_name.clone())
.schema(iceberg_schema)
.location(location)
.build();
.schema(iceberg_schema);

let table_creation = match location {
Some(location) => table_creation_builder.location(location).build(),
None => table_creation_builder.build(),
};

catalog
.create_table(&namespace, table_creation)
Expand Down Expand Up @@ -998,7 +1006,7 @@ mod test {

let expected_iceberg_config = IcebergConfig {
common: IcebergCommon {
warehouse_path: "s3://iceberg".to_string(),
warehouse_path: Some("s3://iceberg".to_string()),
catalog_uri: Some("jdbc://postgresql://postgres:5432/iceberg".to_string()),
region: Some("us-east-1".to_string()),
endpoint: Some("http://127.0.0.1:9301".to_string()),
Expand Down
2 changes: 1 addition & 1 deletion src/connector/with_options_sink.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ IcebergConfig:
- name: warehouse.path
field_type: String
comments: Path of iceberg warehouse, only applicable in storage catalog.
required: true
required: false
- name: catalog.name
field_type: String
comments: |-
Expand Down
2 changes: 1 addition & 1 deletion src/connector/with_options_source.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ IcebergProperties:
- name: warehouse.path
field_type: String
comments: Path of iceberg warehouse, only applicable in storage catalog.
required: true
required: false
- name: catalog.name
field_type: String
comments: |-
Expand Down

0 comments on commit daed1f2

Please sign in to comment.