Skip to content

Commit

Permalink
Support Stoppable Check for Non-Topology-Aware Clusters (#2961)
Browse files Browse the repository at this point in the history
Support Stoppable Check for Non-Topology-Aware Clusters
  • Loading branch information
MarkGaox authored Dec 11, 2024
1 parent fdfa66c commit 33a28e7
Show file tree
Hide file tree
Showing 9 changed files with 376 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,15 @@ public Map<String, List<String>> getAggregatedStoppableCheck(String baseUrl,
if (instances != null && !instances.isEmpty()) {
payLoads.put("instances", instances);
}
if (toBeStoppedInstances != null && !toBeStoppedInstances.isEmpty()) {
payLoads.put("to_be_stopped_instances", toBeStoppedInstances);
// Before sending the request, make sure the toBeStoppedInstances has no overlap with instances
Set<String> remainingToBeStoppedInstances = toBeStoppedInstances;
if (instances != null && toBeStoppedInstances != null) {
remainingToBeStoppedInstances =
toBeStoppedInstances.stream().filter(ins -> !instances.contains(ins))
.collect(Collectors.toSet());
}
if (remainingToBeStoppedInstances != null && !remainingToBeStoppedInstances.isEmpty()) {
payLoads.put("to_be_stopped_instances", remainingToBeStoppedInstances);
}
if (clusterId != null) {
payLoads.put("cluster_id", clusterId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,7 @@ public ObjectNode getStoppableInstancesInSingleZone(List<String> instances,
result.putArray(InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name());
ObjectNode failedStoppableInstances = result.putObject(
InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name());
Set<String> toBeStoppedInstancesSet = new HashSet<>(toBeStoppedInstances);
collectEvacuatingInstances(toBeStoppedInstancesSet);
Set<String> toBeStoppedInstancesSet = findToBeStoppedInstances(toBeStoppedInstances);

List<String> zoneBasedInstance =
getZoneBasedInstances(instances, _clusterTopology.toZoneMapping());
Expand Down Expand Up @@ -118,8 +117,7 @@ public ObjectNode getStoppableInstancesCrossZones(List<String> instances,
result.putArray(InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name());
ObjectNode failedStoppableInstances = result.putObject(
InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name());
Set<String> toBeStoppedInstancesSet = new HashSet<>(toBeStoppedInstances);
collectEvacuatingInstances(toBeStoppedInstancesSet);
Set<String> toBeStoppedInstancesSet = findToBeStoppedInstances(toBeStoppedInstances);

Map<String, Set<String>> zoneMapping = _clusterTopology.toZoneMapping();
for (String zone : _orderOfZone) {
Expand All @@ -136,6 +134,39 @@ public ObjectNode getStoppableInstancesCrossZones(List<String> instances,
return result;
}

/**
 * Runs the stoppable check on the requested instances without applying any zone ordering.
 * Instances that are reported as nonexistent are recorded as failures and excluded before the
 * stoppable check is performed on the rest.
 *
 * @param instances The instances to evaluate.
 * @param toBeStoppedInstances Instances the caller presumes to be stopped already; the list is
 *     expanded by {@code findToBeStoppedInstances} before the check runs.
 * @return An ObjectNode containing:
 *     - 'instance_stoppable_parallel': array filled with the instances found stoppable.
 *     - 'instance_not_stoppable_with_reasons': object keyed by instance name whose value is the
 *       list of reasons the instance cannot be stopped.
 * @throws IOException propagated from the underlying stoppable check.
 */
public ObjectNode getStoppableInstancesNonZoneBased(List<String> instances,
    List<String> toBeStoppedInstances) throws IOException {
  ObjectNode response = JsonNodeFactory.instance.objectNode();

  String stoppableKey = InstancesAccessor.InstancesProperties.instance_stoppable_parallel.name();
  ArrayNode stoppableArray = response.putArray(stoppableKey);

  String notStoppableKey =
      InstancesAccessor.InstancesProperties.instance_not_stoppable_with_reasons.name();
  ObjectNode notStoppableReasons = response.putObject(notStoppableKey);

  Set<String> presumedStopped = findToBeStoppedInstances(toBeStoppedInstances);

  // No zone order is calculated on this path, so instance existence has to be verified here:
  // only instances that actually exist are handed to the stoppable check.
  Set<String> missingInstances = processNonexistentInstances(instances, notStoppableReasons);
  List<String> candidates = new ArrayList<>(instances);
  candidates.removeAll(missingInstances);

  populateStoppableInstances(candidates, presumedStopped, stoppableArray, notStoppableReasons);
  return response;
}

private void populateStoppableInstances(List<String> instances, Set<String> toBeStoppedInstances,
ArrayNode stoppableInstances, ObjectNode failedStoppableInstances) throws IOException {
Map<String, StoppableCheck> instancesStoppableChecks =
Expand All @@ -159,7 +190,7 @@ private void populateStoppableInstances(List<String> instances, Set<String> toBe
}
}

private void processNonexistentInstances(List<String> instances, ObjectNode failedStoppableInstances) {
private Set<String> processNonexistentInstances(List<String> instances, ObjectNode failedStoppableInstances) {
// Adding following logic to check whether instances exist or not. An instance exist could be
// checking following scenario:
// 1. Instance got dropped. (InstanceConfig is gone.)
Expand All @@ -174,6 +205,7 @@ private void processNonexistentInstances(List<String> instances, ObjectNode fail
ArrayNode failedReasonsNode = failedStoppableInstances.putArray(nonSelectedInstance);
failedReasonsNode.add(JsonNodeFactory.instance.textNode(INSTANCE_NOT_EXIST));
}
return nonSelectedInstances;
}

/**
Expand Down Expand Up @@ -258,21 +290,26 @@ private Map<String, Set<String>> getOrderedZoneToInstancesMap(
}

/**
* Collect instances marked for evacuation in the current topology and add them into the given set
* Collect instances within the cluster where the instance operation is set to EVACUATE, SWAP_IN, or UNKNOWN.
* And return them as a set.
*
* @param toBeStoppedInstances A set of instances we presume to be stopped.
* @param toBeStoppedInstances A list of instances we presume to be stopped.
*/
private void collectEvacuatingInstances(Set<String> toBeStoppedInstances) {
private Set<String> findToBeStoppedInstances(List<String> toBeStoppedInstances) {
Set<String> toBeStoppedInstancesSet = new HashSet<>(toBeStoppedInstances);
Set<String> allInstances = _clusterTopology.getAllInstances();
for (String instance : allInstances) {
PropertyKey.Builder propertyKeyBuilder = _dataAccessor.keyBuilder();
InstanceConfig instanceConfig =
_dataAccessor.getProperty(propertyKeyBuilder.instanceConfig(instance));
if (InstanceConstants.InstanceOperation.EVACUATE.equals(
instanceConfig.getInstanceOperation().getOperation())) {
toBeStoppedInstances.add(instance);
InstanceConstants.InstanceOperation operation = instanceConfig.getInstanceOperation().getOperation();
if (operation == InstanceConstants.InstanceOperation.EVACUATE
|| operation == InstanceConstants.InstanceOperation.SWAP_IN
|| operation == InstanceConstants.InstanceOperation.UNKNOWN) {
toBeStoppedInstancesSet.add(instance);
}
}
return toBeStoppedInstancesSet;
}

public static class StoppableInstancesSelectorBuilder {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public enum InstancesProperties {
}

// Selection strategies that a batch stoppable-check request can name through its
// 'selection_base' property; the value is parsed with valueOf, so it must match a constant name.
public enum InstanceHealthSelectionBase {
// NOTE(review): shares a switch branch with non_zone_based in this view and looks like the
// pre-rename spelling of that constant — confirm whether it is still meant to exist.
instance_based,
// Instances are checked without computing a zone order. Used as the default when the request
// carries no 'selection_base'; rejected when a non-empty 'zone_order' is supplied.
non_zone_based,
// Zone order is calculated first, then stoppable instances are selected within a single zone.
zone_based,
// Zone order is calculated first, then stoppable instances are selected across zones.
cross_zone_based
}
Expand Down Expand Up @@ -224,12 +224,17 @@ private Response batchGetStoppableInstances(String clusterId, JsonNode node, boo
boolean random) throws IOException {
try {
// TODO: Process input data from the content
// TODO: Implement the logic to automatically detect the selection base. https://github.com/apache/helix/issues/2968#issue-2691677799
InstancesAccessor.InstanceHealthSelectionBase selectionBase =
InstancesAccessor.InstanceHealthSelectionBase.valueOf(
node.get(InstancesAccessor.InstancesProperties.selection_base.name()) == null
? InstanceHealthSelectionBase.non_zone_based : InstanceHealthSelectionBase.valueOf(
node.get(InstancesAccessor.InstancesProperties.selection_base.name()).textValue());

List<String> instances = OBJECT_MAPPER.readValue(
node.get(InstancesAccessor.InstancesProperties.instances.name()).toString(),
OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, String.class));
ClusterService clusterService =
new ClusterServiceImpl(getDataAccssor(clusterId), getConfigAccessor());

List<String> orderOfZone = null;
String customizedInput = null;
Expand All @@ -252,6 +257,12 @@ private Response batchGetStoppableInstances(String clusterId, JsonNode node, boo
_logger.error(message);
return badRequest(message);
}
if (!orderOfZone.isEmpty() && selectionBase == InstanceHealthSelectionBase.non_zone_based) {
String message =
"'zone_order' is set but 'selection_base' is 'non_zone_based'. Please set 'selection_base' to 'zone_based' or 'cross_zone_based'.";
_logger.error(message);
return badRequest(message);
}
}

if (node.get(InstancesAccessor.InstancesProperties.to_be_stopped_instances.name()) != null) {
Expand Down Expand Up @@ -285,6 +296,33 @@ private Response batchGetStoppableInstances(String clusterId, JsonNode node, boo
}
}

ClusterTopology clusterTopology = clusterService.getClusterTopology(clusterId);
if (selectionBase != InstanceHealthSelectionBase.non_zone_based) {
if (!clusterService.isClusterTopologyAware(clusterId)) {
String message = "Cluster " + clusterId
+ " is not topology aware. Please enable the topology in cluster config or set "
+ "'selection_base' to 'non_zone_based'.";
_logger.error(message);
return badRequest(message);
}

// Find instances that lack topology information
Set<String> instancesWithTopology =
clusterTopology.toZoneMapping().entrySet().stream().flatMap(entry -> entry.getValue().stream())
.collect(Collectors.toSet());
Set<String> allInstances = clusterTopology.getAllInstances();
Set<String> topologyUnawareInstances = new HashSet<>(instances).stream().filter(
instance -> !instancesWithTopology.contains(instance) && allInstances.contains(instance))
.collect(Collectors.toSet());
if (!topologyUnawareInstances.isEmpty()) {
String message = "Instances " + topologyUnawareInstances
+ " do not have topology information. Please set topology information in instance config or"
+ " set 'selection_base' to 'non_zone_based'.";
_logger.error(message);
return badRequest(message);
}
}

String namespace = getNamespace();
MaintenanceManagementService maintenanceService =
new MaintenanceManagementService.MaintenanceManagementServiceBuilder()
Expand All @@ -299,9 +337,6 @@ private Response batchGetStoppableInstances(String clusterId, JsonNode node, boo
.setSkipStoppableHealthCheckList(skipStoppableCheckList)
.build();

ClusterService clusterService =
new ClusterServiceImpl(getDataAccssor(clusterId), getConfigAccessor());
ClusterTopology clusterTopology = clusterService.getClusterTopology(clusterId);
StoppableInstancesSelector stoppableInstancesSelector =
new StoppableInstancesSelector.StoppableInstancesSelectorBuilder()
.setClusterId(clusterId)
Expand All @@ -311,18 +346,20 @@ private Response batchGetStoppableInstances(String clusterId, JsonNode node, boo
.setClusterTopology(clusterTopology)
.setDataAccessor((ZKHelixDataAccessor) getDataAccssor(clusterId))
.build();
stoppableInstancesSelector.calculateOrderOfZone(instances, random);
ObjectNode result;
// TODO: Add support for clusters that do not have topology set up.
// Issue #2893: https://github.com/apache/helix/issues/2893

switch (selectionBase) {
case zone_based:
stoppableInstancesSelector.calculateOrderOfZone(instances, random);
result = stoppableInstancesSelector.getStoppableInstancesInSingleZone(instances, toBeStoppedInstances);
break;
case cross_zone_based:
stoppableInstancesSelector.calculateOrderOfZone(instances, random);
result = stoppableInstancesSelector.getStoppableInstancesCrossZones(instances, toBeStoppedInstances);
break;
case instance_based:
case non_zone_based:
result = stoppableInstancesSelector.getStoppableInstancesNonZoneBased(instances, toBeStoppedInstances);
break;
default:
throw new UnsupportedOperationException("instance_based selection is not supported yet!");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,11 @@ public interface ClusterService {
* @return
*/
ClusterInfo getClusterInfo(String clusterId);

/**
* Check if the cluster is topology aware.
* @param clusterId identifier of the cluster to check
* @return true if the cluster is considered topology aware, false otherwise
*/
boolean isClusterTopologyAware(String clusterId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@
import java.util.Map;
import java.util.stream.Collectors;

import io.netty.util.internal.StringUtil;
import org.apache.helix.AccessOption;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.PropertyKey;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.rest.server.json.cluster.ClusterInfo;
Expand Down Expand Up @@ -102,4 +104,11 @@ public ClusterInfo getClusterInfo(String clusterId) {
.instances(_dataAccessor.getChildNames(keyBuilder.instances()))
.liveInstances(_dataAccessor.getChildNames(keyBuilder.liveInstances())).build();
}

/**
 * Reports whether the cluster is topology aware. The answer is true only when topology awareness
 * is enabled in the cluster config and both the fault zone type and the topology definition are
 * non-empty.
 *
 * @param clusterId identifier of the cluster whose config is read
 * @return true if all three topology settings are present and enabled; false otherwise, including
 *     when no cluster config can be read for the cluster
 */
@Override
public boolean isClusterTopologyAware(String clusterId) {
  ClusterConfig config = _configAccessor.getClusterConfig(clusterId);
  // A missing cluster config carries no topology settings, so treat the cluster as not topology
  // aware instead of failing with a NullPointerException.
  if (config == null) {
    return false;
  }
  return config.isTopologyAwareEnabled()
      && !StringUtil.isNullOrEmpty(config.getFaultZoneType())
      && !StringUtil.isNullOrEmpty(config.getTopology());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,18 @@
import org.apache.http.conn.ConnectTimeoutException;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.junit.Assert;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class TestCustomRestClient {
Expand Down Expand Up @@ -260,4 +264,39 @@ public void testGetAggregatedStoppableCheck() throws IOException {
Assert.assertTrue(Arrays.stream(healthyInstances).allMatch(instance -> clusterHealth.get(instance).isEmpty()));
Assert.assertTrue(Arrays.stream(nonStoppableInstances).noneMatch(instance -> clusterHealth.get(instance).isEmpty()));
}

/**
 * Verifies that an instance listed both in {@code instances} and in the to-be-stopped set is
 * dropped from the to-be-stopped set before the aggregated stoppable check request is posted.
 */
@Test(description = "Test if the aggregated stoppable check request has the correct format when there "
    + "are duplicate instances in the instances list and the toBeStoppedInstances list.")
public void testAggregatedCheckRemoveDuplicateInstances()
    throws IOException {
  String clusterId = "cluster1";

  MockCustomRestClient customRestClient = new MockCustomRestClient(_httpClient);
  HttpResponse httpResponse = mock(HttpResponse.class);
  StatusLine statusLine = mock(StatusLine.class);

  when(statusLine.getStatusCode()).thenReturn(HttpStatus.SC_OK);
  when(httpResponse.getStatusLine()).thenReturn(statusLine);
  when(_httpClient.execute(any(HttpPost.class))).thenReturn(httpResponse);

  // "n1" is present in both lists, so the to-be-stopped set becomes empty after de-duplication.
  customRestClient.getAggregatedStoppableCheck(HTTP_LOCALHOST,
      ImmutableList.of("n1", "n2"),
      ImmutableSet.of("n1"), clusterId, Collections.emptyMap());

  // Make sure that the duplicate instances are removed from the toBeStoppedInstances list
  ObjectMapper objectMapper = new ObjectMapper();
  verify(_httpClient).execute(argThat(x -> {
    try {
      String request = EntityUtils.toString(((HttpPost) x).getEntity());
      JsonNode node = objectMapper.readTree(request);
      String instancesInRequest = node.get("instances").toString();
      // org.junit.Assert expects (expected, actual); keeping that order makes failure messages
      // report the right side as the expectation.
      Assert.assertEquals("[\"n1\",\"n2\"]", instancesInRequest);
      // An empty to-be-stopped set must be omitted from the payload entirely.
      Assert.assertNull(node.get("to_be_stopped_instances"));
      return true;
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }));
}
}
Loading

0 comments on commit 33a28e7

Please sign in to comment.