Skip to content

Commit

Permalink
check for concurrent modification exception
Browse files Browse the repository at this point in the history
Signed-off-by: Joanne Wang <[email protected]>
  • Loading branch information
jowg-amazon committed Sep 3, 2024
1 parent 17670c5 commit 12ff00b
Showing 1 changed file with 50 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.opensearch.securityanalytics.util.IndexUtils;
import org.opensearch.securityanalytics.util.SecurityAnalyticsException;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.RemoteTransportException;

import java.io.BufferedReader;
import java.io.IOException;
Expand All @@ -64,6 +65,7 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -204,32 +206,7 @@ private String getIndexMapping() {
public void createJobIndexIfNotExists(final StepListener<Void> stepListener) {
// check if job index exists
if (clusterService.state().metadata().hasIndex(SecurityAnalyticsPlugin.JOB_INDEX_NAME) == true) {
try {
// Check if job index contains old mapping, if so update index mapping (current version = 2)
if (shouldUpdateIndex(clusterService.state().metadata().index(SecurityAnalyticsPlugin.JOB_INDEX_NAME), getIndexMapping())) {
log.info("Old schema version found for [{}] index, updating the index mapping", SecurityAnalyticsPlugin.JOB_INDEX_NAME);
IndexUtils.updateIndexMapping(
SecurityAnalyticsPlugin.JOB_INDEX_NAME,
getIndexMapping(), clusterService.state(), client.admin().indices(),
ActionListener.wrap(
r -> {
log.info("Successfully updated index mapping for [{}] index", SecurityAnalyticsPlugin.JOB_INDEX_NAME);
stepListener.onResponse(null);
}, e -> {
log.error("Failed to update job index mapping", e);
stepListener.onFailure(e);
}
),
false
);
} else {
// If job index contains newest mapping, then do nothing
stepListener.onResponse(null);
}
} catch (IOException e) {
log.error("Failed to check and update job index mapping", e);
stepListener.onFailure(e);
}
checkAndUpdateJobIndexMapping(stepListener);
} else {
final CreateIndexRequest createIndexRequest = new CreateIndexRequest(SecurityAnalyticsPlugin.JOB_INDEX_NAME).mapping(getIndexMapping())
.settings(SecurityAnalyticsPlugin.TIF_JOB_INDEX_SETTING);
Expand All @@ -250,6 +227,53 @@ public void createJobIndexIfNotExists(final StepListener<Void> stepListener) {
}
}

private void checkAndUpdateJobIndexMapping(StepListener<Void> stepListener) {
try {
// Check if job index contains old mapping, if so update index mapping (current version = 2)
if (shouldUpdateIndex(clusterService.state().metadata().index(SecurityAnalyticsPlugin.JOB_INDEX_NAME), getIndexMapping())) {
log.info("Old schema version found for [{}] index, updating the index mapping", SecurityAnalyticsPlugin.JOB_INDEX_NAME);
IndexUtils.updateIndexMapping(
SecurityAnalyticsPlugin.JOB_INDEX_NAME,
getIndexMapping(), clusterService.state(), client.admin().indices(),
ActionListener.wrap(
r -> {
log.info("Successfully updated index mapping for [{}] index", SecurityAnalyticsPlugin.JOB_INDEX_NAME);
stepListener.onResponse(null);
}, e -> {
if (e instanceof ConcurrentModificationException || (e instanceof RemoteTransportException && e.getCause() instanceof ConcurrentModificationException))
{
try {
// Check if job index still contains older mapping
if (shouldUpdateIndex(clusterService.state().metadata().index(SecurityAnalyticsPlugin.JOB_INDEX_NAME), getIndexMapping())) {
log.error("Job index still contains older mapping, failed to update job index mapping", e);
stepListener.onFailure(e);
} else {
// If job index contains newest mapping, then return success
log.info("Successfully updated index mapping for [{}] index", SecurityAnalyticsPlugin.JOB_INDEX_NAME);
stepListener.onResponse(null);
}
} catch (IOException exception) {
log.error("Failed to check if job index contains older mapping. Failed to update job index mapping", e);
stepListener.onFailure(e);
}
} else {
log.error("Failed to update job index mapping", e);
stepListener.onFailure(e);
}
}
),
false
);
} else {
// If job index contains newest mapping, then do nothing
stepListener.onResponse(null);
}
} catch (IOException e) {
log.error("Failed to check and update job index mapping", e);
stepListener.onFailure(e);
}
}

// Get TIF source config
public void getTIFSourceConfig(
String tifSourceConfigId,
Expand Down

0 comments on commit 12ff00b

Please sign in to comment.