Skip to content

Commit

Permalink
update mapping
Browse files Browse the repository at this point in the history
Signed-off-by: Joanne Wang <[email protected]>
  • Loading branch information
jowg-amazon committed Aug 23, 2024
1 parent 0920e47 commit 58aa310
Show file tree
Hide file tree
Showing 5 changed files with 219 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.opensearch.securityanalytics.threatIntel.common.TIFLockService;
import org.opensearch.securityanalytics.threatIntel.model.DefaultIocStoreConfig;
import org.opensearch.securityanalytics.threatIntel.model.SATIFSourceConfig;
import org.opensearch.securityanalytics.util.IndexUtils;
import org.opensearch.securityanalytics.util.SecurityAnalyticsException;
import org.opensearch.threadpool.ThreadPool;

Expand All @@ -80,6 +81,7 @@
import static org.opensearch.securityanalytics.threatIntel.model.SATIFSourceConfig.SOURCE_CONFIG_FIELD;
import static org.opensearch.securityanalytics.threatIntel.model.SATIFSourceConfig.STATE_FIELD;
import static org.opensearch.securityanalytics.transport.TransportIndexDetectorAction.PLUGIN_OWNER_FIELD;
import static org.opensearch.securityanalytics.util.IndexUtils.shouldUpdateIndex;

/**
* CRUD for threat intel feeds source config object
Expand All @@ -93,7 +95,6 @@ public class SATIFSourceConfigService {
private final NamedXContentRegistry xContentRegistry;
private final TIFLockService lockService;


public SATIFSourceConfigService(final Client client,
final ClusterService clusterService,
ThreadPool threadPool,
Expand Down Expand Up @@ -197,34 +198,59 @@ private String getIndexMapping() {
/**
* Index name: .opensearch-sap--job
* Mapping: /mappings/threat_intel_job_mapping.json
* Updates the job index mapping if currently on a previous version
*
* @param stepListener setup listener
*/
public void createJobIndexIfNotExists(final StepListener<Void> stepListener) {
// check if job index exists
if (clusterService.state().metadata().hasIndex(SecurityAnalyticsPlugin.JOB_INDEX_NAME) == true) {
stepListener.onResponse(null);
return;
}
final CreateIndexRequest createIndexRequest = new CreateIndexRequest(SecurityAnalyticsPlugin.JOB_INDEX_NAME).mapping(getIndexMapping())
.settings(SecurityAnalyticsPlugin.TIF_JOB_INDEX_SETTING);
StashedThreadContext.run(client, () -> client.admin().indices().create(createIndexRequest, ActionListener.wrap(
r -> {
log.debug("[{}] index created", SecurityAnalyticsPlugin.JOB_INDEX_NAME);
try {
// Check if job index contains old mapping, if so update index mapping (current version = 2)
if (shouldUpdateIndex(clusterService.state().metadata().index(SecurityAnalyticsPlugin.JOB_INDEX_NAME), getIndexMapping())) {
log.info("Old schema version found for [{}] index, updating the index mapping", SecurityAnalyticsPlugin.JOB_INDEX_NAME);
IndexUtils.updateIndexMapping(
SecurityAnalyticsPlugin.JOB_INDEX_NAME,
getIndexMapping(), clusterService.state(), client.admin().indices(),
ActionListener.wrap(
r -> {
log.info("Successfully updated index mapping for [{}] index", SecurityAnalyticsPlugin.JOB_INDEX_NAME);
stepListener.onResponse(null);
}, e -> {
log.error("Failed to update job index mapping", e);
stepListener.onFailure(e);
}
),
false
);
} else {
// If job index contains newest mapping, then do nothing
stepListener.onResponse(null);
}, e -> {
if (e instanceof ResourceAlreadyExistsException) {
log.info("Index [{}] already exists", SecurityAnalyticsPlugin.JOB_INDEX_NAME);
}
} catch (IOException e) {
log.error("Failed to check and update job index mapping", e);
stepListener.onFailure(e);
}
} else {
final CreateIndexRequest createIndexRequest = new CreateIndexRequest(SecurityAnalyticsPlugin.JOB_INDEX_NAME).mapping(getIndexMapping())
.settings(SecurityAnalyticsPlugin.TIF_JOB_INDEX_SETTING);
StashedThreadContext.run(client, () -> client.admin().indices().create(createIndexRequest, ActionListener.wrap(
r -> {
log.debug("[{}] index created", SecurityAnalyticsPlugin.JOB_INDEX_NAME);
stepListener.onResponse(null);
return;
}, e -> {
if (e instanceof ResourceAlreadyExistsException) {
log.info("Index [{}] already exists", SecurityAnalyticsPlugin.JOB_INDEX_NAME);
stepListener.onResponse(null);
return;
}
log.error("Failed to create [{}] index", SecurityAnalyticsPlugin.JOB_INDEX_NAME, e);
stepListener.onFailure(e);
}
log.error("Failed to create [{}] index", SecurityAnalyticsPlugin.JOB_INDEX_NAME, e);
stepListener.onFailure(e);
}
)));
)));
}
}


// Get TIF source config
public void getTIFSourceConfig(
String tifSourceConfigId,
Expand All @@ -234,7 +260,7 @@ public void getTIFSourceConfig(
client.get(getRequest, ActionListener.wrap(
getResponse -> {
if (!getResponse.isExists()) {
actionListener.onFailure(SecurityAnalyticsException.wrap(new OpenSearchStatusException(String.format(Locale.getDefault(),"Threat intel source config [%s] not found.", tifSourceConfigId), RestStatus.NOT_FOUND)));
actionListener.onFailure(SecurityAnalyticsException.wrap(new OpenSearchStatusException(String.format(Locale.getDefault(), "Threat intel source config [%s] not found.", tifSourceConfigId), RestStatus.NOT_FOUND)));
return;
}
SATIFSourceConfig saTifSourceConfig = null;
Expand All @@ -246,7 +272,7 @@ public void getTIFSourceConfig(
saTifSourceConfig = SATIFSourceConfig.docParse(xcp, getResponse.getId(), getResponse.getVersion());
}
if (saTifSourceConfig == null) {
actionListener.onFailure(SecurityAnalyticsException.wrap(new OpenSearchStatusException(String.format(Locale.getDefault(),"No threat intel source config exists [%s]", tifSourceConfigId), RestStatus.BAD_REQUEST)));
actionListener.onFailure(SecurityAnalyticsException.wrap(new OpenSearchStatusException(String.format(Locale.getDefault(), "No threat intel source config exists [%s]", tifSourceConfigId), RestStatus.BAD_REQUEST)));
} else {
log.debug("Threat intel source config with id [{}] fetched", getResponse.getId());
actionListener.onResponse(saTifSourceConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@

public class IndexUtils {

private static final String _META = "_meta";
private static final Integer NO_SCHEMA_VERSION = 0;
private static final String SCHEMA_VERSION = "schema_version";
public static final String _META = "_meta";
public static final Integer NO_SCHEMA_VERSION = 0;
public static final String SCHEMA_VERSION = "schema_version";

public static Boolean detectorIndexUpdated = false;
public static Boolean customRuleIndexUpdated = false;
Expand Down
1 change: 1 addition & 0 deletions src/main/resources/mappings/threat_intel_job_mapping.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{
"dynamic": true,
"_meta" : {
"schema_version": 2
},
Expand Down
63 changes: 62 additions & 1 deletion src/test/java/org/opensearch/securityanalytics/TestHelpers.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.opensearch.securityanalytics.model.DetectorInput;
import org.opensearch.securityanalytics.model.DetectorRule;
import org.opensearch.securityanalytics.model.DetectorTrigger;
import org.opensearch.securityanalytics.model.STIX2IOCDto;
import org.opensearch.securityanalytics.model.ThreatIntelFeedData;
import org.opensearch.securityanalytics.model.threatintel.IocFinding;
import org.opensearch.securityanalytics.model.threatintel.ThreatIntelAlert;
Expand Down Expand Up @@ -1875,6 +1874,68 @@ public static String windowsIndexMappingOnlyNumericAndText() {
" }";
}

public static String oldThreatIntelJobMapping() {
return " \"dynamic\": \"strict\",\n" +
" \"_meta\": {\n" +
" \"schema_version\": 1\n" +
" },\n" +
" \"properties\": {\n" +
" \"schema_version\": {\n" +
" \"type\": \"integer\"\n" +
" },\n" +
" \"enabled_time\": {\n" +
" \"type\": \"long\"\n" +
" },\n" +
" \"indices\": {\n" +
" \"type\": \"text\"\n" +
" },\n" +
" \"last_update_time\": {\n" +
" \"type\": \"long\"\n" +
" },\n" +
" \"name\": {\n" +
" \"type\": \"text\"\n" +
" },\n" +
" \"schedule\": {\n" +
" \"properties\": {\n" +
" \"interval\": {\n" +
" \"properties\": {\n" +
" \"period\": {\n" +
" \"type\": \"long\"\n" +
" },\n" +
" \"start_time\": {\n" +
" \"type\": \"long\"\n" +
" },\n" +
" \"unit\": {\n" +
" \"type\": \"text\"\n" +
" }\n" +
" }\n" +
" }\n" +
" }\n" +
" },\n" +
" \"state\": {\n" +
" \"type\": \"text\"\n" +
" },\n" +
" \"update_enabled\": {\n" +
" \"type\": \"boolean\"\n" +
" },\n" +
" \"update_stats\": {\n" +
" \"properties\": {\n" +
" \"last_failed_at_in_epoch_millis\": {\n" +
" \"type\": \"long\"\n" +
" },\n" +
" \"last_processing_time_in_millis\": {\n" +
" \"type\": \"long\"\n" +
" },\n" +
" \"last_skipped_at_in_epoch_millis\": {\n" +
" \"type\": \"long\"\n" +
" },\n" +
" \"last_succeeded_at_in_epoch_millis\": {\n" +
" \"type\": \"long\"\n" +
" }\n" +
" }\n" +
" }\n" +
" }";
}

public static String randomDoc(int severity, int version, String opCode) {
String doc = "{\n" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import org.junit.Assert;
import org.opensearch.client.Response;
import org.opensearch.client.ResponseException;
import org.opensearch.client.WarningFailureException;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule;
import org.opensearch.search.SearchHit;
Expand Down Expand Up @@ -42,6 +44,7 @@

import static org.opensearch.jobscheduler.spi.utils.LockService.LOCK_INDEX_NAME;
import static org.opensearch.securityanalytics.SecurityAnalyticsPlugin.JOB_INDEX_NAME;
import static org.opensearch.securityanalytics.TestHelpers.oldThreatIntelJobMapping;
import static org.opensearch.securityanalytics.services.STIX2IOCFeedStore.IOC_ALL_INDEX_PATTERN;
import static org.opensearch.securityanalytics.services.STIX2IOCFeedStore.getAllIocIndexPatternById;

Expand Down Expand Up @@ -208,7 +211,7 @@ public void testCreateIocUploadSourceConfigIncorrectIocTypes() throws IOExceptio
}
}

public void testUpdateIocUploadSourceConfig() throws IOException, InterruptedException {
public void testUpdateIocUploadSourceConfig() throws IOException {
// Create source config with IPV4 IOCs
String feedName = "test_update";
String feedFormat = "STIX";
Expand Down Expand Up @@ -365,7 +368,7 @@ public void testUpdateIocUploadSourceConfig() throws IOException, InterruptedExc
assertEquals(1, iocHits.size());
}

public void testActivateDeactivateIocUploadSourceConfig() throws IOException, InterruptedException {
public void testActivateDeactivateIocUploadSourceConfig() throws IOException {
// Create source config with IPV4 IOCs
String feedName = "test_update";
String feedFormat = "STIX";
Expand Down Expand Up @@ -536,7 +539,7 @@ public void testActivateDeactivateIocUploadSourceConfig() throws IOException, In
assertTrue((Boolean) scr.get("enabled_for_scan"));
}

public void testActivateDeactivateUrlDownloadSourceConfig() throws IOException, InterruptedException {
public void testActivateDeactivateUrlDownloadSourceConfig() throws IOException {
// Search source configs when none are created
String request = "{\n" +
" \"query\" : {\n" +
Expand Down Expand Up @@ -889,7 +892,7 @@ public void testSearchAndCreateDefaultSourceConfig() throws IOException {
Assert.assertEquals(1, ((Map<String, Object>) ((Map<String, Object>) responseBody.get("hits")).get("total")).get("value"));
}

public void testUpdateDefaultSourceConfigThrowsError() throws IOException, InterruptedException {
public void testUpdateDefaultSourceConfigThrowsError() throws IOException {
// Search source configs when none are created
String request = "{\n" +
" \"query\" : {\n" +
Expand Down Expand Up @@ -951,6 +954,106 @@ public void testUpdateDefaultSourceConfigThrowsError() throws IOException, Inter
}
}

public void testUpdateJobIndexMapping() throws IOException {
// Create job index with old threat intel mapping
// Try catch needed because of warning when creating a system index which is needed to replicate previous tif job mapping
try {
createIndex(JOB_INDEX_NAME, Settings.EMPTY, oldThreatIntelJobMapping());
} catch (WarningFailureException e) {
// Ensure index was created with old mappings
String request = "{\n" +
" \"query\" : {\n" +
" \"match_all\":{\n" +
" }\n" +
" }\n" +
"}";
List<SearchHit> hits = executeSearch(JOB_INDEX_NAME, request);
Assert.assertEquals(0, hits.size());

Map<String, Object> props = getIndexMappingsAPIFlat(JOB_INDEX_NAME);
assertTrue(props.containsKey("enabled_time"));
assertTrue(props.containsKey("schedule.interval.start_time"));
assertFalse(props.containsKey("source_config.source.ioc_upload.file_name"));
assertFalse(props.containsKey("source_config.source.s3.object_key"));
}

// Create new threat intel source config
SATIFSourceConfigDto saTifSourceConfigDto = getSatifSourceConfigDto();

Response makeResponse = makeRequest(client(), "POST", SecurityAnalyticsPlugin.THREAT_INTEL_SOURCE_URI, Collections.emptyMap(), toHttpEntity(saTifSourceConfigDto));
Assert.assertEquals(RestStatus.CREATED, restStatus(makeResponse));
Map<String, Object> responseBody = asMap(makeResponse);

String createdId = responseBody.get("_id").toString();
Assert.assertNotEquals("response is missing Id", SATIFSourceConfigDto.NO_ID, createdId);

int createdVersion = Integer.parseInt(responseBody.get("_version").toString());
Assert.assertTrue("incorrect version", createdVersion > 0);

// Ensure source config document was indexed
String request = "{\n" +
" \"query\" : {\n" +
" \"match_all\":{\n" +
" }\n" +
" }\n" +
"}";
List<SearchHit> hits = executeSearch(JOB_INDEX_NAME, request);
Assert.assertEquals(1, hits.size());

// Ensure index mappings were updated
Map<String, Object> props = getIndexMappingsAPIFlat(JOB_INDEX_NAME);
assertTrue(props.containsKey("source_config.source.ioc_upload.file_name"));
assertTrue(props.containsKey("source_config.source.s3.object_key"));
assertTrue(props.containsKey("source_config.description"));
assertTrue(props.containsKey("source_config.last_update_time"));
assertTrue(props.containsKey("source_config.refresh_type"));
}

private static SATIFSourceConfigDto getSatifSourceConfigDto() {
String feedName = "test_ioc_upload";
String feedFormat = "STIX";
SourceConfigType sourceConfigType = SourceConfigType.IOC_UPLOAD;

List<STIX2IOCDto> iocs = List.of(new STIX2IOCDto(
"id",
"name",
new IOCType(IOCType.IPV4_TYPE),
"value",
"severity",
null,
null,
"description",
List.of("labels"),
"specversion",
"feedId",
"feedName",
1L));

IocUploadSource iocUploadSource = new IocUploadSource(null, iocs);
Boolean enabled = false;
List<String> iocTypes = List.of(IOCType.IPV4_TYPE);
return new SATIFSourceConfigDto(
null,
null,
feedName,
feedFormat,
sourceConfigType,
null,
null,
null,
iocUploadSource,
null,
null,
null,
null,
null,
null,
null,
enabled,
iocTypes, true
);
}

@Override
protected boolean preserveIndicesUponCompletion() {
return false;
Expand Down

0 comments on commit 58aa310

Please sign in to comment.