Skip to content

Commit

Permalink
refactor(iceberg): use load table v2 to extract iceberg columns (#18196)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 authored Aug 22, 2024
1 parent 84b7b94 commit 9c4984d
Showing 1 changed file with 5 additions and 12 deletions.
17 changes: 5 additions & 12 deletions src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1313,12 +1313,9 @@ pub async fn extract_iceberg_columns(
let props = ConnectorProperties::extract(with_properties.clone(), true)?;
if let ConnectorProperties::Iceberg(properties) = props {
let iceberg_config: IcebergConfig = properties.to_iceberg_config();
let table = iceberg_config.load_table().await?;
let iceberg_schema: arrow_schema_iceberg::Schema = table
.current_table_metadata()
.current_schema()?
.clone()
.try_into()?;
let table = iceberg_config.load_table_v2().await?;
let iceberg_schema: arrow_schema_iceberg::Schema =
iceberg::arrow::schema_to_arrow_schema(table.metadata().current_schema())?;

let columns = iceberg_schema
.fields()
Expand Down Expand Up @@ -1368,13 +1365,9 @@ pub async fn check_iceberg_source(
.collect(),
};

let table = iceberg_config.load_table().await?;
let table = iceberg_config.load_table_v2().await?;

let iceberg_schema: arrow_schema_iceberg::Schema = table
.current_table_metadata()
.current_schema()?
.clone()
.try_into()?;
let iceberg_schema = iceberg::arrow::schema_to_arrow_schema(table.metadata().current_schema())?;

for f1 in schema.fields() {
if !iceberg_schema.fields.iter().any(|f2| f2.name() == &f1.name) {
Expand Down

0 comments on commit 9c4984d

Please sign in to comment.