Skip to content

Commit

Permalink
Enhanced stoppable checks with node evacuation filtering and introduc…
Browse files Browse the repository at this point in the history
…ed blacklisting capabilities (#2687)

Enhanced stoppable checks with node evacuation filtering and introduced blacklisting capabilities
  • Loading branch information
MarkGaox authored and Xiaoyuan Lu committed Dec 13, 2023
1 parent ac85195 commit 595f1cc
Show file tree
Hide file tree
Showing 7 changed files with 291 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ public static Map<String, List<String>> perPartitionHealthCheck(List<ExternalVie
if (stateMap.containsKey(instanceToBeStop)
&& stateMap.get(instanceToBeStop).equals(stateModelDefinition.getTopState())) {
for (String siblingInstance : stateMap.keySet()) {
// Skip this self check
// Skip this self check and instances we assume to be already stopped
if (siblingInstance.equals(instanceToBeStop) || (toBeStoppedInstances != null
&& toBeStoppedInstances.contains(siblingInstance))) {
continue;
Expand Down Expand Up @@ -451,9 +451,10 @@ public static boolean siblingNodesActiveReplicaCheck(HelixDataAccessor dataAcces
if (stateByInstanceMap.containsKey(instanceName)) {
int numHealthySiblings = 0;
for (Map.Entry<String, String> entry : stateByInstanceMap.entrySet()) {
if (!entry.getKey().equals(instanceName) && (toBeStoppedInstances == null
|| !toBeStoppedInstances.contains(entry.getKey())) && !unhealthyStates.contains(
entry.getValue())) {
String siblingInstanceName = entry.getKey();
if (!siblingInstanceName.equals(instanceName) && (toBeStoppedInstances == null
|| !toBeStoppedInstances.contains(siblingInstanceName))
&& !unhealthyStates.contains(entry.getValue())) {
numHealthySiblings++;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ public void TestSiblingNodesActiveReplicaCheck_success() {
String resource = "resource";
Mock mock = new Mock();
doReturn(ImmutableList.of(resource)).when(mock.dataAccessor)
.getChildNames(argThat(new PropertyKeyArgument(PropertyType.EXTERNALVIEW)));
.getChildNames(argThat(new PropertyKeyArgument(PropertyType.IDEALSTATES)));
// set ideal state
IdealState idealState = mock(IdealState.class);
when(idealState.isEnabled()).thenReturn(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ public class MaintenanceManagementService {
private final HelixDataAccessorWrapper _dataAccessor;
private final Set<String> _nonBlockingHealthChecks;
private final Set<StoppableCheck.Category> _skipHealthCheckCategories;
// Set the default value of _skipStoppableHealthCheckList to be an empty list to
// maintain the backward compatibility with users who don't use MaintenanceManagementServiceBuilder
// to create the MaintenanceManagementService object.
private List<HealthCheck> _skipStoppableHealthCheckList = Collections.emptyList();

public MaintenanceManagementService(ZKHelixDataAccessor dataAccessor,
ConfigAccessor configAccessor, boolean skipZKRead, String namespace) {
Expand Down Expand Up @@ -144,6 +148,25 @@ public MaintenanceManagementService(ZKHelixDataAccessor dataAccessor,
_namespace = namespace;
}

private MaintenanceManagementService(ZKHelixDataAccessor dataAccessor,
ConfigAccessor configAccessor, CustomRestClient customRestClient, boolean skipZKRead,
Set<String> nonBlockingHealthChecks, Set<StoppableCheck.Category> skipHealthCheckCategories,
List<HealthCheck> skipStoppableHealthCheckList, String namespace) {
_dataAccessor =
new HelixDataAccessorWrapper(dataAccessor, customRestClient,
namespace);
_configAccessor = configAccessor;
_customRestClient = customRestClient;
_skipZKRead = skipZKRead;
_nonBlockingHealthChecks =
nonBlockingHealthChecks == null ? Collections.emptySet() : nonBlockingHealthChecks;
_skipHealthCheckCategories =
skipHealthCheckCategories == null ? Collections.emptySet() : skipHealthCheckCategories;
_skipStoppableHealthCheckList = skipStoppableHealthCheckList == null ? Collections.emptyList()
: skipStoppableHealthCheckList;
_namespace = namespace;
}

/**
* Perform health check and maintenance operation check and execution for a instance in
* one cluster.
Expand Down Expand Up @@ -463,7 +486,10 @@ private List<String> batchCustomInstanceStoppableCheck(String clusterId, List<St
return instances;
}
RESTConfig restConfig = _configAccessor.getRESTConfig(clusterId);
if (restConfig == null) {
if (restConfig == null && (
!_skipHealthCheckCategories.contains(StoppableCheck.Category.CUSTOM_INSTANCE_CHECK)
|| !_skipHealthCheckCategories.contains(
StoppableCheck.Category.CUSTOM_PARTITION_CHECK))) {
String errorMessage = String.format(
"The cluster %s hasn't enabled client side health checks yet, "
+ "thus the stoppable check result is inaccurate", clusterId);
Expand Down Expand Up @@ -612,8 +638,10 @@ private boolean isNonBlockingCheck(StoppableCheck stoppableCheck) {
private StoppableCheck performHelixOwnInstanceCheck(String clusterId, String instanceName,
Set<String> toBeStoppedInstances) {
LOG.info("Perform helix own custom health checks for {}/{}", clusterId, instanceName);
List<HealthCheck> healthChecksToExecute = new ArrayList<>(HealthCheck.STOPPABLE_CHECK_LIST);
healthChecksToExecute.removeAll(_skipStoppableHealthCheckList);
Map<String, Boolean> helixStoppableCheck =
getInstanceHealthStatus(clusterId, instanceName, HealthCheck.STOPPABLE_CHECK_LIST,
getInstanceHealthStatus(clusterId, instanceName, healthChecksToExecute,
toBeStoppedInstances);

return new StoppableCheck(helixStoppableCheck, StoppableCheck.Category.HELIX_OWN_CHECK);
Expand Down Expand Up @@ -771,4 +799,87 @@ protected Map<String, Boolean> getInstanceHealthStatus(String clusterId, String

return healthStatus;
}

public static class MaintenanceManagementServiceBuilder {
private ConfigAccessor _configAccessor;
private boolean _skipZKRead;
private String _namespace;
private ZKHelixDataAccessor _dataAccessor;
private CustomRestClient _customRestClient;
private Set<String> _nonBlockingHealthChecks;
private Set<StoppableCheck.Category> _skipHealthCheckCategories = Collections.emptySet();
private List<HealthCheck> _skipStoppableHealthCheckList = Collections.emptyList();

public MaintenanceManagementServiceBuilder setConfigAccessor(ConfigAccessor configAccessor) {
_configAccessor = configAccessor;
return this;
}

public MaintenanceManagementServiceBuilder setSkipZKRead(boolean skipZKRead) {
_skipZKRead = skipZKRead;
return this;
}

public MaintenanceManagementServiceBuilder setNamespace(String namespace) {
_namespace = namespace;
return this;
}

public MaintenanceManagementServiceBuilder setDataAccessor(
ZKHelixDataAccessor dataAccessor) {
_dataAccessor = dataAccessor;
return this;
}

public MaintenanceManagementServiceBuilder setCustomRestClient(
CustomRestClient customRestClient) {
_customRestClient = customRestClient;
return this;
}

public MaintenanceManagementServiceBuilder setNonBlockingHealthChecks(
Set<String> nonBlockingHealthChecks) {
_nonBlockingHealthChecks = nonBlockingHealthChecks;
return this;
}

public MaintenanceManagementServiceBuilder setSkipHealthCheckCategories(
Set<StoppableCheck.Category> skipHealthCheckCategories) {
_skipHealthCheckCategories = skipHealthCheckCategories;
return this;
}

public MaintenanceManagementServiceBuilder setSkipStoppableHealthCheckList(
List<HealthCheck> skipStoppableHealthCheckList) {
_skipStoppableHealthCheckList = skipStoppableHealthCheckList;
return this;
}

public MaintenanceManagementService build() {
validate();
return new MaintenanceManagementService(_dataAccessor, _configAccessor, _customRestClient,
_skipZKRead, _nonBlockingHealthChecks, _skipHealthCheckCategories,
_skipStoppableHealthCheckList, _namespace);
}

private void validate() throws IllegalArgumentException {
List<String> msg = new ArrayList<>();
if (_configAccessor == null) {
msg.add("'configAccessor' can't be null.");
}
if (_namespace == null) {
msg.add("'namespace' can't be null.");
}
if (_dataAccessor == null) {
msg.add("'_dataAccessor' can't be null.");
}
if (_customRestClient == null) {
msg.add("'customRestClient' can't be null.");
}
if (msg.size() != 0) {
throw new IllegalArgumentException(
"One or more mandatory arguments are not set " + msg);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.helix.PropertyKey;
import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.rest.server.json.cluster.ClusterTopology;
import org.apache.helix.rest.server.json.instance.StoppableCheck;
import org.apache.helix.rest.server.resources.helix.InstancesAccessor;
Expand All @@ -48,15 +52,17 @@ public class StoppableInstancesSelector {
private final String _customizedInput;
private final MaintenanceManagementService _maintenanceService;
private final ClusterTopology _clusterTopology;
private final ZKHelixDataAccessor _dataAccessor;

public StoppableInstancesSelector(String clusterId, List<String> orderOfZone,
private StoppableInstancesSelector(String clusterId, List<String> orderOfZone,
String customizedInput, MaintenanceManagementService maintenanceService,
ClusterTopology clusterTopology) {
ClusterTopology clusterTopology, ZKHelixDataAccessor dataAccessor) {
_clusterId = clusterId;
_orderOfZone = orderOfZone;
_customizedInput = customizedInput;
_maintenanceService = maintenanceService;
_clusterTopology = clusterTopology;
_dataAccessor = dataAccessor;
}

/**
Expand All @@ -66,7 +72,7 @@ public StoppableInstancesSelector(String clusterId, List<String> orderOfZone,
* reasons for non-stoppability.
*
* @param instances A list of instance to be evaluated.
* @param toBeStoppedInstances A list of instances presumed to be are already stopped
* @param toBeStoppedInstances A list of instances presumed to be already stopped
* @return An ObjectNode containing:
* - 'stoppableNode': List of instances that can be stopped.
* - 'instance_not_stoppable_with_reasons': A map with the instance name as the key and
Expand All @@ -81,6 +87,7 @@ public ObjectNode getStoppableInstancesInSingleZone(List<String> instances,
ObjectNode failedStoppableInstances = result.putObject(
InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name());
Set<String> toBeStoppedInstancesSet = new HashSet<>(toBeStoppedInstances);
collectEvacuatingInstances(toBeStoppedInstancesSet);

List<String> zoneBasedInstance =
getZoneBasedInstances(instances, _clusterTopology.toZoneMapping());
Expand All @@ -97,7 +104,7 @@ public ObjectNode getStoppableInstancesInSingleZone(List<String> instances,
* non-stoppability.
*
* @param instances A list of instance to be evaluated.
* @param toBeStoppedInstances A list of instances presumed to be are already stopped
* @param toBeStoppedInstances A list of instances presumed to be already stopped
* @return An ObjectNode containing:
* - 'stoppableNode': List of instances that can be stopped.
* - 'instance_not_stoppable_with_reasons': A map with the instance name as the key and
Expand All @@ -112,6 +119,7 @@ public ObjectNode getStoppableInstancesCrossZones(List<String> instances,
ObjectNode failedStoppableInstances = result.putObject(
InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name());
Set<String> toBeStoppedInstancesSet = new HashSet<>(toBeStoppedInstances);
collectEvacuatingInstances(toBeStoppedInstancesSet);

Map<String, Set<String>> zoneMapping = _clusterTopology.toZoneMapping();
for (String zone : _orderOfZone) {
Expand Down Expand Up @@ -249,12 +257,31 @@ private Map<String, Set<String>> getOrderedZoneToInstancesMap(
(existing, replacement) -> existing, LinkedHashMap::new));
}

/**
* Collect instances marked for evacuation in the current topology and add them into the given set
*
* @param toBeStoppedInstances A set of instances we presume to be stopped.
*/
private void collectEvacuatingInstances(Set<String> toBeStoppedInstances) {
Set<String> allInstances = _clusterTopology.getAllInstances();
for (String instance : allInstances) {
PropertyKey.Builder propertyKeyBuilder = _dataAccessor.keyBuilder();
InstanceConfig instanceConfig =
_dataAccessor.getProperty(propertyKeyBuilder.instanceConfig(instance));
if (InstanceConstants.InstanceOperation.EVACUATE.name()
.equals(instanceConfig.getInstanceOperation())) {
toBeStoppedInstances.add(instance);
}
}
}

public static class StoppableInstancesSelectorBuilder {
private String _clusterId;
private List<String> _orderOfZone;
private String _customizedInput;
private MaintenanceManagementService _maintenanceService;
private ClusterTopology _clusterTopology;
private ZKHelixDataAccessor _dataAccessor;

public StoppableInstancesSelectorBuilder setClusterId(String clusterId) {
_clusterId = clusterId;
Expand Down Expand Up @@ -282,9 +309,14 @@ public StoppableInstancesSelectorBuilder setClusterTopology(ClusterTopology clus
return this;
}

public StoppableInstancesSelectorBuilder setDataAccessor(ZKHelixDataAccessor dataAccessor) {
_dataAccessor = dataAccessor;
return this;
}

public StoppableInstancesSelector build() {
return new StoppableInstancesSelector(_clusterId, _orderOfZone, _customizedInput,
_maintenanceService, _clusterTopology);
_maintenanceService, _clusterTopology, _dataAccessor);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@
*/

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
Expand All @@ -46,6 +46,8 @@
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.rest.client.CustomRestClientFactory;
import org.apache.helix.rest.clusterMaintenanceService.HealthCheck;
import org.apache.helix.rest.clusterMaintenanceService.MaintenanceManagementService;
import org.apache.helix.rest.common.HttpConstants;
import org.apache.helix.rest.clusterMaintenanceService.StoppableInstancesSelector;
Expand All @@ -59,17 +61,21 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.helix.rest.clusterMaintenanceService.MaintenanceManagementService.ALL_HEALTH_CHECK_NONBLOCK;

@ClusterAuth
@Path("/clusters/{clusterId}/instances")
public class InstancesAccessor extends AbstractHelixResource {
private final static Logger _logger = LoggerFactory.getLogger(InstancesAccessor.class);

public enum InstancesProperties {
instances,
online,
disabled,
selection_base,
zone_order,
to_be_stopped_instances,
skip_stoppable_check_list,
customized_values,
instance_stoppable_parallel,
instance_not_stoppable_with_reasons
Expand Down Expand Up @@ -228,6 +234,9 @@ private Response batchGetStoppableInstances(String clusterId, JsonNode node, boo
List<String> orderOfZone = null;
String customizedInput = null;
List<String> toBeStoppedInstances = Collections.emptyList();
// By default, if skip_stoppable_check_list is unset, all checks are performed to maintain
// backward compatibility with existing clients.
List<HealthCheck> skipStoppableCheckList = Collections.emptyList();
if (node.get(InstancesAccessor.InstancesProperties.customized_values.name()) != null) {
customizedInput =
node.get(InstancesAccessor.InstancesProperties.customized_values.name()).toString();
Expand Down Expand Up @@ -260,10 +269,36 @@ private Response batchGetStoppableInstances(String clusterId, JsonNode node, boo
}
}

if (node.get(InstancesProperties.skip_stoppable_check_list.name()) != null) {
List<String> list = OBJECT_MAPPER.readValue(
node.get(InstancesProperties.skip_stoppable_check_list.name()).toString(),
OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, String.class));
try {
skipStoppableCheckList =
list.stream().map(HealthCheck::valueOf).collect(Collectors.toList());
} catch (IllegalArgumentException e) {
String message =
"'skip_stoppable_check_list' has invalid check names: " + list
+ ". Supported checks: " + HealthCheck.STOPPABLE_CHECK_LIST;
_logger.error(message, e);
return badRequest(message);
}
}

String namespace = getNamespace();
MaintenanceManagementService maintenanceService =
new MaintenanceManagementService((ZKHelixDataAccessor) getDataAccssor(clusterId),
getConfigAccessor(), skipZKRead, continueOnFailures, skipHealthCheckCategories,
getNamespace());
new MaintenanceManagementService.MaintenanceManagementServiceBuilder()
.setDataAccessor((ZKHelixDataAccessor) getDataAccssor(clusterId))
.setConfigAccessor(getConfigAccessor())
.setSkipZKRead(skipZKRead)
.setNonBlockingHealthChecks(
continueOnFailures ? Collections.singleton(ALL_HEALTH_CHECK_NONBLOCK) : null)
.setCustomRestClient(CustomRestClientFactory.get())
.setSkipHealthCheckCategories(skipHealthCheckCategories)
.setNamespace(namespace)
.setSkipStoppableHealthCheckList(skipStoppableCheckList)
.build();

ClusterService clusterService =
new ClusterServiceImpl(getDataAccssor(clusterId), getConfigAccessor());
ClusterTopology clusterTopology = clusterService.getClusterTopology(clusterId);
Expand All @@ -274,6 +309,7 @@ private Response batchGetStoppableInstances(String clusterId, JsonNode node, boo
.setCustomizedInput(customizedInput)
.setMaintenanceService(maintenanceService)
.setClusterTopology(clusterTopology)
.setDataAccessor((ZKHelixDataAccessor) getDataAccssor(clusterId))
.build();
stoppableInstancesSelector.calculateOrderOfZone(instances, random);
ObjectNode result;
Expand Down
Loading

0 comments on commit 595f1cc

Please sign in to comment.