Move MetaDataPersistService from MetaDataContexts to PersistServiceFacade #31376

Merged: 3 commits, May 24, 2024
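In short, this PR makes ContextManager own the PersistServiceFacade (and the underlying PersistRepository) instead of reaching the persist service through MetaDataContexts. A minimal before/after sketch of the accessor change, assembled from the call sites touched in this diff (the contextManager variable is illustrative):

// Before: the persist service was looked up via MetaDataContexts.
MetaDataPersistService persistService = contextManager.getMetaDataContexts().getPersistService();

// After: it is reached through the PersistServiceFacade held by ContextManager,
// and the repository itself is exposed directly on ContextManager.
MetaDataPersistService persistService = contextManager.getPersistServiceFacade().getMetaDataPersistService();
PersistRepository repository = contextManager.getRepository();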
@@ -35,6 +35,7 @@
import org.apache.shardingsphere.mode.manager.standalone.workerid.generator.StandaloneWorkerIdGenerator;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import org.apache.shardingsphere.mode.metadata.MetaDataContextsFactory;
+ import org.apache.shardingsphere.mode.spi.PersistRepository;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.test.mock.AutoMockExtension;
import org.apache.shardingsphere.test.mock.StaticMockSettings;
@@ -84,6 +85,6 @@ private ContextManager mockContextManager() {
ComputeNodeInstanceContext computeNodeInstanceContext = new ComputeNodeInstanceContext(
new ComputeNodeInstance(mock(InstanceMetaData.class)), new StandaloneWorkerIdGenerator(), new ModeConfiguration("Standalone", null),
mock(LockContext.class), new EventBusContext());
- return new ContextManager(metaDataContexts, computeNodeInstanceContext);
+ return new ContextManager(metaDataContexts, computeNodeInstanceContext, mock(PersistRepository.class));
}
}
@@ -30,6 +30,7 @@
import org.apache.shardingsphere.mode.manager.standalone.workerid.generator.StandaloneWorkerIdGenerator;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import org.apache.shardingsphere.mode.metadata.MetaDataContextsFactory;
+ import org.apache.shardingsphere.mode.spi.PersistRepository;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.test.mock.AutoMockExtension;
import org.apache.shardingsphere.test.mock.StaticMockSettings;
@@ -73,6 +74,6 @@ private ContextManager mockContextManager() {
ComputeNodeInstanceContext computeNodeInstanceContext = new ComputeNodeInstanceContext(
new ComputeNodeInstance(mock(InstanceMetaData.class)), new StandaloneWorkerIdGenerator(), new ModeConfiguration("Standalone", null),
mock(LockContext.class), new EventBusContext());
- return new ContextManager(metaDataContexts, computeNodeInstanceContext);
+ return new ContextManager(metaDataContexts, computeNodeInstanceContext, mock(PersistRepository.class));
}
}
@@ -77,7 +77,7 @@ private void checkBeforeUpdate(final AlterReadwriteSplittingStorageUnitStatusSta

private void updateStatus(final ContextManager contextManager, final AlterReadwriteSplittingStorageUnitStatusStatement sqlStatement) {
DataSourceState status = sqlStatement.isEnable() ? DataSourceState.ENABLED : DataSourceState.DISABLED;
- new QualifiedDataSourceStatusService(contextManager.getMetaDataContexts().getPersistService().getRepository())
+ new QualifiedDataSourceStatusService(contextManager.getRepository())
.changeStatus(database.getName(), sqlStatement.getRuleName(), sqlStatement.getStorageUnitName(), status);
}

@@ -55,7 +55,7 @@ public void executeUpdate() {
checkBeforeUpdate();
RuleConfiguration currentRuleConfig = rule.map(ShardingSphereRule::getConfiguration).orElse(null);
if (getRefreshStatus(rule.isPresent())) {
- contextManager.getMetaDataContexts().getPersistService().getMetaDataVersionPersistService()
+ contextManager.getPersistServiceFacade().getMetaDataPersistService().getMetaDataVersionPersistService()
.switchActiveVersion(DatabaseRuleOperatorFactory.newInstance(contextManager, executor).operate(sqlStatement, database, currentRuleConfig));
}
}
@@ -109,7 +109,7 @@ private Map<String, DataSource> getTrafficDataSourceMap(final String databaseNam
if (rule.getStrategyRules().isEmpty()) {
return Collections.emptyMap();
}
- MetaDataPersistService persistService = contextManager.getMetaDataContexts().getPersistService();
+ MetaDataPersistService persistService = contextManager.getPersistServiceFacade().getMetaDataPersistService();
String actualDatabaseName = contextManager.getMetaDataContexts().getMetaData().getDatabase(databaseName).getName();
Map<String, DataSourcePoolProperties> propsMap = persistService.getDataSourceUnitService().load(actualDatabaseName);
Preconditions.checkState(!propsMap.isEmpty(), "Can not get data source properties from meta data.");
@@ -78,7 +78,7 @@ private ContextManager mockContextManager() throws SQLException {
Map<String, StorageUnit> storageUnits = mockStorageUnits();
when(result.getStorageUnits(DefaultDatabase.LOGIC_NAME)).thenReturn(storageUnits);
MetaDataPersistService persistService = mockMetaDataPersistService();
- when(result.getMetaDataContexts().getPersistService()).thenReturn(persistService);
+ when(result.getPersistServiceFacade().getMetaDataPersistService()).thenReturn(persistService);
when(result.getMetaDataContexts().getMetaData().getGlobalRuleMetaData()).thenReturn(
new RuleMetaData(Arrays.asList(mock(AuthorityRule.class, RETURNS_DEEP_STUBS), mock(TransactionRule.class, RETURNS_DEEP_STUBS),
mock(TrafficRule.class, RETURNS_DEEP_STUBS))));
@@ -63,7 +63,7 @@ public static PipelineGovernanceFacade getPipelineGovernanceFacade(final Pipelin
@Override
protected PipelineGovernanceFacade initialize() {
ContextManager contextManager = PipelineContextManager.getContext(contextKey).getContextManager();
- return new PipelineGovernanceFacade((ClusterPersistRepository) contextManager.getMetaDataContexts().getPersistService().getRepository());
+ return new PipelineGovernanceFacade((ClusterPersistRepository) contextManager.getPersistServiceFacade().getMetaDataPersistService().getRepository());
}
}).get();
}
@@ -163,7 +163,7 @@ private final class PersistRepositoryLazyInitializer extends LazyInitializer<Clu

@Override
protected ClusterPersistRepository initialize() {
- return (ClusterPersistRepository) PipelineContextManager.getContext(contextKey).getContextManager().getMetaDataContexts().getPersistService().getRepository();
+ return (ClusterPersistRepository) PipelineContextManager.getContext(contextKey).getContextManager().getPersistServiceFacade().getMetaDataPersistService().getRepository();
}
}
}
@@ -32,8 +32,8 @@
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedList;
- import java.util.Map;
import java.util.List;
+ import java.util.Map;
import java.util.Map.Entry;

/**
@@ -55,8 +55,10 @@ public void persist(final String databaseName, final String schemaName, final Ma
String tableName = entry.getKey().toLowerCase();
List<String> versions = repository.getChildrenKeys(TableMetaDataNode.getTableVersionsNode(databaseName, schemaName, tableName));
String nextActiveVersion = versions.isEmpty() ? DEFAULT_VERSION : String.valueOf(Integer.parseInt(versions.get(0)) + 1);
- repository.persist(TableMetaDataNode.getTableVersionNode(databaseName, schemaName, tableName, nextActiveVersion),
- YamlEngine.marshal(new YamlTableSwapper().swapToYamlConfiguration(entry.getValue())));
+ if (entry.getValue() != null) {
+ repository.persist(TableMetaDataNode.getTableVersionNode(databaseName, schemaName, tableName, nextActiveVersion),
+ YamlEngine.marshal(new YamlTableSwapper().swapToYamlConfiguration(entry.getValue())));
+ }
if (Strings.isNullOrEmpty(getActiveVersion(databaseName, schemaName, tableName))) {
repository.persist(TableMetaDataNode.getTableActiveVersionNode(databaseName, schemaName, tableName), DEFAULT_VERSION);
}
@@ -46,6 +46,7 @@
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import org.apache.shardingsphere.mode.metadata.MetaDataContextsFactory;
import org.apache.shardingsphere.mode.service.PersistServiceFacade;
+ import org.apache.shardingsphere.mode.spi.PersistRepository;
import org.apache.shardingsphere.mode.state.StateContext;

import java.sql.SQLException;
@@ -77,15 +78,18 @@ public final class ContextManager implements AutoCloseable {

private final PersistServiceFacade persistServiceFacade;

- public ContextManager(final MetaDataContexts metaDataContexts, final ComputeNodeInstanceContext computeNodeInstanceContext) {
+ private final PersistRepository repository;
+
+ public ContextManager(final MetaDataContexts metaDataContexts, final ComputeNodeInstanceContext computeNodeInstanceContext, final PersistRepository repository) {
this.metaDataContexts = new AtomicReference<>(metaDataContexts);
this.computeNodeInstanceContext = computeNodeInstanceContext;
- shardingSphereDatabaseContextManager = new ShardingSphereDatabaseContextManager(this.metaDataContexts);
- configurationContextManager = new ConfigurationContextManager(this.metaDataContexts, computeNodeInstanceContext);
- resourceMetaDataContextManager = new ResourceMetaDataContextManager(this.metaDataContexts);
+ this.repository = repository;
+ persistServiceFacade = new PersistServiceFacade(repository, computeNodeInstanceContext.getModeConfiguration(), this);
+ shardingSphereDatabaseContextManager = new ShardingSphereDatabaseContextManager(this.metaDataContexts, persistServiceFacade);
+ configurationContextManager = new ConfigurationContextManager(this.metaDataContexts, computeNodeInstanceContext, persistServiceFacade);
+ resourceMetaDataContextManager = new ResourceMetaDataContextManager(this.metaDataContexts, persistServiceFacade);
executorEngine = ExecutorEngine.createExecutorEngineWithSize(metaDataContexts.getMetaData().getProps().<Integer>getValue(ConfigurationPropertyKey.KERNEL_EXECUTOR_SIZE));
stateContext = new StateContext();
- persistServiceFacade = new PersistServiceFacade(metaDataContexts.getPersistService().getRepository(), computeNodeInstanceContext.getModeConfiguration(), this);
}

/**
@@ -138,7 +142,7 @@ public Map<String, StorageUnit> getStorageUnits(final String databaseName) {
public void refreshDatabaseMetaData(final ShardingSphereDatabase database, final boolean force) {
try {
MetaDataContexts reloadedMetaDataContexts = createMetaDataContexts(database);
- MetaDataPersistService persistService = metaDataContexts.get().getPersistService();
+ MetaDataPersistService persistService = persistServiceFacade.getMetaDataPersistService();
if (force) {
metaDataContexts.set(reloadedMetaDataContexts);
metaDataContexts.get().getMetaData().getDatabase(database.getName()).getSchemas()
@@ -165,14 +169,14 @@ public void refreshTableMetaData(final ShardingSphereDatabase database) {
deletedSchemaNames(database.getName(), reloadedMetaDataContexts.getMetaData().getDatabase(database.getName()), database);
metaDataContexts.set(reloadedMetaDataContexts);
metaDataContexts.get().getMetaData().getDatabase(database.getName()).getSchemas()
- .forEach((schemaName, schema) -> metaDataContexts.get().getPersistService().getDatabaseMetaDataService().compareAndPersist(database.getName(), schemaName, schema));
+ .forEach((schemaName, schema) -> persistServiceFacade.getMetaDataPersistService().getDatabaseMetaDataService().compareAndPersist(database.getName(), schemaName, schema));
} catch (final SQLException ex) {
log.error("Refresh table meta data: {} failed", database.getName(), ex);
}
}

private MetaDataContexts createMetaDataContexts(final ShardingSphereDatabase database) throws SQLException {
- MetaDataPersistService metaDataPersistService = metaDataContexts.get().getPersistService();
+ MetaDataPersistService metaDataPersistService = persistServiceFacade.getMetaDataPersistService();
Map<String, DataSourcePoolProperties> dataSourcePoolPropsFromRegCenter = metaDataPersistService.getDataSourceUnitService().load(database.getName());
SwitchingResource switchingResource = new ResourceSwitchManager().alterStorageUnit(database.getResourceMetaData(), dataSourcePoolPropsFromRegCenter);
metaDataContexts.get().getMetaData().getDatabases().putAll(configurationContextManager.renewDatabase(database, switchingResource));
@@ -196,7 +200,7 @@ private MetaDataContexts createMetaDataContexts(final ShardingSphereDatabase dat
*/
public void deletedSchemaNames(final String databaseName, final ShardingSphereDatabase reloadDatabase, final ShardingSphereDatabase currentDatabase) {
GenericSchemaManager.getToBeDeletedSchemaNames(reloadDatabase.getSchemas(), currentDatabase.getSchemas()).keySet()
- .forEach(each -> metaDataContexts.get().getPersistService().getDatabaseMetaDataService().dropSchema(databaseName, each));
+ .forEach(each -> persistServiceFacade.getMetaDataPersistService().getDatabaseMetaDataService().dropSchema(databaseName, each));
}

/**
@@ -211,11 +215,11 @@ public void reloadSchema(final ShardingSphereDatabase database, final String sch
ShardingSphereSchema reloadedSchema = loadSchema(database, schemaName, dataSourceName);
if (reloadedSchema.getTables().isEmpty()) {
database.dropSchema(schemaName);
- metaDataContexts.get().getPersistService().getDatabaseMetaDataService().dropSchema(database.getName(),
+ persistServiceFacade.getMetaDataPersistService().getDatabaseMetaDataService().dropSchema(database.getName(),
schemaName);
} else {
database.addSchema(schemaName, reloadedSchema);
- metaDataContexts.get().getPersistService().getDatabaseMetaDataService()
+ persistServiceFacade.getMetaDataPersistService().getDatabaseMetaDataService()
.compareAndPersist(database.getName(), schemaName, reloadedSchema);
}
} catch (final SQLException ex) {
@@ -230,7 +234,7 @@ private ShardingSphereSchema loadSchema(final ShardingSphereDatabase database, f
Collections.singletonMap(dataSourceName, database.getResourceMetaData().getStorageUnits().get(dataSourceName).getDataSource()),
database.getRuleMetaData().getRules(), metaDataContexts.get().getMetaData().getProps(), schemaName);
ShardingSphereSchema result = GenericSchemaBuilder.build(material).get(schemaName);
- result.getViews().putAll(metaDataContexts.get().getPersistService().getDatabaseMetaDataService().getViewMetaDataPersistService().load(database.getName(), schemaName));
+ result.getViews().putAll(persistServiceFacade.getMetaDataPersistService().getDatabaseMetaDataService().getViewMetaDataPersistService().load(database.getName(), schemaName));
return result;
}

@@ -273,13 +277,14 @@ public void reloadTable(final ShardingSphereDatabase database, final String sche

private void persistTable(final ShardingSphereDatabase database, final String schemaName, final String tableName, final GenericSchemaBuilderMaterial material) throws SQLException {
ShardingSphereSchema schema = GenericSchemaBuilder.build(Collections.singleton(tableName), material).getOrDefault(schemaName, new ShardingSphereSchema());
- metaDataContexts.get().getPersistService().getDatabaseMetaDataService().getTableMetaDataPersistService()
+ persistServiceFacade.getMetaDataPersistService().getDatabaseMetaDataService().getTableMetaDataPersistService()
.persist(database.getName(), schemaName, Collections.singletonMap(tableName, schema.getTable(tableName)));
}

@Override
public void close() {
executorEngine.close();
metaDataContexts.get().close();
+ repository.close();
}
}