Skip to content

Commit

Permalink
job scheduler based job distribution
Browse files Browse the repository at this point in the history
Signed-off-by: Subhobrata Dey <[email protected]>
  • Loading branch information
sbcd90 committed Feb 19, 2024
1 parent b10eaad commit 615243a
Show file tree
Hide file tree
Showing 15 changed files with 724 additions and 270 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
AlertingSettings.MAX_ACTION_THROTTLE_VALUE,
AlertingSettings.FILTER_BY_BACKEND_ROLES,
AlertingSettings.MAX_ACTIONABLE_ALERT_COUNT,
AlertingSettings.MAX_SHARDS_PER_DOC_LEVEL_MONITOR,
LegacyOpenDistroAlertingSettings.INPUT_TIMEOUT,
LegacyOpenDistroAlertingSettings.INDEX_TIMEOUT,
LegacyOpenDistroAlertingSettings.BULK_TIMEOUT,
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ object MonitorMetadataService :
(monitor.inputs[0] as DocLevelMonitorInput).indices[0]
else null
val runContext = if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR)
createFullRunContext(monitorIndex, metadata.lastRunContext as MutableMap<String, MutableMap<String, Any>>)
createFullRunContext(monitorIndex, metadata.lastRunContext as MutableMap<String, MutableMap<String, Any>>, monitor)
else null
return if (runContext != null) {
metadata.copy(
Expand All @@ -195,7 +195,7 @@ object MonitorMetadataService :
(monitor.inputs[0] as DocLevelMonitorInput).indices[0]
else null
val runContext = if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR && createWithRunContext)
createFullRunContext(monitorIndex)
createFullRunContext(monitorIndex, monitor = monitor)
else emptyMap()
return MonitorMetadata(
id = MonitorMetadata.getId(monitor, workflowMetadataId),
Expand All @@ -211,6 +211,7 @@ object MonitorMetadataService :
suspend fun createFullRunContext(
index: String?,
existingRunContext: MutableMap<String, MutableMap<String, Any>>? = null,
monitor: Monitor
): MutableMap<String, MutableMap<String, Any>> {
val lastRunContext = existingRunContext?.toMutableMap() ?: mutableMapOf()
try {
Expand All @@ -231,7 +232,10 @@ object MonitorMetadataService :

indices.forEach { indexName ->
if (!lastRunContext.containsKey(indexName)) {
lastRunContext[indexName] = createRunContextForIndex(indexName)
lastRunContext[indexName] = createRunContextForIndex(
indexName,
shardInfoList = if (monitor.isChild == true) monitor.shards else listOf()
)
}
}
} catch (e: RemoteTransportException) {
Expand All @@ -249,14 +253,16 @@ object MonitorMetadataService :
return lastRunContext
}

suspend fun createRunContextForIndex(index: String, createdRecently: Boolean = false): MutableMap<String, Any> {
suspend fun createRunContextForIndex(index: String, createdRecently: Boolean = false, shardInfoList: List<String> = listOf()): MutableMap<String, Any> {
val request = IndicesStatsRequest().indices(index).clear()
val response: IndicesStatsResponse = client.suspendUntil { execute(IndicesStatsAction.INSTANCE, request, it) }
if (response.status != RestStatus.OK) {
val errorMessage = "Failed fetching index stats for index:$index"
throw AlertingException(errorMessage, RestStatus.INTERNAL_SERVER_ERROR, IllegalStateException(errorMessage))
}
val shards = response.shards.filter { it.shardRouting.primary() && it.shardRouting.active() }
val shards = response.shards.filter {
it.shardRouting.primary() && it.shardRouting.active() && shardInfoList.contains(index + ":" + it.shardRouting.id.toString())
}
val lastRunContext = HashMap<String, Any>()
lastRunContext["index"] = index
val count = shards.size
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ data class MonitorRunResult<TriggerResult : TriggerRunResult>(
val monitorName: String,
val periodStart: Instant,
val periodEnd: Instant,
val error: Exception? = null,
var error: Exception? = null,
val inputResults: InputRunResults = InputRunResults(),
val triggerResults: Map<String, TriggerResult> = mapOf()
) : Writeable, ToXContent {
Expand Down Expand Up @@ -53,7 +53,7 @@ data class MonitorRunResult<TriggerResult : TriggerRunResult>(
/** Returns error information to store in the Alert. Currently it's just the stack trace but it can be more */
fun alertError(): AlertError? {
if (error != null) {
return AlertError(Instant.now(), "Failed running monitor:\n${error.userErrorMessage()}")
return AlertError(Instant.now(), "Failed running monitor:\n${error!!.userErrorMessage()}")
}

if (inputResults.error != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import org.opensearch.alerting.util.ScheduledJobUtils.Companion.WORKFLOW_DELEGAT
import org.opensearch.alerting.util.ScheduledJobUtils.Companion.WORKFLOW_MONITOR_PATH
import org.opensearch.alerting.util.use
import org.opensearch.client.Client
import org.opensearch.commons.alerting.action.AlertingActions
import org.opensearch.commons.alerting.action.DeleteMonitorRequest
import org.opensearch.commons.alerting.action.DeleteMonitorResponse
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.ScheduledJob
Expand Down Expand Up @@ -65,6 +67,7 @@ object DeleteMonitorService :
val deleteResponse = deleteMonitor(monitor.id, refreshPolicy)
deleteDocLevelMonitorQueriesAndIndices(monitor)
deleteMetadata(monitor)
deleteChildDocLevelMonitors(monitor, refreshPolicy)
return DeleteMonitorResponse(deleteResponse.id, deleteResponse.version)
}

Expand Down Expand Up @@ -148,6 +151,28 @@ object DeleteMonitorService :
}
}

private suspend fun deleteChildDocLevelMonitors(monitor: Monitor, refreshPolicy: RefreshPolicy) {
if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) {
val request: SearchRequest = SearchRequest()
.indices(ScheduledJob.SCHEDULED_JOBS_INDEX)
.source(
SearchSourceBuilder()
.query(QueryBuilders.matchQuery("monitor.owner", monitor.id))
.size(10000)
)
val response = client.suspendUntil<Client, SearchResponse> { client.search(request, it) }
response.hits.forEach { childMonitor ->
val deleteMonitorResponse = client.suspendUntil<Client, DeleteMonitorResponse> {
client.execute(
AlertingActions.DELETE_MONITOR_ACTION_TYPE,
DeleteMonitorRequest(childMonitor.id, refreshPolicy),
it
)
}
}
}
}

/**
* Checks if the monitor is part of the workflow
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,5 +168,12 @@ class AlertingSettings {
1,
Setting.Property.NodeScope, Setting.Property.Dynamic
)

val MAX_SHARDS_PER_DOC_LEVEL_MONITOR = Setting.intSetting(
"plugins.alerting.max_shards_per_doc_level_monitor",
2,
1,
Setting.Property.NodeScope, Setting.Property.Dynamic
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,13 @@ import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import org.apache.logging.log4j.LogManager
import org.opensearch.OpenSearchStatusException
import org.opensearch.action.admin.indices.stats.IndicesStatsAction
import org.opensearch.action.admin.indices.stats.IndicesStatsRequest
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse
import org.opensearch.action.get.GetRequest
import org.opensearch.action.get.GetResponse
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
import org.opensearch.action.support.ActionFilters
import org.opensearch.action.support.HandledTransportAction
import org.opensearch.action.support.WriteRequest
Expand All @@ -21,6 +26,11 @@ import org.opensearch.alerting.MonitorRunnerService
import org.opensearch.alerting.action.ExecuteMonitorAction
import org.opensearch.alerting.action.ExecuteMonitorRequest
import org.opensearch.alerting.action.ExecuteMonitorResponse
import org.opensearch.alerting.model.DocumentLevelTriggerRunResult
import org.opensearch.alerting.model.InputRunResults
import org.opensearch.alerting.model.MonitorRunResult
import org.opensearch.alerting.model.TriggerRunResult
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.util.AlertingException
import org.opensearch.alerting.util.DocLevelMonitorQueries
Expand All @@ -33,12 +43,15 @@ import org.opensearch.common.xcontent.LoggingDeprecationHandler
import org.opensearch.common.xcontent.XContentHelper
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.ConfigConstants
import org.opensearch.commons.alerting.model.DocLevelMonitorInput
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.ScheduledJob
import org.opensearch.commons.authuser.User
import org.opensearch.core.action.ActionListener
import org.opensearch.core.rest.RestStatus
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.index.query.QueryBuilders
import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.tasks.Task
import org.opensearch.transport.TransportService
import java.time.Instant
Expand Down Expand Up @@ -91,6 +104,70 @@ class TransportExecuteMonitorAction @Inject constructor(
}
}
}
val executeMonitors = fun(monitors: List<Monitor>) {
// Launch the coroutine with the clients threadContext. This is needed to preserve authentication information
// stored on the threadContext set by the security plugin when using the Alerting plugin with the Security plugin.
// runner.launch(ElasticThreadContextElement(client.threadPool().threadContext)) {
runner.launch {
val (periodStart, periodEnd) =
monitors[0].schedule.getPeriodEndingAt(Instant.ofEpochMilli(execMonitorRequest.requestEnd.millis))
val inputRunResults = mutableMapOf<String, MutableSet<String>>()
val triggerRunResults = mutableMapOf<String, DocumentLevelTriggerRunResult>()
val monitorRunResult = MonitorRunResult<TriggerRunResult>(monitors[0].name, periodStart, periodEnd)
monitors.forEach { monitor ->
log.info(
"Executing monitor from API - id: ${monitor.id}, type: ${monitor.monitorType.name}, " +
"periodStart: $periodStart, periodEnd: $periodEnd, dryrun: ${execMonitorRequest.dryrun}"
)
val childMonitorRunResult = runner.runJob(monitor, periodStart, periodEnd, execMonitorRequest.dryrun)
if (childMonitorRunResult.error != null) {
monitorRunResult.error = childMonitorRunResult.error
} else {
val childInputRunResults = childMonitorRunResult.inputResults.results[0]
childInputRunResults.forEach {
if (inputRunResults.containsKey(it.key)) {
val existingResults = inputRunResults[it.key]
existingResults!!.addAll(it.value as Set<String>)
inputRunResults[it.key] = existingResults
} else {
inputRunResults[it.key] = it.value as MutableSet<String>
}
}
childMonitorRunResult.triggerResults.forEach {
if (triggerRunResults.containsKey(it.key)) {
val newDocs = mutableListOf<String>()
val existingResults = triggerRunResults[it.key]

newDocs.addAll(existingResults!!.triggeredDocs)
newDocs.addAll((it.value as DocumentLevelTriggerRunResult).triggeredDocs)

triggerRunResults[it.key] = existingResults.copy(triggeredDocs = newDocs)
} else {
triggerRunResults[it.key] = it.value as DocumentLevelTriggerRunResult
}
}
}
}

try {
withContext(Dispatchers.IO) {
actionListener.onResponse(
ExecuteMonitorResponse(
monitorRunResult.copy(
inputResults = InputRunResults(listOf(inputRunResults)),
triggerResults = triggerRunResults
)
)
)
}
} catch (e: Exception) {
log.error("Unexpected error running monitor", e)
withContext(Dispatchers.IO) {
actionListener.onFailure(AlertingException.wrap(e))
}
}
}
}

if (execMonitorRequest.monitorId != null) {
val getRequest = GetRequest(ScheduledJob.SCHEDULED_JOBS_INDEX).id(execMonitorRequest.monitorId)
Expand All @@ -112,7 +189,41 @@ class TransportExecuteMonitorAction @Inject constructor(
response.sourceAsBytesRef, XContentType.JSON
).use { xcp ->
val monitor = ScheduledJob.parse(xcp, response.id, response.version) as Monitor
executeMonitor(monitor)
if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) {
val request: SearchRequest = SearchRequest()
.indices(ScheduledJob.SCHEDULED_JOBS_INDEX)
.source(
SearchSourceBuilder()
.query(QueryBuilders.matchQuery("monitor.owner", monitor.id))
.size(10000)
)
client.search(
request,
object : ActionListener<SearchResponse> {
override fun onResponse(response: SearchResponse) {
val childMonitors = mutableListOf<Monitor>()
response.hits.forEach { hit ->
XContentHelper.createParser(
xContentRegistry,
LoggingDeprecationHandler.INSTANCE,
hit.sourceRef,
XContentType.JSON
).use { xcp ->
val childMonitor = ScheduledJob.parse(xcp, hit.id, hit.version) as Monitor
childMonitors.add(childMonitor)
}
}
executeMonitors(childMonitors)
}

override fun onFailure(t: Exception) {
actionListener.onFailure(AlertingException.wrap(t))
}
}
)
} else {
executeMonitor(monitor)
}
}
}
}
Expand Down Expand Up @@ -144,7 +255,16 @@ class TransportExecuteMonitorAction @Inject constructor(
indexTimeout
)
log.info("Queries inserted into Percolate index ${ScheduledJob.DOC_LEVEL_QUERIES_INDEX}")
executeMonitor(monitor)
val shardInfoMap = getShards((monitor.inputs[0] as DocLevelMonitorInput).indices)
val indexShardPairs = mutableListOf<String>()
shardInfoMap.forEach {
val index = it.key
val shards = it.value
shards.forEach { shard ->
indexShardPairs.add("$index:$shard")
}
}
executeMonitor(monitor.copy(isChild = true, shards = indexShardPairs))
}
} catch (t: Exception) {
actionListener.onFailure(AlertingException.wrap(t))
Expand All @@ -155,4 +275,22 @@ class TransportExecuteMonitorAction @Inject constructor(
}
}
}

private suspend fun getShards(indices: List<String>): Map<String, List<String>> {
return indices.associateWith { index ->
val request = IndicesStatsRequest().indices(index).clear()
val response: IndicesStatsResponse =
client.suspendUntil { execute(IndicesStatsAction.INSTANCE, request, it) }
if (response.status != RestStatus.OK) {
val errorMessage = "Failed fetching index stats for index:$index"
throw AlertingException(
errorMessage,
RestStatus.INTERNAL_SERVER_ERROR,
IllegalStateException(errorMessage)
)
}
val shards = response.shards.filter { it.shardRouting.primary() && it.shardRouting.active() }
shards.map { it.shardRouting.id.toString() }
}
}
}
Loading

0 comments on commit 615243a

Please sign in to comment.