Skip to content

Commit

Permalink
fix bug
Browse files Browse the repository at this point in the history
  • Loading branch information
liurenjie1024 committed Sep 20, 2023
1 parent e33c304 commit 5cac3fc
Showing 1 changed file with 47 additions and 22 deletions.
69 changes: 47 additions & 22 deletions src/connector/src/sink/iceberg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use risingwave_pb::connector_service::SinkMetadata;
use risingwave_rpc_client::ConnectorClient;
use serde_derive::Deserialize;
use serde_json::Value;
use url::Url;

use super::{
Sink, SinkError, SinkWriter, SinkWriterParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION,
Expand Down Expand Up @@ -73,7 +74,7 @@ pub struct IcebergConfig {
pub catalog_type: Option<String>,

#[serde(rename = "warehouse.path")]
pub path: Option<String>, // Path of iceberg warehouse, only applicable in storage catalog.
pub path: String, // Path of iceberg warehouse, only applicable in storage catalog.

#[serde(rename = "catalog.uri")]
pub uri: Option<String>, // URI of iceberg catalog, only applicable in rest catalog.
Expand Down Expand Up @@ -108,31 +109,40 @@ impl IcebergConfig {
Ok(config)
}

fn build_iceberg_configs(&self) -> HashMap<String, String> {
fn build_iceberg_configs(&self) -> Result<HashMap<String, String>> {
let mut iceberg_configs = HashMap::new();
iceberg_configs.insert(
CATALOG_TYPE.to_string(),
self.catalog_type
.as_deref()
.unwrap_or("storage")
.to_string(),
);

let catalog_type = self
.catalog_type
.as_deref()
.unwrap_or("storage")
.to_string();

iceberg_configs.insert(CATALOG_TYPE.to_string(), catalog_type.clone());
iceberg_configs.insert(
CATALOG_NAME.to_string(),
self.database_name.clone().to_string(),
);
if let Some(path) = &self.path {
iceberg_configs.insert(
format!("iceberg.catalog.{}.warehouse", self.database_name),
path.clone().to_string(),
);
}

if let Some(uri) = &self.uri {
iceberg_configs.insert(
format!("iceberg.catalog.{}.uri", self.database_name),
uri.clone().to_string(),
);
match catalog_type.as_str() {
"storage" => {
iceberg_configs.insert(
format!("iceberg.catalog.{}.warehouse", self.database_name),
self.path.clone(),
);
}
"rest" => {
let uri = self.uri.clone().ok_or_else(|| {
SinkError::Iceberg(anyhow!("`catalog.uri` must be set in rest catalog"))
})?;
iceberg_configs.insert(format!("iceberg.catalog.{}.uri", self.database_name), uri);
}
_ => {
return Err(SinkError::Iceberg(anyhow!(
"Unsupported catalog type: {}, only support `storage` and `rest`",
catalog_type
)))
}
}

if let Some(region) = &self.region {
Expand All @@ -158,7 +168,22 @@ impl IcebergConfig {
self.secret_key.clone().to_string(),
);

iceberg_configs
let (bucket, root) = {
let url = Url::parse(&self.path).map_err(|e| SinkError::Iceberg(anyhow!(e)))?;
let bucket = url
.host_str()
.ok_or_else(|| {
SinkError::Iceberg(anyhow!("Invalid s3 path: {}, bucket is missing", self.path))
})?
.to_string();
let root = url.path().trim_start_matches('/').to_string();
(bucket, root)
};

iceberg_configs.insert("iceberg.table.io.bucket".to_string(), bucket);
iceberg_configs.insert("iceberg.table.io.root".to_string(), root);

Ok(iceberg_configs)
}
}

Expand All @@ -177,7 +202,7 @@ impl Debug for IcebergSink {

impl IcebergSink {
async fn create_table(&self) -> Result<Table> {
let catalog = load_catalog(&self.config.build_iceberg_configs())
let catalog = load_catalog(&self.config.build_iceberg_configs()?)
.await
.map_err(|e| SinkError::Iceberg(anyhow!("Unable to load iceberg catalog: {e}")))?;

Expand Down

0 comments on commit 5cac3fc

Please sign in to comment.