Skip to content

Commit

Permalink
track io bytes
Browse files Browse the repository at this point in the history
Signed-off-by: Zhenchi <[email protected]>
  • Loading branch information
zhongzc committed Dec 29, 2023
1 parent 315c77b commit 6f181c4
Show file tree
Hide file tree
Showing 13 changed files with 231 additions and 11 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions src/index/src/inverted_index/search/fst_apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,7 @@ pub trait FstApplier: Send + Sync {
///
/// Returns a `Vec<u64>`, with each u64 being a value from the FstMap.
fn apply(&self, fst: &FstMap) -> Vec<u64>;

/// Returns the memory usage of the FstApplier.
fn memory_usage(&self) -> usize;
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ pub struct IntersectionFstApplier {

/// A list of `Dfa` compiled from regular expression patterns.
dfas: Vec<DFA<Vec<u32>>>,

/// The memory usage of the `IntersectionFstApplier`.
memory_usage: usize,
}

impl FstApplier for IntersectionFstApplier {
Expand Down Expand Up @@ -68,6 +71,10 @@ impl FstApplier for IntersectionFstApplier {
}
values
}

fn memory_usage(&self) -> usize {
self.memory_usage
}
}

impl IntersectionFstApplier {
Expand All @@ -82,12 +89,17 @@ impl IntersectionFstApplier {
let mut dfas = Vec::with_capacity(predicates.len());
let mut ranges = Vec::with_capacity(predicates.len());

let mut memory_usage = 0;
for predicate in predicates {
match predicate {
Predicate::Range(range) => ranges.push(range.range),
Predicate::Range(range) => {
memory_usage += Self::range_memory_usage(&range.range);
ranges.push(range.range)
}
Predicate::RegexMatch(regex) => {
let dfa = DFA::new(&regex.pattern);
let dfa = dfa.map_err(Box::new).context(ParseDFASnafu)?;
memory_usage += dfa.memory_usage();
dfas.push(dfa);
}
// Rejection of `InList` predicates is enforced here.
Expand All @@ -97,7 +109,25 @@ impl IntersectionFstApplier {
}
}

Ok(Self { dfas, ranges })
Ok(Self {
dfas,
ranges,
memory_usage,
})
}

fn range_memory_usage(range: &Range) -> usize {
let mut memory_usage = std::mem::size_of::<Range>();

if let Some(lower) = &range.lower {
memory_usage += lower.value.len();
}

if let Some(upper) = &range.upper {
memory_usage += upper.value.len();
}

memory_usage
}
}

Expand Down
15 changes: 14 additions & 1 deletion src/index/src/inverted_index/search/fst_apply/keys_apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,19 @@ use crate::inverted_index::{Bytes, FstMap};
pub struct KeysFstApplier {
/// A list of keys to be fetched directly from the FstMap.
keys: Vec<Bytes>,

/// The memory usage of the applier.
memory_usage: usize,
}

impl FstApplier for KeysFstApplier {
fn apply(&self, fst: &FstMap) -> Vec<u64> {
self.keys.iter().filter_map(|k| fst.get(k)).collect()
}

fn memory_usage(&self) -> usize {
self.memory_usage
}
}

impl KeysFstApplier {
Expand All @@ -56,6 +63,7 @@ impl KeysFstApplier {
let regex_matched_keys = Self::filter_by_regexes(range_matched_keys, regexes)?;

Ok(Self {
memory_usage: regex_matched_keys.iter().map(|k| k.len()).sum(),
keys: regex_matched_keys,
})
}
Expand Down Expand Up @@ -192,6 +200,7 @@ mod tests {
let test_fst = create_fst_map(&[(b"foo", 1), (b"bar", 2), (b"baz", 3)]);
let applier = KeysFstApplier {
keys: vec![b("foo"), b("baz")],
memory_usage: 6,
};

let results = applier.apply(&test_fst);
Expand All @@ -201,7 +210,10 @@ mod tests {
#[test]
fn test_keys_fst_applier_with_empty_keys() {
let test_fst = create_fst_map(&[(b"foo", 1), (b"bar", 2), (b"baz", 3)]);
let applier = KeysFstApplier { keys: vec![] };
let applier = KeysFstApplier {
keys: vec![],
memory_usage: 0,
};

let results = applier.apply(&test_fst);
assert!(results.is_empty());
Expand All @@ -212,6 +224,7 @@ mod tests {
let test_fst = create_fst_map(&[(b"foo", 1), (b"bar", 2), (b"baz", 3)]);
let applier = KeysFstApplier {
keys: vec![b("qux"), b("quux")],
memory_usage: 7,
};

let results = applier.apply(&test_fst);
Expand Down
3 changes: 3 additions & 0 deletions src/index/src/inverted_index/search/index_apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ pub trait IndexApplier: Send + Sync {
context: SearchContext,
reader: &mut dyn InvertedIndexReader,
) -> Result<BTreeSet<usize>>;

/// Returns the memory usage of the applier.
fn memory_usage(&self) -> usize;
}

/// A context for searching the inverted index.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,14 @@ impl IndexApplier for PredicatesIndexApplier {

Ok(bitmap.iter_ones().collect())
}

/// Returns the memory usage of the applier.
fn memory_usage(&self) -> usize {
self.fst_appliers
.iter()
.map(|(n, fst_applier)| n.as_bytes().len() + fst_applier.memory_usage())
.sum()
}
}

impl PredicatesIndexApplier {
Expand Down
1 change: 1 addition & 0 deletions src/mito2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ common-procedure.workspace = true
common-query.workspace = true
common-recordbatch.workspace = true
common-runtime.workspace = true
pin-project.workspace = true
common-telemetry.workspace = true
common-test-util = { workspace = true, optional = true }
common-time.workspace = true
Expand Down
28 changes: 23 additions & 5 deletions src/mito2/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ pub const STAGE_LABEL: &str = "stage";
pub const TYPE_LABEL: &str = "type";
/// Reason to flush.
pub const FLUSH_REASON: &str = "reason";
/// File type label.
pub const FILE_TYPE_LABEL: &str = "file_type";

lazy_static! {
/// Global write buffer size in bytes.
Expand Down Expand Up @@ -152,6 +154,12 @@ lazy_static! {
"index apply cost time",
)
.unwrap();
/// Gauge of index apply memory usage.
pub static ref INDEX_APPLY_MEMORY_USAGE: IntGauge = register_int_gauge!(
"index_apply_memory_usage",
"index apply memory usage",
)
.unwrap();
/// Timer of index creation.
pub static ref INDEX_CREATE_COST_TIME: HistogramVec = register_histogram_vec!(
"index_create_cost_time",
Expand All @@ -161,16 +169,26 @@ lazy_static! {
.unwrap();
/// Counter of rows indexed.
pub static ref INDEX_CREATE_ROWS_TOTAL: IntCounter = register_int_counter!(
"index_rows_total",
"index rows total",
"index_create_rows_total",
"index create rows total",
)
.unwrap();
/// Counter of created index bytes.
pub static ref INDEX_CREATE_BYTES_TOTAL: IntCounter = register_int_counter!(
"index_bytes_total",
"index bytes total",
"index_create_bytes_total",
"index create bytes total",
)
.unwrap();

/// Counter of r/w bytes on index related IO operations.
pub static ref INDEX_IO_BYTES_TOTAL: IntCounterVec = register_int_counter_vec!(
"index_io_bytes_total",
"index io bytes total",
&[TYPE_LABEL, FILE_TYPE_LABEL]
)
.unwrap();
pub static ref INDEX_INTERMEDIATE_READ_BYTES_TOTAL: IntCounter = INDEX_IO_BYTES_TOTAL.with_label_values(&["read", "intermediate"]);
pub static ref INDEX_INTERMEDIATE_WRITE_BYTES_TOTAL: IntCounter = INDEX_IO_BYTES_TOTAL.with_label_values(&["write", "intermediate"]);
pub static ref INDEX_PUFFIN_READ_BYTES_TOTAL: IntCounter = INDEX_IO_BYTES_TOTAL.with_label_values(&["read", "puffin"]);
pub static ref INDEX_PUFFIN_WRITE_BYTES_TOTAL: IntCounter = INDEX_IO_BYTES_TOTAL.with_label_values(&["write", "puffin"]);
// ------- End of index metrics.
}
1 change: 1 addition & 0 deletions src/mito2/src/sst/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
pub mod applier;
mod codec;
pub mod creator;
mod io_stats;

const INDEX_BLOB_TYPE: &str = "greptime-inverted-index-v1";

Expand Down
15 changes: 14 additions & 1 deletion src/mito2/src/sst/index/applier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,11 @@ use object_store::ObjectStore;
use puffin::file_format::reader::{PuffinAsyncReader, PuffinFileReader};
use snafu::ResultExt;

use super::io_stats::InstrumentedAsyncRead;
use crate::error::{OpenDalSnafu, Result};
use crate::metrics::INDEX_APPLY_COST_TIME;
use crate::metrics::{
INDEX_APPLY_COST_TIME, INDEX_APPLY_MEMORY_USAGE, INDEX_PUFFIN_READ_BYTES_TOTAL,
};
use crate::sst::file::FileId;
use crate::sst::index::INDEX_BLOB_TYPE;
use crate::sst::location;
Expand All @@ -45,6 +48,8 @@ impl SstIndexApplier {
object_store: ObjectStore,
index_applier: Arc<dyn IndexApplier>,
) -> Self {
INDEX_APPLY_MEMORY_USAGE.add(index_applier.memory_usage() as i64);

Self {
region_dir,
object_store,
Expand All @@ -62,6 +67,8 @@ impl SstIndexApplier {
.reader(&file_path)
.await
.context(OpenDalSnafu)?;
let file_reader = InstrumentedAsyncRead::new(file_reader, &INDEX_PUFFIN_READ_BYTES_TOTAL);

let mut puffin_reader = PuffinFileReader::new(file_reader);

let file_meta = puffin_reader.metadata().await.unwrap();
Expand All @@ -86,3 +93,9 @@ impl SstIndexApplier {
Ok(res)
}
}

impl Drop for SstIndexApplier {
fn drop(&mut self) {
INDEX_APPLY_MEMORY_USAGE.sub(self.index_applier.memory_usage() as i64);
}
}
7 changes: 5 additions & 2 deletions src/mito2/src/sst/index/creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ use store_api::metadata::RegionMetadataRef;
use tokio::io::duplex;
use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};

use super::io_stats::InstrumentedAsyncWrite;
use crate::error::{OpenDalSnafu, PushIndexValueSnafu, Result};
use crate::metrics::INDEX_PUFFIN_WRITE_BYTES_TOTAL;
use crate::read::Batch;
use crate::sst::file::FileId;
use crate::sst::index::codec::{IndexValueCodec, IndexValuesCodec};
Expand Down Expand Up @@ -154,12 +156,13 @@ impl SstIndexCreator {
let mut guard = self.stats.record_finish();

let file_path = location::index_file_path(&self.region_dir, &self.sst_file_id);
let writer = self
let file_writer = self
.object_store
.writer(&file_path)
.await
.context(OpenDalSnafu)?;
let mut puffin_writer = PuffinFileWriter::new(writer);
let file_writer = InstrumentedAsyncWrite::new(file_writer, &INDEX_PUFFIN_WRITE_BYTES_TOTAL);
let mut puffin_writer = PuffinFileWriter::new(file_writer);

let (tx, rx) = duplex(PIPE_BUFFER_SIZE_FOR_SENDING_BLOB);

Expand Down
4 changes: 4 additions & 0 deletions src/mito2/src/sst/index/creator/temp_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ use object_store::ObjectStore;
use snafu::ResultExt;

use crate::error::{OpenDalSnafu, Result};
use crate::metrics::{INDEX_INTERMEDIATE_READ_BYTES_TOTAL, INDEX_INTERMEDIATE_WRITE_BYTES_TOTAL};
use crate::sst::index::io_stats::{InstrumentedAsyncRead, InstrumentedAsyncWrite};
use crate::sst::location::IntermediateLocation;

pub(crate) struct TempFileProvider {
Expand All @@ -45,6 +47,7 @@ impl ExternalTempFileProvider for TempFileProvider {
.context(OpenDalSnafu)
.map_err(BoxedError::new)
.context(index_error::ExternalSnafu)?;
let writer = InstrumentedAsyncWrite::new(writer, &INDEX_INTERMEDIATE_WRITE_BYTES_TOTAL);
Ok(Box::new(writer))
}

Expand Down Expand Up @@ -75,6 +78,7 @@ impl ExternalTempFileProvider for TempFileProvider {
.context(OpenDalSnafu)
.map_err(BoxedError::new)
.context(index_error::ExternalSnafu)?;
let reader = InstrumentedAsyncRead::new(reader, &INDEX_INTERMEDIATE_READ_BYTES_TOTAL);
readers.push(Box::new(reader) as _);
}

Expand Down
Loading

0 comments on commit 6f181c4

Please sign in to comment.