diff --git a/src/mito2/src/read/last_row.rs b/src/mito2/src/read/last_row.rs index 7315a4e75242..de96f8881e46 100644 --- a/src/mito2/src/read/last_row.rs +++ b/src/mito2/src/read/last_row.rs @@ -215,7 +215,7 @@ impl RowGroupLastRowReader { }; // All last rows in row group are yielded, update cache. - self.update_cache(); + self.maybe_update_cache(); Ok(last_batch) } diff --git a/src/mito2/src/sst/parquet/row_group.rs b/src/mito2/src/sst/parquet/row_group.rs index 64f5c1f35e3b..dde78f39e4ee 100644 --- a/src/mito2/src/sst/parquet/row_group.rs +++ b/src/mito2/src/sst/parquet/row_group.rs @@ -166,15 +166,15 @@ impl<'a> RowGroupBase<'a> { let mut chunk_data = chunk_data.into_iter(); let mut res = vec![]; - for (idx, (chunk, row_group_pages)) in self - .column_chunks - .iter_mut() - .zip(&self.column_uncompressed_pages) - .enumerate() - { - if chunk.is_some() || !projection.leaf_included(idx) || row_group_pages.is_some() { - continue; - } + for (idx, (chunk, row_group_pages)) in self + .column_chunks + .iter_mut() + .zip(&self.column_uncompressed_pages) + .enumerate() + { + if chunk.is_some() || !projection.leaf_included(idx) || row_group_pages.is_some() { + continue; + } // Get the fetched page. let Some(data) = chunk_data.next() else { @@ -223,7 +223,7 @@ pub struct InMemoryRowGroup<'a> { region_id: RegionId, file_id: FileId, row_group_idx: usize, - cache_manager: CacheManagerRef, + cache_manager: Option, file_path: &'a str, /// Object store. object_store: ObjectStore, @@ -240,7 +240,7 @@ impl<'a> InMemoryRowGroup<'a> { file_id: FileId, parquet_meta: &'a ParquetMetaData, row_group_idx: usize, - cache_manager: CacheManagerRef, + cache_manager: Option, file_path: &'a str, object_store: ObjectStore, ) -> Self { @@ -293,19 +293,21 @@ impl<'a> InMemoryRowGroup<'a> { let assigned_columns = self.base.assign_dense_chunk(projection, chunk_data); // Put fetched data to cache if necessary. - for (col_idx, data) in assigned_columns { - let column = self.base.metadata.column(col_idx); - if !cache_uncompressed_pages(column) { - // For columns that have multiple uncompressed pages, we only cache the compressed page - // to save memory. - let page_key = PageKey::new_compressed( - self.region_id, - self.file_id, - self.row_group_idx, - col_idx, - ); - self.cache_manager - .put_pages(page_key, Arc::new(PageValue::new_compressed(data.clone()))); + if let Some(cache) = &self.cache_manager { + for (col_idx, data) in assigned_columns { + let column = self.base.metadata.column(col_idx); + if !cache_uncompressed_pages(column) { + // For columns that have multiple uncompressed pages, we only cache the compressed page + // to save memory. + let page_key = PageKey::new_compressed( + self.region_id, + self.file_id, + self.row_group_idx, + col_idx, + ); + cache + .put_pages(page_key, Arc::new(PageValue::new_compressed(data.clone()))); + } } } } @@ -323,6 +325,9 @@ impl<'a> InMemoryRowGroup<'a> { .enumerate() .filter(|(idx, chunk)| chunk.is_none() && projection.leaf_included(*idx)) .for_each(|(idx, chunk)| { + let Some(cache) = &self.cache_manager else { + return; + }; let column = self.base.metadata.column(idx); if cache_uncompressed_pages(column) { // Fetches uncompressed pages for the row group. @@ -332,8 +337,7 @@ impl<'a> InMemoryRowGroup<'a> { self.row_group_idx, idx, ); - self.base.column_uncompressed_pages[idx] = - self.cache_manager.get_pages(&page_key); + self.base.column_uncompressed_pages[idx] = cache.get_pages(&page_key); } else { // Fetches the compressed page from the cache. let page_key = PageKey::new_compressed( @@ -343,7 +347,7 @@ impl<'a> InMemoryRowGroup<'a> { idx, ); - *chunk = self.cache_manager.get_pages(&page_key).map(|page_value| { + *chunk = cache.get_pages(&page_key).map(|page_value| { Arc::new(ColumnChunkData::Dense { offset: column.byte_range().0 as usize, data: page_value.compressed.clone(), @@ -379,7 +383,7 @@ impl<'a> InMemoryRowGroup<'a> { key: IndexKey, ranges: &[Range], ) -> Option> { - if let Some(cache) = self.cache_manager.write_cache() { + if let Some(cache) = self.cache_manager.as_ref()?.write_cache() { return cache.file_cache().read_ranges(key, ranges).await; } None @@ -395,6 +399,10 @@ impl<'a> InMemoryRowGroup<'a> { let page_reader = self.base.column_reader(i)?; + let Some(cache) = &self.cache_manager else { + return Ok(Box::new(page_reader)); + }; + let column = self.base.metadata.column(i); if cache_uncompressed_pages(column) { // This column use row group level page cache. @@ -403,7 +411,7 @@ impl<'a> InMemoryRowGroup<'a> { let page_value = Arc::new(PageValue::new_row_group(pages)); let page_key = PageKey::new_uncompressed(self.region_id, self.file_id, self.row_group_idx, i); - self.cache_manager.put_pages(page_key, page_value.clone()); + cache.put_pages(page_key, page_value.clone()); return Ok(Box::new(RowGroupCachedReader::new(&page_value.row_group))); }