Skip to content

Commit

Permalink
fix ci
Browse files Browse the repository at this point in the history
fix

fmt
  • Loading branch information
xxhZs committed Sep 12, 2024
1 parent 24092e2 commit e39af11
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 18 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ arrow-array-iceberg = { package = "arrow-array", version = "52" }
arrow-schema-iceberg = { package = "arrow-schema", version = "52" }
arrow-buffer-iceberg = { package = "arrow-buffer", version = "52" }
arrow-cast-iceberg = { package = "arrow-cast", version = "52" }
# branch dev
iceberg = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "bf97b9de51c5c86394e4df7aaff82cc3ed8bc928" }
iceberg-catalog-rest = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "bf97b9de51c5c86394e4df7aaff82cc3ed8bc928" }
iceberg-catalog-glue = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "bf97b9de51c5c86394e4df7aaff82cc3ed8bc928" }
Expand Down
2 changes: 2 additions & 0 deletions e2e_test/iceberg/test_case/iceberg_source_eq_delete.slt
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ delete from s1 where i1 = 7;
statement ok
flush;

sleep 5s

query I
select * from iceberg_t1_source order by i1;
----
Expand Down
30 changes: 12 additions & 18 deletions src/connector/src/source/iceberg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

pub mod parquet_file_reader;

use std::collections::{HashMap, HashSet};
use std::collections::HashMap;

use anyhow::anyhow;
use async_trait::async_trait;
Expand Down Expand Up @@ -208,12 +208,13 @@ impl IcebergSplitEnumerator {
bail!("Batch parallelism is 0. Cannot split the iceberg files.");
}
let table = self.config.load_table_v2().await?;

let current_snapshot = table.metadata().current_snapshot();
if current_snapshot.is_none() {
// If there is no snapshot, we will return a mock `IcebergSplit` with empty files.
return Ok(vec![IcebergSplit {
split_id: 0,
snapshot_id: 0,
snapshot_id: 0, // unused
table_meta: TableMetadataJsonStr::serialize(table.metadata()),
files: vec![],
eq_delete_files: vec![],
Expand Down Expand Up @@ -339,27 +340,20 @@ impl IcebergSplitEnumerator {
}
}
}
let mut delete_columns = equality_ids
let delete_columns = equality_ids
.into_iter()
.map(|id| match schema.name_by_field_id(id) {
Some(name) => Ok::<std::string::String, ConnectorError>(name.to_string()),
None => bail!("Delete field id {} not found in schema", id),
})
.collect::<ConnectorResult<HashSet<_>>>()?;
delete_columns.extend(rw_schema.names().iter().cloned());
let require_field_ids = schema
.as_struct()
.fields()
.iter()
.filter_map(|field| {
if delete_columns.contains(&field.name) {
Some(field.name.clone())
} else {
None
}
})
.collect();
Ok(require_field_ids)
.collect::<ConnectorResult<Vec<_>>>()?;
let mut require_field_names: Vec<_> = rw_schema.names().to_vec();
for names in delete_columns {
if !require_field_names.contains(&names) {
require_field_names.push(names);
}
}
Ok(require_field_names)
}
}

Expand Down

0 comments on commit e39af11

Please sign in to comment.