Skip to content

Commit

Permalink
Rewrite the filtering logic by adding evacuating instances to toBeSto…
Browse files Browse the repository at this point in the history
…ppedInstances list
  • Loading branch information
MarkGaox committed Nov 2, 2023
1 parent 17ab9df commit 8951a31
Show file tree
Hide file tree
Showing 9 changed files with 227 additions and 354 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -240,23 +240,6 @@ public static boolean hasErrorPartitions(HelixDataAccessor dataAccessor, String
return false;
}

/**
 * Checks whether the specified instance is marked for an ongoing instance operation.
 * Currently, this method only checks for evacuation.
 *
 * @param dataAccessor The accessor for retrieving Helix data properties.
 * @param instanceName An instance to be evaluated.
 * @return {@code true} if the instance's config carries the EVACUATE operation;
 *         {@code false} otherwise, including when no instance config could be read.
 */
public static boolean isOperationSetForInstance(HelixDataAccessor dataAccessor,
    String instanceName) {
  PropertyKey.Builder propertyKeyBuilder = dataAccessor.keyBuilder();
  InstanceConfig instanceConfig =
      dataAccessor.getProperty(propertyKeyBuilder.instanceConfig(instanceName));
  // The accessor may hand back null when there is no config for this name; without a
  // config there is no operation to report, so answer false rather than throwing an NPE.
  if (instanceConfig == null) {
    return false;
  }
  return InstanceConstants.InstanceOperation.EVACUATE.name()
      .equals(instanceConfig.getInstanceOperation());
}

/**
* Get the problematic partitions on the to-be-stop instance
* Requirement:
Expand Down Expand Up @@ -319,11 +302,6 @@ public static Map<String, List<String>> perPartitionHealthCheck(List<ExternalVie
continue;
}

// If the node is in the evacuating state, we skip this partition health check.
if (isOperationSetForInstance(dataAccessor, siblingInstance)) {
continue;
}

// If the state is init state, we add appropriate messages
if (stateMap.get(siblingInstance).equals(stateModelDefinition.getInitialState())) {
unhealthyPartitions.computeIfAbsent(partition, list -> new ArrayList<>())
Expand Down Expand Up @@ -477,8 +455,7 @@ public static boolean siblingNodesActiveReplicaCheck(HelixDataAccessor dataAcces
String siblingInstanceName = entry.getKey();
if (!siblingInstanceName.equals(instanceName) && (toBeStoppedInstances == null
|| !toBeStoppedInstances.contains(siblingInstanceName))
&& !unhealthyStates.contains(entry.getValue()) && !isOperationSetForInstance(
dataAccessor, siblingInstanceName)) {
&& !unhealthyStates.contains(entry.getValue())) {
numHealthySiblings++;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.apache.helix.HelixException;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyType;
import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.ExternalView;
Expand All @@ -46,7 +45,6 @@
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.argThat;
Expand Down Expand Up @@ -399,9 +397,6 @@ public void TestSiblingNodesActiveReplicaCheck_success() {
doReturn(stateModelDefinition).when(mock.dataAccessor)
.getProperty(argThat(new PropertyKeyArgument(PropertyType.STATEMODELDEFS)));

// set up default instances config
setDefaultInstanceConfigs(mock);

boolean result =
InstanceValidationUtil.siblingNodesActiveReplicaCheck(mock.dataAccessor, TEST_INSTANCE);

Expand Down Expand Up @@ -435,9 +430,6 @@ public void TestSiblingNodesActiveReplicaCheckSuccessWithToBeStoppedInstances()
doReturn(stateModelDefinition).when(mock.dataAccessor)
.getProperty(argThat(new PropertyKeyArgument(PropertyType.STATEMODELDEFS)));

// set default instance config
setDefaultInstanceConfigs(mock);

Set<String> toBeStoppedInstances = new HashSet<>();
toBeStoppedInstances.add("instance3");
toBeStoppedInstances.add("invalidInstances"); // include an invalid instance.
Expand Down Expand Up @@ -477,9 +469,6 @@ public void TestSiblingNodesActiveReplicaCheckFailsWithToBeStoppedInstances() {
doReturn(stateModelDefinition).when(mock.dataAccessor)
.getProperty(argThat(new PropertyKeyArgument(PropertyType.STATEMODELDEFS)));

// set default instance config
setDefaultInstanceConfigs(mock);

Set<String> toBeStoppedInstances = new HashSet<>();
toBeStoppedInstances.add("instance1");
toBeStoppedInstances.add("instance2");
Expand All @@ -489,59 +478,6 @@ public void TestSiblingNodesActiveReplicaCheckFailsWithToBeStoppedInstances() {
Assert.assertFalse(result);
}

/**
 * Exercises siblingNodesActiveReplicaCheck when one sibling (instance2) is marked EVACUATE.
 *
 * Setup: a single resource whose external view reports min active replicas = 2 and one
 * partition "db0" hosted as Master on TEST_INSTANCE and as Slave on instance1..instance3.
 *
 * Phase 1: instance1 is in toBeStoppedInstances and instance2 is evacuating; the check is
 * expected to fail (presumably only instance3 still counts as a healthy sibling, which is
 * below the min active replicas -- the comparison itself lives in the method under test).
 * Phase 2: toBeStoppedInstances is empty and instance1 has a default config; the check is
 * expected to pass.
 */
@Test
public void TestSiblingNodesActiveReplicaCheckFailsWithEvacuatingInstance() {
String resource = "resource";
Mock mock = new Mock();
// The cluster exposes exactly one ideal state name.
doReturn(ImmutableList.of(resource)).when(mock.dataAccessor)
.getChildNames(argThat(new PropertyKeyArgument(PropertyType.IDEALSTATES)));
// set ideal state: enabled, valid, MasterSlave state model
IdealState idealState = mock(IdealState.class);
when(idealState.isEnabled()).thenReturn(true);
when(idealState.isValid()).thenReturn(true);
when(idealState.getStateModelDefRef()).thenReturn("MasterSlave");
doReturn(idealState).when(mock.dataAccessor).getProperty(argThat(new PropertyKeyArgument(PropertyType.IDEALSTATES)));

// set external view: one partition, TEST_INSTANCE is Master, three sibling Slaves
ExternalView externalView = mock(ExternalView.class);
when(externalView.getMinActiveReplicas()).thenReturn(2);
when(externalView.getStateModelDefRef()).thenReturn("MasterSlave");
when(externalView.getPartitionSet()).thenReturn(ImmutableSet.of("db0"));
when(externalView.getStateMap("db0")).thenReturn(ImmutableMap.of(TEST_INSTANCE, "Master",
"instance1", "Slave", "instance2", "Slave", "instance3", "Slave"));
doReturn(externalView).when(mock.dataAccessor)
.getProperty(argThat(new PropertyKeyArgument(PropertyType.EXTERNALVIEW)));
// State model definition whose initial state is OFFLINE.
StateModelDefinition stateModelDefinition = mock(StateModelDefinition.class);
when(stateModelDefinition.getInitialState()).thenReturn("OFFLINE");
doReturn(stateModelDefinition).when(mock.dataAccessor)
.getProperty(argThat(new PropertyKeyArgument(PropertyType.STATEMODELDEFS)));

// Set instance operation to be EVACUATE for instance2; instance3 keeps a default config
InstanceConfig instanceConfig2 = new InstanceConfig("instance2");
InstanceConfig instanceConfig3 = new InstanceConfig("instance3");
instanceConfig2.setInstanceOperation(InstanceConstants.InstanceOperation.EVACUATE);
doReturn(instanceConfig2).when(mock.dataAccessor)
.getProperty(BUILDER.instanceConfig("instance2"));
doReturn(instanceConfig3).when(mock.dataAccessor)
.getProperty(BUILDER.instanceConfig("instance3"));

// Phase 1: add instance1 to toBeStoppedInstances; no config is stubbed for instance1 here
Set<String> toBeStoppedInstances = new HashSet<>();
toBeStoppedInstances.add("instance1");
boolean result =
InstanceValidationUtil.siblingNodesActiveReplicaCheck(mock.dataAccessor, TEST_INSTANCE, toBeStoppedInstances);
Assert.assertFalse(result);

// Phase 2: remove instance1 from toBeStoppedInstances and add its config
InstanceConfig instanceConfig1 = new InstanceConfig("instance1");
doReturn(instanceConfig1).when(mock.dataAccessor)
.getProperty(BUILDER.instanceConfig("instance1"));
toBeStoppedInstances = new HashSet<>();
result =
InstanceValidationUtil.siblingNodesActiveReplicaCheck(mock.dataAccessor, TEST_INSTANCE, toBeStoppedInstances);
Assert.assertTrue(result);
}

@Test
public void TestSiblingNodesActiveReplicaCheck_fail() {
String resource = "resource";
Expand All @@ -568,9 +504,6 @@ public void TestSiblingNodesActiveReplicaCheck_fail() {
doReturn(stateModelDefinition).when(mock.dataAccessor)
.getProperty(argThat(new PropertyKeyArgument(PropertyType.STATEMODELDEFS)));

// set default instance config
setDefaultInstanceConfigs(mock);

boolean result =
InstanceValidationUtil.siblingNodesActiveReplicaCheck(mock.dataAccessor, TEST_INSTANCE);

Expand Down Expand Up @@ -619,19 +552,6 @@ public void TestSiblingNodesActiveReplicaCheck_exception_whenExternalViewUnavail
InstanceValidationUtil.siblingNodesActiveReplicaCheck(mock.dataAccessor, TEST_INSTANCE);
}

/**
 * Stubs the mocked data accessor so that looking up the instance config of "instance1",
 * "instance2" or "instance3" yields a freshly constructed InstanceConfig for that name.
 */
private void setDefaultInstanceConfigs(Mock mock) {
  // Same three names, stubbed in the same order, one default config each.
  for (String siblingName : new String[] {"instance1", "instance2", "instance3"}) {
    doReturn(new InstanceConfig(siblingName)).when(mock.dataAccessor)
        .getProperty(BUILDER.instanceConfig(siblingName));
  }
}

private class Mock {
HelixDataAccessor dataAccessor;
ConfigAccessor configAccessor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,24 +148,26 @@ public MaintenanceManagementService(ZKHelixDataAccessor dataAccessor,
_namespace = namespace;
}

@VisibleForTesting
MaintenanceManagementService(MaintenanceManagementServiceBuilder builder) {
private MaintenanceManagementService(ZKHelixDataAccessor dataAccessor,
ConfigAccessor configAccessor, CustomRestClient customRestClient, boolean skipZKRead,
boolean continueOnFailure, Set<StoppableCheck.Category> skipHealthCheckCategories,
List<HealthCheck> stoppableHealthCheckList, String namespace) {
_dataAccessor =
new HelixDataAccessorWrapper(builder.getDataAccessor(), builder.getCustomRestClient(),
builder.getNamespace());
_configAccessor = builder.getConfigAccessor();
_customRestClient = builder.getCustomRestClient();
_skipZKRead = builder.isSkipZKRead();
new HelixDataAccessorWrapper(dataAccessor, customRestClient,
namespace);
_configAccessor = configAccessor;
_customRestClient = customRestClient;
_skipZKRead = skipZKRead;
_nonBlockingHealthChecks =
builder.isContinueOnFailure() ? Collections.singleton(ALL_HEALTH_CHECK_NONBLOCK)
continueOnFailure ? Collections.singleton(ALL_HEALTH_CHECK_NONBLOCK)
: Collections.emptySet();
_skipHealthCheckCategories =
builder.getSkipHealthCheckCategories() == null ? Collections.emptySet()
: builder.getSkipHealthCheckCategories();
skipHealthCheckCategories == null ? Collections.emptySet()
: skipHealthCheckCategories;
_stoppableHealthCheckList =
builder.getStoppableHealthCheckList() == null ? Collections.emptyList()
: builder.getStoppableHealthCheckList();
_namespace = builder.getNamespace();
stoppableHealthCheckList == null ? Collections.emptyList()
: stoppableHealthCheckList;
_namespace = namespace;
}

/**
Expand Down Expand Up @@ -795,4 +797,86 @@ protected Map<String, Boolean> getInstanceHealthStatus(String clusterId, String

return healthStatus;
}

/**
 * Fluent builder for {@link MaintenanceManagementService}.
 *
 * <p>Mandatory settings: config accessor, namespace, data accessor and custom REST client;
 * {@link #build()} rejects the builder if any of them is still {@code null}. The two boolean
 * flags are {@code false} unless set, and the skip-category set and stoppable health check
 * list start out as empty collections.
 */
public static class MaintenanceManagementServiceBuilder {
  private ConfigAccessor _configAccessor;
  private boolean _skipZKRead;
  private String _namespace;
  private ZKHelixDataAccessor _dataAccessor;
  private CustomRestClient _customRestClient;
  private boolean _continueOnFailure;
  private Set<StoppableCheck.Category> _skipHealthCheckCategories = Collections.emptySet();
  private List<HealthCheck> _stoppableHealthCheckList = Collections.emptyList();

  /** Sets the (mandatory) config accessor. */
  public MaintenanceManagementServiceBuilder setConfigAccessor(ConfigAccessor configAccessor) {
    _configAccessor = configAccessor;
    return this;
  }

  /** Sets the skip-ZK-read flag that is passed through to the service. */
  public MaintenanceManagementServiceBuilder setSkipZKRead(boolean skipZKRead) {
    _skipZKRead = skipZKRead;
    return this;
  }

  /** Sets the (mandatory) namespace. */
  public MaintenanceManagementServiceBuilder setNamespace(String namespace) {
    _namespace = namespace;
    return this;
  }

  /** Sets the (mandatory) ZK-backed data accessor. */
  public MaintenanceManagementServiceBuilder setDataAccessor(
      ZKHelixDataAccessor dataAccessor) {
    _dataAccessor = dataAccessor;
    return this;
  }

  /** Sets the (mandatory) custom REST client. */
  public MaintenanceManagementServiceBuilder setCustomRestClient(
      CustomRestClient customRestClient) {
    _customRestClient = customRestClient;
    return this;
  }

  /** Sets the continue-on-failure flag that is passed through to the service. */
  public MaintenanceManagementServiceBuilder setContinueOnFailure(boolean continueOnFailure) {
    _continueOnFailure = continueOnFailure;
    return this;
  }

  /** Sets the health check categories to skip; replaces the empty default. */
  public MaintenanceManagementServiceBuilder setSkipHealthCheckCategories(
      Set<StoppableCheck.Category> skipHealthCheckCategories) {
    _skipHealthCheckCategories = skipHealthCheckCategories;
    return this;
  }

  /** Sets the stoppable health check list; replaces the empty default. */
  public MaintenanceManagementServiceBuilder setStoppableHealthCheckList(
      List<HealthCheck> stoppableHealthCheckList) {
    _stoppableHealthCheckList = stoppableHealthCheckList;
    return this;
  }

  /**
   * Validates the mandatory settings and creates the service.
   *
   * @return a new {@link MaintenanceManagementService} configured from this builder
   * @throws IllegalArgumentException if any mandatory setting is still {@code null}
   */
  public MaintenanceManagementService build() {
    validate();
    return new MaintenanceManagementService(_dataAccessor, _configAccessor, _customRestClient,
        _skipZKRead, _continueOnFailure, _skipHealthCheckCategories, _stoppableHealthCheckList,
        _namespace);
  }

  /**
   * Collects one message per missing mandatory setting so the caller sees all problems
   * at once instead of fixing them one exception at a time.
   */
  private void validate() throws IllegalArgumentException {
    List<String> msg = new ArrayList<>();
    if (_configAccessor == null) {
      msg.add("'configAccessor' can't be null.");
    }
    if (_namespace == null) {
      msg.add("'namespace' can't be null.");
    }
    if (_dataAccessor == null) {
      // Use the setter-facing name (no field prefix), consistent with the other messages.
      msg.add("'dataAccessor' can't be null.");
    }
    if (_customRestClient == null) {
      msg.add("'customRestClient' can't be null.");
    }
    if (!msg.isEmpty()) {
      throw new IllegalArgumentException(
          "One or more mandatory arguments are not set " + msg);
    }
  }
}
}
Loading

0 comments on commit 8951a31

Please sign in to comment.