-
Notifications
You must be signed in to change notification settings - Fork 598
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(cdc): commit offset to upstream after checkpoint has commit #16058
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
license-eye has totally checked 4959 files.
Valid | Invalid | Ignored | Fixed |
---|---|---|---|
2131 | 1 | 2827 | 0 |
Click to see the invalid file list
- java/connector-node/risingwave-source-cdc/src/main/java/io/debezium/embedded/EmbeddedEngineChangeEvent.java
...node/risingwave-source-cdc/src/main/java/io/debezium/embedded/EmbeddedEngineChangeEvent.java
Outdated
Show resolved
Hide resolved
3f0e91a
to
e07f7e4
Compare
} | ||
} | ||
Err(e) => { | ||
tracing::error!( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At the first glance I thought we may miss commit_cdc_offset
if we reach here. But I realize that try_wait_epoch
on committed epoch won't fail unless the node is exiting. We may want to add a commment here for this assumption.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't need to guarantee commit offset to upstream on each checkpoint, it should not be a big deal if some checkpoint failed to commit offset. The upstream just need to retain those logs for a while, but eventually will be discarded by future checkpoint.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree that as long as we don't miss offset commit for a long time, it is not a big deal. However, my point is not about whether we will miss it. My point is that try_wait_epoch
is not expected to fail in this case. I originally thought we should panic here but logging an error log is more robust. How about just tweaking the message to be "Unexpected failure in try_wait_epoch {}" and adding a comment for this assumption?
@@ -105,6 +105,11 @@ impl SourceReader { | |||
} | |||
} | |||
|
|||
/// Postgres and Oracle connectors need to commit the offset to upstream. | |||
pub fn need_commit_offset_to_upstream(&self) -> bool { | |||
matches!(&self.config, ConnectorProperties::PostgresCdc(_)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about mysql? I thought all dbz-based cdc needs to commit offset.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only pg and oracle has this kind of protocol.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mean that recordCommitter.markProcessed()
and recordCommitter.markBatchFinished
are no-ops for other dbz-based cdc source?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No. only a step in recordCommitter.markBatchFinished
is no-op for cdc sources that are not pg and oracle.
/**
* Commits the given offset with the source database. Used by some connectors
* like Postgres and Oracle to indicate how far the source TX log can be
* discarded.
*/
default void commitOffset(Map<String, ?> partition, Map<String, ?> offset) {
}
I got your point. To be cautious, we should call markProcessed
and markBatchFinished
for all cdc connectors.
...ector-service/src/main/java/com/risingwave/connector/source/core/DbzChangeEventConsumer.java
Show resolved
Hide resolved
I evaluated the effectiveness of this PR with chaos test, the |
#15312 (comment) Can we be sure that the extra rows in RW is not caused by this PR? |
I also run against the nightly image (750), the symptom is same: RW rows is more than upstream pg. |
src/connector/src/source/base.rs
Outdated
PostgresCdc(split) => split.start_offset().clone().unwrap_or_default(), | ||
MongodbCdc(split) => split.start_offset().clone().unwrap_or_default(), | ||
CitusCdc(split) => split.start_offset().clone().unwrap_or_default(), | ||
_ => "".to_string(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unreachable!
instead of empty string?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rest LGTM!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM for the executor part
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
resolve #15464
WaitEpochWorker
in source executor to wait the commit of an epoch for some cdc connectorsJniDbzSourceRegistry
to allow source executor can get the source handlerget_encoded_offset()
toSplitMetaData
to get offset from a splitrelated: #15312
Checklist
./risedev check
(or alias,./risedev c
)Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.