diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AbstractEvenDistributionRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AbstractEvenDistributionRebalanceStrategy.java index 7750bd70b0..c10a6a9ab2 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AbstractEvenDistributionRebalanceStrategy.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AbstractEvenDistributionRebalanceStrategy.java @@ -117,7 +117,7 @@ private ZNRecord computeBestPartitionAssignment(List allNodes, List> finalPartitionMap = null; Topology allNodeTopo = new Topology(allNodes, allNodes, clusterData.getAssignableInstanceConfigMap(), - clusterData.getClusterConfig()); + clusterData.getClusterConfig(), true); // Transform current assignment to instance->partitions map, and get total partitions Map> nodeToPartitionMap = convertPartitionMap(origPartitionMap, allNodeTopo); diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java index 08bbaaffa0..011da67772 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java @@ -77,7 +77,7 @@ public ZNRecord computePartitionAssignment(final List allNodes, ResourceControllerDataProvider clusterData) throws HelixException { Map instanceConfigMap = clusterData.getAssignableInstanceConfigMap(); _clusterTopo = - new Topology(allNodes, liveNodes, instanceConfigMap, clusterData.getClusterConfig()); + new Topology(allNodes, liveNodes, instanceConfigMap, clusterData.getClusterConfig(), true); Node topNode = _clusterTopo.getRootNode(); // for log only diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/MultiRoundCrushRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/MultiRoundCrushRebalanceStrategy.java index a53257f3dd..96ddfa485d 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/MultiRoundCrushRebalanceStrategy.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/MultiRoundCrushRebalanceStrategy.java @@ -84,7 +84,7 @@ public ZNRecord computePartitionAssignment(final List allNodes, ResourceControllerDataProvider clusterData) throws HelixException { Map instanceConfigMap = clusterData.getAssignableInstanceConfigMap(); _clusterTopo = - new Topology(allNodes, liveNodes, instanceConfigMap, clusterData.getClusterConfig()); + new Topology(allNodes, liveNodes, instanceConfigMap, clusterData.getClusterConfig(), true); Node root = _clusterTopo.getRootNode(); Map> zoneMapping = new HashMap<>(); diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java index 4d4ebabd11..335c30fdf2 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java @@ -23,6 +23,7 @@ import java.security.NoSuchAlgorithmException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -57,8 +58,19 @@ public enum Types { private final Map _instanceConfigMap; private final ClusterTopologyConfig _clusterTopologyConfig; + /** + * Create a Topology for a cluster. + * + * @param allNodes allNodes of the given cluster. + * @param liveNodes liveNodes of the given cluster. + * @param instanceConfigMap instanceConfigMap of the given cluster. + * @param clusterConfig clusterConfig of the given cluster. + * @param faultZoneLevelOnly whether to include additional non-faultZone level nodes in the + * topology tree above the end-nodes. + */ public Topology(final List allNodes, final List liveNodes, - final Map instanceConfigMap, ClusterConfig clusterConfig) { + final Map instanceConfigMap, ClusterConfig clusterConfig, + boolean faultZoneLevelOnly) { try { _md = MessageDigest.getInstance("SHA-1"); } catch (NoSuchAlgorithmException ex) { @@ -73,7 +85,20 @@ public Topology(final List allNodes, final List liveNodes, _allInstances.removeAll(_instanceConfigMap.keySet()))); } _clusterTopologyConfig = ClusterTopologyConfig.createFromClusterConfig(clusterConfig); - _root = createClusterTree(clusterConfig); + _root = createClusterTree(clusterConfig, faultZoneLevelOnly); + } + + /** + * Create a Topology for a cluster. faultZoneLevelOnly is set to false by default. + * + * @param allNodes allNodes of the given cluster. + * @param liveNodes liveNodes of the given cluster. + * @param instanceConfigMap instanceConfigMap of the given cluster. + * @param clusterConfig clusterConfig of the given cluster. + */ + public Topology(final List allNodes, final List liveNodes, + final Map instanceConfigMap, ClusterConfig clusterConfig) { + this(allNodes, liveNodes, instanceConfigMap, clusterConfig, false); } public String getEndNodeType() { @@ -149,13 +174,18 @@ private static Node cloneTree(Node root, Map newNodeWeight, return newRoot; } - private Node createClusterTree(ClusterConfig clusterConfig) { + private Node createClusterTree(ClusterConfig clusterConfig, boolean faultZoneLevelOnly) { // root Node root = new Node(); root.setName("root"); root.setId(computeId("root")); root.setType(Types.ROOT.name()); + Set unnecessaryTopoKeys = + new HashSet<>(_clusterTopologyConfig.getTopologyKeyDefaultValue().keySet()); + unnecessaryTopoKeys.remove(_clusterTopologyConfig.getFaultZoneType()); + unnecessaryTopoKeys.remove(_clusterTopologyConfig.getEndNodeType()); + // TODO: Currently we add disabled instance to the topology tree. Since they are not considered // TODO: in rebalance, maybe we should skip adding them to the tree for consistence. for (String instanceName : _allInstances) { @@ -167,6 +197,12 @@ private Node createClusterTree(ClusterConfig clusterConfig) { if (weight < 0 || weight == InstanceConfig.WEIGHT_NOT_SET) { weight = DEFAULT_NODE_WEIGHT; } + + if (faultZoneLevelOnly) { + // Remove unnecessary keys from the topology map. We do not need to use these to build more layers in + // the topology tree. The topology tree only requires FaultZoneType and EndNodeType. + unnecessaryTopoKeys.forEach(instanceTopologyMap::remove); + } addEndNode(root, instanceName, instanceTopologyMap, weight, _liveInstances); } catch (IllegalArgumentException e) { if (InstanceValidationUtil.isInstanceEnabled(insConfig, clusterConfig)) { diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java index 69fec9b2ca..dc825f2f78 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java @@ -213,7 +213,10 @@ private static ClusterModel generateClusterModel(ResourceControllerDataProvider new InstanceConfig(instanceName)) .getLogicalId(clusterTopologyConfig.getEndNodeType())).collect(Collectors.toSet()); - Set assignableLiveInstanceNames = dataProvider.getAssignableLiveInstances().keySet(); + // TODO: Figure out why streaming the keySet directly in rare cases causes ConcurrentModificationException + // In theory, this should not be happening since cache refresh is at beginning of the pipeline, so could be some other reason. + // For now, we just copy the keySet to a new HashSet to avoid the exception. + Set assignableLiveInstanceNames = new HashSet<>(dataProvider.getAssignableLiveInstances().keySet()); Set assignableLiveInstanceLogicalIds = assignableLiveInstanceNames.stream().map( instanceName -> assignableInstanceConfigMap.getOrDefault(instanceName, diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java index 3f0aa5d9ec..7cd08d86fa 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java @@ -10,6 +10,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -58,7 +59,8 @@ public class TestInstanceOperation extends ZkTestBase { - protected final int NUM_NODE = 6; + private final int ZONE_COUNT = 4; + protected final int NUM_NODE = 10; protected static final int START_PORT = 12918; protected static final int PARTITIONS = 20; @@ -145,6 +147,13 @@ public void beforeClass() throws Exception { @AfterClass public void afterClass() { + // Drop all DBs + for (String db : _allDBs) { + _gSetupTool.dropResourceFromCluster(CLUSTER_NAME, db); + } + + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + for (MockParticipantManager p : _participants) { p.syncStop(); } @@ -208,13 +217,9 @@ private void resetInstances() { for (int i = 0; i < _participants.size(); i++) { // If instance is not connected to ZK, replace it if (!_participants.get(i).isConnected()) { - // Drop bad instance from the cluster. - _gSetupTool.getClusterManagementTool() - .dropInstance(CLUSTER_NAME, _gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, _participantNames.get(i))); - _participants.set(i, createParticipant(_participantNames.get(i), Integer.toString(i), - "zone_" + i, null, true, -1)); + // Replace the stopped participant with a new one and inherit the old instance config. + _participants.set(i, createParticipant(_participantNames.get(i))); _participants.get(i).syncStart(); - continue; } _gSetupTool.getClusterManagementTool() .setInstanceOperation(CLUSTER_NAME, _participantNames.get(i), null); @@ -1205,17 +1210,7 @@ public void testNodeSwapAddSwapInFirst() { Collections.emptySet(), Set.of(instanceToSwapInName)); } - private MockParticipantManager createParticipant(String participantName, String logicalId, String zone, - InstanceConstants.InstanceOperation instanceOperation, boolean enabled, int capacity) { - InstanceConfig config = new InstanceConfig.Builder().setDomain( - String.format("%s=%s, %s=%s, %s=%s", ZONE, zone, HOST, participantName, LOGICAL_ID, - logicalId)).setInstanceEnabled(enabled).setInstanceOperation(instanceOperation) - .build(participantName); - if (capacity >= 0) { - config.setInstanceCapacityMap(Map.of(TEST_CAPACITY_KEY, capacity)); - } - _gSetupTool.getClusterManagementTool().addInstance(CLUSTER_NAME, config); - + private MockParticipantManager createParticipant(String participantName) { // start dummy participants MockParticipantManager participant = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, participantName); StateMachineEngine stateMachine = participant.getStateMachineEngine(); @@ -1227,8 +1222,17 @@ private MockParticipantManager createParticipant(String participantName, String private void addParticipant(String participantName, String logicalId, String zone, InstanceConstants.InstanceOperation instanceOperation, boolean enabled, int capacity) { - MockParticipantManager participant = createParticipant(participantName, logicalId, zone, - instanceOperation, enabled, capacity); + InstanceConfig config = new InstanceConfig.Builder().setDomain( + String.format("%s=%s, %s=%s, %s=%s", ZONE, zone, HOST, participantName, LOGICAL_ID, + logicalId)).setInstanceEnabled(enabled).setInstanceOperation(instanceOperation) + .build(participantName); + + if (capacity >= 0) { + config.setInstanceCapacityMap(Map.of(TEST_CAPACITY_KEY, capacity)); + } + _gSetupTool.getClusterManagementTool().addInstance(CLUSTER_NAME, config); + + MockParticipantManager participant = createParticipant(participantName); participant.syncStart(); _participants.add(participant); @@ -1236,8 +1240,8 @@ private void addParticipant(String participantName, String logicalId, String zon } private void addParticipant(String participantName) { - addParticipant(participantName, Integer.toString(_participants.size()), - "zone_" + _participants.size(), null, true, -1); + addParticipant(participantName, UUID.randomUUID().toString(), + "zone_" + _participants.size() % ZONE_COUNT, null, true, -1); } private void createTestDBs(long delayTime) throws InterruptedException {