Skip to content

Commit

Permalink
Revert "Remove ComputeNodeInstanceContextAware interface for ClusterP…
Browse files Browse the repository at this point in the history
…ersistRepository (#31438)" (#31445)

This reverts commit 2995202.
  • Loading branch information
menghaoranss authored May 30, 2024
1 parent 7c549bc commit 9994aab
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ private void createSubscribers(final EventBusContext eventBusContext, final Clus
private void registerOnline(final EventBusContext eventBusContext, final ComputeNodeInstanceContext computeNodeInstanceContext,
final ClusterPersistRepository repository, final ContextManagerBuilderParameter param, final ContextManager contextManager) {
contextManager.getPersistServiceFacade().getComputeNodePersistService().registerOnline(computeNodeInstanceContext.getInstance());
new GovernanceWatcherFactory(repository, eventBusContext, param.getInstanceMetaData() instanceof JDBCInstanceMetaData ? param.getDatabaseConfigs().keySet() : Collections.emptyList())
.watchListeners(computeNodeInstanceContext);
new GovernanceWatcherFactory(repository,
eventBusContext, param.getInstanceMetaData() instanceof JDBCInstanceMetaData ? param.getDatabaseConfigs().keySet() : Collections.emptyList()).watchListeners();
if (null != param.getLabels()) {
contextManager.getComputeNodeInstanceContext().getInstance().getLabels().addAll(param.getLabels());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@
package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry;

import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext;
import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;

import java.util.Collection;
Expand All @@ -39,14 +38,11 @@ public final class GovernanceWatcherFactory {

/**
* Watch listeners.
*
* @param computeNodeInstanceContext compute node instance context
*/
public void watchListeners(final ComputeNodeInstanceContext computeNodeInstanceContext) {
public void watchListeners() {
for (GovernanceWatcher<?> each : ShardingSphereServiceLoader.getServiceInstances(GovernanceWatcher.class)) {
watch(each);
}
repository.watch(computeNodeInstanceContext);
}

private void watch(final GovernanceWatcher<?> listener) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.shardingsphere.mode.repository.cluster;

import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
import org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder;
import org.apache.shardingsphere.mode.spi.PersistRepository;
Expand Down Expand Up @@ -57,12 +56,4 @@ public interface ClusterPersistRepository extends PersistRepository {
* @param listener data changed event listener
*/
void watch(String key, DataChangedEventListener listener);

/**
* Watch client status.
*
* @param computeNodeInstanceContext compute node instance context
*/
default void watch(ComputeNodeInstanceContext computeNodeInstanceContext) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext;
import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContextAware;
import org.apache.shardingsphere.mode.event.DataChangedEvent;
import org.apache.shardingsphere.mode.event.DataChangedEvent.Type;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
Expand Down Expand Up @@ -58,7 +59,7 @@
/**
* Registry repository of ZooKeeper.
*/
public final class ZookeeperRepository implements ClusterPersistRepository {
public final class ZookeeperRepository implements ClusterPersistRepository, ComputeNodeInstanceContextAware {

private final Map<String, CuratorCache> caches = new ConcurrentHashMap<>();

Expand Down Expand Up @@ -254,11 +255,6 @@ public void watch(final String key, final DataChangedEventListener listener) {
cache.start();
}

@Override
public void watch(final ComputeNodeInstanceContext computeNodeInstanceContext) {
client.getConnectionStateListenable().addListener(new SessionConnectionReconnectListener(computeNodeInstanceContext, this));
}

@Override
public void close() {
caches.values().forEach(CuratorCache::close);
Expand All @@ -278,6 +274,11 @@ private void waitForCacheClose() {
}
}

@Override
public void setComputeNodeInstanceContext(final ComputeNodeInstanceContext computeNodeInstanceContext) {
client.getConnectionStateListenable().addListener(new SessionConnectionReconnectListener(computeNodeInstanceContext, this));
}

@Override
public String getType() {
return "ZooKeeper";
Expand Down

0 comments on commit 9994aab

Please sign in to comment.