Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Empty IdealState Calculation with Disabled Nodes in AutoRebalance… #2877

Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import java.util.TreeSet;

import org.apache.helix.HelixManager;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.slf4j.Logger;
Expand Down Expand Up @@ -82,7 +84,6 @@ public void init(String resourceName, final List<String> partitions,
@Override
public ZNRecord computePartitionAssignment(final List<String> allNodes, final List<String> liveNodes,
final Map<String, Map<String, String>> currentMapping, ResourceControllerDataProvider clusterData) {
int numReplicas = countStateReplicas();
ZNRecord znRecord = new ZNRecord(_resourceName);
if (liveNodes.size() == 0) {
return znRecord;
Expand All @@ -97,9 +98,7 @@ public ZNRecord computePartitionAssignment(final List<String> allNodes, final Li
List<String> sortedLiveNodes = new ArrayList<String>(liveNodes);
Collections.sort(sortedLiveNodes, currentStateNodeComparator);

int distRemainder = (numReplicas * _partitions.size()) % sortedLiveNodes.size();
int distFloor = (numReplicas * _partitions.size()) / sortedLiveNodes.size();
_nodeMap = new HashMap<String, Node>();
_nodeMap = new HashMap<>();
_liveNodesList = new ArrayList<Node>();

for (String id : sortedAllNodes) {
Expand All @@ -108,6 +107,10 @@ public ZNRecord computePartitionAssignment(final List<String> allNodes, final Li
node.hasCeilingCapacity = false;
_nodeMap.put(id, node);
}

int numReplicas = calculateExpectedReplicaCount(clusterData);
int distRemainder = (numReplicas * _partitions.size()) % sortedLiveNodes.size();
int distFloor = (numReplicas * _partitions.size()) / sortedLiveNodes.size();
for (int i = 0; i < sortedLiveNodes.size(); i++) {
boolean usingCeiling = false;
int targetSize = (_maximumPerNode > 0) ? Math.min(distFloor, _maximumPerNode) : distFloor;
Expand All @@ -116,7 +119,8 @@ public ZNRecord computePartitionAssignment(final List<String> allNodes, final Li
distRemainder = distRemainder - 1;
usingCeiling = true;
}
Node node = _nodeMap.get(sortedLiveNodes.get(i));
String nodeName = sortedLiveNodes.get(i);
Node node = _nodeMap.get(nodeName);
node.isAlive = true;
node.capacity = targetSize;
node.hasCeilingCapacity = usingCeiling;
Expand All @@ -127,15 +131,16 @@ public ZNRecord computePartitionAssignment(final List<String> allNodes, final Li
_stateMap = generateStateMap();

// compute the preferred mapping if all nodes were up
_preferredAssignment = computePreferredPlacement(sortedAllNodes);
_preferredAssignment = computePreferredPlacement(sortedAllNodes, clusterData);

// logger.info("preferred mapping:"+ preferredAssignment);
MarkGaox marked this conversation as resolved.
Show resolved Hide resolved
// from current mapping derive the ones in preferred location
// this will update the nodes with their current fill status
_existingPreferredAssignment = computeExistingPreferredPlacement(currentMapping);

// from current mapping derive the ones not in preferred location
_existingNonPreferredAssignment = computeExistingNonPreferredPlacement(currentMapping);
_existingNonPreferredAssignment =
computeExistingNonPreferredPlacement(currentMapping, clusterData);

// compute orphaned replicas that are not assigned to any node
_orphaned = computeOrphaned();
Expand All @@ -152,7 +157,7 @@ public ZNRecord computePartitionAssignment(final List<String> allNodes, final Li
forceToAssignOrphans();
}

prepareResult(znRecord);
prepareResult(znRecord, clusterData);
return znRecord;
}

Expand Down Expand Up @@ -301,7 +306,7 @@ private void moveExcessReplicas() {
* Update a ZNRecord with the results of the rebalancing.
* @param znRecord
*/
private void prepareResult(ZNRecord znRecord) {
private void prepareResult(ZNRecord znRecord, ResourceControllerDataProvider clusterData) {
// The map fields are keyed on partition name to a pair of node and state, i.e. it
// indicates that the partition with given state is served by that node
//
Expand Down Expand Up @@ -336,7 +341,10 @@ private void prepareResult(ZNRecord znRecord) {
}
}
}
normalizePreferenceLists(znRecord.getListFields(), newPreferences);
normalizePreferenceLists(znRecord.getListFields(), newPreferences, clusterData);

String stateModelDef = clusterData.getIdealState(_resourceName).getStateModelDefRef();
StateModelDefinition stateModel = clusterData.getStateModelDef(stateModelDef);

// generate preference maps based on the preference lists
for (String partition : _partitions) {
Expand All @@ -359,6 +367,9 @@ private void forceToAssignOrphans() {
&& receiver.currentlyAssigned < _maximumPerNode && receiver
.canAddIfCapacity(replica)) {
nodeToAssign = receiver;
// Should update the minOverloadedCapacity to find the node with minimum overloaded capacity
minOverloadedCapacity =
Math.min(receiver.currentlyAssigned - receiver.capacity, minOverloadedCapacity);
}
}

Expand All @@ -380,15 +391,15 @@ private void forceToAssignOrphans() {
* assignment
*/
private void normalizePreferenceLists(Map<String, List<String>> preferenceLists,
Map<String, List<String>> newPreferences) {
Map<String, List<String>> newPreferences, ResourceControllerDataProvider clusterData) {

Map<String, Map<String, Integer>> nodeReplicaCounts =
new HashMap<String, Map<String, Integer>>();
for (String partition : preferenceLists.keySet()) {
normalizePreferenceList(preferenceLists.get(partition), nodeReplicaCounts);
normalizePreferenceList(preferenceLists.get(partition), nodeReplicaCounts, clusterData);
}
for (String partition : newPreferences.keySet()) {
normalizePreferenceList(newPreferences.get(partition), nodeReplicaCounts);
normalizePreferenceList(newPreferences.get(partition), nodeReplicaCounts, clusterData);
preferenceLists.get(partition).addAll(newPreferences.get(partition));
}
}
Expand All @@ -399,9 +410,13 @@ private void normalizePreferenceLists(Map<String, List<String>> preferenceLists,
* @param nodeReplicaCounts map of (node --> state --> count)
*/
private void normalizePreferenceList(List<String> preferenceList,
Map<String, Map<String, Integer>> nodeReplicaCounts) {
Map<String, Map<String, Integer>> nodeReplicaCounts,
ResourceControllerDataProvider clusterData) {
List<String> newPreferenceList = new ArrayList<String>();
int replicas = Math.min(countStateReplicas(), preferenceList.size());
// Use the expected replica count instead of relying on the _states map.
// This prevents the preference list from being truncated when ANY_LIVEINSTANCE
// is used as the replication factor.
int replicas = Math.min(calculateExpectedReplicaCount(clusterData), preferenceList.size());

// make this a LinkedHashSet to preserve iteration order
Set<String> notAssigned = new LinkedHashSet<String>(preferenceList);
Expand Down Expand Up @@ -463,14 +478,14 @@ private int getReplicaCountForNode(String state, String node,

/**
* Compute the subset of the current mapping where replicas are not mapped according to their
* preferred assignment.
* existing preferred assignment.
* @param currentMapping Current mapping of replicas to nodes
* @return The current assignments that do not conform to the preferred assignment
*/
private Map<Replica, Node> computeExistingNonPreferredPlacement(
Map<String, Map<String, String>> currentMapping) {
Map<String, Map<String, String>> currentMapping, ResourceControllerDataProvider clusterData) {
Map<Replica, Node> existingNonPreferredAssignment = new TreeMap<Replica, Node>();
int count = countStateReplicas();
int count = calculateExpectedReplicaCount(clusterData);
for (String partition : currentMapping.keySet()) {
Map<String, String> nodeStateMap = currentMapping.get(partition);
nodeStateMap.keySet().retainAll(_nodeMap.keySet());
Expand All @@ -496,12 +511,11 @@ private Map<Replica, Node> computeExistingNonPreferredPlacement(
throw new IllegalArgumentException("partition: " + replica + " is in currentMapping but not in partitions");
}

if (_preferredAssignment.get(replica).id != node.id
if (!_preferredAssignment.get(replica).id.equals(node.id)
&& !_existingPreferredAssignment.containsKey(replica)
&& !existingNonPreferredAssignment.containsKey(replica)) {
existingNonPreferredAssignment.put(replica, node);
node.nonPreferred.add(replica);

break;
}
}
Expand Down Expand Up @@ -548,7 +562,7 @@ private Set<Replica> computeOrphaned() {
private Map<Replica, Node> computeExistingPreferredPlacement(
final Map<String, Map<String, String>> currentMapping) {
Map<Replica, Node> existingPreferredAssignment = new TreeMap<Replica, Node>();
int count = countStateReplicas();
int count = calculateStatesReplicaCount();
for (String partition : currentMapping.keySet()) {
Map<String, String> nodeStateMap = currentMapping.get(partition);
nodeStateMap.keySet().retainAll(_nodeMap.keySet());
Expand All @@ -560,7 +574,7 @@ private Map<Replica, Node> computeExistingPreferredPlacement(
Replica replica = new Replica(partition, replicaId);
if (_preferredAssignment.containsKey(replica)
&& !existingPreferredAssignment.containsKey(replica)
&& _preferredAssignment.get(replica).id == node.id) {
&& _preferredAssignment.get(replica).id.equals(node.id)) {
existingPreferredAssignment.put(replica, node);
node.preferred.add(replica);
break;
Expand All @@ -576,16 +590,18 @@ private Map<Replica, Node> computeExistingPreferredPlacement(
* Given a predefined set of all possible nodes, compute an assignment of replicas to
* nodes that evenly assigns all replicas to nodes.
* @param allNodes Identifiers to all nodes, live and non-live
* @param clusterData
* @return Preferred assignment of replicas
*/
private Map<Replica, Node> computePreferredPlacement(final List<String> allNodes) {
private Map<Replica, Node> computePreferredPlacement(final List<String> allNodes,
ResourceControllerDataProvider clusterData) {
Map<Replica, Node> preferredMapping;
preferredMapping = new HashMap<Replica, Node>();
int partitionId = 0;
int numReplicas = countStateReplicas();
int count = countStateReplicas();
// Count the total number of replicas that should be assigned assuming all nodes are up
int numReplicas = calculateExpectedReplicaCount(clusterData);
for (String partition : _partitions) {
for (int replicaId = 0; replicaId < count; replicaId++) {
for (int replicaId = 0; replicaId < numReplicas; replicaId++) {
Replica replica = new Replica(partition, replicaId);
String nodeName =
_placementScheme.getLocation(partitionId, replicaId, _partitions.size(), numReplicas,
Expand All @@ -598,17 +614,40 @@ private Map<Replica, Node> computePreferredPlacement(final List<String> allNodes
}

/**
* Counts the total number of replicas given a state-count mapping
* Calculates the total number of replicas based on the state-count mapping
* which only includes the states of live instances.
* @return
*/
private int countStateReplicas() {
private int calculateStatesReplicaCount() {
int total = 0;
for (Integer count : _states.values()) {
total += count;
}
return total;
}

/**
* Calculates the expected total number of replicas assuming full cluster availability.
* @param clusterData the cache that stores all cluster data
* @return The total number of replicas that should be assigned
*/
private int calculateExpectedReplicaCount(ResourceControllerDataProvider clusterData) {
IdealState currentIdealState = clusterData.getIdealState(_resourceName);
// Recompute the total number of replicas because for resources with ANY_LIVEINSTANCE,
// the replica count should match the total number of instances in the cluster.
// The _states map cannot be used for this calculation, as it only accounts for live instances.
int totalReplicaCount = currentIdealState.getReplicaCount(_nodeMap.keySet().size());
StateModelDefinition stateModelDef =
clusterData.getStateModelDef(currentIdealState.getStateModelDefRef());
LinkedHashMap<String, Integer> stateToCountMap =
stateModelDef.getStateCountMap(_nodeMap.keySet().size(), totalReplicaCount);
int total = 0;
for (Integer count : stateToCountMap.values()) {
total += count;
}
return total;
}

/**
* Compute a map of replica ids to state names
* @return Map: replica id -> state name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ public ZNRecord computePartitionAssignment(final List<String> liveNodes,
// compute the preferred mapping if all nodes were up
_preferredAssignment = computePreferredPlacement(allNodes);

// logger.info("preferred mapping:"+ preferredAssignment);
// from current mapping derive the ones in preferred location
// this will update the nodes with their current fill status
_existingPreferredAssignment = computeExistingPreferredPlacement(currentMapping);
Expand Down
29 changes: 29 additions & 0 deletions helix-core/src/test/java/org/apache/helix/TestHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@

import org.apache.commons.io.FileUtils;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.rebalancer.constraint.MonitoredAbnormalResolver;
import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
import org.apache.helix.integration.manager.ZkTestManager;
Expand All @@ -48,8 +50,10 @@
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.RebalanceMode;
import org.apache.helix.model.Message;
import org.apache.helix.model.Message.MessageType;
Expand All @@ -73,6 +77,9 @@
import org.slf4j.LoggerFactory;
import org.testng.Assert;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class TestHelper {
private static final Logger LOG = LoggerFactory.getLogger(TestHelper.class);
Expand Down Expand Up @@ -864,4 +871,26 @@ public static void printZkListeners(HelixZkClient client) throws Exception {
}
System.out.println("}");
}

public static ResourceControllerDataProvider buildMockDataCache(String resourceName,
String numOfReplicas, String stateModelDef, StateModelDefinition stateModel,
Set<String> disabledInstances) {
ClusterConfig config = new ClusterConfig("cluster");
config.setRebalanceDelayTime(0);
IdealState idealState = new IdealState(resourceName);
idealState.setRebalanceMode(IdealState.RebalanceMode.FULL_AUTO);
idealState.setReplicas(numOfReplicas);
idealState.setStateModelDefRef(stateModelDef);
idealState.setRebalanceStrategy(
"org.apache.helix.controller.rebalancer.strategy." + "AutoRebalanceStrategy");
ResourceControllerDataProvider dataCache = mock(ResourceControllerDataProvider.class);
when(dataCache.getStateModelDef(stateModelDef)).thenReturn(stateModel);
when(dataCache.getIdealState(resourceName)).thenReturn(idealState);
when(dataCache.getDisabledInstances()).thenReturn(disabledInstances);
when(dataCache.getClusterConfig()).thenReturn(config);
when(dataCache.getAbnormalStateResolver(any()))
.thenReturn(MonitoredAbnormalResolver.DUMMY_STATE_RESOLVER);
when(dataCache.getDisabledInstancesForPartition(any(), any())).thenReturn(disabledInstances);
return dataCache;
}
}
Loading
Loading