From 4fd974c332d8167108204bd00c767b2069ed2eb5 Mon Sep 17 00:00:00 2001 From: Surya Sashank Nistala Date: Fri, 24 May 2024 16:14:41 -0700 Subject: [PATCH] ioc scan business logic --- .../SecurityAnalyticsPlugin.java | 19 +- .../logtype/BuiltinLogTypeLoader.java | 6 +- .../IocMatch.java} | 16 +- .../monitor/IocScanMonitorFanOutAction.java | 19 - .../IndexThreatIntelMonitorRequest.java | 12 +- .../iocscan/dao/IocMatchService.java | 155 ++++++++ .../iocscan/dto/IocScanContext.java | 55 +++ .../iocscan/service/IoCScanService.java | 213 +++++++++++ .../service/IoCScanServiceInterface.java | 16 + .../iocscan/service/SaIoCScanService.java | 268 +++++++++++++ ...ner.java => ThreatIntelMonitorRunner.java} | 16 +- ...portRemoteDocLevelMonitorFanOutAction.java | 97 ----- ...ansportThreatIntelMonitorFanOutAction.java | 361 ++++++++++++++++++ .../service/SATIFSourceConfigService.java | 166 ++++---- ...ransportIndexThreatIntelMonitorAction.java | 22 +- .../util/ThreatIntelMonitorUtils.java | 16 +- .../transport/TransportListIOCsAction.java | 4 +- .../SecurityAnalyticsIntegTestCase.java | 23 ++ .../securityanalytics/TestHelpers.java | 12 +- ...{IoCMatchTests.java => IocMatchTests.java} | 15 +- .../ThreatIntelMonitorRestApiIT.java | 12 +- .../iocscan/dao/IocMatchServiceIT.java | 67 ++++ .../model/monitor/ThreatIntelInputTests.java | 2 +- 23 files changed, 1307 insertions(+), 285 deletions(-) rename src/main/java/org/opensearch/securityanalytics/model/{IoCMatch.java => threatintel/IocMatch.java} (95%) delete mode 100644 src/main/java/org/opensearch/securityanalytics/threatIntel/action/monitor/IocScanMonitorFanOutAction.java create mode 100644 src/main/java/org/opensearch/securityanalytics/threatIntel/iocscan/dao/IocMatchService.java create mode 100644 src/main/java/org/opensearch/securityanalytics/threatIntel/iocscan/dto/IocScanContext.java create mode 100644 src/main/java/org/opensearch/securityanalytics/threatIntel/iocscan/service/IoCScanService.java create mode 100644 src/main/java/org/opensearch/securityanalytics/threatIntel/iocscan/service/IoCScanServiceInterface.java create mode 100644 src/main/java/org/opensearch/securityanalytics/threatIntel/iocscan/service/SaIoCScanService.java rename src/main/java/org/opensearch/securityanalytics/threatIntel/model/monitor/{SampleRemoteDocLevelMonitorRunner.java => ThreatIntelMonitorRunner.java} (60%) delete mode 100644 src/main/java/org/opensearch/securityanalytics/threatIntel/model/monitor/TransportRemoteDocLevelMonitorFanOutAction.java create mode 100644 src/main/java/org/opensearch/securityanalytics/threatIntel/model/monitor/TransportThreatIntelMonitorFanOutAction.java create mode 100644 src/test/java/org/opensearch/securityanalytics/SecurityAnalyticsIntegTestCase.java rename src/test/java/org/opensearch/securityanalytics/model/{IoCMatchTests.java => IocMatchTests.java} (89%) create mode 100644 src/test/java/org/opensearch/securityanalytics/threatIntel/iocscan/dao/IocMatchServiceIT.java diff --git a/src/main/java/org/opensearch/securityanalytics/SecurityAnalyticsPlugin.java b/src/main/java/org/opensearch/securityanalytics/SecurityAnalyticsPlugin.java index a3164bb7c..3091db490 100644 --- a/src/main/java/org/opensearch/securityanalytics/SecurityAnalyticsPlugin.java +++ b/src/main/java/org/opensearch/securityanalytics/SecurityAnalyticsPlugin.java @@ -125,12 +125,13 @@ import org.opensearch.securityanalytics.threatIntel.action.monitor.SearchThreatIntelMonitorAction; import org.opensearch.securityanalytics.threatIntel.common.TIFLockService; import org.opensearch.securityanalytics.threatIntel.feedMetadata.BuiltInTIFMetadataLoader; +import org.opensearch.securityanalytics.threatIntel.iocscan.service.SaIoCScanService; import org.opensearch.securityanalytics.threatIntel.jobscheduler.TIFJobRunner; import org.opensearch.securityanalytics.threatIntel.jobscheduler.TIFSourceConfigRunner; import org.opensearch.securityanalytics.threatIntel.model.SATIFSourceConfig; +import org.opensearch.securityanalytics.threatIntel.model.monitor.ThreatIntelMonitorRunner; +import org.opensearch.securityanalytics.threatIntel.model.monitor.TransportThreatIntelMonitorFanOutAction; import org.opensearch.securityanalytics.threatIntel.resthandler.RestDeleteTIFSourceConfigAction; -import org.opensearch.securityanalytics.threatIntel.model.monitor.SampleRemoteDocLevelMonitorRunner; -import org.opensearch.securityanalytics.threatIntel.model.monitor.TransportRemoteDocLevelMonitorFanOutAction; import org.opensearch.securityanalytics.threatIntel.resthandler.RestGetTIFSourceConfigAction; import org.opensearch.securityanalytics.threatIntel.resthandler.RestIndexTIFSourceConfigAction; import org.opensearch.securityanalytics.threatIntel.resthandler.RestSearchTIFSourceConfigsAction; @@ -196,7 +197,7 @@ import static org.opensearch.securityanalytics.threatIntel.model.SATIFSourceConfig.FEED_SOURCE_CONFIG_FIELD; import static org.opensearch.securityanalytics.threatIntel.model.TIFJobParameter.THREAT_INTEL_DATA_INDEX_NAME_PREFIX; -import static org.opensearch.securityanalytics.threatIntel.model.monitor.SampleRemoteDocLevelMonitorRunner.THREAT_INTEL_MONITOR_TYPE; +import static org.opensearch.securityanalytics.threatIntel.model.monitor.ThreatIntelMonitorRunner.THREAT_INTEL_MONITOR_TYPE; public class SecurityAnalyticsPlugin extends Plugin implements ActionPlugin, MapperPlugin, SearchPlugin, EnginePlugin, ClusterPlugin, SystemIndexPlugin, JobSchedulerExtension, RemoteMonitorRunnerExtension { @@ -248,12 +249,11 @@ public class SecurityAnalyticsPlugin extends Plugin implements ActionPlugin, Map private SATIFSourceConfigService saTifSourceConfigService; @Override - public Collection getSystemIndexDescriptors(Settings settings){ + public Collection getSystemIndexDescriptors(Settings settings) { return Collections.singletonList(new SystemIndexDescriptor(THREAT_INTEL_DATA_INDEX_NAME_PREFIX, "System index used for threat intel data")); } - @Override public Collection createComponents(Client client, ClusterService clusterService, @@ -289,11 +289,12 @@ public Collection createComponents(Client client, SecurityAnalyticsRunner.getJobRunnerInstance(); TIFSourceConfigRunner.getJobRunnerInstance().initialize(clusterService, threatIntelLockService, threadPool, saTifSourceConfigManagementService, saTifSourceConfigService); TIFJobRunner.getJobRunnerInstance().initialize(clusterService, tifJobUpdateService, tifJobParameterService, threatIntelLockService, threadPool, detectorThreatIntelService); - + SaIoCScanService ioCScanService = new SaIoCScanService(client, xContentRegistry); return List.of( detectorIndices, correlationIndices, correlationRuleIndices, ruleTopicIndices, customLogTypeIndices, ruleIndices, mapperService, indexTemplateManager, builtinLogTypeLoader, builtInTIFMetadataLoader, threatIntelFeedDataService, detectorThreatIntelService, - tifJobUpdateService, tifJobParameterService, threatIntelLockService, saTifSourceConfigService, saTifSourceConfigManagementService, stix2IOCFetchService); + tifJobUpdateService, tifJobParameterService, threatIntelLockService, saTifSourceConfigService, saTifSourceConfigManagementService, stix2IOCFetchService, + ioCScanService); } @Override @@ -484,7 +485,7 @@ public List> getSettings() { new ActionHandler<>(SAGetTIFSourceConfigAction.INSTANCE, TransportGetTIFSourceConfigAction.class), new ActionHandler<>(SADeleteTIFSourceConfigAction.INSTANCE, TransportDeleteTIFSourceConfigAction.class), new ActionHandler<>(SASearchTIFSourceConfigsAction.INSTANCE, TransportSearchTIFSourceConfigsAction.class), - new ActionHandler<>(SampleRemoteDocLevelMonitorRunner.REMOTE_DOC_LEVEL_MONITOR_ACTION_INSTANCE, TransportRemoteDocLevelMonitorFanOutAction.class), + new ActionHandler<>(ThreatIntelMonitorRunner.REMOTE_DOC_LEVEL_MONITOR_ACTION_INSTANCE, TransportThreatIntelMonitorFanOutAction.class), new ActionHandler<>(ListIOCsAction.INSTANCE, TransportListIOCsAction.class) ); } @@ -509,7 +510,7 @@ public void onFailure(Exception e) { @Override public Map getMonitorTypesToMonitorRunners() { return Map.of( - THREAT_INTEL_MONITOR_TYPE, SampleRemoteDocLevelMonitorRunner.getMonitorRunner() + THREAT_INTEL_MONITOR_TYPE, ThreatIntelMonitorRunner.getMonitorRunner() ); } } \ No newline at end of file diff --git a/src/main/java/org/opensearch/securityanalytics/logtype/BuiltinLogTypeLoader.java b/src/main/java/org/opensearch/securityanalytics/logtype/BuiltinLogTypeLoader.java index 0d28bce4d..20cc34ffd 100644 --- a/src/main/java/org/opensearch/securityanalytics/logtype/BuiltinLogTypeLoader.java +++ b/src/main/java/org/opensearch/securityanalytics/logtype/BuiltinLogTypeLoader.java @@ -10,6 +10,7 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; +import java.nio.file.Paths; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -69,8 +70,11 @@ public void ensureLogTypesLoaded() { private List loadBuiltinLogTypes() throws URISyntaxException, IOException { List logTypes = new ArrayList<>(); - final String url = Objects.requireNonNull(BuiltinLogTypeLoader.class.getClassLoader().getResource(BASE_PATH)).toURI().toString(); + String pathurl = Paths.get(BuiltinLogTypeLoader.class.getClassLoader().getResource(BASE_PATH).toURI()).toString(); + final String url = Objects.requireNonNull(BuiltinLogTypeLoader.class.getClassLoader().getResource(BASE_PATH)).toURI().toString(); + logger.error("SASHANK Path url is {}", pathurl); + logger.error("SASHANK currently used url is {}", url); Path dirPath = null; if (url.contains("!")) { final String[] paths = url.split("!"); diff --git a/src/main/java/org/opensearch/securityanalytics/model/IoCMatch.java b/src/main/java/org/opensearch/securityanalytics/model/threatintel/IocMatch.java similarity index 95% rename from src/main/java/org/opensearch/securityanalytics/model/IoCMatch.java rename to src/main/java/org/opensearch/securityanalytics/model/threatintel/IocMatch.java index 04f54699f..037541741 100644 --- a/src/main/java/org/opensearch/securityanalytics/model/IoCMatch.java +++ b/src/main/java/org/opensearch/securityanalytics/model/threatintel/IocMatch.java @@ -1,4 +1,4 @@ -package org.opensearch.securityanalytics.model; +package org.opensearch.securityanalytics.model.threatintel; import org.apache.commons.lang3.StringUtils; import org.opensearch.core.common.io.stream.StreamInput; @@ -20,7 +20,7 @@ * IoC Match provides mapping of the IoC Value to the list of docs that contain the ioc in a given execution of IoC_Scan_job * It's the inverse of an IoC finding which maps a document to list of IoC's */ -public class IoCMatch implements Writeable, ToXContent { +public class IocMatch implements Writeable, ToXContent { //TODO implement IoC_Match interface from security-analytics-commons public static final String ID_FIELD = "id"; public static final String RELATED_DOC_IDS_FIELD = "related_doc_ids"; @@ -42,7 +42,7 @@ public class IoCMatch implements Writeable, ToXContent { private final Instant timestamp; private final String executionId; - public IoCMatch(String id, List relatedDocIds, List feedIds, String iocScanJobId, + public IocMatch(String id, List relatedDocIds, List feedIds, String iocScanJobId, String iocScanJobName, String iocValue, String iocType, Instant timestamp, String executionId) { validateIoCMatch(id, iocScanJobId, iocScanJobName, iocValue, timestamp, executionId, relatedDocIds); this.id = id; @@ -56,7 +56,7 @@ public IoCMatch(String id, List relatedDocIds, List feedIds, Str this.executionId = executionId; } - public IoCMatch(StreamInput in) throws IOException { + public IocMatch(StreamInput in) throws IOException { id = in.readString(); relatedDocIds = in.readStringList(); feedIds = in.readStringList(); @@ -133,7 +133,7 @@ public String getExecutionId() { return executionId; } - public static IoCMatch parse(XContentParser xcp) throws IOException { + public static IocMatch parse(XContentParser xcp) throws IOException { String id = null; List relatedDocIds = new ArrayList<>(); List feedIds = new ArrayList<>(); @@ -197,11 +197,11 @@ public static IoCMatch parse(XContentParser xcp) throws IOException { } } - return new IoCMatch(id, relatedDocIds, feedIds, iocScanJobId, iocScanName, iocValue, iocType, timestamp, executionId); + return new IocMatch(id, relatedDocIds, feedIds, iocScanJobId, iocScanName, iocValue, iocType, timestamp, executionId); } - public static IoCMatch readFrom(StreamInput in) throws IOException { - return new IoCMatch(in); + public static IocMatch readFrom(StreamInput in) throws IOException { + return new IocMatch(in); } diff --git a/src/main/java/org/opensearch/securityanalytics/threatIntel/action/monitor/IocScanMonitorFanOutAction.java b/src/main/java/org/opensearch/securityanalytics/threatIntel/action/monitor/IocScanMonitorFanOutAction.java deleted file mode 100644 index eb3665992..000000000 --- a/src/main/java/org/opensearch/securityanalytics/threatIntel/action/monitor/IocScanMonitorFanOutAction.java +++ /dev/null @@ -1,19 +0,0 @@ -package org.opensearch.securityanalytics.threatIntel.action.monitor; - -import org.opensearch.action.ActionType; -import org.opensearch.commons.alerting.action.DocLevelMonitorFanOutResponse; -import org.opensearch.core.common.io.stream.Writeable; - -/** - * Ioc Scan Monitor fan out action that distributes the monitor runner logic to mutliple data node. - */ -public class IocScanMonitorFanOutAction extends ActionType { - /** - * @param name The name of the action, must be unique across actions. - * @param docLevelMonitorFanOutResponseReader A reader for the response type - */ - public IocScanMonitorFanOutAction(String name, Writeable.Reader docLevelMonitorFanOutResponseReader) { - super(name, docLevelMonitorFanOutResponseReader); - } - -} \ No newline at end of file diff --git a/src/main/java/org/opensearch/securityanalytics/threatIntel/action/monitor/request/IndexThreatIntelMonitorRequest.java b/src/main/java/org/opensearch/securityanalytics/threatIntel/action/monitor/request/IndexThreatIntelMonitorRequest.java index 64d4a433b..7f7205c5f 100644 --- a/src/main/java/org/opensearch/securityanalytics/threatIntel/action/monitor/request/IndexThreatIntelMonitorRequest.java +++ b/src/main/java/org/opensearch/securityanalytics/threatIntel/action/monitor/request/IndexThreatIntelMonitorRequest.java @@ -16,13 +16,13 @@ public class IndexThreatIntelMonitorRequest extends ActionRequest implements Ind private final String id; private final RestRequest.Method method; - private final ThreatIntelMonitorDto threatIntelMonitor; + private final ThreatIntelMonitorDto monitor; - public IndexThreatIntelMonitorRequest(String id, RestRequest.Method method, ThreatIntelMonitorDto threatIntelMonitor) { + public IndexThreatIntelMonitorRequest(String id, RestRequest.Method method, ThreatIntelMonitorDto monitor) { super(); this.id = id; this.method = method; - this.threatIntelMonitor = threatIntelMonitor; + this.monitor = monitor; } public IndexThreatIntelMonitorRequest(StreamInput sin) throws IOException { @@ -37,7 +37,7 @@ public IndexThreatIntelMonitorRequest(StreamInput sin) throws IOException { public void writeTo(StreamOutput out) throws IOException { out.writeString(id); out.writeEnum(method); - threatIntelMonitor.writeTo(out); + monitor.writeTo(out); } @Override @@ -53,7 +53,7 @@ public RestRequest.Method getMethod() { return method; } - public ThreatIntelMonitorDto getThreatIntelMonitor() { - return threatIntelMonitor; + public ThreatIntelMonitorDto getMonitor() { + return monitor; } } diff --git a/src/main/java/org/opensearch/securityanalytics/threatIntel/iocscan/dao/IocMatchService.java b/src/main/java/org/opensearch/securityanalytics/threatIntel/iocscan/dao/IocMatchService.java new file mode 100644 index 000000000..9bf92a985 --- /dev/null +++ b/src/main/java/org/opensearch/securityanalytics/threatIntel/iocscan/dao/IocMatchService.java @@ -0,0 +1,155 @@ +package org.opensearch.securityanalytics.threatIntel.iocscan.dao; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.ResourceAlreadyExistsException; +import org.opensearch.action.DocWriteRequest; +import org.opensearch.action.admin.indices.create.CreateIndexRequest; +import org.opensearch.action.bulk.BulkRequest; +import org.opensearch.action.bulk.BulkResponse; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.action.support.GroupedActionListener; +import org.opensearch.action.support.WriteRequest; +import org.opensearch.client.Client; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.securityanalytics.SecurityAnalyticsPlugin; +import org.opensearch.securityanalytics.model.threatintel.IocMatch; +import org.opensearch.securityanalytics.settings.SecurityAnalyticsSettings; +import org.opensearch.securityanalytics.threatIntel.common.StashedThreadContext; +import org.opensearch.securityanalytics.util.SecurityAnalyticsException; +import org.opensearch.threadpool.ThreadPool; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Data layer to perform CRUD operations for threat intel ioc match : store in system index. + */ +public class IocMatchService { + //TODO manage index rollover + public static final String INDEX_NAME = ".opensearch-sap-iocmatch"; + private static final Logger log = LogManager.getLogger(IocMatchService.class); + private final Client client; + private final ClusterService clusterService; + + public IocMatchService(final Client client, final ClusterService clusterService) { + this.client = client; + this.clusterService = clusterService; + } + + public void indexIocMatches(List iocMatches, + final ActionListener actionListener) { + try { + Integer batchSize = this.clusterService.getClusterSettings().get(SecurityAnalyticsSettings.BATCH_SIZE); + createIndexIfNotExists(ActionListener.wrap( + r -> { + List bulkRequestList = new ArrayList<>(); + BulkRequest bulkRequest = new BulkRequest(INDEX_NAME); + for (int i = 0; i < iocMatches.size(); i++) { + IocMatch iocMatch = iocMatches.get(i); + try { + IndexRequest indexRequest = new IndexRequest(INDEX_NAME) + .source(iocMatch.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) + .opType(DocWriteRequest.OpType.CREATE); + bulkRequest.add(indexRequest); + if ( + bulkRequest.requests().size() == batchSize + && i != iocMatches.size() - 1 // final bulk request will be added outside for loop with refresh policy none + ) { + bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.NONE); + bulkRequestList.add(bulkRequest); + bulkRequest = new BulkRequest(); + } + } catch (IOException e) { + log.error(String.format("Failed to create index request for ioc match %s moving on to next"), e); + } + } + bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + bulkRequestList.add(bulkRequest); + GroupedActionListener groupedListener = new GroupedActionListener<>(ActionListener.wrap(bulkResponses -> { + int idx = 0; + for (BulkResponse response : bulkResponses) { + BulkRequest request = bulkRequestList.get(idx); + if (response.hasFailures()) { + log.error("Failed to bulk index {} Ioc Matches. Failure: {}", request.batchSize(), response.buildFailureMessage()); + } + } + actionListener.onResponse(null); + }, actionListener::onFailure), bulkRequestList.size()); + for (BulkRequest req : bulkRequestList) { + try { + client.bulk(req, groupedListener); //todo why stash context here? + } catch (Exception e) { + log.error("Failed to save ioc matches.", e); + } + } + }, e -> { + log.error("Failed to create System Index"); + actionListener.onFailure(e); + })); + + + } catch (Exception e) { + log.error("Exception saving the threat intel source config in index", e); + actionListener.onFailure(e); + } + } + + private String getIndexMapping() { + try { + try (InputStream is = IocMatchService.class.getResourceAsStream("/mappings/ioc_match_mapping.json")) { + try (BufferedReader reader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) { + return reader.lines().map(String::trim).collect(Collectors.joining()); + } + } + } catch (IOException e) { + log.error("Failed to get the threat intel ioc match index mapping", e); + throw new SecurityAnalyticsException("Failed to get the threat intel ioc match index mapping", RestStatus.INTERNAL_SERVER_ERROR, e); + } + } + + /** + * Index name: .opensearch-sap-iocmatch + * Mapping: /mappings/ioc_match_mapping.json + * + * @param listener setup listener + */ + public void createIndexIfNotExists(final ActionListener listener) { + // check if job index exists + try { + if (clusterService.state().metadata().hasIndex(INDEX_NAME) == true) { + listener.onResponse(null); + return; + } + final CreateIndexRequest createIndexRequest = new CreateIndexRequest(INDEX_NAME).mapping(getIndexMapping()) + .settings(SecurityAnalyticsPlugin.TIF_JOB_INDEX_SETTING); + client.admin().indices().create(createIndexRequest, ActionListener.wrap( + r -> { + log.debug("Ioc match index created"); + listener.onResponse(null); + }, e -> { + if (e instanceof ResourceAlreadyExistsException) { + log.debug("index {} already exist", INDEX_NAME); + listener.onResponse(null); + return; + } + log.error("Failed to create security analytics threat intel job index", e); + listener.onFailure(e); + } + )); + } catch (Exception e) { + log.error("Failure in creating ioc_match index", e); + listener.onFailure(e); + } + } +} \ No newline at end of file diff --git a/src/main/java/org/opensearch/securityanalytics/threatIntel/iocscan/dto/IocScanContext.java b/src/main/java/org/opensearch/securityanalytics/threatIntel/iocscan/dto/IocScanContext.java new file mode 100644 index 000000000..18aeca3f5 --- /dev/null +++ b/src/main/java/org/opensearch/securityanalytics/threatIntel/iocscan/dto/IocScanContext.java @@ -0,0 +1,55 @@ +package org.opensearch.securityanalytics.threatIntel.iocscan.dto; + +import org.opensearch.commons.alerting.model.Monitor; +import org.opensearch.commons.alerting.model.MonitorMetadata; +import org.opensearch.securityanalytics.threatIntel.model.monitor.ThreatIntelInput; + +import java.util.List; +import java.util.Map; + +public class IocScanContext { + private final Monitor monitor; + private final MonitorMetadata monitorMetadata; + private final boolean dryRun; + private final List data; + private final ThreatIntelInput threatIntelInput; // deserialize threat intel input + private final List indices; // user's log data indices + private final Map> iocTypeToIndices; + public IocScanContext(Monitor monitor, MonitorMetadata monitorMetadata, boolean dryRun, List data, ThreatIntelInput threatIntelInput, List indices, Map> iocTypeToIndices) { + this.monitor = monitor; + this.monitorMetadata = monitorMetadata; + this.dryRun = dryRun; + this.data = data; + this.threatIntelInput = threatIntelInput; + this.indices = indices; + this.iocTypeToIndices = iocTypeToIndices; + } + + public Monitor getMonitor() { + return monitor; + } + + public boolean isDryRun() { + return dryRun; + } + + public List getData() { + return data; + } + + public MonitorMetadata getMonitorMetadata() { + return monitorMetadata; + } + + public ThreatIntelInput getThreatIntelInput() { + return threatIntelInput; + } + + public List getIndices() { + return indices; + } + + public Map> getIocTypeToIndices() { + return iocTypeToIndices; + } +} diff --git a/src/main/java/org/opensearch/securityanalytics/threatIntel/iocscan/service/IoCScanService.java b/src/main/java/org/opensearch/securityanalytics/threatIntel/iocscan/service/IoCScanService.java new file mode 100644 index 000000000..9c3868e21 --- /dev/null +++ b/src/main/java/org/opensearch/securityanalytics/threatIntel/iocscan/service/IoCScanService.java @@ -0,0 +1,213 @@ +package org.opensearch.securityanalytics.threatIntel.iocscan.service; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.commons.alerting.model.Monitor; +import org.opensearch.securityanalytics.model.STIX2IOC; +import org.opensearch.securityanalytics.model.threatintel.IocMatch; +import org.opensearch.securityanalytics.threatIntel.iocscan.dto.IocScanContext; +import org.opensearch.securityanalytics.threatIntel.model.monitor.PerIocTypeScanInput; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.function.BiConsumer; + + +public abstract class IoCScanService implements IoCScanServiceInterface { + private static final Logger log = LogManager.getLogger(IoCScanService.class); + + @Override + public void scanIoCs(IocScanContext iocScanContext, + BiConsumer scanCallback + ) { + + List data = iocScanContext.getData(); + if (data.isEmpty() == false) { + scanCallback.accept(Collections.emptyList(), null); + return; + } + Monitor monitor = iocScanContext.getMonitor(); + + long start = System.currentTimeMillis(); + // log.debug("beginning to scan IoC's") + IocLookupDtos iocLookupDtos = extractIocPerTypeSet(data, iocScanContext.getThreatIntelInput().getPerIocTypeScanInputList()); + BiConsumer, Exception> iocScanResultConsumer = (List maliciousIocs, Exception e) -> { + if (e == null) { + createIoCMatches(maliciousIocs, iocLookupDtos.iocValueToDocIdMap, iocScanContext, + new BiConsumer, Exception>() { + @Override + public void accept(List iocs, Exception e) { + // TODO create alerts + } + } + ); + + } else { + // onIocMatchFailure(e, iocScanMonitor); + + } + }; + matchAgainstThreatIntelAndReturnMaliciousIocs( + iocLookupDtos.getIocsPerIocTypeMap(), monitor, iocScanResultConsumer, iocScanContext.getIocTypeToIndices()); + } + + abstract void matchAgainstThreatIntelAndReturnMaliciousIocs( + Map> iocPerTypeSet, + Monitor iocScanMonitor, + BiConsumer, Exception> callback, Map> iocTypeToIndices); + + /** + * For each doc, we extract the list of + */ + private IocLookupDtos extractIocPerTypeSet(List data, List iocTypeToIndexFieldMappings) { + Map> iocsPerIocTypeMap = new HashMap<>(); + Map> iocValueToDocIdMap = new HashMap<>(); + Map> docIdToIocsMap = new HashMap<>(); + for (Data datum : data) { + for (PerIocTypeScanInput iocTypeToIndexFieldMapping : iocTypeToIndexFieldMappings) { + String iocType = iocTypeToIndexFieldMapping.getIocType(); + String index = getIndexName(datum); + List fields = iocTypeToIndexFieldMapping.getIndexToFieldsMap().get(index); + for (String field : fields) { + List vals = getValuesAsStringList(datum, field); + String id = getId(datum); + String docId = id + ":" + index; + Set iocs = docIdToIocsMap.getOrDefault(docIdToIocsMap.get(docId), new HashSet<>()); + iocs.addAll(vals); + docIdToIocsMap.put(docId, iocs); + for (String ioc : vals) { + Set docIds = iocValueToDocIdMap.getOrDefault(iocValueToDocIdMap.get(ioc), new HashSet<>()); + docIds.add(docId); + iocValueToDocIdMap.put(ioc, docIds); + } + if (false == vals.isEmpty()) { + iocs = iocsPerIocTypeMap.getOrDefault(iocType, new HashSet<>()); + iocs.addAll(vals); + iocsPerIocTypeMap.put(iocType, iocs); + } + } + } + } + return new IocLookupDtos(iocsPerIocTypeMap, iocValueToDocIdMap, docIdToIocsMap); + } + + abstract List getValuesAsStringList(Data datum, String field); + + abstract String getIndexName(Data datum); + + abstract String getId(Data datum); + + private void createIoCMatches(List iocs, + Map> iocValueToDocIdMap, + IocScanContext iocScanContext, + BiConsumer, Exception> callback) { + try { + Instant timestamp = Instant.now(); + Monitor monitor = iocScanContext.getMonitor(); + // Map to collect unique IocValue with their respective FeedIds + Map> iocValueToFeedIds = new HashMap<>(); + + for (STIX2IOC ioc : iocs) { + String iocValue = ioc.getValue(); + iocValueToFeedIds + .computeIfAbsent(iocValue, k -> new HashSet<>()) + .add(ioc.getFeedId()); + } + + List iocMatches = new ArrayList<>(); + + for (Map.Entry> entry : iocValueToFeedIds.entrySet()) { + String iocValue = entry.getKey(); + Set feedIds = entry.getValue(); + + List relatedDocIds = new ArrayList<>(iocValueToDocIdMap.getOrDefault(iocValue, new HashSet<>())); + List feedIdsList = new ArrayList<>(feedIds); + try { + IocMatch iocMatch = new IocMatch( + UUID.randomUUID().toString(), // Generating a unique ID + relatedDocIds, + feedIdsList, + monitor.getId(), + monitor.getName(), + iocValue, + iocs.stream().filter(i -> i.getValue().equals(iocValue)).findFirst().orElseThrow().getType().toString(), + timestamp, + UUID.randomUUID().toString() // TODO execution ID + ); + iocMatches.add(iocMatch); + } catch (Exception e) { + log.error(String.format("skipping creating ioc match for %s due to unexpected failure.", entry.getKey()), e); + } + } + saveIocs(iocs, callback); + } catch (Exception e) { + log.error(() -> new ParameterizedMessage("Failed to create ioc matches due to unexpected error {}", iocScanContext.getMonitor().getId()), e); + callback.accept(null, e); + } + } + + abstract void saveIocs(List iocs, BiConsumer, Exception> callback); + + private static class IocMatchDto { + private final String iocValue; + private final String iocType; + private final List iocs; + private final List docIdsContainingIoc; + + public IocMatchDto(String iocValue, String iocType, List iocs, List docIdsContainingIoc) { + this.iocValue = iocValue; + this.iocType = iocType; + this.iocs = iocs; + this.docIdsContainingIoc = docIdsContainingIoc; + } + + public String getIocValue() { + return iocValue; + } + + public String getIocType() { + return iocType; + } + + public List getIocs() { + return iocs; + } + + public List getDocIdsContainingIoc() { + return docIdsContainingIoc; + } + } + + private static class IocLookupDtos { + private final Map> iocsPerIocTypeMap; + private final Map> iocValueToDocIdMap; + private final Map> docIdToIocsMap; + + public IocLookupDtos(Map> iocsPerIocTypeMap, Map> iocValueToDocIdMap, Map> docIdToIocsMap) { + this.iocsPerIocTypeMap = iocsPerIocTypeMap; + this.iocValueToDocIdMap = iocValueToDocIdMap; + this.docIdToIocsMap = docIdToIocsMap; + } + + public Map> getIocsPerIocTypeMap() { + return iocsPerIocTypeMap; + } + + public Map> getIocValueToDocIdMap() { + return iocValueToDocIdMap; + } + + public Map> getDocIdToIocsMap() { + return docIdToIocsMap; + } + } + +} diff --git a/src/main/java/org/opensearch/securityanalytics/threatIntel/iocscan/service/IoCScanServiceInterface.java b/src/main/java/org/opensearch/securityanalytics/threatIntel/iocscan/service/IoCScanServiceInterface.java new file mode 100644 index 000000000..0746eddf4 --- /dev/null +++ b/src/main/java/org/opensearch/securityanalytics/threatIntel/iocscan/service/IoCScanServiceInterface.java @@ -0,0 +1,16 @@ +package org.opensearch.securityanalytics.threatIntel.iocscan.service; + +import org.opensearch.commons.alerting.model.Finding; +import org.opensearch.securityanalytics.model.threatintel.IocMatch; +import org.opensearch.securityanalytics.threatIntel.iocscan.dto.IocScanContext; + +import java.util.List; +import java.util.function.BiConsumer; + +public interface IoCScanServiceInterface { + + void scanIoCs( + IocScanContext iocScanContext, + BiConsumer scanCallback + ); +} diff --git a/src/main/java/org/opensearch/securityanalytics/threatIntel/iocscan/service/SaIoCScanService.java b/src/main/java/org/opensearch/securityanalytics/threatIntel/iocscan/service/SaIoCScanService.java new file mode 100644 index 000000000..722fde197 --- /dev/null +++ b/src/main/java/org/opensearch/securityanalytics/threatIntel/iocscan/service/SaIoCScanService.java @@ -0,0 +1,268 @@ +package org.opensearch.securityanalytics.threatIntel.iocscan.service; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.ShardSearchFailure; +import org.opensearch.action.support.GroupedActionListener; +import org.opensearch.client.Client; +import org.opensearch.common.document.DocumentField; +import org.opensearch.common.xcontent.LoggingDeprecationHandler; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.commons.alerting.model.Monitor; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.index.query.BoolQueryBuilder; +import org.opensearch.index.query.QueryBuilders; +import org.opensearch.index.query.TermsQueryBuilder; +import org.opensearch.search.SearchHit; +import org.opensearch.securityanalytics.commons.model.IOC; +import org.opensearch.securityanalytics.commons.model.STIX2; +import org.opensearch.securityanalytics.model.STIX2IOC; +import org.opensearch.securityanalytics.threatIntel.model.monitor.TransportThreatIntelMonitorFanOutAction.SearchHitsOrException; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.BiConsumer; +import java.util.stream.Collectors; + +import static java.util.Collections.emptyList; + +public class SaIoCScanService extends IoCScanService { + + private static final Logger log = LogManager.getLogger(SaIoCScanService.class); + public static final int MAX_TERMS = 65536; //make ioc index setting based. use same setting value to create index + private final Client client; + private final NamedXContentRegistry xContentRegistry; + + public SaIoCScanService(Client client, NamedXContentRegistry xContentRegistry) { + this.client = client; + this.xContentRegistry = xContentRegistry; + } + + @Override + void matchAgainstThreatIntelAndReturnMaliciousIocs( + Map> iocsPerType, + Monitor monitor, + BiConsumer, Exception> callback, + Map> iocTypeToIndices) { + long startTime = System.currentTimeMillis(); + int numIocs = iocsPerType.values().stream().mapToInt(Set::size).sum(); + GroupedActionListener groupedListenerForAllIocTypes = getGroupedListenerForIocScanFromAllIocTypes(iocsPerType, monitor, callback, startTime, numIocs); + for (String iocType : iocsPerType.keySet()) { + List indices = iocTypeToIndices.get(iocType); + Set iocs = iocsPerType.get(iocType); + if (iocTypeToIndices.containsKey(iocType)) { + if (indices.isEmpty()) { + log.debug( + "Threat intel monitor {} : No ioc indices of type {} found so no scan performed.", + monitor.getId(), + iocType + ); + groupedListenerForAllIocTypes.onResponse(new SearchHitsOrException(emptyList(), null)); + } else if (iocs.isEmpty()) { + log.debug( + "Threat intel monitor {} : No iocs of type {} found in user data so no scan performed.", + monitor.getId(), + iocType + ); + groupedListenerForAllIocTypes.onResponse(new SearchHitsOrException(emptyList(), null)); + } else { + performScanForMaliciousIocsPerIocType(indices, iocs, monitor, iocType, groupedListenerForAllIocTypes); + } + } else { + groupedListenerForAllIocTypes.onResponse(new SearchHitsOrException(emptyList(), null)); + } + } + } + + private GroupedActionListener getGroupedListenerForIocScanFromAllIocTypes(Map> iocsPerType, Monitor monitor, BiConsumer, Exception> callback, long startTime, int numIocs) { + return new GroupedActionListener<>( + ActionListener.wrap( + lists -> { + long endTime = System.currentTimeMillis(); + long timetaken = endTime - startTime; + log.debug("IOC_SCAN: Threat intel monitor {} completed Ioc match phase in {} millis for {} iocs", + monitor.getId(), timetaken, numIocs); + List hits = new ArrayList<>(); + lists.forEach(hitsOrException -> + hits.addAll(hitsOrException.getHits() == null ? + emptyList() : + hitsOrException.getHits())); + List iocs = new ArrayList<>(); + hits.forEach(hit -> { + try { + XContentParser xcp = XContentType.JSON.xContent().createParser( + xContentRegistry, + LoggingDeprecationHandler.INSTANCE, + hit.getSourceAsString()); + xcp.nextToken(); + + STIX2IOC ioc = STIX2IOC.parse(xcp, hit.getId(), hit.getVersion()); + iocs.add(ioc); + } catch (Exception e) { + log.error(() -> new ParameterizedMessage( + "Failed to parse IOC doc from hit {} index {}", hit.getId(), hit.getIndex()), + e + ); + } + }); + callback.accept(iocs, null); + }, + e -> { + log.error("Threat intel monitor {} :Unexpected error while scanning data for malicious Iocs", e); + callback.accept(emptyList(), e); + } + ), + iocsPerType.size() + ); + } + + private void performScanForMaliciousIocsPerIocType( + List indices, + Set iocs, + Monitor monitor, + String iocType, + GroupedActionListener listener) { + // TODO change ioc indices max terms count to 100k and experiment + // TODO add fuzzy postings on ioc value field to enable bloomfilter on iocs as an index data structure and benchmark performance + GroupedActionListener perIocTypeListener = getGroupedListenerForIocScanPerIocType(iocs, monitor, iocType, listener); + List iocList = new ArrayList<>(iocs); + int totalIocs = iocList.size(); + + for (int start = 0; start < totalIocs; start += MAX_TERMS) { + int end = Math.min(start + MAX_TERMS, totalIocs); + List iocsSublist = iocList.subList(start, end); + SearchRequest searchRequest = getSearchRequestForIocType(indices, iocType, iocsSublist); + client.search(searchRequest, ActionListener.wrap( + searchResponse -> { + if (searchResponse.isTimedOut()) { + log.error("Threat intel monitor {} scan with {} user data indicators TIMED OUT for ioc Type {}", + monitor.getId(), + iocsSublist.size(), + iocType + ); + } + if (searchResponse.getFailedShards() > 0) { + for (ShardSearchFailure shardFailure : searchResponse.getShardFailures()) { + log.error("Threat intel monitor {} scan with {} user data indicators for ioc Type {} has Shard failures {}", + monitor.getId(), + iocsSublist.size(), + iocType, + shardFailure.toString() + ); + } + } + listener.onResponse(new SearchHitsOrException(searchResponse.getHits().getHits() == null ? + emptyList() : Arrays.asList(searchResponse.getHits().getHits()), null)); + }, + e -> { + log.error(() -> new ParameterizedMessage("Threat intel monitor {} scan with {} user data indicators failed for ioc Type {}", + monitor.getId(), + iocsSublist.size(), + iocType), e + ); + listener.onResponse(new SearchHitsOrException(emptyList(), e)); + } + )); + } + } + + private static SearchRequest getSearchRequestForIocType(List indices, String iocType, List iocsSublist) { + SearchRequest searchRequest = new SearchRequest(indices.toArray(new String[0])); + BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); + // add the iocs sublist + boolQueryBuilder.must(new TermsQueryBuilder(STIX2.VALUE_FIELD, iocsSublist)); + // add ioc type filter + boolQueryBuilder.must(new TermsQueryBuilder(STIX2.TYPE_FIELD, iocType.toLowerCase())); + searchRequest.source().query(boolQueryBuilder); + return searchRequest; + } + + /** + * grouped listener for a given ioc type to listen and collate malicious iocs in search hits from batched search calls. + * batching done for every 65536 or MAX_TERMS setting number of iocs in a list. + */ + private GroupedActionListener getGroupedListenerForIocScanPerIocType(Set iocs, Monitor monitor, String iocType, GroupedActionListener groupedListenerForAllIocTypes) { + return new GroupedActionListener<>( + ActionListener.wrap( + (Collection searchHitsOrExceptions) -> { + if (false == searchHitsOrExceptions.stream().allMatch(shoe -> shoe.getException() != null)) { + List searchHits = new ArrayList<>(); + searchHitsOrExceptions.forEach(searchHitsOrException -> { + if (searchHitsOrException.getException() != null) { + log.error( + () -> new ParameterizedMessage( + "Threat intel monitor {}: Failed to perform ioc scan on one batch for ioc type : ", + monitor.getId(), iocType), searchHitsOrException.getException()); + } else { + searchHits.addAll(searchHitsOrException.getHits() != null ? + searchHitsOrException.getHits() : emptyList()); + } + }); + // we collect all hits we can and log all exceptions and submit to outer listener + groupedListenerForAllIocTypes.onResponse(new SearchHitsOrException(searchHits, null)); + } else { + // we collect all exceptions under one exception and respond to outer listener + groupedListenerForAllIocTypes.onResponse(new SearchHitsOrException(emptyList(), buildException(searchHitsOrExceptions)) + ); + } + }, e -> { + log.error( + () -> new ParameterizedMessage( + "Threat intel monitor {}: Failed to perform ioc scan for ioc type : ", + monitor.getId(), iocType), e); + groupedListenerForAllIocTypes.onResponse(new SearchHitsOrException(emptyList(), e)); + } + ), + //TODO fix groupsize + getGroupSizeForIocs(iocs) // batch into #MAX_TERMS setting + ); + } + + private Exception buildException(Collection searchHitsOrExceptions) { + Exception e = null; + for (SearchHitsOrException searchHitsOrException : searchHitsOrExceptions) { + if (e == null) + e = searchHitsOrException.getException(); + else { + e.addSuppressed(searchHitsOrException.getException()); + } + } + return e; + } + + private static int getGroupSizeForIocs(Set iocs) { + return iocs.size() / MAX_TERMS + (iocs.size() % MAX_TERMS == 0 ? 0 : 1); + } + + @Override + public List getValuesAsStringList(SearchHit hit, String field) { + if (hit.getFields().containsKey(field)) { + DocumentField documentField = hit.getFields().get(field); + return documentField.getValues().stream().filter(Objects::nonNull).map(Object::toString).collect(Collectors.toList()); + } else return emptyList(); + } + + @Override + public String getIndexName(SearchHit hit) { + return hit.getIndex(); + } + + @Override + public String getId(SearchHit hit) { + return hit.getId(); + } + + @Override + void saveIocs(List iocs, BiConsumer, Exception> callback) { + callback.accept(emptyList(), null); + } +} diff --git a/src/main/java/org/opensearch/securityanalytics/threatIntel/model/monitor/SampleRemoteDocLevelMonitorRunner.java b/src/main/java/org/opensearch/securityanalytics/threatIntel/model/monitor/ThreatIntelMonitorRunner.java similarity index 60% rename from src/main/java/org/opensearch/securityanalytics/threatIntel/model/monitor/SampleRemoteDocLevelMonitorRunner.java rename to src/main/java/org/opensearch/securityanalytics/threatIntel/model/monitor/ThreatIntelMonitorRunner.java index d7dd5b656..0407b14d8 100644 --- a/src/main/java/org/opensearch/securityanalytics/threatIntel/model/monitor/SampleRemoteDocLevelMonitorRunner.java +++ b/src/main/java/org/opensearch/securityanalytics/threatIntel/model/monitor/ThreatIntelMonitorRunner.java @@ -5,34 +5,34 @@ import org.opensearch.alerting.spi.RemoteMonitorRunner; import org.opensearch.commons.alerting.action.DocLevelMonitorFanOutResponse; -public class SampleRemoteDocLevelMonitorRunner extends RemoteMonitorRunner { +public class ThreatIntelMonitorRunner extends RemoteMonitorRunner { public static final String THREAT_INTEL_MONITOR_ACTION_NAME = "cluster:admin/opensearch/security_analytics/threatIntel/monitor/fanout"; - public static final String REMOTE_DOC_LEVEL_MONITOR_ACTION_NAME = "cluster:admin/security_analytics/threatIntel/monitor/fanout"; + public static final String FAN_OUT_ACTION_NAME = "cluster:admin/security_analytics/threatIntel/monitor/fanout"; public static final String THREAT_INTEL_MONITOR_TYPE = "ti_doc_level_monitor"; public static final String SAMPLE_REMOTE_DOC_LEVEL_MONITOR_RUNNER_INDEX = ".opensearch-alerting-sample-remote-doc-level-monitor"; - public static final ActionType REMOTE_DOC_LEVEL_MONITOR_ACTION_INSTANCE = new ActionType<>(REMOTE_DOC_LEVEL_MONITOR_ACTION_NAME, + public static final ActionType REMOTE_DOC_LEVEL_MONITOR_ACTION_INSTANCE = new ActionType<>(FAN_OUT_ACTION_NAME, DocLevelMonitorFanOutResponse::new); - private static SampleRemoteDocLevelMonitorRunner INSTANCE; + private static ThreatIntelMonitorRunner INSTANCE; - public static SampleRemoteDocLevelMonitorRunner getMonitorRunner() { + public static ThreatIntelMonitorRunner getMonitorRunner() { if (INSTANCE != null) { return INSTANCE; } - synchronized (SampleRemoteDocLevelMonitorRunner.class) { + synchronized (ThreatIntelMonitorRunner.class) { if (INSTANCE != null) { return INSTANCE; } - INSTANCE = new SampleRemoteDocLevelMonitorRunner(); + INSTANCE = new ThreatIntelMonitorRunner(); return INSTANCE; } } @Override public String getFanOutAction() { - return REMOTE_DOC_LEVEL_MONITOR_ACTION_NAME; + return FAN_OUT_ACTION_NAME; } } \ No newline at end of file diff --git a/src/main/java/org/opensearch/securityanalytics/threatIntel/model/monitor/TransportRemoteDocLevelMonitorFanOutAction.java b/src/main/java/org/opensearch/securityanalytics/threatIntel/model/monitor/TransportRemoteDocLevelMonitorFanOutAction.java deleted file mode 100644 index 72d0c9c2b..000000000 --- a/src/main/java/org/opensearch/securityanalytics/threatIntel/model/monitor/TransportRemoteDocLevelMonitorFanOutAction.java +++ /dev/null @@ -1,97 +0,0 @@ -package org.opensearch.securityanalytics.threatIntel.model.monitor; - -import org.opensearch.action.index.IndexRequest; -import org.opensearch.action.index.IndexResponse; -import org.opensearch.action.support.ActionFilters; -import org.opensearch.action.support.HandledTransportAction; -import org.opensearch.action.support.WriteRequest; -import org.opensearch.client.Client; -import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.inject.Inject; -import org.opensearch.common.settings.Settings; -import org.opensearch.commons.alerting.action.DocLevelMonitorFanOutRequest; -import org.opensearch.commons.alerting.action.DocLevelMonitorFanOutResponse; -import org.opensearch.commons.alerting.model.DocLevelMonitorInput; -import org.opensearch.commons.alerting.model.InputRunResults; -import org.opensearch.commons.alerting.model.Monitor; -import org.opensearch.commons.alerting.model.Trigger; -import org.opensearch.commons.alerting.model.remote.monitors.RemoteDocLevelMonitorInput; -import org.opensearch.commons.alerting.model.remote.monitors.RemoteMonitorTrigger; -import org.opensearch.core.action.ActionListener; -import org.opensearch.core.common.bytes.BytesReference; -import org.opensearch.core.common.io.stream.StreamInput; -import org.opensearch.core.xcontent.NamedXContentRegistry; -import org.opensearch.tasks.Task; -import org.opensearch.transport.TransportService; - -import java.util.HashMap; -import java.util.Map; - -public class TransportRemoteDocLevelMonitorFanOutAction extends HandledTransportAction { - - private final ClusterService clusterService; - - private final Settings settings; - - private final Client client; - - private final NamedXContentRegistry xContentRegistry; - - @Inject - public TransportRemoteDocLevelMonitorFanOutAction( - TransportService transportService, - Client client, - NamedXContentRegistry xContentRegistry, - ClusterService clusterService, - Settings settings, - ActionFilters actionFilters - ) { - super(SampleRemoteDocLevelMonitorRunner.REMOTE_DOC_LEVEL_MONITOR_ACTION_NAME, transportService, actionFilters, DocLevelMonitorFanOutRequest::new); - this.clusterService = clusterService; - this.client = client; - this.xContentRegistry = xContentRegistry; - this.settings = settings; - } - - @Override - protected void doExecute(Task task, DocLevelMonitorFanOutRequest request, ActionListener actionListener) { - try { - Monitor monitor = request.getMonitor(); - Map lastRunContext = request.getMonitorMetadata().getLastRunContext(); - - RemoteDocLevelMonitorInput input = (RemoteDocLevelMonitorInput) monitor.getInputs().get(0); - BytesReference customInputSerialized = input.getInput(); - StreamInput sin = StreamInput.wrap(customInputSerialized.toBytesRef().bytes); - ThreatIntelInput sampleRemoteDocLevelMonitorInput = new ThreatIntelInput(sin); - DocLevelMonitorInput docLevelMonitorInput = input.getDocLevelMonitorInput(); - String index = docLevelMonitorInput.getIndices().get(0); - - - ((Map) lastRunContext.get(index)).put("0", 0); - IndexRequest indexRequest = new IndexRequest(SampleRemoteDocLevelMonitorRunner.SAMPLE_REMOTE_DOC_LEVEL_MONITOR_RUNNER_INDEX) - .source(Map.of()).setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); - this.client.index(indexRequest, new ActionListener<>() { - @Override - public void onResponse(IndexResponse indexResponse) { - DocLevelMonitorFanOutResponse response = new DocLevelMonitorFanOutResponse( - clusterService.localNode().getId(), - request.getExecutionId(), - monitor.getId(), - lastRunContext, - new InputRunResults(), - new HashMap<>(), - null - ); - actionListener.onResponse(response); - } - - @Override - public void onFailure(Exception e) { - actionListener.onFailure(e); - } - }); - } catch (Exception ex) { - actionListener.onFailure(ex); - } - } -} \ No newline at end of file diff --git a/src/main/java/org/opensearch/securityanalytics/threatIntel/model/monitor/TransportThreatIntelMonitorFanOutAction.java b/src/main/java/org/opensearch/securityanalytics/threatIntel/model/monitor/TransportThreatIntelMonitorFanOutAction.java new file mode 100644 index 000000000..81d2b7133 --- /dev/null +++ b/src/main/java/org/opensearch/securityanalytics/threatIntel/model/monitor/TransportThreatIntelMonitorFanOutAction.java @@ -0,0 +1,361 @@ +package org.opensearch.securityanalytics.threatIntel.model.monitor; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.GroupedActionListener; +import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.client.Client; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.settings.Settings; +import org.opensearch.commons.alerting.action.DocLevelMonitorFanOutRequest; +import org.opensearch.commons.alerting.action.DocLevelMonitorFanOutResponse; +import org.opensearch.commons.alerting.model.DocumentLevelTriggerRunResult; +import org.opensearch.commons.alerting.model.InputRunResults; +import org.opensearch.commons.alerting.model.Monitor; +import org.opensearch.commons.alerting.model.MonitorRunResult; +import org.opensearch.commons.alerting.model.remote.monitors.RemoteDocLevelMonitorInput; +import org.opensearch.commons.alerting.util.AlertingException; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.index.query.BoolQueryBuilder; +import org.opensearch.index.query.QueryBuilders; +import org.opensearch.index.seqno.SequenceNumbers; +import org.opensearch.search.SearchHit; +import org.opensearch.search.SearchHits; +import org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.search.sort.SortOrder; +import org.opensearch.securityanalytics.threatIntel.iocscan.dto.IocScanContext; +import org.opensearch.securityanalytics.threatIntel.iocscan.service.SaIoCScanService; +import org.opensearch.securityanalytics.threatIntel.service.SATIFSourceConfigService; +import org.opensearch.tasks.Task; +import org.opensearch.transport.TransportService; + +import java.io.IOException; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.BiConsumer; + +import static org.opensearch.securityanalytics.threatIntel.util.ThreatIntelMonitorUtils.getThreatIntelInputFromBytesReference; + +public class TransportThreatIntelMonitorFanOutAction extends HandledTransportAction { + private static final Logger log = LogManager.getLogger(TransportThreatIntelMonitorFanOutAction.class); + private final ClusterService clusterService; + + private final Settings settings; + private final SATIFSourceConfigService saTifSourceConfigService; + + private final Client client; + + private final NamedXContentRegistry xContentRegistry; + private final SaIoCScanService saIoCScanService; + + @Inject + public TransportThreatIntelMonitorFanOutAction( + TransportService transportService, + Client client, + NamedXContentRegistry xContentRegistry, + ClusterService clusterService, + Settings settings, + ActionFilters actionFilters, + SATIFSourceConfigService saTifSourceConfigService, + SaIoCScanService saIoCScanService + ) { + super(ThreatIntelMonitorRunner.FAN_OUT_ACTION_NAME, transportService, actionFilters, DocLevelMonitorFanOutRequest::new); + this.clusterService = clusterService; + this.client = client; + this.xContentRegistry = xContentRegistry; + this.settings = settings; + this.saTifSourceConfigService = saTifSourceConfigService; + this.saIoCScanService = saIoCScanService; + } + + @Override + protected void doExecute(Task task, DocLevelMonitorFanOutRequest request, ActionListener actionListener) { + try { + Monitor monitor = request.getMonitor(); + MonitorRunResult monitorResult = new MonitorRunResult<>( + monitor.getName(), + Instant.now(), + Instant.now(), + null, + new InputRunResults(Collections.emptyList(), null, null), + Collections.emptyMap() + ); + + // fetch list of threat intel data containing indices per indicator type + saTifSourceConfigService.getIocTypeToIndices(ActionListener.wrap( + iocTypeToIndicesMap -> { + onGetIocTypeToIndices(iocTypeToIndicesMap, request, actionListener); + }, e -> { + log.error(() -> new ParameterizedMessage("Unexpected Failure in threat intel monitor {} fan out action", request.getMonitor().getId()), e); + actionListener.onResponse( + new DocLevelMonitorFanOutResponse( + clusterService.localNode().getId(), + request.getExecutionId(), + request.getMonitor().getId(), + request.getIndexExecutionContext().getUpdatedLastRunContext(), + new InputRunResults(Collections.emptyList(), null, null), + Collections.emptyMap(),//TODO trigger results, + new AlertingException("Fan action of threat intel monitor failed", RestStatus.INTERNAL_SERVER_ERROR, e) + ) + ); + } + )); + + } catch (Exception ex) { + log.error(() -> new ParameterizedMessage("Unexpected Failure in threat intel monitor {} fan out action", request.getMonitor().getId()), ex); + actionListener.onFailure(ex); + } + } + + private void onGetIocTypeToIndices(Map> iocTypeToIndicesMap, DocLevelMonitorFanOutRequest request, ActionListener actionListener) throws IOException { + RemoteDocLevelMonitorInput remoteDocLevelMonitorInput = (RemoteDocLevelMonitorInput) request.getMonitor().getInputs().get(0); + List indices = remoteDocLevelMonitorInput.getDocLevelMonitorInput().getIndices(); + ThreatIntelInput threatIntelInput = getThreatIntelInputFromBytesReference(remoteDocLevelMonitorInput.getInput(), xContentRegistry); + // TODO update fanout request to add mapping of monitor.input.indices' index to concrete index name. + // right now we can't say which one of aliases/index pattern has resolved to this concrete index name + // + // Map> fieldsToFetchPerIndex = new HashMap<>(); alias -> fields mapping is given but we have concrete index name + List fieldsToFetch = new ArrayList<>(); + threatIntelInput.getPerIocTypeScanInputList().forEach(perIocTypeScanInput -> { + perIocTypeScanInput.getIndexToFieldsMap().values().forEach(fieldsToFetch::addAll); +// Map> indexToFieldsMapPerInput = perIocTypeScanInput.getIndexToFieldsMap(); +// for (String index : indexToFieldsMapPerInput.keySet()) { +// List strings = fieldsToFetchPerIndex.computeIfAbsent( +// perIocTypeScanInput.getIocType(), +// k -> new ArrayList<>()); +// strings.addAll(indexToFieldsMapPerInput.get(index)); +// } + }); + + // function passed to update last run context with new max sequence number +// Map updatedLastRunContext = request.getIndexExecutionContext().getUpdatedLastRunContext(); + Map updatedLastRunContext = request.getMonitorMetadata().getLastRunContext(); + BiConsumer lastRunContextUpdateConsumer = (shardId, value) -> { + String indexName = shardId.getIndexName(); + if (updatedLastRunContext.containsKey(indexName)) { + HashMap context = (HashMap) updatedLastRunContext.putIfAbsent(indexName, new HashMap()); + context.put(String.valueOf(shardId.getId()), value); + } else { + log.error("monitor metadata for threat intel monitor {} expected to contain last run context for index {}", + request.getMonitor().getId(), indexName); + } + }; + ActionListener> searchHitsListener = ActionListener.wrap( + (List hits) -> { + BiConsumer resultConsumer = (r, e) -> { + if (e == null) { + actionListener.onResponse( + new DocLevelMonitorFanOutResponse( + clusterService.localNode().getId(), + request.getExecutionId(), + request.getMonitor().getId(), + updatedLastRunContext, + new InputRunResults(Collections.emptyList(), null, null), + Collections.emptyMap(),//TODO trigger results, + null + ) + ); + } else { + actionListener.onFailure(e); + } + }; + saIoCScanService.scanIoCs(new IocScanContext( + request.getMonitor(), + request.getMonitorMetadata(), + false, + hits, + threatIntelInput, + indices, + iocTypeToIndicesMap + ), resultConsumer); + }, + e -> { + log.error("unexpected error while", e); + actionListener.onFailure(e); + } + ); + fetchDataFromShards(request, + fieldsToFetch, + lastRunContextUpdateConsumer, + searchHitsListener); + } + + private void fetchDataFromShards( + DocLevelMonitorFanOutRequest request, + List fieldsToFetch, + BiConsumer updateLastRunContext, + ActionListener> searchHitsListener) { + if (request.getShardIds().isEmpty()) + return; + GroupedActionListener searchHitsFromAllShardsListener = new GroupedActionListener<>( + ActionListener.wrap( + searchHitsOrExceptionCollection -> { + List hits = new ArrayList<>(); + for (SearchHitsOrException searchHitsOrException : searchHitsOrExceptionCollection) { + if (searchHitsOrException.exception == null) { + hits.addAll(searchHitsOrException.hits); + } // else not logging exception as groupedListener onResponse() will log error message + } + searchHitsListener.onResponse(hits); + }, e -> { + log.error("unexpected failure while fetch documents for threat intel monitor " + request.getMonitor().getId(), e); + searchHitsListener.onResponse(Collections.emptyList()); + } + ), request.getShardIds().size() + ); + for (ShardId shardId : request.getShardIds()) { + int shard = shardId.getId(); + + Map lastRunContext = request.getMonitorMetadata().getLastRunContext(); + Long prevSeqNo = lastRunContext.get(shard) != null ? Long.parseLong(lastRunContext.get(shard).toString()) : null; + long fromSeqNo = prevSeqNo != null ? prevSeqNo : SequenceNumbers.NO_OPS_PERFORMED; + long toSeqNo = Long.MAX_VALUE; + fetchLatestDocsFromShard(shardId, fromSeqNo, toSeqNo, new ArrayList<>(), request.getMonitor(), lastRunContext, updateLastRunContext, fieldsToFetch, searchHitsFromAllShardsListener); + } + } + + /** + * recursive function to keep fetching docs in batches of 10000 per search request. all docs with seq_no greater than + * the last seen seq_no are fetched in descending order of sequence number. + */ + + private void fetchLatestDocsFromShard( + ShardId shardId, + long fromSeqNo, long toSeqNo, List searchHitsSoFar, Monitor monitor, + Map lastRunContext, + BiConsumer updateLastRunContext, + List fieldsToFetch, + GroupedActionListener listener) { + + String shard = shardId.getId() + ""; + try { + if (toSeqNo < fromSeqNo || toSeqNo < 0) { + listener.onResponse(new SearchHitsOrException(searchHitsSoFar, null)); + return; + } + Long prevSeqNo = lastRunContext.get(shard) != null ? Long.parseLong(lastRunContext.get(shard).toString()) : null; + if (toSeqNo >= fromSeqNo) { + + searchShard( + shardId.getIndexName(), + shard, + fromSeqNo, + toSeqNo, + Collections.emptyList(), + fieldsToFetch, + ActionListener.wrap( + hits -> { + if (hits.getHits().length == 0) { + if (toSeqNo == Long.MAX_VALUE) { // didn't find any docs + updateLastRunContext.accept(shardId, prevSeqNo != null ? prevSeqNo.toString() : SequenceNumbers.NO_OPS_PERFORMED + ""); + } + listener.onResponse(new SearchHitsOrException(searchHitsSoFar, null)); + return; + } + searchHitsSoFar.addAll(Arrays.asList(hits.getHits())); + if (toSeqNo == Long.MAX_VALUE) { // max sequence number of shard needs to be computed + updateLastRunContext.accept(shardId, String.valueOf(hits.getHits()[0].getSeqNo())); + } + + long leastSeqNoFromHits = hits.getHits()[hits.getHits().length - 1].getSeqNo(); + long updateToSeqNo = leastSeqNoFromHits - 1; + // recursive call to fetch docs with updated seq no. + fetchLatestDocsFromShard(shardId, fromSeqNo, updateToSeqNo, searchHitsSoFar, monitor, lastRunContext, updateLastRunContext, fieldsToFetch, listener); + }, e -> { + log.error(() -> new ParameterizedMessage("Threat intel Monitor {}: Failed to search shard {} in index {}", monitor.getId(), shard, shardId.getIndexName()), e); + listener.onResponse(new SearchHitsOrException(searchHitsSoFar, e)); + } + ) + ); + } + } catch (Exception e) { + log.error(() -> new ParameterizedMessage("threat intel Monitor {}: Failed to run fetch data from shard [{}] of index [{}]", + monitor.getId(), shardId, shardId.getIndexName()), e); + listener.onResponse(new SearchHitsOrException(searchHitsSoFar, e)); + } + } + + public void searchShard( + String index, + String shard, + Long prevSeqNo, + long maxSeqNo, + List docIds, + List fieldsToFetch, + ActionListener listener) { + + if (prevSeqNo != null && prevSeqNo.equals(maxSeqNo) && maxSeqNo != 0L) { + log.debug("Seqquence number unchanged."); + listener.onResponse(SearchHits.empty()); + } + + BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery() + .filter(QueryBuilders.rangeQuery("_seq_no").gt(prevSeqNo).lte(maxSeqNo)); + + if (docIds != null && !docIds.isEmpty()) { + boolQueryBuilder.filter(QueryBuilders.termsQuery("_id", docIds)); + } + + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder() + .version(true) + .sort("_seq_no", SortOrder.DESC) + .seqNoAndPrimaryTerm(true) + .query(boolQueryBuilder) + .size(10000); + + if (!fieldsToFetch.isEmpty()) { + searchSourceBuilder.fetchSource(false); + for (String field : fieldsToFetch) { + searchSourceBuilder.fetchField(field); + } + } + + SearchRequest request = new SearchRequest() + .indices(index) + .preference("_shards:" + shard) + .source(searchSourceBuilder); + + client.search(request, ActionListener.wrap( + response -> { + if (response.status() != RestStatus.OK) { + log.error("Fetching docs from shard failed"); + throw new IOException("Failed to search shard: [" + shard + "] in index [" + index + "]. Response status is " + response.status()); + } + listener.onResponse(response.getHits()); + }, + listener::onFailure // exception logged in invoker method + )); + + } + + public static class SearchHitsOrException { + private final List hits; + private final Exception exception; + + public SearchHitsOrException(List hits, Exception exception) { + assert hits == null || hits.isEmpty() || exception == null; // just a verification that only one of the variables is non-null + this.hits = hits; + this.exception = exception; + } + + public List getHits() { + return hits; + } + + public Exception getException() { + return exception; + } + } +} \ No newline at end of file diff --git a/src/main/java/org/opensearch/securityanalytics/threatIntel/service/SATIFSourceConfigService.java b/src/main/java/org/opensearch/securityanalytics/threatIntel/service/SATIFSourceConfigService.java index 4105c2fc9..6fb0904c9 100644 --- a/src/main/java/org/opensearch/securityanalytics/threatIntel/service/SATIFSourceConfigService.java +++ b/src/main/java/org/opensearch/securityanalytics/threatIntel/service/SATIFSourceConfigService.java @@ -20,7 +20,6 @@ import org.opensearch.action.search.SearchResponse; import org.opensearch.action.support.WriteRequest; import org.opensearch.client.Client; -import org.opensearch.cluster.routing.Preference; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.xcontent.LoggingDeprecationHandler; @@ -32,17 +31,17 @@ import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.index.IndexNotFoundException; import org.opensearch.index.query.BoolQueryBuilder; +import org.opensearch.index.query.QueryBuilder; import org.opensearch.index.query.QueryBuilders; import org.opensearch.jobscheduler.spi.LockModel; -import org.opensearch.search.builder.SearchSourceBuilder; -import org.opensearch.search.fetch.subphase.FetchSourceContext; +import org.opensearch.search.SearchHit; import org.opensearch.securityanalytics.SecurityAnalyticsPlugin; -import org.opensearch.securityanalytics.threatIntel.action.monitor.SearchThreatIntelMonitorAction; -import org.opensearch.securityanalytics.threatIntel.action.monitor.request.SearchThreatIntelMonitorRequest; import org.opensearch.securityanalytics.threatIntel.common.StashedThreadContext; import org.opensearch.securityanalytics.threatIntel.common.TIFLockService; import org.opensearch.securityanalytics.threatIntel.model.SATIFSourceConfig; +import org.opensearch.securityanalytics.util.DetectorUtils; import org.opensearch.securityanalytics.util.SecurityAnalyticsException; import org.opensearch.threadpool.ThreadPool; @@ -51,11 +50,17 @@ import java.io.InputStream; import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; import java.util.Locale; +import java.util.Map; import java.util.stream.Collectors; import static org.opensearch.securityanalytics.settings.SecurityAnalyticsSettings.INDEX_TIMEOUT; -import static org.opensearch.securityanalytics.transport.TransportIndexDetectorAction.PLUGIN_OWNER_FIELD; +import static org.opensearch.securityanalytics.threatIntel.common.TIFJobState.AVAILABLE; +import static org.opensearch.securityanalytics.threatIntel.common.TIFJobState.REFRESHING; +import static org.opensearch.securityanalytics.threatIntel.model.SATIFSourceConfig.STATE_FIELD; /** * CRUD for threat intel feeds source config object @@ -127,10 +132,8 @@ private static SATIFSourceConfig createSATIFSourceConfig(SATIFSourceConfig SaTif SaTifSourceConfig.getName(), SaTifSourceConfig.getFeedFormat(), SaTifSourceConfig.getFeedType(), - SaTifSourceConfig.getDescription(), SaTifSourceConfig.getCreatedByUser(), SaTifSourceConfig.getCreatedAt(), - SaTifSourceConfig.getSource(), SaTifSourceConfig.getEnabledTime(), SaTifSourceConfig.getLastUpdateTime(), SaTifSourceConfig.getSchedule(), @@ -139,7 +142,7 @@ private static SATIFSourceConfig createSATIFSourceConfig(SATIFSourceConfig SaTif SaTifSourceConfig.getLastRefreshedTime(), SaTifSourceConfig.getLastRefreshedUser(), SaTifSourceConfig.isEnabled(), - SaTifSourceConfig.getIocStoreConfig(), + SaTifSourceConfig.getIocMapStore(), SaTifSourceConfig.getIocTypes() ); } @@ -221,32 +224,6 @@ public void getTIFSourceConfig( ); } - public void searchTIFSourceConfigs( - final SearchRequest searchRequest, - final ActionListener actionListener - ) { - try { - client.search(searchRequest, ActionListener.wrap( - searchResponse -> { - if (searchResponse.isTimedOut()) { - actionListener.onFailure(SecurityAnalyticsException.wrap(new OpenSearchStatusException("Search threat intel source configs request timed out", RestStatus.REQUEST_TIMEOUT))); - return; - } - - log.debug("Fetched all threat intel source configs successfully."); - actionListener.onResponse(searchResponse); - }, e -> { - log.error("Failed to fetch all threat intel source configs for search request [{}]", searchRequest, e); - actionListener.onFailure(e); - }) - ); - } catch (Exception e) { - log.error("Failed to fetch all threat intel source configs for search request [{}]", searchRequest, e); - actionListener.onFailure(e); - } - } - - // Update TIF source config public void updateTIFSourceConfig( SATIFSourceConfig SaTifSourceConfig, @@ -284,7 +261,7 @@ public void deleteTIFSourceConfig( client.delete(request, ActionListener.wrap( deleteResponse -> { if (deleteResponse.status().equals(RestStatus.OK)) { - log.debug("Deleted threat intel source config [{}] successfully", SaTifSourceConfig.getId()); + log.info("Deleted threat intel source config [{}] successfully", SaTifSourceConfig.getId()); actionListener.onResponse(deleteResponse); } else if (deleteResponse.status().equals(RestStatus.NOT_FOUND)) { throw SecurityAnalyticsException.wrap(new OpenSearchStatusException(String.format(Locale.getDefault(), "Threat intel source config with id [{%s}] not found", SaTifSourceConfig.getId()), RestStatus.NOT_FOUND)); @@ -298,71 +275,68 @@ public void deleteTIFSourceConfig( )); } - public void checkAndEnsureThreatIntelMonitorsDeleted( - ActionListener listener - ) { - // TODO: change this to use search source configs API call - SearchRequest searchRequest = new SearchRequest(SecurityAnalyticsPlugin.JOB_INDEX_NAME) - .source(new SearchSourceBuilder() - .seqNoAndPrimaryTerm(false) - .version(false) - .query(QueryBuilders.matchAllQuery()) - .fetchSource(FetchSourceContext.FETCH_SOURCE) - ).preference(Preference.PRIMARY_FIRST.type()); - - // Search if there is only one threat intel source config left - client.search(searchRequest, ActionListener.wrap( - saTifSourceConfigResponse -> { - if (saTifSourceConfigResponse.getHits().getHits().length <= 1) { - String alertingConfigIndex = ".opendistro-alerting-config"; - if (clusterService.state().metadata().hasIndex(alertingConfigIndex) == false) { - log.debug("[{}] index does not exist, continuing deleting threat intel source config", alertingConfigIndex); - listener.onResponse(true); - } else { - // Search alerting config index for at least one threat intel monitor - SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder() - .seqNoAndPrimaryTerm(false) - .version(false) - .query(QueryBuilders.matchAllQuery()) - .fetchSource(FetchSourceContext.FETCH_SOURCE); - - SearchRequest newSearchRequest = new SearchRequest(); - newSearchRequest.source(searchSourceBuilder); - newSearchRequest.indices(alertingConfigIndex); - newSearchRequest.preference(Preference.PRIMARY_FIRST.type()); - - BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery().must(newSearchRequest.source().query()); - BoolQueryBuilder bqb = new BoolQueryBuilder(); - bqb.should().add(new BoolQueryBuilder().must(QueryBuilders.matchQuery("monitor.owner", PLUGIN_OWNER_FIELD))); - boolQueryBuilder.filter(bqb); - newSearchRequest.source().query(boolQueryBuilder); // remove this once logic is moved to transport layer - - client.execute(SearchThreatIntelMonitorAction.INSTANCE, new SearchThreatIntelMonitorRequest(newSearchRequest), ActionListener.wrap( - response -> { - if (response.getHits().getHits().length == 0) { - log.debug("All threat intel monitors are deleted, continuing deleting threat intel source config"); - listener.onResponse(true); - } else { - log.error("All threat intel monitors need to be deleted before deleting threat intel source config"); - listener.onResponse(false); - } - }, e -> { - log.error("Failed to search for threat intel monitors"); - listener.onFailure(e); - } - )); + public void getIocTypeToIndices(ActionListener>> listener) { + SearchRequest searchRequest = new SearchRequest(SecurityAnalyticsPlugin.JOB_INDEX_NAME); + BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery() + .filter(QueryBuilders.termsQuery(STATE_FIELD, List.of(REFRESHING.toString(), AVAILABLE.toString()))); + searchRequest.source().query(queryBuilder); + searchTIFSourceConfigs(searchRequest, ActionListener.wrap( + searchResponse -> { + Map> cumulativeIocTypeToIndices = new HashMap<>(); + for (SearchHit hit : searchResponse.getHits().getHits()) { + XContentParser xcp = XContentType.JSON.xContent().createParser( + xContentRegistry, + LoggingDeprecationHandler.INSTANCE, hit.getSourceAsString()); + SATIFSourceConfig config = SATIFSourceConfig.parse(xcp, hit.getId(), hit.getVersion()); + /* + Todo DefaultIOCStoreConfig iocStoreConfig = (DefaultIOCStoreConfig) SaTifSourceConfigDto.getIocStoreConfig(); + Map> iocTypeToIndices = iocStoreConfig.getIocMapStore() + */ + //todo replace with above. why is it not type based casting and only default. need a switch case util method? + Map> iocTypeToIndices = new HashMap<>(); + for (String iocType : iocTypeToIndices.keySet()) { + if (iocTypeToIndices.get(iocType).isEmpty()) + continue; + List strings = cumulativeIocTypeToIndices.computeIfAbsent(iocType, k -> new ArrayList<>()); + strings.addAll(iocTypeToIndices.get(iocType)); } - } else { - // If there are multiple threat intel source configs left, proceed with deletion - log.debug("Multiple threat intel source configs exist, threat intel monitors do not need to be deleted"); - listener.onResponse(true); } - }, e -> { - log.error("Failed to search for threat intel source configs"); + listener.onResponse(cumulativeIocTypeToIndices); + }, + e -> { + log.error("Failed to fetch ioc indices", e); listener.onFailure(e); } )); - } + public void searchTIFSourceConfigs( + final SearchRequest searchRequest, + final ActionListener actionListener + ) { + try { + client.search(searchRequest, ActionListener.wrap( + searchResponse -> { + if (searchResponse.isTimedOut()) { + actionListener.onFailure(SecurityAnalyticsException.wrap(new OpenSearchStatusException("Search threat intel source configs request timed out", RestStatus.REQUEST_TIMEOUT))); + return; + } + + log.debug("Fetched all threat intel source configs successfully."); + actionListener.onResponse(searchResponse); + }, e -> { + if(e instanceof IndexNotFoundException) { + log.info("TIF config Index not created. Returning empty search response"); + actionListener.onResponse(DetectorUtils.getEmptySearchResponse()); + return; + } + log.error("Failed to fetch all threat intel source configs", e); + actionListener.onFailure(e); + }) + ); + } catch (Exception e) { + log.error("Failed to fetch all threat intel source configs ", e); + actionListener.onFailure(e); + } + } } diff --git a/src/main/java/org/opensearch/securityanalytics/threatIntel/transport/monitor/TransportIndexThreatIntelMonitorAction.java b/src/main/java/org/opensearch/securityanalytics/threatIntel/transport/monitor/TransportIndexThreatIntelMonitorAction.java index 7a0cb390f..ac92e1c65 100644 --- a/src/main/java/org/opensearch/securityanalytics/threatIntel/transport/monitor/TransportIndexThreatIntelMonitorAction.java +++ b/src/main/java/org/opensearch/securityanalytics/threatIntel/transport/monitor/TransportIndexThreatIntelMonitorAction.java @@ -49,7 +49,7 @@ import java.util.List; import java.util.stream.Collectors; -import static org.opensearch.securityanalytics.threatIntel.model.monitor.SampleRemoteDocLevelMonitorRunner.THREAT_INTEL_MONITOR_TYPE; +import static org.opensearch.securityanalytics.threatIntel.model.monitor.ThreatIntelMonitorRunner.THREAT_INTEL_MONITOR_TYPE; import static org.opensearch.securityanalytics.transport.TransportIndexDetectorAction.PLUGIN_OWNER_FIELD; public class TransportIndexThreatIntelMonitorAction extends HandledTransportAction implements SecureTransportAction { @@ -109,7 +109,7 @@ protected void doExecute(Task task, IndexThreatIntelMonitorRequest request, Acti } )); } catch (Exception e) { - log.error(() -> new ParameterizedMessage("Unexpected failure while indexing threat intel monitor {} named {}", request.getId(), request.getThreatIntelMonitor().getName())); + log.error(() -> new ParameterizedMessage("Unexpected failure while indexing threat intel monitor {} named {}", request.getId(), request.getMonitor().getName())); listener.onFailure(new SecurityAnalyticsException("Unexpected failure while indexing threat intel monitor", RestStatus.INTERNAL_SERVER_ERROR, e)); } } @@ -136,11 +136,11 @@ private IndexMonitorRequest buildIndexMonitorRequest(IndexThreatIntelMonitorRequ private Monitor buildThreatIntelMonitor(IndexThreatIntelMonitorRequest request) throws IOException { //TODO replace with threat intel monitor DocLevelMonitorInput docLevelMonitorInput = new DocLevelMonitorInput( - String.format("threat intel input for monitor named %s", request.getThreatIntelMonitor().getName()), - request.getThreatIntelMonitor().getIndices(), + String.format("threat intel input for monitor named %s", request.getMonitor().getName()), + request.getMonitor().getIndices(), Collections.emptyList() // no percolate queries ); - List perIocTypeScanInputs = request.getThreatIntelMonitor().getPerIocTypeScanInputList().stream().map( + List perIocTypeScanInputs = request.getMonitor().getPerIocTypeScanInputList().stream().map( it -> new PerIocTypeScanInput(it.getIocType(), it.getIndexToFieldsMap()) ).collect(Collectors.toList()); ThreatIntelInput threatIntelInput = new ThreatIntelInput(perIocTypeScanInputs); @@ -148,7 +148,7 @@ private Monitor buildThreatIntelMonitor(IndexThreatIntelMonitorRequest request) threatIntelInput.getThreatIntelInputAsBytesReference(), docLevelMonitorInput); List triggers = new ArrayList<>(); - for (ThreatIntelTriggerDto it : request.getThreatIntelMonitor().getTriggers()) { + for (ThreatIntelTriggerDto it : request.getMonitor().getTriggers()) { try { RemoteMonitorTrigger trigger = ThreatIntelMonitorUtils.buildRemoteMonitorTrigger(it); triggers.add(trigger); @@ -160,13 +160,13 @@ private Monitor buildThreatIntelMonitor(IndexThreatIntelMonitorRequest request) return new Monitor( request.getMethod() == RestRequest.Method.POST ? Monitor.NO_ID : request.getId(), Monitor.NO_VERSION, - StringUtils.isBlank(request.getThreatIntelMonitor().getName()) ? "threat_intel_monitor" : request.getThreatIntelMonitor().getName(), - request.getThreatIntelMonitor().isEnabled(), - request.getThreatIntelMonitor().getSchedule(), + StringUtils.isBlank(request.getMonitor().getName()) ? "threat_intel_monitor" : request.getMonitor().getName(), + request.getMonitor().isEnabled(), + request.getMonitor().getSchedule(), Instant.now(), - request.getThreatIntelMonitor().isEnabled() ? Instant.now() : null, + request.getMonitor().isEnabled() ? Instant.now() : null, THREAT_INTEL_MONITOR_TYPE, - request.getThreatIntelMonitor().getUser(), + request.getMonitor().getUser(), 1, List.of(remoteDocLevelMonitorInput), triggers, diff --git a/src/main/java/org/opensearch/securityanalytics/threatIntel/util/ThreatIntelMonitorUtils.java b/src/main/java/org/opensearch/securityanalytics/threatIntel/util/ThreatIntelMonitorUtils.java index 81f148d88..3ce7bf3b2 100644 --- a/src/main/java/org/opensearch/securityanalytics/threatIntel/util/ThreatIntelMonitorUtils.java +++ b/src/main/java/org/opensearch/securityanalytics/threatIntel/util/ThreatIntelMonitorUtils.java @@ -7,6 +7,7 @@ import org.opensearch.commons.alerting.model.remote.monitors.RemoteDocLevelMonitorInput; import org.opensearch.commons.alerting.model.remote.monitors.RemoteMonitorTrigger; import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.core.xcontent.XContentBuilder; @@ -64,17 +65,16 @@ public static ThreatIntelTrigger getThreatIntelTriggerFromBytesReference(RemoteM return ThreatIntelTrigger.parse(parser); } - public static ThreatIntelInput getThreatIntelInputFromBytesReference(RemoteDocLevelMonitorInput input, NamedXContentRegistry namedXContentRegistry) throws IOException { - String inputBytes = BytesReference.bytes(input.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS)).utf8ToString(); - XContentParser parser = XContentType.JSON.xContent().createParser(namedXContentRegistry, LoggingDeprecationHandler.INSTANCE, inputBytes); - parser.nextToken(); - return ThreatIntelInput.parse(parser); + public static ThreatIntelInput getThreatIntelInputFromBytesReference(BytesReference bytes, NamedXContentRegistry namedXContentRegistry) throws IOException { + StreamInput sin = StreamInput.wrap(bytes.toBytesRef().bytes); + ThreatIntelInput threatIntelInput = new ThreatIntelInput(sin); + return threatIntelInput; } public static ThreatIntelMonitorDto buildThreatIntelMonitorDto(String id, Monitor monitor, NamedXContentRegistry namedXContentRegistry) throws IOException { - RemoteDocLevelMonitorInput input = (RemoteDocLevelMonitorInput) monitor.getInputs().get(0); - List indices = input.getDocLevelMonitorInput().getIndices(); - ThreatIntelInput threatIntelInput = getThreatIntelInputFromBytesReference(input, namedXContentRegistry); + RemoteDocLevelMonitorInput remoteDocLevelMonitorInput = (RemoteDocLevelMonitorInput) monitor.getInputs().get(0); + List indices = remoteDocLevelMonitorInput.getDocLevelMonitorInput().getIndices(); + ThreatIntelInput threatIntelInput = getThreatIntelInputFromBytesReference(remoteDocLevelMonitorInput.getInput(), namedXContentRegistry); return new ThreatIntelMonitorDto( id, monitor.getName(), diff --git a/src/main/java/org/opensearch/securityanalytics/transport/TransportListIOCsAction.java b/src/main/java/org/opensearch/securityanalytics/transport/TransportListIOCsAction.java index 7737b0c08..ded07c9d5 100644 --- a/src/main/java/org/opensearch/securityanalytics/transport/TransportListIOCsAction.java +++ b/src/main/java/org/opensearch/securityanalytics/transport/TransportListIOCsAction.java @@ -45,7 +45,6 @@ import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -121,11 +120,10 @@ void start() { } - SortBuilder sortBuilder = SortBuilders .fieldSort(STIX2_IOC_NESTED_PATH + request.getSortString()) .order(SortOrder.fromString(request.getSortOrder().toString())); - + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder() .version(true) .seqNoAndPrimaryTerm(true) diff --git a/src/test/java/org/opensearch/securityanalytics/SecurityAnalyticsIntegTestCase.java b/src/test/java/org/opensearch/securityanalytics/SecurityAnalyticsIntegTestCase.java new file mode 100644 index 000000000..a85196663 --- /dev/null +++ b/src/test/java/org/opensearch/securityanalytics/SecurityAnalyticsIntegTestCase.java @@ -0,0 +1,23 @@ +package org.opensearch.securityanalytics; + +import org.opensearch.plugins.Plugin; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.util.Arrays; +import java.util.Collection; + +public class SecurityAnalyticsIntegTestCase extends OpenSearchIntegTestCase { + + @Override + protected Collection> nodePlugins() { + return Arrays.asList( + SecurityAnalyticsPlugin.class + ); + } + + @Override + protected boolean ignoreExternalCluster() { + return true; + } + +} diff --git a/src/test/java/org/opensearch/securityanalytics/TestHelpers.java b/src/test/java/org/opensearch/securityanalytics/TestHelpers.java index 77b7c10ab..7539fdd8f 100644 --- a/src/test/java/org/opensearch/securityanalytics/TestHelpers.java +++ b/src/test/java/org/opensearch/securityanalytics/TestHelpers.java @@ -6,19 +6,19 @@ import com.carrotsearch.randomizedtesting.generators.RandomNumbers; import org.apache.lucene.tests.util.LuceneTestCase; -import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.common.xcontent.LoggingDeprecationHandler; -import org.opensearch.core.xcontent.NamedXContentRegistry; -import org.opensearch.core.xcontent.ToXContent; import org.opensearch.common.xcontent.XContentFactory; -import org.opensearch.core.xcontent.XContentBuilder; -import org.opensearch.core.xcontent.XContentParser; import org.opensearch.common.xcontent.XContentType; import org.opensearch.commons.alerting.model.IntervalSchedule; import org.opensearch.commons.alerting.model.Schedule; import org.opensearch.commons.alerting.model.action.Action; import org.opensearch.commons.alerting.model.action.Throttle; import org.opensearch.commons.authuser.User; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; import org.opensearch.script.Script; import org.opensearch.script.ScriptType; import org.opensearch.securityanalytics.model.CorrelationQuery; @@ -807,7 +807,7 @@ public static String toJsonStringWithUser(Detector detector) throws IOException return BytesReference.bytes(builder).utf8ToString(); } - public static String toJsonString(IoCMatch iocMatch) throws IOException { + public static String toJsonString(IocMatch iocMatch) throws IOException { XContentBuilder builder = XContentFactory.jsonBuilder(); builder = iocMatch.toXContent(builder, ToXContent.EMPTY_PARAMS); return BytesReference.bytes(builder).utf8ToString(); diff --git a/src/test/java/org/opensearch/securityanalytics/model/IoCMatchTests.java b/src/test/java/org/opensearch/securityanalytics/model/IocMatchTests.java similarity index 89% rename from src/test/java/org/opensearch/securityanalytics/model/IoCMatchTests.java rename to src/test/java/org/opensearch/securityanalytics/model/IocMatchTests.java index 4b56c7eb5..298d3f7e4 100644 --- a/src/test/java/org/opensearch/securityanalytics/model/IoCMatchTests.java +++ b/src/test/java/org/opensearch/securityanalytics/model/IocMatchTests.java @@ -5,6 +5,7 @@ import org.opensearch.common.xcontent.XContentType; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.securityanalytics.model.threatintel.IocMatch; import org.opensearch.test.OpenSearchTestCase; import java.io.IOException; @@ -13,15 +14,15 @@ import static org.opensearch.securityanalytics.TestHelpers.toJsonString; -public class IoCMatchTests extends OpenSearchTestCase { +public class IocMatchTests extends OpenSearchTestCase { public void testIoCMatchAsAStream() throws IOException { - IoCMatch iocMatch = getRandomIoCMatch(); + IocMatch iocMatch = getRandomIoCMatch(); String jsonString = toJsonString(iocMatch); BytesStreamOutput out = new BytesStreamOutput(); iocMatch.writeTo(out); StreamInput sin = StreamInput.wrap(out.bytes().toBytesRef().bytes); - IoCMatch newIocMatch = new IoCMatch(sin); + IocMatch newIocMatch = new IocMatch(sin); assertEquals(iocMatch.getId(), newIocMatch.getId()); assertEquals(iocMatch.getIocScanJobId(), newIocMatch.getIocScanJobId()); assertEquals(iocMatch.getIocScanJobName(), newIocMatch.getIocScanJobName()); @@ -38,11 +39,11 @@ public void testIoCMatchParse() throws IOException { "\"relatedDocId2\"], \"feed_ids\": [\"feedId1\", \"feedId2\"], \"ioc_scan_job_id\":" + " \"scanJob123\", \"ioc_scan_job_name\": \"Example Scan Job\", \"ioc_value\": \"exampleIocValue\", " + "\"ioc_type\": \"exampleIocType\", \"timestamp\": 1620912896000, \"execution_id\": \"execution123\" }"; - IoCMatch iocMatch = IoCMatch.parse((getParser(iocMatchString))); + IocMatch iocMatch = IocMatch.parse((getParser(iocMatchString))); BytesStreamOutput out = new BytesStreamOutput(); iocMatch.writeTo(out); StreamInput sin = StreamInput.wrap(out.bytes().toBytesRef().bytes); - IoCMatch newIocMatch = new IoCMatch(sin); + IocMatch newIocMatch = new IocMatch(sin); assertEquals(iocMatch.getId(), newIocMatch.getId()); assertEquals(iocMatch.getIocScanJobId(), newIocMatch.getIocScanJobId()); assertEquals(iocMatch.getIocScanJobName(), newIocMatch.getIocScanJobName()); @@ -61,8 +62,8 @@ public XContentParser getParser(String xc) throws IOException { } - private static IoCMatch getRandomIoCMatch() { - return new IoCMatch( + private static IocMatch getRandomIoCMatch() { + return new IocMatch( randomAlphaOfLength(10), List.of(randomAlphaOfLength(10), randomAlphaOfLength(10)), List.of(randomAlphaOfLength(10), randomAlphaOfLength(10)), diff --git a/src/test/java/org/opensearch/securityanalytics/resthandler/ThreatIntelMonitorRestApiIT.java b/src/test/java/org/opensearch/securityanalytics/resthandler/ThreatIntelMonitorRestApiIT.java index 9261fc383..0f612e587 100644 --- a/src/test/java/org/opensearch/securityanalytics/resthandler/ThreatIntelMonitorRestApiIT.java +++ b/src/test/java/org/opensearch/securityanalytics/resthandler/ThreatIntelMonitorRestApiIT.java @@ -6,6 +6,7 @@ import org.apache.logging.log4j.Logger; import org.junit.Assert; import org.opensearch.client.Response; +import org.opensearch.commons.alerting.model.IntervalSchedule; import org.opensearch.commons.alerting.model.Monitor; import org.opensearch.securityanalytics.SecurityAnalyticsPlugin; import org.opensearch.securityanalytics.SecurityAnalyticsRestTestCase; @@ -32,7 +33,7 @@ public void testCreateThreatIntelMonitor() throws IOException { String monitorName = "test_monitor_name"; ThreatIntelMonitorDto iocScanMonitor = randomIocScanMonitorDto(index); - Response response = makeRequest(client(), "POST", SecurityAnalyticsPlugin.THREAT_INTEL_MONITOR_URI, Collections.emptyMap(), toHttpEntity(iocScanMonitor)); + Response response = makeRequest(client(), "POST", SecurityAnalyticsPlugin.THREAT_INTEL_MONITOR_URI, Collections.emptyMap(), toHttpEntity(iocScanMonitor)); Assert.assertEquals(201, response.getStatusLine().getStatusCode()); Map responseBody = asMap(response); @@ -41,7 +42,8 @@ public void testCreateThreatIntelMonitor() throws IOException { Response alertingMonitorResponse = getAlertingMonitor(client(), monitorId); Assert.assertEquals(200, alertingMonitorResponse.getStatusLine().getStatusCode()); - + String doc = "{\"ip\":\"123\", \"ip1\":\"123\"}"; + indexDoc(index, "1", doc); Response executeResponse = executeAlertingMonitor(monitorId, Collections.emptyMap()); Map executeResults = entityAsMap(executeResponse); assertEquals(1, 1); @@ -75,9 +77,9 @@ private ThreatIntelMonitorDto randomIocScanMonitorDto(String index) { return new ThreatIntelMonitorDto( Monitor.NO_ID, randomAlphaOfLength(10), - List.of(new PerIocTypeScanInputDto("IP", Map.of("abc", List.of("abc")))), - new org.opensearch.commons.alerting.model.IntervalSchedule(1, ChronoUnit.MINUTES, Instant.now()), - true, + List.of(new PerIocTypeScanInputDto("IP", Map.of(index, List.of("ip")))), + new IntervalSchedule(1, ChronoUnit.MINUTES, Instant.now()), + false, //todo change to true after testing null, List.of(index), Collections.emptyList()); } diff --git a/src/test/java/org/opensearch/securityanalytics/threatIntel/iocscan/dao/IocMatchServiceIT.java b/src/test/java/org/opensearch/securityanalytics/threatIntel/iocscan/dao/IocMatchServiceIT.java new file mode 100644 index 000000000..fc24b5b76 --- /dev/null +++ b/src/test/java/org/opensearch/securityanalytics/threatIntel/iocscan/dao/IocMatchServiceIT.java @@ -0,0 +1,67 @@ +package org.opensearch.securityanalytics.threatIntel.iocscan.dao; + +import org.opensearch.action.LatchedActionListener; +import org.opensearch.action.StepListener; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.common.util.concurrent.CountDown; +import org.opensearch.core.action.ActionListener; +import org.opensearch.securityanalytics.SecurityAnalyticsIntegTestCase; +import org.opensearch.securityanalytics.model.threatintel.IocMatch; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; + + +public class IocMatchServiceIT extends SecurityAnalyticsIntegTestCase { + + public void test_indexIocMatches() throws InterruptedException { + IocMatchService service = new IocMatchService(client(), clusterService()); + List iocMatches = generateIocMatches(10); + CountDown countdown = new CountDown(1); + service.indexIocMatches(iocMatches, ActionListener.wrap(r -> { + countdown.countDown(); + }, e -> { + logger.error("failed to index ioc matches", e); + fail(); + countdown.countDown(); + })); + SearchRequest request = new SearchRequest(IocMatchService.INDEX_NAME); + request.source().size(10); + CountDown countDownLatch1 = new CountDown(1); + client().search(request, ActionListener.wrap( + response -> { + assertEquals(response.getHits().getHits().length, 10); + countDownLatch1.countDown(); + }, + e -> { + logger.error("failed to search indexed ioc matches", e); + fail(); + countDownLatch1.countDown(); + } + + )); + countDownLatch1.isCountedDown(); + } + + private List generateIocMatches(int i) { + List iocMatches = new ArrayList<>(); + String monitorId = randomAlphaOfLength(10); + String monitorName = randomAlphaOfLength(10); + for (int i1 = 0; i1 < i; i1++) { + iocMatches.add(new IocMatch( + randomAlphaOfLength(10), + randomList(1, 10, () -> randomAlphaOfLength(10)),//docids + randomList(1, 10, () -> randomAlphaOfLength(10)), //feedids + monitorId, + monitorName, + randomAlphaOfLength(10), + "IP", + Instant.now(), + randomAlphaOfLength(10) + )); + } + return iocMatches; + } +} \ No newline at end of file diff --git a/src/test/java/org/opensearch/securityanalytics/threatIntel/model/monitor/ThreatIntelInputTests.java b/src/test/java/org/opensearch/securityanalytics/threatIntel/model/monitor/ThreatIntelInputTests.java index 925df5e15..95eaafc6a 100644 --- a/src/test/java/org/opensearch/securityanalytics/threatIntel/model/monitor/ThreatIntelInputTests.java +++ b/src/test/java/org/opensearch/securityanalytics/threatIntel/model/monitor/ThreatIntelInputTests.java @@ -25,7 +25,7 @@ import static java.util.Collections.emptyList; import static java.util.Collections.emptyMap; -import static org.opensearch.securityanalytics.threatIntel.model.monitor.SampleRemoteDocLevelMonitorRunner.THREAT_INTEL_MONITOR_TYPE; +import static org.opensearch.securityanalytics.threatIntel.model.monitor.ThreatIntelMonitorRunner.THREAT_INTEL_MONITOR_TYPE; public class ThreatIntelInputTests extends OpenSearchTestCase {