diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt index 80ccef0f6..64abf83b8 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt @@ -7,11 +7,7 @@ package org.opensearch.alerting import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.async -import kotlinx.coroutines.awaitAll import kotlinx.coroutines.launch -import kotlinx.coroutines.newSingleThreadContext -import kotlinx.coroutines.withContext import org.apache.logging.log4j.LogManager import org.opensearch.action.search.SearchRequest import org.opensearch.action.search.SearchResponse @@ -127,45 +123,28 @@ class InputService( logger.debug("ClusterMetricsInput clusterMetricType: {}", input.clusterMetricType) val remoteMonitoringEnabled = clusterService.clusterSettings.get(AlertingSettings.REMOTE_MONITORING_ENABLED) + val inputTimeout = clusterService.clusterSettings.get(AlertingSettings.INPUT_TIMEOUT) // todo hurneyt make debug logger.info("ClusterMetricsInput remoteMonitoringEnabled: $remoteMonitoringEnabled") + val responseMap = mutableMapOf>() if (remoteMonitoringEnabled && input.clusters.isNotEmpty()) { client.threadPool().threadContext.stashContext().use { - val responseMap = mutableMapOf>() scope.launch { - val singleThreadContext = newSingleThreadContext("ClusterMetricsInputThread") - withContext(singleThreadContext) { - val deferredResults = input.clusters.map { cluster -> - async { - val targetClient = CrossClusterMonitorUtils.getClientForCluster(cluster, client, clusterService) - val response = executeTransportAction(input, targetClient) - // Not all supported API reference the cluster name in their response. - // Mapping each response to the cluster name before adding to results. - // Not adding this same logic for local-only monitors to avoid breaking existing monitors. - cluster to response.toMap() - } - } - - val awaitedResults = deferredResults.awaitAll() - responseMap.putAll(awaitedResults) - results += responseMap + input.clusters.forEach { cluster -> + val targetClient = CrossClusterMonitorUtils.getClientForCluster(cluster, client, clusterService) + val response = executeTransportAction(input, targetClient) + // Not all supported API reference the cluster name in their response. + // Mapping each response to the cluster name before adding to results. + // Not adding this same logic for local-only monitors to avoid breaking existing monitors. + responseMap[cluster] = response.toMap() } - -// input.clusters.forEach { cluster -> -// val targetClient = CrossClusterMonitorUtils.getClientForCluster(cluster, client, clusterService) -// val response = executeTransportAction(input, targetClient) -// // Not all supported API reference the cluster name in their response. -// // Mapping each response to the cluster name before adding to results. -// // Not adding this same logic for local-only monitors to avoid breaking existing monitors. -// responseMap[cluster] = response.toMap() -// } } } // todo hurneyt delete? -// while (responseMap.size < input.clusters.size) { -// } -// results += responseMap + val startTime = Instant.now().toEpochMilli() + while ((Instant.now().toEpochMilli() - startTime < inputTimeout.millis) || (responseMap.size < input.clusters.size)) { /* Wait for responses */ } + results += responseMap } else { val response = executeTransportAction(input, client) results += response.toMap()