Skip to content

Commit

Permalink
Add single table streaming case at CDC E2E (#29022)
Browse files Browse the repository at this point in the history
  • Loading branch information
azexcy authored Nov 13, 2023
1 parent b945324 commit b6d11fe
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ public final class CDCDataNodeUtils {
public static Map<String, List<DataNode>> buildDataNodesMap(final ShardingSphereDatabase database, final Collection<String> tableNames) {
Optional<ShardingRule> shardingRule = database.getRuleMetaData().findSingleRule(ShardingRule.class);
Optional<SingleRule> singleRule = database.getRuleMetaData().findSingleRule(SingleRule.class);
Map<String, List<DataNode>> result = new HashMap<>();
Optional<BroadcastRule> broadcastRule = database.getRuleMetaData().findSingleRule(BroadcastRule.class);
Map<String, List<DataNode>> result = new HashMap<>();
// TODO support virtual data source name
for (String each : tableNames) {
if (singleRule.isPresent() && singleRule.get().getAllDataNodes().containsKey(each)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
Expand Down Expand Up @@ -112,17 +111,18 @@ void assertCDCDataImportSuccess(final PipelineTestParameter testParam) throws SQ
log.info("init data begin: {}", LocalDateTime.now());
DataSourceExecuteUtils.execute(dataSource, containerComposer.getExtraSQLCommand().getFullInsertOrder(SOURCE_TABLE_NAME), dataPair.getLeft());
DataSourceExecuteUtils.execute(dataSource, "INSERT INTO t_address(id, address_name) VALUES (?,?)", Arrays.asList(new Object[]{1, "a"}, new Object[]{2, "b"}));
DataSourceExecuteUtils.execute(dataSource, "INSERT INTO t_single(id) VALUES (?)", Arrays.asList(new Object[]{1}, new Object[]{2}, new Object[]{3}));
log.info("init data end: {}", LocalDateTime.now());
try (
Connection connection = DriverManager.getConnection(containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_4, false),
containerComposer.getUsername(), containerComposer.getPassword())) {
initSchemaAndTable(containerComposer, connection, 0);
}
DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(containerComposer.getDatabaseType()).getDialectDatabaseMetaData();
final CDCClient cdcClient = buildCDCClientAndStart(containerComposer, dialectDatabaseMetaData);
final CDCClient cdcClient = buildCDCClientAndStart(containerComposer);
Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS).until(() -> !containerComposer.queryForListWithLog("SHOW STREAMING LIST").isEmpty());
String jobId = containerComposer.queryForListWithLog("SHOW STREAMING LIST").get(0).get("id").toString();
containerComposer.waitIncrementTaskFinished(String.format("SHOW STREAMING STATUS '%s'", jobId));
DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(containerComposer.getDatabaseType()).getDialectDatabaseMetaData();
String tableName = dialectDatabaseMetaData.isSchemaAvailable() ? String.join(".", "test", SOURCE_TABLE_NAME) : SOURCE_TABLE_NAME;
containerComposer.startIncrementTask(new E2EIncrementalTask(dataSource, tableName, new SnowflakeKeyGenerateAlgorithm(), containerComposer.getDatabaseType(), 20));
containerComposer.getIncreaseTaskThread().join(10000L);
Expand All @@ -141,6 +141,7 @@ void assertCDCDataImportSuccess(final PipelineTestParameter testParam) throws SQ
containerComposer.getDatabaseType());
assertDataMatched(sourceDataSource, targetDataSource, orderSchemaTableName);
assertDataMatched(sourceDataSource, targetDataSource, new CaseInsensitiveQualifiedTable(null, "t_address"));
assertDataMatched(sourceDataSource, targetDataSource, new CaseInsensitiveQualifiedTable(null, "t_single"));
cdcClient.close();
Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(500L, TimeUnit.MILLISECONDS).until(() -> containerComposer.queryForListWithLog("SHOW STREAMING LIST")
.stream().noneMatch(each -> Boolean.parseBoolean(each.get("active").toString())));
Expand All @@ -164,6 +165,7 @@ private void initSchemaAndTable(final PipelineContainerComposer containerCompose
log.info("Create table sql: {}", sql);
connection.createStatement().execute(sql);
connection.createStatement().execute("CREATE TABLE t_address(id integer primary key, address_name varchar(255))");
connection.createStatement().execute("CREATE TABLE t_single(id integer primary key)");
if (sleepSeconds > 0) {
Awaitility.await().pollDelay(sleepSeconds, TimeUnit.SECONDS).until(() -> true);
}
Expand All @@ -174,16 +176,14 @@ private DataSource createStandardDataSource(final PipelineContainerComposer cont
containerComposer.getUsername(), containerComposer.getPassword()));
}

private CDCClient buildCDCClientAndStart(final PipelineContainerComposer containerComposer, final DialectDatabaseMetaData dialectDatabaseMetaData) {
private CDCClient buildCDCClientAndStart(final PipelineContainerComposer containerComposer) {
DataSource dataSource = createStandardDataSource(containerComposer, PipelineContainerComposer.DS_4);
DataSourceRecordConsumer recordConsumer = new DataSourceRecordConsumer(dataSource, containerComposer.getDatabaseType());
String schema = dialectDatabaseMetaData.isSchemaAvailable() ? "test" : "";
CDCClient result = new CDCClient(new CDCClientConfiguration("localhost", containerComposer.getContainerComposer().getProxyCDCPort(), 5000));
result.connect(recordConsumer, new RetryStreamingExceptionHandler(result, 5, 5000), (ctx, serverErrorResult) -> log.error("Server error: {}", serverErrorResult.getErrorMessage()));
result.login(new CDCLoginParameter(ProxyContainerConstants.USERNAME, ProxyContainerConstants.PASSWORD));
// TODO add full=false test case later
result.startStreaming(new StartStreamingParameter("sharding_db", new HashSet<>(Arrays.asList(SchemaTable.newBuilder().setTable(SOURCE_TABLE_NAME).setSchema(schema).build(),
SchemaTable.newBuilder().setTable("t_address").build())), true));
result.startStreaming(new StartStreamingParameter("sharding_db", Collections.singleton(SchemaTable.newBuilder().setTable("*").setSchema("*").build()), true));
return result;
}

Expand Down

0 comments on commit b6d11fe

Please sign in to comment.