Skip to content

Commit

Permalink
fix comm
Browse files Browse the repository at this point in the history
  • Loading branch information
xxhZs committed Sep 11, 2024
1 parent 62644fe commit 3a35be3
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 69 deletions.
129 changes: 65 additions & 64 deletions src/batch/src/executor/iceberg_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use core::ops::BitOr;
use std::collections::HashMap;
use std::mem;

Expand All @@ -24,6 +23,7 @@ use itertools::Itertools;
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::row::{OwnedRow, Row};
use risingwave_common::types::DataType;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_connector::sink::iceberg::IcebergConfig;
Expand Down Expand Up @@ -101,17 +101,13 @@ impl IcebergScanExecutor {
.map(|(k, v)| (v.clone(), k))
.collect::<HashMap<_, _>>();

// The value to remove from the column and its seq_num.
let mut eq_delete_file_scan_tasks_map: HashMap<
String,
HashMap<Option<risingwave_common::types::ScalarImpl>, i64>,
> = HashMap::default();
let mut eq_delete_file_scan_tasks_map: HashMap<OwnedRow, i64> = HashMap::default();
let eq_delete_file_scan_tasks = mem::take(&mut self.eq_delete_file_scan_tasks);

for mut eq_delete_file_scan_task in eq_delete_file_scan_tasks {
eq_delete_file_scan_task.project_field_ids =
eq_delete_file_scan_task.equality_ids.clone();
let mut delete_column_names = None;
for eq_delete_file_scan_task in eq_delete_file_scan_tasks {
let mut sequence_number = eq_delete_file_scan_task.sequence_number;

let reader = table
.reader_builder()
.with_batch_size(self.batch_size)
Expand All @@ -124,26 +120,28 @@ impl IcebergScanExecutor {

while let Some(record_batch) = delete_record_batch_stream.next().await {
let record_batch = record_batch.map_err(BatchError::Iceberg)?;
let delete_column_names = record_batch
.schema()
.fields()
.iter()
.map(|field| field.name())
.cloned()
.collect_vec();
if delete_column_names.is_none() {
delete_column_names = Some(
record_batch
.schema()
.fields()
.iter()
.map(|field| field.name())
.cloned()
.collect_vec(),
);
}

let chunk = IcebergArrowConvert.chunk_from_record_batch(&record_batch)?;
for (array, columns_name) in chunk.columns().iter().zip_eq_fast(delete_column_names)
{
let each_column_seq_num_map = eq_delete_file_scan_tasks_map
.entry(columns_name)
for row in chunk.rows() {
let entry = eq_delete_file_scan_tasks_map
.entry(OwnedRow::new(
row.iter()
.map(|scalar_ref| scalar_ref.map(Into::into))
.collect_vec(),
))
.or_default();
for datum in array.get_all_values() {
let entry = each_column_seq_num_map
.entry(datum)
.or_insert(sequence_number);
*entry = *entry.max(&mut sequence_number);
}
*entry = *entry.max(&mut sequence_number);
}
}
}
Expand All @@ -152,6 +150,7 @@ impl IcebergScanExecutor {

for data_file_scan_task in data_file_scan_tasks {
let data_sequence_number = data_file_scan_task.sequence_number;

let reader = table
.reader_builder()
.with_batch_size(self.batch_size)
Expand All @@ -164,55 +163,57 @@ impl IcebergScanExecutor {

while let Some(record_batch) = record_batch_stream.next().await {
let record_batch = record_batch.map_err(BatchError::Iceberg)?;
let column_names = record_batch
.schema()

let arrow_schema = record_batch.schema();
let column_names = arrow_schema
.fields()
.iter()
.map(|field| field.name())
.cloned()
.collect_vec();
let chunk = IcebergArrowConvert.chunk_from_record_batch(&record_batch)?;
let visibilitys: Vec<_> = chunk
.columns()
.iter()
.zip_eq_fast(column_names.clone())
.filter_map(|(array, column_map)| {
if let Some(each_column_seq_num_map) =
eq_delete_file_scan_tasks_map.get(&column_map)
{
let visibility =
Bitmap::from_iter(array.get_all_values().iter().map(|datum| {
if let Some(delete_sequence_number) =
each_column_seq_num_map.get(datum)
&& delete_sequence_number > &data_sequence_number
{
false
} else {
true
}
}));
Some(visibility)
} else {
None
}
})
.collect();
let (data, chunk_visibilitys) = chunk.into_parts_v2();
let visibility = if visibilitys.is_empty() {
chunk_visibilitys
} else {
// Calculate the result of the or operation for different columns of the bitmap
visibilitys
.iter()
.skip(1)
.fold(visibilitys[0].clone(), |acc, bitmap| acc.bitor(bitmap))
let (data, visibility) = match delete_column_names {
Some(ref delete_column_names) => {
let column_ids = column_names
.iter()
.enumerate()
.filter_map(|(id, column_name)| {
if delete_column_names.contains(column_name) {
Some(id)
} else {
None
}
})
.collect_vec();
let visibility =
Bitmap::from_iter(chunk.project(&column_ids).rows().map(|row_ref| {
let row = OwnedRow::new(
row_ref
.iter()
.map(|scalar_ref| scalar_ref.map(Into::into))
.collect_vec(),
);
if let Some(delete_sequence_number) =
eq_delete_file_scan_tasks_map.get(&row)
&& delete_sequence_number > &data_sequence_number
{
false
} else {
true
}
}))
.clone();
let (data, _chunk_visibilities) = chunk.into_parts_v2();
(data, visibility)
}
None => chunk.into_parts_v2(),
};

let data = data
.iter()
.zip_eq_fast(column_names)
.filter_map(|(array, columns)| {
chunk_schema_name_to_id
.get(&columns)
.get(columns)
.map(|&id| (id, array.clone()))
})
.sorted_by_key(|a| a.0)
Expand Down
4 changes: 0 additions & 4 deletions src/common/src/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -603,10 +603,6 @@ impl ArrayImpl {
})
}

pub fn get_all_values(&self) -> Vec<Datum> {
(0..self.len()).map(|i| self.datum_at(i)).collect()
}

/// # Safety
///
/// This function is unsafe because it does not check the validity of `idx`. It is caller's
Expand Down
3 changes: 2 additions & 1 deletion src/connector/src/source/iceberg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,12 +259,13 @@ impl IcebergSplitEnumerator {

#[for_await]
for task in file_scan_stream {
let task: FileScanTask = task.map_err(|e| anyhow!(e))?;
let mut task: FileScanTask = task.map_err(|e| anyhow!(e))?;
match task.data_file_content {
iceberg::spec::DataContentType::Data => {
data_files.push(IcebergFileScanTaskJsonStr::serialize(&task));
}
iceberg::spec::DataContentType::EqualityDeletes => {
task.project_field_ids = task.equality_ids.clone();
eq_delete_files.push(IcebergFileScanTaskJsonStr::serialize(&task));
}
iceberg::spec::DataContentType::PositionDeletes => {
Expand Down

0 comments on commit 3a35be3

Please sign in to comment.