RecordBatch normalization (flattening) #6758

Draft: wants to merge 11 commits into base: main
195 changes: 194 additions & 1 deletion in arrow-array/src/record_batch.rs

@@ -18,8 +18,10 @@
```diff
 //! A two-dimensional batch of column-oriented data with a defined
 //! [schema](arrow_schema::Schema).

+use crate::cast::AsArray;
 use crate::{new_empty_array, Array, ArrayRef, StructArray};
-use arrow_schema::{ArrowError, DataType, Field, Schema, SchemaBuilder, SchemaRef};
+use arrow_schema::{ArrowError, DataType, Field, FieldRef, Schema, SchemaBuilder, SchemaRef};
+use std::collections::VecDeque;
 use std::ops::Index;
 use std::sync::Arc;
```

@@ -403,6 +405,56 @@ impl RecordBatch {

```rust
        )
    }

    /// Normalize a semi-structured [`RecordBatch`] into a flat table.
    ///
    /// If max_level is 0, normalizes all levels.
```
Contributor:
Can you please improve this documentation (maybe copy from the pyarrow version)?

  1. Document what max_level means (in addition to the special value 0)
  2. Document what separator does
  3. Provide an example of flattening a record batch as a doc example?

For example like https://docs.rs/arrow/latest/arrow/index.html#columnar-format

[Screenshot 2024-12-18 at 8:05:08 AM: rendered doc example from the linked page]

Contributor (Author):
Ah, missed doing this, will do!

```rust
    pub fn normalize(&self, separator: &str, mut max_level: usize) -> Result<Self, ArrowError> {
        if max_level == 0 {
            max_level = usize::MAX;
        }
        if self.num_rows() == 0 {
```
Contributor:
Is this worth keeping? I could be reading this wrong, but it looks like there's a lot of code strictly to support normalization for the 0 row case (which is likely very rare)?

Contributor (Author):
Yeah, I'm not sure of the best way to handle this.

I think the secondary case can be removed, as I missed that even with a new_empty RecordBatch, it will still have an (empty) columns field, good catch!

For the usize::MAX setting, this was because polars/pandas had this, with a default value of 0. However, since Rust does not have default parameters, I wasn't sure of the best way to adapt this.

A possible idea could be to set up something like

```rust
enum Depth {
    Default, // All levels
    Value(usize),
}
```

then do some matching? Might be overkill though.

And of course, there's the option of removing it altogether, although then a value of 0 would mean no change?
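A fuller sketch of that enum idea (hypothetical; this is not code from the PR, just the suggestion fleshed out with the matching step):

```rust
/// Hypothetical replacement for the `max_level: usize` sentinel parameter.
#[derive(Clone, Copy)]
pub enum Depth {
    /// Flatten all levels (the current `max_level == 0` behavior).
    Default,
    /// Flatten at most this many levels of nesting.
    Value(usize),
}

impl Depth {
    /// Resolve the enum to the numeric limit used by the traversal loop.
    fn limit(self) -> usize {
        match self {
            Depth::Default => usize::MAX,
            Depth::Value(n) => n,
        }
    }
}

fn main() {
    assert_eq!(Depth::Default.limit(), usize::MAX);
    assert_eq!(Depth::Value(2).limit(), 2);
}
```

This keeps the traversal code unchanged (it still compares against a `usize` limit) while making the "all levels" case explicit at the call site instead of overloading 0.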

```rust
            // No data, only need to normalize the schema
            return Ok(Self::new_empty(Arc::new(
                self.schema.normalize(separator, max_level)?,
            )));
        }
        let mut queue: VecDeque<(usize, (ArrayRef, FieldRef))> = VecDeque::new();

        for (c, f) in self.columns.iter().zip(self.schema.fields()) {
            queue.push_back((0, ((*c).clone(), (*f).clone())));
        }

        let mut columns: Vec<ArrayRef> = Vec::new();
        let mut fields: Vec<FieldRef> = Vec::new();

        while let Some((depth, (c, f))) = queue.pop_front() {
            if depth < max_level {
                match f.data_type() {
                    DataType::Struct(ff) => {
                        // Need to zip these in reverse to maintain original order
                        for (cff, fff) in c.as_struct().columns().iter().zip(ff.into_iter()).rev() {
                            let new_key = format!("{}{}{}", f.name(), separator, fff.name());
```
Contributor:
Not sure if there's a better way to structure it, but is there a way to keep the field name parts in a Vec and create the flattened fields at the end? That allows you to avoid the repeated format! in a deeply nested schema.

Might not be worth the trouble though.

Contributor (Author):
I think this is a good point, this is definitely not my favorite way to do this. I'll have to do some testing and think about it some more, but it may be better to construct the queue with the components of the Field, then go through and construct all of the Fields at the very end.
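One way to sketch the reviewer's idea: carry the name components in a `Vec` and join them once per leaf. The `Node` type below is a hypothetical stand-in for the `(ArrayRef, FieldRef)` queue entries, not code from this PR.

```rust
use std::collections::VecDeque;

// Stand-in for a nested schema: a leaf field name, or a struct with children.
enum Node {
    Leaf(String),
    Struct(String, Vec<Node>),
}

/// Walk the tree keeping path components in a Vec; the flattened name is
/// materialized with a single join when a leaf is emitted, instead of a
/// `format!` at every nesting level.
fn flatten_names(root: &Node, separator: &str) -> Vec<String> {
    let mut out = Vec::new();
    let mut queue: VecDeque<(Vec<&str>, &Node)> = VecDeque::from([(Vec::new(), root)]);
    while let Some((path, node)) = queue.pop_front() {
        match node {
            Node::Leaf(name) => {
                let mut full = path;
                full.push(name.as_str());
                out.push(full.join(separator)); // one allocation per leaf
            }
            Node::Struct(name, children) => {
                // Push in reverse so the original field order is preserved,
                // mirroring the .rev() in the PR's loop.
                for child in children.iter().rev() {
                    let mut p = path.clone();
                    p.push(name.as_str());
                    queue.push_front((p, child));
                }
            }
        }
    }
    out
}

fn main() {
    let tree = Node::Struct(
        "a".to_string(),
        vec![
            Node::Leaf("x".to_string()),
            Node::Struct("b".to_string(), vec![Node::Leaf("y".to_string())]),
        ],
    );
    assert_eq!(flatten_names(&tree, "."), ["a.x", "a.b.y"]);
}
```

The trade-off is cloning the path `Vec` per child versus re-running `format!` per level; for deeply nested schemas the join-at-leaf version avoids building every intermediate prefix string.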

```rust
                            let updated_field = Field::new(
                                new_key.as_str(),
                                fff.data_type().clone(),
                                fff.is_nullable(),
                            );
                            queue.push_front((depth + 1, (cff.clone(), Arc::new(updated_field))))
                        }
                    }
                    _ => {
                        columns.push(c);
                        fields.push(f);
                    }
                }
            } else {
                columns.push(c);
                fields.push(f);
            }
        }
        RecordBatch::try_new(Arc::new(Schema::new(fields)), columns)
    }
```

```rust
    /// Returns the number of columns in the record batch.
    ///
    /// # Example
```

@@ -1206,6 +1258,145 @@ mod tests {

```rust
        assert_ne!(batch1, batch2);
    }

    #[test]
    fn normalize() {
        let animals: ArrayRef = Arc::new(StringArray::from(vec!["Parrot", ""]));
        let n_legs: ArrayRef = Arc::new(Int64Array::from(vec![Some(2), Some(4)]));
        let year: ArrayRef = Arc::new(Int64Array::from(vec![None, Some(2022)]));

        let animals_field = Arc::new(Field::new("animals", DataType::Utf8, true));
        let n_legs_field = Arc::new(Field::new("n_legs", DataType::Int64, true));
        let year_field = Arc::new(Field::new("year", DataType::Int64, true));

        let a = Arc::new(StructArray::from(vec![
            (animals_field.clone(), animals.clone()),
            (n_legs_field.clone(), n_legs.clone()),
            (year_field.clone(), year.clone()),
        ]));

        let month = Arc::new(Int64Array::from(vec![Some(4), Some(6)]));

        let schema = Schema::new(vec![
            Field::new(
                "a",
                DataType::Struct(Fields::from(vec![animals_field, n_legs_field, year_field])),
                false,
            ),
            Field::new("month", DataType::Int64, true),
        ]);

        let normalized = RecordBatch::try_new(Arc::new(schema), vec![a, month.clone()])
            .expect("valid conversion")
            .normalize(".", 0)
            .expect("valid normalization");

        let expected = RecordBatch::try_from_iter_with_nullable(vec![
            ("a.animals", animals.clone(), true),
            ("a.n_legs", n_legs.clone(), true),
            ("a.year", year.clone(), true),
            ("month", month.clone(), true),
        ])
        .expect("valid conversion");

        assert_eq!(expected, normalized);
    }
```

```rust
    #[test]
    fn normalize_nested() {
        // Initialize schema
        let a = Arc::new(Field::new("a", DataType::Utf8, true));
        let b = Arc::new(Field::new("b", DataType::Int64, false));
        let c = Arc::new(Field::new("c", DataType::Int64, true));

        let d = Arc::new(Field::new("d", DataType::Utf8, true));
        let e = Arc::new(Field::new("e", DataType::Int64, false));
        let f = Arc::new(Field::new("f", DataType::Int64, true));

        let one = Arc::new(Field::new(
            "1",
            DataType::Struct(Fields::from(vec![a.clone(), b.clone(), c.clone()])),
            false,
        ));
        let two = Arc::new(Field::new(
            "2",
            DataType::Struct(Fields::from(vec![d.clone(), e.clone(), f.clone()])),
            true,
        ));

        let exclamation = Arc::new(Field::new(
            "!",
            DataType::Struct(Fields::from(vec![one, two])),
            false,
        ));

        // Initialize fields
        let a_field: ArrayRef = Arc::new(StringArray::from(vec!["a1_field_data", "a1_field_data"]));
        let b_field: ArrayRef = Arc::new(Int64Array::from(vec![Some(0), Some(1)]));
        let c_field: ArrayRef = Arc::new(Int64Array::from(vec![None, Some(2)]));

        let d_field: ArrayRef = Arc::new(StringArray::from(vec!["d1_field_data", "d2_field_data"]));
        let e_field: ArrayRef = Arc::new(Int64Array::from(vec![Some(3), Some(4)]));
        let f_field: ArrayRef = Arc::new(Int64Array::from(vec![None, Some(5)]));

        let one_field = Arc::new(StructArray::from(vec![
            (a.clone(), a_field.clone()),
            (b.clone(), b_field.clone()),
            (c.clone(), c_field.clone()),
        ]));
```
```rust
        let two_field = Arc::new(StructArray::from(vec![
            (d.clone(), d_field.clone()),
            (e.clone(), e_field.clone()),
            (f.clone(), f_field.clone()),
        ]));

        /*let exclamation_field = Arc::new(StructArray::from(vec![
            (one.clone(), Arc::new(one_field.clone()) as ArrayRef),
            (two.clone(), Arc::new(two_field.clone()) as ArrayRef),
        ]));*/
```

```rust
        let schema = Schema::new(vec![exclamation.clone()]);
        /*let normalized = RecordBatch::try_new(Arc::new(schema), vec![exclamation_field])
            .expect("valid conversion")
            .normalize(".", 0)
            .expect("valid normalization");*/

        /*let expected = RecordBatch::try_from_iter_with_nullable(vec![
            ("a.animals", animals.clone(), true),
            ("a.n_legs", n_legs.clone(), true),
            ("a.year", year.clone(), true),
            ("month", month.clone(), true),
        ])
        .expect("valid conversion");*/

        //assert_eq!(expected, normalized);
    }
```

```rust
    #[test]
    fn normalize_empty() {
        let animals_field = Arc::new(Field::new("animals", DataType::Utf8, true));
        let n_legs_field = Arc::new(Field::new("n_legs", DataType::Int64, true));
        let year_field = Arc::new(Field::new("year", DataType::Int64, true));

        let schema = Schema::new(vec![
            Field::new(
                "a",
                DataType::Struct(Fields::from(vec![animals_field, n_legs_field, year_field])),
                false,
            ),
            Field::new("month", DataType::Int64, true),
        ]);

        let normalized = RecordBatch::new_empty(Arc::new(schema.clone()))
            .normalize(".", 0)
            .expect("valid normalization");

        let expected = RecordBatch::new_empty(Arc::new(
            schema.normalize(".", 0).expect("valid normalization"),
        ));

        assert_eq!(expected, normalized);
    }
```

```rust
    #[test]
    fn project() {
        let a: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(3)]));
```

@@ -1318,7 +1509,9 @@ mod tests {
```rust
        let metadata = vec![("foo".to_string(), "bar".to_string())]
            .into_iter()
            .collect();
        println!("Metadata: {:?}", metadata);
        let metadata_schema = nullable_schema.as_ref().clone().with_metadata(metadata);
        println!("Metadata schema: {:?}", metadata_schema);
        let batch = batch.with_schema(Arc::new(metadata_schema)).unwrap();

        // Cannot remove metadata
```