Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for asynchronous decoding of rrd stream #8705

Merged
merged 16 commits into from
Jan 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1098,9 +1098,9 @@ checksum = "8f1fe948ff07f4bd06c30984e69f5b4899c516a3ef74f34df92a2df2ab535495"

[[package]]
name = "bytes"
version = "1.8.0"
version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ac0150caa2ae65ca5bd83f25c7de183dea78d4d366469f148435e2acfbad0da"
checksum = "325918d6fe32f23b19878fe4b34794ae41fc19ddbe53b10571a4874d44ffd39b"

[[package]]
name = "cacache"
Expand Down Expand Up @@ -3900,7 +3900,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4979f22fdb869068da03c9f7528f8297c6fd2606bc3a4affe42e6a823fdb8da4"
dependencies = [
"cfg-if",
"windows-targets 0.48.5",
"windows-targets 0.52.6",
]

[[package]]
Expand Down Expand Up @@ -5218,7 +5218,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c1318b19085f08681016926435853bbf7858f9c082d0999b80550ff5d9abe15"
dependencies = [
"bytes",
"heck 0.4.1",
"heck 0.5.0",
"itertools 0.13.0",
"log",
"multimap",
Expand Down Expand Up @@ -6092,6 +6092,7 @@ name = "re_log_encoding"
version = "0.22.0-alpha.1+dev"
dependencies = [
"arrow",
"bytes",
"criterion",
"ehttp",
"js-sys",
Expand All @@ -6111,6 +6112,8 @@ dependencies = [
"serde_test",
"similar-asserts",
"thiserror 1.0.65",
"tokio",
"tokio-stream",
"wasm-bindgen",
"wasm-bindgen-futures",
"web-sys",
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ bit-vec = "0.8"
bitflags = { version = "2.4", features = ["bytemuck"] }
blackbox = "0.2.0"
bytemuck = { version = "1.18", features = ["extern_crate_alloc"] }
bytes = "1.0"
camino = "1.1"
cargo_metadata = "0.18"
cargo-run-wasm = "0.3.2"
Expand Down
12 changes: 11 additions & 1 deletion crates/store/re_log_encoding/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,14 @@ all-features = true
default = []

## Enable loading data from an .rrd file.
decoder = ["dep:rmp-serde", "dep:lz4_flex", "re_log_types/serde"]
decoder = [
"re_log_types/serde",
"dep:bytes",
"dep:lz4_flex",
"dep:rmp-serde",
"dep:tokio",
"dep:tokio-stream",
]

## Enable encoding of log messages to an .rrd file/stream.
encoder = ["dep:rmp-serde", "dep:lz4_flex", "re_log_types/serde"]
Expand Down Expand Up @@ -57,9 +64,12 @@ parking_lot.workspace = true
thiserror.workspace = true

# Optional external dependencies:
bytes = { workspace = true, optional = true }
ehttp = { workspace = true, optional = true, features = ["streaming"] }
lz4_flex = { workspace = true, optional = true }
rmp-serde = { workspace = true, optional = true }
tokio = { workspace = true, optional = true, features = ["io-util"] }
tokio-stream = { workspace = true, optional = true }
web-time = { workspace = true, optional = true }

# Web dependencies:
Expand Down
22 changes: 14 additions & 8 deletions crates/store/re_log_encoding/src/codec/file/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,29 @@ use re_log_types::LogMsg;
use re_protos::missing_field;

pub(crate) fn decode(data: &mut impl std::io::Read) -> Result<(u64, Option<LogMsg>), DecodeError> {
use re_protos::external::prost::Message;
use re_protos::log_msg::v0::{ArrowMsg, BlueprintActivationCommand, Encoding, SetStoreInfo};

let mut read_bytes = 0u64;
let header = MessageHeader::decode(data)?;
read_bytes += std::mem::size_of::<MessageHeader>() as u64 + header.len;

let mut buf = vec![0; header.len as usize];
data.read_exact(&mut buf[..])?;

let msg = match header.kind {
let msg = decode_bytes(header.kind, &buf)?;

Ok((read_bytes, msg))
}

pub fn decode_bytes(message_kind: MessageKind, buf: &[u8]) -> Result<Option<LogMsg>, DecodeError> {
use re_protos::external::prost::Message;
use re_protos::log_msg::v0::{ArrowMsg, BlueprintActivationCommand, Encoding, SetStoreInfo};

let msg = match message_kind {
MessageKind::SetStoreInfo => {
let set_store_info = SetStoreInfo::decode(&buf[..])?;
let set_store_info = SetStoreInfo::decode(buf)?;
Some(LogMsg::SetStoreInfo(set_store_info.try_into()?))
}
MessageKind::ArrowMsg => {
let arrow_msg = ArrowMsg::decode(&buf[..])?;
let arrow_msg = ArrowMsg::decode(buf)?;
if arrow_msg.encoding() != Encoding::ArrowIpc {
return Err(DecodeError::Codec(CodecError::UnsupportedEncoding));
}
Expand All @@ -43,13 +49,13 @@ pub(crate) fn decode(data: &mut impl std::io::Read) -> Result<(u64, Option<LogMs
Some(LogMsg::ArrowMsg(store_id, chunk.to_arrow_msg()?))
}
MessageKind::BlueprintActivationCommand => {
let blueprint_activation_command = BlueprintActivationCommand::decode(&buf[..])?;
let blueprint_activation_command = BlueprintActivationCommand::decode(buf)?;
Some(LogMsg::BlueprintActivationCommand(
blueprint_activation_command.try_into()?,
))
}
MessageKind::End => None,
};

Ok((read_bytes, msg))
Ok(msg)
}
15 changes: 15 additions & 0 deletions crates/store/re_log_encoding/src/codec/file/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,21 @@ impl MessageHeader {
let mut buf = [0; std::mem::size_of::<Self>()];
data.read_exact(&mut buf)?;

Self::from_bytes(&buf)
}

/// Decode a message header from a byte buffer. Input buffer must be exactly 16 bytes long.
/// TODO(zehiko): revisit whether this should be public; we need to shuffle things around first, see #8726.
#[cfg(feature = "decoder")]
pub fn from_bytes(buf: &[u8]) -> Result<Self, crate::decoder::DecodeError> {
zehiko marked this conversation as resolved.
Show resolved Hide resolved
if buf.len() != 16 {
return Err(crate::decoder::DecodeError::Codec(
crate::codec::CodecError::HeaderDecoding(std::io::Error::new(
std::io::ErrorKind::InvalidData,
"invalid header length",
)),
));
}
#[allow(clippy::unwrap_used)] // cannot fail
let kind = u64::from_le_bytes(buf[0..8].try_into().unwrap());
let kind = match kind {
Expand Down
8 changes: 4 additions & 4 deletions crates/store/re_log_encoding/src/decoder/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
//! Decoding [`LogMsg`]:es from `.rrd` files/streams.

pub mod stream;
#[cfg(feature = "decoder")]
pub mod streaming;

use std::io::BufRead as _;
use std::io::Read;
Expand Down Expand Up @@ -412,14 +414,14 @@ mod tests {
};

// TODO(#3741): remove this once we are all in on arrow-rs
fn strip_arrow_extensions_from_log_messages(log_msg: Vec<LogMsg>) -> Vec<LogMsg> {
pub fn strip_arrow_extensions_from_log_messages(log_msg: Vec<LogMsg>) -> Vec<LogMsg> {
log_msg
.into_iter()
.map(LogMsg::strip_arrow_extension_types)
.collect()
}

fn fake_log_messages() -> Vec<LogMsg> {
pub fn fake_log_messages() -> Vec<LogMsg> {
let store_id = StoreId::random(StoreKind::Blueprint);

let arrow_msg = re_chunk::Chunk::builder("test_entity".into())
Expand Down Expand Up @@ -527,8 +529,6 @@ mod tests {
];

for options in options {
println!("{options:?}");

let mut data = vec![];

// write "2 files" i.e. 2 streams that end with end-of-stream marker
Expand Down
Loading
Loading