From 24eda3169650d9f26fbb71b30b9752b8600aabde Mon Sep 17 00:00:00 2001 From: Jacob Deker Date: Mon, 18 Nov 2024 15:37:13 +0100 Subject: [PATCH 1/4] #2965 Atomic ZNode creation on node registration --- .../apache/helix/manager/zk/ZKHelixAdmin.java | 32 +++++++++++++------ 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java index b091b70ca8..0dbb6863fa 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java @@ -19,7 +19,6 @@ * under the License. */ -import com.google.common.collect.ImmutableMap; import java.io.DataInputStream; import java.io.File; import java.io.FileInputStream; @@ -107,9 +106,11 @@ import org.apache.helix.zookeeper.zkclient.NetworkUtil; import org.apache.helix.zookeeper.zkclient.exception.ZkException; import org.apache.helix.zookeeper.zkclient.exception.ZkNoNodeException; +import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.Op; import org.apache.zookeeper.OpResult; +import org.apache.zookeeper.ZooDefs; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -238,19 +239,32 @@ public void addInstance(String clusterName, InstanceConfig instanceConfig) { ZKUtil.createChildren(_zkClient, instanceConfigsPath, instanceConfig.getRecord()); - _zkClient.createPersistent(PropertyPathBuilder.instanceMessage(clusterName, nodeId), true); - _zkClient.createPersistent(PropertyPathBuilder.instanceCurrentState(clusterName, nodeId), true); - _zkClient - .createPersistent(PropertyPathBuilder.instanceTaskCurrentState(clusterName, nodeId), true); - _zkClient.createPersistent(PropertyPathBuilder.instanceCustomizedState(clusterName, nodeId), true); - _zkClient.createPersistent(PropertyPathBuilder.instanceError(clusterName, nodeId), true); - _zkClient.createPersistent(PropertyPathBuilder.instanceStatusUpdate(clusterName, nodeId), true); - _zkClient.createPersistent(PropertyPathBuilder.instanceHistory(clusterName, nodeId), true); + // if those nodes are not created atomically, then having two nodes registering almost exactly + // at the same time could be problematic, because one node would not able to check for already + // existing instance with matching logical ID (see + // InstanceUtil.findInstancesWithMatchingLogicalId call above, where finding an incomplete + // "INSTANCE" node is a killer) + final List ops = Arrays.asList( + createPersistentNodeOp(PropertyPathBuilder.instance(clusterName, nodeId)), + createPersistentNodeOp(PropertyPathBuilder.instanceMessage(clusterName, nodeId)), + createPersistentNodeOp(PropertyPathBuilder.instanceCurrentState(clusterName, nodeId)), + createPersistentNodeOp(PropertyPathBuilder.instanceTaskCurrentState(clusterName, nodeId)), + createPersistentNodeOp(PropertyPathBuilder.instanceCustomizedState(clusterName, nodeId)), + createPersistentNodeOp(PropertyPathBuilder.instanceError(clusterName, nodeId)), + createPersistentNodeOp(PropertyPathBuilder.instanceStatusUpdate(clusterName, nodeId)), + createPersistentNodeOp(PropertyPathBuilder.instanceHistory(clusterName, nodeId)) + ); + _zkClient.multi(ops); + HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseDataAccessor); PropertyKey.Builder keyBuilder = accessor.keyBuilder(); accessor.setProperty(keyBuilder.participantHistory(nodeId), new ParticipantHistory(nodeId)); } + private static Op createPersistentNodeOp(String path) { + return Op.create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } + @Override public void dropInstance(String clusterName, InstanceConfig instanceConfig) { logger.info("Drop instance {} from cluster {}.", instanceConfig.getInstanceName(), clusterName); From 43ba0b0df526421189d5c41fc3cee6f6bb7091aa Mon Sep 17 00:00:00 2001 From: Jacob Deker Date: Mon, 9 Dec 2024 16:04:12 +0100 Subject: [PATCH 2/4] #2965 Atomic ZNode creation on node registration Reformatted changed file according to the "helix format" --- .../apache/helix/manager/zk/ZKHelixAdmin.java | 261 +++++++++--------- 1 file changed, 125 insertions(+), 136 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java index 0dbb6863fa..9d95237bad 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java @@ -114,7 +114,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; - public class ZKHelixAdmin implements HelixAdmin { private static final Logger LOG = LoggerFactory.getLogger(ZKHelixAdmin.class); @@ -165,9 +164,8 @@ public ZKHelixAdmin(RealmAwareZkClient zkClient) { public ZKHelixAdmin(String zkAddress) { int timeOutInSec = Integer.parseInt(System.getProperty(CONNECTION_TIMEOUT, "30")); RealmAwareZkClient.RealmAwareZkClientConfig clientConfig = - new RealmAwareZkClient.RealmAwareZkClientConfig() - .setConnectInitTimeout(timeOutInSec * 1000L) - .setZkSerializer(new ZNRecordSerializer()); + new RealmAwareZkClient.RealmAwareZkClientConfig().setConnectInitTimeout( + timeOutInSec * 1000L).setZkSerializer(new ZNRecordSerializer()); RealmAwareZkClient zkClient; @@ -233,8 +231,7 @@ public void addInstance(String clusterName, InstanceConfig instanceConfig) { logger.error("Failed to add instance " + instanceConfig.getInstanceName() + " to cluster " + clusterName + " with instance operation " + attemptedInstanceOperation + ". Setting INSTANCE_OPERATION to " + instanceConfig.getInstanceOperation() - .getOperation() - + " instead.", e); + .getOperation() + " instead.", e); } ZKUtil.createChildren(_zkClient, instanceConfigsPath, instanceConfig.getRecord()); @@ -244,16 +241,17 @@ public void addInstance(String clusterName, InstanceConfig instanceConfig) { // existing instance with matching logical ID (see // InstanceUtil.findInstancesWithMatchingLogicalId call above, where finding an incomplete // "INSTANCE" node is a killer) - final List ops = Arrays.asList( - createPersistentNodeOp(PropertyPathBuilder.instance(clusterName, nodeId)), - createPersistentNodeOp(PropertyPathBuilder.instanceMessage(clusterName, nodeId)), - createPersistentNodeOp(PropertyPathBuilder.instanceCurrentState(clusterName, nodeId)), - createPersistentNodeOp(PropertyPathBuilder.instanceTaskCurrentState(clusterName, nodeId)), - createPersistentNodeOp(PropertyPathBuilder.instanceCustomizedState(clusterName, nodeId)), - createPersistentNodeOp(PropertyPathBuilder.instanceError(clusterName, nodeId)), - createPersistentNodeOp(PropertyPathBuilder.instanceStatusUpdate(clusterName, nodeId)), - createPersistentNodeOp(PropertyPathBuilder.instanceHistory(clusterName, nodeId)) - ); + final List ops = + Arrays.asList(createPersistentNodeOp(PropertyPathBuilder.instance(clusterName, nodeId)), + createPersistentNodeOp(PropertyPathBuilder.instanceMessage(clusterName, nodeId)), + createPersistentNodeOp(PropertyPathBuilder.instanceCurrentState(clusterName, nodeId)), + createPersistentNodeOp( + PropertyPathBuilder.instanceTaskCurrentState(clusterName, nodeId)), + createPersistentNodeOp( + PropertyPathBuilder.instanceCustomizedState(clusterName, nodeId)), + createPersistentNodeOp(PropertyPathBuilder.instanceError(clusterName, nodeId)), + createPersistentNodeOp(PropertyPathBuilder.instanceStatusUpdate(clusterName, nodeId)), + createPersistentNodeOp(PropertyPathBuilder.instanceHistory(clusterName, nodeId))); _zkClient.multi(ops); HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseDataAccessor); @@ -387,11 +385,10 @@ public boolean setInstanceConfig(String clusterName, String instanceName, InstanceConfig currentInstanceConfig = accessor.getProperty(instanceConfigPropertyKey); if (!newInstanceConfig.getHostName().equals(currentInstanceConfig.getHostName()) || !newInstanceConfig.getPort().equals(currentInstanceConfig.getPort())) { - throw new HelixException( - "Hostname and port cannot be changed, current hostname: " + currentInstanceConfig - .getHostName() + " and port: " + currentInstanceConfig.getPort() - + " is different from new hostname: " + newInstanceConfig.getHostName() - + "and new port: " + newInstanceConfig.getPort()); + throw new HelixException("Hostname and port cannot be changed, current hostname: " + + currentInstanceConfig.getHostName() + " and port: " + currentInstanceConfig.getPort() + + " is different from new hostname: " + newInstanceConfig.getHostName() + "and new port: " + + newInstanceConfig.getPort()); } return accessor.setProperty(instanceConfigPropertyKey, newInstanceConfig); } @@ -517,8 +514,8 @@ private boolean canCompleteSwap(String clusterName, String swapOutInstanceName, if (swapInLiveInstance == null) { logger.warn( "SwapOutInstance {} is {} + {} and SwapInInstance {} is OFFLINE + {} for cluster {}. Swap will" - + " not complete unless SwapInInstance instance is ONLINE.", - swapOutInstanceName, swapOutLiveInstance != null ? "ONLINE" : "OFFLINE", + + " not complete unless SwapInInstance instance is ONLINE.", swapOutInstanceName, + swapOutLiveInstance != null ? "ONLINE" : "OFFLINE", swapOutInstanceConfig.getInstanceOperation().getOperation(), swapInInstanceName, swapInInstanceConfig.getInstanceOperation().getOperation(), clusterName); return false; @@ -551,9 +548,8 @@ private boolean canCompleteSwap(String clusterName, String swapOutInstanceName, logger.warn( "SwapOutInstance {} has {} pending messages and SwapInInstance {} has {} pending messages for cluster {}." + " Swap will not complete unless both SwapOutInstance(only when live)" - + " and SwapInInstance have no pending messages unless.", - swapOutInstanceName, swapOutPendingMessageCount, swapInInstanceName, - swapInPendingMessageCount, clusterName); + + " and SwapInInstance have no pending messages unless.", swapOutInstanceName, + swapOutPendingMessageCount, swapInInstanceName, swapInPendingMessageCount, clusterName); return false; } @@ -647,8 +643,8 @@ public boolean canCompleteSwap(String clusterName, String instanceName) { } InstanceConfig swapOutInstanceConfig = !instanceConfig.getInstanceOperation().getOperation() - .equals(InstanceConstants.InstanceOperation.SWAP_IN) - ? instanceConfig : swappingInstances.get(0); + .equals(InstanceConstants.InstanceOperation.SWAP_IN) ? instanceConfig + : swappingInstances.get(0); InstanceConfig swapInInstanceConfig = instanceConfig.getInstanceOperation().getOperation() .equals(InstanceConstants.InstanceOperation.SWAP_IN) ? instanceConfig : swappingInstances.get(0); @@ -686,8 +682,8 @@ public boolean completeSwapIfPossible(String clusterName, String instanceName, } InstanceConfig swapOutInstanceConfig = !instanceConfig.getInstanceOperation().getOperation() - .equals(InstanceConstants.InstanceOperation.SWAP_IN) - ? instanceConfig : swappingInstances.get(0); + .equals(InstanceConstants.InstanceOperation.SWAP_IN) ? instanceConfig + : swappingInstances.get(0); InstanceConfig swapInInstanceConfig = instanceConfig.getInstanceOperation().getOperation() .equals(InstanceConstants.InstanceOperation.SWAP_IN) ? instanceConfig : swappingInstances.get(0); @@ -759,20 +755,22 @@ public boolean forceKillInstance(String clusterName, String instanceName, String InstanceConstants.InstanceOperationSource operationSource) { logger.info("Force kill instance {} in cluster {}.", instanceName, clusterName); - InstanceConfig.InstanceOperation instanceOperationObj = new InstanceConfig.InstanceOperation.Builder() - .setOperation(InstanceConstants.InstanceOperation.UNKNOWN).setReason(reason) - .setSource(operationSource != null ? operationSource : InstanceConstants.InstanceOperationSource.USER).build(); + InstanceConfig.InstanceOperation instanceOperationObj = + new InstanceConfig.InstanceOperation.Builder().setOperation( + InstanceConstants.InstanceOperation.UNKNOWN).setReason(reason).setSource( + operationSource != null ? operationSource + : InstanceConstants.InstanceOperationSource.USER).build(); InstanceConfig instanceConfig = getInstanceConfig(clusterName, instanceName); instanceConfig.setInstanceOperation(instanceOperationObj); // Set instance operation to unknown and delete live instance in one operation List operations = Arrays.asList( - Op.setData(PropertyPathBuilder.instanceConfig(clusterName, instanceName), - _zkClient.serialize(instanceConfig.getRecord(), - PropertyPathBuilder.instanceConfig(clusterName, instanceName)), -1), - Op.delete(PropertyPathBuilder.liveInstance(clusterName, instanceName), -1)); + Op.setData(PropertyPathBuilder.instanceConfig(clusterName, instanceName), + _zkClient.serialize(instanceConfig.getRecord(), + PropertyPathBuilder.instanceConfig(clusterName, instanceName)), -1), + Op.delete(PropertyPathBuilder.liveInstance(clusterName, instanceName), -1)); - List< OpResult> opResults = _zkClient.multi(operations); + List opResults = _zkClient.multi(operations); return opResults.stream().noneMatch(result -> result instanceof OpResult.ErrorResult); } @@ -791,15 +789,16 @@ private boolean instanceHasFullAutoCurrentStateOrMessage(String clusterName, // check the instance is alive LiveInstance liveInstance = accessor.getProperty(keyBuilder.liveInstance(instanceName)); if (liveInstance == null) { - logger.warn("Instance {} in cluster {} is not alive. The instance can be removed anyway.", instanceName, - clusterName); + logger.warn("Instance {} in cluster {} is not alive. The instance can be removed anyway.", + instanceName, clusterName); return false; } BaseDataAccessor baseAccessor = _baseDataAccessor; // count number of sessions under CurrentState folder. If it is carrying over from prv session, // then there are > 1 session ZNodes. - List sessions = baseAccessor.getChildNames(PropertyPathBuilder.instanceCurrentState(clusterName, instanceName), 0); + List sessions = baseAccessor.getChildNames( + PropertyPathBuilder.instanceCurrentState(clusterName, instanceName), 0); if (sessions.size() > 1) { logger.warn("Instance {} in cluster {} is carrying over from prev session.", instanceName, clusterName); @@ -811,7 +810,8 @@ private boolean instanceHasFullAutoCurrentStateOrMessage(String clusterName, String path = PropertyPathBuilder.instanceCurrentState(clusterName, instanceName, sessionId); List currentStates = baseAccessor.getChildNames(path, 0); if (currentStates == null) { - logger.warn("Instance {} in cluster {} does not have live session. The instance can be removed anyway.", + logger.warn( + "Instance {} in cluster {} does not have live session. The instance can be removed anyway.", instanceName, clusterName); return false; } @@ -833,8 +833,10 @@ private boolean instanceHasFullAutoCurrentStateOrMessage(String clusterName, } @Override - public void enableResource(final String clusterName, final String resourceName, final boolean enabled) { - logger.info("{} resource {} in cluster {}.", enabled ? "Enable" : "Disable", resourceName, clusterName); + public void enableResource(final String clusterName, final String resourceName, + final boolean enabled) { + logger.info("{} resource {} in cluster {}.", enabled ? "Enable" : "Disable", resourceName, + clusterName); String path = PropertyPathBuilder.idealState(clusterName, resourceName); BaseDataAccessor baseAccessor = _baseDataAccessor; if (!baseAccessor.exists(path, 0)) { @@ -1026,26 +1028,26 @@ private void sendStateTransitionMessage(String clusterName, String instanceName, throw new HelixException(String.format( (_zkClient.exists(instanceConfigPath) ? SetPartitionFailureReason.INSTANCE_NOT_ALIVE : SetPartitionFailureReason.INSTANCE_NON_EXISTENT).getMessage(resourceName, - partitionNames, instanceName, instanceName, clusterName, stateTransitionType))); + partitionNames, instanceName, instanceName, clusterName, stateTransitionType))); } // check resource exists in ideal state IdealState idealState = accessor.getProperty(keyBuilder.idealStates(resourceName)); if (idealState == null) { - throw new HelixException( - String.format(SetPartitionFailureReason.RESOURCE_NON_EXISTENT.getMessage(resourceName, - partitionNames, instanceName, resourceName, clusterName, stateTransitionType))); + throw new HelixException(String.format( + SetPartitionFailureReason.RESOURCE_NON_EXISTENT.getMessage(resourceName, partitionNames, + instanceName, resourceName, clusterName, stateTransitionType))); } // check partition exists in resource Set partitionsNames = new HashSet(partitionNames); - Set partitions = (idealState.getRebalanceMode() == RebalanceMode.CUSTOMIZED) - ? idealState.getRecord().getMapFields().keySet() - : idealState.getRecord().getListFields().keySet(); + Set partitions = + (idealState.getRebalanceMode() == RebalanceMode.CUSTOMIZED) ? idealState.getRecord() + .getMapFields().keySet() : idealState.getRecord().getListFields().keySet(); if (!partitions.containsAll(partitionsNames)) { - throw new HelixException( - String.format(SetPartitionFailureReason.PARTITION_NON_EXISTENT.getMessage(resourceName, - partitionNames, instanceName, partitionNames.toString(), clusterName, stateTransitionType))); + throw new HelixException(String.format( + SetPartitionFailureReason.PARTITION_NON_EXISTENT.getMessage(resourceName, partitionNames, + instanceName, partitionNames.toString(), clusterName, stateTransitionType))); } // check partition is in ERROR state if reset is set to True @@ -1066,8 +1068,8 @@ private void sendStateTransitionMessage(String clusterName, String instanceName, String stateModelDef = idealState.getStateModelDefRef(); StateModelDefinition stateModel = accessor.getProperty(keyBuilder.stateModelDef(stateModelDef)); if (stateModel == null) { - throw new HelixException( - String.format(SetPartitionFailureReason.STATE_MODEL_NON_EXISTENT.getMessage(resourceName, + throw new HelixException(String.format( + SetPartitionFailureReason.STATE_MODEL_NON_EXISTENT.getMessage(resourceName, partitionNames, instanceName, stateModelDef, clusterName, stateTransitionType))); } @@ -1075,9 +1077,8 @@ private void sendStateTransitionMessage(String clusterName, String instanceName, List messages = accessor.getChildValues(keyBuilder.messages(instanceName), true); for (Message message : messages) { if (!MessageType.STATE_TRANSITION.name().equalsIgnoreCase(message.getMsgType()) - || !sessionId.equals(message.getTgtSessionId()) - || !resourceName.equals(message.getResourceName()) - || !partitionsNames.contains(message.getPartitionName())) { + || !sessionId.equals(message.getTgtSessionId()) || !resourceName.equals( + message.getResourceName()) || !partitionsNames.contains(message.getPartitionName())) { continue; } @@ -1257,21 +1258,19 @@ private void processMaintenanceMode(String clusterName, final boolean enabled, } // Record a MaintenanceSignal history - if (!accessor.getBaseDataAccessor() - .update(keyBuilder.controllerLeaderHistory().getPath(), - (DataUpdater) oldRecord -> { - try { - if (oldRecord == null) { - oldRecord = new ZNRecord(PropertyType.HISTORY.toString()); - } - return new ControllerHistory(oldRecord) - .updateMaintenanceHistory(enabled, reason, currentTime, internalReason, - customFields, triggeringEntity); - } catch (IOException e) { - logger.error("Failed to update maintenance history! Exception: {}", e); - return oldRecord; - } - }, AccessOption.PERSISTENT)) { + if (!accessor.getBaseDataAccessor().update(keyBuilder.controllerLeaderHistory().getPath(), + (DataUpdater) oldRecord -> { + try { + if (oldRecord == null) { + oldRecord = new ZNRecord(PropertyType.HISTORY.toString()); + } + return new ControllerHistory(oldRecord).updateMaintenanceHistory(enabled, reason, + currentTime, internalReason, customFields, triggeringEntity); + } catch (IOException e) { + logger.error("Failed to update maintenance history! Exception: {}", e); + return oldRecord; + } + }, AccessOption.PERSISTENT)) { logger.error("Failed to write maintenance history to ZK!"); } } @@ -1306,13 +1305,15 @@ private enum StateTransitionType { // Unknown StateTransitionType UNDEFINED } + @Override public void resetPartition(String clusterName, String instanceName, String resourceName, List partitionNames) { logger.info("Reset partitions {} for resource {} on instance {} in cluster {}.", partitionNames == null ? "NULL" : HelixUtil.serializeByComma(partitionNames), resourceName, instanceName, clusterName); - sendStateTransitionMessage(clusterName, instanceName, resourceName, partitionNames, StateTransitionType.RESET); + sendStateTransitionMessage(clusterName, instanceName, resourceName, partitionNames, + StateTransitionType.RESET); } @Override @@ -1484,8 +1485,8 @@ public List getInstancesInClusterWithTag(String clusterName, String tag) for (String instanceName : instances) { InstanceConfig config = accessor.getProperty(keyBuilder.instanceConfig(instanceName)); if (config == null) { - throw new IllegalStateException(String - .format("Instance %s does not have a config, cluster might be in bad state", + throw new IllegalStateException( + String.format("Instance %s does not have a config, cluster might be in bad state", instanceName)); } if (config.containsTag(tag)) { @@ -1590,8 +1591,8 @@ public List getClusters() { realmToShardingKeys = RoutingDataManager.getInstance().getRawRoutingData(); } else { realmToShardingKeys = RoutingDataManager.getInstance().getRawRoutingData( - RoutingDataReaderType - .lookUp(_zkClient.getRealmAwareZkConnectionConfig().getRoutingDataSourceType()), + RoutingDataReaderType.lookUp( + _zkClient.getRealmAwareZkConnectionConfig().getRoutingDataSourceType()), routingDataSourceEndpoint); } @@ -1648,9 +1649,8 @@ public IdealState getResourceIdealState(String clusterName, String resourceName) @Override public void setResourceIdealState(String clusterName, String resourceName, IdealState idealState) { - logger - .info("Set IdealState for resource {} in cluster {} with new IdealState {}.", resourceName, - clusterName, idealState == null ? "NULL" : idealState.toString()); + logger.info("Set IdealState for resource {} in cluster {} with new IdealState {}.", + resourceName, clusterName, idealState == null ? "NULL" : idealState.toString()); HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseDataAccessor); PropertyKey.Builder keyBuilder = accessor.keyBuilder(); @@ -1715,9 +1715,8 @@ public void addStateModelDef(String clusterName, String stateModelDef, @Override public void addStateModelDef(String clusterName, String stateModelDef, StateModelDefinition stateModel, boolean recreateIfExists) { - logger - .info("Add StateModelDef {} in cluster {} with StateModel {}.", stateModelDef, clusterName, - stateModel == null ? "NULL" : stateModel.toString()); + logger.info("Add StateModelDef {} in cluster {} with StateModel {}.", stateModelDef, + clusterName, stateModel == null ? "NULL" : stateModel.toString()); if (!ZKUtil.isClusterSetup(clusterName, _zkClient)) { throw new HelixException("cluster " + clusterName + " is not setup yet"); } @@ -1877,9 +1876,8 @@ public Map getConfig(HelixConfigScope scope, List keys) @Override public void addCustomizedStateConfig(String clusterName, CustomizedStateConfig customizedStateConfig) { - logger.info( - "Add CustomizedStateConfig to cluster {}, CustomizedStateConfig is {}", - clusterName, customizedStateConfig.toString()); + logger.info("Add CustomizedStateConfig to cluster {}, CustomizedStateConfig is {}", clusterName, + customizedStateConfig.toString()); if (!ZKUtil.isClusterSetup(clusterName, _zkClient)) { throw new HelixException("cluster " + clusterName + " is not setup yet"); @@ -1891,19 +1889,16 @@ public void addCustomizedStateConfig(String clusterName, ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseDataAccessor); PropertyKey.Builder keyBuilder = accessor.keyBuilder(); - accessor.setProperty(keyBuilder.customizedStateConfig(), - customizedStateConfigFromBuilder); + accessor.setProperty(keyBuilder.customizedStateConfig(), customizedStateConfigFromBuilder); } @Override public void removeCustomizedStateConfig(String clusterName) { - logger.info( - "Remove CustomizedStateConfig from cluster {}.", clusterName); + logger.info("Remove CustomizedStateConfig from cluster {}.", clusterName); ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseDataAccessor); PropertyKey.Builder keyBuilder = accessor.keyBuilder(); accessor.removeProperty(keyBuilder.customizedStateConfig()); - } @Override @@ -1913,45 +1908,41 @@ public void addTypeToCustomizedStateConfig(String clusterName, String type) { if (!ZKUtil.isClusterSetup(clusterName, _zkClient)) { throw new HelixException("cluster " + clusterName + " is not setup yet"); } - CustomizedStateConfig.Builder builder = - new CustomizedStateConfig.Builder(); + CustomizedStateConfig.Builder builder = new CustomizedStateConfig.Builder(); builder.addAggregationEnabledType(type); CustomizedStateConfig customizedStateConfigFromBuilder = builder.build(); ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseDataAccessor); PropertyKey.Builder keyBuilder = accessor.keyBuilder(); - if(!accessor.updateProperty(keyBuilder.customizedStateConfig(), + if (!accessor.updateProperty(keyBuilder.customizedStateConfig(), customizedStateConfigFromBuilder)) { throw new HelixException( "Failed to add customized state config type " + type + " to cluster" + clusterName); } } - @Override public void removeTypeFromCustomizedStateConfig(String clusterName, String type) { - logger.info("Remove type {} to CustomizedStateConfig of cluster {}", type, - clusterName); + logger.info("Remove type {} to CustomizedStateConfig of cluster {}", type, clusterName); if (!ZKUtil.isClusterSetup(clusterName, _zkClient)) { throw new HelixException("cluster " + clusterName + " is not setup yet"); } - CustomizedStateConfig.Builder builder = new CustomizedStateConfig.Builder( - _configAccessor.getCustomizedStateConfig(clusterName)); + CustomizedStateConfig.Builder builder = + new CustomizedStateConfig.Builder(_configAccessor.getCustomizedStateConfig(clusterName)); if (!builder.getAggregationEnabledTypes().contains(type)) { - throw new HelixException("Type " + type - + " is missing from the CustomizedStateConfig of cluster " + clusterName); + throw new HelixException( + "Type " + type + " is missing from the CustomizedStateConfig of cluster " + clusterName); } builder.removeAggregationEnabledType(type); CustomizedStateConfig customizedStateConfigFromBuilder = builder.build(); ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseDataAccessor); PropertyKey.Builder keyBuilder = accessor.keyBuilder(); - accessor.setProperty(keyBuilder.customizedStateConfig(), - customizedStateConfigFromBuilder); + accessor.setProperty(keyBuilder.customizedStateConfig(), customizedStateConfigFromBuilder); } @Override @@ -2078,9 +2069,9 @@ void rebalance(String clusterName, String resourceName, int replica, String keyP } if (idealState.getRebalanceMode() != RebalanceMode.FULL_AUTO && idealState.getRebalanceMode() != RebalanceMode.USER_DEFINED) { - ZNRecord newIdealState = DefaultIdealStateCalculator - .calculateIdealState(instanceNames, partitions, replica, keyPrefix, masterStateValue, - slaveStateValue); + ZNRecord newIdealState = + DefaultIdealStateCalculator.calculateIdealState(instanceNames, partitions, replica, + keyPrefix, masterStateValue, slaveStateValue); // for now keep mapField in SEMI_AUTO mode and remove listField in CUSTOMIZED mode if (idealState.getRebalanceMode() == RebalanceMode.SEMI_AUTO) { @@ -2114,8 +2105,7 @@ public void addIdealState(String clusterName, String resourceName, String idealS setResourceIdealState(clusterName, resourceName, new IdealState(idealStateRecord)); } - private static byte[] readFile(String filePath) - throws IOException { + private static byte[] readFile(String filePath) throws IOException { File file = new File(filePath); int size = (int) file.length(); @@ -2138,8 +2128,7 @@ private static byte[] readFile(String filePath) @Override public void addStateModelDef(String clusterName, String stateModelDefName, - String stateModelDefFile) - throws IOException { + String stateModelDefFile) throws IOException { ZNRecord record = (ZNRecord) (new ZNRecordSerializer().deserialize(readFile(stateModelDefFile))); if (record == null || record.getId() == null || !record.getId().equals(stateModelDefName)) { @@ -2238,9 +2227,8 @@ public void rebalance(String clusterName, IdealState currentIdealState, } String[] states = RebalanceUtil.parseStates(clusterName, stateModDef); - ZNRecord newIdealStateRecord = DefaultIdealStateCalculator - .convertToZNRecord(balancedRecord, currentIdealState.getResourceName(), states[0], - states[1]); + ZNRecord newIdealStateRecord = DefaultIdealStateCalculator.convertToZNRecord(balancedRecord, + currentIdealState.getResourceName(), states[0], states[1]); Set partitionSet = new HashSet(); partitionSet.addAll(newIdealStateRecord.getMapFields().keySet()); partitionSet.addAll(newIdealStateRecord.getListFields().keySet()); @@ -2270,8 +2258,8 @@ public void rebalance(String clusterName, IdealState currentIdealState, @Override public void addInstanceTag(String clusterName, String instanceName, String tag) { - logger - .info("Add instance tag {} for instance {} in cluster {}.", tag, instanceName, clusterName); + logger.info("Add instance tag {} for instance {} in cluster {}.", tag, instanceName, + clusterName); if (!ZKUtil.isClusterSetup(clusterName, _zkClient)) { throw new HelixException("cluster " + clusterName + " is not setup yet"); } @@ -2330,8 +2318,8 @@ public void setInstanceZoneId(String clusterName, String instanceName, String zo @Override public void enableBatchMessageMode(String clusterName, boolean enabled) { - logger - .info("{} batch message mode for cluster {}.", enabled ? "Enable" : "Disable", clusterName); + logger.info("{} batch message mode for cluster {}.", enabled ? "Enable" : "Disable", + clusterName); if (!ZKUtil.isClusterSetup(clusterName, _zkClient)) { throw new HelixException("cluster " + clusterName + " is not setup yet"); } @@ -2414,7 +2402,8 @@ public ZNRecord update(ZNRecord currentData) { ClusterConfig clusterConfig = new ClusterConfig(currentData); Map disabledInstances = new TreeMap<>(clusterConfig.getDisabledInstances()); - Map disabledInstancesWithInfo = new TreeMap<>(clusterConfig.getDisabledInstancesWithInfo()); + Map disabledInstancesWithInfo = + new TreeMap<>(clusterConfig.getDisabledInstancesWithInfo()); if (enabled) { disabledInstances.keySet().removeAll(instances); disabledInstancesWithInfo.keySet().removeAll(instances); @@ -2426,8 +2415,8 @@ public ZNRecord update(ZNRecord currentData) { // TODO: update the history ZNode String timeStamp = String.valueOf(System.currentTimeMillis()); disabledInstances.put(disabledInstance, timeStamp); - disabledInstancesWithInfo - .put(disabledInstance, assembleInstanceBatchedDisabledInfo(disabledType, reason, timeStamp)); + disabledInstancesWithInfo.put(disabledInstance, + assembleInstanceBatchedDisabledInfo(disabledType, reason, timeStamp)); } } clusterConfig.setDisabledInstances(disabledInstances); @@ -2516,14 +2505,14 @@ public boolean addResourceWithWeight(String clusterName, IdealState idealState, // Check that all capacity keys in ClusterConfig are set up in every partition in ResourceConfig field if (!validateWeightForResourceConfig(_configAccessor.getClusterConfig(clusterName), resourceConfig, idealState)) { - throw new HelixException(String - .format("Could not add resource %s with weight! Failed to validate the ResourceConfig!", - idealState.getResourceName())); + throw new HelixException(String.format( + "Could not add resource %s with weight! Failed to validate the ResourceConfig!", + idealState.getResourceName())); } // 2. Add the resourceConfig to ZK - _configAccessor - .setResourceConfig(clusterName, resourceConfig.getResourceName(), resourceConfig); + _configAccessor.setResourceConfig(clusterName, resourceConfig.getResourceName(), + resourceConfig); // 3. Add the idealState to ZK setResourceIdealState(clusterName, idealState.getResourceName(), idealState); @@ -2533,8 +2522,9 @@ public boolean addResourceWithWeight(String clusterName, IdealState idealState, PropertyKey.Builder keyBuilder = accessor.keyBuilder(); List liveNodes = accessor.getChildNames(keyBuilder.liveInstances()); - rebalance(clusterName, idealState.getResourceName(), idealState.getReplicaCount(liveNodes.size()), - idealState.getResourceName(), idealState.getInstanceGroupTag()); + rebalance(clusterName, idealState.getResourceName(), + idealState.getReplicaCount(liveNodes.size()), idealState.getResourceName(), + idealState.getInstanceGroupTag()); return true; } @@ -2599,8 +2589,8 @@ public Map validateResourcesForWagedRebalance(String clusterNam PropertyKey.Builder keyBuilder = accessor.keyBuilder(); List instances = accessor.getChildNames(keyBuilder.instanceConfigs()); if (validateInstancesForWagedRebalance(clusterName, instances).containsValue(false)) { - throw new HelixException(String - .format("Instance capacities haven't been configured properly for cluster %s", + throw new HelixException( + String.format("Instance capacities haven't been configured properly for cluster %s", clusterName)); } @@ -2701,9 +2691,9 @@ private boolean validateWeightForResourceConfig(ClusterConfig clusterConfig, } // Loop through all partitions and validate - capacityMap.keySet().forEach(partitionName -> WagedValidationUtil - .validateAndGetPartitionCapacity(partitionName, resourceConfig, capacityMap, - clusterConfig)); + capacityMap.keySet().forEach( + partitionName -> WagedValidationUtil.validateAndGetPartitionCapacity(partitionName, + resourceConfig, capacityMap, clusterConfig)); return true; } @@ -2722,8 +2712,7 @@ public ZKHelixAdmin build() { private Set findTimeoutOfflineInstances(String clusterName, long offlineDuration) { // in case there is no customized timeout value, use the one defined in cluster config if (offlineDuration == ClusterConfig.OFFLINE_DURATION_FOR_PURGE_NOT_SET) { - offlineDuration = - _configAccessor.getClusterConfig(clusterName).getOfflineDurationForPurge(); + offlineDuration = _configAccessor.getClusterConfig(clusterName).getOfflineDurationForPurge(); if (offlineDuration == ClusterConfig.OFFLINE_DURATION_FOR_PURGE_NOT_SET) { return Collections.emptySet(); } From 31b0ffd75f28123f3c86c71a2459f67bc3c310e7 Mon Sep 17 00:00:00 2001 From: Jacob Deker Date: Wed, 11 Dec 2024 07:14:31 +0100 Subject: [PATCH 3/4] Revert "#2965 Atomic ZNode creation on node registration" This reverts commit 43ba0b0df526421189d5c41fc3cee6f6bb7091aa. --- .../apache/helix/manager/zk/ZKHelixAdmin.java | 261 +++++++++--------- 1 file changed, 136 insertions(+), 125 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java index 9d95237bad..0dbb6863fa 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java @@ -114,6 +114,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; + public class ZKHelixAdmin implements HelixAdmin { private static final Logger LOG = LoggerFactory.getLogger(ZKHelixAdmin.class); @@ -164,8 +165,9 @@ public ZKHelixAdmin(RealmAwareZkClient zkClient) { public ZKHelixAdmin(String zkAddress) { int timeOutInSec = Integer.parseInt(System.getProperty(CONNECTION_TIMEOUT, "30")); RealmAwareZkClient.RealmAwareZkClientConfig clientConfig = - new RealmAwareZkClient.RealmAwareZkClientConfig().setConnectInitTimeout( - timeOutInSec * 1000L).setZkSerializer(new ZNRecordSerializer()); + new RealmAwareZkClient.RealmAwareZkClientConfig() + .setConnectInitTimeout(timeOutInSec * 1000L) + .setZkSerializer(new ZNRecordSerializer()); RealmAwareZkClient zkClient; @@ -231,7 +233,8 @@ public void addInstance(String clusterName, InstanceConfig instanceConfig) { logger.error("Failed to add instance " + instanceConfig.getInstanceName() + " to cluster " + clusterName + " with instance operation " + attemptedInstanceOperation + ". Setting INSTANCE_OPERATION to " + instanceConfig.getInstanceOperation() - .getOperation() + " instead.", e); + .getOperation() + + " instead.", e); } ZKUtil.createChildren(_zkClient, instanceConfigsPath, instanceConfig.getRecord()); @@ -241,17 +244,16 @@ public void addInstance(String clusterName, InstanceConfig instanceConfig) { // existing instance with matching logical ID (see // InstanceUtil.findInstancesWithMatchingLogicalId call above, where finding an incomplete // "INSTANCE" node is a killer) - final List ops = - Arrays.asList(createPersistentNodeOp(PropertyPathBuilder.instance(clusterName, nodeId)), - createPersistentNodeOp(PropertyPathBuilder.instanceMessage(clusterName, nodeId)), - createPersistentNodeOp(PropertyPathBuilder.instanceCurrentState(clusterName, nodeId)), - createPersistentNodeOp( - PropertyPathBuilder.instanceTaskCurrentState(clusterName, nodeId)), - createPersistentNodeOp( - PropertyPathBuilder.instanceCustomizedState(clusterName, nodeId)), - createPersistentNodeOp(PropertyPathBuilder.instanceError(clusterName, nodeId)), - createPersistentNodeOp(PropertyPathBuilder.instanceStatusUpdate(clusterName, nodeId)), - createPersistentNodeOp(PropertyPathBuilder.instanceHistory(clusterName, nodeId))); + final List ops = Arrays.asList( + createPersistentNodeOp(PropertyPathBuilder.instance(clusterName, nodeId)), + createPersistentNodeOp(PropertyPathBuilder.instanceMessage(clusterName, nodeId)), + createPersistentNodeOp(PropertyPathBuilder.instanceCurrentState(clusterName, nodeId)), + createPersistentNodeOp(PropertyPathBuilder.instanceTaskCurrentState(clusterName, nodeId)), + createPersistentNodeOp(PropertyPathBuilder.instanceCustomizedState(clusterName, nodeId)), + createPersistentNodeOp(PropertyPathBuilder.instanceError(clusterName, nodeId)), + createPersistentNodeOp(PropertyPathBuilder.instanceStatusUpdate(clusterName, nodeId)), + createPersistentNodeOp(PropertyPathBuilder.instanceHistory(clusterName, nodeId)) + ); _zkClient.multi(ops); HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseDataAccessor); @@ -385,10 +387,11 @@ public boolean setInstanceConfig(String clusterName, String instanceName, InstanceConfig currentInstanceConfig = accessor.getProperty(instanceConfigPropertyKey); if (!newInstanceConfig.getHostName().equals(currentInstanceConfig.getHostName()) || !newInstanceConfig.getPort().equals(currentInstanceConfig.getPort())) { - throw new HelixException("Hostname and port cannot be changed, current hostname: " - + currentInstanceConfig.getHostName() + " and port: " + currentInstanceConfig.getPort() - + " is different from new hostname: " + newInstanceConfig.getHostName() + "and new port: " - + newInstanceConfig.getPort()); + throw new HelixException( + "Hostname and port cannot be changed, current hostname: " + currentInstanceConfig + .getHostName() + " and port: " + currentInstanceConfig.getPort() + + " is different from new hostname: " + newInstanceConfig.getHostName() + + "and new port: " + newInstanceConfig.getPort()); } return accessor.setProperty(instanceConfigPropertyKey, newInstanceConfig); } @@ -514,8 +517,8 @@ private boolean canCompleteSwap(String clusterName, String swapOutInstanceName, if (swapInLiveInstance == null) { logger.warn( "SwapOutInstance {} is {} + {} and SwapInInstance {} is OFFLINE + {} for cluster {}. Swap will" - + " not complete unless SwapInInstance instance is ONLINE.", swapOutInstanceName, - swapOutLiveInstance != null ? "ONLINE" : "OFFLINE", + + " not complete unless SwapInInstance instance is ONLINE.", + swapOutInstanceName, swapOutLiveInstance != null ? "ONLINE" : "OFFLINE", swapOutInstanceConfig.getInstanceOperation().getOperation(), swapInInstanceName, swapInInstanceConfig.getInstanceOperation().getOperation(), clusterName); return false; @@ -548,8 +551,9 @@ private boolean canCompleteSwap(String clusterName, String swapOutInstanceName, logger.warn( "SwapOutInstance {} has {} pending messages and SwapInInstance {} has {} pending messages for cluster {}." + " Swap will not complete unless both SwapOutInstance(only when live)" - + " and SwapInInstance have no pending messages unless.", swapOutInstanceName, - swapOutPendingMessageCount, swapInInstanceName, swapInPendingMessageCount, clusterName); + + " and SwapInInstance have no pending messages unless.", + swapOutInstanceName, swapOutPendingMessageCount, swapInInstanceName, + swapInPendingMessageCount, clusterName); return false; } @@ -643,8 +647,8 @@ public boolean canCompleteSwap(String clusterName, String instanceName) { } InstanceConfig swapOutInstanceConfig = !instanceConfig.getInstanceOperation().getOperation() - .equals(InstanceConstants.InstanceOperation.SWAP_IN) ? instanceConfig - : swappingInstances.get(0); + .equals(InstanceConstants.InstanceOperation.SWAP_IN) + ? instanceConfig : swappingInstances.get(0); InstanceConfig swapInInstanceConfig = instanceConfig.getInstanceOperation().getOperation() .equals(InstanceConstants.InstanceOperation.SWAP_IN) ? instanceConfig : swappingInstances.get(0); @@ -682,8 +686,8 @@ public boolean completeSwapIfPossible(String clusterName, String instanceName, } InstanceConfig swapOutInstanceConfig = !instanceConfig.getInstanceOperation().getOperation() - .equals(InstanceConstants.InstanceOperation.SWAP_IN) ? instanceConfig - : swappingInstances.get(0); + .equals(InstanceConstants.InstanceOperation.SWAP_IN) + ? instanceConfig : swappingInstances.get(0); InstanceConfig swapInInstanceConfig = instanceConfig.getInstanceOperation().getOperation() .equals(InstanceConstants.InstanceOperation.SWAP_IN) ? instanceConfig : swappingInstances.get(0); @@ -755,22 +759,20 @@ public boolean forceKillInstance(String clusterName, String instanceName, String InstanceConstants.InstanceOperationSource operationSource) { logger.info("Force kill instance {} in cluster {}.", instanceName, clusterName); - InstanceConfig.InstanceOperation instanceOperationObj = - new InstanceConfig.InstanceOperation.Builder().setOperation( - InstanceConstants.InstanceOperation.UNKNOWN).setReason(reason).setSource( - operationSource != null ? operationSource - : InstanceConstants.InstanceOperationSource.USER).build(); + InstanceConfig.InstanceOperation instanceOperationObj = new InstanceConfig.InstanceOperation.Builder() + .setOperation(InstanceConstants.InstanceOperation.UNKNOWN).setReason(reason) + .setSource(operationSource != null ? operationSource : InstanceConstants.InstanceOperationSource.USER).build(); InstanceConfig instanceConfig = getInstanceConfig(clusterName, instanceName); instanceConfig.setInstanceOperation(instanceOperationObj); // Set instance operation to unknown and delete live instance in one operation List operations = Arrays.asList( - Op.setData(PropertyPathBuilder.instanceConfig(clusterName, instanceName), - _zkClient.serialize(instanceConfig.getRecord(), - PropertyPathBuilder.instanceConfig(clusterName, instanceName)), -1), - Op.delete(PropertyPathBuilder.liveInstance(clusterName, instanceName), -1)); + Op.setData(PropertyPathBuilder.instanceConfig(clusterName, instanceName), + _zkClient.serialize(instanceConfig.getRecord(), + PropertyPathBuilder.instanceConfig(clusterName, instanceName)), -1), + Op.delete(PropertyPathBuilder.liveInstance(clusterName, instanceName), -1)); - List opResults = _zkClient.multi(operations); + List< OpResult> opResults = _zkClient.multi(operations); return opResults.stream().noneMatch(result -> result instanceof OpResult.ErrorResult); } @@ -789,16 +791,15 @@ private boolean instanceHasFullAutoCurrentStateOrMessage(String clusterName, // check the instance is alive LiveInstance liveInstance = accessor.getProperty(keyBuilder.liveInstance(instanceName)); if (liveInstance == null) { - logger.warn("Instance {} in cluster {} is not alive. The instance can be removed anyway.", - instanceName, clusterName); + logger.warn("Instance {} in cluster {} is not alive. The instance can be removed anyway.", instanceName, + clusterName); return false; } BaseDataAccessor baseAccessor = _baseDataAccessor; // count number of sessions under CurrentState folder. If it is carrying over from prv session, // then there are > 1 session ZNodes. - List sessions = baseAccessor.getChildNames( - PropertyPathBuilder.instanceCurrentState(clusterName, instanceName), 0); + List sessions = baseAccessor.getChildNames(PropertyPathBuilder.instanceCurrentState(clusterName, instanceName), 0); if (sessions.size() > 1) { logger.warn("Instance {} in cluster {} is carrying over from prev session.", instanceName, clusterName); @@ -810,8 +811,7 @@ private boolean instanceHasFullAutoCurrentStateOrMessage(String clusterName, String path = PropertyPathBuilder.instanceCurrentState(clusterName, instanceName, sessionId); List currentStates = baseAccessor.getChildNames(path, 0); if (currentStates == null) { - logger.warn( - "Instance {} in cluster {} does not have live session. The instance can be removed anyway.", + logger.warn("Instance {} in cluster {} does not have live session. The instance can be removed anyway.", instanceName, clusterName); return false; } @@ -833,10 +833,8 @@ private boolean instanceHasFullAutoCurrentStateOrMessage(String clusterName, } @Override - public void enableResource(final String clusterName, final String resourceName, - final boolean enabled) { - logger.info("{} resource {} in cluster {}.", enabled ? "Enable" : "Disable", resourceName, - clusterName); + public void enableResource(final String clusterName, final String resourceName, final boolean enabled) { + logger.info("{} resource {} in cluster {}.", enabled ? "Enable" : "Disable", resourceName, clusterName); String path = PropertyPathBuilder.idealState(clusterName, resourceName); BaseDataAccessor baseAccessor = _baseDataAccessor; if (!baseAccessor.exists(path, 0)) { @@ -1028,26 +1026,26 @@ private void sendStateTransitionMessage(String clusterName, String instanceName, throw new HelixException(String.format( (_zkClient.exists(instanceConfigPath) ? SetPartitionFailureReason.INSTANCE_NOT_ALIVE : SetPartitionFailureReason.INSTANCE_NON_EXISTENT).getMessage(resourceName, - partitionNames, instanceName, instanceName, clusterName, stateTransitionType))); + partitionNames, instanceName, instanceName, clusterName, stateTransitionType))); } // check resource exists in ideal state IdealState idealState = accessor.getProperty(keyBuilder.idealStates(resourceName)); if (idealState == null) { - throw new HelixException(String.format( - SetPartitionFailureReason.RESOURCE_NON_EXISTENT.getMessage(resourceName, partitionNames, - instanceName, resourceName, clusterName, stateTransitionType))); + throw new HelixException( + String.format(SetPartitionFailureReason.RESOURCE_NON_EXISTENT.getMessage(resourceName, + partitionNames, instanceName, resourceName, clusterName, stateTransitionType))); } // check partition exists in resource Set partitionsNames = new HashSet(partitionNames); - Set partitions = - (idealState.getRebalanceMode() == RebalanceMode.CUSTOMIZED) ? idealState.getRecord() - .getMapFields().keySet() : idealState.getRecord().getListFields().keySet(); + Set partitions = (idealState.getRebalanceMode() == RebalanceMode.CUSTOMIZED) + ? idealState.getRecord().getMapFields().keySet() + : idealState.getRecord().getListFields().keySet(); if (!partitions.containsAll(partitionsNames)) { - throw new HelixException(String.format( - SetPartitionFailureReason.PARTITION_NON_EXISTENT.getMessage(resourceName, partitionNames, - instanceName, partitionNames.toString(), clusterName, stateTransitionType))); + throw new HelixException( + String.format(SetPartitionFailureReason.PARTITION_NON_EXISTENT.getMessage(resourceName, + partitionNames, instanceName, partitionNames.toString(), clusterName, stateTransitionType))); } // check partition is in ERROR state if reset is set to True @@ -1068,8 +1066,8 @@ private void sendStateTransitionMessage(String clusterName, String instanceName, String stateModelDef = idealState.getStateModelDefRef(); StateModelDefinition stateModel = accessor.getProperty(keyBuilder.stateModelDef(stateModelDef)); if (stateModel == null) { - throw new HelixException(String.format( - SetPartitionFailureReason.STATE_MODEL_NON_EXISTENT.getMessage(resourceName, + throw new HelixException( + String.format(SetPartitionFailureReason.STATE_MODEL_NON_EXISTENT.getMessage(resourceName, partitionNames, instanceName, stateModelDef, clusterName, stateTransitionType))); } @@ -1077,8 +1075,9 @@ private void sendStateTransitionMessage(String clusterName, String instanceName, List messages = accessor.getChildValues(keyBuilder.messages(instanceName), true); for (Message message : messages) { if (!MessageType.STATE_TRANSITION.name().equalsIgnoreCase(message.getMsgType()) - || !sessionId.equals(message.getTgtSessionId()) || !resourceName.equals( - message.getResourceName()) || !partitionsNames.contains(message.getPartitionName())) { + || !sessionId.equals(message.getTgtSessionId()) + || !resourceName.equals(message.getResourceName()) + || !partitionsNames.contains(message.getPartitionName())) { continue; } @@ -1258,19 +1257,21 @@ private void processMaintenanceMode(String clusterName, final boolean enabled, } // Record a MaintenanceSignal history - if (!accessor.getBaseDataAccessor().update(keyBuilder.controllerLeaderHistory().getPath(), - (DataUpdater) oldRecord -> { - try { - if (oldRecord == null) { - oldRecord = new ZNRecord(PropertyType.HISTORY.toString()); - } - return new ControllerHistory(oldRecord).updateMaintenanceHistory(enabled, reason, - currentTime, internalReason, customFields, triggeringEntity); - } catch (IOException e) { - logger.error("Failed to update maintenance history! Exception: {}", e); - return oldRecord; - } - }, AccessOption.PERSISTENT)) { + if (!accessor.getBaseDataAccessor() + .update(keyBuilder.controllerLeaderHistory().getPath(), + (DataUpdater) oldRecord -> { + try { + if (oldRecord == null) { + oldRecord = new ZNRecord(PropertyType.HISTORY.toString()); + } + return new ControllerHistory(oldRecord) + .updateMaintenanceHistory(enabled, reason, currentTime, internalReason, + customFields, triggeringEntity); + } catch (IOException e) { + logger.error("Failed to update maintenance history! Exception: {}", e); + return oldRecord; + } + }, AccessOption.PERSISTENT)) { logger.error("Failed to write maintenance history to ZK!"); } } @@ -1305,15 +1306,13 @@ private enum StateTransitionType { // Unknown StateTransitionType UNDEFINED } - @Override public void resetPartition(String clusterName, String instanceName, String resourceName, List partitionNames) { logger.info("Reset partitions {} for resource {} on instance {} in cluster {}.", partitionNames == null ? "NULL" : HelixUtil.serializeByComma(partitionNames), resourceName, instanceName, clusterName); - sendStateTransitionMessage(clusterName, instanceName, resourceName, partitionNames, - StateTransitionType.RESET); + sendStateTransitionMessage(clusterName, instanceName, resourceName, partitionNames, StateTransitionType.RESET); } @Override @@ -1485,8 +1484,8 @@ public List getInstancesInClusterWithTag(String clusterName, String tag) for (String instanceName : instances) { InstanceConfig config = accessor.getProperty(keyBuilder.instanceConfig(instanceName)); if (config == null) { - throw new IllegalStateException( - String.format("Instance %s does not have a config, cluster might be in bad state", + throw new IllegalStateException(String + .format("Instance %s does not have a config, cluster might be in bad state", instanceName)); } if (config.containsTag(tag)) { @@ -1591,8 +1590,8 @@ public List getClusters() { realmToShardingKeys = RoutingDataManager.getInstance().getRawRoutingData(); } else { realmToShardingKeys = RoutingDataManager.getInstance().getRawRoutingData( - RoutingDataReaderType.lookUp( - _zkClient.getRealmAwareZkConnectionConfig().getRoutingDataSourceType()), + RoutingDataReaderType + .lookUp(_zkClient.getRealmAwareZkConnectionConfig().getRoutingDataSourceType()), routingDataSourceEndpoint); } @@ -1649,8 +1648,9 @@ public IdealState getResourceIdealState(String clusterName, String resourceName) @Override public void setResourceIdealState(String clusterName, String resourceName, IdealState idealState) { - logger.info("Set IdealState for resource {} in cluster {} with new IdealState {}.", - resourceName, clusterName, idealState == null ? "NULL" : idealState.toString()); + logger + .info("Set IdealState for resource {} in cluster {} with new IdealState {}.", resourceName, + clusterName, idealState == null ? "NULL" : idealState.toString()); HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseDataAccessor); PropertyKey.Builder keyBuilder = accessor.keyBuilder(); @@ -1715,8 +1715,9 @@ public void addStateModelDef(String clusterName, String stateModelDef, @Override public void addStateModelDef(String clusterName, String stateModelDef, StateModelDefinition stateModel, boolean recreateIfExists) { - logger.info("Add StateModelDef {} in cluster {} with StateModel {}.", stateModelDef, - clusterName, stateModel == null ? "NULL" : stateModel.toString()); + logger + .info("Add StateModelDef {} in cluster {} with StateModel {}.", stateModelDef, clusterName, + stateModel == null ? "NULL" : stateModel.toString()); if (!ZKUtil.isClusterSetup(clusterName, _zkClient)) { throw new HelixException("cluster " + clusterName + " is not setup yet"); } @@ -1876,8 +1877,9 @@ public Map getConfig(HelixConfigScope scope, List keys) @Override public void addCustomizedStateConfig(String clusterName, CustomizedStateConfig customizedStateConfig) { - logger.info("Add CustomizedStateConfig to cluster {}, CustomizedStateConfig is {}", clusterName, - customizedStateConfig.toString()); + logger.info( + "Add CustomizedStateConfig to cluster {}, CustomizedStateConfig is {}", + clusterName, customizedStateConfig.toString()); if (!ZKUtil.isClusterSetup(clusterName, _zkClient)) { throw new HelixException("cluster " + clusterName + " is not setup yet"); @@ -1889,16 +1891,19 @@ public void addCustomizedStateConfig(String clusterName, ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseDataAccessor); PropertyKey.Builder keyBuilder = accessor.keyBuilder(); - accessor.setProperty(keyBuilder.customizedStateConfig(), customizedStateConfigFromBuilder); + accessor.setProperty(keyBuilder.customizedStateConfig(), + customizedStateConfigFromBuilder); } @Override public void removeCustomizedStateConfig(String clusterName) { - logger.info("Remove CustomizedStateConfig from cluster {}.", clusterName); + logger.info( + "Remove CustomizedStateConfig from cluster {}.", clusterName); ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseDataAccessor); PropertyKey.Builder keyBuilder = accessor.keyBuilder(); accessor.removeProperty(keyBuilder.customizedStateConfig()); + } @Override @@ -1908,41 +1913,45 @@ public void addTypeToCustomizedStateConfig(String clusterName, String type) { if (!ZKUtil.isClusterSetup(clusterName, _zkClient)) { throw new HelixException("cluster " + clusterName + " is not setup yet"); } - CustomizedStateConfig.Builder builder = new CustomizedStateConfig.Builder(); + CustomizedStateConfig.Builder builder = + new CustomizedStateConfig.Builder(); builder.addAggregationEnabledType(type); CustomizedStateConfig customizedStateConfigFromBuilder = builder.build(); ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseDataAccessor); PropertyKey.Builder keyBuilder = accessor.keyBuilder(); - if (!accessor.updateProperty(keyBuilder.customizedStateConfig(), + if(!accessor.updateProperty(keyBuilder.customizedStateConfig(), customizedStateConfigFromBuilder)) { throw new HelixException( "Failed to add customized state config type " + type + " to cluster" + clusterName); } } + @Override public void removeTypeFromCustomizedStateConfig(String clusterName, String type) { - logger.info("Remove type {} to CustomizedStateConfig of cluster {}", type, clusterName); + logger.info("Remove type {} to CustomizedStateConfig of cluster {}", type, + clusterName); if (!ZKUtil.isClusterSetup(clusterName, _zkClient)) { throw new HelixException("cluster " + clusterName + " is not setup yet"); } - CustomizedStateConfig.Builder builder = - new CustomizedStateConfig.Builder(_configAccessor.getCustomizedStateConfig(clusterName)); + CustomizedStateConfig.Builder builder = new CustomizedStateConfig.Builder( + _configAccessor.getCustomizedStateConfig(clusterName)); if (!builder.getAggregationEnabledTypes().contains(type)) { - throw new HelixException( - "Type " + type + " is missing from the CustomizedStateConfig of cluster " + clusterName); + throw new HelixException("Type " + type + + " is missing from the CustomizedStateConfig of cluster " + clusterName); } builder.removeAggregationEnabledType(type); CustomizedStateConfig customizedStateConfigFromBuilder = builder.build(); ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseDataAccessor); PropertyKey.Builder keyBuilder = accessor.keyBuilder(); - accessor.setProperty(keyBuilder.customizedStateConfig(), customizedStateConfigFromBuilder); + accessor.setProperty(keyBuilder.customizedStateConfig(), + customizedStateConfigFromBuilder); } @Override @@ -2069,9 +2078,9 @@ void rebalance(String clusterName, String resourceName, int replica, String keyP } if (idealState.getRebalanceMode() != RebalanceMode.FULL_AUTO && idealState.getRebalanceMode() != RebalanceMode.USER_DEFINED) { - ZNRecord newIdealState = - DefaultIdealStateCalculator.calculateIdealState(instanceNames, partitions, replica, - keyPrefix, masterStateValue, slaveStateValue); + ZNRecord newIdealState = DefaultIdealStateCalculator + .calculateIdealState(instanceNames, partitions, replica, keyPrefix, masterStateValue, + slaveStateValue); // for now keep mapField in SEMI_AUTO mode and remove listField in CUSTOMIZED mode if (idealState.getRebalanceMode() == RebalanceMode.SEMI_AUTO) { @@ -2105,7 +2114,8 @@ public void addIdealState(String clusterName, String resourceName, String idealS setResourceIdealState(clusterName, resourceName, new IdealState(idealStateRecord)); } - private static byte[] readFile(String filePath) throws IOException { + private static byte[] readFile(String filePath) + throws IOException { File file = new File(filePath); int size = (int) file.length(); @@ -2128,7 +2138,8 @@ private static byte[] readFile(String filePath) throws IOException { @Override public void addStateModelDef(String clusterName, String stateModelDefName, - String stateModelDefFile) throws IOException { + String stateModelDefFile) + throws IOException { ZNRecord record = (ZNRecord) (new ZNRecordSerializer().deserialize(readFile(stateModelDefFile))); if (record == null || record.getId() == null || !record.getId().equals(stateModelDefName)) { @@ -2227,8 +2238,9 @@ public void rebalance(String clusterName, IdealState currentIdealState, } String[] states = RebalanceUtil.parseStates(clusterName, stateModDef); - ZNRecord newIdealStateRecord = DefaultIdealStateCalculator.convertToZNRecord(balancedRecord, - currentIdealState.getResourceName(), states[0], states[1]); + ZNRecord newIdealStateRecord = DefaultIdealStateCalculator + .convertToZNRecord(balancedRecord, currentIdealState.getResourceName(), states[0], + states[1]); Set partitionSet = new HashSet(); partitionSet.addAll(newIdealStateRecord.getMapFields().keySet()); partitionSet.addAll(newIdealStateRecord.getListFields().keySet()); @@ -2258,8 +2270,8 @@ public void rebalance(String clusterName, IdealState currentIdealState, @Override public void addInstanceTag(String clusterName, String instanceName, String tag) { - logger.info("Add instance tag {} for instance {} in cluster {}.", tag, instanceName, - clusterName); + logger + .info("Add instance tag {} for instance {} in cluster {}.", tag, instanceName, clusterName); if (!ZKUtil.isClusterSetup(clusterName, _zkClient)) { throw new HelixException("cluster " + clusterName + " is not setup yet"); } @@ -2318,8 +2330,8 @@ public void setInstanceZoneId(String clusterName, String instanceName, String zo @Override public void enableBatchMessageMode(String clusterName, boolean enabled) { - logger.info("{} batch message mode for cluster {}.", enabled ? "Enable" : "Disable", - clusterName); + logger + .info("{} batch message mode for cluster {}.", enabled ? "Enable" : "Disable", clusterName); if (!ZKUtil.isClusterSetup(clusterName, _zkClient)) { throw new HelixException("cluster " + clusterName + " is not setup yet"); } @@ -2402,8 +2414,7 @@ public ZNRecord update(ZNRecord currentData) { ClusterConfig clusterConfig = new ClusterConfig(currentData); Map disabledInstances = new TreeMap<>(clusterConfig.getDisabledInstances()); - Map disabledInstancesWithInfo = - new TreeMap<>(clusterConfig.getDisabledInstancesWithInfo()); + Map disabledInstancesWithInfo = new TreeMap<>(clusterConfig.getDisabledInstancesWithInfo()); if (enabled) { disabledInstances.keySet().removeAll(instances); disabledInstancesWithInfo.keySet().removeAll(instances); @@ -2415,8 +2426,8 @@ public ZNRecord update(ZNRecord currentData) { // TODO: update the history ZNode String timeStamp = String.valueOf(System.currentTimeMillis()); disabledInstances.put(disabledInstance, timeStamp); - disabledInstancesWithInfo.put(disabledInstance, - assembleInstanceBatchedDisabledInfo(disabledType, reason, timeStamp)); + disabledInstancesWithInfo + .put(disabledInstance, assembleInstanceBatchedDisabledInfo(disabledType, reason, timeStamp)); } } clusterConfig.setDisabledInstances(disabledInstances); @@ -2505,14 +2516,14 @@ public boolean addResourceWithWeight(String clusterName, IdealState idealState, // Check that all capacity keys in ClusterConfig are set up in every partition in ResourceConfig field if (!validateWeightForResourceConfig(_configAccessor.getClusterConfig(clusterName), resourceConfig, idealState)) { - throw new HelixException(String.format( - "Could not add resource %s with weight! Failed to validate the ResourceConfig!", - idealState.getResourceName())); + throw new HelixException(String + .format("Could not add resource %s with weight! Failed to validate the ResourceConfig!", + idealState.getResourceName())); } // 2. Add the resourceConfig to ZK - _configAccessor.setResourceConfig(clusterName, resourceConfig.getResourceName(), - resourceConfig); + _configAccessor + .setResourceConfig(clusterName, resourceConfig.getResourceName(), resourceConfig); // 3. Add the idealState to ZK setResourceIdealState(clusterName, idealState.getResourceName(), idealState); @@ -2522,9 +2533,8 @@ public boolean addResourceWithWeight(String clusterName, IdealState idealState, PropertyKey.Builder keyBuilder = accessor.keyBuilder(); List liveNodes = accessor.getChildNames(keyBuilder.liveInstances()); - rebalance(clusterName, idealState.getResourceName(), - idealState.getReplicaCount(liveNodes.size()), idealState.getResourceName(), - idealState.getInstanceGroupTag()); + rebalance(clusterName, idealState.getResourceName(), idealState.getReplicaCount(liveNodes.size()), + idealState.getResourceName(), idealState.getInstanceGroupTag()); return true; } @@ -2589,8 +2599,8 @@ public Map validateResourcesForWagedRebalance(String clusterNam PropertyKey.Builder keyBuilder = accessor.keyBuilder(); List instances = accessor.getChildNames(keyBuilder.instanceConfigs()); if (validateInstancesForWagedRebalance(clusterName, instances).containsValue(false)) { - throw new HelixException( - String.format("Instance capacities haven't been configured properly for cluster %s", + throw new HelixException(String + .format("Instance capacities haven't been configured properly for cluster %s", clusterName)); } @@ -2691,9 +2701,9 @@ private boolean validateWeightForResourceConfig(ClusterConfig clusterConfig, } // Loop through all partitions and validate - capacityMap.keySet().forEach( - partitionName -> WagedValidationUtil.validateAndGetPartitionCapacity(partitionName, - resourceConfig, capacityMap, clusterConfig)); + capacityMap.keySet().forEach(partitionName -> WagedValidationUtil + .validateAndGetPartitionCapacity(partitionName, resourceConfig, capacityMap, + clusterConfig)); return true; } @@ -2712,7 +2722,8 @@ public ZKHelixAdmin build() { private Set findTimeoutOfflineInstances(String clusterName, long offlineDuration) { // in case there is no customized timeout value, use the one defined in cluster config if (offlineDuration == ClusterConfig.OFFLINE_DURATION_FOR_PURGE_NOT_SET) { - offlineDuration = _configAccessor.getClusterConfig(clusterName).getOfflineDurationForPurge(); + offlineDuration = + _configAccessor.getClusterConfig(clusterName).getOfflineDurationForPurge(); if (offlineDuration == ClusterConfig.OFFLINE_DURATION_FOR_PURGE_NOT_SET) { return Collections.emptySet(); } From 3686a4232ac76fb2e421cef355a87c78e0da3b45 Mon Sep 17 00:00:00 2001 From: Jacob Deker Date: Wed, 11 Dec 2024 07:16:25 +0100 Subject: [PATCH 4/4] #2965 Atomic ZNode creation on node registration Reformatted changed file according to the "helix format" --- .../apache/helix/manager/zk/ZKHelixAdmin.java | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java index 0dbb6863fa..5ade7d3755 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java @@ -244,16 +244,17 @@ public void addInstance(String clusterName, InstanceConfig instanceConfig) { // existing instance with matching logical ID (see // InstanceUtil.findInstancesWithMatchingLogicalId call above, where finding an incomplete // "INSTANCE" node is a killer) - final List ops = Arrays.asList( - createPersistentNodeOp(PropertyPathBuilder.instance(clusterName, nodeId)), - createPersistentNodeOp(PropertyPathBuilder.instanceMessage(clusterName, nodeId)), - createPersistentNodeOp(PropertyPathBuilder.instanceCurrentState(clusterName, nodeId)), - createPersistentNodeOp(PropertyPathBuilder.instanceTaskCurrentState(clusterName, nodeId)), - createPersistentNodeOp(PropertyPathBuilder.instanceCustomizedState(clusterName, nodeId)), - createPersistentNodeOp(PropertyPathBuilder.instanceError(clusterName, nodeId)), - createPersistentNodeOp(PropertyPathBuilder.instanceStatusUpdate(clusterName, nodeId)), - createPersistentNodeOp(PropertyPathBuilder.instanceHistory(clusterName, nodeId)) - ); + final List ops = + Arrays.asList(createPersistentNodeOp(PropertyPathBuilder.instance(clusterName, nodeId)), + createPersistentNodeOp(PropertyPathBuilder.instanceMessage(clusterName, nodeId)), + createPersistentNodeOp(PropertyPathBuilder.instanceCurrentState(clusterName, nodeId)), + createPersistentNodeOp( + PropertyPathBuilder.instanceTaskCurrentState(clusterName, nodeId)), + createPersistentNodeOp( + PropertyPathBuilder.instanceCustomizedState(clusterName, nodeId)), + createPersistentNodeOp(PropertyPathBuilder.instanceError(clusterName, nodeId)), + createPersistentNodeOp(PropertyPathBuilder.instanceStatusUpdate(clusterName, nodeId)), + createPersistentNodeOp(PropertyPathBuilder.instanceHistory(clusterName, nodeId))); _zkClient.multi(ops); HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseDataAccessor);