diff --git a/src/mito2/src/metrics.rs b/src/mito2/src/metrics.rs
index d53cbd495dd5..dae565fb3baf 100644
--- a/src/mito2/src/metrics.rs
+++ b/src/mito2/src/metrics.rs
@@ -143,4 +143,34 @@ lazy_static! {
         &[TYPE_LABEL]
     )
     .unwrap();
+    // ------- End of cache metrics.
+
+    // Index metrics.
+    /// Timer of index application.
+    pub static ref INDEX_APPLY_COST_TIME: Histogram = register_histogram!(
+        "index_apply_cost_time",
+        "index apply cost time",
+    )
+    .unwrap();
+    /// Timer of index creation.
+    pub static ref INDEX_CREATE_COST_TIME: HistogramVec = register_histogram_vec!(
+        "index_create_cost_time",
+        "index create cost time",
+        &[STAGE_LABEL]
+    )
+    .unwrap();
+    /// Counter of rows indexed.
+    pub static ref INDEX_CREATE_ROWS_TOTAL: IntCounter = register_int_counter!(
+        "index_rows_total",
+        "index rows total",
+    )
+    .unwrap();
+    /// Counter of created index bytes.
+    pub static ref INDEX_CREATE_BYTES_TOTAL: IntCounter = register_int_counter!(
+        "index_bytes_total",
+        "index bytes total",
+    )
+    .unwrap();
+
+    // ------- End of index metrics.
 }
diff --git a/src/mito2/src/sst/index/applier.rs b/src/mito2/src/sst/index/applier.rs
index 67568248d837..041ad3086adf 100644
--- a/src/mito2/src/sst/index/applier.rs
+++ b/src/mito2/src/sst/index/applier.rs
@@ -26,6 +26,7 @@ use puffin::file_format::reader::{PuffinAsyncReader, PuffinFileReader};
 use snafu::ResultExt;
 
 use crate::error::{OpenDalSnafu, Result};
+use crate::metrics::INDEX_APPLY_COST_TIME;
 use crate::sst::file::FileId;
 use crate::sst::index::INDEX_BLOB_TYPE;
 use crate::sst::location;
@@ -52,6 +53,8 @@ impl SstIndexApplier {
     }
 
     pub async fn apply(&self, file_id: FileId) -> Result> {
+        let _timer = INDEX_APPLY_COST_TIME.start_timer();
+
         let file_path = location::index_file_path(&self.region_dir, &file_id);
 
         let file_reader = self
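
Reviewer note (not part of the patch): INDEX_APPLY_COST_TIME.start_timer() relies on the prometheus crate's drop-based HistogramTimer, so the apply duration is still observed when apply() returns early through `?`. A minimal sketch of that pattern, using only illustrative names (DEMO_COST_TIME, timed_work):

    use lazy_static::lazy_static;
    use prometheus::{register_histogram, Histogram};

    lazy_static! {
        static ref DEMO_COST_TIME: Histogram =
            register_histogram!("demo_cost_time", "demo cost time").unwrap();
    }

    fn timed_work(fail: bool) -> Result<(), String> {
        let _timer = DEMO_COST_TIME.start_timer(); // observes elapsed seconds on drop
        if fail {
            return Err("early return is still timed".to_string());
        }
        Ok(())
    }
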
diff --git a/src/mito2/src/sst/index/creator.rs b/src/mito2/src/sst/index/creator.rs
index 4b374979a95a..5f9c0042f370 100644
--- a/src/mito2/src/sst/index/creator.rs
+++ b/src/mito2/src/sst/index/creator.rs
@@ -12,20 +12,17 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+mod statistics;
+mod temp_provider;
+
 use std::collections::HashMap;
 use std::num::NonZeroUsize;
 use std::sync::Arc;
 
-use async_trait::async_trait;
-use common_error::ext::BoxedError;
 use common_telemetry::warn;
-use futures::{AsyncRead, AsyncWrite};
-use index::inverted_index::create::sort::external_provider::ExternalTempFileProvider;
 use index::inverted_index::create::sort::external_sort::ExternalSorter;
 use index::inverted_index::create::sort_create::SortIndexCreator;
 use index::inverted_index::create::InvertedIndexCreator;
-use index::inverted_index::error as index_error;
-use index::inverted_index::error::Result as IndexResult;
 use index::inverted_index::format::writer::InvertedIndexBlobWriter;
 use object_store::ObjectStore;
 use puffin::file_format::writer::{Blob, PuffinAsyncWriter, PuffinFileWriter};
@@ -38,6 +35,8 @@ use crate::error::{OpenDalSnafu, PushIndexValueSnafu, Result};
 use crate::read::Batch;
 use crate::sst::file::FileId;
 use crate::sst::index::codec::{IndexValueCodec, IndexValuesCodec};
+use crate::sst::index::creator::statistics::Statistics;
+use crate::sst::index::creator::temp_provider::TempFileProvider;
 use crate::sst::index::{
     INDEX_BLOB_TYPE, MIN_MEMORY_USAGE_THRESHOLD, PIPE_BUFFER_SIZE_FOR_SENDING_BLOB,
 };
@@ -57,7 +56,7 @@ pub struct SstIndexCreator {
     temp_file_provider: Arc<TempFileProvider>,
     value_buf: Vec<u8>,
 
-    row_count: RowCount,
+    stats: Statistics,
 }
 
 impl SstIndexCreator {
@@ -69,10 +68,10 @@ impl SstIndexCreator {
         memory_usage_threshold: Option<usize>,
         row_group_size: NonZeroUsize,
     ) -> Self {
-        let temp_file_provider = Arc::new(TempFileProvider {
-            location: IntermediateLocation::new(&region_dir, &sst_file_id),
-            object_store: object_store.clone(),
-        });
+        let temp_file_provider = Arc::new(TempFileProvider::new(
+            IntermediateLocation::new(&region_dir, &sst_file_id),
+            object_store.clone(),
+        ));
         let memory_usage_threshold = memory_usage_threshold.map(|threshold| {
             (threshold / metadata.primary_key.len()).max(MIN_MEMORY_USAGE_THRESHOLD)
         });
@@ -89,7 +88,7 @@ impl SstIndexCreator {
             index_creator,
             temp_file_provider,
             value_buf: vec![],
-            row_count: 0,
+            stats: Statistics::default(),
         }
     }
 
@@ -103,7 +102,7 @@ impl SstIndexCreator {
             if let Err(err) = self.do_cleanup().await {
                 let region_dir = &self.region_dir;
                 let sst_file_id = &self.sst_file_id;
-                warn!("Failed to clean up index creator, region_dir: {region_dir}, sst_file_id: {sst_file_id}, error: {err}");
+                warn!(err; "Failed to clean up index creator, region_dir: {region_dir}, sst_file_id: {sst_file_id}");
             }
             return Err(err);
         }
@@ -112,8 +111,8 @@ impl SstIndexCreator {
     }
 
     pub async fn finish(&mut self) -> Result<(RowCount, ByteCount)> {
-        if self.row_count == 0 {
-            // Everything is clean, no IO is performed.
+        if self.stats.row_count() == 0 {
+            // no IO is performed, no garbage to clean up, just return
             return Ok((0, 0));
         }
@@ -124,15 +123,17 @@ impl SstIndexCreator {
         if let Err(err) = cleanup_res {
             let region_dir = &self.region_dir;
             let sst_file_id = &self.sst_file_id;
-            warn!("Failed to clean up index creator, region_dir: {region_dir}, sst_file_id: {sst_file_id}, error: {err}");
+            warn!(err; "Failed to clean up index creator, region_dir: {region_dir}, sst_file_id: {sst_file_id}");
         }
 
-        finish_res.map(|bytes| (self.row_count, bytes))
+        finish_res.map(|_| (self.stats.row_count(), self.stats.byte_count()))
     }
 
     async fn do_update(&mut self, batch: &Batch) -> Result<()> {
+        let mut guard = self.stats.record_update();
+
         let n = batch.num_rows();
-        self.row_count += n;
+        guard.inc_row_count(n);
         for (column_name, field, value) in self.codec.decode(batch.primary_key())? {
             if let Some(value) = value.as_ref() {
                 self.value_buf.clear();
@@ -149,7 +150,9 @@ impl SstIndexCreator {
         Ok(())
     }
 
-    async fn do_finish(&mut self) -> Result<ByteCount> {
+    async fn do_finish(&mut self) -> Result<()> {
+        let mut guard = self.stats.record_finish();
+
         let file_path = location::index_file_path(&self.region_dir, &self.sst_file_id);
         let writer = self
             .object_store
@@ -175,71 +178,13 @@ impl SstIndexCreator {
         source.unwrap();
         sink.unwrap();
 
-        Ok(puffin_writer.finish().await.unwrap())
+        let byte_count = puffin_writer.finish().await.unwrap();
+        guard.inc_byte_count(byte_count);
+        Ok(())
     }
 
     async fn do_cleanup(&mut self) -> Result<()> {
+        let _guard = self.stats.record_cleanup();
+
         self.temp_file_provider.cleanup().await
     }
 }
-
-struct TempFileProvider {
-    location: IntermediateLocation,
-    object_store: ObjectStore,
-}
-
-#[async_trait]
-impl ExternalTempFileProvider for TempFileProvider {
-    async fn create(
-        &self,
-        column_name: &str,
-        file_id: &str,
-    ) -> IndexResult<Box<dyn AsyncWrite + Unpin + Send>> {
-        let path = self.location.file_path(column_name, file_id);
-        let writer = self
-            .object_store
-            .writer(&path)
-            .await
-            .context(OpenDalSnafu)
-            .map_err(BoxedError::new)
-            .context(index_error::ExternalSnafu)?;
-        Ok(Box::new(writer))
-    }
-
-    async fn read_all(
-        &self,
-        column_name: &str,
-    ) -> IndexResult<Vec<Box<dyn AsyncRead + Unpin + Send>>> {
-        let dir = self.location.column_dir(column_name);
-        let entries = self
-            .object_store
-            .list(&dir)
-            .await
-            .context(OpenDalSnafu)
-            .map_err(BoxedError::new)
-            .context(index_error::ExternalSnafu)?;
-        let mut readers = Vec::with_capacity(entries.len());
-
-        for entry in entries {
-            let reader = self
-                .object_store
-                .reader(entry.path())
-                .await
-                .context(OpenDalSnafu)
-                .map_err(BoxedError::new)
-                .context(index_error::ExternalSnafu)?;
-            readers.push(Box::new(reader) as _);
-        }
-
-        Ok(readers)
-    }
-}
-
-impl TempFileProvider {
-    async fn cleanup(&self) -> Result<()> {
-        self.object_store
-            .remove_all(self.location.root_dir())
-            .await
-            .context(OpenDalSnafu)
-    }
-}
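
Reviewer note (not part of the patch): from the caller's side the creator API is unchanged; finish() still yields (RowCount, ByteCount), but both values now come from the internal Statistics instead of the removed row_count field. A hedged sketch of the assumed call pattern, where `creator` and `batches` are hypothetical placeholders and `update()` is assumed to be the existing public entry point around do_update():

    // Sketch only: `creator` is an SstIndexCreator, `batches` yields mito2 `Batch` values.
    for batch in &batches {
        // Each call runs under a `record_update()` guard, so its time lands in the
        // "update" stage of INDEX_CREATE_COST_TIME.
        creator.update(batch).await?;
    }
    // `finish()` runs under the "finish" stage and returns the accumulated statistics.
    let (row_count, byte_count) = creator.finish().await?;
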
diff --git a/src/mito2/src/sst/index/creator/statistics.rs b/src/mito2/src/sst/index/creator/statistics.rs
new file mode 100644
index 000000000000..70016290a5b1
--- /dev/null
+++ b/src/mito2/src/sst/index/creator/statistics.rs
@@ -0,0 +1,118 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::time::{Duration, Instant};
+
+use crate::metrics::{INDEX_CREATE_BYTES_TOTAL, INDEX_CREATE_COST_TIME, INDEX_CREATE_ROWS_TOTAL};
+
+enum Stage {
+    Update,
+    Finish,
+    Cleanup,
+}
+
+#[derive(Default)]
+pub(crate) struct Statistics {
+    update_cost: Duration,
+    finish_cost: Duration,
+    cleanup_cost: Duration,
+    row_count: usize,
+    byte_count: usize,
+}
+
+impl Statistics {
+    pub fn record_update(&mut self) -> TimerGuard<'_> {
+        TimerGuard::new(self, Stage::Update)
+    }
+
+    pub fn record_finish(&mut self) -> TimerGuard<'_> {
+        TimerGuard::new(self, Stage::Finish)
+    }
+
+    pub fn record_cleanup(&mut self) -> TimerGuard<'_> {
+        TimerGuard::new(self, Stage::Cleanup)
+    }
+
+    pub fn row_count(&self) -> usize {
+        self.row_count
+    }
+
+    pub fn byte_count(&self) -> usize {
+        self.byte_count
+    }
+
+    fn flush(&self) {
+        INDEX_CREATE_COST_TIME
+            .with_label_values(&["update"])
+            .observe(self.update_cost.as_secs_f64());
+        INDEX_CREATE_COST_TIME
+            .with_label_values(&["finish"])
+            .observe(self.finish_cost.as_secs_f64());
+        INDEX_CREATE_COST_TIME
+            .with_label_values(&["cleanup"])
+            .observe(self.cleanup_cost.as_secs_f64());
+        INDEX_CREATE_COST_TIME
+            .with_label_values(&["total"])
+            .observe((self.update_cost + self.finish_cost + self.cleanup_cost).as_secs_f64());
+
+        INDEX_CREATE_ROWS_TOTAL.inc_by(self.row_count as _);
+        INDEX_CREATE_BYTES_TOTAL.inc_by(self.byte_count as _);
+    }
+}
+
+impl Drop for Statistics {
+    fn drop(&mut self) {
+        self.flush();
+    }
+}
+
+pub(crate) struct TimerGuard<'a> {
+    stats: &'a mut Statistics,
+    stage: Stage,
+    timer: Instant,
+}
+
+impl<'a> TimerGuard<'a> {
+    fn new(stats: &'a mut Statistics, stage: Stage) -> Self {
+        Self {
+            stats,
+            stage,
+            timer: Instant::now(),
+        }
+    }
+
+    pub fn inc_row_count(&mut self, n: usize) {
+        self.stats.row_count += n;
+    }
+
+    pub fn inc_byte_count(&mut self, n: usize) {
+        self.stats.byte_count += n;
+    }
+}
+
+impl Drop for TimerGuard<'_> {
+    fn drop(&mut self) {
+        match self.stage {
+            Stage::Update => {
+                self.stats.update_cost += self.timer.elapsed();
+            }
+            Stage::Finish => {
+                self.stats.finish_cost += self.timer.elapsed();
+            }
+            Stage::Cleanup => {
+                self.stats.cleanup_cost += self.timer.elapsed();
+            }
+        }
+    }
+}
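
Reviewer note (not part of the patch): statistics.rs is a small RAII accounting helper. Each record_* call returns a TimerGuard that adds its elapsed time to the matching stage when dropped, and dropping the Statistics value itself flushes the per-stage and total timers plus the row/byte counters to the INDEX_CREATE_* metrics. A minimal usage sketch, mirroring how the creator uses it:

    let mut stats = Statistics::default();
    {
        let mut guard = stats.record_update(); // starts timing the "update" stage
        guard.inc_row_count(42);               // rows indexed by this batch
    } // guard drop: elapsed time added to update_cost
    {
        let mut guard = stats.record_finish();
        guard.inc_byte_count(1024);            // bytes written for the finished index
    } // guard drop: elapsed time added to finish_cost
    assert_eq!(stats.row_count(), 42);
    drop(stats); // flush(): stage and total histograms observed, row/byte counters bumped
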
diff --git a/src/mito2/src/sst/index/creator/temp_provider.rs b/src/mito2/src/sst/index/creator/temp_provider.rs
new file mode 100644
index 000000000000..2148a19af9bd
--- /dev/null
+++ b/src/mito2/src/sst/index/creator/temp_provider.rs
@@ -0,0 +1,99 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use async_trait::async_trait;
+use common_error::ext::BoxedError;
+use common_telemetry::warn;
+use futures::{AsyncRead, AsyncWrite};
+use index::inverted_index::create::sort::external_provider::ExternalTempFileProvider;
+use index::inverted_index::error as index_error;
+use index::inverted_index::error::Result as IndexResult;
+use object_store::ObjectStore;
+use snafu::ResultExt;
+
+use crate::error::{OpenDalSnafu, Result};
+use crate::sst::location::IntermediateLocation;
+
+pub(crate) struct TempFileProvider {
+    location: IntermediateLocation,
+    object_store: ObjectStore,
+}
+
+#[async_trait]
+impl ExternalTempFileProvider for TempFileProvider {
+    async fn create(
+        &self,
+        column_name: &str,
+        file_id: &str,
+    ) -> IndexResult<Box<dyn AsyncWrite + Unpin + Send>> {
+        let path = self.location.file_path(column_name, file_id);
+        let writer = self
+            .object_store
+            .writer(&path)
+            .await
+            .context(OpenDalSnafu)
+            .map_err(BoxedError::new)
+            .context(index_error::ExternalSnafu)?;
+        Ok(Box::new(writer))
+    }
+
+    async fn read_all(
+        &self,
+        column_name: &str,
+    ) -> IndexResult<Vec<Box<dyn AsyncRead + Unpin + Send>>> {
+        let dir = self.location.column_dir(column_name);
+        let entries = self
+            .object_store
+            .list(&dir)
+            .await
+            .context(OpenDalSnafu)
+            .map_err(BoxedError::new)
+            .context(index_error::ExternalSnafu)?;
+        let mut readers = Vec::with_capacity(entries.len());
+
+        for entry in entries {
+            if entry.metadata().is_dir() {
+                warn!("Unexpected entry in index temp dir: {:?}", entry.path());
+                continue;
+            }
+
+            let reader = self
+                .object_store
+                .reader(entry.path())
+                .await
+                .context(OpenDalSnafu)
+                .map_err(BoxedError::new)
+                .context(index_error::ExternalSnafu)?;
+            readers.push(Box::new(reader) as _);
+        }
+
+        Ok(readers)
+    }
+}
+
+impl TempFileProvider {
+    pub fn new(location: IntermediateLocation, object_store: ObjectStore) -> Self {
+        Self {
+            location,
+            object_store,
+        }
+    }
+
+    pub async fn cleanup(&self) -> Result<()> {
+        self.object_store
+            .remove_all(self.location.root_dir())
+            .await
+            .context(OpenDalSnafu)
+    }
+}
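
Reviewer note (not part of the patch): a possible crate-internal test for the drop-time flush; IntCounter::get() is the standard prometheus accessor, and because the counters are process-global the assertion uses `>=` to tolerate concurrent increments:

    use crate::metrics::INDEX_CREATE_ROWS_TOTAL;
    use crate::sst::index::creator::statistics::Statistics;

    #[test]
    fn statistics_flush_on_drop() {
        let before = INDEX_CREATE_ROWS_TOTAL.get();
        {
            let mut stats = Statistics::default();
            stats.record_update().inc_row_count(10);
        } // dropping `stats` flushes the accumulated counts to the global metrics
        assert!(INDEX_CREATE_ROWS_TOTAL.get() >= before + 10);
    }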