From 96d4a2ce9998d4867d787a86ced111fc1829af3b Mon Sep 17 00:00:00 2001 From: Subhobrata Dey Date: Sat, 3 Feb 2024 03:59:23 +0000 Subject: [PATCH] add distributed locking to jobs in alerting Signed-off-by: Subhobrata Dey --- .../org/opensearch/alerting/AlertingPlugin.kt | 3 + .../alerting/DocumentLevelMonitorRunner.kt | 7 + .../alerting/MonitorRunnerExecutionContext.kt | 5 +- .../alerting/MonitorRunnerService.kt | 21 ++ .../alerting/DocumentMonitorRunnerIT.kt | 39 +++ .../alerting/resthandler/WorkflowRestApiIT.kt | 43 +++ .../alerting/core/lock/LockModel.kt | 132 ++++++++ .../alerting/core/lock/LockService.kt | 319 ++++++++++++++++++ .../core/settings/ScheduledJobSettings.kt | 6 + .../opensearch-alerting-config-lock.json | 18 + 10 files changed, 592 insertions(+), 1 deletion(-) create mode 100644 core/src/main/kotlin/org/opensearch/alerting/core/lock/LockModel.kt create mode 100644 core/src/main/kotlin/org/opensearch/alerting/core/lock/LockService.kt create mode 100644 core/src/main/resources/mappings/opensearch-alerting-config-lock.json diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt index 0a80f33ae..9cbdbccb0 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt @@ -18,6 +18,7 @@ import org.opensearch.alerting.core.JobSweeper import org.opensearch.alerting.core.ScheduledJobIndices import org.opensearch.alerting.core.action.node.ScheduledJobsStatsAction import org.opensearch.alerting.core.action.node.ScheduledJobsStatsTransportAction +import org.opensearch.alerting.core.lock.LockService import org.opensearch.alerting.core.resthandler.RestScheduledJobStatsHandler import org.opensearch.alerting.core.schedule.JobScheduler import org.opensearch.alerting.core.settings.LegacyOpenDistroScheduledJobSettings @@ -258,6 +259,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R .registerAlertService(AlertService(client, xContentRegistry, alertIndices)) .registerDocLevelMonitorQueries(DocLevelMonitorQueries(client, clusterService)) .registerWorkflowService(WorkflowService(client, xContentRegistry)) + .registerLockService(LockService(client, clusterService)) .registerConsumers() .registerDestinationSettings() scheduledJobIndices = ScheduledJobIndices(client.admin(), clusterService) @@ -295,6 +297,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R ScheduledJobSettings.SWEEP_PERIOD, ScheduledJobSettings.SWEEP_PAGE_SIZE, ScheduledJobSettings.SWEEPER_ENABLED, + ScheduledJobSettings.LOCK_DURATION_SECONDS, LegacyOpenDistroScheduledJobSettings.REQUEST_TIMEOUT, LegacyOpenDistroScheduledJobSettings.SWEEP_BACKOFF_MILLIS, LegacyOpenDistroScheduledJobSettings.SWEEP_BACKOFF_RETRY_COUNT, diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index 3b8e4dee7..24bf17d92 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -255,6 +255,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { } } monitorResult = monitorResult.copy(inputResults = InputRunResults(listOf(inputRunResults))) + delayIfTempMonitor(monitor) /* populate the map queryToDocIds with pairs of { launch { + val lock = monitorCtx.client!!.suspendUntil { + monitorCtx.lockService!!.acquireLock(job, lockDurationSeconds, it) + } ?: return@launch runJob(job, periodStart, periodEnd, false) + monitorCtx.client!!.suspendUntil { monitorCtx.lockService!!.release(lock, it) } } } is Monitor -> { launch { + val lock: LockModel = monitorCtx.client!!.suspendUntil { + monitorCtx.lockService!!.acquireLock(job, lockDurationSeconds, it) + } ?: return@launch runJob(job, periodStart, periodEnd, false) + monitorCtx.client!!.suspendUntil { monitorCtx.lockService!!.release(lock, it) } } } else -> { diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt index 5f969d661..a80bfb442 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt @@ -17,6 +17,7 @@ import org.opensearch.commons.alerting.model.Alert import org.opensearch.commons.alerting.model.DataSources import org.opensearch.commons.alerting.model.DocLevelMonitorInput import org.opensearch.commons.alerting.model.DocLevelQuery +import org.opensearch.commons.alerting.model.IntervalSchedule import org.opensearch.commons.alerting.model.action.ActionExecutionPolicy import org.opensearch.commons.alerting.model.action.AlertCategory import org.opensearch.commons.alerting.model.action.PerAlertActionScope @@ -25,6 +26,7 @@ import org.opensearch.core.rest.RestStatus import org.opensearch.script.Script import java.time.ZonedDateTime import java.time.format.DateTimeFormatter +import java.time.temporal.ChronoUnit import java.time.temporal.ChronoUnit.MILLIS import java.util.Locale @@ -147,6 +149,43 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { assertTrue("Findings saved for test monitor", findings[1].relatedDocIds.contains("5")) } + fun `test monitor run generates no error alerts with versionconflictengineexception with locks`() { + val testIndex = createTestIndex() + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", fields = listOf()) + val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery)) + + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val monitor = createMonitor( + randomDocumentLevelMonitor( + name = "__lag-monitor-test__", + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + schedule = IntervalSchedule(interval = 1, unit = ChronoUnit.MINUTES) + ) + ) + assertNotNull(monitor.id) + + indexDoc(testIndex, "1", testDoc) + indexDoc(testIndex, "5", testDoc) + Thread.sleep(240000) + + val inputMap = HashMap() + inputMap["searchString"] = monitor.name + + val responseMap = getAlerts(inputMap).asMap() + val alerts = (responseMap["alerts"] as ArrayList>) + alerts.forEach { + assertTrue(it["error_message"] == null) + } + } + fun `test execute monitor with tag as trigger condition generates alerts and findings`() { val testIndex = createTestIndex() val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/WorkflowRestApiIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/WorkflowRestApiIT.kt index cf48720af..c7750d40d 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/WorkflowRestApiIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/WorkflowRestApiIT.kt @@ -38,6 +38,8 @@ import org.opensearch.search.builder.SearchSourceBuilder import org.opensearch.test.OpenSearchTestCase import org.opensearch.test.junit.annotations.TestLogging import java.time.Instant +import java.time.ZonedDateTime +import java.time.format.DateTimeFormatter import java.time.temporal.ChronoUnit import java.util.Collections import java.util.Locale @@ -1190,4 +1192,45 @@ class WorkflowRestApiIT : AlertingRestTestCase() { val findings = searchFindings(monitor.copy(id = monitorResponse.id)) assertEquals("Findings saved for test monitor", 1, findings.size) } + + fun `test workflow run generates no error alerts with versionconflictengineexception with locks`() { + val testIndex = createTestIndex() + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(ChronoUnit.MILLIS)) + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", fields = listOf()) + val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery)) + + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val monitor = createMonitor( + randomDocumentLevelMonitor( + name = "__lag-monitor-test__", + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + enabled = false, + schedule = IntervalSchedule(interval = 1, unit = ChronoUnit.MINUTES) + ) + ) + assertNotNull(monitor.id) + createWorkflow( + randomWorkflow( + monitorIds = listOf(monitor.id), + enabled = true, + schedule = IntervalSchedule(1, ChronoUnit.MINUTES) + ) + ) + + indexDoc(testIndex, "1", testDoc) + indexDoc(testIndex, "5", testDoc) + Thread.sleep(240000) + + val alerts = searchAlerts(monitor) + alerts.forEach { + assertTrue(it.errorMessage == null) + } + } } diff --git a/core/src/main/kotlin/org/opensearch/alerting/core/lock/LockModel.kt b/core/src/main/kotlin/org/opensearch/alerting/core/lock/LockModel.kt new file mode 100644 index 000000000..ce7c58fae --- /dev/null +++ b/core/src/main/kotlin/org/opensearch/alerting/core/lock/LockModel.kt @@ -0,0 +1,132 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.core.lock + +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.ToXContentObject +import org.opensearch.core.xcontent.XContentBuilder +import org.opensearch.core.xcontent.XContentParser +import org.opensearch.core.xcontent.XContentParserUtils +import org.opensearch.index.seqno.SequenceNumbers +import java.io.IOException +import java.time.Instant + +class LockModel( + val lockId: String, + val scheduledJobId: String, + val lockTime: Instant, + val lockDurationSeconds: Long, + val released: Boolean, + val seqNo: Long, + val primaryTerm: Long +) : ToXContentObject { + + constructor( + copyLock: LockModel, + seqNo: Long, + primaryTerm: Long + ) : this ( + copyLock.lockId, + copyLock.scheduledJobId, + copyLock.lockTime, + copyLock.lockDurationSeconds, + copyLock.released, + seqNo, + primaryTerm + ) + + constructor( + copyLock: LockModel, + released: Boolean + ) : this ( + copyLock.lockId, + copyLock.scheduledJobId, + copyLock.lockTime, + copyLock.lockDurationSeconds, + released, + copyLock.seqNo, + copyLock.primaryTerm + ) + + constructor( + copyLock: LockModel, + updateLockTime: Instant, + lockDurationSeconds: Long, + released: Boolean + ) : this ( + copyLock.lockId, + copyLock.scheduledJobId, + updateLockTime, + lockDurationSeconds, + released, + copyLock.seqNo, + copyLock.primaryTerm + ) + + constructor( + scheduledJobId: String, + lockTime: Instant, + lockDurationSeconds: Long, + released: Boolean + ) : this ( + generateLockId(scheduledJobId), + scheduledJobId, + lockTime, + lockDurationSeconds, + released, + SequenceNumbers.UNASSIGNED_SEQ_NO, + SequenceNumbers.UNASSIGNED_PRIMARY_TERM + ) + + fun expired(): Boolean { + return lockTime.epochSecond + lockDurationSeconds < Instant.now().epochSecond + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + builder.startObject() + .field(SCHEDULED_JOB_ID, scheduledJobId) + .field(LOCK_TIME, lockTime.epochSecond) + .field(LOCK_DURATION, lockDurationSeconds) + .field(RELEASED, released) + .endObject() + return builder + } + + companion object { + const val SCHEDULED_JOB_ID = "scheduled_job_id" + const val LOCK_TIME = "lock_time" + const val LOCK_DURATION = "lock_duration_seconds" + const val RELEASED = "released" + + fun generateLockId(scheduledJobId: String): String { + return "$scheduledJobId-lock" + } + + @JvmStatic + @JvmOverloads + @Throws(IOException::class) + fun parse(xcp: XContentParser, seqNo: Long, primaryTerm: Long): LockModel { + lateinit var scheduledJobId: String + lateinit var lockTime: Instant + var lockDurationSeconds: Long = 0L + var released: Boolean = false + + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + + when (fieldName) { + SCHEDULED_JOB_ID -> scheduledJobId = xcp.text() + LOCK_TIME -> lockTime = Instant.ofEpochSecond(xcp.longValue()) + LOCK_DURATION -> lockDurationSeconds = xcp.longValue() + RELEASED -> released = xcp.booleanValue() + } + } + return LockModel(generateLockId(scheduledJobId), scheduledJobId, lockTime, lockDurationSeconds, released, seqNo, primaryTerm) + } + } +} diff --git a/core/src/main/kotlin/org/opensearch/alerting/core/lock/LockService.kt b/core/src/main/kotlin/org/opensearch/alerting/core/lock/LockService.kt new file mode 100644 index 000000000..f3f148da4 --- /dev/null +++ b/core/src/main/kotlin/org/opensearch/alerting/core/lock/LockService.kt @@ -0,0 +1,319 @@ +package org.opensearch.alerting.core.lock + +import org.apache.logging.log4j.LogManager +import org.opensearch.ResourceAlreadyExistsException +import org.opensearch.action.DocWriteResponse +import org.opensearch.action.admin.indices.create.CreateIndexRequest +import org.opensearch.action.admin.indices.create.CreateIndexResponse +import org.opensearch.action.delete.DeleteRequest +import org.opensearch.action.delete.DeleteResponse +import org.opensearch.action.get.GetRequest +import org.opensearch.action.get.GetResponse +import org.opensearch.action.index.IndexRequest +import org.opensearch.action.index.IndexResponse +import org.opensearch.action.update.UpdateRequest +import org.opensearch.action.update.UpdateResponse +import org.opensearch.client.Client +import org.opensearch.cluster.service.ClusterService +import org.opensearch.common.settings.Settings +import org.opensearch.common.xcontent.LoggingDeprecationHandler +import org.opensearch.common.xcontent.XContentFactory +import org.opensearch.common.xcontent.XContentType +import org.opensearch.commons.alerting.model.ScheduledJob +import org.opensearch.core.action.ActionListener +import org.opensearch.core.xcontent.NamedXContentRegistry +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.index.IndexNotFoundException +import org.opensearch.index.engine.DocumentMissingException +import org.opensearch.index.engine.VersionConflictEngineException +import org.opensearch.index.seqno.SequenceNumbers +import java.io.IOException +import java.time.Instant + +private val log = LogManager.getLogger(LockService::class.java) + +class LockService(private val client: Client, private val clusterService: ClusterService) { + private var testInstant: Instant? = null + + companion object { + const val LOCK_INDEX_NAME = ".opensearch-alerting-config-lock" + + @JvmStatic + fun lockMapping(): String? { + return LockService::class.java.classLoader.getResource("mappings/opensearch-alerting-config-lock.json") + ?.readText() + } + } + + fun lockIndexExist(): Boolean { + return clusterService.state().routingTable().hasIndex(LOCK_INDEX_NAME) + } + + fun acquireLock( + scheduledJob: ScheduledJob, + lockDurationSeconds: Long, + listener: ActionListener + ) { + val scheduledJobId = scheduledJob.id + acquireLockWithId(lockDurationSeconds, scheduledJobId, listener) + } + + fun acquireLockWithId( + lockDurationSeconds: Long, + scheduledJobId: String, + listener: ActionListener + ) { + val lockId = LockModel.generateLockId(scheduledJobId) + createLockIndex( + object : ActionListener { + override fun onResponse(created: Boolean) { + if (created) { + try { + findLock( + lockId, + object : ActionListener { + override fun onResponse(existingLock: LockModel?) { + if (existingLock != null) { + if (isLockReleasedOrExpired(existingLock)) { + log.debug("lock is released or expired: {}", existingLock) + val updateLock = LockModel(existingLock, getNow(), lockDurationSeconds, false) + updateLock(updateLock, listener) + } else { + log.debug("Lock is NOT released or expired. {}", existingLock) + listener.onResponse(null) + } + } else { + val tempLock = LockModel(scheduledJobId, getNow(), lockDurationSeconds, false) + log.debug("Lock does not exist. Creating new lock {}", tempLock) + createLock(tempLock, listener) + } + } + + override fun onFailure(e: Exception) { + listener.onFailure(e) + } + } + ) + } catch (e: VersionConflictEngineException) { + log.debug("could not acquire lock {}", e.message) + listener.onResponse(null) + } + } else { + listener.onResponse(null) + } + } + + override fun onFailure(e: Exception) { + listener.onFailure(e) + } + } + ) + } + + private fun createLock( + tempLock: LockModel, + listener: ActionListener + ) { + try { + val request = IndexRequest(LOCK_INDEX_NAME).id(tempLock.lockId) + .source(tempLock.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) + .setIfSeqNo(SequenceNumbers.UNASSIGNED_SEQ_NO) + .setIfPrimaryTerm(SequenceNumbers.UNASSIGNED_PRIMARY_TERM) + .create(true) + client.index( + request, + object : ActionListener { + override fun onResponse(response: IndexResponse) { + listener.onResponse(LockModel(tempLock, response.seqNo, response.primaryTerm)) + } + + override fun onFailure(e: Exception) { + if (e is VersionConflictEngineException) { + log.debug("Lock is already created. {}", e.message) + listener.onResponse(null) + return + } + listener.onFailure(e) + } + } + ) + } catch (ex: IOException) { + log.error("IOException occurred creating lock", ex) + listener.onFailure(ex) + } + } + + private fun updateLock( + updateLock: LockModel, + listener: ActionListener + ) { + try { + val updateRequest = UpdateRequest().index(LOCK_INDEX_NAME) + .id(updateLock.lockId) + .setIfSeqNo(updateLock.seqNo) + .setIfPrimaryTerm(updateLock.primaryTerm) + .doc(updateLock.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) + .fetchSource(true) + + client.update( + updateRequest, + object : ActionListener { + override fun onResponse(response: UpdateResponse) { + listener.onResponse(LockModel(updateLock, response.seqNo, response.primaryTerm)) + } + + override fun onFailure(e: Exception) { + if (e is VersionConflictEngineException) { + log.debug("could not acquire lock {}", e.message) + } + if (e is DocumentMissingException) { + log.debug( + "Document is deleted. This happens if the job is already removed and" + " this is the last run." + "{}", + e.message + ) + } + if (e is IOException) { + log.error("IOException occurred updating lock.", e) + } + listener.onResponse(null) + } + } + ) + } catch (ex: IOException) { + log.error("IOException occurred updating lock.", ex) + listener.onResponse(null) + } + } + + fun findLock( + lockId: String, + listener: ActionListener + ) { + val getRequest = GetRequest(LOCK_INDEX_NAME).id(lockId) + client.get( + getRequest, + object : ActionListener { + override fun onResponse(response: GetResponse) { + if (!response.isExists) { + listener.onResponse(null) + } else { + try { + val parser = XContentType.JSON.xContent() + .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, response.sourceAsString) + parser.nextToken() + listener.onResponse(LockModel.parse(parser, response.seqNo, response.primaryTerm)) + } catch (e: IOException) { + log.error("IOException occurred finding lock", e) + listener.onResponse(null) + } + } + } + + override fun onFailure(e: Exception) { + log.error("Exception occurred finding lock", e) + listener.onFailure(e) + } + } + ) + } + + fun release( + lock: LockModel?, + listener: ActionListener + ) { + if (lock == null) { + log.debug("Lock is null. Nothing to release.") + listener.onResponse(false) + } else { + log.debug("Releasing lock: {}", lock) + val lockToRelease = LockModel(lock, true) + updateLock( + lockToRelease, + object : ActionListener { + override fun onResponse(releasedLock: LockModel?) { + listener.onResponse(releasedLock != null) + } + + override fun onFailure(e: Exception) { + listener.onFailure(e) + } + } + ) + } + } + + fun deleteLock( + lockId: String, + listener: ActionListener + ) { + val deleteRequest = DeleteRequest(LOCK_INDEX_NAME).id(lockId) + client.delete( + deleteRequest, + object : ActionListener { + override fun onResponse(response: DeleteResponse) { + listener.onResponse( + response.result == DocWriteResponse.Result.DELETED || response.result == DocWriteResponse.Result.NOT_FOUND + ) + } + + override fun onFailure(e: Exception) { + if (e is IndexNotFoundException || e.cause is IndexNotFoundException) { + log.debug("Index is not found to delete lock. {}", e.message) + listener.onResponse(true) + } else { + listener.onFailure(e) + } + } + } + ) + } + + fun renewLock( + lock: LockModel, + listener: ActionListener + ) { + } + + private fun createLockIndex(listener: ActionListener) { + if (lockIndexExist()) { + listener.onResponse(true) + } else { + val indexRequest = CreateIndexRequest(LOCK_INDEX_NAME).mapping(lockMapping()) + .settings(Settings.builder().put("index.hidden", true).build()) + client.admin().indices().create( + indexRequest, + object : ActionListener { + override fun onResponse(response: CreateIndexResponse) { + listener.onResponse(response.isAcknowledged) + } + + override fun onFailure(ex: Exception) { + log.error("Failed to update config index schema", ex) + if (ex is ResourceAlreadyExistsException || ex.cause is ResourceAlreadyExistsException + ) { + listener.onResponse(true) + } else { + listener.onFailure(ex) + } + } + } + ) + } + } + + private fun isLockReleasedOrExpired(lock: LockModel): Boolean { + return lock.released || lock.expired() + } + + private fun getNow(): Instant { + return if (testInstant != null) { + testInstant!! + } else { + Instant.now() + } + } + + fun setTime(testInstant: Instant) { + this.testInstant = testInstant + } +} diff --git a/core/src/main/kotlin/org/opensearch/alerting/core/settings/ScheduledJobSettings.kt b/core/src/main/kotlin/org/opensearch/alerting/core/settings/ScheduledJobSettings.kt index 6bdb18bec..7e82e39e0 100644 --- a/core/src/main/kotlin/org/opensearch/alerting/core/settings/ScheduledJobSettings.kt +++ b/core/src/main/kotlin/org/opensearch/alerting/core/settings/ScheduledJobSettings.kt @@ -47,5 +47,11 @@ class ScheduledJobSettings { LegacyOpenDistroScheduledJobSettings.SWEEP_PAGE_SIZE, Setting.Property.NodeScope, Setting.Property.Dynamic ) + + val LOCK_DURATION_SECONDS = Setting.longSetting( + "plugins.scheduled_jobs.lock_duration_seconds", + 120L, + Setting.Property.NodeScope, Setting.Property.Dynamic + ) } } diff --git a/core/src/main/resources/mappings/opensearch-alerting-config-lock.json b/core/src/main/resources/mappings/opensearch-alerting-config-lock.json new file mode 100644 index 000000000..401374a8f --- /dev/null +++ b/core/src/main/resources/mappings/opensearch-alerting-config-lock.json @@ -0,0 +1,18 @@ +{ + "dynamic": "strict", + "properties": { + "scheduled_job_id": { + "type": "keyword" + }, + "lock_time": { + "type": "date", + "format": "epoch_second" + }, + "lock_duration_seconds": { + "type": "long" + }, + "released": { + "type": "boolean" + } + } +} \ No newline at end of file