diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataNodeUtils.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataNodeUtils.java
index 7231d64ecce14..f4a5aee5c80ac 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataNodeUtils.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataNodeUtils.java
@@ -52,8 +52,8 @@ public final class CDCDataNodeUtils {
     public static Map<String, List<DataNode>> buildDataNodesMap(final ShardingSphereDatabase database, final Collection<String> tableNames) {
         Optional<ShardingRule> shardingRule = database.getRuleMetaData().findSingleRule(ShardingRule.class);
         Optional<SingleRule> singleRule = database.getRuleMetaData().findSingleRule(SingleRule.class);
-        Map<String, List<DataNode>> result = new HashMap<>();
         Optional<BroadcastRule> broadcastRule = database.getRuleMetaData().findSingleRule(BroadcastRule.class);
+        Map<String, List<DataNode>> result = new HashMap<>();
         // TODO support virtual data source name
         for (String each : tableNames) {
             if (singleRule.isPresent() && singleRule.get().getAllDataNodes().containsKey(each)) {
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
index 34f49f7ddb518..86f7bc9b6587c 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
@@ -65,7 +65,6 @@
 import java.time.LocalDateTime;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -112,17 +111,18 @@ void assertCDCDataImportSuccess(final PipelineTestParameter testParam) throws SQ
         log.info("init data begin: {}", LocalDateTime.now());
         DataSourceExecuteUtils.execute(dataSource, containerComposer.getExtraSQLCommand().getFullInsertOrder(SOURCE_TABLE_NAME), dataPair.getLeft());
         DataSourceExecuteUtils.execute(dataSource, "INSERT INTO t_address(id, address_name) VALUES (?,?)", Arrays.asList(new Object[]{1, "a"}, new Object[]{2, "b"}));
+        DataSourceExecuteUtils.execute(dataSource, "INSERT INTO t_single(id) VALUES (?)", Arrays.asList(new Object[]{1}, new Object[]{2}, new Object[]{3}));
         log.info("init data end: {}", LocalDateTime.now());
         try (
                 Connection connection = DriverManager.getConnection(containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_4, false),
                         containerComposer.getUsername(), containerComposer.getPassword())) {
             initSchemaAndTable(containerComposer, connection, 0);
         }
-        DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(containerComposer.getDatabaseType()).getDialectDatabaseMetaData();
-        final CDCClient cdcClient = buildCDCClientAndStart(containerComposer, dialectDatabaseMetaData);
+        final CDCClient cdcClient = buildCDCClientAndStart(containerComposer);
         Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS).until(() -> !containerComposer.queryForListWithLog("SHOW STREAMING LIST").isEmpty());
         String jobId = containerComposer.queryForListWithLog("SHOW STREAMING LIST").get(0).get("id").toString();
         containerComposer.waitIncrementTaskFinished(String.format("SHOW STREAMING STATUS '%s'", jobId));
+        DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(containerComposer.getDatabaseType()).getDialectDatabaseMetaData();
         String tableName = dialectDatabaseMetaData.isSchemaAvailable() ? String.join(".", "test", SOURCE_TABLE_NAME) : SOURCE_TABLE_NAME;
         containerComposer.startIncrementTask(new E2EIncrementalTask(dataSource, tableName, new SnowflakeKeyGenerateAlgorithm(), containerComposer.getDatabaseType(), 20));
         containerComposer.getIncreaseTaskThread().join(10000L);
@@ -141,6 +141,7 @@ void assertCDCDataImportSuccess(final PipelineTestParameter testParam) throws SQ
                 containerComposer.getDatabaseType());
         assertDataMatched(sourceDataSource, targetDataSource, orderSchemaTableName);
         assertDataMatched(sourceDataSource, targetDataSource, new CaseInsensitiveQualifiedTable(null, "t_address"));
+        assertDataMatched(sourceDataSource, targetDataSource, new CaseInsensitiveQualifiedTable(null, "t_single"));
         cdcClient.close();
         Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(500L, TimeUnit.MILLISECONDS).until(() -> containerComposer.queryForListWithLog("SHOW STREAMING LIST")
                 .stream().noneMatch(each -> Boolean.parseBoolean(each.get("active").toString())));
@@ -164,6 +165,7 @@ private void initSchemaAndTable(final PipelineContainerComposer containerCompose
         log.info("Create table sql: {}", sql);
         connection.createStatement().execute(sql);
         connection.createStatement().execute("CREATE TABLE t_address(id integer primary key, address_name varchar(255))");
+        connection.createStatement().execute("CREATE TABLE t_single(id integer primary key)");
         if (sleepSeconds > 0) {
             Awaitility.await().pollDelay(sleepSeconds, TimeUnit.SECONDS).until(() -> true);
         }
@@ -174,16 +176,14 @@ private DataSource createStandardDataSource(final PipelineContainerComposer cont
                 containerComposer.getUsername(), containerComposer.getPassword()));
     }
 
-    private CDCClient buildCDCClientAndStart(final PipelineContainerComposer containerComposer, final DialectDatabaseMetaData dialectDatabaseMetaData) {
+    private CDCClient buildCDCClientAndStart(final PipelineContainerComposer containerComposer) {
         DataSource dataSource = createStandardDataSource(containerComposer, PipelineContainerComposer.DS_4);
         DataSourceRecordConsumer recordConsumer = new DataSourceRecordConsumer(dataSource, containerComposer.getDatabaseType());
-        String schema = dialectDatabaseMetaData.isSchemaAvailable() ? "test" : "";
         CDCClient result = new CDCClient(new CDCClientConfiguration("localhost", containerComposer.getContainerComposer().getProxyCDCPort(), 5000));
         result.connect(recordConsumer, new RetryStreamingExceptionHandler(result, 5, 5000), (ctx, serverErrorResult) -> log.error("Server error: {}", serverErrorResult.getErrorMessage()));
         result.login(new CDCLoginParameter(ProxyContainerConstants.USERNAME, ProxyContainerConstants.PASSWORD));
         // TODO add full=false test case later
-        result.startStreaming(new StartStreamingParameter("sharding_db", new HashSet<>(Arrays.asList(SchemaTable.newBuilder().setTable(SOURCE_TABLE_NAME).setSchema(schema).build(),
-                SchemaTable.newBuilder().setTable("t_address").build())), true));
+        result.startStreaming(new StartStreamingParameter("sharding_db", Collections.singleton(SchemaTable.newBuilder().setTable("*").setSchema("*").build()), true));
         return result;
     }