From 56d1621905cb9496520c426048756a0acee65478 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Fri, 29 Dec 2023 10:16:15 +0000 Subject: [PATCH] instrument object store to track its usage Signed-off-by: Zhenchi --- src/mito2/src/sst/index.rs | 2 +- src/mito2/src/sst/index/applier.rs | 19 ++--- src/mito2/src/sst/index/applier/builder.rs | 3 +- src/mito2/src/sst/index/creator.rs | 14 ++-- .../src/sst/index/creator/temp_provider.rs | 23 ++---- .../index/{io_stats.rs => object_store.rs} | 77 +++++++++++++++---- 6 files changed, 85 insertions(+), 53 deletions(-) rename src/mito2/src/sst/index/{io_stats.rs => object_store.rs} (67%) diff --git a/src/mito2/src/sst/index.rs b/src/mito2/src/sst/index.rs index 5e3089f7bef1..a7e2159f2c83 100644 --- a/src/mito2/src/sst/index.rs +++ b/src/mito2/src/sst/index.rs @@ -15,7 +15,7 @@ pub mod applier; mod codec; pub mod creator; -mod io_stats; +mod object_store; const INDEX_BLOB_TYPE: &str = "greptime-inverted-index-v1"; diff --git a/src/mito2/src/sst/index/applier.rs b/src/mito2/src/sst/index/applier.rs index 191763425e4a..3cc04974defb 100644 --- a/src/mito2/src/sst/index/applier.rs +++ b/src/mito2/src/sst/index/applier.rs @@ -21,31 +21,29 @@ use index::inverted_index::format::reader::InvertedIndexBlobReader; use index::inverted_index::search::index_apply::{ IndexApplier, IndexNotFoundStrategy, SearchContext, }; -use object_store::ObjectStore; use puffin::file_format::reader::{PuffinAsyncReader, PuffinFileReader}; -use snafu::ResultExt; -use super::io_stats::InstrumentedAsyncRead; -use crate::error::{OpenDalSnafu, Result}; +use crate::error::Result; use crate::metrics::{ INDEX_APPLY_COST_TIME, INDEX_APPLY_MEMORY_USAGE, INDEX_PUFFIN_READ_BYTES_TOTAL, }; use crate::sst::file::FileId; +use crate::sst::index::object_store::InstrumentedObjectStore; use crate::sst::index::INDEX_BLOB_TYPE; use crate::sst::location; #[derive(Clone)] pub struct SstIndexApplier { region_dir: String, - object_store: ObjectStore, + object_store: 
InstrumentedObjectStore, index_applier: Arc, } impl SstIndexApplier { - pub fn new( + pub(crate) fn new( region_dir: String, - object_store: ObjectStore, + object_store: InstrumentedObjectStore, index_applier: Arc, ) -> Self { INDEX_APPLY_MEMORY_USAGE.add(index_applier.memory_usage() as i64); @@ -64,11 +62,8 @@ impl SstIndexApplier { let file_reader = self .object_store - .reader(&file_path) - .await - .context(OpenDalSnafu)?; - let file_reader = InstrumentedAsyncRead::new(file_reader, &INDEX_PUFFIN_READ_BYTES_TOTAL); - + .reader(&file_path, &INDEX_PUFFIN_READ_BYTES_TOTAL) + .await?; let mut puffin_reader = PuffinFileReader::new(file_reader); let file_meta = puffin_reader.metadata().await.unwrap(); diff --git a/src/mito2/src/sst/index/applier/builder.rs b/src/mito2/src/sst/index/applier/builder.rs index 959210d00ce0..1c435b46f38e 100644 --- a/src/mito2/src/sst/index/applier/builder.rs +++ b/src/mito2/src/sst/index/applier/builder.rs @@ -36,6 +36,7 @@ use crate::error::{BuildIndexApplierSnafu, ColumnNotFoundSnafu, Result}; use crate::row_converter::SortField; use crate::sst::index::applier::SstIndexApplier; use crate::sst::index::codec::IndexValueCodec; +use crate::sst::index::object_store::InstrumentedObjectStore; type ColumnName = String; @@ -73,7 +74,7 @@ impl<'a> SstIndexApplierBuilder<'a> { let applier = PredicatesIndexApplier::try_from(predicates); Ok(Some(SstIndexApplier::new( self.region_dir, - self.object_store, + InstrumentedObjectStore::new(self.object_store), Arc::new(applier.context(BuildIndexApplierSnafu)?), ))) } diff --git a/src/mito2/src/sst/index/creator.rs b/src/mito2/src/sst/index/creator.rs index 998c635e61ae..3fe0e09ec7ec 100644 --- a/src/mito2/src/sst/index/creator.rs +++ b/src/mito2/src/sst/index/creator.rs @@ -31,14 +31,14 @@ use store_api::metadata::RegionMetadataRef; use tokio::io::duplex; use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt}; -use super::io_stats::InstrumentedAsyncWrite; -use 
crate::error::{OpenDalSnafu, PushIndexValueSnafu, Result}; +use crate::error::{PushIndexValueSnafu, Result}; use crate::metrics::INDEX_PUFFIN_WRITE_BYTES_TOTAL; use crate::read::Batch; use crate::sst::file::FileId; use crate::sst::index::codec::{IndexValueCodec, IndexValuesCodec}; use crate::sst::index::creator::statistics::Statistics; use crate::sst::index::creator::temp_provider::TempFileProvider; +use crate::sst::index::object_store::InstrumentedObjectStore; use crate::sst::index::{ INDEX_BLOB_TYPE, MIN_MEMORY_USAGE_THRESHOLD, PIPE_BUFFER_SIZE_FOR_SENDING_BLOB, }; @@ -50,7 +50,7 @@ type RowCount = usize; pub struct SstIndexCreator { region_dir: String, sst_file_id: FileId, - object_store: ObjectStore, + object_store: InstrumentedObjectStore, codec: IndexValuesCodec, index_creator: Box, @@ -70,6 +70,8 @@ impl SstIndexCreator { memory_usage_threshold: Option, row_group_size: NonZeroUsize, ) -> Self { + let object_store = InstrumentedObjectStore::new(object_store); + let temp_file_provider = Arc::new(TempFileProvider::new( IntermediateLocation::new(&region_dir, &sst_file_id), object_store.clone(), @@ -158,10 +160,8 @@ impl SstIndexCreator { let file_path = location::index_file_path(&self.region_dir, &self.sst_file_id); let file_writer = self .object_store - .writer(&file_path) - .await - .context(OpenDalSnafu)?; - let file_writer = InstrumentedAsyncWrite::new(file_writer, &INDEX_PUFFIN_WRITE_BYTES_TOTAL); + .writer(&file_path, &INDEX_PUFFIN_WRITE_BYTES_TOTAL) + .await?; let mut puffin_writer = PuffinFileWriter::new(file_writer); let (tx, rx) = duplex(PIPE_BUFFER_SIZE_FOR_SENDING_BLOB); diff --git a/src/mito2/src/sst/index/creator/temp_provider.rs b/src/mito2/src/sst/index/creator/temp_provider.rs index 434fd8d4120f..b2b4d967a7df 100644 --- a/src/mito2/src/sst/index/creator/temp_provider.rs +++ b/src/mito2/src/sst/index/creator/temp_provider.rs @@ -19,17 +19,16 @@ use futures::{AsyncRead, AsyncWrite}; use 
index::inverted_index::create::sort::external_provider::ExternalTempFileProvider; use index::inverted_index::error as index_error; use index::inverted_index::error::Result as IndexResult; -use object_store::ObjectStore; use snafu::ResultExt; -use crate::error::{OpenDalSnafu, Result}; +use crate::error::Result; use crate::metrics::{INDEX_INTERMEDIATE_READ_BYTES_TOTAL, INDEX_INTERMEDIATE_WRITE_BYTES_TOTAL}; -use crate::sst::index::io_stats::{InstrumentedAsyncRead, InstrumentedAsyncWrite}; +use crate::sst::index::object_store::InstrumentedObjectStore; use crate::sst::location::IntermediateLocation; pub(crate) struct TempFileProvider { location: IntermediateLocation, - object_store: ObjectStore, + object_store: InstrumentedObjectStore, } #[async_trait] @@ -42,12 +41,10 @@ impl ExternalTempFileProvider for TempFileProvider { let path = self.location.file_path(column_name, file_id); let writer = self .object_store - .writer(&path) + .writer(&path, &INDEX_INTERMEDIATE_WRITE_BYTES_TOTAL) .await - .context(OpenDalSnafu) .map_err(BoxedError::new) .context(index_error::ExternalSnafu)?; - let writer = InstrumentedAsyncWrite::new(writer, &INDEX_INTERMEDIATE_WRITE_BYTES_TOTAL); Ok(Box::new(writer)) } @@ -60,7 +57,6 @@ impl ExternalTempFileProvider for TempFileProvider { .object_store .list(&dir) .await - .context(OpenDalSnafu) .map_err(BoxedError::new) .context(index_error::ExternalSnafu)?; let mut readers = Vec::with_capacity(entries.len()); @@ -73,12 +69,10 @@ impl ExternalTempFileProvider for TempFileProvider { let reader = self .object_store - .reader(entry.path()) + .reader(entry.path(), &INDEX_INTERMEDIATE_READ_BYTES_TOTAL) .await - .context(OpenDalSnafu) .map_err(BoxedError::new) .context(index_error::ExternalSnafu)?; - let reader = InstrumentedAsyncRead::new(reader, &INDEX_INTERMEDIATE_READ_BYTES_TOTAL); readers.push(Box::new(reader) as _); } @@ -87,7 +81,7 @@ impl ExternalTempFileProvider for TempFileProvider { } impl TempFileProvider { - pub fn new(location: 
IntermediateLocation, object_store: ObjectStore) -> Self { + pub fn new(location: IntermediateLocation, object_store: InstrumentedObjectStore) -> Self { Self { location, object_store, @@ -95,9 +89,6 @@ impl TempFileProvider { } pub async fn cleanup(&self) -> Result<()> { - self.object_store - .remove_all(self.location.root_dir()) - .await - .context(OpenDalSnafu) + self.object_store.remove_all(self.location.root_dir()).await } } diff --git a/src/mito2/src/sst/index/io_stats.rs b/src/mito2/src/sst/index/object_store.rs similarity index 67% rename from src/mito2/src/sst/index/io_stats.rs rename to src/mito2/src/sst/index/object_store.rs index b63166caca70..2316b51a20f0 100644 --- a/src/mito2/src/sst/index/io_stats.rs +++ b/src/mito2/src/sst/index/object_store.rs @@ -17,29 +17,51 @@ use std::pin::Pin; use std::task::{Context, Poll}; use futures::{AsyncRead, AsyncSeek, AsyncWrite}; +use object_store::ObjectStore; use pin_project::pin_project; use prometheus::IntCounter; +use snafu::ResultExt; -struct BytesRecorder { - bytes: usize, - recorder: &'static IntCounter, +use crate::error::{OpenDalSnafu, Result}; + +#[derive(Clone)] +pub(crate) struct InstrumentedObjectStore { + object_store: ObjectStore, } -impl BytesRecorder { - fn new(recorder: &'static IntCounter) -> Self { - Self { bytes: 0, recorder } +impl InstrumentedObjectStore { + pub(crate) fn new(object_store: ObjectStore) -> Self { + Self { object_store } } - fn inc_by(&mut self, bytes: usize) { - self.bytes += bytes; + pub(crate) async fn reader( + &self, + path: &str, + recoder: &'static IntCounter, + ) -> Result> { + let reader = self.object_store.reader(path).await.context(OpenDalSnafu)?; + Ok(InstrumentedAsyncRead::new(reader, recoder)) } -} -impl Drop for BytesRecorder { - fn drop(&mut self) { - if self.bytes > 0 { - self.recorder.inc_by(self.bytes as _); - } + pub(crate) async fn writer( + &self, + path: &str, + recoder: &'static IntCounter, + ) -> Result> { + let writer = 
self.object_store.writer(path).await.context(OpenDalSnafu)?; + Ok(InstrumentedAsyncWrite::new(writer, recoder)) + } + + pub(crate) async fn list(&self, path: &str) -> Result> { + let list = self.object_store.list(path).await.context(OpenDalSnafu)?; + Ok(list) + } + + pub(crate) async fn remove_all(&self, path: &str) -> Result<()> { + self.object_store + .remove_all(path) + .await + .context(OpenDalSnafu) } } @@ -51,7 +73,7 @@ pub(crate) struct InstrumentedAsyncRead { } impl InstrumentedAsyncRead { - pub(crate) fn new(inner: R, recorder: &'static IntCounter) -> Self { + fn new(inner: R, recorder: &'static IntCounter) -> Self { Self { inner, recorder: BytesRecorder::new(recorder), @@ -91,7 +113,7 @@ pub(crate) struct InstrumentedAsyncWrite { } impl InstrumentedAsyncWrite { - pub(crate) fn new(inner: W, recorder: &'static IntCounter) -> Self { + fn new(inner: W, recorder: &'static IntCounter) -> Self { Self { inner, recorder: BytesRecorder::new(recorder), @@ -120,3 +142,26 @@ impl AsyncWrite for InstrumentedAsyncWrite { self.project().inner.poll_close(cx) } } + +struct BytesRecorder { + bytes: usize, + recorder: &'static IntCounter, +} + +impl BytesRecorder { + fn new(recorder: &'static IntCounter) -> Self { + Self { bytes: 0, recorder } + } + + fn inc_by(&mut self, bytes: usize) { + self.bytes += bytes; + } +} + +impl Drop for BytesRecorder { + fn drop(&mut self) { + if self.bytes > 0 { + self.recorder.inc_by(self.bytes as _); + } + } +}