From cb94bd45d3b8f18203a930badce7bb4b10da0c48 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Mon, 29 Jul 2024 14:20:07 +0800 Subject: [PATCH] fix(fulltext-search): prune rows in row group forget to take remainder (#4447) * fix(fulltext-search): prune rows in row group forget to take remainder Signed-off-by: Zhenchi * test: add unit test Signed-off-by: Zhenchi --------- Signed-off-by: Zhenchi --- src/mito2/src/sst/parquet/reader.rs | 78 +++++++++++++++------- src/mito2/src/sst/parquet/row_selection.rs | 4 +- 2 files changed, 57 insertions(+), 25 deletions(-) diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 4e4956b7d1b2..3f7400557869 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -14,7 +14,7 @@ //! Parquet reader. -use std::collections::{BTreeMap, VecDeque}; +use std::collections::{BTreeMap, BTreeSet, VecDeque}; use std::ops::Range; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -408,13 +408,11 @@ impl ParquetReaderBuilder { } }; - let row_group_to_row_ids = apply_res - .into_iter() - .group_by(|row_id| *row_id as usize / row_group_size); - + let row_group_to_row_ids = + Self::group_row_ids(apply_res, row_group_size, parquet_meta.num_row_groups()); Self::prune_row_groups_by_rows( parquet_meta, - row_group_to_row_ids.into_iter(), + row_group_to_row_ids, output, &mut metrics.num_row_groups_fulltext_index_filtered, &mut metrics.num_rows_in_row_group_fulltext_index_filtered, @@ -423,6 +421,33 @@ impl ParquetReaderBuilder { true } + /// Groups row IDs into row groups, with each group's row IDs starting from 0. + fn group_row_ids( + row_ids: BTreeSet, + row_group_size: usize, + num_row_groups: usize, + ) -> Vec<(usize, Vec)> { + let est_rows_per_group = row_ids.len() / num_row_groups; + + let mut row_group_to_row_ids: Vec<(usize, Vec)> = Vec::with_capacity(num_row_groups); + for row_id in row_ids { + let row_group_id = row_id as usize / row_group_size; + let row_id_in_group = row_id as usize % row_group_size; + + if let Some((rg_id, row_ids)) = row_group_to_row_ids.last_mut() + && *rg_id == row_group_id + { + row_ids.push(row_id_in_group); + } else { + let mut row_ids = Vec::with_capacity(est_rows_per_group); + row_ids.push(row_id_in_group); + row_group_to_row_ids.push((row_group_id, row_ids)); + } + } + + row_group_to_row_ids + } + /// Applies index to prune row groups. /// /// TODO(zhongzc): Devise a mechanism to enforce the non-use of indices @@ -540,7 +565,7 @@ impl ParquetReaderBuilder { /// a list of row ids to keep. fn prune_row_groups_by_rows( parquet_meta: &ParquetMetaData, - rows_in_row_groups: impl Iterator)>, + rows_in_row_groups: Vec<(usize, Vec)>, output: &mut BTreeMap>, filtered_row_groups: &mut usize, filtered_rows: &mut usize, @@ -560,7 +585,8 @@ impl ParquetReaderBuilder { .as_ref() .map_or(total_row_count, |s| s.row_count()); - let new_selection = row_selection_from_sorted_row_ids(row_ids, total_row_count); + let new_selection = + row_selection_from_sorted_row_ids(row_ids.into_iter(), total_row_count); let intersected_selection = intersect_row_selections(selection, Some(new_selection)); let num_rows_after = intersected_selection @@ -1159,14 +1185,26 @@ mod tests { ParquetMetaData::new(file_meta, row_groups) } + #[test] + fn test_group_row_ids() { + let row_ids = [0, 1, 2, 5, 6, 7, 8, 12].into_iter().collect(); + let row_group_size = 5; + let num_row_groups = 3; + + let row_group_to_row_ids = + ParquetReaderBuilder::group_row_ids(row_ids, row_group_size, num_row_groups); + + assert_eq!( + row_group_to_row_ids, + vec![(0, vec![0, 1, 2]), (1, vec![0, 1, 2, 3]), (2, vec![2])] + ); + } + #[test] fn prune_row_groups_by_rows_from_empty() { let parquet_meta = mock_parquet_metadata_from_row_groups(vec![10, 10, 5]); - let rows_in_row_groups = [ - (0, [5, 6, 7, 8, 9].into_iter()), - (2, [0, 1, 2, 3, 4].into_iter()), - ]; + let rows_in_row_groups = vec![(0, vec![5, 6, 7, 8, 9]), (2, vec![0, 1, 2, 3, 4])]; // The original output is empty. No row groups are pruned. let mut output = BTreeMap::new(); @@ -1175,7 +1213,7 @@ mod tests { ParquetReaderBuilder::prune_row_groups_by_rows( &parquet_meta, - rows_in_row_groups.into_iter(), + rows_in_row_groups, &mut output, &mut filtered_row_groups, &mut filtered_rows, @@ -1190,10 +1228,7 @@ mod tests { fn prune_row_groups_by_rows_from_full() { let parquet_meta = mock_parquet_metadata_from_row_groups(vec![10, 10, 5]); - let rows_in_row_groups = [ - (0, [5, 6, 7, 8, 9].into_iter()), - (2, [0, 1, 2, 3, 4].into_iter()), - ]; + let rows_in_row_groups = vec![(0, vec![5, 6, 7, 8, 9]), (2, vec![0, 1, 2, 3, 4])]; // The original output is full. let mut output = BTreeMap::from([(0, None), (1, None), (2, None)]); @@ -1202,7 +1237,7 @@ mod tests { ParquetReaderBuilder::prune_row_groups_by_rows( &parquet_meta, - rows_in_row_groups.into_iter(), + rows_in_row_groups, &mut output, &mut filtered_row_groups, &mut filtered_rows, @@ -1229,10 +1264,7 @@ mod tests { fn prune_row_groups_by_rows_from_not_full() { let parquet_meta = mock_parquet_metadata_from_row_groups(vec![10, 10, 5]); - let rows_in_row_groups = [ - (0, [5, 6, 7, 8, 9].into_iter()), - (2, [0, 1, 2, 3, 4].into_iter()), - ]; + let rows_in_row_groups = vec![(0, vec![5, 6, 7, 8, 9]), (2, vec![0, 1, 2, 3, 4])]; // The original output is not full. let mut output = BTreeMap::from([ @@ -1257,7 +1289,7 @@ mod tests { ParquetReaderBuilder::prune_row_groups_by_rows( &parquet_meta, - rows_in_row_groups.into_iter(), + rows_in_row_groups, &mut output, &mut filtered_row_groups, &mut filtered_rows, diff --git a/src/mito2/src/sst/parquet/row_selection.rs b/src/mito2/src/sst/parquet/row_selection.rs index 0750d7d2fd67..6b83242b73cb 100644 --- a/src/mito2/src/sst/parquet/row_selection.rs +++ b/src/mito2/src/sst/parquet/row_selection.rs @@ -53,14 +53,14 @@ pub(crate) fn row_selection_from_row_ranges( /// Note: the input iterator must be sorted in ascending order and /// contain unique row IDs in the range [0, total_row_count). pub(crate) fn row_selection_from_sorted_row_ids( - row_ids: impl Iterator, + row_ids: impl Iterator, total_row_count: usize, ) -> RowSelection { let mut selectors: Vec = Vec::new(); let mut last_processed_end = 0; for row_id in row_ids { - let start = row_id as usize; + let start = row_id; let end = start + 1; if start > last_processed_end {