
Binary / row helpers #6096

Merged: 8 commits, Oct 8, 2024
180 changes: 178 additions & 2 deletions arrow-row/src/lib.rs
@@ -132,7 +132,7 @@ use std::sync::Arc;
use arrow_array::cast::*;
use arrow_array::types::ArrowDictionaryKeyType;
use arrow_array::*;
- use arrow_buffer::ArrowNativeType;
+ use arrow_buffer::{ArrowNativeType, Buffer, OffsetBuffer, ScalarBuffer};
use arrow_data::ArrayDataBuilder;
use arrow_schema::*;
use variable::{decode_binary_view, decode_string_view};
@@ -738,6 +738,48 @@ impl RowConverter {
}
}

/// Create a new [Rows] instance from the given binary data.
///
/// ```
/// # use std::sync::Arc;
/// # use std::collections::HashSet;
/// # use arrow_array::cast::AsArray;
/// # use arrow_array::StringArray;
/// # use arrow_row::{OwnedRow, Row, RowConverter, RowParser, SortField};
/// # use arrow_schema::DataType;
/// #
/// let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
/// let array = StringArray::from(vec!["hello", "world", "a", "a", "hello"]);
/// let rows = converter.convert_columns(&[Arc::new(array)]).unwrap();
///
/// // We can convert rows into binary format and back in batch.
/// let values: Vec<OwnedRow> = rows.iter().map(|r| r.owned()).collect();
/// let binary = rows.try_into_binary().expect("known-small array");
/// let converted = converter.from_binary(binary.clone());
/// assert!(converted.iter().eq(values.iter().map(|r| r.row())));
/// ```
///
/// # Panics
///
/// This function expects the passed [BinaryArray] to contain valid row data as produced by this
/// [RowConverter]. It will panic if any rows are null. Operations on the returned [Rows] may
/// panic if the data is malformed.
pub fn from_binary(&self, array: BinaryArray) -> Rows {
assert_eq!(
array.null_count(),
0,
"can't construct Rows instance from array with nulls"
);
Rows {
buffer: array.values().to_vec(),
offsets: array.offsets().iter().map(|&i| i.as_usize()).collect(),
config: RowConfig {
fields: Arc::clone(&self.fields),
Member:
More for my curiosity than anything but why Arc::clone(&self.fields) instead of self.fields.clone()?

Contributor Author:
Some people prefer this form because it makes it more explicit that we're just incrementing an arc and not cloning the underlying data. See the clippy lint docs for more: https://rust-lang.github.io/rust-clippy/master/index.html#/clone_on_ref_ptr

I've gotten used to this style, though I do not personally care deeply about it! This codebase seems to use a mix of both.
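A minimal standalone sketch of the distinction, using only std (nothing here is from the PR): both spellings only bump the reference count, but the explicit form cannot silently become a deep copy if the field's type later changes.

    use std::sync::Arc;

    fn main() {
        let fields: Arc<Vec<i32>> = Arc::new(vec![1, 2, 3]);

        // Both forms increment the refcount; neither copies the Vec's contents.
        let explicit = Arc::clone(&fields); // unambiguously a refcount bump
        let implicit = fields.clone(); // resolves to Arc::clone here, but reads like a deep clone

        // All handles point at the same allocation.
        assert!(Arc::ptr_eq(&explicit, &implicit));
    }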

validate_utf8: true,
},
}
}
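As a hedged illustration of the panic documented above (this snippet is not part of the diff), an input array containing a null row trips the assert_eq at the top of from_binary:

    use arrow_array::BinaryArray;
    use arrow_row::{RowConverter, SortField};
    use arrow_schema::DataType;

    let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
    let with_null = BinaryArray::from(vec![Some("row".as_bytes()), None]);
    // converter.from_binary(with_null); // panics: "can't construct Rows instance from array with nulls"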

/// Convert raw bytes into [`ArrayRef`]
///
/// # Safety
@@ -878,6 +920,55 @@ impl Rows {
+ self.buffer.len()
+ self.offsets.len() * std::mem::size_of::<usize>()
}

/// Create a [BinaryArray] from the [Rows] data without reallocating the
/// underlying bytes.
///
/// ```
/// # use std::sync::Arc;
/// # use std::collections::HashSet;
/// # use arrow_array::cast::AsArray;
/// # use arrow_array::StringArray;
/// # use arrow_row::{OwnedRow, Row, RowConverter, RowParser, SortField};
/// # use arrow_schema::DataType;
/// #
/// let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
/// let array = StringArray::from(vec!["hello", "world", "a", "a", "hello"]);
/// let rows = converter.convert_columns(&[Arc::new(array)]).unwrap();
///
/// // We can convert rows into binary format and back.
/// let values: Vec<OwnedRow> = rows.iter().map(|r| r.owned()).collect();
/// let binary = rows.try_into_binary().expect("known-small array");
/// let parser = converter.parser();
/// let parsed: Vec<OwnedRow> =
/// binary.iter().flatten().map(|b| parser.parse(b).owned()).collect();
/// assert_eq!(values, parsed);
/// ```
///
/// # Errors
///
/// This function will return an error if there is more data than can be stored in
/// a [BinaryArray] -- i.e. if the total data size is more than 2GiB.
pub fn try_into_binary(self) -> Result<BinaryArray, ArrowError> {
if self.buffer.len() > i32::MAX as usize {
return Err(ArrowError::InvalidArgumentError(format!(
"{}-byte rows buffer too long to convert into a i32-indexed BinaryArray",
self.buffer.len()
)));
}
// We've checked that the buffer length fits in an i32; so all offsets into that buffer should fit as well.
let offsets_scalar = ScalarBuffer::from_iter(self.offsets.into_iter().map(i32::usize_as));
// SAFETY: offsets buffer is nonempty, monotonically increasing, and all represent valid indexes into buffer.
let array = unsafe {
BinaryArray::new_unchecked(
OffsetBuffer::new_unchecked(offsets_scalar),
Buffer::from_vec(self.buffer),
None,
)
};
Ok(array)
}
}
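One design consequence worth noting, sketched below on the assumption that Rows::size (whose tail appears earlier in this hunk) reports at least the buffer length: try_into_binary consumes self, so a caller that cannot afford to lose the data on overflow can gate on the size first. The gate is conservative, since size() also counts the offsets and struct overhead.

    use arrow_array::{BinaryArray, StringArray};
    use arrow_row::{RowConverter, SortField};
    use arrow_schema::DataType;
    use std::sync::Arc;

    let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
    let array = StringArray::from(vec!["hello", "world"]);
    let rows = converter.convert_columns(&[Arc::new(array)]).unwrap();

    // Conservative gate: size() >= buffer length, so passing this check
    // guarantees the i32 offset check inside try_into_binary succeeds.
    let binary: BinaryArray = if rows.size() <= i32::MAX as usize {
        rows.try_into_binary().expect("size checked above")
    } else {
        unimplemented!("fall back to chunking the rows into multiple arrays")
    };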

impl<'a> IntoIterator for &'a Rows {
@@ -961,6 +1052,11 @@ impl<'a> Row<'a> {
config: self.config.clone(),
}
}

/// The row's bytes, with the lifetime of the underlying data.
pub fn data(&self) -> &'a [u8] {
self.data
}
}
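A small usage sketch for data(), not taken from the diff: because the returned slice borrows for the lifetime 'a of the underlying buffer rather than of the Row handle, the bytes remain usable after the iterator's temporaries are gone, and they compare in row order.

    use arrow_array::StringArray;
    use arrow_row::{RowConverter, SortField};
    use arrow_schema::DataType;
    use std::sync::Arc;

    let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
    let array = StringArray::from(vec!["a", "b"]);
    let rows = converter.convert_columns(&[Arc::new(array)]).unwrap();

    // The slices outlive the Row values produced by the iterator.
    let bytes: Vec<&[u8]> = rows.iter().map(|r| r.data()).collect();
    assert_eq!(bytes.len(), 2);
    assert!(bytes[0] < bytes[1]); // the encoding preserves the column's sort order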

// Manually derive these as we don't wish to include `fields`
@@ -1838,6 +1934,68 @@ mod tests {
converter.convert_rows(std::iter::once(utf8_row)).unwrap();
}

#[test]
#[should_panic(expected = "Encountered non UTF-8 data")]
fn test_invalid_utf8_array() {
let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
let array = Arc::new(BinaryArray::from_iter_values([&[0xFF]])) as _;
let rows = converter.convert_columns(&[array]).unwrap();
let binary_rows = rows.try_into_binary().expect("known-small rows");

let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
let parsed = converter.from_binary(binary_rows);

converter.convert_rows(parsed.iter()).unwrap();
}

#[test]
#[should_panic(expected = "index out of bounds")]
fn test_invalid_empty() {
let binary_row: &[u8] = &[];

let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
let parser = converter.parser();
let utf8_row = parser.parse(binary_row.as_ref());

converter.convert_rows(std::iter::once(utf8_row)).unwrap();
}

#[test]
#[should_panic(expected = "index out of bounds")]
fn test_invalid_empty_array() {
let row: &[u8] = &[];
let binary_rows = BinaryArray::from(vec![row]);

let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
let parsed = converter.from_binary(binary_rows);

converter.convert_rows(parsed.iter()).unwrap();
}

#[test]
#[should_panic(expected = "index out of bounds")]
fn test_invalid_truncated() {
let binary_row: &[u8] = &[0x02];

let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
let parser = converter.parser();
let utf8_row = parser.parse(binary_row.as_ref());

converter.convert_rows(std::iter::once(utf8_row)).unwrap();
}

#[test]
#[should_panic(expected = "index out of bounds")]
fn test_invalid_truncated_array() {
let row: &[u8] = &[0x02];
let binary_rows = BinaryArray::from(vec![row]);

let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
let parsed = converter.from_binary(binary_rows);

converter.convert_rows(parsed.iter()).unwrap();
}

#[test]
#[should_panic(expected = "rows were not produced by this RowConverter")]
fn test_different_converter() {
@@ -2284,7 +2442,7 @@

let comparator = LexicographicalComparator::try_new(&sort_columns).unwrap();

- let columns = options
+ let columns: Vec<SortField> = options
.into_iter()
.zip(&arrays)
.map(|(o, a)| SortField::new_with_options(a.data_type().clone(), o))
@@ -2317,6 +2475,24 @@
actual.to_data().validate_full().unwrap();
dictionary_eq(actual, expected)
}

// Check that we can round-trip through the binary representation
let rows = rows.try_into_binary().expect("reasonable size");
let parser = converter.parser();
let back = converter
.convert_rows(rows.iter().map(|b| parser.parse(b.expect("valid bytes"))))
.unwrap();
for (actual, expected) in back.iter().zip(&arrays) {
actual.to_data().validate_full().unwrap();
dictionary_eq(actual, expected)
}

let rows = converter.from_binary(rows);
let back = converter.convert_rows(&rows).unwrap();
for (actual, expected) in back.iter().zip(&arrays) {
actual.to_data().validate_full().unwrap();
dictionary_eq(actual, expected)
}
}
}
