From 77fc1e6de03a1c1ac29fe512d65cc67569f81be7 Mon Sep 17 00:00:00 2001 From: Yingwen Date: Fri, 26 Apr 2024 14:52:23 +0800 Subject: [PATCH] fix: prune row groups correctly for columns with the same name (#3802) * test: add prune test case * fix: use latest region metadata to get column id * test: sort output --- src/mito2/src/read/scan_region.rs | 1 + src/mito2/src/sst/parquet/reader.rs | 28 ++++++-- src/mito2/src/sst/parquet/stats.rs | 24 ++++--- .../common/alter/drop_add_col.result | 67 +++++++++++++++++++ .../standalone/common/alter/drop_add_col.sql | 21 ++++++ 5 files changed, 123 insertions(+), 18 deletions(-) create mode 100644 tests/cases/standalone/common/alter/drop_add_col.result create mode 100644 tests/cases/standalone/common/alter/drop_add_col.sql diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 0ba6290c6950..a33765479b11 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -509,6 +509,7 @@ impl ScanInput { .projection(Some(self.mapper.column_ids().to_vec())) .cache(self.cache_manager.clone()) .index_applier(self.index_applier.clone()) + .expected_metadata(Some(self.mapper.metadata().clone())) .build() .await; let reader = match maybe_reader { diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 71996eba5b12..72ec6c0528dd 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -77,6 +77,10 @@ pub(crate) struct ParquetReaderBuilder { cache_manager: Option, /// Index applier. index_applier: Option, + /// Expected metadata of the region while reading the SST. + /// This is usually the latest metadata of the region. The reader use + /// it get the correct column id of a column by name. + expected_metadata: Option, } impl ParquetReaderBuilder { @@ -95,16 +99,19 @@ impl ParquetReaderBuilder { projection: None, cache_manager: None, index_applier: None, + expected_metadata: None, } } /// Attaches the predicate to the builder. + #[must_use] pub fn predicate(mut self, predicate: Option) -> ParquetReaderBuilder { self.predicate = predicate; self } /// Attaches the time range to the builder. + #[must_use] pub fn time_range(mut self, time_range: Option) -> ParquetReaderBuilder { self.time_range = time_range; self @@ -113,12 +120,14 @@ impl ParquetReaderBuilder { /// Attaches the projection to the builder. /// /// The reader only applies the projection to fields. + #[must_use] pub fn projection(mut self, projection: Option>) -> ParquetReaderBuilder { self.projection = projection; self } /// Attaches the cache to the builder. + #[must_use] pub fn cache(mut self, cache: Option) -> ParquetReaderBuilder { self.cache_manager = cache; self @@ -131,6 +140,13 @@ impl ParquetReaderBuilder { self } + /// Attaches the expected metadata to the builder. + #[must_use] + pub fn expected_metadata(mut self, expected_metadata: Option) -> Self { + self.expected_metadata = expected_metadata; + self + } + /// Builds and initializes a [ParquetReader]. /// /// This needs to perform IO operation. @@ -386,14 +402,12 @@ impl ParquetReaderBuilder { let num_row_groups = parquet_meta.num_row_groups(); let region_meta = read_format.metadata(); - let column_ids = region_meta - .column_metadatas - .iter() - .map(|c| c.column_id) - .collect(); - let row_groups = parquet_meta.row_groups(); - let stats = RowGroupPruningStats::new(row_groups, read_format, column_ids); + let stats = + RowGroupPruningStats::new(row_groups, read_format, self.expected_metadata.clone()); + // Here we use the schema of the SST to build the physical expression. If the column + // in the SST doesn't have the same column id as the column in the expected metadata, + // we will get a None statistics for that column. let row_groups = predicate .prune_with_stats(&stats, region_meta.schema.arrow_schema()) .iter() diff --git a/src/mito2/src/sst/parquet/stats.rs b/src/mito2/src/sst/parquet/stats.rs index 43e0e3cfb963..61360b1f7085 100644 --- a/src/mito2/src/sst/parquet/stats.rs +++ b/src/mito2/src/sst/parquet/stats.rs @@ -21,6 +21,7 @@ use datafusion::physical_optimizer::pruning::PruningStatistics; use datafusion_common::{Column, ScalarValue}; use datatypes::arrow::array::{ArrayRef, BooleanArray}; use parquet::file::metadata::RowGroupMetaData; +use store_api::metadata::RegionMetadataRef; use store_api::storage::ColumnId; use crate::sst::parquet::format::ReadFormat; @@ -31,11 +32,11 @@ pub(crate) struct RowGroupPruningStats<'a, T> { row_groups: &'a [T], /// Helper to read the SST. read_format: &'a ReadFormat, - /// Projected column ids to read. - /// - /// We need column ids to distinguish different columns with the same name. - /// e.g. Drops and then adds a column again. - column_ids: HashSet, + /// The metadata of the region. + /// It contains the schema a query expects to read. If it is not None, we use it instead + /// of the metadata in the SST to get the column id of a column as the SST may have + /// different columns. + expected_metadata: Option, } impl<'a, T> RowGroupPruningStats<'a, T> { @@ -43,22 +44,23 @@ impl<'a, T> RowGroupPruningStats<'a, T> { pub(crate) fn new( row_groups: &'a [T], read_format: &'a ReadFormat, - column_ids: HashSet, + expected_metadata: Option, ) -> Self { Self { row_groups, read_format, - column_ids, + expected_metadata, } } /// Returns the column id of specific column name if we need to read it. fn column_id_to_prune(&self, name: &str) -> Option { + let metadata = self + .expected_metadata + .as_ref() + .unwrap_or_else(|| self.read_format.metadata()); // Only use stats when the column to read has the same id as the column in the SST. - self.read_format - .metadata() - .column_by_name(name) - .and_then(|col| self.column_ids.get(&col.column_id).copied()) + metadata.column_by_name(name).map(|col| col.column_id) } } diff --git a/tests/cases/standalone/common/alter/drop_add_col.result b/tests/cases/standalone/common/alter/drop_add_col.result new file mode 100644 index 000000000000..a677c097d73a --- /dev/null +++ b/tests/cases/standalone/common/alter/drop_add_col.result @@ -0,0 +1,67 @@ +CREATE TABLE test(i TIMESTAMP TIME INDEX, j INTEGER, k INTEGER NOT NULL); + +Affected Rows: 0 + +INSERT INTO test(i, j, k) VALUES (1, 11, 5), (2, 12, 5); + +Affected Rows: 2 + +SELECT * FROM test order by i; + ++-------------------------+----+---+ +| i | j | k | ++-------------------------+----+---+ +| 1970-01-01T00:00:00.001 | 11 | 5 | +| 1970-01-01T00:00:00.002 | 12 | 5 | ++-------------------------+----+---+ + +SELECT FLUSH_TABLE('test'); + ++---------------------------+ +| flush_table(Utf8("test")) | ++---------------------------+ +| 0 | ++---------------------------+ + +ALTER TABLE test DROP COLUMN j; + +Affected Rows: 0 + +ALTER TABLE test ADD COLUMN j INTEGER DEFAULT 0; + +Affected Rows: 0 + +INSERT INTO test(i, j, k) VALUES (3, 0, 6); + +Affected Rows: 1 + +INSERT INTO test VALUES (4, 7, 0); + +Affected Rows: 1 + +SELECT * FROM test order by i; + ++-------------------------+---+---+ +| i | k | j | ++-------------------------+---+---+ +| 1970-01-01T00:00:00.001 | 5 | 0 | +| 1970-01-01T00:00:00.002 | 5 | 0 | +| 1970-01-01T00:00:00.003 | 6 | 0 | +| 1970-01-01T00:00:00.004 | 7 | 0 | ++-------------------------+---+---+ + +SELECT * FROM test WHERE j = 0 order by i; + ++-------------------------+---+---+ +| i | k | j | ++-------------------------+---+---+ +| 1970-01-01T00:00:00.001 | 5 | 0 | +| 1970-01-01T00:00:00.002 | 5 | 0 | +| 1970-01-01T00:00:00.003 | 6 | 0 | +| 1970-01-01T00:00:00.004 | 7 | 0 | ++-------------------------+---+---+ + +DROP TABLE test; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/alter/drop_add_col.sql b/tests/cases/standalone/common/alter/drop_add_col.sql new file mode 100644 index 000000000000..0fd9795efd19 --- /dev/null +++ b/tests/cases/standalone/common/alter/drop_add_col.sql @@ -0,0 +1,21 @@ +CREATE TABLE test(i TIMESTAMP TIME INDEX, j INTEGER, k INTEGER NOT NULL); + +INSERT INTO test(i, j, k) VALUES (1, 11, 5), (2, 12, 5); + +SELECT * FROM test order by i; + +SELECT FLUSH_TABLE('test'); + +ALTER TABLE test DROP COLUMN j; + +ALTER TABLE test ADD COLUMN j INTEGER DEFAULT 0; + +INSERT INTO test(i, j, k) VALUES (3, 0, 6); + +INSERT INTO test VALUES (4, 7, 0); + +SELECT * FROM test order by i; + +SELECT * FROM test WHERE j = 0 order by i; + +DROP TABLE test;