Skip to content

Commit

Permalink
Fix miss from manual cherry-pick
Browse files Browse the repository at this point in the history
Signed-off-by: Chase Engelbrecht <[email protected]>
  • Loading branch information
engechas committed Mar 15, 2024
1 parent f2a78b5 commit dbb52e0
Showing 1 changed file with 34 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -269,23 +269,7 @@ private void createMonitorFromQueries(String index, List<Pair<String, Rule>> rul
if (numberOfUnprocessedResponses == 0) {
listener.onResponse(monitorResponses);
} else {
GroupedActionListener<IndexMonitorResponse> monitorResponseListener = new GroupedActionListener(
new ActionListener<Collection<IndexMonitorResponse>>() {
@Override
public void onResponse(Collection<IndexMonitorResponse> indexMonitorResponse) {
monitorResponses.addAll(indexMonitorResponse.stream().collect(Collectors.toList()));
listener.onResponse(monitorResponses);
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
}, numberOfUnprocessedResponses);

for (int i = 1; i < monitorRequests.size(); i++) {
AlertingPluginInterface.INSTANCE.indexMonitor((NodeClient) client, monitorRequests.get(i), namedWriteableRegistry, monitorResponseListener);
}
saveMonitors(monitorRequests, monitorResponses, numberOfUnprocessedResponses, listener);
}
}, listener::onFailure);
}, listener::onFailure);
Expand All @@ -300,13 +284,45 @@ public void onFailure(Exception e) {
// Indexing monitors in two steps in order to prevent all shards failed error from alerting
// https://github.com/opensearch-project/alerting/issues/646
AlertingPluginInterface.INSTANCE.indexMonitor((NodeClient) client, monitorRequests.get(0), namedWriteableRegistry, indexDocLevelMonitorStep);
indexDocLevelMonitorStep.whenComplete(monitorResponses::add, listener::onFailure);
indexDocLevelMonitorStep.whenComplete(addedFirstMonitorResponse -> {
log.debug("first monitor created id {} of type {}", addedFirstMonitorResponse.getId(), addedFirstMonitorResponse.getMonitor().getMonitorType());
monitorResponses.add(addedFirstMonitorResponse);
int numberOfUnprocessedResponses = monitorRequests.size() - 1;
if (numberOfUnprocessedResponses == 0) {
listener.onResponse(monitorResponses);
} else {
saveMonitors(monitorRequests, monitorResponses, numberOfUnprocessedResponses, listener);
}
}, listener::onFailure);
}
} catch (Exception ex) {
listener.onFailure(ex);
}
}

private void saveMonitors(
List<IndexMonitorRequest> monitorRequests,
List<IndexMonitorResponse> monitorResponses,
int numberOfUnprocessedResponses,
ActionListener<List<IndexMonitorResponse>> listener
) {
GroupedActionListener<IndexMonitorResponse> monitorResponseListener = new GroupedActionListener(
new ActionListener<Collection<IndexMonitorResponse>>() {
@Override
public void onResponse(Collection<IndexMonitorResponse> indexMonitorResponses) {
monitorResponses.addAll(indexMonitorResponses.stream().collect(Collectors.toList()));
listener.onResponse(monitorResponses);
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
}, numberOfUnprocessedResponses);
for (int i = 1; i < monitorRequests.size(); i++) {
AlertingPluginInterface.INSTANCE.indexMonitor((NodeClient) client, monitorRequests.get(i), namedWriteableRegistry, monitorResponseListener);
}
}

private void updateMonitorFromQueries(String index, List<Pair<String, Rule>> rulesById, Detector detector, ActionListener<List<IndexMonitorResponse>> listener, WriteRequest.RefreshPolicy refreshPolicy,
List<String> queryFieldNames) throws IOException {
List<IndexMonitorRequest> monitorsToBeUpdated = new ArrayList<>();
Expand Down

0 comments on commit dbb52e0

Please sign in to comment.