diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index de723cae1e3d..8ed42ed42194 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -79,19 +79,28 @@ pub struct SstInfo { mod tests { use std::sync::Arc; + use common_datasource::file_format::parquet::BufferedWriter; use common_time::Timestamp; use datafusion_common::{Column, ScalarValue}; use datafusion_expr::{BinaryExpr, Expr, Operator}; + use parquet::basic::{Compression, Encoding, ZstdLevel}; + use parquet::file::metadata::KeyValue; + use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder}; + use parquet::schema::types::ColumnPath; + use store_api::metadata::RegionMetadataRef; + use store_api::storage::consts::SEQUENCE_COLUMN_NAME; use table::predicate::Predicate; use super::*; use crate::cache::{CacheManager, PageKey}; use crate::sst::index::Indexer; + use crate::sst::parquet::format::WriteFormat; use crate::sst::parquet::reader::ParquetReaderBuilder; use crate::sst::parquet::writer::ParquetWriter; + use crate::sst::DEFAULT_WRITE_CONCURRENCY; use crate::test_util::sst_util::{ - assert_parquet_metadata_eq, new_batch_by_range, new_source, sst_file_handle, - sst_region_metadata, + assert_parquet_metadata_eq, build_test_binary_test_region_metadata, new_batch_by_range, + new_batch_with_binary_by_range, new_source, sst_file_handle, sst_region_metadata, }; use crate::test_util::{check_reader_result, TestEnv}; @@ -399,4 +408,77 @@ mod tests { let mut reader = builder.build().await.unwrap(); check_reader_result(&mut reader, &[new_batch_by_range(&["b", "h"], 150, 200)]).await; } + + fn customize_column_config( + builder: WriterPropertiesBuilder, + region_metadata: &RegionMetadataRef, + ) -> WriterPropertiesBuilder { + let ts_col = ColumnPath::new(vec![region_metadata + .time_index_column() + .column_schema + .name + .clone()]); + let seq_col = ColumnPath::new(vec![SEQUENCE_COLUMN_NAME.to_string()]); + + builder + .set_column_encoding(seq_col.clone(), 
Encoding::DELTA_BINARY_PACKED) + .set_column_dictionary_enabled(seq_col, false) + .set_column_encoding(ts_col.clone(), Encoding::DELTA_BINARY_PACKED) + .set_column_dictionary_enabled(ts_col, false) + } + + #[tokio::test] + async fn test_read_large_binary() { + let mut env = TestEnv::new(); + let object_store = env.init_object_store_manager(); + let handle = sst_file_handle(0, 1000); + let file_path = handle.file_path(FILE_DIR); + + let write_opts = WriteOptions { + row_group_size: 50, + ..Default::default() + }; + + let metadata = build_test_binary_test_region_metadata(); + let json = metadata.to_json().unwrap(); + let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json); + + let props_builder = WriterProperties::builder() + .set_key_value_metadata(Some(vec![key_value_meta])) + .set_compression(Compression::ZSTD(ZstdLevel::default())) + .set_encoding(Encoding::PLAIN) + .set_max_row_group_size(write_opts.row_group_size); + + let props_builder = customize_column_config(props_builder, &metadata); + let writer_props = props_builder.build(); + + let write_format = WriteFormat::new(metadata.clone()); + let string = file_path.clone(); + let mut buffered_writer = BufferedWriter::try_new( + string, + object_store.clone(), + write_format.arrow_schema(), + Some(writer_props), + write_opts.write_buffer_size.as_bytes() as usize, + DEFAULT_WRITE_CONCURRENCY, + ) + .await + .unwrap(); + let batch = new_batch_with_binary_by_range(&["a"], 0, 60); + let arrow_batch = write_format.convert_batch(&batch).unwrap(); + + buffered_writer.write(&arrow_batch).await.unwrap(); + + buffered_writer.close().await.unwrap(); + let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store); + let mut reader = builder.build().await.unwrap(); + check_reader_result( + &mut reader, + &[ + new_batch_with_binary_by_range(&["a"], 0, 50), + new_batch_with_binary_by_range(&["a"], 50, 60), + ], + ) + .await; + } } diff --git a/src/mito2/src/test_util.rs 
b/src/mito2/src/test_util.rs index 27a746f3ba6a..de226e8feb61 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -33,7 +33,9 @@ use api::v1::{OpType, Row, Rows, SemanticType}; use common_base::readable_size::ReadableSize; use common_datasource::compression::CompressionType; use common_test_util::temp_dir::{create_temp_dir, TempDir}; -use datatypes::arrow::array::{TimestampMillisecondArray, UInt64Array, UInt8Array}; +use datatypes::arrow::array::{ + LargeBinaryArray, TimestampMillisecondArray, UInt64Array, UInt8Array, +}; use datatypes::prelude::ConcreteDataType; use datatypes::schema::ColumnSchema; use log_store::raft_engine::log_store::RaftEngineLogStore; @@ -558,6 +560,36 @@ impl Iterator for VecBatchReader { } } +pub fn new_binary_batch_builder( + primary_key: &[u8], + timestamps: &[i64], + sequences: &[u64], + op_types: &[OpType], + field_column_id: ColumnId, + field: Vec<Vec<u8>>, +) -> BatchBuilder { + let mut builder = BatchBuilder::new(primary_key.to_vec()); + builder + .timestamps_array(Arc::new(TimestampMillisecondArray::from_iter_values( + timestamps.iter().copied(), + ))) + .unwrap() + .sequences_array(Arc::new(UInt64Array::from_iter_values( + sequences.iter().copied(), + ))) + .unwrap() + .op_types_array(Arc::new(UInt8Array::from_iter_values( + op_types.iter().map(|v| *v as u8), + ))) + .unwrap() + .push_field_array( + field_column_id, + Arc::new(LargeBinaryArray::from_iter_values(field)), + ) + .unwrap(); + builder +} + pub fn new_batch_builder( primary_key: &[u8], timestamps: &[i64], diff --git a/src/mito2/src/test_util/sst_util.rs b/src/mito2/src/test_util/sst_util.rs index 82308fa1b54f..40af16f828d4 100644 --- a/src/mito2/src/test_util/sst_util.rs +++ b/src/mito2/src/test_util/sst_util.rs @@ -22,13 +22,17 @@ use datatypes::prelude::ConcreteDataType; use datatypes::schema::ColumnSchema; use datatypes::value::ValueRef; use parquet::file::metadata::ParquetMetaData; -use store_api::metadata::{ColumnMetadata, RegionMetadata, 
RegionMetadataBuilder}; +use store_api::metadata::{ + ColumnMetadata, RegionMetadata, RegionMetadataBuilder, RegionMetadataRef, +}; use store_api::storage::RegionId; use crate::read::{Batch, Source}; use crate::row_converter::{McmpRowCodec, RowCodec, SortField}; use crate::sst::file::{FileHandle, FileId, FileMeta}; -use crate::test_util::{new_batch_builder, new_noop_file_purger, VecBatchReader}; +use crate::test_util::{ + new_batch_builder, new_binary_batch_builder, new_noop_file_purger, VecBatchReader, +}; /// Test region id. const REGION_ID: RegionId = RegionId::new(0, 0); @@ -128,6 +132,22 @@ pub fn new_batch_by_range(tags: &[&str], start: usize, end: usize) -> Batch { .unwrap() } +pub fn new_batch_with_binary_by_range(tags: &[&str], start: usize, end: usize) -> Batch { + assert!(end >= start); + let pk = new_primary_key(tags); + let timestamps: Vec<_> = (start..end).map(|v| v as i64).collect(); + let sequences = vec![1000; end - start]; + let op_types = vec![OpType::Put; end - start]; + + let field: Vec<_> = (start..end) + .map(|_v| "some data".as_bytes().to_vec()) + .collect(); + + new_binary_batch_builder(&pk, &timestamps, &sequences, &op_types, 1, field) + .build() + .unwrap() +} + /// ParquetMetaData doesn't implement `PartialEq` trait, check internal fields manually pub fn assert_parquet_metadata_eq(a: Arc<ParquetMetaData>, b: Arc<ParquetMetaData>) { macro_rules! 
assert_metadata { @@ -151,3 +171,36 @@ pub fn assert_parquet_metadata_eq(a: Arc<ParquetMetaData>, b: Arc<ParquetMetaData>) { +pub fn build_test_binary_test_region_metadata() -> RegionMetadataRef { + let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1)); + builder + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "tag_0".to_string(), + ConcreteDataType::string_datatype(), + true, + ), + semantic_type: SemanticType::Tag, + column_id: 0, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new("field_1", ConcreteDataType::binary_datatype(), true), + semantic_type: SemanticType::Field, + column_id: 1, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + semantic_type: SemanticType::Timestamp, + column_id: 2, + }) + .primary_key(vec![0]); + Arc::new(builder.build().unwrap()) +}