Skip to content

Commit

Permalink
add metric for time retention failing due to end time (apache#13879)
Browse files Browse the repository at this point in the history
  • Loading branch information
jadami10 authored Sep 6, 2024
1 parent fc6e690 commit edb1301
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ public enum ControllerGauge implements AbstractMetrics.Gauge {
PERCENT_OF_REPLICAS("percent", false),

SEGMENTS_IN_ERROR_STATE("segments", false),
// Segment start and end time is stored in milliseconds.
// Invalid start/end time means the broker time pruner will not work correctly.
// Invalid end times means time retention will not happen for that segment.
SEGMENTS_WITH_INVALID_START_TIME("segments", false),
SEGMENTS_WITH_INVALID_END_TIME("segments", false),

// Percentage of segments with at least one online replica in external view as compared to total number of segments in
// ideal state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.apache.pinot.spi.utils.TimeUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -282,6 +283,8 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon
List<String> offlineSegments = new ArrayList<>();
// Segments with fewer replicas online (ONLINE/CONSUMING) in external view than in ideal state
List<String> partialOnlineSegments = new ArrayList<>();
List<String> segmentsInvalidStartTime = new ArrayList<>();
List<String> segmentsInvalidEndTime = new ArrayList<>();
for (String segment : segments) {
int numISReplicas = 0;
for (Map.Entry<String, String> entry : idealState.getInstanceStateMap(segment).entrySet()) {
Expand Down Expand Up @@ -318,6 +321,15 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon
continue;
}

if (segmentZKMetadata.getStatus() != Status.IN_PROGRESS) {
if (!TimeUtils.timeValueInValidRange(segmentZKMetadata.getStartTimeMs())) {
segmentsInvalidStartTime.add(segment);
}
if (!TimeUtils.timeValueInValidRange(segmentZKMetadata.getEndTimeMs())) {
segmentsInvalidEndTime.add(segment);
}
}

int numEVReplicas = 0;
if (externalView != null) {
Map<String, String> stateMap = externalView.getStateMap(segment);
Expand Down Expand Up @@ -378,6 +390,16 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon
LOGGER.warn("Table {} has {} segments with fewer replicas than the replication factor: {}", tableNameWithType,
numPartialOnlineSegments, logSegments(partialOnlineSegments));
}
int numInvalidStartTime = segmentsInvalidStartTime.size();
if (numInvalidStartTime > 0) {
LOGGER.warn("Table {} has {} segments with invalid start time: {}", tableNameWithType, numInvalidStartTime,
logSegments(segmentsInvalidStartTime));
}
int numInvalidEndTime = segmentsInvalidEndTime.size();
if (numInvalidEndTime > 0) {
LOGGER.warn("Table {} has {} segments with invalid end time: {}", tableNameWithType, numInvalidEndTime,
logSegments(segmentsInvalidEndTime));
}

// Synchronization provided by Controller Gauge to make sure that only one thread updates the gauge
_controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.NUMBER_OF_REPLICAS, minEVReplicas);
Expand All @@ -391,6 +413,10 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon
numPartialOnlineSegments);
_controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.TABLE_COMPRESSED_SIZE,
tableCompressedSize);
_controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENTS_WITH_INVALID_START_TIME,
numInvalidStartTime);
_controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENTS_WITH_INVALID_END_TIME,
numInvalidEndTime);

if (tableType == TableType.REALTIME && tableConfig != null) {
StreamConfig streamConfig =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.TimeUtils;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -700,4 +701,40 @@ public void testJsonSerializationSegmentStatusInfo()
String jsonString = JsonUtils.objectToPrettyString(segmentStatusInfoList);
assertEquals(jsonString, json);
}

@Test
public void testInvalidSegmentStartEndTime() {
IdealState idealState = new IdealState(OFFLINE_TABLE_NAME);
idealState.setPartitionState("myTable_0", "pinot1", "ONLINE");
idealState.setPartitionState("myTable_0", "pinot2", "ONLINE");
idealState.setPartitionState("myTable_0", "pinot3", "ONLINE");
idealState.setReplicas("3");
idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);

ExternalView externalView = new ExternalView(OFFLINE_TABLE_NAME);
externalView.setState("myTable_0", "pinot1", "ONLINE");
externalView.setState("myTable_0", "pinot2", "ONLINE");
externalView.setState("myTable_0", "pinot3", "ONLINE");

ZNRecord znRecord = new ZNRecord("myTable_0");
znRecord.setLongField(CommonConstants.Segment.START_TIME, TimeUtils.VALID_MIN_TIME_MILLIS - 1);
znRecord.setLongField(CommonConstants.Segment.END_TIME, TimeUtils.VALID_MAX_TIME_MILLIS + 1);
SegmentZKMetadata segmentZKMetadata = mockPushedSegmentZKMetadata(1234, 11111L);
when(segmentZKMetadata.getStartTimeMs()).thenReturn(TimeUtils.VALID_MIN_TIME_MILLIS - 1);
when(segmentZKMetadata.getEndTimeMs()).thenReturn(TimeUtils.VALID_MAX_TIME_MILLIS + 1);

PinotHelixResourceManager resourceManager = mock(PinotHelixResourceManager.class);
when(resourceManager.getAllTables()).thenReturn(List.of(OFFLINE_TABLE_NAME));
when(resourceManager.getTableIdealState(OFFLINE_TABLE_NAME)).thenReturn(idealState);
when(resourceManager.getSegmentZKMetadata(eq(OFFLINE_TABLE_NAME), anyString())).thenReturn(segmentZKMetadata);

ZkHelixPropertyStore<ZNRecord> propertyStore = mock(ZkHelixPropertyStore.class);
when(resourceManager.getPropertyStore()).thenReturn(propertyStore);

runSegmentStatusChecker(resourceManager, 0);
assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, OFFLINE_TABLE_NAME,
ControllerGauge.SEGMENTS_WITH_INVALID_START_TIME), 1);
assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, OFFLINE_TABLE_NAME,
ControllerGauge.SEGMENTS_WITH_INVALID_END_TIME), 1);
}
}

0 comments on commit edb1301

Please sign in to comment.