From 010462f6a0c6f1d6efd1c91129143aaf74e2e735 Mon Sep 17 00:00:00 2001 From: Liang Zhang Date: Thu, 16 May 2024 23:43:45 +0800 Subject: [PATCH] Refactor RegistryCenter (#31254) * Refactor ClusterContextManagerBuilder * Refactor RegistryCenter * Refactor RegistryCenter * Refactor RegistryCenter --- .../cluster/ClusterContextManagerBuilder.java | 16 +++++++++++++--- .../cluster/coordinator/RegistryCenter.java | 18 ------------------ .../ral/updatable/LockClusterExecutorTest.java | 3 ++- .../updatable/UnlockClusterExecutorTest.java | 3 ++- 4 files changed, 17 insertions(+), 23 deletions(-) diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java index d97d32bab4108..2f6f3c59ac8b5 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java @@ -31,6 +31,7 @@ import org.apache.shardingsphere.mode.manager.ContextManagerBuilder; import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter; import org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter; +import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.GlobalLockPersistService; import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.subscriber.ShardingSphereSchemaDataRegistrySubscriber; import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.process.subscriber.ClusterProcessSubscriber; import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.service.ClusterStatusService; @@ -44,9 +45,13 @@ import org.apache.shardingsphere.mode.metadata.MetaDataContextsFactory; import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository; import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration; +import org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder; +import org.apache.shardingsphere.mode.repository.cluster.lock.impl.props.DefaultLockTypedProperties; +import org.apache.shardingsphere.mode.storage.service.QualifiedDataSourceStatusService; import java.sql.SQLException; import java.util.Optional; +import java.util.Properties; /** * Cluster context manager builder. @@ -62,7 +67,7 @@ public ContextManager build(final ContextManagerBuilderParameter param, final Ev ((InstanceContextAware) registryCenter.getRepository()).setInstanceContext(instanceContext); } MetaDataPersistService metaDataPersistService = new MetaDataPersistService(repository); - MetaDataContexts metaDataContexts = MetaDataContextsFactory.create(metaDataPersistService, param, instanceContext, registryCenter.getQualifiedDataSourceStatusService().loadStatus()); + MetaDataContexts metaDataContexts = MetaDataContextsFactory.create(metaDataPersistService, param, instanceContext, new QualifiedDataSourceStatusService(repository).loadStatus()); ContextManager result = new ContextManager(metaDataContexts, instanceContext); setContextManagerAware(result); createSubscribers(eventBusContext, repository); @@ -79,8 +84,13 @@ private ClusterPersistRepository getClusterPersistRepository(final ClusterPersis } private InstanceContext buildInstanceContext(final RegistryCenter registryCenter, final ContextManagerBuilderParameter param, final EventBusContext eventBusContext) { - return new InstanceContext(new ComputeNodeInstance(param.getInstanceMetaData()), new ClusterWorkerIdGenerator(registryCenter, param.getInstanceMetaData()), - param.getModeConfiguration(), new ClusterModeContextManager(), new GlobalLockContext(registryCenter.getGlobalLockPersistService()), eventBusContext); + return new InstanceContext(new ComputeNodeInstance(param.getInstanceMetaData()), new ClusterWorkerIdGenerator(registryCenter, param.getInstanceMetaData()), param.getModeConfiguration(), + new ClusterModeContextManager(), new GlobalLockContext(new GlobalLockPersistService(initDistributedLockHolder(registryCenter.getRepository()))), eventBusContext); + } + + private DistributedLockHolder initDistributedLockHolder(final ClusterPersistRepository repository) { + DistributedLockHolder distributedLockHolder = repository.getDistributedLockHolder(); + return null == distributedLockHolder ? new DistributedLockHolder("default", repository, new DefaultLockTypedProperties(new Properties())) : distributedLockHolder; } private void setContextManagerAware(final ContextManager contextManager) { diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java index 0b92eff5d3f09..34ae942497ef7 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java @@ -23,16 +23,11 @@ import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData; import org.apache.shardingsphere.infra.instance.metadata.jdbc.JDBCInstanceMetaData; import org.apache.shardingsphere.infra.util.eventbus.EventBusContext; -import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.GlobalLockPersistService; import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcherFactory; import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.service.ComputeNodeStatusService; import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository; -import org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder; -import org.apache.shardingsphere.mode.repository.cluster.lock.impl.props.DefaultLockTypedProperties; -import org.apache.shardingsphere.mode.storage.service.QualifiedDataSourceStatusService; import java.util.Map; -import java.util.Properties; /** * Registry center. @@ -46,15 +41,9 @@ public final class RegistryCenter { private final Map databaseConfigs; - @Getter - private final QualifiedDataSourceStatusService qualifiedDataSourceStatusService; - @Getter private final ComputeNodeStatusService computeNodeStatusService; - @Getter - private final GlobalLockPersistService globalLockPersistService; - private final GovernanceWatcherFactory listenerFactory; public RegistryCenter(final EventBusContext eventBusContext, @@ -62,17 +51,10 @@ public RegistryCenter(final EventBusContext eventBusContext, this.repository = repository; this.instanceMetaData = instanceMetaData; this.databaseConfigs = databaseConfigs; - qualifiedDataSourceStatusService = new QualifiedDataSourceStatusService(repository); computeNodeStatusService = new ComputeNodeStatusService(repository); - globalLockPersistService = new GlobalLockPersistService(initDistributedLockHolder(repository)); listenerFactory = new GovernanceWatcherFactory(repository, eventBusContext, getJDBCDatabaseName()); } - private DistributedLockHolder initDistributedLockHolder(final ClusterPersistRepository repository) { - DistributedLockHolder distributedLockHolder = repository.getDistributedLockHolder(); - return null == distributedLockHolder ? new DistributedLockHolder("default", repository, new DefaultLockTypedProperties(new Properties())) : distributedLockHolder; - } - private String getJDBCDatabaseName() { return instanceMetaData instanceof JDBCInstanceMetaData ? databaseConfigs.keySet().stream().findFirst().orElse(null) : null; } diff --git a/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/LockClusterExecutorTest.java b/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/LockClusterExecutorTest.java index b14babcb81ef5..e43deee62e885 100644 --- a/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/LockClusterExecutorTest.java +++ b/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/LockClusterExecutorTest.java @@ -21,6 +21,7 @@ import org.apache.shardingsphere.distsql.statement.ral.updatable.LockClusterStatement; import org.apache.shardingsphere.infra.spi.exception.ServiceProviderNotFoundException; import org.apache.shardingsphere.infra.state.cluster.ClusterState; +import org.apache.shardingsphere.mode.exception.LockedClusterException; import org.apache.shardingsphere.mode.manager.ContextManager; import org.apache.shardingsphere.proxy.backend.context.ProxyContext; import org.apache.shardingsphere.test.mock.AutoMockExtension; @@ -45,7 +46,7 @@ class LockClusterExecutorTest { void assertExecuteUpdateWithLockedCluster() { ContextManager contextManager = mock(ContextManager.class, RETURNS_DEEP_STUBS); when(contextManager.getClusterStateContext().getCurrentState()).thenReturn(ClusterState.UNAVAILABLE); - assertThrows(IllegalStateException.class, () -> executor.executeUpdate(new LockClusterStatement(new AlgorithmSegment("FOO", new Properties())), contextManager)); + assertThrows(LockedClusterException.class, () -> executor.executeUpdate(new LockClusterStatement(new AlgorithmSegment("FOO", new Properties())), contextManager)); } @Test diff --git a/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/UnlockClusterExecutorTest.java b/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/UnlockClusterExecutorTest.java index ff4e3365d57e3..99bc88db730a6 100644 --- a/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/UnlockClusterExecutorTest.java +++ b/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/UnlockClusterExecutorTest.java @@ -19,6 +19,7 @@ import org.apache.shardingsphere.distsql.statement.ral.updatable.UnlockClusterStatement; import org.apache.shardingsphere.infra.state.cluster.ClusterState; +import org.apache.shardingsphere.mode.exception.NotLockedClusterException; import org.apache.shardingsphere.mode.manager.ContextManager; import org.apache.shardingsphere.proxy.backend.context.ProxyContext; import org.apache.shardingsphere.test.mock.AutoMockExtension; @@ -42,6 +43,6 @@ void assertExecuteUpdateWithNotLockedCluster() { ContextManager contextManager = mock(ContextManager.class, RETURNS_DEEP_STUBS); when(contextManager.getClusterStateContext().getCurrentState()).thenReturn(ClusterState.OK); when(ProxyContext.getInstance().getContextManager()).thenReturn(contextManager); - assertThrows(IllegalStateException.class, () -> executor.executeUpdate(new UnlockClusterStatement(), contextManager)); + assertThrows(NotLockedClusterException.class, () -> executor.executeUpdate(new UnlockClusterStatement(), contextManager)); } }