Skip to content

Commit

Permalink
Change all rebalancer strategies to create Topology without additional non-FaultZone or EndNode levels of the tree.
Browse files Browse the repository at this point in the history
This will allow for swap to work in clusters where the non-FaultZone or EndNode domain kv pairs don't directly match the swapping node.
  • Loading branch information
zpinto committed Dec 13, 2023
1 parent 6ac348f commit 5dc2e79
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ private ZNRecord computeBestPartitionAssignment(List<String> allNodes, List<Stri
Map<String, List<Node>> finalPartitionMap = null;
Topology allNodeTopo =
new Topology(allNodes, allNodes, clusterData.getAssignableInstanceConfigMap(),
clusterData.getClusterConfig());
clusterData.getClusterConfig(), true);
// Transform current assignment to instance->partitions map, and get total partitions
Map<Node, List<String>> nodeToPartitionMap =
convertPartitionMap(origPartitionMap, allNodeTopo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public ZNRecord computePartitionAssignment(final List<String> allNodes,
ResourceControllerDataProvider clusterData) throws HelixException {
Map<String, InstanceConfig> instanceConfigMap = clusterData.getAssignableInstanceConfigMap();
_clusterTopo =
new Topology(allNodes, liveNodes, instanceConfigMap, clusterData.getClusterConfig());
new Topology(allNodes, liveNodes, instanceConfigMap, clusterData.getClusterConfig(), true);
Node topNode = _clusterTopo.getRootNode();

// for log only
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public ZNRecord computePartitionAssignment(final List<String> allNodes,
ResourceControllerDataProvider clusterData) throws HelixException {
Map<String, InstanceConfig> instanceConfigMap = clusterData.getAssignableInstanceConfigMap();
_clusterTopo =
new Topology(allNodes, liveNodes, instanceConfigMap, clusterData.getClusterConfig());
new Topology(allNodes, liveNodes, instanceConfigMap, clusterData.getClusterConfig(), true);
Node root = _clusterTopo.getRootNode();

Map<String, List<Node>> zoneMapping = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -57,8 +58,19 @@ public enum Types {
private final Map<String, InstanceConfig> _instanceConfigMap;
private final ClusterTopologyConfig _clusterTopologyConfig;

/**
* Create a Topology for a cluster.
*
* @param allNodes allNodes of the given cluster.
* @param liveNodes liveNodes of the given cluster.
* @param instanceConfigMap instanceConfigMap of the given cluster.
* @param clusterConfig clusterConfig of the given cluster.
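* @param faultZoneLevelOnly if true, topology keys other than the faultZone type and the
* endNode type are removed from each instance's topology map before
* it is added to the tree, so no additional levels are built for
* them; if false, all topology keys are kept.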
*/
public Topology(final List<String> allNodes, final List<String> liveNodes,
final Map<String, InstanceConfig> instanceConfigMap, ClusterConfig clusterConfig) {
final Map<String, InstanceConfig> instanceConfigMap, ClusterConfig clusterConfig,
boolean faultZoneLevelOnly) {
try {
_md = MessageDigest.getInstance("SHA-1");
} catch (NoSuchAlgorithmException ex) {
Expand All @@ -73,7 +85,20 @@ public Topology(final List<String> allNodes, final List<String> liveNodes,
_allInstances.removeAll(_instanceConfigMap.keySet())));
}
_clusterTopologyConfig = ClusterTopologyConfig.createFromClusterConfig(clusterConfig);
_root = createClusterTree(clusterConfig);
_root = createClusterTree(clusterConfig, faultZoneLevelOnly);
}

/**
* Create a Topology for a cluster without stripping any topology keys.
*
* <p>Delegates to the five-argument constructor with {@code faultZoneLevelOnly} set to
* {@code false}, so keys other than the faultZone type and endNode type are left in each
* instance's topology map when the cluster tree is built.
*
* @param allNodes allNodes of the given cluster.
* @param liveNodes liveNodes of the given cluster.
* @param instanceConfigMap instanceConfigMap of the given cluster.
* @param clusterConfig clusterConfig of the given cluster.
*/
public Topology(final List<String> allNodes, final List<String> liveNodes,
final Map<String, InstanceConfig> instanceConfigMap, ClusterConfig clusterConfig) {
this(allNodes, liveNodes, instanceConfigMap, clusterConfig, false);
}

public String getEndNodeType() {
Expand Down Expand Up @@ -149,13 +174,18 @@ private static Node cloneTree(Node root, Map<Node, Integer> newNodeWeight,
return newRoot;
}

private Node createClusterTree(ClusterConfig clusterConfig) {
private Node createClusterTree(ClusterConfig clusterConfig, boolean faultZoneLevelOnly) {
// root
Node root = new Node();
root.setName("root");
root.setId(computeId("root"));
root.setType(Types.ROOT.name());

Set<String> unnecessaryTopoKeys =
new HashSet<>(_clusterTopologyConfig.getTopologyKeyDefaultValue().keySet());
unnecessaryTopoKeys.remove(_clusterTopologyConfig.getFaultZoneType());
unnecessaryTopoKeys.remove(_clusterTopologyConfig.getEndNodeType());

// TODO: Currently we add disabled instances to the topology tree. Since they are not considered
// TODO: in rebalance, maybe we should skip adding them to the tree for consistency.
for (String instanceName : _allInstances) {
Expand All @@ -167,6 +197,12 @@ private Node createClusterTree(ClusterConfig clusterConfig) {
if (weight < 0 || weight == InstanceConfig.WEIGHT_NOT_SET) {
weight = DEFAULT_NODE_WEIGHT;
}

if (faultZoneLevelOnly) {
// Remove unnecessary keys from the topology map. We do not need to use these to build more layers in
// the topology tree. The topology tree only requires FaultZoneType and EndNodeType.
unnecessaryTopoKeys.forEach(instanceTopologyMap::remove);
}
addEndNode(root, instanceName, instanceTopologyMap, weight, _liveInstances);
} catch (IllegalArgumentException e) {
if (InstanceValidationUtil.isInstanceEnabled(insConfig, clusterConfig)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -58,9 +59,10 @@


public class TestInstanceOperation extends ZkTestBase {
protected final int NUM_NODE = 6;
private final int ZONE_COUNT = 4;
protected final int NUM_NODE = 10;
protected static final int START_PORT = 12918;
protected static final int PARTITIONS = 20;
protected static final int PARTITIONS = 30;

protected final String CLASS_NAME = getShortClassName();
protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
Expand Down Expand Up @@ -211,8 +213,9 @@ private void resetInstances() {
// Drop bad instance from the cluster.
_gSetupTool.getClusterManagementTool()
.dropInstance(CLUSTER_NAME, _gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, _participantNames.get(i)));
_participants.set(i, createParticipant(_participantNames.get(i), Integer.toString(i),
"zone_" + i, null, true, -1));
_participants.set(i,
createParticipant(_participantNames.get(i), UUID.randomUUID().toString(),
"zone_" + i % ZONE_COUNT, null, true, -1));
_participants.get(i).syncStart();
continue;
}
Expand Down Expand Up @@ -1207,8 +1210,9 @@ public void testNodeSwapAddSwapInFirst() {

private MockParticipantManager createParticipant(String participantName, String logicalId, String zone,
InstanceConstants.InstanceOperation instanceOperation, boolean enabled, int capacity) {
UUID host = UUID.randomUUID();
InstanceConfig config = new InstanceConfig.Builder().setDomain(
String.format("%s=%s, %s=%s, %s=%s", ZONE, zone, HOST, participantName, LOGICAL_ID,
String.format("%s=%s, %s=%s, %s=%s", ZONE, zone, HOST, host, LOGICAL_ID,
logicalId)).setInstanceEnabled(enabled).setInstanceOperation(instanceOperation)
.build(participantName);
if (capacity >= 0) {
Expand Down Expand Up @@ -1236,8 +1240,8 @@ private void addParticipant(String participantName, String logicalId, String zon
}

private void addParticipant(String participantName) {
addParticipant(participantName, Integer.toString(_participants.size()),
"zone_" + _participants.size(), null, true, -1);
addParticipant(participantName, UUID.randomUUID().toString(),
"zone_" + _participants.size() % ZONE_COUNT, null, true, -1);
}

private void createTestDBs(long delayTime) throws InterruptedException {
Expand Down

0 comments on commit 5dc2e79

Please sign in to comment.