diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java index ec17b620c9b..477e4f99bf6 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java @@ -367,11 +367,11 @@ private PartitionStateMap computeIntermediatePartitionState(ResourceControllerDa currentStateOutput.getCurrentStateMap(resourceName, partition).entrySet().stream() .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); List preferenceList = preferenceLists.get(partition.getPartitionName()); - if (preferenceList == null || preferenceList.size() == 0) { - continue; - } Map requiredState = getRequiredStates(resourceName, cache, preferenceList); - messagesToThrottle.sort(new MessagePriorityComparator(preferenceList, stateModelDef.getStatePriorityMap())); + if (preferenceList != null && !preferenceList.isEmpty()) { + // Sort messages based on the priority (priority is defined in the state model definition + messagesToThrottle.sort(new MessagePriorityComparator(preferenceList, stateModelDef.getStatePriorityMap())); + } for (Message message : messagesToThrottle) { RebalanceType rebalanceType = getRebalanceTypePerMessage(requiredState, message, derivedCurrentStateMap); @@ -470,10 +470,6 @@ private void chargePendingTransition(Resource resource, CurrentStateOutput curre // To clarify that custom mode does not apply recovery/load rebalance since user can define different number of // replicas for different partitions. Actually, the custom will stopped from resource level checks if this resource // is not FULL_AUTO, we will return best possible state and do nothing. - List preferenceList = preferenceLists.get(partition.getPartitionName()); - if (preferenceList == null) { - continue; - } Map requiredStates = getRequiredStates(resourceName, cache, preferenceLists.get(partition.getPartitionName())); // Maps instance to its current state @@ -482,8 +478,11 @@ private void chargePendingTransition(Resource resource, CurrentStateOutput curre // Maps instance to its pending (next) state List pendingMessages = new ArrayList<>( currentStateOutput.getPendingMessageMap(resourceName, partition).values()); - pendingMessages.sort(new MessagePriorityComparator(preferenceLists.get(partition.getPartitionName()), - stateModelDefinition.getStatePriorityMap())); + List preferenceList = preferenceLists.get(partition.getPartitionName()); + if (preferenceList != null && !preferenceList.isEmpty()) { + pendingMessages.sort(new MessagePriorityComparator(preferenceList, + stateModelDefinition.getStatePriorityMap())); + } for (Message message : pendingMessages) { StateTransitionThrottleConfig.RebalanceType rebalanceType =