Skip to content

Commit

Permalink
fix(cdc-backfill): ensure snapshot read starts after source (#13663)
Browse files Browse the repository at this point in the history
  • Loading branch information
StrikeW authored Dec 1, 2023
1 parent fce66c0 commit 7677abc
Show file tree
Hide file tree
Showing 12 changed files with 227 additions and 87 deletions.
4 changes: 2 additions & 2 deletions e2e_test/source/cdc/cdc.check_new_rows.slt
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,12 @@ select v1, v2, v3 from mytable order by v1;
query I
SELECT * from products_test_cnt
----
12
13

query I
SELECT * from orders_test_cnt
----
4
5

query ITT
SELECT * FROM products_test order by id limit 3
Expand Down
10 changes: 7 additions & 3 deletions e2e_test/source/cdc/cdc.share_stream.slt
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ create table products_test ( id INT,
PRIMARY KEY (id)
) from mysql_mytest table 'mytest.products';

system ok
mysql --protocol=tcp -u root mytest -e "INSERT INTO products VALUES (default,'Milk','Milk is a white liquid food');
INSERT INTO orders VALUES (default, '2023-11-28 15:08:22', 'Bob', 10.52, 100, false);"

# check the fragment distribution
query TT
select distribution_type,flags from rw_fragments order by fragment_id;
Expand Down Expand Up @@ -70,20 +74,20 @@ statement ok
create materialized view orders_test_cnt as select count(*) as cnt from orders_test;

system ok
mysql --protocol=tcp -u root mytest -e "INSERT INTO products VALUES(default, 'Milk', '100ml Milk');"
mysql --protocol=tcp -u root mytest -e "INSERT INTO products VALUES(default, 'Juice', '100ml Juice');"

sleep 5s

# check ingestion results
query I
SELECT * from products_test_cnt
----
10
11

query I
SELECT * from orders_test_cnt
----
3
4

query ITT
SELECT * FROM products_test order by id limit 3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
package com.risingwave.connector.api.source;

public interface CdcEngineRunner {
void start() throws Exception;
boolean start() throws Exception;

void stop() throws Exception;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,9 @@ private static Map<String, String> extractDebeziumProperties(
}

private final long sourceId;

private final SourceTypeE sourceType;

private final Properties resolvedDbzProps;
private final boolean isBackfillSource;

public long getSourceId() {
return sourceId;
Expand All @@ -96,6 +95,10 @@ public Properties getResolvedDebeziumProps() {
return resolvedDbzProps;
}

public boolean isBackfillSource() {
return isBackfillSource;
}

public DbzConnectorConfig(
SourceTypeE source,
long sourceId,
Expand Down Expand Up @@ -190,6 +193,7 @@ public DbzConnectorConfig(
this.sourceId = sourceId;
this.sourceType = source;
this.resolvedDbzProps = dbzProps;
this.isBackfillSource = isCdcBackfill;
}

private void adjustConfigForSharedCdcStream(Properties dbzProps) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.risingwave.connector.source.common;

import com.risingwave.connector.api.source.SourceTypeE;
import java.lang.management.ManagementFactory;
import java.util.concurrent.TimeUnit;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DbzSourceUtils {
static final Logger LOG = LoggerFactory.getLogger(DbzSourceUtils.class);

public static boolean waitForStreamingRunning(SourceTypeE sourceType, String dbServerName) {
// Right now, we only needs to wait for MySQL source, as it's the only source that support
// backfill. After we support backfill for other sources, we need to wait for all sources
// too
if (sourceType == SourceTypeE.MYSQL) {
LOG.info("Waiting for streaming source of {} to start", dbServerName);
return waitForStreamingRunningInner("mysql", dbServerName);
} else {
LOG.info("Unsupported backfill source, just return true for {}", dbServerName);
return true;
}
}

private static boolean waitForStreamingRunningInner(String connector, String dbServerName) {
int maxPollCount = 10;
while (!isStreamingRunning(connector, dbServerName, "streaming")) {
maxPollCount--;
if (maxPollCount == 0) {
LOG.error("Debezium streaming source of {} failed to start", dbServerName);
return false;
}
try {
TimeUnit.SECONDS.sleep(1); // poll interval
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting for streaming source to start", e);
}
}

LOG.info("Debezium streaming source of {} started", dbServerName);
return true;
}

// Copy from debezium test suite: io.debezium.embedded.AbstractConnectorTest
// Notes: although this method is recommended by the community
// (https://debezium.zulipchat.com/#narrow/stream/302529-community-general/topic/.E2.9C.94.20Embedded.20engine.20has.20started.20StreamingChangeEventSource/near/405121659),
// but it is not solid enough. As the jmx bean metric is marked as true before starting the
// binlog client, which may fail to connect the upstream database.
private static boolean isStreamingRunning(String connector, String server, String contextName) {
final MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
try {
return (boolean)
mbeanServer.getAttribute(
getStreamingMetricsObjectName(connector, server, contextName),
"Connected");
} catch (JMException ex) {
LOG.warn("Failed to get streaming metrics", ex);
}
return false;
}

private static ObjectName getStreamingMetricsObjectName(
String connector, String server, String context) throws MalformedObjectNameException {
return new ObjectName(
"debezium."
+ connector
+ ":type=connector-metrics,context="
+ context
+ ",server="
+ server);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,24 @@ public class DbzCdcEngine implements CdcEngine {
private final long id;

/** If config is not valid will throw exceptions */
public DbzCdcEngine(long id, Properties config, DebeziumEngine.CompletionCallback callback) {
public DbzCdcEngine(
long sourceId,
Properties config,
DebeziumEngine.CompletionCallback completionCallback) {
var dbzHeartbeatPrefix = config.getProperty(Heartbeat.HEARTBEAT_TOPICS_PREFIX.name());
var consumer =
new DbzCdcEventConsumer(
id, dbzHeartbeatPrefix, new ArrayBlockingQueue<>(DEFAULT_QUEUE_CAPACITY));
sourceId,
dbzHeartbeatPrefix,
new ArrayBlockingQueue<>(DEFAULT_QUEUE_CAPACITY));

// Builds a debezium engine but not start it
this.id = id;
this.id = sourceId;
this.consumer = consumer;
this.engine =
DebeziumEngine.create(Connect.class)
.using(config)
.using(callback)
.using(completionCallback)
.notifying(consumer)
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import com.risingwave.connector.api.source.*;
import com.risingwave.connector.source.common.DbzConnectorConfig;
import com.risingwave.connector.source.common.DbzSourceUtils;
import com.risingwave.proto.ConnectorServiceProto.GetEventStreamResponse;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.ExecutorService;
Expand All @@ -31,13 +32,7 @@ public class DbzCdcEngineRunner implements CdcEngineRunner {
private final ExecutorService executor;
private final AtomicBoolean running = new AtomicBoolean(false);
private final CdcEngine engine;

private DbzCdcEngineRunner(CdcEngine engine) {
this.executor =
Executors.newSingleThreadExecutor(
r -> new Thread(r, "rw-dbz-engine-runner-" + engine.getId()));
this.engine = engine;
}
private final DbzConnectorConfig config;

public static CdcEngineRunner newCdcEngineRunner(
DbzConnectorConfig config, StreamObserver<GetEventStreamResponse> responseObserver) {
Expand All @@ -62,14 +57,14 @@ public static CdcEngineRunner newCdcEngineRunner(
}
});

runner = new DbzCdcEngineRunner(engine);
runner = new DbzCdcEngineRunner(engine, config);
} catch (Exception e) {
LOG.error("failed to create the CDC engine", e);
}
return runner;
}

public static CdcEngineRunner newCdcEngineRunner(DbzConnectorConfig config) {
public static CdcEngineRunner create(DbzConnectorConfig config) {
DbzCdcEngineRunner runner = null;
try {
var sourceId = config.getSourceId();
Expand All @@ -89,32 +84,44 @@ public static CdcEngineRunner newCdcEngineRunner(DbzConnectorConfig config) {
}
});

runner = new DbzCdcEngineRunner(engine);
runner = new DbzCdcEngineRunner(engine, config);
} catch (Exception e) {
LOG.error("failed to create the CDC engine", e);
}
return runner;
}

// private constructor
private DbzCdcEngineRunner(CdcEngine engine, DbzConnectorConfig config) {
this.executor =
Executors.newSingleThreadExecutor(
r -> new Thread(r, "rw-dbz-engine-runner-" + engine.getId()));
this.engine = engine;
this.config = config;
}

/** Start to run the cdc engine */
public void start() throws InterruptedException {
public boolean start() throws InterruptedException {
if (isRunning()) {
LOG.info("engine#{} already started", engine.getId());
return;
return true;
}

// put a handshake message to notify the Source executor
var controlInfo =
GetEventStreamResponse.ControlInfo.newBuilder().setHandshakeOk(true).build();
engine.getOutputChannel()
.put(
GetEventStreamResponse.newBuilder()
.setSourceId(engine.getId())
.setControl(controlInfo)
.build());
executor.execute(engine);

boolean startOk = true;
// For backfill source, we need to wait for the streaming source to start before proceeding
if (config.isBackfillSource()) {
var databaseServerName =
config.getResolvedDebeziumProps().getProperty("database.server.name");
startOk =
DbzSourceUtils.waitForStreamingRunning(
config.getSourceType(), databaseServerName);
}

running.set(true);
LOG.info("engine#{} started", engine.getId());
return startOk;
}

public void stop() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@

package com.risingwave.connector.source.core;

import com.risingwave.connector.api.source.CdcEngineRunner;
import com.risingwave.connector.api.source.SourceTypeE;
import com.risingwave.connector.source.common.DbzConnectorConfig;
import com.risingwave.java.binding.Binding;
import com.risingwave.metrics.ConnectorNodeMetrics;
import com.risingwave.proto.ConnectorServiceProto;
import com.risingwave.proto.ConnectorServiceProto.GetEventStreamResponse;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
Expand All @@ -27,7 +29,7 @@

/** handler for starting a debezium source connectors for jni */
public class JniDbzSourceHandler {
static final Logger LOG = LoggerFactory.getLogger(DbzSourceHandler.class);
static final Logger LOG = LoggerFactory.getLogger(JniDbzSourceHandler.class);

private final DbzConnectorConfig config;

Expand Down Expand Up @@ -61,16 +63,22 @@ public static void runJniDbzSourceThread(byte[] getEventStreamRequestBytes, long
}

public void start(long channelPtr) {
var runner = DbzCdcEngineRunner.newCdcEngineRunner(config);
var runner = DbzCdcEngineRunner.create(config);
if (runner == null) {
return;
}

try {
// Start the engine
runner.start();
LOG.info("Start consuming events of table {}", config.getSourceId());
var startOk = runner.start();
if (!sendHandshakeMessage(runner, channelPtr, startOk)) {
LOG.error(
"Failed to send handshake message to channel. sourceId={}",
config.getSourceId());
return;
}

LOG.info("Start consuming events of table {}", config.getSourceId());
while (runner.isRunning()) {
// check whether the send queue has room for new messages
// Thread will block on the channel to get output from engine
Expand Down Expand Up @@ -107,4 +115,25 @@ public void start(long channelPtr) {
}
}
}

private boolean sendHandshakeMessage(CdcEngineRunner runner, long channelPtr, boolean startOk)
throws Exception {
// send a handshake message to notify the Source executor
// if the handshake is not ok, the split reader will return error to source actor
var controlInfo =
GetEventStreamResponse.ControlInfo.newBuilder().setHandshakeOk(startOk).build();

var handshakeMsg =
GetEventStreamResponse.newBuilder()
.setSourceId(config.getSourceId())
.setControl(controlInfo)
.build();
var success = Binding.sendCdcSourceMsgToChannel(channelPtr, handshakeMsg.toByteArray());
if (!success) {
LOG.info(
"Engine#{}: JNI sender broken detected, stop the engine", config.getSourceId());
runner.stop();
}
return success;
}
}
6 changes: 5 additions & 1 deletion src/connector/src/source/cdc/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,11 +143,15 @@ impl<T: CdcSourceTypeTrait> SplitReader for CdcSplitReader<T> {
}
});

// wait for the handshake message
if let Some(res) = rx.recv().await {
let resp: GetEventStreamResponse = res?;
let inited = match resp.control {
Some(info) => info.handshake_ok,
None => false,
None => {
tracing::error!(?source_id, "handshake message not received. {:?}", resp);
false
}
};
if !inited {
return Err(anyhow!("failed to start cdc connector"));
Expand Down
Loading

0 comments on commit 7677abc

Please sign in to comment.