From dbed27df41481e70dbf5c01ae10415409da61d70 Mon Sep 17 00:00:00 2001 From: evenyag Date: Thu, 2 Nov 2023 12:34:55 +0800 Subject: [PATCH] test: test write read parquet --- src/mito2/src/read.rs | 2 +- src/mito2/src/sst/parquet.rs | 77 +++++++++++++++++++ src/mito2/src/sst/parquet/writer.rs | 2 - src/mito2/src/test_util.rs | 26 +++++-- src/mito2/src/test_util/sst_util.rs | 112 ++++++++++++++++++++++++++++ 5 files changed, 210 insertions(+), 9 deletions(-) create mode 100644 src/mito2/src/test_util/sst_util.rs diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs index ee2305fdd916..7c3fda279026 100644 --- a/src/mito2/src/read.rs +++ b/src/mito2/src/read.rs @@ -686,7 +686,7 @@ mod tests { op_types: &[OpType], field: &[u64], ) -> Batch { - new_batch_builder(b"test", timestamps, sequences, op_types, field) + new_batch_builder(b"test", timestamps, sequences, op_types, 1, field) .build() .unwrap() } diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index 73604638c604..ad4caa7c4d38 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -60,3 +60,80 @@ pub struct SstInfo { /// Number of rows. pub num_rows: usize, } + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use api::v1::OpType; + use common_time::Timestamp; + + use super::*; + use crate::read::Batch; + use crate::sst::parquet::reader::ParquetReaderBuilder; + use crate::sst::parquet::writer::ParquetWriter; + use crate::test_util::sst_util::{ + new_primary_key, new_source, sst_file_handle, sst_region_metadata, + }; + use crate::test_util::{check_reader_result, new_batch_builder, TestEnv}; + + const FILE_DIR: &str = "/"; + + fn new_batch_by_range(tags: &[&str], start: usize, end: usize) -> Batch { + assert!(end > start); + let pk = new_primary_key(tags); + let timestamps: Vec<_> = (start..end).map(|v| v as i64).collect(); + let sequences = vec![1000; end - start]; + let op_types = vec![OpType::Put; end - start]; + let field: Vec<_> = (start..end).map(|v| v as u64).collect(); + new_batch_builder(&pk, ×tamps, &sequences, &op_types, 2, &field) + .build() + .unwrap() + } + + #[tokio::test] + async fn test_write_read() { + let mut env = TestEnv::new(); + let object_store = env.init_object_store_manager(); + let handle = sst_file_handle(0, 1000); + let file_path = handle.file_path(FILE_DIR); + let metadata = Arc::new(sst_region_metadata()); + let source = new_source(&[ + new_batch_by_range(&["a", "d"], 0, 60), + new_batch_by_range(&["b", "f"], 0, 40), + new_batch_by_range(&["b", "h"], 100, 200), + ]); + // Use a small row group size for test. + let write_opts = WriteOptions { + row_group_size: 50, + ..Default::default() + }; + + let mut writer = + ParquetWriter::new(file_path, metadata.clone(), source, object_store.clone()); + let info = writer.write_all(&write_opts).await.unwrap().unwrap(); + assert_eq!(200, info.num_rows); + assert!(info.file_size > 0); + assert_eq!( + ( + Timestamp::new_millisecond(0), + Timestamp::new_millisecond(199) + ), + info.time_range + ); + + let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store); + let mut reader = builder.build().await.unwrap(); + check_reader_result( + &mut reader, + &[ + new_batch_by_range(&["a", "d"], 0, 50), + new_batch_by_range(&["a", "d"], 50, 60), + new_batch_by_range(&["b", "f"], 0, 40), + new_batch_by_range(&["b", "h"], 100, 150), + new_batch_by_range(&["b", "h"], 150, 200), + ], + ) + .await; + } +} diff --git a/src/mito2/src/sst/parquet/writer.rs b/src/mito2/src/sst/parquet/writer.rs index 14d6a9da3e7a..b236ce1d3c4f 100644 --- a/src/mito2/src/sst/parquet/writer.rs +++ b/src/mito2/src/sst/parquet/writer.rs @@ -155,5 +155,3 @@ impl SourceStats { } } } - -// TODO(yingwen): Port tests. diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index 69bd22d26e1b..e602643ff5c5 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -17,6 +17,7 @@ pub mod memtable_util; pub mod meta_util; pub mod scheduler_util; +pub mod sst_util; pub mod version_util; use std::collections::HashMap; @@ -195,6 +196,12 @@ impl TestEnv { ) } + /// Only initializes the object store manager, returns the default object store. + pub fn init_object_store_manager(&mut self) -> ObjectStore { + self.object_store_manager = Some(Arc::new(self.create_object_store_manager())); + self.get_object_store().unwrap() + } + /// Creates a new [WorkerGroup] with specific config under this env. pub(crate) async fn create_worker_group(&self, config: MitoConfig) -> WorkerGroup { let (log_store, object_store_manager) = self.create_log_and_object_store_manager().await; @@ -207,14 +214,19 @@ impl TestEnv { ) -> (RaftEngineLogStore, ObjectStoreManager) { let data_home = self.data_home.path(); let wal_path = data_home.join("wal"); - let data_path = data_home.join("data").as_path().display().to_string(); - let log_store = log_store_util::create_tmp_local_file_log_store(&wal_path).await; + + let object_store_manager = self.create_object_store_manager(); + (log_store, object_store_manager) + } + + fn create_object_store_manager(&self) -> ObjectStoreManager { + let data_home = self.data_home.path(); + let data_path = data_home.join("data").as_path().display().to_string(); let mut builder = Fs::default(); builder.root(&data_path); let object_store = ObjectStore::new(builder).unwrap().finish(); - let object_store_manager = ObjectStoreManager::new("default", object_store); - (log_store, object_store_manager) + ObjectStoreManager::new("default", object_store) } /// If `initial_metadata` is `Some`, creates a new manifest. If `initial_metadata` @@ -414,6 +426,7 @@ pub fn new_batch_builder( timestamps: &[i64], sequences: &[u64], op_types: &[OpType], + field_column_id: ColumnId, field: &[u64], ) -> BatchBuilder { let mut builder = BatchBuilder::new(primary_key.to_vec()); @@ -431,13 +444,14 @@ pub fn new_batch_builder( ))) .unwrap() .push_field_array( - 1, + field_column_id, Arc::new(UInt64Array::from_iter_values(field.iter().copied())), ) .unwrap(); builder } +/// Returns a new [Batch] whose field has column id 1. pub fn new_batch( primary_key: &[u8], timestamps: &[i64], @@ -445,7 +459,7 @@ pub fn new_batch( op_types: &[OpType], field: &[u64], ) -> Batch { - new_batch_builder(primary_key, timestamps, sequences, op_types, field) + new_batch_builder(primary_key, timestamps, sequences, op_types, 1, field) .build() .unwrap() } diff --git a/src/mito2/src/test_util/sst_util.rs b/src/mito2/src/test_util/sst_util.rs new file mode 100644 index 000000000000..3638d119faa1 --- /dev/null +++ b/src/mito2/src/test_util/sst_util.rs @@ -0,0 +1,112 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Utilities for testing SSTs. + +use api::v1::SemanticType; +use common_time::Timestamp; +use datatypes::prelude::ConcreteDataType; +use datatypes::schema::ColumnSchema; +use datatypes::value::ValueRef; +use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder}; +use store_api::storage::RegionId; + +use crate::read::{Batch, Source}; +use crate::row_converter::{McmpRowCodec, RowCodec, SortField}; +use crate::sst::file::{FileHandle, FileId, FileMeta}; +use crate::test_util::{new_noop_file_purger, VecBatchReader}; + +/// Test region id. +const REGION_ID: RegionId = RegionId::new(0, 0); + +/// Creates a new region metadata for testing SSTs. +/// +/// Schema: tag_0, tag_1, field_0, ts +pub fn sst_region_metadata() -> RegionMetadata { + let mut builder = RegionMetadataBuilder::new(REGION_ID); + builder + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "tag_0".to_string(), + ConcreteDataType::string_datatype(), + true, + ), + semantic_type: SemanticType::Tag, + column_id: 0, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "tag_1".to_string(), + ConcreteDataType::string_datatype(), + true, + ), + semantic_type: SemanticType::Tag, + column_id: 1, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "field_0".to_string(), + ConcreteDataType::uint64_datatype(), + true, + ), + semantic_type: SemanticType::Field, + column_id: 2, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "ts".to_string(), + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + semantic_type: SemanticType::Timestamp, + column_id: 3, + }) + .primary_key(vec![0, 1]); + builder.build().unwrap() +} + +/// Encodes a primary key for specific tags. +pub fn new_primary_key(tags: &[&str]) -> Vec { + let fields = (0..tags.len()) + .map(|_| SortField::new(ConcreteDataType::string_datatype())) + .collect(); + let converter = McmpRowCodec::new(fields); + converter + .encode(tags.iter().map(|tag| ValueRef::String(tag))) + .unwrap() +} + +/// Creates a [Source] from `batches`. +pub fn new_source(batches: &[Batch]) -> Source { + let reader = VecBatchReader::new(batches); + Source::Reader(Box::new(reader)) +} + +/// Creates a new [FileHandle] for a SST. +pub fn sst_file_handle(start_ms: i64, end_ms: i64) -> FileHandle { + let file_purger = new_noop_file_purger(); + FileHandle::new( + FileMeta { + region_id: REGION_ID, + file_id: FileId::random(), + time_range: ( + Timestamp::new_millisecond(start_ms), + Timestamp::new_millisecond(end_ms), + ), + level: 0, + file_size: 0, + }, + file_purger, + ) +}