From 81de94e5eedab99f7890c0293468ddda86474a44 Mon Sep 17 00:00:00 2001 From: zhangliang Date: Wed, 15 May 2024 22:03:54 +0800 Subject: [PATCH] Refactor RegistryCenter --- .../cluster/coordinator/RegistryCenter.java | 10 +++++----- ...ardingSphereSchemaDataRegistrySubscriber.java | 4 +--- .../subscriber/ClusterProcessSubscriber.java | 11 +++-------- .../subscriber/ClusterStatusSubscriber.java | 8 ++------ .../subscriber/ComputeNodeStatusSubscriber.java | 16 +++------------- .../QualifiedDataSourceStatusSubscriber.java | 8 ++------ 6 files changed, 16 insertions(+), 41 deletions(-) diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java index f96d03fab2a34..f3372d843ebf8 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/RegistryCenter.java @@ -95,11 +95,11 @@ private String getJDBCDatabaseName() { } private void createSubscribers(final ClusterPersistRepository repository) { - new ComputeNodeStatusSubscriber(this, repository); - new ClusterStatusSubscriber(repository, eventBusContext); - new QualifiedDataSourceStatusSubscriber(repository, eventBusContext); - new ClusterProcessSubscriber(repository, eventBusContext); - new ShardingSphereSchemaDataRegistrySubscriber(repository, eventBusContext); + eventBusContext.register(new ComputeNodeStatusSubscriber(this, repository)); + eventBusContext.register(new ClusterStatusSubscriber(repository)); + eventBusContext.register(new QualifiedDataSourceStatusSubscriber(repository)); + eventBusContext.register(new ClusterProcessSubscriber(repository, eventBusContext)); + eventBusContext.register(new ShardingSphereSchemaDataRegistrySubscriber(repository)); } /** diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/subscriber/ShardingSphereSchemaDataRegistrySubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/subscriber/ShardingSphereSchemaDataRegistrySubscriber.java index 8c738752d9c58..c7843a645edd3 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/subscriber/ShardingSphereSchemaDataRegistrySubscriber.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/subscriber/ShardingSphereSchemaDataRegistrySubscriber.java @@ -18,7 +18,6 @@ package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.subscriber; import com.google.common.eventbus.Subscribe; -import org.apache.shardingsphere.infra.util.eventbus.EventBusContext; import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber; import org.apache.shardingsphere.metadata.persist.data.ShardingSphereDataPersistService; import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.ShardingSphereSchemaDataAlteredEvent; @@ -31,9 +30,8 @@ public final class ShardingSphereSchemaDataRegistrySubscriber implements EventSu private final ShardingSphereDataPersistService persistService; - public ShardingSphereSchemaDataRegistrySubscriber(final ClusterPersistRepository repository, final EventBusContext eventBusContext) { + public ShardingSphereSchemaDataRegistrySubscriber(final ClusterPersistRepository repository) { persistService = new ShardingSphereDataPersistService(repository); - eventBusContext.register(this); } /** diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ClusterProcessSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ClusterProcessSubscriber.java index f6554b0b5fc68..ca7219b50bb85 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ClusterProcessSubscriber.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/subscriber/ClusterProcessSubscriber.java @@ -18,6 +18,7 @@ package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.process.subscriber; import com.google.common.eventbus.Subscribe; +import lombok.RequiredArgsConstructor; import org.apache.shardingsphere.infra.executor.sql.process.lock.ProcessOperationLockRegistry; import org.apache.shardingsphere.infra.executor.sql.process.yaml.YamlProcessList; import org.apache.shardingsphere.infra.executor.sql.process.yaml.swapper.YamlProcessListSwapper; @@ -42,20 +43,14 @@ /** * Cluster process subscriber. */ +@RequiredArgsConstructor public final class ClusterProcessSubscriber implements ProcessSubscriber, EventSubscriber { private final PersistRepository repository; private final EventBusContext eventBusContext; - private final YamlProcessListSwapper swapper; - - public ClusterProcessSubscriber(final PersistRepository repository, final EventBusContext eventBusContext) { - this.repository = repository; - this.eventBusContext = eventBusContext; - swapper = new YamlProcessListSwapper(); - eventBusContext.register(this); - } + private final YamlProcessListSwapper swapper = new YamlProcessListSwapper(); @Override @Subscribe diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/subscriber/ClusterStatusSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/subscriber/ClusterStatusSubscriber.java index 12007181f1cd2..da35826018ea4 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/subscriber/ClusterStatusSubscriber.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/cluster/subscriber/ClusterStatusSubscriber.java @@ -18,7 +18,7 @@ package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.subscriber; import com.google.common.eventbus.Subscribe; -import org.apache.shardingsphere.infra.util.eventbus.EventBusContext; +import lombok.RequiredArgsConstructor; import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber; import org.apache.shardingsphere.metadata.persist.node.ComputeNode; import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.event.ClusterStatusChangedEvent; @@ -27,15 +27,11 @@ /** * Cluster status subscriber. */ +@RequiredArgsConstructor public final class ClusterStatusSubscriber implements EventSubscriber { private final ClusterPersistRepository repository; - public ClusterStatusSubscriber(final ClusterPersistRepository repository, final EventBusContext eventBusContext) { - this.repository = repository; - eventBusContext.register(this); - } - /** * Update cluster status. * diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/subscriber/ComputeNodeStatusSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/subscriber/ComputeNodeStatusSubscriber.java index 3b24717a67ba0..c1f26ea4b7010 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/subscriber/ComputeNodeStatusSubscriber.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/subscriber/ComputeNodeStatusSubscriber.java @@ -18,6 +18,7 @@ package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.subscriber; import com.google.common.eventbus.Subscribe; +import lombok.RequiredArgsConstructor; import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber; import org.apache.shardingsphere.metadata.persist.node.ComputeNode; import org.apache.shardingsphere.mode.event.node.ComputeNodeStatusChangedEvent; @@ -25,23 +26,16 @@ import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.LabelsChangedEvent; import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository; -import java.util.Collections; - /** * Compute node status subscriber. */ +@RequiredArgsConstructor public final class ComputeNodeStatusSubscriber implements EventSubscriber { private final RegistryCenter registryCenter; private final ClusterPersistRepository repository; - public ComputeNodeStatusSubscriber(final RegistryCenter registryCenter, final ClusterPersistRepository repository) { - this.registryCenter = registryCenter; - this.repository = repository; - registryCenter.getEventBusContext().register(this); - } - /** * Update compute node status. * @@ -59,10 +53,6 @@ public void update(final ComputeNodeStatusChangedEvent event) { */ @Subscribe public void update(final LabelsChangedEvent event) { - if (event.getLabels().isEmpty()) { - registryCenter.getComputeNodeStatusService().persistInstanceLabels(event.getInstanceId(), Collections.emptyList()); - } else { - registryCenter.getComputeNodeStatusService().persistInstanceLabels(event.getInstanceId(), event.getLabels()); - } + registryCenter.getComputeNodeStatusService().persistInstanceLabels(event.getInstanceId(), event.getLabels()); } } diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/QualifiedDataSourceStatusSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/QualifiedDataSourceStatusSubscriber.java index 7893bf4d95915..1dd359778a625 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/QualifiedDataSourceStatusSubscriber.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/storage/subscriber/QualifiedDataSourceStatusSubscriber.java @@ -18,7 +18,7 @@ package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.subscriber; import com.google.common.eventbus.Subscribe; -import org.apache.shardingsphere.infra.util.eventbus.EventBusContext; +import lombok.RequiredArgsConstructor; import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber; import org.apache.shardingsphere.mode.event.node.QualifiedDataSourceDeletedEvent; import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository; @@ -27,15 +27,11 @@ /** * Qualified data source status subscriber. */ +@RequiredArgsConstructor public final class QualifiedDataSourceStatusSubscriber implements EventSubscriber { private final ClusterPersistRepository repository; - public QualifiedDataSourceStatusSubscriber(final ClusterPersistRepository repository, final EventBusContext eventBusContext) { - this.repository = repository; - eventBusContext.register(this); - } - /** * Delete qualified data source. *