Skip to content

Commit

Permalink
Fix BulkResponse update for all failed
Browse files Browse the repository at this point in the history
Signed-off-by: Chase Engelbrecht <[email protected]>
  • Loading branch information
engechas committed Feb 12, 2024
1 parent 1ee02e2 commit 313aab6
Showing 1 changed file with 11 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class TransportExecuteStreamingDetectorsAction extends HandledTransportAction<BulkRequest, BulkResponse> implements SecureTransportAction {
private static final Logger log = LogManager.getLogger(TransportExecuteStreamingDetectorsAction.class);
Expand Down Expand Up @@ -155,7 +154,7 @@ public void onResponse(final SearchResponse searchResponse) {
try {
detectors = DetectorUtils.getDetectors(searchResponse, xContentRegistry);
} catch (final IOException e) {
handleAllDetectorsFailure(bulkResponse, e);
handleAllDetectorsFailure(bulkResponse, indexToDocData, e);
listener.onResponse(bulkResponse);
return;
}
Expand All @@ -169,7 +168,7 @@ public void onFailure(final Exception e) {
log.warn("No detectors configured, skipping streaming detectors workflow");
listener.onResponse(bulkResponse);
} else {
handleAllDetectorsFailure(bulkResponse, e);
handleAllDetectorsFailure(bulkResponse, indexToDocData, e);
listener.onResponse(bulkResponse);
}
}
Expand Down Expand Up @@ -300,11 +299,18 @@ public void onFailure(final Exception e) {
});
}

private void handleAllDetectorsFailure(final BulkResponse bulkResponse, final Exception exception) {
private void handleAllDetectorsFailure(final BulkResponse bulkResponse, final Map<String, List<DocData>> indexToDocData,
final Exception exception) {
log.error("Failed to run all detectors", exception);
final String failureMessage = String.format("Failed to run all detectors due to %s.", exception);

IntStream.range(0, bulkResponse.getItems().length).forEach(i -> {
// Only get the indices of documents that were eligible to be sent to a detector workflow
final Set<Integer> bulkItemResponseArrayIndices = indexToDocData.values().stream()
.flatMap(Collection::stream)
.map(DocData::getBulkItemResponseIndex)
.collect(Collectors.toSet());

bulkItemResponseArrayIndices.forEach(i -> {
final BulkItemResponse originalBulkItemResponse = bulkResponse.getItems()[i];
final BulkItemResponse recreatedBulkItemResponse = recreateBulkItemResponseWithFailure(originalBulkItemResponse, failureMessage);
bulkResponse.getItems()[i] = recreatedBulkItemResponse;
Expand Down Expand Up @@ -338,7 +344,6 @@ private BulkItemResponse recreateBulkItemResponseWithFailure(final BulkItemRespo
index = originalBulkItemResponse.getFailure().getIndex();
docId = originalBulkItemResponse.getFailure().getId();
failureMessage = originalBulkItemResponse.getFailure().getCause().getMessage() + " " + currentFailureMessage;

} else {
index = originalBulkItemResponse.getResponse().getIndex();
docId = originalBulkItemResponse.getResponse().getId();
Expand Down

0 comments on commit 313aab6

Please sign in to comment.