From 5dc2e798ce4c01264456f83620d5fb90580f6555 Mon Sep 17 00:00:00 2001 From: Zachary Pinto Date: Tue, 12 Dec 2023 19:19:54 -0800 Subject: [PATCH] Change all rebalancer strategies to create Topology without additional non-FaultZone or EndNode levels of the tree. This will allow for swap to work in clusters where the non-FaultZone or EndNode domain kv pairs don't directly match the swapping node. --- ...ractEvenDistributionRebalanceStrategy.java | 2 +- .../strategy/CrushRebalanceStrategy.java | 2 +- .../MultiRoundCrushRebalanceStrategy.java | 2 +- .../rebalancer/topology/Topology.java | 42 +++++++++++++++++-- .../rebalancer/TestInstanceOperation.java | 18 ++++---- 5 files changed, 53 insertions(+), 13 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AbstractEvenDistributionRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AbstractEvenDistributionRebalanceStrategy.java index 7750bd70b0..c10a6a9ab2 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AbstractEvenDistributionRebalanceStrategy.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AbstractEvenDistributionRebalanceStrategy.java @@ -117,7 +117,7 @@ private ZNRecord computeBestPartitionAssignment(List allNodes, List> finalPartitionMap = null; Topology allNodeTopo = new Topology(allNodes, allNodes, clusterData.getAssignableInstanceConfigMap(), - clusterData.getClusterConfig()); + clusterData.getClusterConfig(), true); // Transform current assignment to instance->partitions map, and get total partitions Map> nodeToPartitionMap = convertPartitionMap(origPartitionMap, allNodeTopo); diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java index 08bbaaffa0..011da67772 100644 --- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java @@ -77,7 +77,7 @@ public ZNRecord computePartitionAssignment(final List allNodes, ResourceControllerDataProvider clusterData) throws HelixException { Map instanceConfigMap = clusterData.getAssignableInstanceConfigMap(); _clusterTopo = - new Topology(allNodes, liveNodes, instanceConfigMap, clusterData.getClusterConfig()); + new Topology(allNodes, liveNodes, instanceConfigMap, clusterData.getClusterConfig(), true); Node topNode = _clusterTopo.getRootNode(); // for log only diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/MultiRoundCrushRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/MultiRoundCrushRebalanceStrategy.java index a53257f3dd..96ddfa485d 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/MultiRoundCrushRebalanceStrategy.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/MultiRoundCrushRebalanceStrategy.java @@ -84,7 +84,7 @@ public ZNRecord computePartitionAssignment(final List allNodes, ResourceControllerDataProvider clusterData) throws HelixException { Map instanceConfigMap = clusterData.getAssignableInstanceConfigMap(); _clusterTopo = - new Topology(allNodes, liveNodes, instanceConfigMap, clusterData.getClusterConfig()); + new Topology(allNodes, liveNodes, instanceConfigMap, clusterData.getClusterConfig(), true); Node root = _clusterTopo.getRootNode(); Map> zoneMapping = new HashMap<>(); diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java index 4d4ebabd11..335c30fdf2 100644 --- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java @@ -23,6 +23,7 @@ import java.security.NoSuchAlgorithmException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -57,8 +58,19 @@ public enum Types { private final Map _instanceConfigMap; private final ClusterTopologyConfig _clusterTopologyConfig; + /** + * Create a Topology for a cluster. + * + * @param allNodes allNodes of the given cluster. + * @param liveNodes liveNodes of the given cluster. + * @param instanceConfigMap instanceConfigMap of the given cluster. + * @param clusterConfig clusterConfig of the given cluster. + * @param faultZoneLevelOnly if true, omit the additional non-faultZone level nodes from the + * topology tree above the end-nodes. + */ public Topology(final List allNodes, final List liveNodes, - final Map instanceConfigMap, ClusterConfig clusterConfig) { + final Map instanceConfigMap, ClusterConfig clusterConfig, + boolean faultZoneLevelOnly) { try { _md = MessageDigest.getInstance("SHA-1"); } catch (NoSuchAlgorithmException ex) { @@ -73,7 +85,20 @@ public Topology(final List allNodes, final List liveNodes, _allInstances.removeAll(_instanceConfigMap.keySet()))); } _clusterTopologyConfig = ClusterTopologyConfig.createFromClusterConfig(clusterConfig); - _root = createClusterTree(clusterConfig); + _root = createClusterTree(clusterConfig, faultZoneLevelOnly); + } + + /** + * Create a Topology for a cluster. faultZoneLevelOnly is set to false by default. + * + * @param allNodes allNodes of the given cluster. + * @param liveNodes liveNodes of the given cluster. + * @param instanceConfigMap instanceConfigMap of the given cluster. + * @param clusterConfig clusterConfig of the given cluster. 
+ */ + public Topology(final List allNodes, final List liveNodes, + final Map instanceConfigMap, ClusterConfig clusterConfig) { + this(allNodes, liveNodes, instanceConfigMap, clusterConfig, false); } public String getEndNodeType() { @@ -149,13 +174,18 @@ private static Node cloneTree(Node root, Map newNodeWeight, return newRoot; } - private Node createClusterTree(ClusterConfig clusterConfig) { + private Node createClusterTree(ClusterConfig clusterConfig, boolean faultZoneLevelOnly) { // root Node root = new Node(); root.setName("root"); root.setId(computeId("root")); root.setType(Types.ROOT.name()); + Set unnecessaryTopoKeys = + new HashSet<>(_clusterTopologyConfig.getTopologyKeyDefaultValue().keySet()); + unnecessaryTopoKeys.remove(_clusterTopologyConfig.getFaultZoneType()); + unnecessaryTopoKeys.remove(_clusterTopologyConfig.getEndNodeType()); + // TODO: Currently we add disabled instance to the topology tree. Since they are not considered // TODO: in rebalance, maybe we should skip adding them to the tree for consistence. for (String instanceName : _allInstances) { @@ -167,6 +197,12 @@ private Node createClusterTree(ClusterConfig clusterConfig) { if (weight < 0 || weight == InstanceConfig.WEIGHT_NOT_SET) { weight = DEFAULT_NODE_WEIGHT; } + + if (faultZoneLevelOnly) { + // Remove unnecessary keys from the topology map. We do not need to use these to build more layers in + // the topology tree. The topology tree only requires FaultZoneType and EndNodeType. 
+ unnecessaryTopoKeys.forEach(instanceTopologyMap::remove); + } addEndNode(root, instanceName, instanceTopologyMap, weight, _liveInstances); } catch (IllegalArgumentException e) { if (InstanceValidationUtil.isInstanceEnabled(insConfig, clusterConfig)) { diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java index 3f0aa5d9ec..854b0f5371 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java @@ -10,6 +10,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -58,9 +59,10 @@ public class TestInstanceOperation extends ZkTestBase { - protected final int NUM_NODE = 6; + private final int ZONE_COUNT = 4; + protected final int NUM_NODE = 10; protected static final int START_PORT = 12918; - protected static final int PARTITIONS = 20; + protected static final int PARTITIONS = 30; protected final String CLASS_NAME = getShortClassName(); protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME; @@ -211,8 +213,9 @@ private void resetInstances() { // Drop bad instance from the cluster. 
_gSetupTool.getClusterManagementTool() .dropInstance(CLUSTER_NAME, _gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, _participantNames.get(i))); - _participants.set(i, createParticipant(_participantNames.get(i), Integer.toString(i), - "zone_" + i, null, true, -1)); + _participants.set(i, + createParticipant(_participantNames.get(i), UUID.randomUUID().toString(), + "zone_" + i % ZONE_COUNT, null, true, -1)); _participants.get(i).syncStart(); continue; } @@ -1207,8 +1210,9 @@ public void testNodeSwapAddSwapInFirst() { private MockParticipantManager createParticipant(String participantName, String logicalId, String zone, InstanceConstants.InstanceOperation instanceOperation, boolean enabled, int capacity) { + UUID host = UUID.randomUUID(); InstanceConfig config = new InstanceConfig.Builder().setDomain( - String.format("%s=%s, %s=%s, %s=%s", ZONE, zone, HOST, participantName, LOGICAL_ID, + String.format("%s=%s, %s=%s, %s=%s", ZONE, zone, HOST, host, LOGICAL_ID, logicalId)).setInstanceEnabled(enabled).setInstanceOperation(instanceOperation) .build(participantName); if (capacity >= 0) { @@ -1236,8 +1240,8 @@ private void addParticipant(String participantName, String logicalId, String zon } private void addParticipant(String participantName) { - addParticipant(participantName, Integer.toString(_participants.size()), - "zone_" + _participants.size(), null, true, -1); + addParticipant(participantName, UUID.randomUUID().toString(), + "zone_" + _participants.size() % ZONE_COUNT, null, true, -1); } private void createTestDBs(long delayTime) throws InterruptedException {