fix(source): parse message with empty key and payload #15678

Merged
merged 14 commits into main from siyuan/fix-source-parser
Mar 25, 2024

Conversation

StrikeW
Contributor

@StrikeW StrikeW commented Mar 14, 2024

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

A CDC connector can emit heartbeat messages to the source. We rely on these heartbeats to keep up with the upstream change log offset, which is important for a dedicated source (e.g. MongoDB) so that it can reset to a valid offset upon recovery.
Without heartbeat messages, the source offset may become invalid on our side.

This PR fixes the issue introduced by #14524. We extract the heartbeat messages from the source message batch and emit the heartbeat chunk to the executor first. The overhead should be negligible.
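
For readers skimming the description, here is a minimal sketch of the "extract heartbeats first" step. It assumes a heartbeat is a message with neither key nor payload (as the PR title suggests); `SourceMessage` and `split_heartbeats` are hypothetical names for illustration, not the types or functions in the RisingWave codebase.

```rust
/// Hypothetical stand-in for a source message; not RisingWave's actual type.
#[derive(Debug, Clone, PartialEq)]
struct SourceMessage {
    key: Option<Vec<u8>>,
    payload: Option<Vec<u8>>,
    split_id: String,
    offset: String,
}

impl SourceMessage {
    /// Assumption: a heartbeat carries an offset but no key and no payload.
    fn is_heartbeat(&self) -> bool {
        self.key.is_none() && self.payload.is_none()
    }
}

/// Splits a batch into (heartbeats, data messages), keeping the relative
/// order inside each group so the heartbeats can be emitted first.
fn split_heartbeats(batch: Vec<SourceMessage>) -> (Vec<SourceMessage>, Vec<SourceMessage>) {
    batch.into_iter().partition(SourceMessage::is_heartbeat)
}
```

The split is a single pass over the batch, which is consistent with the expectation that the overhead is small.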

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added test labels as necessary. See details.
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features; see "Sqlsmith: Sql feature generation" #7934.)
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.

@github-actions github-actions bot added the type/fix Bug fix label Mar 14, 2024
@StrikeW StrikeW force-pushed the siyuan/fix-source-parser branch from 141479a to 36bc335 Compare March 14, 2024 06:01
@tabVersion tabVersion requested a review from hzxa21 March 14, 2024 06:05
@tabVersion
Contributor

This PR may introduce a behavior that yields an all-empty row, including the PK column. @hzxa21 please help confirm that storage can ignore the row as expected.

@StrikeW
Contributor Author

StrikeW commented Mar 14, 2024

This quick fix only works for the shared CDC source. It fails for a dedicated source because the payload is empty. Any idea how to port the previous implementation? @tabVersion

2024-03-14T06:17:40.878162627Z  INFO risingwave_connector_node: Searching for WAL resume position thread="debezium-postgresconnector-RW_CDC_2-change-event-source-coordinator" class="io.debezium.connector.postgresql.PostgresStreamingChangeEventSource"
thread 'rw-streaming' panicked at src/connector/src/parser/unified/debezium.rs:93:9:
assertion failed: key_accessor.is_some() || value_accessor.is_some()
stack backtrace:
   0: rust_begin_unwind
             at /rustc/e4c626dd9a17a23270bf8e7158e59cf2b9c04840/library/std/src/panicking.rs:645:5
   1: core::panicking::panic_fmt
             at /rustc/e4c626dd9a17a23270bf8e7158e59cf2b9c04840/library/core/src/panicking.rs:72:14
   2: core::panicking::panic
             at /rustc/e4c626dd9a17a23270bf8e7158e59cf2b9c04840/library/core/src/panicking.rs:144:5
   3: risingwave_connector::parser::unified::debezium::DebeziumChangeEvent<A>::new
             at ./src/connector/src/parser/unified/debezium.rs:93:9
   4: {async_fn#0}
             at ./src/connector/src/parser/debezium/debezium_parser.rs:136:22
   5: {async_fn#0}
             at ./src/connector/src/parser/debezium/debezium_parser.rs:184:48
   6: {coroutine#0}<risingwave_connector::parser::debezium::debezium_parser::DebeziumParser>
             at ./src/connector/src/parser/mod.rs:687:18

@StrikeW StrikeW marked this pull request as draft March 19, 2024 05:25
@StrikeW StrikeW changed the title fix(connector): don't skip message without key and payload fix(source): parse message with empty key and payload Mar 19, 2024
@StrikeW StrikeW marked this pull request as ready for review March 19, 2024 12:14
@StrikeW StrikeW changed the title fix(source): parse message with empty key and payload fix(source): parse message with empty key and payload (WIP) Mar 19, 2024
@StrikeW StrikeW changed the title fix(source): parse message with empty key and payload (WIP) fix(source): parse message with empty key and payload Mar 19, 2024
@StrikeW StrikeW requested review from hzxa21, fuyufjh, tabVersion, BugenZhao and xxchan and removed request for hzxa21, fuyufjh, tabVersion and BugenZhao March 19, 2024 14:57
@StrikeW StrikeW marked this pull request as ready for review March 20, 2024 10:38
@StrikeW StrikeW changed the title fix(source): parse message with empty key and payload (WIP) fix(source): parse message with empty key and payload Mar 20, 2024
@StrikeW
Contributor Author

StrikeW commented Mar 20, 2024

Hi, I think this PR is ready for review now, PTAL. cc @fuyufjh @BugenZhao @tabVersion

Comment on lines 65 to 66
for i in 0..chunk.capacity() {
    let (_, row, _) = chunk.row_at(i);
Member

Why don't we use .rows() iteration here?

Contributor Author

rows() cannot iterate over invisible rows, and the rows in a heartbeat chunk are marked as invisible.
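
To illustrate the difference, here is a small sketch with a mock chunk type. `MockChunk` and `heartbeat_chunk` are hypothetical and heavily simplified; they only mimic the idea that a visible-rows iterator skips hidden rows while index-based access over the full capacity does not, and they are not RisingWave's actual `StreamChunk` API.

```rust
/// Hypothetical, simplified stand-in for a stream chunk: one offset string per
/// row plus a visibility flag per row. Not RisingWave's actual `StreamChunk`.
struct MockChunk {
    offsets: Vec<String>,
    visibility: Vec<bool>,
}

impl MockChunk {
    /// Total number of row slots, visible or not.
    fn capacity(&self) -> usize {
        self.offsets.len()
    }

    /// Iterates over visible rows only, skipping hidden ones.
    fn rows(&self) -> impl Iterator<Item = &str> + '_ {
        self.offsets
            .iter()
            .zip(&self.visibility)
            .filter(|(_, vis)| **vis)
            .map(|(offset, _)| offset.as_str())
    }

    /// Returns the row at `idx` together with its visibility flag.
    fn row_at(&self, idx: usize) -> (&str, bool) {
        (self.offsets[idx].as_str(), self.visibility[idx])
    }
}

/// A heartbeat-style chunk: a single row that is marked invisible.
fn heartbeat_chunk(offset: &str) -> MockChunk {
    MockChunk {
        offsets: vec![offset.to_string()],
        visibility: vec![false],
    }
}
```

With such a chunk, `rows()` yields nothing, so the offset is reachable only by walking `0..capacity()` and calling `row_at`.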

Contributor

I don't quite get the logic either.
As I imagine it, the reader emits an empty but normal row, and the executor can detect the empty row and change its visibility.

Contributor Author

@StrikeW StrikeW Mar 21, 2024

I don't quite get the logic either. As I imagine it, the reader emits an empty but normal row, and the executor can detect the empty row and change its visibility.

  1. The row is not fully empty: the meta columns (split_idx, offset_idx) have non-null values.
  2. There is no O(1) method to check whether a row is fully empty; you must check every data column to know whether it is empty.
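
A rough sketch of both points, assuming a row is modeled as a slice of optional values. `Datum` here is a hypothetical alias and `data_columns_all_null` a hypothetical helper, neither taken from the codebase.

```rust
/// Hypothetical simplification: a datum is either NULL (`None`) or a string.
type Datum = Option<String>;

/// Returns true if every column other than the meta columns is NULL.
/// The cost grows with the number of columns; there is no shortcut.
fn data_columns_all_null(row: &[Datum], meta_indices: &[usize]) -> bool {
    row.iter()
        .enumerate()
        .filter(|(idx, _)| !meta_indices.contains(idx))
        .all(|(_, datum)| datum.is_none())
}
```

A heartbeat row such as `[NULL, NULL, "split-0", "100"]` with meta columns at indices 2 and 3 is therefore "empty" only once the meta columns are excluded, and finding that out requires scanning the data columns, whereas reading a visibility flag is a single lookup.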

src/stream/src/executor/source/mod.rs (outdated, resolved)
src/connector/src/parser/mod.rs (outdated, resolved)
src/connector/src/parser/mod.rs (outdated, resolved)
let offset = row.datum_at(offset_idx).unwrap().into_utf8();
split_offset_mapping.insert(split_id, offset.to_string());
if chunk.cardinality() == 0 {
    // assumes the chunk is a heartbeat chunk
Member

It appears that the current approach is less type-safe than the original ChunkWithState approach. Essentially we are mixing another kind of message into the output type (StreamChunk) of into_chunk_stream, so the branching here looks fragile to me. cc @xxchan What's your idea on this?

Member

Haven't fully checked it yet. It seems the old approach is just as unsafe as the current approach? Both rely on invisible rows 🤡

Contributor Author

@StrikeW StrikeW Mar 21, 2024

This is the best solution I could come up with after the refactor in #14524: emit an invisible chunk to pass the offset to the source.

Contributor Author

Hi, I have resolved the above comments. Any more concerns about the "invisible chunk" approach? cc @BugenZhao @xxchan

Member

@fuyufjh fuyufjh Mar 25, 2024

Is StreamChunkWithState or the heartbeat chunk here only passed from the connector to SourceExecutor? If so, is it possible to pass an Either<StreamChunk, SourceOffset> instead?

Btw, substituting StreamChunk with an offset column for StreamChunkWithState (#14524) seems to be an orthogonal problem to me, because neither of them can pass the source offset when there is no input, i.e. on a heartbeat.
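
For illustration only, the `Either`-style idea could look roughly like the sketch below. `SourceOutput` and `handle` are hypothetical names, the chunk is reduced to a list of strings, and this is not what the PR implements.

```rust
use std::collections::HashMap;

/// Hypothetical output type for the connector-to-executor stream.
#[derive(Debug, PartialEq)]
enum SourceOutput {
    /// Regular data rows (simplified to strings here).
    Chunk(Vec<String>),
    /// An offset-only update, for example produced from a heartbeat.
    Offset { split_id: String, offset: String },
}

/// Consumes one output item: data is returned for forwarding downstream,
/// while an offset-only item just updates the per-split offset map.
fn handle(
    output: SourceOutput,
    split_offsets: &mut HashMap<String, String>,
) -> Option<Vec<String>> {
    match output {
        SourceOutput::Chunk(rows) => Some(rows),
        SourceOutput::Offset { split_id, offset } => {
            split_offsets.insert(split_id, offset);
            None
        }
    }
}
```

The type makes the control message explicit, at the price of a second branch in every consumer. In this simplified form, data chunks carry no offsets at all, which a real version would still have to handle.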

Contributor Author

If so, is it possible to pass an Either<StreamChunk, SourceOffset> instead?

I am confused. In the current framework (main), the offset of a source message is already parsed into a column in the StreamChunk. So do you mean we need to add a new type (SourceOffset) to the Message?

pub enum Message {
    Chunk(StreamChunk),
    Barrier(Barrier),
    Watermark(Watermark),
}

That way we would also need to refactor other parts to support the new message type.

Member

@BugenZhao BugenZhao Mar 25, 2024

What looks weird to me is that we have a specific path for the chunk with cardinality of zero. What about defining the convention like this:

  • All visible rows will be forwarded to the downstream.
  • All rows (including those visible or invisible) will be used to update the offset.

The behavior will be similar to what we implement in this PR, but does not require the ad-hoc path and is more clearly defined. We can iterate over all rows ("with holes") of the chunk to apply the offset updates and yield data optionally.
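
A minimal sketch of that convention, assuming a chunk is modeled as a list of (row, visible) pairs. `MockRow` and `process_chunk` are hypothetical names for illustration, not the executor code in this PR.

```rust
use std::collections::HashMap;

/// Hypothetical row carrying the two meta columns mentioned in this thread.
#[derive(Debug, Clone, PartialEq)]
struct MockRow {
    split_id: String,
    offset: String,
    data: Option<String>,
}

/// Applies the proposed convention to one chunk:
/// every row updates the per-split offset, only visible rows are forwarded.
fn process_chunk(
    rows: &[(MockRow, bool)],
    split_offsets: &mut HashMap<String, String>,
) -> Vec<MockRow> {
    let mut forwarded = Vec::new();
    for (row, visible) in rows {
        // Offsets are taken from all rows, including invisible (heartbeat) ones.
        split_offsets.insert(row.split_id.clone(), row.offset.clone());
        if *visible {
            forwarded.push(row.clone());
        }
    }
    forwarded
}
```

A heartbeat-only chunk then falls out naturally: the offset map is updated and nothing is forwarded, with no special case for a cardinality of zero.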

Contributor Author

All rows (including those visible or invisible) will be used to update the offset.

+1. I refactored the code. Does it look good to you? @tabVersion

Member

I am confused. In the current framework (main), the offset of a source message is already parsed into a column in the StreamChunk. So do you mean we need to add a new type (SourceOffset) to the Message?

What looks weird to me is that we have a specific path for the chunk with cardinality of zero.

You guys are right. That will be 2 code paths, which looks even worse.

@StrikeW StrikeW requested a review from BugenZhao March 25, 2024 03:03
Member

@fuyufjh fuyufjh left a comment

Looks acceptable to me.

To make it more strongly typed, #15678 (comment) might be better, but I am afraid that it may also lead to more than 2x the complexity in terms of LOC.

src/connector/src/parser/mod.rs (outdated, resolved)
Contributor

@tabVersion tabVersion left a comment

The solution LGTM. But as @fuyufjh said, emitting an empty chunk is not distinctive enough for a control message. It may also get broken by mistake someday.

@StrikeW StrikeW enabled auto-merge March 25, 2024 06:55
@StrikeW StrikeW added this pull request to the merge queue Mar 25, 2024
Merged via the queue into main with commit e808972 Mar 25, 2024
26 of 27 checks passed
@StrikeW StrikeW deleted the siyuan/fix-source-parser branch March 25, 2024 07:48
github-actions bot pushed a commit that referenced this pull request Mar 25, 2024
StrikeW added a commit that referenced this pull request Mar 26, 2024
Labels
type/fix Bug fix

5 participants