Skip to content

Commit

Permalink
grpc codec simplification - get rid of NoData message (#8550)
Browse files Browse the repository at this point in the history
  • Loading branch information
zehiko authored Dec 19, 2024
1 parent d65c9ec commit cee46d6
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 175 deletions.
10 changes: 3 additions & 7 deletions crates/store/re_grpc_client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,12 +191,11 @@ async fn stream_recording_async(
.await
.map_err(TonicStatusError)?
.into_inner()
.filter_map(|resp| {
.map(|resp| {
resp.and_then(|r| {
decode(r.encoder_version(), &r.payload)
.map_err(|err| tonic::Status::internal(err.to_string()))
})
.transpose()
})
.collect::<Result<Vec<_>, tonic::Status>>()
.await
Expand Down Expand Up @@ -225,12 +224,11 @@ async fn stream_recording_async(
.await
.map_err(TonicStatusError)?
.into_inner()
.filter_map(|resp| {
.map(|resp| {
resp.and_then(|r| {
decode(r.encoder_version(), &r.payload)
.map_err(|err| tonic::Status::internal(err.to_string()))
})
.transpose()
});

drop(client);
Expand Down Expand Up @@ -340,20 +338,18 @@ async fn stream_catalog_async(
re_log::debug!("Fetching catalog…");

let mut resp = client
// TODO(zehiko) add support for fetching specific columns and rows
.query_catalog(QueryCatalogRequest {
column_projection: None, // fetch all columns
filter: None, // fetch all rows
})
.await
.map_err(TonicStatusError)?
.into_inner()
.filter_map(|resp| {
.map(|resp| {
resp.and_then(|r| {
decode(r.encoder_version(), &r.payload)
.map_err(|err| tonic::Status::internal(err.to_string()))
})
.transpose()
});

drop(client);
Expand Down
52 changes: 10 additions & 42 deletions crates/store/re_log_encoding/src/codec/wire/decoder.rs
Original file line number Diff line number Diff line change
@@ -1,55 +1,23 @@
use super::MessageHeader;
use super::TransportMessageV0;
use crate::codec::arrow::read_arrow_from_bytes;
use crate::codec::CodecError;
use re_chunk::TransportChunk;

impl MessageHeader {
pub(crate) fn decode(read: &mut impl std::io::Read) -> Result<Self, CodecError> {
let mut buffer = [0_u8; Self::SIZE_BYTES];
read.read_exact(&mut buffer)
.map_err(CodecError::HeaderDecoding)?;

let header = u8::from_le(buffer[0]);

Ok(Self(header))
}
}

impl TransportMessageV0 {
pub(crate) fn from_bytes(data: &[u8]) -> Result<Self, CodecError> {
let mut reader = std::io::Cursor::new(data);
let header = MessageHeader::decode(&mut reader)?;

match header {
MessageHeader::NO_DATA => Ok(Self::NoData),
MessageHeader::RECORD_BATCH => {
let (schema, data) = read_arrow_from_bytes(&mut reader)?;

let tc = TransportChunk {
schema: schema.clone(),
data,
};

Ok(Self::RecordBatch(tc))
}
_ => Err(CodecError::UnknownMessageHeader),
}
}
}

/// Decode transport data from a byte stream - if there's a record batch present, return it, otherwise return `None`.
pub fn decode(
version: re_protos::common::v0::EncoderVersion,
data: &[u8],
) -> Result<Option<TransportChunk>, CodecError> {
) -> Result<TransportChunk, CodecError> {
match version {
re_protos::common::v0::EncoderVersion::V0 => {
let msg = TransportMessageV0::from_bytes(data)?;
match msg {
TransportMessageV0::RecordBatch(chunk) => Ok(Some(chunk)),
TransportMessageV0::NoData => Ok(None),
}
let mut reader = std::io::Cursor::new(data);
let (schema, data) = read_arrow_from_bytes(&mut reader)?;

let tc = TransportChunk {
schema: schema.clone(),
data,
};

Ok(tc)
}
}
}
50 changes: 5 additions & 45 deletions crates/store/re_log_encoding/src/codec/wire/encoder.rs
Original file line number Diff line number Diff line change
@@ -1,58 +1,18 @@
use super::MessageHeader;
use super::TransportMessageV0;
use crate::codec::arrow::write_arrow_to_bytes;
use crate::codec::CodecError;
use re_chunk::TransportChunk;

impl MessageHeader {
pub(crate) fn encode(&self, write: &mut impl std::io::Write) -> Result<(), CodecError> {
write
.write_all(&[self.0])
.map_err(CodecError::HeaderEncoding)?;

Ok(())
}
}

impl TransportMessageV0 {
pub(crate) fn to_bytes(&self) -> Result<Vec<u8>, CodecError> {
match self {
Self::NoData => {
let mut data: Vec<u8> = Vec::new();
MessageHeader::NO_DATA.encode(&mut data)?;
Ok(data)
}
Self::RecordBatch(chunk) => {
let mut data: Vec<u8> = Vec::new();
MessageHeader::RECORD_BATCH.encode(&mut data)?;

write_arrow_to_bytes(&mut data, &chunk.schema, &chunk.data)?;

Ok(data)
}
}
}
}

/// Encode a `NoData` message into a byte stream. This can be used by the remote store
/// (i.e. data producer) to signal back to the client that there's no data available.
pub fn no_data(version: re_protos::common::v0::EncoderVersion) -> Result<Vec<u8>, CodecError> {
match version {
re_protos::common::v0::EncoderVersion::V0 => TransportMessageV0::NoData.to_bytes(),
}
}

// TODO(zehiko) add support for separately encoding schema from the record batch to get rid of overhead
// of sending schema in each transport message for the same stream of batches. This will require codec
// to become stateful and keep track if schema was sent / received.
/// Encode a transport chunk into a byte stream.
pub fn encode(
version: re_protos::common::v0::EncoderVersion,
chunk: TransportChunk,
chunk: &TransportChunk,
) -> Result<Vec<u8>, CodecError> {
match version {
re_protos::common::v0::EncoderVersion::V0 => {
TransportMessageV0::RecordBatch(chunk).to_bytes()
let mut data: Vec<u8> = Vec::new();
write_arrow_to_bytes(&mut data, &chunk.schema, &chunk.data)?;

Ok(data)
}
}
}
68 changes: 6 additions & 62 deletions crates/store/re_log_encoding/src/codec/wire/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,10 @@ pub mod encoder;
pub use decoder::decode;
pub use encoder::encode;

use re_chunk::TransportChunk;

#[derive(Clone, Copy, PartialEq, Eq, Hash, Default)]
pub struct MessageHeader(pub u8);

impl MessageHeader {
pub const NO_DATA: Self = Self(1);
pub const RECORD_BATCH: Self = Self(2);

pub const SIZE_BYTES: usize = 1;
}

#[derive(Debug)]
pub enum TransportMessageV0 {
NoData,
RecordBatch(TransportChunk),
}

#[cfg(test)]
mod tests {
use crate::{
codec::wire::{decode, encode, TransportMessageV0},
codec::wire::{decode, encode},
codec::CodecError,
};
use re_chunk::{Chunk, RowId};
Expand Down Expand Up @@ -56,64 +38,26 @@ mod tests {
}

#[test]
fn test_message_v0_no_data() {
let msg = TransportMessageV0::NoData;
let data = msg.to_bytes().unwrap();
let decoded = TransportMessageV0::from_bytes(&data).unwrap();
assert!(matches!(decoded, TransportMessageV0::NoData));
}

#[test]
fn test_message_v0_record_batch() {
let expected_chunk = get_test_chunk();

let msg = TransportMessageV0::RecordBatch(expected_chunk.clone().to_transport().unwrap());
let data = msg.to_bytes().unwrap();
let decoded = TransportMessageV0::from_bytes(&data).unwrap();

#[allow(clippy::match_wildcard_for_single_variants)]
match decoded {
TransportMessageV0::RecordBatch(transport) => {
let decoded_chunk = Chunk::from_transport(&transport).unwrap();
assert_eq!(expected_chunk, decoded_chunk);
}
_ => panic!("unexpected message type"),
}
}

#[test]
fn test_invalid_batch_data() {
let data = vec![2, 3, 4]; // '1' is NO_DATA message header
let decoded = TransportMessageV0::from_bytes(&data);
fn test_invalid_data() {
let data = vec![2, 3, 4];
let decoded = decode(EncoderVersion::V0, &data);

assert!(matches!(
decoded.err().unwrap(),
CodecError::ArrowSerialization(_)
));
}

#[test]
fn test_unknown_header() {
let data = vec![3];
let decoded = TransportMessageV0::from_bytes(&data);
assert!(decoded.is_err());

assert!(matches!(
decoded.err().unwrap(),
CodecError::UnknownMessageHeader
));
}

#[test]
fn test_v0_codec() {
let expected_chunk = get_test_chunk();

let encoded = encode(
EncoderVersion::V0,
expected_chunk.clone().to_transport().unwrap(),
&expected_chunk.clone().to_transport().unwrap(),
)
.unwrap();
let decoded = decode(EncoderVersion::V0, &encoded).unwrap().unwrap();
let decoded = decode(EncoderVersion::V0, &encoded).unwrap();
let decoded_chunk = Chunk::from_transport(&decoded).unwrap();

assert_eq!(expected_chunk, decoded_chunk);
Expand Down
32 changes: 13 additions & 19 deletions rerun_py/src/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,11 @@ impl PyStorageNodeClient {
.await
.map_err(re_grpc_client::TonicStatusError)?
.into_inner()
.filter_map(|resp| {
.map(|resp| {
resp.and_then(|r| {
decode(r.encoder_version(), &r.payload)
.map_err(|err| tonic::Status::internal(err.to_string()))
})
.transpose()
})
.collect::<Result<Vec<_>, tonic::Status>>()
.await
Expand Down Expand Up @@ -156,12 +155,11 @@ impl PyStorageNodeClient {
.await
.map_err(TonicStatusError)?
.into_inner()
.filter_map(|resp| {
.map(|resp| {
resp.and_then(|r| {
decode(r.encoder_version(), &r.payload)
.map_err(|err| tonic::Status::internal(err.to_string()))
})
.transpose()
})
.collect::<Result<Vec<_>, tonic::Status>>()
.await
Expand Down Expand Up @@ -207,12 +205,11 @@ impl PyStorageNodeClient {
.await
.map_err(|err| PyRuntimeError::new_err(err.to_string()))?
.into_inner()
.filter_map(|resp| {
.map(|resp| {
resp.and_then(|r| {
decode(r.encoder_version(), &r.payload)
.map_err(|err| tonic::Status::internal(err.to_string()))
})
.transpose()
})
.collect::<Result<Vec<_>, _>>()
.await
Expand Down Expand Up @@ -272,7 +269,7 @@ impl PyStorageNodeClient {

let metadata_tc = TransportChunk::from_arrow_record_batch(&metadata);

encode(EncoderVersion::V0, metadata_tc)
encode(EncoderVersion::V0, &metadata_tc)
.map_err(|err| PyRuntimeError::new_err(err.to_string()))
})
.transpose()?
Expand All @@ -296,9 +293,7 @@ impl PyStorageNodeClient {
.map_err(|err| PyRuntimeError::new_err(err.to_string()))?
.into_inner();
let metadata = decode(resp.encoder_version(), &resp.payload)
.map_err(|err| PyRuntimeError::new_err(err.to_string()))?
// TODO(zehiko) this is going away soon
.ok_or(PyRuntimeError::new_err("No metadata"))?;
.map_err(|err| PyRuntimeError::new_err(err.to_string()))?;

let recording_id = metadata
.all_columns()
Expand Down Expand Up @@ -344,7 +339,7 @@ impl PyStorageNodeClient {
let request = UpdateCatalogRequest {
metadata: Some(DataframePart {
encoder_version: EncoderVersion::V0 as i32,
payload: encode(EncoderVersion::V0, metadata_tc)
payload: encode(EncoderVersion::V0, &metadata_tc)
.map_err(|err| PyRuntimeError::new_err(err.to_string()))?,
}),
};
Expand Down Expand Up @@ -434,13 +429,12 @@ impl PyStorageNodeClient {

while let Some(result) = resp.next().await {
let response = result.map_err(|err| PyRuntimeError::new_err(err.to_string()))?;
let tc = decode(EncoderVersion::V0, &response.payload)
.map_err(|err| PyRuntimeError::new_err(err.to_string()))?;

let Some(tc) = tc else {
return Err(PyRuntimeError::new_err("Stream error"));
let tc = match decode(EncoderVersion::V0, &response.payload) {
Ok(tc) => tc,
Err(err) => {
return Err(PyRuntimeError::new_err(err.to_string()));
}
};

let chunk = Chunk::from_transport(&tc)
.map_err(|err| PyRuntimeError::new_err(err.to_string()))?;

Expand All @@ -450,9 +444,9 @@ impl PyStorageNodeClient {
}

Ok(store)
})?;
});

let handle = ChunkStoreHandle::new(store);
let handle = ChunkStoreHandle::new(store?);

let cache =
re_dataframe::QueryCacheHandle::new(re_dataframe::QueryCache::new(handle.clone()));
Expand Down

0 comments on commit cee46d6

Please sign in to comment.