diff --git a/src/mito2/src/cache/file_cache.rs b/src/mito2/src/cache/file_cache.rs
index a38ee33d97da..4519c3e0734e 100644
--- a/src/mito2/src/cache/file_cache.rs
+++ b/src/mito2/src/cache/file_cache.rs
@@ -90,6 +90,10 @@ impl FileCache {
         }
     }
 
+    pub(crate) fn contains_key(&self, key: &IndexKey) -> bool {
+        self.memory_index.contains_key(key)
+    }
+
     /// Puts a file into the cache index.
     ///
     /// The `WriteCache` should ensure the file is in the correct path.
diff --git a/src/mito2/src/cache/test_util.rs b/src/mito2/src/cache/test_util.rs
index deb4ba23cd7b..306bb50467e8 100644
--- a/src/mito2/src/cache/test_util.rs
+++ b/src/mito2/src/cache/test_util.rs
@@ -19,6 +19,8 @@ use std::sync::Arc;
 use bytes::Bytes;
 use datatypes::arrow::array::{ArrayRef, Int64Array};
 use datatypes::arrow::record_batch::RecordBatch;
+use object_store::services::Fs;
+use object_store::ObjectStore;
 use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
 use parquet::arrow::ArrowWriter;
 use parquet::file::metadata::ParquetMetaData;
@@ -42,3 +44,9 @@ fn parquet_file_data() -> Vec<u8> {
 
     buffer
 }
+
+pub(crate) fn new_fs_store(path: &str) -> ObjectStore {
+    let mut builder = Fs::default();
+    builder.root(path);
+    ObjectStore::new(builder).unwrap().finish()
+}
diff --git a/src/mito2/src/cache/write_cache.rs b/src/mito2/src/cache/write_cache.rs
index 55c87d528304..839f913c1b0e 100644
--- a/src/mito2/src/cache/write_cache.rs
+++ b/src/mito2/src/cache/write_cache.rs
@@ -30,7 +30,8 @@ use crate::metrics::{UPLOAD_BYTES_TOTAL, WRITE_AND_UPLOAD_ELAPSED_TOTAL};
 use crate::read::Source;
 use crate::sst::file::FileId;
 use crate::sst::parquet::writer::ParquetWriter;
-use crate::sst::parquet::{SstInfo, WriteOptions, DEFAULT_WRITE_BUFFER_SIZE};
+use crate::sst::parquet::{SstInfo, WriteOptions};
+use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
 
 /// A cache for uploading files to remote object stores.
 ///
@@ -140,8 +141,83 @@ pub struct SstUploadRequest {
 
 #[cfg(test)]
 mod tests {
+    use api::v1::OpType;
+    use common_base::readable_size::ReadableSize;
     use common_test_util::temp_dir::create_temp_dir;
-
-    #[test]
-    fn test_write_and_upload_sst() {}
+    use object_store::manager::ObjectStoreManager;
+    use object_store::services::Fs;
+    use object_store::ObjectStore;
+    use store_api::storage::RegionId;
+
+    use super::*;
+    use crate::cache::file_cache::{self, FileCache};
+    use crate::cache::test_util::new_fs_store;
+    use crate::sst::file::FileId;
+    use crate::sst::location::sst_file_path;
+    use crate::test_util::sst_util::{
+        new_batch_by_range, new_source, sst_file_handle, sst_region_metadata,
+    };
+    use crate::test_util::{build_rows, new_batch_builder, CreateRequestBuilder, TestEnv};
+
+    #[tokio::test]
+    async fn test_write_and_upload_sst() {
+        // Create a local object store and FileCache.
+        let local_dir = create_temp_dir("");
+        let local_store = new_fs_store(local_dir.path().to_str().unwrap());
+        let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10));
+        let file_id = FileId::random();
+
+        // Create the test env.
+        let mut env = TestEnv::new();
+        let metadata = Arc::new(sst_region_metadata());
+        let region_id = metadata.region_id;
+        let source = new_source(&[
+            new_batch_by_range(&["a", "d"], 0, 60),
+            new_batch_by_range(&["b", "f"], 0, 40),
+            new_batch_by_range(&["b", "h"], 100, 200),
+        ]);
+
+        // Another fs store to mock uploading.
+        let mock_store = env.init_object_store_manager();
+        let upload_path = sst_file_path("test", file_id);
+
+        // Create the write cache.
+        let object_store_manager = Arc::new(ObjectStoreManager::new("mock", mock_store.clone()));
+        let write_cache = WriteCache::new(local_store, object_store_manager, ReadableSize::mb(10));
+
+        let request = SstUploadRequest {
+            file_id,
+            metadata,
+            source,
+            storage: Some("mock".to_string()),
+            upload_path: upload_path.clone(),
+            remote_store: mock_store.clone(),
+        };
+
+        let write_opts = WriteOptions {
+            row_group_size: 512,
+            ..Default::default()
+        };
+
+        // Write to the cache and upload the SST to the mock remote store.
+        let sst_info = write_cache
+            .write_and_upload_sst(request, &write_opts)
+            .await
+            .unwrap()
+            .unwrap();
+
+        // Check that the write cache contains the key.
+        let key = (region_id, file_id);
+        assert!(write_cache.file_cache.contains_key(&key));
+
+        // Check the file size: read the file back from the mock remote store.
+        let file_size = sst_info.file_size;
+        let n: usize = mock_store.read(&upload_path).await.unwrap().len();
+        assert_eq!(file_size, n as u64);
+
+        // Delete test files.
+        mock_store.delete(&upload_path).await.unwrap();
+        write_cache.file_cache.remove(key);
+    }
 }
diff --git a/src/mito2/src/sst.rs b/src/mito2/src/sst.rs
index a0b8c52c4bd7..384734465ce3 100644
--- a/src/mito2/src/sst.rs
+++ b/src/mito2/src/sst.rs
@@ -14,6 +14,8 @@
 
 //! Sorted strings tables.
 
+use common_base::readable_size::ReadableSize;
+
 pub mod file;
 pub mod file_purger;
 pub mod index;
diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs
index 56fb70e8f77a..746be130a5b6 100644
--- a/src/mito2/src/sst/parquet.rs
+++ b/src/mito2/src/sst/parquet.rs
@@ -25,6 +25,7 @@ pub mod writer;
 use common_base::readable_size::ReadableSize;
 use parquet::file::metadata::ParquetMetaData;
 
+use super::DEFAULT_WRITE_BUFFER_SIZE;
 use crate::sst::file::FileTimeRange;
 
 /// Key of metadata in parquet SST.
@@ -69,33 +70,19 @@ pub struct SstInfo {
 mod tests {
     use std::sync::Arc;
 
-    use api::v1::OpType;
     use common_time::Timestamp;
 
     use super::*;
     use crate::cache::{CacheManager, PageKey};
-    use crate::read::Batch;
     use crate::sst::parquet::reader::ParquetReaderBuilder;
     use crate::sst::parquet::writer::ParquetWriter;
     use crate::test_util::sst_util::{
-        new_primary_key, new_source, sst_file_handle, sst_region_metadata,
+        new_batch_by_range, new_source, sst_file_handle, sst_region_metadata,
     };
-    use crate::test_util::{check_reader_result, new_batch_builder, TestEnv};
+    use crate::test_util::{check_reader_result, TestEnv};
 
     const FILE_DIR: &str = "/";
 
-    fn new_batch_by_range(tags: &[&str], start: usize, end: usize) -> Batch {
-        assert!(end > start);
-        let pk = new_primary_key(tags);
-        let timestamps: Vec<_> = (start..end).map(|v| v as i64).collect();
-        let sequences = vec![1000; end - start];
-        let op_types = vec![OpType::Put; end - start];
-        let field: Vec<_> = (start..end).map(|v| v as u64).collect();
-        new_batch_builder(&pk, &timestamps, &sequences, &op_types, 2, &field)
-            .build()
-            .unwrap()
-    }
-
     #[tokio::test]
     async fn test_write_read() {
         let mut env = TestEnv::new();
diff --git a/src/mito2/src/test_util/sst_util.rs b/src/mito2/src/test_util/sst_util.rs
index 3638d119faa1..1cc158ed91b5 100644
--- a/src/mito2/src/test_util/sst_util.rs
+++ b/src/mito2/src/test_util/sst_util.rs
@@ -14,7 +14,7 @@
 
 //! Utilities for testing SSTs.
 
-use api::v1::SemanticType;
+use api::v1::{OpType, SemanticType};
 use common_time::Timestamp;
 use datatypes::prelude::ConcreteDataType;
 use datatypes::schema::ColumnSchema;
@@ -25,7 +25,7 @@ use store_api::storage::RegionId;
 use crate::read::{Batch, Source};
 use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
 use crate::sst::file::{FileHandle, FileId, FileMeta};
-use crate::test_util::{new_noop_file_purger, VecBatchReader};
+use crate::test_util::{new_batch_builder, new_noop_file_purger, VecBatchReader};
 
 /// Test region id.
 const REGION_ID: RegionId = RegionId::new(0, 0);
@@ -110,3 +110,15 @@ pub fn sst_file_handle(start_ms: i64, end_ms: i64) -> FileHandle {
         file_purger,
     )
 }
+
+pub fn new_batch_by_range(tags: &[&str], start: usize, end: usize) -> Batch {
+    assert!(end > start);
+    let pk = new_primary_key(tags);
+    let timestamps: Vec<_> = (start..end).map(|v| v as i64).collect();
+    let sequences = vec![1000; end - start];
+    let op_types = vec![OpType::Put; end - start];
+    let field: Vec<_> = (start..end).map(|v| v as u64).collect();
+    new_batch_builder(&pk, &timestamps, &sequences, &op_types, 2, &field)
+        .build()
+        .unwrap()
+}
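Note on the relocated helper: `new_batch_by_range` builds a `Batch` whose timestamps and field values both cover `start..end`, with a constant sequence number of 1000 and `OpType::Put` for every row, so the row count always equals `end - start`. A minimal usage sketch for a test inside this crate is below; the `Batch::num_rows` accessor is an assumption here, so adjust to the actual `Batch` API if it differs.

```rust
use crate::test_util::sst_util::new_batch_by_range;

#[test]
fn test_new_batch_by_range_shape() {
    // Two tag values form the primary key; rows span timestamps 0..60.
    let batch = new_batch_by_range(&["a", "d"], 0, 60);
    // One row per timestamp in the range (assumes `num_rows` exists on `Batch`).
    assert_eq!(60, batch.num_rows());
}
```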