From ad3d611fb0352f29dcd61e3f1ab5b93d8cf698a4 Mon Sep 17 00:00:00 2001 From: Ben Kirwin Date: Fri, 12 Jul 2024 17:48:05 -0400 Subject: [PATCH 1/7] Allow access to the underlying bytes of the row Currently these are accessible via `AsRef`, but that trait only gives you the bytes with the lifetime of the `Row` struct and not the lifetime of the backing data. --- arrow-row/src/lib.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/arrow-row/src/lib.rs b/arrow-row/src/lib.rs index a6fd03b5bcec..201b4a44cfff 100644 --- a/arrow-row/src/lib.rs +++ b/arrow-row/src/lib.rs @@ -948,6 +948,11 @@ impl<'a> Row<'a> { config: self.config.clone(), } } + + /// The row's bytes, with the lifetime of the underlying data. + pub fn data(&self) -> &'a [u8] { + self.data + } } // Manually derive these as don't wish to include `fields` From 24a622a8dcbb642bf7251effc5aa0968a86abaa3 Mon Sep 17 00:00:00 2001 From: Ben Kirwin Date: Fri, 19 Jul 2024 17:28:11 -0400 Subject: [PATCH 2/7] Conversions to and from BinaryArray --- arrow-row/src/lib.rs | 57 ++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 55 insertions(+), 2 deletions(-) diff --git a/arrow-row/src/lib.rs b/arrow-row/src/lib.rs index 201b4a44cfff..20591396a71a 100644 --- a/arrow-row/src/lib.rs +++ b/arrow-row/src/lib.rs @@ -132,7 +132,7 @@ use std::sync::Arc; use arrow_array::cast::*; use arrow_array::types::ArrowDictionaryKeyType; use arrow_array::*; -use arrow_buffer::ArrowNativeType; +use arrow_buffer::{ArrowNativeType, OffsetBuffer, ScalarBuffer}; use arrow_data::ArrayDataBuilder; use arrow_schema::*; use variable::{decode_binary_view, decode_string_view}; @@ -738,6 +738,23 @@ impl RowConverter { } } + /// Create a new [Rows] instance from the given binary data. + pub fn from_binary(&self, array: BinaryArray) -> Rows { + assert_eq!( + array.null_count(), + 0, + "can't construct Rows instance from array with nulls" + ); + Rows { + buffer: array.values().to_vec(), + offsets: array.offsets().into_iter().map(|&i| i.as_usize()).collect(), + config: RowConfig { + fields: Arc::clone(&self.fields), + validate_utf8: true, + }, + } + } + /// Convert raw bytes into [`ArrayRef`] /// /// # Safety @@ -868,6 +885,24 @@ impl Rows { + self.buffer.len() + self.offsets.len() * std::mem::size_of::() } + + /// Create a [BinaryArray] from the [Rows] data without reallocating the + /// underlying bytes. + pub fn into_binary(self) -> BinaryArray { + assert!( + self.buffer.len() <= i32::MAX as usize, + "rows buffer too large" + ); + let offsets_scalar = ScalarBuffer::from_iter(self.offsets.into_iter().map(i32::usize_as)); + // SAFETY: offsets buffer is nonempty, monotonically increasing, and all represent valid indexes into buffer. + unsafe { + BinaryArray::new_unchecked( + OffsetBuffer::new_unchecked(offsets_scalar), + self.buffer.into(), + None, + ) + } + } } impl<'a> IntoIterator for &'a Rows { @@ -2276,7 +2311,7 @@ mod tests { let comparator = LexicographicalComparator::try_new(&sort_columns).unwrap(); - let columns = options + let columns: Vec = options .into_iter() .zip(&arrays) .map(|(o, a)| SortField::new_with_options(a.data_type().clone(), o)) @@ -2309,6 +2344,24 @@ mod tests { actual.to_data().validate_full().unwrap(); dictionary_eq(actual, expected) } + + // Check that we can convert + let rows = rows.into_binary(); + let parser = converter.parser(); + let back = converter + .convert_rows(rows.iter().map(|b| parser.parse(b.expect("valid bytes")))) + .unwrap(); + for (actual, expected) in back.iter().zip(&arrays) { + actual.to_data().validate_full().unwrap(); + dictionary_eq(actual, expected) + } + + let rows = converter.from_binary(rows); + let back = converter.convert_rows(&rows).unwrap(); + for (actual, expected) in back.iter().zip(&arrays) { + actual.to_data().validate_full().unwrap(); + dictionary_eq(actual, expected) + } } } From 980ec146388fabd5894b59b0038637445aa2bdc0 Mon Sep 17 00:00:00 2001 From: Ben Kirwin Date: Fri, 9 Aug 2024 10:54:11 -0400 Subject: [PATCH 3/7] Clippy fixes --- arrow-row/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arrow-row/src/lib.rs b/arrow-row/src/lib.rs index 20591396a71a..5dc9ca66cb5b 100644 --- a/arrow-row/src/lib.rs +++ b/arrow-row/src/lib.rs @@ -747,7 +747,7 @@ impl RowConverter { ); Rows { buffer: array.values().to_vec(), - offsets: array.offsets().into_iter().map(|&i| i.as_usize()).collect(), + offsets: array.offsets().iter().map(|&i| i.as_usize()).collect(), config: RowConfig { fields: Arc::clone(&self.fields), validate_utf8: true, From 6c2a980c046acde5eb4f21e9271ec1e027df055d Mon Sep 17 00:00:00 2001 From: Ben Kirwin Date: Fri, 9 Aug 2024 11:14:05 -0400 Subject: [PATCH 4/7] Doc comments for binary conversions --- arrow-row/src/lib.rs | 41 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/arrow-row/src/lib.rs b/arrow-row/src/lib.rs index 5dc9ca66cb5b..585059096fa8 100644 --- a/arrow-row/src/lib.rs +++ b/arrow-row/src/lib.rs @@ -739,6 +739,25 @@ impl RowConverter { } /// Create a new [Rows] instance from the given binary data. + /// + /// ``` + /// # use std::sync::Arc; + /// # use std::collections::HashSet; + /// # use arrow_array::cast::AsArray; + /// # use arrow_array::StringArray; + /// # use arrow_row::{OwnedRow, Row, RowConverter, RowParser, SortField}; + /// # use arrow_schema::DataType; + /// # + /// let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap(); + /// let array = StringArray::from(vec!["hello", "world", "a", "a", "hello"]); + /// let rows = converter.convert_columns(&[Arc::new(array)]).unwrap(); + /// + /// // We can convert rows into binary format and back in batch. + /// let values: Vec = rows.iter().map(|r| r.owned()).collect(); + /// let binary = rows.into_binary(); + /// let converted = converter.from_binary(binary.clone()); + /// assert!(converted.iter().eq(values.iter().map(|r| r.row()))); + /// ``` pub fn from_binary(&self, array: BinaryArray) -> Rows { assert_eq!( array.null_count(), @@ -888,6 +907,28 @@ impl Rows { /// Create a [BinaryArray] from the [Rows] data without reallocating the /// underlying bytes. + /// + /// + /// ``` + /// # use std::sync::Arc; + /// # use std::collections::HashSet; + /// # use arrow_array::cast::AsArray; + /// # use arrow_array::StringArray; + /// # use arrow_row::{OwnedRow, Row, RowConverter, RowParser, SortField}; + /// # use arrow_schema::DataType; + /// # + /// let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap(); + /// let array = StringArray::from(vec!["hello", "world", "a", "a", "hello"]); + /// let rows = converter.convert_columns(&[Arc::new(array)]).unwrap(); + /// + /// // We can convert rows into binary format and back. + /// let values: Vec = rows.iter().map(|r| r.owned()).collect(); + /// let binary = rows.into_binary(); + /// let parser = converter.parser(); + /// let parsed: Vec = + /// binary.iter().flatten().map(|b| parser.parse(b).owned()).collect(); + /// assert_eq!(values, parsed); + /// ``` pub fn into_binary(self) -> BinaryArray { assert!( self.buffer.len() <= i32::MAX as usize, From 3a646e54dd5b40691930449d45113732bae47580 Mon Sep 17 00:00:00 2001 From: Ben Kirwin Date: Fri, 9 Aug 2024 11:33:39 -0400 Subject: [PATCH 5/7] Converting to binary now returns a result --- arrow-row/src/lib.rs | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/arrow-row/src/lib.rs b/arrow-row/src/lib.rs index 585059096fa8..b4d90db5a1c6 100644 --- a/arrow-row/src/lib.rs +++ b/arrow-row/src/lib.rs @@ -754,7 +754,7 @@ impl RowConverter { /// /// // We can convert rows into binary format and back in batch. /// let values: Vec = rows.iter().map(|r| r.owned()).collect(); - /// let binary = rows.into_binary(); + /// let binary = rows.try_into_binary().expect("small"); /// let converted = converter.from_binary(binary.clone()); /// assert!(converted.iter().eq(values.iter().map(|r| r.row()))); /// ``` @@ -923,26 +923,30 @@ impl Rows { /// /// // We can convert rows into binary format and back. /// let values: Vec = rows.iter().map(|r| r.owned()).collect(); - /// let binary = rows.into_binary(); + /// let binary = rows.try_into_binary().expect("small"); /// let parser = converter.parser(); /// let parsed: Vec = /// binary.iter().flatten().map(|b| parser.parse(b).owned()).collect(); /// assert_eq!(values, parsed); /// ``` - pub fn into_binary(self) -> BinaryArray { - assert!( - self.buffer.len() <= i32::MAX as usize, - "rows buffer too large" - ); + pub fn try_into_binary(self) -> Result { + if self.buffer.len() > i32::MAX as usize { + return Err(ArrowError::InvalidArgumentError(format!( + "{}-byte rows buffer too long to convert into a i32-indexed BinaryArray", + self.buffer.len() + ))); + } + // We've checked that the buffer length fits in an i32; so all offsets into that buffer should fit as well. let offsets_scalar = ScalarBuffer::from_iter(self.offsets.into_iter().map(i32::usize_as)); // SAFETY: offsets buffer is nonempty, monotonically increasing, and all represent valid indexes into buffer. - unsafe { + let array = unsafe { BinaryArray::new_unchecked( OffsetBuffer::new_unchecked(offsets_scalar), self.buffer.into(), None, ) - } + }; + Ok(array) } } @@ -2387,7 +2391,7 @@ mod tests { } // Check that we can convert - let rows = rows.into_binary(); + let rows = rows.try_into_binary().expect("reasonable size"); let parser = converter.parser(); let back = converter .convert_rows(rows.iter().map(|b| parser.parse(b.expect("valid bytes")))) From c3b6b36930f35e00bbd5634b21991d32f92f8386 Mon Sep 17 00:00:00 2001 From: Ben Kirwin Date: Fri, 9 Aug 2024 12:20:41 -0400 Subject: [PATCH 6/7] Add some negative tests for invalid bytes --- arrow-row/src/lib.rs | 63 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 63 insertions(+) diff --git a/arrow-row/src/lib.rs b/arrow-row/src/lib.rs index b4d90db5a1c6..29558d6554ce 100644 --- a/arrow-row/src/lib.rs +++ b/arrow-row/src/lib.rs @@ -1910,6 +1910,69 @@ mod tests { converter.convert_rows(std::iter::once(utf8_row)).unwrap(); } + #[test] + #[should_panic(expected = "Encountered non UTF-8 data")] + fn test_invalid_utf8_array() { + let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap(); + let array = Arc::new(BinaryArray::from_iter_values([&[0xFF]])) as _; + let rows = converter.convert_columns(&[array]).unwrap(); + let binary_rows = rows.try_into_binary().expect("known-small rows"); + + let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap(); + let parsed = converter.from_binary(binary_rows); + + converter.convert_rows(parsed.iter()).unwrap(); + } + + + #[test] + #[should_panic(expected = "index out of bounds")] + fn test_invalid_empty() { + let binary_row: &[u8] = &[]; + + let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap(); + let parser = converter.parser(); + let utf8_row = parser.parse(binary_row.as_ref()); + + converter.convert_rows(std::iter::once(utf8_row)).unwrap(); + } + + #[test] + #[should_panic(expected = "index out of bounds")] + fn test_invalid_empty_array() { + let row: &[u8] = &[]; + let binary_rows = BinaryArray::from(vec![row]); + + let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap(); + let parsed = converter.from_binary(binary_rows); + + converter.convert_rows(parsed.iter()).unwrap(); + } + + #[test] + #[should_panic(expected = "index out of bounds")] + fn test_invalid_truncated() { + let binary_row: &[u8] = &[0x02]; + + let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap(); + let parser = converter.parser(); + let utf8_row = parser.parse(binary_row.as_ref()); + + converter.convert_rows(std::iter::once(utf8_row)).unwrap(); + } + + #[test] + #[should_panic(expected = "index out of bounds")] + fn test_invalid_truncated_array() { + let row: &[u8] = &[0x02]; + let binary_rows = BinaryArray::from(vec![row]); + + let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap(); + let parsed = converter.from_binary(binary_rows); + + converter.convert_rows(parsed.iter()).unwrap(); + } + #[test] #[should_panic(expected = "rows were not produced by this RowConverter")] fn test_different_converter() { From c2f0c7d9f88beeb29ad24431e774bd99b5a86b41 Mon Sep 17 00:00:00 2001 From: Ben Kirwin Date: Fri, 4 Oct 2024 11:45:42 -0400 Subject: [PATCH 7/7] Address clippy and review comments --- arrow-row/src/lib.rs | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/arrow-row/src/lib.rs b/arrow-row/src/lib.rs index 0bb30e9aeedc..5bfb6043ff54 100644 --- a/arrow-row/src/lib.rs +++ b/arrow-row/src/lib.rs @@ -754,10 +754,16 @@ impl RowConverter { /// /// // We can convert rows into binary format and back in batch. /// let values: Vec = rows.iter().map(|r| r.owned()).collect(); - /// let binary = rows.try_into_binary().expect("small"); + /// let binary = rows.try_into_binary().expect("known-small array"); /// let converted = converter.from_binary(binary.clone()); /// assert!(converted.iter().eq(values.iter().map(|r| r.row()))); /// ``` + /// + /// # Panics + /// + /// This function expects the passed [BinaryArray] to contain valid row data as produced by this + /// [RowConverter]. It will panic if any rows are null. Operations on the returned [Rows] may + /// panic if the data is malformed. pub fn from_binary(&self, array: BinaryArray) -> Rows { assert_eq!( array.null_count(), @@ -933,12 +939,17 @@ impl Rows { /// /// // We can convert rows into binary format and back. /// let values: Vec = rows.iter().map(|r| r.owned()).collect(); - /// let binary = rows.try_into_binary().expect("small"); + /// let binary = rows.try_into_binary().expect("known-small array"); /// let parser = converter.parser(); /// let parsed: Vec = /// binary.iter().flatten().map(|b| parser.parse(b).owned()).collect(); /// assert_eq!(values, parsed); /// ``` + /// + /// # Errors + /// + /// This function will return an error if there is more data than can be stored in + /// a [BinaryArray] -- i.e. if the total data size is more than 2GiB. pub fn try_into_binary(self) -> Result { if self.buffer.len() > i32::MAX as usize { return Err(ArrowError::InvalidArgumentError(format!( @@ -1937,7 +1948,6 @@ mod tests { converter.convert_rows(parsed.iter()).unwrap(); } - #[test] #[should_panic(expected = "index out of bounds")] fn test_invalid_empty() {