Skip to content

Commit

Permalink
Optimize duplicate logicalId filtering to only be called on allNodes …
Browse files Browse the repository at this point in the history
…and then used to remove duplicate logicalIds from enabledLiveNodes.
  • Loading branch information
zpinto committed Oct 26, 2023
1 parent 6d0a3f7 commit a6557cf
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,16 +114,16 @@ public IdealState computeNewIdealState(String resourceName,
allNodes = clusterData.getAllInstances();
}

allNodes = DelayedRebalanceUtil.filterOutInstancesWithDuplicateLogicalIds(
Set<String> allNodesDeduped = DelayedRebalanceUtil.filterOutInstancesWithDuplicateLogicalIds(
ClusterTopologyConfig.createFromClusterConfig(clusterConfig),
clusterData.getInstanceConfigMap(), allNodes, null);
liveEnabledNodes = DelayedRebalanceUtil.filterOutInstancesWithDuplicateLogicalIds(
ClusterTopologyConfig.createFromClusterConfig(clusterConfig),
clusterData.getInstanceConfigMap(), liveEnabledNodes, allNodes);
clusterData.getInstanceConfigMap(), allNodes);
// Remove the non-selected instances with duplicate logicalIds from liveEnabledNodes
// This ensures the same duplicate instance is kept in both allNodesDeduped and liveEnabledNodes
liveEnabledNodes.retainAll(allNodesDeduped);

long delay = DelayedRebalanceUtil.getRebalanceDelay(currentIdealState, clusterConfig);
Set<String> activeNodes = DelayedRebalanceUtil
.getActiveNodes(allNodes, currentIdealState, liveEnabledNodes,
.getActiveNodes(allNodesDeduped, currentIdealState, liveEnabledNodes,
clusterData.getInstanceOfflineTimeMap(), clusterData.getLiveInstances().keySet(),
clusterData.getInstanceConfigMap(), delay, clusterConfig);
if (delayRebalanceEnabled) {
Expand All @@ -135,11 +135,11 @@ public IdealState computeNewIdealState(String resourceName,
clusterConfig, _manager);
}

if (allNodes.isEmpty() || activeNodes.isEmpty()) {
if (allNodesDeduped.isEmpty() || activeNodes.isEmpty()) {
LOG.error(String.format(
"No instances or active instances available for resource %s, "
+ "allInstances: %s, liveInstances: %s, activeInstances: %s",
resourceName, allNodes, liveEnabledNodes, activeNodes));
resourceName, allNodesDeduped, liveEnabledNodes, activeNodes));
return generateNewIdealState(resourceName, currentIdealState,
emptyMapping(currentIdealState));
}
Expand All @@ -166,7 +166,7 @@ public IdealState computeNewIdealState(String resourceName,
stateCountMap, maxPartition);

// sort node lists to ensure consistent preferred assignments
List<String> allNodeList = new ArrayList<>(allNodes);
List<String> allNodeList = new ArrayList<>(allNodesDeduped);
// We will not assign partitions to instances with the evacuation or swap-out tag.
// TODO: Currently we have 2 groups of instances and compute preference list twice and merge.
// Eventually we want to have exclusive groups of instance for different instance tag.
Expand Down Expand Up @@ -215,7 +215,7 @@ public IdealState computeNewIdealState(String resourceName,
LOG.debug("stateCountMap: {}", stateCountMap);
LOG.debug("liveEnabledNodes: {}", liveEnabledNodes);
LOG.debug("activeNodes: {}", activeNodes);
LOG.debug("allNodes: {}", allNodes);
LOG.debug("allNodes: {}", allNodesDeduped);
LOG.debug("maxPartition: {}", maxPartition);
LOG.debug("newIdealMapping: {}", newIdealMapping);
LOG.debug("finalMapping: {}", finalMapping);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,25 +147,16 @@ public static Set<String> filterOutEvacuatingInstances(Map<String, InstanceConfi
* assignable. If there are duplicates with one node having no InstanceOperation and the other
* having SWAP_OUT, the node with no InstanceOperation will be chosen. This signifies SWAP
* completion, therefore making the node assignable.
* <p>
* Additionally, we never want to add a node that is not in the allowedInstances set if it is
* provided. allowedInstances should contain the filtered and deduped superset of nodes such as
* allInstances. If this filter method is called on a subset of allInstances, it should not ever
* keep an instance with a duplicate logical ID that was not previously returned when filtering
* the superset. This is important for cases where allEnabledLiveInstances is filtered
* and the SWAP_OUT node is not enabled or live. We must prevent the SWAP_IN node from passing
* through the filter.
*
* @param clusterTopologyConfig the cluster topology configuration
* @param instanceConfigMap the map of instance name to corresponding InstanceConfig
* @param instances the set of instances to filter out duplicate logicalIDs for
* @param allowedInstances the set of instances that are allowed to pass through the filter
* @return the set of instances with duplicate logicalIDs filtered out, there will only be one
* instance per logicalID
*/
public static Set<String> filterOutInstancesWithDuplicateLogicalIds(
ClusterTopologyConfig clusterTopologyConfig, Map<String, InstanceConfig> instanceConfigMap,
Set<String> instances, Set<String> allowedInstances) {
Set<String> instances) {
Set<String> filteredNodes = new HashSet<>();
Map<String, String> filteredInstancesByLogicalId = new HashMap<>();

Expand All @@ -177,11 +168,6 @@ public static Set<String> filterOutInstancesWithDuplicateLogicalIds(
String thisLogicalId =
thisInstanceConfig.getLogicalId(clusterTopologyConfig.getEndNodeType());

// We do not want to allow the addition of nodes that are not in the allowedInstances set if it is provided.
if (allowedInstances != null && !allowedInstances.contains(node)) {
return;
}

if (filteredInstancesByLogicalId.containsKey(thisLogicalId)) {
InstanceConfig filteredDuplicateInstanceConfig =
instanceConfigMap.get(filteredInstancesByLogicalId.get(thisLogicalId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ private void doGlobalRebalance(ResourceControllerDataProvider clusterData, Map<S
// with both the SWAP_OUT and SWAP_IN node.
DelayedRebalanceUtil.filterOutInstancesWithDuplicateLogicalIds(
ClusterTopologyConfig.createFromClusterConfig(clusterData.getClusterConfig()),
clusterData.getInstanceConfigMap(), clusterData.getAllInstances(), null),
clusterData.getInstanceConfigMap(), clusterData.getAllInstances()),
clusterChanges, currentBaseline);
} catch (Exception ex) {
throw new HelixRebalanceException("Failed to generate cluster model for global rebalance.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,12 +305,11 @@ private Map<String, IdealState> computeBestPossibleStates(

Set<String> allNodesDeduped = DelayedRebalanceUtil.filterOutInstancesWithDuplicateLogicalIds(
ClusterTopologyConfig.createFromClusterConfig(clusterData.getClusterConfig()),
clusterData.getInstanceConfigMap(), clusterData.getAllInstances(), null);
Set<String> liveEnabledNodesDeduped =
DelayedRebalanceUtil.filterOutInstancesWithDuplicateLogicalIds(
ClusterTopologyConfig.createFromClusterConfig(clusterData.getClusterConfig()),
clusterData.getInstanceConfigMap(), clusterData.getEnabledLiveInstances(),
allNodesDeduped);
clusterData.getInstanceConfigMap(), clusterData.getAllInstances());
// Remove the non-selected instances with duplicate logicalIds from liveEnabledNodes
// This ensures the same duplicate instance is kept in both allNodesDeduped and liveEnabledNodes
Set<String> liveEnabledNodesDeduped = clusterData.getEnabledLiveInstances();
liveEnabledNodesDeduped.retainAll(allNodesDeduped);

Set<String> activeNodes =
DelayedRebalanceUtil.getActiveNodes(allNodesDeduped, liveEnabledNodesDeduped,
Expand Down Expand Up @@ -422,15 +421,13 @@ private Map<String, ResourceAssignment> handleDelayedRebalanceMinActiveReplica(
RebalanceAlgorithm algorithm) throws HelixRebalanceException {
// the "real" live nodes at the time
// TODO: this is a hacky way to filter out on-operation instances. We should consider redesigning `getEnabledLiveInstances()`.
final Set<String> allNodes = DelayedRebalanceUtil.filterOutInstancesWithDuplicateLogicalIds(
final Set<String> allNodesDeduped = DelayedRebalanceUtil.filterOutInstancesWithDuplicateLogicalIds(
ClusterTopologyConfig.createFromClusterConfig(clusterData.getClusterConfig()),
clusterData.getInstanceConfigMap(), clusterData.getAllInstances(), null);
final Set<String> enabledLiveInstances =
DelayedRebalanceUtil.filterOutEvacuatingInstances(clusterData.getInstanceConfigMap(),
DelayedRebalanceUtil.filterOutInstancesWithDuplicateLogicalIds(
ClusterTopologyConfig.createFromClusterConfig(clusterData.getClusterConfig()),
clusterData.getInstanceConfigMap(), clusterData.getEnabledLiveInstances(),
allNodes));
clusterData.getInstanceConfigMap(), clusterData.getAllInstances());
final Set<String> enabledLiveInstances = clusterData.getEnabledLiveInstances();
// Remove the non-selected instances with duplicate logicalIds from liveEnabledNodes
// This ensures the same duplicate instance is kept in both allNodesDeduped and liveEnabledNodes
enabledLiveInstances.retainAll(allNodesDeduped);

if (activeNodes.equals(enabledLiveInstances) || !requireRebalanceOverwrite(clusterData, currentResourceAssignment)) {
// no need for additional process, return the current resource assignment
Expand Down Expand Up @@ -647,13 +644,11 @@ protected boolean requireRebalanceOverwrite(ResourceControllerDataProvider clust

Set<String> allNodesDeduped = DelayedRebalanceUtil.filterOutInstancesWithDuplicateLogicalIds(
ClusterTopologyConfig.createFromClusterConfig(clusterData.getClusterConfig()),
clusterData.getInstanceConfigMap(), clusterData.getAllInstances(), null);
Set<String> enabledLiveInstances =
DelayedRebalanceUtil.filterOutEvacuatingInstances(clusterData.getInstanceConfigMap(),
DelayedRebalanceUtil.filterOutInstancesWithDuplicateLogicalIds(
ClusterTopologyConfig.createFromClusterConfig(clusterData.getClusterConfig()),
clusterData.getInstanceConfigMap(), clusterData.getEnabledLiveInstances(),
allNodesDeduped));
clusterData.getInstanceConfigMap(), clusterData.getAllInstances());
Set<String> enabledLiveInstances = clusterData.getEnabledLiveInstances();
// Remove the non-selected instances with duplicate logicalIds from liveEnabledNodes
// This ensures the same duplicate instance is kept in both allNodesDeduped and liveEnabledNodes
enabledLiveInstances.retainAll(allNodesDeduped);

int numReplica = currentIdealState.getReplicaCount(enabledLiveInstances.size());
int minActiveReplica = DelayedRebalanceUtil.getMinActiveReplica(ResourceConfig
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,8 @@ public void testNodeSwapNoTopologySetup() throws Exception {
addParticipant(instanceToSwapInName, instanceToSwapOutInstanceConfig.getLogicalId(LOGICAL_ID),
instanceToSwapOutInstanceConfig.getDomainAsMap().get(ZONE),
InstanceConstants.InstanceOperation.SWAP_IN, true);

Assert.assertTrue(_clusterVerifier.verifyByPolling());
}

@Test(dependsOnMethods = "testNodeSwapNoTopologySetup")
Expand Down

0 comments on commit a6557cf

Please sign in to comment.