Skip to content

Commit

Permalink
Add atomic recursive delete to ZK client and use for drop instance (#…
Browse files Browse the repository at this point in the history
…2994)

Add atomic recursive delete to ZK client and use for drop instance
  • Loading branch information
GrantPSpencer authored Jan 28, 2025
1 parent 3233f7d commit 892fc27
Show file tree
Hide file tree
Showing 8 changed files with 217 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
* under the License.
*/

import com.google.common.collect.ImmutableMap;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
Expand Down Expand Up @@ -274,18 +273,16 @@ public void dropInstance(String clusterName, InstanceConfig instanceConfig) {
"Node " + instanceName + " is still alive for cluster " + clusterName + ", can't drop.");
}

// delete config path
String instanceConfigsPath = PropertyPathBuilder.instanceConfig(clusterName);
ZKUtil.dropChildren(_zkClient, instanceConfigsPath, instanceConfig.getRecord());
// delete instance path
dropInstancePathRecursively(instancePath, instanceConfig.getInstanceName());
dropInstancePathsRecursively(clusterName, instanceName);
}

private void dropInstancePathRecursively(String instancePath, String instanceName) {
private void dropInstancePathsRecursively(String clusterName, String instanceName) {
String instanceConfigPath = PropertyPathBuilder.instanceConfig(clusterName, instanceName);
String instancePath = PropertyPathBuilder.instance(clusterName, instanceName);
int retryCnt = 0;
while (true) {
try {
_zkClient.deleteRecursively(instancePath);
_zkClient.deleteRecursivelyAtomic(Arrays.asList(instancePath, instanceConfigPath));
return;
} catch (ZkClientException e) {
if (retryCnt < 3 && e.getCause() instanceof ZkException && e.getCause()
Expand Down Expand Up @@ -333,11 +330,7 @@ public void purgeOfflineInstances(String clusterName, long offlineDuration) {

/**
 * Purges an instance from a cluster: logs the purge, then removes the instance's config node
 * and its instance path (recursively, including children).
 * NOTE(review): this span is taken from a diff view and appears to show both the removed and
 * the added statements together - confirm which lines are current before relying on it.
 */
private void purgeInstance(String clusterName, String instanceName) {
logger.info("Purge instance {} from cluster {}.", instanceName, clusterName);

String instanceConfigPath = PropertyPathBuilder.instanceConfig(clusterName, instanceName);
_zkClient.delete(instanceConfigPath);
String instancePath = PropertyPathBuilder.instance(clusterName, instanceName);
dropInstancePathRecursively(instancePath, instanceName);
dropInstancePathsRecursively(clusterName, instanceName);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ public void testZkHelixAdmin() {
new ZkException("ZkException: failed to delete " + instancePath,
new KeeperException.NotEmptyException(
"NotEmptyException: directory" + instancePath + " is not empty"))))
.when(mockZkClient).deleteRecursively(instancePath);
.when(mockZkClient).deleteRecursivelyAtomic(Arrays.asList(instancePath, instanceConfigPath));

HelixAdmin helixAdminMock = new ZKHelixAdmin(mockZkClient);
try {
Expand Down Expand Up @@ -1342,4 +1342,29 @@ public void testEnableDisableClusterPauseMode() {
_gSetupTool.deleteCluster(clusterName);
}
}

/**
 * Adds several instances (each with a dummy message node so the instance path is non-empty),
 * drops them all, and verifies the cluster ends up with no instances.
 */
@Test
public void testDropInstance() {
  System.out.println("Start test :" + TestHelper.getTestMethodName());
  int numInstances = 5;
  final String clusterName = getShortClassName();
  HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
  admin.addCluster(clusterName, true);
  try {
    Assert.assertTrue(ZKUtil.isClusterSetup(clusterName, _gZkClient), "Cluster should be setup");

    // Add instances to cluster
    for (int i = 0; i < numInstances; i++) {
      admin.addInstance(clusterName, new InstanceConfig("localhost_" + i));
      // Create dummy message nodes
      _gZkClient.createPersistent(PropertyPathBuilder.instanceMessage(clusterName, "localhost_" + i, "" + i));
    }
    // assertEquals reports the actual size on failure, unlike assertTrue(a == b)
    Assert.assertEquals(admin.getInstancesInCluster(clusterName).size(), numInstances,
        "Instances should be added");

    // Drop exactly the instances that were added (bound by numInstances, not a literal)
    for (int i = 0; i < numInstances; i++) {
      admin.dropInstance(clusterName, new InstanceConfig("localhost_" + i));
    }
    Assert.assertTrue(admin.getInstancesInCluster(clusterName).isEmpty(), "Instances should be removed");
  } finally {
    // Always remove the test cluster so a failed assertion does not leak state into other tests
    _gSetupTool.deleteCluster(clusterName);
  }

  System.out.println("End test :" + TestHelper.getTestMethodName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,9 @@ String createEphemeralSequential(final String path, final Object data, final Lis

void deleteRecursively(String path);

/**
 * Deletes the given path together with all of its children as one atomic operation.
 * @param path ZK path to delete
 */
void deleteRecursivelyAtomic(String path);
/**
 * Deletes all of the given paths together with their children as one atomic operation.
 * NOTE(review): at least one implementation rejects path lists spanning multiple realms with an
 * IllegalArgumentException - confirm whether that should be part of this contract.
 * @param paths ZK paths to delete
 */
void deleteRecursivelyAtomic(List<String> paths);

boolean delete(final String path);

boolean delete(final String path, final int expectedVersion);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,20 @@ public void deleteRecursively(String path) {
_rawZkClient.deleteRecursively(path);
}

/**
 * Passes the path through {@code checkIfPathContainsShardingKey} and then delegates the atomic
 * recursive delete to the wrapped raw ZkClient.
 * NOTE(review): the check presumably rejects paths outside this client's sharding key - confirm
 * against its definition.
 * @param path ZK path to delete
 */
@Override
public void deleteRecursivelyAtomic(String path) {
checkIfPathContainsShardingKey(path);
_rawZkClient.deleteRecursivelyAtomic(path);
}

/**
 * Runs {@code checkIfPathContainsShardingKey} on every path, in list order, and then hands the
 * whole list to the wrapped raw ZkClient for an atomic recursive delete.
 * @param paths ZK paths to delete
 */
@Override
public void deleteRecursivelyAtomic(List<String> paths) {
  // Validate every path up front; the first failing check aborts before anything is deleted
  paths.forEach(this::checkIfPathContainsShardingKey);
  _rawZkClient.deleteRecursivelyAtomic(paths);
}

@Override
public boolean delete(String path) {
return delete(path, -1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.helix.zookeeper.zkclient.serialize.PathBasedZkSerializer;
import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.OpResult;
import org.apache.zookeeper.ZooDefs;
Expand Down Expand Up @@ -370,6 +371,20 @@ public void deleteRecursively(String path) {
getZkClient(path).deleteRecursively(path);
}

/**
 * Looks up the client responsible for the path via {@code getZkClient} and delegates the atomic
 * recursive delete to it.
 * @param path ZK path to delete
 */
@Override
public void deleteRecursivelyAtomic(String path) {
getZkClient(path).deleteRecursivelyAtomic(path);
}

/**
 * Atomically deletes the given paths (and their children) through a single underlying client.
 * All paths must map to the same realm, because atomicity cannot be guaranteed across clients.
 * @param paths ZK paths to delete; an empty list is a no-op
 * @throws IllegalArgumentException if the paths map to more than one realm
 */
@Override
public void deleteRecursivelyAtomic(List<String> paths) {
  // Nothing to delete. Without this guard an empty list passes the realm check below and then
  // fails on paths.get(0) with an IndexOutOfBoundsException.
  if (paths.isEmpty()) {
    return;
  }
  // Check if all paths are in the same realm. If not, throw error as we cannot guarantee atomicity across clients.
  if (paths.stream().map(this::getZkRealm).distinct().count() > 1) {
    throw new IllegalArgumentException("Cannot atomically delete paths across different realms");
  }
  // All paths share one realm, so the client for the first path serves them all
  getZkClient(paths.get(0)).deleteRecursivelyAtomic(paths);
}

@Override
public boolean delete(String path) {
return delete(path, -1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,20 @@ public void deleteRecursively(String path) {
_innerSharedZkClient.deleteRecursively(path);
}

/**
 * Passes the path through {@code checkIfPathContainsShardingKey} and then delegates the atomic
 * recursive delete to the inner shared ZkClient.
 * NOTE(review): the check presumably rejects paths outside this client's sharding key - confirm
 * against its definition.
 * @param path ZK path to delete
 */
@Override
public void deleteRecursivelyAtomic(String path) {
checkIfPathContainsShardingKey(path);
_innerSharedZkClient.deleteRecursivelyAtomic(path);
}

/**
 * Runs {@code checkIfPathContainsShardingKey} on every path, in list order, and then hands the
 * whole list to the inner shared ZkClient for an atomic recursive delete.
 * @param paths ZK paths to delete
 */
@Override
public void deleteRecursivelyAtomic(List<String> paths) {
  // Validate every path up front; the first failing check aborts before anything is deleted
  paths.forEach(this::checkIfPathContainsShardingKey);
  _innerSharedZkClient.deleteRecursivelyAtomic(paths);
}

@Override
public boolean delete(String path) {
return delete(path, -1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,17 @@
*/

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.OptionalLong;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -1820,6 +1825,84 @@ public void deleteRecursively(String path) throws ZkClientException {
}
}

/**
 * Atomically removes a single path and its entire subtree: either every node is deleted or none is.
 * Convenience overload that wraps the path in a one-element list and delegates to
 * {@link #deleteRecursivelyAtomic(List)}. The call may fail if another agent concurrently creates
 * or deletes nodes under the path.
 * @param path ZK path to delete
 */
public void deleteRecursivelyAtomic(String path) {
  List<String> singlePath = Collections.singletonList(path);
  deleteRecursivelyAtomic(singlePath);
}

/**
 * Delete the paths as well as all their children. This operation is atomic and will either delete all nodes or none.
 * This operation may fail if another agent is concurrently creating or deleting nodes under any of the paths.
 * Paths that do not exist contribute no operations; if nothing exists at all, the call is a no-op.
 * @param paths ZK paths to delete
 * @throws ZkClientException if the multi call throws, or if any individual delete operation
 *         reports an error result
 */
public void deleteRecursivelyAtomic(List<String> paths) {
  // Gather the delete ops of every subtree (children before parents) into one multi request
  List<Op> ops = new ArrayList<>();
  for (String path : paths) {
    ops.addAll(getOpsForRecursiveDelete(path));
  }

  // Return early if no operations to execute
  if (ops.isEmpty()) {
    return;
  }

  List<OpResult> opResults;
  try {
    opResults = multi(ops);
  } catch (Exception e) {
    LOG.error("zkclient {}, Failed to delete paths {}, exception {}", _uid, paths, e);
    throw new ZkClientException("Failed to delete paths " + paths, e);
  }

  // Check if any of the operations failed. Create mapping of failed paths to error codes.
  // opResults is index-aligned with ops, so ops.get(i) gives the path of the i-th result.
  Map<String, KeeperException.Code> failedPathsMap = new HashMap<>();
  for (int i = 0; i < opResults.size(); i++) {
    if (opResults.get(i) instanceof OpResult.ErrorResult) {
      failedPathsMap.put(ops.get(i).getPath(),
          KeeperException.Code.get(((OpResult.ErrorResult) opResults.get(i)).getErr()));
    }
  }

  // Log and throw exception if any of the operations failed.
  // values() holds the error codes and keySet() the failed sub-paths; pass them in the order
  // the message placeholders name them.
  if (!failedPathsMap.isEmpty()) {
    LOG.error("zkclient {}, Failed to delete paths {}, multi returned with error codes {} for sub-paths {}",
        _uid, paths, failedPathsMap.values(), failedPathsMap.keySet());
    throw new ZkClientException("Failed to delete paths " + paths + " with ZK KeeperException error codes: "
        + failedPathsMap.values() + " for paths: " + failedPathsMap.keySet());
  }
}

/**
 * Builds the delete operations needed to remove {@code root} and its whole subtree.
 * The returned list is ordered so that every node's delete comes after the deletes of all of its
 * descendants, which is the order a multi request needs to remove the tree.
 * @param root the root node to delete
 * @return delete operations for the subtree, children first; empty if the root does not exist
 */
private List<Op> getOpsForRecursiveDelete(String root) {
  // A missing root contributes no operations
  if (!exists(root)) {
    return new ArrayList<>();
  }

  // Breadth-first walk: the list grows while it is scanned by index, so each node is visited
  // after its parent and its children are appended behind everything already discovered.
  List<String> bfsOrder = new ArrayList<>();
  bfsOrder.add(root);
  for (int i = 0; i < bfsOrder.size(); i++) {
    String current = bfsOrder.get(i);
    for (String child : getChildren(current, false)) {
      bfsOrder.add(current + "/" + child);
    }
  }

  // Walk the breadth-first order backwards so children are deleted before their parents
  List<Op> deleteOps = new ArrayList<>(bfsOrder.size());
  for (int i = bfsOrder.size() - 1; i >= 0; i--) {
    deleteOps.add(Op.delete(bfsOrder.get(i), -1));
  }
  return deleteOps;
}

private void processDataOrChildChange(WatchedEvent event, long notificationTime) {
final String path = event.getPath();
final boolean pathExists = event.getType() != EventType.NodeDeleted;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.UUID;
Expand Down Expand Up @@ -51,6 +52,7 @@
import org.apache.helix.zookeeper.zkclient.ZkServer;
import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks;
import org.apache.helix.zookeeper.zkclient.exception.ZkException;
import org.apache.helix.zookeeper.zkclient.exception.ZkNoNodeException;
import org.apache.helix.zookeeper.zkclient.exception.ZkSessionMismatchedException;
import org.apache.helix.zookeeper.zkclient.exception.ZkTimeoutException;
import org.apache.helix.zookeeper.zkclient.metric.ZkClientMonitor;
Expand Down Expand Up @@ -1231,4 +1233,58 @@ public void testInvalidWriteSizeLimitConfig() {
}
}
}

/**
 * Exercises deleteRecursivelyAtomic: duplicate and nested paths must fail without deleting
 * anything, a single leaf can be deleted, and a multi-path delete succeeds even when one of the
 * paths does not exist.
 */
@Test
public void testDeleteRecursivelyAtomic() {
  System.out.println("Start test: " + TestHelper.getTestMethodName());
  String grandParent = "/testDeleteRecursively";
  String parent = grandParent + "/parent";
  String child1 = parent + "/child1";
  String child2 = parent + "/child2";
  _zkClient.createPersistent(grandParent);
  _zkClient.createPersistent(parent);
  _zkClient.createPersistent(child1);
  _zkClient.createPersistent(child2);
  Assert.assertTrue(_zkClient.exists(grandParent));
  Assert.assertFalse(_zkClient.getChildren(parent).isEmpty());

  // Test calling delete on same path twice
  try {
    _zkClient.deleteRecursivelyAtomic(Arrays.asList(grandParent, grandParent));
    Assert.fail("Operation should not succeed when attempting to delete same path twice");
  } catch (ZkClientException expected) {
    // Caught expected exception
  }

  // Atomicity: the failed request must not have removed anything
  Assert.assertTrue(_zkClient.exists(grandParent));
  Assert.assertFalse(_zkClient.getChildren(parent).isEmpty());

  // Test calling delete on path that is child of another path in the list
  try {
    _zkClient.deleteRecursivelyAtomic(Arrays.asList(grandParent, parent));
    Assert.fail("Operation should not succeed when one path is a child of another path in the list");
  } catch (ZkClientException expected) {
    // Caught expected exception
  }

  // Atomicity: the failed request must not have removed anything
  Assert.assertTrue(_zkClient.exists(grandParent));
  Assert.assertFalse(_zkClient.getChildren(parent).isEmpty());

  // Test calling delete on single node
  Assert.assertTrue(_zkClient.exists(child2));
  _zkClient.deleteRecursivelyAtomic(child2);
  Assert.assertFalse(_zkClient.exists(child2));

  Assert.assertTrue(_zkClient.exists(grandParent));
  Assert.assertFalse(_zkClient.getChildren(parent).isEmpty());

  // Test successfully delete multiple paths. Also that operation succeeds when attempting to delete non-existent path
  String newNode = "/newNode";
  _zkClient.createPersistent(newNode);
  Assert.assertTrue(_zkClient.exists(newNode));

  String nonexistentPath = grandParent + "/nonexistent";
  Assert.assertFalse(_zkClient.exists(nonexistentPath));

  _zkClient.deleteRecursivelyAtomic(Arrays.asList(grandParent, newNode, nonexistentPath));
  Assert.assertFalse(_zkClient.exists(grandParent));
  Assert.assertFalse(_zkClient.exists(newNode));
  System.out.println("End test: " + TestHelper.getTestMethodName());
}
}

0 comments on commit 892fc27

Please sign in to comment.