chore: change binary array type from LargeBinaryArray to BinaryArray #3924

Merged · 12 commits · May 18, 2024
4 changes: 2 additions & 2 deletions src/datatypes/src/arrow_array.rs
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

-pub type BinaryArray = arrow::array::LargeBinaryArray;
-pub type MutableBinaryArray = arrow::array::LargeBinaryBuilder;
+pub type BinaryArray = arrow::array::BinaryArray;
+pub type MutableBinaryArray = arrow::array::BinaryBuilder;
pub type StringArray = arrow::array::StringArray;
pub type MutableStringArray = arrow::array::StringBuilder;
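For context, the aliases above now resolve to arrow's 32-bit-offset binary types: `BinaryArray` indexes its value buffer with `i32` offsets (so a single array tops out around 2 GiB of payload), while the old `LargeBinaryArray` used `i64`. A minimal sketch of the difference, assuming only the `arrow` crate (not part of this diff):

```rust
use arrow::array::{Array, BinaryArray, LargeBinaryArray};

fn main() {
    // Identical logical contents; only the offset width differs.
    let narrow = BinaryArray::from(vec!["hello".as_bytes(), "world".as_bytes()]);
    let wide = LargeBinaryArray::from(vec!["hello".as_bytes(), "world".as_bytes()]);

    assert_eq!(narrow.value(1), wide.value(1)); // both yield b"world"
    // i32 offsets halve the offset-buffer memory; i64 offsets lift the
    // ~2 GiB per-array payload limit that i32 imposes.
    assert_eq!(narrow.len(), wide.len());
}
```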
2 changes: 1 addition & 1 deletion src/datatypes/src/types/binary_type.rs
@@ -47,7 +47,7 @@ impl DataType for BinaryType {
}

fn as_arrow_type(&self) -> ArrowDataType {
-ArrowDataType::Binary
+ArrowDataType::Binary
}

fn create_mutable_vector(&self, capacity: usize) -> Box<dyn MutableVector> {
8 changes: 4 additions & 4 deletions src/datatypes/src/value.rs
@@ -342,7 +342,7 @@ impl Value {
Value::Float32(v) => ScalarValue::Float32(Some(v.0)),
Value::Float64(v) => ScalarValue::Float64(Some(v.0)),
Value::String(v) => ScalarValue::Utf8(Some(v.as_utf8().to_string())),
-Value::Binary(v) => ScalarValue::LargeBinary(Some(v.to_vec())),
+Value::Binary(v) => ScalarValue::Binary(Some(v.to_vec())),
Value::Date(v) => ScalarValue::Date32(Some(v.val())),
Value::DateTime(v) => ScalarValue::Date64(Some(v.val())),
Value::Null => to_null_scalar_value(output_type)?,
@@ -413,7 +413,7 @@ pub fn to_null_scalar_value(output_type: &ConcreteDataType) -> Result<ScalarValue>
ConcreteDataType::UInt64(_) => ScalarValue::UInt64(None),
ConcreteDataType::Float32(_) => ScalarValue::Float32(None),
ConcreteDataType::Float64(_) => ScalarValue::Float64(None),
-ConcreteDataType::Binary(_) => ScalarValue::LargeBinary(None),
+ConcreteDataType::Binary(_) => ScalarValue::Binary(None),
ConcreteDataType::String(_) => ScalarValue::Utf8(None),
ConcreteDataType::Date(_) => ScalarValue::Date32(None),
ConcreteDataType::DateTime(_) => ScalarValue::Date64(None),
@@ -2105,7 +2105,7 @@ mod tests {
.unwrap()
);
assert_eq!(
-ScalarValue::LargeBinary(Some("world".as_bytes().to_vec())),
+ScalarValue::Binary(Some("world".as_bytes().to_vec())),
Value::Binary(Bytes::from("world".as_bytes()))
.try_to_scalar_value(&ConcreteDataType::binary_datatype())
.unwrap()
@@ -2187,7 +2187,7 @@
.unwrap()
);
assert_eq!(
-ScalarValue::LargeBinary(None),
+ScalarValue::Binary(None),
Value::Null
.try_to_scalar_value(&ConcreteDataType::binary_datatype())
.unwrap()
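A small sketch of the resulting `ScalarValue` mapping, assuming a recent `datafusion_common` (the `data_type` and `is_null` helpers are standard `ScalarValue` methods, not part of this diff):

```rust
use arrow::datatypes::DataType;
use datafusion_common::ScalarValue;

fn main() {
    // Binary values and typed binary nulls both carry the i32-offset type now.
    let value = ScalarValue::Binary(Some(b"world".to_vec()));
    let null = ScalarValue::Binary(None);

    assert_eq!(DataType::Binary, value.data_type());
    assert!(null.is_null());
}
```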
10 changes: 9 additions & 1 deletion src/datatypes/src/vectors/binary.rs
@@ -52,6 +52,14 @@ impl From<Vec<Option<Vec<u8>>>> for BinaryVector {
}
}

+impl From<Vec<&[u8]>> for BinaryVector {
+    fn from(data: Vec<&[u8]>) -> Self {
+        Self {
+            array: BinaryArray::from_iter_values(data),
+        }
+    }
+}

impl Vector for BinaryVector {
fn data_type(&self) -> ConcreteDataType {
ConcreteDataType::binary_datatype()
@@ -257,7 +265,7 @@ mod tests {

let arrow_arr = v.to_arrow_array();
assert_eq!(2, arrow_arr.len());
-assert_eq!(&ArrowDataType::LargeBinary, arrow_arr.data_type());
+assert_eq!(&ArrowDataType::Binary, arrow_arr.data_type());
}

#[test]
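A usage sketch for the new `From<Vec<&[u8]>>` conversion; the paths assume this repo's `datatypes` crate, and the input must be null-free since the impl delegates to `from_iter_values`:

```rust
use datatypes::vectors::{BinaryVector, Vector};

fn main() {
    // Fixture construction becomes a one-liner for non-nullable input;
    // the existing From<Vec<Option<Vec<u8>>>> impl still covers nulls.
    let v = BinaryVector::from(vec!["hello".as_bytes(), "world".as_bytes()]);
    assert_eq!(2, v.len());
    assert_eq!(0, v.null_count());
}
```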
62 changes: 53 additions & 9 deletions src/datatypes/src/vectors/helper.rs
@@ -258,9 +258,9 @@ impl Helper {
Ok(match array.as_ref().data_type() {
ArrowDataType::Null => Arc::new(NullVector::try_from_arrow_array(array)?),
ArrowDataType::Boolean => Arc::new(BooleanVector::try_from_arrow_array(array)?),
-ArrowDataType::LargeBinary => Arc::new(BinaryVector::try_from_arrow_array(array)?),
-ArrowDataType::FixedSizeBinary(_) | ArrowDataType::Binary => {
-    let array = arrow::compute::cast(array.as_ref(), &ArrowDataType::LargeBinary)
+ArrowDataType::Binary => Arc::new(BinaryVector::try_from_arrow_array(array)?),
+ArrowDataType::LargeBinary | ArrowDataType::FixedSizeBinary(_) => {
+    let array = arrow::compute::cast(array.as_ref(), &ArrowDataType::Binary)
.context(crate::error::ArrowComputeSnafu)?;
Arc::new(BinaryVector::try_from_arrow_array(array)?)
}
@@ -278,7 +278,7 @@ impl Helper {
ArrowDataType::LargeUtf8 => {
let array = arrow::compute::cast(array.as_ref(), &ArrowDataType::Utf8)
.context(crate::error::ArrowComputeSnafu)?;
-Arc::new(BinaryVector::try_from_arrow_array(array)?)
+Arc::new(StringVector::try_from_arrow_array(array)?)
}
ArrowDataType::Date32 => Arc::new(DateVector::try_from_arrow_array(array)?),
ArrowDataType::Date64 => Arc::new(DateTimeVector::try_from_arrow_array(array)?),
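The match arm above now funnels `LargeBinary` and `FixedSizeBinary` through a cast to the narrow layout instead of the other way around. A standalone sketch of that cast using plain `arrow` APIs (error handling reduced to `unwrap`):

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, LargeBinaryArray};
use arrow::compute::cast;
use arrow::datatypes::DataType;

fn main() {
    let wide: ArrayRef = Arc::new(LargeBinaryArray::from(vec!["a".as_bytes()]));
    // Lossless unless the total value bytes overflow i32, in which case
    // cast() returns an error rather than truncating.
    let narrow = cast(wide.as_ref(), &DataType::Binary).unwrap();
    assert_eq!(&DataType::Binary, narrow.data_type());
}
```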
@@ -402,8 +402,10 @@ mod tests {
TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
TimestampSecondArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
};
+use arrow::buffer::Buffer;
use arrow::datatypes::Int32Type;
-use arrow_array::DictionaryArray;
+use arrow_array::{BinaryArray, DictionaryArray, FixedSizeBinaryArray, LargeStringArray};
+use arrow_schema::DataType;
use common_decimal::Decimal128;
use common_time::time::Time;
use common_time::timestamp::TimeUnit;
@@ -576,10 +578,6 @@
fn test_try_into_vector() {
check_try_into_vector(NullArray::new(2));
check_try_into_vector(BooleanArray::from(vec![true, false]));
-check_try_into_vector(LargeBinaryArray::from(vec![
-    "hello".as_bytes(),
-    "world".as_bytes(),
-]));
check_try_into_vector(Int8Array::from(vec![1, 2, 3]));
check_try_into_vector(Int16Array::from(vec![1, 2, 3]));
check_try_into_vector(Int32Array::from(vec![1, 2, 3]));
@@ -611,6 +609,52 @@
Helper::try_into_vector(array).unwrap_err();
}

+    #[test]
+    fn test_try_binary_array_into_vector() {
+        let input_vec: Vec<&[u8]> = vec!["hello".as_bytes(), "world".as_bytes()];
+        let assertion_vector = BinaryVector::from(input_vec.clone());
+
+        let input_arrays: Vec<ArrayRef> = vec![
+            Arc::new(LargeBinaryArray::from(input_vec.clone())) as ArrayRef,
+            Arc::new(BinaryArray::from(input_vec.clone())) as ArrayRef,
+            Arc::new(FixedSizeBinaryArray::new(
+                5,
+                Buffer::from_vec("helloworld".as_bytes().to_vec()),
+                None,
+            )) as ArrayRef,
+        ];
+
+        for input_array in input_arrays {
+            let vector = Helper::try_into_vector(input_array).unwrap();
+
+            assert_eq!(2, vector.len());
+            assert_eq!(0, vector.null_count());
+
+            let output_arrow_array: ArrayRef = vector.to_arrow_array();
+            assert_eq!(&DataType::Binary, output_arrow_array.data_type());
+            assert_eq!(&assertion_vector.to_arrow_array(), &output_arrow_array);
+        }
+    }
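One note on the fixed-width input above: `FixedSizeBinaryArray::new(5, …)` slices its flat buffer into equal 5-byte values, so `"helloworld"` decodes as exactly two entries. A tiny sketch, assuming the `FixedSizeBinaryArray::new(size, values, nulls)` constructor from recent arrow releases:

```rust
use arrow::array::{Array, FixedSizeBinaryArray};
use arrow::buffer::Buffer;

fn main() {
    // A flat 10-byte buffer with element width 5 yields two values.
    let array = FixedSizeBinaryArray::new(5, Buffer::from_vec(b"helloworld".to_vec()), None);
    assert_eq!(2, array.len());
    assert_eq!(b"world".as_slice(), array.value(1));
}
```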

+    #[test]
+    fn test_large_string_array_into_vector() {
+        let input_vec = vec!["a", "b"];
+        let assertion_array = StringArray::from(input_vec.clone());
+
+        let large_string_array: ArrayRef = Arc::new(LargeStringArray::from(input_vec));
+        let vector = Helper::try_into_vector(large_string_array).unwrap();
+        assert_eq!(2, vector.len());
+        assert_eq!(0, vector.null_count());
+
+        let output_arrow_array: StringArray = vector
+            .to_arrow_array()
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap()
+            .clone();
+        assert_eq!(&assertion_array, &output_arrow_array);
+    }

#[test]
fn test_try_from_scalar_time_value() {
let vector = Helper::try_from_scalar_value(ScalarValue::Time32Second(Some(42)), 3).unwrap();
4 changes: 0 additions & 4 deletions src/mito2/src/memtable/partition_tree/dict.rs
@@ -281,10 +281,6 @@ impl Drop for KeyDict {

/// Buffer to store unsorted primary keys.
struct KeyBuffer {
-// We use arrow's binary builder as out default binary builder
-// is LargeBinaryBuilder
-// TODO(yingwen): Change the type binary vector to Binary instead of LargeBinary.
/// Builder for binary key array.
key_builder: BinaryBuilder,
next_pk_index: usize,
}
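The deleted comment existed because `KeyBuffer` had to import arrow's `BinaryBuilder` directly while the project alias still meant `LargeBinaryBuilder`; with this PR the alias and the import agree, so the explanation and its TODO can go. For reference, a minimal sketch of the builder in question, using plain `arrow` APIs:

```rust
use arrow::array::{Array, BinaryBuilder};

fn main() {
    // BinaryBuilder accumulates variable-length keys into a single BinaryArray.
    let mut key_builder = BinaryBuilder::new();
    key_builder.append_value(b"pk-0");
    key_builder.append_value(b"pk-1");

    let keys = key_builder.finish();
    assert_eq!(2, keys.len());
    assert_eq!(b"pk-1".as_slice(), keys.value(1));
}
```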
86 changes: 84 additions & 2 deletions src/mito2/src/sst/parquet.rs
@@ -79,19 +79,28 @@ pub struct SstInfo {
mod tests {
use std::sync::Arc;

+use common_datasource::file_format::parquet::BufferedWriter;
use common_time::Timestamp;
use datafusion_common::{Column, ScalarValue};
use datafusion_expr::{BinaryExpr, Expr, Operator};
+use parquet::basic::{Compression, Encoding, ZstdLevel};
+use parquet::file::metadata::KeyValue;
+use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
+use parquet::schema::types::ColumnPath;
+use store_api::metadata::RegionMetadataRef;
+use store_api::storage::consts::SEQUENCE_COLUMN_NAME;
use table::predicate::Predicate;

use super::*;
use crate::cache::{CacheManager, PageKey};
use crate::sst::index::Indexer;
+use crate::sst::parquet::format::WriteFormat;
use crate::sst::parquet::reader::ParquetReaderBuilder;
use crate::sst::parquet::writer::ParquetWriter;
+use crate::sst::DEFAULT_WRITE_CONCURRENCY;
use crate::test_util::sst_util::{
-    assert_parquet_metadata_eq, new_batch_by_range, new_source, sst_file_handle,
-    sst_region_metadata,
+    assert_parquet_metadata_eq, build_test_binary_test_region_metadata, new_batch_by_range,
+    new_batch_with_binary_by_range, new_source, sst_file_handle, sst_region_metadata,
};
use crate::test_util::{check_reader_result, TestEnv};

@@ -399,4 +408,77 @@ mod tests {
let mut reader = builder.build().await.unwrap();
check_reader_result(&mut reader, &[new_batch_by_range(&["b", "h"], 150, 200)]).await;
}

+    fn customize_column_config(
+        builder: WriterPropertiesBuilder,
+        region_metadata: &RegionMetadataRef,
+    ) -> WriterPropertiesBuilder {
+        let ts_col = ColumnPath::new(vec![region_metadata
+            .time_index_column()
+            .column_schema
+            .name
+            .clone()]);
+        let seq_col = ColumnPath::new(vec![SEQUENCE_COLUMN_NAME.to_string()]);
+
+        builder
+            .set_column_encoding(seq_col.clone(), Encoding::DELTA_BINARY_PACKED)
+            .set_column_dictionary_enabled(seq_col, false)
+            .set_column_encoding(ts_col.clone(), Encoding::DELTA_BINARY_PACKED)
+            .set_column_dictionary_enabled(ts_col, false)
+    }

+    #[tokio::test]
+    async fn test_read_large_binary() {
+        let mut env = TestEnv::new();
+        let object_store = env.init_object_store_manager();
+        let handle = sst_file_handle(0, 1000);
+        let file_path = handle.file_path(FILE_DIR);
+
+        let write_opts = WriteOptions {
+            row_group_size: 50,
+            ..Default::default()
+        };
+
+        let metadata = build_test_binary_test_region_metadata();
+        let json = metadata.to_json().unwrap();
+        let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);
+
+        let props_builder = WriterProperties::builder()
+            .set_key_value_metadata(Some(vec![key_value_meta]))
+            .set_compression(Compression::ZSTD(ZstdLevel::default()))
+            .set_encoding(Encoding::PLAIN)
+            .set_max_row_group_size(write_opts.row_group_size);
+
+        let props_builder = customize_column_config(props_builder, &metadata);
+        let writer_props = props_builder.build();
+
+        let write_format = WriteFormat::new(metadata.clone());
+        let string = file_path.clone();
+        let mut buffered_writer = BufferedWriter::try_new(
+            string,
+            object_store.clone(),
+            write_format.arrow_schema(),
+            Some(writer_props),
+            write_opts.write_buffer_size.as_bytes() as usize,
+            DEFAULT_WRITE_CONCURRENCY,
+        )
+        .await
+        .unwrap();
+        let batch = new_batch_with_binary_by_range(&["a"], 0, 60);
+        let arrow_batch = write_format.convert_batch(&batch).unwrap();

+        buffered_writer.write(&arrow_batch).await.unwrap();
+
+        buffered_writer.close().await.unwrap();
+        let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store);
+        let mut reader = builder.build().await.unwrap();
+        check_reader_result(
+            &mut reader,
+            &[
+                new_batch_with_binary_by_range(&["a"], 0, 50),
+                new_batch_with_binary_by_range(&["a"], 50, 60),
+            ],
+        )
+        .await;
+    }
}
34 changes: 33 additions & 1 deletion src/mito2/src/test_util.rs
@@ -33,7 +33,9 @@ use api::v1::{OpType, Row, Rows, SemanticType};
use common_base::readable_size::ReadableSize;
use common_datasource::compression::CompressionType;
use common_test_util::temp_dir::{create_temp_dir, TempDir};
-use datatypes::arrow::array::{TimestampMillisecondArray, UInt64Array, UInt8Array};
+use datatypes::arrow::array::{
+    LargeBinaryArray, TimestampMillisecondArray, UInt64Array, UInt8Array,
+};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use log_store::raft_engine::log_store::RaftEngineLogStore;
@@ -558,6 +560,36 @@ impl Iterator for VecBatchReader {
}
}

+pub fn new_binary_batch_builder(
+    primary_key: &[u8],
+    timestamps: &[i64],
+    sequences: &[u64],
+    op_types: &[OpType],
+    field_column_id: ColumnId,
+    field: Vec<Vec<u8>>,
+) -> BatchBuilder {
+    let mut builder = BatchBuilder::new(primary_key.to_vec());
+    builder
+        .timestamps_array(Arc::new(TimestampMillisecondArray::from_iter_values(
+            timestamps.iter().copied(),
+        )))
+        .unwrap()
+        .sequences_array(Arc::new(UInt64Array::from_iter_values(
+            sequences.iter().copied(),
+        )))
+        .unwrap()
+        .op_types_array(Arc::new(UInt8Array::from_iter_values(
+            op_types.iter().map(|v| *v as u8),
+        )))
+        .unwrap()
+        .push_field_array(
+            field_column_id,
+            Arc::new(LargeBinaryArray::from_iter_values(field)),
+        )
+        .unwrap();
+    builder
+}

pub fn new_batch_builder(
primary_key: &[u8],
timestamps: &[i64],