Skip to content

Commit

Permalink
Refactor RegistryCenter (#31233)
Browse files Browse the repository at this point in the history
* Refactor RegistryCenter

* Refactor RegistryCenter
  • Loading branch information
terrymanu authored May 15, 2024
1 parent c7978f4 commit d5e1653
Show file tree
Hide file tree
Showing 8 changed files with 23 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,11 @@ private String getJDBCDatabaseName() {
}

private void createSubscribers(final ClusterPersistRepository repository) {
new ComputeNodeStatusSubscriber(this, repository);
new ClusterStatusSubscriber(repository, eventBusContext);
new QualifiedDataSourceStatusSubscriber(repository, eventBusContext);
new ClusterProcessSubscriber(repository, eventBusContext);
new ShardingSphereSchemaDataRegistrySubscriber(repository, eventBusContext);
eventBusContext.register(new ComputeNodeStatusSubscriber(this, repository));
eventBusContext.register(new ClusterStatusSubscriber(repository));
eventBusContext.register(new QualifiedDataSourceStatusSubscriber(repository));
eventBusContext.register(new ClusterProcessSubscriber(repository, eventBusContext));
eventBusContext.register(new ShardingSphereSchemaDataRegistrySubscriber(repository));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.subscriber;

import com.google.common.eventbus.Subscribe;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
import org.apache.shardingsphere.metadata.persist.data.ShardingSphereDataPersistService;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.data.event.ShardingSphereSchemaDataAlteredEvent;
Expand All @@ -31,9 +30,8 @@ public final class ShardingSphereSchemaDataRegistrySubscriber implements EventSu

private final ShardingSphereDataPersistService persistService;

public ShardingSphereSchemaDataRegistrySubscriber(final ClusterPersistRepository repository, final EventBusContext eventBusContext) {
public ShardingSphereSchemaDataRegistrySubscriber(final ClusterPersistRepository repository) {
persistService = new ShardingSphereDataPersistService(repository);
eventBusContext.register(this);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.process.subscriber;

import com.google.common.eventbus.Subscribe;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.executor.sql.process.lock.ProcessOperationLockRegistry;
import org.apache.shardingsphere.infra.executor.sql.process.yaml.YamlProcessList;
import org.apache.shardingsphere.infra.executor.sql.process.yaml.swapper.YamlProcessListSwapper;
Expand All @@ -42,20 +43,14 @@
/**
* Cluster process subscriber.
*/
@RequiredArgsConstructor
public final class ClusterProcessSubscriber implements ProcessSubscriber, EventSubscriber {

private final PersistRepository repository;

private final EventBusContext eventBusContext;

private final YamlProcessListSwapper swapper;

public ClusterProcessSubscriber(final PersistRepository repository, final EventBusContext eventBusContext) {
this.repository = repository;
this.eventBusContext = eventBusContext;
swapper = new YamlProcessListSwapper();
eventBusContext.register(this);
}
private final YamlProcessListSwapper swapper = new YamlProcessListSwapper();

@Override
@Subscribe
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.subscriber;

import com.google.common.eventbus.Subscribe;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
import org.apache.shardingsphere.metadata.persist.node.ComputeNode;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.event.ClusterStatusChangedEvent;
Expand All @@ -27,15 +27,11 @@
/**
* Cluster status subscriber.
*/
@RequiredArgsConstructor
public final class ClusterStatusSubscriber implements EventSubscriber {

private final ClusterPersistRepository repository;

public ClusterStatusSubscriber(final ClusterPersistRepository repository, final EventBusContext eventBusContext) {
this.repository = repository;
eventBusContext.register(this);
}

/**
* Update cluster status.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,24 @@
package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.subscriber;

import com.google.common.eventbus.Subscribe;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
import org.apache.shardingsphere.metadata.persist.node.ComputeNode;
import org.apache.shardingsphere.mode.event.node.ComputeNodeStatusChangedEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.LabelsChangedEvent;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;

import java.util.Collections;

/**
* Compute node status subscriber.
*/
@RequiredArgsConstructor
public final class ComputeNodeStatusSubscriber implements EventSubscriber {

private final RegistryCenter registryCenter;

private final ClusterPersistRepository repository;

public ComputeNodeStatusSubscriber(final RegistryCenter registryCenter, final ClusterPersistRepository repository) {
this.registryCenter = registryCenter;
this.repository = repository;
registryCenter.getEventBusContext().register(this);
}

/**
* Update compute node status.
*
Expand All @@ -59,10 +53,6 @@ public void update(final ComputeNodeStatusChangedEvent event) {
*/
@Subscribe
public void update(final LabelsChangedEvent event) {
if (event.getLabels().isEmpty()) {
registryCenter.getComputeNodeStatusService().persistInstanceLabels(event.getInstanceId(), Collections.emptyList());
} else {
registryCenter.getComputeNodeStatusService().persistInstanceLabels(event.getInstanceId(), event.getLabels());
}
registryCenter.getComputeNodeStatusService().persistInstanceLabels(event.getInstanceId(), event.getLabels());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.subscriber;

import com.google.common.eventbus.Subscribe;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.util.eventbus.EventSubscriber;
import org.apache.shardingsphere.mode.event.node.QualifiedDataSourceDeletedEvent;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
Expand All @@ -27,15 +27,11 @@
/**
* Qualified data source status subscriber.
*/
@RequiredArgsConstructor
public final class QualifiedDataSourceStatusSubscriber implements EventSubscriber {

private final ClusterPersistRepository repository;

public QualifiedDataSourceStatusSubscriber(final ClusterPersistRepository repository, final EventBusContext eventBusContext) {
this.repository = repository;
eventBusContext.register(this);
}

/**
* Delete qualified data source.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,13 @@

package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.subscriber;

import org.junit.jupiter.api.Test;
import org.mockito.Mock;
import org.junit.jupiter.api.extension.ExtendWith;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.event.ClusterStatusChangedEvent;
import org.apache.shardingsphere.infra.state.cluster.ClusterState;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.apache.shardingsphere.metadata.persist.node.ComputeNode;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.event.ClusterStatusChangedEvent;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import static org.mockito.Mockito.verify;
Expand All @@ -37,7 +36,7 @@ class ClusterStatusSubscriberTest {

@Test
void assertUpdate() {
ClusterStatusSubscriber clusterStatusSubscriber = new ClusterStatusSubscriber(repository, new EventBusContext());
ClusterStatusSubscriber clusterStatusSubscriber = new ClusterStatusSubscriber(repository);
ClusterStatusChangedEvent event = new ClusterStatusChangedEvent(ClusterState.OK);
clusterStatusSubscriber.update(event);
verify(repository).persist(ComputeNode.getClusterStatusNodePath(), ClusterState.OK.name());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.subscriber;

import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDataSource;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.mode.event.node.QualifiedDataSourceDeletedEvent;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.apache.shardingsphere.mode.storage.node.QualifiedDataSourceNode;
Expand All @@ -35,15 +34,13 @@ class QualifiedDataSourceStatusSubscriberTest {
@Mock
private ClusterPersistRepository repository;

private final EventBusContext eventBusContext = new EventBusContext();

@Test
void assertDeleteStorageNodeDataSourceDataSourceState() {
String databaseName = "replica_query_db";
String groupName = "readwrite_ds";
String dataSourceName = "replica_ds_0";
QualifiedDataSourceDeletedEvent event = new QualifiedDataSourceDeletedEvent(new QualifiedDataSource(databaseName, groupName, dataSourceName));
new QualifiedDataSourceStatusSubscriber(repository, eventBusContext).delete(event);
new QualifiedDataSourceStatusSubscriber(repository).delete(event);
verify(repository).delete(QualifiedDataSourceNode.getQualifiedDataSourceNodePath(new QualifiedDataSource(databaseName, groupName, dataSourceName)));
}
}

0 comments on commit d5e1653

Please sign in to comment.