From 046bb2ead3d7ab8fb8d103a00fac6a696e110264 Mon Sep 17 00:00:00 2001 From: Joanne Wang Date: Tue, 7 Nov 2023 18:42:43 -0800 Subject: [PATCH] Revert "make threat intel async (#703) (#704)" This reverts commit 5b4ab6c20755ece318ab5a7ba740c7b7f3ace952. --- .../DetectorThreatIntelService.java | 9 + .../ThreatIntelFeedDataService.java | 137 ++----- .../action/ThreatIntelIndicesResponse.java | 43 --- .../action/TransportPutTIFJobAction.java | 35 +- .../jobscheduler/TIFJobParameterService.java | 47 +-- .../jobscheduler/TIFJobRunner.java | 40 +- .../jobscheduler/TIFJobUpdateService.java | 169 +++------ .../TransportIndexDetectorAction.java | 342 +++++++++--------- .../action/TransportPutTIFJobActionTests.java | 4 +- .../TIFJobParameterServiceTests.java | 4 +- .../jobscheduler/TIFJobRunnerTests.java | 4 +- .../TIFJobUpdateServiceTests.java | 4 +- 12 files changed, 304 insertions(+), 534 deletions(-) delete mode 100644 src/main/java/org/opensearch/securityanalytics/threatIntel/action/ThreatIntelIndicesResponse.java diff --git a/src/main/java/org/opensearch/securityanalytics/threatIntel/DetectorThreatIntelService.java b/src/main/java/org/opensearch/securityanalytics/threatIntel/DetectorThreatIntelService.java index df4971b66..2565d8175 100644 --- a/src/main/java/org/opensearch/securityanalytics/threatIntel/DetectorThreatIntelService.java +++ b/src/main/java/org/opensearch/securityanalytics/threatIntel/DetectorThreatIntelService.java @@ -158,6 +158,7 @@ private static String constructId(Detector detector, String iocType) { /** Updates all detectors having threat intel detection enabled with the latest threat intel feed data*/ public void updateDetectorsWithLatestThreatIntelRules() { + try { QueryBuilder queryBuilder = QueryBuilders.nestedQuery("detector", QueryBuilders.boolQuery().must( @@ -167,6 +168,7 @@ public void updateDetectorsWithLatestThreatIntelRules() { SearchSourceBuilder ssb = searchRequest.source(); ssb.query(queryBuilder); ssb.size(9999); + CountDownLatch countDownLatch = new CountDownLatch(1); client.execute(SearchDetectorAction.INSTANCE, new SearchDetectorRequest(searchRequest), ActionListener.wrap(searchResponse -> { List detectors = getDetectors(searchResponse, xContentRegistry); @@ -179,15 +181,22 @@ public void updateDetectorsWithLatestThreatIntelRules() { ActionListener.wrap( indexDetectorResponse -> { log.debug("updated {} with latest threat intel info", indexDetectorResponse.getDetector().getId()); + countDownLatch.countDown(); }, e -> { log.error(() -> new ParameterizedMessage("Failed to update detector {} with latest threat intel info", detector.getId()), e); + countDownLatch.countDown(); })); } ); }, e -> { log.error("Failed to fetch detectors to update with threat intel queries.", e); + countDownLatch.countDown(); })); + countDownLatch.await(5, TimeUnit.MINUTES); + } catch (InterruptedException e) { + log.error(""); + } } diff --git a/src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java b/src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java index f37018ae5..40bc7bc53 100644 --- a/src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java +++ b/src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java @@ -9,15 +9,12 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.OpenSearchException; -import org.opensearch.OpenSearchStatusException; import org.opensearch.action.DocWriteRequest; import org.opensearch.action.admin.indices.create.CreateIndexRequest; -import org.opensearch.action.admin.indices.create.CreateIndexResponse; import org.opensearch.action.bulk.BulkRequest; import org.opensearch.action.bulk.BulkResponse; import org.opensearch.action.index.IndexRequest; import org.opensearch.action.search.SearchRequest; -import org.opensearch.action.support.GroupedActionListener; import org.opensearch.action.support.IndicesOptions; import org.opensearch.action.support.WriteRequest; import org.opensearch.action.support.master.AcknowledgedResponse; @@ -36,7 +33,6 @@ import org.opensearch.securityanalytics.model.ThreatIntelFeedData; import org.opensearch.securityanalytics.threatIntel.action.PutTIFJobAction; import org.opensearch.securityanalytics.threatIntel.action.PutTIFJobRequest; -import org.opensearch.securityanalytics.threatIntel.action.ThreatIntelIndicesResponse; import org.opensearch.securityanalytics.threatIntel.common.TIFMetadata; import org.opensearch.securityanalytics.threatIntel.common.StashedThreadContext; import org.opensearch.securityanalytics.settings.SecurityAnalyticsSettings; @@ -51,7 +47,6 @@ import java.nio.charset.StandardCharsets; import java.time.Instant; import java.util.ArrayList; -import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -107,17 +102,17 @@ public void getThreatIntelFeedData( String tifdIndex = getLatestIndexByCreationDate(); if (tifdIndex == null) { - createThreatIntelFeedData(listener); - } else { - SearchRequest searchRequest = new SearchRequest(tifdIndex); - searchRequest.source().size(9999); //TODO: convert to scroll - String finalTifdIndex = tifdIndex; - client.search(searchRequest, ActionListener.wrap(r -> listener.onResponse(ThreatIntelFeedDataUtils.getTifdList(r, xContentRegistry)), e -> { - log.error(String.format( - "Failed to fetch threat intel feed data from system index %s", finalTifdIndex), e); - listener.onFailure(e); - })); + createThreatIntelFeedData(); + tifdIndex = getLatestIndexByCreationDate(); } + SearchRequest searchRequest = new SearchRequest(tifdIndex); + searchRequest.source().size(9999); //TODO: convert to scroll + String finalTifdIndex = tifdIndex; + client.search(searchRequest, ActionListener.wrap(r -> listener.onResponse(ThreatIntelFeedDataUtils.getTifdList(r, xContentRegistry)), e -> { + log.error(String.format( + "Failed to fetch threat intel feed data from system index %s", finalTifdIndex), e); + listener.onFailure(e); + })); } catch (InterruptedException e) { log.error("Failed to get threat intel feed data", e); listener.onFailure(e); @@ -141,30 +136,15 @@ private String getLatestIndexByCreationDate() { * * @param indexName index name */ - public void createIndexIfNotExists(final String indexName, final ActionListener listener) { + public void createIndexIfNotExists(final String indexName) { if (clusterService.state().metadata().hasIndex(indexName) == true) { - listener.onResponse(new CreateIndexResponse(true, true, indexName)); return; } final CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName).settings(INDEX_SETTING_TO_CREATE) - .mapping(getIndexMapping()).timeout(clusterSettings.get(SecurityAnalyticsSettings.THREAT_INTEL_TIMEOUT)); + .mapping(getIndexMapping()); StashedThreadContext.run( client, - () -> client.admin().indices().create(createIndexRequest, new ActionListener<>() { - @Override - public void onResponse(CreateIndexResponse response) { - if (response.isAcknowledged()) { - listener.onResponse(response); - } else { - onFailure(new OpenSearchStatusException("Threat intel feed index creation failed", RestStatus.INTERNAL_SERVER_ERROR)); - } - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }) + () -> client.admin().indices().create(createIndexRequest).actionGet(clusterSettings.get(SecurityAnalyticsSettings.THREAT_INTEL_TIMEOUT)) ); } @@ -179,8 +159,7 @@ public void parseAndSaveThreatIntelFeedDataCSV( final String indexName, final Iterator iterator, final Runnable renewLock, - final TIFMetadata tifMetadata, - final ActionListener listener + final TIFMetadata tifMetadata ) throws IOException { if (indexName == null || iterator == null || renewLock == null) { throw new IllegalArgumentException("Parameters cannot be null, failed to save threat intel feed data"); @@ -188,11 +167,8 @@ public void parseAndSaveThreatIntelFeedDataCSV( TimeValue timeout = clusterSettings.get(SecurityAnalyticsSettings.THREAT_INTEL_TIMEOUT); Integer batchSize = clusterSettings.get(SecurityAnalyticsSettings.BATCH_SIZE); - - List bulkRequestList = new ArrayList<>(); - BulkRequest bulkRequest = new BulkRequest(); + final BulkRequest bulkRequest = new BulkRequest(); bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - List tifdList = new ArrayList<>(); while (iterator.hasNext()) { CSVRecord record = iterator.next(); @@ -216,39 +192,10 @@ public void parseAndSaveThreatIntelFeedDataCSV( bulkRequest.add(indexRequest); if (bulkRequest.requests().size() == batchSize) { - bulkRequestList.add(bulkRequest); - bulkRequest = new BulkRequest(); - bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - } - } - bulkRequestList.add(bulkRequest); - - GroupedActionListener bulkResponseListener = new GroupedActionListener<>(new ActionListener<>() { - @Override - public void onResponse(Collection bulkResponses) { - int idx = 0; - for (BulkResponse response: bulkResponses) { - BulkRequest request = bulkRequestList.get(idx); - if (response.hasFailures()) { - throw new OpenSearchException( - "error occurred while ingesting threat intel feed data in {} with an error {}", - StringUtils.join(request.getIndices()), - response.buildFailureMessage() - ); - } - } - listener.onResponse(new ThreatIntelIndicesResponse(true, List.of(indexName))); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); + saveTifds(bulkRequest, timeout); } - }, bulkRequestList.size()); - - for (int i = 0; i < bulkRequestList.size(); ++i) { - saveTifds(bulkRequestList.get(i), timeout, bulkResponseListener); } + saveTifds(bulkRequest, timeout); renewLock.run(); } @@ -259,9 +206,19 @@ public static boolean isValidIp(String ip) { return matcher.matches(); } - public void saveTifds(BulkRequest bulkRequest, TimeValue timeout, ActionListener listener) { + public void saveTifds(BulkRequest bulkRequest, TimeValue timeout) { try { - StashedThreadContext.run(client, () -> client.bulk(bulkRequest, listener)); + BulkResponse response = StashedThreadContext.run(client, () -> { + return client.bulk(bulkRequest).actionGet(timeout); + }); + if (response.hasFailures()) { + throw new OpenSearchException( + "error occurred while ingesting threat intel feed data in {} with an error {}", + StringUtils.join(bulkRequest.getIndices()), + response.buildFailureMessage() + ); + } + bulkRequest.requests().clear(); } catch (OpenSearchException e) { log.error("failed to save threat intel feed data", e); } @@ -284,49 +241,31 @@ public void deleteThreatIntelDataIndex(final List indices) { ); } - StashedThreadContext.run( + AcknowledgedResponse response = StashedThreadContext.run( client, () -> client.admin() .indices() .prepareDelete(indices.toArray(new String[0])) .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN) - .setTimeout(clusterSettings.get(SecurityAnalyticsSettings.THREAT_INTEL_TIMEOUT)) - .execute(new ActionListener<>() { - @Override - public void onResponse(AcknowledgedResponse response) { - if (response.isAcknowledged() == false) { - onFailure(new OpenSearchException("failed to delete data[{}]", String.join(",", indices))); - } - } - - @Override - public void onFailure(Exception e) { - log.error("unknown exception:", e); - } - }) + .execute() + .actionGet(clusterSettings.get(SecurityAnalyticsSettings.THREAT_INTEL_TIMEOUT)) ); + + if (response.isAcknowledged() == false) { + throw new OpenSearchException("failed to delete data[{}]", String.join(",", indices)); + } } - private void createThreatIntelFeedData(ActionListener> listener) throws InterruptedException { + private void createThreatIntelFeedData() throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(1); client.execute( PutTIFJobAction.INSTANCE, new PutTIFJobRequest("feed_updater", clusterSettings.get(SecurityAnalyticsSettings.TIF_UPDATE_INTERVAL)), - new ActionListener<>() { + new ActionListener() { @Override public void onResponse(AcknowledgedResponse acknowledgedResponse) { log.debug("Acknowledged threat intel feed updater job created"); countDownLatch.countDown(); - String tifdIndex = getLatestIndexByCreationDate(); - - SearchRequest searchRequest = new SearchRequest(tifdIndex); - searchRequest.source().size(9999); //TODO: convert to scroll - String finalTifdIndex = tifdIndex; - client.search(searchRequest, ActionListener.wrap(r -> listener.onResponse(ThreatIntelFeedDataUtils.getTifdList(r, xContentRegistry)), e -> { - log.error(String.format( - "Failed to fetch threat intel feed data from system index %s", finalTifdIndex), e); - listener.onFailure(e); - })); } @Override diff --git a/src/main/java/org/opensearch/securityanalytics/threatIntel/action/ThreatIntelIndicesResponse.java b/src/main/java/org/opensearch/securityanalytics/threatIntel/action/ThreatIntelIndicesResponse.java deleted file mode 100644 index 0d4e81546..000000000 --- a/src/main/java/org/opensearch/securityanalytics/threatIntel/action/ThreatIntelIndicesResponse.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ -package org.opensearch.securityanalytics.threatIntel.action; - -import org.opensearch.core.action.ActionResponse; -import org.opensearch.core.common.io.stream.StreamInput; -import org.opensearch.core.common.io.stream.StreamOutput; - -import java.io.IOException; -import java.util.List; - -public class ThreatIntelIndicesResponse extends ActionResponse { - - private Boolean isAcknowledged; - - private List indices; - - public ThreatIntelIndicesResponse(Boolean isAcknowledged, List indices) { - super(); - this.isAcknowledged = isAcknowledged; - this.indices = indices; - } - - public ThreatIntelIndicesResponse(StreamInput sin) throws IOException { - this(sin.readBoolean(), sin.readStringList()); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeBoolean(isAcknowledged); - out.writeStringCollection(indices); - } - - public Boolean isAcknowledged() { - return isAcknowledged; - } - - public List getIndices() { - return indices; - } -} \ No newline at end of file diff --git a/src/main/java/org/opensearch/securityanalytics/threatIntel/action/TransportPutTIFJobAction.java b/src/main/java/org/opensearch/securityanalytics/threatIntel/action/TransportPutTIFJobAction.java index 393a0f102..1346da40c 100644 --- a/src/main/java/org/opensearch/securityanalytics/threatIntel/action/TransportPutTIFJobAction.java +++ b/src/main/java/org/opensearch/securityanalytics/threatIntel/action/TransportPutTIFJobAction.java @@ -7,7 +7,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.opensearch.OpenSearchStatusException; import org.opensearch.ResourceAlreadyExistsException; import org.opensearch.action.StepListener; import org.opensearch.action.index.IndexResponse; @@ -128,22 +127,12 @@ protected ActionListener postIndexingTifJobParameter( @Override public void onResponse(final IndexResponse indexResponse) { AtomicReference lockReference = new AtomicReference<>(lock); - createThreatIntelFeedData(tifJobParameter, lockService.getRenewLockRunnable(lockReference), new ActionListener<>() { - @Override - public void onResponse(ThreatIntelIndicesResponse threatIntelIndicesResponse) { - if (threatIntelIndicesResponse.isAcknowledged()) { - lockService.releaseLock(lockReference.get()); - listener.onResponse(new AcknowledgedResponse(true)); - } else { - onFailure(new OpenSearchStatusException("creation of threat intel feed data failed", RestStatus.INTERNAL_SERVER_ERROR)); - } - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + try { + createThreatIntelFeedData(tifJobParameter, lockService.getRenewLockRunnable(lockReference)); + } finally { + lockService.releaseLock(lockReference.get()); + } + listener.onResponse(new AcknowledgedResponse(true)); } @Override @@ -160,26 +149,26 @@ public void onFailure(final Exception e) { }; } - protected void createThreatIntelFeedData(final TIFJobParameter tifJobParameter, final Runnable renewLock, final ActionListener listener) { + protected void createThreatIntelFeedData(final TIFJobParameter tifJobParameter, final Runnable renewLock) { if (TIFJobState.CREATING.equals(tifJobParameter.getState()) == false) { log.error("Invalid tifJobParameter state. Expecting {} but received {}", TIFJobState.CREATING, tifJobParameter.getState()); - markTIFJobAsCreateFailed(tifJobParameter, listener); + markTIFJobAsCreateFailed(tifJobParameter); return; } try { - tifJobUpdateService.createThreatIntelFeedData(tifJobParameter, renewLock, listener); + tifJobUpdateService.createThreatIntelFeedData(tifJobParameter, renewLock); } catch (Exception e) { log.error("Failed to create tifJobParameter for {}", tifJobParameter.getName(), e); - markTIFJobAsCreateFailed(tifJobParameter, listener); + markTIFJobAsCreateFailed(tifJobParameter); } } - private void markTIFJobAsCreateFailed(final TIFJobParameter tifJobParameter, final ActionListener listener) { + private void markTIFJobAsCreateFailed(final TIFJobParameter tifJobParameter) { tifJobParameter.getUpdateStats().setLastFailedAt(Instant.now()); tifJobParameter.setState(TIFJobState.CREATE_FAILED); try { - tifJobParameterService.updateJobSchedulerParameter(tifJobParameter, listener); + tifJobParameterService.updateJobSchedulerParameter(tifJobParameter); } catch (Exception e) { log.error("Failed to mark tifJobParameter state as CREATE_FAILED for {}", tifJobParameter.getName(), e); } diff --git a/src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobParameterService.java b/src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobParameterService.java index de9bb5365..640b3874b 100644 --- a/src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobParameterService.java +++ b/src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobParameterService.java @@ -7,17 +7,18 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.opensearch.OpenSearchStatusException; +import org.opensearch.OpenSearchException; import org.opensearch.ResourceAlreadyExistsException; +import org.opensearch.ResourceNotFoundException; import org.opensearch.action.DocWriteRequest; import org.opensearch.action.StepListener; import org.opensearch.action.admin.indices.create.CreateIndexRequest; import org.opensearch.action.admin.indices.create.CreateIndexResponse; +import org.opensearch.action.delete.DeleteResponse; import org.opensearch.action.get.GetRequest; import org.opensearch.action.get.GetResponse; import org.opensearch.action.index.IndexResponse; import org.opensearch.action.support.WriteRequest; -import org.opensearch.action.support.master.AcknowledgedResponse; import org.opensearch.client.Client; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.ClusterSettings; @@ -32,7 +33,6 @@ import org.opensearch.index.IndexNotFoundException; import org.opensearch.securityanalytics.SecurityAnalyticsPlugin; import org.opensearch.securityanalytics.settings.SecurityAnalyticsSettings; -import org.opensearch.securityanalytics.threatIntel.action.ThreatIntelIndicesResponse; import org.opensearch.securityanalytics.threatIntel.common.StashedThreadContext; import org.opensearch.securityanalytics.util.SecurityAnalyticsException; @@ -105,40 +105,19 @@ private String getIndexMapping() { /** * Update jobSchedulerParameter in an index {@code TIFJobExtension.JOB_INDEX_NAME} * @param jobSchedulerParameter the jobSchedulerParameter + * @return index response */ - public void updateJobSchedulerParameter(final TIFJobParameter jobSchedulerParameter, final ActionListener listener) { + public IndexResponse updateJobSchedulerParameter(final TIFJobParameter jobSchedulerParameter) { jobSchedulerParameter.setLastUpdateTime(Instant.now()); - StashedThreadContext.run(client, () -> { + return StashedThreadContext.run(client, () -> { try { - if (listener != null) { - client.prepareIndex(SecurityAnalyticsPlugin.JOB_INDEX_NAME) - .setId(jobSchedulerParameter.getName()) - .setOpType(DocWriteRequest.OpType.INDEX) - .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) - .setSource(jobSchedulerParameter.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) - .execute(new ActionListener<>() { - @Override - public void onResponse(IndexResponse indexResponse) { - if (indexResponse.status().getStatus() >= 200 && indexResponse.status().getStatus() < 300) { - listener.onResponse(new ThreatIntelIndicesResponse(true, jobSchedulerParameter.getIndices())); - } else { - listener.onFailure(new OpenSearchStatusException("update of job scheduler parameter failed", RestStatus.INTERNAL_SERVER_ERROR)); - } - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); - } else { - client.prepareIndex(SecurityAnalyticsPlugin.JOB_INDEX_NAME) - .setId(jobSchedulerParameter.getName()) - .setOpType(DocWriteRequest.OpType.INDEX) - .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) - .setSource(jobSchedulerParameter.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) - .execute().actionGet(); - } + return client.prepareIndex(SecurityAnalyticsPlugin.JOB_INDEX_NAME) + .setId(jobSchedulerParameter.getName()) + .setOpType(DocWriteRequest.OpType.INDEX) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .setSource(jobSchedulerParameter.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) + .execute() + .actionGet(clusterSettings.get(SecurityAnalyticsSettings.THREAT_INTEL_TIMEOUT)); } catch (IOException e) { throw new SecurityAnalyticsException("Runtime exception", RestStatus.INTERNAL_SERVER_ERROR, e); } diff --git a/src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobRunner.java b/src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobRunner.java index 13db6235d..e3500064f 100644 --- a/src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobRunner.java +++ b/src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobRunner.java @@ -9,7 +9,6 @@ import org.apache.logging.log4j.Logger; import org.opensearch.cluster.service.ClusterService; -import org.opensearch.core.action.ActionListener; import org.opensearch.jobscheduler.spi.JobExecutionContext; import org.opensearch.jobscheduler.spi.LockModel; import org.opensearch.jobscheduler.spi.ScheduledJobParameter; @@ -23,7 +22,6 @@ import java.time.Instant; import org.opensearch.securityanalytics.threatIntel.DetectorThreatIntelService; -import org.opensearch.securityanalytics.threatIntel.action.ThreatIntelIndicesResponse; import org.opensearch.securityanalytics.threatIntel.common.TIFJobState; import org.opensearch.securityanalytics.threatIntel.common.TIFLockService; import org.opensearch.threadpool.ThreadPool; @@ -147,34 +145,22 @@ protected void updateJobParameter(final ScheduledJobParameter jobParameter, fina log.error("Invalid jobSchedulerParameter state. Expecting {} but received {}", TIFJobState.AVAILABLE, jobSchedulerParameter.getState()); jobSchedulerParameter.disable(); jobSchedulerParameter.getUpdateStats().setLastFailedAt(Instant.now()); - jobSchedulerParameterService.updateJobSchedulerParameter(jobSchedulerParameter, null); + jobSchedulerParameterService.updateJobSchedulerParameter(jobSchedulerParameter); return; } - // create new TIF data and delete old ones - List oldIndices = new ArrayList<>(jobSchedulerParameter.getIndices()); - jobSchedulerUpdateService.createThreatIntelFeedData(jobSchedulerParameter, renewLock, new ActionListener<>() { - @Override - public void onResponse(ThreatIntelIndicesResponse response) { - if (response.isAcknowledged()) { - List newFeedIndices = response.getIndices(); - jobSchedulerUpdateService.deleteAllTifdIndices(oldIndices, newFeedIndices); - if (false == newFeedIndices.isEmpty()) { - detectorThreatIntelService.updateDetectorsWithLatestThreatIntelRules(); - } - } else { - log.error("Failed to update jobSchedulerParameter for {}", jobSchedulerParameter.getName()); - jobSchedulerParameter.getUpdateStats().setLastFailedAt(Instant.now()); - jobSchedulerParameterService.updateJobSchedulerParameter(jobSchedulerParameter, null); - } + try { + // create new TIF data and delete old ones + List oldIndices = new ArrayList<>(jobSchedulerParameter.getIndices()); + List newFeedIndices = jobSchedulerUpdateService.createThreatIntelFeedData(jobSchedulerParameter, renewLock); + jobSchedulerUpdateService.deleteAllTifdIndices(oldIndices, newFeedIndices); + if(false == newFeedIndices.isEmpty()) { + detectorThreatIntelService.updateDetectorsWithLatestThreatIntelRules(); } - - @Override - public void onFailure(Exception e) { - log.error("Failed to update jobSchedulerParameter for {}", jobSchedulerParameter.getName(), e); - jobSchedulerParameter.getUpdateStats().setLastFailedAt(Instant.now()); - jobSchedulerParameterService.updateJobSchedulerParameter(jobSchedulerParameter, null); - } - }); + } catch (Exception e) { + log.error("Failed to update jobSchedulerParameter for {}", jobSchedulerParameter.getName(), e); + jobSchedulerParameter.getUpdateStats().setLastFailedAt(Instant.now()); + jobSchedulerParameterService.updateJobSchedulerParameter(jobSchedulerParameter); + } } } \ No newline at end of file diff --git a/src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobUpdateService.java b/src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobUpdateService.java index 5c48ed8aa..3006285ad 100644 --- a/src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobUpdateService.java +++ b/src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobUpdateService.java @@ -12,17 +12,11 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.OpenSearchException; -import org.opensearch.OpenSearchStatusException; -import org.opensearch.action.admin.indices.create.CreateIndexResponse; -import org.opensearch.action.support.GroupedActionListener; -import org.opensearch.action.support.master.AcknowledgedResponse; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.ClusterSettings; -import org.opensearch.core.action.ActionListener; import org.opensearch.core.rest.RestStatus; import org.opensearch.securityanalytics.threatIntel.ThreatIntelFeedDataService; import org.opensearch.securityanalytics.threatIntel.ThreatIntelFeedParser; -import org.opensearch.securityanalytics.threatIntel.action.ThreatIntelIndicesResponse; import org.opensearch.securityanalytics.threatIntel.common.TIFJobState; import org.opensearch.securityanalytics.threatIntel.common.TIFMetadata; import org.opensearch.securityanalytics.threatIntel.feedMetadata.BuiltInTIFMetadataLoader; @@ -31,12 +25,8 @@ import java.io.IOException; import java.time.Duration; import java.time.Instant; -import java.util.AbstractMap; import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; import java.util.List; -import java.util.Map; public class TIFJobUpdateService { private static final Logger log = LogManager.getLogger(TIFJobUpdateService.class); @@ -106,107 +96,55 @@ private List deleteIndices(final List indicesToDelete) { * * @param jobSchedulerParameter the jobSchedulerParameter * @param renewLock runnable to renew lock + * @throws IOException */ - public void createThreatIntelFeedData(final TIFJobParameter jobSchedulerParameter, final Runnable renewLock, final ActionListener listener) { + public List createThreatIntelFeedData(final TIFJobParameter jobSchedulerParameter, final Runnable renewLock) throws IOException { Instant startTime = Instant.now(); - List> tifMetadataList = new ArrayList<>(); - Map indexTIFMetadataMap = new HashMap<>(); - for (TIFMetadata tifMetadata: builtInTIFMetadataLoader.getTifMetadataList()) { - String indexName = jobSchedulerParameter.newIndexName(jobSchedulerParameter, tifMetadata); - tifMetadataList.add(new AbstractMap.SimpleEntry<>(jobSchedulerParameter, tifMetadata)); - indexTIFMetadataMap.put(indexName, tifMetadata); - } - - GroupedActionListener createdThreatIntelIndices = new GroupedActionListener<>( - new ActionListener<>() { - @Override - public void onResponse(Collection responses) { - try { - - int noOfUnprocessedResponses = 0; - for (CreateIndexResponse response: responses) { - String indexName = response.index(); - TIFMetadata tifMetadata = indexTIFMetadataMap.get(indexName); - if (tifMetadata.getFeedType().equals("csv")) { - ++noOfUnprocessedResponses; - } - } - GroupedActionListener saveThreatIntelFeedResponseListener = new GroupedActionListener<>(new ActionListener<>() { - @Override - public void onResponse(Collection responses) { - List freshIndices = new ArrayList<>(); - for (ThreatIntelIndicesResponse response: responses) { - Boolean succeeded = false; - if (response.isAcknowledged()) { - String indexName = response.getIndices().get(0); - waitUntilAllShardsStarted(indexName, MAX_WAIT_TIME_FOR_REPLICATION_TO_COMPLETE_IN_MILLIS); - freshIndices.add(indexName); - succeeded = true; - } - - if (!succeeded) { - log.error("Exception: failed to parse correct feed type"); - onFailure(new OpenSearchException("Exception: failed to parse correct feed type")); - } - } - - Instant endTime = Instant.now(); - updateJobSchedulerParameterAsSucceeded(freshIndices, jobSchedulerParameter, startTime, endTime, listener); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }, noOfUnprocessedResponses); - - for (CreateIndexResponse response: responses) { - String indexName = response.index(); - TIFMetadata tifMetadata = indexTIFMetadataMap.get(indexName); - switch (tifMetadata.getFeedType()) { - case "csv": - try (CSVParser reader = ThreatIntelFeedParser.getThreatIntelFeedReaderCSV(tifMetadata)) { - CSVParser noHeaderReader = ThreatIntelFeedParser.getThreatIntelFeedReaderCSV(tifMetadata); - boolean notFound = true; - - while (notFound) { - CSVRecord hasHeaderRecord = reader.iterator().next(); - - //if we want to skip this line and keep iterating - if ((hasHeaderRecord.values().length ==1 && "".equals(hasHeaderRecord.values()[0])) || hasHeaderRecord.get(0).charAt(0) == '#' || hasHeaderRecord.get(0).charAt(0) == ' '){ - noHeaderReader.iterator().next(); - } else { // we found the first line that contains information - notFound = false; - } - } - if (tifMetadata.hasHeader()){ - threatIntelFeedDataService.parseAndSaveThreatIntelFeedDataCSV(indexName, reader.iterator(), renewLock, tifMetadata, saveThreatIntelFeedResponseListener); - } else { - threatIntelFeedDataService.parseAndSaveThreatIntelFeedDataCSV(indexName, noHeaderReader.iterator(), renewLock, tifMetadata, saveThreatIntelFeedResponseListener); - } - } - break; - default: - // if the feed type doesn't match any of the supporting feed types, throw an exception - } + List freshIndices = new ArrayList<>(); + for (TIFMetadata tifMetadata : builtInTIFMetadataLoader.getTifMetadataList()) { + String indexName = setupIndex(jobSchedulerParameter, tifMetadata); + + Boolean succeeded; + switch (tifMetadata.getFeedType()) { + case "csv": + try (CSVParser reader = ThreatIntelFeedParser.getThreatIntelFeedReaderCSV(tifMetadata)) { + CSVParser noHeaderReader = ThreatIntelFeedParser.getThreatIntelFeedReaderCSV(tifMetadata); + boolean notFound = true; + + while (notFound) { + CSVRecord hasHeaderRecord = reader.iterator().next(); + + //if we want to skip this line and keep iterating + if ((hasHeaderRecord.values().length ==1 && "".equals(hasHeaderRecord.values()[0])) || hasHeaderRecord.get(0).charAt(0) == '#' || hasHeaderRecord.get(0).charAt(0) == ' '){ + noHeaderReader.iterator().next(); + } else { // we found the first line that contains information + notFound = false; } - } catch (IOException ex) { - onFailure(ex); } + if (tifMetadata.hasHeader()){ + threatIntelFeedDataService.parseAndSaveThreatIntelFeedDataCSV(indexName, reader.iterator(), renewLock, tifMetadata); + } else { + threatIntelFeedDataService.parseAndSaveThreatIntelFeedDataCSV(indexName, noHeaderReader.iterator(), renewLock, tifMetadata); + } + succeeded = true; } + break; + default: + // if the feed type doesn't match any of the supporting feed types, throw an exception + succeeded = false; + } + waitUntilAllShardsStarted(indexName, MAX_WAIT_TIME_FOR_REPLICATION_TO_COMPLETE_IN_MILLIS); - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }, - tifMetadataList.size() - ); - - for (AbstractMap.SimpleEntry tifJobParameterTIFMetadataSimpleEntry : tifMetadataList) { - setupIndex(tifJobParameterTIFMetadataSimpleEntry.getKey(), tifJobParameterTIFMetadataSimpleEntry.getValue(), createdThreatIntelIndices); + if (!succeeded) { + log.error("Exception: failed to parse correct feed type"); + throw new OpenSearchException("Exception: failed to parse correct feed type"); + } + freshIndices.add(indexName); } + Instant endTime = Instant.now(); + updateJobSchedulerParameterAsSucceeded(freshIndices, jobSchedulerParameter, startTime, endTime); + return freshIndices; } // helper functions @@ -220,15 +158,14 @@ public void updateJobSchedulerParameterAsSucceeded( List indices, final TIFJobParameter jobSchedulerParameter, final Instant startTime, - final Instant endTime, - final ActionListener listener + final Instant endTime ) { jobSchedulerParameter.setIndices(indices); jobSchedulerParameter.getUpdateStats().setLastSucceededAt(endTime); jobSchedulerParameter.getUpdateStats().setLastProcessingTimeInMillis(endTime.toEpochMilli() - startTime.toEpochMilli()); jobSchedulerParameter.enable(); jobSchedulerParameter.setState(TIFJobState.AVAILABLE); - jobSchedulerParameterService.updateJobSchedulerParameter(jobSchedulerParameter, listener); + jobSchedulerParameterService.updateJobSchedulerParameter(jobSchedulerParameter); log.info( "threat intel feed data creation succeeded for {} and took {} seconds", jobSchedulerParameter.getName(), @@ -243,24 +180,12 @@ public void updateJobSchedulerParameterAsSucceeded( * @param tifMetadata * @return new index name */ - private void setupIndex(final TIFJobParameter jobSchedulerParameter, TIFMetadata tifMetadata, ActionListener listener) { + private String setupIndex(final TIFJobParameter jobSchedulerParameter, TIFMetadata tifMetadata) { String indexName = jobSchedulerParameter.newIndexName(jobSchedulerParameter, tifMetadata); jobSchedulerParameter.getIndices().add(indexName); - jobSchedulerParameterService.updateJobSchedulerParameter(jobSchedulerParameter, new ActionListener<>() { - @Override - public void onResponse(ThreatIntelIndicesResponse response) { - if (response.isAcknowledged()) { - threatIntelFeedDataService.createIndexIfNotExists(indexName, listener); - } else { - onFailure(new OpenSearchStatusException("update of job scheduler parameter failed", RestStatus.INTERNAL_SERVER_ERROR)); - } - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + jobSchedulerParameterService.updateJobSchedulerParameter(jobSchedulerParameter); + threatIntelFeedDataService.createIndexIfNotExists(indexName); + return indexName; } /** diff --git a/src/main/java/org/opensearch/securityanalytics/transport/TransportIndexDetectorAction.java b/src/main/java/org/opensearch/securityanalytics/transport/TransportIndexDetectorAction.java index a403cf3ab..162530e0a 100644 --- a/src/main/java/org/opensearch/securityanalytics/transport/TransportIndexDetectorAction.java +++ b/src/main/java/org/opensearch/securityanalytics/transport/TransportIndexDetectorAction.java @@ -249,103 +249,87 @@ public void onFailure(Exception e) { }); } - private void createMonitorFromQueries(List> rulesById, Detector detector, ActionListener> listener, WriteRequest.RefreshPolicy refreshPolicy) { + private void createMonitorFromQueries(List> rulesById, Detector detector, ActionListener> listener, WriteRequest.RefreshPolicy refreshPolicy) throws Exception { List> docLevelRules = rulesById.stream().filter(it -> !it.getRight().isAggregationRule()).collect( Collectors.toList()); List> bucketLevelRules = rulesById.stream().filter(it -> it.getRight().isAggregationRule()).collect( Collectors.toList()); - addThreatIntelBasedDocLevelQueries(detector, new ActionListener<>() { - @Override - public void onResponse(List dlqs) { - try { - List monitorRequests = new ArrayList<>(); + List monitorRequests = new ArrayList<>(); - if (!docLevelRules.isEmpty() || detector.getThreatIntelEnabled()) { - monitorRequests.add(createDocLevelMonitorRequest(docLevelRules, dlqs != null ? dlqs : List.of(), detector, refreshPolicy, Monitor.NO_ID, Method.POST)); - } + if (!docLevelRules.isEmpty() || detector.getThreatIntelEnabled()) { + monitorRequests.add(createDocLevelMonitorRequest(docLevelRules, detector, refreshPolicy, Monitor.NO_ID, Method.POST)); + } - if (!bucketLevelRules.isEmpty()) { - StepListener> bucketLevelMonitorRequests = new StepListener<>(); - buildBucketLevelMonitorRequests(bucketLevelRules, detector, refreshPolicy, Monitor.NO_ID, Method.POST, bucketLevelMonitorRequests); - bucketLevelMonitorRequests.whenComplete(indexMonitorRequests -> { - monitorRequests.addAll(indexMonitorRequests); - // Do nothing if detector doesn't have any monitor - if (monitorRequests.isEmpty()) { - listener.onResponse(Collections.emptyList()); - return; - } + if (!bucketLevelRules.isEmpty()) { + StepListener> bucketLevelMonitorRequests = new StepListener<>(); + buildBucketLevelMonitorRequests(bucketLevelRules, detector, refreshPolicy, Monitor.NO_ID, Method.POST, bucketLevelMonitorRequests); + bucketLevelMonitorRequests.whenComplete(indexMonitorRequests -> { + monitorRequests.addAll(indexMonitorRequests); + // Do nothing if detector doesn't have any monitor + if (monitorRequests.isEmpty()) { + listener.onResponse(Collections.emptyList()); + return; + } - List monitorResponses = new ArrayList<>(); - StepListener addFirstMonitorStep = new StepListener(); - - // Indexing monitors in two steps in order to prevent all shards failed error from alerting - // https://github.com/opensearch-project/alerting/issues/646 - AlertingPluginInterface.INSTANCE.indexMonitor((NodeClient) client, monitorRequests.get(0), namedWriteableRegistry, addFirstMonitorStep); - addFirstMonitorStep.whenComplete(addedFirstMonitorResponse -> { - monitorResponses.add(addedFirstMonitorResponse); - - StepListener> indexMonitorsStep = new StepListener<>(); - indexMonitorsStep.whenComplete( - indexMonitorResponses -> saveWorkflow(rulesById, detector, indexMonitorResponses, refreshPolicy, listener), - e -> { - log.error("Failed to index the workflow", e); - listener.onFailure(e); - }); - - int numberOfUnprocessedResponses = monitorRequests.size() - 1; - if (numberOfUnprocessedResponses == 0) { - saveWorkflow(rulesById, detector, monitorResponses, refreshPolicy, listener); - } else { - // Saves the rest of the monitors and saves the workflow if supported - saveMonitors( - monitorRequests, - monitorResponses, - numberOfUnprocessedResponses, - indexMonitorsStep - ); - } - }, - e1 -> { - log.error("Failed to index doc level monitor in detector creation", e1); - listener.onFailure(e1); - } - ); - }, listener::onFailure); - } else { - // Failure if detector doesn't have any monitor - if (monitorRequests.isEmpty()) { - listener.onFailure(new OpenSearchStatusException("Detector cannot be created as no compatible rules were provided", RestStatus.BAD_REQUEST)); - return; - } + List monitorResponses = new ArrayList<>(); + StepListener addFirstMonitorStep = new StepListener(); + + // Indexing monitors in two steps in order to prevent all shards failed error from alerting + // https://github.com/opensearch-project/alerting/issues/646 + AlertingPluginInterface.INSTANCE.indexMonitor((NodeClient) client, monitorRequests.get(0), namedWriteableRegistry, addFirstMonitorStep); + addFirstMonitorStep.whenComplete(addedFirstMonitorResponse -> { + monitorResponses.add(addedFirstMonitorResponse); + + StepListener> indexMonitorsStep = new StepListener<>(); + indexMonitorsStep.whenComplete( + indexMonitorResponses -> saveWorkflow(rulesById, detector, indexMonitorResponses, refreshPolicy, listener), + e -> { + log.error("Failed to index the workflow", e); + listener.onFailure(e); + }); - List monitorResponses = new ArrayList<>(); - StepListener indexDocLevelMonitorStep = new StepListener(); - - // Indexing monitors in two steps in order to prevent all shards failed error from alerting - // https://github.com/opensearch-project/alerting/issues/646 - AlertingPluginInterface.INSTANCE.indexMonitor((NodeClient) client, monitorRequests.get(0), namedWriteableRegistry, indexDocLevelMonitorStep); - indexDocLevelMonitorStep.whenComplete(addedFirstMonitorResponse -> { - monitorResponses.add(addedFirstMonitorResponse); - saveWorkflow(rulesById, detector, monitorResponses, refreshPolicy, listener); - }, - e -> { - listener.onFailure(e); - } - ); - } - } catch (Exception ex) { - onFailure(ex); - } + int numberOfUnprocessedResponses = monitorRequests.size() - 1; + if (numberOfUnprocessedResponses == 0) { + saveWorkflow(rulesById, detector, monitorResponses, refreshPolicy, listener); + } else { + // Saves the rest of the monitors and saves the workflow if supported + saveMonitors( + monitorRequests, + monitorResponses, + numberOfUnprocessedResponses, + indexMonitorsStep + ); + } + }, + e1 -> { + log.error("Failed to index doc level monitor in detector creation", e1); + listener.onFailure(e1); + } + ); + }, listener::onFailure); + } else { + // Failure if detector doesn't have any monitor + if (monitorRequests.isEmpty()) { + listener.onFailure(new OpenSearchStatusException("Detector cannot be created as no compatible rules were provided", RestStatus.BAD_REQUEST)); + return; } - @Override - public void onFailure(Exception e) { - // not failing detector creation if any fatal exception occurs during doc level query creation from threat intel feed data - log.error("Failed to convert threat intel feed to. Proceeding with detector creation", e); - listener.onFailure(e); - } - }); + List monitorResponses = new ArrayList<>(); + StepListener indexDocLevelMonitorStep = new StepListener(); + + // Indexing monitors in two steps in order to prevent all shards failed error from alerting + // https://github.com/opensearch-project/alerting/issues/646 + AlertingPluginInterface.INSTANCE.indexMonitor((NodeClient) client, monitorRequests.get(0), namedWriteableRegistry, indexDocLevelMonitorStep); + indexDocLevelMonitorStep.whenComplete(addedFirstMonitorResponse -> { + monitorResponses.add(addedFirstMonitorResponse); + saveWorkflow(rulesById, detector, monitorResponses, refreshPolicy, listener); + }, + e -> { + listener.onFailure(e); + } + ); + } } private void saveMonitors( @@ -421,104 +405,93 @@ private void updateMonitorFromQueries(String index, List> rul List> bucketLevelRules = rulesById.stream().filter(it -> it.getRight().isAggregationRule()).collect( Collectors.toList()); + List monitorsToBeAdded = new ArrayList<>(); + // Process bucket level monitors + if (!bucketLevelRules.isEmpty()) { + logTypeService.getRuleFieldMappings(new ActionListener<>() { + @Override + public void onResponse(Map> ruleFieldMappings) { + try { + List ruleCategories = bucketLevelRules.stream().map(Pair::getRight).map(Rule::getCategory).distinct().collect( + Collectors.toList()); + Map queryBackendMap = new HashMap<>(); + for (String category : ruleCategories) { + Map fieldMappings = ruleFieldMappings.get(category); + queryBackendMap.put(category, new OSQueryBackend(fieldMappings, true, true)); + } - addThreatIntelBasedDocLevelQueries(detector, new ActionListener<>() { - @Override - public void onResponse(List docLevelQueries) { - List monitorsToBeAdded = new ArrayList<>(); - // Process bucket level monitors - if (!bucketLevelRules.isEmpty()) { - logTypeService.getRuleFieldMappings(new ActionListener<>() { - @Override - public void onResponse(Map> ruleFieldMappings) { - try { - List ruleCategories = bucketLevelRules.stream().map(Pair::getRight).map(Rule::getCategory).distinct().collect( - Collectors.toList()); - Map queryBackendMap = new HashMap<>(); - for (String category : ruleCategories) { - Map fieldMappings = ruleFieldMappings.get(category); - queryBackendMap.put(category, new OSQueryBackend(fieldMappings, true, true)); - } - - // Pair of RuleId - MonitorId for existing monitors of the detector - Map monitorPerRule = detector.getRuleIdMonitorIdMap(); - - for (Pair query : bucketLevelRules) { - Rule rule = query.getRight(); - if (rule.getAggregationQueries() != null) { - // Detect if the monitor should be added or updated - if (monitorPerRule.containsKey(rule.getId())) { - String monitorId = monitorPerRule.get(rule.getId()); - monitorsToBeUpdated.add(createBucketLevelMonitorRequest(query.getRight(), - detector, - refreshPolicy, - monitorId, - Method.PUT, - queryBackendMap.get(rule.getCategory()))); - } else { - monitorsToBeAdded.add(createBucketLevelMonitorRequest(query.getRight(), - detector, - refreshPolicy, - Monitor.NO_ID, - Method.POST, - queryBackendMap.get(rule.getCategory()))); - } - } - } - - List> docLevelRules = rulesById.stream().filter(it -> !it.getRight().isAggregationRule()).collect( - Collectors.toList()); - - // Process doc level monitors - if (!docLevelRules.isEmpty() || detector.getThreatIntelEnabled()) { - if (detector.getDocLevelMonitorId() == null) { - monitorsToBeAdded.add(createDocLevelMonitorRequest(docLevelRules, docLevelQueries != null? docLevelQueries: List.of(), detector, refreshPolicy, Monitor.NO_ID, Method.POST)); - } else { - monitorsToBeUpdated.add(createDocLevelMonitorRequest(docLevelRules, docLevelQueries != null? docLevelQueries: List.of(), detector, refreshPolicy, detector.getDocLevelMonitorId(), Method.PUT)); - } + // Pair of RuleId - MonitorId for existing monitors of the detector + Map monitorPerRule = detector.getRuleIdMonitorIdMap(); + + for (Pair query : bucketLevelRules) { + Rule rule = query.getRight(); + if (rule.getAggregationQueries() != null) { + // Detect if the monitor should be added or updated + if (monitorPerRule.containsKey(rule.getId())) { + String monitorId = monitorPerRule.get(rule.getId()); + monitorsToBeUpdated.add(createBucketLevelMonitorRequest(query.getRight(), + detector, + refreshPolicy, + monitorId, + Method.PUT, + queryBackendMap.get(rule.getCategory()))); + } else { + monitorsToBeAdded.add(createBucketLevelMonitorRequest(query.getRight(), + detector, + refreshPolicy, + Monitor.NO_ID, + Method.POST, + queryBackendMap.get(rule.getCategory()))); } + } + } - List monitorIdsToBeDeleted = detector.getRuleIdMonitorIdMap().values().stream().collect(Collectors.toList()); - monitorIdsToBeDeleted.removeAll(monitorsToBeUpdated.stream().map(IndexMonitorRequest::getMonitorId).collect( - Collectors.toList())); + List> docLevelRules = rulesById.stream().filter(it -> !it.getRight().isAggregationRule()).collect( + Collectors.toList()); - updateAlertingMonitors(rulesById, detector, monitorsToBeAdded, monitorsToBeUpdated, monitorIdsToBeDeleted, refreshPolicy, listener); - } catch (Exception ex) { - listener.onFailure(ex); + // Process doc level monitors + if (!docLevelRules.isEmpty() || detector.getThreatIntelEnabled()) { + if (detector.getDocLevelMonitorId() == null) { + monitorsToBeAdded.add(createDocLevelMonitorRequest(docLevelRules, detector, refreshPolicy, Monitor.NO_ID, Method.POST)); + } else { + monitorsToBeUpdated.add(createDocLevelMonitorRequest(docLevelRules, detector, refreshPolicy, detector.getDocLevelMonitorId(), Method.PUT)); } } - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); - } else { - List> docLevelRules = rulesById.stream().filter(it -> !it.getRight().isAggregationRule()).collect( - Collectors.toList()); + List monitorIdsToBeDeleted = detector.getRuleIdMonitorIdMap().values().stream().collect(Collectors.toList()); + monitorIdsToBeDeleted.removeAll(monitorsToBeUpdated.stream().map(IndexMonitorRequest::getMonitorId).collect( + Collectors.toList())); - // Process doc level monitors - if (!docLevelRules.isEmpty() || detector.getThreatIntelEnabled()) { - if (detector.getDocLevelMonitorId() == null) { - monitorsToBeAdded.add(createDocLevelMonitorRequest(docLevelRules, docLevelQueries != null? docLevelQueries: List.of(), detector, refreshPolicy, Monitor.NO_ID, Method.POST)); - } else { - monitorsToBeUpdated.add(createDocLevelMonitorRequest(docLevelRules, docLevelQueries != null? docLevelQueries: List.of(), detector, refreshPolicy, detector.getDocLevelMonitorId(), Method.PUT)); - } + updateAlertingMonitors(rulesById, detector, monitorsToBeAdded, monitorsToBeUpdated, monitorIdsToBeDeleted, refreshPolicy, listener); + } catch (Exception ex) { + listener.onFailure(ex); } + } - List monitorIdsToBeDeleted = detector.getRuleIdMonitorIdMap().values().stream().collect(Collectors.toList()); - monitorIdsToBeDeleted.removeAll(monitorsToBeUpdated.stream().map(IndexMonitorRequest::getMonitorId).collect( - Collectors.toList())); + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }); + } else { + List> docLevelRules = rulesById.stream().filter(it -> !it.getRight().isAggregationRule()).collect( + Collectors.toList()); - updateAlertingMonitors(rulesById, detector, monitorsToBeAdded, monitorsToBeUpdated, monitorIdsToBeDeleted, refreshPolicy, listener); + // Process doc level monitors + if (!docLevelRules.isEmpty() || detector.getThreatIntelEnabled()) { + if (detector.getDocLevelMonitorId() == null) { + monitorsToBeAdded.add(createDocLevelMonitorRequest(docLevelRules, detector, refreshPolicy, Monitor.NO_ID, Method.POST)); + } else { + monitorsToBeUpdated.add(createDocLevelMonitorRequest(docLevelRules, detector, refreshPolicy, detector.getDocLevelMonitorId(), Method.PUT)); } } - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + List monitorIdsToBeDeleted = detector.getRuleIdMonitorIdMap().values().stream().collect(Collectors.toList()); + monitorIdsToBeDeleted.removeAll(monitorsToBeUpdated.stream().map(IndexMonitorRequest::getMonitorId).collect( + Collectors.toList())); + + updateAlertingMonitors(rulesById, detector, monitorsToBeAdded, monitorsToBeUpdated, monitorIdsToBeDeleted, refreshPolicy, listener); + } } /** @@ -663,7 +636,7 @@ public void onFailure(Exception e) { } } - private IndexMonitorRequest createDocLevelMonitorRequest(List> queries, List threatIntelQueries, Detector detector, WriteRequest.RefreshPolicy refreshPolicy, String monitorId, RestRequest.Method restMethod) { + private IndexMonitorRequest createDocLevelMonitorRequest(List> queries, Detector detector, WriteRequest.RefreshPolicy refreshPolicy, String monitorId, RestRequest.Method restMethod) { List docLevelMonitorInputs = new ArrayList<>(); List docLevelQueries = new ArrayList<>(); @@ -684,7 +657,7 @@ private IndexMonitorRequest createDocLevelMonitorRequest(List DocLevelQuery docLevelQuery = new DocLevelQuery(id, name, Collections.emptyList(), actualQuery, tags); docLevelQueries.add(docLevelQuery); } - docLevelQueries.addAll(threatIntelQueries); + addThreatIntelBasedDocLevelQueries(detector, docLevelQueries); DocLevelMonitorInput docLevelMonitorInput = new DocLevelMonitorInput(detector.getName(), detector.getInputs().get(0).getIndices(), docLevelQueries); docLevelMonitorInputs.add(docLevelMonitorInput); @@ -715,24 +688,37 @@ private IndexMonitorRequest createDocLevelMonitorRequest(List return new IndexMonitorRequest(monitorId, SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM, refreshPolicy, restMethod, monitor, null); } - private void addThreatIntelBasedDocLevelQueries(Detector detector, ActionListener> listener) { + private void addThreatIntelBasedDocLevelQueries(Detector detector, List docLevelQueries) { try { if (detector.getThreatIntelEnabled()) { log.debug("threat intel enabled for detector {} . adding threat intel based doc level queries.", detector.getName()); List iocFieldsList = logTypeService.getIocFieldsList(detector.getDetectorType()); if (iocFieldsList == null || iocFieldsList.isEmpty()) { - listener.onResponse(List.of()); + } else { - detectorThreatIntelService.createDocLevelQueryFromThreatIntel(iocFieldsList, detector, listener); + CountDownLatch countDownLatch = new CountDownLatch(1); + detectorThreatIntelService.createDocLevelQueryFromThreatIntel(iocFieldsList, detector, new ActionListener<>() { + @Override + public void onResponse(List dlqs) { + if (dlqs != null) + docLevelQueries.addAll(dlqs); + countDownLatch.countDown(); + } + + @Override + public void onFailure(Exception e) { + // not failing detector creation if any fatal exception occurs during doc level query creation from threat intel feed data + log.error("Failed to convert threat intel feed to. Proceeding with detector creation", e); + countDownLatch.countDown(); + } + }); + countDownLatch.await(); } - } else { - listener.onResponse(List.of()); } } catch (Exception e) { // not failing detector creation if any fatal exception occurs during doc level query creation from threat intel feed data log.error("Failed to convert threat intel feed to doc level query. Proceeding with detector creation", e); - listener.onFailure(e); } } diff --git a/src/test/java/org/opensearch/securityanalytics/threatIntel/action/TransportPutTIFJobActionTests.java b/src/test/java/org/opensearch/securityanalytics/threatIntel/action/TransportPutTIFJobActionTests.java index 27a01f5c0..68dcbf527 100644 --- a/src/test/java/org/opensearch/securityanalytics/threatIntel/action/TransportPutTIFJobActionTests.java +++ b/src/test/java/org/opensearch/securityanalytics/threatIntel/action/TransportPutTIFJobActionTests.java @@ -24,7 +24,7 @@ import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.*; -/*public class TransportPutTIFJobActionTests extends ThreatIntelTestCase { +public class TransportPutTIFJobActionTests extends ThreatIntelTestCase { private TransportPutTIFJobAction action; @Before @@ -158,4 +158,4 @@ public void testCreateTIFJobParameter_whenValidInput_thenUpdateStateAsCreating() verify(tifJobUpdateService).createThreatIntelFeedData(tifJob, renewLock); assertEquals(TIFJobState.CREATING, tifJob.getState()); } -}*/ +} diff --git a/src/test/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobParameterServiceTests.java b/src/test/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobParameterServiceTests.java index c8d004d03..6e3b83a78 100644 --- a/src/test/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobParameterServiceTests.java +++ b/src/test/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobParameterServiceTests.java @@ -34,7 +34,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -/*public class TIFJobParameterServiceTests extends ThreatIntelTestCase { +public class TIFJobParameterServiceTests extends ThreatIntelTestCase { private TIFJobParameterService tifJobParameterService; @Before @@ -204,4 +204,4 @@ private BytesReference toBytesReference(TIFJobParameter tifJobParameter) { } } -}*/ +} diff --git a/src/test/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobRunnerTests.java b/src/test/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobRunnerTests.java index 71bd68c61..82038a91f 100644 --- a/src/test/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobRunnerTests.java +++ b/src/test/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobRunnerTests.java @@ -24,7 +24,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; -/*public class TIFJobRunnerTests extends ThreatIntelTestCase { +public class TIFJobRunnerTests extends ThreatIntelTestCase { @Before public void init() { TIFJobRunner.getJobRunnerInstance() @@ -164,5 +164,5 @@ public void testUpdateTIFJobExceptionHandling() throws IOException { assertNotNull(tifJob.getUpdateStats().getLastFailedAt()); verify(tifJobParameterService).updateJobSchedulerParameter(tifJob); } -}*/ +} diff --git a/src/test/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobUpdateServiceTests.java b/src/test/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobUpdateServiceTests.java index 218793787..76b0f8fe4 100644 --- a/src/test/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobUpdateServiceTests.java +++ b/src/test/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobUpdateServiceTests.java @@ -20,7 +20,7 @@ import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.*; -/*@SuppressForbidden(reason = "unit test") +@SuppressForbidden(reason = "unit test") public class TIFJobUpdateServiceTests extends ThreatIntelTestCase { private TIFJobUpdateService tifJobUpdateService1; @@ -49,4 +49,4 @@ public void testUpdateOrCreateThreatIntelFeedData_whenValidInput_thenSucceed() t assertNotNull(newFeeds); } -}*/ +}