Skip to content

Commit

Permalink
filter on-operation instances for min replica count in waged
Browse files Browse the repository at this point in the history
  • Loading branch information
xyuanlu committed Sep 19, 2023
1 parent 7c312ca commit 94f6a8d
Showing 1 changed file with 12 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
import org.apache.helix.model.ResourceAssignment;
Expand Down Expand Up @@ -395,7 +396,7 @@ private Map<String, ResourceAssignment> handleDelayedRebalanceMinActiveReplica(
Map<String, ResourceAssignment> currentResourceAssignment,
RebalanceAlgorithm algorithm) throws HelixRebalanceException {
// the "real" live nodes at the time
final Set<String> enabledLiveInstances = clusterData.getEnabledLiveInstances();
final Set<String> enabledLiveInstances = filterOutOnOperationInstances(clusterData.getInstanceConfigMap(), clusterData.getEnabledLiveInstances());
if (activeNodes.equals(enabledLiveInstances) || !requireRebalanceOverwrite(clusterData, currentResourceAssignment)) {
// no need for additional process, return the current resource assignment
return currentResourceAssignment;
Expand Down Expand Up @@ -424,6 +425,14 @@ private Map<String, ResourceAssignment> handleDelayedRebalanceMinActiveReplica(
}
}

/**
 * Returns the subset of {@code nodes} whose instance operation is not contained in
 * {@code DelayedAutoRebalancer.INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT}.
 *
 * <p>A node with no entry in {@code instanceConfigMap} is kept rather than dropped: without a
 * config there is no instance operation to test against the exclusion collection, and keeping
 * the node avoids a {@link NullPointerException} while leaving it in the result as if no filter
 * had been applied to it.
 *
 * @param instanceConfigMap map from instance name to its {@code InstanceConfig}
 * @param nodes candidate instance names to filter
 * @return a newly collected set holding the nodes that are not excluded
 */
private static Set<String> filterOutOnOperationInstances(Map<String, InstanceConfig> instanceConfigMap,
    Set<String> nodes) {
  return nodes.stream()
      .filter(instance -> {
        InstanceConfig instanceConfig = instanceConfigMap.get(instance);
        // Map.get returns null for an unknown key; dereferencing it would abort the whole stream.
        return instanceConfig == null
            || !DelayedAutoRebalancer.INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT
                .contains(instanceConfig.getInstanceOperation());
      })
      .collect(Collectors.toSet());
}

/**
* Emergency rebalance is scheduled to quickly handle urgent cases, such as reassigning partitions from inactive nodes
* and addressing partitions that fail to meet minActiveReplicas.
Expand Down Expand Up @@ -608,7 +617,8 @@ protected boolean requireRebalanceOverwrite(ResourceControllerDataProvider clust
bestPossibleAssignment.values().parallelStream().forEach((resourceAssignment -> {
String resourceName = resourceAssignment.getResourceName();
IdealState currentIdealState = clusterData.getIdealState(resourceName);
Set<String> enabledLiveInstances = clusterData.getEnabledLiveInstances();
Set<String> enabledLiveInstances =
filterOutOnOperationInstances(clusterData.getInstanceConfigMap(), clusterData.getEnabledLiveInstances());
int numReplica = currentIdealState.getReplicaCount(enabledLiveInstances.size());
int minActiveReplica = DelayedRebalanceUtil.getMinActiveReplica(ResourceConfig
.mergeIdealStateWithResourceConfig(clusterData.getResourceConfig(resourceName),
Expand Down

0 comments on commit 94f6a8d

Please sign in to comment.