Skip to content

Commit

Permalink
CDC support broadcast table
Browse files Browse the repository at this point in the history
  • Loading branch information
azexcy committed Nov 10, 2023
1 parent 2cb461c commit 4954775
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 0 deletions.
5 changes: 5 additions & 0 deletions kernel/data-pipeline/scenario/cdc/core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@
<artifactId>shardingsphere-data-pipeline-cdc-protocol</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere-broadcast-core</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.shardingsphere</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.shardingsphere.broadcast.rule.BroadcastRule;
import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
import org.apache.shardingsphere.infra.datanode.DataNode;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
Expand All @@ -28,6 +29,7 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -51,6 +53,7 @@ public static Map<String, List<DataNode>> buildDataNodesMap(final ShardingSphere
Optional<ShardingRule> shardingRule = database.getRuleMetaData().findSingleRule(ShardingRule.class);
Optional<SingleRule> singleRule = database.getRuleMetaData().findSingleRule(SingleRule.class);
Map<String, List<DataNode>> result = new HashMap<>();
Optional<BroadcastRule> broadcastRule = database.getRuleMetaData().findSingleRule(BroadcastRule.class);
// TODO support virtual data source name
for (String each : tableNames) {
if (singleRule.isPresent() && singleRule.get().getAllDataNodes().containsKey(each)) {
Expand All @@ -62,6 +65,10 @@ public static Map<String, List<DataNode>> buildDataNodesMap(final ShardingSphere
result.put(each, tableRule.getActualDataNodes());
continue;
}
if (broadcastRule.isPresent() && broadcastRule.get().findFirstActualTable(each).isPresent()) {
result.put(each, Collections.singletonList(broadcastRule.get().getTableDataNodes().get(each).iterator().next()));
continue;
}
throw new PipelineInvalidParameterException(String.format("Not find actual data nodes of `%s`", each));
}
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ void assertCDCDataImportSuccess(final PipelineTestParameter testParam) throws SQ
containerComposer.registerStorageUnit(each);
}
createOrderTableRule(containerComposer);
createBroadcastRule(containerComposer);
try (Connection connection = containerComposer.getProxyDataSource().getConnection()) {
initSchemaAndTable(containerComposer, connection, 3);
}
Expand Down Expand Up @@ -153,6 +154,10 @@ private void createOrderTableRule(final PipelineContainerComposer containerCompo
Awaitility.await().atMost(20L, TimeUnit.SECONDS).pollInterval(2L, TimeUnit.SECONDS).until(() -> !containerComposer.queryForListWithLog("SHOW SHARDING TABLE RULE t_order").isEmpty());
}

private void createBroadcastRule(final PipelineContainerComposer containerComposer) throws SQLException {
containerComposer.proxyExecuteWithLog("CREATE BROADCAST TABLE RULE t_address", 2);
}

private void initSchemaAndTable(final PipelineContainerComposer containerComposer, final Connection connection, final int sleepSeconds) throws SQLException {
containerComposer.createSchema(connection, sleepSeconds);
String sql = containerComposer.getExtraSQLCommand().getCreateTableOrder(SOURCE_TABLE_NAME);
Expand Down

0 comments on commit 4954775

Please sign in to comment.