Skip to content

Commit

Permalink
instrument object store to track its usage
Browse files Browse the repository at this point in the history
Signed-off-by: Zhenchi <[email protected]>
  • Loading branch information
zhongzc committed Dec 29, 2023
1 parent c5b3d69 commit 56d1621
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 53 deletions.
2 changes: 1 addition & 1 deletion src/mito2/src/sst/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
pub mod applier;
mod codec;
pub mod creator;
mod io_stats;
mod object_store;

const INDEX_BLOB_TYPE: &str = "greptime-inverted-index-v1";

Expand Down
19 changes: 7 additions & 12 deletions src/mito2/src/sst/index/applier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,31 +21,29 @@ use index::inverted_index::format::reader::InvertedIndexBlobReader;
use index::inverted_index::search::index_apply::{
IndexApplier, IndexNotFoundStrategy, SearchContext,
};
use object_store::ObjectStore;
use puffin::file_format::reader::{PuffinAsyncReader, PuffinFileReader};
use snafu::ResultExt;

use super::io_stats::InstrumentedAsyncRead;
use crate::error::{OpenDalSnafu, Result};
use crate::error::Result;
use crate::metrics::{
INDEX_APPLY_COST_TIME, INDEX_APPLY_MEMORY_USAGE, INDEX_PUFFIN_READ_BYTES_TOTAL,
};
use crate::sst::file::FileId;
use crate::sst::index::object_store::InstrumentedObjectStore;
use crate::sst::index::INDEX_BLOB_TYPE;
use crate::sst::location;

#[derive(Clone)]
pub struct SstIndexApplier {
region_dir: String,
object_store: ObjectStore,
object_store: InstrumentedObjectStore,

index_applier: Arc<dyn IndexApplier>,
}

impl SstIndexApplier {
pub fn new(
pub(crate) fn new(
region_dir: String,
object_store: ObjectStore,
object_store: InstrumentedObjectStore,
index_applier: Arc<dyn IndexApplier>,
) -> Self {
INDEX_APPLY_MEMORY_USAGE.add(index_applier.memory_usage() as i64);
Expand All @@ -64,11 +62,8 @@ impl SstIndexApplier {

let file_reader = self
.object_store
.reader(&file_path)
.await
.context(OpenDalSnafu)?;
let file_reader = InstrumentedAsyncRead::new(file_reader, &INDEX_PUFFIN_READ_BYTES_TOTAL);

.reader(&file_path, &INDEX_PUFFIN_READ_BYTES_TOTAL)
.await?;
let mut puffin_reader = PuffinFileReader::new(file_reader);

let file_meta = puffin_reader.metadata().await.unwrap();
Expand Down
3 changes: 2 additions & 1 deletion src/mito2/src/sst/index/applier/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use crate::error::{BuildIndexApplierSnafu, ColumnNotFoundSnafu, Result};
use crate::row_converter::SortField;
use crate::sst::index::applier::SstIndexApplier;
use crate::sst::index::codec::IndexValueCodec;
use crate::sst::index::object_store::InstrumentedObjectStore;

type ColumnName = String;

Expand Down Expand Up @@ -73,7 +74,7 @@ impl<'a> SstIndexApplierBuilder<'a> {
let applier = PredicatesIndexApplier::try_from(predicates);
Ok(Some(SstIndexApplier::new(
self.region_dir,
self.object_store,
InstrumentedObjectStore::new(self.object_store),
Arc::new(applier.context(BuildIndexApplierSnafu)?),
)))
}
Expand Down
14 changes: 7 additions & 7 deletions src/mito2/src/sst/index/creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@ use store_api::metadata::RegionMetadataRef;
use tokio::io::duplex;
use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};

use super::io_stats::InstrumentedAsyncWrite;
use crate::error::{OpenDalSnafu, PushIndexValueSnafu, Result};
use crate::error::{PushIndexValueSnafu, Result};
use crate::metrics::INDEX_PUFFIN_WRITE_BYTES_TOTAL;
use crate::read::Batch;
use crate::sst::file::FileId;
use crate::sst::index::codec::{IndexValueCodec, IndexValuesCodec};
use crate::sst::index::creator::statistics::Statistics;
use crate::sst::index::creator::temp_provider::TempFileProvider;
use crate::sst::index::object_store::InstrumentedObjectStore;
use crate::sst::index::{
INDEX_BLOB_TYPE, MIN_MEMORY_USAGE_THRESHOLD, PIPE_BUFFER_SIZE_FOR_SENDING_BLOB,
};
Expand All @@ -50,7 +50,7 @@ type RowCount = usize;
pub struct SstIndexCreator {
region_dir: String,
sst_file_id: FileId,
object_store: ObjectStore,
object_store: InstrumentedObjectStore,

codec: IndexValuesCodec,
index_creator: Box<dyn InvertedIndexCreator>,
Expand All @@ -70,6 +70,8 @@ impl SstIndexCreator {
memory_usage_threshold: Option<usize>,
row_group_size: NonZeroUsize,
) -> Self {
let object_store = InstrumentedObjectStore::new(object_store);

let temp_file_provider = Arc::new(TempFileProvider::new(
IntermediateLocation::new(&region_dir, &sst_file_id),
object_store.clone(),
Expand Down Expand Up @@ -158,10 +160,8 @@ impl SstIndexCreator {
let file_path = location::index_file_path(&self.region_dir, &self.sst_file_id);
let file_writer = self
.object_store
.writer(&file_path)
.await
.context(OpenDalSnafu)?;
let file_writer = InstrumentedAsyncWrite::new(file_writer, &INDEX_PUFFIN_WRITE_BYTES_TOTAL);
.writer(&file_path, &INDEX_PUFFIN_WRITE_BYTES_TOTAL)
.await?;
let mut puffin_writer = PuffinFileWriter::new(file_writer);

let (tx, rx) = duplex(PIPE_BUFFER_SIZE_FOR_SENDING_BLOB);
Expand Down
23 changes: 7 additions & 16 deletions src/mito2/src/sst/index/creator/temp_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,16 @@ use futures::{AsyncRead, AsyncWrite};
use index::inverted_index::create::sort::external_provider::ExternalTempFileProvider;
use index::inverted_index::error as index_error;
use index::inverted_index::error::Result as IndexResult;
use object_store::ObjectStore;
use snafu::ResultExt;

use crate::error::{OpenDalSnafu, Result};
use crate::error::Result;
use crate::metrics::{INDEX_INTERMEDIATE_READ_BYTES_TOTAL, INDEX_INTERMEDIATE_WRITE_BYTES_TOTAL};
use crate::sst::index::io_stats::{InstrumentedAsyncRead, InstrumentedAsyncWrite};
use crate::sst::index::object_store::InstrumentedObjectStore;
use crate::sst::location::IntermediateLocation;

pub(crate) struct TempFileProvider {
location: IntermediateLocation,
object_store: ObjectStore,
object_store: InstrumentedObjectStore,
}

#[async_trait]
Expand All @@ -42,12 +41,10 @@ impl ExternalTempFileProvider for TempFileProvider {
let path = self.location.file_path(column_name, file_id);
let writer = self
.object_store
.writer(&path)
.writer(&path, &INDEX_INTERMEDIATE_WRITE_BYTES_TOTAL)
.await
.context(OpenDalSnafu)
.map_err(BoxedError::new)
.context(index_error::ExternalSnafu)?;
let writer = InstrumentedAsyncWrite::new(writer, &INDEX_INTERMEDIATE_WRITE_BYTES_TOTAL);
Ok(Box::new(writer))
}

Expand All @@ -60,7 +57,6 @@ impl ExternalTempFileProvider for TempFileProvider {
.object_store
.list(&dir)
.await
.context(OpenDalSnafu)
.map_err(BoxedError::new)
.context(index_error::ExternalSnafu)?;
let mut readers = Vec::with_capacity(entries.len());
Expand All @@ -73,12 +69,10 @@ impl ExternalTempFileProvider for TempFileProvider {

let reader = self
.object_store
.reader(entry.path())
.reader(entry.path(), &INDEX_INTERMEDIATE_READ_BYTES_TOTAL)
.await
.context(OpenDalSnafu)
.map_err(BoxedError::new)
.context(index_error::ExternalSnafu)?;
let reader = InstrumentedAsyncRead::new(reader, &INDEX_INTERMEDIATE_READ_BYTES_TOTAL);
readers.push(Box::new(reader) as _);
}

Expand All @@ -87,17 +81,14 @@ impl ExternalTempFileProvider for TempFileProvider {
}

impl TempFileProvider {
pub fn new(location: IntermediateLocation, object_store: ObjectStore) -> Self {
pub fn new(location: IntermediateLocation, object_store: InstrumentedObjectStore) -> Self {
Self {
location,
object_store,
}
}

pub async fn cleanup(&self) -> Result<()> {
self.object_store
.remove_all(self.location.root_dir())
.await
.context(OpenDalSnafu)
self.object_store.remove_all(self.location.root_dir()).await
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,51 @@ use std::pin::Pin;
use std::task::{Context, Poll};

use futures::{AsyncRead, AsyncSeek, AsyncWrite};
use object_store::ObjectStore;
use pin_project::pin_project;
use prometheus::IntCounter;
use snafu::ResultExt;

struct BytesRecorder {
bytes: usize,
recorder: &'static IntCounter,
use crate::error::{OpenDalSnafu, Result};

#[derive(Clone)]
pub(crate) struct InstrumentedObjectStore {
object_store: ObjectStore,
}

impl BytesRecorder {
fn new(recorder: &'static IntCounter) -> Self {
Self { bytes: 0, recorder }
impl InstrumentedObjectStore {
pub(crate) fn new(object_store: ObjectStore) -> Self {
Self { object_store }
}

fn inc_by(&mut self, bytes: usize) {
self.bytes += bytes;
pub(crate) async fn reader(
&self,
path: &str,
recoder: &'static IntCounter,
) -> Result<InstrumentedAsyncRead<object_store::Reader>> {
let reader = self.object_store.reader(path).await.context(OpenDalSnafu)?;
Ok(InstrumentedAsyncRead::new(reader, recoder))
}
}

impl Drop for BytesRecorder {
fn drop(&mut self) {
if self.bytes > 0 {
self.recorder.inc_by(self.bytes as _);
}
pub(crate) async fn writer(
&self,
path: &str,
recoder: &'static IntCounter,
) -> Result<InstrumentedAsyncWrite<object_store::Writer>> {
let writer = self.object_store.writer(path).await.context(OpenDalSnafu)?;
Ok(InstrumentedAsyncWrite::new(writer, recoder))
}

pub(crate) async fn list(&self, path: &str) -> Result<Vec<object_store::Entry>> {
let list = self.object_store.list(path).await.context(OpenDalSnafu)?;
Ok(list)
}

pub(crate) async fn remove_all(&self, path: &str) -> Result<()> {
self.object_store
.remove_all(path)
.await
.context(OpenDalSnafu)
}
}

Expand All @@ -51,7 +73,7 @@ pub(crate) struct InstrumentedAsyncRead<R> {
}

impl<R> InstrumentedAsyncRead<R> {
pub(crate) fn new(inner: R, recorder: &'static IntCounter) -> Self {
fn new(inner: R, recorder: &'static IntCounter) -> Self {
Self {
inner,
recorder: BytesRecorder::new(recorder),
Expand Down Expand Up @@ -91,7 +113,7 @@ pub(crate) struct InstrumentedAsyncWrite<W> {
}

impl<W> InstrumentedAsyncWrite<W> {
pub(crate) fn new(inner: W, recorder: &'static IntCounter) -> Self {
fn new(inner: W, recorder: &'static IntCounter) -> Self {
Self {
inner,
recorder: BytesRecorder::new(recorder),
Expand Down Expand Up @@ -120,3 +142,26 @@ impl<W: AsyncWrite + Unpin + Send> AsyncWrite for InstrumentedAsyncWrite<W> {
self.project().inner.poll_close(cx)
}
}

struct BytesRecorder {
bytes: usize,
recorder: &'static IntCounter,
}

impl BytesRecorder {
fn new(recorder: &'static IntCounter) -> Self {
Self { bytes: 0, recorder }
}

fn inc_by(&mut self, bytes: usize) {
self.bytes += bytes;
}
}

impl Drop for BytesRecorder {
fn drop(&mut self) {
if self.bytes > 0 {
self.recorder.inc_by(self.bytes as _);
}
}
}

0 comments on commit 56d1621

Please sign in to comment.