diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java index 00c49ad918..ec17b620c9 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java @@ -367,6 +367,9 @@ private PartitionStateMap computeIntermediatePartitionState(ResourceControllerDa currentStateOutput.getCurrentStateMap(resourceName, partition).entrySet().stream() .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); List preferenceList = preferenceLists.get(partition.getPartitionName()); + if (preferenceList == null || preferenceList.size() == 0) { + continue; + } Map requiredState = getRequiredStates(resourceName, cache, preferenceList); messagesToThrottle.sort(new MessagePriorityComparator(preferenceList, stateModelDef.getStatePriorityMap())); for (Message message : messagesToThrottle) { @@ -467,6 +470,10 @@ private void chargePendingTransition(Resource resource, CurrentStateOutput curre // To clarify that custom mode does not apply recovery/load rebalance since user can define different number of // replicas for different partitions. Actually, the custom will stopped from resource level checks if this resource // is not FULL_AUTO, we will return best possible state and do nothing. + List preferenceList = preferenceLists.get(partition.getPartitionName()); + if (preferenceList == null) { + continue; + } Map requiredStates = getRequiredStates(resourceName, cache, preferenceLists.get(partition.getPartitionName())); // Maps instance to its current state @@ -643,15 +650,18 @@ private Map getRequiredStates(String resourceName, StateModelDefinition stateModelDefinition = resourceControllerDataProvider.getStateModelDef(idealState.getStateModelDefRef()); int requiredNumReplica = - idealState.getMinActiveReplicas() == -1 ? idealState.getReplicaCount(preferenceList.size()) + idealState.getMinActiveReplicas() == -1 ? + idealState.getReplicaCount(preferenceList == null ? 0 : preferenceList.size()) : idealState.getMinActiveReplicas(); // Generate a state mapping, state -> required numbers based on the live and enabled instances for this partition // preference list - return stateModelDefinition.getStateCountMap( - (int) preferenceList.stream() - .filter(i -> resourceControllerDataProvider.getEnabledLiveInstances().contains(i)) - .count(), requiredNumReplica); // StateModelDefinition's counts + if (preferenceList != null) { + return stateModelDefinition.getStateCountMap((int) preferenceList.stream().filter(i -> resourceControllerDataProvider.getEnabledLiveInstances().contains(i)) + .count(), requiredNumReplica); // StateModelDefinition's counts + } + return stateModelDefinition.getStateCountMap(resourceControllerDataProvider.getEnabledLiveInstances().size(), + requiredNumReplica); // StateModelDefinition's counts } /** diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestIntermediateStateCalcStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestIntermediateStateCalcStage.java index b651e0d283..60281e65b8 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestIntermediateStateCalcStage.java +++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestIntermediateStateCalcStage.java @@ -406,6 +406,168 @@ public void testThrottleByErrorPartition() { } } + @Test + public void testPartitionMissing() { + String resourcePrefix = "resource"; + int nResource = 4; + int nPartition = 2; + int nReplica = 3; + + String[] resources = new String[nResource]; + for (int i = 0; i < nResource; i++) { + resources[i] = resourcePrefix + "_" + i; + } + + preSetup(resources, nReplica, nReplica); + event.addAttribute(AttributeName.RESOURCES.name(), getResourceMap(resources, nPartition, "OnlineOffline")); + event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), + getResourceMap(resources, nPartition, "OnlineOffline")); + + // Initialize bestpossible state and current state + BestPossibleStateOutput bestPossibleStateOutput = new BestPossibleStateOutput(); + CurrentStateOutput currentStateOutput = new CurrentStateOutput(); + MessageOutput messageSelectOutput = new MessageOutput(); + IntermediateStateOutput expectedResult = new IntermediateStateOutput(); + + _clusterConfig.setErrorOrRecoveryPartitionThresholdForLoadBalance(1); + setClusterConfig(_clusterConfig); + + for (String resource : resources) { + IdealState is = accessor.getProperty(accessor.keyBuilder().idealStates(resource)); + setSingleIdealState(is); + + Map> partitionMap = new HashMap<>(); + for (int p = 0; p < nPartition; p++) { + Partition partition = new Partition(resource + "_" + p); + for (int r = 0; r < nReplica; r++) { + String instanceName = HOSTNAME_PREFIX + r; + + // PartitionMap is used as a preferenceList. + // For the last partition, let us add null as preferenceList. + if (p != nPartition - 1) { + partitionMap.put(partition.getPartitionName(), Collections.singletonList(instanceName)); + } else { + partitionMap.put(partition.getPartitionName(), null); + } + + // TODO: The following code is same for testNoStateMissing + if (resource.endsWith("0")) { + // Regular recovery balance + currentStateOutput.setCurrentState(resource, partition, instanceName, "OFFLINE"); + // add blocked state transition messages + Message pendingMessage = generateMessage("OFFLINE", "ONLINE", instanceName); + currentStateOutput.setPendingMessage(resource, partition, instanceName, pendingMessage); + + bestPossibleStateOutput.setState(resource, partition, instanceName, "ONLINE"); + + // should be recovered: + expectedResult.setState(resource, partition, instanceName, "ONLINE"); + } else if (resource.endsWith("1")) { + // Regular load balance + currentStateOutput.setCurrentState(resource, partition, instanceName, "ONLINE"); + currentStateOutput.setCurrentState(resource, partition, instanceName + "-1", "OFFLINE"); + bestPossibleStateOutput.setState(resource, partition, instanceName, "ONLINE"); + messageSelectOutput.addMessage(resource, partition, + generateMessage("OFFLINE", "DROPPED", instanceName + "-1")); + // should be recovered: + expectedResult.setState(resource, partition, instanceName, "ONLINE"); + } else if (resource.endsWith("2")) { + // Recovery balance with transient states, should keep the current states in the output. + currentStateOutput.setCurrentState(resource, partition, instanceName, "OFFLINE"); + bestPossibleStateOutput.setState(resource, partition, instanceName, "OFFLINE"); + // should be kept unchanged: + expectedResult.setState(resource, partition, instanceName, "OFFLINE"); + } else if (resource.endsWith("3")) { + // One unresolved error should not prevent recovery balance + bestPossibleStateOutput.setState(resource, partition, instanceName, "ONLINE"); + if (p == 0) { + if (r == 0) { + currentStateOutput.setCurrentState(resource, partition, instanceName, "ERROR"); + bestPossibleStateOutput.setState(resource, partition, instanceName, "ERROR"); + // This partition is still ERROR + expectedResult.setState(resource, partition, instanceName, "ERROR"); + } else { + currentStateOutput.setCurrentState(resource, partition, instanceName, "OFFLINE"); + messageSelectOutput.addMessage(resource, partition, generateMessage("OFFLINE", "ONLINE", instanceName)); + // Recovery balance + expectedResult.setState(resource, partition, instanceName, "ONLINE"); + } + } else { + currentStateOutput.setCurrentState(resource, partition, instanceName, "ONLINE"); + currentStateOutput.setCurrentState(resource, partition, instanceName + "-1", "OFFLINE"); + // load balance is throttled, so keep all current states + messageSelectOutput.addMessage(resource, partition, + generateMessage("OFFLINE", "DROPPED", instanceName + "-1")); + expectedResult.setState(resource, partition, instanceName, "ONLINE"); + // The following must be removed because now downward state transitions are allowed + // expectedResult.setState(resource, partition, instanceName + "-1", "OFFLINE"); + } + } else if (resource.endsWith("4")) { + // Test that partitions with replicas to drop are dropping them when recovery is + // happening for other partitions + if (p == 0) { + // This partition requires recovery + currentStateOutput.setCurrentState(resource, partition, instanceName, "OFFLINE"); + bestPossibleStateOutput.setState(resource, partition, instanceName, "ONLINE"); + messageSelectOutput.addMessage(resource, partition, generateMessage("OFFLINE", "ONLINE", instanceName)); + // After recovery, it should be back ONLINE + expectedResult.setState(resource, partition, instanceName, "ONLINE"); + } else { + // Other partitions require dropping of replicas + currentStateOutput.setCurrentState(resource, partition, instanceName, "ONLINE"); + currentStateOutput.setCurrentState(resource, partition, instanceName + "-1", "OFFLINE"); + // BestPossibleState dictates that we only need one ONLINE replica + bestPossibleStateOutput.setState(resource, partition, instanceName, "ONLINE"); + bestPossibleStateOutput.setState(resource, partition, instanceName + "-1", "DROPPED"); + messageSelectOutput.addMessage(resource, partition, + generateMessage("OFFLINE", "DROPPED", instanceName + "-1")); + // So instanceName-1 will NOT be expected to show up in expectedResult + expectedResult.setState(resource, partition, instanceName, "ONLINE"); + expectedResult.setState(resource, partition, instanceName + "-1", "DROPPED"); + } + } else if (resource.endsWith("5")) { + // Test that load balance bringing up a new replica does NOT happen with a recovery + // partition + if (p == 0) { + // Set up a partition requiring recovery + currentStateOutput.setCurrentState(resource, partition, instanceName, "OFFLINE"); + bestPossibleStateOutput.setState(resource, partition, instanceName, "ONLINE"); + messageSelectOutput.addMessage(resource, partition, generateMessage("OFFLINE", "ONLINE", instanceName)); + // After recovery, it should be back ONLINE + expectedResult.setState(resource, partition, instanceName, "ONLINE"); + } else { + currentStateOutput.setCurrentState(resource, partition, instanceName, "ONLINE"); + bestPossibleStateOutput.setState(resource, partition, instanceName, "ONLINE"); + // Check that load balance (bringing up a new node) did not take place + bestPossibleStateOutput.setState(resource, partition, instanceName + "-1", "ONLINE"); + messageSelectOutput.addMessage(resource, partition, + generateMessage("OFFLINE", "ONLINE", instanceName + "-1")); + expectedResult.setState(resource, partition, instanceName, "ONLINE"); + } + } + } + } + bestPossibleStateOutput.setPreferenceLists(resource, partitionMap); + } + + event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput); + event.addAttribute(AttributeName.MESSAGES_SELECTED.name(), messageSelectOutput); + event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput); + event.addAttribute(AttributeName.ControllerDataProvider.name(), new ResourceControllerDataProvider()); + runStage(event, new ReadClusterDataStage()); + runStage(event, new IntermediateStateCalcStage()); + + IntermediateStateOutput output = event.getAttribute(AttributeName.INTERMEDIATE_STATE.name()); + + for (String resource : resources) { + // Note Assert.assertEquals won't work. If "actual" is an empty map, it won't compare + // anything. + Assert.assertTrue(output.getPartitionStateMap(resource) + .getStateMap() + .equals(expectedResult.getPartitionStateMap(resource).getStateMap())); + } + } + private void preSetup(String[] resources, int numOfLiveInstances, int numOfReplicas) { setupIdealState(numOfLiveInstances, resources, numOfLiveInstances, numOfReplicas, IdealState.RebalanceMode.FULL_AUTO, "OnlineOffline");