Skip to content

Commit

Permalink
fix ci
Browse files Browse the repository at this point in the history
  • Loading branch information
xxhZs committed Sep 12, 2024
1 parent 24092e2 commit 0fc921e
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 39 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ arrow-array-iceberg = { package = "arrow-array", version = "52" }
arrow-schema-iceberg = { package = "arrow-schema", version = "52" }
arrow-buffer-iceberg = { package = "arrow-buffer", version = "52" }
arrow-cast-iceberg = { package = "arrow-cast", version = "52" }
# branch dev
iceberg = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "bf97b9de51c5c86394e4df7aaff82cc3ed8bc928" }
iceberg-catalog-rest = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "bf97b9de51c5c86394e4df7aaff82cc3ed8bc928" }
iceberg-catalog-glue = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "bf97b9de51c5c86394e4df7aaff82cc3ed8bc928" }
Expand Down
2 changes: 2 additions & 0 deletions e2e_test/iceberg/test_case/iceberg_source_eq_delete.slt
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ delete from s1 where i1 = 7;
statement ok
flush;

sleep 5s

query I
select * from iceberg_t1_source order by i1;
----
Expand Down
55 changes: 16 additions & 39 deletions src/connector/src/source/iceberg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

pub mod parquet_file_reader;

use std::collections::{HashMap, HashSet};
use std::collections::HashMap;

use anyhow::anyhow;
use async_trait::async_trait;
Expand Down Expand Up @@ -208,18 +208,6 @@ impl IcebergSplitEnumerator {
bail!("Batch parallelism is 0. Cannot split the iceberg files.");
}
let table = self.config.load_table_v2().await?;
let current_snapshot = table.metadata().current_snapshot();
if current_snapshot.is_none() {
// If there is no snapshot, we will return a mock `IcebergSplit` with empty files.
return Ok(vec![IcebergSplit {
split_id: 0,
snapshot_id: 0,
table_meta: TableMetadataJsonStr::serialize(table.metadata()),
files: vec![],
eq_delete_files: vec![],
}]);
}

let snapshot_id = match time_traval_info {
Some(IcebergTimeTravelInfo::Version(version)) => {
let Some(snapshot) = table.metadata().snapshot_by_id(version) else {
Expand Down Expand Up @@ -249,12 +237,12 @@ impl IcebergSplitEnumerator {
}
}
}
None => {
assert!(current_snapshot.is_some());
current_snapshot.unwrap().snapshot_id()
}
None => match table.metadata().current_snapshot() {
Some(snapshot) => snapshot.snapshot_id(),
None => bail!("Cannot find the current snapshot id in the iceberg table."),
},
};
let require_names = Self::get_require_field_names(&table, snapshot_id, schema).await?;
let require_names = Self::get_require_field_names(&table, snapshot_id,schema).await?;

let mut data_files = vec![];
let mut eq_delete_files = vec![];
Expand Down Expand Up @@ -315,11 +303,7 @@ impl IcebergSplitEnumerator {
.collect_vec())
}

async fn get_require_field_names(
table: &Table,
snapshot_id: i64,
rw_schema: Schema,
) -> ConnectorResult<Vec<String>> {
async fn get_require_field_names(table: &Table, snapshot_id: i64, rw_schema: Schema) -> ConnectorResult<Vec<String>> {
let scan = table
.scan()
.snapshot_id(snapshot_id)
Expand All @@ -339,27 +323,20 @@ impl IcebergSplitEnumerator {
}
}
}
let mut delete_columns = equality_ids
let delete_columns = equality_ids
.into_iter()
.map(|id| match schema.name_by_field_id(id) {
Some(name) => Ok::<std::string::String, ConnectorError>(name.to_string()),
None => bail!("Delete field id {} not found in schema", id),
})
.collect::<ConnectorResult<HashSet<_>>>()?;
delete_columns.extend(rw_schema.names().iter().cloned());
let require_field_ids = schema
.as_struct()
.fields()
.iter()
.filter_map(|field| {
if delete_columns.contains(&field.name) {
Some(field.name.clone())
} else {
None
}
})
.collect();
Ok(require_field_ids)
.collect::<ConnectorResult<Vec<_>>>()?;
let mut require_field_names: Vec<_> = rw_schema.names().iter().cloned().collect();
for names in delete_columns{
if !require_field_names.contains(&names){
require_field_names.push(names);
}
}
Ok(require_field_names)
}
}

Expand Down

0 comments on commit 0fc921e

Please sign in to comment.