Commit
chore: add a test with LargeBinary type
etolbakov authored May 16, 2024
1 parent ce4d595 commit 9f55436
Showing 3 changed files with 172 additions and 5 deletions.
86 changes: 84 additions & 2 deletions src/mito2/src/sst/parquet.rs
@@ -79,19 +79,28 @@ pub struct SstInfo {
mod tests {
use std::sync::Arc;

use common_datasource::file_format::parquet::BufferedWriter;
use common_time::Timestamp;
use datafusion_common::{Column, ScalarValue};
use datafusion_expr::{BinaryExpr, Expr, Operator};
use parquet::basic::{Compression, Encoding, ZstdLevel};
use parquet::file::metadata::KeyValue;
use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
use parquet::schema::types::ColumnPath;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::consts::SEQUENCE_COLUMN_NAME;
use table::predicate::Predicate;

use super::*;
use crate::cache::{CacheManager, PageKey};
use crate::sst::index::Indexer;
use crate::sst::parquet::format::WriteFormat;
use crate::sst::parquet::reader::ParquetReaderBuilder;
use crate::sst::parquet::writer::ParquetWriter;
use crate::sst::DEFAULT_WRITE_CONCURRENCY;
use crate::test_util::sst_util::{
-        assert_parquet_metadata_eq, new_batch_by_range, new_source, sst_file_handle,
-        sst_region_metadata,
+        assert_parquet_metadata_eq, build_test_binary_test_region_metadata, new_batch_by_range,
+        new_batch_with_binary_by_range, new_source, sst_file_handle, sst_region_metadata,
};
use crate::test_util::{check_reader_result, TestEnv};

@@ -399,4 +408,77 @@ mod tests {
let mut reader = builder.build().await.unwrap();
check_reader_result(&mut reader, &[new_batch_by_range(&["b", "h"], 150, 200)]).await;
}

fn customize_column_config(
builder: WriterPropertiesBuilder,
region_metadata: &RegionMetadataRef,
) -> WriterPropertiesBuilder {
let ts_col = ColumnPath::new(vec![region_metadata
.time_index_column()
.column_schema
.name
.clone()]);
let seq_col = ColumnPath::new(vec![SEQUENCE_COLUMN_NAME.to_string()]);

        builder
            // Sequence numbers and timestamps increase monotonically, so delta
            // encoding keeps them compact; dictionary encoding is disabled so
            // the explicit encoding takes effect.
            .set_column_encoding(seq_col.clone(), Encoding::DELTA_BINARY_PACKED)
            .set_column_dictionary_enabled(seq_col, false)
            .set_column_encoding(ts_col.clone(), Encoding::DELTA_BINARY_PACKED)
            .set_column_dictionary_enabled(ts_col, false)
}
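
    // Not part of this commit: a hypothetical helper, sketched to show how the
    // per-column settings above could be verified by inspecting a written
    // file's column-chunk metadata. The name and body are assumptions; the
    // metadata accessors used are the `parquet` crate's standard API.
    #[allow(dead_code)]
    fn assert_delta_encoded(metadata: &parquet::file::metadata::ParquetMetaData, col: usize) {
        // Look at the first row group's column chunk and check that the
        // delta encoding was actually applied to its pages.
        let chunk = metadata.row_group(0).column(col);
        assert!(chunk.encodings().contains(&Encoding::DELTA_BINARY_PACKED));
    }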

#[tokio::test]
async fn test_read_large_binary() {
let mut env = TestEnv::new();
let object_store = env.init_object_store_manager();
let handle = sst_file_handle(0, 1000);
let file_path = handle.file_path(FILE_DIR);

        // 60 rows are written below, so a row group size of 50 makes the data
        // span two row groups and the reader return two batches.
        let write_opts = WriteOptions {
            row_group_size: 50,
            ..Default::default()
        };

let metadata = build_test_binary_test_region_metadata();
        // Embed the region metadata as JSON in the parquet key-value metadata
        // so it travels with the file and can be recovered by the reader.
        let json = metadata.to_json().unwrap();
        let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);

let props_builder = WriterProperties::builder()
.set_key_value_metadata(Some(vec![key_value_meta]))
.set_compression(Compression::ZSTD(ZstdLevel::default()))
.set_encoding(Encoding::PLAIN)
.set_max_row_group_size(write_opts.row_group_size);

let props_builder = customize_column_config(props_builder, &metadata);
let writer_props = props_builder.build();

let write_format = WriteFormat::new(metadata.clone());
        let mut buffered_writer = BufferedWriter::try_new(
            file_path.clone(),
object_store.clone(),
write_format.arrow_schema(),
Some(writer_props),
write_opts.write_buffer_size.as_bytes() as usize,
DEFAULT_WRITE_CONCURRENCY,
)
.await
.unwrap();
let batch = new_batch_with_binary_by_range(&["a"], 0, 60);
let arrow_batch = write_format.convert_batch(&batch).unwrap();

buffered_writer.write(&arrow_batch).await.unwrap();

buffered_writer.close().await.unwrap();
let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store);
let mut reader = builder.build().await.unwrap();
check_reader_result(
&mut reader,
&[
new_batch_with_binary_by_range(&["a"], 0, 50),
new_batch_with_binary_by_range(&["a"], 50, 60),
],
)
.await;
}
}
34 changes: 33 additions & 1 deletion src/mito2/src/test_util.rs
@@ -33,7 +33,9 @@ use api::v1::{OpType, Row, Rows, SemanticType};
use common_base::readable_size::ReadableSize;
use common_datasource::compression::CompressionType;
use common_test_util::temp_dir::{create_temp_dir, TempDir};
-use datatypes::arrow::array::{TimestampMillisecondArray, UInt64Array, UInt8Array};
+use datatypes::arrow::array::{
+    LargeBinaryArray, TimestampMillisecondArray, UInt64Array, UInt8Array,
+};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use log_store::raft_engine::log_store::RaftEngineLogStore;
@@ -558,6 +560,36 @@ impl Iterator for VecBatchReader {
}
}

/// Creates a `BatchBuilder` whose field column holds `LargeBinary` values.
pub fn new_binary_batch_builder(
primary_key: &[u8],
timestamps: &[i64],
sequences: &[u64],
op_types: &[OpType],
field_column_id: ColumnId,
field: Vec<Vec<u8>>,
) -> BatchBuilder {
let mut builder = BatchBuilder::new(primary_key.to_vec());
builder
.timestamps_array(Arc::new(TimestampMillisecondArray::from_iter_values(
timestamps.iter().copied(),
)))
.unwrap()
.sequences_array(Arc::new(UInt64Array::from_iter_values(
sequences.iter().copied(),
)))
.unwrap()
.op_types_array(Arc::new(UInt8Array::from_iter_values(
op_types.iter().map(|v| *v as u8),
)))
.unwrap()
.push_field_array(
field_column_id,
Arc::new(LargeBinaryArray::from_iter_values(field)),
)
.unwrap();
builder
}

pub fn new_batch_builder(
primary_key: &[u8],
timestamps: &[i64],
57 changes: 55 additions & 2 deletions src/mito2/src/test_util/sst_util.rs
@@ -22,13 +22,17 @@ use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use datatypes::value::ValueRef;
use parquet::file::metadata::ParquetMetaData;
-use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
+use store_api::metadata::{
+    ColumnMetadata, RegionMetadata, RegionMetadataBuilder, RegionMetadataRef,
+};
use store_api::storage::RegionId;

use crate::read::{Batch, Source};
use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
use crate::sst::file::{FileHandle, FileId, FileMeta};
-use crate::test_util::{new_batch_builder, new_noop_file_purger, VecBatchReader};
+use crate::test_util::{
+    new_batch_builder, new_binary_batch_builder, new_noop_file_purger, VecBatchReader,
+};

/// Test region id.
const REGION_ID: RegionId = RegionId::new(0, 0);
@@ -128,6 +132,22 @@ pub fn new_batch_by_range(tags: &[&str], start: usize, end: usize) -> Batch {
.unwrap()
}

/// Creates a test `Batch` whose binary field repeats `"some data"` for each row in `[start, end)`.
pub fn new_batch_with_binary_by_range(tags: &[&str], start: usize, end: usize) -> Batch {
assert!(end >= start);
let pk = new_primary_key(tags);
let timestamps: Vec<_> = (start..end).map(|v| v as i64).collect();
let sequences = vec![1000; end - start];
let op_types = vec![OpType::Put; end - start];

let field: Vec<_> = (start..end)
.map(|_v| "some data".as_bytes().to_vec())
.collect();

new_binary_batch_builder(&pk, &timestamps, &sequences, &op_types, 1, field)
.build()
.unwrap()
}

/// `ParquetMetaData` doesn't implement the `PartialEq` trait, so check its internal fields manually.
pub fn assert_parquet_metadata_eq(a: Arc<ParquetMetaData>, b: Arc<ParquetMetaData>) {
macro_rules! assert_metadata {
@@ -151,3 +171,36 @@ pub fn assert_parquet_metadata_eq(a: Arc<ParquetMetaData>, b: Arc<ParquetMetaDat

assert_metadata!(a, b, row_groups, column_index, offset_index,);
}
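
// Not part of this commit: the real macro body is collapsed in this view, so
// here is a hypothetical sketch of the comparison pattern the doc comment
// describes. The macro name and body are assumptions: compare selected
// accessors of both values, since the type lacks `PartialEq`.
#[allow(unused_macros)]
macro_rules! assert_accessors_eq {
    ($a:expr, $b:expr, $($method:ident),+ $(,)?) => {
        // Call each named accessor on both values and assert the results match.
        $(assert_eq!($a.$method(), $b.$method());)+
    };
}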

/// Creates a new region metadata for testing SSTs with a binary datatype.
///
/// Schema: `tag_0`, `field_1` (binary), `ts`
pub fn build_test_binary_test_region_metadata() -> RegionMetadataRef {
let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
builder
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"tag_0".to_string(),
ConcreteDataType::string_datatype(),
true,
),
semantic_type: SemanticType::Tag,
column_id: 0,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("field_1", ConcreteDataType::binary_datatype(), true),
semantic_type: SemanticType::Field,
column_id: 1,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
semantic_type: SemanticType::Timestamp,
column_id: 2,
})
.primary_key(vec![0]);
Arc::new(builder.build().unwrap())
}
