-
Notifications
You must be signed in to change notification settings - Fork 599
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(source): parse message with empty key and payload #15678
Conversation
141479a
to
36bc335
Compare
The pr may introduce a behavior that yielding an all-empty row, including the PK column. @hzxa21 please help confirm the storage can ignore the row as expected. |
This quick fix only work for share cdc source. It fails for dedicated source, because payload is empty. Any idea to transfer previous implementation? @tabVersion
|
Hi, I think this PR is ready for review now, PTAL. cc @fuyufjh @BugenZhao @tabVersion |
for i in 0..chunk.capacity() { | ||
let (_, row, _) = chunk.row_at(i); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why don't we use .rows()
iteration here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rows()
cannot iterate invisible row, the rows in heartbeat chunk is marked as invisible
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't quite get the logic either.
In my imagination, the reader emits an empty but normal row. The exec is eligible to detect the empty row and change the visibility.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't quite get the logic either. In my imagination, the reader emits an empty but normal row. The exec is eligible to detect the empty row and change the visibility.
- The row is not fully empty, those meta column (split_idx, offset_idx) has not-null value
- there is not O(1) method to check whether a row is fully empty, you must check all data column to know whether it is empty
let offset = row.datum_at(offset_idx).unwrap().into_utf8(); | ||
split_offset_mapping.insert(split_id, offset.to_string()); | ||
if chunk.cardinality() == 0 { | ||
// assumes the chunk is a heartbeat chunk |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It appears that the current approach is less type-safe than the original ChunkWithState
approach. Essentially we are mixing another kind of message into the output type (StreamChunk
) of into_chunk_stream
, so the branching here looks fragile to me. cc @xxchan What's your idea on this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Haven't fully checked it yet. It seems the old approach is equally unsafe as current approach? Both rely on invisible rows 🤡
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the best solution I can come up after the refactor of #14524, that is emit a invisible chunk to pass the offset to source.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi, I have resolved the above comments, any more concern about the "invisible chunk" approach? cc @BugenZhao @xxchan
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is StreamChunkWithState
or the heartbeat chunk here only passed from connector to SourceExecutor? If so, is it possible to pass a Either<StreamChunk, SourceOffset>
instead?
Btw, substituting StreamChunk with offset column
for StreamChunkWithState
(#14524) seems to be a orthogonol problem to me, because neither of them can pass source offset when no input i.e. heartbeat.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If so, is it possible to pass a Either<StreamChunk, SourceOffset> instead?
I am confused. In current framework (main), offset of source message has been parsed into a column in the StreamChunk
. So do you mean we need to add a new type (SourceOffset
) to the Message
?
risingwave/src/stream/src/executor/mod.rs
Lines 796 to 800 in 7ce1b6a
pub enum Message { | |
Chunk(StreamChunk), | |
Barrier(Barrier), | |
Watermark(Watermark), | |
} |
That way we also need to refactor other part to support adding a new message type.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What looks weird to me is that we have a specific path for the chunk with cardinality of zero. What about defining the convention like this:
- All visible rows will be forwarded to the downstream.
- All rows (including those visible or invisible) will be used to update the offset.
The behavior will be similar to what we implement in this PR, but does not require the ad-hoc path and is more clearly defined. We can iterate over all rows ("with holes") of the chunk to apply the offset updates and yield data optionally.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All rows (including those visible or invisible) will be used to update the offset.
+1. I refactored the code. Is it looks good to you? @tabVersion
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am confused. In current framework (main), offset of source message has been parsed into a column in the StreamChunk. So do you mean we need to add a new type (SourceOffset) to the Message?
What looks weird to me is that we have a specific path for the chunk with cardinality of zero.
You guys are right. That will be 2 code paths, which looks even worse.
...e-connector-service/src/main/java/com/risingwave/connector/source/SourceValidateHandler.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Look acceptable to me.
To make it type-stronger, #15678 (comment) might be better, but I am afraid that may also lead to more than 2x complexity in terms of LOC.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the solution LGTM. But as @fuyufjh said, emitting an empty chunk is not remarkable enough for a controlling message. It may also be broken by mistake someday.
Co-authored-by: tabVersion <[email protected]>
Co-authored-by: StrikeW <[email protected]> Co-authored-by: tabVersion <[email protected]>
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
CDC connector could emit heartbeat message to the Source. We rely on the heartbeat to keep up with upstream change log offset, which is important for dedicated source (e.g. mongodb) to reset to a valid offset upon recovery.
Without heartbeat messages, the source offset may become invalid in our side.
This PR fix the issue introduced by #14524. We extract the heartbeat message from the source batch message and emit the heartbeat chunk first to executor. The overhead should be negligible.
Checklist
./risedev check
(or alias,./risedev c
)Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.