From ec7e135f620e0c9ad1df73baeaa9a66c66382094 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Sat, 21 Dec 2024 17:59:25 +0200 Subject: [PATCH] improve offsets code according to code review --- arrow-buffer/src/buffer/offset.rs | 223 +++++++----------------------- arrow-select/src/concat.rs | 91 +----------- 2 files changed, 51 insertions(+), 263 deletions(-) diff --git a/arrow-buffer/src/buffer/offset.rs b/arrow-buffer/src/buffer/offset.rs index ce222540c92..164af6f01d0 100644 --- a/arrow-buffer/src/buffer/offset.rs +++ b/arrow-buffer/src/buffer/offset.rs @@ -16,8 +16,7 @@ // under the License. use crate::buffer::ScalarBuffer; -use crate::{ArrowNativeType, Buffer, MutableBuffer, OffsetBufferBuilder}; -use num::Integer; +use crate::{ArrowNativeType, MutableBuffer, OffsetBufferBuilder}; use std::ops::Deref; /// A non-empty buffer of monotonically increasing, positive integers. @@ -134,6 +133,38 @@ impl OffsetBuffer { Self(out.into()) } + /// Get an Iterator over the lengths of this [`OffsetBuffer`] + /// + /// ``` + /// # use arrow_buffer::{OffsetBuffer, ScalarBuffer}; + /// let offsets = OffsetBuffer::<_>::new(ScalarBuffer::::from(vec![0, 1, 4, 9])); + /// assert_eq!(offsets.lengths().collect::>(), vec![1, 3, 5]); + /// ``` + /// + /// Empty [`OffsetBuffer`] will return an empty iterator + /// ``` + /// # use arrow_buffer::OffsetBuffer; + /// let offsets = OffsetBuffer::::new_empty(); + /// assert_eq!(offsets.lengths().count(), 0); + /// ``` + /// + /// This can be used to merge multiple [`OffsetBuffer`]s to one + /// ``` + /// # use arrow_buffer::{OffsetBuffer, ScalarBuffer}; + /// + /// let buffer1 = OffsetBuffer::::from_lengths([2, 6, 3, 7, 2]); + /// let buffer2 = OffsetBuffer::::from_lengths([1, 3, 5, 7, 9]); + /// + /// let merged = OffsetBuffer::::from_lengths( + /// vec![buffer1, buffer2].iter().flat_map(|x| x.lengths()) + /// ); + /// + /// assert_eq!(merged.lengths().collect::>(), &[2, 6, 3, 7, 2, 1, 3, 5, 7, 9]); + /// ``` + pub fn lengths(&self) -> impl ExactSizeIterator + '_ { + self.0.windows(2).map(|x| x[1].as_usize() - x[0].as_usize()) + } + /// Free up unused memory. pub fn shrink_to_fit(&mut self) { self.0.shrink_to_fit(); @@ -163,44 +194,6 @@ impl OffsetBuffer { } } -impl OffsetBuffer { - /// Merge multiple [`OffsetBuffer`] into a single [`OffsetBuffer`] - /// - /// - /// ``` - /// # use arrow_buffer::{OffsetBuffer, ScalarBuffer}; - /// // [[0, 3, 5], [0, 2, 2, 8], [], [0, 0, 1]] - /// // The output should be - /// // [ 0, 3, 5, 7, 7, 13, 13, 14] - /// let buffers = [ - /// OffsetBuffer::::new(ScalarBuffer::from(vec![0, 3, 5])), - /// OffsetBuffer::::new(ScalarBuffer::from(vec![0, 2, 2, 8])), - /// OffsetBuffer::::new_empty(), - /// OffsetBuffer::::new(ScalarBuffer::from(vec![0, 0, 1])), - /// ]; - /// - /// let buffer = OffsetBuffer::::merge(&buffers); - /// assert_eq!(buffer.as_ref(), &[ 0, 3, 5, 7, 7, 13, 13, 14]); - /// ``` - /// - pub fn merge<'a, Iter>(offset_buffers_iterator: Iter) -> Self - where - Iter: IntoIterator>, - ::IntoIter: 'a + Clone, - { - let iter = MergeBuffersIter::from(offset_buffers_iterator.into_iter()); - if iter.len() == 0 { - return Self::new_empty(); - } - - let buffer = unsafe { Buffer::from_trusted_len_iter(iter) }; - - let scalar_buffer: ScalarBuffer = buffer.into(); - - Self::new(scalar_buffer) - } -} - impl Deref for OffsetBuffer { type Target = [T]; @@ -223,119 +216,6 @@ impl From> for OffsetBuffer { } } -struct MergeBuffersIter<'a, Offset: Integer + Copy> { - size: usize, - iterator: Box + 'a>, - inner_iterator: Box + 'a>, - advance_by: Offset, - next_advance_by: Offset, -} - -impl<'a, Offset, Iter> From for MergeBuffersIter<'a, Offset> -where - Offset: ArrowNativeType + Integer + Copy, - Iter: Iterator> + Clone + 'a, -{ - fn from(offset_buffers: Iter) -> Self { - Self::new(offset_buffers.clone(), Self::calculate_size(offset_buffers)) - } -} - -impl<'a, Offset: ArrowNativeType + Integer + Copy> MergeBuffersIter<'a, Offset> { - fn new( - offset_buffers_iterator: impl Iterator> + 'a, - size: usize, - ) -> Self { - let offsets_iterator: Box> = Box::new( - offset_buffers_iterator - // Filter out empty lists or lists with only 1 offset which are invalid as they should have at least 2 offsets (start and end) - .filter(|offset_buffer| offset_buffer.len() > 1) - .map(|offset_buffer| offset_buffer.inner().as_ref()), - ); - - Self { - size, - iterator: Box::new(offsets_iterator), - inner_iterator: if size == 0 { - Box::new([].into_iter()) - } else { - // Start initially with outputting the initial offset - Box::new([Offset::zero()].into_iter()) - }, - advance_by: Offset::zero(), - next_advance_by: Offset::zero(), - } - } - - fn calculate_size(buffers: impl Iterator>) -> usize { - // The total length of the merged offset buffer - // We calculate this so we can use the faster `try_from_trusted_len_iter` method which requires fixed length - let merged_offset_length: usize = buffers - // 1. `saturating_sub` as the list can be empty - // 2. subtract 1 as we have the initial offset of 0 that we don't need to count for each list, and we are adding 1 at the end - .map(|x| x.len().saturating_sub(1)) - .sum(); - - if merged_offset_length == 0 { - return 0; - } - - // we need to add 1 to the total length of the merged offset buffer as we have the initial offset of 0 - merged_offset_length + 1 - } -} - -impl Iterator for MergeBuffersIter<'_, Offset> { - type Item = Offset; - - fn next(&mut self) -> Option { - // 1. Consume the inner iterator first - let inner_value = self.inner_iterator.next(); - - // 2. If we have a value, advance it by the last value in the previous buffer (advance_by) - if inner_value.is_some() { - self.size -= 1; - return Some(inner_value.unwrap() + self.advance_by); - } - - self.advance_by = self.next_advance_by; - - // 3. If no more iterators, than we finished - let current_offset_buffer = self.iterator.next()?; - - // 4. Get the last value of the current buffer so we can know how much to advance the next buffer - // Safety: We already filtered out empty lists - let last_value = *current_offset_buffer.last().unwrap(); - - // 5. Update the next advance_by - self.next_advance_by = self.advance_by + last_value; - - self.inner_iterator = Box::new( - current_offset_buffer - .iter() - // 6. Skip the initial offset of 0 - // Skipping the first item as it is the initial offset of 0, - // and we skip even for the first iterator as we handle that by starting with inner_iterator of [0] - .skip(1) - .copied(), - ); - - // 7. Resume the inner iterator - // We already filtered out lists that have less than 2 offsets so can guarantee that the next call will return a value - self.next() - } - - fn size_hint(&self) -> (usize, Option) { - (self.size, Some(self.size)) - } -} - -impl ExactSizeIterator for MergeBuffersIter<'_, Offset> { - fn len(&self) -> usize { - self.size - } -} - #[cfg(test)] mod tests { use super::*; @@ -398,31 +278,22 @@ mod tests { } #[test] - fn merge_from() { - // [[0, 3, 5], [0, 2, 2, 8], [], [0, 0, 1]] - // The output should be - // [ 0, 3, 5, 7, 7, 13, 13, 14] - // - let buffers = [ - OffsetBuffer::::new(ScalarBuffer::from(vec![0, 3, 5])), - OffsetBuffer::::new(ScalarBuffer::from(vec![0, 2, 2, 8])), - OffsetBuffer::::new_empty(), - OffsetBuffer::::new(ScalarBuffer::from(vec![0, 0, 1])), - ]; - - let buffer = OffsetBuffer::::merge(&buffers); - assert_eq!(buffer.as_ref(), &[0, 3, 5, 7, 7, 13, 13, 14]); + fn get_lengths() { + let offsets = OffsetBuffer::::new(ScalarBuffer::::from(vec![0, 1, 4, 9])); + assert_eq!(offsets.lengths().collect::>(), vec![1, 3, 5]); + } + + #[test] + fn get_lengths_should_be_with_fixed_size() { + let offsets = OffsetBuffer::::new(ScalarBuffer::::from(vec![0, 1, 4, 9])); + let iter = offsets.lengths(); + assert_eq!(iter.size_hint(), (3, Some(3))); + assert_eq!(iter.len(), 3); } #[test] - fn merge_from_empty() { - let buffers = [ - OffsetBuffer::::new_empty(), - OffsetBuffer::::new_empty(), - OffsetBuffer::::new_empty(), - ]; - - let buffer = OffsetBuffer::::merge(&buffers); - assert_eq!(buffer.as_ref(), OffsetBuffer::::new_empty().as_ref()); + fn get_lengths_from_empty_offset_buffer_should_be_empty_iterator() { + let offsets = OffsetBuffer::::new_empty(); + assert_eq!(offsets.lengths().collect::>(), vec![]); } } diff --git a/arrow-select/src/concat.rs b/arrow-select/src/concat.rs index 58242ba03b3..20b84bdc63a 100644 --- a/arrow-select/src/concat.rs +++ b/arrow-select/src/concat.rs @@ -199,9 +199,10 @@ fn concat_list_of_dictionaries::from_lengths(lists.iter().flat_map(|x| x.offsets().lengths())) + .into_inner() + .into_inner(); let builder = ArrayDataBuilder::new(arrays[0].data_type().clone()) .len(output_len) @@ -1031,90 +1032,6 @@ mod tests { ); } - #[test] - fn concat_dictionary_list_array_with_multiple_rows() { - let scalars = vec![ - create_list_of_dict(vec![ - // Row 1 - Some(vec![Some("a"), Some("c")]), - // Row 2 - None, - // Row 3 - Some(vec![Some("f"), Some("g"), None]), - // Row 4 - Some(vec![Some("c"), Some("f")]), - ]), - create_list_of_dict(vec![ - // Row 1 - Some(vec![Some("a")]), - // Row 2 - Some(vec![]), - // Row 3 - Some(vec![None, Some("b")]), - // Row 4 - Some(vec![Some("d"), Some("e")]), - ]), - create_list_of_dict(vec![ - // Row 1 - Some(vec![Some("g")]), - // Row 2 - Some(vec![Some("h"), Some("i")]), - // Row 3 - Some(vec![Some("j"), Some("a")]), - // Row 4 - Some(vec![Some("d"), Some("e")]), - ]), - ]; - let arrays = scalars - .iter() - .map(|a| a as &(dyn Array)) - .collect::>(); - let concat_res = concat(arrays.as_slice()).unwrap(); - - let expected_list = create_list_of_dict(vec![ - // First list: - - // Row 1 - Some(vec![Some("a"), Some("c")]), - // Row 2 - None, - // Row 3 - Some(vec![Some("f"), Some("g"), None]), - // Row 4 - Some(vec![Some("c"), Some("f")]), - // Second list: - // Row 1 - Some(vec![Some("a")]), - // Row 2 - Some(vec![]), - // Row 3 - Some(vec![None, Some("b")]), - // Row 4 - Some(vec![Some("d"), Some("e")]), - // Third list: - - // Row 1 - Some(vec![Some("g")]), - // Row 2 - Some(vec![Some("h"), Some("i")]), - // Row 3 - Some(vec![Some("j"), Some("a")]), - // Row 4 - Some(vec![Some("d"), Some("e")]), - ]); - - let list = concat_res.as_list::(); - - // Assert that the list is equal to the expected list - list.iter().zip(expected_list.iter()).for_each(|(a, b)| { - assert_eq!(a, b); - }); - - assert_dictionary_has_unique_values::<_, StringArray>( - list.values().as_dictionary::(), - ); - } - fn create_single_row_list_of_dict( list_items: Vec>>, ) -> GenericListArray {