From 6472dd87f17260f0687b6ba4aca41c01ebc0e7f4 Mon Sep 17 00:00:00 2001 From: Onur Satici Date: Wed, 27 Nov 2024 11:00:18 +0000 Subject: [PATCH 1/4] deduplicate variadic buffers in MutableArrayData::extend for ByteView arrays --- arrow-data/src/transform/mod.rs | 52 ++++++++++++++++++--------------- 1 file changed, 29 insertions(+), 23 deletions(-) diff --git a/arrow-data/src/transform/mod.rs b/arrow-data/src/transform/mod.rs index 93b79e6a5eb8..e344e80b763b 100644 --- a/arrow-data/src/transform/mod.rs +++ b/arrow-data/src/transform/mod.rs @@ -27,6 +27,7 @@ use arrow_buffer::{bit_util, i256, ArrowNativeType, Buffer, MutableBuffer}; use arrow_schema::{ArrowError, DataType, IntervalUnit, UnionMode}; use half::f16; use num::Integer; +use std::collections::HashMap; use std::mem; mod boolean; @@ -214,7 +215,7 @@ fn build_extend_dictionary(array: &ArrayData, offset: usize, max: usize) -> Opti } /// Builds an extend that adds `buffer_offset` to any buffer indices encountered -fn build_extend_view(array: &ArrayData, buffer_offset: u32) -> Extend { +fn build_extend_view(array: &ArrayData, buffer_lookup: Vec) -> Extend { let views = array.buffer::(0); Box::new( move |mutable: &mut _MutableArrayData, _, start: usize, len: usize| { @@ -226,7 +227,7 @@ fn build_extend_view(array: &ArrayData, buffer_offset: u32) -> Extend { return *v; // Stored inline } let mut view = ByteView::from(*v); - view.buffer_index += buffer_offset; + view.buffer_index = buffer_lookup[view.buffer_index as usize] as u32; view.into() })) }, @@ -627,13 +628,19 @@ impl<'a> MutableArrayData<'a> { _ => (None, false), }; - let variadic_data_buffers = match &data_type { - DataType::BinaryView | DataType::Utf8View => arrays - .iter() - .flat_map(|x| x.buffers().iter().skip(1)) - .map(Buffer::clone) - .collect(), - _ => vec![], + let (variadic_data_buffers, buffer_to_idx) = match &data_type { + DataType::BinaryView | DataType::Utf8View => { + let mut buffer_to_idx = HashMap::new(); + let mut variadic_buffers = Vec::new(); + for buffer in arrays.iter().flat_map(|x| x.buffers().iter().skip(1)) { + buffer_to_idx.entry(buffer.as_ptr()).or_insert_with(|| { + variadic_buffers.push(buffer.clone()); + variadic_buffers.len() - 1 + }); + } + (variadic_buffers, buffer_to_idx) + } + _ => (vec![], HashMap::new()), }; let extend_nulls = build_extend_nulls(data_type); @@ -668,20 +675,19 @@ impl<'a> MutableArrayData<'a> { extend_values.expect("MutableArrayData::new is infallible") } - DataType::BinaryView | DataType::Utf8View => { - let mut next_offset = 0u32; - arrays - .iter() - .map(|arr| { - let num_data_buffers = (arr.buffers().len() - 1) as u32; - let offset = next_offset; - next_offset = next_offset - .checked_add(num_data_buffers) - .expect("view buffer index overflow"); - build_extend_view(arr, offset) - }) - .collect() - } + DataType::BinaryView | DataType::Utf8View => arrays + .iter() + .map(|arr| { + let arr_to_res_buffer_idx: Vec<_> = arr + .buffers() + .iter() + .skip(1) + .map(|buf| *buffer_to_idx.get(&buf.as_ptr()).unwrap()) + .collect(); + + build_extend_view(arr, arr_to_res_buffer_idx) + }) + .collect(), _ => arrays.iter().map(|array| build_extend(array)).collect(), }; From d143706b7fbd7b3e83b67fe7638cae27870a08a0 Mon Sep 17 00:00:00 2001 From: Onur Satici Date: Tue, 3 Dec 2024 16:34:39 +0000 Subject: [PATCH 2/4] wip: add stringview to interleave benchmark, comment out the rest --- arrow/benches/interleave_kernels.rs | 30 +++++++++++++++-------------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/arrow/benches/interleave_kernels.rs b/arrow/benches/interleave_kernels.rs index 0941f1e3fd33..a6571e192329 100644 --- a/arrow/benches/interleave_kernels.rs +++ b/arrow/benches/interleave_kernels.rs @@ -72,17 +72,19 @@ fn add_benchmark(c: &mut Criterion) { let string_opt = create_string_array_with_len::(1024, 0.5, 20); let values = create_string_array_with_len::(10, 0.0, 20); let dict = create_dict_from_values::(1024, 0.0, &values); + let string_view = create_string_view_array_with_len(1024, 0.5, 50, true); let values = create_string_array_with_len::(1024, 0.0, 20); let sparse_dict = create_sparse_dict_from_values::(1024, 0.0, &values, 10..20); let cases: &[(&str, &dyn Array)] = &[ - ("i32(0.0)", &i32), - ("i32(0.5)", &i32_opt), - ("str(20, 0.0)", &string), - ("str(20, 0.5)", &string_opt), - ("dict(20, 0.0)", &dict), - ("dict_sparse(20, 0.0)", &sparse_dict), + // ("i32(0.0)", &i32), + // ("i32(0.5)", &i32_opt), + // ("str(20, 0.0)", &string), + // ("str(20, 0.5)", &string_opt), + // ("dict(20, 0.0)", &dict), + // ("dict_sparse(20, 0.0)", &sparse_dict), + ("string_view(0.5, 50, true)", &string_view), ]; for (prefix, base) in cases { @@ -98,14 +100,14 @@ fn add_benchmark(c: &mut Criterion) { } } - for len in [100, 1024, 2048] { - bench_values( - c, - &format!("interleave dict_distinct {len}"), - 100, - &[&dict, &sparse_dict], - ); - } + // for len in [100, 1024, 2048] { + // bench_values( + // c, + // &format!("interleave dict_distinct {len}"), + // 100, + // &[&dict, &sparse_dict], + // ); + // } } criterion_group!(benches, add_benchmark); From a2957d020a08bce0eeda6ff7739a09a2ccb1c80c Mon Sep 17 00:00:00 2001 From: Onur Satici Date: Tue, 3 Dec 2024 16:44:31 +0000 Subject: [PATCH 3/4] restore the rest of the benchmarks --- arrow/benches/interleave_kernels.rs | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/arrow/benches/interleave_kernels.rs b/arrow/benches/interleave_kernels.rs index a6571e192329..76b8f4d96a22 100644 --- a/arrow/benches/interleave_kernels.rs +++ b/arrow/benches/interleave_kernels.rs @@ -78,12 +78,12 @@ fn add_benchmark(c: &mut Criterion) { let sparse_dict = create_sparse_dict_from_values::(1024, 0.0, &values, 10..20); let cases: &[(&str, &dyn Array)] = &[ - // ("i32(0.0)", &i32), - // ("i32(0.5)", &i32_opt), - // ("str(20, 0.0)", &string), - // ("str(20, 0.5)", &string_opt), - // ("dict(20, 0.0)", &dict), - // ("dict_sparse(20, 0.0)", &sparse_dict), + ("i32(0.0)", &i32), + ("i32(0.5)", &i32_opt), + ("str(20, 0.0)", &string), + ("str(20, 0.5)", &string_opt), + ("dict(20, 0.0)", &dict), + ("dict_sparse(20, 0.0)", &sparse_dict), ("string_view(0.5, 50, true)", &string_view), ]; @@ -100,14 +100,14 @@ fn add_benchmark(c: &mut Criterion) { } } - // for len in [100, 1024, 2048] { - // bench_values( - // c, - // &format!("interleave dict_distinct {len}"), - // 100, - // &[&dict, &sparse_dict], - // ); - // } + for len in [100, 1024, 2048] { + bench_values( + c, + &format!("interleave dict_distinct {len}"), + 100, + &[&dict, &sparse_dict], + ); + } } criterion_group!(benches, add_benchmark); From 0a54217bdf1adef08eaa21620df12643d69d4fa1 Mon Sep 17 00:00:00 2001 From: Onur Satici Date: Sat, 21 Dec 2024 14:23:36 +0000 Subject: [PATCH 4/4] tests --- arrow-data/src/transform/mod.rs | 131 +++++++++++++++++++++++++++++++- 1 file changed, 130 insertions(+), 1 deletion(-) diff --git a/arrow-data/src/transform/mod.rs b/arrow-data/src/transform/mod.rs index e344e80b763b..2ba50f3b4762 100644 --- a/arrow-data/src/transform/mod.rs +++ b/arrow-data/src/transform/mod.rs @@ -817,10 +817,11 @@ impl<'a> MutableArrayData<'a> { } // See arrow/tests/array_transform.rs for tests of transform functionality - #[cfg(test)] mod test { use super::*; + use arrow_buffer::Buffer; + use arrow_schema::DataType; use arrow_schema::Field; use std::sync::Arc; @@ -842,4 +843,132 @@ mod test { assert_eq!(mutable.data.buffer1.capacity(), 64); assert_eq!(mutable.data.child_data[0].data.buffer1.capacity(), 192); } + + fn create_view_array( + data_type: DataType, + views: &[u128], + data_buffers: Vec, + ) -> ArrayData { + let view_buffer = Buffer::from_slice_ref(views); + let mut buffers = vec![view_buffer]; + buffers.extend(data_buffers); + + ArrayData::try_new(data_type, views.len(), None, 0, buffers, vec![]).unwrap() + } + + #[test] + fn test_binary_view_buffer_deduplication() { + let data = Buffer::from_slice_ref(b"this is a long string that will not be inlined"); + + let view = ByteView::new(data.len() as u32, b"this") + .with_buffer_index(0) + .with_offset(0); + + // Create two arrays pointing to the same buffer + let array1 = create_view_array(DataType::BinaryView, &[view.into()], vec![data.clone()]); + let array2 = create_view_array(DataType::BinaryView, &[view.into()], vec![data.clone()]); + + let mut mutable = MutableArrayData::new(vec![&array1, &array2], false, 2); + mutable.extend(0, 0, 1); + mutable.extend(1, 0, 1); + + let result = mutable.freeze(); + + // Should only have one data buffer since they were identical + assert_eq!(result.buffers().len(), 2); // view buffer + 1 data buffer + } + + #[test] + fn test_binary_view_buffer_remapping() { + let data1 = Buffer::from_slice_ref(b"this is the first long string buffer"); + let data2 = Buffer::from_slice_ref(b"this is the second long string buffer"); + + // Create views pointing to different buffers + let view1 = ByteView::new(data1.len() as u32, b"this") + .with_buffer_index(0) + .with_offset(0); + + let view2 = ByteView::new(data2.len() as u32, b"this") + .with_buffer_index(0) // Will be remapped + .with_offset(0); + + let array1 = create_view_array(DataType::BinaryView, &[view1.into()], vec![data1]); + let array2 = create_view_array(DataType::BinaryView, &[view2.into()], vec![data2]); + + let mut mutable = MutableArrayData::new(vec![&array1, &array2], false, 2); + mutable.extend(0, 0, 1); + mutable.extend(1, 0, 1); + + let result = mutable.freeze(); + + assert_eq!(result.buffers().len(), 3); // view buffer + 2 data buffers + + let views = result.buffer::(0); + let view1 = ByteView::from(views[0]); + let view2 = ByteView::from(views[1]); + + assert_eq!(view1.buffer_index, 0); + assert_eq!(view2.buffer_index, 1); // Second buffer index was remapped + } + + #[test] + fn test_utf8_view_inline_data() { + let short_str = "hello"; + let mut view_data = [0u8; 16]; + view_data[0..4].copy_from_slice(&(short_str.len() as u32).to_le_bytes()); + view_data[4..9].copy_from_slice(short_str.as_bytes()); + let view = u128::from_le_bytes(view_data); + + // no buffers for inline strings + let array = create_view_array(DataType::Utf8View, &[view], vec![]); + + let mut mutable = MutableArrayData::new(vec![&array], false, 1); + mutable.extend(0, 0, 1); + + let result = mutable.freeze(); + + // Should only have view buffer for inline data + assert_eq!(result.buffers().len(), 1); + + // Verify the inline data was preserved + let views = result.buffer::(0); + let actual_view = views[0]; + assert_eq!(actual_view, view); + } + + #[test] + fn test_view_multiple_arrays() { + let data1 = Buffer::from_slice_ref(b"this is a shared buffer that will be deduplicated"); + let data2 = Buffer::from_slice_ref(b"this is a unique buffer that won't be deduplicated"); + + let view1 = ByteView::new(data1.len() as u32, b"this") + .with_buffer_index(0) + .with_offset(0); + + let view2 = ByteView::new(data2.len() as u32, b"this") + .with_buffer_index(0) + .with_offset(0); + + let array1 = create_view_array(DataType::BinaryView, &[view1.into()], vec![data1.clone()]); + let array2 = create_view_array( + DataType::BinaryView, + &[view1.into()], + vec![data1.clone()], // Shared buffer + ); + let array3 = create_view_array( + DataType::BinaryView, + &[view2.into()], + vec![data2], // Unique buffer + ); + + let mut mutable = MutableArrayData::new(vec![&array1, &array2, &array3], false, 3); + mutable.extend(0, 0, 1); + mutable.extend(1, 0, 1); + mutable.extend(2, 0, 1); + + let result = mutable.freeze(); + + // Should have view buffer + 2 unique data buffers + assert_eq!(result.buffers().len(), 3); + } }