From dbb52e00656d5f65481161d5b806d3a425f492d9 Mon Sep 17 00:00:00 2001 From: Chase Engelbrecht Date: Thu, 14 Mar 2024 18:34:58 -0700 Subject: [PATCH] Fix miss from manual cherry-pick Signed-off-by: Chase Engelbrecht --- .../TransportIndexDetectorAction.java | 52 ++++++++++++------- 1 file changed, 34 insertions(+), 18 deletions(-) diff --git a/src/main/java/org/opensearch/securityanalytics/transport/TransportIndexDetectorAction.java b/src/main/java/org/opensearch/securityanalytics/transport/TransportIndexDetectorAction.java index e3a779d57..e34a8f617 100644 --- a/src/main/java/org/opensearch/securityanalytics/transport/TransportIndexDetectorAction.java +++ b/src/main/java/org/opensearch/securityanalytics/transport/TransportIndexDetectorAction.java @@ -269,23 +269,7 @@ private void createMonitorFromQueries(String index, List> rul if (numberOfUnprocessedResponses == 0) { listener.onResponse(monitorResponses); } else { - GroupedActionListener monitorResponseListener = new GroupedActionListener( - new ActionListener>() { - @Override - public void onResponse(Collection indexMonitorResponse) { - monitorResponses.addAll(indexMonitorResponse.stream().collect(Collectors.toList())); - listener.onResponse(monitorResponses); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }, numberOfUnprocessedResponses); - - for (int i = 1; i < monitorRequests.size(); i++) { - AlertingPluginInterface.INSTANCE.indexMonitor((NodeClient) client, monitorRequests.get(i), namedWriteableRegistry, monitorResponseListener); - } + saveMonitors(monitorRequests, monitorResponses, numberOfUnprocessedResponses, listener); } }, listener::onFailure); }, listener::onFailure); @@ -300,13 +284,45 @@ public void onFailure(Exception e) { // Indexing monitors in two steps in order to prevent all shards failed error from alerting // https://github.com/opensearch-project/alerting/issues/646 AlertingPluginInterface.INSTANCE.indexMonitor((NodeClient) client, monitorRequests.get(0), namedWriteableRegistry, indexDocLevelMonitorStep); - indexDocLevelMonitorStep.whenComplete(monitorResponses::add, listener::onFailure); + indexDocLevelMonitorStep.whenComplete(addedFirstMonitorResponse -> { + log.debug("first monitor created id {} of type {}", addedFirstMonitorResponse.getId(), addedFirstMonitorResponse.getMonitor().getMonitorType()); + monitorResponses.add(addedFirstMonitorResponse); + int numberOfUnprocessedResponses = monitorRequests.size() - 1; + if (numberOfUnprocessedResponses == 0) { + listener.onResponse(monitorResponses); + } else { + saveMonitors(monitorRequests, monitorResponses, numberOfUnprocessedResponses, listener); + } + }, listener::onFailure); } } catch (Exception ex) { listener.onFailure(ex); } } + private void saveMonitors( + List monitorRequests, + List monitorResponses, + int numberOfUnprocessedResponses, + ActionListener> listener + ) { + GroupedActionListener monitorResponseListener = new GroupedActionListener( + new ActionListener>() { + @Override + public void onResponse(Collection indexMonitorResponses) { + monitorResponses.addAll(indexMonitorResponses.stream().collect(Collectors.toList())); + listener.onResponse(monitorResponses); + } + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }, numberOfUnprocessedResponses); + for (int i = 1; i < monitorRequests.size(); i++) { + AlertingPluginInterface.INSTANCE.indexMonitor((NodeClient) client, monitorRequests.get(i), namedWriteableRegistry, monitorResponseListener); + } + } + private void updateMonitorFromQueries(String index, List> rulesById, Detector detector, ActionListener> listener, WriteRequest.RefreshPolicy refreshPolicy, List queryFieldNames) throws IOException { List monitorsToBeUpdated = new ArrayList<>();