diff --git a/helix-common/src/main/java/org/apache/helix/constants/InstanceConstants.java b/helix-common/src/main/java/org/apache/helix/constants/InstanceConstants.java index 85c22c460b..22f6c7c76f 100644 --- a/helix-common/src/main/java/org/apache/helix/constants/InstanceConstants.java +++ b/helix-common/src/main/java/org/apache/helix/constants/InstanceConstants.java @@ -20,21 +20,54 @@ public class InstanceConstants { * TODO: Remove this when the deprecated HELIX_ENABLED is removed. */ public static final Set INSTANCE_DISABLED_OVERRIDABLE_OPERATIONS = - ImmutableSet.of(InstanceOperation.ENABLE, InstanceOperation.DISABLE, InstanceOperation.EVACUATE); + ImmutableSet.of(InstanceOperation.ENABLE, InstanceOperation.EVACUATE); /** * The set of InstanceOperations that are not allowed to be populated in the RoutingTableProvider. */ - public static final Set UNSERVABLE_INSTANCE_OPERATIONS = + public static final Set UNROUTABLE_INSTANCE_OPERATIONS = ImmutableSet.of(InstanceOperation.SWAP_IN, InstanceOperation.UNKNOWN); + @Deprecated public enum InstanceDisabledType { CLOUD_EVENT, USER_OPERATION, DEFAULT_INSTANCE_DISABLE_TYPE } + public enum InstanceOperationSource { + ADMIN(0), USER(1), AUTOMATION(2), DEFAULT(3); + + private final int _priority; + + InstanceOperationSource(int priority) { + _priority = priority; + } + + public int getPriority() { + return _priority; + } + + /** + * Convert from InstanceDisabledType to InstanceOperationSource + * + * @param disabledType InstanceDisabledType + * @return InstanceOperationSource + */ + public static InstanceOperationSource instanceDisabledTypeToInstanceOperationSource( + InstanceDisabledType disabledType) { + switch (disabledType) { + case CLOUD_EVENT: + return InstanceOperationSource.AUTOMATION; + case USER_OPERATION: + return InstanceOperationSource.USER; + default: + return InstanceOperationSource.DEFAULT; + } + } + } + public enum InstanceOperation { /** * Behavior: Replicas will be assigned to the node and will receive 
upward state transitions if diff --git a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java index 84a7154b18..07afb55b6f 100644 --- a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java +++ b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java @@ -310,15 +310,38 @@ void enableInstance(String clusterName, String instanceName, boolean enabled, void enableInstance(String clusterName, List instances, boolean enabled); /** - * Set the instanceOperation field. Setting it to null is equivalent to - * ENABLE. + * Set the instanceOperation of an instance with {@link InstanceConstants.InstanceOperation}. * * @param clusterName The cluster name * @param instanceName The instance name - * @param instanceOperation The instance operation + * @param instanceOperation The instance operation type */ void setInstanceOperation(String clusterName, String instanceName, - @Nullable InstanceConstants.InstanceOperation instanceOperation); + InstanceConstants.InstanceOperation instanceOperation); + + /** + * Set the instanceOperation of an instance with {@link InstanceConstants.InstanceOperation}. + * + * @param clusterName The cluster name + * @param instanceName The instance name + * @param instanceOperation The instance operation type + * @param reason The reason for the operation + */ + void setInstanceOperation(String clusterName, String instanceName, + InstanceConstants.InstanceOperation instanceOperation, String reason); + + /** + * Set the instanceOperation of an instance with {@link InstanceConstants.InstanceOperation}. 
+ * + * @param clusterName The cluster name + * @param instanceName The instance name + * @param instanceOperation The instance operation type + * @param reason The reason for the operation + * @param overrideAll Whether to override all existing instance operations from all other + * instance operations + */ + void setInstanceOperation(String clusterName, String instanceName, + InstanceConstants.InstanceOperation instanceOperation, String reason, boolean overrideAll); /** * Disable or enable a resource diff --git a/helix-core/src/main/java/org/apache/helix/cloud/event/helix/DefaultCloudEventCallbackImpl.java b/helix-core/src/main/java/org/apache/helix/cloud/event/helix/DefaultCloudEventCallbackImpl.java index 04ad4b798a..20c5001164 100644 --- a/helix-core/src/main/java/org/apache/helix/cloud/event/helix/DefaultCloudEventCallbackImpl.java +++ b/helix-core/src/main/java/org/apache/helix/cloud/event/helix/DefaultCloudEventCallbackImpl.java @@ -23,6 +23,8 @@ import org.apache.helix.HelixManager; import org.apache.helix.constants.InstanceConstants; import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.InstanceConfig; +import org.apache.helix.util.InstanceUtil; import org.apache.helix.util.InstanceValidationUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,9 +51,14 @@ public void disableInstance(HelixManager manager, Object eventInfo) { LOG.info("DefaultCloudEventCallbackImpl disable Instance {}", manager.getInstanceName()); if (InstanceValidationUtil .isEnabled(manager.getHelixDataAccessor(), manager.getInstanceName())) { - manager.getClusterManagmentTool() - .enableInstance(manager.getClusterName(), manager.getInstanceName(), false, - InstanceConstants.InstanceDisabledType.CLOUD_EVENT, message); + InstanceUtil.setInstanceOperation(manager.getConfigAccessor(), + manager.getHelixDataAccessor().getBaseDataAccessor(), manager.getClusterName(), + manager.getInstanceName(), + new 
InstanceConfig.InstanceOperation.Builder().setOperation( + InstanceConstants.InstanceOperation.DISABLE) + .setSource(InstanceConstants.InstanceOperationSource.AUTOMATION) + .setReason(message) + .build()); } HelixEventHandlingUtil.updateCloudEventOperationInClusterConfig(manager.getClusterName(), manager.getInstanceName(), manager.getHelixDataAccessor().getBaseDataAccessor(), false, @@ -72,10 +79,13 @@ public void enableInstance(HelixManager manager, Object eventInfo) { HelixEventHandlingUtil .updateCloudEventOperationInClusterConfig(manager.getClusterName(), instanceName, manager.getHelixDataAccessor().getBaseDataAccessor(), true, message); - if (HelixEventHandlingUtil.isInstanceDisabledForCloudEvent(instanceName, accessor)) { - manager.getClusterManagmentTool().enableInstance(manager.getClusterName(), instanceName, true, - InstanceConstants.InstanceDisabledType.CLOUD_EVENT, message); - } + InstanceUtil.setInstanceOperation(manager.getConfigAccessor(), + manager.getHelixDataAccessor().getBaseDataAccessor(), manager.getClusterName(), + manager.getInstanceName(), + new InstanceConfig.InstanceOperation.Builder().setOperation( + InstanceConstants.InstanceOperation.ENABLE) + .setSource(InstanceConstants.InstanceOperationSource.AUTOMATION).setReason(message) + .build()); } /** diff --git a/helix-core/src/main/java/org/apache/helix/cloud/event/helix/HelixEventHandlingUtil.java b/helix-core/src/main/java/org/apache/helix/cloud/event/helix/HelixEventHandlingUtil.java index ee96a13ee7..ceff1d299c 100644 --- a/helix-core/src/main/java/org/apache/helix/cloud/event/helix/HelixEventHandlingUtil.java +++ b/helix-core/src/main/java/org/apache/helix/cloud/event/helix/HelixEventHandlingUtil.java @@ -48,7 +48,10 @@ class HelixEventHandlingUtil { * @param dataAccessor * @return return true only when instance is Helix disabled and the disabled reason in * instanceConfig is cloudEvent + * @deprecated No need to check this if using InstanceOperation and specifying the source as AUTOMATION + 
* when enabling. */ + @Deprecated static boolean isInstanceDisabledForCloudEvent(String instanceName, HelixDataAccessor dataAccessor) { InstanceConfig instanceConfig = diff --git a/helix-core/src/main/java/org/apache/helix/controller/changedetector/trimmer/InstanceConfigTrimmer.java b/helix-core/src/main/java/org/apache/helix/controller/changedetector/trimmer/InstanceConfigTrimmer.java index 45b0dde766..cd2b16f922 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/changedetector/trimmer/InstanceConfigTrimmer.java +++ b/helix-core/src/main/java/org/apache/helix/controller/changedetector/trimmer/InstanceConfigTrimmer.java @@ -19,6 +19,7 @@ * under the License. */ +import java.util.HashMap; import java.util.Map; import java.util.Set; @@ -62,6 +63,21 @@ protected Map> getNonTrimmableFields(InstanceConfig insta return STATIC_TOPOLOGY_RELATED_FIELD_MAP; } + /** + * We should trim HELIX_INSTANCE_OPERATIONS field, it is used to filter instances in the + * BaseControllerDataProvider. That filtering will be used to determine if ResourceChangeSnapshot + * has changed as opposed to checking the actual value of the field. + * + * @param property the instance config + * @return a map contains all non-trimmable field keys that need to be kept. 
+ */ + protected Map> getNonTrimmableKeys(InstanceConfig property) { + Map> nonTrimmableKeys = super.getNonTrimmableKeys(property); + nonTrimmableKeys.get(FieldType.LIST_FIELD) + .remove(InstanceConfigProperty.HELIX_INSTANCE_OPERATIONS.name()); + return nonTrimmableKeys; + } + @Override public InstanceConfig trimProperty(InstanceConfig property) { return new InstanceConfig(doTrim(property)); diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java index a91ae12d27..ce5d3de8c7 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java +++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java @@ -413,14 +413,15 @@ private void updateInstanceSets(Map instanceConfigMap, currentInstanceConfig.getLogicalId(clusterTopologyConfig.getEndNodeType()); newInstanceConfigMapByInstanceOperation.computeIfAbsent( - currentInstanceConfig.getInstanceOperation(), k -> new HashMap<>()) + currentInstanceConfig.getInstanceOperation().getOperation(), + k -> new HashMap<>()) .put(node, currentInstanceConfig); if (currentInstanceConfig.isAssignable()) { newAssignableInstanceConfigMap.put(node, currentInstanceConfig); } - if (currentInstanceConfig.getInstanceOperation() + if (currentInstanceConfig.getInstanceOperation().getOperation() .equals(InstanceConstants.InstanceOperation.SWAP_IN)) { swapInLogicalIdsByInstanceName.put(currentInstanceConfig.getInstanceName(), currentInstanceLogicalId); @@ -1079,7 +1080,8 @@ private void updateDisabledInstances(Collection allInstanceConfi _disabledInstanceSet.clear(); for (InstanceConfig config : allInstanceConfigs) { Map> disabledPartitionMap = config.getDisabledPartitionsMap(); - if (config.getInstanceOperation().equals(InstanceConstants.InstanceOperation.DISABLE)) { + if 
(config.getInstanceOperation().getOperation() + .equals(InstanceConstants.InstanceOperation.DISABLE)) { _disabledInstanceSet.add(config.getInstanceName()); } for (String resource : disabledPartitionMap.keySet()) { diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java index 1db0eccfca..714e9325d1 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java @@ -36,7 +36,6 @@ import org.apache.helix.HelixRebalanceException; import org.apache.helix.constants.InstanceConstants; import org.apache.helix.controller.LogUtil; -import org.apache.helix.controller.common.ResourcesStateMap; import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; import org.apache.helix.controller.pipeline.AbstractBaseStage; import org.apache.helix.controller.pipeline.StageException; @@ -358,11 +357,12 @@ private boolean validateInstancesUnableToAcceptOnlineReplicasLimit(final Resourc if (maxInstancesUnableToAcceptOnlineReplicas >= 0) { // Instead of only checking the offline instances, we consider how many instances in the cluster // are not assignable and live. This is because some instances may be online but have an unassignable - // InstanceOperation such as EVACUATE, DISABLE, or UNKNOWN. We will exclude SWAP_IN instances from + // InstanceOperation such as EVACUATE or DISABLE. We will exclude SWAP_IN and UNKNOWN instances because // they should not account against the capacity of the cluster. 
int instancesUnableToAcceptOnlineReplicas = cache.getInstanceConfigMap().entrySet().stream() - .filter(instanceEntry -> !InstanceConstants.UNSERVABLE_INSTANCE_OPERATIONS.contains( - instanceEntry.getValue().getInstanceOperation())).collect(Collectors.toSet()) + .filter(instanceEntry -> !InstanceConstants.UNROUTABLE_INSTANCE_OPERATIONS.contains( + instanceEntry.getValue().getInstanceOperation().getOperation())) + .collect(Collectors.toSet()) .size() - cache.getEnabledLiveInstances().size(); if (instancesUnableToAcceptOnlineReplicas > maxInstancesUnableToAcceptOnlineReplicas) { String errMsg = String.format( diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java index 3bf23d22ef..da972d682c 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java @@ -109,6 +109,7 @@ public void process(ClusterEvent event) throws Exception { // Only update the currentStateExcludingUnknown if the instance is not in UNKNOWN InstanceOperation. if (instanceConfig == null || !instanceConfig.getInstanceOperation() + .getOperation() .equals(InstanceConstants.InstanceOperation.UNKNOWN)) { // update current states. 
updateCurrentStates(instance, diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java index 8c873b4cdb..39ae9ae67c 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java @@ -42,7 +42,6 @@ import javax.annotation.Nullable; import com.google.common.collect.ImmutableSet; -import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.helix.AccessOption; import org.apache.helix.BaseDataAccessor; import org.apache.helix.ConfigAccessor; @@ -69,7 +68,6 @@ import org.apache.helix.model.ClusterConstraints; import org.apache.helix.model.ClusterConstraints.ConstraintType; import org.apache.helix.model.ClusterStatus; -import org.apache.helix.model.ClusterTopologyConfig; import org.apache.helix.model.ConstraintItem; import org.apache.helix.model.ControllerHistory; import org.apache.helix.model.CurrentState; @@ -89,11 +87,11 @@ import org.apache.helix.model.PauseSignal; import org.apache.helix.model.ResourceConfig; import org.apache.helix.model.StateModelDefinition; -import org.apache.helix.model.builder.HelixConfigScopeBuilder; import org.apache.helix.msdcommon.exception.InvalidRoutingDataException; import org.apache.helix.tools.DefaultIdealStateCalculator; import org.apache.helix.util.ConfigStringUtil; import org.apache.helix.util.HelixUtil; +import org.apache.helix.util.InstanceUtil; import org.apache.helix.util.RebalanceUtil; import org.apache.helix.zookeeper.api.client.HelixZkClient; import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; @@ -125,6 +123,7 @@ public class ZKHelixAdmin implements HelixAdmin { private final RealmAwareZkClient _zkClient; private final ConfigAccessor _configAccessor; + private final BaseDataAccessor _baseDataAccessor; // true if ZKHelixAdmin was instantiated with a RealmAwareZkClient, false otherwise // This is used for 
close() to determine how ZKHelixAdmin should close the underlying ZkClient private final boolean _usesExternalZkClient; @@ -142,6 +141,7 @@ public class ZKHelixAdmin implements HelixAdmin { public ZKHelixAdmin(RealmAwareZkClient zkClient) { _zkClient = zkClient; _configAccessor = new ConfigAccessor(zkClient); + _baseDataAccessor = new ZkBaseDataAccessor<>(zkClient); _usesExternalZkClient = true; } @@ -182,12 +182,14 @@ public ZKHelixAdmin(String zkAddress) { _zkClient = zkClient; _configAccessor = new ConfigAccessor(_zkClient); + _baseDataAccessor = new ZkBaseDataAccessor<>(zkClient); _usesExternalZkClient = false; } private ZKHelixAdmin(RealmAwareZkClient zkClient, boolean usesExternalZkClient) { _zkClient = zkClient; _configAccessor = new ConfigAccessor(_zkClient); + _baseDataAccessor = new ZkBaseDataAccessor<>(zkClient); _usesExternalZkClient = usesExternalZkClient; } @@ -206,7 +208,8 @@ public void addInstance(String clusterName, InstanceConfig instanceConfig) { } List matchingLogicalIdInstances = - findInstancesMatchingLogicalId(clusterName, instanceConfig); + InstanceUtil.findInstancesWithMatchingLogicalId(_configAccessor, clusterName, + instanceConfig); if (matchingLogicalIdInstances.size() > 1) { throw new HelixException( "There are already more than one instance with the same logicalId in the cluster: " @@ -216,17 +219,16 @@ public void addInstance(String clusterName, InstanceConfig instanceConfig) { } InstanceConstants.InstanceOperation attemptedInstanceOperation = - instanceConfig.getInstanceOperation(); + instanceConfig.getInstanceOperation().getOperation(); try { - validateInstanceOperationTransition(instanceConfig, - !matchingLogicalIdInstances.isEmpty() ? 
matchingLogicalIdInstances.get(0) : null, - InstanceConstants.InstanceOperation.UNKNOWN, - attemptedInstanceOperation, clusterName); + InstanceUtil.validateInstanceOperationTransition(_configAccessor, clusterName, instanceConfig, + InstanceConstants.InstanceOperation.UNKNOWN, attemptedInstanceOperation); } catch (HelixException e) { instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.UNKNOWN); logger.error("Failed to add instance " + instanceConfig.getInstanceName() + " to cluster " + clusterName + " with instance operation " + attemptedInstanceOperation + ". Setting INSTANCE_OPERATION to " + instanceConfig.getInstanceOperation() + .getOperation() + " instead.", e); } @@ -240,8 +242,7 @@ public void addInstance(String clusterName, InstanceConfig instanceConfig) { _zkClient.createPersistent(PropertyPathBuilder.instanceError(clusterName, nodeId), true); _zkClient.createPersistent(PropertyPathBuilder.instanceStatusUpdate(clusterName, nodeId), true); _zkClient.createPersistent(PropertyPathBuilder.instanceHistory(clusterName, nodeId), true); - HelixDataAccessor accessor = - new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkClient)); + HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseDataAccessor); PropertyKey.Builder keyBuilder = accessor.keyBuilder(); accessor.setProperty(keyBuilder.participantHistory(nodeId), new ParticipantHistory(nodeId)); } @@ -344,8 +345,7 @@ public InstanceConfig getInstanceConfig(String clusterName, String instanceName) "instance" + instanceName + " does not exist in cluster " + clusterName); } - HelixDataAccessor accessor = - new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkClient)); + HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseDataAccessor); PropertyKey.Builder keyBuilder = accessor.keyBuilder(); return accessor.getProperty(keyBuilder.instanceConfig(instanceName)); @@ -364,8 +364,7 @@ public boolean setInstanceConfig(String clusterName, String 
instanceName, "instance" + instanceName + " does not exist in cluster " + clusterName); } - HelixDataAccessor accessor = - new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkClient)); + HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseDataAccessor); PropertyKey instanceConfigPropertyKey = accessor.keyBuilder().instanceConfig(instanceName); InstanceConfig currentInstanceConfig = accessor.getProperty(instanceConfigPropertyKey); if (!newInstanceConfig.getHostName().equals(currentInstanceConfig.getHostName()) @@ -397,9 +396,6 @@ public void enableInstance(final String clusterName, final String instanceName, // Eventually we will have all instances' enable/disable information in clusterConfig. Now we // update both instanceConfig and clusterConfig in transition period. enableSingleInstance(clusterName, instanceName, enabled, baseAccessor, disabledType, reason); -// enableBatchInstances(clusterName, Collections.singletonList(instanceName), enabled, -// baseAccessor, disabledType, reason); - } @Deprecated @@ -413,62 +409,6 @@ public void enableInstance(String clusterName, List instances, boolean e //enableInstance(clusterName, instances, enabled, null, null); } - private void validateInstanceOperationTransition(InstanceConfig instanceConfig, - InstanceConfig matchingLogicalIdInstance, - InstanceConstants.InstanceOperation currentOperation, - InstanceConstants.InstanceOperation targetOperation, - String clusterName) { - boolean targetStateEnableOrDisable = - targetOperation.equals(InstanceConstants.InstanceOperation.ENABLE) - || targetOperation.equals(InstanceConstants.InstanceOperation.DISABLE); - switch (currentOperation) { - case ENABLE: - case DISABLE: - // ENABLE or DISABLE can be set to ENABLE, DISABLE, or EVACUATE at any time. 
- if (ImmutableSet.of(InstanceConstants.InstanceOperation.ENABLE, - InstanceConstants.InstanceOperation.DISABLE, - InstanceConstants.InstanceOperation.EVACUATE).contains(targetOperation)) { - return; - } - case SWAP_IN: - // We can only ENABLE or DISABLE a SWAP_IN instance if there is an instance with matching logicalId - // with an InstanceOperation set to UNKNOWN. - if ((targetStateEnableOrDisable && (matchingLogicalIdInstance == null - || matchingLogicalIdInstance.getInstanceOperation() - .equals(InstanceConstants.InstanceOperation.UNKNOWN))) || targetOperation.equals( - InstanceConstants.InstanceOperation.UNKNOWN)) { - return; - } - case EVACUATE: - // EVACUATE can only be set to ENABLE or DISABLE when there is no instance with the same - // logicalId in the cluster. - if ((targetStateEnableOrDisable && matchingLogicalIdInstance == null) - || targetOperation.equals(InstanceConstants.InstanceOperation.UNKNOWN)) { - return; - } - case UNKNOWN: - // UNKNOWN can be set to ENABLE or DISABLE when there is no instance with the same logicalId in the cluster - // or the instance with the same logicalId in the cluster has InstanceOperation set to EVACUATE. - // UNKNOWN can be set to SWAP_IN when there is an instance with the same logicalId in the cluster set to ENABLE, - // or DISABLE. 
- if ((targetStateEnableOrDisable && (matchingLogicalIdInstance == null - || matchingLogicalIdInstance.getInstanceOperation() - .equals(InstanceConstants.InstanceOperation.EVACUATE)))) { - return; - } else if (targetOperation.equals(InstanceConstants.InstanceOperation.SWAP_IN) - && matchingLogicalIdInstance != null && !ImmutableSet.of( - InstanceConstants.InstanceOperation.UNKNOWN, - InstanceConstants.InstanceOperation.EVACUATE) - .contains(matchingLogicalIdInstance.getInstanceOperation())) { - return; - } - default: - throw new HelixException( - "InstanceOperation cannot be set to " + targetOperation + " when the instance is in " - + currentOperation + " state"); - } - } - /** * Set the InstanceOperation of an instance in the cluster. * @@ -479,75 +419,57 @@ private void validateInstanceOperationTransition(InstanceConfig instanceConfig, @Override public void setInstanceOperation(String clusterName, String instanceName, @Nullable InstanceConstants.InstanceOperation instanceOperation) { + setInstanceOperation(clusterName, instanceName, instanceOperation, null, false); + } - BaseDataAccessor baseAccessor = new ZkBaseDataAccessor<>(_zkClient); - String path = PropertyPathBuilder.instanceConfig(clusterName, instanceName); - - InstanceConfig instanceConfig = getInstanceConfig(clusterName, instanceName); - if (instanceConfig == null) { - throw new HelixException("Cluster " + clusterName + ", instance: " + instanceName - + ", instance config does not exist"); - } - List matchingLogicalIdInstances = - findInstancesMatchingLogicalId(clusterName, instanceConfig); - validateInstanceOperationTransition(instanceConfig, - !matchingLogicalIdInstances.isEmpty() ? matchingLogicalIdInstances.get(0) : null, - instanceConfig.getInstanceOperation(), - instanceOperation == null ? 
InstanceConstants.InstanceOperation.ENABLE : instanceOperation, - clusterName); - - boolean succeeded = baseAccessor.update(path, new DataUpdater() { - @Override - public ZNRecord update(ZNRecord currentData) { - if (currentData == null) { - throw new HelixException( - "Cluster: " + clusterName + ", instance: " + instanceName + ", participant config is null"); - } - - InstanceConfig config = new InstanceConfig(currentData); - config.setInstanceOperation(instanceOperation); - return config.getRecord(); - } - }, AccessOption.PERSISTENT); + /** + * Set the instanceOperation of an instance with {@link InstanceConstants.InstanceOperation}. + * + * @param clusterName The cluster name + * @param instanceName The instance name + * @param instanceOperation The instance operation type + * @param reason The reason for the operation + */ + @Override + public void setInstanceOperation(String clusterName, String instanceName, + @Nullable InstanceConstants.InstanceOperation instanceOperation, String reason) { + setInstanceOperation(clusterName, instanceName, instanceOperation, reason, false); + } - if (!succeeded) { - throw new HelixException("Failed to update instance operation. Please check if instance is disabled."); - } + /** + * Set the instanceOperation of an instance with {@link InstanceConstants.InstanceOperation}. + * + * @param clusterName The cluster name + * @param instanceName The instance name + * @param instanceOperation The instance operation type + * @param reason The reason for the operation + * @param overrideAll Whether to override all existing instance operations from all other + * instance operations + */ + @Override + public void setInstanceOperation(String clusterName, String instanceName, + @Nullable InstanceConstants.InstanceOperation instanceOperation, String reason, + boolean overrideAll) { + InstanceConfig.InstanceOperation instanceOperationObj = + new InstanceConfig.InstanceOperation.Builder().setOperation( + instanceOperation == null ? 
InstanceConstants.InstanceOperation.ENABLE + : instanceOperation).setReason(reason).setSource( + overrideAll ? InstanceConstants.InstanceOperationSource.ADMIN + : InstanceConstants.InstanceOperationSource.USER).build(); + InstanceUtil.setInstanceOperation(_configAccessor, _baseDataAccessor, clusterName, instanceName, + instanceOperationObj); } @Override public boolean isEvacuateFinished(String clusterName, String instanceName) { if (!instanceHasFullAutoCurrentStateOrMessage(clusterName, instanceName)) { InstanceConfig config = getInstanceConfig(clusterName, instanceName); - return config != null && config.getInstanceOperation() + return config != null && config.getInstanceOperation().getOperation() .equals(InstanceConstants.InstanceOperation.EVACUATE); } return false; } - /** - * Find the instance that the passed instance has a matching logicalId with. - * - * @param clusterName The cluster name - * @param instanceConfig The instance to find the matching instance for - * @return The matching instance if found, null otherwise. - */ - private List findInstancesMatchingLogicalId(String clusterName, - InstanceConfig instanceConfig) { - String logicalIdKey = - ClusterTopologyConfig.createFromClusterConfig(_configAccessor.getClusterConfig(clusterName)) - .getEndNodeType(); - return getConfigKeys( - new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.PARTICIPANT, - clusterName).build()).stream() - .map(instanceName -> getInstanceConfig(clusterName, instanceName)).filter( - potentialInstanceConfig -> - !potentialInstanceConfig.getInstanceName().equals(instanceConfig.getInstanceName()) - && potentialInstanceConfig.getLogicalId(logicalIdKey) - .equals(instanceConfig.getLogicalId(logicalIdKey))) - .collect(Collectors.toList()); - } - /** * Check to see if swapping between two instances is ready to be completed. Checks: 1. Both * instances must be alive. 2. 
Both instances must only have one session and not be carrying over @@ -563,7 +485,7 @@ private List findInstancesMatchingLogicalId(String clusterName, */ private boolean canCompleteSwap(String clusterName, String swapOutInstanceName, String swapInInstanceName) { - BaseDataAccessor baseAccessor = new ZkBaseDataAccessor(_zkClient); + BaseDataAccessor baseAccessor = _baseDataAccessor; HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor); PropertyKey.Builder keyBuilder = accessor.keyBuilder(); @@ -579,8 +501,8 @@ private boolean canCompleteSwap(String clusterName, String swapOutInstanceName, "SwapOutInstance {} is {} + {} and SwapInInstance {} is OFFLINE + {} for cluster {}. Swap will" + " not complete unless SwapInInstance instance is ONLINE.", swapOutInstanceName, swapOutLiveInstance != null ? "ONLINE" : "OFFLINE", - swapOutInstanceConfig.getInstanceOperation(), swapInInstanceName, - swapInInstanceConfig.getInstanceOperation(), clusterName); + swapOutInstanceConfig.getInstanceOperation().getOperation(), swapInInstanceName, + swapInInstanceConfig.getInstanceOperation().getOperation(), clusterName); return false; } @@ -619,7 +541,7 @@ private boolean canCompleteSwap(String clusterName, String swapOutInstanceName, // 4. If the swap-out instance is not alive or is disabled, we return true without checking // the current states on the swap-in instance. 
- if (swapOutLiveInstance == null || swapOutInstanceConfig.getInstanceOperation() + if (swapOutLiveInstance == null || swapOutInstanceConfig.getInstanceOperation().getOperation() .equals(InstanceConstants.InstanceOperation.DISABLE)) { return true; } @@ -697,7 +619,8 @@ public boolean canCompleteSwap(String clusterName, String instanceName) { } List swappingInstances = - findInstancesMatchingLogicalId(clusterName, instanceConfig); + InstanceUtil.findInstancesWithMatchingLogicalId(_configAccessor, clusterName, + instanceConfig); if (swappingInstances.size() != 1) { logger.warn( "Instance {} in cluster {} is not swapping with any other instance. Cannot determine if the swap is complete.", @@ -705,10 +628,10 @@ public boolean canCompleteSwap(String clusterName, String instanceName) { return false; } - InstanceConfig swapOutInstanceConfig = - !instanceConfig.getInstanceOperation().equals(InstanceConstants.InstanceOperation.SWAP_IN) + InstanceConfig swapOutInstanceConfig = !instanceConfig.getInstanceOperation().getOperation() + .equals(InstanceConstants.InstanceOperation.SWAP_IN) ? instanceConfig : swappingInstances.get(0); - InstanceConfig swapInInstanceConfig = instanceConfig.getInstanceOperation() + InstanceConfig swapInInstanceConfig = instanceConfig.getInstanceOperation().getOperation() .equals(InstanceConstants.InstanceOperation.SWAP_IN) ? instanceConfig : swappingInstances.get(0); if (swapOutInstanceConfig == null || swapInInstanceConfig == null) { @@ -735,7 +658,8 @@ public boolean completeSwapIfPossible(String clusterName, String instanceName, } List swappingInstances = - findInstancesMatchingLogicalId(clusterName, instanceConfig); + InstanceUtil.findInstancesWithMatchingLogicalId(_configAccessor, clusterName, + instanceConfig); if (swappingInstances.size() != 1) { logger.warn( "Instance {} in cluster {} is not swapping with any other instance. 
Cannot determine if the swap is complete.", @@ -743,10 +667,10 @@ public boolean completeSwapIfPossible(String clusterName, String instanceName, return false; } - InstanceConfig swapOutInstanceConfig = - !instanceConfig.getInstanceOperation().equals(InstanceConstants.InstanceOperation.SWAP_IN) + InstanceConfig swapOutInstanceConfig = !instanceConfig.getInstanceOperation().getOperation() + .equals(InstanceConstants.InstanceOperation.SWAP_IN) ? instanceConfig : swappingInstances.get(0); - InstanceConfig swapInInstanceConfig = instanceConfig.getInstanceOperation() + InstanceConfig swapInInstanceConfig = instanceConfig.getInstanceOperation().getOperation() .equals(InstanceConstants.InstanceOperation.SWAP_IN) ? instanceConfig : swappingInstances.get(0); if (swapOutInstanceConfig == null || swapInInstanceConfig == null) { @@ -802,7 +726,7 @@ public boolean isReadyForPreparingJoiningCluster(String clusterName, String inst if (!instanceHasFullAutoCurrentStateOrMessage(clusterName, instanceName)) { InstanceConfig config = getInstanceConfig(clusterName, instanceName); return config != null && INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT.contains( - config.getInstanceOperation()); + config.getInstanceOperation().getOperation()); } return false; } @@ -816,7 +740,7 @@ public boolean isReadyForPreparingJoiningCluster(String clusterName, String inst */ private boolean instanceHasFullAutoCurrentStateOrMessage(String clusterName, String instanceName) { - HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkClient)); + HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseDataAccessor); PropertyKey.Builder keyBuilder = accessor.keyBuilder(); // check the instance is alive @@ -827,7 +751,7 @@ private boolean instanceHasFullAutoCurrentStateOrMessage(String clusterName, return false; } - BaseDataAccessor baseAccessor = new ZkBaseDataAccessor(_zkClient); + BaseDataAccessor baseAccessor = _baseDataAccessor; // count number of 
sessions under CurrentState folder. If it is carrying over from prv session, // then there are > 1 session ZNodes. List sessions = baseAccessor.getChildNames(PropertyPathBuilder.instanceCurrentState(clusterName, instanceName), 0); @@ -867,7 +791,7 @@ private boolean instanceHasFullAutoCurrentStateOrMessage(String clusterName, public void enableResource(final String clusterName, final String resourceName, final boolean enabled) { logger.info("{} resource {} in cluster {}.", enabled ? "Enable" : "Disable", resourceName, clusterName); String path = PropertyPathBuilder.idealState(clusterName, resourceName); - BaseDataAccessor baseAccessor = new ZkBaseDataAccessor(_zkClient); + BaseDataAccessor baseAccessor = _baseDataAccessor; if (!baseAccessor.exists(path, 0)) { throw new HelixException("Cluster " + clusterName + ", resource: " + resourceName + ", ideal-state does not exist"); @@ -894,7 +818,7 @@ public void enablePartition(final boolean enabled, final String clusterName, instanceName, clusterName); String path = PropertyPathBuilder.instanceConfig(clusterName, instanceName); - BaseDataAccessor baseAccessor = new ZkBaseDataAccessor(_zkClient); + BaseDataAccessor baseAccessor = _baseDataAccessor; // check instanceConfig exists if (!baseAccessor.exists(path, 0)) { @@ -973,8 +897,7 @@ public void enableCluster(String clusterName, boolean enabled) { public void enableCluster(String clusterName, boolean enabled, String reason) { logger.info("{} cluster {} for reason {}.", enabled ? "Enable" : "Disable", clusterName, reason == null ? 
"NULL" : reason); - HelixDataAccessor accessor = - new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkClient)); + HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseDataAccessor); PropertyKey.Builder keyBuilder = accessor.keyBuilder(); if (enabled) { @@ -998,8 +921,7 @@ public void enableMaintenanceMode(String clusterName, boolean enabled) { @Override public boolean isInMaintenanceMode(String clusterName) { - HelixDataAccessor accessor = - new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkClient)); + HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseDataAccessor); PropertyKey.Builder keyBuilder = accessor.keyBuilder(); return accessor.getBaseDataAccessor() .exists(keyBuilder.maintenance().getPath(), AccessOption.PERSISTENT); @@ -1248,8 +1170,7 @@ private void processMaintenanceMode(String clusterName, final boolean enabled, final String reason, final MaintenanceSignal.AutoTriggerReason internalReason, final Map customFields, final MaintenanceSignal.TriggeringEntity triggeringEntity) { - HelixDataAccessor accessor = - new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkClient)); + HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseDataAccessor); PropertyKey.Builder keyBuilder = accessor.keyBuilder(); logger.info("Cluster {} {} {} maintenance mode for reason {}.", clusterName, triggeringEntity == MaintenanceSignal.TriggeringEntity.CONTROLLER ? 
"automatically" @@ -1512,8 +1433,7 @@ public List getInstancesInClusterWithTag(String clusterName, String tag) List instances = _zkClient.getChildren(memberInstancesPath); List result = new ArrayList(); - HelixDataAccessor accessor = - new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkClient)); + HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseDataAccessor); PropertyKey.Builder keyBuilder = accessor.keyBuilder(); for (String instanceName : instances) { @@ -1659,8 +1579,7 @@ public List getResourcesInCluster(String clusterName) { public List getResourcesInClusterWithTag(String clusterName, String tag) { List resourcesWithTag = new ArrayList(); - HelixDataAccessor accessor = - new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkClient)); + HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseDataAccessor); PropertyKey.Builder keyBuilder = accessor.keyBuilder(); for (String resourceName : getResourcesInCluster(clusterName)) { @@ -1675,8 +1594,7 @@ public List getResourcesInClusterWithTag(String clusterName, String tag) @Override public IdealState getResourceIdealState(String clusterName, String resourceName) { - HelixDataAccessor accessor = - new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkClient)); + HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseDataAccessor); PropertyKey.Builder keyBuilder = accessor.keyBuilder(); return accessor.getProperty(keyBuilder.idealStates(resourceName)); @@ -1688,8 +1606,7 @@ public void setResourceIdealState(String clusterName, String resourceName, logger .info("Set IdealState for resource {} in cluster {} with new IdealState {}.", resourceName, clusterName, idealState == null ? 
"NULL" : idealState.toString()); - HelixDataAccessor accessor = - new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkClient)); + HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseDataAccessor); PropertyKey.Builder keyBuilder = accessor.keyBuilder(); accessor.setProperty(keyBuilder.idealStates(resourceName), idealState); @@ -1731,8 +1648,7 @@ public void removeFromIdealState(String clusterName, String resourceName, IdealS @Override public ExternalView getResourceExternalView(String clusterName, String resourceName) { - HelixDataAccessor accessor = - new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkClient)); + HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseDataAccessor); PropertyKey.Builder keyBuilder = accessor.keyBuilder(); return accessor.getProperty(keyBuilder.externalView(resourceName)); } @@ -1740,8 +1656,7 @@ public ExternalView getResourceExternalView(String clusterName, String resourceN @Override public CustomizedView getResourceCustomizedView(String clusterName, String resourceName, String customizedStateType) { - HelixDataAccessor accessor = - new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkClient)); + HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseDataAccessor); PropertyKey.Builder keyBuilder = accessor.keyBuilder(); return accessor.getProperty(keyBuilder.customizedView(customizedStateType, resourceName)); } @@ -1774,8 +1689,7 @@ public void addStateModelDef(String clusterName, String stateModelDef, } } - HelixDataAccessor accessor = - new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkClient)); + HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseDataAccessor); PropertyKey.Builder keyBuilder = accessor.keyBuilder(); accessor.setProperty(keyBuilder.stateModelDef(stateModelDef), stateModel); } @@ -1786,8 +1700,7 @@ public void dropResource(String clusterName, String resourceName) { if 
(!ZKUtil.isClusterSetup(clusterName, _zkClient)) { throw new HelixException("Cluster " + clusterName + " is not setup yet"); } - HelixDataAccessor accessor = - new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkClient)); + HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseDataAccessor); PropertyKey.Builder keyBuilder = accessor.keyBuilder(); accessor.removeProperty(keyBuilder.idealStates(resourceName)); @@ -1806,8 +1719,7 @@ public void addCloudConfig(String clusterName, CloudConfig cloudConfig) { CloudConfig.Builder builder = new CloudConfig.Builder(cloudConfig); CloudConfig cloudConfigBuilder = builder.build(); - ZKHelixDataAccessor accessor = - new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkClient)); + ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseDataAccessor); PropertyKey.Builder keyBuilder = accessor.keyBuilder(); accessor.setProperty(keyBuilder.cloudConfig(), cloudConfigBuilder); } @@ -1815,8 +1727,7 @@ public void addCloudConfig(String clusterName, CloudConfig cloudConfig) { @Override public void removeCloudConfig(String clusterName) { logger.info("Remove Cloud Config for cluster {}.", clusterName); - HelixDataAccessor accessor = - new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkClient)); + HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseDataAccessor); PropertyKey.Builder keyBuilder = accessor.keyBuilder(); accessor.removeProperty(keyBuilder.cloudConfig()); } @@ -1847,8 +1758,7 @@ public List getStateModelDefs(String clusterName) { @Override public StateModelDefinition getStateModelDef(String clusterName, String stateModelName) { - HelixDataAccessor accessor = - new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkClient)); + HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseDataAccessor); PropertyKey.Builder keyBuilder = accessor.keyBuilder(); return accessor.getProperty(keyBuilder.stateModelDef(stateModelName)); 
@@ -1857,8 +1767,7 @@ public StateModelDefinition getStateModelDef(String clusterName, String stateMod @Override public void dropCluster(String clusterName) { logger.info("Deleting cluster {}.", clusterName); - HelixDataAccessor accessor = - new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkClient)); + HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseDataAccessor); PropertyKey.Builder keyBuilder = accessor.keyBuilder(); String root = "/" + clusterName; @@ -1935,8 +1844,7 @@ public void addCustomizedStateConfig(String clusterName, new CustomizedStateConfig.Builder(customizedStateConfig); CustomizedStateConfig customizedStateConfigFromBuilder = builder.build(); - ZKHelixDataAccessor accessor = - new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkClient)); + ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseDataAccessor); PropertyKey.Builder keyBuilder = accessor.keyBuilder(); accessor.setProperty(keyBuilder.customizedStateConfig(), customizedStateConfigFromBuilder); @@ -1947,8 +1855,7 @@ public void removeCustomizedStateConfig(String clusterName) { logger.info( "Remove CustomizedStateConfig from cluster {}.", clusterName); - ZKHelixDataAccessor accessor = - new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkClient)); + ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseDataAccessor); PropertyKey.Builder keyBuilder = accessor.keyBuilder(); accessor.removeProperty(keyBuilder.customizedStateConfig()); @@ -1967,8 +1874,7 @@ public void addTypeToCustomizedStateConfig(String clusterName, String type) { builder.addAggregationEnabledType(type); CustomizedStateConfig customizedStateConfigFromBuilder = builder.build(); - ZKHelixDataAccessor accessor = - new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkClient)); + ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseDataAccessor); PropertyKey.Builder keyBuilder = accessor.keyBuilder(); 
if(!accessor.updateProperty(keyBuilder.customizedStateConfig(), customizedStateConfigFromBuilder)) { @@ -1997,8 +1903,7 @@ public void removeTypeFromCustomizedStateConfig(String clusterName, String type) builder.removeAggregationEnabledType(type); CustomizedStateConfig customizedStateConfigFromBuilder = builder.build(); - ZKHelixDataAccessor accessor = - new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkClient)); + ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseDataAccessor); PropertyKey.Builder keyBuilder = accessor.keyBuilder(); accessor.setProperty(keyBuilder.customizedStateConfig(), customizedStateConfigFromBuilder); @@ -2021,7 +1926,7 @@ public void rebalance(String clusterName, String resourceName, int replica) { @Override public void onDemandRebalance(String clusterName) { - BaseDataAccessor baseAccessor = new ZkBaseDataAccessor(_zkClient); + BaseDataAccessor baseAccessor = _baseDataAccessor; String path = PropertyPathBuilder.clusterConfig(clusterName); if (!baseAccessor.exists(path, 0)) { @@ -2204,7 +2109,7 @@ public void setConstraint(String clusterName, final ConstraintType constraintTyp final String constraintId, final ConstraintItem constraintItem) { logger.info("Set constraint type {} with constraint id {} for cluster {}.", constraintType, constraintId, clusterName); - BaseDataAccessor baseAccessor = new ZkBaseDataAccessor(_zkClient); + BaseDataAccessor baseAccessor = _baseDataAccessor; PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName); String path = keyBuilder.constraint(constraintType.toString()).getPath(); @@ -2227,7 +2132,7 @@ public void removeConstraint(String clusterName, final ConstraintType constraint final String constraintId) { logger.info("Remove constraint type {} with constraint id {} for cluster {}.", constraintType, constraintId, clusterName); - BaseDataAccessor baseAccessor = new ZkBaseDataAccessor(_zkClient); + BaseDataAccessor baseAccessor = _baseDataAccessor; 
PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName); String path = keyBuilder.constraint(constraintType.toString()).getPath(); @@ -2248,8 +2153,7 @@ public ZNRecord update(ZNRecord currentData) { @Override public ClusterConstraints getConstraints(String clusterName, ConstraintType constraintType) { - HelixDataAccessor accessor = - new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkClient)); + HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseDataAccessor); PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName); return accessor.getProperty(keyBuilder.constraint(constraintType.toString())); @@ -2331,8 +2235,7 @@ public void addInstanceTag(String clusterName, String instanceName, String tag) throw new HelixException( "cluster " + clusterName + " instance " + instanceName + " is not setup yet"); } - HelixDataAccessor accessor = - new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkClient)); + HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseDataAccessor); PropertyKey.Builder keyBuilder = accessor.keyBuilder(); InstanceConfig config = accessor.getProperty(keyBuilder.instanceConfig(instanceName)); @@ -2352,8 +2255,7 @@ public void removeInstanceTag(String clusterName, String instanceName, String ta throw new HelixException( "cluster " + clusterName + " instance " + instanceName + " is not setup yet"); } - ZKHelixDataAccessor accessor = - new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkClient)); + ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseDataAccessor); PropertyKey.Builder keyBuilder = accessor.keyBuilder(); InstanceConfig config = accessor.getProperty(keyBuilder.instanceConfig(instanceName)); @@ -2373,8 +2275,7 @@ public void setInstanceZoneId(String clusterName, String instanceName, String zo throw new HelixException( "cluster " + clusterName + " instance " + instanceName + " is not setup yet"); } - HelixDataAccessor accessor = 
- new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkClient)); + HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseDataAccessor); PropertyKey.Builder keyBuilder = accessor.keyBuilder(); InstanceConfig config = accessor.getProperty(keyBuilder.instanceConfig(instanceName)); @@ -2431,23 +2332,19 @@ public ZNRecord update(ZNRecord currentData) { } InstanceConfig config = new InstanceConfig(currentData); - config.setInstanceEnabled(enabled); - if (!enabled) { - // new disabled type and reason will overwrite existing ones. - config.resetInstanceDisabledTypeAndReason(); - if (reason != null) { - config.setInstanceDisabledReason(reason); - } - if (disabledType != null) { - config.setInstanceDisabledType(disabledType); - } - } + config.setInstanceOperation(new InstanceConfig.InstanceOperation.Builder().setOperation( + enabled ? InstanceConstants.InstanceOperation.ENABLE + : InstanceConstants.InstanceOperation.DISABLE).setReason(reason).setSource( + disabledType != null + ? InstanceConstants.InstanceOperationSource.instanceDisabledTypeToInstanceOperationSource( + disabledType) : null).build()); return config.getRecord(); } }, AccessOption.PERSISTENT); } // TODO: Add history ZNode for all batched enabling/disabling histories with metadata. 
+ @Deprecated private void enableBatchInstances(final String clusterName, final List instances, final boolean enabled, BaseDataAccessor baseAccessor, InstanceConstants.InstanceDisabledType disabledType, String reason) { @@ -2783,8 +2680,7 @@ private Set findTimeoutOfflineInstances(String clusterName, long offline } } - HelixDataAccessor accessor = - new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkClient)); + HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseDataAccessor); PropertyKey.Builder keyBuilder = accessor.keyBuilder(); List instanceConfigNames = accessor.getChildNames(keyBuilder.instanceConfigs()); List instancePathNames = accessor.getChildNames(keyBuilder.instances()); diff --git a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java index de41646c39..1b3acd68d6 100644 --- a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java +++ b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java @@ -29,6 +29,11 @@ import java.util.Set; import java.util.stream.Collectors; +import javax.annotation.Nullable; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableSet; import org.apache.helix.HelixException; import org.apache.helix.HelixProperty; @@ -48,11 +53,13 @@ public class InstanceConfig extends HelixProperty { * Configurable characteristics of an instance */ public enum InstanceConfigProperty { - HELIX_HOST, HELIX_PORT, HELIX_ZONE_ID, @Deprecated - HELIX_ENABLED, + HELIX_HOST, + HELIX_PORT, + HELIX_ZONE_ID, + @Deprecated HELIX_ENABLED, HELIX_ENABLED_TIMESTAMP, - HELIX_DISABLED_REASON, - HELIX_DISABLED_TYPE, + @Deprecated HELIX_DISABLED_REASON, + @Deprecated HELIX_DISABLED_TYPE, HELIX_DISABLED_PARTITION, TAG_LIST, INSTANCE_WEIGHT, @@ -60,15 +67,128 @@ public enum 
InstanceConfigProperty { DELAY_REBALANCE_ENABLED, MAX_CONCURRENT_TASK, INSTANCE_INFO_MAP, - INSTANCE_CAPACITY_MAP, - TARGET_TASK_THREAD_POOL_SIZE, - INSTANCE_OPERATION + INSTANCE_CAPACITY_MAP, TARGET_TASK_THREAD_POOL_SIZE, HELIX_INSTANCE_OPERATIONS + } + + public static class InstanceOperation { + private final Map _properties; + + private enum InstanceOperationProperties { + OPERATION, REASON, SOURCE, TIMESTAMP + } + + private InstanceOperation(@Nullable Map properties) { + // Default to ENABLE operation if no operation type is provided. + _properties = properties == null ? new HashMap<>() : properties; + if (!_properties.containsKey(InstanceOperationProperties.OPERATION.name())) { + _properties.put(InstanceOperationProperties.OPERATION.name(), + InstanceConstants.InstanceOperation.ENABLE.name()); + } + } + + public static class Builder { + private Map _properties = new HashMap<>(); + + /** + * Set the operation type for this instance operation. + * @param operationType InstanceOperation type of this instance operation. + */ + public Builder setOperation(@Nullable InstanceConstants.InstanceOperation operationType) { + _properties.put(InstanceOperationProperties.OPERATION.name(), + operationType == null ? InstanceConstants.InstanceOperation.ENABLE.name() + : operationType.name()); + return this; + } + + /** + * Set the reason for this instance operation. + * @param reason + */ + public Builder setReason(String reason) { + _properties.put(InstanceOperationProperties.REASON.name(), reason != null ? reason : ""); + return this; + } + + /** + * Set the source for this instance operation. + * @param source InstanceOperationSource + * that caused this instance operation to be triggered. + */ + public Builder setSource(InstanceConstants.InstanceOperationSource source) { + _properties.put(InstanceOperationProperties.SOURCE.name(), + source == null ? 
InstanceConstants.InstanceOperationSource.USER.name() + : source.name()); + return this; + } + + public InstanceOperation build() throws IllegalArgumentException { + if (!_properties.containsKey(InstanceOperationProperties.OPERATION.name())) { + throw new IllegalArgumentException( + "Instance operation type is not set, this is a required field."); + } + _properties.put(InstanceOperationProperties.TIMESTAMP.name(), + String.valueOf(System.currentTimeMillis())); + return new InstanceOperation(_properties); + } + } + + /** + * Get the operation type of this instance operation. + * @return the InstanceOperation type + */ + public InstanceConstants.InstanceOperation getOperation() throws IllegalArgumentException { + return InstanceConstants.InstanceOperation.valueOf( + _properties.get(InstanceOperationProperties.OPERATION.name())); + } + + /** + * Get the reason for this instance operation. + * If the reason is not set, it will default to an empty string. + * + * @return the reason for this instance operation. + */ + public String getReason() { + return _properties.getOrDefault(InstanceOperationProperties.REASON.name(), ""); + } + + /** + * Get the InstanceOperationSource + * that caused this instance operation to be triggered. + * If the source is not set, it will default to DEFAULT. + * + * @return the InstanceOperationSource + *that caused this instance operation to be triggered. + */ + public InstanceConstants.InstanceOperationSource getSource() { + return InstanceConstants.InstanceOperationSource.valueOf( + _properties.getOrDefault(InstanceOperationProperties.SOURCE.name(), + InstanceConstants.InstanceOperationSource.USER.name())); + } + + /** + * Get the timestamp (milliseconds from epoch) when this instance operation was triggered. + * + * @return the timestamp when the instance operation was triggered. 
+ */ + public long getTimestamp() { + return Long.parseLong(_properties.get(InstanceOperationProperties.TIMESTAMP.name())); + } + + private void setTimestamp(long timestamp) { + _properties.put(InstanceOperationProperties.TIMESTAMP.name(), String.valueOf(timestamp)); + } + + private Map getProperties() { + return _properties; + } } public static final int WEIGHT_NOT_SET = -1; public static final int MAX_CONCURRENT_TASK_NOT_SET = -1; private static final int TARGET_TASK_THREAD_POOL_SIZE_NOT_SET = -1; private static final boolean HELIX_ENABLED_DEFAULT_VALUE = true; + private static final long HELIX_ENABLED_TIMESTAMP_DEFAULT_VALUE = -1; + private static final ObjectMapper _objectMapper = new ObjectMapper(); // These fields are not allowed to be overwritten by the merge method because // they are unique properties of an instance. @@ -79,6 +199,8 @@ public enum InstanceConfigProperty { private static final Logger _logger = LoggerFactory.getLogger(InstanceConfig.class.getName()); + private List _deserializedInstanceOperations; + /** * Instantiate for a specific instance * @param instanceId the instance identifier @@ -264,25 +386,28 @@ public boolean containsTag(String tag) { * enabled/disabled, return -1. */ public long getInstanceEnabledTime() { - return _record.getLongField(InstanceConfigProperty.HELIX_ENABLED_TIMESTAMP.name(), -1); + return _record.getLongField(InstanceConfigProperty.HELIX_ENABLED_TIMESTAMP.name(), + HELIX_ENABLED_TIMESTAMP_DEFAULT_VALUE); } /** * Set the enabled state of the instance If user enables the instance, HELIX_DISABLED_REASON filed * will be removed. - * @deprecated This method is deprecated. Please use setInstanceOperation instead. * @param enabled true to enable, false to disable + * @deprecated This method is deprecated. Please use setInstanceOperation instead. */ @Deprecated public void setInstanceEnabled(boolean enabled) { // set instance operation only when we need to change InstanceEnabled value. 
- setInstanceEnabledHelper(enabled); + setInstanceEnabledHelper(enabled, null); } - private void setInstanceEnabledHelper(boolean enabled) { - _record.setBooleanField(InstanceConfigProperty.HELIX_ENABLED.toString(), enabled); - _record.setLongField(InstanceConfigProperty.HELIX_ENABLED_TIMESTAMP.name(), System.currentTimeMillis()); + private void setInstanceEnabledHelper(boolean enabled, Long timestampOverride) { + _record.setBooleanField(InstanceConfigProperty.HELIX_ENABLED.name(), enabled); + _record.setLongField(InstanceConfigProperty.HELIX_ENABLED_TIMESTAMP.name(), + timestampOverride != null ? timestampOverride : System.currentTimeMillis()); if (enabled) { + // TODO: Replace this when HELIX_ENABLED and HELIX_DISABLED_REASON is removed. resetInstanceDisabledTypeAndReason(); } } @@ -290,6 +415,7 @@ private void setInstanceEnabledHelper(boolean enabled) { /** * Removes HELIX_DISABLED_REASON and HELIX_DISABLED_TYPE entry from simple field. */ + @Deprecated public void resetInstanceDisabledTypeAndReason() { _record.getSimpleFields().remove(InstanceConfigProperty.HELIX_DISABLED_REASON.name()); _record.getSimpleFields().remove(InstanceConfigProperty.HELIX_DISABLED_TYPE.name()); @@ -298,19 +424,25 @@ public void resetInstanceDisabledTypeAndReason() { /** * Set the instance disabled reason when instance is disabled. * It will be a no-op when instance is enabled. + * @deprecated This method is deprecated. Please use . */ + @Deprecated public void setInstanceDisabledReason(String disabledReason) { - if (getInstanceOperation().equals(InstanceConstants.InstanceOperation.DISABLE)) { - _record.setSimpleField(InstanceConfigProperty.HELIX_DISABLED_REASON.name(), disabledReason); - } + if (getInstanceOperation().getOperation().equals(InstanceConstants.InstanceOperation.DISABLE)) { + _record.setSimpleField(InstanceConfigProperty.HELIX_DISABLED_REASON.name(), disabledReason); + } } /** * Set the instance disabled type when instance is disabled. 
* It will be a no-op when instance is enabled. + * @deprecated This method is deprecated. Please use setInstanceOperation along with + * InstanceOperation.Builder().setSource + *(...) */ + @Deprecated public void setInstanceDisabledType(InstanceConstants.InstanceDisabledType disabledType) { - if (getInstanceOperation().equals(InstanceConstants.InstanceOperation.DISABLE)) { + if (getInstanceOperation().getOperation().equals(InstanceConstants.InstanceOperation.DISABLE)) { _record.setSimpleField(InstanceConfigProperty.HELIX_DISABLED_TYPE.name(), disabledType.name()); } @@ -319,7 +451,9 @@ public void setInstanceDisabledType(InstanceConstants.InstanceDisabledType disab /** * Get the instance disabled reason when instance is disabled. * @return Return instance disabled reason. Default is am empty string. + * @deprecated This method is deprecated. Please use getInstanceOperation().getReason() instead. */ + @Deprecated public String getInstanceDisabledReason() { return _record.getStringField(InstanceConfigProperty.HELIX_DISABLED_REASON.name(), ""); } @@ -328,63 +462,184 @@ public String getInstanceDisabledReason() { * * @return Return instance disabled type (org.apache.helix.constants.InstanceConstants.InstanceDisabledType) * Default is am empty string. + * @deprecated This method is deprecated. Please use getInstanceOperation().getSource + *() instead. 
*/ + @Deprecated public String getInstanceDisabledType() { - if (!getInstanceOperation().equals(InstanceConstants.InstanceOperation.DISABLE)) { + if (_record.getBooleanField(InstanceConfigProperty.HELIX_ENABLED.name(), + HELIX_ENABLED_DEFAULT_VALUE)) { return InstanceConstants.INSTANCE_NOT_DISABLED; } return _record.getStringField(InstanceConfigProperty.HELIX_DISABLED_TYPE.name(), InstanceConstants.InstanceDisabledType.DEFAULT_INSTANCE_DISABLE_TYPE.name()); } + private List getInstanceOperations() { + if (_deserializedInstanceOperations == null || _deserializedInstanceOperations.isEmpty()) { + // If the _deserializedInstanceOperations is not set, then we need to build it from the real + // helix property HELIX_INSTANCE_OPERATIONS. + List instanceOperations = + _record.getListField(InstanceConfigProperty.HELIX_INSTANCE_OPERATIONS.name()); + List newDeserializedInstanceOperations = new ArrayList<>(); + if (instanceOperations != null) { + for (String serializedInstanceOperation : instanceOperations) { + try { + Map properties = _objectMapper.readValue(serializedInstanceOperation, + new TypeReference>() { + }); + newDeserializedInstanceOperations.add(new InstanceOperation(properties)); + } catch (JsonProcessingException e) { + _logger.error( + "Failed to deserialize instance operation for instance: " + _record.getId(), e); + } + } + } + _deserializedInstanceOperations = newDeserializedInstanceOperations; + } + + return _deserializedInstanceOperations; + } + /** * Set the instance operation for this instance. + * This method also sets the HELIX_ENABLED, HELIX_DISABLED_REASON, and HELIX_DISABLED_TYPE fields + * for backwards compatibility. * * @param operation the instance operation */ - public void setInstanceOperation(InstanceConstants.InstanceOperation operation) { - _record.setSimpleField(InstanceConfigProperty.INSTANCE_OPERATION.name(), - operation == null ? 
"" : operation.name()); - if (operation == null || operation == InstanceConstants.InstanceOperation.ENABLE - || operation == InstanceConstants.InstanceOperation.DISABLE) { + public void setInstanceOperation(InstanceOperation operation) { + List deserializedInstanceOperations = getInstanceOperations(); + + if (operation.getSource() == InstanceConstants.InstanceOperationSource.ADMIN) { + deserializedInstanceOperations.clear(); + } else { + // Remove the instance operation with the same source if it exists. + deserializedInstanceOperations.removeIf( + instanceOperation -> instanceOperation.getSource() == operation.getSource()); + } + if (operation.getOperation() == InstanceConstants.InstanceOperation.ENABLE) { + // Insert the operation after the last ENABLE or at the beginning if there isn't ENABLE in the list. + int insertIndex = 0; + for (int i = deserializedInstanceOperations.size() - 1; i >= 0; i--) { + if (deserializedInstanceOperations.get(i).getOperation() + == InstanceConstants.InstanceOperation.ENABLE) { + insertIndex = i + 1; + break; + } + } + deserializedInstanceOperations.add(insertIndex, operation); + } else { + deserializedInstanceOperations.add(operation); + } + // Set the actual field in the ZnRecord + _record.setListField(InstanceConfigProperty.HELIX_INSTANCE_OPERATIONS.name(), + deserializedInstanceOperations.stream().map(instanceOperation -> { + try { + return _objectMapper.writeValueAsString(instanceOperation.getProperties()); + } catch (JsonProcessingException e) { + throw new HelixException( + "Failed to serialize instance operation for instance: " + _record.getId() + + " Can't set the instance operation to: " + operation.getOperation(), e); + } + }).collect(Collectors.toList())); + + // TODO: Remove this when we are sure that all users are using the new InstanceOperation only and HELIX_ENABLED is removed. 
+ if (operation.getOperation() == InstanceConstants.InstanceOperation.DISABLE) { // We are still setting the HELIX_ENABLED field for backwards compatibility. // It is possible that users will be using earlier version of HelixAdmin or helix-rest // is on older version. - // TODO: Remove this when we are sure that all users are using the new field INSTANCE_OPERATION. - setInstanceEnabledHelper(!(operation == InstanceConstants.InstanceOperation.DISABLE)); + + if (_record.getBooleanField(InstanceConfigProperty.HELIX_ENABLED.name(), true)) { + // Check if it is already disabled, if yes, then we don't need to set HELIX_ENABLED and HELIX_ENABLED_TIMESTAMP + setInstanceEnabledHelper(false, operation.getTimestamp()); + } + + _record.setSimpleField(InstanceConfigProperty.HELIX_DISABLED_REASON.name(), + operation.getReason()); + _record.setSimpleField(InstanceConfigProperty.HELIX_DISABLED_TYPE.name(), + InstanceConstants.InstanceDisabledType.DEFAULT_INSTANCE_DISABLE_TYPE.name()); + } else if (operation.getOperation() == InstanceConstants.InstanceOperation.ENABLE) { + // If any of the other InstanceOperations are of type DISABLE, set that in the HELIX_ENABLED, + // HELIX_DISABLED_REASON, and HELIX_DISABLED_TYPE fields. 
+ InstanceOperation latestDisableInstanceOperation = null; + for (InstanceOperation instanceOperation : getInstanceOperations()) { + if (instanceOperation.getOperation() == InstanceConstants.InstanceOperation.DISABLE && ( + latestDisableInstanceOperation == null || instanceOperation.getTimestamp() + > latestDisableInstanceOperation.getTimestamp())) { + latestDisableInstanceOperation = instanceOperation; + } + } + + if (latestDisableInstanceOperation != null) { + _record.setSimpleField(InstanceConfigProperty.HELIX_DISABLED_REASON.name(), + latestDisableInstanceOperation.getReason()); + _record.setSimpleField(InstanceConfigProperty.HELIX_DISABLED_TYPE.name(), + InstanceConstants.InstanceDisabledType.DEFAULT_INSTANCE_DISABLE_TYPE.name()); + } else { + setInstanceEnabledHelper(true, operation.getTimestamp()); + } } } + /** + * Set the instance operation for this instance. Provide the InstanceOperation enum and the reason + * and source will be set to default values. + * + * @param operation the instance operation + */ + public void setInstanceOperation(InstanceConstants.InstanceOperation operation) { + InstanceOperation instanceOperation = + new InstanceOperation.Builder().setOperation(operation).build(); + setInstanceOperation(instanceOperation); + } + private void setInstanceOperationInit(InstanceConstants.InstanceOperation operation) { if (operation == null) { return; } - _record.setSimpleField(InstanceConfigProperty.INSTANCE_OPERATION.name(), operation.name()); + InstanceOperation instanceOperation = + new InstanceOperation.Builder().setOperation(operation).setReason("INIT").build(); + // When an instance is created for the first time the timestamp is set to -1 so that if it + // is disabled it will not be considered within the delay window when it joins. 
+ instanceOperation.setTimestamp(HELIX_ENABLED_TIMESTAMP_DEFAULT_VALUE); + setInstanceOperation(instanceOperation); + } + + private InstanceOperation getActiveInstanceOperation() { + List instanceOperations = getInstanceOperations(); + + if (instanceOperations.isEmpty()) { + InstanceOperation instanceOperation = + new InstanceOperation.Builder().setOperation(InstanceConstants.InstanceOperation.ENABLE) + .setSource(InstanceConstants.InstanceOperationSource.DEFAULT).build(); + instanceOperation.setTimestamp(HELIX_ENABLED_TIMESTAMP_DEFAULT_VALUE); + return instanceOperation; + } + + // The last instance operation in the list is the most recent one. + // ENABLE operation should not be included in the list. + return instanceOperations.get(instanceOperations.size() - 1); } /** - * Get the InstanceOperation of this instance, default is ENABLE if nothing is set. If + * Get the InstanceOperationType of this instance, default is ENABLE if nothing is set. If * HELIX_ENABLED is set to false, then the instance operation is DISABLE for backwards * compatibility. * * @return the instance operation */ - public InstanceConstants.InstanceOperation getInstanceOperation() { - String instanceOperationString = - _record.getSimpleField(InstanceConfigProperty.INSTANCE_OPERATION.name()); - - InstanceConstants.InstanceOperation instanceOperation; + public InstanceOperation getInstanceOperation() { + InstanceOperation activeInstanceOperation = getActiveInstanceOperation(); try { - // If INSTANCE_OPERATION is not set, then the instance is enabled. - instanceOperation = (instanceOperationString == null || instanceOperationString.isEmpty()) - ? 
InstanceConstants.InstanceOperation.ENABLE - : InstanceConstants.InstanceOperation.valueOf(instanceOperationString); + activeInstanceOperation.getOperation(); } catch (IllegalArgumentException e) { - _logger.error("Invalid instance operation: " + instanceOperationString + " for instance: " - + _record.getId() + _logger.error("Invalid instance operation type for instance: " + _record.getId() + ". You may need to update your version of Helix to get support for this " + "type of InstanceOperation. Defaulting to UNKNOWN."); - return InstanceConstants.InstanceOperation.UNKNOWN; + activeInstanceOperation = + new InstanceOperation.Builder().setOperation(InstanceConstants.InstanceOperation.UNKNOWN) + .build(); } // Always respect the HELIX_ENABLED being set to false when instance operation is unset @@ -392,11 +647,16 @@ public InstanceConstants.InstanceOperation getInstanceOperation() { if (!_record.getBooleanField(InstanceConfigProperty.HELIX_ENABLED.name(), HELIX_ENABLED_DEFAULT_VALUE) && (InstanceConstants.INSTANCE_DISABLED_OVERRIDABLE_OPERATIONS.contains( - instanceOperation))) { - return InstanceConstants.InstanceOperation.DISABLE; + activeInstanceOperation.getOperation()))) { + return new InstanceOperation.Builder().setOperation( + InstanceConstants.InstanceOperation.DISABLE).setReason(getInstanceDisabledReason()) + .setSource( + InstanceConstants.InstanceOperationSource.instanceDisabledTypeToInstanceOperationSource( + InstanceConstants.InstanceDisabledType.valueOf(getInstanceDisabledType()))) + .build(); } - return instanceOperation; + return activeInstanceOperation; } /** @@ -406,7 +666,7 @@ public InstanceConstants.InstanceOperation getInstanceOperation() { * @return true if enabled, false otherwise */ public boolean getInstanceEnabled() { - return getInstanceOperation().equals(InstanceConstants.InstanceOperation.ENABLE); + return getInstanceOperation().getOperation().equals(InstanceConstants.InstanceOperation.ENABLE); } /** @@ -416,7 +676,8 @@ public boolean 
getInstanceEnabled() { * @return true if the instance is assignable, false otherwise */ public boolean isAssignable() { - return InstanceConstants.ASSIGNABLE_INSTANCE_OPERATIONS.contains(getInstanceOperation()); + return InstanceConstants.ASSIGNABLE_INSTANCE_OPERATIONS.contains( + getInstanceOperation().getOperation()); } /** @@ -929,10 +1190,8 @@ public InstanceConfig build(String instanceId) { instanceConfig.addTag(tag); } - if (_instanceOperation == null && _instanceEnabled != HELIX_ENABLED_DEFAULT_VALUE) { - instanceConfig.setInstanceOperationInit( - _instanceEnabled ? InstanceConstants.InstanceOperation.ENABLE - : InstanceConstants.InstanceOperation.DISABLE); + if (_instanceOperation == null && !_instanceEnabled) { + instanceConfig.setInstanceOperationInit(InstanceConstants.InstanceOperation.DISABLE); } if (_instanceOperation != null && !_instanceOperation.equals( diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java index db9ada93c4..f634aac46e 100644 --- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java +++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java @@ -181,8 +181,8 @@ public synchronized void refresh(HelixDataAccessor accessor) { private void updateRoutableInstanceConfigMap(Map instanceConfigMap) { _routableInstanceConfigMap = instanceConfigMap.entrySet().stream().filter( - (instanceConfigEntry) -> !InstanceConstants.UNSERVABLE_INSTANCE_OPERATIONS.contains( - instanceConfigEntry.getValue().getInstanceOperation())) + (instanceConfigEntry) -> !InstanceConstants.UNROUTABLE_INSTANCE_OPERATIONS.contains( + instanceConfigEntry.getValue().getInstanceOperation().getOperation())) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); } @@ -190,8 +190,9 @@ private void updateRoutableLiveInstanceMap(Map instanceC Map liveInstanceMap) { _routableLiveInstanceMap = 
liveInstanceMap.entrySet().stream().filter( (liveInstanceEntry) -> instanceConfigMap.containsKey(liveInstanceEntry.getKey()) - && !InstanceConstants.UNSERVABLE_INSTANCE_OPERATIONS.contains( - instanceConfigMap.get(liveInstanceEntry.getKey()).getInstanceOperation())) + && !InstanceConstants.UNROUTABLE_INSTANCE_OPERATIONS.contains( + instanceConfigMap.get(liveInstanceEntry.getKey()).getInstanceOperation() + .getOperation())) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); } diff --git a/helix-core/src/main/java/org/apache/helix/util/InstanceUtil.java b/helix-core/src/main/java/org/apache/helix/util/InstanceUtil.java new file mode 100644 index 0000000000..967d561e74 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/util/InstanceUtil.java @@ -0,0 +1,198 @@ +package org.apache.helix.util; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +import com.google.common.collect.ImmutableMap; +import org.apache.helix.AccessOption; +import org.apache.helix.BaseDataAccessor; +import org.apache.helix.ConfigAccessor; +import org.apache.helix.HelixException; +import org.apache.helix.PropertyPathBuilder; +import org.apache.helix.constants.InstanceConstants; +import org.apache.helix.model.ClusterTopologyConfig; +import org.apache.helix.model.HelixConfigScope; +import org.apache.helix.model.InstanceConfig; +import org.apache.helix.model.builder.HelixConfigScopeBuilder; +import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.apache.helix.zookeeper.zkclient.DataUpdater; + +public class InstanceUtil { + + // Private constructor to prevent instantiation + private InstanceUtil() { + } + + // Validators for instance operation transitions + private static final Function, Boolean> ALWAYS_ALLOWED = + (matchingInstances) -> true; + private static final Function, Boolean> ALL_MATCHES_ARE_UNKNOWN = + (matchingInstances) -> matchingInstances.isEmpty() || matchingInstances.stream().allMatch( + instance -> instance.getInstanceOperation().getOperation() + .equals(InstanceConstants.InstanceOperation.UNKNOWN)); + private static final Function, Boolean> ALL_MATCHES_ARE_UNKNOWN_OR_EVACUATE = + (matchingInstances) -> matchingInstances.isEmpty() || matchingInstances.stream().allMatch( + instance -> instance.getInstanceOperation().getOperation() + .equals(InstanceConstants.InstanceOperation.UNKNOWN) + || instance.getInstanceOperation().getOperation() + .equals(InstanceConstants.InstanceOperation.EVACUATE)); + private static final Function, Boolean> ANY_MATCH_ENABLE_OR_DISABLE = + (matchingInstances) -> !matchingInstances.isEmpty() && matchingInstances.stream().anyMatch( + instance -> instance.getInstanceOperation().getOperation() + .equals(InstanceConstants.InstanceOperation.ENABLE) || 
instance.getInstanceOperation() + .getOperation().equals(InstanceConstants.InstanceOperation.DISABLE)); + + // Validator map for valid instance operation transitions :: + private static final ImmutableMap, Boolean>>> + validInstanceOperationTransitions = + ImmutableMap.of(InstanceConstants.InstanceOperation.ENABLE, + // ENABLE and DISABLE can be set to UNKNOWN when matching instance is in SWAP_IN and set to ENABLE in a transaction. + ImmutableMap.of(InstanceConstants.InstanceOperation.ENABLE, ALWAYS_ALLOWED, + InstanceConstants.InstanceOperation.DISABLE, ALWAYS_ALLOWED, + InstanceConstants.InstanceOperation.EVACUATE, ALWAYS_ALLOWED, + InstanceConstants.InstanceOperation.UNKNOWN, ALWAYS_ALLOWED), + InstanceConstants.InstanceOperation.DISABLE, + ImmutableMap.of(InstanceConstants.InstanceOperation.DISABLE, ALWAYS_ALLOWED, + InstanceConstants.InstanceOperation.ENABLE, ALWAYS_ALLOWED, + InstanceConstants.InstanceOperation.EVACUATE, ALWAYS_ALLOWED, + InstanceConstants.InstanceOperation.UNKNOWN, ALWAYS_ALLOWED), + InstanceConstants.InstanceOperation.SWAP_IN, + // SWAP_IN can be set to ENABLE when matching instance is in UNKNOWN state in a transaction. 
+ ImmutableMap.of(InstanceConstants.InstanceOperation.SWAP_IN, ALWAYS_ALLOWED, + InstanceConstants.InstanceOperation.UNKNOWN, ALWAYS_ALLOWED), + InstanceConstants.InstanceOperation.EVACUATE, + ImmutableMap.of(InstanceConstants.InstanceOperation.EVACUATE, ALWAYS_ALLOWED, + InstanceConstants.InstanceOperation.ENABLE, ALL_MATCHES_ARE_UNKNOWN, + InstanceConstants.InstanceOperation.DISABLE, ALL_MATCHES_ARE_UNKNOWN, + InstanceConstants.InstanceOperation.UNKNOWN, ALWAYS_ALLOWED), + InstanceConstants.InstanceOperation.UNKNOWN, + ImmutableMap.of(InstanceConstants.InstanceOperation.UNKNOWN, ALWAYS_ALLOWED, + InstanceConstants.InstanceOperation.ENABLE, ALL_MATCHES_ARE_UNKNOWN_OR_EVACUATE, + InstanceConstants.InstanceOperation.DISABLE, ALL_MATCHES_ARE_UNKNOWN_OR_EVACUATE, + InstanceConstants.InstanceOperation.SWAP_IN, ANY_MATCH_ENABLE_OR_DISABLE)); + + /** + * Validates if the transition from the current operation to the target operation is valid. + * + * @param configAccessor The ConfigAccessor instance + * @param clusterName The cluster name + * @param instanceConfig The current instance configuration + * @param currentOperation The current operation + * @param targetOperation The target operation + */ + public static void validateInstanceOperationTransition(ConfigAccessor configAccessor, + String clusterName, InstanceConfig instanceConfig, + InstanceConstants.InstanceOperation currentOperation, + InstanceConstants.InstanceOperation targetOperation) { + // Check if the current operation and target operation are in the valid transitions map + if (!validInstanceOperationTransitions.containsKey(currentOperation) + || !validInstanceOperationTransitions.get(currentOperation).containsKey(targetOperation)) { + throw new HelixException( + "Invalid instance operation transition from " + currentOperation + " to " + + targetOperation); + } + + // Throw exception if the validation fails + if (!validInstanceOperationTransitions.get(currentOperation).get(targetOperation) + 
.apply(findInstancesWithMatchingLogicalId(configAccessor, clusterName, instanceConfig))) { + throw new HelixException( + "Failed validation for instance operation transition from " + currentOperation + " to " + + targetOperation); + } + } + + /** + * Finds the instances that have a matching logical ID with the given instance. + * + * @param configAccessor The ConfigAccessor instance + * @param clusterName The cluster name + * @param instanceConfig The instance configuration to match + * @return A list of matching instances + */ + public static List findInstancesWithMatchingLogicalId( + ConfigAccessor configAccessor, String clusterName, InstanceConfig instanceConfig) { + String logicalIdKey = + ClusterTopologyConfig.createFromClusterConfig(configAccessor.getClusterConfig(clusterName)) + .getEndNodeType(); + + // Retrieve and filter instances with matching logical ID + return configAccessor.getKeys( + new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.PARTICIPANT, + clusterName).build()).stream() + .map(instanceName -> configAccessor.getInstanceConfig(clusterName, instanceName)).filter( + potentialInstanceConfig -> + !potentialInstanceConfig.getInstanceName().equals(instanceConfig.getInstanceName()) + && potentialInstanceConfig.getLogicalId(logicalIdKey) + .equals(instanceConfig.getLogicalId(logicalIdKey))) + .collect(Collectors.toList()); + } + + /** + * Sets the instance operation for the given instance. 
+ * + * @param configAccessor The ConfigAccessor instance + * @param baseAccessor The BaseDataAccessor instance + * @param clusterName The cluster name + * @param instanceName The instance name + * @param instanceOperation The instance operation to set + */ + public static void setInstanceOperation(ConfigAccessor configAccessor, + BaseDataAccessor baseAccessor, String clusterName, String instanceName, + InstanceConfig.InstanceOperation instanceOperation) { + String path = PropertyPathBuilder.instanceConfig(clusterName, instanceName); + + // Retrieve the current instance configuration + InstanceConfig instanceConfig = configAccessor.getInstanceConfig(clusterName, instanceName); + if (instanceConfig == null) { + throw new HelixException("Cluster " + clusterName + ", instance: " + instanceName + + ", instance config does not exist"); + } + + // Validate the instance operation transition + validateInstanceOperationTransition(configAccessor, clusterName, instanceConfig, + instanceConfig.getInstanceOperation().getOperation(), + instanceOperation == null ? InstanceConstants.InstanceOperation.ENABLE + : instanceOperation.getOperation()); + + // Update the instance operation + boolean succeeded = baseAccessor.update(path, new DataUpdater() { + @Override + public ZNRecord update(ZNRecord currentData) { + if (currentData == null) { + throw new HelixException("Cluster: " + clusterName + ", instance: " + instanceName + + ", participant config is null"); + } + + InstanceConfig config = new InstanceConfig(currentData); + config.setInstanceOperation(instanceOperation); + return config.getRecord(); + } + }, AccessOption.PERSISTENT); + + if (!succeeded) { + throw new HelixException( + "Failed to update instance operation. 
Please check if instance is disabled."); + } + } +} diff --git a/helix-core/src/test/java/org/apache/helix/cloud/event/TestDefaultCloudEventCallbackImpl.java b/helix-core/src/test/java/org/apache/helix/cloud/event/TestDefaultCloudEventCallbackImpl.java index bf9b59dc2d..5ea42dc3e7 100644 --- a/helix-core/src/test/java/org/apache/helix/cloud/event/TestDefaultCloudEventCallbackImpl.java +++ b/helix-core/src/test/java/org/apache/helix/cloud/event/TestDefaultCloudEventCallbackImpl.java @@ -52,20 +52,18 @@ public void testDisableInstance() { Assert.assertFalse(InstanceValidationUtil .isEnabled(_manager.getHelixDataAccessor(), _instanceManager.getInstanceName())); Assert.assertEquals(_manager.getConfigAccessor() - .getInstanceConfig(CLUSTER_NAME, _instanceManager.getInstanceName()) - .getInstanceDisabledType(), InstanceConstants.InstanceDisabledType.CLOUD_EVENT.name()); + .getInstanceConfig(CLUSTER_NAME, _instanceManager.getInstanceName()).getInstanceOperation() + .getSource(), InstanceConstants.InstanceOperationSource.AUTOMATION); - // Should not disable instance if it is already disabled due to other reasons - // And disabled type should remain unchanged _admin.enableInstance(CLUSTER_NAME, _instanceManager.getInstanceName(), false); _impl.disableInstance(_instanceManager, null); Assert.assertFalse(InstanceValidationUtil .isEnabled(_manager.getHelixDataAccessor(), _instanceManager.getInstanceName())); Assert.assertEquals(_manager.getConfigAccessor() .getInstanceConfig(CLUSTER_NAME, _instanceManager.getInstanceName()) - .getInstanceDisabledType(), - InstanceConstants.InstanceDisabledType.DEFAULT_INSTANCE_DISABLE_TYPE.name()); + .getInstanceOperation().getSource(), InstanceConstants.InstanceOperationSource.USER); + _admin.enableInstance(CLUSTER_NAME, _instanceManager.getInstanceName(), true); _admin.enableInstance(CLUSTER_NAME, _instanceManager.getInstanceName(), false, InstanceConstants.InstanceDisabledType.CLOUD_EVENT, null); } diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java index 85600c01c1..67b575f0c0 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java @@ -211,7 +211,7 @@ private void removeOfflineOrInactiveInstances() { InstanceConfig instanceConfig = _gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, participantName); if (!_participants.get(i).isConnected() || !instanceConfig.getInstanceEnabled() - || instanceConfig.getInstanceOperation() + || instanceConfig.getInstanceOperation().getOperation() .equals(InstanceConstants.InstanceOperation.SWAP_IN)) { if (_participants.get(i).isConnected()) { _participants.get(i).syncStop(); @@ -338,7 +338,7 @@ public void testAddingNodeWithEvacuationTag() throws Exception { // now remove operation tag String instanceToEvacuate = _participants.get(0).getInstanceName(); _gSetupTool.getClusterManagementTool() - .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, null); + .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, InstanceConstants.InstanceOperation.ENABLE); Assert.assertTrue(_clusterVerifier.verifyByPolling()); @@ -370,7 +370,8 @@ public void testNodeSwapNoTopologySetup() throws Exception { Assert.assertEquals( _gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, instanceToSwapInName) - .getInstanceOperation(), InstanceConstants.InstanceOperation.UNKNOWN); + .getInstanceOperation().getOperation(), + InstanceConstants.InstanceOperation.UNKNOWN); } @Test(dependsOnMethods = "testNodeSwapNoTopologySetup") @@ -397,7 +398,8 @@ public void testAddingNodeWithEnableInstanceOperation() throws Exception { Assert.assertEquals( _gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, instanceToSwapInName) - 
.getInstanceOperation(), InstanceConstants.InstanceOperation.UNKNOWN); + .getInstanceOperation().getOperation(), + InstanceConstants.InstanceOperation.UNKNOWN); } @Test(dependsOnMethods = "testAddingNodeWithEnableInstanceOperation") @@ -416,7 +418,8 @@ public void testNodeSwapWithNoSwapOutNode() throws Exception { Assert.assertEquals( _gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, instanceToSwapInName) - .getInstanceOperation(), InstanceConstants.InstanceOperation.UNKNOWN); + .getInstanceOperation().getOperation(), + InstanceConstants.InstanceOperation.UNKNOWN); } @Test(dependsOnMethods = "testNodeSwapWithNoSwapOutNode") @@ -440,7 +443,8 @@ public void testNodeSwapSwapInNodeNoInstanceOperationEnabled() throws Exception Assert.assertEquals( _gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, instanceToSwapInName) - .getInstanceOperation(), InstanceConstants.InstanceOperation.UNKNOWN); + .getInstanceOperation().getOperation(), + InstanceConstants.InstanceOperation.UNKNOWN); // Setting the InstanceOperation to SWAP_IN should work because there is a matching logicalId in // the cluster and the InstanceCapacityWeights and FaultZone match. @@ -481,7 +485,8 @@ public void testNodeSwapSwapInNodeWithAlreadySwappingPair() throws Exception { // Instance should be UNKNOWN since there was already a swapping pair. 
Assert.assertEquals(_gSetupTool.getClusterManagementTool() - .getInstanceConfig(CLUSTER_NAME, secondInstanceToSwapInName).getInstanceOperation(), + .getInstanceConfig(CLUSTER_NAME, secondInstanceToSwapInName).getInstanceOperation() + .getOperation(), InstanceConstants.InstanceOperation.UNKNOWN); // Try to set the InstanceOperation to SWAP_IN, it should throw an exception since there is already @@ -576,7 +581,8 @@ public void testNodeSwap() throws Exception { Assert.assertFalse(_gSetupTool.getClusterManagementTool() .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName).getInstanceEnabled()); Assert.assertEquals(_gSetupTool.getClusterManagementTool() - .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName).getInstanceOperation(), + .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName).getInstanceOperation() + .getOperation(), InstanceConstants.InstanceOperation.UNKNOWN); // Check to make sure the throttle was enabled again after the swap was completed. @@ -681,7 +687,8 @@ public void testNodeSwapDisableAndReenable() throws Exception { Assert.assertFalse(_gSetupTool.getClusterManagementTool() .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName).getInstanceEnabled()); Assert.assertEquals(_gSetupTool.getClusterManagementTool() - .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName).getInstanceOperation(), + .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName).getInstanceOperation() + .getOperation(), InstanceConstants.InstanceOperation.UNKNOWN); // Validate that the SWAP_IN instance has the same partitions the swap out instance had before @@ -824,7 +831,7 @@ public void testNodeSwapCancelSwapWhenReadyToComplete() throws Exception { Collections.emptySet(), Collections.emptySet()); _gSetupTool.getClusterManagementTool() - .setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName, null); + .setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName, InstanceConstants.InstanceOperation.ENABLE); Assert.assertTrue(_clusterVerifier.verifyByPolling()); @@ -1110,7 
+1117,7 @@ public void testUnsetInstanceOperationOnSwapInWhenSwapping() throws Exception { // This should throw exception because we cannot ever have two instances with the same logicalId and both have InstanceOperation // unset. _gSetupTool.getClusterManagementTool() - .setInstanceOperation(CLUSTER_NAME, instanceToSwapInName, null); + .setInstanceOperation(CLUSTER_NAME, instanceToSwapInName, InstanceConstants.InstanceOperation.ENABLE); } @Test(dependsOnMethods = "testUnsetInstanceOperationOnSwapInWhenSwapping") @@ -1180,7 +1187,7 @@ public void testEvacuateAndCancelBeforeBootstrapFinish() throws Exception { // cancel the evacuation _gSetupTool.getClusterManagementTool() - .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, null); + .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, InstanceConstants.InstanceOperation.ENABLE); assignment = getEVs(); for (String resource : _allDBs) { @@ -1222,7 +1229,7 @@ public void testEvacuateAndCancelBeforeDropFinish() throws Exception { // cancel evacuation _gSetupTool.getClusterManagementTool() - .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, null); + .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, InstanceConstants.InstanceOperation.ENABLE); // check every replica has >= 3 active replicas, even before cluster converge Map assignment = getEVs(); for (String resource : _allDBs) { @@ -1311,7 +1318,7 @@ public void testSwapEvacuateAddRemoveEvacuate() throws Exception { // Remove EVACUATE instance's InstanceOperation _gSetupTool.getClusterManagementTool() - .setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName, null); + .setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName, InstanceConstants.InstanceOperation.ENABLE); } @Test(dependsOnMethods = "testSwapEvacuateAddRemoveEvacuate") @@ -1392,7 +1399,7 @@ public boolean isThrottlesEnabled() { @Override public void onInstanceConfigChange(List instanceConfig, NotificationContext context) { - if (instanceConfig.get(0).getInstanceOperation() + if 
(instanceConfig.get(0).getInstanceOperation().getOperation() .equals(InstanceConstants.InstanceOperation.SWAP_IN)) { throttlesEnabled = false; } else { diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java index 5581108578..b54be8c045 100644 --- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java +++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java @@ -182,8 +182,7 @@ public void testZkHelixAdmin() { String disableReason = "Reason"; tool.enableInstance(clusterName, instanceName, false, InstanceConstants.InstanceDisabledType.CLOUD_EVENT, disableReason); - Assert.assertTrue(tool.getInstanceConfig(clusterName, instanceName).getInstanceDisabledReason() - .equals(disableReason)); + Assert.assertEquals(disableReason, tool.getInstanceConfig(clusterName, instanceName).getInstanceDisabledReason()); tool.enableInstance(clusterName, instanceName, true, InstanceConstants.InstanceDisabledType.CLOUD_EVENT, disableReason); Assert.assertTrue( @@ -348,6 +347,65 @@ public void testZkHelixAdmin() { System.out.println("END testZkHelixAdmin at " + new Date(System.currentTimeMillis())); } + @Test + private void testSetInstanceOperation() { + System.out.println("START testSetInstanceOperation at " + new Date(System.currentTimeMillis())); + + final String clusterName = getShortClassName(); + String rootPath = "/" + clusterName; + if (_gZkClient.exists(rootPath)) { + _gZkClient.deleteRecursively(rootPath); + } + + HelixAdmin tool = new ZKHelixAdmin(_gZkClient); + tool.addCluster(clusterName, true); + Assert.assertTrue(ZKUtil.isClusterSetup(clusterName, _gZkClient)); + Assert.assertTrue(_gZkClient.exists(PropertyPathBuilder.customizedStateConfig(clusterName))); + + // Add instance to cluster + String hostname = "host1"; + String port = "9999"; + String instanceName = hostname + "_" + port; + InstanceConfig config = + new 
InstanceConfig.Builder().setHostName(hostname).setPort(port).build(instanceName); + + tool.addInstance(clusterName, config); + + // Set instance operation to DISABLE + tool.setInstanceOperation(clusterName, instanceName, + InstanceConstants.InstanceOperation.DISABLE, "disableReason"); + Assert.assertEquals(tool.getInstanceConfig(clusterName, instanceName).getInstanceOperation() + .getOperation(), + InstanceConstants.InstanceOperation.DISABLE); + Assert.assertEquals( + tool.getInstanceConfig(clusterName, instanceName).getInstanceDisabledReason(), + "disableReason"); + + // Set instance operation to ENABLE + tool.setInstanceOperation(clusterName, instanceName, InstanceConstants.InstanceOperation.ENABLE, + "enableReason"); + Assert.assertEquals(tool.getInstanceConfig(clusterName, instanceName).getInstanceOperation() + .getOperation(), + InstanceConstants.InstanceOperation.ENABLE); + // InstanceNonServingReason should be empty after setting operation to ENABLE + Assert.assertEquals( + tool.getInstanceConfig(clusterName, instanceName).getInstanceDisabledReason(), ""); + + // Set instance operation to UNKNOWN + tool.setInstanceOperation(clusterName, instanceName, + InstanceConstants.InstanceOperation.UNKNOWN, "unknownReason"); + Assert.assertEquals(tool.getInstanceConfig(clusterName, instanceName).getInstanceOperation() + .getOperation(), + InstanceConstants.InstanceOperation.UNKNOWN); + Assert.assertEquals( + tool.getInstanceConfig(clusterName, instanceName).getInstanceOperation().getReason(), + "unknownReason"); + + deleteCluster(clusterName); + + System.out.println("END testSetInstanceOperation at " + new Date(System.currentTimeMillis())); + } + private HelixManager initializeHelixManager(String clusterName, String instanceName) { HelixManager manager = HelixManagerFactory.getZKHelixManager(clusterName, instanceName, InstanceType.PARTICIPANT, org.apache.helix.common.ZkTestBase.ZK_ADDR); diff --git a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java 
b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java index d9bc5d7fe6..c5c5626ff6 100644 --- a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java +++ b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java @@ -24,6 +24,8 @@ import java.util.List; import java.util.Map; +import javax.annotation.Nullable; + import org.apache.helix.BaseDataAccessor; import org.apache.helix.HelixAdmin; import org.apache.helix.HelixDataAccessor; @@ -285,16 +287,12 @@ public void enableInstance(String clusterName, String instanceName, boolean enab ZNRecord record = (ZNRecord) _baseDataAccessor.get(instanceConfigPath, null, 0); InstanceConfig instanceConfig = new InstanceConfig(record); - instanceConfig.setInstanceOperation(enabled ? InstanceConstants.InstanceOperation.ENABLE - : InstanceConstants.InstanceOperation.DISABLE); + instanceConfig.setInstanceOperation(new InstanceConfig.InstanceOperation.Builder().setOperation( + enabled ? InstanceConstants.InstanceOperation.ENABLE + : InstanceConstants.InstanceOperation.DISABLE).setReason(reason).build()); if (!enabled) { + // TODO: Replace this when the HELIX_ENABLED and HELIX_DISABLED fields are removed. 
instanceConfig.resetInstanceDisabledTypeAndReason(); - if (reason != null) { - instanceConfig.setInstanceDisabledReason(reason); - } - if (disabledType != null) { - instanceConfig.setInstanceDisabledType(disabledType); - } } _baseDataAccessor.set(instanceConfigPath, instanceConfig.getRecord(), 0); } @@ -307,7 +305,20 @@ public void enableInstance(String clusterName, List instances, boolean e @Override public void setInstanceOperation(String clusterName, String instanceName, - InstanceConstants.InstanceOperation instanceOperation) { + @Nullable InstanceConstants.InstanceOperation instanceOperation) { + setInstanceOperation(clusterName, instanceName, instanceOperation, null, false); + } + + @Override + public void setInstanceOperation(String clusterName, String instanceName, + @Nullable InstanceConstants.InstanceOperation instanceOperation, String reason) { + setInstanceOperation(clusterName, instanceName, instanceOperation, reason, false); + } + + @Override + public void setInstanceOperation(String clusterName, String instanceName, + @Nullable InstanceConstants.InstanceOperation instanceOperation, String reason, + boolean overrideAll) { } @Override diff --git a/helix-core/src/test/java/org/apache/helix/model/TestInstanceConfig.java b/helix-core/src/test/java/org/apache/helix/model/TestInstanceConfig.java index 7da983b8aa..47ea88ac4d 100644 --- a/helix-core/src/test/java/org/apache/helix/model/TestInstanceConfig.java +++ b/helix-core/src/test/java/org/apache/helix/model/TestInstanceConfig.java @@ -52,7 +52,6 @@ public void testGetParsedDomain() { public void testSetInstanceEnableWithReason() { InstanceConfig instanceConfig = new InstanceConfig(new ZNRecord("id")); instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.ENABLE); - instanceConfig.setInstanceDisabledReason("NoShowReason"); instanceConfig.setInstanceDisabledType(InstanceConstants.InstanceDisabledType.USER_OPERATION); Assert.assertEquals(instanceConfig.getRecord().getSimpleFields() @@ 
-62,10 +61,9 @@ public void testSetInstanceEnableWithReason() { Assert.assertEquals(instanceConfig.getRecord().getSimpleFields() .get(InstanceConfig.InstanceConfigProperty.HELIX_DISABLED_TYPE.toString()), null); - - instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.DISABLE); String reasonCode = "ReasonCode"; - instanceConfig.setInstanceDisabledReason(reasonCode); + instanceConfig.setInstanceOperation(new InstanceConfig.InstanceOperation.Builder().setOperation( + InstanceConstants.InstanceOperation.DISABLE).setReason(reasonCode).build()); instanceConfig.setInstanceDisabledType(InstanceConstants.InstanceDisabledType.USER_OPERATION); Assert.assertEquals(instanceConfig.getRecord().getSimpleFields() .get(InstanceConfig.InstanceConfigProperty.HELIX_ENABLED.toString()), "false"); @@ -198,6 +196,30 @@ public void testInstanceConfigBuilder() { Assert.assertEquals(instanceConfig.getInstanceCapacityMap().get("weight1"), Integer.valueOf(1)); } + @Test + public void testInstanceOperationReason() { + InstanceConfig instanceConfig = new InstanceConfig("instance1"); + instanceConfig.setInstanceEnabled(false); + instanceConfig.setInstanceDisabledReason("disableReason"); + Assert.assertEquals(instanceConfig.getInstanceDisabledReason(), "disableReason"); + Assert.assertEquals(instanceConfig.getInstanceDisabledReason(), "disableReason"); + + instanceConfig.setInstanceOperation(new InstanceConfig.InstanceOperation.Builder().setOperation( + InstanceConstants.InstanceOperation.UNKNOWN).setReason("unknownReason").build()); + Assert.assertEquals(instanceConfig.getInstanceDisabledReason(), "disableReason"); + Assert.assertEquals(instanceConfig.getInstanceOperation().getReason(), "unknownReason"); + + instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.DISABLE); + instanceConfig.setInstanceOperation(new InstanceConfig.InstanceOperation.Builder().setOperation( + InstanceConstants.InstanceOperation.DISABLE).setReason("disableReason2").build()); + 
Assert.assertEquals(instanceConfig.getInstanceDisabledReason(), "disableReason2"); + Assert.assertEquals(instanceConfig.getInstanceOperation().getReason(), "disableReason2"); + + instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.ENABLE); + Assert.assertEquals(instanceConfig.getInstanceDisabledReason(), ""); + Assert.assertEquals(instanceConfig.getInstanceOperation().getReason(), ""); + } + @Test public void testOverwriteInstanceConfig() { InstanceConfig instanceConfig = new InstanceConfig("instance2"); @@ -233,9 +255,91 @@ public void testOverwriteInstanceConfig() { Assert.assertTrue(instanceConfig.getTags().contains("tag4")); Assert.assertFalse(instanceConfig.getRecord().getSimpleFields() .containsKey(InstanceConfig.InstanceConfigProperty.HELIX_ENABLED.toString())); - Assert.assertEquals(instanceConfig.getInstanceOperation(), + Assert.assertEquals(instanceConfig.getInstanceOperation().getOperation(), InstanceConstants.InstanceOperation.EVACUATE); Assert.assertFalse(instanceConfig.getInstanceCapacityMap().containsKey("weight1")); Assert.assertEquals(instanceConfig.getInstanceCapacityMap().get("weight2"), Integer.valueOf(2)); } + + @Test + public void testInstanceOperationMultipleSources() throws InterruptedException { + InstanceConfig instanceConfig = new InstanceConfig("instance1"); + + // Check that the instance operation is ENABLE from the DEFAULT source + Assert.assertEquals(instanceConfig.getInstanceOperation().getOperation(), + InstanceConstants.InstanceOperation.ENABLE); + Assert.assertEquals(instanceConfig.getInstanceOperation().getSource(), + InstanceConstants.InstanceOperationSource.DEFAULT); + + // Set instance operation from user source + instanceConfig.setInstanceOperation(new InstanceConfig.InstanceOperation.Builder().setOperation( + InstanceConstants.InstanceOperation.DISABLE).setReason("userReason") + .setSource(InstanceConstants.InstanceOperationSource.USER).build()); + // Get enabled time + long op1EnabledTime = 
instanceConfig.getInstanceEnabledTime(); + + Thread.sleep(1000); + // Set instance operation from automation source + instanceConfig.setInstanceOperation(new InstanceConfig.InstanceOperation.Builder().setOperation( + InstanceConstants.InstanceOperation.DISABLE).setReason("automationReason") + .setSource(InstanceConstants.InstanceOperationSource.AUTOMATION).build()); + + // Check that the enabled time is the same as op1 but the source and reason are changed to automation + Assert.assertEquals(instanceConfig.getInstanceEnabledTime(), op1EnabledTime); + Assert.assertEquals(instanceConfig.getInstanceOperation().getSource(), + InstanceConstants.InstanceOperationSource.AUTOMATION); + + Thread.sleep(1000); + // Set instance operation from user source to be ENABLE + instanceConfig.setInstanceOperation(new InstanceConfig.InstanceOperation.Builder().setOperation( + InstanceConstants.InstanceOperation.ENABLE) + .setSource(InstanceConstants.InstanceOperationSource.USER).build()); + + // Check that the operation is DISABLE, the enabled time is the same as op1, and the source is still automation + Assert.assertEquals(instanceConfig.getInstanceOperation().getOperation(), + InstanceConstants.InstanceOperation.DISABLE); + Assert.assertEquals(instanceConfig.getInstanceEnabledTime(), op1EnabledTime); + Assert.assertEquals(instanceConfig.getInstanceOperation().getSource(), + InstanceConstants.InstanceOperationSource.AUTOMATION); + + Thread.sleep(1000); + // Set the instance operation from the automation source to be ENABLE + instanceConfig.setInstanceOperation(new InstanceConfig.InstanceOperation.Builder().setOperation( + InstanceConstants.InstanceOperation.ENABLE) + .setSource(InstanceConstants.InstanceOperationSource.AUTOMATION).build()); + + // Check that the operation is ENABLE, the enabled time is different from op1, and the source is still automation + Assert.assertEquals(instanceConfig.getInstanceOperation().getOperation(), + InstanceConstants.InstanceOperation.ENABLE); + 
Assert.assertFalse(instanceConfig.getInstanceEnabledTime() == op1EnabledTime); + Assert.assertEquals(instanceConfig.getInstanceOperation().getSource(), + InstanceConstants.InstanceOperationSource.AUTOMATION); + + // Set the instance operation from the automation source to be EVACUATE + instanceConfig.setInstanceOperation(new InstanceConfig.InstanceOperation.Builder().setOperation( + InstanceConstants.InstanceOperation.EVACUATE) + .setSource(InstanceConstants.InstanceOperationSource.AUTOMATION).build()); + + // Set the instance operation from the user source to be DISABLE + instanceConfig.setInstanceOperation(new InstanceConfig.InstanceOperation.Builder().setOperation( + InstanceConstants.InstanceOperation.DISABLE) + .setSource(InstanceConstants.InstanceOperationSource.USER).build()); + + // Check that the instance operation is DISABLE and the source is user + Assert.assertEquals(instanceConfig.getInstanceOperation().getOperation(), + InstanceConstants.InstanceOperation.DISABLE); + Assert.assertEquals(instanceConfig.getInstanceOperation().getSource(), + InstanceConstants.InstanceOperationSource.USER); + + // Set the instance operation from the admin source to be ENABLE + instanceConfig.setInstanceOperation(new InstanceConfig.InstanceOperation.Builder().setOperation( + InstanceConstants.InstanceOperation.ENABLE) + .setSource(InstanceConstants.InstanceOperationSource.ADMIN).build()); + + // Check that the instance operation is ENABLE and the source is admin + Assert.assertEquals(instanceConfig.getInstanceOperation().getOperation(), + InstanceConstants.InstanceOperation.ENABLE); + Assert.assertEquals(instanceConfig.getInstanceOperation().getSource(), + InstanceConstants.InstanceOperationSource.ADMIN); + } } diff --git a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java index 8a4bbf07bd..bb5a2bc5c4 100644 --- 
a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java @@ -268,8 +268,8 @@ private void collectEvacuatingInstances(Set toBeStoppedInstances) { PropertyKey.Builder propertyKeyBuilder = _dataAccessor.keyBuilder(); InstanceConfig instanceConfig = _dataAccessor.getProperty(propertyKeyBuilder.instanceConfig(instance)); - if (InstanceConstants.InstanceOperation.EVACUATE - .equals(instanceConfig.getInstanceOperation())) { + if (InstanceConstants.InstanceOperation.EVACUATE.equals( + instanceConfig.getInstanceOperation().getOperation())) { toBeStoppedInstances.add(instance); } } diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java index ea98f66371..55fc4de36e 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java @@ -45,12 +45,14 @@ import com.fasterxml.jackson.databind.node.JsonNodeFactory; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.ImmutableMap; +import org.apache.helix.BaseDataAccessor; import org.apache.helix.ConfigAccessor; import org.apache.helix.HelixAdmin; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixException; import org.apache.helix.constants.InstanceConstants; import org.apache.helix.manager.zk.ZKHelixDataAccessor; +import org.apache.helix.manager.zk.ZkBaseDataAccessor; import org.apache.helix.model.CurrentState; import org.apache.helix.model.Error; import org.apache.helix.model.HealthStat; @@ -66,6 +68,7 @@ import org.apache.helix.rest.server.filters.ClusterAuth; import org.apache.helix.rest.server.json.instance.InstanceInfo; 
import org.apache.helix.rest.server.json.instance.StoppableCheck; +import org.apache.helix.util.InstanceUtil; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.eclipse.jetty.util.StringUtil; import org.slf4j.Logger; @@ -388,9 +391,11 @@ record = toZNRecord(content); @POST public Response updateInstance(@PathParam("clusterId") String clusterId, @PathParam("instanceName") String instanceName, @QueryParam("command") String command, - @QueryParam("instanceOperation") InstanceConstants.InstanceOperation state, - @QueryParam("instanceDisabledType") String disabledType, - @QueryParam("instanceDisabledReason") String disabledReason, + @QueryParam("instanceOperation") InstanceConstants.InstanceOperation instanceOperation, + @QueryParam("instanceOperationSource") InstanceConstants.InstanceOperationSource instanceOperationSource, + @QueryParam("reason") String reason, + @Deprecated @QueryParam("instanceDisabledType") String disabledType, + @Deprecated @QueryParam("instanceDisabledReason") String disabledReason, @QueryParam("force") boolean force, String content) { Command cmd; try { @@ -445,7 +450,12 @@ public Response updateInstance(@PathParam("clusterId") String clusterId, .getTypeFactory().constructCollectionType(List.class, String.class))); break; case setInstanceOperation: - admin.setInstanceOperation(clusterId, instanceName, state); + InstanceUtil.setInstanceOperation(new ConfigAccessor(getRealmAwareZkClient()), + new ZkBaseDataAccessor<>(getRealmAwareZkClient()), clusterId, instanceName, + new InstanceConfig.InstanceOperation.Builder().setOperation(instanceOperation) + .setReason(reason).setSource( + force ? 
InstanceConstants.InstanceOperationSource.ADMIN : instanceOperationSource) + .build()); break; case canCompleteSwap: return OK(OBJECT_MAPPER.writeValueAsString( diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java index 395f9bf858..6ab727e85e 100644 --- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java +++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java @@ -495,14 +495,14 @@ public void updateInstance() throws Exception { new JerseyUriRequestBuilder("clusters/{}/instances/{}?command=setInstanceOperation&instanceOperation=EVACUATE") .format(CLUSTER_NAME, INSTANCE_NAME).post(this, entity); instanceConfig = _configAccessor.getInstanceConfig(CLUSTER_NAME, INSTANCE_NAME); - Assert.assertEquals(instanceConfig.getInstanceOperation(), + Assert.assertEquals(instanceConfig.getInstanceOperation().getOperation(), InstanceConstants.InstanceOperation.EVACUATE); new JerseyUriRequestBuilder("clusters/{}/instances/{}?command=setInstanceOperation&instanceOperation=INVALIDOP") .expectedReturnStatusCode(Response.Status.NOT_FOUND.getStatusCode()).format(CLUSTER_NAME, INSTANCE_NAME).post(this, entity); new JerseyUriRequestBuilder("clusters/{}/instances/{}?command=setInstanceOperation&instanceOperation=") .format(CLUSTER_NAME, INSTANCE_NAME).post(this, entity); instanceConfig = _configAccessor.getInstanceConfig(CLUSTER_NAME, INSTANCE_NAME); - Assert.assertEquals(instanceConfig.getInstanceOperation(), + Assert.assertEquals(instanceConfig.getInstanceOperation().getOperation(), InstanceConstants.InstanceOperation.ENABLE); // test canCompleteSwap @@ -543,7 +543,7 @@ public void updateInstance() throws Exception { new JerseyUriRequestBuilder("clusters/{}/instances/{}?command=setInstanceOperation&instanceOperation=EVACUATE") .format(CLUSTER_NAME, INSTANCE_NAME).post(this, entity); instanceConfig = 
_configAccessor.getInstanceConfig(CLUSTER_NAME, INSTANCE_NAME); - Assert.assertEquals(instanceConfig.getInstanceOperation(), + Assert.assertEquals(instanceConfig.getInstanceOperation().getOperation(), InstanceConstants.InstanceOperation.EVACUATE); Response response = new JerseyUriRequestBuilder("clusters/{}/instances/{}?command=isEvacuateFinished") @@ -586,7 +586,7 @@ public void updateInstance() throws Exception { new JerseyUriRequestBuilder("clusters/{}/instances/{}?command=setInstanceOperation&instanceOperation=EVACUATE") .format(CLUSTER_NAME, test_instance_name).post(this, entity); instanceConfig = _configAccessor.getInstanceConfig(CLUSTER_NAME, test_instance_name); - Assert.assertEquals(instanceConfig.getInstanceOperation(), + Assert.assertEquals(instanceConfig.getInstanceOperation().getOperation(), InstanceConstants.InstanceOperation.EVACUATE); response = new JerseyUriRequestBuilder("clusters/{}/instances/{}?command=isEvacuateFinished")