Skip to content

Commit

Permalink
Refactor ContextManagerSubscriberFacade to ContextManagerSubscriberRe…
Browse files Browse the repository at this point in the history
…gister (#31220)

* Rename ContextManagerAware.setContextManager()

* Refactor ContextManagerSubscriberFacade

* Refactor ContextManagerSubscriberRegister

* Refactor ContextManagerSubscriberRegister

* Refactor ContextManagerSubscriberRegister
  • Loading branch information
terrymanu authored May 13, 2024
1 parent 3018c65 commit fcb5901
Show file tree
Hide file tree
Showing 11 changed files with 47 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.workerid.generator.ClusterWorkerIdGenerator;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber.ContextManagerSubscriberFacade;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber.ContextManagerSubscriberRegister;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import org.apache.shardingsphere.mode.metadata.MetaDataContextsFactory;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
Expand Down Expand Up @@ -84,7 +84,7 @@ private void registerOnline(final RegistryCenter registryCenter, final ContextMa
contextManager.getInstanceContext().getInstance().setLabels(param.getLabels());
contextManager.getInstanceContext().getAllClusterInstances().addAll(registryCenter.getComputeNodeStatusService().loadAllComputeNodeInstances());
contextManager.getInstanceContext().getEventBusContext().register(new RuleItemChangedSubscriber(contextManager));
new ContextManagerSubscriberFacade(registryCenter, contextManager);
new ContextManagerSubscriberRegister(registryCenter, contextManager).register();
}

private void loadClusterStatus(final RegistryCenter registryCenter, final ContextManager contextManager) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.process.subscriber;

import com.google.common.eventbus.Subscribe;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.executor.sql.process.Process;
import org.apache.shardingsphere.infra.executor.sql.process.ProcessRegistry;
import org.apache.shardingsphere.infra.executor.sql.process.lock.ProcessOperationLockRegistry;
Expand All @@ -40,19 +41,15 @@
* TODO replace the old ProcessListChangedSubscriber after meta data refactor completed
* New process list changed subscriber.
*/
@SuppressWarnings("unused")
@RequiredArgsConstructor
public final class ProcessListChangedSubscriber {

private final RegistryCenter registryCenter;

private final ContextManager contextManager;

private final YamlProcessListSwapper swapper = new YamlProcessListSwapper();
private final RegistryCenter registryCenter;

public ProcessListChangedSubscriber(final RegistryCenter registryCenter, final ContextManager contextManager) {
this.registryCenter = registryCenter;
this.contextManager = contextManager;
contextManager.getInstanceContext().getEventBusContext().register(this);
}
private final YamlProcessListSwapper swapper = new YamlProcessListSwapper();

/**
* Report local processes.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,20 @@
package org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber;

import com.google.common.eventbus.Subscribe;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.infra.spi.type.ordered.cache.OrderedServicesCache;
import org.apache.shardingsphere.infra.rule.event.GovernanceEvent;
import org.apache.shardingsphere.infra.spi.type.ordered.cache.OrderedServicesCache;

/**
* Cache evicted subscriber.
*/
@SuppressWarnings("unused")
public final class CacheEvictedSubscriber {

public CacheEvictedSubscriber(final EventBusContext eventBusContext) {
eventBusContext.register(this);
}

/**
* Callback of any {@link GovernanceEvent}.
*
* @param ignored unused
*/
@SuppressWarnings("unused")
@Subscribe
public void onGovernanceEvent(final GovernanceEvent ignored) {
OrderedServicesCache.clearCache();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber;

import com.google.common.eventbus.Subscribe;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.mode.event.config.AlterGlobalRuleConfigurationEvent;
import org.apache.shardingsphere.mode.event.config.AlterPropertiesEvent;
import org.apache.shardingsphere.mode.event.datasource.unit.AlterStorageUnitEvent;
Expand All @@ -28,16 +29,12 @@
/**
* Configuration changed subscriber.
*/
@RequiredArgsConstructor
@SuppressWarnings("unused")
public final class ConfigurationChangedSubscriber {

private final ContextManager contextManager;

public ConfigurationChangedSubscriber(final ContextManager contextManager) {
this.contextManager = contextManager;
contextManager.getInstanceContext().getEventBusContext().register(this);
}

/**
* Renew for register storage unit.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,38 @@

package org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber;

import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.process.subscriber.ProcessListChangedSubscriber;

import java.util.Arrays;
import java.util.Collection;

/**
* Context manager subscriber facade.
* Context manager subscriber register.
*/
public final class ContextManagerSubscriberFacade {
public final class ContextManagerSubscriberRegister {

private final EventBusContext eventBusContext;

private final Collection<Object> subscribers;

public ContextManagerSubscriberRegister(final RegistryCenter registryCenter, final ContextManager contextManager) {
eventBusContext = contextManager.getInstanceContext().getEventBusContext();
subscribers = Arrays.asList(
new ConfigurationChangedSubscriber(contextManager),
new ResourceMetaDataChangedSubscriber(contextManager),
new DatabaseChangedSubscriber(contextManager),
new StateChangedSubscriber(contextManager, registryCenter),
new ProcessListChangedSubscriber(contextManager, registryCenter),
new CacheEvictedSubscriber());
}

public ContextManagerSubscriberFacade(final RegistryCenter registryCenter, final ContextManager contextManager) {
new ConfigurationChangedSubscriber(contextManager);
new ResourceMetaDataChangedSubscriber(contextManager);
new DatabaseChangedSubscriber(contextManager);
new StateChangedSubscriber(registryCenter, contextManager);
new ProcessListChangedSubscriber(registryCenter, contextManager);
new CacheEvictedSubscriber(contextManager.getInstanceContext().getEventBusContext());
/**
* Register subscribers.
*/
public void register() {
subscribers.forEach(eventBusContext::register);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber;

import com.google.common.eventbus.Subscribe;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.DatabaseDataAddedEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.DatabaseDataDeletedEvent;
Expand All @@ -30,16 +31,12 @@
/**
* Database changed subscriber.
*/
@RequiredArgsConstructor
@SuppressWarnings("unused")
public final class DatabaseChangedSubscriber {

private final ContextManager contextManager;

public DatabaseChangedSubscriber(final ContextManager contextManager) {
this.contextManager = contextManager;
contextManager.getInstanceContext().getEventBusContext().register(this);
}

/**
* Renew to persist ShardingSphere database data.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber;

import com.google.common.eventbus.Subscribe;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereView;
import org.apache.shardingsphere.mode.event.schema.table.CreateOrAlterTableEvent;
Expand All @@ -35,16 +36,12 @@
/**
* Resource meta data changed subscriber.
*/
@RequiredArgsConstructor
@SuppressWarnings("unused")
public final class ResourceMetaDataChangedSubscriber {

private final ContextManager contextManager;

public ResourceMetaDataChangedSubscriber(final ContextManager contextManager) {
this.contextManager = contextManager;
contextManager.getInstanceContext().getEventBusContext().register(this);
}

/**
* Renew to persist meta data.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber;

import com.google.common.eventbus.Subscribe;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDataSource;
Expand All @@ -39,18 +40,13 @@
/**
* State changed subscriber.
*/
@RequiredArgsConstructor
@SuppressWarnings("unused")
public final class StateChangedSubscriber {

private final RegistryCenter registryCenter;

private final ContextManager contextManager;

public StateChangedSubscriber(final RegistryCenter registryCenter, final ContextManager contextManager) {
this.registryCenter = registryCenter;
this.contextManager = contextManager;
contextManager.getInstanceContext().getEventBusContext().register(this);
}
private final RegistryCenter registryCenter;

/**
* Renew disabled data source names.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ void setUp() throws SQLException {
contextManager.getMetaDataContexts().getMetaData().getGlobalResourceMetaData(), contextManager.getMetaDataContexts().getMetaData().getGlobalRuleMetaData(),
new ConfigurationProperties(new Properties()))));
registryCenter = new RegistryCenter(mock(ClusterPersistRepository.class), new EventBusContext(), mock(ProxyInstanceMetaData.class), null);
subscriber = new ProcessListChangedSubscriber(registryCenter, contextManager);
subscriber = new ProcessListChangedSubscriber(contextManager, registryCenter);
}

private ContextManagerBuilderParameter createContextManagerBuilderParameter() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class CacheEvictedSubscriberTest {
@Test
void assertOnGovernanceEvent() {
EventBusContext eventBusContext = new EventBusContext();
new CacheEvictedSubscriber(eventBusContext);
eventBusContext.register(new CacheEvictedSubscriber());
OrderedServicesCache.cacheServices(getClass(), Collections.emptyList(), Collections.emptyMap());
eventBusContext.post(new DatabaseDeletedEvent("db"));
assertFalse(OrderedServicesCache.findCachedServices(getClass(), Collections.emptyList()).isPresent());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ void setUp() throws SQLException {
contextManager.renewMetaDataContexts(new MetaDataContexts(contextManager.getMetaDataContexts().getPersistService(), new ShardingSphereMetaData(createDatabases(),
contextManager.getMetaDataContexts().getMetaData().getGlobalResourceMetaData(), contextManager.getMetaDataContexts().getMetaData().getGlobalRuleMetaData(),
new ConfigurationProperties(new Properties()))));
subscriber = new StateChangedSubscriber(new RegistryCenter(mock(ClusterPersistRepository.class), new EventBusContext(), mock(ProxyInstanceMetaData.class), null), contextManager);
subscriber = new StateChangedSubscriber(contextManager, new RegistryCenter(mock(ClusterPersistRepository.class), new EventBusContext(), mock(ProxyInstanceMetaData.class), null));
}

private ContextManagerBuilderParameter createContextManagerBuilderParameter() {
Expand Down

0 comments on commit fcb5901

Please sign in to comment.