-
Notifications
You must be signed in to change notification settings - Fork 229
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
HelixAdmin APIs and pipeline changes to support Helix Node Swap #2661
Changes from 9 commits
6dc998d
ee03fd2
8956ad6
fa7e067
d2f72a4
a4be83d
c42078f
6d0a3f7
a6557cf
d3887bd
4ca8f26
aa813a9
b9a3d8d
4d31725
476e1fb
ddd6c16
363a61a
853b15d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -46,10 +46,12 @@ | |
import org.apache.helix.common.caches.PropertyCache; | ||
import org.apache.helix.common.caches.TaskCurrentStateCache; | ||
import org.apache.helix.common.controllers.ControlContextProvider; | ||
import org.apache.helix.constants.InstanceConstants; | ||
import org.apache.helix.controller.LogUtil; | ||
import org.apache.helix.controller.rebalancer.constraint.MonitoredAbnormalResolver; | ||
import org.apache.helix.model.ClusterConfig; | ||
import org.apache.helix.model.ClusterConstraints; | ||
import org.apache.helix.model.ClusterTopologyConfig; | ||
import org.apache.helix.model.CurrentState; | ||
import org.apache.helix.model.IdealState; | ||
import org.apache.helix.model.InstanceConfig; | ||
|
@@ -116,6 +118,8 @@ public class BaseControllerDataProvider implements ControlContextProvider { | |
private Map<String, Map<String, String>> _idealStateRuleMap; | ||
private final Map<String, Map<String, Set<String>>> _disabledInstanceForPartitionMap = new HashMap<>(); | ||
private final Set<String> _disabledInstanceSet = new HashSet<>(); | ||
private final Map<String, String> _swapOutInstanceNameToSwapInInstanceName = new HashMap<>(); | ||
private final Set<String> _enabledLiveSwapInInstanceNames = new HashSet<>(); | ||
private final Map<String, MonitoredAbnormalResolver> _abnormalStateResolverMap = new HashMap<>(); | ||
private final Set<String> _timedOutInstanceDuringMaintenance = new HashSet<>(); | ||
private Map<String, LiveInstance> _liveInstanceExcludeTimedOutForMaintenance = new HashMap<>(); | ||
|
@@ -437,6 +441,8 @@ protected synchronized Set<HelixConstants.ChangeType> doRefresh(HelixDataAccesso | |
|
||
updateIdealRuleMap(getClusterConfig()); | ||
updateDisabledInstances(getInstanceConfigMap().values(), getClusterConfig()); | ||
updateSwappingInstances(getInstanceConfigMap().values(), getEnabledLiveInstances(), | ||
getClusterConfig()); | ||
|
||
return refreshedTypes; | ||
} | ||
|
@@ -471,6 +477,8 @@ public void setClusterConfig(ClusterConfig clusterConfig) { | |
refreshAbnormalStateResolverMap(_clusterConfig); | ||
updateIdealRuleMap(_clusterConfig); | ||
updateDisabledInstances(getInstanceConfigMap().values(), _clusterConfig); | ||
updateSwappingInstances(getInstanceConfigMap().values(), getEnabledLiveInstances(), | ||
_clusterConfig); | ||
} | ||
|
||
@Override | ||
|
@@ -617,6 +625,24 @@ public Set<String> getDisabledInstances() { | |
return Collections.unmodifiableSet(_disabledInstanceSet); | ||
} | ||
|
||
/** | ||
* Get all swapping instance pairs. | ||
* | ||
* @return a map of SWAP_OUT instanceNames and their corresponding SWAP_IN instanceNames. | ||
*/ | ||
public Map<String, String> getSwapOutToSwapInInstancePairs() { | ||
return Collections.unmodifiableMap(_swapOutInstanceNameToSwapInInstanceName); | ||
} | ||
|
||
/** | ||
* Get all the enabled and live SWAP_IN instances. | ||
* | ||
* @return a set of SWAP_IN instanceNames that have a corresponding SWAP_OUT instance. | ||
*/ | ||
public Set<String> getEnabledLiveSwapInInstanceNames() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Question: when we tag SWAP_IN instanceNames, don't we already check if there is already an paring SWAP_OUT instance? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Discussed offline. TLDR: Sanity checks ensure that we can't have a case where there are two instances with the same logicalId and both have InstanceOperation unset. This allows for setting either SWAP_OUT node or SWAP_IN node instance operation first. |
||
return Collections.unmodifiableSet(_enabledLiveSwapInInstanceNames); | ||
} | ||
|
||
public synchronized void setLiveInstances(List<LiveInstance> liveInstances) { | ||
_liveInstanceCache.setPropertyMap(HelixProperty.convertListToMap(liveInstances)); | ||
_updateInstanceOfflineTime = true; | ||
|
@@ -750,6 +776,8 @@ public Map<String, InstanceConfig> getInstanceConfigMap() { | |
public void setInstanceConfigMap(Map<String, InstanceConfig> instanceConfigMap) { | ||
_instanceConfigCache.setPropertyMap(instanceConfigMap); | ||
updateDisabledInstances(instanceConfigMap.values(), getClusterConfig()); | ||
updateSwappingInstances(instanceConfigMap.values(), getEnabledLiveInstances(), | ||
getClusterConfig()); | ||
} | ||
|
||
/** | ||
|
@@ -858,6 +886,42 @@ private void updateDisabledInstances(Collection<InstanceConfig> instanceConfigs, | |
} | ||
} | ||
|
||
private void updateSwappingInstances(Collection<InstanceConfig> instanceConfigs, | ||
zpinto marked this conversation as resolved.
Show resolved
Hide resolved
|
||
Set<String> liveEnabledInstances, ClusterConfig clusterConfig) { | ||
ClusterTopologyConfig clusterTopologyConfig = | ||
ClusterTopologyConfig.createFromClusterConfig(clusterConfig); | ||
_swapOutInstanceNameToSwapInInstanceName.clear(); | ||
_enabledLiveSwapInInstanceNames.clear(); | ||
Map<String, String> swapOutLogicalIdsByInstanceName = new HashMap<>(); | ||
Map<String, String> swapInInstancesByLogicalId = new HashMap<>(); | ||
instanceConfigs.forEach(instanceConfig -> { | ||
if (instanceConfig == null) { | ||
return; | ||
} | ||
if (instanceConfig.getInstanceOperation() | ||
.equals(InstanceConstants.InstanceOperation.SWAP_OUT.name())) { | ||
swapOutLogicalIdsByInstanceName.put(instanceConfig.getInstanceName(), | ||
instanceConfig.getLogicalId(clusterTopologyConfig.getEndNodeType())); | ||
} | ||
if (instanceConfig.getInstanceOperation() | ||
.equals(InstanceConstants.InstanceOperation.SWAP_IN.name())) { | ||
swapInInstancesByLogicalId.put( | ||
instanceConfig.getLogicalId(clusterTopologyConfig.getEndNodeType()), | ||
instanceConfig.getInstanceName()); | ||
} | ||
}); | ||
|
||
swapOutLogicalIdsByInstanceName.forEach((swapOutInstanceName, value) -> { | ||
String swapInInstanceName = swapInInstancesByLogicalId.get(value); | ||
if (swapInInstanceName != null) { | ||
_swapOutInstanceNameToSwapInInstanceName.put(swapOutInstanceName, swapInInstanceName); | ||
if (liveEnabledInstances.contains(swapInInstanceName)) { | ||
_enabledLiveSwapInInstanceNames.add(swapInInstanceName); | ||
} | ||
} | ||
}); | ||
} | ||
|
||
/* | ||
* Check if the instance is timed-out during maintenance mode. An instance is timed-out if it has | ||
* been offline for longer than the user defined timeout window. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,17 +31,17 @@ | |
import java.util.Optional; | ||
import java.util.Set; | ||
|
||
import java.util.stream.Collectors; | ||
import org.apache.helix.HelixDefinedState; | ||
import org.apache.helix.api.config.StateTransitionThrottleConfig; | ||
import org.apache.helix.constants.InstanceConstants; | ||
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; | ||
import org.apache.helix.controller.rebalancer.constraint.MonitoredAbnormalResolver; | ||
import org.apache.helix.controller.rebalancer.util.DelayedRebalanceUtil; | ||
import org.apache.helix.controller.rebalancer.util.WagedValidationUtil; | ||
import org.apache.helix.controller.stages.CurrentStateOutput; | ||
import org.apache.helix.model.ClusterConfig; | ||
import org.apache.helix.model.ClusterTopologyConfig; | ||
import org.apache.helix.model.IdealState; | ||
import org.apache.helix.model.InstanceConfig; | ||
import org.apache.helix.model.Partition; | ||
import org.apache.helix.model.Resource; | ||
import org.apache.helix.model.ResourceAssignment; | ||
|
@@ -56,7 +56,8 @@ | |
*/ | ||
public class DelayedAutoRebalancer extends AbstractRebalancer<ResourceControllerDataProvider> { | ||
private static final Logger LOG = LoggerFactory.getLogger(DelayedAutoRebalancer.class); | ||
public static final Set<String> INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT = ImmutableSet.of("EVACUATE", "SWAP_IN"); | ||
public static ImmutableSet<String> INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT = | ||
ImmutableSet.of(InstanceConstants.InstanceOperation.EVACUATE.name()); | ||
|
||
@Override | ||
public IdealState computeNewIdealState(String resourceName, | ||
|
@@ -113,9 +114,16 @@ public IdealState computeNewIdealState(String resourceName, | |
allNodes = clusterData.getAllInstances(); | ||
} | ||
|
||
Set<String> allNodesDeduped = DelayedRebalanceUtil.filterOutInstancesWithDuplicateLogicalIds( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you think we could integrate the logic into There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added a TODO for this. We can do it when we refactor to use states instead of InstanceOperation. |
||
ClusterTopologyConfig.createFromClusterConfig(clusterConfig), | ||
clusterData.getInstanceConfigMap(), allNodes); | ||
// Remove the non-selected instances with duplicate logicalIds from liveEnabledNodes | ||
// This ensures the same duplicate instance is kept in both allNodesDeduped and liveEnabledNodes | ||
liveEnabledNodes.retainAll(allNodesDeduped); | ||
|
||
long delay = DelayedRebalanceUtil.getRebalanceDelay(currentIdealState, clusterConfig); | ||
Set<String> activeNodes = DelayedRebalanceUtil | ||
.getActiveNodes(allNodes, currentIdealState, liveEnabledNodes, | ||
.getActiveNodes(allNodesDeduped, currentIdealState, liveEnabledNodes, | ||
clusterData.getInstanceOfflineTimeMap(), clusterData.getLiveInstances().keySet(), | ||
clusterData.getInstanceConfigMap(), delay, clusterConfig); | ||
if (delayRebalanceEnabled) { | ||
|
@@ -127,11 +135,11 @@ public IdealState computeNewIdealState(String resourceName, | |
clusterConfig, _manager); | ||
} | ||
|
||
if (allNodes.isEmpty() || activeNodes.isEmpty()) { | ||
if (allNodesDeduped.isEmpty() || activeNodes.isEmpty()) { | ||
LOG.error(String.format( | ||
"No instances or active instances available for resource %s, " | ||
+ "allInstances: %s, liveInstances: %s, activeInstances: %s", | ||
resourceName, allNodes, liveEnabledNodes, activeNodes)); | ||
resourceName, allNodesDeduped, liveEnabledNodes, activeNodes)); | ||
return generateNewIdealState(resourceName, currentIdealState, | ||
emptyMapping(currentIdealState)); | ||
} | ||
|
@@ -158,40 +166,56 @@ public IdealState computeNewIdealState(String resourceName, | |
stateCountMap, maxPartition); | ||
|
||
// sort node lists to ensure consistent preferred assignments | ||
List<String> allNodeList = new ArrayList<>(allNodes); | ||
List<String> allNodeList = new ArrayList<>(allNodesDeduped); | ||
// We will not assign partition to instances with evacuation and wap-out tag. | ||
zpinto marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// TODO: Currently we have 2 groups of instances and compute preference list twice and merge. | ||
// Eventually we want to have exclusive groups of instance for different instance tag. | ||
List<String> liveEnabledAssignableNodeList = filterOutOnOperationInstances(clusterData.getInstanceConfigMap(), | ||
liveEnabledNodes); | ||
List<String> liveEnabledAssignableNodeList = new ArrayList<>( | ||
DelayedRebalanceUtil.filterOutEvacuatingInstances(clusterData.getInstanceConfigMap(), | ||
liveEnabledNodes)); | ||
Collections.sort(allNodeList); | ||
Collections.sort(liveEnabledAssignableNodeList); | ||
|
||
ZNRecord newIdealMapping = _rebalanceStrategy | ||
.computePartitionAssignment(allNodeList, liveEnabledAssignableNodeList, currentMapping, clusterData); | ||
ZNRecord newIdealMapping = | ||
_rebalanceStrategy.computePartitionAssignment(allNodeList, liveEnabledAssignableNodeList, | ||
currentMapping, clusterData); | ||
ZNRecord finalMapping = newIdealMapping; | ||
|
||
if (DelayedRebalanceUtil.isDelayRebalanceEnabled(currentIdealState, clusterConfig) | ||
|| liveEnabledAssignableNodeList.size()!= activeNodes.size()) { | ||
|| liveEnabledAssignableNodeList.size() != activeNodes.size()) { | ||
List<String> activeNodeList = new ArrayList<>(activeNodes); | ||
Collections.sort(activeNodeList); | ||
int minActiveReplicas = DelayedRebalanceUtil.getMinActiveReplica( | ||
ResourceConfig.mergeIdealStateWithResourceConfig(resourceConfig, currentIdealState), | ||
currentIdealState, replicaCount); | ||
|
||
ZNRecord newActiveMapping = _rebalanceStrategy | ||
.computePartitionAssignment(allNodeList, activeNodeList, currentMapping, clusterData); | ||
ZNRecord newActiveMapping = | ||
_rebalanceStrategy.computePartitionAssignment(allNodeList, activeNodeList, currentMapping, | ||
clusterData); | ||
finalMapping = getFinalDelayedMapping(currentIdealState, newIdealMapping, newActiveMapping, | ||
liveEnabledNodes, replicaCount, minActiveReplicas); | ||
} | ||
|
||
finalMapping.getListFields().putAll(userDefinedPreferenceList); | ||
|
||
// 1. Get all SWAP_OUT instances and corresponding SWAP_IN instance pairs in the cluster. | ||
Map<String, String> swapOutToSwapInInstancePairs = | ||
clusterData.getSwapOutToSwapInInstancePairs(); | ||
// 2. Get all enabled and live SWAP_IN instances in the cluster. | ||
Set<String> enabledLiveSwapInInstances = clusterData.getEnabledLiveSwapInInstanceNames(); | ||
// 3. For each SWAP_OUT instance in any of the preferenceLists, add the corresponding SWAP_IN instance to the end. | ||
// Skipping this when there are not SWAP_IN instances ready(enabled and live) will reduce computation time when there is not an active | ||
// swap occurring. | ||
if (!clusterData.getEnabledLiveSwapInInstanceNames().isEmpty()) { | ||
DelayedRebalanceUtil.addSwapInInstanceToPreferenceListsIfSwapOutInstanceExists(finalMapping, | ||
swapOutToSwapInInstancePairs, enabledLiveSwapInInstances); | ||
} | ||
|
||
LOG.debug("currentMapping: {}", currentMapping); | ||
LOG.debug("stateCountMap: {}", stateCountMap); | ||
LOG.debug("liveEnabledNodes: {}", liveEnabledNodes); | ||
LOG.debug("activeNodes: {}", activeNodes); | ||
LOG.debug("allNodes: {}", allNodes); | ||
LOG.debug("allNodes: {}", allNodesDeduped); | ||
LOG.debug("maxPartition: {}", maxPartition); | ||
LOG.debug("newIdealMapping: {}", newIdealMapping); | ||
LOG.debug("finalMapping: {}", finalMapping); | ||
|
@@ -201,14 +225,6 @@ public IdealState computeNewIdealState(String resourceName, | |
return idealState; | ||
} | ||
|
||
private static List<String> filterOutOnOperationInstances(Map<String, InstanceConfig> instanceConfigMap, | ||
Set<String> nodes) { | ||
return nodes.stream() | ||
.filter( | ||
instance -> !INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT.contains(instanceConfigMap.get(instance).getInstanceOperation())) | ||
.collect(Collectors.toList()); | ||
} | ||
|
||
private IdealState generateNewIdealState(String resourceName, IdealState currentIdealState, | ||
ZNRecord newMapping) { | ||
IdealState newIdealState = new IdealState(resourceName); | ||
|
@@ -376,7 +392,7 @@ protected Map<String, String> computeBestPossibleStateForPartition(Set<String> l | |
// if preference list is not empty, and we do have new intanceToAdd, we | ||
// should check if it has capacity to hold the partition. | ||
boolean isWaged = WagedValidationUtil.isWagedEnabled(idealState) && cache != null; | ||
if (isWaged && !isPreferenceListEmpty && instanceToAdd.size() > 0) { | ||
if (isWaged && !isPreferenceListEmpty && !instanceToAdd.isEmpty()) { | ||
// check instanceToAdd instance appears in combinedPreferenceList | ||
for (String instance : instanceToAdd) { | ||
if (combinedPreferenceList.contains(instance)) { | ||
|
@@ -409,7 +425,11 @@ protected Map<String, String> computeBestPossibleStateForPartition(Set<String> l | |
bestPossibleStateMap, preferenceList, combinedPreferenceList)) { | ||
for (int i = 0; i < combinedPreferenceList.size() - numReplicas; i++) { | ||
String instanceToDrop = combinedPreferenceList.get(combinedPreferenceList.size() - i - 1); | ||
bestPossibleStateMap.put(instanceToDrop, HelixDefinedState.DROPPED.name()); | ||
// We do not want to drop a SWAP_IN node if it is at the end of the preferenceList, | ||
// because partitions are actively being added on this node to prepare for SWAP completion. | ||
if (cache == null || !cache.getEnabledLiveSwapInInstanceNames().contains(instanceToDrop)) { | ||
bestPossibleStateMap.put(instanceToDrop, HelixDefinedState.DROPPED.name()); | ||
} | ||
} | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sorry for my lack of understanding, typically swap involves 2 entries, why does the API take only one? Also what does "swap" mean in this context?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unless instanceName is like "virtual" and there are 2 entities tied to the same virtual instance?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instance name in this case will be one of the unique instance names(same as InstanceConfig znode ID) involved in the swap. Either SWAP_OUT or SWAP_IN. This API is intended to be used with the PerInstanceAccessor in helix-rest. For that endpoint you pass one instanceName.
Because we can find the matching swap instance for either SWAP_OUT or SWAP_IN node, we can take either as the instance name.
"swap" is referring to the operation between two instances which can be deduced from providing one of the instances involved in the swap.