Skip to content

Commit

Permalink
fix backward compatibility issue and modify the perPartitionHealthChe…
Browse files Browse the repository at this point in the history
…ck to bypass the unhealth partitions on the to_be_stopped_instances
  • Loading branch information
MarkGaox committed Oct 27, 2023
1 parent 56854ed commit efdd958
Show file tree
Hide file tree
Showing 6 changed files with 185 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
*/

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -254,6 +255,27 @@ public static boolean hasErrorPartitions(HelixDataAccessor dataAccessor, String
public static Map<String, List<String>> perPartitionHealthCheck(List<ExternalView> externalViews,
Map<String, Map<String, Boolean>> globalPartitionHealthStatus, String instanceToBeStop,
HelixDataAccessor dataAccessor) {
return perPartitionHealthCheck(externalViews, globalPartitionHealthStatus, instanceToBeStop,
dataAccessor, Collections.emptySet());
}

/**
* Get the problematic partitions on the to-be-stop instance
* Requirement:
* If the instance and the toBeStoppedInstances are stopped and the partitions on them are OFFLINE,
* the cluster still have enough "healthy" replicas on other sibling instances
*
* - sibling instances mean those who share the same partition (replicas) of the to-be-stop instance
*
* @param globalPartitionHealthStatus (instance => (partition name, health status))
* @param instanceToBeStop The instance to be stopped
* @param dataAccessor The data accessor
* @param toBeStoppedInstances A set of instances presumed to be are already stopped
* @return A list of problematic partitions if the instance is stopped
*/
public static Map<String, List<String>> perPartitionHealthCheck(List<ExternalView> externalViews,
Map<String, Map<String, Boolean>> globalPartitionHealthStatus, String instanceToBeStop,
HelixDataAccessor dataAccessor, Set<String> toBeStoppedInstances) {
Map<String, List<String>> unhealthyPartitions = new HashMap<>();

for (ExternalView externalView : externalViews) {
Expand All @@ -273,7 +295,8 @@ public static Map<String, List<String>> perPartitionHealthCheck(List<ExternalVie
&& stateMap.get(instanceToBeStop).equals(stateModelDefinition.getTopState())) {
for (String siblingInstance : stateMap.keySet()) {
// Skip this self check
if (siblingInstance.equals(instanceToBeStop)) {
if (siblingInstance.equals(instanceToBeStop) || toBeStoppedInstances.contains(
siblingInstance)) {
continue;
}

Expand Down Expand Up @@ -366,8 +389,27 @@ public static boolean isInstanceStable(HelixDataAccessor dataAccessor, String in
*
* TODO: Use in memory cache and query instance's currentStates
*
* @param dataAccessor
* @param instanceName
* @param dataAccessor A helper class to access the Helix data.
* @param instanceName A list of instance to be evaluated against this check.
* @return
*/
public static boolean siblingNodesActiveReplicaCheck(HelixDataAccessor dataAccessor,
String instanceName) {
return siblingNodesActiveReplicaCheck(dataAccessor, instanceName, Collections.emptySet());
}

/**
* Check if sibling nodes of the instance meet min active replicas constraint
* Two instances are sibling of each other if they host the same partition. And sibling nodes
* that are in toBeStoppableInstances will be presumed to be stopped.
* WARNING: The check uses ExternalView to reduce network traffic but suffer from accuracy
* due to external view propagation latency
*
* TODO: Use in memory cache and query instance's currentStates
*
* @param dataAccessor A helper class to access the Helix data.
* @param instanceName A list of instance to be evaluated against this check.
* @param toBeStoppedInstances A set of instances presumed to be are already stopped.
* @return
*/
public static boolean siblingNodesActiveReplicaCheck(HelixDataAccessor dataAccessor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ public void TestSiblingNodesActiveReplicaCheck_success() {
.getProperty(argThat(new PropertyKeyArgument(PropertyType.STATEMODELDEFS)));

boolean result =
InstanceValidationUtil.siblingNodesActiveReplicaCheck(mock.dataAccessor, TEST_INSTANCE, Collections.emptySet());
InstanceValidationUtil.siblingNodesActiveReplicaCheck(mock.dataAccessor, TEST_INSTANCE);

Assert.assertTrue(result);
}
Expand Down Expand Up @@ -502,7 +502,7 @@ public void TestSiblingNodesActiveReplicaCheck_fail() {
.getProperty(argThat(new PropertyKeyArgument(PropertyType.STATEMODELDEFS)));

boolean result =
InstanceValidationUtil.siblingNodesActiveReplicaCheck(mock.dataAccessor, TEST_INSTANCE, Collections.emptySet());
InstanceValidationUtil.siblingNodesActiveReplicaCheck(mock.dataAccessor, TEST_INSTANCE);

Assert.assertFalse(result);
}
Expand All @@ -526,7 +526,7 @@ public void TestSiblingNodesActiveReplicaCheck_whenNoMinActiveReplica() {
doReturn(externalView).when(mock.dataAccessor)
.getProperty(argThat(new PropertyKeyArgument(PropertyType.EXTERNALVIEW)));

boolean result = InstanceValidationUtil.siblingNodesActiveReplicaCheck(mock.dataAccessor, TEST_INSTANCE, Collections.emptySet());
boolean result = InstanceValidationUtil.siblingNodesActiveReplicaCheck(mock.dataAccessor, TEST_INSTANCE);
Assert.assertTrue(result);
}

Expand All @@ -546,7 +546,7 @@ public void TestSiblingNodesActiveReplicaCheck_exception_whenExternalViewUnavail
doReturn(null).when(mock.dataAccessor)
.getProperty(argThat(new PropertyKeyArgument(PropertyType.EXTERNALVIEW)));

InstanceValidationUtil.siblingNodesActiveReplicaCheck(mock.dataAccessor, TEST_INSTANCE, Collections.emptySet());
InstanceValidationUtil.siblingNodesActiveReplicaCheck(mock.dataAccessor, TEST_INSTANCE);
}

private class Mock {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ public InstanceInfo getInstanceHealthInfo(String clusterId, String instanceName,
}
try {
Map<String, Boolean> healthStatus =
getInstanceHealthStatus(clusterId, instanceName, healthChecks, Collections.emptySet());
getInstanceHealthStatus(clusterId, instanceName, healthChecks);
instanceInfoBuilder.healthStatus(healthStatus);
} catch (HelixException ex) {
LOG.error(
Expand Down Expand Up @@ -328,7 +328,7 @@ private List<OperationInterface> getAllOperationClasses(List<String> operations)
/**
* {@inheritDoc}
* Single instance stoppable check implementation is a special case of
* {@link #batchGetInstancesStoppableChecks(String, List, String, Set)}
* {@link #batchGetInstancesStoppableChecks(String, List, String)}
* <p>
* Step 1: Perform instance level Helix own health checks
* Step 2: Perform instance level client side health checks
Expand All @@ -339,17 +339,23 @@ private List<OperationInterface> getAllOperationClasses(List<String> operations)
*/
public StoppableCheck getInstanceStoppableCheck(String clusterId, String instanceName,
String jsonContent) throws IOException {
return batchGetInstancesStoppableChecks(clusterId, ImmutableList.of(instanceName), jsonContent, Collections.emptySet())
.get(instanceName);
return batchGetInstancesStoppableChecks(clusterId, ImmutableList.of(instanceName), jsonContent,
Collections.emptySet()).get(instanceName);
}

public Map<String, StoppableCheck> batchGetInstancesStoppableChecks(String clusterId,
List<String> instances, String jsonContent) throws IOException {
return batchGetInstancesStoppableChecks(clusterId, instances, jsonContent,
Collections.emptySet());
}

public Map<String, StoppableCheck> batchGetInstancesStoppableChecks(String clusterId,
List<String> instances, String jsonContent, Set<String> toBeStoppedInstances) throws IOException {
Map<String, StoppableCheck> finalStoppableChecks = new HashMap<>();
// helix instance check.
List<String> instancesForCustomInstanceLevelChecks =
batchHelixInstanceStoppableCheck(clusterId, instances, finalStoppableChecks, toBeStoppedInstances);
batchHelixInstanceStoppableCheck(clusterId, instances, finalStoppableChecks,
toBeStoppedInstances);
// custom check, includes partition check.
batchCustomInstanceStoppableCheck(clusterId, instancesForCustomInstanceLevelChecks,
finalStoppableChecks, getMapFromJsonPayload(jsonContent));
Expand Down Expand Up @@ -441,10 +447,11 @@ private MaintenanceManagementInstanceInfo takeFreeSingleInstanceHelper(String cl
}

private List<String> batchHelixInstanceStoppableCheck(String clusterId,
Collection<String> instances, Map<String, StoppableCheck> finalStoppableChecks, Set<String> toBeStoppedInstances) {
Map<String, Future<StoppableCheck>> helixInstanceChecks = instances.stream().collect(Collectors
.toMap(Function.identity(),
instance -> POOL.submit(() -> performHelixOwnInstanceCheck(clusterId, instance, toBeStoppedInstances))));
Collection<String> instances, Map<String, StoppableCheck> finalStoppableChecks,
Set<String> toBeStoppedInstances) {
Map<String, Future<StoppableCheck>> helixInstanceChecks = instances.stream().collect(
Collectors.toMap(Function.identity(), instance -> POOL.submit(
() -> performHelixOwnInstanceCheck(clusterId, instance, toBeStoppedInstances))));
// finalStoppableChecks contains instances that does not pass this health check
return filterInstancesForNextCheck(helixInstanceChecks, finalStoppableChecks);
}
Expand Down Expand Up @@ -512,7 +519,8 @@ private Map<String, MaintenanceManagementInstanceInfo> batchInstanceHealthCheck(
if (healthCheck.equals(HELIX_INSTANCE_STOPPABLE_CHECK)) {
// this is helix own check
instancesForNext =
batchHelixInstanceStoppableCheck(clusterId, instancesForNext, finalStoppableChecks, Collections.emptySet());
batchHelixInstanceStoppableCheck(clusterId, instancesForNext, finalStoppableChecks,
Collections.emptySet());
} else if (healthCheck.equals(HELIX_CUSTOM_STOPPABLE_CHECK)) {
// custom check, includes custom Instance check and partition check.
instancesForNext =
Expand Down Expand Up @@ -601,10 +609,12 @@ private boolean isNonBlockingCheck(StoppableCheck stoppableCheck) {
return true;
}

private StoppableCheck performHelixOwnInstanceCheck(String clusterId, String instanceName, Set<String> toBeStoppedInstances) {
private StoppableCheck performHelixOwnInstanceCheck(String clusterId, String instanceName,
Set<String> toBeStoppedInstances) {
LOG.info("Perform helix own custom health checks for {}/{}", clusterId, instanceName);
Map<String, Boolean> helixStoppableCheck =
getInstanceHealthStatus(clusterId, instanceName, HealthCheck.STOPPABLE_CHECK_LIST, toBeStoppedInstances);
getInstanceHealthStatus(clusterId, instanceName, HealthCheck.STOPPABLE_CHECK_LIST,
toBeStoppedInstances);

return new StoppableCheck(helixStoppableCheck, StoppableCheck.Category.HELIX_OWN_CHECK);
}
Expand Down Expand Up @@ -695,6 +705,12 @@ public static boolean getBooleanFromJsonPayload(String jsonString)
return OBJECT_MAPPER.readTree(jsonString).asBoolean();
}

@VisibleForTesting
protected Map<String, Boolean> getInstanceHealthStatus(String clusterId, String instanceName,
List<HealthCheck> healthChecks) {
return getInstanceHealthStatus(clusterId, instanceName, healthChecks, Collections.emptySet());
}

@VisibleForTesting
protected Map<String, Boolean> getInstanceHealthStatus(String clusterId, String instanceName,
List<HealthCheck> healthChecks, Set<String> toBeStoppedInstances) {
Expand Down
Loading

0 comments on commit efdd958

Please sign in to comment.