Commit

fix comm
xxhZs committed Sep 10, 2024
1 parent 4ee73ba commit 62644fe
Showing 1 changed file with 20 additions and 15 deletions.
35 changes: 20 additions & 15 deletions src/batch/src/executor/iceberg_scan.rs
@@ -25,6 +25,7 @@ use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::types::DataType;
+use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_connector::sink::iceberg::IcebergConfig;
use risingwave_connector::source::iceberg::{IcebergProperties, IcebergSplit};
use risingwave_connector::source::{ConnectorProperties, SplitImpl, SplitMetaData};
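The new import swaps itertools' `zip_eq` for RisingWave's `zip_eq_fast` in the loops below. The idea is to zip two iterators whose lengths are already known to match, checking equality once (and only in debug builds) rather than paying for a per-element check. A minimal self-contained sketch of that idea — a hypothetical stand-in, not RisingWave's actual implementation:

```rust
/// Zip two exact-size iterators, asserting equal lengths only in debug
/// builds (itertools' `zip_eq` pays for the check on every element).
/// Hypothetical stand-in for `risingwave_common::util::iter_util::ZipEqFast`.
fn zip_eq_fast<A, B>(a: A, b: B) -> impl Iterator<Item = (A::Item, B::Item)>
where
    A: ExactSizeIterator,
    B: ExactSizeIterator,
{
    debug_assert_eq!(a.len(), b.len(), "iterators must have equal length");
    a.zip(b)
}

fn main() {
    let columns = ["id", "v"];
    let arrays = [vec![1, 2], vec![3, 4]];
    for (name, array) in zip_eq_fast(columns.iter(), arrays.iter()) {
        println!("{name}: {array:?}");
    }
}
```

Skipping the release-mode check is reasonable here because both sides of each zip below are derived from the same schema.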
@@ -41,7 +42,7 @@ pub struct IcebergScanExecutor {
#[allow(dead_code)]
snapshot_id: Option<i64>,
table_meta: TableMetadata,
-file_scan_tasks: Vec<FileScanTask>,
+data_file_scan_tasks: Vec<FileScanTask>,
eq_delete_file_scan_tasks: Vec<FileScanTask>,
batch_size: usize,
schema: Schema,
@@ -67,7 +68,7 @@ impl IcebergScanExecutor {
iceberg_config: IcebergConfig,
snapshot_id: Option<i64>,
table_meta: TableMetadata,
-file_scan_tasks: Vec<FileScanTask>,
+data_file_scan_tasks: Vec<FileScanTask>,
eq_delete_file_scan_tasks: Vec<FileScanTask>,
batch_size: usize,
schema: Schema,
@@ -77,7 +78,7 @@
iceberg_config,
snapshot_id,
table_meta,
-file_scan_tasks,
+data_file_scan_tasks,
eq_delete_file_scan_tasks,
batch_size,
schema,
@@ -100,6 +101,7 @@
.map(|(k, v)| (v.clone(), k))
.collect::<HashMap<_, _>>();

+// For each column, the values to remove and their sequence numbers.
let mut eq_delete_file_scan_tasks_map: HashMap<
String,
HashMap<Option<risingwave_common::types::ScalarImpl>, i64>,
@@ -131,7 +133,8 @@
.collect_vec();

let chunk = IcebergArrowConvert.chunk_from_record_batch(&record_batch)?;
-for (array, columns_name) in chunk.columns().iter().zip_eq(delete_column_names) {
+for (array, columns_name) in chunk.columns().iter().zip_eq_fast(delete_column_names)
+{
let each_column_seq_num_map = eq_delete_file_scan_tasks_map
.entry(columns_name)
.or_default();
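This hunk builds `eq_delete_file_scan_tasks_map`: for every equality-delete file, each column name maps to the deleted values and the sequence number of the delete file that removed them. A self-contained sketch of that bookkeeping, with plain `i64` standing in for `ScalarImpl`, made-up rows, and an assumed keep-the-largest-sequence-number policy (the real overwrite order is in the elided part of the diff):

```rust
use std::collections::HashMap;

fn main() {
    // (column name, deleted value, sequence number of the delete file)
    let delete_rows = [
        ("id", Some(1_i64), 5_i64),
        ("id", Some(2), 7),
        ("id", Some(1), 7),
    ];

    // column name -> (deleted value -> sequence number of the delete)
    let mut eq_delete_map: HashMap<String, HashMap<Option<i64>, i64>> = HashMap::new();
    for (column, value, seq) in delete_rows {
        let per_column = eq_delete_map.entry(column.to_string()).or_default();
        // Assumption for this sketch: the newest (largest) sequence
        // number wins when the same value is deleted more than once.
        per_column
            .entry(value)
            .and_modify(|s| *s = (*s).max(seq))
            .or_insert(seq);
    }
    assert_eq!(eq_delete_map["id"][&Some(1)], 7);
}
```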
@@ -145,15 +148,15 @@
}
}

-let file_scan_tasks = mem::take(&mut self.file_scan_tasks);
+let data_file_scan_tasks = mem::take(&mut self.data_file_scan_tasks);

-for file_scan_task in file_scan_tasks {
-let sequence_number = file_scan_task.sequence_number;
+for data_file_scan_task in data_file_scan_tasks {
+let data_sequence_number = data_file_scan_task.sequence_number;
let reader = table
.reader_builder()
.with_batch_size(self.batch_size)
.build();
-let file_scan_stream = tokio_stream::once(Ok(file_scan_task));
+let file_scan_stream = tokio_stream::once(Ok(data_file_scan_task));

let mut record_batch_stream = reader
.read(Box::pin(file_scan_stream))
@@ -172,15 +175,16 @@
let visibilitys: Vec<_> = chunk
.columns()
.iter()
-.zip_eq(column_names.clone())
+.zip_eq_fast(column_names.clone())
.filter_map(|(array, column_map)| {
if let Some(each_column_seq_num_map) =
eq_delete_file_scan_tasks_map.get(&column_map)
{
let visibility =
Bitmap::from_iter(array.get_all_values().iter().map(|datum| {
-if let Some(s) = each_column_seq_num_map.get(datum)
-&& s > &sequence_number
+if let Some(delete_sequence_number) =
+each_column_seq_num_map.get(datum)
+&& delete_sequence_number > &data_sequence_number
{
false
} else {
@@ -193,25 +197,26 @@
}
})
.collect();
-let (data, va) = chunk.into_parts_v2();
+let (data, chunk_visibilitys) = chunk.into_parts_v2();
let visibility = if visibilitys.is_empty() {
-va
+chunk_visibilitys
} else {
+// OR together the visibility bitmaps of the different columns.
visibilitys
.iter()
.skip(1)
.fold(visibilitys[0].clone(), |acc, bitmap| acc.bitor(bitmap))
};
let data = data
.iter()
-.zip_eq(column_names)
+.zip_eq_fast(column_names)
.filter_map(|(array, columns)| {
chunk_schema_name_to_id
.get(&columns)
.map(|&id| (id, array.clone()))
})
.sorted_by_key(|a| a.0)
-.map(|(k, v)| v)
+.map(|(_k, v)| v)
.collect_vec();
let chunk = DataChunk::new(data, visibility);
debug_assert_eq!(chunk.data_types(), data_types);
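The final hunks apply the deletes: a row in a data file is hidden when an equality-delete entry matches its value and carries a sequence number strictly greater than the data file's, and the per-column visibility bitmaps are then OR-folded, so a row stays visible as long as any column clears it. A simplified sketch of both steps, with `Vec<bool>` standing in for `Bitmap` and made-up values:

```rust
use std::collections::HashMap;

/// Visibility of one column's values against a (value -> delete seq) map:
/// a value is hidden only if it was deleted *after* the data file was written.
fn column_visibility(
    values: &[Option<i64>],
    deletes: &HashMap<Option<i64>, i64>,
    data_seq: i64,
) -> Vec<bool> {
    values
        .iter()
        .map(|v| match deletes.get(v) {
            Some(&delete_seq) if delete_seq > data_seq => false,
            _ => true,
        })
        .collect()
}

fn main() {
    let deletes = HashMap::from([(Some(1), 7_i64), (Some(2), 3)]);
    let ids = [Some(1), Some(2), Some(3)];
    let vis_id = column_visibility(&ids, &deletes, 5);
    // id=1 deleted at seq 7 > 5 (hidden); id=2 deleted at seq 3 <= 5 (visible).
    assert_eq!(vis_id, vec![false, true, true]);

    // OR-fold across columns, mirroring
    // `visibilitys.iter().skip(1).fold(.., |acc, b| acc.bitor(b))`:
    // a row survives if any column keeps it.
    let vis_v = vec![true, false, true];
    let visibility: Vec<bool> = vis_id.iter().zip(&vis_v).map(|(a, b)| a | b).collect();
    assert_eq!(visibility, vec![true, true, true]);
}
```

The OR (rather than AND) matches equality-delete semantics: a delete only removes a row when every delete column matches, so one non-matching column is enough to keep the row visible.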