Skip to content

Commit

Permalink
Implement node evacuation filtering during checks and add whitelist f…
Browse files Browse the repository at this point in the history
…or stoppable checks
  • Loading branch information
MarkGaox committed Nov 1, 2023
1 parent 4552f87 commit 17ab9df
Show file tree
Hide file tree
Showing 10 changed files with 422 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.helix.HelixDefinedState;
import org.apache.helix.HelixException;
import org.apache.helix.PropertyKey;
import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.ExternalView;
Expand Down Expand Up @@ -239,6 +240,23 @@ public static boolean hasErrorPartitions(HelixDataAccessor dataAccessor, String
return false;
}

/**
 * Checks whether the specified instance is marked for an ongoing instance operation.
 * Currently, the only operation this method recognizes is evacuation.
 *
 * @param dataAccessor The accessor for retrieving Helix data properties.
 * @param instanceName An instance to be evaluated.
 * @return {@code true} if the instance's config has its instance operation set to EVACUATE;
 *         {@code false} otherwise, including when no instance config could be read for the
 *         given instance name.
 */
public static boolean isOperationSetForInstance(HelixDataAccessor dataAccessor,
    String instanceName) {
  PropertyKey.Builder propertyKeyBuilder = dataAccessor.keyBuilder();
  InstanceConfig instanceConfig =
      dataAccessor.getProperty(propertyKeyBuilder.instanceConfig(instanceName));
  // No config could be read for this instance: report "no operation set" instead of failing
  // the caller's whole health check with a NullPointerException.
  if (instanceConfig == null) {
    return false;
  }
  return InstanceConstants.InstanceOperation.EVACUATE.name()
      .equals(instanceConfig.getInstanceOperation());
}

/**
* Get the problematic partitions on the to-be-stop instance
* Requirement:
Expand Down Expand Up @@ -295,12 +313,17 @@ public static Map<String, List<String>> perPartitionHealthCheck(List<ExternalVie
if (stateMap.containsKey(instanceToBeStop)
&& stateMap.get(instanceToBeStop).equals(stateModelDefinition.getTopState())) {
for (String siblingInstance : stateMap.keySet()) {
// Skip this self check
// Skip this self check and instances we assume to be already stopped
if (siblingInstance.equals(instanceToBeStop) || (toBeStoppedInstances != null
&& toBeStoppedInstances.contains(siblingInstance))) {
continue;
}

// If the node is in the evacuating state, we skip this partition health check.
if (isOperationSetForInstance(dataAccessor, siblingInstance)) {
continue;
}

// If the state is init state, we add appropriate messages
if (stateMap.get(siblingInstance).equals(stateModelDefinition.getInitialState())) {
unhealthyPartitions.computeIfAbsent(partition, list -> new ArrayList<>())
Expand Down Expand Up @@ -451,9 +474,11 @@ public static boolean siblingNodesActiveReplicaCheck(HelixDataAccessor dataAcces
if (stateByInstanceMap.containsKey(instanceName)) {
int numHealthySiblings = 0;
for (Map.Entry<String, String> entry : stateByInstanceMap.entrySet()) {
if (!entry.getKey().equals(instanceName) && (toBeStoppedInstances == null
|| !toBeStoppedInstances.contains(entry.getKey())) && !unhealthyStates.contains(
entry.getValue())) {
String siblingInstanceName = entry.getKey();
if (!siblingInstanceName.equals(instanceName) && (toBeStoppedInstances == null
|| !toBeStoppedInstances.contains(siblingInstanceName))
&& !unhealthyStates.contains(entry.getValue()) && !isOperationSetForInstance(
dataAccessor, siblingInstanceName)) {
numHealthySiblings++;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.helix.HelixException;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyType;
import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.ExternalView;
Expand All @@ -45,6 +46,7 @@
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.argThat;
Expand Down Expand Up @@ -375,7 +377,7 @@ public void TestSiblingNodesActiveReplicaCheck_success() {
String resource = "resource";
Mock mock = new Mock();
doReturn(ImmutableList.of(resource)).when(mock.dataAccessor)
.getChildNames(argThat(new PropertyKeyArgument(PropertyType.EXTERNALVIEW)));
.getChildNames(argThat(new PropertyKeyArgument(PropertyType.IDEALSTATES)));
// set ideal state
IdealState idealState = mock(IdealState.class);
when(idealState.isEnabled()).thenReturn(true);
Expand All @@ -397,6 +399,9 @@ public void TestSiblingNodesActiveReplicaCheck_success() {
doReturn(stateModelDefinition).when(mock.dataAccessor)
.getProperty(argThat(new PropertyKeyArgument(PropertyType.STATEMODELDEFS)));

// set up default instances config
setDefaultInstanceConfigs(mock);

boolean result =
InstanceValidationUtil.siblingNodesActiveReplicaCheck(mock.dataAccessor, TEST_INSTANCE);

Expand Down Expand Up @@ -430,6 +435,9 @@ public void TestSiblingNodesActiveReplicaCheckSuccessWithToBeStoppedInstances()
doReturn(stateModelDefinition).when(mock.dataAccessor)
.getProperty(argThat(new PropertyKeyArgument(PropertyType.STATEMODELDEFS)));

// set default instance config
setDefaultInstanceConfigs(mock);

Set<String> toBeStoppedInstances = new HashSet<>();
toBeStoppedInstances.add("instance3");
toBeStoppedInstances.add("invalidInstances"); // include an invalid instance.
Expand Down Expand Up @@ -469,6 +477,9 @@ public void TestSiblingNodesActiveReplicaCheckFailsWithToBeStoppedInstances() {
doReturn(stateModelDefinition).when(mock.dataAccessor)
.getProperty(argThat(new PropertyKeyArgument(PropertyType.STATEMODELDEFS)));

// set default instance config
setDefaultInstanceConfigs(mock);

Set<String> toBeStoppedInstances = new HashSet<>();
toBeStoppedInstances.add("instance1");
toBeStoppedInstances.add("instance2");
Expand All @@ -478,6 +489,59 @@ public void TestSiblingNodesActiveReplicaCheckFailsWithToBeStoppedInstances() {
Assert.assertFalse(result);
}

/**
 * Verifies that a sibling whose instance operation is EVACUATE is not counted as a healthy
 * replica by {@code siblingNodesActiveReplicaCheck}.
 *
 * Setup: one partition with a Master on TEST_INSTANCE and Slaves on instance1..instance3, a
 * minimum of 2 active replicas, and instance2 marked as evacuating.
 */
@Test
public void TestSiblingNodesActiveReplicaCheckFailsWithEvacuatingInstance() {
  final String resourceName = "resource";
  final String partitionName = "db0";
  final String stateModelName = "MasterSlave";
  Mock mock = new Mock();
  HelixDataAccessor accessor = mock.dataAccessor;

  doReturn(ImmutableList.of(resourceName)).when(accessor)
      .getChildNames(argThat(new PropertyKeyArgument(PropertyType.IDEALSTATES)));

  // Ideal state: enabled, valid, MasterSlave.
  IdealState mockedIdealState = mock(IdealState.class);
  when(mockedIdealState.isEnabled()).thenReturn(true);
  when(mockedIdealState.isValid()).thenReturn(true);
  when(mockedIdealState.getStateModelDefRef()).thenReturn(stateModelName);
  doReturn(mockedIdealState).when(accessor)
      .getProperty(argThat(new PropertyKeyArgument(PropertyType.IDEALSTATES)));

  // External view: a single partition hosted on the instance under test plus three siblings.
  ImmutableMap<String, String> replicaStates =
      ImmutableMap.of(TEST_INSTANCE, "Master", "instance1", "Slave", "instance2", "Slave",
          "instance3", "Slave");
  ExternalView mockedExternalView = mock(ExternalView.class);
  when(mockedExternalView.getMinActiveReplicas()).thenReturn(2);
  when(mockedExternalView.getStateModelDefRef()).thenReturn(stateModelName);
  when(mockedExternalView.getPartitionSet()).thenReturn(ImmutableSet.of(partitionName));
  when(mockedExternalView.getStateMap(partitionName)).thenReturn(replicaStates);
  doReturn(mockedExternalView).when(accessor)
      .getProperty(argThat(new PropertyKeyArgument(PropertyType.EXTERNALVIEW)));

  // State model definition: OFFLINE is the initial state.
  StateModelDefinition mockedStateModelDef = mock(StateModelDefinition.class);
  when(mockedStateModelDef.getInitialState()).thenReturn("OFFLINE");
  doReturn(mockedStateModelDef).when(accessor)
      .getProperty(argThat(new PropertyKeyArgument(PropertyType.STATEMODELDEFS)));

  // instance2 is evacuating; instance3 carries a freshly constructed config.
  InstanceConfig evacuatingConfig = new InstanceConfig("instance2");
  evacuatingConfig.setInstanceOperation(InstanceConstants.InstanceOperation.EVACUATE);
  InstanceConfig plainConfig3 = new InstanceConfig("instance3");
  doReturn(evacuatingConfig).when(accessor).getProperty(BUILDER.instanceConfig("instance2"));
  doReturn(plainConfig3).when(accessor).getProperty(BUILDER.instanceConfig("instance3"));

  // Case 1: instance1 is assumed stopped, so only instance3 remains and the check must fail.
  Set<String> assumedStopped = new HashSet<>();
  assumedStopped.add("instance1");
  Assert.assertFalse(
      InstanceValidationUtil.siblingNodesActiveReplicaCheck(accessor, TEST_INSTANCE,
          assumedStopped));

  // Case 2: nothing is assumed stopped and instance1 has a freshly constructed config, so
  // instance1 and instance3 both count and the check must pass.
  InstanceConfig plainConfig1 = new InstanceConfig("instance1");
  doReturn(plainConfig1).when(accessor).getProperty(BUILDER.instanceConfig("instance1"));
  Set<String> noneStopped = new HashSet<>();
  Assert.assertTrue(
      InstanceValidationUtil.siblingNodesActiveReplicaCheck(accessor, TEST_INSTANCE,
          noneStopped));
}

@Test
public void TestSiblingNodesActiveReplicaCheck_fail() {
String resource = "resource";
Expand All @@ -504,6 +568,9 @@ public void TestSiblingNodesActiveReplicaCheck_fail() {
doReturn(stateModelDefinition).when(mock.dataAccessor)
.getProperty(argThat(new PropertyKeyArgument(PropertyType.STATEMODELDEFS)));

// set default instance config
setDefaultInstanceConfigs(mock);

boolean result =
InstanceValidationUtil.siblingNodesActiveReplicaCheck(mock.dataAccessor, TEST_INSTANCE);

Expand Down Expand Up @@ -552,6 +619,19 @@ public void TestSiblingNodesActiveReplicaCheck_exception_whenExternalViewUnavail
InstanceValidationUtil.siblingNodesActiveReplicaCheck(mock.dataAccessor, TEST_INSTANCE);
}

/**
 * Stubs the mocked data accessor so that looking up the instance config of "instance1",
 * "instance2" or "instance3" returns a freshly constructed {@link InstanceConfig} for that name.
 *
 * @param mock holder of the mocked data accessor to stub
 */
private void setDefaultInstanceConfigs(Mock mock) {
  for (String siblingName : new String[]{"instance1", "instance2", "instance3"}) {
    InstanceConfig defaultConfig = new InstanceConfig(siblingName);
    doReturn(defaultConfig).when(mock.dataAccessor)
        .getProperty(BUILDER.instanceConfig(siblingName));
  }
}

private class Mock {
HelixDataAccessor dataAccessor;
ConfigAccessor configAccessor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ public class MaintenanceManagementService {
private final HelixDataAccessorWrapper _dataAccessor;
private final Set<String> _nonBlockingHealthChecks;
private final Set<StoppableCheck.Category> _skipHealthCheckCategories;
// Set the default value of _stoppableHealthCheckList to be the list of all stoppable checks to
// maintain the backward compatibility with users who don't use MaintenanceManagementServiceBuilder
// to create the MaintenanceManagementService object.
private List<HealthCheck> _stoppableHealthCheckList = HealthCheck.STOPPABLE_CHECK_LIST;

public MaintenanceManagementService(ZKHelixDataAccessor dataAccessor,
ConfigAccessor configAccessor, boolean skipZKRead, String namespace) {
Expand Down Expand Up @@ -144,6 +148,26 @@ public MaintenanceManagementService(ZKHelixDataAccessor dataAccessor,
_namespace = namespace;
}

@VisibleForTesting
MaintenanceManagementService(MaintenanceManagementServiceBuilder builder) {
_dataAccessor =
new HelixDataAccessorWrapper(builder.getDataAccessor(), builder.getCustomRestClient(),
builder.getNamespace());
_configAccessor = builder.getConfigAccessor();
_customRestClient = builder.getCustomRestClient();
_skipZKRead = builder.isSkipZKRead();
_nonBlockingHealthChecks =
builder.isContinueOnFailure() ? Collections.singleton(ALL_HEALTH_CHECK_NONBLOCK)
: Collections.emptySet();
_skipHealthCheckCategories =
builder.getSkipHealthCheckCategories() == null ? Collections.emptySet()
: builder.getSkipHealthCheckCategories();
_stoppableHealthCheckList =
builder.getStoppableHealthCheckList() == null ? Collections.emptyList()
: builder.getStoppableHealthCheckList();
_namespace = builder.getNamespace();
}

/**
* Perform health check and maintenance operation check and execution for a instance in
* one cluster.
Expand Down Expand Up @@ -613,7 +637,7 @@ private StoppableCheck performHelixOwnInstanceCheck(String clusterId, String ins
Set<String> toBeStoppedInstances) {
LOG.info("Perform helix own custom health checks for {}/{}", clusterId, instanceName);
Map<String, Boolean> helixStoppableCheck =
getInstanceHealthStatus(clusterId, instanceName, HealthCheck.STOPPABLE_CHECK_LIST,
getInstanceHealthStatus(clusterId, instanceName, _stoppableHealthCheckList,
toBeStoppedInstances);

return new StoppableCheck(helixStoppableCheck, StoppableCheck.Category.HELIX_OWN_CHECK);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package org.apache.helix.rest.clusterMaintenanceService;

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import java.util.Collections;
import java.util.List;
import java.util.Set;

import org.apache.helix.ConfigAccessor;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.rest.client.CustomRestClient;
import org.apache.helix.rest.server.json.instance.StoppableCheck;


/**
 * Fluent builder for {@link MaintenanceManagementService}.
 *
 * The config accessor, namespace, data accessor and custom REST client are mandatory and must be
 * set before {@link #createMaintenanceManagementService()} is called. The two collection-valued
 * options start out empty and the boolean options start out {@code false}.
 */
public class MaintenanceManagementServiceBuilder {
  // Mandatory arguments.
  private ConfigAccessor _configAccessor;
  private String _namespace;
  private ZKHelixDataAccessor _dataAccessor;
  private CustomRestClient _customRestClient;

  // Optional arguments.
  private boolean _skipZKRead;
  private boolean _continueOnFailure;
  private Set<StoppableCheck.Category> _skipHealthCheckCategories = Collections.emptySet();
  private List<HealthCheck> _stoppableHealthCheckList = Collections.emptyList();

  /** @return the configured config accessor, or null if none was set */
  public ConfigAccessor getConfigAccessor() {
    return _configAccessor;
  }

  /** Sets the config accessor (mandatory). */
  public MaintenanceManagementServiceBuilder setConfigAccessor(ConfigAccessor configAccessor) {
    _configAccessor = configAccessor;
    return this;
  }

  /** @return the configured namespace, or null if none was set */
  public String getNamespace() {
    return _namespace;
  }

  /** Sets the namespace (mandatory). */
  public MaintenanceManagementServiceBuilder setNamespace(String namespace) {
    _namespace = namespace;
    return this;
  }

  /** @return the configured data accessor, or null if none was set */
  public ZKHelixDataAccessor getDataAccessor() {
    return _dataAccessor;
  }

  /** Sets the data accessor (mandatory). */
  public MaintenanceManagementServiceBuilder setDataAccessor(
      ZKHelixDataAccessor dataAccessor) {
    _dataAccessor = dataAccessor;
    return this;
  }

  /** @return the configured custom REST client, or null if none was set */
  public CustomRestClient getCustomRestClient() {
    return _customRestClient;
  }

  /** Sets the custom REST client (mandatory). */
  public MaintenanceManagementServiceBuilder setCustomRestClient(
      CustomRestClient customRestClient) {
    _customRestClient = customRestClient;
    return this;
  }

  /** @return the skip-ZK-read flag; false unless set otherwise */
  public boolean isSkipZKRead() {
    return _skipZKRead;
  }

  /** Sets the skip-ZK-read flag. */
  public MaintenanceManagementServiceBuilder setSkipZKRead(boolean skipZKRead) {
    _skipZKRead = skipZKRead;
    return this;
  }

  /** @return the continue-on-failure flag; false unless set otherwise */
  public boolean isContinueOnFailure() {
    return _continueOnFailure;
  }

  /** Sets the continue-on-failure flag. */
  public MaintenanceManagementServiceBuilder setContinueOnFailure(boolean continueOnFailure) {
    _continueOnFailure = continueOnFailure;
    return this;
  }

  /** @return the health check categories to skip; an empty set unless set otherwise */
  public Set<StoppableCheck.Category> getSkipHealthCheckCategories() {
    return _skipHealthCheckCategories;
  }

  /** Sets the health check categories to skip; the given set is stored as-is. */
  public MaintenanceManagementServiceBuilder setSkipHealthCheckCategories(
      Set<StoppableCheck.Category> skipHealthCheckCategories) {
    _skipHealthCheckCategories = skipHealthCheckCategories;
    return this;
  }

  /** @return the stoppable health check list; an empty list unless set otherwise */
  public List<HealthCheck> getStoppableHealthCheckList() {
    return _stoppableHealthCheckList;
  }

  /** Sets the stoppable health check list; the given list is stored as-is. */
  public MaintenanceManagementServiceBuilder setStoppableHealthCheckList(
      List<HealthCheck> stoppableHealthCheckList) {
    _stoppableHealthCheckList = stoppableHealthCheckList;
    return this;
  }

  /**
   * Builds the service from the values set so far.
   *
   * @return a new {@link MaintenanceManagementService} configured from this builder
   * @throws IllegalArgumentException if the config accessor, namespace, data accessor or custom
   *         REST client has not been set
   */
  public MaintenanceManagementService createMaintenanceManagementService() {
    boolean allMandatoryArgumentsSet =
        _configAccessor != null && _namespace != null && _dataAccessor != null
            && _customRestClient != null;
    if (!allMandatoryArgumentsSet) {
      throw new IllegalArgumentException(
          "One or more of following mandatory arguments are not set: '_configAccessor', "
              + "'_namespace', '_dataAccessor', '_customRestClient'.");
    }
    return new MaintenanceManagementService(this);
  }
}
Loading

0 comments on commit 17ab9df

Please sign in to comment.