diff --git a/src/batch/src/executor/iceberg_scan.rs b/src/batch/src/executor/iceberg_scan.rs
index 0a9fb652500bc..16035dde0adb9 100644
--- a/src/batch/src/executor/iceberg_scan.rs
+++ b/src/batch/src/executor/iceberg_scan.rs
@@ -12,7 +12,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use core::ops::BitOr;
 use std::collections::HashMap;
 use std::mem;
 
@@ -24,6 +23,7 @@ use itertools::Itertools;
 use risingwave_common::array::arrow::IcebergArrowConvert;
 use risingwave_common::bitmap::Bitmap;
 use risingwave_common::catalog::{Field, Schema};
+use risingwave_common::row::{OwnedRow, Row};
 use risingwave_common::types::DataType;
 use risingwave_common::util::iter_util::ZipEqFast;
 use risingwave_connector::sink::iceberg::IcebergConfig;
@@ -101,17 +101,13 @@
             .map(|(k, v)| (v.clone(), k))
             .collect::<HashMap<_, _>>();
 
-        // The value to remove from the column and its seq_num.
-        let mut eq_delete_file_scan_tasks_map: HashMap<
-            String,
-            HashMap<Datum, i64>,
-        > = HashMap::default();
+        let mut eq_delete_file_scan_tasks_map: HashMap<OwnedRow, i64> = HashMap::default();
         let eq_delete_file_scan_tasks = mem::take(&mut self.eq_delete_file_scan_tasks);
 
-        for mut eq_delete_file_scan_task in eq_delete_file_scan_tasks {
-            eq_delete_file_scan_task.project_field_ids =
-                eq_delete_file_scan_task.equality_ids.clone();
+        let mut delete_column_names = None;
+        for eq_delete_file_scan_task in eq_delete_file_scan_tasks {
             let mut sequence_number = eq_delete_file_scan_task.sequence_number;
+
             let reader = table
                 .reader_builder()
                 .with_batch_size(self.batch_size)
@@ -124,26 +120,28 @@ impl IcebergScanExecutor {
 
             while let Some(record_batch) = delete_record_batch_stream.next().await {
                 let record_batch = record_batch.map_err(BatchError::Iceberg)?;
-                let delete_column_names = record_batch
-                    .schema()
-                    .fields()
-                    .iter()
-                    .map(|field| field.name())
-                    .cloned()
-                    .collect_vec();
+                if delete_column_names.is_none() {
+                    delete_column_names = Some(
+                        record_batch
+                            .schema()
+                            .fields()
+                            .iter()
+                            .map(|field| field.name())
+                            .cloned()
+                            .collect_vec(),
+                    );
+                }
 
                 let chunk = IcebergArrowConvert.chunk_from_record_batch(&record_batch)?;
-                for (array, columns_name) in chunk.columns().iter().zip_eq_fast(delete_column_names)
-                {
-                    let each_column_seq_num_map = eq_delete_file_scan_tasks_map
-                        .entry(columns_name)
+                for row in chunk.rows() {
+                    let entry = eq_delete_file_scan_tasks_map
+                        .entry(OwnedRow::new(
+                            row.iter()
+                                .map(|scalar_ref| scalar_ref.map(Into::into))
+                                .collect_vec(),
+                        ))
                         .or_default();
-                    for datum in array.get_all_values() {
-                        let entry = each_column_seq_num_map
-                            .entry(datum)
-                            .or_insert(sequence_number);
-                        *entry = *entry.max(&mut sequence_number);
-                    }
+                    *entry = *entry.max(&mut sequence_number);
                 }
             }
         }
@@ -152,6 +150,7 @@
 
         for data_file_scan_task in data_file_scan_tasks {
             let data_sequence_number = data_file_scan_task.sequence_number;
+
             let reader = table
                 .reader_builder()
                 .with_batch_size(self.batch_size)
@@ -164,55 +163,57 @@ impl IcebergScanExecutor {
             while let Some(record_batch) = record_batch_stream.next().await {
                 let record_batch = record_batch.map_err(BatchError::Iceberg)?;
-                let column_names = record_batch
-                    .schema()
+
+                let arrow_schema = record_batch.schema();
+                let column_names = arrow_schema
                     .fields()
                     .iter()
                     .map(|field| field.name())
-                    .cloned()
                     .collect_vec();
 
                 let chunk = IcebergArrowConvert.chunk_from_record_batch(&record_batch)?;
-                let visibilitys: Vec<_> = chunk
-                    .columns()
-                    .iter()
-                    .zip_eq_fast(column_names.clone())
-                    .filter_map(|(array, column_map)| {
-                        if let Some(each_column_seq_num_map) =
-                            eq_delete_file_scan_tasks_map.get(&column_map)
-                        {
-                            let visibility =
-                                Bitmap::from_iter(array.get_all_values().iter().map(|datum| {
-                                    if let Some(delete_sequence_number) =
-                                        each_column_seq_num_map.get(datum)
-                                        && delete_sequence_number > &data_sequence_number
-                                    {
-                                        false
-                                    } else {
-                                        true
-                                    }
-                                }));
-                            Some(visibility)
-                        } else {
-                            None
-                        }
-                    })
-                    .collect();
-                let (data, chunk_visibilitys) = chunk.into_parts_v2();
-                let visibility = if visibilitys.is_empty() {
-                    chunk_visibilitys
-                } else {
-                    // Calculate the result of the or operation for different columns of the bitmap
-                    visibilitys
-                        .iter()
-                        .skip(1)
-                        .fold(visibilitys[0].clone(), |acc, bitmap| acc.bitor(bitmap))
+                let (data, visibility) = match delete_column_names {
+                    Some(ref delete_column_names) => {
+                        let column_ids = column_names
+                            .iter()
+                            .enumerate()
+                            .filter_map(|(id, column_name)| {
+                                if delete_column_names.contains(column_name) {
+                                    Some(id)
+                                } else {
+                                    None
+                                }
+                            })
+                            .collect_vec();
+                        let visibility =
+                            Bitmap::from_iter(chunk.project(&column_ids).rows().map(|row_ref| {
+                                let row = OwnedRow::new(
+                                    row_ref
+                                        .iter()
+                                        .map(|scalar_ref| scalar_ref.map(Into::into))
+                                        .collect_vec(),
+                                );
+                                if let Some(delete_sequence_number) =
+                                    eq_delete_file_scan_tasks_map.get(&row)
+                                    && delete_sequence_number > &data_sequence_number
+                                {
+                                    false
+                                } else {
+                                    true
+                                }
+                            }))
+                            .clone();
+                        let (data, _chunk_visibilities) = chunk.into_parts_v2();
+                        (data, visibility)
+                    }
+                    None => chunk.into_parts_v2(),
                 };
+
                 let data = data
                     .iter()
                     .zip_eq_fast(column_names)
                     .filter_map(|(array, columns)| {
                         chunk_schema_name_to_id
-                            .get(&columns)
+                            .get(columns)
                             .map(|&id| (id, array.clone()))
                     })
                     .sorted_by_key(|a| a.0)
diff --git a/src/common/src/array/mod.rs b/src/common/src/array/mod.rs
index 54840198ccca0..b34e5f9b9c470 100644
--- a/src/common/src/array/mod.rs
+++ b/src/common/src/array/mod.rs
@@ -603,10 +603,6 @@ impl ArrayImpl {
         })
     }
 
-    pub fn get_all_values(&self) -> Vec<Datum> {
-        (0..self.len()).map(|i| self.datum_at(i)).collect()
-    }
-
     /// # Safety
     ///
     /// This function is unsafe because it does not check the validity of `idx`. It is caller's
diff --git a/src/connector/src/source/iceberg/mod.rs b/src/connector/src/source/iceberg/mod.rs
index 8d2f1e97f6f42..fd14f88100115 100644
--- a/src/connector/src/source/iceberg/mod.rs
+++ b/src/connector/src/source/iceberg/mod.rs
@@ -259,12 +259,13 @@
 
         #[for_await]
         for task in file_scan_stream {
-            let task: FileScanTask = task.map_err(|e| anyhow!(e))?;
+            let mut task: FileScanTask = task.map_err(|e| anyhow!(e))?;
             match task.data_file_content {
                 iceberg::spec::DataContentType::Data => {
                     data_files.push(IcebergFileScanTaskJsonStr::serialize(&task));
                 }
                 iceberg::spec::DataContentType::EqualityDeletes => {
+                    task.project_field_ids = task.equality_ids.clone();
                     eq_delete_files.push(IcebergFileScanTaskJsonStr::serialize(&task));
                 }
                 iceberg::spec::DataContentType::PositionDeletes => {
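
Note (not part of the patch): the old executor kept one `HashMap<Datum, i64>` per equality-delete column and OR'ed the resulting per-column visibility bitmaps, which hides a data row as soon as *any single* column value matches a delete file. Iceberg equality deletes identify rows by the whole tuple of `equality_ids` columns, so the rewrite keys a single map by the full `OwnedRow` (keeping the highest delete sequence number per key) and hides a data row only when its projected key matches and the delete's sequence number is strictly greater than the data file's. The `project_field_ids = equality_ids` projection moves into the split enumerator so delete scan tasks arrive already projected. Below is a minimal, self-contained sketch of that merge-on-read rule; `Key`, `build_delete_map`, and `is_visible` are illustrative stand-ins (a plain `Vec<Option<i64>>` replaces RisingWave's `OwnedRow`), not names from the codebase:

```rust
use std::collections::HashMap;

// Stand-in for RisingWave's `OwnedRow`: one datum per equality-id column.
type Key = Vec<Option<i64>>;

// Keep the highest sequence number seen for each equality-delete key,
// mirroring `*entry = *entry.max(&mut sequence_number)` in the executor.
fn build_delete_map(deletes: &[(Key, i64)]) -> HashMap<Key, i64> {
    let mut map: HashMap<Key, i64> = HashMap::new();
    for (key, seq) in deletes {
        let entry = map.entry(key.clone()).or_insert(*seq);
        *entry = (*entry).max(*seq);
    }
    map
}

// A data row stays visible unless a matching delete key carries a sequence
// number strictly greater than the data file's own sequence number.
fn is_visible(map: &HashMap<Key, i64>, key: &Key, data_seq: i64) -> bool {
    !matches!(map.get(key), Some(&delete_seq) if delete_seq > data_seq)
}

fn main() {
    let deletes = vec![
        (vec![Some(1), Some(10)], 5),
        (vec![Some(1), Some(10)], 7), // a later delete of the same key wins
        (vec![Some(2), Some(20)], 3),
    ];
    let map = build_delete_map(&deletes);
    assert!(!is_visible(&map, &vec![Some(1), Some(10)], 6)); // deleted: seq 7 > 6
    assert!(is_visible(&map, &vec![Some(2), Some(20)], 4)); // delete seq 3 <= 4
    // The old per-column OR would also have hidden this row (first column
    // matches a deleted 1); keying by the whole tuple keeps it visible.
    assert!(is_visible(&map, &vec![Some(1), Some(20)], 6));
    println!("merge-on-read visibility checks passed");
}
```

Keeping only the maximum sequence number per key is sufficient because visibility compares each data row against a single threshold; older deletes of the same key can never hide a row that the newest one does not.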