From 6f181c4e5dcb4f18aab3414d7e537db1e7067ebe Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Fri, 29 Dec 2023 09:52:27 +0000 Subject: [PATCH] track io bytes Signed-off-by: Zhenchi --- Cargo.lock | 1 + .../src/inverted_index/search/fst_apply.rs | 3 + .../search/fst_apply/intersection_apply.rs | 34 ++++- .../search/fst_apply/keys_apply.rs | 15 ++- .../src/inverted_index/search/index_apply.rs | 3 + .../search/index_apply/predicates_apply.rs | 8 ++ src/mito2/Cargo.toml | 1 + src/mito2/src/metrics.rs | 28 +++- src/mito2/src/sst/index.rs | 1 + src/mito2/src/sst/index/applier.rs | 15 ++- src/mito2/src/sst/index/creator.rs | 7 +- .../src/sst/index/creator/temp_provider.rs | 4 + src/mito2/src/sst/index/io_stats.rs | 122 ++++++++++++++++++ 13 files changed, 231 insertions(+), 11 deletions(-) create mode 100644 src/mito2/src/sst/index/io_stats.rs diff --git a/Cargo.lock b/Cargo.lock index 3fff439cb99e..3b70b7853da9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4986,6 +4986,7 @@ dependencies = [ "object-store", "parquet", "paste", + "pin-project", "prometheus", "prost 0.12.3", "puffin", diff --git a/src/index/src/inverted_index/search/fst_apply.rs b/src/index/src/inverted_index/search/fst_apply.rs index 9f54d0d88918..b496539d866a 100644 --- a/src/index/src/inverted_index/search/fst_apply.rs +++ b/src/index/src/inverted_index/search/fst_apply.rs @@ -30,4 +30,7 @@ pub trait FstApplier: Send + Sync { /// /// Returns a `Vec`, with each u64 being a value from the FstMap. fn apply(&self, fst: &FstMap) -> Vec; + + /// Returns the memory usage of the FstApplier. + fn memory_usage(&self) -> usize; } diff --git a/src/index/src/inverted_index/search/fst_apply/intersection_apply.rs b/src/index/src/inverted_index/search/fst_apply/intersection_apply.rs index a5783fd97526..6589c299a912 100644 --- a/src/index/src/inverted_index/search/fst_apply/intersection_apply.rs +++ b/src/index/src/inverted_index/search/fst_apply/intersection_apply.rs @@ -31,6 +31,9 @@ pub struct IntersectionFstApplier { /// A list of `Dfa` compiled from regular expression patterns. dfas: Vec>>, + + /// The memory usage of the `IntersectionFstApplier`. + memory_usage: usize, } impl FstApplier for IntersectionFstApplier { @@ -68,6 +71,10 @@ impl FstApplier for IntersectionFstApplier { } values } + + fn memory_usage(&self) -> usize { + self.memory_usage + } } impl IntersectionFstApplier { @@ -82,12 +89,17 @@ impl IntersectionFstApplier { let mut dfas = Vec::with_capacity(predicates.len()); let mut ranges = Vec::with_capacity(predicates.len()); + let mut memory_usage = 0; for predicate in predicates { match predicate { - Predicate::Range(range) => ranges.push(range.range), + Predicate::Range(range) => { + memory_usage += Self::range_memory_usage(&range.range); + ranges.push(range.range) + } Predicate::RegexMatch(regex) => { let dfa = DFA::new(®ex.pattern); let dfa = dfa.map_err(Box::new).context(ParseDFASnafu)?; + memory_usage += dfa.memory_usage(); dfas.push(dfa); } // Rejection of `InList` predicates is enforced here. @@ -97,7 +109,25 @@ impl IntersectionFstApplier { } } - Ok(Self { dfas, ranges }) + Ok(Self { + dfas, + ranges, + memory_usage, + }) + } + + fn range_memory_usage(range: &Range) -> usize { + let mut memory_usage = std::mem::size_of::(); + + if let Some(lower) = &range.lower { + memory_usage += lower.value.len(); + } + + if let Some(upper) = &range.upper { + memory_usage += upper.value.len(); + } + + memory_usage } } diff --git a/src/index/src/inverted_index/search/fst_apply/keys_apply.rs b/src/index/src/inverted_index/search/fst_apply/keys_apply.rs index 4ec5710a3435..79b79db1ced0 100644 --- a/src/index/src/inverted_index/search/fst_apply/keys_apply.rs +++ b/src/index/src/inverted_index/search/fst_apply/keys_apply.rs @@ -29,12 +29,19 @@ use crate::inverted_index::{Bytes, FstMap}; pub struct KeysFstApplier { /// A list of keys to be fetched directly from the FstMap. keys: Vec, + + /// The memory usage of the applier. + memory_usage: usize, } impl FstApplier for KeysFstApplier { fn apply(&self, fst: &FstMap) -> Vec { self.keys.iter().filter_map(|k| fst.get(k)).collect() } + + fn memory_usage(&self) -> usize { + self.memory_usage + } } impl KeysFstApplier { @@ -56,6 +63,7 @@ impl KeysFstApplier { let regex_matched_keys = Self::filter_by_regexes(range_matched_keys, regexes)?; Ok(Self { + memory_usage: regex_matched_keys.iter().map(|k| k.len()).sum(), keys: regex_matched_keys, }) } @@ -192,6 +200,7 @@ mod tests { let test_fst = create_fst_map(&[(b"foo", 1), (b"bar", 2), (b"baz", 3)]); let applier = KeysFstApplier { keys: vec![b("foo"), b("baz")], + memory_usage: 6, }; let results = applier.apply(&test_fst); @@ -201,7 +210,10 @@ mod tests { #[test] fn test_keys_fst_applier_with_empty_keys() { let test_fst = create_fst_map(&[(b"foo", 1), (b"bar", 2), (b"baz", 3)]); - let applier = KeysFstApplier { keys: vec![] }; + let applier = KeysFstApplier { + keys: vec![], + memory_usage: 0, + }; let results = applier.apply(&test_fst); assert!(results.is_empty()); @@ -212,6 +224,7 @@ mod tests { let test_fst = create_fst_map(&[(b"foo", 1), (b"bar", 2), (b"baz", 3)]); let applier = KeysFstApplier { keys: vec![b("qux"), b("quux")], + memory_usage: 7, }; let results = applier.apply(&test_fst); diff --git a/src/index/src/inverted_index/search/index_apply.rs b/src/index/src/inverted_index/search/index_apply.rs index 6701f03cac3d..8ad4db10904d 100644 --- a/src/index/src/inverted_index/search/index_apply.rs +++ b/src/index/src/inverted_index/search/index_apply.rs @@ -35,6 +35,9 @@ pub trait IndexApplier: Send + Sync { context: SearchContext, reader: &mut dyn InvertedIndexReader, ) -> Result>; + + /// Returns the memory usage of the applier. + fn memory_usage(&self) -> usize; } /// A context for searching the inverted index. diff --git a/src/index/src/inverted_index/search/index_apply/predicates_apply.rs b/src/index/src/inverted_index/search/index_apply/predicates_apply.rs index 2331d8af6ffd..c8a06965e241 100644 --- a/src/index/src/inverted_index/search/index_apply/predicates_apply.rs +++ b/src/index/src/inverted_index/search/index_apply/predicates_apply.rs @@ -82,6 +82,14 @@ impl IndexApplier for PredicatesIndexApplier { Ok(bitmap.iter_ones().collect()) } + + /// Returns the memory usage of the applier. + fn memory_usage(&self) -> usize { + self.fst_appliers + .iter() + .map(|(n, fst_applier)| n.as_bytes().len() + fst_applier.memory_usage()) + .sum() + } } impl PredicatesIndexApplier { diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml index 0fbc469fdd22..b977dc40ec3e 100644 --- a/src/mito2/Cargo.toml +++ b/src/mito2/Cargo.toml @@ -29,6 +29,7 @@ common-procedure.workspace = true common-query.workspace = true common-recordbatch.workspace = true common-runtime.workspace = true +pin-project.workspace = true common-telemetry.workspace = true common-test-util = { workspace = true, optional = true } common-time.workspace = true diff --git a/src/mito2/src/metrics.rs b/src/mito2/src/metrics.rs index dae565fb3baf..5f4038d93018 100644 --- a/src/mito2/src/metrics.rs +++ b/src/mito2/src/metrics.rs @@ -21,6 +21,8 @@ pub const STAGE_LABEL: &str = "stage"; pub const TYPE_LABEL: &str = "type"; /// Reason to flush. pub const FLUSH_REASON: &str = "reason"; +/// File type label. +pub const FILE_TYPE_LABEL: &str = "file_type"; lazy_static! { /// Global write buffer size in bytes. @@ -152,6 +154,12 @@ lazy_static! { "index apply cost time", ) .unwrap(); + /// Gauge of index apply memory usage. + pub static ref INDEX_APPLY_MEMORY_USAGE: IntGauge = register_int_gauge!( + "index_apply_memory_usage", + "index apply memory usage", + ) + .unwrap(); /// Timer of index creation. pub static ref INDEX_CREATE_COST_TIME: HistogramVec = register_histogram_vec!( "index_create_cost_time", @@ -161,16 +169,26 @@ lazy_static! { .unwrap(); /// Counter of rows indexed. pub static ref INDEX_CREATE_ROWS_TOTAL: IntCounter = register_int_counter!( - "index_rows_total", - "index rows total", + "index_create_rows_total", + "index create rows total", ) .unwrap(); /// Counter of created index bytes. pub static ref INDEX_CREATE_BYTES_TOTAL: IntCounter = register_int_counter!( - "index_bytes_total", - "index bytes total", + "index_create_bytes_total", + "index create bytes total", ) .unwrap(); - + /// Counter of r/w bytes on index related IO operations. + pub static ref INDEX_IO_BYTES_TOTAL: IntCounterVec = register_int_counter_vec!( + "index_io_bytes_total", + "index io bytes total", + &[TYPE_LABEL, FILE_TYPE_LABEL] + ) + .unwrap(); + pub static ref INDEX_INTERMEDIATE_READ_BYTES_TOTAL: IntCounter = INDEX_IO_BYTES_TOTAL.with_label_values(&["read", "intermediate"]); + pub static ref INDEX_INTERMEDIATE_WRITE_BYTES_TOTAL: IntCounter = INDEX_IO_BYTES_TOTAL.with_label_values(&["write", "intermediate"]); + pub static ref INDEX_PUFFIN_READ_BYTES_TOTAL: IntCounter = INDEX_IO_BYTES_TOTAL.with_label_values(&["read", "puffin"]); + pub static ref INDEX_PUFFIN_WRITE_BYTES_TOTAL: IntCounter = INDEX_IO_BYTES_TOTAL.with_label_values(&["write", "puffin"]); // ------- End of index metrics. } diff --git a/src/mito2/src/sst/index.rs b/src/mito2/src/sst/index.rs index 1a0365309b85..5e3089f7bef1 100644 --- a/src/mito2/src/sst/index.rs +++ b/src/mito2/src/sst/index.rs @@ -15,6 +15,7 @@ pub mod applier; mod codec; pub mod creator; +mod io_stats; const INDEX_BLOB_TYPE: &str = "greptime-inverted-index-v1"; diff --git a/src/mito2/src/sst/index/applier.rs b/src/mito2/src/sst/index/applier.rs index 041ad3086adf..191763425e4a 100644 --- a/src/mito2/src/sst/index/applier.rs +++ b/src/mito2/src/sst/index/applier.rs @@ -25,8 +25,11 @@ use object_store::ObjectStore; use puffin::file_format::reader::{PuffinAsyncReader, PuffinFileReader}; use snafu::ResultExt; +use super::io_stats::InstrumentedAsyncRead; use crate::error::{OpenDalSnafu, Result}; -use crate::metrics::INDEX_APPLY_COST_TIME; +use crate::metrics::{ + INDEX_APPLY_COST_TIME, INDEX_APPLY_MEMORY_USAGE, INDEX_PUFFIN_READ_BYTES_TOTAL, +}; use crate::sst::file::FileId; use crate::sst::index::INDEX_BLOB_TYPE; use crate::sst::location; @@ -45,6 +48,8 @@ impl SstIndexApplier { object_store: ObjectStore, index_applier: Arc, ) -> Self { + INDEX_APPLY_MEMORY_USAGE.add(index_applier.memory_usage() as i64); + Self { region_dir, object_store, @@ -62,6 +67,8 @@ impl SstIndexApplier { .reader(&file_path) .await .context(OpenDalSnafu)?; + let file_reader = InstrumentedAsyncRead::new(file_reader, &INDEX_PUFFIN_READ_BYTES_TOTAL); + let mut puffin_reader = PuffinFileReader::new(file_reader); let file_meta = puffin_reader.metadata().await.unwrap(); @@ -86,3 +93,9 @@ impl SstIndexApplier { Ok(res) } } + +impl Drop for SstIndexApplier { + fn drop(&mut self) { + INDEX_APPLY_MEMORY_USAGE.sub(self.index_applier.memory_usage() as i64); + } +} diff --git a/src/mito2/src/sst/index/creator.rs b/src/mito2/src/sst/index/creator.rs index 5f9c0042f370..998c635e61ae 100644 --- a/src/mito2/src/sst/index/creator.rs +++ b/src/mito2/src/sst/index/creator.rs @@ -31,7 +31,9 @@ use store_api::metadata::RegionMetadataRef; use tokio::io::duplex; use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt}; +use super::io_stats::InstrumentedAsyncWrite; use crate::error::{OpenDalSnafu, PushIndexValueSnafu, Result}; +use crate::metrics::INDEX_PUFFIN_WRITE_BYTES_TOTAL; use crate::read::Batch; use crate::sst::file::FileId; use crate::sst::index::codec::{IndexValueCodec, IndexValuesCodec}; @@ -154,12 +156,13 @@ impl SstIndexCreator { let mut guard = self.stats.record_finish(); let file_path = location::index_file_path(&self.region_dir, &self.sst_file_id); - let writer = self + let file_writer = self .object_store .writer(&file_path) .await .context(OpenDalSnafu)?; - let mut puffin_writer = PuffinFileWriter::new(writer); + let file_writer = InstrumentedAsyncWrite::new(file_writer, &INDEX_PUFFIN_WRITE_BYTES_TOTAL); + let mut puffin_writer = PuffinFileWriter::new(file_writer); let (tx, rx) = duplex(PIPE_BUFFER_SIZE_FOR_SENDING_BLOB); diff --git a/src/mito2/src/sst/index/creator/temp_provider.rs b/src/mito2/src/sst/index/creator/temp_provider.rs index 2148a19af9bd..434fd8d4120f 100644 --- a/src/mito2/src/sst/index/creator/temp_provider.rs +++ b/src/mito2/src/sst/index/creator/temp_provider.rs @@ -23,6 +23,8 @@ use object_store::ObjectStore; use snafu::ResultExt; use crate::error::{OpenDalSnafu, Result}; +use crate::metrics::{INDEX_INTERMEDIATE_READ_BYTES_TOTAL, INDEX_INTERMEDIATE_WRITE_BYTES_TOTAL}; +use crate::sst::index::io_stats::{InstrumentedAsyncRead, InstrumentedAsyncWrite}; use crate::sst::location::IntermediateLocation; pub(crate) struct TempFileProvider { @@ -45,6 +47,7 @@ impl ExternalTempFileProvider for TempFileProvider { .context(OpenDalSnafu) .map_err(BoxedError::new) .context(index_error::ExternalSnafu)?; + let writer = InstrumentedAsyncWrite::new(writer, &INDEX_INTERMEDIATE_WRITE_BYTES_TOTAL); Ok(Box::new(writer)) } @@ -75,6 +78,7 @@ impl ExternalTempFileProvider for TempFileProvider { .context(OpenDalSnafu) .map_err(BoxedError::new) .context(index_error::ExternalSnafu)?; + let reader = InstrumentedAsyncRead::new(reader, &INDEX_INTERMEDIATE_READ_BYTES_TOTAL); readers.push(Box::new(reader) as _); } diff --git a/src/mito2/src/sst/index/io_stats.rs b/src/mito2/src/sst/index/io_stats.rs new file mode 100644 index 000000000000..b63166caca70 --- /dev/null +++ b/src/mito2/src/sst/index/io_stats.rs @@ -0,0 +1,122 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use futures::{AsyncRead, AsyncSeek, AsyncWrite}; +use pin_project::pin_project; +use prometheus::IntCounter; + +struct BytesRecorder { + bytes: usize, + recorder: &'static IntCounter, +} + +impl BytesRecorder { + fn new(recorder: &'static IntCounter) -> Self { + Self { bytes: 0, recorder } + } + + fn inc_by(&mut self, bytes: usize) { + self.bytes += bytes; + } +} + +impl Drop for BytesRecorder { + fn drop(&mut self) { + if self.bytes > 0 { + self.recorder.inc_by(self.bytes as _); + } + } +} + +#[pin_project] +pub(crate) struct InstrumentedAsyncRead { + #[pin] + inner: R, + recorder: BytesRecorder, +} + +impl InstrumentedAsyncRead { + pub(crate) fn new(inner: R, recorder: &'static IntCounter) -> Self { + Self { + inner, + recorder: BytesRecorder::new(recorder), + } + } +} + +impl AsyncRead for InstrumentedAsyncRead { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + let poll = self.as_mut().project().inner.poll_read(cx, buf); + if let Poll::Ready(Ok(n)) = &poll { + self.recorder.inc_by(*n); + } + poll + } +} + +impl AsyncSeek for InstrumentedAsyncRead { + fn poll_seek( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + pos: io::SeekFrom, + ) -> Poll> { + self.project().inner.poll_seek(cx, pos) + } +} + +#[pin_project] +pub(crate) struct InstrumentedAsyncWrite { + #[pin] + inner: W, + recorder: BytesRecorder, +} + +impl InstrumentedAsyncWrite { + pub(crate) fn new(inner: W, recorder: &'static IntCounter) -> Self { + Self { + inner, + recorder: BytesRecorder::new(recorder), + } + } +} + +impl AsyncWrite for InstrumentedAsyncWrite { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + let poll = self.as_mut().project().inner.poll_write(cx, buf); + if let Poll::Ready(Ok(n)) = &poll { + self.recorder.inc_by(*n); + } + poll + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().inner.poll_flush(cx) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().inner.poll_close(cx) + } +}