Skip to content

Commit

Permalink
Implement the cross-zone-based stoppable check (#2680)
Browse files Browse the repository at this point in the history
Implement the cross-zone-based stoppable check and add to_be_stopped_instances query parameter to the stoppable check API
  • Loading branch information
MarkGaox authored and Xiaoyuan Lu committed Dec 8, 2023
1 parent 39c5eb2 commit 0b01378
Show file tree
Hide file tree
Showing 9 changed files with 576 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
*/

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -254,6 +255,28 @@ public static boolean hasErrorPartitions(HelixDataAccessor dataAccessor, String
public static Map<String, List<String>> perPartitionHealthCheck(List<ExternalView> externalViews,
Map<String, Map<String, Boolean>> globalPartitionHealthStatus, String instanceToBeStop,
HelixDataAccessor dataAccessor) {
return perPartitionHealthCheck(externalViews, globalPartitionHealthStatus, instanceToBeStop,
dataAccessor, Collections.emptySet());
}

/**
* Get the problematic partitions on the to-be-stop instance
* Requirement:
* If the instance and the toBeStoppedInstances are stopped and the partitions on them are OFFLINE,
* the cluster still have enough "healthy" replicas on other sibling instances
*
* - sibling instances mean those who share the same partition (replicas) of the to-be-stop instance
*
* @param globalPartitionHealthStatus (instance => (partition name, health status))
* @param instanceToBeStop The instance to be stopped
* @param dataAccessor The data accessor
* @param toBeStoppedInstances A set of instances presumed to be are already stopped. And it
* shouldn't contain the `instanceToBeStop`
* @return A list of problematic partitions if the instance is stopped
*/
public static Map<String, List<String>> perPartitionHealthCheck(List<ExternalView> externalViews,
Map<String, Map<String, Boolean>> globalPartitionHealthStatus, String instanceToBeStop,
HelixDataAccessor dataAccessor, Set<String> toBeStoppedInstances) {
Map<String, List<String>> unhealthyPartitions = new HashMap<>();

for (ExternalView externalView : externalViews) {
Expand All @@ -273,7 +296,8 @@ public static Map<String, List<String>> perPartitionHealthCheck(List<ExternalVie
&& stateMap.get(instanceToBeStop).equals(stateModelDefinition.getTopState())) {
for (String siblingInstance : stateMap.keySet()) {
// Skip this self check
if (siblingInstance.equals(instanceToBeStop)) {
if (siblingInstance.equals(instanceToBeStop) || (toBeStoppedInstances != null
&& toBeStoppedInstances.contains(siblingInstance))) {
continue;
}

Expand Down Expand Up @@ -366,11 +390,32 @@ public static boolean isInstanceStable(HelixDataAccessor dataAccessor, String in
*
* TODO: Use in memory cache and query instance's currentStates
*
* @param dataAccessor
* @param instanceName
* @param dataAccessor A helper class to access the Helix data.
* @param instanceName An instance to be evaluated against this check.
* @return
*/
public static boolean siblingNodesActiveReplicaCheck(HelixDataAccessor dataAccessor,
String instanceName) {
return siblingNodesActiveReplicaCheck(dataAccessor, instanceName, Collections.emptySet());
}

/**
* Check if sibling nodes of the instance meet min active replicas constraint
* Two instances are sibling of each other if they host the same partition. And sibling nodes
* that are in toBeStoppableInstances will be presumed to be stopped.
* WARNING: The check uses ExternalView to reduce network traffic but suffer from accuracy
* due to external view propagation latency
*
* TODO: Use in memory cache and query instance's currentStates
*
* @param dataAccessor A helper class to access the Helix data.
* @param instanceName An instance to be evaluated against this check.
* @param toBeStoppedInstances A set of instances presumed to be are already stopped. And it
* shouldn't contain the `instanceName`
* @return
*/
public static boolean siblingNodesActiveReplicaCheck(HelixDataAccessor dataAccessor, String instanceName) {
public static boolean siblingNodesActiveReplicaCheck(HelixDataAccessor dataAccessor,
String instanceName, Set<String> toBeStoppedInstances) {
PropertyKey.Builder propertyKeyBuilder = dataAccessor.keyBuilder();
List<String> resources = dataAccessor.getChildNames(propertyKeyBuilder.idealStates());

Expand Down Expand Up @@ -406,8 +451,9 @@ public static boolean siblingNodesActiveReplicaCheck(HelixDataAccessor dataAcces
if (stateByInstanceMap.containsKey(instanceName)) {
int numHealthySiblings = 0;
for (Map.Entry<String, String> entry : stateByInstanceMap.entrySet()) {
if (!entry.getKey().equals(instanceName)
&& !unhealthyStates.contains(entry.getValue())) {
if (!entry.getKey().equals(instanceName) && (toBeStoppedInstances == null
|| !toBeStoppedInstances.contains(entry.getKey())) && !unhealthyStates.contains(
entry.getValue())) {
numHealthySiblings++;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -401,6 +403,81 @@ public void TestSiblingNodesActiveReplicaCheck_success() {
Assert.assertTrue(result);
}

@Test
public void TestSiblingNodesActiveReplicaCheckSuccessWithToBeStoppedInstances() {
String resource = "resource";
Mock mock = new Mock();
doReturn(ImmutableList.of(resource)).when(mock.dataAccessor)
.getChildNames(argThat(new PropertyKeyArgument(PropertyType.IDEALSTATES)));
// set ideal state
IdealState idealState = mock(IdealState.class);
when(idealState.isEnabled()).thenReturn(true);
when(idealState.isValid()).thenReturn(true);
when(idealState.getStateModelDefRef()).thenReturn("MasterSlave");
doReturn(idealState).when(mock.dataAccessor).getProperty(argThat(new PropertyKeyArgument(PropertyType.IDEALSTATES)));

// set external view
ExternalView externalView = mock(ExternalView.class);
when(externalView.getMinActiveReplicas()).thenReturn(2);
when(externalView.getStateModelDefRef()).thenReturn("MasterSlave");
when(externalView.getPartitionSet()).thenReturn(ImmutableSet.of("db0"));
when(externalView.getStateMap("db0")).thenReturn(ImmutableMap.of(TEST_INSTANCE, "Master",
"instance1", "Slave", "instance2", "Slave", "instance3", "Slave"));
doReturn(externalView).when(mock.dataAccessor)
.getProperty(argThat(new PropertyKeyArgument(PropertyType.EXTERNALVIEW)));
StateModelDefinition stateModelDefinition = mock(StateModelDefinition.class);
when(stateModelDefinition.getInitialState()).thenReturn("OFFLINE");
doReturn(stateModelDefinition).when(mock.dataAccessor)
.getProperty(argThat(new PropertyKeyArgument(PropertyType.STATEMODELDEFS)));

Set<String> toBeStoppedInstances = new HashSet<>();
toBeStoppedInstances.add("instance3");
toBeStoppedInstances.add("invalidInstances"); // include an invalid instance.
boolean result =
InstanceValidationUtil.siblingNodesActiveReplicaCheck(mock.dataAccessor, TEST_INSTANCE, toBeStoppedInstances);
Assert.assertTrue(result);

result =
InstanceValidationUtil.siblingNodesActiveReplicaCheck(mock.dataAccessor, TEST_INSTANCE, null);
Assert.assertTrue(result);
}

@Test
public void TestSiblingNodesActiveReplicaCheckFailsWithToBeStoppedInstances() {
String resource = "resource";
Mock mock = new Mock();
doReturn(ImmutableList.of(resource)).when(mock.dataAccessor)
.getChildNames(argThat(new PropertyKeyArgument(PropertyType.IDEALSTATES)));
// set ideal state
IdealState idealState = mock(IdealState.class);
when(idealState.isEnabled()).thenReturn(true);
when(idealState.isValid()).thenReturn(true);
when(idealState.getStateModelDefRef()).thenReturn("MasterSlave");
doReturn(idealState).when(mock.dataAccessor).getProperty(argThat(new PropertyKeyArgument(PropertyType.IDEALSTATES)));

// set external view
ExternalView externalView = mock(ExternalView.class);
when(externalView.getMinActiveReplicas()).thenReturn(2);
when(externalView.getStateModelDefRef()).thenReturn("MasterSlave");
when(externalView.getPartitionSet()).thenReturn(ImmutableSet.of("db0"));
when(externalView.getStateMap("db0")).thenReturn(ImmutableMap.of(TEST_INSTANCE, "Master",
"instance1", "Slave", "instance2", "Slave", "instance3", "Slave"));
doReturn(externalView).when(mock.dataAccessor)
.getProperty(argThat(new PropertyKeyArgument(PropertyType.EXTERNALVIEW)));
StateModelDefinition stateModelDefinition = mock(StateModelDefinition.class);
when(stateModelDefinition.getInitialState()).thenReturn("OFFLINE");
doReturn(stateModelDefinition).when(mock.dataAccessor)
.getProperty(argThat(new PropertyKeyArgument(PropertyType.STATEMODELDEFS)));

Set<String> toBeStoppedInstances = new HashSet<>();
toBeStoppedInstances.add("instance1");
toBeStoppedInstances.add("instance2");
boolean result =
InstanceValidationUtil.siblingNodesActiveReplicaCheck(mock.dataAccessor, TEST_INSTANCE, toBeStoppedInstances);

Assert.assertFalse(result);
}

@Test
public void TestSiblingNodesActiveReplicaCheck_fail() {
String resource = "resource";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,17 +339,23 @@ private List<OperationInterface> getAllOperationClasses(List<String> operations)
*/
public StoppableCheck getInstanceStoppableCheck(String clusterId, String instanceName,
String jsonContent) throws IOException {
return batchGetInstancesStoppableChecks(clusterId, ImmutableList.of(instanceName), jsonContent)
.get(instanceName);
return batchGetInstancesStoppableChecks(clusterId, ImmutableList.of(instanceName),
jsonContent).get(instanceName);
}


public Map<String, StoppableCheck> batchGetInstancesStoppableChecks(String clusterId,
List<String> instances, String jsonContent) throws IOException {
return batchGetInstancesStoppableChecks(clusterId, instances, jsonContent,
Collections.emptySet());
}

public Map<String, StoppableCheck> batchGetInstancesStoppableChecks(String clusterId,
List<String> instances, String jsonContent, Set<String> toBeStoppedInstances) throws IOException {
Map<String, StoppableCheck> finalStoppableChecks = new HashMap<>();
// helix instance check.
List<String> instancesForCustomInstanceLevelChecks =
batchHelixInstanceStoppableCheck(clusterId, instances, finalStoppableChecks);
batchHelixInstanceStoppableCheck(clusterId, instances, finalStoppableChecks,
toBeStoppedInstances);
// custom check, includes partition check.
batchCustomInstanceStoppableCheck(clusterId, instancesForCustomInstanceLevelChecks,
finalStoppableChecks, getMapFromJsonPayload(jsonContent));
Expand Down Expand Up @@ -441,10 +447,11 @@ private MaintenanceManagementInstanceInfo takeFreeSingleInstanceHelper(String cl
}

private List<String> batchHelixInstanceStoppableCheck(String clusterId,
Collection<String> instances, Map<String, StoppableCheck> finalStoppableChecks) {
Map<String, Future<StoppableCheck>> helixInstanceChecks = instances.stream().collect(Collectors
.toMap(Function.identity(),
instance -> POOL.submit(() -> performHelixOwnInstanceCheck(clusterId, instance))));
Collection<String> instances, Map<String, StoppableCheck> finalStoppableChecks,
Set<String> toBeStoppedInstances) {
Map<String, Future<StoppableCheck>> helixInstanceChecks = instances.stream().collect(
Collectors.toMap(Function.identity(), instance -> POOL.submit(
() -> performHelixOwnInstanceCheck(clusterId, instance, toBeStoppedInstances))));
// finalStoppableChecks contains instances that does not pass this health check
return filterInstancesForNextCheck(helixInstanceChecks, finalStoppableChecks);
}
Expand Down Expand Up @@ -512,7 +519,8 @@ private Map<String, MaintenanceManagementInstanceInfo> batchInstanceHealthCheck(
if (healthCheck.equals(HELIX_INSTANCE_STOPPABLE_CHECK)) {
// this is helix own check
instancesForNext =
batchHelixInstanceStoppableCheck(clusterId, instancesForNext, finalStoppableChecks);
batchHelixInstanceStoppableCheck(clusterId, instancesForNext, finalStoppableChecks,
Collections.emptySet());
} else if (healthCheck.equals(HELIX_CUSTOM_STOPPABLE_CHECK)) {
// custom check, includes custom Instance check and partition check.
instancesForNext =
Expand Down Expand Up @@ -601,10 +609,12 @@ private boolean isNonBlockingCheck(StoppableCheck stoppableCheck) {
return true;
}

private StoppableCheck performHelixOwnInstanceCheck(String clusterId, String instanceName) {
private StoppableCheck performHelixOwnInstanceCheck(String clusterId, String instanceName,
Set<String> toBeStoppedInstances) {
LOG.info("Perform helix own custom health checks for {}/{}", clusterId, instanceName);
Map<String, Boolean> helixStoppableCheck =
getInstanceHealthStatus(clusterId, instanceName, HealthCheck.STOPPABLE_CHECK_LIST);
getInstanceHealthStatus(clusterId, instanceName, HealthCheck.STOPPABLE_CHECK_LIST,
toBeStoppedInstances);

return new StoppableCheck(helixStoppableCheck, StoppableCheck.Category.HELIX_OWN_CHECK);
}
Expand Down Expand Up @@ -698,6 +708,12 @@ public static boolean getBooleanFromJsonPayload(String jsonString)
@VisibleForTesting
protected Map<String, Boolean> getInstanceHealthStatus(String clusterId, String instanceName,
List<HealthCheck> healthChecks) {
return getInstanceHealthStatus(clusterId, instanceName, healthChecks, Collections.emptySet());
}

@VisibleForTesting
protected Map<String, Boolean> getInstanceHealthStatus(String clusterId, String instanceName,
List<HealthCheck> healthChecks, Set<String> toBeStoppedInstances) {
Map<String, Boolean> healthStatus = new HashMap<>();
for (HealthCheck healthCheck : healthChecks) {
switch (healthCheck) {
Expand Down Expand Up @@ -745,7 +761,7 @@ protected Map<String, Boolean> getInstanceHealthStatus(String clusterId, String
break;
case MIN_ACTIVE_REPLICA_CHECK_FAILED:
healthStatus.put(HealthCheck.MIN_ACTIVE_REPLICA_CHECK_FAILED.name(),
InstanceValidationUtil.siblingNodesActiveReplicaCheck(_dataAccessor, instanceName));
InstanceValidationUtil.siblingNodesActiveReplicaCheck(_dataAccessor, instanceName, toBeStoppedInstances));
break;
default:
LOG.error("Unsupported health check: {}", healthCheck);
Expand Down
Loading

0 comments on commit 0b01378

Please sign in to comment.