diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java index b091b70ca8..0dbb6863fa 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java @@ -19,7 +19,6 @@ * under the License. */ -import com.google.common.collect.ImmutableMap; import java.io.DataInputStream; import java.io.File; import java.io.FileInputStream; @@ -107,9 +106,11 @@ import org.apache.helix.zookeeper.zkclient.NetworkUtil; import org.apache.helix.zookeeper.zkclient.exception.ZkException; import org.apache.helix.zookeeper.zkclient.exception.ZkNoNodeException; +import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.Op; import org.apache.zookeeper.OpResult; +import org.apache.zookeeper.ZooDefs; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -238,19 +239,32 @@ public void addInstance(String clusterName, InstanceConfig instanceConfig) { ZKUtil.createChildren(_zkClient, instanceConfigsPath, instanceConfig.getRecord()); - _zkClient.createPersistent(PropertyPathBuilder.instanceMessage(clusterName, nodeId), true); - _zkClient.createPersistent(PropertyPathBuilder.instanceCurrentState(clusterName, nodeId), true); - _zkClient - .createPersistent(PropertyPathBuilder.instanceTaskCurrentState(clusterName, nodeId), true); - _zkClient.createPersistent(PropertyPathBuilder.instanceCustomizedState(clusterName, nodeId), true); - _zkClient.createPersistent(PropertyPathBuilder.instanceError(clusterName, nodeId), true); - _zkClient.createPersistent(PropertyPathBuilder.instanceStatusUpdate(clusterName, nodeId), true); - _zkClient.createPersistent(PropertyPathBuilder.instanceHistory(clusterName, nodeId), true); + // if those nodes are not created atomically, then having two nodes registering almost exactly + // at the same time could be problematic, 
because one node would not be able to check for an already + // existing instance with matching logical ID (see + // InstanceUtil.findInstancesWithMatchingLogicalId call above, where finding an incomplete + // "INSTANCE" node is a killer) + final List ops = Arrays.asList( + createPersistentNodeOp(PropertyPathBuilder.instance(clusterName, nodeId)), + createPersistentNodeOp(PropertyPathBuilder.instanceMessage(clusterName, nodeId)), + createPersistentNodeOp(PropertyPathBuilder.instanceCurrentState(clusterName, nodeId)), + createPersistentNodeOp(PropertyPathBuilder.instanceTaskCurrentState(clusterName, nodeId)), + createPersistentNodeOp(PropertyPathBuilder.instanceCustomizedState(clusterName, nodeId)), + createPersistentNodeOp(PropertyPathBuilder.instanceError(clusterName, nodeId)), + createPersistentNodeOp(PropertyPathBuilder.instanceStatusUpdate(clusterName, nodeId)), + createPersistentNodeOp(PropertyPathBuilder.instanceHistory(clusterName, nodeId)) + ); + _zkClient.multi(ops); + HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseDataAccessor); PropertyKey.Builder keyBuilder = accessor.keyBuilder(); accessor.setProperty(keyBuilder.participantHistory(nodeId), new ParticipantHistory(nodeId)); } + private static Op createPersistentNodeOp(String path) { + return Op.create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } + @Override public void dropInstance(String clusterName, InstanceConfig instanceConfig) { logger.info("Drop instance {} from cluster {}.", instanceConfig.getInstanceName(), clusterName);