Skip to content

Commit

Permalink
[apache/helix] -- Added detail in the Exception message for WAGED reb…
Browse files Browse the repository at this point in the history
…alance (hard constraint) failures.
  • Loading branch information
himanshukandwal committed Jul 16, 2024
1 parent 613abbc commit e77bf3a
Show file tree
Hide file tree
Showing 9 changed files with 26 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,11 @@ class ConstraintBasedAlgorithm implements RebalanceAlgorithm {
private static final Logger LOG = LoggerFactory.getLogger(ConstraintBasedAlgorithm.class);
private final List<HardConstraint> _hardConstraints;
private final Map<SoftConstraint, Float> _softConstraints;
private final Set<String> logEnabledClusters = new HashSet<>();

ConstraintBasedAlgorithm(List<HardConstraint> hardConstraints,
Map<SoftConstraint, Float> softConstraints) {
_hardConstraints = hardConstraints;
_softConstraints = softConstraints;

for (HardConstraint constraint : hardConstraints) {
constraint.setLogEnabledClusters(logEnabledClusters);
}
}

@Override
Expand Down Expand Up @@ -144,14 +139,14 @@ private Optional<AssignableNode> getNodeWithHighestPoints(AssignableReplica repl

if (candidateNodes.isEmpty()) {
LOG.info("Found no eligible candidate nodes. Enabling hard constraint level logging for cluster: {}", clusterContext.getClusterName());
enableFullLoggingForCluster(clusterContext.getClusterName());
enableFullLoggingForCluster();
optimalAssignment.recordAssignmentFailure(replica,
Maps.transformValues(hardConstraintFailures, this::convertFailureReasons));
return Optional.empty();
}

LOG.info("Disabling hard constraint level logging for cluster: {}", clusterContext.getClusterName());
removeClusterFromFullLogging(clusterContext.getClusterName());
removeFullLoggingForCluster();

return candidateNodes.parallelStream().map(node -> new HashMap.SimpleEntry<>(node,
getAssignmentNormalizedScore(node, replica, clusterContext)))
Expand Down Expand Up @@ -192,24 +187,21 @@ private List<String> convertFailureReasons(List<HardConstraint> hardConstraints)
}

/**
* Adds cluster name for full logging in all hard constraints
* @param clusterName cluster to be added
* Enables logging of failures in all hard constraints
*/
private void enableFullLoggingForCluster(String clusterName) {
logEnabledClusters.add(clusterName);
private void enableFullLoggingForCluster() {
for (HardConstraint hardConstraint : _hardConstraints) {
hardConstraint.setEnableLogging(true);
}
}

/**
* Removes the cluster from full logging in all hard constraints (if added previously)
* @param clusterName cluster to be removed
*/
private void removeClusterFromFullLogging(String clusterName) {
logEnabledClusters.remove(clusterName);
}

@VisibleForTesting
Set<String> getLogEnabledClusters() {
return logEnabledClusters;
private void removeFullLoggingForCluster() {
for (HardConstraint hardConstraint : _hardConstraints) {
hardConstraint.setEnableLogging(false);
}
}

private static class AssignableReplicaWithScore implements Comparable<AssignableReplicaWithScore> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ boolean isAssignmentValid(AssignableNode node, AssignableReplica replica,
clusterContext.getPartitionsForResourceAndFaultZone(replica.getResourceName(), node.getFaultZone());

if (partitionsForResourceAndFaultZone.contains(replica.getPartitionName())) {
if (isLoggingEnabled(clusterContext.getClusterName())) {
if (enableLogging) {
LOG.info("A fault zone cannot contain more than 1 replica of same partition. Found replica for partition: {}",
replica.getPartitionName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
* under the License.
*/

import java.util.Set;

import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
Expand All @@ -31,7 +29,7 @@
*/
abstract class HardConstraint {

protected Set<String> logEnabledClusters;
protected boolean enableLogging = false;

/**
* Check if the replica could be assigned to the node
Expand All @@ -50,19 +48,10 @@ String getDescription() {
}

/**
* Check if the logging is enabled for the replica
* @param clusterName The name of the cluster to be checked
*/
public boolean isLoggingEnabled(String clusterName) {
return logEnabledClusters != null && logEnabledClusters.contains(clusterName);
}

/**
* Set the reference of the replicas that need to be logged.
* @param logEnabledClusters The clusters that need to be logged
* Sets the flag to enable constraint level logging
*/
public void setLogEnabledClusters(Set<String> logEnabledClusters) {
this.logEnabledClusters = logEnabledClusters;
public void setEnableLogging(boolean enableLogging) {
this.enableLogging = enableLogging;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ boolean isAssignmentValid(AssignableNode node, AssignableReplica replica,
for (String key : replicaCapacity.keySet()) {
if (nodeCapacity.containsKey(key)) {
if (nodeCapacity.get(key) < replicaCapacity.get(key)) {
if (isLoggingEnabled(clusterContext.getClusterName())) {
if (enableLogging) {
LOG.info("Node has insufficient capacity for: {}. Left available: {}, Required: {}",
key, nodeCapacity.get(key), replicaCapacity.get(key));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ boolean isAssignmentValid(AssignableNode node, AssignableReplica replica,
|| assignedPartitionsByResourceSize < resourceMaxPartitionsPerInstance;

if (!exceedResourceMaxPartitionLimit) {
if (isLoggingEnabled(clusterContext.getClusterName())) {
if (enableLogging) {
LOG.info("Cannot exceed the max number of partitions per resource ({}) limitation on node. Assigned replica count: {}",
resourceMaxPartitionsPerInstance, assignedPartitionsByResourceSize);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ boolean isAssignmentValid(AssignableNode node, AssignableReplica replica,
List<String> disabledPartitions = node.getDisabledPartitionsMap().get(replica.getResourceName());

if (disabledPartitions != null && disabledPartitions.contains(replica.getPartitionName())) {
if (isLoggingEnabled(clusterContext.getClusterName())) {
if (enableLogging) {
LOG.info("Cannot assign the inactive replica: {}", replica.getPartitionName());
}
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ boolean isAssignmentValid(AssignableNode node, AssignableReplica replica,
Set<String> assignedPartitionsByResource = node.getAssignedPartitionsByResource(replica.getResourceName());

if (assignedPartitionsByResource.contains(replica.getPartitionName())) {
if (isLoggingEnabled(clusterContext.getClusterName())) {
if (enableLogging) {
LOG.info("Same partition ({}) of different states cannot co-exist in one instance", replica.getPartitionName());
}
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ boolean isAssignmentValid(AssignableNode node, AssignableReplica replica,
}

if (!node.getInstanceTags().contains(replica.getResourceInstanceGroupTag())) {
if (isLoggingEnabled(clusterContext.getClusterName())) {
if (enableLogging) {
LOG.info("Instance doesn't have the tag of the replica ({})", replica.getResourceInstanceGroupTag());
}
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@
import org.testng.Assert;
import org.testng.annotations.Test;

import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
Expand All @@ -60,8 +62,7 @@ public void testCalculateNoValidAssignment() throws IOException {
Assert.assertEquals(ex.getFailureType(), HelixRebalanceException.Type.FAILED_TO_CALCULATE);
}

Assert.assertEquals(algorithm.getLogEnabledClusters().size(), 1);
Assert.assertTrue(algorithm.getLogEnabledClusters().contains(clusterModel.getContext().getClusterName()));
verify(mockHardConstraint, times(1)).setEnableLogging(eq(true));
verify(mockHardConstraint, times(1)).isAssignmentValid(any(), any(), any());
}

Expand All @@ -83,13 +84,12 @@ public void testCalculateNoValidAssignmentFirstAndThenRecovery() throws IOExcept
Assert.assertEquals(ex.getFailureType(), HelixRebalanceException.Type.FAILED_TO_CALCULATE);
}

Assert.assertEquals(algorithm.getLogEnabledClusters().size(), 1);
Assert.assertTrue(algorithm.getLogEnabledClusters().contains(clusterModel.getContext().getClusterName()));
verify(mockHardConstraint, times(1)).setEnableLogging(eq(true));
verify(mockHardConstraint, times(1)).isAssignmentValid(any(), any(), any());

// calling again for recovery (no exception)
algorithm.calculate(clusterModel);
Assert.assertEquals(algorithm.getLogEnabledClusters().size(), 0);
verify(mockHardConstraint, atLeastOnce()).setEnableLogging(eq(false));
}

@Test
Expand Down

0 comments on commit e77bf3a

Please sign in to comment.