diff --git a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java index 14863f57e7..085a987b1e 100644 --- a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java +++ b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java @@ -738,4 +738,21 @@ Map validateResourcesForWagedRebalance(String clusterName, */ Map validateInstancesForWagedRebalance(String clusterName, List instancesNames); + + /** + * Return if instance operation 'Evacuate' is finished. + * @param clusterName + * @param instancesNames + * @return Return true if there is no current state nor pending message on the instance. + */ + boolean isEvacuateFinished(String clusterName, String instancesNames); + + /** + * Return if instance is ready for preparing joining cluster. The instance should have no current state, + * no pending message and tagged with operation that exclude the instance from Helix assignment. + * @param clusterName + * @param instancesNames + * @return true if the instance is ready for preparing joining cluster. + */ + boolean isReadyForPreparingJoiningCluster(String clusterName, String instancesNames); } diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java index 56be045306..ff5824722d 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java @@ -33,7 +33,6 @@ import java.util.stream.Collectors; import org.apache.helix.HelixDefinedState; import org.apache.helix.api.config.StateTransitionThrottleConfig; -import org.apache.helix.constants.InstanceConstants; import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; import org.apache.helix.controller.rebalancer.constraint.MonitoredAbnormalResolver; import org.apache.helix.controller.rebalancer.util.DelayedRebalanceUtil; @@ -56,7 +55,7 @@ */ public class DelayedAutoRebalancer extends AbstractRebalancer { private static final Logger LOG = LoggerFactory.getLogger(DelayedAutoRebalancer.class); - private static final Set INSTANCE_OPERATION_TO_EXCLUDE = Set.of("EVACUATE", "SWAP_IN"); + public static final Set INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT = Set.of("EVACUATE", "SWAP_IN"); @Override public IdealState computeNewIdealState(String resourceName, @@ -205,7 +204,7 @@ private static List filterOutOnOperationInstances(Map nodes) { return nodes.stream() .filter( - instance -> !INSTANCE_OPERATION_TO_EXCLUDE.contains(instanceConfigMap.get(instance).getInstanceOperation())) + instance -> !INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT.contains(instanceConfigMap.get(instance).getInstanceOperation())) .collect(Collectors.toList()); } diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java index 15b38fcbce..44afee5e1b 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java @@ -47,6 +47,7 @@ import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixDefinedState; import org.apache.helix.HelixException; +import org.apache.helix.HelixProperty; import org.apache.helix.InstanceType; import org.apache.helix.PropertyKey; import org.apache.helix.PropertyPathBuilder; @@ -57,6 +58,7 @@ import org.apache.helix.api.status.ClusterManagementModeRequest; import org.apache.helix.api.topology.ClusterTopology; import org.apache.helix.constants.InstanceConstants; +import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer; import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy; import org.apache.helix.controller.rebalancer.util.WagedValidationUtil; import org.apache.helix.controller.rebalancer.waged.WagedRebalancer; @@ -407,10 +409,70 @@ public ZNRecord update(ZNRecord currentData) { } @Override - public void enableResource(final String clusterName, final String resourceName, - final boolean enabled) { - logger.info("{} resource {} in cluster {}.", enabled ? "Enable" : "Disable", resourceName, - clusterName); + public boolean isEvacuateFinished(String clusterName, String instanceName) { + return !instanceHasCurrentSateOrMessage(clusterName, instanceName) && (getInstanceConfig(clusterName, + instanceName).getInstanceOperation().equals(InstanceConstants.InstanceOperation.EVACUATE.name())); + } + + @Override + public boolean isReadyForPreparingJoiningCluster(String clusterName, String instanceName) { + return !instanceHasCurrentSateOrMessage(clusterName, instanceName) + && DelayedAutoRebalancer.INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT.contains( + getInstanceConfig(clusterName, instanceName).getInstanceOperation()); + } + + /** + * Return true if Instance has any current state or pending message. Otherwise, return false if instance is offline, + * instance has no active session, or if instance is online but has no current state or pending message. + * @param clusterName + * @param instanceName + * @return + */ + private boolean instanceHasCurrentSateOrMessage(String clusterName, String instanceName) { + HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkClient)); + PropertyKey.Builder keyBuilder = accessor.keyBuilder(); + + // check the instance is alive + LiveInstance liveInstance = accessor.getProperty(keyBuilder.liveInstance(instanceName)); + if (liveInstance == null) { + logger.warn("Instance {} in cluster {} is not alive. The instance can be removed anyway.", instanceName, + clusterName); + return false; + } + + BaseDataAccessor baseAccessor = new ZkBaseDataAccessor(_zkClient); + // count number of sessions under CurrentState folder. If it is carrying over from prv session, + // then there are > 1 session ZNodes. + List sessions = baseAccessor.getChildNames(PropertyPathBuilder.instanceCurrentState(clusterName, instanceName), 0); + if (sessions.size() > 1) { + logger.warn("Instance {} in cluster {} is carrying over from prev session.", instanceName, + clusterName); + return true; + } + + String sessionId = liveInstance.getEphemeralOwner(); + + String path = PropertyPathBuilder.instanceCurrentState(clusterName, instanceName, sessionId); + List currentStates = baseAccessor.getChildNames(path, 0); + if (currentStates == null) { + logger.warn("Instance {} in cluster {} does not have live session. The instance can be removed anyway.", + instanceName, clusterName); + return false; + } + + // see if instance has pending message. + List messages = accessor.getChildValues(keyBuilder.messages(instanceName), true); + if (messages != null && !messages.isEmpty()) { + logger.warn("Instance {} in cluster {} has pending messages.", instanceName, clusterName); + return true; + } + + return !currentStates.isEmpty(); + } + + @Override + public void enableResource(final String clusterName, final String resourceName, final boolean enabled) { + logger.info("{} resource {} in cluster {}.", enabled ? "Enable" : "Disable", resourceName, clusterName); String path = PropertyPathBuilder.idealState(clusterName, resourceName); BaseDataAccessor baseAccessor = new ZkBaseDataAccessor(_zkClient); if (!baseAccessor.exists(path, 0)) { diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java index 3f459318b9..6c51d58bbc 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java @@ -11,17 +11,18 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import org.apache.helix.ConfigAccessor; +import org.apache.helix.HelixAdmin; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixRollbackException; import org.apache.helix.NotificationContext; import org.apache.helix.TestHelper; -import org.apache.helix.api.status.ClusterManagementMode; import org.apache.helix.common.ZkTestBase; import org.apache.helix.constants.InstanceConstants; import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy; import org.apache.helix.controller.rebalancer.waged.AssignmentMetadataStore; import org.apache.helix.integration.manager.ClusterControllerManager; import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.manager.zk.ZKHelixAdmin; import org.apache.helix.manager.zk.ZKHelixDataAccessor; import org.apache.helix.manager.zk.ZkBucketDataAccessor; import org.apache.helix.model.BuiltInStateModelDefinitions; @@ -57,6 +58,8 @@ public class TestInstanceOperation extends ZkTestBase { private ZkHelixClusterVerifier _clusterVerifier; private ConfigAccessor _configAccessor; private long _stateModelDelay = 3L; + + private HelixAdmin _admin; protected AssignmentMetadataStore _assignmentMetadataStore; HelixDataAccessor _dataAccessor; @@ -91,6 +94,8 @@ public void beforeClass() throws Exception { createTestDBs(200); setUpWagedBaseline(); + + _admin = new ZKHelixAdmin(_gZkClient); } @Test @@ -106,7 +111,6 @@ public void testEvacuate() throws Exception { _gSetupTool.getClusterManagementTool() .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, InstanceConstants.InstanceOperation.EVACUATE); - System.out.println("123"); Assert.assertTrue(_clusterVerifier.verifyByPolling()); // New ev should contain all instances but the evacuated one @@ -119,6 +123,9 @@ public void testEvacuate() throws Exception { Assert.assertFalse(newPAssignedParticipants.contains(instanceToEvacuate)); Assert.assertTrue(newPAssignedParticipants.containsAll(currentActiveInstances)); } + + Assert.assertTrue(_admin.isEvacuateFinished(CLUSTER_NAME, instanceToEvacuate)); + Assert.assertTrue(_admin.isReadyForPreparingJoiningCluster(CLUSTER_NAME, instanceToEvacuate)); } @Test(dependsOnMethods = "testEvacuate") @@ -215,6 +222,8 @@ public void testEvacuateAndCancelBeforeBootstrapFinish() throws Exception { TestHelper.verify( () -> ((_dataAccessor.getChildNames(_dataAccessor.keyBuilder().messages(participant))).isEmpty()), 30000); } + Assert.assertFalse(_admin.isEvacuateFinished(CLUSTER_NAME, instanceToEvacuate)); + Assert.assertFalse(_admin.isReadyForPreparingJoiningCluster(CLUSTER_NAME, instanceToEvacuate)); // sleep a bit so ST messages can start executing Thread.sleep(Math.abs(_stateModelDelay / 100)); @@ -261,6 +270,7 @@ public void testEvacuateAndCancelBeforeDropFinish() throws Exception { // message should be pending at the to evacuate participant TestHelper.verify( () -> ((_dataAccessor.getChildNames(_dataAccessor.keyBuilder().messages(instanceToEvacuate))).isEmpty()), 30000); + Assert.assertFalse(_admin.isEvacuateFinished(CLUSTER_NAME, instanceToEvacuate)); // cancel evacuation _gSetupTool.getClusterManagementTool() @@ -323,7 +333,7 @@ public void testMarkEvacuationAfterEMM() throws Exception { Assert.assertFalse(newPAssignedParticipants.contains(instanceToEvacuate)); Assert.assertTrue(newPAssignedParticipants.containsAll(currentActiveInstances)); } - + Assert.assertTrue(_admin.isReadyForPreparingJoiningCluster(CLUSTER_NAME, instanceToEvacuate)); } @Test(dependsOnMethods = "testMarkEvacuationAfterEMM") diff --git a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java index 81993475b8..512a7b4db7 100644 --- a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java +++ b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java @@ -550,4 +550,14 @@ public Map validateInstancesForWagedRebalance(String clusterNam List instancesNames) { return null; } + + @Override + public boolean isEvacuateFinished(String clusterName, String instancesNames) { + return false; + } + + @Override + public boolean isReadyForPreparingJoiningCluster(String clusterName, String instancesNames) { + return false; + } }