Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for deserializing list-encoded JSON structs [#6558] #6643

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion arrow-json/src/reader/list_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// under the License.

use crate::reader::tape::{Tape, TapeElement};
use crate::reader::{make_decoder, ArrayDecoder};
use crate::reader::{make_decoder, ArrayDecoder, StructParseMode};
use arrow_array::builder::{BooleanBufferBuilder, BufferBuilder};
use arrow_array::OffsetSizeTrait;
use arrow_buffer::buffer::NullBuffer;
Expand All @@ -37,6 +37,7 @@ impl<O: OffsetSizeTrait> ListArrayDecoder<O> {
coerce_primitive: bool,
strict_mode: bool,
is_nullable: bool,
struct_parse_mode: StructParseMode,
) -> Result<Self, ArrowError> {
let field = match &data_type {
DataType::List(f) if !O::IS_LARGE => f,
Expand All @@ -48,6 +49,7 @@ impl<O: OffsetSizeTrait> ListArrayDecoder<O> {
coerce_primitive,
strict_mode,
field.is_nullable(),
struct_parse_mode,
)?;

Ok(Self {
Expand Down
5 changes: 4 additions & 1 deletion arrow-json/src/reader/map_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// under the License.

use crate::reader::tape::{Tape, TapeElement};
use crate::reader::{make_decoder, ArrayDecoder};
use crate::reader::{make_decoder, ArrayDecoder, StructParseMode};
use arrow_array::builder::{BooleanBufferBuilder, BufferBuilder};
use arrow_buffer::buffer::NullBuffer;
use arrow_buffer::ArrowNativeType;
Expand All @@ -36,6 +36,7 @@ impl MapArrayDecoder {
coerce_primitive: bool,
strict_mode: bool,
is_nullable: bool,
struct_parse_mode: StructParseMode,
) -> Result<Self, ArrowError> {
let fields = match &data_type {
DataType::Map(_, true) => {
Expand All @@ -59,12 +60,14 @@ impl MapArrayDecoder {
coerce_primitive,
strict_mode,
fields[0].is_nullable(),
struct_parse_mode,
)?;
let values = make_decoder(
fields[1].data_type().clone(),
coerce_primitive,
strict_mode,
fields[1].is_nullable(),
struct_parse_mode,
)?;

Ok(Self {
Expand Down
264 changes: 258 additions & 6 deletions arrow-json/src/reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,30 @@ mod struct_array;
mod tape;
mod timestamp_array;

/// Specifies what is considered valid JSON when parsing StructArrays.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you have a link to documentation / prior art for this type of JSON encoding of structs? Maybe to trino / presto docs that describe it in more full detail that we can add to this doc?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sadly, the documentation of the format of the returned data is a little lacking. They describe that the top-level row is a list with one entry per column (https://prestodb.io/docs/current/develop/client-protocol.html#important-queryresults-attributes, https://trino.io/docs/current/develop/client-protocol.html#important-queryresults-attributes), but do not specify how any structural elements are serialized (or non-structural things like Datetimes etc). If it helps, I could add this to the Presto documentation.

///
/// For a struct with fields `("a", Int32)` and `("b", Utf8)`, a row could be represented
/// as a JSON object (`{"a": 1, "b": "c"}`) or as a JSON list (`[1, "c"]`). This enum
/// controls which form(s) the Reader will accept.
///
/// For objects, the order of the keys does not matter.
/// For lists, the entries must be the same number and in the same order as the struct fields.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
pub enum StructParseMode {
    /// Only parse objects (e.g., `{"a": 1, "b": "c"}`)
    #[default]
    ObjectOnly,
    /// Only parse lists (e.g., `[1, "c"]`)
    ListOnly,
}

/// A builder for [`Reader`] and [`Decoder`]
pub struct ReaderBuilder {
    /// Rows per output batch — NOTE(review): presumably set via a
    /// `with_batch_size`-style setter not visible in this diff; confirm.
    batch_size: usize,
    /// Whether primitives may be coerced while decoding; forwarded to `make_decoder`.
    coerce_primitive: bool,
    /// Whether decoding rejects unknown/extra input; forwarded to `make_decoder`.
    strict_mode: bool,
    /// True when built via `new_with_field` (decoding a single top-level field).
    is_field: bool,
    /// Whether struct values are decoded from JSON objects or JSON lists.
    /// Defaults to [`StructParseMode::ObjectOnly`]; set via `with_struct_parse_mode`.
    struct_parse_mode: StructParseMode,

    /// Target schema of the decoded output.
    schema: SchemaRef,
}
Expand All @@ -195,6 +213,7 @@ impl ReaderBuilder {
coerce_primitive: false,
strict_mode: false,
is_field: false,
struct_parse_mode: StructParseMode::ObjectOnly,
schema,
}
}
Expand Down Expand Up @@ -235,6 +254,7 @@ impl ReaderBuilder {
coerce_primitive: false,
strict_mode: false,
is_field: true,
struct_parse_mode: StructParseMode::ObjectOnly,
schema: Arc::new(Schema::new([field.into()])),
}
}
Expand Down Expand Up @@ -262,6 +282,15 @@ impl ReaderBuilder {
}
}

/// Set the [`StructParseMode`] for the reader, controlling whether struct
/// values are decoded from JSON objects, JSON lists, or either form.
pub fn with_struct_parse_mode(mut self, struct_parse_mode: StructParseMode) -> Self {
    self.struct_parse_mode = struct_parse_mode;
    self
}

/// Create a [`Reader`] with the provided [`BufRead`]
pub fn build<R: BufRead>(self, reader: R) -> Result<Reader<R>, ArrowError> {
Ok(Reader {
Expand All @@ -280,7 +309,13 @@ impl ReaderBuilder {
}
};

let decoder = make_decoder(data_type, self.coerce_primitive, self.strict_mode, nullable)?;
let decoder = make_decoder(
data_type,
self.coerce_primitive,
self.strict_mode,
nullable,
self.struct_parse_mode,
)?;

let num_fields = self.schema.flattened_fields().len();

Expand Down Expand Up @@ -643,6 +678,7 @@ fn make_decoder(
coerce_primitive: bool,
strict_mode: bool,
is_nullable: bool,
struct_parse_mode: StructParseMode,
) -> Result<Box<dyn ArrayDecoder>, ArrowError> {
downcast_integer! {
data_type => (primitive_decoder, data_type),
Expand Down Expand Up @@ -693,13 +729,13 @@ fn make_decoder(
DataType::Boolean => Ok(Box::<BooleanArrayDecoder>::default()),
DataType::Utf8 => Ok(Box::new(StringArrayDecoder::<i32>::new(coerce_primitive))),
DataType::LargeUtf8 => Ok(Box::new(StringArrayDecoder::<i64>::new(coerce_primitive))),
DataType::List(_) => Ok(Box::new(ListArrayDecoder::<i32>::new(data_type, coerce_primitive, strict_mode, is_nullable)?)),
DataType::LargeList(_) => Ok(Box::new(ListArrayDecoder::<i64>::new(data_type, coerce_primitive, strict_mode, is_nullable)?)),
DataType::Struct(_) => Ok(Box::new(StructArrayDecoder::new(data_type, coerce_primitive, strict_mode, is_nullable)?)),
DataType::List(_) => Ok(Box::new(ListArrayDecoder::<i32>::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_parse_mode)?)),
DataType::LargeList(_) => Ok(Box::new(ListArrayDecoder::<i64>::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_parse_mode)?)),
DataType::Struct(_) => Ok(Box::new(StructArrayDecoder::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_parse_mode)?)),
DataType::Binary | DataType::LargeBinary | DataType::FixedSizeBinary(_) => {
Err(ArrowError::JsonError(format!("{data_type} is not supported by JSON")))
}
DataType::Map(_, _) => Ok(Box::new(MapArrayDecoder::new(data_type, coerce_primitive, strict_mode, is_nullable)?)),
DataType::Map(_, _) => Ok(Box::new(MapArrayDecoder::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_parse_mode)?)),
d => Err(ArrowError::NotYetImplemented(format!("Support for {d} in JSON reader")))
}
}
Expand All @@ -715,7 +751,7 @@ mod tests {
use arrow_buffer::{ArrowNativeType, Buffer};
use arrow_cast::display::{ArrayFormatter, FormatOptions};
use arrow_data::ArrayDataBuilder;
use arrow_schema::Field;
use arrow_schema::{Field, Fields};

use super::*;

Expand Down Expand Up @@ -2343,4 +2379,220 @@ mod tests {
.unwrap()
);
}

#[test]
// Verifies that in ListOnly mode the number of list entries must exactly
// match the number of struct fields, both when the struct is the top-level
// schema and when it is a named field — and that a matching row decodes to
// the expected RecordBatch.
fn test_struct_decoding_list_length() {
    use arrow_array::array;

    // One row with exactly two columns.
    let row = "[1, 2]";

    // Build 1-, 2- and 3-field variants of the schema from the same base.
    let mut fields = vec![Field::new("a", DataType::Int32, true)];
    let too_few_fields = Fields::from(fields.clone());
    fields.push(Field::new("b", DataType::Int32, true));
    let correct_fields = Fields::from(fields.clone());
    fields.push(Field::new("c", DataType::Int32, true));
    let too_many_fields = Fields::from(fields.clone());

    // Decode `row` in ListOnly mode, either as a single struct field "r"
    // (as_field = true) or as the top-level schema (as_field = false),
    // returning the first batch (or error).
    let parse = |fields: Fields, as_field: bool| {
        let builder = if as_field {
            ReaderBuilder::new_with_field(Field::new("r", DataType::Struct(fields), true))
        } else {
            ReaderBuilder::new(Arc::new(Schema::new(fields)))
        };
        builder
            .with_struct_parse_mode(StructParseMode::ListOnly)
            .build(Cursor::new(row.as_bytes()))
            .unwrap()
            .next()
            .unwrap()
    };

    // What `[1, 2]` should decode to under the two-field schema.
    let expected_row = StructArray::new(
        correct_fields.clone(),
        vec![
            Arc::new(array::Int32Array::from(vec![1])),
            Arc::new(array::Int32Array::from(vec![2])),
        ],
        None,
    );
    let row_field = Field::new("r", DataType::Struct(correct_fields.clone()), true);

    // Too few fields: the extra list entries are rejected.
    assert_eq!(
        parse(too_few_fields.clone(), true).unwrap_err().to_string(),
        "Json error: found extra columns for 1 fields".to_string()
    );
    assert_eq!(
        parse(too_few_fields, false).unwrap_err().to_string(),
        "Json error: found extra columns for 1 fields".to_string()
    );
    // Exact match: decodes successfully in both shapes.
    assert_eq!(
        parse(correct_fields.clone(), true).unwrap(),
        RecordBatch::try_new(
            Arc::new(Schema::new(vec![row_field])),
            vec![Arc::new(expected_row.clone())]
        )
        .unwrap()
    );
    assert_eq!(
        parse(correct_fields, false).unwrap(),
        RecordBatch::from(expected_row)
    );
    // Too many fields: missing list entries are rejected.
    assert_eq!(
        parse(too_many_fields.clone(), true)
            .unwrap_err()
            .to_string(),
        "Json error: found 2 columns for 3 fields".to_string()
    );
    assert_eq!(
        parse(too_many_fields, false).unwrap_err().to_string(),
        "Json error: found 2 columns for 3 fields".to_string()
    );
}

#[test]
// Verifies that the same nested value (a struct containing a list and a map)
// decodes identically from object form and from list form, and that each
// parse mode rejects the other form — including when the mismatch occurs
// inside a nested field.
fn test_struct_decoding() {
    use arrow_array::builder;

    // The same logical row in three encodings: pure object, pure list,
    // and an object whose nested struct is list-encoded.
    let nested_object_json = r#"{"a": {"b": [1, 2], "c": {"d": 3}}}"#;
    let nested_list_json = r#"[[[1, 2], {"d": 3}]]"#;
    let nested_mixed_json = r#"{"a": [[1, 2], {"d": 3}]}"#;

    // Struct "a" has a list field "b" and a map field "c".
    let struct_fields = Fields::from(vec![
        Field::new("b", DataType::new_list(DataType::Int32, true), true),
        Field::new_map(
            "c",
            "entries",
            Field::new("keys", DataType::Utf8, false),
            Field::new("values", DataType::Int32, true),
            false,
            false,
        ),
    ]);

    // Expected value for field "b": [1, 2]
    let list_array =
        ListArray::from_iter_primitive::<Int32Type, _, _>(vec![Some(vec![Some(1), Some(2)])]);

    // Expected value for field "c": {"d": 3}
    let map_array = {
        let mut map_builder = builder::MapBuilder::new(
            None,
            builder::StringBuilder::new(),
            builder::Int32Builder::new(),
        );
        map_builder.keys().append_value("d");
        map_builder.values().append_value(3);
        map_builder.append(true).unwrap();
        map_builder.finish()
    };

    let struct_array = StructArray::new(
        struct_fields.clone(),
        vec![Arc::new(list_array), Arc::new(map_array)],
        None,
    );

    let schema = Arc::new(Schema::new(vec![Field::new(
        "a",
        DataType::Struct(struct_fields),
        true,
    )]));
    let expected = RecordBatch::try_new(schema.clone(), vec![Arc::new(struct_array)]).unwrap();

    // Decode `s` under the given parse mode, returning the first batch (or error).
    let parse = |s: &str, mode: StructParseMode| {
        ReaderBuilder::new(schema.clone())
            .with_struct_parse_mode(mode)
            .build(Cursor::new(s.as_bytes()))
            .unwrap()
            .next()
            .unwrap()
    };

    // ObjectOnly: accepts objects, rejects list form at top level and nested.
    assert_eq!(
        parse(nested_object_json, StructParseMode::ObjectOnly).unwrap(),
        expected
    );
    assert_eq!(
        parse(nested_list_json, StructParseMode::ObjectOnly)
            .unwrap_err()
            .to_string(),
        "Json error: expected { got [[[1, 2], {\"d\": 3}]]".to_owned()
    );
    assert_eq!(
        parse(nested_mixed_json, StructParseMode::ObjectOnly)
            .unwrap_err()
            .to_string(),
        "Json error: whilst decoding field 'a': expected { got [[1, 2], {\"d\": 3}]".to_owned()
    );

    // ListOnly: accepts lists, rejects object form.
    assert_eq!(
        parse(nested_list_json, StructParseMode::ListOnly).unwrap(),
        expected
    );
    assert_eq!(
        parse(nested_object_json, StructParseMode::ListOnly)
            .unwrap_err()
            .to_string(),
        "Json error: expected [ got {\"a\": {\"b\": [1, 2]\"c\": {\"d\": 3}}}".to_owned()
    );
    assert_eq!(
        parse(nested_mixed_json, StructParseMode::ListOnly)
            .unwrap_err()
            .to_string(),
        "Json error: expected [ got {\"a\": [[1, 2], {\"d\": 3}]}".to_owned()
    );
}

// Test cases:
// []   -> RecordBatch row with no entries. Schema = [('a', Int32)]            -> Error
// []   -> RecordBatch row with no entries. Schema = [('r', [('a', Int32)])]   -> Error
// []   -> StructArray row with no entries. Fields = [('a', Int32)]            -> Error
// [[]] -> RecordBatch row with empty struct entry. Schema = [('r', [('a', Int32)])] -> Error
#[test]
fn test_struct_decoding_empty_list() {
    let int_field = Field::new("a", DataType::Int32, true);
    let struct_field = Field::new(
        "r",
        DataType::Struct(Fields::from(vec![int_field.clone()])),
        true,
    );

    // Decode `json` in ListOnly mode, either with `field` as a top-level
    // field (as_field = true) or wrapped in a one-field schema, returning
    // the first batch (or error). The closure takes `field` by value, so
    // each branch can move it without cloning.
    let parse = |json: &str, as_field: bool, field: Field| {
        let builder = if as_field {
            ReaderBuilder::new_with_field(field)
        } else {
            ReaderBuilder::new(Arc::new(Schema::new(vec![field])))
        };
        builder
            .with_struct_parse_mode(StructParseMode::ListOnly)
            .build(Cursor::new(json.as_bytes()))
            .unwrap()
            .next()
            .unwrap()
    };

    // An empty row can never satisfy a non-empty schema.
    assert_eq!(
        parse("[]", true, struct_field.clone())
            .unwrap_err()
            .to_string(),
        "Json error: found 0 columns for 1 fields".to_owned()
    );
    assert_eq!(
        parse("[]", false, int_field).unwrap_err().to_string(),
        "Json error: found 0 columns for 1 fields".to_owned()
    );
    assert_eq!(
        parse("[]", false, struct_field.clone())
            .unwrap_err()
            .to_string(),
        "Json error: found 0 columns for 1 fields".to_owned()
    );

    // An empty nested struct fails inside the enclosing field.
    assert_eq!(
        parse("[[]]", false, struct_field).unwrap_err().to_string(),
        "Json error: whilst decoding field 'r': found 0 columns for 1 fields".to_owned()
    );
}
}
Loading
Loading