Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

deduplicate variadic buffers in MutableArrayData::extend for ByteView arrays #6808

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 159 additions & 24 deletions arrow-data/src/transform/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use arrow_buffer::{bit_util, i256, ArrowNativeType, Buffer, MutableBuffer};
use arrow_schema::{ArrowError, DataType, IntervalUnit, UnionMode};
use half::f16;
use num::Integer;
use std::collections::HashMap;
use std::mem;

mod boolean;
Expand Down Expand Up @@ -214,7 +215,7 @@ fn build_extend_dictionary(array: &ArrayData, offset: usize, max: usize) -> Opti
}

/// Builds an extend that adds `buffer_offset` to any buffer indices encountered
fn build_extend_view(array: &ArrayData, buffer_offset: u32) -> Extend {
fn build_extend_view(array: &ArrayData, buffer_lookup: Vec<usize>) -> Extend {
let views = array.buffer::<u128>(0);
Box::new(
move |mutable: &mut _MutableArrayData, _, start: usize, len: usize| {
Expand All @@ -226,7 +227,7 @@ fn build_extend_view(array: &ArrayData, buffer_offset: u32) -> Extend {
return *v; // Stored inline
}
let mut view = ByteView::from(*v);
view.buffer_index += buffer_offset;
view.buffer_index = buffer_lookup[view.buffer_index as usize] as u32;
view.into()
}))
},
Expand Down Expand Up @@ -627,13 +628,19 @@ impl<'a> MutableArrayData<'a> {
_ => (None, false),
};

let variadic_data_buffers = match &data_type {
DataType::BinaryView | DataType::Utf8View => arrays
.iter()
.flat_map(|x| x.buffers().iter().skip(1))
.map(Buffer::clone)
.collect(),
_ => vec![],
let (variadic_data_buffers, buffer_to_idx) = match &data_type {
DataType::BinaryView | DataType::Utf8View => {
let mut buffer_to_idx = HashMap::new();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if building a hashmap / vec would be overly expensive (though we would need to run benchmarks to be sure)

cc @XiangpengHao

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

happy to run benchmarks, any particular in mind or should I create one with criterion specific to this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the ones in cast are probably a good place to start

let mut variadic_buffers = Vec::new();
for buffer in arrays.iter().flat_map(|x| x.buffers().iter().skip(1)) {
buffer_to_idx.entry(buffer.as_ptr()).or_insert_with(|| {
variadic_buffers.push(buffer.clone());
variadic_buffers.len() - 1
});
}
(variadic_buffers, buffer_to_idx)
}
_ => (vec![], HashMap::new()),
};

let extend_nulls = build_extend_nulls(data_type);
Expand Down Expand Up @@ -668,20 +675,19 @@ impl<'a> MutableArrayData<'a> {

extend_values.expect("MutableArrayData::new is infallible")
}
DataType::BinaryView | DataType::Utf8View => {
let mut next_offset = 0u32;
arrays
.iter()
.map(|arr| {
let num_data_buffers = (arr.buffers().len() - 1) as u32;
let offset = next_offset;
next_offset = next_offset
.checked_add(num_data_buffers)
.expect("view buffer index overflow");
build_extend_view(arr, offset)
})
.collect()
}
DataType::BinaryView | DataType::Utf8View => arrays
.iter()
.map(|arr| {
let arr_to_res_buffer_idx: Vec<_> = arr
.buffers()
.iter()
.skip(1)
.map(|buf| *buffer_to_idx.get(&buf.as_ptr()).unwrap())
.collect();

build_extend_view(arr, arr_to_res_buffer_idx)
})
.collect(),
_ => arrays.iter().map(|array| build_extend(array)).collect(),
};

Expand Down Expand Up @@ -811,10 +817,11 @@ impl<'a> MutableArrayData<'a> {
}

// See arrow/tests/array_transform.rs for tests of transform functionality

#[cfg(test)]
mod test {
use super::*;
use arrow_buffer::Buffer;
use arrow_schema::DataType;
use arrow_schema::Field;
use std::sync::Arc;

Expand All @@ -836,4 +843,132 @@ mod test {
assert_eq!(mutable.data.buffer1.capacity(), 64);
assert_eq!(mutable.data.child_data[0].data.buffer1.capacity(), 192);
}

fn create_view_array(
data_type: DataType,
views: &[u128],
data_buffers: Vec<Buffer>,
) -> ArrayData {
let view_buffer = Buffer::from_slice_ref(views);
let mut buffers = vec![view_buffer];
buffers.extend(data_buffers);

ArrayData::try_new(data_type, views.len(), None, 0, buffers, vec![]).unwrap()
}

#[test]
fn test_binary_view_buffer_deduplication() {
let data = Buffer::from_slice_ref(b"this is a long string that will not be inlined");

let view = ByteView::new(data.len() as u32, b"this")
.with_buffer_index(0)
.with_offset(0);

// Create two arrays pointing to the same buffer
let array1 = create_view_array(DataType::BinaryView, &[view.into()], vec![data.clone()]);
let array2 = create_view_array(DataType::BinaryView, &[view.into()], vec![data.clone()]);

let mut mutable = MutableArrayData::new(vec![&array1, &array2], false, 2);
mutable.extend(0, 0, 1);
mutable.extend(1, 0, 1);

let result = mutable.freeze();

// Should only have one data buffer since they were identical
assert_eq!(result.buffers().len(), 2); // view buffer + 1 data buffer
}

#[test]
fn test_binary_view_buffer_remapping() {
let data1 = Buffer::from_slice_ref(b"this is the first long string buffer");
let data2 = Buffer::from_slice_ref(b"this is the second long string buffer");

// Create views pointing to different buffers
let view1 = ByteView::new(data1.len() as u32, b"this")
.with_buffer_index(0)
.with_offset(0);

let view2 = ByteView::new(data2.len() as u32, b"this")
.with_buffer_index(0) // Will be remapped
.with_offset(0);

let array1 = create_view_array(DataType::BinaryView, &[view1.into()], vec![data1]);
let array2 = create_view_array(DataType::BinaryView, &[view2.into()], vec![data2]);

let mut mutable = MutableArrayData::new(vec![&array1, &array2], false, 2);
mutable.extend(0, 0, 1);
mutable.extend(1, 0, 1);

let result = mutable.freeze();

assert_eq!(result.buffers().len(), 3); // view buffer + 2 data buffers

let views = result.buffer::<u128>(0);
let view1 = ByteView::from(views[0]);
let view2 = ByteView::from(views[1]);

assert_eq!(view1.buffer_index, 0);
assert_eq!(view2.buffer_index, 1); // Second buffer index was remapped
}

#[test]
fn test_utf8_view_inline_data() {
let short_str = "hello";
let mut view_data = [0u8; 16];
view_data[0..4].copy_from_slice(&(short_str.len() as u32).to_le_bytes());
view_data[4..9].copy_from_slice(short_str.as_bytes());
let view = u128::from_le_bytes(view_data);

// no buffers for inline strings
let array = create_view_array(DataType::Utf8View, &[view], vec![]);

let mut mutable = MutableArrayData::new(vec![&array], false, 1);
mutable.extend(0, 0, 1);

let result = mutable.freeze();

// Should only have view buffer for inline data
assert_eq!(result.buffers().len(), 1);

// Verify the inline data was preserved
let views = result.buffer::<u128>(0);
let actual_view = views[0];
assert_eq!(actual_view, view);
}

#[test]
fn test_view_multiple_arrays() {
let data1 = Buffer::from_slice_ref(b"this is a shared buffer that will be deduplicated");
let data2 = Buffer::from_slice_ref(b"this is a unique buffer that won't be deduplicated");

let view1 = ByteView::new(data1.len() as u32, b"this")
.with_buffer_index(0)
.with_offset(0);

let view2 = ByteView::new(data2.len() as u32, b"this")
.with_buffer_index(0)
.with_offset(0);

let array1 = create_view_array(DataType::BinaryView, &[view1.into()], vec![data1.clone()]);
let array2 = create_view_array(
DataType::BinaryView,
&[view1.into()],
vec![data1.clone()], // Shared buffer
);
let array3 = create_view_array(
DataType::BinaryView,
&[view2.into()],
vec![data2], // Unique buffer
);

let mut mutable = MutableArrayData::new(vec![&array1, &array2, &array3], false, 3);
mutable.extend(0, 0, 1);
mutable.extend(1, 0, 1);
mutable.extend(2, 0, 1);

let result = mutable.freeze();

// Should have view buffer + 2 unique data buffers
assert_eq!(result.buffers().len(), 3);
}
}
2 changes: 2 additions & 0 deletions arrow/benches/interleave_kernels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ fn add_benchmark(c: &mut Criterion) {
let string_opt = create_string_array_with_len::<i32>(1024, 0.5, 20);
let values = create_string_array_with_len::<i32>(10, 0.0, 20);
let dict = create_dict_from_values::<Int32Type>(1024, 0.0, &values);
let string_view = create_string_view_array_with_len(1024, 0.5, 50, true);

let values = create_string_array_with_len::<i32>(1024, 0.0, 20);
let sparse_dict = create_sparse_dict_from_values::<Int32Type>(1024, 0.0, &values, 10..20);
Expand All @@ -83,6 +84,7 @@ fn add_benchmark(c: &mut Criterion) {
("str(20, 0.5)", &string_opt),
("dict(20, 0.0)", &dict),
("dict_sparse(20, 0.0)", &sparse_dict),
("string_view(0.5, 50, true)", &string_view),
];

for (prefix, base) in cases {
Expand Down
Loading