Skip to content

Commit

Permalink
Change stoppable to perform min active check sequentially in mz
Browse files Browse the repository at this point in the history
  • Loading branch information
GrantPSpencer committed Aug 15, 2024
1 parent e1799b4 commit fcb9d83
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.List;

import com.google.common.collect.ImmutableList;
import java.util.stream.Collectors;


public enum HealthCheck {
Expand Down Expand Up @@ -62,9 +63,12 @@ public enum HealthCheck {
MIN_ACTIVE_REPLICA_CHECK_FAILED;

/**
* Pre-defined list of checks to test if an instance can be stopped at runtime
* Pre-defined list of checks to test if an instance can be stopped at runtime. Excludes MIN_ACTIVE_REPLICA_CHECK as
* that is performed separately.
*/
public static List<HealthCheck> STOPPABLE_CHECK_LIST = Arrays.asList(HealthCheck.values());
public static List<HealthCheck> STOPPABLE_CHECK_LIST = Arrays.stream(HealthCheck.values())
.filter(healthCheck -> healthCheck != HealthCheck.MIN_ACTIVE_REPLICA_CHECK_FAILED)
.collect(Collectors.toList());
/**
* Pre-defined list of checks to test if an instance is in healthy running state
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -472,9 +472,15 @@ private MaintenanceManagementInstanceInfo takeFreeSingleInstanceHelper(String cl
private List<String> batchHelixInstanceStoppableCheck(String clusterId,
Collection<String> instances, Map<String, StoppableCheck> finalStoppableChecks,
Set<String> toBeStoppedInstances) {

// Perform all but min_active replicas check in parallel
Map<String, Future<StoppableCheck>> helixInstanceChecks = instances.stream().collect(
Collectors.toMap(Function.identity(), instance -> POOL.submit(
() -> performHelixOwnInstanceCheck(clusterId, instance, toBeStoppedInstances))));

// Perform min_active replicas check sequentially
addInstanceMinActiveReplicaCheck(helixInstanceChecks, toBeStoppedInstances);

// finalStoppableChecks contains instances that does not pass this health check
return filterInstancesForNextCheck(helixInstanceChecks, finalStoppableChecks);
}
Expand Down Expand Up @@ -787,10 +793,6 @@ protected Map<String, Boolean> getInstanceHealthStatus(String clusterId, String
healthStatus.put(HealthCheck.EMPTY_RESOURCE_ASSIGNMENT.name(),
InstanceValidationUtil.isResourceAssigned(_dataAccessor, instanceName));
break;
case MIN_ACTIVE_REPLICA_CHECK_FAILED:
healthStatus.put(HealthCheck.MIN_ACTIVE_REPLICA_CHECK_FAILED.name(),
InstanceValidationUtil.siblingNodesActiveReplicaCheck(_dataAccessor, instanceName, toBeStoppedInstances));
break;
default:
LOG.error("Unsupported health check: {}", healthCheck);
break;
Expand All @@ -800,6 +802,35 @@ protected Map<String, Boolean> getInstanceHealthStatus(String clusterId, String
return healthStatus;
}

// Adds the result of the min_active replica check for each stoppable check passed in futureStoppableCheckByInstance
private void addInstanceMinActiveReplicaCheck(Map<String, Future<StoppableCheck>> futureStoppableCheckByInstance,
Set<String> toBeStoppedInstances) {
Set<String> possibleToStopInstances = new HashSet<>(toBeStoppedInstances);

for (Map.Entry<String, Future<StoppableCheck>> entry : futureStoppableCheckByInstance.entrySet()) {
try {
String instanceName = entry.getKey();
StoppableCheck stoppableCheck = entry.getValue().get();

// Check if min active will be violated and add to stoppableCheck. If instance still stoppable,
// add to possibleToStopInstances
boolean minActiveCheckResult = InstanceValidationUtil.siblingNodesActiveReplicaCheck(_dataAccessor,
instanceName, possibleToStopInstances);
stoppableCheck.add(new StoppableCheck(Collections.singletonMap(HealthCheck.MIN_ACTIVE_REPLICA_CHECK_FAILED.name(),
minActiveCheckResult), StoppableCheck.Category.HELIX_OWN_CHECK));
if (stoppableCheck.isStoppable()) {
possibleToStopInstances.add(instanceName);
}

} catch (Exception e) {
String errorMessage = String.format("Failed to get StoppableChecks in parallel. Instances: %s",
futureStoppableCheckByInstance.values());
LOG.error(errorMessage, e);
throw new HelixException(errorMessage);
}
}
}

public static class MaintenanceManagementServiceBuilder {
private ConfigAccessor _configAccessor;
private boolean _skipZKRead;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public void beforeMethod() {
RESTConfig restConfig = new RESTConfig("restConfig");
restConfig.set(RESTConfig.SimpleFields.CUSTOMIZED_HEALTH_URL, "http://*:123/path");
when(_configAccessor.getRESTConfig(TEST_CLUSTER)).thenReturn(restConfig);
when(_dataAccessorWrapper.keyBuilder()).thenReturn(new PropertyKey.Builder(TEST_CLUSTER));
}

class MockMaintenanceManagementService extends MaintenanceManagementService {
Expand Down Expand Up @@ -124,7 +125,7 @@ public void testGetInstanceStoppableCheckWhenHelixOwnCheckFail() throws IOExcept
Map<String, Boolean> failedCheck = ImmutableMap.of("FailCheck", false);
MockMaintenanceManagementService service =
new MockMaintenanceManagementService(_dataAccessorWrapper, _configAccessor,
_customRestClient, false, false, HelixRestNamespace.DEFAULT_NAMESPACE_NAME) {
_customRestClient, false, false, null, HelixRestNamespace.DEFAULT_NAMESPACE_NAME) {
@Override
protected Map<String, Boolean> getInstanceHealthStatus(String clusterId,
String instanceName, List<HealthCheck> healthChecks, Set<String> toBeStoppedInstances) {
Expand All @@ -144,7 +145,7 @@ protected Map<String, Boolean> getInstanceHealthStatus(String clusterId,
public void testGetInstanceStoppableCheckWhenCustomInstanceCheckFail() throws IOException {
MockMaintenanceManagementService service =
new MockMaintenanceManagementService(_dataAccessorWrapper, _configAccessor,
_customRestClient, false, false, HelixRestNamespace.DEFAULT_NAMESPACE_NAME) {
_customRestClient, false, false, null, HelixRestNamespace.DEFAULT_NAMESPACE_NAME) {
@Override
protected Map<String, Boolean> getInstanceHealthStatus(String clusterId,
String instanceName, List<HealthCheck> healthChecks, Set<String> toBeStoppedInstances) {
Expand Down Expand Up @@ -243,7 +244,7 @@ public void testGetInstanceStoppableCheckWhenCustomInstanceCheckDisabled() throw
public void testGetInstanceStoppableCheckConnectionRefused() throws IOException {
MockMaintenanceManagementService service =
new MockMaintenanceManagementService(_dataAccessorWrapper, _configAccessor,
_customRestClient, false, false, HelixRestNamespace.DEFAULT_NAMESPACE_NAME) {
_customRestClient, false, false, null, HelixRestNamespace.DEFAULT_NAMESPACE_NAME) {
@Override
protected Map<String, Boolean> getInstanceHealthStatus(String clusterId,
String instanceName, List<HealthCheck> healthChecks, Set<String> toBeStoppedInstances) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.helix.TestHelper;
import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.rest.server.resources.helix.InstancesAccessor;
import org.apache.helix.rest.server.util.JerseyUriRequestBuilder;
Expand Down Expand Up @@ -288,6 +289,10 @@ public void testInstanceStoppableCrossZoneBasedWithEvacuatingInstances() throws
ImmutableSet.of("HELIX:MIN_ACTIVE_REPLICA_CHECK_FAILED"));
Assert.assertEquals(getStringSet(nonStoppableInstances, "invalidInstance"),
ImmutableSet.of("HELIX:INSTANCE_NOT_EXIST"));
instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.ENABLE);
_configAccessor.setInstanceConfig(STOPPABLE_CLUSTER2, instance0, instanceConfig);
instanceConfig1.setInstanceOperation(InstanceConstants.InstanceOperation.ENABLE);
_configAccessor.setInstanceConfig(STOPPABLE_CLUSTER2, instance1, instanceConfig1);
System.out.println("End test :" + TestHelper.getTestMethodName());
}

Expand Down Expand Up @@ -519,6 +524,59 @@ public void testValidateWeightForAllInstances() throws IOException {
System.out.println("End test :" + TestHelper.getTestMethodName());
}

@Test(dependsOnMethods = "testValidateWeightForAllInstances")
public void testMultipleReplicasInSameMZ() throws Exception {
System.out.println("Start test :" + TestHelper.getTestMethodName());
// Create SemiAuto DB so that we can control assignment
String testDb = TestHelper.getTestMethodName() + "_resource";
_gSetupTool.getClusterManagementTool().addResource(STOPPABLE_CLUSTER2, testDb, 3, "MasterSlave",
IdealState.RebalanceMode.SEMI_AUTO.toString());
_gSetupTool.getClusterManagementTool().rebalance(STOPPABLE_CLUSTER2, testDb, 3);

// Manually set ideal state to have the 3 replcias assigned to 3 instances all in the same zone
List<String> preferenceList = Arrays.asList("instance0", "instance1", "instance2");
IdealState is = _gSetupTool.getClusterManagementTool().getResourceIdealState(STOPPABLE_CLUSTER2, testDb);
for (String p : is.getPartitionSet()) {
is.setPreferenceList(p, preferenceList);
}
is.setMinActiveReplicas(2);
_gSetupTool.getClusterManagementTool().setResourceIdealState(STOPPABLE_CLUSTER2, testDb, is);

// Wait for assignments to take place
BestPossibleExternalViewVerifier verifier =
new BestPossibleExternalViewVerifier.Builder(STOPPABLE_CLUSTER2).setZkAddr(ZK_ADDR).build();
Assert.assertTrue(verifier.verifyByPolling());

// Run stoppable check against the 3 instances where SemiAuto DB was assigned
String content =
String.format("{\"%s\":\"%s\",\"%s\":[\"%s\",\"%s\",\"%s\"]}",
InstancesAccessor.InstancesProperties.selection_base.name(),
InstancesAccessor.InstanceHealthSelectionBase.zone_based.name(),
InstancesAccessor.InstancesProperties.instances.name(), "instance0", "instance1",
"instance2");
Response response =
new JerseyUriRequestBuilder("clusters/{}/instances?command=stoppable&skipHealthCheckCategories=CUSTOM_INSTANCE_CHECK,CUSTOM_PARTITION_CHECK").format(
STOPPABLE_CLUSTER2).post(this, Entity.entity(content, MediaType.APPLICATION_JSON_TYPE));
JsonNode jsonNode = OBJECT_MAPPER.readTree(response.readEntity(String.class));

// Resource has 3 replicas with min_active of 2
// First instance should be stoppable as min_active still satisfied
Set<String> stoppableSet = getStringSet(jsonNode,
InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name());
Assert.assertTrue(Collections.singleton("instance0").equals(stoppableSet));

// Next 2 instances should fail stoppable due to MIN_ACTIVE_REPLICA_CHECK_FAILED
JsonNode nonStoppableInstances = jsonNode.get(
InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name());
Assert.assertFalse(getStringSet(nonStoppableInstances, "instance0")
.contains("HELIX:MIN_ACTIVE_REPLICA_CHECK_FAILED"));
Assert.assertTrue(getStringSet(nonStoppableInstances, "instance1")
.contains("HELIX:MIN_ACTIVE_REPLICA_CHECK_FAILED"));
Assert.assertTrue(getStringSet(nonStoppableInstances, "instance2")
.contains("HELIX:MIN_ACTIVE_REPLICA_CHECK_FAILED"));
System.out.println("End test :" + TestHelper.getTestMethodName());
}

private Set<String> getStringSet(JsonNode jsonNode, String key) {
Set<String> result = new HashSet<>();
jsonNode.withArray(key).forEach(s -> result.add(s.textValue()));
Expand Down

0 comments on commit fcb9d83

Please sign in to comment.