From e0a00b4b24d3a048bc6b94318ca65ea996f0be58 Mon Sep 17 00:00:00 2001 From: evenyag Date: Thu, 2 Nov 2023 12:40:56 +0800 Subject: [PATCH] test: test cache --- src/mito2/src/sst/parquet.rs | 63 ++++++++++++++++++++++++++++++++++-- 1 file changed, 61 insertions(+), 2 deletions(-) diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index ad4caa7c4d38..af3f8479f39c 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -69,6 +69,7 @@ mod tests { use common_time::Timestamp; use super::*; + use crate::cache::{CacheManager, PageKey}; use crate::read::Batch; use crate::sst::parquet::reader::ParquetReaderBuilder; use crate::sst::parquet::writer::ParquetWriter; @@ -109,8 +110,7 @@ mod tests { ..Default::default() }; - let mut writer = - ParquetWriter::new(file_path, metadata.clone(), source, object_store.clone()); + let mut writer = ParquetWriter::new(file_path, metadata, source, object_store.clone()); let info = writer.write_all(&write_opts).await.unwrap().unwrap(); assert_eq!(200, info.num_rows); assert!(info.file_size > 0); @@ -136,4 +136,63 @@ mod tests { ) .await; } + + #[tokio::test] + async fn test_read_with_cache() { + let mut env = TestEnv::new(); + let object_store = env.init_object_store_manager(); + let handle = sst_file_handle(0, 1000); + let file_path = handle.file_path(FILE_DIR); + let metadata = Arc::new(sst_region_metadata()); + let source = new_source(&[ + new_batch_by_range(&["a", "d"], 0, 60), + new_batch_by_range(&["b", "f"], 0, 40), + new_batch_by_range(&["b", "h"], 100, 200), + ]); + // Use a small row group size for test. + let write_opts = WriteOptions { + row_group_size: 50, + ..Default::default() + }; + // Prepare data. + let mut writer = + ParquetWriter::new(file_path, metadata.clone(), source, object_store.clone()); + writer.write_all(&write_opts).await.unwrap().unwrap(); + + let cache = Some(Arc::new(CacheManager::new(0, 0, 64 * 1024 * 1024))); + let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store) + .cache(cache.clone()); + for _ in 0..3 { + let mut reader = builder.build().await.unwrap(); + check_reader_result( + &mut reader, + &[ + new_batch_by_range(&["a", "d"], 0, 50), + new_batch_by_range(&["a", "d"], 50, 60), + new_batch_by_range(&["b", "f"], 0, 40), + new_batch_by_range(&["b", "h"], 100, 150), + new_batch_by_range(&["b", "h"], 150, 200), + ], + ) + .await; + } + + // Cache 4 row groups. + for i in 0..4 { + let page_key = PageKey { + region_id: metadata.region_id, + file_id: handle.file_id(), + row_group_idx: i, + column_idx: 0, + }; + assert!(cache.as_ref().unwrap().get_pages(&page_key).is_some()); + } + let page_key = PageKey { + region_id: metadata.region_id, + file_id: handle.file_id(), + row_group_idx: 5, + column_idx: 0, + }; + assert!(cache.as_ref().unwrap().get_pages(&page_key).is_none()); + } }