-
Notifications
You must be signed in to change notification settings - Fork 591
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(cdc): support transaction for shared cdc source #14375
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add transaction metadata parsing logic to PlainParser for the CDC source job, which fixed to plain json format.
I am a little worried that we are mixing too much logic into plain parser. shall we make a new format dedicated to CDC source?
Hmm, the modification to plain parser is small, only add a new branch to parse transaction metadata if it needs to. |
Yes, I am ok with the current impl, just wonder if you have such plan. |
Right now the modification is small, so I think it is ok to just extend the plain parser. If there are more features need to add to the plain parser (e.g. handle schema change events), we can define a new parser and format for it at that time. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
May I ask what's PlainParser
for? Why aren't we using JsonParser
here?
&'a mut self, | ||
key: Option<Vec<u8>>, | ||
payload: Option<Vec<u8>>, | ||
writer: SourceStreamChunkRowWriter<'a>, | ||
) -> Result<()> { | ||
) -> Result<ParseResult> { | ||
tracing::info!("parse_one_with_txn"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will this be too verbose?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My bad, will remove it in a following bugfix PR.
ConnectorProperties::PostgresCdc(_) => { | ||
let (tx_id, _) = id.split_once(':').unwrap(); | ||
return Ok(TransactionControl::Begin { id: tx_id.into() }); | ||
} | ||
ConnectorProperties::MysqlCdc(_) => return Ok(TransactionControl::Begin { id }), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where do we depend on the value of the ID?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently we have a check in https://github.com/risingwavelabs/risingwave/pull/14375/files#diff-d20a701c6d209a35e84545b11ccd13ab2f332a68aa175eb4bf73edf67184e8bcR673-R675, if transaction id mismatch, it will print a warn log.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, I see. IIUC, the LSN for BEGIN
and COMMIT
differs from each other?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, lsn in commit will greater than begin.
It is for parsing message for source in risingwave/src/connector/src/parser/plain_parser.rs Lines 58 to 63 in 414c6ec
|
Co-authored-by: StrikeW <[email protected]>
Is it possible to enable this feature by default? Will this involve additional cost? |
The overhead should be small. But I think we should conduct a performance test to make this decision. @lmatz Does our ch-benchmark/sysbench use |
Any follow-ups? @StrikeW |
@cyliu0 is working on the test, the ETA is next week. https://github.com/risingwavelabs/risingwave-test/issues/577 |
Thanks for the testing work! I prefer to enable this by default so that we can claim the CDC connector is transactional. Otherwise, telling people "Yes-but" (Yes it supports transaction but needs to be enabled manually) sounds bad to me. |
I got it. Let's do the change in v1.7.0. |
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
close #14348
PlainParser
for the CDC source job, which fixed to plain json format.Checklist
./risedev check
(or alias,./risedev c
)Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.
User can specify
transactional = true
parameter to the WITH options to preserve upstream transaction semantic when create a CDC source.