Skip to content

Commit

Permalink
Add supporting APIs a isEvacuateFinished and isReadyForPreparingJoini…
Browse files Browse the repository at this point in the history
…ngCluster for evacuation. (#2618)
  • Loading branch information
xyuanlu committed Sep 19, 2023
1 parent eff214d commit 96cd0d0
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 10 deletions.
17 changes: 17 additions & 0 deletions helix-core/src/main/java/org/apache/helix/HelixAdmin.java
Original file line number Diff line number Diff line change
Expand Up @@ -738,4 +738,21 @@ Map<String, Boolean> validateResourcesForWagedRebalance(String clusterName,
*/
Map<String, Boolean> validateInstancesForWagedRebalance(String clusterName,
List<String> instancesNames);

/**
* Return whether the instance operation 'Evacuate' is finished for the given instance.
* @param clusterName name of the cluster the instance belongs to
* @param instancesNames name of the instance to check (a single instance name, despite the plural
*        parameter name)
* @return true if there is neither current state nor pending message on the instance.
*         NOTE(review): the ZooKeeper-backed implementation additionally requires the instance's
*         operation to be EVACUATE; confirm whether that is part of the interface contract.
*/
boolean isEvacuateFinished(String clusterName, String instancesNames);

/**
* Return whether the instance is ready for preparing to join the cluster. The instance should have
* no current state, no pending message, and be tagged with an instance operation that excludes it
* from Helix assignment.
* @param clusterName name of the cluster the instance belongs to
* @param instancesNames name of the instance to check (a single instance name, despite the plural
*        parameter name)
* @return true if the instance is ready for preparing to join the cluster.
*/
boolean isReadyForPreparingJoiningCluster(String clusterName, String instancesNames);
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import java.util.stream.Collectors;
import org.apache.helix.HelixDefinedState;
import org.apache.helix.api.config.StateTransitionThrottleConfig;
import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.rebalancer.constraint.MonitoredAbnormalResolver;
import org.apache.helix.controller.rebalancer.util.DelayedRebalanceUtil;
Expand All @@ -56,7 +55,7 @@
*/
public class DelayedAutoRebalancer extends AbstractRebalancer<ResourceControllerDataProvider> {
private static final Logger LOG = LoggerFactory.getLogger(DelayedAutoRebalancer.class);
private static final Set<String> INSTANCE_OPERATION_TO_EXCLUDE = Set.of("EVACUATE", "SWAP_IN");
public static final Set<String> INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT = Set.of("EVACUATE", "SWAP_IN");

@Override
public IdealState computeNewIdealState(String resourceName,
Expand Down Expand Up @@ -205,7 +204,7 @@ private static List<String> filterOutOnOperationInstances(Map<String, InstanceCo
Set<String> nodes) {
return nodes.stream()
.filter(
instance -> !INSTANCE_OPERATION_TO_EXCLUDE.contains(instanceConfigMap.get(instance).getInstanceOperation()))
instance -> !INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT.contains(instanceConfigMap.get(instance).getInstanceOperation()))
.collect(Collectors.toList());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixDefinedState;
import org.apache.helix.HelixException;
import org.apache.helix.HelixProperty;
import org.apache.helix.InstanceType;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyPathBuilder;
Expand All @@ -57,6 +58,7 @@
import org.apache.helix.api.status.ClusterManagementModeRequest;
import org.apache.helix.api.topology.ClusterTopology;
import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
import org.apache.helix.controller.rebalancer.util.WagedValidationUtil;
import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
Expand Down Expand Up @@ -407,10 +409,70 @@ public ZNRecord update(ZNRecord currentData) {
}

@Override
public void enableResource(final String clusterName, final String resourceName,
final boolean enabled) {
logger.info("{} resource {} in cluster {}.", enabled ? "Enable" : "Disable", resourceName,
clusterName);
public boolean isEvacuateFinished(String clusterName, String instanceName) {
  // Evacuation cannot be finished while the instance still holds state or has pending messages.
  if (instanceHasCurrentSateOrMessage(clusterName, instanceName)) {
    return false;
  }
  // The instance must also still be marked with the EVACUATE operation.
  String evacuateOperation = InstanceConstants.InstanceOperation.EVACUATE.name();
  return getInstanceConfig(clusterName, instanceName).getInstanceOperation().equals(evacuateOperation);
}

@Override
public boolean isReadyForPreparingJoiningCluster(String clusterName, String instanceName) {
  // An instance that still holds state or has pending messages is not ready.
  if (instanceHasCurrentSateOrMessage(clusterName, instanceName)) {
    return false;
  }
  // Ready only when the configured operation is one that excludes the instance from assignment.
  String instanceOperation = getInstanceConfig(clusterName, instanceName).getInstanceOperation();
  return DelayedAutoRebalancer.INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT.contains(instanceOperation);
}

/**
 * Return true if the instance has any current state or pending message. Otherwise, return false if
 * the instance is offline, the instance has no active session, or the instance is online but has
 * neither current state nor pending message.
 * @param clusterName name of the cluster the instance belongs to
 * @param instanceName name of the instance to inspect
 * @return true if the instance carries current state from more than one session, has pending
 *         messages, or has current state under its live session; false otherwise
 */
private boolean instanceHasCurrentSateOrMessage(String clusterName, String instanceName) {
  // A single base accessor serves both the typed accessor and the raw child-name lookups.
  ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
  HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
  PropertyKey.Builder keyBuilder = accessor.keyBuilder();

  // check the instance is alive
  LiveInstance liveInstance = accessor.getProperty(keyBuilder.liveInstance(instanceName));
  if (liveInstance == null) {
    logger.warn("Instance {} in cluster {} is not alive. The instance can be removed anyway.", instanceName,
        clusterName);
    return false;
  }

  // Count the sessions under the CurrentState folder. If state is being carried over from a
  // previous session, there is more than one session ZNode.
  // getChildNames is treated as nullable (see the current-state lookup below), so guard against
  // a missing parent node instead of dereferencing the result directly.
  List<String> sessions =
      baseAccessor.getChildNames(PropertyPathBuilder.instanceCurrentState(clusterName, instanceName), 0);
  if (sessions != null && sessions.size() > 1) {
    logger.warn("Instance {} in cluster {} is carrying over from prev session.", instanceName,
        clusterName);
    return true;
  }

  String sessionId = liveInstance.getEphemeralOwner();

  String path = PropertyPathBuilder.instanceCurrentState(clusterName, instanceName, sessionId);
  List<String> currentStates = baseAccessor.getChildNames(path, 0);
  if (currentStates == null) {
    logger.warn("Instance {} in cluster {} does not have live session. The instance can be removed anyway.",
        instanceName, clusterName);
    return false;
  }

  // see if instance has pending message.
  List<Message> messages = accessor.getChildValues(keyBuilder.messages(instanceName), true);
  if (messages != null && !messages.isEmpty()) {
    logger.warn("Instance {} in cluster {} has pending messages.", instanceName, clusterName);
    return true;
  }

  return !currentStates.isEmpty();
}

@Override
public void enableResource(final String clusterName, final String resourceName, final boolean enabled) {
logger.info("{} resource {} in cluster {}.", enabled ? "Enable" : "Disable", resourceName, clusterName);
String path = PropertyPathBuilder.idealState(clusterName, resourceName);
BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
if (!baseAccessor.exists(path, 0)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,18 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixRollbackException;
import org.apache.helix.NotificationContext;
import org.apache.helix.TestHelper;
import org.apache.helix.api.status.ClusterManagementMode;
import org.apache.helix.common.ZkTestBase;
import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
import org.apache.helix.controller.rebalancer.waged.AssignmentMetadataStore;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBucketDataAccessor;
import org.apache.helix.model.BuiltInStateModelDefinitions;
Expand Down Expand Up @@ -57,6 +58,8 @@ public class TestInstanceOperation extends ZkTestBase {
private ZkHelixClusterVerifier _clusterVerifier;
private ConfigAccessor _configAccessor;
private long _stateModelDelay = 3L;

private HelixAdmin _admin;
protected AssignmentMetadataStore _assignmentMetadataStore;
HelixDataAccessor _dataAccessor;

Expand Down Expand Up @@ -91,6 +94,8 @@ public void beforeClass() throws Exception {
createTestDBs(200);

setUpWagedBaseline();

_admin = new ZKHelixAdmin(_gZkClient);
}

@Test
Expand All @@ -106,7 +111,6 @@ public void testEvacuate() throws Exception {
_gSetupTool.getClusterManagementTool()
.setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, InstanceConstants.InstanceOperation.EVACUATE);

System.out.println("123");
Assert.assertTrue(_clusterVerifier.verifyByPolling());

// New ev should contain all instances but the evacuated one
Expand All @@ -119,6 +123,9 @@ public void testEvacuate() throws Exception {
Assert.assertFalse(newPAssignedParticipants.contains(instanceToEvacuate));
Assert.assertTrue(newPAssignedParticipants.containsAll(currentActiveInstances));
}

Assert.assertTrue(_admin.isEvacuateFinished(CLUSTER_NAME, instanceToEvacuate));
Assert.assertTrue(_admin.isReadyForPreparingJoiningCluster(CLUSTER_NAME, instanceToEvacuate));
}

@Test(dependsOnMethods = "testEvacuate")
Expand Down Expand Up @@ -215,6 +222,8 @@ public void testEvacuateAndCancelBeforeBootstrapFinish() throws Exception {
TestHelper.verify(
() -> ((_dataAccessor.getChildNames(_dataAccessor.keyBuilder().messages(participant))).isEmpty()), 30000);
}
Assert.assertFalse(_admin.isEvacuateFinished(CLUSTER_NAME, instanceToEvacuate));
Assert.assertFalse(_admin.isReadyForPreparingJoiningCluster(CLUSTER_NAME, instanceToEvacuate));

// sleep a bit so ST messages can start executing
Thread.sleep(Math.abs(_stateModelDelay / 100));
Expand Down Expand Up @@ -261,6 +270,7 @@ public void testEvacuateAndCancelBeforeDropFinish() throws Exception {
// message should be pending at the to evacuate participant
TestHelper.verify(
() -> ((_dataAccessor.getChildNames(_dataAccessor.keyBuilder().messages(instanceToEvacuate))).isEmpty()), 30000);
Assert.assertFalse(_admin.isEvacuateFinished(CLUSTER_NAME, instanceToEvacuate));

// cancel evacuation
_gSetupTool.getClusterManagementTool()
Expand Down Expand Up @@ -323,7 +333,7 @@ public void testMarkEvacuationAfterEMM() throws Exception {
Assert.assertFalse(newPAssignedParticipants.contains(instanceToEvacuate));
Assert.assertTrue(newPAssignedParticipants.containsAll(currentActiveInstances));
}

Assert.assertTrue(_admin.isReadyForPreparingJoiningCluster(CLUSTER_NAME, instanceToEvacuate));
}

@Test(dependsOnMethods = "testMarkEvacuationAfterEMM")
Expand Down
10 changes: 10 additions & 0 deletions helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
Original file line number Diff line number Diff line change
Expand Up @@ -550,4 +550,14 @@ public Map<String, Boolean> validateInstancesForWagedRebalance(String clusterNam
List<String> instancesNames) {
return null;
}

// Mock stub: performs no evacuation check and always returns false.
@Override
public boolean isEvacuateFinished(String clusterName, String instancesNames) {
return false;
}

// Mock stub: performs no readiness check and always returns false.
@Override
public boolean isReadyForPreparingJoiningCluster(String clusterName, String instancesNames) {
return false;
}
}

0 comments on commit 96cd0d0

Please sign in to comment.