Skip to content

Commit

Permalink
add distributed locking to jobs in alerting
Browse files Browse the repository at this point in the history
Signed-off-by: Subhobrata Dey <[email protected]>
  • Loading branch information
sbcd90 committed Feb 3, 2024
1 parent 0d776da commit 96d4a2c
Show file tree
Hide file tree
Showing 10 changed files with 592 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import org.opensearch.alerting.core.JobSweeper
import org.opensearch.alerting.core.ScheduledJobIndices
import org.opensearch.alerting.core.action.node.ScheduledJobsStatsAction
import org.opensearch.alerting.core.action.node.ScheduledJobsStatsTransportAction
import org.opensearch.alerting.core.lock.LockService
import org.opensearch.alerting.core.resthandler.RestScheduledJobStatsHandler
import org.opensearch.alerting.core.schedule.JobScheduler
import org.opensearch.alerting.core.settings.LegacyOpenDistroScheduledJobSettings
Expand Down Expand Up @@ -258,6 +259,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
.registerAlertService(AlertService(client, xContentRegistry, alertIndices))
.registerDocLevelMonitorQueries(DocLevelMonitorQueries(client, clusterService))
.registerWorkflowService(WorkflowService(client, xContentRegistry))
.registerLockService(LockService(client, clusterService))
.registerConsumers()
.registerDestinationSettings()
scheduledJobIndices = ScheduledJobIndices(client.admin(), clusterService)
Expand Down Expand Up @@ -295,6 +297,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
ScheduledJobSettings.SWEEP_PERIOD,
ScheduledJobSettings.SWEEP_PAGE_SIZE,
ScheduledJobSettings.SWEEPER_ENABLED,
ScheduledJobSettings.LOCK_DURATION_SECONDS,
LegacyOpenDistroScheduledJobSettings.REQUEST_TIMEOUT,
LegacyOpenDistroScheduledJobSettings.SWEEP_BACKOFF_MILLIS,
LegacyOpenDistroScheduledJobSettings.SWEEP_BACKOFF_RETRY_COUNT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
}
}
monitorResult = monitorResult.copy(inputResults = InputRunResults(listOf(inputRunResults)))
delayIfTempMonitor(monitor)

/*
populate the map queryToDocIds with pairs of <DocLevelQuery object from queries in monitor metadata &
Expand Down Expand Up @@ -800,4 +801,10 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
}
jsonAsMap.putAll(tempMap)
}

/**
 * Blocks the calling thread for 80 seconds when [monitor] carries the reserved name
 * `__lag-monitor-test__`; returns immediately for every other monitor.
 *
 * The integration tests added alongside this function create monitors with exactly that
 * name, so this appears to exist to simulate a slow monitor run.
 *
 * NOTE(review): this is a test hook living in production code and it uses a blocking
 * sleep. Any monitor that happens to be given this name will stall its runner thread;
 * consider moving the delay behind a test-only seam.
 */
private fun delayIfTempMonitor(monitor: Monitor) {
    val isLagTestMonitor = monitor.name == "__lag-monitor-test__"
    if (!isLagTestMonitor) {
        return
    }
    Thread.sleep(80000)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package org.opensearch.alerting

import org.opensearch.action.bulk.BackoffPolicy
import org.opensearch.alerting.alerts.AlertIndices
import org.opensearch.alerting.core.lock.LockService
import org.opensearch.alerting.model.destination.DestinationContextFactory
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.settings.DestinationSettings
Expand Down Expand Up @@ -47,5 +48,7 @@ data class MonitorRunnerExecutionContext(
@Volatile var destinationContextFactory: DestinationContextFactory? = null,

@Volatile var maxActionableAlertCount: Long = AlertingSettings.DEFAULT_MAX_ACTIONABLE_ALERT_COUNT,
@Volatile var indexTimeout: TimeValue? = null
@Volatile var indexTimeout: TimeValue? = null,

@Volatile var lockService: LockService? = null
)
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@ import org.opensearch.alerting.alerts.AlertIndices
import org.opensearch.alerting.alerts.AlertMover.Companion.moveAlerts
import org.opensearch.alerting.core.JobRunner
import org.opensearch.alerting.core.ScheduledJobIndices
import org.opensearch.alerting.core.lock.LockModel
import org.opensearch.alerting.core.lock.LockService
import org.opensearch.alerting.core.settings.ScheduledJobSettings
import org.opensearch.alerting.model.MonitorRunResult
import org.opensearch.alerting.model.WorkflowRunResult
import org.opensearch.alerting.model.destination.DestinationContextFactory
import org.opensearch.alerting.opensearchapi.retry
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.script.TriggerExecutionContext
import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_COUNT
import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_MILLIS
Expand Down Expand Up @@ -64,6 +68,7 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon

var monitorCtx: MonitorRunnerExecutionContext = MonitorRunnerExecutionContext()
private lateinit var runnerSupervisor: Job
private var lockDurationSeconds: Long = 120L
override val coroutineContext: CoroutineContext
get() = Dispatchers.Default + runnerSupervisor

Expand Down Expand Up @@ -180,6 +185,14 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
return this
}

/**
 * Registers the [LockService] used to take a per-job lock before a scheduled job is run,
 * and wires [lockDurationSeconds] to the `LOCK_DURATION_SECONDS` cluster setting.
 *
 * Must be called after the cluster service has been registered on [monitorCtx]; the
 * non-null assertion below fails otherwise.
 *
 * @param lockService the lock service to store on the execution context
 * @return this service, to allow chaining of `register*` calls
 */
fun registerLockService(lockService: LockService): MonitorRunnerService {
    val clusterSettings = monitorCtx.clusterService!!.clusterSettings

    // The update consumer only fires when the setting changes. Without this initial read,
    // a value configured at node start would be ignored in favour of the hard-coded field
    // default until the setting is next updated dynamically.
    lockDurationSeconds = clusterSettings.get(ScheduledJobSettings.LOCK_DURATION_SECONDS)
    clusterSettings.addSettingsUpdateConsumer(ScheduledJobSettings.LOCK_DURATION_SECONDS) {
        lockDurationSeconds = it
    }

    monitorCtx.lockService = lockService
    return this
}

// Updates destination settings when the reload API is called so that new keystore values are visible
fun reloadDestinationSettings(settings: Settings) {
monitorCtx.destinationSettings = loadDestinationSettings(settings)
Expand Down Expand Up @@ -251,12 +264,20 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
when (job) {
is Workflow -> {
launch {
val lock = monitorCtx.client!!.suspendUntil<Client, LockModel?> {
monitorCtx.lockService!!.acquireLock(job, lockDurationSeconds, it)
} ?: return@launch
runJob(job, periodStart, periodEnd, false)
monitorCtx.client!!.suspendUntil<Client, Boolean> { monitorCtx.lockService!!.release(lock, it) }
}
}
is Monitor -> {
launch {
val lock: LockModel = monitorCtx.client!!.suspendUntil<Client, LockModel?> {
monitorCtx.lockService!!.acquireLock(job, lockDurationSeconds, it)
} ?: return@launch
runJob(job, periodStart, periodEnd, false)
monitorCtx.client!!.suspendUntil<Client, Boolean> { monitorCtx.lockService!!.release(lock, it) }
}
}
else -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import org.opensearch.commons.alerting.model.Alert
import org.opensearch.commons.alerting.model.DataSources
import org.opensearch.commons.alerting.model.DocLevelMonitorInput
import org.opensearch.commons.alerting.model.DocLevelQuery
import org.opensearch.commons.alerting.model.IntervalSchedule
import org.opensearch.commons.alerting.model.action.ActionExecutionPolicy
import org.opensearch.commons.alerting.model.action.AlertCategory
import org.opensearch.commons.alerting.model.action.PerAlertActionScope
Expand All @@ -25,6 +26,7 @@ import org.opensearch.core.rest.RestStatus
import org.opensearch.script.Script
import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit
import java.time.temporal.ChronoUnit.MILLIS
import java.util.Locale

Expand Down Expand Up @@ -147,6 +149,43 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() {
assertTrue("Findings saved for test monitor", findings[1].relatedDocIds.contains("5"))
}

/**
 * Creates an enabled doc-level monitor on a one-minute schedule using the reserved name
 * `__lag-monitor-test__` (which makes each run sleep for 80 seconds in the runner), lets
 * several schedule ticks elapse, and then verifies that none of the alerts returned for
 * the monitor carries an error message.
 */
fun `test monitor run generates no error alerts with versionconflictengineexception with locks`() {
    val testIndex = createTestIndex()
    val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
    val testDoc = """{
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""

    val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", fields = listOf())
    val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery))

    val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
    val monitor = createMonitor(
        randomDocumentLevelMonitor(
            name = "__lag-monitor-test__",
            inputs = listOf(docLevelInput),
            triggers = listOf(trigger),
            schedule = IntervalSchedule(interval = 1, unit = ChronoUnit.MINUTES)
        )
    )
    assertNotNull(monitor.id)

    indexDoc(testIndex, "1", testDoc)
    indexDoc(testIndex, "5", testDoc)

    // Four minutes: long enough for several one-minute schedule ticks to fire while an
    // 80-second run of the same monitor is still in flight.
    val overlapWindowMillis = 240000L
    Thread.sleep(overlapWindowMillis)

    val inputMap = HashMap<String, Any>()
    inputMap["searchString"] = monitor.name

    val responseMap = getAlerts(inputMap).asMap()
    // Cast to the List interface rather than a concrete ArrayList, and allow null values:
    // the assertion below expects "error_message" to be null.
    @Suppress("UNCHECKED_CAST")
    val alerts = responseMap["alerts"] as List<Map<String, Any?>>
    alerts.forEach { alert ->
        assertNull("Alert unexpectedly carries an error message: $alert", alert["error_message"])
    }
}

fun `test execute monitor with tag as trigger condition generates alerts and findings`() {
val testIndex = createTestIndex()
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.test.OpenSearchTestCase
import org.opensearch.test.junit.annotations.TestLogging
import java.time.Instant
import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit
import java.util.Collections
import java.util.Locale
Expand Down Expand Up @@ -1190,4 +1192,45 @@ class WorkflowRestApiIT : AlertingRestTestCase() {
val findings = searchFindings(monitor.copy(id = monitorResponse.id))
assertEquals("Findings saved for test monitor", 1, findings.size)
}

/**
 * Creates a disabled doc-level monitor with the reserved name `__lag-monitor-test__`
 * (which makes each run sleep for 80 seconds in the runner), wraps it in an enabled
 * workflow on a one-minute schedule, lets several schedule ticks elapse, and then
 * verifies that none of the monitor's alerts carries an error message.
 */
fun `test workflow run generates no error alerts with versionconflictengineexception with locks`() {
    val testIndex = createTestIndex()
    val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(ChronoUnit.MILLIS))
    val testDoc = """{
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""

    val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", fields = listOf())
    val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery))

    val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
    val monitor = createMonitor(
        randomDocumentLevelMonitor(
            name = "__lag-monitor-test__",
            inputs = listOf(docLevelInput),
            triggers = listOf(trigger),
            enabled = false,
            schedule = IntervalSchedule(interval = 1, unit = ChronoUnit.MINUTES)
        )
    )
    assertNotNull(monitor.id)
    createWorkflow(
        randomWorkflow(
            monitorIds = listOf(monitor.id),
            enabled = true,
            schedule = IntervalSchedule(1, ChronoUnit.MINUTES)
        )
    )

    indexDoc(testIndex, "1", testDoc)
    indexDoc(testIndex, "5", testDoc)

    // Four minutes: long enough for several one-minute schedule ticks to fire while an
    // 80-second run of the delegate monitor is still in flight.
    val overlapWindowMillis = 240000L
    Thread.sleep(overlapWindowMillis)

    val alerts = searchAlerts(monitor)
    alerts.forEach { alert ->
        assertNull("Alert unexpectedly carries an error message: ${alert.errorMessage}", alert.errorMessage)
    }
}
}
132 changes: 132 additions & 0 deletions core/src/main/kotlin/org/opensearch/alerting/core/lock/LockModel.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.core.lock

import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.ToXContentObject
import org.opensearch.core.xcontent.XContentBuilder
import org.opensearch.core.xcontent.XContentParser
import org.opensearch.core.xcontent.XContentParserUtils
import org.opensearch.index.seqno.SequenceNumbers
import java.io.IOException
import java.time.Instant

/**
 * Document model for a lock held on a scheduled job.
 *
 * The lock id is derived from the job id (see [generateLockId]), so a job maps to a single
 * lock document id. [seqNo] and [primaryTerm] are carried on the model but are not written
 * by [toXContent]; they are supplied separately to [parse].
 *
 * @property lockId id of the lock document
 * @property scheduledJobId id of the scheduled job this lock belongs to
 * @property lockTime instant at which the lock was taken; serialized as epoch seconds
 * @property lockDurationSeconds number of seconds after [lockTime] before the lock counts as expired
 * @property released whether the holder has released the lock
 * @property seqNo sequence number associated with this copy of the lock
 * @property primaryTerm primary term associated with this copy of the lock
 */
class LockModel(
    val lockId: String,
    val scheduledJobId: String,
    val lockTime: Instant,
    val lockDurationSeconds: Long,
    val released: Boolean,
    val seqNo: Long,
    val primaryTerm: Long
) : ToXContentObject {

    /** Copies [copyLock], replacing only its [seqNo] and [primaryTerm]. */
    constructor(
        copyLock: LockModel,
        seqNo: Long,
        primaryTerm: Long
    ) : this(
        copyLock.lockId,
        copyLock.scheduledJobId,
        copyLock.lockTime,
        copyLock.lockDurationSeconds,
        copyLock.released,
        seqNo,
        primaryTerm
    )

    /** Copies [copyLock], replacing only its [released] flag. */
    constructor(
        copyLock: LockModel,
        released: Boolean
    ) : this(
        copyLock.lockId,
        copyLock.scheduledJobId,
        copyLock.lockTime,
        copyLock.lockDurationSeconds,
        released,
        copyLock.seqNo,
        copyLock.primaryTerm
    )

    /** Copies [copyLock] with a new lock time, duration and released flag; ids, seqNo and primaryTerm are kept. */
    constructor(
        copyLock: LockModel,
        updateLockTime: Instant,
        lockDurationSeconds: Long,
        released: Boolean
    ) : this(
        copyLock.lockId,
        copyLock.scheduledJobId,
        updateLockTime,
        lockDurationSeconds,
        released,
        copyLock.seqNo,
        copyLock.primaryTerm
    )

    /** Creates a lock for [scheduledJobId] with a generated lock id and unassigned seqNo / primaryTerm. */
    constructor(
        scheduledJobId: String,
        lockTime: Instant,
        lockDurationSeconds: Long,
        released: Boolean
    ) : this(
        generateLockId(scheduledJobId),
        scheduledJobId,
        lockTime,
        lockDurationSeconds,
        released,
        SequenceNumbers.UNASSIGNED_SEQ_NO,
        SequenceNumbers.UNASSIGNED_PRIMARY_TERM
    )

    /**
     * Returns `true` once the current time is strictly more than [lockDurationSeconds]
     * past [lockTime]. The comparison is done in whole epoch seconds.
     */
    fun expired(): Boolean {
        val expiresAtEpochSecond = lockTime.epochSecond + lockDurationSeconds
        return expiresAtEpochSecond < Instant.now().epochSecond
    }

    /**
     * Writes the job id, lock time (epoch seconds), duration and released flag as one
     * object. [lockId], [seqNo] and [primaryTerm] are not part of the serialized form.
     */
    override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
        builder.startObject()
            .field(SCHEDULED_JOB_ID, scheduledJobId)
            .field(LOCK_TIME, lockTime.epochSecond)
            .field(LOCK_DURATION, lockDurationSeconds)
            .field(RELEASED, released)
            .endObject()
        return builder
    }

    companion object {
        const val SCHEDULED_JOB_ID = "scheduled_job_id"
        const val LOCK_TIME = "lock_time"
        const val LOCK_DURATION = "lock_duration_seconds"
        const val RELEASED = "released"

        /** Returns the lock document id for [scheduledJobId]: the job id suffixed with `-lock`. */
        fun generateLockId(scheduledJobId: String): String {
            return "$scheduledJobId-lock"
        }

        /**
         * Parses a lock from [xcp], which must be positioned on the START_OBJECT token.
         *
         * Unknown fields are ignored, including any nested object or array value they hold.
         * A missing duration defaults to 0 and a missing released flag defaults to `false`.
         *
         * @param xcp parser positioned at the start of the lock object
         * @param seqNo sequence number to attach to the returned model
         * @param primaryTerm primary term to attach to the returned model
         * @return the parsed lock, with its id regenerated from the parsed job id
         * @throws IOException if the parser fails to read the content
         * @throws IllegalArgumentException if the job id or lock time field is absent
         */
        @JvmStatic
        @Throws(IOException::class)
        fun parse(xcp: XContentParser, seqNo: Long, primaryTerm: Long): LockModel {
            var scheduledJobId: String? = null
            var lockTime: Instant? = null
            var lockDurationSeconds = 0L
            var released = false

            XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
            while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
                val fieldName = xcp.currentName()
                xcp.nextToken()

                when (fieldName) {
                    SCHEDULED_JOB_ID -> scheduledJobId = xcp.text()
                    LOCK_TIME -> lockTime = Instant.ofEpochSecond(xcp.longValue())
                    LOCK_DURATION -> lockDurationSeconds = xcp.longValue()
                    RELEASED -> released = xcp.booleanValue()
                    // Step over the whole value of an unrecognised field. Without this, a nested
                    // object's END_OBJECT would be mistaken for the end of the lock document.
                    else -> xcp.skipChildren()
                }
            }

            val parsedJobId = requireNotNull(scheduledJobId) { "Lock document is missing required field [$SCHEDULED_JOB_ID]" }
            val parsedLockTime = requireNotNull(lockTime) { "Lock document is missing required field [$LOCK_TIME]" }
            return LockModel(
                generateLockId(parsedJobId),
                parsedJobId,
                parsedLockTime,
                lockDurationSeconds,
                released,
                seqNo,
                primaryTerm
            )
        }
    }
}
Loading

0 comments on commit 96d4a2c

Please sign in to comment.