Skip to content

Commit

Permalink
Refactor RegistryCenter
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu committed May 15, 2024
1 parent 829b5b5 commit 165893e
Show file tree
Hide file tree
Showing 16 changed files with 55 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.ContextManagerBuilder;
import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
Expand Down Expand Up @@ -76,7 +77,7 @@ private ContextManager createContextManager(final String databaseName, final Mod
databaseRuleConfigs.removeAll(globalRuleConfigs);
ContextManagerBuilderParameter param = new ContextManagerBuilderParameter(modeConfig, Collections.singletonMap(databaseName,
new DataSourceProvidedDatabaseConfiguration(dataSourceMap, databaseRuleConfigs)), Collections.emptyMap(), globalRuleConfigs, props, Collections.emptyList(), instanceMetaData, false);
return TypedSPILoader.getService(ContextManagerBuilder.class, null == modeConfig ? null : modeConfig.getType()).build(param);
return TypedSPILoader.getService(ContextManagerBuilder.class, null == modeConfig ? null : modeConfig.getType()).build(param, new EventBusContext());
}

private void contextManagerInitializedCallback(final String databaseName, final ContextManager contextManager) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPI;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;

import java.sql.SQLException;

Expand All @@ -32,8 +33,9 @@ public interface ContextManagerBuilder extends TypedSPI {
* Build context manager.
*
* @param param context manager builder parameter
* @param eventBusContext event bus context
* @return context manager
* @throws SQLException SQL exception
*/
ContextManager build(ContextManagerBuilderParameter param) throws SQLException;
ContextManager build(ContextManagerBuilderParameter param, EventBusContext eventBusContext) throws SQLException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.shardingsphere.infra.instance.InstanceContext;
import org.apache.shardingsphere.infra.instance.InstanceContextAware;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.metadata.persist.MetaDataPersistService;
import org.apache.shardingsphere.mode.lock.GlobalLockContext;
import org.apache.shardingsphere.mode.manager.ContextManager;
Expand All @@ -35,7 +36,6 @@
import org.apache.shardingsphere.mode.metadata.MetaDataContextsFactory;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
import org.apache.shardingsphere.mode.subsciber.RuleItemChangedSubscriber;

import java.sql.SQLException;

Expand All @@ -45,10 +45,10 @@
public final class ClusterContextManagerBuilder implements ContextManagerBuilder {

@Override
public ContextManager build(final ContextManagerBuilderParameter param) throws SQLException {
public ContextManager build(final ContextManagerBuilderParameter param, final EventBusContext eventBusContext) throws SQLException {
ClusterPersistRepository repository = getClusterPersistRepository((ClusterPersistRepositoryConfiguration) param.getModeConfiguration().getRepository());
RegistryCenter registryCenter = new RegistryCenter(repository, param.getInstanceMetaData(), param.getDatabaseConfigs());
InstanceContext instanceContext = buildInstanceContext(registryCenter, param);
RegistryCenter registryCenter = new RegistryCenter(repository, eventBusContext, param.getInstanceMetaData(), param.getDatabaseConfigs());
InstanceContext instanceContext = buildInstanceContext(registryCenter, param, eventBusContext);
if (registryCenter.getRepository() instanceof InstanceContextAware) {
((InstanceContextAware) registryCenter.getRepository()).setInstanceContext(instanceContext);
}
Expand All @@ -67,9 +67,9 @@ private ClusterPersistRepository getClusterPersistRepository(final ClusterPersis
return result;
}

private InstanceContext buildInstanceContext(final RegistryCenter registryCenter, final ContextManagerBuilderParameter param) {
private InstanceContext buildInstanceContext(final RegistryCenter registryCenter, final ContextManagerBuilderParameter param, final EventBusContext eventBusContext) {
return new InstanceContext(new ComputeNodeInstance(param.getInstanceMetaData()), new ClusterWorkerIdGenerator(registryCenter, param.getInstanceMetaData()),
param.getModeConfiguration(), new ClusterModeContextManager(), new GlobalLockContext(registryCenter.getGlobalLockPersistService()), registryCenter.getEventBusContext());
param.getModeConfiguration(), new ClusterModeContextManager(), new GlobalLockContext(registryCenter.getGlobalLockPersistService()), eventBusContext);
}

private void setContextManagerAware(final ContextManager contextManager) {
Expand All @@ -81,7 +81,6 @@ private void registerOnline(final RegistryCenter registryCenter, final ContextMa
loadClusterStatus(registryCenter, contextManager);
contextManager.getInstanceContext().getInstance().setLabels(param.getLabels());
contextManager.getInstanceContext().getAllClusterInstances().addAll(registryCenter.getComputeNodeStatusService().loadAllComputeNodeInstances());
contextManager.getInstanceContext().getEventBusContext().register(new RuleItemChangedSubscriber(contextManager));
new ClusterEventSubscriberRegistry(contextManager, registryCenter).register();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,11 @@
*/
public final class RegistryCenter {

@Getter
private final EventBusContext eventBusContext;

@Getter
private final ClusterPersistRepository repository;

private final EventBusContext eventBusContext;

private final InstanceMetaData instanceMetaData;

private final Map<String, DatabaseConfiguration> databaseConfigs;
Expand All @@ -72,9 +71,10 @@ public final class RegistryCenter {

private final GovernanceWatcherFactory listenerFactory;

public RegistryCenter(final ClusterPersistRepository repository, final InstanceMetaData instanceMetaData, final Map<String, DatabaseConfiguration> databaseConfigs) {
eventBusContext = new EventBusContext();
public RegistryCenter(final ClusterPersistRepository repository, final EventBusContext eventBusContext,
final InstanceMetaData instanceMetaData, final Map<String, DatabaseConfiguration> databaseConfigs) {
this.repository = repository;
this.eventBusContext = new EventBusContext();
this.instanceMetaData = instanceMetaData;
this.databaseConfigs = databaseConfigs;
clusterStatusService = new ClusterStatusService(repository);
Expand All @@ -95,11 +95,11 @@ private String getJDBCDatabaseName() {
}

private void createSubscribers(final ClusterPersistRepository repository) {
new ComputeNodeStatusSubscriber(this, repository);
new ClusterStatusSubscriber(repository, eventBusContext);
new QualifiedDataSourceStatusSubscriber(repository, eventBusContext);
new ClusterProcessSubscriber(repository, eventBusContext);
new ShardingSphereSchemaDataRegistrySubscriber(repository, eventBusContext);
eventBusContext.register(new ComputeNodeStatusSubscriber(repository));
eventBusContext.register(new ClusterStatusSubscriber(repository));
eventBusContext.register(new QualifiedDataSourceStatusSubscriber(repository));
eventBusContext.register(new ClusterProcessSubscriber(repository, eventBusContext));
eventBusContext.register(new ShardingSphereSchemaDataRegistrySubscriber(repository));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.subscriber;

import com.google.common.eventbus.Subscribe;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
import org.apache.shardingsphere.metadata.persist.data.ShardingSphereDataPersistService;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.ShardingSphereSchemaDataAlteredEvent;
Expand All @@ -31,9 +30,8 @@ public final class ShardingSphereSchemaDataRegistrySubscriber implements EventSu

private final ShardingSphereDataPersistService persistService;

public ShardingSphereSchemaDataRegistrySubscriber(final ClusterPersistRepository repository, final EventBusContext eventBusContext) {
public ShardingSphereSchemaDataRegistrySubscriber(final ClusterPersistRepository repository) {
persistService = new ShardingSphereDataPersistService(repository);
eventBusContext.register(this);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ public ClusterProcessSubscriber(final PersistRepository repository, final EventB
this.repository = repository;
this.eventBusContext = eventBusContext;
swapper = new YamlProcessListSwapper();
eventBusContext.register(this);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.subscriber;

import com.google.common.eventbus.Subscribe;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
import org.apache.shardingsphere.metadata.persist.node.ComputeNode;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.event.ClusterStatusChangedEvent;
Expand All @@ -27,15 +27,11 @@
/**
* Cluster status subscriber.
*/
@RequiredArgsConstructor
public final class ClusterStatusSubscriber implements EventSubscriber {

private final ClusterPersistRepository repository;

public ClusterStatusSubscriber(final ClusterPersistRepository repository, final EventBusContext eventBusContext) {
this.repository = repository;
eventBusContext.register(this);
}

/**
* Update cluster status.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,23 @@
import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
import org.apache.shardingsphere.metadata.persist.node.ComputeNode;
import org.apache.shardingsphere.mode.event.node.ComputeNodeStatusChangedEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.LabelsChangedEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.service.ComputeNodeStatusService;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;

import java.util.Collections;

/**
* Compute node status subscriber.
*/
public final class ComputeNodeStatusSubscriber implements EventSubscriber {

private final RegistryCenter registryCenter;

private final ClusterPersistRepository repository;

public ComputeNodeStatusSubscriber(final RegistryCenter registryCenter, final ClusterPersistRepository repository) {
this.registryCenter = registryCenter;
private final ComputeNodeStatusService computeNodeStatusService;

public ComputeNodeStatusSubscriber(final ClusterPersistRepository repository) {
this.repository = repository;
registryCenter.getEventBusContext().register(this);
computeNodeStatusService = new ComputeNodeStatusService(repository);

}

/**
Expand All @@ -59,10 +57,6 @@ public void update(final ComputeNodeStatusChangedEvent event) {
*/
@Subscribe
public void update(final LabelsChangedEvent event) {
if (event.getLabels().isEmpty()) {
registryCenter.getComputeNodeStatusService().persistInstanceLabels(event.getInstanceId(), Collections.emptyList());
} else {
registryCenter.getComputeNodeStatusService().persistInstanceLabels(event.getInstanceId(), event.getLabels());
}
computeNodeStatusService.persistInstanceLabels(event.getInstanceId(), event.getLabels());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.subscriber;

import com.google.common.eventbus.Subscribe;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
import org.apache.shardingsphere.mode.event.node.QualifiedDataSourceDeletedEvent;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
Expand All @@ -27,15 +27,11 @@
/**
* Qualified data source status subscriber.
*/
@RequiredArgsConstructor
public final class QualifiedDataSourceStatusSubscriber implements EventSubscriber {

private final ClusterPersistRepository repository;

public QualifiedDataSourceStatusSubscriber(final ClusterPersistRepository repository, final EventBusContext eventBusContext) {
this.repository = repository;
eventBusContext.register(this);
}

/**
* Delete qualified data source.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,17 @@
import org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.process.subscriber.ProcessListChangedSubscriber;
import org.apache.shardingsphere.mode.subsciber.EventSubscriberRegistry;
import org.apache.shardingsphere.mode.subsciber.RuleItemChangedSubscriber;

/**
* Cluster event subscriber registry.
*/
public final class ClusterEventSubscriberRegistry extends EventSubscriberRegistry {

public ClusterEventSubscriberRegistry(final ContextManager contextManager, final RegistryCenter registryCenter) {
super(contextManager, new ConfigurationChangedSubscriber(contextManager),
super(contextManager,
new RuleItemChangedSubscriber(contextManager),
new ConfigurationChangedSubscriber(contextManager),
new ConfigurationChangedSubscriber(contextManager),
new ResourceMetaDataChangedSubscriber(contextManager),
new DatabaseChangedSubscriber(contextManager),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereSchema;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
import org.apache.shardingsphere.mode.manager.cluster.ClusterContextManagerBuilder;
Expand Down Expand Up @@ -80,11 +81,12 @@ class ProcessListChangedSubscriberTest {

@BeforeEach
void setUp() throws SQLException {
contextManager = new ClusterContextManagerBuilder().build(createContextManagerBuilderParameter());
EventBusContext eventBusContext = new EventBusContext();
contextManager = new ClusterContextManagerBuilder().build(createContextManagerBuilderParameter(), eventBusContext);
contextManager.renewMetaDataContexts(new MetaDataContexts(contextManager.getMetaDataContexts().getPersistService(), new ShardingSphereMetaData(createDatabases(),
contextManager.getMetaDataContexts().getMetaData().getGlobalResourceMetaData(), contextManager.getMetaDataContexts().getMetaData().getGlobalRuleMetaData(),
new ConfigurationProperties(new Properties()))));
registryCenter = new RegistryCenter(mock(ClusterPersistRepository.class), mock(ProxyInstanceMetaData.class), null);
registryCenter = new RegistryCenter(mock(ClusterPersistRepository.class), eventBusContext, mock(ProxyInstanceMetaData.class), null);
subscriber = new ProcessListChangedSubscriber(contextManager, registryCenter);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,13 @@

package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.subscriber;

import org.junit.jupiter.api.Test;
import org.mockito.Mock;
import org.junit.jupiter.api.extension.ExtendWith;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.event.ClusterStatusChangedEvent;
import org.apache.shardingsphere.infra.state.cluster.ClusterState;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.apache.shardingsphere.metadata.persist.node.ComputeNode;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.event.ClusterStatusChangedEvent;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import static org.mockito.Mockito.verify;
Expand All @@ -37,7 +36,7 @@ class ClusterStatusSubscriberTest {

@Test
void assertUpdate() {
ClusterStatusSubscriber clusterStatusSubscriber = new ClusterStatusSubscriber(repository, new EventBusContext());
ClusterStatusSubscriber clusterStatusSubscriber = new ClusterStatusSubscriber(repository);
ClusterStatusChangedEvent event = new ClusterStatusChangedEvent(ClusterState.OK);
clusterStatusSubscriber.update(event);
verify(repository).persist(ComputeNode.getClusterStatusNodePath(), ClusterState.OK.name());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.subscriber;

import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDataSource;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.mode.event.node.QualifiedDataSourceDeletedEvent;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.apache.shardingsphere.mode.storage.node.QualifiedDataSourceNode;
Expand All @@ -35,15 +34,13 @@ class QualifiedDataSourceStatusSubscriberTest {
@Mock
private ClusterPersistRepository repository;

private final EventBusContext eventBusContext = new EventBusContext();

@Test
void assertDeleteStorageNodeDataSourceDataSourceState() {
String databaseName = "replica_query_db";
String groupName = "readwrite_ds";
String dataSourceName = "replica_ds_0";
QualifiedDataSourceDeletedEvent event = new QualifiedDataSourceDeletedEvent(new QualifiedDataSource(databaseName, groupName, dataSourceName));
new QualifiedDataSourceStatusSubscriber(repository, eventBusContext).delete(event);
new QualifiedDataSourceStatusSubscriber(repository).delete(event);
verify(repository).delete(QualifiedDataSourceNode.getQualifiedDataSourceNodePath(new QualifiedDataSource(databaseName, groupName, dataSourceName)));
}
}
Loading

0 comments on commit 165893e

Please sign in to comment.