Skip to content

Commit

Permalink
fix for MapperException[the [enabled] parameter can't be updated for …
Browse files Browse the repository at this point in the history
…the object mapping [metadata.source_to_query_index_mapping]

Signed-off-by: Subhobrata Dey <[email protected]>
  • Loading branch information
sbcd90 committed Feb 16, 2024
1 parent 719db46 commit f16db12
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
AlertingSettings.MAX_ACTION_THROTTLE_VALUE,
AlertingSettings.FILTER_BY_BACKEND_ROLES,
AlertingSettings.MAX_ACTIONABLE_ALERT_COUNT,
AlertingSettings.TEST_MONITOR_NAME,
LegacyOpenDistroAlertingSettings.INPUT_TIMEOUT,
LegacyOpenDistroAlertingSettings.INDEX_TIMEOUT,
LegacyOpenDistroAlertingSettings.BULK_TIMEOUT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import org.apache.logging.log4j.LogManager
import org.opensearch.ExceptionsHelper
import org.opensearch.OpenSearchStatusException
import org.opensearch.action.DocWriteRequest
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest
import org.opensearch.action.admin.indices.refresh.RefreshAction
import org.opensearch.action.admin.indices.refresh.RefreshRequest
import org.opensearch.action.bulk.BulkRequest
Expand All @@ -17,6 +18,7 @@ import org.opensearch.action.index.IndexRequest
import org.opensearch.action.search.SearchAction
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.alerting.model.DocumentExecutionContext
import org.opensearch.alerting.model.DocumentLevelTriggerRunResult
import org.opensearch.alerting.model.InputRunResults
Expand All @@ -25,6 +27,7 @@ import org.opensearch.alerting.model.MonitorRunResult
import org.opensearch.alerting.model.userErrorMessage
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.util.AlertingException
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.alerting.util.defaultToPerExecutionAction
Expand Down Expand Up @@ -309,6 +312,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
onSuccessfulMonitorRun(monitorCtx, monitor)
}

deleteIfTempMonitor(monitor, monitorCtx)
MonitorMetadataService.upsertMetadata(
monitorMetadata.copy(lastRunContext = updatedLastRunContext),
true
Expand Down Expand Up @@ -839,4 +843,12 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
}
jsonAsMap.putAll(tempMap)
}

private suspend fun deleteIfTempMonitor(monitor: Monitor, monitorCtx: MonitorRunnerExecutionContext) {
if (monitor.name == AlertingSettings.TEST_MONITOR_NAME.getDefault(monitorCtx.settings!!)) {
monitorCtx.client!!.suspendUntil<Client, AcknowledgedResponse> {
monitorCtx.client!!.admin().indices().delete(DeleteIndexRequest(".opendistro-alerting-config"), it)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import kotlinx.coroutines.SupervisorJob
import org.apache.logging.log4j.LogManager
import org.opensearch.ExceptionsHelper
import org.opensearch.OpenSearchSecurityException
import org.opensearch.OpenSearchStatusException
import org.opensearch.action.DocWriteRequest
import org.opensearch.action.DocWriteResponse
import org.opensearch.action.admin.indices.get.GetIndexRequest
Expand Down Expand Up @@ -78,35 +79,51 @@ object MonitorMetadataService :
@Suppress("ComplexMethod", "ReturnCount")
suspend fun upsertMetadata(metadata: MonitorMetadata, updating: Boolean): MonitorMetadata {
try {
val indexRequest = IndexRequest(ScheduledJob.SCHEDULED_JOBS_INDEX)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.source(metadata.toXContent(XContentFactory.jsonBuilder(), ToXContent.MapParams(mapOf("with_type" to "true"))))
.id(metadata.id)
.routing(metadata.monitorId)
.setIfSeqNo(metadata.seqNo)
.setIfPrimaryTerm(metadata.primaryTerm)
.timeout(indexTimeout)
if (clusterService.state().routingTable.hasIndex(ScheduledJob.SCHEDULED_JOBS_INDEX)) {
val indexRequest = IndexRequest(ScheduledJob.SCHEDULED_JOBS_INDEX)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.source(
metadata.toXContent(
XContentFactory.jsonBuilder(),
ToXContent.MapParams(mapOf("with_type" to "true"))
)
)
.id(metadata.id)
.routing(metadata.monitorId)
.setIfSeqNo(metadata.seqNo)
.setIfPrimaryTerm(metadata.primaryTerm)
.timeout(indexTimeout)

if (updating) {
indexRequest.id(metadata.id).setIfSeqNo(metadata.seqNo).setIfPrimaryTerm(metadata.primaryTerm)
} else {
indexRequest.opType(DocWriteRequest.OpType.CREATE)
}
val response: IndexResponse = client.suspendUntil { index(indexRequest, it) }
when (response.result) {
DocWriteResponse.Result.DELETED, DocWriteResponse.Result.NOOP, DocWriteResponse.Result.NOT_FOUND, null -> {
val failureReason = "The upsert metadata call failed with a ${response.result?.lowercase} result"
log.error(failureReason)
throw AlertingException(failureReason, RestStatus.INTERNAL_SERVER_ERROR, IllegalStateException(failureReason))
if (updating) {
indexRequest.id(metadata.id).setIfSeqNo(metadata.seqNo).setIfPrimaryTerm(metadata.primaryTerm)
} else {
indexRequest.opType(DocWriteRequest.OpType.CREATE)
}
DocWriteResponse.Result.CREATED, DocWriteResponse.Result.UPDATED -> {
log.debug("Successfully upserted MonitorMetadata:${metadata.id} ")
val response: IndexResponse = client.suspendUntil { index(indexRequest, it) }
when (response.result) {
DocWriteResponse.Result.DELETED, DocWriteResponse.Result.NOOP, DocWriteResponse.Result.NOT_FOUND, null -> {
val failureReason =
"The upsert metadata call failed with a ${response.result?.lowercase} result"
log.error(failureReason)
throw AlertingException(
failureReason,
RestStatus.INTERNAL_SERVER_ERROR,
IllegalStateException(failureReason)
)
}

DocWriteResponse.Result.CREATED, DocWriteResponse.Result.UPDATED -> {
log.debug("Successfully upserted MonitorMetadata:${metadata.id} ")
}
}
return metadata.copy(
seqNo = response.seqNo,
primaryTerm = response.primaryTerm
)
} else {
val failureReason = "Job index ${ScheduledJob.SCHEDULED_JOBS_INDEX} does not exist to update monitor metadata"
throw OpenSearchStatusException(failureReason, RestStatus.INTERNAL_SERVER_ERROR)
}
return metadata.copy(
seqNo = response.seqNo,
primaryTerm = response.primaryTerm
)
} catch (e: Exception) {
throw AlertingException.wrap(e)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,5 +168,11 @@ class AlertingSettings {
1,
Setting.Property.NodeScope, Setting.Property.Dynamic
)

val TEST_MONITOR_NAME = Setting.simpleString(
"plugins.alerting.integ_test.test_monitor_name",
"__delete_config_index_monitor__",
Setting.Property.NodeScope, Setting.Property.Dynamic
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,14 @@ class TransportIndexMonitorAction @Inject constructor(
}

fun start() {
/**
* this happens in a multi-node scenario when in 1 node IndexUtils.scheduledJobIndexUpdated is set to true but
* in another node where the new monitor creation request is received & the flag IndexUtils.scheduledJobIndexUpdated
* is set to false.
*/
if (request.monitor.name == AlertingSettings.TEST_MONITOR_NAME.getDefault(settings)) {
IndexUtils.scheduledJobIndexUpdated = false
}
if (!scheduledJobIndices.scheduledJobIndexExists()) {
scheduledJobIndices.initScheduledJobIndex(object : ActionListener<CreateIndexResponse> {
override fun onResponse(response: CreateIndexResponse) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class IndexUtils {
private set

var scheduledJobIndexUpdated: Boolean = false
private set
public set
var alertIndexUpdated: Boolean = false
private set
var findingIndexUpdated: Boolean = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,31 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() {
assertTrue("Findings saved for test monitor", findings[1].relatedDocIds.contains("5"))
}

fun `test execute monitor generates alerts and findings with alerting config index deleted`() {
val monitorName = "__delete_config_index_monitor__"
val testIndex = createTestIndex()
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc = """{
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""

val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", fields = listOf())
val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery))

val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
var monitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docLevelInput), triggers = listOf(trigger), name = monitorName))
assertNotNull(monitor.id)

indexDoc(testIndex, "1", testDoc)
indexDoc(testIndex, "5", testDoc)
executeMonitor(monitor.id)

monitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docLevelInput), triggers = listOf(trigger), name = monitorName))
assertNotNull(monitor.id)
}

fun `test execute monitor input error`() {
val testIndex = createTestIndex()
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
Expand Down

0 comments on commit f16db12

Please sign in to comment.