Skip to content

Commit

Permalink
Troubleshooting.
Browse files Browse the repository at this point in the history
Signed-off-by: AWSHurneyt <[email protected]>
  • Loading branch information
AWSHurneyt committed Feb 5, 2024
1 parent 07d9efe commit a2c5c10
Showing 1 changed file with 17 additions and 12 deletions.
29 changes: 17 additions & 12 deletions alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.newSingleThreadContext
import kotlinx.coroutines.withContext
import org.apache.logging.log4j.LogManager
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
Expand Down Expand Up @@ -132,19 +134,23 @@ class InputService(
client.threadPool().threadContext.stashContext().use {
val responseMap = mutableMapOf<String, Map<String, Any>>()
scope.launch {
val deferredResults = input.clusters.map { cluster ->
async {
val targetClient = CrossClusterMonitorUtils.getClientForCluster(cluster, client, clusterService)
val response = executeTransportAction(input, targetClient)
// Not all supported API reference the cluster name in their response.
// Mapping each response to the cluster name before adding to results.
// Not adding this same logic for local-only monitors to avoid breaking existing monitors.
cluster to response.toMap()
val singleThreadContext = newSingleThreadContext("ClusterMetricsInputThread")
withContext(singleThreadContext) {
val deferredResults = input.clusters.map { cluster ->
async {
val targetClient = CrossClusterMonitorUtils.getClientForCluster(cluster, client, clusterService)
val response = executeTransportAction(input, targetClient)
// Not all supported API reference the cluster name in their response.
// Mapping each response to the cluster name before adding to results.
// Not adding this same logic for local-only monitors to avoid breaking existing monitors.
cluster to response.toMap()
}
}
}

val awaitedResults = deferredResults.awaitAll()
responseMap.putAll(awaitedResults)
val awaitedResults = deferredResults.awaitAll()
responseMap.putAll(awaitedResults)
results += responseMap
}

// input.clusters.forEach { cluster ->
// val targetClient = CrossClusterMonitorUtils.getClientForCluster(cluster, client, clusterService)
Expand All @@ -155,7 +161,6 @@ class InputService(
// responseMap[cluster] = response.toMap()
// }
}
results += responseMap
}
// todo hurneyt delete?
// while (responseMap.size < input.clusters.size) {
Expand Down

0 comments on commit a2c5c10

Please sign in to comment.