From 175acb2cf2da8983167b3ce8595fd8454060039e Mon Sep 17 00:00:00 2001 From: Liang Zhang Date: Tue, 14 May 2024 19:54:35 +0800 Subject: [PATCH] Add EventSubscriberRegistry (#31227) * Add EventSubscriber * Add StandaloneEventSubscriberRegister * Add EventSubscriberRegistry * Add EventSubscriberRegistry * Add EventSubscriberRegistry --- .../subsciber/EventSubscriberRegistry.java | 47 +++++++++++++++++++ .../cluster/ClusterContextManagerBuilder.java | 4 +- ...va => ClusterEventSubscriberRegistry.java} | 26 ++-------- .../StandaloneContextManagerBuilder.java | 10 +--- .../StandaloneEventSubscriberRegistry.java | 34 ++++++++++++++ .../StandaloneProcessSubscriber.java | 7 +-- 6 files changed, 92 insertions(+), 36 deletions(-) create mode 100644 mode/core/src/main/java/org/apache/shardingsphere/mode/subsciber/EventSubscriberRegistry.java rename mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/{ContextManagerSubscriberRegister.java => ClusterEventSubscriberRegistry.java} (65%) create mode 100644 mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneEventSubscriberRegistry.java diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/subsciber/EventSubscriberRegistry.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/subsciber/EventSubscriberRegistry.java new file mode 100644 index 0000000000000..cde56c69cf136 --- /dev/null +++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/subsciber/EventSubscriberRegistry.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.mode.subsciber; + +import org.apache.shardingsphere.infra.util.eventbus.EventBusContext; +import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber; +import org.apache.shardingsphere.mode.manager.ContextManager; + +import java.util.Arrays; +import java.util.Collection; + +/** + * Event subscriber registry. + */ +public abstract class EventSubscriberRegistry { + + private final EventBusContext eventBusContext; + + private final Collection subscribers; + + public EventSubscriberRegistry(final ContextManager contextManager, final EventSubscriber... subscribers) { + eventBusContext = contextManager.getInstanceContext().getEventBusContext(); + this.subscribers = Arrays.asList(subscribers); + } + + /** + * Register subscribers. + */ + public void register() { + subscribers.forEach(eventBusContext::register); + } +} diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java index 3a04234f4e6d4..8a9a5557c6903 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java @@ -31,7 +31,7 @@ import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter; import org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter; import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.workerid.generator.ClusterWorkerIdGenerator; -import org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber.ContextManagerSubscriberRegister; +import org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber.ClusterEventSubscriberRegistry; import org.apache.shardingsphere.mode.metadata.MetaDataContexts; import org.apache.shardingsphere.mode.metadata.MetaDataContextsFactory; import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository; @@ -84,7 +84,7 @@ private void registerOnline(final RegistryCenter registryCenter, final ContextMa contextManager.getInstanceContext().getInstance().setLabels(param.getLabels()); contextManager.getInstanceContext().getAllClusterInstances().addAll(registryCenter.getComputeNodeStatusService().loadAllComputeNodeInstances()); contextManager.getInstanceContext().getEventBusContext().register(new RuleItemChangedSubscriber(contextManager)); - new ContextManagerSubscriberRegister(registryCenter, contextManager).register(); + new ClusterEventSubscriberRegistry(contextManager, registryCenter).register(); } private void loadClusterStatus(final RegistryCenter registryCenter, final ContextManager contextManager) { diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ContextManagerSubscriberRegister.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ClusterEventSubscriberRegistry.java similarity index 65% rename from mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ContextManagerSubscriberRegister.java rename to mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ClusterEventSubscriberRegistry.java index cb3617516bf17..c81a8181e96fd 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ContextManagerSubscriberRegister.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/subscriber/ClusterEventSubscriberRegistry.java @@ -17,27 +17,18 @@ package org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber; -import org.apache.shardingsphere.infra.util.eventbus.EventBusContext; -import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber; import org.apache.shardingsphere.mode.manager.ContextManager; import org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter; import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.process.subscriber.ProcessListChangedSubscriber; - -import java.util.Arrays; -import java.util.Collection; +import org.apache.shardingsphere.mode.subsciber.EventSubscriberRegistry; /** - * Context manager subscriber register. + * Cluster event subscriber registry. */ -public final class ContextManagerSubscriberRegister implements EventSubscriber { - - private final EventBusContext eventBusContext; - - private final Collection subscribers; +public final class ClusterEventSubscriberRegistry extends EventSubscriberRegistry { - public ContextManagerSubscriberRegister(final RegistryCenter registryCenter, final ContextManager contextManager) { - eventBusContext = contextManager.getInstanceContext().getEventBusContext(); - subscribers = Arrays.asList( + public ClusterEventSubscriberRegistry(final ContextManager contextManager, final RegistryCenter registryCenter) { + super(contextManager, new ConfigurationChangedSubscriber(contextManager), new ConfigurationChangedSubscriber(contextManager), new ResourceMetaDataChangedSubscriber(contextManager), new DatabaseChangedSubscriber(contextManager), @@ -45,11 +36,4 @@ public ContextManagerSubscriberRegister(final RegistryCenter registryCenter, fin new ProcessListChangedSubscriber(contextManager, registryCenter), new CacheEvictedSubscriber()); } - - /** - * Register subscribers. - */ - public void register() { - subscribers.forEach(eventBusContext::register); - } } diff --git a/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/StandaloneContextManagerBuilder.java b/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/StandaloneContextManagerBuilder.java index 8349ac4b73c77..1c24b4b20e346 100644 --- a/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/StandaloneContextManagerBuilder.java +++ b/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/StandaloneContextManagerBuilder.java @@ -27,12 +27,11 @@ import org.apache.shardingsphere.mode.manager.ContextManager; import org.apache.shardingsphere.mode.manager.ContextManagerBuilder; import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter; -import org.apache.shardingsphere.mode.manager.standalone.subscriber.StandaloneProcessSubscriber; +import org.apache.shardingsphere.mode.manager.standalone.subscriber.StandaloneEventSubscriberRegistry; import org.apache.shardingsphere.mode.manager.standalone.workerid.generator.StandaloneWorkerIdGenerator; import org.apache.shardingsphere.mode.metadata.MetaDataContexts; import org.apache.shardingsphere.mode.metadata.MetaDataContextsFactory; import org.apache.shardingsphere.mode.repository.standalone.StandalonePersistRepository; -import org.apache.shardingsphere.mode.subsciber.RuleItemChangedSubscriber; import java.sql.SQLException; import java.util.Properties; @@ -49,10 +48,9 @@ public ContextManager build(final ContextManagerBuilderParameter param) throws S StandalonePersistRepository.class, null == repositoryConfig ? null : repositoryConfig.getType(), null == repositoryConfig ? new Properties() : repositoryConfig.getProps()); MetaDataPersistService persistService = new MetaDataPersistService(repository); InstanceContext instanceContext = buildInstanceContext(param); - new StandaloneProcessSubscriber(instanceContext.getEventBusContext()); MetaDataContexts metaDataContexts = MetaDataContextsFactory.create(persistService, param, instanceContext); ContextManager result = new ContextManager(metaDataContexts, instanceContext); - registerSubscriber(result); + new StandaloneEventSubscriberRegistry(result).register(); setContextManagerAware(result); return result; } @@ -62,10 +60,6 @@ private InstanceContext buildInstanceContext(final ContextManagerBuilderParamete new StandaloneWorkerIdGenerator(), param.getModeConfiguration(), new StandaloneModeContextManager(), new GlobalLockContext(null), new EventBusContext()); } - private void registerSubscriber(final ContextManager contextManager) { - contextManager.getInstanceContext().getEventBusContext().register(new RuleItemChangedSubscriber(contextManager)); - } - private void setContextManagerAware(final ContextManager contextManager) { ((StandaloneModeContextManager) contextManager.getInstanceContext().getModeContextManager()).setContextManager(contextManager); } diff --git a/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneEventSubscriberRegistry.java b/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneEventSubscriberRegistry.java new file mode 100644 index 0000000000000..10906b82d0f45 --- /dev/null +++ b/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneEventSubscriberRegistry.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.mode.manager.standalone.subscriber; + +import org.apache.shardingsphere.mode.manager.ContextManager; +import org.apache.shardingsphere.mode.subsciber.EventSubscriberRegistry; +import org.apache.shardingsphere.mode.subsciber.RuleItemChangedSubscriber; + +/** + * Standalone event subscriber registry. + */ +public final class StandaloneEventSubscriberRegistry extends EventSubscriberRegistry { + + public StandaloneEventSubscriberRegistry(final ContextManager contextManager) { + super(contextManager, + new StandaloneProcessSubscriber(contextManager.getInstanceContext().getEventBusContext()), + new RuleItemChangedSubscriber(contextManager)); + } +} diff --git a/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneProcessSubscriber.java b/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneProcessSubscriber.java index 74c694c425096..8180b6334d615 100644 --- a/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneProcessSubscriber.java +++ b/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/subscriber/StandaloneProcessSubscriber.java @@ -18,6 +18,7 @@ package org.apache.shardingsphere.mode.manager.standalone.subscriber; import com.google.common.eventbus.Subscribe; +import lombok.RequiredArgsConstructor; import org.apache.shardingsphere.infra.executor.sql.process.Process; import org.apache.shardingsphere.infra.executor.sql.process.ProcessRegistry; import org.apache.shardingsphere.infra.util.eventbus.EventBusContext; @@ -33,15 +34,11 @@ /** * Standalone process subscriber. */ +@RequiredArgsConstructor public final class StandaloneProcessSubscriber implements ProcessSubscriber, EventSubscriber { private final EventBusContext eventBusContext; - public StandaloneProcessSubscriber(final EventBusContext eventBusContext) { - this.eventBusContext = eventBusContext; - eventBusContext.register(this); - } - @Override @Subscribe public void postShowProcessListData(final ShowProcessListRequestEvent event) {