diff --git a/arrow-row/src/lib.rs b/arrow-row/src/lib.rs
index 2d9af757550e..5bfb6043ff54 100644
--- a/arrow-row/src/lib.rs
+++ b/arrow-row/src/lib.rs
@@ -132,7 +132,7 @@ use std::sync::Arc;
 use arrow_array::cast::*;
 use arrow_array::types::ArrowDictionaryKeyType;
 use arrow_array::*;
-use arrow_buffer::ArrowNativeType;
+use arrow_buffer::{ArrowNativeType, Buffer, OffsetBuffer, ScalarBuffer};
 use arrow_data::ArrayDataBuilder;
 use arrow_schema::*;
 use variable::{decode_binary_view, decode_string_view};
@@ -738,6 +738,48 @@ impl RowConverter {
         }
     }
 
+    /// Create a new [Rows] instance from the given binary data.
+    ///
+    /// ```
+    /// # use std::sync::Arc;
+    /// # use std::collections::HashSet;
+    /// # use arrow_array::cast::AsArray;
+    /// # use arrow_array::StringArray;
+    /// # use arrow_row::{OwnedRow, Row, RowConverter, RowParser, SortField};
+    /// # use arrow_schema::DataType;
+    /// #
+    /// let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
+    /// let array = StringArray::from(vec!["hello", "world", "a", "a", "hello"]);
+    /// let rows = converter.convert_columns(&[Arc::new(array)]).unwrap();
+    ///
+    /// // We can convert rows into binary format and back in batch.
+    /// let values: Vec<OwnedRow> = rows.iter().map(|r| r.owned()).collect();
+    /// let binary = rows.try_into_binary().expect("known-small array");
+    /// let converted = converter.from_binary(binary.clone());
+    /// assert!(converted.iter().eq(values.iter().map(|r| r.row())));
+    /// ```
+    ///
+    /// # Panics
+    ///
+    /// This function expects the passed [BinaryArray] to contain valid row data as produced by this
+    /// [RowConverter]. It will panic if any rows are null. Operations on the returned [Rows] may
+    /// panic if the data is malformed.
+    pub fn from_binary(&self, array: BinaryArray) -> Rows {
+        assert_eq!(
+            array.null_count(),
+            0,
+            "can't construct Rows instance from array with nulls"
+        );
+        Rows {
+            buffer: array.values().to_vec(),
+            offsets: array.offsets().iter().map(|&i| i.as_usize()).collect(),
+            config: RowConfig {
+                fields: Arc::clone(&self.fields),
+                validate_utf8: true,
+            },
+        }
+    }
+
     /// Convert raw bytes into [`ArrayRef`]
     ///
     /// # Safety
@@ -878,6 +920,55 @@ impl Rows {
             + self.buffer.len()
             + self.offsets.len() * std::mem::size_of::<usize>()
     }
+
+    /// Create a [BinaryArray] from the [Rows] data without reallocating the
+    /// underlying bytes.
+    ///
+    ///
+    /// ```
+    /// # use std::sync::Arc;
+    /// # use std::collections::HashSet;
+    /// # use arrow_array::cast::AsArray;
+    /// # use arrow_array::StringArray;
+    /// # use arrow_row::{OwnedRow, Row, RowConverter, RowParser, SortField};
+    /// # use arrow_schema::DataType;
+    /// #
+    /// let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
+    /// let array = StringArray::from(vec!["hello", "world", "a", "a", "hello"]);
+    /// let rows = converter.convert_columns(&[Arc::new(array)]).unwrap();
+    ///
+    /// // We can convert rows into binary format and back.
+    /// let values: Vec<OwnedRow> = rows.iter().map(|r| r.owned()).collect();
+    /// let binary = rows.try_into_binary().expect("known-small array");
+    /// let parser = converter.parser();
+    /// let parsed: Vec<OwnedRow> =
+    ///     binary.iter().flatten().map(|b| parser.parse(b).owned()).collect();
+    /// assert_eq!(values, parsed);
+    /// ```
+    ///
+    /// # Errors
+    ///
+    /// This function will return an error if there is more data than can be stored in
+    /// a [BinaryArray] -- i.e. if the total data size is more than 2GiB.
+    pub fn try_into_binary(self) -> Result<BinaryArray, ArrowError> {
+        if self.buffer.len() > i32::MAX as usize {
+            return Err(ArrowError::InvalidArgumentError(format!(
+                "{}-byte rows buffer too long to convert into a i32-indexed BinaryArray",
+                self.buffer.len()
+            )));
+        }
+        // We've checked that the buffer length fits in an i32; so all offsets into that buffer should fit as well.
+        let offsets_scalar = ScalarBuffer::from_iter(self.offsets.into_iter().map(i32::usize_as));
+        // SAFETY: offsets buffer is nonempty, monotonically increasing, and all represent valid indexes into buffer.
+        let array = unsafe {
+            BinaryArray::new_unchecked(
+                OffsetBuffer::new_unchecked(offsets_scalar),
+                Buffer::from_vec(self.buffer),
+                None,
+            )
+        };
+        Ok(array)
+    }
 }
 
 impl<'a> IntoIterator for &'a Rows {
@@ -961,6 +1052,11 @@ impl<'a> Row<'a> {
             config: self.config.clone(),
         }
     }
+
+    /// The row's bytes, with the lifetime of the underlying data.
+    pub fn data(&self) -> &'a [u8] {
+        self.data
+    }
 }
 
 // Manually derive these as don't wish to include `fields`
@@ -1838,6 +1934,68 @@ mod tests {
         converter.convert_rows(std::iter::once(utf8_row)).unwrap();
     }
 
+    #[test]
+    #[should_panic(expected = "Encountered non UTF-8 data")]
+    fn test_invalid_utf8_array() {
+        let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
+        let array = Arc::new(BinaryArray::from_iter_values([&[0xFF]])) as _;
+        let rows = converter.convert_columns(&[array]).unwrap();
+        let binary_rows = rows.try_into_binary().expect("known-small rows");
+
+        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
+        let parsed = converter.from_binary(binary_rows);
+
+        converter.convert_rows(parsed.iter()).unwrap();
+    }
+
+    #[test]
+    #[should_panic(expected = "index out of bounds")]
+    fn test_invalid_empty() {
+        let binary_row: &[u8] = &[];
+
+        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
+        let parser = converter.parser();
+        let utf8_row = parser.parse(binary_row.as_ref());
+
+        converter.convert_rows(std::iter::once(utf8_row)).unwrap();
+    }
+
+    #[test]
+    #[should_panic(expected = "index out of bounds")]
+    fn test_invalid_empty_array() {
+        let row: &[u8] = &[];
+        let binary_rows = BinaryArray::from(vec![row]);
+
+        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
+        let parsed = converter.from_binary(binary_rows);
+
+        converter.convert_rows(parsed.iter()).unwrap();
+    }
+
+    #[test]
+    #[should_panic(expected = "index out of bounds")]
+    fn test_invalid_truncated() {
+        let binary_row: &[u8] = &[0x02];
+
+        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
+        let parser = converter.parser();
+        let utf8_row = parser.parse(binary_row.as_ref());
+
+        converter.convert_rows(std::iter::once(utf8_row)).unwrap();
+    }
+
+    #[test]
+    #[should_panic(expected = "index out of bounds")]
+    fn test_invalid_truncated_array() {
+        let row: &[u8] = &[0x02];
+        let binary_rows = BinaryArray::from(vec![row]);
+
+        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
+        let parsed = converter.from_binary(binary_rows);
+
+        converter.convert_rows(parsed.iter()).unwrap();
+    }
+
     #[test]
     #[should_panic(expected = "rows were not produced by this RowConverter")]
     fn test_different_converter() {
@@ -2284,7 +2442,7 @@ mod tests {
 
             let comparator = LexicographicalComparator::try_new(&sort_columns).unwrap();
 
-            let columns = options
+            let columns: Vec<SortField> = options
                 .into_iter()
                 .zip(&arrays)
                 .map(|(o, a)| SortField::new_with_options(a.data_type().clone(), o))
@@ -2317,6 +2475,24 @@ mod tests {
                 actual.to_data().validate_full().unwrap();
                 dictionary_eq(actual, expected)
             }
+
+            // Check that we can convert
+            let rows = rows.try_into_binary().expect("reasonable size");
+            let parser = converter.parser();
+            let back = converter
+                .convert_rows(rows.iter().map(|b| parser.parse(b.expect("valid bytes"))))
+                .unwrap();
+            for (actual, expected) in back.iter().zip(&arrays) {
+                actual.to_data().validate_full().unwrap();
+                dictionary_eq(actual, expected)
+            }
+
+            let rows = converter.from_binary(rows);
+            let back = converter.convert_rows(&rows).unwrap();
+            for (actual, expected) in back.iter().zip(&arrays) {
+                actual.to_data().validate_full().unwrap();
+                dictionary_eq(actual, expected)
+            }
         }
     }
 