Some tables don't sync for PG CDC v2.1.0-rt.8 #19681
Comments
The problem is that #19467 breaks a prerequisite for CDC tables. Previously, we required the PG publication to be created if
In the log of Meta we can see that the
After #19467, the source data stream may not have been created even though the source streaming job has been created, since the initial barrier is yielded downstream before the data stream is successfully created. Does the correctness of Shared Source rely on the source data stream being created when
There's no such assumption.
Wait, I just came up with a problem: something like #19443 may occur. We have a
In other words, the assumption "source data stream must be created when CREATE SOURCE returns success" is needed (loosely stated). More precisely, events cannot happen in the order described above. (It can happen only when
(From the logs in https://buildkite.com/risingwavelabs/pull-request/builds/64080#0193b138-957e-4fa3-9572-51f9cd0be778, it seems this is indeed the problem.)
So forward the
I thought about that before, but when the external system cannot be reached, the cluster will be stuck in a crash loop, since the source data stream cannot be built and #17807 still remains.
The semantics of the initial barrier is to notify downstream executors that the upstream executor has been initialized. We concurrently poll the barrier stream and the future that builds the source stream, to avoid blocking the stream. But when we forward the initial barrier before the source reader has been created, the source executor has not actually finished its initialization.
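To make the ordering problem concrete, here is a minimal, std-only model of the startup sequence; it is not RisingWave's actual executor. It assumes the executor sees two kinds of events, barriers from the barrier stream and the completion of the future that builds the source reader, and records for each forwarded barrier whether the reader already existed. `Event`, `forward_barriers`, and `hold_until_ready` are hypothetical names for illustration.

```rust
/// Startup events seen by a simplified (hypothetical) source executor.
#[derive(Debug, Clone, Copy)]
enum Event {
    /// A barrier with the given epoch arrives on the barrier stream.
    Barrier(u64),
    /// The future that builds the source reader resolves.
    ReaderReady,
}

/// Returns `(epoch, reader_was_ready)` for every barrier forwarded downstream.
/// If `hold_until_ready` is true, barriers arriving before the reader is ready
/// are buffered and released once it is ready; otherwise they are forwarded at once.
fn forward_barriers(events: &[Event], hold_until_ready: bool) -> Vec<(u64, bool)> {
    let mut reader_ready = false;
    let mut pending: Vec<u64> = Vec::new();
    let mut forwarded = Vec::new();
    for &event in events {
        match event {
            // Forward immediately: either the reader exists or we never block barriers.
            Event::Barrier(epoch) if reader_ready || !hold_until_ready => {
                forwarded.push((epoch, reader_ready));
            }
            // Otherwise buffer the barrier, which stalls the barrier stream downstream.
            Event::Barrier(epoch) => pending.push(epoch),
            Event::ReaderReady => {
                reader_ready = true;
                // Release buffered barriers in arrival order.
                forwarded.extend(pending.drain(..).map(|epoch| (epoch, true)));
            }
        }
    }
    forwarded
}
```

With `hold_until_ready = false`, the events `Barrier(1), ReaderReady, Barrier(2)` forward epoch 1 before the reader exists, so a downstream actor that treats the first barrier as "source initialized" is misled. Holding barriers restores that assumption, but the barrier stream stalls for as long as the reader cannot be built, which is the trade-off this thread discusses around #17807.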
Now I think it is risky to break this assumption for the source executor, which is special. We may need another mechanism to resolve #17807 specifically for the source. Edit: precisely, it is not the forwarding of the initial barrier; according to the code, it is the first barrier after the initial barrier, but the problem is similar.
Exactly. IIUC, the downstream actor treats the first barrier as the notification that the upstream source reader has been created successfully, which means the upstream must block barriers until source reader creation succeeds. How about attaching a special command to the barrier for this notification, instead of relying on the barrier being the first one?

```rust
// Poll the upstream to get the first barrier.
let barrier = expect_first_barrier(&mut input).await?;
let first_epoch = barrier.epoch;
let owned_splits = barrier
    .initial_split_assignment(self.actor_ctx.id)
    .unwrap_or(&[])
    .to_vec();
let is_pause_on_startup = barrier.is_pause_on_startup();
yield Message::Barrier(barrier);
```

becomes

```rust
// Init some states on the first barrier.
let barrier = expect_first_barrier(&mut input).await?;
let first_epoch = barrier.epoch;
let owned_splits = barrier
    .initial_split_assignment(self.actor_ctx.id)
    .unwrap_or(&[])
    .to_vec();
let is_pause_on_startup = barrier.is_pause_on_startup();
yield Message::Barrier(barrier);

// Keep forwarding upstream barriers until the source reader has been created.
// `source_reader_created` is the proposed check for the special command, not an existing helper.
while let Some(message) = input.next().instrument_await("expect_first_barrier").await {
    let barrier = message?
        .into_barrier()
        .expect("the message must be a barrier");
    // Inspect the barrier before yielding it, since yielding moves it.
    let created = source_reader_created(&barrier);
    yield Message::Barrier(barrier);
    if created {
        break;
    }
}
```
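The proposal above can be modelled as follows, assuming the "special command" is an optional marker carried on a barrier. This is a hypothetical sketch: `Mutation::SourceReaderReady`, `SourceActor`, and `first_ready_epoch` are illustrative names, not RisingWave's mutation types. Here the source actor attaches the marker itself to the first barrier it yields after its reader is created; having Meta inject the command instead would be a different design.

```rust
/// Hypothetical barrier command announcing that a source actor's reader exists.
#[derive(Debug, Clone, PartialEq)]
enum Mutation {
    SourceReaderReady { actor_id: u32 },
}

#[derive(Debug, Clone, PartialEq)]
struct Barrier {
    epoch: u64,
    mutation: Option<Mutation>,
}

/// Upstream source actor: never blocks barriers, but marks the first barrier
/// it yields after its reader has been created.
struct SourceActor {
    actor_id: u32,
    reader_ready: bool,
    announced: bool,
}

impl SourceActor {
    fn new(actor_id: u32) -> Self {
        Self { actor_id, reader_ready: false, announced: false }
    }

    /// Called when the (hypothetical) reader-building future resolves.
    fn on_reader_ready(&mut self) {
        self.reader_ready = true;
    }

    /// Forwards a barrier, attaching the marker exactly once after the reader is ready.
    fn on_barrier(&mut self, epoch: u64) -> Barrier {
        let mutation = if self.reader_ready && !self.announced {
            self.announced = true;
            Some(Mutation::SourceReaderReady { actor_id: self.actor_id })
        } else {
            None
        };
        Barrier { epoch, mutation }
    }
}

/// Downstream actor: the source is usable from the first barrier carrying the
/// marker for `upstream`, regardless of that barrier's position in the stream.
fn first_ready_epoch(barriers: &[Barrier], upstream: u32) -> Option<u64> {
    barriers
        .iter()
        .find(|b| {
            matches!(&b.mutation, Some(Mutation::SourceReaderReady { actor_id }) if *actor_id == upstream)
        })
        .map(|b| b.epoch)
}
```

Because readiness is signalled by content rather than position, the barrier stream is never blocked while the reader is being built.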
Do you mean that when the source executor finishes creating the source reader, it should notify Meta to push this special command into the streaming pipeline, to let downstream executors know the source reader is ready?
@xxchan How do we make sure of that right now, if we start the work immediately without waiting for the checkpoint of the first barrier to complete? If an actor starts doing something immediately after seeing the first barrier, without waiting for the checkpoint, isn't it possible that an upstream actor in another parallelism is still creating the source reader?
@hzxa21 I don't fully get it. In the previous implementation, Meta waits for the first barrier to be collected (and SourceExec first does its work and then yields the barrier). Do you mean that's not enough and we should wait for the checkpoint?
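A barrier counts as collected only once every actor has reported it, so one downstream actor seeing epoch 1 does not by itself imply that all parallel upstream source actors have finished creating their readers. Below is a minimal sketch of that collection rule, assuming a fixed set of actor ids; `BarrierCollector` is a hypothetical simplification, not Meta's actual barrier manager.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical, simplified barrier collection: an epoch is collected only when
/// every expected actor has reported it.
struct BarrierCollector {
    expected: HashSet<u32>,
    collected: HashMap<u64, HashSet<u32>>,
}

impl BarrierCollector {
    fn new(actors: &[u32]) -> Self {
        Self {
            expected: actors.iter().copied().collect(),
            collected: HashMap::new(),
        }
    }

    /// Records that `actor_id` has processed the barrier of `epoch`.
    fn collect(&mut self, epoch: u64, actor_id: u32) {
        self.collected.entry(epoch).or_default().insert(actor_id);
    }

    /// True once all expected actors have reported `epoch`.
    fn is_collected(&self, epoch: u64) -> bool {
        self.collected
            .get(&epoch)
            .map_or(false, |seen| seen.is_superset(&self.expected))
    }
}
```

Between the first and the last `collect` call for an epoch, some actors have already moved past the barrier while others have not, which is the window the question above is about.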
Discussed offline with @xxchan and @StrikeW. There are several tricky assumptions scattered around, so let me note them down here.
Assumptions
History of Changes
The cleanest way to fix this issue is to wait until the second barrier completes before we claim
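A minimal sketch of the "wait for the second barrier" rule, assuming that barriers complete in epoch order and that what is being claimed is the completion of the job's creation (the original sentence is cut off). `CreationTracker` is a hypothetical name; it only counts completed barriers from the job's first epoch onward.

```rust
/// Hypothetical tracker for a newly created streaming job. Assumes barriers
/// complete in epoch order.
struct CreationTracker {
    first_epoch: u64,
    completed: usize,
    reported: bool,
}

impl CreationTracker {
    fn new(first_epoch: u64) -> Self {
        Self { first_epoch, completed: 0, reported: false }
    }

    /// Called when a barrier has completed. Returns true exactly once: when the
    /// second barrier counted from the job's first epoch completes.
    fn on_barrier_completed(&mut self, epoch: u64) -> bool {
        // Ignore barriers that predate the job, and anything after reporting.
        if epoch < self.first_epoch || self.reported {
            return false;
        }
        self.completed += 1;
        if self.completed >= 2 {
            self.reported = true;
            true
        } else {
            false
        }
    }
}
```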
Describe the bug
https://buildkite.com/risingwave-test/ch-benchmark-pg-cdc-shared-source/builds/124#019391f2-6d26-4d75-bcc4-ab39abc277cb/8586-8587
https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?orgId=1&from=1733320258820&to=1733323410627&var-datasource=cdtasocg64074c&var-namespace=ch-benchmark-pg-cdc-shared-source-6ffck&var-instance=benchmark-risingwave&var-pod=All&var-component=All&var-table=All
Error message/log
No response
To Reproduce
No response
Expected behavior
No response
How did you deploy RisingWave?
No response
The version of RisingWave
v2.1.0-rt.8
Additional context
No response