Skip to content

Commit

Permalink
#2965 Atomic ZNode creation on node registration
Browse files Browse the repository at this point in the history
  • Loading branch information
jacob-netguardians committed Nov 19, 2024
1 parent f37f465 commit 24eda31
Showing 1 changed file with 23 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
* under the License.
*/

import com.google.common.collect.ImmutableMap;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
Expand Down Expand Up @@ -107,9 +106,11 @@
import org.apache.helix.zookeeper.zkclient.NetworkUtil;
import org.apache.helix.zookeeper.zkclient.exception.ZkException;
import org.apache.helix.zookeeper.zkclient.exception.ZkNoNodeException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.OpResult;
import org.apache.zookeeper.ZooDefs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -238,19 +239,32 @@ public void addInstance(String clusterName, InstanceConfig instanceConfig) {

ZKUtil.createChildren(_zkClient, instanceConfigsPath, instanceConfig.getRecord());

_zkClient.createPersistent(PropertyPathBuilder.instanceMessage(clusterName, nodeId), true);
_zkClient.createPersistent(PropertyPathBuilder.instanceCurrentState(clusterName, nodeId), true);
_zkClient
.createPersistent(PropertyPathBuilder.instanceTaskCurrentState(clusterName, nodeId), true);
_zkClient.createPersistent(PropertyPathBuilder.instanceCustomizedState(clusterName, nodeId), true);
_zkClient.createPersistent(PropertyPathBuilder.instanceError(clusterName, nodeId), true);
_zkClient.createPersistent(PropertyPathBuilder.instanceStatusUpdate(clusterName, nodeId), true);
_zkClient.createPersistent(PropertyPathBuilder.instanceHistory(clusterName, nodeId), true);
// if those nodes are not created atomically, then having two nodes registering almost exactly
// at the same time could be problematic, because one node would not able to check for already
// existing instance with matching logical ID (see
// InstanceUtil.findInstancesWithMatchingLogicalId call above, where finding an incomplete
// "INSTANCE" node is a killer)
final List<Op> ops = Arrays.asList(
createPersistentNodeOp(PropertyPathBuilder.instance(clusterName, nodeId)),
createPersistentNodeOp(PropertyPathBuilder.instanceMessage(clusterName, nodeId)),
createPersistentNodeOp(PropertyPathBuilder.instanceCurrentState(clusterName, nodeId)),
createPersistentNodeOp(PropertyPathBuilder.instanceTaskCurrentState(clusterName, nodeId)),
createPersistentNodeOp(PropertyPathBuilder.instanceCustomizedState(clusterName, nodeId)),
createPersistentNodeOp(PropertyPathBuilder.instanceError(clusterName, nodeId)),
createPersistentNodeOp(PropertyPathBuilder.instanceStatusUpdate(clusterName, nodeId)),
createPersistentNodeOp(PropertyPathBuilder.instanceHistory(clusterName, nodeId))
);
_zkClient.multi(ops);

HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseDataAccessor);
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
accessor.setProperty(keyBuilder.participantHistory(nodeId), new ParticipantHistory(nodeId));
}

private static Op createPersistentNodeOp(String path) {
return Op.create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}

@Override
public void dropInstance(String clusterName, InstanceConfig instanceConfig) {
logger.info("Drop instance {} from cluster {}.", instanceConfig.getInstanceName(), clusterName);
Expand Down

0 comments on commit 24eda31

Please sign in to comment.