fix(ci): iceberg sink test failure (#12446)
liurenjie1024 authored Sep 20, 2023
1 parent 9358708 commit c9f3cd0
Showing 4 changed files with 58 additions and 32 deletions.
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -106,7 +106,7 @@ hashbrown = { version = "0.14.0", features = [
criterion = { version = "0.5", features = ["async_futures"] }
tonic = { package = "madsim-tonic", version = "0.3.1" }
tonic-build = { package = "madsim-tonic-build", version = "0.3.1" }
icelake = { git = "https://github.com/icelake-io/icelake", rev = "85ca0e57bf059b2e84a8bca531f9a8f3bc2f8dfd" }
icelake = { git = "https://github.com/icelake-io/icelake", rev = "166a36b1a40a64086db09a0e0f2ed6791cec548b" }
arrow-array = "46"
arrow-schema = "46"
arrow-buffer = "46"
6 changes: 3 additions & 3 deletions e2e_test/iceberg/iceberg_sink_v2.slt
@@ -25,12 +25,12 @@ CREATE SINK s6 AS select * from mv6 WITH (
force_append_only = 'true',
database.name = 'demo',
table.name = 'demo_db.demo_table',

catalog.type = 'storage',
warehouse.path = 's3://icebergdata/demo',
s3.endpoint = 'http://127.0.0.1:9301',
s3.access.key = 'admin',
s3.secret.key = 'password'
s3.region = 'us-east-1',
s3.access.key = 'hummockadmin',
s3.secret.key = 'hummockadmin'
);

statement ok
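For context on the WITH options changed above, here is a minimal, standalone sketch (not part of this commit) of how such options can be deserialized into a config struct via serde renames, using serde and serde_json as in the connector's imports further down. Only `catalog.type` and `warehouse.path` renames are visible in this diff; the `s3.*` renames and the `DemoIcebergConfig` name are assumptions for illustration.

use std::collections::HashMap;

use serde::Deserialize;

// Hypothetical stand-in for the connector's config struct; only a subset of fields.
#[derive(Debug, Deserialize)]
struct DemoIcebergConfig {
    #[serde(rename = "catalog.type")]
    catalog_type: Option<String>,
    #[serde(rename = "warehouse.path")]
    path: String,
    #[serde(rename = "s3.region")]
    region: Option<String>,
    #[serde(rename = "s3.access.key")]
    access_key: String,
    #[serde(rename = "s3.secret.key")]
    secret_key: String,
}

fn main() {
    // WITH options as a flat string map, mirroring the sink definition above.
    let options: HashMap<String, String> = [
        ("catalog.type", "storage"),
        ("warehouse.path", "s3://icebergdata/demo"),
        ("s3.region", "us-east-1"),
        ("s3.access.key", "hummockadmin"),
        ("s3.secret.key", "hummockadmin"),
    ]
    .into_iter()
    .map(|(k, v)| (k.to_string(), v.to_string()))
    .collect();

    // Round-trip through serde_json so the renamed keys land on the right fields.
    let config: DemoIcebergConfig =
        serde_json::from_value(serde_json::to_value(options).unwrap()).unwrap();
    println!("{config:?}");
}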
79 changes: 52 additions & 27 deletions src/connector/src/sink/iceberg.rs
@@ -33,6 +33,7 @@ use risingwave_pb::connector_service::SinkMetadata;
use risingwave_rpc_client::ConnectorClient;
use serde_derive::Deserialize;
use serde_json::Value;
use url::Url;

use super::{
Sink, SinkError, SinkWriter, SinkWriterParam, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION,
@@ -73,7 +74,7 @@ pub struct IcebergConfig {
pub catalog_type: Option<String>,

#[serde(rename = "warehouse.path")]
pub path: Option<String>, // Path of iceberg warehouse, only applicable in storage catalog.
pub path: String, // Path of iceberg warehouse, only applicable in storage catalog.

#[serde(rename = "catalog.uri")]
pub uri: Option<String>, // URI of iceberg catalog, only applicable in rest catalog.
@@ -108,57 +109,81 @@ impl IcebergConfig {
Ok(config)
}

fn build_iceberg_configs(&self) -> HashMap<String, String> {
fn build_iceberg_configs(&self) -> Result<HashMap<String, String>> {
let mut iceberg_configs = HashMap::new();
iceberg_configs.insert(
CATALOG_TYPE.to_string(),
self.catalog_type
.as_deref()
.unwrap_or("storage")
.to_string(),
);

let catalog_type = self
.catalog_type
.as_deref()
.unwrap_or("storage")
.to_string();

iceberg_configs.insert(CATALOG_TYPE.to_string(), catalog_type.clone());
iceberg_configs.insert(
CATALOG_NAME.to_string(),
self.database_name.clone().to_string(),
);
if let Some(path) = &self.path {
iceberg_configs.insert(
format!("iceberg.catalog.{}.warehouse", self.database_name),
path.clone().to_string(),
);
}

if let Some(uri) = &self.uri {
iceberg_configs.insert(
format!("iceberg.catalog.{}.uri", self.database_name),
uri.clone().to_string(),
);
match catalog_type.as_str() {
"storage" => {
iceberg_configs.insert(
format!("iceberg.catalog.{}.warehouse", self.database_name),
self.path.clone(),
);
}
"rest" => {
let uri = self.uri.clone().ok_or_else(|| {
SinkError::Iceberg(anyhow!("`catalog.uri` must be set in rest catalog"))
})?;
iceberg_configs.insert(format!("iceberg.catalog.{}.uri", self.database_name), uri);
}
_ => {
return Err(SinkError::Iceberg(anyhow!(
"Unsupported catalog type: {}, only support `storage` and `rest`",
catalog_type
)))
}
}

if let Some(region) = &self.region {
iceberg_configs.insert(
"iceberg.catalog.table.io.region".to_string(),
"iceberg.table.io.region".to_string(),
region.clone().to_string(),
);
}

if let Some(endpoint) = &self.endpoint {
iceberg_configs.insert(
"iceberg.catalog.table.io.endpoint".to_string(),
"iceberg.table.io.endpoint".to_string(),
endpoint.clone().to_string(),
);
}

iceberg_configs.insert(
"iceberg.catalog.table.io.access_key_id".to_string(),
"iceberg.table.io.access_key_id".to_string(),
self.access_key.clone().to_string(),
);
iceberg_configs.insert(
"iceberg.catalog.table.io.secret_access_key".to_string(),
"iceberg.table.io.secret_access_key".to_string(),
self.secret_key.clone().to_string(),
);

iceberg_configs
let (bucket, root) = {
let url = Url::parse(&self.path).map_err(|e| SinkError::Iceberg(anyhow!(e)))?;
let bucket = url
.host_str()
.ok_or_else(|| {
SinkError::Iceberg(anyhow!("Invalid s3 path: {}, bucket is missing", self.path))
})?
.to_string();
let root = url.path().trim_start_matches('/').to_string();
(bucket, root)
};

iceberg_configs.insert("iceberg.table.io.bucket".to_string(), bucket);
iceberg_configs.insert("iceberg.table.io.root".to_string(), root);

Ok(iceberg_configs)
}
}
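For illustration, a standalone sketch of the bucket/root derivation introduced above, using the url crate that this commit imports, and assuming a warehouse path of the form used in the e2e test (s3://icebergdata/demo):

use url::Url;

fn main() {
    let path = "s3://icebergdata/demo";
    let url = Url::parse(path).expect("invalid warehouse path");
    // The host component is the bucket; the leading '/' is stripped to get the root prefix.
    let bucket = url.host_str().expect("bucket is missing").to_string();
    let root = url.path().trim_start_matches('/').to_string();
    assert_eq!(bucket, "icebergdata");
    assert_eq!(root, "demo");
}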

@@ -177,7 +202,7 @@ impl Debug for IcebergSink {

impl IcebergSink {
async fn create_table(&self) -> Result<Table> {
let catalog = load_catalog(&self.config.build_iceberg_configs())
let catalog = load_catalog(&self.config.build_iceberg_configs()?)
.await
.map_err(|e| SinkError::Iceberg(anyhow!("Unable to load iceberg catalog: {e}")))?;

@@ -414,7 +439,7 @@ impl SinkCommitCoordinator for IcebergSinkCommitter {
.collect::<Result<Vec<WriteResult>>>()?;

let mut txn = Transaction::new(&mut self.table);
txn.append_file(
txn.append_data_file(
write_results
.into_iter()
.flat_map(|s| s.data_files.into_iter()),
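Stepping back to the build_iceberg_configs change earlier in this file, a small standalone sketch (with plain String errors instead of the connector's SinkError) of the new catalog-type validation:

// `storage` is the default; anything other than `storage` or `rest` is rejected.
fn validate_catalog_type(catalog_type: Option<&str>) -> Result<String, String> {
    let catalog_type = catalog_type.unwrap_or("storage").to_string();
    match catalog_type.as_str() {
        "storage" | "rest" => Ok(catalog_type),
        other => Err(format!(
            "Unsupported catalog type: {other}, only support `storage` and `rest`"
        )),
    }
}

fn main() {
    assert_eq!(validate_catalog_type(None).unwrap(), "storage");
    assert_eq!(validate_catalog_type(Some("rest")).unwrap(), "rest");
    assert!(validate_catalog_type(Some("hive")).is_err());
}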
