Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for asynchronous decoding of rrd stream #8705

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
Open
11 changes: 7 additions & 4 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1098,9 +1098,9 @@ checksum = "8f1fe948ff07f4bd06c30984e69f5b4899c516a3ef74f34df92a2df2ab535495"

[[package]]
name = "bytes"
version = "1.8.0"
version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ac0150caa2ae65ca5bd83f25c7de183dea78d4d366469f148435e2acfbad0da"
checksum = "325918d6fe32f23b19878fe4b34794ae41fc19ddbe53b10571a4874d44ffd39b"

[[package]]
name = "cacache"
Expand Down Expand Up @@ -3900,7 +3900,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4979f22fdb869068da03c9f7528f8297c6fd2606bc3a4affe42e6a823fdb8da4"
dependencies = [
"cfg-if",
"windows-targets 0.48.5",
"windows-targets 0.52.6",
]

[[package]]
Expand Down Expand Up @@ -5218,7 +5218,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c1318b19085f08681016926435853bbf7858f9c082d0999b80550ff5d9abe15"
dependencies = [
"bytes",
"heck 0.4.1",
"heck 0.5.0",
"itertools 0.13.0",
"log",
"multimap",
Expand Down Expand Up @@ -6095,6 +6095,7 @@ name = "re_log_encoding"
version = "0.22.0-alpha.1+dev"
dependencies = [
"arrow",
"bytes",
"criterion",
"ehttp",
"js-sys",
Expand All @@ -6114,6 +6115,8 @@ dependencies = [
"serde_test",
"similar-asserts",
"thiserror 1.0.65",
"tokio",
"tokio-stream",
"wasm-bindgen",
"wasm-bindgen-futures",
"web-sys",
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ bit-vec = "0.8"
bitflags = { version = "2.4", features = ["bytemuck"] }
blackbox = "0.2.0"
bytemuck = { version = "1.18", features = ["extern_crate_alloc"] }
bytes = "1.0"
camino = "1.1"
cargo_metadata = "0.18"
cargo-run-wasm = "0.3.2"
Expand Down
25 changes: 16 additions & 9 deletions crates/store/re_grpc_client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ use re_protos::{
common::v0::RecordingId,
remote_store::v0::{
storage_node_client::StorageNodeClient, CatalogFilter, FetchRecordingRequest,
QueryCatalogRequest,
QueryCatalogRequest, CATALOG_APP_ID_FIELD_NAME, CATALOG_ID_FIELD_NAME,
CATALOG_START_TIME_FIELD_NAME,
},
};
use re_types::{
Expand Down Expand Up @@ -283,27 +284,33 @@ pub fn store_info_from_catalog_chunk(

let (_field, data) = tc
.components()
.find(|(f, _)| f.name() == "application_id")
.find(|(f, _)| f.name() == CATALOG_APP_ID_FIELD_NAME)
.ok_or(StreamError::ChunkError(re_chunk::ChunkError::Malformed {
reason: "no application_id field found".to_owned(),
reason: format!("no {CATALOG_APP_ID_FIELD_NAME} field found"),
}))?;
let app_id = data
.downcast_array_ref::<arrow::array::StringArray>()
.ok_or(StreamError::ChunkError(re_chunk::ChunkError::Malformed {
reason: format!("application_id must be a utf8 array: {:?}", tc.schema_ref()),
reason: format!(
"{CATALOG_APP_ID_FIELD_NAME} must be a utf8 array: {:?}",
tc.schema_ref()
),
}))?
.value(0);

let (_field, data) = tc
.components()
.find(|(f, _)| f.name() == "start_time")
.find(|(f, _)| f.name() == CATALOG_START_TIME_FIELD_NAME)
.ok_or(StreamError::ChunkError(re_chunk::ChunkError::Malformed {
reason: "no start_time field found".to_owned(),
reason: format!("no {CATALOG_START_TIME_FIELD_NAME} field found"),
}))?;
let start_time = data
.downcast_array_ref::<arrow::array::Int64Array>()
.downcast_array_ref::<arrow::array::TimestampNanosecondArray>()
.ok_or(StreamError::ChunkError(re_chunk::ChunkError::Malformed {
reason: format!("start_time must be an int64 array: {:?}", tc.schema_ref()),
reason: format!(
"{CATALOG_START_TIME_FIELD_NAME} must be a Timestamp array: {:?}",
tc.schema_ref()
),
}))?
.value(0);

Expand Down Expand Up @@ -485,7 +492,7 @@ async fn stream_catalog_async(
)))?;

let recording_uri_arrays: Vec<ArrowArrayRef> = chunk
.iter_slices::<String>("id".into())
.iter_slices::<String>(CATALOG_ID_FIELD_NAME.into())
.map(|id| {
let rec_id = &id[0]; // each component batch is of length 1 i.e. single 'id' value

Expand Down
12 changes: 11 additions & 1 deletion crates/store/re_log_encoding/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,14 @@ all-features = true
default = []

## Enable loading data from an .rrd file.
decoder = ["dep:rmp-serde", "dep:lz4_flex", "re_log_types/serde"]
decoder = [
"dep:rmp-serde",
"dep:lz4_flex",
"re_log_types/serde",
"dep:tokio",
"dep:tokio-stream",
"dep:bytes",
]
Comment on lines +26 to +33
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please sort this 💀

Suggested change
decoder = [
"dep:rmp-serde",
"dep:lz4_flex",
"re_log_types/serde",
"dep:tokio",
"dep:tokio-stream",
"dep:bytes",
]
decoder = [
"re_log_types/serde",
"dep:bytes",
"dep:lz4_flex",
"dep:rmp-serde",
"dep:tokio",
"dep:tokio-stream",
]


## Enable encoding of log messages to an .rrd file/stream.
encoder = ["dep:rmp-serde", "dep:lz4_flex", "re_log_types/serde"]
Expand Down Expand Up @@ -57,9 +64,12 @@ parking_lot.workspace = true
thiserror.workspace = true

# Optional external dependencies:
bytes = { workspace = true, optional = true }
ehttp = { workspace = true, optional = true, features = ["streaming"] }
lz4_flex = { workspace = true, optional = true }
rmp-serde = { workspace = true, optional = true }
tokio = { workspace = true, optional = true, features = ["io-util"] }
tokio-stream = { workspace = true, optional = true }
web-time = { workspace = true, optional = true }

# Web dependencies:
Expand Down
22 changes: 14 additions & 8 deletions crates/store/re_log_encoding/src/codec/file/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,29 @@ use re_log_types::LogMsg;
use re_protos::missing_field;

pub(crate) fn decode(data: &mut impl std::io::Read) -> Result<(u64, Option<LogMsg>), DecodeError> {
use re_protos::external::prost::Message;
use re_protos::log_msg::v0::{ArrowMsg, BlueprintActivationCommand, Encoding, SetStoreInfo};

let mut read_bytes = 0u64;
let header = MessageHeader::decode(data)?;
read_bytes += std::mem::size_of::<MessageHeader>() as u64 + header.len;

let mut buf = vec![0; header.len as usize];
data.read_exact(&mut buf[..])?;

let msg = match header.kind {
let msg = decode_bytes(header.kind, &buf)?;

Ok((read_bytes, msg))
}

pub fn decode_bytes(message_kind: MessageKind, buf: &[u8]) -> Result<Option<LogMsg>, DecodeError> {
use re_protos::external::prost::Message;
use re_protos::log_msg::v0::{ArrowMsg, BlueprintActivationCommand, Encoding, SetStoreInfo};

let msg = match message_kind {
MessageKind::SetStoreInfo => {
let set_store_info = SetStoreInfo::decode(&buf[..])?;
let set_store_info = SetStoreInfo::decode(buf)?;
Some(LogMsg::SetStoreInfo(set_store_info.try_into()?))
}
MessageKind::ArrowMsg => {
let arrow_msg = ArrowMsg::decode(&buf[..])?;
let arrow_msg = ArrowMsg::decode(buf)?;
if arrow_msg.encoding() != Encoding::ArrowIpc {
return Err(DecodeError::Codec(CodecError::UnsupportedEncoding));
}
Expand All @@ -43,13 +49,13 @@ pub(crate) fn decode(data: &mut impl std::io::Read) -> Result<(u64, Option<LogMs
Some(LogMsg::ArrowMsg(store_id, chunk.to_arrow_msg()?))
}
MessageKind::BlueprintActivationCommand => {
let blueprint_activation_command = BlueprintActivationCommand::decode(&buf[..])?;
let blueprint_activation_command = BlueprintActivationCommand::decode(buf)?;
Some(LogMsg::BlueprintActivationCommand(
blueprint_activation_command.try_into()?,
))
}
MessageKind::End => None,
};

Ok((read_bytes, msg))
Ok(msg)
}
5 changes: 5 additions & 0 deletions crates/store/re_log_encoding/src/codec/file/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ impl MessageHeader {
let mut buf = [0; std::mem::size_of::<Self>()];
data.read_exact(&mut buf)?;

Self::from_bytes(&buf)
}

#[cfg(feature = "decoder")]
pub fn from_bytes(buf: &[u8]) -> Result<Self, crate::decoder::DecodeError> {
Copy link
Member

@teh-cmc teh-cmc Jan 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like the fact that this is a public method that takes a slice of bytes that must respect a bunch of undocumented invariants (e.g. this will take down the entire app if data.len() < 16).

These invariants should be A) documented in the docstring and B) checked for on entry so nice error can be returned if they are violated.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree. Also, thinking about this, I don't think this should be public: `Decoder` (and the new `StreamingDecoder`) should be part of the `file` module (which should probably be renamed to `rrd`). If no one objects, I will do that as a follow-up, so as not to make this PR too big.

Will document and add the checks regardless.

#[allow(clippy::unwrap_used)] // cannot fail
let kind = u64::from_le_bytes(buf[0..8].try_into().unwrap());
let kind = match kind {
Expand Down
8 changes: 4 additions & 4 deletions crates/store/re_log_encoding/src/decoder/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
//! Decoding [`LogMsg`]:es from `.rrd` files/streams.

pub mod stream;
#[cfg(feature = "decoder")]
pub mod streaming;

use std::io::BufRead as _;
use std::io::Read;
Expand Down Expand Up @@ -412,14 +414,14 @@ mod tests {
};

// TODO(#3741): remove this once we are all in on arrow-rs
fn strip_arrow_extensions_from_log_messages(log_msg: Vec<LogMsg>) -> Vec<LogMsg> {
pub fn strip_arrow_extensions_from_log_messages(log_msg: Vec<LogMsg>) -> Vec<LogMsg> {
log_msg
.into_iter()
.map(LogMsg::strip_arrow_extension_types)
.collect()
}

fn fake_log_messages() -> Vec<LogMsg> {
pub fn fake_log_messages() -> Vec<LogMsg> {
let store_id = StoreId::random(StoreKind::Blueprint);

let arrow_msg = re_chunk::Chunk::builder("test_entity".into())
Expand Down Expand Up @@ -527,8 +529,6 @@ mod tests {
];

for options in options {
println!("{options:?}");

let mut data = vec![];

// write "2 files" i.e. 2 streams that end with end-of-stream marker
Expand Down
Loading
Loading