Skip to content

Commit

Permalink
add some metrics
Browse files Browse the repository at this point in the history
Signed-off-by: Zhenchi <[email protected]>
  • Loading branch information
zhongzc committed Dec 29, 2023
1 parent 7420e13 commit 315c77b
Show file tree
Hide file tree
Showing 5 changed files with 276 additions and 81 deletions.
30 changes: 30 additions & 0 deletions src/mito2/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,4 +143,34 @@ lazy_static! {
&[TYPE_LABEL]
)
.unwrap();
// ------- End of cache metrics.

// Index metrics.
/// Timer of index application.
pub static ref INDEX_APPLY_COST_TIME: Histogram = register_histogram!(
"index_apply_cost_time",
"index apply cost time",
)
.unwrap();
/// Timer of index creation.
pub static ref INDEX_CREATE_COST_TIME: HistogramVec = register_histogram_vec!(
"index_create_cost_time",
"index create cost time",
&[STAGE_LABEL]
)
.unwrap();
/// Counter of rows indexed.
pub static ref INDEX_CREATE_ROWS_TOTAL: IntCounter = register_int_counter!(
"index_rows_total",
"index rows total",
)
.unwrap();
/// Counter of created index bytes.
pub static ref INDEX_CREATE_BYTES_TOTAL: IntCounter = register_int_counter!(
"index_bytes_total",
"index bytes total",
)
.unwrap();

// ------- End of index metrics.
}
3 changes: 3 additions & 0 deletions src/mito2/src/sst/index/applier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use puffin::file_format::reader::{PuffinAsyncReader, PuffinFileReader};
use snafu::ResultExt;

use crate::error::{OpenDalSnafu, Result};
use crate::metrics::INDEX_APPLY_COST_TIME;
use crate::sst::file::FileId;
use crate::sst::index::INDEX_BLOB_TYPE;
use crate::sst::location;
Expand All @@ -52,6 +53,8 @@ impl SstIndexApplier {
}

pub async fn apply(&self, file_id: FileId) -> Result<BTreeSet<usize>> {
let _timer = INDEX_APPLY_COST_TIME.start_timer();

let file_path = location::index_file_path(&self.region_dir, &file_id);

let file_reader = self
Expand Down
107 changes: 26 additions & 81 deletions src/mito2/src/sst/index/creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod statistics;
mod temp_provider;

use std::collections::HashMap;
use std::num::NonZeroUsize;
use std::sync::Arc;

use async_trait::async_trait;
use common_error::ext::BoxedError;
use common_telemetry::warn;
use futures::{AsyncRead, AsyncWrite};
use index::inverted_index::create::sort::external_provider::ExternalTempFileProvider;
use index::inverted_index::create::sort::external_sort::ExternalSorter;
use index::inverted_index::create::sort_create::SortIndexCreator;
use index::inverted_index::create::InvertedIndexCreator;
use index::inverted_index::error as index_error;
use index::inverted_index::error::Result as IndexResult;
use index::inverted_index::format::writer::InvertedIndexBlobWriter;
use object_store::ObjectStore;
use puffin::file_format::writer::{Blob, PuffinAsyncWriter, PuffinFileWriter};
Expand All @@ -38,6 +35,8 @@ use crate::error::{OpenDalSnafu, PushIndexValueSnafu, Result};
use crate::read::Batch;
use crate::sst::file::FileId;
use crate::sst::index::codec::{IndexValueCodec, IndexValuesCodec};
use crate::sst::index::creator::statistics::Statistics;
use crate::sst::index::creator::temp_provider::TempFileProvider;
use crate::sst::index::{
INDEX_BLOB_TYPE, MIN_MEMORY_USAGE_THRESHOLD, PIPE_BUFFER_SIZE_FOR_SENDING_BLOB,
};
Expand All @@ -57,7 +56,7 @@ pub struct SstIndexCreator {
temp_file_provider: Arc<TempFileProvider>,
value_buf: Vec<u8>,

row_count: RowCount,
stats: Statistics,
}

impl SstIndexCreator {
Expand All @@ -69,10 +68,10 @@ impl SstIndexCreator {
memory_usage_threshold: Option<usize>,
row_group_size: NonZeroUsize,
) -> Self {
let temp_file_provider = Arc::new(TempFileProvider {
location: IntermediateLocation::new(&region_dir, &sst_file_id),
object_store: object_store.clone(),
});
let temp_file_provider = Arc::new(TempFileProvider::new(
IntermediateLocation::new(&region_dir, &sst_file_id),
object_store.clone(),
));
let memory_usage_threshold = memory_usage_threshold.map(|threshold| {
(threshold / metadata.primary_key.len()).max(MIN_MEMORY_USAGE_THRESHOLD)
});
Expand All @@ -89,7 +88,7 @@ impl SstIndexCreator {
index_creator,
temp_file_provider,
value_buf: vec![],
row_count: 0,
stats: Statistics::default(),
}
}

Expand All @@ -103,7 +102,7 @@ impl SstIndexCreator {
if let Err(err) = self.do_cleanup().await {
let region_dir = &self.region_dir;
let sst_file_id = &self.sst_file_id;
warn!("Failed to clean up index creator, region_dir: {region_dir}, sst_file_id: {sst_file_id}, error: {err}");
warn!(err; "Failed to clean up index creator, region_dir: {region_dir}, sst_file_id: {sst_file_id}");
}
return Err(err);
}
Expand All @@ -112,8 +111,8 @@ impl SstIndexCreator {
}

pub async fn finish(&mut self) -> Result<(RowCount, ByteCount)> {
if self.row_count == 0 {
// Everything is clean, no IO is performed.
if self.stats.row_count() == 0 {
// no IO is performed, no garbage to clean up, just return
return Ok((0, 0));
}

Expand All @@ -124,15 +123,17 @@ impl SstIndexCreator {
if let Err(err) = cleanup_res {
let region_dir = &self.region_dir;
let sst_file_id = &self.sst_file_id;
warn!("Failed to clean up index creator, region_dir: {region_dir}, sst_file_id: {sst_file_id}, error: {err}");
warn!(err; "Failed to clean up index creator, region_dir: {region_dir}, sst_file_id: {sst_file_id}");
}

finish_res.map(|bytes| (self.row_count, bytes))
finish_res.map(|_| (self.stats.row_count(), self.stats.byte_count()))
}

async fn do_update(&mut self, batch: &Batch) -> Result<()> {
let mut guard = self.stats.record_update();

let n = batch.num_rows();
self.row_count += n;
guard.inc_row_count(n);
for (column_name, field, value) in self.codec.decode(batch.primary_key())? {
if let Some(value) = value.as_ref() {
self.value_buf.clear();
Expand All @@ -149,7 +150,9 @@ impl SstIndexCreator {
Ok(())
}

async fn do_finish(&mut self) -> Result<ByteCount> {
async fn do_finish(&mut self) -> Result<()> {
let mut guard = self.stats.record_finish();

let file_path = location::index_file_path(&self.region_dir, &self.sst_file_id);
let writer = self
.object_store
Expand All @@ -175,71 +178,13 @@ impl SstIndexCreator {
source.unwrap();
sink.unwrap();

Ok(puffin_writer.finish().await.unwrap())
let byte_count = puffin_writer.finish().await.unwrap();
guard.inc_byte_count(byte_count);
Ok(())
}

async fn do_cleanup(&mut self) -> Result<()> {
let _guard = self.stats.record_cleanup();
self.temp_file_provider.cleanup().await
}
}

struct TempFileProvider {
location: IntermediateLocation,
object_store: ObjectStore,
}

#[async_trait]
impl ExternalTempFileProvider for TempFileProvider {
async fn create(
&self,
column_name: &str,
file_id: &str,
) -> IndexResult<Box<dyn AsyncWrite + Unpin + Send>> {
let path = self.location.file_path(column_name, file_id);
let writer = self
.object_store
.writer(&path)
.await
.context(OpenDalSnafu)
.map_err(BoxedError::new)
.context(index_error::ExternalSnafu)?;
Ok(Box::new(writer))
}

async fn read_all(
&self,
column_name: &str,
) -> IndexResult<Vec<Box<dyn AsyncRead + Unpin + Send>>> {
let dir = self.location.column_dir(column_name);
let entries = self
.object_store
.list(&dir)
.await
.context(OpenDalSnafu)
.map_err(BoxedError::new)
.context(index_error::ExternalSnafu)?;
let mut readers = Vec::with_capacity(entries.len());

for entry in entries {
let reader = self
.object_store
.reader(entry.path())
.await
.context(OpenDalSnafu)
.map_err(BoxedError::new)
.context(index_error::ExternalSnafu)?;
readers.push(Box::new(reader) as _);
}

Ok(readers)
}
}

impl TempFileProvider {
async fn cleanup(&self) -> Result<()> {
self.object_store
.remove_all(self.location.root_dir())
.await
.context(OpenDalSnafu)
}
}
118 changes: 118 additions & 0 deletions src/mito2/src/sst/index/creator/statistics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::time::{Duration, Instant};

use crate::metrics::{INDEX_CREATE_BYTES_TOTAL, INDEX_CREATE_COST_TIME, INDEX_CREATE_ROWS_TOTAL};

enum Stage {
Update,
Finish,
Cleanup,
}

#[derive(Default)]
pub(crate) struct Statistics {
update_cost: Duration,
finish_cost: Duration,
cleanup_cost: Duration,
row_count: usize,
byte_count: usize,
}

impl Statistics {
pub fn record_update(&mut self) -> TimerGuard<'_> {
TimerGuard::new(self, Stage::Update)
}

pub fn record_finish(&mut self) -> TimerGuard<'_> {
TimerGuard::new(self, Stage::Finish)
}

pub fn record_cleanup(&mut self) -> TimerGuard<'_> {
TimerGuard::new(self, Stage::Cleanup)
}

pub fn row_count(&self) -> usize {
self.row_count
}

pub fn byte_count(&self) -> usize {
self.byte_count
}

fn flush(&self) {
INDEX_CREATE_COST_TIME
.with_label_values(&["update"])
.observe(self.update_cost.as_secs_f64());
INDEX_CREATE_COST_TIME
.with_label_values(&["finish"])
.observe(self.finish_cost.as_secs_f64());
INDEX_CREATE_COST_TIME
.with_label_values(&["cleanup"])
.observe(self.cleanup_cost.as_secs_f64());
INDEX_CREATE_COST_TIME
.with_label_values(&["total"])
.observe((self.update_cost + self.finish_cost + self.cleanup_cost).as_secs_f64());

INDEX_CREATE_ROWS_TOTAL.inc_by(self.row_count as _);
INDEX_CREATE_BYTES_TOTAL.inc_by(self.byte_count as _);
}
}

impl Drop for Statistics {
fn drop(&mut self) {
self.flush();
}
}

pub(crate) struct TimerGuard<'a> {
stats: &'a mut Statistics,
stage: Stage,
timer: Instant,
}

impl<'a> TimerGuard<'a> {
fn new(stats: &'a mut Statistics, stage: Stage) -> Self {
Self {
stats,
stage,
timer: Instant::now(),
}
}

pub fn inc_row_count(&mut self, n: usize) {
self.stats.row_count += n;
}

pub fn inc_byte_count(&mut self, n: usize) {
self.stats.byte_count += n;
}
}

impl Drop for TimerGuard<'_> {
fn drop(&mut self) {
match self.stage {
Stage::Update => {
self.stats.update_cost += self.timer.elapsed();
}
Stage::Finish => {
self.stats.finish_cost += self.timer.elapsed();
}
Stage::Cleanup => {
self.stats.cleanup_cost += self.timer.elapsed();
}
}
}
}
Loading

0 comments on commit 315c77b

Please sign in to comment.