From 8f1418bb4a2bc47659d4c65d0e462e0b6b87a1fe Mon Sep 17 00:00:00 2001 From: Zeljko Mihaljcic <7150613+zehiko@users.noreply.github.com> Date: Thu, 16 Jan 2025 10:18:30 +0100 Subject: [PATCH 01/11] Add support for async streaming Decoder for rrd files --- Cargo.lock | 2 + crates/store/re_log_encoding/Cargo.toml | 10 +- .../re_log_encoding/src/codec/file/decoder.rs | 22 +- .../re_log_encoding/src/codec/file/mod.rs | 5 + .../store/re_log_encoding/src/decoder/mod.rs | 6 +- .../re_log_encoding/src/decoder/streaming.rs | 354 ++++++++++++++++++ crates/store/re_log_encoding/src/lib.rs | 25 +- 7 files changed, 403 insertions(+), 21 deletions(-) create mode 100644 crates/store/re_log_encoding/src/decoder/streaming.rs diff --git a/Cargo.lock b/Cargo.lock index 250bf25c9fdf..4f0073801057 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6114,6 +6114,8 @@ dependencies = [ "serde_test", "similar-asserts", "thiserror 1.0.65", + "tokio", + "tokio-stream", "wasm-bindgen", "wasm-bindgen-futures", "web-sys", diff --git a/crates/store/re_log_encoding/Cargo.toml b/crates/store/re_log_encoding/Cargo.toml index abe9378978f3..de5125a66e42 100644 --- a/crates/store/re_log_encoding/Cargo.toml +++ b/crates/store/re_log_encoding/Cargo.toml @@ -23,7 +23,13 @@ all-features = true default = [] ## Enable loading data from an .rrd file. -decoder = ["dep:rmp-serde", "dep:lz4_flex", "re_log_types/serde"] +decoder = [ + "dep:rmp-serde", + "dep:lz4_flex", + "re_log_types/serde", + "dep:tokio", + "dep:tokio-stream", +] ## Enable encoding of log messages to an .rrd file/stream. encoder = ["dep:rmp-serde", "dep:lz4_flex", "re_log_types/serde"] @@ -60,6 +66,8 @@ thiserror.workspace = true ehttp = { workspace = true, optional = true, features = ["streaming"] } lz4_flex = { workspace = true, optional = true } rmp-serde = { workspace = true, optional = true } +tokio = { workspace = true, optional = true, features = ["io-util"] } +tokio-stream = { workspace = true, optional = true } web-time = { workspace = true, optional = true } # Web dependencies: diff --git a/crates/store/re_log_encoding/src/codec/file/decoder.rs b/crates/store/re_log_encoding/src/codec/file/decoder.rs index a9fe39e652b2..a8721fd4882d 100644 --- a/crates/store/re_log_encoding/src/codec/file/decoder.rs +++ b/crates/store/re_log_encoding/src/codec/file/decoder.rs @@ -6,9 +6,6 @@ use re_log_types::LogMsg; use re_protos::missing_field; pub(crate) fn decode(data: &mut impl std::io::Read) -> Result<(u64, Option), DecodeError> { - use re_protos::external::prost::Message; - use re_protos::log_msg::v0::{ArrowMsg, BlueprintActivationCommand, Encoding, SetStoreInfo}; - let mut read_bytes = 0u64; let header = MessageHeader::decode(data)?; read_bytes += std::mem::size_of::() as u64 + header.len; @@ -16,13 +13,22 @@ pub(crate) fn decode(data: &mut impl std::io::Read) -> Result<(u64, Option Result, DecodeError> { + use re_protos::external::prost::Message; + use re_protos::log_msg::v0::{ArrowMsg, BlueprintActivationCommand, Encoding, SetStoreInfo}; + + let msg = match message_kind { MessageKind::SetStoreInfo => { - let set_store_info = SetStoreInfo::decode(&buf[..])?; + let set_store_info = SetStoreInfo::decode(buf)?; Some(LogMsg::SetStoreInfo(set_store_info.try_into()?)) } MessageKind::ArrowMsg => { - let arrow_msg = ArrowMsg::decode(&buf[..])?; + let arrow_msg = ArrowMsg::decode(buf)?; if arrow_msg.encoding() != Encoding::ArrowIpc { return Err(DecodeError::Codec(CodecError::UnsupportedEncoding)); } @@ -43,7 +49,7 @@ pub(crate) fn decode(data: &mut impl std::io::Read) -> 
Result<(u64, Option { - let blueprint_activation_command = BlueprintActivationCommand::decode(&buf[..])?; + let blueprint_activation_command = BlueprintActivationCommand::decode(buf)?; Some(LogMsg::BlueprintActivationCommand( blueprint_activation_command.try_into()?, )) @@ -51,5 +57,5 @@ pub(crate) fn decode(data: &mut impl std::io::Read) -> Result<(u64, Option None, }; - Ok((read_bytes, msg)) + Ok(msg) } diff --git a/crates/store/re_log_encoding/src/codec/file/mod.rs b/crates/store/re_log_encoding/src/codec/file/mod.rs index e491777409ca..3bc5276c1a33 100644 --- a/crates/store/re_log_encoding/src/codec/file/mod.rs +++ b/crates/store/re_log_encoding/src/codec/file/mod.rs @@ -48,6 +48,11 @@ impl MessageHeader { let mut buf = [0; std::mem::size_of::()]; data.read_exact(&mut buf)?; + Self::from_bytes(&buf) + } + + #[cfg(feature = "decoder")] + pub fn from_bytes(buf: &[u8]) -> Result { #[allow(clippy::unwrap_used)] // cannot fail let kind = u64::from_le_bytes(buf[0..8].try_into().unwrap()); let kind = match kind { diff --git a/crates/store/re_log_encoding/src/decoder/mod.rs b/crates/store/re_log_encoding/src/decoder/mod.rs index 0c10b855b9fc..e3b764649f38 100644 --- a/crates/store/re_log_encoding/src/decoder/mod.rs +++ b/crates/store/re_log_encoding/src/decoder/mod.rs @@ -1,6 +1,8 @@ //! Decoding [`LogMsg`]:es from `.rrd` files/streams. pub mod stream; +#[cfg(feature = "decoder")] +pub mod streaming; use std::io::BufRead as _; use std::io::Read; @@ -412,14 +414,14 @@ mod tests { }; // TODO(#3741): remove this once we are all in on arrow-rs - fn strip_arrow_extensions_from_log_messages(log_msg: Vec) -> Vec { + pub fn strip_arrow_extensions_from_log_messages(log_msg: Vec) -> Vec { log_msg .into_iter() .map(LogMsg::strip_arrow_extension_types) .collect() } - fn fake_log_messages() -> Vec { + pub fn fake_log_messages() -> Vec { let store_id = StoreId::random(StoreKind::Blueprint); let arrow_msg = re_chunk::Chunk::builder("test_entity".into()) diff --git a/crates/store/re_log_encoding/src/decoder/streaming.rs b/crates/store/re_log_encoding/src/decoder/streaming.rs new file mode 100644 index 000000000000..39c96049b07e --- /dev/null +++ b/crates/store/re_log_encoding/src/decoder/streaming.rs @@ -0,0 +1,354 @@ +use std::pin::Pin; + +use re_build_info::CrateVersion; +use re_log::external::log::warn; +use re_log_types::LogMsg; +use tokio::io::{AsyncBufRead, AsyncReadExt}; +use tokio_stream::Stream; + +use crate::{ + codec::file::{self}, + Compression, EncodingOptions, VersionPolicy, +}; + +use super::{read_options, DecodeError, FileHeader}; + +pub struct StreamingDecoder { + version: CrateVersion, + options: EncodingOptions, + reader: R, + // buffer used for uncompressing data. This is a tiny optimization + // to (potentially) avoid allocation for each (compressed) message + uncompressed: Vec, + // there are some interesting cases (like corrupted files or concatanated files) where we might + // need to know how much unprocessed bytes we have left from the last read + unprocessed_bytes: usize, +} + +/// `StreamingDecoder` relies on the underlying reader for the wakeup mechanism. 
+impl StreamingDecoder { + pub async fn new(version_policy: VersionPolicy, mut reader: R) -> Result { + let mut data = [0_u8; FileHeader::SIZE]; + + reader + .read_exact(&mut data) + .await + .map_err(DecodeError::Read)?; + + let (version, options) = read_options(version_policy, &data)?; + + Ok(Self { + version, + options, + reader, + uncompressed: Vec::new(), + unprocessed_bytes: 0, + }) + } + + fn peek_file_header(data: &[u8]) -> bool { + let mut read = std::io::Cursor::new(data); + FileHeader::decode(&mut read).is_ok() + } +} + +impl Stream for StreamingDecoder { + type Item = Result; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + loop { + let Self { + options, + reader, + uncompressed, + unprocessed_bytes: bytes_read, + .. + } = &mut *self; + + let serializer = options.serializer; + let compression = options.compression; + + // poll_fill_buf() implicitly handles the EOF case, so we don't need to check for it + let buf = match Pin::new(reader).poll_fill_buf(cx) { + std::task::Poll::Ready(Ok([])) => return std::task::Poll::Ready(None), + std::task::Poll::Ready(Ok(buf)) => buf, + std::task::Poll::Ready(Err(err)) => { + return std::task::Poll::Ready(Some(Err(DecodeError::Read(err)))); + } + std::task::Poll::Pending => return std::task::Poll::Pending, + }; + + // no new data to read, but still some unprocessed bytes left + // this can happen in the case of a corrupted file + if *bytes_read == buf.len() { + warn!("we have more bytes in the stream but not enough to process a message"); + return std::task::Poll::Ready(None); + } + + // check if this is a start of a new concatenated file + if buf.len() >= FileHeader::SIZE && Self::peek_file_header(&buf[..FileHeader::SIZE]) { + let data = &buf[..FileHeader::SIZE]; + // We've found another file header in the middle of the stream, it's time to switch + // gears and start over on this new file. 
+ match read_options(VersionPolicy::Warn, data) { + Ok((version, options)) => { + self.version = CrateVersion::max(self.version, version); + self.options = options; + + // Consume the bytes we've processed + Pin::new(&mut self.reader).consume(FileHeader::SIZE); + self.unprocessed_bytes = 0; + + // Continue the loop to process more data + continue; + } + Err(err) => return std::task::Poll::Ready(Some(Err(err))), + } + } + + let (msg, read_bytes) = match serializer { + crate::Serializer::MsgPack => { + let header_size = super::MessageHeader::SIZE; + + if buf.len() < header_size { + self.unprocessed_bytes = buf.len(); + // Not enough data to read the message, need to wait for more + continue; + } + let data = &buf[..header_size]; + let header = super::MessageHeader::from_bytes(data); + + match header { + super::MessageHeader::Data { + compressed_len, + uncompressed_len, + } => { + let uncompressed_len = uncompressed_len as usize; + let compressed_len = compressed_len as usize; + uncompressed.resize(uncompressed.len().max(uncompressed_len), 0); + + // read the data + let (data, length) = match compression { + Compression::Off => { + if buf.len() < header_size + uncompressed_len { + self.unprocessed_bytes = buf.len(); + // Not enough data to read the message, need to wait for more + continue; + } + + ( + &buf[header_size..header_size + uncompressed_len], + uncompressed_len, + ) + } + + Compression::LZ4 => { + if buf.len() < header_size + compressed_len { + self.unprocessed_bytes = buf.len(); + // Not enough data to read the message, need to wait for more + continue; + } + + let data = &buf[header_size..header_size + compressed_len]; + if let Err(err) = + lz4_flex::block::decompress_into(data, uncompressed) + { + return std::task::Poll::Ready(Some(Err( + DecodeError::Lz4(err), + ))); + } + + (&uncompressed[..], compressed_len) + } + }; + + // decode the message + let msg = rmp_serde::from_slice::(data); + let read_bytes = header_size + length; + + match msg { + Ok(msg) => (Some(msg), read_bytes), + Err(err) => { + return std::task::Poll::Ready(Some(Err( + DecodeError::MsgPack(err), + ))); + } + } + } + super::MessageHeader::EndOfStream => return std::task::Poll::Ready(None), + } + } + crate::Serializer::Protobuf => { + let header_size = std::mem::size_of::(); + + if buf.len() < header_size { + self.unprocessed_bytes = buf.len(); + // Not enough data to read the message, need to wait for more + continue; + } + let data = &buf[..header_size]; + let header = file::MessageHeader::from_bytes(data)?; + + if buf.len() < header_size + header.len as usize { + self.unprocessed_bytes = buf.len(); + // Not enough data to read the message, need to wait for more + continue; + } + + // decode the message + let data = &buf[header_size..header.len as usize + header_size]; + let msg = file::decoder::decode_bytes(header.kind, data)?; + + let read_bytes = header_size + header.len as usize; + + (msg, read_bytes) + } + }; + + // when is msg None? 
when we've reached the end of the stream + let Some(mut msg) = msg else { + // check if there's another file concatenated + if buf.len() < read_bytes + FileHeader::SIZE { + return std::task::Poll::Ready(None); + } + + let data = &buf[read_bytes..read_bytes + FileHeader::SIZE]; + if Self::peek_file_header(data) { + re_log::debug!( + "Reached end of stream, but it seems we have a concatenated file, continuing" + ); + + // Consume the bytes we've processed + Pin::new(&mut self.reader).consume(read_bytes); + self.unprocessed_bytes = 0; + + continue; + } + + re_log::debug!("Reached end of stream, iterator complete"); + return std::task::Poll::Ready(None); + }; + + if let LogMsg::SetStoreInfo(msg) = &mut msg { + // Propagate the protocol version from the header into the `StoreInfo` so that all + // parts of the app can easily access it. + msg.info.store_version = Some(self.version); + } + + // Consume the bytes we've processed + Pin::new(&mut self.reader).consume(read_bytes); + self.unprocessed_bytes = 0; + + return std::task::Poll::Ready(Some(Ok(msg))); + } + } +} + +#[cfg(all(test, feature = "decoder", feature = "encoder"))] +mod tests { + use re_build_info::CrateVersion; + use tokio_stream::StreamExt; + + use crate::{ + decoder::{ + streaming::StreamingDecoder, + tests::{fake_log_messages, strip_arrow_extensions_from_log_messages}, + }, + Compression, EncodingOptions, Serializer, VersionPolicy, + }; + + #[tokio::test] + async fn test_streaming_decoder_handles_corrupted_input_file() { + let rrd_version = CrateVersion::LOCAL; + + let messages = fake_log_messages(); + + let options = [ + EncodingOptions { + compression: Compression::Off, + serializer: Serializer::MsgPack, + }, + EncodingOptions { + compression: Compression::LZ4, + serializer: Serializer::MsgPack, + }, + EncodingOptions { + compression: Compression::Off, + serializer: Serializer::Protobuf, + }, + EncodingOptions { + compression: Compression::LZ4, + serializer: Serializer::Protobuf, + }, + ]; + + for options in options { + let mut data = vec![]; + crate::encoder::encode_ref(rrd_version, options, messages.iter().map(Ok), &mut data) + .unwrap(); + + // We cut the input file by one byte to simulate a corrupted file and check that we don't end up in an infinite loop + // waiting for more data when there's none to be read. 
+ let data = &data[..data.len() - 1]; + + let buf_reader = tokio::io::BufReader::new(std::io::Cursor::new(data)); + + let decoder = StreamingDecoder::new(VersionPolicy::Error, buf_reader) + .await + .unwrap(); + + let decoded_messages = strip_arrow_extensions_from_log_messages( + decoder.collect::, _>>().await.unwrap(), + ); + + similar_asserts::assert_eq!(decoded_messages, messages); + } + } + + #[tokio::test] + async fn test_streaming_decoder_happy_paths() { + let rrd_version = CrateVersion::LOCAL; + + let messages = fake_log_messages(); + + let options = [ + EncodingOptions { + compression: Compression::Off, + serializer: Serializer::MsgPack, + }, + EncodingOptions { + compression: Compression::LZ4, + serializer: Serializer::MsgPack, + }, + EncodingOptions { + compression: Compression::Off, + serializer: Serializer::Protobuf, + }, + EncodingOptions { + compression: Compression::LZ4, + serializer: Serializer::Protobuf, + }, + ]; + + for options in options { + let mut data = vec![]; + crate::encoder::encode_ref(rrd_version, options, messages.iter().map(Ok), &mut data) + .unwrap(); + + let buf_reader = tokio::io::BufReader::new(std::io::Cursor::new(data)); + + let decoder = StreamingDecoder::new(VersionPolicy::Error, buf_reader) + .await + .unwrap(); + + let decoded_messages = strip_arrow_extensions_from_log_messages( + decoder.collect::, _>>().await.unwrap(), + ); + + similar_asserts::assert_eq!(decoded_messages, messages); + } + } +} diff --git a/crates/store/re_log_encoding/src/lib.rs b/crates/store/re_log_encoding/src/lib.rs index 709b96fcdb4f..3fad54e5a963 100644 --- a/crates/store/re_log_encoding/src/lib.rs +++ b/crates/store/re_log_encoding/src/lib.rs @@ -217,23 +217,28 @@ impl MessageHeader { #[cfg(feature = "decoder")] pub fn decode(read: &mut impl std::io::Read) -> Result { - fn u32_from_le_slice(bytes: &[u8]) -> u32 { - u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]) - } - let mut buffer = [0_u8; Self::SIZE]; read.read_exact(&mut buffer) .map_err(decoder::DecodeError::Read)?; - if u32_from_le_slice(&buffer[0..4]) == 0 && u32_from_le_slice(&buffer[4..]) == 0 { - Ok(Self::EndOfStream) + Ok(Self::from_bytes(&buffer)) + } + + #[cfg(feature = "decoder")] + pub fn from_bytes(data: &[u8]) -> Self { + fn u32_from_le_slice(bytes: &[u8]) -> u32 { + u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]) + } + + if u32_from_le_slice(&data[0..4]) == 0 && u32_from_le_slice(&data[4..]) == 0 { + Self::EndOfStream } else { - let compressed = u32_from_le_slice(&buffer[0..4]); - let uncompressed = u32_from_le_slice(&buffer[4..]); - Ok(Self::Data { + let compressed = u32_from_le_slice(&data[0..4]); + let uncompressed = u32_from_le_slice(&data[4..]); + Self::Data { compressed_len: compressed, uncompressed_len: uncompressed, - }) + } } } } From d1d7e103ccd3cc84c2ed4a033b6eaf3a2538abbf Mon Sep 17 00:00:00 2001 From: Zeljko Mihaljcic <7150613+zehiko@users.noreply.github.com> Date: Thu, 16 Jan 2025 13:24:42 +0100 Subject: [PATCH 02/11] define Catalog fields name in the protobuf spec crate and use it in the catalog view --- crates/store/re_grpc_client/src/lib.rs | 26 +++++++++++++++++--------- crates/store/re_protos/src/lib.rs | 12 ++++++++++++ 2 files changed, 29 insertions(+), 9 deletions(-) diff --git a/crates/store/re_grpc_client/src/lib.rs b/crates/store/re_grpc_client/src/lib.rs index 63471503527c..c3938634126f 100644 --- a/crates/store/re_grpc_client/src/lib.rs +++ b/crates/store/re_grpc_client/src/lib.rs @@ -22,7 +22,8 @@ use re_protos::{ common::v0::RecordingId, 
remote_store::v0::{ storage_node_client::StorageNodeClient, CatalogFilter, FetchRecordingRequest, - QueryCatalogRequest, + QueryCatalogRequest, CATALOG_APP_ID_FIELD_NAME, CATALOG_ID_FIELD_NAME, + CATALOG_START_TIME_FIELD_NAME, }, }; use re_types::{ @@ -280,29 +281,36 @@ pub fn store_info_from_catalog_chunk( ) -> Result { let store_id = StoreId::from_string(StoreKind::Recording, recording_id.to_owned()); + println!("TC: {tc:?}"); let (_field, data) = tc .components() - .find(|(f, _)| f.name() == "application_id") + .find(|(f, _)| f.name() == CATALOG_APP_ID_FIELD_NAME) .ok_or(StreamError::ChunkError(re_chunk::ChunkError::Malformed { - reason: "no application_id field found".to_owned(), + reason: "no {APP_ID_FIELD_NAME} field found".to_owned(), }))?; let app_id = data .downcast_array_ref::() .ok_or(StreamError::ChunkError(re_chunk::ChunkError::Malformed { - reason: format!("application_id must be a utf8 array: {:?}", tc.schema_ref()), + reason: format!( + "{CATALOG_APP_ID_FIELD_NAME} must be a utf8 array: {:?}", + tc.schema_ref() + ), }))? .value(0); let (_field, data) = tc .components() - .find(|(f, _)| f.name() == "start_time") + .find(|(f, _)| f.name() == CATALOG_START_TIME_FIELD_NAME) .ok_or(StreamError::ChunkError(re_chunk::ChunkError::Malformed { - reason: "no start_time field found".to_owned(), + reason: "no {START_TIME_FIELD}} field found".to_owned(), }))?; let start_time = data - .downcast_array_ref::() + .downcast_array_ref::() .ok_or(StreamError::ChunkError(re_chunk::ChunkError::Malformed { - reason: format!("start_time must be an int64 array: {:?}", tc.schema_ref()), + reason: format!( + "{CATALOG_START_TIME_FIELD_NAME} must be a Timestamp array: {:?}", + tc.schema_ref() + ), }))? .value(0); @@ -484,7 +492,7 @@ async fn stream_catalog_async( )))?; let recording_uri_arrays: Vec = chunk - .iter_slices::("id".into()) + .iter_slices::(CATALOG_ID_FIELD_NAME.into()) .map(|id| { let rec_id = &id[0]; // each component batch is of length 1 i.e. single 'id' value diff --git a/crates/store/re_protos/src/lib.rs b/crates/store/re_protos/src/lib.rs index e41036697752..df3a13b66375 100644 --- a/crates/store/re_protos/src/lib.rs +++ b/crates/store/re_protos/src/lib.rs @@ -48,8 +48,20 @@ pub mod log_msg { /// Generated types for the remote store gRPC service API v0. pub mod remote_store { + pub mod v0 { pub use crate::v0::rerun_remote_store_v0::*; + + /// Recording catalog mandatory field names. All mandatory metadata fields are prefixed + /// with "rerun_" to avoid conflicts with user-defined fields. 
+ pub const CATALOG_ID_FIELD_NAME: &str = "rerun_recording_id";
+ pub const CATALOG_APP_ID_FIELD_NAME: &str = "rerun_application_id";
+ pub const CATALOG_START_TIME_FIELD_NAME: &str = "rerun_start_time";
+ pub const CATALOG_DESCRIPTION_FIELD_NAME: &str = "rerun_description";
+ pub const CATALOG_RECORDING_TYPE_FIELD_NAME: &str = "rerun_recording_type";
+ pub const CATALOG_STORAGE_URL_FIELD_NAME: &str = "rerun_storage_url";
+ pub const CATALOG_REGISTRATION_TIME_FIELD_NAME: &str = "rerun_registration_time";
+ pub const CATALOG_ROW_ID_FIELD_NAME: &str = "rerun_row_id";
 }
 }

From 648ea1e260e6012beb5c75e23f955531769404ee Mon Sep 17 00:00:00 2001
From: Zeljko Mihaljcic <7150613+zehiko@users.noreply.github.com>
Date: Thu, 16 Jan 2025 14:43:12 +0100
Subject: [PATCH 04/11] rebase

---
 crates/store/re_chunk/src/transport.rs | 10 ----------
 crates/store/re_dataframe/src/query.rs | 2 +-
 crates/store/re_grpc_client/src/lib.rs | 9 +++++----
 .../re_log_encoding/src/codec/wire/decoder.rs | 18 +++++++++--------
 .../re_log_encoding/src/codec/wire/mod.rs | 2 +-
 crates/store/re_log_types/Cargo.toml | 6 +++++-
 rerun_py/src/remote.rs | 20 +++++++++----------
 7 files changed, 31 insertions(+), 36 deletions(-)

diff --git a/crates/store/re_chunk/src/transport.rs b/crates/store/re_chunk/src/transport.rs
index 90ea338e21d9..6527da66593d 100644
--- a/crates/store/re_chunk/src/transport.rs
+++ b/crates/store/re_chunk/src/transport.rs
@@ -356,16 +356,6 @@ impl TransportChunk {
 }) }
- #[inline]
- pub fn fields_and_columns(&self) -> impl Iterator + '_ {
- self.fields().iter().enumerate().filter_map(|(i, field)| {
- self.batch
- .columns()
- .get(i)
- .map(|column| (field.as_ref(), column))
- })
- }
-
 /// Iterates all control columns present in this chunk.
 #[inline]
 pub fn controls(&self) -> impl Iterator {
diff --git a/crates/store/re_dataframe/src/query.rs b/crates/store/re_dataframe/src/query.rs
index e3b3313fa6d2..cd302a51709f 100644
--- a/crates/store/re_dataframe/src/query.rs
+++ b/crates/store/re_dataframe/src/query.rs
@@ -1262,7 +1262,7 @@ impl QueryHandle {
 /// Calls [`Self::next_row`] and wraps the result in a [`ArrowRecordBatch`].
 ///
- /// Only use this if you absolutely need a [`RecordBatch`] as this adds a
+ /// Only use this if you absolutely need a [`ArrowRecordBatch`] as this adds a
 /// some overhead for schema validation.
 ///
 /// See [`Self::next_row`] for more information.
diff --git a/crates/store/re_grpc_client/src/lib.rs b/crates/store/re_grpc_client/src/lib.rs index c3938634126f..a426499cb40b 100644 --- a/crates/store/re_grpc_client/src/lib.rs +++ b/crates/store/re_grpc_client/src/lib.rs @@ -219,7 +219,8 @@ async fn stream_recording_async( })); } - let store_info = store_info_from_catalog_chunk(&resp[0], &recording_id)?; + let store_info = + store_info_from_catalog_chunk(&TransportChunk::from(resp[0].clone()), &recording_id)?; let store_id = store_info.store_id.clone(); re_log::debug!("Fetching {recording_id}…"); @@ -256,8 +257,8 @@ async fn stream_recording_async( re_log::info!("Starting to read..."); while let Some(result) = resp.next().await { - let tc = result.map_err(TonicStatusError)?; - let chunk = Chunk::from_transport(&tc)?; + let batch = result.map_err(TonicStatusError)?; + let chunk = Chunk::from_record_batch(batch)?; if tx .send(LogMsg::ArrowMsg(store_id.clone(), chunk.to_arrow_msg()?)) @@ -399,7 +400,7 @@ async fn stream_catalog_async( re_log::info!("Starting to read..."); while let Some(result) = resp.next().await { - let input = result.map_err(TonicStatusError)?; + let input = TransportChunk::from(result.map_err(TonicStatusError)?); // Catalog received from the ReDap server isn't suitable for direct conversion to a Rerun Chunk: // - conversion expects "data" columns to be ListArrays, hence we need to convert any individual row column data to ListArray diff --git a/crates/store/re_log_encoding/src/codec/wire/decoder.rs b/crates/store/re_log_encoding/src/codec/wire/decoder.rs index e8f3c83040be..39b7a66542e5 100644 --- a/crates/store/re_log_encoding/src/codec/wire/decoder.rs +++ b/crates/store/re_log_encoding/src/codec/wire/decoder.rs @@ -1,36 +1,38 @@ -use crate::codec::arrow::read_arrow_from_bytes; -use crate::codec::CodecError; -use re_chunk::TransportChunk; +use arrow::array::RecordBatch as ArrowRecordBatch; + use re_protos::common::v0::RerunChunk; use re_protos::remote_store::v0::DataframePart; +use crate::codec::arrow::read_arrow_from_bytes; +use crate::codec::CodecError; + /// Decode transport data from a byte stream. fn decode( version: re_protos::common::v0::EncoderVersion, data: &[u8], -) -> Result { +) -> Result { match version { re_protos::common::v0::EncoderVersion::V0 => { let mut reader = std::io::Cursor::new(data); let batch = read_arrow_from_bytes(&mut reader)?; - Ok(TransportChunk::from(batch)) + Ok(batch) } } } /// Decode an object from a its wire (protobuf) representation. 
pub trait Decode { - fn decode(&self) -> Result; + fn decode(&self) -> Result; } impl Decode for DataframePart { - fn decode(&self) -> Result { + fn decode(&self) -> Result { decode(self.encoder_version(), &self.payload) } } impl Decode for RerunChunk { - fn decode(&self) -> Result { + fn decode(&self) -> Result { decode(self.encoder_version(), &self.payload) } } diff --git a/crates/store/re_log_encoding/src/codec/wire/mod.rs b/crates/store/re_log_encoding/src/codec/wire/mod.rs index a08cd28a810a..7f2ea2624100 100644 --- a/crates/store/re_log_encoding/src/codec/wire/mod.rs +++ b/crates/store/re_log_encoding/src/codec/wire/mod.rs @@ -62,7 +62,7 @@ mod tests { .unwrap(); let decoded = encoded.decode().unwrap(); - let decoded_chunk = Chunk::from_transport(&decoded).unwrap(); + let decoded_chunk = Chunk::from_record_batch(decoded).unwrap(); assert_eq!(expected_chunk, decoded_chunk); } diff --git a/crates/store/re_log_types/Cargo.toml b/crates/store/re_log_types/Cargo.toml index 2437bbf3e5bf..788f02b97c4a 100644 --- a/crates/store/re_log_types/Cargo.toml +++ b/crates/store/re_log_types/Cargo.toml @@ -58,7 +58,11 @@ re_types_core.workspace = true ahash.workspace = true anyhow.workspace = true arrow = { workspace = true, features = ["ipc"] } -arrow2 = { workspace = true, features = ["io_print", "compute_concatenate"] } +arrow2 = { workspace = true, features = [ + "arrow", + "io_print", + "compute_concatenate", +] } backtrace.workspace = true bytemuck.workspace = true clean-path.workspace = true diff --git a/rerun_py/src/remote.rs b/rerun_py/src/remote.rs index 5959a385b6a9..dd0763ac8e0c 100644 --- a/rerun_py/src/remote.rs +++ b/rerun_py/src/remote.rs @@ -130,7 +130,10 @@ impl PyStorageNodeClient { )); } - re_grpc_client::store_info_from_catalog_chunk(&resp[0], id) + re_grpc_client::store_info_from_catalog_chunk( + &re_chunk::TransportChunk::from(resp[0].clone()), + id, + ) }) .map_err(|err| PyRuntimeError::new_err(err.to_string()))?; @@ -173,7 +176,7 @@ impl PyStorageNodeClient { .unwrap_or_else(|| ArrowSchema::empty().into()); Ok(RecordBatchIterator::new( - batches.into_iter().map(|tc| Ok(tc.into())), + batches.into_iter().map(Ok), schema, )) }); @@ -234,10 +237,7 @@ impl PyStorageNodeClient { .map_err(|err| PyRuntimeError::new_err(err.to_string()))?; let record_batches: Vec> = - transport_chunks - .into_iter() - .map(|tc| Ok(tc.into())) - .collect(); + transport_chunks.into_iter().map(Ok).collect(); // TODO(jleibs): surfacing this schema is awkward. This should be more explicit in // the gRPC APIs somehow. @@ -346,9 +346,7 @@ impl PyStorageNodeClient { .map_err(|err| PyRuntimeError::new_err(err.to_string()))?; let recording_id = metadata - .fields_and_columns() - .find(|(field, _data)| field.name() == "rerun_recording_id") - .map(|(_field, data)| data) + .column_by_name("rerun_recording_id") .ok_or(PyRuntimeError::new_err("No rerun_recording_id"))? .downcast_array_ref::() .ok_or(PyRuntimeError::new_err("Recording Id is not a string"))? 
@@ -480,13 +478,13 @@ impl PyStorageNodeClient { while let Some(result) = resp.next().await { let response = result.map_err(|err| PyRuntimeError::new_err(err.to_string()))?; - let tc = match response.decode() { + let batch = match response.decode() { Ok(tc) => tc, Err(err) => { return Err(PyRuntimeError::new_err(err.to_string())); } }; - let chunk = Chunk::from_transport(&tc) + let chunk = Chunk::from_record_batch(batch) .map_err(|err| PyRuntimeError::new_err(err.to_string()))?; store From fa3e44be7d76c3dcd9254270558315ead5b9c875 Mon Sep 17 00:00:00 2001 From: Zeljko Mihaljcic <7150613+zehiko@users.noreply.github.com> Date: Thu, 16 Jan 2025 20:52:19 +0100 Subject: [PATCH 05/11] always consume all bytes from the reader --- Cargo.lock | 9 +- Cargo.toml | 1 + crates/store/re_log_encoding/Cargo.toml | 2 + .../store/re_log_encoding/src/decoder/mod.rs | 4 + .../re_log_encoding/src/decoder/streaming.rs | 133 ++++++++++-------- 5 files changed, 87 insertions(+), 62 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4f0073801057..80e9b3931d56 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1098,9 +1098,9 @@ checksum = "8f1fe948ff07f4bd06c30984e69f5b4899c516a3ef74f34df92a2df2ab535495" [[package]] name = "bytes" -version = "1.8.0" +version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ac0150caa2ae65ca5bd83f25c7de183dea78d4d366469f148435e2acfbad0da" +checksum = "325918d6fe32f23b19878fe4b34794ae41fc19ddbe53b10571a4874d44ffd39b" [[package]] name = "cacache" @@ -3900,7 +3900,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4979f22fdb869068da03c9f7528f8297c6fd2606bc3a4affe42e6a823fdb8da4" dependencies = [ "cfg-if", - "windows-targets 0.48.5", + "windows-targets 0.52.6", ] [[package]] @@ -5218,7 +5218,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c1318b19085f08681016926435853bbf7858f9c082d0999b80550ff5d9abe15" dependencies = [ "bytes", - "heck 0.4.1", + "heck 0.5.0", "itertools 0.13.0", "log", "multimap", @@ -6095,6 +6095,7 @@ name = "re_log_encoding" version = "0.22.0-alpha.1+dev" dependencies = [ "arrow", + "bytes", "criterion", "ehttp", "js-sys", diff --git a/Cargo.toml b/Cargo.toml index ae3606935004..a454ba22a3b0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -162,6 +162,7 @@ bit-vec = "0.8" bitflags = { version = "2.4", features = ["bytemuck"] } blackbox = "0.2.0" bytemuck = { version = "1.18", features = ["extern_crate_alloc"] } +bytes = "1.0" camino = "1.1" cargo_metadata = "0.18" cargo-run-wasm = "0.3.2" diff --git a/crates/store/re_log_encoding/Cargo.toml b/crates/store/re_log_encoding/Cargo.toml index de5125a66e42..329a1a8e70a0 100644 --- a/crates/store/re_log_encoding/Cargo.toml +++ b/crates/store/re_log_encoding/Cargo.toml @@ -29,6 +29,7 @@ decoder = [ "re_log_types/serde", "dep:tokio", "dep:tokio-stream", + "dep:bytes", ] ## Enable encoding of log messages to an .rrd file/stream. 
@@ -63,6 +64,7 @@ parking_lot.workspace = true thiserror.workspace = true # Optional external dependencies: +bytes = { workspace = true, optional = true } ehttp = { workspace = true, optional = true, features = ["streaming"] } lz4_flex = { workspace = true, optional = true } rmp-serde = { workspace = true, optional = true } diff --git a/crates/store/re_log_encoding/src/decoder/mod.rs b/crates/store/re_log_encoding/src/decoder/mod.rs index e3b764649f38..c170d2d756a6 100644 --- a/crates/store/re_log_encoding/src/decoder/mod.rs +++ b/crates/store/re_log_encoding/src/decoder/mod.rs @@ -179,6 +179,7 @@ impl Decoder { /// See also: /// * [`Decoder::new_concatenated`] pub fn new(version_policy: VersionPolicy, mut read: R) -> Result { + println!("DECODER NEW"); re_tracing::profile_function!(); let mut data = [0_u8; FileHeader::SIZE]; @@ -216,6 +217,7 @@ impl Decoder { version_policy: VersionPolicy, mut read: std::io::BufReader, ) -> Result { + println!("STUPID LOG WORKS"); re_tracing::profile_function!(); let mut data = [0_u8; FileHeader::SIZE]; @@ -277,6 +279,7 @@ impl Iterator for Decoder { re_tracing::profile_function!(); if self.peek_file_header() { + println!("CONCAT"); // We've found another file header in the middle of the stream, it's time to switch // gears and start over on this new file. @@ -383,6 +386,7 @@ impl Iterator for Decoder { re_log::debug!( "Reached end of stream, but it seems we have a concatenated file, continuing" ); + println!("CONCAT STREAM"); return self.next(); } diff --git a/crates/store/re_log_encoding/src/decoder/streaming.rs b/crates/store/re_log_encoding/src/decoder/streaming.rs index 39c96049b07e..b4bea2df0c7f 100644 --- a/crates/store/re_log_encoding/src/decoder/streaming.rs +++ b/crates/store/re_log_encoding/src/decoder/streaming.rs @@ -1,5 +1,6 @@ use std::pin::Pin; +use bytes::{Buf, BytesMut}; use re_build_info::CrateVersion; use re_log::external::log::warn; use re_log_types::LogMsg; @@ -20,12 +21,12 @@ pub struct StreamingDecoder { // buffer used for uncompressing data. This is a tiny optimization // to (potentially) avoid allocation for each (compressed) message uncompressed: Vec, - // there are some interesting cases (like corrupted files or concatanated files) where we might - // need to know how much unprocessed bytes we have left from the last read - unprocessed_bytes: usize, + // internal buffer for unprocessed bytes + unprocessed_bytes: BytesMut, + // flag to indicate if we're expecting more data to be read. + expect_more_data: bool, } -/// `StreamingDecoder` relies on the underlying reader for the wakeup mechanism. impl StreamingDecoder { pub async fn new(version_policy: VersionPolicy, mut reader: R) -> Result { let mut data = [0_u8; FileHeader::SIZE]; @@ -42,7 +43,8 @@ impl StreamingDecoder { options, reader, uncompressed: Vec::new(), - unprocessed_bytes: 0, + unprocessed_bytes: BytesMut::new(), + expect_more_data: false, }) } @@ -52,6 +54,9 @@ impl StreamingDecoder { } } +/// `StreamingDecoder` relies on the underlying reader for the wakeup mechanism. +/// The fact that we can have concatanated file or corrupted file pushes us to keep +/// the state of the decoder in the struct itself (through `unprocessed_bytes` and `expect_more_data`). impl Stream for StreamingDecoder { type Item = Result; @@ -64,33 +69,43 @@ impl Stream for StreamingDecoder { options, reader, uncompressed, - unprocessed_bytes: bytes_read, + unprocessed_bytes, + expect_more_data, .. 
} = &mut *self; let serializer = options.serializer; let compression = options.compression; + let mut buf_length = 0; // poll_fill_buf() implicitly handles the EOF case, so we don't need to check for it - let buf = match Pin::new(reader).poll_fill_buf(cx) { - std::task::Poll::Ready(Ok([])) => return std::task::Poll::Ready(None), - std::task::Poll::Ready(Ok(buf)) => buf, + match Pin::new(reader).poll_fill_buf(cx) { + std::task::Poll::Ready(Ok([])) => { + if unprocessed_bytes.is_empty() { + return std::task::Poll::Ready(None); + } + // there's more unprocessed data, but there's nothing in the underlying + // bytes stream - this indicates a corrupted stream + if *expect_more_data { + warn!("There's {} unprocessed data, but not enough for decoding a full message", unprocessed_bytes.len()); + return std::task::Poll::Ready(None); + } + } + std::task::Poll::Ready(Ok(buf)) => { + unprocessed_bytes.extend_from_slice(buf); + buf_length = buf.len(); + } std::task::Poll::Ready(Err(err)) => { return std::task::Poll::Ready(Some(Err(DecodeError::Read(err)))); } std::task::Poll::Pending => return std::task::Poll::Pending, }; - // no new data to read, but still some unprocessed bytes left - // this can happen in the case of a corrupted file - if *bytes_read == buf.len() { - warn!("we have more bytes in the stream but not enough to process a message"); - return std::task::Poll::Ready(None); - } - // check if this is a start of a new concatenated file - if buf.len() >= FileHeader::SIZE && Self::peek_file_header(&buf[..FileHeader::SIZE]) { - let data = &buf[..FileHeader::SIZE]; + if unprocessed_bytes.len() >= FileHeader::SIZE + && Self::peek_file_header(&unprocessed_bytes[..FileHeader::SIZE]) + { + let data = &unprocessed_bytes[..FileHeader::SIZE]; // We've found another file header in the middle of the stream, it's time to switch // gears and start over on this new file. 
match read_options(VersionPolicy::Warn, data) { @@ -98,27 +113,26 @@ impl Stream for StreamingDecoder { self.version = CrateVersion::max(self.version, version); self.options = options; - // Consume the bytes we've processed - Pin::new(&mut self.reader).consume(FileHeader::SIZE); - self.unprocessed_bytes = 0; + Pin::new(&mut self.reader).consume(buf_length); + self.unprocessed_bytes.advance(FileHeader::SIZE); - // Continue the loop to process more data continue; } Err(err) => return std::task::Poll::Ready(Some(Err(err))), } } - let (msg, read_bytes) = match serializer { + let (msg, processed_length) = match serializer { crate::Serializer::MsgPack => { let header_size = super::MessageHeader::SIZE; + if unprocessed_bytes.len() < header_size { + // Not enough data to read the header, need to wait for more + self.expect_more_data = true; + Pin::new(&mut self.reader).consume(buf_length); - if buf.len() < header_size { - self.unprocessed_bytes = buf.len(); - // Not enough data to read the message, need to wait for more continue; } - let data = &buf[..header_size]; + let data = &unprocessed_bytes[..header_size]; let header = super::MessageHeader::from_bytes(data); match header { @@ -128,31 +142,37 @@ impl Stream for StreamingDecoder { } => { let uncompressed_len = uncompressed_len as usize; let compressed_len = compressed_len as usize; - uncompressed.resize(uncompressed.len().max(uncompressed_len), 0); // read the data let (data, length) = match compression { Compression::Off => { - if buf.len() < header_size + uncompressed_len { - self.unprocessed_bytes = buf.len(); - // Not enough data to read the message, need to wait for more + if unprocessed_bytes.len() < uncompressed_len + header_size { + self.expect_more_data = true; + Pin::new(&mut self.reader).consume(buf_length); + continue; } ( - &buf[header_size..header_size + uncompressed_len], + &unprocessed_bytes + [header_size..uncompressed_len + header_size], uncompressed_len, ) } Compression::LZ4 => { - if buf.len() < header_size + compressed_len { - self.unprocessed_bytes = buf.len(); + if unprocessed_bytes.len() < compressed_len + header_size { // Not enough data to read the message, need to wait for more + self.expect_more_data = true; + Pin::new(&mut self.reader).consume(buf_length); + continue; } - let data = &buf[header_size..header_size + compressed_len]; + uncompressed + .resize(uncompressed.len().max(uncompressed_len), 0); + let data = &unprocessed_bytes + [header_size..compressed_len + header_size]; if let Err(err) = lz4_flex::block::decompress_into(data, uncompressed) { @@ -167,10 +187,9 @@ impl Stream for StreamingDecoder { // decode the message let msg = rmp_serde::from_slice::(data); - let read_bytes = header_size + length; match msg { - Ok(msg) => (Some(msg), read_bytes), + Ok(msg) => (Some(msg), length + header_size), Err(err) => { return std::task::Poll::Ready(Some(Err( DecodeError::MsgPack(err), @@ -183,48 +202,46 @@ impl Stream for StreamingDecoder { } crate::Serializer::Protobuf => { let header_size = std::mem::size_of::(); + if unprocessed_bytes.len() < header_size { + // Not enough data to read the header, need to wait for more + self.expect_more_data = true; + Pin::new(&mut self.reader).consume(buf_length); - if buf.len() < header_size { - self.unprocessed_bytes = buf.len(); - // Not enough data to read the message, need to wait for more continue; } - let data = &buf[..header_size]; + let data = &unprocessed_bytes[..header_size]; let header = file::MessageHeader::from_bytes(data)?; - if buf.len() < header_size + header.len 
as usize { - self.unprocessed_bytes = buf.len(); + if unprocessed_bytes.len() < header.len as usize + header_size { // Not enough data to read the message, need to wait for more + self.expect_more_data = true; + Pin::new(&mut self.reader).consume(buf_length); + continue; } // decode the message - let data = &buf[header_size..header.len as usize + header_size]; + let data = &unprocessed_bytes[header_size..header_size + header.len as usize]; let msg = file::decoder::decode_bytes(header.kind, data)?; - let read_bytes = header_size + header.len as usize; - - (msg, read_bytes) + (msg, header.len as usize + header_size) } }; - // when is msg None? when we've reached the end of the stream let Some(mut msg) = msg else { - // check if there's another file concatenated - if buf.len() < read_bytes + FileHeader::SIZE { + // we've reached the end of the stream (i.e. read the EoS header), we check if there's another file concatenated + if unprocessed_bytes.len() < processed_length + FileHeader::SIZE { return std::task::Poll::Ready(None); } - let data = &buf[read_bytes..read_bytes + FileHeader::SIZE]; + let data = + &unprocessed_bytes[processed_length..processed_length + FileHeader::SIZE]; if Self::peek_file_header(data) { re_log::debug!( "Reached end of stream, but it seems we have a concatenated file, continuing" ); - // Consume the bytes we've processed - Pin::new(&mut self.reader).consume(read_bytes); - self.unprocessed_bytes = 0; - + Pin::new(&mut self.reader).consume(buf_length); continue; } @@ -238,9 +255,9 @@ impl Stream for StreamingDecoder { msg.info.store_version = Some(self.version); } - // Consume the bytes we've processed - Pin::new(&mut self.reader).consume(read_bytes); - self.unprocessed_bytes = 0; + Pin::new(&mut self.reader).consume(buf_length); + self.unprocessed_bytes.advance(processed_length); + self.expect_more_data = false; return std::task::Poll::Ready(Some(Ok(msg))); } From 14c8fc9dfee0f7184ab5947575f958c2ac7b2fba Mon Sep 17 00:00:00 2001 From: Zeljko Mihaljcic <7150613+zehiko@users.noreply.github.com> Date: Fri, 17 Jan 2025 08:06:26 +0100 Subject: [PATCH 06/11] remove println --- crates/store/re_grpc_client/src/lib.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/store/re_grpc_client/src/lib.rs b/crates/store/re_grpc_client/src/lib.rs index c3938634126f..1409bd367f4a 100644 --- a/crates/store/re_grpc_client/src/lib.rs +++ b/crates/store/re_grpc_client/src/lib.rs @@ -281,7 +281,6 @@ pub fn store_info_from_catalog_chunk( ) -> Result { let store_id = StoreId::from_string(StoreKind::Recording, recording_id.to_owned()); - println!("TC: {tc:?}"); let (_field, data) = tc .components() .find(|(f, _)| f.name() == CATALOG_APP_ID_FIELD_NAME) From 3a2a8e44c765273ec6fe935a88501c107f50e29d Mon Sep 17 00:00:00 2001 From: Zeljko Mihaljcic <7150613+zehiko@users.noreply.github.com> Date: Fri, 17 Jan 2025 08:11:35 +0100 Subject: [PATCH 07/11] renaming leftovers --- crates/store/re_grpc_client/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/store/re_grpc_client/src/lib.rs b/crates/store/re_grpc_client/src/lib.rs index 1409bd367f4a..e85f5bb31002 100644 --- a/crates/store/re_grpc_client/src/lib.rs +++ b/crates/store/re_grpc_client/src/lib.rs @@ -285,7 +285,7 @@ pub fn store_info_from_catalog_chunk( .components() .find(|(f, _)| f.name() == CATALOG_APP_ID_FIELD_NAME) .ok_or(StreamError::ChunkError(re_chunk::ChunkError::Malformed { - reason: "no {APP_ID_FIELD_NAME} field found".to_owned(), + reason: "no {CATALOG_APP_ID_FIELD_NAME} field 
found".to_owned(), }))?; let app_id = data .downcast_array_ref::() @@ -301,7 +301,7 @@ pub fn store_info_from_catalog_chunk( .components() .find(|(f, _)| f.name() == CATALOG_START_TIME_FIELD_NAME) .ok_or(StreamError::ChunkError(re_chunk::ChunkError::Malformed { - reason: "no {START_TIME_FIELD}} field found".to_owned(), + reason: "no {CATALOG_START_TIME_FIELD_NAME}} field found".to_owned(), }))?; let start_time = data .downcast_array_ref::() From ad2cd56664f6497b57ea4c586814c58bef5753be Mon Sep 17 00:00:00 2001 From: Zeljko Mihaljcic <7150613+zehiko@users.noreply.github.com> Date: Fri, 17 Jan 2025 08:32:51 +0100 Subject: [PATCH 08/11] rebase --- Cargo.lock | 1 - crates/store/re_grpc_client/src/lib.rs | 5 +- crates/utils/re_arrow_util/Cargo.toml | 6 +- crates/viewer/re_viewer/src/app_state.rs | 2 +- crates/viewer/re_viewport/Cargo.toml | 1 - crates/viewer/re_viewport/src/lib.rs | 1 - crates/viewer/re_viewport/src/viewport_ui.rs | 272 +----------------- .../src/auto_layout.rs | 2 +- .../viewer/re_viewport_blueprint/src/lib.rs | 3 +- .../src/viewport_blueprint.rs | 258 +++++++++++++++++ 10 files changed, 274 insertions(+), 277 deletions(-) rename crates/viewer/{re_viewport => re_viewport_blueprint}/src/auto_layout.rs (99%) diff --git a/Cargo.lock b/Cargo.lock index 80e9b3931d56..3f9835ebed37 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7070,7 +7070,6 @@ dependencies = [ "ahash", "egui", "egui_tiles", - "itertools 0.13.0", "nohash-hasher", "rayon", "re_context_menu", diff --git a/crates/store/re_grpc_client/src/lib.rs b/crates/store/re_grpc_client/src/lib.rs index a426499cb40b..be5fa4ba2f99 100644 --- a/crates/store/re_grpc_client/src/lib.rs +++ b/crates/store/re_grpc_client/src/lib.rs @@ -282,12 +282,11 @@ pub fn store_info_from_catalog_chunk( ) -> Result { let store_id = StoreId::from_string(StoreKind::Recording, recording_id.to_owned()); - println!("TC: {tc:?}"); let (_field, data) = tc .components() .find(|(f, _)| f.name() == CATALOG_APP_ID_FIELD_NAME) .ok_or(StreamError::ChunkError(re_chunk::ChunkError::Malformed { - reason: "no {APP_ID_FIELD_NAME} field found".to_owned(), + reason: "no {CATALOG_APP_ID_FIELD_NAME} field found".to_owned(), }))?; let app_id = data .downcast_array_ref::() @@ -303,7 +302,7 @@ pub fn store_info_from_catalog_chunk( .components() .find(|(f, _)| f.name() == CATALOG_START_TIME_FIELD_NAME) .ok_or(StreamError::ChunkError(re_chunk::ChunkError::Malformed { - reason: "no {START_TIME_FIELD}} field found".to_owned(), + reason: "no {CATALOG_START_TIME_FIELD_NAME}} field found".to_owned(), }))?; let start_time = data .downcast_array_ref::() diff --git a/crates/utils/re_arrow_util/Cargo.toml b/crates/utils/re_arrow_util/Cargo.toml index 68a36945c59d..126ecf1d831c 100644 --- a/crates/utils/re_arrow_util/Cargo.toml +++ b/crates/utils/re_arrow_util/Cargo.toml @@ -27,5 +27,9 @@ re_log.workspace = true re_tracing.workspace = true arrow.workspace = true -arrow2.workspace = true +arrow2 = { workspace = true, features = [ + "compute_concatenate", + "compute_filter", + "compute_take", +] } itertools.workspace = true diff --git a/crates/viewer/re_viewer/src/app_state.rs b/crates/viewer/re_viewer/src/app_state.rs index b2dcbb77dd60..7a2d454b3246 100644 --- a/crates/viewer/re_viewer/src/app_state.rs +++ b/crates/viewer/re_viewer/src/app_state.rs @@ -529,7 +529,7 @@ impl AppState { drag_and_drop_manager.payload_cursor_ui(ctx.egui_ctx); // Process deferred layout operations and apply updates back to blueprint: - viewport_ui.save_to_blueprint_store(&ctx, view_class_registry); + 
viewport_ui.save_to_blueprint_store(&ctx); if WATERMARK { ui.ctx().paint_watermark(); diff --git a/crates/viewer/re_viewport/Cargo.toml b/crates/viewer/re_viewport/Cargo.toml index fe80f4c3b184..c097b7b4483d 100644 --- a/crates/viewer/re_viewport/Cargo.toml +++ b/crates/viewer/re_viewport/Cargo.toml @@ -39,6 +39,5 @@ re_viewport_blueprint.workspace = true ahash.workspace = true egui_tiles.workspace = true egui.workspace = true -itertools.workspace = true nohash-hasher.workspace = true rayon.workspace = true diff --git a/crates/viewer/re_viewport/src/lib.rs b/crates/viewer/re_viewport/src/lib.rs index 35eaa13fc3db..4bd26c6db689 100644 --- a/crates/viewer/re_viewport/src/lib.rs +++ b/crates/viewer/re_viewport/src/lib.rs @@ -5,7 +5,6 @@ // TODO(#6330): remove unwrap() #![allow(clippy::unwrap_used)] -mod auto_layout; mod system_execution; mod view_highlights; mod viewport_ui; diff --git a/crates/viewer/re_viewport/src/viewport_ui.rs b/crates/viewer/re_viewport/src/viewport_ui.rs index 4c9035e311dc..13294947ff33 100644 --- a/crates/viewer/re_viewport/src/viewport_ui.rs +++ b/crates/viewer/re_viewport/src/viewport_ui.rs @@ -9,9 +9,8 @@ use re_context_menu::{context_menu_ui_for_item, SelectionUpdateBehavior}; use re_log_types::{EntityPath, ResolvedEntityPathRule, RuleEffect}; use re_ui::{design_tokens, ContextExt as _, DesignTokens, Icon, UiExt as _}; use re_viewer_context::{ - blueprint_id_to_tile_id, icon_for_container_kind, Contents, DragAndDropFeedback, - DragAndDropPayload, Item, PublishedViewInfo, SystemExecutionOutput, ViewClassRegistry, ViewId, - ViewQuery, ViewStates, ViewerContext, + icon_for_container_kind, Contents, DragAndDropFeedback, DragAndDropPayload, Item, + PublishedViewInfo, SystemExecutionOutput, ViewId, ViewQuery, ViewStates, ViewerContext, }; use re_viewport_blueprint::{ create_entity_add_info, ViewBlueprint, ViewportBlueprint, ViewportCommand, @@ -19,17 +18,6 @@ use re_viewport_blueprint::{ use crate::system_execution::{execute_systems_for_all_views, execute_systems_for_view}; -fn tree_simplification_options() -> egui_tiles::SimplificationOptions { - egui_tiles::SimplificationOptions { - prune_empty_tabs: false, - all_panes_must_have_tabs: true, - prune_empty_containers: false, - prune_single_child_tabs: false, - prune_single_child_containers: false, - join_nested_linear_containers: true, - } -} - // ---------------------------------------------------------------------------- /// Defines the UI and layout of the Viewport. @@ -306,258 +294,8 @@ impl ViewportUi { self.blueprint.spawn_heuristic_views(ctx); } - /// Process any deferred [`ViewportCommand`] and then save to blueprint store (if needed). - pub fn save_to_blueprint_store( - self, - ctx: &ViewerContext<'_>, - view_class_registry: &ViewClassRegistry, - ) { - re_tracing::profile_function!(); - - let Self { mut blueprint } = self; - - let commands: Vec = blueprint.deferred_commands.lock().drain(..).collect(); - - if commands.is_empty() { - return; // No changes this frame - no need to save to blueprint store. - } - - let mut run_auto_layout = false; - - for command in commands { - apply_viewport_command(ctx, &mut blueprint, command, &mut run_auto_layout); - } - - if run_auto_layout { - blueprint.tree = - super::auto_layout::tree_from_views(view_class_registry, &blueprint.views); - } - - // Simplify before we save the tree. - // `egui_tiles` also runs a simplifying pass when calling `tree.ui`, but that is too late. 
- // We want the simplified changes saved to the store: - blueprint.tree.simplify(&tree_simplification_options()); - - // TODO(emilk): consider diffing the tree against the state it was in at the start of the frame, - // so that we only save it if it actually changed. - - blueprint.save_tree_as_containers(ctx); - } -} - -fn apply_viewport_command( - ctx: &ViewerContext<'_>, - bp: &mut ViewportBlueprint, - command: ViewportCommand, - run_auto_layout: &mut bool, -) { - re_log::trace!("Processing viewport command: {command:?}"); - match command { - ViewportCommand::SetTree(new_tree) => { - bp.tree = new_tree; - } - - ViewportCommand::AddView { - view, - parent_container, - position_in_parent, - } => { - let view_id = view.id; - - view.save_to_blueprint_store(ctx); - bp.views.insert(view_id, view); - - if bp.auto_layout() { - // No need to add to the tree - we'll create a new tree from scratch instead. - re_log::trace!( - "Running auto-layout after adding a view because auto_layout is turned on" - ); - *run_auto_layout = true; - } else { - // Add the view to the tree: - let parent_id = parent_container.unwrap_or(bp.root_container); - re_log::trace!("Adding view {view_id} to parent {parent_id}"); - let tile_id = bp.tree.tiles.insert_pane(view_id); - let container_tile_id = blueprint_id_to_tile_id(&parent_id); - if let Some(egui_tiles::Tile::Container(container)) = - bp.tree.tiles.get_mut(container_tile_id) - { - re_log::trace!("Inserting new view into root container"); - container.add_child(tile_id); - if let Some(position_in_parent) = position_in_parent { - bp.tree.move_tile_to_container( - tile_id, - container_tile_id, - position_in_parent, - true, - ); - } - } else { - re_log::trace!( - "Parent was not a container (or not found) - will re-run auto-layout" - ); - *run_auto_layout = true; - } - } - } - - ViewportCommand::AddContainer { - container_kind, - parent_container, - } => { - let parent_id = parent_container.unwrap_or(bp.root_container); - - let tile_id = bp - .tree - .tiles - .insert_container(egui_tiles::Container::new(container_kind, vec![])); - - re_log::trace!("Adding container {container_kind:?} to parent {parent_id}"); - - if let Some(egui_tiles::Tile::Container(parent_container)) = - bp.tree.tiles.get_mut(blueprint_id_to_tile_id(&parent_id)) - { - re_log::trace!("Inserting new view into container {parent_id:?}"); - parent_container.add_child(tile_id); - } else { - re_log::trace!("Parent or root was not a container - will re-run auto-layout"); - *run_auto_layout = true; - } - } - - ViewportCommand::SetContainerKind(container_id, container_kind) => { - if let Some(egui_tiles::Tile::Container(container)) = bp - .tree - .tiles - .get_mut(blueprint_id_to_tile_id(&container_id)) - { - re_log::trace!("Mutating container {container_id:?} to {container_kind:?}"); - container.set_kind(container_kind); - } else { - re_log::trace!("No root found - will re-run auto-layout"); - } - } - - ViewportCommand::FocusTab(view_id) => { - let found = bp.tree.make_active(|_, tile| match tile { - egui_tiles::Tile::Pane(this_view_id) => *this_view_id == view_id, - egui_tiles::Tile::Container(_) => false, - }); - re_log::trace!("Found tab to focus on for view ID {view_id}: {found}"); - } - - ViewportCommand::RemoveContents(contents) => { - let tile_id = contents.as_tile_id(); - - for tile in bp.tree.remove_recursively(tile_id) { - re_log::trace!("Removing tile {tile_id:?}"); - match tile { - egui_tiles::Tile::Pane(view_id) => { - re_log::trace!("Removing view {view_id}"); - - // Remove the view from the 
store - if let Some(view) = bp.views.get(&view_id) { - view.clear(ctx); - } - - // If the view was maximized, clean it up - if bp.maximized == Some(view_id) { - bp.set_maximized(None, ctx); - } - - bp.views.remove(&view_id); - } - egui_tiles::Tile::Container(_) => { - // Empty containers (like this one) will be auto-removed by the tree simplification algorithm, - // that will run later because of this tree edit. - } - } - } - - bp.mark_user_interaction(ctx); - - if Some(tile_id) == bp.tree.root { - bp.tree.root = None; - } - } - - ViewportCommand::SimplifyContainer(container_id, options) => { - re_log::trace!("Simplifying tree with options: {options:?}"); - let tile_id = blueprint_id_to_tile_id(&container_id); - bp.tree.simplify_children_of_tile(tile_id, &options); - } - - ViewportCommand::MakeAllChildrenSameSize(container_id) => { - let tile_id = blueprint_id_to_tile_id(&container_id); - if let Some(egui_tiles::Tile::Container(container)) = bp.tree.tiles.get_mut(tile_id) { - match container { - egui_tiles::Container::Tabs(_) => {} - egui_tiles::Container::Linear(linear) => { - linear.shares = Default::default(); - } - egui_tiles::Container::Grid(grid) => { - grid.col_shares = Default::default(); - grid.row_shares = Default::default(); - } - } - } - } - - ViewportCommand::MoveContents { - contents_to_move, - target_container, - target_position_in_container, - } => { - re_log::trace!( - "Moving {contents_to_move:?} to container {target_container:?} at pos \ - {target_position_in_container}" - ); - - // TODO(ab): the `rev()` is better preserve ordering when moving a group of items. There - // remains some ordering (and possibly insertion point error) edge cases when dragging - // multiple item within the same container. This should be addressed by egui_tiles: - // https://github.com/rerun-io/egui_tiles/issues/90 - for contents in contents_to_move.iter().rev() { - let contents_tile_id = contents.as_tile_id(); - let target_container_tile_id = blueprint_id_to_tile_id(&target_container); - - bp.tree.move_tile_to_container( - contents_tile_id, - target_container_tile_id, - target_position_in_container, - true, - ); - } - } - - ViewportCommand::MoveContentsToNewContainer { - contents_to_move, - new_container_kind, - target_container, - target_position_in_container, - } => { - let new_container_tile_id = bp - .tree - .tiles - .insert_container(egui_tiles::Container::new(new_container_kind, vec![])); - - let target_container_tile_id = blueprint_id_to_tile_id(&target_container); - bp.tree.move_tile_to_container( - new_container_tile_id, - target_container_tile_id, - target_position_in_container, - true, // reflow grid if needed - ); - - for (pos, content) in contents_to_move.into_iter().enumerate() { - bp.tree.move_tile_to_container( - content.as_tile_id(), - new_container_tile_id, - pos, - true, // reflow grid if needed - ); - } - } + pub fn save_to_blueprint_store(self, ctx: &ViewerContext<'_>) { + self.blueprint.save_to_blueprint_store(ctx); } } @@ -877,7 +615,7 @@ impl<'a> egui_tiles::Behavior for TilesDelegate<'a, '_> { /// /// These options are applied on every frame by `egui_tiles`. 
fn simplification_options(&self) -> egui_tiles::SimplificationOptions { - tree_simplification_options() + re_viewport_blueprint::tree_simplification_options() } // Callbacks: diff --git a/crates/viewer/re_viewport/src/auto_layout.rs b/crates/viewer/re_viewport_blueprint/src/auto_layout.rs similarity index 99% rename from crates/viewer/re_viewport/src/auto_layout.rs rename to crates/viewer/re_viewport_blueprint/src/auto_layout.rs index 6544544fa246..9aee4e47793d 100644 --- a/crates/viewer/re_viewport/src/auto_layout.rs +++ b/crates/viewer/re_viewport_blueprint/src/auto_layout.rs @@ -9,7 +9,7 @@ use itertools::Itertools as _; use re_types::ViewClassIdentifier; use re_viewer_context::ViewId; -use re_viewport_blueprint::ViewBlueprint; +use crate::ViewBlueprint; #[derive(Clone, Debug)] struct SpaceMakeInfo { diff --git a/crates/viewer/re_viewport_blueprint/src/lib.rs b/crates/viewer/re_viewport_blueprint/src/lib.rs index 3068e8c7fb10..442afe92eedd 100644 --- a/crates/viewer/re_viewport_blueprint/src/lib.rs +++ b/crates/viewer/re_viewport_blueprint/src/lib.rs @@ -2,6 +2,7 @@ //! //! This crate provides blueprint (i.e. description) for how to render the viewport. +mod auto_layout; mod container; mod entity_add_info; pub mod ui; @@ -17,7 +18,7 @@ use re_viewer_context::ViewerContext; pub use view::ViewBlueprint; pub use view_contents::ViewContents; pub use view_properties::{entity_path_for_view_property, ViewProperty, ViewPropertyQueryError}; -pub use viewport_blueprint::ViewportBlueprint; +pub use viewport_blueprint::{tree_simplification_options, ViewportBlueprint}; pub use viewport_command::ViewportCommand; /// The entity path of the viewport blueprint in the blueprint store. diff --git a/crates/viewer/re_viewport_blueprint/src/viewport_blueprint.rs b/crates/viewer/re_viewport_blueprint/src/viewport_blueprint.rs index b2594f754134..86a0f8e38dd5 100644 --- a/crates/viewer/re_viewport_blueprint/src/viewport_blueprint.rs +++ b/crates/viewer/re_viewport_blueprint/src/viewport_blueprint.rs @@ -888,6 +888,264 @@ impl ViewportBlueprint { ctx.save_empty_blueprint_component::(&VIEWPORT_PATH.into()); } } + + /// Process any deferred [`ViewportCommand`] and then save to blueprint store (if needed). + pub fn save_to_blueprint_store(mut self, ctx: &ViewerContext<'_>) { + re_tracing::profile_function!(); + + let commands: Vec = self.deferred_commands.lock().drain(..).collect(); + + if commands.is_empty() { + return; // No changes this frame - no need to save to blueprint store. + } + + let mut run_auto_layout = false; + + for command in commands { + apply_viewport_command(ctx, &mut self, command, &mut run_auto_layout); + } + + if run_auto_layout { + self.tree = super::auto_layout::tree_from_views(ctx.view_class_registry, &self.views); + } + + // Simplify before we save the tree. + // `egui_tiles` also runs a simplifying pass when calling `tree.ui`, but that is too late. + // We want the simplified changes saved to the store: + self.tree.simplify(&tree_simplification_options()); + + // TODO(emilk): consider diffing the tree against the state it was in at the start of the frame, + // so that we only save it if it actually changed. 
+ + self.save_tree_as_containers(ctx); + } +} + +pub fn tree_simplification_options() -> egui_tiles::SimplificationOptions { + egui_tiles::SimplificationOptions { + prune_empty_tabs: false, + all_panes_must_have_tabs: true, + prune_empty_containers: false, + prune_single_child_tabs: false, + prune_single_child_containers: false, + join_nested_linear_containers: true, + } +} + +fn apply_viewport_command( + ctx: &ViewerContext<'_>, + bp: &mut ViewportBlueprint, + command: ViewportCommand, + run_auto_layout: &mut bool, +) { + re_log::trace!("Processing viewport command: {command:?}"); + match command { + ViewportCommand::SetTree(new_tree) => { + bp.tree = new_tree; + } + + ViewportCommand::AddView { + view, + parent_container, + position_in_parent, + } => { + let view_id = view.id; + + view.save_to_blueprint_store(ctx); + bp.views.insert(view_id, view); + + if bp.auto_layout() { + // No need to add to the tree - we'll create a new tree from scratch instead. + re_log::trace!( + "Running auto-layout after adding a view because auto_layout is turned on" + ); + *run_auto_layout = true; + } else { + // Add the view to the tree: + let parent_id = parent_container.unwrap_or(bp.root_container); + re_log::trace!("Adding view {view_id} to parent {parent_id}"); + let tile_id = bp.tree.tiles.insert_pane(view_id); + let container_tile_id = blueprint_id_to_tile_id(&parent_id); + if let Some(egui_tiles::Tile::Container(container)) = + bp.tree.tiles.get_mut(container_tile_id) + { + re_log::trace!("Inserting new view into root container"); + container.add_child(tile_id); + if let Some(position_in_parent) = position_in_parent { + bp.tree.move_tile_to_container( + tile_id, + container_tile_id, + position_in_parent, + true, + ); + } + } else { + re_log::trace!( + "Parent was not a container (or not found) - will re-run auto-layout" + ); + *run_auto_layout = true; + } + } + } + + ViewportCommand::AddContainer { + container_kind, + parent_container, + } => { + let parent_id = parent_container.unwrap_or(bp.root_container); + + let tile_id = bp + .tree + .tiles + .insert_container(egui_tiles::Container::new(container_kind, vec![])); + + re_log::trace!("Adding container {container_kind:?} to parent {parent_id}"); + + if let Some(egui_tiles::Tile::Container(parent_container)) = + bp.tree.tiles.get_mut(blueprint_id_to_tile_id(&parent_id)) + { + re_log::trace!("Inserting new view into container {parent_id:?}"); + parent_container.add_child(tile_id); + } else { + re_log::trace!("Parent or root was not a container - will re-run auto-layout"); + *run_auto_layout = true; + } + } + + ViewportCommand::SetContainerKind(container_id, container_kind) => { + if let Some(egui_tiles::Tile::Container(container)) = bp + .tree + .tiles + .get_mut(blueprint_id_to_tile_id(&container_id)) + { + re_log::trace!("Mutating container {container_id:?} to {container_kind:?}"); + container.set_kind(container_kind); + } else { + re_log::trace!("No root found - will re-run auto-layout"); + } + } + + ViewportCommand::FocusTab(view_id) => { + let found = bp.tree.make_active(|_, tile| match tile { + egui_tiles::Tile::Pane(this_view_id) => *this_view_id == view_id, + egui_tiles::Tile::Container(_) => false, + }); + re_log::trace!("Found tab to focus on for view ID {view_id}: {found}"); + } + + ViewportCommand::RemoveContents(contents) => { + let tile_id = contents.as_tile_id(); + + for tile in bp.tree.remove_recursively(tile_id) { + re_log::trace!("Removing tile {tile_id:?}"); + match tile { + egui_tiles::Tile::Pane(view_id) => { + 
re_log::trace!("Removing view {view_id}"); + + // Remove the view from the store + if let Some(view) = bp.views.get(&view_id) { + view.clear(ctx); + } + + // If the view was maximized, clean it up + if bp.maximized == Some(view_id) { + bp.set_maximized(None, ctx); + } + + bp.views.remove(&view_id); + } + egui_tiles::Tile::Container(_) => { + // Empty containers (like this one) will be auto-removed by the tree simplification algorithm, + // that will run later because of this tree edit. + } + } + } + + bp.mark_user_interaction(ctx); + + if Some(tile_id) == bp.tree.root { + bp.tree.root = None; + } + } + + ViewportCommand::SimplifyContainer(container_id, options) => { + re_log::trace!("Simplifying tree with options: {options:?}"); + let tile_id = blueprint_id_to_tile_id(&container_id); + bp.tree.simplify_children_of_tile(tile_id, &options); + } + + ViewportCommand::MakeAllChildrenSameSize(container_id) => { + let tile_id = blueprint_id_to_tile_id(&container_id); + if let Some(egui_tiles::Tile::Container(container)) = bp.tree.tiles.get_mut(tile_id) { + match container { + egui_tiles::Container::Tabs(_) => {} + egui_tiles::Container::Linear(linear) => { + linear.shares = Default::default(); + } + egui_tiles::Container::Grid(grid) => { + grid.col_shares = Default::default(); + grid.row_shares = Default::default(); + } + } + } + } + + ViewportCommand::MoveContents { + contents_to_move, + target_container, + target_position_in_container, + } => { + re_log::trace!( + "Moving {contents_to_move:?} to container {target_container:?} at pos \ + {target_position_in_container}" + ); + + // TODO(ab): the `rev()` is better preserve ordering when moving a group of items. There + // remains some ordering (and possibly insertion point error) edge cases when dragging + // multiple item within the same container. 
This should be addressed by egui_tiles: + // https://github.com/rerun-io/egui_tiles/issues/90 + for contents in contents_to_move.iter().rev() { + let contents_tile_id = contents.as_tile_id(); + let target_container_tile_id = blueprint_id_to_tile_id(&target_container); + + bp.tree.move_tile_to_container( + contents_tile_id, + target_container_tile_id, + target_position_in_container, + true, + ); + } + } + + ViewportCommand::MoveContentsToNewContainer { + contents_to_move, + new_container_kind, + target_container, + target_position_in_container, + } => { + let new_container_tile_id = bp + .tree + .tiles + .insert_container(egui_tiles::Container::new(new_container_kind, vec![])); + + let target_container_tile_id = blueprint_id_to_tile_id(&target_container); + bp.tree.move_tile_to_container( + new_container_tile_id, + target_container_tile_id, + target_position_in_container, + true, // reflow grid if needed + ); + + for (pos, content) in contents_to_move.into_iter().enumerate() { + bp.tree.move_tile_to_container( + content.as_tile_id(), + new_container_tile_id, + pos, + true, // reflow grid if needed + ); + } + } + } } fn build_tree_from_views_and_containers<'a>( From 5a27933085d7a77e86eb7e881cb31391610fba37 Mon Sep 17 00:00:00 2001 From: Zeljko Mihaljcic <7150613+zehiko@users.noreply.github.com> Date: Fri, 17 Jan 2025 08:37:30 +0100 Subject: [PATCH 09/11] remove leftover printlns --- crates/store/re_log_encoding/src/decoder/mod.rs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/crates/store/re_log_encoding/src/decoder/mod.rs b/crates/store/re_log_encoding/src/decoder/mod.rs index c170d2d756a6..80a076bab187 100644 --- a/crates/store/re_log_encoding/src/decoder/mod.rs +++ b/crates/store/re_log_encoding/src/decoder/mod.rs @@ -179,7 +179,6 @@ impl Decoder { /// See also: /// * [`Decoder::new_concatenated`] pub fn new(version_policy: VersionPolicy, mut read: R) -> Result { - println!("DECODER NEW"); re_tracing::profile_function!(); let mut data = [0_u8; FileHeader::SIZE]; @@ -217,7 +216,6 @@ impl Decoder { version_policy: VersionPolicy, mut read: std::io::BufReader, ) -> Result { - println!("STUPID LOG WORKS"); re_tracing::profile_function!(); let mut data = [0_u8; FileHeader::SIZE]; @@ -279,7 +277,6 @@ impl Iterator for Decoder { re_tracing::profile_function!(); if self.peek_file_header() { - println!("CONCAT"); // We've found another file header in the middle of the stream, it's time to switch // gears and start over on this new file. @@ -386,7 +383,6 @@ impl Iterator for Decoder { re_log::debug!( "Reached end of stream, but it seems we have a concatenated file, continuing" ); - println!("CONCAT STREAM"); return self.next(); } @@ -533,8 +529,6 @@ mod tests { ]; for options in options { - println!("{options:?}"); - let mut data = vec![]; // write "2 files" i.e. 
2 streams that end with end-of-stream marker From c341c42069697691dd797df7d6e04f14c0c8cde3 Mon Sep 17 00:00:00 2001 From: Clement Rey Date: Fri, 17 Jan 2025 11:05:17 +0100 Subject: [PATCH 10/11] breathing room pls --- crates/store/re_log_encoding/src/decoder/streaming.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/crates/store/re_log_encoding/src/decoder/streaming.rs b/crates/store/re_log_encoding/src/decoder/streaming.rs index b4bea2df0c7f..873e102e00b3 100644 --- a/crates/store/re_log_encoding/src/decoder/streaming.rs +++ b/crates/store/re_log_encoding/src/decoder/streaming.rs @@ -91,13 +91,16 @@ impl Stream for StreamingDecoder { return std::task::Poll::Ready(None); } } + std::task::Poll::Ready(Ok(buf)) => { unprocessed_bytes.extend_from_slice(buf); buf_length = buf.len(); } + std::task::Poll::Ready(Err(err)) => { return std::task::Poll::Ready(Some(Err(DecodeError::Read(err)))); } + std::task::Poll::Pending => return std::task::Poll::Pending, }; @@ -197,9 +200,11 @@ impl Stream for StreamingDecoder { } } } + super::MessageHeader::EndOfStream => return std::task::Poll::Ready(None), } } + crate::Serializer::Protobuf => { let header_size = std::mem::size_of::(); if unprocessed_bytes.len() < header_size { From bfc44336fabd5926a5128a8cf16285b17bbe2928 Mon Sep 17 00:00:00 2001 From: Zeljko Mihaljcic <7150613+zehiko@users.noreply.github.com> Date: Fri, 17 Jan 2025 15:45:38 +0100 Subject: [PATCH 11/11] doc comment update Co-authored-by: Clement Rey --- crates/store/re_log_encoding/src/decoder/streaming.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/store/re_log_encoding/src/decoder/streaming.rs b/crates/store/re_log_encoding/src/decoder/streaming.rs index 873e102e00b3..aca4093d2bac 100644 --- a/crates/store/re_log_encoding/src/decoder/streaming.rs +++ b/crates/store/re_log_encoding/src/decoder/streaming.rs @@ -48,6 +48,7 @@ impl StreamingDecoder { }) } + /// Returns true if `data` can be successfully decoded into a `FileHeader`. fn peek_file_header(data: &[u8]) -> bool { let mut read = std::io::Cursor::new(data); FileHeader::decode(&mut read).is_ok()
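A minimal usage sketch of the async `StreamingDecoder` introduced by this series. It is hedged, since only part of the API is visible above: the assumptions are that the constructor is an async `StreamingDecoder::new(VersionPolicy, reader)` returning `Result<Self, DecodeError>`, that the type implements `Stream<Item = Result<LogMsg, DecodeError>>`, and that `VersionPolicy::Warn` is a valid variant. Tokio's `fs` feature (this patch only enables `io-util`) is assumed for the file I/O; any `AsyncBufRead + Unpin` source would work the same way.

use re_log_encoding::decoder::streaming::StreamingDecoder;
use re_log_encoding::VersionPolicy;
use re_log_types::LogMsg;
use tokio::io::BufReader;
use tokio_stream::StreamExt as _;

// Sketch only: the constructor shape and stream item type are assumptions based
// on the blocking `Decoder` API; see `streaming.rs` above for the actual surface.
async fn read_rrd_stream(path: &str) -> Result<(), Box<dyn std::error::Error>> {
    // Assumes tokio's `fs` feature; a network stream or any other
    // `AsyncBufRead + Unpin` reader could be used instead.
    let file = tokio::fs::File::open(path).await?;
    let reader = BufReader::new(file);

    // Reads and validates the file header, then yields messages as soon as
    // enough bytes have arrived from the underlying reader.
    let mut decoder = StreamingDecoder::new(VersionPolicy::Warn, reader).await?;

    while let Some(msg) = decoder.next().await {
        let _msg: LogMsg = msg?;
        // …handle the decoded message here…
    }

    Ok(())
}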