Skip to content

Commit

Permalink
job scheduler based job distribution
Browse files Browse the repository at this point in the history
Signed-off-by: Subhobrata Dey <[email protected]>
  • Loading branch information
sbcd90 committed Feb 9, 2024
1 parent b10eaad commit 4eb1802
Show file tree
Hide file tree
Showing 11 changed files with 269 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
AlertingSettings.MAX_ACTION_THROTTLE_VALUE,
AlertingSettings.FILTER_BY_BACKEND_ROLES,
AlertingSettings.MAX_ACTIONABLE_ALERT_COUNT,
AlertingSettings.MAX_SHARDS_PER_DOC_LEVEL_MONITOR,
LegacyOpenDistroAlertingSettings.INPUT_TIMEOUT,
LegacyOpenDistroAlertingSettings.INDEX_TIMEOUT,
LegacyOpenDistroAlertingSettings.BULK_TIMEOUT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
package org.opensearch.alerting

import org.apache.logging.log4j.LogManager
import org.opensearch.ExceptionsHelper
import org.opensearch.OpenSearchStatusException
import org.opensearch.action.DocWriteRequest
import org.opensearch.action.admin.indices.refresh.RefreshAction
Expand All @@ -19,14 +18,12 @@ import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
import org.opensearch.alerting.model.DocumentExecutionContext
import org.opensearch.alerting.model.DocumentLevelTriggerRunResult
import org.opensearch.alerting.model.InputRunResults
import org.opensearch.alerting.model.MonitorMetadata
import org.opensearch.alerting.model.MonitorRunResult
import org.opensearch.alerting.model.userErrorMessage
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext
import org.opensearch.alerting.util.AlertingException
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.alerting.util.defaultToPerExecutionAction
import org.opensearch.alerting.util.getActionExecutionPolicy
import org.opensearch.alerting.workflow.WorkflowRunContext
Expand Down Expand Up @@ -55,7 +52,6 @@ import org.opensearch.core.common.bytes.BytesReference
import org.opensearch.core.rest.RestStatus
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.XContentBuilder
import org.opensearch.index.IndexNotFoundException
import org.opensearch.index.query.BoolQueryBuilder
import org.opensearch.index.query.Operator
import org.opensearch.index.query.QueryBuilders
Expand All @@ -66,7 +62,6 @@ import org.opensearch.search.sort.SortOrder
import java.io.IOException
import java.time.Instant
import java.util.UUID
import kotlin.math.max

object DocumentLevelMonitorRunner : MonitorRunner() {
private val logger = LogManager.getLogger(javaClass)
Expand All @@ -80,7 +75,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
workflowRunContext: WorkflowRunContext?,
executionId: String
): MonitorRunResult<DocumentLevelTriggerRunResult> {
logger.debug("Document-level-monitor is running ...")
return MonitorRunResult("", Instant.now(), Instant.now())
/* logger.debug("Document-level-monitor is running ...")
val isTempMonitor = dryrun || monitor.id == Monitor.NO_ID
var monitorResult = MonitorRunResult<DocumentLevelTriggerRunResult>(monitor.name, periodStart, periodEnd)
Expand Down Expand Up @@ -188,14 +184,15 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
periodEnd,
monitorCtx.clusterService!!.state().metadata.index(concreteIndexName)
)
MonitorMetadataService.createRunContextForIndex(concreteIndexName, isIndexCreatedRecently)
MonitorMetadataService.createRunContextForIndex(concreteIndexName, isIndexCreatedRecently, monitor.shards)
}
// Prepare updatedLastRunContext for each index
val indexUpdatedRunContext = updateLastRunContext(
indexLastRunContext.toMutableMap(),
monitorCtx,
concreteIndexName
concreteIndexName,
monitor.shards
) as MutableMap<String, Any>
if (IndexUtils.isAlias(indexName, monitorCtx.clusterService!!.state()) ||
IndexUtils.isDataStream(indexName, monitorCtx.clusterService!!.state())
Expand All @@ -208,10 +205,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
updatedLastRunContext[concreteIndexName] = indexUpdatedRunContext
}
val count: Int = indexLastRunContext["shards_count"] as Int
for (i: Int in 0 until count) {
val shard = i.toString()

for (shardInfo in monitor.shards) {
val shard = shardInfo.split(":")[1]
// update lastRunContext if it's a temp monitor, as we then only want to view the last bit of data
// TODO: If dryrun, we should make it so we limit the search as this could still potentially give us lots of data
if (isTempMonitor) {
Expand All @@ -231,6 +226,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
conflictingFields.toList(),
matchingDocIdsPerIndex?.get(concreteIndexName)
)
logger.info("shards-${monitor.shards}")
logger.info("no. of docs-${matchingDocs.map { it.first }}")
if (matchingDocs.isNotEmpty()) {
val matchedQueriesForDocs = getMatchedQueries(
Expand Down Expand Up @@ -309,6 +306,12 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
onSuccessfulMonitorRun(monitorCtx, monitor)
}
updatedLastRunContext.forEach {
logger.info(it.key)
it.value.forEach { it1 ->
logger.info(it1.key + "-" + it1.value)
}
}
MonitorMetadataService.upsertMetadata(
monitorMetadata.copy(lastRunContext = updatedLastRunContext),
true
Expand All @@ -327,7 +330,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
e
)
return monitorResult.copy(error = alertingException, inputResults = InputRunResults(emptyList(), alertingException))
}
}*/
}

private suspend fun onSuccessfulMonitorRun(monitorCtx: MonitorRunnerExecutionContext, monitor: Monitor) {
Expand Down Expand Up @@ -567,12 +570,12 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
private suspend fun updateLastRunContext(
lastRunContext: Map<String, Any>,
monitorCtx: MonitorRunnerExecutionContext,
index: String
index: String,
shards: List<String>
): Map<String, Any> {
val count: Int = getShardsCount(monitorCtx.clusterService!!, index)
val currShards = getShards(monitorCtx.clusterService!!, index, shards)
val updatedLastRunContext = lastRunContext.toMutableMap()
for (i: Int in 0 until count) {
val shard = i.toString()
for (shard in currShards) {
val maxSeqNo: Long = getMaxSeqNo(monitorCtx.client!!, index, shard)
updatedLastRunContext[shard] = maxSeqNo.toString()
}
Expand Down Expand Up @@ -637,6 +640,11 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
return allShards.filter { it.primary() }.size
}

/**
 * Returns the ids of the primary shards of [index] that are listed in [shards].
 *
 * Each entry of [shards] is matched against the string `"<index>:<shardId>"`; entries that
 * name another index, or a shard id that has no primary routing entry for [index] in the
 * current cluster state, are simply ignored.
 *
 * @param clusterService source of the current cluster state / routing table.
 * @param index concrete index name whose routing table is inspected.
 * @param shards shard descriptors in `"<index>:<shardId>"` form.
 * @return the matching shard ids rendered as strings, in routing-table order.
 */
private fun getShards(clusterService: ClusterService, index: String, shards: List<String>): List<String> {
    // Build the lookup set once so each routing entry is checked in O(1) instead of scanning the list.
    val requestedShards = shards.toSet()
    // NOTE(review): allShards() presumably throws if the index is missing from the routing table — confirm.
    return clusterService.state().routingTable().allShards(index)
        .filter { it.primary() && "$index:${it.id}" in requestedShards }
        .map { it.id.toString() }
}

private suspend fun getMatchingDocs(
monitor: Monitor,
monitorCtx: MonitorRunnerExecutionContext,
Expand All @@ -646,11 +654,10 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
conflictingFields: List<String>,
docIds: List<String>? = null
): List<Pair<String, BytesReference>> {
val count: Int = docExecutionCtx.updatedLastRunContext["shards_count"] as Int
val matchingDocs = mutableListOf<Pair<String, BytesReference>>()
for (i: Int in 0 until count) {
val shard = i.toString()
for (shardInfo in monitor.shards) {
try {
val shard = shardInfo.split(":")[1]
val maxSeqNo: Long = docExecutionCtx.updatedLastRunContext[shard].toString().toLong()
val prevSeqNo = docExecutionCtx.lastRunContext[shard].toString().toLongOrNull()

Expand All @@ -668,7 +675,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
matchingDocs.addAll(getAllDocs(hits, index, concreteIndex, monitor.id, conflictingFields))
}
} catch (e: Exception) {
logger.error("Failed to run for shard $shard. Error: ${e.message}")
logger.error("Failed to run for shard ${shardInfo.split(":")[1]}. Error: ${e.message}")
}
}
return matchingDocs
Expand Down Expand Up @@ -706,7 +713,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
.query(boolQueryBuilder)
.size(10000) // fixme: make this configurable.
)
.preference(Preference.PRIMARY_FIRST.type())
// .preference(Preference.PRIMARY_FIRST.type())
val response: SearchResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.search(request, it) }
if (response.status() !== RestStatus.OK) {
throw IOException("Failed to search shard: $shard")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ object MonitorMetadataService :
(monitor.inputs[0] as DocLevelMonitorInput).indices[0]
else null
val runContext = if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR)
createFullRunContext(monitorIndex, metadata.lastRunContext as MutableMap<String, MutableMap<String, Any>>)
createFullRunContext(monitorIndex, metadata.lastRunContext as MutableMap<String, MutableMap<String, Any>>, monitor)
else null
return if (runContext != null) {
metadata.copy(
Expand All @@ -195,7 +195,7 @@ object MonitorMetadataService :
(monitor.inputs[0] as DocLevelMonitorInput).indices[0]
else null
val runContext = if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR && createWithRunContext)
createFullRunContext(monitorIndex)
createFullRunContext(monitorIndex, monitor = monitor)
else emptyMap()
return MonitorMetadata(
id = MonitorMetadata.getId(monitor, workflowMetadataId),
Expand All @@ -211,6 +211,7 @@ object MonitorMetadataService :
suspend fun createFullRunContext(
index: String?,
existingRunContext: MutableMap<String, MutableMap<String, Any>>? = null,
monitor: Monitor
): MutableMap<String, MutableMap<String, Any>> {
val lastRunContext = existingRunContext?.toMutableMap() ?: mutableMapOf()
try {
Expand All @@ -231,7 +232,10 @@ object MonitorMetadataService :

indices.forEach { indexName ->
if (!lastRunContext.containsKey(indexName)) {
lastRunContext[indexName] = createRunContextForIndex(indexName)
lastRunContext[indexName] = createRunContextForIndex(
indexName,
shardInfoList = if (monitor.isChild == true) monitor.shards else listOf()
)
}
}
} catch (e: RemoteTransportException) {
Expand All @@ -249,14 +253,16 @@ object MonitorMetadataService :
return lastRunContext
}

suspend fun createRunContextForIndex(index: String, createdRecently: Boolean = false): MutableMap<String, Any> {
suspend fun createRunContextForIndex(index: String, createdRecently: Boolean = false, shardInfoList: List<String> = listOf()): MutableMap<String, Any> {
val request = IndicesStatsRequest().indices(index).clear()
val response: IndicesStatsResponse = client.suspendUntil { execute(IndicesStatsAction.INSTANCE, request, it) }
if (response.status != RestStatus.OK) {
val errorMessage = "Failed fetching index stats for index:$index"
throw AlertingException(errorMessage, RestStatus.INTERNAL_SERVER_ERROR, IllegalStateException(errorMessage))
}
val shards = response.shards.filter { it.shardRouting.primary() && it.shardRouting.active() }
val shards = response.shards.filter {
it.shardRouting.primary() && it.shardRouting.active() && shardInfoList.contains(index + ":" + it.shardRouting.id.toString())
}
val lastRunContext = HashMap<String, Any>()
lastRunContext["index"] = index
val count = shards.size
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,5 +168,12 @@ class AlertingSettings {
1,
Setting.Property.NodeScope, Setting.Property.Dynamic
)

/**
 * Integer setting registered under the key `plugins.alerting.max_shards_per_doc_level_monitor`,
 * declared node-scoped and dynamic.
 *
 * NOTE(review): presumably caps how many shards a single document-level monitor handles;
 * the code that reads this setting is not visible here — confirm against its consumer.
 */
val MAX_SHARDS_PER_DOC_LEVEL_MONITOR = Setting.intSetting(
"plugins.alerting.max_shards_per_doc_level_monitor",
// NOTE(review): the next two arguments are presumably (defaultValue = 2, minValue = 1) per
// Setting.intSetting(key, defaultValue, minValue, properties...) — confirm against the OpenSearch API.
2,
1,
Setting.Property.NodeScope, Setting.Property.Dynamic
)
}
}
Loading

0 comments on commit 4eb1802

Please sign in to comment.