diff --git a/arrow-flight/src/lib.rs b/arrow-flight/src/lib.rs index 1180264e5ddd..3e87635b0ac4 100644 --- a/arrow-flight/src/lib.rs +++ b/arrow-flight/src/lib.rs @@ -398,7 +398,8 @@ fn schema_to_ipc_format(schema_ipc: SchemaAsIpc) -> ArrowResult { let encoded_data = flight_schema_as_encoded_data(pair.0, pair.1); let mut schema = vec![]; - writer::write_message(&mut schema, encoded_data, pair.1)?; + let already_written_len = 0; + writer::write_message(&mut schema, already_written_len, encoded_data, pair.1)?; Ok(IpcMessage(schema.into())) } diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs index ba86e452f331..4b0f4a00b513 100644 --- a/arrow-ipc/src/writer.rs +++ b/arrow-ipc/src/writer.rs @@ -893,7 +893,7 @@ impl FileWriter { let encoded_message = data_gen.schema_to_bytes(schema, &write_options); let (meta, _data) = - write_message_at_offset(&mut writer, written_len, encoded_message, &write_options)?; + write_message(&mut writer, written_len, encoded_message, &write_options)?; // written bytes = padded_magic + schema written_len += meta; @@ -935,7 +935,7 @@ impl FileWriter { )?; for encoded_dictionary in encoded_dictionaries { - let (meta, data) = write_message_at_offset( + let (meta, data) = write_message( &mut self.writer, self.written_len, encoded_dictionary, @@ -947,7 +947,7 @@ impl FileWriter { self.written_len += meta + data; } - let (meta, data) = write_message_at_offset( + let (meta, data) = write_message( &mut self.writer, self.written_len, encoded_message, @@ -1109,7 +1109,7 @@ impl StreamWriter { // write the schema, set the written bytes to the schema let encoded_message = data_gen.schema_to_bytes(schema, &write_options); - let (meta, _data) = write_message(&mut writer, encoded_message, &write_options)?; + let (meta, _data) = write_message(&mut writer, 0, encoded_message, &write_options)?; let preserve_dict_id = write_options.preserve_dict_id; Ok(Self { @@ -1139,7 +1139,7 @@ impl StreamWriter { .expect("StreamWriter is configured to not error on dictionary replacement"); for encoded_dictionary in encoded_dictionaries { - let (meta, data) = write_message_at_offset( + let (meta, data) = write_message( &mut self.writer, self.written_len, encoded_dictionary, @@ -1148,7 +1148,7 @@ impl StreamWriter { self.written_len += meta + data; } - let (meta, data) = write_message_at_offset( + let (meta, data) = write_message( &mut self.writer, self.written_len, encoded_message, @@ -1258,20 +1258,12 @@ pub struct EncodedData { } /// Write a message's IPC data and buffers, returning metadata and buffer data lengths written pub fn write_message( - writer: W, - encoded: EncodedData, - write_options: &IpcWriteOptions, -) -> Result<(usize, usize), ArrowError> { - write_message_at_offset(writer, 0, encoded, write_options) -} - -fn write_message_at_offset( mut writer: W, - offset: usize, + already_written_len: usize, encoded: EncodedData, write_options: &IpcWriteOptions, ) -> Result<(usize, usize), ArrowError> { - if offset % 8 != 0 { + if already_written_len % 8 != 0 { return Err(ArrowError::MemoryError( "Writing an IPC Message unaligned to 8 bytes".to_string(), )); @@ -1287,10 +1279,13 @@ fn write_message_at_offset( let padding_size = pad_to_alignment( write_options.alignment, - offset + continuation_size + flatbuf_size, + already_written_len + continuation_size + flatbuf_size, ); let padded_size = continuation_size + flatbuf_size + padding_size; - assert_eq!((offset + padded_size) % write_options.alignment as usize, 0); + assert_eq!( + (already_written_len + padded_size) % write_options.alignment as usize, + 0 + ); // write continuation, flatbuf, and padding write_continuation(