Skip to content

Commit

Permalink
Add EventSubscriberRegistry (#31227)
Browse files Browse the repository at this point in the history
* Add EventSubscriber

* Add StandaloneEventSubscriberRegister

* Add EventSubscriberRegistry

* Add EventSubscriberRegistry

* Add EventSubscriberRegistry
  • Loading branch information
terrymanu authored May 14, 2024
1 parent 492822b commit 175acb2
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 36 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.mode.subsciber;

import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
import org.apache.shardingsphere.mode.manager.ContextManager;

import java.util.Arrays;
import java.util.Collection;

/**
* Event subscriber registry.
*/
public abstract class EventSubscriberRegistry {

private final EventBusContext eventBusContext;

private final Collection<EventSubscriber> subscribers;

public EventSubscriberRegistry(final ContextManager contextManager, final EventSubscriber... subscribers) {
eventBusContext = contextManager.getInstanceContext().getEventBusContext();
this.subscribers = Arrays.asList(subscribers);
}

/**
* Register subscribers.
*/
public void register() {
subscribers.forEach(eventBusContext::register);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.workerid.generator.ClusterWorkerIdGenerator;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber.ContextManagerSubscriberRegister;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber.ClusterEventSubscriberRegistry;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import org.apache.shardingsphere.mode.metadata.MetaDataContextsFactory;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
Expand Down Expand Up @@ -84,7 +84,7 @@ private void registerOnline(final RegistryCenter registryCenter, final ContextMa
contextManager.getInstanceContext().getInstance().setLabels(param.getLabels());
contextManager.getInstanceContext().getAllClusterInstances().addAll(registryCenter.getComputeNodeStatusService().loadAllComputeNodeInstances());
contextManager.getInstanceContext().getEventBusContext().register(new RuleItemChangedSubscriber(contextManager));
new ContextManagerSubscriberRegister(registryCenter, contextManager).register();
new ClusterEventSubscriberRegistry(contextManager, registryCenter).register();
}

private void loadClusterStatus(final RegistryCenter registryCenter, final ContextManager contextManager) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,39 +17,23 @@

package org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber;

import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.process.subscriber.ProcessListChangedSubscriber;

import java.util.Arrays;
import java.util.Collection;
import org.apache.shardingsphere.mode.subsciber.EventSubscriberRegistry;

/**
* Context manager subscriber register.
* Cluster event subscriber registry.
*/
public final class ContextManagerSubscriberRegister implements EventSubscriber {

private final EventBusContext eventBusContext;

private final Collection<EventSubscriber> subscribers;
public final class ClusterEventSubscriberRegistry extends EventSubscriberRegistry {

public ContextManagerSubscriberRegister(final RegistryCenter registryCenter, final ContextManager contextManager) {
eventBusContext = contextManager.getInstanceContext().getEventBusContext();
subscribers = Arrays.asList(
public ClusterEventSubscriberRegistry(final ContextManager contextManager, final RegistryCenter registryCenter) {
super(contextManager, new ConfigurationChangedSubscriber(contextManager),
new ConfigurationChangedSubscriber(contextManager),
new ResourceMetaDataChangedSubscriber(contextManager),
new DatabaseChangedSubscriber(contextManager),
new StateChangedSubscriber(contextManager, registryCenter),
new ProcessListChangedSubscriber(contextManager, registryCenter),
new CacheEvictedSubscriber());
}

/**
* Register subscribers.
*/
public void register() {
subscribers.forEach(eventBusContext::register);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,11 @@
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.ContextManagerBuilder;
import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
import org.apache.shardingsphere.mode.manager.standalone.subscriber.StandaloneProcessSubscriber;
import org.apache.shardingsphere.mode.manager.standalone.subscriber.StandaloneEventSubscriberRegistry;
import org.apache.shardingsphere.mode.manager.standalone.workerid.generator.StandaloneWorkerIdGenerator;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import org.apache.shardingsphere.mode.metadata.MetaDataContextsFactory;
import org.apache.shardingsphere.mode.repository.standalone.StandalonePersistRepository;
import org.apache.shardingsphere.mode.subsciber.RuleItemChangedSubscriber;

import java.sql.SQLException;
import java.util.Properties;
Expand All @@ -49,10 +48,9 @@ public ContextManager build(final ContextManagerBuilderParameter param) throws S
StandalonePersistRepository.class, null == repositoryConfig ? null : repositoryConfig.getType(), null == repositoryConfig ? new Properties() : repositoryConfig.getProps());
MetaDataPersistService persistService = new MetaDataPersistService(repository);
InstanceContext instanceContext = buildInstanceContext(param);
new StandaloneProcessSubscriber(instanceContext.getEventBusContext());
MetaDataContexts metaDataContexts = MetaDataContextsFactory.create(persistService, param, instanceContext);
ContextManager result = new ContextManager(metaDataContexts, instanceContext);
registerSubscriber(result);
new StandaloneEventSubscriberRegistry(result).register();
setContextManagerAware(result);
return result;
}
Expand All @@ -62,10 +60,6 @@ private InstanceContext buildInstanceContext(final ContextManagerBuilderParamete
new StandaloneWorkerIdGenerator(), param.getModeConfiguration(), new StandaloneModeContextManager(), new GlobalLockContext(null), new EventBusContext());
}

private void registerSubscriber(final ContextManager contextManager) {
contextManager.getInstanceContext().getEventBusContext().register(new RuleItemChangedSubscriber(contextManager));
}

private void setContextManagerAware(final ContextManager contextManager) {
((StandaloneModeContextManager) contextManager.getInstanceContext().getModeContextManager()).setContextManager(contextManager);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.mode.manager.standalone.subscriber;

import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.subsciber.EventSubscriberRegistry;
import org.apache.shardingsphere.mode.subsciber.RuleItemChangedSubscriber;

/**
* Standalone event subscriber registry.
*/
public final class StandaloneEventSubscriberRegistry extends EventSubscriberRegistry {

public StandaloneEventSubscriberRegistry(final ContextManager contextManager) {
super(contextManager,
new StandaloneProcessSubscriber(contextManager.getInstanceContext().getEventBusContext()),
new RuleItemChangedSubscriber(contextManager));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.shardingsphere.mode.manager.standalone.subscriber;

import com.google.common.eventbus.Subscribe;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.executor.sql.process.Process;
import org.apache.shardingsphere.infra.executor.sql.process.ProcessRegistry;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
Expand All @@ -33,15 +34,11 @@
/**
* Standalone process subscriber.
*/
@RequiredArgsConstructor
public final class StandaloneProcessSubscriber implements ProcessSubscriber, EventSubscriber {

private final EventBusContext eventBusContext;

public StandaloneProcessSubscriber(final EventBusContext eventBusContext) {
this.eventBusContext = eventBusContext;
eventBusContext.register(this);
}

@Override
@Subscribe
public void postShowProcessListData(final ShowProcessListRequestEvent event) {
Expand Down

0 comments on commit 175acb2

Please sign in to comment.