diff --git a/src/main/java/org/opensearch/securityanalytics/transport/TransportExecuteStreamingDetectorsAction.java b/src/main/java/org/opensearch/securityanalytics/transport/TransportExecuteStreamingDetectorsAction.java index 875edc9dd..5f933255c 100644 --- a/src/main/java/org/opensearch/securityanalytics/transport/TransportExecuteStreamingDetectorsAction.java +++ b/src/main/java/org/opensearch/securityanalytics/transport/TransportExecuteStreamingDetectorsAction.java @@ -57,7 +57,6 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; -import java.util.stream.IntStream; public class TransportExecuteStreamingDetectorsAction extends HandledTransportAction implements SecureTransportAction { private static final Logger log = LogManager.getLogger(TransportExecuteStreamingDetectorsAction.class); @@ -155,7 +154,7 @@ public void onResponse(final SearchResponse searchResponse) { try { detectors = DetectorUtils.getDetectors(searchResponse, xContentRegistry); } catch (final IOException e) { - handleAllDetectorsFailure(bulkResponse, e); + handleAllDetectorsFailure(bulkResponse, indexToDocData, e); listener.onResponse(bulkResponse); return; } @@ -169,7 +168,7 @@ public void onFailure(final Exception e) { log.warn("No detectors configured, skipping streaming detectors workflow"); listener.onResponse(bulkResponse); } else { - handleAllDetectorsFailure(bulkResponse, e); + handleAllDetectorsFailure(bulkResponse, indexToDocData, e); listener.onResponse(bulkResponse); } } @@ -300,11 +299,18 @@ public void onFailure(final Exception e) { }); } - private void handleAllDetectorsFailure(final BulkResponse bulkResponse, final Exception exception) { + private void handleAllDetectorsFailure(final BulkResponse bulkResponse, final Map> indexToDocData, + final Exception exception) { log.error("Failed to run all detectors", exception); final String failureMessage = String.format("Failed to run all detectors due to %s.", exception); - IntStream.range(0, bulkResponse.getItems().length).forEach(i -> { + // Only get the indices of documents that were eligible to be sent to a detector workflow + final Set bulkItemResponseArrayIndices = indexToDocData.values().stream() + .flatMap(Collection::stream) + .map(DocData::getBulkItemResponseIndex) + .collect(Collectors.toSet()); + + bulkItemResponseArrayIndices.forEach(i -> { final BulkItemResponse originalBulkItemResponse = bulkResponse.getItems()[i]; final BulkItemResponse recreatedBulkItemResponse = recreateBulkItemResponseWithFailure(originalBulkItemResponse, failureMessage); bulkResponse.getItems()[i] = recreatedBulkItemResponse; @@ -338,7 +344,6 @@ private BulkItemResponse recreateBulkItemResponseWithFailure(final BulkItemRespo index = originalBulkItemResponse.getFailure().getIndex(); docId = originalBulkItemResponse.getFailure().getId(); failureMessage = originalBulkItemResponse.getFailure().getCause().getMessage() + " " + currentFailureMessage; - } else { index = originalBulkItemResponse.getResponse().getIndex(); docId = originalBulkItemResponse.getResponse().getId();