Skip to content

Commit

Permalink
Build Topology with only required levels (FaultZone and EndNode) (#2713)
Browse files Browse the repository at this point in the history
* Change all rebalancer strategies to create Topology without additional non-FaultZone or EndNode levels of the tree. This will allow for swap to work in clusters where the non-FaultZone or EndNode domain kv pairs don't directly match the swapping node.
  • Loading branch information
zpinto authored and xyuanlu committed Dec 20, 2023
1 parent aa60897 commit d0d7183
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ private ZNRecord computeBestPartitionAssignment(List<String> allNodes, List<Stri
Map<String, List<Node>> finalPartitionMap = null;
Topology allNodeTopo =
new Topology(allNodes, allNodes, clusterData.getAssignableInstanceConfigMap(),
clusterData.getClusterConfig());
clusterData.getClusterConfig(), true);
// Transform current assignment to instance->partitions map, and get total partitions
Map<Node, List<String>> nodeToPartitionMap =
convertPartitionMap(origPartitionMap, allNodeTopo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public ZNRecord computePartitionAssignment(final List<String> allNodes,
ResourceControllerDataProvider clusterData) throws HelixException {
Map<String, InstanceConfig> instanceConfigMap = clusterData.getAssignableInstanceConfigMap();
_clusterTopo =
new Topology(allNodes, liveNodes, instanceConfigMap, clusterData.getClusterConfig());
new Topology(allNodes, liveNodes, instanceConfigMap, clusterData.getClusterConfig(), true);
Node topNode = _clusterTopo.getRootNode();

// for log only
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public ZNRecord computePartitionAssignment(final List<String> allNodes,
ResourceControllerDataProvider clusterData) throws HelixException {
Map<String, InstanceConfig> instanceConfigMap = clusterData.getAssignableInstanceConfigMap();
_clusterTopo =
new Topology(allNodes, liveNodes, instanceConfigMap, clusterData.getClusterConfig());
new Topology(allNodes, liveNodes, instanceConfigMap, clusterData.getClusterConfig(), true);
Node root = _clusterTopo.getRootNode();

Map<String, List<Node>> zoneMapping = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -57,8 +58,19 @@ public enum Types {
private final Map<String, InstanceConfig> _instanceConfigMap;
private final ClusterTopologyConfig _clusterTopologyConfig;

/**
* Create a Topology for a cluster.
*
* @param allNodes allNodes of the given cluster.
* @param liveNodes liveNodes of the given cluster.
* @param instanceConfigMap instanceConfigMap of the given cluster.
* @param clusterConfig clusterConfig of the given cluster.
* @param faultZoneLevelOnly if true, build the topology tree using only the faultZone and
* endNode levels, skipping any additional non-faultZone levels above the end-nodes.
*/
public Topology(final List<String> allNodes, final List<String> liveNodes,
final Map<String, InstanceConfig> instanceConfigMap, ClusterConfig clusterConfig) {
final Map<String, InstanceConfig> instanceConfigMap, ClusterConfig clusterConfig,
boolean faultZoneLevelOnly) {
try {
_md = MessageDigest.getInstance("SHA-1");
} catch (NoSuchAlgorithmException ex) {
Expand All @@ -73,7 +85,20 @@ public Topology(final List<String> allNodes, final List<String> liveNodes,
_allInstances.removeAll(_instanceConfigMap.keySet())));
}
_clusterTopologyConfig = ClusterTopologyConfig.createFromClusterConfig(clusterConfig);
_root = createClusterTree(clusterConfig);
_root = createClusterTree(clusterConfig, faultZoneLevelOnly);
}

/**
* Create a Topology for a cluster without restricting the tree to the faultZone and endNode
* levels. Delegates to the five-argument constructor with faultZoneLevelOnly set to false, so
* no topology keys are removed from an instance's topology map before it is added to the tree.
*
* @param allNodes allNodes of the given cluster.
* @param liveNodes liveNodes of the given cluster.
* @param instanceConfigMap instanceConfigMap of the given cluster.
* @param clusterConfig clusterConfig of the given cluster.
*/
public Topology(final List<String> allNodes, final List<String> liveNodes,
final Map<String, InstanceConfig> instanceConfigMap, ClusterConfig clusterConfig) {
this(allNodes, liveNodes, instanceConfigMap, clusterConfig, false);
}

public String getEndNodeType() {
Expand Down Expand Up @@ -149,13 +174,18 @@ private static Node cloneTree(Node root, Map<Node, Integer> newNodeWeight,
return newRoot;
}

private Node createClusterTree(ClusterConfig clusterConfig) {
private Node createClusterTree(ClusterConfig clusterConfig, boolean faultZoneLevelOnly) {
// root
Node root = new Node();
root.setName("root");
root.setId(computeId("root"));
root.setType(Types.ROOT.name());

Set<String> unnecessaryTopoKeys =
new HashSet<>(_clusterTopologyConfig.getTopologyKeyDefaultValue().keySet());
unnecessaryTopoKeys.remove(_clusterTopologyConfig.getFaultZoneType());
unnecessaryTopoKeys.remove(_clusterTopologyConfig.getEndNodeType());

// TODO: Currently we add disabled instances to the topology tree. Since they are not considered
// TODO: in rebalance, maybe we should skip adding them to the tree for consistency.
for (String instanceName : _allInstances) {
Expand All @@ -167,6 +197,12 @@ private Node createClusterTree(ClusterConfig clusterConfig) {
if (weight < 0 || weight == InstanceConfig.WEIGHT_NOT_SET) {
weight = DEFAULT_NODE_WEIGHT;
}

if (faultZoneLevelOnly) {
// Remove unnecessary keys from the topology map. We do not need to use these to build more layers in
// the topology tree. The topology tree only requires FaultZoneType and EndNodeType.
unnecessaryTopoKeys.forEach(instanceTopologyMap::remove);
}
addEndNode(root, instanceName, instanceTopologyMap, weight, _liveInstances);
} catch (IllegalArgumentException e) {
if (InstanceValidationUtil.isInstanceEnabled(insConfig, clusterConfig)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,10 @@ private static ClusterModel generateClusterModel(ResourceControllerDataProvider
new InstanceConfig(instanceName))
.getLogicalId(clusterTopologyConfig.getEndNodeType())).collect(Collectors.toSet());

Set<String> assignableLiveInstanceNames = dataProvider.getAssignableLiveInstances().keySet();
// TODO: Figure out why streaming the keySet directly causes a ConcurrentModificationException in rare cases.
// In theory this should not happen, since the cache refresh occurs at the beginning of the pipeline, so there may be another cause.
// For now, copy the keySet into a new HashSet to avoid the exception.
Set<String> assignableLiveInstanceNames = new HashSet<>(dataProvider.getAssignableLiveInstances().keySet());
Set<String> assignableLiveInstanceLogicalIds =
assignableLiveInstanceNames.stream().map(
instanceName -> assignableInstanceConfigMap.getOrDefault(instanceName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -58,7 +59,8 @@


public class TestInstanceOperation extends ZkTestBase {
protected final int NUM_NODE = 6;
private final int ZONE_COUNT = 4;
protected final int NUM_NODE = 10;
protected static final int START_PORT = 12918;
protected static final int PARTITIONS = 20;

Expand Down Expand Up @@ -145,6 +147,13 @@ public void beforeClass() throws Exception {

@AfterClass
public void afterClass() {
// Drop all DBs
for (String db : _allDBs) {
_gSetupTool.dropResourceFromCluster(CLUSTER_NAME, db);
}

Assert.assertTrue(_clusterVerifier.verifyByPolling());

for (MockParticipantManager p : _participants) {
p.syncStop();
}
Expand Down Expand Up @@ -208,13 +217,9 @@ private void resetInstances() {
for (int i = 0; i < _participants.size(); i++) {
// If instance is not connected to ZK, replace it
if (!_participants.get(i).isConnected()) {
// Drop bad instance from the cluster.
_gSetupTool.getClusterManagementTool()
.dropInstance(CLUSTER_NAME, _gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, _participantNames.get(i)));
_participants.set(i, createParticipant(_participantNames.get(i), Integer.toString(i),
"zone_" + i, null, true, -1));
// Replace the stopped participant with a new one and inherit the old instance config.
_participants.set(i, createParticipant(_participantNames.get(i)));
_participants.get(i).syncStart();
continue;
}
_gSetupTool.getClusterManagementTool()
.setInstanceOperation(CLUSTER_NAME, _participantNames.get(i), null);
Expand Down Expand Up @@ -1205,17 +1210,7 @@ public void testNodeSwapAddSwapInFirst() {
Collections.emptySet(), Set.of(instanceToSwapInName));
}

private MockParticipantManager createParticipant(String participantName, String logicalId, String zone,
InstanceConstants.InstanceOperation instanceOperation, boolean enabled, int capacity) {
InstanceConfig config = new InstanceConfig.Builder().setDomain(
String.format("%s=%s, %s=%s, %s=%s", ZONE, zone, HOST, participantName, LOGICAL_ID,
logicalId)).setInstanceEnabled(enabled).setInstanceOperation(instanceOperation)
.build(participantName);
if (capacity >= 0) {
config.setInstanceCapacityMap(Map.of(TEST_CAPACITY_KEY, capacity));
}
_gSetupTool.getClusterManagementTool().addInstance(CLUSTER_NAME, config);

private MockParticipantManager createParticipant(String participantName) {
// start dummy participants
MockParticipantManager participant = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, participantName);
StateMachineEngine stateMachine = participant.getStateMachineEngine();
Expand All @@ -1227,17 +1222,26 @@ private MockParticipantManager createParticipant(String participantName, String

private void addParticipant(String participantName, String logicalId, String zone,
InstanceConstants.InstanceOperation instanceOperation, boolean enabled, int capacity) {
MockParticipantManager participant = createParticipant(participantName, logicalId, zone,
instanceOperation, enabled, capacity);
InstanceConfig config = new InstanceConfig.Builder().setDomain(
String.format("%s=%s, %s=%s, %s=%s", ZONE, zone, HOST, participantName, LOGICAL_ID,
logicalId)).setInstanceEnabled(enabled).setInstanceOperation(instanceOperation)
.build(participantName);

if (capacity >= 0) {
config.setInstanceCapacityMap(Map.of(TEST_CAPACITY_KEY, capacity));
}
_gSetupTool.getClusterManagementTool().addInstance(CLUSTER_NAME, config);

MockParticipantManager participant = createParticipant(participantName);

participant.syncStart();
_participants.add(participant);
_participantNames.add(participantName);
}

private void addParticipant(String participantName) {
addParticipant(participantName, Integer.toString(_participants.size()),
"zone_" + _participants.size(), null, true, -1);
addParticipant(participantName, UUID.randomUUID().toString(),
"zone_" + _participants.size() % ZONE_COUNT, null, true, -1);
}

private void createTestDBs(long delayTime) throws InterruptedException {
Expand Down

0 comments on commit d0d7183

Please sign in to comment.