Skip to content

Commit

Permalink
Modify consumingSegmentsInfo endpoint to indicate how many servers fa…
Browse files Browse the repository at this point in the history
…iled (apache#12523)

* Modify consumingSegmentsInfo endpoint to indicate how many servers failed

* Add unparsable respond
  • Loading branch information
gortiz authored Mar 15, 2024
1 parent a5e3d43 commit 3b45dd6
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,9 @@ public ConsumingSegmentsInfoMap getConsumingSegmentsInfo(String tableNameWithTyp
_pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet());

// Gets info for segments with LLRealtimeSegmentDataManager found in the table data manager
Map<String, List<SegmentConsumerInfo>> serverToSegmentConsumerInfoMap =
ConsumingSegmentsInfoFromServersResponse response =
getConsumingSegmentsInfoFromServers(tableNameWithType, serverToEndpoints, timeoutMs);
Map<String, List<SegmentConsumerInfo>> serverToSegmentConsumerInfoMap = response._serverToSegmentConsumerInfoMap;
TreeMap<String, List<ConsumingSegmentInfo>> consumingSegmentInfoMap = new TreeMap<>();
for (Map.Entry<String, List<SegmentConsumerInfo>> entry : serverToSegmentConsumerInfoMap.entrySet()) {
String serverName = entry.getKey();
Expand All @@ -93,14 +94,14 @@ public ConsumingSegmentsInfoMap getConsumingSegmentsInfo(String tableNameWithTyp
// Segments which are in CONSUMING state but found no consumer on the server
Set<String> consumingSegments = _pinotHelixResourceManager.getConsumingSegments(tableNameWithType);
consumingSegments.forEach(c -> consumingSegmentInfoMap.putIfAbsent(c, Collections.emptyList()));
return new ConsumingSegmentsInfoMap(consumingSegmentInfoMap);
return new ConsumingSegmentsInfoMap(consumingSegmentInfoMap, response._failedResponseCount, response._failedParses);
}

/**
* This method makes a MultiGet call to all servers to get the consuming segments info.
* @return servers queried and a list of consumer status information for consuming segments on that server
*/
private Map<String, List<SegmentConsumerInfo>> getConsumingSegmentsInfoFromServers(String tableNameWithType,
private ConsumingSegmentsInfoFromServersResponse getConsumingSegmentsInfoFromServers(String tableNameWithType,
BiMap<String, String> serverToEndpoints, int timeoutMs) {
LOGGER.info("Reading consuming segment info from servers: {} for table: {}", serverToEndpoints.keySet(),
tableNameWithType);
Expand Down Expand Up @@ -132,7 +133,8 @@ private Map<String, List<SegmentConsumerInfo>> getConsumingSegmentsInfoFromServe
if (failedParses != 0) {
LOGGER.warn("Failed to parse {} / {} segment size info responses from servers.", failedParses, serverUrls.size());
}
return serverToConsumingSegmentInfoList;
return new ConsumingSegmentsInfoFromServersResponse(
serverToConsumingSegmentInfoList, serviceResponse._failedResponseCount, failedParses);
}

private String generateServerURL(String tableNameWithType, String endpoint) {
Expand Down Expand Up @@ -189,10 +191,18 @@ public TableStatus.IngestionStatus getIngestionStatus(String tableNameWithType,
@JsonIgnoreProperties(ignoreUnknown = true)
static public class ConsumingSegmentsInfoMap {
public TreeMap<String, List<ConsumingSegmentInfo>> _segmentToConsumingInfoMap;
@JsonProperty("serversFailingToRespond")
public int _serversFailingToRespond;
@JsonProperty("serversUnparsableRespond")
public int _serversUnparsableRespond;

public ConsumingSegmentsInfoMap(@JsonProperty("segmentToConsumingInfoMap")
TreeMap<String, List<ConsumingSegmentInfo>> segmentToConsumingInfoMap) {
TreeMap<String, List<ConsumingSegmentInfo>> segmentToConsumingInfoMap,
@JsonProperty("serversFailingToRespond") int serversFailingToRespond,
@JsonProperty("serversUnparsableRespond") int serversUnparsableRespond) {
_segmentToConsumingInfoMap = segmentToConsumingInfoMap;
_serversFailingToRespond = serversFailingToRespond;
_serversUnparsableRespond = serversUnparsableRespond;
}
}

Expand Down Expand Up @@ -254,4 +264,18 @@ public PartitionOffsetInfo(
_availabilityLagMap = availabilityLagMsMap;
}
}

public static class ConsumingSegmentsInfoFromServersResponse {
private final Map<String, List<SegmentConsumerInfo>> _serverToSegmentConsumerInfoMap;
private final int _failedResponseCount;
private final int _failedParses;

public ConsumingSegmentsInfoFromServersResponse(
Map<String, List<SegmentConsumerInfo>> serverToSegmentConsumerInfoMap, int failedResponseCount,
int failedParses) {
_serverToSegmentConsumerInfoMap = serverToSegmentConsumerInfoMap;
_failedResponseCount = failedResponseCount;
_failedParses = failedParses;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public void realtimeBasicTest()

ConsumingSegmentInfoReader consumingSegmentReader = mock(ConsumingSegmentInfoReader.class);
when(consumingSegmentReader.getConsumingSegmentsInfo(tableName, 10000))
.thenReturn(new ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap(response));
.thenReturn(new ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap(response, 0, 0));
RealtimeConsumerMonitor realtimeConsumerMonitor =
new RealtimeConsumerMonitor(config, helixResourceManager, leadControllerManager,
controllerMetrics, consumingSegmentReader);
Expand Down

0 comments on commit 3b45dd6

Please sign in to comment.