From 269dceb75f7f8396226277a33a4e8e2c280c7407 Mon Sep 17 00:00:00 2001
From: Eugene Tolbakov
Date: Mon, 13 May 2024 08:54:10 +0100
Subject: [PATCH 1/9] chore: change binary array type from LargeBinaryArray to
 BinaryArray

---
 src/datatypes/src/arrow_array.rs       | 4 ++--
 src/datatypes/src/types/binary_type.rs | 2 +-
 src/datatypes/src/vectors/helper.rs    | 2 +-
 3 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/src/datatypes/src/arrow_array.rs b/src/datatypes/src/arrow_array.rs
index d9b231bdb41e..40b7d46d1d02 100644
--- a/src/datatypes/src/arrow_array.rs
+++ b/src/datatypes/src/arrow_array.rs
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-pub type BinaryArray = arrow::array::LargeBinaryArray;
-pub type MutableBinaryArray = arrow::array::LargeBinaryBuilder;
+pub type BinaryArray = arrow::array::BinaryArray;
+pub type MutableBinaryArray = arrow::array::BinaryBuilder;
 pub type StringArray = arrow::array::StringArray;
 pub type MutableStringArray = arrow::array::StringBuilder;
diff --git a/src/datatypes/src/types/binary_type.rs b/src/datatypes/src/types/binary_type.rs
index 7213489da102..fa6aa134fd69 100644
--- a/src/datatypes/src/types/binary_type.rs
+++ b/src/datatypes/src/types/binary_type.rs
@@ -47,7 +47,7 @@ impl DataType for BinaryType {
     }

     fn as_arrow_type(&self) -> ArrowDataType {
-        ArrowDataType::LargeBinary
+        ArrowDataType::Binary
     }

     fn create_mutable_vector(&self, capacity: usize) -> Box<dyn MutableVector> {
diff --git a/src/datatypes/src/vectors/helper.rs b/src/datatypes/src/vectors/helper.rs
index 21c37ec07742..a3782193904a 100644
--- a/src/datatypes/src/vectors/helper.rs
+++ b/src/datatypes/src/vectors/helper.rs
@@ -278,7 +278,7 @@ impl Helper {
             ArrowDataType::LargeUtf8 => {
                 let array = arrow::compute::cast(array.as_ref(), &ArrowDataType::Utf8)
                     .context(crate::error::ArrowComputeSnafu)?;
-                Arc::new(BinaryVector::try_from_arrow_array(array)?)
+                Arc::new(StringVector::try_from_arrow_array(array)?)
             }
             ArrowDataType::Date32 => Arc::new(DateVector::try_from_arrow_array(array)?),
             ArrowDataType::Date64 => Arc::new(DateTimeVector::try_from_arrow_array(array)?),

From 58c38c0b54b47283ab15a7b87bb8f2d3d349f77c Mon Sep 17 00:00:00 2001
From: Eugene Tolbakov
Date: Mon, 13 May 2024 22:36:17 +0100
Subject: [PATCH 2/9] fix: adjust try_into_vector logic

---
 src/datatypes/src/vectors/helper.rs | 33 +++++++++++++++++++++--------
 1 file changed, 24 insertions(+), 9 deletions(-)

diff --git a/src/datatypes/src/vectors/helper.rs b/src/datatypes/src/vectors/helper.rs
index a3782193904a..d261b80e3785 100644
--- a/src/datatypes/src/vectors/helper.rs
+++ b/src/datatypes/src/vectors/helper.rs
@@ -258,9 +258,10 @@ impl Helper {
         Ok(match array.as_ref().data_type() {
             ArrowDataType::Null => Arc::new(NullVector::try_from_arrow_array(array)?),
             ArrowDataType::Boolean => Arc::new(BooleanVector::try_from_arrow_array(array)?),
-            ArrowDataType::LargeBinary => Arc::new(BinaryVector::try_from_arrow_array(array)?),
-            ArrowDataType::FixedSizeBinary(_) | ArrowDataType::Binary => {
-                let array = arrow::compute::cast(array.as_ref(), &ArrowDataType::LargeBinary)
+            ArrowDataType::LargeBinary
+            | ArrowDataType::FixedSizeBinary(_)
+            | ArrowDataType::Binary => {
+                let array = arrow::compute::cast(array.as_ref(), &ArrowDataType::Binary)
                     .context(crate::error::ArrowComputeSnafu)?;
                 Arc::new(BinaryVector::try_from_arrow_array(array)?)
             }
@@ -396,14 +397,15 @@ impl Helper {
 #[cfg(test)]
 mod tests {
     use arrow::array::{
-        ArrayRef, BooleanArray, Date32Array, Date64Array, Float32Array, Float64Array, Int16Array,
-        Int32Array, Int64Array, Int8Array, LargeBinaryArray, ListArray, NullArray,
+        ArrayRef, BinaryArray, BooleanArray, Date32Array, Date64Array, Float32Array, Float64Array,
+        Int16Array, Int32Array, Int64Array, Int8Array, LargeBinaryArray, ListArray, NullArray,
         Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray, Time64NanosecondArray,
         TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
         TimestampSecondArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
     };
     use arrow::datatypes::Int32Type;
     use arrow_array::DictionaryArray;
+    use arrow_schema::DataType;
     use common_decimal::Decimal128;
     use common_time::time::Time;
     use common_time::timestamp::TimeUnit;
@@ -572,14 +574,27 @@ mod tests {
         assert_eq!(&array, &vector.to_arrow_array());
     }

+    #[test]
+    fn test_try_binary_array_into_vector() {
+        let input_vec = vec!["hello".as_bytes(), "world".as_bytes()];
+
+        let input_large_binary_array: ArrayRef =
+            Arc::new(LargeBinaryArray::from(input_vec.clone()));
+        let assertion_array: ArrayRef = Arc::new(BinaryArray::from(input_vec));
+        let vector = Helper::try_into_vector(input_large_binary_array.clone()).unwrap();
+
+        assert_eq!(2, vector.len());
+        assert_eq!(0, vector.null_count());
+
+        let output_arrow_array: ArrayRef = vector.to_arrow_array();
+        assert_eq!(&DataType::Binary, output_arrow_array.data_type());
+        assert_eq!(&assertion_array, &output_arrow_array);
+    }
+
     #[test]
     fn test_try_into_vector() {
         check_try_into_vector(NullArray::new(2));
         check_try_into_vector(BooleanArray::from(vec![true, false]));
-        check_try_into_vector(LargeBinaryArray::from(vec![
-            "hello".as_bytes(),
-            "world".as_bytes(),
-        ]));
         check_try_into_vector(Int8Array::from(vec![1, 2, 3]));
         check_try_into_vector(Int16Array::from(vec![1, 2, 3]));
         check_try_into_vector(Int32Array::from(vec![1, 2, 3]));

From ccc40c4ff56226d991d93e70f04b9c916ae3c5d6 Mon Sep 17 00:00:00 2001
From: Eugene Tolbakov
Date: Tue, 14 May 2024 22:19:30 +0100
Subject: [PATCH 3/9] fix: apply CR suggestions, add tests

---
 src/datatypes/src/vectors/binary.rs |  8 +++
 src/datatypes/src/vectors/helper.rs | 75 ++++++++++++++++++++---------
 2 files changed, 60 insertions(+), 23 deletions(-)

diff --git a/src/datatypes/src/vectors/binary.rs b/src/datatypes/src/vectors/binary.rs
index 36187bd5af7f..5efa03b72e1a 100644
--- a/src/datatypes/src/vectors/binary.rs
+++ b/src/datatypes/src/vectors/binary.rs
@@ -52,6 +52,14 @@ impl From<Vec<Option<Vec<u8>>>> for BinaryVector {
     }
 }

+impl From<Vec<&[u8]>> for BinaryVector {
+    fn from(data: Vec<&[u8]>) -> Self {
+        Self {
+            array: BinaryArray::from_iter_values(data),
+        }
+    }
+}
+
 impl Vector for BinaryVector {
     fn data_type(&self) -> ConcreteDataType {
         ConcreteDataType::binary_datatype()
     }
diff --git a/src/datatypes/src/vectors/helper.rs b/src/datatypes/src/vectors/helper.rs
index d261b80e3785..b583b20697fd 100644
--- a/src/datatypes/src/vectors/helper.rs
+++ b/src/datatypes/src/vectors/helper.rs
@@ -258,9 +258,8 @@ impl Helper {
         Ok(match array.as_ref().data_type() {
             ArrowDataType::Null => Arc::new(NullVector::try_from_arrow_array(array)?),
             ArrowDataType::Boolean => Arc::new(BooleanVector::try_from_arrow_array(array)?),
-            ArrowDataType::LargeBinary
-            | ArrowDataType::FixedSizeBinary(_)
-            | ArrowDataType::Binary => {
+            ArrowDataType::Binary => Arc::new(BinaryVector::try_from_arrow_array(array)?),
+            ArrowDataType::LargeBinary | ArrowDataType::FixedSizeBinary(_) => {
                 let array = arrow::compute::cast(array.as_ref(), &ArrowDataType::Binary)
                     .context(crate::error::ArrowComputeSnafu)?;
                 Arc::new(BinaryVector::try_from_arrow_array(array)?)
@@ -397,14 +396,15 @@ impl Helper {
 #[cfg(test)]
 mod tests {
     use arrow::array::{
-        ArrayRef, BinaryArray, BooleanArray, Date32Array, Date64Array, Float32Array, Float64Array,
-        Int16Array, Int32Array, Int64Array, Int8Array, LargeBinaryArray, ListArray, NullArray,
+        ArrayRef, BooleanArray, Date32Array, Date64Array, Float32Array, Float64Array, Int16Array,
+        Int32Array, Int64Array, Int8Array, LargeBinaryArray, ListArray, NullArray,
         Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray, Time64NanosecondArray,
         TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
         TimestampSecondArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
     };
+    use arrow::buffer::Buffer;
     use arrow::datatypes::Int32Type;
-    use arrow_array::DictionaryArray;
+    use arrow_array::{BinaryArray, DictionaryArray, FixedSizeBinaryArray, LargeStringArray};
     use arrow_schema::DataType;
     use common_decimal::Decimal128;
     use common_time::time::Time;
     use common_time::timestamp::TimeUnit;
@@ -574,23 +574,6 @@ mod tests {
         assert_eq!(&array, &vector.to_arrow_array());
     }

-    #[test]
-    fn test_try_binary_array_into_vector() {
-        let input_vec = vec!["hello".as_bytes(), "world".as_bytes()];
-
-        let input_large_binary_array: ArrayRef =
-            Arc::new(LargeBinaryArray::from(input_vec.clone()));
-        let assertion_array: ArrayRef = Arc::new(BinaryArray::from(input_vec));
-        let vector = Helper::try_into_vector(input_large_binary_array.clone()).unwrap();
-
-        assert_eq!(2, vector.len());
-        assert_eq!(0, vector.null_count());
-
-        let output_arrow_array: ArrayRef = vector.to_arrow_array();
-        assert_eq!(&DataType::Binary, output_arrow_array.data_type());
-        assert_eq!(&assertion_array, &output_arrow_array);
-    }
-
     #[test]
     fn test_try_into_vector() {
         check_try_into_vector(NullArray::new(2));
         check_try_into_vector(BooleanArray::from(vec![true, false]));
         check_try_into_vector(Int8Array::from(vec![1, 2, 3]));
         check_try_into_vector(Int16Array::from(vec![1, 2, 3]));
         check_try_into_vector(Int32Array::from(vec![1, 2, 3]));
@@ -626,6 +609,52 @@ mod tests {
         Helper::try_into_vector(array).unwrap_err();
     }

+    #[test]
+    fn test_try_binary_array_into_vector() {
+        let input_vec: Vec<&[u8]> = vec!["hello".as_bytes(), "world".as_bytes()];
+        let assertion_vector = BinaryVector::from(input_vec.clone());
+
+        let input_arrays: Vec<ArrayRef> = vec![
+            Arc::new(LargeBinaryArray::from(input_vec.clone())) as ArrayRef,
+            Arc::new(BinaryArray::from(input_vec.clone())) as ArrayRef,
+            Arc::new(FixedSizeBinaryArray::new(
+                5,
+                Buffer::from_vec("helloworld".as_bytes().to_vec()),
+                None,
+            )) as ArrayRef,
+        ];
+
+        for input_array in input_arrays {
+            let vector = Helper::try_into_vector(input_array).unwrap();
+
+            assert_eq!(2, vector.len());
+            assert_eq!(0, vector.null_count());
+
+            let output_arrow_array: ArrayRef = vector.to_arrow_array();
+            assert_eq!(&DataType::Binary, output_arrow_array.data_type());
+            assert_eq!(&assertion_vector.to_arrow_array(), &output_arrow_array);
+        }
+    }
+
+    #[test]
+    fn test_large_string_array_into_vector() {
+        let input_vec = vec!["a", "b"];
+        let assertion_array = StringArray::from(input_vec.clone());
+
+        let large_string_array: ArrayRef = Arc::new(LargeStringArray::from(input_vec));
+        let vector = Helper::try_into_vector(large_string_array).unwrap();
+        assert_eq!(2, vector.len());
+        assert_eq!(0, vector.null_count());
+
+        let output_arrow_array: StringArray = vector
+            .to_arrow_array()
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap()
+            .clone();
+        assert_eq!(&assertion_array, &output_arrow_array);
+    }
+
     #[test]
     fn test_try_from_scalar_time_value() {
         let vector =
            Helper::try_from_scalar_value(ScalarValue::Time32Second(Some(42)), 3).unwrap();

From 85dcc941d5e8dffa7314b859547197be07cb1e86 Mon Sep 17 00:00:00 2001
From: Eugene Tolbakov
Date: Tue, 14 May 2024 22:53:09 +0100
Subject: [PATCH 4/9] chore: fix failing test

---
 src/datatypes/src/vectors/binary.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/datatypes/src/vectors/binary.rs b/src/datatypes/src/vectors/binary.rs
index 5efa03b72e1a..e2074f949c2b 100644
--- a/src/datatypes/src/vectors/binary.rs
+++ b/src/datatypes/src/vectors/binary.rs
@@ -265,7 +265,7 @@ mod tests {
         let arrow_arr = v.to_arrow_array();
         assert_eq!(2, arrow_arr.len());
-        assert_eq!(&ArrowDataType::LargeBinary, arrow_arr.data_type());
+        assert_eq!(&ArrowDataType::Binary, arrow_arr.data_type());
     }

     #[test]

From 972cacca5331c56c51d481a4923be6376f84ac3b Mon Sep 17 00:00:00 2001
From: Eugene Tolbakov
Date: Wed, 15 May 2024 09:16:19 +0100
Subject: [PATCH 5/9] chore: fix integration test

---
 src/datatypes/src/value.rs                    | 4 ++--
 src/mito2/src/memtable/partition_tree/dict.rs | 4 ----
 src/servers/src/mysql/helper.rs               | 2 +-
 3 files changed, 3 insertions(+), 7 deletions(-)

diff --git a/src/datatypes/src/value.rs b/src/datatypes/src/value.rs
index 9c9b79a4a6a9..2b8d058f7520 100644
--- a/src/datatypes/src/value.rs
+++ b/src/datatypes/src/value.rs
@@ -342,7 +342,7 @@ impl Value {
             Value::Float32(v) => ScalarValue::Float32(Some(v.0)),
             Value::Float64(v) => ScalarValue::Float64(Some(v.0)),
             Value::String(v) => ScalarValue::Utf8(Some(v.as_utf8().to_string())),
-            Value::Binary(v) => ScalarValue::LargeBinary(Some(v.to_vec())),
+            Value::Binary(v) => ScalarValue::Binary(Some(v.to_vec())),
             Value::Date(v) => ScalarValue::Date32(Some(v.val())),
             Value::DateTime(v) => ScalarValue::Date64(Some(v.val())),
             Value::Null => to_null_scalar_value(output_type)?,
@@ -413,7 +413,7 @@ pub fn to_null_scalar_value(output_type: &ConcreteDataType) -> Result<ScalarValue> {
         ConcreteDataType::UInt64(_) => ScalarValue::UInt64(None),
         ConcreteDataType::Float32(_) => ScalarValue::Float32(None),
         ConcreteDataType::Float64(_) => ScalarValue::Float64(None),
-        ConcreteDataType::Binary(_) => ScalarValue::LargeBinary(None),
+        ConcreteDataType::Binary(_) => ScalarValue::Binary(None),
         ConcreteDataType::String(_) => ScalarValue::Utf8(None),
         ConcreteDataType::Date(_) => ScalarValue::Date32(None),
         ConcreteDataType::DateTime(_) => ScalarValue::Date64(None),
diff --git a/src/mito2/src/memtable/partition_tree/dict.rs b/src/mito2/src/memtable/partition_tree/dict.rs
index 52217dc94bc2..b841abb9cb73 100644
--- a/src/mito2/src/memtable/partition_tree/dict.rs
+++ b/src/mito2/src/memtable/partition_tree/dict.rs
@@ -281,10 +281,6 @@ impl Drop for KeyDict {
 /// Buffer to store unsorted primary keys.
 struct KeyBuffer {
-    // We use arrow's binary builder as out default binary builder
-    // is LargeBinaryBuilder
-    // TODO(yingwen): Change the type binary vector to Binary instead of LargeBinary.
-
     /// Builder for binary key array.
     key_builder: BinaryBuilder,
     next_pk_index: usize,
 }
diff --git a/src/servers/src/mysql/helper.rs b/src/servers/src/mysql/helper.rs
index c5d509016da4..f1aede0b5d5f 100644
--- a/src/servers/src/mysql/helper.rs
+++ b/src/servers/src/mysql/helper.rs
@@ -159,7 +159,7 @@ pub fn convert_value(param: &ParamValue, t: &ConcreteDataType) -> Result<ScalarValue> {
        ConcreteDataType::String(_) => Ok(ScalarValue::Utf8(Some(
            String::from_utf8_lossy(b).to_string(),
        ))),
-       ConcreteDataType::Binary(_) => Ok(ScalarValue::LargeBinary(Some(b.to_vec()))),
+       ConcreteDataType::Binary(_) => Ok(ScalarValue::Binary(Some(b.to_vec()))),

        _ => error::PreparedStmtTypeMismatchSnafu {
            expected: t,

From ce4d595fe5c40c7cad476c443a5b204055dc9cdc Mon Sep 17 00:00:00 2001
From: Eugene Tolbakov
Date: Wed, 15 May 2024 09:40:50 +0100
Subject: [PATCH 6/9] chore: adjust the assertions according to changed
 implementation

---
 src/datatypes/src/value.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/datatypes/src/value.rs b/src/datatypes/src/value.rs
index 2b8d058f7520..cfeff152e2b3 100644
--- a/src/datatypes/src/value.rs
+++ b/src/datatypes/src/value.rs
@@ -2105,7 +2105,7 @@ mod tests {
                 .unwrap()
         );
         assert_eq!(
-            ScalarValue::LargeBinary(Some("world".as_bytes().to_vec())),
+            ScalarValue::Binary(Some("world".as_bytes().to_vec())),
             Value::Binary(Bytes::from("world".as_bytes()))
                 .try_to_scalar_value(&ConcreteDataType::binary_datatype())
                 .unwrap()
@@ -2187,7 +2187,7 @@ mod tests {
                 .unwrap()
         );
         assert_eq!(
-            ScalarValue::LargeBinary(None),
+            ScalarValue::Binary(None),
             Value::Null
                 .try_to_scalar_value(&ConcreteDataType::binary_datatype())
                 .unwrap()

From 9f554362f1a9a3cfb90566c1425766991c21a89c Mon Sep 17 00:00:00 2001
From: Eugene Tolbakov
Date: Thu, 16 May 2024 15:28:45 +0100
Subject: [PATCH 7/9] chore: add a test with LargeBinary type

---
 src/mito2/src/sst/parquet.rs        | 86 ++++++++++++++++++++++++++++-
 src/mito2/src/test_util.rs          | 34 +++++++++++-
 src/mito2/src/test_util/sst_util.rs | 57 ++++++++++++++++++-
 3 files changed, 172 insertions(+), 5 deletions(-)

diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs
index de723cae1e3d..8ed42ed42194 100644
--- a/src/mito2/src/sst/parquet.rs
+++ b/src/mito2/src/sst/parquet.rs
@@ -79,19 +79,28 @@ pub struct SstInfo {
 mod tests {
     use std::sync::Arc;

+    use common_datasource::file_format::parquet::BufferedWriter;
     use common_time::Timestamp;
     use datafusion_common::{Column, ScalarValue};
     use datafusion_expr::{BinaryExpr, Expr, Operator};
+    use parquet::basic::{Compression, Encoding, ZstdLevel};
+    use parquet::file::metadata::KeyValue;
+    use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
+    use parquet::schema::types::ColumnPath;
+    use store_api::metadata::RegionMetadataRef;
+    use store_api::storage::consts::SEQUENCE_COLUMN_NAME;
     use table::predicate::Predicate;

     use super::*;
     use crate::cache::{CacheManager, PageKey};
     use crate::sst::index::Indexer;
+    use crate::sst::parquet::format::WriteFormat;
     use crate::sst::parquet::reader::ParquetReaderBuilder;
     use crate::sst::parquet::writer::ParquetWriter;
+    use crate::sst::DEFAULT_WRITE_CONCURRENCY;
     use crate::test_util::sst_util::{
-        assert_parquet_metadata_eq, new_batch_by_range, new_source, sst_file_handle,
-        sst_region_metadata,
+        assert_parquet_metadata_eq, build_test_binary_test_region_metadata, new_batch_by_range,
+        new_batch_with_binary_by_range, new_source, sst_file_handle, sst_region_metadata,
     };
     use crate::test_util::{check_reader_result, TestEnv};

@@ -399,4 +408,77 @@ mod tests {
         let mut reader = builder.build().await.unwrap();
         check_reader_result(&mut reader, &[new_batch_by_range(&["b", "h"], 150, 200)]).await;
     }
+
+    fn customize_column_config(
+        builder: WriterPropertiesBuilder,
+        region_metadata: &RegionMetadataRef,
+    ) -> WriterPropertiesBuilder {
+        let ts_col = ColumnPath::new(vec![region_metadata
+            .time_index_column()
+            .column_schema
+            .name
+            .clone()]);
+        let seq_col = ColumnPath::new(vec![SEQUENCE_COLUMN_NAME.to_string()]);
+
+        builder
+            .set_column_encoding(seq_col.clone(), Encoding::DELTA_BINARY_PACKED)
+            .set_column_dictionary_enabled(seq_col, false)
+            .set_column_encoding(ts_col.clone(), Encoding::DELTA_BINARY_PACKED)
+            .set_column_dictionary_enabled(ts_col, false)
+    }
+
+    #[tokio::test]
+    async fn test_read_large_binary() {
+        let mut env = TestEnv::new();
+        let object_store = env.init_object_store_manager();
+        let handle = sst_file_handle(0, 1000);
+        let file_path = handle.file_path(FILE_DIR);
+
+        let write_opts = WriteOptions {
+            row_group_size: 50,
+            ..Default::default()
+        };
+
+        let metadata = build_test_binary_test_region_metadata();
+        let json = metadata.to_json().unwrap();
+        let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);
+
+        let props_builder = WriterProperties::builder()
+            .set_key_value_metadata(Some(vec![key_value_meta]))
+            .set_compression(Compression::ZSTD(ZstdLevel::default()))
+            .set_encoding(Encoding::PLAIN)
+            .set_max_row_group_size(write_opts.row_group_size);
+
+        let props_builder = customize_column_config(props_builder, &metadata);
+        let writer_props = props_builder.build();
+
+        let write_format = WriteFormat::new(metadata.clone());
+        let string = file_path.clone();
+        let mut buffered_writer = BufferedWriter::try_new(
+            string,
+            object_store.clone(),
+            write_format.arrow_schema(),
+            Some(writer_props),
+            write_opts.write_buffer_size.as_bytes() as usize,
+            DEFAULT_WRITE_CONCURRENCY,
+        )
+        .await
+        .unwrap();
+        let batch = new_batch_with_binary_by_range(&["a"], 0, 60);
+        let arrow_batch = write_format.convert_batch(&batch).unwrap();
+
+        buffered_writer.write(&arrow_batch).await.unwrap();
+
+        buffered_writer.close().await.unwrap();
+        let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store);
+        let mut reader = builder.build().await.unwrap();
+        check_reader_result(
+            &mut reader,
+            &[
+                new_batch_with_binary_by_range(&["a"], 0, 50),
+                new_batch_with_binary_by_range(&["a"], 50, 60),
+            ],
+        )
+        .await;
+    }
 }
diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs
index 27a746f3ba6a..de226e8feb61 100644
--- a/src/mito2/src/test_util.rs
+++ b/src/mito2/src/test_util.rs
@@ -33,7 +33,9 @@ use api::v1::{OpType, Row, Rows, SemanticType};
 use common_base::readable_size::ReadableSize;
 use common_datasource::compression::CompressionType;
 use common_test_util::temp_dir::{create_temp_dir, TempDir};
-use datatypes::arrow::array::{TimestampMillisecondArray, UInt64Array, UInt8Array};
+use datatypes::arrow::array::{
+    LargeBinaryArray, TimestampMillisecondArray, UInt64Array, UInt8Array,
+};
 use datatypes::prelude::ConcreteDataType;
 use datatypes::schema::ColumnSchema;
 use log_store::raft_engine::log_store::RaftEngineLogStore;
@@ -558,6 +560,36 @@ impl Iterator for VecBatchReader {
     }
 }

+pub fn new_binary_batch_builder(
+    primary_key: &[u8],
+    timestamps: &[i64],
+    sequences: &[u64],
+    op_types: &[OpType],
+    field_column_id: ColumnId,
+    field: Vec<Vec<u8>>,
+) -> BatchBuilder {
+    let mut builder = BatchBuilder::new(primary_key.to_vec());
+    builder
+        .timestamps_array(Arc::new(TimestampMillisecondArray::from_iter_values(
+            timestamps.iter().copied(),
+        )))
+        .unwrap()
+        .sequences_array(Arc::new(UInt64Array::from_iter_values(
+            sequences.iter().copied(),
+        )))
+        .unwrap()
+        .op_types_array(Arc::new(UInt8Array::from_iter_values(
+            op_types.iter().map(|v| *v as u8),
+        )))
+        .unwrap()
+        .push_field_array(
+            field_column_id,
+            Arc::new(LargeBinaryArray::from_iter_values(field)),
+        )
+        .unwrap();
+    builder
+}
+
 pub fn new_batch_builder(
     primary_key: &[u8],
     timestamps: &[i64],
diff --git a/src/mito2/src/test_util/sst_util.rs b/src/mito2/src/test_util/sst_util.rs
index 82308fa1b54f..40af16f828d4 100644
--- a/src/mito2/src/test_util/sst_util.rs
+++ b/src/mito2/src/test_util/sst_util.rs
@@ -22,13 +22,17 @@ use datatypes::prelude::ConcreteDataType;
 use datatypes::schema::ColumnSchema;
 use datatypes::value::ValueRef;
 use parquet::file::metadata::ParquetMetaData;
-use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
+use store_api::metadata::{
+    ColumnMetadata, RegionMetadata, RegionMetadataBuilder, RegionMetadataRef,
+};
 use store_api::storage::RegionId;

 use crate::read::{Batch, Source};
 use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
 use crate::sst::file::{FileHandle, FileId, FileMeta};
-use crate::test_util::{new_batch_builder, new_noop_file_purger, VecBatchReader};
+use crate::test_util::{
+    new_batch_builder, new_binary_batch_builder, new_noop_file_purger, VecBatchReader,
+};

 /// Test region id.
 const REGION_ID: RegionId = RegionId::new(0, 0);
@@ -128,6 +132,22 @@ pub fn new_batch_by_range(tags: &[&str], start: usize, end: usize) -> Batch {
         .unwrap()
 }

+pub fn new_batch_with_binary_by_range(tags: &[&str], start: usize, end: usize) -> Batch {
+    assert!(end >= start);
+    let pk = new_primary_key(tags);
+    let timestamps: Vec<_> = (start..end).map(|v| v as i64).collect();
+    let sequences = vec![1000; end - start];
+    let op_types = vec![OpType::Put; end - start];
+
+    let field: Vec<_> = (start..end)
+        .map(|_v| "some data".as_bytes().to_vec())
+        .collect();
+
+    new_binary_batch_builder(&pk, &timestamps, &sequences, &op_types, 1, field)
+        .build()
+        .unwrap()
+}
+
 /// ParquetMetaData doesn't implement `PartialEq` trait, check internal fields manually
 pub fn assert_parquet_metadata_eq(a: Arc<ParquetMetaData>, b: Arc<ParquetMetaData>) {
     macro_rules! assert_metadata {
@@ -151,3 +171,36 @@ pub fn assert_parquet_metadata_eq(a: Arc<ParquetMetaData>, b: Arc<ParquetMetaData>) {
+
+pub fn build_test_binary_test_region_metadata() -> RegionMetadataRef {
+    let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
+    builder
+        .push_column_metadata(ColumnMetadata {
+            column_schema: ColumnSchema::new(
+                "tag_0".to_string(),
+                ConcreteDataType::string_datatype(),
+                true,
+            ),
+            semantic_type: SemanticType::Tag,
+            column_id: 0,
+        })
+        .push_column_metadata(ColumnMetadata {
+            column_schema: ColumnSchema::new("field_1", ConcreteDataType::binary_datatype(), true),
+            semantic_type: SemanticType::Field,
+            column_id: 1,
+        })
+        .push_column_metadata(ColumnMetadata {
+            column_schema: ColumnSchema::new(
+                "ts",
+                ConcreteDataType::timestamp_millisecond_datatype(),
+                false,
+            ),
+            semantic_type: SemanticType::Timestamp,
+            column_id: 2,
+        })
+        .primary_key(vec![0]);
+    Arc::new(builder.build().unwrap())
+}

From 817b34304759909e66547062ca98d1c56f3f8327 Mon Sep 17 00:00:00 2001
From: Eugene Tolbakov
Date: Fri, 17 May 2024 20:46:04 +0100
Subject: [PATCH 8/9] chore: apply CR suggestions

---
 src/mito2/src/sst/parquet.rs        | 61 ++++++++++++++++++++-----
 src/mito2/src/test_util.rs          | 34 +---------------
 src/mito2/src/test_util/sst_util.rs | 33 +++++++++++-----
 3 files changed, 76 insertions(+), 52 deletions(-)

diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs
index 8ed42ed42194..82f0388d6891 100644
--- a/src/mito2/src/sst/parquet.rs
+++ b/src/mito2/src/sst/parquet.rs
@@ -83,6 +83,9 @@ mod tests {
     use common_time::Timestamp;
     use datafusion_common::{Column, ScalarValue};
     use datafusion_expr::{BinaryExpr, Expr, Operator};
+    use datatypes::arrow;
+    use datatypes::arrow::array::RecordBatch;
+    use datatypes::arrow::datatypes::{DataType, Field, Schema};
     use parquet::basic::{Compression, Encoding, ZstdLevel};
     use parquet::file::metadata::KeyValue;
     use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
     use parquet::schema::types::ColumnPath;
@@ -100,7 +103,7 @@ mod tests {
     use crate::sst::DEFAULT_WRITE_CONCURRENCY;
     use crate::test_util::sst_util::{
         assert_parquet_metadata_eq, build_test_binary_test_region_metadata, new_batch_by_range,
-        new_batch_with_binary_by_range, new_source, sst_file_handle, sst_region_metadata,
+        new_batch_with_large_binary, new_source, sst_file_handle, sst_region_metadata,
     };
     use crate::test_util::{check_reader_result, TestEnv};

@@ -453,30 +456,68 @@ mod tests {
         let writer_props = props_builder.build();

         let write_format = WriteFormat::new(metadata.clone());
-        let string = file_path.clone();
+        let fields: Vec<_> = write_format
+            .arrow_schema()
+            .fields()
+            .into_iter()
+            .map(|field| {
+                let data_type = field.data_type().clone();
+                if data_type == DataType::Binary {
+                    Field::new(field.name(), DataType::LargeBinary, field.is_nullable())
+                } else {
+                    Field::new(field.name(), data_type, field.is_nullable())
+                }
+            })
+            .collect();
+
+        let arrow_schema = Arc::new(Schema::new(fields));
+
+        // Ensures field_0 has LargeBinary type.
+        assert_eq!(
+            DataType::LargeBinary,
+            arrow_schema
+                .field_with_name("field_0")
+                .unwrap()
+                .data_type()
+                .clone()
+        );
         let mut buffered_writer = BufferedWriter::try_new(
-            string,
+            file_path.clone(),
             object_store.clone(),
-            write_format.arrow_schema(),
+            arrow_schema.clone(),
             Some(writer_props),
             write_opts.write_buffer_size.as_bytes() as usize,
             DEFAULT_WRITE_CONCURRENCY,
         )
         .await
         .unwrap();
-        let batch = new_batch_with_binary_by_range(&["a"], 0, 60);
-        let arrow_batch = write_format.convert_batch(&batch).unwrap();
-
-        buffered_writer.write(&arrow_batch).await.unwrap();
+        let batch = new_batch_with_large_binary(&["a"], 0, 60);
+        let arrow_batch = write_format.convert_batch(&batch).unwrap();
+        let arrays: Vec<_> = arrow_batch
+            .columns()
+            .iter()
+            .map(|array| {
+                let data_type = array.data_type().clone();
+                if data_type == DataType::Binary {
+                    arrow::compute::cast(array, &DataType::LargeBinary).unwrap()
+                } else {
+                    array.clone()
+                }
+            })
+            .collect();
+        let result = RecordBatch::try_new(arrow_schema, arrays).unwrap();
+
+        buffered_writer.write(&result).await.unwrap();

         buffered_writer.close().await.unwrap();
+
         let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store);
         let mut reader = builder.build().await.unwrap();
         check_reader_result(
             &mut reader,
             &[
-                new_batch_with_binary_by_range(&["a"], 0, 50),
-                new_batch_with_binary_by_range(&["a"], 50, 60),
+                new_batch_with_large_binary(&["a"], 0, 50),
+                new_batch_with_large_binary(&["a"], 50, 60),
             ],
         )
         .await;
diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs
index de226e8feb61..27a746f3ba6a 100644
--- a/src/mito2/src/test_util.rs
+++ b/src/mito2/src/test_util.rs
@@ -33,9 +33,7 @@ use api::v1::{OpType, Row, Rows, SemanticType};
 use common_base::readable_size::ReadableSize;
 use common_datasource::compression::CompressionType;
 use common_test_util::temp_dir::{create_temp_dir, TempDir};
-use datatypes::arrow::array::{
-    LargeBinaryArray, TimestampMillisecondArray, UInt64Array, UInt8Array,
-};
+use datatypes::arrow::array::{TimestampMillisecondArray, UInt64Array, UInt8Array};
 use datatypes::prelude::ConcreteDataType;
 use datatypes::schema::ColumnSchema;
 use log_store::raft_engine::log_store::RaftEngineLogStore;
@@ -560,36 +558,6 @@ impl Iterator for VecBatchReader {
     }
 }

-pub fn new_binary_batch_builder(
-    primary_key: &[u8],
-    timestamps: &[i64],
-    sequences: &[u64],
-    op_types: &[OpType],
-    field_column_id: ColumnId,
-    field: Vec<Vec<u8>>,
-) -> BatchBuilder {
-    let mut builder = BatchBuilder::new(primary_key.to_vec());
-    builder
-        .timestamps_array(Arc::new(TimestampMillisecondArray::from_iter_values(
-            timestamps.iter().copied(),
-        )))
-        .unwrap()
-        .sequences_array(Arc::new(UInt64Array::from_iter_values(
-            sequences.iter().copied(),
-        )))
-        .unwrap()
-        .op_types_array(Arc::new(UInt8Array::from_iter_values(
-            op_types.iter().map(|v| *v as u8),
-        )))
-        .unwrap()
-        .push_field_array(
-            field_column_id,
-            Arc::new(LargeBinaryArray::from_iter_values(field)),
-        )
-        .unwrap();
-    builder
-}
-
 pub fn new_batch_builder(
     primary_key: &[u8],
     timestamps: &[i64],
diff --git a/src/mito2/src/test_util/sst_util.rs b/src/mito2/src/test_util/sst_util.rs
index 40af16f828d4..33fa611e1d43 100644
--- a/src/mito2/src/test_util/sst_util.rs
+++ b/src/mito2/src/test_util/sst_util.rs
@@ -18,6 +18,9 @@ use std::sync::Arc;

 use api::v1::{OpType, SemanticType};
 use common_time::Timestamp;
+use datatypes::arrow::array::{
+    LargeBinaryArray, TimestampMillisecondArray, UInt64Array, UInt8Array,
+};
 use datatypes::prelude::ConcreteDataType;
 use datatypes::schema::ColumnSchema;
 use datatypes::value::ValueRef;
@@ -27,12 +30,10 @@ use store_api::metadata::{
 };
 use store_api::storage::RegionId;

-use crate::read::{Batch, Source};
+use crate::read::{Batch, BatchBuilder, Source};
 use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
 use crate::sst::file::{FileHandle, FileId, FileMeta};
-use crate::test_util::{
-    new_batch_builder, new_binary_batch_builder, new_noop_file_purger, VecBatchReader,
-};
+use crate::test_util::{new_batch_builder, new_noop_file_purger, VecBatchReader};

 /// Test region id.
 const REGION_ID: RegionId = RegionId::new(0, 0);
@@ -132,7 +133,7 @@ pub fn new_batch_by_range(tags: &[&str], start: usize, end: usize) -> Batch {
         .unwrap()
 }

-pub fn new_batch_with_binary_by_range(tags: &[&str], start: usize, end: usize) -> Batch {
+pub fn new_batch_with_large_binary(tags: &[&str], start: usize, end: usize) -> Batch {
     assert!(end >= start);
     let pk = new_primary_key(tags);
     let timestamps: Vec<_> = (start..end).map(|v| v as i64).collect();
@@ -143,9 +144,23 @@ pub fn new_batch_with_large_binary(tags: &[&str], start: usize, end: usize) -> Batch {
         .map(|_v| "some data".as_bytes().to_vec())
         .collect();

-    new_binary_batch_builder(&pk, &timestamps, &sequences, &op_types, 1, field)
-        .build()
-        .unwrap()
+    let mut builder = BatchBuilder::new(pk);
+    builder
+        .timestamps_array(Arc::new(TimestampMillisecondArray::from_iter_values(
+            timestamps.iter().copied(),
+        )))
+        .unwrap()
+        .sequences_array(Arc::new(UInt64Array::from_iter_values(
+            sequences.iter().copied(),
+        )))
+        .unwrap()
+        .op_types_array(Arc::new(UInt8Array::from_iter_values(
+            op_types.iter().map(|v| *v as u8),
+        )))
+        .unwrap()
+        .push_field_array(1, Arc::new(LargeBinaryArray::from_iter_values(field)))
+        .unwrap();
+    builder.build().unwrap()
 }

 /// ParquetMetaData doesn't implement `PartialEq` trait, check internal fields manually
@@ -174,7 +189,7 @@ pub fn assert_parquet_metadata_eq(a: Arc<ParquetMetaData>, b: Arc<ParquetMetaData>) {
 pub fn build_test_binary_test_region_metadata() -> RegionMetadataRef {
     let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
     builder
@@ -188,7 +203,7 @@ pub fn build_test_binary_test_region_metadata() -> RegionMetadataRef {
             column_id: 0,
         })
         .push_column_metadata(ColumnMetadata {
-            column_schema: ColumnSchema::new("field_1", ConcreteDataType::binary_datatype(), true),
+            column_schema: ColumnSchema::new("field_0", ConcreteDataType::binary_datatype(), true),
             semantic_type: SemanticType::Field,
             column_id: 1,
         })
         .push_column_metadata(ColumnMetadata {
             column_schema: ColumnSchema::new(
                 "ts",

From 5c2dd98528f8aea2f4abfe6b7f1a0c4fb9d3115c Mon Sep 17 00:00:00 2001
From: Eugene Tolbakov
Date: Sat, 18 May 2024 08:18:02 +0100
Subject: [PATCH 9/9] chore: simplify tests

---
 src/mito2/src/sst/parquet.rs        | 42 ++++++-----------------
 src/mito2/src/test_util/sst_util.rs |  8 +++---
 2 files changed, 11 insertions(+), 39 deletions(-)

diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs
index 82f0388d6891..3a49d84a2d8e 100644
--- a/src/mito2/src/sst/parquet.rs
+++ b/src/mito2/src/sst/parquet.rs
@@ -88,10 +88,7 @@ mod tests {
     use datatypes::arrow::datatypes::{DataType, Field, Schema};
     use parquet::basic::{Compression, Encoding, ZstdLevel};
     use parquet::file::metadata::KeyValue;
-    use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
-    use parquet::schema::types::ColumnPath;
-    use store_api::metadata::RegionMetadataRef;
-    use store_api::storage::consts::SEQUENCE_COLUMN_NAME;
+    use parquet::file::properties::WriterProperties;
     use table::predicate::Predicate;

     use super::*;
@@ -103,7 +100,7 @@ mod tests {
     use crate::sst::DEFAULT_WRITE_CONCURRENCY;
     use crate::test_util::sst_util::{
         assert_parquet_metadata_eq, build_test_binary_test_region_metadata, new_batch_by_range,
-        new_batch_with_large_binary, new_source, sst_file_handle, sst_region_metadata,
+        new_batch_with_binary, new_source, sst_file_handle, sst_region_metadata,
     };
     use crate::test_util::{check_reader_result, TestEnv};

@@ -412,24 +409,6 @@ mod tests {
         check_reader_result(&mut reader, &[new_batch_by_range(&["b", "h"], 150, 200)]).await;
     }

-    fn customize_column_config(
-        builder: WriterPropertiesBuilder,
-        region_metadata: &RegionMetadataRef,
-    ) -> WriterPropertiesBuilder {
-        let ts_col = ColumnPath::new(vec![region_metadata
-            .time_index_column()
-            .column_schema
-            .name
-            .clone()]);
-        let seq_col = ColumnPath::new(vec![SEQUENCE_COLUMN_NAME.to_string()]);
-
-        builder
-            .set_column_encoding(seq_col.clone(), Encoding::DELTA_BINARY_PACKED)
-            .set_column_dictionary_enabled(seq_col, false)
-            .set_column_encoding(ts_col.clone(), Encoding::DELTA_BINARY_PACKED)
-            .set_column_dictionary_enabled(ts_col, false)
-    }
-
     #[tokio::test]
     async fn test_read_large_binary() {
         let mut env = TestEnv::new();
@@ -452,10 +431,9 @@ mod tests {
             .set_encoding(Encoding::PLAIN)
             .set_max_row_group_size(write_opts.row_group_size);

-        let props_builder = customize_column_config(props_builder, &metadata);
         let writer_props = props_builder.build();

-        let write_format = WriteFormat::new(metadata.clone());
+        let write_format = WriteFormat::new(metadata);
         let fields: Vec<_> = write_format
             .arrow_schema()
             .fields()
@@ -474,12 +452,8 @@ mod tests {

         // Ensures field_0 has LargeBinary type.
         assert_eq!(
-            DataType::LargeBinary,
-            arrow_schema
-                .field_with_name("field_0")
-                .unwrap()
-                .data_type()
-                .clone()
+            &DataType::LargeBinary,
+            arrow_schema.field_with_name("field_0").unwrap().data_type()
         );
         let mut buffered_writer = BufferedWriter::try_new(
             file_path.clone(),
@@ -492,7 +466,7 @@ mod tests {
         .await
         .unwrap();

-        let batch = new_batch_with_large_binary(&["a"], 0, 60);
+        let batch = new_batch_with_binary(&["a"], 0, 60);
         let arrow_batch = write_format.convert_batch(&batch).unwrap();
         let arrays: Vec<_> = arrow_batch
             .columns()
@@ -516,8 +490,8 @@ mod tests {
         check_reader_result(
             &mut reader,
             &[
-                new_batch_with_large_binary(&["a"], 0, 50),
-                new_batch_with_large_binary(&["a"], 50, 60),
+                new_batch_with_binary(&["a"], 0, 50),
+                new_batch_with_binary(&["a"], 50, 60),
             ],
         )
         .await;
diff --git a/src/mito2/src/test_util/sst_util.rs b/src/mito2/src/test_util/sst_util.rs
index 33fa611e1d43..e2c627a1816c 100644
--- a/src/mito2/src/test_util/sst_util.rs
+++ b/src/mito2/src/test_util/sst_util.rs
@@ -18,9 +18,7 @@ use std::sync::Arc;

 use api::v1::{OpType, SemanticType};
 use common_time::Timestamp;
-use datatypes::arrow::array::{
-    LargeBinaryArray, TimestampMillisecondArray, UInt64Array, UInt8Array,
-};
+use datatypes::arrow::array::{BinaryArray, TimestampMillisecondArray, UInt64Array, UInt8Array};
 use datatypes::prelude::ConcreteDataType;
 use datatypes::schema::ColumnSchema;
 use datatypes::value::ValueRef;
@@ -133,7 +131,7 @@ pub fn new_batch_by_range(tags: &[&str], start: usize, end: usize) -> Batch {
         .unwrap()
 }

-pub fn new_batch_with_large_binary(tags: &[&str], start: usize, end: usize) -> Batch {
+pub fn new_batch_with_binary(tags: &[&str], start: usize, end: usize) -> Batch {
     assert!(end >= start);
     let pk = new_primary_key(tags);
     let timestamps: Vec<_> = (start..end).map(|v| v as i64).collect();
@@ -158,7 +156,7 @@ pub fn new_batch_with_binary(tags: &[&str], start: usize, end: usize) -> Batch {
             op_types.iter().map(|v| *v as u8),
         )))
         .unwrap()
-        .push_field_array(1, Arc::new(LargeBinaryArray::from_iter_values(field)))
+        .push_field_array(1, Arc::new(BinaryArray::from_iter_values(field)))
         .unwrap();
     builder.build().unwrap()
 }