diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java index f6409ee689d7a..93bc0a20e198c 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java @@ -101,8 +101,8 @@ private void createSubscribers(final EventBusContext eventBusContext, final Clus private void registerOnline(final EventBusContext eventBusContext, final ComputeNodeInstanceContext computeNodeInstanceContext, final ClusterPersistRepository repository, final ContextManagerBuilderParameter param, final ContextManager contextManager) { contextManager.getPersistServiceFacade().getComputeNodePersistService().registerOnline(computeNodeInstanceContext.getInstance()); - new GovernanceWatcherFactory(repository, - eventBusContext, param.getInstanceMetaData() instanceof JDBCInstanceMetaData ? param.getDatabaseConfigs().keySet() : Collections.emptyList()).watchListeners(); + new GovernanceWatcherFactory(repository, eventBusContext, param.getInstanceMetaData() instanceof JDBCInstanceMetaData ? param.getDatabaseConfigs().keySet() : Collections.emptyList()) + .watchListeners(computeNodeInstanceContext); if (null != param.getLabels()) { contextManager.getComputeNodeInstanceContext().getInstance().getLabels().addAll(param.getLabels()); } diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/GovernanceWatcherFactory.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/GovernanceWatcherFactory.java index e12299b7afd54..e37c2c6d34d1a 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/GovernanceWatcherFactory.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/GovernanceWatcherFactory.java @@ -18,8 +18,9 @@ package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry; import lombok.RequiredArgsConstructor; -import org.apache.shardingsphere.infra.util.eventbus.EventBusContext; +import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext; import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader; +import org.apache.shardingsphere.infra.util.eventbus.EventBusContext; import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository; import java.util.Collection; @@ -38,11 +39,14 @@ public final class GovernanceWatcherFactory { /** * Watch listeners. + * + * @param computeNodeInstanceContext compute node instance context */ - public void watchListeners() { + public void watchListeners(final ComputeNodeInstanceContext computeNodeInstanceContext) { for (GovernanceWatcher each : ShardingSphereServiceLoader.getServiceInstances(GovernanceWatcher.class)) { watch(each); } + repository.watch(computeNodeInstanceContext); } private void watch(final GovernanceWatcher listener) { diff --git a/mode/type/cluster/repository/api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepository.java b/mode/type/cluster/repository/api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepository.java index b240ecea27b38..0879f6d1b9783 100644 --- a/mode/type/cluster/repository/api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepository.java +++ b/mode/type/cluster/repository/api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepository.java @@ -17,6 +17,7 @@ package org.apache.shardingsphere.mode.repository.cluster; +import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext; import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener; import org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder; import org.apache.shardingsphere.mode.spi.PersistRepository; @@ -56,4 +57,12 @@ public interface ClusterPersistRepository extends PersistRepository { * @param listener data changed event listener */ void watch(String key, DataChangedEventListener listener); + + /** + * Watch client status. + * + * @param computeNodeInstanceContext compute node instance context + */ + default void watch(ComputeNodeInstanceContext computeNodeInstanceContext) { + } } diff --git a/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java b/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java index 76e20a97f79f4..84577037a6852 100644 --- a/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java +++ b/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java @@ -28,7 +28,6 @@ import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.utils.CloseableUtils; import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext; -import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContextAware; import org.apache.shardingsphere.mode.event.DataChangedEvent; import org.apache.shardingsphere.mode.event.DataChangedEvent.Type; import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository; @@ -59,7 +58,7 @@ /** * Registry repository of ZooKeeper. */ -public final class ZookeeperRepository implements ClusterPersistRepository, ComputeNodeInstanceContextAware { +public final class ZookeeperRepository implements ClusterPersistRepository { private final Map caches = new ConcurrentHashMap<>(); @@ -255,6 +254,11 @@ public void watch(final String key, final DataChangedEventListener listener) { cache.start(); } + @Override + public void watch(final ComputeNodeInstanceContext computeNodeInstanceContext) { + client.getConnectionStateListenable().addListener(new SessionConnectionReconnectListener(computeNodeInstanceContext, this)); + } + @Override public void close() { caches.values().forEach(CuratorCache::close); @@ -274,11 +278,6 @@ private void waitForCacheClose() { } } - @Override - public void setComputeNodeInstanceContext(final ComputeNodeInstanceContext computeNodeInstanceContext) { - client.getConnectionStateListenable().addListener(new SessionConnectionReconnectListener(computeNodeInstanceContext, this)); - } - @Override public String getType() { return "ZooKeeper";