diff --git a/arrow-integration-testing/src/bin/arrow-json-integration-test.rs b/arrow-integration-testing/src/bin/arrow-json-integration-test.rs index cc3dd2110e36..9ac3451e573f 100644 --- a/arrow-integration-testing/src/bin/arrow-json-integration-test.rs +++ b/arrow-integration-testing/src/bin/arrow-json-integration-test.rs @@ -16,12 +16,13 @@ // under the License. use arrow::error::{ArrowError, Result}; -use arrow::ipc::reader::FileReader; +use arrow::ipc::reader::{FileReader, StreamReader}; use arrow::ipc::writer::FileWriter; use arrow_integration_test::*; use arrow_integration_testing::{canonicalize_schema, open_json_file}; use clap::Parser; use std::fs::File; +use std::io::{Seek, SeekFrom}; #[derive(clap::ValueEnum, Debug, Clone)] #[clap(rename_all = "SCREAMING_SNAKE_CASE")] @@ -179,5 +180,22 @@ fn validate(arrow_name: &str, json_name: &str, verbose: bool) -> Result<()> { )); } + + let mut arrow_file = File::open(arrow_name)?; + + arrow_file.seek(SeekFrom::Start(8))?; + let arrow_stream_reader = StreamReader::try_new(arrow_file, None)?; + + if canonicalize_schema(&json_file.schema) != canonicalize_schema(&arrow_stream_reader.schema()) { + return Err(ArrowError::ComputeError(format!( + "Schemas do not match. JSON: {:?}. Embedded stream: {:?}", + json_file.schema, arrow_stream_reader.schema() + ))); + } + + for batch in arrow_stream_reader { + let _ = batch?; + } + Ok(()) } diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs index ade902f7cafd..0f4740ee7074 100644 --- a/arrow-ipc/src/writer.rs +++ b/arrow-ipc/src/writer.rs @@ -42,7 +42,7 @@ use crate::CONTINUATION_MARKER; /// IPC write options used to control the behaviour of the [`IpcDataGenerator`] #[derive(Debug, Clone)] pub struct IpcWriteOptions { - /// Write padding after memory buffers to this multiple of bytes. + /// Write padding to ensure that each data buffer is aligned to this multiple of bytes. /// Must be 8, 16, 32, or 64 - defaults to 64. alignment: u8, /// The legacy format is for releases before 0.15.0, and uses metadata V4 @@ -824,12 +824,12 @@ impl DictionaryTracker { pub struct FileWriter { /// The object to write to writer: W, + /// The number of bytes written + written_len: usize, /// IPC write options write_options: IpcWriteOptions, /// A reference to the schema, used in validating record batches schema: SchemaRef, - /// The number of bytes between each block of bytes, as an offset for random access - block_offsets: usize, /// Dictionary blocks that will be written as part of the IPC footer dictionary_blocks: Vec, /// Record blocks that will be written as part of the IPC footer @@ -879,20 +879,31 @@ impl FileWriter { write_options: IpcWriteOptions, ) -> Result { let data_gen = IpcDataGenerator::default(); - // write magic to header aligned on alignment boundary - let pad_len = pad_to_alignment(write_options.alignment, super::ARROW_MAGIC.len()); - let header_size = super::ARROW_MAGIC.len() + pad_len; + + let mut written_len = 0; + + // write magic and padding writer.write_all(&super::ARROW_MAGIC)?; + written_len += super::ARROW_MAGIC.len(); + let pad_len = pad_to_alignment(8, written_len); writer.write_all(&PADDING[..pad_len])?; - // write the schema, set the written bytes to the schema + header + written_len += pad_len; + + // write the schema let encoded_message = data_gen.schema_to_bytes(schema, &write_options); - let (meta, data) = write_message(&mut writer, encoded_message, &write_options)?; + + let (meta, _data) = + write_message_at_offset(&mut writer, written_len, encoded_message, &write_options)?; + + // written bytes = padded_magic + schema + written_len += meta; + let preserve_dict_id = write_options.preserve_dict_id; Ok(Self { writer, + written_len, write_options, schema: Arc::new(schema.clone()), - block_offsets: meta + data + header_size, dictionary_blocks: vec![], record_blocks: vec![], finished: false, @@ -924,23 +935,32 @@ impl FileWriter { )?; for encoded_dictionary in encoded_dictionaries { - let (meta, data) = - write_message(&mut self.writer, encoded_dictionary, &self.write_options)?; + let (meta, data) = write_message_at_offset( + &mut self.writer, + self.written_len, + encoded_dictionary, + &self.write_options, + )?; - let block = crate::Block::new(self.block_offsets as i64, meta as i32, data as i64); + let block = crate::Block::new(self.written_len as i64, meta as i32, data as i64); self.dictionary_blocks.push(block); - self.block_offsets += meta + data; + self.written_len += meta + data; } - let (meta, data) = write_message(&mut self.writer, encoded_message, &self.write_options)?; + let (meta, data) = write_message_at_offset( + &mut self.writer, + self.written_len, + encoded_message, + &self.write_options, + )?; // add a record block for the footer let block = crate::Block::new( - self.block_offsets as i64, + self.written_len as i64, meta as i32, // TODO: is this still applicable? data as i64, ); self.record_blocks.push(block); - self.block_offsets += meta + data; + self.written_len += meta + data; Ok(()) } @@ -953,7 +973,8 @@ impl FileWriter { } // write EOS - write_continuation(&mut self.writer, &self.write_options, 0)?; + self.written_len += write_continuation(&mut self.writer, &self.write_options, 0)?; + assert_eq!(self.written_len % 8, 0); let mut fbb = FlatBufferBuilder::new(); let dictionaries = fbb.create_vector(&self.dictionary_blocks); @@ -1041,6 +1062,8 @@ impl RecordBatchWriter for FileWriter { pub struct StreamWriter { /// The object to write to writer: W, + /// The number of bytes written + written_len: usize, /// IPC write options write_options: IpcWriteOptions, /// Whether the writer footer has been written, and the writer is finished @@ -1084,12 +1107,15 @@ impl StreamWriter { write_options: IpcWriteOptions, ) -> Result { let data_gen = IpcDataGenerator::default(); + // write the schema, set the written bytes to the schema let encoded_message = data_gen.schema_to_bytes(schema, &write_options); - write_message(&mut writer, encoded_message, &write_options)?; + let (meta, _data) = write_message(&mut writer, encoded_message, &write_options)?; + let preserve_dict_id = write_options.preserve_dict_id; Ok(Self { writer, + written_len: meta, write_options, finished: false, dictionary_tracker: DictionaryTracker::new_with_preserve_dict_id( @@ -1114,10 +1140,22 @@ impl StreamWriter { .expect("StreamWriter is configured to not error on dictionary replacement"); for encoded_dictionary in encoded_dictionaries { - write_message(&mut self.writer, encoded_dictionary, &self.write_options)?; + let (meta, data) = write_message_at_offset( + &mut self.writer, + self.written_len, + encoded_dictionary, + &self.write_options, + )?; + self.written_len += meta + data; } - write_message(&mut self.writer, encoded_message, &self.write_options)?; + let (meta, data) = write_message_at_offset( + &mut self.writer, + self.written_len, + encoded_message, + &self.write_options, + )?; + self.written_len += meta + data; Ok(()) } @@ -1129,7 +1167,7 @@ impl StreamWriter { )); } - write_continuation(&mut self.writer, &self.write_options, 0)?; + self.written_len += write_continuation(&mut self.writer, &self.write_options, 0)?; self.finished = true; @@ -1221,49 +1259,56 @@ pub struct EncodedData { } /// Write a message's IPC data and buffers, returning metadata and buffer data lengths written pub fn write_message( + writer: W, + encoded: EncodedData, + write_options: &IpcWriteOptions, +) -> Result<(usize, usize), ArrowError> { + write_message_at_offset(writer, 0, encoded, write_options) +} + +fn write_message_at_offset( mut writer: W, + offset: usize, encoded: EncodedData, write_options: &IpcWriteOptions, ) -> Result<(usize, usize), ArrowError> { - let arrow_data_len = encoded.arrow_data.len(); - if arrow_data_len % usize::from(write_options.alignment) != 0 { + if offset % 8 != 0 { return Err(ArrowError::MemoryError( - "Arrow data not aligned".to_string(), + "Writing an IPC Message unaligned to 8 bytes".to_string(), )); } - let a = usize::from(write_options.alignment - 1); - let buffer = encoded.ipc_message; - let flatbuf_size = buffer.len(); - let prefix_size = if write_options.write_legacy_ipc_format { + let continuation_size = if write_options.write_legacy_ipc_format { 4 } else { 8 }; - let aligned_size = (flatbuf_size + prefix_size + a) & !a; - let padding_bytes = aligned_size - flatbuf_size - prefix_size; + let flatbuf_size = encoded.ipc_message.len(); + assert_ne!(flatbuf_size, 0); + + let padding_size = pad_to_alignment( + write_options.alignment, + offset + continuation_size + flatbuf_size, + ); + let padded_size = continuation_size + flatbuf_size + padding_size; + assert_eq!((offset + padded_size) % write_options.alignment as usize, 0); + // write continuation, flatbuf, and padding write_continuation( &mut writer, write_options, - (aligned_size - prefix_size) as i32, + (padded_size - continuation_size) as i32, )?; - - // write the flatbuf - if flatbuf_size > 0 { - writer.write_all(&buffer)?; - } - // write padding - writer.write_all(&PADDING[..padding_bytes])?; + writer.write_all(&encoded.ipc_message)?; + writer.write_all(&PADDING[..padding_size])?; // write arrow data - let body_len = if arrow_data_len > 0 { + let body_len = if encoded.arrow_data.len() > 0 { write_body_buffers(&mut writer, &encoded.arrow_data, write_options.alignment)? } else { 0 }; - - Ok((aligned_size, body_len)) + Ok((padded_size, body_len)) } fn write_body_buffers(