Skip to content

Commit

Permalink
Fix CurrentStateComputationStage when mis configure(#2635)
Browse files Browse the repository at this point in the history
  • Loading branch information
xyuanlu authored Sep 28, 2023
1 parent 38ac277 commit 84d89ef
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Map;
import java.util.Set;

import org.apache.helix.HelixException;
import org.apache.helix.controller.rebalancer.util.WagedValidationUtil;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.ClusterConfig;
Expand Down Expand Up @@ -54,8 +55,15 @@ public WagedInstanceCapacity(ResourceControllerDataProvider clusterData) {
return;
}
for (InstanceConfig instanceConfig : clusterData.getInstanceConfigMap().values()) {
Map<String, Integer> instanceCapacity = WagedValidationUtil.validateAndGetInstanceCapacity(clusterConfig, instanceConfig);
_instanceCapacityMap.put(instanceConfig.getInstanceName(), instanceCapacity);
Map<String, Integer> instanceCapacity = null;
try {
instanceCapacity = WagedValidationUtil.validateAndGetInstanceCapacity(clusterConfig, instanceConfig);
} catch (HelixException ignore) {
// We don't want to throw exception here, it would be OK if no resource is using Waged.
// Waged rebalancer will fail in later pipeline stage only for waged resource. So it won't block other resources.
}
_instanceCapacityMap.put(instanceConfig.getInstanceName(),
instanceCapacity == null ? new HashMap<>() : instanceCapacity);
_allocatedPartitionsMap.put(instanceConfig.getInstanceName(), new HashMap<>());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ public void process(ClusterEvent event) throws Exception {
reportResourcePartitionCapacityMetrics(dataProvider.getAsyncTasksThreadPool(),
clusterStatusMonitor, dataProvider.getResourceConfigMap().values());

// TODO: we only need to compute when there are resource using Waged. We should
// do this as perf improvement in future.
WagedInstanceCapacity capacityProvider = new WagedInstanceCapacity(dataProvider);
WagedResourceWeightsProvider weightProvider = new WagedResourceWeightsProvider(dataProvider);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,13 @@

import java.util.HashMap;
import java.util.Map;
import java.util.List;

import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.model.CurrentState;
Expand All @@ -40,9 +44,17 @@ public void testEmptyCS() {
Map<String, Resource> resourceMap = getResourceMap();
event.addAttribute(AttributeName.RESOURCES.name(), resourceMap);
event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), resourceMap);
event.addAttribute(AttributeName.ControllerDataProvider.name(), new ResourceControllerDataProvider());
ResourceControllerDataProvider dataCache = new ResourceControllerDataProvider();
event.addAttribute(AttributeName.ControllerDataProvider.name(), dataCache);
event.addAttribute(AttributeName.clusterStatusMonitor.name(), new ClusterStatusMonitor(_clusterName));
CurrentStateComputationStage stage = new CurrentStateComputationStage();
runStage(event, new ReadClusterDataStage());
ClusterConfig clsCfg = dataCache.getClusterConfig();
clsCfg.setInstanceCapacityKeys(List.of("s1", "s2", "s3"));
dataCache.setClusterConfig(clsCfg);
dataCache.setInstanceConfigMap(Map.of(
"a", new InstanceConfig("a")
));
runStage(event, stage);
CurrentStateOutput output = event.getAttribute(AttributeName.CURRENT_STATE.name());
AssertJUnit.assertEquals(
Expand Down

0 comments on commit 84d89ef

Please sign in to comment.