perf: take a new batch to reduce last row cache usage (#5095)
* feat: take and cache last row to save memory

* style: fix clippy
evenyag authored Dec 5, 2024
1 parent 7d8b256 commit 66c0445
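
Why this helps: `LastRowSelector` yields one-row batches that are views into much larger decoded row-group batches, so caching them as-is pins the full parent buffers in the selector-result cache. Taking the row through a small index vector copies it into a fresh, minimal allocation before it is cached. A rough sketch of the difference using plain arrow-rs (an illustration only; the commit itself works on mito2's `Batch` and `UInt32Vector` types, and the sizes below are approximate):

use arrow::array::{Array, Int64Array, UInt32Array};
use arrow::compute::take;

fn main() {
    // Stand-in for a large decoded column from a row group (~8 MB of i64).
    let column = Int64Array::from_iter_values(0i64..1_000_000);

    // Zero-copy slice of the last row: the one-row view still references the
    // full backing buffer, so caching it keeps all of it alive.
    let sliced = column.slice(column.len() - 1, 1);

    // Taking the last row copies it into a fresh one-element array, so the
    // cached value owns only a few bytes.
    let index = UInt32Array::from(vec![column.len() as u32 - 1]);
    let taken = take(&column, &index, None).unwrap();

    println!("sliced row keeps ~{} bytes", sliced.get_array_memory_size());
    println!("taken row keeps  ~{} bytes", taken.get_array_memory_size());
}

In the diff below, the reusable `take_index` is just `[0]` because the selector already hands over one-row batches; the effect is the same: the take re-materializes the row into its own small buffers before it goes into the cache.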
Showing 1 changed file with 43 additions and 32 deletions.
src/mito2/src/read/last_row.rs (43 additions, 32 deletions)
@@ -17,6 +17,7 @@
 use std::sync::Arc;
 
 use async_trait::async_trait;
+use datatypes::vectors::UInt32Vector;
 use store_api::storage::TimeSeriesRowSelector;
 
 use crate::cache::{
@@ -104,10 +105,10 @@ impl RowGroupLastRowCachedReader
                 // Schema matches, use cache batches.
                 Self::new_hit(value)
             } else {
-                Self::new_miss(key, row_group_reader, Some(cache_manager))
+                Self::new_miss(key, row_group_reader, cache_manager)
             }
         } else {
-            Self::new_miss(key, row_group_reader, Some(cache_manager))
+            Self::new_miss(key, row_group_reader, cache_manager)
         }
     }
 
@@ -121,7 +122,7 @@ impl RowGroupLastRowCachedReader
     fn new_miss(
         key: SelectorResultKey,
         row_group_reader: RowGroupReader,
-        cache_manager: Option<CacheManagerRef>,
+        cache_manager: CacheManagerRef,
     ) -> Self {
         selector_result_cache_miss();
         Self::Miss(RowGroupLastRowReader::new(
@@ -166,68 +167,78 @@ pub(crate) struct RowGroupLastRowReader {
     reader: RowGroupReader,
     selector: LastRowSelector,
     yielded_batches: Vec<Batch>,
-    cache_manager: Option<CacheManagerRef>,
+    cache_manager: CacheManagerRef,
+    /// Index buffer to take a new batch from the last row.
+    take_index: UInt32Vector,
 }
 
 impl RowGroupLastRowReader {
-    fn new(
-        key: SelectorResultKey,
-        reader: RowGroupReader,
-        cache_manager: Option<CacheManagerRef>,
-    ) -> Self {
+    fn new(key: SelectorResultKey, reader: RowGroupReader, cache_manager: CacheManagerRef) -> Self {
         Self {
             key,
             reader,
             selector: LastRowSelector::default(),
             yielded_batches: vec![],
             cache_manager,
+            take_index: UInt32Vector::from_vec(vec![0]),
         }
     }
 
     async fn next_batch(&mut self) -> Result<Option<Batch>> {
         while let Some(batch) = self.reader.next_batch().await? {
             if let Some(yielded) = self.selector.on_next(batch) {
-                if self.cache_manager.is_some() {
-                    self.yielded_batches.push(yielded.clone());
-                }
+                push_yielded_batches(yielded.clone(), &self.take_index, &mut self.yielded_batches)?;
                 return Ok(Some(yielded));
             }
         }
         let last_batch = if let Some(last_batch) = self.selector.finish() {
-            if self.cache_manager.is_some() {
-                self.yielded_batches.push(last_batch.clone());
-            }
+            push_yielded_batches(
+                last_batch.clone(),
+                &self.take_index,
+                &mut self.yielded_batches,
+            )?;
             Some(last_batch)
         } else {
            None
         };
 
         // All last rows in row group are yielded, update cache.
-        self.maybe_update_cache();
+        self.update_cache();
         Ok(last_batch)
     }
 
     /// Updates row group's last row cache if cache manager is present.
-    fn maybe_update_cache(&mut self) {
-        if let Some(cache) = &self.cache_manager {
-            if self.yielded_batches.is_empty() {
-                // we always expect that row groups yields batches.
-                return;
-            }
-            let value = Arc::new(SelectorResultValue {
-                result: std::mem::take(&mut self.yielded_batches),
-                projection: self
-                    .reader
-                    .context()
-                    .read_format()
-                    .projection_indices()
-                    .to_vec(),
-            });
-            cache.put_selector_result(self.key, value)
-        }
+    fn update_cache(&mut self) {
+        if self.yielded_batches.is_empty() {
+            // we always expect that row groups yields batches.
+            return;
+        }
+        let value = Arc::new(SelectorResultValue {
+            result: std::mem::take(&mut self.yielded_batches),
+            projection: self
+                .reader
+                .context()
+                .read_format()
+                .projection_indices()
+                .to_vec(),
+        });
+        self.cache_manager.put_selector_result(self.key, value);
     }
 }
 
+/// Push last row into `yielded_batches`.
+fn push_yielded_batches(
+    mut batch: Batch,
+    take_index: &UInt32Vector,
+    yielded_batches: &mut Vec<Batch>,
+) -> Result<()> {
+    assert_eq!(1, batch.num_rows());
+    batch.take_in_place(take_index)?;
+    yielded_batches.push(batch);
+
+    Ok(())
+}
+
 /// Common struct that selects only the last row of each time series.
 #[derive(Default)]
 pub struct LastRowSelector {
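The new `push_yielded_batches` helper is where the copy happens: it asserts the batch has exactly one row, rebuilds it in place via the take index, and only then stores it for caching. A standalone analogue over an arrow-rs `RecordBatch` (the name `compact_last_row` and the use of `RecordBatch` are illustrative assumptions; the commit uses mito2's `Batch::take_in_place`):

use arrow::array::UInt32Array;
use arrow::compute::take;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

/// Rebuilds a one-row batch so it owns fresh, minimal buffers instead of
/// referencing the much larger arrays it was sliced from.
fn compact_last_row(batch: &RecordBatch) -> Result<RecordBatch, ArrowError> {
    assert_eq!(1, batch.num_rows());
    // Index 0 of the one-row batch; plays the role of the reused `take_index`.
    let index = UInt32Array::from(vec![0u32]);
    let columns = batch
        .columns()
        .iter()
        .map(|column| take(column.as_ref(), &index, None))
        .collect::<Result<Vec<_>, _>>()?;
    RecordBatch::try_new(batch.schema(), columns)
}

Requiring a `CacheManagerRef` up front (instead of an `Option`) means this reader is only constructed when a cache actually exists, which is why the `is_some()` checks around buffering could be dropped.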
