Skip to content

Commit

Permalink
[apache/helix] -- Fixes #2638, Improve Hard Constraint Failure Debugg…
Browse files Browse the repository at this point in the history
…ability by adding details in the error message (#2639)

In this PR, we log the specific details of hard constraint validation failures, which improves issue debuggability.
  • Loading branch information
himanshukandwal authored Oct 20, 2023
1 parent 937d506 commit 4738f6d
Show file tree
Hide file tree
Showing 9 changed files with 184 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,34 @@
* under the License.
*/

import java.util.Set;
import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


class FaultZoneAwareConstraint extends HardConstraint {

private static final Logger LOG = LoggerFactory.getLogger(FaultZoneAwareConstraint.class);

@Override
boolean isAssignmentValid(AssignableNode node, AssignableReplica replica,
ClusterContext clusterContext) {
if (!node.hasFaultZone()) {
return true;
}
return !clusterContext
.getPartitionsForResourceAndFaultZone(replica.getResourceName(), node.getFaultZone())
.contains(replica.getPartitionName());

Set<String> partitionsForResourceAndFaultZone =
clusterContext.getPartitionsForResourceAndFaultZone(replica.getResourceName(), node.getFaultZone());

if (partitionsForResourceAndFaultZone.contains(replica.getPartitionName())) {
LOG.debug("A fault zone cannot contain more than 1 replica of same partition. Found replica for partition: {}",
replica.getPartitionName());
return false;
}
return true;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,17 @@
*/

import java.util.Map;

import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


class NodeCapacityConstraint extends HardConstraint {

private static final Logger LOG = LoggerFactory.getLogger(NodeCapacityConstraint.class);

@Override
boolean isAssignmentValid(AssignableNode node, AssignableReplica replica,
ClusterContext clusterContext) {
Expand All @@ -36,6 +40,8 @@ boolean isAssignmentValid(AssignableNode node, AssignableReplica replica,
for (String key : replicaCapacity.keySet()) {
if (nodeCapacity.containsKey(key)) {
if (nodeCapacity.get(key) < replicaCapacity.get(key)) {
LOG.debug("Node has insufficient capacity for: {}. Left available: {}, Required: {}",
key, nodeCapacity.get(key), replicaCapacity.get(key));
return false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,37 @@
import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


class NodeMaxPartitionLimitConstraint extends HardConstraint {

private static final Logger LOG = LoggerFactory.getLogger(NodeMaxPartitionLimitConstraint.class);

@Override
boolean isAssignmentValid(AssignableNode node, AssignableReplica replica,
ClusterContext clusterContext) {
boolean exceedMaxPartitionLimit =
node.getMaxPartition() < 0 || node.getAssignedReplicaCount() < node.getMaxPartition();
boolean exceedResourceMaxPartitionLimit = replica.getResourceMaxPartitionsPerInstance() < 0
|| node.getAssignedPartitionsByResource(replica.getResourceName()).size() < replica
.getResourceMaxPartitionsPerInstance();
return exceedMaxPartitionLimit && exceedResourceMaxPartitionLimit;

if (!exceedMaxPartitionLimit) {
LOG.debug("Cannot exceed the max number of partitions ({}) limitation on node. Assigned replica count: {}",
node.getMaxPartition(), node.getAssignedReplicaCount());
return false;
}

int resourceMaxPartitionsPerInstance = replica.getResourceMaxPartitionsPerInstance();
int assignedPartitionsByResourceSize = node.getAssignedPartitionsByResource(replica.getResourceName()).size();
boolean exceedResourceMaxPartitionLimit = resourceMaxPartitionsPerInstance < 0
|| assignedPartitionsByResourceSize < resourceMaxPartitionsPerInstance;

if (!exceedResourceMaxPartitionLimit) {
LOG.debug("Cannot exceed the max number of partitions per resource ({}) limitation on node. Assigned replica count: {}",
resourceMaxPartitionsPerInstance, assignedPartitionsByResourceSize);
return false;
}
return true;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,24 @@
import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


class ReplicaActivateConstraint extends HardConstraint {

private static final Logger LOG = LoggerFactory.getLogger(ReplicaActivateConstraint.class);

@Override
boolean isAssignmentValid(AssignableNode node, AssignableReplica replica,
ClusterContext clusterContext) {
List<String> disabledPartitions =
node.getDisabledPartitionsMap().get(replica.getResourceName());
return disabledPartitions == null || !disabledPartitions.contains(replica.getPartitionName());
List<String> disabledPartitions = node.getDisabledPartitionsMap().get(replica.getResourceName());

if (disabledPartitions != null && disabledPartitions.contains(replica.getPartitionName())) {
LOG.debug("Cannot assign the inactive replica: {}", replica.getPartitionName());
return false;
}
return true;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,28 @@
* under the License.
*/

import java.util.Set;
import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


class SamePartitionOnInstanceConstraint extends HardConstraint {

private static final Logger LOG = LoggerFactory.getLogger(SamePartitionOnInstanceConstraint.class);

@Override
boolean isAssignmentValid(AssignableNode node, AssignableReplica replica,
ClusterContext clusterContext) {
return !node.getAssignedPartitionsByResource(replica.getResourceName())
.contains(replica.getPartitionName());
Set<String> assignedPartitionsByResource = node.getAssignedPartitionsByResource(replica.getResourceName());

if (assignedPartitionsByResource.contains(replica.getPartitionName())) {
LOG.debug("Same partition ({}) of different states cannot co-exist in one instance", replica.getPartitionName());
return false;
}
return true;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,26 @@
import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


class ValidGroupTagConstraint extends HardConstraint {

  // Fixed copy-paste bug: the logger was created with
  // SamePartitionOnInstanceConstraint.class, which mislabels log lines.
  private static final Logger LOG = LoggerFactory.getLogger(ValidGroupTagConstraint.class);

  /**
   * Checks that a node carries the instance-group tag required by the replica's
   * resource, when such a tag is configured.
   *
   * @return true if the resource has no instance-group tag, or the node's tags
   *         contain it; false otherwise.
   */
  @Override
  boolean isAssignmentValid(AssignableNode node, AssignableReplica replica,
      ClusterContext clusterContext) {
    // Resources without a group tag are assignable to any node.
    if (!replica.hasResourceInstanceGroupTag()) {
      return true;
    }

    if (!node.getInstanceTags().contains(replica.getResourceInstanceGroupTag())) {
      LOG.debug("Instance doesn't have the tag of the replica ({})", replica.getResourceInstanceGroupTag());
      return false;
    }
    return true;
  }

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@
* under the License.
*/

import java.io.IOException;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.helix.HelixRebalanceException;
import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
import org.apache.helix.controller.rebalancer.waged.model.ClusterModel;
import org.apache.helix.controller.rebalancer.waged.model.ClusterModelTestHelper;
import org.apache.helix.controller.rebalancer.waged.model.OptimalAssignment;
Expand Down Expand Up @@ -118,4 +121,26 @@ public void testSortingEarlyQuitLackCapacity() throws IOException, HelixRebalanc
"The cluster does not have enough item1 capacity for all partitions. Failure Type: FAILED_TO_CALCULATE");
}
}

@Test
public void testCalculateWithInvalidAssignmentForNodeCapacity() throws IOException {
HardConstraint nodeCapacityConstraint = new NodeCapacityConstraint();
SoftConstraint soft1 = new MaxCapacityUsageInstanceConstraint();
SoftConstraint soft2 = new InstancePartitionsCountConstraint();
ConstraintBasedAlgorithm algorithm =
new ConstraintBasedAlgorithm(ImmutableList.of(nodeCapacityConstraint),
ImmutableMap.of(soft1, 1f, soft2, 1f));
ClusterModel clusterModel = new ClusterModelTestHelper().getMultiNodeClusterModel();
// increase the ask capacity of item 3, which will trigger the capacity constraint to fail.
Map<String, Set<AssignableReplica>> assignableReplicaMap = new HashMap<>(clusterModel.getAssignableReplicaMap());
Set<AssignableReplica> resourceAssignableReplicas = assignableReplicaMap.get("Resource3");
AssignableReplica replica = resourceAssignableReplicas.iterator().next();
replica.getCapacity().put("item3", 40); // available: 30, requested: 40.

try {
algorithm.calculate(clusterModel);
} catch (HelixRebalanceException ex) {
Assert.assertEquals(ex.getFailureType(), HelixRebalanceException.Type.FAILED_TO_CALCULATE);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@
* under the License.
*/

import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableSet;
import java.util.Collections;

import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
Expand All @@ -31,7 +30,7 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import com.google.common.collect.ImmutableSet;
import static org.mockito.Mockito.*;

public class TestFaultZoneAwareConstraint {
private static final String TEST_PARTITION = "testPartition";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package org.apache.helix.controller.rebalancer.waged.constraints;

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;

import static org.mockito.Mockito.*;


/**
 * Unit tests for {@link ReplicaActivateConstraint}: assignment must be rejected
 * when the replica's partition is listed as disabled on the target node.
 */
public class TestReplicaActivateConstraint {

  private static final String TEST_RESOURCE = "testResource";
  private static final String TEST_PARTITION = "testPartition";

  private final AssignableReplica _testReplica = Mockito.mock(AssignableReplica.class);
  private final AssignableNode _testNode = Mockito.mock(AssignableNode.class);
  private final ClusterContext _clusterContext = Mockito.mock(ClusterContext.class);

  // Renamed from _faultZoneAwareConstraint: the instance under test is a
  // ReplicaActivateConstraint, not a FaultZoneAwareConstraint.
  private final HardConstraint _replicaActivateConstraint = new ReplicaActivateConstraint();

  @Test
  public void validWhenEmptyDisabledReplicaMap() {
    Map<String, List<String>> disabledReplicaMap = new HashMap<>();
    disabledReplicaMap.put(TEST_RESOURCE, new ArrayList<>());

    when(_testReplica.getResourceName()).thenReturn(TEST_RESOURCE);
    when(_testReplica.getPartitionName()).thenReturn(TEST_PARTITION);
    when(_testNode.getDisabledPartitionsMap()).thenReturn(disabledReplicaMap);

    Assert.assertTrue(_replicaActivateConstraint.isAssignmentValid(_testNode, _testReplica, _clusterContext));
  }

  @Test
  public void invalidWhenPartitionIsDisabled() {
    Map<String, List<String>> disabledReplicaMap = new HashMap<>();
    disabledReplicaMap.put(TEST_RESOURCE, Collections.singletonList(TEST_PARTITION));

    when(_testReplica.getResourceName()).thenReturn(TEST_RESOURCE);
    when(_testReplica.getPartitionName()).thenReturn(TEST_PARTITION);
    when(_testNode.getDisabledPartitionsMap()).thenReturn(disabledReplicaMap);

    Assert.assertFalse(_replicaActivateConstraint.isAssignmentValid(_testNode, _testReplica, _clusterContext));
  }

}

0 comments on commit 4738f6d

Please sign in to comment.