Skip to content

Commit

Permalink
[Backport 2.x] Alerting Enhancements: Alerting Comments (Experimental) (
Browse files Browse the repository at this point in the history
#1561) (#1572) (#1573)

* Alerting Enhancements: Alerting Comments (Experimental) (#1561)

* initial commit, functional but needs refactoring



* refactored QueryLevelTriggerExecutionContext to not need both Alert and AlertContext field, but only AlertContext field



* misc additions and fixes



* misc cleanup



* misc changes and basic ITs



* cleanup



* renaming notes to comments



* misc changes



* changed API endpoints



* more misc changes and fixes



* misc cleanup



* updated a comment



* misc cleanup and setting refresh policy to immediate



* fixed lint issues and other restructuring



* review-based changes



* update after adding entityType to Comment model



* misc fixes



* removing comments history enabled setting



* adding release notes



* IT fixes



* removing dev code vestiges



* changing logger calls



---------




* fixed imports



---------



(cherry picked from commit f52eb2a)

Signed-off-by: Dennis Toepker <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: Dennis Toepker <[email protected]>
  • Loading branch information
3 people authored Jun 12, 2024
1 parent 159c142 commit 9d6005c
Show file tree
Hide file tree
Showing 29 changed files with 2,067 additions and 41 deletions.
21 changes: 18 additions & 3 deletions alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.script.ChainedAlertTriggerExecutionContext
import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext
import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext
import org.opensearch.alerting.util.CommentsUtils
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.alerting.util.MAX_SEARCH_SIZE
import org.opensearch.alerting.util.getBucketKeysHash
Expand Down Expand Up @@ -157,7 +158,7 @@ class AlertService(
workflorwRunContext: WorkflowRunContext?
): Alert? {
val currentTime = Instant.now()
val currentAlert = ctx.alert
val currentAlert = ctx.alertContext?.alert

val updatedActionExecutionResults = mutableListOf<ActionExecutionResult>()
val currentActionIds = mutableSetOf<String>()
Expand Down Expand Up @@ -686,6 +687,8 @@ class AlertService(
val alertsIndex = dataSources.alertsIndex
val alertsHistoryIndex = dataSources.alertsHistoryIndex

val commentIdsToDelete = mutableListOf<String>()

var requestsToRetry = alerts.flatMap { alert ->
// We don't want to set the version when saving alerts because the MonitorRunner has first priority when writing alerts.
// In the rare event that a user acknowledges an alert between when it's read and when it's written
Expand Down Expand Up @@ -732,13 +735,22 @@ class AlertService(
listOfNotNull<DocWriteRequest<*>>(
DeleteRequest(alertsIndex, alert.id)
.routing(routingId),
// Only add completed alert to history index if history is enabled
if (alertIndices.isAlertHistoryEnabled()) {
// Only add completed alert to history index if history is enabled
IndexRequest(alertsHistoryIndex)
.routing(routingId)
.source(alert.toXContentWithUser(XContentFactory.jsonBuilder()))
.id(alert.id)
} else null
} else {
// Otherwise, prepare the Alert's comments for deletion, and don't include
// a request to index the Alert to an Alert history index.
// The delete request can't be added to the list of DocWriteRequests because
// Comments are stored in aliased history indices, not a concrete Comments
// index like Alerts. A DeleteBy request will be used to delete Comments, instead
// of a regular Delete request
commentIdsToDelete.addAll(CommentsUtils.getCommentIDsByAlertIDs(client, listOf(alert.id)))
null
}
)
}
}
Expand All @@ -758,6 +770,9 @@ class AlertService(
throw ExceptionsHelper.convertToOpenSearchException(retryCause)
}
}

// delete all the comments of any Alerts that were deleted
CommentsUtils.deleteComments(client, commentIdsToDelete)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import org.opensearch.alerting.action.GetRemoteIndexesAction
import org.opensearch.alerting.action.SearchEmailAccountAction
import org.opensearch.alerting.action.SearchEmailGroupAction
import org.opensearch.alerting.alerts.AlertIndices
import org.opensearch.alerting.comments.CommentsIndices
import org.opensearch.alerting.core.JobSweeper
import org.opensearch.alerting.core.ScheduledJobIndices
import org.opensearch.alerting.core.action.node.ScheduledJobsStatsAction
Expand All @@ -27,6 +28,7 @@ import org.opensearch.alerting.core.settings.LegacyOpenDistroScheduledJobSetting
import org.opensearch.alerting.core.settings.ScheduledJobSettings
import org.opensearch.alerting.resthandler.RestAcknowledgeAlertAction
import org.opensearch.alerting.resthandler.RestAcknowledgeChainedAlertAction
import org.opensearch.alerting.resthandler.RestDeleteAlertingCommentAction
import org.opensearch.alerting.resthandler.RestDeleteMonitorAction
import org.opensearch.alerting.resthandler.RestDeleteWorkflowAction
import org.opensearch.alerting.resthandler.RestExecuteMonitorAction
Expand All @@ -40,8 +42,10 @@ import org.opensearch.alerting.resthandler.RestGetMonitorAction
import org.opensearch.alerting.resthandler.RestGetRemoteIndexesAction
import org.opensearch.alerting.resthandler.RestGetWorkflowAction
import org.opensearch.alerting.resthandler.RestGetWorkflowAlertsAction
import org.opensearch.alerting.resthandler.RestIndexAlertingCommentAction
import org.opensearch.alerting.resthandler.RestIndexMonitorAction
import org.opensearch.alerting.resthandler.RestIndexWorkflowAction
import org.opensearch.alerting.resthandler.RestSearchAlertingCommentAction
import org.opensearch.alerting.resthandler.RestSearchEmailAccountAction
import org.opensearch.alerting.resthandler.RestSearchEmailGroupAction
import org.opensearch.alerting.resthandler.RestSearchMonitorAction
Expand All @@ -54,6 +58,7 @@ import org.opensearch.alerting.settings.LegacyOpenDistroAlertingSettings
import org.opensearch.alerting.settings.LegacyOpenDistroDestinationSettings
import org.opensearch.alerting.transport.TransportAcknowledgeAlertAction
import org.opensearch.alerting.transport.TransportAcknowledgeChainedAlertAction
import org.opensearch.alerting.transport.TransportDeleteAlertingCommentAction
import org.opensearch.alerting.transport.TransportDeleteMonitorAction
import org.opensearch.alerting.transport.TransportDeleteWorkflowAction
import org.opensearch.alerting.transport.TransportDocLevelMonitorFanOutAction
Expand All @@ -68,8 +73,10 @@ import org.opensearch.alerting.transport.TransportGetMonitorAction
import org.opensearch.alerting.transport.TransportGetRemoteIndexesAction
import org.opensearch.alerting.transport.TransportGetWorkflowAction
import org.opensearch.alerting.transport.TransportGetWorkflowAlertsAction
import org.opensearch.alerting.transport.TransportIndexAlertingCommentAction
import org.opensearch.alerting.transport.TransportIndexMonitorAction
import org.opensearch.alerting.transport.TransportIndexWorkflowAction
import org.opensearch.alerting.transport.TransportSearchAlertingCommentAction
import org.opensearch.alerting.transport.TransportSearchEmailAccountAction
import org.opensearch.alerting.transport.TransportSearchEmailGroupAction
import org.opensearch.alerting.transport.TransportSearchMonitorAction
Expand Down Expand Up @@ -158,6 +165,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
@JvmField val LEGACY_OPENDISTRO_EMAIL_GROUP_BASE_URI = "$LEGACY_OPENDISTRO_DESTINATION_BASE_URI/email_groups"

@JvmField val FINDING_BASE_URI = "/_plugins/_alerting/findings"
@JvmField val COMMENTS_BASE_URI = "/_plugins/_alerting/comments"

@JvmField val ALERTING_JOB_TYPES = listOf("monitor", "workflow")
}
Expand All @@ -166,6 +174,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
lateinit var scheduler: JobScheduler
lateinit var sweeper: JobSweeper
lateinit var scheduledJobIndices: ScheduledJobIndices
lateinit var commentsIndices: CommentsIndices
lateinit var docLevelMonitorQueries: DocLevelMonitorQueries
lateinit var threadPool: ThreadPool
lateinit var alertIndices: AlertIndices
Expand Down Expand Up @@ -203,6 +212,9 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
RestGetWorkflowAction(),
RestDeleteWorkflowAction(),
RestGetRemoteIndexesAction(),
RestIndexAlertingCommentAction(),
RestSearchAlertingCommentAction(),
RestDeleteAlertingCommentAction(),
)
}

Expand All @@ -229,6 +241,9 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
ActionPlugin.ActionHandler(AlertingActions.INDEX_WORKFLOW_ACTION_TYPE, TransportIndexWorkflowAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.GET_WORKFLOW_ACTION_TYPE, TransportGetWorkflowAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.DELETE_WORKFLOW_ACTION_TYPE, TransportDeleteWorkflowAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.INDEX_COMMENT_ACTION_TYPE, TransportIndexAlertingCommentAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.SEARCH_COMMENTS_ACTION_TYPE, TransportSearchAlertingCommentAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.DELETE_COMMENT_ACTION_TYPE, TransportDeleteAlertingCommentAction::class.java),
ActionPlugin.ActionHandler(ExecuteWorkflowAction.INSTANCE, TransportExecuteWorkflowAction::class.java),
ActionPlugin.ActionHandler(GetRemoteIndexesAction.INSTANCE, TransportGetRemoteIndexesAction::class.java),
ActionPlugin.ActionHandler(DocLevelMonitorFanOutAction.INSTANCE, TransportDocLevelMonitorFanOutAction::class.java)
Expand Down Expand Up @@ -287,6 +302,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
.registerConsumers()
.registerDestinationSettings()
scheduledJobIndices = ScheduledJobIndices(client.admin(), clusterService)
commentsIndices = CommentsIndices(environment.settings(), client, threadPool, clusterService)
docLevelMonitorQueries = DocLevelMonitorQueries(client, clusterService)
scheduler = JobScheduler(threadPool, runner)
sweeper = JobSweeper(environment.settings(), client, clusterService, threadPool, xContentRegistry, scheduler, ALERTING_JOB_TYPES)
Expand Down Expand Up @@ -315,6 +331,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
scheduler,
runner,
scheduledJobIndices,
commentsIndices,
docLevelMonitorQueries,
destinationMigrationCoordinator,
lockService,
Expand Down Expand Up @@ -389,7 +406,15 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
AlertingSettings.FINDING_HISTORY_ROLLOVER_PERIOD,
AlertingSettings.FINDING_HISTORY_RETENTION_PERIOD,
AlertingSettings.FINDINGS_INDEXING_BATCH_SIZE,
AlertingSettings.CROSS_CLUSTER_MONITORING_ENABLED
AlertingSettings.CROSS_CLUSTER_MONITORING_ENABLED,
AlertingSettings.ALERTING_COMMENTS_ENABLED,
AlertingSettings.COMMENTS_HISTORY_MAX_DOCS,
AlertingSettings.COMMENTS_HISTORY_INDEX_MAX_AGE,
AlertingSettings.COMMENTS_HISTORY_ROLLOVER_PERIOD,
AlertingSettings.COMMENTS_HISTORY_RETENTION_PERIOD,
AlertingSettings.COMMENTS_MAX_CONTENT_SIZE,
AlertingSettings.MAX_COMMENTS_PER_ALERT,
AlertingSettings.MAX_COMMENTS_PER_NOTIFICATION
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import org.opensearch.alerting.opensearchapi.retry
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.opensearchapi.withClosableContext
import org.opensearch.alerting.script.BucketLevelTriggerExecutionContext
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.util.CommentsUtils
import org.opensearch.alerting.util.defaultToPerExecutionAction
import org.opensearch.alerting.util.getActionExecutionPolicy
import org.opensearch.alerting.util.getBucketKeysHash
Expand All @@ -36,6 +38,7 @@ import org.opensearch.common.xcontent.LoggingDeprecationHandler
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.model.Alert
import org.opensearch.commons.alerting.model.BucketLevelTrigger
import org.opensearch.commons.alerting.model.Comment
import org.opensearch.commons.alerting.model.Finding
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.SearchInput
Expand Down Expand Up @@ -274,6 +277,9 @@ object BucketLevelMonitorRunner : MonitorRunner() {
// to alertsToUpdate to ensure the Alert doc is updated at the end in either case
completedAlertsToUpdate.addAll(completedAlerts)

// retrieve max Comments per Alert notification setting
val maxComments = monitorCtx.clusterService!!.clusterSettings.get(AlertingSettings.MAX_COMMENTS_PER_NOTIFICATION)

// All trigger contexts and results should be available at this point since all triggers were evaluated in the main do-while loop
val triggerCtx = triggerContexts[trigger.id]!!
val triggerResult = triggerResults[trigger.id]!!
Expand All @@ -291,9 +297,18 @@ object BucketLevelMonitorRunner : MonitorRunner() {
if (actionExecutionScope is PerAlertActionScope && !shouldDefaultToPerExecution) {
for (alertCategory in actionExecutionScope.actionableAlerts) {
val alertsToExecuteActionsFor = nextAlerts[trigger.id]?.get(alertCategory) ?: mutableListOf()
val alertsToExecuteActionsForIds = alertsToExecuteActionsFor.map { it.id }
val allAlertsComments = CommentsUtils.getCommentsForAlertNotification(
monitorCtx.client!!,
alertsToExecuteActionsForIds,
maxComments
)
for (alert in alertsToExecuteActionsFor) {
val alertContext = if (alertCategory != AlertCategory.NEW) AlertContext(alert = alert)
else getAlertContext(alert = alert, alertSampleDocs = alertSampleDocs)
val alertContext = if (alertCategory != AlertCategory.NEW) {
AlertContext(alert = alert, comments = allAlertsComments[alert.id])
} else {
getAlertContext(alert = alert, alertSampleDocs = alertSampleDocs, allAlertsComments[alert.id])
}

val actionCtx = getActionContextForAlertCategory(
alertCategory,
Expand Down Expand Up @@ -329,12 +344,28 @@ object BucketLevelMonitorRunner : MonitorRunner() {
continue
}

val alertsToExecuteActionsForIds = dedupedAlerts.map { it.id }
.plus(newAlerts.map { it.id })
.plus(completedAlerts.map { it.id })
val allAlertsComments = CommentsUtils.getCommentsForAlertNotification(
monitorCtx.client!!,
alertsToExecuteActionsForIds,
maxComments
)
val actionCtx = triggerCtx.copy(
dedupedAlerts = dedupedAlerts,
dedupedAlerts = dedupedAlerts.map {
AlertContext(alert = it, comments = allAlertsComments[it.id])
},
newAlerts = newAlerts.map {
getAlertContext(alert = it, alertSampleDocs = alertSampleDocs)
getAlertContext(
alert = it,
alertSampleDocs = alertSampleDocs,
alertComments = allAlertsComments[it.id]
)
},
completedAlerts = completedAlerts.map {
AlertContext(alert = it, comments = allAlertsComments[it.id])
},
completedAlerts = completedAlerts,
error = monitorResult.error ?: triggerResult.error
)
val actionResult = this.runAction(action, actionCtx, monitorCtx, monitor, dryrun)
Expand Down Expand Up @@ -537,17 +568,18 @@ object BucketLevelMonitorRunner : MonitorRunner() {
): BucketLevelTriggerExecutionContext {
return when (alertCategory) {
AlertCategory.DEDUPED ->
ctx.copy(dedupedAlerts = listOf(alertContext.alert), newAlerts = emptyList(), completedAlerts = emptyList(), error = error)
ctx.copy(dedupedAlerts = listOf(alertContext), newAlerts = emptyList(), completedAlerts = emptyList(), error = error)
AlertCategory.NEW ->
ctx.copy(dedupedAlerts = emptyList(), newAlerts = listOf(alertContext), completedAlerts = emptyList(), error = error)
AlertCategory.COMPLETED ->
ctx.copy(dedupedAlerts = emptyList(), newAlerts = emptyList(), completedAlerts = listOf(alertContext.alert), error = error)
ctx.copy(dedupedAlerts = emptyList(), newAlerts = emptyList(), completedAlerts = listOf(alertContext), error = error)
}
}

private fun getAlertContext(
alert: Alert,
alertSampleDocs: Map<String, Map<String, List<Map<String, Any>>>>
alertSampleDocs: Map<String, Map<String, List<Map<String, Any>>>>,
alertComments: List<Comment>?
): AlertContext {
val bucketKey = alert.aggregationResultBucket?.getBucketKeysHash()
val sampleDocs = alertSampleDocs[alert.triggerId]?.get(bucketKey)
Expand All @@ -561,7 +593,7 @@ object BucketLevelMonitorRunner : MonitorRunner() {
alert.monitorId,
alert.executionId
)
AlertContext(alert = alert, sampleDocs = listOf())
AlertContext(alert = alert, sampleDocs = listOf(), comments = alertComments)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ abstract class MonitorRunner {
dryrun: Boolean
): ActionRunResult {
return try {
if (ctx is QueryLevelTriggerExecutionContext && !MonitorRunnerService.isActionActionable(action, ctx.alert)) {
if (ctx is QueryLevelTriggerExecutionContext && !MonitorRunnerService.isActionActionable(action, ctx.alertContext?.alert)) {
return ActionRunResult(action.id, action.name, mapOf(), true, null, null)
}
val actionOutput = mutableMapOf<String, String>()
Expand Down
Loading

0 comments on commit 9d6005c

Please sign in to comment.