Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(source): parse message with empty key and payload #15678
fix(source): parse message with empty key and payload #15678
Changes from 10 commits
36bc335
6f96d7b
2b814cd
badf094
b5bd535
a65390a
a4d283a
84fed6b
d79b676
6fdea9a
836dd96
e30217d
d525eae
af7b81e
File filter
Filter by extension
Conversations
Jump to
There are no files selected for viewing
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It appears that the current approach is less type-safe than the original
ChunkWithState
approach. Essentially we are mixing another kind of message into the output type (StreamChunk
) ofinto_chunk_stream
, so the branching here looks fragile to me. cc @xxchan What's your idea on this?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Haven't fully checked it yet. It seems the old approach is equally unsafe as current approach? Both rely on invisible rows 🤡
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the best solution I can come up after the refactor of #14524, that is emit a invisible chunk to pass the offset to source.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi, I have resolved the above comments, any more concern about the "invisible chunk" approach? cc @BugenZhao @xxchan
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is
StreamChunkWithState
or the heartbeat chunk here only passed from connector to SourceExecutor? If so, is it possible to pass aEither<StreamChunk, SourceOffset>
instead?Btw, substituting
StreamChunk with offset column
forStreamChunkWithState
(#14524) seems to be a orthogonol problem to me, because neither of them can pass source offset when no input i.e. heartbeat.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am confused. In current framework (main), offset of source message has been parsed into a column in the
StreamChunk
. So do you mean we need to add a new type (SourceOffset
) to theMessage
?risingwave/src/stream/src/executor/mod.rs
Lines 796 to 800 in 7ce1b6a
That way we also need to refactor other part to support adding a new message type.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What looks weird to me is that we have a specific path for the chunk with cardinality of zero. What about defining the convention like this:
The behavior will be similar to what we implement in this PR, but does not require the ad-hoc path and is more clearly defined. We can iterate over all rows ("with holes") of the chunk to apply the offset updates and yield data optionally.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1. I refactored the code. Is it looks good to you? @tabVersion
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You guys are right. That will be 2 code paths, which looks even worse.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why don't we use
.rows()
iteration here?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rows()
cannot iterate invisible row, the rows in heartbeat chunk is marked as invisibleThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't quite get the logic either.
In my imagination, the reader emits an empty but normal row. The exec is eligible to detect the empty row and change the visibility.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.