Skip to content

Commit

Permalink
Refactor ZookeeperRepository remove NodePathTransactionAware (#30799)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhaojinchao95 authored Apr 7, 2024
1 parent 9e68371 commit 9f98b43
Show file tree
Hide file tree
Showing 4 changed files with 1 addition and 169 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,9 @@
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.metadata.version.MetaDataVersion;
import org.apache.shardingsphere.metadata.persist.node.DatabaseMetaDataNode;
import org.apache.shardingsphere.mode.identifier.NodePathTransactionAware;
import org.apache.shardingsphere.mode.identifier.NodePathTransactionOperation;
import org.apache.shardingsphere.mode.spi.PersistRepository;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/**
* Meta data version persist service.
Expand All @@ -42,33 +38,6 @@ public final class MetaDataVersionPersistService implements MetaDataVersionBased

@Override
public void switchActiveVersion(final Collection<MetaDataVersion> metaDataVersions) {
if (repository instanceof NodePathTransactionAware) {
switchActiveVersionWithTransaction(metaDataVersions);
} else {
switchActiveVersionWithoutTransaction(metaDataVersions);
}
}

private void switchActiveVersionWithTransaction(final Collection<MetaDataVersion> metaDataVersions) {
List<NodePathTransactionOperation> nodePathTransactionOperations = buildNodePathTransactionOperations(metaDataVersions);
if (!nodePathTransactionOperations.isEmpty()) {
((NodePathTransactionAware) repository).executeInTransaction(nodePathTransactionOperations);
}
}

private List<NodePathTransactionOperation> buildNodePathTransactionOperations(final Collection<MetaDataVersion> metaDataVersions) {
List<NodePathTransactionOperation> result = new ArrayList<>();
for (MetaDataVersion each : metaDataVersions) {
if (each.getNextActiveVersion().equals(each.getCurrentActiveVersion())) {
continue;
}
result.add(NodePathTransactionOperation.update(each.getKey() + ACTIVE_VERSION, each.getNextActiveVersion()));
result.add(NodePathTransactionOperation.delete(each.getKey() + VERSIONS + each.getCurrentActiveVersion()));
}
return result;
}

private void switchActiveVersionWithoutTransaction(final Collection<MetaDataVersion> metaDataVersions) {
for (MetaDataVersion each : metaDataVersions) {
if (each.getNextActiveVersion().equals(each.getCurrentActiveVersion())) {
continue;
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,20 @@
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.CuratorFrameworkFactory.Builder;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.api.transaction.CuratorOp;
import org.apache.curator.framework.api.transaction.TransactionOp;
import org.apache.curator.framework.recipes.cache.CuratorCache;
import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
import org.apache.shardingsphere.infra.instance.InstanceContext;
import org.apache.shardingsphere.infra.instance.InstanceContextAware;
import org.apache.shardingsphere.mode.identifier.NodePathTransactionAware;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
import org.apache.shardingsphere.mode.repository.cluster.exception.ClusterPersistRepositoryException;
import org.apache.shardingsphere.mode.event.DataChangedEvent;
import org.apache.shardingsphere.mode.event.DataChangedEvent.Type;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
import org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder;
import org.apache.shardingsphere.mode.identifier.NodePathTransactionOperation;
import org.apache.shardingsphere.mode.repository.cluster.zookeeper.handler.ZookeeperExceptionHandler;
import org.apache.shardingsphere.mode.repository.cluster.zookeeper.listener.SessionConnectionListener;
import org.apache.shardingsphere.mode.repository.cluster.zookeeper.props.ZookeeperProperties;
Expand All @@ -56,14 +52,13 @@
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.ArrayList;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

/**
* Registry repository of ZooKeeper.
*/
public final class ZookeeperRepository implements ClusterPersistRepository, InstanceContextAware, NodePathTransactionAware {
public final class ZookeeperRepository implements ClusterPersistRepository, InstanceContextAware {

private final Map<String, CuratorCache> caches = new ConcurrentHashMap<>();

Expand Down Expand Up @@ -293,39 +288,6 @@ public void setInstanceContext(final InstanceContext instanceContext) {
client.getConnectionStateListenable().addListener(new SessionConnectionListener(instanceContext, this));
}

@Override
public void executeInTransaction(final List<NodePathTransactionOperation> nodePathTransactionOperations) {
try {
client.transaction().forOperations(buildCuratorOps(nodePathTransactionOperations));
// CHECKSTYLE:OFF
} catch (final Exception ex) {
// CHECKSTYLE:ON
ZookeeperExceptionHandler.handleException(ex);
}
}

private List<CuratorOp> buildCuratorOps(final List<NodePathTransactionOperation> nodePathTransactionOperations) throws Exception {
List<CuratorOp> result = new ArrayList<>(nodePathTransactionOperations.size());
TransactionOp transactionOp = client.transactionOp();
for (NodePathTransactionOperation each : nodePathTransactionOperations) {
result.add(buildCuratorOp(each, transactionOp));
}
return result;
}

private CuratorOp buildCuratorOp(final NodePathTransactionOperation each, final TransactionOp transactionOp) throws Exception {
switch (each.getType()) {
case ADD:
return transactionOp.create().forPath(each.getKey(), each.getValue().getBytes(StandardCharsets.UTF_8));
case UPDATE:
return transactionOp.setData().forPath(each.getKey(), each.getValue().getBytes(StandardCharsets.UTF_8));
case DELETE:
return transactionOp.delete().forPath(each.getKey());
default:
throw new UnsupportedOperationException(each.toString());
}
}

@Override
public String getType() {
return "ZooKeeper";
Expand Down

0 comments on commit 9f98b43

Please sign in to comment.