Ensure IPC stream messages are contiguous
bkietz committed Aug 28, 2024
1 parent 8c956a9 commit e81e107
Showing 2 changed files with 105 additions and 42 deletions.
20 changes: 19 additions & 1 deletion arrow-integration-testing/src/bin/arrow-json-integration-test.rs
@@ -16,12 +16,13 @@
// under the License.

use arrow::error::{ArrowError, Result};
use arrow::ipc::reader::FileReader;
use arrow::ipc::reader::{FileReader, StreamReader};
use arrow::ipc::writer::FileWriter;
use arrow_integration_test::*;
use arrow_integration_testing::{canonicalize_schema, open_json_file};
use clap::Parser;
use std::fs::File;
use std::io::{Seek, SeekFrom};

#[derive(clap::ValueEnum, Debug, Clone)]
#[clap(rename_all = "SCREAMING_SNAKE_CASE")]
@@ -179,5 +180,22 @@ fn validate(arrow_name: &str, json_name: &str, verbose: bool) -> Result<()> {
));
}


let mut arrow_file = File::open(arrow_name)?;

arrow_file.seek(SeekFrom::Start(8))?;
let arrow_stream_reader = StreamReader::try_new(arrow_file, None)?;

if canonicalize_schema(&json_file.schema) != canonicalize_schema(&arrow_stream_reader.schema()) {
return Err(ArrowError::ComputeError(format!(
"Schemas do not match. JSON: {:?}. Embedded stream: {:?}",
json_file.schema, arrow_stream_reader.schema()
)));
}

for batch in arrow_stream_reader {
let _ = batch?;
}

Ok(())
}
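
The check added above relies on the Arrow file layout: the 6-byte ARROW1 magic plus 2 bytes of padding, followed by what this commit guarantees is a valid, contiguous IPC stream. A minimal standalone sketch of the same read, using a hypothetical file path:

use arrow::error::Result;
use arrow::ipc::reader::StreamReader;
use std::fs::File;
use std::io::{Seek, SeekFrom};

fn read_embedded_stream() -> Result<()> {
    // Any file produced by FileWriter should work here (hypothetical path).
    let mut file = File::open("example.arrow")?;
    // Skip the 6-byte magic and the 2 padding bytes that precede the stream.
    file.seek(SeekFrom::Start(8))?;
    let reader = StreamReader::try_new(file, None)?;
    for batch in reader {
        let _batch = batch?;
    }
    Ok(())
}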
127 changes: 86 additions & 41 deletions arrow-ipc/src/writer.rs
@@ -42,7 +42,7 @@ use crate::CONTINUATION_MARKER;
/// IPC write options used to control the behaviour of the [`IpcDataGenerator`]
#[derive(Debug, Clone)]
pub struct IpcWriteOptions {
/// Write padding after memory buffers to this multiple of bytes.
/// Write padding to ensure that each data buffer is aligned to this multiple of bytes.
/// Must be 8, 16, 32, or 64 - defaults to 64.
alignment: u8,
/// The legacy format is for releases before 0.15.0, and uses metadata V4
@@ -824,12 +824,12 @@ impl DictionaryTracker {
pub struct FileWriter<W> {
/// The object to write to
writer: W,
/// The number of bytes written
written_len: usize,
/// IPC write options
write_options: IpcWriteOptions,
/// A reference to the schema, used in validating record batches
schema: SchemaRef,
/// The number of bytes between each block of bytes, as an offset for random access
block_offsets: usize,
/// Dictionary blocks that will be written as part of the IPC footer
dictionary_blocks: Vec<crate::Block>,
/// Record blocks that will be written as part of the IPC footer
@@ -879,20 +879,31 @@ impl<W: Write> FileWriter<W> {
write_options: IpcWriteOptions,
) -> Result<Self, ArrowError> {
let data_gen = IpcDataGenerator::default();
// write magic to header aligned on alignment boundary
let pad_len = pad_to_alignment(write_options.alignment, super::ARROW_MAGIC.len());
let header_size = super::ARROW_MAGIC.len() + pad_len;

let mut written_len = 0;

// write magic and padding
writer.write_all(&super::ARROW_MAGIC)?;
written_len += super::ARROW_MAGIC.len();
let pad_len = pad_to_alignment(8, written_len);
writer.write_all(&PADDING[..pad_len])?;
// write the schema, set the written bytes to the schema + header
written_len += pad_len;

// write the schema
let encoded_message = data_gen.schema_to_bytes(schema, &write_options);
let (meta, data) = write_message(&mut writer, encoded_message, &write_options)?;

let (meta, _data) =
write_message_at_offset(&mut writer, written_len, encoded_message, &write_options)?;

// written bytes = padded_magic + schema
written_len += meta;

let preserve_dict_id = write_options.preserve_dict_id;
Ok(Self {
writer,
written_len,
write_options,
schema: Arc::new(schema.clone()),
block_offsets: meta + data + header_size,
dictionary_blocks: vec![],
record_blocks: vec![],
finished: false,
@@ -924,23 +935,32 @@ impl<W: Write> FileWriter<W> {
)?;

for encoded_dictionary in encoded_dictionaries {
let (meta, data) =
write_message(&mut self.writer, encoded_dictionary, &self.write_options)?;
let (meta, data) = write_message_at_offset(
&mut self.writer,
self.written_len,
encoded_dictionary,
&self.write_options,
)?;

let block = crate::Block::new(self.block_offsets as i64, meta as i32, data as i64);
let block = crate::Block::new(self.written_len as i64, meta as i32, data as i64);
self.dictionary_blocks.push(block);
self.block_offsets += meta + data;
self.written_len += meta + data;
}

let (meta, data) = write_message(&mut self.writer, encoded_message, &self.write_options)?;
let (meta, data) = write_message_at_offset(
&mut self.writer,
self.written_len,
encoded_message,
&self.write_options,
)?;
// add a record block for the footer
let block = crate::Block::new(
self.block_offsets as i64,
self.written_len as i64,
meta as i32, // TODO: is this still applicable?
data as i64,
);
self.record_blocks.push(block);
self.block_offsets += meta + data;
self.written_len += meta + data;
Ok(())
}

@@ -953,7 +973,8 @@ impl<W: Write> FileWriter<W> {
}

// write EOS
write_continuation(&mut self.writer, &self.write_options, 0)?;
self.written_len += write_continuation(&mut self.writer, &self.write_options, 0)?;
assert_eq!(self.written_len % 8, 0);

let mut fbb = FlatBufferBuilder::new();
let dictionaries = fbb.create_vector(&self.dictionary_blocks);
@@ -1041,6 +1062,8 @@ impl<W: Write> RecordBatchWriter for FileWriter<W> {
pub struct StreamWriter<W> {
/// The object to write to
writer: W,
/// The number of bytes written
written_len: usize,
/// IPC write options
write_options: IpcWriteOptions,
/// Whether the writer footer has been written, and the writer is finished
@@ -1084,12 +1107,15 @@ impl<W: Write> StreamWriter<W> {
write_options: IpcWriteOptions,
) -> Result<Self, ArrowError> {
let data_gen = IpcDataGenerator::default();

// write the schema, set the written bytes to the schema
let encoded_message = data_gen.schema_to_bytes(schema, &write_options);
write_message(&mut writer, encoded_message, &write_options)?;
let (meta, _data) = write_message(&mut writer, encoded_message, &write_options)?;

let preserve_dict_id = write_options.preserve_dict_id;
Ok(Self {
writer,
written_len: meta,
write_options,
finished: false,
dictionary_tracker: DictionaryTracker::new_with_preserve_dict_id(
@@ -1114,10 +1140,22 @@ impl<W: Write> StreamWriter<W> {
.expect("StreamWriter is configured to not error on dictionary replacement");

for encoded_dictionary in encoded_dictionaries {
write_message(&mut self.writer, encoded_dictionary, &self.write_options)?;
let (meta, data) = write_message_at_offset(
&mut self.writer,
self.written_len,
encoded_dictionary,
&self.write_options,
)?;
self.written_len += meta + data;
}

write_message(&mut self.writer, encoded_message, &self.write_options)?;
let (meta, data) = write_message_at_offset(
&mut self.writer,
self.written_len,
encoded_message,
&self.write_options,
)?;
self.written_len += meta + data;
Ok(())
}

Expand All @@ -1129,7 +1167,7 @@ impl<W: Write> StreamWriter<W> {
));
}

write_continuation(&mut self.writer, &self.write_options, 0)?;
self.written_len += write_continuation(&mut self.writer, &self.write_options, 0)?;

self.finished = true;

@@ -1221,49 +1259,56 @@ pub struct EncodedData {
}
/// Write a message's IPC data and buffers, returning metadata and buffer data lengths written
pub fn write_message<W: Write>(
writer: W,
encoded: EncodedData,
write_options: &IpcWriteOptions,
) -> Result<(usize, usize), ArrowError> {
write_message_at_offset(writer, 0, encoded, write_options)
}

fn write_message_at_offset<W: Write>(
mut writer: W,
offset: usize,
encoded: EncodedData,
write_options: &IpcWriteOptions,
) -> Result<(usize, usize), ArrowError> {
let arrow_data_len = encoded.arrow_data.len();
if arrow_data_len % usize::from(write_options.alignment) != 0 {
if offset % 8 != 0 {
return Err(ArrowError::MemoryError(
"Arrow data not aligned".to_string(),
"Writing an IPC Message unaligned to 8 bytes".to_string(),
));
}

let a = usize::from(write_options.alignment - 1);
let buffer = encoded.ipc_message;
let flatbuf_size = buffer.len();
let prefix_size = if write_options.write_legacy_ipc_format {
let continuation_size = if write_options.write_legacy_ipc_format {
4
} else {
8
};
let aligned_size = (flatbuf_size + prefix_size + a) & !a;
let padding_bytes = aligned_size - flatbuf_size - prefix_size;
let flatbuf_size = encoded.ipc_message.len();
assert_ne!(flatbuf_size, 0);

let padding_size = pad_to_alignment(
write_options.alignment,
offset + continuation_size + flatbuf_size,
);
let padded_size = continuation_size + flatbuf_size + padding_size;
assert_eq!((offset + padded_size) % write_options.alignment as usize, 0);

// write continuation, flatbuf, and padding
write_continuation(
&mut writer,
write_options,
(aligned_size - prefix_size) as i32,
(padded_size - continuation_size) as i32,
)?;

// write the flatbuf
if flatbuf_size > 0 {
writer.write_all(&buffer)?;
}
// write padding
writer.write_all(&PADDING[..padding_bytes])?;
writer.write_all(&encoded.ipc_message)?;
writer.write_all(&PADDING[..padding_size])?;

// write arrow data
let body_len = if arrow_data_len > 0 {
let body_len = if encoded.arrow_data.len() > 0 {
write_body_buffers(&mut writer, &encoded.arrow_data, write_options.alignment)?
} else {
0
};

Ok((aligned_size, body_len))
Ok((padded_size, body_len))
}

fn write_body_buffers<W: Write>(
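
For reference, the pad_to_alignment helper used throughout the writer computes how many padding bytes are needed to round a length up to the next multiple of the alignment (a power of two). A minimal sketch of that arithmetic, assuming this signature and not necessarily the crate's exact implementation:

fn pad_to_alignment(alignment: u8, len: usize) -> usize {
    let a = usize::from(alignment) - 1;
    ((len + a) & !a) - len
}

For example, pad_to_alignment(8, 6) == 2, which is why the 6-byte ARROW1 magic is followed by 2 bytes of padding before the schema message.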
