gRPC spec and codec update - add support for arrow metadata #7907

Merged · 10 commits · Oct 28, 2024
@@ -15,10 +15,22 @@ service StorageNode {
// ---------------- RegisterRecording ------------------

message RegisterRecordingsRequest {
// human-readable description of the recording
string description = 1;
// information about recording's backing storage
// TODO(zehiko) add separate info about the "source" recording
ObjectStorage obj_storage = 2;
// TODO(zehiko) should this be auto-discoverable?
// type of recording
RecordingType typ = 3;
// (optional) any additional metadata that should be associated with the recording
// You can associate an arbitrary number of columns with a specific recording
RecordingMetadata metadata = 4;
}

// Recording metadata is a single-row arrow record batch
message RecordingMetadata {
EncoderVersion encoder_version = 1;
bytes payload = 2;
}

message ObjectStorage {
@@ -51,13 +63,8 @@ message GetRecordingMetadataRequest {
}

message GetRecordingMetadataResponse {
RecordingMetadata metadata = 1;
}

message RecordingMetadata {
RecordingId id = 1;
Schema schema = 2;
repeated TimeMetadata time_metadata = 3;
RecordingMetadata metadata = 2;
}

message TimeMetadata {
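To make the new `metadata` field concrete: a caller builds a single-row arrow record batch, serializes it through the codec added in this PR, and attaches the result to the `RegisterRecordingsRequest`. A minimal sketch in Rust (the arrow2 calls mirror the tests in codec.rs below; the column name, value, and module paths are illustrative assumptions):

use arrow2::array::Int32Array;
use arrow2::chunk::Chunk as ArrowChunk;
use arrow2::datatypes::{DataType, Field, Schema as ArrowSchema};
use re_remote_store_types::codec::CodecError;
use re_remote_store_types::v0::{EncoderVersion, RecordingMetadata};

fn build_recording_metadata() -> Result<RecordingMetadata, CodecError> {
    // One metadata column ("episode_id") with exactly one row; the codec
    // rejects batches with more than one row.
    let schema = ArrowSchema::from(vec![Field::new("episode_id", DataType::Int32, false)]);
    let batch = ArrowChunk::new(vec![Box::new(Int32Array::from_slice([42])) as _]);

    // Serializes schema + batch to Arrow IPC bytes and tags them with the encoder version.
    RecordingMetadata::try_from(EncoderVersion::V0, &schema, &batch)
}

// The result is then attached to the request, e.g.
// RegisterRecordingsRequest { metadata: Some(metadata), .. }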
crates/store/re_remote_store_types/src/codec.rs (141 additions, 32 deletions)
@@ -1,8 +1,11 @@
use arrow2::array::Array as ArrowArray;
use arrow2::chunk::Chunk as ArrowChunk;
use arrow2::datatypes::Schema as ArrowSchema;
use arrow2::error::Error as ArrowError;
use arrow2::io::ipc::{read, write};
use re_dataframe::TransportChunk;

use crate::v0::EncoderVersion;
use crate::v0::{EncoderVersion, RecordingMetadata};

#[derive(Debug, thiserror::Error)]
pub enum CodecError {
@@ -23,6 +26,9 @@ pub enum CodecError {

#[error("Unknown message header")]
UnknownMessageHeader,

#[error("Invalid argument: {0}")]
InvalidArgument(String),
}

#[derive(Clone, Copy, PartialEq, Eq, Hash, Default)]
@@ -73,14 +79,7 @@ impl TransportMessageV0 {
let mut data: Vec<u8> = Vec::new();
MessageHader::RECORD_BATCH.encode(&mut data)?;

let options = write::WriteOptions { compression: None };
let mut sw = write::StreamWriter::new(&mut data, options);

sw.start(&chunk.schema, None)
.map_err(CodecError::ArrowSerialization)?;
sw.write(&chunk.data, None)
.map_err(CodecError::ArrowSerialization)?;
sw.finish().map_err(CodecError::ArrowSerialization)?;
write_arrow_to_bytes(&mut data, &chunk.schema, &chunk.data)?;

Ok(data)
}
@@ -94,29 +93,14 @@
match header {
MessageHader::NO_DATA => Ok(Self::NoData),
MessageHader::RECORD_BATCH => {
let metadata = read::read_stream_metadata(&mut reader)
.map_err(CodecError::ArrowSerialization)?;
let mut stream = read::StreamReader::new(&mut reader, metadata, None);

let schema = stream.schema().clone();
// there should be at least one record batch in the stream
// TODO(zehiko) isn't there a "read one record batch from bytes" arrow2 function??
let stream_state = stream
.next()
.ok_or(CodecError::MissingRecordBatch)?
.map_err(CodecError::ArrowSerialization)?;

match stream_state {
read::StreamState::Waiting => Err(CodecError::UnexpectedStreamState),
read::StreamState::Some(chunk) => {
let tc = TransportChunk {
schema: schema.clone(),
data: chunk,
};

Ok(Self::RecordBatch(tc))
}
}
let (schema, data) = read_arrow_from_bytes(&mut reader)?;

let tc = TransportChunk {
schema: schema.clone(),
data,
};

Ok(Self::RecordBatch(tc))
}
_ => Err(CodecError::UnknownMessageHeader),
}
@@ -154,11 +138,95 @@ pub fn decode(version: EncoderVersion, data: &[u8]) -> Result<Option<TransportChunk>, CodecError> {
}
}

impl RecordingMetadata {
/// Create `RecordingMetadata` from an arrow schema and a record batch
pub fn try_from(
version: EncoderVersion,
schema: &ArrowSchema,
unit_batch: &ArrowChunk<Box<dyn ArrowArray>>,
) -> Result<Self, CodecError> {
if unit_batch.len() > 1 {
return Err(CodecError::InvalidArgument(format!(
"metadata record batch can only have a single row, batch with {} rows given",
unit_batch.len()
)));
}

match version {
EncoderVersion::V0 => {
let mut data: Vec<u8> = Vec::new();
write_arrow_to_bytes(&mut data, schema, unit_batch)?;

Ok(Self {
encoder_version: version as i32,
payload: data,
})
}
}
}

/// Get metadata as arrow data
pub fn data(&self) -> Result<(ArrowSchema, ArrowChunk<Box<dyn ArrowArray>>), CodecError> {
let mut reader = std::io::Cursor::new(self.payload.clone());

let encoder_version = EncoderVersion::try_from(self.encoder_version)
.map_err(|err| CodecError::InvalidArgument(err.to_string()))?;

match encoder_version {
EncoderVersion::V0 => read_arrow_from_bytes(&mut reader),
}
}
}
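
// Usage sketch (illustrative, not part of this diff): `try_from` and `data()`
// are inverses, so metadata round-trips losslessly; the tests below verify this.
//
//     let metadata = RecordingMetadata::try_from(EncoderVersion::V0, &schema, &batch)?;
//     let (decoded_schema, decoded_batch) = metadata.data()?;
//     assert_eq!((schema, batch), (decoded_schema, decoded_batch));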

/// Helper function that serializes the given arrow schema and record batch into bytes
/// using the Arrow IPC format.
fn write_arrow_to_bytes<W: std::io::Write>(
writer: &mut W,
schema: &ArrowSchema,
data: &ArrowChunk<Box<dyn ArrowArray>>,
) -> Result<(), CodecError> {
let options = write::WriteOptions { compression: None };
let mut sw = write::StreamWriter::new(writer, options);

sw.start(schema, None)
.map_err(CodecError::ArrowSerialization)?;
sw.write(data, None)
.map_err(CodecError::ArrowSerialization)?;
sw.finish().map_err(CodecError::ArrowSerialization)?;

Ok(())
}

/// Helper function that deserializes raw bytes into an arrow schema and record batch
/// using the Arrow IPC format.
fn read_arrow_from_bytes<R: std::io::Read>(
reader: &mut R,
) -> Result<(ArrowSchema, ArrowChunk<Box<dyn ArrowArray>>), CodecError> {
let metadata = read::read_stream_metadata(reader).map_err(CodecError::ArrowSerialization)?;
let mut stream = read::StreamReader::new(reader, metadata, None);

let schema = stream.schema().clone();
// there should be at least one record batch in the stream
let stream_state = stream
.next()
.ok_or(CodecError::MissingRecordBatch)?
.map_err(CodecError::ArrowSerialization)?;

match stream_state {
read::StreamState::Waiting => Err(CodecError::UnexpectedStreamState),
read::StreamState::Some(chunk) => Ok((schema, chunk)),
}
}

#[cfg(test)]
mod tests {

use arrow2::chunk::Chunk as ArrowChunk;
use arrow2::{array::Int32Array, datatypes::Field, datatypes::Schema as ArrowSchema};
use re_dataframe::external::re_chunk::{Chunk, RowId};
use re_log_types::{example_components::MyPoint, Timeline};

use crate::v0::RecordingMetadata;
use crate::{
codec::{decode, encode, CodecError, TransportMessageV0},
v0::EncoderVersion,
@@ -250,4 +318,45 @@ mod tests {

assert_eq!(expected_chunk, decoded_chunk);
}

#[test]
fn test_recording_metadata_serialization() {
let expected_schema = ArrowSchema::from(vec![Field::new(
"my_int",
arrow2::datatypes::DataType::Int32,
false,
)]);
let my_ints = Int32Array::from_slice([42]);
let expected_chunk = ArrowChunk::new(vec![Box::new(my_ints) as _]);

let metadata =
RecordingMetadata::try_from(EncoderVersion::V0, &expected_schema, &expected_chunk)
.unwrap();

let (schema, chunk) = metadata.data().unwrap();

assert_eq!(expected_schema, schema);
assert_eq!(expected_chunk, chunk);
}

#[test]
fn test_recording_metadata_fails_with_non_unit_batch() {
let expected_schema = ArrowSchema::from(vec![Field::new(
"my_int",
arrow2::datatypes::DataType::Int32,
false,
)]);
// more than 1 row in the batch
let my_ints = Int32Array::from_slice([41, 42]);

let expected_chunk = ArrowChunk::new(vec![Box::new(my_ints) as _]);

let metadata =
RecordingMetadata::try_from(EncoderVersion::V0, &expected_schema, &expected_chunk);

assert!(matches!(
metadata.err().unwrap(),
CodecError::InvalidArgument(_)
));
}
}
crates/store/re_remote_store_types/src/v0/rerun.remote_store.v0.rs (17 additions, 9 deletions)
@@ -244,13 +244,28 @@ impl ErrorCode {
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct RegisterRecordingsRequest {
/// human-readable description of the recording
#[prost(string, tag = "1")]
pub description: ::prost::alloc::string::String,
/// information about recording's backing storage
/// TODO(zehiko) add separate info about the "source" recording
#[prost(message, optional, tag = "2")]
pub obj_storage: ::core::option::Option<ObjectStorage>,
/// TODO(zehiko) should this be auto-discoverable?
/// type of recording
#[prost(enumeration = "RecordingType", tag = "3")]
pub typ: i32,
/// (optional) any additional metadata that should be associated with the recording
/// You can associate an arbitrary number of columns with a specific recording
#[prost(message, optional, tag = "4")]
pub metadata: ::core::option::Option<RecordingMetadata>,
}
/// Recording metadata is a single-row arrow record batch
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct RecordingMetadata {
#[prost(enumeration = "EncoderVersion", tag = "1")]
pub encoder_version: i32,
#[prost(bytes = "vec", tag = "2")]
pub payload: ::prost::alloc::vec::Vec<u8>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ObjectStorage {
@@ -288,17 +303,10 @@ pub struct GetRecordingMetadataRequest {
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GetRecordingMetadataResponse {
#[prost(message, optional, tag = "1")]
pub metadata: ::core::option::Option<RecordingMetadata>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct RecordingMetadata {
#[prost(message, optional, tag = "1")]
pub id: ::core::option::Option<RecordingId>,
#[prost(message, optional, tag = "2")]
pub schema: ::core::option::Option<Schema>,
#[prost(message, repeated, tag = "3")]
pub time_metadata: ::prost::alloc::vec::Vec<TimeMetadata>,
pub metadata: ::core::option::Option<RecordingMetadata>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct TimeMetadata {
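On the read path, a client holding a `GetRecordingMetadataResponse` can recover the arrow schema and the single-row batch through the `data()` accessor defined in codec.rs above. A minimal sketch (how the response is obtained and the module path are assumptions):

use re_remote_store_types::v0::GetRecordingMetadataResponse;

fn inspect_metadata(response: &GetRecordingMetadataResponse) {
    // `metadata` is optional on the wire; a recording may carry none.
    if let Some(metadata) = &response.metadata {
        match metadata.data() {
            // `data()` checks the stored encoder version, then decodes the
            // Arrow IPC payload back into (schema, unit batch).
            Ok((schema, batch)) => {
                println!("columns: {:?}, rows: {}", schema.fields, batch.len());
            }
            Err(err) => eprintln!("failed to decode recording metadata: {err}"),
        }
    }
}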