From 6f2a14e7af9dbca3a37c9953c29a2ad06023710c Mon Sep 17 00:00:00 2001 From: Frederic Branczyk Date: Sun, 15 Sep 2024 18:53:19 +0200 Subject: [PATCH 1/2] arrow-ipc: Add test for streaming IPC with REE dicts --- arrow-ipc/src/reader/stream.rs | 65 ++++++++++++++++++++++++++++++++-- 1 file changed, 63 insertions(+), 2 deletions(-) diff --git a/arrow-ipc/src/reader/stream.rs b/arrow-ipc/src/reader/stream.rs index 64191a22b33e..9cf81aaa0d1d 100644 --- a/arrow-ipc/src/reader/stream.rs +++ b/arrow-ipc/src/reader/stream.rs @@ -275,8 +275,8 @@ impl StreamDecoder { #[cfg(test)] mod tests { use super::*; - use crate::writer::StreamWriter; - use arrow_array::{Int32Array, Int64Array, RecordBatch}; + use crate::writer::{IpcWriteOptions, StreamWriter}; + use arrow_array::{Int32Array, Int64Array, RecordBatch, RunArray, DictionaryArray, types::Int32Type}; use arrow_schema::{DataType, Field, Schema}; // Further tests in arrow-integration-testing/tests/ipc_reader.rs @@ -315,4 +315,65 @@ mod tests { let err = decoder.finish().unwrap_err().to_string(); assert_eq!(err, "Ipc error: Unexpected End of Stream"); } + + #[test] + fn test_read_ree_dict_record_batches_from_buffer() { + let schema = Schema::new(vec![ + Field::new( + "test1", + DataType::RunEndEncoded( + Arc::new(Field::new("run_ends".to_string(), DataType::Int32, false)), + Arc::new(Field::new_dict( + "values".to_string(), + DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), + true, + 0, + false, + )), + ), + true, + ), + ]); + let batch = RecordBatch::try_new( + schema.clone().into(), + vec![ + Arc::new( + RunArray::try_new( + &Int32Array::from(vec![1, 2, 3]), + &vec![Some("a"), None, Some("a")] + .into_iter() + .collect::>(), + ) + .expect("Failed to create RunArray"), + ), + ], + ) + .expect("Failed to create RecordBatch"); + + let mut buffer = vec![]; + { + let mut writer = StreamWriter::try_new_with_options( + &mut buffer, + &schema, + IpcWriteOptions::default().with_preserve_dict_id(false), + ) + .expect("Failed to create StreamWriter"); + writer.write(&batch).expect("Failed to write RecordBatch"); + writer.finish().expect("Failed to finish StreamWriter"); + } + + let mut decoder = StreamDecoder::new(); + let buf = &mut Buffer::from(buffer.as_slice()); + while let Some(batch) = decoder + .decode(buf) + .map_err(|e| { + ArrowError::ExternalError(format!("Failed to decode record batch: {}", e).into()) + }) + .expect("Failed to decode record batch") + { + assert_eq!(batch, batch); + } + + decoder.finish().expect("Failed to finish decoder"); + } } From df2a11f60da5745f1615616e926d2b4bced55b68 Mon Sep 17 00:00:00 2001 From: Frederic Branczyk Date: Sun, 15 Sep 2024 18:55:50 +0200 Subject: [PATCH 2/2] arrow-schema: Include child fields of REE fields --- arrow-ipc/src/reader/stream.rs | 22 ++++++++++------------ arrow-schema/src/field.rs | 1 + 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/arrow-ipc/src/reader/stream.rs b/arrow-ipc/src/reader/stream.rs index 9cf81aaa0d1d..de5f5bdd629f 100644 --- a/arrow-ipc/src/reader/stream.rs +++ b/arrow-ipc/src/reader/stream.rs @@ -276,7 +276,9 @@ impl StreamDecoder { mod tests { use super::*; use crate::writer::{IpcWriteOptions, StreamWriter}; - use arrow_array::{Int32Array, Int64Array, RecordBatch, RunArray, DictionaryArray, types::Int32Type}; + use arrow_array::{ + types::Int32Type, DictionaryArray, Int32Array, Int64Array, RecordBatch, RunArray, + }; use arrow_schema::{DataType, Field, Schema}; // Further tests in arrow-integration-testing/tests/ipc_reader.rs @@ -318,9 +320,8 @@ mod tests { #[test] fn test_read_ree_dict_record_batches_from_buffer() { - let schema = Schema::new(vec![ - Field::new( - "test1", + let schema = Schema::new(vec![Field::new( + "test1", DataType::RunEndEncoded( Arc::new(Field::new("run_ends".to_string(), DataType::Int32, false)), Arc::new(Field::new_dict( @@ -332,21 +333,18 @@ mod tests { )), ), true, - ), - ]); + )]); let batch = RecordBatch::try_new( schema.clone().into(), - vec![ - Arc::new( + vec![Arc::new( RunArray::try_new( &Int32Array::from(vec![1, 2, 3]), &vec![Some("a"), None, Some("a")] - .into_iter() - .collect::>(), + .into_iter() + .collect::>(), ) .expect("Failed to create RunArray"), - ), - ], + )], ) .expect("Failed to create RecordBatch"); diff --git a/arrow-schema/src/field.rs b/arrow-schema/src/field.rs index a84a6ada334d..fc4852a3d37d 100644 --- a/arrow-schema/src/field.rs +++ b/arrow-schema/src/field.rs @@ -375,6 +375,7 @@ impl Field { | DataType::FixedSizeList(field, _) | DataType::Map(field, _) => field.fields(), DataType::Dictionary(_, value_field) => Field::_fields(value_field.as_ref()), + DataType::RunEndEncoded(_, field) => field.fields(), _ => vec![], } }