From 414e5f53a020d9e92b04032d9ecb88286d2703ad Mon Sep 17 00:00:00 2001 From: azexcy <13588031592@qq.com> Date: Mon, 13 Nov 2023 10:58:25 +0800 Subject: [PATCH] Add single table streaming case at CDC E2E --- .../data/pipeline/cdc/util/CDCDataNodeUtils.java | 2 +- .../org/apache/shardingsphere/proxy/Bootstrap.java | 2 ++ .../test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java | 14 +++++++------- 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataNodeUtils.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataNodeUtils.java index 7231d64ecce142..f4a5aee5c80ac4 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataNodeUtils.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataNodeUtils.java @@ -52,8 +52,8 @@ public final class CDCDataNodeUtils { public static Map> buildDataNodesMap(final ShardingSphereDatabase database, final Collection tableNames) { Optional shardingRule = database.getRuleMetaData().findSingleRule(ShardingRule.class); Optional singleRule = database.getRuleMetaData().findSingleRule(SingleRule.class); - Map> result = new HashMap<>(); Optional broadcastRule = database.getRuleMetaData().findSingleRule(BroadcastRule.class); + Map> result = new HashMap<>(); // TODO support virtual data source name for (String each : tableNames) { if (singleRule.isPresent() && singleRule.get().getAllDataNodes().containsKey(each)) { diff --git a/proxy/bootstrap/src/main/java/org/apache/shardingsphere/proxy/Bootstrap.java b/proxy/bootstrap/src/main/java/org/apache/shardingsphere/proxy/Bootstrap.java index 620a6b5a0422be..e249ab3a96d32b 100644 --- a/proxy/bootstrap/src/main/java/org/apache/shardingsphere/proxy/Bootstrap.java +++ b/proxy/bootstrap/src/main/java/org/apache/shardingsphere/proxy/Bootstrap.java @@ -33,6 +33,7 @@ import java.sql.SQLException; import java.util.List; import java.util.Optional; +import java.util.TimeZone; /** * ShardingSphere-Proxy Bootstrap. @@ -48,6 +49,7 @@ public final class Bootstrap { * @throws SQLException SQL exception */ public static void main(final String[] args) throws IOException, SQLException { + TimeZone.setDefault(TimeZone.getTimeZone("UTC")); BootstrapArguments bootstrapArgs = new BootstrapArguments(args); YamlProxyConfiguration yamlConfig = ProxyConfigurationLoader.load(bootstrapArgs.getConfigurationPath()); int port = bootstrapArgs.getPort().orElseGet(() -> new ConfigurationProperties(yamlConfig.getServerConfiguration().getProps()).getValue(ConfigurationPropertyKey.PROXY_DEFAULT_PORT)); diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java index 34f49f7ddb518e..86f7bc9b6587c6 100644 --- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java +++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java @@ -65,7 +65,6 @@ import java.time.LocalDateTime; import java.util.Arrays; import java.util.Collections; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Properties; @@ -112,17 +111,18 @@ void assertCDCDataImportSuccess(final PipelineTestParameter testParam) throws SQ log.info("init data begin: {}", LocalDateTime.now()); DataSourceExecuteUtils.execute(dataSource, containerComposer.getExtraSQLCommand().getFullInsertOrder(SOURCE_TABLE_NAME), dataPair.getLeft()); DataSourceExecuteUtils.execute(dataSource, "INSERT INTO t_address(id, address_name) VALUES (?,?)", Arrays.asList(new Object[]{1, "a"}, new Object[]{2, "b"})); + DataSourceExecuteUtils.execute(dataSource, "INSERT INTO t_single(id) VALUES (?)", Arrays.asList(new Object[]{1}, new Object[]{2}, new Object[]{3})); log.info("init data end: {}", LocalDateTime.now()); try ( Connection connection = DriverManager.getConnection(containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_4, false), containerComposer.getUsername(), containerComposer.getPassword())) { initSchemaAndTable(containerComposer, connection, 0); } - DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(containerComposer.getDatabaseType()).getDialectDatabaseMetaData(); - final CDCClient cdcClient = buildCDCClientAndStart(containerComposer, dialectDatabaseMetaData); + final CDCClient cdcClient = buildCDCClientAndStart(containerComposer); Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS).until(() -> !containerComposer.queryForListWithLog("SHOW STREAMING LIST").isEmpty()); String jobId = containerComposer.queryForListWithLog("SHOW STREAMING LIST").get(0).get("id").toString(); containerComposer.waitIncrementTaskFinished(String.format("SHOW STREAMING STATUS '%s'", jobId)); + DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(containerComposer.getDatabaseType()).getDialectDatabaseMetaData(); String tableName = dialectDatabaseMetaData.isSchemaAvailable() ? String.join(".", "test", SOURCE_TABLE_NAME) : SOURCE_TABLE_NAME; containerComposer.startIncrementTask(new E2EIncrementalTask(dataSource, tableName, new SnowflakeKeyGenerateAlgorithm(), containerComposer.getDatabaseType(), 20)); containerComposer.getIncreaseTaskThread().join(10000L); @@ -141,6 +141,7 @@ void assertCDCDataImportSuccess(final PipelineTestParameter testParam) throws SQ containerComposer.getDatabaseType()); assertDataMatched(sourceDataSource, targetDataSource, orderSchemaTableName); assertDataMatched(sourceDataSource, targetDataSource, new CaseInsensitiveQualifiedTable(null, "t_address")); + assertDataMatched(sourceDataSource, targetDataSource, new CaseInsensitiveQualifiedTable(null, "t_single")); cdcClient.close(); Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(500L, TimeUnit.MILLISECONDS).until(() -> containerComposer.queryForListWithLog("SHOW STREAMING LIST") .stream().noneMatch(each -> Boolean.parseBoolean(each.get("active").toString()))); @@ -164,6 +165,7 @@ private void initSchemaAndTable(final PipelineContainerComposer containerCompose log.info("Create table sql: {}", sql); connection.createStatement().execute(sql); connection.createStatement().execute("CREATE TABLE t_address(id integer primary key, address_name varchar(255))"); + connection.createStatement().execute("CREATE TABLE t_single(id integer primary key)"); if (sleepSeconds > 0) { Awaitility.await().pollDelay(sleepSeconds, TimeUnit.SECONDS).until(() -> true); } @@ -174,16 +176,14 @@ private DataSource createStandardDataSource(final PipelineContainerComposer cont containerComposer.getUsername(), containerComposer.getPassword())); } - private CDCClient buildCDCClientAndStart(final PipelineContainerComposer containerComposer, final DialectDatabaseMetaData dialectDatabaseMetaData) { + private CDCClient buildCDCClientAndStart(final PipelineContainerComposer containerComposer) { DataSource dataSource = createStandardDataSource(containerComposer, PipelineContainerComposer.DS_4); DataSourceRecordConsumer recordConsumer = new DataSourceRecordConsumer(dataSource, containerComposer.getDatabaseType()); - String schema = dialectDatabaseMetaData.isSchemaAvailable() ? "test" : ""; CDCClient result = new CDCClient(new CDCClientConfiguration("localhost", containerComposer.getContainerComposer().getProxyCDCPort(), 5000)); result.connect(recordConsumer, new RetryStreamingExceptionHandler(result, 5, 5000), (ctx, serverErrorResult) -> log.error("Server error: {}", serverErrorResult.getErrorMessage())); result.login(new CDCLoginParameter(ProxyContainerConstants.USERNAME, ProxyContainerConstants.PASSWORD)); // TODO add full=false test case later - result.startStreaming(new StartStreamingParameter("sharding_db", new HashSet<>(Arrays.asList(SchemaTable.newBuilder().setTable(SOURCE_TABLE_NAME).setSchema(schema).build(), - SchemaTable.newBuilder().setTable("t_address").build())), true)); + result.startStreaming(new StartStreamingParameter("sharding_db", Collections.singleton(SchemaTable.newBuilder().setTable("*").setSchema("*").build()), true)); return result; }