Skip to content

Commit

Permalink
Allow detectors to be stopped if underlying workflow is deleted. Don'…
Browse files Browse the repository at this point in the history
…t allow them to then be started/editted (opensearch-project#810)

* Allow detectors to be stopped if underlying workflow is deleted. Don't allow them to then be started

Signed-off-by: Chase Engelbrecht <[email protected]>

* Add copyright headers

Signed-off-by: Chase Engelbrecht <[email protected]>

---------

Signed-off-by: Chase Engelbrecht <[email protected]>
  • Loading branch information
engechas authored Apr 3, 2024
1 parent 42849c7 commit 901eb26
Show file tree
Hide file tree
Showing 8 changed files with 286 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,10 @@ public void setAlertsHistoryIndexPattern(String alertsHistoryIndexPattern) {
this.alertsHistoryIndexPattern = alertsHistoryIndexPattern;
}

public void setEnabled(boolean enabled) {
this.enabled = enabled;
}

public void setEnabledTime(Instant enabledTime) {
this.enabledTime = enabledTime;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.OpenSearchException;
import org.opensearch.OpenSearchStatusException;
import org.opensearch.action.ActionRunnable;
import org.opensearch.action.StepListener;
Expand Down Expand Up @@ -43,9 +42,11 @@
import org.opensearch.securityanalytics.model.Detector;
import org.opensearch.securityanalytics.settings.SecurityAnalyticsSettings;
import org.opensearch.securityanalytics.util.DetectorIndices;
import org.opensearch.securityanalytics.util.ExceptionChecker;
import org.opensearch.securityanalytics.util.MonitorService;
import org.opensearch.securityanalytics.util.RuleTopicIndices;
import org.opensearch.securityanalytics.util.SecurityAnalyticsException;
import org.opensearch.securityanalytics.util.ThrowableCheckingPredicates;
import org.opensearch.securityanalytics.util.WorkflowService;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
Expand All @@ -62,6 +63,11 @@
public class TransportDeleteDetectorAction extends HandledTransportAction<DeleteDetectorRequest, DeleteDetectorResponse> {

private static final Logger log = LogManager.getLogger(TransportDeleteDetectorAction.class);
private static final List<ThrowableCheckingPredicates> ACCEPTABLE_ENTITY_MISSING_THROWABLE_MATCHERS = List.of(
ThrowableCheckingPredicates.MONITOR_NOT_FOUND,
ThrowableCheckingPredicates.WORKFLOW_NOT_FOUND,
ThrowableCheckingPredicates.ALERTING_CONFIG_INDEX_NOT_FOUND
);

private final Client client;

Expand All @@ -84,9 +90,13 @@ public class TransportDeleteDetectorAction extends HandledTransportAction<Delete

private final DetectorIndices detectorIndices;

private final ExceptionChecker exceptionChecker;

@Inject
public TransportDeleteDetectorAction(TransportService transportService, IndexTemplateManager indexTemplateManager, Client client, ActionFilters actionFilters, NamedXContentRegistry xContentRegistry, RuleTopicIndices ruleTopicIndices, DetectorIndices detectorIndices, ClusterService clusterService,
Settings settings) {
public TransportDeleteDetectorAction(TransportService transportService, IndexTemplateManager indexTemplateManager, Client client,
ActionFilters actionFilters, NamedXContentRegistry xContentRegistry, RuleTopicIndices ruleTopicIndices,
DetectorIndices detectorIndices, ClusterService clusterService, Settings settings,
ExceptionChecker exceptionChecker) {
super(DeleteDetectorAction.NAME, transportService, actionFilters, DeleteDetectorRequest::new);
this.client = client;
this.ruleTopicIndices = ruleTopicIndices;
Expand All @@ -101,6 +111,7 @@ public TransportDeleteDetectorAction(TransportService transportService, IndexTem

this.enabledWorkflowUsage = SecurityAnalyticsSettings.ENABLE_WORKFLOW_USAGE.get(this.settings);
this.clusterService.getClusterSettings().addSettingsUpdateConsumer(SecurityAnalyticsSettings.ENABLE_WORKFLOW_USAGE, this::setEnabledWorkflowUsage);
this.exceptionChecker = exceptionChecker;
}

@Override
Expand Down Expand Up @@ -205,7 +216,8 @@ public void onResponse(Collection<DeleteMonitorResponse> responses) {

@Override
public void onFailure(Exception e) {
if (isOnlyWorkflowOrMonitorOrIndexMissingExceptionThrownByGroupedActionListener(e, detector.getId())) {
if (exceptionChecker.doesGroupedActionListenerExceptionMatch(e, ACCEPTABLE_ENTITY_MISSING_THROWABLE_MATCHERS)) {
logAcceptableEntityMissingException(e, detector.getId());
deleteDetectorFromConfig(detector.getId(), request.getRefreshPolicy());
} else {
log.error(String.format(Locale.ROOT, "Failed to delete detector %s", detector.getId()), e);
Expand Down Expand Up @@ -244,7 +256,8 @@ private void deleteWorkflow(Detector detector, ActionListener<AcknowledgedRespon

private void handleDeleteWorkflowFailure(final String detectorId, final Exception deleteWorkflowException,
final ActionListener<AcknowledgedResponse> actionListener) {
if (isOnlyWorkflowOrMonitorOrIndexMissingExceptionThrownByGroupedActionListener(deleteWorkflowException, detectorId)) {
if (exceptionChecker.doesGroupedActionListenerExceptionMatch(deleteWorkflowException, ACCEPTABLE_ENTITY_MISSING_THROWABLE_MATCHERS)) {
logAcceptableEntityMissingException(deleteWorkflowException, detectorId);
actionListener.onResponse(new AcknowledgedResponse(true));
} else {
actionListener.onFailure(deleteWorkflowException);
Expand Down Expand Up @@ -306,39 +319,12 @@ private void finishHim(String detectorId, Exception t) {
}
}));
}

private boolean isOnlyWorkflowOrMonitorOrIndexMissingExceptionThrownByGroupedActionListener(
Exception ex,
String detectorId
) {
// grouped action listener listens on mutliple listeners but throws only one exception. If multiple
// listeners fail the other exceptions are added as suppressed exceptions to the first failure.
int len = ex.getSuppressed().length;
for (int i = 0; i <= len; i++) {
Throwable e = i == len ? ex : ex.getSuppressed()[i];
if (isMonitorNotFoundException(e) || isWorkflowNotFoundException(e) || isAlertingConfigIndexNotFoundException(e)) {
log.error(
String.format(Locale.ROOT, "Workflow, monitor, or jobs index already deleted." +
" Proceeding with detector %s deletion", detectorId),
e);
} else {
return false;
}
}
return true;
}
}

private boolean isMonitorNotFoundException(final Throwable e) {
return e.getMessage().matches("(.*)Monitor(.*) is not found(.*)");
}

private boolean isWorkflowNotFoundException(final Throwable e) {
return e.getMessage().matches("(.*)Workflow(.*) not found(.*)");
}

private boolean isAlertingConfigIndexNotFoundException(final Throwable e) {
return e.getMessage().contains("Configured indices are not found: [.opendistro-alerting-config]");
private void logAcceptableEntityMissingException(final Exception e, final String detectorId) {
final String errorMsg = String.format(Locale.ROOT, "Workflow, monitor, or jobs index already deleted." +
" Proceeding with detector %s deletion", detectorId);
log.error(errorMsg, e);
}

private void setEnabledWorkflowUsage(boolean enabledWorkflowUsage) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,13 @@
import org.opensearch.securityanalytics.threatIntel.DetectorThreatIntelService;
import org.opensearch.securityanalytics.util.DetectorIndices;
import org.opensearch.securityanalytics.util.DetectorUtils;
import org.opensearch.securityanalytics.util.ExceptionChecker;
import org.opensearch.securityanalytics.util.IndexUtils;
import org.opensearch.securityanalytics.util.MonitorService;
import org.opensearch.securityanalytics.util.RuleIndices;
import org.opensearch.securityanalytics.util.RuleTopicIndices;
import org.opensearch.securityanalytics.util.SecurityAnalyticsException;
import org.opensearch.securityanalytics.util.ThrowableCheckingPredicates;
import org.opensearch.securityanalytics.util.WorkflowService;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -163,6 +165,8 @@ public class TransportIndexDetectorAction extends HandledTransportAction<IndexDe
private final IndexNameExpressionResolver indexNameExpressionResolver;
private final DetectorThreatIntelService detectorThreatIntelService;

private final ExceptionChecker exceptionChecker;

private final TimeValue indexTimeout;
@Inject
public TransportIndexDetectorAction(TransportService transportService,
Expand All @@ -178,7 +182,8 @@ public TransportIndexDetectorAction(TransportService transportService,
NamedWriteableRegistry namedWriteableRegistry,
LogTypeService logTypeService,
IndexNameExpressionResolver indexNameExpressionResolver,
DetectorThreatIntelService detectorThreatIntelService) {
DetectorThreatIntelService detectorThreatIntelService,
ExceptionChecker exceptionChecker) {
super(IndexDetectorAction.NAME, transportService, actionFilters, IndexDetectorRequest::new);
this.client = client;
this.xContentRegistry = xContentRegistry;
Expand All @@ -201,6 +206,7 @@ public TransportIndexDetectorAction(TransportService transportService,

this.clusterService.getClusterSettings().addSettingsUpdateConsumer(SecurityAnalyticsSettings.FILTER_BY_BACKEND_ROLES, this::setFilterByEnabled);
this.clusterService.getClusterSettings().addSettingsUpdateConsumer(SecurityAnalyticsSettings.ENABLE_WORKFLOW_USAGE, this::setEnabledWorkflowUsage);
this.exceptionChecker = exceptionChecker;
}

@Override
Expand Down Expand Up @@ -720,8 +726,7 @@ public void onResponse(IndexWorkflowResponse workflowResponse) {
}
@Override
public void onFailure(Exception e) {
log.error("Failed to update the workflow");
listener.onFailure(e);
handleUpsertWorkflowFailure(e, listener, detector, monitorsToBeDeleted, refreshPolicy, updatedMonitors);
}
});
}
Expand Down Expand Up @@ -778,6 +783,25 @@ private IndexMonitorRequest createDocLevelMonitorRequest(List<Pair<String, Rule>
return new IndexMonitorRequest(monitorId, SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM, refreshPolicy, restMethod, monitor, null);
}

private void handleUpsertWorkflowFailure(final Exception e, final ActionListener<List<IndexMonitorResponse>> listener,
final Detector detector, final List<String> monitorsToBeDeleted,
final RefreshPolicy refreshPolicy, final List<IndexMonitorResponse> updatedMonitors) {
if (exceptionChecker.doesGroupedActionListenerExceptionMatch(e, List.of(ThrowableCheckingPredicates.WORKFLOW_NOT_FOUND))) {
if (detector.getEnabled()) {
final String errorMessage = String.format("Underlying workflow associated with detector %s not found. " +
"Delete and recreate the detector to restore functionality.", detector.getName());
log.error(errorMessage);
listener.onFailure(new SecurityAnalyticsException(errorMessage, RestStatus.BAD_REQUEST, e));
} else {
log.error("Underlying workflow associated with detector {} not found. Proceeding to disable detector.", detector.getName());
deleteMonitorStep(monitorsToBeDeleted, refreshPolicy, updatedMonitors, listener);
}
} else {
log.error("Failed to update the workflow");
listener.onFailure(e);
}
}

private void addThreatIntelBasedDocLevelQueries(Detector detector, ActionListener<List<DocLevelQuery>> listener) {
try {
if (detector.getThreatIntelEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.securityanalytics.util;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

public class ExceptionChecker {

public boolean doesGroupedActionListenerExceptionMatch(final Exception ex, final List<ThrowableCheckingPredicates> exceptionMatchers) {
// grouped action listener listens on multiple listeners but throws only one exception. If multiple
// listeners fail the other exceptions are added as suppressed exceptions to the first failure.
return Stream.concat(Arrays.stream(ex.getSuppressed()), Stream.of(ex))
.allMatch(throwable -> doesExceptionMatch(throwable, exceptionMatchers));
}

private boolean doesExceptionMatch(final Throwable throwable, final List<ThrowableCheckingPredicates> exceptionMatchers) {
return exceptionMatchers.stream()
.map(ThrowableCheckingPredicates::getMatcherPredicate)
.anyMatch(matcher -> matcher.test(throwable));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.securityanalytics.util;

import java.util.function.Predicate;

public enum ThrowableCheckingPredicates {
MONITOR_NOT_FOUND(ThrowableCheckingPredicates::isMonitorNotFoundException),
WORKFLOW_NOT_FOUND(ThrowableCheckingPredicates::isWorkflowNotFoundException),
ALERTING_CONFIG_INDEX_NOT_FOUND(ThrowableCheckingPredicates::isAlertingConfigIndexNotFoundException);

private final Predicate<Throwable> matcherPredicate;
ThrowableCheckingPredicates(final Predicate<Throwable> matcherPredicate) {
this.matcherPredicate = matcherPredicate;
}

public Predicate<Throwable> getMatcherPredicate() {
return this.matcherPredicate;
}

private static boolean isMonitorNotFoundException(final Throwable e) {
return e.getMessage().matches("(.*)Monitor(.*) is not found(.*)");
}

public static boolean isWorkflowNotFoundException(final Throwable e) {
return e.getMessage().matches("(.*)Workflow(.*) not found(.*)");
}

public static boolean isAlertingConfigIndexNotFoundException(final Throwable e) {
return e.getMessage().contains("Configured indices are not found: [.opendistro-alerting-config]");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -893,6 +893,50 @@ public void testUpdateADetectorWithIndexNotExists() throws IOException {
}
}

public void testDisableEnableADetectorWithWorkflowNotExists() throws IOException {
final String index = createTestIndex(randomIndex(), windowsIndexMapping());

// Execute CreateMappingsAction to add alias mapping for index
final Request createMappingRequest = new Request("POST", SecurityAnalyticsPlugin.MAPPER_BASE_URI);
// both req params and req body are supported
createMappingRequest.setJsonEntity(
"{ \"index_name\":\"" + index + "\"," +
" \"rule_topic\":\"" + randomDetectorType() + "\", " +
" \"partial\":true" +
"}"
);

final Response createMappingResponse = client().performRequest(createMappingRequest);
assertEquals(HttpStatus.SC_OK, createMappingResponse.getStatusLine().getStatusCode());

final Detector detector = randomDetector(getRandomPrePackagedRules());
final Response createResponse = makeRequest(client(), "POST", SecurityAnalyticsPlugin.DETECTOR_BASE_URI, Collections.emptyMap(), toHttpEntity(detector));
Assert.assertEquals("Create detector failed", RestStatus.CREATED, restStatus(createResponse));

final Map<String, Object> createResponseAsMap = asMap(createResponse);
final String detectorId = createResponseAsMap.get("_id").toString();

final Map<String, Object> detectorSourceAsMap = getDetectorSourceAsMap(detectorId);
final String workflowId = ((List<String>) detectorSourceAsMap.get("workflow_ids")).get(0);

final Response deleteWorkflowResponse = deleteAlertingWorkflow(workflowId);
assertEquals(200, deleteWorkflowResponse.getStatusLine().getStatusCode());
entityAsMap(deleteWorkflowResponse);

detector.setEnabled(false);
Response updateResponse = makeRequest(client(), "PUT", SecurityAnalyticsPlugin.DETECTOR_BASE_URI + "/" + detectorId, Collections.emptyMap(), toHttpEntity(detector));
Assert.assertEquals(200, updateResponse.getStatusLine().getStatusCode());

try {
detector.setEnabled(true);
makeRequest(client(), "PUT", SecurityAnalyticsPlugin.DETECTOR_BASE_URI + "/" + detectorId, Collections.emptyMap(), toHttpEntity(detector));
} catch (ResponseException ex) {
Assert.assertEquals(400, ex.getResponse().getStatusLine().getStatusCode());
Assert.assertEquals(true, ex.getMessage().contains(String.format("Underlying workflow associated with detector %s not found. " +
"Delete and recreate the detector to restore functionality.", detector.getName())));
}
}

@SuppressWarnings("unchecked")
public void testDeletingADetector_single_ruleTopicIndex() throws IOException {
String index = createTestIndex(randomIndex(), windowsIndexMapping());
Expand Down
Loading

0 comments on commit 901eb26

Please sign in to comment.