Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cdc): Add MongoDB CDC Source #14966

Merged
merged 27 commits into from
Feb 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public enum SourceTypeE {
MYSQL,
POSTGRES,
CITUS,
MONGODB,
INVALID;

public static SourceTypeE valueOf(ConnectorServiceProto.SourceType type) {
Expand All @@ -30,6 +31,8 @@ public static SourceTypeE valueOf(ConnectorServiceProto.SourceType type) {
return SourceTypeE.POSTGRES;
case CITUS:
return SourceTypeE.CITUS;
case MONGODB:
return SourceTypeE.MONGODB;
default:
return SourceTypeE.INVALID;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,33 +62,35 @@ public static ConnectorServiceProto.ValidateSourceResponse validateResponse(Stri
.build();
}

public static void ensurePropNotBlank(Map<String, String> props, String name) {
private static void ensurePropNotBlank(Map<String, String> props, String name) {
if (StringUtils.isBlank(props.get(name))) {
throw ValidatorUtils.invalidArgument(
String.format("'%s' not found, please check the WITH properties", name));
}
}

public static void validateSource(ConnectorServiceProto.ValidateSourceRequest request)
throws Exception {
var props = request.getPropertiesMap();

static void ensureRequiredProps(Map<String, String> props, boolean isMultiTableShared) {
ensurePropNotBlank(props, DbzConnectorConfig.HOST);
ensurePropNotBlank(props, DbzConnectorConfig.PORT);
ensurePropNotBlank(props, DbzConnectorConfig.DB_NAME);
ensurePropNotBlank(props, DbzConnectorConfig.USER);
ensurePropNotBlank(props, DbzConnectorConfig.PASSWORD);

var commonParam = request.getCommonParam();
boolean isMultiTableShared = commonParam.getIsMultiTableShared();
// ensure table name is passed by user in non-sharing mode
if (!isMultiTableShared) {
ensurePropNotBlank(props, DbzConnectorConfig.TABLE_NAME);
}
}

public static void validateSource(ConnectorServiceProto.ValidateSourceRequest request)
throws Exception {
var props = request.getPropertiesMap();
var commonParam = request.getCommonParam();
boolean isMultiTableShared = commonParam.getIsMultiTableShared();

TableSchema tableSchema = TableSchema.fromProto(request.getTableSchema());
switch (request.getSourceType()) {
case POSTGRES:
ensureRequiredProps(props, isMultiTableShared);
ensurePropNotBlank(props, DbzConnectorConfig.PG_SCHEMA_NAME);
ensurePropNotBlank(props, DbzConnectorConfig.PG_SLOT_NAME);
ensurePropNotBlank(props, DbzConnectorConfig.PG_PUB_NAME);
Expand All @@ -100,6 +102,7 @@ public static void validateSource(ConnectorServiceProto.ValidateSourceRequest re
break;

case CITUS:
ensureRequiredProps(props, isMultiTableShared);
ensurePropNotBlank(props, DbzConnectorConfig.TABLE_NAME);
ensurePropNotBlank(props, DbzConnectorConfig.PG_SCHEMA_NAME);
try (var coordinatorValidator = new CitusValidator(props, tableSchema)) {
Expand Down Expand Up @@ -128,11 +131,18 @@ public static void validateSource(ConnectorServiceProto.ValidateSourceRequest re

break;
case MYSQL:
ensureRequiredProps(props, isMultiTableShared);
ensurePropNotBlank(props, DbzConnectorConfig.MYSQL_SERVER_ID);
try (var validator = new MySqlValidator(props, tableSchema)) {
validator.validateAll(isMultiTableShared);
}
break;
case MONGODB:
ensurePropNotBlank(props, DbzConnectorConfig.MongoDb.MONGO_URL);
ensurePropNotBlank(props, DbzConnectorConfig.MongoDb.MONGO_COLLECTION_NAME);
var validator = new MongoDbValidator(props);
validator.validateDbConfig();
break;
default:
LOG.warn("Unknown source type");
throw ValidatorUtils.invalidArgument("Unknown source type");
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package com.risingwave.connector.source.common;

import com.mongodb.ConnectionString;
import com.risingwave.connector.api.source.SourceTypeE;
import com.risingwave.connector.cdc.debezium.internal.ConfigurableOffsetBackingStore;
import java.io.IOException;
Expand Down Expand Up @@ -61,12 +62,18 @@ public class DbzConnectorConfig {
private static final String DBZ_CONFIG_FILE = "debezium.properties";
private static final String MYSQL_CONFIG_FILE = "mysql.properties";
private static final String POSTGRES_CONFIG_FILE = "postgres.properties";
private static final String MONGODB_CONFIG_FILE = "mongodb.properties";

private static final String DBZ_PROPERTY_PREFIX = "debezium.";

private static final String SNAPSHOT_MODE_KEY = "debezium.snapshot.mode";
private static final String SNAPSHOT_MODE_BACKFILL = "rw_cdc_backfill";

public static class MongoDb {
public static final String MONGO_URL = "mongodb.url";
public static final String MONGO_COLLECTION_NAME = "collection.name";
Comment on lines +73 to +74
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please also add some auth related options and try to test with a vendor

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

username and password can be provided via the connection string (mongodb.url) if needed.

}

private static Map<String, String> extractDebeziumProperties(
Map<String, String> userProperties) {
// retain only debezium properties if any
Expand Down Expand Up @@ -217,6 +224,27 @@ public DbzConnectorConfig(
ConfigurableOffsetBackingStore.OFFSET_STATE_VALUE, startOffset);
}
dbzProps.putAll(postgresProps);
} else if (source == SourceTypeE.MONGODB) {
var mongodbProps = initiateDbConfig(MONGODB_CONFIG_FILE, substitutor);

// if snapshot phase is finished and offset is specified, we will continue reading
// changes from the given offset
if (snapshotDone && null != startOffset && !startOffset.isBlank()) {
mongodbProps.setProperty("snapshot.mode", "never");
mongodbProps.setProperty(
ConfigurableOffsetBackingStore.OFFSET_STATE_VALUE, startOffset);
}

var mongodbUrl = userProps.get("mongodb.url");
var collection = userProps.get("collection.name");
var connectionStr = new ConnectionString(mongodbUrl);
var connectorName =
String.format(
"MongoDB_%d:%s:%s", sourceId, connectionStr.getHosts(), collection);
mongodbProps.setProperty("name", connectorName);

dbzProps.putAll(mongodbProps);

} else {
throw new RuntimeException("unsupported source type: " + source);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.risingwave.connector.source.common;

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MongoDbValidator extends DatabaseValidator {
private static final Logger LOG = LoggerFactory.getLogger(MongoDbValidator.class);

String mongodbUrl;

public MongoDbValidator(Map<String, String> userProps) {
this.mongodbUrl = userProps.get("mongodb.url");
}

@Override
public void validateDbConfig() {
// check connectivity
try (MongoClient mongoClient = MongoClients.create(mongodbUrl)) {
var desc = mongoClient.getClusterDescription();
LOG.info("test connectivity: MongoDB cluster description: {}", desc);
}
}

@Override
void validateUserPrivilege() {
// TODO: check user privilege
// https://debezium.io/documentation/reference/stable/connectors/mongodb.html#setting-up-mongodb
// You must also have a MongoDB user that has the appropriate roles to read the admin
// database where the oplog can be read. Additionally, the user must also be able to read
// the config database in the configuration server of a sharded cluster and must have
// listDatabases privilege action. When change streams are used (the default) the user also
// must have cluster-wide privilege actions find and changeStream.
}

@Override
void validateTable() {
// do nothing since MongoDB is schemaless
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,14 @@ public static CdcEngineRunner newCdcEngineRunner(
config.getResolvedDebeziumProps(),
(success, message, error) -> {
if (!success) {
responseObserver.onError(error);
LOG.error(
"engine#{} terminated with error. message: {}",
sourceId,
message,
error);
if (error != null) {
responseObserver.onError(error);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any motivation for this change? In what situation will it failed with error == null?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For defensive of nullptr and match the implementation of the following create(DbzConnectorConfig config, long channelPtr) method.
Actually newCdcEngineRunner only used in unit test right now, since we replaced with embeded jvm.

} else {
LOG.info("engine#{} stopped normally. {}", sourceId, message);
responseObserver.onCompleted();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ public class DbzCdcEventConsumer

private final BlockingQueue<GetEventStreamResponse> outputChannel;
private final long sourceId;
private final JsonConverter converter;
private final JsonConverter payloadConverter;
private final JsonConverter keyConverter;
private final String heartbeatTopicPrefix;
private final String transactionTopic;

Expand All @@ -64,14 +65,19 @@ public class DbzCdcEventConsumer
// The default JSON converter will output the schema field in the JSON which is unnecessary
// to source parser, we use a customized JSON converter to avoid outputting the `schema`
// field.
var jsonConverter = new DbzJsonConverter();
var payloadConverter = new DbzJsonConverter();
final HashMap<String, Object> configs = new HashMap<>(2);
// only serialize the value part
configs.put(ConverterConfig.TYPE_CONFIG, ConverterType.VALUE.getName());
// include record schema to output JSON in { "schema": { ... }, "payload": { ... } } format
configs.put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, true);
jsonConverter.configure(configs);
this.converter = jsonConverter;
payloadConverter.configure(configs);
this.payloadConverter = payloadConverter;

var keyConverter = new DbzJsonConverter();
configs.put(ConverterConfig.TYPE_CONFIG, ConverterType.KEY.getName());
keyConverter.configure(configs);
this.keyConverter = keyConverter;
}

private EventType getEventType(SourceRecord record) {
Expand Down Expand Up @@ -137,7 +143,7 @@ var record = event.value();
{
long trxTs = ((Struct) record.value()).getInt64("ts_ms");
byte[] payload =
converter.fromConnectData(
payloadConverter.fromConnectData(
record.topic(), record.valueSchema(), record.value());
var message =
msgBuilder
Expand All @@ -152,8 +158,9 @@ var record = event.value();
case DATA:
{
// Topic naming conventions
// - PG: serverName.schemaName.tableName
// - MySQL: serverName.databaseName.tableName
// - PG: topicPrefix.schemaName.tableName
// - MySQL: topicPrefix.databaseName.tableName
// - Mongo: topicPrefix.databaseName.collectionName
// We can extract the full table name from the topic
var fullTableName =
record.topic().substring(record.topic().indexOf('.') + 1);
Expand All @@ -169,15 +176,23 @@ var record = event.value();
? System.currentTimeMillis()
: sourceStruct.getInt64("ts_ms");
byte[] payload =
converter.fromConnectData(
payloadConverter.fromConnectData(
record.topic(), record.valueSchema(), record.value());
byte[] key =
keyConverter.fromConnectData(
record.topic(), record.keySchema(), record.key());
var message =
msgBuilder
.setFullTableName(fullTableName)
.setPayload(new String(payload, StandardCharsets.UTF_8))
.setKey(new String(key, StandardCharsets.UTF_8))
.setSourceTsMs(sourceTsMs)
.build();
LOG.debug("record => {}", message.getPayload());
LOG.debug(
"offset => {}, key => {}, payload => {}",
message.getOffset(),
message.getKey(),
message.getPayload());
respBuilder.addEvents(message);
break;
}
Expand All @@ -189,6 +204,7 @@ var record = event.value();
committer.markProcessed(event);
}

LOG.debug("recv {} events", respBuilder.getEventsCount());
// skip empty batch
if (respBuilder.getEventsCount() > 0) {
respBuilder.setSourceId(sourceId);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# configs for postgres conneoctor
connector.class=io.debezium.connector.mongodb.MongoDbConnector
# default snapshot mode to initial
snapshot.mode=${debezium.snapshot.mode:-initial}
mongodb.connection.string=${mongodb.url}
collection.include.list=${collection.name}
# default heartbeat interval 5 mins
heartbeat.interval.ms=${debezium.heartbeat.interval.ms:-300000}
neverchanje marked this conversation as resolved.
Show resolved Hide resolved
# TODO: set this field in the code
name=${collection.name}
provide.transaction.metadata=${transactional:-false}
# update event messages include the full document
capture.mode=${debezium.capture.mode:-change_streams_update_full}
# disable tombstones event
tombstones.on.delete=${debezium.tombstones.on.delete:-false}
8 changes: 8 additions & 0 deletions java/connector-node/risingwave-source-cdc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mysql</artifactId>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mongodb</artifactId>
</dependency>

<!-- database dependencies -->
<dependency>
Expand All @@ -58,5 +62,9 @@
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-sync</artifactId>
</dependency>
</dependencies>
</project>
Loading
Loading