diff --git a/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java new file mode 100644 index 0000000000..dafe1ab2d8 --- /dev/null +++ b/helix-rest/src/main/java/org/apache/helix/rest/clusterMaintenanceService/StoppableInstancesSelector.java @@ -0,0 +1,232 @@ +package org.apache.helix.rest.clusterMaintenanceService; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; +import java.util.stream.Collectors; + +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.commons.lang3.NotImplementedException; +import org.apache.helix.rest.server.json.cluster.ClusterTopology; +import org.apache.helix.rest.server.json.instance.StoppableCheck; + +public class StoppableInstancesSelector { + // This type does not belong to real HealthCheck failed reason. Also, if we add this type + // to HealthCheck enum, it could introduce more unnecessary check step since the InstanceServiceImpl + // loops all the types to do corresponding checks. + private final static String INSTANCE_NOT_EXIST = "HELIX:INSTANCE_NOT_EXIST"; + private String _clusterId; + private List _orderOfZone; + private String _customizedInput; + private ArrayNode _stoppableInstances; + private ObjectNode _failedStoppableInstances; + private MaintenanceManagementService _maintenanceService; + private ClusterTopology _clusterTopology; + + public StoppableInstancesSelector(String clusterId, List orderOfZone, + String customizedInput, ArrayNode stoppableInstances, ObjectNode failedStoppableInstances, + MaintenanceManagementService maintenanceService, ClusterTopology clusterTopology) { + _clusterId = clusterId; + _orderOfZone = orderOfZone; + _customizedInput = customizedInput; + _stoppableInstances = stoppableInstances; + _failedStoppableInstances = failedStoppableInstances; + _maintenanceService = maintenanceService; + _clusterTopology = clusterTopology; + } + + /** + * Evaluates and collects stoppable instances within a specified or determined zone based on the order of zones. + * If _orderOfZone is specified, the method targets the first non-empty zone; otherwise, it targets the zone with + * the highest instance count. The method iterates through instances, performing stoppable checks, and records + * reasons for non-stoppability. + * + * @param instances A list of instance to be evaluated. + * @throws IOException + */ + public void getStoppableInstancesInSingleZone(List instances) throws IOException { + List zoneBasedInstance = + getZoneBasedInstances(instances, _clusterTopology.toZoneMapping()); + Map instancesStoppableChecks = + _maintenanceService.batchGetInstancesStoppableChecks(_clusterId, zoneBasedInstance, + _customizedInput); + for (Map.Entry instanceStoppableCheck : instancesStoppableChecks.entrySet()) { + String instance = instanceStoppableCheck.getKey(); + StoppableCheck stoppableCheck = instanceStoppableCheck.getValue(); + if (!stoppableCheck.isStoppable()) { + ArrayNode failedReasonsNode = _failedStoppableInstances.putArray(instance); + for (String failedReason : stoppableCheck.getFailedChecks()) { + failedReasonsNode.add(JsonNodeFactory.instance.textNode(failedReason)); + } + } else { + _stoppableInstances.add(instance); + } + } + // Adding following logic to check whether instances exist or not. An instance exist could be + // checking following scenario: + // 1. Instance got dropped. (InstanceConfig is gone.) + // 2. Instance name has typo. + + // If we dont add this check, the instance, which does not exist, will be disappeared from + // result since Helix skips instances for instances not in the selected zone. User may get + // confused with the output. + Set nonSelectedInstances = new HashSet<>(instances); + nonSelectedInstances.removeAll(_clusterTopology.getAllInstances()); + for (String nonSelectedInstance : nonSelectedInstances) { + ArrayNode failedReasonsNode = _failedStoppableInstances.putArray(nonSelectedInstance); + failedReasonsNode.add(JsonNodeFactory.instance.textNode(INSTANCE_NOT_EXIST)); + } + } + + public void getStoppableInstancesCrossZones() { + // TODO: Add implementation to enable cross zone stoppable check. + throw new NotImplementedException("Not Implemented"); + } + + /** + * Determines the order of zones. If an order is provided by the user, it will be used directly. + * Otherwise, zones will be ordered by their associated instance count in descending order. + * + * If `random` is true, the order of zones will be randomized regardless of any previous order. + * + * @param random Indicates whether to randomize the order of zones. + */ + public void calculateOrderOfZone(boolean random) { + if (_orderOfZone == null) { + _orderOfZone = + new ArrayList<>(getOrderedZoneToInstancesMap(_clusterTopology.toZoneMapping()).keySet()); + } + + if (_orderOfZone.isEmpty()) { + return; + } + + if (random) { + Collections.shuffle(_orderOfZone); + } + } + + /** + * Get instances belongs to the first zone. If the zone is already empty, Helix will iterate zones + * by order until find the zone contains instances. + * + * The order of zones can directly come from user input. If user did not specify it, Helix will order + * zones by the number of associated instances in descending order. + * + * @param instances + * @param zoneMapping + * @return + */ + private List getZoneBasedInstances(List instances, + Map> zoneMapping) { + if (_orderOfZone.isEmpty()) { + return _orderOfZone; + } + + Set instanceSet = null; + for (String zone : _orderOfZone) { + instanceSet = new TreeSet<>(instances); + Set currentZoneInstanceSet = new HashSet<>(zoneMapping.get(zone)); + instanceSet.retainAll(currentZoneInstanceSet); + if (instanceSet.size() > 0) { + return new ArrayList<>(instanceSet); + } + } + + return Collections.EMPTY_LIST; + } + + /** + * Returns a map from zone to instances set, ordered by the number of instances in each zone + * in descending order. + * + * @param zoneMapping A map from zone to instances set + * @return An ordered map from zone to instances set, with zones having more instances appearing first. + */ + private Map> getOrderedZoneToInstancesMap( + Map> zoneMapping) { + return zoneMapping.entrySet().stream() + .sorted((e1, e2) -> e2.getValue().size() - e1.getValue().size()).collect( + Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, + (existing, replacement) -> existing, LinkedHashMap::new)); + } + + public static class StoppableInstancesSelectorBuilder { + private String _clusterId; + private List _orderOfZone; + private String _customizedInput; + private ArrayNode _stoppableInstances; + private ObjectNode _failedStoppableInstances; + private MaintenanceManagementService _maintenanceService; + private ClusterTopology _clusterTopology; + + public StoppableInstancesSelectorBuilder setClusterId(String clusterId) { + _clusterId = clusterId; + return this; + } + + public StoppableInstancesSelectorBuilder setOrderOfZone(List orderOfZone) { + _orderOfZone = orderOfZone; + return this; + } + + public StoppableInstancesSelectorBuilder setCustomizedInput(String customizedInput) { + _customizedInput = customizedInput; + return this; + } + + public StoppableInstancesSelectorBuilder setStoppableInstances(ArrayNode stoppableInstances) { + _stoppableInstances = stoppableInstances; + return this; + } + + public StoppableInstancesSelectorBuilder setFailedStoppableInstances(ObjectNode failedStoppableInstances) { + _failedStoppableInstances = failedStoppableInstances; + return this; + } + + public StoppableInstancesSelectorBuilder setMaintenanceService( + MaintenanceManagementService maintenanceService) { + _maintenanceService = maintenanceService; + return this; + } + + public StoppableInstancesSelectorBuilder setClusterTopology(ClusterTopology clusterTopology) { + _clusterTopology = clusterTopology; + return this; + } + + public StoppableInstancesSelector build() { + return new StoppableInstancesSelector(_clusterId, _orderOfZone, + _customizedInput, _stoppableInstances, _failedStoppableInstances, _maintenanceService, + _clusterTopology); + } + } +} diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java index 87be72b969..8a21202704 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java @@ -20,13 +20,10 @@ */ import java.io.IOException; -import java.util.ArrayList; import java.util.Collections; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.TreeSet; import javax.ws.rs.DefaultValue; import javax.ws.rs.GET; import javax.ws.rs.POST; @@ -49,6 +46,7 @@ import org.apache.helix.model.InstanceConfig; import org.apache.helix.rest.clusterMaintenanceService.MaintenanceManagementService; import org.apache.helix.rest.common.HttpConstants; +import org.apache.helix.rest.clusterMaintenanceService.StoppableInstancesSelector; import org.apache.helix.rest.server.filters.ClusterAuth; import org.apache.helix.rest.server.json.cluster.ClusterTopology; import org.apache.helix.rest.server.json.instance.StoppableCheck; @@ -63,10 +61,6 @@ @Path("/clusters/{clusterId}/instances") public class InstancesAccessor extends AbstractHelixResource { private final static Logger _logger = LoggerFactory.getLogger(InstancesAccessor.class); - // This type does not belongs to real HealthCheck failed reason. Also if we add this type - // to HealthCheck enum, it could introduce more unnecessary check step since the InstanceServiceImpl - // loops all the types to do corresponding checks. - private final static String INSTANCE_NOT_EXIST = "HELIX:INSTANCE_NOT_EXIST"; public enum InstancesProperties { instances, online, @@ -80,7 +74,8 @@ public enum InstancesProperties { public enum InstanceHealthSelectionBase { instance_based, - zone_based + zone_based, + cross_zone_based } @ResponseMetered(name = HttpConstants.READ_REQUEST) @@ -153,7 +148,8 @@ public Response instancesOperations(@PathParam("clusterId") String clusterId, @QueryParam("command") String command, @QueryParam("continueOnFailures") boolean continueOnFailures, @QueryParam("skipZKRead") boolean skipZKRead, - @QueryParam("skipHealthCheckCategories") String skipHealthCheckCategories, String content) { + @QueryParam("skipHealthCheckCategories") String skipHealthCheckCategories, + @DefaultValue("false") @QueryParam("random") boolean random, String content) { Command cmd; try { cmd = Command.valueOf(command); @@ -198,7 +194,7 @@ public Response instancesOperations(@PathParam("clusterId") String clusterId, break; case stoppable: return batchGetStoppableInstances(clusterId, node, skipZKRead, continueOnFailures, - skipHealthCheckCategorySet); + skipHealthCheckCategorySet, random); default: _logger.error("Unsupported command :" + command); return badRequest("Unsupported command :" + command); @@ -215,8 +211,8 @@ public Response instancesOperations(@PathParam("clusterId") String clusterId, } private Response batchGetStoppableInstances(String clusterId, JsonNode node, boolean skipZKRead, - boolean continueOnFailures, Set skipHealthCheckCategories) - throws IOException { + boolean continueOnFailures, Set skipHealthCheckCategories, + boolean random) throws IOException { try { // TODO: Process input data from the content InstancesAccessor.InstanceHealthSelectionBase selectionBase = @@ -237,6 +233,12 @@ private Response batchGetStoppableInstances(String clusterId, JsonNode node, boo orderOfZone = OBJECT_MAPPER.readValue( node.get(InstancesAccessor.InstancesProperties.zone_order.name()).toString(), OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, String.class)); + if (!orderOfZone.isEmpty() && random) { + String message = + "Both 'orderOfZone' and 'random' parameters are set. Please specify only one option."; + _logger.error(message); + return badRequest(message); + } } // Prepare output result @@ -253,40 +255,23 @@ private Response batchGetStoppableInstances(String clusterId, JsonNode node, boo ClusterService clusterService = new ClusterServiceImpl(getDataAccssor(clusterId), getConfigAccessor()); ClusterTopology clusterTopology = clusterService.getClusterTopology(clusterId); + StoppableInstancesSelector stoppableInstancesSelector = + new StoppableInstancesSelector.StoppableInstancesSelectorBuilder() + .setClusterId(clusterId) + .setOrderOfZone(orderOfZone) + .setCustomizedInput(customizedInput) + .setStoppableInstances(stoppableInstances) + .setFailedStoppableInstances(failedStoppableInstances) + .setMaintenanceService(maintenanceService) + .setClusterTopology(clusterTopology) + .build(); + stoppableInstancesSelector.calculateOrderOfZone(random); switch (selectionBase) { case zone_based: - List zoneBasedInstance = - getZoneBasedInstances(instances, orderOfZone, clusterTopology.toZoneMapping()); - Map instancesStoppableChecks = - maintenanceService.batchGetInstancesStoppableChecks(clusterId, zoneBasedInstance, - customizedInput); - for (Map.Entry instanceStoppableCheck : instancesStoppableChecks.entrySet()) { - String instance = instanceStoppableCheck.getKey(); - StoppableCheck stoppableCheck = instanceStoppableCheck.getValue(); - if (!stoppableCheck.isStoppable()) { - ArrayNode failedReasonsNode = failedStoppableInstances.putArray(instance); - for (String failedReason : stoppableCheck.getFailedChecks()) { - failedReasonsNode.add(JsonNodeFactory.instance.textNode(failedReason)); - } - } else { - stoppableInstances.add(instance); - } - } - // Adding following logic to check whether instances exist or not. An instance exist could be - // checking following scenario: - // 1. Instance got dropped. (InstanceConfig is gone.) - // 2. Instance name has typo. - - // If we dont add this check, the instance, which does not exist, will be disappeared from - // result since Helix skips instances for instances not in the selected zone. User may get - // confused with the output. - Set nonSelectedInstances = new HashSet<>(instances); - nonSelectedInstances.removeAll(clusterTopology.getAllInstances()); - for (String nonSelectedInstance : nonSelectedInstances) { - ArrayNode failedReasonsNode = failedStoppableInstances.putArray(nonSelectedInstance); - failedReasonsNode.add(JsonNodeFactory.instance.textNode(INSTANCE_NOT_EXIST)); - } - + stoppableInstancesSelector.getStoppableInstancesInSingleZone(instances); + break; + case cross_zone_based: + stoppableInstancesSelector.getStoppableInstancesCrossZones(); break; case instance_based: default: @@ -304,41 +289,4 @@ private Response batchGetStoppableInstances(String clusterId, JsonNode node, boo throw e; } } - - /** - * Get instances belongs to the first zone. If the zone is already empty, Helix will iterate zones - * by order until find the zone contains instances. - * - * The order of zones can directly come from user input. If user did not specify it, Helix will order - * zones with alphabetical order. - * - * @param instances - * @param orderedZones - * @return - */ - private List getZoneBasedInstances(List instances, List orderedZones, - Map> zoneMapping) { - - // If the orderedZones is not specified, we will order all zones in alphabetical order. - if (orderedZones == null) { - orderedZones = new ArrayList<>(zoneMapping.keySet()); - Collections.sort(orderedZones); - } - - if (orderedZones.isEmpty()) { - return orderedZones; - } - - Set instanceSet = null; - for (String zone : orderedZones) { - instanceSet = new TreeSet<>(instances); - Set currentZoneInstanceSet = new HashSet<>(zoneMapping.get(zone)); - instanceSet.retainAll(currentZoneInstanceSet); - if (instanceSet.size() > 0) { - return new ArrayList<>(instanceSet); - } - } - - return Collections.EMPTY_LIST; - } }