Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ignore broadcast tables when unregister storage unit #28911

Merged
merged 4 commits into from
Nov 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -106,9 +107,9 @@ public Map<String, Collection<Class<? extends ShardingSphereRule>>> getInUsedSto
Map<String, Collection<Class<? extends ShardingSphereRule>>> result = new LinkedHashMap<>();
for (ShardingSphereRule each : rules) {
if (each instanceof DataSourceContainedRule) {
result.putAll(getInUsedStorageUnitNameAndRulesMap(each, getInUsedStorageUnitNames((DataSourceContainedRule) each)));
mergeInUsedStorageUnitNameAndRules(result, getInUsedStorageUnitNameAndRulesMap(each, getInUsedStorageUnitNames((DataSourceContainedRule) each)));
} else if (each instanceof DataNodeContainedRule) {
result.putAll(getInUsedStorageUnitNameAndRulesMap(each, getInUsedStorageUnitNames((DataNodeContainedRule) each)));
mergeInUsedStorageUnitNameAndRules(result, getInUsedStorageUnitNameAndRulesMap(each, getInUsedStorageUnitNames((DataNodeContainedRule) each)));
}
}
return result;
Expand All @@ -132,4 +133,19 @@ private Collection<String> getInUsedStorageUnitNames(final DataSourceContainedRu
private Collection<String> getInUsedStorageUnitNames(final DataNodeContainedRule rule) {
return rule.getAllDataNodes().values().stream().flatMap(each -> each.stream().map(DataNode::getDataSourceName).collect(Collectors.toSet()).stream()).collect(Collectors.toSet());
}

private void mergeInUsedStorageUnitNameAndRules(final Map<String, Collection<Class<? extends ShardingSphereRule>>> storageUnitNameAndRules,
final Map<String, Collection<Class<? extends ShardingSphereRule>>> toBeMergedStorageUnitNameAndRules) {
for (Entry<String, Collection<Class<? extends ShardingSphereRule>>> entry : toBeMergedStorageUnitNameAndRules.entrySet()) {
if (storageUnitNameAndRules.containsKey(entry.getKey())) {
for (Class<? extends ShardingSphereRule> each : entry.getValue()) {
if (!storageUnitNameAndRules.get(entry.getKey()).contains(each)) {
storageUnitNameAndRules.get(entry.getKey()).add(each);
}
}
} else {
storageUnitNameAndRules.put(entry.getKey(), entry.getValue());
}
}
}
}
4 changes: 4 additions & 0 deletions parser/distsql/engine/src/main/antlr4/imports/Keyword.g4
Original file line number Diff line number Diff line change
Expand Up @@ -342,3 +342,7 @@ CLUSTER
LOCK_STRATEGY
: L O C K UL_ S T R A T E G Y
;

BROADCAST
: B R O A D C A S T
;
8 changes: 5 additions & 3 deletions parser/distsql/engine/src/main/antlr4/imports/RDLStatement.g4
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ alterStorageUnit
;

unregisterStorageUnit
: UNREGISTER STORAGE UNIT ifExists? storageUnitName (COMMA_ storageUnitName)* ignoreSingleTables?
: UNREGISTER STORAGE UNIT ifExists? storageUnitName (COMMA_ storageUnitName)* ignoreTables?
;

storageUnitDefinition
Expand Down Expand Up @@ -67,8 +67,10 @@ password
: STRING_
;

ignoreSingleTables
: IGNORE SINGLE TABLES
ignoreTables
: IGNORE (SINGLE COMMA_ BROADCAST | BROADCAST COMMA_ SINGLE) TABLES # ignoreSingleAndBroadcastTables
| IGNORE SINGLE TABLES # ignoreSingleTables
| IGNORE BROADCAST TABLES # ignoreBroadcastTables
;

ifExists
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@
import org.apache.shardingsphere.distsql.parser.autogen.KernelDistSQLStatementParser.ExportMetaDataContext;
import org.apache.shardingsphere.distsql.parser.autogen.KernelDistSQLStatementParser.ExportStorageNodesContext;
import org.apache.shardingsphere.distsql.parser.autogen.KernelDistSQLStatementParser.FromSegmentContext;
import org.apache.shardingsphere.distsql.parser.autogen.KernelDistSQLStatementParser.IgnoreBroadcastTablesContext;
import org.apache.shardingsphere.distsql.parser.autogen.KernelDistSQLStatementParser.IgnoreSingleAndBroadcastTablesContext;
import org.apache.shardingsphere.distsql.parser.autogen.KernelDistSQLStatementParser.IgnoreSingleTablesContext;
import org.apache.shardingsphere.distsql.parser.autogen.KernelDistSQLStatementParser.ImportDatabaseConfigurationContext;
import org.apache.shardingsphere.distsql.parser.autogen.KernelDistSQLStatementParser.ImportMetaDataContext;
import org.apache.shardingsphere.distsql.parser.autogen.KernelDistSQLStatementParser.InstanceIdContext;
Expand Down Expand Up @@ -206,9 +209,11 @@ private Properties getProperties(final PropertiesDefinitionContext ctx) {

@Override
public ASTNode visitUnregisterStorageUnit(final UnregisterStorageUnitContext ctx) {
boolean ignoreSingleTables = null != ctx.ignoreSingleTables();
boolean ignoreSingleTables = ctx.ignoreTables() instanceof IgnoreSingleAndBroadcastTablesContext || ctx.ignoreTables() instanceof IgnoreSingleTablesContext;
boolean ignoreBroadcastTables = ctx.ignoreTables() instanceof IgnoreSingleAndBroadcastTablesContext || ctx.ignoreTables() instanceof IgnoreBroadcastTablesContext;
return new UnregisterStorageUnitStatement(null != ctx.ifExists(),
ctx.storageUnitName().stream().map(ParseTree::getText).map(each -> new IdentifierValue(each).getValue()).collect(Collectors.toList()), ignoreSingleTables);
ctx.storageUnitName().stream().map(ParseTree::getText).map(each -> new IdentifierValue(each).getValue()).collect(Collectors.toList()),
ignoreSingleTables, ignoreBroadcastTables);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ public final class UnregisterStorageUnitStatement extends StorageUnitDefinitionS

private final boolean ignoreSingleTables;

public UnregisterStorageUnitStatement(final Collection<String> storageUnitNames, final boolean ignoreSingleTables) {
this(false, storageUnitNames, ignoreSingleTables);
private final boolean ignoreBroadcastTables;

public UnregisterStorageUnitStatement(final Collection<String> storageUnitNames, final boolean ignoreSingleTables, final boolean ignoreBroadcastTables) {
this(false, storageUnitNames, ignoreSingleTables, ignoreBroadcastTables);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.shardingsphere.proxy.backend.handler.distsql.rdl.storage.unit;

import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.broadcast.rule.BroadcastRule;
import org.apache.shardingsphere.distsql.handler.exception.storageunit.InvalidStorageUnitsException;
import org.apache.shardingsphere.distsql.handler.exception.storageunit.MissingRequiredStorageUnitsException;
import org.apache.shardingsphere.distsql.handler.exception.storageunit.StorageUnitInUsedException;
Expand All @@ -37,6 +38,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -82,19 +84,33 @@ private void checkInUsed(final String databaseName, final UnregisterStorageUnitS
Collection<String> inUsedStorageUnitNames = inUsedStorageUnits.keySet();
inUsedStorageUnitNames.retainAll(sqlStatement.getStorageUnitNames());
if (!inUsedStorageUnitNames.isEmpty()) {
if (sqlStatement.isIgnoreSingleTables()) {
checkInUsedIgnoreSingleTables(new HashSet<>(inUsedStorageUnitNames), inUsedStorageUnits);
Collection<Class<? extends ShardingSphereRule>> ignoreShardingSphereRules = getIgnoreShardingSphereRules(sqlStatement);
if (!ignoreShardingSphereRules.isEmpty()) {
checkInUsedIgnoreTables(new HashSet<>(inUsedStorageUnitNames), inUsedStorageUnits, ignoreShardingSphereRules);
} else {
String firstResource = inUsedStorageUnitNames.iterator().next();
throw new StorageUnitInUsedException(firstResource, inUsedStorageUnits.get(firstResource));
}
}
}

private void checkInUsedIgnoreSingleTables(final Collection<String> inUsedResourceNames, final Map<String, Collection<Class<? extends ShardingSphereRule>>> inUsedStorageUnits) {
private Collection<Class<? extends ShardingSphereRule>> getIgnoreShardingSphereRules(final UnregisterStorageUnitStatement sqlStatement) {
Collection<Class<? extends ShardingSphereRule>> result = new LinkedList<>();
if (sqlStatement.isIgnoreSingleTables()) {
result.add(SingleRule.class);
}
if (sqlStatement.isIgnoreBroadcastTables()) {
result.add(BroadcastRule.class);
}
return result;
}

private void checkInUsedIgnoreTables(final Collection<String> inUsedResourceNames,
final Map<String, Collection<Class<? extends ShardingSphereRule>>> inUsedStorageUnits,
final Collection<Class<? extends ShardingSphereRule>> ignoreShardingSphereRules) {
for (String each : inUsedResourceNames) {
Collection<Class<? extends ShardingSphereRule>> inUsedRules = inUsedStorageUnits.get(each);
inUsedRules.remove(SingleRule.class);
ignoreShardingSphereRules.forEach(inUsedRules::remove);
ShardingSpherePreconditions.checkState(inUsedRules.isEmpty(), () -> new StorageUnitInUsedException(each, inUsedRules));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,15 +111,15 @@ void assertExecute() throws SQLException {
when(database.getResourceMetaData()).thenReturn(resourceMetaData);
when(database.getRuleMetaData().getInUsedStorageUnitNameAndRulesMap()).thenReturn(Collections.emptyMap());
when(contextManager.getMetaDataContexts().getMetaData().getDatabase("foo_db")).thenReturn(database);
UnregisterStorageUnitStatement unregisterStorageUnitStatement = new UnregisterStorageUnitStatement(Collections.singleton("foo_ds"), false);
UnregisterStorageUnitStatement unregisterStorageUnitStatement = new UnregisterStorageUnitStatement(Collections.singleton("foo_ds"), false, false);
assertThat(handler.execute("foo_db", unregisterStorageUnitStatement), instanceOf(UpdateResponseHeader.class));
verify(modeContextManager).unregisterStorageUnits("foo_db", unregisterStorageUnitStatement.getStorageUnitNames());
}

@Test
void assertStorageUnitNameNotExistedExecute() {
when(ProxyContext.getInstance().getDatabase("foo_db").getResourceMetaData().getStorageUnits()).thenReturn(Collections.emptyMap());
assertThrows(MissingRequiredStorageUnitsException.class, () -> handler.execute("foo_db", new UnregisterStorageUnitStatement(Collections.singleton("foo_ds"), false)));
assertThrows(MissingRequiredStorageUnitsException.class, () -> handler.execute("foo_db", new UnregisterStorageUnitStatement(Collections.singleton("foo_ds"), false, false)));
}

@Test
Expand All @@ -132,7 +132,7 @@ void assertStorageUnitNameInUseExecute() {
when(database.getResourceMetaData()).thenReturn(resourceMetaData);
when(contextManager.getMetaDataContexts().getMetaData().getDatabase("foo_db")).thenReturn(database);
assertThrows(StorageUnitInUsedException.class,
() -> handler.execute("foo_db", new UnregisterStorageUnitStatement(Collections.singleton("foo_ds"), false)));
() -> handler.execute("foo_db", new UnregisterStorageUnitStatement(Collections.singleton("foo_ds"), false, false)));
}

@Test
Expand All @@ -146,7 +146,7 @@ void assertStorageUnitNameInUseWithoutIgnoreSingleTables() {
when(resourceMetaData.getStorageUnits()).thenReturn(Collections.singletonMap("foo_ds", storageUnit));
when(database.getResourceMetaData()).thenReturn(resourceMetaData);
when(contextManager.getMetaDataContexts().getMetaData().getDatabase("foo_db")).thenReturn(database);
assertThrows(StorageUnitInUsedException.class, () -> handler.execute("foo_db", new UnregisterStorageUnitStatement(Collections.singleton("foo_ds"), false)));
assertThrows(StorageUnitInUsedException.class, () -> handler.execute("foo_db", new UnregisterStorageUnitStatement(Collections.singleton("foo_ds"), false, false)));
}

@Test
Expand All @@ -160,14 +160,14 @@ void assertStorageUnitNameInUseIgnoreSingleTables() throws SQLException {
when(resourceMetaData.getStorageUnits()).thenReturn(Collections.singletonMap("foo_ds", storageUnit));
when(database.getResourceMetaData()).thenReturn(resourceMetaData);
when(contextManager.getMetaDataContexts().getMetaData().getDatabase("foo_db")).thenReturn(database);
UnregisterStorageUnitStatement unregisterStorageUnitStatement = new UnregisterStorageUnitStatement(Collections.singleton("foo_ds"), true);
UnregisterStorageUnitStatement unregisterStorageUnitStatement = new UnregisterStorageUnitStatement(Collections.singleton("foo_ds"), true, false);
assertThat(handler.execute("foo_db", unregisterStorageUnitStatement), instanceOf(UpdateResponseHeader.class));
verify(modeContextManager).unregisterStorageUnits("foo_db", unregisterStorageUnitStatement.getStorageUnitNames());
}

@Test
void assertExecuteWithIfExists() throws SQLException {
UnregisterStorageUnitStatement unregisterStorageUnitStatement = new UnregisterStorageUnitStatement(true, Collections.singleton("foo_ds"), true);
UnregisterStorageUnitStatement unregisterStorageUnitStatement = new UnregisterStorageUnitStatement(true, Collections.singleton("foo_ds"), true, false);
assertThat(handler.execute("foo_db", unregisterStorageUnitStatement), instanceOf(UpdateResponseHeader.class));
verify(modeContextManager).unregisterStorageUnits("foo_db", unregisterStorageUnitStatement.getStorageUnitNames());
}
Expand All @@ -177,7 +177,7 @@ void assertStorageUnitNameInUseWithIfExists() {
when(database.getRuleMetaData()).thenReturn(new RuleMetaData(Collections.singleton(shadowRule)));
when(shadowRule.getDataSourceMapper()).thenReturn(Collections.singletonMap("", Collections.singleton("foo_ds")));
when(contextManager.getMetaDataContexts().getMetaData().getDatabase("foo_db")).thenReturn(database);
UnregisterStorageUnitStatement unregisterStorageUnitStatement = new UnregisterStorageUnitStatement(true, Collections.singleton("foo_ds"), true);
UnregisterStorageUnitStatement unregisterStorageUnitStatement = new UnregisterStorageUnitStatement(true, Collections.singleton("foo_ds"), true, false);
assertThrows(DistSQLException.class, () -> handler.execute("foo_db", unregisterStorageUnitStatement));
}
}