From f6cbebedab450290a2e31bade44dee01545d327a Mon Sep 17 00:00:00 2001 From: Joanne Wang Date: Wed, 4 Oct 2023 18:25:48 -0700 Subject: [PATCH] updated job scheduler and ThreatIntelFeedDataService --- .../SecurityAnalyticsPlugin.java | 2 +- .../monitors/opensearch_security.policy | 4 +- .../ThreatIntelFeedDataService.java | 248 ++++++++++++- .../threatIntel/common/Constants.java | 9 + .../DeleteDatasourceTransportAction.java | 10 +- .../common/DatasourceManifest.java | 45 +-- .../threatintel/dao/ThreatIntelFeedDao.java | 349 ------------------ .../threatintel/jobscheduler/Datasource.java | 4 +- .../jobscheduler/DatasourceRunner.java | 5 +- .../jobscheduler/DatasourceUpdateService.java | 34 +- .../securityanalytics/TestHelpers.java | 2 +- 11 files changed, 304 insertions(+), 408 deletions(-) create mode 100644 src/main/java/org/opensearch/securityanalytics/threatIntel/common/Constants.java delete mode 100644 src/main/java/org/opensearch/securityanalytics/threatintel/dao/ThreatIntelFeedDao.java diff --git a/src/main/java/org/opensearch/securityanalytics/SecurityAnalyticsPlugin.java b/src/main/java/org/opensearch/securityanalytics/SecurityAnalyticsPlugin.java index ccf2f44ab..33808b445 100644 --- a/src/main/java/org/opensearch/securityanalytics/SecurityAnalyticsPlugin.java +++ b/src/main/java/org/opensearch/securityanalytics/SecurityAnalyticsPlugin.java @@ -137,7 +137,7 @@ public Collection createComponents(Client client, mapperService = new MapperService(client, clusterService, indexNameExpressionResolver, indexTemplateManager, logTypeService); ruleIndices = new RuleIndices(logTypeService, client, clusterService, threadPool); correlationRuleIndices = new CorrelationRuleIndices(client, clusterService); - ThreatIntelFeedDataService threatIntelFeedDataService = new ThreatIntelFeedDataService(clusterService.state(), client, indexNameExpressionResolver, xContentRegistry); + ThreatIntelFeedDataService threatIntelFeedDataService = new 
ThreatIntelFeedDataService(clusterService.state(), clusterService, client, indexNameExpressionResolver, xContentRegistry); DetectorThreatIntelService detectorThreatIntelService = new DetectorThreatIntelService(threatIntelFeedDataService); this.client = client; diff --git a/src/main/java/org/opensearch/securityanalytics/config/monitors/opensearch_security.policy b/src/main/java/org/opensearch/securityanalytics/config/monitors/opensearch_security.policy index eb8697803..c5af78398 100644 --- a/src/main/java/org/opensearch/securityanalytics/config/monitors/opensearch_security.policy +++ b/src/main/java/org/opensearch/securityanalytics/config/monitors/opensearch_security.policy @@ -1,3 +1,3 @@ grant { - permission java.lang.management.ManagementPermission "connect,resolve"; -}; + permission java.lang.management.ManagementPermission "reputation.alienvault.com:443" "connect,resolve"; +}; \ No newline at end of file diff --git a/src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java b/src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java index 91d156003..351572470 100644 --- a/src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java +++ b/src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java @@ -1,39 +1,106 @@ package org.opensearch.securityanalytics.threatIntel; +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.CSVParser; +import org.apache.commons.csv.CSVRecord; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.OpenSearchException; +import org.opensearch.SpecialPermission; +import org.opensearch.action.DocWriteRequest; +import org.opensearch.action.admin.indices.create.CreateIndexRequest; +import org.opensearch.action.bulk.BulkRequest; +import org.opensearch.action.bulk.BulkResponse; 
+import org.opensearch.action.index.IndexRequest; import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchResponse; +import org.opensearch.action.support.IndicesOptions; +import org.opensearch.action.support.master.AcknowledgedResponse; import org.opensearch.client.Client; +import org.opensearch.client.Requests; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.SuppressForbidden; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.unit.TimeValue; import org.opensearch.common.xcontent.LoggingDeprecationHandler; +import org.opensearch.common.xcontent.XContentFactory; import org.opensearch.common.xcontent.XContentType; import org.opensearch.core.action.ActionListener; +import org.opensearch.core.rest.RestStatus; import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParser; import org.opensearch.index.query.QueryBuilders; import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.securityanalytics.findings.FindingsService; import org.opensearch.securityanalytics.model.ThreatIntelFeedData; +import org.opensearch.securityanalytics.threatIntel.common.DatasourceManifest; +import org.opensearch.securityanalytics.threatIntel.common.StashedThreadContext; +import org.opensearch.securityanalytics.threatIntel.common.ThreatIntelSettings; +import org.opensearch.securityanalytics.threatIntel.dao.DatasourceDao; import org.opensearch.securityanalytics.util.IndexUtils; +import org.opensearch.securityanalytics.util.SecurityAnalyticsException; +import org.opensearch.securityanalytics.threatIntel.common.Constants; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; +import 
java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.net.URL; +import java.net.URLConnection; +import java.nio.charset.StandardCharsets; +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.util.*; +import java.util.stream.Collectors; +import java.util.zip.ZipEntry; +import java.util.zip.ZipInputStream; + +import static org.opensearch.securityanalytics.threatIntel.jobscheduler.Datasource.THREAT_INTEL_DATA_INDEX_NAME_PREFIX; /** * Service to handle CRUD operations on Threat Intel Feed Data */ public class ThreatIntelFeedDataService { private static final Logger log = LogManager.getLogger(FindingsService.class); + private static final String SCHEMA_VERSION = "schema_version"; + private static final String IOC_TYPE = "ioc_type"; + private static final String IOC_VALUE = "ioc_value"; + private static final String FEED_ID = "feed_id"; + private static final String TIMESTAMP = "timestamp"; + private static final String TYPE = "type"; + private static final String DATA_FIELD_NAME = "_data"; + private final ClusterState state; private final Client client; private final IndexNameExpressionResolver indexNameExpressionResolver; + private static final Map INDEX_SETTING_TO_CREATE = Map.of( + "index.number_of_shards", + 1, + "index.number_of_replicas", + 0, + "index.refresh_interval", + -1, + "index.hidden", + true + ); + private static final Map INDEX_SETTING_TO_FREEZE = Map.of( + "index.auto_expand_replicas", + "0-all", + "index.blocks.write", + true + ); + private final ClusterService clusterService; + private final ClusterSettings clusterSettings; + public ThreatIntelFeedDataService( ClusterState state, + ClusterService clusterService, Client client, IndexNameExpressionResolver indexNameExpressionResolver, NamedXContentRegistry xContentRegistry) { @@ -41,6 +108,8 @@ public ThreatIntelFeedDataService( this.client = client; this.indexNameExpressionResolver = 
indexNameExpressionResolver; this.xContentRegistry = xContentRegistry; + this.clusterService = clusterService; + this.clusterSettings = clusterService.getClusterSettings(); } private final NamedXContentRegistry xContentRegistry; @@ -52,7 +121,7 @@ public void getThreatIntelFeedData( String tifdIndex = IndexUtils.getNewIndexByCreationDate( this.state, this.indexNameExpressionResolver, - ".opendsearch-sap-threatintel*" + ".opensearch-sap-threatintel*" //name? ); SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); sourceBuilder.query(QueryBuilders.boolQuery().filter(QueryBuilders.termQuery("ioc_type", iocType))); @@ -87,4 +156,175 @@ private List getTifdList(SearchResponse searchResponse) { } return list; } + + /** + * Create an index for a threat intel feed + * + * Index setting start with single shard, zero replica, no refresh interval, and hidden. + * Once the threat intel feed is indexed, do refresh and force merge. + * Then, change the index setting to expand replica to all nodes, and read only allow delete. 
+ * + * @param indexName index name + */ + public void createIndexIfNotExists(final String indexName) { + if (clusterService.state().metadata().hasIndex(indexName) == true) { + return; + } + final CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName).settings(INDEX_SETTING_TO_CREATE) + .mapping(getIndexMapping()); + StashedThreadContext.run( + client, + () -> client.admin().indices().create(createIndexRequest).actionGet(clusterSettings.get(ThreatIntelSettings.THREAT_INTEL_TIMEOUT)) + ); + } + + private void freezeIndex(final String indexName) { + TimeValue timeout = clusterSettings.get(ThreatIntelSettings.THREAT_INTEL_TIMEOUT); + StashedThreadContext.run(client, () -> { + client.admin().indices().prepareForceMerge(indexName).setMaxNumSegments(1).execute().actionGet(timeout); + client.admin().indices().prepareRefresh(indexName).execute().actionGet(timeout); + client.admin() + .indices() + .prepareUpdateSettings(indexName) + .setSettings(INDEX_SETTING_TO_FREEZE) + .execute() + .actionGet(clusterSettings.get(ThreatIntelSettings.THREAT_INTEL_TIMEOUT)); + }); + } + + private String getIndexMapping() { + try { + try (InputStream is = DatasourceDao.class.getResourceAsStream("/mappings/threat_intel_feed_mapping.json")) { // TODO: check Datasource dao and this mapping + try (BufferedReader reader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) { + return reader.lines().map(String::trim).collect(Collectors.joining()); + } + } + } catch (IOException e) { + log.error("Runtime exception when getting the threat intel index mapping", e); + throw new SecurityAnalyticsException("Runtime exception when getting the threat intel index mapping", RestStatus.INTERNAL_SERVER_ERROR, e); + } + } + + /** + * Create CSVParser of a threat intel feed + * + * @param manifest Datasource manifest + * @return CSVParser for threat intel feed + */ + @SuppressForbidden(reason = "Need to connect to http endpoint to read threat intel feed database file") + 
public CSVParser getDatabaseReader(final DatasourceManifest manifest) { + SpecialPermission.check(); + return AccessController.doPrivileged((PrivilegedAction) () -> { + try { + URL url = new URL(manifest.getUrl()); + return internalGetDatabaseReader(manifest, url.openConnection()); + } catch (IOException e) { + log.error("Exception: failed to read threat intel feed data from {}",manifest.getUrl(), e); + throw new OpenSearchException("failed to read threat intel feed data from {}", manifest.getUrl(), e); + } + }); + } + + @SuppressForbidden(reason = "Need to connect to http endpoint to read threat intel feed database file") // TODO: update this function because no zip file... + protected CSVParser internalGetDatabaseReader(final DatasourceManifest manifest, final URLConnection connection) throws IOException { + connection.addRequestProperty(Constants.USER_AGENT_KEY, Constants.USER_AGENT_VALUE); + ZipInputStream zipIn = new ZipInputStream(connection.getInputStream()); + ZipEntry zipEntry = zipIn.getNextEntry(); + while (zipEntry != null) { + if (zipEntry.getName().equalsIgnoreCase(manifest.getDbName()) == false) { + zipEntry = zipIn.getNextEntry(); + continue; + } + return new CSVParser(new BufferedReader(new InputStreamReader(zipIn)), CSVFormat.RFC4180); + } + throw new IllegalArgumentException( + String.format(Locale.ROOT, "database file [%s] does not exist in the zip file [%s]", manifest.getDbName(), manifest.getUrl()) + ); + } + + /** + * Puts threat intel feed from CSVRecord iterator into a given index in bulk + * + * @param indexName Index name to puts the TIF data + * @param fields Field name matching with data in CSVRecord in order + * @param iterator TIF data to insert + * @param renewLock Runnable to renew lock + */ + public void saveThreatIntelFeedData( + final String indexName, + final String[] fields, + final Iterator iterator, + final Runnable renewLock +// final ThreatIntelFeedData threatIntelFeedData + ) throws IOException { + if (indexName == null || 
fields == null || iterator == null || renewLock == null){ + throw new IllegalArgumentException("Fields cannot be null"); + } + + TimeValue timeout = clusterSettings.get(ThreatIntelSettings.THREAT_INTEL_TIMEOUT); + Integer batchSize = clusterSettings.get(ThreatIntelSettings.BATCH_SIZE); + final BulkRequest bulkRequest = new BulkRequest(); + Queue requests = new LinkedList<>(); + for (int i = 0; i < batchSize; i++) { + requests.add(Requests.indexRequest(indexName)); + } + while (iterator.hasNext()) { + CSVRecord record = iterator.next(); +// XContentBuilder tifData = threatIntelFeedData.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS); + IndexRequest indexRequest = (IndexRequest) requests.poll(); +// indexRequest.source(tifData); + indexRequest.id(record.get(0)); + bulkRequest.add(indexRequest); + if (iterator.hasNext() == false || bulkRequest.requests().size() == batchSize) { + BulkResponse response = StashedThreadContext.run(client, () -> client.bulk(bulkRequest).actionGet(timeout)); + if (response.hasFailures()) { + throw new OpenSearchException( + "error occurred while ingesting threat intel feed data in {} with an error {}", + indexName, + response.buildFailureMessage() + ); + } + requests.addAll(bulkRequest.requests()); + bulkRequest.requests().clear(); + } + renewLock.run(); + } + freezeIndex(indexName); + } + + public void deleteThreatIntelDataIndex(final String index) { + deleteThreatIntelDataIndex(Arrays.asList(index)); + } + + public void deleteThreatIntelDataIndex(final List indices) { + if (indices == null || indices.isEmpty()) { + return; + } + + Optional invalidIndex = indices.stream() + .filter(index -> index.startsWith(THREAT_INTEL_DATA_INDEX_NAME_PREFIX) == false) + .findAny(); + if (invalidIndex.isPresent()) { + throw new OpenSearchException( + "the index[{}] is not threat intel data index which should start with {}", + invalidIndex.get(), + THREAT_INTEL_DATA_INDEX_NAME_PREFIX + ); + } + + AcknowledgedResponse response = 
StashedThreadContext.run( + client, + () -> client.admin() + .indices() + .prepareDelete(indices.toArray(new String[0])) + .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN) + .execute() + .actionGet(clusterSettings.get(ThreatIntelSettings.THREAT_INTEL_TIMEOUT)) + ); + + if (response.isAcknowledged() == false) { + throw new OpenSearchException("failed to delete data[{}] in datasource", String.join(",", indices)); + } + } + } diff --git a/src/main/java/org/opensearch/securityanalytics/threatIntel/common/Constants.java b/src/main/java/org/opensearch/securityanalytics/threatIntel/common/Constants.java new file mode 100644 index 000000000..af31e7897 --- /dev/null +++ b/src/main/java/org/opensearch/securityanalytics/threatIntel/common/Constants.java @@ -0,0 +1,9 @@ +package org.opensearch.securityanalytics.threatIntel.common; + +import org.opensearch.Version; + +import java.util.Locale; +public class Constants { + public static final String USER_AGENT_KEY = "User-Agent"; + public static final String USER_AGENT_VALUE = String.format(Locale.ROOT, "OpenSearch/%s vanilla", Version.CURRENT.toString()); +} diff --git a/src/main/java/org/opensearch/securityanalytics/threatintel/action/DeleteDatasourceTransportAction.java b/src/main/java/org/opensearch/securityanalytics/threatintel/action/DeleteDatasourceTransportAction.java index d8515d40e..5ff65a945 100644 --- a/src/main/java/org/opensearch/securityanalytics/threatintel/action/DeleteDatasourceTransportAction.java +++ b/src/main/java/org/opensearch/securityanalytics/threatintel/action/DeleteDatasourceTransportAction.java @@ -21,7 +21,7 @@ import org.opensearch.securityanalytics.threatIntel.common.DatasourceState; import org.opensearch.securityanalytics.threatIntel.common.ThreatIntelLockService; import org.opensearch.securityanalytics.threatIntel.dao.DatasourceDao; -import org.opensearch.securityanalytics.threatIntel.dao.ThreatIntelFeedDao; +import 
org.opensearch.securityanalytics.threatIntel.ThreatIntelFeedDataService; import org.opensearch.securityanalytics.threatIntel.jobscheduler.Datasource; import org.opensearch.tasks.Task; import org.opensearch.threadpool.ThreadPool; @@ -39,7 +39,7 @@ public class DeleteDatasourceTransportAction extends HandledTransportAction { String url = (String) args[0]; String dbName = (String) args[1]; - String sha256Hash = (String) args[2]; - String organization = (String) args[4]; - String description = (String) args[5]; - Long updatedAt = (Long) args[3]; - return new DatasourceManifest(url, dbName, sha256Hash, organization, description, updatedAt); + return new DatasourceManifest(url, dbName); } ); static { PARSER.declareString(ConstructingObjectParser.constructorArg(), URL_FIELD); PARSER.declareString(ConstructingObjectParser.constructorArg(), DB_NAME_FIELD); - PARSER.declareString(ConstructingObjectParser.constructorArg(), SHA256_HASH_FIELD); - PARSER.declareString(ConstructingObjectParser.constructorArg(), ORGANIZATION_FIELD); - PARSER.declareString(ConstructingObjectParser.constructorArg(), DESCRIPTION_FIELD); - PARSER.declareLong(ConstructingObjectParser.constructorArg(), UPDATED_AT_FIELD); - } - - public String getUrl() { - return this.url; } /** * Datasource manifest builder */ public static class Builder { - private static final int MANIFEST_FILE_MAX_BYTES = 1024 * 8; //check this + private static final int MANIFEST_FILE_MAX_BYTES = 1024 * 8; /** * Build DatasourceManifest from a given url @@ -145,7 +134,7 @@ public static class Builder { * @param url url to downloads a manifest file * @return DatasourceManifest representing the manifest file */ - @SuppressForbidden(reason = "Need to connect to http endpoint to read manifest file") + @SuppressForbidden(reason = "Need to connect to http endpoint to read manifest file") // change permissions public static DatasourceManifest build(final URL url) { SpecialPermission.check(); return 
AccessController.doPrivileged((PrivilegedAction) () -> { @@ -153,7 +142,7 @@ public static DatasourceManifest build(final URL url) { URLConnection connection = url.openConnection(); return internalBuild(connection); } catch (IOException e) { - log.error("Runtime exception", e); + log.error("Runtime exception connecting to the manifest file", e); throw new SecurityAnalyticsException("Runtime exception", RestStatus.INTERNAL_SERVER_ERROR, e); //TODO } }); @@ -161,7 +150,7 @@ public static DatasourceManifest build(final URL url) { @SuppressForbidden(reason = "Need to connect to http endpoint to read manifest file") protected static DatasourceManifest internalBuild(final URLConnection connection) throws IOException { -// connection.addRequestProperty(Constants.USER_AGENT_KEY, Constants.USER_AGENT_VALUE); + connection.addRequestProperty(Constants.USER_AGENT_KEY, Constants.USER_AGENT_VALUE); InputStreamReader inputStreamReader = new InputStreamReader(connection.getInputStream()); try (BufferedReader reader = new BufferedReader(inputStreamReader)) { CharBuffer charBuffer = CharBuffer.allocate(MANIFEST_FILE_MAX_BYTES); diff --git a/src/main/java/org/opensearch/securityanalytics/threatintel/dao/ThreatIntelFeedDao.java b/src/main/java/org/opensearch/securityanalytics/threatintel/dao/ThreatIntelFeedDao.java deleted file mode 100644 index f0a4fa615..000000000 --- a/src/main/java/org/opensearch/securityanalytics/threatintel/dao/ThreatIntelFeedDao.java +++ /dev/null @@ -1,349 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.securityanalytics.threatIntel.dao; - -import static org.opensearch.securityanalytics.threatIntel.jobscheduler.Datasource.THREAT_INTEL_DATA_INDEX_NAME_PREFIX; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.net.URL; -import java.net.URLConnection; -import java.nio.charset.StandardCharsets; -import 
java.security.AccessController; -import java.security.PrivilegedAction; -import java.util.Arrays; -import java.util.Collections; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Optional; -import java.util.Queue; -import java.util.stream.Collectors; -import java.util.zip.ZipEntry; -import java.util.zip.ZipInputStream; - -import org.apache.commons.csv.CSVFormat; -import org.apache.commons.csv.CSVParser; -import org.apache.commons.csv.CSVRecord; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.util.Strings; -import org.opensearch.OpenSearchException; -import org.opensearch.SpecialPermission; -import org.opensearch.action.DocWriteRequest; -import org.opensearch.action.admin.indices.create.CreateIndexRequest; -import org.opensearch.action.bulk.BulkRequest; -import org.opensearch.action.bulk.BulkResponse; -import org.opensearch.action.index.IndexRequest; -import org.opensearch.action.search.SearchResponse; -import org.opensearch.action.support.IndicesOptions; -import org.opensearch.action.support.master.AcknowledgedResponse; -import org.opensearch.client.Client; -import org.opensearch.client.Requests; -import org.opensearch.cluster.routing.Preference; -import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.SuppressForbidden; -import org.opensearch.common.settings.ClusterSettings; -import org.opensearch.common.unit.TimeValue; -import org.opensearch.common.xcontent.XContentFactory; -import org.opensearch.common.xcontent.XContentHelper; -import org.opensearch.common.xcontent.XContentType; -import org.opensearch.core.rest.RestStatus; -import org.opensearch.core.xcontent.XContentBuilder; -import org.opensearch.securityanalytics.model.DetectorTrigger; -import org.opensearch.securityanalytics.threatIntel.common.DatasourceManifest; -import 
org.opensearch.securityanalytics.threatIntel.common.ThreatIntelSettings; - -import org.opensearch.securityanalytics.threatIntel.common.StashedThreadContext; -import org.opensearch.index.query.QueryBuilders; -import org.opensearch.securityanalytics.util.SecurityAnalyticsException; - -/** - * Data access object for threat intel feed data - */ -public class ThreatIntelFeedDao { - private static final Logger log = LogManager.getLogger(DetectorTrigger.class); - - private static final String IP_RANGE_FIELD_NAME = "_cidr"; - private static final String DATA_FIELD_NAME = "_data"; - private static final Map INDEX_SETTING_TO_CREATE = Map.of( - "index.number_of_shards", - 1, - "index.number_of_replicas", - 0, - "index.refresh_interval", - -1, - "index.hidden", - true - ); - private static final Map INDEX_SETTING_TO_FREEZE = Map.of( - "index.auto_expand_replicas", - "0-all", - "index.blocks.write", - true - ); - private final ClusterService clusterService; - private final ClusterSettings clusterSettings; - private final Client client; - - public ThreatIntelFeedDao(final ClusterService clusterService, final Client client) { - this.clusterService = clusterService; - this.clusterSettings = clusterService.getClusterSettings(); - this.client = client; - } - - /** - * Create an index for TIF data - * - * Index setting start with single shard, zero replica, no refresh interval, and hidden. - * Once the TIF data is indexed, do refresh and force merge. - * Then, change the index setting to expand replica to all nodes, and read only allow delete. 
- * See {@link #freezeIndex} - * - * @param indexName index name - */ - public void createIndexIfNotExists(final String indexName) { - if (clusterService.state().metadata().hasIndex(indexName) == true) { - return; - } - final CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName).settings(INDEX_SETTING_TO_CREATE) - .mapping(getIndexMapping()); - StashedThreadContext.run( - client, - () -> client.admin().indices().create(createIndexRequest).actionGet(clusterSettings.get(ThreatIntelSettings.THREAT_INTEL_TIMEOUT)) - ); - } - - private void freezeIndex(final String indexName) { - TimeValue timeout = clusterSettings.get(ThreatIntelSettings.THREAT_INTEL_TIMEOUT); - StashedThreadContext.run(client, () -> { - client.admin().indices().prepareForceMerge(indexName).setMaxNumSegments(1).execute().actionGet(timeout); - client.admin().indices().prepareRefresh(indexName).execute().actionGet(timeout); - client.admin() - .indices() - .prepareUpdateSettings(indexName) - .setSettings(INDEX_SETTING_TO_FREEZE) - .execute() - .actionGet(clusterSettings.get(ThreatIntelSettings.THREAT_INTEL_TIMEOUT)); - }); - } - - /** - * Generate XContentBuilder representing datasource database index mapping - * - * { - * "dynamic": false, - * "properties": { - * "_cidr": { - * "type": "ip_range", - * "doc_values": false - * } - * } - * } - * - * @return String representing datasource database index mapping - */ - private String getIndexMapping() { - try { - try (InputStream is = DatasourceDao.class.getResourceAsStream("/mappings/threat_intel_feed_mapping.json")) { - try (BufferedReader reader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) { - return reader.lines().map(String::trim).collect(Collectors.joining()); - } - } - } catch (IOException e) { - log.error("Runtime exception", e); - throw new SecurityAnalyticsException("Runtime exception", RestStatus.INTERNAL_SERVER_ERROR, e); //TODO - } - } - - /** - * Create CSVParser of a threat intel feed - * - * 
@param manifest Datasource manifest - * @return CSVParser for threat intel feed - */ - @SuppressForbidden(reason = "Need to connect to http endpoint to read threat intel feed database file") - public CSVParser getDatabaseReader(final DatasourceManifest manifest) { - SpecialPermission.check(); - return AccessController.doPrivileged((PrivilegedAction) () -> { - try { - URL zipUrl = new URL(manifest.getUrl()); - return internalGetDatabaseReader(manifest, zipUrl.openConnection()); - } catch (IOException e) { - log.error("Exception: failed to read threat intel feed data from {}",manifest.getUrl(), e); - throw new OpenSearchException("failed to read threat intel feed data from {}", manifest.getUrl(), e); - } - }); - } - - @SuppressForbidden(reason = "Need to connect to http endpoint to read threat intel feed database file") - protected CSVParser internalGetDatabaseReader(final DatasourceManifest manifest, final URLConnection connection) throws IOException { -// connection.addRequestProperty(Constants.USER_AGENT_KEY, Constants.USER_AGENT_VALUE); - ZipInputStream zipIn = new ZipInputStream(connection.getInputStream()); - ZipEntry zipEntry = zipIn.getNextEntry(); - while (zipEntry != null) { - if (zipEntry.getName().equalsIgnoreCase(manifest.getDbName()) == false) { - zipEntry = zipIn.getNextEntry(); - continue; - } - return new CSVParser(new BufferedReader(new InputStreamReader(zipIn)), CSVFormat.RFC4180); - } - throw new IllegalArgumentException( - String.format(Locale.ROOT, "database file [%s] does not exist in the zip file [%s]", manifest.getDbName(), manifest.getUrl()) - ); - } - - /** - * Create a document to ingest in datasource database index - * - * It assumes the first field as ip_range. The rest is added under data field. 
- * - * Document example - * { - * "_cidr":"1.0.0.1/25", - * "_data":{ - * "country": "USA", - * "city": "Seattle", - * "location":"13.23,42.12" - * } - * } - * - * @param fields a list of field name - * @param values a list of values - * @return Document in json string format - * @throws IOException the exception - */ - public XContentBuilder createDocument(final String[] fields, final String[] values) throws IOException { - if (fields.length != values.length) { - throw new OpenSearchException("header[{}] and record[{}] length does not match", fields, values); - } - XContentBuilder builder = XContentFactory.jsonBuilder(); - builder.startObject(); - builder.field(IP_RANGE_FIELD_NAME, values[0]); - builder.startObject(DATA_FIELD_NAME); - for (int i = 1; i < fields.length; i++) { - if (Strings.isBlank(values[i])) { - continue; - } - builder.field(fields[i], values[i]); - } - builder.endObject(); - builder.endObject(); - builder.close(); - return builder; - } - - /** - * Query a given index using a given ip address to get TIF data - * - * @param indexName index - * @param ip ip address - * @return TIF data - */ - public Map getTIFData(final String indexName, final String ip) { - SearchResponse response = StashedThreadContext.run( - client, - () -> client.prepareSearch(indexName) - .setSize(1) - .setQuery(QueryBuilders.termQuery(IP_RANGE_FIELD_NAME, ip)) - .setPreference(Preference.LOCAL.type()) - .setRequestCache(true) - .get(clusterSettings.get(ThreatIntelSettings.THREAT_INTEL_TIMEOUT)) - ); - - if (response.getHits().getHits().length == 0) { - return Collections.emptyMap(); - } else { - return (Map) XContentHelper.convertToMap(response.getHits().getAt(0).getSourceRef(), false, XContentType.JSON) - .v2() - .get(DATA_FIELD_NAME); - } - } - - /** - * Puts TIF data from CSVRecord iterator into a given index in bulk - * - * @param indexName Index name to puts the TIF data - * @param fields Field name matching with data in CSVRecord in order - * @param iterator TIF data 
to insert - * @param renewLock Runnable to renew lock - */ - public void putTIFData( - final String indexName, //non null all of these fields - final String[] fields, - final Iterator iterator, - final Runnable renewLock - ) throws IOException { - TimeValue timeout = clusterSettings.get(ThreatIntelSettings.THREAT_INTEL_TIMEOUT); - Integer batchSize = clusterSettings.get(ThreatIntelSettings.BATCH_SIZE); - final BulkRequest bulkRequest = new BulkRequest(); - Queue requests = new LinkedList<>(); - for (int i = 0; i < batchSize; i++) { - requests.add(Requests.indexRequest(indexName)); - } - while (iterator.hasNext()) { - CSVRecord record = iterator.next(); - XContentBuilder document = createDocument(fields, record.values()); - IndexRequest indexRequest = (IndexRequest) requests.poll(); - indexRequest.source(document); - indexRequest.id(record.get(0)); - bulkRequest.add(indexRequest); - if (iterator.hasNext() == false || bulkRequest.requests().size() == batchSize) { - BulkResponse response = StashedThreadContext.run(client, () -> client.bulk(bulkRequest).actionGet(timeout)); - if (response.hasFailures()) { - throw new OpenSearchException( - "error occurred while ingesting threat intel feed data in {} with an error {}", - indexName, - response.buildFailureMessage() - ); - } - requests.addAll(bulkRequest.requests()); - bulkRequest.requests().clear(); - } - renewLock.run(); - } - freezeIndex(indexName); - } - - public void deleteThreatIntelDataIndex(final String index) { - deleteThreatIntelDataIndex(Arrays.asList(index)); - } - - public void deleteThreatIntelDataIndex(final List indices) { - if (indices == null || indices.isEmpty()) { - return; - } - - Optional invalidIndex = indices.stream() - .filter(index -> index.startsWith(THREAT_INTEL_DATA_INDEX_NAME_PREFIX) == false) - .findAny(); - if (invalidIndex.isPresent()) { - throw new OpenSearchException( - "the index[{}] is not threat intel data index which should start with {}", - invalidIndex.get(), - 
THREAT_INTEL_DATA_INDEX_NAME_PREFIX - ); - } - - AcknowledgedResponse response = StashedThreadContext.run( - client, - () -> client.admin() - .indices() - .prepareDelete(indices.toArray(new String[0])) - .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN) - .execute() - .actionGet(clusterSettings.get(ThreatIntelSettings.THREAT_INTEL_TIMEOUT)) - ); - - if (response.isAcknowledged() == false) { - throw new OpenSearchException("failed to delete data[{}] in datasource", String.join(",", indices)); - } - } -} diff --git a/src/main/java/org/opensearch/securityanalytics/threatintel/jobscheduler/Datasource.java b/src/main/java/org/opensearch/securityanalytics/threatintel/jobscheduler/Datasource.java index 948c6f0e0..00ff1d419 100644 --- a/src/main/java/org/opensearch/securityanalytics/threatintel/jobscheduler/Datasource.java +++ b/src/main/java/org/opensearch/securityanalytics/threatintel/jobscheduler/Datasource.java @@ -36,7 +36,7 @@ public class Datasource implements Writeable, ScheduledJobParameter { /** * Prefix of indices having threatIntel data */ - public static final String THREAT_INTEL_DATA_INDEX_NAME_PREFIX = ".opensearch-sap-threat-intel-config"; + public static final String THREAT_INTEL_DATA_INDEX_NAME_PREFIX = "opensearch-sap-threatintel"; /** * Default fields for job scheduling @@ -406,7 +406,7 @@ public Instant getEnabledTime() { } @Override - public Schedule getSchedule() { + public IntervalSchedule getSchedule() { return this.schedule; } diff --git a/src/main/java/org/opensearch/securityanalytics/threatintel/jobscheduler/DatasourceRunner.java b/src/main/java/org/opensearch/securityanalytics/threatintel/jobscheduler/DatasourceRunner.java index 2041d1ece..8de306d33 100644 --- a/src/main/java/org/opensearch/securityanalytics/threatintel/jobscheduler/DatasourceRunner.java +++ b/src/main/java/org/opensearch/securityanalytics/threatintel/jobscheduler/DatasourceRunner.java @@ -13,9 +13,11 @@ import org.opensearch.jobscheduler.spi.LockModel; 
import org.opensearch.jobscheduler.spi.ScheduledJobParameter; import org.opensearch.jobscheduler.spi.ScheduledJobRunner; +import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule; import org.opensearch.securityanalytics.model.DetectorTrigger; import java.io.IOException; +import java.time.temporal.ChronoUnit; import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; import java.time.Instant; @@ -149,8 +151,9 @@ protected void updateDatasource(final ScheduledJobParameter jobParameter, final log.error("Failed to update datasource for {}", datasource.getName(), e); datasource.getUpdateStats().setLastFailedAt(Instant.now()); datasourceDao.updateDatasource(datasource); + } finally { // post-processing: sync the datasource's schedule/task state whether the update succeeded or failed + datasourceUpdateService.updateDatasource(datasource, datasource.getSchedule(), DatasourceTask.ALL); } } - } \ No newline at end of file diff --git a/src/main/java/org/opensearch/securityanalytics/threatintel/jobscheduler/DatasourceUpdateService.java b/src/main/java/org/opensearch/securityanalytics/threatintel/jobscheduler/DatasourceUpdateService.java index 235513db1..5a24c5a84 100644 --- a/src/main/java/org/opensearch/securityanalytics/threatintel/jobscheduler/DatasourceUpdateService.java +++ b/src/main/java/org/opensearch/securityanalytics/threatintel/jobscheduler/DatasourceUpdateService.java @@ -29,7 +29,7 @@ import org.opensearch.securityanalytics.model.DetectorTrigger; import org.opensearch.securityanalytics.threatIntel.common.DatasourceManifest; import org.opensearch.securityanalytics.threatIntel.dao.DatasourceDao; -import org.opensearch.securityanalytics.threatIntel.dao.ThreatIntelFeedDao; +import org.opensearch.securityanalytics.threatIntel.ThreatIntelFeedDataService; import org.opensearch.securityanalytics.threatIntel.common.DatasourceState; import org.opensearch.securityanalytics.util.SecurityAnalyticsException; @@ -41,17 +41,17 @@ public class DatasourceUpdateService { private final ClusterService clusterService; private final
ClusterSettings clusterSettings; private final DatasourceDao datasourceDao; - private final ThreatIntelFeedDao threatIntelFeedDao; + private final ThreatIntelFeedDataService threatIntelFeedDataService; public DatasourceUpdateService( final ClusterService clusterService, final DatasourceDao datasourceDao, - final ThreatIntelFeedDao threatIntelFeedDao + final ThreatIntelFeedDataService threatIntelFeedDataService ) { this.clusterService = clusterService; this.clusterSettings = clusterService.getClusterSettings(); this.datasourceDao = datasourceDao; - this.threatIntelFeedDao = threatIntelFeedDao; + this.threatIntelFeedDataService = threatIntelFeedDataService; } /** @@ -80,7 +80,7 @@ public void updateOrCreateThreatIntelFeedData(final Datasource datasource, final String indexName = setupIndex(datasource); String[] header; List fieldsToStore; - try (CSVParser reader = threatIntelFeedDao.getDatabaseReader(manifest)) { + try (CSVParser reader = threatIntelFeedDataService.getDatabaseReader(manifest)) { CSVRecord headerLine = reader.iterator().next(); header = validateHeader(headerLine).values(); fieldsToStore = Arrays.asList(header).subList(1, header.length); @@ -92,12 +92,12 @@ public void updateOrCreateThreatIntelFeedData(final Datasource datasource, final datasource.getDatabase().getFields().toString() ); } - threatIntelFeedDao.putTIFData(indexName, header, reader.iterator(), renewLock); + threatIntelFeedDataService.saveThreatIntelFeedData(indexName, header, reader.iterator(), renewLock); } waitUntilAllShardsStarted(indexName, MAX_WAIT_TIME_FOR_REPLICATION_TO_COMPLETE_IN_MILLIS); Instant endTime = Instant.now(); - updateDatasourceAsSucceeded(indexName, datasource, manifest, fieldsToStore, startTime, endTime); + updateDatasourceAsSucceeded(indexName, datasource, manifest, fieldsToStore, startTime, endTime); // record successful-update metadata (fields, timestamps) on the datasource } @@ -133,13 +133,13 @@ protected void waitUntilAllShardsStarted(final String indexName, final int timeo * Therefore, we don't store
the first column's header name. * * @param manifestUrl the url of a manifest file - * @return header fields of ioc data + * @return header fields of threat intel feed */ public List getHeaderFields(String manifestUrl) throws IOException { URL url = new URL(manifestUrl); DatasourceManifest manifest = DatasourceManifest.Builder.build(url); - try (CSVParser reader = threatIntelFeedDao.getDatabaseReader(manifest)) { + try (CSVParser reader = threatIntelFeedDataService.getDatabaseReader(manifest)) { String[] fields = reader.iterator().next().values(); return Arrays.asList(fields).subList(1, fields.length); } @@ -177,6 +177,10 @@ public void deleteUnusedIndices(final Datasource datasource) { */ public void updateDatasource(final Datasource datasource, final IntervalSchedule systemSchedule, final DatasourceTask task) { boolean updated = false; + if (datasource.getSchedule().equals(systemSchedule) == false) { + datasource.setSchedule(systemSchedule); + updated = true; + } if (datasource.getTask().equals(task) == false) { datasource.setTask(task); @@ -186,7 +190,7 @@ public void updateDatasource(final Datasource datasource, final IntervalSchedule if (updated) { datasourceDao.updateDatasource(datasource); } - } //TODO + } private List deleteIndices(final List indicesToDelete) { List deletedIndices = new ArrayList<>(indicesToDelete.size()); @@ -197,7 +201,7 @@ private List deleteIndices(final List indicesToDelete) { } try { - threatIntelFeedDao.deleteThreatIntelDataIndex(index); + threatIntelFeedDataService.deleteThreatIntelDataIndex(index); deletedIndices.add(index); } catch (Exception e) { log.error("Failed to delete an index [{}]", index, e); @@ -263,7 +267,7 @@ private String setupIndex(final Datasource datasource) { String indexName = datasource.newIndexName(UUID.randomUUID().toString()); datasource.getIndices().add(indexName); datasourceDao.updateDatasource(datasource); - threatIntelFeedDao.createIndexIfNotExists(indexName); + 
threatIntelFeedDataService.createIndexIfNotExists(indexName); return indexName; } @@ -284,9 +288,9 @@ private boolean shouldUpdate(final Datasource datasource, final DatasourceManife return false; } - if (manifest.getSha256Hash().equals(datasource.getDatabase().getSha256Hash())) { - return false; - } +// TODO: re-enable this hash check so unchanged feeds are not re-downloaded — if (manifest.getSha256Hash().equals(datasource.getDatabase().getSha256Hash())) { +// return false; +// } return true; } } diff --git a/src/test/java/org/opensearch/securityanalytics/TestHelpers.java b/src/test/java/org/opensearch/securityanalytics/TestHelpers.java index c18c54872..f7033477c 100644 --- a/src/test/java/org/opensearch/securityanalytics/TestHelpers.java +++ b/src/test/java/org/opensearch/securityanalytics/TestHelpers.java @@ -164,7 +164,7 @@ public static CustomLogType randomCustomLogType(String name, String description, public static ThreatIntelFeedData randomThreatIntelFeedData() { return new ThreatIntelFeedData( "IP_ADDRESS", - ip, + "ip", "alientVault", Instant.now() );