Skip to content

Commit

Permalink
Change instance operation orthogonal to instance enable (#2615)
Browse files Browse the repository at this point in the history
Change instance operation orthogonal to instance enable
  • Loading branch information
xyuanlu committed Sep 19, 2023
1 parent a51c587 commit eff214d
Show file tree
Hide file tree
Showing 6 changed files with 148 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ public enum InstanceDisabledType {
public enum InstanceOperation {
EVACUATE, // Node will be removed after a period of time
SWAP_IN, // New node joining for swap operation
SWAP_OUT, // Existing Node to be removed for swap operation
ENABLE, // Backward compatible field for HELIX_ENABLED. Set when changing from disabled to enabled.
DISABLE // Backward compatible field for HELIX_ENABLED. Set when changing from enabled to disabled.
SWAP_OUT // Existing Node to be removed for swap operation
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,18 @@
import java.util.Optional;
import java.util.Set;

import java.util.stream.Collectors;
import org.apache.helix.HelixDefinedState;
import org.apache.helix.api.config.StateTransitionThrottleConfig;
import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.rebalancer.constraint.MonitoredAbnormalResolver;
import org.apache.helix.controller.rebalancer.util.DelayedRebalanceUtil;
import org.apache.helix.controller.rebalancer.util.WagedValidationUtil;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
import org.apache.helix.model.ResourceAssignment;
Expand All @@ -53,6 +56,7 @@
*/
public class DelayedAutoRebalancer extends AbstractRebalancer<ResourceControllerDataProvider> {
private static final Logger LOG = LoggerFactory.getLogger(DelayedAutoRebalancer.class);
private static final Set<String> INSTANCE_OPERATION_TO_EXCLUDE = Set.of("EVACUATE", "SWAP_IN");

@Override
public IdealState computeNewIdealState(String resourceName,
Expand Down Expand Up @@ -109,14 +113,12 @@ public IdealState computeNewIdealState(String resourceName,
allNodes = clusterData.getAllInstances();
}

Set<String> activeNodes = liveEnabledNodes;
long delay = DelayedRebalanceUtil.getRebalanceDelay(currentIdealState, clusterConfig);
Set<String> activeNodes = DelayedRebalanceUtil
.getActiveNodes(allNodes, currentIdealState, liveEnabledNodes,
clusterData.getInstanceOfflineTimeMap(), clusterData.getLiveInstances().keySet(),
clusterData.getInstanceConfigMap(), delay, clusterConfig);
if (delayRebalanceEnabled) {
long delay = DelayedRebalanceUtil.getRebalanceDelay(currentIdealState, clusterConfig);
activeNodes = DelayedRebalanceUtil
.getActiveNodes(allNodes, currentIdealState, liveEnabledNodes,
clusterData.getInstanceOfflineTimeMap(), clusterData.getLiveInstances().keySet(),
clusterData.getInstanceConfigMap(), delay, clusterConfig);

Set<String> offlineOrDisabledInstances = new HashSet<>(activeNodes);
offlineOrDisabledInstances.removeAll(liveEnabledNodes);
DelayedRebalanceUtil.setRebalanceScheduler(currentIdealState.getResourceName(), true,
Expand Down Expand Up @@ -157,15 +159,20 @@ public IdealState computeNewIdealState(String resourceName,

// sort node lists to ensure consistent preferred assignments
List<String> allNodeList = new ArrayList<>(allNodes);
List<String> liveEnabledNodeList = new ArrayList<>(liveEnabledNodes);
// We will not assign partition to instances with evacuation and wap-out tag.
// TODO: Currently we have 2 groups of instances and compute preference list twice and merge.
// Eventually we want to have exclusive groups of instance for different instance tag.
List<String> liveEnabledAssignableNodeList = filterOutOnOperationInstances(clusterData.getInstanceConfigMap(),
liveEnabledNodes);
Collections.sort(allNodeList);
Collections.sort(liveEnabledNodeList);
Collections.sort(liveEnabledAssignableNodeList);

ZNRecord newIdealMapping = _rebalanceStrategy
.computePartitionAssignment(allNodeList, liveEnabledNodeList, currentMapping, clusterData);
.computePartitionAssignment(allNodeList, liveEnabledAssignableNodeList, currentMapping, clusterData);
ZNRecord finalMapping = newIdealMapping;

if (DelayedRebalanceUtil.isDelayRebalanceEnabled(currentIdealState, clusterConfig)) {
if (DelayedRebalanceUtil.isDelayRebalanceEnabled(currentIdealState, clusterConfig)
|| liveEnabledAssignableNodeList.size()!= activeNodes.size()) {
List<String> activeNodeList = new ArrayList<>(activeNodes);
Collections.sort(activeNodeList);
int minActiveReplicas = DelayedRebalanceUtil.getMinActiveReplica(
Expand Down Expand Up @@ -194,6 +201,14 @@ public IdealState computeNewIdealState(String resourceName,
return idealState;
}

private static List<String> filterOutOnOperationInstances(Map<String, InstanceConfig> instanceConfigMap,
Set<String> nodes) {
return nodes.stream()
.filter(
instance -> !INSTANCE_OPERATION_TO_EXCLUDE.contains(instanceConfigMap.get(instance).getInstanceOperation()))
.collect(Collectors.toList());
}

private IdealState generateNewIdealState(String resourceName, IdealState currentIdealState,
ZNRecord newMapping) {
IdealState newIdealState = new IdealState(resourceName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public static Set<String> getActiveNodes(Set<String> allNodes, IdealState idealS
private static Set<String> getActiveNodes(Set<String> allNodes, Set<String> liveEnabledNodes,
Map<String, Long> instanceOfflineTimeMap, Set<String> liveNodes, Map<String, InstanceConfig> instanceConfigMap,
long delay, ClusterConfig clusterConfig) {
Set<String> activeNodes = filterOutEvacuatingInstances(instanceConfigMap, liveEnabledNodes);
Set<String> activeNodes = new HashSet<>(liveEnabledNodes);
Set<String> offlineOrDisabledInstances = new HashSet<>(allNodes);
offlineOrDisabledInstances.removeAll(liveEnabledNodes);
long currentTime = System.currentTimeMillis();
Expand All @@ -126,10 +126,11 @@ private static Set<String> getActiveNodes(Set<String> allNodes, Set<String> live
activeNodes.add(ins);
}
}
return activeNodes;
// TODO: change this after merging operation and helix-enable field.
return filterOutEvacuatingInstances(instanceConfigMap, activeNodes);
}

private static Set<String> filterOutEvacuatingInstances(Map<String, InstanceConfig> instanceConfigMap,
public static Set<String> filterOutEvacuatingInstances(Map<String, InstanceConfig> instanceConfigMap,
Set<String> nodes) {
return nodes.stream()
.filter(instance -> !instanceConfigMap.get(instance).getInstanceOperation().equals(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,15 +265,6 @@ public boolean getInstanceEnabled() {
*/
public void setInstanceEnabled(boolean enabled) {
// set instance operation only when we need to change InstanceEnabled value.
// When enabling an instance where current HELIX_ENABLED is false, we update INSTANCE_OPERATION to 'ENABLE'
// When disabling and instance where current HELIX_ENABLED is false, we overwrite what current operation and
// update INSTANCE_OPERATION to 'DISABLE'.
String instanceOperationKey = InstanceConfigProperty.INSTANCE_OPERATION.toString();
if (enabled != getInstanceEnabled()) {
_record.setSimpleField(instanceOperationKey,
enabled ? InstanceConstants.InstanceOperation.ENABLE.name()
: InstanceConstants.InstanceOperation.DISABLE.name());
}
setInstanceEnabledHelper(enabled);
}

Expand Down Expand Up @@ -344,17 +335,8 @@ public long getInstanceEnabledTime() {
}

public void setInstanceOperation(InstanceConstants.InstanceOperation operation) {
if (operation != InstanceConstants.InstanceOperation.DISABLE
&& operation != InstanceConstants.InstanceOperation.ENABLE) {
if (!getInstanceEnabled()) {
throw new HelixException(
"setting non enable/disable operation (e.g. evacuate, swap) to helix disabled instance is not allowed");
}
} else {
setInstanceEnabledHelper(operation == InstanceConstants.InstanceOperation.ENABLE);
}

_record.setSimpleField(InstanceConfigProperty.INSTANCE_OPERATION.toString(), operation.toString());
_record.setSimpleField(InstanceConfigProperty.INSTANCE_OPERATION.name(),
operation == null ? "" : operation.name());
}

public String getInstanceOperation() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.helix.model.BuiltInStateModelDefinitions;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Message;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.participant.StateMachineEngine;
Expand Down Expand Up @@ -55,7 +56,7 @@ public class TestInstanceOperation extends ZkTestBase {
private Set<String> _allDBs = new HashSet<>();
private ZkHelixClusterVerifier _clusterVerifier;
private ConfigAccessor _configAccessor;
private long _stateModelDelay = 30L;
private long _stateModelDelay = 3L;
protected AssignmentMetadataStore _assignmentMetadataStore;
HelixDataAccessor _dataAccessor;

Expand Down Expand Up @@ -105,6 +106,7 @@ public void testEvacuate() throws Exception {
_gSetupTool.getClusterManagementTool()
.setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, InstanceConstants.InstanceOperation.EVACUATE);

System.out.println("123");
Assert.assertTrue(_clusterVerifier.verifyByPolling());

// New ev should contain all instances but the evacuated one
Expand All @@ -125,7 +127,7 @@ public void testRevertEvacuation() throws Exception {
// revert an evacuate instance
String instanceToEvacuate = _participants.get(0).getInstanceName();
_gSetupTool.getClusterManagementTool()
.setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, InstanceConstants.InstanceOperation.ENABLE);
.setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, null);

Assert.assertTrue(_clusterVerifier.verifyByPolling());

Expand All @@ -138,6 +140,55 @@ public void testRevertEvacuation() throws Exception {
}

@Test(dependsOnMethods = "testRevertEvacuation")
public void testAddingNodeWithEvacuationTag() throws Exception {
// first disable and instance, and wait for all replicas to be moved out
String mockNewInstance = _participants.get(0).getInstanceName();
_gSetupTool.getClusterManagementTool()
.enableInstance(CLUSTER_NAME, mockNewInstance, false);
Assert.assertTrue(_clusterVerifier.verifyByPolling());
//ev should contain all instances but the disabled one
Map<String, ExternalView> assignment = getEV();
List<String> currentActiveInstances =
_participantNames.stream().filter(n -> !n.equals(mockNewInstance)).collect(Collectors.toList());
for (String resource : _allDBs) {
validateAssignmentInEv(assignment.get(resource), REPLICA-1);
Set<String> newPAssignedParticipants = getParticipantsInEv(assignment.get(resource));
Assert.assertFalse(newPAssignedParticipants.contains(mockNewInstance));
Assert.assertTrue(newPAssignedParticipants.containsAll(currentActiveInstances));
}

// add evacuate tag and enable instance
_gSetupTool.getClusterManagementTool()
.setInstanceOperation(CLUSTER_NAME, mockNewInstance, InstanceConstants.InstanceOperation.EVACUATE);
_gSetupTool.getClusterManagementTool()
.enableInstance(CLUSTER_NAME, mockNewInstance, true);
//ev should be the same
assignment = getEV();
currentActiveInstances =
_participantNames.stream().filter(n -> !n.equals(mockNewInstance)).collect(Collectors.toList());
for (String resource : _allDBs) {
validateAssignmentInEv(assignment.get(resource), REPLICA-1);
Set<String> newPAssignedParticipants = getParticipantsInEv(assignment.get(resource));
Assert.assertFalse(newPAssignedParticipants.contains(mockNewInstance));
Assert.assertTrue(newPAssignedParticipants.containsAll(currentActiveInstances));
}

// now remove operation tag
String instanceToEvacuate = _participants.get(0).getInstanceName();
_gSetupTool.getClusterManagementTool()
.setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, null);

Assert.assertTrue(_clusterVerifier.verifyByPolling());

// EV should contain all participants, check resources one by one
assignment = getEV();
for (String resource : _allDBs) {
Assert.assertTrue(getParticipantsInEv(assignment.get(resource)).containsAll(_participantNames));
validateAssignmentInEv(assignment.get(resource));
}
}

@Test(dependsOnMethods = "testAddingNodeWithEvacuationTag")
public void testEvacuateAndCancelBeforeBootstrapFinish() throws Exception {
// add a resource where downward state transition is slow
createResourceWithDelayedRebalance(CLUSTER_NAME, "TEST_DB3_DELAYED_CRUSHED", "MasterSlave", PARTITIONS, REPLICA,
Expand All @@ -151,7 +202,7 @@ public void testEvacuateAndCancelBeforeBootstrapFinish() throws Exception {
Assert.assertTrue(_clusterVerifier.verifyByPolling());

// set bootstrap ST delay to a large number
_stateModelDelay = -300000L;
_stateModelDelay = -10000L;
// evacuate an instance
String instanceToEvacuate = _participants.get(0).getInstanceName();
_gSetupTool.getClusterManagementTool()
Expand All @@ -174,9 +225,9 @@ public void testEvacuateAndCancelBeforeBootstrapFinish() throws Exception {
validateAssignmentInEv(assignment.get(resource));
}

// cancel the evacuation by setting instance operation back to `ENABLE`
// cancel the evacuation
_gSetupTool.getClusterManagementTool()
.setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, InstanceConstants.InstanceOperation.ENABLE);
.setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, null);

assignment = getEV();
for (String resource : _allDBs) {
Expand All @@ -200,7 +251,7 @@ public void testEvacuateAndCancelBeforeBootstrapFinish() throws Exception {
public void testEvacuateAndCancelBeforeDropFinish() throws Exception {

// set DROP ST delay to a large number
_stateModelDelay = 300000L;
_stateModelDelay = 10000L;

// evacuate an instance
String instanceToEvacuate = _participants.get(0).getInstanceName();
Expand All @@ -211,8 +262,9 @@ public void testEvacuateAndCancelBeforeDropFinish() throws Exception {
TestHelper.verify(
() -> ((_dataAccessor.getChildNames(_dataAccessor.keyBuilder().messages(instanceToEvacuate))).isEmpty()), 30000);

// cancel evacuation
_gSetupTool.getClusterManagementTool()
.setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, InstanceConstants.InstanceOperation.ENABLE);
.setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, null);
// check every replica has >= 3 active replicas, even before cluster converge
Map<String, ExternalView> assignment = getEV();
for (String resource : _allDBs) {
Expand Down Expand Up @@ -274,6 +326,27 @@ public void testMarkEvacuationAfterEMM() throws Exception {

}

@Test(dependsOnMethods = "testMarkEvacuationAfterEMM")
public void testEvacuationWithOfflineInstancesInCluster() throws Exception {
_participants.get(2).syncStop();
_participants.get(3).syncStop();
// wait for converge, and set evacuate on instance 0
Assert.assertTrue(_clusterVerifier.verifyByPolling());

String evacuateInstanceName = _participants.get(0).getInstanceName();
_gSetupTool.getClusterManagementTool()
.setInstanceOperation(CLUSTER_NAME, evacuateInstanceName, InstanceConstants.InstanceOperation.EVACUATE);

Map<String, IdealState> assignment;
List<String> currentActiveInstances =
_participantNames.stream().filter(n -> (!n.equals(evacuateInstanceName) && !n.equals(_participants.get(3).getInstanceName()))).collect(Collectors.toList());
TestHelper.verify( ()-> {return verifyIS(evacuateInstanceName);}, TestHelper.WAIT_DURATION);

_participants.get(3).syncStart();
_participants.get(2).syncStart();
}


private void addParticipant(String participantName) {
_gSetupTool.addInstanceToCluster(CLUSTER_NAME, participantName);

Expand All @@ -290,8 +363,12 @@ private void addParticipant(String participantName) {
}

private void createTestDBs(long delayTime) throws InterruptedException {
createResourceWithDelayedRebalance(CLUSTER_NAME, "TEST_DB0_CRUSHED",
BuiltInStateModelDefinitions.LeaderStandby.name(), PARTITIONS, REPLICA, REPLICA - 1, -1,
CrushEdRebalanceStrategy.class.getName());
_allDBs.add("TEST_DB0_CRUSHED");
createResourceWithDelayedRebalance(CLUSTER_NAME, "TEST_DB1_CRUSHED",
BuiltInStateModelDefinitions.LeaderStandby.name(), PARTITIONS, REPLICA, REPLICA - 1, 200,
BuiltInStateModelDefinitions.LeaderStandby.name(), PARTITIONS, REPLICA, REPLICA - 1, 2000000,
CrushEdRebalanceStrategy.class.getName());
_allDBs.add("TEST_DB1_CRUSHED");
createResourceWithWagedRebalance(CLUSTER_NAME, "TEST_DB2_WAGED", BuiltInStateModelDefinitions.LeaderStandby.name(),
Expand All @@ -310,14 +387,39 @@ private Map<String, ExternalView> getEV() {
return externalViews;
}

private boolean verifyIS(String evacuateInstanceName) {
for (String db : _allDBs) {
IdealState is = _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
for (String partition : is.getPartitionSet()) {
List<String> newPAssignedParticipants = is.getPreferenceList(partition);
if (newPAssignedParticipants.contains(evacuateInstanceName)) {
System.out.println("partition " + partition + " assignment " + newPAssignedParticipants + " ev " + evacuateInstanceName);
return false;
}
}
}
return true;
}

private Set<String> getParticipantsInEv(ExternalView ev) {
Set<String> assignedParticipants = new HashSet<>();
ev.getPartitionSet().forEach(partition -> assignedParticipants.addAll(ev.getStateMap(partition).keySet()));
for (String partition : ev.getPartitionSet()) {
ev.getStateMap(partition)
.keySet()
.stream()
.filter(k -> !ev.getStateMap(partition).get(k).equals("OFFLINE"))
.forEach(assignedParticipants::add);
}
return assignedParticipants;
}

// verify that each partition has >=REPLICA (3 in this case) replicas

private void validateAssignmentInEv(ExternalView ev) {
validateAssignmentInEv(ev, REPLICA);
}

private void validateAssignmentInEv(ExternalView ev, int expectedNumber) {
Set<String> partitionSet = ev.getPartitionSet();
for (String partition : partitionSet) {
AtomicInteger activeReplicaCount = new AtomicInteger();
Expand All @@ -326,8 +428,7 @@ private void validateAssignmentInEv(ExternalView ev) {
.stream()
.filter(v -> v.equals("MASTER") || v.equals("LEADER") || v.equals("SLAVE") || v.equals("FOLLOWER") || v.equals("STANDBY"))
.forEach(v -> activeReplicaCount.getAndIncrement());
Assert.assertTrue(activeReplicaCount.get() >=REPLICA);

Assert.assertTrue(activeReplicaCount.get() >=expectedNumber);
}
}

Expand Down Expand Up @@ -375,7 +476,7 @@ public StDelayMSStateModel() {
private void sleepWhileNotCanceled(long sleepTime) throws InterruptedException{
while(sleepTime >0 && !isCancelled()) {
Thread.sleep(5000);
sleepTime =- 5000;
sleepTime = sleepTime - 5000;
}
if (isCancelled()) {
_cancelled = false;
Expand Down
Loading

0 comments on commit eff214d

Please sign in to comment.