diff --git a/crates/store/re_remote_store_types/proto/rerun/v0/remote_store.proto b/crates/store/re_remote_store_types/proto/rerun/v0/remote_store.proto index a10de288194c..48626eb3d19d 100644 --- a/crates/store/re_remote_store_types/proto/rerun/v0/remote_store.proto +++ b/crates/store/re_remote_store_types/proto/rerun/v0/remote_store.proto @@ -15,10 +15,22 @@ service StorageNode { // ---------------- RegisterRecording ------------------ message RegisterRecordingsRequest { + // human readable description of the recording string description = 1; + // information about recording's backing storage + // TODO(zehiko) add separate info about the "source" recording ObjectStorage obj_storage = 2; - // TODO(zehiko) should this be auto-discoverable? + // type of recording RecordingType typ = 3; + // (optional) any additional metadata that should be associated with the recording + // You can associate any arbitrary number of columns with a specific recording + RecordingMetadata metadata = 4; +} + +// Recording metadata is a single-row Arrow record batch +message RecordingMetadata { + EncoderVersion encoder_version = 1; + bytes payload = 2; } message ObjectStorage { @@ -51,13 +63,8 @@ message GetRecordingMetadataRequest { } message GetRecordingMetadataResponse { - RecordingMetadata metadata = 1; -} - -message RecordingMetadata { RecordingId id = 1; - Schema schema = 2; - repeated TimeMetadata time_metadata = 3; + RecordingMetadata metadata = 2; } message TimeMetadata { diff --git a/crates/store/re_remote_store_types/src/codec.rs b/crates/store/re_remote_store_types/src/codec.rs index 42acc31c9734..184126b10e17 100644 --- a/crates/store/re_remote_store_types/src/codec.rs +++ b/crates/store/re_remote_store_types/src/codec.rs @@ -1,8 +1,11 @@ +use arrow2::array::Array as ArrowArray; +use arrow2::chunk::Chunk as ArrowChunk; +use arrow2::datatypes::Schema as ArrowSchema; use arrow2::error::Error as ArrowError; use arrow2::io::ipc::{read, write}; use re_dataframe::TransportChunk; 
-use crate::v0::EncoderVersion; +use crate::v0::{EncoderVersion, RecordingMetadata}; #[derive(Debug, thiserror::Error)] pub enum CodecError { @@ -23,6 +26,9 @@ pub enum CodecError { #[error("Unknown message header")] UnknownMessageHeader, + + #[error("Invalid argument: {0}")] + InvalidArgument(String), } #[derive(Clone, Copy, PartialEq, Eq, Hash, Default)] @@ -73,14 +79,7 @@ impl TransportMessageV0 { let mut data: Vec = Vec::new(); MessageHader::RECORD_BATCH.encode(&mut data)?; - let options = write::WriteOptions { compression: None }; - let mut sw = write::StreamWriter::new(&mut data, options); - - sw.start(&chunk.schema, None) - .map_err(CodecError::ArrowSerialization)?; - sw.write(&chunk.data, None) - .map_err(CodecError::ArrowSerialization)?; - sw.finish().map_err(CodecError::ArrowSerialization)?; + write_arrow_to_bytes(&mut data, &chunk.schema, &chunk.data)?; Ok(data) } @@ -94,29 +93,14 @@ impl TransportMessageV0 { match header { MessageHader::NO_DATA => Ok(Self::NoData), MessageHader::RECORD_BATCH => { - let metadata = read::read_stream_metadata(&mut reader) - .map_err(CodecError::ArrowSerialization)?; - let mut stream = read::StreamReader::new(&mut reader, metadata, None); - - let schema = stream.schema().clone(); - // there should be at least one record batch in the stream - // TODO(zehiko) isn't there a "read one record batch from bytes" arrow2 function?? - let stream_state = stream - .next() - .ok_or(CodecError::MissingRecordBatch)? 
- .map_err(CodecError::ArrowSerialization)?; - - match stream_state { - read::StreamState::Waiting => Err(CodecError::UnexpectedStreamState), - read::StreamState::Some(chunk) => { - let tc = TransportChunk { - schema: schema.clone(), - data: chunk, - }; - - Ok(Self::RecordBatch(tc)) - } - } + let (schema, data) = read_arrow_from_bytes(&mut reader)?; + + let tc = TransportChunk { + schema: schema.clone(), + data, + }; + + Ok(Self::RecordBatch(tc)) } _ => Err(CodecError::UnknownMessageHeader), } @@ -154,11 +138,95 @@ pub fn decode(version: EncoderVersion, data: &[u8]) -> Result>, + ) -> Result { + if unit_batch.len() > 1 { + return Err(CodecError::InvalidArgument(format!( + "metadata record batch can only have a single row, batch with {} rows given", + unit_batch.len() + ))); + } + + match version { + EncoderVersion::V0 => { + let mut data: Vec = Vec::new(); + write_arrow_to_bytes(&mut data, schema, unit_batch)?; + + Ok(Self { + encoder_version: version as i32, + payload: data, + }) + } + } + } + + /// Get metadata as arrow data + pub fn data(&self) -> Result<(ArrowSchema, ArrowChunk>), CodecError> { + let mut reader = std::io::Cursor::new(self.payload.clone()); + + let encoder_version = EncoderVersion::try_from(self.encoder_version) + .map_err(|err| CodecError::InvalidArgument(err.to_string()))?; + + match encoder_version { + EncoderVersion::V0 => read_arrow_from_bytes(&mut reader), + } + } +} + +/// Helper function that serializes given arrow schema and record batch into bytes +/// using Arrow IPC format. 
+fn write_arrow_to_bytes( + writer: &mut W, + schema: &ArrowSchema, + data: &ArrowChunk>, +) -> Result<(), CodecError> { + let options = write::WriteOptions { compression: None }; + let mut sw = write::StreamWriter::new(writer, options); + + sw.start(schema, None) + .map_err(CodecError::ArrowSerialization)?; + sw.write(data, None) + .map_err(CodecError::ArrowSerialization)?; + sw.finish().map_err(CodecError::ArrowSerialization)?; + + Ok(()) +} + +/// Helper function that deserializes raw bytes into arrow schema and record batch +/// using Arrow IPC format. +fn read_arrow_from_bytes( + reader: &mut R, +) -> Result<(ArrowSchema, ArrowChunk>), CodecError> { + let metadata = read::read_stream_metadata(reader).map_err(CodecError::ArrowSerialization)?; + let mut stream = read::StreamReader::new(reader, metadata, None); + + let schema = stream.schema().clone(); + // there should be at least one record batch in the stream + let stream_state = stream + .next() + .ok_or(CodecError::MissingRecordBatch)? 
+ .map_err(CodecError::ArrowSerialization)?; + + match stream_state { + read::StreamState::Waiting => Err(CodecError::UnexpectedStreamState), + read::StreamState::Some(chunk) => Ok((schema, chunk)), + } +} + #[cfg(test)] mod tests { + + use arrow2::chunk::Chunk as ArrowChunk; + use arrow2::{array::Int32Array, datatypes::Field, datatypes::Schema as ArrowSchema}; use re_dataframe::external::re_chunk::{Chunk, RowId}; use re_log_types::{example_components::MyPoint, Timeline}; + use crate::v0::RecordingMetadata; use crate::{ codec::{decode, encode, CodecError, TransportMessageV0}, v0::EncoderVersion, @@ -250,4 +318,45 @@ mod tests { assert_eq!(expected_chunk, decoded_chunk); } + + #[test] + fn test_recording_metadata_serialization() { + let expected_schema = ArrowSchema::from(vec![Field::new( + "my_int", + arrow2::datatypes::DataType::Int32, + false, + )]); + let my_ints = Int32Array::from_slice([42]); + let expected_chunk = ArrowChunk::new(vec![Box::new(my_ints) as _]); + + let metadata = + RecordingMetadata::try_from(EncoderVersion::V0, &expected_schema, &expected_chunk) + .unwrap(); + + let (schema, chunk) = metadata.data().unwrap(); + + assert_eq!(expected_schema, schema); + assert_eq!(expected_chunk, chunk); + } + + #[test] + fn test_recording_metadata_fails_with_non_unit_batch() { + let expected_schema = ArrowSchema::from(vec![Field::new( + "my_int", + arrow2::datatypes::DataType::Int32, + false, + )]); + // more than 1 row in the batch + let my_ints = Int32Array::from_slice([41, 42]); + + let expected_chunk = ArrowChunk::new(vec![Box::new(my_ints) as _]); + + let metadata = + RecordingMetadata::try_from(EncoderVersion::V0, &expected_schema, &expected_chunk); + + assert!(matches!( + metadata.err().unwrap(), + CodecError::InvalidArgument(_) + )); + } } diff --git a/crates/store/re_remote_store_types/src/v0/rerun.remote_store.v0.rs b/crates/store/re_remote_store_types/src/v0/rerun.remote_store.v0.rs index b546af84c872..cdaf77447e4a 100644 --- 
a/crates/store/re_remote_store_types/src/v0/rerun.remote_store.v0.rs +++ b/crates/store/re_remote_store_types/src/v0/rerun.remote_store.v0.rs @@ -244,13 +244,28 @@ impl ErrorCode { } #[derive(Clone, PartialEq, ::prost::Message)] pub struct RegisterRecordingsRequest { + /// human readable description of the recording #[prost(string, tag = "1")] pub description: ::prost::alloc::string::String, + /// information about recording's backing storage + /// TODO(zehiko) add separate info about the "source" recording #[prost(message, optional, tag = "2")] pub obj_storage: ::core::option::Option, - /// TODO(zehiko) should this be auto-discoverable? + /// type of recording #[prost(enumeration = "RecordingType", tag = "3")] pub typ: i32, + /// (optional) any additional metadata that should be associated with the recording + /// You can associate any arbitrary number of columns with a specific recording + #[prost(message, optional, tag = "4")] + pub metadata: ::core::option::Option, +} +/// Recording metadata is a single-row Arrow record batch +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct RecordingMetadata { + #[prost(enumeration = "EncoderVersion", tag = "1")] + pub encoder_version: i32, + #[prost(bytes = "vec", tag = "2")] + pub payload: ::prost::alloc::vec::Vec, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct ObjectStorage { @@ -288,17 +303,10 @@ pub struct GetRecordingMetadataRequest { } #[derive(Clone, PartialEq, ::prost::Message)] pub struct GetRecordingMetadataResponse { - #[prost(message, optional, tag = "1")] - pub metadata: ::core::option::Option, -} -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct RecordingMetadata { #[prost(message, optional, tag = "1")] pub id: ::core::option::Option, #[prost(message, optional, tag = "2")] - pub schema: ::core::option::Option, - #[prost(message, repeated, tag = "3")] - pub time_metadata: ::prost::alloc::vec::Vec, + pub metadata: ::core::option::Option, } #[derive(Clone, PartialEq, ::prost::Message)] 
pub struct TimeMetadata {