Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[apache/helix] -- Fixes #2646 Optimize WagedInstanceCapacity Calculation to improve Helix Controller Pipeline #2649

Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -513,4 +513,12 @@ public boolean checkAndReduceCapacity(String instance, String resourceName, Stri
return _wagedInstanceCapacity.checkAndReduceInstanceCapacity(instance, resourceName, partition,
partitionWeightMap);
}

/**
 * Returns the {@link WagedInstanceCapacity} instance currently held by this object.
 *
 * <p>The stored reference is returned as-is; no copy is made.
 * NOTE(review): presumably this is {@code null} until the capacity providers have been set;
 * callers should null-check. Confirm against the setter.
 *
 * @return the value of the {@code _wagedInstanceCapacity} field
 */
public WagedInstanceCapacity getWagedInstanceCapacity() {
return _wagedInstanceCapacity;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,10 @@ public Map<String, Integer> getInstanceAvailableCapacity(String instanceName) {
return _instanceCapacityMap.get(instanceName);
}

/**
 * Returns the record of partitions whose weight has already been charged against an instance.
 *
 * <p>The outer key is the instance name, the inner key is the resource name, and the value is the
 * set of partition names recorded for that instance/resource pair. The internal map itself is
 * returned (not a copy), so any modification made by the caller is visible to this object.
 *
 * @return the backing {@code _allocatedPartitionsMap}
 */
public Map<String, Map<String, Set<String>>> getAllocatedPartitionsMap() {
return _allocatedPartitionsMap;
}

@Override
public boolean isInstanceCapacityAvailable(String instance, Map<String, Integer> partitionCapacity) {
Map<String, Integer> instanceCapacity = _instanceCapacityMap.get(instance);
Expand Down Expand Up @@ -215,7 +219,7 @@ public synchronized boolean checkAndReduceInstanceCapacity(String instance, Stri
}
_allocatedPartitionsMap.computeIfAbsent(instance, k -> new HashMap<>())
.computeIfAbsent(resName, k -> new HashSet<>()).add(partitionName);
LOG.info("Reduced capacity for instance: " + instance + " for resource: " + resName
LOG.debug("Reduced capacity for instance: " + instance + " for resource: " + resName
himanshukandwal marked this conversation as resolved.
Show resolved Hide resolved
+ " for partition: " + partitionName);
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,7 @@ public WagedResourceWeightsProvider(ResourceControllerDataProvider clusterData)

public Map<String, Integer> getPartitionWeights(String resourceName, String partition) {
@Nullable ResourceConfig resourceConfig = _clusterData.getResourceConfig(resourceName);
IdealState is = _clusterData.getIdealState(resourceName);
ResourceConfig mergedResourceConfig =
ResourceConfig.mergeIdealStateWithResourceConfig(resourceConfig, is);

return WagedRebalanceUtil.fetchCapacityUsage(partition, mergedResourceConfig, _clusterData.getClusterConfig());
return WagedRebalanceUtil.fetchCapacityUsage(partition, resourceConfig, _clusterData.getClusterConfig());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -113,14 +114,7 @@ public void process(ClusterEvent event) throws Exception {
reportResourcePartitionCapacityMetrics(dataProvider.getAsyncTasksThreadPool(),
clusterStatusMonitor, dataProvider.getResourceConfigMap().values());

// TODO: we only need to compute when there are resource using Waged. We should
// do this as perf improvement in future.
WagedInstanceCapacity capacityProvider = new WagedInstanceCapacity(dataProvider);
WagedResourceWeightsProvider weightProvider = new WagedResourceWeightsProvider(dataProvider);

// Process the currentState and update the available instance capacity.
capacityProvider.process(dataProvider, currentStateOutput, resourceMap, weightProvider);
dataProvider.setWagedCapacityProviders(capacityProvider, weightProvider);
handleResourceCapacityCalculation(event, (ResourceControllerDataProvider) cache, currentStateOutput);
}
}

Expand Down Expand Up @@ -339,4 +333,62 @@ private void reportResourcePartitionCapacityMetrics(ExecutorService executorServ
return null;
});
}

/**
 * Rebuilds the WAGED capacity providers on the cache for the WAGED-enabled resources of an event.
 *
 * <p>Nothing happens when {@code skipCapacityCalculation} reports that the event can be skipped,
 * or when none of the event's resources is WAGED-enabled. Otherwise a new
 * {@link WagedInstanceCapacity} and a new {@link WagedResourceWeightsProvider} are created,
 * the capacity provider processes the current state of the WAGED-enabled resources, and both
 * providers are stored on the cache.
 *
 * @param event the cluster event being processed; its RESOURCES attribute supplies the resources
 * @param cache the cluster data cache that is read from and that receives the new providers
 * @param currentStateOutput the current state passed through to the capacity provider
 */
void handleResourceCapacityCalculation(ClusterEvent event, ResourceControllerDataProvider cache,
    CurrentStateOutput currentStateOutput) {
  final Map<String, Resource> allResources = event.getAttribute(AttributeName.RESOURCES.name());
  if (skipCapacityCalculation(cache, allResources, event)) {
    return;
  }

  // Narrow the event's resources down to those whose ideal state is WAGED-enabled.
  final Map<String, Resource> wagedResources = allResources.entrySet()
      .parallelStream()
      .filter(candidate -> WagedValidationUtil.isWagedEnabled(cache.getIdealState(candidate.getKey())))
      .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

  if (!wagedResources.isEmpty()) {
    // Phase 1: Rebuild Always
    final WagedInstanceCapacity freshCapacity = new WagedInstanceCapacity(cache);
    final WagedResourceWeightsProvider freshWeights = new WagedResourceWeightsProvider(cache);

    freshCapacity.process(cache, currentStateOutput, wagedResources, freshWeights);
    cache.setWagedCapacityProviders(freshCapacity, freshWeights);
  }
}

/**
 * Decides whether capacity calculation can be skipped, leaving the capacity map untouched.
 *
 * <p>The checks run in this order: an absent or empty resource map always skips; a cache without
 * a WAGED instance capacity never skips; otherwise the decision depends on the event type.
 *
 * @param cache the cluster level cache for the resources.
 * @param resourceMap the resources attached to the event; may be {@code null} or empty.
 * @param event the cluster event that is undergoing processing.
 * @return {@code true} if no action is needed on the capacity map, {@code false} if it must be
 *     recalculated.
 */
static boolean skipCapacityCalculation(ResourceControllerDataProvider cache, Map<String, Resource> resourceMap,
    ClusterEvent event) {
  // Without resources there is nothing to calculate.
  final boolean noResources = resourceMap == null || resourceMap.isEmpty();
  if (noResources) {
    return true;
  }

  // No capacity has been cached yet, so a calculation is required whatever the event type is.
  if (cache.getWagedInstanceCapacity() == null) {
    return false;
  }

  // TODO: We will change this logic to handle each event-type differently and depending on the resource type.
  final boolean recalculationNeeded;
  switch (event.getEventType()) {
    case ClusterConfigChange:
    case InstanceConfigChange:
    case ResourceConfigChange:
    case ControllerChange:
    case LiveInstanceChange:
    case CurrentStateChange:
    case PeriodicalRebalance:
    case MessageChange:
      recalculationNeeded = true;
      break;
    default:
      recalculationNeeded = false;
      break;
  }
  return !recalculationNeeded;
}
himanshukandwal marked this conversation as resolved.
Show resolved Hide resolved

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
package org.apache.helix.controller.rebalancer.waged;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.commons.lang3.RandomUtils;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.Resource;
import org.apache.helix.model.ResourceConfig;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;


/**
 * Unit tests for {@link WagedInstanceCapacity#checkAndReduceInstanceCapacity}.
 *
 * <p>Each test starts from a fresh cluster fixture of {@value #INSTANCE_COUNT} instances, each
 * configured with {@link #DEFAULT_INSTANCE_CAPACITY_MAP}, and then charges a partition against
 * {@code instance-0} to check the remaining capacity.
 */
public class TestWagedInstanceCapacity {

  private static final int INSTANCE_COUNT = 3;
  private static final int RESOURCE_COUNT = 1;
  private static final int PARTITION_COUNT = 3;
  private static final List<String> CAPACITY_KEYS = Lists.newArrayList("CU", "PARTCOUNT", "DISK");
  private static final Map<String, Integer> DEFAULT_INSTANCE_CAPACITY_MAP =
      ImmutableMap.of("CU", 100, "PARTCOUNT", 10, "DISK", 100);

  private static final Map<String, Integer> DEFAULT_PART_CAPACITY_MAP =
      ImmutableMap.of("CU", 40, "PARTCOUNT", 1, "DISK", 1);

  private ResourceControllerDataProvider _clusterData;
  private Map<String, Resource> _resourceMap;
  private CurrentStateOutput _currentStateOutput;
  private WagedInstanceCapacity _wagedInstanceCapacity;

  /** Builds the cluster data, current state and the capacity instance under test. */
  @BeforeMethod
  public void setUp() {
    // prepare cluster data
    _clusterData = new ResourceControllerDataProvider();
    Map<String, InstanceConfig> instanceConfigMap = generateInstanceCapacityConfigs();
    _clusterData.setInstanceConfigMap(instanceConfigMap);
    _clusterData.setResourceConfigMap(generateResourcePartitionCapacityConfigs());
    _clusterData.setIdealStates(generateIdealStates());

    ClusterConfig clusterConfig = new ClusterConfig("test");
    clusterConfig.setTopologyAwareEnabled(false);
    clusterConfig.setInstanceCapacityKeys(CAPACITY_KEYS);
    _clusterData.setClusterConfig(clusterConfig);

    // prepare current state output
    _resourceMap = generateResourceMap();
    _currentStateOutput = populateCurrentStatesForResources(_resourceMap, instanceConfigMap.keySet());

    // prepare instance of waged-instance capacity
    _wagedInstanceCapacity = new WagedInstanceCapacity(_clusterData);
  }

  /** A partition that fits is accepted and its weight is subtracted from the instance. */
  @Test
  public void testProcessCurrentState() {
    Map<String, Integer> partCapMap = ImmutableMap.of("CU", 10, "PARTCOUNT", 10, "DISK", 100);

    Assert.assertTrue(_wagedInstanceCapacity.checkAndReduceInstanceCapacity(
        "instance-0", "resource-0", "partition-0", partCapMap));

    Map<String, Integer> instanceAvailableCapacity = _wagedInstanceCapacity.getInstanceAvailableCapacity("instance-0");
    // assertEquals reports the actual value on failure, unlike assertTrue(x.equals(y)).
    Assert.assertEquals(instanceAvailableCapacity.get("CU").intValue(), 90);
  }

  /** A partition that exceeds the available capacity is rejected and nothing is subtracted. */
  @Test
  public void testProcessCurrentStateWithUnableToAssignPart() {
    Map<String, Integer> partCapMap = ImmutableMap.of("CU", 110, "PARTCOUNT", 10, "DISK", 100);

    Assert.assertFalse(_wagedInstanceCapacity.checkAndReduceInstanceCapacity(
        "instance-0", "resource-0", "partition-0", partCapMap));

    Map<String, Integer> instanceAvailableCapacity = _wagedInstanceCapacity.getInstanceAvailableCapacity("instance-0");
    Assert.assertEquals(instanceAvailableCapacity.get("CU").intValue(), 100);
  }

  /** Charging the same partition twice succeeds both times but reduces capacity only once. */
  @Test
  public void testProcessCurrentStateWithDoubleCharge() {
    Map<String, Integer> partCapMap = ImmutableMap.of("CU", 10, "PARTCOUNT", 10, "DISK", 100);

    Assert.assertTrue(_wagedInstanceCapacity.checkAndReduceInstanceCapacity(
        "instance-0", "resource-0", "partition-0", partCapMap));

    // charge again
    Assert.assertTrue(_wagedInstanceCapacity.checkAndReduceInstanceCapacity(
        "instance-0", "resource-0", "partition-0", partCapMap));

    Map<String, Integer> instanceAvailableCapacity = _wagedInstanceCapacity.getInstanceAvailableCapacity("instance-0");
    Assert.assertEquals(instanceAvailableCapacity.get("CU").intValue(), 90);
  }

  // -- static helpers

  /** Creates one {@link InstanceConfig} per test instance, all with the default capacity map. */
  private static Map<String, InstanceConfig> generateInstanceCapacityConfigs() {
    Map<String, InstanceConfig> instanceConfigMap = new HashMap<>();

    for (int i = 0; i < INSTANCE_COUNT; i++) {
      String instanceName = "instance-" + i;
      InstanceConfig config = new InstanceConfig(instanceName);
      config.setInstanceCapacityMap(DEFAULT_INSTANCE_CAPACITY_MAP);
      instanceConfigMap.put(instanceName, config);
    }

    return instanceConfigMap;
  }

  /** Creates one {@link ResourceConfig} per test resource with a "DEFAULT" partition weight. */
  private static Map<String, ResourceConfig> generateResourcePartitionCapacityConfigs() {
    Map<String, ResourceConfig> resourceConfigMap = new HashMap<>();

    try {
      Map<String, Map<String, Integer>> partitionsCapacityMap = new HashMap<>();
      partitionsCapacityMap.put("DEFAULT", DEFAULT_PART_CAPACITY_MAP);

      for (String resourceName : getResourceNames()) {
        ResourceConfig config = new ResourceConfig(resourceName);
        config.setPartitionCapacityMap(partitionsCapacityMap);
        resourceConfigMap.put(resourceName, config);
      }
    } catch (IOException e) {
      // Keep the original exception as the cause so the real failure is not lost.
      throw new RuntimeException("error while setting partition capacity map", e);
    }
    return resourceConfigMap;
  }

  /** Creates a FULL_AUTO ideal state using the WAGED rebalancer for every test resource. */
  private static List<IdealState> generateIdealStates() {
    return getResourceNames().stream()
        .map(resourceName -> {
          IdealState idealState = new IdealState(resourceName);
          idealState.setRebalanceMode(IdealState.RebalanceMode.FULL_AUTO);
          idealState.setRebalancerClassName(WagedRebalancer.class.getName());
          return idealState;
        })
        .collect(Collectors.toList());
  }

  /**
   * Builds a current state in which every partition of every resource is placed on every
   * instance: one randomly chosen instance is "MASTER" and the rest are "SLAVE".
   */
  private static CurrentStateOutput populateCurrentStatesForResources(
      Map<String, Resource> resourceMap, Set<String> instanceNames) {
    CurrentStateOutput currentStateOutput = new CurrentStateOutput();

    resourceMap.forEach((resourceName, resource) ->
        resource.getPartitions().forEach(partition -> {
          int masterPartIdx = RandomUtils.nextInt(0, instanceNames.size());
          int idx = 0;
          for (Iterator<String> it = instanceNames.iterator(); it.hasNext(); idx++) {
            currentStateOutput.setCurrentState(
                resourceName, partition, it.next(), (idx == masterPartIdx) ? "MASTER" : "SLAVE");
          }
        }));

    return currentStateOutput;
  }

  /** Creates the test resources, each with {@value #PARTITION_COUNT} partitions, keyed by name. */
  private static Map<String, Resource> generateResourceMap() {
    return getResourceNames().stream()
        .map(resourceName -> {
          Resource resource = new Resource(resourceName);
          IntStream.range(0, PARTITION_COUNT)
              .mapToObj(i -> "partition-" + i)
              .forEach(resource::addPartition);
          return resource;
        })
        .collect(Collectors.toMap(Resource::getResourceName, Function.identity()));
  }

  /** Returns the names "resource-0" .. "resource-(RESOURCE_COUNT - 1)". */
  private static List<String> getResourceNames() {
    return IntStream.range(0, RESOURCE_COUNT)
        .mapToObj(i -> "resource-" + i)
        .collect(Collectors.toList());
  }
}
Loading
Loading