Skip to content

Commit

Permalink
Move data source state manager to StateContext
Browse files Browse the repository at this point in the history
  • Loading branch information
menghaoranss committed May 29, 2024
1 parent 2995202 commit 7e7543d
Show file tree
Hide file tree
Showing 30 changed files with 152 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,6 @@ private ContextManager mockContextManager() {
ComputeNodeInstanceContext computeNodeInstanceContext = new ComputeNodeInstanceContext(
new ComputeNodeInstance(mock(InstanceMetaData.class)), new StandaloneWorkerIdGenerator(), new ModeConfiguration("Standalone", null),
mock(LockContext.class), new EventBusContext());
return new ContextManager(metaDataContexts, computeNodeInstanceContext, mock(PersistRepository.class));
return new ContextManager(metaDataContexts, computeNodeInstanceContext, mock(PersistRepository.class), false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,6 @@ private ContextManager mockContextManager() {
ComputeNodeInstanceContext computeNodeInstanceContext = new ComputeNodeInstanceContext(
new ComputeNodeInstance(mock(InstanceMetaData.class)), new StandaloneWorkerIdGenerator(), new ModeConfiguration("Standalone", null),
mock(LockContext.class), new EventBusContext());
return new ContextManager(metaDataContexts, computeNodeInstanceContext, mock(PersistRepository.class));
return new ContextManager(metaDataContexts, computeNodeInstanceContext, mock(PersistRepository.class), false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.state.datasource.DataSourceState;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.storage.service.QualifiedDataSourceStatusService;
import org.apache.shardingsphere.mode.service.persist.QualifiedDataSourceStatePersistService;
import org.apache.shardingsphere.readwritesplitting.constant.ReadwriteSplittingDataSourceType;
import org.apache.shardingsphere.readwritesplitting.distsql.statement.AlterReadwriteSplittingStorageUnitStatusStatement;
import org.apache.shardingsphere.readwritesplitting.exception.ReadwriteSplittingRuleExceptionIdentifier;
Expand Down Expand Up @@ -77,8 +77,8 @@ private void checkBeforeUpdate(final AlterReadwriteSplittingStorageUnitStatusSta

private void updateStatus(final ContextManager contextManager, final AlterReadwriteSplittingStorageUnitStatusStatement sqlStatement) {
DataSourceState status = sqlStatement.isEnable() ? DataSourceState.ENABLED : DataSourceState.DISABLED;
new QualifiedDataSourceStatusService(contextManager.getRepository())
.changeStatus(database.getName(), sqlStatement.getRuleName(), sqlStatement.getStorageUnitName(), status);
new QualifiedDataSourceStatePersistService(contextManager.getRepository())
.updateState(database.getName(), sqlStatement.getRuleName(), sqlStatement.getStorageUnitName(), status);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import org.apache.shardingsphere.mode.metadata.MetaDataContextsFactory;
import org.apache.shardingsphere.mode.state.StateContext;
import org.apache.shardingsphere.traffic.rule.TrafficRule;
import org.apache.shardingsphere.transaction.rule.TransactionRule;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -70,7 +69,6 @@ void setUp() {
mock(MetaDataPersistService.class), new ShardingSphereMetaData(databases, mock(ResourceMetaData.class), globalRuleMetaData, new ConfigurationProperties(new Properties())));
when(contextManager.getMetaDataContexts()).thenReturn(metaDataContexts);
when(contextManager.getComputeNodeInstanceContext().getInstance().getState()).thenReturn(new InstanceStateContext());
when(contextManager.getStateContext()).thenReturn(new StateContext());
}

private Map<String, ShardingSphereDatabase> mockDatabases() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,14 @@ public final class ContextManager implements AutoCloseable {

private final PersistRepository repository;

public ContextManager(final MetaDataContexts metaDataContexts, final ComputeNodeInstanceContext computeNodeInstanceContext, final PersistRepository repository) {
public ContextManager(final MetaDataContexts metaDataContexts, final ComputeNodeInstanceContext computeNodeInstanceContext, final PersistRepository repository, final boolean force) {
this.metaDataContexts = new AtomicReference<>(metaDataContexts);
this.computeNodeInstanceContext = computeNodeInstanceContext;
this.repository = repository;
persistServiceFacade = new PersistServiceFacade(repository, computeNodeInstanceContext.getModeConfiguration(), this);
metaDataContextManager = new MetaDataContextManager(this.metaDataContexts, computeNodeInstanceContext, persistServiceFacade);
executorEngine = ExecutorEngine.createExecutorEngineWithSize(metaDataContexts.getMetaData().getProps().<Integer>getValue(ConfigurationPropertyKey.KERNEL_EXECUTOR_SIZE));
stateContext = new StateContext();
stateContext = new StateContext(this.metaDataContexts.get().getMetaData(), persistServiceFacade.getQualifiedDataSourceStatePersistService().loadStates(), force);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
import org.apache.shardingsphere.metadata.factory.InternalMetaDataFactory;
import org.apache.shardingsphere.metadata.persist.MetaDataPersistService;
import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
import org.apache.shardingsphere.mode.storage.QualifiedDataSourceStatus;
import org.apache.shardingsphere.mode.storage.QualifiedDataSourceState;

import javax.sql.DataSource;
import java.sql.SQLException;
Expand Down Expand Up @@ -95,7 +95,7 @@ public static MetaDataContexts create(final MetaDataPersistService persistServic
* @throws SQLException SQL exception
*/
public static MetaDataContexts create(final MetaDataPersistService persistService, final ContextManagerBuilderParameter param,
final ComputeNodeInstanceContext computeNodeInstanceContext, final Map<String, QualifiedDataSourceStatus> statusMap) throws SQLException {
final ComputeNodeInstanceContext computeNodeInstanceContext, final Map<String, QualifiedDataSourceState> statusMap) throws SQLException {
boolean isDatabaseMetaDataExisted = !persistService.getDatabaseMetaDataService().loadAllDatabaseNames().isEmpty();
Map<String, DatabaseConfiguration> effectiveDatabaseConfigs = isDatabaseMetaDataExisted
? createEffectiveDatabaseConfigurations(getDatabaseNames(computeNodeInstanceContext, param.getDatabaseConfigs(), persistService), param.getDatabaseConfigs(), persistService)
Expand Down Expand Up @@ -155,7 +155,7 @@ private static void closeGeneratedDataSources(final String databaseName, final M
}
}

private static void checkDataSourceStates(final Map<String, DatabaseConfiguration> databaseConfigs, final Map<String, QualifiedDataSourceStatus> statusMap, final boolean force) {
private static void checkDataSourceStates(final Map<String, DatabaseConfiguration> databaseConfigs, final Map<String, QualifiedDataSourceState> statusMap, final boolean force) {
Map<String, DataSourceState> storageDataSourceStates = getStorageDataSourceStates(statusMap);
databaseConfigs.forEach((key, value) -> {
if (!value.getStorageUnits().isEmpty()) {
Expand All @@ -164,7 +164,7 @@ private static void checkDataSourceStates(final Map<String, DatabaseConfiguratio
});
}

private static Map<String, DataSourceState> getStorageDataSourceStates(final Map<String, QualifiedDataSourceStatus> statusMap) {
private static Map<String, DataSourceState> getStorageDataSourceStates(final Map<String, QualifiedDataSourceState> statusMap) {
Map<String, DataSourceState> result = new HashMap<>(statusMap.size(), 1F);
statusMap.forEach((key, value) -> {
List<String> values = Splitter.on(".").splitToList(key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.shardingsphere.mode.service.persist.MetaDataManagerPersistService;
import org.apache.shardingsphere.mode.service.persist.PersistServiceBuilder;
import org.apache.shardingsphere.mode.service.persist.ProcessPersistService;
import org.apache.shardingsphere.mode.service.persist.QualifiedDataSourceStatePersistService;
import org.apache.shardingsphere.mode.service.pojo.ShardingSphereSchemaDataAlteredPOJO;
import org.apache.shardingsphere.mode.spi.PersistRepository;
import org.apache.shardingsphere.mode.state.StatePersistService;
Expand All @@ -46,10 +47,13 @@ public final class PersistServiceFacade {

private final ProcessPersistService processPersistService;

private final QualifiedDataSourceStatePersistService qualifiedDataSourceStatePersistService;

public PersistServiceFacade(final PersistRepository repository, final ModeConfiguration modeConfiguration, final ContextManager contextManager) {
metaDataPersistService = new MetaDataPersistService(repository);
computeNodePersistService = new ComputeNodePersistService(repository);
statePersistService = new StatePersistService(repository);
qualifiedDataSourceStatePersistService = new QualifiedDataSourceStatePersistService(repository);
PersistServiceBuilder persistServiceBuilder = TypedSPILoader.getService(PersistServiceBuilder.class, modeConfiguration.getType());
metaDataManagerPersistService = persistServiceBuilder.buildMetaDataManagerPersistService(contextManager);
processPersistService = persistServiceBuilder.buildProcessPersistService(repository);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@
* limitations under the License.
*/

package org.apache.shardingsphere.mode.storage.service;
package org.apache.shardingsphere.mode.service.persist;

import com.google.common.base.Strings;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDataSource;
import org.apache.shardingsphere.infra.state.datasource.DataSourceState;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.mode.spi.PersistRepository;
import org.apache.shardingsphere.mode.storage.QualifiedDataSourceStatus;
import org.apache.shardingsphere.mode.storage.QualifiedDataSourceState;
import org.apache.shardingsphere.mode.storage.node.QualifiedDataSourceNode;
import org.apache.shardingsphere.mode.storage.yaml.YamlQualifiedDataSourceStatus;
import org.apache.shardingsphere.mode.storage.yaml.YamlQualifiedDataSourceStatusSwapper;
Expand All @@ -33,21 +33,21 @@
import java.util.Map;

/**
* Qualified data source status service.
* Qualified data source state persist service.
*/
@RequiredArgsConstructor
public final class QualifiedDataSourceStatusService {
public final class QualifiedDataSourceStatePersistService {

private final PersistRepository repository;

/**
* Load qualified data source status.
* Load qualified data source states.
*
* @return qualified data source status
* @return qualified data source states
*/
public Map<String, QualifiedDataSourceStatus> loadStatus() {
public Map<String, QualifiedDataSourceState> loadStates() {
Collection<String> qualifiedDataSourceNodes = repository.getChildrenKeys(QualifiedDataSourceNode.getRootPath());
Map<String, QualifiedDataSourceStatus> result = new HashMap<>(qualifiedDataSourceNodes.size(), 1F);
Map<String, QualifiedDataSourceState> result = new HashMap<>(qualifiedDataSourceNodes.size(), 1F);
qualifiedDataSourceNodes.forEach(each -> {
String yamlContent = repository.query(QualifiedDataSourceNode.getQualifiedDataSourceNodePath(new QualifiedDataSource(each)));
if (!Strings.isNullOrEmpty(yamlContent)) {
Expand All @@ -58,15 +58,15 @@ public Map<String, QualifiedDataSourceStatus> loadStatus() {
}

/**
* Change qualified data source status.
* Update qualified data source state.
*
* @param databaseName database name
* @param groupName group name
* @param storageUnitName storage unit name
* @param dataSourceState data source state
*/
public void changeStatus(final String databaseName, final String groupName, final String storageUnitName, final DataSourceState dataSourceState) {
QualifiedDataSourceStatus status = new QualifiedDataSourceStatus(dataSourceState);
public void updateState(final String databaseName, final String groupName, final String storageUnitName, final DataSourceState dataSourceState) {
QualifiedDataSourceState status = new QualifiedDataSourceState(dataSourceState);
repository.persist(QualifiedDataSourceNode.getQualifiedDataSourceNodePath(
new QualifiedDataSource(databaseName, groupName, storageUnitName)), YamlEngine.marshal(new YamlQualifiedDataSourceStatusSwapper().swapToYamlConfiguration(status)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,88 @@

package org.apache.shardingsphere.mode.state;

import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDataSource;
import org.apache.shardingsphere.infra.state.cluster.ClusterState;
import org.apache.shardingsphere.infra.state.datasource.DataSourceState;
import org.apache.shardingsphere.infra.state.datasource.exception.UnavailableDataSourceException;
import org.apache.shardingsphere.mode.storage.QualifiedDataSourceState;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

/**
* State context.
*/
@Slf4j
public final class StateContext {

private final AtomicReference<ClusterState> currentClusterState = new AtomicReference<>(ClusterState.OK);

private final Map<String, DataSourceState> dataSourceStates = new ConcurrentHashMap<>();

public StateContext(final ShardingSphereMetaData metaData, final Map<String, QualifiedDataSourceState> qualifiedDataSourceStates, final boolean force) {
initDataSourceState(metaData, convert(qualifiedDataSourceStates), force);
}

private void initDataSourceState(final ShardingSphereMetaData metaData, final Map<String, DataSourceState> storageDataSourceStates, final boolean force) {
metaData.getDatabases().forEach((key, value) -> {
if (!value.getResourceMetaData().getStorageUnits().isEmpty()) {
initDataSourceState(key, value.getResourceMetaData().getStorageUnits(), storageDataSourceStates, force);
}
});
}

private void initDataSourceState(final String databaseName, final Map<String, StorageUnit> storageUnits, final Map<String, DataSourceState> storageDataSourceStates, final boolean force) {
storageUnits.forEach((key, value) -> initDataSourceState(databaseName, storageDataSourceStates, key, value.getDataSource(), force));
}

private void initDataSourceState(final String databaseName, final Map<String, DataSourceState> storageDataSourceStates, final String actualDataSourceName, final DataSource dataSource,
final boolean force) {
DataSourceState storageState = storageDataSourceStates.get(getCacheKey(databaseName, actualDataSourceName));
if (DataSourceState.DISABLED == storageState) {
dataSourceStates.put(getCacheKey(databaseName, actualDataSourceName), storageState);
} else {
checkState(databaseName, actualDataSourceName, dataSource, force);
}
}

private static Map<String, DataSourceState> convert(final Map<String, QualifiedDataSourceState> qualifiedDataSourceStates) {
Map<String, DataSourceState> result = new HashMap<>(qualifiedDataSourceStates.size(), 1F);
qualifiedDataSourceStates.forEach((key, value) -> {
List<String> values = Splitter.on(".").splitToList(key);
Preconditions.checkArgument(3 == values.size(), "Illegal data source of storage node.");
String databaseName = values.get(0);
String dataSourceName = values.get(2);
result.put(databaseName + "." + dataSourceName, DataSourceState.valueOf(value.getStatus().name()));
});
return result;
}

private void checkState(final String databaseName, final String actualDataSourceName, final DataSource dataSource, final boolean force) {
try (Connection ignored = dataSource.getConnection()) {
dataSourceStates.put(getCacheKey(databaseName, actualDataSourceName), DataSourceState.ENABLED);
} catch (final SQLException ex) {
ShardingSpherePreconditions.checkState(force, () -> new UnavailableDataSourceException(actualDataSourceName, ex));
log.error("Data source unavailable, ignored with the -f parameter.", ex);
}
}

private String getCacheKey(final String databaseName, final String dataSourceName) {
return databaseName + "." + dataSourceName;
}

/**
* Get current cluster state.
*
Expand All @@ -44,4 +116,14 @@ public ClusterState getCurrentClusterState() {
public void switchCurrentClusterState(final ClusterState state) {
currentClusterState.set(state);
}

/**
* Update data source state.
*
* @param qualifiedDataSource qualified data source
* @param dataSourceState data source state
*/
public void updateDataSourceState(final QualifiedDataSource qualifiedDataSource, final DataSourceState dataSourceState) {
dataSourceStates.put(getCacheKey(qualifiedDataSource.getDatabaseName(), qualifiedDataSource.getDataSourceName()), dataSourceState);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@
import org.apache.shardingsphere.infra.state.datasource.DataSourceState;

/**
* Qualified data source status.
* Qualified data source state.
*/
@RequiredArgsConstructor
@Getter
public final class QualifiedDataSourceStatus {
public final class QualifiedDataSourceState {

private final DataSourceState status;
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,22 @@

import org.apache.shardingsphere.infra.state.datasource.DataSourceState;
import org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
import org.apache.shardingsphere.mode.storage.QualifiedDataSourceStatus;
import org.apache.shardingsphere.mode.storage.QualifiedDataSourceState;

/**
* YAML qualified data source status swapper.
*/
public final class YamlQualifiedDataSourceStatusSwapper implements YamlConfigurationSwapper<YamlQualifiedDataSourceStatus, QualifiedDataSourceStatus> {
public final class YamlQualifiedDataSourceStatusSwapper implements YamlConfigurationSwapper<YamlQualifiedDataSourceStatus, QualifiedDataSourceState> {

@Override
public YamlQualifiedDataSourceStatus swapToYamlConfiguration(final QualifiedDataSourceStatus data) {
public YamlQualifiedDataSourceStatus swapToYamlConfiguration(final QualifiedDataSourceState data) {
YamlQualifiedDataSourceStatus result = new YamlQualifiedDataSourceStatus();
result.setStatus(data.getStatus().name());
return result;
}

@Override
public QualifiedDataSourceStatus swapToObject(final YamlQualifiedDataSourceStatus yamlConfig) {
return new QualifiedDataSourceStatus(DataSourceState.valueOf(yamlConfig.getStatus()));
public QualifiedDataSourceState swapToObject(final YamlQualifiedDataSourceStatus yamlConfig) {
return new QualifiedDataSourceState(DataSourceState.valueOf(yamlConfig.getStatus()));
}
}
Loading

0 comments on commit 7e7543d

Please sign in to comment.