From f16db12c01065f63f63c32f9720728075ef6c8e0 Mon Sep 17 00:00:00 2001 From: Subhobrata Dey Date: Fri, 16 Feb 2024 00:14:29 +0000 Subject: [PATCH] fix for MapperException[the [enabled] parameter can't be updated for the object mapping [metadata.source_to_query_index_mapping] Signed-off-by: Subhobrata Dey --- .../org/opensearch/alerting/AlertingPlugin.kt | 1 + .../alerting/DocumentLevelMonitorRunner.kt | 12 ++++ .../alerting/MonitorMetadataService.kt | 67 ++++++++++++------- .../alerting/settings/AlertingSettings.kt | 6 ++ .../transport/TransportIndexMonitorAction.kt | 8 +++ .../opensearch/alerting/util/IndexUtils.kt | 2 +- .../alerting/DocumentMonitorRunnerIT.kt | 25 +++++++ 7 files changed, 95 insertions(+), 26 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt index ea01b2524..5a69c8bba 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt @@ -324,6 +324,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R AlertingSettings.MAX_ACTION_THROTTLE_VALUE, AlertingSettings.FILTER_BY_BACKEND_ROLES, AlertingSettings.MAX_ACTIONABLE_ALERT_COUNT, + AlertingSettings.TEST_MONITOR_NAME, LegacyOpenDistroAlertingSettings.INPUT_TIMEOUT, LegacyOpenDistroAlertingSettings.INDEX_TIMEOUT, LegacyOpenDistroAlertingSettings.BULK_TIMEOUT, diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index 0e88b5cb3..df6e795c3 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -9,6 +9,7 @@ import org.apache.logging.log4j.LogManager import org.opensearch.ExceptionsHelper import org.opensearch.OpenSearchStatusException import org.opensearch.action.DocWriteRequest +import org.opensearch.action.admin.indices.delete.DeleteIndexRequest import org.opensearch.action.admin.indices.refresh.RefreshAction import org.opensearch.action.admin.indices.refresh.RefreshRequest import org.opensearch.action.bulk.BulkRequest @@ -17,6 +18,7 @@ import org.opensearch.action.index.IndexRequest import org.opensearch.action.search.SearchAction import org.opensearch.action.search.SearchRequest import org.opensearch.action.search.SearchResponse +import org.opensearch.action.support.master.AcknowledgedResponse import org.opensearch.alerting.model.DocumentExecutionContext import org.opensearch.alerting.model.DocumentLevelTriggerRunResult import org.opensearch.alerting.model.InputRunResults @@ -25,6 +27,7 @@ import org.opensearch.alerting.model.MonitorRunResult import org.opensearch.alerting.model.userErrorMessage import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext +import org.opensearch.alerting.settings.AlertingSettings import org.opensearch.alerting.util.AlertingException import org.opensearch.alerting.util.IndexUtils import org.opensearch.alerting.util.defaultToPerExecutionAction @@ -309,6 +312,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { onSuccessfulMonitorRun(monitorCtx, monitor) } + deleteIfTempMonitor(monitor, monitorCtx) MonitorMetadataService.upsertMetadata( monitorMetadata.copy(lastRunContext = updatedLastRunContext), true @@ -839,4 +843,12 @@ object DocumentLevelMonitorRunner : MonitorRunner() { } jsonAsMap.putAll(tempMap) } + + private suspend fun deleteIfTempMonitor(monitor: Monitor, monitorCtx: MonitorRunnerExecutionContext) { + if (monitor.name == AlertingSettings.TEST_MONITOR_NAME.getDefault(monitorCtx.settings!!)) { + monitorCtx.client!!.suspendUntil { + monitorCtx.client!!.admin().indices().delete(DeleteIndexRequest(".opendistro-alerting-config"), it) + } + } + } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorMetadataService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorMetadataService.kt index 5c8886686..8c7e28734 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorMetadataService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorMetadataService.kt @@ -12,6 +12,7 @@ import kotlinx.coroutines.SupervisorJob import org.apache.logging.log4j.LogManager import org.opensearch.ExceptionsHelper import org.opensearch.OpenSearchSecurityException +import org.opensearch.OpenSearchStatusException import org.opensearch.action.DocWriteRequest import org.opensearch.action.DocWriteResponse import org.opensearch.action.admin.indices.get.GetIndexRequest @@ -78,35 +79,51 @@ object MonitorMetadataService : @Suppress("ComplexMethod", "ReturnCount") suspend fun upsertMetadata(metadata: MonitorMetadata, updating: Boolean): MonitorMetadata { try { - val indexRequest = IndexRequest(ScheduledJob.SCHEDULED_JOBS_INDEX) - .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) - .source(metadata.toXContent(XContentFactory.jsonBuilder(), ToXContent.MapParams(mapOf("with_type" to "true")))) - .id(metadata.id) - .routing(metadata.monitorId) - .setIfSeqNo(metadata.seqNo) - .setIfPrimaryTerm(metadata.primaryTerm) - .timeout(indexTimeout) + if (clusterService.state().routingTable.hasIndex(ScheduledJob.SCHEDULED_JOBS_INDEX)) { + val indexRequest = IndexRequest(ScheduledJob.SCHEDULED_JOBS_INDEX) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .source( + metadata.toXContent( + XContentFactory.jsonBuilder(), + ToXContent.MapParams(mapOf("with_type" to "true")) + ) + ) + .id(metadata.id) + .routing(metadata.monitorId) + .setIfSeqNo(metadata.seqNo) + .setIfPrimaryTerm(metadata.primaryTerm) + .timeout(indexTimeout) - if (updating) { - indexRequest.id(metadata.id).setIfSeqNo(metadata.seqNo).setIfPrimaryTerm(metadata.primaryTerm) - } else { - indexRequest.opType(DocWriteRequest.OpType.CREATE) - } - val response: IndexResponse = client.suspendUntil { index(indexRequest, it) } - when (response.result) { - DocWriteResponse.Result.DELETED, DocWriteResponse.Result.NOOP, DocWriteResponse.Result.NOT_FOUND, null -> { - val failureReason = "The upsert metadata call failed with a ${response.result?.lowercase} result" - log.error(failureReason) - throw AlertingException(failureReason, RestStatus.INTERNAL_SERVER_ERROR, IllegalStateException(failureReason)) + if (updating) { + indexRequest.id(metadata.id).setIfSeqNo(metadata.seqNo).setIfPrimaryTerm(metadata.primaryTerm) + } else { + indexRequest.opType(DocWriteRequest.OpType.CREATE) } - DocWriteResponse.Result.CREATED, DocWriteResponse.Result.UPDATED -> { - log.debug("Successfully upserted MonitorMetadata:${metadata.id} ") + val response: IndexResponse = client.suspendUntil { index(indexRequest, it) } + when (response.result) { + DocWriteResponse.Result.DELETED, DocWriteResponse.Result.NOOP, DocWriteResponse.Result.NOT_FOUND, null -> { + val failureReason = + "The upsert metadata call failed with a ${response.result?.lowercase} result" + log.error(failureReason) + throw AlertingException( + failureReason, + RestStatus.INTERNAL_SERVER_ERROR, + IllegalStateException(failureReason) + ) + } + + DocWriteResponse.Result.CREATED, DocWriteResponse.Result.UPDATED -> { + log.debug("Successfully upserted MonitorMetadata:${metadata.id} ") + } } + return metadata.copy( + seqNo = response.seqNo, + primaryTerm = response.primaryTerm + ) + } else { + val failureReason = "Job index ${ScheduledJob.SCHEDULED_JOBS_INDEX} does not exist to update monitor metadata" + throw OpenSearchStatusException(failureReason, RestStatus.INTERNAL_SERVER_ERROR) } - return metadata.copy( - seqNo = response.seqNo, - primaryTerm = response.primaryTerm - ) } catch (e: Exception) { throw AlertingException.wrap(e) } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt b/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt index 743d582e5..951851277 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt @@ -168,5 +168,11 @@ class AlertingSettings { 1, Setting.Property.NodeScope, Setting.Property.Dynamic ) + + val TEST_MONITOR_NAME = Setting.simpleString( + "plugins.alerting.integ_test.test_monitor_name", + "__delete_config_index_monitor__", + Setting.Property.NodeScope, Setting.Property.Dynamic + ) } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt index 49743b3f0..72396fbc5 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt @@ -302,6 +302,14 @@ class TransportIndexMonitorAction @Inject constructor( } fun start() { + /** + * this happens in a multi-node scenario when in 1 node IndexUtils.scheduledJobIndexUpdated is set to true but + * in another node where the new monitor creation request is received & the flag IndexUtils.scheduledJobIndexUpdated + * is set to false. + */ + if (request.monitor.name == AlertingSettings.TEST_MONITOR_NAME.getDefault(settings)) { + IndexUtils.scheduledJobIndexUpdated = false + } if (!scheduledJobIndices.scheduledJobIndexExists()) { scheduledJobIndices.initScheduledJobIndex(object : ActionListener { override fun onResponse(response: CreateIndexResponse) { diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/IndexUtils.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/IndexUtils.kt index 6949ca58d..0d39c97b9 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/IndexUtils.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/IndexUtils.kt @@ -39,7 +39,7 @@ class IndexUtils { private set var scheduledJobIndexUpdated: Boolean = false - private set + public set var alertIndexUpdated: Boolean = false private set var findingIndexUpdated: Boolean = false diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt index 479e29dca..0cf845585 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt @@ -187,6 +187,31 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { assertTrue("Findings saved for test monitor", findings[1].relatedDocIds.contains("5")) } + fun `test execute monitor generates alerts and findings with alerting config index deleted`() { + val monitorName = "__delete_config_index_monitor__" + val testIndex = createTestIndex() + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", fields = listOf()) + val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery)) + + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + var monitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docLevelInput), triggers = listOf(trigger), name = monitorName)) + assertNotNull(monitor.id) + + indexDoc(testIndex, "1", testDoc) + indexDoc(testIndex, "5", testDoc) + executeMonitor(monitor.id) + + monitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docLevelInput), triggers = listOf(trigger), name = monitorName)) + assertNotNull(monitor.id) + } + fun `test execute monitor input error`() { val testIndex = createTestIndex() val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))