Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change stoppable to perform min active check sequentially in mz #2886

Merged
merged 7 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.List;

import com.google.common.collect.ImmutableList;
import java.util.stream.Collectors;


public enum HealthCheck {
Expand Down Expand Up @@ -60,15 +61,15 @@ public enum HealthCheck {
* constraint if this instance is shutdown
*/
MIN_ACTIVE_REPLICA_CHECK_FAILED;

/**
* Pre-defined list of checks to test if an instance can be stopped at runtime
* Pre-defined list of checks to test if an instance can be stopped at runtime.
*/
public static List<HealthCheck> STOPPABLE_CHECK_LIST = Arrays.asList(HealthCheck.values());

/**
* Pre-defined list of checks to test if an instance is in healthy running state
*/
public static List<HealthCheck> STARTED_AND_HEALTH_CHECK_LIST = ImmutableList
.of(INVALID_CONFIG, INSTANCE_NOT_ALIVE, INSTANCE_NOT_ENABLED, INSTANCE_NOT_STABLE,
EMPTY_RESOURCE_ASSIGNMENT);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -472,9 +472,15 @@ private MaintenanceManagementInstanceInfo takeFreeSingleInstanceHelper(String cl
private List<String> batchHelixInstanceStoppableCheck(String clusterId,
Collection<String> instances, Map<String, StoppableCheck> finalStoppableChecks,
Set<String> toBeStoppedInstances) {

// Perform all but min_active replicas check in parallel
Map<String, Future<StoppableCheck>> helixInstanceChecks = instances.stream().collect(
Collectors.toMap(Function.identity(), instance -> POOL.submit(
() -> performHelixOwnInstanceCheck(clusterId, instance, toBeStoppedInstances))));

// Perform min_active replicas check sequentially
addInstanceMinActiveReplicaCheck(helixInstanceChecks, toBeStoppedInstances);
GrantPSpencer marked this conversation as resolved.
Show resolved Hide resolved

// finalStoppableChecks contains instances that does not pass this health check
return filterInstancesForNextCheck(helixInstanceChecks, finalStoppableChecks);
}
Expand Down Expand Up @@ -788,8 +794,7 @@ protected Map<String, Boolean> getInstanceHealthStatus(String clusterId, String
InstanceValidationUtil.isResourceAssigned(_dataAccessor, instanceName));
break;
case MIN_ACTIVE_REPLICA_CHECK_FAILED:
healthStatus.put(HealthCheck.MIN_ACTIVE_REPLICA_CHECK_FAILED.name(),
InstanceValidationUtil.siblingNodesActiveReplicaCheck(_dataAccessor, instanceName, toBeStoppedInstances));
// No-op as MIN_ACTIVE_REPLICA_CHECK_FAILED is handled separately afterward
break;
default:
LOG.error("Unsupported health check: {}", healthCheck);
Expand All @@ -800,6 +805,39 @@ protected Map<String, Boolean> getInstanceHealthStatus(String clusterId, String
return healthStatus;
}

// Adds the result of the min_active replica check for each stoppable check passed in futureStoppableCheckByInstance
private void addInstanceMinActiveReplicaCheck(Map<String, Future<StoppableCheck>> futureStoppableCheckByInstance,
Set<String> toBeStoppedInstances) {
// Do not perform check if in the skip list
if (_skipStoppableHealthCheckList.contains(HealthCheck.MIN_ACTIVE_REPLICA_CHECK_FAILED)) {
return;
}

Set<String> possibleToStopInstances = new HashSet<>(toBeStoppedInstances);
for (Map.Entry<String, Future<StoppableCheck>> entry : futureStoppableCheckByInstance.entrySet()) {
try {
String instanceName = entry.getKey();
StoppableCheck stoppableCheck = entry.getValue().get();
Comment on lines +820 to +823
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above, by placing the min active replica check before submitting the asynchronous checks, there is no longer a need to check the future and get the value.


// Check if min active will be violated and add to stoppableCheck. If instance still stoppable,
// add to possibleToStopInstances
boolean minActiveCheckResult = InstanceValidationUtil.siblingNodesActiveReplicaCheck(_dataAccessor,
instanceName, possibleToStopInstances);
stoppableCheck.add(new StoppableCheck(Collections.singletonMap(HealthCheck.MIN_ACTIVE_REPLICA_CHECK_FAILED.name(),
minActiveCheckResult), StoppableCheck.Category.HELIX_OWN_CHECK));
if (stoppableCheck.isStoppable()) {
possibleToStopInstances.add(instanceName);
}

} catch (Exception e) {
String errorMessage = String.format("Failed to get StoppableChecks in parallel. Instances: %s",
futureStoppableCheckByInstance.values());
LOG.error(errorMessage, e);
throw new HelixException(errorMessage);
}
}
}

public static class MaintenanceManagementServiceBuilder {
private ConfigAccessor _configAccessor;
private boolean _skipZKRead;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public void beforeMethod() {
RESTConfig restConfig = new RESTConfig("restConfig");
restConfig.set(RESTConfig.SimpleFields.CUSTOMIZED_HEALTH_URL, "http://*:123/path");
when(_configAccessor.getRESTConfig(TEST_CLUSTER)).thenReturn(restConfig);
when(_dataAccessorWrapper.keyBuilder()).thenReturn(new PropertyKey.Builder(TEST_CLUSTER));
}

class MockMaintenanceManagementService extends MaintenanceManagementService {
Expand Down Expand Up @@ -124,7 +125,7 @@ public void testGetInstanceStoppableCheckWhenHelixOwnCheckFail() throws IOExcept
Map<String, Boolean> failedCheck = ImmutableMap.of("FailCheck", false);
MockMaintenanceManagementService service =
new MockMaintenanceManagementService(_dataAccessorWrapper, _configAccessor,
_customRestClient, false, false, HelixRestNamespace.DEFAULT_NAMESPACE_NAME) {
_customRestClient, false, false, null, HelixRestNamespace.DEFAULT_NAMESPACE_NAME) {
@Override
protected Map<String, Boolean> getInstanceHealthStatus(String clusterId,
String instanceName, List<HealthCheck> healthChecks, Set<String> toBeStoppedInstances) {
Expand All @@ -144,7 +145,7 @@ protected Map<String, Boolean> getInstanceHealthStatus(String clusterId,
public void testGetInstanceStoppableCheckWhenCustomInstanceCheckFail() throws IOException {
MockMaintenanceManagementService service =
new MockMaintenanceManagementService(_dataAccessorWrapper, _configAccessor,
_customRestClient, false, false, HelixRestNamespace.DEFAULT_NAMESPACE_NAME) {
_customRestClient, false, false, null, HelixRestNamespace.DEFAULT_NAMESPACE_NAME) {
@Override
protected Map<String, Boolean> getInstanceHealthStatus(String clusterId,
String instanceName, List<HealthCheck> healthChecks, Set<String> toBeStoppedInstances) {
Expand Down Expand Up @@ -243,7 +244,7 @@ public void testGetInstanceStoppableCheckWhenCustomInstanceCheckDisabled() throw
public void testGetInstanceStoppableCheckConnectionRefused() throws IOException {
MockMaintenanceManagementService service =
new MockMaintenanceManagementService(_dataAccessorWrapper, _configAccessor,
_customRestClient, false, false, HelixRestNamespace.DEFAULT_NAMESPACE_NAME) {
_customRestClient, false, false, null, HelixRestNamespace.DEFAULT_NAMESPACE_NAME) {
@Override
protected Map<String, Boolean> getInstanceHealthStatus(String clusterId,
String instanceName, List<HealthCheck> healthChecks, Set<String> toBeStoppedInstances) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.helix.TestHelper;
import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.rest.server.resources.helix.InstancesAccessor;
import org.apache.helix.rest.server.util.JerseyUriRequestBuilder;
Expand Down Expand Up @@ -288,6 +289,10 @@ public void testInstanceStoppableCrossZoneBasedWithEvacuatingInstances() throws
ImmutableSet.of("HELIX:MIN_ACTIVE_REPLICA_CHECK_FAILED"));
Assert.assertEquals(getStringSet(nonStoppableInstances, "invalidInstance"),
ImmutableSet.of("HELIX:INSTANCE_NOT_EXIST"));
instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.ENABLE);
_configAccessor.setInstanceConfig(STOPPABLE_CLUSTER2, instance0, instanceConfig);
instanceConfig1.setInstanceOperation(InstanceConstants.InstanceOperation.ENABLE);
_configAccessor.setInstanceConfig(STOPPABLE_CLUSTER2, instance1, instanceConfig1);
System.out.println("End test :" + TestHelper.getTestMethodName());
}

Expand Down Expand Up @@ -519,6 +524,59 @@ public void testValidateWeightForAllInstances() throws IOException {
System.out.println("End test :" + TestHelper.getTestMethodName());
}

@Test(dependsOnMethods = "testValidateWeightForAllInstances")
public void testMultipleReplicasInSameMZ() throws Exception {
System.out.println("Start test :" + TestHelper.getTestMethodName());
// Create SemiAuto DB so that we can control assignment
String testDb = TestHelper.getTestMethodName() + "_resource";
_gSetupTool.getClusterManagementTool().addResource(STOPPABLE_CLUSTER2, testDb, 3, "MasterSlave",
IdealState.RebalanceMode.SEMI_AUTO.toString());
_gSetupTool.getClusterManagementTool().rebalance(STOPPABLE_CLUSTER2, testDb, 3);

// Manually set ideal state to have the 3 replcias assigned to 3 instances all in the same zone
List<String> preferenceList = Arrays.asList("instance0", "instance1", "instance2");
IdealState is = _gSetupTool.getClusterManagementTool().getResourceIdealState(STOPPABLE_CLUSTER2, testDb);
for (String p : is.getPartitionSet()) {
is.setPreferenceList(p, preferenceList);
}
is.setMinActiveReplicas(2);
_gSetupTool.getClusterManagementTool().setResourceIdealState(STOPPABLE_CLUSTER2, testDb, is);

// Wait for assignments to take place
BestPossibleExternalViewVerifier verifier =
new BestPossibleExternalViewVerifier.Builder(STOPPABLE_CLUSTER2).setZkAddr(ZK_ADDR).build();
Assert.assertTrue(verifier.verifyByPolling());

// Run stoppable check against the 3 instances where SemiAuto DB was assigned
String content =
String.format("{\"%s\":\"%s\",\"%s\":[\"%s\",\"%s\",\"%s\"]}",
InstancesAccessor.InstancesProperties.selection_base.name(),
InstancesAccessor.InstanceHealthSelectionBase.zone_based.name(),
InstancesAccessor.InstancesProperties.instances.name(), "instance0", "instance1",
"instance2");
Response response =
new JerseyUriRequestBuilder("clusters/{}/instances?command=stoppable&skipHealthCheckCategories=CUSTOM_INSTANCE_CHECK,CUSTOM_PARTITION_CHECK").format(
STOPPABLE_CLUSTER2).post(this, Entity.entity(content, MediaType.APPLICATION_JSON_TYPE));
JsonNode jsonNode = OBJECT_MAPPER.readTree(response.readEntity(String.class));

// Resource has 3 replicas with min_active of 2
// First instance should be stoppable as min_active still satisfied
Set<String> stoppableSet = getStringSet(jsonNode,
InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name());
Assert.assertTrue(Collections.singleton("instance0").equals(stoppableSet));

// Next 2 instances should fail stoppable due to MIN_ACTIVE_REPLICA_CHECK_FAILED
JsonNode nonStoppableInstances = jsonNode.get(
InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name());
Assert.assertFalse(getStringSet(nonStoppableInstances, "instance0")
.contains("HELIX:MIN_ACTIVE_REPLICA_CHECK_FAILED"));
Assert.assertTrue(getStringSet(nonStoppableInstances, "instance1")
.contains("HELIX:MIN_ACTIVE_REPLICA_CHECK_FAILED"));
Assert.assertTrue(getStringSet(nonStoppableInstances, "instance2")
.contains("HELIX:MIN_ACTIVE_REPLICA_CHECK_FAILED"));
System.out.println("End test :" + TestHelper.getTestMethodName());
}

private Set<String> getStringSet(JsonNode jsonNode, String key) {
Set<String> result = new HashSet<>();
jsonNode.withArray(key).forEach(s -> result.add(s.textValue()));
Expand Down
Loading