Skip to content

Commit

Permalink
chore: change binary array type from LargeBinaryArray to BinaryArray (G…
Browse files Browse the repository at this point in the history
…reptimeTeam#3924)

* chore: change binary array type from LargeBinaryArray to BinaryArray

* fix: adjust try_into_vector logic

* fix: apply CR suggestions, add tests

* chore: fix failing test

* chore: fix integration test

* chore: adjust the assertions according to changed implementation

* chore: add a test with LargeBinary type

* chore: apply CR suggestions

* chore: simplify tests
  • Loading branch information
etolbakov authored and WenyXu committed May 21, 2024
1 parent 00884c5 commit b821fd7
Show file tree
Hide file tree
Showing 9 changed files with 237 additions and 26 deletions.
4 changes: 2 additions & 2 deletions src/datatypes/src/arrow_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

pub type BinaryArray = arrow::array::LargeBinaryArray;
pub type MutableBinaryArray = arrow::array::LargeBinaryBuilder;
pub type BinaryArray = arrow::array::BinaryArray;
pub type MutableBinaryArray = arrow::array::BinaryBuilder;
pub type StringArray = arrow::array::StringArray;
pub type MutableStringArray = arrow::array::StringBuilder;
2 changes: 1 addition & 1 deletion src/datatypes/src/types/binary_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ impl DataType for BinaryType {
}

fn as_arrow_type(&self) -> ArrowDataType {
ArrowDataType::LargeBinary
ArrowDataType::Binary
}

fn create_mutable_vector(&self, capacity: usize) -> Box<dyn MutableVector> {
Expand Down
8 changes: 4 additions & 4 deletions src/datatypes/src/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ impl Value {
Value::Float32(v) => ScalarValue::Float32(Some(v.0)),
Value::Float64(v) => ScalarValue::Float64(Some(v.0)),
Value::String(v) => ScalarValue::Utf8(Some(v.as_utf8().to_string())),
Value::Binary(v) => ScalarValue::LargeBinary(Some(v.to_vec())),
Value::Binary(v) => ScalarValue::Binary(Some(v.to_vec())),
Value::Date(v) => ScalarValue::Date32(Some(v.val())),
Value::DateTime(v) => ScalarValue::Date64(Some(v.val())),
Value::Null => to_null_scalar_value(output_type)?,
Expand Down Expand Up @@ -413,7 +413,7 @@ pub fn to_null_scalar_value(output_type: &ConcreteDataType) -> Result<ScalarValu
ConcreteDataType::UInt64(_) => ScalarValue::UInt64(None),
ConcreteDataType::Float32(_) => ScalarValue::Float32(None),
ConcreteDataType::Float64(_) => ScalarValue::Float64(None),
ConcreteDataType::Binary(_) => ScalarValue::LargeBinary(None),
ConcreteDataType::Binary(_) => ScalarValue::Binary(None),
ConcreteDataType::String(_) => ScalarValue::Utf8(None),
ConcreteDataType::Date(_) => ScalarValue::Date32(None),
ConcreteDataType::DateTime(_) => ScalarValue::Date64(None),
Expand Down Expand Up @@ -2105,7 +2105,7 @@ mod tests {
.unwrap()
);
assert_eq!(
ScalarValue::LargeBinary(Some("world".as_bytes().to_vec())),
ScalarValue::Binary(Some("world".as_bytes().to_vec())),
Value::Binary(Bytes::from("world".as_bytes()))
.try_to_scalar_value(&ConcreteDataType::binary_datatype())
.unwrap()
Expand Down Expand Up @@ -2187,7 +2187,7 @@ mod tests {
.unwrap()
);
assert_eq!(
ScalarValue::LargeBinary(None),
ScalarValue::Binary(None),
Value::Null
.try_to_scalar_value(&ConcreteDataType::binary_datatype())
.unwrap()
Expand Down
10 changes: 9 additions & 1 deletion src/datatypes/src/vectors/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ impl From<Vec<Option<Vec<u8>>>> for BinaryVector {
}
}

impl From<Vec<&[u8]>> for BinaryVector {
fn from(data: Vec<&[u8]>) -> Self {
Self {
array: BinaryArray::from_iter_values(data),
}
}
}

impl Vector for BinaryVector {
fn data_type(&self) -> ConcreteDataType {
ConcreteDataType::binary_datatype()
Expand Down Expand Up @@ -257,7 +265,7 @@ mod tests {

let arrow_arr = v.to_arrow_array();
assert_eq!(2, arrow_arr.len());
assert_eq!(&ArrowDataType::LargeBinary, arrow_arr.data_type());
assert_eq!(&ArrowDataType::Binary, arrow_arr.data_type());
}

#[test]
Expand Down
62 changes: 53 additions & 9 deletions src/datatypes/src/vectors/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,9 +258,9 @@ impl Helper {
Ok(match array.as_ref().data_type() {
ArrowDataType::Null => Arc::new(NullVector::try_from_arrow_array(array)?),
ArrowDataType::Boolean => Arc::new(BooleanVector::try_from_arrow_array(array)?),
ArrowDataType::LargeBinary => Arc::new(BinaryVector::try_from_arrow_array(array)?),
ArrowDataType::FixedSizeBinary(_) | ArrowDataType::Binary => {
let array = arrow::compute::cast(array.as_ref(), &ArrowDataType::LargeBinary)
ArrowDataType::Binary => Arc::new(BinaryVector::try_from_arrow_array(array)?),
ArrowDataType::LargeBinary | ArrowDataType::FixedSizeBinary(_) => {
let array = arrow::compute::cast(array.as_ref(), &ArrowDataType::Binary)
.context(crate::error::ArrowComputeSnafu)?;
Arc::new(BinaryVector::try_from_arrow_array(array)?)
}
Expand All @@ -278,7 +278,7 @@ impl Helper {
ArrowDataType::LargeUtf8 => {
let array = arrow::compute::cast(array.as_ref(), &ArrowDataType::Utf8)
.context(crate::error::ArrowComputeSnafu)?;
Arc::new(BinaryVector::try_from_arrow_array(array)?)
Arc::new(StringVector::try_from_arrow_array(array)?)
}
ArrowDataType::Date32 => Arc::new(DateVector::try_from_arrow_array(array)?),
ArrowDataType::Date64 => Arc::new(DateTimeVector::try_from_arrow_array(array)?),
Expand Down Expand Up @@ -402,8 +402,10 @@ mod tests {
TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
TimestampSecondArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
};
use arrow::buffer::Buffer;
use arrow::datatypes::Int32Type;
use arrow_array::DictionaryArray;
use arrow_array::{BinaryArray, DictionaryArray, FixedSizeBinaryArray, LargeStringArray};
use arrow_schema::DataType;
use common_decimal::Decimal128;
use common_time::time::Time;
use common_time::timestamp::TimeUnit;
Expand Down Expand Up @@ -576,10 +578,6 @@ mod tests {
fn test_try_into_vector() {
check_try_into_vector(NullArray::new(2));
check_try_into_vector(BooleanArray::from(vec![true, false]));
check_try_into_vector(LargeBinaryArray::from(vec![
"hello".as_bytes(),
"world".as_bytes(),
]));
check_try_into_vector(Int8Array::from(vec![1, 2, 3]));
check_try_into_vector(Int16Array::from(vec![1, 2, 3]));
check_try_into_vector(Int32Array::from(vec![1, 2, 3]));
Expand Down Expand Up @@ -611,6 +609,52 @@ mod tests {
Helper::try_into_vector(array).unwrap_err();
}

#[test]
fn test_try_binary_array_into_vector() {
let input_vec: Vec<&[u8]> = vec!["hello".as_bytes(), "world".as_bytes()];
let assertion_vector = BinaryVector::from(input_vec.clone());

let input_arrays: Vec<ArrayRef> = vec![
Arc::new(LargeBinaryArray::from(input_vec.clone())) as ArrayRef,
Arc::new(BinaryArray::from(input_vec.clone())) as ArrayRef,
Arc::new(FixedSizeBinaryArray::new(
5,
Buffer::from_vec("helloworld".as_bytes().to_vec()),
None,
)) as ArrayRef,
];

for input_array in input_arrays {
let vector = Helper::try_into_vector(input_array).unwrap();

assert_eq!(2, vector.len());
assert_eq!(0, vector.null_count());

let output_arrow_array: ArrayRef = vector.to_arrow_array();
assert_eq!(&DataType::Binary, output_arrow_array.data_type());
assert_eq!(&assertion_vector.to_arrow_array(), &output_arrow_array);
}
}

#[test]
fn test_large_string_array_into_vector() {
let input_vec = vec!["a", "b"];
let assertion_array = StringArray::from(input_vec.clone());

let large_string_array: ArrayRef = Arc::new(LargeStringArray::from(input_vec));
let vector = Helper::try_into_vector(large_string_array).unwrap();
assert_eq!(2, vector.len());
assert_eq!(0, vector.null_count());

let output_arrow_array: StringArray = vector
.to_arrow_array()
.as_any()
.downcast_ref::<StringArray>()
.unwrap()
.clone();
assert_eq!(&assertion_array, &output_arrow_array);
}

#[test]
fn test_try_from_scalar_time_value() {
let vector = Helper::try_from_scalar_value(ScalarValue::Time32Second(Some(42)), 3).unwrap();
Expand Down
4 changes: 0 additions & 4 deletions src/mito2/src/memtable/partition_tree/dict.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,10 +281,6 @@ impl Drop for KeyDict {

/// Buffer to store unsorted primary keys.
struct KeyBuffer {
// We use arrow's binary builder as out default binary builder
// is LargeBinaryBuilder
// TODO(yingwen): Change the type binary vector to Binary instead of LargeBinary.
/// Builder for binary key array.
key_builder: BinaryBuilder,
next_pk_index: usize,
}
Expand Down
101 changes: 99 additions & 2 deletions src/mito2/src/sst/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,19 +79,28 @@ pub struct SstInfo {
mod tests {
use std::sync::Arc;

use common_datasource::file_format::parquet::BufferedWriter;
use common_time::Timestamp;
use datafusion_common::{Column, ScalarValue};
use datafusion_expr::{BinaryExpr, Expr, Operator};
use datatypes::arrow;
use datatypes::arrow::array::RecordBatch;
use datatypes::arrow::datatypes::{DataType, Field, Schema};
use parquet::basic::{Compression, Encoding, ZstdLevel};
use parquet::file::metadata::KeyValue;
use parquet::file::properties::WriterProperties;
use table::predicate::Predicate;

use super::*;
use crate::cache::{CacheManager, PageKey};
use crate::sst::index::Indexer;
use crate::sst::parquet::format::WriteFormat;
use crate::sst::parquet::reader::ParquetReaderBuilder;
use crate::sst::parquet::writer::ParquetWriter;
use crate::sst::DEFAULT_WRITE_CONCURRENCY;
use crate::test_util::sst_util::{
assert_parquet_metadata_eq, new_batch_by_range, new_source, sst_file_handle,
sst_region_metadata,
assert_parquet_metadata_eq, build_test_binary_test_region_metadata, new_batch_by_range,
new_batch_with_binary, new_source, sst_file_handle, sst_region_metadata,
};
use crate::test_util::{check_reader_result, TestEnv};

Expand Down Expand Up @@ -399,4 +408,92 @@ mod tests {
let mut reader = builder.build().await.unwrap();
check_reader_result(&mut reader, &[new_batch_by_range(&["b", "h"], 150, 200)]).await;
}

#[tokio::test]
async fn test_read_large_binary() {
let mut env = TestEnv::new();
let object_store = env.init_object_store_manager();
let handle = sst_file_handle(0, 1000);
let file_path = handle.file_path(FILE_DIR);

let write_opts = WriteOptions {
row_group_size: 50,
..Default::default()
};

let metadata = build_test_binary_test_region_metadata();
let json = metadata.to_json().unwrap();
let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);

let props_builder = WriterProperties::builder()
.set_key_value_metadata(Some(vec![key_value_meta]))
.set_compression(Compression::ZSTD(ZstdLevel::default()))
.set_encoding(Encoding::PLAIN)
.set_max_row_group_size(write_opts.row_group_size);

let writer_props = props_builder.build();

let write_format = WriteFormat::new(metadata);
let fields: Vec<_> = write_format
.arrow_schema()
.fields()
.into_iter()
.map(|field| {
let data_type = field.data_type().clone();
if data_type == DataType::Binary {
Field::new(field.name(), DataType::LargeBinary, field.is_nullable())
} else {
Field::new(field.name(), data_type, field.is_nullable())
}
})
.collect();

let arrow_schema = Arc::new(Schema::new(fields));

// Ensures field_0 has LargeBinary type.
assert_eq!(
&DataType::LargeBinary,
arrow_schema.field_with_name("field_0").unwrap().data_type()
);
let mut buffered_writer = BufferedWriter::try_new(
file_path.clone(),
object_store.clone(),
arrow_schema.clone(),
Some(writer_props),
write_opts.write_buffer_size.as_bytes() as usize,
DEFAULT_WRITE_CONCURRENCY,
)
.await
.unwrap();

let batch = new_batch_with_binary(&["a"], 0, 60);
let arrow_batch = write_format.convert_batch(&batch).unwrap();
let arrays: Vec<_> = arrow_batch
.columns()
.iter()
.map(|array| {
let data_type = array.data_type().clone();
if data_type == DataType::Binary {
arrow::compute::cast(array, &DataType::LargeBinary).unwrap()
} else {
array.clone()
}
})
.collect();
let result = RecordBatch::try_new(arrow_schema, arrays).unwrap();

buffered_writer.write(&result).await.unwrap();
buffered_writer.close().await.unwrap();

let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store);
let mut reader = builder.build().await.unwrap();
check_reader_result(
&mut reader,
&[
new_batch_with_binary(&["a"], 0, 50),
new_batch_with_binary(&["a"], 50, 60),
],
)
.await;
}
}
Loading

0 comments on commit b821fd7

Please sign in to comment.