From 84d89efafa389ed1fa9449bf9c4708bcc3c72a5b Mon Sep 17 00:00:00 2001 From: xyuanlu Date: Wed, 27 Sep 2023 18:31:42 -0700 Subject: [PATCH] Fix CurrentStateComputationStage when mis configure(#2635) --- .../rebalancer/waged/WagedInstanceCapacity.java | 12 ++++++++++-- .../stages/CurrentStateComputationStage.java | 2 ++ .../stages/TestCurrentStateComputationStage.java | 14 +++++++++++++- 3 files changed, 25 insertions(+), 3 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedInstanceCapacity.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedInstanceCapacity.java index 29ecf451d9..cd19c301cd 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedInstanceCapacity.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedInstanceCapacity.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.Set; +import org.apache.helix.HelixException; import org.apache.helix.controller.rebalancer.util.WagedValidationUtil; import org.apache.helix.controller.stages.CurrentStateOutput; import org.apache.helix.model.ClusterConfig; @@ -54,8 +55,15 @@ public WagedInstanceCapacity(ResourceControllerDataProvider clusterData) { return; } for (InstanceConfig instanceConfig : clusterData.getInstanceConfigMap().values()) { - Map instanceCapacity = WagedValidationUtil.validateAndGetInstanceCapacity(clusterConfig, instanceConfig); - _instanceCapacityMap.put(instanceConfig.getInstanceName(), instanceCapacity); + Map instanceCapacity = null; + try { + instanceCapacity = WagedValidationUtil.validateAndGetInstanceCapacity(clusterConfig, instanceConfig); + } catch (HelixException ignore) { + // We don't want to throw exception here, it would be OK if no resource is using Waged. + // Waged rebalancer will fail in later pipeline stage only for waged resource. So it won't block other resources. + } + _instanceCapacityMap.put(instanceConfig.getInstanceName(), + instanceCapacity == null ? new HashMap<>() : instanceCapacity); _allocatedPartitionsMap.put(instanceConfig.getInstanceName(), new HashMap<>()); } } diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java index 4eb4004af2..51abca36b3 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java @@ -113,6 +113,8 @@ public void process(ClusterEvent event) throws Exception { reportResourcePartitionCapacityMetrics(dataProvider.getAsyncTasksThreadPool(), clusterStatusMonitor, dataProvider.getResourceConfigMap().values()); + // TODO: we only need to compute when there are resource using Waged. We should + // do this as perf improvement in future. WagedInstanceCapacity capacityProvider = new WagedInstanceCapacity(dataProvider); WagedResourceWeightsProvider weightProvider = new WagedResourceWeightsProvider(dataProvider); diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java index 91e275aafa..f66638ec61 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java +++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java @@ -21,9 +21,13 @@ import java.util.HashMap; import java.util.Map; +import java.util.List; import org.apache.helix.PropertyKey.Builder; import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider; +import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.InstanceConfig; +import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; import org.apache.helix.model.CurrentState; @@ -40,9 +44,17 @@ public void testEmptyCS() { Map resourceMap = getResourceMap(); event.addAttribute(AttributeName.RESOURCES.name(), resourceMap); event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), resourceMap); - event.addAttribute(AttributeName.ControllerDataProvider.name(), new ResourceControllerDataProvider()); + ResourceControllerDataProvider dataCache = new ResourceControllerDataProvider(); + event.addAttribute(AttributeName.ControllerDataProvider.name(), dataCache); + event.addAttribute(AttributeName.clusterStatusMonitor.name(), new ClusterStatusMonitor(_clusterName)); CurrentStateComputationStage stage = new CurrentStateComputationStage(); runStage(event, new ReadClusterDataStage()); + ClusterConfig clsCfg = dataCache.getClusterConfig(); + clsCfg.setInstanceCapacityKeys(List.of("s1", "s2", "s3")); + dataCache.setClusterConfig(clsCfg); + dataCache.setInstanceConfigMap(Map.of( + "a", new InstanceConfig("a") + )); runStage(event, stage); CurrentStateOutput output = event.getAttribute(AttributeName.CURRENT_STATE.name()); AssertJUnit.assertEquals(