diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt index 2829bc26a..80ccef0f6 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt @@ -10,6 +10,8 @@ import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.async import kotlinx.coroutines.awaitAll import kotlinx.coroutines.launch +import kotlinx.coroutines.newSingleThreadContext +import kotlinx.coroutines.withContext import org.apache.logging.log4j.LogManager import org.opensearch.action.search.SearchRequest import org.opensearch.action.search.SearchResponse @@ -132,19 +134,23 @@ class InputService( client.threadPool().threadContext.stashContext().use { val responseMap = mutableMapOf>() scope.launch { - val deferredResults = input.clusters.map { cluster -> - async { - val targetClient = CrossClusterMonitorUtils.getClientForCluster(cluster, client, clusterService) - val response = executeTransportAction(input, targetClient) - // Not all supported API reference the cluster name in their response. - // Mapping each response to the cluster name before adding to results. - // Not adding this same logic for local-only monitors to avoid breaking existing monitors. - cluster to response.toMap() + val singleThreadContext = newSingleThreadContext("ClusterMetricsInputThread") + withContext(singleThreadContext) { + val deferredResults = input.clusters.map { cluster -> + async { + val targetClient = CrossClusterMonitorUtils.getClientForCluster(cluster, client, clusterService) + val response = executeTransportAction(input, targetClient) + // Not all supported API reference the cluster name in their response. + // Mapping each response to the cluster name before adding to results. + // Not adding this same logic for local-only monitors to avoid breaking existing monitors. + cluster to response.toMap() + } } - } - val awaitedResults = deferredResults.awaitAll() - responseMap.putAll(awaitedResults) + val awaitedResults = deferredResults.awaitAll() + responseMap.putAll(awaitedResults) + results += responseMap + } // input.clusters.forEach { cluster -> // val targetClient = CrossClusterMonitorUtils.getClientForCluster(cluster, client, clusterService) @@ -155,7 +161,6 @@ class InputService( // responseMap[cluster] = response.toMap() // } } - results += responseMap } // todo hurneyt delete? // while (responseMap.size < input.clusters.size) {