Skip to content

Commit

Permalink
Refactoring the 3 new API into 1.
Browse files Browse the repository at this point in the history
Signed-off-by: AWSHurneyt <[email protected]>
  • Loading branch information
AWSHurneyt committed Oct 3, 2023
1 parent 8ba4394 commit 3241892
Show file tree
Hide file tree
Showing 5 changed files with 145 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,28 @@

package org.opensearch.alerting.action

import org.apache.logging.log4j.LogManager
import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionRequestValidationException
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import java.io.IOException

private val log = LogManager.getLogger(GetRemoteIndexesRequest::class.java)

class GetRemoteIndexesRequest : ActionRequest {
var clusterAliases: List<String> = listOf()
var includeMappings: Boolean

constructor(clusterAliases: List<String>) : super() {
constructor(clusterAliases: List<String>, includeMappings: Boolean) : super() {
this.clusterAliases = clusterAliases
this.includeMappings = includeMappings
}

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
sin.readStringList()
sin.readStringList(),
sin.readBoolean()
)

override fun validate(): ActionRequestValidationException? {
Expand All @@ -30,9 +36,12 @@ class GetRemoteIndexesRequest : ActionRequest {
@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
out.writeStringArray(clusterAliases.toTypedArray())
out.writeBoolean(includeMappings)
}

companion object {
// TODO hurneyt: is this companion needed?
const val CLUSTERS_FIELD = "clusters"
const val INCLUDE_MAPPINGS_FIELD = "include_mappings"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@

package org.opensearch.alerting.action

import org.apache.logging.log4j.LogManager
import org.opensearch.action.ActionResponse
import org.opensearch.cluster.health.ClusterHealthStatus
import org.opensearch.cluster.metadata.MappingMetadata
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.common.io.stream.Writeable
Expand All @@ -15,6 +17,8 @@ import org.opensearch.core.xcontent.ToXContentObject
import org.opensearch.core.xcontent.XContentBuilder
import java.io.IOException

private val log = LogManager.getLogger(GetRemoteIndexesResponse::class.java)

class GetRemoteIndexesResponse : ActionResponse, ToXContentObject {
var clusters: List<ClusterInfo> = emptyList()

Expand Down Expand Up @@ -43,35 +47,48 @@ class GetRemoteIndexesResponse : ActionResponse, ToXContentObject {
}

data class ClusterInfo(
val clusterAlias: String,
val clusterName: String,
val clusterHealth: ClusterHealthStatus,
// val connected: Boolean,
val hubCluster: Boolean,
val indexes: List<IndexInfo> = listOf(),
val latency: Long
) : ToXContentObject, Writeable {

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
clusterAlias = sin.readString(),
clusterName = sin.readString(),
clusterHealth = sin.readEnum(ClusterHealthStatus::class.java),
// connected = sin.readBoolean(),
hubCluster = sin.readBoolean(),
indexes = sin.readList((IndexInfo.Companion)::readFrom),
latency = sin.readLong()
)

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
builder.startObject()
.field(CLUSTER_ALIAS_FIELD, clusterAlias)
.field(CLUSTER_NAME_FIELD, clusterName)
.field(CLUSTER_HEALTH_FIELD, clusterHealth)
// .field(CONNECTED_FIELD, connected)
.field(HUB_CLUSTER_FIELD, hubCluster)
.field(INDEX_LATENCY_FIELD, latency)
.startArray(INDEXES_FIELD)
indexes.forEach { it.toXContent(builder, params) }
return builder.endArray().endObject()
}

override fun writeTo(out: StreamOutput) {
out.writeString(clusterAlias)
out.writeString(clusterName)
out.writeEnum(clusterHealth)
indexes.forEach { it.writeTo(out) }
out.writeLong(latency)
}

companion object {
const val CLUSTER_ALIAS_FIELD = "cluster"
const val CLUSTER_NAME_FIELD = "cluster"
const val CLUSTER_HEALTH_FIELD = "health"
const val CONNECTED_FIELD = "connected"
const val HUB_CLUSTER_FIELD = "hub_cluster"
const val INDEXES_FIELD = "indexes"
const val INDEX_LATENCY_FIELD = "latency"

Expand All @@ -84,30 +101,36 @@ class GetRemoteIndexesResponse : ActionResponse, ToXContentObject {

data class IndexInfo(
val indexName: String,
val indexHealth: ClusterHealthStatus
val indexHealth: ClusterHealthStatus,
val mappings: MappingMetadata?
) : ToXContentObject, Writeable {

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
indexName = sin.readString(),
indexHealth = sin.readEnum(ClusterHealthStatus::class.java),
mappings = sin.readOptionalWriteable(::MappingMetadata)
)

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
return builder.startObject()
builder.startObject()
.field(INDEX_NAME_FIELD, indexName)
.field(INDEX_HEALTH_FIELD, indexHealth)
.endObject()
if (mappings == null) builder.startObject(GetRemoteIndexesMappingsResponse.MAPPINGS_FIELD).endObject()
else builder.field(MAPPINGS_FIELD, mappings.sourceAsMap())
return builder.endObject()
}

override fun writeTo(out: StreamOutput) {
out.writeString(indexName)
out.writeEnum(indexHealth)
if (mappings != null) out.writeMap(mappings.sourceAsMap)
}

companion object {
const val INDEX_NAME_FIELD = "name"
const val INDEX_HEALTH_FIELD = "health"
const val MAPPINGS_FIELD = "mappings"

@JvmStatic
@Throws(IOException::class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import org.opensearch.alerting.action.GetRemoteIndexesAction
import org.opensearch.alerting.action.GetRemoteIndexesRequest
import org.opensearch.client.node.NodeClient
import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken
import org.opensearch.core.common.Strings
import org.opensearch.core.xcontent.XContentParser
import org.opensearch.rest.BaseRestHandler
import org.opensearch.rest.RestHandler
Expand All @@ -20,27 +21,41 @@ import org.opensearch.rest.action.RestToXContentListener
private val log = LogManager.getLogger(RestGetRemoteIndexesAction::class.java)

class RestGetRemoteIndexesAction : BaseRestHandler() {
val ROUTE = "${AlertingPlugin.REMOTE_BASE_URI}/indexes"
val ROUTE = "${AlertingPlugin.REMOTE_BASE_URI}/indexes/{name}"

override fun getName(): String {
return "get_remote_indexes_action"
}

override fun routes(): List<RestHandler.Route> {
return mutableListOf(
RestHandler.Route(RestRequest.Method.GET, ROUTE),
RestHandler.Route(RestRequest.Method.POST, ROUTE)
RestHandler.Route(RestRequest.Method.GET, ROUTE)
// ,
// RestHandler.Route(RestRequest.Method.POST, ROUTE)
)
}

override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer {
log.debug("${request.method()} $ROUTE")
val clusterAliases = getClusterAliases(request.contentParser())
log.info("hurneyt RestGetRemoteIndexesAction::request = {}", request)

val includeMappings = request.paramAsBoolean(GetRemoteIndexesRequest.INCLUDE_MAPPINGS_FIELD, false)
log.info("hurneyt RestGetRemoteIndexesAction::includeMappings = {}", includeMappings)

// val clusterAliases = getClusterAliases(request.contentParser())
val clusterField = request.paramAsStringArray(GetRemoteIndexesRequest.CLUSTERS_FIELD, emptyArray<String>())
log.info("hurneyt RestGetRemoteIndexesAction::clusterField = ", clusterField)

val indexes = Strings.splitStringByCommaToArray(request.param("name"))
log.info("hurneyt RestGetRemoteIndexesAction::indexes = ", indexes)

val indexes2 = request.paramAsStringArray("name", emptyArray<String>())
log.info("hurneyt RestGetRemoteIndexesAction::indexes2 = ", indexes2)
return RestChannelConsumer {
channel ->
client.execute(
GetRemoteIndexesAction.INSTANCE,
GetRemoteIndexesRequest(clusterAliases),
GetRemoteIndexesRequest(indexes.toList(), includeMappings),
RestToXContentListener(channel)
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ import org.apache.logging.log4j.LogManager
import org.opensearch.action.ActionListener
import org.opensearch.action.admin.cluster.health.ClusterHealthRequest
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse
import org.opensearch.action.admin.indices.mapping.get.GetMappingsRequest
import org.opensearch.action.admin.indices.mapping.get.GetMappingsResponse
import org.opensearch.action.admin.indices.resolve.ResolveIndexAction
import org.opensearch.action.support.ActionFilters
import org.opensearch.action.support.HandledTransportAction
import org.opensearch.action.support.IndicesOptions
Expand All @@ -22,6 +25,7 @@ import org.opensearch.alerting.action.GetRemoteIndexesResponse.ClusterInfo
import org.opensearch.alerting.action.GetRemoteIndexesResponse.ClusterInfo.IndexInfo
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.util.AlertingException
import org.opensearch.alerting.util.CrossClusterMonitorUtils
import org.opensearch.client.Client
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.inject.Inject
Expand All @@ -31,7 +35,6 @@ import org.opensearch.tasks.Task
import org.opensearch.transport.TransportService
import java.time.Duration
import java.time.Instant
import kotlin.streams.toList

private val log = LogManager.getLogger(TransportGetRemoteIndexesAction::class.java)
private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO)
Expand All @@ -54,63 +57,97 @@ class TransportGetRemoteIndexesAction @Inject constructor(
request: GetRemoteIndexesRequest,
listener: ActionListener<GetRemoteIndexesResponse>
) {
var clusterAliases = transportService.remoteClusterService.remoteConnectionInfos.toList()

log.info("hurneyt filter clusterAliases BEFORE = $clusterAliases")

// If clusters are specified, filter out unspecified clusters
if (request.clusterAliases.isNotEmpty())
clusterAliases = clusterAliases.filter { request.clusterAliases.contains(it.clusterAlias) }.toList()

log.info("hurneyt clusterAliases AFTER = $clusterAliases")
log.info("hurneyt TransportGetRemoteIndexesAction START")
client.threadPool().threadContext.stashContext().use {
scope.launch {
val clusterInfos = mutableListOf<ClusterInfo>()

var resolveIndexResponse: ResolveIndexAction.Response? = null
try {
val clusterName = clusterService.clusterName.value()
if (request.clusterAliases.contains(clusterName)) {
clusterInfos.add(getRemoteIndexes(clusterName))
resolveIndexResponse = getRemoteClusters(request.clusterAliases)
} catch (e: Exception) {
log.error("Failed to retrieve indexes for request $request", e)
listener.onFailure(AlertingException.wrap(e))
}

val resolvedIndexes = resolveIndexResponse?.indices?.map { it.name } ?: emptyList()
log.info("hurneyt TransportGetRemoteIndexesAction::resolvedIndexes = {}", resolvedIndexes)

val clusterIndexesMap = CrossClusterMonitorUtils.separateClusterIndexes(resolvedIndexes, clusterService)
log.info("hurneyt TransportGetRemoteIndexesAction::clusterIndexesMap = {}", clusterIndexesMap)

clusterIndexesMap.forEach { (clusterName, indexes) ->
val targetClient = CrossClusterMonitorUtils.getClientForCluster(clusterName, client, clusterService)

val startTime = Instant.now()
var clusterHealthResponse: ClusterHealthResponse? = null
try {
clusterHealthResponse = getHealthStatuses(targetClient, indexes)
} catch (e: Exception) {
log.error("Failed to retrieve health statuses for request $request", e)
listener.onFailure(AlertingException.wrap(e))
}
val endTime = Instant.now()
val latency = Duration.between(startTime, endTime).toMillis()

clusterAliases.forEach {
if (it.isConnected) {
clusterInfos.add(getRemoteIndexes(it.clusterAlias))
var mappingsResponse: GetMappingsResponse? = null
if (request.includeMappings) {
try {
mappingsResponse = getIndexMappings(targetClient, indexes)
} catch (e: Exception) {
log.error("Failed to retrieve mappings for request $request", e)
listener.onFailure(AlertingException.wrap(e))
}
}
} catch (e: Exception) {
log.error("Failed to retrieve index stats for request $request", e)
listener.onFailure(AlertingException.wrap(e))

val indexInfos = mutableListOf<IndexInfo>()
indexes.forEach {
indexInfos.add(
IndexInfo(
indexName = it,
indexHealth = clusterHealthResponse!!.indices[it]!!.status,
mappings = mappingsResponse?.mappings?.get(it)
)
)
}
clusterInfos.add(
ClusterInfo(
clusterName = clusterName,
clusterHealth = clusterHealthResponse!!.status,
hubCluster = clusterName == clusterService.clusterName.value(),
indexes = listOf(),
latency = latency
)
)
}
log.info("hurneyt TransportGetRemoteIndexesAction END")
listener.onResponse(GetRemoteIndexesResponse(clusters = clusterInfos))
}
}
}

private suspend fun getRemoteIndexes(clusterAlias: String): ClusterInfo {
log.info("hurneyt getRemoteIndexes::clusterAlias = $clusterAlias")
val targetClient =
if (clusterService.clusterName.value() == clusterAlias) {
log.info("hurneyt getRemoteIndexes LOCAL CLIENT")
client
} else {
log.info("hurneyt getRemoteIndexes REMOTE CLIENT")
client.getRemoteClusterClient(clusterAlias)
}

val clusterHealthRequest = ClusterHealthRequest().indicesOptions(IndicesOptions.lenientExpandHidden())

val startTime = Instant.now()
val clusterHealthResponse: ClusterHealthResponse =
targetClient.suspendUntil { admin().cluster().health(clusterHealthRequest, it) }
val endTime = Instant.now()

// Manually calculating the latency of ClusterHealth call as the API does not return that metric
val latency = Duration.between(startTime, endTime).toMillis()
private suspend fun getRemoteClusters(unparsedIndexes: List<String>): ResolveIndexAction.Response {
val resolveRequest = ResolveIndexAction.Request(
unparsedIndexes.toTypedArray(),
ResolveIndexAction.Request.DEFAULT_INDICES_OPTIONS
)
return client.suspendUntil {
admin().indices().resolveIndex(resolveRequest, it)
}
}
private suspend fun getHealthStatuses(targetClient: Client, parsedIndexesNames: List<String>): ClusterHealthResponse {
val clusterHealthRequest = ClusterHealthRequest()
.indices(*parsedIndexesNames.toTypedArray())
.indicesOptions(IndicesOptions.lenientExpandHidden())
return targetClient.suspendUntil {
admin().cluster().health(clusterHealthRequest, it)
}
}

val indexInfos = mutableListOf<IndexInfo>()
clusterHealthResponse.indices.forEach { (index, health) ->
indexInfos.add(IndexInfo(index, health.status))
private suspend fun getIndexMappings(targetClient: Client, parsedIndexNames: List<String>): GetMappingsResponse {
val getMappingsRequest = GetMappingsRequest().indices(*parsedIndexNames.toTypedArray())
return targetClient.suspendUntil {
admin().indices().getMappings(getMappingsRequest, it)
}
return ClusterInfo(clusterAlias, indexInfos, latency)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,12 @@ class CrossClusterMonitorUtils {
@JvmStatic
fun getClientForIndex(index: String, client: Client, clusterService: ClusterService): Client {
val clusterName = parseClusterName(index)
log.info("hurneyt getClientForIndex::clusterName = $clusterName")
return if (clusterName.isNotEmpty() && clusterName != clusterService.clusterName.value()) {
log.info("hurneyt getClient REMOTE")
log.info("hurneyt getClientForIndex REMOTE")
client.getRemoteClusterClient(clusterName)
} else {
log.info("hurneyt getClient LOCAL")
log.info("hurneyt getClientForIndex LOCAL")
client
}
}
Expand Down

0 comments on commit 3241892

Please sign in to comment.