diff --git a/src/mito2/src/read/last_row.rs b/src/mito2/src/read/last_row.rs index d97e35ac08b9..ee775a8ec2ba 100644 --- a/src/mito2/src/read/last_row.rs +++ b/src/mito2/src/read/last_row.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use async_trait::async_trait; +use datatypes::vectors::UInt32Vector; use store_api::storage::TimeSeriesRowSelector; use crate::cache::{ @@ -104,10 +105,10 @@ impl RowGroupLastRowCachedReader { // Schema matches, use cache batches. Self::new_hit(value) } else { - Self::new_miss(key, row_group_reader, Some(cache_manager)) + Self::new_miss(key, row_group_reader, cache_manager) } } else { - Self::new_miss(key, row_group_reader, Some(cache_manager)) + Self::new_miss(key, row_group_reader, cache_manager) } } @@ -121,7 +122,7 @@ impl RowGroupLastRowCachedReader { fn new_miss( key: SelectorResultKey, row_group_reader: RowGroupReader, - cache_manager: Option, + cache_manager: CacheManagerRef, ) -> Self { selector_result_cache_miss(); Self::Miss(RowGroupLastRowReader::new( @@ -166,68 +167,78 @@ pub(crate) struct RowGroupLastRowReader { reader: RowGroupReader, selector: LastRowSelector, yielded_batches: Vec, - cache_manager: Option, + cache_manager: CacheManagerRef, + /// Index buffer to take a new batch from the last row. + take_index: UInt32Vector, } impl RowGroupLastRowReader { - fn new( - key: SelectorResultKey, - reader: RowGroupReader, - cache_manager: Option, - ) -> Self { + fn new(key: SelectorResultKey, reader: RowGroupReader, cache_manager: CacheManagerRef) -> Self { Self { key, reader, selector: LastRowSelector::default(), yielded_batches: vec![], cache_manager, + take_index: UInt32Vector::from_vec(vec![0]), } } async fn next_batch(&mut self) -> Result> { while let Some(batch) = self.reader.next_batch().await? { if let Some(yielded) = self.selector.on_next(batch) { - if self.cache_manager.is_some() { - self.yielded_batches.push(yielded.clone()); - } + push_yielded_batches(yielded.clone(), &self.take_index, &mut self.yielded_batches)?; return Ok(Some(yielded)); } } let last_batch = if let Some(last_batch) = self.selector.finish() { - if self.cache_manager.is_some() { - self.yielded_batches.push(last_batch.clone()); - } + push_yielded_batches( + last_batch.clone(), + &self.take_index, + &mut self.yielded_batches, + )?; Some(last_batch) } else { None }; // All last rows in row group are yielded, update cache. - self.maybe_update_cache(); + self.update_cache(); Ok(last_batch) } /// Updates row group's last row cache if cache manager is present. - fn maybe_update_cache(&mut self) { - if let Some(cache) = &self.cache_manager { - if self.yielded_batches.is_empty() { - // we always expect that row groups yields batches. - return; - } - let value = Arc::new(SelectorResultValue { - result: std::mem::take(&mut self.yielded_batches), - projection: self - .reader - .context() - .read_format() - .projection_indices() - .to_vec(), - }); - cache.put_selector_result(self.key, value) + fn update_cache(&mut self) { + if self.yielded_batches.is_empty() { + // we always expect that row groups yields batches. + return; } + let value = Arc::new(SelectorResultValue { + result: std::mem::take(&mut self.yielded_batches), + projection: self + .reader + .context() + .read_format() + .projection_indices() + .to_vec(), + }); + self.cache_manager.put_selector_result(self.key, value); } } +/// Push last row into `yielded_batches`. +fn push_yielded_batches( + mut batch: Batch, + take_index: &UInt32Vector, + yielded_batches: &mut Vec, +) -> Result<()> { + assert_eq!(1, batch.num_rows()); + batch.take_in_place(take_index)?; + yielded_batches.push(batch); + + Ok(()) +} + /// Common struct that selects only the last row of each time series. #[derive(Default)] pub struct LastRowSelector {