diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java index d5998a166f..ae914682b4 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java @@ -19,7 +19,6 @@ * under the License. */ -import com.google.common.collect.ImmutableMap; import java.io.DataInputStream; import java.io.File; import java.io.FileInputStream; @@ -274,18 +273,16 @@ public void dropInstance(String clusterName, InstanceConfig instanceConfig) { "Node " + instanceName + " is still alive for cluster " + clusterName + ", can't drop."); } - // delete config path - String instanceConfigsPath = PropertyPathBuilder.instanceConfig(clusterName); - ZKUtil.dropChildren(_zkClient, instanceConfigsPath, instanceConfig.getRecord()); - // delete instance path - dropInstancePathRecursively(instancePath, instanceConfig.getInstanceName()); + dropInstancePathsRecursively(clusterName, instanceName); } - private void dropInstancePathRecursively(String instancePath, String instanceName) { + private void dropInstancePathsRecursively(String clusterName, String instanceName) { + String instanceConfigPath = PropertyPathBuilder.instanceConfig(clusterName, instanceName); + String instancePath = PropertyPathBuilder.instance(clusterName, instanceName); int retryCnt = 0; while (true) { try { - _zkClient.deleteRecursively(instancePath); + _zkClient.deleteRecursivelyAtomic(Arrays.asList(instancePath, instanceConfigPath)); return; } catch (ZkClientException e) { if (retryCnt < 3 && e.getCause() instanceof ZkException && e.getCause() @@ -333,11 +330,7 @@ public void purgeOfflineInstances(String clusterName, long offlineDuration) { private void purgeInstance(String clusterName, String instanceName) { logger.info("Purge instance {} from cluster {}.", instanceName, clusterName); - - String instanceConfigPath = 
PropertyPathBuilder.instanceConfig(clusterName, instanceName); - _zkClient.delete(instanceConfigPath); - String instancePath = PropertyPathBuilder.instance(clusterName, instanceName); - dropInstancePathRecursively(instancePath, instanceName); + dropInstancePathsRecursively(clusterName, instanceName); } @Override diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java index b54be8c045..b60b909a26 100644 --- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java +++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java @@ -237,7 +237,7 @@ public void testZkHelixAdmin() { new ZkException("ZkException: failed to delete " + instancePath, new KeeperException.NotEmptyException( "NotEmptyException: directory" + instancePath + " is not empty")))) - .when(mockZkClient).deleteRecursively(instancePath); + .when(mockZkClient).deleteRecursivelyAtomic(Arrays.asList(instancePath, instanceConfigPath)); HelixAdmin helixAdminMock = new ZKHelixAdmin(mockZkClient); try { @@ -1342,4 +1342,33 @@ public void testEnableDisableClusterPauseMode() { _gSetupTool.deleteCluster(clusterName); } } + + /** + * Verifies that instances with child message nodes can be dropped and no longer appear in the cluster. + */ + @Test + public void testDropInstance() { + System.out.println("Start test :" + TestHelper.getTestMethodName()); + int numInstances = 5; + final String clusterName = getShortClassName(); + HelixAdmin admin = new ZKHelixAdmin(_gZkClient); + admin.addCluster(clusterName, true); + Assert.assertTrue(ZKUtil.isClusterSetup(clusterName, _gZkClient), "Cluster should be setup"); + + // Add instances to cluster + for (int i = 0; i < numInstances; i++) { + admin.addInstance(clusterName, new InstanceConfig("localhost_" + i)); + // Create dummy message nodes + _gZkClient.createPersistent(PropertyPathBuilder.instanceMessage(clusterName, "localhost_" + i, ""+i)); + } + Assert.assertEquals(admin.getInstancesInCluster(clusterName).size(), numInstances, "Instances 
should be added"); + + for (int i = 0; i < numInstances; i++) { + admin.dropInstance(clusterName, new InstanceConfig("localhost_" + i)); + } + Assert.assertTrue(admin.getInstancesInCluster(clusterName).isEmpty(), "Instances should be removed"); + _gSetupTool.deleteCluster(clusterName); + + System.out.println("End test :" + TestHelper.getTestMethodName()); + } } diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java index e24cc6e9cd..e18e868772 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java @@ -252,6 +252,9 @@ String createEphemeralSequential(final String path, final Object data, final Lis void deleteRecursively(String path); + void deleteRecursivelyAtomic(String path); + void deleteRecursivelyAtomic(List<String> paths); + boolean delete(final String path); boolean delete(final String path, final int expectedVersion); diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java index a1a06f48da..e214806293 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java @@ -383,6 +383,20 @@ public void deleteRecursively(String path) { _rawZkClient.deleteRecursively(path); } + @Override + public void deleteRecursivelyAtomic(String path) { + checkIfPathContainsShardingKey(path); + _rawZkClient.deleteRecursivelyAtomic(path); + } + + @Override + public void deleteRecursivelyAtomic(List<String> paths) { + for (String path : paths) { + checkIfPathContainsShardingKey(path); + } + _rawZkClient.deleteRecursivelyAtomic(paths); + } + @Override public boolean delete(String path) { return 
delete(path, -1); diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java index 219ad9703c..7943bc27ad 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java @@ -44,6 +44,7 @@ import org.apache.helix.zookeeper.zkclient.serialize.PathBasedZkSerializer; import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer; import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.Op; import org.apache.zookeeper.OpResult; import org.apache.zookeeper.ZooDefs; @@ -370,6 +371,24 @@ public void deleteRecursively(String path) { getZkClient(path).deleteRecursively(path); } + @Override + public void deleteRecursivelyAtomic(String path) { + getZkClient(path).deleteRecursivelyAtomic(path); + } + + @Override + public void deleteRecursivelyAtomic(List<String> paths) { + // Nothing to delete; returning early also avoids paths.get(0) failing on an empty list. + if (paths.isEmpty()) { + return; + } + // Check if all paths are in the same realm. If not, throw error as we cannot guarantee atomicity across clients. 
+ if (paths.stream().map(this::getZkRealm).distinct().count() > 1) { + throw new IllegalArgumentException("Cannot atomically delete paths across different realms"); + } + getZkClient(paths.get(0)).deleteRecursivelyAtomic(paths); + } + @Override public boolean delete(String path) { return delete(path, -1); diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/SharedZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/SharedZkClient.java index 3676288682..210e82bd6b 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/SharedZkClient.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/SharedZkClient.java @@ -412,6 +412,20 @@ public void deleteRecursively(String path) { _innerSharedZkClient.deleteRecursively(path); } + @Override + public void deleteRecursivelyAtomic(String path) { + checkIfPathContainsShardingKey(path); + _innerSharedZkClient.deleteRecursivelyAtomic(path); + } + + @Override + public void deleteRecursivelyAtomic(List<String> paths) { + for (String path : paths) { + checkIfPathContainsShardingKey(path); + } + _innerSharedZkClient.deleteRecursivelyAtomic(paths); + } + @Override public boolean delete(String path) { return delete(path, -1); diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java index 9d21e7604f..33b8bc185c 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java @@ -20,12 +20,17 @@ */ import java.lang.reflect.Method; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.Date; +import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.OptionalLong; +import java.util.Queue; import 
java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; @@ -1820,6 +1825,86 @@ public void deleteRecursively(String path) throws ZkClientException { } } + /** + * Delete the path as well as all its children. This operation is atomic and will either delete all nodes or none. + * This operation may fail if another agent is concurrently creating or deleting nodes under the path. + * @param path ZK path to delete + * @throws ZkClientException if the multi operation throws or any delete op returns an error result + */ + public void deleteRecursivelyAtomic(String path) { + deleteRecursivelyAtomic(Arrays.asList(path)); + } + + /** + * Delete the paths as well as all their children. This operation is atomic and will either delete all nodes or none. + * This operation may fail if another agent is concurrently creating or deleting nodes under any of the paths. + * @param paths ZK paths to delete + * @throws ZkClientException if the multi operation throws or any delete op returns an error result + */ + public void deleteRecursivelyAtomic(List<String> paths) { + List<Op> ops = new ArrayList<>(); + List<OpResult> opResults; + for (String path : paths) { + ops.addAll(getOpsForRecursiveDelete(path)); + } + + // Return early if no operations to execute + if (ops.isEmpty()) { + return; + } + + try { + opResults = multi(ops); + } catch (Exception e) { + LOG.error("zkclient {}, Failed to delete paths {}, exception {}", _uid, paths, e); + throw new ZkClientException("Failed to delete paths " + paths, e); + } + + // Check if any of the operations failed. 
Create mapping of failed paths to error codes + Map<String, KeeperException.Code> failedPathsMap = new HashMap<>(); + for (int i = 0; i < opResults.size(); i++) { + if (opResults.get(i) instanceof OpResult.ErrorResult) { + failedPathsMap.put(ops.get(i).getPath(), + KeeperException.Code.get(((OpResult.ErrorResult) opResults.get(i)).getErr())); + } + } + + // Log and throw exception if any of the operations failed + if (!failedPathsMap.isEmpty()) { + LOG.error("zkclient {}, Failed to delete paths {}, multi returned with error codes {} for sub-paths {}", + _uid, paths, failedPathsMap.values(), failedPathsMap.keySet()); + throw new ZkClientException("Failed to delete paths " + paths + " with ZK KeeperException error codes: " + + failedPathsMap.values() + " for paths: " + failedPathsMap.keySet()); + } + } + + /** + * Get the list of operations to delete the given root and all its children. Ops will be ordered so that deletion of + * children will come before parent nodes. + * @param root the root node to delete + * @return the list of ZK operations to delete the given root and all its children + */ + private List<Op> getOpsForRecursiveDelete(String root) { + List<Op> ops = new ArrayList<>(); + // Return empty list if the root does not exist + if (!exists(root)) { + return ops; + } + + // Level order traversal of tree, adding deleting operation for each node + // This will produce list of operations ordered from parent to children nodes + Queue<String> nodes = new LinkedList<>(); + nodes.offer(root); + while (!nodes.isEmpty()) { + String node = nodes.poll(); + getChildren(node, false).forEach(child -> nodes.offer(node + "/" + child)); + ops.add(Op.delete(node, -1)); + } + // Reverse the list so that operations are ordered from children to parent nodes + Collections.reverse(ops); + return ops; + } + private void processDataOrChildChange(WatchedEvent event, long notificationTime) { final String path = event.getPath(); final boolean pathExists = event.getType() != EventType.NodeDeleted; diff --git 
a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestRawZkClient.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestRawZkClient.java index e5b589ff8f..0ce0349aba 100644 --- a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestRawZkClient.java +++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestRawZkClient.java @@ -21,6 +21,7 @@ import java.lang.management.ManagementFactory; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Random; import java.util.UUID; @@ -51,6 +52,7 @@ import org.apache.helix.zookeeper.zkclient.ZkServer; import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks; import org.apache.helix.zookeeper.zkclient.exception.ZkException; +import org.apache.helix.zookeeper.zkclient.exception.ZkNoNodeException; import org.apache.helix.zookeeper.zkclient.exception.ZkSessionMismatchedException; import org.apache.helix.zookeeper.zkclient.exception.ZkTimeoutException; import org.apache.helix.zookeeper.zkclient.metric.ZkClientMonitor; @@ -1231,4 +1233,61 @@ public void testInvalidWriteSizeLimitConfig() { } } } + + /** + * Exercises deleteRecursivelyAtomic: duplicate and nested paths are expected to fail, while single-node and multi-path deletes (including a non-existent path) succeed. + */ + @Test + public void testDeleteRecursivelyAtomic() { + System.out.println("Start test: " + TestHelper.getTestMethodName()); + String grandParent = "/testDeleteRecursively"; + String parent = grandParent + "/parent"; + String child1 = parent + "/child1"; + String child2 = parent + "/child2"; + _zkClient.createPersistent(grandParent); + _zkClient.createPersistent(parent); + _zkClient.createPersistent(child1); + _zkClient.createPersistent(child2); + Assert.assertTrue(_zkClient.exists(grandParent)); + Assert.assertFalse(_zkClient.getChildren(parent).isEmpty()); + + // Test calling delete on same path twice + try { + _zkClient.deleteRecursivelyAtomic(Arrays.asList(grandParent, grandParent)); + Assert.fail("Operation should not succeed when attempting to delete same path twice"); + } catch (ZkClientException expected) { 
+ // Caught expected exception + } + + Assert.assertTrue(_zkClient.exists(grandParent)); + Assert.assertFalse(_zkClient.getChildren(parent).isEmpty()); + + // Test calling delete on path that is child of another path in the list + try { + _zkClient.deleteRecursivelyAtomic(Arrays.asList(grandParent, parent)); + Assert.fail("Operation should not succeed when attempting to delete a path that is a child of another path in the list"); + } catch (ZkClientException expected) { + // Caught expected exception + } + + // Test calling delete on single node + Assert.assertTrue(_zkClient.exists(child2)); + _zkClient.deleteRecursivelyAtomic(child2); + Assert.assertFalse(_zkClient.exists(child2)); + + Assert.assertTrue(_zkClient.exists(grandParent)); + Assert.assertFalse(_zkClient.getChildren(parent).isEmpty()); + + // Test successfully delete multiple paths. Also that operation succeeds when attempting to delete non-existent path + String newNode = "/newNode"; + _zkClient.createPersistent(newNode); + Assert.assertTrue(_zkClient.exists(newNode)); + + String nonexistentPath = grandParent + "/nonexistent"; + Assert.assertFalse(_zkClient.exists(nonexistentPath)); + + _zkClient.deleteRecursivelyAtomic(Arrays.asList(grandParent, newNode, nonexistentPath)); + Assert.assertFalse(_zkClient.exists(grandParent)); + Assert.assertFalse(_zkClient.exists(newNode)); + } }