Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cdc): Add MongoDB CDC Source #14966

Merged
merged 27 commits into from
Feb 29, 2024
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public enum SourceTypeE {
MYSQL,
POSTGRES,
CITUS,
MONGODB,
INVALID;

public static SourceTypeE valueOf(ConnectorServiceProto.SourceType type) {
Expand All @@ -30,6 +31,8 @@ public static SourceTypeE valueOf(ConnectorServiceProto.SourceType type) {
return SourceTypeE.POSTGRES;
case CITUS:
return SourceTypeE.CITUS;
case MONGODB:
return SourceTypeE.MONGODB;
default:
return SourceTypeE.INVALID;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,33 +62,35 @@ public static ConnectorServiceProto.ValidateSourceResponse validateResponse(Stri
.build();
}

public static void ensurePropNotBlank(Map<String, String> props, String name) {
/**
 * Ensures the given key is present with a non-blank value in the user-provided
 * WITH properties; otherwise reports an invalid-argument error to the caller.
 *
 * @param props user-provided source properties (from the WITH clause)
 * @param name key of the required property
 */
private static void ensurePropNotBlank(Map<String, String> props, String name) {
    var value = props.get(name);
    if (StringUtils.isBlank(value)) {
        throw ValidatorUtils.invalidArgument(
                String.format("'%s' not found, please check the WITH properties", name));
    }
}

public static void validateSource(ConnectorServiceProto.ValidateSourceRequest request)
throws Exception {
var props = request.getPropertiesMap();

static void ensureRequiredProps(Map<String, String> props, boolean isMultiTableShared) {
ensurePropNotBlank(props, DbzConnectorConfig.HOST);
ensurePropNotBlank(props, DbzConnectorConfig.PORT);
ensurePropNotBlank(props, DbzConnectorConfig.DB_NAME);
ensurePropNotBlank(props, DbzConnectorConfig.USER);
ensurePropNotBlank(props, DbzConnectorConfig.PASSWORD);

var commonParam = request.getCommonParam();
boolean isMultiTableShared = commonParam.getIsMultiTableShared();
// ensure table name is passed by user in non-sharing mode
if (!isMultiTableShared) {
ensurePropNotBlank(props, DbzConnectorConfig.TABLE_NAME);
}
}

public static void validateSource(ConnectorServiceProto.ValidateSourceRequest request)
throws Exception {
var props = request.getPropertiesMap();
var commonParam = request.getCommonParam();
boolean isMultiTableShared = commonParam.getIsMultiTableShared();

TableSchema tableSchema = TableSchema.fromProto(request.getTableSchema());
switch (request.getSourceType()) {
case POSTGRES:
ensureRequiredProps(props, isMultiTableShared);
ensurePropNotBlank(props, DbzConnectorConfig.PG_SCHEMA_NAME);
ensurePropNotBlank(props, DbzConnectorConfig.PG_SLOT_NAME);
ensurePropNotBlank(props, DbzConnectorConfig.PG_PUB_NAME);
Expand All @@ -100,6 +102,7 @@ public static void validateSource(ConnectorServiceProto.ValidateSourceRequest re
break;

case CITUS:
ensureRequiredProps(props, isMultiTableShared);
ensurePropNotBlank(props, DbzConnectorConfig.TABLE_NAME);
ensurePropNotBlank(props, DbzConnectorConfig.PG_SCHEMA_NAME);
try (var coordinatorValidator = new CitusValidator(props, tableSchema)) {
Expand Down Expand Up @@ -128,11 +131,17 @@ public static void validateSource(ConnectorServiceProto.ValidateSourceRequest re

break;
case MYSQL:
ensureRequiredProps(props, isMultiTableShared);
ensurePropNotBlank(props, DbzConnectorConfig.MYSQL_SERVER_ID);
try (var validator = new MySqlValidator(props, tableSchema)) {
validator.validateAll(isMultiTableShared);
}
break;
case MONGODB:
ensurePropNotBlank(props, DbzConnectorConfig.MongoDb.MONGO_URL);
ensurePropNotBlank(props, DbzConnectorConfig.MongoDb.MONGO_COLLECTION_NAME);
// TODO: validate mongodb connectivity and replica set config
break;
default:
LOG.warn("Unknown source type");
throw ValidatorUtils.invalidArgument("Unknown source type");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,18 @@ public class DbzConnectorConfig {
private static final String DBZ_CONFIG_FILE = "debezium.properties";
private static final String MYSQL_CONFIG_FILE = "mysql.properties";
private static final String POSTGRES_CONFIG_FILE = "postgres.properties";
private static final String MONGODB_CONFIG_FILE = "mongodb.properties";

private static final String DBZ_PROPERTY_PREFIX = "debezium.";

private static final String SNAPSHOT_MODE_KEY = "debezium.snapshot.mode";
private static final String SNAPSHOT_MODE_BACKFILL = "rw_cdc_backfill";

public static class MongoDb {
public static final String MONGO_URL = "mongodb.url";
public static final String MONGO_COLLECTION_NAME = "collection.name";
Comment on lines +73 to +74
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please also add some auth related options and try to test with a vendor

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

username and password can be provided via the connection string (mongodb.url) if needed.

}

private static Map<String, String> extractDebeziumProperties(
Map<String, String> userProperties) {
// retain only debezium properties if any
Expand Down Expand Up @@ -217,6 +223,18 @@ public DbzConnectorConfig(
ConfigurableOffsetBackingStore.OFFSET_STATE_VALUE, startOffset);
}
dbzProps.putAll(postgresProps);
} else if (source == SourceTypeE.MONGODB) {
var mongodbProps = initiateDbConfig(MONGODB_CONFIG_FILE, substitutor);

// if snapshot phase is finished and offset is specified, we will continue reading
// changes from the given offset
if (snapshotDone && null != startOffset && !startOffset.isBlank()) {
mongodbProps.setProperty("snapshot.mode", "never");
mongodbProps.setProperty(
ConfigurableOffsetBackingStore.OFFSET_STATE_VALUE, startOffset);
}
dbzProps.putAll(mongodbProps);

} else {
throw new RuntimeException("unsupported source type: " + source);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,14 @@ public static CdcEngineRunner newCdcEngineRunner(
config.getResolvedDebeziumProps(),
(success, message, error) -> {
if (!success) {
responseObserver.onError(error);
LOG.error(
"engine#{} terminated with error. message: {}",
sourceId,
message,
error);
if (error != null) {
responseObserver.onError(error);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any motivation for this change? In what situation will it failed with error == null?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be defensive against null pointers, and to match the implementation of the following create(DbzConnectorConfig config, long channelPtr) method.
Actually, newCdcEngineRunner is only used in unit tests right now, since we replaced it with the embedded JVM.

} else {
LOG.info("engine#{} stopped normally. {}", sourceId, message);
responseObserver.onCompleted();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ public class DbzCdcEventConsumer

private final BlockingQueue<GetEventStreamResponse> outputChannel;
private final long sourceId;
private final JsonConverter converter;
private final JsonConverter payloadConverter;
private final JsonConverter keyConverter;
private final String heartbeatTopicPrefix;
private final String transactionTopic;

Expand All @@ -64,14 +65,19 @@ public class DbzCdcEventConsumer
// The default JSON converter will output the schema field in the JSON which is unnecessary
// to source parser, we use a customized JSON converter to avoid outputting the `schema`
// field.
var jsonConverter = new DbzJsonConverter();
var payloadConverter = new DbzJsonConverter();
final HashMap<String, Object> configs = new HashMap<>(2);
// only serialize the value part
configs.put(ConverterConfig.TYPE_CONFIG, ConverterType.VALUE.getName());
// include record schema to output JSON in { "schema": { ... }, "payload": { ... } } format
configs.put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, true);
jsonConverter.configure(configs);
this.converter = jsonConverter;
payloadConverter.configure(configs);
this.payloadConverter = payloadConverter;

var keyConverter = new DbzJsonConverter();
configs.put(ConverterConfig.TYPE_CONFIG, ConverterType.KEY.getName());
keyConverter.configure(configs);
this.keyConverter = keyConverter;
}

private EventType getEventType(SourceRecord record) {
Expand Down Expand Up @@ -137,7 +143,7 @@ var record = event.value();
{
long trxTs = ((Struct) record.value()).getInt64("ts_ms");
byte[] payload =
converter.fromConnectData(
payloadConverter.fromConnectData(
record.topic(), record.valueSchema(), record.value());
var message =
msgBuilder
Expand All @@ -152,8 +158,9 @@ var record = event.value();
case DATA:
{
// Topic naming conventions
// - PG: serverName.schemaName.tableName
// - MySQL: serverName.databaseName.tableName
// - PG: topicPrefix.schemaName.tableName
// - MySQL: topicPrefix.databaseName.tableName
// - Mongo: topicPrefix.databaseName.collectionName
// We can extract the full table name from the topic
var fullTableName =
record.topic().substring(record.topic().indexOf('.') + 1);
Expand All @@ -169,15 +176,23 @@ var record = event.value();
? System.currentTimeMillis()
: sourceStruct.getInt64("ts_ms");
byte[] payload =
converter.fromConnectData(
payloadConverter.fromConnectData(
record.topic(), record.valueSchema(), record.value());
byte[] key =
keyConverter.fromConnectData(
record.topic(), record.keySchema(), record.key());
var message =
msgBuilder
.setFullTableName(fullTableName)
.setPayload(new String(payload, StandardCharsets.UTF_8))
.setKey(new String(key, StandardCharsets.UTF_8))
.setSourceTsMs(sourceTsMs)
.build();
LOG.debug("record => {}", message.getPayload());
LOG.debug(
"offset => {}, key => {}, payload => {}",
message.getOffset(),
message.getKey(),
message.getPayload());
respBuilder.addEvents(message);
break;
}
Expand All @@ -189,6 +204,7 @@ var record = event.value();
committer.markProcessed(event);
}

LOG.debug("recv {} events", respBuilder.getEventsCount());
// skip empty batch
if (respBuilder.getEventsCount() > 0) {
respBuilder.setSourceId(sourceId);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# configs for the mongodb connector
connector.class=io.debezium.connector.mongodb.MongoDbConnector
# default snapshot mode to initial
snapshot.mode=${debezium.snapshot.mode:-initial}
mongodb.connection.string=${mongodb.url}
collection.include.list=${collection.name}
# default heartbeat interval 5 mins
heartbeat.interval.ms=${debezium.heartbeat.interval.ms:-300000}
neverchanje marked this conversation as resolved.
Show resolved Hide resolved
# In sharing cdc source mode, we will subscribe to multiple tables in the given database,
# so here we set ${table.name} to a default value `RW_CDC_Sharing` just for display.
# TODO: set this field in the code
name=${collection.name:-RW_CDC_Sharing}
provide.transaction.metadata=${transactional:-false}
# update event messages include the full document
capture.mode=${debezium.capture.mode:-change_streams_update_full}
# disable tombstones event
tombstones.on.delete=${debezium.tombstones.on.delete:-false}
4 changes: 4 additions & 0 deletions java/connector-node/risingwave-source-cdc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mysql</artifactId>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mongodb</artifactId>
</dependency>

<!-- database dependencies -->
<dependency>
Expand Down
33 changes: 18 additions & 15 deletions java/connector-node/risingwave-source-test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,6 @@
<artifactId>connector-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.risingwave</groupId>
<artifactId>s3-common</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-reload4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
Expand All @@ -46,11 +31,19 @@
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j2-impl</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mongodb</artifactId>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
Expand All @@ -75,6 +68,16 @@
<groupId>org.testcontainers</groupId>
<artifactId>postgresql</artifactId>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>mongodb</artifactId>
</dependency>

<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-sync</artifactId>
<version>4.11.1</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;
Expand Down Expand Up @@ -152,6 +153,25 @@ protected Iterator<ConnectorServiceProto.GetEventStreamResponse> getEventStreamS
return responses;
}

/**
 * Requests a CDC event stream from the connector service for the given source.
 *
 * @param sourceType type of the upstream source (e.g. MYSQL, POSTGRES, MONGODB)
 * @param sourceId identifier of the source
 * @param properties user-provided source properties
 * @return an iterator over streamed event responses, or null if the RPC failed
 *         (note: fail(...) is expected to abort the test on error)
 */
public Iterator<ConnectorServiceProto.GetEventStreamResponse> getEventStream(
        ConnectorServiceProto.SourceType sourceType,
        long sourceId,
        Map<String, String> properties) {
    // Assemble the stream request from the given source parameters.
    var request =
            ConnectorServiceProto.GetEventStreamRequest.newBuilder()
                    .setSourceId(sourceId)
                    .setSourceType(sourceType)
                    .putAllProperties(properties)
                    .build();
    Iterator<ConnectorServiceProto.GetEventStreamResponse> eventStream = null;
    try {
        eventStream = blockingStub.getEventStream(request);
    } catch (StatusRuntimeException e) {
        fail("RPC failed: {}", e.getStatus());
    }
    return eventStream;
}

// generates an orders.tbl in class path using random data
// if file does not contain 10000 lines
static void genOrdersTable(int numRows) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
rootLogger.level=ERROR
rootLogger.level=INFO
# declare the appender to use
appenders=console
# appender properties
Expand Down
13 changes: 12 additions & 1 deletion java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<artifactId>log4j-slf4j2-impl</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
Expand Down Expand Up @@ -161,6 +161,11 @@
<artifactId>debezium-connector-mysql</artifactId>
<version>${debezium.version}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mongodb</artifactId>
<version>${debezium.version}</version>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
Expand Down Expand Up @@ -348,6 +353,12 @@
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>mongodb</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</dependencyManagement>

Expand Down
Loading
Loading