From 7613cabf007a8983895843e2520ddd28b0ff67dc Mon Sep 17 00:00:00 2001
From: Zachary Pinto
Date: Fri, 3 Nov 2023 15:12:14 -0700
Subject: [PATCH] HelixAdmin APIs and pipeline changes to support Helix Node
 Swap (#2661)

Add the ability for up to 2 nodes with the same logicalId to be added to the
cluster at the same time when a SWAP is happening. During all partition
assignment for WAGED and the DelayedAutoRebalancer, we select just one
instance for each logicalId. This achieves n -> n+1 for all replicas on the
SWAP_OUT node and back to n when the SWAP is marked complete, making the swap
cancelable.

Add and update HelixAdmin APIs to support the swap operation:
setInstanceOperation, addInstance, canCompleteSwap, and completeSwapIfPossible.

* Refactor sanity checks for HelixAdmin swap APIs.
* Helix Node Swap pipeline changes and integration tests.
* Fix integration tests to properly restore the stopped MockParticipant so following tests are not affected.
* Add comments and docstrings.
* Fix tests to clean up after themselves.
* Optimize duplicate logicalId filtering so it is only called on allNodes and then used to remove duplicate logicalIds from enabledLiveNodes.
* Add handling for clusterConfig == null in updateSwappingInstances and fix AssignableNode to check for clusterTopologyConfig when attempting to get the logicalId.
* Fix integration tests.
* Fix testGetDomainInformation since we no longer allow an instance to join the cluster with an invalid DOMAIN field.
* Add checks to ensure that the SWAP_IN instance has a FAULT_ZONE and INSTANCE_CAPACITY_MAP matching the SWAP_OUT node.
* Rename canSwapBeCompleted to canCompleteSwap.
* Add sanity checks to allow the SWAP_IN node to join the cluster in a disabled state before the SWAP_OUT node has its instance operation set.
* Fix print in test case.
* Add canCompleteSwap to PerInstanceAccessor and fix formatting.
* Fix flaky node swap after completion by making sure the replica count is computed with logicalIds instead of instanceNames.
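For context, a minimal sketch of how an operator might drive the new APIs end to end, assuming a ZKHelixAdmin connected to the cluster; the ZooKeeper address, cluster name, instance names, and DOMAIN string are illustrative, and the DOMAIN keys must match the cluster's TOPOLOGY so both nodes resolve to the same logicalId:

    import org.apache.helix.HelixAdmin;
    import org.apache.helix.constants.InstanceConstants;
    import org.apache.helix.manager.zk.ZKHelixAdmin;
    import org.apache.helix.model.InstanceConfig;

    public class NodeSwapExample {
      public static void main(String[] args) throws InterruptedException {
        HelixAdmin admin = new ZKHelixAdmin("localhost:2181"); // illustrative ZK address
        String clusterName = "TestCluster";                    // illustrative cluster name

        // 1. Mark the node being replaced as SWAP_OUT.
        admin.setInstanceOperation(clusterName, "oldInstance",
            InstanceConstants.InstanceOperation.SWAP_OUT);

        // 2. Add the replacement node. addInstance requires it to resolve to the same
        //    logicalId, share the SWAP_OUT node's FAULT_ZONE and capacity, and will
        //    set its InstanceOperation to SWAP_IN.
        InstanceConfig swapInConfig = new InstanceConfig("newInstance");
        // Illustrative DOMAIN; same zone and logical end-node value as the SWAP_OUT node.
        swapInConfig.setDomain("zone=zone_0,instance=logicalInstance_0");
        admin.addInstance(clusterName, swapInConfig);

        // 3. Start the SWAP_IN participant (not shown), then poll until the swap can
        //    complete: completeSwapIfPossible clears the SWAP_IN node's
        //    InstanceOperation and disables the SWAP_OUT node once the checks in
        //    canCompleteSwap pass.
        while (!admin.completeSwapIfPossible(clusterName, "newInstance")) {
          Thread.sleep(1000L);
        }
      }
    }

If only a readiness check is wanted without completing the swap, canCompleteSwap(clusterName, instanceName) can be called with either the SWAP_OUT or SWAP_IN instance name instead of the polling loop above.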
--- .../java/org/apache/helix/HelixAdmin.java | 33 +- .../BaseControllerDataProvider.java | 71 ++ .../rebalancer/DelayedAutoRebalancer.java | 75 +- .../rebalancer/util/DelayedRebalanceUtil.java | 88 ++ .../waged/GlobalRebalanceRunner.java | 12 +- .../rebalancer/waged/WagedRebalancer.java | 57 +- .../constraints/ConstraintBasedAlgorithm.java | 4 +- .../waged/model/AssignableNode.java | 18 +- .../rebalancer/waged/model/ClusterModel.java | 8 + .../waged/model/ClusterModelProvider.java | 6 +- .../apache/helix/manager/zk/ZKHelixAdmin.java | 423 ++++++++- .../apache/helix/model/InstanceConfig.java | 28 + .../rebalancer/waged/TestWagedRebalancer.java | 4 +- .../rebalancer/TestInstanceOperation.java | 896 +++++++++++++++++- .../helix/manager/zk/TestZkHelixAdmin.java | 28 +- .../org/apache/helix/mock/MockHelixAdmin.java | 10 + .../server/resources/AbstractResource.java | 2 + .../resources/helix/PerInstanceAccessor.java | 103 +- 18 files changed, 1720 insertions(+), 146 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java index 085a987b1e..f53b886e2b 100644 --- a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java +++ b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java @@ -23,6 +23,8 @@ import java.util.List; import java.util.Map; +import javax.annotation.Nullable; + import org.apache.helix.api.status.ClusterManagementMode; import org.apache.helix.api.status.ClusterManagementModeRequest; import org.apache.helix.api.topology.ClusterTopology; @@ -302,8 +304,15 @@ void enableInstance(String clusterName, String instanceName, boolean enabled, */ void enableInstance(String clusterName, List instances, boolean enabled); - void setInstanceOperation(String clusterName, String instance, - InstanceConstants.InstanceOperation instanceOperation); + /** + * Set the instanceOperation field. + * + * @param clusterName The cluster name + * @param instanceName The instance name + * @param instanceOperation The instance operation + */ + void setInstanceOperation(String clusterName, String instanceName, + @Nullable InstanceConstants.InstanceOperation instanceOperation); /** * Disable or enable a resource @@ -747,6 +756,26 @@ Map validateInstancesForWagedRebalance(String clusterName, */ boolean isEvacuateFinished(String clusterName, String instancesNames); + /** + * Check to see if swapping between two instances can be completed. Either the swapOut or + * swapIn instance can be passed in. + * @param clusterName The cluster name + * @param instanceName The instance that is being swapped out or swapped in + * @return True if the swap is ready to be completed, false otherwise. + */ + boolean canCompleteSwap(String clusterName, String instanceName); + + /** + * Check to see if swapping between two instances is ready to be completed and complete it if + * possible. Either the swapOut or swapIn instance can be passed in. + * + * @param clusterName The cluster name + * @param instanceName The instance that is being swapped out or swapped in + * @return True if the swap is ready to be completed and was completed successfully, false + * otherwise. + */ + boolean completeSwapIfPossible(String clusterName, String instanceName); + /** * Return if instance is ready for preparing joining cluster. The instance should have no current state, * no pending message and tagged with operation that exclude the instance from Helix assignment. 
diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java index 8e8f9fa9b5..9dd5173841 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java +++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java @@ -46,10 +46,12 @@ import org.apache.helix.common.caches.PropertyCache; import org.apache.helix.common.caches.TaskCurrentStateCache; import org.apache.helix.common.controllers.ControlContextProvider; +import org.apache.helix.constants.InstanceConstants; import org.apache.helix.controller.LogUtil; import org.apache.helix.controller.rebalancer.constraint.MonitoredAbnormalResolver; import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.ClusterConstraints; +import org.apache.helix.model.ClusterTopologyConfig; import org.apache.helix.model.CurrentState; import org.apache.helix.model.IdealState; import org.apache.helix.model.InstanceConfig; @@ -116,6 +118,8 @@ public class BaseControllerDataProvider implements ControlContextProvider { private Map> _idealStateRuleMap; private final Map>> _disabledInstanceForPartitionMap = new HashMap<>(); private final Set _disabledInstanceSet = new HashSet<>(); + private final Map _swapOutInstanceNameToSwapInInstanceName = new HashMap<>(); + private final Set _enabledLiveSwapInInstanceNames = new HashSet<>(); private final Map _abnormalStateResolverMap = new HashMap<>(); private final Set _timedOutInstanceDuringMaintenance = new HashSet<>(); private Map _liveInstanceExcludeTimedOutForMaintenance = new HashMap<>(); @@ -437,6 +441,8 @@ protected synchronized Set doRefresh(HelixDataAccesso updateIdealRuleMap(getClusterConfig()); updateDisabledInstances(getInstanceConfigMap().values(), getClusterConfig()); + updateSwappingInstances(getInstanceConfigMap().values(), getEnabledLiveInstances(), + getClusterConfig()); return refreshedTypes; } @@ -471,6 +477,8 @@ public void setClusterConfig(ClusterConfig clusterConfig) { refreshAbnormalStateResolverMap(_clusterConfig); updateIdealRuleMap(_clusterConfig); updateDisabledInstances(getInstanceConfigMap().values(), _clusterConfig); + updateSwappingInstances(getInstanceConfigMap().values(), getEnabledLiveInstances(), + _clusterConfig); } @Override @@ -617,6 +625,24 @@ public Set getDisabledInstances() { return Collections.unmodifiableSet(_disabledInstanceSet); } + /** + * Get all swapping instance pairs. + * + * @return a map of SWAP_OUT instanceNames and their corresponding SWAP_IN instanceNames. + */ + public Map getSwapOutToSwapInInstancePairs() { + return Collections.unmodifiableMap(_swapOutInstanceNameToSwapInInstanceName); + } + + /** + * Get all the enabled and live SWAP_IN instances. + * + * @return a set of SWAP_IN instanceNames that have a corresponding SWAP_OUT instance. 
+ */ + public Set getEnabledLiveSwapInInstanceNames() { + return Collections.unmodifiableSet(_enabledLiveSwapInInstanceNames); + } + public synchronized void setLiveInstances(List liveInstances) { _liveInstanceCache.setPropertyMap(HelixProperty.convertListToMap(liveInstances)); _updateInstanceOfflineTime = true; @@ -750,6 +776,8 @@ public Map getInstanceConfigMap() { public void setInstanceConfigMap(Map instanceConfigMap) { _instanceConfigCache.setPropertyMap(instanceConfigMap); updateDisabledInstances(instanceConfigMap.values(), getClusterConfig()); + updateSwappingInstances(instanceConfigMap.values(), getEnabledLiveInstances(), + getClusterConfig()); } /** @@ -858,6 +886,49 @@ private void updateDisabledInstances(Collection instanceConfigs, } } + private void updateSwappingInstances(Collection instanceConfigs, + Set liveEnabledInstances, ClusterConfig clusterConfig) { + _swapOutInstanceNameToSwapInInstanceName.clear(); + _enabledLiveSwapInInstanceNames.clear(); + + if (clusterConfig == null) { + logger.warn("Skip refreshing swapping instances because clusterConfig is null."); + return; + } + + ClusterTopologyConfig clusterTopologyConfig = + ClusterTopologyConfig.createFromClusterConfig(clusterConfig); + + Map swapOutLogicalIdsByInstanceName = new HashMap<>(); + Map swapInInstancesByLogicalId = new HashMap<>(); + instanceConfigs.forEach(instanceConfig -> { + if (instanceConfig == null) { + return; + } + if (instanceConfig.getInstanceOperation() + .equals(InstanceConstants.InstanceOperation.SWAP_OUT.name())) { + swapOutLogicalIdsByInstanceName.put(instanceConfig.getInstanceName(), + instanceConfig.getLogicalId(clusterTopologyConfig.getEndNodeType())); + } + if (instanceConfig.getInstanceOperation() + .equals(InstanceConstants.InstanceOperation.SWAP_IN.name())) { + swapInInstancesByLogicalId.put( + instanceConfig.getLogicalId(clusterTopologyConfig.getEndNodeType()), + instanceConfig.getInstanceName()); + } + }); + + swapOutLogicalIdsByInstanceName.forEach((swapOutInstanceName, value) -> { + String swapInInstanceName = swapInInstancesByLogicalId.get(value); + if (swapInInstanceName != null) { + _swapOutInstanceNameToSwapInInstanceName.put(swapOutInstanceName, swapInInstanceName); + if (liveEnabledInstances.contains(swapInInstanceName)) { + _enabledLiveSwapInInstanceNames.add(swapInInstanceName); + } + } + }); + } + /* * Check if the instance is timed-out during maintenance mode. An instance is timed-out if it has * been offline for longer than the user defined timeout window. 
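As a side note, the pairing that updateSwappingInstances performs above reduces to joining SWAP_OUT and SWAP_IN instances on their logicalId and separately tracking the SWAP_IN side when it is live and enabled. A standalone sketch of that matching with plain maps in place of the instance-config cache (all names and values are illustrative):

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    // Simplified stand-in for updateSwappingInstances: join SWAP_OUT and SWAP_IN
    // instances on logicalId, and record SWAP_IN instances that are live and enabled.
    public class SwapPairingSketch {
      public static void main(String[] args) {
        Map<String, String> swapOutLogicalIdsByInstanceName = new HashMap<>();
        Map<String, String> swapInInstancesByLogicalId = new HashMap<>();
        swapOutLogicalIdsByInstanceName.put("oldInstance", "logicalInstance_0");
        swapInInstancesByLogicalId.put("logicalInstance_0", "newInstance");
        Set<String> liveEnabledInstances = new HashSet<>();
        liveEnabledInstances.add("newInstance");

        Map<String, String> swapOutToSwapIn = new HashMap<>();
        Set<String> enabledLiveSwapInInstances = new HashSet<>();
        swapOutLogicalIdsByInstanceName.forEach((swapOutInstance, logicalId) -> {
          String swapInInstance = swapInInstancesByLogicalId.get(logicalId);
          if (swapInInstance != null) {
            swapOutToSwapIn.put(swapOutInstance, swapInInstance);
            if (liveEnabledInstances.contains(swapInInstance)) {
              enabledLiveSwapInInstances.add(swapInInstance);
            }
          }
        });

        System.out.println(swapOutToSwapIn);            // {oldInstance=newInstance}
        System.out.println(enabledLiveSwapInInstances); // [newInstance]
      }
    }
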
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java index 9cd0f71cd7..442ddfb029 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java @@ -31,17 +31,17 @@ import java.util.Optional; import java.util.Set; -import java.util.stream.Collectors; import org.apache.helix.HelixDefinedState; import org.apache.helix.api.config.StateTransitionThrottleConfig; +import org.apache.helix.constants.InstanceConstants; import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; import org.apache.helix.controller.rebalancer.constraint.MonitoredAbnormalResolver; import org.apache.helix.controller.rebalancer.util.DelayedRebalanceUtil; import org.apache.helix.controller.rebalancer.util.WagedValidationUtil; import org.apache.helix.controller.stages.CurrentStateOutput; import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.ClusterTopologyConfig; import org.apache.helix.model.IdealState; -import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.Partition; import org.apache.helix.model.Resource; import org.apache.helix.model.ResourceAssignment; @@ -56,7 +56,8 @@ */ public class DelayedAutoRebalancer extends AbstractRebalancer { private static final Logger LOG = LoggerFactory.getLogger(DelayedAutoRebalancer.class); - public static final Set INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT = ImmutableSet.of("EVACUATE", "SWAP_IN"); + public static ImmutableSet INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT = + ImmutableSet.of(InstanceConstants.InstanceOperation.EVACUATE.name()); @Override public IdealState computeNewIdealState(String resourceName, @@ -113,9 +114,16 @@ public IdealState computeNewIdealState(String resourceName, allNodes = clusterData.getAllInstances(); } + Set allNodesDeduped = DelayedRebalanceUtil.filterOutInstancesWithDuplicateLogicalIds( + ClusterTopologyConfig.createFromClusterConfig(clusterConfig), + clusterData.getInstanceConfigMap(), allNodes); + // Remove the non-selected instances with duplicate logicalIds from liveEnabledNodes + // This ensures the same duplicate instance is kept in both allNodesDeduped and liveEnabledNodes + liveEnabledNodes.retainAll(allNodesDeduped); + long delay = DelayedRebalanceUtil.getRebalanceDelay(currentIdealState, clusterConfig); Set activeNodes = DelayedRebalanceUtil - .getActiveNodes(allNodes, currentIdealState, liveEnabledNodes, + .getActiveNodes(allNodesDeduped, currentIdealState, liveEnabledNodes, clusterData.getInstanceOfflineTimeMap(), clusterData.getLiveInstances().keySet(), clusterData.getInstanceConfigMap(), delay, clusterConfig); if (delayRebalanceEnabled) { @@ -127,11 +135,11 @@ public IdealState computeNewIdealState(String resourceName, clusterConfig, _manager); } - if (allNodes.isEmpty() || activeNodes.isEmpty()) { + if (allNodesDeduped.isEmpty() || activeNodes.isEmpty()) { LOG.error(String.format( "No instances or active instances available for resource %s, " + "allInstances: %s, liveInstances: %s, activeInstances: %s", - resourceName, allNodes, liveEnabledNodes, activeNodes)); + resourceName, allNodesDeduped, liveEnabledNodes, activeNodes)); return generateNewIdealState(resourceName, currentIdealState, emptyMapping(currentIdealState)); } @@ -157,41 +165,58 @@ public IdealState computeNewIdealState(String 
resourceName, getRebalanceStrategy(currentIdealState.getRebalanceStrategy(), allPartitions, resourceName, stateCountMap, maxPartition); - // sort node lists to ensure consistent preferred assignments - List allNodeList = new ArrayList<>(allNodes); - // We will not assign partition to instances with evacuation and wap-out tag. + List allNodeList = new ArrayList<>(allNodesDeduped); + // TODO: Currently we have 2 groups of instances and compute preference list twice and merge. // Eventually we want to have exclusive groups of instance for different instance tag. - List liveEnabledAssignableNodeList = filterOutOnOperationInstances(clusterData.getInstanceConfigMap(), - liveEnabledNodes); + List liveEnabledAssignableNodeList = new ArrayList<>( + // We will not assign partitions to instances with EVACUATE InstanceOperation. + DelayedRebalanceUtil.filterOutEvacuatingInstances(clusterData.getInstanceConfigMap(), + liveEnabledNodes)); + // sort node lists to ensure consistent preferred assignments Collections.sort(allNodeList); Collections.sort(liveEnabledAssignableNodeList); - ZNRecord newIdealMapping = _rebalanceStrategy - .computePartitionAssignment(allNodeList, liveEnabledAssignableNodeList, currentMapping, clusterData); + ZNRecord newIdealMapping = + _rebalanceStrategy.computePartitionAssignment(allNodeList, liveEnabledAssignableNodeList, + currentMapping, clusterData); ZNRecord finalMapping = newIdealMapping; if (DelayedRebalanceUtil.isDelayRebalanceEnabled(currentIdealState, clusterConfig) - || liveEnabledAssignableNodeList.size()!= activeNodes.size()) { + || liveEnabledAssignableNodeList.size() != activeNodes.size()) { List activeNodeList = new ArrayList<>(activeNodes); Collections.sort(activeNodeList); int minActiveReplicas = DelayedRebalanceUtil.getMinActiveReplica( ResourceConfig.mergeIdealStateWithResourceConfig(resourceConfig, currentIdealState), currentIdealState, replicaCount); - ZNRecord newActiveMapping = _rebalanceStrategy - .computePartitionAssignment(allNodeList, activeNodeList, currentMapping, clusterData); + ZNRecord newActiveMapping = + _rebalanceStrategy.computePartitionAssignment(allNodeList, activeNodeList, currentMapping, + clusterData); finalMapping = getFinalDelayedMapping(currentIdealState, newIdealMapping, newActiveMapping, liveEnabledNodes, replicaCount, minActiveReplicas); } finalMapping.getListFields().putAll(userDefinedPreferenceList); + // 1. Get all SWAP_OUT instances and corresponding SWAP_IN instance pairs in the cluster. + Map swapOutToSwapInInstancePairs = + clusterData.getSwapOutToSwapInInstancePairs(); + // 2. Get all enabled and live SWAP_IN instances in the cluster. + Set enabledLiveSwapInInstances = clusterData.getEnabledLiveSwapInInstanceNames(); + // 3. For each SWAP_OUT instance in any of the preferenceLists, add the corresponding SWAP_IN instance to the end. + // Skipping this when there are not SWAP_IN instances ready(enabled and live) will reduce computation time when there is not an active + // swap occurring. 
+ if (!clusterData.getEnabledLiveSwapInInstanceNames().isEmpty()) { + DelayedRebalanceUtil.addSwapInInstanceToPreferenceListsIfSwapOutInstanceExists(finalMapping, + swapOutToSwapInInstancePairs, enabledLiveSwapInInstances); + } + LOG.debug("currentMapping: {}", currentMapping); LOG.debug("stateCountMap: {}", stateCountMap); LOG.debug("liveEnabledNodes: {}", liveEnabledNodes); LOG.debug("activeNodes: {}", activeNodes); - LOG.debug("allNodes: {}", allNodes); + LOG.debug("allNodes: {}", allNodesDeduped); LOG.debug("maxPartition: {}", maxPartition); LOG.debug("newIdealMapping: {}", newIdealMapping); LOG.debug("finalMapping: {}", finalMapping); @@ -201,14 +226,6 @@ public IdealState computeNewIdealState(String resourceName, return idealState; } - private static List filterOutOnOperationInstances(Map instanceConfigMap, - Set nodes) { - return nodes.stream() - .filter( - instance -> !INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT.contains(instanceConfigMap.get(instance).getInstanceOperation())) - .collect(Collectors.toList()); - } - private IdealState generateNewIdealState(String resourceName, IdealState currentIdealState, ZNRecord newMapping) { IdealState newIdealState = new IdealState(resourceName); @@ -376,7 +393,7 @@ protected Map computeBestPossibleStateForPartition(Set l // if preference list is not empty, and we do have new intanceToAdd, we // should check if it has capacity to hold the partition. boolean isWaged = WagedValidationUtil.isWagedEnabled(idealState) && cache != null; - if (isWaged && !isPreferenceListEmpty && instanceToAdd.size() > 0) { + if (isWaged && !isPreferenceListEmpty && !instanceToAdd.isEmpty()) { // check instanceToAdd instance appears in combinedPreferenceList for (String instance : instanceToAdd) { if (combinedPreferenceList.contains(instance)) { @@ -409,7 +426,11 @@ protected Map computeBestPossibleStateForPartition(Set l bestPossibleStateMap, preferenceList, combinedPreferenceList)) { for (int i = 0; i < combinedPreferenceList.size() - numReplicas; i++) { String instanceToDrop = combinedPreferenceList.get(combinedPreferenceList.size() - i - 1); - bestPossibleStateMap.put(instanceToDrop, HelixDefinedState.DROPPED.name()); + // We do not want to drop a SWAP_IN node if it is at the end of the preferenceList, + // because partitions are actively being added on this node to prepare for SWAP completion. 
+ if (cache == null || !cache.getEnabledLiveSwapInInstanceNames().contains(instanceToDrop)) { + bestPossibleStateMap.put(instanceToDrop, HelixDefinedState.DROPPED.name()); + } } } diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java index 58bad164a5..c7066d053d 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java @@ -34,12 +34,14 @@ import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica; import org.apache.helix.controller.rebalancer.waged.model.ClusterModelProvider; import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.ClusterTopologyConfig; import org.apache.helix.model.IdealState; import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.Partition; import org.apache.helix.model.ResourceAssignment; import org.apache.helix.model.ResourceConfig; import org.apache.helix.util.InstanceValidationUtil; +import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -139,6 +141,92 @@ public static Set filterOutEvacuatingInstances(Map filterOutInstancesWithDuplicateLogicalIds( + ClusterTopologyConfig clusterTopologyConfig, Map instanceConfigMap, + Set instances) { + Set filteredNodes = new HashSet<>(); + Map filteredInstancesByLogicalId = new HashMap<>(); + + instances.forEach(node -> { + InstanceConfig thisInstanceConfig = instanceConfigMap.get(node); + if (thisInstanceConfig == null) { + return; + } + String thisLogicalId = + thisInstanceConfig.getLogicalId(clusterTopologyConfig.getEndNodeType()); + + if (filteredInstancesByLogicalId.containsKey(thisLogicalId)) { + InstanceConfig filteredDuplicateInstanceConfig = + instanceConfigMap.get(filteredInstancesByLogicalId.get(thisLogicalId)); + if ((filteredDuplicateInstanceConfig.getInstanceOperation() + .equals(InstanceConstants.InstanceOperation.SWAP_IN.name()) + && thisInstanceConfig.getInstanceOperation() + .equals(InstanceConstants.InstanceOperation.SWAP_OUT.name())) + || thisInstanceConfig.getInstanceOperation().isEmpty()) { + // If the already filtered instance is SWAP_IN and this instance is in SWAP_OUT, then replace the filtered + // instance with this instance. If this instance has no InstanceOperation, then replace the filtered instance + // with this instance. This is the case where the SWAP_IN node has been marked as complete or SWAP_IN exists and + // SWAP_OUT does not. There can never be a case where both have no InstanceOperation set. + filteredNodes.remove(filteredInstancesByLogicalId.get(thisLogicalId)); + filteredNodes.add(node); + filteredInstancesByLogicalId.put(thisLogicalId, node); + } + } else { + filteredNodes.add(node); + filteredInstancesByLogicalId.put(thisLogicalId, node); + } + }); + + return filteredNodes; + } + + /** + * Look through the provided mapping and add corresponding SWAP_IN node if a SWAP_OUT node exists + * in the partition's preference list. 
+ * + * @param mapping the mapping to be updated (IdealState ZNRecord) + * @param swapOutToSwapInInstancePairs the map of SWAP_OUT to SWAP_IN instances + */ + public static void addSwapInInstanceToPreferenceListsIfSwapOutInstanceExists(ZNRecord mapping, + Map swapOutToSwapInInstancePairs, Set enabledLiveSwapInInstances) { + Map> preferenceListsByPartition = mapping.getListFields(); + for (String partition : preferenceListsByPartition.keySet()) { + List preferenceList = preferenceListsByPartition.get(partition); + if (preferenceList == null) { + continue; + } + List newInstancesToAdd = new ArrayList<>(); + for (String instanceName : preferenceList) { + if (swapOutToSwapInInstancePairs.containsKey(instanceName) + && enabledLiveSwapInInstances.contains( + swapOutToSwapInInstancePairs.get(instanceName))) { + String swapInInstanceName = swapOutToSwapInInstancePairs.get(instanceName); + if (!preferenceList.contains(swapInInstanceName) && !newInstancesToAdd.contains( + swapInInstanceName)) { + newInstancesToAdd.add(swapInInstanceName); + } + } + } + if (!newInstancesToAdd.isEmpty()) { + preferenceList.addAll(newInstancesToAdd); + } + } + } + /** * Return the time when an offline or disabled instance should be treated as inactive. Return -1 * if it is inactive now or forced to be rebalanced by an on-demand rebalance. diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/GlobalRebalanceRunner.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/GlobalRebalanceRunner.java index 6130e5c522..6c199bc1be 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/GlobalRebalanceRunner.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/GlobalRebalanceRunner.java @@ -30,10 +30,12 @@ import org.apache.helix.HelixRebalanceException; import org.apache.helix.controller.changedetector.ResourceChangeDetector; import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; +import org.apache.helix.controller.rebalancer.util.DelayedRebalanceUtil; import org.apache.helix.controller.rebalancer.util.WagedRebalanceUtil; import org.apache.helix.controller.rebalancer.waged.model.ClusterModel; import org.apache.helix.controller.rebalancer.waged.model.ClusterModelProvider; import org.apache.helix.controller.stages.CurrentStateOutput; +import org.apache.helix.model.ClusterTopologyConfig; import org.apache.helix.model.Resource; import org.apache.helix.model.ResourceAssignment; import org.apache.helix.monitoring.metrics.MetricCollector; @@ -163,8 +165,14 @@ private void doGlobalRebalance(ResourceControllerDataProvider clusterData, Map computeBestPossibleStates( ResourceControllerDataProvider clusterData, Map resourceMap, final CurrentStateOutput currentStateOutput, RebalanceAlgorithm algorithm) throws HelixRebalanceException { - Set activeNodes = DelayedRebalanceUtil - .getActiveNodes(clusterData.getAllInstances(), clusterData.getEnabledLiveInstances(), + + Set allNodesDeduped = DelayedRebalanceUtil.filterOutInstancesWithDuplicateLogicalIds( + ClusterTopologyConfig.createFromClusterConfig(clusterData.getClusterConfig()), + clusterData.getInstanceConfigMap(), clusterData.getAllInstances()); + // Remove the non-selected instances with duplicate logicalIds from liveEnabledNodes + // This ensures the same duplicate instance is kept in both allNodesDeduped and liveEnabledNodes + Set liveEnabledNodesDeduped = clusterData.getEnabledLiveInstances(); + liveEnabledNodesDeduped.retainAll(allNodesDeduped); + + Set 
activeNodes = + DelayedRebalanceUtil.getActiveNodes(allNodesDeduped, liveEnabledNodesDeduped, clusterData.getInstanceOfflineTimeMap(), clusterData.getLiveInstances().keySet(), clusterData.getInstanceConfigMap(), clusterData.getClusterConfig()); @@ -359,6 +368,20 @@ private Map convertResourceAssignment( // Sort the preference list according to state priority. newIdealState.setPreferenceLists( getPreferenceLists(assignments.get(resourceName), statePriorityMap)); + + // 1. Get all SWAP_OUT instances and corresponding SWAP_IN instance pairs in the cluster. + Map swapOutToSwapInInstancePairs = + clusterData.getSwapOutToSwapInInstancePairs(); + // 2. Get all enabled and live SWAP_IN instances in the cluster. + Set enabledLiveSwapInInstances = clusterData.getEnabledLiveSwapInInstanceNames(); + // 3. For each SWAP_OUT instance in any of the preferenceLists, add the corresponding SWAP_IN instance to the end. + // Skipping this when there are not SWAP_IN instances ready(enabled and live) will reduce computation time when there is not an active + // swap occurring. + if (!clusterData.getEnabledLiveSwapInInstanceNames().isEmpty()) { + DelayedRebalanceUtil.addSwapInInstanceToPreferenceListsIfSwapOutInstanceExists( + newIdealState.getRecord(), swapOutToSwapInInstancePairs, enabledLiveSwapInInstances); + } + // Note the state mapping in the new assignment won't directly propagate to the map fields. // The rebalancer will calculate for the final state mapping considering the current states. finalIdealStateMap.put(resourceName, newIdealState); @@ -398,7 +421,14 @@ private Map handleDelayedRebalanceMinActiveReplica( RebalanceAlgorithm algorithm) throws HelixRebalanceException { // the "real" live nodes at the time // TODO: this is a hacky way to filter our on operation instance. We should consider redesign `getEnabledLiveInstances()`. - final Set enabledLiveInstances = filterOutOnOperationInstances(clusterData.getInstanceConfigMap(), clusterData.getEnabledLiveInstances()); + final Set allNodesDeduped = DelayedRebalanceUtil.filterOutInstancesWithDuplicateLogicalIds( + ClusterTopologyConfig.createFromClusterConfig(clusterData.getClusterConfig()), + clusterData.getInstanceConfigMap(), clusterData.getAllInstances()); + final Set enabledLiveInstances = clusterData.getEnabledLiveInstances(); + // Remove the non-selected instances with duplicate logicalIds from liveEnabledNodes + // This ensures the same duplicate instance is kept in both allNodesDeduped and liveEnabledNodes + enabledLiveInstances.retainAll(allNodesDeduped); + if (activeNodes.equals(enabledLiveInstances) || !requireRebalanceOverwrite(clusterData, currentResourceAssignment)) { // no need for additional process, return the current resource assignment return currentResourceAssignment; @@ -427,14 +457,6 @@ private Map handleDelayedRebalanceMinActiveReplica( } } - private static Set filterOutOnOperationInstances(Map instanceConfigMap, - Set nodes) { - return nodes.stream() - .filter( - instance -> !DelayedAutoRebalancer.INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT.contains(instanceConfigMap.get(instance).getInstanceOperation())) - .collect(Collectors.toSet()); - } - /** * Emergency rebalance is scheduled to quickly handle urgent cases like reassigning partitions from inactive nodes * and addressing for partitions failing to meet minActiveReplicas. 
@@ -619,8 +641,15 @@ protected boolean requireRebalanceOverwrite(ResourceControllerDataProvider clust bestPossibleAssignment.values().parallelStream().forEach((resourceAssignment -> { String resourceName = resourceAssignment.getResourceName(); IdealState currentIdealState = clusterData.getIdealState(resourceName); - Set enabledLiveInstances = - filterOutOnOperationInstances(clusterData.getInstanceConfigMap(), clusterData.getEnabledLiveInstances()); + + Set allNodesDeduped = DelayedRebalanceUtil.filterOutInstancesWithDuplicateLogicalIds( + ClusterTopologyConfig.createFromClusterConfig(clusterData.getClusterConfig()), + clusterData.getInstanceConfigMap(), clusterData.getAllInstances()); + Set enabledLiveInstances = clusterData.getEnabledLiveInstances(); + // Remove the non-selected instances with duplicate logicalIds from liveEnabledNodes + // This ensures the same duplicate instance is kept in both allNodesDeduped and liveEnabledNodes + enabledLiveInstances.retainAll(allNodesDeduped); + int numReplica = currentIdealState.getReplicaCount(enabledLiveInstances.size()); int minActiveReplica = DelayedRebalanceUtil.getMinActiveReplica(ResourceConfig .mergeIdealStateWithResourceConfig(clusterData.getResourceConfig(resourceName), diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java index 5dbeb5c38a..77d56302c1 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java @@ -153,7 +153,7 @@ private Optional getNodeWithHighestPoints(AssignableReplica repl int idleScore1 = busyInstances.contains(instanceName1) ? 0 : 1; int idleScore2 = busyInstances.contains(instanceName2) ? 0 : 1; return idleScore1 != idleScore2 ? (idleScore1 - idleScore2) - : -instanceName1.compareTo(instanceName2); + : -nodeEntry1.getKey().compareTo(nodeEntry2.getKey()); } else { return scoreCompareResult; } @@ -193,7 +193,7 @@ private static class AssignableReplicaWithScore implements Comparable { // Immutable Instance Properties private final String _instanceName; + private final String _logicaId; private final String _faultZone; // maximum number of the partitions that can be assigned to the instance. private final int _maxPartition; @@ -72,8 +74,12 @@ public class AssignableNode implements Comparable { * ResourceConfig could * subject to change. If the assumption is no longer true, this function should become private. */ - AssignableNode(ClusterConfig clusterConfig, InstanceConfig instanceConfig, String instanceName) { + AssignableNode(ClusterConfig clusterConfig, ClusterTopologyConfig clusterTopologyConfig, + InstanceConfig instanceConfig, String instanceName) { _instanceName = instanceName; + _logicaId = clusterTopologyConfig != null ? 
instanceConfig.getLogicalId( + clusterTopologyConfig.getEndNodeType()) + : instanceName; Map instanceCapacity = fetchInstanceCapacity(clusterConfig, instanceConfig); _faultZone = computeFaultZone(clusterConfig, instanceConfig); _instanceTags = ImmutableSet.copyOf(instanceConfig.getTags()); @@ -86,6 +92,10 @@ public class AssignableNode implements Comparable { _currentAssignedReplicaMap = new HashMap<>(); } + AssignableNode(ClusterConfig clusterConfig, InstanceConfig instanceConfig, String instanceName) { + this(clusterConfig, null, instanceConfig, instanceName); + } + /** * This function should only be used to assign a set of new partitions that are not allocated on * this node. It's because the any exception could occur at the middle of batch assignment and the @@ -272,6 +282,10 @@ public String getInstanceName() { return _instanceName; } + public String getLogicalId() { + return _logicaId; + } + public Set getInstanceTags() { return _instanceTags; } @@ -368,7 +382,7 @@ public int hashCode() { @Override public int compareTo(AssignableNode o) { - return _instanceName.compareTo(o.getInstanceName()); + return _logicaId.compareTo(o.getLogicalId()); } @Override diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModel.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModel.java index 42eaabaf93..7ef503e013 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModel.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModel.java @@ -37,6 +37,7 @@ public class ClusterModel { // Note that the identical replicas are deduped in the index. private final Map> _assignableReplicaIndex; private final Map _assignableNodeMap; + private final Set _assignableNodeLogicalIds; /** * @param clusterContext The initialized cluster context. 
@@ -60,6 +61,9 @@ public class ClusterModel { _assignableNodeMap = assignableNodes.parallelStream() .collect(Collectors.toMap(AssignableNode::getInstanceName, node -> node)); + _assignableNodeLogicalIds = + assignableNodes.parallelStream().map(AssignableNode::getLogicalId) + .collect(Collectors.toSet()); } public ClusterContext getContext() { @@ -70,6 +74,10 @@ public Map getAssignableNodes() { return _assignableNodeMap; } + public Set getAssignableLogicalIds() { + return _assignableNodeLogicalIds; + } + public Map> getAssignableReplicaMap() { return _assignableReplicaMap; } diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java index dffaec3e04..3f16732107 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java @@ -34,6 +34,7 @@ import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; import org.apache.helix.controller.rebalancer.util.DelayedRebalanceUtil; import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.ClusterTopologyConfig; import org.apache.helix.model.IdealState; import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.Partition; @@ -530,9 +531,12 @@ public static Map>> getStateInstanceMap( */ private static Set getAllAssignableNodes(ClusterConfig clusterConfig, Map instanceConfigMap, Set activeInstances) { + ClusterTopologyConfig clusterTopologyConfig = + ClusterTopologyConfig.createFromClusterConfig(clusterConfig); return activeInstances.parallelStream() .filter(instanceConfigMap::containsKey).map( - instanceName -> new AssignableNode(clusterConfig, instanceConfigMap.get(instanceName), + instanceName -> new AssignableNode(clusterConfig, clusterTopologyConfig, + instanceConfigMap.get(instanceName), instanceName)).collect(Collectors.toSet()); } diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java index ebbcf64d02..34bd564878 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java @@ -39,6 +39,10 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import javax.annotation.Nullable; + +import com.google.common.collect.ImmutableSet; +import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.helix.AccessOption; import org.apache.helix.BaseDataAccessor; import org.apache.helix.ConfigAccessor; @@ -47,7 +51,6 @@ import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixDefinedState; import org.apache.helix.HelixException; -import org.apache.helix.HelixProperty; import org.apache.helix.InstanceType; import org.apache.helix.PropertyKey; import org.apache.helix.PropertyPathBuilder; @@ -67,6 +70,7 @@ import org.apache.helix.model.ClusterConstraints; import org.apache.helix.model.ClusterConstraints.ConstraintType; import org.apache.helix.model.ClusterStatus; +import org.apache.helix.model.ClusterTopologyConfig; import org.apache.helix.model.ConstraintItem; import org.apache.helix.model.ControllerHistory; import org.apache.helix.model.CurrentState; @@ -86,6 +90,7 @@ import org.apache.helix.model.PauseSignal; import 
org.apache.helix.model.ResourceConfig; import org.apache.helix.model.StateModelDefinition; +import org.apache.helix.model.builder.HelixConfigScopeBuilder; import org.apache.helix.msdcommon.exception.InvalidRoutingDataException; import org.apache.helix.tools.DefaultIdealStateCalculator; import org.apache.helix.util.ConfigStringUtil; @@ -114,6 +119,8 @@ public class ZKHelixAdmin implements HelixAdmin { public static final String CONNECTION_TIMEOUT = "helixAdmin.timeOutInSec"; private static final String MAINTENANCE_ZNODE_ID = "maintenance"; private static final int DEFAULT_SUPERCLUSTER_REPLICA = 3; + private static final ImmutableSet ALLOWED_INSTANCE_OPERATIONS_FOR_ADD_INSTANCE = + ImmutableSet.of("", InstanceConstants.InstanceOperation.SWAP_IN.name()); private final RealmAwareZkClient _zkClient; private final ConfigAccessor _configAccessor; @@ -197,6 +204,108 @@ public void addInstance(String clusterName, InstanceConfig instanceConfig) { throw new HelixException("Node " + nodeId + " already exists in cluster " + clusterName); } + if (!ALLOWED_INSTANCE_OPERATIONS_FOR_ADD_INSTANCE.contains( + instanceConfig.getInstanceOperation())) { + throw new HelixException( + "Instance can only be added if InstanceOperation is set to one of" + "the following: " + + ALLOWED_INSTANCE_OPERATIONS_FOR_ADD_INSTANCE + " This instance: " + nodeId + + " has InstanceOperation set to " + instanceConfig.getInstanceOperation()); + } + + // Get the topology key used to determine the logicalId of a node. + ClusterConfig clusterConfig = _configAccessor.getClusterConfig(clusterName); + ClusterTopologyConfig clusterTopologyConfig = + ClusterTopologyConfig.createFromClusterConfig(clusterConfig); + String logicalIdKey = clusterTopologyConfig.getEndNodeType(); + String faultZoneKey = clusterTopologyConfig.getFaultZoneType(); + String toAddInstanceLogicalId = instanceConfig.getLogicalId(logicalIdKey); + + HelixConfigScope instanceConfigScope = + new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.PARTICIPANT, + clusterName).build(); + List existingInstanceIds = getConfigKeys(instanceConfigScope); + List foundInstanceConfigsWithMatchingLogicalId = + existingInstanceIds.parallelStream() + .map(existingInstanceId -> getInstanceConfig(clusterName, existingInstanceId)).filter( + existingInstanceConfig -> existingInstanceConfig.getLogicalId(logicalIdKey) + .equals(toAddInstanceLogicalId)).collect(Collectors.toList()); + + if (foundInstanceConfigsWithMatchingLogicalId.size() >= 2) { + // If the length is 2, we cannot add an instance with the same logicalId as an existing instance + // regardless of InstanceOperation. + throw new HelixException( + "There can only be 2 instances with the same logicalId in a cluster. " + + "Existing instances: " + foundInstanceConfigsWithMatchingLogicalId.get(0) + .getInstanceName() + " and " + foundInstanceConfigsWithMatchingLogicalId.get(1) + .getInstanceName() + " already have the same logicalId: " + toAddInstanceLogicalId + + "; therefore, " + nodeId + " cannot be added to the cluster."); + } else if (foundInstanceConfigsWithMatchingLogicalId.size() == 1) { + // If there is only one instance with the same logicalId, we can infer that the intended behaviour + // is to SWAP_IN. + + // If the InstanceOperation is unset, we will set it to SWAP_IN. 
+ if (!instanceConfig.getInstanceOperation() + .equals(InstanceConstants.InstanceOperation.SWAP_IN.name())) { + instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.SWAP_IN); + } + + // If the existing instance with the same logicalId does not have InstanceOperation set to SWAP_OUT and this instance + // is attempting to join as enabled, we cannot add this instance. + if (instanceConfig.getInstanceEnabled() && !foundInstanceConfigsWithMatchingLogicalId.get(0) + .getInstanceOperation() + .equals(InstanceConstants.InstanceOperation.SWAP_OUT.name())) { + throw new HelixException( + "Instance can only be added if the exising instance sharing the same logicalId has InstanceOperation" + + " set to " + InstanceConstants.InstanceOperation.SWAP_OUT.name() + + " and this instance has InstanceOperation set to " + + InstanceConstants.InstanceOperation.SWAP_IN.name() + ". " + "Existing instance: " + + foundInstanceConfigsWithMatchingLogicalId.get(0).getInstanceName() + + " has InstanceOperation: " + + foundInstanceConfigsWithMatchingLogicalId.get(0).getInstanceOperation() + + " and this instance: " + nodeId + " has InstanceOperation: " + + instanceConfig.getInstanceOperation()); + } + + // If the existing instance with the same logicalId is not in the same FAULT_ZONE as this instance, we cannot + // add this instance. + if (!foundInstanceConfigsWithMatchingLogicalId.get(0).getDomainAsMap() + .containsKey(faultZoneKey) || !instanceConfig.getDomainAsMap().containsKey(faultZoneKey) + || !foundInstanceConfigsWithMatchingLogicalId.get(0).getDomainAsMap().get(faultZoneKey) + .equals(instanceConfig.getDomainAsMap().get(faultZoneKey))) { + throw new HelixException( + "Instance can only be added if the SWAP_OUT instance sharing the same logicalId is in the same FAULT_ZONE" + + " as this instance. " + "Existing instance: " + + foundInstanceConfigsWithMatchingLogicalId.get(0).getInstanceName() + + " has FAULT_ZONE_TYPE: " + foundInstanceConfigsWithMatchingLogicalId.get(0) + .getDomainAsMap().get(faultZoneKey) + " and this instance: " + nodeId + + " has FAULT_ZONE_TYPE: " + instanceConfig.getDomainAsMap().get(faultZoneKey)); + } + + Map foundInstanceCapacityMap = + foundInstanceConfigsWithMatchingLogicalId.get(0).getInstanceCapacityMap().isEmpty() + ? clusterConfig.getDefaultInstanceCapacityMap() + : foundInstanceConfigsWithMatchingLogicalId.get(0).getInstanceCapacityMap(); + Map instanceCapacityMap = instanceConfig.getInstanceCapacityMap().isEmpty() + ? clusterConfig.getDefaultInstanceCapacityMap() : instanceConfig.getInstanceCapacityMap(); + // If the instance does not have the same capacity, we cannot add this instance. + if (!new EqualsBuilder().append(foundInstanceCapacityMap, instanceCapacityMap).isEquals()) { + throw new HelixException( + "Instance can only be added if the SWAP_OUT instance sharing the same logicalId has the same capacity" + + " as this instance. " + "Existing instance: " + + foundInstanceConfigsWithMatchingLogicalId.get(0).getInstanceName() + + " has capacity: " + foundInstanceCapacityMap + " and this instance: " + nodeId + + " has capacity: " + instanceCapacityMap); + } + } else if (!instanceConfig.getInstanceOperation().isEmpty()) { + // If there are no instances with the same logicalId, we can only add this instance if InstanceOperation + // is unset because it is a new instance. 
+ throw new HelixException( + "There is no instance with logicalId: " + toAddInstanceLogicalId + " in cluster: " + + clusterName + "; therefore, " + nodeId + + " cannot join cluster with InstanceOperation set to " + + instanceConfig.getInstanceOperation() + "."); + } + ZKUtil.createChildren(_zkClient, instanceConfigsPath, instanceConfig.getRecord()); _zkClient.createPersistent(PropertyPathBuilder.instanceMessage(clusterName, nodeId), true); @@ -358,6 +467,21 @@ public void enableInstance(final String clusterName, final String instanceName, logger.info("{} instance {} in cluster {}.", enabled ? "Enable" : "Disable", instanceName, clusterName); BaseDataAccessor baseAccessor = new ZkBaseDataAccessor<>(_zkClient); + + // If enabled is set to true and InstanceOperation is SWAP_IN, we should fail if there is not a + // matching SWAP_OUT instance. + InstanceConfig instanceConfig = getInstanceConfig(clusterName, instanceName); + if (enabled && instanceConfig.getInstanceOperation() + .equals(InstanceConstants.InstanceOperation.SWAP_IN.name())) { + InstanceConfig matchingSwapInstance = findMatchingSwapInstance(clusterName, instanceConfig); + if (matchingSwapInstance == null || !matchingSwapInstance.getInstanceOperation() + .equals(InstanceConstants.InstanceOperation.SWAP_OUT.name())) { + throw new HelixException("Instance cannot be enabled if InstanceOperation is set to " + + instanceConfig.getInstanceOperation() + " when there is no matching " + + InstanceConstants.InstanceOperation.SWAP_OUT.name() + " instance."); + } + } + // Eventually we will have all instances' enable/disable information in clusterConfig. Now we // update both instanceConfig and clusterConfig in transition period. enableSingleInstance(clusterName, instanceName, enabled, baseAccessor, disabledType, reason); @@ -379,11 +503,53 @@ public void enableInstance(String clusterName, List instances, boolean e @Override // TODO: Name may change in future public void setInstanceOperation(String clusterName, String instanceName, - InstanceConstants.InstanceOperation instanceOperation) { + @Nullable InstanceConstants.InstanceOperation instanceOperation) { BaseDataAccessor baseAccessor = new ZkBaseDataAccessor<>(_zkClient); String path = PropertyPathBuilder.instanceConfig(clusterName, instanceName); + // InstanceOperation can only be set to SWAP_IN when the instance is added to the cluster + // or if it is disabled. + if (instanceOperation != null && instanceOperation.equals( + InstanceConstants.InstanceOperation.SWAP_IN) && getInstanceConfig(clusterName, + instanceName).getInstanceEnabled()) { + throw new HelixException("InstanceOperation should only be set to " + + InstanceConstants.InstanceOperation.SWAP_IN.name() + + " when an instance joins the cluster for the first time(when " + + "creating the InstanceConfig) or is disabled."); + } + + // InstanceOperation cannot be set to null if there is an instance with the same logicalId in + // the cluster which does not have InstanceOperation set to SWAP_IN or SWAP_OUT. 
+ if (instanceOperation == null) { + InstanceConfig instanceConfig = getInstanceConfig(clusterName, instanceName); + String logicalIdKey = ClusterTopologyConfig.createFromClusterConfig( + _configAccessor.getClusterConfig(clusterName)).getEndNodeType(); + + HelixConfigScope instanceConfigScope = + new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.PARTICIPANT, + clusterName).build(); + List existingInstanceIds = getConfigKeys(instanceConfigScope); + List matchingInstancesWithNonSwappingInstanceOperation = + existingInstanceIds.parallelStream() + .map(existingInstanceId -> getInstanceConfig(clusterName, existingInstanceId)).filter( + existingInstanceConfig -> + !existingInstanceConfig.getInstanceName().equals(instanceName) + && existingInstanceConfig.getLogicalId(logicalIdKey) + .equals(instanceConfig.getLogicalId(logicalIdKey)) + && !existingInstanceConfig.getInstanceOperation() + .equals(InstanceConstants.InstanceOperation.SWAP_IN.name()) + && !existingInstanceConfig.getInstanceOperation() + .equals(InstanceConstants.InstanceOperation.SWAP_OUT.name())) + .collect(Collectors.toList()); + + if (!matchingInstancesWithNonSwappingInstanceOperation.isEmpty()) { + throw new HelixException("InstanceOperation cannot be set to null for " + instanceName + + " if there are other instances with the same logicalId in the cluster that do not have" + + " InstanceOperation set to SWAP_IN or SWAP_OUT."); + } + } + if (!baseAccessor.exists(path, 0)) { throw new HelixException( "Cluster " + clusterName + ", instance: " + instanceName + ", instance config does not exist"); @@ -410,16 +576,263 @@ public ZNRecord update(ZNRecord currentData) { @Override public boolean isEvacuateFinished(String clusterName, String instanceName) { - if (!instanceHasCurrentSateOrMessage(clusterName, instanceName)) { + if (!instanceHasCurrentStateOrMessage(clusterName, instanceName)) { InstanceConfig config = getInstanceConfig(clusterName, instanceName); return config != null && config.getInstanceOperation().equals(InstanceConstants.InstanceOperation.EVACUATE.name()); } return false; } + /** + * Find the instance that the passed instance is swapping with. If the passed instance has + * SWAP_OUT instanceOperation, then find the corresponding instance that has SWAP_IN + * instanceOperation. If the passed instance has SWAP_IN instanceOperation, then find the + * corresponding instance that has SWAP_OUT instanceOperation. + * + * @param clusterName The cluster name + * @param instanceConfig The instance to find the swap instance for + * @return The swap instance if found, null otherwise. + */ + @Nullable + private InstanceConfig findMatchingSwapInstance(String clusterName, + InstanceConfig instanceConfig) { + String logicalIdKey = + ClusterTopologyConfig.createFromClusterConfig(_configAccessor.getClusterConfig(clusterName)) + .getEndNodeType(); + + for (String potentialSwappingInstance : getConfigKeys( + new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.PARTICIPANT, + clusterName).build())) { + InstanceConfig potentialSwappingInstanceConfig = + getInstanceConfig(clusterName, potentialSwappingInstance); + + // Return if there is a matching Instance with the same logicalId and opposite InstanceOperation swap operation. 
+ if (potentialSwappingInstanceConfig.getLogicalId(logicalIdKey) + .equals(instanceConfig.getLogicalId(logicalIdKey)) && ( + instanceConfig.getInstanceOperation() + .equals(InstanceConstants.InstanceOperation.SWAP_IN.name()) + && potentialSwappingInstanceConfig.getInstanceOperation() + .equals(InstanceConstants.InstanceOperation.SWAP_OUT.name())) || ( + instanceConfig.getInstanceOperation() + .equals(InstanceConstants.InstanceOperation.SWAP_OUT.name()) + && potentialSwappingInstanceConfig.getInstanceOperation() + .equals(InstanceConstants.InstanceOperation.SWAP_IN.name()))) { + return potentialSwappingInstanceConfig; + } + } + + return null; + } + + /** + * Check to see if swapping between two instances is ready to be completed. Checks: 1. Both + * instances must be alive. 2. Both instances must only have one session and not be carrying over + * from a previous session. 3. Both instances must have no pending messages. 4. Both instances + * cannot have partitions in the ERROR state 5. SwapIn instance must have correct state for all + * partitions that are currently assigned to the SwapOut instance. + * TODO: We may want to make this a public API in the future. + * + * @param clusterName The cluster name + * @param swapOutInstanceName The instance that is being swapped out + * @param swapInInstanceName The instance that is being swapped in + * @return True if the swap is ready to be completed, false otherwise. + */ + private boolean canCompleteSwap(String clusterName, String swapOutInstanceName, + String swapInInstanceName) { + BaseDataAccessor baseAccessor = new ZkBaseDataAccessor(_zkClient); + HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor); + PropertyKey.Builder keyBuilder = accessor.keyBuilder(); + + // 1. Check that both instances are alive. + LiveInstance swapOutLiveInstance = + accessor.getProperty(keyBuilder.liveInstance(swapOutInstanceName)); + LiveInstance swapInLiveInstance = + accessor.getProperty(keyBuilder.liveInstance(swapInInstanceName)); + if (swapOutLiveInstance == null || swapInLiveInstance == null) { + logger.warn( + "SwapOutInstance {} is {} and SwapInInstance {} is {} for cluster {}. Swap will not complete unless both instances are ONLINE.", + swapOutInstanceName, swapOutLiveInstance != null ? "ONLINE" : "OFFLINE", + swapInInstanceName, swapInLiveInstance != null ? "ONLINE" : "OFFLINE", clusterName); + return false; + } + + // 2. Check that both instances only have one session and are not carrying any over. + // count number of sessions under CurrentState folder. If it is carrying over from prv session, + // then there are > 1 session ZNodes. + List swapOutSessions = baseAccessor.getChildNames( + PropertyPathBuilder.instanceCurrentState(clusterName, swapOutInstanceName), 0); + List swapInSessions = baseAccessor.getChildNames( + PropertyPathBuilder.instanceCurrentState(clusterName, swapInInstanceName), 0); + if (swapOutSessions.size() > 1 || swapInSessions.size() > 1) { + logger.warn( + "SwapOutInstance {} is carrying over from prev session and SwapInInstance {} is carrying over from prev session for cluster {}." + + " Swap will not complete unless both instances have only one session.", + swapOutInstanceName, swapInInstanceName, clusterName); + return false; + } + + // 3. Check that the swapOutInstance has no pending messages. + List swapOutMessages = + accessor.getChildValues(keyBuilder.messages(swapOutInstanceName), true); + int swapOutPendingMessageCount = swapOutMessages != null ? 
swapOutMessages.size() : 0; + List swapInMessages = + accessor.getChildValues(keyBuilder.messages(swapInInstanceName), true); + int swapInPendingMessageCount = swapInMessages != null ? swapInMessages.size() : 0; + if (swapOutPendingMessageCount > 0 || swapInPendingMessageCount > 0) { + logger.warn( + "SwapOutInstance {} has {} pending messages and SwapInInstance {} has {} pending messages for cluster {}." + + " Swap will not complete unless both instances have no pending messages.", + swapOutInstanceName, swapOutPendingMessageCount, swapInInstanceName, + swapInPendingMessageCount, clusterName); + return false; + } + + // 4. Collect a list of all partitions that have a current state on swapOutInstance + String swapOutActiveSession = swapOutLiveInstance.getEphemeralOwner(); + String swapInActiveSession = swapInLiveInstance.getEphemeralOwner(); + + // Iterate over all resources with current states on the swapOutInstance + List swapOutResources = baseAccessor.getChildNames( + PropertyPathBuilder.instanceCurrentState(clusterName, swapOutInstanceName, + swapOutActiveSession), 0); + for (String swapOutResource : swapOutResources) { + // Get the topState and secondTopStates for the stateModelDef used by the resource. + IdealState idealState = accessor.getProperty(keyBuilder.idealStates(swapOutResource)); + StateModelDefinition stateModelDefinition = + accessor.getProperty(keyBuilder.stateModelDef(idealState.getStateModelDefRef())); + String topState = stateModelDefinition.getTopState(); + Set secondTopStates = stateModelDefinition.getSecondTopStates(); + + CurrentState swapOutResourceCurrentState = accessor.getProperty( + keyBuilder.currentState(swapOutInstanceName, swapOutActiveSession, swapOutResource)); + CurrentState swapInResourceCurrentState = accessor.getProperty( + keyBuilder.currentState(swapInInstanceName, swapInActiveSession, swapOutResource)); + + // Check to make sure swapInInstance has a current state for the resource + if (swapInResourceCurrentState == null) { + logger.warn( + "SwapOutInstance {} has current state for resource {} but SwapInInstance {} does not for cluster {}." + + " Swap will not complete unless both instances have current states for all resources.", + swapOutInstanceName, swapOutResource, swapInInstanceName, clusterName); + return false; + } + + // Iterate over all partitions in the swapOutInstance's current state for the resource + // and ensure that the swapInInstance has the correct state for the partition. + for (String partitionName : swapOutResourceCurrentState.getPartitionStateMap().keySet()) { + String swapOutPartitionState = swapOutResourceCurrentState.getState(partitionName); + String swapInPartitionState = swapInResourceCurrentState.getState(partitionName); + + // Neither instance should have any partitions in ERROR state. + if (swapOutPartitionState.equals(HelixDefinedState.ERROR.name()) + || swapInPartitionState.equals(HelixDefinedState.ERROR.name())) { + logger.warn( + "SwapOutInstance {} has partition {} in state {} and SwapInInstance {} has partition {} in state {} for cluster {}." + + " Swap will not complete unless both instances have no partitions in ERROR state.", + swapOutInstanceName, partitionName, swapOutPartitionState, swapInInstanceName, + partitionName, swapInPartitionState, clusterName); + return false; + } + + // When the state of a partition on a swapOut instance is in the topState, the state + // of the partition on the swapInInstance should also be in the topState or a secondTopState. 
+ // It should be in a topState only if the state model allows multiple replicas in the topState. + // In all other cases it should be a secondTopState. + if (swapOutPartitionState.equals(topState) && !(swapInPartitionState.equals(topState) + || secondTopStates.contains(swapInPartitionState))) { + logger.warn( + "SwapOutInstance {} has partition {} in topState {} but SwapInInstance {} has partition {} in state {} for cluster {}." + + " Swap will not complete unless SwapInInstance has partition in topState or secondState.", + swapOutInstanceName, partitionName, swapOutPartitionState, swapInInstanceName, + partitionName, swapInPartitionState, clusterName); + return false; + } + + // When the state of a partition on a swapOut instance is any other state, except ERROR, DROPPED or TopState, + // the state of the partition on the swapInInstance should be the same. + if (!swapOutPartitionState.equals(topState) && !swapOutPartitionState.equals( + HelixDefinedState.DROPPED.name()) + && !swapOutPartitionState.equals(swapInPartitionState)) { + logger.warn( + "SwapOutInstance {} has partition {} in state {} but SwapInInstance {} has partition {} in state {} for cluster {}." + + " Swap will not complete unless both instances have matching states.", + swapOutInstanceName, partitionName, swapOutPartitionState, swapInInstanceName, + partitionName, swapInPartitionState, clusterName); + return false; + } + } + } + + return true; + } + + @Override + public boolean canCompleteSwap(String clusterName, String instanceName) { + InstanceConfig instanceConfig = getInstanceConfig(clusterName, instanceName); + if (instanceConfig == null) { + logger.warn( + "Instance {} in cluster {} does not exist. Cannot determine if the swap is complete.", + instanceName, clusterName); + return false; + } + + InstanceConfig swapOutInstanceConfig = instanceConfig.getInstanceOperation() + .equals(InstanceConstants.InstanceOperation.SWAP_OUT.name()) ? instanceConfig + : findMatchingSwapInstance(clusterName, instanceConfig); + InstanceConfig swapInInstanceConfig = instanceConfig.getInstanceOperation() + .equals(InstanceConstants.InstanceOperation.SWAP_IN.name()) ? instanceConfig + : findMatchingSwapInstance(clusterName, instanceConfig); + if (swapOutInstanceConfig == null || swapInInstanceConfig == null) { + logger.warn( + "Instance {} in cluster {} is not swapping with any other instance. Cannot determine if the swap is complete.", + instanceName, clusterName); + return false; + } + + // Check if the swap is ready to be completed. + return canCompleteSwap(clusterName, swapOutInstanceConfig.getInstanceName(), + swapInInstanceConfig.getInstanceName()); + } + + @Override + public boolean completeSwapIfPossible(String clusterName, String instanceName) { + InstanceConfig instanceConfig = getInstanceConfig(clusterName, instanceName); + if (instanceConfig == null) { + logger.warn( + "Instance {} in cluster {} does not exist. Cannot determine if the swap is complete.", + instanceName, clusterName); + return false; + } + + InstanceConfig swapOutInstanceConfig = instanceConfig.getInstanceOperation() + .equals(InstanceConstants.InstanceOperation.SWAP_OUT.name()) ? instanceConfig + : findMatchingSwapInstance(clusterName, instanceConfig); + InstanceConfig swapInInstanceConfig = instanceConfig.getInstanceOperation() + .equals(InstanceConstants.InstanceOperation.SWAP_IN.name()) ? 
instanceConfig + : findMatchingSwapInstance(clusterName, instanceConfig); + if (swapOutInstanceConfig == null || swapInInstanceConfig == null) { + logger.warn( + "Instance {} in cluster {} is not swapping with any other instance. Cannot determine if the swap is complete.", + instanceName, clusterName); + return false; + } + + // Check if the swap is ready to be completed. If not, return false. + if (!canCompleteSwap(clusterName, swapOutInstanceConfig.getInstanceName(), + swapInInstanceConfig.getInstanceName())) { + return false; + } + + // Complete the swap by removing the InstanceOperation for the SWAP_IN node and disabling the SWAP_OUT node. + setInstanceOperation(clusterName, swapInInstanceConfig.getInstanceName(), null); + enableInstance(clusterName, swapOutInstanceConfig.getInstanceName(), false); + + return true; + } + @Override public boolean isReadyForPreparingJoiningCluster(String clusterName, String instanceName) { - if (!instanceHasCurrentSateOrMessage(clusterName, instanceName)) { + if (!instanceHasCurrentStateOrMessage(clusterName, instanceName)) { InstanceConfig config = getInstanceConfig(clusterName, instanceName); return config != null && DelayedAutoRebalancer.INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT.contains( config.getInstanceOperation()); @@ -434,7 +847,7 @@ public boolean isReadyForPreparingJoiningCluster(String clusterName, String inst * @param instanceName * @return */ - private boolean instanceHasCurrentSateOrMessage(String clusterName, String instanceName) { + private boolean instanceHasCurrentStateOrMessage(String clusterName, String instanceName) { HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_zkClient)); PropertyKey.Builder keyBuilder = accessor.keyBuilder(); diff --git a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java index b68250dd37..98da7340f2 100644 --- a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java +++ b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java @@ -702,6 +702,18 @@ public String getInstanceName() { return _record.getId(); } + /** + * Get the logicalId of this instance. If it does not exist or is not set, + * return the instance name. + * @param logicalIdKey the key for the DOMAIN field containing the logicalId + * @return the logicalId of this instance + */ + public String getLogicalId(String logicalIdKey) { + // TODO: Consider caching DomainMap, parsing the DOMAIN string every time + // getLogicalId is called can become expensive if called too frequently. 
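+    // For illustration (hypothetical DOMAIN value): with DOMAIN set to
+    // "zone=zone_0, host=host_0, logicalId=0" and logicalIdKey "logicalId",
+    // this returns "0"; if the key is absent from DOMAIN, the instance name
+    // is returned instead.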
+ return getDomainAsMap().getOrDefault(logicalIdKey, getInstanceName()); + } + @Override public boolean isValid() { // HELIX-65: remove check for hostname/port existence @@ -772,6 +784,7 @@ public static class Builder { private int _weight = WEIGHT_NOT_SET; private List _tags = new ArrayList<>(); private boolean _instanceEnabled = HELIX_ENABLED_DEFAULT_VALUE; + private InstanceConstants.InstanceOperation _instanceOperation; private Map _instanceInfoMap; private Map _instanceCapacityMap; @@ -819,6 +832,10 @@ public InstanceConfig build(String instanceId) { instanceConfig.setInstanceEnabled(_instanceEnabled); } + if (_instanceOperation != null) { + instanceConfig.setInstanceOperation(_instanceOperation); + } + if (_instanceInfoMap != null) { instanceConfig.setInstanceInfoMap(_instanceInfoMap); } @@ -890,6 +907,17 @@ public Builder setInstanceEnabled(boolean instanceEnabled) { return this; } + /** + * Set the instance operation for this instance + * + * @param instanceOperation the instance operation. + * @return InstanceConfig.Builder + */ + public Builder setInstanceOperation(InstanceConstants.InstanceOperation instanceOperation) { + _instanceOperation = instanceOperation; + return this; + } + /** * Set the INSTANCE_INFO_MAP for this instance * @param instanceInfoMap the instance info map diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java index 608a4d3afe..000978ef1a 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java @@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import java.io.IOException; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -738,7 +739,8 @@ public void testRebalanceOverwrite() throws HelixRebalanceException, IOException instances.add(offlineInstance); when(clusterData.getAllInstances()).thenReturn(instances); when(clusterData.getEnabledInstances()).thenReturn(instances); - when(clusterData.getEnabledLiveInstances()).thenReturn(ImmutableSet.of(instance0, instance1, instance2)); + when(clusterData.getEnabledLiveInstances()).thenReturn( + new HashSet<>(Arrays.asList(instance0, instance1, instance2))); Map instanceOfflineTimeMap = new HashMap<>(); instanceOfflineTimeMap.put(offlineInstance, System.currentTimeMillis() + Integer.MAX_VALUE); when(clusterData.getInstanceOfflineTimeMap()).thenReturn(instanceOfflineTimeMap); diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java index 10cd662cb2..9ccc14fdfa 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java @@ -10,12 +10,14 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; + +import com.google.common.collect.ImmutableSet; import org.apache.helix.ConfigAccessor; import org.apache.helix.HelixAdmin; import org.apache.helix.HelixDataAccessor; +import org.apache.helix.HelixException; import org.apache.helix.HelixRollbackException; import 
org.apache.helix.NotificationContext; -import org.apache.helix.PropertyPathBuilder; import org.apache.helix.TestHelper; import org.apache.helix.common.ZkTestBase; import org.apache.helix.constants.InstanceConstants; @@ -30,8 +32,10 @@ import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; +import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.Message; import org.apache.helix.model.ResourceAssignment; +import org.apache.helix.model.StateModelDefinition; import org.apache.helix.participant.StateMachineEngine; import org.apache.helix.participant.statemachine.StateModel; import org.apache.helix.participant.statemachine.StateModelFactory; @@ -51,15 +55,28 @@ public class TestInstanceOperation extends ZkTestBase { protected final String CLASS_NAME = getShortClassName(); protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME; + private final String TEST_CAPACITY_KEY = "TestCapacityKey"; + private final int TEST_CAPACITY_VALUE = 100; + protected static final String ZONE = "zone"; + protected static final String HOST = "host"; + protected static final String LOGICAL_ID = "logicalId"; + protected static final String TOPOLOGY = String.format("%s/%s/%s", ZONE, HOST, LOGICAL_ID); + + protected static final ImmutableSet SECONDARY_STATE_SET = + ImmutableSet.of("SLAVE", "STANDBY"); + protected static final ImmutableSet ACCEPTABLE_STATE_SET = + ImmutableSet.of("MASTER", "LEADER", "SLAVE", "STANDBY"); private int REPLICA = 3; protected ClusterControllerManager _controller; List _participants = new ArrayList<>(); + private List _originalParticipantNames = new ArrayList<>(); List _participantNames = new ArrayList<>(); private Set _allDBs = new HashSet<>(); private ZkHelixClusterVerifier _clusterVerifier; private ConfigAccessor _configAccessor; private long _stateModelDelay = 3L; + private final long DEFAULT_RESOURCE_DELAY_TIME = 1800000L; private HelixAdmin _admin; protected AssignmentMetadataStore _assignmentMetadataStore; HelixDataAccessor _dataAccessor; @@ -72,6 +89,7 @@ public void beforeClass() throws Exception { for (int i = 0; i < NUM_NODE; i++) { String participantName = PARTICIPANT_PREFIX + "_" + (START_PORT + i); + _originalParticipantNames.add(participantName); addParticipant(participantName); } @@ -88,24 +106,88 @@ public void beforeClass() throws Exception { _configAccessor = new ConfigAccessor(_gZkClient); _dataAccessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor); + setupClusterConfig(); + + createTestDBs(DEFAULT_RESOURCE_DELAY_TIME); + + setUpWagedBaseline(); + + _admin = new ZKHelixAdmin(_gZkClient); + } + + private void setupClusterConfig() { + _stateModelDelay = 3L; ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME); clusterConfig.stateTransitionCancelEnabled(true); clusterConfig.setDelayRebalaceEnabled(true); clusterConfig.setRebalanceDelayTime(1800000L); _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig); - createTestDBs(1800000L); + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + } - setUpWagedBaseline(); + private void enabledTopologyAwareRebalance() { + ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME); + clusterConfig.setTopology(TOPOLOGY); + clusterConfig.setFaultZoneType(ZONE); + clusterConfig.setTopologyAwareEnabled(true); + _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig); - _admin = new ZKHelixAdmin(_gZkClient); + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + } 
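For orientation, a minimal sketch of how an operator might drive a node swap with the HelixAdmin APIs added above; the ZK address, cluster name, instance names, and DOMAIN values are placeholders, and it assumes a cluster topology whose end node is a logicalId key, as configured in these tests:

import org.apache.helix.HelixAdmin;
import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.model.InstanceConfig;

public class NodeSwapExample {
  public static void main(String[] args) {
    // Placeholder ZK address and cluster/instance names; adjust for the target cluster.
    HelixAdmin admin = new ZKHelixAdmin("localhost:2181");
    String clusterName = "MyCluster";

    // 1. Mark the node being replaced as SWAP_OUT. Its assignment does not change yet.
    admin.setInstanceOperation(clusterName, "oldInstance_12918",
        InstanceConstants.InstanceOperation.SWAP_OUT);

    // 2. Register the replacement with the same logicalId and fault zone, flagged SWAP_IN,
    //    then start its participant process so it can bootstrap replicas.
    InstanceConfig swapInConfig = new InstanceConfig.Builder()
        .setDomain("zone=zone_0, host=newHost, logicalId=0")
        .setInstanceOperation(InstanceConstants.InstanceOperation.SWAP_IN)
        .build("newInstance_12918");
    admin.addInstance(clusterName, swapInConfig);

    // 3. Once the SWAP_IN node has caught up (live, no pending messages, matching states),
    //    complete the swap: the SWAP_IN operation is cleared and the SWAP_OUT node is disabled.
    if (admin.canCompleteSwap(clusterName, "oldInstance_12918")) {
      admin.completeSwapIfPossible(clusterName, "oldInstance_12918");
    }

    admin.close();
  }
}

If the swap needs to be abandoned instead, disabling the SWAP_IN instance and clearing the SWAP_OUT operation via setInstanceOperation(clusterName, instanceName, null) returns the cluster to its original assignment, as exercised by testNodeSwapCancelSwapWhenReadyToComplete below.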
+ + private void disableTopologyAwareRebalance() { + ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME); + clusterConfig.setTopologyAwareEnabled(false); + clusterConfig.setTopology(null); + clusterConfig.setFaultZoneType(null); + _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig); + + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + } + + private void resetInstances() { + // Disable and drop any participants that are not in the original participant list. + Set droppedParticipants = new HashSet<>(); + for (int i = 0; i < _participants.size(); i++) { + String participantName = _participantNames.get(i); + if (!_originalParticipantNames.contains(participantName)) { + _participants.get(i).syncStop(); + _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, participantName, false); + _gSetupTool.getClusterManagementTool() + .dropInstance(CLUSTER_NAME, _gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, participantName)); + droppedParticipants.add(participantName); + } + } + + // Remove the dropped instance from _participants and _participantNames + _participantNames.removeIf(droppedParticipants::contains); + _participants.removeIf(p -> droppedParticipants.contains(p.getInstanceName())); + + for (int i = 0; i < _participants.size(); i++) { + // If instance is not connected to ZK, replace it + if (!_participants.get(i).isConnected()) { + // Drop bad instance from the cluster. + _gSetupTool.getClusterManagementTool() + .dropInstance(CLUSTER_NAME, _gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, _participantNames.get(i))); + _participants.set(i, createParticipant(_participantNames.get(i), Integer.toString(i), + "zone_" + i, null, true, -1)); + _participants.get(i).syncStart(); + continue; + } + _gSetupTool.getClusterManagementTool() + .setInstanceOperation(CLUSTER_NAME, _participantNames.get(i), null); + _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, _participantNames.get(i), true); + } + + Assert.assertTrue(_clusterVerifier.verifyByPolling()); } @Test public void testEvacuate() throws Exception { System.out.println("START TestInstanceOperation.testEvacuate() at " + new Date(System.currentTimeMillis())); // EV should contain all participants, check resources one by one - Map assignment = getEV(); + Map assignment = getEVs(); for (String resource : _allDBs) { Assert.assertTrue(getParticipantsInEv(assignment.get(resource)).containsAll(_participantNames)); } @@ -118,7 +200,7 @@ public void testEvacuate() throws Exception { Assert.assertTrue(_clusterVerifier.verifyByPolling()); // New ev should contain all instances but the evacuated one - assignment = getEV(); + assignment = getEVs(); List currentActiveInstances = _participantNames.stream().filter(n -> !n.equals(instanceToEvacuate)).collect(Collectors.toList()); for (String resource : _allDBs) { @@ -143,7 +225,7 @@ public void testRevertEvacuation() throws Exception { Assert.assertTrue(_clusterVerifier.verifyByPolling()); // EV should contain all participants, check resources one by one - Map assignment = getEV(); + Map assignment = getEVs(); for (String resource : _allDBs) { Assert.assertTrue(getParticipantsInEv(assignment.get(resource)).containsAll(_participantNames)); validateAssignmentInEv(assignment.get(resource)); @@ -159,7 +241,7 @@ public void testAddingNodeWithEvacuationTag() throws Exception { .enableInstance(CLUSTER_NAME, mockNewInstance, false); Assert.assertTrue(_clusterVerifier.verifyByPolling()); //ev should contain all 
instances but the disabled one - Map assignment = getEV(); + Map assignment = getEVs(); List currentActiveInstances = _participantNames.stream().filter(n -> !n.equals(mockNewInstance)).collect(Collectors.toList()); for (String resource : _allDBs) { @@ -175,7 +257,7 @@ public void testAddingNodeWithEvacuationTag() throws Exception { _gSetupTool.getClusterManagementTool() .enableInstance(CLUSTER_NAME, mockNewInstance, true); //ev should be the same - assignment = getEV(); + assignment = getEVs(); currentActiveInstances = _participantNames.stream().filter(n -> !n.equals(mockNewInstance)).collect(Collectors.toList()); for (String resource : _allDBs) { @@ -193,7 +275,7 @@ public void testAddingNodeWithEvacuationTag() throws Exception { Assert.assertTrue(_clusterVerifier.verifyByPolling()); // EV should contain all participants, check resources one by one - assignment = getEV(); + assignment = getEVs(); for (String resource : _allDBs) { Assert.assertTrue(getParticipantsInEv(assignment.get(resource)).containsAll(_participantNames)); validateAssignmentInEv(assignment.get(resource)); @@ -234,7 +316,7 @@ public void testEvacuateAndCancelBeforeBootstrapFinish() throws Exception { // sleep a bit so ST messages can start executing Thread.sleep(Math.abs(_stateModelDelay / 100)); // before we cancel, check current EV - Map assignment = getEV(); + Map assignment = getEVs(); for (String resource : _allDBs) { // check every replica has >= 3 partitions and a top state partition validateAssignmentInEv(assignment.get(resource)); @@ -244,7 +326,7 @@ public void testEvacuateAndCancelBeforeBootstrapFinish() throws Exception { _gSetupTool.getClusterManagementTool() .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, null); - assignment = getEV(); + assignment = getEVs(); for (String resource : _allDBs) { // check every replica has >= 3 active replicas, even before cluster converge validateAssignmentInEv(assignment.get(resource)); @@ -254,7 +336,7 @@ public void testEvacuateAndCancelBeforeBootstrapFinish() throws Exception { Assert.assertTrue(_clusterVerifier.verifyByPolling()); // EV should contain all participants, check resources one by one - assignment = getEV(); + assignment = getEVs(); for (String resource : _allDBs) { Assert.assertTrue(getParticipantsInEv(assignment.get(resource)).containsAll(_participantNames)); // check every replica has >= 3 active replicas again @@ -283,7 +365,7 @@ public void testEvacuateAndCancelBeforeDropFinish() throws Exception { _gSetupTool.getClusterManagementTool() .setInstanceOperation(CLUSTER_NAME, instanceToEvacuate, null); // check every replica has >= 3 active replicas, even before cluster converge - Map assignment = getEV(); + Map assignment = getEVs(); for (String resource : _allDBs) { validateAssignmentInEv(assignment.get(resource)); } @@ -291,7 +373,7 @@ public void testEvacuateAndCancelBeforeDropFinish() throws Exception { Assert.assertTrue(_clusterVerifier.verifyByPolling()); // EV should contain all participants, check resources one by one - assignment = getEV(); + assignment = getEVs(); for (String resource : _allDBs) { Assert.assertTrue(getParticipantsInEv(assignment.get(resource)).containsAll(_participantNames)); // check every replica has >= 3 active replicas @@ -309,7 +391,7 @@ public void testMarkEvacuationAfterEMM() throws Exception { addParticipant(PARTICIPANT_PREFIX + "_" + (START_PORT + NUM_NODE)); Assert.assertTrue(_clusterVerifier.verifyByPolling()); - Map assignment = getEV(); + Map assignment = getEVs(); for (String resource : _allDBs) { 
Assert.assertFalse(getParticipantsInEv(assignment.get(resource)).contains(_participantNames.get(NUM_NODE))); } @@ -332,7 +414,7 @@ public void testMarkEvacuationAfterEMM() throws Exception { Assert.assertTrue(_clusterVerifier.verifyByPolling()); - assignment = getEV(); + assignment = getEVs(); List currentActiveInstances = _participantNames.stream().filter(n -> !n.equals(instanceToEvacuate)).collect(Collectors.toList()); for (String resource : _allDBs) { @@ -342,6 +424,8 @@ public void testMarkEvacuationAfterEMM() throws Exception { Assert.assertTrue(newPAssignedParticipants.containsAll(currentActiveInstances)); } Assert.assertTrue(_admin.isReadyForPreparingJoiningCluster(CLUSTER_NAME, instanceToEvacuate)); + + _stateModelDelay = 3L; } @Test(dependsOnMethods = "testMarkEvacuationAfterEMM") @@ -356,7 +440,7 @@ public void testEvacuationWithOfflineInstancesInCluster() throws Exception { Map assignment; // EV should contain all participants, check resources one by one - assignment = getEV(); + assignment = getEVs(); for (String resource : _allDBs) { TestHelper.verify(() -> { ExternalView ev = assignment.get(resource); @@ -379,13 +463,686 @@ public void testEvacuationWithOfflineInstancesInCluster() throws Exception { }, 30000); } - _participants.get(1).syncStart(); - _participants.get(2).syncStart(); + resetInstances(); + dropTestDBs(ImmutableSet.of("TEST_DB3_DELAYED_CRUSHED", "TEST_DB4_DELAYED_WAGED")); } + @Test(expectedExceptions = HelixException.class, dependsOnMethods = "testEvacuationWithOfflineInstancesInCluster") + public void testAddingNodeWithSwapOutInstanceOperation() throws Exception { + System.out.println( + "START TestInstanceOperation.testAddingNodeWithSwapOutInstanceOperation() at " + new Date( + System.currentTimeMillis())); + + enabledTopologyAwareRebalance(); + resetInstances(); + + // Set instance's InstanceOperation to SWAP_OUT + String instanceToSwapOutName = _participants.get(0).getInstanceName(); + InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool() + .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName); + _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName, + InstanceConstants.InstanceOperation.SWAP_OUT); + + // Add instance with InstanceOperation set to SWAP_IN + String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT + _participants.size()); + addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID), + instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), + InstanceConstants.InstanceOperation.SWAP_OUT, true, -1); + } - private void addParticipant(String participantName) { - _gSetupTool.addInstanceToCluster(CLUSTER_NAME, participantName); + @Test(expectedExceptions = HelixException.class, dependsOnMethods = "testAddingNodeWithSwapOutInstanceOperation") + public void testAddingNodeWithSwapOutNodeInstanceOperationUnset() throws Exception { + System.out.println( + "START TestInstanceOperation.testAddingNodeWithSwapOutNodeInstanceOperationUnset() at " + + new Date(System.currentTimeMillis())); + + resetInstances(); + + // Set instance's InstanceOperation to null + String instanceToSwapOutName = _participants.get(0).getInstanceName(); + InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool() + .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName); + _gSetupTool.getClusterManagementTool() + .setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName, null); + + // Add instance with InstanceOperation set to 
SWAP_IN + String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT + _participants.size()); + addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID), + instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), + InstanceConstants.InstanceOperation.SWAP_IN, true, -1); + } + + @Test(expectedExceptions = HelixException.class, dependsOnMethods = "testAddingNodeWithSwapOutNodeInstanceOperationUnset") + public void testNodeSwapWithNoSwapOutNode() throws Exception { + System.out.println("START TestInstanceOperation.testNodeSwapWithNoSwapOutNode() at " + new Date( + System.currentTimeMillis())); + + resetInstances(); + + // Add new instance with InstanceOperation set to SWAP_IN + String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT + _participants.size()); + addParticipant(instanceToSwapInName, "1000", "zone_1000", + InstanceConstants.InstanceOperation.SWAP_IN, true, -1); + } + + @Test(dependsOnMethods = "testNodeSwapWithNoSwapOutNode") + public void testNodeSwapSwapInNodeNoInstanceOperationEnabled() throws Exception { + System.out.println( + "START TestInstanceOperation.testNodeSwapSwapInNodeNoInstanceOperationEnabled() at " + + new Date(System.currentTimeMillis())); + + resetInstances(); + + // Set instance's InstanceOperation to SWAP_OUT + String instanceToSwapOutName = _participants.get(0).getInstanceName(); + InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool() + .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName); + _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName, + InstanceConstants.InstanceOperation.SWAP_OUT); + + // Add instance with same logicalId with InstanceOperation unset + // This should work because adding instance with InstanceOperation unset will automatically + // set the InstanceOperation to SWAP_IN. + String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT + _participants.size()); + addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID), + instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), null, true, -1); + } + + @Test(expectedExceptions = HelixException.class, dependsOnMethods = "testNodeSwapSwapInNodeNoInstanceOperationEnabled") + public void testNodeSwapSwapInNodeWithAlreadySwappingPair() throws Exception { + System.out.println( + "START TestInstanceOperation.testNodeSwapSwapInNodeWithAlreadySwappingPair() at " + + new Date(System.currentTimeMillis())); + + resetInstances(); + + // Set instance's InstanceOperation to SWAP_OUT + String instanceToSwapOutName = _participants.get(0).getInstanceName(); + InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool() + .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName); + _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName, + InstanceConstants.InstanceOperation.SWAP_OUT); + + // Add instance with InstanceOperation set to SWAP_IN + String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT + _participants.size()); + addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID), + instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), + InstanceConstants.InstanceOperation.SWAP_IN, true, -1); + + // Add another instance with InstanceOperation set to SWAP_IN with same logicalId as previously + // added SWAP_IN instance. 
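+    // The second SWAP_IN candidate for the same logicalId is expected to be rejected with a
+    // HelixException: at most one SWAP_IN instance may be paired with a SWAP_OUT instance at a time.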
+ String secondInstanceToSwapInName = + PARTICIPANT_PREFIX + "_" + (START_PORT + _participants.size()); + addParticipant(secondInstanceToSwapInName, + instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID), + instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), + InstanceConstants.InstanceOperation.SWAP_IN, true, -1); + } + + @Test(expectedExceptions = HelixException.class, dependsOnMethods = "testNodeSwapSwapInNodeWithAlreadySwappingPair") + public void testNodeSwapNoTopologySetup() throws Exception { + System.out.println("START TestInstanceOperation.testNodeSwapNoTopologySetup() at " + new Date( + System.currentTimeMillis())); + disableTopologyAwareRebalance(); + resetInstances(); + + // Set instance's InstanceOperation to SWAP_OUT + String instanceToSwapOutName = _participants.get(0).getInstanceName(); + _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName, + InstanceConstants.InstanceOperation.SWAP_OUT); + + // Add instance with InstanceOperation set to SWAP_IN + // There should be an error that the logicalId does not have SWAP_OUT instance because, + // helix can't determine what topology key to use to get the logicalId if TOPOLOGY is not set. + String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT + _participants.size()); + InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool() + .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName); + addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID), + instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), + InstanceConstants.InstanceOperation.SWAP_IN, true, -1); + } + + @Test(expectedExceptions = HelixException.class, dependsOnMethods = "testNodeSwapNoTopologySetup") + public void testNodeSwapWrongFaultZone() throws Exception { + System.out.println("START TestInstanceOperation.testNodeSwapWrongFaultZone() at " + new Date( + System.currentTimeMillis())); + // Re-enable topology aware rebalancing and set TOPOLOGY. + enabledTopologyAwareRebalance(); + resetInstances(); + + // Set instance's InstanceOperation to SWAP_OUT + String instanceToSwapOutName = _participants.get(0).getInstanceName(); + _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName, + InstanceConstants.InstanceOperation.SWAP_OUT); + + // Add instance with InstanceOperation set to SWAP_IN + // There should be an error because SWAP_IN instance must be in the same FAULT_ZONE as the SWAP_OUT instance. 
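+    // The fault zone comes from the FAULT_ZONE_TYPE ("zone") component of DOMAIN, so appending "1"
+    // to the SWAP_OUT instance's zone below places the SWAP_IN candidate in a different fault zone
+    // and addInstance is expected to fail.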
+ String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT + _participants.size()); + InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool() + .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName); + addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID), + instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE) + "1", + InstanceConstants.InstanceOperation.SWAP_IN, true, -1); + } + + @Test(expectedExceptions = HelixException.class, dependsOnMethods = "testNodeSwapWrongFaultZone") + public void testNodeSwapWrongCapacity() throws Exception { + System.out.println("START TestInstanceOperation.testNodeSwapWrongCapacity() at " + new Date( + System.currentTimeMillis())); + resetInstances(); + + // Set instance's InstanceOperation to SWAP_OUT + String instanceToSwapOutName = _participants.get(0).getInstanceName(); + _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName, + InstanceConstants.InstanceOperation.SWAP_OUT); + + // Add instance with InstanceOperation set to SWAP_IN + // There should be an error because SWAP_IN instance must have same capacity as the SWAP_OUT node. + String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT + _participants.size()); + InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool() + .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName); + addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID), + instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), + InstanceConstants.InstanceOperation.SWAP_IN, true, TEST_CAPACITY_VALUE - 10); + } + + @Test(dependsOnMethods = "testNodeSwapWrongCapacity") + public void testNodeSwap() throws Exception { + System.out.println( + "START TestInstanceOperation.testNodeSwap() at " + new Date(System.currentTimeMillis())); + resetInstances(); + + // Store original EV + Map originalEVs = getEVs(); + + Map swapOutInstancesToSwapInInstances = new HashMap<>(); + + // Set instance's InstanceOperation to SWAP_OUT + String instanceToSwapOutName = _participants.get(0).getInstanceName(); + InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool() + .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName); + _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName, + InstanceConstants.InstanceOperation.SWAP_OUT); + + // Validate that the assignment has not changed since setting the InstanceOperation to SWAP_OUT + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances, + Collections.emptySet(), Collections.emptySet()); + + // Add instance with InstanceOperation set to SWAP_IN + String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT + _participants.size()); + swapOutInstancesToSwapInInstances.put(instanceToSwapOutName, instanceToSwapInName); + addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID), + instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), + InstanceConstants.InstanceOperation.SWAP_IN, true, -1); + + // Validate that partitions on SWAP_OUT instance does not change after setting the InstanceOperation to SWAP_OUT + // and adding the SWAP_IN instance to the cluster. + // Check that the SWAP_IN instance has the same partitions as the SWAP_OUT instance + // but none of them are in a top state. 
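+    // In other words, the SWAP_IN replicas are expected in a secondary state (SLAVE/STANDBY) while
+    // the SWAP_OUT instance keeps its top-state (MASTER/LEADER) replicas until the swap is completed.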
+ Assert.assertTrue(_clusterVerifier.verifyByPolling()); + validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances, + Set.of(instanceToSwapInName), Collections.emptySet()); + + // Assert canSwapBeCompleted is true + Assert.assertTrue(_gSetupTool.getClusterManagementTool() + .canCompleteSwap(CLUSTER_NAME, instanceToSwapOutName)); + // Assert completeSwapIfPossible is true + Assert.assertTrue(_gSetupTool.getClusterManagementTool() + .completeSwapIfPossible(CLUSTER_NAME, instanceToSwapOutName)); + + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + + // Assert that SWAP_OUT instance is disabled and has no partitions assigned to it. + Assert.assertFalse(_gSetupTool.getClusterManagementTool() + .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName).getInstanceEnabled()); + Assert.assertEquals(getPartitionsAndStatesOnInstance(getEVs(), instanceToSwapOutName).size(), + 0); + + // Validate that the SWAP_IN instance has the same partitions the SWAP_OUT instance had before + // swap was completed. + validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances, + Collections.emptySet(), Set.of(instanceToSwapInName)); + } + + @Test(dependsOnMethods = "testNodeSwap") + public void testNodeSwapSwapInNodeNoInstanceOperationDisabled() throws Exception { + System.out.println( + "START TestInstanceOperation.testNodeSwapSwapInNodeNoInstanceOperationDisabled() at " + + new Date(System.currentTimeMillis())); + + resetInstances(); + + // Store original EVs + Map originalEVs = getEVs(); + + Map swapOutInstancesToSwapInInstances = new HashMap<>(); + + // Set instance's InstanceOperation to SWAP_OUT + String instanceToSwapOutName = _participants.get(0).getInstanceName(); + InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool() + .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName); + _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName, + InstanceConstants.InstanceOperation.SWAP_OUT); + + // Validate that the assignment has not changed since setting the InstanceOperation to SWAP_OUT + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances, + Collections.emptySet(), Collections.emptySet()); + + // Add instance with InstanceOperation unset, should automatically be set to SWAP_IN + String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT + _participants.size()); + swapOutInstancesToSwapInInstances.put(instanceToSwapOutName, instanceToSwapInName); + addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID), + instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), null, false, -1); + + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances, + Collections.emptySet(), Collections.emptySet()); + + // Enable the SWAP_IN instance, so it can start being assigned replicas + _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, instanceToSwapInName, true); + + // Validate that partitions on SWAP_OUT instance does not change after setting the InstanceOperation to SWAP_OUT + // and adding the SWAP_IN instance to the cluster. + // Check that the SWAP_IN instance has the same partitions as the SWAP_OUT instance + // but none of them are in a top state. 
+ Assert.assertTrue(_clusterVerifier.verifyByPolling()); + validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances, + Set.of(instanceToSwapInName), Collections.emptySet()); + + // Assert canSwapBeCompleted is true + Assert.assertTrue(_gSetupTool.getClusterManagementTool() + .canCompleteSwap(CLUSTER_NAME, instanceToSwapOutName)); + // Assert completeSwapIfPossible is true + Assert.assertTrue(_gSetupTool.getClusterManagementTool() + .completeSwapIfPossible(CLUSTER_NAME, instanceToSwapOutName)); + + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + + // Assert that SWAP_OUT instance is disabled and has no partitions assigned to it. + Assert.assertFalse(_gSetupTool.getClusterManagementTool() + .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName).getInstanceEnabled()); + Assert.assertEquals(getPartitionsAndStatesOnInstance(getEVs(), instanceToSwapOutName).size(), + 0); + + // Validate that the SWAP_IN instance has the same partitions the SWAP_OUT instance had before + // swap was completed. + validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances, + Collections.emptySet(), Set.of(instanceToSwapInName)); + } + + @Test(dependsOnMethods = "testNodeSwapSwapInNodeNoInstanceOperationDisabled") + public void testNodeSwapCancelSwapWhenReadyToComplete() throws Exception { + System.out.println( + "START TestInstanceOperation.testNodeSwapCancelSwapWhenReadyToComplete() at " + new Date( + System.currentTimeMillis())); + + resetInstances(); + + // Store original EVs + Map originalEVs = getEVs(); + + Map swapOutInstancesToSwapInInstances = new HashMap<>(); + + // Set instance's InstanceOperation to SWAP_OUT + String instanceToSwapOutName = _participants.get(0).getInstanceName(); + InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool() + .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName); + _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName, + InstanceConstants.InstanceOperation.SWAP_OUT); + + // Validate that the assignment has not changed since setting the InstanceOperation to SWAP_OUT + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances, + Collections.emptySet(), Collections.emptySet()); + + // Add instance with InstanceOperation set to SWAP_IN + String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT + _participants.size()); + swapOutInstancesToSwapInInstances.put(instanceToSwapOutName, instanceToSwapInName); + addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID), + instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), + InstanceConstants.InstanceOperation.SWAP_IN, true, -1); + + // Validate that partitions on SWAP_OUT instance does not change after setting the InstanceOperation to SWAP_OUT + // and adding the SWAP_IN instance to the cluster. + // Check that the SWAP_IN instance has the same partitions as the SWAP_OUT instance + // but none of them are in a top state. + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances, + Set.of(instanceToSwapInName), Collections.emptySet()); + + // Assert canSwapBeCompleted is true + Assert.assertTrue(_gSetupTool.getClusterManagementTool() + .canCompleteSwap(CLUSTER_NAME, instanceToSwapOutName)); + + // Cancel SWAP by disabling the SWAP_IN instance and remove SWAP_OUT InstanceOperation from SWAP_OUT instance. 
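+    // Disabling the SWAP_IN instance drops its pending replicas; clearing the SWAP_OUT operation
+    // afterwards returns the cluster to its original, pre-swap assignment.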
+ _gSetupTool.getClusterManagementTool() + .enableInstance(CLUSTER_NAME, instanceToSwapInName, false); + + // Wait for cluster to converge. + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + + // Validate there are no partitions on the SWAP_IN instance. + Assert.assertEquals(getPartitionsAndStatesOnInstance(getEVs(), instanceToSwapInName).size(), 0); + + // Validate that the SWAP_OUT instance has the same partitions as it had before. + validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances, + Collections.emptySet(), Collections.emptySet()); + + _gSetupTool.getClusterManagementTool() + .setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName, null); + + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + + // Validate there are no partitions on the SWAP_IN instance. + Assert.assertEquals(getPartitionsAndStatesOnInstance(getEVs(), instanceToSwapInName).size(), 0); + + // Validate that the SWAP_OUT instance has the same partitions as it had before. + validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances, + Collections.emptySet(), Collections.emptySet()); + } + + @Test(dependsOnMethods = "testNodeSwapCancelSwapWhenReadyToComplete") + public void testNodeSwapAfterEMM() throws Exception { + System.out.println("START TestInstanceOperation.testNodeSwapAfterEMM() at " + new Date( + System.currentTimeMillis())); + + resetInstances(); + + // Store original EVs + Map originalEVs = getEVs(); + + Map swapOutInstancesToSwapInInstances = new HashMap<>(); + + // Put the cluster in maintenance mode. + _gSetupTool.getClusterManagementTool() + .manuallyEnableMaintenanceMode(CLUSTER_NAME, true, null, null); + + // Set instance's InstanceOperation to SWAP_OUT + String instanceToSwapOutName = _participants.get(0).getInstanceName(); + InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool() + .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName); + _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName, + InstanceConstants.InstanceOperation.SWAP_OUT); + + // Validate that the assignment has not changed since setting the InstanceOperation to SWAP_OUT + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances, + Collections.emptySet(), Collections.emptySet()); + + // Add instance with InstanceOperation set to SWAP_IN + String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT + _participants.size()); + swapOutInstancesToSwapInInstances.put(instanceToSwapOutName, instanceToSwapInName); + addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID), + instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), + InstanceConstants.InstanceOperation.SWAP_IN, true, -1); + + // Validate that the assignment has not changed since adding the SWAP_IN node. + // During MM, the cluster should not compute new assignment. + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances, + Collections.emptySet(), Collections.emptySet()); + + // Remove the cluster from maintenance mode. 
+ // Now swapping will begin + _gSetupTool.getClusterManagementTool() + .manuallyEnableMaintenanceMode(CLUSTER_NAME, false, null, null); + + // Validate that partitions on SWAP_OUT instance does not change after exiting MM + // Check that the SWAP_IN instance has the same partitions as the SWAP_OUT instance + // but none of them are in a top state. + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances, + Set.of(instanceToSwapInName), Collections.emptySet()); + + // Assert canSwapBeCompleted is true + Assert.assertTrue(_gSetupTool.getClusterManagementTool() + .canCompleteSwap(CLUSTER_NAME, instanceToSwapOutName)); + // Assert completeSwapIfPossible is true + Assert.assertTrue(_gSetupTool.getClusterManagementTool() + .completeSwapIfPossible(CLUSTER_NAME, instanceToSwapOutName)); + + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + + // Assert that SWAP_OUT instance is disabled and has no partitions assigned to it. + Assert.assertFalse(_gSetupTool.getClusterManagementTool() + .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName).getInstanceEnabled()); + Assert.assertEquals(getPartitionsAndStatesOnInstance(getEVs(), instanceToSwapOutName).size(), + 0); + + // Validate that the SWAP_IN instance has the same partitions the SWAP_OUT instance had before + // swap was completed. + validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances, + Collections.emptySet(), Set.of(instanceToSwapInName)); + } + + @Test(dependsOnMethods = "testNodeSwapAfterEMM") + public void testNodeSwapWithSwapOutInstanceDisabled() throws Exception { + System.out.println( + "START TestInstanceOperation.testNodeSwapWithSwapOutInstanceDisabled() at " + new Date( + System.currentTimeMillis())); + + resetInstances(); + + // Store original EVs + Map originalEVs = getEVs(); + + // Set instance's InstanceOperation to SWAP_OUT + String instanceToSwapOutName = _participants.get(0).getInstanceName(); + InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool() + .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName); + _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName, + InstanceConstants.InstanceOperation.SWAP_OUT); + + Set swapOutInstanceOriginalPartitions = + getPartitionsAndStatesOnInstance(originalEVs, instanceToSwapOutName).keySet(); + + // Disable the SWAP_OUT instance. + _gSetupTool.getClusterManagementTool() + .enableInstance(CLUSTER_NAME, instanceToSwapOutName, false); + + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + + // Validate that the SWAP_OUT instance has all partitions in OFFLINE state + Set swapOutInstanceOfflineStates = + new HashSet<>(getPartitionsAndStatesOnInstance(getEVs(), instanceToSwapOutName).values()); + Assert.assertEquals(swapOutInstanceOfflineStates.size(), 1); + Assert.assertTrue(swapOutInstanceOfflineStates.contains("OFFLINE")); + + // Add instance with InstanceOperation set to SWAP_IN + String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT + _participants.size()); + addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID), + instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), + InstanceConstants.InstanceOperation.SWAP_IN, true, -1); + + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + + // Validate that the SWAP_IN instance has the same partitions as the SWAP_OUT instance in second top state. 
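+    // Since the SWAP_OUT instance was disabled, its replicas are OFFLINE and the top states were
+    // handed off to the next replicas in the preference list, so the SWAP_IN replicas remain in
+    // SLAVE/STANDBY until the swap is completed.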
+ Map swapInInstancePartitionsAndStates = + getPartitionsAndStatesOnInstance(getEVs(), instanceToSwapInName); + Assert.assertTrue( + swapInInstancePartitionsAndStates.keySet().containsAll(swapOutInstanceOriginalPartitions)); + Set swapInInstanceStates = new HashSet<>(swapInInstancePartitionsAndStates.values()); + swapInInstanceStates.removeAll(SECONDARY_STATE_SET); + Assert.assertEquals(swapInInstanceStates.size(), 0); + + // Assert canSwapBeCompleted is false because SWAP_OUT instance is disabled. + Assert.assertFalse(_gSetupTool.getClusterManagementTool() + .canCompleteSwap(CLUSTER_NAME, instanceToSwapOutName)); + + // Enable the SWAP_OUT instance. + _gSetupTool.getClusterManagementTool() + .enableInstance(CLUSTER_NAME, instanceToSwapOutName, true); + + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + + // Assert completeSwapIfPossible is true + Assert.assertTrue(_gSetupTool.getClusterManagementTool() + .completeSwapIfPossible(CLUSTER_NAME, instanceToSwapOutName)); + + // Validate that the SWAP_IN instance has the same partitions the SWAP_OUT instance originally + // had. Validate they are in second top state because initially disabling SWAP_OUT instance + // caused all topStates to be handed off to next replica in the preference list. + swapInInstancePartitionsAndStates = + getPartitionsAndStatesOnInstance(getEVs(), instanceToSwapInName); + Assert.assertTrue( + swapInInstancePartitionsAndStates.keySet().containsAll(swapOutInstanceOriginalPartitions)); + swapInInstanceStates = new HashSet<>(swapInInstancePartitionsAndStates.values()); + swapInInstanceStates.removeAll(SECONDARY_STATE_SET); + Assert.assertEquals(swapInInstanceStates.size(), 0); + + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + + // Assert that SWAP_OUT instance is disabled and has no partitions assigned to it. + Assert.assertFalse(_gSetupTool.getClusterManagementTool() + .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName).getInstanceEnabled()); + Assert.assertEquals(getPartitionsAndStatesOnInstance(getEVs(), instanceToSwapOutName).size(), + 0); + } + + @Test(expectedExceptions = HelixException.class, dependsOnMethods = "testNodeSwapWithSwapOutInstanceDisabled") + public void testNodeSwapAddSwapInFirstEnabledBeforeSwapOutSet() { + System.out.println( + "START TestInstanceOperation.testNodeSwapAddSwapInFirstEnabledBeforeSwapOutSet() at " + + new Date(System.currentTimeMillis())); + resetInstances(); + + // Get the SWAP_OUT instance. + String instanceToSwapOutName = _participants.get(0).getInstanceName(); + InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool() + .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName); + + // Add instance with InstanceOperation set to SWAP_IN enabled before setting SWAP_OUT instance. + String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT + _participants.size()); + addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID), + instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), null, true, -1); + } + + @Test(expectedExceptions = HelixException.class, dependsOnMethods = "testNodeSwapAddSwapInFirstEnabledBeforeSwapOutSet") + public void testNodeSwapAddSwapInFirstEnableBeforeSwapOutSet() { + System.out.println( + "START TestInstanceOperation.testNodeSwapAddSwapInFirstEnableBeforeSwapOutSet() at " + + new Date(System.currentTimeMillis())); + resetInstances(); + + // Get the SWAP_OUT instance. 
+ String instanceToSwapOutName = _participants.get(0).getInstanceName(); + InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool() + .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName); + + // Add instance with InstanceOperation set to SWAP_IN + String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT + _participants.size()); + addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID), + instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), null, false, -1); + + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + + // Enable the SWAP_IN instance before we have set the SWAP_OUT instance. + _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, instanceToSwapInName, true); + } + + @Test(expectedExceptions = HelixException.class, dependsOnMethods = "testNodeSwapAddSwapInFirstEnableBeforeSwapOutSet") + public void testUnsetInstanceOperationOnSwapInWhenAlreadyUnsetOnSwapOut() { + System.out.println( + "START TestInstanceOperation.testUnsetInstanceOperationOnSwapInWhenAlreadyUnsetOnSwapOut() at " + + new Date(System.currentTimeMillis())); + resetInstances(); + + // Get the SWAP_OUT instance. + String instanceToSwapOutName = _participants.get(0).getInstanceName(); + InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool() + .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName); + + // Add instance with InstanceOperation set to SWAP_IN + String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT + _participants.size()); + addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID), + instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), null, false, -1); + + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + + // Try to remove the InstanceOperation from the SWAP_IN instance before the SWAP_OUT instance is set. + // This should throw exception because we cannot ever have two instances with the same logicalId and both have InstanceOperation + // unset. + _gSetupTool.getClusterManagementTool() + .setInstanceOperation(CLUSTER_NAME, instanceToSwapInName, null); + } + + @Test(dependsOnMethods = "testUnsetInstanceOperationOnSwapInWhenAlreadyUnsetOnSwapOut") + public void testNodeSwapAddSwapInFirst() { + System.out.println("START TestInstanceOperation.testNodeSwapAddSwapInFirst() at " + new Date( + System.currentTimeMillis())); + resetInstances(); + + // Store original EV + Map originalEVs = getEVs(); + + Map swapOutInstancesToSwapInInstances = new HashMap<>(); + + // Get the SWAP_OUT instance. 
+ String instanceToSwapOutName = _participants.get(0).getInstanceName(); + InstanceConfig instanceToSwapOutInstanceConfig = _gSetupTool.getClusterManagementTool() + .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName); + + // Add instance with InstanceOperation set to SWAP_IN + String instanceToSwapInName = PARTICIPANT_PREFIX + "_" + (START_PORT + _participants.size()); + swapOutInstancesToSwapInInstances.put(instanceToSwapOutName, instanceToSwapInName); + addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID), + instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), null, false, -1); + + // Validate that the assignment has not changed since setting the InstanceOperation to SWAP_OUT + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances, + Collections.emptySet(), Collections.emptySet()); + + // After the SWAP_IN instance is added, we set the InstanceOperation to SWAP_OUT + _gSetupTool.getClusterManagementTool().setInstanceOperation(CLUSTER_NAME, instanceToSwapOutName, + InstanceConstants.InstanceOperation.SWAP_OUT); + + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + + // Enable the SWAP_IN instance to begin the swap operation. + _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, instanceToSwapInName, true); + + // Validate that partitions on SWAP_OUT instance does not change after setting the InstanceOperation to SWAP_OUT + // and adding the SWAP_IN instance to the cluster. + // Check that the SWAP_IN instance has the same partitions as the SWAP_OUT instance + // but none of them are in a top state. + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances, + Set.of(instanceToSwapInName), Collections.emptySet()); + + // Assert canSwapBeCompleted is true + Assert.assertTrue(_gSetupTool.getClusterManagementTool() + .canCompleteSwap(CLUSTER_NAME, instanceToSwapOutName)); + // Assert completeSwapIfPossible is true + Assert.assertTrue(_gSetupTool.getClusterManagementTool() + .completeSwapIfPossible(CLUSTER_NAME, instanceToSwapOutName)); + + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + + // Assert that SWAP_OUT instance is disabled and has no partitions assigned to it. + Assert.assertFalse(_gSetupTool.getClusterManagementTool() + .getInstanceConfig(CLUSTER_NAME, instanceToSwapOutName).getInstanceEnabled()); + Assert.assertEquals(getPartitionsAndStatesOnInstance(getEVs(), instanceToSwapOutName).size(), + 0); + + // Validate that the SWAP_IN instance has the same partitions the SWAP_OUT instance had before + // swap was completed. 
+ validateEVsCorrect(getEVs(), originalEVs, swapOutInstancesToSwapInInstances, + Collections.emptySet(), Set.of(instanceToSwapInName)); + } + + private MockParticipantManager createParticipant(String participantName, String logicalId, String zone, + InstanceConstants.InstanceOperation instanceOperation, boolean enabled, int capacity) { + InstanceConfig config = new InstanceConfig.Builder().setDomain( + String.format("%s=%s, %s=%s, %s=%s", ZONE, zone, HOST, participantName, LOGICAL_ID, + logicalId)).setInstanceEnabled(enabled).setInstanceOperation(instanceOperation) + .build(participantName); + if (capacity >= 0) { + config.setInstanceCapacityMap(Map.of(TEST_CAPACITY_KEY, capacity)); + } + _gSetupTool.getClusterManagementTool().addInstance(CLUSTER_NAME, config); // start dummy participants MockParticipantManager participant = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, participantName); @@ -393,12 +1150,24 @@ private void addParticipant(String participantName) { // Using a delayed state model StDelayMSStateModelFactory delayFactory = new StDelayMSStateModelFactory(); stateMachine.registerStateModelFactory("MasterSlave", delayFactory); + return participant; + } + + private void addParticipant(String participantName, String logicalId, String zone, + InstanceConstants.InstanceOperation instanceOperation, boolean enabled, int capacity) { + MockParticipantManager participant = createParticipant(participantName, logicalId, zone, + instanceOperation, enabled, capacity); participant.syncStart(); _participants.add(participant); _participantNames.add(participantName); } + private void addParticipant(String participantName) { + addParticipant(participantName, Integer.toString(_participants.size()), + "zone_" + _participants.size(), null, true, -1); + } + private void createTestDBs(long delayTime) throws InterruptedException { createResourceWithDelayedRebalance(CLUSTER_NAME, "TEST_DB0_CRUSHED", BuiltInStateModelDefinitions.LeaderStandby.name(), PARTITIONS, REPLICA, REPLICA - 1, -1, @@ -415,7 +1184,15 @@ private void createTestDBs(long delayTime) throws InterruptedException { Assert.assertTrue(_clusterVerifier.verifyByPolling()); } - private Map getEV() { + private void dropTestDBs(Set dbs) { + for (String db : dbs) { + _gSetupTool.getClusterManagementTool().dropResource(CLUSTER_NAME, db); + _allDBs.remove(db); + } + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + } + + private Map getEVs() { Map externalViews = new HashMap(); for (String db : _allDBs) { ExternalView ev = _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); @@ -450,7 +1227,65 @@ private Set getParticipantsInEv(ExternalView ev) { return assignedParticipants; } - // verify that each partition has >=REPLICA (3 in this case) replicas + private Map getPartitionsAndStatesOnInstance(Map evs, + String instanceName) { + Map instancePartitions = new HashMap<>(); + for (String resourceEV : evs.keySet()) { + for (String partition : evs.get(resourceEV).getPartitionSet()) { + if (evs.get(resourceEV).getStateMap(partition).containsKey(instanceName)) { + instancePartitions.put(partition, + evs.get(resourceEV).getStateMap(partition).get(instanceName)); + } + } + } + + return instancePartitions; + } + + private void validateEVCorrect(ExternalView actual, ExternalView original, + Map swapOutInstancesToSwapInInstances, Set inFlightSwapInInstances, + Set completedSwapInInstanceNames) { + Assert.assertEquals(actual.getPartitionSet(), original.getPartitionSet()); + IdealState is = 
+        .getResourceIdealState(CLUSTER_NAME, original.getResourceName());
+    StateModelDefinition stateModelDef = _gSetupTool.getClusterManagementTool()
+        .getStateModelDef(CLUSTER_NAME, is.getStateModelDefRef());
+    for (String partition : actual.getPartitionSet()) {
+      Map<String, String> expectedStateMap = new HashMap<>(original.getStateMap(partition));
+      for (String swapOutInstance : swapOutInstancesToSwapInInstances.keySet()) {
+        if (expectedStateMap.containsKey(swapOutInstance) && inFlightSwapInInstances.contains(
+            swapOutInstancesToSwapInInstances.get(swapOutInstance))) {
+          // If the corresponding swapInInstance is in-flight, add it to the expectedStateMap
+          // with the same state as the swapOutInstance or secondState if the swapOutInstance
+          // has a topState.
+          expectedStateMap.put(swapOutInstancesToSwapInInstances.get(swapOutInstance),
+              expectedStateMap.get(swapOutInstance).equals(stateModelDef.getTopState())
+                  ? (String) stateModelDef.getSecondTopStates().toArray()[0]
+                  : expectedStateMap.get(swapOutInstance));
+        } else if (expectedStateMap.containsKey(swapOutInstance)
+            && completedSwapInInstanceNames.contains(
+            swapOutInstancesToSwapInInstances.get(swapOutInstance))) {
+          // If the corresponding swapInInstance is completed, add it to the expectedStateMap
+          // with the same state as the swapOutInstance.
+          expectedStateMap.put(swapOutInstancesToSwapInInstances.get(swapOutInstance),
+              expectedStateMap.get(swapOutInstance));
+          expectedStateMap.remove(swapOutInstance);
+        }
+      }
+      Assert.assertEquals(actual.getStateMap(partition), expectedStateMap, "Error for partition " + partition
+          + " in resource " + actual.getResourceName());
+    }
+  }
+
+  private void validateEVsCorrect(Map<String, ExternalView> actuals,
+      Map<String, ExternalView> originals, Map<String, String> swapOutInstancesToSwapInInstances,
+      Set<String> inFlightSwapInInstances, Set<String> completedSwapInInstanceNames) {
+    Assert.assertEquals(actuals.keySet(), originals.keySet());
+    for (String resource : actuals.keySet()) {
+      validateEVCorrect(actuals.get(resource), originals.get(resource),
+          swapOutInstancesToSwapInInstances, inFlightSwapInInstances, completedSwapInInstanceNames);
+    }
+  }
+
   private void validateAssignmentInEv(ExternalView ev) {
     validateAssignmentInEv(ev, REPLICA);
@@ -460,10 +1295,7 @@ private void validateAssignmentInEv(ExternalView ev, int expectedNumber) {
     Set<String> partitionSet = ev.getPartitionSet();
     for (String partition : partitionSet) {
       AtomicInteger activeReplicaCount = new AtomicInteger();
-      ev.getStateMap(partition)
-          .values()
-          .stream()
-          .filter(v -> v.equals("MASTER") || v.equals("LEADER") || v.equals("SLAVE") || v.equals("FOLLOWER") || v.equals("STANDBY"))
+      ev.getStateMap(partition).values().stream().filter(ACCEPTABLE_STATE_SET::contains)
           .forEach(v -> activeReplicaCount.getAndIncrement());
       Assert.assertTrue(activeReplicaCount.get() >=expectedNumber);
     }
@@ -486,10 +1318,10 @@ public synchronized Map getBestPossibleAssignment()
     // Set test instance capacity and partition weights
     ClusterConfig clusterConfig =
         _dataAccessor.getProperty(_dataAccessor.keyBuilder().clusterConfig());
-    String testCapacityKey = "TestCapacityKey";
-    clusterConfig.setInstanceCapacityKeys(Collections.singletonList(testCapacityKey));
-    clusterConfig.setDefaultInstanceCapacityMap(Collections.singletonMap(testCapacityKey, 100));
-    clusterConfig.setDefaultPartitionWeightMap(Collections.singletonMap(testCapacityKey, 1));
+    clusterConfig.setInstanceCapacityKeys(Collections.singletonList(TEST_CAPACITY_KEY));
+    clusterConfig.setDefaultInstanceCapacityMap(
+        Collections.singletonMap(TEST_CAPACITY_KEY, TEST_CAPACITY_VALUE));
+    clusterConfig.setDefaultPartitionWeightMap(Collections.singletonMap(TEST_CAPACITY_KEY, 1));
     _dataAccessor.setProperty(_dataAccessor.keyBuilder().clusterConfig(), clusterConfig);
   }
 
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
index e1ffbb646c..59decd98e5 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
@@ -898,24 +898,28 @@ public void testGetDomainInformation() {
       InstanceConfig instanceConfig = new InstanceConfig(instanceName);
       instanceConfig.setHostName(hostname);
       instanceConfig.setPort(port);
-      if (i == 40) {
-        instanceConfig.setDomain(String
-            .format("invaliddomain=%s,zone=%s,rack=%s,host=%s", "mygroup" + i % 2, "myzone" + i % 4,
-                "myrack" + i % 4, hostname));
-      } else if (i == 41) {
-        instanceConfig.setDomain("invaliddomain");
-      } else {
-        String domain = String
-            .format("group=%s,zone=%s,rack=%s,host=%s", "mygroup" + i % 2, "myzone" + i % 4,
-                "myrack" + i % 4, hostname);
-        instanceConfig.setDomain(domain);
-      }
+
+      String domain =
+          String.format("group=%s,zone=%s,rack=%s,host=%s", "mygroup" + i % 2, "myzone" + i % 4,
+              "myrack" + i % 4, hostname);
+      instanceConfig.setDomain(domain);
+
       LiveInstance liveInstance = new LiveInstance(instanceName);
       liveInstance.setSessionId(UUID.randomUUID().toString());
       liveInstance.setHelixVersion(UUID.randomUUID().toString());
       accessor.setProperty(keyBuilder.liveInstance(instanceName), liveInstance);
       admin.addInstance(clusterName, instanceConfig);
       admin.enableInstance(clusterName, instanceName, true);
+
+      if (i == 40) {
+        instanceConfig.setDomain(
+            String.format("invaliddomain=%s,zone=%s,rack=%s,host=%s", "mygroup" + i % 2,
+                "myzone" + i % 4, "myrack" + i % 4, hostname));
+        admin.setInstanceConfig(clusterName, instanceName, instanceConfig);
+      } else if (i == 41) {
+        instanceConfig.setDomain("invaliddomain");
+        admin.setInstanceConfig(clusterName, instanceName, instanceConfig);
+      }
     }
 
     ClusterTopology clusterTopology = admin.getClusterTopology(clusterName);
diff --git a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
index 512a7b4db7..aa93bc9c88 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
@@ -556,6 +556,16 @@ public boolean isEvacuateFinished(String clusterName, String instancesNames) {
     return false;
   }
 
+  @Override
+  public boolean canCompleteSwap(String clusterName, String instancesNames) {
+    return false;
+  }
+
+  @Override
+  public boolean completeSwapIfPossible(String clusterName, String instanceName) {
+    return false;
+  }
+
   @Override
   public boolean isReadyForPreparingJoiningCluster(String clusterName, String instancesNames) {
     return false;
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java
index 48b467eaac..64fbaff412 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/AbstractResource.java
@@ -86,6 +86,8 @@ public enum Command {
   getInstance,
   getAllInstances,
   setInstanceOperation, // TODO: Name is just a place holder, may change in future
+  canCompleteSwap,
+  completeSwapIfPossible,
   onDemandRebalance
 }
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
index efc3ce6521..b920f66ce8 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/PerInstanceAccessor.java
@@ -427,52 +427,63 @@ public Response updateInstance(@PathParam("clusterId") String clusterId,
         }
         admin.resetPartition(clusterId, instanceName,
             node.get(PerInstanceProperties.resource.name()).textValue(),
-            (List<String>) OBJECT_MAPPER
-                .readValue(node.get(PerInstanceProperties.partitions.name()).toString(),
-                    OBJECT_MAPPER.getTypeFactory()
-                        .constructCollectionType(List.class, String.class)));
-        break;
-      case setInstanceOperation:
-        admin.setInstanceOperation(clusterId, instanceName, state);
-        break;
-      case addInstanceTag:
-        if (!validInstance(node, instanceName)) {
-          return badRequest("Instance names are not match!");
-        }
-        for (String tag : (List<String>) OBJECT_MAPPER
-            .readValue(node.get(PerInstanceProperties.instanceTags.name()).toString(),
-                OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, String.class))) {
-          admin.addInstanceTag(clusterId, instanceName, tag);
-        }
-        break;
-      case removeInstanceTag:
-        if (!validInstance(node, instanceName)) {
-          return badRequest("Instance names are not match!");
-        }
-        for (String tag : (List<String>) OBJECT_MAPPER
-            .readValue(node.get(PerInstanceProperties.instanceTags.name()).toString(),
-                OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, String.class))) {
-          admin.removeInstanceTag(clusterId, instanceName, tag);
-        }
-        break;
-      case enablePartitions:
-        admin.enablePartition(true, clusterId, instanceName,
-            node.get(PerInstanceProperties.resource.name()).textValue(),
-            (List<String>) OBJECT_MAPPER
-                .readValue(node.get(PerInstanceProperties.partitions.name()).toString(),
-                    OBJECT_MAPPER.getTypeFactory()
-                        .constructCollectionType(List.class, String.class)));
-        break;
-      case disablePartitions:
-        admin.enablePartition(false, clusterId, instanceName,
-            node.get(PerInstanceProperties.resource.name()).textValue(),
-            (List<String>) OBJECT_MAPPER
-                .readValue(node.get(PerInstanceProperties.partitions.name()).toString(),
-                    OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, String.class)));
-        break;
-      default:
-        LOG.error("Unsupported command :" + command);
-        return badRequest("Unsupported command :" + command);
+            (List<String>) OBJECT_MAPPER.readValue(
+                node.get(PerInstanceProperties.partitions.name()).toString(),
+                OBJECT_MAPPER.getTypeFactory()
+                    .constructCollectionType(List.class, String.class)));
+        break;
+      case setInstanceOperation:
+        admin.setInstanceOperation(clusterId, instanceName, state);
+        break;
+      case canCompleteSwap:
+        if (!admin.canCompleteSwap(clusterId, instanceName)) {
+          return badRequest("Swap is not ready to be completed!");
+        }
+        break;
+      case completeSwapIfPossible:
+        if (!admin.completeSwapIfPossible(clusterId, instanceName)) {
+          return badRequest("Swap is not ready to be completed!");
+        }
+        break;
+      case addInstanceTag:
+        if (!validInstance(node, instanceName)) {
+          return badRequest("Instance names are not match!");
+        }
+        for (String tag : (List<String>) OBJECT_MAPPER.readValue(
+            node.get(PerInstanceProperties.instanceTags.name()).toString(),
+            OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, String.class))) {
+          admin.addInstanceTag(clusterId, instanceName, tag);
+        }
+        break;
+      case removeInstanceTag:
+        if (!validInstance(node, instanceName)) {
+          return badRequest("Instance names are not match!");
+        }
+        for (String tag : (List<String>) OBJECT_MAPPER.readValue(
+            node.get(PerInstanceProperties.instanceTags.name()).toString(),
+            OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, String.class))) {
+          admin.removeInstanceTag(clusterId, instanceName, tag);
+        }
+        break;
+      case enablePartitions:
+        admin.enablePartition(true, clusterId, instanceName,
+            node.get(PerInstanceProperties.resource.name()).textValue(),
+            (List<String>) OBJECT_MAPPER.readValue(
+                node.get(PerInstanceProperties.partitions.name()).toString(),
+                OBJECT_MAPPER.getTypeFactory()
+                    .constructCollectionType(List.class, String.class)));
+        break;
+      case disablePartitions:
+        admin.enablePartition(false, clusterId, instanceName,
+            node.get(PerInstanceProperties.resource.name()).textValue(),
+            (List<String>) OBJECT_MAPPER.readValue(
+                node.get(PerInstanceProperties.partitions.name()).toString(),
+                OBJECT_MAPPER.getTypeFactory()
+                    .constructCollectionType(List.class, String.class)));
+        break;
+      default:
+        LOG.error("Unsupported command :" + command);
+        return badRequest("Unsupported command :" + command);
     }
   } catch (Exception e) {
     LOG.error("Failed in updating instance : " + instanceName, e);
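
Note (illustrative sketch, not part of the patch): the swap flow exercised by the integration test above maps directly onto the admin calls it uses: add the SWAP_IN instance with the SWAP_OUT instance's logicalId and fault zone while disabled, set the SWAP_OUT operation on the old instance, enable the SWAP_IN instance, then check canCompleteSwap and call completeSwapIfPossible. The Java outline below assumes a reachable ZooKeeper at localhost:2181 and hypothetical cluster names, instance names, and domain keys ("zone", "logicalId"); the real keys come from the cluster's topology configuration.

import org.apache.helix.HelixAdmin;
import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.model.InstanceConfig;

public class NodeSwapSketch {
  public static void main(String[] args) {
    // Hypothetical handles and names; not taken from the patch.
    HelixAdmin admin = new ZKHelixAdmin("localhost:2181");
    String cluster = "TestCluster";
    String swapOutName = "host_12918";
    String swapInName = "host_12928";

    // Reuse the SWAP_OUT instance's logicalId and fault zone, as the test's addParticipant
    // helper does; only the host portion of the DOMAIN differs for the new node.
    InstanceConfig swapOutConfig = admin.getInstanceConfig(cluster, swapOutName);
    String zone = swapOutConfig.getDomainAsMap().get("zone");
    String logicalId = swapOutConfig.getLogicalId("logicalId");
    InstanceConfig swapInConfig = new InstanceConfig.Builder()
        .setDomain(String.format("zone=%s, host=%s, logicalId=%s", zone, swapInName, logicalId))
        .setInstanceEnabled(false) // join disabled until the swap is started
        .build(swapInName);
    admin.addInstance(cluster, swapInConfig);

    // Mark the existing instance SWAP_OUT, then enable the SWAP_IN instance to start the swap.
    admin.setInstanceOperation(cluster, swapOutName, InstanceConstants.InstanceOperation.SWAP_OUT);
    admin.enableInstance(cluster, swapInName, true);

    // Once the SWAP_IN instance has a counterpart for every replica on the SWAP_OUT instance,
    // either instance name can be used to check readiness and finish the swap.
    if (admin.canCompleteSwap(cluster, swapOutName)) {
      admin.completeSwapIfPossible(cluster, swapOutName);
    }
  }
}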