diff --git a/src/mito2/src/memtable/time_series.rs b/src/mito2/src/memtable/time_series.rs index f1f53315012a..5b6dd7958564 100644 --- a/src/mito2/src/memtable/time_series.rs +++ b/src/mito2/src/memtable/time_series.rs @@ -17,9 +17,10 @@ use std::collections::{BTreeMap, Bound, HashSet}; use std::fmt::{Debug, Formatter}; use std::sync::atomic::{AtomicI64, AtomicU32, Ordering}; use std::sync::{Arc, RwLock}; +use std::time::{Duration, Instant}; use api::v1::OpType; -use common_telemetry::{debug, error}; +use common_telemetry::{debug, error, trace}; use common_time::Timestamp; use datafusion::physical_plan::PhysicalExpr; use datafusion_common::ScalarValue; @@ -47,6 +48,7 @@ use crate::memtable::{ AllocTracker, BoxedBatchIterator, KeyValues, Memtable, MemtableBuilder, MemtableId, MemtableRef, MemtableStats, }; +use crate::metrics::{READ_ROWS_TOTAL, READ_STAGE_ELAPSED}; use crate::read::{Batch, BatchBuilder, BatchColumn}; use crate::row_converter::{McmpRowCodec, RowCodec, SortField}; @@ -319,6 +321,7 @@ impl SeriesSet { pk_schema: primary_key_schema, primary_key_builders, codec: self.codec.clone(), + metrics: Metrics::default(), } } } @@ -346,6 +349,21 @@ fn primary_key_builders( (builders, Arc::new(arrow::datatypes::Schema::new(fields))) } +/// Metrics for reading the memtable. +#[derive(Debug, Default)] +struct Metrics { + /// Total series in the memtable. + total_series: usize, + /// Number of series pruned. + num_pruned_series: usize, + /// Number of rows read. + num_rows: usize, + /// Number of batch read. + num_batches: usize, + /// Duration to scan the memtable. 
+ scan_cost: Duration, +} + struct Iter { metadata: RegionMetadataRef, series: Arc, @@ -355,12 +373,30 @@ struct Iter { pk_schema: arrow::datatypes::SchemaRef, primary_key_builders: Vec>, codec: Arc, + metrics: Metrics, +} + +impl Drop for Iter { + fn drop(&mut self) { + debug!( + "Iter {} time series memtable, metrics: {:?}", + self.metadata.region_id, self.metrics + ); + + READ_ROWS_TOTAL + .with_label_values(&["time_series_memtable"]) + .inc_by(self.metrics.num_rows as u64); + READ_STAGE_ELAPSED + .with_label_values(&["scan_memtable"]) + .observe(self.metrics.scan_cost.as_secs_f64()); + } } impl Iterator for Iter { type Item = Result; fn next(&mut self) -> Option { + let start = Instant::now(); let map = self.series.read().unwrap(); let range = match &self.last_key { None => map.range::, _>(..), @@ -371,7 +407,10 @@ impl Iterator for Iter { // TODO(hl): maybe yield more than one time series to amortize range overhead. for (primary_key, series) in range { + self.metrics.total_series += 1; + let mut series = series.write().unwrap(); + let start = Instant::now(); if !self.predicate.is_empty() && !prune_primary_key( &self.codec, @@ -383,15 +422,23 @@ impl Iterator for Iter { ) { // read next series + self.metrics.num_pruned_series += 1; continue; } self.last_key = Some(primary_key.clone()); let values = series.compact(&self.metadata); - return Some( - values.and_then(|v| v.to_batch(primary_key, &self.metadata, &self.projection)), - ); + let batch = + values.and_then(|v| v.to_batch(primary_key, &self.metadata, &self.projection)); + + // Update metrics. 
+ self.metrics.num_batches += 1; + self.metrics.num_rows += batch.as_ref().map(|b| b.num_rows()).unwrap_or(0); + self.metrics.scan_cost += start.elapsed(); + return Some(batch); } + self.metrics.scan_cost += start.elapsed(); + None } } @@ -410,12 +457,7 @@ fn prune_primary_key( } if let Some(rb) = series.pk_cache.as_ref() { - let res = prune_inner(predicate, rb).unwrap_or(true); - debug!( - "Prune primary key: {:?}, predicate: {:?}, res: {:?}", - rb, predicate, res - ); - res + prune_inner(predicate, rb).unwrap_or(true) } else { let rb = match pk_to_record_batch(codec, pk, builders, pk_schema) { Ok(rb) => rb, @@ -425,7 +467,6 @@ fn prune_primary_key( } }; let res = prune_inner(predicate, &rb).unwrap_or(true); - debug!("Prune primary key: {:?}, res: {:?}", rb, res); series.update_pk_cache(rb); res } @@ -452,9 +493,11 @@ fn prune_inner(predicates: &[Arc], primary_key: &RecordBatch) unreachable!("Unexpected primary key record batch evaluation result: {:?}, primary key: {:?}", eva, primary_key); } }; - debug!( + trace!( "Evaluate primary key {:?} against filter: {:?}, result: {:?}", - primary_key, expr, result + primary_key, + expr, + result ); if !result { return Ok(false); diff --git a/src/mito2/src/metrics.rs b/src/mito2/src/metrics.rs index 257bb537e50d..7b8aa475da93 100644 --- a/src/mito2/src/metrics.rs +++ b/src/mito2/src/metrics.rs @@ -105,4 +105,21 @@ lazy_static! { /// Counter of failed compaction task. pub static ref COMPACTION_FAILURE_COUNT: IntCounter = register_int_counter!("mito_compaction_failure_total", "mito compaction failure total").unwrap(); + // ------- End of compaction metrics. + + // Query metrics. + /// Timer of different stages in query. + pub static ref READ_STAGE_ELAPSED: HistogramVec = register_histogram_vec!( + "mito_read_stage_elapsed", + "mito read stage elapsed", + &[STAGE_LABEL] + ) + .unwrap(); + /// Counter of rows read. 
+ pub static ref READ_ROWS_TOTAL: IntCounterVec = + register_int_counter_vec!("mito_read_rows_total", "mito read rows total", &[TYPE_LABEL]).unwrap(); + /// Counter of filtered rows during merge. + pub static ref MERGE_FILTER_ROWS_TOTAL: IntCounterVec = + register_int_counter_vec!("mito_merge_filter_rows_total", "mito merge filter rows total", &[TYPE_LABEL]).unwrap(); + // ------- End of query metrics. } diff --git a/src/mito2/src/read/merge.rs b/src/mito2/src/read/merge.rs index 187449c01694..ad1f51cf1dc3 100644 --- a/src/mito2/src/read/merge.rs +++ b/src/mito2/src/read/merge.rs @@ -17,12 +17,15 @@ use std::cmp::Ordering; use std::collections::BinaryHeap; use std::mem; +use std::time::{Duration, Instant}; use async_trait::async_trait; +use common_telemetry::debug; use common_time::Timestamp; use crate::error::Result; use crate::memtable::BoxedBatchIterator; +use crate::metrics::{MERGE_FILTER_ROWS_TOTAL, READ_STAGE_ELAPSED}; use crate::read::{Batch, BatchReader, BoxedBatchReader, Source}; /// Minimum batch size to output. @@ -51,11 +54,14 @@ pub struct MergeReader { /// Suggested size of each batch. The batch returned by the reader can have more rows than the /// batch size. batch_size: usize, + /// Local metrics. + metrics: Metrics, } #[async_trait] impl BatchReader for MergeReader { async fn next_batch(&mut self) -> Result> { + let start = Instant::now(); while !self.hot.is_empty() && self.batch_merger.num_rows() < self.batch_size { if let Some(current_key) = self.batch_merger.primary_key() { // If the hottest node has a different key, we have finish collecting current key. @@ -68,28 +74,55 @@ impl BatchReader for MergeReader { if self.hot.len() == 1 { // No need to do merge sort if only one batch in the hot heap. self.fetch_batch_from_hottest().await?; + self.metrics.num_fetch_by_batches += 1; } else { // We could only fetch rows that less than the next node from the hottest node. 
self.fetch_rows_from_hottest().await?; + self.metrics.num_fetch_by_rows += 1; } } if self.batch_merger.is_empty() { // Nothing fetched. + self.metrics.scan_cost += start.elapsed(); + // Update deleted rows num. + self.metrics.num_deleted_rows = self.batch_merger.num_deleted_rows(); Ok(None) } else { - self.batch_merger.merge_batches() + let batch = self.batch_merger.merge_batches()?; + self.metrics.scan_cost += start.elapsed(); + self.metrics.num_output_rows += batch.as_ref().map(|b| b.num_rows()).unwrap_or(0); + Ok(batch) } } } +impl Drop for MergeReader { + fn drop(&mut self) { + debug!("Merge reader finished, metrics: {:?}", self.metrics); + + MERGE_FILTER_ROWS_TOTAL + .with_label_values(&["dedup"]) + .inc_by(self.metrics.num_duplicate_rows as u64); + MERGE_FILTER_ROWS_TOTAL + .with_label_values(&["delete"]) + .inc_by(self.metrics.num_deleted_rows as u64); + READ_STAGE_ELAPSED + .with_label_values(&["merge"]) + .observe(self.metrics.scan_cost.as_secs_f64()); + } +} + impl MergeReader { /// Creates and initializes a new [MergeReader]. pub async fn new(sources: Vec, batch_size: usize) -> Result { + let start = Instant::now(); + let mut metrics = Metrics::default(); + let mut cold = BinaryHeap::with_capacity(sources.len()); let hot = BinaryHeap::with_capacity(sources.len()); for source in sources { - let node = Node::new(source).await?; + let node = Node::new(source, &mut metrics).await?; if !node.is_eof() { // Ensure `cold` don't have eof nodes. cold.push(node); @@ -101,10 +134,12 @@ impl MergeReader { cold, batch_merger: BatchMerger::new(), batch_size, + metrics, }; // Initializes the reader. 
reader.refill_hot(); + reader.metrics.scan_cost += start.elapsed(); Ok(reader) } @@ -132,7 +167,7 @@ impl MergeReader { assert_eq!(1, self.hot.len()); let mut hottest = self.hot.pop().unwrap(); - let batch = hottest.fetch_batch().await?; + let batch = hottest.fetch_batch(&mut self.metrics).await?; self.batch_merger.push(batch)?; self.reheap(hottest) } @@ -161,12 +196,12 @@ impl MergeReader { // value directly. match timestamps.binary_search(&next_min_ts.value()) { Ok(pos) => { - // They have duplicate timestamps. Outputs timestamps before the duplciated timestamp. + // They have duplicate timestamps. Outputs timestamps before the duplicated timestamp. // Batch itself doesn't contain duplicate timestamps so timestamps before `pos` // must be less than `next_min_ts`. self.batch_merger.push(top.slice(0, pos))?; // This keep the duplicate timestamp in the node. - top_node.skip_rows(pos).await?; + top_node.skip_rows(pos, &mut self.metrics).await?; // The merge window should contain this timestamp so only nodes in the hot heap // have this timestamp. self.filter_first_duplicate_timestamp_in_hot(top_node, next_min_ts) @@ -175,7 +210,7 @@ impl MergeReader { Err(pos) => { // No duplicate timestamp. Outputs timestamp before `pos`. self.batch_merger.push(top.slice(0, pos))?; - top_node.skip_rows(pos).await?; + top_node.skip_rows(pos, &mut self.metrics).await?; self.reheap(top_node)?; } } @@ -211,16 +246,18 @@ impl MergeReader { if max_seq < next_first_seq { // The next node has larger seq. - max_seq_node.skip_rows(1).await?; + max_seq_node.skip_rows(1, &mut self.metrics).await?; + self.metrics.num_duplicate_rows += 1; if !max_seq_node.is_eof() { self.cold.push(max_seq_node); } max_seq_node = next_node; max_seq = next_first_seq; } else { - next_node.skip_rows(1).await?; + next_node.skip_rows(1, &mut self.metrics).await?; + self.metrics.num_duplicate_rows += 1; if !next_node.is_eof() { - // If the next node is + // If the next node has smaller seq, skip that row. 
self.cold.push(next_node); } } @@ -315,12 +352,33 @@ impl Default for MergeReaderBuilder { } } +/// Metrics for the merge reader. +#[derive(Debug, Default)] +struct Metrics { + /// Total scan cost of the reader. + scan_cost: Duration, + /// Number of times to fetch batches. + num_fetch_by_batches: usize, + /// Number of times to fetch rows. + num_fetch_by_rows: usize, + /// Number of input rows. + num_input_rows: usize, + /// Number of skipped duplicate rows. + num_duplicate_rows: usize, + /// Number of output rows. + num_output_rows: usize, + /// Number of deleted rows. + num_deleted_rows: usize, +} + /// Helper to collect and merge small batches for same primary key. struct BatchMerger { /// Buffered non-empty batches to merge. batches: Vec, /// Number of rows in the batch. num_rows: usize, + /// Number of rows deleted. + num_deleted_rows: usize, } impl BatchMerger { @@ -329,6 +387,7 @@ impl BatchMerger { BatchMerger { batches: Vec::new(), num_rows: 0, + num_deleted_rows: 0, } } @@ -337,6 +396,11 @@ impl BatchMerger { self.num_rows } + /// Returns the number of rows deleted. + fn num_deleted_rows(&self) -> usize { + self.num_deleted_rows + } + /// Returns true if the merger is empty. fn is_empty(&self) -> bool { self.num_rows() == 0 @@ -360,7 +424,9 @@ impl BatchMerger { .map(|b| b.primary_key() == batch.primary_key()) .unwrap_or(true)); + let num_rows = batch.num_rows(); batch.filter_deleted()?; + self.num_deleted_rows += num_rows - batch.num_rows(); if batch.is_empty() { return Ok(()); } @@ -402,9 +468,11 @@ impl Node { /// Initialize a node. /// /// It tries to fetch one batch from the `source`. - async fn new(mut source: Source) -> Result { + async fn new(mut source: Source, metrics: &mut Metrics) -> Result { // Ensures batch is not empty. 
let current_batch = source.next_batch().await?.map(CompareFirst); + metrics.num_input_rows += current_batch.as_ref().map(|b| b.0.num_rows()).unwrap_or(0); + Ok(Node { source, current_batch, @@ -437,10 +505,15 @@ impl Node { /// /// # Panics /// Panics if the node has reached EOF. - async fn fetch_batch(&mut self) -> Result { + async fn fetch_batch(&mut self, metrics: &mut Metrics) -> Result { let current = self.current_batch.take().unwrap(); // Ensures batch is not empty. self.current_batch = self.source.next_batch().await?.map(CompareFirst); + metrics.num_input_rows += self + .current_batch + .as_ref() + .map(|b| b.0.num_rows()) + .unwrap_or(0); Ok(current.0) } @@ -468,13 +541,14 @@ impl Node { /// /// # Panics /// Panics if the node is EOF. - async fn skip_rows(&mut self, num_to_skip: usize) -> Result<()> { + async fn skip_rows(&mut self, num_to_skip: usize, metrics: &mut Metrics) -> Result<()> { let batch = self.current_batch(); debug_assert!(batch.num_rows() >= num_to_skip); + let remaining = batch.num_rows() - num_to_skip; if remaining == 0 { // Nothing remains, we need to fetch next batch to ensure the batch is not empty. 
- self.fetch_batch().await?; + self.fetch_batch(metrics).await?; } else { debug_assert!(!batch.is_empty()); self.current_batch = Some(CompareFirst(batch.slice(num_to_skip, remaining))); @@ -610,6 +684,10 @@ mod tests { ], ) .await; + + assert_eq!(8, reader.metrics.num_input_rows); + assert_eq!(6, reader.metrics.num_output_rows); + assert_eq!(2, reader.metrics.num_deleted_rows); } #[tokio::test] @@ -722,6 +800,11 @@ mod tests { ], ) .await; + + assert_eq!(11, reader.metrics.num_input_rows); + assert_eq!(7, reader.metrics.num_output_rows); + assert_eq!(2, reader.metrics.num_deleted_rows); + assert_eq!(2, reader.metrics.num_duplicate_rows); } #[tokio::test] @@ -1051,6 +1134,11 @@ mod tests { .push(new_batch(b"k1", &[2], &[10], &[OpType::Put], &[22])) .unwrap(); assert_eq!(2, merger.num_rows()); + merger + .push(new_batch(b"k1", &[3], &[10], &[OpType::Delete], &[23])) + .unwrap(); + assert_eq!(2, merger.num_rows()); + let batch = merger.merge_batches().unwrap().unwrap(); assert_eq!(2, batch.num_rows()); assert_eq!( @@ -1064,5 +1152,6 @@ mod tests { ) ); assert!(merger.is_empty()); + assert_eq!(1, merger.num_deleted_rows()); } } diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index 805e8d8df9e5..9e797e23cc0c 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -15,19 +15,22 @@ //! Sequential scan. 
use std::sync::Arc; +use std::time::{Duration, Instant}; use async_stream::try_stream; use common_error::ext::BoxedError; use common_recordbatch::error::ExternalSnafu; -use common_recordbatch::{RecordBatchStreamAdaptor, SendableRecordBatchStream}; +use common_recordbatch::{RecordBatch, RecordBatchStreamAdaptor, SendableRecordBatchStream}; +use common_telemetry::debug; use common_time::range::TimestampRange; use snafu::ResultExt; use table::predicate::Predicate; use crate::access_layer::AccessLayerRef; -use crate::cache::CacheManagerRef; +use crate::cache::{CacheManager, CacheManagerRef}; use crate::error::Result; use crate::memtable::MemtableRef; +use crate::metrics::READ_STAGE_ELAPSED; use crate::read::compat::{self, CompatReader}; use crate::read::merge::MergeReaderBuilder; use crate::read::projection::ProjectionMapper; @@ -105,22 +108,27 @@ impl SeqScan { /// Builds a stream for the query. pub async fn build_stream(&self) -> Result { + let start = Instant::now(); // Scans all memtables and SSTs. Builds a merge reader to merge results. let mut reader = self.build_reader().await?; + let mut metrics = Metrics { + scan_cost: start.elapsed(), + }; // Creates a stream to poll the batch reader and convert batch into record batch. let mapper = self.mapper.clone(); let cache_manager = self.cache_manager.clone(); let stream = try_stream! { let cache = cache_manager.as_ref().map(|cache| cache.as_ref()); - while let Some(batch) = reader - .next_batch() - .await - .map_err(BoxedError::new) - .context(ExternalSnafu)? + while let Some(batch) = + Self::fetch_record_batch(&mut reader, &mapper, cache, &mut metrics).await? { - yield mapper.convert(&batch, cache)?; + yield batch; } + + debug!("Seq scan finished, region_id: {:?}, metrics: {:?}", mapper.metadata().region_id, metrics); + // Update metrics. 
+ READ_STAGE_ELAPSED.with_label_values(&["total"]).observe(metrics.scan_cost.as_secs_f64()); }; let stream = Box::pin(RecordBatchStreamAdaptor::new( self.mapper.output_schema(), @@ -160,6 +168,39 @@ impl SeqScan { } Ok(Box::new(builder.build().await?)) } + + /// Fetch a batch from the reader and convert it into a record batch. + async fn fetch_record_batch( + reader: &mut dyn BatchReader, + mapper: &ProjectionMapper, + cache: Option<&CacheManager>, + metrics: &mut Metrics, + ) -> common_recordbatch::error::Result> { + let start = Instant::now(); + + let Some(batch) = reader + .next_batch() + .await + .map_err(BoxedError::new) + .context(ExternalSnafu)? + else { + metrics.scan_cost += start.elapsed(); + + return Ok(None); + }; + + let record_batch = mapper.convert(&batch, cache)?; + metrics.scan_cost += start.elapsed(); + + Ok(Some(record_batch)) + } +} + +/// Metrics for [SeqScan]. +#[derive(Debug, Default)] +struct Metrics { + /// Duration to scan data. + scan_cost: Duration, } #[cfg(test)] diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 254ecab1c119..a56e140ec033 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -16,9 +16,11 @@ use std::collections::{HashSet, VecDeque}; use std::sync::Arc; +use std::time::{Duration, Instant}; use async_compat::{Compat, CompatExt}; use async_trait::async_trait; +use common_telemetry::debug; use common_time::range::TimestampRange; use datatypes::arrow::record_batch::RecordBatch; use object_store::{ObjectStore, Reader}; @@ -38,6 +40,7 @@ use crate::error::{ ArrowReaderSnafu, InvalidMetadataSnafu, InvalidParquetSnafu, OpenDalSnafu, ReadParquetSnafu, Result, }; +use crate::metrics::{READ_ROWS_TOTAL, READ_STAGE_ELAPSED}; use crate::read::{Batch, BatchReader}; use crate::sst::file::FileHandle; use crate::sst::parquet::format::ReadFormat; @@ -112,6 +115,8 @@ impl ParquetReaderBuilder { /// /// This needs to perform IO operation. 
pub async fn build(&self) -> Result { + let start = Instant::now(); + let file_path = self.file_handle.file_path(&self.file_dir); // Now we create a reader to read the whole file. let reader = self .context(ReadParquetSnafu { path: &file_path })?; let reader_builder = RowGroupReaderBuilder { + file_handle: self.file_handle.clone(), file_path, parquet_meta, file_reader: reader, field_levels, }; + let metrics = Metrics { + read_row_groups: row_groups.len(), + build_cost: start.elapsed(), + ..Default::default() + }; + Ok(ParquetReader { - _file_handle: self.file_handle.clone(), row_groups, read_format, reader_builder, current_reader: None, batches: VecDeque::new(), + metrics, }) } @@ -247,8 +259,29 @@ impl ParquetReaderBuilder { } } +/// Parquet reader metrics. +#[derive(Debug, Default)] +struct Metrics { + /// Number of row groups to read. + read_row_groups: usize, + /// Duration to build the parquet reader. + build_cost: Duration, + /// Duration to scan the reader. + scan_cost: Duration, + /// Number of record batches read. + num_record_batches: usize, + /// Number of batches decoded. + num_batches: usize, + /// Number of rows read. + num_rows: usize, +} + /// Builder to build a [ParquetRecordBatchReader] for a row group. struct RowGroupReaderBuilder { + /// SST file to read. + /// + /// Holds the file handle to avoid the file purger purging it. + file_handle: FileHandle, /// Path of the file. file_path: String, /// Metadata of the parquet file. @@ -294,10 +327,6 @@ impl RowGroupReaderBuilder { /// Parquet batch reader to read our SST format. pub struct ParquetReader { - /// SST file to read. - /// - /// Holds the file handle to avoid the file purge purge it. - _file_handle: FileHandle, /// Indices of row groups to read. row_groups: VecDeque, /// Helper to read record batches. 
@@ -310,24 +339,60 @@ pub struct ParquetReader { current_reader: Option, /// Buffered batches to return. batches: VecDeque, + /// Local metrics. + metrics: Metrics, } #[async_trait] impl BatchReader for ParquetReader { async fn next_batch(&mut self) -> Result> { + let start = Instant::now(); if let Some(batch) = self.batches.pop_front() { + self.metrics.scan_cost += start.elapsed(); + self.metrics.num_rows += batch.num_rows(); return Ok(Some(batch)); } // We need to fetch next record batch and convert it to batches. let Some(record_batch) = self.fetch_next_record_batch().await? else { + self.metrics.scan_cost += start.elapsed(); return Ok(None); }; + self.metrics.num_record_batches += 1; self.read_format .convert_record_batch(&record_batch, &mut self.batches)?; + self.metrics.num_batches += self.batches.len(); + + let batch = self.batches.pop_front(); + self.metrics.scan_cost += start.elapsed(); + self.metrics.num_rows += batch.as_ref().map(|b| b.num_rows()).unwrap_or(0); + Ok(batch) + } +} - Ok(self.batches.pop_front()) +impl Drop for ParquetReader { + fn drop(&mut self) { + debug!( + "Read parquet {} {}, range: {:?}, {}/{} row groups, metrics: {:?}", + self.reader_builder.file_handle.region_id(), + self.reader_builder.file_handle.file_id(), + self.reader_builder.file_handle.time_range(), + self.metrics.read_row_groups, + self.reader_builder.parquet_meta.num_row_groups(), + self.metrics + ); + + // Report metrics. + READ_STAGE_ELAPSED + .with_label_values(&["build_parquet_reader"]) + .observe(self.metrics.build_cost.as_secs_f64()); + READ_STAGE_ELAPSED + .with_label_values(&["scan_row_groups"]) + .observe(self.metrics.scan_cost.as_secs_f64()); + READ_ROWS_TOTAL + .with_label_values(&["parquet"]) + .inc_by(self.metrics.num_rows as u64); } }