From a6557cfc2ee89d5a9c7afe5d98f5fc6d214c2473 Mon Sep 17 00:00:00 2001 From: Zachary Pinto Date: Thu, 26 Oct 2023 09:42:53 -0400 Subject: [PATCH] Optimize duplicate logicalId filtering to only be called on allNodes and then used to remove duplicate logicalIds from enabledLiveNodes. --- .../rebalancer/DelayedAutoRebalancer.java | 20 +++++----- .../rebalancer/util/DelayedRebalanceUtil.java | 16 +------- .../waged/GlobalRebalanceRunner.java | 2 +- .../rebalancer/waged/WagedRebalancer.java | 37 ++++++++----------- .../rebalancer/TestInstanceOperation.java | 2 + 5 files changed, 30 insertions(+), 47 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java index d7579ac008..b7d5134577 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java @@ -114,16 +114,16 @@ public IdealState computeNewIdealState(String resourceName, allNodes = clusterData.getAllInstances(); } - allNodes = DelayedRebalanceUtil.filterOutInstancesWithDuplicateLogicalIds( + Set allNodesDeduped = DelayedRebalanceUtil.filterOutInstancesWithDuplicateLogicalIds( ClusterTopologyConfig.createFromClusterConfig(clusterConfig), - clusterData.getInstanceConfigMap(), allNodes, null); - liveEnabledNodes = DelayedRebalanceUtil.filterOutInstancesWithDuplicateLogicalIds( - ClusterTopologyConfig.createFromClusterConfig(clusterConfig), - clusterData.getInstanceConfigMap(), liveEnabledNodes, allNodes); + clusterData.getInstanceConfigMap(), allNodes); + // Remove the non-selected instances with duplicate logicalIds from liveEnabledNodes + // This ensures the same duplicate instance is kept in both allNodesDeduped and liveEnabledNodes + liveEnabledNodes.retainAll(allNodesDeduped); long delay = DelayedRebalanceUtil.getRebalanceDelay(currentIdealState, clusterConfig); Set activeNodes = DelayedRebalanceUtil - .getActiveNodes(allNodes, currentIdealState, liveEnabledNodes, + .getActiveNodes(allNodesDeduped, currentIdealState, liveEnabledNodes, clusterData.getInstanceOfflineTimeMap(), clusterData.getLiveInstances().keySet(), clusterData.getInstanceConfigMap(), delay, clusterConfig); if (delayRebalanceEnabled) { @@ -135,11 +135,11 @@ public IdealState computeNewIdealState(String resourceName, clusterConfig, _manager); } - if (allNodes.isEmpty() || activeNodes.isEmpty()) { + if (allNodesDeduped.isEmpty() || activeNodes.isEmpty()) { LOG.error(String.format( "No instances or active instances available for resource %s, " + "allInstances: %s, liveInstances: %s, activeInstances: %s", - resourceName, allNodes, liveEnabledNodes, activeNodes)); + resourceName, allNodesDeduped, liveEnabledNodes, activeNodes)); return generateNewIdealState(resourceName, currentIdealState, emptyMapping(currentIdealState)); } @@ -166,7 +166,7 @@ public IdealState computeNewIdealState(String resourceName, stateCountMap, maxPartition); // sort node lists to ensure consistent preferred assignments - List allNodeList = new ArrayList<>(allNodes); + List allNodeList = new ArrayList<>(allNodesDeduped); // We will not assign partition to instances with evacuation and wap-out tag. // TODO: Currently we have 2 groups of instances and compute preference list twice and merge. // Eventually we want to have exclusive groups of instance for different instance tag. @@ -215,7 +215,7 @@ public IdealState computeNewIdealState(String resourceName, LOG.debug("stateCountMap: {}", stateCountMap); LOG.debug("liveEnabledNodes: {}", liveEnabledNodes); LOG.debug("activeNodes: {}", activeNodes); - LOG.debug("allNodes: {}", allNodes); + LOG.debug("allNodes: {}", allNodesDeduped); LOG.debug("maxPartition: {}", maxPartition); LOG.debug("newIdealMapping: {}", newIdealMapping); LOG.debug("finalMapping: {}", finalMapping); diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java index 5bc9a0e8a9..3d7b7cc9d5 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/DelayedRebalanceUtil.java @@ -147,25 +147,16 @@ public static Set filterOutEvacuatingInstances(Map - * Additionally, we never want to add a node that is not in the allowedInstances set if it is - * provided. allowedInstances should contain the filtered and deduped superset of nodes such as - * allInstances. If this filter method is called on a subset of allInstances, it should not ever - * keep an instance with a duplicate logical ID that was not previously returned when filtering - * the superset. This is important for cases where filtering allEnabledLiveInstances is filtered - * and the SWAP_OUT node is not enabled or live. We must prevent the SWAP_IN node from passing - * through the filter. * * @param clusterTopologyConfig the cluster topology configuration * @param instanceConfigMap the map of instance name to corresponding InstanceConfig * @param instances the set of instances to filter out duplicate logicalIDs for - * @param allowedInstances the set of instances that are allowed to pass through the filter * @return the set of instances with duplicate logicalIDs filtered out, there will only be one * instance per logicalID */ public static Set filterOutInstancesWithDuplicateLogicalIds( ClusterTopologyConfig clusterTopologyConfig, Map instanceConfigMap, - Set instances, Set allowedInstances) { + Set instances) { Set filteredNodes = new HashSet<>(); Map filteredInstancesByLogicalId = new HashMap<>(); @@ -177,11 +168,6 @@ public static Set filterOutInstancesWithDuplicateLogicalIds( String thisLogicalId = thisInstanceConfig.getLogicalId(clusterTopologyConfig.getEndNodeType()); - // We do not want to allow the addition of nodes that are not in the allowedInstances set it is provided. - if (allowedInstances != null && !allowedInstances.contains(node)) { - return; - } - if (filteredInstancesByLogicalId.containsKey(thisLogicalId)) { InstanceConfig filteredDuplicateInstanceConfig = instanceConfigMap.get(filteredInstancesByLogicalId.get(thisLogicalId)); diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/GlobalRebalanceRunner.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/GlobalRebalanceRunner.java index ee99dd6af2..6c199bc1be 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/GlobalRebalanceRunner.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/GlobalRebalanceRunner.java @@ -172,7 +172,7 @@ private void doGlobalRebalance(ResourceControllerDataProvider clusterData, Map computeBestPossibleStates( Set allNodesDeduped = DelayedRebalanceUtil.filterOutInstancesWithDuplicateLogicalIds( ClusterTopologyConfig.createFromClusterConfig(clusterData.getClusterConfig()), - clusterData.getInstanceConfigMap(), clusterData.getAllInstances(), null); - Set liveEnabledNodesDeduped = - DelayedRebalanceUtil.filterOutInstancesWithDuplicateLogicalIds( - ClusterTopologyConfig.createFromClusterConfig(clusterData.getClusterConfig()), - clusterData.getInstanceConfigMap(), clusterData.getEnabledLiveInstances(), - allNodesDeduped); + clusterData.getInstanceConfigMap(), clusterData.getAllInstances()); + // Remove the non-selected instances with duplicate logicalIds from liveEnabledNodes + // This ensures the same duplicate instance is kept in both allNodesDeduped and liveEnabledNodes + Set liveEnabledNodesDeduped = clusterData.getEnabledLiveInstances(); + liveEnabledNodesDeduped.retainAll(allNodesDeduped); Set activeNodes = DelayedRebalanceUtil.getActiveNodes(allNodesDeduped, liveEnabledNodesDeduped, @@ -422,15 +421,13 @@ private Map handleDelayedRebalanceMinActiveReplica( RebalanceAlgorithm algorithm) throws HelixRebalanceException { // the "real" live nodes at the time // TODO: this is a hacky way to filter our on operation instance. We should consider redesign `getEnabledLiveInstances()`. - final Set allNodes = DelayedRebalanceUtil.filterOutInstancesWithDuplicateLogicalIds( + final Set allNodesDeduped = DelayedRebalanceUtil.filterOutInstancesWithDuplicateLogicalIds( ClusterTopologyConfig.createFromClusterConfig(clusterData.getClusterConfig()), - clusterData.getInstanceConfigMap(), clusterData.getAllInstances(), null); - final Set enabledLiveInstances = - DelayedRebalanceUtil.filterOutEvacuatingInstances(clusterData.getInstanceConfigMap(), - DelayedRebalanceUtil.filterOutInstancesWithDuplicateLogicalIds( - ClusterTopologyConfig.createFromClusterConfig(clusterData.getClusterConfig()), - clusterData.getInstanceConfigMap(), clusterData.getEnabledLiveInstances(), - allNodes)); + clusterData.getInstanceConfigMap(), clusterData.getAllInstances()); + final Set enabledLiveInstances = clusterData.getEnabledLiveInstances(); + // Remove the non-selected instances with duplicate logicalIds from liveEnabledNodes + // This ensures the same duplicate instance is kept in both allNodesDeduped and liveEnabledNodes + enabledLiveInstances.retainAll(allNodesDeduped); if (activeNodes.equals(enabledLiveInstances) || !requireRebalanceOverwrite(clusterData, currentResourceAssignment)) { // no need for additional process, return the current resource assignment @@ -647,13 +644,11 @@ protected boolean requireRebalanceOverwrite(ResourceControllerDataProvider clust Set allNodesDeduped = DelayedRebalanceUtil.filterOutInstancesWithDuplicateLogicalIds( ClusterTopologyConfig.createFromClusterConfig(clusterData.getClusterConfig()), - clusterData.getInstanceConfigMap(), clusterData.getAllInstances(), null); - Set enabledLiveInstances = - DelayedRebalanceUtil.filterOutEvacuatingInstances(clusterData.getInstanceConfigMap(), - DelayedRebalanceUtil.filterOutInstancesWithDuplicateLogicalIds( - ClusterTopologyConfig.createFromClusterConfig(clusterData.getClusterConfig()), - clusterData.getInstanceConfigMap(), clusterData.getEnabledLiveInstances(), - allNodesDeduped)); + clusterData.getInstanceConfigMap(), clusterData.getAllInstances()); + Set enabledLiveInstances = clusterData.getEnabledLiveInstances(); + // Remove the non-selected instances with duplicate logicalIds from liveEnabledNodes + // This ensures the same duplicate instance is kept in both allNodesDeduped and liveEnabledNodes + enabledLiveInstances.retainAll(allNodesDeduped); int numReplica = currentIdealState.getReplicaCount(enabledLiveInstances.size()); int minActiveReplica = DelayedRebalanceUtil.getMinActiveReplica(ResourceConfig diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java index d3fc16be7f..8bec5f266e 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java @@ -586,6 +586,8 @@ public void testNodeSwapNoTopologySetup() throws Exception { addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID), instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE), InstanceConstants.InstanceOperation.SWAP_IN, true); + + Assert.assertTrue(_clusterVerifier.verifyByPolling()); } @Test(dependsOnMethods = "testNodeSwapNoTopologySetup")