diff --git a/helix-core/src/main/java/org/apache/helix/util/InstanceValidationUtil.java b/helix-core/src/main/java/org/apache/helix/util/InstanceValidationUtil.java index e34e9e6fb4..46db5d5a33 100644 --- a/helix-core/src/main/java/org/apache/helix/util/InstanceValidationUtil.java +++ b/helix-core/src/main/java/org/apache/helix/util/InstanceValidationUtil.java @@ -20,6 +20,7 @@ */ import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -254,6 +255,27 @@ public static boolean hasErrorPartitions(HelixDataAccessor dataAccessor, String public static Map> perPartitionHealthCheck(List externalViews, Map> globalPartitionHealthStatus, String instanceToBeStop, HelixDataAccessor dataAccessor) { + return perPartitionHealthCheck(externalViews, globalPartitionHealthStatus, instanceToBeStop, + dataAccessor, Collections.emptySet()); + } + + /** + * Get the problematic partitions on the to-be-stop instance + * Requirement: + * If the instance and the toBeStoppedInstances are stopped and the partitions on them are OFFLINE, + * the cluster still have enough "healthy" replicas on other sibling instances + * + * - sibling instances mean those who share the same partition (replicas) of the to-be-stop instance + * + * @param globalPartitionHealthStatus (instance => (partition name, health status)) + * @param instanceToBeStop The instance to be stopped + * @param dataAccessor The data accessor + * @param toBeStoppedInstances A set of instances presumed to be are already stopped + * @return A list of problematic partitions if the instance is stopped + */ + public static Map> perPartitionHealthCheck(List externalViews, + Map> globalPartitionHealthStatus, String instanceToBeStop, + HelixDataAccessor dataAccessor, Set toBeStoppedInstances) { Map> unhealthyPartitions = new HashMap<>(); for (ExternalView externalView : externalViews) { @@ -273,7 +295,8 @@ public static Map> perPartitionHealthCheck(List healthStatus = - getInstanceHealthStatus(clusterId, instanceName, healthChecks, Collections.emptySet()); + getInstanceHealthStatus(clusterId, instanceName, healthChecks); instanceInfoBuilder.healthStatus(healthStatus); } catch (HelixException ex) { LOG.error( @@ -328,7 +328,7 @@ private List getAllOperationClasses(List operations) /** * {@inheritDoc} * Single instance stoppable check implementation is a special case of - * {@link #batchGetInstancesStoppableChecks(String, List, String, Set)} + * {@link #batchGetInstancesStoppableChecks(String, List, String)} *

* Step 1: Perform instance level Helix own health checks * Step 2: Perform instance level client side health checks @@ -339,17 +339,23 @@ private List getAllOperationClasses(List operations) */ public StoppableCheck getInstanceStoppableCheck(String clusterId, String instanceName, String jsonContent) throws IOException { - return batchGetInstancesStoppableChecks(clusterId, ImmutableList.of(instanceName), jsonContent, Collections.emptySet()) - .get(instanceName); + return batchGetInstancesStoppableChecks(clusterId, ImmutableList.of(instanceName), jsonContent, + Collections.emptySet()).get(instanceName); } + public Map batchGetInstancesStoppableChecks(String clusterId, + List instances, String jsonContent) throws IOException { + return batchGetInstancesStoppableChecks(clusterId, instances, jsonContent, + Collections.emptySet()); + } public Map batchGetInstancesStoppableChecks(String clusterId, List instances, String jsonContent, Set toBeStoppedInstances) throws IOException { Map finalStoppableChecks = new HashMap<>(); // helix instance check. List instancesForCustomInstanceLevelChecks = - batchHelixInstanceStoppableCheck(clusterId, instances, finalStoppableChecks, toBeStoppedInstances); + batchHelixInstanceStoppableCheck(clusterId, instances, finalStoppableChecks, + toBeStoppedInstances); // custom check, includes partition check. batchCustomInstanceStoppableCheck(clusterId, instancesForCustomInstanceLevelChecks, finalStoppableChecks, getMapFromJsonPayload(jsonContent)); @@ -441,10 +447,11 @@ private MaintenanceManagementInstanceInfo takeFreeSingleInstanceHelper(String cl } private List batchHelixInstanceStoppableCheck(String clusterId, - Collection instances, Map finalStoppableChecks, Set toBeStoppedInstances) { - Map> helixInstanceChecks = instances.stream().collect(Collectors - .toMap(Function.identity(), - instance -> POOL.submit(() -> performHelixOwnInstanceCheck(clusterId, instance, toBeStoppedInstances)))); + Collection instances, Map finalStoppableChecks, + Set toBeStoppedInstances) { + Map> helixInstanceChecks = instances.stream().collect( + Collectors.toMap(Function.identity(), instance -> POOL.submit( + () -> performHelixOwnInstanceCheck(clusterId, instance, toBeStoppedInstances)))); // finalStoppableChecks contains instances that does not pass this health check return filterInstancesForNextCheck(helixInstanceChecks, finalStoppableChecks); } @@ -512,7 +519,8 @@ private Map batchInstanceHealthCheck( if (healthCheck.equals(HELIX_INSTANCE_STOPPABLE_CHECK)) { // this is helix own check instancesForNext = - batchHelixInstanceStoppableCheck(clusterId, instancesForNext, finalStoppableChecks, Collections.emptySet()); + batchHelixInstanceStoppableCheck(clusterId, instancesForNext, finalStoppableChecks, + Collections.emptySet()); } else if (healthCheck.equals(HELIX_CUSTOM_STOPPABLE_CHECK)) { // custom check, includes custom Instance check and partition check. instancesForNext = @@ -601,10 +609,12 @@ private boolean isNonBlockingCheck(StoppableCheck stoppableCheck) { return true; } - private StoppableCheck performHelixOwnInstanceCheck(String clusterId, String instanceName, Set toBeStoppedInstances) { + private StoppableCheck performHelixOwnInstanceCheck(String clusterId, String instanceName, + Set toBeStoppedInstances) { LOG.info("Perform helix own custom health checks for {}/{}", clusterId, instanceName); Map helixStoppableCheck = - getInstanceHealthStatus(clusterId, instanceName, HealthCheck.STOPPABLE_CHECK_LIST, toBeStoppedInstances); + getInstanceHealthStatus(clusterId, instanceName, HealthCheck.STOPPABLE_CHECK_LIST, + toBeStoppedInstances); return new StoppableCheck(helixStoppableCheck, StoppableCheck.Category.HELIX_OWN_CHECK); } @@ -695,6 +705,12 @@ public static boolean getBooleanFromJsonPayload(String jsonString) return OBJECT_MAPPER.readTree(jsonString).asBoolean(); } + @VisibleForTesting + protected Map getInstanceHealthStatus(String clusterId, String instanceName, + List healthChecks) { + return getInstanceHealthStatus(clusterId, instanceName, healthChecks, Collections.emptySet()); + } + @VisibleForTesting protected Map getInstanceHealthStatus(String clusterId, String instanceName, List healthChecks, Set toBeStoppedInstances) { diff --git a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java index 30f0a6bde4..8f63850f2e 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java @@ -36,6 +36,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.helix.rest.server.json.cluster.ClusterTopology; import org.apache.helix.rest.server.json.instance.StoppableCheck; +import org.apache.helix.rest.server.resources.helix.InstancesAccessor; public class StoppableInstancesSelector { // This type does not belong to real HealthCheck failed reason. Also, if we add this type @@ -45,19 +46,15 @@ public class StoppableInstancesSelector { private String _clusterId; private List _orderOfZone; private String _customizedInput; - private ArrayNode _stoppableInstances; - private ObjectNode _failedStoppableInstances; private MaintenanceManagementService _maintenanceService; private ClusterTopology _clusterTopology; public StoppableInstancesSelector(String clusterId, List orderOfZone, - String customizedInput, ArrayNode stoppableInstances, ObjectNode failedStoppableInstances, - MaintenanceManagementService maintenanceService, ClusterTopology clusterTopology) { + String customizedInput, MaintenanceManagementService maintenanceService, + ClusterTopology clusterTopology) { _clusterId = clusterId; _orderOfZone = orderOfZone; _customizedInput = customizedInput; - _stoppableInstances = stoppableInstances; - _failedStoppableInstances = failedStoppableInstances; _maintenanceService = maintenanceService; _clusterTopology = clusterTopology; } @@ -72,12 +69,22 @@ public StoppableInstancesSelector(String clusterId, List orderOfZone, * @param toBeStoppedInstances A list of instances presumed to be are already stopped * @throws IOException */ - public void getStoppableInstancesInSingleZone(List instances, - Set toBeStoppedInstances) throws IOException { + public ObjectNode getStoppableInstancesInSingleZone(List instances, + List toBeStoppedInstances) throws IOException { + ObjectNode result = JsonNodeFactory.instance.objectNode(); + ArrayNode stoppableInstances = + result.putArray(InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name()); + ObjectNode failedStoppableInstances = result.putObject( + InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name()); + Set toBeStoppedInstancesSet = new HashSet<>(toBeStoppedInstances); + List zoneBasedInstance = getZoneBasedInstances(instances, _clusterTopology.toZoneMapping()); - getStoppableInstances(zoneBasedInstance, toBeStoppedInstances); - processNonexistentInstances(instances); + getStoppableInstances(zoneBasedInstance, toBeStoppedInstancesSet, stoppableInstances, + failedStoppableInstances); + processNonexistentInstances(instances, failedStoppableInstances); + + return result; } /** @@ -89,8 +96,15 @@ public void getStoppableInstancesInSingleZone(List instances, * @param toBeStoppedInstances A list of instances presumed to be are already stopped * @throws IOException */ - public void getStoppableInstancesCrossZones(List instances, - Set toBeStoppedInstances) throws IOException { + public ObjectNode getStoppableInstancesCrossZones(List instances, + List toBeStoppedInstances) throws IOException { + ObjectNode result = JsonNodeFactory.instance.objectNode(); + ArrayNode stoppableInstances = + result.putArray(InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name()); + ObjectNode failedStoppableInstances = result.putObject( + InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name()); + Set toBeStoppedInstancesSet = new HashSet<>(toBeStoppedInstances); + Map> zoneMapping = _clusterTopology.toZoneMapping(); for (String zone : _orderOfZone) { Set instanceSet = new HashSet<>(instances); @@ -99,13 +113,15 @@ public void getStoppableInstancesCrossZones(List instances, if (instanceSet.isEmpty()) { continue; } - getStoppableInstances(new ArrayList<>(instanceSet), toBeStoppedInstances); + getStoppableInstances(new ArrayList<>(instanceSet), toBeStoppedInstancesSet, stoppableInstances, + failedStoppableInstances); } - processNonexistentInstances(instances); + processNonexistentInstances(instances, failedStoppableInstances); + return result; } - private void getStoppableInstances(List instances, Set toBeStoppedInstances) - throws IOException { + private void getStoppableInstances(List instances, Set toBeStoppedInstances, + ArrayNode stoppableInstances, ObjectNode failedStoppableInstances) throws IOException { Map instancesStoppableChecks = _maintenanceService.batchGetInstancesStoppableChecks(_clusterId, instances, _customizedInput, toBeStoppedInstances); @@ -114,12 +130,12 @@ private void getStoppableInstances(List instances, Set toBeStopp String instance = instanceStoppableCheck.getKey(); StoppableCheck stoppableCheck = instanceStoppableCheck.getValue(); if (!stoppableCheck.isStoppable()) { - ArrayNode failedReasonsNode = _failedStoppableInstances.putArray(instance); + ArrayNode failedReasonsNode = failedStoppableInstances.putArray(instance); for (String failedReason : stoppableCheck.getFailedChecks()) { failedReasonsNode.add(JsonNodeFactory.instance.textNode(failedReason)); } } else { - _stoppableInstances.add(instance); + stoppableInstances.add(instance); // Update the toBeStoppedInstances set with the currently identified stoppable instance. // This ensures that subsequent checks in other zones are aware of this instance's stoppable status. toBeStoppedInstances.add(instance); @@ -127,7 +143,7 @@ private void getStoppableInstances(List instances, Set toBeStopp } } - private void processNonexistentInstances(List instances) { + private void processNonexistentInstances(List instances, ObjectNode failedStoppableInstances) { // Adding following logic to check whether instances exist or not. An instance exist could be // checking following scenario: // 1. Instance got dropped. (InstanceConfig is gone.) @@ -139,7 +155,7 @@ private void processNonexistentInstances(List instances) { Set nonSelectedInstances = new HashSet<>(instances); nonSelectedInstances.removeAll(_clusterTopology.getAllInstances()); for (String nonSelectedInstance : nonSelectedInstances) { - ArrayNode failedReasonsNode = _failedStoppableInstances.putArray(nonSelectedInstance); + ArrayNode failedReasonsNode = failedStoppableInstances.putArray(nonSelectedInstance); failedReasonsNode.add(JsonNodeFactory.instance.textNode(INSTANCE_NOT_EXIST)); } } @@ -150,6 +166,7 @@ private void processNonexistentInstances(List instances) { * * If `random` is true, the order of zones will be randomized regardless of any previous order. * + * @param instances A list of instance to be used to calculate the order of zones. * @param random Indicates whether to randomize the order of zones. */ public void calculateOrderOfZone(List instances, boolean random) { @@ -228,8 +245,6 @@ public static class StoppableInstancesSelectorBuilder { private String _clusterId; private List _orderOfZone; private String _customizedInput; - private ArrayNode _stoppableInstances; - private ObjectNode _failedStoppableInstances; private MaintenanceManagementService _maintenanceService; private ClusterTopology _clusterTopology; @@ -248,16 +263,6 @@ public StoppableInstancesSelectorBuilder setCustomizedInput(String customizedInp return this; } - public StoppableInstancesSelectorBuilder setStoppableInstances(ArrayNode stoppableInstances) { - _stoppableInstances = stoppableInstances; - return this; - } - - public StoppableInstancesSelectorBuilder setFailedStoppableInstances(ObjectNode failedStoppableInstances) { - _failedStoppableInstances = failedStoppableInstances; - return this; - } - public StoppableInstancesSelectorBuilder setMaintenanceService( MaintenanceManagementService maintenanceService) { _maintenanceService = maintenanceService; @@ -270,9 +275,8 @@ public StoppableInstancesSelectorBuilder setClusterTopology(ClusterTopology clus } public StoppableInstancesSelector build() { - return new StoppableInstancesSelector(_clusterId, _orderOfZone, - _customizedInput, _stoppableInstances, _failedStoppableInstances, _maintenanceService, - _clusterTopology); + return new StoppableInstancesSelector(_clusterId, _orderOfZone, _customizedInput, + _maintenanceService, _clusterTopology); } } } diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java index 06d7ceabc7..785195ebe1 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java @@ -227,7 +227,7 @@ private Response batchGetStoppableInstances(String clusterId, JsonNode node, boo List orderOfZone = null; String customizedInput = null; - Set toBeStoppedInstances = Collections.emptySet(); + List toBeStoppedInstances = Collections.emptyList(); if (node.get(InstancesAccessor.InstancesProperties.customized_values.name()) != null) { customizedInput = node.get(InstancesAccessor.InstancesProperties.customized_values.name()).toString(); @@ -248,7 +248,7 @@ private Response batchGetStoppableInstances(String clusterId, JsonNode node, boo if (node.get(InstancesAccessor.InstancesProperties.to_be_stopped_instances.name()) != null) { toBeStoppedInstances = OBJECT_MAPPER.readValue( node.get(InstancesProperties.to_be_stopped_instances.name()).toString(), - OBJECT_MAPPER.getTypeFactory().constructCollectionType(Set.class, String.class)); + OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, String.class)); Set instanceSet = new HashSet<>(instances); instanceSet.retainAll(toBeStoppedInstances); if (!instanceSet.isEmpty()) { @@ -260,13 +260,6 @@ private Response batchGetStoppableInstances(String clusterId, JsonNode node, boo } } - // Prepare output result - ObjectNode result = JsonNodeFactory.instance.objectNode(); - ArrayNode stoppableInstances = - result.putArray(InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name()); - ObjectNode failedStoppableInstances = result.putObject( - InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name()); - MaintenanceManagementService maintenanceService = new MaintenanceManagementService((ZKHelixDataAccessor) getDataAccssor(clusterId), getConfigAccessor(), skipZKRead, continueOnFailures, skipHealthCheckCategories, @@ -279,18 +272,17 @@ private Response batchGetStoppableInstances(String clusterId, JsonNode node, boo .setClusterId(clusterId) .setOrderOfZone(orderOfZone) .setCustomizedInput(customizedInput) - .setStoppableInstances(stoppableInstances) - .setFailedStoppableInstances(failedStoppableInstances) .setMaintenanceService(maintenanceService) .setClusterTopology(clusterTopology) .build(); stoppableInstancesSelector.calculateOrderOfZone(instances, random); + ObjectNode result; switch (selectionBase) { case zone_based: - stoppableInstancesSelector.getStoppableInstancesInSingleZone(instances, toBeStoppedInstances); + result = stoppableInstancesSelector.getStoppableInstancesInSingleZone(instances, toBeStoppedInstances); break; case cross_zone_based: - stoppableInstancesSelector.getStoppableInstancesCrossZones(instances, toBeStoppedInstances); + result = stoppableInstancesSelector.getStoppableInstancesCrossZones(instances, toBeStoppedInstances); break; case instance_based: default: diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/util/TestInstanceValidationUtilInRest.java b/helix-rest/src/test/java/org/apache/helix/rest/server/util/TestInstanceValidationUtilInRest.java index 35e6399e0f..e37da34ff6 100644 --- a/helix-rest/src/test/java/org/apache/helix/rest/server/util/TestInstanceValidationUtilInRest.java +++ b/helix-rest/src/test/java/org/apache/helix/rest/server/util/TestInstanceValidationUtilInRest.java @@ -22,8 +22,10 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.helix.HelixDataAccessor; import org.apache.helix.PropertyKey; @@ -103,6 +105,52 @@ public void testPartitionLevelCheckInitState() { Assert.assertEquals(failedPartitions.keySet().size(), 2); } + @Test + public void testPartitionLevelCheckWithToBeStoppedNode() { + List externalViews = new ArrayList<>(Arrays.asList(prepareExternalViewOnline())); + Mock mock = new Mock(); + HelixDataAccessor accessor = mock.dataAccessor; + + when(mock.dataAccessor.keyBuilder()) + .thenReturn(new PropertyKey.Builder(TEST_CLUSTER)); + when(mock.dataAccessor + .getProperty(new PropertyKey.Builder(TEST_CLUSTER).stateModelDef(MasterSlaveSMD.name))) + .thenReturn(mock.stateModel); + when(mock.stateModel.getTopState()).thenReturn("MASTER"); + when(mock.stateModel.getInitialState()).thenReturn("OFFLINE"); + + Map> partitionStateMap = new HashMap<>(); + partitionStateMap.put("h1", new HashMap<>()); + partitionStateMap.put("h2", new HashMap<>()); + partitionStateMap.put("h3", new HashMap<>()); + partitionStateMap.put("h4", new HashMap<>()); + + partitionStateMap.get("h1").put("p1", true); + partitionStateMap.get("h2").put("p1", true); + partitionStateMap.get("h3").put("p1", true); + partitionStateMap.get("h4").put("p1", true); + + partitionStateMap.get("h1").put("p2", true); + partitionStateMap.get("h2").put("p2", false); + partitionStateMap.get("h3").put("p2", true); + + Set toBeStoppedInstances = new HashSet<>(); + toBeStoppedInstances.add("h3"); + Map> failedPartitions = InstanceValidationUtil.perPartitionHealthCheck( + externalViews, partitionStateMap, "h1", accessor, toBeStoppedInstances); + Assert.assertEquals(failedPartitions.keySet().size(), 1); + Assert.assertEquals(failedPartitions.get("p2").size(), 1); + Assert.assertTrue(failedPartitions.get("p2").contains("UNHEALTHY_PARTITION")); + + toBeStoppedInstances.remove("h3"); + toBeStoppedInstances.add("h2"); + failedPartitions = + InstanceValidationUtil.perPartitionHealthCheck(externalViews, partitionStateMap, "h1", + accessor, toBeStoppedInstances); + // Since we presume h2 as being already stopped, the health status of p2 on h2 will be skipped. + Assert.assertEquals(failedPartitions.keySet().size(), 0); + } + private ExternalView prepareExternalView() { ExternalView externalView = new ExternalView(RESOURCE_NAME); externalView.getRecord() @@ -163,6 +211,22 @@ private ExternalView prepareExternalViewOffline() { return externalView; } + private ExternalView prepareExternalViewOnline() { + ExternalView externalView = new ExternalView(RESOURCE_NAME); + externalView.getRecord() + .setSimpleField(ExternalView.ExternalViewProperty.STATE_MODEL_DEF_REF.toString(), + MasterSlaveSMD.name); + externalView.setState("p1", "h1", "MASTER"); + externalView.setState("p1", "h2", "SLAVE"); + externalView.setState("p1", "h3", "SLAVE"); + + externalView.setState("p2", "h1", "MASTER"); + externalView.setState("p2", "h2", "SLAVE"); + externalView.setState("p2", "h3", "SLAVE"); + + return externalView; + } + private final class Mock { private HelixDataAccessor dataAccessor = mock(HelixDataAccessor.class); private StateModelDefinition stateModel = mock(StateModelDefinition.class);