test: test write read parquet

evenyag committed Nov 3, 2023
1 parent 9b1f995 commit b54fa42

Showing 5 changed files with 210 additions and 9 deletions.
2 changes: 1 addition & 1 deletion src/mito2/src/read.rs
@@ -686,7 +686,7 @@ mod tests {
         op_types: &[OpType],
         field: &[u64],
     ) -> Batch {
-        new_batch_builder(b"test", timestamps, sequences, op_types, field)
+        new_batch_builder(b"test", timestamps, sequences, op_types, 1, field)
             .build()
             .unwrap()
     }
77 changes: 77 additions & 0 deletions src/mito2/src/sst/parquet.rs
@@ -60,3 +60,80 @@ pub struct SstInfo {
     /// Number of rows.
     pub num_rows: usize,
 }
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+
+    use api::v1::OpType;
+    use common_time::Timestamp;
+
+    use super::*;
+    use crate::read::Batch;
+    use crate::sst::parquet::reader::ParquetReaderBuilder;
+    use crate::sst::parquet::writer::ParquetWriter;
+    use crate::test_util::sst_util::{
+        new_primary_key, new_source, sst_file_handle, sst_region_metadata,
+    };
+    use crate::test_util::{check_reader_result, new_batch_builder, TestEnv};
+
+    const FILE_DIR: &str = "/";
+
+    fn new_batch_by_range(tags: &[&str], start: usize, end: usize) -> Batch {
+        assert!(end > start);
+        let pk = new_primary_key(tags);
+        let timestamps: Vec<_> = (start..end).map(|v| v as i64).collect();
+        let sequences = vec![1000; end - start];
+        let op_types = vec![OpType::Put; end - start];
+        let field: Vec<_> = (start..end).map(|v| v as u64).collect();
+        new_batch_builder(&pk, &timestamps, &sequences, &op_types, 2, &field)
+            .build()
+            .unwrap()
+    }
+
+    #[tokio::test]
+    async fn test_write_read() {
+        let mut env = TestEnv::new();
+        let object_store = env.init_object_store_manager();
+        let handle = sst_file_handle(0, 1000);
+        let file_path = handle.file_path(FILE_DIR);
+        let metadata = Arc::new(sst_region_metadata());
+        let source = new_source(&[
+            new_batch_by_range(&["a", "d"], 0, 60),
+            new_batch_by_range(&["b", "f"], 0, 40),
+            new_batch_by_range(&["b", "h"], 100, 200),
+        ]);
+        // Use a small row group size for test.
+        let write_opts = WriteOptions {
+            row_group_size: 50,
+            ..Default::default()
+        };
+
+        let mut writer =
+            ParquetWriter::new(file_path, metadata.clone(), source, object_store.clone());
+        let info = writer.write_all(&write_opts).await.unwrap().unwrap();
+        assert_eq!(200, info.num_rows);
+        assert!(info.file_size > 0);
+        assert_eq!(
+            (
+                Timestamp::new_millisecond(0),
+                Timestamp::new_millisecond(199)
+            ),
+            info.time_range
+        );
+
+        let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store);
+        let mut reader = builder.build().await.unwrap();
+        check_reader_result(
+            &mut reader,
+            &[
+                new_batch_by_range(&["a", "d"], 0, 50),
+                new_batch_by_range(&["a", "d"], 50, 60),
+                new_batch_by_range(&["b", "f"], 0, 40),
+                new_batch_by_range(&["b", "h"], 100, 150),
+                new_batch_by_range(&["b", "h"], 150, 200),
+            ],
+        )
+        .await;
+    }
+}
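
Why the reader returns five batches for three inputs: with row_group_size: 50 the writer cuts a row group every 50 rows, and the input batches (sorted by primary key) occupy file rows 0..60, 60..100, and 100..200, so the ["a", "d"] rows split at file row 50 and the ["b", "h"] rows at file row 150. A minimal sketch of that splitting rule, using a hypothetical helper that is not part of this commit:

// Hypothetical illustration: split each key's rows at row-group boundaries.
// `batches` lists (key, row count) in primary key order; the result holds
// (key, start, end) pieces relative to each input batch.
fn split_at_row_groups(batches: &[(&str, usize)], group_size: usize) -> Vec<(String, usize, usize)> {
    let mut pieces = Vec::new();
    let mut file_row = 0; // absolute row offset within the SST
    for &(key, len) in batches {
        let mut local = 0; // offset within this batch
        while local < len {
            // Rows left in the current row group before the writer cuts a new one.
            let left_in_group = group_size - file_row % group_size;
            let take = left_in_group.min(len - local);
            pieces.push((key.to_string(), local, local + take));
            local += take;
            file_row += take;
        }
    }
    pieces
}

For the inputs above, split_at_row_groups(&[("a,d", 60), ("b,f", 40), ("b,h", 100)], 50) yields exactly the five (key, start, end) pieces the test checks, with the "b,h" offsets shifted by its 100 ms starting timestamp.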
2 changes: 0 additions & 2 deletions src/mito2/src/sst/parquet/writer.rs
@@ -155,5 +155,3 @@ impl SourceStats {
         }
     }
 }
-
-// TODO(yingwen): Port tests.
26 changes: 20 additions & 6 deletions src/mito2/src/test_util.rs
@@ -17,6 +17,7 @@
 pub mod memtable_util;
 pub mod meta_util;
 pub mod scheduler_util;
+pub mod sst_util;
 pub mod version_util;
 
 use std::collections::HashMap;
@@ -162,6 +163,12 @@ impl TestEnv {
         )
     }
 
+    /// Only initializes the object store manager, returns the default object store.
+    pub fn init_object_store_manager(&mut self) -> ObjectStore {
+        self.object_store_manager = Some(Arc::new(self.create_object_store_manager()));
+        self.get_object_store().unwrap()
+    }
+
     /// Creates a new [WorkerGroup] with specific config under this env.
     pub(crate) async fn create_worker_group(&self, config: MitoConfig) -> WorkerGroup {
         let (log_store, object_store_manager) = self.create_log_and_object_store_manager().await;
@@ -174,14 +181,19 @@
     ) -> (RaftEngineLogStore, ObjectStoreManager) {
         let data_home = self.data_home.path();
         let wal_path = data_home.join("wal");
-        let data_path = data_home.join("data").as_path().display().to_string();
 
         let log_store = log_store_util::create_tmp_local_file_log_store(&wal_path).await;
 
+        let object_store_manager = self.create_object_store_manager();
+        (log_store, object_store_manager)
+    }
+
+    fn create_object_store_manager(&self) -> ObjectStoreManager {
+        let data_home = self.data_home.path();
+        let data_path = data_home.join("data").as_path().display().to_string();
         let mut builder = Fs::default();
         builder.root(&data_path);
         let object_store = ObjectStore::new(builder).unwrap().finish();
-        let object_store_manager = ObjectStoreManager::new("default", object_store);
-        (log_store, object_store_manager)
+        ObjectStoreManager::new("default", object_store)
     }
 
     /// If `initial_metadata` is `Some`, creates a new manifest. If `initial_metadata`
@@ -381,6 +393,7 @@ pub fn new_batch_builder(
     timestamps: &[i64],
     sequences: &[u64],
     op_types: &[OpType],
+    field_column_id: ColumnId,
     field: &[u64],
 ) -> BatchBuilder {
     let mut builder = BatchBuilder::new(primary_key.to_vec());
@@ -398,21 +411,22 @@
         )))
         .unwrap()
         .push_field_array(
-            1,
+            field_column_id,
             Arc::new(UInt64Array::from_iter_values(field.iter().copied())),
         )
         .unwrap();
     builder
 }
 
+/// Returns a new [Batch] whose field has column id 1.
 pub fn new_batch(
     primary_key: &[u8],
     timestamps: &[i64],
     sequences: &[u64],
     op_types: &[OpType],
     field: &[u64],
 ) -> Batch {
-    new_batch_builder(primary_key, timestamps, sequences, op_types, field)
+    new_batch_builder(primary_key, timestamps, sequences, op_types, 1, field)
         .build()
         .unwrap()
 }
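
The new field_column_id parameter lets a test build batches whose field column id matches an arbitrary region schema, while new_batch keeps the old behavior by hardcoding id 1. A hedged usage sketch, with illustrative values only:

// Sketch: a batch whose single field column has id 2, matching `field_0`
// in sst_util::sst_region_metadata(); `new_batch` still passes id 1.
let batch = new_batch_builder(
    b"test",
    &[0, 1, 2],          // timestamps (ms)
    &[1000, 1000, 1000], // sequences
    &[OpType::Put; 3],   // op types
    2,                   // field column id
    &[10, 11, 12],       // field values
)
.build()
.unwrap();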
112 changes: 112 additions & 0 deletions src/mito2/src/test_util/sst_util.rs
@@ -0,0 +1,112 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Utilities for testing SSTs.
+
+use api::v1::SemanticType;
+use common_time::Timestamp;
+use datatypes::prelude::ConcreteDataType;
+use datatypes::schema::ColumnSchema;
+use datatypes::value::ValueRef;
+use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
+use store_api::storage::RegionId;
+
+use crate::read::{Batch, Source};
+use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
+use crate::sst::file::{FileHandle, FileId, FileMeta};
+use crate::test_util::{new_noop_file_purger, VecBatchReader};
+
+/// Test region id.
+const REGION_ID: RegionId = RegionId::new(0, 0);
+
+/// Creates a new region metadata for testing SSTs.
+///
+/// Schema: tag_0, tag_1, field_0, ts
+pub fn sst_region_metadata() -> RegionMetadata {
+    let mut builder = RegionMetadataBuilder::new(REGION_ID);
+    builder
+        .push_column_metadata(ColumnMetadata {
+            column_schema: ColumnSchema::new(
+                "tag_0".to_string(),
+                ConcreteDataType::string_datatype(),
+                true,
+            ),
+            semantic_type: SemanticType::Tag,
+            column_id: 0,
+        })
+        .push_column_metadata(ColumnMetadata {
+            column_schema: ColumnSchema::new(
+                "tag_1".to_string(),
+                ConcreteDataType::string_datatype(),
+                true,
+            ),
+            semantic_type: SemanticType::Tag,
+            column_id: 1,
+        })
+        .push_column_metadata(ColumnMetadata {
+            column_schema: ColumnSchema::new(
+                "field_0".to_string(),
+                ConcreteDataType::uint64_datatype(),
+                true,
+            ),
+            semantic_type: SemanticType::Field,
+            column_id: 2,
+        })
+        .push_column_metadata(ColumnMetadata {
+            column_schema: ColumnSchema::new(
+                "ts".to_string(),
+                ConcreteDataType::timestamp_millisecond_datatype(),
+                false,
+            ),
+            semantic_type: SemanticType::Timestamp,
+            column_id: 3,
+        })
+        .primary_key(vec![0, 1]);
+    builder.build().unwrap()
+}
+
+/// Encodes a primary key for specific tags.
+pub fn new_primary_key(tags: &[&str]) -> Vec<u8> {
+    let fields = (0..tags.len())
+        .map(|_| SortField::new(ConcreteDataType::string_datatype()))
+        .collect();
+    let converter = McmpRowCodec::new(fields);
+    converter
+        .encode(tags.iter().map(|tag| ValueRef::String(tag)))
+        .unwrap()
+}
+
+/// Creates a [Source] from `batches`.
+pub fn new_source(batches: &[Batch]) -> Source {
+    let reader = VecBatchReader::new(batches);
+    Source::Reader(Box::new(reader))
+}
+
+/// Creates a new [FileHandle] for a SST.
+pub fn sst_file_handle(start_ms: i64, end_ms: i64) -> FileHandle {
+    let file_purger = new_noop_file_purger();
+    FileHandle::new(
+        FileMeta {
+            region_id: REGION_ID,
+            file_id: FileId::random(),
+            time_range: (
+                Timestamp::new_millisecond(start_ms),
+                Timestamp::new_millisecond(end_ms),
+            ),
+            level: 0,
+            file_size: 0,
+        },
+        file_purger,
+    )
+}
