diff --git a/.github/workflows/Helix-PR-CI.yml b/.github/workflows/Helix-PR-CI.yml index b0547facd3..d86c8d8af2 100644 --- a/.github/workflows/Helix-PR-CI.yml +++ b/.github/workflows/Helix-PR-CI.yml @@ -1,7 +1,7 @@ name: Helix PR CI on: pull_request: - branches: [ master, metaclient ] # TODO: remove side branch + branches: [ master, metaclient, ApplicationClusterManager] # TODO: remove side branch paths-ignore: - '.github/**' - 'helix-front/**' diff --git a/helix-common/src/main/java/org/apache/helix/constants/InstanceConstants.java b/helix-common/src/main/java/org/apache/helix/constants/InstanceConstants.java index 5bacef7a96..e2cc2de2d5 100644 --- a/helix-common/src/main/java/org/apache/helix/constants/InstanceConstants.java +++ b/helix-common/src/main/java/org/apache/helix/constants/InstanceConstants.java @@ -8,4 +8,10 @@ public enum InstanceDisabledType { USER_OPERATION, DEFAULT_INSTANCE_DISABLE_TYPE } + + public enum InstanceOperation { + EVACUATE, // Node will be removed after a period of time + SWAP_IN, // New node joining for swap operation + SWAP_OUT // Existing Node to be removed for swap operation + } } diff --git a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java index 6c6d4be5ce..085a987b1e 100644 --- a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java +++ b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java @@ -302,6 +302,9 @@ void enableInstance(String clusterName, String instanceName, boolean enabled, */ void enableInstance(String clusterName, List instances, boolean enabled); + void setInstanceOperation(String clusterName, String instance, + InstanceConstants.InstanceOperation instanceOperation); + /** * Disable or enable a resource * @param clusterName @@ -550,6 +553,12 @@ CustomizedView getResourceCustomizedView(String clusterName, String resourceName */ void rebalance(String clusterName, String resourceName, int replica); + /** + * Rebalance a cluster without respecting the delay + * @param clusterName + */ + void onDemandRebalance(String clusterName); + /** * Add ideal state using a json format file * @param clusterName @@ -729,4 +738,21 @@ Map validateResourcesForWagedRebalance(String clusterName, */ Map validateInstancesForWagedRebalance(String clusterName, List instancesNames); + + /** + * Return if instance operation 'Evacuate' is finished. + * @param clusterName + * @param instancesNames + * @return Return true if there is no current state nor pending message on the instance. + */ + boolean isEvacuateFinished(String clusterName, String instancesNames); + + /** + * Return if instance is ready for preparing joining cluster. The instance should have no current state, + * no pending message and tagged with operation that exclude the instance from Helix assignment. + * @param clusterName + * @param instancesNames + * @return true if the instance is ready for preparing joining cluster. 
+ */ + boolean isReadyForPreparingJoiningCluster(String clusterName, String instancesNames); } diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java index ad36b50195..ff5824722d 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java @@ -30,6 +30,7 @@ import java.util.Optional; import java.util.Set; +import java.util.stream.Collectors; import org.apache.helix.HelixDefinedState; import org.apache.helix.api.config.StateTransitionThrottleConfig; import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; @@ -39,6 +40,7 @@ import org.apache.helix.controller.stages.CurrentStateOutput; import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.IdealState; +import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.Partition; import org.apache.helix.model.Resource; import org.apache.helix.model.ResourceAssignment; @@ -53,6 +55,7 @@ */ public class DelayedAutoRebalancer extends AbstractRebalancer { private static final Logger LOG = LoggerFactory.getLogger(DelayedAutoRebalancer.class); + public static final Set INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT = Set.of("EVACUATE", "SWAP_IN"); @Override public IdealState computeNewIdealState(String resourceName, @@ -109,14 +112,12 @@ public IdealState computeNewIdealState(String resourceName, allNodes = clusterData.getAllInstances(); } - Set activeNodes = liveEnabledNodes; + long delay = DelayedRebalanceUtil.getRebalanceDelay(currentIdealState, clusterConfig); + Set activeNodes = DelayedRebalanceUtil + .getActiveNodes(allNodes, currentIdealState, liveEnabledNodes, + clusterData.getInstanceOfflineTimeMap(), clusterData.getLiveInstances().keySet(), + clusterData.getInstanceConfigMap(), delay, clusterConfig); if (delayRebalanceEnabled) { - long delay = DelayedRebalanceUtil.getRebalanceDelay(currentIdealState, clusterConfig); - activeNodes = DelayedRebalanceUtil - .getActiveNodes(allNodes, currentIdealState, liveEnabledNodes, - clusterData.getInstanceOfflineTimeMap(), clusterData.getLiveInstances().keySet(), - clusterData.getInstanceConfigMap(), delay, clusterConfig); - Set offlineOrDisabledInstances = new HashSet<>(activeNodes); offlineOrDisabledInstances.removeAll(liveEnabledNodes); DelayedRebalanceUtil.setRebalanceScheduler(currentIdealState.getResourceName(), true, @@ -157,15 +158,20 @@ public IdealState computeNewIdealState(String resourceName, // sort node lists to ensure consistent preferred assignments List allNodeList = new ArrayList<>(allNodes); - List liveEnabledNodeList = new ArrayList<>(liveEnabledNodes); + // We will not assign partitions to instances tagged with the evacuate or swap-out operation. + // TODO: Currently we have 2 groups of instances and compute the preference list twice and merge. + // Eventually we want to have exclusive groups of instances for different instance tags.
+ List liveEnabledAssignableNodeList = filterOutOnOperationInstances(clusterData.getInstanceConfigMap(), + liveEnabledNodes); Collections.sort(allNodeList); - Collections.sort(liveEnabledNodeList); + Collections.sort(liveEnabledAssignableNodeList); ZNRecord newIdealMapping = _rebalanceStrategy - .computePartitionAssignment(allNodeList, liveEnabledNodeList, currentMapping, clusterData); + .computePartitionAssignment(allNodeList, liveEnabledAssignableNodeList, currentMapping, clusterData); ZNRecord finalMapping = newIdealMapping; - if (DelayedRebalanceUtil.isDelayRebalanceEnabled(currentIdealState, clusterConfig)) { + if (DelayedRebalanceUtil.isDelayRebalanceEnabled(currentIdealState, clusterConfig) + || liveEnabledAssignableNodeList.size()!= activeNodes.size()) { List activeNodeList = new ArrayList<>(activeNodes); Collections.sort(activeNodeList); int minActiveReplicas = DelayedRebalanceUtil.getMinActiveReplica( @@ -194,6 +200,14 @@ public IdealState computeNewIdealState(String resourceName, return idealState; } + private static List filterOutOnOperationInstances(Map instanceConfigMap, + Set nodes) { + return nodes.stream() + .filter( + instance -> !INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT.contains(instanceConfigMap.get(instance).getInstanceOperation())) + .collect(Collectors.toList()); + } + private IdealState generateNewIdealState(String resourceName, IdealState currentIdealState, ZNRecord newMapping) { IdealState newIdealState = new IdealState(resourceName); diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java index ee8804749a..f42176a0a4 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java @@ -29,6 +29,7 @@ import java.util.stream.Collectors; import org.apache.helix.HelixManager; +import org.apache.helix.constants.InstanceConstants; import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica; import org.apache.helix.controller.rebalancer.waged.model.ClusterModelProvider; @@ -82,37 +83,37 @@ public static long getRebalanceDelay(IdealState idealState, ClusterConfig cluste } /** - * @return all active nodes (live nodes plus offline-yet-active nodes) while considering cluster - * delay rebalance configurations. + * @return all active nodes (live nodes not marked as evacuate plus offline-yet-active nodes) + * while considering cluster delay rebalance configurations. */ public static Set getActiveNodes(Set allNodes, Set liveEnabledNodes, Map instanceOfflineTimeMap, Set liveNodes, Map instanceConfigMap, ClusterConfig clusterConfig) { if (!isDelayRebalanceEnabled(clusterConfig)) { - return new HashSet<>(liveEnabledNodes); + return filterOutEvacuatingInstances(instanceConfigMap, liveEnabledNodes); } return getActiveNodes(allNodes, liveEnabledNodes, instanceOfflineTimeMap, liveNodes, instanceConfigMap, clusterConfig.getRebalanceDelayTime(), clusterConfig); } /** - * @return all active nodes (live nodes plus offline-yet-active nodes) while considering cluster - * and the resource delay rebalance configurations. + * @return all active nodes (live nodes not marked as evacuate plus offline-yet-active nodes) + * while considering cluster delay rebalance configurations. 
*/ public static Set getActiveNodes(Set allNodes, IdealState idealState, Set liveEnabledNodes, Map instanceOfflineTimeMap, Set liveNodes, Map instanceConfigMap, long delay, ClusterConfig clusterConfig) { if (!isDelayRebalanceEnabled(idealState, clusterConfig)) { - return new HashSet<>(liveEnabledNodes); + return filterOutEvacuatingInstances(instanceConfigMap, liveEnabledNodes); } return getActiveNodes(allNodes, liveEnabledNodes, instanceOfflineTimeMap, liveNodes, instanceConfigMap, delay, clusterConfig); } private static Set getActiveNodes(Set allNodes, Set liveEnabledNodes, - Map instanceOfflineTimeMap, Set liveNodes, - Map instanceConfigMap, long delay, ClusterConfig clusterConfig) { - Set activeNodes = new HashSet<>(liveEnabledNodes); + Map instanceOfflineTimeMap, Set liveNodes, Map instanceConfigMap, + long delay, ClusterConfig clusterConfig) { + Set activeNodes = new HashSet<>(liveEnabledNodes); Set offlineOrDisabledInstances = new HashSet<>(allNodes); offlineOrDisabledInstances.removeAll(liveEnabledNodes); long currentTime = System.currentTimeMillis(); @@ -125,19 +126,38 @@ private static Set getActiveNodes(Set allNodes, Set live activeNodes.add(ins); } } - return activeNodes; + // TODO: change this after merging operation and helix-enable field. + return filterOutEvacuatingInstances(instanceConfigMap, activeNodes); + } + + public static Set filterOutEvacuatingInstances(Map instanceConfigMap, + Set nodes) { + return nodes.stream() + .filter(instance -> !instanceConfigMap.get(instance).getInstanceOperation().equals( + InstanceConstants.InstanceOperation.EVACUATE.name())) + .collect(Collectors.toSet()); } /** - * @return The time when an offline or disabled instance should be treated as inactive. - * Return -1 if it is inactive now. + * Return the time when an offline or disabled instance should be treated as inactive. Return -1 + * if it is inactive now or forced to be rebalanced by an on-demand rebalance. + * + * @return A timestamp that represents the expected inactive time of a node. */ private static long getInactiveTime(String instance, Set liveInstances, Long offlineTime, long delay, InstanceConfig instanceConfig, ClusterConfig clusterConfig) { long inactiveTime = Long.MAX_VALUE; + long lastOnDemandRebalanceTime = clusterConfig.getLastOnDemandRebalanceTimestamp(); - // check the time instance went offline. + // Check if the given instance is offline if (!liveInstances.contains(instance)) { + // Check if the offline instance is forced to be rebalanced by an on-demand rebalance. + // If so, return it as an inactive instance. + if (isInstanceForcedToBeRebalanced(offlineTime, delay, lastOnDemandRebalanceTime)) { + return -1L; + } + + // Check the time instance went offline. if (offlineTime != null && offlineTime > 0 && offlineTime + delay < inactiveTime) { inactiveTime = offlineTime + delay; } @@ -154,6 +174,13 @@ private static long getInactiveTime(String instance, Set liveInstances, disabledTime = batchDisableTime; } } + + // Check if the disabled instance is forced to be rebalanced by an on-demand rebalance. + // If so, return it as an inactive instance. 
+ if (isInstanceForcedToBeRebalanced(disabledTime, delay, lastOnDemandRebalanceTime)) { + return -1L; + } + if (disabledTime > 0 && disabledTime + delay < inactiveTime) { inactiveTime = disabledTime + delay; } @@ -417,6 +444,33 @@ private static int getMinActiveReplica(ResourceControllerDataProvider clusterDat currentIdealState), currentIdealState, numReplica); } + /** + * Given the offline/disabled time, delay, and the last on-demand rebalance time, this method checks + * if the node associated with the offline/disabled time is forced to be rebalanced by the on-demand + * rebalance. + * 1. If either the last on-demand rebalance time or the offline/disabled time is unavailable, then + * the node is not forced to be rebalanced. + * 2. If the current time doesn't surpass the delayed offline/disabled time and the last on-demand + * rebalance time is after the offline/disabled time, then the node is forced to be rebalanced. + * + * @param offlineOrDisabledTime A unix timestamp indicating the most recent time when a node went + * offline or was disabled. + * @param delay The delay window configuration of the current cluster + * @param lastOnDemandRebalanceTime A unix timestamp representing the most recent time when an + * on-demand rebalance was triggered. + * @return A boolean indicating whether a node is forced to be rebalanced + */ + private static boolean isInstanceForcedToBeRebalanced(Long offlineOrDisabledTime, long delay, + long lastOnDemandRebalanceTime) { + if (lastOnDemandRebalanceTime == -1 || offlineOrDisabledTime == null + || offlineOrDisabledTime <= 0 || System.currentTimeMillis() > (offlineOrDisabledTime + + delay)) { + return false; + } + + return offlineOrDisabledTime < lastOnDemandRebalanceTime; + } + /** * For the resource in the cluster, find additional AssignableReplica to close the gap on minActiveReplica. * @param clusterData Cluster data cache. diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java index 63986e6b5d..44afee5e1b 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java @@ -47,6 +47,7 @@ import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixDefinedState; import org.apache.helix.HelixException; +import org.apache.helix.HelixProperty; import org.apache.helix.InstanceType; import org.apache.helix.PropertyKey; import org.apache.helix.PropertyPathBuilder; @@ -57,6 +58,7 @@ import org.apache.helix.api.status.ClusterManagementModeRequest; import org.apache.helix.api.topology.ClusterTopology; import org.apache.helix.constants.InstanceConstants; +import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer; import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy; import org.apache.helix.controller.rebalancer.util.WagedValidationUtil; import org.apache.helix.controller.rebalancer.waged.WagedRebalancer; @@ -375,10 +377,102 @@ public void enableInstance(String clusterName, List instances, boolean e } @Override - public void enableResource(final String clusterName, final String resourceName, - final boolean enabled) { - logger.info("{} resource {} in cluster {}.", enabled ?
"Enable" : "Disable", resourceName, - clusterName); + // TODO: Name may change in future + public void setInstanceOperation(String clusterName, String instanceName, + InstanceConstants.InstanceOperation instanceOperation) { + + BaseDataAccessor baseAccessor = new ZkBaseDataAccessor<>(_zkClient); + String path = PropertyPathBuilder.instanceConfig(clusterName, instanceName); + + if (!baseAccessor.exists(path, 0)) { + throw new HelixException( + "Cluster " + clusterName + ", instance: " + instanceName + ", instance config does not exist"); + } + + boolean succeeded = baseAccessor.update(path, new DataUpdater() { + @Override + public ZNRecord update(ZNRecord currentData) { + if (currentData == null) { + throw new HelixException( + "Cluster: " + clusterName + ", instance: " + instanceName + ", participant config is null"); + } + + InstanceConfig config = new InstanceConfig(currentData); + config.setInstanceOperation(instanceOperation); + return config.getRecord(); + } + }, AccessOption.PERSISTENT); + + if (!succeeded) { + throw new HelixException("Failed to update instance operation. Please check if instance is disabled."); + } + } + + @Override + public boolean isEvacuateFinished(String clusterName, String instanceName) { + return !instanceHasCurrentStateOrMessage(clusterName, instanceName) && (getInstanceConfig(clusterName, + instanceName).getInstanceOperation().equals(InstanceConstants.InstanceOperation.EVACUATE.name())); + } + + @Override + public boolean isReadyForPreparingJoiningCluster(String clusterName, String instanceName) { + return !instanceHasCurrentStateOrMessage(clusterName, instanceName) + && DelayedAutoRebalancer.INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT.contains( + getInstanceConfig(clusterName, instanceName).getInstanceOperation()); + } + + /** + * Return true if the instance has any current state or pending message. Return false if the instance is not + * alive, has no active session, or is live but has no current state and no pending message. + * @param clusterName + * @param instanceName + * @return + */ + private boolean instanceHasCurrentStateOrMessage(String clusterName, String instanceName) { + HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkClient)); + PropertyKey.Builder keyBuilder = accessor.keyBuilder(); + + // check the instance is alive + LiveInstance liveInstance = accessor.getProperty(keyBuilder.liveInstance(instanceName)); + if (liveInstance == null) { + logger.warn("Instance {} in cluster {} is not alive. The instance can be removed anyway.", instanceName, + clusterName); + return false; + } + + BaseDataAccessor baseAccessor = new ZkBaseDataAccessor(_zkClient); + // count the number of sessions under the CurrentState folder. If the instance is carrying over from a previous + // session, then there is more than one session ZNode. + List sessions = baseAccessor.getChildNames(PropertyPathBuilder.instanceCurrentState(clusterName, instanceName), 0); + if (sessions.size() > 1) { + logger.warn("Instance {} in cluster {} is carrying over from a previous session.", instanceName, + clusterName); + return true; + } + + String sessionId = liveInstance.getEphemeralOwner(); + + String path = PropertyPathBuilder.instanceCurrentState(clusterName, instanceName, sessionId); + List currentStates = baseAccessor.getChildNames(path, 0); + if (currentStates == null) { + logger.warn("Instance {} in cluster {} does not have a live session.
The instance can be removed anyway.", + instanceName, clusterName); + return false; + } + + // see if instance has pending message. + List messages = accessor.getChildValues(keyBuilder.messages(instanceName), true); + if (messages != null && !messages.isEmpty()) { + logger.warn("Instance {} in cluster {} has pending messages.", instanceName, clusterName); + return true; + } + + return !currentStates.isEmpty(); + } + + @Override + public void enableResource(final String clusterName, final String resourceName, final boolean enabled) { + logger.info("{} resource {} in cluster {}.", enabled ? "Enable" : "Disable", resourceName, clusterName); String path = PropertyPathBuilder.idealState(clusterName, resourceName); BaseDataAccessor baseAccessor = new ZkBaseDataAccessor(_zkClient); if (!baseAccessor.exists(path, 0)) { @@ -1498,6 +1592,28 @@ public void rebalance(String clusterName, String resourceName, int replica) { rebalance(clusterName, resourceName, replica, resourceName, ""); } + @Override + public void onDemandRebalance(String clusterName) { + BaseDataAccessor baseAccessor = new ZkBaseDataAccessor(_zkClient); + String path = PropertyPathBuilder.clusterConfig(clusterName); + + if (!baseAccessor.exists(path, 0)) { + throw new HelixException("Cluster " + clusterName + ": cluster config does not exist"); + } + + baseAccessor.update(path, new DataUpdater() { + @Override + public ZNRecord update(ZNRecord currentData) { + if (currentData == null) { + throw new HelixException("Cluster: " + clusterName + ": cluster config is null"); + } + ClusterConfig clusterConfig = new ClusterConfig(currentData); + clusterConfig.setLastOnDemandRebalanceTimestamp(System.currentTimeMillis()); + return clusterConfig.getRecord(); + } + }, AccessOption.PERSISTENT); + } + @Override public void rebalance(String clusterName, String resourceName, int replica, String keyPrefix, String group) { diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java index 8f04c5fabf..e33b902049 100644 --- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java +++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java @@ -151,7 +151,10 @@ public enum ClusterConfigProperty { HELIX_ENABLED_DISABLE_TIMESTAMP, HELIX_DISABLED_REASON, // disabled type should be a enum of org.apache.helix.constants.InstanceConstants.InstanceDisabledType - HELIX_DISABLED_TYPE + HELIX_DISABLED_TYPE, + + // The last time when the on-demand rebalance is triggered. + LAST_ON_DEMAND_REBALANCE_TIMESTAMP } public enum GlobalRebalancePreferenceKey { @@ -188,6 +191,7 @@ public enum GlobalRebalancePreferenceKey { private static final int GLOBAL_TARGET_TASK_THREAD_POOL_SIZE_NOT_SET = -1; private static final int OFFLINE_NODE_TIME_OUT_FOR_MAINTENANCE_MODE_NOT_SET = -1; private final static int DEFAULT_VIEW_CLUSTER_REFRESH_PERIOD = 30; + private final static long DEFAULT_LAST_ON_DEMAND_REBALANCE_TIMESTAMP = -1L; /** * Instantiate for a specific cluster @@ -1173,4 +1177,25 @@ public String getInstanceHelixDisabledTimeStamp(String instanceName) { } return getDisabledInstances().get(instanceName); } + + /** + * Get a unix time that represents the last time the on-demand rebalance is triggered on the + * current cluster. Return -1 if the configuration doesn't have such record yet. 
+ * + * @return the last on-demand rebalance timestamp in a unix format + */ + public long getLastOnDemandRebalanceTimestamp() { + return _record.getLongField(ClusterConfigProperty.LAST_ON_DEMAND_REBALANCE_TIMESTAMP.name(), + DEFAULT_LAST_ON_DEMAND_REBALANCE_TIMESTAMP); + } + + /** + * Set the last on demand rebalance time to be the given timestamp. + * + * @param rebalanceTimestamp + */ + public void setLastOnDemandRebalanceTimestamp(long rebalanceTimestamp) { + _record.setLongField(ClusterConfigProperty.LAST_ON_DEMAND_REBALANCE_TIMESTAMP.name(), + rebalanceTimestamp); + } } diff --git a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java index 01c1f682f1..45e0476ba0 100644 --- a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java +++ b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java @@ -61,7 +61,8 @@ public enum InstanceConfigProperty { DELAY_REBALANCE_ENABLED, MAX_CONCURRENT_TASK, INSTANCE_CAPACITY_MAP, - TARGET_TASK_THREAD_POOL_SIZE + TARGET_TASK_THREAD_POOL_SIZE, + INSTANCE_OPERATION } public static final int WEIGHT_NOT_SET = -1; @@ -263,9 +264,13 @@ public boolean getInstanceEnabled() { * @param enabled true to enable, false to disable */ public void setInstanceEnabled(boolean enabled) { + // set instance operation only when we need to change InstanceEnabled value. + setInstanceEnabledHelper(enabled); + } + + private void setInstanceEnabledHelper(boolean enabled) { _record.setBooleanField(InstanceConfigProperty.HELIX_ENABLED.toString(), enabled); - _record.setLongField(InstanceConfigProperty.HELIX_ENABLED_TIMESTAMP.name(), - System.currentTimeMillis()); + _record.setLongField(InstanceConfigProperty.HELIX_ENABLED_TIMESTAMP.name(), System.currentTimeMillis()); if (enabled) { resetInstanceDisabledTypeAndReason(); } @@ -329,6 +334,15 @@ public long getInstanceEnabledTime() { return _record.getLongField(InstanceConfigProperty.HELIX_ENABLED_TIMESTAMP.name(), -1); } + public void setInstanceOperation(InstanceConstants.InstanceOperation operation) { + _record.setSimpleField(InstanceConfigProperty.INSTANCE_OPERATION.name(), + operation == null ? "" : operation.name()); + } + + public String getInstanceOperation() { + return _record.getStringField(InstanceConfigProperty.INSTANCE_OPERATION.name(), ""); + } + /** * Check if this instance is enabled for a given partition * This API is deprecated, and will be removed in next major release. 
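For context, a minimal operator-side sketch of how the APIs added above might be driven end to end; it is not part of the patch. It assumes a HelixAdmin already constructed by the caller (the new test below builds one with new ZKHelixAdmin(_gZkClient)), and the class name, method names, timeout, and polling interval are illustrative only.

import org.apache.helix.HelixAdmin;
import org.apache.helix.constants.InstanceConstants;

public class EvacuationSketch {
  // Tag an instance for evacuation and poll until its replicas have been moved off.
  // isEvacuateFinished() returns true once the instance has no current state and no
  // pending messages while it is still tagged EVACUATE.
  public static boolean drainInstance(HelixAdmin admin, String cluster, String instance,
      long timeoutMs) throws InterruptedException {
    admin.setInstanceOperation(cluster, instance, InstanceConstants.InstanceOperation.EVACUATE);
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (System.currentTimeMillis() < deadline) {
      if (admin.isEvacuateFinished(cluster, instance)) {
        return true;
      }
      Thread.sleep(1000L);
    }
    return false;
  }

  // Ask the controller to rebalance immediately, ignoring the delay window for nodes
  // that went offline or were disabled before this call.
  public static void forceRebalance(HelixAdmin admin, String cluster) {
    admin.onDemandRebalance(cluster);
  }
}

Note that onDemandRebalance only stamps LAST_ON_DEMAND_REBALANCE_TIMESTAMP in the cluster config; the rebalancer then treats offline/disabled nodes whose downtime started before that timestamp (and is still within the delay window) as inactive, which is what the expired-timestamp test below exercises.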
diff --git a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java index 1cea1926ab..0218c3ffcb 100644 --- a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java +++ b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java @@ -356,6 +356,14 @@ protected void setDelayTimeInCluster(HelixZkClient zkClient, String clusterName, configAccessor.setClusterConfig(clusterName, clusterConfig); } + protected void setLastOnDemandRebalanceTimeInCluster(HelixZkClient zkClient, + String clusterName, long lastOnDemandTime) { + ConfigAccessor configAccessor = new ConfigAccessor(zkClient); + ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName); + clusterConfig.setLastOnDemandRebalanceTimestamp(lastOnDemandTime); + configAccessor.setClusterConfig(clusterName, clusterConfig); + } + protected IdealState createResourceWithDelayedRebalance(String clusterName, String db, String stateModel, int numPartition, int replica, int minActiveReplica, long delay) { return createResourceWithDelayedRebalance(clusterName, db, stateModel, numPartition, replica, diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalance.java index 958f27c83d..d5e8239504 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalance.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalance.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.Set; +import org.apache.helix.ConfigAccessor; import org.apache.helix.TestHelper; import org.apache.helix.common.ZkTestBase; import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy; @@ -35,6 +36,7 @@ import org.apache.helix.model.BuiltInStateModelDefinitions; import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; +import org.apache.helix.model.InstanceConfig; import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier; import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier; import org.testng.Assert; @@ -51,6 +53,8 @@ public class TestDelayedAutoRebalance extends ZkTestBase { // TODO: remove this wait time once we have a better way to determine if the rebalance has been // TODO: done as a reaction of the test operations. 
protected static final int DEFAULT_REBALANCE_PROCESSING_WAIT_TIME = TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME; + protected static final String OFFLINE_NODE = "offline"; + protected static final String DISABLED_NODE = "disabled"; protected final String CLASS_NAME = getShortClassName(); protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME; @@ -61,6 +65,8 @@ public class TestDelayedAutoRebalance extends ZkTestBase { protected int _minActiveReplica = _replica - 1; protected ZkHelixClusterVerifier _clusterVerifier; protected List _testDBs = new ArrayList<>(); + protected String _testingCondition = OFFLINE_NODE; + protected ConfigAccessor _configAccessor; @BeforeClass public void beforeClass() throws Exception { @@ -90,6 +96,7 @@ public void beforeClass() throws Exception { .build(); enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true); + _testingCondition = OFFLINE_NODE; } protected String[] TestStateModels = { @@ -233,6 +240,76 @@ public void testDisableDelayRebalanceInInstance() throws Exception { enableDelayRebalanceInInstance(_gZkClient, CLUSTER_NAME, disabledInstanceName, true); } + @Test(dependsOnMethods = {"testDisableDelayRebalanceInInstance"}) + public void testOnDemandRebalance() throws Exception { + enableDelayRebalanceInCluster(_gZkClient, CLUSTER_NAME, true); + setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, 1000000); + Map externalViewsBefore = createTestDBs(-1); + boolean isDisabled = _testingCondition.equals(DISABLED_NODE); + if (isDisabled) { + // disable one node and make sure no partition movement + validateDelayedMovementsOnDisabledNode(externalViewsBefore); + } else { + // stop one node and make sure no partition movement + validateDelayedMovements(externalViewsBefore); + } + + // trigger an on-demand rebalance and partitions on the offline/disabled node should move + validateMovementAfterOnDemandRebalance(externalViewsBefore, null,true, isDisabled); + + setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, -1); + setLastOnDemandRebalanceTimeInCluster(_gZkClient, CLUSTER_NAME, -1); + } + + @Test(dependsOnMethods = {"testOnDemandRebalance"}) + public void testExpiredOnDemandRebalanceTimestamp() throws Exception { + enableDelayRebalanceInCluster(_gZkClient, CLUSTER_NAME, true); + setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, 1000000); + Map externalViewsBefore = createTestDBs(-1); + boolean isDisabled = _testingCondition.equals(DISABLED_NODE); + if (isDisabled) { + // disable one node and make sure no partition movement + validateDelayedMovementsOnDisabledNode(externalViewsBefore); + } else { + // stop one node and make sure no partition movement + validateDelayedMovements(externalViewsBefore); + } + + // trigger an on-demand rebalance and partitions on the offline/disabled node shouldn't move + // because the last on-demand timestamp is expired. 
+ validateMovementAfterOnDemandRebalance(externalViewsBefore, 1L, false, isDisabled); + + setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, -1); + setLastOnDemandRebalanceTimeInCluster(_gZkClient, CLUSTER_NAME, -1); + } + + @Test(dependsOnMethods = {"testExpiredOnDemandRebalanceTimestamp"}) + public void testOnDemandRebalanceAfterDelayRebalanceHappen() throws Exception { + long delay = 4000; + setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, delay); + Map externalViewsBefore = createTestDBs(-1); + boolean isDisabled = _testingCondition.equals(DISABLED_NODE); + if (isDisabled) { + // disable one node and make sure no partition movement + validateDelayedMovementsOnDisabledNode(externalViewsBefore); + } else { + // stop one node and make sure no partition movement + validateDelayedMovements(externalViewsBefore); + } + + Thread.sleep(delay); + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + // after delay time, it should maintain required number of replicas + externalViewsBefore = validatePartitionMovement(externalViewsBefore, true, isDisabled); + + // trigger an on-demand rebalance and partitions on the offline/disabled node shouldn't move + // because the last on-demand timestamp is expired. + validateMovementAfterOnDemandRebalance(externalViewsBefore, null,false, isDisabled); + + setDelayTimeInCluster(_gZkClient, CLUSTER_NAME, -1); + setLastOnDemandRebalanceTimeInCluster(_gZkClient, CLUSTER_NAME, -1); + } + @AfterMethod public void afterTest() throws InterruptedException { // delete all DBs create in last test @@ -304,6 +381,54 @@ protected void validateNoPartitionMove(IdealState is, ExternalView evBefore, Ext } + protected void validateMovementAfterOnDemandRebalance( + Map externalViewsBefore, Long lastOnDemandTime, boolean isPartitionMoved, + boolean isDisabled) { + if (lastOnDemandTime == null) { + _gSetupTool.getClusterManagementTool().onDemandRebalance(CLUSTER_NAME); + } else { + setLastOnDemandRebalanceTimeInCluster(_gZkClient, CLUSTER_NAME, lastOnDemandTime); + } + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + validatePartitionMovement(externalViewsBefore, isPartitionMoved, isDisabled); + } + + protected Map validatePartitionMovement( + Map externalViewsBefore, boolean isPartitionMoved, boolean isDisabled) { + Map externalViewAfter = new HashMap<>(); + for (String db : _testDBs) { + ExternalView ev = + _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); + IdealState is = + _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); + if (isPartitionMoved) { + validateMinActiveAndTopStateReplica(is, ev, _replica, NUM_NODE); + validateNoPartitionOnInstance(is, externalViewsBefore.get(db), ev, + _participants.get(0).getInstanceName()); + } else { + validateMinActiveAndTopStateReplica(is, ev, _minActiveReplica, NUM_NODE); + validateNoPartitionMove(is, externalViewsBefore.get(db), ev, + _participants.get(0).getInstanceName(), isDisabled); + } + externalViewAfter.put(db, ev); + } + return externalViewAfter; + } + + protected void validateNoPartitionOnInstance(IdealState is, ExternalView evBefore, + ExternalView evAfter, String instanceName) { + for (String partition : is.getPartitionSet()) { + Map assignmentsBefore = evBefore.getRecord().getMapField(partition); + Map assignmentsAfter = evAfter.getRecord().getMapField(partition); + Set instancesAfter = new HashSet(assignmentsAfter.keySet()); + + // the offline/disabled instance shouldn't have a partition assignment after rebalance + 
Assert.assertFalse(instancesAfter.contains(instanceName), String.format( + "%s is still on the instance after rebalance, before: %s, after: %s, instance: %s", + partition, assignmentsBefore.toString(), assignmentsAfter.toString(), instanceName)); + } + } + private void validateDelayedMovements(Map externalViewsBefore) throws InterruptedException { _participants.get(0).syncStop(); @@ -318,6 +443,25 @@ private void validateDelayedMovements(Map externalViewsBef } } + protected void enableInstance(String instance, boolean enabled) { + // Disable one node, no partition should be moved. + long currentTime = System.currentTimeMillis(); + _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, instance, enabled); + InstanceConfig instanceConfig = _configAccessor.getInstanceConfig(CLUSTER_NAME, instance); + Assert.assertEquals(instanceConfig.getInstanceEnabled(), enabled); + Assert.assertTrue(instanceConfig.getInstanceEnabledTime() >= currentTime); + Assert.assertTrue(instanceConfig.getInstanceEnabledTime() <= currentTime + 100); + } + + protected void validateDelayedMovementsOnDisabledNode(Map externalViewsBefore) + throws Exception { + enableInstance(_participants.get(0).getInstanceName(), false); + Thread.sleep(DEFAULT_REBALANCE_PROCESSING_WAIT_TIME); + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + + validatePartitionMovement(externalViewsBefore, false, true); + } + @AfterClass public void afterClass() throws Exception { if (_clusterVerifier != null) { diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithDisabledInstance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithDisabledInstance.java index 3e5eadd0f5..7fa1426d68 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithDisabledInstance.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithDisabledInstance.java @@ -25,7 +25,6 @@ import org.apache.helix.integration.manager.MockParticipantManager; import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; -import org.apache.helix.model.InstanceConfig; import org.testng.Assert; import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeMethod; @@ -33,12 +32,11 @@ public class TestDelayedAutoRebalanceWithDisabledInstance extends TestDelayedAutoRebalance { - private ConfigAccessor _configAccessor; - @BeforeClass public void beforeClass() throws Exception { super.beforeClass(); _configAccessor = new ConfigAccessor(_gZkClient); + _testingCondition = DISABLED_NODE; } @@ -292,6 +290,21 @@ public void testDisableDelayRebalanceInInstance() throws Exception { super.testDisableDelayRebalanceInInstance(); } + @Test(dependsOnMethods = {"testDisableDelayRebalanceInInstance"}) + public void testOnDemandRebalance() throws Exception { + super.testOnDemandRebalance(); + } + + @Test(dependsOnMethods = {"testOnDemandRebalance"}) + public void testExpiredOnDemandRebalanceTimestamp() throws Exception { + super.testExpiredOnDemandRebalanceTimestamp(); + } + + @Test(dependsOnMethods = {"testExpiredOnDemandRebalanceTimestamp"}) + public void testOnDemandRebalanceAfterDelayRebalanceHappen() throws Exception { + super.testOnDemandRebalanceAfterDelayRebalanceHappen(); + } + @BeforeMethod public void beforeTest() { // restart any participant that has been 
disconnected from last test. @@ -304,14 +317,4 @@ public void beforeTest() { enableInstance(_participants.get(i).getInstanceName(), true); } } - - private void enableInstance(String instance, boolean enabled) { - // Disable one node, no partition should be moved. - long currentTime = System.currentTimeMillis(); - _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, instance, enabled); - InstanceConfig instanceConfig = _configAccessor.getInstanceConfig(CLUSTER_NAME, instance); - Assert.assertEquals(instanceConfig.getInstanceEnabled(), enabled); - Assert.assertTrue(instanceConfig.getInstanceEnabledTime() >= currentTime); - Assert.assertTrue(instanceConfig.getInstanceEnabledTime() <= currentTime + 100); - } } diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithRackaware.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithRackaware.java index 9a2ccf90a2..4af45320dc 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithRackaware.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/DelayedAutoRebalancer/TestDelayedAutoRebalanceWithRackaware.java @@ -114,4 +114,18 @@ public void testDisableDelayRebalanceInCluster() throws Exception { public void testDisableDelayRebalanceInInstance() throws Exception { super.testDisableDelayRebalanceInInstance(); } + + @Test(dependsOnMethods = {"testDisableDelayRebalanceInInstance"}) + public void testOnDemandRebalance() throws Exception { + super.testOnDemandRebalance(); + } + + @Test(dependsOnMethods = {"testOnDemandRebalance"}) + public void testExpiredOnDemandRebalanceTimestamp() throws Exception { + super.testExpiredOnDemandRebalanceTimestamp(); + } + @Test(dependsOnMethods = {"testExpiredOnDemandRebalanceTimestamp"}) + public void testOnDemandRebalanceAfterDelayRebalanceHappen() throws Exception { + super.testOnDemandRebalanceAfterDelayRebalanceHappen(); + } } diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java new file mode 100644 index 0000000000..6c51d58bbc --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java @@ -0,0 +1,532 @@ +package org.apache.helix.integration.rebalancer; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import org.apache.helix.ConfigAccessor; +import org.apache.helix.HelixAdmin; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.HelixRollbackException; +import org.apache.helix.NotificationContext; +import org.apache.helix.TestHelper; +import org.apache.helix.common.ZkTestBase; +import org.apache.helix.constants.InstanceConstants; +import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy; +import org.apache.helix.controller.rebalancer.waged.AssignmentMetadataStore; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.manager.zk.ZKHelixAdmin; +import 
org.apache.helix.manager.zk.ZKHelixDataAccessor; +import org.apache.helix.manager.zk.ZkBucketDataAccessor; +import org.apache.helix.model.BuiltInStateModelDefinitions; +import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.ExternalView; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.Message; +import org.apache.helix.model.ResourceAssignment; +import org.apache.helix.participant.StateMachineEngine; +import org.apache.helix.participant.statemachine.StateModel; +import org.apache.helix.participant.statemachine.StateModelFactory; +import org.apache.helix.participant.statemachine.StateModelInfo; +import org.apache.helix.participant.statemachine.Transition; +import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier; +import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + + +public class TestInstanceOperation extends ZkTestBase { + protected final int NUM_NODE = 6; + protected static final int START_PORT = 12918; + protected static final int PARTITIONS = 20; + + protected final String CLASS_NAME = getShortClassName(); + protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME; + private int REPLICA = 3; + protected ClusterControllerManager _controller; + List _participants = new ArrayList<>(); + List _participantNames = new ArrayList<>(); + private Set _allDBs = new HashSet<>(); + private ZkHelixClusterVerifier _clusterVerifier; + private ConfigAccessor _configAccessor; + private long _stateModelDelay = 3L; + + private HelixAdmin _admin; + protected AssignmentMetadataStore _assignmentMetadataStore; + HelixDataAccessor _dataAccessor; + + @BeforeClass + public void beforeClass() throws Exception { + System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis())); + + _gSetupTool.addCluster(CLUSTER_NAME, true); + + for (int i = 0; i < NUM_NODE; i++) { + String participantName = PARTICIPANT_PREFIX + "_" + (START_PORT + i); + addParticipant(participantName); + } + + // start controller + String controllerName = CONTROLLER_PREFIX + "_0"; + _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName); + _controller.syncStart(); + _clusterVerifier = new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR) + .setDeactivatedNodeAwareness(true) + .setResources(_allDBs) + .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME) + .build(); + enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true); + _configAccessor = new ConfigAccessor(_gZkClient); + _dataAccessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor); + + ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME); + clusterConfig.stateTransitionCancelEnabled(true); + _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig); + + createTestDBs(200); + + setUpWagedBaseline(); + + _admin = new ZKHelixAdmin(_gZkClient); + } + + @Test + public void testEvacuate() throws Exception { + // EV should contain all participants, check resources one by one + Map assignment = getEV(); + for (String resource : _allDBs) { + Assert.assertTrue(getParticipantsInEv(assignment.get(resource)).containsAll(_participantNames)); + } + + // evacuated instance + String instanceToEvacuate = _participants.get(0).getInstanceName(); + _gSetupTool.getClusterManagementTool() + .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, 
InstanceConstants.InstanceOperation.EVACUATE); + + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + + // New ev should contain all instances but the evacuated one + assignment = getEV(); + List currentActiveInstances = + _participantNames.stream().filter(n -> !n.equals(instanceToEvacuate)).collect(Collectors.toList()); + for (String resource : _allDBs) { + validateAssignmentInEv(assignment.get(resource)); + Set newPAssignedParticipants = getParticipantsInEv(assignment.get(resource)); + Assert.assertFalse(newPAssignedParticipants.contains(instanceToEvacuate)); + Assert.assertTrue(newPAssignedParticipants.containsAll(currentActiveInstances)); + } + + Assert.assertTrue(_admin.isEvacuateFinished(CLUSTER_NAME, instanceToEvacuate)); + Assert.assertTrue(_admin.isReadyForPreparingJoiningCluster(CLUSTER_NAME, instanceToEvacuate)); + } + + @Test(dependsOnMethods = "testEvacuate") + public void testRevertEvacuation() throws Exception { + + // revert an evacuate instance + String instanceToEvacuate = _participants.get(0).getInstanceName(); + _gSetupTool.getClusterManagementTool() + .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, null); + + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + + // EV should contain all participants, check resources one by one + Map assignment = getEV(); + for (String resource : _allDBs) { + Assert.assertTrue(getParticipantsInEv(assignment.get(resource)).containsAll(_participantNames)); + validateAssignmentInEv(assignment.get(resource)); + } + } + + @Test(dependsOnMethods = "testRevertEvacuation") + public void testAddingNodeWithEvacuationTag() throws Exception { + // first disable and instance, and wait for all replicas to be moved out + String mockNewInstance = _participants.get(0).getInstanceName(); + _gSetupTool.getClusterManagementTool() + .enableInstance(CLUSTER_NAME, mockNewInstance, false); + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + //ev should contain all instances but the disabled one + Map assignment = getEV(); + List currentActiveInstances = + _participantNames.stream().filter(n -> !n.equals(mockNewInstance)).collect(Collectors.toList()); + for (String resource : _allDBs) { + validateAssignmentInEv(assignment.get(resource), REPLICA-1); + Set newPAssignedParticipants = getParticipantsInEv(assignment.get(resource)); + Assert.assertFalse(newPAssignedParticipants.contains(mockNewInstance)); + Assert.assertTrue(newPAssignedParticipants.containsAll(currentActiveInstances)); + } + + // add evacuate tag and enable instance + _gSetupTool.getClusterManagementTool() + .setInstanceOperation(CLUSTER_NAME, mockNewInstance, InstanceConstants.InstanceOperation.EVACUATE); + _gSetupTool.getClusterManagementTool() + .enableInstance(CLUSTER_NAME, mockNewInstance, true); + //ev should be the same + assignment = getEV(); + currentActiveInstances = + _participantNames.stream().filter(n -> !n.equals(mockNewInstance)).collect(Collectors.toList()); + for (String resource : _allDBs) { + validateAssignmentInEv(assignment.get(resource), REPLICA-1); + Set newPAssignedParticipants = getParticipantsInEv(assignment.get(resource)); + Assert.assertFalse(newPAssignedParticipants.contains(mockNewInstance)); + Assert.assertTrue(newPAssignedParticipants.containsAll(currentActiveInstances)); + } + + // now remove operation tag + String instanceToEvacuate = _participants.get(0).getInstanceName(); + _gSetupTool.getClusterManagementTool() + .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, null); + + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + + 
// EV should contain all participants, check resources one by one + assignment = getEV(); + for (String resource : _allDBs) { + Assert.assertTrue(getParticipantsInEv(assignment.get(resource)).containsAll(_participantNames)); + validateAssignmentInEv(assignment.get(resource)); + } + } + + @Test(dependsOnMethods = "testAddingNodeWithEvacuationTag") + public void testEvacuateAndCancelBeforeBootstrapFinish() throws Exception { + // add a resource where downward state transition is slow + createResourceWithDelayedRebalance(CLUSTER_NAME, "TEST_DB3_DELAYED_CRUSHED", "MasterSlave", PARTITIONS, REPLICA, + REPLICA - 1, 200, CrushEdRebalanceStrategy.class.getName()); + _allDBs.add("TEST_DB3_DELAYED_CRUSHED"); + // add a resource where downward state transition is slow + createResourceWithWagedRebalance(CLUSTER_NAME, "TEST_DB4_DELAYED_WAGED", "MasterSlave", + PARTITIONS, REPLICA, REPLICA - 1); + _allDBs.add("TEST_DB4_DELAYED_WAGED"); + // wait for assignment to finish + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + + // set bootstrap ST delay to a large number + _stateModelDelay = -10000L; + // evacuate an instance + String instanceToEvacuate = _participants.get(0).getInstanceName(); + _gSetupTool.getClusterManagementTool() + .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, InstanceConstants.InstanceOperation.EVACUATE); + // Messages should be pending at all instances besides the evacuate one + for (String participant : _participantNames) { + if (participant.equals(instanceToEvacuate)) { + continue; + } + TestHelper.verify( + () -> ((_dataAccessor.getChildNames(_dataAccessor.keyBuilder().messages(participant))).isEmpty()), 30000); + } + Assert.assertFalse(_admin.isEvacuateFinished(CLUSTER_NAME, instanceToEvacuate)); + Assert.assertFalse(_admin.isReadyForPreparingJoiningCluster(CLUSTER_NAME, instanceToEvacuate)); + + // sleep a bit so ST messages can start executing + Thread.sleep(Math.abs(_stateModelDelay / 100)); + // before we cancel, check current EV + Map assignment = getEV(); + for (String resource : _allDBs) { + // check every replica has >= 3 partitions and a top state partition + validateAssignmentInEv(assignment.get(resource)); + } + + // cancel the evacuation + _gSetupTool.getClusterManagementTool() + .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, null); + + assignment = getEV(); + for (String resource : _allDBs) { + // check every replica has >= 3 active replicas, even before cluster converge + validateAssignmentInEv(assignment.get(resource)); + } + + // check cluster converge. We have longer delay for ST then verifier timeout. It will only converge if we cancel ST. 
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+ // EV should contain all participants, check resources one by one
+ assignment = getEV();
+ for (String resource : _allDBs) {
+ Assert.assertTrue(getParticipantsInEv(assignment.get(resource)).containsAll(_participantNames));
+ // check that each partition again has >= 3 active replicas
+ validateAssignmentInEv(assignment.get(resource));
+ }
+ }
+
+ @Test(dependsOnMethods = "testEvacuateAndCancelBeforeBootstrapFinish")
+ public void testEvacuateAndCancelBeforeDropFinish() throws Exception {
+
+ // set DROP ST delay to a large number
+ _stateModelDelay = 10000L;
+
+ // evacuate an instance
+ String instanceToEvacuate = _participants.get(0).getInstanceName();
+ _gSetupTool.getClusterManagementTool()
+ .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, InstanceConstants.InstanceOperation.EVACUATE);
+
+ // messages should be pending on the participant being evacuated
+ TestHelper.verify(
+ () -> ((_dataAccessor.getChildNames(_dataAccessor.keyBuilder().messages(instanceToEvacuate))).isEmpty()), 30000);
+ Assert.assertFalse(_admin.isEvacuateFinished(CLUSTER_NAME, instanceToEvacuate));
+
+ // cancel evacuation
+ _gSetupTool.getClusterManagementTool()
+ .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, null);
+ // check that each partition has >= 3 active replicas, even before the cluster converges
+ Map<String, ExternalView> assignment = getEV();
+ for (String resource : _allDBs) {
+ validateAssignmentInEv(assignment.get(resource));
+ }
+
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+ // EV should contain all participants, check resources one by one
+ assignment = getEV();
+ for (String resource : _allDBs) {
+ Assert.assertTrue(getParticipantsInEv(assignment.get(resource)).containsAll(_participantNames));
+ // check that each partition has >= 3 active replicas
+ validateAssignmentInEv(assignment.get(resource));
+ }
+ }
+
+ @Test(dependsOnMethods = "testEvacuateAndCancelBeforeDropFinish")
+ public void testMarkEvacuationAfterEMM() throws Exception {
+ _stateModelDelay = 1000L;
+ Assert.assertFalse(_gSetupTool.getClusterManagementTool().isInMaintenanceMode(CLUSTER_NAME));
+ _gSetupTool.getClusterManagementTool().manuallyEnableMaintenanceMode(CLUSTER_NAME, true, null,
+ null);
+ addParticipant(PARTICIPANT_PREFIX + "_" + (START_PORT + NUM_NODE));
+
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+ Map<String, ExternalView> assignment = getEV();
+ for (String resource : _allDBs) {
+ Assert.assertFalse(getParticipantsInEv(assignment.get(resource)).contains(_participantNames.get(NUM_NODE)));
+ }
+
+ // set evacuate operation
+ String instanceToEvacuate = _participants.get(0).getInstanceName();
+ _gSetupTool.getClusterManagementTool()
+ .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, InstanceConstants.InstanceOperation.EVACUATE);
+
+ // there should be no evacuation happening while in maintenance mode
+ for (String resource : _allDBs) {
+ Assert.assertTrue(getParticipantsInEv(assignment.get(resource)).contains(instanceToEvacuate));
+ }
+
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+ // exit maintenance mode
+ _gSetupTool.getClusterManagementTool().manuallyEnableMaintenanceMode(CLUSTER_NAME, false, null,
+ null);
+
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+ assignment = getEV();
+ List<String> currentActiveInstances =
+ _participantNames.stream().filter(n -> !n.equals(instanceToEvacuate)).collect(Collectors.toList());
+ for (String resource : _allDBs) {
+ validateAssignmentInEv(assignment.get(resource));
+ Set<String> newPAssignedParticipants = getParticipantsInEv(assignment.get(resource));
+ Assert.assertFalse(newPAssignedParticipants.contains(instanceToEvacuate));
+ Assert.assertTrue(newPAssignedParticipants.containsAll(currentActiveInstances));
+ }
+ Assert.assertTrue(_admin.isReadyForPreparingJoiningCluster(CLUSTER_NAME, instanceToEvacuate));
+ }
+
+ @Test(dependsOnMethods = "testMarkEvacuationAfterEMM")
+ public void testEvacuationWithOfflineInstancesInCluster() throws Exception {
+ _participants.get(2).syncStop();
+ _participants.get(3).syncStop();
+ // wait for convergence, then set EVACUATE on instance 0
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+ String evacuateInstanceName = _participants.get(0).getInstanceName();
+ _gSetupTool.getClusterManagementTool()
+ .setInstanceOperation(CLUSTER_NAME, evacuateInstanceName, InstanceConstants.InstanceOperation.EVACUATE);
+
+ Map<String, ExternalView> assignment;
+ List<String> currentActiveInstances =
+ _participantNames.stream().filter(n -> (!n.equals(evacuateInstanceName) && !n.equals(_participants.get(3).getInstanceName()))).collect(Collectors.toList());
+ TestHelper.verify(() -> verifyIS(evacuateInstanceName), TestHelper.WAIT_DURATION);
+
+ _participants.get(3).syncStart();
+ _participants.get(2).syncStart();
+ }
+
+
+ private void addParticipant(String participantName) {
+ _gSetupTool.addInstanceToCluster(CLUSTER_NAME, participantName);
+
+ // start a dummy participant
+ MockParticipantManager participant = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, participantName);
+ StateMachineEngine stateMachine = participant.getStateMachineEngine();
+ // use a delayed state model
+ StDelayMSStateModelFactory delayFactory = new StDelayMSStateModelFactory();
+ stateMachine.registerStateModelFactory("MasterSlave", delayFactory);
+
+ participant.syncStart();
+ _participants.add(participant);
+ _participantNames.add(participantName);
+ }
+
+ private void createTestDBs(long delayTime) throws InterruptedException {
+ createResourceWithDelayedRebalance(CLUSTER_NAME, "TEST_DB0_CRUSHED",
+ BuiltInStateModelDefinitions.LeaderStandby.name(), PARTITIONS, REPLICA, REPLICA - 1, -1,
+ CrushEdRebalanceStrategy.class.getName());
+ _allDBs.add("TEST_DB0_CRUSHED");
+ createResourceWithDelayedRebalance(CLUSTER_NAME, "TEST_DB1_CRUSHED",
+ BuiltInStateModelDefinitions.LeaderStandby.name(), PARTITIONS, REPLICA, REPLICA - 1, 2000000,
+ CrushEdRebalanceStrategy.class.getName());
+ _allDBs.add("TEST_DB1_CRUSHED");
+ createResourceWithWagedRebalance(CLUSTER_NAME, "TEST_DB2_WAGED", BuiltInStateModelDefinitions.LeaderStandby.name(),
+ PARTITIONS, REPLICA, REPLICA - 1);
+ _allDBs.add("TEST_DB2_WAGED");
+
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+ }
+
+ private Map<String, ExternalView> getEV() {
+ Map<String, ExternalView> externalViews = new HashMap<>();
+ for (String db : _allDBs) {
+ ExternalView ev = _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+ externalViews.put(db, ev);
+ }
+ return externalViews;
+ }
+
+ private boolean verifyIS(String evacuateInstanceName) {
+ for (String db : _allDBs) {
+ IdealState is = _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+ for (String partition : is.getPartitionSet()) {
+ List<String> newPAssignedParticipants = is.getPreferenceList(partition);
+ if (newPAssignedParticipants.contains(evacuateInstanceName)) {
+ System.out.println("partition " + partition + " assignment " + newPAssignedParticipants + " ev " + evacuateInstanceName);
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+ private Set<String> getParticipantsInEv(ExternalView ev) {
+ Set<String> assignedParticipants = new HashSet<>();
+ for (String partition : ev.getPartitionSet()) {
+ ev.getStateMap(partition)
+ .keySet()
+ .stream()
+ .filter(k -> !ev.getStateMap(partition).get(k).equals("OFFLINE"))
+ .forEach(assignedParticipants::add);
+ }
+ return assignedParticipants;
+ }
+
+ // verify that each partition has >= REPLICA (3 in this case) active replicas
+
+ private void validateAssignmentInEv(ExternalView ev) {
+ validateAssignmentInEv(ev, REPLICA);
+ }
+
+ private void validateAssignmentInEv(ExternalView ev, int expectedNumber) {
+ Set<String> partitionSet = ev.getPartitionSet();
+ for (String partition : partitionSet) {
+ AtomicInteger activeReplicaCount = new AtomicInteger();
+ ev.getStateMap(partition)
+ .values()
+ .stream()
+ .filter(v -> v.equals("MASTER") || v.equals("LEADER") || v.equals("SLAVE") || v.equals("FOLLOWER") || v.equals("STANDBY"))
+ .forEach(v -> activeReplicaCount.getAndIncrement());
+ Assert.assertTrue(activeReplicaCount.get() >= expectedNumber);
+ }
+ }
+
+ private void setUpWagedBaseline() {
+ _assignmentMetadataStore = new AssignmentMetadataStore(new ZkBucketDataAccessor(ZK_ADDR), CLUSTER_NAME) {
+ public Map<String, ResourceAssignment> getBaseline() {
+ // Ensure this metadata store always reads from ZK without using the cache.
+ super.reset();
+ return super.getBaseline();
+ }
+
+ public synchronized Map<String, ResourceAssignment> getBestPossibleAssignment() {
+ // Ensure this metadata store always reads from ZK without using the cache.
+ super.reset();
+ return super.getBestPossibleAssignment();
+ }
+ };
+
+ // Set test instance capacity and partition weights
+ ClusterConfig clusterConfig = _dataAccessor.getProperty(_dataAccessor.keyBuilder().clusterConfig());
+ String testCapacityKey = "TestCapacityKey";
+ clusterConfig.setInstanceCapacityKeys(Collections.singletonList(testCapacityKey));
+ clusterConfig.setDefaultInstanceCapacityMap(Collections.singletonMap(testCapacityKey, 100));
+ clusterConfig.setDefaultPartitionWeightMap(Collections.singletonMap(testCapacityKey, 1));
+ _dataAccessor.setProperty(_dataAccessor.keyBuilder().clusterConfig(), clusterConfig);
+ }
+
+ // A state transition model where downward STs are slow when _stateModelDelay > 0 and upward STs are slow when _stateModelDelay < 0
+ public class StDelayMSStateModelFactory extends StateModelFactory<StDelayMSStateModel> {
+
+ @Override
+ public StDelayMSStateModel createNewStateModel(String resourceName, String partitionKey) {
+ StDelayMSStateModel model = new StDelayMSStateModel();
+ return model;
+ }
+ }
+
+ @StateModelInfo(initialState = "OFFLINE", states = {"MASTER", "SLAVE", "ERROR"})
+ public class StDelayMSStateModel extends StateModel {
+
+ public StDelayMSStateModel() {
+ _cancelled = false;
+ }
+
+ private void sleepWhileNotCanceled(long sleepTime) throws InterruptedException {
+ while (sleepTime > 0 && !isCancelled()) {
+ Thread.sleep(5000);
+ sleepTime = sleepTime - 5000;
+ }
+ if (isCancelled()) {
+ _cancelled = false;
+ throw new HelixRollbackException("EX");
+ }
+ }
+
+ @Transition(to = "SLAVE", from = "OFFLINE")
+ public void onBecomeSlaveFromOffline(Message message, NotificationContext context) throws InterruptedException {
+ if (_stateModelDelay < 0) {
+ sleepWhileNotCanceled(Math.abs(_stateModelDelay));
+ }
+ }
+
+ @Transition(to = "MASTER", from = "SLAVE")
+ public void onBecomeMasterFromSlave(Message message, NotificationContext context) throws InterruptedException {
+ if (_stateModelDelay < 0) {
+ sleepWhileNotCanceled(Math.abs(_stateModelDelay));
+ }
+ }
+
+ @Transition(to = "SLAVE", from = "MASTER")
+ public void onBecomeSlaveFromMaster(Message message, NotificationContext context) throws InterruptedException {
+
if (_stateModelDelay > 0) { + sleepWhileNotCanceled(_stateModelDelay); + } + } + + @Transition(to = "OFFLINE", from = "SLAVE") + public void onBecomeOfflineFromSlave(Message message, NotificationContext context) throws InterruptedException { + if (_stateModelDelay > 0) { + sleepWhileNotCanceled(_stateModelDelay); + } + } + + @Transition(to = "DROPPED", from = "OFFLINE") + public void onBecomeDroppedFromOffline(Message message, NotificationContext context) throws InterruptedException { + if (_stateModelDelay > 0) { + sleepWhileNotCanceled(_stateModelDelay); + } + } + } +} diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalance.java index f2aa3025e1..5f806669d8 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalance.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalance.java @@ -86,4 +86,19 @@ public void testDisableDelayRebalanceInCluster() throws Exception { public void testDisableDelayRebalanceInInstance() throws Exception { super.testDisableDelayRebalanceInInstance(); } + + @Test(dependsOnMethods = {"testDisableDelayRebalanceInInstance"}) + public void testOnDemandRebalance() throws Exception { + super.testOnDemandRebalance(); + } + + @Test(dependsOnMethods = {"testOnDemandRebalance"}) + public void testExpiredOnDemandRebalanceTimestamp() throws Exception { + super.testExpiredOnDemandRebalanceTimestamp(); + } + + @Test(dependsOnMethods = {"testExpiredOnDemandRebalanceTimestamp"}) + public void testOnDemandRebalanceAfterDelayRebalanceHappen() throws Exception { + super.testOnDemandRebalanceAfterDelayRebalanceHappen(); + } } diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithDisabledInstance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithDisabledInstance.java index 70b8adf5fa..84d86c4a79 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithDisabledInstance.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestDelayedWagedRebalanceWithDisabledInstance.java @@ -98,4 +98,19 @@ public void testDisableDelayRebalanceInInstance() throws Exception { super.testDisableDelayRebalanceInInstance(); } + + @Test(dependsOnMethods = {"testDisableDelayRebalanceInInstance"}) + public void testOnDemandRebalance() throws Exception { + super.testOnDemandRebalance(); + } + + @Test(dependsOnMethods = {"testOnDemandRebalance"}) + public void testExpiredOnDemandRebalanceTimestamp() throws Exception { + super.testExpiredOnDemandRebalanceTimestamp(); + } + + @Test(dependsOnMethods = {"testExpiredOnDemandRebalanceTimestamp"}) + public void testOnDemandRebalanceAfterDelayRebalanceHappen() throws Exception { + super.testOnDemandRebalanceAfterDelayRebalanceHappen(); + } } diff --git a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java index ec25d42beb..512a7b4db7 100644 --- a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java +++ b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java @@ -23,8 +23,8 @@ import java.util.ArrayList; import java.util.List; import 
java.util.Map; -import java.util.TreeMap; +import org.apache.helix.AccessOption; import org.apache.helix.BaseDataAccessor; import org.apache.helix.HelixAdmin; import org.apache.helix.HelixDataAccessor; @@ -50,8 +50,7 @@ import org.apache.helix.model.ResourceConfig; import org.apache.helix.model.StateModelDefinition; import org.apache.helix.zookeeper.datamodel.ZNRecord; - -import static org.apache.helix.manager.zk.ZKHelixAdmin.assembleInstanceBatchedDisabledInfo; +import org.apache.helix.zookeeper.zkclient.DataUpdater; public class MockHelixAdmin implements HelixAdmin { @@ -306,6 +305,11 @@ public void enableInstance(String clusterName, List instances, boolean e } + @Override + public void setInstanceOperation(String clusterName, String instanceName, + InstanceConstants.InstanceOperation instanceOperation) { + } + @Override public void enableResource(String clusterName, String resourceName, boolean enabled) { @@ -444,6 +448,9 @@ public ClusterTopology getClusterTopology(String clusterName) { } + @Override + public void onDemandRebalance(String clusterName) {} + @Override public void addIdealState(String clusterName, String resourceName, String idealStateFile) throws IOException { @@ -543,4 +550,14 @@ public Map validateInstancesForWagedRebalance(String clusterNam List instancesNames) { return null; } + + @Override + public boolean isEvacuateFinished(String clusterName, String instancesNames) { + return false; + } + + @Override + public boolean isReadyForPreparingJoiningCluster(String clusterName, String instancesNames) { + return false; + } } diff --git a/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java b/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java index 3690ca447a..af794126c5 100644 --- a/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java +++ b/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java @@ -397,6 +397,28 @@ public void testSetInvalidAbnormalStatesResolverConfig() { trySetInvalidAbnormalStatesResolverMap(testConfig, resolverMap); } + @Test + public void testGetLastOnDemandRebalanceTimestamp() { + ClusterConfig testConfig = new ClusterConfig("testConfig"); + Assert.assertEquals(testConfig.getLastOnDemandRebalanceTimestamp(), -1L); + + testConfig.getRecord() + .setLongField(ClusterConfig.ClusterConfigProperty.LAST_ON_DEMAND_REBALANCE_TIMESTAMP.name(), + 10000L); + Assert.assertEquals(testConfig.getLastOnDemandRebalanceTimestamp(), 10000L); + } + + @Test + public void testSetLastOnDemandRebalanceTimestamp() { + ClusterConfig testConfig = new ClusterConfig("testConfig"); + testConfig.setLastOnDemandRebalanceTimestamp(10000L); + + Assert.assertEquals(testConfig.getRecord() + .getLongField(ClusterConfig.ClusterConfigProperty.LAST_ON_DEMAND_REBALANCE_TIMESTAMP.name(), + -1), 10000L); + } + + private void trySetInvalidAbnormalStatesResolverMap(ClusterConfig testConfig, Map resolverMap) { try { diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java index b3ca029c5c..48b467eaac 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java @@ -84,7 +84,9 @@ public enum Command { enableWagedRebalanceForAllResources, purgeOfflineParticipants, getInstance, - getAllInstances + getAllInstances, + setInstanceOperation, // TODO: Name is just a place holder, 
may change in future + onDemandRebalance } @Context diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java index 68744cbbe0..809d736a06 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java @@ -347,6 +347,15 @@ public Response updateCluster(@PathParam("clusterId") String clusterId, helixAdmin.purgeOfflineInstances(clusterId, duration); } break; + case onDemandRebalance: + try { + helixAdmin.onDemandRebalance(clusterId); + } catch (Exception ex) { + LOG.error( + "Cannot start on-demand rebalance for cluster: {}, Exception: {}", clusterId, ex); + return serverError(ex); + } + break; default: return badRequest("Unsupported command {}." + command); } diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java index d4f8f90af6..d397ada0f4 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java @@ -44,6 +44,7 @@ import org.apache.helix.HelixAdmin; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixException; +import org.apache.helix.constants.InstanceConstants; import org.apache.helix.manager.zk.ZKHelixDataAccessor; import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.InstanceConfig; diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java index d77e74355b..8fcd2ab35d 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java @@ -370,6 +370,7 @@ record = toZNRecord(content); @POST public Response updateInstance(@PathParam("clusterId") String clusterId, @PathParam("instanceName") String instanceName, @QueryParam("command") String command, + @QueryParam("instanceOperation") InstanceConstants.InstanceOperation state, @QueryParam("instanceDisabledType") String disabledType, @QueryParam("instanceDisabledReason") String disabledReason, String content) { Command cmd; @@ -414,6 +415,9 @@ public Response updateInstance(@PathParam("clusterId") String clusterId, OBJECT_MAPPER.getTypeFactory() .constructCollectionType(List.class, String.class))); break; + case setInstanceOperation: + admin.setInstanceOperation(clusterId, instanceName, state); + break; case addInstanceTag: if (!validInstance(node, instanceName)) { return badRequest("Instance names are not match!"); diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java index 86c9d5ae3b..e1ed254c8d 100644 --- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java +++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java @@ -1414,6 +1414,26 @@ public void testUpdateCustomizedConfig() throws IOException { System.out.println("End test :" + TestHelper.getTestMethodName()); } + 
@Test(dependsOnMethods = "testUpdateCustomizedConfig")
+ public void testOnDemandRebalance() throws IOException {
+ System.out.println("Start test :" + TestHelper.getTestMethodName());
+ long currentTime = System.currentTimeMillis();
+ String cluster = "TestCluster_1";
+ new JerseyUriRequestBuilder("clusters/{}?command=onDemandRebalance").format(cluster)
+ .post(this, Entity.entity("{}", MediaType.APPLICATION_JSON_TYPE));
+
+ ClusterConfig config = _configAccessor.getClusterConfig(cluster);
+ long lastOnDemandRebalanceTime = config.getLastOnDemandRebalanceTimestamp();
+ Assert.assertFalse(lastOnDemandRebalanceTime == -1L,
+ "The last on-demand rebalance timestamp is not found.");
+ Assert.assertTrue(lastOnDemandRebalanceTime > currentTime, String.format(
+ "The last on-demand rebalance timestamp %d is stale. Expect a timestamp that is larger than %d.",
+ lastOnDemandRebalanceTime, currentTime));
+ // restore the state
+ config.setLastOnDemandRebalanceTimestamp(-1L);
+ System.out.println("End test :" + TestHelper.getTestMethodName());
+ }
+
 @Test
 public void testClusterFreezeMode() throws Exception {
 String cluster = "TestCluster_0";
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java
index 5d2fc70829..35f0712b41 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPerInstanceAccessor.java
@@ -486,6 +486,20 @@ public void updateInstance() throws IOException {
 Assert.assertEquals(new HashSet<>(instanceConfig.getDisabledPartitionsMap()
 .get(CLUSTER_NAME + dbName.substring(0, dbName.length() - 1))),
 new HashSet<>(Arrays.asList(CLUSTER_NAME + dbName + "0", CLUSTER_NAME + dbName + "3")));
+
+ // test set instance operation
+ new JerseyUriRequestBuilder("clusters/{}/instances/{}?command=setInstanceOperation&instanceOperation=EVACUATE")
+ .format(CLUSTER_NAME, INSTANCE_NAME).post(this, entity);
+ instanceConfig = _configAccessor.getInstanceConfig(CLUSTER_NAME, INSTANCE_NAME);
+ Assert.assertEquals(
+ instanceConfig.getInstanceOperation(), InstanceConstants.InstanceOperation.EVACUATE.toString());
+ new JerseyUriRequestBuilder("clusters/{}/instances/{}?command=setInstanceOperation&instanceOperation=INVALIDOP")
+ .expectedReturnStatusCode(Response.Status.NOT_FOUND.getStatusCode()).format(CLUSTER_NAME, INSTANCE_NAME).post(this, entity);
+ new JerseyUriRequestBuilder("clusters/{}/instances/{}?command=setInstanceOperation&instanceOperation=")
+ .format(CLUSTER_NAME, INSTANCE_NAME).post(this, entity);
+ instanceConfig = _configAccessor.getInstanceConfig(CLUSTER_NAME, INSTANCE_NAME);
+ Assert.assertEquals(
+ instanceConfig.getInstanceOperation(), "");
 System.out.println("End test :" + TestHelper.getTestMethodName());
 }
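Editor's note, not part of the patch: the tests above drive the new setInstanceOperation, isEvacuateFinished, and isReadyForPreparingJoiningCluster admin calls through the test harness. The following is a minimal usage sketch of the same flow from operator code; the ZooKeeper address, cluster name, and instance name are placeholder values, and the polling loop is only illustrative.

import org.apache.helix.HelixAdmin;
import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.manager.zk.ZKHelixAdmin;

public class EvacuateInstanceExample {
  public static void main(String[] args) throws InterruptedException {
    // Placeholder connection details; replace with the real ZK address, cluster, and instance.
    HelixAdmin admin = new ZKHelixAdmin("localhost:2181");
    String cluster = "TEST_CLUSTER";
    String instance = "localhost_12918";

    // Mark the instance for evacuation; the rebalancer will move replicas off of it.
    admin.setInstanceOperation(cluster, instance, InstanceConstants.InstanceOperation.EVACUATE);

    // Wait until the instance has no current state and no pending messages.
    while (!admin.isEvacuateFinished(cluster, instance)) {
      Thread.sleep(1000L);
    }

    // Check whether the instance can now be prepared for re-joining the cluster.
    boolean ready = admin.isReadyForPreparingJoiningCluster(cluster, instance);
    System.out.println("Instance " + instance + " ready for re-join preparation: " + ready);
  }
}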