Skip to content

Commit

Permalink
fix: prune row groups correctly for columns with the same name (#3802)
Browse files Browse the repository at this point in the history
* test: add prune test case

* fix: use latest region metadata to get column id

* test: sort output
  • Loading branch information
evenyag authored Apr 26, 2024
1 parent 4eadd9f commit 77fc1e6
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 18 deletions.
1 change: 1 addition & 0 deletions src/mito2/src/read/scan_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,7 @@ impl ScanInput {
.projection(Some(self.mapper.column_ids().to_vec()))
.cache(self.cache_manager.clone())
.index_applier(self.index_applier.clone())
.expected_metadata(Some(self.mapper.metadata().clone()))
.build()
.await;
let reader = match maybe_reader {
Expand Down
28 changes: 21 additions & 7 deletions src/mito2/src/sst/parquet/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ pub(crate) struct ParquetReaderBuilder {
cache_manager: Option<CacheManagerRef>,
/// Index applier.
index_applier: Option<SstIndexApplierRef>,
/// Expected metadata of the region while reading the SST.
/// This is usually the latest metadata of the region. The reader uses
/// it to get the correct column id of a column by name.
expected_metadata: Option<RegionMetadataRef>,
}

impl ParquetReaderBuilder {
Expand All @@ -95,16 +99,19 @@ impl ParquetReaderBuilder {
projection: None,
cache_manager: None,
index_applier: None,
expected_metadata: None,
}
}

/// Attaches the predicate to the builder.
#[must_use]
pub fn predicate(mut self, predicate: Option<Predicate>) -> ParquetReaderBuilder {
self.predicate = predicate;
self
}

/// Attaches the time range to the builder.
#[must_use]
pub fn time_range(mut self, time_range: Option<TimestampRange>) -> ParquetReaderBuilder {
self.time_range = time_range;
self
Expand All @@ -113,12 +120,14 @@ impl ParquetReaderBuilder {
/// Attaches the projection to the builder.
///
/// The reader only applies the projection to fields.
#[must_use]
pub fn projection(mut self, projection: Option<Vec<ColumnId>>) -> ParquetReaderBuilder {
self.projection = projection;
self
}

/// Attaches the cache to the builder.
#[must_use]
pub fn cache(mut self, cache: Option<CacheManagerRef>) -> ParquetReaderBuilder {
self.cache_manager = cache;
self
Expand All @@ -131,6 +140,13 @@ impl ParquetReaderBuilder {
self
}

/// Attaches the expected metadata to the builder.
#[must_use]
pub fn expected_metadata(mut self, expected_metadata: Option<RegionMetadataRef>) -> Self {
self.expected_metadata = expected_metadata;
self
}

/// Builds and initializes a [ParquetReader].
///
/// This needs to perform IO operation.
Expand Down Expand Up @@ -386,14 +402,12 @@ impl ParquetReaderBuilder {
let num_row_groups = parquet_meta.num_row_groups();

let region_meta = read_format.metadata();
let column_ids = region_meta
.column_metadatas
.iter()
.map(|c| c.column_id)
.collect();

let row_groups = parquet_meta.row_groups();
let stats = RowGroupPruningStats::new(row_groups, read_format, column_ids);
let stats =
RowGroupPruningStats::new(row_groups, read_format, self.expected_metadata.clone());
// Here we use the schema of the SST to build the physical expression. If the column
// in the SST doesn't have the same column id as the column in the expected metadata,
// we will get a None statistics for that column.
let row_groups = predicate
.prune_with_stats(&stats, region_meta.schema.arrow_schema())
.iter()
Expand Down
24 changes: 13 additions & 11 deletions src/mito2/src/sst/parquet/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use datafusion::physical_optimizer::pruning::PruningStatistics;
use datafusion_common::{Column, ScalarValue};
use datatypes::arrow::array::{ArrayRef, BooleanArray};
use parquet::file::metadata::RowGroupMetaData;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;

use crate::sst::parquet::format::ReadFormat;
Expand All @@ -31,34 +32,35 @@ pub(crate) struct RowGroupPruningStats<'a, T> {
row_groups: &'a [T],
/// Helper to read the SST.
read_format: &'a ReadFormat,
/// Projected column ids to read.
///
/// We need column ids to distinguish different columns with the same name.
/// e.g. Drops and then adds a column again.
column_ids: HashSet<ColumnId>,
/// The metadata of the region.
/// It contains the schema a query expects to read. If it is not None, we use it instead
/// of the metadata in the SST to get the column id of a column as the SST may have
/// different columns.
expected_metadata: Option<RegionMetadataRef>,
}

impl<'a, T> RowGroupPruningStats<'a, T> {
/// Creates a new statistics to prune specific `row_groups`.
///
/// `expected_metadata` is the (usually latest) region metadata a query
/// expects; when `Some`, it overrides the SST's own metadata for resolving
/// column ids by name.
pub(crate) fn new(
    row_groups: &'a [T],
    read_format: &'a ReadFormat,
    expected_metadata: Option<RegionMetadataRef>,
) -> Self {
    // NOTE: this span in the scraped diff interleaved the removed
    // `column_ids` parameter with the added `expected_metadata` one;
    // only the post-commit form is kept here so the code compiles.
    Self {
        row_groups,
        read_format,
        expected_metadata,
    }
}

/// Returns the column id to use for pruning the column with `name`.
fn column_id_to_prune(&self, name: &str) -> Option<ColumnId> {
    // Prefer the expected (latest) region metadata so that a column that
    // was dropped and re-added under the same name resolves to its current
    // column id instead of the stale id stored in the SST.
    let metadata = self
        .expected_metadata
        .as_ref()
        .unwrap_or_else(|| self.read_format.metadata());
    metadata.column_by_name(name).map(|col| col.column_id)
}
}

Expand Down
67 changes: 67 additions & 0 deletions tests/cases/standalone/common/alter/drop_add_col.result
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
CREATE TABLE test(i TIMESTAMP TIME INDEX, j INTEGER, k INTEGER NOT NULL);

Affected Rows: 0

INSERT INTO test(i, j, k) VALUES (1, 11, 5), (2, 12, 5);

Affected Rows: 2

SELECT * FROM test order by i;

+-------------------------+----+---+
| i | j | k |
+-------------------------+----+---+
| 1970-01-01T00:00:00.001 | 11 | 5 |
| 1970-01-01T00:00:00.002 | 12 | 5 |
+-------------------------+----+---+

SELECT FLUSH_TABLE('test');

+---------------------------+
| flush_table(Utf8("test")) |
+---------------------------+
| 0 |
+---------------------------+

ALTER TABLE test DROP COLUMN j;

Affected Rows: 0

ALTER TABLE test ADD COLUMN j INTEGER DEFAULT 0;

Affected Rows: 0

INSERT INTO test(i, j, k) VALUES (3, 0, 6);

Affected Rows: 1

INSERT INTO test VALUES (4, 7, 0);

Affected Rows: 1

SELECT * FROM test order by i;

+-------------------------+---+---+
| i | k | j |
+-------------------------+---+---+
| 1970-01-01T00:00:00.001 | 5 | 0 |
| 1970-01-01T00:00:00.002 | 5 | 0 |
| 1970-01-01T00:00:00.003 | 6 | 0 |
| 1970-01-01T00:00:00.004 | 7 | 0 |
+-------------------------+---+---+

SELECT * FROM test WHERE j = 0 order by i;

+-------------------------+---+---+
| i | k | j |
+-------------------------+---+---+
| 1970-01-01T00:00:00.001 | 5 | 0 |
| 1970-01-01T00:00:00.002 | 5 | 0 |
| 1970-01-01T00:00:00.003 | 6 | 0 |
| 1970-01-01T00:00:00.004 | 7 | 0 |
+-------------------------+---+---+

DROP TABLE test;

Affected Rows: 0

21 changes: 21 additions & 0 deletions tests/cases/standalone/common/alter/drop_add_col.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
CREATE TABLE test(i TIMESTAMP TIME INDEX, j INTEGER, k INTEGER NOT NULL);

INSERT INTO test(i, j, k) VALUES (1, 11, 5), (2, 12, 5);

SELECT * FROM test order by i;

SELECT FLUSH_TABLE('test');

ALTER TABLE test DROP COLUMN j;

ALTER TABLE test ADD COLUMN j INTEGER DEFAULT 0;

INSERT INTO test(i, j, k) VALUES (3, 0, 6);

INSERT INTO test VALUES (4, 7, 0);

SELECT * FROM test order by i;

SELECT * FROM test WHERE j = 0 order by i;

DROP TABLE test;

0 comments on commit 77fc1e6

Please sign in to comment.