Skip to content

Commit

Permalink
HelixAdmin APIs and pipeline changes to support Helix Node Swap (#2661)
Browse files Browse the repository at this point in the history
Add ability for up to 2 nodes with the same logicalId to be added to the cluster at the same time when a SWAP is happening.
During all paritionAssignment for WAGED and DelayedAutoRebalancer, we select just one instance for each logicalId. Achieves n -> n+1 for all replicas on SWAP_OUT node and back n when SWAP is marked complete, making it cancelable.

Adding and updating Helix Admin APIs to support swap operation:

setInstanceOperation
addInstance
canCompleteSwap
completeSwapIfPossible
* Refactor sanity checks for HelixAdmin swap APIs.

* Helix Node Swap pipeline changes and integration tests.

* Fix integration tests to properly restore stopped MockParticipant so following tests are not affected.

* Add comments and docstrings.

* Fix tests to clean up after themselves.

* Optimize duplicate logicalId filtering to only be called on allNodes and then used to remove duplicate logicalIds from enabledLiveNodes.

* Add handling for clusterConfig == null in updateSwappingInstances and fix AssignableNode to check for clusterTopologyConfig when attempting to get logicalId.

* Fix integ tests.

* Fix testGetDomainInformation since we no longer allow an instance to join the cluster with an invalid DOMAIN field.

* Add checks to ensure that the SWAP_IN instance has a matching FAULT_ZONE and matching INSTANCE_CAPACITY_MAP to SWAP_OUT node.

* Rename canSwapBeCompleted to canCompleteSwap.

* Add sanity checks to allow SWAP_IN node to join the cluster in disabled state before SWAP_OUT node has instance operation set.

* Fix print in test case.

* Add canCompleteSwap to PerInstanceAccessor and fix formatting.

* Fix flaky node swap after completion by making sure replica has is computed with logicalIds intead of instanceNames.
  • Loading branch information
zpinto authored and Xiaoyuan Lu committed Nov 13, 2023
1 parent e88fec8 commit 32fb092
Show file tree
Hide file tree
Showing 18 changed files with 1,720 additions and 146 deletions.
33 changes: 31 additions & 2 deletions helix-core/src/main/java/org/apache/helix/HelixAdmin.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.util.List;
import java.util.Map;

import javax.annotation.Nullable;

import org.apache.helix.api.status.ClusterManagementMode;
import org.apache.helix.api.status.ClusterManagementModeRequest;
import org.apache.helix.api.topology.ClusterTopology;
Expand Down Expand Up @@ -302,8 +304,15 @@ void enableInstance(String clusterName, String instanceName, boolean enabled,
*/
void enableInstance(String clusterName, List<String> instances, boolean enabled);

void setInstanceOperation(String clusterName, String instance,
InstanceConstants.InstanceOperation instanceOperation);
/**
* Set the instanceOperation field.
*
* @param clusterName The cluster name
* @param instanceName The instance name
* @param instanceOperation The instance operation
*/
void setInstanceOperation(String clusterName, String instanceName,
@Nullable InstanceConstants.InstanceOperation instanceOperation);

/**
* Disable or enable a resource
Expand Down Expand Up @@ -747,6 +756,26 @@ Map<String, Boolean> validateInstancesForWagedRebalance(String clusterName,
*/
boolean isEvacuateFinished(String clusterName, String instancesNames);

/**
* Check to see if swapping between two instances can be completed. Either the swapOut or
* swapIn instance can be passed in.
* @param clusterName The cluster name
* @param instanceName The instance that is being swapped out or swapped in
* @return True if the swap is ready to be completed, false otherwise.
*/
boolean canCompleteSwap(String clusterName, String instanceName);

/**
* Check to see if swapping between two instances is ready to be completed and complete it if
* possible. Either the swapOut or swapIn instance can be passed in.
*
* @param clusterName The cluster name
* @param instanceName The instance that is being swapped out or swapped in
* @return True if the swap is ready to be completed and was completed successfully, false
* otherwise.
*/
boolean completeSwapIfPossible(String clusterName, String instanceName);

/**
* Return if instance is ready for preparing joining cluster. The instance should have no current state,
* no pending message and tagged with operation that exclude the instance from Helix assignment.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,12 @@
import org.apache.helix.common.caches.PropertyCache;
import org.apache.helix.common.caches.TaskCurrentStateCache;
import org.apache.helix.common.controllers.ControlContextProvider;
import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.controller.LogUtil;
import org.apache.helix.controller.rebalancer.constraint.MonitoredAbnormalResolver;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ClusterConstraints;
import org.apache.helix.model.ClusterTopologyConfig;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
Expand Down Expand Up @@ -116,6 +118,8 @@ public class BaseControllerDataProvider implements ControlContextProvider {
private Map<String, Map<String, String>> _idealStateRuleMap;
private final Map<String, Map<String, Set<String>>> _disabledInstanceForPartitionMap = new HashMap<>();
private final Set<String> _disabledInstanceSet = new HashSet<>();
private final Map<String, String> _swapOutInstanceNameToSwapInInstanceName = new HashMap<>();
private final Set<String> _enabledLiveSwapInInstanceNames = new HashSet<>();
private final Map<String, MonitoredAbnormalResolver> _abnormalStateResolverMap = new HashMap<>();
private final Set<String> _timedOutInstanceDuringMaintenance = new HashSet<>();
private Map<String, LiveInstance> _liveInstanceExcludeTimedOutForMaintenance = new HashMap<>();
Expand Down Expand Up @@ -437,6 +441,8 @@ protected synchronized Set<HelixConstants.ChangeType> doRefresh(HelixDataAccesso

updateIdealRuleMap(getClusterConfig());
updateDisabledInstances(getInstanceConfigMap().values(), getClusterConfig());
updateSwappingInstances(getInstanceConfigMap().values(), getEnabledLiveInstances(),
getClusterConfig());

return refreshedTypes;
}
Expand Down Expand Up @@ -471,6 +477,8 @@ public void setClusterConfig(ClusterConfig clusterConfig) {
refreshAbnormalStateResolverMap(_clusterConfig);
updateIdealRuleMap(_clusterConfig);
updateDisabledInstances(getInstanceConfigMap().values(), _clusterConfig);
updateSwappingInstances(getInstanceConfigMap().values(), getEnabledLiveInstances(),
_clusterConfig);
}

@Override
Expand Down Expand Up @@ -617,6 +625,24 @@ public Set<String> getDisabledInstances() {
return Collections.unmodifiableSet(_disabledInstanceSet);
}

/**
* Get all swapping instance pairs.
*
* @return a map of SWAP_OUT instanceNames and their corresponding SWAP_IN instanceNames.
*/
public Map<String, String> getSwapOutToSwapInInstancePairs() {
return Collections.unmodifiableMap(_swapOutInstanceNameToSwapInInstanceName);
}

/**
* Get all the enabled and live SWAP_IN instances.
*
* @return a set of SWAP_IN instanceNames that have a corresponding SWAP_OUT instance.
*/
public Set<String> getEnabledLiveSwapInInstanceNames() {
return Collections.unmodifiableSet(_enabledLiveSwapInInstanceNames);
}

public synchronized void setLiveInstances(List<LiveInstance> liveInstances) {
_liveInstanceCache.setPropertyMap(HelixProperty.convertListToMap(liveInstances));
_updateInstanceOfflineTime = true;
Expand Down Expand Up @@ -750,6 +776,8 @@ public Map<String, InstanceConfig> getInstanceConfigMap() {
public void setInstanceConfigMap(Map<String, InstanceConfig> instanceConfigMap) {
_instanceConfigCache.setPropertyMap(instanceConfigMap);
updateDisabledInstances(instanceConfigMap.values(), getClusterConfig());
updateSwappingInstances(instanceConfigMap.values(), getEnabledLiveInstances(),
getClusterConfig());
}

/**
Expand Down Expand Up @@ -858,6 +886,49 @@ private void updateDisabledInstances(Collection<InstanceConfig> instanceConfigs,
}
}

private void updateSwappingInstances(Collection<InstanceConfig> instanceConfigs,
Set<String> liveEnabledInstances, ClusterConfig clusterConfig) {
_swapOutInstanceNameToSwapInInstanceName.clear();
_enabledLiveSwapInInstanceNames.clear();

if (clusterConfig == null) {
logger.warn("Skip refreshing swapping instances because clusterConfig is null.");
return;
}

ClusterTopologyConfig clusterTopologyConfig =
ClusterTopologyConfig.createFromClusterConfig(clusterConfig);

Map<String, String> swapOutLogicalIdsByInstanceName = new HashMap<>();
Map<String, String> swapInInstancesByLogicalId = new HashMap<>();
instanceConfigs.forEach(instanceConfig -> {
if (instanceConfig == null) {
return;
}
if (instanceConfig.getInstanceOperation()
.equals(InstanceConstants.InstanceOperation.SWAP_OUT.name())) {
swapOutLogicalIdsByInstanceName.put(instanceConfig.getInstanceName(),
instanceConfig.getLogicalId(clusterTopologyConfig.getEndNodeType()));
}
if (instanceConfig.getInstanceOperation()
.equals(InstanceConstants.InstanceOperation.SWAP_IN.name())) {
swapInInstancesByLogicalId.put(
instanceConfig.getLogicalId(clusterTopologyConfig.getEndNodeType()),
instanceConfig.getInstanceName());
}
});

swapOutLogicalIdsByInstanceName.forEach((swapOutInstanceName, value) -> {
String swapInInstanceName = swapInInstancesByLogicalId.get(value);
if (swapInInstanceName != null) {
_swapOutInstanceNameToSwapInInstanceName.put(swapOutInstanceName, swapInInstanceName);
if (liveEnabledInstances.contains(swapInInstanceName)) {
_enabledLiveSwapInInstanceNames.add(swapInInstanceName);
}
}
});
}

/*
* Check if the instance is timed-out during maintenance mode. An instance is timed-out if it has
* been offline for longer than the user defined timeout window.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,17 @@
import java.util.Optional;
import java.util.Set;

import java.util.stream.Collectors;
import org.apache.helix.HelixDefinedState;
import org.apache.helix.api.config.StateTransitionThrottleConfig;
import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.rebalancer.constraint.MonitoredAbnormalResolver;
import org.apache.helix.controller.rebalancer.util.DelayedRebalanceUtil;
import org.apache.helix.controller.rebalancer.util.WagedValidationUtil;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ClusterTopologyConfig;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
import org.apache.helix.model.ResourceAssignment;
Expand All @@ -56,7 +56,8 @@
*/
public class DelayedAutoRebalancer extends AbstractRebalancer<ResourceControllerDataProvider> {
private static final Logger LOG = LoggerFactory.getLogger(DelayedAutoRebalancer.class);
public static final Set<String> INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT = ImmutableSet.of("EVACUATE", "SWAP_IN");
public static ImmutableSet<String> INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT =
ImmutableSet.of(InstanceConstants.InstanceOperation.EVACUATE.name());

@Override
public IdealState computeNewIdealState(String resourceName,
Expand Down Expand Up @@ -113,9 +114,16 @@ public IdealState computeNewIdealState(String resourceName,
allNodes = clusterData.getAllInstances();
}

Set<String> allNodesDeduped = DelayedRebalanceUtil.filterOutInstancesWithDuplicateLogicalIds(
ClusterTopologyConfig.createFromClusterConfig(clusterConfig),
clusterData.getInstanceConfigMap(), allNodes);
// Remove the non-selected instances with duplicate logicalIds from liveEnabledNodes
// This ensures the same duplicate instance is kept in both allNodesDeduped and liveEnabledNodes
liveEnabledNodes.retainAll(allNodesDeduped);

long delay = DelayedRebalanceUtil.getRebalanceDelay(currentIdealState, clusterConfig);
Set<String> activeNodes = DelayedRebalanceUtil
.getActiveNodes(allNodes, currentIdealState, liveEnabledNodes,
.getActiveNodes(allNodesDeduped, currentIdealState, liveEnabledNodes,
clusterData.getInstanceOfflineTimeMap(), clusterData.getLiveInstances().keySet(),
clusterData.getInstanceConfigMap(), delay, clusterConfig);
if (delayRebalanceEnabled) {
Expand All @@ -127,11 +135,11 @@ public IdealState computeNewIdealState(String resourceName,
clusterConfig, _manager);
}

if (allNodes.isEmpty() || activeNodes.isEmpty()) {
if (allNodesDeduped.isEmpty() || activeNodes.isEmpty()) {
LOG.error(String.format(
"No instances or active instances available for resource %s, "
+ "allInstances: %s, liveInstances: %s, activeInstances: %s",
resourceName, allNodes, liveEnabledNodes, activeNodes));
resourceName, allNodesDeduped, liveEnabledNodes, activeNodes));
return generateNewIdealState(resourceName, currentIdealState,
emptyMapping(currentIdealState));
}
Expand All @@ -157,41 +165,58 @@ public IdealState computeNewIdealState(String resourceName,
getRebalanceStrategy(currentIdealState.getRebalanceStrategy(), allPartitions, resourceName,
stateCountMap, maxPartition);

// sort node lists to ensure consistent preferred assignments
List<String> allNodeList = new ArrayList<>(allNodes);
// We will not assign partition to instances with evacuation and wap-out tag.
List<String> allNodeList = new ArrayList<>(allNodesDeduped);

// TODO: Currently we have 2 groups of instances and compute preference list twice and merge.
// Eventually we want to have exclusive groups of instance for different instance tag.
List<String> liveEnabledAssignableNodeList = filterOutOnOperationInstances(clusterData.getInstanceConfigMap(),
liveEnabledNodes);
List<String> liveEnabledAssignableNodeList = new ArrayList<>(
// We will not assign partitions to instances with EVACUATE InstanceOperation.
DelayedRebalanceUtil.filterOutEvacuatingInstances(clusterData.getInstanceConfigMap(),
liveEnabledNodes));
// sort node lists to ensure consistent preferred assignments
Collections.sort(allNodeList);
Collections.sort(liveEnabledAssignableNodeList);

ZNRecord newIdealMapping = _rebalanceStrategy
.computePartitionAssignment(allNodeList, liveEnabledAssignableNodeList, currentMapping, clusterData);
ZNRecord newIdealMapping =
_rebalanceStrategy.computePartitionAssignment(allNodeList, liveEnabledAssignableNodeList,
currentMapping, clusterData);
ZNRecord finalMapping = newIdealMapping;

if (DelayedRebalanceUtil.isDelayRebalanceEnabled(currentIdealState, clusterConfig)
|| liveEnabledAssignableNodeList.size()!= activeNodes.size()) {
|| liveEnabledAssignableNodeList.size() != activeNodes.size()) {
List<String> activeNodeList = new ArrayList<>(activeNodes);
Collections.sort(activeNodeList);
int minActiveReplicas = DelayedRebalanceUtil.getMinActiveReplica(
ResourceConfig.mergeIdealStateWithResourceConfig(resourceConfig, currentIdealState),
currentIdealState, replicaCount);

ZNRecord newActiveMapping = _rebalanceStrategy
.computePartitionAssignment(allNodeList, activeNodeList, currentMapping, clusterData);
ZNRecord newActiveMapping =
_rebalanceStrategy.computePartitionAssignment(allNodeList, activeNodeList, currentMapping,
clusterData);
finalMapping = getFinalDelayedMapping(currentIdealState, newIdealMapping, newActiveMapping,
liveEnabledNodes, replicaCount, minActiveReplicas);
}

finalMapping.getListFields().putAll(userDefinedPreferenceList);

// 1. Get all SWAP_OUT instances and corresponding SWAP_IN instance pairs in the cluster.
Map<String, String> swapOutToSwapInInstancePairs =
clusterData.getSwapOutToSwapInInstancePairs();
// 2. Get all enabled and live SWAP_IN instances in the cluster.
Set<String> enabledLiveSwapInInstances = clusterData.getEnabledLiveSwapInInstanceNames();
// 3. For each SWAP_OUT instance in any of the preferenceLists, add the corresponding SWAP_IN instance to the end.
// Skipping this when there are not SWAP_IN instances ready(enabled and live) will reduce computation time when there is not an active
// swap occurring.
if (!clusterData.getEnabledLiveSwapInInstanceNames().isEmpty()) {
DelayedRebalanceUtil.addSwapInInstanceToPreferenceListsIfSwapOutInstanceExists(finalMapping,
swapOutToSwapInInstancePairs, enabledLiveSwapInInstances);
}

LOG.debug("currentMapping: {}", currentMapping);
LOG.debug("stateCountMap: {}", stateCountMap);
LOG.debug("liveEnabledNodes: {}", liveEnabledNodes);
LOG.debug("activeNodes: {}", activeNodes);
LOG.debug("allNodes: {}", allNodes);
LOG.debug("allNodes: {}", allNodesDeduped);
LOG.debug("maxPartition: {}", maxPartition);
LOG.debug("newIdealMapping: {}", newIdealMapping);
LOG.debug("finalMapping: {}", finalMapping);
Expand All @@ -201,14 +226,6 @@ public IdealState computeNewIdealState(String resourceName,
return idealState;
}

private static List<String> filterOutOnOperationInstances(Map<String, InstanceConfig> instanceConfigMap,
Set<String> nodes) {
return nodes.stream()
.filter(
instance -> !INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT.contains(instanceConfigMap.get(instance).getInstanceOperation()))
.collect(Collectors.toList());
}

private IdealState generateNewIdealState(String resourceName, IdealState currentIdealState,
ZNRecord newMapping) {
IdealState newIdealState = new IdealState(resourceName);
Expand Down Expand Up @@ -376,7 +393,7 @@ protected Map<String, String> computeBestPossibleStateForPartition(Set<String> l
// if preference list is not empty, and we do have new intanceToAdd, we
// should check if it has capacity to hold the partition.
boolean isWaged = WagedValidationUtil.isWagedEnabled(idealState) && cache != null;
if (isWaged && !isPreferenceListEmpty && instanceToAdd.size() > 0) {
if (isWaged && !isPreferenceListEmpty && !instanceToAdd.isEmpty()) {
// check instanceToAdd instance appears in combinedPreferenceList
for (String instance : instanceToAdd) {
if (combinedPreferenceList.contains(instance)) {
Expand Down Expand Up @@ -409,7 +426,11 @@ protected Map<String, String> computeBestPossibleStateForPartition(Set<String> l
bestPossibleStateMap, preferenceList, combinedPreferenceList)) {
for (int i = 0; i < combinedPreferenceList.size() - numReplicas; i++) {
String instanceToDrop = combinedPreferenceList.get(combinedPreferenceList.size() - i - 1);
bestPossibleStateMap.put(instanceToDrop, HelixDefinedState.DROPPED.name());
// We do not want to drop a SWAP_IN node if it is at the end of the preferenceList,
// because partitions are actively being added on this node to prepare for SWAP completion.
if (cache == null || !cache.getEnabledLiveSwapInInstanceNames().contains(instanceToDrop)) {
bestPossibleStateMap.put(instanceToDrop, HelixDefinedState.DROPPED.name());
}
}
}

Expand Down
Loading

0 comments on commit 32fb092

Please sign in to comment.