Binary / row helpers #6096

Merged: 8 commits, Oct 8, 2024
170 changes: 168 additions & 2 deletions arrow-row/src/lib.rs
@@ -132,7 +132,7 @@ use std::sync::Arc;
use arrow_array::cast::*;
use arrow_array::types::ArrowDictionaryKeyType;
use arrow_array::*;
-use arrow_buffer::ArrowNativeType;
+use arrow_buffer::{ArrowNativeType, Buffer, OffsetBuffer, ScalarBuffer};
use arrow_data::ArrayDataBuilder;
use arrow_schema::*;
use variable::{decode_binary_view, decode_string_view};
@@ -738,6 +738,42 @@ impl RowConverter {
}
}

/// Create a new [Rows] instance from the given binary data.
///
/// ```
/// # use std::sync::Arc;
/// # use std::collections::HashSet;
/// # use arrow_array::cast::AsArray;
/// # use arrow_array::StringArray;
/// # use arrow_row::{OwnedRow, Row, RowConverter, RowParser, SortField};
/// # use arrow_schema::DataType;
/// #
/// let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
/// let array = StringArray::from(vec!["hello", "world", "a", "a", "hello"]);
/// let rows = converter.convert_columns(&[Arc::new(array)]).unwrap();
///
/// // We can convert rows into binary format and back in batch.
/// let values: Vec<OwnedRow> = rows.iter().map(|r| r.owned()).collect();
/// let binary = rows.try_into_binary().expect("small");
Member: I got a little confused by .expect("small"). What does "small" mean in this context? Why not just .unwrap()?
Contributor (Author): try_into_binary fails when the data is too large to be indexed with a 32-bit integer, so this was meant to suggest that it was fine to unwrap here because the data is known to be small. I'll expand the message a bit!
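The check the author describes can be illustrated with a self-contained sketch in plain Rust (no arrow types; the function name and error string here are illustrative, not the crate's API): the buffer length is validated against i32::MAX before the usize offsets are narrowed.

```rust
/// Illustrative sketch of the guard in `try_into_binary`: an i32-indexed
/// binary array can only address buffers up to `i32::MAX` bytes, so the
/// usize offsets are narrowed only after checking the total buffer length.
fn narrow_offsets(buffer_len: usize, offsets: &[usize]) -> Result<Vec<i32>, String> {
    if buffer_len > i32::MAX as usize {
        return Err(format!(
            "{buffer_len}-byte rows buffer too long to convert into an i32-indexed BinaryArray"
        ));
    }
    // Every offset points into the buffer, so each one also fits in an i32.
    Ok(offsets.iter().map(|&o| o as i32).collect())
}

fn main() {
    // A small buffer converts fine; an oversized one is rejected up front.
    assert_eq!(narrow_offsets(10, &[0, 4, 10]), Ok(vec![0, 4, 10]));
    assert!(narrow_offsets(i32::MAX as usize + 1, &[0]).is_err());
}
```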

/// let converted = converter.from_binary(binary.clone());
/// assert!(converted.iter().eq(values.iter().map(|r| r.row())));
/// ```
pub fn from_binary(&self, array: BinaryArray) -> Rows {
    assert_eq!(
        array.null_count(),
        0,
        "can't construct Rows instance from array with nulls"
    );
    Rows {
        buffer: array.values().to_vec(),
        offsets: array.offsets().iter().map(|&i| i.as_usize()).collect(),
        config: RowConfig {
            fields: Arc::clone(&self.fields),
Member: More for my curiosity than anything but why Arc::clone(&self.fields) instead of self.fields.clone()?

Contributor (Author): Some people prefer this form because it makes it more explicit that we're just incrementing an Arc and not cloning the underlying data. See the clippy lint docs for more: https://rust-lang.github.io/rust-clippy/master/index.html#/clone_on_ref_ptr

I've gotten used to this style, though I do not personally care deeply about it! This codebase seems to use a mix of both.
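The equivalence the author mentions can be demonstrated with plain std::sync::Arc (the variable names below are illustrative): both spellings bump the reference count and share the same allocation; neither copies the pointed-to data.

```rust
use std::sync::Arc;

fn main() {
    let fields: Arc<Vec<String>> = Arc::new(vec!["a".to_string(), "b".to_string()]);

    // Both forms increment the reference count; neither clones the Vec.
    let explicit = Arc::clone(&fields);
    let implicit = fields.clone();

    // All three handles point at the same allocation.
    assert!(Arc::ptr_eq(&fields, &explicit));
    assert!(Arc::ptr_eq(&fields, &implicit));
    assert_eq!(Arc::strong_count(&fields), 3);
}
```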

            validate_utf8: true,
        },
    }
}

/// Convert raw bytes into [`ArrayRef`]
///
/// # Safety
@@ -878,6 +914,50 @@ impl Rows {
+ self.buffer.len()
+ self.offsets.len() * std::mem::size_of::<usize>()
}

/// Create a [BinaryArray] from the [Rows] data without reallocating the
/// underlying bytes.
///
/// ```
/// # use std::sync::Arc;
/// # use std::collections::HashSet;
/// # use arrow_array::cast::AsArray;
/// # use arrow_array::StringArray;
/// # use arrow_row::{OwnedRow, Row, RowConverter, RowParser, SortField};
/// # use arrow_schema::DataType;
/// #
/// let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
/// let array = StringArray::from(vec!["hello", "world", "a", "a", "hello"]);
/// let rows = converter.convert_columns(&[Arc::new(array)]).unwrap();
///
/// // We can convert rows into binary format and back.
/// let values: Vec<OwnedRow> = rows.iter().map(|r| r.owned()).collect();
/// let binary = rows.try_into_binary().expect("small");
/// let parser = converter.parser();
/// let parsed: Vec<OwnedRow> =
/// binary.iter().flatten().map(|b| parser.parse(b).owned()).collect();
/// assert_eq!(values, parsed);
/// ```
pub fn try_into_binary(self) -> Result<BinaryArray, ArrowError> {
    if self.buffer.len() > i32::MAX as usize {
        return Err(ArrowError::InvalidArgumentError(format!(
            "{}-byte rows buffer too long to convert into an i32-indexed BinaryArray",
            self.buffer.len()
        )));
    }
    // We've checked that the buffer length fits in an i32, so all offsets into that buffer fit as well.
    let offsets_scalar = ScalarBuffer::from_iter(self.offsets.into_iter().map(i32::usize_as));
    // SAFETY: the offsets buffer is nonempty and monotonically increasing, and every offset is a valid index into `buffer`.
    let array = unsafe {
        BinaryArray::new_unchecked(
            OffsetBuffer::new_unchecked(offsets_scalar),
            Buffer::from_vec(self.buffer),
            None,
        )
    };
    Ok(array)
}
}

impl<'a> IntoIterator for &'a Rows {
@@ -961,6 +1041,11 @@ impl<'a> Row<'a> {
config: self.config.clone(),
}
}

/// The row's bytes, with the lifetime of the underlying data.
pub fn data(&self) -> &'a [u8] {
    self.data
}
}
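The signature returns &'a [u8] rather than a borrow of &self, so the bytes stay usable after the Row handle is dropped. A minimal stand-in type (RowLike is a hypothetical name, not part of arrow-row) shows why the lifetime matters:

```rust
// Minimal stand-in for Row<'a>: the accessor returns a slice borrowed from
// the underlying buffer (lifetime 'a), not from the wrapper itself, so the
// returned bytes outlive the wrapper.
struct RowLike<'a> {
    data: &'a [u8],
}

impl<'a> RowLike<'a> {
    fn data(&self) -> &'a [u8] {
        self.data
    }
}

fn main() {
    let buffer = vec![1u8, 2, 3];
    let bytes = {
        let row = RowLike { data: &buffer };
        row.data() // the borrow is tied to `buffer`, not to `row`
    };
    // `row` is gone, but the bytes are still valid.
    assert_eq!(bytes, &[1, 2, 3]);
}
```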

// Manually derive these as we don't wish to include `fields`
@@ -1838,6 +1923,69 @@ mod tests {
converter.convert_rows(std::iter::once(utf8_row)).unwrap();
}

#[test]
#[should_panic(expected = "Encountered non UTF-8 data")]
fn test_invalid_utf8_array() {
    let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
    let array = Arc::new(BinaryArray::from_iter_values([&[0xFF]])) as _;
    let rows = converter.convert_columns(&[array]).unwrap();
    let binary_rows = rows.try_into_binary().expect("known-small rows");

    let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
    let parsed = converter.from_binary(binary_rows);

    converter.convert_rows(parsed.iter()).unwrap();
}

#[test]
#[should_panic(expected = "index out of bounds")]
fn test_invalid_empty() {
    let binary_row: &[u8] = &[];

    let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
    let parser = converter.parser();
    let utf8_row = parser.parse(binary_row.as_ref());

    converter.convert_rows(std::iter::once(utf8_row)).unwrap();
}

#[test]
#[should_panic(expected = "index out of bounds")]
fn test_invalid_empty_array() {
    let row: &[u8] = &[];
    let binary_rows = BinaryArray::from(vec![row]);

    let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
    let parsed = converter.from_binary(binary_rows);

    converter.convert_rows(parsed.iter()).unwrap();
}

#[test]
#[should_panic(expected = "index out of bounds")]
fn test_invalid_truncated() {
    let binary_row: &[u8] = &[0x02];

    let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
    let parser = converter.parser();
    let utf8_row = parser.parse(binary_row.as_ref());

    converter.convert_rows(std::iter::once(utf8_row)).unwrap();
}

#[test]
#[should_panic(expected = "index out of bounds")]
fn test_invalid_truncated_array() {
    let row: &[u8] = &[0x02];
    let binary_rows = BinaryArray::from(vec![row]);

    let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
    let parsed = converter.from_binary(binary_rows);

    converter.convert_rows(parsed.iter()).unwrap();
}

#[test]
#[should_panic(expected = "rows were not produced by this RowConverter")]
fn test_different_converter() {
@@ -2284,7 +2432,7 @@ mod tests {

let comparator = LexicographicalComparator::try_new(&sort_columns).unwrap();

-        let columns = options
+        let columns: Vec<SortField> = options
.into_iter()
.zip(&arrays)
.map(|(o, a)| SortField::new_with_options(a.data_type().clone(), o))
Expand Down Expand Up @@ -2317,6 +2465,24 @@ mod tests {
actual.to_data().validate_full().unwrap();
dictionary_eq(actual, expected)
}

// Check that we can convert to raw bytes and back.
let rows = rows.try_into_binary().expect("reasonable size");
let parser = converter.parser();
let back = converter
    .convert_rows(rows.iter().map(|b| parser.parse(b.expect("valid bytes"))))
    .unwrap();
for (actual, expected) in back.iter().zip(&arrays) {
    actual.to_data().validate_full().unwrap();
    dictionary_eq(actual, expected)
}

let rows = converter.from_binary(rows);
let back = converter.convert_rows(&rows).unwrap();
for (actual, expected) in back.iter().zip(&arrays) {
    actual.to_data().validate_full().unwrap();
    dictionary_eq(actual, expected)
}
}
}
