diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java index cc8ea07153d4..320649c0d190 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java @@ -239,31 +239,36 @@ public List getValidDocIdsMetadataFromServer(String tab serverToEndpoints.get(serverToSegments.getKey()))); } + BiMap endpointsToServers = serverToEndpoints.inverse(); + // request the urls from the servers CompletionServiceHelper completionServiceHelper = - new CompletionServiceHelper(_executor, _connectionManager, serverToEndpoints); + new CompletionServiceHelper(_executor, _connectionManager, endpointsToServers); Map requestHeaders = Map.of("Content-Type", "application/json"); CompletionServiceHelper.CompletionServiceResponse serviceResponse = completionServiceHelper.doMultiPostRequest(serverURLsAndBodies, tableNameWithType, false, requestHeaders, timeoutMs, null); - List validDocIdsMetadataInfos = new ArrayList<>(); + Map validDocIdsMetadataInfos = new HashMap<>(); int failedParses = 0; int returnedServersCount = 0; for (Map.Entry streamResponse : serviceResponse._httpResponses.entrySet()) { try { String validDocIdsMetadataList = streamResponse.getValue(); - List validDocIdsMetadataInfo = + List validDocIdsMetadataInfoList = JsonUtils.stringToObject(validDocIdsMetadataList, new TypeReference>() { }); - validDocIdsMetadataInfos.addAll(validDocIdsMetadataInfo); + for (ValidDocIdsMetadataInfo validDocIdsMetadataInfo: validDocIdsMetadataInfoList) { + validDocIdsMetadataInfos.put(validDocIdsMetadataInfo.getSegmentName(), validDocIdsMetadataInfo); + } returnedServersCount++; } catch (Exception e) { failedParses++; LOGGER.error("Unable to parse server {} response due to an error: ", streamResponse.getKey(), e); } } + if (failedParses != 0) { LOGGER.error("Unable to parse server {} / {} response due to an error: ", failedParses, serverURLsAndBodies.size()); @@ -281,7 +286,7 @@ public List getValidDocIdsMetadataFromServer(String tab LOGGER.info("Retrieved validDocIds metadata for {} segments from {} servers.", validDocIdsMetadataInfos.size(), returnedServersCount); - return validDocIdsMetadataInfos; + return new ArrayList<>(validDocIdsMetadataInfos.values()); } /**