diff --git a/arrow-ipc/src/reader/stream.rs b/arrow-ipc/src/reader/stream.rs
index 64191a22b33e..de5f5bdd629f 100644
--- a/arrow-ipc/src/reader/stream.rs
+++ b/arrow-ipc/src/reader/stream.rs
@@ -275,8 +275,10 @@ impl StreamDecoder {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::writer::StreamWriter;
-    use arrow_array::{Int32Array, Int64Array, RecordBatch};
+    use crate::writer::{IpcWriteOptions, StreamWriter};
+    use arrow_array::{
+        types::Int32Type, DictionaryArray, Int32Array, Int64Array, RecordBatch, RunArray,
+    };
     use arrow_schema::{DataType, Field, Schema};
 
     // Further tests in arrow-integration-testing/tests/ipc_reader.rs
@@ -315,4 +317,64 @@ mod tests {
         let err = decoder.finish().unwrap_err().to_string();
         assert_eq!(err, "Ipc error: Unexpected End of Stream");
     }
+
+    /// Round-trips a run-end-encoded column with dictionary-encoded values through
+    /// `StreamWriter` and `StreamDecoder`, and checks that exactly one batch is decoded
+    /// and that it equals the batch that was written.
+    #[test]
+    fn test_read_ree_dict_record_batches_from_buffer() {
+        let schema = Schema::new(vec![Field::new(
+            "test1",
+            DataType::RunEndEncoded(
+                Arc::new(Field::new("run_ends", DataType::Int32, false)),
+                Arc::new(Field::new_dict(
+                    "values",
+                    DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
+                    true,
+                    0,
+                    false,
+                )),
+            ),
+            true,
+        )]);
+        let batch = RecordBatch::try_new(
+            schema.clone().into(),
+            vec![Arc::new(
+                RunArray::try_new(
+                    &Int32Array::from(vec![1, 2, 3]),
+                    &vec![Some("a"), None, Some("a")]
+                        .into_iter()
+                        .collect::<DictionaryArray<Int32Type>>(),
+                )
+                .expect("Failed to create RunArray"),
+            )],
+        )
+        .expect("Failed to create RecordBatch");
+
+        // Serialize the batch into an in-memory IPC stream.
+        let mut buffer = vec![];
+        {
+            let mut writer = StreamWriter::try_new_with_options(
+                &mut buffer,
+                &schema,
+                IpcWriteOptions::default().with_preserve_dict_id(false),
+            )
+            .expect("Failed to create StreamWriter");
+            writer.write(&batch).expect("Failed to write RecordBatch");
+            writer.finish().expect("Failed to finish StreamWriter");
+        }
+
+        let mut decoder = StreamDecoder::new();
+        let buf = &mut Buffer::from(buffer.as_slice());
+        let mut decoded_batches = 0;
+        // The loop binding must not be named `batch`: shadowing the written batch
+        // would turn the assertion below into a comparison of a value with itself.
+        while let Some(decoded) = decoder.decode(buf).expect("Failed to decode record batch") {
+            assert_eq!(decoded, batch);
+            decoded_batches += 1;
+        }
+        assert_eq!(decoded_batches, 1);
+
+        decoder.finish().expect("Failed to finish decoder");
+    }
 }
diff --git a/arrow-schema/src/field.rs b/arrow-schema/src/field.rs
index a84a6ada334d..fc4852a3d37d 100644
--- a/arrow-schema/src/field.rs
+++ b/arrow-schema/src/field.rs
@@ -375,6 +375,7 @@ impl Field {
             | DataType::FixedSizeList(field, _)
             | DataType::Map(field, _) => field.fields(),
             DataType::Dictionary(_, value_field) => Field::_fields(value_field.as_ref()),
+            DataType::RunEndEncoded(_, field) => field.fields(),
             _ => vec![],
         }
     }