From 12ff00b25af71eb35a35761b8376604e24c6b4a5 Mon Sep 17 00:00:00 2001 From: Joanne Wang Date: Tue, 3 Sep 2024 15:21:28 -0700 Subject: [PATCH] check for concurrent modification exception Signed-off-by: Joanne Wang --- .../service/SATIFSourceConfigService.java | 76 ++++++++++++------- 1 file changed, 50 insertions(+), 26 deletions(-) diff --git a/src/main/java/org/opensearch/securityanalytics/threatIntel/service/SATIFSourceConfigService.java b/src/main/java/org/opensearch/securityanalytics/threatIntel/service/SATIFSourceConfigService.java index 69be64b31..f0f9fd478 100644 --- a/src/main/java/org/opensearch/securityanalytics/threatIntel/service/SATIFSourceConfigService.java +++ b/src/main/java/org/opensearch/securityanalytics/threatIntel/service/SATIFSourceConfigService.java @@ -56,6 +56,7 @@ import org.opensearch.securityanalytics.util.IndexUtils; import org.opensearch.securityanalytics.util.SecurityAnalyticsException; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.RemoteTransportException; import java.io.BufferedReader; import java.io.IOException; @@ -64,6 +65,7 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collection; +import java.util.ConcurrentModificationException; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -204,32 +206,7 @@ private String getIndexMapping() { public void createJobIndexIfNotExists(final StepListener stepListener) { // check if job index exists if (clusterService.state().metadata().hasIndex(SecurityAnalyticsPlugin.JOB_INDEX_NAME) == true) { - try { - // Check if job index contains old mapping, if so update index mapping (current version = 2) - if (shouldUpdateIndex(clusterService.state().metadata().index(SecurityAnalyticsPlugin.JOB_INDEX_NAME), getIndexMapping())) { - log.info("Old schema version found for [{}] index, updating the index mapping", SecurityAnalyticsPlugin.JOB_INDEX_NAME); - IndexUtils.updateIndexMapping( - SecurityAnalyticsPlugin.JOB_INDEX_NAME, - getIndexMapping(), clusterService.state(), client.admin().indices(), - ActionListener.wrap( - r -> { - log.info("Successfully updated index mapping for [{}] index", SecurityAnalyticsPlugin.JOB_INDEX_NAME); - stepListener.onResponse(null); - }, e -> { - log.error("Failed to update job index mapping", e); - stepListener.onFailure(e); - } - ), - false - ); - } else { - // If job index contains newest mapping, then do nothing - stepListener.onResponse(null); - } - } catch (IOException e) { - log.error("Failed to check and update job index mapping", e); - stepListener.onFailure(e); - } + checkAndUpdateJobIndexMapping(stepListener); } else { final CreateIndexRequest createIndexRequest = new CreateIndexRequest(SecurityAnalyticsPlugin.JOB_INDEX_NAME).mapping(getIndexMapping()) .settings(SecurityAnalyticsPlugin.TIF_JOB_INDEX_SETTING); @@ -250,6 +227,53 @@ public void createJobIndexIfNotExists(final StepListener stepListener) { } } + private void checkAndUpdateJobIndexMapping(StepListener stepListener) { + try { + // Check if job index contains old mapping, if so update index mapping (current version = 2) + if (shouldUpdateIndex(clusterService.state().metadata().index(SecurityAnalyticsPlugin.JOB_INDEX_NAME), getIndexMapping())) { + log.info("Old schema version found for [{}] index, updating the index mapping", SecurityAnalyticsPlugin.JOB_INDEX_NAME); + IndexUtils.updateIndexMapping( + SecurityAnalyticsPlugin.JOB_INDEX_NAME, + getIndexMapping(), clusterService.state(), client.admin().indices(), + ActionListener.wrap( + r -> { + log.info("Successfully updated index mapping for [{}] index", SecurityAnalyticsPlugin.JOB_INDEX_NAME); + stepListener.onResponse(null); + }, e -> { + if (e instanceof ConcurrentModificationException || (e instanceof RemoteTransportException && e.getCause() instanceof ConcurrentModificationException)) + { + try { + // Check if job index still contains older mapping + if (shouldUpdateIndex(clusterService.state().metadata().index(SecurityAnalyticsPlugin.JOB_INDEX_NAME), getIndexMapping())) { + log.error("Job index still contains older mapping, failed to update job index mapping", e); + stepListener.onFailure(e); + } else { + // If job index contains newest mapping, then return success + log.info("Successfully updated index mapping for [{}] index", SecurityAnalyticsPlugin.JOB_INDEX_NAME); + stepListener.onResponse(null); + } + } catch (IOException exception) { + log.error("Failed to check if job index contains older mapping. Failed to update job index mapping", e); + stepListener.onFailure(e); + } + } else { + log.error("Failed to update job index mapping", e); + stepListener.onFailure(e); + } + } + ), + false + ); + } else { + // If job index contains newest mapping, then do nothing + stepListener.onResponse(null); + } + } catch (IOException e) { + log.error("Failed to check and update job index mapping", e); + stepListener.onFailure(e); + } + } + // Get TIF source config public void getTIFSourceConfig( String tifSourceConfigId,