Encode LogMsg using protobuf #8347

Merged
merged 53 commits on Dec 12, 2024
Changes from 27 commits

Commits (53)
bdb22ed
fix typo
jprochazk Nov 13, 2024
8b51ba8
temp
jprochazk Nov 26, 2024
9b4a9a3
wip
jprochazk Dec 6, 2024
317e9c0
Merge branch 'main' into jan/recording-protobuf
jprochazk Dec 6, 2024
8209db4
fix after merge
jprochazk Dec 6, 2024
34dcd4c
remove unused dep
jprochazk Dec 6, 2024
ba4cfbb
exclude `re_grpc_client/address` links
jprochazk Dec 6, 2024
691fb46
cargo fmt
jprochazk Dec 6, 2024
a2e58fb
fix lints
jprochazk Dec 6, 2024
f22cb03
rm dead comment
jprochazk Dec 6, 2024
b76cf9c
add todo
jprochazk Dec 6, 2024
f91737b
gate behind feature
jprochazk Dec 6, 2024
7a720f8
fix check
jprochazk Dec 6, 2024
a2d7ea4
fix more lints
jprochazk Dec 6, 2024
4c18bda
Merge branch 'main' into jan/recording-protobuf
jprochazk Dec 10, 2024
0d3ec40
update lockfile
jprochazk Dec 10, 2024
3d405ff
Merge branch 'main' into jan/recording-protobuf
jprochazk Dec 10, 2024
5d9051b
rename `OUTPUT_V0_RUST`
jprochazk Dec 10, 2024
8497713
remove comments
jprochazk Dec 10, 2024
187bcb2
rm dead code
jprochazk Dec 10, 2024
8b8be71
docs
jprochazk Dec 10, 2024
76c973c
make `PythonVersion` less bad
jprochazk Dec 10, 2024
c367fc3
update module structure
jprochazk Dec 11, 2024
44f7017
typo
jprochazk Dec 11, 2024
ce14bf7
undo header bump
jprochazk Dec 11, 2024
ad4fc56
uncap max decode size
jprochazk Dec 11, 2024
4054285
Merge branch 'main' into jan/recording-protobuf
jprochazk Dec 11, 2024
4c2b644
fix lints
jprochazk Dec 11, 2024
a615187
add max decoding message size issue link
jprochazk Dec 11, 2024
77f4142
add todo for arrow ipc compression
jprochazk Dec 11, 2024
d79f317
rename EncodingOptions constants
jprochazk Dec 11, 2024
0b56c95
more thorough testing
jprochazk Dec 11, 2024
b1d8ffa
add conversion unit tests
jprochazk Dec 11, 2024
3c9a682
fix compile error
jprochazk Dec 11, 2024
21258a0
remove temp
jprochazk Dec 11, 2024
04efcf5
Merge branch 'main' into jan/recording-protobuf
jprochazk Dec 11, 2024
d1dc3f8
use types instead of strings
jprochazk Dec 11, 2024
90360cf
move `MessageKind`/`MessageHeader` impls back into declaration site
jprochazk Dec 11, 2024
c880b6a
64-bit length
jprochazk Dec 11, 2024
5ce1531
fix
jprochazk Dec 11, 2024
abbd4c9
less hardcoding, more type-safety
jprochazk Dec 11, 2024
764d3d6
type aliases
jprochazk Dec 11, 2024
922ef0e
remove dead comment
jprochazk Dec 11, 2024
a51ffbd
remove spaces
jprochazk Dec 11, 2024
d3183f0
add full bench
jprochazk Dec 11, 2024
fed700f
fix warning
jprochazk Dec 11, 2024
9683405
remove unused dep
jprochazk Dec 11, 2024
ec1c6b6
fix
jprochazk Dec 11, 2024
297ab19
add test for python version parsing
jprochazk Dec 12, 2024
1d7aecb
mark `src/v0` as generated
jprochazk Dec 12, 2024
df46547
fix pattern
jprochazk Dec 12, 2024
0605295
Merge branch 'main' into jan/recording-protobuf
jprochazk Dec 12, 2024
cd6ab2e
fix lint
jprochazk Dec 12, 2024
1 change: 1 addition & 0 deletions Cargo.lock
@@ -5826,6 +5826,7 @@ dependencies = [
"re_chunk",
"re_chunk_store",
"re_log",
"re_log_encoding",
"re_log_types",
"re_query",
"re_tracing",
@@ -1,15 +1,15 @@
//! This binary runs the remote store gRPC service codegen manually.
//!
//! It is easiest to call this using `pixi run codegen-protos`,
//! It is easiest to call this using `pixi run codegen-protos `,
//! which will set up the necessary tools.

#![allow(clippy::unwrap_used)]

use camino::Utf8Path;

const PROTOBUF_DEFINITIONS_DIR_PATH: &str = "crates/store/re_protos/proto";
const PROTOBUF_REMOTE_STORE_V0_RELATIVE_PATH: &str = "rerun/v0/remote_store.proto";
const RUST_V0_OUTPUT_DIR_PATH: &str = "crates/store/re_protos/src/v0";
const PROTOS_DIR: &str = "crates/store/re_protos/proto";
const INPUT_V0: &[&str] = &["rerun/v0/remote_store.proto", "rerun/v0/log_msg.proto"];
const OUTPUT_V0_RUST_DIR: &str = "crates/store/re_protos/src/v0";

fn main() {
re_log::setup_logging();
@@ -26,8 +26,8 @@ fn main() {
"failed to find workspace root"
);

let definitions_dir_path = workspace_dir.join(PROTOBUF_DEFINITIONS_DIR_PATH);
let rust_generated_output_dir_path = workspace_dir.join(RUST_V0_OUTPUT_DIR_PATH);
let definitions_dir_path = workspace_dir.join(PROTOS_DIR);
let rust_generated_output_dir_path = workspace_dir.join(OUTPUT_V0_RUST_DIR);

re_log::info!(
definitions=?definitions_dir_path,
@@ -37,7 +37,7 @@

re_protos_builder::generate_rust_code(
definitions_dir_path,
&[PROTOBUF_REMOTE_STORE_V0_RELATIVE_PATH],
INPUT_V0,
rust_generated_output_dir_path,
);
}
4 changes: 1 addition & 3 deletions crates/store/re_chunk_store/src/lib.rs
@@ -49,16 +49,14 @@ pub use re_chunk::{
Chunk, ChunkId, ChunkShared, LatestAtQuery, RangeQuery, RangeQueryOptions, RowId,
UnitChunkShared,
};
#[doc(no_inline)]
pub use re_log_encoding::decoder::VersionPolicy;

#[doc(no_inline)]
pub use re_log_types::{ResolvedTimeRange, TimeInt, TimeType, Timeline};

pub mod external {
pub use arrow2;

pub use re_chunk;
pub use re_log_encoding;
}

// ---
4 changes: 2 additions & 2 deletions crates/store/re_chunk_store/src/store.rs
@@ -691,7 +691,7 @@ impl ChunkStore {
pub fn from_rrd_filepath(
store_config: &ChunkStoreConfig,
path_to_rrd: impl AsRef<std::path::Path>,
version_policy: crate::VersionPolicy,
version_policy: re_log_encoding::VersionPolicy,
) -> anyhow::Result<BTreeMap<StoreId, Self>> {
let path_to_rrd = path_to_rrd.as_ref();

@@ -808,7 +808,7 @@ impl ChunkStore {
pub fn handle_from_rrd_filepath(
store_config: &ChunkStoreConfig,
path_to_rrd: impl AsRef<std::path::Path>,
version_policy: crate::VersionPolicy,
version_policy: re_log_encoding::VersionPolicy,
) -> anyhow::Result<BTreeMap<StoreId, ChunkStoreHandle>> {
Ok(
Self::from_rrd_filepath(store_config, path_to_rrd, version_policy)?
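With the `VersionPolicy` re-export gone from `re_chunk_store`, call sites name the type via `re_log_encoding` directly. Below is a minimal sketch of such a call site in a crate that depends on both `re_chunk_store` and `re_log_encoding`; the helper function is hypothetical (not part of this PR) and assumes `ChunkStoreConfig` implements `Default`:

```rust
use re_chunk_store::{ChunkStore, ChunkStoreConfig};

// Hypothetical helper: load every store contained in an .rrd file,
// passing the `VersionPolicy` from `re_log_encoding` directly.
fn load_all_stores(path: &std::path::Path) -> anyhow::Result<()> {
    let stores = ChunkStore::handle_from_rrd_filepath(
        &ChunkStoreConfig::default(), // assumes a `Default` impl
        path,
        re_log_encoding::VersionPolicy::Warn,
    )?;
    for (store_id, _handle) in &stores {
        println!("loaded store {store_id:?}");
    }
    Ok(())
}
```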
2 changes: 1 addition & 1 deletion crates/store/re_data_loader/src/loader_external.rs
@@ -182,7 +182,7 @@ impl crate::DataLoader for ExternalLoader {
// streaming data to stdout.
let is_sending_data = Arc::new(AtomicBool::new(false));

let version_policy = re_log_encoding::decoder::VersionPolicy::Warn;
let version_policy = re_log_encoding::VersionPolicy::Warn;
let stdout = std::io::BufReader::new(stdout);
match re_log_encoding::decoder::Decoder::new(version_policy, stdout) {
Ok(decoder) => {
8 changes: 4 additions & 4 deletions crates/store/re_data_loader/src/loader_rrd.rs
@@ -40,7 +40,7 @@ impl crate::DataLoader for RrdLoader {
"Loading rrd data from filesystem…",
);

let version_policy = re_log_encoding::decoder::VersionPolicy::Warn;
let version_policy = re_log_encoding::VersionPolicy::Warn;

match extension.as_str() {
"rbl" => {
@@ -118,7 +118,7 @@ impl crate::DataLoader for RrdLoader {
return Err(crate::DataLoaderError::Incompatible(filepath));
}

let version_policy = re_log_encoding::decoder::VersionPolicy::Warn;
let version_policy = re_log_encoding::VersionPolicy::Warn;
let contents = std::io::Cursor::new(contents);
let decoder = match re_log_encoding::decoder::Decoder::new(version_policy, contents) {
Ok(decoder) => decoder,
@@ -308,7 +308,7 @@ impl RetryableFileReader {
mod tests {
use re_build_info::CrateVersion;
use re_chunk::RowId;
use re_log_encoding::{decoder, encoder::DroppableEncoder};
use re_log_encoding::{encoder::DroppableEncoder, VersionPolicy};
use re_log_types::{
ApplicationId, LogMsg, SetStoreInfo, StoreId, StoreInfo, StoreKind, StoreSource, Time,
};
@@ -372,7 +372,7 @@ mod tests {
encoder.flush_blocking().expect("failed to flush messages");

let reader = RetryableFileReader::new(&rrd_file_path).unwrap();
let mut decoder = Decoder::new(decoder::VersionPolicy::Warn, reader).unwrap();
let mut decoder = Decoder::new(VersionPolicy::Warn, reader).unwrap();

// we should be able to read 5 messages that we wrote
let decoded_messages = (0..5)
2 changes: 1 addition & 1 deletion crates/store/re_data_source/src/load_stdin.rs
@@ -6,7 +6,7 @@ use re_smart_channel::Sender;
/// This fails synchronously iff the standard input stream could not be opened, otherwise errors
/// are handled asynchronously (as in: they're logged).
pub fn load_stdin(tx: Sender<LogMsg>) -> anyhow::Result<()> {
let version_policy = re_log_encoding::decoder::VersionPolicy::Warn;
let version_policy = re_log_encoding::VersionPolicy::Warn;

let stdin = std::io::BufReader::new(std::io::stdin());
let decoder = re_log_encoding::decoder::Decoder::new_concatenated(version_policy, stdin)?;
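The same pattern applies here: `VersionPolicy` now comes from the crate root of `re_log_encoding` rather than its `decoder` module. A self-contained sketch of the updated decoding flow — the function below is illustrative, not code from this PR:

```rust
use re_log_encoding::decoder::Decoder;
use re_log_encoding::VersionPolicy;
use re_log_types::LogMsg;

// Illustrative: decode any number of concatenated .rrd streams from stdin.
fn decode_stdin() -> anyhow::Result<Vec<LogMsg>> {
    let stdin = std::io::BufReader::new(std::io::stdin());
    // `new_concatenated` accepts several encoded streams appended back-to-back.
    let decoder = Decoder::new_concatenated(VersionPolicy::Warn, stdin)?;
    // `Decoder` iterates over `Result<LogMsg, _>`, so a single collect suffices.
    Ok(decoder.collect::<Result<Vec<_>, _>>()?)
}
```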
3 changes: 2 additions & 1 deletion crates/store/re_dataframe/Cargo.toml
@@ -24,16 +24,17 @@ all-features = true
[features]
default = []


[dependencies]
# Rerun dependencies:
re_chunk.workspace = true
re_chunk_store.workspace = true
re_log.workspace = true
re_log_encoding.workspace = true
re_log_types.workspace = true
re_query.workspace = true
re_tracing.workspace = true
re_types_core.workspace = true

# External dependencies:
anyhow.workspace = true
arrow2.workspace = true
3 changes: 2 additions & 1 deletion crates/store/re_dataframe/examples/query.rs
@@ -4,8 +4,9 @@ use itertools::Itertools;

use re_dataframe::{
ChunkStoreConfig, EntityPathFilter, QueryEngine, QueryExpression, ResolvedTimeRange,
SparseFillStrategy, StoreKind, TimeInt, Timeline, VersionPolicy,
SparseFillStrategy, StoreKind, TimeInt, Timeline,
};
use re_log_encoding::VersionPolicy;

fn main() -> anyhow::Result<()> {
let args = std::env::args().collect_vec();
3 changes: 1 addition & 2 deletions crates/store/re_dataframe/src/engine.rs
@@ -3,7 +3,6 @@ use std::collections::BTreeMap;
use re_chunk::{EntityPath, TransportChunk};
use re_chunk_store::{
ChunkStore, ChunkStoreConfig, ChunkStoreHandle, ColumnDescriptor, QueryExpression,
VersionPolicy,
};
use re_log_types::{EntityPathFilter, StoreId};
use re_query::{QueryCache, QueryCacheHandle, StorageEngine, StorageEngineLike};
@@ -59,7 +58,7 @@ impl QueryEngine<StorageEngine> {
pub fn from_rrd_filepath(
store_config: &ChunkStoreConfig,
path_to_rrd: impl AsRef<std::path::Path>,
version_policy: VersionPolicy,
version_policy: re_log_encoding::VersionPolicy,
) -> anyhow::Result<BTreeMap<StoreId, Self>> {
Ok(
ChunkStore::handle_from_rrd_filepath(store_config, path_to_rrd, version_policy)?
3 changes: 1 addition & 2 deletions crates/store/re_dataframe/src/lib.rs
@@ -13,8 +13,7 @@ pub use self::external::re_chunk::{util::concatenate_record_batches, TransportCh
#[doc(no_inline)]
pub use self::external::re_chunk_store::{
ChunkStoreConfig, ChunkStoreHandle, ColumnSelector, ComponentColumnSelector, Index, IndexRange,
IndexValue, QueryExpression, SparseFillStrategy, TimeColumnSelector, VersionPolicy,
ViewContentsSelector,
IndexValue, QueryExpression, SparseFillStrategy, TimeColumnSelector, ViewContentsSelector,
};
#[doc(no_inline)]
pub use self::external::re_log_types::{
2 changes: 1 addition & 1 deletion crates/store/re_entity_db/examples/memory_usage.rs
@@ -78,7 +78,7 @@ fn log_messages() {
}

fn decode_log_msg(mut bytes: &[u8]) -> LogMsg {
let version_policy = re_log_encoding::decoder::VersionPolicy::Error;
let version_policy = re_log_encoding::VersionPolicy::Error;
let mut messages = re_log_encoding::decoder::Decoder::new(version_policy, &mut bytes)
.unwrap()
.collect::<Result<Vec<LogMsg>, _>>()
2 changes: 1 addition & 1 deletion crates/store/re_entity_db/src/store_bundle.rs
@@ -1,7 +1,7 @@
use itertools::Itertools as _;

use crate::EntityDb;
use re_log_encoding::decoder::VersionPolicy;
use re_log_encoding::VersionPolicy;
use re_log_types::{StoreId, StoreKind};

#[derive(thiserror::Error, Debug)]
2 changes: 1 addition & 1 deletion crates/store/re_grpc_client/src/lib.rs
@@ -151,7 +151,7 @@ async fn stream_recording_async(
.connect()
.await?;

StorageNodeClient::new(tonic_client)
StorageNodeClient::new(tonic_client).max_decoding_message_size(usize::MAX)
};

re_log::debug!("Fetching {recording_id}…");
@@ -46,7 +46,7 @@ fn encode_log_msgs(messages: &[LogMsg]) -> Vec<u8> {
}

fn decode_log_msgs(mut bytes: &[u8]) -> Vec<LogMsg> {
let version_policy = re_log_encoding::decoder::VersionPolicy::Error;
let version_policy = re_log_encoding::VersionPolicy::Error;
let messages = re_log_encoding::decoder::Decoder::new(version_policy, &mut bytes)
.unwrap()
.collect::<Result<Vec<LogMsg>, _>>()
52 changes: 52 additions & 0 deletions crates/store/re_log_encoding/src/codec/arrow.rs
@@ -0,0 +1,52 @@
use super::CodecError;

/// Helper function that serializes given arrow schema and record batch into bytes
/// using Arrow IPC format.
pub(crate) fn write_arrow_to_bytes<W: std::io::Write>(
writer: &mut W,
schema: &arrow2::datatypes::Schema,
data: &arrow2::chunk::Chunk<Box<dyn re_chunk::Arrow2Array>>,
) -> Result<(), CodecError> {
use arrow2::io::ipc;

let options = ipc::write::WriteOptions { compression: None };
let mut sw = ipc::write::StreamWriter::new(writer, options);

sw.start(schema, None)
.map_err(CodecError::ArrowSerialization)?;
sw.write(data, None)
.map_err(CodecError::ArrowSerialization)?;
sw.finish().map_err(CodecError::ArrowSerialization)?;

Ok(())
}

/// Helper function that deserializes raw bytes into arrow schema and record batch
/// using Arrow IPC format.
pub(crate) fn read_arrow_from_bytes<R: std::io::Read>(
reader: &mut R,
) -> Result<
(
arrow2::datatypes::Schema,
arrow2::chunk::Chunk<Box<dyn re_chunk::Arrow2Array>>,
),
CodecError,
> {
use arrow2::io::ipc;

let metadata =
ipc::read::read_stream_metadata(reader).map_err(CodecError::ArrowSerialization)?;
let mut stream = ipc::read::StreamReader::new(reader, metadata, None);

let schema = stream.schema().clone();
// there should be at least one record batch in the stream
let stream_state = stream
.next()
.ok_or(CodecError::MissingRecordBatch)?
.map_err(CodecError::ArrowSerialization)?;

match stream_state {
ipc::read::StreamState::Waiting => Err(CodecError::UnexpectedStreamState),
ipc::read::StreamState::Some(chunk) => Ok((schema, chunk)),
}
}
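Both helpers are `pub(crate)`, so they are only reachable from inside `re_log_encoding`. A minimal round-trip sketch under that assumption (for example appended to `arrow.rs` as a test module); the arrow2 schema and array construction below are purely illustrative:

```rust
#[cfg(test)]
mod arrow_roundtrip_sketch {
    use super::{read_arrow_from_bytes, write_arrow_to_bytes};
    use arrow2::array::Int32Array;
    use arrow2::chunk::Chunk;
    use arrow2::datatypes::{DataType, Field, Schema};

    #[test]
    fn roundtrip() {
        // Build a tiny one-column batch.
        let schema = Schema::from(vec![Field::new("values", DataType::Int32, false)]);
        let chunk = Chunk::new(vec![Int32Array::from_slice([1, 2, 3]).boxed()]);

        // Serialize to Arrow IPC stream bytes…
        let mut buf = Vec::new();
        write_arrow_to_bytes(&mut buf, &schema, &chunk).unwrap();

        // …and read them back.
        let (decoded_schema, decoded_chunk) =
            read_arrow_from_bytes(&mut buf.as_slice()).unwrap();
        assert_eq!(decoded_schema.fields, schema.fields);
        assert_eq!(decoded_chunk.len(), chunk.len());
    }
}
```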
107 changes: 107 additions & 0 deletions crates/store/re_log_encoding/src/codec/file/decoder.rs
@@ -0,0 +1,107 @@
use super::{MessageHeader, MessageKind};
use crate::codec::arrow::read_arrow_from_bytes;
use crate::decoder::DecodeError;
use crate::{codec::CodecError, Compression};
use re_log_types::LogMsg;
use re_protos::TypeConversionError;

impl MessageKind {
pub(crate) fn decode(data: &mut impl std::io::Read) -> Result<Self, DecodeError> {
let mut buf = [0; 4];
data.read_exact(&mut buf)?;

match u32::from_le_bytes(buf) {
1 => Ok(Self::SetStoreInfo),
2 => Ok(Self::ArrowMsg),
3 => Ok(Self::BlueprintActivationCommand),
255 => Ok(Self::End),
_ => Err(DecodeError::Codec(CodecError::UnknownMessageHeader)),
}
}
}

impl MessageHeader {
pub(crate) fn decode(data: &mut impl std::io::Read) -> Result<Self, DecodeError> {
let kind = MessageKind::decode(data)?;
let mut buf = [0; 4];
data.read_exact(&mut buf)?;
let len = u32::from_le_bytes(buf);

Ok(Self { kind, len })
}
}

pub(crate) fn decode(
data: &mut impl std::io::Read,
compression: Compression,
) -> Result<(u64, Option<LogMsg>), DecodeError> {
use re_protos::external::prost::Message;
use re_protos::log_msg::v0::{ArrowMsg, BlueprintActivationCommand, Encoding, SetStoreInfo};

let mut read_bytes = 0u64;
let header = MessageHeader::decode(data)?;
read_bytes += std::mem::size_of::<MessageHeader>() as u64 + header.len as u64;

let mut buf = vec![0; header.len as usize];
data.read_exact(&mut buf[..])?;

let msg = match header.kind {
MessageKind::SetStoreInfo => {
let set_store_info = SetStoreInfo::decode(&buf[..])?;
Some(LogMsg::SetStoreInfo(set_store_info.try_into()?))
}
MessageKind::ArrowMsg => {
let arrow_msg = ArrowMsg::decode(&buf[..])?;
if arrow_msg.encoding() != Encoding::ArrowIpc {
return Err(DecodeError::Codec(CodecError::UnsupportedEncoding));
}

let (schema, chunk) = decode_arrow(&arrow_msg.payload, compression)?;

let store_id: re_log_types::StoreId = arrow_msg
.store_id
.ok_or_else(|| {
TypeConversionError::missing_field("rerun.log_msg.v0.ArrowMsg", "store_id")
})?
.into();

let chunk = re_chunk::Chunk::from_transport(&re_chunk::TransportChunk {
schema,
data: chunk,
})?;

Some(LogMsg::ArrowMsg(store_id, chunk.to_arrow_msg()?))
}
MessageKind::BlueprintActivationCommand => {
let blueprint_activation_command = BlueprintActivationCommand::decode(&buf[..])?;
Some(LogMsg::BlueprintActivationCommand(
blueprint_activation_command.try_into()?,
))
}
MessageKind::End => None,
};

Ok((read_bytes, msg))
}

fn decode_arrow(
data: &[u8],
compression: crate::Compression,
) -> Result<
(
arrow2::datatypes::Schema,
arrow2::chunk::Chunk<Box<dyn re_chunk::Arrow2Array>>,
),
DecodeError,
> {
let mut uncompressed = Vec::new();
let data = match compression {
crate::Compression::Off => data,
crate::Compression::LZ4 => {
lz4_flex::block::decompress_into(data, &mut uncompressed)?;
uncompressed.as_slice()
}
};

Ok(read_arrow_from_bytes(&mut &data[..])?)
}
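`decode` returns `None` once it reads the `MessageKind::End` marker, so a caller is expected to loop until then. An illustrative driver is sketched below; it is not part of the PR, assumes it lives in this module (so the imports at the top of the file are already in scope), and assumes `Compression` is `Copy`:

```rust
// Hypothetical caller: decode messages until the end-of-stream marker.
fn decode_all<R: std::io::Read>(
    data: &mut R,
    compression: Compression,
) -> Result<Vec<LogMsg>, DecodeError> {
    let mut messages = Vec::new();
    loop {
        let (_bytes_read, msg) = decode(data, compression)?;
        match msg {
            Some(msg) => messages.push(msg),
            None => return Ok(messages), // `MessageKind::End` was reached
        }
    }
}
```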