Rename QualifiedDatabase to QualifiedDataSource (#30988)
terrymanu authored Apr 22, 2024
1 parent 3a1b1db commit aab1fee
Showing 16 changed files with 58 additions and 56 deletions.
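In short, the commit renames the shared value object and every accessor that exposes it, so call sites change along the lines of the following condensed before/after sketch (illustrative only, distilled from the diffs below):

    // Before this commit:
    QualifiedDatabase qualifiedDatabase = dataSourceEvent.getQualifiedDatabase();
    // After this commit:
    QualifiedDataSource qualifiedDataSource = dataSourceEvent.getQualifiedDataSource();
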
@@ -20,7 +20,7 @@
 import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
 import org.apache.shardingsphere.infra.instance.InstanceContext;
-import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDatabase;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDataSource;
 import org.apache.shardingsphere.infra.rule.attribute.datasource.StaticDataSourceRuleAttribute;
 import org.apache.shardingsphere.infra.rule.event.DataSourceStatusChangedEvent;
 import org.apache.shardingsphere.infra.state.datasource.DataSourceState;
@@ -58,13 +58,14 @@ public Map<String, Collection<String>> getDataSourceMapper() {
     @Override
     public void updateStatus(final DataSourceStatusChangedEvent event) {
         StorageNodeDataSourceChangedEvent dataSourceEvent = (StorageNodeDataSourceChangedEvent) event;
-        QualifiedDatabase qualifiedDatabase = dataSourceEvent.getQualifiedDatabase();
-        ReadwriteSplittingDataSourceRule dataSourceRule = dataSourceRules.get(qualifiedDatabase.getGroupName());
-        ShardingSpherePreconditions.checkNotNull(dataSourceRule, () -> new ReadwriteSplittingDataSourceRuleNotFoundException(qualifiedDatabase.getGroupName(), qualifiedDatabase.getDatabaseName()));
+        QualifiedDataSource qualifiedDataSource = dataSourceEvent.getQualifiedDataSource();
+        ReadwriteSplittingDataSourceRule dataSourceRule = dataSourceRules.get(qualifiedDataSource.getGroupName());
+        ShardingSpherePreconditions.checkNotNull(dataSourceRule,
+                () -> new ReadwriteSplittingDataSourceRuleNotFoundException(qualifiedDataSource.getGroupName(), qualifiedDataSource.getDatabaseName()));
         if (DataSourceState.DISABLED == dataSourceEvent.getDataSource().getStatus()) {
-            dataSourceRule.disableDataSource(dataSourceEvent.getQualifiedDatabase().getDataSourceName());
+            dataSourceRule.disableDataSource(dataSourceEvent.getQualifiedDataSource().getDataSourceName());
         } else {
-            dataSourceRule.enableDataSource(dataSourceEvent.getQualifiedDatabase().getDataSourceName());
+            dataSourceRule.enableDataSource(dataSourceEvent.getQualifiedDataSource().getDataSourceName());
         }
     }
 
@@ -76,7 +77,7 @@ public void cleanStorageNodeDataSource(final String groupName) {
 
     private void deleteStorageNodeDataSources(final ReadwriteSplittingDataSourceRule rule) {
         rule.getReadwriteSplittingGroup().getReadDataSources()
-                .forEach(each -> instanceContext.getEventBusContext().post(new StorageNodeDataSourceDeletedEvent(new QualifiedDatabase(databaseName, rule.getName(), each))));
+                .forEach(each -> instanceContext.getEventBusContext().post(new StorageNodeDataSourceDeletedEvent(new QualifiedDataSource(databaseName, rule.getName(), each))));
     }
 
     @Override

@@ -19,7 +19,7 @@
 
 import org.apache.shardingsphere.infra.algorithm.core.config.AlgorithmConfiguration;
 import org.apache.shardingsphere.infra.instance.InstanceContext;
-import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDatabase;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDataSource;
 import org.apache.shardingsphere.infra.rule.attribute.datasource.StaticDataSourceRuleAttribute;
 import org.apache.shardingsphere.infra.state.datasource.DataSourceState;
 import org.apache.shardingsphere.mode.event.storage.StorageNodeDataSource;
@@ -71,26 +71,26 @@ private void assertDataSourceRule(final ReadwriteSplittingDataSourceRule actual)
     void assertUpdateRuleStatusWithNotExistDataSource() {
         ReadwriteSplittingRule readwriteSplittingRule = createReadwriteSplittingRule();
         readwriteSplittingRule.getAttributes().getAttribute(StaticDataSourceRuleAttribute.class).updateStatus(new StorageNodeDataSourceChangedEvent(
-                new QualifiedDatabase("readwrite_splitting_db.readwrite.read_ds"), new StorageNodeDataSource(StorageNodeRole.MEMBER, DataSourceState.DISABLED)));
+                new QualifiedDataSource("readwrite_splitting_db.readwrite.read_ds"), new StorageNodeDataSource(StorageNodeRole.MEMBER, DataSourceState.DISABLED)));
         assertThat(readwriteSplittingRule.getSingleDataSourceRule().getDisabledDataSourceNames(), is(Collections.singleton("read_ds")));
     }
 
     @Test
     void assertUpdateRuleStatus() {
         ReadwriteSplittingRule readwriteSplittingRule = createReadwriteSplittingRule();
         readwriteSplittingRule.getAttributes().getAttribute(StaticDataSourceRuleAttribute.class).updateStatus(new StorageNodeDataSourceChangedEvent(
-                new QualifiedDatabase("readwrite_splitting_db.readwrite.read_ds_0"), new StorageNodeDataSource(StorageNodeRole.MEMBER, DataSourceState.DISABLED)));
+                new QualifiedDataSource("readwrite_splitting_db.readwrite.read_ds_0"), new StorageNodeDataSource(StorageNodeRole.MEMBER, DataSourceState.DISABLED)));
         assertThat(readwriteSplittingRule.getSingleDataSourceRule().getDisabledDataSourceNames(), is(Collections.singleton("read_ds_0")));
     }
 
     @Test
     void assertUpdateRuleStatusWithEnable() {
         ReadwriteSplittingRule readwriteSplittingRule = createReadwriteSplittingRule();
         readwriteSplittingRule.getAttributes().getAttribute(StaticDataSourceRuleAttribute.class).updateStatus(new StorageNodeDataSourceChangedEvent(
-                new QualifiedDatabase("readwrite_splitting_db.readwrite.read_ds_0"), new StorageNodeDataSource(StorageNodeRole.MEMBER, DataSourceState.DISABLED)));
+                new QualifiedDataSource("readwrite_splitting_db.readwrite.read_ds_0"), new StorageNodeDataSource(StorageNodeRole.MEMBER, DataSourceState.DISABLED)));
         assertThat(readwriteSplittingRule.getSingleDataSourceRule().getDisabledDataSourceNames(), is(Collections.singleton("read_ds_0")));
         readwriteSplittingRule.getAttributes().getAttribute(StaticDataSourceRuleAttribute.class).updateStatus(new StorageNodeDataSourceChangedEvent(
-                new QualifiedDatabase("readwrite_splitting_db.readwrite.read_ds_0"), new StorageNodeDataSource(StorageNodeRole.MEMBER, DataSourceState.ENABLED)));
+                new QualifiedDataSource("readwrite_splitting_db.readwrite.read_ds_0"), new StorageNodeDataSource(StorageNodeRole.MEMBER, DataSourceState.ENABLED)));
         assertThat(readwriteSplittingRule.getSingleDataSourceRule().getDisabledDataSourceNames(), is(Collections.emptySet()));
     }
 

@@ -24,11 +24,11 @@
 import java.util.List;
 
 /**
- * Qualified database.
+ * Qualified data source.
  */
 @RequiredArgsConstructor
 @Getter
-public final class QualifiedDatabase {
+public final class QualifiedDataSource {
 
     private static final String DELIMITER = ".";
 
@@ -38,7 +38,7 @@ public final class QualifiedDatabase {
 
     private final String dataSourceName;
 
-    public QualifiedDatabase(final String value) {
+    public QualifiedDataSource(final String value) {
         List<String> values = Splitter.on(DELIMITER).splitToList(value);
         databaseName = values.get(0);
         groupName = values.get(1);
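
To make the renamed class concrete, here is a minimal usage sketch; it is not part of the commit, the demo class name is hypothetical, and the behavior shown follows the constructor above plus the toString() assertion in the test that follows.

    import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDataSource;
    
    // Hypothetical demo class, not part of the repository.
    public final class QualifiedDataSourceSketch {
        
        public static void main(final String[] args) {
            // Parse the dotted form "<database>.<group>.<data source>" that the registry stores.
            QualifiedDataSource parsed = new QualifiedDataSource("readwrite_splitting_db.readwrite.read_ds_0");
            System.out.println(parsed.getDatabaseName());   // readwrite_splitting_db
            System.out.println(parsed.getGroupName());      // readwrite
            System.out.println(parsed.getDataSourceName()); // read_ds_0
            // The Lombok-generated three-argument constructor and toString() round-trip the same dotted form.
            QualifiedDataSource built = new QualifiedDataSource("readwrite_splitting_db", "readwrite", "read_ds_0");
            System.out.println(built);                       // readwrite_splitting_db.readwrite.read_ds_0
        }
    }
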

@@ -22,13 +22,14 @@
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 
-class QualifiedDatabaseTest {
+class QualifiedDataSourceTest {
 
     @Test
-    void assertNewQualifiedDatabaseWithDatabaseNameAndDataSourceName() {
-        QualifiedDatabase actual = new QualifiedDatabase("test_db.test_group_name.test_ds");
+    void assertNew() {
+        QualifiedDataSource actual = new QualifiedDataSource("test_db.test_group_name.test_ds");
         assertThat(actual.getDatabaseName(), is("test_db"));
         assertThat(actual.getGroupName(), is("test_group_name"));
         assertThat(actual.getDataSourceName(), is("test_ds"));
+        assertThat(actual.toString(), is("test_db.test_group_name.test_ds"));
     }
 }

@@ -19,7 +19,7 @@
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDatabase;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDataSource;
 import org.apache.shardingsphere.infra.rule.event.DataSourceStatusChangedEvent;
 
 /**
@@ -29,7 +29,7 @@
 @Getter
 public final class StorageNodeDataSourceChangedEvent implements DataSourceStatusChangedEvent {
 
-    private final QualifiedDatabase qualifiedDatabase;
+    private final QualifiedDataSource qualifiedDataSource;
 
     private final StorageNodeDataSource dataSource;
 }

@@ -19,7 +19,7 @@
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDatabase;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDataSource;
 
 /**
  * Storage node data source deleted event.
@@ -28,5 +28,5 @@
 @Getter
 public final class StorageNodeDataSourceDeletedEvent {
 
-    private final QualifiedDatabase qualifiedDatabase;
+    private final QualifiedDataSource qualifiedDataSource;
 }

@@ -19,7 +19,7 @@
 
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
-import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDatabase;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDataSource;
 
 import java.util.Optional;
 import java.util.regex.Matcher;
@@ -60,19 +60,19 @@ public static String getStorageNodesDataSourcePath(final String dataSourcePath)
      * @param database cluster database
      * @return status path of storage node
      */
-    public static String getStorageNodeDataSourcePath(final QualifiedDatabase database) {
+    public static String getStorageNodeDataSourcePath(final QualifiedDataSource database) {
         return String.join("/", getRootPath(), database.toString());
     }
 
     /**
-     * Extract qualified database.
+     * Extract qualified data source.
      *
      * @param storageNodePath storage node path
-     * @return extracted qualified database
+     * @return extracted qualified data source
      */
-    public static Optional<QualifiedDatabase> extractQualifiedDatabase(final String storageNodePath) {
+    public static Optional<QualifiedDataSource> extractQualifiedDataSource(final String storageNodePath) {
         Pattern pattern = Pattern.compile(getRootPath() + "/(\\S+)$", Pattern.CASE_INSENSITIVE);
         Matcher matcher = pattern.matcher(storageNodePath);
-        return matcher.find() ? Optional.of(new QualifiedDatabase(matcher.group(1))) : Optional.empty();
+        return matcher.find() ? Optional.of(new QualifiedDataSource(matcher.group(1))) : Optional.empty();
     }
 }
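
A brief sketch of the round trip these two helpers provide; it is not part of the commit, the demo class name is hypothetical, and it assumes the /nodes/storage_nodes root path that the StorageNodeTest further down relies on.

    import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDataSource;
    import org.apache.shardingsphere.mode.storage.node.StorageNode;
    
    import java.util.Optional;
    
    // Hypothetical demo class, not part of the repository.
    public final class StorageNodePathSketch {
        
        public static void main(final String[] args) {
            QualifiedDataSource qualifiedDataSource = new QualifiedDataSource("replica_query_db", "readwrite_ds", "replica_ds_0");
            // Build the registry path for one data source of one group,
            // e.g. /nodes/storage_nodes/replica_query_db.readwrite_ds.replica_ds_0 (assuming that root path).
            String path = StorageNode.getStorageNodeDataSourcePath(qualifiedDataSource);
            System.out.println(path);
            // Parse it back; an empty Optional means the key is not a storage node data source path.
            Optional<QualifiedDataSource> parsed = StorageNode.extractQualifiedDataSource(path);
            parsed.ifPresent(each -> System.out.println(each.getDatabaseName() + "." + each.getGroupName() + "." + each.getDataSourceName()));
        }
    }
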

@@ -19,7 +19,7 @@
 
 import com.google.common.base.Strings;
 import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDatabase;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDataSource;
 import org.apache.shardingsphere.infra.state.datasource.DataSourceState;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import org.apache.shardingsphere.mode.event.storage.StorageNodeDataSource;
@@ -68,7 +68,7 @@ public Map<String, StorageNodeDataSource> loadStorageNodes() {
     */
    public void changeMemberStorageNodeStatus(final String databaseName, final String groupName, final String storageUnitName, final DataSourceState dataSourceState) {
        StorageNodeDataSource storageNodeDataSource = new StorageNodeDataSource(StorageNodeRole.MEMBER, dataSourceState);
-        repository.persist(StorageNode.getStorageNodeDataSourcePath(new QualifiedDatabase(databaseName, groupName, storageUnitName)),
+        repository.persist(StorageNode.getStorageNodeDataSourcePath(new QualifiedDataSource(databaseName, groupName, storageUnitName)),
                YamlEngine.marshal(new YamlStorageNodeDataSourceSwapper().swapToYamlConfiguration(storageNodeDataSource)));
    }
 }

@@ -20,7 +20,7 @@
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.infra.rule.event.GovernanceEvent;
-import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDatabase;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDataSource;
 import org.apache.shardingsphere.mode.event.storage.StorageNodeDataSource;
 
 /**
@@ -30,7 +30,7 @@
 @Getter
 public final class StorageNodeChangedEvent implements GovernanceEvent {
 
-    private final QualifiedDatabase qualifiedDatabase;
+    private final QualifiedDataSource qualifiedDataSource;
 
     private final StorageNodeDataSource dataSource;
 }

@@ -42,6 +42,6 @@ public StorageNodeStatusSubscriber(final ClusterPersistRepository repository, fi
     */
    @Subscribe
    public void delete(final StorageNodeDataSourceDeletedEvent event) {
-        repository.delete(StorageNode.getStorageNodeDataSourcePath(event.getQualifiedDatabase()));
+        repository.delete(StorageNode.getStorageNodeDataSourcePath(event.getQualifiedDataSource()));
    }
 }

@@ -18,7 +18,7 @@
 package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.watcher;
 
 import com.google.common.base.Strings;
-import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDatabase;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDataSource;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import org.apache.shardingsphere.infra.rule.event.GovernanceEvent;
 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcher;
@@ -55,9 +55,9 @@ public Optional<GovernanceEvent> createGovernanceEvent(final DataChangedEvent ev
        if (Strings.isNullOrEmpty(event.getValue())) {
            return Optional.empty();
        }
-        Optional<QualifiedDatabase> qualifiedDatabase = StorageNode.extractQualifiedDatabase(event.getKey());
-        if (qualifiedDatabase.isPresent()) {
-            QualifiedDatabase database = qualifiedDatabase.get();
+        Optional<QualifiedDataSource> qualifiedDataSource = StorageNode.extractQualifiedDataSource(event.getKey());
+        if (qualifiedDataSource.isPresent()) {
+            QualifiedDataSource database = qualifiedDataSource.get();
            StorageNodeDataSource storageNodeDataSource = new YamlStorageNodeDataSourceSwapper().swapToObject(YamlEngine.unmarshal(event.getValue(), YamlStorageNodeDataSource.class));
            return Optional.of(new StorageNodeChangedEvent(database, storageNodeDataSource));
        }

@@ -60,14 +60,14 @@ public StateChangedSubscriber(final RegistryCenter registryCenter, final Context
    @Subscribe
    public synchronized void renew(final StorageNodeChangedEvent event) {
        ShardingSphereMetaData metaData = contextManager.getMetaDataContexts().getMetaData();
-        if (!metaData.containsDatabase(event.getQualifiedDatabase().getDatabaseName())) {
+        if (!metaData.containsDatabase(event.getQualifiedDataSource().getDatabaseName())) {
            return;
        }
-        for (StaticDataSourceRuleAttribute each : metaData.getDatabase(event.getQualifiedDatabase().getDatabaseName()).getRuleMetaData().getAttributes(StaticDataSourceRuleAttribute.class)) {
-            each.updateStatus(new StorageNodeDataSourceChangedEvent(event.getQualifiedDatabase(), event.getDataSource()));
+        for (StaticDataSourceRuleAttribute each : metaData.getDatabase(event.getQualifiedDataSource().getDatabaseName()).getRuleMetaData().getAttributes(StaticDataSourceRuleAttribute.class)) {
+            each.updateStatus(new StorageNodeDataSourceChangedEvent(event.getQualifiedDataSource(), event.getDataSource()));
        }
        DataSourceStateManager.getInstance().updateState(
-                event.getQualifiedDatabase().getDatabaseName(), event.getQualifiedDatabase().getDataSourceName(), DataSourceState.valueOf(event.getDataSource().getStatus().name()));
+                event.getQualifiedDataSource().getDatabaseName(), event.getQualifiedDataSource().getDataSourceName(), DataSourceState.valueOf(event.getDataSource().getStatus().name()));
    }
 
    /**

@@ -17,7 +17,7 @@
 
 package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.node;
 
-import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDatabase;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDataSource;
 import org.apache.shardingsphere.mode.storage.node.StorageNode;
 import org.junit.jupiter.api.Test;
 
@@ -40,8 +40,8 @@ void assertGetStorageNodesDataSourcePath() {
    }
 
    @Test
-    void assertExtractQualifiedDatabase() {
-        Optional<QualifiedDatabase> actual = StorageNode.extractQualifiedDatabase("/nodes/storage_nodes/replica_query_db.readwrite_ds.replica_ds_0");
+    void assertExtractQualifiedDataSource() {
+        Optional<QualifiedDataSource> actual = StorageNode.extractQualifiedDataSource("/nodes/storage_nodes/replica_query_db.readwrite_ds.replica_ds_0");
        assertTrue(actual.isPresent());
        assertThat(actual.get().getDatabaseName(), is("replica_query_db"));
        assertThat(actual.get().getGroupName(), is("readwrite_ds"));

@@ -17,7 +17,7 @@
 
 package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.subscriber;
 
-import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDatabase;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDataSource;
 import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
 import org.apache.shardingsphere.mode.event.storage.StorageNodeDataSourceDeletedEvent;
 import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
@@ -42,8 +42,8 @@ void assertDeleteStorageNodeDataSourceDataSourceState() {
        String databaseName = "replica_query_db";
        String groupName = "readwrite_ds";
        String dataSourceName = "replica_ds_0";
-        StorageNodeDataSourceDeletedEvent event = new StorageNodeDataSourceDeletedEvent(new QualifiedDatabase(databaseName, groupName, dataSourceName));
+        StorageNodeDataSourceDeletedEvent event = new StorageNodeDataSourceDeletedEvent(new QualifiedDataSource(databaseName, groupName, dataSourceName));
        new StorageNodeStatusSubscriber(repository, eventBusContext).delete(event);
-        verify(repository).delete(StorageNode.getStorageNodeDataSourcePath(new QualifiedDatabase(databaseName, groupName, dataSourceName)));
+        verify(repository).delete(StorageNode.getStorageNodeDataSourcePath(new QualifiedDataSource(databaseName, groupName, dataSourceName)));
    }
 }