Skip to content

Commit

Permalink
Refactored alert generation for cluster metrics monitors to include t…
Browse files Browse the repository at this point in the history
…riggered clusters in alerts.

Signed-off-by: AWSHurneyt <[email protected]>
  • Loading branch information
AWSHurneyt committed Nov 15, 2023
1 parent affb7aa commit 09abfbf
Show file tree
Hide file tree
Showing 5 changed files with 196 additions and 5 deletions.
17 changes: 16 additions & 1 deletion alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import org.opensearch.action.search.SearchResponse
import org.opensearch.action.support.WriteRequest
import org.opensearch.alerting.alerts.AlertIndices
import org.opensearch.alerting.model.ActionRunResult
import org.opensearch.alerting.model.ClusterMetricsTriggerRunResult
import org.opensearch.alerting.model.QueryLevelTriggerRunResult
import org.opensearch.alerting.opensearchapi.firstFailureOrNull
import org.opensearch.alerting.opensearchapi.retry
Expand Down Expand Up @@ -169,6 +170,18 @@ class AlertService(
)
}

// Including a list of triggered clusters for cluster metrics monitors
var triggeredClusters: MutableList<String>? = null
logger.info("hurneyt composeQueryLevelAlert result is ClusterMetricsTriggerRunResult = {}", (result is ClusterMetricsTriggerRunResult))
if (result is ClusterMetricsTriggerRunResult)
result.clusterTriggerResults.forEach {
logger.info("hurneyt composeQueryLevelAlert::it = {}", it)
if (it.triggered) {
if (triggeredClusters.isNullOrEmpty()) triggeredClusters = mutableListOf()
triggeredClusters!!.add(it.cluster)
}
}

// Merge the alert's error message to the current alert's history
val updatedHistory = currentAlert?.errorHistory.update(alertError)
return if (alertError == null && !result.triggered) {
Expand All @@ -178,7 +191,8 @@ class AlertService(
errorMessage = null,
errorHistory = updatedHistory,
actionExecutionResults = updatedActionExecutionResults,
schemaVersion = IndexUtils.alertIndexSchemaVersion
schemaVersion = IndexUtils.alertIndexSchemaVersion,
clusters = triggeredClusters
)
} else if (alertError == null && currentAlert?.isAcknowledged() == true) {
null
Expand All @@ -191,6 +205,7 @@ class AlertService(
errorHistory = updatedHistory,
actionExecutionResults = updatedActionExecutionResults,
schemaVersion = IndexUtils.alertIndexSchemaVersion,
clusters = triggeredClusters
)
} else {
val alertState = if (workflorwRunContext?.auditDelegateMonitorAlerts == true) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,19 @@ object QueryLevelMonitorRunner : MonitorRunner() {

val updatedAlerts = mutableListOf<Alert>()
val triggerResults = mutableMapOf<String, QueryLevelTriggerRunResult>()
logger.info("hurneyt currentAlerts = {}", currentAlerts)
for (trigger in monitor.triggers) {
val currentAlert = currentAlerts[trigger]
logger.info("hurneyt currentAlert = {}", currentAlert)
val triggerCtx = QueryLevelTriggerExecutionContext(monitor, trigger as QueryLevelTrigger, monitorResult, currentAlert)
val triggerResult = monitorCtx.triggerService!!.runQueryLevelTrigger(monitor, trigger, triggerCtx)
val triggerResult = when (monitor.monitorType) {
Monitor.MonitorType.QUERY_LEVEL_MONITOR ->
monitorCtx.triggerService!!.runQueryLevelTrigger(monitor, trigger, triggerCtx)
Monitor.MonitorType.CLUSTER_METRICS_MONITOR ->
monitorCtx.triggerService!!.runClusterMetricsTrigger(monitor, trigger, triggerCtx, monitorCtx.clusterService!!)
else ->
throw IllegalArgumentException("Unsupported monitor type: ${monitor.monitorType.name}.")
}

logger.info("hurneyt triggerResult = {}", triggerResult)

Expand Down
61 changes: 61 additions & 0 deletions alerting/src/main/kotlin/org/opensearch/alerting/TriggerService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,18 @@ import org.apache.logging.log4j.LogManager
import org.opensearch.alerting.chainedAlertCondition.parsers.ChainedAlertExpressionParser
import org.opensearch.alerting.model.BucketLevelTriggerRunResult
import org.opensearch.alerting.model.ChainedAlertTriggerRunResult
import org.opensearch.alerting.model.ClusterMetricsTriggerRunResult
import org.opensearch.alerting.model.ClusterMetricsTriggerRunResult.ClusterTriggerResult
import org.opensearch.alerting.model.DocumentLevelTriggerRunResult
import org.opensearch.alerting.model.QueryLevelTriggerRunResult
import org.opensearch.alerting.script.BucketLevelTriggerExecutionContext
import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext
import org.opensearch.alerting.script.TriggerScript
import org.opensearch.alerting.triggercondition.parsers.TriggerExpressionParser
import org.opensearch.alerting.util.CrossClusterMonitorUtils
import org.opensearch.alerting.util.getBucketKeysHash
import org.opensearch.alerting.workflow.WorkflowRunContext
import org.opensearch.cluster.service.ClusterService
import org.opensearch.commons.alerting.aggregation.bucketselectorext.BucketSelectorIndices.Fields.BUCKET_INDICES
import org.opensearch.commons.alerting.aggregation.bucketselectorext.BucketSelectorIndices.Fields.PARENT_BUCKET_PATH
import org.opensearch.commons.alerting.model.AggregationResultBucket
Expand Down Expand Up @@ -69,6 +73,63 @@ class TriggerService(val scriptService: ScriptService) {
}
}

/**
 * Evaluates [trigger]'s condition script for a cluster metrics monitor and reports which
 * clusters satisfied it.
 *
 * When [CrossClusterMonitorUtils.isRemoteMonitor] returns true, the first entry of `ctx.results`
 * is iterated entry by entry (the entry key is used as the cluster name) and the condition is
 * evaluated once per entry against a copy of [ctx] narrowed to that single entry. Otherwise the
 * condition is evaluated once against [ctx] as-is and attributed to the name reported by
 * [clusterService].
 *
 * @param monitor the monitor being run; its id is used when logging script failures.
 * @param trigger the trigger whose condition script is evaluated.
 * @param ctx the execution context holding the monitor input results.
 * @param clusterService used to decide whether the monitor is remote and to name the local cluster.
 * @return a result whose `triggered` flag is true when at least one cluster satisfied the
 * condition, with one [ClusterTriggerResult] per triggering cluster. If anything throws, the
 * result has `triggered = true` and carries the exception so that an error alert is produced.
 */
fun runClusterMetricsTrigger(
    monitor: Monitor,
    trigger: QueryLevelTrigger,
    ctx: QueryLevelTriggerExecutionContext,
    clusterService: ClusterService
): ClusterMetricsTriggerRunResult {
    // Runs the trigger condition against the supplied context using a fresh script instance.
    fun evaluateCondition(executionCtx: QueryLevelTriggerExecutionContext): Boolean =
        scriptService.compile(trigger.condition, TriggerScript.CONTEXT)
            .newInstance(trigger.condition.params)
            .execute(executionCtx)

    return try {
        val inputResults = ctx.results.getOrElse(0) { mapOf() }
        val clusterTriggerResults = mutableListOf<ClusterTriggerResult>()

        if (CrossClusterMonitorUtils.isRemoteMonitor(monitor, clusterService)) {
            inputResults.forEach { clusterResult ->
                // Reducing the inputResults to only include results from 1 cluster at a time
                val clusterTriggerCtx = ctx.copy(results = listOf(mapOf(clusterResult.toPair())))

                // Record EVERY cluster that satisfies the condition, not just the first one;
                // the alert is expected to list all triggered clusters.
                if (evaluateCondition(clusterTriggerCtx)) {
                    clusterTriggerResults.add(ClusterTriggerResult(cluster = clusterResult.key, triggered = true))
                }
            }
        } else if (evaluateCondition(ctx)) {
            clusterTriggerResults.add(ClusterTriggerResult(cluster = clusterService.clusterName.value(), triggered = true))
        }

        ClusterMetricsTriggerRunResult(
            triggerName = trigger.name,
            // Only triggering clusters are recorded above, so a non-empty list means "triggered".
            triggered = clusterTriggerResults.isNotEmpty(),
            error = null,
            clusterTriggerResults = clusterTriggerResults
        )
    } catch (e: Exception) {
        logger.info("Error running script for monitor ${monitor.id}, trigger: ${trigger.id}", e)
        // if the script fails we need to send an alert so set triggered = true
        ClusterMetricsTriggerRunResult(trigger.name, true, e)
    }
}

// TODO: improve performance and support match all and match any
fun runDocLevelTrigger(
monitor: Monitor,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.model

import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.common.io.stream.Writeable
import org.opensearch.commons.alerting.alerts.AlertError
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.ToXContentObject
import org.opensearch.core.xcontent.XContentBuilder
import org.opensearch.script.ScriptException
import java.io.IOException
import java.time.Instant

/**
 * Result of running a query-level trigger for a cluster metrics monitor.
 *
 * Extends [QueryLevelTriggerRunResult] with [clusterTriggerResults], a per-cluster record of
 * the trigger evaluation.
 *
 * @property triggerName name of the trigger that was run.
 * @property triggered whether the trigger condition was met.
 * @property error exception raised while evaluating the trigger, if any.
 * @property actionResults results of the actions that were run, keyed by string.
 * @property clusterTriggerResults per-cluster trigger outcomes.
 */
data class ClusterMetricsTriggerRunResult(
    override var triggerName: String,
    override var triggered: Boolean,
    override var error: Exception?,
    override var actionResults: MutableMap<String, ActionRunResult> = mutableMapOf(),
    var clusterTriggerResults: List<ClusterTriggerResult> = listOf()
) : QueryLevelTriggerRunResult(
    triggerName = triggerName,
    error = error,
    triggered = triggered,
    actionResults = actionResults
) {

    /**
     * Deserializes a result from [sin]. Fields are read in the order: trigger name, error,
     * triggered flag, action results, then the length-prefixed list of cluster results.
     */
    @Throws(IOException::class)
    @Suppress("UNCHECKED_CAST")
    constructor(sin: StreamInput) : this(
        triggerName = sin.readString(),
        error = sin.readException(),
        triggered = sin.readBoolean(),
        actionResults = sin.readMap() as MutableMap<String, ActionRunResult>,
        clusterTriggerResults = sin.readList((ClusterTriggerResult.Companion)::readFrom)
    )

    /**
     * Builds the [AlertError] for this run: the trigger evaluation error takes precedence,
     * otherwise the first action result that carries an error is reported.
     *
     * @return the alert error, or null when neither the trigger nor any action failed.
     */
    override fun alertError(): AlertError? {
        if (error != null) {
            return AlertError(Instant.now(), "Failed evaluating trigger:\n${error!!.userErrorMessage()}")
        }
        for (actionResult in actionResults.values) {
            if (actionResult.error != null) {
                return AlertError(Instant.now(), "Failed running action:\n${actionResult.error.userErrorMessage()}")
            }
        }
        return null
    }

    /**
     * Writes the triggered flag, the action results and the per-cluster results to [builder].
     * A [ScriptException] stored in [error] is first replaced by a plain [Exception] whose
     * message is the script exception's JSON string (the original is kept as the cause).
     */
    override fun internalXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
        if (error is ScriptException) error = Exception((error as ScriptException).toJsonString(), error)
        return builder
            .field(TRIGGERED_FIELD, triggered)
            .field(ACTION_RESULTS_FIELD, actionResults as Map<String, ActionRunResult>)
            .field(CLUSTER_RESULTS_FIELD, clusterTriggerResults)
    }

    /**
     * Serializes this result to [out].
     */
    @Throws(IOException::class)
    override fun writeTo(out: StreamOutput) {
        super.writeTo(out)
        // NOTE(review): if the parent's writeTo already emits `triggered` and `actionResults`,
        // the next two writes duplicate them and no longer line up with the StreamInput
        // constructor above - confirm against QueryLevelTriggerRunResult.writeTo.
        out.writeBoolean(triggered)
        out.writeMap(actionResults as Map<String, ActionRunResult>)
        // The StreamInput constructor uses readList, which expects an element count before the
        // elements; writeCollection emits that count, whereas writing each element by hand did not.
        out.writeCollection(clusterTriggerResults)
    }

    companion object {
        const val TRIGGERED_FIELD = "triggered"
        const val ACTION_RESULTS_FIELD = "action_results"
        const val CLUSTER_RESULTS_FIELD = "cluster_results"
    }

    /**
     * Outcome of evaluating the trigger for a single cluster.
     *
     * @property cluster name of the cluster the outcome belongs to.
     * @property triggered whether the trigger condition was met for that cluster.
     */
    data class ClusterTriggerResult(
        val cluster: String,
        val triggered: Boolean,
    ) : ToXContentObject, Writeable {

        /** Deserializes a cluster result from [sin]: cluster name first, then the triggered flag. */
        @Throws(IOException::class)
        constructor(sin: StreamInput) : this(
            cluster = sin.readString(),
            triggered = sin.readBoolean()
        )

        /**
         * Renders this result as a self-contained object of the form
         * `{"<cluster>": {"triggered": <bool>}}`.
         *
         * The enclosing object is required because instances are emitted as array elements
         * (see `internalXContent` of the outer class), where a bare field name is not valid.
         */
        override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
            return builder.startObject()
                .startObject(cluster)
                .field(TRIGGERED_FIELD, triggered)
                .endObject()
                .endObject()
        }

        /** Serializes this cluster result to [out]: cluster name first, then the triggered flag. */
        override fun writeTo(out: StreamOutput) {
            out.writeString(cluster)
            out.writeBoolean(triggered)
        }

        companion object {
            /** Reads a [ClusterTriggerResult] from [sin]; usable as a stream reader reference. */
            @JvmStatic
            @Throws(IOException::class)
            fun readFrom(sin: StreamInput): ClusterTriggerResult {
                return ClusterTriggerResult(sin)
            }
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ import org.opensearch.script.ScriptException
import java.io.IOException
import java.time.Instant

data class QueryLevelTriggerRunResult(
open class QueryLevelTriggerRunResult(
override var triggerName: String,
var triggered: Boolean,
open var triggered: Boolean,
override var error: Exception?,
var actionResults: MutableMap<String, ActionRunResult> = mutableMapOf()
open var actionResults: MutableMap<String, ActionRunResult> = mutableMapOf()
) : TriggerRunResult(triggerName, error) {

@Throws(IOException::class)
Expand Down

0 comments on commit 09abfbf

Please sign in to comment.