fix(cdc): commit offset to upstream after checkpoint has commit #16058

Merged
merged 25 commits into main from siyuan/cdc-commit-offset
Apr 12, 2024

Conversation

StrikeW
Contributor

@StrikeW StrikeW commented Apr 1, 2024

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

resolve #15464

  • Add WaitEpochWorker in the source executor to wait for the commit of an epoch for some CDC connectors
  • Add JniDbzSourceRegistry so that the source executor can get the source handler
  • Add get_encoded_offset() to SplitMetaData to get the offset from a split
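
To illustrate the first bullet, here is a minimal sketch of the wait-then-commit idea using only the standard library. It is not the code in this PR: `FakeUpstream`, `WaitEpochTask`, `wait_epoch_committed`, `run_worker` and `demo` are hypothetical names, and the real executor is async rather than thread-based. It only shows the ordering: an offset is acknowledged to the upstream after the epoch that contains it is known to be committed.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for the upstream database: it only records the
/// offsets that were acknowledged to it.
#[derive(Default)]
pub struct FakeUpstream {
    pub committed: Mutex<Vec<String>>,
}

/// Hypothetical task handed to the worker at each checkpoint.
pub struct WaitEpochTask {
    pub epoch: u64,
    pub offset: String,
}

/// Hypothetical stand-in for waiting until storage has committed `epoch`.
/// Epochs above `max_committable` model a wait that fails.
fn wait_epoch_committed(epoch: u64, max_committable: u64) -> Result<(), String> {
    if epoch <= max_committable {
        Ok(())
    } else {
        Err(format!("epoch {} was not committed", epoch))
    }
}

/// Drains tasks in order; commits the offset only after the epoch is committed.
pub fn run_worker(rx: Receiver<WaitEpochTask>, upstream: Arc<FakeUpstream>, max_committable: u64) {
    for task in rx {
        match wait_epoch_committed(task.epoch, max_committable) {
            Ok(()) => upstream.committed.lock().unwrap().push(task.offset),
            Err(e) => eprintln!("skip offset commit: {}", e),
        }
    }
}

/// Sends three checkpoints to the worker; only epochs 1 and 2 get committed.
pub fn demo() -> Vec<String> {
    let (tx, rx) = channel();
    let upstream = Arc::new(FakeUpstream::default());
    let worker_upstream = Arc::clone(&upstream);
    let handle = thread::spawn(move || run_worker(rx, worker_upstream, 2));
    for (epoch, offset) in [(1, "lsn-100"), (2, "lsn-200"), (3, "lsn-300")] {
        tx.send(WaitEpochTask { epoch, offset: offset.to_string() }).unwrap();
    }
    drop(tx); // closing the channel lets the worker loop end
    handle.join().unwrap();
    let committed = upstream.committed.lock().unwrap().clone();
    committed
}
```

Keeping the wait on a separate worker means the path that emits data is not blocked while an epoch is being committed; that is an assumption about the motivation, not something stated above.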

related: #15312

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added test labels as necessary. See details.
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features Sqlsmith: Sql feature generation #7934).
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.

@github-actions github-actions bot added the type/fix Bug fix label Apr 1, 2024
Contributor

@github-actions github-actions bot left a comment

license-eye has totally checked 4959 files.

Valid: 2131, Invalid: 1, Ignored: 2827, Fixed: 0
Invalid file list:
  • java/connector-node/risingwave-source-cdc/src/main/java/io/debezium/embedded/EmbeddedEngineChangeEvent.java

@StrikeW StrikeW linked an issue Apr 2, 2024 that may be closed by this pull request
@StrikeW StrikeW changed the title fix(cdc): commit offset to upstream after checkpoint has commit fix(cdc): commit offset to upstream after checkpoint has commit (WIP) Apr 2, 2024
@StrikeW StrikeW force-pushed the siyuan/cdc-commit-offset branch from 3f0e91a to e07f7e4 Compare April 7, 2024 10:00
@StrikeW StrikeW changed the title fix(cdc): commit offset to upstream after checkpoint has commit (WIP) fix(cdc): commit offset to upstream after checkpoint has commit Apr 7, 2024
@StrikeW StrikeW marked this pull request as ready for review April 7, 2024 14:38
@StrikeW StrikeW requested review from BugenZhao and wenym1 April 9, 2024 05:24
src/connector/src/source/base.rs (outdated, resolved)
src/connector/src/source/kinesis/split.rs (outdated, resolved)
src/connector/src/source/iceberg/mod.rs (outdated, resolved)
src/stream/src/common/table/watermark.rs (outdated, resolved)
src/stream/src/common/table/watermark.rs (outdated, resolved)
src/stream/src/common/table/state_table.rs (outdated, resolved)
@StrikeW StrikeW requested a review from BugenZhao April 9, 2024 08:46
@BugenZhao BugenZhao requested a review from xxchan April 11, 2024 05:52
}
}
Err(e) => {
tracing::error!(
Collaborator

At first glance I thought we might miss commit_cdc_offset if we reach here. But I realize that try_wait_epoch on a committed epoch won't fail unless the node is exiting. We may want to add a comment here for this assumption.

Contributor Author

We don't need to guarantee that the offset is committed to upstream on every checkpoint; it is not a big deal if some checkpoints fail to commit it. The upstream just needs to retain those logs a while longer, and they will eventually be discarded when a future checkpoint commits its offset.

Collaborator

I agree that as long as we don't miss the offset commit for a long time, it is not a big deal. However, my point is not about whether we will miss it. My point is that try_wait_epoch is not expected to fail in this case. I originally thought we should panic here, but logging an error is more robust. How about just tweaking the message to be "Unexpected failure in try_wait_epoch {}" and adding a comment for this assumption?
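
A small sketch of the behaviour discussed in this thread, under the assumption that upstream acknowledgements are cumulative (a later offset covers every earlier one). `UpstreamLog` and `on_checkpoint` are hypothetical names used only for illustration, and the wait result is passed in directly instead of calling the real try_wait_epoch.

```rust
/// Hypothetical model of an upstream log that retains entries until a
/// consumer acknowledges an offset.
pub struct UpstreamLog {
    /// Oldest position the upstream still has to retain.
    pub retained_from: u64,
}

impl UpstreamLog {
    /// Acknowledgements are assumed to be cumulative: everything before
    /// `lsn` may be discarded, and an older `lsn` never moves the mark back.
    pub fn commit_offset(&mut self, lsn: u64) {
        self.retained_from = self.retained_from.max(lsn);
    }
}

/// Called once per checkpoint with the outcome of waiting for the epoch.
pub fn on_checkpoint(log: &mut UpstreamLog, wait_result: Result<(), String>, lsn: u64) {
    match wait_result {
        Ok(()) => log.commit_offset(lsn),
        // Assumption from the thread: waiting on a committed epoch should not
        // fail unless the node is exiting, so log the error and skip this commit.
        Err(e) => eprintln!("Unexpected failure in try_wait_epoch: {}", e),
    }
}
```

For example, if the checkpoint at offset 200 is skipped because of an error, the upstream keeps its log from offset 100 a little longer, and the next successful checkpoint at offset 300 releases everything up to 300.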

@@ -105,6 +105,11 @@ impl SourceReader {
}
}

/// Postgres and Oracle connectors need to commit the offset to upstream.
pub fn need_commit_offset_to_upstream(&self) -> bool {
matches!(&self.config, ConnectorProperties::PostgresCdc(_))
Collaborator

How about mysql? I thought all dbz-based cdc sources need to commit the offset.

Contributor Author

Only pg and oracle have this kind of protocol.

Collaborator

Do you mean that recordCommitter.markProcessed() and recordCommitter.markBatchFinished() are no-ops for other dbz-based cdc sources?

Contributor Author

@StrikeW StrikeW Apr 12, 2024

No. Only one step in recordCommitter.markBatchFinished is a no-op for cdc sources other than pg and oracle:

/**
 * Commits the given offset with the source database. Used by some connectors
 * like Postgres and Oracle to indicate how far the source TX log can be
 * discarded.
 */
default void commitOffset(Map<String, ?> partition, Map<String, ?> offset) {
}

I got your point. To be cautious, we should call markProcessed and markBatchFinished for all cdc connectors.
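
The conclusion above can be sketched in Rust as a trait with a default no-op method, analogous to the Java default method quoted in this thread. `CdcSourceTask`, `PostgresTask`, `MysqlTask` and `mark_batch_finished` are hypothetical names for this sketch, not types from Debezium or RisingWave. Because the default does nothing, calling the commit path for every connector is harmless and avoids a per-connector check at the call site.

```rust
/// Hypothetical trait mirroring a "commit offset" hook with a default no-op.
pub trait CdcSourceTask {
    /// Connectors without an upstream acknowledgement protocol keep this default.
    fn commit_offset(&mut self, _offset: &str) {}
}

/// Hypothetical Postgres-like task that records the acknowledged offset.
pub struct PostgresTask {
    pub confirmed: Option<String>,
}

impl CdcSourceTask for PostgresTask {
    fn commit_offset(&mut self, offset: &str) {
        self.confirmed = Some(offset.to_string());
    }
}

/// Hypothetical MySQL-like task: inherits the no-op.
pub struct MysqlTask;

impl CdcSourceTask for MysqlTask {}

/// Called for every CDC connector after a batch; safe even when the hook is a no-op.
pub fn mark_batch_finished(task: &mut dyn CdcSourceTask, offset: &str) {
    task.commit_offset(offset);
}
```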

src/connector/src/source/base.rs (outdated, resolved)
@StrikeW
Contributor Author

StrikeW commented Apr 12, 2024

I evaluated this PR with the chaos test: the "Cannot seek to the last known offset" error is gone, which means #15464 can be resolved.
However, ch_benchmark_q4 still encounters inconsistency in the cdc source tables (tracked in #15312). I think we can merge this PR first and continue investigating the cdc table inconsistency.

@StrikeW StrikeW requested a review from hzxa21 April 12, 2024 05:21
@hzxa21
Collaborator

hzxa21 commented Apr 12, 2024

#15312 (comment) Can we be sure that the extra rows in RW are not caused by this PR?

@StrikeW
Contributor Author

StrikeW commented Apr 12, 2024

I also ran it against the nightly image (750), and the symptom is the same: RW has more rows than the upstream pg.

PostgresCdc(split) => split.start_offset().clone().unwrap_or_default(),
MongodbCdc(split) => split.start_offset().clone().unwrap_or_default(),
CitusCdc(split) => split.start_offset().clone().unwrap_or_default(),
_ => "".to_string(),
Collaborator

unreachable! instead of empty string?
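
A sketch of what this suggestion could look like, assuming the function is only ever called for CDC splits. The `Split` enum and `encoded_offset` function here are simplified, hypothetical stand-ins for the real split types; the point is only that a non-CDC split becomes a loud failure instead of a silent empty string.

```rust
/// Simplified, hypothetical split type: CDC variants carry an optional
/// start offset, while the Kafka variant stands in for non-CDC sources.
pub enum Split {
    PostgresCdc(Option<String>),
    MongodbCdc(Option<String>),
    CitusCdc(Option<String>),
    Kafka { partition: i32 },
}

/// Returns the encoded offset of a CDC split; a missing offset becomes "".
pub fn encoded_offset(split: &Split) -> String {
    match split {
        Split::PostgresCdc(o) | Split::MongodbCdc(o) | Split::CitusCdc(o) => {
            o.clone().unwrap_or_default()
        }
        // Assumption: callers never pass non-CDC splits, so reaching this
        // arm indicates a bug and panics rather than returning "".
        Split::Kafka { .. } => unreachable!("only cdc splits carry an encoded offset"),
    }
}
```

The trade-off is that a wrong assumption now panics at runtime, which is why the empty-string fallback may still be preferred if non-CDC splits can legitimately reach this code.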

Collaborator

@hzxa21 hzxa21 left a comment

Rest LGTM!

Member

@BugenZhao BugenZhao left a comment

LGTM for the executor part

@StrikeW StrikeW added this pull request to the merge queue Apr 12, 2024
Merged via the queue into main with commit cc795da Apr 12, 2024
28 of 29 checks passed
@StrikeW StrikeW deleted the siyuan/cdc-commit-offset branch April 12, 2024 12:03
StrikeW added a commit that referenced this pull request Apr 15, 2024
Labels
type/fix Bug fix
Projects
None yet
Development

Successfully merging this pull request may close these issues.

feat: checkpoint commit callback (for cdc-connector)
4 participants