Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add supporting APIs for evacuation #2618

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions helix-core/src/main/java/org/apache/helix/HelixAdmin.java
Original file line number Diff line number Diff line change
Expand Up @@ -738,4 +738,21 @@ Map<String, Boolean> validateResourcesForWagedRebalance(String clusterName,
*/
Map<String, Boolean> validateInstancesForWagedRebalance(String clusterName,
List<String> instancesNames);

/**
* Return if instance operation 'Evacuate' is finished.
* @param clusterName
* @param instancesNames
* @return Return true if there is no current state nor pending message on the instance.
*/
boolean isEvacuateFinished(String clusterName, String instancesNames);

/**
* Return if instance is ready for preparing joining cluster. The instance should have no current state,
* no pending message and tagged with operation that exclude the instance from Helix assignment.
* @param clusterName
* @param instancesNames
* @return true if the instance is ready for preparing joining cluster.
*/
boolean isReadyForPreparingJoiningCluster(String clusterName, String instancesNames);
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import java.util.stream.Collectors;
import org.apache.helix.HelixDefinedState;
import org.apache.helix.api.config.StateTransitionThrottleConfig;
import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.rebalancer.constraint.MonitoredAbnormalResolver;
import org.apache.helix.controller.rebalancer.util.DelayedRebalanceUtil;
Expand All @@ -56,7 +55,7 @@
*/
public class DelayedAutoRebalancer extends AbstractRebalancer<ResourceControllerDataProvider> {
private static final Logger LOG = LoggerFactory.getLogger(DelayedAutoRebalancer.class);
private static final Set<String> INSTANCE_OPERATION_TO_EXCLUDE = Set.of("EVACUATE", "SWAP_IN");
public static final Set<String> INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT = Set.of("EVACUATE", "SWAP_IN");

@Override
public IdealState computeNewIdealState(String resourceName,
Expand Down Expand Up @@ -205,7 +204,7 @@ private static List<String> filterOutOnOperationInstances(Map<String, InstanceCo
Set<String> nodes) {
return nodes.stream()
.filter(
instance -> !INSTANCE_OPERATION_TO_EXCLUDE.contains(instanceConfigMap.get(instance).getInstanceOperation()))
instance -> !INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT.contains(instanceConfigMap.get(instance).getInstanceOperation()))
.collect(Collectors.toList());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixDefinedState;
import org.apache.helix.HelixException;
import org.apache.helix.HelixProperty;
import org.apache.helix.InstanceType;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyPathBuilder;
Expand All @@ -57,6 +58,7 @@
import org.apache.helix.api.status.ClusterManagementModeRequest;
import org.apache.helix.api.topology.ClusterTopology;
import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
import org.apache.helix.controller.rebalancer.util.WagedValidationUtil;
import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
Expand Down Expand Up @@ -407,10 +409,60 @@ public ZNRecord update(ZNRecord currentData) {
}

@Override
public void enableResource(final String clusterName, final String resourceName,
final boolean enabled) {
logger.info("{} resource {} in cluster {}.", enabled ? "Enable" : "Disable", resourceName,
clusterName);
public boolean isEvacuateFinished(String clusterName, String instanceName) {
return !instanceHasCurrentSateOrMessage(clusterName, instanceName);
}

@Override
public boolean isReadyForPreparingJoiningCluster(String clusterName, String instanceName) {
return !instanceHasCurrentSateOrMessage(clusterName, instanceName)
&& DelayedAutoRebalancer.INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT.contains(
getInstanceConfig(clusterName, instanceName).getInstanceOperation());
}

/**
* Return true if Instance has any current state or pending message. Otherwise, return false if instance is offline,
* instance has no active session, or if instance is online but has no current state or pending message.
* @param clusterName
* @param instanceName
* @return
*/
private boolean instanceHasCurrentSateOrMessage(String clusterName, String instanceName) {
HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
PropertyKey.Builder keyBuilder = accessor.keyBuilder();

// check the instance is alive
LiveInstance liveInstance = accessor.getProperty(keyBuilder.liveInstance(instanceName));
if (liveInstance == null) {
logger.warn("Instance {} in cluster {} is not alive. The instance can be removed anyway.", instanceName,
clusterName);
return false;
junkaixue marked this conversation as resolved.
Show resolved Hide resolved
}

BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
String sessionId = liveInstance.getEphemeralOwner();

String path = PropertyPathBuilder.instanceCurrentState(clusterName, instanceName, sessionId);
List<String> currentStates = baseAccessor.getChildNames(path, 0);
if (currentStates == null) {
logger.warn("Instance {} in cluster {} does not have live session. The instance can be removed anyway.",
instanceName, clusterName);
return false;
}
Comment on lines +456 to +461
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Race condition can happen if the instance is carrying over from old session current state. New session current state is empty.


// see if instance has pending message.
List<Message> messages = accessor.getChildValues(keyBuilder.messages(instanceName), true);
if (messages != null && !messages.isEmpty()) {
logger.warn("Instance {} in cluster {} has pending messages.", instanceName, clusterName);
return true;
}

return !currentStates.isEmpty();
}

@Override
public void enableResource(final String clusterName, final String resourceName, final boolean enabled) {
logger.info("{} resource {} in cluster {}.", enabled ? "Enable" : "Disable", resourceName, clusterName);
String path = PropertyPathBuilder.idealState(clusterName, resourceName);
BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
if (!baseAccessor.exists(path, 0)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,18 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixRollbackException;
import org.apache.helix.NotificationContext;
import org.apache.helix.TestHelper;
import org.apache.helix.api.status.ClusterManagementMode;
import org.apache.helix.common.ZkTestBase;
import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
import org.apache.helix.controller.rebalancer.waged.AssignmentMetadataStore;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBucketDataAccessor;
import org.apache.helix.model.BuiltInStateModelDefinitions;
Expand Down Expand Up @@ -57,6 +58,8 @@ public class TestInstanceOperation extends ZkTestBase {
private ZkHelixClusterVerifier _clusterVerifier;
private ConfigAccessor _configAccessor;
private long _stateModelDelay = 3L;

private HelixAdmin _admin;
protected AssignmentMetadataStore _assignmentMetadataStore;
HelixDataAccessor _dataAccessor;

Expand Down Expand Up @@ -91,6 +94,8 @@ public void beforeClass() throws Exception {
createTestDBs(200);

setUpWagedBaseline();

_admin = new ZKHelixAdmin(_gZkClient);
}

@Test
Expand All @@ -106,7 +111,6 @@ public void testEvacuate() throws Exception {
_gSetupTool.getClusterManagementTool()
.setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, InstanceConstants.InstanceOperation.EVACUATE);

System.out.println("123");
Assert.assertTrue(_clusterVerifier.verifyByPolling());

// New ev should contain all instances but the evacuated one
Expand All @@ -119,6 +123,9 @@ public void testEvacuate() throws Exception {
Assert.assertFalse(newPAssignedParticipants.contains(instanceToEvacuate));
Assert.assertTrue(newPAssignedParticipants.containsAll(currentActiveInstances));
}

Assert.assertTrue(_admin.isEvacuateFinished(CLUSTER_NAME, instanceToEvacuate));
Assert.assertTrue(_admin.isReadyForPreparingJoiningCluster(CLUSTER_NAME, instanceToEvacuate));
}

@Test(dependsOnMethods = "testEvacuate")
Expand Down Expand Up @@ -215,6 +222,8 @@ public void testEvacuateAndCancelBeforeBootstrapFinish() throws Exception {
TestHelper.verify(
() -> ((_dataAccessor.getChildNames(_dataAccessor.keyBuilder().messages(participant))).isEmpty()), 30000);
}
Assert.assertFalse(_admin.isEvacuateFinished(CLUSTER_NAME, instanceToEvacuate));
Assert.assertFalse(_admin.isReadyForPreparingJoiningCluster(CLUSTER_NAME, instanceToEvacuate));

// sleep a bit so ST messages can start executing
Thread.sleep(Math.abs(_stateModelDelay / 100));
Expand Down Expand Up @@ -261,6 +270,7 @@ public void testEvacuateAndCancelBeforeDropFinish() throws Exception {
// message should be pending at the to evacuate participant
TestHelper.verify(
() -> ((_dataAccessor.getChildNames(_dataAccessor.keyBuilder().messages(instanceToEvacuate))).isEmpty()), 30000);
Assert.assertFalse(_admin.isEvacuateFinished(CLUSTER_NAME, instanceToEvacuate));

// cancel evacuation
_gSetupTool.getClusterManagementTool()
Expand Down Expand Up @@ -323,7 +333,7 @@ public void testMarkEvacuationAfterEMM() throws Exception {
Assert.assertFalse(newPAssignedParticipants.contains(instanceToEvacuate));
Assert.assertTrue(newPAssignedParticipants.containsAll(currentActiveInstances));
}

Assert.assertTrue(_admin.isReadyForPreparingJoiningCluster(CLUSTER_NAME, instanceToEvacuate));
}

@Test(dependsOnMethods = "testMarkEvacuationAfterEMM")
Expand Down
10 changes: 10 additions & 0 deletions helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
Original file line number Diff line number Diff line change
Expand Up @@ -550,4 +550,14 @@ public Map<String, Boolean> validateInstancesForWagedRebalance(String clusterNam
List<String> instancesNames) {
return null;
}

@Override
public boolean isEvacuateFinished(String clusterName, String instancesNames) {
return false;
}

@Override
public boolean isReadyForPreparingJoiningCluster(String clusterName, String instancesNames) {
return false;
}
}
Loading