diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AutoRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AutoRebalanceStrategy.java index 68279c81f3..410d5ba595 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AutoRebalanceStrategy.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AutoRebalanceStrategy.java @@ -34,6 +34,8 @@ import java.util.TreeSet; import org.apache.helix.HelixManager; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.StateModelDefinition; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; import org.slf4j.Logger; @@ -82,7 +84,6 @@ public void init(String resourceName, final List partitions, @Override public ZNRecord computePartitionAssignment(final List allNodes, final List liveNodes, final Map> currentMapping, ResourceControllerDataProvider clusterData) { - int numReplicas = countStateReplicas(); ZNRecord znRecord = new ZNRecord(_resourceName); if (liveNodes.size() == 0) { return znRecord; @@ -97,9 +98,7 @@ public ZNRecord computePartitionAssignment(final List allNodes, final Li List sortedLiveNodes = new ArrayList(liveNodes); Collections.sort(sortedLiveNodes, currentStateNodeComparator); - int distRemainder = (numReplicas * _partitions.size()) % sortedLiveNodes.size(); - int distFloor = (numReplicas * _partitions.size()) / sortedLiveNodes.size(); - _nodeMap = new HashMap(); + _nodeMap = new HashMap<>(); _liveNodesList = new ArrayList(); for (String id : sortedAllNodes) { @@ -108,6 +107,10 @@ public ZNRecord computePartitionAssignment(final List allNodes, final Li node.hasCeilingCapacity = false; _nodeMap.put(id, node); } + + int numReplicas = calculateExpectedReplicaCount(clusterData); + int distRemainder = (numReplicas * _partitions.size()) % sortedLiveNodes.size(); + int distFloor = (numReplicas * _partitions.size()) / sortedLiveNodes.size(); for (int i = 0; i < sortedLiveNodes.size(); i++) { boolean usingCeiling = false; int targetSize = (_maximumPerNode > 0) ? 
Math.min(distFloor, _maximumPerNode) : distFloor; @@ -116,7 +119,8 @@ public ZNRecord computePartitionAssignment(final List allNodes, final Li distRemainder = distRemainder - 1; usingCeiling = true; } - Node node = _nodeMap.get(sortedLiveNodes.get(i)); + String nodeName = sortedLiveNodes.get(i); + Node node = _nodeMap.get(nodeName); node.isAlive = true; node.capacity = targetSize; node.hasCeilingCapacity = usingCeiling; @@ -127,7 +131,7 @@ public ZNRecord computePartitionAssignment(final List allNodes, final Li _stateMap = generateStateMap(); // compute the preferred mapping if all nodes were up - _preferredAssignment = computePreferredPlacement(sortedAllNodes); + _preferredAssignment = computePreferredPlacement(sortedAllNodes, clusterData); // logger.info("preferred mapping:"+ preferredAssignment); // from current mapping derive the ones in preferred location @@ -135,7 +139,8 @@ public ZNRecord computePartitionAssignment(final List allNodes, final Li _existingPreferredAssignment = computeExistingPreferredPlacement(currentMapping); // from current mapping derive the ones not in preferred location - _existingNonPreferredAssignment = computeExistingNonPreferredPlacement(currentMapping); + _existingNonPreferredAssignment = + computeExistingNonPreferredPlacement(currentMapping, clusterData); // compute orphaned replicas that are not assigned to any node _orphaned = computeOrphaned(); @@ -152,7 +157,7 @@ public ZNRecord computePartitionAssignment(final List allNodes, final Li forceToAssignOrphans(); } - prepareResult(znRecord); + prepareResult(znRecord, clusterData); return znRecord; } @@ -301,7 +306,7 @@ private void moveExcessReplicas() { * Update a ZNRecord with the results of the rebalancing. * @param znRecord */ - private void prepareResult(ZNRecord znRecord) { + private void prepareResult(ZNRecord znRecord, ResourceControllerDataProvider clusterData) { // The map fields are keyed on partition name to a pair of node and state, i.e. 
it // indicates that the partition with given state is served by that node // @@ -336,7 +341,10 @@ private void prepareResult(ZNRecord znRecord) { } } } - normalizePreferenceLists(znRecord.getListFields(), newPreferences); + normalizePreferenceLists(znRecord.getListFields(), newPreferences, clusterData); + + String stateModelDef = clusterData.getIdealState(_resourceName).getStateModelDefRef(); + StateModelDefinition stateModel = clusterData.getStateModelDef(stateModelDef); // generate preference maps based on the preference lists for (String partition : _partitions) { @@ -359,6 +367,9 @@ private void forceToAssignOrphans() { && receiver.currentlyAssigned < _maximumPerNode && receiver .canAddIfCapacity(replica)) { nodeToAssign = receiver; + // Should update the minOverloadedCapacity to find the node with minimum overloaded capacity + minOverloadedCapacity = + Math.min(receiver.currentlyAssigned - receiver.capacity, minOverloadedCapacity); } } @@ -380,15 +391,15 @@ private void forceToAssignOrphans() { * assignment */ private void normalizePreferenceLists(Map> preferenceLists, - Map> newPreferences) { + Map> newPreferences, ResourceControllerDataProvider clusterData) { Map> nodeReplicaCounts = new HashMap>(); for (String partition : preferenceLists.keySet()) { - normalizePreferenceList(preferenceLists.get(partition), nodeReplicaCounts); + normalizePreferenceList(preferenceLists.get(partition), nodeReplicaCounts, clusterData); } for (String partition : newPreferences.keySet()) { - normalizePreferenceList(newPreferences.get(partition), nodeReplicaCounts); + normalizePreferenceList(newPreferences.get(partition), nodeReplicaCounts, clusterData); preferenceLists.get(partition).addAll(newPreferences.get(partition)); } } @@ -399,9 +410,13 @@ private void normalizePreferenceLists(Map> preferenceLists, * @param nodeReplicaCounts map of (node --> state --> count) */ private void normalizePreferenceList(List preferenceList, - Map> nodeReplicaCounts) { + Map> nodeReplicaCounts, + ResourceControllerDataProvider clusterData) { List newPreferenceList = new ArrayList(); - int replicas = Math.min(countStateReplicas(), preferenceList.size()); + // Use the expected replica count instead of relying on the _states map. + // This prevents the preference list from being truncated when ANY_LIVEINSTANCE + // is used as the replication factor. + int replicas = Math.min(calculateExpectedReplicaCount(clusterData), preferenceList.size()); // make this a LinkedHashSet to preserve iteration order Set notAssigned = new LinkedHashSet(preferenceList); @@ -463,14 +478,14 @@ private int getReplicaCountForNode(String state, String node, /** * Compute the subset of the current mapping where replicas are not mapped according to their - * preferred assignment. + * existing preferred assignment. 
* @param currentMapping Current mapping of replicas to nodes * @return The current assignments that do not conform to the preferred assignment */ private Map computeExistingNonPreferredPlacement( - Map> currentMapping) { + Map> currentMapping, ResourceControllerDataProvider clusterData) { Map existingNonPreferredAssignment = new TreeMap(); - int count = countStateReplicas(); + int count = calculateExpectedReplicaCount(clusterData); for (String partition : currentMapping.keySet()) { Map nodeStateMap = currentMapping.get(partition); nodeStateMap.keySet().retainAll(_nodeMap.keySet()); @@ -496,12 +511,11 @@ private Map computeExistingNonPreferredPlacement( throw new IllegalArgumentException("partition: " + replica + " is in currentMapping but not in partitions"); } - if (_preferredAssignment.get(replica).id != node.id + if (!_preferredAssignment.get(replica).id.equals(node.id) && !_existingPreferredAssignment.containsKey(replica) && !existingNonPreferredAssignment.containsKey(replica)) { existingNonPreferredAssignment.put(replica, node); node.nonPreferred.add(replica); - break; } } @@ -548,7 +562,7 @@ private Set computeOrphaned() { private Map computeExistingPreferredPlacement( final Map> currentMapping) { Map existingPreferredAssignment = new TreeMap(); - int count = countStateReplicas(); + int count = calculateStatesReplicaCount(); for (String partition : currentMapping.keySet()) { Map nodeStateMap = currentMapping.get(partition); nodeStateMap.keySet().retainAll(_nodeMap.keySet()); @@ -560,7 +574,7 @@ private Map computeExistingPreferredPlacement( Replica replica = new Replica(partition, replicaId); if (_preferredAssignment.containsKey(replica) && !existingPreferredAssignment.containsKey(replica) - && _preferredAssignment.get(replica).id == node.id) { + && _preferredAssignment.get(replica).id.equals(node.id)) { existingPreferredAssignment.put(replica, node); node.preferred.add(replica); break; @@ -576,16 +590,18 @@ private Map computeExistingPreferredPlacement( * Given a predefined set of all possible nodes, compute an assignment of replicas to * nodes that evenly assigns all replicas to nodes. * @param allNodes Identifiers to all nodes, live and non-live + * @param clusterData * @return Preferred assignment of replicas */ - private Map computePreferredPlacement(final List allNodes) { + private Map computePreferredPlacement(final List allNodes, + ResourceControllerDataProvider clusterData) { Map preferredMapping; preferredMapping = new HashMap(); int partitionId = 0; - int numReplicas = countStateReplicas(); - int count = countStateReplicas(); + // Count the total number of replicas that should be assigned assuming all nodes are up + int numReplicas = calculateExpectedReplicaCount(clusterData); for (String partition : _partitions) { - for (int replicaId = 0; replicaId < count; replicaId++) { + for (int replicaId = 0; replicaId < numReplicas; replicaId++) { Replica replica = new Replica(partition, replicaId); String nodeName = _placementScheme.getLocation(partitionId, replicaId, _partitions.size(), numReplicas, @@ -598,10 +614,11 @@ private Map computePreferredPlacement(final List allNodes } /** - * Counts the total number of replicas given a state-count mapping + * Calculates the total number of replicas based on the state-count mapping + * which only includes the states of live instances. 
* @return */ - private int countStateReplicas() { + private int calculateStatesReplicaCount() { int total = 0; for (Integer count : _states.values()) { total += count; @@ -609,6 +626,28 @@ private int countStateReplicas() { return total; } + /** + * Calculates the expected total number of replicas assuming full cluster availability. + * @param clusterData the cache that stores all cluster data + * @return The total number of replicas that should be assigned + */ + private int calculateExpectedReplicaCount(ResourceControllerDataProvider clusterData) { + IdealState currentIdealState = clusterData.getIdealState(_resourceName); + // Recompute the total number of replicas because for resources with ANY_LIVEINSTANCE, + // the replica count should match the total number of instances in the cluster. + // The _states map cannot be used for this calculation, as it only accounts for live instances. + int totalReplicaCount = currentIdealState.getReplicaCount(_nodeMap.keySet().size()); + StateModelDefinition stateModelDef = + clusterData.getStateModelDef(currentIdealState.getStateModelDefRef()); + LinkedHashMap stateToCountMap = + stateModelDef.getStateCountMap(_nodeMap.keySet().size(), totalReplicaCount); + int total = 0; + for (Integer count : stateToCountMap.values()) { + total += count; + } + return total; + } + /** * Compute a map of replica ids to state names * @return Map: replica id -> state name diff --git a/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java index b6c02a818b..47ddea079b 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java +++ b/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java @@ -118,7 +118,6 @@ public ZNRecord computePartitionAssignment(final List liveNodes, // compute the preferred mapping if all nodes were up _preferredAssignment = computePreferredPlacement(allNodes); - // logger.info("preferred mapping:"+ preferredAssignment); // from current mapping derive the ones in preferred location // this will update the nodes with their current fill status _existingPreferredAssignment = computeExistingPreferredPlacement(currentMapping); diff --git a/helix-core/src/test/java/org/apache/helix/TestHelper.java b/helix-core/src/test/java/org/apache/helix/TestHelper.java index 9dbba34769..2066178614 100644 --- a/helix-core/src/test/java/org/apache/helix/TestHelper.java +++ b/helix-core/src/test/java/org/apache/helix/TestHelper.java @@ -40,6 +40,8 @@ import org.apache.commons.io.FileUtils; import org.apache.helix.PropertyKey.Builder; +import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; +import org.apache.helix.controller.rebalancer.constraint.MonitoredAbnormalResolver; import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy; import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy; import org.apache.helix.integration.manager.ZkTestManager; @@ -48,8 +50,10 @@ import org.apache.helix.manager.zk.ZKHelixDataAccessor; import org.apache.helix.manager.zk.ZNRecordSerializer; import org.apache.helix.manager.zk.ZkBaseDataAccessor; +import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.CurrentState; import org.apache.helix.model.ExternalView; +import org.apache.helix.model.IdealState; import org.apache.helix.model.IdealState.RebalanceMode; import org.apache.helix.model.Message; import 
org.apache.helix.model.Message.MessageType; @@ -73,6 +77,9 @@ import org.slf4j.LoggerFactory; import org.testng.Assert; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class TestHelper { private static final Logger LOG = LoggerFactory.getLogger(TestHelper.class); @@ -864,4 +871,26 @@ public static void printZkListeners(HelixZkClient client) throws Exception { } System.out.println("}"); } + + public static ResourceControllerDataProvider buildMockDataCache(String resourceName, + String numOfReplicas, String stateModelDef, StateModelDefinition stateModel, + Set disabledInstances) { + ClusterConfig config = new ClusterConfig("cluster"); + config.setRebalanceDelayTime(0); + IdealState idealState = new IdealState(resourceName); + idealState.setRebalanceMode(IdealState.RebalanceMode.FULL_AUTO); + idealState.setReplicas(numOfReplicas); + idealState.setStateModelDefRef(stateModelDef); + idealState.setRebalanceStrategy( + "org.apache.helix.controller.rebalancer.strategy." + "AutoRebalanceStrategy"); + ResourceControllerDataProvider dataCache = mock(ResourceControllerDataProvider.class); + when(dataCache.getStateModelDef(stateModelDef)).thenReturn(stateModel); + when(dataCache.getIdealState(resourceName)).thenReturn(idealState); + when(dataCache.getDisabledInstances()).thenReturn(disabledInstances); + when(dataCache.getClusterConfig()).thenReturn(config); + when(dataCache.getAbnormalStateResolver(any())) + .thenReturn(MonitoredAbnormalResolver.DUMMY_STATE_RESOLVER); + when(dataCache.getDisabledInstancesForPartition(any(), any())).thenReturn(disabledInstances); + return dataCache; + } } diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java index 905c5552bd..55fe690c4c 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java @@ -19,6 +19,7 @@ * under the License. 
*/ +import java.lang.reflect.Array; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -41,6 +42,7 @@ import org.apache.helix.HelixDefinedState; import org.apache.helix.MockAccessor; import org.apache.helix.PropertyKey.Builder; +import org.apache.helix.TestHelper; import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; import org.apache.helix.controller.rebalancer.constraint.MonitoredAbnormalResolver; import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy; @@ -48,8 +50,14 @@ import org.apache.helix.controller.stages.CurrentStateOutput; import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.IdealState; +import org.apache.helix.model.LeaderStandbySMD; import org.apache.helix.model.LiveInstance; +import org.apache.helix.model.MasterSlaveSMD; +import org.apache.helix.model.OnlineOfflineSMD; import org.apache.helix.model.Partition; +import org.apache.helix.model.Resource; +import org.apache.helix.model.ResourceAssignment; +import org.apache.helix.model.ResourceConfig; import org.apache.helix.model.StateModelDefinition; import org.apache.helix.tools.StateModelConfigGenerator; import org.apache.helix.zookeeper.datamodel.ZNRecord; @@ -58,8 +66,12 @@ import org.testng.Assert; import org.testng.annotations.Test; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + public class TestAutoRebalanceStrategy { private static Logger logger = LoggerFactory.getLogger(TestAutoRebalanceStrategy.class); + private static final String DEFAULT_STATE_MODEL = "OnlineOffline"; /** * Sanity test for a basic Master-Slave model @@ -117,11 +129,12 @@ private void runTest(String name, int numIterations, int numPartitions, int numL for (int i = 0; i < Math.min(stateNames.length, stateCounts.length); i++) { states.put(stateNames[i], stateCounts[i]); } + int replicaCount = states.values().stream().mapToInt(i -> i).sum(); StateModelDefinition stateModelDef = getIncompleteStateModelDef(name, stateNames[0], states); new AutoRebalanceTester(partitions, states, liveNodes, currentMapping, allNodes, maxPerNode, - stateModelDef).runRepeatedly(numIterations); + replicaCount + "", stateModelDef).runRepeatedly(numIterations); } /** @@ -160,12 +173,14 @@ class AutoRebalanceTester { private Map> _currentMapping; private List _allNodes; private int _maxPerNode; + private String _numOfReplica; private StateModelDefinition _stateModelDef; private Random _random; public AutoRebalanceTester(List partitions, LinkedHashMap states, List liveNodes, Map> currentMapping, - List allNodes, int maxPerNode, StateModelDefinition stateModelDef) { + List allNodes, int maxPerNode, String numOfReplica, + StateModelDefinition stateModelDef) { _partitions = partitions; _states = states; _liveNodes = liveNodes; @@ -183,6 +198,7 @@ public AutoRebalanceTester(List partitions, LinkedHashMap partitions, LinkedHashMap> listResult = znRecord.getListFields(); final Map> mapResult = getMapping(listResult); @@ -387,6 +406,13 @@ private boolean atMostOnePartitionReplicaPerNode(final Map> Set nodeSet = new HashSet(partitionEntry.getValue()); int numUniques = nodeSet.size(); int total = partitionEntry.getValue().size(); + int expectedPreferenceListSize = _numOfReplica.equals("ANY_LIVEINSTANCE") ? 
_allNodes.size() + : Integer.parseInt(_numOfReplica); + if (nodeSet.size() != expectedPreferenceListSize) { + logger.error("ERROR: Partition " + partitionEntry.getKey() + " expects " + expectedPreferenceListSize + + " replicas, but the preference list has " + nodeSet.size() + " nodes!"); + return false; + } if (numUniques < total) { logger.error("ERROR: Partition " + partitionEntry.getKey() + " is assigned to " + total + " nodes, but only " + numUniques + " are unique!"); @@ -480,15 +506,15 @@ private Map> getStateBucketsForNode( * Randomly choose between killing, adding, or resurrecting a single node * @return (Partition -> (Node -> State)) ZNRecord */ - public ZNRecord runOnceRandomly() { + public ZNRecord runOnceRandomly(ResourceControllerDataProvider dataProvider) { double choose = _random.nextDouble(); ZNRecord result = null; if (choose < P_KILL) { - result = removeSingleNode(null); + result = removeSingleNode(null, dataProvider); } else if (choose < P_KILL + P_ADD) { - result = addSingleNode(null); + result = addSingleNode(null, dataProvider); } else if (choose < P_KILL + P_ADD + P_RESURRECT) { - result = resurrectSingleNode(null); + result = resurrectSingleNode(null, dataProvider); } return result; } @@ -499,7 +525,7 @@ public ZNRecord runOnceRandomly() { * Optional String to add * @return ZNRecord result returned by the rebalancer */ - public ZNRecord addSingleNode(String node) { + public ZNRecord addSingleNode(String node, ResourceControllerDataProvider dataProvider) { logger.info("=================== add node ================="); if (_nonLiveSet.size() == 0) { logger.warn("Cannot add node because there are no nodes left to add."); @@ -516,7 +542,7 @@ public ZNRecord addSingleNode(String node) { _nonLiveSet.remove(node); return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode).
- computePartitionAssignment(_allNodes, _liveNodes, _currentMapping, null); + computePartitionAssignment(_allNodes, _liveNodes, _currentMapping, dataProvider); } /** @@ -525,7 +551,7 @@ public ZNRecord addSingleNode(String node) { * Optional String to remove * @return ZNRecord result returned by the rebalancer */ - public ZNRecord removeSingleNode(String node) { + public ZNRecord removeSingleNode(String node, ResourceControllerDataProvider dataProvider) { logger.info("=================== remove node ================="); if (_liveSet.size() == 0) { logger.warn("Cannot remove node because there are no nodes left to remove."); @@ -550,7 +576,7 @@ public ZNRecord removeSingleNode(String node) { } return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode) - .computePartitionAssignment(_allNodes, _liveNodes, _currentMapping, null); + .computePartitionAssignment(_allNodes, _liveNodes, _currentMapping, dataProvider); } /** @@ -559,7 +585,7 @@ public ZNRecord removeSingleNode(String node) { * Optional String to resurrect * @return ZNRecord result returned by the rebalancer */ - public ZNRecord resurrectSingleNode(String node) { + public ZNRecord resurrectSingleNode(String node, ResourceControllerDataProvider dataProvider) { logger.info("=================== resurrect node ================="); if (_removedSet.size() == 0) { logger.warn("Cannot remove node because there are no nodes left to resurrect."); @@ -576,7 +602,7 @@ public ZNRecord resurrectSingleNode(String node) { _liveSet.add(node); return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode) - .computePartitionAssignment(_allNodes, _liveNodes, _currentMapping, null); + .computePartitionAssignment(_allNodes, _liveNodes, _currentMapping, dataProvider); } private T getRandomSetElement(Set source) { @@ -618,18 +644,23 @@ public void testOrphansNotPreferred() { currentMapping.put(partition, new HashMap()); } + ResourceControllerDataProvider dataCache = + TestHelper.buildMockDataCache(RESOURCE_NAME, REPLICA_COUNT + "", "MasterSlave", + MasterSlaveSMD.build(), Collections.emptySet()); + // make sure that when the first node joins, a single replica is assigned fairly List partitions = ImmutableList.copyOf(PARTITIONS); LinkedHashMap stateCount = STATE_MODEL.getStateCountMap(liveNodes.size(), REPLICA_COUNT); ZNRecord znRecord = new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount) - .computePartitionAssignment(allNodes, liveNodes, currentMapping, null); + .computePartitionAssignment(allNodes, liveNodes, currentMapping, dataCache); Map> preferenceLists = znRecord.getListFields(); for (String partition : currentMapping.keySet()) { // make sure these are all MASTER List preferenceList = preferenceLists.get(partition); Assert.assertNotNull(preferenceList, "invalid preference list for " + partition); + // Since there is only 1 node in the cluster, it constructs the whole preference list Assert.assertEquals(preferenceList.size(), 1, "invalid preference list for " + partition); } @@ -642,12 +673,15 @@ public void testOrphansNotPreferred() { } znRecord = new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount) - .computePartitionAssignment(allNodes, liveNodes, currentMapping, null); + .computePartitionAssignment(allNodes, liveNodes, currentMapping, dataCache); preferenceLists = znRecord.getListFields(); for (String partition : currentMapping.keySet()) { List preferenceList = preferenceLists.get(partition); Assert.assertNotNull(preferenceList, "invalid preference list for " + partition); - 
Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition); + // Since we have enough nodes to achieve the replica count, the preference list size should be + // equal to the replica count + Assert.assertEquals(preferenceList.size(), REPLICA_COUNT, + "invalid preference list for " + partition); Assert.assertEquals(preferenceList.get(0), NODES[0], "invalid preference list for " + partition); Assert.assertEquals(preferenceList.get(1), NODES[1], "invalid preference list for " @@ -660,7 +694,7 @@ public void testOrphansNotPreferred() { } znRecord = new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount) - .computePartitionAssignment(allNodes, liveNodes, currentMapping, null); + .computePartitionAssignment(allNodes, liveNodes, currentMapping, dataCache); preferenceLists = znRecord.getListFields(); Set firstNodes = Sets.newHashSet(); for (String partition : currentMapping.keySet()) { @@ -682,13 +716,14 @@ public void testOrphansNotPreferred() { currentMapping.get(PARTITIONS[1]).put(NODES[1], "MASTER"); znRecord = new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount) - .computePartitionAssignment(allNodes, liveNodes, currentMapping, null); + .computePartitionAssignment(allNodes, liveNodes, currentMapping, dataCache); preferenceLists = znRecord.getListFields(); boolean newNodeUsed = false; for (String partition : currentMapping.keySet()) { List preferenceList = preferenceLists.get(partition); Assert.assertNotNull(preferenceList, "invalid preference list for " + partition); - Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition); + Assert.assertEquals(preferenceList.size(), REPLICA_COUNT, + "invalid preference list for " + partition); if (preferenceList.contains(NODES[2])) { newNodeUsed = true; Assert.assertEquals(preferenceList.get(1), NODES[2], @@ -710,14 +745,15 @@ public void testOrphansNotPreferred() { currentMapping.get(PARTITIONS[2]).put(NODES[2], "SLAVE"); znRecord = new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount) - .computePartitionAssignment(allNodes, liveNodes, currentMapping, null); + .computePartitionAssignment(allNodes, liveNodes, currentMapping, dataCache); preferenceLists = znRecord.getListFields(); firstNodes.clear(); Set secondNodes = Sets.newHashSet(); for (String partition : currentMapping.keySet()) { List preferenceList = preferenceLists.get(partition); Assert.assertNotNull(preferenceList, "invalid preference list for " + partition); - Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition); + Assert.assertEquals(preferenceList.size(), REPLICA_COUNT, + "invalid preference list for " + partition); firstNodes.add(preferenceList.get(0)); secondNodes.add(preferenceList.get(1)); } @@ -738,12 +774,13 @@ public void testOrphansNotPreferred() { currentMapping.get(PARTITIONS[2]).put(NODES[2], "MASTER"); znRecord = new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount) - .computePartitionAssignment(allNodes, liveNodes, currentMapping, null); + .computePartitionAssignment(allNodes, liveNodes, currentMapping, dataCache); preferenceLists = znRecord.getListFields(); for (String partition : currentMapping.keySet()) { List preferenceList = preferenceLists.get(partition); Assert.assertNotNull(preferenceList, "invalid preference list for " + partition); - Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition); + Assert.assertEquals(preferenceList.size(), REPLICA_COUNT, + "invalid preference list for " + partition); 
Map stateMap = currentMapping.get(partition); for (String participant : stateMap.keySet()) { Assert.assertTrue(preferenceList.contains(participant), "minimal movement violated for " @@ -769,13 +806,14 @@ public void testOrphansNotPreferred() { currentMapping.get(PARTITIONS[2]).put(NODES[2], "MASTER"); znRecord = new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount) - .computePartitionAssignment(allNodes, liveNodes, currentMapping, null); + .computePartitionAssignment(allNodes, liveNodes, currentMapping, dataCache); preferenceLists = znRecord.getListFields(); firstNodes.clear(); for (String partition : currentMapping.keySet()) { List preferenceList = preferenceLists.get(partition); Assert.assertNotNull(preferenceList, "invalid preference list for " + partition); - Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition); + Assert.assertEquals(preferenceList.size(), REPLICA_COUNT, + "invalid preference list for " + partition); firstNodes.add(preferenceList.get(0)); } Assert.assertEquals(firstNodes.size(), 2, "masters not evenly distributed"); @@ -794,6 +832,9 @@ public void testOrphansNotPreferred() { for (int i = 0; i < nPartitions; i++) { partitions.add(Integer.toString(i)); } + ResourceControllerDataProvider dataCache = + TestHelper.buildMockDataCache(resourceName, nReplicas + "", DEFAULT_STATE_MODEL, + OnlineOfflineSMD.build(), Collections.emptySet()); LinkedHashMap states = new LinkedHashMap(2); states.put("OFFLINE", 0); @@ -801,7 +842,7 @@ public void testOrphansNotPreferred() { AutoRebalanceStrategy strategy = new AutoRebalanceStrategy(resourceName, partitions, states); ZNRecord znRecord = strategy.computePartitionAssignment(instanceNames, instanceNames, - new HashMap>(0), null); + new HashMap>(0), dataCache); for (List p : znRecord.getListFields().values()) { Assert.assertEquals(p.size(), nReplicas); @@ -837,9 +878,13 @@ public void testWontMoveSinglePartitionUnnecessarily() { upperBounds.put(state, STATE_MODEL.getNumInstancesPerState(state)); } + ResourceControllerDataProvider dataCache = + TestHelper.buildMockDataCache(RESOURCE, 1 + "", DEFAULT_STATE_MODEL, OnlineOfflineSMD.build(), + Collections.emptySet()); + ZNRecord znRecord = new AutoRebalanceStrategy(RESOURCE, partitions, stateCount, Integer.MAX_VALUE) - .computePartitionAssignment(allNodes, liveNodes, currentMapping, null); + .computePartitionAssignment(allNodes, liveNodes, currentMapping, dataCache); Map> preferenceLists = znRecord.getListFields(); List preferenceList = preferenceLists.get(partition.toString()); Assert.assertNotNull(preferenceList, "invalid preference list for " + partition); @@ -854,7 +899,7 @@ public void testWontMoveSinglePartitionUnnecessarily() { znRecord = new AutoRebalanceStrategy(RESOURCE, partitions, stateCount, Integer.MAX_VALUE) - .computePartitionAssignment(allNodes, liveNodes, currentMapping, null); + .computePartitionAssignment(allNodes, liveNodes, currentMapping, dataCache); preferenceLists = znRecord.getListFields(); preferenceList = preferenceLists.get(partition.toString()); @@ -866,4 +911,317 @@ public void testWontMoveSinglePartitionUnnecessarily() { // finally, make sure we haven't moved it. 
Assert.assertEquals(finalPreferredNode, otherNode); } + + @Test + public void testAutoRebalanceStrategyWorkWithDisabledInstances() { + final String RESOURCE_NAME = "resource"; + final String[] PARTITIONS = {"resource_0", "resource_1", "resource_2"}; + final StateModelDefinition STATE_MODEL = LeaderStandbySMD.build(); + final int REPLICA_COUNT = 2; + final String[] NODES = {"n0", "n1"}; + + ResourceControllerDataProvider dataCache = TestHelper.buildMockDataCache(RESOURCE_NAME, + ResourceConfig.ResourceConfigConstants.ANY_LIVEINSTANCE.toString(), "LeaderStandby", + STATE_MODEL, Collections.emptySet()); + + // initial state, 2 nodes, no mapping + List allNodes = Lists.newArrayList(NODES[0], NODES[1]); + List liveNodes = Lists.newArrayList(NODES[0], NODES[1]); + Map> currentMapping = Maps.newHashMap(); + for (String partition : PARTITIONS) { + currentMapping.put(partition, new HashMap()); + } + + // make sure that when the first node joins, a single replica is assigned fairly + List partitions = ImmutableList.copyOf(PARTITIONS); + LinkedHashMap stateCount = + STATE_MODEL.getStateCountMap(liveNodes.size(), REPLICA_COUNT); + ZNRecord znRecord = + new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount).computePartitionAssignment( + allNodes, liveNodes, currentMapping, dataCache); + Map> preferenceLists = znRecord.getListFields(); + for (String partition : currentMapping.keySet()) { + List preferenceList = preferenceLists.get(partition); + Assert.assertNotNull(preferenceList, "invalid preference list for " + partition); + Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition); + } + + // now take node 0 offline, and make sure that it is not in the preference list + allNodes = new ArrayList<>(allNodes); + liveNodes = new ArrayList<>(liveNodes); + liveNodes.remove(NODES[0]); + for (String partition : PARTITIONS) { + Map idealStateMap = znRecord.getMapField(partition); + currentMapping.put(partition, idealStateMap); + } + + stateCount = STATE_MODEL.getStateCountMap(liveNodes.size(), 1); + znRecord = + new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount).computePartitionAssignment( + allNodes, liveNodes, currentMapping, dataCache); + preferenceLists = znRecord.getListFields(); + for (String partition : currentMapping.keySet()) { + // make sure the leader is transferred to the other node + List preferenceList = preferenceLists.get(partition); + Assert.assertNotNull(preferenceList, "invalid preference list for " + partition); + Assert.assertEquals(preferenceList.size(), 1, "invalid preference list for " + partition); + // Since node 0 is offline, node 1 should be the only node in the preference list and it + // should be in the top state for every partition + Assert.assertTrue(znRecord.getListField(partition).contains(NODES[1]), + "invalid preference list for " + partition); + Assert.assertEquals(znRecord.getMapField(partition).get(NODES[1]), STATE_MODEL.getTopState()); + } + } + + @Test + public void testRebalanceWithErrorPartition() { + final String RESOURCE_NAME = "resource"; + final String[] PARTITIONS = {"resource_0", "resource_1", "resource_2"}; + final StateModelDefinition STATE_MODEL = LeaderStandbySMD.build(); + final String[] NODES = {"n0", "n1", "n2", "n3", "n4", "n5", "n6", "n7", "n8", "n9"}; + + ResourceControllerDataProvider dataCache = TestHelper.buildMockDataCache(RESOURCE_NAME, + ResourceConfig.ResourceConfigConstants.ANY_LIVEINSTANCE.toString(), "LeaderStandby", + STATE_MODEL, Collections.emptySet()); + // initial state, 10 nodes, no
mapping + List allNodes = Lists.newArrayList(NODES); + List liveNodes = Lists.newArrayList(NODES); + Map> currentMapping = Maps.newHashMap(); + for (String partition : PARTITIONS) { + currentMapping.put(partition, new HashMap()); + } + + // make sure that when nodes join, all partitions are assigned fairly + List partitions = ImmutableList.copyOf(PARTITIONS); + LinkedHashMap stateCount = + STATE_MODEL.getStateCountMap(liveNodes.size(), allNodes.size()); + ZNRecord znRecord = + new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount).computePartitionAssignment( + allNodes, liveNodes, currentMapping, dataCache); + Map> preferenceLists = znRecord.getListFields(); + for (String partition : currentMapping.keySet()) { + List preferenceList = preferenceLists.get(partition); + Assert.assertNotNull(preferenceList, "invalid preference list for " + partition); + Assert.assertEquals(preferenceList.size(), allNodes.size(), + "invalid preference list for " + partition); + } + + // Suppose that one replica of partition 0 is on n0 and has gone into the ERROR state. + for (String partition : PARTITIONS) { + Map idealStateMap = znRecord.getMapField(partition); + currentMapping.put(partition, idealStateMap); + } + currentMapping.get(PARTITIONS[0]).put(NODES[0], "ERROR"); + + // Recalculate the ideal state; n0 shouldn't be dropped from the preference list. + stateCount = STATE_MODEL.getStateCountMap(liveNodes.size(), allNodes.size()); + znRecord = + new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount).computePartitionAssignment( + allNodes, liveNodes, currentMapping, dataCache); + preferenceLists = znRecord.getListFields(); + for (String partition : currentMapping.keySet()) { + // make sure the size is equal to the number of all nodes + List preferenceList = preferenceLists.get(partition); + Assert.assertNotNull(preferenceList, "invalid preference list for " + partition); + Assert.assertEquals(preferenceList.size(), allNodes.size(), + "invalid preference list for " + partition); + // Even if n0 is in the ERROR state, it should still appear in the ideal state + Assert.assertTrue(znRecord.getListField(partition).contains(NODES[0]), + "invalid preference list for " + partition); + Assert.assertTrue(znRecord.getMapField(partition).containsKey(NODES[0]), + "invalid ideal state mapping for " + partition); + } + + // now take node 0 offline and add another node n10 to the cluster. We want to make sure that + // n10 can pick up a replica of partitions 0, 1, and 2. + allNodes = new ArrayList<>(allNodes); + liveNodes = new ArrayList<>(liveNodes); + liveNodes.remove(NODES[0]); + allNodes.add("n10"); + liveNodes.add("n10"); + + dataCache = TestHelper.buildMockDataCache(RESOURCE_NAME, + ResourceConfig.ResourceConfigConstants.ANY_LIVEINSTANCE.toString(), "LeaderStandby", + STATE_MODEL, Collections.emptySet()); + + // Even though the cluster now has 11 nodes, only 10 of them are live. So the state + // count map should be built for 10 replicas instead of 11 when using ANY_LIVEINSTANCE.
+ stateCount = STATE_MODEL.getStateCountMap(liveNodes.size(), 10); + znRecord = + new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount).computePartitionAssignment( + allNodes, liveNodes, currentMapping, dataCache); + preferenceLists = znRecord.getListFields(); + for (String partition : currentMapping.keySet()) { + // make sure the size is equal to the number of live nodes + List preferenceList = preferenceLists.get(partition); + Assert.assertNotNull(preferenceList, "invalid preference list for " + partition); + Assert.assertEquals(preferenceList.size(), liveNodes.size(), + "invalid preference list for " + partition); + // Since node 0 is offline and its replica was in the ERROR state, it shouldn't appear in the ideal state + Assert.assertFalse(znRecord.getListField(partition).contains(NODES[0]), + "invalid preference list for " + partition); + Assert.assertFalse(znRecord.getMapField(partition).containsKey(NODES[0]), + "invalid ideal state mapping for " + partition); + } + } + + @Test + public void testAutoRebalanceStrategyWorkWithDisabledButActiveInstances() { + final String RESOURCE_NAME = "resource"; + final String[] PARTITIONS = {"resource_0", "resource_1", "resource_2"}; + final StateModelDefinition STATE_MODEL = LeaderStandbySMD.build(); + final int REPLICA_COUNT = 2; + final String[] NODES = {"n0", "n1"}; + + ResourceControllerDataProvider dataCache = TestHelper.buildMockDataCache(RESOURCE_NAME, + ResourceConfig.ResourceConfigConstants.ANY_LIVEINSTANCE.toString(), "LeaderStandby", + STATE_MODEL, Collections.emptySet()); + Map liveInstances = new HashMap<>(); + liveInstances.put(NODES[0], new LiveInstance(NODES[0])); + liveInstances.put(NODES[1], new LiveInstance(NODES[1])); + when(dataCache.getLiveInstances()).thenReturn(liveInstances); + // initial state, 2 nodes, no mapping + List allNodes = Lists.newArrayList(NODES[0], NODES[1]); + List liveNodes = Lists.newArrayList(NODES[0], NODES[1]); + Map> currentMapping = Maps.newHashMap(); + for (String partition : PARTITIONS) { + currentMapping.put(partition, new HashMap()); + } + + // make sure that when nodes join, all partitions are assigned fairly + List partitions = ImmutableList.copyOf(PARTITIONS); + LinkedHashMap stateCount = + STATE_MODEL.getStateCountMap(liveNodes.size(), REPLICA_COUNT); + ZNRecord znRecord = + new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount).computePartitionAssignment( + allNodes, liveNodes, currentMapping, dataCache); + Map> preferenceLists = znRecord.getListFields(); + for (String partition : currentMapping.keySet()) { + List preferenceList = preferenceLists.get(partition); + Assert.assertNotNull(preferenceList, "invalid preference list for " + partition); + Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition); + } + + // now disable node 0, and make sure the dataCache reports it as disabled + for (String partition : PARTITIONS) { + Map idealStateMap = znRecord.getMapField(partition); + currentMapping.put(partition, idealStateMap); + } + dataCache = TestHelper.buildMockDataCache(RESOURCE_NAME, + ResourceConfig.ResourceConfigConstants.ANY_LIVEINSTANCE.toString(), "LeaderStandby", + STATE_MODEL, Sets.newHashSet(NODES[0])); + liveInstances.put(NODES[0], new LiveInstance(NODES[0])); + liveInstances.put(NODES[1], new LiveInstance(NODES[1])); + when(dataCache.getLiveInstances()).thenReturn(liveInstances); + + stateCount = STATE_MODEL.getStateCountMap(liveNodes.size(), 2); + znRecord = + new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount).computePartitionAssignment(
allNodes, liveNodes, currentMapping, dataCache); + preferenceLists = znRecord.getListFields(); + + for (String partition : currentMapping.keySet()) { + // make sure the size is equal to the number of active nodes + List preferenceList = preferenceLists.get(partition); + Assert.assertNotNull(preferenceList, "invalid preference list for " + partition); + Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition); + Assert.assertTrue(znRecord.getListField(partition).contains(NODES[1]), + "invalid preference list for " + partition); + Assert.assertTrue(znRecord.getListField(partition).contains(NODES[0]), + "invalid preference list for " + partition); + } + + // Generate the new ideal state + IdealState currentIdealState = dataCache.getIdealState(RESOURCE_NAME); + IdealState newIdealState = new IdealState(RESOURCE_NAME); + newIdealState.getRecord().setSimpleFields(currentIdealState.getRecord().getSimpleFields()); + newIdealState.setRebalanceMode(currentIdealState.getRebalanceMode()); + newIdealState.getRecord().setListFields(znRecord.getListFields()); + + // Mimic how the Rebalancer would react to the new ideal state and update the current mapping + Resource resource = new Resource(RESOURCE_NAME); + for (String partition : PARTITIONS) { + resource.addPartition(partition); + } + CurrentStateOutput currentStateOutput = new CurrentStateOutput(); + currentStateOutput.setCurrentState(RESOURCE_NAME, resource.getPartition(PARTITIONS[0]), NODES[0], "LEADER"); + currentStateOutput.setCurrentState(RESOURCE_NAME, resource.getPartition(PARTITIONS[0]), NODES[1], "STANDBY"); + currentStateOutput.setCurrentState(RESOURCE_NAME, resource.getPartition(PARTITIONS[1]), NODES[0], "STANDBY"); + currentStateOutput.setCurrentState(RESOURCE_NAME, resource.getPartition(PARTITIONS[1]), NODES[1], "LEADER"); + currentStateOutput.setCurrentState(RESOURCE_NAME, resource.getPartition(PARTITIONS[2]), NODES[0], "LEADER"); + currentStateOutput.setCurrentState(RESOURCE_NAME, resource.getPartition(PARTITIONS[2]), NODES[1], "STANDBY"); + + DelayedAutoRebalancer autoRebalancer = new DelayedAutoRebalancer(); + ResourceAssignment assignment = autoRebalancer.computeBestPossiblePartitionState(dataCache, newIdealState, resource, + currentStateOutput); + + // Assert that the new assignment moves node 0 to the OFFLINE state and keeps node 1 in + // the top state, LEADER.
+ for (String partition : PARTITIONS) { + Assert.assertEquals(assignment.getReplicaMap(resource.getPartition(partition)).get(NODES[0]), "OFFLINE"); + Assert.assertEquals(assignment.getReplicaMap(resource.getPartition(partition)).get(NODES[1]), "LEADER"); + } + } + + @Test + public void testSlowlyBootstrapping() { + // Resource setup + final String RESOURCE_NAME = "resource"; + final int PARTITIONS = 100; + final int NUM_NODES = 5; + final StateModelDefinition STATE_MODEL = LeaderStandbySMD.build(); + ArrayList partitions = new ArrayList(); + for (int i = 0; i < PARTITIONS; i++) { + partitions.add("resource_" + i); + } + ArrayList allNodes = new ArrayList(); + ArrayList liveNodes = new ArrayList(); + for (int i = 0; i < NUM_NODES; i++) { + allNodes.add("node-" + i); + } + + ResourceControllerDataProvider dataCache = TestHelper.buildMockDataCache(RESOURCE_NAME, + "1", "LeaderStandby", + STATE_MODEL, Collections.emptySet()); + // initial state, 5 nodes, no mapping + Map> currentMapping = Maps.newHashMap(); + for (String partition : partitions) { + currentMapping.put(partition, new HashMap()); + } + + // Run rebalance with 5 nodes, 1 live instance + liveNodes.add(allNodes.get(0)); + LinkedHashMap stateCount = + STATE_MODEL.getStateCountMap(liveNodes.size(), 1); + RebalanceStrategy strategy = new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount, 25); + ZNRecord znRecord = strategy.computePartitionAssignment(allNodes, liveNodes, currentMapping, + dataCache); + + // Suppose that we could only bootstrap a portion of the ideal state replicas and update the + // current state mapping + int i = 0; + for (String partition : partitions) { + List preferenceList = znRecord.getListField(partition); + if (!preferenceList.isEmpty()) { + if (i % 2 == 0) { + currentMapping.get(partition).put(preferenceList.get(0), "LEADER"); + } + } + i++; + } + + // The result of the assignment should be the same as the previous assignment + int countOfNonEmptyPreferenceList = 0; + ZNRecord newRecord = strategy.computePartitionAssignment(allNodes, liveNodes, currentMapping, dataCache); + for (String partition : partitions) { + List preferenceList = newRecord.getListField(partition); + Assert.assertEquals(newRecord.getMapField(partition), znRecord.getMapField(partition), + "The partition " + partition + " should have the same ideal state mapping"); + if (!preferenceList.isEmpty()) { + countOfNonEmptyPreferenceList++; + } + } + // The number of non-empty preference lists should be 25 because the max partitions per node is set to 25 + Assert.assertEquals(countOfNonEmptyPreferenceList, 25); + } } diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategyImbalanceAssignment.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategyImbalanceAssignment.java index f8f0b90605..cd34d91f7f 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategyImbalanceAssignment.java +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategyImbalanceAssignment.java @@ -20,11 +20,14 @@ */ import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import org.apache.helix.TestHelper; +import org.apache.helix.model.OnlineOfflineSMD; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; import
org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy; @@ -68,7 +71,9 @@ private void testAssignment(int nPartitions, int nReplicas, int nNode) { AutoRebalanceStrategy strategy = new AutoRebalanceStrategy(resourceName, partitions, states); ZNRecord record = strategy.computePartitionAssignment(instanceNames, instanceNames, - new HashMap>(0), new ResourceControllerDataProvider()); + new HashMap>(0), + TestHelper.buildMockDataCache(resourceName, nReplicas + "", "OnlineOffline", + OnlineOfflineSMD.build(), Collections.emptySet())); for (Map stateMapping : record.getMapFields().values()) { Assert.assertEquals(stateMapping.size(), nReplicas); diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAutoRebalance.java index 3c389ac3a8..091da7facd 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAutoRebalance.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAutoRebalance.java @@ -164,7 +164,7 @@ public void testDropResourceAutoRebalance() throws Exception { ZK_ADDR); } - @Test() + @Test(dependsOnMethods = "testDropResourceAutoRebalance") public void testAutoRebalance() throws Exception { // kill 1 node _participants[0].syncStop();
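The new calculateExpectedReplicaCount() resolves the replica string against the full node set before sizing preference lists, which is what keeps ANY_LIVEINSTANCE resources from being truncated to the live-instance count. The sketch below shows that resolution in isolation; the wrapper class and method are illustrative and not part of the patch, and it assumes the Helix model classes used in the diff are on the classpath.

import java.util.LinkedHashMap;

import org.apache.helix.model.IdealState;
import org.apache.helix.model.LeaderStandbySMD;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.StateModelDefinition;

public class ExpectedReplicaCountSketch {
  // Mirrors the intent of calculateExpectedReplicaCount(): resolve the replica field against
  // the total node count, then sum the per-state counts from the state model definition.
  static int expectedReplicaCount(IdealState idealState, StateModelDefinition stateModelDef,
      int totalNodeCount) {
    // For ANY_LIVEINSTANCE this resolves to totalNodeCount; otherwise the replica field is parsed.
    int totalReplicaCount = idealState.getReplicaCount(totalNodeCount);
    LinkedHashMap<String, Integer> stateToCount =
        stateModelDef.getStateCountMap(totalNodeCount, totalReplicaCount);
    int total = 0;
    for (int count : stateToCount.values()) {
      total += count;
    }
    return total;
  }

  public static void main(String[] args) {
    IdealState idealState = new IdealState("resource");
    idealState.setReplicas(ResourceConfig.ResourceConfigConstants.ANY_LIVEINSTANCE.toString());
    // Expected to track the cluster size (5 here) rather than the number of live instances.
    System.out.println(expectedReplicaCount(idealState, LeaderStandbySMD.build(), 5));
  }
}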
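The forceToAssignOrphans() change updates minOverloadedCapacity inside the candidate loop so orphans land on the least-overloaded live node instead of the last eligible one scanned. A self-contained sketch of that selection logic follows; Node here is a simplified stand-in for the strategy's internal bookkeeping, not the real inner class.

import java.util.Arrays;
import java.util.List;

public class LeastOverloadedReceiverSketch {
  // Illustrative stand-in: only the fields needed for the selection are modeled.
  static final class Node {
    final String id;
    final int capacity;
    final int currentlyAssigned;

    Node(String id, int capacity, int currentlyAssigned) {
      this.id = id;
      this.capacity = capacity;
      this.currentlyAssigned = currentlyAssigned;
    }

    int overload() {
      return currentlyAssigned - capacity;
    }
  }

  static Node pickReceiver(List<Node> candidates) {
    int minOverloadedCapacity = Integer.MAX_VALUE;
    Node nodeToAssign = null;
    for (Node receiver : candidates) {
      if (receiver.overload() < minOverloadedCapacity) {
        nodeToAssign = receiver;
        // Without this update the comparison stays against Integer.MAX_VALUE, so the last
        // eligible node scanned would win instead of the least-overloaded one.
        minOverloadedCapacity = Math.min(receiver.overload(), minOverloadedCapacity);
      }
    }
    return nodeToAssign;
  }

  public static void main(String[] args) {
    List<Node> nodes =
        Arrays.asList(new Node("n0", 3, 5), new Node("n1", 3, 4), new Node("n2", 3, 6));
    System.out.println(pickReceiver(nodes).id); // n1, whose overload (1) is the smallest
  }
}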
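For completeness, a usage sketch of the new TestHelper.buildMockDataCache() helper with an ANY_LIVEINSTANCE resource: with all nodes live, every preference list produced by AutoRebalanceStrategy should now span the whole cluster. The class name and the literal node and partition values are illustrative; TestNG and Mockito are assumed on the test classpath, as in the patch.

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;

import org.apache.helix.TestHelper;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
import org.apache.helix.model.LeaderStandbySMD;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.testng.Assert;

public class AnyLiveInstancePreferenceListSketch {
  public static void main(String[] args) {
    String resource = "resource";
    StateModelDefinition stateModel = LeaderStandbySMD.build();
    List<String> nodes = Arrays.asList("n0", "n1", "n2");
    List<String> partitions = Arrays.asList("resource_0", "resource_1");

    // Mock cache whose IdealState declares ANY_LIVEINSTANCE as its replica count.
    ResourceControllerDataProvider dataCache = TestHelper.buildMockDataCache(resource,
        ResourceConfig.ResourceConfigConstants.ANY_LIVEINSTANCE.toString(), "LeaderStandby",
        stateModel, Collections.emptySet());

    LinkedHashMap<String, Integer> stateCount =
        stateModel.getStateCountMap(nodes.size(), nodes.size());
    ZNRecord record = new AutoRebalanceStrategy(resource, partitions, stateCount)
        .computePartitionAssignment(nodes, nodes, new HashMap<>(), dataCache);

    // With the fix, each preference list covers the whole cluster instead of being truncated.
    for (List<String> preferenceList : record.getListFields().values()) {
      Assert.assertEquals(preferenceList.size(), nodes.size());
    }
  }
}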