-
Notifications
You must be signed in to change notification settings - Fork 837
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
RecordBatch
normalization (flattening)
#6758
base: main
Are you sure you want to change the base?
Changes from all commits
bbd7c8b
8abcd25
6bba7d3
55eb953
30d6294
0ed979d
d9d08cd
d1b3260
a12082c
7adda58
9c9c699
4422add
d0dc5a7
1e40c98
3c424d1
6d6b026
71380b6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,8 +18,10 @@ | |
//! A two-dimensional batch of column-oriented data with a defined | ||
//! [schema](arrow_schema::Schema). | ||
|
||
use crate::cast::AsArray; | ||
use crate::{new_empty_array, Array, ArrayRef, StructArray}; | ||
use arrow_schema::{ArrowError, DataType, Field, Schema, SchemaBuilder, SchemaRef}; | ||
use arrow_schema::{ArrowError, DataType, Field, FieldRef, Schema, SchemaBuilder, SchemaRef}; | ||
use std::collections::VecDeque; | ||
use std::ops::Index; | ||
use std::sync::Arc; | ||
|
||
|
@@ -394,6 +396,96 @@ impl RecordBatch { | |
) | ||
} | ||
|
||
/// Normalize a semi-structured [`RecordBatch`] into a flat table. | ||
/// | ||
/// `separator`: Nested [`Field`]s will generate names separated by `separator`, e.g. for | ||
/// separator= "." and the schema: | ||
/// ```text | ||
/// "foo": StructArray<"bar": Utf8> | ||
/// ``` | ||
/// will generate: | ||
/// ```text | ||
/// "foo.bar": Utf8 | ||
/// ``` | ||
/// `max_level`: The maximum number of levels (depth of the `Schema` and `Columns`) to | ||
/// normalize. If `0`, normalizes all levels. | ||
/// | ||
/// # Example | ||
/// | ||
/// ``` | ||
/// # use std::sync::Arc; | ||
/// # use arrow_array::{ArrayRef, Int64Array, StringArray, StructArray, RecordBatch}; | ||
/// # use arrow_schema::{DataType, Field, Fields, Schema}; | ||
/// | ||
/// let animals: ArrayRef = Arc::new(StringArray::from(vec!["Parrot", ""])); | ||
/// let n_legs: ArrayRef = Arc::new(Int64Array::from(vec![Some(2), Some(4)])); | ||
/// | ||
/// let animals_field = Arc::new(Field::new("animals", DataType::Utf8, true)); | ||
/// let n_legs_field = Arc::new(Field::new("n_legs", DataType::Int64, true)); | ||
/// | ||
/// let a = Arc::new(StructArray::from(vec![ | ||
/// (animals_field.clone(), Arc::new(animals.clone()) as ArrayRef), | ||
/// (n_legs_field.clone(), Arc::new(n_legs.clone()) as ArrayRef), | ||
/// ])); | ||
/// | ||
/// let schema = Schema::new(vec![ | ||
/// Field::new( | ||
/// "a", | ||
/// DataType::Struct(Fields::from(vec![animals_field, n_legs_field])), | ||
/// false, | ||
/// ) | ||
/// ]); | ||
/// | ||
/// let normalized = RecordBatch::try_new(Arc::new(schema), vec![a]) | ||
/// .expect("valid conversion") | ||
/// .normalize(".", 0) | ||
/// .expect("valid normalization"); | ||
/// | ||
/// let expected = RecordBatch::try_from_iter_with_nullable(vec![ | ||
/// ("a.animals", animals.clone(), true), | ||
/// ("a.n_legs", n_legs.clone(), true), | ||
/// ]) | ||
/// .expect("valid conversion"); | ||
/// | ||
/// assert_eq!(expected, normalized); | ||
/// ``` | ||
pub fn normalize(&self, separator: &str, mut max_level: usize) -> Result<Self, ArrowError> { | ||
if max_level == 0 { | ||
max_level = usize::MAX; | ||
} | ||
let mut queue: VecDeque<(usize, &ArrayRef, Vec<&str>, &FieldRef)> = VecDeque::new(); | ||
for (c, f) in self.columns.iter().zip(self.schema.fields()) { | ||
let name_vec: Vec<&str> = vec![f.name()]; | ||
queue.push_back((0, c, name_vec, f)); | ||
} | ||
let mut columns: Vec<ArrayRef> = Vec::new(); | ||
let mut fields: Vec<FieldRef> = Vec::new(); | ||
|
||
while let Some((depth, c, name, field_ref)) = queue.pop_front() { | ||
match field_ref.data_type() { | ||
DataType::Struct(ff) if depth < max_level => { | ||
// Need to zip these in reverse to maintain original order | ||
for (cff, fff) in c.as_struct().columns().iter().zip(ff.into_iter()).rev() { | ||
let mut name = name.clone(); | ||
name.push(separator); | ||
name.push(fff.name()); | ||
queue.push_front((depth + 1, cff, name, fff)) | ||
} | ||
} | ||
_ => { | ||
let updated_field = Field::new( | ||
name.concat(), | ||
field_ref.data_type().clone(), | ||
field_ref.is_nullable(), | ||
); | ||
columns.push(c.clone()); | ||
fields.push(Arc::new(updated_field)); | ||
} | ||
} | ||
} | ||
RecordBatch::try_new(Arc::new(Schema::new(fields)), columns) | ||
} | ||
|
||
/// Returns the number of columns in the record batch. | ||
/// | ||
/// # Example | ||
|
@@ -768,15 +860,14 @@ where | |
|
||
#[cfg(test)] | ||
mod tests { | ||
use std::collections::HashMap; | ||
|
||
use super::*; | ||
use crate::{ | ||
BooleanArray, Int32Array, Int64Array, Int8Array, ListArray, StringArray, StringViewArray, | ||
}; | ||
use arrow_buffer::{Buffer, ToByteSlice}; | ||
use arrow_data::{ArrayData, ArrayDataBuilder}; | ||
use arrow_schema::Fields; | ||
use std::collections::HashMap; | ||
|
||
#[test] | ||
fn create_record_batch() { | ||
|
@@ -1197,6 +1288,172 @@ mod tests { | |
assert_ne!(batch1, batch2); | ||
} | ||
|
||
#[test] | ||
fn normalize_simple() { | ||
let animals: ArrayRef = Arc::new(StringArray::from(vec!["Parrot", ""])); | ||
let n_legs: ArrayRef = Arc::new(Int64Array::from(vec![Some(2), Some(4)])); | ||
let year: ArrayRef = Arc::new(Int64Array::from(vec![None, Some(2022)])); | ||
|
||
let animals_field = Arc::new(Field::new("animals", DataType::Utf8, true)); | ||
let n_legs_field = Arc::new(Field::new("n_legs", DataType::Int64, true)); | ||
let year_field = Arc::new(Field::new("year", DataType::Int64, true)); | ||
|
||
let a = Arc::new(StructArray::from(vec![ | ||
(animals_field.clone(), Arc::new(animals.clone()) as ArrayRef), | ||
(n_legs_field.clone(), Arc::new(n_legs.clone()) as ArrayRef), | ||
(year_field.clone(), Arc::new(year.clone()) as ArrayRef), | ||
])); | ||
|
||
let month = Arc::new(Int64Array::from(vec![Some(4), Some(6)])); | ||
|
||
let schema = Schema::new(vec![ | ||
Field::new( | ||
"a", | ||
DataType::Struct(Fields::from(vec![animals_field, n_legs_field, year_field])), | ||
false, | ||
), | ||
Field::new("month", DataType::Int64, true), | ||
]); | ||
|
||
let normalized = RecordBatch::try_new(Arc::new(schema), vec![a, month.clone()]) | ||
.expect("valid conversion") | ||
.normalize(".", 0) | ||
.expect("valid normalization"); | ||
|
||
let expected = RecordBatch::try_from_iter_with_nullable(vec![ | ||
("a.animals", animals.clone(), true), | ||
("a.n_legs", n_legs.clone(), true), | ||
("a.year", year.clone(), true), | ||
("month", month.clone(), true), | ||
]) | ||
.expect("valid conversion"); | ||
|
||
assert_eq!(expected, normalized); | ||
} | ||
|
||
#[test] | ||
fn normalize_nested() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Perhaps have some test cases with some more complex types thrown in as well? e.g. have a ListArray with a StructArray within (Even if to prove that the Struct within the List shouldn't be affected) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good point, I'll work on these. I was a little hesitant since I'm not sure how many cases I need to cover (also since these tests are really annoying to instantiate), but it is a current blind spot. |
||
// Initialize schema | ||
let a = Arc::new(Field::new("a", DataType::Int64, true)); | ||
let b = Arc::new(Field::new("b", DataType::Int64, false)); | ||
let c = Arc::new(Field::new("c", DataType::Int64, true)); | ||
|
||
let one = Arc::new(Field::new( | ||
"1", | ||
DataType::Struct(Fields::from(vec![a.clone(), b.clone(), c.clone()])), | ||
false, | ||
)); | ||
let two = Arc::new(Field::new( | ||
"2", | ||
DataType::Struct(Fields::from(vec![a.clone(), b.clone(), c.clone()])), | ||
true, | ||
)); | ||
|
||
let exclamation = Arc::new(Field::new( | ||
"!", | ||
DataType::Struct(Fields::from(vec![one.clone(), two.clone()])), | ||
false, | ||
)); | ||
|
||
let schema = Schema::new(vec![exclamation.clone()]); | ||
|
||
// Initialize fields | ||
let a_field = Int64Array::from(vec![Some(0), Some(1)]); | ||
let b_field = Int64Array::from(vec![Some(2), Some(3)]); | ||
let c_field = Int64Array::from(vec![None, Some(4)]); | ||
|
||
let one_field = StructArray::from(vec![ | ||
(a.clone(), Arc::new(a_field.clone()) as ArrayRef), | ||
(b.clone(), Arc::new(b_field.clone()) as ArrayRef), | ||
(c.clone(), Arc::new(c_field.clone()) as ArrayRef), | ||
]); | ||
let two_field = StructArray::from(vec![ | ||
(a.clone(), Arc::new(a_field.clone()) as ArrayRef), | ||
(b.clone(), Arc::new(b_field.clone()) as ArrayRef), | ||
(c.clone(), Arc::new(c_field.clone()) as ArrayRef), | ||
]); | ||
|
||
let exclamation_field = Arc::new(StructArray::from(vec![ | ||
(one.clone(), Arc::new(one_field) as ArrayRef), | ||
(two.clone(), Arc::new(two_field) as ArrayRef), | ||
])); | ||
|
||
// Normalize top level | ||
let normalized = | ||
RecordBatch::try_new(Arc::new(schema.clone()), vec![exclamation_field.clone()]) | ||
.expect("valid conversion") | ||
.normalize(".", 1) | ||
.expect("valid normalization"); | ||
|
||
let expected = RecordBatch::try_from_iter_with_nullable(vec![ | ||
( | ||
"!.1", | ||
Arc::new(StructArray::from(vec![ | ||
(a.clone(), Arc::new(a_field.clone()) as ArrayRef), | ||
(b.clone(), Arc::new(b_field.clone()) as ArrayRef), | ||
(c.clone(), Arc::new(c_field.clone()) as ArrayRef), | ||
])) as ArrayRef, | ||
false, | ||
), | ||
( | ||
"!.2", | ||
Arc::new(StructArray::from(vec![ | ||
(a.clone(), Arc::new(a_field.clone()) as ArrayRef), | ||
(b.clone(), Arc::new(b_field.clone()) as ArrayRef), | ||
(c.clone(), Arc::new(c_field.clone()) as ArrayRef), | ||
])) as ArrayRef, | ||
true, | ||
), | ||
]) | ||
.expect("valid conversion"); | ||
|
||
assert_eq!(expected, normalized); | ||
|
||
// Normalize all levels | ||
let normalized = RecordBatch::try_new(Arc::new(schema), vec![exclamation_field]) | ||
.expect("valid conversion") | ||
.normalize(".", 0) | ||
.expect("valid normalization"); | ||
|
||
let expected = RecordBatch::try_from_iter_with_nullable(vec![ | ||
("!.1.a", Arc::new(a_field.clone()) as ArrayRef, true), | ||
("!.1.b", Arc::new(b_field.clone()) as ArrayRef, false), | ||
("!.1.c", Arc::new(c_field.clone()) as ArrayRef, true), | ||
("!.2.a", Arc::new(a_field.clone()) as ArrayRef, true), | ||
("!.2.b", Arc::new(b_field.clone()) as ArrayRef, false), | ||
("!.2.c", Arc::new(c_field.clone()) as ArrayRef, true), | ||
]) | ||
.expect("valid conversion"); | ||
|
||
assert_eq!(expected, normalized); | ||
} | ||
|
||
#[test] | ||
fn normalize_empty() { | ||
let animals_field = Arc::new(Field::new("animals", DataType::Utf8, true)); | ||
let n_legs_field = Arc::new(Field::new("n_legs", DataType::Int64, true)); | ||
let year_field = Arc::new(Field::new("year", DataType::Int64, true)); | ||
|
||
let schema = Schema::new(vec![ | ||
Field::new( | ||
"a", | ||
DataType::Struct(Fields::from(vec![animals_field, n_legs_field, year_field])), | ||
false, | ||
), | ||
Field::new("month", DataType::Int64, true), | ||
]); | ||
|
||
let normalized = RecordBatch::new_empty(Arc::new(schema.clone())) | ||
.normalize(".", 0) | ||
.expect("valid normalization"); | ||
|
||
let expected = RecordBatch::new_empty(Arc::new( | ||
schema.normalize(".", 0).expect("valid normalization"), | ||
)); | ||
|
||
assert_eq!(expected, normalized); | ||
} | ||
|
||
#[test] | ||
fn project() { | ||
let a: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(3)])); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
imo this seems the more Rusty way, making use of Option instead of a sentinel value (though I'm not sure if
Some(0)
is a valid input?)There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay I've been working on this a bit, I found a few possible solutions that might fit. I think Option might not be the best choice, since personally, the case of Some(0) feels weird to me, and would mean you're doing an annoying copy for no reason (because of that, I would want to add in an if statement to catch it, but then we end up in the same place).
For
RecordBatch
, this seems to fit the Rusty syntax better, but unfortunately the same solution can't be echoed over toSchema
without an additional dependency, not sure how I feel about that.Another option is to use something like
NonZeroUsize
, which I just learned about. My issue with this one is that we'd then be making the normalize call more annoying, since you have to instantiate it with something likeThis makes the normalize call potentially longer and more annoying, but it means there wouldn't be another import.
Any thoughts on these/if you disagree?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Personally I find this okay. I'm less concerned with requiring an if check inside the code (its pretty simple anyway) compared to presenting a more Rust-like interface to users.
I don't follow this, the
Schema
code looks almost identical to theRecordBatch
version.I agree with
NonZeroUsize
potentially being a bit clunky for users to use (personally wasn't aware this was part of the stdlib either).But yeah I'm curious to see what others might think for this too.