Skip to content

Commit

Permalink
fix comm
Browse files Browse the repository at this point in the history
  • Loading branch information
xxhZs committed Sep 19, 2024
1 parent e5ede6d commit 8a6d296
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 57 deletions.
1 change: 1 addition & 0 deletions e2e_test/iceberg/test_case/iceberg_source_all_delete.slt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# Deletions in a single commit are posistion delete, deletions across multiple commits are equail delete. sink_decouple = default(true), so we'll commit every 10s.
statement ok
set streaming_parallelism=4;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# Deletions in a single commit are posistion delete, deletions across multiple commits are equail delete. sink_decouple = false, so we'll commit every 1s.
statement ok
set sink_decouple = false;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# Deletions in a single commit are posistion delete, deletions across multiple commits are equail delete. sink_decouple = default(true), so we'll commit every 10s.
statement ok
set streaming_parallelism=4;

Expand Down
120 changes: 63 additions & 57 deletions src/batch/src/executor/iceberg_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,11 @@ impl IcebergScanExecutor {
let data_types = self.schema.data_types();
let executor_schema_names = self.schema.names();

let position_delete_filter = PositionDeleteFilter::new(
let data_file_scan_tasks = mem::take(&mut self.data_file_scan_tasks);

let mut position_delete_filter = PositionDeleteFilter::new(
mem::take(&mut self.position_delete_file_scan_tasks),
&data_file_scan_tasks,
&table,
self.batch_size,
)
Expand All @@ -116,8 +119,6 @@ impl IcebergScanExecutor {
)
.await?;

let data_file_scan_tasks = mem::take(&mut self.data_file_scan_tasks);

// Delete rows in the data file that need to be deleted by map
for data_file_scan_task in data_file_scan_tasks {
let data_file_path = data_file_scan_task.data_file_path.clone();
Expand Down Expand Up @@ -147,6 +148,7 @@ impl IcebergScanExecutor {
assert_eq!(chunk.data_types(), data_types);
yield chunk;
}
position_delete_filter.remove_file_path(&data_file_path);
}
}
}
Expand Down Expand Up @@ -227,17 +229,22 @@ impl BoxedExecutorBuilder for IcebergScanExecutorBuilder {

struct PositionDeleteFilter {
// Delete columns pos for each file path, false means this column needs to be deleted, value is divided by batch size
position_delete_file_path_pos_map: HashMap<String, Vec<Vec<bool>>>,
position_delete_file_path_pos_map: HashMap<String, HashMap<usize, Vec<bool>>>,
}

impl PositionDeleteFilter {
async fn new(
position_delete_file_scan_tasks: Vec<FileScanTask>,
data_file_scan_tasks: &[FileScanTask],
table: &Table,
batch_size: usize,
) -> crate::error::Result<Self> {
let mut position_delete_file_path_pos_map: HashMap<String, Vec<Vec<bool>>> =
let mut position_delete_file_path_pos_map: HashMap<String, HashMap<usize, Vec<bool>>> =
HashMap::default();
let data_file_path_set = data_file_scan_tasks
.iter()
.map(|data_file_scan_task| data_file_scan_task.data_file_path.clone())
.collect::<std::collections::HashSet<_>>();

let position_delete_file_scan_stream = {
#[try_stream]
Expand All @@ -259,25 +266,25 @@ impl PositionDeleteFilter {
let chunk = IcebergArrowConvert.chunk_from_record_batch(&record_batch)?;
for row in chunk.rows() {
// The schema is fixed. `0` must be `file_path`, `1` must be `pos`.
if let Some(file_path) = row.datum_at(POSITION_DELETE_FILE_FILE_PATH_INDEX)
&& let Some(pos) = row.datum_at(POSITION_DELETE_FILE_POS)
if let Some(ScalarRefImpl::Utf8(file_path)) =
row.datum_at(POSITION_DELETE_FILE_FILE_PATH_INDEX)
&& let Some(ScalarRefImpl::Int64(pos)) = row.datum_at(POSITION_DELETE_FILE_POS)
{
if let ScalarRefImpl::Utf8(file_path) = file_path
&& let ScalarRefImpl::Int64(pos) = pos
{
let entry = position_delete_file_path_pos_map
.entry(file_path.to_string())
.or_default();
// Split `pos` by `batch_size`, because the data file will also be split by `batch_size`
let delete_vec_index = pos as usize / batch_size;
let delete_vec_pos = pos as usize % batch_size;
if delete_vec_index >= entry.len() {
entry.resize(delete_vec_index + 1, vec![true; batch_size]);
}
entry[delete_vec_index][delete_vec_pos] = false;
} else {
bail!("position delete `file_path` and `pos` must be string and int64")
if !data_file_path_set.contains(file_path) {
continue;
}
let entry = position_delete_file_path_pos_map
.entry(file_path.to_string())
.or_default();
// Split `pos` by `batch_size`, because the data file will also be split by `batch_size`
let delete_vec_index = pos as usize / batch_size;
let delete_vec_pos = pos as usize % batch_size;
let delete_vec = entry
.entry(delete_vec_index)
.or_insert(vec![true; batch_size]);
delete_vec[delete_vec_pos] = false;
} else {
bail!("position delete `file_path` and `pos` must be string and int64")
}
}
}
Expand All @@ -287,13 +294,11 @@ impl PositionDeleteFilter {
}

fn filter(&self, data_file_path: &str, mut chunk: DataChunk, index: usize) -> DataChunk {
if !chunk.is_compacted() {
chunk = chunk.compact();
}
chunk = chunk.compact();
if let Some(position_delete_bool_iter) = self
.position_delete_file_path_pos_map
.get(data_file_path)
.and_then(|delete_vecs| delete_vecs.get(index))
.and_then(|delete_vecs| delete_vecs.get(&index))
{
// Some chunks are less than `batch_size`, so we need to be truncate to ensure that the bitmap length is consistent
let position_delete_bool_iter = if position_delete_bool_iter.len() > chunk.capacity() {
Expand All @@ -308,6 +313,10 @@ impl PositionDeleteFilter {
chunk
}
}

fn remove_file_path(&mut self, file_path: &str) {
self.position_delete_file_path_pos_map.remove(file_path);
}
}

struct EqualityDeleteFilter {
Expand Down Expand Up @@ -377,44 +386,41 @@ impl EqualityDeleteFilter {
}

fn apply_data_file_scan_task(&mut self, data_file_scan_task: &FileScanTask) {
self.data_chunk_column_names = Some(
data_file_scan_task
.project_field_ids
.iter()
.filter_map(|id| {
data_file_scan_task
.schema
.name_by_field_id(*id)
.map(|name| name.to_string())
})
.collect(),
);
// eq_delete_column_idxes are used to fetch equality delete columns from data files.
self.equality_delete_column_idxes =
self.equality_delete_ids
.as_ref()
.map(|equality_delete_ids| {
equality_delete_ids
.iter()
.map(|equality_delete_id| {
data_file_scan_task
.project_field_ids
.iter()
.position(|project_field_id| equality_delete_id == project_field_id)
.expect("equality_delete_id not found in delete_equality_ids")
})
.collect_vec()
});
if let Some(equality_delete_ids) = &self.equality_delete_ids {
self.data_chunk_column_names = Some(
data_file_scan_task
.project_field_ids
.iter()
.filter_map(|id| {
data_file_scan_task
.schema
.name_by_field_id(*id)
.map(|name| name.to_string())
})
.collect(),
);
// eq_delete_column_idxes are used to fetch equality delete columns from data files.
self.equality_delete_column_idxes = Some(
equality_delete_ids
.iter()
.map(|equality_delete_id| {
data_file_scan_task
.project_field_ids
.iter()
.position(|project_field_id| equality_delete_id == project_field_id)
.expect("equality_delete_id not found in delete_equality_ids")
})
.collect_vec(),
);
}
}

fn filter(
&self,
mut chunk: DataChunk,
data_sequence_number: i64,
) -> crate::error::Result<DataChunk> {
if !chunk.is_compacted() {
chunk = chunk.compact();
}
chunk = chunk.compact();
match self.equality_delete_column_idxes.as_ref() {
Some(delete_column_ids) => {
let new_visibility = Bitmap::from_iter(
Expand Down

0 comments on commit 8a6d296

Please sign in to comment.