Skip to content

Commit

Permalink
Expose metric for table rebalance (apache#12270)
Browse files Browse the repository at this point in the history
  • Loading branch information
suddendust authored Jan 19, 2024
1 parent 17e1aa1 commit 894e56e
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -204,18 +204,6 @@ rules:
cache: true
labels:
version: "$2"
- pattern: "\"org.apache.pinot.common.metrics\"<type=\"ControllerMetrics\", name=\"pinot.controller.tableConsumptionPaused.([^\\.]*?)_(OFFLINE|REALTIME)\"><>(\\w+)"
name: "pinot_controller_tableConsumptionPaused_$3"
cache: true
labels:
tableName: "$1"
tableType: "$2"
- pattern: "\"org.apache.pinot.common.metrics\"<type=\"ControllerMetrics\", name=\"pinot.controller.tableDisabled.([^\\.]*?)_(OFFLINE|REALTIME)\"><>(\\w+)"
name: "pinot_controller_tableDisabled_$3"
cache: true
labels:
tableName: "$1"
tableType: "$2"

## Metrics that fit the catch-all patterns above should not be added to this file.
## In case a metric does not fit the catch-all patterns, add them before this comment
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,9 @@ public enum ControllerGauge implements AbstractMetrics.Gauge {

TABLE_CONSUMPTION_PAUSED("tableConsumptionPaused", false),

TABLE_DISABLED("tableDisabled", false);
TABLE_DISABLED("tableDisabled", false),

TABLE_REBALANCE_IN_PROGRESS("tableRebalanceInProgress", false);

private final String _gaugeName;
private final String _unit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,7 @@ private void removeMetricsForTable(String tableNameWithType) {
_controllerMetrics.removeTableGauge(tableNameWithType, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE);
_controllerMetrics.removeTableGauge(tableNameWithType, ControllerGauge.TABLE_DISABLED);
_controllerMetrics.removeTableGauge(tableNameWithType, ControllerGauge.TABLE_CONSUMPTION_PAUSED);
_controllerMetrics.removeTableGauge(tableNameWithType, ControllerGauge.TABLE_REBALANCE_IN_PROGRESS);
}

private void setStatusToDefault() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import java.util.HashMap;
import java.util.Map;
import org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
Expand All @@ -48,6 +50,8 @@ public class ZkBasedTableRebalanceObserver implements TableRebalanceObserver {
private boolean _isStopped = false;
private RebalanceResult.Status _stopStatus;

private final ControllerMetrics _controllerMetrics;

public ZkBasedTableRebalanceObserver(String tableNameWithType, String rebalanceJobId,
TableRebalanceContext tableRebalanceContext, PinotHelixResourceManager pinotHelixResourceManager) {
Preconditions.checkState(tableNameWithType != null, "Table name cannot be null");
Expand All @@ -59,12 +63,14 @@ public ZkBasedTableRebalanceObserver(String tableNameWithType, String rebalanceJ
_tableRebalanceProgressStats = new TableRebalanceProgressStats();
_tableRebalanceContext = tableRebalanceContext;
_numUpdatesToZk = 0;
_controllerMetrics = ControllerMetrics.get();
}

@Override
public void onTrigger(Trigger trigger, Map<String, Map<String, String>> currentState,
Map<String, Map<String, String>> targetState) {
boolean updatedStatsInZk = false;
_controllerMetrics.setValueOfTableGauge(_tableNameWithType, ControllerGauge.TABLE_REBALANCE_IN_PROGRESS, 1);
switch (trigger) {
case START_TRIGGER:
updateOnStart(currentState, targetState);
Expand Down Expand Up @@ -119,6 +125,7 @@ private void updateOnStart(Map<String, Map<String, String>> currentState,
public void onSuccess(String msg) {
Preconditions.checkState(RebalanceResult.Status.DONE != _tableRebalanceProgressStats.getStatus(),
"Table Rebalance already completed");
_controllerMetrics.setValueOfTableGauge(_tableNameWithType, ControllerGauge.TABLE_REBALANCE_IN_PROGRESS, 0);
long timeToFinishInSeconds = (System.currentTimeMillis() - _tableRebalanceProgressStats.getStartTimeMs()) / 1000L;
_tableRebalanceProgressStats.setCompletionStatusMsg(msg);
_tableRebalanceProgressStats.setTimeToFinishInSeconds(timeToFinishInSeconds);
Expand All @@ -132,6 +139,7 @@ public void onSuccess(String msg) {

@Override
public void onError(String errorMsg) {
_controllerMetrics.setValueOfTableGauge(_tableNameWithType, ControllerGauge.TABLE_REBALANCE_IN_PROGRESS, 0);
long timeToFinishInSeconds = (System.currentTimeMillis() - _tableRebalanceProgressStats.getStartTimeMs()) / 1000;
_tableRebalanceProgressStats.setTimeToFinishInSeconds(timeToFinishInSeconds);
_tableRebalanceProgressStats.setStatus(RebalanceResult.Status.FAILED);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
import org.mockito.Mockito;
import org.testng.annotations.Test;

import static org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel.ERROR;
Expand All @@ -41,6 +43,7 @@ void testZkObserverTracking() {
PinotHelixResourceManager pinotHelixResourceManager = mock(PinotHelixResourceManager.class);
// Mocking this. We will verify using numZkUpdate stat
when(pinotHelixResourceManager.addControllerJobToZK(any(), any(), any())).thenReturn(true);
ControllerMetrics controllerMetrics = Mockito.mock(ControllerMetrics.class);
TableRebalanceContext retryCtx = new TableRebalanceContext();
retryCtx.setConfig(new RebalanceConfig());
ZkBasedTableRebalanceObserver observer =
Expand Down

0 comments on commit 894e56e

Please sign in to comment.