Skip to content

Commit

Permalink
Adjusted thread stashing.
Browse files Browse the repository at this point in the history
Signed-off-by: AWSHurneyt <[email protected]>
  • Loading branch information
AWSHurneyt committed Feb 2, 2024
1 parent 695ed1f commit d884eed
Showing 1 changed file with 67 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ package org.opensearch.alerting.transport
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.newSingleThreadContext
import kotlinx.coroutines.withContext
import org.apache.logging.log4j.LogManager
import org.opensearch.action.ActionListener
import org.opensearch.action.admin.cluster.health.ClusterHealthRequest
Expand Down Expand Up @@ -82,83 +84,86 @@ class TransportGetRemoteIndexesAction @Inject constructor(

client.threadPool().threadContext.stashContext().use {
scope.launch {
it.restore()
val clusterIndexesList = mutableListOf<ClusterIndexes>()

var resolveIndexResponse: ResolveIndexAction.Response? = null
try {
resolveIndexResponse = getRemoteClusters(CrossClusterMonitorUtils.parseIndexesForRemoteSearch(request.indexes, clusterService))
log.info("hurneyt TransportGetRemoteIndexesAction::resolveIndexResponse = {}", resolveIndexResponse.convertToMap())
} catch (e: Exception) {
log.error("Failed to retrieve indexes for request $request", e)
actionListener.onFailure(AlertingException.wrap(e))
}

val resolvedIndexes: MutableList<String> = mutableListOf()
if (resolveIndexResponse != null) {
resolveIndexResponse.indices.forEach { resolvedIndexes.add(it.name) }
resolveIndexResponse.aliases.forEach { resolvedIndexes.add(it.name) }
}
log.info("hurneyt TransportGetRemoteIndexesAction::resolvedIndexes = {}", resolvedIndexes)

val clusterIndexesMap = CrossClusterMonitorUtils.separateClusterIndexes(resolvedIndexes, clusterService)
log.info("hurneyt TransportGetRemoteIndexesAction::clusterIndexesMap = {}", clusterIndexesMap)

clusterIndexesMap.forEach { (clusterName, indexes) ->
log.info("hurneyt TransportGetRemoteIndexesAction::clusterIndexesMap clusterName = {}", clusterName)
log.info("hurneyt TransportGetRemoteIndexesAction::clusterIndexesMap indexes = {}", indexes)
val targetClient = CrossClusterMonitorUtils.getClientForCluster(clusterName, client, clusterService)
val singleThreadContext = newSingleThreadContext("${GetRemoteIndexesAction.NAME}_thread")
withContext(singleThreadContext) {
it.restore()
val clusterIndexesList = mutableListOf<ClusterIndexes>()

val startTime = Instant.now()
var clusterHealthResponse: ClusterHealthResponse? = null
var resolveIndexResponse: ResolveIndexAction.Response? = null
try {
clusterHealthResponse = getHealthStatuses(targetClient, indexes)
resolveIndexResponse = getRemoteClusters(CrossClusterMonitorUtils.parseIndexesForRemoteSearch(request.indexes, clusterService))
log.info("hurneyt TransportGetRemoteIndexesAction::resolveIndexResponse = {}", resolveIndexResponse.convertToMap())
} catch (e: Exception) {
log.error("Failed to retrieve health statuses for request $request", e)
log.error("Failed to retrieve indexes for request $request", e)
actionListener.onFailure(AlertingException.wrap(e))
}
val endTime = Instant.now()

log.info("hurneyt TransportGetRemoteIndexesAction::clusterHealthResponse keys = {}", clusterHealthResponse?.indices?.keys)
val resolvedIndexes: MutableList<String> = mutableListOf()
if (resolveIndexResponse != null) {
resolveIndexResponse.indices.forEach { resolvedIndexes.add(it.name) }
resolveIndexResponse.aliases.forEach { resolvedIndexes.add(it.name) }
}
log.info("hurneyt TransportGetRemoteIndexesAction::resolvedIndexes = {}", resolvedIndexes)

val clusterIndexesMap = CrossClusterMonitorUtils.separateClusterIndexes(resolvedIndexes, clusterService)
log.info("hurneyt TransportGetRemoteIndexesAction::clusterIndexesMap = {}", clusterIndexesMap)

val latency = Duration.between(startTime, endTime).toMillis()
clusterIndexesMap.forEach { (clusterName, indexes) ->
log.info("hurneyt TransportGetRemoteIndexesAction::clusterIndexesMap clusterName = {}", clusterName)
log.info("hurneyt TransportGetRemoteIndexesAction::clusterIndexesMap indexes = {}", indexes)
val targetClient = CrossClusterMonitorUtils.getClientForCluster(clusterName, client, clusterService)

var mappingsResponse: GetMappingsResponse? = null
if (request.includeMappings) {
val startTime = Instant.now()
var clusterHealthResponse: ClusterHealthResponse? = null
try {
mappingsResponse = getIndexMappings(targetClient, indexes)
clusterHealthResponse = getHealthStatuses(targetClient, indexes)
} catch (e: Exception) {
log.error("Failed to retrieve mappings for request $request", e)
log.error("Failed to retrieve health statuses for request $request", e)
actionListener.onFailure(AlertingException.wrap(e))
}
}
val endTime = Instant.now()

log.info("hurneyt TransportGetRemoteIndexesAction::clusterHealthResponse keys = {}", clusterHealthResponse?.indices?.keys)

val clusterIndexList = mutableListOf<ClusterIndex>()
if (clusterHealthResponse != null) {
indexes.forEach {
clusterIndexList.add(
ClusterIndex(
indexName = it,
indexHealth = clusterHealthResponse.indices[it]!!.status,
mappings = mappingsResponse?.mappings?.get(it)
val latency = Duration.between(startTime, endTime).toMillis()

var mappingsResponse: GetMappingsResponse? = null
if (request.includeMappings) {
try {
mappingsResponse = getIndexMappings(targetClient, indexes)
} catch (e: Exception) {
log.error("Failed to retrieve mappings for request $request", e)
actionListener.onFailure(AlertingException.wrap(e))
}
}

val clusterIndexList = mutableListOf<ClusterIndex>()
if (clusterHealthResponse != null) {
indexes.forEach {
clusterIndexList.add(
ClusterIndex(
indexName = it,
indexHealth = clusterHealthResponse.indices[it]!!.status,
mappings = mappingsResponse?.mappings?.get(it)
)
)
)
}
}
}

clusterIndexesList.add(
ClusterIndexes(
clusterName = clusterName,
clusterHealth = clusterHealthResponse!!.status,
hubCluster = clusterName == clusterService.clusterName.value(),
indexes = clusterIndexList,
latency = latency
clusterIndexesList.add(
ClusterIndexes(
clusterName = clusterName,
clusterHealth = clusterHealthResponse!!.status,
hubCluster = clusterName == clusterService.clusterName.value(),
indexes = clusterIndexList,
latency = latency
)
)
)
}
log.info("hurneyt TransportGetRemoteIndexesAction::clusterIndexesList = {}", clusterIndexesList)
log.info("hurneyt TransportGetRemoteIndexesAction END")
actionListener.onResponse(GetRemoteIndexesResponse(clusterIndexes = clusterIndexesList))
}
log.info("hurneyt TransportGetRemoteIndexesAction::clusterIndexesList = {}", clusterIndexesList)
log.info("hurneyt TransportGetRemoteIndexesAction END")
actionListener.onResponse(GetRemoteIndexesResponse(clusterIndexes = clusterIndexesList))
}
}
}
Expand All @@ -179,11 +184,9 @@ class TransportGetRemoteIndexesAction @Inject constructor(
.indices(*parsedIndexesNames.toTypedArray())
.indicesOptions(IndicesOptions.lenientExpandHidden())

return client.suspendUntil { targetClient.admin().cluster().health(clusterHealthRequest, it) }
// TODO hurneyt delete
// return targetClient.suspendUntil {
// admin().cluster().health(clusterHealthRequest, it)
// }
return targetClient.suspendUntil {
admin().cluster().health(clusterHealthRequest, it)
}
}

private suspend fun getIndexMappings(targetClient: Client, parsedIndexNames: List<String>): GetMappingsResponse {
Expand Down

0 comments on commit d884eed

Please sign in to comment.