Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change instance operation orthogonal to instance enable #2615

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ public enum InstanceDisabledType {
public enum InstanceOperation {
EVACUATE, // Node will be removed after a period of time
SWAP_IN, // New node joining for swap operation
SWAP_OUT, // Existing Node to be removed for swap operation
ENABLE, // Backward compatible field for HELIX_ENABLED. Set when changing from disabled to enabled.
DISABLE // Backward compatible field for HELIX_ENABLED. Set when changing from enabled to disabled.
SWAP_OUT // Existing Node to be removed for swap operation
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ public static void main(String[] args) throws Exception {

controllerName = cmd.getOptionValue(name);

// Espresso_driver.py will consume this
// Espresso_driver.py will consume thisq
logger.info("Cluster manager started, zkServer: " + zkConnectString + ", clusterName:"
+ clusterName + ", controllerName:" + controllerName + ", mode:" + controllerMode);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,18 @@
import java.util.Optional;
import java.util.Set;

import java.util.stream.Collectors;
import org.apache.helix.HelixDefinedState;
import org.apache.helix.api.config.StateTransitionThrottleConfig;
import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.rebalancer.constraint.MonitoredAbnormalResolver;
import org.apache.helix.controller.rebalancer.util.DelayedRebalanceUtil;
import org.apache.helix.controller.rebalancer.util.WagedValidationUtil;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
import org.apache.helix.model.ResourceAssignment;
Expand Down Expand Up @@ -109,14 +112,12 @@ public IdealState computeNewIdealState(String resourceName,
allNodes = clusterData.getAllInstances();
}

Set<String> activeNodes = liveEnabledNodes;
long delay = DelayedRebalanceUtil.getRebalanceDelay(currentIdealState, clusterConfig);
Set<String> activeNodes = DelayedRebalanceUtil
.getActiveNodes(allNodes, currentIdealState, liveEnabledNodes,
clusterData.getInstanceOfflineTimeMap(), clusterData.getLiveInstances().keySet(),
clusterData.getInstanceConfigMap(), delay, clusterConfig);
if (delayRebalanceEnabled) {
long delay = DelayedRebalanceUtil.getRebalanceDelay(currentIdealState, clusterConfig);
activeNodes = DelayedRebalanceUtil
.getActiveNodes(allNodes, currentIdealState, liveEnabledNodes,
clusterData.getInstanceOfflineTimeMap(), clusterData.getLiveInstances().keySet(),
clusterData.getInstanceConfigMap(), delay, clusterConfig);

Set<String> offlineOrDisabledInstances = new HashSet<>(activeNodes);
offlineOrDisabledInstances.removeAll(liveEnabledNodes);
DelayedRebalanceUtil.setRebalanceScheduler(currentIdealState.getResourceName(), true,
Expand Down Expand Up @@ -157,15 +158,17 @@ public IdealState computeNewIdealState(String resourceName,

// sort node lists to ensure consistent preferred assignments
List<String> allNodeList = new ArrayList<>(allNodes);
List<String> liveEnabledNodeList = new ArrayList<>(liveEnabledNodes);
List<String> liveEnabledAssignableNodeList = filterOutEvacuatingInstances(clusterData.getInstanceConfigMap(),
liveEnabledNodes);
Collections.sort(allNodeList);
Collections.sort(liveEnabledNodeList);
Collections.sort(liveEnabledAssignableNodeList);

ZNRecord newIdealMapping = _rebalanceStrategy
.computePartitionAssignment(allNodeList, liveEnabledNodeList, currentMapping, clusterData);
.computePartitionAssignment(allNodeList, liveEnabledAssignableNodeList, currentMapping, clusterData);
ZNRecord finalMapping = newIdealMapping;

if (DelayedRebalanceUtil.isDelayRebalanceEnabled(currentIdealState, clusterConfig)) {
if (DelayedRebalanceUtil.isDelayRebalanceEnabled(currentIdealState, clusterConfig)
|| liveEnabledAssignableNodeList.size()!= activeNodes.size()) {
List<String> activeNodeList = new ArrayList<>(activeNodes);
Collections.sort(activeNodeList);
int minActiveReplicas = DelayedRebalanceUtil.getMinActiveReplica(
Expand Down Expand Up @@ -194,6 +197,14 @@ public IdealState computeNewIdealState(String resourceName,
return idealState;
}

public static List<String> filterOutEvacuatingInstances(Map<String, InstanceConfig> instanceConfigMap,
Set<String> nodes) {
return nodes.stream()
.filter(instance -> !instanceConfigMap.get(instance).getInstanceOperation().equals(
InstanceConstants.InstanceOperation.EVACUATE.name()))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's include the SWAP_IN as well. It will be same behavior.

.collect(Collectors.toList());
}

private IdealState generateNewIdealState(String resourceName, IdealState currentIdealState,
ZNRecord newMapping) {
IdealState newIdealState = new IdealState(resourceName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ private static Set<String> getActiveNodes(Set<String> allNodes, Set<String> live
return activeNodes;
}

private static Set<String> filterOutEvacuatingInstances(Map<String, InstanceConfig> instanceConfigMap,
public static Set<String> filterOutEvacuatingInstances(Map<String, InstanceConfig> instanceConfigMap,
Set<String> nodes) {
return nodes.stream()
.filter(instance -> !instanceConfigMap.get(instance).getInstanceOperation().equals(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,15 +265,6 @@ public boolean getInstanceEnabled() {
*/
public void setInstanceEnabled(boolean enabled) {
// set instance operation only when we need to change InstanceEnabled value.
// When enabling an instance where current HELIX_ENABLED is false, we update INSTANCE_OPERATION to 'ENABLE'
// When disabling and instance where current HELIX_ENABLED is false, we overwrite what current operation and
// update INSTANCE_OPERATION to 'DISABLE'.
String instanceOperationKey = InstanceConfigProperty.INSTANCE_OPERATION.toString();
if (enabled != getInstanceEnabled()) {
_record.setSimpleField(instanceOperationKey,
enabled ? InstanceConstants.InstanceOperation.ENABLE.name()
: InstanceConstants.InstanceOperation.DISABLE.name());
}
setInstanceEnabledHelper(enabled);
}

Expand Down Expand Up @@ -344,17 +335,8 @@ public long getInstanceEnabledTime() {
}

public void setInstanceOperation(InstanceConstants.InstanceOperation operation) {
if (operation != InstanceConstants.InstanceOperation.DISABLE
&& operation != InstanceConstants.InstanceOperation.ENABLE) {
if (!getInstanceEnabled()) {
throw new HelixException(
"setting non enable/disable operation (e.g. evacuate, swap) to helix disabled instance is not allowed");
}
} else {
setInstanceEnabledHelper(operation == InstanceConstants.InstanceOperation.ENABLE);
}

_record.setSimpleField(InstanceConfigProperty.INSTANCE_OPERATION.toString(), operation.toString());
_record.setSimpleField(InstanceConfigProperty.INSTANCE_OPERATION.name(),
operation == null ? "" : operation.name());
}

public String getInstanceOperation() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public void testRevertEvacuation() throws Exception {
// revert an evacuate instance
String instanceToEvacuate = _participants.get(0).getInstanceName();
_gSetupTool.getClusterManagementTool()
.setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, InstanceConstants.InstanceOperation.ENABLE);
.setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, null);

Assert.assertTrue(_clusterVerifier.verifyByPolling());

Expand All @@ -138,6 +138,55 @@ public void testRevertEvacuation() throws Exception {
}

@Test(dependsOnMethods = "testRevertEvacuation")
public void testAddingNodeWithEvacuationTag() throws Exception {
// first disable and instance, and wait for all replicas to be moved out
String mockNewInstance = _participants.get(0).getInstanceName();
_gSetupTool.getClusterManagementTool()
.enableInstance(CLUSTER_NAME, mockNewInstance, false);
Assert.assertTrue(_clusterVerifier.verifyByPolling());
//ev should contain all instances but the disabled one
Map<String, ExternalView> assignment = getEV();
List<String> currentActiveInstances =
_participantNames.stream().filter(n -> !n.equals(mockNewInstance)).collect(Collectors.toList());
for (String resource : _allDBs) {
validateAssignmentInEv(assignment.get(resource));
Set<String> newPAssignedParticipants = getParticipantsInEv(assignment.get(resource));
Assert.assertFalse(newPAssignedParticipants.contains(mockNewInstance));
Assert.assertTrue(newPAssignedParticipants.containsAll(currentActiveInstances));
}

// add evacuate tag and enable instance
_gSetupTool.getClusterManagementTool()
.setInstanceOperation(CLUSTER_NAME, mockNewInstance, InstanceConstants.InstanceOperation.EVACUATE);
_gSetupTool.getClusterManagementTool()
.enableInstance(CLUSTER_NAME, mockNewInstance, true);
//ev should be the same
assignment = getEV();
currentActiveInstances =
_participantNames.stream().filter(n -> !n.equals(mockNewInstance)).collect(Collectors.toList());
for (String resource : _allDBs) {
validateAssignmentInEv(assignment.get(resource));
Set<String> newPAssignedParticipants = getParticipantsInEv(assignment.get(resource));
Assert.assertFalse(newPAssignedParticipants.contains(mockNewInstance));
Assert.assertTrue(newPAssignedParticipants.containsAll(currentActiveInstances));
}

// now remove operation tag
String instanceToEvacuate = _participants.get(0).getInstanceName();
_gSetupTool.getClusterManagementTool()
.setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, null);

Assert.assertTrue(_clusterVerifier.verifyByPolling());

// EV should contain all participants, check resources one by one
assignment = getEV();
for (String resource : _allDBs) {
Assert.assertTrue(getParticipantsInEv(assignment.get(resource)).containsAll(_participantNames));
validateAssignmentInEv(assignment.get(resource));
}
}

@Test(dependsOnMethods = "testAddingNodeWithEvacuationTag")
public void testEvacuateAndCancelBeforeBootstrapFinish() throws Exception {
// add a resource where downward state transition is slow
createResourceWithDelayedRebalance(CLUSTER_NAME, "TEST_DB3_DELAYED_CRUSHED", "MasterSlave", PARTITIONS, REPLICA,
Expand All @@ -151,7 +200,7 @@ public void testEvacuateAndCancelBeforeBootstrapFinish() throws Exception {
Assert.assertTrue(_clusterVerifier.verifyByPolling());

// set bootstrap ST delay to a large number
_stateModelDelay = -300000L;
_stateModelDelay = -10000L;
// evacuate an instance
String instanceToEvacuate = _participants.get(0).getInstanceName();
_gSetupTool.getClusterManagementTool()
Expand All @@ -174,9 +223,9 @@ public void testEvacuateAndCancelBeforeBootstrapFinish() throws Exception {
validateAssignmentInEv(assignment.get(resource));
}

// cancel the evacuation by setting instance operation back to `ENABLE`
// cancel the evacuation
_gSetupTool.getClusterManagementTool()
.setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, InstanceConstants.InstanceOperation.ENABLE);
.setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, null);

assignment = getEV();
for (String resource : _allDBs) {
Expand All @@ -200,7 +249,7 @@ public void testEvacuateAndCancelBeforeBootstrapFinish() throws Exception {
public void testEvacuateAndCancelBeforeDropFinish() throws Exception {

// set DROP ST delay to a large number
_stateModelDelay = 300000L;
_stateModelDelay = 10000L;

// evacuate an instance
String instanceToEvacuate = _participants.get(0).getInstanceName();
Expand All @@ -211,8 +260,9 @@ public void testEvacuateAndCancelBeforeDropFinish() throws Exception {
TestHelper.verify(
() -> ((_dataAccessor.getChildNames(_dataAccessor.keyBuilder().messages(instanceToEvacuate))).isEmpty()), 30000);

// cancel evacuation
_gSetupTool.getClusterManagementTool()
.setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, InstanceConstants.InstanceOperation.ENABLE);
.setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, null);
// check every replica has >= 3 active replicas, even before cluster converge
Map<String, ExternalView> assignment = getEV();
for (String resource : _allDBs) {
Expand Down Expand Up @@ -290,6 +340,10 @@ private void addParticipant(String participantName) {
}

private void createTestDBs(long delayTime) throws InterruptedException {
createResourceWithDelayedRebalance(CLUSTER_NAME, "TEST_DB0_CRUSHED",
BuiltInStateModelDefinitions.LeaderStandby.name(), PARTITIONS, REPLICA, REPLICA - 1, -1,
CrushEdRebalanceStrategy.class.getName());
_allDBs.add("TEST_DB0_CRUSHED");
createResourceWithDelayedRebalance(CLUSTER_NAME, "TEST_DB1_CRUSHED",
BuiltInStateModelDefinitions.LeaderStandby.name(), PARTITIONS, REPLICA, REPLICA - 1, 200,
CrushEdRebalanceStrategy.class.getName());
Expand Down Expand Up @@ -375,7 +429,7 @@ public StDelayMSStateModel() {
private void sleepWhileNotCanceled(long sleepTime) throws InterruptedException{
while(sleepTime >0 && !isCancelled()) {
Thread.sleep(5000);
sleepTime =- 5000;
sleepTime = sleepTime - 5000;
}
if (isCancelled()) {
_cancelled = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -495,29 +495,11 @@ public void updateInstance() throws IOException {
instanceConfig.getInstanceOperation(), InstanceConstants.InstanceOperation.EVACUATE.toString());
new JerseyUriRequestBuilder("clusters/{}/instances/{}?command=setInstanceOperation&instanceOperation=INVALIDOP")
.expectedReturnStatusCode(Response.Status.NOT_FOUND.getStatusCode()).format(CLUSTER_NAME, INSTANCE_NAME).post(this, entity);
new JerseyUriRequestBuilder("clusters/{}/instances/{}?command=setInstanceOperation")
.expectedReturnStatusCode(Response.Status.BAD_REQUEST.getStatusCode()).format(CLUSTER_NAME, INSTANCE_NAME).post(this, entity);
// TODO: enable the following test when we add sanity check.
// set operation to be DISABLE
new JerseyUriRequestBuilder("clusters/{}/instances/{}?command=setInstanceOperation&instanceOperation=DISABLE")
new JerseyUriRequestBuilder("clusters/{}/instances/{}?command=setInstanceOperation&instanceOperation=")
.format(CLUSTER_NAME, INSTANCE_NAME).post(this, entity);
instanceConfig = _configAccessor.getInstanceConfig(CLUSTER_NAME, INSTANCE_NAME);
Assert.assertEquals(
instanceConfig.getInstanceOperation(), InstanceConstants.InstanceOperation.DISABLE.toString());
Assert.assertTrue(!instanceConfig.getInstanceEnabled());

// set operation to EVACUATE, expect error
new JerseyUriRequestBuilder("clusters/{}/instances/{}?command=setInstanceOperation&instanceOperation=EVACUATE")
.expectedReturnStatusCode(Response.Status.BAD_REQUEST.getStatusCode())
.format(CLUSTER_NAME, INSTANCE_NAME).post(this, entity);
// set back to enable
new JerseyUriRequestBuilder("clusters/{}/instances/{}?command=setInstanceOperation&instanceOperation=ENABLE")
.format(CLUSTER_NAME, INSTANCE_NAME).post(this, entity);
instanceConfig = _configAccessor.getInstanceConfig(CLUSTER_NAME, INSTANCE_NAME);
Assert.assertEquals(
instanceConfig.getInstanceOperation(), InstanceConstants.InstanceOperation.ENABLE.toString());
Assert.assertTrue(instanceConfig.getInstanceEnabled());

instanceConfig.getInstanceOperation(), "");
System.out.println("End test :" + TestHelper.getTestMethodName());
}

Expand Down
Loading