chore: add metrics for memtable read path (GreptimeTeam#3397)
* chore: add metrics for read path

* chore: add more metrics
v0y4g3r authored Feb 28, 2024
1 parent b97f957 commit 7942b8f
Showing 5 changed files with 199 additions and 38 deletions.
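All of the new counters in this commit follow the same prometheus HistogramVec pattern: short, one-shot stages are wrapped in a start_timer() guard that records the elapsed time when it is dropped, while hot per-batch paths accumulate a Duration and call observe() once. The definitions of MERGE_TREE_READ_STAGE_ELAPSED and MERGE_TREE_DATA_BUFFER_FREEZE_STAGE_ELAPSED live in src/mito2/src/metrics.rs, which is not among the two files rendered below; the sketch that follows only illustrates what such definitions typically look like with the prometheus crate, and the metric names, help strings, and the "stage" label are assumptions, not taken from the commit.

    use lazy_static::lazy_static;
    use prometheus::{register_histogram_vec, HistogramVec};

    lazy_static! {
        // Illustrative only -- metric and label names are assumed.
        pub static ref MERGE_TREE_READ_STAGE_ELAPSED: HistogramVec = register_histogram_vec!(
            "mito_merge_tree_read_stage_elapsed",
            "elapsed time of merge tree read stages",
            &["stage"]
        )
        .unwrap();

        pub static ref MERGE_TREE_DATA_BUFFER_FREEZE_STAGE_ELAPSED: HistogramVec =
            register_histogram_vec!(
                "mito_merge_tree_data_buffer_freeze_stage_elapsed",
                "elapsed time of merge tree data buffer freeze stages",
                &["stage"]
            )
            .unwrap();
    }

    fn stage_example() {
        // Guard style: one observation is recorded when `_timer` goes out of scope.
        let _timer = MERGE_TREE_READ_STAGE_ELAPSED
            .with_label_values(&["build_data_parts_reader"])
            .start_timer();
        // ... build the reader ...
    }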
111 changes: 86 additions & 25 deletions src/mito2/src/memtable/merge_tree/data.rs
@@ -18,6 +18,7 @@ use std::cmp::{Ordering, Reverse};
use std::fmt::{Debug, Formatter};
use std::ops::Range;
use std::sync::Arc;
use std::time::{Duration, Instant};

use bytes::Bytes;
use datatypes::arrow;
@@ -46,6 +47,7 @@ use crate::error::Result;
use crate::memtable::key_values::KeyValue;
use crate::memtable::merge_tree::merger::{DataBatchKey, DataNode, DataSource, Merger};
use crate::memtable::merge_tree::PkIndex;
use crate::metrics::{MERGE_TREE_DATA_BUFFER_FREEZE_STAGE_ELAPSED, MERGE_TREE_READ_STAGE_ELAPSED};

const PK_INDEX_COLUMN_NAME: &str = "__pk_index";

@@ -255,16 +257,21 @@ impl DataBuffer {
/// If pk_weights is present, yielded rows are sorted according to those weights;
/// otherwise rows are sorted by their pk_index values, which already serve as the weights.
pub fn read(&self, pk_weights: Option<&[u16]>) -> Result<DataBufferReader> {
let batch = read_data_buffer_to_record_batches(
self.data_part_schema.clone(),
self,
pk_weights,
self.dedup,
// replace_pk_index is always set to false since:
// - for DataBuffer in ShardBuilder, pk dict is not frozen
// - for DataBuffer in Shard, values in the pk_index column have already been replaced during `freeze`.
false,
)?;
let batch = {
let _timer = MERGE_TREE_READ_STAGE_ELAPSED
.with_label_values(&["read_data_buffer_to_batch"])
.start_timer();
read_data_buffer_to_record_batches(
self.data_part_schema.clone(),
self,
pk_weights,
self.dedup,
// replace_pk_index is always set to false since:
// - for DataBuffer in ShardBuilder, pk dict is not frozen
// - for DataBuffer in Shard, values in the pk_index column have already been replaced during `freeze`.
false,
)?
};
DataBufferReader::new(batch)
}

@@ -493,6 +500,15 @@ pub(crate) struct DataBufferReader {
batch: RecordBatch,
offset: usize,
current_range: Option<DataBatchRange>,
elapsed_time: Duration,
}

impl Drop for DataBufferReader {
fn drop(&mut self) {
MERGE_TREE_READ_STAGE_ELAPSED
.with_label_values(&["read_data_buffer"])
.observe(self.elapsed_time.as_secs_f64())
}
}

impl DataBufferReader {
@@ -501,6 +517,7 @@ impl DataBufferReader {
batch,
offset: 0,
current_range: None,
elapsed_time: Duration::default(),
};
reader.next()?; // fill data batch for comparison and merge.
Ok(reader)
@@ -527,6 +544,7 @@ impl DataBufferReader {
self.current_range = None;
return Ok(());
}
let start = Instant::now();
let pk_index_array = pk_index_array(&self.batch);
if let Some((next_pk, range)) = search_next_pk_range(pk_index_array, self.offset) {
self.offset = range.end;
Expand All @@ -538,6 +556,7 @@ impl DataBufferReader {
} else {
self.current_range = None;
}
self.elapsed_time += start.elapsed();
Ok(())
}
}
@@ -741,18 +760,30 @@ impl<'a> DataPartEncoder<'a> {

pub fn write(self, source: &mut DataBuffer) -> Result<DataPart> {
let mut bytes = Vec::with_capacity(1024);
let rb = drain_data_buffer_to_record_batches(
self.schema.clone(),
source,
self.pk_weights,
self.dedup,
self.replace_pk_index,
)?;
let mut writer =
ArrowWriter::try_new(&mut bytes, self.schema.clone(), Some(self.writer_props()))
.context(error::EncodeMemtableSnafu)?;
writer.write(&rb).context(error::EncodeMemtableSnafu)?;
let _metadata = writer.close().context(error::EncodeMemtableSnafu)?;

let rb = {
let _timer = MERGE_TREE_DATA_BUFFER_FREEZE_STAGE_ELAPSED
.with_label_values(&["drain_data_buffer_to_batch"])
.start_timer();
drain_data_buffer_to_record_batches(
self.schema.clone(),
source,
self.pk_weights,
self.dedup,
self.replace_pk_index,
)?
};

{
let _timer = MERGE_TREE_DATA_BUFFER_FREEZE_STAGE_ELAPSED
.with_label_values(&["encode"])
.start_timer();
let mut writer =
ArrowWriter::try_new(&mut bytes, self.schema.clone(), Some(self.writer_props()))
.context(error::EncodeMemtableSnafu)?;
writer.write(&rb).context(error::EncodeMemtableSnafu)?;
let _metadata = writer.close().context(error::EncodeMemtableSnafu)?;
}
Ok(DataPart::Parquet(ParquetPart {
data: Bytes::from(bytes),
}))
@@ -783,6 +814,15 @@ pub struct DataPartReader {
inner: ParquetRecordBatchReader,
current_batch: Option<RecordBatch>,
current_range: Option<DataBatchRange>,
elapsed: Duration,
}

impl Drop for DataPartReader {
fn drop(&mut self) {
MERGE_TREE_READ_STAGE_ELAPSED
.with_label_values(&["read_data_part"])
.observe(self.elapsed.as_secs_f64());
}
}

impl Debug for DataPartReader {
@@ -805,6 +845,7 @@ impl DataPartReader {
inner: parquet_reader,
current_batch: None,
current_range: None,
elapsed: Default::default(),
};
reader.next()?;
Ok(reader)
@@ -827,6 +868,7 @@ impl DataPartReader {
}

pub(crate) fn next(&mut self) -> Result<()> {
let start = Instant::now();
if let Some((next_pk, range)) = self.search_next_pk_range() {
// first try to search next pk in current record batch.
self.current_range = Some(DataBatchRange {
Expand All @@ -847,7 +889,7 @@ impl DataPartReader {
self.current_range = None;
}
}

self.elapsed += start.elapsed();
Ok(())
}

@@ -901,6 +943,10 @@ impl DataParts {
/// The returned iterator yields a record batch of one primary key at a time.
/// The order of yielding primary keys is determined by provided weights.
pub fn read(&self) -> Result<DataPartsReader> {
let _timer = MERGE_TREE_READ_STAGE_ELAPSED
.with_label_values(&["build_data_parts_reader"])
.start_timer();

let mut nodes = Vec::with_capacity(self.frozen.len() + 1);
nodes.push(DataNode::new(DataSource::Buffer(
// `DataParts::read` ensures that all pk_index inside `DataBuffer` are replaced by weights.
@@ -911,7 +957,10 @@
nodes.push(DataNode::new(DataSource::Part(p.read()?)));
}
let merger = Merger::try_new(nodes)?;
Ok(DataPartsReader { merger })
Ok(DataPartsReader {
merger,
elapsed: Default::default(),
})
}

pub(crate) fn is_empty(&self) -> bool {
@@ -922,6 +971,15 @@
/// Reader for all parts inside a `DataParts`.
pub struct DataPartsReader {
merger: Merger<DataNode>,
elapsed: Duration,
}

impl Drop for DataPartsReader {
fn drop(&mut self) {
MERGE_TREE_READ_STAGE_ELAPSED
.with_label_values(&["read_data_parts"])
.observe(self.elapsed.as_secs_f64())
}
}

impl DataPartsReader {
@@ -931,7 +989,10 @@ impl DataPartsReader {
}

pub(crate) fn next(&mut self) -> Result<()> {
self.merger.next()
let start = Instant::now();
let result = self.merger.next();
self.elapsed += start.elapsed();
result
}

pub(crate) fn is_valid(&self) -> bool {
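data.rs mixes two instrumentation styles. One-shot stages (read_data_buffer_to_batch, build_data_parts_reader, the two freeze stages) use the scoped start_timer() guard, whereas the readers' next() methods run once per batch, so they add the per-call cost to an elapsed: Duration field and report it with a single observe() in Drop, yielding one histogram sample per reader instead of one per call. A minimal, self-contained sketch of that accumulate-then-observe pattern, with placeholder names and a println! standing in for the real observe() call:

    use std::time::{Duration, Instant};

    struct InstrumentedReader {
        remaining: u32,
        elapsed: Duration, // accumulated across all next() calls
    }

    impl InstrumentedReader {
        fn next(&mut self) -> bool {
            let start = Instant::now();
            // ... advance to the next primary-key range / record batch ...
            self.remaining = self.remaining.saturating_sub(1);
            self.elapsed += start.elapsed();
            self.remaining > 0
        }
    }

    impl Drop for InstrumentedReader {
        fn drop(&mut self) {
            // The real readers observe here, e.g.:
            // MERGE_TREE_READ_STAGE_ELAPSED
            //     .with_label_values(&["read_data_buffer"])
            //     .observe(self.elapsed.as_secs_f64());
            println!("reader spent {:?} in next()", self.elapsed);
        }
    }

    fn main() {
        let mut reader = InstrumentedReader { remaining: 3, elapsed: Duration::ZERO };
        while reader.next() {}
    } // drop runs here and reports the accumulated time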
58 changes: 49 additions & 9 deletions src/mito2/src/memtable/merge_tree/partition.rs
@@ -18,9 +18,11 @@
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};

use api::v1::SemanticType;
use common_recordbatch::filter::SimpleFilterEvaluator;
use common_telemetry::tracing::log;
use store_api::metadata::RegionMetadataRef;
use store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME;
use store_api::storage::ColumnId;
@@ -35,6 +37,7 @@ use crate::memtable::merge_tree::shard::{
};
use crate::memtable::merge_tree::shard_builder::ShardBuilder;
use crate::memtable::merge_tree::{MergeTreeConfig, PkId};
use crate::metrics::MERGE_TREE_READ_STAGE_ELAPSED;
use crate::read::{Batch, BatchBuilder};
use crate::row_converter::{McmpRowCodec, RowCodec};

@@ -110,8 +113,8 @@ impl Partition {
let inner = self.inner.read().unwrap();
let mut nodes = Vec::with_capacity(inner.shards.len() + 1);
if !inner.shard_builder.is_empty() {
let bulder_reader = inner.shard_builder.read(&mut context.pk_weights)?;
nodes.push(ShardNode::new(ShardSource::Builder(bulder_reader)));
let builder_reader = inner.shard_builder.read(&mut context.pk_weights)?;
nodes.push(ShardNode::new(ShardSource::Builder(builder_reader)));
}
for shard in &inner.shards {
if !shard.is_empty() {
@@ -122,7 +125,7 @@
nodes
};

// Creating a shard merger will invoke next so we do it outside of the lock.
// Creating a shard merger will invoke next so we do it outside the lock.
let merger = ShardMerger::try_new(nodes)?;
if self.dedup {
let source = DedupReader::try_new(merger)?;
@@ -234,6 +237,15 @@ pub(crate) struct PartitionStats {
pub(crate) shared_memory_size: usize,
}

#[derive(Default)]
struct PartitionReaderMetrics {
prune_pk: Duration,
read_source: Duration,
data_batch_to_batch: Duration,
keys_before_pruning: usize,
keys_after_pruning: usize,
}

/// Reader to scan rows in a partition.
///
/// It can merge rows from multiple shards.
@@ -266,30 +278,35 @@ impl PartitionReader {
/// # Panics
/// Panics if the reader is invalid.
pub fn next(&mut self) -> Result<()> {
let read_source = Instant::now();
self.source.next()?;

self.context.metrics.read_source += read_source.elapsed();
self.prune_batch_by_key()
}

/// Converts current data batch into a [Batch].
///
/// # Panics
/// Panics if the reader is invalid.
pub fn convert_current_batch(&self) -> Result<Batch> {
pub fn convert_current_batch(&mut self) -> Result<Batch> {
let start = Instant::now();
let data_batch = self.source.current_data_batch();
data_batch_to_batch(
let batch = data_batch_to_batch(
&self.context.metadata,
&self.context.projection,
self.source.current_key(),
data_batch,
)
)?;
self.context.metrics.data_batch_to_batch += start.elapsed();
Ok(batch)
}

pub(crate) fn into_context(self) -> ReadPartitionContext {
self.context
}

fn prune_batch_by_key(&mut self) -> Result<()> {
let start = Instant::now();
if self.context.metadata.primary_key.is_empty() || !self.context.need_prune_key {
// Nothing to prune.
return Ok(());
@@ -305,6 +322,7 @@ impl PartitionReader {
}
}
let key = self.source.current_key().unwrap();
self.context.metrics.keys_before_pruning += 1;
// Prune batch by primary key.
if prune_primary_key(
&self.context.metadata,
@@ -314,11 +332,12 @@
) {
// We need this key.
self.last_yield_pk_id = Some(pk_id);
self.context.metrics.keys_after_pruning += 1;
break;
}
self.source.next()?;
}

self.context.metrics.prune_pk += start.elapsed();
Ok(())
}
}
@@ -384,6 +403,26 @@ pub(crate) struct ReadPartitionContext {
/// Buffer to store pk weights.
pk_weights: Vec<u16>,
need_prune_key: bool,
metrics: PartitionReaderMetrics,
}

impl Drop for ReadPartitionContext {
fn drop(&mut self) {
MERGE_TREE_READ_STAGE_ELAPSED
.with_label_values(&["partition_prune_pk"])
.observe(self.metrics.prune_pk.as_secs_f64());
MERGE_TREE_READ_STAGE_ELAPSED
.with_label_values(&["partition_read_source"])
.observe(self.metrics.read_source.as_secs_f64());
MERGE_TREE_READ_STAGE_ELAPSED
.with_label_values(&["partition_data_batch_to_batch"])
.observe(self.metrics.data_batch_to_batch.as_secs_f64());
log::debug!(
"TreeIter pruning, before: {}, after: {}",
self.metrics.keys_before_pruning,
self.metrics.keys_after_pruning
);
}
}

impl ReadPartitionContext {
@@ -401,10 +440,11 @@ impl ReadPartitionContext {
filters,
pk_weights: Vec::new(),
need_prune_key,
metrics: Default::default(),
}
}

/// Does filters contains predicate on primary key columns after pruning the
/// Does filter contain predicate on primary key columns after pruning the
/// partition column.
fn need_prune_key(metadata: &RegionMetadataRef, filters: &[SimpleFilterEvaluator]) -> bool {
for filter in filters {
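In partition.rs the per-call costs are collected in PartitionReaderMetrics and flushed when the ReadPartitionContext is dropped, together with a debug log of how many keys survived primary-key pruning. For reference, the stage labels recorded by this change, gathered from the call sites in the two files above, are listed below as an illustrative constant (not part of the commit):

    // Read-path stages (MERGE_TREE_READ_STAGE_ELAPSED).
    const MERGE_TREE_READ_STAGES: &[&str] = &[
        "read_data_buffer_to_batch",     // DataBuffer::read
        "read_data_buffer",              // DataBufferReader, observed on drop
        "read_data_part",                // DataPartReader, observed on drop
        "build_data_parts_reader",       // DataParts::read
        "read_data_parts",               // DataPartsReader, observed on drop
        "partition_prune_pk",            // PartitionReader::prune_batch_by_key
        "partition_read_source",         // PartitionReader::next
        "partition_data_batch_to_batch", // PartitionReader::convert_current_batch
    ];

    // Freeze-path stages (MERGE_TREE_DATA_BUFFER_FREEZE_STAGE_ELAPSED).
    const MERGE_TREE_FREEZE_STAGES: &[&str] = &[
        "drain_data_buffer_to_batch", // DataPartEncoder::write, drain the buffer
        "encode",                     // DataPartEncoder::write, parquet encoding
    ];

    fn main() {
        for stage in MERGE_TREE_READ_STAGES.iter().chain(MERGE_TREE_FREEZE_STAGES) {
            println!("{stage}");
        }
    }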