Skip to content

Commit

Permalink
Implement the cross-zone-based stoppable check; Add to_be_stopped_ins…
Browse files Browse the repository at this point in the history
…tances parameter to the Stoppable API
  • Loading branch information
MarkGaox committed Oct 24, 2023
1 parent 9f40006 commit 56854ed
Show file tree
Hide file tree
Showing 9 changed files with 411 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,8 @@ public static boolean isInstanceStable(HelixDataAccessor dataAccessor, String in
* @param instanceName
* @return
*/
public static boolean siblingNodesActiveReplicaCheck(HelixDataAccessor dataAccessor, String instanceName) {
public static boolean siblingNodesActiveReplicaCheck(HelixDataAccessor dataAccessor,
String instanceName, Set<String> toBeStoppedInstances) {
PropertyKey.Builder propertyKeyBuilder = dataAccessor.keyBuilder();
List<String> resources = dataAccessor.getChildNames(propertyKeyBuilder.idealStates());

Expand Down Expand Up @@ -406,8 +407,8 @@ public static boolean siblingNodesActiveReplicaCheck(HelixDataAccessor dataAcces
if (stateByInstanceMap.containsKey(instanceName)) {
int numHealthySiblings = 0;
for (Map.Entry<String, String> entry : stateByInstanceMap.entrySet()) {
if (!entry.getKey().equals(instanceName)
&& !unhealthyStates.contains(entry.getValue())) {
if (!entry.getKey().equals(instanceName) && !toBeStoppedInstances.contains(
entry.getKey()) && !unhealthyStates.contains(entry.getValue())) {
numHealthySiblings++;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -396,11 +398,83 @@ public void TestSiblingNodesActiveReplicaCheck_success() {
.getProperty(argThat(new PropertyKeyArgument(PropertyType.STATEMODELDEFS)));

boolean result =
InstanceValidationUtil.siblingNodesActiveReplicaCheck(mock.dataAccessor, TEST_INSTANCE);
InstanceValidationUtil.siblingNodesActiveReplicaCheck(mock.dataAccessor, TEST_INSTANCE, Collections.emptySet());

Assert.assertTrue(result);
}

@Test
public void TestSiblingNodesActiveReplicaCheckSuccessWithToBeStoppedInstances() {
String resource = "resource";
Mock mock = new Mock();
doReturn(ImmutableList.of(resource)).when(mock.dataAccessor)
.getChildNames(argThat(new PropertyKeyArgument(PropertyType.IDEALSTATES)));
// set ideal state
IdealState idealState = mock(IdealState.class);
when(idealState.isEnabled()).thenReturn(true);
when(idealState.isValid()).thenReturn(true);
when(idealState.getStateModelDefRef()).thenReturn("MasterSlave");
doReturn(idealState).when(mock.dataAccessor).getProperty(argThat(new PropertyKeyArgument(PropertyType.IDEALSTATES)));

// set external view
ExternalView externalView = mock(ExternalView.class);
when(externalView.getMinActiveReplicas()).thenReturn(2);
when(externalView.getStateModelDefRef()).thenReturn("MasterSlave");
when(externalView.getPartitionSet()).thenReturn(ImmutableSet.of("db0"));
when(externalView.getStateMap("db0")).thenReturn(ImmutableMap.of(TEST_INSTANCE, "Master",
"instance1", "Slave", "instance2", "Slave", "instance3", "Slave"));
doReturn(externalView).when(mock.dataAccessor)
.getProperty(argThat(new PropertyKeyArgument(PropertyType.EXTERNALVIEW)));
StateModelDefinition stateModelDefinition = mock(StateModelDefinition.class);
when(stateModelDefinition.getInitialState()).thenReturn("OFFLINE");
doReturn(stateModelDefinition).when(mock.dataAccessor)
.getProperty(argThat(new PropertyKeyArgument(PropertyType.STATEMODELDEFS)));

Set<String> toBeStoppedInstances = new HashSet<>();
toBeStoppedInstances.add("instance3");
toBeStoppedInstances.add("invalidInstances"); // include an invalid instance.
boolean result =
InstanceValidationUtil.siblingNodesActiveReplicaCheck(mock.dataAccessor, TEST_INSTANCE, toBeStoppedInstances);

Assert.assertTrue(result);
}

@Test
public void TestSiblingNodesActiveReplicaCheckFailsWithToBeStoppedInstances() {
String resource = "resource";
Mock mock = new Mock();
doReturn(ImmutableList.of(resource)).when(mock.dataAccessor)
.getChildNames(argThat(new PropertyKeyArgument(PropertyType.IDEALSTATES)));
// set ideal state
IdealState idealState = mock(IdealState.class);
when(idealState.isEnabled()).thenReturn(true);
when(idealState.isValid()).thenReturn(true);
when(idealState.getStateModelDefRef()).thenReturn("MasterSlave");
doReturn(idealState).when(mock.dataAccessor).getProperty(argThat(new PropertyKeyArgument(PropertyType.IDEALSTATES)));

// set external view
ExternalView externalView = mock(ExternalView.class);
when(externalView.getMinActiveReplicas()).thenReturn(2);
when(externalView.getStateModelDefRef()).thenReturn("MasterSlave");
when(externalView.getPartitionSet()).thenReturn(ImmutableSet.of("db0"));
when(externalView.getStateMap("db0")).thenReturn(ImmutableMap.of(TEST_INSTANCE, "Master",
"instance1", "Slave", "instance2", "Slave", "instance3", "Slave"));
doReturn(externalView).when(mock.dataAccessor)
.getProperty(argThat(new PropertyKeyArgument(PropertyType.EXTERNALVIEW)));
StateModelDefinition stateModelDefinition = mock(StateModelDefinition.class);
when(stateModelDefinition.getInitialState()).thenReturn("OFFLINE");
doReturn(stateModelDefinition).when(mock.dataAccessor)
.getProperty(argThat(new PropertyKeyArgument(PropertyType.STATEMODELDEFS)));

Set<String> toBeStoppedInstances = new HashSet<>();
toBeStoppedInstances.add("instance1");
toBeStoppedInstances.add("instance2");
boolean result =
InstanceValidationUtil.siblingNodesActiveReplicaCheck(mock.dataAccessor, TEST_INSTANCE, toBeStoppedInstances);

Assert.assertFalse(result);
}

@Test
public void TestSiblingNodesActiveReplicaCheck_fail() {
String resource = "resource";
Expand Down Expand Up @@ -428,7 +502,7 @@ public void TestSiblingNodesActiveReplicaCheck_fail() {
.getProperty(argThat(new PropertyKeyArgument(PropertyType.STATEMODELDEFS)));

boolean result =
InstanceValidationUtil.siblingNodesActiveReplicaCheck(mock.dataAccessor, TEST_INSTANCE);
InstanceValidationUtil.siblingNodesActiveReplicaCheck(mock.dataAccessor, TEST_INSTANCE, Collections.emptySet());

Assert.assertFalse(result);
}
Expand All @@ -452,7 +526,7 @@ public void TestSiblingNodesActiveReplicaCheck_whenNoMinActiveReplica() {
doReturn(externalView).when(mock.dataAccessor)
.getProperty(argThat(new PropertyKeyArgument(PropertyType.EXTERNALVIEW)));

boolean result = InstanceValidationUtil.siblingNodesActiveReplicaCheck(mock.dataAccessor, TEST_INSTANCE);
boolean result = InstanceValidationUtil.siblingNodesActiveReplicaCheck(mock.dataAccessor, TEST_INSTANCE, Collections.emptySet());
Assert.assertTrue(result);
}

Expand All @@ -472,7 +546,7 @@ public void TestSiblingNodesActiveReplicaCheck_exception_whenExternalViewUnavail
doReturn(null).when(mock.dataAccessor)
.getProperty(argThat(new PropertyKeyArgument(PropertyType.EXTERNALVIEW)));

InstanceValidationUtil.siblingNodesActiveReplicaCheck(mock.dataAccessor, TEST_INSTANCE);
InstanceValidationUtil.siblingNodesActiveReplicaCheck(mock.dataAccessor, TEST_INSTANCE, Collections.emptySet());
}

private class Mock {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ public InstanceInfo getInstanceHealthInfo(String clusterId, String instanceName,
}
try {
Map<String, Boolean> healthStatus =
getInstanceHealthStatus(clusterId, instanceName, healthChecks);
getInstanceHealthStatus(clusterId, instanceName, healthChecks, Collections.emptySet());
instanceInfoBuilder.healthStatus(healthStatus);
} catch (HelixException ex) {
LOG.error(
Expand Down Expand Up @@ -328,7 +328,7 @@ private List<OperationInterface> getAllOperationClasses(List<String> operations)
/**
* {@inheritDoc}
* Single instance stoppable check implementation is a special case of
* {@link #batchGetInstancesStoppableChecks(String, List, String)}
* {@link #batchGetInstancesStoppableChecks(String, List, String, Set)}
* <p>
* Step 1: Perform instance level Helix own health checks
* Step 2: Perform instance level client side health checks
Expand All @@ -339,17 +339,17 @@ private List<OperationInterface> getAllOperationClasses(List<String> operations)
*/
public StoppableCheck getInstanceStoppableCheck(String clusterId, String instanceName,
String jsonContent) throws IOException {
return batchGetInstancesStoppableChecks(clusterId, ImmutableList.of(instanceName), jsonContent)
return batchGetInstancesStoppableChecks(clusterId, ImmutableList.of(instanceName), jsonContent, Collections.emptySet())
.get(instanceName);
}


public Map<String, StoppableCheck> batchGetInstancesStoppableChecks(String clusterId,
List<String> instances, String jsonContent) throws IOException {
List<String> instances, String jsonContent, Set<String> toBeStoppedInstances) throws IOException {
Map<String, StoppableCheck> finalStoppableChecks = new HashMap<>();
// helix instance check.
List<String> instancesForCustomInstanceLevelChecks =
batchHelixInstanceStoppableCheck(clusterId, instances, finalStoppableChecks);
batchHelixInstanceStoppableCheck(clusterId, instances, finalStoppableChecks, toBeStoppedInstances);
// custom check, includes partition check.
batchCustomInstanceStoppableCheck(clusterId, instancesForCustomInstanceLevelChecks,
finalStoppableChecks, getMapFromJsonPayload(jsonContent));
Expand Down Expand Up @@ -441,10 +441,10 @@ private MaintenanceManagementInstanceInfo takeFreeSingleInstanceHelper(String cl
}

private List<String> batchHelixInstanceStoppableCheck(String clusterId,
Collection<String> instances, Map<String, StoppableCheck> finalStoppableChecks) {
Collection<String> instances, Map<String, StoppableCheck> finalStoppableChecks, Set<String> toBeStoppedInstances) {
Map<String, Future<StoppableCheck>> helixInstanceChecks = instances.stream().collect(Collectors
.toMap(Function.identity(),
instance -> POOL.submit(() -> performHelixOwnInstanceCheck(clusterId, instance))));
instance -> POOL.submit(() -> performHelixOwnInstanceCheck(clusterId, instance, toBeStoppedInstances))));
// finalStoppableChecks contains instances that does not pass this health check
return filterInstancesForNextCheck(helixInstanceChecks, finalStoppableChecks);
}
Expand Down Expand Up @@ -512,7 +512,7 @@ private Map<String, MaintenanceManagementInstanceInfo> batchInstanceHealthCheck(
if (healthCheck.equals(HELIX_INSTANCE_STOPPABLE_CHECK)) {
// this is helix own check
instancesForNext =
batchHelixInstanceStoppableCheck(clusterId, instancesForNext, finalStoppableChecks);
batchHelixInstanceStoppableCheck(clusterId, instancesForNext, finalStoppableChecks, Collections.emptySet());
} else if (healthCheck.equals(HELIX_CUSTOM_STOPPABLE_CHECK)) {
// custom check, includes custom Instance check and partition check.
instancesForNext =
Expand Down Expand Up @@ -601,10 +601,10 @@ private boolean isNonBlockingCheck(StoppableCheck stoppableCheck) {
return true;
}

private StoppableCheck performHelixOwnInstanceCheck(String clusterId, String instanceName) {
private StoppableCheck performHelixOwnInstanceCheck(String clusterId, String instanceName, Set<String> toBeStoppedInstances) {
LOG.info("Perform helix own custom health checks for {}/{}", clusterId, instanceName);
Map<String, Boolean> helixStoppableCheck =
getInstanceHealthStatus(clusterId, instanceName, HealthCheck.STOPPABLE_CHECK_LIST);
getInstanceHealthStatus(clusterId, instanceName, HealthCheck.STOPPABLE_CHECK_LIST, toBeStoppedInstances);

return new StoppableCheck(helixStoppableCheck, StoppableCheck.Category.HELIX_OWN_CHECK);
}
Expand Down Expand Up @@ -697,7 +697,7 @@ public static boolean getBooleanFromJsonPayload(String jsonString)

@VisibleForTesting
protected Map<String, Boolean> getInstanceHealthStatus(String clusterId, String instanceName,
List<HealthCheck> healthChecks) {
List<HealthCheck> healthChecks, Set<String> toBeStoppedInstances) {
Map<String, Boolean> healthStatus = new HashMap<>();
for (HealthCheck healthCheck : healthChecks) {
switch (healthCheck) {
Expand Down Expand Up @@ -745,7 +745,7 @@ protected Map<String, Boolean> getInstanceHealthStatus(String clusterId, String
break;
case MIN_ACTIVE_REPLICA_CHECK_FAILED:
healthStatus.put(HealthCheck.MIN_ACTIVE_REPLICA_CHECK_FAILED.name(),
InstanceValidationUtil.siblingNodesActiveReplicaCheck(_dataAccessor, instanceName));
InstanceValidationUtil.siblingNodesActiveReplicaCheck(_dataAccessor, instanceName, toBeStoppedInstances));
break;
default:
LOG.error("Unsupported health check: {}", healthCheck);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
Expand All @@ -33,7 +34,6 @@
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.helix.rest.server.json.cluster.ClusterTopology;
import org.apache.helix.rest.server.json.instance.StoppableCheck;

Expand Down Expand Up @@ -69,14 +69,47 @@ public StoppableInstancesSelector(String clusterId, List<String> orderOfZone,
* reasons for non-stoppability.
*
* @param instances A list of instance to be evaluated.
* @param toBeStoppedInstances A list of instances presumed to be are already stopped
* @throws IOException
*/
public void getStoppableInstancesInSingleZone(List<String> instances) throws IOException {
public void getStoppableInstancesInSingleZone(List<String> instances,
Set<String> toBeStoppedInstances) throws IOException {
List<String> zoneBasedInstance =
getZoneBasedInstances(instances, _clusterTopology.toZoneMapping());
getStoppableInstances(zoneBasedInstance, toBeStoppedInstances);
processNonexistentInstances(instances);
}

/**
* Evaluates and collects stoppable instances cross a set of zones based on the order of zones.
* The method iterates through instances, performing stoppable checks, and records reasons for
* non-stoppability.
*
* @param instances A list of instance to be evaluated.
* @param toBeStoppedInstances A list of instances presumed to be are already stopped
* @throws IOException
*/
public void getStoppableInstancesCrossZones(List<String> instances,
Set<String> toBeStoppedInstances) throws IOException {
Map<String, Set<String>> zoneMapping = _clusterTopology.toZoneMapping();
for (String zone : _orderOfZone) {
Set<String> instanceSet = new HashSet<>(instances);
Set<String> currentZoneInstanceSet = new HashSet<>(zoneMapping.get(zone));
instanceSet.retainAll(currentZoneInstanceSet);
if (instanceSet.isEmpty()) {
continue;
}
getStoppableInstances(new ArrayList<>(instanceSet), toBeStoppedInstances);
}
processNonexistentInstances(instances);
}

private void getStoppableInstances(List<String> instances, Set<String> toBeStoppedInstances)
throws IOException {
Map<String, StoppableCheck> instancesStoppableChecks =
_maintenanceService.batchGetInstancesStoppableChecks(_clusterId, zoneBasedInstance,
_customizedInput);
_maintenanceService.batchGetInstancesStoppableChecks(_clusterId, instances,
_customizedInput, toBeStoppedInstances);

for (Map.Entry<String, StoppableCheck> instanceStoppableCheck : instancesStoppableChecks.entrySet()) {
String instance = instanceStoppableCheck.getKey();
StoppableCheck stoppableCheck = instanceStoppableCheck.getValue();
Expand All @@ -87,8 +120,14 @@ public void getStoppableInstancesInSingleZone(List<String> instances) throws IOE
}
} else {
_stoppableInstances.add(instance);
// Update the toBeStoppedInstances set with the currently identified stoppable instance.
// This ensures that subsequent checks in other zones are aware of this instance's stoppable status.
toBeStoppedInstances.add(instance);
}
}
}

private void processNonexistentInstances(List<String> instances) {
// Adding following logic to check whether instances exist or not. An instance exist could be
// checking following scenario:
// 1. Instance got dropped. (InstanceConfig is gone.)
Expand All @@ -105,11 +144,6 @@ public void getStoppableInstancesInSingleZone(List<String> instances) throws IOE
}
}

public void getStoppableInstancesCrossZones() {
// TODO: Add implementation to enable cross zone stoppable check.
throw new NotImplementedException("Not Implemented");
}

/**
* Determines the order of zones. If an order is provided by the user, it will be used directly.
* Otherwise, zones will be ordered by their associated instance count in descending order.
Expand All @@ -118,10 +152,22 @@ public void getStoppableInstancesCrossZones() {
*
* @param random Indicates whether to randomize the order of zones.
*/
public void calculateOrderOfZone(boolean random) {
public void calculateOrderOfZone(List<String> instances, boolean random) {
if (_orderOfZone == null) {
_orderOfZone =
new ArrayList<>(getOrderedZoneToInstancesMap(_clusterTopology.toZoneMapping()).keySet());
Map<String, Set<String>> zoneMapping = _clusterTopology.toZoneMapping();
Map<String, Set<String>> zoneToInstancesMap = new HashMap<>();
for (ClusterTopology.Zone zone : _clusterTopology.getZones()) {
Set<String> instanceSet = new HashSet<>(instances);
// TODO: Use instance config from Helix-rest Cache to get the zone instead of reading the topology info
Set<String> currentZoneInstanceSet = new HashSet<>(zoneMapping.get(zone.getId()));
instanceSet.retainAll(currentZoneInstanceSet);
if (instanceSet.isEmpty()) {
continue;
}
zoneToInstancesMap.put(zone.getId(), instanceSet);
}

_orderOfZone = new ArrayList<>(getOrderedZoneToInstancesMap(zoneToInstancesMap).keySet());
}

if (_orderOfZone.isEmpty()) {
Expand Down
Loading

0 comments on commit 56854ed

Please sign in to comment.