Skip to content

Commit

Permalink
impl validator
Browse files Browse the repository at this point in the history
  • Loading branch information
StrikeW committed Feb 20, 2024
1 parent a3f3f54 commit f53cf2a
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,8 @@ public static void validateSource(ConnectorServiceProto.ValidateSourceRequest re
case MONGODB:
ensurePropNotBlank(props, DbzConnectorConfig.MongoDb.MONGO_URL);
ensurePropNotBlank(props, DbzConnectorConfig.MongoDb.MONGO_COLLECTION_NAME);
// TODO: validate mongodb connectivity and replica set config
var validator = new MongoDbValidator(props);
validator.validateDbConfig();
break;
default:
LOG.warn("Unknown source type");
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package com.risingwave.connector.source.common;

import com.mongodb.ConnectionString;
import com.risingwave.connector.api.source.SourceTypeE;
import com.risingwave.connector.cdc.debezium.internal.ConfigurableOffsetBackingStore;
import java.io.IOException;
Expand Down Expand Up @@ -233,6 +234,15 @@ public DbzConnectorConfig(
mongodbProps.setProperty(
ConfigurableOffsetBackingStore.OFFSET_STATE_VALUE, startOffset);
}

var mongodbUrl = userProps.get("mongodb.url");
var collection = userProps.get("collection.name");
var connectionStr = new ConnectionString(mongodbUrl);
var connectorName =
String.format(
"MongoDB_%d:%s:%s", sourceId, connectionStr.getHosts(), collection);
mongodbProps.setProperty("name", connectorName);

dbzProps.putAll(mongodbProps);

} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.risingwave.connector.source.common;

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MongoDbValidator extends DatabaseValidator {
private static final Logger LOG = LoggerFactory.getLogger(MongoDbValidator.class);

String mongodbUrl;

public MongoDbValidator(Map<String, String> userProps) {
this.mongodbUrl = userProps.get("mongodb.url");
}

@Override
public void validateDbConfig() {
// check connectivity
try (MongoClient mongoClient = MongoClients.create(mongodbUrl)) {
var desc = mongoClient.getClusterDescription();
LOG.info("MongoDB cluster description: {}", desc);
}
}

@Override
void validateUserPrivilege() {}

@Override
void validateTable() {
// do nothing
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,8 @@ mongodb.connection.string=${mongodb.url}
collection.include.list=${collection.name}
# default heartbeat interval 5 mins
heartbeat.interval.ms=${debezium.heartbeat.interval.ms:-300000}
# In sharing cdc source mode, we will subscribe to multiple tables in the given database,
# so here we set ${table.name} to a default value `RW_CDC_Sharing` just for display.
# TODO: set this field in the code
name=${collection.name:-RW_CDC_Sharing}
name=${collection.name}
provide.transaction.metadata=${transactional:-false}
# update event messages include the full document
capture.mode=${debezium.capture.mode:-change_streams_update_full}
Expand Down
4 changes: 4 additions & 0 deletions java/connector-node/risingwave-source-cdc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -62,5 +62,9 @@
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-sync</artifactId>
</dependency>
</dependencies>
</project>
12 changes: 10 additions & 2 deletions java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@
<datastax.version>4.15.0</datastax.version>
<flink.version>1.18.0</flink.version>
<testcontainers.version>1.17.6</testcontainers.version>
<postgresql.version>42.5.1</postgresql.version>
<mysql.connector.java.version>8.0.28</mysql.connector.java.version>
<mongodb.driver.sync.version>4.11.1</mongodb.driver.sync.version>
</properties>

<dependencyManagement>
Expand Down Expand Up @@ -169,12 +172,17 @@
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.5.1</version>
<version>${postgresql.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.28</version>
<version>${mysql.connector.java.version}</version>
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-sync</artifactId>
<version>${mongodb.driver.sync.version}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
Expand Down

0 comments on commit f53cf2a

Please sign in to comment.