diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt index 0e6b4f319..48a5a34fa 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt @@ -20,6 +20,7 @@ import org.opensearch.action.search.SearchResponse import org.opensearch.action.support.WriteRequest import org.opensearch.alerting.alerts.AlertIndices import org.opensearch.alerting.model.ActionRunResult +import org.opensearch.alerting.model.ClusterMetricsTriggerRunResult import org.opensearch.alerting.model.QueryLevelTriggerRunResult import org.opensearch.alerting.opensearchapi.firstFailureOrNull import org.opensearch.alerting.opensearchapi.retry @@ -169,6 +170,18 @@ class AlertService( ) } + // Including a list of triggered clusters for cluster metrics monitors + var triggeredClusters: MutableList? = null + logger.info("hurneyt composeQueryLevelAlert result is ClusterMetricsTriggerRunResult = {}", (result is ClusterMetricsTriggerRunResult)) + if (result is ClusterMetricsTriggerRunResult) + result.clusterTriggerResults.forEach { + logger.info("hurneyt composeQueryLevelAlert::it = {}", it) + if (it.triggered) { + if (triggeredClusters.isNullOrEmpty()) triggeredClusters = mutableListOf() + triggeredClusters!!.add(it.cluster) + } + } + // Merge the alert's error message to the current alert's history val updatedHistory = currentAlert?.errorHistory.update(alertError) return if (alertError == null && !result.triggered) { @@ -178,7 +191,8 @@ class AlertService( errorMessage = null, errorHistory = updatedHistory, actionExecutionResults = updatedActionExecutionResults, - schemaVersion = IndexUtils.alertIndexSchemaVersion + schemaVersion = IndexUtils.alertIndexSchemaVersion, + clusters = triggeredClusters ) } else if (alertError == null && currentAlert?.isAcknowledged() == true) { null @@ -191,6 +205,7 @@ class AlertService( errorHistory = updatedHistory, actionExecutionResults = updatedActionExecutionResults, schemaVersion = IndexUtils.alertIndexSchemaVersion, + clusters = triggeredClusters ) } else { val alertState = if (workflorwRunContext?.auditDelegateMonitorAlerts == true) { diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/QueryLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/QueryLevelMonitorRunner.kt index a53d2e915..ff14c1972 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/QueryLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/QueryLevelMonitorRunner.kt @@ -62,10 +62,19 @@ object QueryLevelMonitorRunner : MonitorRunner() { val updatedAlerts = mutableListOf() val triggerResults = mutableMapOf() + logger.info("hurneyt currentAlerts = {}", currentAlerts) for (trigger in monitor.triggers) { val currentAlert = currentAlerts[trigger] + logger.info("hurneyt currentAlert = {}", currentAlert) val triggerCtx = QueryLevelTriggerExecutionContext(monitor, trigger as QueryLevelTrigger, monitorResult, currentAlert) - val triggerResult = monitorCtx.triggerService!!.runQueryLevelTrigger(monitor, trigger, triggerCtx) + val triggerResult = when (monitor.monitorType) { + Monitor.MonitorType.QUERY_LEVEL_MONITOR -> + monitorCtx.triggerService!!.runQueryLevelTrigger(monitor, trigger, triggerCtx) + Monitor.MonitorType.CLUSTER_METRICS_MONITOR -> + monitorCtx.triggerService!!.runClusterMetricsTrigger(monitor, trigger, triggerCtx, monitorCtx.clusterService!!) + else -> + throw IllegalArgumentException("Unsupported monitor type: ${monitor.monitorType.name}.") + } logger.info("hurneyt triggerResult = {}", triggerResult) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/TriggerService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/TriggerService.kt index 8c64e43be..e0ecf987d 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/TriggerService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/TriggerService.kt @@ -9,14 +9,18 @@ import org.apache.logging.log4j.LogManager import org.opensearch.alerting.chainedAlertCondition.parsers.ChainedAlertExpressionParser import org.opensearch.alerting.model.BucketLevelTriggerRunResult import org.opensearch.alerting.model.ChainedAlertTriggerRunResult +import org.opensearch.alerting.model.ClusterMetricsTriggerRunResult +import org.opensearch.alerting.model.ClusterMetricsTriggerRunResult.ClusterTriggerResult import org.opensearch.alerting.model.DocumentLevelTriggerRunResult import org.opensearch.alerting.model.QueryLevelTriggerRunResult import org.opensearch.alerting.script.BucketLevelTriggerExecutionContext import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext import org.opensearch.alerting.script.TriggerScript import org.opensearch.alerting.triggercondition.parsers.TriggerExpressionParser +import org.opensearch.alerting.util.CrossClusterMonitorUtils import org.opensearch.alerting.util.getBucketKeysHash import org.opensearch.alerting.workflow.WorkflowRunContext +import org.opensearch.cluster.service.ClusterService import org.opensearch.commons.alerting.aggregation.bucketselectorext.BucketSelectorIndices.Fields.BUCKET_INDICES import org.opensearch.commons.alerting.aggregation.bucketselectorext.BucketSelectorIndices.Fields.PARENT_BUCKET_PATH import org.opensearch.commons.alerting.model.AggregationResultBucket @@ -69,6 +73,63 @@ class TriggerService(val scriptService: ScriptService) { } } + fun runClusterMetricsTrigger( + monitor: Monitor, + trigger: QueryLevelTrigger, + ctx: QueryLevelTriggerExecutionContext, + clusterService: ClusterService + ): ClusterMetricsTriggerRunResult { + var runResult: ClusterMetricsTriggerRunResult? + try { + val inputResults = ctx.results.getOrElse(0) { mapOf() } + logger.info("hurneyt runClusterMetricsTrigger::inputResults = {}", inputResults) + var triggered = false + val clusterTriggerResults = mutableListOf() + if (CrossClusterMonitorUtils.isRemoteMonitor(monitor, clusterService)) { + inputResults.forEach { clusterResult -> + logger.info("hurneyt runClusterMetricsTrigger::clusterResult = {}", clusterResult) + // Reducing the inputResults to only include results from 1 cluster at a time + val clusterTriggerCtx = ctx.copy(results = listOf(mapOf(clusterResult.toPair()))) + + val clusterTriggered = scriptService.compile(trigger.condition, TriggerScript.CONTEXT) + .newInstance(trigger.condition.params) + .execute(clusterTriggerCtx) + + if (clusterTriggered && !triggered) { + logger.info("hurneyt runClusterMetricsTrigger TRUE") + triggered = clusterTriggered + clusterTriggerResults + .add(ClusterTriggerResult(cluster = clusterResult.key, triggered = clusterTriggered)) + } else { + logger.info("hurneyt runClusterMetricsTrigger ELSE triggered = {}", triggered) + logger.info("hurneyt runClusterMetricsTrigger ELSE clusterTriggered = {}", clusterTriggered) + } + } + logger.info("hurneyt runClusterMetricsTrigger::triggered = {}", triggered) + logger.info("hurneyt runClusterMetricsTrigger::triggeredClusters = {}", clusterTriggerResults) + } else { + logger.info("hurneyt runClusterMetricsTrigger ELSE") + triggered = scriptService.compile(trigger.condition, TriggerScript.CONTEXT) + .newInstance(trigger.condition.params) + .execute(ctx) + if (triggered) clusterTriggerResults + .add(ClusterTriggerResult(cluster = clusterService.clusterName.value(), triggered = triggered)) + } + runResult = ClusterMetricsTriggerRunResult( + triggerName = trigger.name, + triggered = triggered, + error = null, + clusterTriggerResults = clusterTriggerResults + ) + logger.info("hurneyt runClusterMetricsTrigger::runResult = {}", runResult) + } catch (e: Exception) { + logger.info("Error running script for monitor ${monitor.id}, trigger: ${trigger.id}", e) + // if the script fails we need to send an alert so set triggered = true + runResult = ClusterMetricsTriggerRunResult(trigger.name, true, e) + } + return runResult!! + } + // TODO: improve performance and support match all and match any fun runDocLevelTrigger( monitor: Monitor, diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/ClusterMetricsTriggerRunResult.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/ClusterMetricsTriggerRunResult.kt new file mode 100644 index 000000000..1d97bda94 --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/ClusterMetricsTriggerRunResult.kt @@ -0,0 +1,106 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.model + +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.io.stream.Writeable +import org.opensearch.commons.alerting.alerts.AlertError +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.ToXContentObject +import org.opensearch.core.xcontent.XContentBuilder +import org.opensearch.script.ScriptException +import java.io.IOException +import java.time.Instant + +data class ClusterMetricsTriggerRunResult( + override var triggerName: String, + override var triggered: Boolean, + override var error: Exception?, + override var actionResults: MutableMap = mutableMapOf(), + var clusterTriggerResults: List = listOf() +) : QueryLevelTriggerRunResult( + triggerName = triggerName, + error = error, + triggered = triggered, + actionResults = actionResults +) { + + @Throws(IOException::class) + @Suppress("UNCHECKED_CAST") + constructor(sin: StreamInput) : this( + triggerName = sin.readString(), + error = sin.readException(), + triggered = sin.readBoolean(), + actionResults = sin.readMap() as MutableMap, + clusterTriggerResults = sin.readList((ClusterTriggerResult.Companion)::readFrom) + ) + + override fun alertError(): AlertError? { + if (error != null) { + return AlertError(Instant.now(), "Failed evaluating trigger:\n${error!!.userErrorMessage()}") + } + for (actionResult in actionResults.values) { + if (actionResult.error != null) { + return AlertError(Instant.now(), "Failed running action:\n${actionResult.error.userErrorMessage()}") + } + } + return null + } + + override fun internalXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + if (error is ScriptException) error = Exception((error as ScriptException).toJsonString(), error) + return builder + .field(TRIGGERED_FIELD, triggered) + .field(ACTION_RESULTS_FIELD, actionResults as Map) + .field(CLUSTER_RESULTS_FIELD, clusterTriggerResults) + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + super.writeTo(out) + out.writeBoolean(triggered) + out.writeMap(actionResults as Map) + clusterTriggerResults.forEach { it.writeTo(out) } + } + + companion object { + const val TRIGGERED_FIELD = "triggered" + const val ACTION_RESULTS_FIELD = "action_results" + const val CLUSTER_RESULTS_FIELD = "cluster_results" + } + + data class ClusterTriggerResult( + val cluster: String, + val triggered: Boolean, + ) : ToXContentObject, Writeable { + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + cluster = sin.readString(), + triggered = sin.readBoolean() + ) + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + return builder.startObject(cluster) + .field(TRIGGERED_FIELD, triggered) + .endObject() + } + + override fun writeTo(out: StreamOutput) { + out.writeString(cluster) + out.writeBoolean(triggered) + } + + companion object { + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): ClusterTriggerResult { + return ClusterTriggerResult(sin) + } + } + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/QueryLevelTriggerRunResult.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/QueryLevelTriggerRunResult.kt index e243146fa..46750067d 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/model/QueryLevelTriggerRunResult.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/QueryLevelTriggerRunResult.kt @@ -14,11 +14,11 @@ import org.opensearch.script.ScriptException import java.io.IOException import java.time.Instant -data class QueryLevelTriggerRunResult( +open class QueryLevelTriggerRunResult( override var triggerName: String, - var triggered: Boolean, + open var triggered: Boolean, override var error: Exception?, - var actionResults: MutableMap = mutableMapOf() + open var actionResults: MutableMap = mutableMapOf() ) : TriggerRunResult(triggerName, error) { @Throws(IOException::class)