From edb13011230228393350cddca54e4ac4c4b7e08b Mon Sep 17 00:00:00 2001 From: Johan Adami <4760722+jadami10@users.noreply.github.com> Date: Fri, 6 Sep 2024 18:43:50 -0400 Subject: [PATCH] add metric for time retention failing due to end time (#13879) --- .../pinot/common/metrics/ControllerGauge.java | 5 +++ .../helix/SegmentStatusChecker.java | 26 +++++++++++++ .../helix/SegmentStatusCheckerTest.java | 37 +++++++++++++++++++ 3 files changed, 68 insertions(+) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java index 82c4e666e16a..d052a75485aa 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java @@ -35,6 +35,11 @@ public enum ControllerGauge implements AbstractMetrics.Gauge { PERCENT_OF_REPLICAS("percent", false), SEGMENTS_IN_ERROR_STATE("segments", false), + // Segment start and end time is stored in milliseconds. + // Invalid start/end time means the broker time pruner will not work correctly. + // Invalid end times means time retention will not happen for that segment. + SEGMENTS_WITH_INVALID_START_TIME("segments", false), + SEGMENTS_WITH_INVALID_END_TIME("segments", false), // Percentage of segments with at least one online replica in external view as compared to total number of segments in // ideal state diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java index a9a248475246..c9a48022c0be 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java @@ -55,6 +55,7 @@ import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel; import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status; import org.apache.pinot.spi.utils.IngestionConfigUtils; +import org.apache.pinot.spi.utils.TimeUtils; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -282,6 +283,8 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon List offlineSegments = new ArrayList<>(); // Segments with fewer replicas online (ONLINE/CONSUMING) in external view than in ideal state List partialOnlineSegments = new ArrayList<>(); + List segmentsInvalidStartTime = new ArrayList<>(); + List segmentsInvalidEndTime = new ArrayList<>(); for (String segment : segments) { int numISReplicas = 0; for (Map.Entry entry : idealState.getInstanceStateMap(segment).entrySet()) { @@ -318,6 +321,15 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon continue; } + if (segmentZKMetadata.getStatus() != Status.IN_PROGRESS) { + if (!TimeUtils.timeValueInValidRange(segmentZKMetadata.getStartTimeMs())) { + segmentsInvalidStartTime.add(segment); + } + if (!TimeUtils.timeValueInValidRange(segmentZKMetadata.getEndTimeMs())) { + segmentsInvalidEndTime.add(segment); + } + } + int numEVReplicas = 0; if (externalView != null) { Map stateMap = externalView.getStateMap(segment); @@ -378,6 +390,16 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon LOGGER.warn("Table {} has {} segments with fewer replicas than the replication factor: {}", tableNameWithType, numPartialOnlineSegments, logSegments(partialOnlineSegments)); } + int numInvalidStartTime = segmentsInvalidStartTime.size(); + if (numInvalidStartTime > 0) { + LOGGER.warn("Table {} has {} segments with invalid start time: {}", tableNameWithType, numInvalidStartTime, + logSegments(segmentsInvalidStartTime)); + } + int numInvalidEndTime = segmentsInvalidEndTime.size(); + if (numInvalidEndTime > 0) { + LOGGER.warn("Table {} has {} segments with invalid end time: {}", tableNameWithType, numInvalidEndTime, + logSegments(segmentsInvalidEndTime)); + } // Synchronization provided by Controller Gauge to make sure that only one thread updates the gauge _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.NUMBER_OF_REPLICAS, minEVReplicas); @@ -391,6 +413,10 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon numPartialOnlineSegments); _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.TABLE_COMPRESSED_SIZE, tableCompressedSize); + _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENTS_WITH_INVALID_START_TIME, + numInvalidStartTime); + _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENTS_WITH_INVALID_END_TIME, + numInvalidEndTime); if (tableType == TableType.REALTIME && tableConfig != null) { StreamConfig streamConfig = diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java index 1edd2176e624..5f2ae7ea32f4 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java @@ -51,6 +51,7 @@ import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status; import org.apache.pinot.spi.utils.JsonUtils; +import org.apache.pinot.spi.utils.TimeUtils; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.testng.annotations.Test; @@ -700,4 +701,40 @@ public void testJsonSerializationSegmentStatusInfo() String jsonString = JsonUtils.objectToPrettyString(segmentStatusInfoList); assertEquals(jsonString, json); } + + @Test + public void testInvalidSegmentStartEndTime() { + IdealState idealState = new IdealState(OFFLINE_TABLE_NAME); + idealState.setPartitionState("myTable_0", "pinot1", "ONLINE"); + idealState.setPartitionState("myTable_0", "pinot2", "ONLINE"); + idealState.setPartitionState("myTable_0", "pinot3", "ONLINE"); + idealState.setReplicas("3"); + idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED); + + ExternalView externalView = new ExternalView(OFFLINE_TABLE_NAME); + externalView.setState("myTable_0", "pinot1", "ONLINE"); + externalView.setState("myTable_0", "pinot2", "ONLINE"); + externalView.setState("myTable_0", "pinot3", "ONLINE"); + + ZNRecord znRecord = new ZNRecord("myTable_0"); + znRecord.setLongField(CommonConstants.Segment.START_TIME, TimeUtils.VALID_MIN_TIME_MILLIS - 1); + znRecord.setLongField(CommonConstants.Segment.END_TIME, TimeUtils.VALID_MAX_TIME_MILLIS + 1); + SegmentZKMetadata segmentZKMetadata = mockPushedSegmentZKMetadata(1234, 11111L); + when(segmentZKMetadata.getStartTimeMs()).thenReturn(TimeUtils.VALID_MIN_TIME_MILLIS - 1); + when(segmentZKMetadata.getEndTimeMs()).thenReturn(TimeUtils.VALID_MAX_TIME_MILLIS + 1); + + PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class); + when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME)); + when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState); + when(resourceManager.getSegmentZKMetadata(eq(OFFLINE_TABLE_NAME), anyString())).thenReturn(segmentZKMetadata); + + ZkHelixPropertyStore propertyStore = mock(ZkHelixPropertyStore.class); + when(resourceManager.getPropertyStore()).thenReturn(propertyStore); + + runSegmentStatusChecker(resourceManager, 0); + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, OFFLINE_TABLE_NAME, + ControllerGauge.SEGMENTS_WITH_INVALID_START_TIME), 1); + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, OFFLINE_TABLE_NAME, + ControllerGauge.SEGMENTS_WITH_INVALID_END_TIME), 1); + } }