Skip to content

Commit

Permalink
Deprecate HELIX_DISABLED_REASON and refactor how InstanceOperation is…
Browse files Browse the repository at this point in the history
… represented in instance configs. (#2801)

Deprecate HELIX_DISABLED_REASON and HELIX_DISABLED_TYPE; Refactor INSTANCE_OPERATION to HELIX_INSTANCE_OPERATIONS List Field

To prevent conflicts from different clients setting the InstanceOperation, we are introducing the HELIX_INSTANCE_OPERATIONS list.

Key changes:
- Clients using the old Helix enabled APIs will take precedence over INSTANCE_DISABLED_OVERRIDABLE_OPERATIONS when those fields are set.
- When the new InstanceOperation APIs set the operation type to DISABLE, the old HELIX_ENABLED field will also be set for backwards compatibility.
- For all InstanceOperation API invocations, the source will default to USER unless specified otherwise. An AUTOMATION source will create a separate entry in the list.
- The most recent non-ENABLE InstanceOperation entry will be the active InstanceOperation used by the controller and returned by the getInstanceOperation API.

These changes ensure smoother operation transitions and maintain compatibility with existing APIs.
  • Loading branch information
zpinto authored Jun 11, 2024
1 parent 17f2df6 commit b7d771e
Show file tree
Hide file tree
Showing 20 changed files with 958 additions and 328 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,54 @@ public class InstanceConstants {
* TODO: Remove this when the deprecated HELIX_ENABLED is removed.
*/
public static final Set<InstanceOperation> INSTANCE_DISABLED_OVERRIDABLE_OPERATIONS =
ImmutableSet.of(InstanceOperation.ENABLE, InstanceOperation.DISABLE, InstanceOperation.EVACUATE);
ImmutableSet.of(InstanceOperation.ENABLE, InstanceOperation.EVACUATE);


/**
* The set of InstanceOperations that are not allowed to be populated in the RoutingTableProvider.
*/
public static final Set<InstanceOperation> UNSERVABLE_INSTANCE_OPERATIONS =
public static final Set<InstanceOperation> UNROUTABLE_INSTANCE_OPERATIONS =
ImmutableSet.of(InstanceOperation.SWAP_IN, InstanceOperation.UNKNOWN);

@Deprecated
public enum InstanceDisabledType {
CLOUD_EVENT,
USER_OPERATION,
DEFAULT_INSTANCE_DISABLE_TYPE
}

public enum InstanceOperationSource {
ADMIN(0), USER(1), AUTOMATION(2), DEFAULT(3);

private final int _priority;

InstanceOperationSource(int priority) {
_priority = priority;
}

public int getPriority() {
return _priority;
}

/**
* Convert from InstanceDisabledType to InstanceOperationTrigger
*
* @param disabledType InstanceDisabledType
* @return InstanceOperationTrigger
*/
public static InstanceOperationSource instanceDisabledTypeToInstanceOperationSource(
InstanceDisabledType disabledType) {
switch (disabledType) {
case CLOUD_EVENT:
return InstanceOperationSource.AUTOMATION;
case USER_OPERATION:
return InstanceOperationSource.USER;
default:
return InstanceOperationSource.DEFAULT;
}
}
}

public enum InstanceOperation {
/**
* Behavior: Replicas will be assigned to the node and will receive upward state transitions if
Expand Down
31 changes: 27 additions & 4 deletions helix-core/src/main/java/org/apache/helix/HelixAdmin.java
Original file line number Diff line number Diff line change
Expand Up @@ -310,15 +310,38 @@ void enableInstance(String clusterName, String instanceName, boolean enabled,
void enableInstance(String clusterName, List<String> instances, boolean enabled);

/**
* Set the instanceOperation field. Setting it to null is equivalent to
* ENABLE.
* Set the instanceOperation of and instance with {@link InstanceConstants.InstanceOperation}.
*
* @param clusterName The cluster name
* @param instanceName The instance name
* @param instanceOperation The instance operation
* @param instanceOperation The instance operation type
*/
void setInstanceOperation(String clusterName, String instanceName,
@Nullable InstanceConstants.InstanceOperation instanceOperation);
InstanceConstants.InstanceOperation instanceOperation);

/**
* Set the instanceOperation of and instance with {@link InstanceConstants.InstanceOperation}.
*
* @param clusterName The cluster name
* @param instanceName The instance name
* @param instanceOperation The instance operation type
* @param reason The reason for the operation
*/
void setInstanceOperation(String clusterName, String instanceName,
InstanceConstants.InstanceOperation instanceOperation, String reason);

/**
* Set the instanceOperation of and instance with {@link InstanceConstants.InstanceOperation}.
*
* @param clusterName The cluster name
* @param instanceName The instance name
* @param instanceOperation The instance operation type
* @param reason The reason for the operation
* @param overrideAll Whether to override all existing instance operations from all other
* instance operations
*/
void setInstanceOperation(String clusterName, String instanceName,
InstanceConstants.InstanceOperation instanceOperation, String reason, boolean overrideAll);

/**
* Disable or enable a resource
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.helix.HelixManager;
import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.util.InstanceUtil;
import org.apache.helix.util.InstanceValidationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -49,9 +51,14 @@ public void disableInstance(HelixManager manager, Object eventInfo) {
LOG.info("DefaultCloudEventCallbackImpl disable Instance {}", manager.getInstanceName());
if (InstanceValidationUtil
.isEnabled(manager.getHelixDataAccessor(), manager.getInstanceName())) {
manager.getClusterManagmentTool()
.enableInstance(manager.getClusterName(), manager.getInstanceName(), false,
InstanceConstants.InstanceDisabledType.CLOUD_EVENT, message);
InstanceUtil.setInstanceOperation(manager.getConfigAccessor(),
manager.getHelixDataAccessor().getBaseDataAccessor(), manager.getClusterName(),
manager.getInstanceName(),
new InstanceConfig.InstanceOperation.Builder().setOperation(
InstanceConstants.InstanceOperation.DISABLE)
.setSource(InstanceConstants.InstanceOperationSource.AUTOMATION)
.setReason(message)
.build());
}
HelixEventHandlingUtil.updateCloudEventOperationInClusterConfig(manager.getClusterName(),
manager.getInstanceName(), manager.getHelixDataAccessor().getBaseDataAccessor(), false,
Expand All @@ -72,10 +79,13 @@ public void enableInstance(HelixManager manager, Object eventInfo) {
HelixEventHandlingUtil
.updateCloudEventOperationInClusterConfig(manager.getClusterName(), instanceName,
manager.getHelixDataAccessor().getBaseDataAccessor(), true, message);
if (HelixEventHandlingUtil.isInstanceDisabledForCloudEvent(instanceName, accessor)) {
manager.getClusterManagmentTool().enableInstance(manager.getClusterName(), instanceName, true,
InstanceConstants.InstanceDisabledType.CLOUD_EVENT, message);
}
InstanceUtil.setInstanceOperation(manager.getConfigAccessor(),
manager.getHelixDataAccessor().getBaseDataAccessor(), manager.getClusterName(),
manager.getInstanceName(),
new InstanceConfig.InstanceOperation.Builder().setOperation(
InstanceConstants.InstanceOperation.ENABLE)
.setSource(InstanceConstants.InstanceOperationSource.AUTOMATION).setReason(message)
.build());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,10 @@ class HelixEventHandlingUtil {
* @param dataAccessor
* @return return true only when instance is Helix disabled and the disabled reason in
* instanceConfig is cloudEvent
* @deprecated No need to check this if using InstanceOperation and specifying the trigger as CLOUD
* when enabling.
*/
@Deprecated
static boolean isInstanceDisabledForCloudEvent(String instanceName,
HelixDataAccessor dataAccessor) {
InstanceConfig instanceConfig =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
* under the License.
*/

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

Expand Down Expand Up @@ -62,6 +63,21 @@ protected Map<FieldType, Set<String>> getNonTrimmableFields(InstanceConfig insta
return STATIC_TOPOLOGY_RELATED_FIELD_MAP;
}

/**
* We should trim HELIX_INSTANCE_OPERATIONS field, it is used to filter instances in the
* BaseControllerDataProvider. That filtering will be used to determine if ResourceChangeSnapshot
* has changed as opposed to checking the actual value of the field.
*
* @param property the instance config
* @return a map contains all non-trimmable field keys that need to be kept.
*/
protected Map<FieldType, Set<String>> getNonTrimmableKeys(InstanceConfig property) {
Map<FieldType, Set<String>> nonTrimmableKeys = super.getNonTrimmableKeys(property);
nonTrimmableKeys.get(FieldType.LIST_FIELD)
.remove(InstanceConfigProperty.HELIX_INSTANCE_OPERATIONS.name());
return nonTrimmableKeys;
}

@Override
public InstanceConfig trimProperty(InstanceConfig property) {
return new InstanceConfig(doTrim(property));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -413,14 +413,15 @@ private void updateInstanceSets(Map<String, InstanceConfig> instanceConfigMap,
currentInstanceConfig.getLogicalId(clusterTopologyConfig.getEndNodeType());

newInstanceConfigMapByInstanceOperation.computeIfAbsent(
currentInstanceConfig.getInstanceOperation(), k -> new HashMap<>())
currentInstanceConfig.getInstanceOperation().getOperation(),
k -> new HashMap<>())
.put(node, currentInstanceConfig);

if (currentInstanceConfig.isAssignable()) {
newAssignableInstanceConfigMap.put(node, currentInstanceConfig);
}

if (currentInstanceConfig.getInstanceOperation()
if (currentInstanceConfig.getInstanceOperation().getOperation()
.equals(InstanceConstants.InstanceOperation.SWAP_IN)) {
swapInLogicalIdsByInstanceName.put(currentInstanceConfig.getInstanceName(),
currentInstanceLogicalId);
Expand Down Expand Up @@ -1079,7 +1080,8 @@ private void updateDisabledInstances(Collection<InstanceConfig> allInstanceConfi
_disabledInstanceSet.clear();
for (InstanceConfig config : allInstanceConfigs) {
Map<String, List<String>> disabledPartitionMap = config.getDisabledPartitionsMap();
if (config.getInstanceOperation().equals(InstanceConstants.InstanceOperation.DISABLE)) {
if (config.getInstanceOperation().getOperation()
.equals(InstanceConstants.InstanceOperation.DISABLE)) {
_disabledInstanceSet.add(config.getInstanceName());
}
for (String resource : disabledPartitionMap.keySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.helix.HelixRebalanceException;
import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.controller.LogUtil;
import org.apache.helix.controller.common.ResourcesStateMap;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
Expand Down Expand Up @@ -358,11 +357,12 @@ private boolean validateInstancesUnableToAcceptOnlineReplicasLimit(final Resourc
if (maxInstancesUnableToAcceptOnlineReplicas >= 0) {
// Instead of only checking the offline instances, we consider how many instances in the cluster
// are not assignable and live. This is because some instances may be online but have an unassignable
// InstanceOperation such as EVACUATE, DISABLE, or UNKNOWN. We will exclude SWAP_IN instances from
// InstanceOperation such as EVACUATE, and DISABLE. We will exclude SWAP_IN and UNKNOWN instances from
// they should not account against the capacity of the cluster.
int instancesUnableToAcceptOnlineReplicas = cache.getInstanceConfigMap().entrySet().stream()
.filter(instanceEntry -> !InstanceConstants.UNSERVABLE_INSTANCE_OPERATIONS.contains(
instanceEntry.getValue().getInstanceOperation())).collect(Collectors.toSet())
.filter(instanceEntry -> !InstanceConstants.UNROUTABLE_INSTANCE_OPERATIONS.contains(
instanceEntry.getValue().getInstanceOperation().getOperation()))
.collect(Collectors.toSet())
.size() - cache.getEnabledLiveInstances().size();
if (instancesUnableToAcceptOnlineReplicas > maxInstancesUnableToAcceptOnlineReplicas) {
String errMsg = String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ public void process(ClusterEvent event) throws Exception {

// Only update the currentStateExcludingUnknown if the instance is not in UNKNOWN InstanceOperation.
if (instanceConfig == null || !instanceConfig.getInstanceOperation()
.getOperation()
.equals(InstanceConstants.InstanceOperation.UNKNOWN)) {
// update current states.
updateCurrentStates(instance,
Expand Down
Loading

0 comments on commit b7d771e

Please sign in to comment.