Add support for asynchronous decoding of rrd stream #8705

Open
wants to merge 14 commits into
base: main

zehiko
Contributor

@zehiko zehiko commented Jan 16, 2025

What

Add a streaming decoder for LogMsgs.

I've factored out some of the logic shared between StreamingDecoder and Decoder, but there is still some repetition, because we're working with a std::io::Read in one case and a buffer of bytes in the other.
A bigger annoyance is the state keeping in StreamingDecoder, which is needed because the input file can be corrupted or can contain multiple concatenated files. Open to suggestions for a simpler approach.
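
To make the state keeping concrete, here is a minimal sketch of a decoder that is fed bytes incrementally and has to cope with partial input, a bad header, and several files concatenated back to back. Everything in it is an assumption for illustration: the `DEMO` magic, the 8-byte little-endian length header, the zero-length end marker, and the names `ChunkDecoder`, `State`, and `DecodeError` are hypothetical and are not the rrd format or the code in this PR.

```rust
/// What the decoder expects to find next in the byte stream.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum State {
    /// Start of a file (or of the next concatenated file): expect the magic bytes.
    FileHeader,
    /// Expect an 8-byte little-endian payload length.
    MessageHeader,
    /// Expect `len` bytes of payload.
    Payload { len: usize },
}

#[derive(Debug, PartialEq, Eq)]
enum DecodeError {
    /// The bytes at the start of a file are not the expected magic.
    BadMagic,
}

/// Hypothetical magic bytes, chosen for this sketch only.
const MAGIC: &[u8; 4] = b"DEMO";

struct ChunkDecoder {
    state: State,
    /// Bytes received so far that have not been consumed yet.
    buf: Vec<u8>,
}

impl ChunkDecoder {
    fn new() -> Self {
        Self { state: State::FileHeader, buf: Vec::new() }
    }

    /// Feed newly arrived bytes into the decoder.
    fn push(&mut self, bytes: &[u8]) {
        self.buf.extend_from_slice(bytes);
    }

    /// Try to decode the next payload.
    /// `Ok(None)` means "not enough bytes yet, push more and call again".
    fn try_next(&mut self) -> Result<Option<Vec<u8>>, DecodeError> {
        loop {
            match self.state {
                State::FileHeader => {
                    if self.buf.len() < MAGIC.len() {
                        return Ok(None);
                    }
                    if self.buf[..MAGIC.len()] != MAGIC[..] {
                        return Err(DecodeError::BadMagic);
                    }
                    self.buf.drain(..MAGIC.len());
                    self.state = State::MessageHeader;
                }
                State::MessageHeader => {
                    if self.buf.len() < 8 {
                        return Ok(None);
                    }
                    let mut raw = [0u8; 8];
                    raw.copy_from_slice(&self.buf[..8]);
                    self.buf.drain(..8);
                    let len = u64::from_le_bytes(raw) as usize;
                    // In this sketch a zero length ends the current file,
                    // so another file header may follow.
                    self.state = if len == 0 {
                        State::FileHeader
                    } else {
                        State::Payload { len }
                    };
                }
                State::Payload { len } => {
                    if self.buf.len() < len {
                        return Ok(None);
                    }
                    let payload: Vec<u8> = self.buf.drain(..len).collect();
                    self.state = State::MessageHeader;
                    return Ok(Some(payload));
                }
            }
        }
    }
}
```

The explicit `State` enum is what lets the decoder stop at any byte boundary and resume later. That is roughly the kind of bookkeeping a plain `std::io::Read` loop gets for free by blocking.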

Testing done

  • unit tests
  • integration with ReDap and using it for fetching rrd files from the object store

@zehiko zehiko added the "exclude from changelog" label (PRs with this won't show up in CHANGELOG.md) Jan 16, 2025
@zehiko zehiko requested a review from teh-cmc January 16, 2025 09:21
@zehiko zehiko self-assigned this Jan 16, 2025

github-actions bot commented Jan 16, 2025

Web viewer built successfully. If applicable, you should also test it:

  • I have tested the web viewer
Commit: bfc4433
Link: https://rerun.io/viewer/pr/8705
Manifest: +nightly +main

Note: This comment is updated whenever you push a commit.

@zehiko zehiko marked this pull request as draft January 16, 2025 14:57
@zehiko zehiko marked this pull request as ready for review January 16, 2025 20:38
@zehiko zehiko changed the base branch from main to zehiko/fix-catalog January 17, 2025 07:04
@teh-cmc teh-cmc requested review from teh-cmc and removed request for teh-cmc January 17, 2025 08:33
Member

@teh-cmc teh-cmc left a comment

I actually kinda love it.

I'm sure we could find a myriad of ways to express this using higher-level constructs instead, e.g. returning an impl Stream from a freestanding func that uses async/await expressions rather than a low-level, manual implementation of Stream with task::Poll, but... I really appreciate the lack of magic here. It's very readable, very straightforward, and puts the numerous edge cases in your face.

I also like that this enforces AsyncBufRead rather than expecting an AsyncRead and wrapping it in a BufReader unconditionally. That way we're sure the caller does the right thing and we won't have multiple layers of buffering going unnoticed.
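
The same trade-off exists in synchronous std I/O, so here is a small sketch using `std::io::BufRead` and `std::io::Read` as stand-ins for tokio's `AsyncBufRead` and `AsyncRead`. The functions `count_lines` and `count_lines_wrapping` are hypothetical and only illustrate the bound. They are not part of this PR.

```rust
use std::io::{self, BufRead, BufReader, Read};

/// Requires the caller to hand over something that is already buffered.
/// The buffering decision is visible at the call site.
fn count_lines<R: BufRead>(reader: R) -> io::Result<usize> {
    let mut n = 0;
    for line in reader.lines() {
        line?; // propagate I/O and UTF-8 errors
        n += 1;
    }
    Ok(n)
}

/// The alternative: accept any `Read` and wrap it unconditionally.
/// If the caller already passed a `BufReader`, the data now goes
/// through two buffers and nothing in the types reveals it.
fn count_lines_wrapping<R: Read>(reader: R) -> io::Result<usize> {
    count_lines(BufReader::new(reader))
}
```

With the first signature, `count_lines(&b"a\nb\n"[..])` works directly because a byte slice already implements `BufRead`, while a raw `File` would have to be wrapped explicitly by the caller.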

Sure, it's a pretty big method, but it reads like a book. I'll take that any day vs. a whack-a-mole of async insanity spread over 10 files.

10/10 would pin again

Comment on lines +26 to +33
decoder = [
"dep:rmp-serde",
"dep:lz4_flex",
"re_log_types/serde",
"dep:tokio",
"dep:tokio-stream",
"dep:bytes",
]
Member

please sort this 💀

Suggested change
- decoder = [
- "dep:rmp-serde",
- "dep:lz4_flex",
- "re_log_types/serde",
- "dep:tokio",
- "dep:tokio-stream",
- "dep:bytes",
- ]
+ decoder = [
+ "re_log_types/serde",
+ "dep:bytes",
+ "dep:lz4_flex",
+ "dep:rmp-serde",
+ "dep:tokio",
+ "dep:tokio-stream",
+ ]

}

#[cfg(feature = "decoder")]
pub fn from_bytes(data: &[u8]) -> Self {
Member

I don't like the fact that this is a public method that takes a slice of bytes that must respect a bunch of undocumented invariants (e.g. this will take down the entire app if data.len() < 4).

No matter what, this needs to document these invariants in its docstring; and then it needs to either return a result if these invariants are violated, or be marked unsafe. I prefer the latter (yes, I know this is abusing the official definition of unsafety, it's fine).

}

#[cfg(feature = "decoder")]
pub fn from_bytes(buf: &[u8]) -> Result<Self, crate::decoder::DecodeError> {
Member

@teh-cmc teh-cmc Jan 17, 2025

I don't like the fact that this is a public method that takes a slice of bytes that must respect a bunch of undocumented invariants (e.g. this will take down the entire app if data.len() < 16).

These invariants should be A) documented in the docstring and B) checked on entry so that a nice error can be returned if they are violated.

Contributor Author

Agree. Also, thinking about this, I don't think this should be public. Decoder (and the new StreamingDecoder) should be part of the file module (which should probably be named rrd). If no one objects, I will do that as a follow-up so as not to make this one too big.

Will document and add the checks regardless.
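
For reference, a minimal sketch of what "document and check" could look like. The `DemoHeader` type, its 8-byte layout, the `DEMO` magic, and `HeaderError` are all assumptions made up for this example. The real header and `crate::decoder::DecodeError` in this PR may look different.

```rust
#[derive(Debug, PartialEq, Eq)]
enum HeaderError {
    /// The input is shorter than the fixed-size header.
    TooShort { expected: usize, actual: usize },
    /// The first four bytes are not the expected magic.
    BadMagic,
}

#[derive(Debug, PartialEq, Eq)]
struct DemoHeader {
    magic: [u8; 4],
    version: [u8; 4],
}

impl DemoHeader {
    /// Number of bytes the header occupies in the encoded stream.
    const ENCODED_SIZE: usize = 8;
    /// Hypothetical magic bytes, chosen for this sketch only.
    const MAGIC: [u8; 4] = *b"DEMO";

    /// Decodes a header from the first `ENCODED_SIZE` bytes of `buf`.
    /// Trailing bytes are ignored.
    ///
    /// # Errors
    /// Returns `HeaderError::TooShort` if `buf.len() < ENCODED_SIZE`,
    /// and `HeaderError::BadMagic` if the magic bytes do not match.
    fn from_bytes(buf: &[u8]) -> Result<Self, HeaderError> {
        // Check the length up front so the slicing below cannot panic.
        if buf.len() < Self::ENCODED_SIZE {
            return Err(HeaderError::TooShort {
                expected: Self::ENCODED_SIZE,
                actual: buf.len(),
            });
        }
        let mut magic = [0u8; 4];
        magic.copy_from_slice(&buf[0..4]);
        if magic != Self::MAGIC {
            return Err(HeaderError::BadMagic);
        }
        let mut version = [0u8; 4];
        version.copy_from_slice(&buf[4..8]);
        Ok(Self { magic, version })
    }
}
```

The length check turns what would otherwise be an out-of-bounds panic on a short slice into an error the caller can handle.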

}
}
crate::Serializer::Protobuf => {
let header_size = std::mem::size_of::<file::MessageHeader>();
Member

I haven't followed the recent developments on the protobuf file format, and this is not new to this PR, but seeing a std::mem::size_of on a struct type that uses the Rust ABI makes me both very uneasy and 99% sure there's a nasty bug lurking somewhere.

Member

@jprochazk jprochazk Jan 17, 2025

In this case it's 2x u64 fields, which is not problematic unless changed. Until this point, the encoding/decoding was fully local to the same file where it's defined, and the compiler would yell at you if you add/remove fields or change their types, pointing out all the places that need to be changed.
I agree that it feels wrong to use this outside of the file, as it's much easier to miss a major change, such as making the header's encoded size not match its decoded size.
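
One way to decouple the two sizes, sketched under assumptions: `DemoMessageHeader`, its field names, and `HEADER_ENCODED_SIZE` are hypothetical stand-ins for a header with two u64 fields, not the actual `file::MessageHeader`. The encoded size is stated as a constant that belongs to the format, and the encoding is written out field by field.

```rust
/// Size of the header on the wire, defined by the format, not by the compiler.
const HEADER_ENCODED_SIZE: usize = 16;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct DemoMessageHeader {
    kind: u64,
    len: u64,
}

// Optional tripwire: fails to compile if the in-memory size drifts away from
// the wire size, which forces whoever changes the struct to revisit the
// encoder and decoder below.
const _: () = assert!(std::mem::size_of::<DemoMessageHeader>() == HEADER_ENCODED_SIZE);

impl DemoMessageHeader {
    /// Writes both fields as little-endian u64, in declaration order.
    fn encode(&self) -> [u8; HEADER_ENCODED_SIZE] {
        let mut out = [0u8; HEADER_ENCODED_SIZE];
        out[..8].copy_from_slice(&self.kind.to_le_bytes());
        out[8..].copy_from_slice(&self.len.to_le_bytes());
        out
    }

    /// Reads the header back. The array type guarantees the length.
    fn decode(buf: &[u8; HEADER_ENCODED_SIZE]) -> Self {
        let mut kind = [0u8; 8];
        kind.copy_from_slice(&buf[..8]);
        let mut len = [0u8; 8];
        len.copy_from_slice(&buf[8..]);
        Self {
            kind: u64::from_le_bytes(kind),
            len: u64::from_le_bytes(len),
        }
    }
}
```

Code outside the defining file can then depend on `HEADER_ENCODED_SIZE` rather than on `size_of`, so a layout change that alters the in-memory size does not silently change how many bytes are read from the stream.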

Base automatically changed from zehiko/fix-catalog to main January 17, 2025 12:30