From e88fec884aaa604f166560a651878b266224da5c Mon Sep 17 00:00:00 2001 From: Xiaxuan Gao <32374858+MarkGaox@users.noreply.github.com> Date: Tue, 31 Oct 2023 15:41:05 -0700 Subject: [PATCH] Implement the cross-zone-based stoppable check (#2680) Implement the cross-zone-based stoppable check and add to_be_stopped_instances query parameter to the stoppable check API --- .../helix/util/InstanceValidationUtil.java | 58 ++++++- .../util/TestInstanceValidationUtil.java | 77 +++++++++ .../MaintenanceManagementService.java | 40 +++-- .../StoppableInstancesSelector.java | 138 +++++++++++----- .../resources/helix/InstancesAccessor.java | 35 ++-- .../TestMaintenanceManagementService.java | 14 +- .../helix/rest/server/AbstractTestClass.java | 77 ++++++++- .../rest/server/TestInstancesAccessor.java | 155 ++++++++++++++++++ .../TestInstanceValidationUtilInRest.java | 64 ++++++++ 9 files changed, 576 insertions(+), 82 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/util/InstanceValidationUtil.java b/helix-core/src/main/java/org/apache/helix/util/InstanceValidationUtil.java index fdbf7dd1a0..5f179e784e 100644 --- a/helix-core/src/main/java/org/apache/helix/util/InstanceValidationUtil.java +++ b/helix-core/src/main/java/org/apache/helix/util/InstanceValidationUtil.java @@ -20,6 +20,7 @@ */ import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -254,6 +255,28 @@ public static boolean hasErrorPartitions(HelixDataAccessor dataAccessor, String public static Map> perPartitionHealthCheck(List externalViews, Map> globalPartitionHealthStatus, String instanceToBeStop, HelixDataAccessor dataAccessor) { + return perPartitionHealthCheck(externalViews, globalPartitionHealthStatus, instanceToBeStop, + dataAccessor, Collections.emptySet()); + } + + /** + * Get the problematic partitions on the to-be-stop instance + * Requirement: + * If the instance and the toBeStoppedInstances are stopped and the 
partitions on them are OFFLINE, + * the cluster still has enough "healthy" replicas on other sibling instances + * + * - sibling instances mean those who share the same partition (replicas) of the to-be-stop instance + * + * @param globalPartitionHealthStatus (instance => (partition name, health status)) + * @param instanceToBeStop The instance to be stopped + * @param dataAccessor The data accessor + * @param toBeStoppedInstances A set of instances presumed to be already stopped. It + * shouldn't contain the `instanceToBeStop` + * @return A list of problematic partitions if the instance is stopped + */ + public static Map> perPartitionHealthCheck(List externalViews, + Map> globalPartitionHealthStatus, String instanceToBeStop, + HelixDataAccessor dataAccessor, Set toBeStoppedInstances) { Map> unhealthyPartitions = new HashMap<>(); for (ExternalView externalView : externalViews) { @@ -273,7 +296,8 @@ public static Map> perPartitionHealthCheck(List toBeStoppedInstances) { PropertyKey.Builder propertyKeyBuilder = dataAccessor.keyBuilder(); List resources = dataAccessor.getChildNames(propertyKeyBuilder.idealStates()); @@ -406,8 +451,9 @@ public static boolean siblingNodesActiveReplicaCheck(HelixDataAccessor dataAcces if (stateByInstanceMap.containsKey(instanceName)) { int numHealthySiblings = 0; for (Map.Entry entry : stateByInstanceMap.entrySet()) { - if (!entry.getKey().equals(instanceName) - && !unhealthyStates.contains(entry.getValue())) { + if (!entry.getKey().equals(instanceName) && (toBeStoppedInstances == null + || !toBeStoppedInstances.contains(entry.getKey())) && !unhealthyStates.contains( + entry.getValue())) { numHealthySiblings++; } } diff --git a/helix-core/src/test/java/org/apache/helix/util/TestInstanceValidationUtil.java b/helix-core/src/test/java/org/apache/helix/util/TestInstanceValidationUtil.java index 2c51fc92b2..aa1ba32290 100644 --- a/helix-core/src/test/java/org/apache/helix/util/TestInstanceValidationUtil.java +++ 
b/helix-core/src/test/java/org/apache/helix/util/TestInstanceValidationUtil.java @@ -21,7 +21,9 @@ import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -401,6 +403,81 @@ public void TestSiblingNodesActiveReplicaCheck_success() { Assert.assertTrue(result); } + @Test + public void TestSiblingNodesActiveReplicaCheckSuccessWithToBeStoppedInstances() { + String resource = "resource"; + Mock mock = new Mock(); + doReturn(ImmutableList.of(resource)).when(mock.dataAccessor) + .getChildNames(argThat(new PropertyKeyArgument(PropertyType.IDEALSTATES))); + // set ideal state + IdealState idealState = mock(IdealState.class); + when(idealState.isEnabled()).thenReturn(true); + when(idealState.isValid()).thenReturn(true); + when(idealState.getStateModelDefRef()).thenReturn("MasterSlave"); + doReturn(idealState).when(mock.dataAccessor).getProperty(argThat(new PropertyKeyArgument(PropertyType.IDEALSTATES))); + + // set external view + ExternalView externalView = mock(ExternalView.class); + when(externalView.getMinActiveReplicas()).thenReturn(2); + when(externalView.getStateModelDefRef()).thenReturn("MasterSlave"); + when(externalView.getPartitionSet()).thenReturn(ImmutableSet.of("db0")); + when(externalView.getStateMap("db0")).thenReturn(ImmutableMap.of(TEST_INSTANCE, "Master", + "instance1", "Slave", "instance2", "Slave", "instance3", "Slave")); + doReturn(externalView).when(mock.dataAccessor) + .getProperty(argThat(new PropertyKeyArgument(PropertyType.EXTERNALVIEW))); + StateModelDefinition stateModelDefinition = mock(StateModelDefinition.class); + when(stateModelDefinition.getInitialState()).thenReturn("OFFLINE"); + doReturn(stateModelDefinition).when(mock.dataAccessor) + .getProperty(argThat(new PropertyKeyArgument(PropertyType.STATEMODELDEFS))); + + Set toBeStoppedInstances = new HashSet<>(); + 
toBeStoppedInstances.add("instance3"); + toBeStoppedInstances.add("invalidInstances"); // include an invalid instance. + boolean result = + InstanceValidationUtil.siblingNodesActiveReplicaCheck(mock.dataAccessor, TEST_INSTANCE, toBeStoppedInstances); + Assert.assertTrue(result); + + result = + InstanceValidationUtil.siblingNodesActiveReplicaCheck(mock.dataAccessor, TEST_INSTANCE, null); + Assert.assertTrue(result); + } + + @Test + public void TestSiblingNodesActiveReplicaCheckFailsWithToBeStoppedInstances() { + String resource = "resource"; + Mock mock = new Mock(); + doReturn(ImmutableList.of(resource)).when(mock.dataAccessor) + .getChildNames(argThat(new PropertyKeyArgument(PropertyType.IDEALSTATES))); + // set ideal state + IdealState idealState = mock(IdealState.class); + when(idealState.isEnabled()).thenReturn(true); + when(idealState.isValid()).thenReturn(true); + when(idealState.getStateModelDefRef()).thenReturn("MasterSlave"); + doReturn(idealState).when(mock.dataAccessor).getProperty(argThat(new PropertyKeyArgument(PropertyType.IDEALSTATES))); + + // set external view + ExternalView externalView = mock(ExternalView.class); + when(externalView.getMinActiveReplicas()).thenReturn(2); + when(externalView.getStateModelDefRef()).thenReturn("MasterSlave"); + when(externalView.getPartitionSet()).thenReturn(ImmutableSet.of("db0")); + when(externalView.getStateMap("db0")).thenReturn(ImmutableMap.of(TEST_INSTANCE, "Master", + "instance1", "Slave", "instance2", "Slave", "instance3", "Slave")); + doReturn(externalView).when(mock.dataAccessor) + .getProperty(argThat(new PropertyKeyArgument(PropertyType.EXTERNALVIEW))); + StateModelDefinition stateModelDefinition = mock(StateModelDefinition.class); + when(stateModelDefinition.getInitialState()).thenReturn("OFFLINE"); + doReturn(stateModelDefinition).when(mock.dataAccessor) + .getProperty(argThat(new PropertyKeyArgument(PropertyType.STATEMODELDEFS))); + + Set toBeStoppedInstances = new HashSet<>(); + 
toBeStoppedInstances.add("instance1"); + toBeStoppedInstances.add("instance2"); + boolean result = + InstanceValidationUtil.siblingNodesActiveReplicaCheck(mock.dataAccessor, TEST_INSTANCE, toBeStoppedInstances); + + Assert.assertFalse(result); + } + @Test public void TestSiblingNodesActiveReplicaCheck_fail() { String resource = "resource"; diff --git a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java index 529fc469d4..c3fa04966f 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java @@ -339,17 +339,23 @@ private List getAllOperationClasses(List operations) */ public StoppableCheck getInstanceStoppableCheck(String clusterId, String instanceName, String jsonContent) throws IOException { - return batchGetInstancesStoppableChecks(clusterId, ImmutableList.of(instanceName), jsonContent) - .get(instanceName); + return batchGetInstancesStoppableChecks(clusterId, ImmutableList.of(instanceName), + jsonContent).get(instanceName); } - public Map batchGetInstancesStoppableChecks(String clusterId, List instances, String jsonContent) throws IOException { + return batchGetInstancesStoppableChecks(clusterId, instances, jsonContent, + Collections.emptySet()); + } + + public Map batchGetInstancesStoppableChecks(String clusterId, + List instances, String jsonContent, Set toBeStoppedInstances) throws IOException { Map finalStoppableChecks = new HashMap<>(); // helix instance check. 
List instancesForCustomInstanceLevelChecks = - batchHelixInstanceStoppableCheck(clusterId, instances, finalStoppableChecks); + batchHelixInstanceStoppableCheck(clusterId, instances, finalStoppableChecks, + toBeStoppedInstances); // custom check, includes partition check. batchCustomInstanceStoppableCheck(clusterId, instancesForCustomInstanceLevelChecks, finalStoppableChecks, getMapFromJsonPayload(jsonContent)); @@ -441,10 +447,11 @@ private MaintenanceManagementInstanceInfo takeFreeSingleInstanceHelper(String cl } private List batchHelixInstanceStoppableCheck(String clusterId, - Collection instances, Map finalStoppableChecks) { - Map> helixInstanceChecks = instances.stream().collect(Collectors - .toMap(Function.identity(), - instance -> POOL.submit(() -> performHelixOwnInstanceCheck(clusterId, instance)))); + Collection instances, Map finalStoppableChecks, + Set toBeStoppedInstances) { + Map> helixInstanceChecks = instances.stream().collect( + Collectors.toMap(Function.identity(), instance -> POOL.submit( + () -> performHelixOwnInstanceCheck(clusterId, instance, toBeStoppedInstances)))); // finalStoppableChecks contains instances that does not pass this health check return filterInstancesForNextCheck(helixInstanceChecks, finalStoppableChecks); } @@ -512,7 +519,8 @@ private Map batchInstanceHealthCheck( if (healthCheck.equals(HELIX_INSTANCE_STOPPABLE_CHECK)) { // this is helix own check instancesForNext = - batchHelixInstanceStoppableCheck(clusterId, instancesForNext, finalStoppableChecks); + batchHelixInstanceStoppableCheck(clusterId, instancesForNext, finalStoppableChecks, + Collections.emptySet()); } else if (healthCheck.equals(HELIX_CUSTOM_STOPPABLE_CHECK)) { // custom check, includes custom Instance check and partition check. 
instancesForNext = @@ -601,10 +609,12 @@ private boolean isNonBlockingCheck(StoppableCheck stoppableCheck) { return true; } - private StoppableCheck performHelixOwnInstanceCheck(String clusterId, String instanceName) { + private StoppableCheck performHelixOwnInstanceCheck(String clusterId, String instanceName, + Set toBeStoppedInstances) { LOG.info("Perform helix own custom health checks for {}/{}", clusterId, instanceName); Map helixStoppableCheck = - getInstanceHealthStatus(clusterId, instanceName, HealthCheck.STOPPABLE_CHECK_LIST); + getInstanceHealthStatus(clusterId, instanceName, HealthCheck.STOPPABLE_CHECK_LIST, + toBeStoppedInstances); return new StoppableCheck(helixStoppableCheck, StoppableCheck.Category.HELIX_OWN_CHECK); } @@ -698,6 +708,12 @@ public static boolean getBooleanFromJsonPayload(String jsonString) @VisibleForTesting protected Map getInstanceHealthStatus(String clusterId, String instanceName, List healthChecks) { + return getInstanceHealthStatus(clusterId, instanceName, healthChecks, Collections.emptySet()); + } + + @VisibleForTesting + protected Map getInstanceHealthStatus(String clusterId, String instanceName, + List healthChecks, Set toBeStoppedInstances) { Map healthStatus = new HashMap<>(); for (HealthCheck healthCheck : healthChecks) { switch (healthCheck) { @@ -745,7 +761,7 @@ protected Map getInstanceHealthStatus(String clusterId, String break; case MIN_ACTIVE_REPLICA_CHECK_FAILED: healthStatus.put(HealthCheck.MIN_ACTIVE_REPLICA_CHECK_FAILED.name(), - InstanceValidationUtil.siblingNodesActiveReplicaCheck(_dataAccessor, instanceName)); + InstanceValidationUtil.siblingNodesActiveReplicaCheck(_dataAccessor, instanceName, toBeStoppedInstances)); break; default: LOG.error("Unsupported health check: {}", healthCheck); diff --git a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java 
index dafe1ab2d8..8cf8bc83cb 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; @@ -33,31 +34,27 @@ import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.JsonNodeFactory; import com.fasterxml.jackson.databind.node.ObjectNode; -import org.apache.commons.lang3.NotImplementedException; import org.apache.helix.rest.server.json.cluster.ClusterTopology; import org.apache.helix.rest.server.json.instance.StoppableCheck; +import org.apache.helix.rest.server.resources.helix.InstancesAccessor; public class StoppableInstancesSelector { // This type does not belong to real HealthCheck failed reason. Also, if we add this type // to HealthCheck enum, it could introduce more unnecessary check step since the InstanceServiceImpl // loops all the types to do corresponding checks. 
private final static String INSTANCE_NOT_EXIST = "HELIX:INSTANCE_NOT_EXIST"; - private String _clusterId; + private final String _clusterId; private List _orderOfZone; - private String _customizedInput; - private ArrayNode _stoppableInstances; - private ObjectNode _failedStoppableInstances; - private MaintenanceManagementService _maintenanceService; - private ClusterTopology _clusterTopology; + private final String _customizedInput; + private final MaintenanceManagementService _maintenanceService; + private final ClusterTopology _clusterTopology; public StoppableInstancesSelector(String clusterId, List orderOfZone, - String customizedInput, ArrayNode stoppableInstances, ObjectNode failedStoppableInstances, - MaintenanceManagementService maintenanceService, ClusterTopology clusterTopology) { + String customizedInput, MaintenanceManagementService maintenanceService, + ClusterTopology clusterTopology) { _clusterId = clusterId; _orderOfZone = orderOfZone; _customizedInput = customizedInput; - _stoppableInstances = stoppableInstances; - _failedStoppableInstances = failedStoppableInstances; _maintenanceService = maintenanceService; _clusterTopology = clusterTopology; } @@ -69,26 +66,92 @@ public StoppableInstancesSelector(String clusterId, List orderOfZone, * reasons for non-stoppability. * * @param instances A list of instance to be evaluated. + * @param toBeStoppedInstances A list of instances presumed to be already stopped + * @return An ObjectNode containing: + * - 'instance_stoppable_parallel': List of instances that can be stopped. + * - 'instance_not_stoppable_with_reasons': A map with the instance name as the key and + * a list of reasons for non-stoppability as the value. 
* @throws IOException */ - public void getStoppableInstancesInSingleZone(List instances) throws IOException { + public ObjectNode getStoppableInstancesInSingleZone(List instances, + List toBeStoppedInstances) throws IOException { + ObjectNode result = JsonNodeFactory.instance.objectNode(); + ArrayNode stoppableInstances = + result.putArray(InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name()); + ObjectNode failedStoppableInstances = result.putObject( + InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name()); + Set toBeStoppedInstancesSet = new HashSet<>(toBeStoppedInstances); + List zoneBasedInstance = getZoneBasedInstances(instances, _clusterTopology.toZoneMapping()); + populateStoppableInstances(zoneBasedInstance, toBeStoppedInstancesSet, stoppableInstances, + failedStoppableInstances); + processNonexistentInstances(instances, failedStoppableInstances); + + return result; + } + + /** + * Evaluates and collects stoppable instances across a set of zones based on the order of zones. + * The method iterates through instances, performing stoppable checks, and records reasons for + * non-stoppability. + * + * @param instances A list of instances to be evaluated. + * @param toBeStoppedInstances A list of instances presumed to be already stopped + * @return An ObjectNode containing: + * - 'instance_stoppable_parallel': List of instances that can be stopped. + * - 'instance_not_stoppable_with_reasons': A map with the instance name as the key and + * a list of reasons for non-stoppability as the value. 
+ * @throws IOException + */ + public ObjectNode getStoppableInstancesCrossZones(List instances, + List toBeStoppedInstances) throws IOException { + ObjectNode result = JsonNodeFactory.instance.objectNode(); + ArrayNode stoppableInstances = + result.putArray(InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name()); + ObjectNode failedStoppableInstances = result.putObject( + InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name()); + Set toBeStoppedInstancesSet = new HashSet<>(toBeStoppedInstances); + + Map> zoneMapping = _clusterTopology.toZoneMapping(); + for (String zone : _orderOfZone) { + Set instanceSet = new HashSet<>(instances); + Set currentZoneInstanceSet = new HashSet<>(zoneMapping.get(zone)); + instanceSet.retainAll(currentZoneInstanceSet); + if (instanceSet.isEmpty()) { + continue; + } + populateStoppableInstances(new ArrayList<>(instanceSet), toBeStoppedInstancesSet, stoppableInstances, + failedStoppableInstances); + } + processNonexistentInstances(instances, failedStoppableInstances); + return result; + } + + private void populateStoppableInstances(List instances, Set toBeStoppedInstances, + ArrayNode stoppableInstances, ObjectNode failedStoppableInstances) throws IOException { Map instancesStoppableChecks = - _maintenanceService.batchGetInstancesStoppableChecks(_clusterId, zoneBasedInstance, - _customizedInput); + _maintenanceService.batchGetInstancesStoppableChecks(_clusterId, instances, + _customizedInput, toBeStoppedInstances); + for (Map.Entry instanceStoppableCheck : instancesStoppableChecks.entrySet()) { String instance = instanceStoppableCheck.getKey(); StoppableCheck stoppableCheck = instanceStoppableCheck.getValue(); if (!stoppableCheck.isStoppable()) { - ArrayNode failedReasonsNode = _failedStoppableInstances.putArray(instance); + ArrayNode failedReasonsNode = failedStoppableInstances.putArray(instance); for (String failedReason : stoppableCheck.getFailedChecks()) { 
failedReasonsNode.add(JsonNodeFactory.instance.textNode(failedReason)); } } else { - _stoppableInstances.add(instance); + stoppableInstances.add(instance); + // Update the toBeStoppedInstances set with the currently identified stoppable instance. + // This ensures that subsequent checks in other zones are aware of this instance's stoppable status. + toBeStoppedInstances.add(instance); } } + } + + private void processNonexistentInstances(List instances, ObjectNode failedStoppableInstances) { // Adding following logic to check whether instances exist or not. An instance exist could be // checking following scenario: // 1. Instance got dropped. (InstanceConfig is gone.) @@ -100,28 +163,36 @@ public void getStoppableInstancesInSingleZone(List instances) throws IOE Set nonSelectedInstances = new HashSet<>(instances); nonSelectedInstances.removeAll(_clusterTopology.getAllInstances()); for (String nonSelectedInstance : nonSelectedInstances) { - ArrayNode failedReasonsNode = _failedStoppableInstances.putArray(nonSelectedInstance); + ArrayNode failedReasonsNode = failedStoppableInstances.putArray(nonSelectedInstance); failedReasonsNode.add(JsonNodeFactory.instance.textNode(INSTANCE_NOT_EXIST)); } } - public void getStoppableInstancesCrossZones() { - // TODO: Add implementation to enable cross zone stoppable check. - throw new NotImplementedException("Not Implemented"); - } - /** * Determines the order of zones. If an order is provided by the user, it will be used directly. * Otherwise, zones will be ordered by their associated instance count in descending order. * * If `random` is true, the order of zones will be randomized regardless of any previous order. * + * @param instances A list of instance to be used to calculate the order of zones. * @param random Indicates whether to randomize the order of zones. 
*/ - public void calculateOrderOfZone(boolean random) { + public void calculateOrderOfZone(List instances, boolean random) { if (_orderOfZone == null) { - _orderOfZone = - new ArrayList<>(getOrderedZoneToInstancesMap(_clusterTopology.toZoneMapping()).keySet()); + Map> zoneMapping = _clusterTopology.toZoneMapping(); + Map> zoneToInstancesMap = new HashMap<>(); + for (ClusterTopology.Zone zone : _clusterTopology.getZones()) { + Set instanceSet = new HashSet<>(instances); + // TODO: Use instance config from Helix-rest Cache to get the zone instead of reading the topology info + Set currentZoneInstanceSet = new HashSet<>(zoneMapping.get(zone.getId())); + instanceSet.retainAll(currentZoneInstanceSet); + if (instanceSet.isEmpty()) { + continue; + } + zoneToInstancesMap.put(zone.getId(), instanceSet); + } + + _orderOfZone = new ArrayList<>(getOrderedZoneToInstancesMap(zoneToInstancesMap).keySet()); } if (_orderOfZone.isEmpty()) { @@ -182,8 +253,6 @@ public static class StoppableInstancesSelectorBuilder { private String _clusterId; private List _orderOfZone; private String _customizedInput; - private ArrayNode _stoppableInstances; - private ObjectNode _failedStoppableInstances; private MaintenanceManagementService _maintenanceService; private ClusterTopology _clusterTopology; @@ -202,16 +271,6 @@ public StoppableInstancesSelectorBuilder setCustomizedInput(String customizedInp return this; } - public StoppableInstancesSelectorBuilder setStoppableInstances(ArrayNode stoppableInstances) { - _stoppableInstances = stoppableInstances; - return this; - } - - public StoppableInstancesSelectorBuilder setFailedStoppableInstances(ObjectNode failedStoppableInstances) { - _failedStoppableInstances = failedStoppableInstances; - return this; - } - public StoppableInstancesSelectorBuilder setMaintenanceService( MaintenanceManagementService maintenanceService) { _maintenanceService = maintenanceService; @@ -224,9 +283,8 @@ public StoppableInstancesSelectorBuilder 
setClusterTopology(ClusterTopology clus } public StoppableInstancesSelector build() { - return new StoppableInstancesSelector(_clusterId, _orderOfZone, - _customizedInput, _stoppableInstances, _failedStoppableInstances, _maintenanceService, - _clusterTopology); + return new StoppableInstancesSelector(_clusterId, _orderOfZone, _customizedInput, + _maintenanceService, _clusterTopology); } } } diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java index 8a21202704..785195ebe1 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java @@ -20,7 +20,9 @@ */ import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -67,6 +69,7 @@ public enum InstancesProperties { disabled, selection_base, zone_order, + to_be_stopped_instances, customized_values, instance_stoppable_parallel, instance_not_stoppable_with_reasons @@ -224,6 +227,7 @@ private Response batchGetStoppableInstances(String clusterId, JsonNode node, boo List orderOfZone = null; String customizedInput = null; + List toBeStoppedInstances = Collections.emptyList(); if (node.get(InstancesAccessor.InstancesProperties.customized_values.name()) != null) { customizedInput = node.get(InstancesAccessor.InstancesProperties.customized_values.name()).toString(); @@ -235,18 +239,26 @@ private Response batchGetStoppableInstances(String clusterId, JsonNode node, boo OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, String.class)); if (!orderOfZone.isEmpty() && random) { String message = - "Both 'orderOfZone' and 'random' parameters are set. 
Please specify only one option."; + "Both 'zone_order' and 'random' parameters are set. Please specify only one option."; _logger.error(message); return badRequest(message); } } - // Prepare output result - ObjectNode result = JsonNodeFactory.instance.objectNode(); - ArrayNode stoppableInstances = - result.putArray(InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name()); - ObjectNode failedStoppableInstances = result.putObject( - InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name()); + if (node.get(InstancesAccessor.InstancesProperties.to_be_stopped_instances.name()) != null) { + toBeStoppedInstances = OBJECT_MAPPER.readValue( + node.get(InstancesProperties.to_be_stopped_instances.name()).toString(), + OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, String.class)); + Set instanceSet = new HashSet<>(instances); + instanceSet.retainAll(toBeStoppedInstances); + if (!instanceSet.isEmpty()) { + String message = + "'to_be_stopped_instances' and 'instances' have intersection: " + instanceSet + + ". 
Please make them mutually exclusive."; + _logger.error(message); + return badRequest(message); + } + } MaintenanceManagementService maintenanceService = new MaintenanceManagementService((ZKHelixDataAccessor) getDataAccssor(clusterId), @@ -260,18 +272,17 @@ private Response batchGetStoppableInstances(String clusterId, JsonNode node, boo .setClusterId(clusterId) .setOrderOfZone(orderOfZone) .setCustomizedInput(customizedInput) - .setStoppableInstances(stoppableInstances) - .setFailedStoppableInstances(failedStoppableInstances) .setMaintenanceService(maintenanceService) .setClusterTopology(clusterTopology) .build(); - stoppableInstancesSelector.calculateOrderOfZone(random); + stoppableInstancesSelector.calculateOrderOfZone(instances, random); + ObjectNode result; switch (selectionBase) { case zone_based: - stoppableInstancesSelector.getStoppableInstancesInSingleZone(instances); + result = stoppableInstancesSelector.getStoppableInstancesInSingleZone(instances, toBeStoppedInstances); break; case cross_zone_based: - stoppableInstancesSelector.getStoppableInstancesCrossZones(); + result = stoppableInstancesSelector.getStoppableInstancesCrossZones(instances, toBeStoppedInstances); break; case instance_based: default: diff --git a/helix-rest/src/test/java/org/apache/helix/rest/clusterMaintenanceService/TestMaintenanceManagementService.java b/helix-rest/src/test/java/org/apache/helix/rest/clusterMaintenanceService/TestMaintenanceManagementService.java index f8408b0707..a49a95066f 100644 --- a/helix-rest/src/test/java/org/apache/helix/rest/clusterMaintenanceService/TestMaintenanceManagementService.java +++ b/helix-rest/src/test/java/org/apache/helix/rest/clusterMaintenanceService/TestMaintenanceManagementService.java @@ -114,7 +114,7 @@ public MockMaintenanceManagementService(ZKHelixDataAccessor dataAccessor, @Override protected Map getInstanceHealthStatus(String clusterId, String instanceName, - List healthChecks) { + List healthChecks, Set toBeStoppedInstances) { return 
Collections.emptyMap(); } } @@ -127,7 +127,7 @@ public void testGetInstanceStoppableCheckWhenHelixOwnCheckFail() throws IOExcept _customRestClient, false, false, HelixRestNamespace.DEFAULT_NAMESPACE_NAME) { @Override protected Map getInstanceHealthStatus(String clusterId, - String instanceName, List healthChecks) { + String instanceName, List healthChecks, Set toBeStoppedInstances) { return failedCheck; } }; @@ -147,7 +147,7 @@ public void testGetInstanceStoppableCheckWhenCustomInstanceCheckFail() throws IO _customRestClient, false, false, HelixRestNamespace.DEFAULT_NAMESPACE_NAME) { @Override protected Map getInstanceHealthStatus(String clusterId, - String instanceName, List healthChecks) { + String instanceName, List healthChecks, Set toBeStoppedInstances) { return Collections.emptyMap(); } }; @@ -227,7 +227,7 @@ public void testGetInstanceStoppableCheckWhenCustomInstanceCheckDisabled() throw _customRestClient, false, false, new HashSet<>(Arrays.asList(StoppableCheck.Category.CUSTOM_INSTANCE_CHECK)), HelixRestNamespace.DEFAULT_NAMESPACE_NAME); - + StoppableCheck actual = service.getInstanceStoppableCheck(TEST_CLUSTER, TEST_INSTANCE, ""); List expectedFailedChecks = Arrays.asList( StoppableCheck.Category.CUSTOM_PARTITION_CHECK.getPrefix() @@ -246,7 +246,7 @@ public void testGetInstanceStoppableCheckConnectionRefused() throws IOException _customRestClient, false, false, HelixRestNamespace.DEFAULT_NAMESPACE_NAME) { @Override protected Map getInstanceHealthStatus(String clusterId, - String instanceName, List healthChecks) { + String instanceName, List healthChecks, Set toBeStoppedInstances) { return Collections.emptyMap(); } }; @@ -365,7 +365,7 @@ public void testGetStoppableWithAllChecks() throws IOException { HelixRestNamespace.DEFAULT_NAMESPACE_NAME) { @Override protected Map getInstanceHealthStatus(String clusterId, - String instanceName, List healthChecks) { + String instanceName, List healthChecks, Set toBeStoppedInstances) { return instanceHealthFailedCheck; } 
}; @@ -393,7 +393,7 @@ public void testGetInstanceStoppableCheckWhenPartitionsCheckFail() throws IOExce _customRestClient, false, false, HelixRestNamespace.DEFAULT_NAMESPACE_NAME) { @Override protected Map getInstanceHealthStatus(String clusterId, - String instanceName, List healthChecks) { + String instanceName, List healthChecks, Set toBeStoppedInstances) { return Collections.emptyMap(); } }; diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java index a7cf91c7b8..68561ce839 100644 --- a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java +++ b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java @@ -54,6 +54,7 @@ import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.IdealState; import org.apache.helix.model.InstanceConfig; +import org.apache.helix.model.RESTConfig; import org.apache.helix.participant.StateMachineEngine; import org.apache.helix.rest.common.ContextPropertyKeys; import org.apache.helix.rest.common.HelixRestNamespace; @@ -129,9 +130,14 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest { protected static HelixZkClient _gZkClientTestNS; protected static BaseDataAccessor _baseAccessorTestNS; protected static final String STOPPABLE_CLUSTER = "StoppableTestCluster"; + protected static final String STOPPABLE_CLUSTER2 = "StoppableTestCluster2"; protected static final String TASK_TEST_CLUSTER = "TaskTestCluster"; protected static final List STOPPABLE_INSTANCES = Arrays.asList("instance0", "instance1", "instance2", "instance3", "instance4", "instance5"); + protected static final List STOPPABLE_INSTANCES2 = + Arrays.asList("instance0", "instance1", "instance2", "instance3", "instance4", "instance5", + "instance6", "instance7", "instance8", "instance9", "instance10", "instance11", + "instance12", "instance13", "instance14"); protected static Set 
_clusters; protected static String _superCluster = "superCluster"; @@ -329,13 +335,14 @@ protected void setupHelixResources() throws Exception { _configAccessor.setClusterConfig(cluster, clusterConfig); createResourceConfigs(cluster, 8); _workflowMap.put(cluster, createWorkflows(cluster, 3)); - Set resources = createResources(cluster, 8); + Set resources = createResources(cluster, 8, MIN_ACTIVE_REPLICA, NUM_REPLICA); _instancesMap.put(cluster, instances); _liveInstancesMap.put(cluster, liveInstances); _resourcesMap.put(cluster, resources); _clusterControllerManagers.add(startController(cluster)); } preSetupForParallelInstancesStoppableTest(STOPPABLE_CLUSTER, STOPPABLE_INSTANCES); + preSetupForCrosszoneParallelInstancesStoppableTest(STOPPABLE_CLUSTER2, STOPPABLE_INSTANCES2); } protected Set createInstances(String cluster, int numInstances) { @@ -348,16 +355,17 @@ protected Set createInstances(String cluster, int numInstances) { return instances; } - protected Set createResources(String cluster, int numResources) { + protected Set createResources(String cluster, int numResources, int minActiveReplica, + int replicationFactor) { Set resources = new HashSet<>(); for (int i = 0; i < numResources; i++) { String resource = cluster + "_db_" + i; _gSetupTool.addResourceToCluster(cluster, resource, NUM_PARTITIONS, "MasterSlave"); IdealState idealState = _gSetupTool.getClusterManagementTool().getResourceIdealState(cluster, resource); - idealState.setMinActiveReplicas(MIN_ACTIVE_REPLICA); + idealState.setMinActiveReplicas(minActiveReplica); _gSetupTool.getClusterManagementTool().setResourceIdealState(cluster, resource, idealState); - _gSetupTool.rebalanceStorageCluster(cluster, resource, NUM_REPLICA); + _gSetupTool.rebalanceStorageCluster(cluster, resource, replicationFactor); resources.add(resource); } return resources; @@ -575,7 +583,7 @@ private void preSetupForParallelInstancesStoppableTest(String clusterName, // Start participant startInstances(clusterName, new 
TreeSet<>(instances), 3); - createResources(clusterName, 1); + createResources(clusterName, 1, MIN_ACTIVE_REPLICA, NUM_REPLICA); _clusterControllerManagers.add(startController(clusterName)); // Make sure that cluster config exists @@ -606,6 +614,65 @@ private void preSetupForParallelInstancesStoppableTest(String clusterName, _workflowMap.put(STOPPABLE_CLUSTER, createWorkflows(STOPPABLE_CLUSTER, 3)); } + private void preSetupForCrosszoneParallelInstancesStoppableTest(String clusterName, + List instances) throws Exception { + _gSetupTool.addCluster(clusterName, true); + ClusterConfig clusterConfig = _configAccessor.getClusterConfig(clusterName); + clusterConfig.setFaultZoneType("helixZoneId"); + clusterConfig.setPersistIntermediateAssignment(true); + _configAccessor.setClusterConfig(clusterName, clusterConfig); + RESTConfig emptyRestConfig = new RESTConfig(clusterName); + _configAccessor.setRESTConfig(clusterName, emptyRestConfig); + // Create instance configs + List instanceConfigs = new ArrayList<>(); + int perZoneInstancesCount = 3; + int curZoneCount = 0, zoneId = 1; + for (int i = 0; i < instances.size(); i++) { + InstanceConfig instanceConfig = new InstanceConfig(instances.get(i)); + instanceConfig.setDomain("helixZoneId=zone" + zoneId + ",host=instance" + i); + if (++curZoneCount >= perZoneInstancesCount) { + curZoneCount = 0; + zoneId++; + } + instanceConfigs.add(instanceConfig); + } + + for (InstanceConfig instanceConfig : instanceConfigs) { + _gSetupTool.getClusterManagementTool().addInstance(clusterName, instanceConfig); + } + + // Start participant + startInstances(clusterName, new TreeSet<>(instances), instances.size()); + createResources(clusterName, 1, 2, 3); + _clusterControllerManagers.add(startController(clusterName)); + + // Make sure that cluster config exists + boolean isClusterConfigExist = TestHelper.verify(() -> { + ClusterConfig stoppableClusterConfig; + try { + stoppableClusterConfig = _configAccessor.getClusterConfig(clusterName); + } catch 
(Exception e) { + return false; + } + return (stoppableClusterConfig != null); + }, TestHelper.WAIT_DURATION); + Assert.assertTrue(isClusterConfigExist); + // Make sure that an instance config exists for every instance passed in for this cluster + for (String instance: instances) { + boolean isinstanceConfigExist = TestHelper.verify(() -> { + InstanceConfig instanceConfig; + try { + instanceConfig = _configAccessor.getInstanceConfig(clusterName, instance); + } catch (Exception e) { + return false; + } + return (instanceConfig != null); + }, TestHelper.WAIT_DURATION); + Assert.assertTrue(isinstanceConfigExist); + } + _clusters.add(clusterName); + _workflowMap.put(clusterName, createWorkflows(clusterName, 3)); + } /** * Starts a HelixRestServer for the test suite. * @return diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java index 01701a4864..2bc539a4d4 100644 --- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java +++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java @@ -40,12 +40,167 @@ import org.apache.helix.rest.server.util.JerseyUriRequestBuilder; import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier; import org.testng.Assert; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; public class TestInstancesAccessor extends AbstractTestClass { private final static String CLUSTER_NAME = "TestCluster_0"; + @DataProvider + public Object[][] generatePayloadCrossZoneStoppableCheckWithZoneOrder() { + return new Object[][]{ + {String.format( + "{\"%s\":\"%s\",\"%s\":[\"%s\",\"%s\",\"%s\",\"%s\", \"%s\", \"%s\", \"%s\"," + + " \"%s\", \"%s\", \"%s\", \"%s\", \"%s\", \"%s\", \"%s\", \"%s\"]," + + "\"%s\":[\"%s\", \"%s\", \"%s\", \"%s\", \"%s\"], \"%s\":[\"%s\"]}", + InstancesAccessor.InstancesProperties.selection_base.name(), 
InstancesAccessor.InstanceHealthSelectionBase.cross_zone_based.name(), + InstancesAccessor.InstancesProperties.instances.name(), "instance1", "instance2", + "instance3", "instance4", "instance5", "instance6", "instance7", "instance8", + "instance9", "instance10", "instance11", "instance12", "instance13", "instance14", + "invalidInstance", + InstancesAccessor.InstancesProperties.zone_order.name(),"zone5", "zone4", "zone3", "zone2", + "zone1", + InstancesAccessor.InstancesProperties.to_be_stopped_instances.name(), + "instance0"), + }, + {String.format( + "{\"%s\":\"%s\",\"%s\":[\"%s\",\"%s\",\"%s\",\"%s\", \"%s\", \"%s\", \"%s\",\"%s\", \"%s\", \"%s\"]," + + "\"%s\":[\"%s\", \"%s\", \"%s\", \"%s\", \"%s\"], \"%s\":[\"%s\", \"%s\", \"%s\"]}", + InstancesAccessor.InstancesProperties.selection_base.name(), + InstancesAccessor.InstanceHealthSelectionBase.cross_zone_based.name(), + InstancesAccessor.InstancesProperties.instances.name(), "instance1", "instance3", + "instance6", "instance9", "instance10", "instance11", "instance12", "instance13", + "instance14", "invalidInstance", + InstancesAccessor.InstancesProperties.zone_order.name(), "zone5", "zone4", "zone1", + "zone3", "zone2", InstancesAccessor.InstancesProperties.to_be_stopped_instances.name(), + "instance0", "invalidInstance1", "invalidInstance1"), + } + }; + } + + @Test + public void testInstanceStoppableZoneBasedWithToBeStoppedInstances() throws IOException { + System.out.println("Start test :" + TestHelper.getTestMethodName()); + + String content = String.format( + "{\"%s\":\"%s\",\"%s\":[\"%s\",\"%s\",\"%s\",\"%s\",\"%s\", \"%s\"], \"%s\":[\"%s\",\"%s\"], \"%s\":[\"%s\", \"%s\", \"%s\"]}", + InstancesAccessor.InstancesProperties.selection_base.name(), + InstancesAccessor.InstanceHealthSelectionBase.zone_based.name(), + InstancesAccessor.InstancesProperties.instances.name(), "instance1", + "instance2", "instance3", "instance4", "instance5", "invalidInstance", + 
InstancesAccessor.InstancesProperties.zone_order.name(), "zone2", "zone1", + InstancesAccessor.InstancesProperties.to_be_stopped_instances.name(), "instance0", "instance6", "invalidInstance1"); + + Response response = new JerseyUriRequestBuilder( + "clusters/{}/instances?command=stoppable&skipHealthCheckCategories=CUSTOM_INSTANCE_CHECK,CUSTOM_PARTITION_CHECK").format( + STOPPABLE_CLUSTER2).post(this, Entity.entity(content, MediaType.APPLICATION_JSON_TYPE)); + JsonNode jsonNode = OBJECT_MAPPER.readTree(response.readEntity(String.class)); + + Set stoppableSet = getStringSet(jsonNode, + InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name()); + Assert.assertTrue(stoppableSet.contains("instance4") && stoppableSet.contains("instance3")); + + JsonNode nonStoppableInstances = jsonNode.get( + InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name()); + // "StoppableTestCluster2_db_0_3" : { "instance0" : "MASTER", "instance13" : "SLAVE", "instance5" : "SLAVE"}. + // Since instance0 is to_be_stopped and MIN_ACTIVE_REPLICA is 2, instance5 is not stoppable. 
+ Assert.assertEquals(getStringSet(nonStoppableInstances, "instance5"), + ImmutableSet.of("HELIX:MIN_ACTIVE_REPLICA_CHECK_FAILED")); + Assert.assertEquals(getStringSet(nonStoppableInstances, "invalidInstance"), + ImmutableSet.of("HELIX:INSTANCE_NOT_EXIST")); + + System.out.println("End test :" + TestHelper.getTestMethodName()); + } + + @Test + public void testInstanceStoppableZoneBasedWithoutZoneOrder() throws IOException { + System.out.println("Start test :" + TestHelper.getTestMethodName()); + String content = String.format( + "{\"%s\":\"%s\",\"%s\":[\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\"], \"%s\":[\"%s\", \"%s\", \"%s\"]}", + InstancesAccessor.InstancesProperties.selection_base.name(), + InstancesAccessor.InstanceHealthSelectionBase.zone_based.name(), + InstancesAccessor.InstancesProperties.instances.name(), "instance0", "instance1", + "instance2", "instance3", "instance4", "invalidInstance", + InstancesAccessor.InstancesProperties.to_be_stopped_instances.name(), + "instance7", "instance9", "instance10"); + + Response response = new JerseyUriRequestBuilder( + "clusters/{}/instances?command=stoppable&skipHealthCheckCategories=CUSTOM_INSTANCE_CHECK,CUSTOM_PARTITION_CHECK").format( + STOPPABLE_CLUSTER2).post(this, Entity.entity(content, MediaType.APPLICATION_JSON_TYPE)); + JsonNode jsonNode = OBJECT_MAPPER.readTree(response.readEntity(String.class)); + + // Without a zone order, Helix should pick zone1 because more of the requested instances are in zone1 (instance0-2) than in zone2 (instance3-4). 
+ Set stoppableSet = getStringSet(jsonNode, + InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name()); + Assert.assertTrue(stoppableSet.contains("instance0") && stoppableSet.contains("instance1")); + + JsonNode nonStoppableInstances = jsonNode.get( + InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name()); + Assert.assertEquals(getStringSet(nonStoppableInstances, "instance2"), + ImmutableSet.of("HELIX:MIN_ACTIVE_REPLICA_CHECK_FAILED")); + Assert.assertEquals(getStringSet(nonStoppableInstances, "invalidInstance"), + ImmutableSet.of("HELIX:INSTANCE_NOT_EXIST")); + System.out.println("End test :" + TestHelper.getTestMethodName()); + } + + @Test(dataProvider = "generatePayloadCrossZoneStoppableCheckWithZoneOrder") + public void testCrossZoneStoppableWithZoneOrder(String content) throws IOException { + System.out.println("Start test :" + TestHelper.getTestMethodName()); + Response response = new JerseyUriRequestBuilder( + "clusters/{}/instances?command=stoppable&skipHealthCheckCategories=CUSTOM_INSTANCE_CHECK,CUSTOM_PARTITION_CHECK").format( + STOPPABLE_CLUSTER2).post(this, Entity.entity(content, MediaType.APPLICATION_JSON_TYPE)); + JsonNode jsonNode = OBJECT_MAPPER.readTree(response.readEntity(String.class)); + + Set stoppableSet = getStringSet(jsonNode, + InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name()); + Assert.assertTrue(stoppableSet.contains("instance14") && stoppableSet.contains("instance12") + && stoppableSet.contains("instance11") && stoppableSet.contains("instance10")); + + JsonNode nonStoppableInstances = jsonNode.get( + InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name()); + Assert.assertEquals(getStringSet(nonStoppableInstances, "instance13"), + ImmutableSet.of("HELIX:MIN_ACTIVE_REPLICA_CHECK_FAILED")); + Assert.assertEquals(getStringSet(nonStoppableInstances, "invalidInstance"), + ImmutableSet.of("HELIX:INSTANCE_NOT_EXIST")); + System.out.println("End test :" 
+ TestHelper.getTestMethodName()); + } + @Test + public void testCrossZoneStoppableWithoutZoneOrder() throws IOException { + System.out.println("Start test :" + TestHelper.getTestMethodName()); + String content = String.format( + "{\"%s\":\"%s\",\"%s\":[\"%s\",\"%s\",\"%s\",\"%s\", \"%s\", \"%s\", \"%s\",\"%s\", \"%s\", \"%s\"]," + + "\"%s\":[\"%s\", \"%s\", \"%s\"]}", + InstancesAccessor.InstancesProperties.selection_base.name(), + InstancesAccessor.InstanceHealthSelectionBase.cross_zone_based.name(), + InstancesAccessor.InstancesProperties.instances.name(), "instance1", "instance3", + "instance6", "instance9", "instance10", "instance11", "instance12", "instance13", + "instance14", "invalidInstance", + InstancesAccessor.InstancesProperties.to_be_stopped_instances.name(), "instance0", + "invalidInstance1", "invalidInstance1"); + + Response response = new JerseyUriRequestBuilder( + "clusters/{}/instances?command=stoppable&skipHealthCheckCategories=CUSTOM_INSTANCE_CHECK,CUSTOM_PARTITION_CHECK").format( + STOPPABLE_CLUSTER2).post(this, Entity.entity(content, MediaType.APPLICATION_JSON_TYPE)); + JsonNode jsonNode = OBJECT_MAPPER.readTree(response.readEntity(String.class)); + + Set stoppableSet = getStringSet(jsonNode, + InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name()); + Assert.assertTrue(stoppableSet.contains("instance14") && stoppableSet.contains("instance12") + && stoppableSet.contains("instance11") && stoppableSet.contains("instance10")); + + JsonNode nonStoppableInstances = jsonNode.get( + InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name()); + Assert.assertEquals(getStringSet(nonStoppableInstances, "instance13"), + ImmutableSet.of("HELIX:MIN_ACTIVE_REPLICA_CHECK_FAILED")); + Assert.assertEquals(getStringSet(nonStoppableInstances, "invalidInstance"), + ImmutableSet.of("HELIX:INSTANCE_NOT_EXIST")); + System.out.println("End test :" + TestHelper.getTestMethodName()); + } + + + @Test(dependsOnMethods = 
"testInstanceStoppableZoneBasedWithToBeStoppedInstances") public void testInstanceStoppable_zoneBased_zoneOrder() throws IOException { System.out.println("Start test :" + TestHelper.getTestMethodName()); // Select instances with zone based diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/util/TestInstanceValidationUtilInRest.java b/helix-rest/src/test/java/org/apache/helix/rest/server/util/TestInstanceValidationUtilInRest.java index 35e6399e0f..e37da34ff6 100644 --- a/helix-rest/src/test/java/org/apache/helix/rest/server/util/TestInstanceValidationUtilInRest.java +++ b/helix-rest/src/test/java/org/apache/helix/rest/server/util/TestInstanceValidationUtilInRest.java @@ -22,8 +22,10 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.helix.HelixDataAccessor; import org.apache.helix.PropertyKey; @@ -103,6 +105,52 @@ public void testPartitionLevelCheckInitState() { Assert.assertEquals(failedPartitions.keySet().size(), 2); } + @Test + public void testPartitionLevelCheckWithToBeStoppedNode() { + List externalViews = new ArrayList<>(Arrays.asList(prepareExternalViewOnline())); + Mock mock = new Mock(); + HelixDataAccessor accessor = mock.dataAccessor; + + when(mock.dataAccessor.keyBuilder()) + .thenReturn(new PropertyKey.Builder(TEST_CLUSTER)); + when(mock.dataAccessor + .getProperty(new PropertyKey.Builder(TEST_CLUSTER).stateModelDef(MasterSlaveSMD.name))) + .thenReturn(mock.stateModel); + when(mock.stateModel.getTopState()).thenReturn("MASTER"); + when(mock.stateModel.getInitialState()).thenReturn("OFFLINE"); + + Map> partitionStateMap = new HashMap<>(); + partitionStateMap.put("h1", new HashMap<>()); + partitionStateMap.put("h2", new HashMap<>()); + partitionStateMap.put("h3", new HashMap<>()); + partitionStateMap.put("h4", new HashMap<>()); + + partitionStateMap.get("h1").put("p1", true); + 
partitionStateMap.get("h2").put("p1", true); + partitionStateMap.get("h3").put("p1", true); + partitionStateMap.get("h4").put("p1", true); + + partitionStateMap.get("h1").put("p2", true); + partitionStateMap.get("h2").put("p2", false); + partitionStateMap.get("h3").put("p2", true); + + Set toBeStoppedInstances = new HashSet<>(); + toBeStoppedInstances.add("h3"); + Map> failedPartitions = InstanceValidationUtil.perPartitionHealthCheck( + externalViews, partitionStateMap, "h1", accessor, toBeStoppedInstances); + Assert.assertEquals(failedPartitions.keySet().size(), 1); + Assert.assertEquals(failedPartitions.get("p2").size(), 1); + Assert.assertTrue(failedPartitions.get("p2").contains("UNHEALTHY_PARTITION")); + + toBeStoppedInstances.remove("h3"); + toBeStoppedInstances.add("h2"); + failedPartitions = + InstanceValidationUtil.perPartitionHealthCheck(externalViews, partitionStateMap, "h1", + accessor, toBeStoppedInstances); + // Since we presume h2 as being already stopped, the health status of p2 on h2 will be skipped. 
+ Assert.assertEquals(failedPartitions.keySet().size(), 0); + } + private ExternalView prepareExternalView() { ExternalView externalView = new ExternalView(RESOURCE_NAME); externalView.getRecord() @@ -163,6 +211,22 @@ private ExternalView prepareExternalViewOffline() { return externalView; } + private ExternalView prepareExternalViewOnline() { + ExternalView externalView = new ExternalView(RESOURCE_NAME); + externalView.getRecord() + .setSimpleField(ExternalView.ExternalViewProperty.STATE_MODEL_DEF_REF.toString(), + MasterSlaveSMD.name); + externalView.setState("p1", "h1", "MASTER"); + externalView.setState("p1", "h2", "SLAVE"); + externalView.setState("p1", "h3", "SLAVE"); + + externalView.setState("p2", "h1", "MASTER"); + externalView.setState("p2", "h2", "SLAVE"); + externalView.setState("p2", "h3", "SLAVE"); + + return externalView; + } + private final class Mock { private HelixDataAccessor dataAccessor = mock(HelixDataAccessor.class); private StateModelDefinition stateModel = mock(StateModelDefinition.class);