Skip to content

Commit

Permalink
store is nice
Browse files Browse the repository at this point in the history
Signed-off-by: Zhenchi <[email protected]>
  • Loading branch information
zhongzc committed Dec 29, 2023
1 parent 56d1621 commit 2f9319a
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 32 deletions.
2 changes: 1 addition & 1 deletion src/mito2/src/sst/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
pub mod applier;
mod codec;
pub mod creator;
mod object_store;
mod store;

const INDEX_BLOB_TYPE: &str = "greptime-inverted-index-v1";

Expand Down
10 changes: 5 additions & 5 deletions src/mito2/src/sst/index/applier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,29 +28,29 @@ use crate::metrics::{
INDEX_APPLY_COST_TIME, INDEX_APPLY_MEMORY_USAGE, INDEX_PUFFIN_READ_BYTES_TOTAL,
};
use crate::sst::file::FileId;
use crate::sst::index::object_store::InstrumentedObjectStore;
use crate::sst::index::store::InstrumentedStore;
use crate::sst::index::INDEX_BLOB_TYPE;
use crate::sst::location;

#[derive(Clone)]
pub struct SstIndexApplier {
region_dir: String,
object_store: InstrumentedObjectStore,
store: InstrumentedStore,

index_applier: Arc<dyn IndexApplier>,
}

impl SstIndexApplier {
pub(crate) fn new(
region_dir: String,
object_store: InstrumentedObjectStore,
store: InstrumentedStore,
index_applier: Arc<dyn IndexApplier>,
) -> Self {
INDEX_APPLY_MEMORY_USAGE.add(index_applier.memory_usage() as i64);

Self {
region_dir,
object_store,
store,
index_applier,
}
}
Expand All @@ -61,7 +61,7 @@ impl SstIndexApplier {
let file_path = location::index_file_path(&self.region_dir, &file_id);

let file_reader = self
.object_store
.store
.reader(&file_path, &INDEX_PUFFIN_READ_BYTES_TOTAL)
.await?;
let mut puffin_reader = PuffinFileReader::new(file_reader);
Expand Down
4 changes: 2 additions & 2 deletions src/mito2/src/sst/index/applier/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use crate::error::{BuildIndexApplierSnafu, ColumnNotFoundSnafu, Result};
use crate::row_converter::SortField;
use crate::sst::index::applier::SstIndexApplier;
use crate::sst::index::codec::IndexValueCodec;
use crate::sst::index::object_store::InstrumentedObjectStore;
use crate::sst::index::store::InstrumentedStore;

type ColumnName = String;

Expand Down Expand Up @@ -74,7 +74,7 @@ impl<'a> SstIndexApplierBuilder<'a> {
let applier = PredicatesIndexApplier::try_from(predicates);
Ok(Some(SstIndexApplier::new(
self.region_dir,
InstrumentedObjectStore::new(self.object_store),
InstrumentedStore::new(self.object_store),
Arc::new(applier.context(BuildIndexApplierSnafu)?),
)))
}
Expand Down
12 changes: 6 additions & 6 deletions src/mito2/src/sst/index/creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use crate::sst::file::FileId;
use crate::sst::index::codec::{IndexValueCodec, IndexValuesCodec};
use crate::sst::index::creator::statistics::Statistics;
use crate::sst::index::creator::temp_provider::TempFileProvider;
use crate::sst::index::object_store::InstrumentedObjectStore;
use crate::sst::index::store::InstrumentedStore;
use crate::sst::index::{
INDEX_BLOB_TYPE, MIN_MEMORY_USAGE_THRESHOLD, PIPE_BUFFER_SIZE_FOR_SENDING_BLOB,
};
Expand All @@ -50,7 +50,7 @@ type RowCount = usize;
pub struct SstIndexCreator {
region_dir: String,
sst_file_id: FileId,
object_store: InstrumentedObjectStore,
store: InstrumentedStore,

codec: IndexValuesCodec,
index_creator: Box<dyn InvertedIndexCreator>,
Expand All @@ -70,11 +70,11 @@ impl SstIndexCreator {
memory_usage_threshold: Option<usize>,
row_group_size: NonZeroUsize,
) -> Self {
let object_store = InstrumentedObjectStore::new(object_store);
let store = InstrumentedStore::new(object_store);

let temp_file_provider = Arc::new(TempFileProvider::new(
IntermediateLocation::new(&region_dir, &sst_file_id),
object_store.clone(),
store.clone(),
));
let memory_usage_threshold = memory_usage_threshold.map(|threshold| {
(threshold / metadata.primary_key.len()).max(MIN_MEMORY_USAGE_THRESHOLD)
Expand All @@ -87,7 +87,7 @@ impl SstIndexCreator {
Self {
region_dir,
sst_file_id,
object_store,
store,
codec,
index_creator,
temp_file_provider,
Expand Down Expand Up @@ -159,7 +159,7 @@ impl SstIndexCreator {

let file_path = location::index_file_path(&self.region_dir, &self.sst_file_id);
let file_writer = self
.object_store
.store
.writer(&file_path, &INDEX_PUFFIN_WRITE_BYTES_TOTAL)
.await?;
let mut puffin_writer = PuffinFileWriter::new(file_writer);
Expand Down
19 changes: 8 additions & 11 deletions src/mito2/src/sst/index/creator/temp_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ use snafu::ResultExt;

use crate::error::Result;
use crate::metrics::{INDEX_INTERMEDIATE_READ_BYTES_TOTAL, INDEX_INTERMEDIATE_WRITE_BYTES_TOTAL};
use crate::sst::index::object_store::InstrumentedObjectStore;
use crate::sst::index::store::InstrumentedStore;
use crate::sst::location::IntermediateLocation;

pub(crate) struct TempFileProvider {
location: IntermediateLocation,
object_store: InstrumentedObjectStore,
store: InstrumentedStore,
}

#[async_trait]
Expand All @@ -40,7 +40,7 @@ impl ExternalTempFileProvider for TempFileProvider {
) -> IndexResult<Box<dyn AsyncWrite + Unpin + Send>> {
let path = self.location.file_path(column_name, file_id);
let writer = self
.object_store
.store
.writer(&path, &INDEX_INTERMEDIATE_WRITE_BYTES_TOTAL)
.await
.map_err(BoxedError::new)
Expand All @@ -54,7 +54,7 @@ impl ExternalTempFileProvider for TempFileProvider {
) -> IndexResult<Vec<Box<dyn AsyncRead + Unpin + Send>>> {
let dir = self.location.column_dir(column_name);
let entries = self
.object_store
.store
.list(&dir)
.await
.map_err(BoxedError::new)
Expand All @@ -68,7 +68,7 @@ impl ExternalTempFileProvider for TempFileProvider {
}

let reader = self
.object_store
.store
.reader(entry.path(), &INDEX_INTERMEDIATE_READ_BYTES_TOTAL)
.await
.map_err(BoxedError::new)
Expand All @@ -81,14 +81,11 @@ impl ExternalTempFileProvider for TempFileProvider {
}

impl TempFileProvider {
pub fn new(location: IntermediateLocation, object_store: InstrumentedObjectStore) -> Self {
Self {
location,
object_store,
}
pub fn new(location: IntermediateLocation, store: InstrumentedStore) -> Self {
Self { location, store }
}

pub async fn cleanup(&self) -> Result<()> {
self.object_store.remove_all(self.location.root_dir()).await
self.store.remove_all(self.location.root_dir()).await
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,16 @@ use snafu::ResultExt;
use crate::error::{OpenDalSnafu, Result};

#[derive(Clone)]
pub(crate) struct InstrumentedObjectStore {
pub(crate) struct InstrumentedStore {
object_store: ObjectStore,
}

impl InstrumentedObjectStore {
pub(crate) fn new(object_store: ObjectStore) -> Self {
impl InstrumentedStore {
pub fn new(object_store: ObjectStore) -> Self {
Self { object_store }
}

pub(crate) async fn reader(
pub async fn reader(
&self,
path: &str,
recoder: &'static IntCounter,
Expand All @@ -43,7 +43,7 @@ impl InstrumentedObjectStore {
Ok(InstrumentedAsyncRead::new(reader, recoder))
}

pub(crate) async fn writer(
pub async fn writer(
&self,
path: &str,
recoder: &'static IntCounter,
Expand All @@ -52,12 +52,12 @@ impl InstrumentedObjectStore {
Ok(InstrumentedAsyncWrite::new(writer, recoder))
}

pub(crate) async fn list(&self, path: &str) -> Result<Vec<object_store::Entry>> {
pub async fn list(&self, path: &str) -> Result<Vec<object_store::Entry>> {
let list = self.object_store.list(path).await.context(OpenDalSnafu)?;
Ok(list)
}

pub(crate) async fn remove_all(&self, path: &str) -> Result<()> {
pub async fn remove_all(&self, path: &str) -> Result<()> {
self.object_store
.remove_all(path)
.await
Expand Down

0 comments on commit 2f9319a

Please sign in to comment.