Skip to content

Commit

Permalink
Fix NPE in intermediate state calculation stage (#2668)
Browse files Browse the repository at this point in the history
Fix the NPE in intermediate state calcuation stage when a partition is deleted through update in NUM_PARTITION field
  • Loading branch information
desaikomal authored Oct 18, 2023
1 parent 5f1a3f7 commit 4f19dad
Show file tree
Hide file tree
Showing 2 changed files with 177 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,9 @@ private PartitionStateMap computeIntermediatePartitionState(ResourceControllerDa
currentStateOutput.getCurrentStateMap(resourceName, partition).entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
List<String> preferenceList = preferenceLists.get(partition.getPartitionName());
if (preferenceList == null || preferenceList.size() == 0) {
continue;
}
Map<String, Integer> requiredState = getRequiredStates(resourceName, cache, preferenceList);
messagesToThrottle.sort(new MessagePriorityComparator(preferenceList, stateModelDef.getStatePriorityMap()));
for (Message message : messagesToThrottle) {
Expand Down Expand Up @@ -467,6 +470,10 @@ private void chargePendingTransition(Resource resource, CurrentStateOutput curre
// To clarify that custom mode does not apply recovery/load rebalance since user can define different number of
// replicas for different partitions. Actually, the custom will stopped from resource level checks if this resource
// is not FULL_AUTO, we will return best possible state and do nothing.
List<String> preferenceList = preferenceLists.get(partition.getPartitionName());
if (preferenceList == null) {
continue;
}
Map<String, Integer> requiredStates =
getRequiredStates(resourceName, cache, preferenceLists.get(partition.getPartitionName()));
// Maps instance to its current state
Expand Down Expand Up @@ -643,15 +650,18 @@ private Map<String, Integer> getRequiredStates(String resourceName,
StateModelDefinition stateModelDefinition =
resourceControllerDataProvider.getStateModelDef(idealState.getStateModelDefRef());
int requiredNumReplica =
idealState.getMinActiveReplicas() == -1 ? idealState.getReplicaCount(preferenceList.size())
idealState.getMinActiveReplicas() == -1 ?
idealState.getReplicaCount(preferenceList == null ? 0 : preferenceList.size())
: idealState.getMinActiveReplicas();

// Generate a state mapping, state -> required numbers based on the live and enabled instances for this partition
// preference list
return stateModelDefinition.getStateCountMap(
(int) preferenceList.stream()
.filter(i -> resourceControllerDataProvider.getEnabledLiveInstances().contains(i))
.count(), requiredNumReplica); // StateModelDefinition's counts
if (preferenceList != null) {
return stateModelDefinition.getStateCountMap((int) preferenceList.stream().filter(i -> resourceControllerDataProvider.getEnabledLiveInstances().contains(i))
.count(), requiredNumReplica); // StateModelDefinition's counts
}
return stateModelDefinition.getStateCountMap(resourceControllerDataProvider.getEnabledLiveInstances().size(),
requiredNumReplica); // StateModelDefinition's counts
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,168 @@ public void testThrottleByErrorPartition() {
}
}

@Test
public void testPartitionMissing() {
String resourcePrefix = "resource";
int nResource = 4;
int nPartition = 2;
int nReplica = 3;

String[] resources = new String[nResource];
for (int i = 0; i < nResource; i++) {
resources[i] = resourcePrefix + "_" + i;
}

preSetup(resources, nReplica, nReplica);
event.addAttribute(AttributeName.RESOURCES.name(), getResourceMap(resources, nPartition, "OnlineOffline"));
event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(),
getResourceMap(resources, nPartition, "OnlineOffline"));

// Initialize bestpossible state and current state
BestPossibleStateOutput bestPossibleStateOutput = new BestPossibleStateOutput();
CurrentStateOutput currentStateOutput = new CurrentStateOutput();
MessageOutput messageSelectOutput = new MessageOutput();
IntermediateStateOutput expectedResult = new IntermediateStateOutput();

_clusterConfig.setErrorOrRecoveryPartitionThresholdForLoadBalance(1);
setClusterConfig(_clusterConfig);

for (String resource : resources) {
IdealState is = accessor.getProperty(accessor.keyBuilder().idealStates(resource));
setSingleIdealState(is);

Map<String, List<String>> partitionMap = new HashMap<>();
for (int p = 0; p < nPartition; p++) {
Partition partition = new Partition(resource + "_" + p);
for (int r = 0; r < nReplica; r++) {
String instanceName = HOSTNAME_PREFIX + r;

// PartitionMap is used as a preferenceList.
// For the last partition, let us add null as preferenceList.
if (p != nPartition - 1) {
partitionMap.put(partition.getPartitionName(), Collections.singletonList(instanceName));
} else {
partitionMap.put(partition.getPartitionName(), null);
}

// TODO: The following code is same for testNoStateMissing
if (resource.endsWith("0")) {
// Regular recovery balance
currentStateOutput.setCurrentState(resource, partition, instanceName, "OFFLINE");
// add blocked state transition messages
Message pendingMessage = generateMessage("OFFLINE", "ONLINE", instanceName);
currentStateOutput.setPendingMessage(resource, partition, instanceName, pendingMessage);

bestPossibleStateOutput.setState(resource, partition, instanceName, "ONLINE");

// should be recovered:
expectedResult.setState(resource, partition, instanceName, "ONLINE");
} else if (resource.endsWith("1")) {
// Regular load balance
currentStateOutput.setCurrentState(resource, partition, instanceName, "ONLINE");
currentStateOutput.setCurrentState(resource, partition, instanceName + "-1", "OFFLINE");
bestPossibleStateOutput.setState(resource, partition, instanceName, "ONLINE");
messageSelectOutput.addMessage(resource, partition,
generateMessage("OFFLINE", "DROPPED", instanceName + "-1"));
// should be recovered:
expectedResult.setState(resource, partition, instanceName, "ONLINE");
} else if (resource.endsWith("2")) {
// Recovery balance with transient states, should keep the current states in the output.
currentStateOutput.setCurrentState(resource, partition, instanceName, "OFFLINE");
bestPossibleStateOutput.setState(resource, partition, instanceName, "OFFLINE");
// should be kept unchanged:
expectedResult.setState(resource, partition, instanceName, "OFFLINE");
} else if (resource.endsWith("3")) {
// One unresolved error should not prevent recovery balance
bestPossibleStateOutput.setState(resource, partition, instanceName, "ONLINE");
if (p == 0) {
if (r == 0) {
currentStateOutput.setCurrentState(resource, partition, instanceName, "ERROR");
bestPossibleStateOutput.setState(resource, partition, instanceName, "ERROR");
// This partition is still ERROR
expectedResult.setState(resource, partition, instanceName, "ERROR");
} else {
currentStateOutput.setCurrentState(resource, partition, instanceName, "OFFLINE");
messageSelectOutput.addMessage(resource, partition, generateMessage("OFFLINE", "ONLINE", instanceName));
// Recovery balance
expectedResult.setState(resource, partition, instanceName, "ONLINE");
}
} else {
currentStateOutput.setCurrentState(resource, partition, instanceName, "ONLINE");
currentStateOutput.setCurrentState(resource, partition, instanceName + "-1", "OFFLINE");
// load balance is throttled, so keep all current states
messageSelectOutput.addMessage(resource, partition,
generateMessage("OFFLINE", "DROPPED", instanceName + "-1"));
expectedResult.setState(resource, partition, instanceName, "ONLINE");
// The following must be removed because now downward state transitions are allowed
// expectedResult.setState(resource, partition, instanceName + "-1", "OFFLINE");
}
} else if (resource.endsWith("4")) {
// Test that partitions with replicas to drop are dropping them when recovery is
// happening for other partitions
if (p == 0) {
// This partition requires recovery
currentStateOutput.setCurrentState(resource, partition, instanceName, "OFFLINE");
bestPossibleStateOutput.setState(resource, partition, instanceName, "ONLINE");
messageSelectOutput.addMessage(resource, partition, generateMessage("OFFLINE", "ONLINE", instanceName));
// After recovery, it should be back ONLINE
expectedResult.setState(resource, partition, instanceName, "ONLINE");
} else {
// Other partitions require dropping of replicas
currentStateOutput.setCurrentState(resource, partition, instanceName, "ONLINE");
currentStateOutput.setCurrentState(resource, partition, instanceName + "-1", "OFFLINE");
// BestPossibleState dictates that we only need one ONLINE replica
bestPossibleStateOutput.setState(resource, partition, instanceName, "ONLINE");
bestPossibleStateOutput.setState(resource, partition, instanceName + "-1", "DROPPED");
messageSelectOutput.addMessage(resource, partition,
generateMessage("OFFLINE", "DROPPED", instanceName + "-1"));
// So instanceName-1 will NOT be expected to show up in expectedResult
expectedResult.setState(resource, partition, instanceName, "ONLINE");
expectedResult.setState(resource, partition, instanceName + "-1", "DROPPED");
}
} else if (resource.endsWith("5")) {
// Test that load balance bringing up a new replica does NOT happen with a recovery
// partition
if (p == 0) {
// Set up a partition requiring recovery
currentStateOutput.setCurrentState(resource, partition, instanceName, "OFFLINE");
bestPossibleStateOutput.setState(resource, partition, instanceName, "ONLINE");
messageSelectOutput.addMessage(resource, partition, generateMessage("OFFLINE", "ONLINE", instanceName));
// After recovery, it should be back ONLINE
expectedResult.setState(resource, partition, instanceName, "ONLINE");
} else {
currentStateOutput.setCurrentState(resource, partition, instanceName, "ONLINE");
bestPossibleStateOutput.setState(resource, partition, instanceName, "ONLINE");
// Check that load balance (bringing up a new node) did not take place
bestPossibleStateOutput.setState(resource, partition, instanceName + "-1", "ONLINE");
messageSelectOutput.addMessage(resource, partition,
generateMessage("OFFLINE", "ONLINE", instanceName + "-1"));
expectedResult.setState(resource, partition, instanceName, "ONLINE");
}
}
}
}
bestPossibleStateOutput.setPreferenceLists(resource, partitionMap);
}

event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput);
event.addAttribute(AttributeName.MESSAGES_SELECTED.name(), messageSelectOutput);
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
event.addAttribute(AttributeName.ControllerDataProvider.name(), new ResourceControllerDataProvider());
runStage(event, new ReadClusterDataStage());
runStage(event, new IntermediateStateCalcStage());

IntermediateStateOutput output = event.getAttribute(AttributeName.INTERMEDIATE_STATE.name());

for (String resource : resources) {
// Note Assert.assertEquals won't work. If "actual" is an empty map, it won't compare
// anything.
Assert.assertTrue(output.getPartitionStateMap(resource)
.getStateMap()
.equals(expectedResult.getPartitionStateMap(resource).getStateMap()));
}
}

private void preSetup(String[] resources, int numOfLiveInstances, int numOfReplicas) {
setupIdealState(numOfLiveInstances, resources, numOfLiveInstances, numOfReplicas,
IdealState.RebalanceMode.FULL_AUTO, "OnlineOffline");
Expand Down

0 comments on commit 4f19dad

Please sign in to comment.