diff --git a/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClientImpl.java b/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClientImpl.java index db943b3ddb..b09116ac99 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClientImpl.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/client/CustomRestClientImpl.java @@ -132,8 +132,15 @@ public Map> getAggregatedStoppableCheck(String baseUrl, if (instances != null && !instances.isEmpty()) { payLoads.put("instances", instances); } - if (toBeStoppedInstances != null && !toBeStoppedInstances.isEmpty()) { - payLoads.put("to_be_stopped_instances", toBeStoppedInstances); + // Before sending the request, make sure the toBeStoppedInstances has no overlap with instances + Set remainingToBeStoppedInstances = toBeStoppedInstances; + if (instances != null && toBeStoppedInstances != null) { + remainingToBeStoppedInstances = + toBeStoppedInstances.stream().filter(ins -> !instances.contains(ins)) + .collect(Collectors.toSet()); + } + if (remainingToBeStoppedInstances != null && !remainingToBeStoppedInstances.isEmpty()) { + payLoads.put("to_be_stopped_instances", remainingToBeStoppedInstances); } if (clusterId != null) { payLoads.put("cluster_id", clusterId); diff --git a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java index bb5a2bc5c4..8916991008 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java @@ -86,8 +86,7 @@ public ObjectNode getStoppableInstancesInSingleZone(List instances, result.putArray(InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name()); ObjectNode failedStoppableInstances = result.putObject( InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name()); - Set toBeStoppedInstancesSet = new HashSet<>(toBeStoppedInstances); - collectEvacuatingInstances(toBeStoppedInstancesSet); + Set toBeStoppedInstancesSet = findToBeStoppedInstances(toBeStoppedInstances); List zoneBasedInstance = getZoneBasedInstances(instances, _clusterTopology.toZoneMapping()); @@ -118,8 +117,7 @@ public ObjectNode getStoppableInstancesCrossZones(List instances, result.putArray(InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name()); ObjectNode failedStoppableInstances = result.putObject( InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name()); - Set toBeStoppedInstancesSet = new HashSet<>(toBeStoppedInstances); - collectEvacuatingInstances(toBeStoppedInstancesSet); + Set toBeStoppedInstancesSet = findToBeStoppedInstances(toBeStoppedInstances); Map> zoneMapping = _clusterTopology.toZoneMapping(); for (String zone : _orderOfZone) { @@ -136,6 +134,39 @@ public ObjectNode getStoppableInstancesCrossZones(List instances, return result; } + /** + * Evaluates and collects stoppable instances not based on the zone order. + * The method iterates through instances, performing stoppable checks, and records reasons for + * non-stoppability. + * + * @param instances A list of instance to be evaluated. + * @param toBeStoppedInstances A list of instances presumed to be already stopped + * @return An ObjectNode containing: + * - 'stoppableNode': List of instances that can be stopped. + * - 'instance_not_stoppable_with_reasons': A map with the instance name as the key and + * a list of reasons for non-stoppability as the value. + * @throws IOException + */ + public ObjectNode getStoppableInstancesNonZoneBased(List instances, + List toBeStoppedInstances) throws IOException { + ObjectNode result = JsonNodeFactory.instance.objectNode(); + ArrayNode stoppableInstances = + result.putArray(InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name()); + ObjectNode failedStoppableInstances = result.putObject( + InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name()); + Set toBeStoppedInstancesSet = findToBeStoppedInstances(toBeStoppedInstances); + + // Because zone order calculation is omitted, we must verify each instance's existence + // to ensure we only process valid instances before performing stoppable check. + Set nonExistingInstances = processNonexistentInstances(instances, failedStoppableInstances); + List instancesToCheck = new ArrayList<>(instances); + instancesToCheck.removeAll(nonExistingInstances); + populateStoppableInstances(instancesToCheck, toBeStoppedInstancesSet, stoppableInstances, + failedStoppableInstances); + + return result; + } + private void populateStoppableInstances(List instances, Set toBeStoppedInstances, ArrayNode stoppableInstances, ObjectNode failedStoppableInstances) throws IOException { Map instancesStoppableChecks = @@ -159,7 +190,7 @@ private void populateStoppableInstances(List instances, Set toBe } } - private void processNonexistentInstances(List instances, ObjectNode failedStoppableInstances) { + private Set processNonexistentInstances(List instances, ObjectNode failedStoppableInstances) { // Adding following logic to check whether instances exist or not. An instance exist could be // checking following scenario: // 1. Instance got dropped. (InstanceConfig is gone.) @@ -174,6 +205,7 @@ private void processNonexistentInstances(List instances, ObjectNode fail ArrayNode failedReasonsNode = failedStoppableInstances.putArray(nonSelectedInstance); failedReasonsNode.add(JsonNodeFactory.instance.textNode(INSTANCE_NOT_EXIST)); } + return nonSelectedInstances; } /** @@ -258,21 +290,26 @@ private Map> getOrderedZoneToInstancesMap( } /** - * Collect instances marked for evacuation in the current topology and add them into the given set + * Collect instances within the cluster where the instance operation is set to EVACUATE, SWAP_IN, or UNKNOWN. + * And return them as a set. * - * @param toBeStoppedInstances A set of instances we presume to be stopped. + * @param toBeStoppedInstances A list of instances we presume to be stopped. */ - private void collectEvacuatingInstances(Set toBeStoppedInstances) { + private Set findToBeStoppedInstances(List toBeStoppedInstances) { + Set toBeStoppedInstancesSet = new HashSet<>(toBeStoppedInstances); Set allInstances = _clusterTopology.getAllInstances(); for (String instance : allInstances) { PropertyKey.Builder propertyKeyBuilder = _dataAccessor.keyBuilder(); InstanceConfig instanceConfig = _dataAccessor.getProperty(propertyKeyBuilder.instanceConfig(instance)); - if (InstanceConstants.InstanceOperation.EVACUATE.equals( - instanceConfig.getInstanceOperation().getOperation())) { - toBeStoppedInstances.add(instance); + InstanceConstants.InstanceOperation operation = instanceConfig.getInstanceOperation().getOperation(); + if (operation == InstanceConstants.InstanceOperation.EVACUATE + || operation == InstanceConstants.InstanceOperation.SWAP_IN + || operation == InstanceConstants.InstanceOperation.UNKNOWN) { + toBeStoppedInstancesSet.add(instance); } } + return toBeStoppedInstancesSet; } public static class StoppableInstancesSelectorBuilder { diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java index d24ad9fcea..52313a66b0 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java @@ -82,7 +82,7 @@ public enum InstancesProperties { } public enum InstanceHealthSelectionBase { - instance_based, + non_zone_based, zone_based, cross_zone_based } @@ -224,12 +224,17 @@ private Response batchGetStoppableInstances(String clusterId, JsonNode node, boo boolean random) throws IOException { try { // TODO: Process input data from the content + // TODO: Implement the logic to automatically detect the selection base. https://github.com/apache/helix/issues/2968#issue-2691677799 InstancesAccessor.InstanceHealthSelectionBase selectionBase = - InstancesAccessor.InstanceHealthSelectionBase.valueOf( + node.get(InstancesAccessor.InstancesProperties.selection_base.name()) == null + ? InstanceHealthSelectionBase.non_zone_based : InstanceHealthSelectionBase.valueOf( node.get(InstancesAccessor.InstancesProperties.selection_base.name()).textValue()); + List instances = OBJECT_MAPPER.readValue( node.get(InstancesAccessor.InstancesProperties.instances.name()).toString(), OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, String.class)); + ClusterService clusterService = + new ClusterServiceImpl(getDataAccssor(clusterId), getConfigAccessor()); List orderOfZone = null; String customizedInput = null; @@ -252,6 +257,12 @@ private Response batchGetStoppableInstances(String clusterId, JsonNode node, boo _logger.error(message); return badRequest(message); } + if (!orderOfZone.isEmpty() && selectionBase == InstanceHealthSelectionBase.non_zone_based) { + String message = + "'zone_order' is set but 'selection_base' is 'non_zone_based'. Please set 'selection_base' to 'zone_based' or 'cross_zone_based'."; + _logger.error(message); + return badRequest(message); + } } if (node.get(InstancesAccessor.InstancesProperties.to_be_stopped_instances.name()) != null) { @@ -285,6 +296,33 @@ private Response batchGetStoppableInstances(String clusterId, JsonNode node, boo } } + ClusterTopology clusterTopology = clusterService.getClusterTopology(clusterId); + if (selectionBase != InstanceHealthSelectionBase.non_zone_based) { + if (!clusterService.isClusterTopologyAware(clusterId)) { + String message = "Cluster " + clusterId + + " is not topology aware. Please enable the topology in cluster config or set " + + "'selection_base' to 'non_zone_based'."; + _logger.error(message); + return badRequest(message); + } + + // Find instances that lack topology information + Set instancesWithTopology = + clusterTopology.toZoneMapping().entrySet().stream().flatMap(entry -> entry.getValue().stream()) + .collect(Collectors.toSet()); + Set allInstances = clusterTopology.getAllInstances(); + Set topologyUnawareInstances = new HashSet<>(instances).stream().filter( + instance -> !instancesWithTopology.contains(instance) && allInstances.contains(instance)) + .collect(Collectors.toSet()); + if (!topologyUnawareInstances.isEmpty()) { + String message = "Instances " + topologyUnawareInstances + + " do not have topology information. Please set topology information in instance config or" + + " set 'selection_base' to 'non_zone_based'."; + _logger.error(message); + return badRequest(message); + } + } + String namespace = getNamespace(); MaintenanceManagementService maintenanceService = new MaintenanceManagementService.MaintenanceManagementServiceBuilder() @@ -299,9 +337,6 @@ private Response batchGetStoppableInstances(String clusterId, JsonNode node, boo .setSkipStoppableHealthCheckList(skipStoppableCheckList) .build(); - ClusterService clusterService = - new ClusterServiceImpl(getDataAccssor(clusterId), getConfigAccessor()); - ClusterTopology clusterTopology = clusterService.getClusterTopology(clusterId); StoppableInstancesSelector stoppableInstancesSelector = new StoppableInstancesSelector.StoppableInstancesSelectorBuilder() .setClusterId(clusterId) @@ -311,18 +346,20 @@ private Response batchGetStoppableInstances(String clusterId, JsonNode node, boo .setClusterTopology(clusterTopology) .setDataAccessor((ZKHelixDataAccessor) getDataAccssor(clusterId)) .build(); - stoppableInstancesSelector.calculateOrderOfZone(instances, random); ObjectNode result; - // TODO: Add support for clusters that do not have topology set up. - // Issue #2893: https://github.com/apache/helix/issues/2893 + switch (selectionBase) { case zone_based: + stoppableInstancesSelector.calculateOrderOfZone(instances, random); result = stoppableInstancesSelector.getStoppableInstancesInSingleZone(instances, toBeStoppedInstances); break; case cross_zone_based: + stoppableInstancesSelector.calculateOrderOfZone(instances, random); result = stoppableInstancesSelector.getStoppableInstancesCrossZones(instances, toBeStoppedInstances); break; - case instance_based: + case non_zone_based: + result = stoppableInstancesSelector.getStoppableInstancesNonZoneBased(instances, toBeStoppedInstances); + break; default: throw new UnsupportedOperationException("instance_based selection is not supported yet!"); } diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/service/ClusterService.java b/helix-rest/src/main/java/org/apache/helix/rest/server/service/ClusterService.java index d789e36159..db93571e85 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/service/ClusterService.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/service/ClusterService.java @@ -41,4 +41,11 @@ public interface ClusterService { * @return */ ClusterInfo getClusterInfo(String clusterId); + + /** + * Check if the cluster is topology aware + * @param clusterId + * @return + */ + boolean isClusterTopologyAware(String clusterId); } diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/service/ClusterServiceImpl.java b/helix-rest/src/main/java/org/apache/helix/rest/server/service/ClusterServiceImpl.java index b4667fb13a..a152c3e647 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/service/ClusterServiceImpl.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/service/ClusterServiceImpl.java @@ -25,10 +25,12 @@ import java.util.Map; import java.util.stream.Collectors; +import io.netty.util.internal.StringUtil; import org.apache.helix.AccessOption; import org.apache.helix.ConfigAccessor; import org.apache.helix.HelixDataAccessor; import org.apache.helix.PropertyKey; +import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.LiveInstance; import org.apache.helix.rest.server.json.cluster.ClusterInfo; @@ -102,4 +104,11 @@ public ClusterInfo getClusterInfo(String clusterId) { .instances(_dataAccessor.getChildNames(keyBuilder.instances())) .liveInstances(_dataAccessor.getChildNames(keyBuilder.liveInstances())).build(); } + + @Override + public boolean isClusterTopologyAware(String clusterId) { + ClusterConfig config = _configAccessor.getClusterConfig(clusterId); + return config.isTopologyAwareEnabled() && !StringUtil.isNullOrEmpty(config.getFaultZoneType()) + && !StringUtil.isNullOrEmpty(config.getTopology()); + } } diff --git a/helix-rest/src/test/java/org/apache/helix/rest/client/TestCustomRestClient.java b/helix-rest/src/test/java/org/apache/helix/rest/client/TestCustomRestClient.java index 3d81dc432a..6c8f4be403 100644 --- a/helix-rest/src/test/java/org/apache/helix/rest/client/TestCustomRestClient.java +++ b/helix-rest/src/test/java/org/apache/helix/rest/client/TestCustomRestClient.java @@ -40,14 +40,18 @@ import org.apache.http.conn.ConnectTimeoutException; import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; import org.junit.Assert; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class TestCustomRestClient { @@ -260,4 +264,39 @@ public void testGetAggregatedStoppableCheck() throws IOException { Assert.assertTrue(Arrays.stream(healthyInstances).allMatch(instance -> clusterHealth.get(instance).isEmpty())); Assert.assertTrue(Arrays.stream(nonStoppableInstances).noneMatch(instance -> clusterHealth.get(instance).isEmpty())); } + + @Test(description = "Test if the aggregated stoppable check request has the correct format when there" + + "are duplicate instances in the instances list and the toBeStoppedInstances list.") + public void testAggregatedCheckRemoveDuplicateInstances() + throws IOException { + String clusterId = "cluster1"; + + MockCustomRestClient customRestClient = new MockCustomRestClient(_httpClient); + HttpResponse httpResponse = mock(HttpResponse.class); + StatusLine statusLine = mock(StatusLine.class); + + when(statusLine.getStatusCode()).thenReturn(HttpStatus.SC_OK); + when(httpResponse.getStatusLine()).thenReturn(statusLine); + when(_httpClient.execute(any(HttpPost.class))).thenReturn(httpResponse); + + customRestClient.getAggregatedStoppableCheck(HTTP_LOCALHOST, + ImmutableList.of("n1", "n2"), + ImmutableSet.of("n1"), clusterId, Collections.emptyMap()); + + // Make sure that the duplicate instances are removed from the toBeStoppedInstances list + ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + verify(_httpClient).execute(argThat(x -> { + String request = null; + try { + request = EntityUtils.toString(((HttpPost) x).getEntity()); + JsonNode node = OBJECT_MAPPER.readTree(request); + String instancesInRequest = node.get("instances").toString(); + Assert.assertEquals(instancesInRequest, "[\"n1\",\"n2\"]"); + Assert.assertNull(node.get("to_be_stopped_instances")); + return true; + } catch (IOException e) { + throw new RuntimeException(e); + } + })); + } } diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java index ad7c4482f3..ee2346d2d4 100644 --- a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java +++ b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java @@ -132,6 +132,7 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest { protected static BaseDataAccessor _baseAccessorTestNS; protected static final String STOPPABLE_CLUSTER = "StoppableTestCluster"; protected static final String STOPPABLE_CLUSTER2 = "StoppableTestCluster2"; + protected static final String STOPPABLE_CLUSTER3 = "StoppableTestCluster3"; protected static final String TASK_TEST_CLUSTER = "TaskTestCluster"; protected static final List STOPPABLE_INSTANCES = Arrays.asList("instance0", "instance1", "instance2", "instance3", "instance4", "instance5"); @@ -343,6 +344,7 @@ protected void setupHelixResources() throws Exception { } preSetupForParallelInstancesStoppableTest(STOPPABLE_CLUSTER, STOPPABLE_INSTANCES); preSetupForCrosszoneParallelInstancesStoppableTest(STOPPABLE_CLUSTER2, STOPPABLE_INSTANCES2); + preSetupForNonTopoAwareInstancesStoppableTest(STOPPABLE_CLUSTER3, STOPPABLE_INSTANCES2); } protected Set createInstances(String cluster, int numInstances) { @@ -602,6 +604,8 @@ private void preSetupForParallelInstancesStoppableTest(String clusterName, _gSetupTool.addCluster(clusterName, true); ClusterConfig clusterConfig = _configAccessor.getClusterConfig(clusterName); clusterConfig.setFaultZoneType("helixZoneId"); + clusterConfig.setTopologyAwareEnabled(true); + clusterConfig.setTopology("/helixZoneId/instance"); clusterConfig.setPersistIntermediateAssignment(true); _configAccessor.setClusterConfig(clusterName, clusterConfig); // Create instance configs @@ -659,6 +663,8 @@ private void preSetupForCrosszoneParallelInstancesStoppableTest(String clusterNa _gSetupTool.addCluster(clusterName, true); ClusterConfig clusterConfig = _configAccessor.getClusterConfig(clusterName); clusterConfig.setFaultZoneType("helixZoneId"); + clusterConfig.setTopologyAwareEnabled(true); + clusterConfig.setTopology("/helixZoneId/instance"); clusterConfig.setPersistIntermediateAssignment(true); _configAccessor.setClusterConfig(clusterName, clusterConfig); // Create instance configs @@ -711,6 +717,62 @@ private void preSetupForCrosszoneParallelInstancesStoppableTest(String clusterNa _clusters.add(clusterName); _workflowMap.put(clusterName, createWorkflows(clusterName, 3)); } + + private void preSetupForNonTopoAwareInstancesStoppableTest(String clusterName, + List instances) throws Exception { + _gSetupTool.addCluster(clusterName, true); + ClusterConfig clusterConfig = _configAccessor.getClusterConfig(clusterName); + clusterConfig.setFaultZoneType("helixZoneId"); + clusterConfig.setPersistIntermediateAssignment(true); + _configAccessor.setClusterConfig(clusterName, clusterConfig); + // Create instance configs that do not include the domain field + List instanceConfigs = new ArrayList<>(); + int perZoneInstancesCount = 3; + int curZoneCount = 0; + for (int i = 0; i < instances.size(); i++) { + InstanceConfig instanceConfig = new InstanceConfig(instances.get(i)); + if (++curZoneCount >= perZoneInstancesCount) { + curZoneCount = 0; + } + instanceConfigs.add(instanceConfig); + } + + for (InstanceConfig instanceConfig : instanceConfigs) { + _gSetupTool.getClusterManagementTool().addInstance(clusterName, instanceConfig); + } + + // Start participant + startInstances(clusterName, new TreeSet<>(instances), instances.size()); + createResources(clusterName, 1, 2, 3); + _clusterControllerManagers.add(startController(clusterName)); + + // Make sure that cluster config exists + boolean isClusterConfigExist = TestHelper.verify(() -> { + ClusterConfig stoppableClusterConfig; + try { + stoppableClusterConfig = _configAccessor.getClusterConfig(clusterName); + } catch (Exception e) { + return false; + } + return (stoppableClusterConfig != null); + }, TestHelper.WAIT_DURATION); + Assert.assertTrue(isClusterConfigExist); + // Make sure that instance config exists for the instance0 to instance5 + for (String instance: instances) { + boolean isinstanceConfigExist = TestHelper.verify(() -> { + InstanceConfig instanceConfig; + try { + instanceConfig = _configAccessor.getInstanceConfig(clusterName, instance); + } catch (Exception e) { + return false; + } + return (instanceConfig != null); + }, TestHelper.WAIT_DURATION); + Assert.assertTrue(isinstanceConfigExist); + } + _clusters.add(clusterName); + _workflowMap.put(clusterName, createWorkflows(clusterName, 3)); + } /** * Starts a HelixRestServer for the test suite. * @return diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java index e1bbe78696..1c9cbcb51b 100644 --- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java +++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestInstancesAccessor.java @@ -30,6 +30,7 @@ import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; @@ -633,6 +634,137 @@ public void testSkipClusterLevelHealthCheck() throws IOException { System.out.println("End test :" + TestHelper.getTestMethodName()); } + @Test(dependsOnMethods = "testSkipClusterLevelHealthCheck") + public void testNonTopoAwareStoppableCheck() throws JsonProcessingException { + System.out.println("Start test :" + TestHelper.getTestMethodName()); + + // STOPPABLE_CLUSTER3 is a cluster is non topology aware cluster + String content = String.format( + "{\"%s\":\"%s\",\"%s\":[\"%s\",\"%s\",\"%s\",\"%s\", \"%s\", \"%s\", \"%s\",\"%s\", \"%s\", \"%s\"], \"%s\":[\"%s\", \"%s\"]}", + InstancesAccessor.InstancesProperties.selection_base.name(), + InstancesAccessor.InstanceHealthSelectionBase.non_zone_based.name(), + InstancesAccessor.InstancesProperties.instances.name(), "instance1", "instance3", + "instance6", "instance9", "instance10", "instance11", "instance12", "instance13", + "instance14", "invalidInstance", + InstancesAccessor.InstancesProperties.skip_stoppable_check_list.name(), "INSTANCE_NOT_ENABLED", "INSTANCE_NOT_STABLE"); + + // Change instance config of instance1 & instance0 to be evacuating + String instance0 = "instance0"; + InstanceConfig instanceConfig = + _configAccessor.getInstanceConfig(STOPPABLE_CLUSTER3, instance0); + instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.EVACUATE); + _configAccessor.setInstanceConfig(STOPPABLE_CLUSTER3, instance0, instanceConfig); + String instance1 = "instance1"; + InstanceConfig instanceConfig1 = + _configAccessor.getInstanceConfig(STOPPABLE_CLUSTER3, instance1); + instanceConfig1.setInstanceOperation(InstanceConstants.InstanceOperation.SWAP_IN); + _configAccessor.setInstanceConfig(STOPPABLE_CLUSTER3, instance1, instanceConfig1); + + // It takes time to reflect the changes. + BestPossibleExternalViewVerifier verifier = + new BestPossibleExternalViewVerifier.Builder(STOPPABLE_CLUSTER3).setZkAddr(ZK_ADDR).build(); + Assert.assertTrue(verifier.verifyByPolling()); + + Response response = new JerseyUriRequestBuilder( + "clusters/{}/instances?command=stoppable&skipHealthCheckCategories=CUSTOM_INSTANCE_CHECK,CUSTOM_PARTITION_CHECK").format( + STOPPABLE_CLUSTER3).post(this, Entity.entity(content, MediaType.APPLICATION_JSON_TYPE)); + JsonNode jsonNode = OBJECT_MAPPER.readTree(response.readEntity(String.class)); + + Set stoppableSet = getStringSet(jsonNode, + InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name()); + Assert.assertTrue(stoppableSet.contains("instance12") && stoppableSet.contains("instance3") + && stoppableSet.contains("instance10")); + + JsonNode nonStoppableInstances = jsonNode.get( + InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name()); + Assert.assertEquals(getStringSet(nonStoppableInstances, "instance13"), + ImmutableSet.of("HELIX:MIN_ACTIVE_REPLICA_CHECK_FAILED")); + Assert.assertEquals(getStringSet(nonStoppableInstances, "instance14"), + ImmutableSet.of("HELIX:MIN_ACTIVE_REPLICA_CHECK_FAILED")); + Assert.assertEquals(getStringSet(nonStoppableInstances, "invalidInstance"), + ImmutableSet.of("HELIX:INSTANCE_NOT_EXIST")); + instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.ENABLE); + _configAccessor.setInstanceConfig(STOPPABLE_CLUSTER3, instance0, instanceConfig); + instanceConfig1.setInstanceOperation(InstanceConstants.InstanceOperation.ENABLE); + _configAccessor.setInstanceConfig(STOPPABLE_CLUSTER3, instance1, instanceConfig1); + + System.out.println("End test :" + TestHelper.getTestMethodName()); + } + + @Test(dependsOnMethods = "testSkipClusterLevelHealthCheck") + public void testNonTopoAwareStoppableCheckWithException() throws JsonProcessingException { + System.out.println("Start test :" + TestHelper.getTestMethodName()); + + // STOPPABLE_CLUSTER3 is a cluster is non topology aware cluster + String content = String.format( + "{\"%s\":\"%s\",\"%s\":[\"%s\",\"%s\",\"%s\",\"%s\", \"%s\", \"%s\", \"%s\",\"%s\", \"%s\", \"%s\"], \"%s\":[\"%s\", \"%s\"]}", + InstancesAccessor.InstancesProperties.selection_base.name(), + InstancesAccessor.InstanceHealthSelectionBase.cross_zone_based.name(), + InstancesAccessor.InstancesProperties.instances.name(), "instance1", "instance3", + "instance6", "instance9", "instance10", "instance11", "instance12", "instance13", + "instance14", "invalidInstance", + InstancesAccessor.InstancesProperties.skip_stoppable_check_list.name(), "INSTANCE_NOT_ENABLED", "INSTANCE_NOT_STABLE"); + + // It takes time to reflect the changes. + BestPossibleExternalViewVerifier verifier = + new BestPossibleExternalViewVerifier.Builder(STOPPABLE_CLUSTER3).setZkAddr(ZK_ADDR).build(); + Assert.assertTrue(verifier.verifyByPolling()); + + // Making the REST Call to cross zone stoppable check while the cluster has no topology aware + // setup. The call should return an error. + Response response = new JerseyUriRequestBuilder( + "clusters/{}/instances?command=stoppable&skipHealthCheckCategories=CUSTOM_INSTANCE_CHECK,CUSTOM_PARTITION_CHECK").format( + STOPPABLE_CLUSTER3) + .isBodyReturnExpected(true) + .expectedReturnStatusCode(Response.Status.BAD_REQUEST.getStatusCode()) + .post(this, Entity.entity(content, MediaType.APPLICATION_JSON_TYPE)); + + System.out.println("End test :" + TestHelper.getTestMethodName()); + } + + @Test(description = "Test zone selection base with instance that don't have topology set in the config", + dependsOnMethods = "testNonTopoAwareStoppableCheckWithException") + public void testZoneSelectionBaseWithInstanceThatDontHaveTopologySet() { + System.out.println("Start test :" + TestHelper.getTestMethodName()); + + // STOPPABLE_CLUSTER3 is a cluster is non topology aware cluster + String content = String.format( + "{\"%s\":\"%s\",\"%s\":[\"%s\",\"%s\",\"%s\",\"%s\", \"%s\", \"%s\", \"%s\",\"%s\", \"%s\", \"%s\"], \"%s\":[\"%s\", \"%s\"]}", + InstancesAccessor.InstancesProperties.selection_base.name(), + InstancesAccessor.InstanceHealthSelectionBase.cross_zone_based.name(), + InstancesAccessor.InstancesProperties.instances.name(), "instance1", "instance3", + "instance6", "instance9", "instance10", "instance11", "instance12", "instance13", + "instance14", "invalidInstance", + InstancesAccessor.InstancesProperties.skip_stoppable_check_list.name(), "INSTANCE_NOT_ENABLED", "INSTANCE_NOT_STABLE"); + + String instance1 = "instance1"; + InstanceConfig instanceConfig1 = + _configAccessor.getInstanceConfig(STOPPABLE_CLUSTER2, instance1); + String domain = instanceConfig1.getDomainAsString(); + instanceConfig1.setDomain("FALSE_DOMAIN"); + _configAccessor.setInstanceConfig(STOPPABLE_CLUSTER2, instance1, instanceConfig1); + + // It takes time to reflect the changes. + BestPossibleExternalViewVerifier verifier = + new BestPossibleExternalViewVerifier.Builder(STOPPABLE_CLUSTER3).setZkAddr(ZK_ADDR).build(); + Assert.assertTrue(verifier.verifyByPolling()); + + // Making the REST Call to cross zone stoppable check while the cluster has no topology aware + // setup. The call should return an error. + Response response = new JerseyUriRequestBuilder( + "clusters/{}/instances?command=stoppable&skipHealthCheckCategories=CUSTOM_INSTANCE_CHECK,CUSTOM_PARTITION_CHECK").format( + STOPPABLE_CLUSTER3) + .isBodyReturnExpected(true) + .expectedReturnStatusCode(Response.Status.BAD_REQUEST.getStatusCode()) + .post(this, Entity.entity(content, MediaType.APPLICATION_JSON_TYPE)); + + // Restore the changes on instance 1 + instanceConfig1.setDomain(domain); + _configAccessor.setInstanceConfig(STOPPABLE_CLUSTER3, instance1, instanceConfig1); + + System.out.println("End test :" + TestHelper.getTestMethodName()); + } + private Set getStringSet(JsonNode jsonNode, String key) { Set result = new HashSet<>(); jsonNode.withArray(key).forEach(s -> result.add(s.textValue())); diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/service/TestClusterService.java b/helix-rest/src/test/java/org/apache/helix/rest/server/service/TestClusterService.java index 5786350d9f..23182d2bb6 100644 --- a/helix-rest/src/test/java/org/apache/helix/rest/server/service/TestClusterService.java +++ b/helix-rest/src/test/java/org/apache/helix/rest/server/service/TestClusterService.java @@ -96,6 +96,30 @@ public void testGetClusterTopology_whenZoneHasMultiInstances() { Assert.assertEquals(clusterTopology.getClusterId(), TEST_CLUSTER); } + @Test + public void testCheckTopologyAware() { + Mock mock = new Mock(); + Assert.assertFalse(mock.clusterService.isClusterTopologyAware(TEST_CLUSTER)); + + ClusterConfig config = new ClusterConfig(TEST_CLUSTER); + config.setTopology("/zone"); + when(mock.configAccessor.getClusterConfig(TEST_CLUSTER)).thenReturn(config); + Assert.assertFalse(mock.clusterService.isClusterTopologyAware(TEST_CLUSTER)); + + config = new ClusterConfig(TEST_CLUSTER); + config.setFaultZoneType("zone"); + config.setTopology("/zone"); + when(mock.configAccessor.getClusterConfig(TEST_CLUSTER)).thenReturn(config); + Assert.assertFalse(mock.clusterService.isClusterTopologyAware(TEST_CLUSTER)); + + config = new ClusterConfig(TEST_CLUSTER); + config.setFaultZoneType("zone"); + config.setTopology("/zone"); + config.setTopologyAwareEnabled(true); + when(mock.configAccessor.getClusterConfig(TEST_CLUSTER)).thenReturn(config); + Assert.assertTrue(mock.clusterService.isClusterTopologyAware(TEST_CLUSTER)); + } + private final class Mock { private HelixDataAccessor dataAccessor = mock(HelixDataAccessor.class); private ConfigAccessor configAccessor = mock(ConfigAccessor.class);