Skip to content

Commit

Permalink
Troubleshooting.
Browse files Browse the repository at this point in the history
Signed-off-by: AWSHurneyt <[email protected]>
  • Loading branch information
AWSHurneyt committed Feb 5, 2024
1 parent a2c5c10 commit 22cd8eb
Showing 1 changed file with 12 additions and 33 deletions.
45 changes: 12 additions & 33 deletions alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,7 @@ package org.opensearch.alerting

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.newSingleThreadContext
import kotlinx.coroutines.withContext
import org.apache.logging.log4j.LogManager
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
Expand Down Expand Up @@ -127,45 +123,28 @@ class InputService(
logger.debug("ClusterMetricsInput clusterMetricType: {}", input.clusterMetricType)

val remoteMonitoringEnabled = clusterService.clusterSettings.get(AlertingSettings.REMOTE_MONITORING_ENABLED)
val inputTimeout = clusterService.clusterSettings.get(AlertingSettings.INPUT_TIMEOUT)
// todo hurneyt make debug
logger.info("ClusterMetricsInput remoteMonitoringEnabled: $remoteMonitoringEnabled")

val responseMap = mutableMapOf<String, Map<String, Any>>()
if (remoteMonitoringEnabled && input.clusters.isNotEmpty()) {
client.threadPool().threadContext.stashContext().use {
val responseMap = mutableMapOf<String, Map<String, Any>>()
scope.launch {
val singleThreadContext = newSingleThreadContext("ClusterMetricsInputThread")
withContext(singleThreadContext) {
val deferredResults = input.clusters.map { cluster ->
async {
val targetClient = CrossClusterMonitorUtils.getClientForCluster(cluster, client, clusterService)
val response = executeTransportAction(input, targetClient)
// Not all supported API reference the cluster name in their response.
// Mapping each response to the cluster name before adding to results.
// Not adding this same logic for local-only monitors to avoid breaking existing monitors.
cluster to response.toMap()
}
}

val awaitedResults = deferredResults.awaitAll()
responseMap.putAll(awaitedResults)
results += responseMap
input.clusters.forEach { cluster ->
val targetClient = CrossClusterMonitorUtils.getClientForCluster(cluster, client, clusterService)
val response = executeTransportAction(input, targetClient)
// Not all supported API reference the cluster name in their response.
// Mapping each response to the cluster name before adding to results.
// Not adding this same logic for local-only monitors to avoid breaking existing monitors.
responseMap[cluster] = response.toMap()
}

// input.clusters.forEach { cluster ->
// val targetClient = CrossClusterMonitorUtils.getClientForCluster(cluster, client, clusterService)
// val response = executeTransportAction(input, targetClient)
// // Not all supported API reference the cluster name in their response.
// // Mapping each response to the cluster name before adding to results.
// // Not adding this same logic for local-only monitors to avoid breaking existing monitors.
// responseMap[cluster] = response.toMap()
// }
}
}
// todo hurneyt delete?
// while (responseMap.size < input.clusters.size) {
// }
// results += responseMap
val startTime = Instant.now().toEpochMilli()
while ((Instant.now().toEpochMilli() - startTime < inputTimeout.millis) || (responseMap.size < input.clusters.size)) { /* Wait for responses */ }
results += responseMap
} else {
val response = executeTransportAction(input, client)
results += response.toMap()
Expand Down

0 comments on commit 22cd8eb

Please sign in to comment.