Skip to content

Commit

Permalink
Refactor RegistryCenter (#31254)
Browse files Browse the repository at this point in the history
* Refactor ClusterContextManagerBuilder

* Refactor RegistryCenter

* Refactor RegistryCenter

* Refactor RegistryCenter
  • Loading branch information
terrymanu authored May 16, 2024
1 parent 9bdf506 commit 010462f
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.shardingsphere.mode.manager.ContextManagerBuilder;
import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.GlobalLockPersistService;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.subscriber.ShardingSphereSchemaDataRegistrySubscriber;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.process.subscriber.ClusterProcessSubscriber;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.service.ClusterStatusService;
Expand All @@ -44,9 +45,13 @@
import org.apache.shardingsphere.mode.metadata.MetaDataContextsFactory;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
import org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder;
import org.apache.shardingsphere.mode.repository.cluster.lock.impl.props.DefaultLockTypedProperties;
import org.apache.shardingsphere.mode.storage.service.QualifiedDataSourceStatusService;

import java.sql.SQLException;
import java.util.Optional;
import java.util.Properties;

/**
* Cluster context manager builder.
Expand All @@ -62,7 +67,7 @@ public ContextManager build(final ContextManagerBuilderParameter param, final Ev
((InstanceContextAware) registryCenter.getRepository()).setInstanceContext(instanceContext);
}
MetaDataPersistService metaDataPersistService = new MetaDataPersistService(repository);
MetaDataContexts metaDataContexts = MetaDataContextsFactory.create(metaDataPersistService, param, instanceContext, registryCenter.getQualifiedDataSourceStatusService().loadStatus());
MetaDataContexts metaDataContexts = MetaDataContextsFactory.create(metaDataPersistService, param, instanceContext, new QualifiedDataSourceStatusService(repository).loadStatus());
ContextManager result = new ContextManager(metaDataContexts, instanceContext);
setContextManagerAware(result);
createSubscribers(eventBusContext, repository);
Expand All @@ -79,8 +84,13 @@ private ClusterPersistRepository getClusterPersistRepository(final ClusterPersis
}

private InstanceContext buildInstanceContext(final RegistryCenter registryCenter, final ContextManagerBuilderParameter param, final EventBusContext eventBusContext) {
return new InstanceContext(new ComputeNodeInstance(param.getInstanceMetaData()), new ClusterWorkerIdGenerator(registryCenter, param.getInstanceMetaData()),
param.getModeConfiguration(), new ClusterModeContextManager(), new GlobalLockContext(registryCenter.getGlobalLockPersistService()), eventBusContext);
return new InstanceContext(new ComputeNodeInstance(param.getInstanceMetaData()), new ClusterWorkerIdGenerator(registryCenter, param.getInstanceMetaData()), param.getModeConfiguration(),
new ClusterModeContextManager(), new GlobalLockContext(new GlobalLockPersistService(initDistributedLockHolder(registryCenter.getRepository()))), eventBusContext);
}

private DistributedLockHolder initDistributedLockHolder(final ClusterPersistRepository repository) {
DistributedLockHolder distributedLockHolder = repository.getDistributedLockHolder();
return null == distributedLockHolder ? new DistributedLockHolder("default", repository, new DefaultLockTypedProperties(new Properties())) : distributedLockHolder;
}

private void setContextManagerAware(final ContextManager contextManager) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,11 @@
import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
import org.apache.shardingsphere.infra.instance.metadata.jdbc.JDBCInstanceMetaData;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.lock.GlobalLockPersistService;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceWatcherFactory;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.service.ComputeNodeStatusService;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder;
import org.apache.shardingsphere.mode.repository.cluster.lock.impl.props.DefaultLockTypedProperties;
import org.apache.shardingsphere.mode.storage.service.QualifiedDataSourceStatusService;

import java.util.Map;
import java.util.Properties;

/**
* Registry center.
Expand All @@ -46,33 +41,20 @@ public final class RegistryCenter {

private final Map<String, DatabaseConfiguration> databaseConfigs;

@Getter
private final QualifiedDataSourceStatusService qualifiedDataSourceStatusService;

@Getter
private final ComputeNodeStatusService computeNodeStatusService;

@Getter
private final GlobalLockPersistService globalLockPersistService;

private final GovernanceWatcherFactory listenerFactory;

public RegistryCenter(final EventBusContext eventBusContext,
final ClusterPersistRepository repository, final InstanceMetaData instanceMetaData, final Map<String, DatabaseConfiguration> databaseConfigs) {
this.repository = repository;
this.instanceMetaData = instanceMetaData;
this.databaseConfigs = databaseConfigs;
qualifiedDataSourceStatusService = new QualifiedDataSourceStatusService(repository);
computeNodeStatusService = new ComputeNodeStatusService(repository);
globalLockPersistService = new GlobalLockPersistService(initDistributedLockHolder(repository));
listenerFactory = new GovernanceWatcherFactory(repository, eventBusContext, getJDBCDatabaseName());
}

private DistributedLockHolder initDistributedLockHolder(final ClusterPersistRepository repository) {
DistributedLockHolder distributedLockHolder = repository.getDistributedLockHolder();
return null == distributedLockHolder ? new DistributedLockHolder("default", repository, new DefaultLockTypedProperties(new Properties())) : distributedLockHolder;
}

private String getJDBCDatabaseName() {
return instanceMetaData instanceof JDBCInstanceMetaData ? databaseConfigs.keySet().stream().findFirst().orElse(null) : null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.shardingsphere.distsql.statement.ral.updatable.LockClusterStatement;
import org.apache.shardingsphere.infra.spi.exception.ServiceProviderNotFoundException;
import org.apache.shardingsphere.infra.state.cluster.ClusterState;
import org.apache.shardingsphere.mode.exception.LockedClusterException;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.test.mock.AutoMockExtension;
Expand All @@ -45,7 +46,7 @@ class LockClusterExecutorTest {
void assertExecuteUpdateWithLockedCluster() {
ContextManager contextManager = mock(ContextManager.class, RETURNS_DEEP_STUBS);
when(contextManager.getClusterStateContext().getCurrentState()).thenReturn(ClusterState.UNAVAILABLE);
assertThrows(IllegalStateException.class, () -> executor.executeUpdate(new LockClusterStatement(new AlgorithmSegment("FOO", new Properties())), contextManager));
assertThrows(LockedClusterException.class, () -> executor.executeUpdate(new LockClusterStatement(new AlgorithmSegment("FOO", new Properties())), contextManager));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.shardingsphere.distsql.statement.ral.updatable.UnlockClusterStatement;
import org.apache.shardingsphere.infra.state.cluster.ClusterState;
import org.apache.shardingsphere.mode.exception.NotLockedClusterException;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.test.mock.AutoMockExtension;
Expand All @@ -42,6 +43,6 @@ void assertExecuteUpdateWithNotLockedCluster() {
ContextManager contextManager = mock(ContextManager.class, RETURNS_DEEP_STUBS);
when(contextManager.getClusterStateContext().getCurrentState()).thenReturn(ClusterState.OK);
when(ProxyContext.getInstance().getContextManager()).thenReturn(contextManager);
assertThrows(IllegalStateException.class, () -> executor.executeUpdate(new UnlockClusterStatement(), contextManager));
assertThrows(NotLockedClusterException.class, () -> executor.executeUpdate(new UnlockClusterStatement(), contextManager));
}
}

0 comments on commit 010462f

Please sign in to comment.