diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ConsumingSegmentInfoReader.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ConsumingSegmentInfoReader.java index 26dab019556a..9218b5033018 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ConsumingSegmentInfoReader.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ConsumingSegmentInfoReader.java @@ -75,8 +75,9 @@ public ConsumingSegmentsInfoMap getConsumingSegmentsInfo(String tableNameWithTyp _pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet()); // Gets info for segments with LLRealtimeSegmentDataManager found in the table data manager - Map> serverToSegmentConsumerInfoMap = + ConsumingSegmentsInfoFromServersResponse response = getConsumingSegmentsInfoFromServers(tableNameWithType, serverToEndpoints, timeoutMs); + Map> serverToSegmentConsumerInfoMap = response._serverToSegmentConsumerInfoMap; TreeMap> consumingSegmentInfoMap = new TreeMap<>(); for (Map.Entry> entry : serverToSegmentConsumerInfoMap.entrySet()) { String serverName = entry.getKey(); @@ -93,14 +94,14 @@ public ConsumingSegmentsInfoMap getConsumingSegmentsInfo(String tableNameWithTyp // Segments which are in CONSUMING state but found no consumer on the server Set consumingSegments = _pinotHelixResourceManager.getConsumingSegments(tableNameWithType); consumingSegments.forEach(c -> consumingSegmentInfoMap.putIfAbsent(c, Collections.emptyList())); - return new ConsumingSegmentsInfoMap(consumingSegmentInfoMap); + return new ConsumingSegmentsInfoMap(consumingSegmentInfoMap, response._failedResponseCount, response._failedParses); } /** * This method makes a MultiGet call to all servers to get the consuming segments info. * @return servers queried and a list of consumer status information for consuming segments on that server */ - private Map> getConsumingSegmentsInfoFromServers(String tableNameWithType, + private ConsumingSegmentsInfoFromServersResponse getConsumingSegmentsInfoFromServers(String tableNameWithType, BiMap serverToEndpoints, int timeoutMs) { LOGGER.info("Reading consuming segment info from servers: {} for table: {}", serverToEndpoints.keySet(), tableNameWithType); @@ -132,7 +133,8 @@ private Map> getConsumingSegmentsInfoFromServe if (failedParses != 0) { LOGGER.warn("Failed to parse {} / {} segment size info responses from servers.", failedParses, serverUrls.size()); } - return serverToConsumingSegmentInfoList; + return new ConsumingSegmentsInfoFromServersResponse( + serverToConsumingSegmentInfoList, serviceResponse._failedResponseCount, failedParses); } private String generateServerURL(String tableNameWithType, String endpoint) { @@ -189,10 +191,18 @@ public TableStatus.IngestionStatus getIngestionStatus(String tableNameWithType, @JsonIgnoreProperties(ignoreUnknown = true) static public class ConsumingSegmentsInfoMap { public TreeMap> _segmentToConsumingInfoMap; + @JsonProperty("serversFailingToRespond") + public int _serversFailingToRespond; + @JsonProperty("serversUnparsableRespond") + public int _serversUnparsableRespond; public ConsumingSegmentsInfoMap(@JsonProperty("segmentToConsumingInfoMap") - TreeMap> segmentToConsumingInfoMap) { + TreeMap> segmentToConsumingInfoMap, + @JsonProperty("serversFailingToRespond") int serversFailingToRespond, + @JsonProperty("serversUnparsableRespond") int serversUnparsableRespond) { _segmentToConsumingInfoMap = segmentToConsumingInfoMap; + _serversFailingToRespond = serversFailingToRespond; + _serversUnparsableRespond = serversUnparsableRespond; } } @@ -254,4 +264,18 @@ public PartitionOffsetInfo( _availabilityLagMap = availabilityLagMsMap; } } + + public static class ConsumingSegmentsInfoFromServersResponse { + private final Map> _serverToSegmentConsumerInfoMap; + private final int _failedResponseCount; + private final int _failedParses; + + public ConsumingSegmentsInfoFromServersResponse( + Map> serverToSegmentConsumerInfoMap, int failedResponseCount, + int failedParses) { + _serverToSegmentConsumerInfoMap = serverToSegmentConsumerInfoMap; + _failedResponseCount = failedResponseCount; + _failedParses = failedParses; + } + } } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/RealtimeConsumerMonitorTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/RealtimeConsumerMonitorTest.java index 28fd0ec795d3..d4298a65a3b4 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/RealtimeConsumerMonitorTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/RealtimeConsumerMonitorTest.java @@ -137,7 +137,7 @@ public void realtimeBasicTest() ConsumingSegmentInfoReader consumingSegmentReader = mock(ConsumingSegmentInfoReader.class); when(consumingSegmentReader.getConsumingSegmentsInfo(tableName, 10000)) - .thenReturn(new ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap(response)); + .thenReturn(new ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap(response, 0, 0)); RealtimeConsumerMonitor realtimeConsumerMonitor = new RealtimeConsumerMonitor(config, helixResourceManager, leadControllerManager, controllerMetrics, consumingSegmentReader);