diff --git a/src/mito2/src/read/unordered_scan.rs b/src/mito2/src/read/unordered_scan.rs index b012c767f620..a9764e4d79bf 100644 --- a/src/mito2/src/read/unordered_scan.rs +++ b/src/mito2/src/read/unordered_scan.rs @@ -35,6 +35,7 @@ use crate::read::compat::CompatBatch; use crate::read::projection::ProjectionMapper; use crate::read::scan_region::{ScanInput, ScanPart}; use crate::read::Source; +use crate::sst::parquet::reader::ReaderMetrics; /// Scans a region without providing any output ordering guarantee. /// @@ -192,8 +193,8 @@ impl RegionScanner for UnorderedScan { yield batch; } } - // TODO(yingwen): metrics. // Then scans file ranges. + let mut reader_metrics = ReaderMetrics::default(); for file_range in file_ranges { let reader = file_range.reader().await.map_err(BoxedError::new).context(ExternalSnafu)?; let compat_batch = file_range.compat_batch(); @@ -203,11 +204,17 @@ impl RegionScanner for UnorderedScan { metrics.num_rows += batch.num_rows(); yield batch; } + if let Source::RowGroupReader(reader) = source { + reader_metrics.merge_from(reader.metrics()); + } } metrics.total_cost = query_start.elapsed(); Self::observe_metrics_on_finish(&metrics); - debug!("Unordered scan partition {} finished, region_id: {}, metrics: {:?}", partition, mapper.metadata().region_id, metrics); + debug!( + "Unordered scan partition {} finished, region_id: {}, metrics: {:?}, reader_metrics: {:?}", + partition, mapper.metadata().region_id, metrics, reader_metrics + ); }; let stream = Box::pin(RecordBatchStreamWrapper::new( self.input.mapper.output_schema(), diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 630d1823cb34..cad0b67396e7 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -195,7 +195,7 @@ impl ParquetReaderBuilder { parquet_to_arrow_field_levels(parquet_schema_desc, projection_mask.clone(), hint) .context(ReadParquetSnafu { path: &file_path })?; - let mut metrics = Metrics::default(); 
+ let mut metrics = ReaderMetrics::default(); let row_groups = self .row_groups_to_read(&read_format, &parquet_meta, &mut metrics) @@ -303,7 +303,7 @@ impl ParquetReaderBuilder { &self, read_format: &ReadFormat, parquet_meta: &ParquetMetaData, - metrics: &mut Metrics, + metrics: &mut ReaderMetrics, ) -> BTreeMap<usize, Option<RowSelection>> { let num_row_groups = parquet_meta.num_row_groups(); if num_row_groups == 0 { @@ -325,7 +325,7 @@ impl ParquetReaderBuilder { async fn prune_row_groups_by_inverted_index( &self, parquet_meta: &ParquetMetaData, - metrics: &mut Metrics, + metrics: &mut ReaderMetrics, ) -> Option<BTreeMap<usize, Option<RowSelection>>> { let Some(index_applier) = &self.index_applier else { return None; }; @@ -407,7 +407,7 @@ impl ParquetReaderBuilder { &self, read_format: &ReadFormat, parquet_meta: &ParquetMetaData, - metrics: &mut Metrics, + metrics: &mut ReaderMetrics, ) -> Option<BTreeMap<usize, Option<RowSelection>>> { let Some(predicate) = &self.predicate else { return None; }; @@ -439,7 +439,7 @@ /// Parquet reader metrics. #[derive(Debug, Default)] -struct Metrics { +pub(crate) struct ReaderMetrics { /// Number of row groups before filtering. num_row_groups_before_filtering: usize, /// Number of row groups filtered by inverted index. @@ -464,6 +464,24 @@ struct Metrics { num_rows: usize, } +impl ReaderMetrics { + /// Adds `other` metrics to this metrics. 
+ pub(crate) fn merge_from(&mut self, other: &ReaderMetrics) { + self.num_row_groups_before_filtering += other.num_row_groups_before_filtering; + self.num_row_groups_inverted_index_filtered += other.num_row_groups_inverted_index_filtered; + self.num_row_groups_min_max_filtered += other.num_row_groups_min_max_filtered; + self.num_rows_precise_filtered += other.num_rows_precise_filtered; + self.num_rows_in_row_group_before_filtering += other.num_rows_in_row_group_before_filtering; + self.num_rows_in_row_group_inverted_index_filtered += + other.num_rows_in_row_group_inverted_index_filtered; + self.build_cost += other.build_cost; + self.scan_cost += other.scan_cost; + self.num_record_batches += other.num_record_batches; + self.num_batches += other.num_batches; + self.num_rows += other.num_rows; + } +} + /// Builder to build a [ParquetRecordBatchReader] for a row group. pub(crate) struct RowGroupReaderBuilder { /// SST file to read. @@ -532,12 +550,12 @@ enum ReaderState { /// The reader is reading a row group. Readable(RowGroupReader), /// The reader is exhausted. - Exhausted(Metrics), + Exhausted(ReaderMetrics), } impl ReaderState { /// Returns the metrics of the reader. - fn metrics(&self) -> &Metrics { + fn metrics(&self) -> &ReaderMetrics { match self { ReaderState::Readable(reader) => &reader.metrics, ReaderState::Exhausted(m) => m, @@ -719,7 +737,7 @@ impl ParquetReader { .await?; ReaderState::Readable(RowGroupReader::new(context.clone(), parquet_reader)) } else { - ReaderState::Exhausted(Metrics::default()) + ReaderState::Exhausted(ReaderMetrics::default()) }; Ok(ParquetReader { @@ -749,7 +767,7 @@ pub struct RowGroupReader { /// Buffered batches to return. batches: VecDeque<Batch>, /// Local scan metrics. 
- metrics: Metrics, + metrics: ReaderMetrics, } impl RowGroupReader { @@ -759,10 +777,15 @@ context, reader, batches: VecDeque::new(), - metrics: Metrics::default(), + metrics: ReaderMetrics::default(), } } + /// Gets a reference to the reader metrics without consuming the reader. + pub(crate) fn metrics(&self) -> &ReaderMetrics { + &self.metrics + } + /// Resets the parquet reader. fn reset_reader(&mut self, reader: ParquetRecordBatchReader) { self.reader = reader;