diff --git a/e2e_test/iceberg/test_case/iceberg_source_all_delete.slt b/e2e_test/iceberg/test_case/iceberg_source_all_delete.slt
index 05876099edcce..4ea84f0f17bcd 100644
--- a/e2e_test/iceberg/test_case/iceberg_source_all_delete.slt
+++ b/e2e_test/iceberg/test_case/iceberg_source_all_delete.slt
@@ -1,3 +1,4 @@
+# Deletions in a single commit are position deletes; deletions across multiple commits are equality deletes. sink_decouple = default(true), so we'll commit every 10s.
 statement ok
 set streaming_parallelism=4;
 
diff --git a/e2e_test/iceberg/test_case/iceberg_source_equality_delete.slt b/e2e_test/iceberg/test_case/iceberg_source_equality_delete.slt
index 820776fb7e773..e4200b085ca14 100644
--- a/e2e_test/iceberg/test_case/iceberg_source_equality_delete.slt
+++ b/e2e_test/iceberg/test_case/iceberg_source_equality_delete.slt
@@ -1,3 +1,4 @@
+# Deletions in a single commit are position deletes; deletions across multiple commits are equality deletes. sink_decouple = false, so we'll commit every 1s.
 statement ok
 set sink_decouple = false;
 
diff --git a/e2e_test/iceberg/test_case/iceberg_source_position_delete.slt b/e2e_test/iceberg/test_case/iceberg_source_position_delete.slt
index 96870a76b83fb..a99ee36c88837 100644
--- a/e2e_test/iceberg/test_case/iceberg_source_position_delete.slt
+++ b/e2e_test/iceberg/test_case/iceberg_source_position_delete.slt
@@ -1,3 +1,4 @@
+# Deletions in a single commit are position deletes; deletions across multiple commits are equality deletes. sink_decouple = default(true), so we'll commit every 10s.
 statement ok
 set streaming_parallelism=4;
 
diff --git a/src/batch/src/executor/iceberg_scan.rs b/src/batch/src/executor/iceberg_scan.rs
index 2c6ef7340c150..9fe15e0719d7e 100644
--- a/src/batch/src/executor/iceberg_scan.rs
+++ b/src/batch/src/executor/iceberg_scan.rs
@@ -102,8 +102,11 @@ impl IcebergScanExecutor {
         let data_types = self.schema.data_types();
         let executor_schema_names = self.schema.names();
 
-        let position_delete_filter = PositionDeleteFilter::new(
+        let data_file_scan_tasks = mem::take(&mut self.data_file_scan_tasks);
+
+        let mut position_delete_filter = PositionDeleteFilter::new(
             mem::take(&mut self.position_delete_file_scan_tasks),
+            &data_file_scan_tasks,
             &table,
             self.batch_size,
         )
@@ -116,8 +119,6 @@
         )
         .await?;
 
-        let data_file_scan_tasks = mem::take(&mut self.data_file_scan_tasks);
-
         // Delete rows in the data file that need to be deleted by map
         for data_file_scan_task in data_file_scan_tasks {
             let data_file_path = data_file_scan_task.data_file_path.clone();
@@ -147,6 +148,7 @@
                 assert_eq!(chunk.data_types(), data_types);
                 yield chunk;
             }
+            position_delete_filter.remove_file_path(&data_file_path);
         }
     }
 }
@@ -227,17 +229,22 @@ impl BoxedExecutorBuilder for IcebergScanExecutorBuilder {
 
 struct PositionDeleteFilter {
     // Delete columns pos for each file path, false means this column needs to be deleted, value is divided by batch size
-    position_delete_file_path_pos_map: HashMap<String, Vec<Vec<bool>>>,
+    position_delete_file_path_pos_map: HashMap<String, HashMap<usize, Vec<bool>>>,
 }
 
 impl PositionDeleteFilter {
     async fn new(
         position_delete_file_scan_tasks: Vec<FileScanTask>,
+        data_file_scan_tasks: &[FileScanTask],
         table: &Table,
         batch_size: usize,
     ) -> crate::error::Result<Self> {
-        let mut position_delete_file_path_pos_map: HashMap<String, Vec<Vec<bool>>> =
+        let mut position_delete_file_path_pos_map: HashMap<String, HashMap<usize, Vec<bool>>> =
             HashMap::default();
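+        // Collect the data file paths assigned to this executor, so that position
+        // deletes pointing at files this task will never scan can be skipped below.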
+        let data_file_path_set = data_file_scan_tasks
+            .iter()
+            .map(|data_file_scan_task| data_file_scan_task.data_file_path.clone())
+            .collect::<HashSet<_>>();
         let position_delete_file_scan_stream = {
             #[try_stream]
@@ -259,25 +266,25 @@ impl PositionDeleteFilter {
             let chunk = IcebergArrowConvert.chunk_from_record_batch(&record_batch)?;
             for row in chunk.rows() {
                 // The schema is fixed. `0` must be `file_path`, `1` must be `pos`.
-                if let Some(file_path) = row.datum_at(POSITION_DELETE_FILE_FILE_PATH_INDEX)
-                    && let Some(pos) = row.datum_at(POSITION_DELETE_FILE_POS)
+                if let Some(ScalarRefImpl::Utf8(file_path)) =
+                    row.datum_at(POSITION_DELETE_FILE_FILE_PATH_INDEX)
+                    && let Some(ScalarRefImpl::Int64(pos)) = row.datum_at(POSITION_DELETE_FILE_POS)
                 {
-                    if let ScalarRefImpl::Utf8(file_path) = file_path
-                        && let ScalarRefImpl::Int64(pos) = pos
-                    {
-                        let entry = position_delete_file_path_pos_map
-                            .entry(file_path.to_string())
-                            .or_default();
-                        // Split `pos` by `batch_size`, because the data file will also be split by `batch_size`
-                        let delete_vec_index = pos as usize / batch_size;
-                        let delete_vec_pos = pos as usize % batch_size;
-                        if delete_vec_index >= entry.len() {
-                            entry.resize(delete_vec_index + 1, vec![true; batch_size]);
-                        }
-                        entry[delete_vec_index][delete_vec_pos] = false;
-                    } else {
-                        bail!("position delete `file_path` and `pos` must be string and int64")
+                    if !data_file_path_set.contains(file_path) {
+                        continue;
                     }
+                    let entry = position_delete_file_path_pos_map
+                        .entry(file_path.to_string())
+                        .or_default();
+                    // Split `pos` by `batch_size`, because the data file will also be split by `batch_size`
+                    let delete_vec_index = pos as usize / batch_size;
+                    let delete_vec_pos = pos as usize % batch_size;
+                    let delete_vec = entry
+                        .entry(delete_vec_index)
+                        .or_insert(vec![true; batch_size]);
+                    delete_vec[delete_vec_pos] = false;
+                } else {
+                    bail!("position delete `file_path` and `pos` must be string and int64")
                 }
             }
         }
@@ -287,13 +294,11 @@
     }
 
     fn filter(&self, data_file_path: &str, mut chunk: DataChunk, index: usize) -> DataChunk {
-        if !chunk.is_compacted() {
-            chunk = chunk.compact();
-        }
+        chunk = chunk.compact();
         if let Some(position_delete_bool_iter) = self
             .position_delete_file_path_pos_map
             .get(data_file_path)
-            .and_then(|delete_vecs| delete_vecs.get(index))
+            .and_then(|delete_vecs| delete_vecs.get(&index))
         {
             // Some chunks are less than `batch_size`, so we need to truncate to ensure that the bitmap length is consistent
             let position_delete_bool_iter = if position_delete_bool_iter.len() > chunk.capacity() {
@@ -308,6 +313,10 @@
             chunk
         }
    }
+
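+    // Called by `IcebergScanExecutor` once every chunk of `file_path` has been
+    // yielded, so each file's delete bitmaps are freed as the scan progresses.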
+    fn remove_file_path(&mut self, file_path: &str) {
+        self.position_delete_file_path_pos_map.remove(file_path);
+    }
 }
 
 struct EqualityDeleteFilter {
@@ -377,34 +386,33 @@
     }
 
     fn apply_data_file_scan_task(&mut self, data_file_scan_task: &FileScanTask) {
-        self.data_chunk_column_names = Some(
-            data_file_scan_task
-                .project_field_ids
-                .iter()
-                .filter_map(|id| {
-                    data_file_scan_task
-                        .schema
-                        .name_by_field_id(*id)
-                        .map(|name| name.to_string())
-                })
-                .collect(),
-        );
-        // eq_delete_column_idxes are used to fetch equality delete columns from data files.
-        self.equality_delete_column_idxes =
-            self.equality_delete_ids
-                .as_ref()
-                .map(|equality_delete_ids| {
-                    equality_delete_ids
-                        .iter()
-                        .map(|equality_delete_id| {
-                            data_file_scan_task
-                                .project_field_ids
-                                .iter()
-                                .position(|project_field_id| equality_delete_id == project_field_id)
-                                .expect("equality_delete_id not found in delete_equality_ids")
-                        })
-                        .collect_vec()
-                });
+        if let Some(equality_delete_ids) = &self.equality_delete_ids {
+            self.data_chunk_column_names = Some(
+                data_file_scan_task
+                    .project_field_ids
+                    .iter()
+                    .filter_map(|id| {
+                        data_file_scan_task
+                            .schema
+                            .name_by_field_id(*id)
+                            .map(|name| name.to_string())
+                    })
+                    .collect(),
+            );
+            // eq_delete_column_idxes are used to fetch equality delete columns from data files.
+            self.equality_delete_column_idxes = Some(
+                equality_delete_ids
+                    .iter()
+                    .map(|equality_delete_id| {
+                        data_file_scan_task
+                            .project_field_ids
+                            .iter()
+                            .position(|project_field_id| equality_delete_id == project_field_id)
+                            .expect("equality_delete_id not found in delete_equality_ids")
+                    })
+                    .collect_vec(),
+            );
+        }
     }
 
     fn filter(
         &mut self,
@@ -412,9 +420,7 @@
         mut chunk: DataChunk,
         data_sequence_number: i64,
     ) -> crate::error::Result<DataChunk> {
-        if !chunk.is_compacted() {
-            chunk = chunk.compact();
-        }
+        chunk = chunk.compact();
         match self.equality_delete_column_idxes.as_ref() {
             Some(delete_column_ids) => {
                 let new_visibility = Bitmap::from_iter(