Skip to content

Commit

Permalink
Remove ComputeNodeInstanceContextAware interface for ClusterPersistRe…
Browse files Browse the repository at this point in the history
…pository
  • Loading branch information
menghaoranss committed May 29, 2024
1 parent 0596927 commit e3e1757
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ private void createSubscribers(final EventBusContext eventBusContext, final Clus
private void registerOnline(final EventBusContext eventBusContext, final ComputeNodeInstanceContext computeNodeInstanceContext,
final ClusterPersistRepository repository, final ContextManagerBuilderParameter param, final ContextManager contextManager) {
contextManager.getPersistServiceFacade().getComputeNodePersistService().registerOnline(computeNodeInstanceContext.getInstance());
new GovernanceWatcherFactory(repository,
eventBusContext, param.getInstanceMetaData() instanceof JDBCInstanceMetaData ? param.getDatabaseConfigs().keySet() : Collections.emptyList()).watchListeners();
new GovernanceWatcherFactory(repository, eventBusContext, param.getInstanceMetaData() instanceof JDBCInstanceMetaData ? param.getDatabaseConfigs().keySet() : Collections.emptyList())
.watchListeners(computeNodeInstanceContext);
if (null != param.getLabels()) {
contextManager.getComputeNodeInstanceContext().getInstance().getLabels().addAll(param.getLabels());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@
package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry;

import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext;
import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;

import java.util.Collection;
Expand All @@ -38,11 +39,14 @@ public final class GovernanceWatcherFactory {

/**
* Watch listeners.
*
* @param computeNodeInstanceContext compute node instance context
*/
public void watchListeners() {
public void watchListeners(final ComputeNodeInstanceContext computeNodeInstanceContext) {
for (GovernanceWatcher<?> each : ShardingSphereServiceLoader.getServiceInstances(GovernanceWatcher.class)) {
watch(each);
}
repository.watch(computeNodeInstanceContext);
}

private void watch(final GovernanceWatcher<?> listener) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.shardingsphere.mode.repository.cluster;

import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
import org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder;
import org.apache.shardingsphere.mode.spi.PersistRepository;
Expand Down Expand Up @@ -56,4 +57,12 @@ public interface ClusterPersistRepository extends PersistRepository {
* @param listener data changed event listener
*/
void watch(String key, DataChangedEventListener listener);

/**
* Watch client status.
*
* @param computeNodeInstanceContext compute node instance context
*/
default void watch(ComputeNodeInstanceContext computeNodeInstanceContext) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext;
import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContextAware;
import org.apache.shardingsphere.mode.event.DataChangedEvent;
import org.apache.shardingsphere.mode.event.DataChangedEvent.Type;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
Expand Down Expand Up @@ -59,7 +58,7 @@
/**
* Registry repository of ZooKeeper.
*/
public final class ZookeeperRepository implements ClusterPersistRepository, ComputeNodeInstanceContextAware {
public final class ZookeeperRepository implements ClusterPersistRepository {

private final Map<String, CuratorCache> caches = new ConcurrentHashMap<>();

Expand Down Expand Up @@ -255,6 +254,11 @@ public void watch(final String key, final DataChangedEventListener listener) {
cache.start();
}

@Override
public void watch(final ComputeNodeInstanceContext computeNodeInstanceContext) {
client.getConnectionStateListenable().addListener(new SessionConnectionReconnectListener(computeNodeInstanceContext, this));
}

@Override
public void close() {
caches.values().forEach(CuratorCache::close);
Expand All @@ -274,11 +278,6 @@ private void waitForCacheClose() {
}
}

@Override
public void setComputeNodeInstanceContext(final ComputeNodeInstanceContext computeNodeInstanceContext) {
client.getConnectionStateListenable().addListener(new SessionConnectionReconnectListener(computeNodeInstanceContext, this));
}

@Override
public String getType() {
return "ZooKeeper";
Expand Down

0 comments on commit e3e1757

Please sign in to comment.