-
Notifications
You must be signed in to change notification settings - Fork 824
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
RecordBatch
normalization (flattening)
#6758
base: main
Are you sure you want to change the base?
Changes from 6 commits
bbd7c8b
8abcd25
6bba7d3
55eb953
30d6294
0ed979d
d9d08cd
d1b3260
a12082c
7adda58
9c9c699
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,8 +18,10 @@ | |
//! A two-dimensional batch of column-oriented data with a defined | ||
//! [schema](arrow_schema::Schema). | ||
|
||
use crate::cast::AsArray; | ||
use crate::{new_empty_array, Array, ArrayRef, StructArray}; | ||
use arrow_schema::{ArrowError, DataType, Field, Schema, SchemaBuilder, SchemaRef}; | ||
use arrow_schema::{ArrowError, DataType, Field, FieldRef, Schema, SchemaBuilder, SchemaRef}; | ||
use std::collections::VecDeque; | ||
use std::ops::Index; | ||
use std::sync::Arc; | ||
|
||
|
@@ -403,6 +405,56 @@ impl RecordBatch { | |
) | ||
} | ||
|
||
/// Normalize a semi-structured [`RecordBatch`] into a flat table. | ||
/// | ||
/// If max_level is 0, normalizes all levels. | ||
pub fn normalize(&self, separator: &str, mut max_level: usize) -> Result<Self, ArrowError> { | ||
if max_level == 0 { | ||
max_level = usize::MAX; | ||
} | ||
if self.num_rows() == 0 { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this worth keeping? I could be reading this wrong, but it looks like there's a lot of code strictly to support normalization for the 0 row case (which is likely very rare)? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, I'm not sure the best case to handle this. I think the secondary case can be removed, as I missed that even with a For the A possible idea could be to set up like
then do some matching? Might be overkill though. And of course, there's the option of removing it altogether, although then a value of 0 would mean no change? |
||
// No data, only need to normalize the schema | ||
return Ok(Self::new_empty(Arc::new( | ||
self.schema.normalize(separator, max_level)?, | ||
))); | ||
} | ||
let mut queue: VecDeque<(usize, (ArrayRef, FieldRef))> = VecDeque::new(); | ||
|
||
for (c, f) in self.columns.iter().zip(self.schema.fields()) { | ||
queue.push_back((0, ((*c).clone(), (*f).clone()))); | ||
} | ||
|
||
let mut columns: Vec<ArrayRef> = Vec::new(); | ||
let mut fields: Vec<FieldRef> = Vec::new(); | ||
|
||
while let Some((depth, (c, f))) = queue.pop_front() { | ||
if depth < max_level { | ||
match f.data_type() { | ||
DataType::Struct(ff) => { | ||
// Need to zip these in reverse to maintain original order | ||
for (cff, fff) in c.as_struct().columns().iter().zip(ff.into_iter()).rev() { | ||
let new_key = format!("{}{}{}", f.name(), separator, fff.name()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure if there's a better way to structure it, but is there a way to keep the field name parts in a Might not be worth the trouble though. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this is a good point, this is definitely not my favorite way to do this. I'll have to do some testing and think about it some more, but it may be better to construct the queue with the components of the |
||
let updated_field = Field::new( | ||
new_key.as_str(), | ||
fff.data_type().clone(), | ||
fff.is_nullable(), | ||
); | ||
queue.push_front((depth + 1, (cff.clone(), Arc::new(updated_field)))) | ||
} | ||
} | ||
_ => { | ||
columns.push(c); | ||
fields.push(f); | ||
} | ||
} | ||
} else { | ||
columns.push(c); | ||
fields.push(f); | ||
} | ||
} | ||
RecordBatch::try_new(Arc::new(Schema::new(fields)), columns) | ||
} | ||
|
||
/// Returns the number of columns in the record batch. | ||
/// | ||
/// # Example | ||
|
@@ -1206,6 +1258,145 @@ mod tests { | |
assert_ne!(batch1, batch2); | ||
} | ||
|
||
#[test] | ||
fn normalize() { | ||
let animals: ArrayRef = Arc::new(StringArray::from(vec!["Parrot", ""])); | ||
let n_legs: ArrayRef = Arc::new(Int64Array::from(vec![Some(2), Some(4)])); | ||
let year: ArrayRef = Arc::new(Int64Array::from(vec![None, Some(2022)])); | ||
|
||
let animals_field = Arc::new(Field::new("animals", DataType::Utf8, true)); | ||
let n_legs_field = Arc::new(Field::new("n_legs", DataType::Int64, true)); | ||
let year_field = Arc::new(Field::new("year", DataType::Int64, true)); | ||
|
||
let a = Arc::new(StructArray::from(vec![ | ||
(animals_field.clone(), Arc::new(animals.clone()) as ArrayRef), | ||
(n_legs_field.clone(), Arc::new(n_legs.clone()) as ArrayRef), | ||
(year_field.clone(), Arc::new(year.clone()) as ArrayRef), | ||
])); | ||
|
||
let month = Arc::new(Int64Array::from(vec![Some(4), Some(6)])); | ||
|
||
let schema = Schema::new(vec![ | ||
Field::new( | ||
"a", | ||
DataType::Struct(Fields::from(vec![animals_field, n_legs_field, year_field])), | ||
false, | ||
), | ||
Field::new("month", DataType::Int64, true), | ||
]); | ||
|
||
let normalized = RecordBatch::try_new(Arc::new(schema), vec![a, month.clone()]) | ||
.expect("valid conversion") | ||
.normalize(".", 0) | ||
.expect("valid normalization"); | ||
|
||
let expected = RecordBatch::try_from_iter_with_nullable(vec![ | ||
("a.animals", animals.clone(), true), | ||
("a.n_legs", n_legs.clone(), true), | ||
("a.year", year.clone(), true), | ||
("month", month.clone(), true), | ||
]) | ||
.expect("valid conversion"); | ||
|
||
assert_eq!(expected, normalized); | ||
} | ||
|
||
#[test] | ||
fn normalize_nested() { | ||
// Initialize schema | ||
let a = Arc::new(Field::new("a", DataType::Utf8, true)); | ||
let b = Arc::new(Field::new("b", DataType::Int64, false)); | ||
let c = Arc::new(Field::new("c", DataType::Int64, true)); | ||
|
||
let d = Arc::new(Field::new("d", DataType::Utf8, true)); | ||
let e = Arc::new(Field::new("e", DataType::Int64, false)); | ||
let f = Arc::new(Field::new("f", DataType::Int64, true)); | ||
|
||
let one = Arc::new(Field::new( | ||
"1", | ||
DataType::Struct(Fields::from(vec![a.clone(), b.clone(), c.clone()])), | ||
false, | ||
)); | ||
let two = Arc::new(Field::new( | ||
"2", | ||
DataType::Struct(Fields::from(vec![d.clone(), e.clone(), f.clone()])), | ||
true, | ||
)); | ||
|
||
let exclamation = Arc::new(Field::new( | ||
"!", | ||
DataType::Struct(Fields::from(vec![one, two])), | ||
false, | ||
)); | ||
|
||
// Initialize fields | ||
let a_field: ArrayRef = Arc::new(StringArray::from(vec!["a1_field_data", "a1_field_data"])); | ||
let b_field: ArrayRef = Arc::new(Int64Array::from(vec![Some(0), Some(1)])); | ||
let c_field: ArrayRef = Arc::new(Int64Array::from(vec![None, Some(2)])); | ||
|
||
let d_field: ArrayRef = Arc::new(StringArray::from(vec!["d1_field_data", "d2_field_data"])); | ||
let e_field: ArrayRef = Arc::new(Int64Array::from(vec![Some(3), Some(4)])); | ||
let f_field: ArrayRef = Arc::new(Int64Array::from(vec![None, Some(5)])); | ||
|
||
let one_field = Arc::new(StructArray::from(vec![ | ||
(a.clone(), Arc::new(a_field.clone()) as ArrayRef), | ||
(b.clone(), Arc::new(b_field.clone()) as ArrayRef), | ||
(c.clone(), Arc::new(c_field.clone()) as ArrayRef), | ||
])); | ||
let two_field = Arc::new(StructArray::from(vec![ | ||
(a.clone(), Arc::new(a_field.clone()) as ArrayRef), | ||
(b.clone(), Arc::new(b_field.clone()) as ArrayRef), | ||
(c.clone(), Arc::new(c_field.clone()) as ArrayRef), | ||
])); | ||
|
||
/*let exclamation_field = Arc::new(StructArray::from(vec![ | ||
ngli-me marked this conversation as resolved.
Show resolved
Hide resolved
|
||
(one.clone(), Arc::new(one_field.clone()) as ArrayRef), | ||
(two.clone(), Arc::new(two_field.clone()) as ArrayRef), | ||
]));*/ | ||
|
||
let schema = Schema::new(vec![exclamation.clone()]); | ||
/*let normalized = RecordBatch::try_new(Arc::new(schema), vec![exclamation_field]) | ||
.expect("valid conversion");*/ | ||
//.normalize(".", 0) | ||
//.expect("valid normalization"); | ||
|
||
/*let expected = RecordBatch::try_from_iter_with_nullable(vec![ | ||
("a.animals", animals.clone(), true), | ||
("a.n_legs", n_legs.clone(), true), | ||
("a.year", year.clone(), true), | ||
("month", month.clone(), true), | ||
]) | ||
.expect("valid conversion");*/ | ||
|
||
//assert_eq!(expected, normalized); | ||
} | ||
|
||
#[test] | ||
fn normalize_empty() { | ||
let animals_field = Arc::new(Field::new("animals", DataType::Utf8, true)); | ||
let n_legs_field = Arc::new(Field::new("n_legs", DataType::Int64, true)); | ||
let year_field = Arc::new(Field::new("year", DataType::Int64, true)); | ||
|
||
let schema = Schema::new(vec![ | ||
Field::new( | ||
"a", | ||
DataType::Struct(Fields::from(vec![animals_field, n_legs_field, year_field])), | ||
false, | ||
), | ||
Field::new("month", DataType::Int64, true), | ||
]); | ||
|
||
let normalized = RecordBatch::new_empty(Arc::new(schema.clone())) | ||
.normalize(".", 0) | ||
.expect("valid normalization"); | ||
|
||
let expected = RecordBatch::new_empty(Arc::new( | ||
schema.normalize(".", 0).expect("valid normalization"), | ||
)); | ||
|
||
assert_eq!(expected, normalized); | ||
} | ||
|
||
#[test] | ||
fn project() { | ||
let a: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(3)])); | ||
|
@@ -1318,7 +1509,9 @@ mod tests { | |
let metadata = vec![("foo".to_string(), "bar".to_string())] | ||
.into_iter() | ||
.collect(); | ||
println!("Metadata: {:?}", metadata); | ||
let metadata_schema = nullable_schema.as_ref().clone().with_metadata(metadata); | ||
println!("Metadata schema: {:?}", metadata_schema); | ||
let batch = batch.with_schema(Arc::new(metadata_schema)).unwrap(); | ||
|
||
// Cannot remove metadata | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you please improve this documentation (maybe copy from the pyarrow version)?
max_level
means (in addition to that 0)separator
doesFor example like https://docs.rs/arrow/latest/arrow/index.html#columnar-format
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, missed doing this, will do!