Skip to content

Commit

Permalink
[apache/helix] -- Fixes #2646 (Part-1), Optimize WagedInstanceCapacit…
Browse files Browse the repository at this point in the history
…y Calculation to improve Helix Controller Pipeline (#2649)

WagedInstanceCapacity data-structure is computed every time during a pipeline run and in case of large clusters, this computation takes ~80% of total time. Hence, in this PR we are skipping certain cluster events when this cache should not be rebuilt, and improving the overall runtime.
  • Loading branch information
himanshukandwal authored Oct 23, 2023
1 parent 7da0ddb commit bd1d28c
Show file tree
Hide file tree
Showing 7 changed files with 706 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -513,4 +513,12 @@ public boolean checkAndReduceCapacity(String instance, String resourceName, Stri
return _wagedInstanceCapacity.checkAndReduceInstanceCapacity(instance, resourceName, partition,
partitionWeightMap);
}

/**
 * Getter for cached waged instance capacity map.
 * @return the cached {@link WagedInstanceCapacity} provider computed by the last capacity
 *         calculation, or {@code null} if it has not been computed yet for this pipeline run.
 */
public WagedInstanceCapacity getWagedInstanceCapacity() {
return _wagedInstanceCapacity;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,10 @@ public Map<String, Integer> getInstanceAvailableCapacity(String instanceName) {
return _instanceCapacityMap.get(instanceName);
}

/**
 * Getter for the bookkeeping of partitions whose capacity has already been charged,
 * keyed instance -&gt; resource -&gt; set of partition names.
 * NOTE(review): this exposes the internal mutable map directly (no defensive copy);
 * callers must treat it as read-only.
 * @return the internal allocated-partitions map.
 */
public Map<String, Map<String, Set<String>>> getAllocatedPartitionsMap() {
return _allocatedPartitionsMap;
}

@Override
public boolean isInstanceCapacityAvailable(String instance, Map<String, Integer> partitionCapacity) {
Map<String, Integer> instanceCapacity = _instanceCapacityMap.get(instance);
Expand Down Expand Up @@ -215,7 +219,7 @@ public synchronized boolean checkAndReduceInstanceCapacity(String instance, Stri
}
_allocatedPartitionsMap.computeIfAbsent(instance, k -> new HashMap<>())
.computeIfAbsent(resName, k -> new HashSet<>()).add(partitionName);
LOG.info("Reduced capacity for instance: " + instance + " for resource: " + resName
LOG.debug("Reduced capacity for instance: " + instance + " for resource: " + resName
+ " for partition: " + partitionName);
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,7 @@ public WagedResourceWeightsProvider(ResourceControllerDataProvider clusterData)

public Map<String, Integer> getPartitionWeights(String resourceName, String partition) {
@Nullable ResourceConfig resourceConfig = _clusterData.getResourceConfig(resourceName);
IdealState is = _clusterData.getIdealState(resourceName);
ResourceConfig mergedResourceConfig =
ResourceConfig.mergeIdealStateWithResourceConfig(resourceConfig, is);

return WagedRebalanceUtil.fetchCapacityUsage(partition, mergedResourceConfig, _clusterData.getClusterConfig());
return WagedRebalanceUtil.fetchCapacityUsage(partition, resourceConfig, _clusterData.getClusterConfig());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -113,14 +114,7 @@ public void process(ClusterEvent event) throws Exception {
reportResourcePartitionCapacityMetrics(dataProvider.getAsyncTasksThreadPool(),
clusterStatusMonitor, dataProvider.getResourceConfigMap().values());

// TODO: we only need to compute when there are resource using Waged. We should
// do this as perf improvement in future.
WagedInstanceCapacity capacityProvider = new WagedInstanceCapacity(dataProvider);
WagedResourceWeightsProvider weightProvider = new WagedResourceWeightsProvider(dataProvider);

// Process the currentState and update the available instance capacity.
capacityProvider.process(dataProvider, currentStateOutput, resourceMap, weightProvider);
dataProvider.setWagedCapacityProviders(capacityProvider, weightProvider);
handleResourceCapacityCalculation(event, (ResourceControllerDataProvider) cache, currentStateOutput);
}
}

Expand Down Expand Up @@ -339,4 +333,62 @@ private void reportResourcePartitionCapacityMetrics(ExecutorService executorServ
return null;
});
}

/**
 * Computes and caches the WAGED instance-capacity and resource-weight providers for this
 * pipeline run, unless the current event allows the previously cached providers to be reused
 * (see {@link #skipCapacityCalculation}).
 *
 * @param event              the cluster event being processed.
 * @param cache              cluster-level data provider; receives the computed providers.
 * @param currentStateOutput current states used to charge capacity of already-placed partitions.
 */
void handleResourceCapacityCalculation(ClusterEvent event, ResourceControllerDataProvider cache,
    CurrentStateOutput currentStateOutput) {
  Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
  if (skipCapacityCalculation(cache, resourceMap, event)) {
    return;
  }

  // Only resources managed by the WAGED rebalancer participate in capacity accounting.
  // A sequential stream is used: the per-entry filter is cheap, so parallelizing it
  // costs more in fork/join overhead than it saves.
  Map<String, Resource> wagedEnabledResourceMap = resourceMap.entrySet()
      .stream()
      .filter(entry -> WagedValidationUtil.isWagedEnabled(cache.getIdealState(entry.getKey())))
      .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

  if (wagedEnabledResourceMap.isEmpty()) {
    return;
  }

  // Phase 1: Rebuild Always — recompute the providers from scratch, then charge the
  // capacity already consumed by partitions present in the current state.
  WagedInstanceCapacity capacityProvider = new WagedInstanceCapacity(cache);
  WagedResourceWeightsProvider weightProvider = new WagedResourceWeightsProvider(cache);

  capacityProvider.process(cache, currentStateOutput, wagedEnabledResourceMap, weightProvider);
  cache.setWagedCapacityProviders(capacityProvider, weightProvider);
}

/**
 * Function that checks whether we should return early, without any action on the capacity map.
 *
 * @param cache       it is the cluster level cache for the resources.
 * @param resourceMap resources (by name) carried by the current event; may be null or empty.
 * @param event       the cluster event that is undergoing processing.
 * @return true, if the conditions evaluate to true and no action is needed, else false.
 */
static boolean skipCapacityCalculation(ResourceControllerDataProvider cache, Map<String, Resource> resourceMap,
    ClusterEvent event) {
  // Nothing to compute without resources.
  if (resourceMap == null || resourceMap.isEmpty()) {
    return true;
  }

  // No cached capacity provider yet: it must be built regardless of the event type.
  if (cache.getWagedInstanceCapacity() == null) {
    return false;
  }

  // TODO: We will change this logic to handle each event-type differently and depending on the resource type.
  switch (event.getEventType()) {
    case ClusterConfigChange:
    case InstanceConfigChange:
    case ResourceConfigChange:
    case ControllerChange:
    case LiveInstanceChange:
    case CurrentStateChange:
    case PeriodicalRebalance:
    case MessageChange:
      // These event types may change capacity-relevant state, so recompute.
      return false;
    default:
      // Other event types are treated as not affecting capacity; reuse the cached providers.
      return true;
  }
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
package org.apache.helix.controller.rebalancer.waged;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.commons.lang3.RandomUtils;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.Resource;
import org.apache.helix.model.ResourceConfig;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;


public class TestWagedInstanceCapacity {

private static final int INSTANCE_COUNT = 3;
private static final int RESOURCE_COUNT = 1;
private static final int PARTITION_COUNT = 3;
private static final List<String> CAPACITY_KEYS = Lists.newArrayList("CU", "PARTCOUNT", "DISK");
private static final Map<String, Integer> DEFAULT_INSTANCE_CAPACITY_MAP =
ImmutableMap.of("CU", 100, "PARTCOUNT", 10, "DISK", 100);

private static final Map<String, Integer> DEFAULT_PART_CAPACITY_MAP =
ImmutableMap.of("CU", 40, "PARTCOUNT", 1, "DISK", 1);

private ResourceControllerDataProvider _clusterData;
private Map<String, Resource> _resourceMap;
private CurrentStateOutput _currentStateOutput;
private WagedInstanceCapacity _wagedInstanceCapacity;

@BeforeMethod
public void setUp() {
// prepare cluster data
_clusterData = new ResourceControllerDataProvider();
Map<String, InstanceConfig> instanceConfigMap = generateInstanceCapacityConfigs();
_clusterData.setInstanceConfigMap(instanceConfigMap);
_clusterData.setResourceConfigMap(generateResourcePartitionCapacityConfigs());
_clusterData.setIdealStates(generateIdealStates());

ClusterConfig clusterConfig = new ClusterConfig("test");
clusterConfig.setTopologyAwareEnabled(false);
clusterConfig.setInstanceCapacityKeys(CAPACITY_KEYS);
_clusterData.setClusterConfig(clusterConfig);

// prepare current state output
_resourceMap = generateResourceMap();
_currentStateOutput = populateCurrentStatesForResources(_resourceMap, instanceConfigMap.keySet());

// prepare instance of waged-instance capacity
_wagedInstanceCapacity = new WagedInstanceCapacity(_clusterData);
}

@Test
public void testProcessCurrentState() {
Map<String, Integer> partCapMap = ImmutableMap.of("CU", 10, "PARTCOUNT", 10, "DISK", 100);

Assert.assertTrue(_wagedInstanceCapacity.checkAndReduceInstanceCapacity(
"instance-0", "resource-0", "partition-0", partCapMap));

Map<String, Integer> instanceAvailableCapacity = _wagedInstanceCapacity.getInstanceAvailableCapacity("instance-0");
Assert.assertTrue(instanceAvailableCapacity.get("CU").equals(90));
}

@Test
public void testProcessCurrentStateWithUnableToAssignPart() {
Map<String, Integer> partCapMap = ImmutableMap.of("CU", 110, "PARTCOUNT", 10, "DISK", 100);

Assert.assertFalse(_wagedInstanceCapacity.checkAndReduceInstanceCapacity(
"instance-0", "resource-0", "partition-0", partCapMap));

Map<String, Integer> instanceAvailableCapacity = _wagedInstanceCapacity.getInstanceAvailableCapacity("instance-0");
Assert.assertTrue(instanceAvailableCapacity.get("CU").equals(100));
}

@Test
public void testProcessCurrentStateWithDoubleCharge() {
Map<String, Integer> partCapMap = ImmutableMap.of("CU", 10, "PARTCOUNT", 10, "DISK", 100);

Assert.assertTrue(_wagedInstanceCapacity.checkAndReduceInstanceCapacity(
"instance-0", "resource-0", "partition-0", partCapMap));

// charge again
Assert.assertTrue(_wagedInstanceCapacity.checkAndReduceInstanceCapacity(
"instance-0", "resource-0", "partition-0", partCapMap));

Map<String, Integer> instanceAvailableCapacity = _wagedInstanceCapacity.getInstanceAvailableCapacity("instance-0");
Assert.assertTrue(instanceAvailableCapacity.get("CU").equals(90));
}

// -- static helpers
private Map<String, InstanceConfig> generateInstanceCapacityConfigs() {
Map<String, InstanceConfig> instanceConfigMap = new HashMap<>();

for (int i = 0; i < INSTANCE_COUNT; i ++) {
String instanceName = "instance-" + i;
InstanceConfig config = new InstanceConfig(instanceName);
config.setInstanceCapacityMap(DEFAULT_INSTANCE_CAPACITY_MAP);
instanceConfigMap.put(instanceName, config);
}

return instanceConfigMap;
}

private Map<String, ResourceConfig> generateResourcePartitionCapacityConfigs() {
Map<String, ResourceConfig> resourceConfigMap = new HashMap<>();

try {
Map<String, Map<String, Integer>> partitionsCapacityMap = new HashMap<>();
partitionsCapacityMap.put("DEFAULT", DEFAULT_PART_CAPACITY_MAP);

for (String resourceName : getResourceNames()) {
ResourceConfig config = new ResourceConfig(resourceName);
config.setPartitionCapacityMap(partitionsCapacityMap);
resourceConfigMap.put(resourceName, config);
}
} catch(IOException e) {
throw new RuntimeException("error while setting partition capacity map");
}
return resourceConfigMap;
}

private List<IdealState> generateIdealStates() {
return getResourceNames().stream()
.map(resourceName -> {
IdealState idealState = new IdealState(resourceName);
idealState.setRebalanceMode(IdealState.RebalanceMode.FULL_AUTO);
idealState.setRebalancerClassName(WagedRebalancer.class.getName());
return idealState;
})
.collect(Collectors.toList());
}

private static CurrentStateOutput populateCurrentStatesForResources(
Map<String, Resource> resourceMap, Set<String> instanceNames) {
CurrentStateOutput currentStateOutput = new CurrentStateOutput();

resourceMap.forEach((resourceName, resource) ->
resource.getPartitions().forEach(partition -> {
int masterPartIdx = RandomUtils.nextInt(0, instanceNames.size());
int idx = 0;
for (Iterator<String> it = instanceNames.iterator(); it.hasNext(); idx ++) {
currentStateOutput.setCurrentState(
resourceName, partition, it.next(), (idx == masterPartIdx) ? "MASTER" : "SLAVE");
}
}));

return currentStateOutput;
}

private static Map<String, Resource> generateResourceMap() {
return getResourceNames().stream()
.map(resourceName -> {
Resource resource = new Resource(resourceName);
IntStream.range(0, PARTITION_COUNT)
.mapToObj(i -> "partition-" + i)
.forEach(resource::addPartition);
return resource;
})
.collect(Collectors.toMap(Resource::getResourceName, Function.identity()));
}

private static List<String> getResourceNames() {
return IntStream.range(0, RESOURCE_COUNT)
.mapToObj(i -> "resource-" + i)
.collect(Collectors.toList());
}
}
Loading

0 comments on commit bd1d28c

Please sign in to comment.