chore: change binary array type from LargeBinaryArray to BinaryArray #3924

Merged: 12 commits, May 18, 2024
src/mito2/src/sst/parquet.rs (61 changes: 51 additions & 10 deletions)
@@ -83,6 +83,9 @@ mod tests {
     use common_time::Timestamp;
     use datafusion_common::{Column, ScalarValue};
     use datafusion_expr::{BinaryExpr, Expr, Operator};
+    use datatypes::arrow;
+    use datatypes::arrow::array::RecordBatch;
+    use datatypes::arrow::datatypes::{DataType, Field, Schema};
     use parquet::basic::{Compression, Encoding, ZstdLevel};
     use parquet::file::metadata::KeyValue;
     use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
@@ -100,7 +103,7 @@ mod tests {
     use crate::sst::DEFAULT_WRITE_CONCURRENCY;
     use crate::test_util::sst_util::{
         assert_parquet_metadata_eq, build_test_binary_test_region_metadata, new_batch_by_range,
-        new_batch_with_binary_by_range, new_source, sst_file_handle, sst_region_metadata,
+        new_batch_with_large_binary, new_source, sst_file_handle, sst_region_metadata,
     };
     use crate::test_util::{check_reader_result, TestEnv};
 
@@ -453,30 +456,68 @@ mod tests {
         let writer_props = props_builder.build();
 
         let write_format = WriteFormat::new(metadata.clone());
-        let string = file_path.clone();
+        let fields: Vec<_> = write_format
+            .arrow_schema()
+            .fields()
+            .into_iter()
+            .map(|field| {
+                let data_type = field.data_type().clone();
+                if data_type == DataType::Binary {
+                    Field::new(field.name(), DataType::LargeBinary, field.is_nullable())
+                } else {
+                    Field::new(field.name(), data_type, field.is_nullable())
+                }
+            })
+            .collect();
+
+        let arrow_schema = Arc::new(Schema::new(fields));
+
+        // Ensures field_0 has LargeBinary type.
+        assert_eq!(
+            DataType::LargeBinary,
+            arrow_schema
+                .field_with_name("field_0")
+                .unwrap()
+                .data_type()
+                .clone()
+        );
         let mut buffered_writer = BufferedWriter::try_new(
-            string,
+            file_path.clone(),
             object_store.clone(),
-            write_format.arrow_schema(),
+            arrow_schema.clone(),
             Some(writer_props),
             write_opts.write_buffer_size.as_bytes() as usize,
             DEFAULT_WRITE_CONCURRENCY,
         )
        .await
        .unwrap();
-        let batch = new_batch_with_binary_by_range(&["a"], 0, 60);
-        let arrow_batch = write_format.convert_batch(&batch).unwrap();
-
-        buffered_writer.write(&arrow_batch).await.unwrap();
-
+        let batch = new_batch_with_large_binary(&["a"], 0, 60);
+        let arrow_batch = write_format.convert_batch(&batch).unwrap();
+        let arrays: Vec<_> = arrow_batch
+            .columns()
+            .iter()
+            .map(|array| {
+                let data_type = array.data_type().clone();
+                if data_type == DataType::Binary {
+                    arrow::compute::cast(array, &DataType::LargeBinary).unwrap()
+                } else {
+                    array.clone()
+                }
+            })
+            .collect();
+        let result = RecordBatch::try_new(arrow_schema, arrays).unwrap();
+
+        buffered_writer.write(&result).await.unwrap();
         buffered_writer.close().await.unwrap();
 
         let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store);
         let mut reader = builder.build().await.unwrap();
         check_reader_result(
             &mut reader,
             &[
-                new_batch_with_binary_by_range(&["a"], 0, 50),
-                new_batch_with_binary_by_range(&["a"], 50, 60),
+                new_batch_with_large_binary(&["a"], 0, 50),
+                new_batch_with_large_binary(&["a"], 50, 60),
             ],
         )
         .await;
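The heart of the test above is the Binary to LargeBinary conversion: the write schema swaps every `DataType::Binary` field for `DataType::LargeBinary`, and each Binary column is cast to match before the batch is written. A minimal standalone sketch of that cast path, using the `arrow` crate directly (the column name and values here are illustrative, not taken from the PR):

```rust
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, BinaryArray, RecordBatch};
use arrow::compute;
use arrow::datatypes::{DataType, Field, Schema};

fn main() {
    // Writer-side schema: the column is declared as LargeBinary (i64 offsets).
    let schema = Arc::new(Schema::new(vec![Field::new(
        "field_0",
        DataType::LargeBinary,
        true,
    )]));

    // The in-memory column arrives as plain Binary (i32 offsets).
    let binary: ArrayRef = Arc::new(BinaryArray::from_iter_values([
        b"some data".as_ref(),
        b"more data".as_ref(),
    ]));

    // Cast the column to match the writer schema before assembling the batch.
    let large = compute::cast(&binary, &DataType::LargeBinary).unwrap();
    let batch = RecordBatch::try_new(schema, vec![large]).unwrap();
    assert_eq!(batch.column(0).data_type(), &DataType::LargeBinary);
}
```

The cast only widens the offsets buffer from i32 to i64; the value bytes themselves are unchanged, so the conversion is cheap relative to rewriting the data.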
src/mito2/src/test_util.rs (34 changes: 1 addition & 33 deletions)
@@ -33,9 +33,7 @@ use api::v1::{OpType, Row, Rows, SemanticType};
 use common_base::readable_size::ReadableSize;
 use common_datasource::compression::CompressionType;
 use common_test_util::temp_dir::{create_temp_dir, TempDir};
-use datatypes::arrow::array::{
-    LargeBinaryArray, TimestampMillisecondArray, UInt64Array, UInt8Array,
-};
+use datatypes::arrow::array::{TimestampMillisecondArray, UInt64Array, UInt8Array};
 use datatypes::prelude::ConcreteDataType;
 use datatypes::schema::ColumnSchema;
 use log_store::raft_engine::log_store::RaftEngineLogStore;
@@ -560,36 +558,6 @@ impl Iterator for VecBatchReader {
     }
 }
 
-pub fn new_binary_batch_builder(
-    primary_key: &[u8],
-    timestamps: &[i64],
-    sequences: &[u64],
-    op_types: &[OpType],
-    field_column_id: ColumnId,
-    field: Vec<Vec<u8>>,
-) -> BatchBuilder {
-    let mut builder = BatchBuilder::new(primary_key.to_vec());
-    builder
-        .timestamps_array(Arc::new(TimestampMillisecondArray::from_iter_values(
-            timestamps.iter().copied(),
-        )))
-        .unwrap()
-        .sequences_array(Arc::new(UInt64Array::from_iter_values(
-            sequences.iter().copied(),
-        )))
-        .unwrap()
-        .op_types_array(Arc::new(UInt8Array::from_iter_values(
-            op_types.iter().map(|v| *v as u8),
-        )))
-        .unwrap()
-        .push_field_array(
-            field_column_id,
-            Arc::new(LargeBinaryArray::from_iter_values(field)),
-        )
-        .unwrap();
-    builder
-}
-
 pub fn new_batch_builder(
     primary_key: &[u8],
     timestamps: &[i64],
src/mito2/src/test_util/sst_util.rs (33 changes: 24 additions & 9 deletions)
@@ -18,6 +18,9 @@ use std::sync::Arc;
 
 use api::v1::{OpType, SemanticType};
 use common_time::Timestamp;
+use datatypes::arrow::array::{
+    LargeBinaryArray, TimestampMillisecondArray, UInt64Array, UInt8Array,
+};
 use datatypes::prelude::ConcreteDataType;
 use datatypes::schema::ColumnSchema;
 use datatypes::value::ValueRef;
@@ -27,12 +30,10 @@ use store_api::metadata::{
 };
 use store_api::storage::RegionId;
 
-use crate::read::{Batch, Source};
+use crate::read::{Batch, BatchBuilder, Source};
 use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
 use crate::sst::file::{FileHandle, FileId, FileMeta};
-use crate::test_util::{
-    new_batch_builder, new_binary_batch_builder, new_noop_file_purger, VecBatchReader,
-};
+use crate::test_util::{new_batch_builder, new_noop_file_purger, VecBatchReader};
 
 /// Test region id.
 const REGION_ID: RegionId = RegionId::new(0, 0);
@@ -132,7 +133,7 @@ pub fn new_batch_by_range(tags: &[&str], start: usize, end: usize) -> Batch {
         .unwrap()
 }
 
-pub fn new_batch_with_binary_by_range(tags: &[&str], start: usize, end: usize) -> Batch {
+pub fn new_batch_with_large_binary(tags: &[&str], start: usize, end: usize) -> Batch {
     assert!(end >= start);
     let pk = new_primary_key(tags);
     let timestamps: Vec<_> = (start..end).map(|v| v as i64).collect();
@@ -143,9 +144,23 @@ pub fn new_batch_with_binary_by_range(tags: &[&str], start: usize, end: usize) -> Batch {
         .map(|_v| "some data".as_bytes().to_vec())
         .collect();
 
-    new_binary_batch_builder(&pk, &timestamps, &sequences, &op_types, 1, field)
-        .build()
-        .unwrap()
+    let mut builder = BatchBuilder::new(pk);
+    builder
+        .timestamps_array(Arc::new(TimestampMillisecondArray::from_iter_values(
+            timestamps.iter().copied(),
+        )))
+        .unwrap()
+        .sequences_array(Arc::new(UInt64Array::from_iter_values(
+            sequences.iter().copied(),
+        )))
+        .unwrap()
+        .op_types_array(Arc::new(UInt8Array::from_iter_values(
+            op_types.iter().map(|v| *v as u8),
+        )))
+        .unwrap()
+        .push_field_array(1, Arc::new(LargeBinaryArray::from_iter_values(field)))
+        .unwrap();
+    builder.build().unwrap()
 }
 
 /// ParquetMetaData doesn't implement `PartialEq` trait, check internal fields manually
@@ -174,7 +189,7 @@ pub fn assert_parquet_metadata_eq(a: Arc<ParquetMetaData>, b: Arc<ParquetMetaData>)
 
 /// Creates a new region metadata for testing SSTs with binary datatype.
 ///
-/// Schema: tag_0, field_1(binary), ts
+/// Schema: tag_0(string), field_0(binary), ts
 pub fn build_test_binary_test_region_metadata() -> RegionMetadataRef {
     let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
     builder
@@ -188,7 +203,7 @@ pub fn build_test_binary_test_region_metadata() -> RegionMetadataRef {
             column_id: 0,
         })
         .push_column_metadata(ColumnMetadata {
-            column_schema: ColumnSchema::new("field_1", ConcreteDataType::binary_datatype(), true),
+            column_schema: ColumnSchema::new("field_0", ConcreteDataType::binary_datatype(), true),
             semantic_type: SemanticType::Field,
             column_id: 1,
         })
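For context on what the reader half of the test verifies: when a batch with LargeBinary columns goes through the parquet crate's arrow writer, the arrow schema is embedded in the file metadata, so reading the file back yields the same logical type. A self-contained sketch of that round trip (the file path and row count are made up, and it uses the stock `ArrowWriter` rather than this crate's `BufferedWriter`):

```rust
use std::fs::File;
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, LargeBinaryArray, RecordBatch};
use arrow::datatypes::{DataType, Field, Schema};
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::arrow::ArrowWriter;

fn main() {
    let schema = Arc::new(Schema::new(vec![Field::new(
        "field_0",
        DataType::LargeBinary,
        true,
    )]));
    let col: ArrayRef = Arc::new(LargeBinaryArray::from_iter_values(
        (0..60).map(|_| b"some data".to_vec()),
    ));
    let batch = RecordBatch::try_new(schema.clone(), vec![col]).unwrap();

    // Write a single batch to a parquet file.
    let file = File::create("/tmp/large_binary.parquet").unwrap();
    let mut writer = ArrowWriter::try_new(file, schema, None).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();

    // Read it back and confirm the logical type survived the round trip.
    let file = File::open("/tmp/large_binary.parquet").unwrap();
    let reader = ParquetRecordBatchReaderBuilder::try_new(file)
        .unwrap()
        .build()
        .unwrap();
    for batch in reader {
        let batch = batch.unwrap();
        assert_eq!(batch.column(0).data_type(), &DataType::LargeBinary);
    }
}
```

On disk both Binary and LargeBinary become parquet BYTE_ARRAY columns; only the embedded arrow schema distinguishes them on read, which is why the test asserts the type on the schema rather than on the file's physical layout.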