Commit
chore: reader metrics
evenyag committed May 20, 2024
1 parent 19b1fa3 commit b9eb13f
Showing 2 changed files with 42 additions and 12 deletions.
11 changes: 9 additions & 2 deletions src/mito2/src/read/unordered_scan.rs
@@ -35,6 +35,7 @@ use crate::read::compat::CompatBatch;
 use crate::read::projection::ProjectionMapper;
 use crate::read::scan_region::{ScanInput, ScanPart};
 use crate::read::Source;
+use crate::sst::parquet::reader::ReaderMetrics;
 
 /// Scans a region without providing any output ordering guarantee.
 ///
@@ -192,8 +193,8 @@ impl RegionScanner for UnorderedScan {
                     yield batch;
                 }
             }
-            // TODO(yingwen): metrics.
             // Then scans file ranges.
+            let mut reader_metrics = ReaderMetrics::default();
             for file_range in file_ranges {
                 let reader = file_range.reader().await.map_err(BoxedError::new).context(ExternalSnafu)?;
                 let compat_batch = file_range.compat_batch();
@@ -203,11 +204,17 @@
                     metrics.num_rows += batch.num_rows();
                     yield batch;
                 }
+                if let Source::RowGroupReader(reader) = source {
+                    reader_metrics.merge_from(reader.metrics());
+                }
             }
 
             metrics.total_cost = query_start.elapsed();
             Self::observe_metrics_on_finish(&metrics);
-            debug!("Unordered scan partition {} finished, region_id: {}, metrics: {:?}", partition, mapper.metadata().region_id, metrics);
+            debug!(
+                "Unordered scan partition {} finished, region_id: {}, metrics: {:?}, reader_metrics: {:?}",
+                partition, mapper.metadata().region_id, metrics, reader_metrics
+            );
         };
         let stream = Box::pin(RecordBatchStreamWrapper::new(
             self.input.mapper.output_schema(),
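The change in this file follows an accumulate-then-report pattern: each file range produces its own reader, and the scan folds every exhausted reader's counters into one scan-wide `ReaderMetrics` before the final `debug!` line. Below is a minimal, self-contained sketch of that pattern; the `RangeMetrics` type and its fields are illustrative stand-ins, not the crate's actual definitions.

use std::time::Duration;

/// Illustrative stand-in for the crate's `ReaderMetrics`.
#[derive(Debug, Default)]
struct RangeMetrics {
    num_rows: usize,
    scan_cost: Duration,
}

impl RangeMetrics {
    /// Adds `other`'s counters to this accumulator, field by field.
    fn merge_from(&mut self, other: &RangeMetrics) {
        self.num_rows += other.num_rows;
        self.scan_cost += other.scan_cost;
    }
}

fn main() {
    // One metrics value per file range, produced as each reader finishes.
    let per_range = [
        RangeMetrics { num_rows: 100, scan_cost: Duration::from_millis(3) },
        RangeMetrics { num_rows: 250, scan_cost: Duration::from_millis(7) },
    ];

    // Fold every range's counters into one scan-wide total before logging,
    // mirroring the `reader_metrics.merge_from(reader.metrics())` call above.
    let mut total = RangeMetrics::default();
    for m in &per_range {
        total.merge_from(m);
    }
    assert_eq!(total.num_rows, 350);
    println!("scan finished, reader_metrics: {:?}", total);
}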
43 changes: 33 additions & 10 deletions src/mito2/src/sst/parquet/reader.rs
@@ -195,7 +195,7 @@ impl ParquetReaderBuilder {
         parquet_to_arrow_field_levels(parquet_schema_desc, projection_mask.clone(), hint)
             .context(ReadParquetSnafu { path: &file_path })?;
 
-        let mut metrics = Metrics::default();
+        let mut metrics = ReaderMetrics::default();
 
         let row_groups = self
             .row_groups_to_read(&read_format, &parquet_meta, &mut metrics)
@@ -303,7 +303,7 @@
         &self,
         read_format: &ReadFormat,
         parquet_meta: &ParquetMetaData,
-        metrics: &mut Metrics,
+        metrics: &mut ReaderMetrics,
     ) -> BTreeMap<usize, Option<RowSelection>> {
         let num_row_groups = parquet_meta.num_row_groups();
         if num_row_groups == 0 {
@@ -325,7 +325,7 @@
     async fn prune_row_groups_by_inverted_index(
         &self,
         parquet_meta: &ParquetMetaData,
-        metrics: &mut Metrics,
+        metrics: &mut ReaderMetrics,
     ) -> Option<BTreeMap<usize, Option<RowSelection>>> {
         let Some(index_applier) = &self.index_applier else {
             return None;
@@ -407,7 +407,7 @@
         &self,
         read_format: &ReadFormat,
         parquet_meta: &ParquetMetaData,
-        metrics: &mut Metrics,
+        metrics: &mut ReaderMetrics,
     ) -> Option<BTreeMap<usize, Option<RowSelection>>> {
         let Some(predicate) = &self.predicate else {
             return None;
@@ -439,7 +439,7 @@
 
 /// Parquet reader metrics.
 #[derive(Debug, Default)]
-struct Metrics {
+pub(crate) struct ReaderMetrics {
     /// Number of row groups before filtering.
     num_row_groups_before_filtering: usize,
     /// Number of row groups filtered by inverted index.
@@ -464,6 +464,24 @@ struct Metrics {
     num_rows: usize,
 }
 
+impl ReaderMetrics {
+    /// Merges metrics from `other` into this one.
+    pub(crate) fn merge_from(&mut self, other: &ReaderMetrics) {
+        self.num_row_groups_before_filtering += other.num_row_groups_before_filtering;
+        self.num_row_groups_inverted_index_filtered += other.num_row_groups_inverted_index_filtered;
+        self.num_row_groups_min_max_filtered += other.num_row_groups_min_max_filtered;
+        self.num_rows_precise_filtered += other.num_rows_precise_filtered;
+        self.num_rows_in_row_group_before_filtering += other.num_rows_in_row_group_before_filtering;
+        self.num_rows_in_row_group_inverted_index_filtered +=
+            other.num_rows_in_row_group_inverted_index_filtered;
+        self.build_cost += other.build_cost;
+        self.scan_cost += other.scan_cost;
+        self.num_record_batches += other.num_record_batches;
+        self.num_batches += other.num_batches;
+        self.num_rows += other.num_rows;
+    }
+}
+
 /// Builder to build a [ParquetRecordBatchReader] for a row group.
 pub(crate) struct RowGroupReaderBuilder {
     /// SST file to read.
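Note that `merge_from` uses the same `+=` for the two cost fields as for the integer counters. Assuming `build_cost` and `scan_cost` are `std::time::Duration` (their names and the `query_start.elapsed()` usage in the first file suggest so), this works because `Duration` implements `AddAssign`. A quick standalone illustration:

use std::time::Duration;

fn main() {
    // `Duration` supports `+=`, so duration-typed cost fields can be
    // accumulated exactly like the `usize` counters in `merge_from`.
    let mut scan_cost = Duration::from_millis(40);
    scan_cost += Duration::from_micros(2_500);
    assert_eq!(scan_cost, Duration::from_micros(42_500));
}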
@@ -532,12 +550,12 @@ enum ReaderState {
     /// The reader is reading a row group.
     Readable(RowGroupReader),
     /// The reader is exhausted.
-    Exhausted(Metrics),
+    Exhausted(ReaderMetrics),
 }
 
 impl ReaderState {
     /// Returns the metrics of the reader.
-    fn metrics(&self) -> &Metrics {
+    fn metrics(&self) -> &ReaderMetrics {
         match self {
             ReaderState::Readable(reader) => &reader.metrics,
             ReaderState::Exhausted(m) => m,
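The hunk above also shows why `Exhausted` carries a `ReaderMetrics`: the final counters stay readable after the reader is done, and a single `match` gives a uniform borrowed view over both states. A trimmed sketch of that state-machine shape, with simplified, hypothetical types:

#[derive(Debug, Default)]
struct Counters {
    num_rows: usize,
}

struct ActiveReader {
    counters: Counters,
}

/// Either an active reader or the metrics it left behind.
enum State {
    Readable(ActiveReader),
    Exhausted(Counters),
}

impl State {
    /// Same accessor regardless of whether reading has finished.
    fn counters(&self) -> &Counters {
        match self {
            State::Readable(r) => &r.counters,
            State::Exhausted(c) => c,
        }
    }
}

fn main() {
    let done = State::Exhausted(Counters { num_rows: 42 });
    assert_eq!(done.counters().num_rows, 42);
}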
@@ -719,7 +737,7 @@ impl ParquetReader {
                 .await?;
             ReaderState::Readable(RowGroupReader::new(context.clone(), parquet_reader))
         } else {
-            ReaderState::Exhausted(Metrics::default())
+            ReaderState::Exhausted(ReaderMetrics::default())
         };
 
         Ok(ParquetReader {
@@ -749,7 +767,7 @@ pub struct RowGroupReader {
     /// Buffered batches to return.
     batches: VecDeque<Batch>,
     /// Local scan metrics.
-    metrics: Metrics,
+    metrics: ReaderMetrics,
 }
 
 impl RowGroupReader {
@@ -759,10 +777,15 @@ impl RowGroupReader {
             context,
             reader,
             batches: VecDeque::new(),
-            metrics: Metrics::default(),
+            metrics: ReaderMetrics::default(),
         }
     }
 
+    /// Returns the metrics of this reader.
+    pub(crate) fn metrics(&self) -> &ReaderMetrics {
+        &self.metrics
+    }
+
     /// Resets the parquet reader.
     fn reset_reader(&mut self, reader: ParquetRecordBatchReader) {
        self.reader = reader;
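The new `metrics()` accessor borrows the counters from `&self` instead of consuming the reader, which is what lets the scan loop in `unordered_scan.rs` snapshot them after a source drains while the reader value stays alive. A minimal sketch of that borrow-then-merge shape, with hypothetical names:

#[derive(Debug, Default)]
struct Counters {
    num_batches: usize,
}

struct GroupReader {
    counters: Counters,
}

impl GroupReader {
    /// Borrows the live counters without giving up the reader.
    fn counters(&self) -> &Counters {
        &self.counters
    }
}

fn main() {
    let reader = GroupReader { counters: Counters { num_batches: 8 } };

    // Snapshot the counters into a scan-wide total; `reader` stays usable.
    let mut total = Counters::default();
    total.num_batches += reader.counters().num_batches;
    assert_eq!(total.num_batches, 8);

    // The reader was only borrowed, so it can keep serving batches.
    let _still_alive = &reader;
}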
