diff --git a/src/mito2/src/memtable/merge_tree/data.rs b/src/mito2/src/memtable/merge_tree/data.rs
index 82bf5c8440d0..e3214ee71abc 100644
--- a/src/mito2/src/memtable/merge_tree/data.rs
+++ b/src/mito2/src/memtable/merge_tree/data.rs
@@ -18,6 +18,7 @@ use std::cmp::{Ordering, Reverse};
 use std::fmt::{Debug, Formatter};
 use std::ops::Range;
 use std::sync::Arc;
+use std::time::{Duration, Instant};
 
 use bytes::Bytes;
 use datatypes::arrow;
@@ -46,6 +47,7 @@ use crate::error::Result;
 use crate::memtable::key_values::KeyValue;
 use crate::memtable::merge_tree::merger::{DataBatchKey, DataNode, DataSource, Merger};
 use crate::memtable::merge_tree::PkIndex;
+use crate::metrics::{MERGE_TREE_DATA_BUFFER_FREEZE_STAGE_ELAPSED, MERGE_TREE_READ_STAGE_ELAPSED};
 
 const PK_INDEX_COLUMN_NAME: &str = "__pk_index";
@@ -255,16 +257,21 @@ impl DataBuffer {
     /// If pk_weights is present, yielded rows are sorted according to weights,
     /// otherwise rows are sorted by "pk_weights" values as they are actually weights.
     pub fn read(&self, pk_weights: Option<&[u16]>) -> Result<DataBufferReader> {
-        let batch = read_data_buffer_to_record_batches(
-            self.data_part_schema.clone(),
-            self,
-            pk_weights,
-            self.dedup,
-            // replace_pk_index is always set to false since:
-            // - for DataBuffer in ShardBuilder, pk dict is not frozen
-            // - for DataBuffer in Shard, values in pk_index column has already been replaced during `freeze`.
-            false,
-        )?;
+        let batch = {
+            let _timer = MERGE_TREE_READ_STAGE_ELAPSED
+                .with_label_values(&["read_data_buffer_to_batch"])
+                .start_timer();
+            read_data_buffer_to_record_batches(
+                self.data_part_schema.clone(),
+                self,
+                pk_weights,
+                self.dedup,
+                // replace_pk_index is always set to false since:
+                // - for DataBuffer in ShardBuilder, pk dict is not frozen
+                // - for DataBuffer in Shard, values in pk_index column has already been replaced during `freeze`.
+                false,
+            )?
+        };
         DataBufferReader::new(batch)
     }
 
@@ -493,6 +500,15 @@ pub(crate) struct DataBufferReader {
     batch: RecordBatch,
     offset: usize,
     current_range: Option<DataBatchRange>,
+    elapsed_time: Duration,
+}
+
+impl Drop for DataBufferReader {
+    fn drop(&mut self) {
+        MERGE_TREE_READ_STAGE_ELAPSED
+            .with_label_values(&["read_data_buffer"])
+            .observe(self.elapsed_time.as_secs_f64())
+    }
 }
 
 impl DataBufferReader {
@@ -501,6 +517,7 @@ impl DataBufferReader {
             batch,
             offset: 0,
             current_range: None,
+            elapsed_time: Duration::default(),
         };
         reader.next()?; // fill data batch for comparison and merge.
         Ok(reader)
@@ -527,6 +544,7 @@ impl DataBufferReader {
             self.current_range = None;
             return Ok(());
         }
+        let start = Instant::now();
         let pk_index_array = pk_index_array(&self.batch);
         if let Some((next_pk, range)) = search_next_pk_range(pk_index_array, self.offset) {
             self.offset = range.end;
@@ -538,6 +556,7 @@ impl DataBufferReader {
         } else {
             self.current_range = None;
         }
+        self.elapsed_time += start.elapsed();
         Ok(())
     }
 }
@@ -741,18 +760,30 @@ impl<'a> DataPartEncoder<'a> {
     pub fn write(self, source: &mut DataBuffer) -> Result<DataPart> {
         let mut bytes = Vec::with_capacity(1024);
-        let rb = drain_data_buffer_to_record_batches(
-            self.schema.clone(),
-            source,
-            self.pk_weights,
-            self.dedup,
-            self.replace_pk_index,
-        )?;
-        let mut writer =
-            ArrowWriter::try_new(&mut bytes, self.schema.clone(), Some(self.writer_props()))
-                .context(error::EncodeMemtableSnafu)?;
-        writer.write(&rb).context(error::EncodeMemtableSnafu)?;
-        let _metadata = writer.close().context(error::EncodeMemtableSnafu)?;
+
+        let rb = {
+            let _timer = MERGE_TREE_DATA_BUFFER_FREEZE_STAGE_ELAPSED
+                .with_label_values(&["drain_data_buffer_to_batch"])
+                .start_timer();
+            drain_data_buffer_to_record_batches(
+                self.schema.clone(),
+                source,
+                self.pk_weights,
+                self.dedup,
+                self.replace_pk_index,
+            )?
+        };
+
+        {
+            let _timer = MERGE_TREE_DATA_BUFFER_FREEZE_STAGE_ELAPSED
+                .with_label_values(&["encode"])
+                .start_timer();
+            let mut writer =
+                ArrowWriter::try_new(&mut bytes, self.schema.clone(), Some(self.writer_props()))
+                    .context(error::EncodeMemtableSnafu)?;
+            writer.write(&rb).context(error::EncodeMemtableSnafu)?;
+            let _metadata = writer.close().context(error::EncodeMemtableSnafu)?;
+        }
         Ok(DataPart::Parquet(ParquetPart {
             data: Bytes::from(bytes),
         }))
@@ -783,6 +814,15 @@ pub struct DataPartReader {
     inner: ParquetRecordBatchReader,
     current_batch: Option<RecordBatch>,
     current_range: Option<DataBatchRange>,
+    elapsed: Duration,
+}
+
+impl Drop for DataPartReader {
+    fn drop(&mut self) {
+        MERGE_TREE_READ_STAGE_ELAPSED
+            .with_label_values(&["read_data_part"])
+            .observe(self.elapsed.as_secs_f64());
+    }
 }
 
 impl Debug for DataPartReader {
@@ -805,6 +845,7 @@ impl DataPartReader {
             inner: parquet_reader,
             current_batch: None,
             current_range: None,
+            elapsed: Default::default(),
         };
         reader.next()?;
         Ok(reader)
     }
@@ -827,6 +868,7 @@ impl DataPartReader {
     }
 
     pub(crate) fn next(&mut self) -> Result<()> {
+        let start = Instant::now();
         if let Some((next_pk, range)) = self.search_next_pk_range() {
             // first try to search next pk in current record batch.
             self.current_range = Some(DataBatchRange {
@@ -847,7 +889,7 @@ impl DataPartReader {
                 self.current_range = None;
             }
         }
-
+        self.elapsed += start.elapsed();
         Ok(())
     }
 
@@ -901,6 +943,10 @@ impl DataParts {
     /// The returned iterator yields a record batch of one primary key at a time.
     /// The order of yielding primary keys is determined by provided weights.
     pub fn read(&self) -> Result<DataPartsReader> {
+        let _timer = MERGE_TREE_READ_STAGE_ELAPSED
+            .with_label_values(&["build_data_parts_reader"])
+            .start_timer();
+
         let mut nodes = Vec::with_capacity(self.frozen.len() + 1);
         nodes.push(DataNode::new(DataSource::Buffer(
             // `DataPars::read` ensures that all pk_index inside `DataBuffer` are replaced by weights.
@@ -911,7 +957,10 @@ impl DataParts {
             nodes.push(DataNode::new(DataSource::Part(p.read()?)));
         }
         let merger = Merger::try_new(nodes)?;
-        Ok(DataPartsReader { merger })
+        Ok(DataPartsReader {
+            merger,
+            elapsed: Default::default(),
+        })
     }
 
     pub(crate) fn is_empty(&self) -> bool {
@@ -922,6 +971,15 @@ impl DataParts {
 /// Reader for all parts inside a `DataParts`.
 pub struct DataPartsReader {
     merger: Merger<DataNode>,
+    elapsed: Duration,
+}
+
+impl Drop for DataPartsReader {
+    fn drop(&mut self) {
+        MERGE_TREE_READ_STAGE_ELAPSED
+            .with_label_values(&["read_data_parts"])
+            .observe(self.elapsed.as_secs_f64())
+    }
 }
 
 impl DataPartsReader {
@@ -931,7 +989,10 @@ impl DataPartsReader {
     }
 
     pub(crate) fn next(&mut self) -> Result<()> {
-        self.merger.next()
+        let start = Instant::now();
+        let result = self.merger.next();
+        self.elapsed += start.elapsed();
+        result
     }
 
     pub(crate) fn is_valid(&self) -> bool {
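The reader types in data.rs deliberately avoid starting a histogram timer on every `next()` call: each reader accumulates a `std::time::Duration` across calls and publishes a single sample when it is dropped. A minimal, self-contained sketch of that pattern; the histogram, reader type, and stage name below are illustrative stand-ins, not part of this patch:

use std::time::{Duration, Instant};

use lazy_static::lazy_static;
use prometheus::{register_histogram_vec, HistogramVec};

lazy_static! {
    // Illustrative histogram; the patch observes into MERGE_TREE_READ_STAGE_ELAPSED instead.
    static ref READ_STAGE_ELAPSED: HistogramVec =
        register_histogram_vec!("example_read_stage_elapsed", "example help", &["stage"]).unwrap();
}

struct ExampleReader {
    rows: Vec<u64>,
    pos: usize,
    elapsed: Duration,
}

impl ExampleReader {
    fn next(&mut self) -> Option<u64> {
        // Time only the work done inside next() and fold it into the running total.
        let start = Instant::now();
        let item = self.rows.get(self.pos).copied();
        self.pos += 1;
        self.elapsed += start.elapsed();
        item
    }
}

impl Drop for ExampleReader {
    fn drop(&mut self) {
        // One histogram sample per reader lifetime instead of one per next() call.
        READ_STAGE_ELAPSED
            .with_label_values(&["read_example"])
            .observe(self.elapsed.as_secs_f64());
    }
}

Publishing in `Drop` keeps the number of observations proportional to the number of scans rather than the number of rows, and it still fires if the scan stops early.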
diff --git a/src/mito2/src/memtable/merge_tree/partition.rs b/src/mito2/src/memtable/merge_tree/partition.rs
index e32381c4f5a7..28df85ba7aa0 100644
--- a/src/mito2/src/memtable/merge_tree/partition.rs
+++ b/src/mito2/src/memtable/merge_tree/partition.rs
@@ -18,9 +18,11 @@
 use std::collections::{HashMap, HashSet};
 use std::sync::{Arc, RwLock};
+use std::time::{Duration, Instant};
 
 use api::v1::SemanticType;
 use common_recordbatch::filter::SimpleFilterEvaluator;
+use common_telemetry::tracing::log;
 use store_api::metadata::RegionMetadataRef;
 use store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME;
 use store_api::storage::ColumnId;
@@ -35,6 +37,7 @@ use crate::memtable::merge_tree::shard::{
 };
 use crate::memtable::merge_tree::shard_builder::ShardBuilder;
 use crate::memtable::merge_tree::{MergeTreeConfig, PkId};
+use crate::metrics::MERGE_TREE_READ_STAGE_ELAPSED;
 use crate::read::{Batch, BatchBuilder};
 use crate::row_converter::{McmpRowCodec, RowCodec};
@@ -110,8 +113,8 @@ impl Partition {
             let inner = self.inner.read().unwrap();
             let mut nodes = Vec::with_capacity(inner.shards.len() + 1);
             if !inner.shard_builder.is_empty() {
-                let bulder_reader = inner.shard_builder.read(&mut context.pk_weights)?;
-                nodes.push(ShardNode::new(ShardSource::Builder(bulder_reader)));
+                let builder_reader = inner.shard_builder.read(&mut context.pk_weights)?;
+                nodes.push(ShardNode::new(ShardSource::Builder(builder_reader)));
             }
             for shard in &inner.shards {
                 if !shard.is_empty() {
@@ -122,7 +125,7 @@ impl Partition {
             nodes
         };
 
-        // Creating a shard merger will invoke next so we do it outside of the lock.
+        // Creating a shard merger will invoke next so we do it outside the lock.
         let merger = ShardMerger::try_new(nodes)?;
         if self.dedup {
             let source = DedupReader::try_new(merger)?;
@@ -234,6 +237,15 @@ pub(crate) struct PartitionStats {
     pub(crate) shared_memory_size: usize,
 }
 
+#[derive(Default)]
+struct PartitionReaderMetrics {
+    prune_pk: Duration,
+    read_source: Duration,
+    data_batch_to_batch: Duration,
+    keys_before_pruning: usize,
+    keys_after_pruning: usize,
+}
+
 /// Reader to scan rows in a partition.
 ///
 /// It can merge rows from multiple shards.
@@ -266,8 +278,9 @@ impl PartitionReader {
     /// # Panics
     /// Panics if the reader is invalid.
     pub fn next(&mut self) -> Result<()> {
+        let read_source = Instant::now();
         self.source.next()?;
-
+        self.context.metrics.read_source += read_source.elapsed();
         self.prune_batch_by_key()
     }
 
@@ -275,14 +288,17 @@ impl PartitionReader {
     ///
     /// # Panics
     /// Panics if the reader is invalid.
-    pub fn convert_current_batch(&self) -> Result<Batch> {
+    pub fn convert_current_batch(&mut self) -> Result<Batch> {
+        let start = Instant::now();
         let data_batch = self.source.current_data_batch();
-        data_batch_to_batch(
+        let batch = data_batch_to_batch(
             &self.context.metadata,
             &self.context.projection,
             self.source.current_key(),
             data_batch,
-        )
+        )?;
+        self.context.metrics.data_batch_to_batch += start.elapsed();
+        Ok(batch)
     }
 
     pub(crate) fn into_context(self) -> ReadPartitionContext {
@@ -290,6 +306,7 @@
     }
 
     fn prune_batch_by_key(&mut self) -> Result<()> {
+        let start = Instant::now();
        if self.context.metadata.primary_key.is_empty() || !self.context.need_prune_key {
             // Nothing to prune.
             return Ok(());
@@ -305,6 +322,7 @@
                 }
             }
             let key = self.source.current_key().unwrap();
+            self.context.metrics.keys_before_pruning += 1;
             // Prune batch by primary key.
             if prune_primary_key(
                 &self.context.metadata,
@@ -314,11 +332,12 @@
             ) {
                 // We need this key.
                 self.last_yield_pk_id = Some(pk_id);
+                self.context.metrics.keys_after_pruning += 1;
                 break;
             }
             self.source.next()?;
         }
-
+        self.context.metrics.prune_pk += start.elapsed();
         Ok(())
     }
 }
@@ -384,6 +403,26 @@ pub(crate) struct ReadPartitionContext {
     /// Buffer to store pk weights.
     pk_weights: Vec<u16>,
     need_prune_key: bool,
+    metrics: PartitionReaderMetrics,
+}
+
+impl Drop for ReadPartitionContext {
+    fn drop(&mut self) {
+        MERGE_TREE_READ_STAGE_ELAPSED
+            .with_label_values(&["partition_prune_pk"])
+            .observe(self.metrics.prune_pk.as_secs_f64());
+        MERGE_TREE_READ_STAGE_ELAPSED
+            .with_label_values(&["partition_read_source"])
+            .observe(self.metrics.read_source.as_secs_f64());
+        MERGE_TREE_READ_STAGE_ELAPSED
+            .with_label_values(&["partition_data_batch_to_batch"])
+            .observe(self.metrics.data_batch_to_batch.as_secs_f64());
+        log::debug!(
+            "TreeIter pruning, before: {}, after: {}",
+            self.metrics.keys_before_pruning,
+            self.metrics.keys_after_pruning
+        );
+    }
 }
 
 impl ReadPartitionContext {
@@ -401,10 +440,11 @@ impl ReadPartitionContext {
             filters,
             pk_weights: Vec::new(),
             need_prune_key,
+            metrics: Default::default(),
         }
     }
 
-    /// Does filters contains predicate on primary key columns after pruning the
+    /// Returns whether the filters contain a predicate on primary key columns after pruning the
     /// partition column.
     fn need_prune_key(metadata: &RegionMetadataRef, filters: &[SimpleFilterEvaluator]) -> bool {
         for filter in filters {
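Note that `convert_current_batch` switches from `&self` to `&mut self` only because it now updates `context.metrics`. If a mutable borrow were undesirable on that path, interior mutability would be an alternative; a hypothetical sketch, with a `ConvertMetrics` type that is not part of this patch:

use std::cell::Cell;
use std::time::{Duration, Instant};

// Hypothetical alternative: keep `&self` on the conversion path by making the
// accumulated duration interiorly mutable instead of requiring `&mut self`.
struct ConvertMetrics {
    data_batch_to_batch: Cell<Duration>,
}

impl ConvertMetrics {
    fn record_conversion(&self) {
        let start = Instant::now();
        // ... convert the current data batch into an output batch here ...
        self.data_batch_to_batch
            .set(self.data_batch_to_batch.get() + start.elapsed());
    }
}

The patch takes the simpler `&mut self` route, which is reasonable here since callers already hold the `PartitionReader` exclusively.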
diff --git a/src/mito2/src/memtable/merge_tree/shard_builder.rs b/src/mito2/src/memtable/merge_tree/shard_builder.rs
index f20a38fe672d..c2185f2d3517 100644
--- a/src/mito2/src/memtable/merge_tree/shard_builder.rs
+++ b/src/mito2/src/memtable/merge_tree/shard_builder.rs
@@ -28,6 +28,7 @@ use crate::memtable::merge_tree::dict::{DictBuilderReader, KeyDictBuilder};
 use crate::memtable::merge_tree::metrics::WriteMetrics;
 use crate::memtable::merge_tree::shard::Shard;
 use crate::memtable::merge_tree::{MergeTreeConfig, PkId, ShardId};
+use crate::metrics::MERGE_TREE_READ_STAGE_ELAPSED;
 
 /// Builder to write keys and data to a shard that the key dictionary
 /// is still active.
@@ -125,10 +126,21 @@ impl ShardBuilder {
     /// Scans the shard builder.
     pub fn read(&self, pk_weights_buffer: &mut Vec<u16>) -> Result<ShardBuilderReader> {
-        let dict_reader = self.dict_builder.read();
-        dict_reader.pk_weights_to_sort_data(pk_weights_buffer);
-        let data_reader = self.data_buffer.read(Some(pk_weights_buffer))?;
+        let dict_reader = {
+            let _timer = MERGE_TREE_READ_STAGE_ELAPSED
+                .with_label_values(&["shard_builder_read_pk"])
+                .start_timer();
+            self.dict_builder.read()
+        };
+        {
+            let _timer = MERGE_TREE_READ_STAGE_ELAPSED
+                .with_label_values(&["sort_pk"])
+                .start_timer();
+            dict_reader.pk_weights_to_sort_data(pk_weights_buffer);
+        }
+
+        let data_reader = self.data_buffer.read(Some(pk_weights_buffer))?;
         Ok(ShardBuilderReader {
             shard_id: self.current_shard_id,
             dict_reader,
diff --git a/src/mito2/src/memtable/merge_tree/tree.rs b/src/mito2/src/memtable/merge_tree/tree.rs
index 6472af49973a..84c5fb09dfb3 100644
--- a/src/mito2/src/memtable/merge_tree/tree.rs
+++ b/src/mito2/src/memtable/merge_tree/tree.rs
@@ -16,9 +16,11 @@
 use std::collections::{BTreeMap, HashSet, VecDeque};
 use std::sync::{Arc, RwLock};
+use std::time::{Duration, Instant};
 
 use api::v1::OpType;
 use common_recordbatch::filter::SimpleFilterEvaluator;
+use common_telemetry::tracing::log;
 use common_time::Timestamp;
 use datafusion_common::ScalarValue;
 use snafu::ensure;
@@ -35,6 +37,7 @@ use crate::memtable::merge_tree::partition::{
 };
 use crate::memtable::merge_tree::MergeTreeConfig;
 use crate::memtable::{BoxedBatchIterator, KeyValues};
+use crate::metrics::MERGE_TREE_READ_STAGE_ELAPSED;
 use crate::read::Batch;
 use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
@@ -153,6 +156,7 @@ impl MergeTree {
         let mut iter = TreeIter {
             partitions,
             current_reader: None,
+            metrics: Default::default(),
         };
         let context = ReadPartitionContext::new(
             self.metadata.clone(),
@@ -313,9 +317,30 @@ impl MergeTree {
     }
 }
 
+#[derive(Default)]
+struct TreeIterMetrics {
+    fetch_partition_elapsed: Duration,
+    rows_fetched: usize,
+    batches_fetched: usize,
+}
+
 struct TreeIter {
     partitions: VecDeque<PartitionRef>,
     current_reader: Option<PartitionReader>,
+    metrics: TreeIterMetrics,
+}
+
+impl Drop for TreeIter {
+    fn drop(&mut self) {
+        MERGE_TREE_READ_STAGE_ELAPSED
+            .with_label_values(&["fetch_next_partition"])
+            .observe(self.metrics.fetch_partition_elapsed.as_secs_f64());
+        log::debug!(
+            "TreeIter rows fetched: {}, batches fetched: {}",
+            self.metrics.rows_fetched,
+            self.metrics.batches_fetched
+        );
+    }
 }
 
 impl Iterator for TreeIter {
@@ -329,6 +354,7 @@ impl TreeIter {
     /// Fetch next partition.
     fn fetch_next_partition(&mut self, mut context: ReadPartitionContext) -> Result<()> {
+        let start = Instant::now();
         while let Some(partition) = self.partitions.pop_front() {
             let part_reader = partition.read(context)?;
             if !part_reader.is_valid() {
@@ -338,7 +364,7 @@ impl TreeIter {
             self.current_reader = Some(part_reader);
             break;
         }
-
+        self.metrics.fetch_partition_elapsed += start.elapsed();
         Ok(())
     }
 
@@ -360,6 +386,8 @@ impl TreeIter {
         let context = part_reader.into_context();
         self.fetch_next_partition(context)?;
 
+        self.metrics.rows_fetched += batch.num_rows();
+        self.metrics.batches_fetched += 1;
         Ok(Some(batch))
     }
 }
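Both `ShardBuilder::read` and the data.rs changes rely on the prometheus crate's scoped-timer guard: `start_timer()` returns a guard that observes the elapsed seconds into the labeled histogram when it is dropped at the end of the enclosing block. A small illustrative sketch; the histogram and function name here are hypothetical, not from this patch:

use lazy_static::lazy_static;
use prometheus::{register_histogram_vec, HistogramVec};

lazy_static! {
    // Illustrative stand-in for MERGE_TREE_READ_STAGE_ELAPSED.
    static ref STAGE_ELAPSED: HistogramVec =
        register_histogram_vec!("example_stage_elapsed", "example help", &["stage"]).unwrap();
}

fn sort_primary_keys(weights: &mut Vec<u16>) {
    // `start_timer()` returns a guard; the elapsed time is observed when the
    // guard is dropped, so it covers exactly the rest of this block.
    let _timer = STAGE_ELAPSED.with_label_values(&["sort_pk"]).start_timer();
    weights.sort_unstable();
    // Binding the guard to `_` instead of `_timer` would drop it immediately
    // and record a near-zero duration.
}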
diff --git a/src/mito2/src/metrics.rs b/src/mito2/src/metrics.rs
index b25fd393bdb4..83d8f87016f9 100644
--- a/src/mito2/src/metrics.rs
+++ b/src/mito2/src/metrics.rs
@@ -254,4 +254,24 @@ lazy_static! {
     pub static ref INDEX_INTERMEDIATE_FLUSH_OP_TOTAL: IntCounter = INDEX_IO_OP_TOTAL
         .with_label_values(&["flush", "intermediate"]);
     // ------- End of index metrics.
+
+    /// Merge tree memtable data buffer freeze metrics
+    pub static ref MERGE_TREE_DATA_BUFFER_FREEZE_STAGE_ELAPSED: HistogramVec = register_histogram_vec!(
+        "greptime_merge_tree_buffer_freeze_stage_elapsed",
+        "mito merge tree data buffer freeze stage elapsed",
+        &[STAGE_LABEL],
+        vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0]
+    )
+    .unwrap();
+
+    /// Merge tree memtable read path metrics
+    pub static ref MERGE_TREE_READ_STAGE_ELAPSED: HistogramVec = register_histogram_vec!(
+        "greptime_merge_tree_read_stage_elapsed",
+        "mito merge tree read stage elapsed",
+        &[STAGE_LABEL],
+        vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0]
+    )
+    .unwrap();
+
+    // ------- End of merge tree memtable metrics.
 }
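The two new histograms follow the same shape as the existing stage histograms in metrics.rs: a single stage label (via `STAGE_LABEL`) and explicit buckets from 5 ms up to 60 s. A rough, self-contained sketch of how such a histogram is registered, sampled, and exported; the names below are examples, not the real statics:

use lazy_static::lazy_static;
use prometheus::{register_histogram_vec, HistogramVec};

lazy_static! {
    static ref EXAMPLE_READ_STAGE_ELAPSED: HistogramVec = register_histogram_vec!(
        "example_merge_tree_read_stage_elapsed",
        "example read stage elapsed",
        &["stage"],
        // Same bucket layout as the patch: 5 ms up to 60 s.
        vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0]
    )
    .unwrap();
}

fn record_and_export() {
    EXAMPLE_READ_STAGE_ELAPSED
        .with_label_values(&["fetch_next_partition"])
        .observe(0.012);
    // prometheus::gather() returns every registered metric family, including this
    // histogram, which a /metrics endpoint can then encode for scraping.
    let families = prometheus::gather();
    assert!(families
        .iter()
        .any(|f| f.get_name() == "example_merge_tree_read_stage_elapsed"));
}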