diff --git a/helix-core/src/main/java/org/apache/helix/util/InstanceValidationUtil.java b/helix-core/src/main/java/org/apache/helix/util/InstanceValidationUtil.java index fdbf7dd1a0..e34e9e6fb4 100644 --- a/helix-core/src/main/java/org/apache/helix/util/InstanceValidationUtil.java +++ b/helix-core/src/main/java/org/apache/helix/util/InstanceValidationUtil.java @@ -370,7 +370,8 @@ public static boolean isInstanceStable(HelixDataAccessor dataAccessor, String in * @param instanceName * @return */ - public static boolean siblingNodesActiveReplicaCheck(HelixDataAccessor dataAccessor, String instanceName) { + public static boolean siblingNodesActiveReplicaCheck(HelixDataAccessor dataAccessor, + String instanceName, Set toBeStoppedInstances) { PropertyKey.Builder propertyKeyBuilder = dataAccessor.keyBuilder(); List resources = dataAccessor.getChildNames(propertyKeyBuilder.idealStates()); @@ -406,8 +407,8 @@ public static boolean siblingNodesActiveReplicaCheck(HelixDataAccessor dataAcces if (stateByInstanceMap.containsKey(instanceName)) { int numHealthySiblings = 0; for (Map.Entry entry : stateByInstanceMap.entrySet()) { - if (!entry.getKey().equals(instanceName) - && !unhealthyStates.contains(entry.getValue())) { + if (!entry.getKey().equals(instanceName) && !toBeStoppedInstances.contains( + entry.getKey()) && !unhealthyStates.contains(entry.getValue())) { numHealthySiblings++; } } diff --git a/helix-core/src/test/java/org/apache/helix/util/TestInstanceValidationUtil.java b/helix-core/src/test/java/org/apache/helix/util/TestInstanceValidationUtil.java index 2c51fc92b2..8e1a849744 100644 --- a/helix-core/src/test/java/org/apache/helix/util/TestInstanceValidationUtil.java +++ b/helix-core/src/test/java/org/apache/helix/util/TestInstanceValidationUtil.java @@ -21,7 +21,9 @@ import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; import com.google.common.collect.ImmutableList; import 
com.google.common.collect.ImmutableMap; @@ -396,11 +398,83 @@ public void TestSiblingNodesActiveReplicaCheck_success() { .getProperty(argThat(new PropertyKeyArgument(PropertyType.STATEMODELDEFS))); boolean result = - InstanceValidationUtil.siblingNodesActiveReplicaCheck(mock.dataAccessor, TEST_INSTANCE); + InstanceValidationUtil.siblingNodesActiveReplicaCheck(mock.dataAccessor, TEST_INSTANCE, Collections.emptySet()); Assert.assertTrue(result); } + @Test + public void TestSiblingNodesActiveReplicaCheckSuccessWithToBeStoppedInstances() { + String resource = "resource"; + Mock mock = new Mock(); + doReturn(ImmutableList.of(resource)).when(mock.dataAccessor) + .getChildNames(argThat(new PropertyKeyArgument(PropertyType.IDEALSTATES))); + // set ideal state + IdealState idealState = mock(IdealState.class); + when(idealState.isEnabled()).thenReturn(true); + when(idealState.isValid()).thenReturn(true); + when(idealState.getStateModelDefRef()).thenReturn("MasterSlave"); + doReturn(idealState).when(mock.dataAccessor).getProperty(argThat(new PropertyKeyArgument(PropertyType.IDEALSTATES))); + + // set external view + ExternalView externalView = mock(ExternalView.class); + when(externalView.getMinActiveReplicas()).thenReturn(2); + when(externalView.getStateModelDefRef()).thenReturn("MasterSlave"); + when(externalView.getPartitionSet()).thenReturn(ImmutableSet.of("db0")); + when(externalView.getStateMap("db0")).thenReturn(ImmutableMap.of(TEST_INSTANCE, "Master", + "instance1", "Slave", "instance2", "Slave", "instance3", "Slave")); + doReturn(externalView).when(mock.dataAccessor) + .getProperty(argThat(new PropertyKeyArgument(PropertyType.EXTERNALVIEW))); + StateModelDefinition stateModelDefinition = mock(StateModelDefinition.class); + when(stateModelDefinition.getInitialState()).thenReturn("OFFLINE"); + doReturn(stateModelDefinition).when(mock.dataAccessor) + .getProperty(argThat(new PropertyKeyArgument(PropertyType.STATEMODELDEFS))); + + Set toBeStoppedInstances = new 
HashSet<>(); + toBeStoppedInstances.add("instance3"); + toBeStoppedInstances.add("invalidInstances"); // include an invalid instance. + boolean result = + InstanceValidationUtil.siblingNodesActiveReplicaCheck(mock.dataAccessor, TEST_INSTANCE, toBeStoppedInstances); + + Assert.assertTrue(result); + } + + @Test + public void TestSiblingNodesActiveReplicaCheckFailsWithToBeStoppedInstances() { + String resource = "resource"; + Mock mock = new Mock(); + doReturn(ImmutableList.of(resource)).when(mock.dataAccessor) + .getChildNames(argThat(new PropertyKeyArgument(PropertyType.IDEALSTATES))); + // set ideal state + IdealState idealState = mock(IdealState.class); + when(idealState.isEnabled()).thenReturn(true); + when(idealState.isValid()).thenReturn(true); + when(idealState.getStateModelDefRef()).thenReturn("MasterSlave"); + doReturn(idealState).when(mock.dataAccessor).getProperty(argThat(new PropertyKeyArgument(PropertyType.IDEALSTATES))); + + // set external view + ExternalView externalView = mock(ExternalView.class); + when(externalView.getMinActiveReplicas()).thenReturn(2); + when(externalView.getStateModelDefRef()).thenReturn("MasterSlave"); + when(externalView.getPartitionSet()).thenReturn(ImmutableSet.of("db0")); + when(externalView.getStateMap("db0")).thenReturn(ImmutableMap.of(TEST_INSTANCE, "Master", + "instance1", "Slave", "instance2", "Slave", "instance3", "Slave")); + doReturn(externalView).when(mock.dataAccessor) + .getProperty(argThat(new PropertyKeyArgument(PropertyType.EXTERNALVIEW))); + StateModelDefinition stateModelDefinition = mock(StateModelDefinition.class); + when(stateModelDefinition.getInitialState()).thenReturn("OFFLINE"); + doReturn(stateModelDefinition).when(mock.dataAccessor) + .getProperty(argThat(new PropertyKeyArgument(PropertyType.STATEMODELDEFS))); + + Set toBeStoppedInstances = new HashSet<>(); + toBeStoppedInstances.add("instance1"); + toBeStoppedInstances.add("instance2"); + boolean result = + 
InstanceValidationUtil.siblingNodesActiveReplicaCheck(mock.dataAccessor, TEST_INSTANCE, toBeStoppedInstances); + + Assert.assertFalse(result); + } + @Test public void TestSiblingNodesActiveReplicaCheck_fail() { String resource = "resource"; @@ -428,7 +502,7 @@ public void TestSiblingNodesActiveReplicaCheck_fail() { .getProperty(argThat(new PropertyKeyArgument(PropertyType.STATEMODELDEFS))); boolean result = - InstanceValidationUtil.siblingNodesActiveReplicaCheck(mock.dataAccessor, TEST_INSTANCE); + InstanceValidationUtil.siblingNodesActiveReplicaCheck(mock.dataAccessor, TEST_INSTANCE, Collections.emptySet()); Assert.assertFalse(result); } @@ -452,7 +526,7 @@ public void TestSiblingNodesActiveReplicaCheck_whenNoMinActiveReplica() { doReturn(externalView).when(mock.dataAccessor) .getProperty(argThat(new PropertyKeyArgument(PropertyType.EXTERNALVIEW))); - boolean result = InstanceValidationUtil.siblingNodesActiveReplicaCheck(mock.dataAccessor, TEST_INSTANCE); + boolean result = InstanceValidationUtil.siblingNodesActiveReplicaCheck(mock.dataAccessor, TEST_INSTANCE, Collections.emptySet()); Assert.assertTrue(result); } @@ -472,7 +546,7 @@ public void TestSiblingNodesActiveReplicaCheck_exception_whenExternalViewUnavail doReturn(null).when(mock.dataAccessor) .getProperty(argThat(new PropertyKeyArgument(PropertyType.EXTERNALVIEW))); - InstanceValidationUtil.siblingNodesActiveReplicaCheck(mock.dataAccessor, TEST_INSTANCE); + InstanceValidationUtil.siblingNodesActiveReplicaCheck(mock.dataAccessor, TEST_INSTANCE, Collections.emptySet()); } private class Mock { diff --git a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java index 529fc469d4..181cd03dab 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java +++ 
b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/MaintenanceManagementService.java @@ -296,7 +296,7 @@ public InstanceInfo getInstanceHealthInfo(String clusterId, String instanceName, } try { Map healthStatus = - getInstanceHealthStatus(clusterId, instanceName, healthChecks); + getInstanceHealthStatus(clusterId, instanceName, healthChecks, Collections.emptySet()); instanceInfoBuilder.healthStatus(healthStatus); } catch (HelixException ex) { LOG.error( @@ -328,7 +328,7 @@ private List getAllOperationClasses(List operations) /** * {@inheritDoc} * Single instance stoppable check implementation is a special case of - * {@link #batchGetInstancesStoppableChecks(String, List, String)} + * {@link #batchGetInstancesStoppableChecks(String, List, String, Set)} *

* Step 1: Perform instance level Helix own health checks * Step 2: Perform instance level client side health checks @@ -339,17 +339,17 @@ private List getAllOperationClasses(List operations) */ public StoppableCheck getInstanceStoppableCheck(String clusterId, String instanceName, String jsonContent) throws IOException { - return batchGetInstancesStoppableChecks(clusterId, ImmutableList.of(instanceName), jsonContent) + return batchGetInstancesStoppableChecks(clusterId, ImmutableList.of(instanceName), jsonContent, Collections.emptySet()) .get(instanceName); } public Map batchGetInstancesStoppableChecks(String clusterId, - List instances, String jsonContent) throws IOException { + List instances, String jsonContent, Set toBeStoppedInstances) throws IOException { Map finalStoppableChecks = new HashMap<>(); // helix instance check. List instancesForCustomInstanceLevelChecks = - batchHelixInstanceStoppableCheck(clusterId, instances, finalStoppableChecks); + batchHelixInstanceStoppableCheck(clusterId, instances, finalStoppableChecks, toBeStoppedInstances); // custom check, includes partition check. 
batchCustomInstanceStoppableCheck(clusterId, instancesForCustomInstanceLevelChecks, finalStoppableChecks, getMapFromJsonPayload(jsonContent)); @@ -441,10 +441,10 @@ private MaintenanceManagementInstanceInfo takeFreeSingleInstanceHelper(String cl } private List batchHelixInstanceStoppableCheck(String clusterId, - Collection instances, Map finalStoppableChecks) { + Collection instances, Map finalStoppableChecks, Set toBeStoppedInstances) { Map> helixInstanceChecks = instances.stream().collect(Collectors .toMap(Function.identity(), - instance -> POOL.submit(() -> performHelixOwnInstanceCheck(clusterId, instance)))); + instance -> POOL.submit(() -> performHelixOwnInstanceCheck(clusterId, instance, toBeStoppedInstances)))); // finalStoppableChecks contains instances that does not pass this health check return filterInstancesForNextCheck(helixInstanceChecks, finalStoppableChecks); } @@ -512,7 +512,7 @@ private Map batchInstanceHealthCheck( if (healthCheck.equals(HELIX_INSTANCE_STOPPABLE_CHECK)) { // this is helix own check instancesForNext = - batchHelixInstanceStoppableCheck(clusterId, instancesForNext, finalStoppableChecks); + batchHelixInstanceStoppableCheck(clusterId, instancesForNext, finalStoppableChecks, Collections.emptySet()); } else if (healthCheck.equals(HELIX_CUSTOM_STOPPABLE_CHECK)) { // custom check, includes custom Instance check and partition check. 
instancesForNext = @@ -601,10 +601,10 @@ private boolean isNonBlockingCheck(StoppableCheck stoppableCheck) { return true; } - private StoppableCheck performHelixOwnInstanceCheck(String clusterId, String instanceName) { + private StoppableCheck performHelixOwnInstanceCheck(String clusterId, String instanceName, Set toBeStoppedInstances) { LOG.info("Perform helix own custom health checks for {}/{}", clusterId, instanceName); Map helixStoppableCheck = - getInstanceHealthStatus(clusterId, instanceName, HealthCheck.STOPPABLE_CHECK_LIST); + getInstanceHealthStatus(clusterId, instanceName, HealthCheck.STOPPABLE_CHECK_LIST, toBeStoppedInstances); return new StoppableCheck(helixStoppableCheck, StoppableCheck.Category.HELIX_OWN_CHECK); } @@ -697,7 +697,7 @@ public static boolean getBooleanFromJsonPayload(String jsonString) @VisibleForTesting protected Map getInstanceHealthStatus(String clusterId, String instanceName, - List healthChecks) { + List healthChecks, Set toBeStoppedInstances) { Map healthStatus = new HashMap<>(); for (HealthCheck healthCheck : healthChecks) { switch (healthCheck) { @@ -745,7 +745,7 @@ protected Map getInstanceHealthStatus(String clusterId, String break; case MIN_ACTIVE_REPLICA_CHECK_FAILED: healthStatus.put(HealthCheck.MIN_ACTIVE_REPLICA_CHECK_FAILED.name(), - InstanceValidationUtil.siblingNodesActiveReplicaCheck(_dataAccessor, instanceName)); + InstanceValidationUtil.siblingNodesActiveReplicaCheck(_dataAccessor, instanceName, toBeStoppedInstances)); break; default: LOG.error("Unsupported health check: {}", healthCheck); diff --git a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java index dafe1ab2d8..30f0a6bde4 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java +++ 
b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; @@ -33,7 +34,6 @@ import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.JsonNodeFactory; import com.fasterxml.jackson.databind.node.ObjectNode; -import org.apache.commons.lang3.NotImplementedException; import org.apache.helix.rest.server.json.cluster.ClusterTopology; import org.apache.helix.rest.server.json.instance.StoppableCheck; @@ -69,14 +69,47 @@ public StoppableInstancesSelector(String clusterId, List orderOfZone, * reasons for non-stoppability. * * @param instances A list of instance to be evaluated. + * @param toBeStoppedInstances A mutable set of instances presumed to be already stopped; instances found stoppable by this call are added to it * @throws IOException */ - public void getStoppableInstancesInSingleZone(List instances) throws IOException { + public void getStoppableInstancesInSingleZone(List instances, + Set toBeStoppedInstances) throws IOException { List zoneBasedInstance = getZoneBasedInstances(instances, _clusterTopology.toZoneMapping()); + getStoppableInstances(zoneBasedInstance, toBeStoppedInstances); + processNonexistentInstances(instances); + } + + /** + * Evaluates and collects stoppable instances across a set of zones based on the order of zones. + * The method iterates through instances, performing stoppable checks, and records reasons for + * non-stoppability. + * + * @param instances A list of instances to be evaluated. 
+ * @param toBeStoppedInstances A mutable set of instances presumed to be already stopped; instances found stoppable in one zone are added to it before the next zone is checked + * @throws IOException + */ + public void getStoppableInstancesCrossZones(List instances, + Set toBeStoppedInstances) throws IOException { + Map> zoneMapping = _clusterTopology.toZoneMapping(); + for (String zone : _orderOfZone) { + Set instanceSet = new HashSet<>(instances); + Set currentZoneInstanceSet = new HashSet<>(zoneMapping.getOrDefault(zone, Collections.emptySet())); // a user-supplied zone order may name a zone missing from the topology; treat it as empty instead of throwing NPE + instanceSet.retainAll(currentZoneInstanceSet); + if (instanceSet.isEmpty()) { + continue; + } + getStoppableInstances(new ArrayList<>(instanceSet), toBeStoppedInstances); + } + processNonexistentInstances(instances); + } + + private void getStoppableInstances(List instances, Set toBeStoppedInstances) + throws IOException { Map instancesStoppableChecks = - _maintenanceService.batchGetInstancesStoppableChecks(_clusterId, zoneBasedInstance, - _customizedInput); + _maintenanceService.batchGetInstancesStoppableChecks(_clusterId, instances, + _customizedInput, toBeStoppedInstances); + for (Map.Entry instanceStoppableCheck : instancesStoppableChecks.entrySet()) { String instance = instanceStoppableCheck.getKey(); StoppableCheck stoppableCheck = instanceStoppableCheck.getValue(); @@ -87,8 +120,14 @@ public void getStoppableInstancesInSingleZone(List instances) throws IOE } } else { _stoppableInstances.add(instance); + // Update the toBeStoppedInstances set with the currently identified stoppable instance. + // This ensures that subsequent checks in other zones are aware of this instance's stoppable status. + toBeStoppedInstances.add(instance); } } + } + + private void processNonexistentInstances(List instances) { // Adding following logic to check whether instances exist or not. 
An instance exist could be // checking following scenario: // 1. Instance got dropped. (InstanceConfig is gone.) 
@@ -105,11 +144,6 @@ public void getStoppableInstancesInSingleZone(List instances) throws IOE } } - public void getStoppableInstancesCrossZones() { - // TODO: Add implementation to enable cross zone stoppable check. - throw new NotImplementedException("Not Implemented"); - } - /** * Determines the order of zones. If an order is provided by the user, it will be used directly. * Otherwise, zones will be ordered by their associated instance count in descending order. @@ -118,10 +152,22 @@ public void getStoppableInstancesCrossZones() { * * @param random Indicates whether to randomize the order of zones. */ - public void calculateOrderOfZone(boolean random) { + public void calculateOrderOfZone(List instances, boolean random) { if (_orderOfZone == null) { - _orderOfZone = - new ArrayList<>(getOrderedZoneToInstancesMap(_clusterTopology.toZoneMapping()).keySet()); + Map> zoneMapping = _clusterTopology.toZoneMapping(); + Map> zoneToInstancesMap = new HashMap<>(); + for (ClusterTopology.Zone zone : _clusterTopology.getZones()) { + Set instanceSet = new HashSet<>(instances); + // TODO: Use instance config from Helix-rest Cache to get the zone instead of reading the topology info + Set currentZoneInstanceSet = new HashSet<>(zoneMapping.get(zone.getId())); + instanceSet.retainAll(currentZoneInstanceSet); + if (instanceSet.isEmpty()) { + continue; + } + zoneToInstancesMap.put(zone.getId(), instanceSet); + } + + _orderOfZone = new ArrayList<>(getOrderedZoneToInstancesMap(zoneToInstancesMap).keySet()); } if (_orderOfZone.isEmpty()) { diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java index 8a21202704..06d7ceabc7 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java @@ -20,7 +20,9 @@ 
*/ import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -67,6 +69,7 @@ public enum InstancesProperties { disabled, selection_base, zone_order, + to_be_stopped_instances, customized_values, instance_stoppable_parallel, instance_not_stoppable_with_reasons @@ -224,6 +227,7 @@ private Response batchGetStoppableInstances(String clusterId, JsonNode node, boo List orderOfZone = null; String customizedInput = null; + Set toBeStoppedInstances = new HashSet<>(); // must be mutable: StoppableInstancesSelector adds each stoppable instance to this set, which an immutable Collections.emptySet() would reject if (node.get(InstancesAccessor.InstancesProperties.customized_values.name()) != null) { customizedInput = node.get(InstancesAccessor.InstancesProperties.customized_values.name()).toString(); @@ -235,7 +239,22 @@ private Response batchGetStoppableInstances(String clusterId, JsonNode node, boo OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, String.class)); if (!orderOfZone.isEmpty() && random) { String message = - "Both 'orderOfZone' and 'random' parameters are set. Please specify only one option."; + "Both 'zone_order' and 'random' parameters are set. Please specify only one option."; + _logger.error(message); + return badRequest(message); + } + } + + if (node.get(InstancesAccessor.InstancesProperties.to_be_stopped_instances.name()) != null) { + toBeStoppedInstances = OBJECT_MAPPER.readValue( + node.get(InstancesProperties.to_be_stopped_instances.name()).toString(), + OBJECT_MAPPER.getTypeFactory().constructCollectionType(Set.class, String.class)); + Set instanceSet = new HashSet<>(instances); + instanceSet.retainAll(toBeStoppedInstances); + if (!instanceSet.isEmpty()) { + String message = + "'to_be_stopped_instances' and 'instances' have intersection: " + instanceSet + + ". 
Please make them mutually exclusive."; _logger.error(message); return badRequest(message); } @@ -265,13 +284,13 @@ private Response batchGetStoppableInstances(String clusterId, JsonNode node, boo .setMaintenanceService(maintenanceService) .setClusterTopology(clusterTopology) .build(); - stoppableInstancesSelector.calculateOrderOfZone(random); + stoppableInstancesSelector.calculateOrderOfZone(instances, random); switch (selectionBase) { case zone_based: - stoppableInstancesSelector.getStoppableInstancesInSingleZone(instances); + stoppableInstancesSelector.getStoppableInstancesInSingleZone(instances, toBeStoppedInstances); break; case cross_zone_based: - stoppableInstancesSelector.getStoppableInstancesCrossZones(); + stoppableInstancesSelector.getStoppableInstancesCrossZones(instances, toBeStoppedInstances); break; case instance_based: default: diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceServiceImpl.java b/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceServiceImpl.java index 59d1a37c0b..99cd6916c4 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceServiceImpl.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceServiceImpl.java @@ -20,6 +20,7 @@ */ import java.io.IOException; +import java.util.Collections; import java.util.List; import java.util.Map; @@ -90,7 +91,7 @@ public StoppableCheck getInstanceStoppableCheck(String clusterId, String instanc public Map batchGetInstancesStoppableChecks(String clusterId, List instances, String jsonContent) throws IOException { return _maintenanceManagementService - .batchGetInstancesStoppableChecks(clusterId, instances, jsonContent); + .batchGetInstancesStoppableChecks(clusterId, instances, jsonContent, Collections.emptySet()); } } diff --git a/helix-rest/src/test/java/org/apache/helix/rest/clusterMaintenanceService/TestMaintenanceManagementService.java 
b/helix-rest/src/test/java/org/apache/helix/rest/clusterMaintenanceService/TestMaintenanceManagementService.java index f8408b0707..a49a95066f 100644 --- a/helix-rest/src/test/java/org/apache/helix/rest/clusterMaintenanceService/TestMaintenanceManagementService.java +++ b/helix-rest/src/test/java/org/apache/helix/rest/clusterMaintenanceService/TestMaintenanceManagementService.java @@ -114,7 +114,7 @@ public MockMaintenanceManagementService(ZKHelixDataAccessor dataAccessor, @Override protected Map getInstanceHealthStatus(String clusterId, String instanceName, - List healthChecks) { + List healthChecks, Set toBeStoppedInstances) { return Collections.emptyMap(); } } @@ -127,7 +127,7 @@ public void testGetInstanceStoppableCheckWhenHelixOwnCheckFail() throws IOExcept _customRestClient, false, false, HelixRestNamespace.DEFAULT_NAMESPACE_NAME) { @Override protected Map getInstanceHealthStatus(String clusterId, - String instanceName, List healthChecks) { + String instanceName, List healthChecks, Set toBeStoppedInstances) { return failedCheck; } }; @@ -147,7 +147,7 @@ public void testGetInstanceStoppableCheckWhenCustomInstanceCheckFail() throws IO _customRestClient, false, false, HelixRestNamespace.DEFAULT_NAMESPACE_NAME) { @Override protected Map getInstanceHealthStatus(String clusterId, - String instanceName, List healthChecks) { + String instanceName, List healthChecks, Set toBeStoppedInstances) { return Collections.emptyMap(); } }; @@ -227,7 +227,7 @@ public void testGetInstanceStoppableCheckWhenCustomInstanceCheckDisabled() throw _customRestClient, false, false, new HashSet<>(Arrays.asList(StoppableCheck.Category.CUSTOM_INSTANCE_CHECK)), HelixRestNamespace.DEFAULT_NAMESPACE_NAME); - + StoppableCheck actual = service.getInstanceStoppableCheck(TEST_CLUSTER, TEST_INSTANCE, ""); List expectedFailedChecks = Arrays.asList( StoppableCheck.Category.CUSTOM_PARTITION_CHECK.getPrefix() @@ -246,7 +246,7 @@ public void testGetInstanceStoppableCheckConnectionRefused() throws 
IOException _customRestClient, false, false, HelixRestNamespace.DEFAULT_NAMESPACE_NAME) { @Override protected Map getInstanceHealthStatus(String clusterId, - String instanceName, List healthChecks) { + String instanceName, List healthChecks, Set toBeStoppedInstances) { return Collections.emptyMap(); } }; @@ -365,7 +365,7 @@ public void testGetStoppableWithAllChecks() throws IOException { HelixRestNamespace.DEFAULT_NAMESPACE_NAME) { @Override protected Map getInstanceHealthStatus(String clusterId, - String instanceName, List healthChecks) { + String instanceName, List healthChecks, Set toBeStoppedInstances) { return instanceHealthFailedCheck; } }; @@ -393,7 +393,7 @@ public void testGetInstanceStoppableCheckWhenPartitionsCheckFail() throws IOExce _customRestClient, false, false, HelixRestNamespace.DEFAULT_NAMESPACE_NAME) { @Override protected Map getInstanceHealthStatus(String clusterId, - String instanceName, List healthChecks) { + String instanceName, List healthChecks, Set toBeStoppedInstances) { return Collections.emptyMap(); } }; diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java index a7cf91c7b8..68561ce839 100644 --- a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java +++ b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java @@ -54,6 +54,7 @@ import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.IdealState; import org.apache.helix.model.InstanceConfig; +import org.apache.helix.model.RESTConfig; import org.apache.helix.participant.StateMachineEngine; import org.apache.helix.rest.common.ContextPropertyKeys; import org.apache.helix.rest.common.HelixRestNamespace; @@ -129,9 +130,14 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest { protected static HelixZkClient _gZkClientTestNS; protected static BaseDataAccessor _baseAccessorTestNS; protected 
static final String STOPPABLE_CLUSTER = "StoppableTestCluster"; + protected static final String STOPPABLE_CLUSTER2 = "StoppableTestCluster2"; protected static final String TASK_TEST_CLUSTER = "TaskTestCluster"; protected static final List STOPPABLE_INSTANCES = Arrays.asList("instance0", "instance1", "instance2", "instance3", "instance4", "instance5"); + protected static final List STOPPABLE_INSTANCES2 = + Arrays.asList("instance0", "instance1", "instance2", "instance3", "instance4", "instance5", + "instance6", "instance7", "instance8", "instance9", "instance10", "instance11", + "instance12", "instance13", "instance14"); protected static Set _clusters; protected static String _superCluster = "superCluster"; @@ -329,13 +335,14 @@ protected void setupHelixResources() throws Exception { _configAccessor.setClusterConfig(cluster, clusterConfig); createResourceConfigs(cluster, 8); _workflowMap.put(cluster, createWorkflows(cluster, 3)); - Set resources = createResources(cluster, 8); + Set resources = createResources(cluster, 8, MIN_ACTIVE_REPLICA, NUM_REPLICA); _instancesMap.put(cluster, instances); _liveInstancesMap.put(cluster, liveInstances); _resourcesMap.put(cluster, resources); _clusterControllerManagers.add(startController(cluster)); } preSetupForParallelInstancesStoppableTest(STOPPABLE_CLUSTER, STOPPABLE_INSTANCES); + preSetupForCrosszoneParallelInstancesStoppableTest(STOPPABLE_CLUSTER2, STOPPABLE_INSTANCES2); } protected Set createInstances(String cluster, int numInstances) { @@ -348,16 +355,17 @@ protected Set createInstances(String cluster, int numInstances) { return instances; } - protected Set createResources(String cluster, int numResources) { + protected Set createResources(String cluster, int numResources, int minActiveReplica, + int replicationFactor) { Set resources = new HashSet<>(); for (int i = 0; i < numResources; i++) { String resource = cluster + "_db_" + i; _gSetupTool.addResourceToCluster(cluster, resource, NUM_PARTITIONS, "MasterSlave"); 
IdealState idealState = _gSetupTool.getClusterManagementTool().getResourceIdealState(cluster, resource); - idealState.setMinActiveReplicas(MIN_ACTIVE_REPLICA); + idealState.setMinActiveReplicas(minActiveReplica); _gSetupTool.getClusterManagementTool().setResourceIdealState(cluster, resource, idealState); - _gSetupTool.rebalanceStorageCluster(cluster, resource, NUM_REPLICA); + _gSetupTool.rebalanceStorageCluster(cluster, resource, replicationFactor); resources.add(resource); } return resources; @@ -575,7 +583,7 @@ private void preSetupForParallelInstancesStoppableTest(String clusterName, // Start participant startInstances(clusterName, new TreeSet<>(instances), 3); - createResources(clusterName, 1); + createResources(clusterName, 1, MIN_ACTIVE_REPLICA, NUM_REPLICA); _clusterControllerManagers.add(startController(clusterName)); // Make sure that cluster config exists @@ -606,6 +614,65 @@ private void preSetupForParallelInstancesStoppableTest(String clusterName, _workflowMap.put(STOPPABLE_CLUSTER, createWorkflows(STOPPABLE_CLUSTER, 3)); } + private void preSetupForCrosszoneParallelInstancesStoppableTest(String clusterName, + List instances) throws Exception { + _gSetupTool.addCluster(clusterName, true); + ClusterConfig clusterConfig = _configAccessor.getClusterConfig(clusterName); + clusterConfig.setFaultZoneType("helixZoneId"); + clusterConfig.setPersistIntermediateAssignment(true); + _configAccessor.setClusterConfig(clusterName, clusterConfig); + RESTConfig emptyRestConfig = new RESTConfig(clusterName); + _configAccessor.setRESTConfig(clusterName, emptyRestConfig); + // Create instance configs + List instanceConfigs = new ArrayList<>(); + int perZoneInstancesCount = 3; + int curZoneCount = 0, zoneId = 1; + for (int i = 0; i < instances.size(); i++) { + InstanceConfig instanceConfig = new InstanceConfig(instances.get(i)); + instanceConfig.setDomain("helixZoneId=zone" + zoneId + ",host=instance" + i); + if (++curZoneCount >= perZoneInstancesCount) { + curZoneCount = 
0; + zoneId++; + } + instanceConfigs.add(instanceConfig); + } + + for (InstanceConfig instanceConfig : instanceConfigs) { + _gSetupTool.getClusterManagementTool().addInstance(clusterName, instanceConfig); + } + + // Start participant + startInstances(clusterName, new TreeSet<>(instances), instances.size()); + createResources(clusterName, 1, 2, 3); + _clusterControllerManagers.add(startController(clusterName)); + + // Make sure that cluster config exists + boolean isClusterConfigExist = TestHelper.verify(() -> { + ClusterConfig stoppableClusterConfig; + try { + stoppableClusterConfig = _configAccessor.getClusterConfig(clusterName); + } catch (Exception e) { + return false; + } + return (stoppableClusterConfig != null); + }, TestHelper.WAIT_DURATION); + Assert.assertTrue(isClusterConfigExist); + // Make sure that instance config exists for the instance0 to instance5 + for (String instance: instances) { + boolean isinstanceConfigExist = TestHelper.verify(() -> { + InstanceConfig instanceConfig; + try { + instanceConfig = _configAccessor.getInstanceConfig(clusterName, instance); + } catch (Exception e) { + return false; + } + return (instanceConfig != null); + }, TestHelper.WAIT_DURATION); + Assert.assertTrue(isinstanceConfigExist); + } + _clusters.add(clusterName); + _workflowMap.put(clusterName, createWorkflows(clusterName, 3)); + } /** * Starts a HelixRestServer for the test suite. 
* @return diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java index 01701a4864..2bc539a4d4 100644 --- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java +++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java @@ -40,12 +40,167 @@ import org.apache.helix.rest.server.util.JerseyUriRequestBuilder; import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier; import org.testng.Assert; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; public class TestInstancesAccessor extends AbstractTestClass { private final static String CLUSTER_NAME = "TestCluster_0"; + @DataProvider + public Object[][] generatePayloadCrossZoneStoppableCheckWithZoneOrder() { + return new Object[][]{ + {String.format( + "{\"%s\":\"%s\",\"%s\":[\"%s\",\"%s\",\"%s\",\"%s\", \"%s\", \"%s\", \"%s\"," + + " \"%s\", \"%s\", \"%s\", \"%s\", \"%s\", \"%s\", \"%s\", \"%s\"]," + + "\"%s\":[\"%s\", \"%s\", \"%s\", \"%s\", \"%s\"], \"%s\":[\"%s\"]}", + InstancesAccessor.InstancesProperties.selection_base.name(), + InstancesAccessor.InstanceHealthSelectionBase.cross_zone_based.name(), + InstancesAccessor.InstancesProperties.instances.name(), "instance1", "instance2", + "instance3", "instance4", "instance5", "instance6", "instance7", "instance8", + "instance9", "instance10", "instance11", "instance12", "instance13", "instance14", + "invalidInstance", + InstancesAccessor.InstancesProperties.zone_order.name(),"zone5", "zone4", "zone3", "zone2", + "zone1", + InstancesAccessor.InstancesProperties.to_be_stopped_instances.name(), + "instance0"), + }, + {String.format( + "{\"%s\":\"%s\",\"%s\":[\"%s\",\"%s\",\"%s\",\"%s\", \"%s\", \"%s\", \"%s\",\"%s\", \"%s\", \"%s\"]," + + "\"%s\":[\"%s\", \"%s\", \"%s\", \"%s\", \"%s\"], \"%s\":[\"%s\", \"%s\", \"%s\"]}", + 
InstancesAccessor.InstancesProperties.selection_base.name(), + InstancesAccessor.InstanceHealthSelectionBase.cross_zone_based.name(), + InstancesAccessor.InstancesProperties.instances.name(), "instance1", "instance3", + "instance6", "instance9", "instance10", "instance11", "instance12", "instance13", + "instance14", "invalidInstance", + InstancesAccessor.InstancesProperties.zone_order.name(), "zone5", "zone4", "zone1", + "zone3", "zone2", InstancesAccessor.InstancesProperties.to_be_stopped_instances.name(), + "instance0", "invalidInstance1", "invalidInstance1"), + } + }; + } + + @Test + public void testInstanceStoppableZoneBasedWithToBeStoppedInstances() throws IOException { + System.out.println("Start test :" + TestHelper.getTestMethodName()); + + String content = String.format( + "{\"%s\":\"%s\",\"%s\":[\"%s\",\"%s\",\"%s\",\"%s\",\"%s\", \"%s\"], \"%s\":[\"%s\",\"%s\"], \"%s\":[\"%s\", \"%s\", \"%s\"]}", + InstancesAccessor.InstancesProperties.selection_base.name(), + InstancesAccessor.InstanceHealthSelectionBase.zone_based.name(), + InstancesAccessor.InstancesProperties.instances.name(), "instance1", + "instance2", "instance3", "instance4", "instance5", "invalidInstance", + InstancesAccessor.InstancesProperties.zone_order.name(), "zone2", "zone1", + InstancesAccessor.InstancesProperties.to_be_stopped_instances.name(), "instance0", "instance6", "invalidInstance1"); + + Response response = new JerseyUriRequestBuilder( + "clusters/{}/instances?command=stoppable&skipHealthCheckCategories=CUSTOM_INSTANCE_CHECK,CUSTOM_PARTITION_CHECK").format( + STOPPABLE_CLUSTER2).post(this, Entity.entity(content, MediaType.APPLICATION_JSON_TYPE)); + JsonNode jsonNode = OBJECT_MAPPER.readTree(response.readEntity(String.class)); + + Set stoppableSet = getStringSet(jsonNode, + InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name()); + Assert.assertTrue(stoppableSet.contains("instance4") && stoppableSet.contains("instance3")); + + JsonNode nonStoppableInstances = 
jsonNode.get( + InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name()); + // "StoppableTestCluster2_db_0_3" : { "instance0" : "MASTER", "instance13" : "SLAVE", "instance5" : "SLAVE"}. + // Since instance0 is to_be_stopped and MIN_ACTIVE_REPLICA is 2, instance5 is not stoppable. + Assert.assertEquals(getStringSet(nonStoppableInstances, "instance5"), + ImmutableSet.of("HELIX:MIN_ACTIVE_REPLICA_CHECK_FAILED")); + Assert.assertEquals(getStringSet(nonStoppableInstances, "invalidInstance"), + ImmutableSet.of("HELIX:INSTANCE_NOT_EXIST")); + + System.out.println("End test :" + TestHelper.getTestMethodName()); + } + + @Test + public void testInstanceStoppableZoneBasedWithoutZoneOrder() throws IOException { + System.out.println("Start test :" + TestHelper.getTestMethodName()); + String content = String.format( + "{\"%s\":\"%s\",\"%s\":[\"%s\",\"%s\",\"%s\",\"%s\",\"%s\",\"%s\"], \"%s\":[\"%s\", \"%s\", \"%s\"]}", + InstancesAccessor.InstancesProperties.selection_base.name(), + InstancesAccessor.InstanceHealthSelectionBase.zone_based.name(), + InstancesAccessor.InstancesProperties.instances.name(), "instance0", "instance1", + "instance2", "instance3", "instance4", "invalidInstance", + InstancesAccessor.InstancesProperties.to_be_stopped_instances.name(), + "instance7", "instance9", "instance10"); + + Response response = new JerseyUriRequestBuilder( + "clusters/{}/instances?command=stoppable&skipHealthCheckCategories=CUSTOM_INSTANCE_CHECK,CUSTOM_PARTITION_CHECK").format( + STOPPABLE_CLUSTER2).post(this, Entity.entity(content, MediaType.APPLICATION_JSON_TYPE)); + JsonNode jsonNode = OBJECT_MAPPER.readTree(response.readEntity(String.class)); + + // Without zone order, helix should pick the zone1 because it has higher instance count than zone2. 
+ Set stoppableSet = getStringSet(jsonNode, + InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name()); + Assert.assertTrue(stoppableSet.contains("instance0") && stoppableSet.contains("instance1")); + + JsonNode nonStoppableInstances = jsonNode.get( + InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name()); + Assert.assertEquals(getStringSet(nonStoppableInstances, "instance2"), + ImmutableSet.of("HELIX:MIN_ACTIVE_REPLICA_CHECK_FAILED")); + Assert.assertEquals(getStringSet(nonStoppableInstances, "invalidInstance"), + ImmutableSet.of("HELIX:INSTANCE_NOT_EXIST")); + System.out.println("End test :" + TestHelper.getTestMethodName()); + } + + @Test(dataProvider = "generatePayloadCrossZoneStoppableCheckWithZoneOrder") + public void testCrossZoneStoppableWithZoneOrder(String content) throws IOException { + System.out.println("Start test :" + TestHelper.getTestMethodName()); + Response response = new JerseyUriRequestBuilder( + "clusters/{}/instances?command=stoppable&skipHealthCheckCategories=CUSTOM_INSTANCE_CHECK,CUSTOM_PARTITION_CHECK").format( + STOPPABLE_CLUSTER2).post(this, Entity.entity(content, MediaType.APPLICATION_JSON_TYPE)); + JsonNode jsonNode = OBJECT_MAPPER.readTree(response.readEntity(String.class)); + + Set stoppableSet = getStringSet(jsonNode, + InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name()); + Assert.assertTrue(stoppableSet.contains("instance14") && stoppableSet.contains("instance12") + && stoppableSet.contains("instance11") && stoppableSet.contains("instance10")); + + JsonNode nonStoppableInstances = jsonNode.get( + InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name()); + Assert.assertEquals(getStringSet(nonStoppableInstances, "instance13"), + ImmutableSet.of("HELIX:MIN_ACTIVE_REPLICA_CHECK_FAILED")); + Assert.assertEquals(getStringSet(nonStoppableInstances, "invalidInstance"), + ImmutableSet.of("HELIX:INSTANCE_NOT_EXIST")); + System.out.println("End test :" 
+ TestHelper.getTestMethodName()); + } + @Test + public void testCrossZoneStoppableWithoutZoneOrder() throws IOException { + System.out.println("Start test :" + TestHelper.getTestMethodName()); + String content = String.format( + "{\"%s\":\"%s\",\"%s\":[\"%s\",\"%s\",\"%s\",\"%s\", \"%s\", \"%s\", \"%s\",\"%s\", \"%s\", \"%s\"]," + + "\"%s\":[\"%s\", \"%s\", \"%s\"]}", + InstancesAccessor.InstancesProperties.selection_base.name(), + InstancesAccessor.InstanceHealthSelectionBase.cross_zone_based.name(), + InstancesAccessor.InstancesProperties.instances.name(), "instance1", "instance3", + "instance6", "instance9", "instance10", "instance11", "instance12", "instance13", + "instance14", "invalidInstance", + InstancesAccessor.InstancesProperties.to_be_stopped_instances.name(), "instance0", + "invalidInstance1", "invalidInstance1"); + + Response response = new JerseyUriRequestBuilder( + "clusters/{}/instances?command=stoppable&skipHealthCheckCategories=CUSTOM_INSTANCE_CHECK,CUSTOM_PARTITION_CHECK").format( + STOPPABLE_CLUSTER2).post(this, Entity.entity(content, MediaType.APPLICATION_JSON_TYPE)); + JsonNode jsonNode = OBJECT_MAPPER.readTree(response.readEntity(String.class)); + + Set stoppableSet = getStringSet(jsonNode, + InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name()); + Assert.assertTrue(stoppableSet.contains("instance14") && stoppableSet.contains("instance12") + && stoppableSet.contains("instance11") && stoppableSet.contains("instance10")); + + JsonNode nonStoppableInstances = jsonNode.get( + InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name()); + Assert.assertEquals(getStringSet(nonStoppableInstances, "instance13"), + ImmutableSet.of("HELIX:MIN_ACTIVE_REPLICA_CHECK_FAILED")); + Assert.assertEquals(getStringSet(nonStoppableInstances, "invalidInstance"), + ImmutableSet.of("HELIX:INSTANCE_NOT_EXIST")); + System.out.println("End test :" + TestHelper.getTestMethodName()); + } + + + @Test(dependsOnMethods = 
"testInstanceStoppableZoneBasedWithToBeStoppedInstances") public void testInstanceStoppable_zoneBased_zoneOrder() throws IOException { System.out.println("Start test :" + TestHelper.getTestMethodName()); // Select instances with zone based