From 4954775cf14843865fffaef58503e1280fa27460 Mon Sep 17 00:00:00 2001 From: azexcy <13588031592@qq.com> Date: Fri, 10 Nov 2023 16:37:47 +0800 Subject: [PATCH] CDC support broadcast table --- kernel/data-pipeline/scenario/cdc/core/pom.xml | 5 +++++ .../data/pipeline/cdc/util/CDCDataNodeUtils.java | 7 +++++++ .../test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java | 5 +++++ 3 files changed, 17 insertions(+) diff --git a/kernel/data-pipeline/scenario/cdc/core/pom.xml b/kernel/data-pipeline/scenario/cdc/core/pom.xml index 0baab795693ed..57140ae56d222 100644 --- a/kernel/data-pipeline/scenario/cdc/core/pom.xml +++ b/kernel/data-pipeline/scenario/cdc/core/pom.xml @@ -37,6 +37,11 @@ shardingsphere-data-pipeline-cdc-protocol ${project.version} + + org.apache.shardingsphere + shardingsphere-broadcast-core + ${project.version} + org.apache.shardingsphere diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataNodeUtils.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataNodeUtils.java index ebd2fdcb73ffc..7231d64ecce14 100644 --- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataNodeUtils.java +++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataNodeUtils.java @@ -19,6 +19,7 @@ import lombok.AccessLevel; import lombok.NoArgsConstructor; +import org.apache.shardingsphere.broadcast.rule.BroadcastRule; import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException; import org.apache.shardingsphere.infra.datanode.DataNode; import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase; @@ -28,6 +29,7 @@ import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -51,6 +53,7 @@ public static Map> buildDataNodesMap(final ShardingSphere Optional shardingRule = database.getRuleMetaData().findSingleRule(ShardingRule.class); Optional singleRule = database.getRuleMetaData().findSingleRule(SingleRule.class); Map> result = new HashMap<>(); + Optional broadcastRule = database.getRuleMetaData().findSingleRule(BroadcastRule.class); // TODO support virtual data source name for (String each : tableNames) { if (singleRule.isPresent() && singleRule.get().getAllDataNodes().containsKey(each)) { @@ -62,6 +65,10 @@ public static Map> buildDataNodesMap(final ShardingSphere result.put(each, tableRule.getActualDataNodes()); continue; } + if (broadcastRule.isPresent() && broadcastRule.get().findFirstActualTable(each).isPresent()) { + result.put(each, Collections.singletonList(broadcastRule.get().getTableDataNodes().get(each).iterator().next())); + continue; + } throw new PipelineInvalidParameterException(String.format("Not find actual data nodes of `%s`", each)); } return result; diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java index 5d0d7dea41711..34f49f7ddb518 100644 --- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java +++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java @@ -103,6 +103,7 @@ void assertCDCDataImportSuccess(final PipelineTestParameter testParam) throws SQ containerComposer.registerStorageUnit(each); } createOrderTableRule(containerComposer); + createBroadcastRule(containerComposer); try (Connection connection = containerComposer.getProxyDataSource().getConnection()) { initSchemaAndTable(containerComposer, connection, 3); } @@ -153,6 +154,10 @@ private void createOrderTableRule(final PipelineContainerComposer containerCompo Awaitility.await().atMost(20L, TimeUnit.SECONDS).pollInterval(2L, TimeUnit.SECONDS).until(() -> !containerComposer.queryForListWithLog("SHOW SHARDING TABLE RULE t_order").isEmpty()); } + private void createBroadcastRule(final PipelineContainerComposer containerComposer) throws SQLException { + containerComposer.proxyExecuteWithLog("CREATE BROADCAST TABLE RULE t_address", 2); + } + private void initSchemaAndTable(final PipelineContainerComposer containerComposer, final Connection connection, final int sleepSeconds) throws SQLException { containerComposer.createSchema(connection, sleepSeconds); String sql = containerComposer.getExtraSQLCommand().getCreateTableOrder(SOURCE_TABLE_NAME);