Skip to content

Commit

Permalink
del(write_message_at_offset), offset->already_written_len
Browse files Browse the repository at this point in the history
  • Loading branch information
bkietz committed Sep 5, 2024
1 parent 3dca647 commit 95b6975
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 19 deletions.
3 changes: 2 additions & 1 deletion arrow-flight/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,8 @@ fn schema_to_ipc_format(schema_ipc: SchemaAsIpc) -> ArrowResult<IpcMessage> {
let encoded_data = flight_schema_as_encoded_data(pair.0, pair.1);

let mut schema = vec![];
writer::write_message(&mut schema, encoded_data, pair.1)?;
let already_written_len = 0;
writer::write_message(&mut schema, already_written_len, encoded_data, pair.1)?;
Ok(IpcMessage(schema.into()))
}

Expand Down
31 changes: 13 additions & 18 deletions arrow-ipc/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -893,7 +893,7 @@ impl<W: Write> FileWriter<W> {
let encoded_message = data_gen.schema_to_bytes(schema, &write_options);

let (meta, _data) =
write_message_at_offset(&mut writer, written_len, encoded_message, &write_options)?;
write_message(&mut writer, written_len, encoded_message, &write_options)?;

// written bytes = padded_magic + schema
written_len += meta;
Expand Down Expand Up @@ -935,7 +935,7 @@ impl<W: Write> FileWriter<W> {
)?;

for encoded_dictionary in encoded_dictionaries {
let (meta, data) = write_message_at_offset(
let (meta, data) = write_message(
&mut self.writer,
self.written_len,
encoded_dictionary,
Expand All @@ -947,7 +947,7 @@ impl<W: Write> FileWriter<W> {
self.written_len += meta + data;
}

let (meta, data) = write_message_at_offset(
let (meta, data) = write_message(
&mut self.writer,
self.written_len,
encoded_message,
Expand Down Expand Up @@ -1109,7 +1109,7 @@ impl<W: Write> StreamWriter<W> {

// write the schema, set the written bytes to the schema
let encoded_message = data_gen.schema_to_bytes(schema, &write_options);
let (meta, _data) = write_message(&mut writer, encoded_message, &write_options)?;
let (meta, _data) = write_message(&mut writer, 0, encoded_message, &write_options)?;

let preserve_dict_id = write_options.preserve_dict_id;
Ok(Self {
Expand Down Expand Up @@ -1139,7 +1139,7 @@ impl<W: Write> StreamWriter<W> {
.expect("StreamWriter is configured to not error on dictionary replacement");

for encoded_dictionary in encoded_dictionaries {
let (meta, data) = write_message_at_offset(
let (meta, data) = write_message(
&mut self.writer,
self.written_len,
encoded_dictionary,
Expand All @@ -1148,7 +1148,7 @@ impl<W: Write> StreamWriter<W> {
self.written_len += meta + data;
}

let (meta, data) = write_message_at_offset(
let (meta, data) = write_message(
&mut self.writer,
self.written_len,
encoded_message,
Expand Down Expand Up @@ -1258,20 +1258,12 @@ pub struct EncodedData {
}
/// Write a message's IPC data and buffers, returning metadata and buffer data lengths written
pub fn write_message<W: Write>(
writer: W,
encoded: EncodedData,
write_options: &IpcWriteOptions,
) -> Result<(usize, usize), ArrowError> {
write_message_at_offset(writer, 0, encoded, write_options)
}

fn write_message_at_offset<W: Write>(
mut writer: W,
offset: usize,
already_written_len: usize,
encoded: EncodedData,
write_options: &IpcWriteOptions,
) -> Result<(usize, usize), ArrowError> {
if offset % 8 != 0 {
if already_written_len % 8 != 0 {
return Err(ArrowError::MemoryError(
"Writing an IPC Message unaligned to 8 bytes".to_string(),
));
Expand All @@ -1287,10 +1279,13 @@ fn write_message_at_offset<W: Write>(

let padding_size = pad_to_alignment(
write_options.alignment,
offset + continuation_size + flatbuf_size,
already_written_len + continuation_size + flatbuf_size,
);
let padded_size = continuation_size + flatbuf_size + padding_size;
assert_eq!((offset + padded_size) % write_options.alignment as usize, 0);
assert_eq!(
(already_written_len + padded_size) % write_options.alignment as usize,
0
);

// write continuation, flatbuf, and padding
write_continuation(
Expand Down

0 comments on commit 95b6975

Please sign in to comment.