Skip to content

Commit

Permalink
Revert "fix(cdc): commit offset to upstream after checkpoint has commit (cherry-pick) (#16298)"
Browse files Browse the repository at this point in the history

This reverts commit 0d53a95.
  • Loading branch information
StrikeW committed Apr 16, 2024
1 parent be1f1e2 commit 0d54f39
Show file tree
Hide file tree
Showing 18 changed files with 75 additions and 398 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,19 @@
import static io.debezium.config.CommonConnectorConfig.TOPIC_PREFIX;
import static io.debezium.schema.AbstractTopicNamingStrategy.*;

import com.risingwave.connector.api.source.CdcEngine;
import com.risingwave.proto.ConnectorServiceProto;
import io.debezium.embedded.Connect;
import io.debezium.engine.DebeziumEngine;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class DbzCdcEngine implements Runnable {
public class DbzCdcEngine implements CdcEngine {
static final int DEFAULT_QUEUE_CAPACITY = 16;

private final DebeziumEngine<?> engine;
private final DbzChangeEventConsumer changeEventConsumer;
private final DbzCdcEventConsumer consumer;
private final long id;

/** If config is not valid will throw exceptions */
Expand All @@ -40,15 +41,15 @@ public DbzCdcEngine(
var topicPrefix = config.getProperty(TOPIC_PREFIX.name());
var transactionTopic = String.format("%s.%s", topicPrefix, DEFAULT_TRANSACTION_TOPIC);
var consumer =
new DbzChangeEventConsumer(
new DbzCdcEventConsumer(
sourceId,
heartbeatTopicPrefix,
transactionTopic,
new ArrayBlockingQueue<>(DEFAULT_QUEUE_CAPACITY));

// Builds a debezium engine but not start it
this.id = sourceId;
this.changeEventConsumer = consumer;
this.consumer = consumer;
this.engine =
DebeziumEngine.create(Connect.class)
.using(config)
Expand All @@ -63,6 +64,7 @@ public void run() {
engine.run();
}

/**
 * Returns the id of this engine, i.e. the source id that was assigned to {@code id} when the
 * engine was constructed.
 */
@Override
public long getId() {
return id;
}
Expand All @@ -71,11 +73,8 @@ public void stop() throws Exception {
engine.close();
}

@Override
public BlockingQueue<ConnectorServiceProto.GetEventStreamResponse> getOutputChannel() {
return changeEventConsumer.getOutputChannel();
}

public DbzChangeEventConsumer getChangeEventConsumer() {
return changeEventConsumer;
return consumer.getOutputChannel();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@
import org.slf4j.LoggerFactory;

/** Single-thread engine runner */
public class DbzCdcEngineRunner {
public class DbzCdcEngineRunner implements CdcEngineRunner {
static final Logger LOG = LoggerFactory.getLogger(DbzCdcEngineRunner.class);

private final ExecutorService executor;
private final AtomicBoolean running = new AtomicBoolean(false);
private DbzCdcEngine engine;
private CdcEngine engine;
private final DbzConnectorConfig config;

public static DbzCdcEngineRunner newCdcEngineRunner(
public static CdcEngineRunner newCdcEngineRunner(
DbzConnectorConfig config, StreamObserver<GetEventStreamResponse> responseObserver) {
DbzCdcEngineRunner runner = null;
try {
Expand Down Expand Up @@ -69,7 +69,7 @@ public static DbzCdcEngineRunner newCdcEngineRunner(
return runner;
}

public static DbzCdcEngineRunner create(DbzConnectorConfig config, long channelPtr) {
public static CdcEngineRunner create(DbzConnectorConfig config, long channelPtr) {
DbzCdcEngineRunner runner = new DbzCdcEngineRunner(config);
try {
var sourceId = config.getSourceId();
Expand Down Expand Up @@ -123,7 +123,7 @@ private DbzCdcEngineRunner(DbzConnectorConfig config) {
this.config = config;
}

private void withEngine(DbzCdcEngine engine) {
private void withEngine(CdcEngine engine) {
this.engine = engine;
}

Expand Down Expand Up @@ -160,18 +160,16 @@ public void stop() throws Exception {
}
}

public DbzCdcEngine getEngine() {
@Override
public CdcEngine getEngine() {
return engine;
}

/** Returns the current value of the {@code running} flag held by this runner. */
@Override
public boolean isRunning() {
return running.get();
}

public DbzChangeEventConsumer getChangeEventConsumer() {
return engine.getChangeEventConsumer();
}

private void cleanUp() {
running.set(false);
// interrupt the runner thread if it is still running
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,13 @@
import com.risingwave.connector.cdc.debezium.internal.DebeziumOffsetSerializer;
import com.risingwave.proto.ConnectorServiceProto.CdcMessage;
import com.risingwave.proto.ConnectorServiceProto.GetEventStreamResponse;
import io.debezium.connector.postgresql.PostgresOffsetContext;
import io.debezium.embedded.EmbeddedEngineChangeEventProxy;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.json.JsonConverterConfig;
Expand All @@ -44,9 +40,9 @@ enum EventType {
DATA,
}

public class DbzChangeEventConsumer
public class DbzCdcEventConsumer
implements DebeziumEngine.ChangeConsumer<ChangeEvent<SourceRecord, SourceRecord>> {
static final Logger LOG = LoggerFactory.getLogger(DbzChangeEventConsumer.class);
static final Logger LOG = LoggerFactory.getLogger(DbzCdcEventConsumer.class);

private final BlockingQueue<GetEventStreamResponse> outputChannel;
private final long sourceId;
Expand All @@ -55,10 +51,7 @@ public class DbzChangeEventConsumer
private final String heartbeatTopicPrefix;
private final String transactionTopic;

private volatile DebeziumEngine.RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>>
currentRecordCommitter;

DbzChangeEventConsumer(
DbzCdcEventConsumer(
long sourceId,
String heartbeatTopicPrefix,
String transactionTopic,
Expand Down Expand Up @@ -115,7 +108,6 @@ public void handleBatch(
DebeziumEngine.RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>> committer)
throws InterruptedException {
var respBuilder = GetEventStreamResponse.newBuilder();
currentRecordCommitter = committer;
for (ChangeEvent<SourceRecord, SourceRecord> event : events) {
var record = event.value();
EventType eventType = getEventType(record);
Expand Down Expand Up @@ -207,6 +199,9 @@ var record = event.value();
default:
break;
}

// mark the event as processed
committer.markProcessed(event);
}

LOG.debug("recv {} events", respBuilder.getEventsCount());
Expand All @@ -216,61 +211,16 @@ var record = event.value();
var response = respBuilder.build();
outputChannel.put(response);
}
}

public BlockingQueue<GetEventStreamResponse> getOutputChannel() {
return this.outputChannel;
committer.markBatchFinished();
}

/**
* Commit the offset to the Debezium engine. NOTES: The input offset is passed from the source
* executor to here
*
* @param offset persisted offset in the Source state table
*/
@SuppressWarnings("unchecked")
public void commitOffset(DebeziumOffset offset) throws InterruptedException {
// Although the committer is read/write by multi-thread, the committer will be not changed
// frequently.
if (currentRecordCommitter == null) {
LOG.info(
"commitOffset() called on Debezium change consumer which doesn't receive records yet.");
return;
}

// only the offset is used
SourceRecord recordWrapper =
new SourceRecord(
offset.sourcePartition,
adjustSourceOffset((Map<String, Object>) offset.sourceOffset),
"DUMMY",
Schema.BOOLEAN_SCHEMA,
true);
ChangeEvent<SourceRecord, SourceRecord> changeEvent =
EmbeddedEngineChangeEventProxy.create(null, recordWrapper, recordWrapper);
currentRecordCommitter.markProcessed(changeEvent);
currentRecordCommitter.markBatchFinished();
/**
 * Delegates to the default implementation declared on {@link DebeziumEngine.ChangeConsumer};
 * this consumer does not customize the answer.
 */
@Override
public boolean supportsTombstoneEvents() {
return DebeziumEngine.ChangeConsumer.super.supportsTombstoneEvents();
}

/**
* We have to adjust type of LSN values to Long, because it might be Integer after
* deserialization, however {@link
* io.debezium.connector.postgresql.PostgresStreamingChangeEventSource#commitOffset(Map, Map)}
* requires Long.
*/
private Map<String, Object> adjustSourceOffset(Map<String, Object> sourceOffset) {
if (sourceOffset.containsKey(PostgresOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY)) {
String value =
sourceOffset
.get(PostgresOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY)
.toString();
sourceOffset.put(
PostgresOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY, Long.parseLong(value));
}
if (sourceOffset.containsKey(PostgresOffsetContext.LAST_COMMIT_LSN_KEY)) {
String value = sourceOffset.get(PostgresOffsetContext.LAST_COMMIT_LSN_KEY).toString();
sourceOffset.put(PostgresOffsetContext.LAST_COMMIT_LSN_KEY, Long.parseLong(value));
}
return sourceOffset;
/**
 * Returns the queue held in {@code outputChannel}, the same queue onto which {@code handleBatch}
 * puts the responses it builds.
 */
public BlockingQueue<GetEventStreamResponse> getOutputChannel() {
return this.outputChannel;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package com.risingwave.connector.source.core;

import com.risingwave.connector.api.source.CdcEngineRunner;
import com.risingwave.connector.api.source.SourceHandler;
import com.risingwave.connector.source.common.DbzConnectorConfig;
import com.risingwave.metrics.ConnectorNodeMetrics;
Expand All @@ -24,7 +25,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** handler for starting a debezium source connectors */
/** * handler for starting a debezium source connectors */
public class DbzSourceHandler implements SourceHandler {
static final Logger LOG = LoggerFactory.getLogger(DbzSourceHandler.class);

Expand All @@ -35,11 +36,11 @@ public DbzSourceHandler(DbzConnectorConfig config) {
}

class OnReadyHandler implements Runnable {
private final DbzCdcEngineRunner runner;
private final CdcEngineRunner runner;
private final ServerCallStreamObserver<GetEventStreamResponse> responseObserver;

public OnReadyHandler(
DbzCdcEngineRunner runner,
CdcEngineRunner runner,
ServerCallStreamObserver<GetEventStreamResponse> responseObserver) {
this.runner = runner;
this.responseObserver = responseObserver;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,40 +16,28 @@

import static com.risingwave.proto.ConnectorServiceProto.SourceType.POSTGRES;

import com.risingwave.connector.api.source.CdcEngineRunner;
import com.risingwave.connector.api.source.SourceTypeE;
import com.risingwave.connector.cdc.debezium.internal.DebeziumOffset;
import com.risingwave.connector.cdc.debezium.internal.DebeziumOffsetSerializer;
import com.risingwave.connector.source.common.CdcConnectorException;
import com.risingwave.connector.source.common.DbzConnectorConfig;
import com.risingwave.connector.source.common.DbzSourceUtils;
import com.risingwave.java.binding.Binding;
import com.risingwave.metrics.ConnectorNodeMetrics;
import com.risingwave.proto.ConnectorServiceProto;
import com.risingwave.proto.ConnectorServiceProto.GetEventStreamResponse;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** handler for starting a debezium source connectors for jni */

/** handler for starting a debezium source connectors for jni */
public class JniDbzSourceHandler {
static final Logger LOG = LoggerFactory.getLogger(JniDbzSourceHandler.class);

private final DbzConnectorConfig config;
private final DbzCdcEngineRunner runner;

public JniDbzSourceHandler(DbzConnectorConfig config, long channelPtr) {
public JniDbzSourceHandler(DbzConnectorConfig config) {
this.config = config;
this.runner = DbzCdcEngineRunner.create(config, channelPtr);

if (runner == null) {
throw new CdcConnectorException("Failed to create engine runner");
}
}

public static void runJniDbzSourceThread(byte[] getEventStreamRequestBytes, long channelPtr)
Expand Down Expand Up @@ -78,31 +66,15 @@ public static void runJniDbzSourceThread(byte[] getEventStreamRequestBytes, long
mutableUserProps,
request.getSnapshotDone(),
isCdcSourceJob);
JniDbzSourceHandler handler = new JniDbzSourceHandler(config, channelPtr);
// register handler to the registry
JniDbzSourceRegistry.register(config.getSourceId(), handler);
JniDbzSourceHandler handler = new JniDbzSourceHandler(config);
handler.start(channelPtr);
}

public void commitOffset(String encodedOffset) throws InterruptedException {
try {
DebeziumOffset offset =
DebeziumOffsetSerializer.INSTANCE.deserialize(
encodedOffset.getBytes(StandardCharsets.UTF_8));
var changeEventConsumer = runner.getChangeEventConsumer();
if (changeEventConsumer != null) {
changeEventConsumer.commitOffset(offset);
LOG.info("Engine#{}: committed offset {}", config.getSourceId(), offset);
} else {
LOG.warn("Engine#{}: changeEventConsumer is null", config.getSourceId());
}
} catch (IOException err) {
LOG.error("Engine#{}: fail to commit offset.", config.getSourceId(), err);
throw new CdcConnectorException(err.getMessage());
}
}

public void start(long channelPtr) {
var runner = DbzCdcEngineRunner.create(config, channelPtr);
if (runner == null) {
return;
}

try {
// Start the engine
Expand All @@ -111,8 +83,6 @@ public void start(long channelPtr) {
LOG.error(
"Failed to send handshake message to channel. sourceId={}",
config.getSourceId());
// remove the handler from registry
JniDbzSourceRegistry.unregister(config.getSourceId());
return;
}

Expand Down Expand Up @@ -152,13 +122,10 @@ public void start(long channelPtr) {
LOG.warn("Failed to stop Engine#{}", config.getSourceId(), e);
}
}

// remove the handler from registry
JniDbzSourceRegistry.unregister(config.getSourceId());
}

private boolean sendHandshakeMessage(
DbzCdcEngineRunner runner, long channelPtr, boolean startOk) throws Exception {
private boolean sendHandshakeMessage(CdcEngineRunner runner, long channelPtr, boolean startOk)
throws Exception {
// send a handshake message to notify the Source executor
// if the handshake is not ok, the split reader will return error to source actor
var controlInfo =
Expand Down
Loading

0 comments on commit 0d54f39

Please sign in to comment.