Skip to content

Commit

Permalink
fix(cdc): commit offset to upstream after checkpoint has commit (cher…
Browse files Browse the repository at this point in the history
…ry-pick) (#16298)
  • Loading branch information
StrikeW authored Apr 15, 2024
1 parent 4a52ef4 commit 0d53a95
Show file tree
Hide file tree
Showing 18 changed files with 398 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,18 @@
import static io.debezium.config.CommonConnectorConfig.TOPIC_PREFIX;
import static io.debezium.schema.AbstractTopicNamingStrategy.*;

import com.risingwave.connector.api.source.CdcEngine;
import com.risingwave.proto.ConnectorServiceProto;
import io.debezium.embedded.Connect;
import io.debezium.engine.DebeziumEngine;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class DbzCdcEngine implements CdcEngine {
public class DbzCdcEngine implements Runnable {
static final int DEFAULT_QUEUE_CAPACITY = 16;

private final DebeziumEngine<?> engine;
private final DbzCdcEventConsumer consumer;
private final DbzChangeEventConsumer changeEventConsumer;
private final long id;

/** If config is not valid will throw exceptions */
Expand All @@ -41,15 +40,15 @@ public DbzCdcEngine(
var topicPrefix = config.getProperty(TOPIC_PREFIX.name());
var transactionTopic = String.format("%s.%s", topicPrefix, DEFAULT_TRANSACTION_TOPIC);
var consumer =
new DbzCdcEventConsumer(
new DbzChangeEventConsumer(
sourceId,
heartbeatTopicPrefix,
transactionTopic,
new ArrayBlockingQueue<>(DEFAULT_QUEUE_CAPACITY));

// Builds a debezium engine but not start it
this.id = sourceId;
this.consumer = consumer;
this.changeEventConsumer = consumer;
this.engine =
DebeziumEngine.create(Connect.class)
.using(config)
Expand All @@ -64,7 +63,6 @@ public void run() {
engine.run();
}

@Override
public long getId() {
return id;
}
Expand All @@ -73,8 +71,11 @@ public void stop() throws Exception {
engine.close();
}

@Override
public BlockingQueue<ConnectorServiceProto.GetEventStreamResponse> getOutputChannel() {
return consumer.getOutputChannel();
return changeEventConsumer.getOutputChannel();
}

public DbzChangeEventConsumer getChangeEventConsumer() {
return changeEventConsumer;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@
import org.slf4j.LoggerFactory;

/** Single-thread engine runner */
public class DbzCdcEngineRunner implements CdcEngineRunner {
public class DbzCdcEngineRunner {
static final Logger LOG = LoggerFactory.getLogger(DbzCdcEngineRunner.class);

private final ExecutorService executor;
private final AtomicBoolean running = new AtomicBoolean(false);
private CdcEngine engine;
private DbzCdcEngine engine;
private final DbzConnectorConfig config;

public static CdcEngineRunner newCdcEngineRunner(
public static DbzCdcEngineRunner newCdcEngineRunner(
DbzConnectorConfig config, StreamObserver<GetEventStreamResponse> responseObserver) {
DbzCdcEngineRunner runner = null;
try {
Expand Down Expand Up @@ -69,7 +69,7 @@ public static CdcEngineRunner newCdcEngineRunner(
return runner;
}

public static CdcEngineRunner create(DbzConnectorConfig config, long channelPtr) {
public static DbzCdcEngineRunner create(DbzConnectorConfig config, long channelPtr) {
DbzCdcEngineRunner runner = new DbzCdcEngineRunner(config);
try {
var sourceId = config.getSourceId();
Expand Down Expand Up @@ -123,7 +123,7 @@ private DbzCdcEngineRunner(DbzConnectorConfig config) {
this.config = config;
}

private void withEngine(CdcEngine engine) {
private void withEngine(DbzCdcEngine engine) {
this.engine = engine;
}

Expand Down Expand Up @@ -160,16 +160,18 @@ public void stop() throws Exception {
}
}

@Override
public CdcEngine getEngine() {
public DbzCdcEngine getEngine() {
return engine;
}

@Override
public boolean isRunning() {
return running.get();
}

public DbzChangeEventConsumer getChangeEventConsumer() {
return engine.getChangeEventConsumer();
}

private void cleanUp() {
running.set(false);
// interrupt the runner thread if it is still running
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,17 @@
import com.risingwave.connector.cdc.debezium.internal.DebeziumOffsetSerializer;
import com.risingwave.proto.ConnectorServiceProto.CdcMessage;
import com.risingwave.proto.ConnectorServiceProto.GetEventStreamResponse;
import io.debezium.connector.postgresql.PostgresOffsetContext;
import io.debezium.embedded.EmbeddedEngineChangeEventProxy;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.json.JsonConverterConfig;
Expand All @@ -40,9 +44,9 @@ enum EventType {
DATA,
}

public class DbzCdcEventConsumer
public class DbzChangeEventConsumer
implements DebeziumEngine.ChangeConsumer<ChangeEvent<SourceRecord, SourceRecord>> {
static final Logger LOG = LoggerFactory.getLogger(DbzCdcEventConsumer.class);
static final Logger LOG = LoggerFactory.getLogger(DbzChangeEventConsumer.class);

private final BlockingQueue<GetEventStreamResponse> outputChannel;
private final long sourceId;
Expand All @@ -51,7 +55,10 @@ public class DbzCdcEventConsumer
private final String heartbeatTopicPrefix;
private final String transactionTopic;

DbzCdcEventConsumer(
private volatile DebeziumEngine.RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>>
currentRecordCommitter;

DbzChangeEventConsumer(
long sourceId,
String heartbeatTopicPrefix,
String transactionTopic,
Expand Down Expand Up @@ -108,6 +115,7 @@ public void handleBatch(
DebeziumEngine.RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>> committer)
throws InterruptedException {
var respBuilder = GetEventStreamResponse.newBuilder();
currentRecordCommitter = committer;
for (ChangeEvent<SourceRecord, SourceRecord> event : events) {
var record = event.value();
EventType eventType = getEventType(record);
Expand Down Expand Up @@ -199,9 +207,6 @@ var record = event.value();
default:
break;
}

// mark the event as processed
committer.markProcessed(event);
}

LOG.debug("recv {} events", respBuilder.getEventsCount());
Expand All @@ -211,16 +216,61 @@ var record = event.value();
var response = respBuilder.build();
outputChannel.put(response);
}
}

committer.markBatchFinished();
public BlockingQueue<GetEventStreamResponse> getOutputChannel() {
return this.outputChannel;
}

@Override
public boolean supportsTombstoneEvents() {
return DebeziumEngine.ChangeConsumer.super.supportsTombstoneEvents();
/**
* Commit the offset to the Debezium engine. NOTES: The input offset is passed from the source
* executor to here
*
* @param offset persisted offset in the Source state table
*/
@SuppressWarnings("unchecked")
public void commitOffset(DebeziumOffset offset) throws InterruptedException {
// Although the committer is read/write by multi-thread, the committer will be not changed
// frequently.
if (currentRecordCommitter == null) {
LOG.info(
"commitOffset() called on Debezium change consumer which doesn't receive records yet.");
return;
}

// only the offset is used
SourceRecord recordWrapper =
new SourceRecord(
offset.sourcePartition,
adjustSourceOffset((Map<String, Object>) offset.sourceOffset),
"DUMMY",
Schema.BOOLEAN_SCHEMA,
true);
ChangeEvent<SourceRecord, SourceRecord> changeEvent =
EmbeddedEngineChangeEventProxy.create(null, recordWrapper, recordWrapper);
currentRecordCommitter.markProcessed(changeEvent);
currentRecordCommitter.markBatchFinished();
}

public BlockingQueue<GetEventStreamResponse> getOutputChannel() {
return this.outputChannel;
/**
* We have to adjust type of LSN values to Long, because it might be Integer after
* deserialization, however {@link
* io.debezium.connector.postgresql.PostgresStreamingChangeEventSource#commitOffset(Map, Map)}
* requires Long.
*/
private Map<String, Object> adjustSourceOffset(Map<String, Object> sourceOffset) {
if (sourceOffset.containsKey(PostgresOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY)) {
String value =
sourceOffset
.get(PostgresOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY)
.toString();
sourceOffset.put(
PostgresOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY, Long.parseLong(value));
}
if (sourceOffset.containsKey(PostgresOffsetContext.LAST_COMMIT_LSN_KEY)) {
String value = sourceOffset.get(PostgresOffsetContext.LAST_COMMIT_LSN_KEY).toString();
sourceOffset.put(PostgresOffsetContext.LAST_COMMIT_LSN_KEY, Long.parseLong(value));
}
return sourceOffset;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

package com.risingwave.connector.source.core;

import com.risingwave.connector.api.source.CdcEngineRunner;
import com.risingwave.connector.api.source.SourceHandler;
import com.risingwave.connector.source.common.DbzConnectorConfig;
import com.risingwave.metrics.ConnectorNodeMetrics;
Expand All @@ -25,7 +24,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** * handler for starting a debezium source connectors */
/** handler for starting a debezium source connectors */
public class DbzSourceHandler implements SourceHandler {
static final Logger LOG = LoggerFactory.getLogger(DbzSourceHandler.class);

Expand All @@ -36,11 +35,11 @@ public DbzSourceHandler(DbzConnectorConfig config) {
}

class OnReadyHandler implements Runnable {
private final CdcEngineRunner runner;
private final DbzCdcEngineRunner runner;
private final ServerCallStreamObserver<GetEventStreamResponse> responseObserver;

public OnReadyHandler(
CdcEngineRunner runner,
DbzCdcEngineRunner runner,
ServerCallStreamObserver<GetEventStreamResponse> responseObserver) {
this.runner = runner;
this.responseObserver = responseObserver;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,40 @@

import static com.risingwave.proto.ConnectorServiceProto.SourceType.POSTGRES;

import com.risingwave.connector.api.source.CdcEngineRunner;
import com.risingwave.connector.api.source.SourceTypeE;
import com.risingwave.connector.cdc.debezium.internal.DebeziumOffset;
import com.risingwave.connector.cdc.debezium.internal.DebeziumOffsetSerializer;
import com.risingwave.connector.source.common.CdcConnectorException;
import com.risingwave.connector.source.common.DbzConnectorConfig;
import com.risingwave.connector.source.common.DbzSourceUtils;
import com.risingwave.java.binding.Binding;
import com.risingwave.metrics.ConnectorNodeMetrics;
import com.risingwave.proto.ConnectorServiceProto;
import com.risingwave.proto.ConnectorServiceProto.GetEventStreamResponse;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** handler for starting a debezium source connectors for jni */

/** handler for starting a debezium source connectors for jni */
public class JniDbzSourceHandler {
static final Logger LOG = LoggerFactory.getLogger(JniDbzSourceHandler.class);

private final DbzConnectorConfig config;
private final DbzCdcEngineRunner runner;

public JniDbzSourceHandler(DbzConnectorConfig config) {
public JniDbzSourceHandler(DbzConnectorConfig config, long channelPtr) {
this.config = config;
this.runner = DbzCdcEngineRunner.create(config, channelPtr);

if (runner == null) {
throw new CdcConnectorException("Failed to create engine runner");
}
}

public static void runJniDbzSourceThread(byte[] getEventStreamRequestBytes, long channelPtr)
Expand Down Expand Up @@ -66,15 +78,31 @@ public static void runJniDbzSourceThread(byte[] getEventStreamRequestBytes, long
mutableUserProps,
request.getSnapshotDone(),
isCdcSourceJob);
JniDbzSourceHandler handler = new JniDbzSourceHandler(config);
JniDbzSourceHandler handler = new JniDbzSourceHandler(config, channelPtr);
// register handler to the registry
JniDbzSourceRegistry.register(config.getSourceId(), handler);
handler.start(channelPtr);
}

public void start(long channelPtr) {
var runner = DbzCdcEngineRunner.create(config, channelPtr);
if (runner == null) {
return;
public void commitOffset(String encodedOffset) throws InterruptedException {
try {
DebeziumOffset offset =
DebeziumOffsetSerializer.INSTANCE.deserialize(
encodedOffset.getBytes(StandardCharsets.UTF_8));
var changeEventConsumer = runner.getChangeEventConsumer();
if (changeEventConsumer != null) {
changeEventConsumer.commitOffset(offset);
LOG.info("Engine#{}: committed offset {}", config.getSourceId(), offset);
} else {
LOG.warn("Engine#{}: changeEventConsumer is null", config.getSourceId());
}
} catch (IOException err) {
LOG.error("Engine#{}: fail to commit offset.", config.getSourceId(), err);
throw new CdcConnectorException(err.getMessage());
}
}

public void start(long channelPtr) {

try {
// Start the engine
Expand All @@ -83,6 +111,8 @@ public void start(long channelPtr) {
LOG.error(
"Failed to send handshake message to channel. sourceId={}",
config.getSourceId());
// remove the handler from registry
JniDbzSourceRegistry.unregister(config.getSourceId());
return;
}

Expand Down Expand Up @@ -122,10 +152,13 @@ public void start(long channelPtr) {
LOG.warn("Failed to stop Engine#{}", config.getSourceId(), e);
}
}

// remove the handler from registry
JniDbzSourceRegistry.unregister(config.getSourceId());
}

private boolean sendHandshakeMessage(CdcEngineRunner runner, long channelPtr, boolean startOk)
throws Exception {
private boolean sendHandshakeMessage(
DbzCdcEngineRunner runner, long channelPtr, boolean startOk) throws Exception {
// send a handshake message to notify the Source executor
// if the handshake is not ok, the split reader will return error to source actor
var controlInfo =
Expand Down
Loading

0 comments on commit 0d53a95

Please sign in to comment.