Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed the problem of broadcast table execution error when readwrite_splitting rule exist #29025

Merged
merged 2 commits into from
Nov 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ private static boolean isDCLForSingleTable(final SQLStatementContext sqlStatemen

private void routeToAllDatabaseInstance(final RouteContext routeContext, final ShardingSphereDatabase database, final BroadcastRule broadcastRule) {
routeContext.getRouteUnits().clear();
for (String each : broadcastRule.getAvailableDataSourceNames()) {
for (String each : broadcastRule.getDataSourceNames()) {
if (database.getResourceMetaData().getAllInstanceDataSourceNames().contains(each)) {
routeContext.getRouteUnits().add(new RouteUnit(new RouteMapper(each, each), Collections.emptyList()));
}
Expand All @@ -152,7 +152,7 @@ private void routeToAllDatabaseInstance(final RouteContext routeContext, final S

private void routeToAllDatabase(final RouteContext routeContext, final BroadcastRule broadcastRule) {
routeContext.getRouteUnits().clear();
for (String each : broadcastRule.getAvailableDataSourceNames()) {
for (String each : broadcastRule.getDataSourceNames()) {
routeContext.getRouteUnits().add(new RouteUnit(new RouteMapper(each, each), Collections.emptyList()));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public final class BroadcastDatabaseBroadcastRoutingEngine implements BroadcastR

@Override
public RouteContext route(final RouteContext routeContext, final BroadcastRule broadcastRule) {
for (String each : broadcastRule.getAvailableDataSourceNames()) {
for (String each : broadcastRule.getDataSourceNames()) {
routeContext.getRouteUnits().add(new RouteUnit(new RouteMapper(each, each), Collections.emptyList()));
}
return routeContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class BroadcastInstanceBroadcastRoutingEngine implements BroadcastRouteEn
@Override
public RouteContext route(final RouteContext routeContext, final BroadcastRule broadcastRule) {
RouteContext result = new RouteContext();
for (String each : broadcastRule.getAvailableDataSourceNames()) {
for (String each : broadcastRule.getDataSourceNames()) {
if (resourceMetaData.getAllInstanceDataSourceNames().contains(each)) {
result.getRouteUnits().add(new RouteUnit(new RouteMapper(each, each), Collections.emptyList()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public RouteContext route(final RouteContext routeContext, final BroadcastRule b

private RouteContext getRouteContext(final BroadcastRule broadcastRule) {
RouteContext result = new RouteContext();
for (String each : broadcastRule.getAvailableDataSourceNames()) {
for (String each : broadcastRule.getDataSourceNames()) {
result.getRouteUnits().add(new RouteUnit(new RouteMapper(each, each), Collections.singletonList(new RouteMapper("", ""))));
}
return result;
Expand All @@ -58,7 +58,7 @@ private RouteContext getRouteContext(final BroadcastRule broadcastRule) {
private RouteContext getRouteContext(final BroadcastRule broadcastRule, final Collection<String> logicTableNames) {
RouteContext result = new RouteContext();
Collection<RouteMapper> tableRouteMappers = getTableRouteMappers(logicTableNames);
for (String each : broadcastRule.getAvailableDataSourceNames()) {
for (String each : broadcastRule.getDataSourceNames()) {
RouteMapper dataSourceMapper = new RouteMapper(each, each);
result.getRouteUnits().add(new RouteUnit(dataSourceMapper, tableRouteMappers));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public final class BroadcastUnicastRoutingEngine implements BroadcastRouteEngine

@Override
public RouteContext route(final RouteContext routeContext, final BroadcastRule broadcastRule) {
RouteMapper dataSourceMapper = getDataSourceRouteMapper(broadcastRule.getAvailableDataSourceNames());
RouteMapper dataSourceMapper = getDataSourceRouteMapper(broadcastRule.getDataSourceNames());
routeContext.getRouteUnits().add(new RouteUnit(dataSourceMapper, createTableRouteMappers()));
return routeContext;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
import lombok.Getter;
import org.apache.shardingsphere.broadcast.api.config.BroadcastRuleConfiguration;
import org.apache.shardingsphere.infra.datanode.DataNode;
import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
import org.apache.shardingsphere.infra.rule.identifier.scope.DatabaseRule;
import org.apache.shardingsphere.infra.rule.identifier.type.DataNodeContainedRule;
import org.apache.shardingsphere.infra.rule.identifier.type.DataSourceContainedRule;
import org.apache.shardingsphere.infra.rule.identifier.type.TableContainedRule;
import org.apache.shardingsphere.infra.rule.identifier.type.TableNamesMapper;

Expand All @@ -31,6 +33,7 @@
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.TreeSet;
import java.util.stream.Collectors;
Expand All @@ -53,17 +56,39 @@ public final class BroadcastRule implements DatabaseRule, DataNodeContainedRule,

private final TableNamesMapper logicalTableMapper;

public BroadcastRule(final BroadcastRuleConfiguration config, final String databaseName, final Map<String, DataSource> dataSources) {
public BroadcastRule(final BroadcastRuleConfiguration config, final String databaseName, final Map<String, DataSource> dataSources, final Collection<ShardingSphereRule> builtRules) {
configuration = config;
this.databaseName = databaseName;
dataSourceNames = getDataSourceNames(dataSources);
dataSourceNames = getAggregatedDataSourceNames(dataSources, builtRules);
tables = createBroadcastTables(config.getTables());
logicalTableMapper = createTableMapper();
tableDataNodes = createShardingTableDataNodes(dataSourceNames, tables);
}

private Collection<String> getDataSourceNames(final Map<String, DataSource> dataSources) {
return new LinkedList<>(dataSources.keySet());
private Collection<String> getAggregatedDataSourceNames(final Map<String, DataSource> dataSources, final Collection<ShardingSphereRule> builtRules) {
Collection<String> result = new LinkedList<>(dataSources.keySet());
for (ShardingSphereRule each : builtRules) {
if (each instanceof DataSourceContainedRule) {
result = getAggregatedDataSourceNames(result, (DataSourceContainedRule) each);
}
}
return result;
}

private Collection<String> getAggregatedDataSourceNames(final Collection<String> dataSourceNames, final DataSourceContainedRule builtRule) {
Collection<String> result = new LinkedList<>();
for (Entry<String, Collection<String>> entry : builtRule.getDataSourceMapper().entrySet()) {
for (String each : entry.getValue()) {
if (dataSourceNames.contains(each)) {
dataSourceNames.remove(each);
if (!result.contains(entry.getKey())) {
result.add(entry.getKey());
}
}
}
}
result.addAll(dataSourceNames);
return result;
}

private Collection<String> createBroadcastTables(final Collection<String> broadcastTables) {
Expand Down Expand Up @@ -150,14 +175,6 @@ public boolean isAllBroadcastTables(final Collection<String> logicTableNames) {
return !logicTableNames.isEmpty() && tables.containsAll(logicTableNames);
}

/**
* Get available datasource names.
* @return datasource names
*/
public Collection<String> getAvailableDataSourceNames() {
return dataSourceNames;
}

@Override
public TableNamesMapper getLogicTableMapper() {
return logicalTableMapper;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public final class BroadcastRuleBuilder implements DatabaseRuleBuilder<Broadcast
@Override
public BroadcastRule build(final BroadcastRuleConfiguration config, final String databaseName, final DatabaseType protocolType,
final Map<String, DataSource> dataSources, final Collection<ShardingSphereRule> builtRules, final InstanceContext instanceContext) {
return new BroadcastRule(config, databaseName, dataSources);
return new BroadcastRule(config, databaseName, dataSources, builtRules);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class BroadcastDatabaseBroadcastRoutingEngineTest {
@Test
void assertRoute() {
BroadcastRule broadcastRule = mock(BroadcastRule.class);
when(broadcastRule.getAvailableDataSourceNames()).thenReturn(Arrays.asList("ds_0", "ds_1"));
when(broadcastRule.getDataSourceNames()).thenReturn(Arrays.asList("ds_0", "ds_1"));
BroadcastDatabaseBroadcastRoutingEngine engine = new BroadcastDatabaseBroadcastRoutingEngine();
RouteContext routeContext = engine.route(new RouteContext(), broadcastRule);
assertThat(routeContext.getRouteUnits().size(), is(2));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ void assertRoute() {
when(resourceMetaData.getAllInstanceDataSourceNames()).thenReturn(Collections.singleton("ds_0"));
BroadcastInstanceBroadcastRoutingEngine engine = new BroadcastInstanceBroadcastRoutingEngine(resourceMetaData);
BroadcastRule broadcastRule = mock(BroadcastRule.class);
when(broadcastRule.getAvailableDataSourceNames()).thenReturn(Arrays.asList("ds_0", "ds_1"));
when(broadcastRule.getDataSourceNames()).thenReturn(Arrays.asList("ds_0", "ds_1"));
RouteContext routeContext = engine.route(new RouteContext(), broadcastRule);
assertThat(routeContext.getRouteUnits().size(), is(1));
assertDataSourceRouteMapper(routeContext.getRouteUnits().iterator().next(), "ds_0");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ void assertRouteWithBroadcastRuleTable() {
Collection<String> broadcastRuleTableNames = Collections.singleton("t_address");
BroadcastTableBroadcastRoutingEngine engine = new BroadcastTableBroadcastRoutingEngine(broadcastRuleTableNames);
BroadcastRule broadcastRule = mock(BroadcastRule.class);
when(broadcastRule.getAvailableDataSourceNames()).thenReturn(Arrays.asList("ds_0", "ds_1"));
when(broadcastRule.getDataSourceNames()).thenReturn(Arrays.asList("ds_0", "ds_1"));
when(broadcastRule.getBroadcastRuleTableNames(any())).thenReturn(Collections.singleton("t_address"));
RouteContext routeContext = engine.route(new RouteContext(), broadcastRule);
assertThat(routeContext.getRouteUnits().size(), is(2));
Expand All @@ -55,7 +55,7 @@ void assertRouteWithoutBroadcastRuleTable() {
Collection<String> broadcastRuleTableNames = Collections.singleton("t_address");
BroadcastTableBroadcastRoutingEngine engine = new BroadcastTableBroadcastRoutingEngine(broadcastRuleTableNames);
BroadcastRule broadcastRule = mock(BroadcastRule.class);
when(broadcastRule.getAvailableDataSourceNames()).thenReturn(Arrays.asList("ds_0", "ds_1"));
when(broadcastRule.getDataSourceNames()).thenReturn(Arrays.asList("ds_0", "ds_1"));
when(broadcastRule.getBroadcastRuleTableNames(any())).thenReturn(Collections.emptyList());
RouteContext routeContext = engine.route(new RouteContext(), broadcastRule);
assertThat(routeContext.getRouteUnits().size(), is(2));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class BroadcastUnicastRoutingEngineTest {
@BeforeEach
void setUp() {
broadcastRule = mock(BroadcastRule.class);
when(broadcastRule.getAvailableDataSourceNames()).thenReturn(Arrays.asList("ds_0", "ds_1"));
when(broadcastRule.getDataSourceNames()).thenReturn(Arrays.asList("ds_0", "ds_1"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,8 @@ private void addBroadcastRuleConfiguration(final BroadcastRuleConfiguration broa
allRuleConfigs.add(broadcastRuleConfig);
database.getRuleMetaData().getRules().add(new BroadcastRule(broadcastRuleConfig, database.getName(),
database.getResourceMetaData().getStorageUnits().entrySet().stream()
.collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().getDataSource(), (oldValue, currentValue) -> oldValue, LinkedHashMap::new))));
.collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().getDataSource(), (oldValue, currentValue) -> oldValue, LinkedHashMap::new)),
database.getRuleMetaData().getRules()));
}

private void addSingleRuleConfiguration(final SingleRuleConfiguration singleRuleConfig, final Collection<RuleConfiguration> allRuleConfigs, final ShardingSphereDatabase database) {
Expand Down