Skip to content

Commit

Permalink
Refactor RegistryCenter
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu committed May 15, 2024
1 parent c7978f4 commit 81de94e
Show file tree
Hide file tree
Showing 6 changed files with 16 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,11 @@ private String getJDBCDatabaseName() {
}

private void createSubscribers(final ClusterPersistRepository repository) {
new ComputeNodeStatusSubscriber(this, repository);
new ClusterStatusSubscriber(repository, eventBusContext);
new QualifiedDataSourceStatusSubscriber(repository, eventBusContext);
new ClusterProcessSubscriber(repository, eventBusContext);
new ShardingSphereSchemaDataRegistrySubscriber(repository, eventBusContext);
eventBusContext.register(new ComputeNodeStatusSubscriber(this, repository));
eventBusContext.register(new ClusterStatusSubscriber(repository));
eventBusContext.register(new QualifiedDataSourceStatusSubscriber(repository));
eventBusContext.register(new ClusterProcessSubscriber(repository, eventBusContext));
eventBusContext.register(new ShardingSphereSchemaDataRegistrySubscriber(repository));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.subscriber;

import com.google.common.eventbus.Subscribe;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
import org.apache.shardingsphere.metadata.persist.data.ShardingSphereDataPersistService;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.ShardingSphereSchemaDataAlteredEvent;
Expand All @@ -31,9 +30,8 @@ public final class ShardingSphereSchemaDataRegistrySubscriber implements EventSu

private final ShardingSphereDataPersistService persistService;

public ShardingSphereSchemaDataRegistrySubscriber(final ClusterPersistRepository repository, final EventBusContext eventBusContext) {
public ShardingSphereSchemaDataRegistrySubscriber(final ClusterPersistRepository repository) {
persistService = new ShardingSphereDataPersistService(repository);
eventBusContext.register(this);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.process.subscriber;

import com.google.common.eventbus.Subscribe;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.executor.sql.process.lock.ProcessOperationLockRegistry;
import org.apache.shardingsphere.infra.executor.sql.process.yaml.YamlProcessList;
import org.apache.shardingsphere.infra.executor.sql.process.yaml.swapper.YamlProcessListSwapper;
Expand All @@ -42,20 +43,14 @@
/**
* Cluster process subscriber.
*/
@RequiredArgsConstructor
public final class ClusterProcessSubscriber implements ProcessSubscriber, EventSubscriber {

private final PersistRepository repository;

private final EventBusContext eventBusContext;

private final YamlProcessListSwapper swapper;

public ClusterProcessSubscriber(final PersistRepository repository, final EventBusContext eventBusContext) {
this.repository = repository;
this.eventBusContext = eventBusContext;
swapper = new YamlProcessListSwapper();
eventBusContext.register(this);
}
private final YamlProcessListSwapper swapper = new YamlProcessListSwapper();

@Override
@Subscribe
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.subscriber;

import com.google.common.eventbus.Subscribe;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
import org.apache.shardingsphere.metadata.persist.node.ComputeNode;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.event.ClusterStatusChangedEvent;
Expand All @@ -27,15 +27,11 @@
/**
* Cluster status subscriber.
*/
@RequiredArgsConstructor
public final class ClusterStatusSubscriber implements EventSubscriber {

private final ClusterPersistRepository repository;

public ClusterStatusSubscriber(final ClusterPersistRepository repository, final EventBusContext eventBusContext) {
this.repository = repository;
eventBusContext.register(this);
}

/**
* Update cluster status.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,24 @@
package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.subscriber;

import com.google.common.eventbus.Subscribe;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
import org.apache.shardingsphere.metadata.persist.node.ComputeNode;
import org.apache.shardingsphere.mode.event.node.ComputeNodeStatusChangedEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.LabelsChangedEvent;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;

import java.util.Collections;

/**
* Compute node status subscriber.
*/
@RequiredArgsConstructor
public final class ComputeNodeStatusSubscriber implements EventSubscriber {

private final RegistryCenter registryCenter;

private final ClusterPersistRepository repository;

public ComputeNodeStatusSubscriber(final RegistryCenter registryCenter, final ClusterPersistRepository repository) {
this.registryCenter = registryCenter;
this.repository = repository;
registryCenter.getEventBusContext().register(this);
}

/**
* Update compute node status.
*
Expand All @@ -59,10 +53,6 @@ public void update(final ComputeNodeStatusChangedEvent event) {
*/
@Subscribe
public void update(final LabelsChangedEvent event) {
if (event.getLabels().isEmpty()) {
registryCenter.getComputeNodeStatusService().persistInstanceLabels(event.getInstanceId(), Collections.emptyList());
} else {
registryCenter.getComputeNodeStatusService().persistInstanceLabels(event.getInstanceId(), event.getLabels());
}
registryCenter.getComputeNodeStatusService().persistInstanceLabels(event.getInstanceId(), event.getLabels());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.subscriber;

import com.google.common.eventbus.Subscribe;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
import org.apache.shardingsphere.mode.event.node.QualifiedDataSourceDeletedEvent;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
Expand All @@ -27,15 +27,11 @@
/**
* Qualified data source status subscriber.
*/
@RequiredArgsConstructor
public final class QualifiedDataSourceStatusSubscriber implements EventSubscriber {

private final ClusterPersistRepository repository;

public QualifiedDataSourceStatusSubscriber(final ClusterPersistRepository repository, final EventBusContext eventBusContext) {
this.repository = repository;
eventBusContext.register(this);
}

/**
* Delete qualified data source.
*
Expand Down

0 comments on commit 81de94e

Please sign in to comment.