refactor: unit test
QuenKar committed Jan 8, 2024
1 parent 8702308 commit bcc421e
Showing 6 changed files with 128 additions and 20 deletions.
4 changes: 4 additions & 0 deletions src/mito2/src/cache/file_cache.rs
@@ -90,6 +90,10 @@ impl FileCache {
         }
     }
 
+    pub(crate) fn contains_key(&self, key: &IndexKey) -> bool {
+        self.memory_index.contains_key(key)
+    }
+
     /// Puts a file into the cache index.
     ///
     /// The `WriteCache` should ensure the file is in the correct path.
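The new `contains_key` accessor lets the write-cache test below assert that an uploaded file also landed in the local cache index, without reaching into the private `memory_index` field. A minimal sketch of the call pattern, assuming only the `FileCache::new(local_store, capacity)` constructor and the `(RegionId, FileId)` index key that the test itself uses; the store and ids here are illustrative:

    // Sketch: membership check against the cache index.
    let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10));
    let key = (region_id, file_id); // IndexKey is a (RegionId, FileId) pair
    assert!(!cache.contains_key(&key)); // nothing cached under this key yet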
8 changes: 8 additions & 0 deletions src/mito2/src/cache/test_util.rs
@@ -19,6 +19,8 @@ use std::sync::Arc;
 use bytes::Bytes;
 use datatypes::arrow::array::{ArrayRef, Int64Array};
 use datatypes::arrow::record_batch::RecordBatch;
+use object_store::services::Fs;
+use object_store::ObjectStore;
 use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
 use parquet::arrow::ArrowWriter;
 use parquet::file::metadata::ParquetMetaData;
@@ -42,3 +44,9 @@ fn parquet_file_data() -> Vec<u8> {
 
     buffer
 }
+
+pub(crate) fn new_fs_store(path: &str) -> ObjectStore {
+    let mut builder = Fs::default();
+    builder.root(path);
+    ObjectStore::new(builder).unwrap().finish()
+}
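`new_fs_store` wraps OpenDAL's `Fs` builder so a test can point an `ObjectStore` at a temporary directory. A hedged usage sketch, assuming the async `read`/`write` methods of the re-exported OpenDAL operator and the `create_temp_dir` helper from common_test_util:

    // Sketch: back an ObjectStore with a temp dir, then round-trip some bytes.
    let dir = create_temp_dir("");
    let store = new_fs_store(dir.path().to_str().unwrap());
    store.write("greeting.txt", b"hello".to_vec()).await.unwrap();
    assert_eq!(store.read("greeting.txt").await.unwrap(), b"hello");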
94 changes: 93 additions & 1 deletion src/mito2/src/cache/write_cache.rs
@@ -30,7 +30,8 @@ use crate::metrics::{UPLOAD_BYTES_TOTAL, WRITE_AND_UPLOAD_ELAPSED_TOTAL};
 use crate::read::Source;
 use crate::sst::file::FileId;
 use crate::sst::parquet::writer::ParquetWriter;
-use crate::sst::parquet::{SstInfo, WriteOptions, DEFAULT_WRITE_BUFFER_SIZE};
+use crate::sst::parquet::{SstInfo, WriteOptions};
+use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
 
 /// A cache for uploading files to remote object stores.
 ///
@@ -137,3 +138,94 @@ pub struct SstUploadRequest {
     /// Remote object store to upload.
     pub remote_store: ObjectStore,
 }
+
+#[cfg(test)]
+mod tests {
+    use api::v1::OpType;
+    use common_base::readable_size::ReadableSize;
+    use common_test_util::temp_dir::create_temp_dir;
+    use object_store::manager::ObjectStoreManager;
+    use object_store::services::Fs;
+    use object_store::ObjectStore;
+    use store_api::storage::RegionId;
+
+    use super::*;
+    use crate::cache::file_cache::{self, FileCache};
+    use crate::cache::test_util::new_fs_store;
+    use crate::sst::file::FileId;
+    use crate::sst::location::sst_file_path;
+    use crate::test_util::sst_util::{
+        new_batch_by_range, new_source, sst_file_handle, sst_region_metadata,
+    };
+    use crate::test_util::{build_rows, new_batch_builder, CreateRequestBuilder, TestEnv};
+
+    #[tokio::test]
+    async fn test_write_and_upload_sst() {
+        // Create a local object store and FileCache.
+        let local_dir = create_temp_dir("");
+        let local_store = new_fs_store(local_dir.path().to_str().unwrap());
+        let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10));
+        let file_id = FileId::random();
+
+        // Create Source
+        let metadata = Arc::new(sst_region_metadata());
+        let region_id = metadata.region_id;
+        let source = new_source(&[
+            new_batch_by_range(&["a", "d"], 0, 60),
+            new_batch_by_range(&["b", "f"], 0, 40),
+            new_batch_by_range(&["b", "h"], 100, 200),
+        ]);
+
+        // TODO(QuenKar): maybe find a way to create some object server for testing,
+        // and now just use local file system to mock.
+        let mut env = TestEnv::new();
+        let mock_store = env.init_object_store_manager();
+        let upload_path = sst_file_path("test", file_id);
+
+        // Create WriteCache
+        let object_store_manager = Arc::new(ObjectStoreManager::new("mock", mock_store.clone()));
+        let write_cache = WriteCache::new(
+            local_store.clone(),
+            object_store_manager,
+            ReadableSize::mb(10),
+        );
+
+        let request = SstUploadRequest {
+            file_id,
+            metadata,
+            source,
+            storage: Some("mock".to_string()),
+            upload_path: upload_path.clone(),
+            remote_store: mock_store.clone(),
+        };
+
+        let write_opts = WriteOptions {
+            row_group_size: 512,
+            ..Default::default()
+        };
+
+        // Write to cache and upload sst to mock remote store
+        let sst_info = write_cache
+            .write_and_upload_sst(request, &write_opts)
+            .await
+            .unwrap()
+            .unwrap();
+
+        // Check write cache contains the key
+        let key = (region_id, file_id);
+        assert!(write_cache.file_cache.contains_key(&key));
+
+        // Check file data
+        let remote_data = mock_store.read(&upload_path).await.unwrap();
+        let cache_data = local_store.read(&cache.cache_file_path(key)).await.unwrap();
+        assert_eq!(remote_data.len(), 3434);
+        assert_eq!(remote_data, cache_data);
+
+        // Delete test files
+        mock_store
+            .delete(&upload_path)
+            .await
+            .expect("delete must succeed");
+        write_cache.file_cache.remove(key);
+    }
+}
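The double `unwrap()` in the test reflects that `write_and_upload_sst` returns a `Result<Option<SstInfo>>`: the inner `Option` is `None` when the source yields no rows, in which case nothing is written or uploaded. A caller that tolerates empty sources might branch instead; a sketch under that assumption (and assuming `SstInfo` exposes a `num_rows` count):

    // Sketch: handle the empty-source case rather than unwrapping it.
    if let Some(info) = write_cache.write_and_upload_sst(request, &write_opts).await? {
        assert!(info.num_rows > 0); // an SST was written and uploaded
    } else {
        // The source produced no batches, so no SST exists to cache or upload.
    }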
5 changes: 5 additions & 0 deletions src/mito2/src/sst.rs
@@ -14,9 +14,14 @@
 
 //! Sorted strings tables.
 
+use common_base::readable_size::ReadableSize;
+
 pub mod file;
 pub mod file_purger;
 pub mod index;
 pub mod location;
 pub mod parquet;
 pub(crate) mod version;
+
+/// Default write buffer size.
+pub const DEFAULT_WRITE_BUFFER_SIZE: ReadableSize = ReadableSize::mb(8);
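Hoisting the constant to the `sst` module root lets `write_cache.rs` share it with the parquet writer instead of importing it from `sst::parquet`. Both call sites now resolve to the same definition, as this sketch assumes (`ReadableSize::mb` is MiB-based in common_base, so the value is 8 MiB):

    // Sketch: the moved constant resolves to 8 MiB.
    use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
    assert_eq!(DEFAULT_WRITE_BUFFER_SIZE.as_bytes(), 8 * 1024 * 1024);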
21 changes: 4 additions & 17 deletions src/mito2/src/sst/parquet.rs
@@ -25,11 +25,12 @@ pub mod writer;
 use common_base::readable_size::ReadableSize;
 use parquet::file::metadata::ParquetMetaData;
 
+use super::DEFAULT_WRITE_BUFFER_SIZE;
 use crate::sst::file::FileTimeRange;
 
 /// Key of metadata in parquet SST.
 pub const PARQUET_METADATA_KEY: &str = "greptime:metadata";
-pub const DEFAULT_WRITE_BUFFER_SIZE: ReadableSize = ReadableSize::mb(8);
+
 /// Default batch size to read parquet files.
 pub(crate) const DEFAULT_READ_BATCH_SIZE: usize = 1024;
 /// Default row group size for parquet files.
@@ -69,33 +70,19 @@ pub struct SstInfo {
 mod tests {
     use std::sync::Arc;
 
-    use api::v1::OpType;
     use common_time::Timestamp;
 
     use super::*;
     use crate::cache::{CacheManager, PageKey};
-    use crate::read::Batch;
     use crate::sst::parquet::reader::ParquetReaderBuilder;
     use crate::sst::parquet::writer::ParquetWriter;
     use crate::test_util::sst_util::{
-        new_primary_key, new_source, sst_file_handle, sst_region_metadata,
+        new_batch_by_range, new_source, sst_file_handle, sst_region_metadata,
     };
-    use crate::test_util::{check_reader_result, new_batch_builder, TestEnv};
+    use crate::test_util::{check_reader_result, TestEnv};
 
     const FILE_DIR: &str = "/";
 
-    fn new_batch_by_range(tags: &[&str], start: usize, end: usize) -> Batch {
-        assert!(end > start);
-        let pk = new_primary_key(tags);
-        let timestamps: Vec<_> = (start..end).map(|v| v as i64).collect();
-        let sequences = vec![1000; end - start];
-        let op_types = vec![OpType::Put; end - start];
-        let field: Vec<_> = (start..end).map(|v| v as u64).collect();
-        new_batch_builder(&pk, &timestamps, &sequences, &op_types, 2, &field)
-            .build()
-            .unwrap()
-    }
-
     #[tokio::test]
     async fn test_write_read() {
         let mut env = TestEnv::new();
16 changes: 14 additions & 2 deletions src/mito2/src/test_util/sst_util.rs
@@ -14,7 +14,7 @@
 
 //! Utilities for testing SSTs.
 
-use api::v1::SemanticType;
+use api::v1::{OpType, SemanticType};
 use common_time::Timestamp;
 use datatypes::prelude::ConcreteDataType;
 use datatypes::schema::ColumnSchema;
@@ -25,7 +25,7 @@ use store_api::storage::RegionId;
 use crate::read::{Batch, Source};
 use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
 use crate::sst::file::{FileHandle, FileId, FileMeta};
-use crate::test_util::{new_noop_file_purger, VecBatchReader};
+use crate::test_util::{new_batch_builder, new_noop_file_purger, VecBatchReader};
 
 /// Test region id.
 const REGION_ID: RegionId = RegionId::new(0, 0);
@@ -110,3 +110,15 @@ pub fn sst_file_handle(start_ms: i64, end_ms: i64) -> FileHandle {
         file_purger,
     )
 }
+
+pub fn new_batch_by_range(tags: &[&str], start: usize, end: usize) -> Batch {
+    assert!(end > start);
+    let pk = new_primary_key(tags);
+    let timestamps: Vec<_> = (start..end).map(|v| v as i64).collect();
+    let sequences = vec![1000; end - start];
+    let op_types = vec![OpType::Put; end - start];
+    let field: Vec<_> = (start..end).map(|v| v as u64).collect();
+    new_batch_builder(&pk, &timestamps, &sequences, &op_types, 2, &field)
+        .build()
+        .unwrap()
+}
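Sharing `new_batch_by_range` here lets the parquet tests and the new write-cache test construct identical inputs. Each call returns a `Batch` whose primary key encodes the given tags and whose timestamps and u64 field values both cover `[start, end)`, with every row at sequence 1000 and op type `Put`. A sketch of what callers can rely on (assuming `Batch` exposes a `num_rows` accessor, as other tests in this crate use):

    // Sketch: 60 rows with timestamps 0..60 and a pk built from ("a", "d").
    let batch = new_batch_by_range(&["a", "d"], 0, 60);
    assert_eq!(batch.num_rows(), 60);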
