From f9074f9a914b61621ddba497dbe259da40bdb07d Mon Sep 17 00:00:00 2001 From: "Xiaotian (Jackie) Jiang" <17555551+Jackie-Jiang@users.noreply.github.com> Date: Wed, 10 Jul 2024 11:56:26 -0700 Subject: [PATCH] Enhance SegmentStatusChecker to honor CONSUMING segment (#13562) --- .../helix/SegmentStatusChecker.java | 285 +++--- .../helix/SegmentStatusCheckerTest.java | 929 ++++++------------ 2 files changed, 472 insertions(+), 742 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java index 6488ecb022d9..ceb33402e8a3 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java @@ -18,7 +18,7 @@ */ package org.apache.pinot.controller.helix; -import com.google.common.annotations.VisibleForTesting; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -26,6 +26,7 @@ import java.util.Properties; import java.util.Set; import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.tuple.Pair; import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; import org.apache.helix.store.zk.ZkHelixPropertyStore; @@ -51,6 +52,8 @@ import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.config.table.TierConfig; import org.apache.pinot.spi.stream.StreamConfig; +import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel; +import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status; import org.apache.pinot.spi.utils.IngestionConfigUtils; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.slf4j.Logger; @@ -63,22 +66,18 @@ */ public class SegmentStatusChecker extends ControllerPeriodicTask { private static final Logger LOGGER = 
LoggerFactory.getLogger(SegmentStatusChecker.class); - private static final int MAX_OFFLINE_SEGMENTS_TO_LOG = 5; - public static final String ONLINE = "ONLINE"; - public static final String ERROR = "ERROR"; - public static final String CONSUMING = "CONSUMING"; - - // log messages about disabled tables atmost once a day - private static final long DISABLED_TABLE_LOG_INTERVAL_MS = TimeUnit.DAYS.toMillis(1); private static final ZNRecordSerializer RECORD_SERIALIZER = new ZNRecordSerializer(); - private static final int TABLE_CHECKER_TIMEOUT_MS = 30_000; + // log messages about disabled tables at most once a day + private static final long DISABLED_TABLE_LOG_INTERVAL_MS = TimeUnit.DAYS.toMillis(1); + private static final int MAX_SEGMENTS_TO_LOG = 10; + private final int _waitForPushTimeSeconds; - private long _lastDisabledTableLogTimestamp = 0; + private final TableSizeReader _tableSizeReader; + private final Set _tierBackendGauges = new HashSet<>(); - private TableSizeReader _tableSizeReader; - private Set _tierBackendGauges = new HashSet<>(); + private long _lastDisabledTableLogTimestamp = 0; /** * Constructs the segment status checker. @@ -190,7 +189,7 @@ private void updateTableConfigMetrics(String tableNameWithType, TableConfig tabl } } tierBackendSet.forEach(tierBackend -> context._tierBackendTableCountMap.put(tierBackend, - context._tierBackendTableCountMap.getOrDefault(tierBackend, 0) + 1)); + context._tierBackendTableCountMap.getOrDefault(tierBackend, 0) + 1)); context._tierBackendConfiguredTableCount += tierBackendSet.isEmpty() ? 
0 : 1; } int replication = tableConfig.getReplication(); @@ -226,145 +225,173 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon return; } - //check if table consumption is paused - boolean isTablePaused = - Boolean.parseBoolean(idealState.getRecord().getSimpleField(PinotLLCRealtimeSegmentManager.IS_TABLE_PAUSED)); - - if (isTablePaused) { + if (Boolean.parseBoolean(idealState.getRecord().getSimpleField(PinotLLCRealtimeSegmentManager.IS_TABLE_PAUSED))) { context._pausedTables.add(tableNameWithType); } - if (idealState.getPartitionSet().isEmpty()) { - int nReplicasFromIdealState = 1; + _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.IDEALSTATE_ZNODE_SIZE, + idealState.toString().length()); + _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.IDEALSTATE_ZNODE_BYTE_SIZE, + idealState.serialize(RECORD_SERIALIZER).length); + + Set segmentsIncludingReplaced = idealState.getPartitionSet(); + _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENT_COUNT_INCLUDING_REPLACED, + segmentsIncludingReplaced.size()); + // Get the segments excluding the replaced segments which are specified in the segment lineage entries and cannot + // be queried from the table. 
+ ZkHelixPropertyStore propertyStore = _pinotHelixResourceManager.getPropertyStore(); + Set segments; + if (segmentsIncludingReplaced.isEmpty()) { + segments = Set.of(); + } else { + segments = new HashSet<>(segmentsIncludingReplaced); + SegmentLineage segmentLineage = SegmentLineageAccessHelper.getSegmentLineage(propertyStore, tableNameWithType); + SegmentLineageUtils.filterSegmentsBasedOnLineageInPlace(segments, segmentLineage); + } + int numSegments = segments.size(); + _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENT_COUNT, numSegments); + if (numSegments == 0) { + int numReplicasFromIS; try { - nReplicasFromIdealState = Integer.valueOf(idealState.getReplicas()); + numReplicasFromIS = Math.max(Integer.parseInt(idealState.getReplicas()), 1); } catch (NumberFormatException e) { - // Ignore + numReplicasFromIS = 1; } - _controllerMetrics - .setValueOfTableGauge(tableNameWithType, ControllerGauge.NUMBER_OF_REPLICAS, nReplicasFromIdealState); + _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.NUMBER_OF_REPLICAS, numReplicasFromIS); _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_OF_REPLICAS, 100); + _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENTS_IN_ERROR_STATE, 0); _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE, 100); + _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENTS_WITH_LESS_REPLICAS, 0); + _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.TABLE_COMPRESSED_SIZE, 0); return; } - // Get the segments excluding the replaced segments which are specified in the segment lineage entries and cannot - // be queried from the table. 
- Set segmentsExcludeReplaced = new HashSet<>(idealState.getPartitionSet()); - ZkHelixPropertyStore propertyStore = _pinotHelixResourceManager.getPropertyStore(); - SegmentLineage segmentLineage = SegmentLineageAccessHelper.getSegmentLineage(propertyStore, tableNameWithType); - SegmentLineageUtils.filterSegmentsBasedOnLineageInPlace(segmentsExcludeReplaced, segmentLineage); - _controllerMetrics - .setValueOfTableGauge(tableNameWithType, ControllerGauge.IDEALSTATE_ZNODE_SIZE, idealState.toString().length()); - _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.IDEALSTATE_ZNODE_BYTE_SIZE, - idealState.serialize(RECORD_SERIALIZER).length); - _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENT_COUNT, - (long) segmentsExcludeReplaced.size()); - _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENT_COUNT_INCLUDING_REPLACED, - (long) (idealState.getPartitionSet().size())); ExternalView externalView = _pinotHelixResourceManager.getTableExternalView(tableNameWithType); - int nReplicasIdealMax = 0; // Keeps track of maximum number of replicas in ideal state - int nReplicasExternal = -1; // Keeps track of minimum number of replicas in external view - int nErrors = 0; // Keeps track of number of segments in error state - int nOffline = 0; // Keeps track of number segments with no online replicas - int nNumOfReplicasLessThanIdeal = 0; // Keeps track of number of segments running with less than expected replicas - int nSegments = 0; // Counts number of segments - long tableCompressedSize = 0; // Tracks the total compressed segment size in deep store per table - for (String partitionName : segmentsExcludeReplaced) { - int nReplicas = 0; - int nIdeal = 0; - nSegments++; - // Skip segments not online in ideal state - for (Map.Entry serverAndState : idealState.getInstanceStateMap(partitionName).entrySet()) { - if (serverAndState == null) { - break; - } - if (serverAndState.getValue().equals(ONLINE)) 
{ - nIdeal++; - break; + // Maximum number of replicas in ideal state + int maxISReplicas = Integer.MIN_VALUE; + // Minimum number of replicas in external view + int minEVReplicas = Integer.MAX_VALUE; + // Total compressed segment size in deep store + long tableCompressedSize = 0; + // Segments without ZK metadata + List segmentsWithoutZKMetadata = new ArrayList<>(); + // Pairs of segment-instance in ERROR state + List> errorSegments = new ArrayList<>(); + // Offline segments + List offlineSegments = new ArrayList<>(); + // Segments with fewer replicas online (ONLINE/CONSUMING) in external view than in ideal state + List partialOnlineSegments = new ArrayList<>(); + for (String segment : segments) { + int numISReplicas = 0; + for (Map.Entry entry : idealState.getInstanceStateMap(segment).entrySet()) { + String state = entry.getValue(); + if (state.equals(SegmentStateModel.ONLINE) || state.equals(SegmentStateModel.CONSUMING)) { + numISReplicas++; } } - if (nIdeal == 0) { - // No online segments in ideal state + // Skip segments not ONLINE/CONSUMING in ideal state + if (numISReplicas == 0) { continue; } - SegmentZKMetadata segmentZKMetadata = - _pinotHelixResourceManager.getSegmentZKMetadata(tableNameWithType, partitionName); - if (segmentZKMetadata != null - && segmentZKMetadata.getPushTime() > System.currentTimeMillis() - _waitForPushTimeSeconds * 1000) { - // Push is not finished yet, skip the segment + maxISReplicas = Math.max(maxISReplicas, numISReplicas); + + SegmentZKMetadata segmentZKMetadata = _pinotHelixResourceManager.getSegmentZKMetadata(tableNameWithType, segment); + // Skip the segment when it doesn't have ZK metadata. Most likely the segment is just deleted. 
+ if (segmentZKMetadata == null) { + segmentsWithoutZKMetadata.add(segment); continue; } - if (segmentZKMetadata != null) { - long sizeInBytes = segmentZKMetadata.getSizeInBytes(); - if (sizeInBytes > 0) { - tableCompressedSize += sizeInBytes; - } + long sizeInBytes = segmentZKMetadata.getSizeInBytes(); + if (sizeInBytes > 0) { + tableCompressedSize += sizeInBytes; } - nReplicasIdealMax = (idealState.getInstanceStateMap(partitionName).size() > nReplicasIdealMax) ? idealState - .getInstanceStateMap(partitionName).size() : nReplicasIdealMax; - if ((externalView == null) || (externalView.getStateMap(partitionName) == null)) { - // No replicas for this segment - nOffline++; - if (nOffline < MAX_OFFLINE_SEGMENTS_TO_LOG) { - LOGGER.warn("Segment {} of table {} has no replicas", partitionName, tableNameWithType); - } - nReplicasExternal = 0; + + // NOTE: We want to skip segments that are just created/pushed to avoid false alerts because it is expected for + // servers to take some time to load them. For consuming (IN_PROGRESS) segments, we use creation time from + // the ZK metadata; for pushed segments, we use push time from the ZK metadata. Both of them are the time + // when segment is newly created. For committed segments from real-time table, push time doesn't exist, and + // creationTimeMs will be Long.MIN_VALUE, which is fine because we want to include them in the check. + long creationTimeMs = segmentZKMetadata.getStatus() == Status.IN_PROGRESS ? segmentZKMetadata.getCreationTime() + : segmentZKMetadata.getPushTime(); + if (creationTimeMs > System.currentTimeMillis() - _waitForPushTimeSeconds * 1000L) { continue; } - for (Map.Entry serverAndState : externalView.getStateMap(partitionName).entrySet()) { - // Count number of online replicas. Ignore if state is CONSUMING. - // It is possible for a segment to be ONLINE in idealstate, and CONSUMING in EV for a short period of time. - // So, ignore this combination. 
If a segment exists in this combination for a long time, we will get - low level-partition-not-consuming alert anyway. - if (serverAndState.getValue().equals(ONLINE) || serverAndState.getValue().equals(CONSUMING)) { - nReplicas++; - } - if (serverAndState.getValue().equals(ERROR)) { - nErrors++; + + int numEVReplicas = 0; + if (externalView != null) { + Map stateMap = externalView.getStateMap(segment); + if (stateMap != null) { + for (Map.Entry entry : stateMap.entrySet()) { + String state = entry.getValue(); + if (state.equals(SegmentStateModel.ONLINE) || state.equals(SegmentStateModel.CONSUMING)) { + numEVReplicas++; + } + if (state.equals(SegmentStateModel.ERROR)) { + errorSegments.add(Pair.of(segment, entry.getKey())); + } + } } } - if (nReplicas == 0) { - if (nOffline < MAX_OFFLINE_SEGMENTS_TO_LOG) { - LOGGER.warn("Segment {} of table {} has no online replicas", partitionName, tableNameWithType); - } - nOffline++; - } else if (nReplicas < nReplicasIdealMax) { - LOGGER.debug("Segment {} of table {} is running with {} replicas which is less than the expected values {}", - partitionName, tableNameWithType, nReplicas, nReplicasIdealMax); - nNumOfReplicasLessThanIdeal++; + if (numEVReplicas == 0) { + offlineSegments.add(segment); + } else if (numEVReplicas < numISReplicas) { + partialOnlineSegments.add(segment); + } else { + // Do not allow numEVReplicas to be larger than numISReplicas + numEVReplicas = numISReplicas; + } + minEVReplicas = Math.min(minEVReplicas, numEVReplicas); + } + + if (maxISReplicas == Integer.MIN_VALUE) { + try { + maxISReplicas = Math.max(Integer.parseInt(idealState.getReplicas()), 1); + } catch (NumberFormatException e) { + maxISReplicas = 1; } - nReplicasExternal = - ((nReplicasExternal > nReplicas) || (nReplicasExternal == -1)) ? nReplicas : nReplicasExternal; } - if (nReplicasExternal == -1) { - nReplicasExternal = (nReplicasIdealMax == 0) ?
1 : 0; + // Do not allow minEVReplicas to be larger than maxISReplicas + minEVReplicas = Math.min(minEVReplicas, maxISReplicas); + + if (minEVReplicas < maxISReplicas) { + LOGGER.warn("Table {} has at least one segment running with only {} replicas, below replication threshold :{}", + tableNameWithType, minEVReplicas, maxISReplicas); + } + int numSegmentsWithoutZKMetadata = segmentsWithoutZKMetadata.size(); + if (numSegmentsWithoutZKMetadata > 0) { + LOGGER.warn("Table {} has {} segments without ZK metadata: {}", tableNameWithType, numSegmentsWithoutZKMetadata, + logSegments(segmentsWithoutZKMetadata)); + } + int numErrorSegments = errorSegments.size(); + if (numErrorSegments > 0) { + LOGGER.warn("Table {} has {} segments in ERROR state: {}", tableNameWithType, numErrorSegments, + logSegments(errorSegments)); + } + int numOfflineSegments = offlineSegments.size(); + if (numOfflineSegments > 0) { + LOGGER.warn("Table {} has {} segments without ONLINE/CONSUMING replica: {}", tableNameWithType, + numOfflineSegments, logSegments(offlineSegments)); + } + int numPartialOnlineSegments = partialOnlineSegments.size(); + if (numPartialOnlineSegments > 0) { + LOGGER.warn("Table {} has {} segments with fewer replicas than the replication factor: {}", tableNameWithType, + numPartialOnlineSegments, logSegments(partialOnlineSegments)); } + // Synchronization provided by Controller Gauge to make sure that only one thread updates the gauge - _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.NUMBER_OF_REPLICAS, nReplicasExternal); + _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.NUMBER_OF_REPLICAS, minEVReplicas); _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_OF_REPLICAS, - (nReplicasIdealMax > 0) ? 
(nReplicasExternal * 100 / nReplicasIdealMax) : 100); - _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENTS_IN_ERROR_STATE, nErrors); - _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENTS_WITH_LESS_REPLICAS, - nNumOfReplicasLessThanIdeal); + minEVReplicas * 100L / maxISReplicas); + _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENTS_IN_ERROR_STATE, + numErrorSegments); _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE, - (nSegments > 0) ? (nSegments - nOffline) * 100 / nSegments : 100); + numOfflineSegments > 0 ? (numSegments - numOfflineSegments) * 100L / numSegments : 100); + _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENTS_WITH_LESS_REPLICAS, + numPartialOnlineSegments); _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.TABLE_COMPRESSED_SIZE, tableCompressedSize); - if (nOffline > 0) { - LOGGER.warn("Table {} has {} segments with no online replicas", tableNameWithType, nOffline); - } - if (nNumOfReplicasLessThanIdeal > 0) { - LOGGER.warn("Table {} has {} segments with number of replicas less than the replication factor", - tableNameWithType, nNumOfReplicasLessThanIdeal); - } - if (nReplicasExternal < nReplicasIdealMax) { - LOGGER.warn("Table {} has at least one segment running with only {} replicas, below replication threshold :{}", - tableNameWithType, nReplicasExternal, nReplicasIdealMax); - } - if (tableType == TableType.REALTIME && tableConfig != null) { StreamConfig streamConfig = new StreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig)); @@ -373,6 +400,13 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon } } + private static String logSegments(List segments) { + if (segments.size() <= MAX_SEGMENTS_TO_LOG) { + return segments.toString(); + } + return segments.subList(0, 
MAX_SEGMENTS_TO_LOG) + "..."; + } + @Override protected void nonLeaderCleanup(List tableNamesWithType) { tableNamesWithType.forEach(this::removeMetricsForTable); @@ -403,20 +437,15 @@ private void removeMetricsForTable(String tableNameWithType) { public void cleanUpTask() { } - @VisibleForTesting - void setTableSizeReader(TableSizeReader tableSizeReader) { - _tableSizeReader = tableSizeReader; - } - public static final class Context { private boolean _logDisabledTables; private int _realTimeTableCount; private int _offlineTableCount; private int _upsertTableCount; private int _tierBackendConfiguredTableCount; - private Map _tierBackendTableCountMap = new HashMap<>(); - private Set _processedTables = new HashSet<>(); - private Set _disabledTables = new HashSet<>(); - private Set _pausedTables = new HashSet<>(); + private final Map _tierBackendTableCountMap = new HashMap<>(); + private final Set _processedTables = new HashSet<>(); + private final Set _disabledTables = new HashSet<>(); + private final Set _pausedTables = new HashSet<>(); } } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java index 429391e8e32d..81a0f345c15d 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java @@ -20,7 +20,6 @@ import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; import org.apache.helix.AccessOption; import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; @@ -42,8 +41,8 @@ import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.metrics.PinotMetricUtils; -import org.apache.pinot.spi.metrics.PinotMetricsRegistry; import 
org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.testng.annotations.Test; @@ -58,25 +57,23 @@ import static org.testng.Assert.assertFalse; +@SuppressWarnings("unchecked") public class SegmentStatusCheckerTest { + private static final String RAW_TABLE_NAME = "myTable"; + private static final String OFFLINE_TABLE_NAME = TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME); + private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME); - private SegmentStatusChecker _segmentStatusChecker; - private PinotHelixResourceManager _helixResourceManager; - private ZkHelixPropertyStore _helixPropertyStore; - private LeadControllerManager _leadControllerManager; - private PinotMetricsRegistry _metricsRegistry; - private ControllerMetrics _controllerMetrics; - private ControllerConf _config; - private TableSizeReader _tableSizeReader; + // Intentionally not reset the metrics to test all metrics being refreshed. + private final ControllerMetrics _controllerMetrics = + new ControllerMetrics(PinotMetricUtils.getPinotMetricsRegistry()); @Test - public void offlineBasicTest() - throws Exception { - String offlineTableName = "myTable_OFFLINE"; + public void offlineBasicTest() { + // Intentionally set the replication number to 2 to test the metrics. 
TableConfig tableConfig = - new TableConfigBuilder(TableType.OFFLINE).setTableName(offlineTableName).setNumReplicas(2).build(); + new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(2).build(); - IdealState idealState = new IdealState(offlineTableName); + IdealState idealState = new IdealState(OFFLINE_TABLE_NAME); idealState.setPartitionState("myTable_0", "pinot1", "ONLINE"); idealState.setPartitionState("myTable_0", "pinot2", "ONLINE"); idealState.setPartitionState("myTable_0", "pinot3", "ONLINE"); @@ -90,10 +87,10 @@ public void offlineBasicTest() idealState.setPartitionState("myTable_4", "pinot1", "ONLINE"); idealState.setPartitionState("myTable_4", "pinot2", "ONLINE"); idealState.setPartitionState("myTable_4", "pinot3", "ONLINE"); - idealState.setReplicas("2"); + idealState.setReplicas("3"); idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED); - ExternalView externalView = new ExternalView(offlineTableName); + ExternalView externalView = new ExternalView(OFFLINE_TABLE_NAME); externalView.setState("myTable_0", "pinot1", "ONLINE"); externalView.setState("myTable_0", "pinot2", "ONLINE"); externalView.setState("myTable_1", "pinot1", "ERROR"); @@ -104,170 +101,161 @@ public void offlineBasicTest() externalView.setState("myTable_3", "pinot3", "ONLINE"); externalView.setState("myTable_4", "pinot1", "ONLINE"); - { - _helixResourceManager = mock(PinotHelixResourceManager.class); - when(_helixResourceManager.getAllTables()).thenReturn(List.of(offlineTableName)); - when(_helixResourceManager.getTableConfig(offlineTableName)).thenReturn(tableConfig); - when(_helixResourceManager.getTableIdealState(offlineTableName)).thenReturn(idealState); - when(_helixResourceManager.getTableExternalView(offlineTableName)).thenReturn(externalView); - } - { - _helixPropertyStore = mock(ZkHelixPropertyStore.class); - when(_helixResourceManager.getPropertyStore()).thenReturn(_helixPropertyStore); - // Based on the lineage entries: {myTable_1 -> 
myTable_3, COMPLETED}, {myTable_3 -> myTable_4, IN_PROGRESS}, - // myTable_1 and myTable_4 will be skipped for the metrics. - SegmentLineage segmentLineage = new SegmentLineage(offlineTableName); - segmentLineage.addLineageEntry(SegmentLineageUtils.generateLineageEntryId(), - new LineageEntry(List.of("myTable_1"), List.of("myTable_3"), LineageEntryState.COMPLETED, 11111L)); - segmentLineage.addLineageEntry(SegmentLineageUtils.generateLineageEntryId(), - new LineageEntry(List.of("myTable_3"), List.of("myTable_4"), LineageEntryState.IN_PROGRESS, 11111L)); - when(_helixPropertyStore.get(eq("/SEGMENT_LINEAGE/" + offlineTableName), any(), - eq(AccessOption.PERSISTENT))).thenReturn(segmentLineage.toZNRecord()); - } - { - _config = mock(ControllerConf.class); - when(_config.getStatusCheckerFrequencyInSeconds()).thenReturn(300); - when(_config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300); - } - { - _leadControllerManager = mock(LeadControllerManager.class); - when(_leadControllerManager.isLeaderForTable(anyString())).thenReturn(true); - } - { - _tableSizeReader = mock(TableSizeReader.class); - when(_tableSizeReader.getTableSizeDetails(anyString(), anyInt())).thenReturn(null); - } - PinotMetricUtils.cleanUp(); - _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry(); - _controllerMetrics = new ControllerMetrics(_metricsRegistry); - _segmentStatusChecker = - new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics, - _tableSizeReader); - _segmentStatusChecker.setTableSizeReader(_tableSizeReader); - _segmentStatusChecker.start(); - _segmentStatusChecker.run(); - - assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, offlineTableName, - ControllerGauge.REPLICATION_FROM_CONFIG), 2); + PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); + when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME)); + 
when(resourceManager.getTableConfig(OFFLINE_TABLE_NAME)).thenReturn(tableConfig); + when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState); + when(resourceManager.getTableExternalView(OFFLINE_TABLE_NAME)).thenReturn(externalView); + SegmentZKMetadata segmentZKMetadata = mockPushedSegmentZKMetadata(1234, 11111L); + when(resourceManager.getSegmentZKMetadata(eq(OFFLINE_TABLE_NAME), anyString())).thenReturn(segmentZKMetadata); + + ZkHelixPropertyStore propertyStore = mock(ZkHelixPropertyStore.class); + when(resourceManager.getPropertyStore()).thenReturn(propertyStore); + // Based on the lineage entries: {myTable_1 -> myTable_3, COMPLETED}, {myTable_3 -> myTable_4, IN_PROGRESS}, + // myTable_1 and myTable_4 will be skipped for the metrics. + SegmentLineage segmentLineage = new SegmentLineage(OFFLINE_TABLE_NAME); + segmentLineage.addLineageEntry(SegmentLineageUtils.generateLineageEntryId(), + new LineageEntry(List.of("myTable_1"), List.of("myTable_3"), LineageEntryState.COMPLETED, 11111L)); + segmentLineage.addLineageEntry(SegmentLineageUtils.generateLineageEntryId(), + new LineageEntry(List.of("myTable_3"), List.of("myTable_4"), LineageEntryState.IN_PROGRESS, 11111L)); + when( + propertyStore.get(eq("/SEGMENT_LINEAGE/" + OFFLINE_TABLE_NAME), any(), eq(AccessOption.PERSISTENT))).thenReturn( + segmentLineage.toZNRecord()); + + runSegmentStatusChecker(resourceManager, 0); + verifyControllerMetrics(OFFLINE_TABLE_NAME, 2, 5, 3, 2, 66, 1, 100, 2, 2468); + } + + private SegmentZKMetadata mockPushedSegmentZKMetadata(long sizeInBytes, long pushTimeMs) { + SegmentZKMetadata segmentZKMetadata = mock(SegmentZKMetadata.class); + when(segmentZKMetadata.getStatus()).thenReturn(Status.UPLOADED); + when(segmentZKMetadata.getSizeInBytes()).thenReturn(sizeInBytes); + when(segmentZKMetadata.getPushTime()).thenReturn(pushTimeMs); + return segmentZKMetadata; + } + + private void runSegmentStatusChecker(PinotHelixResourceManager resourceManager, int 
waitForPushTimeInSeconds) { + LeadControllerManager leadControllerManager = mock(LeadControllerManager.class); + when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true); + ControllerConf controllerConf = mock(ControllerConf.class); + when(controllerConf.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(waitForPushTimeInSeconds); + TableSizeReader tableSizeReader = mock(TableSizeReader.class); + SegmentStatusChecker segmentStatusChecker = + new SegmentStatusChecker(resourceManager, leadControllerManager, controllerConf, _controllerMetrics, + tableSizeReader); + segmentStatusChecker.start(); + segmentStatusChecker.run(); + } + + private void verifyControllerMetrics(String tableNameWithType, int expectedReplicationFromConfig, + int expectedNumSegmentsIncludingReplaced, int expectedNumSegment, int expectedNumReplicas, + int expectedPercentOfReplicas, int expectedSegmentsInErrorState, int expectedPercentSegmentsAvailable, + int expectedSegmentsWithLessReplicas, int expectedTableCompressedSize) { + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, tableNameWithType, + ControllerGauge.REPLICATION_FROM_CONFIG), expectedReplicationFromConfig); + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, tableNameWithType, + ControllerGauge.SEGMENT_COUNT_INCLUDING_REPLACED), expectedNumSegmentsIncludingReplaced); + assertEquals( + MetricValueUtils.getTableGaugeValue(_controllerMetrics, tableNameWithType, ControllerGauge.SEGMENT_COUNT), + expectedNumSegment); + assertEquals( + MetricValueUtils.getTableGaugeValue(_controllerMetrics, tableNameWithType, ControllerGauge.NUMBER_OF_REPLICAS), + expectedNumReplicas); assertEquals( - MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), ControllerGauge.SEGMENT_COUNT), - 3); - assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), - ControllerGauge.SEGMENT_COUNT_INCLUDING_REPLACED), 5); - 
assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), - ControllerGauge.SEGMENTS_IN_ERROR_STATE), 1); - assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), - ControllerGauge.SEGMENTS_WITH_LESS_REPLICAS), 2); - assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), - ControllerGauge.NUMBER_OF_REPLICAS), 2); - assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), - ControllerGauge.PERCENT_OF_REPLICAS), 66); - assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), - ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100); - assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), - ControllerGauge.TABLE_COMPRESSED_SIZE), 0); + MetricValueUtils.getTableGaugeValue(_controllerMetrics, tableNameWithType, ControllerGauge.PERCENT_OF_REPLICAS), + expectedPercentOfReplicas); + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, tableNameWithType, + ControllerGauge.SEGMENTS_IN_ERROR_STATE), expectedSegmentsInErrorState); + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, tableNameWithType, + ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), expectedPercentSegmentsAvailable); + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, tableNameWithType, + ControllerGauge.SEGMENTS_WITH_LESS_REPLICAS), expectedSegmentsWithLessReplicas); + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, tableNameWithType, + ControllerGauge.TABLE_COMPRESSED_SIZE), expectedTableCompressedSize); } @Test - public void realtimeBasicTest() - throws Exception { - String rawTableName = "myTable"; - String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(rawTableName); + public void realtimeBasicTest() { TableConfig tableConfig = - new 
TableConfigBuilder(TableType.REALTIME).setTableName(rawTableName).setTimeColumnName("timeColumn") + new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setTimeColumnName("timeColumn") .setNumReplicas(3).setStreamConfigs(getStreamConfigMap()).build(); - LLCSegmentName seg1 = new LLCSegmentName(rawTableName, 1, 0, System.currentTimeMillis()); - LLCSegmentName seg2 = new LLCSegmentName(rawTableName, 1, 1, System.currentTimeMillis()); - LLCSegmentName seg3 = new LLCSegmentName(rawTableName, 2, 1, System.currentTimeMillis()); - IdealState idealState = new IdealState(realtimeTableName); - idealState.setPartitionState(seg1.getSegmentName(), "pinot1", "ONLINE"); - idealState.setPartitionState(seg1.getSegmentName(), "pinot2", "ONLINE"); - idealState.setPartitionState(seg1.getSegmentName(), "pinot3", "ONLINE"); - idealState.setPartitionState(seg2.getSegmentName(), "pinot1", "ONLINE"); - idealState.setPartitionState(seg2.getSegmentName(), "pinot2", "ONLINE"); - idealState.setPartitionState(seg2.getSegmentName(), "pinot3", "ONLINE"); - idealState.setPartitionState(seg3.getSegmentName(), "pinot1", "CONSUMING"); - idealState.setPartitionState(seg3.getSegmentName(), "pinot2", "CONSUMING"); - idealState.setPartitionState(seg3.getSegmentName(), "pinot3", "OFFLINE"); + String seg1 = new LLCSegmentName(RAW_TABLE_NAME, 1, 0, System.currentTimeMillis()).getSegmentName(); + String seg2 = new LLCSegmentName(RAW_TABLE_NAME, 1, 1, System.currentTimeMillis()).getSegmentName(); + String seg3 = new LLCSegmentName(RAW_TABLE_NAME, 2, 1, System.currentTimeMillis()).getSegmentName(); + IdealState idealState = new IdealState(REALTIME_TABLE_NAME); + idealState.setPartitionState(seg1, "pinot1", "ONLINE"); + idealState.setPartitionState(seg1, "pinot2", "ONLINE"); + idealState.setPartitionState(seg1, "pinot3", "ONLINE"); + idealState.setPartitionState(seg2, "pinot1", "ONLINE"); + idealState.setPartitionState(seg2, "pinot2", "ONLINE"); + idealState.setPartitionState(seg2, "pinot3", 
"ONLINE"); + idealState.setPartitionState(seg3, "pinot1", "CONSUMING"); + idealState.setPartitionState(seg3, "pinot2", "CONSUMING"); + idealState.setPartitionState(seg3, "pinot3", "OFFLINE"); idealState.setReplicas("3"); idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED); - ExternalView externalView = new ExternalView(realtimeTableName); - externalView.setState(seg1.getSegmentName(), "pinot1", "ONLINE"); - externalView.setState(seg1.getSegmentName(), "pinot2", "ONLINE"); - externalView.setState(seg1.getSegmentName(), "pinot3", "ONLINE"); - externalView.setState(seg2.getSegmentName(), "pinot1", "CONSUMING"); - externalView.setState(seg2.getSegmentName(), "pinot2", "ONLINE"); - externalView.setState(seg2.getSegmentName(), "pinot3", "CONSUMING"); - externalView.setState(seg3.getSegmentName(), "pinot1", "CONSUMING"); - externalView.setState(seg3.getSegmentName(), "pinot2", "CONSUMING"); - externalView.setState(seg3.getSegmentName(), "pinot3", "OFFLINE"); - - { - _helixResourceManager = mock(PinotHelixResourceManager.class); - _helixPropertyStore = mock(ZkHelixPropertyStore.class); - when(_helixResourceManager.getTableConfig(realtimeTableName)).thenReturn(tableConfig); - when(_helixResourceManager.getPropertyStore()).thenReturn(_helixPropertyStore); - when(_helixResourceManager.getAllTables()).thenReturn(List.of(realtimeTableName)); - when(_helixResourceManager.getTableIdealState(realtimeTableName)).thenReturn(idealState); - when(_helixResourceManager.getTableExternalView(realtimeTableName)).thenReturn(externalView); - ZNRecord znRecord = new ZNRecord("0"); - znRecord.setSimpleField(CommonConstants.Segment.Realtime.END_OFFSET, "10000"); - when(_helixPropertyStore.get(anyString(), any(), anyInt())).thenReturn(znRecord); - } - { - _config = mock(ControllerConf.class); - when(_config.getStatusCheckerFrequencyInSeconds()).thenReturn(300); - when(_config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300); - } - { - _leadControllerManager = 
mock(LeadControllerManager.class); - when(_leadControllerManager.isLeaderForTable(anyString())).thenReturn(true); - } - { - _tableSizeReader = mock(TableSizeReader.class); - when(_tableSizeReader.getTableSizeDetails(anyString(), anyInt())).thenReturn(null); - } - PinotMetricUtils.cleanUp(); - _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry(); - _controllerMetrics = new ControllerMetrics(_metricsRegistry); - _segmentStatusChecker = - new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics, - _tableSizeReader); - _segmentStatusChecker.setTableSizeReader(_tableSizeReader); - _segmentStatusChecker.start(); - _segmentStatusChecker.run(); - - assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, realtimeTableName, - ControllerGauge.REPLICATION_FROM_CONFIG), 3); - assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), - ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0); - assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), - ControllerGauge.SEGMENTS_WITH_LESS_REPLICAS), 0); - assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), - ControllerGauge.NUMBER_OF_REPLICAS), 3); - assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), - ControllerGauge.PERCENT_OF_REPLICAS), 100); - assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), - ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100); - assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + ExternalView externalView = new ExternalView(REALTIME_TABLE_NAME); + externalView.setState(seg1, "pinot1", "ONLINE"); + externalView.setState(seg1, "pinot2", "ONLINE"); + externalView.setState(seg1, "pinot3", "ONLINE"); + externalView.setState(seg2, "pinot1", "CONSUMING"); + externalView.setState(seg2, "pinot2", "ONLINE"); + externalView.setState(seg2, 
"pinot3", "CONSUMING"); + externalView.setState(seg3, "pinot1", "CONSUMING"); + externalView.setState(seg3, "pinot2", "CONSUMING"); + externalView.setState(seg3, "pinot3", "OFFLINE"); + + PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); + when(resourceManager.getTableConfig(REALTIME_TABLE_NAME)).thenReturn(tableConfig); + when(resourceManager.getAllTables()).thenReturn(List.of(REALTIME_TABLE_NAME)); + when(resourceManager.getTableIdealState(REALTIME_TABLE_NAME)).thenReturn(idealState); + when(resourceManager.getTableExternalView(REALTIME_TABLE_NAME)).thenReturn(externalView); + SegmentZKMetadata committedSegmentZKMetadata = mockCommittedSegmentZKMetadata(); + when(resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, seg1)).thenReturn(committedSegmentZKMetadata); + when(resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, seg2)).thenReturn(committedSegmentZKMetadata); + SegmentZKMetadata consumingSegmentZKMetadata = mockConsumingSegmentZKMetadata(11111L); + when(resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, seg3)).thenReturn(consumingSegmentZKMetadata); + + ZkHelixPropertyStore propertyStore = mock(ZkHelixPropertyStore.class); + when(resourceManager.getPropertyStore()).thenReturn(propertyStore); + ZNRecord znRecord = new ZNRecord("0"); + znRecord.setSimpleField(CommonConstants.Segment.Realtime.END_OFFSET, "10000"); + when(propertyStore.get(anyString(), any(), anyInt())).thenReturn(znRecord); + + runSegmentStatusChecker(resourceManager, 0); + verifyControllerMetrics(REALTIME_TABLE_NAME, 3, 3, 3, 2, 66, 0, 100, 0, 0); + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, REALTIME_TABLE_NAME, ControllerGauge.MISSING_CONSUMING_SEGMENT_TOTAL_COUNT), 2); } - Map getStreamConfigMap() { + private Map getStreamConfigMap() { return Map.of("streamType", "kafka", "stream.kafka.consumer.type", "simple", "stream.kafka.topic.name", "test", "stream.kafka.decoder.class.name", 
"org.apache.pinot.plugin.stream.kafka.KafkaAvroMessageDecoder", "stream.kafka.consumer.factory.class.name", "org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConsumerFactory"); } - @Test - public void missingEVPartitionTest() - throws Exception { - String offlineTableName = "myTable_OFFLINE"; + private SegmentZKMetadata mockCommittedSegmentZKMetadata() { + SegmentZKMetadata segmentZKMetadata = mock(SegmentZKMetadata.class); + when(segmentZKMetadata.getStatus()).thenReturn(Status.DONE); + when(segmentZKMetadata.getSizeInBytes()).thenReturn(-1L); + when(segmentZKMetadata.getPushTime()).thenReturn(Long.MIN_VALUE); + return segmentZKMetadata; + } + + private SegmentZKMetadata mockConsumingSegmentZKMetadata(long creationTimeMs) { + SegmentZKMetadata segmentZKMetadata = mock(SegmentZKMetadata.class); + when(segmentZKMetadata.getStatus()).thenReturn(Status.IN_PROGRESS); + when(segmentZKMetadata.getSizeInBytes()).thenReturn(-1L); + when(segmentZKMetadata.getCreationTime()).thenReturn(creationTimeMs); + return segmentZKMetadata; + } - IdealState idealState = new IdealState(offlineTableName); + @Test + public void missingEVPartitionTest() { + IdealState idealState = new IdealState(OFFLINE_TABLE_NAME); idealState.setPartitionState("myTable_0", "pinot1", "ONLINE"); idealState.setPartitionState("myTable_0", "pinot2", "ONLINE"); idealState.setPartitionState("myTable_0", "pinot3", "ONLINE"); @@ -276,191 +264,89 @@ public void missingEVPartitionTest() idealState.setPartitionState("myTable_1", "pinot3", "ONLINE"); idealState.setPartitionState("myTable_2", "pinot3", "OFFLINE"); idealState.setPartitionState("myTable_3", "pinot3", "ONLINE"); - idealState.setReplicas("2"); + idealState.setReplicas("3"); idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED); - ExternalView externalView = new ExternalView(offlineTableName); + ExternalView externalView = new ExternalView(OFFLINE_TABLE_NAME); externalView.setState("myTable_0", "pinot1", "ONLINE"); 
externalView.setState("myTable_0", "pinot2", "ONLINE"); externalView.setState("myTable_1", "pinot1", "ERROR"); externalView.setState("myTable_1", "pinot2", "ONLINE"); - ZNRecord znrecord = new ZNRecord("myTable_0"); - znrecord.setSimpleField(CommonConstants.Segment.INDEX_VERSION, "v1"); - znrecord.setLongField(CommonConstants.Segment.START_TIME, 1000); - znrecord.setLongField(CommonConstants.Segment.END_TIME, 2000); - znrecord.setSimpleField(CommonConstants.Segment.TIME_UNIT, TimeUnit.HOURS.toString()); - znrecord.setLongField(CommonConstants.Segment.TOTAL_DOCS, 10000); - znrecord.setLongField(CommonConstants.Segment.CRC, 1234); - znrecord.setLongField(CommonConstants.Segment.CREATION_TIME, 3000); - znrecord.setSimpleField(CommonConstants.Segment.DOWNLOAD_URL, "http://localhost:8000/myTable_0"); - znrecord.setLongField(CommonConstants.Segment.PUSH_TIME, System.currentTimeMillis()); - znrecord.setLongField(CommonConstants.Segment.REFRESH_TIME, System.currentTimeMillis()); - znrecord.setLongField(CommonConstants.Segment.SIZE_IN_BYTES, 1111); - - ZkHelixPropertyStore propertyStore; - { - propertyStore = (ZkHelixPropertyStore) mock(ZkHelixPropertyStore.class); - when(propertyStore.get("/SEGMENTS/myTable_OFFLINE/myTable_3", null, AccessOption.PERSISTENT)).thenReturn( - znrecord); - } + PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); + when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME)); + when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState); + when(resourceManager.getTableExternalView(OFFLINE_TABLE_NAME)).thenReturn(externalView); + SegmentZKMetadata segmentZKMetadata = mockPushedSegmentZKMetadata(1234, 11111L); + when(resourceManager.getSegmentZKMetadata(eq(OFFLINE_TABLE_NAME), anyString())).thenReturn(segmentZKMetadata); - { - _helixResourceManager = mock(PinotHelixResourceManager.class); - _helixPropertyStore = mock(ZkHelixPropertyStore.class); - 
when(_helixResourceManager.getPropertyStore()).thenReturn(_helixPropertyStore); - when(_helixResourceManager.getAllTables()).thenReturn(List.of(offlineTableName)); - when(_helixResourceManager.getTableIdealState(offlineTableName)).thenReturn(idealState); - when(_helixResourceManager.getTableExternalView(offlineTableName)).thenReturn(externalView); - when(_helixResourceManager.getSegmentZKMetadata(offlineTableName, "myTable_3")).thenReturn( - new SegmentZKMetadata(znrecord)); - } - { - _config = mock(ControllerConf.class); - when(_config.getStatusCheckerFrequencyInSeconds()).thenReturn(300); - when(_config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(0); - } - { - _leadControllerManager = mock(LeadControllerManager.class); - when(_leadControllerManager.isLeaderForTable(anyString())).thenReturn(true); - } - { - _tableSizeReader = mock(TableSizeReader.class); - when(_tableSizeReader.getTableSizeDetails(anyString(), anyInt())).thenReturn(null); - } - PinotMetricUtils.cleanUp(); - _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry(); - _controllerMetrics = new ControllerMetrics(_metricsRegistry); - _segmentStatusChecker = - new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics, - _tableSizeReader); - _segmentStatusChecker.setTableSizeReader(_tableSizeReader); - _segmentStatusChecker.start(); - _segmentStatusChecker.run(); - - assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), - ControllerGauge.SEGMENTS_IN_ERROR_STATE), 1); - assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), - ControllerGauge.SEGMENTS_WITH_LESS_REPLICAS), 2); - assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), - ControllerGauge.NUMBER_OF_REPLICAS), 0); - assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), - ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 75); - 
assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), - ControllerGauge.TABLE_COMPRESSED_SIZE), 1111); + ZkHelixPropertyStore propertyStore = mock(ZkHelixPropertyStore.class); + when(resourceManager.getPropertyStore()).thenReturn(propertyStore); + + runSegmentStatusChecker(resourceManager, 0); + verifyControllerMetrics(OFFLINE_TABLE_NAME, 0, 4, 4, 0, 0, 1, 75, 2, 3702); } @Test - public void missingEVTest() - throws Exception { - String realtimeTableName = "myTable_REALTIME"; - - IdealState idealState = new IdealState(realtimeTableName); + public void missingEVTest() { + IdealState idealState = new IdealState(OFFLINE_TABLE_NAME); idealState.setPartitionState("myTable_0", "pinot1", "ONLINE"); idealState.setPartitionState("myTable_0", "pinot2", "ONLINE"); idealState.setPartitionState("myTable_0", "pinot3", "ONLINE"); idealState.setPartitionState("myTable_1", "pinot1", "ONLINE"); idealState.setPartitionState("myTable_1", "pinot2", "ONLINE"); idealState.setPartitionState("myTable_1", "pinot3", "ONLINE"); - idealState.setPartitionState("myTable_2", "pinot3", "OFFLINE"); - idealState.setReplicas("2"); + idealState.setReplicas("3"); idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED); - { - _helixResourceManager = mock(PinotHelixResourceManager.class); - _helixPropertyStore = mock(ZkHelixPropertyStore.class); - when(_helixResourceManager.getPropertyStore()).thenReturn(_helixPropertyStore); - when(_helixResourceManager.getAllTables()).thenReturn(List.of(realtimeTableName)); - when(_helixResourceManager.getTableIdealState(realtimeTableName)).thenReturn(idealState); - when(_helixResourceManager.getTableExternalView(realtimeTableName)).thenReturn(null); - } - { - _config = mock(ControllerConf.class); - when(_config.getStatusCheckerFrequencyInSeconds()).thenReturn(300); - when(_config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300); - } - { - _leadControllerManager = mock(LeadControllerManager.class); - 
when(_leadControllerManager.isLeaderForTable(anyString())).thenReturn(true); - } - { - _tableSizeReader = mock(TableSizeReader.class); - when(_tableSizeReader.getTableSizeDetails(anyString(), anyInt())).thenReturn(null); - } - PinotMetricUtils.cleanUp(); - _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry(); - _controllerMetrics = new ControllerMetrics(_metricsRegistry); - _segmentStatusChecker = - new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics, - _tableSizeReader); - _segmentStatusChecker.setTableSizeReader(_tableSizeReader); - _segmentStatusChecker.start(); - _segmentStatusChecker.run(); - - assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, realtimeTableName, - ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0); - assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, realtimeTableName, - ControllerGauge.SEGMENTS_WITH_LESS_REPLICAS), 0); - assertEquals( - MetricValueUtils.getTableGaugeValue(_controllerMetrics, realtimeTableName, ControllerGauge.NUMBER_OF_REPLICAS), - 0); - assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, realtimeTableName, - ControllerGauge.TABLE_COMPRESSED_SIZE), 0); + PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); + when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME)); + when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState); + SegmentZKMetadata segmentZKMetadata = mockPushedSegmentZKMetadata(1234, 11111L); + when(resourceManager.getSegmentZKMetadata(eq(OFFLINE_TABLE_NAME), anyString())).thenReturn(segmentZKMetadata); + + ZkHelixPropertyStore propertyStore = mock(ZkHelixPropertyStore.class); + when(resourceManager.getPropertyStore()).thenReturn(propertyStore); + + runSegmentStatusChecker(resourceManager, 0); + verifyControllerMetrics(OFFLINE_TABLE_NAME, 0, 2, 2, 0, 0, 0, 0, 0, 2468); } @Test - public void missingIdealTest() - throws Exception { - 
String realtimeTableName = "myTable_REALTIME"; - - { - _helixResourceManager = mock(PinotHelixResourceManager.class); - when(_helixResourceManager.getAllTables()).thenReturn(List.of(realtimeTableName)); - when(_helixResourceManager.getTableIdealState(realtimeTableName)).thenReturn(null); - when(_helixResourceManager.getTableExternalView(realtimeTableName)).thenReturn(null); - } - { - _config = mock(ControllerConf.class); - when(_config.getStatusCheckerFrequencyInSeconds()).thenReturn(300); - when(_config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300); - } - { - _leadControllerManager = mock(LeadControllerManager.class); - when(_leadControllerManager.isLeaderForTable(anyString())).thenReturn(true); - } - { - _tableSizeReader = mock(TableSizeReader.class); - when(_tableSizeReader.getTableSizeDetails(anyString(), anyInt())).thenReturn(null); - } - PinotMetricUtils.cleanUp(); - _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry(); - _controllerMetrics = new ControllerMetrics(_metricsRegistry); - _segmentStatusChecker = - new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics, - _tableSizeReader); - _segmentStatusChecker.setTableSizeReader(_tableSizeReader); - _segmentStatusChecker.start(); - _segmentStatusChecker.run(); - - assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics, realtimeTableName, - ControllerGauge.SEGMENTS_IN_ERROR_STATE)); - assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics, realtimeTableName, - ControllerGauge.SEGMENTS_WITH_LESS_REPLICAS)); + public void missingIdealTest() { + PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); + when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME)); + + runSegmentStatusChecker(resourceManager, 0); + verifyControllerMetricsNotExist(); + } + + private void verifyControllerMetricsNotExist() { + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
OFFLINE_TABLE_NAME, + ControllerGauge.REPLICATION_FROM_CONFIG), 0); + assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics, OFFLINE_TABLE_NAME, + ControllerGauge.SEGMENT_COUNT_INCLUDING_REPLACED)); + assertFalse( + MetricValueUtils.tableGaugeExists(_controllerMetrics, OFFLINE_TABLE_NAME, ControllerGauge.SEGMENT_COUNT)); assertFalse( - MetricValueUtils.tableGaugeExists(_controllerMetrics, realtimeTableName, ControllerGauge.NUMBER_OF_REPLICAS)); + MetricValueUtils.tableGaugeExists(_controllerMetrics, OFFLINE_TABLE_NAME, ControllerGauge.NUMBER_OF_REPLICAS)); assertFalse( - MetricValueUtils.tableGaugeExists(_controllerMetrics, realtimeTableName, ControllerGauge.PERCENT_OF_REPLICAS)); - assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics, realtimeTableName, + MetricValueUtils.tableGaugeExists(_controllerMetrics, OFFLINE_TABLE_NAME, ControllerGauge.PERCENT_OF_REPLICAS)); + assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics, OFFLINE_TABLE_NAME, + ControllerGauge.SEGMENTS_IN_ERROR_STATE)); + assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics, OFFLINE_TABLE_NAME, + ControllerGauge.PERCENT_SEGMENTS_AVAILABLE)); + assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics, OFFLINE_TABLE_NAME, + ControllerGauge.SEGMENTS_WITH_LESS_REPLICAS)); + assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics, OFFLINE_TABLE_NAME, ControllerGauge.TABLE_COMPRESSED_SIZE)); } @Test - public void missingEVPartitionPushTest() - throws Exception { - String offlineTableName = "myTable_OFFLINE"; - - IdealState idealState = new IdealState(offlineTableName); + public void missingEVPartitionPushTest() { + IdealState idealState = new IdealState(OFFLINE_TABLE_NAME); idealState.setPartitionState("myTable_0", "pinot1", "ONLINE"); + idealState.setPartitionState("myTable_0", "pinot2", "ONLINE"); idealState.setPartitionState("myTable_1", "pinot1", "ONLINE"); idealState.setPartitionState("myTable_1", "pinot2", "ONLINE"); 
idealState.setPartitionState("myTable_2", "pinot1", "ONLINE"); @@ -468,246 +354,144 @@ public void missingEVPartitionPushTest() idealState.setReplicas("2"); idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED); - ExternalView externalView = new ExternalView(offlineTableName); + ExternalView externalView = new ExternalView(OFFLINE_TABLE_NAME); + externalView.setState("myTable_0", "pinot1", "ONLINE"); + externalView.setState("myTable_0", "pinot2", "ONLINE"); externalView.setState("myTable_1", "pinot1", "ONLINE"); externalView.setState("myTable_1", "pinot2", "ONLINE"); // myTable_2 is push in-progress and only one replica has been downloaded by servers. It will be skipped for // the segment status check. externalView.setState("myTable_2", "pinot1", "ONLINE"); - ZNRecord znrecord = new ZNRecord("myTable_0"); - znrecord.setSimpleField(CommonConstants.Segment.INDEX_VERSION, "v1"); - znrecord.setLongField(CommonConstants.Segment.START_TIME, 1000); - znrecord.setLongField(CommonConstants.Segment.END_TIME, 2000); - znrecord.setSimpleField(CommonConstants.Segment.TIME_UNIT, TimeUnit.HOURS.toString()); - znrecord.setLongField(CommonConstants.Segment.TOTAL_DOCS, 10000); - znrecord.setLongField(CommonConstants.Segment.CRC, 1234); - znrecord.setLongField(CommonConstants.Segment.CREATION_TIME, 3000); - znrecord.setSimpleField(CommonConstants.Segment.DOWNLOAD_URL, "http://localhost:8000/myTable_0"); - znrecord.setLongField(CommonConstants.Segment.PUSH_TIME, System.currentTimeMillis()); - znrecord.setLongField(CommonConstants.Segment.REFRESH_TIME, System.currentTimeMillis()); - znrecord.setLongField(CommonConstants.Segment.SIZE_IN_BYTES, 1111); - - ZNRecord znrecord2 = new ZNRecord("myTable_2"); - znrecord2.setSimpleField(CommonConstants.Segment.INDEX_VERSION, "v1"); - znrecord2.setLongField(CommonConstants.Segment.START_TIME, 1000); - znrecord2.setLongField(CommonConstants.Segment.END_TIME, 2000); - znrecord2.setSimpleField(CommonConstants.Segment.TIME_UNIT, 
TimeUnit.HOURS.toString()); - znrecord2.setLongField(CommonConstants.Segment.TOTAL_DOCS, 10000); - znrecord2.setLongField(CommonConstants.Segment.CRC, 1235); - znrecord2.setLongField(CommonConstants.Segment.CREATION_TIME, 3000); - znrecord2.setSimpleField(CommonConstants.Segment.DOWNLOAD_URL, "http://localhost:8000/myTable_2"); - znrecord2.setLongField(CommonConstants.Segment.PUSH_TIME, System.currentTimeMillis()); - znrecord2.setLongField(CommonConstants.Segment.REFRESH_TIME, System.currentTimeMillis()); - znrecord.setLongField(CommonConstants.Segment.SIZE_IN_BYTES, 1111); - - { - _helixResourceManager = mock(PinotHelixResourceManager.class); - _helixPropertyStore = mock(ZkHelixPropertyStore.class); - when(_helixResourceManager.getPropertyStore()).thenReturn(_helixPropertyStore); - when(_helixResourceManager.getAllTables()).thenReturn(List.of(offlineTableName)); - when(_helixResourceManager.getTableIdealState(offlineTableName)).thenReturn(idealState); - when(_helixResourceManager.getTableExternalView(offlineTableName)).thenReturn(externalView); - when(_helixResourceManager.getSegmentZKMetadata(offlineTableName, "myTable_0")).thenReturn( - new SegmentZKMetadata(znrecord)); - when(_helixResourceManager.getSegmentZKMetadata(offlineTableName, "myTable_2")).thenReturn( - new SegmentZKMetadata(znrecord2)); - } - { - _config = mock(ControllerConf.class); - when(_config.getStatusCheckerFrequencyInSeconds()).thenReturn(300); - when(_config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300); - } - { - _leadControllerManager = mock(LeadControllerManager.class); - when(_leadControllerManager.isLeaderForTable(anyString())).thenReturn(true); - } - { - _tableSizeReader = mock(TableSizeReader.class); - when(_tableSizeReader.getTableSizeDetails(anyString(), anyInt())).thenReturn(null); - } - PinotMetricUtils.cleanUp(); - _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry(); - _controllerMetrics = new ControllerMetrics(_metricsRegistry); - _segmentStatusChecker = 
- new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics, - _tableSizeReader); - _segmentStatusChecker.setTableSizeReader(_tableSizeReader); - _segmentStatusChecker.start(); - _segmentStatusChecker.run(); - - assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), - ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0); - assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), - ControllerGauge.SEGMENTS_WITH_LESS_REPLICAS), 0); - assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), - ControllerGauge.NUMBER_OF_REPLICAS), 2); - assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), - ControllerGauge.PERCENT_OF_REPLICAS), 100); - assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), - ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100); - assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), - ControllerGauge.TABLE_COMPRESSED_SIZE), 0); + PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); + when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME)); + when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState); + when(resourceManager.getTableExternalView(OFFLINE_TABLE_NAME)).thenReturn(externalView); + SegmentZKMetadata segmentZKMetadata01 = mockPushedSegmentZKMetadata(1234, 11111L); + when(resourceManager.getSegmentZKMetadata(OFFLINE_TABLE_NAME, "myTable_0")).thenReturn(segmentZKMetadata01); + when(resourceManager.getSegmentZKMetadata(OFFLINE_TABLE_NAME, "myTable_1")).thenReturn(segmentZKMetadata01); + SegmentZKMetadata segmentZKMetadata2 = mockPushedSegmentZKMetadata(1234, System.currentTimeMillis()); + when(resourceManager.getSegmentZKMetadata(OFFLINE_TABLE_NAME, "myTable_2")).thenReturn(segmentZKMetadata2); + + ZkHelixPropertyStore 
propertyStore = mock(ZkHelixPropertyStore.class); + when(resourceManager.getPropertyStore()).thenReturn(propertyStore); + + runSegmentStatusChecker(resourceManager, 600); + verifyControllerMetrics(OFFLINE_TABLE_NAME, 0, 3, 3, 2, 100, 0, 100, 0, 3702); } @Test - public void noReplicas() - throws Exception { - String realtimeTableName = "myTable_REALTIME"; + public void missingEVUploadedConsumingTest() { + IdealState idealState = new IdealState(REALTIME_TABLE_NAME); + idealState.setPartitionState("myTable_0", "pinot1", "ONLINE"); + idealState.setPartitionState("myTable_1", "pinot2", "CONSUMING"); + idealState.setReplicas("1"); + idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED); + + PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); + when(resourceManager.getAllTables()).thenReturn(List.of(REALTIME_TABLE_NAME)); + when(resourceManager.getTableIdealState(REALTIME_TABLE_NAME)).thenReturn(idealState); + SegmentZKMetadata updatedSegmentZKMetadata = mockPushedSegmentZKMetadata(1234, System.currentTimeMillis()); + when(resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, "myTable_0")).thenReturn(updatedSegmentZKMetadata); + SegmentZKMetadata consumingSegmentZKMetadata = mockConsumingSegmentZKMetadata(System.currentTimeMillis()); + when(resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, "myTable_1")).thenReturn(consumingSegmentZKMetadata); + + ZkHelixPropertyStore propertyStore = mock(ZkHelixPropertyStore.class); + when(resourceManager.getPropertyStore()).thenReturn(propertyStore); - IdealState idealState = new IdealState(realtimeTableName); + runSegmentStatusChecker(resourceManager, 600); + verifyControllerMetrics(REALTIME_TABLE_NAME, 0, 2, 2, 1, 100, 0, 100, 0, 1234); + } + + @Test + public void noReplicaTest() { + IdealState idealState = new IdealState(REALTIME_TABLE_NAME); idealState.setPartitionState("myTable_0", "pinot1", "OFFLINE"); idealState.setPartitionState("myTable_0", "pinot2", "OFFLINE"); 
idealState.setPartitionState("myTable_0", "pinot3", "OFFLINE"); idealState.setReplicas("0"); idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED); - { - _helixResourceManager = mock(PinotHelixResourceManager.class); - _helixPropertyStore = mock(ZkHelixPropertyStore.class); - when(_helixResourceManager.getPropertyStore()).thenReturn(_helixPropertyStore); - when(_helixResourceManager.getAllTables()).thenReturn(List.of(realtimeTableName)); - when(_helixResourceManager.getTableIdealState(realtimeTableName)).thenReturn(idealState); - when(_helixResourceManager.getTableExternalView(realtimeTableName)).thenReturn(null); - } - { - _config = mock(ControllerConf.class); - when(_config.getStatusCheckerFrequencyInSeconds()).thenReturn(300); - when(_config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300); - } - { - _leadControllerManager = mock(LeadControllerManager.class); - when(_leadControllerManager.isLeaderForTable(anyString())).thenReturn(true); - } - { - _tableSizeReader = mock(TableSizeReader.class); - when(_tableSizeReader.getTableSizeDetails(anyString(), anyInt())).thenReturn(null); - } - PinotMetricUtils.cleanUp(); - _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry(); - _controllerMetrics = new ControllerMetrics(_metricsRegistry); - _segmentStatusChecker = - new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics, - _tableSizeReader); - _segmentStatusChecker.setTableSizeReader(_tableSizeReader); - _segmentStatusChecker.start(); - _segmentStatusChecker.run(); - - assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, realtimeTableName, - ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0); - assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, realtimeTableName, - ControllerGauge.SEGMENTS_WITH_LESS_REPLICAS), 0); - assertEquals( - MetricValueUtils.getTableGaugeValue(_controllerMetrics, realtimeTableName, ControllerGauge.NUMBER_OF_REPLICAS), - 1); - assertEquals( - 
MetricValueUtils.getTableGaugeValue(_controllerMetrics, realtimeTableName, ControllerGauge.PERCENT_OF_REPLICAS), - 100); - assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, realtimeTableName, - ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100); + PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); + when(resourceManager.getAllTables()).thenReturn(List.of(REALTIME_TABLE_NAME)); + when(resourceManager.getTableIdealState(REALTIME_TABLE_NAME)).thenReturn(idealState); + when(resourceManager.getTableExternalView(REALTIME_TABLE_NAME)).thenReturn(null); + SegmentZKMetadata segmentZKMetadata = mockConsumingSegmentZKMetadata(11111L); + when(resourceManager.getSegmentZKMetadata(eq(REALTIME_TABLE_NAME), anyString())).thenReturn(segmentZKMetadata); + + ZkHelixPropertyStore propertyStore = mock(ZkHelixPropertyStore.class); + when(resourceManager.getPropertyStore()).thenReturn(propertyStore); + + runSegmentStatusChecker(resourceManager, 0); + verifyControllerMetrics(REALTIME_TABLE_NAME, 0, 1, 1, 1, 100, 0, 100, 0, 0); } @Test - public void disabledTableTest() { - String offlineTableName = "myTable_OFFLINE"; - - IdealState idealState = new IdealState(offlineTableName); - // disable table in idealstate - idealState.enable(false); - idealState.setPartitionState("myTable_OFFLINE", "pinot1", "OFFLINE"); - idealState.setPartitionState("myTable_OFFLINE", "pinot2", "OFFLINE"); - idealState.setPartitionState("myTable_OFFLINE", "pinot3", "OFFLINE"); + public void noSegmentZKMetadataTest() { + IdealState idealState = new IdealState(OFFLINE_TABLE_NAME); + idealState.setPartitionState("myTable_0", "pinot1", "ONLINE"); idealState.setReplicas("1"); idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED); - { - _helixResourceManager = mock(PinotHelixResourceManager.class); - when(_helixResourceManager.getAllTables()).thenReturn(List.of(offlineTableName)); - 
when(_helixResourceManager.getTableIdealState(offlineTableName)).thenReturn(idealState); - when(_helixResourceManager.getTableExternalView(offlineTableName)).thenReturn(null); - } - { - _config = mock(ControllerConf.class); - when(_config.getStatusCheckerFrequencyInSeconds()).thenReturn(300); - when(_config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300); - } - { - _leadControllerManager = mock(LeadControllerManager.class); - when(_leadControllerManager.isLeaderForTable(anyString())).thenReturn(true); - } - PinotMetricUtils.cleanUp(); - _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry(); - _controllerMetrics = new ControllerMetrics(_metricsRegistry); - _segmentStatusChecker = - new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics, - null); - - // verify state before test - assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerMetrics, ControllerGauge.DISABLED_TABLE_COUNT), 0); - - // update metrics - _segmentStatusChecker.start(); - _segmentStatusChecker.run(); - assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerMetrics, ControllerGauge.DISABLED_TABLE_COUNT), 1); + PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); + when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME)); + when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState); + + ZkHelixPropertyStore propertyStore = mock(ZkHelixPropertyStore.class); + when(resourceManager.getPropertyStore()).thenReturn(propertyStore); + + runSegmentStatusChecker(resourceManager, 0); + verifyControllerMetrics(OFFLINE_TABLE_NAME, 0, 1, 1, 1, 100, 0, 100, 0, 0); } @Test - public void disabledEmptyTableTest() { - String offlineTableName = "myTable_OFFLINE"; - - IdealState idealState = new IdealState(offlineTableName); + public void disabledTableTest() { + IdealState idealState = new IdealState(OFFLINE_TABLE_NAME); // disable table in idealstate 
idealState.enable(false); - idealState.setReplicas("1"); + idealState.setPartitionState("myTable_OFFLINE", "pinot1", "ONLINE"); + idealState.setPartitionState("myTable_OFFLINE", "pinot2", "ONLINE"); + idealState.setPartitionState("myTable_OFFLINE", "pinot3", "ONLINE"); + idealState.setReplicas("3"); idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED); - { - _helixResourceManager = mock(PinotHelixResourceManager.class); - when(_helixResourceManager.getAllTables()).thenReturn(List.of(offlineTableName)); - when(_helixResourceManager.getTableIdealState(offlineTableName)).thenReturn(idealState); - when(_helixResourceManager.getTableExternalView(offlineTableName)).thenReturn(null); - } - { - _config = mock(ControllerConf.class); - when(_config.getStatusCheckerFrequencyInSeconds()).thenReturn(300); - when(_config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300); - } - { - _leadControllerManager = mock(LeadControllerManager.class); - when(_leadControllerManager.isLeaderForTable(anyString())).thenReturn(true); - } - PinotMetricUtils.cleanUp(); - _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry(); - _controllerMetrics = new ControllerMetrics(_metricsRegistry); - _segmentStatusChecker = - new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics, - null); - - // verify state before test - assertFalse(MetricValueUtils.globalGaugeExists(_controllerMetrics, ControllerGauge.DISABLED_TABLE_COUNT)); - - // update metrics - _segmentStatusChecker.start(); - _segmentStatusChecker.run(); + PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); + when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME)); + when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState); + + runSegmentStatusChecker(resourceManager, 0); assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerMetrics, ControllerGauge.DISABLED_TABLE_COUNT), 1); + 
verifyControllerMetricsNotExist(); } @Test - public void noSegments() - throws Exception { - noSegmentsInternal(0); - noSegmentsInternal(5); - noSegmentsInternal(-1); + public void noSegmentTest() { + noSegmentTest(0); + noSegmentTest(5); + noSegmentTest(-1); + } + + public void noSegmentTest(int numReplicas) { + String numReplicasStr = numReplicas >= 0 ? Integer.toString(numReplicas) : "abc"; + IdealState idealState = new IdealState(OFFLINE_TABLE_NAME); + idealState.setReplicas(numReplicasStr); + idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED); + + PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); + when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME)); + when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState); + + runSegmentStatusChecker(resourceManager, 0); + int expectedNumReplicas = Math.max(numReplicas, 1); + verifyControllerMetrics(OFFLINE_TABLE_NAME, 0, 0, 0, expectedNumReplicas, 100, 0, 100, 0, 0); } @Test - public void lessThanOnePercentSegmentsUnavailableTest() - throws Exception { - String offlineTableName = "myTable_OFFLINE"; + public void lessThanOnePercentSegmentsUnavailableTest() { TableConfig tableConfig = - new TableConfigBuilder(TableType.OFFLINE).setTableName(offlineTableName).setNumReplicas(1).build(); + new TableConfigBuilder(TableType.OFFLINE).setTableName(OFFLINE_TABLE_NAME).setNumReplicas(1).build(); - IdealState idealState = new IdealState(offlineTableName); + IdealState idealState = new IdealState(OFFLINE_TABLE_NAME); int numSegments = 200; for (int i = 0; i < numSegments; i++) { idealState.setPartitionState("myTable_" + i, "pinot1", "ONLINE"); @@ -715,107 +499,24 @@ public void lessThanOnePercentSegmentsUnavailableTest() idealState.setReplicas("1"); idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED); - ExternalView externalView = new ExternalView(offlineTableName); + ExternalView externalView = new 
ExternalView(OFFLINE_TABLE_NAME); externalView.setState("myTable_0", "pinot1", "OFFLINE"); for (int i = 1; i < numSegments; i++) { externalView.setState("myTable_" + i, "pinot1", "ONLINE"); } - { - _helixResourceManager = mock(PinotHelixResourceManager.class); - when(_helixResourceManager.getAllTables()).thenReturn(List.of(offlineTableName)); - when(_helixResourceManager.getTableConfig(offlineTableName)).thenReturn(tableConfig); - when(_helixResourceManager.getTableIdealState(offlineTableName)).thenReturn(idealState); - when(_helixResourceManager.getTableExternalView(offlineTableName)).thenReturn(externalView); - } - { - _helixPropertyStore = mock(ZkHelixPropertyStore.class); - when(_helixResourceManager.getPropertyStore()).thenReturn(_helixPropertyStore); - SegmentLineage segmentLineage = new SegmentLineage(offlineTableName); - when(_helixPropertyStore.get(eq("/SEGMENT_LINEAGE/" + offlineTableName), any(), - eq(AccessOption.PERSISTENT))).thenReturn(segmentLineage.toZNRecord()); - } - { - _config = mock(ControllerConf.class); - when(_config.getStatusCheckerFrequencyInSeconds()).thenReturn(300); - when(_config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300); - } - { - _leadControllerManager = mock(LeadControllerManager.class); - when(_leadControllerManager.isLeaderForTable(anyString())).thenReturn(true); - } - { - _tableSizeReader = mock(TableSizeReader.class); - when(_tableSizeReader.getTableSizeDetails(anyString(), anyInt())).thenReturn(null); - } - PinotMetricUtils.cleanUp(); - _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry(); - _controllerMetrics = new ControllerMetrics(_metricsRegistry); - _segmentStatusChecker = - new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics, - _tableSizeReader); - _segmentStatusChecker.setTableSizeReader(_tableSizeReader); - _segmentStatusChecker.start(); - _segmentStatusChecker.run(); - - assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, 
externalView.getId(), - ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 99); - } - - public void noSegmentsInternal(final int nReplicas) - throws Exception { - String realtimeTableName = "myTable_REALTIME"; + PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); + when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME)); + when(resourceManager.getTableConfig(OFFLINE_TABLE_NAME)).thenReturn(tableConfig); + when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState); + when(resourceManager.getTableExternalView(OFFLINE_TABLE_NAME)).thenReturn(externalView); + SegmentZKMetadata segmentZKMetadata = mockPushedSegmentZKMetadata(1234, 11111L); + when(resourceManager.getSegmentZKMetadata(eq(OFFLINE_TABLE_NAME), anyString())).thenReturn(segmentZKMetadata); - String nReplicasStr = Integer.toString(nReplicas); - int nReplicasExpectedValue = nReplicas; - if (nReplicas < 0) { - nReplicasStr = "abc"; - nReplicasExpectedValue = 1; - } - IdealState idealState = new IdealState(realtimeTableName); - idealState.setReplicas(nReplicasStr); - idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED); + ZkHelixPropertyStore propertyStore = mock(ZkHelixPropertyStore.class); + when(resourceManager.getPropertyStore()).thenReturn(propertyStore); - { - _helixResourceManager = mock(PinotHelixResourceManager.class); - when(_helixResourceManager.getAllTables()).thenReturn(List.of(realtimeTableName)); - when(_helixResourceManager.getTableIdealState(realtimeTableName)).thenReturn(idealState); - when(_helixResourceManager.getTableExternalView(realtimeTableName)).thenReturn(null); - } - { - _config = mock(ControllerConf.class); - when(_config.getStatusCheckerFrequencyInSeconds()).thenReturn(300); - when(_config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300); - } - { - _leadControllerManager = mock(LeadControllerManager.class); - when(_leadControllerManager.isLeaderForTable(anyString())).thenReturn(true); - } - { 
- _tableSizeReader = mock(TableSizeReader.class); - when(_tableSizeReader.getTableSizeDetails(anyString(), anyInt())).thenReturn(null); - } - PinotMetricUtils.cleanUp(); - _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry(); - _controllerMetrics = new ControllerMetrics(_metricsRegistry); - _segmentStatusChecker = - new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics, - _tableSizeReader); - _segmentStatusChecker.setTableSizeReader(_tableSizeReader); - _segmentStatusChecker.start(); - _segmentStatusChecker.run(); - - assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics, realtimeTableName, - ControllerGauge.SEGMENTS_IN_ERROR_STATE)); - assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics, realtimeTableName, - ControllerGauge.SEGMENTS_IN_ERROR_STATE)); - assertEquals( - MetricValueUtils.getTableGaugeValue(_controllerMetrics, realtimeTableName, ControllerGauge.NUMBER_OF_REPLICAS), - nReplicasExpectedValue); - assertEquals( - MetricValueUtils.getTableGaugeValue(_controllerMetrics, realtimeTableName, ControllerGauge.PERCENT_OF_REPLICAS), - 100); - assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, realtimeTableName, - ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100); + runSegmentStatusChecker(resourceManager, 0); + verifyControllerMetrics(OFFLINE_TABLE_NAME, 1, numSegments, numSegments, 0, 0, 0, 99, 0, 246800); } }