From 2a28941748bf00a9bb2d9d51e16c23280ce64b96 Mon Sep 17 00:00:00 2001 From: Joanne Wang Date: Mon, 26 Aug 2024 17:05:46 -0700 Subject: [PATCH] fix multinode Signed-off-by: Joanne Wang --- .../services/STIX2IOCFeedStore.java | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/opensearch/securityanalytics/services/STIX2IOCFeedStore.java b/src/main/java/org/opensearch/securityanalytics/services/STIX2IOCFeedStore.java index 7659cdba8..0eddc8f88 100644 --- a/src/main/java/org/opensearch/securityanalytics/services/STIX2IOCFeedStore.java +++ b/src/main/java/org/opensearch/securityanalytics/services/STIX2IOCFeedStore.java @@ -10,6 +10,7 @@ import org.apache.logging.log4j.Logger; import org.opensearch.OpenSearchException; import org.opensearch.action.DocWriteRequest; +import org.opensearch.action.StepListener; import org.opensearch.action.admin.indices.create.CreateIndexRequest; import org.opensearch.action.admin.indices.create.CreateIndexResponse; import org.opensearch.action.bulk.BulkRequest; @@ -80,7 +81,6 @@ public STIX2IOCFeedStore( this.baseListener = listener; batchSize = clusterService.getClusterSettings().get(SecurityAnalyticsSettings.BATCH_SIZE); newActiveIndex = getNewActiveIndex(saTifSourceConfig.getId()); - initSourceConfigIndexes(); } @Override @@ -113,7 +113,15 @@ public void storeIOCs(Map actionToIOCs) { } public void indexIocs(List iocs) throws IOException { - bulkIndexIocs(iocs, newActiveIndex); + StepListener initSourceConfigIndexesListener = new StepListener<>(); + initSourceConfigIndexes(initSourceConfigIndexesListener); + initSourceConfigIndexesListener.whenComplete(r -> { + bulkIndexIocs(iocs, newActiveIndex); + }, e -> { + log.error("Failed to init source config indexes"); + baseListener.onFailure(e); + }); + } private void bulkIndexIocs(List iocs, String activeIndex) throws IOException { @@ -197,7 +205,7 @@ public SATIFSourceConfig getSaTifSourceConfig() { return saTifSourceConfig; } - private void initSourceConfigIndexes() { + private void initSourceConfigIndexes(StepListener stepListener) { String iocIndexPattern = getAllIocIndexPatternById(saTifSourceConfig.getId()); initFeedIndex(newActiveIndex, ActionListener.wrap( r -> { @@ -214,10 +222,10 @@ private void initSourceConfigIndexes() { ((DefaultIocStoreConfig) saTifSourceConfig.getIocStoreConfig()).getIocToIndexDetails().add(iocToIndexDetails); } }); - + stepListener.onResponse(null); }, e-> { log.error("Failed to initialize the IOC index and save the IOCs", e); - baseListener.onFailure(e); + stepListener.onFailure(e); } )); }