From f77b125e555e113be0cdabf05a140d276845246f Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Thu, 22 Aug 2024 17:34:17 -0700 Subject: [PATCH 01/11] optimize pad_nulls for fixed_len_byte_array --- parquet/benches/arrow_reader.rs | 207 ++++++++++++++++++ .../array_reader/fixed_len_byte_array.rs | 13 +- 2 files changed, 217 insertions(+), 3 deletions(-) diff --git a/parquet/benches/arrow_reader.rs b/parquet/benches/arrow_reader.rs index 18e16f0a4297..28f12b70c199 100644 --- a/parquet/benches/arrow_reader.rs +++ b/parquet/benches/arrow_reader.rs @@ -68,6 +68,14 @@ fn build_test_schema() -> SchemaDescPtr { OPTIONAL BYTE_ARRAY optional_binary_leaf; REQUIRED FIXED_LEN_BYTE_ARRAY (2) mandatory_f16_leaf (Float16); OPTIONAL FIXED_LEN_BYTE_ARRAY (2) optional_f16_leaf (Float16); + REQUIRED FIXED_LEN_BYTE_ARRAY (2) mandatory_flba2_leaf; + OPTIONAL FIXED_LEN_BYTE_ARRAY (2) optional_flba2_leaf; + REQUIRED FIXED_LEN_BYTE_ARRAY (4) mandatory_flba4_leaf; + OPTIONAL FIXED_LEN_BYTE_ARRAY (4) optional_flba4_leaf; + REQUIRED FIXED_LEN_BYTE_ARRAY (8) mandatory_flba8_leaf; + OPTIONAL FIXED_LEN_BYTE_ARRAY (8) optional_flba8_leaf; + REQUIRED FIXED_LEN_BYTE_ARRAY (16) mandatory_flba16_leaf; + OPTIONAL FIXED_LEN_BYTE_ARRAY (16) optional_flba16_leaf; } "; parse_message_type(message_type) @@ -209,6 +217,50 @@ where InMemoryPageIterator::new(pages) } +// support for fixed_len_byte_arrays +fn build_encoded_flba_bytes_page_iterator( + column_desc: ColumnDescPtr, + null_density: f32, + encoding: Encoding, +) -> impl PageIterator + Clone { + let max_def_level = column_desc.max_def_level(); + let max_rep_level = column_desc.max_rep_level(); + let rep_levels = vec![0; VALUES_PER_PAGE]; + let mut rng = seedable_rng(); + let mut pages: Vec> = Vec::new(); + for _i in 0..NUM_ROW_GROUPS { + let mut column_chunk_pages = Vec::new(); + for _j in 0..PAGES_PER_GROUP { + // generate page + let mut values = Vec::with_capacity(VALUES_PER_PAGE); + let mut def_levels = Vec::with_capacity(VALUES_PER_PAGE); + for _k in 0..VALUES_PER_PAGE { + let def_level = if rng.gen::() < null_density { + max_def_level - 1 + } else { + max_def_level + }; + if def_level == max_def_level { + // create the FLBA(BYTE_LENGTH) value + let value = (0..BYTE_LENGTH).map(|_| rng.gen()).collect::>(); + let value = + ::T::from(value); + values.push(value); + } + def_levels.push(def_level); + } + let mut page_builder = + DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true); + page_builder.add_rep_levels(max_rep_level, &rep_levels); + page_builder.add_def_levels(max_def_level, &def_levels); + page_builder.add_values::(encoding, &values); + column_chunk_pages.push(page_builder.consume()); + } + pages.push(column_chunk_pages); + } + InMemoryPageIterator::new(pages) +} + fn build_encoded_primitive_page_iterator( column_desc: ColumnDescPtr, null_density: f32, @@ -752,6 +804,68 @@ fn bench_byte_stream_split_f16( }); } +fn bench_flba( + group: &mut BenchmarkGroup, + mandatory_column_desc: &ColumnDescPtr, + optional_column_desc: &ColumnDescPtr, + encoding: Encoding, +) { + let mut count: usize = 0; + + encoding.to_string(); + // byte_stream_split encoded, no NULLs + let data = build_encoded_flba_bytes_page_iterator::( + mandatory_column_desc.clone(), + 0.0, + encoding, + ); + group.bench_function( + encoding.to_string().to_lowercase() + " encoded, mandatory, no NULLs", + |b| { + b.iter(|| { + let array_reader = + create_f16_by_bytes_reader(data.clone(), mandatory_column_desc.clone()); + count = bench_array_reader(array_reader); + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); + }, + ); + + let data = build_encoded_flba_bytes_page_iterator::( + optional_column_desc.clone(), + 0.0, + Encoding::BYTE_STREAM_SPLIT, + ); + group.bench_function( + encoding.to_string().to_lowercase() + " encoded, optional, no NULLs", + |b| { + b.iter(|| { + let array_reader = + create_f16_by_bytes_reader(data.clone(), optional_column_desc.clone()); + count = bench_array_reader(array_reader); + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); + }, + ); + + let data = build_encoded_flba_bytes_page_iterator::( + optional_column_desc.clone(), + 0.5, + Encoding::BYTE_STREAM_SPLIT, + ); + group.bench_function( + encoding.to_string().to_lowercase() + " encoded, optional, half NULLs", + |b| { + b.iter(|| { + let array_reader = + create_f16_by_bytes_reader(data.clone(), optional_column_desc.clone()); + count = bench_array_reader(array_reader); + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); + }, + ); +} + fn bench_byte_stream_split_decimal( group: &mut BenchmarkGroup, mandatory_column_desc: &ColumnDescPtr, @@ -1023,6 +1137,50 @@ fn byte_stream_split_benches(c: &mut Criterion) { 1.0, ); group.finish(); + + let mut group = c.benchmark_group("arrow_array_reader/BYTE_STREAM_SPLIT/FixedLenByteArray(2)"); + let mandatory_flba2_leaf_desc = schema.column(19); + let optional_flba2_leaf_desc = schema.column(20); + bench_flba::<2>( + &mut group, + &mandatory_flba2_leaf_desc, + &optional_flba2_leaf_desc, + Encoding::BYTE_STREAM_SPLIT, + ); + group.finish(); + + let mut group = c.benchmark_group("arrow_array_reader/BYTE_STREAM_SPLIT/FixedLenByteArray(4)"); + let mandatory_flba4_leaf_desc = schema.column(21); + let optional_flba4_leaf_desc = schema.column(22); + bench_flba::<4>( + &mut group, + &mandatory_flba4_leaf_desc, + &optional_flba4_leaf_desc, + Encoding::BYTE_STREAM_SPLIT, + ); + group.finish(); + + let mut group = c.benchmark_group("arrow_array_reader/BYTE_STREAM_SPLIT/FixedLenByteArray(8)"); + let mandatory_flba8_leaf_desc = schema.column(23); + let optional_flba8_leaf_desc = schema.column(24); + bench_flba::<8>( + &mut group, + &mandatory_flba8_leaf_desc, + &optional_flba8_leaf_desc, + Encoding::BYTE_STREAM_SPLIT, + ); + group.finish(); + + let mut group = c.benchmark_group("arrow_array_reader/BYTE_STREAM_SPLIT/FixedLenByteArray(16)"); + let mandatory_flba16_leaf_desc = schema.column(25); + let optional_flba16_leaf_desc = schema.column(26); + bench_flba::<16>( + &mut group, + &mandatory_flba16_leaf_desc, + &optional_flba16_leaf_desc, + Encoding::BYTE_STREAM_SPLIT, + ); + group.finish(); } fn decimal_benches(c: &mut Criterion) { @@ -1560,6 +1718,55 @@ fn add_benches(c: &mut Criterion) { }); assert_eq!(count, EXPECTED_VALUE_COUNT); }); + + group.finish(); + + // fixed_len_byte_array benchmarks + //============================== + + let mut group = c.benchmark_group("arrow_array_reader/FixedLenByteArray(2)"); + let mandatory_flba2_leaf_desc = schema.column(19); + let optional_flba2_leaf_desc = schema.column(20); + bench_flba::<2>( + &mut group, + &mandatory_flba2_leaf_desc, + &optional_flba2_leaf_desc, + Encoding::PLAIN, + ); + group.finish(); + + let mut group = c.benchmark_group("arrow_array_reader/FixedLenByteArray(4)"); + let mandatory_flba4_leaf_desc = schema.column(21); + let optional_flba4_leaf_desc = schema.column(22); + bench_flba::<4>( + &mut group, + &mandatory_flba4_leaf_desc, + &optional_flba4_leaf_desc, + Encoding::PLAIN, + ); + group.finish(); + + let mut group = c.benchmark_group("arrow_array_reader/FixedLenByteArray(8)"); + let mandatory_flba8_leaf_desc = schema.column(23); + let optional_flba8_leaf_desc = schema.column(24); + bench_flba::<8>( + &mut group, + &mandatory_flba8_leaf_desc, + &optional_flba8_leaf_desc, + Encoding::PLAIN, + ); + group.finish(); + + let mut group = c.benchmark_group("arrow_array_reader/FixedLenByteArray(16)"); + let mandatory_flba16_leaf_desc = schema.column(25); + let optional_flba16_leaf_desc = schema.column(26); + bench_flba::<16>( + &mut group, + &mandatory_flba16_leaf_desc, + &optional_flba16_leaf_desc, + Encoding::PLAIN, + ); + group.finish(); } criterion_group!( diff --git a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs index 01692c242713..cd6395f28182 100644 --- a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs +++ b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs @@ -256,9 +256,16 @@ impl ValuesBuffer for FixedLenByteArrayBuffer { let level_pos_bytes = level_pos * byte_length; let value_pos_bytes = value_pos * byte_length; - - for i in 0..byte_length { - self.buffer[level_pos_bytes + i] = self.buffer[value_pos_bytes + i] + // testing shows up to byte_length == 4 prefers the loop, 8 and up prefers copy_within + if byte_length > 4 { + self.buffer.copy_within( + value_pos_bytes..value_pos_bytes + byte_length, + level_pos_bytes, + ); + } else { + for i in 0..byte_length { + self.buffer[level_pos_bytes + i] = self.buffer[value_pos_bytes + i] + } } } } From da0bbd44a2c7731e1520abf214cccc1b843fce44 Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Thu, 22 Aug 2024 20:46:02 -0700 Subject: [PATCH 02/11] start refactor of benchmarks --- parquet/benches/arrow_reader.rs | 288 +++++++++++++++----------------- 1 file changed, 134 insertions(+), 154 deletions(-) diff --git a/parquet/benches/arrow_reader.rs b/parquet/benches/arrow_reader.rs index 28f12b70c199..2ebd277b6c2c 100644 --- a/parquet/benches/arrow_reader.rs +++ b/parquet/benches/arrow_reader.rs @@ -636,6 +636,13 @@ fn create_decimal_by_bytes_reader( } } +fn create_fixed_len_byte_array_reader( + page_iterator: impl PageIterator + 'static, + column_desc: ColumnDescPtr, +) -> Box { + make_fixed_len_byte_array_reader(Box::new(page_iterator), column_desc, None).unwrap() +} + fn create_byte_array_reader( page_iterator: impl PageIterator + 'static, column_desc: ColumnDescPtr, @@ -682,6 +689,7 @@ fn bench_byte_decimal( group: &mut BenchmarkGroup, mandatory_column_desc: &ColumnDescPtr, optional_column_desc: &ColumnDescPtr, + encoding: Encoding, min: i128, max: i128, ) where @@ -691,55 +699,64 @@ fn bench_byte_decimal( // all are plain encoding let mut count: usize = 0; - // plain encoded, no NULLs + // no NULLs let data = build_encoded_decimal_bytes_page_iterator::( mandatory_column_desc.clone(), 0.0, - Encoding::PLAIN, + encoding, min, max, ); - group.bench_function("plain encoded, mandatory, no NULLs", |b| { - b.iter(|| { - let array_reader = - create_decimal_by_bytes_reader(data.clone(), mandatory_column_desc.clone()); - count = bench_array_reader(array_reader); - }); - assert_eq!(count, EXPECTED_VALUE_COUNT); - }); + group.bench_function( + encoding.to_string().to_lowercase() + " encoded, mandatory, no NULLs", + |b| { + b.iter(|| { + let array_reader = + create_decimal_by_bytes_reader(data.clone(), mandatory_column_desc.clone()); + count = bench_array_reader(array_reader); + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); + }, + ); let data = build_encoded_decimal_bytes_page_iterator::( optional_column_desc.clone(), 0.0, - Encoding::PLAIN, + encoding, min, max, ); - group.bench_function("plain encoded, optional, no NULLs", |b| { - b.iter(|| { - let array_reader = - create_decimal_by_bytes_reader(data.clone(), optional_column_desc.clone()); - count = bench_array_reader(array_reader); - }); - assert_eq!(count, EXPECTED_VALUE_COUNT); - }); + group.bench_function( + encoding.to_string().to_lowercase() + " encoded, optional, no NULLs", + |b| { + b.iter(|| { + let array_reader = + create_decimal_by_bytes_reader(data.clone(), optional_column_desc.clone()); + count = bench_array_reader(array_reader); + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); + }, + ); // half null let data = build_encoded_decimal_bytes_page_iterator::( optional_column_desc.clone(), 0.5, - Encoding::PLAIN, + encoding, min, max, ); - group.bench_function("plain encoded, optional, half NULLs", |b| { - b.iter(|| { - let array_reader = - create_decimal_by_bytes_reader(data.clone(), optional_column_desc.clone()); - count = bench_array_reader(array_reader); - }); - assert_eq!(count, EXPECTED_VALUE_COUNT); - }); + group.bench_function( + encoding.to_string().to_lowercase() + " encoded, optional, half NULLs", + |b| { + b.iter(|| { + let array_reader = + create_decimal_by_bytes_reader(data.clone(), optional_column_desc.clone()); + count = bench_array_reader(array_reader); + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); + }, + ); } fn bench_byte_stream_split_f16( @@ -813,7 +830,7 @@ fn bench_flba( let mut count: usize = 0; encoding.to_string(); - // byte_stream_split encoded, no NULLs + // no NULLs let data = build_encoded_flba_bytes_page_iterator::( mandatory_column_desc.clone(), 0.0, @@ -824,7 +841,7 @@ fn bench_flba( |b| { b.iter(|| { let array_reader = - create_f16_by_bytes_reader(data.clone(), mandatory_column_desc.clone()); + create_fixed_len_byte_array_reader(data.clone(), mandatory_column_desc.clone()); count = bench_array_reader(array_reader); }); assert_eq!(count, EXPECTED_VALUE_COUNT); @@ -834,31 +851,32 @@ fn bench_flba( let data = build_encoded_flba_bytes_page_iterator::( optional_column_desc.clone(), 0.0, - Encoding::BYTE_STREAM_SPLIT, + encoding, ); group.bench_function( encoding.to_string().to_lowercase() + " encoded, optional, no NULLs", |b| { b.iter(|| { let array_reader = - create_f16_by_bytes_reader(data.clone(), optional_column_desc.clone()); + create_fixed_len_byte_array_reader(data.clone(), optional_column_desc.clone()); count = bench_array_reader(array_reader); }); assert_eq!(count, EXPECTED_VALUE_COUNT); }, ); + // half null let data = build_encoded_flba_bytes_page_iterator::( optional_column_desc.clone(), 0.5, - Encoding::BYTE_STREAM_SPLIT, + encoding, ); group.bench_function( encoding.to_string().to_lowercase() + " encoded, optional, half NULLs", |b| { b.iter(|| { let array_reader = - create_f16_by_bytes_reader(data.clone(), optional_column_desc.clone()); + create_fixed_len_byte_array_reader(data.clone(), optional_column_desc.clone()); count = bench_array_reader(array_reader); }); assert_eq!(count, EXPECTED_VALUE_COUNT); @@ -866,67 +884,23 @@ fn bench_flba( ); } -fn bench_byte_stream_split_decimal( +fn bench_fixed_len_byte_array( group: &mut BenchmarkGroup, mandatory_column_desc: &ColumnDescPtr, optional_column_desc: &ColumnDescPtr, - min: i128, - max: i128, -) where - T: parquet::data_type::DataType, - T::T: From>, -{ - let mut count: usize = 0; - - // byte_stream_split encoded, no NULLs - let data = build_encoded_decimal_bytes_page_iterator::( - mandatory_column_desc.clone(), - 0.0, - Encoding::BYTE_STREAM_SPLIT, - min, - max, - ); - group.bench_function("byte_stream_split encoded, mandatory, no NULLs", |b| { - b.iter(|| { - let array_reader = - create_decimal_by_bytes_reader(data.clone(), mandatory_column_desc.clone()); - count = bench_array_reader(array_reader); - }); - assert_eq!(count, EXPECTED_VALUE_COUNT); - }); - - let data = build_encoded_decimal_bytes_page_iterator::( - optional_column_desc.clone(), - 0.0, - Encoding::BYTE_STREAM_SPLIT, - min, - max, +) { + bench_flba::( + group, + mandatory_column_desc, + optional_column_desc, + Encoding::PLAIN, ); - group.bench_function("byte_stream_split encoded, optional, no NULLs", |b| { - b.iter(|| { - let array_reader = - create_decimal_by_bytes_reader(data.clone(), optional_column_desc.clone()); - count = bench_array_reader(array_reader); - }); - assert_eq!(count, EXPECTED_VALUE_COUNT); - }); - - // half null - let data = build_encoded_decimal_bytes_page_iterator::( - optional_column_desc.clone(), - 0.5, + bench_flba::( + group, + mandatory_column_desc, + optional_column_desc, Encoding::BYTE_STREAM_SPLIT, - min, - max, ); - group.bench_function("byte_stream_split encoded, optional, half NULLs", |b| { - b.iter(|| { - let array_reader = - create_decimal_by_bytes_reader(data.clone(), optional_column_desc.clone()); - count = bench_array_reader(array_reader); - }); - assert_eq!(count, EXPECTED_VALUE_COUNT); - }); } fn bench_primitive( @@ -1108,24 +1082,61 @@ fn bench_primitive( }); assert_eq!(count, EXPECTED_VALUE_COUNT); }); + + // byte_stream_split encoded, no NULLs + let data = build_encoded_primitive_page_iterator::( + mandatory_column_desc.clone(), + 0.0, + Encoding::BYTE_STREAM_SPLIT, + min, + max, + ); + group.bench_function("byte_stream_split encoded, mandatory, no NULLs", |b| { + b.iter(|| { + let array_reader = + create_primitive_array_reader(data.clone(), mandatory_column_desc.clone()); + count = bench_array_reader(array_reader); + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); + }); + + let data = build_encoded_primitive_page_iterator::( + optional_column_desc.clone(), + 0.0, + Encoding::BYTE_STREAM_SPLIT, + min, + max, + ); + group.bench_function("byte_stream_split encoded, optional, no NULLs", |b| { + b.iter(|| { + let array_reader = + create_primitive_array_reader(data.clone(), optional_column_desc.clone()); + count = bench_array_reader(array_reader); + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); + }); + + // plain encoded, half NULLs + let data = build_encoded_primitive_page_iterator::( + optional_column_desc.clone(), + 0.5, + Encoding::BYTE_STREAM_SPLIT, + min, + max, + ); + group.bench_function("byte_stream_split encoded, optional, half NULLs", |b| { + b.iter(|| { + let array_reader = + create_primitive_array_reader(data.clone(), optional_column_desc.clone()); + count = bench_array_reader(array_reader); + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); + }); } fn byte_stream_split_benches(c: &mut Criterion) { let schema = build_test_schema(); - let mut group = c.benchmark_group("arrow_array_reader/BYTE_STREAM_SPLIT/Decimal128Array"); - let mandatory_decimal4_leaf_desc = schema.column(12); - let optional_decimal4_leaf_desc = schema.column(13); - bench_byte_stream_split_decimal::( - &mut group, - &mandatory_decimal4_leaf_desc, - &optional_decimal4_leaf_desc, - // precision is 16: the max is 9999999999999999 - 9999999999999000, - 9999999999999999, - ); - group.finish(); - let mut group = c.benchmark_group("arrow_array_reader/BYTE_STREAM_SPLIT/Float16Array"); let mandatory_f16_leaf_desc = schema.column(17); let optional_f16_leaf_desc = schema.column(18); @@ -1137,50 +1148,6 @@ fn byte_stream_split_benches(c: &mut Criterion) { 1.0, ); group.finish(); - - let mut group = c.benchmark_group("arrow_array_reader/BYTE_STREAM_SPLIT/FixedLenByteArray(2)"); - let mandatory_flba2_leaf_desc = schema.column(19); - let optional_flba2_leaf_desc = schema.column(20); - bench_flba::<2>( - &mut group, - &mandatory_flba2_leaf_desc, - &optional_flba2_leaf_desc, - Encoding::BYTE_STREAM_SPLIT, - ); - group.finish(); - - let mut group = c.benchmark_group("arrow_array_reader/BYTE_STREAM_SPLIT/FixedLenByteArray(4)"); - let mandatory_flba4_leaf_desc = schema.column(21); - let optional_flba4_leaf_desc = schema.column(22); - bench_flba::<4>( - &mut group, - &mandatory_flba4_leaf_desc, - &optional_flba4_leaf_desc, - Encoding::BYTE_STREAM_SPLIT, - ); - group.finish(); - - let mut group = c.benchmark_group("arrow_array_reader/BYTE_STREAM_SPLIT/FixedLenByteArray(8)"); - let mandatory_flba8_leaf_desc = schema.column(23); - let optional_flba8_leaf_desc = schema.column(24); - bench_flba::<8>( - &mut group, - &mandatory_flba8_leaf_desc, - &optional_flba8_leaf_desc, - Encoding::BYTE_STREAM_SPLIT, - ); - group.finish(); - - let mut group = c.benchmark_group("arrow_array_reader/BYTE_STREAM_SPLIT/FixedLenByteArray(16)"); - let mandatory_flba16_leaf_desc = schema.column(25); - let optional_flba16_leaf_desc = schema.column(26); - bench_flba::<16>( - &mut group, - &mandatory_flba16_leaf_desc, - &optional_flba16_leaf_desc, - Encoding::BYTE_STREAM_SPLIT, - ); - group.finish(); } fn decimal_benches(c: &mut Criterion) { @@ -1221,6 +1188,22 @@ fn decimal_benches(c: &mut Criterion) { &mut group, &mandatory_decimal3_leaf_desc, &optional_decimal3_leaf_desc, + Encoding::PLAIN, + // precision is 16: the max is 9999999999999999 + 9999999999999000, + 9999999999999999, + ); + group.finish(); + + // parquet FIXED_LEN_BYTE_ARRAY, logical type decimal(16,2) + let mut group = c.benchmark_group("arrow_array_reader/FIXED_LENGTH_BYTE_ARRAY/Decimal128Array"); + let mandatory_decimal4_leaf_desc = schema.column(12); + let optional_decimal4_leaf_desc = schema.column(13); + bench_byte_decimal::( + &mut group, + &mandatory_decimal4_leaf_desc, + &optional_decimal4_leaf_desc, + Encoding::PLAIN, // precision is 16: the max is 9999999999999999 9999999999999000, 9999999999999999, @@ -1234,6 +1217,7 @@ fn decimal_benches(c: &mut Criterion) { &mut group, &mandatory_decimal4_leaf_desc, &optional_decimal4_leaf_desc, + Encoding::BYTE_STREAM_SPLIT, // precision is 16: the max is 9999999999999999 9999999999999000, 9999999999999999, @@ -1727,44 +1711,40 @@ fn add_benches(c: &mut Criterion) { let mut group = c.benchmark_group("arrow_array_reader/FixedLenByteArray(2)"); let mandatory_flba2_leaf_desc = schema.column(19); let optional_flba2_leaf_desc = schema.column(20); - bench_flba::<2>( + bench_fixed_len_byte_array::<2>( &mut group, &mandatory_flba2_leaf_desc, &optional_flba2_leaf_desc, - Encoding::PLAIN, ); group.finish(); let mut group = c.benchmark_group("arrow_array_reader/FixedLenByteArray(4)"); let mandatory_flba4_leaf_desc = schema.column(21); let optional_flba4_leaf_desc = schema.column(22); - bench_flba::<4>( + bench_fixed_len_byte_array::<4>( &mut group, &mandatory_flba4_leaf_desc, &optional_flba4_leaf_desc, - Encoding::PLAIN, ); group.finish(); let mut group = c.benchmark_group("arrow_array_reader/FixedLenByteArray(8)"); let mandatory_flba8_leaf_desc = schema.column(23); let optional_flba8_leaf_desc = schema.column(24); - bench_flba::<8>( + bench_fixed_len_byte_array::<8>( &mut group, &mandatory_flba8_leaf_desc, &optional_flba8_leaf_desc, - Encoding::PLAIN, ); group.finish(); let mut group = c.benchmark_group("arrow_array_reader/FixedLenByteArray(16)"); let mandatory_flba16_leaf_desc = schema.column(25); let optional_flba16_leaf_desc = schema.column(26); - bench_flba::<16>( + bench_fixed_len_byte_array::<16>( &mut group, &mandatory_flba16_leaf_desc, &optional_flba16_leaf_desc, - Encoding::PLAIN, ); group.finish(); } From fe97120a3dcd8d414a03248481a289adb166b4b0 Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Thu, 22 Aug 2024 21:03:56 -0700 Subject: [PATCH 03/11] refactor float16 --- parquet/benches/arrow_reader.rs | 37 +++++++++++++++++++++++---------- 1 file changed, 26 insertions(+), 11 deletions(-) diff --git a/parquet/benches/arrow_reader.rs b/parquet/benches/arrow_reader.rs index 2ebd277b6c2c..accc77f9ffc5 100644 --- a/parquet/benches/arrow_reader.rs +++ b/parquet/benches/arrow_reader.rs @@ -759,10 +759,11 @@ fn bench_byte_decimal( ); } -fn bench_byte_stream_split_f16( +fn bench_f16( group: &mut BenchmarkGroup, mandatory_column_desc: &ColumnDescPtr, optional_column_desc: &ColumnDescPtr, + encoding: Encoding, min: f32, max: f32, ) where @@ -775,11 +776,11 @@ fn bench_byte_stream_split_f16( let data = build_encoded_f16_bytes_page_iterator::( mandatory_column_desc.clone(), 0.0, - Encoding::BYTE_STREAM_SPLIT, + encoding, min, max, ); - group.bench_function("byte_stream_split encoded, mandatory, no NULLs", |b| { + group.bench_function(encoding.to_string().to_lowercase() + " encoded, mandatory, no NULLs", |b| { b.iter(|| { let array_reader = create_f16_by_bytes_reader(data.clone(), mandatory_column_desc.clone()); @@ -791,11 +792,11 @@ fn bench_byte_stream_split_f16( let data = build_encoded_f16_bytes_page_iterator::( optional_column_desc.clone(), 0.0, - Encoding::BYTE_STREAM_SPLIT, + encoding, min, max, ); - group.bench_function("byte_stream_split encoded, optional, no NULLs", |b| { + group.bench_function(encoding.to_string().to_lowercase() + " encoded, optional, no NULLs", |b| { b.iter(|| { let array_reader = create_f16_by_bytes_reader(data.clone(), optional_column_desc.clone()); @@ -807,11 +808,11 @@ fn bench_byte_stream_split_f16( let data = build_encoded_f16_bytes_page_iterator::( optional_column_desc.clone(), 0.5, - Encoding::BYTE_STREAM_SPLIT, + encoding, min, max, ); - group.bench_function("byte_stream_split encoded, optional, half NULLs", |b| { + group.bench_function(encoding.to_string().to_lowercase() + " encoded, optional, half NULLs", |b| { b.iter(|| { let array_reader = create_f16_by_bytes_reader(data.clone(), optional_column_desc.clone()); @@ -1134,16 +1135,30 @@ fn bench_primitive( }); } -fn byte_stream_split_benches(c: &mut Criterion) { +fn float16_benches(c: &mut Criterion) { let schema = build_test_schema(); - let mut group = c.benchmark_group("arrow_array_reader/BYTE_STREAM_SPLIT/Float16Array"); + let mut group = c.benchmark_group("arrow_array_reader/Float16Array"); + let mandatory_f16_leaf_desc = schema.column(17); + let optional_f16_leaf_desc = schema.column(18); + bench_f16::( + &mut group, + &mandatory_f16_leaf_desc, + &optional_f16_leaf_desc, + Encoding::PLAIN, + -1.0, + 1.0, + ); + group.finish(); + + let mut group = c.benchmark_group("arrow_array_reader/Float16Array"); let mandatory_f16_leaf_desc = schema.column(17); let optional_f16_leaf_desc = schema.column(18); - bench_byte_stream_split_f16::( + bench_f16::( &mut group, &mandatory_f16_leaf_desc, &optional_f16_leaf_desc, + Encoding::BYTE_STREAM_SPLIT, -1.0, 1.0, ); @@ -1753,6 +1768,6 @@ criterion_group!( benches, add_benches, decimal_benches, - byte_stream_split_benches, + float16_benches, ); criterion_main!(benches); From 1ca022285143eb18b39b6ec49369828e056cef8e Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Thu, 22 Aug 2024 21:19:39 -0700 Subject: [PATCH 04/11] add fixed_len_byte_array to float16 benches --- parquet/benches/arrow_reader.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/parquet/benches/arrow_reader.rs b/parquet/benches/arrow_reader.rs index accc77f9ffc5..37d731bdf508 100644 --- a/parquet/benches/arrow_reader.rs +++ b/parquet/benches/arrow_reader.rs @@ -1138,7 +1138,7 @@ fn bench_primitive( fn float16_benches(c: &mut Criterion) { let schema = build_test_schema(); - let mut group = c.benchmark_group("arrow_array_reader/Float16Array"); + let mut group = c.benchmark_group("arrow_array_reader/FIXED_LEN_BYTE_ARRAY/Float16Array"); let mandatory_f16_leaf_desc = schema.column(17); let optional_f16_leaf_desc = schema.column(18); bench_f16::( @@ -1151,7 +1151,7 @@ fn float16_benches(c: &mut Criterion) { ); group.finish(); - let mut group = c.benchmark_group("arrow_array_reader/Float16Array"); + let mut group = c.benchmark_group("arrow_array_reader/FIXED_LEN_BYTE_ARRAY/Float16Array"); let mandatory_f16_leaf_desc = schema.column(17); let optional_f16_leaf_desc = schema.column(18); bench_f16::( @@ -1211,7 +1211,7 @@ fn decimal_benches(c: &mut Criterion) { group.finish(); // parquet FIXED_LEN_BYTE_ARRAY, logical type decimal(16,2) - let mut group = c.benchmark_group("arrow_array_reader/FIXED_LENGTH_BYTE_ARRAY/Decimal128Array"); + let mut group = c.benchmark_group("arrow_array_reader/FIXED_LEN_BYTE_ARRAY/Decimal128Array"); let mandatory_decimal4_leaf_desc = schema.column(12); let optional_decimal4_leaf_desc = schema.column(13); bench_byte_decimal::( @@ -1225,7 +1225,7 @@ fn decimal_benches(c: &mut Criterion) { ); group.finish(); - let mut group = c.benchmark_group("arrow_array_reader/FIXED_LENGTH_BYTE_ARRAY/Decimal128Array"); + let mut group = c.benchmark_group("arrow_array_reader/FIXED_LEN_BYTE_ARRAY/Decimal128Array"); let mandatory_decimal4_leaf_desc = schema.column(12); let optional_decimal4_leaf_desc = schema.column(13); bench_byte_decimal::( From b715f80c5f137bc00c4e6c406c6f8edacfa35861 Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Fri, 23 Aug 2024 09:48:49 -0700 Subject: [PATCH 05/11] replace copy_within with vectorizable copy --- .../src/arrow/array_reader/fixed_len_byte_array.rs | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs index cd6395f28182..12ea6995a1ef 100644 --- a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs +++ b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs @@ -258,10 +258,16 @@ impl ValuesBuffer for FixedLenByteArrayBuffer { let value_pos_bytes = value_pos * byte_length; // testing shows up to byte_length == 4 prefers the loop, 8 and up prefers copy_within if byte_length > 4 { - self.buffer.copy_within( - value_pos_bytes..value_pos_bytes + byte_length, - level_pos_bytes, - ); + let split = self.buffer.split_at_mut(level_pos_bytes); + let dst = &mut split.1[..byte_length]; + let src = &split.0[value_pos_bytes..value_pos_bytes + byte_length]; + for i in 0..byte_length { + dst[i] = src[i] + } + //self.buffer.copy_within( + // value_pos_bytes..value_pos_bytes + byte_length, + // level_pos_bytes, + //); } else { for i in 0..byte_length { self.buffer[level_pos_bytes + i] = self.buffer[value_pos_bytes + i] From 1c5221ea76019022538f2c5f243bd9d9f07ca389 Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Fri, 23 Aug 2024 10:36:04 -0700 Subject: [PATCH 06/11] update comment --- parquet/src/arrow/array_reader/fixed_len_byte_array.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs index 12ea6995a1ef..ec4cbc501f68 100644 --- a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs +++ b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs @@ -256,7 +256,11 @@ impl ValuesBuffer for FixedLenByteArrayBuffer { let level_pos_bytes = level_pos * byte_length; let value_pos_bytes = value_pos * byte_length; - // testing shows up to byte_length == 4 prefers the loop, 8 and up prefers copy_within + + // Move the bytes from value_pos to level_pos. For values of `byte_length` <= 4, + // the simple loop is preferred as the compiler can eliminate the loop via unrolling. + // For `byte_length > 4`, we instead copy from non-overlapping slices. This allows + // the loop to be vectorized, yielding much better performance. if byte_length > 4 { let split = self.buffer.split_at_mut(level_pos_bytes); let dst = &mut split.1[..byte_length]; From 90c21a85cfb6531d746b9226199934ecbb0e1b30 Mon Sep 17 00:00:00 2001 From: seidl Date: Fri, 23 Aug 2024 11:37:25 -0700 Subject: [PATCH 07/11] move branch on byte_length outside of loop --- .../array_reader/fixed_len_byte_array.rs | 40 +++++++++++-------- 1 file changed, 23 insertions(+), 17 deletions(-) diff --git a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs index ec4cbc501f68..25c3b37545c0 100644 --- a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs +++ b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs @@ -248,31 +248,37 @@ impl ValuesBuffer for FixedLenByteArrayBuffer { .resize((read_offset + levels_read) * byte_length, 0); let values_range = read_offset..read_offset + values_read; - for (value_pos, level_pos) in values_range.rev().zip(iter_set_bits_rev(valid_mask)) { - debug_assert!(level_pos >= value_pos); - if level_pos <= value_pos { - break; - } - let level_pos_bytes = level_pos * byte_length; - let value_pos_bytes = value_pos * byte_length; + // Move the bytes from value_pos to level_pos. For values of `byte_length` <= 4, + // the simple loop is preferred as the compiler can eliminate the loop via unrolling. + // For `byte_length > 4`, we instead copy from non-overlapping slices. This allows + // the loop to be vectorized, yielding much better performance. + if byte_length > 4 { + for (value_pos, level_pos) in values_range.rev().zip(iter_set_bits_rev(valid_mask)) { + debug_assert!(level_pos >= value_pos); + if level_pos <= value_pos { + break; + } + + let level_pos_bytes = level_pos * byte_length; + let value_pos_bytes = value_pos * byte_length; - // Move the bytes from value_pos to level_pos. For values of `byte_length` <= 4, - // the simple loop is preferred as the compiler can eliminate the loop via unrolling. - // For `byte_length > 4`, we instead copy from non-overlapping slices. This allows - // the loop to be vectorized, yielding much better performance. - if byte_length > 4 { let split = self.buffer.split_at_mut(level_pos_bytes); let dst = &mut split.1[..byte_length]; let src = &split.0[value_pos_bytes..value_pos_bytes + byte_length]; for i in 0..byte_length { dst[i] = src[i] } - //self.buffer.copy_within( - // value_pos_bytes..value_pos_bytes + byte_length, - // level_pos_bytes, - //); - } else { + } + } else { + for (value_pos, level_pos) in values_range.rev().zip(iter_set_bits_rev(valid_mask)) { + debug_assert!(level_pos >= value_pos); + if level_pos <= value_pos { + break; + } + + let level_pos_bytes = level_pos * byte_length; + let value_pos_bytes = value_pos * byte_length; for i in 0..byte_length { self.buffer[level_pos_bytes + i] = self.buffer[value_pos_bytes + i] } From 4e2dc4f83b12fb94e077bfcc512aec1bf37dcaff Mon Sep 17 00:00:00 2001 From: seidl Date: Fri, 23 Aug 2024 14:13:12 -0700 Subject: [PATCH 08/11] reduce code duplication while preserving performance gains --- .../array_reader/fixed_len_byte_array.rs | 57 +++++++++++-------- 1 file changed, 34 insertions(+), 23 deletions(-) diff --git a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs index 25c3b37545c0..9aa333040228 100644 --- a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs +++ b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs @@ -36,6 +36,7 @@ use arrow_schema::{DataType as ArrowType, IntervalUnit}; use bytes::Bytes; use half::f16; use std::any::Any; +use std::ops::Range; use std::sync::Arc; /// Returns an [`ArrayReader`] that decodes the provided fixed length byte array column @@ -233,6 +234,29 @@ struct FixedLenByteArrayBuffer { byte_length: Option, } +#[inline] +fn move_values( + buffer: &mut Vec, + byte_length: usize, + values_range: Range, + valid_mask: &[u8], + mut op: F, +) where + F: FnMut(&mut Vec, usize, usize, usize), +{ + for (value_pos, level_pos) in values_range.rev().zip(iter_set_bits_rev(valid_mask)) { + debug_assert!(level_pos >= value_pos); + if level_pos <= value_pos { + break; + } + + let level_pos_bytes = level_pos * byte_length; + let value_pos_bytes = value_pos * byte_length; + + op(buffer, level_pos_bytes, value_pos_bytes, byte_length) + } +} + impl ValuesBuffer for FixedLenByteArrayBuffer { fn pad_nulls( &mut self, @@ -248,41 +272,28 @@ impl ValuesBuffer for FixedLenByteArrayBuffer { .resize((read_offset + levels_read) * byte_length, 0); let values_range = read_offset..read_offset + values_read; - // Move the bytes from value_pos to level_pos. For values of `byte_length` <= 4, // the simple loop is preferred as the compiler can eliminate the loop via unrolling. // For `byte_length > 4`, we instead copy from non-overlapping slices. This allows // the loop to be vectorized, yielding much better performance. - if byte_length > 4 { - for (value_pos, level_pos) in values_range.rev().zip(iter_set_bits_rev(valid_mask)) { - debug_assert!(level_pos >= value_pos); - if level_pos <= value_pos { - break; - } - - let level_pos_bytes = level_pos * byte_length; - let value_pos_bytes = value_pos * byte_length; - - let split = self.buffer.split_at_mut(level_pos_bytes); + const VEC_CUTOFF: usize = 4; + if byte_length > VEC_CUTOFF { + let op = |buffer: &mut Vec, level_pos_bytes, value_pos_bytes, byte_length| { + let split = buffer.split_at_mut(level_pos_bytes); let dst = &mut split.1[..byte_length]; let src = &split.0[value_pos_bytes..value_pos_bytes + byte_length]; for i in 0..byte_length { dst[i] = src[i] } - } + }; + move_values(&mut self.buffer, byte_length, values_range, valid_mask, op); } else { - for (value_pos, level_pos) in values_range.rev().zip(iter_set_bits_rev(valid_mask)) { - debug_assert!(level_pos >= value_pos); - if level_pos <= value_pos { - break; - } - - let level_pos_bytes = level_pos * byte_length; - let value_pos_bytes = value_pos * byte_length; + let op = |buffer: &mut Vec, level_pos_bytes, value_pos_bytes, byte_length| { for i in 0..byte_length { - self.buffer[level_pos_bytes + i] = self.buffer[value_pos_bytes + i] + buffer[level_pos_bytes + i] = buffer[value_pos_bytes + i] } - } + }; + move_values(&mut self.buffer, byte_length, values_range, valid_mask, op); } } } From 63116aa6fc1e2b1fa04200eebdb73fdac2ee6642 Mon Sep 17 00:00:00 2001 From: seidl Date: Fri, 23 Aug 2024 14:51:41 -0700 Subject: [PATCH 09/11] formatting --- parquet/benches/arrow_reader.rs | 64 +++++++++++++++++---------------- 1 file changed, 34 insertions(+), 30 deletions(-) diff --git a/parquet/benches/arrow_reader.rs b/parquet/benches/arrow_reader.rs index 37d731bdf508..f165adbe897c 100644 --- a/parquet/benches/arrow_reader.rs +++ b/parquet/benches/arrow_reader.rs @@ -780,14 +780,17 @@ fn bench_f16( min, max, ); - group.bench_function(encoding.to_string().to_lowercase() + " encoded, mandatory, no NULLs", |b| { - b.iter(|| { - let array_reader = - create_f16_by_bytes_reader(data.clone(), mandatory_column_desc.clone()); - count = bench_array_reader(array_reader); - }); - assert_eq!(count, EXPECTED_VALUE_COUNT); - }); + group.bench_function( + encoding.to_string().to_lowercase() + " encoded, mandatory, no NULLs", + |b| { + b.iter(|| { + let array_reader = + create_f16_by_bytes_reader(data.clone(), mandatory_column_desc.clone()); + count = bench_array_reader(array_reader); + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); + }, + ); let data = build_encoded_f16_bytes_page_iterator::( optional_column_desc.clone(), @@ -796,14 +799,17 @@ fn bench_f16( min, max, ); - group.bench_function(encoding.to_string().to_lowercase() + " encoded, optional, no NULLs", |b| { - b.iter(|| { - let array_reader = - create_f16_by_bytes_reader(data.clone(), optional_column_desc.clone()); - count = bench_array_reader(array_reader); - }); - assert_eq!(count, EXPECTED_VALUE_COUNT); - }); + group.bench_function( + encoding.to_string().to_lowercase() + " encoded, optional, no NULLs", + |b| { + b.iter(|| { + let array_reader = + create_f16_by_bytes_reader(data.clone(), optional_column_desc.clone()); + count = bench_array_reader(array_reader); + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); + }, + ); let data = build_encoded_f16_bytes_page_iterator::( optional_column_desc.clone(), @@ -812,14 +818,17 @@ fn bench_f16( min, max, ); - group.bench_function(encoding.to_string().to_lowercase() + " encoded, optional, half NULLs", |b| { - b.iter(|| { - let array_reader = - create_f16_by_bytes_reader(data.clone(), optional_column_desc.clone()); - count = bench_array_reader(array_reader); - }); - assert_eq!(count, EXPECTED_VALUE_COUNT); - }); + group.bench_function( + encoding.to_string().to_lowercase() + " encoded, optional, half NULLs", + |b| { + b.iter(|| { + let array_reader = + create_f16_by_bytes_reader(data.clone(), optional_column_desc.clone()); + count = bench_array_reader(array_reader); + }); + assert_eq!(count, EXPECTED_VALUE_COUNT); + }, + ); } fn bench_flba( @@ -1764,10 +1773,5 @@ fn add_benches(c: &mut Criterion) { group.finish(); } -criterion_group!( - benches, - add_benches, - decimal_benches, - float16_benches, -); +criterion_group!(benches, add_benches, decimal_benches, float16_benches,); criterion_main!(benches); From a13b96d4d64ae1685d5c3d4e25af414ca2bcc053 Mon Sep 17 00:00:00 2001 From: seidl Date: Fri, 23 Aug 2024 14:56:08 -0700 Subject: [PATCH 10/11] silence clippy --- parquet/src/arrow/array_reader/fixed_len_byte_array.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs index 9aa333040228..4b1f4cce85be 100644 --- a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs +++ b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs @@ -258,6 +258,9 @@ fn move_values( } impl ValuesBuffer for FixedLenByteArrayBuffer { + // Silence clippy warning. `copy_from_slice` is slower than allowing the compiler to vectorize + // `dst[i] = src[i]` below. + #[allow(clippy::manual_memcpy)] fn pad_nulls( &mut self, read_offset: usize, From 1a1e967c4704fd5d67ccc5054d4673805a994656 Mon Sep 17 00:00:00 2001 From: seidl Date: Fri, 23 Aug 2024 15:21:26 -0700 Subject: [PATCH 11/11] clippy won again --- parquet/src/arrow/array_reader/fixed_len_byte_array.rs | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs index 4b1f4cce85be..4be07ed68f1d 100644 --- a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs +++ b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs @@ -258,9 +258,6 @@ fn move_values( } impl ValuesBuffer for FixedLenByteArrayBuffer { - // Silence clippy warning. `copy_from_slice` is slower than allowing the compiler to vectorize - // `dst[i] = src[i]` below. - #[allow(clippy::manual_memcpy)] fn pad_nulls( &mut self, read_offset: usize, @@ -285,9 +282,7 @@ impl ValuesBuffer for FixedLenByteArrayBuffer { let split = buffer.split_at_mut(level_pos_bytes); let dst = &mut split.1[..byte_length]; let src = &split.0[value_pos_bytes..value_pos_bytes + byte_length]; - for i in 0..byte_length { - dst[i] = src[i] - } + dst.copy_from_slice(src); }; move_values(&mut self.buffer, byte_length, values_range, valid_mask, op); } else {