Fix WAGED to only use logicalId when computing baseline and centralize picking assignable instances in the cache #2702

Merged
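In short: WAGED now keys its baseline computation on instances' logicalIds rather than their physical instance names, and the decision of which instances are assignable is made once in the cluster data cache and exposed through the getAssignable* accessors used throughout the diff below, instead of each rebalancer re-deriving it. A minimal sketch of that centralized selection, assuming a getLogicalId accessor on InstanceConfig and omitting the SWAP-specific tie-breaking the real cache performs:

// Hedged sketch (not the actual ResourceControllerDataProvider code): keep at
// most one instance per logicalId so every consumer sees the same view.
private Map<String, InstanceConfig> computeAssignableInstances(
    Map<String, InstanceConfig> allInstanceConfigs, String topologyEndKey) {
  Map<String, InstanceConfig> assignable = new HashMap<>();
  Map<String, String> logicalIdToInstance = new HashMap<>();
  for (Map.Entry<String, InstanceConfig> entry : allInstanceConfigs.entrySet()) {
    // Assumed accessor: resolves the instance's logicalId from its topology info.
    String logicalId = entry.getValue().getLogicalId(topologyEndKey);
    if (logicalIdToInstance.putIfAbsent(logicalId, entry.getKey()) == null) {
      assignable.put(entry.getKey(), entry.getValue());
    }
    // A non-null previous mapping means a duplicate logicalId (e.g. an active
    // SWAP pair); only the instance chosen by the cache stays assignable.
  }
  return assignable;
}

Because every rebalancer, strategy, and the change snapshot read this single view, SWAP and duplicate-logicalId handling no longer has to be repeated at each call site.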
@@ -113,7 +113,7 @@ private void clearCachedComputation() {
HelixConstants.ChangeType changeType, ResourceChangeSnapshot snapshot) {
switch (changeType) {
case INSTANCE_CONFIG:
- return snapshot.getInstanceConfigMap();
+ return snapshot.getAssignableInstanceConfigMap();
case IDEAL_STATE:
return snapshot.getIdealStateMap();
case RESOURCE_CONFIG:
@@ -50,7 +50,8 @@
class ResourceChangeSnapshot {

private Set<HelixConstants.ChangeType> _changedTypes;
- private Map<String, InstanceConfig> _instanceConfigMap;
+ private Map<String, InstanceConfig> _allInstanceConfigMap;
+ private Map<String, InstanceConfig> _assignableInstanceConfigMap;
private Map<String, IdealState> _idealStateMap;
private Map<String, ResourceConfig> _resourceConfigMap;
private Map<String, LiveInstance> _liveInstances;
@@ -61,7 +62,8 @@ class ResourceChangeSnapshot {
*/
ResourceChangeSnapshot() {
_changedTypes = new HashSet<>();
- _instanceConfigMap = new HashMap<>();
+ _allInstanceConfigMap = new HashMap<>();
+ _assignableInstanceConfigMap = new HashMap<>();
_idealStateMap = new HashMap<>();
_resourceConfigMap = new HashMap<>();
_liveInstances = new HashMap<>();
@@ -80,12 +82,16 @@ class ResourceChangeSnapshot {
ResourceChangeSnapshot(ResourceControllerDataProvider dataProvider,
boolean ignoreNonTopologyChange) {
_changedTypes = new HashSet<>(dataProvider.getRefreshedChangeTypes());

- _instanceConfigMap = ignoreNonTopologyChange ?
+ _allInstanceConfigMap = ignoreNonTopologyChange ?
dataProvider.getInstanceConfigMap().entrySet().parallelStream().collect(Collectors
.toMap(e -> e.getKey(),
e -> InstanceConfigTrimmer.getInstance().trimProperty(e.getValue()))) :
new HashMap<>(dataProvider.getInstanceConfigMap());
+ _assignableInstanceConfigMap = ignoreNonTopologyChange ?
+ dataProvider.getAssignableInstanceConfigMap().entrySet().parallelStream().collect(Collectors
+ .toMap(e -> e.getKey(),
+ e -> InstanceConfigTrimmer.getInstance().trimProperty(e.getValue()))) :
+ new HashMap<>(dataProvider.getAssignableInstanceConfigMap());
_idealStateMap = ignoreNonTopologyChange ?
dataProvider.getIdealStates().entrySet().parallelStream().collect(Collectors
.toMap(e -> e.getKey(),
@@ -99,7 +105,7 @@ class ResourceChangeSnapshot {
_clusterConfig = ignoreNonTopologyChange ?
ClusterConfigTrimmer.getInstance().trimProperty(dataProvider.getClusterConfig()) :
dataProvider.getClusterConfig();
- _liveInstances = new HashMap<>(dataProvider.getLiveInstances());
+ _liveInstances = new HashMap<>(dataProvider.getAssignableLiveInstances());
}

/**
@@ -108,7 +114,8 @@
*/
ResourceChangeSnapshot(ResourceChangeSnapshot snapshot) {
_changedTypes = new HashSet<>(snapshot._changedTypes);
- _instanceConfigMap = new HashMap<>(snapshot._instanceConfigMap);
+ _allInstanceConfigMap = new HashMap<>(snapshot._allInstanceConfigMap);
+ _assignableInstanceConfigMap = new HashMap<>(snapshot._assignableInstanceConfigMap);
_idealStateMap = new HashMap<>(snapshot._idealStateMap);
_resourceConfigMap = new HashMap<>(snapshot._resourceConfigMap);
_liveInstances = new HashMap<>(snapshot._liveInstances);
@@ -120,7 +127,11 @@ Set<HelixConstants.ChangeType> getChangedTypes() {
}

Map<String, InstanceConfig> getInstanceConfigMap() {
- return _instanceConfigMap;
+ return _allInstanceConfigMap;
}

+ Map<String, InstanceConfig> getAssignableInstanceConfigMap() {
+ return _assignableInstanceConfigMap;
+ }

Map<String, IdealState> getIdealStateMap() {
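With both maps on the snapshot, callers now choose their view explicitly: change detection can look at full cluster membership, while assignment-facing logic reads only the assignable subset. A small usage sketch, assuming a populated dataProvider:

ResourceChangeSnapshot snapshot = new ResourceChangeSnapshot(dataProvider, false);
// Full cluster membership, including instances excluded from assignment:
Map<String, InstanceConfig> all = snapshot.getInstanceConfigMap();
// Only the instances a rebalancer may place replicas on:
Map<String, InstanceConfig> assignable = snapshot.getAssignableInstanceConfigMap();
// The assignable view is a subset of the full view.
assert all.keySet().containsAll(assignable.keySet());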

[A large file diff is omitted here: large diffs are not rendered by default.]

@@ -26,7 +26,6 @@

import org.apache.helix.HelixConstants;
import org.apache.helix.HelixDataAccessor;
- import org.apache.helix.common.caches.TaskCurrentStateCache;
import org.apache.helix.model.CurrentState;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.common.caches.AbstractDataCache;
@@ -164,7 +163,7 @@ public void setParticipantActiveTaskCount(String instance, int taskCount) {
*/
public void resetActiveTaskCount(CurrentStateOutput currentStateOutput) {
// init participant map
- for (String liveInstance : getLiveInstances().keySet()) {
+ for (String liveInstance : getAssignableLiveInstances().keySet()) {
_participantActiveTaskCount.put(liveInstance, 0);
}
// Active task == init and running tasks
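The loop above seeds a zero count for every assignable live instance; per the comment, a task then contributes to that count only in its INIT and RUNNING states. A hedged sketch of that test, assuming the Task Framework's TaskPartitionState enum:

// A task counts toward an instance's active-task load only while it is
// INIT or RUNNING (completed, errored, or dropped tasks do not).
static boolean isActiveTask(String state) {
  return TaskPartitionState.INIT.name().equals(state)
      || TaskPartitionState.RUNNING.name().equals(state);
}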
@@ -102,9 +102,10 @@ public ResourceAssignment computeBestPossiblePartitionState(T cache, IdealState
Set<String> disabledInstancesForPartition =
cache.getDisabledInstancesForPartition(resource.getResourceName(), partition.toString());
List<String> preferenceList = getPreferenceList(partition, idealState,
- Collections.unmodifiableSet(cache.getLiveInstances().keySet()));
+ Collections.unmodifiableSet(cache.getAssignableLiveInstances().keySet()));
Map<String, String> bestStateForPartition =
- computeBestPossibleStateForPartition(cache.getLiveInstances().keySet(), stateModelDef,
+ computeBestPossibleStateForPartition(cache.getAssignableLiveInstances().keySet(),
+ stateModelDef,
preferenceList, currentStateOutput, disabledInstancesForPartition, idealState,
cache.getClusterConfig(), partition,
cache.getAbnormalStateResolver(stateModelDefName), cache);
@@ -392,8 +393,14 @@ protected Map<String, String> computeBestPossibleMap(List<String> preferenceList
* transition to the top-state, which could minimize the impact to the application's availability.
* To achieve that, we sort the preferenceList based on CurrentState, by treating top-state and
* second-states with same priority and rely on the fact that Collections.sort() is stable.
+ * @param preferenceList List of instances the replica will be placed on
+ * @param stateModelDef State model definition
+ * @param currentStateMap Current state of each replica <instance: state>
+ * @param liveInstances Set of live instances
+ * @param disabledInstancesForPartition Set of disabled instances for the partition
+ * @param bestPossibleStateMap Output map of <instance: state> for the partition
*/
- private void assignStatesToInstances(final List<String> preferenceList,
+ public void assignStatesToInstances(final List<String> preferenceList,
final StateModelDefinition stateModelDef, final Map<String, String> currentStateMap,
final Set<String> liveInstances, final Set<String> disabledInstancesForPartition,
Map<String, String> bestPossibleStateMap) {
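The stable-sort trick described in the javadoc can be shown in isolation. A hedged illustration, assuming StateModelDefinition exposes getTopState and getSecondTopStates (variable names are illustrative, not the Helix implementation): instances already holding the top state or a second state share one sort key, so the stable sort moves them ahead of everyone else without disturbing their relative preference order:

List<String> sorted = new ArrayList<>(preferenceList);
Set<String> topOrSecondStates = new HashSet<>(stateModelDef.getSecondTopStates());
topOrSecondStates.add(stateModelDef.getTopState());
// Key 0 for instances needing no (or only a cheap) transition, 1 otherwise;
// List.sort is stable, so ties keep their original preference order.
sorted.sort(Comparator.comparingInt(
    (String instance) -> topOrSecondStates.contains(currentStateMap.get(instance)) ? 0 : 1));

This is why a replica that is already at or near the top state keeps priority over an instance that would have to bootstrap from scratch.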
@@ -72,13 +72,13 @@ public IdealState computeNewIdealState(String resourceName,
LOG.error("State Model Definition null for resource: " + resourceName);
throw new HelixException("State Model Definition null for resource: " + resourceName);
}
- Map<String, LiveInstance> liveInstance = clusterData.getLiveInstances();
+ Map<String, LiveInstance> liveInstance = clusterData.getAssignableLiveInstances();
int replicas = currentIdealState.getReplicaCount(liveInstance.size());

LinkedHashMap<String, Integer> stateCountMap = stateModelDef
.getStateCountMap(liveInstance.size(), replicas);
List<String> liveNodes = new ArrayList<>(liveInstance.keySet());
- List<String> allNodes = new ArrayList<>(clusterData.getAllInstances());
+ List<String> allNodes = new ArrayList<>(clusterData.getAssignableInstances());
allNodes.removeAll(clusterData.getDisabledInstances());
liveNodes.retainAll(allNodes);

@@ -90,7 +90,7 @@ public IdealState computeNewIdealState(String resourceName,
Set<String> taggedLiveNodes = new HashSet<String>();
if (currentIdealState.getInstanceGroupTag() != null) {
for (String instanceName : allNodes) {
- if (clusterData.getInstanceConfigMap().get(instanceName)
+ if (clusterData.getAssignableInstanceConfigMap().get(instanceName)
.containsTag(currentIdealState.getInstanceGroupTag())) {
taggedNodes.add(instanceName);
if (liveInstance.containsKey(instanceName)) {
@@ -127,7 +127,7 @@ private Map<String, String> computeCustomizedBestStateForPartition(
return instanceStateMap;
}

- Map<String, LiveInstance> liveInstancesMap = cache.getLiveInstances();
+ Map<String, LiveInstance> liveInstancesMap = cache.getAssignableLiveInstances();
for (String instance : idealStateMap.keySet()) {
boolean notInErrorState = currentStateMap != null
&& !HelixDefinedState.ERROR.toString().equals(currentStateMap.get(instance));
@@ -40,7 +40,6 @@
import org.apache.helix.controller.rebalancer.util.WagedValidationUtil;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.ClusterConfig;
- import org.apache.helix.model.ClusterTopologyConfig;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
@@ -100,8 +99,8 @@ public IdealState computeNewIdealState(String resourceName,

String instanceTag = currentIdealState.getInstanceGroupTag();
if (instanceTag != null) {
- liveEnabledNodes = clusterData.getEnabledLiveInstancesWithTag(instanceTag);
- allNodes = clusterData.getInstancesWithTag(instanceTag);
+ liveEnabledNodes = clusterData.getAssignableEnabledLiveInstancesWithTag(instanceTag);
+ allNodes = clusterData.getAssignableInstancesWithTag(instanceTag);

if (LOG.isInfoEnabled()) {
LOG.info(String.format(
@@ -110,36 +109,31 @@ public IdealState computeNewIdealState(String resourceName,
currentIdealState.getInstanceGroupTag(), resourceName, allNodes, liveEnabledNodes));
}
} else {
- liveEnabledNodes = clusterData.getEnabledLiveInstances();
- allNodes = clusterData.getAllInstances();
+ liveEnabledNodes = clusterData.getAssignableEnabledLiveInstances();
+ allNodes = clusterData.getAssignableInstances();
}

- Set<String> allNodesDeduped = DelayedRebalanceUtil.filterOutInstancesWithDuplicateLogicalIds(
- ClusterTopologyConfig.createFromClusterConfig(clusterConfig),
- clusterData.getInstanceConfigMap(), allNodes);
- // Remove the non-selected instances with duplicate logicalIds from liveEnabledNodes
- // This ensures the same duplicate instance is kept in both allNodesDeduped and liveEnabledNodes
- liveEnabledNodes.retainAll(allNodesDeduped);

long delay = DelayedRebalanceUtil.getRebalanceDelay(currentIdealState, clusterConfig);
- Set<String> activeNodes = DelayedRebalanceUtil
- .getActiveNodes(allNodesDeduped, currentIdealState, liveEnabledNodes,
- clusterData.getInstanceOfflineTimeMap(), clusterData.getLiveInstances().keySet(),
- clusterData.getInstanceConfigMap(), delay, clusterConfig);
+ Set<String> activeNodes =
+ DelayedRebalanceUtil.getActiveNodes(allNodes, currentIdealState, liveEnabledNodes,
+ clusterData.getInstanceOfflineTimeMap(),
+ clusterData.getAssignableLiveInstances().keySet(),
+ clusterData.getAssignableInstanceConfigMap(), delay, clusterConfig);
if (delayRebalanceEnabled) {
Set<String> offlineOrDisabledInstances = new HashSet<>(activeNodes);
offlineOrDisabledInstances.removeAll(liveEnabledNodes);
DelayedRebalanceUtil.setRebalanceScheduler(currentIdealState.getResourceName(), true,
offlineOrDisabledInstances, clusterData.getInstanceOfflineTimeMap(),
- clusterData.getLiveInstances().keySet(), clusterData.getInstanceConfigMap(), delay,
+ clusterData.getAssignableLiveInstances().keySet(),
+ clusterData.getAssignableInstanceConfigMap(), delay,
clusterConfig, _manager);
}

- if (allNodesDeduped.isEmpty() || activeNodes.isEmpty()) {
+ if (allNodes.isEmpty() || activeNodes.isEmpty()) {
LOG.error(String.format(
"No instances or active instances available for resource %s, "
+ "allInstances: %s, liveInstances: %s, activeInstances: %s",
resourceName, allNodesDeduped, liveEnabledNodes, activeNodes));
+ "allInstances: %s, liveInstances: %s, activeInstances: %s", resourceName, allNodes,
liveEnabledNodes, activeNodes));
return generateNewIdealState(resourceName, currentIdealState,
emptyMapping(currentIdealState));
}
@@ -165,13 +159,14 @@ public IdealState computeNewIdealState(String resourceName,
getRebalanceStrategy(currentIdealState.getRebalanceStrategy(), allPartitions, resourceName,
stateCountMap, maxPartition);

- List<String> allNodeList = new ArrayList<>(allNodesDeduped);
+ List<String> allNodeList = new ArrayList<>(allNodes);

// TODO: Currently we have 2 groups of instances and compute preference list twice and merge.
// Eventually we want to have exclusive groups of instance for different instance tag.
List<String> liveEnabledAssignableNodeList = new ArrayList<>(
// We will not assign partitions to instances with EVACUATE InstanceOperation.
- DelayedRebalanceUtil.filterOutEvacuatingInstances(clusterData.getInstanceConfigMap(),
+ DelayedRebalanceUtil.filterOutEvacuatingInstances(
+ clusterData.getAssignableInstanceConfigMap(),
liveEnabledNodes));
// sort node lists to ensure consistent preferred assignments
Collections.sort(allNodeList);
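For reference, the evacuation filter invoked above drops any instance whose InstanceOperation is EVACUATE. A rough sketch under that assumption (the real helper is DelayedRebalanceUtil.filterOutEvacuatingInstances, whose exact checks may differ):

static Set<String> filterOutEvacuating(
    Map<String, InstanceConfig> configs, Set<String> nodes) {
  return nodes.stream()
      .filter(name -> {
        InstanceConfig config = configs.get(name);
        // Keep the node unless it is explicitly marked for evacuation.
        return config == null || !"EVACUATE".equals(config.getInstanceOperation());
      })
      .collect(Collectors.toSet());
}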
@@ -199,24 +194,11 @@ public IdealState computeNewIdealState(String resourceName,

finalMapping.getListFields().putAll(userDefinedPreferenceList);

- // 1. Get all SWAP_OUT instances and corresponding SWAP_IN instance pairs in the cluster.
- Map<String, String> swapOutToSwapInInstancePairs =
- clusterData.getSwapOutToSwapInInstancePairs();
- // 2. Get all enabled and live SWAP_IN instances in the cluster.
- Set<String> enabledLiveSwapInInstances = clusterData.getEnabledLiveSwapInInstanceNames();
- // 3. For each SWAP_OUT instance in any of the preferenceLists, add the corresponding SWAP_IN instance to the end.
- // Skipping this when there are not SWAP_IN instances ready(enabled and live) will reduce computation time when there is not an active
- // swap occurring.
- if (!clusterData.getEnabledLiveSwapInInstanceNames().isEmpty()) {
- DelayedRebalanceUtil.addSwapInInstanceToPreferenceListsIfSwapOutInstanceExists(finalMapping,
- swapOutToSwapInInstancePairs, enabledLiveSwapInInstances);
- }

LOG.debug("currentMapping: {}", currentMapping);
LOG.debug("stateCountMap: {}", stateCountMap);
LOG.debug("liveEnabledNodes: {}", liveEnabledNodes);
LOG.debug("activeNodes: {}", activeNodes);
LOG.debug("allNodes: {}", allNodesDeduped);
LOG.debug("allNodes: {}", allNodes);
LOG.debug("maxPartition: {}", maxPartition);
LOG.debug("newIdealMapping: {}", newIdealMapping);
LOG.debug("finalMapping: {}", finalMapping);
@@ -274,14 +256,15 @@ public ResourceAssignment computeBestPossiblePartitionState(ResourceControllerDa
LOG.debug("Processing resource:" + resource.getResourceName());
}

- Set<String> allNodes = cache.getEnabledInstances();
- Set<String> liveNodes = cache.getLiveInstances().keySet();
+ Set<String> allNodes = cache.getAssignableEnabledInstances();
+ Set<String> liveNodes = cache.getAssignableLiveInstances().keySet();

ClusterConfig clusterConfig = cache.getClusterConfig();
long delayTime = DelayedRebalanceUtil.getRebalanceDelay(idealState, clusterConfig);
Set<String> activeNodes = DelayedRebalanceUtil
.getActiveNodes(allNodes, idealState, liveNodes, cache.getInstanceOfflineTimeMap(),
- cache.getLiveInstances().keySet(), cache.getInstanceConfigMap(), delayTime,
+ cache.getAssignableLiveInstances().keySet(), cache.getAssignableInstanceConfigMap(),
+ delayTime,
clusterConfig);

String stateModelDefName = idealState.getStateModelDefRef();
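Both getActiveNodes call sites in this file rely on the same delayed-rebalance rule: an instance that is offline or disabled still counts as active while it is inside the delay window. A hedged sketch of that rule, reusing the variable names from the hunk above:

long now = System.currentTimeMillis();
Set<String> active = new HashSet<>(liveNodes);
for (String node : allNodes) {
  Long offlineAt = cache.getInstanceOfflineTimeMap().get(node);
  // Recently offline nodes stay "active" until offlineAt + delayTime passes.
  if (offlineAt != null && offlineAt > 0 && now < offlineAt + delayTime) {
    active.add(node);
  }
}

The real DelayedRebalanceUtil.getActiveNodes takes more inputs (instance configs, cluster config), so treat this only as the intuition behind the delay window.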
@@ -87,7 +87,7 @@ public ZNRecord computePartitionAssignment(final List<String> allNodes,
final List<String> liveNodes, final Map<String, Map<String, String>> currentMapping,
ResourceControllerDataProvider clusterData) {
// validate the instance configs
- Map<String, InstanceConfig> instanceConfigMap = clusterData.getInstanceConfigMap();
+ Map<String, InstanceConfig> instanceConfigMap = clusterData.getAssignableInstanceConfigMap();
if (instanceConfigMap == null || !instanceConfigMap.keySet().containsAll(allNodes)) {
throw new HelixException(String.format("Config for instances %s is not found!",
allNodes.removeAll(instanceConfigMap.keySet())));
@@ -116,7 +116,8 @@ private ZNRecord computeBestPartitionAssignment(List<String> allNodes, List<Stri
if (!origPartitionMap.isEmpty()) {
Map<String, List<Node>> finalPartitionMap = null;
Topology allNodeTopo =
- new Topology(allNodes, allNodes, clusterData.getInstanceConfigMap(), clusterData.getClusterConfig());
+ new Topology(allNodes, allNodes, clusterData.getAssignableInstanceConfigMap(),
+ clusterData.getClusterConfig());
// Transform current assignment to instance->partitions map, and get total partitions
Map<Node, List<String>> nodeToPartitionMap =
convertPartitionMap(origPartitionMap, allNodeTopo);
@@ -154,7 +154,7 @@ public ZNRecord computePartitionAssignment(final List<String> allNodes,
// Since instance weight will be replaced by constraint evaluation, record it in advance to avoid
// overwriting.
Map<String, Integer> instanceWeightRecords = new HashMap<>();
- for (InstanceConfig instanceConfig : clusterData.getInstanceConfigMap().values()) {
+ for (InstanceConfig instanceConfig : clusterData.getAssignableInstanceConfigMap().values()) {
if (instanceConfig.getWeight() != InstanceConfig.WEIGHT_NOT_SET) {
instanceWeightRecords.put(instanceConfig.getInstanceName(), instanceConfig.getWeight());
}
@@ -163,7 +163,7 @@ public ZNRecord computePartitionAssignment(final List<String> allNodes,
List<String> candidates = new ArrayList<>(allNodes);
// Only calculate for configured nodes.
// Remove all non-configured nodes.
- candidates.retainAll(clusterData.getAllInstances());
+ candidates.retainAll(clusterData.getAssignableInstances());

// For generating the IdealState ZNRecord
Map<String, List<String>> preferenceList = new HashMap<>();
@@ -207,7 +207,7 @@ public ZNRecord computePartitionAssignment(final List<String> allNodes,

// recover the original weight
for (String instanceName : instanceWeightRecords.keySet()) {
- clusterData.getInstanceConfigMap().get(instanceName)
+ clusterData.getAssignableInstanceConfigMap().get(instanceName)
.setWeight(instanceWeightRecords.get(instanceName));
}

@@ -297,7 +297,7 @@ private List<String> computeSinglePartitionAssignment(String partitionName,
}
// Limit the weight to be at least MIN_INSTANCE_WEIGHT
for (int i = 0; i < instancePriority.length; i++) {
- clusterData.getInstanceConfigMap().get(qualifiedNodes.get(i))
+ clusterData.getAssignableInstanceConfigMap().get(qualifiedNodes.get(i))
.setWeight(instancePriority[i] - baseline + MIN_INSTANCE_WEIGHT);
}

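To make the weight shift in the hunk above concrete: with priorities {10, 4, 7}, a baseline of 4 (assuming baseline is the minimum priority), and MIN_INSTANCE_WEIGHT of 1, the instances end up with weights {7, 1, 4} — the relative ordering is preserved while the smallest weight is clamped to the minimum.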
@@ -75,7 +75,7 @@ public void init(String resourceName, final List<String> partitions,
public ZNRecord computePartitionAssignment(final List<String> allNodes,
final List<String> liveNodes, final Map<String, Map<String, String>> currentMapping,
ResourceControllerDataProvider clusterData) throws HelixException {
- Map<String, InstanceConfig> instanceConfigMap = clusterData.getInstanceConfigMap();
+ Map<String, InstanceConfig> instanceConfigMap = clusterData.getAssignableInstanceConfigMap();
_clusterTopo =
new Topology(allNodes, liveNodes, instanceConfigMap, clusterData.getClusterConfig());
Node topNode = _clusterTopo.getRootNode();