Skip to content

Commit

Permalink
Follow up change: NPE in IntermediateStateCalc (apache#2673)
Browse files Browse the repository at this point in the history
Follow up change to NPE in intermediate stage, we should not skip message throttling in case of missing partition's preference list.
  • Loading branch information
desaikomal authored and zpinto committed Nov 8, 2023
1 parent 4f827a8 commit 55edb48
Showing 1 changed file with 9 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -367,11 +367,11 @@ private PartitionStateMap computeIntermediatePartitionState(ResourceControllerDa
currentStateOutput.getCurrentStateMap(resourceName, partition).entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
List<String> preferenceList = preferenceLists.get(partition.getPartitionName());
if (preferenceList == null || preferenceList.size() == 0) {
continue;
}
Map<String, Integer> requiredState = getRequiredStates(resourceName, cache, preferenceList);
messagesToThrottle.sort(new MessagePriorityComparator(preferenceList, stateModelDef.getStatePriorityMap()));
if (preferenceList != null && !preferenceList.isEmpty()) {
// Sort messages based on the priority (priority is defined in the state model definition
messagesToThrottle.sort(new MessagePriorityComparator(preferenceList, stateModelDef.getStatePriorityMap()));
}
for (Message message : messagesToThrottle) {
RebalanceType rebalanceType =
getRebalanceTypePerMessage(requiredState, message, derivedCurrentStateMap);
Expand Down Expand Up @@ -470,10 +470,6 @@ private void chargePendingTransition(Resource resource, CurrentStateOutput curre
// To clarify that custom mode does not apply recovery/load rebalance since user can define different number of
// replicas for different partitions. Actually, the custom will stopped from resource level checks if this resource
// is not FULL_AUTO, we will return best possible state and do nothing.
List<String> preferenceList = preferenceLists.get(partition.getPartitionName());
if (preferenceList == null) {
continue;
}
Map<String, Integer> requiredStates =
getRequiredStates(resourceName, cache, preferenceLists.get(partition.getPartitionName()));
// Maps instance to its current state
Expand All @@ -482,8 +478,11 @@ private void chargePendingTransition(Resource resource, CurrentStateOutput curre
// Maps instance to its pending (next) state
List<Message> pendingMessages = new ArrayList<>(
currentStateOutput.getPendingMessageMap(resourceName, partition).values());
pendingMessages.sort(new MessagePriorityComparator(preferenceLists.get(partition.getPartitionName()),
stateModelDefinition.getStatePriorityMap()));
List<String> preferenceList = preferenceLists.get(partition.getPartitionName());
if (preferenceList != null && !preferenceList.isEmpty()) {
pendingMessages.sort(new MessagePriorityComparator(preferenceList,
stateModelDefinition.getStatePriorityMap()));
}

for (Message message : pendingMessages) {
StateTransitionThrottleConfig.RebalanceType rebalanceType =
Expand Down

0 comments on commit 55edb48

Please sign in to comment.