From af6c0ddaa1a90f87e7ef883526e98e4d002afca3 Mon Sep 17 00:00:00 2001 From: Zeljko Mihaljcic <7150613+zehiko@users.noreply.github.com> Date: Thu, 19 Dec 2024 16:47:23 +0100 Subject: [PATCH 1/4] grpc codec simplification - get rid of NoData message --- crates/store/re_grpc_client/src/lib.rs | 10 +--- .../re_log_encoding/src/codec/wire/decoder.rs | 52 ++++------------- .../re_log_encoding/src/codec/wire/encoder.rs | 48 ++------------- .../re_log_encoding/src/codec/wire/mod.rs | 58 +------------------ rerun_py/src/remote.rs | 3 - 5 files changed, 18 insertions(+), 153 deletions(-) diff --git a/crates/store/re_grpc_client/src/lib.rs b/crates/store/re_grpc_client/src/lib.rs index f324862dc3a8..9e6fa3bf7680 100644 --- a/crates/store/re_grpc_client/src/lib.rs +++ b/crates/store/re_grpc_client/src/lib.rs @@ -191,12 +191,11 @@ async fn stream_recording_async( .await .map_err(TonicStatusError)? .into_inner() - .filter_map(|resp| { + .map(|resp| { resp.and_then(|r| { decode(r.encoder_version(), &r.payload) .map_err(|err| tonic::Status::internal(err.to_string())) }) - .transpose() }) .collect::, tonic::Status>>() .await @@ -225,12 +224,11 @@ async fn stream_recording_async( .await .map_err(TonicStatusError)? .into_inner() - .filter_map(|resp| { + .map(|resp| { resp.and_then(|r| { decode(r.encoder_version(), &r.payload) .map_err(|err| tonic::Status::internal(err.to_string())) }) - .transpose() }); drop(client); @@ -340,7 +338,6 @@ async fn stream_catalog_async( re_log::debug!("Fetching catalog…"); let mut resp = client - // TODO(zehiko) add support for fetching specific columns and rows .query_catalog(QueryCatalogRequest { column_projection: None, // fetch all columns filter: None, // fetch all rows @@ -348,12 +345,11 @@ async fn stream_catalog_async( .await .map_err(TonicStatusError)? .into_inner() - .filter_map(|resp| { + .map(|resp| { resp.and_then(|r| { decode(r.encoder_version(), &r.payload) .map_err(|err| tonic::Status::internal(err.to_string())) }) - .transpose() }); drop(client); diff --git a/crates/store/re_log_encoding/src/codec/wire/decoder.rs b/crates/store/re_log_encoding/src/codec/wire/decoder.rs index 06b9bccdbc23..db0db4613cd1 100644 --- a/crates/store/re_log_encoding/src/codec/wire/decoder.rs +++ b/crates/store/re_log_encoding/src/codec/wire/decoder.rs @@ -1,55 +1,23 @@ -use super::MessageHeader; -use super::TransportMessageV0; use crate::codec::arrow::read_arrow_from_bytes; use crate::codec::CodecError; use re_chunk::TransportChunk; -impl MessageHeader { - pub(crate) fn decode(read: &mut impl std::io::Read) -> Result { - let mut buffer = [0_u8; Self::SIZE_BYTES]; - read.read_exact(&mut buffer) - .map_err(CodecError::HeaderDecoding)?; - - let header = u8::from_le(buffer[0]); - - Ok(Self(header)) - } -} - -impl TransportMessageV0 { - pub(crate) fn from_bytes(data: &[u8]) -> Result { - let mut reader = std::io::Cursor::new(data); - let header = MessageHeader::decode(&mut reader)?; - - match header { - MessageHeader::NO_DATA => Ok(Self::NoData), - MessageHeader::RECORD_BATCH => { - let (schema, data) = read_arrow_from_bytes(&mut reader)?; - - let tc = TransportChunk { - schema: schema.clone(), - data, - }; - - Ok(Self::RecordBatch(tc)) - } - _ => Err(CodecError::UnknownMessageHeader), - } - } -} - /// Decode transport data from a byte stream - if there's a record batch present, return it, otherwise return `None`. pub fn decode( version: re_protos::common::v0::EncoderVersion, data: &[u8], -) -> Result, CodecError> { +) -> Result { match version { re_protos::common::v0::EncoderVersion::V0 => { - let msg = TransportMessageV0::from_bytes(data)?; - match msg { - TransportMessageV0::RecordBatch(chunk) => Ok(Some(chunk)), - TransportMessageV0::NoData => Ok(None), - } + let mut reader = std::io::Cursor::new(data); + let (schema, data) = read_arrow_from_bytes(&mut reader)?; + + let tc = TransportChunk { + schema: schema.clone(), + data, + }; + + Ok(tc) } } } diff --git a/crates/store/re_log_encoding/src/codec/wire/encoder.rs b/crates/store/re_log_encoding/src/codec/wire/encoder.rs index e6ae62c1e1b7..c2e8312762fd 100644 --- a/crates/store/re_log_encoding/src/codec/wire/encoder.rs +++ b/crates/store/re_log_encoding/src/codec/wire/encoder.rs @@ -1,50 +1,7 @@ -use super::MessageHeader; -use super::TransportMessageV0; use crate::codec::arrow::write_arrow_to_bytes; use crate::codec::CodecError; use re_chunk::TransportChunk; -impl MessageHeader { - pub(crate) fn encode(&self, write: &mut impl std::io::Write) -> Result<(), CodecError> { - write - .write_all(&[self.0]) - .map_err(CodecError::HeaderEncoding)?; - - Ok(()) - } -} - -impl TransportMessageV0 { - pub(crate) fn to_bytes(&self) -> Result, CodecError> { - match self { - Self::NoData => { - let mut data: Vec = Vec::new(); - MessageHeader::NO_DATA.encode(&mut data)?; - Ok(data) - } - Self::RecordBatch(chunk) => { - let mut data: Vec = Vec::new(); - MessageHeader::RECORD_BATCH.encode(&mut data)?; - - write_arrow_to_bytes(&mut data, &chunk.schema, &chunk.data)?; - - Ok(data) - } - } - } -} - -/// Encode a `NoData` message into a byte stream. This can be used by the remote store -/// (i.e. data producer) to signal back to the client that there's no data available. -pub fn no_data(version: re_protos::common::v0::EncoderVersion) -> Result, CodecError> { - match version { - re_protos::common::v0::EncoderVersion::V0 => TransportMessageV0::NoData.to_bytes(), - } -} - -// TODO(zehiko) add support for separately encoding schema from the record batch to get rid of overhead -// of sending schema in each transport message for the same stream of batches. This will require codec -// to become stateful and keep track if schema was sent / received. /// Encode a transport chunk into a byte stream. pub fn encode( version: re_protos::common::v0::EncoderVersion, @@ -52,7 +9,10 @@ pub fn encode( ) -> Result, CodecError> { match version { re_protos::common::v0::EncoderVersion::V0 => { - TransportMessageV0::RecordBatch(chunk).to_bytes() + let mut data: Vec = Vec::new(); + write_arrow_to_bytes(&mut data, &chunk.schema, &chunk.data)?; + + Ok(data) } } } diff --git a/crates/store/re_log_encoding/src/codec/wire/mod.rs b/crates/store/re_log_encoding/src/codec/wire/mod.rs index 587e9e31e2ce..19ab51d39d54 100644 --- a/crates/store/re_log_encoding/src/codec/wire/mod.rs +++ b/crates/store/re_log_encoding/src/codec/wire/mod.rs @@ -4,24 +4,6 @@ pub mod encoder; pub use decoder::decode; pub use encoder::encode; -use re_chunk::TransportChunk; - -#[derive(Clone, Copy, PartialEq, Eq, Hash, Default)] -pub struct MessageHeader(pub u8); - -impl MessageHeader { - pub const NO_DATA: Self = Self(1); - pub const RECORD_BATCH: Self = Self(2); - - pub const SIZE_BYTES: usize = 1; -} - -#[derive(Debug)] -pub enum TransportMessageV0 { - NoData, - RecordBatch(TransportChunk), -} - #[cfg(test)] mod tests { use crate::{ @@ -55,32 +37,6 @@ mod tests { .unwrap() } - #[test] - fn test_message_v0_no_data() { - let msg = TransportMessageV0::NoData; - let data = msg.to_bytes().unwrap(); - let decoded = TransportMessageV0::from_bytes(&data).unwrap(); - assert!(matches!(decoded, TransportMessageV0::NoData)); - } - - #[test] - fn test_message_v0_record_batch() { - let expected_chunk = get_test_chunk(); - - let msg = TransportMessageV0::RecordBatch(expected_chunk.clone().to_transport().unwrap()); - let data = msg.to_bytes().unwrap(); - let decoded = TransportMessageV0::from_bytes(&data).unwrap(); - - #[allow(clippy::match_wildcard_for_single_variants)] - match decoded { - TransportMessageV0::RecordBatch(transport) => { - let decoded_chunk = Chunk::from_transport(&transport).unwrap(); - assert_eq!(expected_chunk, decoded_chunk); - } - _ => panic!("unexpected message type"), - } - } - #[test] fn test_invalid_batch_data() { let data = vec![2, 3, 4]; // '1' is NO_DATA message header @@ -92,18 +48,6 @@ mod tests { )); } - #[test] - fn test_unknown_header() { - let data = vec![3]; - let decoded = TransportMessageV0::from_bytes(&data); - assert!(decoded.is_err()); - - assert!(matches!( - decoded.err().unwrap(), - CodecError::UnknownMessageHeader - )); - } - #[test] fn test_v0_codec() { let expected_chunk = get_test_chunk(); @@ -113,7 +57,7 @@ mod tests { expected_chunk.clone().to_transport().unwrap(), ) .unwrap(); - let decoded = decode(EncoderVersion::V0, &encoded).unwrap().unwrap(); + let decoded = decode(EncoderVersion::V0, &encoded).unwrap(); let decoded_chunk = Chunk::from_transport(&decoded).unwrap(); assert_eq!(expected_chunk, decoded_chunk); diff --git a/rerun_py/src/remote.rs b/rerun_py/src/remote.rs index 2619d3990088..2c5557747295 100644 --- a/rerun_py/src/remote.rs +++ b/rerun_py/src/remote.rs @@ -102,7 +102,6 @@ impl PyStorageNodeClient { decode(r.encoder_version(), &r.payload) .map_err(|err| tonic::Status::internal(err.to_string())) }) - .transpose() }) .collect::, _>>() .await @@ -187,8 +186,6 @@ impl PyStorageNodeClient { .into_inner(); let metadata = decode(resp.encoder_version(), &resp.payload) .map_err(|err| PyRuntimeError::new_err(err.to_string()))? - // TODO(zehiko) this is going away soon - .ok_or(PyRuntimeError::new_err("No metadata"))?; let recording_id = metadata .all_columns() From c471836d73578daff7e49ad302a628cce1214b11 Mon Sep 17 00:00:00 2001 From: Zeljko Mihaljcic <7150613+zehiko@users.noreply.github.com> Date: Thu, 19 Dec 2024 21:37:20 +0100 Subject: [PATCH 2/4] small fix --- rerun_py/src/remote.rs | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/rerun_py/src/remote.rs b/rerun_py/src/remote.rs index 2c5557747295..cca8d58bc0dc 100644 --- a/rerun_py/src/remote.rs +++ b/rerun_py/src/remote.rs @@ -97,7 +97,7 @@ impl PyStorageNodeClient { .await .map_err(|err| PyRuntimeError::new_err(err.to_string()))? .into_inner() - .filter_map(|resp| { + .map(|resp| { resp.and_then(|r| { decode(r.encoder_version(), &r.payload) .map_err(|err| tonic::Status::internal(err.to_string())) @@ -185,7 +185,7 @@ impl PyStorageNodeClient { .map_err(|err| PyRuntimeError::new_err(err.to_string()))? .into_inner(); let metadata = decode(resp.encoder_version(), &resp.payload) - .map_err(|err| PyRuntimeError::new_err(err.to_string()))? + .map_err(|err| PyRuntimeError::new_err(err.to_string()))?; let recording_id = metadata .all_columns() @@ -294,10 +294,6 @@ impl PyStorageNodeClient { let tc = decode(EncoderVersion::V0, &response.payload) .map_err(|err| PyRuntimeError::new_err(err.to_string()))?; - let Some(tc) = tc else { - return Err(PyRuntimeError::new_err("Stream error")); - }; - let chunk = Chunk::from_transport(&tc) .map_err(|err| PyRuntimeError::new_err(err.to_string()))?; From b5c13a91f53151757d83a0ab8b4f8e2c6f4847e4 Mon Sep 17 00:00:00 2001 From: Zeljko Mihaljcic <7150613+zehiko@users.noreply.github.com> Date: Thu, 19 Dec 2024 22:22:12 +0100 Subject: [PATCH 3/4] fix error type inference --- rerun_py/src/remote.rs | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/rerun_py/src/remote.rs b/rerun_py/src/remote.rs index 8207a85c4ad1..13ebf6c172ec 100644 --- a/rerun_py/src/remote.rs +++ b/rerun_py/src/remote.rs @@ -107,12 +107,11 @@ impl PyStorageNodeClient { .await .map_err(re_grpc_client::TonicStatusError)? .into_inner() - .filter_map(|resp| { + .map(|resp| { resp.and_then(|r| { decode(r.encoder_version(), &r.payload) .map_err(|err| tonic::Status::internal(err.to_string())) }) - .transpose() }) .collect::, tonic::Status>>() .await @@ -156,12 +155,11 @@ impl PyStorageNodeClient { .await .map_err(TonicStatusError)? .into_inner() - .filter_map(|resp| { + .map(|resp| { resp.and_then(|r| { decode(r.encoder_version(), &r.payload) .map_err(|err| tonic::Status::internal(err.to_string())) }) - .transpose() }) .collect::, tonic::Status>>() .await @@ -431,9 +429,12 @@ impl PyStorageNodeClient { while let Some(result) = resp.next().await { let response = result.map_err(|err| PyRuntimeError::new_err(err.to_string()))?; - let tc = decode(EncoderVersion::V0, &response.payload) - .map_err(|err| PyRuntimeError::new_err(err.to_string()))?; - + let tc = match decode(EncoderVersion::V0, &response.payload) { + Ok(tc) => tc, + Err(err) => { + return Err(PyRuntimeError::new_err(err.to_string())); + } + }; let chunk = Chunk::from_transport(&tc) .map_err(|err| PyRuntimeError::new_err(err.to_string()))?; @@ -443,9 +444,9 @@ impl PyStorageNodeClient { } Ok(store) - })?; + }); - let handle = ChunkStoreHandle::new(store); + let handle = ChunkStoreHandle::new(store?); let cache = re_dataframe::QueryCacheHandle::new(re_dataframe::QueryCache::new(handle.clone())); From 97cc07c74b0aadd0557b1bf5a28e1b2cf26a318a Mon Sep 17 00:00:00 2001 From: Zeljko Mihaljcic <7150613+zehiko@users.noreply.github.com> Date: Thu, 19 Dec 2024 22:55:11 +0100 Subject: [PATCH 4/4] more small fixes --- crates/store/re_log_encoding/src/codec/wire/encoder.rs | 2 +- crates/store/re_log_encoding/src/codec/wire/mod.rs | 10 +++++----- rerun_py/src/remote.rs | 4 ++-- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/crates/store/re_log_encoding/src/codec/wire/encoder.rs b/crates/store/re_log_encoding/src/codec/wire/encoder.rs index c2e8312762fd..87438fd0a33c 100644 --- a/crates/store/re_log_encoding/src/codec/wire/encoder.rs +++ b/crates/store/re_log_encoding/src/codec/wire/encoder.rs @@ -5,7 +5,7 @@ use re_chunk::TransportChunk; /// Encode a transport chunk into a byte stream. pub fn encode( version: re_protos::common::v0::EncoderVersion, - chunk: TransportChunk, + chunk: &TransportChunk, ) -> Result, CodecError> { match version { re_protos::common::v0::EncoderVersion::V0 => { diff --git a/crates/store/re_log_encoding/src/codec/wire/mod.rs b/crates/store/re_log_encoding/src/codec/wire/mod.rs index 19ab51d39d54..0bc6c702718a 100644 --- a/crates/store/re_log_encoding/src/codec/wire/mod.rs +++ b/crates/store/re_log_encoding/src/codec/wire/mod.rs @@ -7,7 +7,7 @@ pub use encoder::encode; #[cfg(test)] mod tests { use crate::{ - codec::wire::{decode, encode, TransportMessageV0}, + codec::wire::{decode, encode}, codec::CodecError, }; use re_chunk::{Chunk, RowId}; @@ -38,9 +38,9 @@ mod tests { } #[test] - fn test_invalid_batch_data() { - let data = vec![2, 3, 4]; // '1' is NO_DATA message header - let decoded = TransportMessageV0::from_bytes(&data); + fn test_invalid_data() { + let data = vec![2, 3, 4]; + let decoded = decode(EncoderVersion::V0, &data); assert!(matches!( decoded.err().unwrap(), @@ -54,7 +54,7 @@ mod tests { let encoded = encode( EncoderVersion::V0, - expected_chunk.clone().to_transport().unwrap(), + &expected_chunk.clone().to_transport().unwrap(), ) .unwrap(); let decoded = decode(EncoderVersion::V0, &encoded).unwrap(); diff --git a/rerun_py/src/remote.rs b/rerun_py/src/remote.rs index 13ebf6c172ec..34d436aaf828 100644 --- a/rerun_py/src/remote.rs +++ b/rerun_py/src/remote.rs @@ -269,7 +269,7 @@ impl PyStorageNodeClient { let metadata_tc = TransportChunk::from_arrow_record_batch(&metadata); - encode(EncoderVersion::V0, metadata_tc) + encode(EncoderVersion::V0, &metadata_tc) .map_err(|err| PyRuntimeError::new_err(err.to_string())) }) .transpose()? @@ -339,7 +339,7 @@ impl PyStorageNodeClient { let request = UpdateCatalogRequest { metadata: Some(DataframePart { encoder_version: EncoderVersion::V0 as i32, - payload: encode(EncoderVersion::V0, metadata_tc) + payload: encode(EncoderVersion::V0, &metadata_tc) .map_err(|err| PyRuntimeError::new_err(err.to_string()))?, }), };