From 817b34304759909e66547062ca98d1c56f3f8327 Mon Sep 17 00:00:00 2001
From: Eugene Tolbakov
Date: Fri, 17 May 2024 20:46:04 +0100
Subject: [PATCH] chore: apply CR suggestions

---
 src/mito2/src/sst/parquet.rs        | 61 ++++++++++++++++++++++++-----
 src/mito2/src/test_util.rs          | 34 +---------------
 src/mito2/src/test_util/sst_util.rs | 33 +++++++++++-----
 3 files changed, 76 insertions(+), 52 deletions(-)

diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs
index 8ed42ed42194..82f0388d6891 100644
--- a/src/mito2/src/sst/parquet.rs
+++ b/src/mito2/src/sst/parquet.rs
@@ -83,6 +83,9 @@ mod tests {
     use common_time::Timestamp;
     use datafusion_common::{Column, ScalarValue};
     use datafusion_expr::{BinaryExpr, Expr, Operator};
+    use datatypes::arrow;
+    use datatypes::arrow::array::RecordBatch;
+    use datatypes::arrow::datatypes::{DataType, Field, Schema};
     use parquet::basic::{Compression, Encoding, ZstdLevel};
     use parquet::file::metadata::KeyValue;
     use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
@@ -100,7 +103,7 @@ mod tests {
     use crate::sst::DEFAULT_WRITE_CONCURRENCY;
     use crate::test_util::sst_util::{
         assert_parquet_metadata_eq, build_test_binary_test_region_metadata, new_batch_by_range,
-        new_batch_with_binary_by_range, new_source, sst_file_handle, sst_region_metadata,
+        new_batch_with_large_binary, new_source, sst_file_handle, sst_region_metadata,
     };
     use crate::test_util::{check_reader_result, TestEnv};
 
@@ -453,30 +456,68 @@ mod tests {
         let writer_props = props_builder.build();
 
         let write_format = WriteFormat::new(metadata.clone());
-        let string = file_path.clone();
+        let fields: Vec<_> = write_format
+            .arrow_schema()
+            .fields()
+            .into_iter()
+            .map(|field| {
+                let data_type = field.data_type().clone();
+                if data_type == DataType::Binary {
+                    Field::new(field.name(), DataType::LargeBinary, field.is_nullable())
+                } else {
+                    Field::new(field.name(), data_type, field.is_nullable())
+                }
+            })
+            .collect();
+
+        let arrow_schema = Arc::new(Schema::new(fields));
+
+        // Ensures field_0 has LargeBinary type.
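+        // (The map above only rewrites the schema fields; this assertion is a
+        // sanity check that the Binary -> LargeBinary substitution took effect.)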
+        assert_eq!(
+            DataType::LargeBinary,
+            arrow_schema
+                .field_with_name("field_0")
+                .unwrap()
+                .data_type()
+                .clone()
+        );
 
         let mut buffered_writer = BufferedWriter::try_new(
-            string,
+            file_path.clone(),
             object_store.clone(),
-            write_format.arrow_schema(),
+            arrow_schema.clone(),
             Some(writer_props),
             write_opts.write_buffer_size.as_bytes() as usize,
             DEFAULT_WRITE_CONCURRENCY,
         )
         .await
         .unwrap();
 
-        let batch = new_batch_with_binary_by_range(&["a"], 0, 60);
-        let arrow_batch = write_format.convert_batch(&batch).unwrap();
-
-        buffered_writer.write(&arrow_batch).await.unwrap();
+        let batch = new_batch_with_large_binary(&["a"], 0, 60);
+        let arrow_batch = write_format.convert_batch(&batch).unwrap();
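+        // Cast any Binary columns in the converted batch to LargeBinary so that
+        // every column matches the LargeBinary schema the writer was built with.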
+        let arrays: Vec<_> = arrow_batch
+            .columns()
+            .iter()
+            .map(|array| {
+                let data_type = array.data_type().clone();
+                if data_type == DataType::Binary {
+                    arrow::compute::cast(array, &DataType::LargeBinary).unwrap()
+                } else {
+                    array.clone()
+                }
+            })
+            .collect();
+        let result = RecordBatch::try_new(arrow_schema, arrays).unwrap();
+
+        buffered_writer.write(&result).await.unwrap();
 
         buffered_writer.close().await.unwrap();
+
         let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store);
         let mut reader = builder.build().await.unwrap();
         check_reader_result(
             &mut reader,
             &[
-                new_batch_with_binary_by_range(&["a"], 0, 50),
-                new_batch_with_binary_by_range(&["a"], 50, 60),
+                new_batch_with_large_binary(&["a"], 0, 50),
+                new_batch_with_large_binary(&["a"], 50, 60),
             ],
         )
         .await;
diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs
index de226e8feb61..27a746f3ba6a 100644
--- a/src/mito2/src/test_util.rs
+++ b/src/mito2/src/test_util.rs
@@ -33,9 +33,7 @@ use api::v1::{OpType, Row, Rows, SemanticType};
 use common_base::readable_size::ReadableSize;
 use common_datasource::compression::CompressionType;
 use common_test_util::temp_dir::{create_temp_dir, TempDir};
-use datatypes::arrow::array::{
-    LargeBinaryArray, TimestampMillisecondArray, UInt64Array, UInt8Array,
-};
+use datatypes::arrow::array::{TimestampMillisecondArray, UInt64Array, UInt8Array};
 use datatypes::prelude::ConcreteDataType;
 use datatypes::schema::ColumnSchema;
 use log_store::raft_engine::log_store::RaftEngineLogStore;
@@ -560,36 +558,6 @@ impl Iterator for VecBatchReader {
     }
 }
 
-pub fn new_binary_batch_builder(
-    primary_key: &[u8],
-    timestamps: &[i64],
-    sequences: &[u64],
-    op_types: &[OpType],
-    field_column_id: ColumnId,
-    field: Vec<Vec<u8>>,
-) -> BatchBuilder {
-    let mut builder = BatchBuilder::new(primary_key.to_vec());
-    builder
-        .timestamps_array(Arc::new(TimestampMillisecondArray::from_iter_values(
-            timestamps.iter().copied(),
-        )))
-        .unwrap()
-        .sequences_array(Arc::new(UInt64Array::from_iter_values(
-            sequences.iter().copied(),
-        )))
-        .unwrap()
-        .op_types_array(Arc::new(UInt8Array::from_iter_values(
-            op_types.iter().map(|v| *v as u8),
-        )))
-        .unwrap()
-        .push_field_array(
-            field_column_id,
-            Arc::new(LargeBinaryArray::from_iter_values(field)),
-        )
-        .unwrap();
-    builder
-}
-
 pub fn new_batch_builder(
     primary_key: &[u8],
     timestamps: &[i64],
diff --git a/src/mito2/src/test_util/sst_util.rs b/src/mito2/src/test_util/sst_util.rs
index 40af16f828d4..33fa611e1d43 100644
--- a/src/mito2/src/test_util/sst_util.rs
+++ b/src/mito2/src/test_util/sst_util.rs
@@ -18,6 +18,9 @@ use std::sync::Arc;
 
 use api::v1::{OpType, SemanticType};
 use common_time::Timestamp;
+use datatypes::arrow::array::{
+    LargeBinaryArray, TimestampMillisecondArray, UInt64Array, UInt8Array,
+};
 use datatypes::prelude::ConcreteDataType;
 use datatypes::schema::ColumnSchema;
 use datatypes::value::ValueRef;
@@ -27,12 +30,10 @@ use store_api::metadata::{
 };
 use store_api::storage::RegionId;
 
-use crate::read::{Batch, Source};
+use crate::read::{Batch, BatchBuilder, Source};
 use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
 use crate::sst::file::{FileHandle, FileId, FileMeta};
-use crate::test_util::{
-    new_batch_builder, new_binary_batch_builder, new_noop_file_purger, VecBatchReader,
-};
+use crate::test_util::{new_batch_builder, new_noop_file_purger, VecBatchReader};
 
 /// Test region id.
 const REGION_ID: RegionId = RegionId::new(0, 0);
@@ -132,7 +133,7 @@ pub fn new_batch_by_range(tags: &[&str], start: usize, end: usize) -> Batch {
         .unwrap()
 }
 
-pub fn new_batch_with_binary_by_range(tags: &[&str], start: usize, end: usize) -> Batch {
+pub fn new_batch_with_large_binary(tags: &[&str], start: usize, end: usize) -> Batch {
     assert!(end >= start);
     let pk = new_primary_key(tags);
     let timestamps: Vec<_> = (start..end).map(|v| v as i64).collect();
@@ -143,9 +144,23 @@ pub fn new_batch_with_binary_by_range(tags: &[&str], start: usize, end: usize) -
         .map(|_v| "some data".as_bytes().to_vec())
         .collect();
 
-    new_binary_batch_builder(&pk, &timestamps, &sequences, &op_types, 1, field)
-        .build()
+    let mut builder = BatchBuilder::new(pk);
+    builder
+        .timestamps_array(Arc::new(TimestampMillisecondArray::from_iter_values(
+            timestamps.iter().copied(),
+        )))
         .unwrap()
+        .sequences_array(Arc::new(UInt64Array::from_iter_values(
+            sequences.iter().copied(),
+        )))
+        .unwrap()
+        .op_types_array(Arc::new(UInt8Array::from_iter_values(
+            op_types.iter().map(|v| *v as u8),
+        )))
+        .unwrap()
+        .push_field_array(1, Arc::new(LargeBinaryArray::from_iter_values(field)))
+        .unwrap();
+    builder.build().unwrap()
 }
 
 /// ParquetMetaData doesn't implement `PartialEq` trait, check internal fields manually
@@ -174,7 +189,7 @@ pub fn assert_parquet_metadata_eq(a: Arc<ParquetMetaData>, b: Arc<ParquetMetaDat
 pub fn build_test_binary_test_region_metadata() -> RegionMetadataRef {
     let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
     builder
@@ -188,7 +203,7 @@ pub fn build_test_binary_test_region_metadata() -> RegionMetadataRef {
             column_id: 0,
         })
         .push_column_metadata(ColumnMetadata {
-            column_schema: ColumnSchema::new("field_1", ConcreteDataType::binary_datatype(), true),
+            column_schema: ColumnSchema::new("field_0", ConcreteDataType::binary_datatype(), true),
             semantic_type: SemanticType::Field,
             column_id: 1,
         })