
feat: checkpoint commit callback (for cdc-connector) #15464

Closed
fuyufjh opened this issue Mar 5, 2024 · 11 comments · Fixed by #16058

Labels: needs-design (don't start your coding work before a detailed design is proposed), priority/high

fuyufjh (Member) commented Mar 5, 2024

Some background first

Our CDC connector consumes PG CDC events and, at regular intervals, acks the consumed offset (LSN) back to the PG server. Upstream PG then assumes that the WAL up to those offsets can be discarded.

// Excerpt (abridged) from the connector's event loop: each event is marked
// as processed, the non-empty batch is forwarded downstream, and the batch
// is marked finished so DebeziumEngine can later commit its offsets upstream.
    committer.markProcessed(event);
}
// Skip empty batch
if (respBuilder.getEventsCount() > 0) {
    respBuilder.setSourceId(sourceId);
    var response = respBuilder.build();
    outputChannel.put(response);
}
committer.markBatchFinished();

DebeziumEngine will commit those marked offsets to upstream:
https://github.com/debezium/debezium/blob/4ca2a67b0d302c611b89b1931728377cf232ab6c/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java#L435-L436

Findings

After some investigation, I think the reason for the "Cannot seek to the last known offset" error is that we ack the offset to PG before the checkpoint commit. As a result, when the cluster recovers from a committed checkpoint, the restored offset may already have been discarded by upstream PG.

Currently our framework doesn't have a checkpoint commit callback mechanism to notify the source executor. An intuitive idea is to let Meta broadcast an RPC to each CN in the cluster. cc @hzxa21

To confirm the findings, I increased the offset flush interval to 30 minutes, which is much larger than the time required for the test, and reran the chaos test (stress chaos only and w/o memtable spill: 599, 603). The results show that the "Cannot seek" error is gone, and btw the mv check passed.

But when I ran the chaos test with 3 CNs (601), even though the error is gone, the source table is still out of sync with PG. I have no explanation for this one right now.

Originally posted by @StrikeW in #15141 (comment)

@fuyufjh fuyufjh added the needs-design Don't start your coding work before a detailed design proposed label Mar 5, 2024
@github-actions github-actions bot added this to the release-1.8 milestone Mar 5, 2024
fuyufjh (Member, Author) commented Mar 8, 2024

Noting down the discussion with @BugenZhao.

Today we realized that this requirement applies not only to sources but also to sinks. For example, if we want to provide exactly-once delivery for a sink, the implementation would look like this:

  1. Prepare: SinkExecutor writes data and makes sure the data is persisted in the target system but invisible to readers.
  2. Commit: the checkpoint commit callback tells the sink to finalize the writes by making all previous writes visible.

Hopefully we can design a general framework for this requirement, rather than something specific to CDC connectors.
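
A minimal sketch of what such a general callback interface could look like (purely illustrative; the trait and method names below are hypothetical, not RisingWave's actual API):

/// Hypothetical two-phase interface shared by sources and sinks that need
/// to act only after a checkpoint (epoch) has been durably committed.
trait CheckpointCommitListener {
    /// Phase 1, on the checkpoint barrier: persist offsets/state, or stage
    /// writes in the target system so they are durable but not yet visible.
    fn pre_commit(&mut self, epoch: u64);

    /// Phase 2, the checkpoint commit callback: the epoch is committed, so a
    /// sink can make its staged writes visible and a CDC source can safely
    /// ack the consumed offset (LSN) to the upstream database.
    fn on_epoch_committed(&mut self, epoch: u64);
}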

@BugenZhao BugenZhao self-assigned this Mar 8, 2024
StrikeW (Contributor) commented Mar 8, 2024

I think we can generalize the sink coordinator framework for this feature.

hzxa21 (Collaborator) commented Mar 8, 2024

Today we realized that this requirement applies not only to sources but also to sinks. For example, if we want to provide exactly-once delivery for a sink, the implementation would look like this:

I think we have already implemented a general framework (the sink coordinator) for sinks, and we need to extend it to sources. You can check with @wenym1 for the details. IIRC, the Iceberg sink already leverages the sink coordinator to coordinate commits from different sink actors in the same fragment.

@StrikeW StrikeW self-assigned this Mar 28, 2024
yuhao-su (Contributor) commented

BTW, this situation is also true for persisted pulsar source with 0 retention.

BugenZhao (Member) commented

Currently our framework doesn't have a checkpoint commit callback mechanism to notify the source executor.

Does StateStore::try_wait_epoch achieve similar functionality? Can we delay the acknowledgement until the try_wait_epoch call returns to resolve this issue?

StrikeW (Contributor) commented Mar 29, 2024

The problem is we need to ack the source offset to upstream database after checkpoint has been committed.

I think we can generalize the sink coordinator framework for this feature.

I revisited the workflow of the sink coordinator, which is independent of the barrier collection workflow (initiated by BarrierManager). The sink executors communicate solely with the sink coordinator. If we want to generalize the sink coordinator framework, I think we would need to integrate it into barrier collection first, which would introduce unnecessary work right now.

Does StateStore::try_wait_epoch achieve similar functionality? Can we delay the acknowledgement until the try_wait_epoch call returns to resolve this issue?

This is a good idea. Right now there is an existing WaitEpochCommit RPC from Meta to CN; we can leverage it and hook into the barrier collection workflow to notify the source. The workflow would be like:

  1. Meta sends WaitEpochCommit to each CN when a barrier is completed.
  2. LocalStreamManager in the CN notifies all source executors via a channel.
  3. The source executor interacts with the CDC connector to ack the offset to upstream.

Btw, the source executor can call StateStore::try_wait_epoch by itself with the current epoch, so it is unnecessary to rely on the RPC.
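
A rough sketch of this option under stated assumptions: WaitEpoch below stands in for whatever wraps StateStore::try_wait_epoch (the real signature may differ), and CdcAck / ack_offset_to_upstream are hypothetical hooks into the CDC connector, not existing APIs.

use anyhow::Result;

/// Hypothetical abstraction over StateStore::try_wait_epoch.
trait WaitEpoch {
    async fn wait_epoch_committed(&self, epoch: u64) -> Result<()>;
}

/// Hypothetical handle to the CDC connector's offset committer.
trait CdcAck {
    async fn ack_offset_to_upstream(&mut self, lsn: &str) -> Result<()>;
}

/// Defer the upstream ack until the checkpoint for `epoch` is committed.
async fn ack_after_checkpoint(
    store: &impl WaitEpoch,
    connector: &mut impl CdcAck,
    epoch: u64,
    lsn: &str, // last consumed PG offset in this epoch
) -> Result<()> {
    // Wait for the epoch to be committed instead of acking right away.
    store.wait_epoch_committed(epoch).await?;
    // Only now may upstream PG safely discard WAL up to `lsn`.
    connector.ack_offset_to_upstream(lsn).await
}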

hzxa21 (Collaborator) commented Mar 29, 2024

I think the workflow can be simplified without RPC:

  1. The source writes the offset to the source state table.
  2. On seeing a checkpoint barrier, the source flushes its state and awaits try_wait_epoch without blocking the stream.
  3. Once try_wait_epoch succeeds, the source acks the offset to upstream.

Note that because upstream receives the ack after the checkpoint has finished, it is possible that upstream will resend already-consumed messages to the source. To ensure exactly-once consumption, the source needs to filter out duplicate messages sent by upstream on recovery. As discussed offline with @StrikeW, the delayed ack won't affect exactly-once consumption, because the acked offset is only used to decide the lowest offset to retain in the binlog, and on recovery the source will seek to the right offset to start consumption based on the source state.
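
Roughly, that recovery-time de-duplication can be a simple filter on the offset restored from the source state table; the CdcEvent type and its lsn field here are illustrative, not the real types.

/// Illustrative change event carrying the upstream offset it came from.
struct CdcEvent {
    lsn: u64,
    payload: Vec<u8>,
}

/// Drop events at or before the offset restored from the source state table,
/// so messages redelivered by upstream after recovery are not emitted twice.
fn filter_redelivered(events: Vec<CdcEvent>, restored_lsn: u64) -> Vec<CdcEvent> {
    events
        .into_iter()
        .filter(|event| event.lsn > restored_lsn)
        .collect()
}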

BugenZhao (Member) commented

I think the workflow can be simplified without RPC:

This is exactly what I mean! One more thing to note: the procedure should be done in a concurrent or asynchronous manner.
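
For instance (a sketch assuming a tokio runtime), the wait-and-ack future can be spawned as a separate task so the executor keeps processing barriers and data while the commit is pending:

/// Run the (hypothetical) wait-and-ack future concurrently with the stream,
/// so awaiting the epoch commit never blocks barrier or data processing.
fn spawn_deferred_ack<F>(wait_and_ack: F)
where
    F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
{
    tokio::spawn(async move {
        if let Err(err) = wait_and_ack.await {
            // In the real executor this would go through proper error handling.
            eprintln!("deferred CDC ack failed: {err}");
        }
    });
}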

StrikeW (Contributor) commented Apr 8, 2024

The coding of PR #16058 is done, but the call to wait_epoch on the local state store does not receive a notification, and barriers pile up. The code is ready for review.

Problem resolved: we should wait for epoch.prev instead of epoch.curr.
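
For reference, a tiny sketch of that fix under the assumption that the checkpoint barrier carries a prev/curr epoch pair as discussed above (the struct and helper here are illustrative):

/// Illustrative epoch pair carried by a checkpoint barrier.
struct EpochPair {
    prev: u64,
    curr: u64,
}

/// Wait on the previous epoch: it is the one being sealed and committed by
/// this barrier, so the commit notification actually arrives. Waiting on
/// `curr` never gets notified, which is why barriers piled up.
fn epoch_to_wait(barrier_epoch: &EpochPair) -> u64 {
    barrier_epoch.prev
}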

tabVersion (Contributor) commented

Hi all, I am revisiting this issue to implement a general ack mechanism in the source. Thanks @xxchan for mentioning this.

My major questions are:

  1. For an MQ with acks, there is a timeout on the staging queue. If a message is not acked within the timeout, it goes back to the delivery queue and is delivered to clients again, so we can receive a message twice in some cases. For example, if an MQ's timeout is 30 minutes and RW is under heavy load at that time, taking 35 minutes to finish one epoch, then all data in that epoch will be delivered twice (failing exactly-once semantics and making future traffic heavier). Is this acceptable, or should we proactively trigger a rollback in advance to keep the promise?
  2. Unlike the CDC-specific case, a general framework would have to retain every message's ack id. Considering a 30-minute sliding time window, we may have to persist millions of records. Shall we introduce a log store for it? (See the sketch after this list.)
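
As a rough illustration of point 2 (all types here are hypothetical), a general framework would have to buffer every pending ack id per epoch and only release them once that epoch's checkpoint commits, which is exactly the state that could grow to millions of records:

use std::collections::{BTreeMap, HashSet};

/// Hypothetical bookkeeping: ack ids grouped by the epoch whose checkpoint
/// must commit before they may be acked back to the MQ.
#[derive(Default)]
struct PendingAcks {
    by_epoch: BTreeMap<u64, HashSet<String>>,
}

impl PendingAcks {
    fn record(&mut self, epoch: u64, ack_id: String) {
        self.by_epoch.entry(epoch).or_default().insert(ack_id);
    }

    /// Drain every ack id belonging to epochs up to and including the
    /// committed one; the caller would then ack them to the MQ.
    fn on_epoch_committed(&mut self, committed_epoch: u64) -> Vec<String> {
        let keep = self.by_epoch.split_off(&(committed_epoch + 1));
        let drained = std::mem::replace(&mut self.by_epoch, keep);
        drained.into_values().flatten().collect()
    }
}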

BTW, this situation is also true for persisted pulsar source with 0 retention.

@yuhao-su can you share more about that case? IIRC, a non-persistent Pulsar topic cannot seek to a specific position, and I think that is expected.

yuhao-su (Contributor) commented

can you share more about that case? IIRC, a non-persistent Pulsar topic cannot seek to a specific position, and I think that is expected.

Replied in the issue.
