Skip to content

Commit

Permalink
Fix bug to return validDocIDsMetadata from all servers (apache#12431)
Browse files Browse the repository at this point in the history
* Fix bug to return validDocIDsMetadata from all servers

* Deduping validDocIdsMetadata response
  • Loading branch information
tibrewalpratik17 authored Feb 21, 2024
1 parent 8c395ff commit d0ce68b
Showing 1 changed file with 10 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -239,31 +239,36 @@ public List<ValidDocIdsMetadataInfo> getValidDocIdsMetadataFromServer(String tab
serverToEndpoints.get(serverToSegments.getKey())));
}

BiMap<String, String> endpointsToServers = serverToEndpoints.inverse();

// request the urls from the servers
CompletionServiceHelper completionServiceHelper =
new CompletionServiceHelper(_executor, _connectionManager, serverToEndpoints);
new CompletionServiceHelper(_executor, _connectionManager, endpointsToServers);

Map<String, String> requestHeaders = Map.of("Content-Type", "application/json");
CompletionServiceHelper.CompletionServiceResponse serviceResponse =
completionServiceHelper.doMultiPostRequest(serverURLsAndBodies, tableNameWithType, false, requestHeaders,
timeoutMs, null);

List<ValidDocIdsMetadataInfo> validDocIdsMetadataInfos = new ArrayList<>();
Map<String, ValidDocIdsMetadataInfo> validDocIdsMetadataInfos = new HashMap<>();
int failedParses = 0;
int returnedServersCount = 0;
for (Map.Entry<String, String> streamResponse : serviceResponse._httpResponses.entrySet()) {
try {
String validDocIdsMetadataList = streamResponse.getValue();
List<ValidDocIdsMetadataInfo> validDocIdsMetadataInfo =
List<ValidDocIdsMetadataInfo> validDocIdsMetadataInfoList =
JsonUtils.stringToObject(validDocIdsMetadataList, new TypeReference<ArrayList<ValidDocIdsMetadataInfo>>() {
});
validDocIdsMetadataInfos.addAll(validDocIdsMetadataInfo);
for (ValidDocIdsMetadataInfo validDocIdsMetadataInfo: validDocIdsMetadataInfoList) {
validDocIdsMetadataInfos.put(validDocIdsMetadataInfo.getSegmentName(), validDocIdsMetadataInfo);
}
returnedServersCount++;
} catch (Exception e) {
failedParses++;
LOGGER.error("Unable to parse server {} response due to an error: ", streamResponse.getKey(), e);
}
}

if (failedParses != 0) {
LOGGER.error("Unable to parse server {} / {} response due to an error: ", failedParses,
serverURLsAndBodies.size());
Expand All @@ -281,7 +286,7 @@ public List<ValidDocIdsMetadataInfo> getValidDocIdsMetadataFromServer(String tab

LOGGER.info("Retrieved validDocIds metadata for {} segments from {} servers.", validDocIdsMetadataInfos.size(),
returnedServersCount);
return validDocIdsMetadataInfos;
return new ArrayList<>(validDocIdsMetadataInfos.values());
}

/**
Expand Down

0 comments on commit d0ce68b

Please sign in to comment.