Skip to content

Commit

Permalink
Enhance SegmentStatusChecker to honor CONSUMING segment (apache#13562)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jackie-Jiang authored Jul 10, 2024
1 parent e80d95f commit f9074f9
Show file tree
Hide file tree
Showing 2 changed files with 472 additions and 742 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@
*/
package org.apache.pinot.controller.helix;

import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
Expand All @@ -51,6 +52,8 @@
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.TierConfig;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
Expand All @@ -63,22 +66,18 @@
*/
public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusChecker.Context> {
private static final Logger LOGGER = LoggerFactory.getLogger(SegmentStatusChecker.class);
private static final int MAX_OFFLINE_SEGMENTS_TO_LOG = 5;
public static final String ONLINE = "ONLINE";
public static final String ERROR = "ERROR";
public static final String CONSUMING = "CONSUMING";

// log messages about disabled tables atmost once a day
private static final long DISABLED_TABLE_LOG_INTERVAL_MS = TimeUnit.DAYS.toMillis(1);
private static final ZNRecordSerializer RECORD_SERIALIZER = new ZNRecordSerializer();

private static final int TABLE_CHECKER_TIMEOUT_MS = 30_000;

// log messages about disabled tables at most once a day
private static final long DISABLED_TABLE_LOG_INTERVAL_MS = TimeUnit.DAYS.toMillis(1);
private static final int MAX_SEGMENTS_TO_LOG = 10;

private final int _waitForPushTimeSeconds;
private long _lastDisabledTableLogTimestamp = 0;
private final TableSizeReader _tableSizeReader;
private final Set<String> _tierBackendGauges = new HashSet<>();

private TableSizeReader _tableSizeReader;
private Set<String> _tierBackendGauges = new HashSet<>();
private long _lastDisabledTableLogTimestamp = 0;

/**
* Constructs the segment status checker.
Expand Down Expand Up @@ -190,7 +189,7 @@ private void updateTableConfigMetrics(String tableNameWithType, TableConfig tabl
}
}
tierBackendSet.forEach(tierBackend -> context._tierBackendTableCountMap.put(tierBackend,
context._tierBackendTableCountMap.getOrDefault(tierBackend, 0) + 1));
context._tierBackendTableCountMap.getOrDefault(tierBackend, 0) + 1));
context._tierBackendConfiguredTableCount += tierBackendSet.isEmpty() ? 0 : 1;
}
int replication = tableConfig.getReplication();
Expand Down Expand Up @@ -226,145 +225,173 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon
return;
}

//check if table consumption is paused
boolean isTablePaused =
Boolean.parseBoolean(idealState.getRecord().getSimpleField(PinotLLCRealtimeSegmentManager.IS_TABLE_PAUSED));

if (isTablePaused) {
if (Boolean.parseBoolean(idealState.getRecord().getSimpleField(PinotLLCRealtimeSegmentManager.IS_TABLE_PAUSED))) {
context._pausedTables.add(tableNameWithType);
}

if (idealState.getPartitionSet().isEmpty()) {
int nReplicasFromIdealState = 1;
_controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.IDEALSTATE_ZNODE_SIZE,
idealState.toString().length());
_controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.IDEALSTATE_ZNODE_BYTE_SIZE,
idealState.serialize(RECORD_SERIALIZER).length);

Set<String> segmentsIncludingReplaced = idealState.getPartitionSet();
_controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENT_COUNT_INCLUDING_REPLACED,
segmentsIncludingReplaced.size());
// Get the segments excluding the replaced segments which are specified in the segment lineage entries and cannot
// be queried from the table.
ZkHelixPropertyStore<ZNRecord> propertyStore = _pinotHelixResourceManager.getPropertyStore();
Set<String> segments;
if (segmentsIncludingReplaced.isEmpty()) {
segments = Set.of();
} else {
segments = new HashSet<>(segmentsIncludingReplaced);
SegmentLineage segmentLineage = SegmentLineageAccessHelper.getSegmentLineage(propertyStore, tableNameWithType);
SegmentLineageUtils.filterSegmentsBasedOnLineageInPlace(segments, segmentLineage);
}
int numSegments = segments.size();
_controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENT_COUNT, numSegments);
if (numSegments == 0) {
int numReplicasFromIS;
try {
nReplicasFromIdealState = Integer.valueOf(idealState.getReplicas());
numReplicasFromIS = Math.max(Integer.parseInt(idealState.getReplicas()), 1);
} catch (NumberFormatException e) {
// Ignore
numReplicasFromIS = 1;
}
_controllerMetrics
.setValueOfTableGauge(tableNameWithType, ControllerGauge.NUMBER_OF_REPLICAS, nReplicasFromIdealState);
_controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.NUMBER_OF_REPLICAS, numReplicasFromIS);
_controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_OF_REPLICAS, 100);
_controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENTS_IN_ERROR_STATE, 0);
_controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE, 100);
_controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENTS_WITH_LESS_REPLICAS, 0);
_controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.TABLE_COMPRESSED_SIZE, 0);
return;
}

// Get the segments excluding the replaced segments which are specified in the segment lineage entries and cannot
// be queried from the table.
Set<String> segmentsExcludeReplaced = new HashSet<>(idealState.getPartitionSet());
ZkHelixPropertyStore<ZNRecord> propertyStore = _pinotHelixResourceManager.getPropertyStore();
SegmentLineage segmentLineage = SegmentLineageAccessHelper.getSegmentLineage(propertyStore, tableNameWithType);
SegmentLineageUtils.filterSegmentsBasedOnLineageInPlace(segmentsExcludeReplaced, segmentLineage);
_controllerMetrics
.setValueOfTableGauge(tableNameWithType, ControllerGauge.IDEALSTATE_ZNODE_SIZE, idealState.toString().length());
_controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.IDEALSTATE_ZNODE_BYTE_SIZE,
idealState.serialize(RECORD_SERIALIZER).length);
_controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENT_COUNT,
(long) segmentsExcludeReplaced.size());
_controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENT_COUNT_INCLUDING_REPLACED,
(long) (idealState.getPartitionSet().size()));
ExternalView externalView = _pinotHelixResourceManager.getTableExternalView(tableNameWithType);

int nReplicasIdealMax = 0; // Keeps track of maximum number of replicas in ideal state
int nReplicasExternal = -1; // Keeps track of minimum number of replicas in external view
int nErrors = 0; // Keeps track of number of segments in error state
int nOffline = 0; // Keeps track of number segments with no online replicas
int nNumOfReplicasLessThanIdeal = 0; // Keeps track of number of segments running with less than expected replicas
int nSegments = 0; // Counts number of segments
long tableCompressedSize = 0; // Tracks the total compressed segment size in deep store per table
for (String partitionName : segmentsExcludeReplaced) {
int nReplicas = 0;
int nIdeal = 0;
nSegments++;
// Skip segments not online in ideal state
for (Map.Entry<String, String> serverAndState : idealState.getInstanceStateMap(partitionName).entrySet()) {
if (serverAndState == null) {
break;
}
if (serverAndState.getValue().equals(ONLINE)) {
nIdeal++;
break;
// Maximum number of replicas in ideal state
int maxISReplicas = Integer.MIN_VALUE;
// Minimum number of replicas in external view
int minEVReplicas = Integer.MAX_VALUE;
// Total compressed segment size in deep store
long tableCompressedSize = 0;
// Segments without ZK metadata
List<String> segmentsWithoutZKMetadata = new ArrayList<>();
// Pairs of segment-instance in ERROR state
List<Pair<String, String>> errorSegments = new ArrayList<>();
// Offline segments
List<String> offlineSegments = new ArrayList<>();
// Segments with fewer replicas online (ONLINE/CONSUMING) in external view than in ideal state
List<String> partialOnlineSegments = new ArrayList<>();
for (String segment : segments) {
int numISReplicas = 0;
for (Map.Entry<String, String> entry : idealState.getInstanceStateMap(segment).entrySet()) {
String state = entry.getValue();
if (state.equals(SegmentStateModel.ONLINE) || state.equals(SegmentStateModel.CONSUMING)) {
numISReplicas++;
}
}
if (nIdeal == 0) {
// No online segments in ideal state
// Skip segments not ONLINE/CONSUMING in ideal state
if (numISReplicas == 0) {
continue;
}
SegmentZKMetadata segmentZKMetadata =
_pinotHelixResourceManager.getSegmentZKMetadata(tableNameWithType, partitionName);
if (segmentZKMetadata != null
&& segmentZKMetadata.getPushTime() > System.currentTimeMillis() - _waitForPushTimeSeconds * 1000) {
// Push is not finished yet, skip the segment
maxISReplicas = Math.max(maxISReplicas, numISReplicas);

SegmentZKMetadata segmentZKMetadata = _pinotHelixResourceManager.getSegmentZKMetadata(tableNameWithType, segment);
// Skip the segment when it doesn't have ZK metadata. Most likely the segment is just deleted.
if (segmentZKMetadata == null) {
segmentsWithoutZKMetadata.add(segment);
continue;
}
if (segmentZKMetadata != null) {
long sizeInBytes = segmentZKMetadata.getSizeInBytes();
if (sizeInBytes > 0) {
tableCompressedSize += sizeInBytes;
}
long sizeInBytes = segmentZKMetadata.getSizeInBytes();
if (sizeInBytes > 0) {
tableCompressedSize += sizeInBytes;
}
nReplicasIdealMax = (idealState.getInstanceStateMap(partitionName).size() > nReplicasIdealMax) ? idealState
.getInstanceStateMap(partitionName).size() : nReplicasIdealMax;
if ((externalView == null) || (externalView.getStateMap(partitionName) == null)) {
// No replicas for this segment
nOffline++;
if (nOffline < MAX_OFFLINE_SEGMENTS_TO_LOG) {
LOGGER.warn("Segment {} of table {} has no replicas", partitionName, tableNameWithType);
}
nReplicasExternal = 0;

// NOTE: We want to skip segments that are just created/pushed to avoid false alerts because it is expected for
// servers to take some time to load them. For consuming (IN_PROGRESS) segments, we use creation time from
// the ZK metadata; for pushed segments, we use push time from the ZK metadata. Both of them are the time
// when segment is newly created. For committed segments from real-time table, push time doesn't exist, and
// creationTimeMs will be Long.MIN_VALUE, which is fine because we want to include them in the check.
long creationTimeMs = segmentZKMetadata.getStatus() == Status.IN_PROGRESS ? segmentZKMetadata.getCreationTime()
: segmentZKMetadata.getPushTime();
if (creationTimeMs > System.currentTimeMillis() - _waitForPushTimeSeconds * 1000L) {
continue;
}
for (Map.Entry<String, String> serverAndState : externalView.getStateMap(partitionName).entrySet()) {
// Count number of online replicas. Ignore if state is CONSUMING.
// It is possible for a segment to be ONLINE in idealstate, and CONSUMING in EV for a short period of time.
// So, ignore this combination. If a segment exists in this combination for a long time, we will get
// low level-partition-not-consuming alert anyway.
if (serverAndState.getValue().equals(ONLINE) || serverAndState.getValue().equals(CONSUMING)) {
nReplicas++;
}
if (serverAndState.getValue().equals(ERROR)) {
nErrors++;

int numEVReplicas = 0;
if (externalView != null) {
Map<String, String> stateMap = externalView.getStateMap(segment);
if (stateMap != null) {
for (Map.Entry<String, String> entry : stateMap.entrySet()) {
String state = entry.getValue();
if (state.equals(SegmentStateModel.ONLINE) || state.equals(SegmentStateModel.CONSUMING)) {
numEVReplicas++;
}
if (state.equals(SegmentStateModel.ERROR)) {
errorSegments.add(Pair.of(segment, entry.getKey()));
}
}
}
}
if (nReplicas == 0) {
if (nOffline < MAX_OFFLINE_SEGMENTS_TO_LOG) {
LOGGER.warn("Segment {} of table {} has no online replicas", partitionName, tableNameWithType);
}
nOffline++;
} else if (nReplicas < nReplicasIdealMax) {
LOGGER.debug("Segment {} of table {} is running with {} replicas which is less than the expected values {}",
partitionName, tableNameWithType, nReplicas, nReplicasIdealMax);
nNumOfReplicasLessThanIdeal++;
if (numEVReplicas == 0) {
offlineSegments.add(segment);
} else if (numEVReplicas < numISReplicas) {
partialOnlineSegments.add(segment);
} else {
// Do not allow numEVReplicas to be larger than numISReplicas
numEVReplicas = numISReplicas;
}
minEVReplicas = Math.min(minEVReplicas, numEVReplicas);
}

if (maxISReplicas == Integer.MIN_VALUE) {
try {
maxISReplicas = Math.max(Integer.parseInt(idealState.getReplicas()), 1);
} catch (NumberFormatException e) {
maxISReplicas = 1;
}
nReplicasExternal =
((nReplicasExternal > nReplicas) || (nReplicasExternal == -1)) ? nReplicas : nReplicasExternal;
}
if (nReplicasExternal == -1) {
nReplicasExternal = (nReplicasIdealMax == 0) ? 1 : 0;
// Do not allow minEVReplicas to be larger than maxISReplicas
minEVReplicas = Math.min(minEVReplicas, maxISReplicas);

if (minEVReplicas < maxISReplicas) {
LOGGER.warn("Table {} has at least one segment running with only {} replicas, below replication threshold :{}",
tableNameWithType, minEVReplicas, maxISReplicas);
}
int numSegmentsWithoutZKMetadata = segmentsWithoutZKMetadata.size();
if (numSegmentsWithoutZKMetadata > 0) {
LOGGER.warn("Table {} has {} segments without ZK metadata: {}", tableNameWithType, numSegmentsWithoutZKMetadata,
logSegments(segmentsWithoutZKMetadata));
}
int numErrorSegments = errorSegments.size();
if (numErrorSegments > 0) {
LOGGER.warn("Table {} has {} segments in ERROR state: {}", tableNameWithType, numErrorSegments,
logSegments(errorSegments));
}
int numOfflineSegments = offlineSegments.size();
if (numOfflineSegments > 0) {
LOGGER.warn("Table {} has {} segments without ONLINE/CONSUMING replica: {}", tableNameWithType,
numOfflineSegments, logSegments(offlineSegments));
}
int numPartialOnlineSegments = partialOnlineSegments.size();
if (numPartialOnlineSegments > 0) {
LOGGER.warn("Table {} has {} segments with fewer replicas than the replication factor: {}", tableNameWithType,
numPartialOnlineSegments, logSegments(partialOnlineSegments));
}

// Synchronization provided by Controller Gauge to make sure that only one thread updates the gauge
_controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.NUMBER_OF_REPLICAS, nReplicasExternal);
_controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.NUMBER_OF_REPLICAS, minEVReplicas);
_controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_OF_REPLICAS,
(nReplicasIdealMax > 0) ? (nReplicasExternal * 100 / nReplicasIdealMax) : 100);
_controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENTS_IN_ERROR_STATE, nErrors);
_controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENTS_WITH_LESS_REPLICAS,
nNumOfReplicasLessThanIdeal);
minEVReplicas * 100L / maxISReplicas);
_controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENTS_IN_ERROR_STATE,
numErrorSegments);
_controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE,
(nSegments > 0) ? (nSegments - nOffline) * 100 / nSegments : 100);
numOfflineSegments > 0 ? (numSegments - numOfflineSegments) * 100L / numSegments : 100);
_controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENTS_WITH_LESS_REPLICAS,
numPartialOnlineSegments);
_controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.TABLE_COMPRESSED_SIZE,
tableCompressedSize);

if (nOffline > 0) {
LOGGER.warn("Table {} has {} segments with no online replicas", tableNameWithType, nOffline);
}
if (nNumOfReplicasLessThanIdeal > 0) {
LOGGER.warn("Table {} has {} segments with number of replicas less than the replication factor",
tableNameWithType, nNumOfReplicasLessThanIdeal);
}
if (nReplicasExternal < nReplicasIdealMax) {
LOGGER.warn("Table {} has at least one segment running with only {} replicas, below replication threshold :{}",
tableNameWithType, nReplicasExternal, nReplicasIdealMax);
}

if (tableType == TableType.REALTIME && tableConfig != null) {
StreamConfig streamConfig =
new StreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig));
Expand All @@ -373,6 +400,13 @@ private void updateSegmentMetrics(String tableNameWithType, TableConfig tableCon
}
}

/**
 * Formats a list of segments for a log message, capped at {@code MAX_SEGMENTS_TO_LOG} entries.
 *
 * @param segments the entries to render; elements are rendered through the list's {@code toString()}
 * @return the full list rendering when it holds at most {@code MAX_SEGMENTS_TO_LOG} entries, otherwise the
 *         rendering of the first {@code MAX_SEGMENTS_TO_LOG} entries followed by {@code "..."}
 */
private static String logSegments(List<?> segments) {
  boolean truncate = segments.size() > MAX_SEGMENTS_TO_LOG;
  // Only the leading entries are kept when truncating; the trailing marker signals that more exist.
  return truncate ? segments.subList(0, MAX_SEGMENTS_TO_LOG) + "..." : segments.toString();
}

@Override
protected void nonLeaderCleanup(List<String> tableNamesWithType) {
tableNamesWithType.forEach(this::removeMetricsForTable);
Expand Down Expand Up @@ -403,20 +437,15 @@ private void removeMetricsForTable(String tableNameWithType) {
public void cleanUpTask() {
  // Intentionally empty: this method performs no cleanup work.
}

/**
 * Replaces the table size reader held by this checker. Marked {@code @VisibleForTesting}, so it is meant to be
 * called from tests only.
 *
 * @param tableSizeReader the reader instance to store
 */
@VisibleForTesting
void setTableSizeReader(TableSizeReader tableSizeReader) {
  this._tableSizeReader = tableSizeReader;
}

public static final class Context {
private boolean _logDisabledTables;
private int _realTimeTableCount;
private int _offlineTableCount;
private int _upsertTableCount;
private int _tierBackendConfiguredTableCount;
private Map<String, Integer> _tierBackendTableCountMap = new HashMap<>();
private Set<String> _processedTables = new HashSet<>();
private Set<String> _disabledTables = new HashSet<>();
private Set<String> _pausedTables = new HashSet<>();
private final Map<String, Integer> _tierBackendTableCountMap = new HashMap<>();
private final Set<String> _processedTables = new HashSet<>();
private final Set<String> _disabledTables = new HashSet<>();
private final Set<String> _pausedTables = new HashSet<>();
}
}
Loading

0 comments on commit f9074f9

Please sign in to comment.