chore: apply CR suggestions

etolbakov authored May 17, 2024
1 parent 9f55436 commit 817b343

Showing 3 changed files with 76 additions and 52 deletions.
61 changes: 51 additions & 10 deletions src/mito2/src/sst/parquet.rs
@@ -83,6 +83,9 @@ mod tests {
use common_time::Timestamp;
use datafusion_common::{Column, ScalarValue};
use datafusion_expr::{BinaryExpr, Expr, Operator};
use datatypes::arrow;
use datatypes::arrow::array::RecordBatch;
use datatypes::arrow::datatypes::{DataType, Field, Schema};
use parquet::basic::{Compression, Encoding, ZstdLevel};
use parquet::file::metadata::KeyValue;
use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
@@ -100,7 +103,7 @@ mod tests {
use crate::sst::DEFAULT_WRITE_CONCURRENCY;
use crate::test_util::sst_util::{
assert_parquet_metadata_eq, build_test_binary_test_region_metadata, new_batch_by_range,
new_batch_with_binary_by_range, new_source, sst_file_handle, sst_region_metadata,
new_batch_with_large_binary, new_source, sst_file_handle, sst_region_metadata,
};
use crate::test_util::{check_reader_result, TestEnv};

@@ -453,30 +456,68 @@ mod tests {
let writer_props = props_builder.build();

let write_format = WriteFormat::new(metadata.clone());
let string = file_path.clone();
let fields: Vec<_> = write_format
.arrow_schema()
.fields()
.into_iter()
.map(|field| {
let data_type = field.data_type().clone();
if data_type == DataType::Binary {
Field::new(field.name(), DataType::LargeBinary, field.is_nullable())
} else {
Field::new(field.name(), data_type, field.is_nullable())
}
})
.collect();

let arrow_schema = Arc::new(Schema::new(fields));

// Ensures field_0 has LargeBinary type.
assert_eq!(
DataType::LargeBinary,
arrow_schema
.field_with_name("field_0")
.unwrap()
.data_type()
.clone()
);
let mut buffered_writer = BufferedWriter::try_new(
string,
file_path.clone(),
object_store.clone(),
write_format.arrow_schema(),
arrow_schema.clone(),
Some(writer_props),
write_opts.write_buffer_size.as_bytes() as usize,
DEFAULT_WRITE_CONCURRENCY,
)
.await
.unwrap();
let batch = new_batch_with_binary_by_range(&["a"], 0, 60);
let arrow_batch = write_format.convert_batch(&batch).unwrap();

buffered_writer.write(&arrow_batch).await.unwrap();

let batch = new_batch_with_large_binary(&["a"], 0, 60);
let arrow_batch = write_format.convert_batch(&batch).unwrap();
let arrays: Vec<_> = arrow_batch
.columns()
.iter()
.map(|array| {
let data_type = array.data_type().clone();
if data_type == DataType::Binary {
arrow::compute::cast(array, &DataType::LargeBinary).unwrap()
} else {
array.clone()
}
})
.collect();
let result = RecordBatch::try_new(arrow_schema, arrays).unwrap();

buffered_writer.write(&result).await.unwrap();
buffered_writer.close().await.unwrap();

let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store);
let mut reader = builder.build().await.unwrap();
check_reader_result(
&mut reader,
&[
new_batch_with_binary_by_range(&["a"], 0, 50),
new_batch_with_binary_by_range(&["a"], 50, 60),
new_batch_with_large_binary(&["a"], 0, 50),
new_batch_with_large_binary(&["a"], 50, 60),
],
)
.await;
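The test above rewrites the writer schema so that every Binary field becomes LargeBinary, then casts the matching columns before handing the batch to the writer. A minimal standalone sketch of that pattern, written against the upstream `arrow` crate rather than the project's `datatypes::arrow` re-export (the function name `widen_binary` is ours, not part of this commit):

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, RecordBatch};
use arrow::compute;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::Result;

/// Rewrites every Binary field of `batch` as LargeBinary and casts the
/// corresponding columns so the arrays agree with the new schema.
fn widen_binary(batch: &RecordBatch) -> Result<RecordBatch> {
    // Rebuild the schema, swapping Binary for LargeBinary.
    let fields: Vec<Field> = batch
        .schema()
        .fields()
        .iter()
        .map(|f| match f.data_type() {
            DataType::Binary => Field::new(f.name(), DataType::LargeBinary, f.is_nullable()),
            other => Field::new(f.name(), other.clone(), f.is_nullable()),
        })
        .collect();
    // Cast the matching columns; everything else is cloned as-is.
    let columns: Result<Vec<ArrayRef>> = batch
        .columns()
        .iter()
        .map(|col| match col.data_type() {
            // i32-offset Binary widens losslessly to i64-offset LargeBinary.
            DataType::Binary => compute::cast(col, &DataType::LargeBinary),
            _ => Ok(col.clone()),
        })
        .collect();
    RecordBatch::try_new(Arc::new(Schema::new(fields)), columns?)
}
```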
34 changes: 1 addition & 33 deletions src/mito2/src/test_util.rs
@@ -33,9 +33,7 @@ use api::v1::{OpType, Row, Rows, SemanticType};
use common_base::readable_size::ReadableSize;
use common_datasource::compression::CompressionType;
use common_test_util::temp_dir::{create_temp_dir, TempDir};
use datatypes::arrow::array::{
LargeBinaryArray, TimestampMillisecondArray, UInt64Array, UInt8Array,
};
use datatypes::arrow::array::{TimestampMillisecondArray, UInt64Array, UInt8Array};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use log_store::raft_engine::log_store::RaftEngineLogStore;
@@ -560,36 +558,6 @@ impl Iterator for VecBatchReader {
}
}

pub fn new_binary_batch_builder(
primary_key: &[u8],
timestamps: &[i64],
sequences: &[u64],
op_types: &[OpType],
field_column_id: ColumnId,
field: Vec<Vec<u8>>,
) -> BatchBuilder {
let mut builder = BatchBuilder::new(primary_key.to_vec());
builder
.timestamps_array(Arc::new(TimestampMillisecondArray::from_iter_values(
timestamps.iter().copied(),
)))
.unwrap()
.sequences_array(Arc::new(UInt64Array::from_iter_values(
sequences.iter().copied(),
)))
.unwrap()
.op_types_array(Arc::new(UInt8Array::from_iter_values(
op_types.iter().map(|v| *v as u8),
)))
.unwrap()
.push_field_array(
field_column_id,
Arc::new(LargeBinaryArray::from_iter_values(field)),
)
.unwrap();
builder
}

pub fn new_batch_builder(
primary_key: &[u8],
timestamps: &[i64],
33 changes: 24 additions & 9 deletions src/mito2/src/test_util/sst_util.rs
@@ -18,6 +18,9 @@ use std::sync::Arc;

use api::v1::{OpType, SemanticType};
use common_time::Timestamp;
use datatypes::arrow::array::{
LargeBinaryArray, TimestampMillisecondArray, UInt64Array, UInt8Array,
};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use datatypes::value::ValueRef;
@@ -27,12 +30,10 @@ use store_api::metadata::{
};
use store_api::storage::RegionId;

use crate::read::{Batch, Source};
use crate::read::{Batch, BatchBuilder, Source};
use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
use crate::sst::file::{FileHandle, FileId, FileMeta};
use crate::test_util::{
new_batch_builder, new_binary_batch_builder, new_noop_file_purger, VecBatchReader,
};
use crate::test_util::{new_batch_builder, new_noop_file_purger, VecBatchReader};

/// Test region id.
const REGION_ID: RegionId = RegionId::new(0, 0);
@@ -132,7 +133,7 @@ pub fn new_batch_by_range(tags: &[&str], start: usize, end: usize) -> Batch {
.unwrap()
}

pub fn new_batch_with_binary_by_range(tags: &[&str], start: usize, end: usize) -> Batch {
pub fn new_batch_with_large_binary(tags: &[&str], start: usize, end: usize) -> Batch {
assert!(end >= start);
let pk = new_primary_key(tags);
let timestamps: Vec<_> = (start..end).map(|v| v as i64).collect();
@@ -143,9 +144,23 @@ pub fn new_batch_with_binary_by_range(tags: &[&str], start: usize, end: usize) -
.map(|_v| "some data".as_bytes().to_vec())
.collect();

new_binary_batch_builder(&pk, &timestamps, &sequences, &op_types, 1, field)
.build()
let mut builder = BatchBuilder::new(pk);
builder
.timestamps_array(Arc::new(TimestampMillisecondArray::from_iter_values(
timestamps.iter().copied(),
)))
.unwrap()
.sequences_array(Arc::new(UInt64Array::from_iter_values(
sequences.iter().copied(),
)))
.unwrap()
.op_types_array(Arc::new(UInt8Array::from_iter_values(
op_types.iter().map(|v| *v as u8),
)))
.unwrap()
.push_field_array(1, Arc::new(LargeBinaryArray::from_iter_values(field)))
.unwrap();
builder.build().unwrap()
}

/// ParquetMetaData doesn't implement `PartialEq` trait, check internal fields manually
@@ -174,7 +189,7 @@ pub fn assert_parquet_metadata_eq(a: Arc<ParquetMetaData>, b: Arc<ParquetMetaDat

/// Creates a new region metadata for testing SSTs with binary datatype.
///
/// Schema: tag_0, field_1(binary), ts
/// Schema: tag_0(string), field_0(binary), ts
pub fn build_test_binary_test_region_metadata() -> RegionMetadataRef {
let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
builder
@@ -188,7 +203,7 @@ pub fn build_test_binary_test_region_metadata() -> RegionMetadataRef {
column_id: 0,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("field_1", ConcreteDataType::binary_datatype(), true),
column_schema: ColumnSchema::new("field_0", ConcreteDataType::binary_datatype(), true),
semantic_type: SemanticType::Field,
column_id: 1,
})
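For context on the helper above: `Binary` and `LargeBinary` hold the same logical values, but `Binary` indexes its value buffer with i32 offsets (roughly a 2 GiB cap per array) while `LargeBinary` uses i64 offsets; the test data here is built with `LargeBinaryArray`, presumably to exercise that wider layout. A small sketch against the upstream `arrow` crate, not part of this commit:

```rust
use arrow::array::{Array, BinaryArray, LargeBinaryArray};
use arrow::datatypes::DataType;

fn main() {
    let values = vec![b"some data".to_vec(); 3];

    // Same logical values, different offset widths.
    let small = BinaryArray::from_iter_values(values.iter());
    let large = LargeBinaryArray::from_iter_values(values.iter());

    assert_eq!(small.data_type(), &DataType::Binary);
    assert_eq!(large.data_type(), &DataType::LargeBinary);
    assert_eq!(small.value(0), large.value(0));
}
```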
