Skip to content

Commit

Permalink
fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 committed Sep 2, 2024
1 parent 6d9125a commit 77f47c0
Showing 1 changed file with 9 additions and 3 deletions.
12 changes: 9 additions & 3 deletions src/connector/src/sink/iceberg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -755,7 +755,11 @@ impl IcebergSink {
.map(|column| {
let mut arrow_field = IcebergArrowConvert
.to_arrow_field(&column.name, &column.data_type)
.map_err(|e| SinkError::Iceberg(anyhow!(e))).context(format!("failed to convert {}: {} to arrow type", &column.name, &column.data_type))?;
.map_err(|e| SinkError::Iceberg(anyhow!(e)))
.context(format!(
"failed to convert {}: {} to arrow type",
&column.name, &column.data_type
))?;
let mut metadata = HashMap::new();
metadata.insert(
PARQUET_FIELD_ID_META_KEY.to_string(),
Expand All @@ -771,7 +775,8 @@ impl IcebergSink {
.collect::<Result<Vec<ArrowField>>>()?;
let arrow_schema = arrow_schema_iceberg::Schema::new(arrow_fields);
let iceberg_schema = iceberg::arrow::arrow_schema_to_schema(&arrow_schema)
.map_err(|e| SinkError::Iceberg(anyhow!(e))).context("failed to convert arrow schema to iceberg schema")?;
.map_err(|e| SinkError::Iceberg(anyhow!(e)))
.context("failed to convert arrow schema to iceberg schema")?;

let location = {
let mut names = namespace.clone().inner();
Expand All @@ -788,7 +793,8 @@ impl IcebergSink {
catalog
.create_table(&namespace, table_creation)
.await
.map_err(|e| SinkError::Iceberg(anyhow!(e))).context("failed to create iceberg table")?;
.map_err(|e| SinkError::Iceberg(anyhow!(e)))
.context("failed to create iceberg table")?;
}
Ok(())
}
Expand Down

0 comments on commit 77f47c0

Please sign in to comment.