Skip to content

Commit

Permalink
commits to support remote monitors in alerting (#1589)
Browse files Browse the repository at this point in the history
Signed-off-by: Subhobrata Dey <[email protected]>
  • Loading branch information
sbcd90 authored Jul 3, 2024
1 parent e97f5b8 commit cd8e189
Show file tree
Hide file tree
Showing 107 changed files with 2,640 additions and 1,965 deletions.
1 change: 1 addition & 0 deletions .github/workflows/maven-publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,5 @@ jobs:
export SONATYPE_PASSWORD=$(aws secretsmanager get-secret-value --secret-id maven-snapshots-password --query SecretString --output text)
echo "::add-mask::$SONATYPE_USERNAME"
echo "::add-mask::$SONATYPE_PASSWORD"
./gradlew publishShadowPublicationToSnapshotsRepository
./gradlew publishPluginZipPublicationToSnapshotsRepository
2 changes: 1 addition & 1 deletion .github/workflows/test-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ jobs:
java-version: ${{ matrix.java }}
- name: Build and run with Gradle
working-directory: ${{ env.WORKING_DIR }}
run: ./gradlew assemble integTest ${{ env.BUILD_ARGS }}
run: ./gradlew assemble ${{ env.BUILD_ARGS }}
env:
_JAVA_OPTIONS: ${{ matrix.os_java_options }}
- name: Create Artifact Path
Expand Down
2 changes: 2 additions & 0 deletions alerting/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

import com.github.jengelman.gradle.plugins.shadow.ShadowPlugin
import java.util.concurrent.Callable
import org.opensearch.gradle.test.RestIntegTestTask
import org.opensearch.gradle.testclusters.StandaloneRestIntegTestTask
Expand Down Expand Up @@ -109,6 +110,7 @@ dependencies {

api project(":alerting-core")
implementation "com.github.seancfoley:ipaddress:5.4.1"
implementation project(path: ":alerting-spi", configuration: 'shadow')

testImplementation "org.antlr:antlr4-runtime:${versions.antlr4}"
testImplementation "org.jetbrains.kotlin:kotlin-test:${kotlin_version}"
Expand Down
10 changes: 5 additions & 5 deletions alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,6 @@ import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
import org.opensearch.action.support.WriteRequest
import org.opensearch.alerting.alerts.AlertIndices
import org.opensearch.alerting.model.ActionRunResult
import org.opensearch.alerting.model.ChainedAlertTriggerRunResult
import org.opensearch.alerting.model.ClusterMetricsTriggerRunResult
import org.opensearch.alerting.model.QueryLevelTriggerRunResult
import org.opensearch.alerting.opensearchapi.firstFailureOrNull
import org.opensearch.alerting.opensearchapi.retry
import org.opensearch.alerting.opensearchapi.suspendUntil
Expand All @@ -32,7 +28,6 @@ import org.opensearch.alerting.util.CommentsUtils
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.alerting.util.MAX_SEARCH_SIZE
import org.opensearch.alerting.util.getBucketKeysHash
import org.opensearch.alerting.workflow.WorkflowRunContext
import org.opensearch.client.Client
import org.opensearch.common.unit.TimeValue
import org.opensearch.common.xcontent.LoggingDeprecationHandler
Expand All @@ -41,14 +36,19 @@ import org.opensearch.common.xcontent.XContentHelper
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.alerts.AlertError
import org.opensearch.commons.alerting.model.ActionExecutionResult
import org.opensearch.commons.alerting.model.ActionRunResult
import org.opensearch.commons.alerting.model.AggregationResultBucket
import org.opensearch.commons.alerting.model.Alert
import org.opensearch.commons.alerting.model.BucketLevelTrigger
import org.opensearch.commons.alerting.model.ChainedAlertTriggerRunResult
import org.opensearch.commons.alerting.model.ClusterMetricsTriggerRunResult
import org.opensearch.commons.alerting.model.DataSources
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.NoOpTrigger
import org.opensearch.commons.alerting.model.QueryLevelTriggerRunResult
import org.opensearch.commons.alerting.model.Trigger
import org.opensearch.commons.alerting.model.Workflow
import org.opensearch.commons.alerting.model.WorkflowRunContext
import org.opensearch.commons.alerting.model.action.AlertCategory
import org.opensearch.core.action.ActionListener
import org.opensearch.core.common.bytes.BytesReference
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
package org.opensearch.alerting

import org.opensearch.action.ActionRequest
import org.opensearch.alerting.action.DocLevelMonitorFanOutAction
import org.opensearch.alerting.action.ExecuteMonitorAction
import org.opensearch.alerting.action.ExecuteWorkflowAction
import org.opensearch.alerting.action.GetDestinationsAction
Expand All @@ -26,6 +25,7 @@ import org.opensearch.alerting.core.resthandler.RestScheduledJobStatsHandler
import org.opensearch.alerting.core.schedule.JobScheduler
import org.opensearch.alerting.core.settings.LegacyOpenDistroScheduledJobSettings
import org.opensearch.alerting.core.settings.ScheduledJobSettings
import org.opensearch.alerting.remote.monitors.RemoteMonitorRegistry
import org.opensearch.alerting.resthandler.RestAcknowledgeAlertAction
import org.opensearch.alerting.resthandler.RestAcknowledgeChainedAlertAction
import org.opensearch.alerting.resthandler.RestDeleteAlertingCommentAction
Expand Down Expand Up @@ -56,6 +56,7 @@ import org.opensearch.alerting.settings.AlertingSettings.Companion.DOC_LEVEL_MON
import org.opensearch.alerting.settings.DestinationSettings
import org.opensearch.alerting.settings.LegacyOpenDistroAlertingSettings
import org.opensearch.alerting.settings.LegacyOpenDistroDestinationSettings
import org.opensearch.alerting.spi.RemoteMonitorRunnerExtension
import org.opensearch.alerting.transport.TransportAcknowledgeAlertAction
import org.opensearch.alerting.transport.TransportAcknowledgeChainedAlertAction
import org.opensearch.alerting.transport.TransportDeleteAlertingCommentAction
Expand Down Expand Up @@ -92,6 +93,7 @@ import org.opensearch.common.settings.Setting
import org.opensearch.common.settings.Settings
import org.opensearch.common.settings.SettingsFilter
import org.opensearch.commons.alerting.action.AlertingActions
import org.opensearch.commons.alerting.action.DocLevelMonitorFanOutAction
import org.opensearch.commons.alerting.aggregation.bucketselectorext.BucketSelectorExtAggregationBuilder
import org.opensearch.commons.alerting.model.BucketLevelTrigger
import org.opensearch.commons.alerting.model.ChainedAlertTrigger
Expand All @@ -103,6 +105,7 @@ import org.opensearch.commons.alerting.model.QueryLevelTrigger
import org.opensearch.commons.alerting.model.ScheduledJob
import org.opensearch.commons.alerting.model.SearchInput
import org.opensearch.commons.alerting.model.Workflow
import org.opensearch.commons.alerting.model.remote.monitors.RemoteMonitorTrigger
import org.opensearch.core.action.ActionResponse
import org.opensearch.core.common.io.stream.NamedWriteableRegistry
import org.opensearch.core.common.io.stream.StreamInput
Expand All @@ -117,6 +120,7 @@ import org.opensearch.painless.spi.Whitelist
import org.opensearch.painless.spi.WhitelistLoader
import org.opensearch.percolator.PercolatorPluginExt
import org.opensearch.plugins.ActionPlugin
import org.opensearch.plugins.ExtensiblePlugin
import org.opensearch.plugins.ReloadablePlugin
import org.opensearch.plugins.ScriptPlugin
import org.opensearch.plugins.SearchPlugin
Expand Down Expand Up @@ -180,6 +184,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
lateinit var alertIndices: AlertIndices
lateinit var clusterService: ClusterService
lateinit var destinationMigrationCoordinator: DestinationMigrationCoordinator
var monitorTypeToMonitorRunners: MutableMap<String, RemoteMonitorRegistry> = mutableMapOf()

override fun getRestHandlers(
settings: Settings,
Expand Down Expand Up @@ -260,6 +265,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
ClusterMetricsInput.XCONTENT_REGISTRY,
DocumentLevelTrigger.XCONTENT_REGISTRY,
ChainedAlertTrigger.XCONTENT_REGISTRY,
RemoteMonitorTrigger.XCONTENT_REGISTRY,
Workflow.XCONTENT_REGISTRY
)
}
Expand Down Expand Up @@ -301,6 +307,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
.registerLockService(lockService)
.registerConsumers()
.registerDestinationSettings()
.registerRemoteMonitors(monitorTypeToMonitorRunners)
scheduledJobIndices = ScheduledJobIndices(client.admin(), clusterService)
commentsIndices = CommentsIndices(environment.settings(), client, threadPool, clusterService)
docLevelMonitorQueries = DocLevelMonitorQueries(client, clusterService)
Expand Down Expand Up @@ -443,4 +450,20 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
)
)
}

override fun loadExtensions(loader: ExtensiblePlugin.ExtensionLoader) {
for (monitorExtension in loader.loadExtensions(RemoteMonitorRunnerExtension::class.java)) {
val monitorTypesToMonitorRunners = monitorExtension.getMonitorTypesToMonitorRunners()

for (monitorTypeToMonitorRunner in monitorTypesToMonitorRunners) {
val monitorType = monitorTypeToMonitorRunner.key
val monitorRunner = monitorTypeToMonitorRunner.value

if (!this.monitorTypeToMonitorRunners.containsKey(monitorType)) {
val monitorRegistry = RemoteMonitorRegistry(monitorType, monitorRunner)
this.monitorTypeToMonitorRunners[monitorType] = monitorRegistry
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,7 @@ import org.opensearch.action.index.IndexRequest
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
import org.opensearch.action.support.WriteRequest
import org.opensearch.alerting.model.ActionRunResult
import org.opensearch.alerting.model.AlertContext
import org.opensearch.alerting.model.BucketLevelTriggerRunResult
import org.opensearch.alerting.model.InputRunResults
import org.opensearch.alerting.model.MonitorRunResult
import org.opensearch.alerting.opensearchapi.InjectorContextElement
import org.opensearch.alerting.opensearchapi.convertToMap
import org.opensearch.alerting.opensearchapi.retry
Expand All @@ -31,17 +27,21 @@ import org.opensearch.alerting.util.getBucketKeysHash
import org.opensearch.alerting.util.getCancelAfterTimeInterval
import org.opensearch.alerting.util.getCombinedTriggerRunResult
import org.opensearch.alerting.util.printsSampleDocData
import org.opensearch.alerting.workflow.WorkflowRunContext
import org.opensearch.client.Client
import org.opensearch.common.unit.TimeValue
import org.opensearch.common.xcontent.LoggingDeprecationHandler
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.model.ActionRunResult
import org.opensearch.commons.alerting.model.Alert
import org.opensearch.commons.alerting.model.BucketLevelTrigger
import org.opensearch.commons.alerting.model.BucketLevelTriggerRunResult
import org.opensearch.commons.alerting.model.Comment
import org.opensearch.commons.alerting.model.Finding
import org.opensearch.commons.alerting.model.InputRunResults
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.MonitorRunResult
import org.opensearch.commons.alerting.model.SearchInput
import org.opensearch.commons.alerting.model.WorkflowRunContext
import org.opensearch.commons.alerting.model.action.AlertCategory
import org.opensearch.commons.alerting.model.action.PerAlertActionScope
import org.opensearch.commons.alerting.model.action.PerExecutionActionScope
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,24 @@ import org.opensearch.ExceptionsHelper
import org.opensearch.Version
import org.opensearch.action.ActionListenerResponseHandler
import org.opensearch.action.support.GroupedActionListener
import org.opensearch.alerting.action.DocLevelMonitorFanOutAction
import org.opensearch.alerting.action.DocLevelMonitorFanOutRequest
import org.opensearch.alerting.action.DocLevelMonitorFanOutResponse
import org.opensearch.alerting.model.ActionRunResult
import org.opensearch.alerting.model.DocumentLevelTriggerRunResult
import org.opensearch.alerting.model.IndexExecutionContext
import org.opensearch.alerting.model.InputRunResults
import org.opensearch.alerting.model.MonitorRunResult
import org.opensearch.alerting.util.AlertingException
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.alerting.workflow.WorkflowRunContext
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.node.DiscoveryNode
import org.opensearch.cluster.routing.ShardRouting
import org.opensearch.cluster.service.ClusterService
import org.opensearch.commons.alerting.action.DocLevelMonitorFanOutAction
import org.opensearch.commons.alerting.action.DocLevelMonitorFanOutRequest
import org.opensearch.commons.alerting.action.DocLevelMonitorFanOutResponse
import org.opensearch.commons.alerting.model.ActionRunResult
import org.opensearch.commons.alerting.model.DocLevelMonitorInput
import org.opensearch.commons.alerting.model.DocLevelQuery
import org.opensearch.commons.alerting.model.DocumentLevelTriggerRunResult
import org.opensearch.commons.alerting.model.IndexExecutionContext
import org.opensearch.commons.alerting.model.InputRunResults
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.MonitorRunResult
import org.opensearch.commons.alerting.model.WorkflowRunContext
import org.opensearch.commons.alerting.util.AlertingException
import org.opensearch.core.action.ActionListener
import org.opensearch.core.common.breaker.CircuitBreakingException
import org.opensearch.core.common.io.stream.Writeable
Expand Down Expand Up @@ -436,7 +436,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
if (res.exception == null) {
return null
} else {
exceptions.add(res.exception)
exceptions.add(res.exception!!)
}
}
return AlertingException.merge(*exceptions.toTypedArray())
Expand Down Expand Up @@ -501,9 +501,9 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
if (response.exception == null) {
if (response.inputResults.error != null) {
if (response.inputResults.error is AlertingException) {
errors.add(response.inputResults.error)
errors.add(response.inputResults.error as AlertingException)
} else {
errors.add(AlertingException.wrap(response.inputResults.error) as AlertingException)
errors.add(AlertingException.wrap(response.inputResults.error as Exception) as AlertingException)
}
}
val partialResult = response.inputResults.results
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import kotlinx.coroutines.withContext
import org.apache.logging.log4j.LogManager
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
import org.opensearch.alerting.model.InputRunResults
import org.opensearch.alerting.model.TriggerAfterKey
import org.opensearch.alerting.opensearchapi.convertToMap
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.settings.AlertingSettings
Expand All @@ -21,7 +19,6 @@ import org.opensearch.alerting.util.addUserBackendRolesFilter
import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.executeTransportAction
import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.toMap
import org.opensearch.alerting.util.getRoleFilterEnabled
import org.opensearch.alerting.workflow.WorkflowRunContext
import org.opensearch.client.Client
import org.opensearch.cluster.routing.Preference
import org.opensearch.cluster.service.ClusterService
Expand All @@ -30,8 +27,11 @@ import org.opensearch.common.settings.Settings
import org.opensearch.common.xcontent.LoggingDeprecationHandler
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.model.ClusterMetricsInput
import org.opensearch.commons.alerting.model.InputRunResults
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.SearchInput
import org.opensearch.commons.alerting.model.TriggerAfterKey
import org.opensearch.commons.alerting.model.WorkflowRunContext
import org.opensearch.core.common.io.stream.NamedWriteableAwareStreamInput
import org.opensearch.core.common.io.stream.NamedWriteableRegistry
import org.opensearch.core.xcontent.NamedXContentRegistry
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,8 @@ import org.opensearch.action.get.GetResponse
import org.opensearch.action.index.IndexRequest
import org.opensearch.action.index.IndexResponse
import org.opensearch.action.support.WriteRequest
import org.opensearch.alerting.model.MonitorMetadata
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.util.AlertingException
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.client.Client
import org.opensearch.cluster.service.ClusterService
Expand All @@ -40,7 +38,10 @@ import org.opensearch.common.xcontent.XContentHelper
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.model.DocLevelMonitorInput
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.MonitorMetadata
import org.opensearch.commons.alerting.model.ScheduledJob
import org.opensearch.commons.alerting.model.remote.monitors.RemoteDocLevelMonitorInput
import org.opensearch.commons.alerting.util.AlertingException
import org.opensearch.core.rest.RestStatus
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.core.xcontent.ToXContent
Expand Down Expand Up @@ -187,12 +188,14 @@ object MonitorMetadataService :

suspend fun recreateRunContext(metadata: MonitorMetadata, monitor: Monitor): MonitorMetadata {
try {
val monitorIndex = if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) {
val monitorIndex = if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR.value)
(monitor.inputs[0] as DocLevelMonitorInput).indices[0]
} else null
val runContext = if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) {
else if (monitor.monitorType.endsWith(Monitor.MonitorType.DOC_LEVEL_MONITOR.value))
(monitor.inputs[0] as RemoteDocLevelMonitorInput).docLevelMonitorInput.indices[0]
else null
val runContext = if (monitor.monitorType.endsWith(Monitor.MonitorType.DOC_LEVEL_MONITOR.value))
createFullRunContext(monitorIndex, metadata.lastRunContext as MutableMap<String, MutableMap<String, Any>>)
} else null
else null
return if (runContext != null) {
metadata.copy(
lastRunContext = runContext
Expand All @@ -210,10 +213,12 @@ object MonitorMetadataService :
createWithRunContext: Boolean,
workflowMetadataId: String? = null,
): MonitorMetadata {
val monitorIndex = if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR)
val monitorIndex = if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR.value)
(monitor.inputs[0] as DocLevelMonitorInput).indices[0]
else if (monitor.monitorType.endsWith(Monitor.MonitorType.DOC_LEVEL_MONITOR.value))
(monitor.inputs[0] as RemoteDocLevelMonitorInput).docLevelMonitorInput.indices[0]
else null
val runContext = if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR && createWithRunContext)
val runContext = if (monitor.monitorType.endsWith(Monitor.MonitorType.DOC_LEVEL_MONITOR.value))
createFullRunContext(monitorIndex)
else emptyMap()
return MonitorMetadata(
Expand Down
Loading

0 comments on commit cd8e189

Please sign in to comment.