Add support for asynchronous decoding of rrd stream #8705
I actually kinda love it.
I'm sure we could find a myriad of ways to express this using higher-level constructs instead, e.g. return an `impl Stream` from a freestanding func that uses async/await expressions rather than a low-level, manual implementation of `Stream` with `task::Poll`, but... I really appreciate the lack of magic here. It's very readable, very straightforward and puts the numerous edge cases in your face.
I also like that this enforces `AsyncBufRead` rather than expecting an `AsyncRead` and wrapping it in a `BufReader` unconditionally; that way we're sure the caller does the right thing and we won't have multiple layers of buffering going unnoticed.
Sure, it's a pretty big method, but it reads like a book. I'll take that any day vs. a whack-a-mole of async insanity spread over 10 files.
10/10 would pin again
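For readers without the diff at hand, the "manual, no magic" shape praised above can be sketched roughly as follows. This is only an illustration under loud assumptions: it is synchronous, uses only `std::task::Poll` (no `Context`, waker, `Stream` or `AsyncBufRead`), and the framing (a 4-byte little-endian length prefix) plus the names `FrameDecoder`, `FrameError`, `feed`, `finish` and `poll_frame` are all hypothetical, not the PR's code or the rrd format.

```rust
use std::task::Poll;

/// Hypothetical error type for this sketch only.
#[derive(Debug, PartialEq)]
pub enum FrameError {
    /// The input ended in the middle of a frame.
    Truncated,
}

/// Hand-rolled decoder state, kept explicit on purpose.
/// Assumed framing (NOT rrd): 4-byte little-endian length, then payload.
pub struct FrameDecoder {
    /// Bytes received but not yet decoded.
    pending: Vec<u8>,
    /// Set once the caller signals that no more input will arrive.
    eof: bool,
}

impl FrameDecoder {
    pub fn new() -> Self {
        Self { pending: Vec::new(), eof: false }
    }

    /// Hands the decoder another chunk of input.
    pub fn feed(&mut self, chunk: &[u8]) {
        self.pending.extend_from_slice(chunk);
    }

    /// Signals end of input.
    pub fn finish(&mut self) {
        self.eof = true;
    }

    /// Same return shape as `Stream::poll_next`, minus the `Context`.
    pub fn poll_frame(&mut self) -> Poll<Option<Result<Vec<u8>, FrameError>>> {
        const PREFIX: usize = 4;
        // Edge case: the length prefix itself is incomplete.
        if self.pending.len() < PREFIX {
            return self.short_input();
        }
        let mut prefix = [0u8; PREFIX];
        prefix.copy_from_slice(&self.pending[..PREFIX]);
        let needed = PREFIX.saturating_add(u32::from_le_bytes(prefix) as usize);
        // Edge case: the prefix is here but the payload is incomplete.
        if self.pending.len() < needed {
            return self.short_input();
        }
        let payload = self.pending[PREFIX..needed].to_vec();
        self.pending.drain(..needed);
        Poll::Ready(Some(Ok(payload)))
    }

    /// Decides what "not enough bytes" means given the end-of-input flag.
    fn short_input(&mut self) -> Poll<Option<Result<Vec<u8>, FrameError>>> {
        if !self.eof {
            Poll::Pending
        } else if self.pending.is_empty() {
            Poll::Ready(None)
        } else {
            // Leftover bytes at end of input: report once, then end.
            self.pending.clear();
            Poll::Ready(Some(Err(FrameError::Truncated)))
        }
    }
}
```

Each "not enough bytes yet" case is a visible branch rather than something hidden behind an `.await`, which is the readability trade-off the comment describes.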
decoder = [
  "dep:rmp-serde",
  "dep:lz4_flex",
  "re_log_types/serde",
  "dep:tokio",
  "dep:tokio-stream",
  "dep:bytes",
]
please sort this 💀
decoder = [
  "re_log_types/serde",
  "dep:bytes",
  "dep:lz4_flex",
  "dep:rmp-serde",
  "dep:tokio",
  "dep:tokio-stream",
]
}

#[cfg(feature = "decoder")]
pub fn from_bytes(data: &[u8]) -> Self {
I don't like the fact that this is a public method that takes a slice of bytes that must respect a bunch of undocumented invariants (e.g. this will take down the entire app if `data.len() < 4`).
No matter what, this needs to document these invariants in its docstring; and then it needs to either return a result if these invariants are violated, or be marked `unsafe`. I prefer the latter (yes, I know this is abusing the official definition of unsafety, it's fine).
}

#[cfg(feature = "decoder")]
pub fn from_bytes(buf: &[u8]) -> Result<Self, crate::decoder::DecodeError> {
I don't like the fact that this is a public method that takes a slice of bytes that must respect a bunch of undocumented invariants (e.g. this will take down the entire app if `data.len() < 16`).
These invariants should be A) documented in the docstring and B) checked for on entry so a nice error can be returned if they are violated.
Agree. Also, thinking about this, I don't think this should be public. `Decoder` (and the new `StreamingDecoder`) should be part of the `file` module (which should probably be named `rrd`). If no one objects, I will do that as a follow-up so as not to make this one too big.
Will document and add the checks regardless.
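A minimal sketch of what "document and check" could look like, for illustration only. The `Header` type, its two `u64` fields, the little-endian layout and this `DecodeError` variant are assumptions standing in for the PR's real types; only the 16-byte minimum comes from the discussion above.

```rust
/// Hypothetical stand-in for the PR's `crate::decoder::DecodeError`.
#[derive(Debug, PartialEq)]
pub enum DecodeError {
    /// Fewer bytes were supplied than the fixed-size header requires.
    TooShort { expected: usize, actual: usize },
}

/// Hypothetical header with two `u64` fields (assumed little-endian on the wire).
#[derive(Debug, PartialEq)]
pub struct Header {
    pub kind: u64,
    pub len: u64,
}

impl Header {
    /// Number of bytes `from_bytes` reads.
    pub const ENCODED_SIZE: usize = 16;

    /// Decodes a header from the first [`Self::ENCODED_SIZE`] bytes of `buf`.
    ///
    /// # Invariants
    /// `buf.len()` must be at least [`Self::ENCODED_SIZE`]; trailing bytes are ignored.
    ///
    /// # Errors
    /// Returns [`DecodeError::TooShort`] instead of panicking when `buf` is too short.
    pub fn from_bytes(buf: &[u8]) -> Result<Self, DecodeError> {
        if buf.len() < Self::ENCODED_SIZE {
            return Err(DecodeError::TooShort {
                expected: Self::ENCODED_SIZE,
                actual: buf.len(),
            });
        }
        // The length check above makes these slice indexes infallible.
        let mut kind = [0u8; 8];
        kind.copy_from_slice(&buf[0..8]);
        let mut len = [0u8; 8];
        len.copy_from_slice(&buf[8..16]);
        Ok(Self {
            kind: u64::from_le_bytes(kind),
            len: u64::from_le_bytes(len),
        })
    }
}
```

With the check on entry, a short slice becomes an ordinary error the caller can handle rather than an out-of-bounds panic.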
    }
}
crate::Serializer::Protobuf => {
    let header_size = std::mem::size_of::<file::MessageHeader>();
I haven't followed the recent developments on the protobuf file format, and this is not new to this PR, but seeing a `mem::size_of` on a struct type that uses the Rust ABI makes me both very uneasy and 99% sure there's a nasty bug lurking somewhere.
In this case it's 2x u64 fields, which is not problematic unless changed. Until this point, the encoding/decoding was fully local to the same file where it's defined, and the compiler would yell at you if you add/remove fields or change their types, pointing out all the places that need to be changed.
I agree that it feels wrong to use this outside of the file, as it's much easier to miss a major change, such as making the header's encoded size not match its decoded size.
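To make the concern concrete, here is a small sketch of how in-memory size and encoded size can drift apart. Both structs and the `ENCODED_SIZE` constant are hypothetical; `TwoU64` only mirrors the "2x u64" shape mentioned above, and `Shrunk` is an imagined future change, not anything in the PR.

```rust
use std::mem::size_of;

/// Mirrors the "two u64 fields" shape: in-memory and encoded sizes agree.
#[allow(dead_code)]
pub struct TwoU64 {
    pub kind: u64,
    pub len: u64,
}

/// Imagined later change: `kind` shrinks to a single byte.
pub struct Shrunk {
    pub kind: u8,
    pub len: u64,
}

impl Shrunk {
    /// Wire size is stated explicitly instead of being derived from `size_of`.
    pub const ENCODED_SIZE: usize = 1 + 8;

    /// Encodes field by field, so padding never leaks into the format.
    pub fn encode(&self) -> [u8; Self::ENCODED_SIZE] {
        let mut out = [0u8; Self::ENCODED_SIZE];
        out[0] = self.kind;
        out[1..].copy_from_slice(&self.len.to_le_bytes());
        out
    }
}

/// Reports (in-memory size, encoded size) for the changed struct.
pub fn sizes() -> (usize, usize) {
    (size_of::<Shrunk>(), Shrunk::ENCODED_SIZE)
}
```

Because `size_of::<Shrunk>()` must be a multiple of the struct's alignment, it cannot equal the 9 encoded bytes, so any caller still using `size_of` as the header length would silently read the wrong number of bytes. An explicit constant next to the encode/decode code keeps the two in one place.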
Co-authored-by: Clement Rey <[email protected]>
What
Add a streaming decoder for `LogMsg`s.
I've commonalized some of the logic between `StreamingDecoder` and the `Decoder`, but there is still some repetition that comes from the fact that we're working with a `std::io::Read` in one case and a buffer of bytes in the other.
A bigger annoyance is some state keeping in the `StreamingDecoder` that comes from the fact that the input file can be corrupted or can contain multiple concatenated files. Open to suggestions for a simpler approach.
Testing done