diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt index 66ebd5eb3..6cce97a21 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt @@ -234,6 +234,14 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R destinationMigrationCoordinator = DestinationMigrationCoordinator(client, clusterService, threadPool, scheduledJobIndices) this.threadPool = threadPool this.clusterService = clusterService + + MonitorMetadataService.initialize( + client, + clusterService, + xContentRegistry, + settings + ) + return listOf(sweeper, scheduler, runner, scheduledJobIndices, docLevelMonitorQueries, destinationMigrationCoordinator) } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index c45548c6b..07ee5669a 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -6,6 +6,7 @@ package org.opensearch.alerting import org.apache.logging.log4j.LogManager +import org.opensearch.OpenSearchStatusException import org.opensearch.action.admin.indices.get.GetIndexRequest import org.opensearch.action.admin.indices.get.GetIndexResponse import org.opensearch.action.index.IndexRequest @@ -14,17 +15,16 @@ import org.opensearch.action.search.SearchAction import org.opensearch.action.search.SearchRequest import org.opensearch.action.search.SearchResponse import org.opensearch.action.support.WriteRequest -import org.opensearch.alerting.model.AlertingConfigAccessor.Companion.getMonitorMetadata import org.opensearch.alerting.model.DocumentExecutionContext import org.opensearch.alerting.model.DocumentLevelTriggerRunResult import org.opensearch.alerting.model.InputRunResults +import org.opensearch.alerting.model.MonitorMetadata import org.opensearch.alerting.model.MonitorRunResult import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext import org.opensearch.alerting.util.AlertingException import org.opensearch.alerting.util.defaultToPerExecutionAction import org.opensearch.alerting.util.getActionExecutionPolicy -import org.opensearch.alerting.util.updateMonitorMetadata import org.opensearch.client.Client import org.opensearch.cluster.routing.ShardRouting import org.opensearch.cluster.service.ClusterService @@ -52,7 +52,6 @@ import org.opensearch.search.sort.SortOrder import java.io.IOException import java.time.Instant import java.util.UUID -import kotlin.collections.HashMap import kotlin.math.max object DocumentLevelMonitorRunner : MonitorRunner() { @@ -85,10 +84,13 @@ object DocumentLevelMonitorRunner : MonitorRunner() { return monitorResult.copy(error = AlertingException.wrap(e)) } + var (monitorMetadata, _) = MonitorMetadataService.getOrCreateMetadata(monitor, createWithRunContext = false) + monitorCtx.docLevelMonitorQueries!!.initDocLevelQueryIndex(monitor.dataSources) monitorCtx.docLevelMonitorQueries!!.indexDocLevelQueries( monitor = monitor, monitorId = monitor.id, + monitorMetadata, indexTimeout = monitorCtx.indexTimeout!! ) @@ -96,11 +98,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() { val index = docLevelMonitorInput.indices[0] val queries: List = docLevelMonitorInput.queries - var monitorMetadata = getMonitorMetadata(monitorCtx.client!!, monitorCtx.xContentRegistry!!, "${monitor.id}-metadata") - if (monitorMetadata == null) { - monitorMetadata = createMonitorMetadata(monitor.id) - } - val isTempMonitor = dryrun || monitor.id == Monitor.NO_ID val lastRunContext = if (monitorMetadata.lastRunContext.isNullOrEmpty()) mutableMapOf() else monitorMetadata.lastRunContext.toMutableMap() as MutableMap> @@ -129,7 +126,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { // Prepare lastRunContext for each index val indexLastRunContext = lastRunContext.getOrPut(indexName) { val indexCreatedRecently = createdRecently(monitor, indexName, periodStart, periodEnd, getIndexResponse) - createRunContext(monitorCtx.clusterService!!, monitorCtx.client!!, indexName, indexCreatedRecently) + MonitorMetadataService.createRunContextForIndex(indexName, indexCreatedRecently) } // Prepare updatedLastRunContext for each index @@ -157,7 +154,13 @@ object DocumentLevelMonitorRunner : MonitorRunner() { val matchingDocs = getMatchingDocs(monitor, monitorCtx, docExecutionContext, indexName) if (matchingDocs.isNotEmpty()) { - val matchedQueriesForDocs = getMatchedQueries(monitorCtx, matchingDocs.map { it.second }, monitor, indexName) + val matchedQueriesForDocs = getMatchedQueries( + monitorCtx, + matchingDocs.map { it.second }, + monitor, + monitorMetadata, + indexName + ) matchedQueriesForDocs.forEach { hit -> val id = hit.id.replace("_${indexName}_${monitor.id}", "") @@ -208,7 +211,10 @@ object DocumentLevelMonitorRunner : MonitorRunner() { // Don't update monitor if this is a test monitor if (!isTempMonitor) { - updateMonitorMetadata(monitorCtx.client!!, monitorCtx.settings!!, monitorMetadata.copy(lastRunContext = updatedLastRunContext)) + MonitorMetadataService.upsertMetadata( + monitorMetadata.copy(lastRunContext = updatedLastRunContext), + true + ) } // TODO: Update the Document as part of the Trigger and return back the trigger action result @@ -500,6 +506,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { monitorCtx: MonitorRunnerExecutionContext, docs: List, monitor: Monitor, + monitorMetadata: MonitorMetadata, index: String ): SearchHits { val boolQueryBuilder = BoolQueryBuilder().filter(QueryBuilders.matchQuery("index", index)) @@ -510,7 +517,15 @@ object DocumentLevelMonitorRunner : MonitorRunner() { } boolQueryBuilder.filter(percolateQueryBuilder) - val queryIndex = monitor.dataSources.queryIndex + val queryIndex = monitorMetadata.sourceToQueryIndexMapping[index + monitor.id] + if (queryIndex == null) { + val message = "Failed to resolve concrete queryIndex from sourceIndex during monitor execution!" + + " sourceIndex:$index queryIndex:${monitor.dataSources.queryIndex}" + logger.error(message) + throw AlertingException.wrap( + OpenSearchStatusException(message, RestStatus.INTERNAL_SERVER_ERROR) + ) + } val searchRequest = SearchRequest(queryIndex) val searchSourceBuilder = SearchSourceBuilder() searchSourceBuilder.query(boolQueryBuilder) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorMetadataService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorMetadataService.kt new file mode 100644 index 000000000..8634e336f --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorMetadataService.kt @@ -0,0 +1,245 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting + +import kotlinx.coroutines.CoroutineName +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.SupervisorJob +import org.apache.logging.log4j.LogManager +import org.opensearch.ExceptionsHelper +import org.opensearch.OpenSearchSecurityException +import org.opensearch.action.DocWriteRequest +import org.opensearch.action.DocWriteResponse +import org.opensearch.action.admin.indices.get.GetIndexRequest +import org.opensearch.action.admin.indices.get.GetIndexResponse +import org.opensearch.action.admin.indices.stats.IndicesStatsAction +import org.opensearch.action.admin.indices.stats.IndicesStatsRequest +import org.opensearch.action.admin.indices.stats.IndicesStatsResponse +import org.opensearch.action.get.GetRequest +import org.opensearch.action.get.GetResponse +import org.opensearch.action.index.IndexRequest +import org.opensearch.action.index.IndexResponse +import org.opensearch.action.support.WriteRequest +import org.opensearch.alerting.model.MonitorMetadata +import org.opensearch.alerting.opensearchapi.suspendUntil +import org.opensearch.alerting.settings.AlertingSettings +import org.opensearch.alerting.util.AlertingException +import org.opensearch.client.Client +import org.opensearch.cluster.service.ClusterService +import org.opensearch.common.settings.Settings +import org.opensearch.common.unit.TimeValue +import org.opensearch.common.xcontent.LoggingDeprecationHandler +import org.opensearch.common.xcontent.NamedXContentRegistry +import org.opensearch.common.xcontent.ToXContent +import org.opensearch.common.xcontent.XContentFactory +import org.opensearch.common.xcontent.XContentHelper +import org.opensearch.common.xcontent.XContentParser +import org.opensearch.common.xcontent.XContentParserUtils +import org.opensearch.common.xcontent.XContentType +import org.opensearch.commons.alerting.model.DocLevelMonitorInput +import org.opensearch.commons.alerting.model.Monitor +import org.opensearch.commons.alerting.model.ScheduledJob +import org.opensearch.index.seqno.SequenceNumbers +import org.opensearch.rest.RestStatus +import org.opensearch.transport.RemoteTransportException + +private val log = LogManager.getLogger(MonitorMetadataService::class.java) + +object MonitorMetadataService : + CoroutineScope by CoroutineScope(SupervisorJob() + Dispatchers.Default + CoroutineName("MonitorMetadataService")) { + + private lateinit var client: Client + private lateinit var xContentRegistry: NamedXContentRegistry + private lateinit var clusterService: ClusterService + private lateinit var settings: Settings + + @Volatile private lateinit var indexTimeout: TimeValue + + fun initialize( + client: Client, + clusterService: ClusterService, + xContentRegistry: NamedXContentRegistry, + settings: Settings + ) { + this.clusterService = clusterService + this.client = client + this.xContentRegistry = xContentRegistry + this.settings = settings + this.indexTimeout = AlertingSettings.INDEX_TIMEOUT.get(settings) + this.clusterService.clusterSettings.addSettingsUpdateConsumer(AlertingSettings.INDEX_TIMEOUT) { indexTimeout = it } + } + + @Suppress("ComplexMethod", "ReturnCount") + suspend fun upsertMetadata(metadata: MonitorMetadata, updating: Boolean): MonitorMetadata { + try { + val indexRequest = IndexRequest(ScheduledJob.SCHEDULED_JOBS_INDEX) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .source(metadata.toXContent(XContentFactory.jsonBuilder(), ToXContent.MapParams(mapOf("with_type" to "true")))) + .id(metadata.id) + .routing(metadata.monitorId) + .setIfSeqNo(metadata.seqNo) + .setIfPrimaryTerm(metadata.primaryTerm) + .timeout(indexTimeout) + + if (updating) { + indexRequest.id(metadata.id).setIfSeqNo(metadata.seqNo).setIfPrimaryTerm(metadata.primaryTerm) + } else { + indexRequest.opType(DocWriteRequest.OpType.CREATE) + } + val response: IndexResponse = client.suspendUntil { index(indexRequest, it) } + when (response.result) { + DocWriteResponse.Result.DELETED, DocWriteResponse.Result.NOOP, DocWriteResponse.Result.NOT_FOUND, null -> { + val failureReason = "The upsert metadata call failed with a ${response.result?.lowercase} result" + log.error(failureReason) + throw AlertingException(failureReason, RestStatus.INTERNAL_SERVER_ERROR, IllegalStateException(failureReason)) + } + DocWriteResponse.Result.CREATED, DocWriteResponse.Result.UPDATED -> { + log.debug("Successfully upserted MonitorMetadata:${metadata.id} ") + } + } + return metadata.copy( + seqNo = response.seqNo, + primaryTerm = response.primaryTerm + ) + } catch (e: Exception) { + throw AlertingException.wrap(e) + } + } + + suspend fun getOrCreateMetadata(monitor: Monitor, createWithRunContext: Boolean = true): Pair { + try { + val created = true + val metadata = getMetadata(monitor) + return if (metadata != null) { + metadata to !created + } else { + val newMetadata = createNewMetadata(monitor, createWithRunContext = createWithRunContext) + upsertMetadata(newMetadata, updating = false) to created + } + } catch (e: Exception) { + throw AlertingException.wrap(e) + } + } + + suspend fun getMetadata(monitor: Monitor): MonitorMetadata? { + try { + val metadataId = MonitorMetadata.getId(monitor) + val getRequest = GetRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, metadataId).routing(monitor.id) + + val getResponse: GetResponse = client.suspendUntil { get(getRequest, it) } + return if (getResponse.isExists) { + val xcp = XContentHelper.createParser( + xContentRegistry, LoggingDeprecationHandler.INSTANCE, + getResponse.sourceAsBytesRef, XContentType.JSON + ) + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp) + MonitorMetadata.parse(xcp) + } else { + null + } + } catch (e: Exception) { + if (e.message?.contains("no such index") == true) { + return null + } else { + throw AlertingException.wrap(e) + } + } + } + + suspend fun recreateRunContext(metadata: MonitorMetadata, monitor: Monitor): MonitorMetadata { + try { + val monitorIndex = if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) + (monitor.inputs[0] as DocLevelMonitorInput).indices[0] + else null + val runContext = if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) + createFullRunContext(monitorIndex, metadata.lastRunContext as MutableMap>) + else null + if (runContext != null) { + return metadata.copy( + lastRunContext = runContext + ) + } else { + return metadata + } + } catch (e: Exception) { + throw AlertingException.wrap(e) + } + } + + private suspend fun createNewMetadata(monitor: Monitor, createWithRunContext: Boolean): MonitorMetadata { + val monitorIndex = if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) + (monitor.inputs[0] as DocLevelMonitorInput).indices[0] + else null + val runContext = + if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR && createWithRunContext) + createFullRunContext(monitorIndex) + else emptyMap() + return MonitorMetadata( + id = "${monitor.id}-metadata", + seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO, + primaryTerm = SequenceNumbers.UNASSIGNED_PRIMARY_TERM, + monitorId = monitor.id, + lastActionExecutionTimes = emptyList(), + lastRunContext = runContext, + sourceToQueryIndexMapping = mutableMapOf() + ) + } + + private suspend fun createFullRunContext( + index: String?, + existingRunContext: MutableMap>? = null + ): MutableMap> { + val lastRunContext = existingRunContext?.toMutableMap() ?: mutableMapOf() + try { + if (index == null) return mutableMapOf() + val getIndexRequest = GetIndexRequest().indices(index) + val getIndexResponse: GetIndexResponse = client.suspendUntil { + client.admin().indices().getIndex(getIndexRequest, it) + } + val indices = getIndexResponse.indices() + + indices.forEach { indexName -> + if (!lastRunContext.containsKey(indexName)) { + lastRunContext[indexName] = createRunContextForIndex(index) + } + } + } catch (e: RemoteTransportException) { + val unwrappedException = ExceptionsHelper.unwrapCause(e) as Exception + throw AlertingException("Failed fetching index stats", RestStatus.INTERNAL_SERVER_ERROR, unwrappedException) + } catch (e: OpenSearchSecurityException) { + throw AlertingException( + "Failed fetching index stats - missing required index permissions: ${e.localizedMessage}", + RestStatus.INTERNAL_SERVER_ERROR, + e + ) + } catch (e: Exception) { + throw AlertingException("Failed fetching index stats", RestStatus.INTERNAL_SERVER_ERROR, e) + } + return lastRunContext + } + + suspend fun createRunContextForIndex(index: String, createdRecently: Boolean = false): MutableMap { + val request = IndicesStatsRequest().indices(index).clear() + val response: IndicesStatsResponse = client.suspendUntil { execute(IndicesStatsAction.INSTANCE, request, it) } + if (response.status != RestStatus.OK) { + val errorMessage = "Failed fetching index stats for index:$index" + throw AlertingException(errorMessage, RestStatus.INTERNAL_SERVER_ERROR, IllegalStateException(errorMessage)) + } + val shards = response.shards.filter { it.shardRouting.primary() && it.shardRouting.active() } + val lastRunContext = HashMap() + lastRunContext["index"] = index + val count = shards.size + lastRunContext["shards_count"] = count + + for (shard in shards) { + lastRunContext[shard.shardRouting.id.toString()] = + if (createdRecently) -1L + else shard.seqNoStats?.globalCheckpoint ?: SequenceNumbers.UNASSIGNED_SEQ_NO + } + return lastRunContext + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt index d4fbb262b..af021bfa2 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt @@ -10,7 +10,6 @@ import org.opensearch.alerting.action.GetDestinationsAction import org.opensearch.alerting.action.GetDestinationsRequest import org.opensearch.alerting.action.GetDestinationsResponse import org.opensearch.alerting.model.ActionRunResult -import org.opensearch.alerting.model.MonitorMetadata import org.opensearch.alerting.model.MonitorRunResult import org.opensearch.alerting.model.destination.Destination import org.opensearch.alerting.opensearchapi.InjectorContextElement @@ -180,8 +179,4 @@ abstract class MonitorRunner { return NotificationActionConfigs(destination, channel) } - - protected fun createMonitorMetadata(monitorId: String): MonitorMetadata { - return MonitorMetadata("$monitorId-metadata", monitorId, emptyList(), emptyMap()) - } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/AlertingConfigAccessor.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/AlertingConfigAccessor.kt index 7a96d2a44..0064d0d0d 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/model/AlertingConfigAccessor.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/AlertingConfigAccessor.kt @@ -17,11 +17,8 @@ import org.opensearch.common.bytes.BytesReference import org.opensearch.common.xcontent.LoggingDeprecationHandler import org.opensearch.common.xcontent.NamedXContentRegistry import org.opensearch.common.xcontent.XContentHelper -import org.opensearch.common.xcontent.XContentParser -import org.opensearch.common.xcontent.XContentParserUtils import org.opensearch.common.xcontent.XContentType import org.opensearch.commons.alerting.model.ScheduledJob -import org.opensearch.index.IndexNotFoundException /** * This is an accessor class to retrieve documents/information from the Alerting config index. @@ -29,28 +26,6 @@ import org.opensearch.index.IndexNotFoundException class AlertingConfigAccessor { companion object { - suspend fun getMonitorMetadata(client: Client, xContentRegistry: NamedXContentRegistry, metadataId: String): MonitorMetadata? { - return try { - val jobSource = getAlertingConfigDocumentSource(client, "Monitor Metadata", metadataId) - withContext(Dispatchers.IO) { - val xcp = XContentHelper.createParser( - xContentRegistry, LoggingDeprecationHandler.INSTANCE, - jobSource, XContentType.JSON - ) - XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp) - MonitorMetadata.parse(xcp) - } - } catch (e: IllegalStateException) { - if (e.message?.equals("Monitor Metadata document with id $metadataId not found or source is empty") == true) { - return null - } else throw e - } catch (e: IndexNotFoundException) { - if (e.message?.equals("no such index [.opendistro-alerting-config]") == true) { - return null - } else throw e - } - } - suspend fun getEmailAccountInfo(client: Client, xContentRegistry: NamedXContentRegistry, emailAccountId: String): EmailAccount { val source = getAlertingConfigDocumentSource(client, "Email account", emailAccountId) return withContext(Dispatchers.IO) { diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/MonitorMetadata.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/MonitorMetadata.kt index 2007139f1..41fd1362b 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/model/MonitorMetadata.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/MonitorMetadata.kt @@ -5,6 +5,7 @@ package org.opensearch.alerting.model +import org.opensearch.alerting.model.destination.Destination.Companion.NO_ID import org.opensearch.common.io.stream.StreamInput import org.opensearch.common.io.stream.StreamOutput import org.opensearch.common.io.stream.Writeable @@ -14,29 +15,40 @@ import org.opensearch.common.xcontent.XContentParser import org.opensearch.common.xcontent.XContentParserUtils import org.opensearch.commons.alerting.model.Monitor import org.opensearch.commons.alerting.util.instant +import org.opensearch.index.seqno.SequenceNumbers import java.io.IOException import java.time.Instant data class MonitorMetadata( val id: String, + val seqNo: Long = SequenceNumbers.UNASSIGNED_SEQ_NO, + val primaryTerm: Long = SequenceNumbers.UNASSIGNED_PRIMARY_TERM, val monitorId: String, val lastActionExecutionTimes: List, - val lastRunContext: Map + val lastRunContext: Map, + // Maps (sourceIndex + monitorId) --> concreteQueryIndex + val sourceToQueryIndexMapping: MutableMap = mutableMapOf() ) : Writeable, ToXContent { @Throws(IOException::class) constructor(sin: StreamInput) : this( id = sin.readString(), + seqNo = sin.readLong(), + primaryTerm = sin.readLong(), monitorId = sin.readString(), lastActionExecutionTimes = sin.readList(ActionExecutionTime::readFrom), - lastRunContext = Monitor.suppressWarning(sin.readMap()) + lastRunContext = Monitor.suppressWarning(sin.readMap()), + sourceToQueryIndexMapping = sin.readMap() as MutableMap ) override fun writeTo(out: StreamOutput) { out.writeString(id) + out.writeLong(seqNo) + out.writeLong(primaryTerm) out.writeString(monitorId) out.writeCollection(lastActionExecutionTimes) out.writeMap(lastRunContext) + out.writeMap(sourceToQueryIndexMapping as MutableMap) } override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { @@ -45,6 +57,9 @@ data class MonitorMetadata( builder.field(MONITOR_ID_FIELD, monitorId) .field(LAST_ACTION_EXECUTION_FIELD, lastActionExecutionTimes.toTypedArray()) if (lastRunContext.isNotEmpty()) builder.field(LAST_RUN_CONTEXT_FIELD, lastRunContext) + if (sourceToQueryIndexMapping.isNotEmpty()) { + builder.field(SOURCE_TO_QUERY_INDEX_MAP_FIELD, sourceToQueryIndexMapping as MutableMap) + } if (params.paramAsBoolean("with_type", false)) builder.endObject() return builder.endObject() } @@ -54,13 +69,20 @@ data class MonitorMetadata( const val MONITOR_ID_FIELD = "monitor_id" const val LAST_ACTION_EXECUTION_FIELD = "last_action_execution_times" const val LAST_RUN_CONTEXT_FIELD = "last_run_context" + const val SOURCE_TO_QUERY_INDEX_MAP_FIELD = "source_to_query_index_mapping" @JvmStatic @JvmOverloads @Throws(IOException::class) - fun parse(xcp: XContentParser): MonitorMetadata { + fun parse( + xcp: XContentParser, + id: String = NO_ID, + seqNo: Long = SequenceNumbers.UNASSIGNED_SEQ_NO, + primaryTerm: Long = SequenceNumbers.UNASSIGNED_PRIMARY_TERM + ): MonitorMetadata { lateinit var monitorId: String val lastActionExecutionTimes = mutableListOf() var lastRunContext: Map = mapOf() + var sourceToQueryIndexMapping: MutableMap = mutableMapOf() XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { @@ -76,14 +98,18 @@ data class MonitorMetadata( } } LAST_RUN_CONTEXT_FIELD -> lastRunContext = xcp.map() + SOURCE_TO_QUERY_INDEX_MAP_FIELD -> sourceToQueryIndexMapping = xcp.map() as MutableMap } } return MonitorMetadata( - "$monitorId-metadata", + if (id != NO_ID) id else "$monitorId-metadata", + seqNo = seqNo, + primaryTerm = primaryTerm, monitorId = monitorId, lastActionExecutionTimes = lastActionExecutionTimes, - lastRunContext = lastRunContext + lastRunContext = lastRunContext, + sourceToQueryIndexMapping = sourceToQueryIndexMapping ) } @@ -92,6 +118,10 @@ data class MonitorMetadata( fun readFrom(sin: StreamInput): MonitorMetadata { return MonitorMetadata(sin) } + + fun getId(monitor: Monitor): String { + return monitor.id + "-metadata" + } } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteMonitorAction.kt index ab57a0d45..13f6e8147 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteMonitorAction.kt @@ -13,12 +13,20 @@ import org.apache.logging.log4j.LogManager import org.opensearch.OpenSearchStatusException import org.opensearch.action.ActionListener import org.opensearch.action.ActionRequest +import org.opensearch.action.admin.indices.delete.DeleteIndexRequest +import org.opensearch.action.admin.indices.exists.indices.IndicesExistsRequest +import org.opensearch.action.admin.indices.exists.indices.IndicesExistsResponse import org.opensearch.action.delete.DeleteRequest import org.opensearch.action.delete.DeleteResponse import org.opensearch.action.get.GetRequest import org.opensearch.action.get.GetResponse +import org.opensearch.action.search.SearchRequest +import org.opensearch.action.search.SearchResponse import org.opensearch.action.support.ActionFilters import org.opensearch.action.support.HandledTransportAction +import org.opensearch.action.support.IndicesOptions +import org.opensearch.action.support.master.AcknowledgedResponse +import org.opensearch.alerting.MonitorMetadataService import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.alerting.settings.AlertingSettings import org.opensearch.alerting.util.AlertingException @@ -42,6 +50,7 @@ import org.opensearch.index.reindex.BulkByScrollResponse import org.opensearch.index.reindex.DeleteByQueryAction import org.opensearch.index.reindex.DeleteByQueryRequestBuilder import org.opensearch.rest.RestStatus +import org.opensearch.search.builder.SearchSourceBuilder import org.opensearch.tasks.Task import org.opensearch.transport.TransportService import kotlin.coroutines.resume @@ -101,8 +110,8 @@ class TransportDeleteMonitorAction @Inject constructor( if (canDelete) { val deleteResponse = deleteMonitor(monitor) + deleteDocLevelMonitorQueriesAndIndices(monitor) deleteMetadata(monitor) - deleteDocLevelMonitorQueries(monitor) actionListener.onResponse(DeleteMonitorResponse(deleteResponse.id, deleteResponse.version)) } else { actionListener.onFailure( @@ -141,22 +150,58 @@ class TransportDeleteMonitorAction @Inject constructor( val deleteResponse: DeleteResponse = client.suspendUntil { delete(deleteRequest, it) } } - private suspend fun deleteDocLevelMonitorQueries(monitor: Monitor) { + private suspend fun deleteDocLevelMonitorQueriesAndIndices(monitor: Monitor) { val clusterState = clusterService.state() - if (!clusterState.routingTable.hasIndex(monitor.dataSources.queryIndex)) { - return - } - val response: BulkByScrollResponse = suspendCoroutine { cont -> - DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE) - .source(monitor.dataSources.queryIndex) - .filter(QueryBuilders.matchQuery("monitor_id", monitorId)) - .refresh(true) - .execute( - object : ActionListener { - override fun onResponse(response: BulkByScrollResponse) = cont.resume(response) - override fun onFailure(t: Exception) = cont.resumeWithException(t) - } + val metadata = MonitorMetadataService.getMetadata(monitor) + metadata?.sourceToQueryIndexMapping?.forEach { (_, queryIndex) -> + + val indicesExistsResponse: IndicesExistsResponse = + client.suspendUntil { + client.admin().indices().exists(IndicesExistsRequest(queryIndex), it) + } + if (indicesExistsResponse.isExists == false) { + return + } + // Check if there's any queries from other monitors in this queryIndex, + // to avoid unnecessary doc deletion, if we could just delete index completely + val searchResponse: SearchResponse = client.suspendUntil { + search( + SearchRequest(queryIndex).source( + SearchSourceBuilder() + .size(0) + .query( + QueryBuilders.boolQuery().mustNot( + QueryBuilders.matchQuery("monitor_id", monitorId) + ) + ) + ).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN), + it ) + } + if (searchResponse.hits.totalHits.value == 0L) { + val ack: AcknowledgedResponse = client.suspendUntil { + client.admin().indices().delete( + DeleteIndexRequest(queryIndex).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN), it + ) + } + if (ack.isAcknowledged == false) { + log.error("Deletion of concrete queryIndex:$queryIndex is not ack'd!") + } + } else { + // Delete all queries added by this monitor + val response: BulkByScrollResponse = suspendCoroutine { cont -> + DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE) + .source(queryIndex) + .filter(QueryBuilders.matchQuery("monitor_id", monitorId)) + .refresh(true) + .execute( + object : ActionListener { + override fun onResponse(response: BulkByScrollResponse) = cont.resume(response) + override fun onFailure(t: Exception) = cont.resumeWithException(t) + } + ) + } + } } } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteMonitorAction.kt index dd804e980..e25891a6d 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteMonitorAction.kt @@ -17,6 +17,7 @@ import org.opensearch.action.get.GetResponse import org.opensearch.action.support.ActionFilters import org.opensearch.action.support.HandledTransportAction import org.opensearch.action.support.WriteRequest +import org.opensearch.alerting.MonitorMetadataService import org.opensearch.alerting.MonitorRunnerService import org.opensearch.alerting.action.ExecuteMonitorAction import org.opensearch.alerting.action.ExecuteMonitorRequest @@ -125,13 +126,15 @@ class TransportExecuteMonitorAction @Inject constructor( if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) { try { scope.launch { - if (!docLevelMonitorQueries.docLevelQueryIndexExists()) { + if (!docLevelMonitorQueries.docLevelQueryIndexExists(monitor.dataSources)) { docLevelMonitorQueries.initDocLevelQueryIndex(monitor.dataSources) log.info("Central Percolation index ${ScheduledJob.DOC_LEVEL_QUERIES_INDEX} created") } + val (metadata, _) = MonitorMetadataService.getOrCreateMetadata(monitor) docLevelMonitorQueries.indexDocLevelQueries( monitor, monitor.id, + metadata, WriteRequest.RefreshPolicy.IMMEDIATE, indexTimeout ) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt index 6c943cef8..5d20b51e4 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt @@ -19,8 +19,6 @@ import org.opensearch.action.admin.cluster.health.ClusterHealthAction import org.opensearch.action.admin.cluster.health.ClusterHealthRequest import org.opensearch.action.admin.cluster.health.ClusterHealthResponse import org.opensearch.action.admin.indices.create.CreateIndexResponse -import org.opensearch.action.admin.indices.get.GetIndexRequest -import org.opensearch.action.admin.indices.get.GetIndexResponse import org.opensearch.action.get.GetRequest import org.opensearch.action.get.GetResponse import org.opensearch.action.index.IndexRequest @@ -31,9 +29,8 @@ import org.opensearch.action.support.ActionFilters import org.opensearch.action.support.HandledTransportAction import org.opensearch.action.support.WriteRequest.RefreshPolicy import org.opensearch.action.support.master.AcknowledgedResponse -import org.opensearch.alerting.DocumentLevelMonitorRunner +import org.opensearch.alerting.MonitorMetadataService import org.opensearch.alerting.core.ScheduledJobIndices -import org.opensearch.alerting.model.AlertingConfigAccessor.Companion.getMonitorMetadata import org.opensearch.alerting.model.MonitorMetadata import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.alerting.settings.AlertingSettings @@ -463,8 +460,6 @@ class TransportIndexMonitorAction @Inject constructor( } private suspend fun indexMonitor() { - var metadata = createMetadata() - if (user != null) { // Use the backend roles which is an intersection of the requested backend roles and the user's backend roles. // Admins can pass in any backend role. Also if no backend role is passed in, all the user's backend roles are used. @@ -495,22 +490,16 @@ class TransportIndexMonitorAction @Inject constructor( ) return } - metadata = metadata.copy(monitorId = indexResponse.id, id = "${indexResponse.id}-metadata") - - // In case the metadata fails to be created, the monitor runner should have logic to recreate and index the metadata. - // This is currently being handled in DocumentLevelMonitor as its the only current monitor to use metadata currently. - // This should be enhanced by having a utility class to handle the logic of management and creation of the metadata. - // Issue to track this: https://github.com/opensearch-project/alerting/issues/445 - val metadataIndexRequest = IndexRequest(SCHEDULED_JOBS_INDEX) - .setRefreshPolicy(request.refreshPolicy) - .source(metadata.toXContent(jsonBuilder(), ToXContent.MapParams(mapOf("with_type" to "true")))) - .id(metadata.id) - .timeout(indexTimeout) - client.suspendUntil { client.index(metadataIndexRequest, it) } - + request.monitor = request.monitor.copy(id = indexResponse.id) + var (metadata, created) = MonitorMetadataService.getOrCreateMetadata(request.monitor) + if (created == false) { + log.warn("Metadata doc id:${metadata.id} exists, but it shouldn't!") + } if (request.monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) { - indexDocLevelMonitorQueries(request.monitor, indexResponse.id, request.refreshPolicy) + indexDocLevelMonitorQueries(request.monitor, indexResponse.id, metadata, request.refreshPolicy) } + // When inserting queries in queryIndex we could update sourceToQueryIndexMapping + MonitorMetadataService.upsertMetadata(metadata, updating = true) actionListener.onResponse( IndexMonitorResponse( @@ -524,7 +513,12 @@ class TransportIndexMonitorAction @Inject constructor( } @Suppress("UNCHECKED_CAST") - private suspend fun indexDocLevelMonitorQueries(monitor: Monitor, monitorId: String, refreshPolicy: RefreshPolicy) { + private suspend fun indexDocLevelMonitorQueries( + monitor: Monitor, + monitorId: String, + monitorMetadata: MonitorMetadata, + refreshPolicy: RefreshPolicy + ) { val queryIndex = monitor.dataSources.queryIndex if (!docLevelMonitorQueries.docLevelQueryIndexExists(monitor.dataSources)) { docLevelMonitorQueries.initDocLevelQueryIndex(monitor.dataSources) @@ -533,6 +527,7 @@ class TransportIndexMonitorAction @Inject constructor( docLevelMonitorQueries.indexDocLevelQueries( monitor, monitorId, + monitorMetadata, refreshPolicy, indexTimeout ) @@ -626,40 +621,20 @@ class TransportIndexMonitorAction @Inject constructor( ) return } - - val metadata = getMonitorMetadata(client, xContentRegistry, "${request.monitor.id}-metadata") - - if (metadata == null) { - val newMetadata = createMetadata() - val indexMetadataRequest = IndexRequest(SCHEDULED_JOBS_INDEX) - .setRefreshPolicy(request.refreshPolicy) - .source(newMetadata.toXContent(jsonBuilder(), ToXContent.MapParams(mapOf("with_type" to "true")))) - .id(newMetadata.id) - .timeout(indexTimeout) - client.suspendUntil { client.index(indexMetadataRequest, it) } - } else if (currentMonitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) { - val monitorIndex = (request.monitor.inputs[0] as DocLevelMonitorInput).indices[0] - val runContext = createFullRunContext( - monitorIndex, - metadata.lastRunContext as MutableMap> - ) - val updatedMetadata = metadata.copy(lastRunContext = runContext) - val indexMetadataRequest = IndexRequest(SCHEDULED_JOBS_INDEX) - .setRefreshPolicy(request.refreshPolicy) - .source(updatedMetadata.toXContent(jsonBuilder(), ToXContent.MapParams(mapOf("with_type" to "true")))) - .id(metadata.id) - .timeout(indexTimeout) - client.suspendUntil { client.index(indexMetadataRequest, it) } - } - - if (currentMonitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) { + var updatedMetadata: MonitorMetadata + val (metadata, created) = MonitorMetadataService.getOrCreateMetadata(request.monitor) + // Recreate runContext if metadata exists + // Delete and insert all queries from/to queryIndex + if (created == false && currentMonitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) { + updatedMetadata = MonitorMetadataService.recreateRunContext(metadata, currentMonitor) client.suspendUntil { DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE) .source(currentMonitor.dataSources.queryIndex) .filter(QueryBuilders.matchQuery("monitor_id", currentMonitor.id)) .execute(it) } - indexDocLevelMonitorQueries(request.monitor, currentMonitor.id, request.refreshPolicy) + indexDocLevelMonitorQueries(request.monitor, currentMonitor.id, updatedMetadata, request.refreshPolicy) + MonitorMetadataService.upsertMetadata(updatedMetadata, updating = true) } actionListener.onResponse( IndexMonitorResponse( @@ -672,34 +647,6 @@ class TransportIndexMonitorAction @Inject constructor( } } - private suspend fun createFullRunContext( - index: String?, - existingRunContext: MutableMap>? = null - ): MutableMap> { - if (index == null) return mutableMapOf() - val getIndexRequest = GetIndexRequest().indices(index) - val getIndexResponse: GetIndexResponse = client.suspendUntil { - client.admin().indices().getIndex(getIndexRequest, it) - } - val indices = getIndexResponse.indices() - val lastRunContext = existingRunContext?.toMutableMap() ?: mutableMapOf>() - indices.forEach { indexName -> - if (!lastRunContext.containsKey(indexName)) - lastRunContext[indexName] = DocumentLevelMonitorRunner.createRunContext(clusterService, client, indexName) - } - return lastRunContext - } - - private suspend fun createMetadata(): MonitorMetadata { - val monitorIndex = if (request.monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) - (request.monitor.inputs[0] as DocLevelMonitorInput).indices[0] - else null - val runContext = if (request.monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) createFullRunContext(monitorIndex) - else emptyMap() - return MonitorMetadata("${request.monitorId}-metadata", request.monitorId, emptyList(), runContext) - } - private fun checkShardsFailure(response: IndexResponse): String? { val failureReasons = StringBuilder() if (response.shardInfo.failed > 0) { diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/AlertingUtils.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/AlertingUtils.kt index 086c1302c..a7c8fe81c 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/AlertingUtils.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/AlertingUtils.kt @@ -6,22 +6,11 @@ package org.opensearch.alerting.util import org.apache.logging.log4j.LogManager -import org.opensearch.action.index.IndexRequest -import org.opensearch.action.index.IndexResponse -import org.opensearch.action.support.WriteRequest import org.opensearch.alerting.model.BucketLevelTriggerRunResult -import org.opensearch.alerting.model.MonitorMetadata import org.opensearch.alerting.model.destination.Destination -import org.opensearch.alerting.opensearchapi.suspendUntil -import org.opensearch.alerting.settings.AlertingSettings import org.opensearch.alerting.settings.DestinationSettings -import org.opensearch.client.Client -import org.opensearch.common.settings.Settings -import org.opensearch.common.xcontent.ToXContent -import org.opensearch.common.xcontent.XContentFactory import org.opensearch.commons.alerting.model.AggregationResultBucket import org.opensearch.commons.alerting.model.Monitor -import org.opensearch.commons.alerting.model.ScheduledJob import org.opensearch.commons.alerting.model.action.Action import org.opensearch.commons.alerting.model.action.ActionExecutionPolicy import org.opensearch.commons.alerting.model.action.ActionExecutionScope @@ -122,13 +111,3 @@ fun defaultToPerExecutionAction( return false } - -suspend fun updateMonitorMetadata(client: Client, settings: Settings, monitorMetadata: MonitorMetadata): IndexResponse { - val indexRequest = IndexRequest(ScheduledJob.SCHEDULED_JOBS_INDEX) - .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) - .source(monitorMetadata.toXContent(XContentFactory.jsonBuilder(), ToXContent.MapParams(mapOf("with_type" to "true")))) - .id(monitorMetadata.id) - .timeout(AlertingSettings.INDEX_TIMEOUT.get(settings)) - - return client.suspendUntil { client.index(indexRequest, it) } -} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt index 3b036a382..4a892b194 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt @@ -6,17 +6,23 @@ package org.opensearch.alerting.util import org.apache.logging.log4j.LogManager +import org.opensearch.OpenSearchStatusException import org.opensearch.ResourceAlreadyExistsException +import org.opensearch.action.admin.indices.alias.Alias import org.opensearch.action.admin.indices.create.CreateIndexRequest import org.opensearch.action.admin.indices.create.CreateIndexResponse +import org.opensearch.action.admin.indices.delete.DeleteIndexRequest import org.opensearch.action.admin.indices.get.GetIndexRequest import org.opensearch.action.admin.indices.get.GetIndexResponse import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest +import org.opensearch.action.admin.indices.rollover.RolloverRequest +import org.opensearch.action.admin.indices.rollover.RolloverResponse import org.opensearch.action.bulk.BulkRequest import org.opensearch.action.bulk.BulkResponse import org.opensearch.action.index.IndexRequest import org.opensearch.action.support.WriteRequest.RefreshPolicy import org.opensearch.action.support.master.AcknowledgedResponse +import org.opensearch.alerting.model.MonitorMetadata import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.client.Client import org.opensearch.cluster.service.ClusterService @@ -27,16 +33,17 @@ import org.opensearch.commons.alerting.model.DocLevelMonitorInput import org.opensearch.commons.alerting.model.DocLevelQuery import org.opensearch.commons.alerting.model.Monitor import org.opensearch.commons.alerting.model.ScheduledJob +import org.opensearch.rest.RestStatus private val log = LogManager.getLogger(DocLevelMonitorQueries::class.java) class DocLevelMonitorQueries(private val client: Client, private val clusterService: ClusterService) { companion object { - val PROPERTIES = "properties" - val NESTED = "nested" - val TYPE = "type" - + const val PROPERTIES = "properties" + const val NESTED = "nested" + const val TYPE = "type" + const val INDEX_PATTERN_SUFFIX = "-000001" @JvmStatic fun docLevelQueriesMappings(): String { return DocLevelMonitorQueries::class.java.classLoader.getResource("mappings/doc-level-queries.json").readText() @@ -45,8 +52,23 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ suspend fun initDocLevelQueryIndex(): Boolean { if (!docLevelQueryIndexExists()) { - val indexRequest = CreateIndexRequest(ScheduledJob.DOC_LEVEL_QUERIES_INDEX) + // Since we changed queryIndex to be alias now, for backwards compatibility, we have to delete index with same name + // as our alias, to avoid name clash. + if (clusterService.state().metadata.hasIndex(ScheduledJob.DOC_LEVEL_QUERIES_INDEX)) { + val acknowledgedResponse: AcknowledgedResponse = client.suspendUntil { + admin().indices().delete(DeleteIndexRequest(ScheduledJob.DOC_LEVEL_QUERIES_INDEX), it) + } + if (!acknowledgedResponse.isAcknowledged) { + val errorMessage = "Deletion of old queryIndex [${ScheduledJob.DOC_LEVEL_QUERIES_INDEX}] index is not acknowledged!" + log.error(errorMessage) + throw AlertingException.wrap(OpenSearchStatusException(errorMessage, RestStatus.INTERNAL_SERVER_ERROR)) + } + } + val alias = ScheduledJob.DOC_LEVEL_QUERIES_INDEX + val indexPattern = ScheduledJob.DOC_LEVEL_QUERIES_INDEX + INDEX_PATTERN_SUFFIX + val indexRequest = CreateIndexRequest(indexPattern) .mapping(docLevelQueriesMappings()) + .alias(Alias(alias)) .settings( Settings.builder().put("index.hidden", true) .build() @@ -68,10 +90,24 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ if (dataSources.queryIndex == ScheduledJob.DOC_LEVEL_QUERIES_INDEX) { return initDocLevelQueryIndex() } - val queryIndex = dataSources.queryIndex - if (!clusterService.state().routingTable.hasIndex(queryIndex)) { - val indexRequest = CreateIndexRequest(queryIndex) + // Since we changed queryIndex to be alias now, for backwards compatibility, we have to delete index with same name + // as our alias, to avoid name clash. + if (clusterService.state().metadata.hasIndex(dataSources.queryIndex)) { + val acknowledgedResponse: AcknowledgedResponse = client.suspendUntil { + admin().indices().delete(DeleteIndexRequest(dataSources.queryIndex), it) + } + if (!acknowledgedResponse.isAcknowledged) { + val errorMessage = "Deletion of old queryIndex [${dataSources.queryIndex}] index is not acknowledged!" + log.error(errorMessage) + throw AlertingException.wrap(OpenSearchStatusException(errorMessage, RestStatus.INTERNAL_SERVER_ERROR)) + } + } + val alias = dataSources.queryIndex + val indexPattern = dataSources.queryIndex + INDEX_PATTERN_SUFFIX + if (!clusterService.state().metadata.hasAlias(alias)) { + val indexRequest = CreateIndexRequest(indexPattern) .mapping(docLevelQueriesMappings()) + .alias(Alias(alias)) .settings( Settings.builder().put("index.hidden", true) .build() @@ -92,12 +128,12 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ fun docLevelQueryIndexExists(dataSources: DataSources): Boolean { val clusterState = clusterService.state() - return clusterState.routingTable.hasIndex(dataSources.queryIndex) + return clusterState.metadata.hasAlias(dataSources.queryIndex) } fun docLevelQueryIndexExists(): Boolean { val clusterState = clusterService.state() - return clusterState.routingTable.hasIndex(ScheduledJob.DOC_LEVEL_QUERIES_INDEX) + return clusterState.metadata.hasAlias(ScheduledJob.DOC_LEVEL_QUERIES_INDEX) } /** @@ -155,6 +191,7 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ suspend fun indexDocLevelQueries( monitor: Monitor, monitorId: String, + monitorMetadata: MonitorMetadata, refreshPolicy: RefreshPolicy = RefreshPolicy.IMMEDIATE, indexTimeout: TimeValue ) { @@ -202,43 +239,153 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ traverseMappingsAndUpdate(properties, "", leafNodeProcessor, flattenPaths) // Updated mappings ready to be applied on queryIndex val updatedProperties = properties + // Updates mappings of concrete queryIndex. This can rollover queryIndex if field mapping limit is reached. + var (updateMappingResponse, concreteQueryIndex) = updateQueryIndexMappings( + monitor, + monitorMetadata, + indexName, + updatedProperties + ) - val queryIndex = monitor.dataSources.queryIndex + if (updateMappingResponse.isAcknowledged) { + doIndexAllQueries(concreteQueryIndex, indexName, monitorId, queries, flattenPaths, refreshPolicy, indexTimeout) + } + } + } + } + } - val updateMappingRequest = PutMappingRequest(queryIndex) + private suspend fun doIndexAllQueries( + concreteQueryIndex: String, + sourceIndex: String, + monitorId: String, + queries: List, + flattenPaths: MutableList, + refreshPolicy: RefreshPolicy, + indexTimeout: TimeValue + ) { + val indexRequests = mutableListOf() + queries.forEach { + var query = it.query + flattenPaths.forEach { fieldPath -> + query = query.replace("$fieldPath:", "${fieldPath}_${sourceIndex}_$monitorId:") + } + val indexRequest = IndexRequest(concreteQueryIndex) + .id(it.id + "_${sourceIndex}_$monitorId") + .source( + mapOf( + "query" to mapOf("query_string" to mapOf("query" to query)), + "monitor_id" to monitorId, + "index" to sourceIndex + ) + ) + indexRequests.add(indexRequest) + } + if (indexRequests.isNotEmpty()) { + val bulkResponse: BulkResponse = client.suspendUntil { + client.bulk( + BulkRequest().setRefreshPolicy(refreshPolicy).timeout(indexTimeout).add(indexRequests), it + ) + } + bulkResponse.forEach { bulkItemResponse -> + if (bulkItemResponse.isFailed) { + log.debug(bulkItemResponse.failureMessage) + } + } + } + } + + private suspend fun updateQueryIndexMappings( + monitor: Monitor, + monitorMetadata: MonitorMetadata, + sourceIndex: String, + updatedProperties: MutableMap + ): Pair { + var targetQueryIndex = monitorMetadata.sourceToQueryIndexMapping[sourceIndex + monitor.id] + if (targetQueryIndex == null) { + // queryIndex is alias which will always have only 1 backing index which is writeIndex + // This is due to a fact that that _rollover API would maintain only single index under alias + // if you don't add is_write_index setting when creating index initially + targetQueryIndex = getWriteIndexNameForAlias(monitor.dataSources.queryIndex) + if (targetQueryIndex == null) { + val message = "Failed to get write index for queryIndex alias:${monitor.dataSources.queryIndex}" + log.error(message) + throw AlertingException.wrap( + OpenSearchStatusException(message, RestStatus.INTERNAL_SERVER_ERROR) + ) + } + monitorMetadata.sourceToQueryIndexMapping[sourceIndex + monitor.id] = targetQueryIndex + } + val updateMappingRequest = PutMappingRequest(targetQueryIndex) + updateMappingRequest.source(mapOf("properties" to updatedProperties)) + var updateMappingResponse = AcknowledgedResponse(false) + try { + updateMappingResponse = client.suspendUntil { + client.admin().indices().putMapping(updateMappingRequest, it) + } + return Pair(updateMappingResponse, targetQueryIndex) + } catch (e: Exception) { + // If we reached limit for total number of fields in mappings, do a rollover here + if (e.message?.contains("Limit of total fields") == true) { + targetQueryIndex = rolloverQueryIndex(monitor) + try { + // PUT mappings to newly created index + val updateMappingRequest = PutMappingRequest(targetQueryIndex) updateMappingRequest.source(mapOf("properties" to updatedProperties)) - val updateMappingResponse: AcknowledgedResponse = client.suspendUntil { + updateMappingResponse = client.suspendUntil { client.admin().indices().putMapping(updateMappingRequest, it) } - - if (updateMappingResponse.isAcknowledged) { - val indexRequests = mutableListOf() - queries.forEach { - var query = it.query - flattenPaths.forEach { fieldPath -> - query = query.replace("$fieldPath:", "${fieldPath}_${indexName}_$monitorId:") - } - val indexRequest = IndexRequest(queryIndex) - .id(it.id + "_${indexName}_$monitorId") - .source( - mapOf( - "query" to mapOf("query_string" to mapOf("query" to query)), - "monitor_id" to monitorId, - "index" to indexName - ) - ) - indexRequests.add(indexRequest) - } - if (indexRequests.isNotEmpty()) { - val bulkResponse: BulkResponse = client.suspendUntil { - client.bulk( - BulkRequest().setRefreshPolicy(refreshPolicy).timeout(indexTimeout).add(indexRequests), it - ) - } - } + } catch (e: Exception) { + // If we reached limit for total number of fields in mappings after rollover + // it means that source index has more then (FIELD_LIMIT - 3) fields (every query index has 3 fields defined) + // TODO maybe split queries/mappings between multiple query indices? + if (e.message?.contains("Limit of total fields") == true) { + val errorMessage = + "Monitor [${monitorMetadata.monitorId}] can't process index [$sourceIndex] due to field mapping limit" + log.error(errorMessage) + throw AlertingException(errorMessage, RestStatus.INTERNAL_SERVER_ERROR, e) + } else { + throw AlertingException.wrap(e) } } + } else { + throw AlertingException.wrap(e) } } + // We did rollover, so try to apply mappings again on new targetQueryIndex + if (targetQueryIndex.isNotEmpty()) { + // add newly created index to monitor's metadata object so that we can fetch it later on, when either applying mappings or running queries + monitorMetadata.sourceToQueryIndexMapping[sourceIndex + monitor.id] = targetQueryIndex + } else { + val failureMessage = "Failed to resolve targetQueryIndex!" + log.error(failureMessage) + throw AlertingException(failureMessage, RestStatus.INTERNAL_SERVER_ERROR, IllegalStateException(failureMessage)) + } + return Pair(updateMappingResponse, targetQueryIndex) + } + + private suspend fun rolloverQueryIndex(monitor: Monitor): String? { + val queryIndex = monitor.dataSources.queryIndex + val queryIndexPattern = monitor.dataSources.queryIndex + INDEX_PATTERN_SUFFIX + + val request = RolloverRequest(queryIndex, null) + request.createIndexRequest.index(queryIndexPattern) + .mapping(docLevelQueriesMappings()) + .settings(Settings.builder().put("index.hidden", true).build()) + val response: RolloverResponse = client.suspendUntil { + client.admin().indices().rolloverIndex(request, it) + } + if (response.isRolledOver == false) { + val message = "failed to rollover queryIndex:$queryIndex queryIndexPattern:$queryIndexPattern" + log.error(message) + throw AlertingException.wrap( + OpenSearchStatusException(message, RestStatus.INTERNAL_SERVER_ERROR) + ) + } + return response.newIndex + } + + private fun getWriteIndexNameForAlias(alias: String): String? { + return this.clusterService.state().metadata().indicesLookup?.get(alias)?.writeIndex?.index?.name } } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt index 952c7f1db..94c6fd7a4 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt @@ -74,7 +74,6 @@ import javax.management.MBeanServerInvocationHandler import javax.management.ObjectName import javax.management.remote.JMXConnectorFactory import javax.management.remote.JMXServiceURL -import kotlin.collections.HashMap /** * Superclass for tests that interact with an external test cluster using OpenSearch's RestClient @@ -686,7 +685,7 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { } protected fun refreshIndex(index: String): Response { - val response = client().makeRequest("POST", "/$index/_refresh") + val response = client().makeRequest("POST", "/$index/_refresh?expand_wildcards=all") assertEquals("Unable to refresh index", RestStatus.OK, response.restStatus()) return response } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt index a0bbbd074..38104c747 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt @@ -8,12 +8,16 @@ package org.opensearch.alerting import org.junit.Assert import org.opensearch.action.admin.cluster.state.ClusterStateRequest import org.opensearch.action.admin.indices.create.CreateIndexRequest +import org.opensearch.action.admin.indices.get.GetIndexRequest +import org.opensearch.action.admin.indices.get.GetIndexResponse +import org.opensearch.action.admin.indices.mapping.get.GetMappingsRequest import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest import org.opensearch.action.admin.indices.refresh.RefreshRequest import org.opensearch.action.search.SearchRequest import org.opensearch.action.support.WriteRequest import org.opensearch.alerting.action.SearchMonitorAction import org.opensearch.alerting.action.SearchMonitorRequest +import org.opensearch.alerting.alerts.AlertIndices import org.opensearch.alerting.core.ScheduledJobIndices import org.opensearch.alerting.transport.AlertingSingleNodeTestCase import org.opensearch.common.settings.Settings @@ -25,6 +29,8 @@ import org.opensearch.commons.alerting.model.Alert import org.opensearch.commons.alerting.model.DataSources import org.opensearch.commons.alerting.model.DocLevelMonitorInput import org.opensearch.commons.alerting.model.DocLevelQuery +import org.opensearch.commons.alerting.model.ScheduledJob +import org.opensearch.commons.alerting.model.ScheduledJob.Companion.DOC_LEVEL_QUERIES_INDEX import org.opensearch.commons.alerting.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX import org.opensearch.commons.alerting.model.Table import org.opensearch.index.query.MatchQueryBuilder @@ -290,16 +296,15 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name) Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1) searchAlerts(id) - val clusterStateResponse = client().admin().cluster().state(ClusterStateRequest().indices(customQueryIndex).metadata(true)).get() - val mapping = clusterStateResponse.state.metadata.index(customQueryIndex).mapping() - Assert.assertTrue(mapping?.source()?.string()?.contains("\"analyzer\":\"$analyzer\"") == true) + val mapping = client().admin().indices().getMappings(GetMappingsRequest().indices(customQueryIndex)).get() + Assert.assertTrue(mapping.toString().contains("\"analyzer\":\"$analyzer\"")) } fun `test delete monitor deletes all queries and metadata too`() { val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery)) val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) - val customQueryIndex = "custom_alerts_index" + val customQueryIndex = "custom_query_index" val analyzer = "whitespace" var monitor = randomDocumentLevelMonitor( inputs = listOf(docLevelInput), @@ -325,22 +330,17 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1) searchAlerts(monitorId) val clusterStateResponse = client().admin().cluster().state(ClusterStateRequest().indices(customQueryIndex).metadata(true)).get() - val mapping = clusterStateResponse.state.metadata.index(customQueryIndex).mapping() - Assert.assertTrue(mapping?.source()?.string()?.contains("\"analyzer\":\"$analyzer\"") == true) + val mapping = client().admin().indices().getMappings(GetMappingsRequest().indices(customQueryIndex)).get() + Assert.assertTrue(mapping.toString().contains("\"analyzer\":\"$analyzer\"") == true) // Verify queries exist var searchResponse = client().search( SearchRequest(customQueryIndex).source(SearchSourceBuilder().query(QueryBuilders.matchAllQuery())) ).get() assertNotEquals(0, searchResponse.hits.hits.size) - client().execute( - AlertingActions.DELETE_MONITOR_ACTION_TYPE, DeleteMonitorRequest(monitorId, WriteRequest.RefreshPolicy.IMMEDIATE) - ).get() - client().admin().indices().refresh(RefreshRequest(customQueryIndex)).get() - // Verify queries are deleted - searchResponse = client().search( - SearchRequest(customQueryIndex).source(SearchSourceBuilder().query(QueryBuilders.matchAllQuery())) - ).get() - assertEquals(0, searchResponse.hits.hits.size) + + deleteMonitor(monitorId) + assertIndexNotExists(customQueryIndex + "*") + assertAliasNotExists(customQueryIndex) } fun `test execute monitor with custom findings index and pattern`() { @@ -387,7 +387,7 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { Assert.assertTrue(indices.isNotEmpty()) } - fun `test execute pre-existing monitorand update`() { + fun `test execute pre-existing monitor and update`() { val request = CreateIndexRequest(SCHEDULED_JOBS_INDEX).mapping(ScheduledJobIndices.scheduledJobMappings()) .settings(Settings.builder().put("index.hidden", true).build()) client().admin().indices().create(request) @@ -465,7 +465,7 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { Assert.assertNotNull(executeMonitorResponse.monitorRunResult.monitorName) } val alerts = searchAlerts(monitorId) - assertEquals(alerts.size, 1) + assertEquals(1, alerts.size) val customAlertsIndex = "custom_alerts_index" val customQueryIndex = "custom_query_index" @@ -866,4 +866,285 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { Assert.assertTrue(getAlertsByWrongAlertIds != null) Assert.assertEquals(getAlertsByWrongAlertIds.alerts.size, 0) } + + fun `test queryIndex rollover and delete monitor success`() { + + val testSourceIndex = "test_source_index" + createIndex(testSourceIndex, Settings.EMPTY) + + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") + val docLevelInput = DocLevelMonitorInput("description", listOf(testSourceIndex), listOf(docQuery)) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + var monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger) + ) + // This doc should create close to 1000 (limit) fields in index mapping. It's easier to add mappings like this then via api + val docPayload: StringBuilder = StringBuilder(100000) + docPayload.append("{") + for (i in 1..330) { + docPayload.append(""" "id$i.somefield.somefield$i":$i,""") + } + docPayload.append("\"test_field\" : \"us-west-2\" }") + indexDoc(testSourceIndex, "1", docPayload.toString()) + // Create monitor #1 + var monitorResponse = createMonitor(monitor) + assertFalse(monitorResponse?.id.isNullOrEmpty()) + monitor = monitorResponse!!.monitor + // Execute monitor #1 + var executeMonitorResponse = executeMonitor(monitor, monitorResponse.id, false) + Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name) + Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1) + // Create monitor #2 + var monitorResponse2 = createMonitor(monitor) + assertFalse(monitorResponse2?.id.isNullOrEmpty()) + monitor = monitorResponse2!!.monitor + // Insert doc #2. This one should trigger creation of alerts during monitor exec + val testDoc = """{ + "test_field" : "us-west-2" + }""" + indexDoc(testSourceIndex, "2", testDoc) + // Execute monitor #2 + var executeMonitorResponse2 = executeMonitor(monitor, monitorResponse2.id, false) + Assert.assertEquals(executeMonitorResponse2!!.monitorRunResult.monitorName, monitor.name) + Assert.assertEquals(executeMonitorResponse2.monitorRunResult.triggerResults.size, 1) + + refreshIndex(AlertIndices.ALERT_INDEX) + var alerts = searchAlerts(monitorResponse2.id) + Assert.assertTrue(alerts != null) + Assert.assertTrue(alerts.size == 1) + + // Both monitors used same queryIndex alias. Since source index has close to limit amount of fields in mappings, + // we expect that creation of second monitor would trigger rollover of queryIndex + var getIndexResponse: GetIndexResponse = + client().admin().indices().getIndex(GetIndexRequest().indices(ScheduledJob.DOC_LEVEL_QUERIES_INDEX + "*")).get() + assertEquals(2, getIndexResponse.indices.size) + assertEquals(DOC_LEVEL_QUERIES_INDEX + "-000001", getIndexResponse.indices[0]) + assertEquals(DOC_LEVEL_QUERIES_INDEX + "-000002", getIndexResponse.indices[1]) + // Now we'll verify that execution of both monitors still works + indexDoc(testSourceIndex, "3", testDoc) + // Exec Monitor #1 + executeMonitorResponse = executeMonitor(monitor, monitorResponse.id, false) + Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name) + Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1) + refreshIndex(AlertIndices.ALERT_INDEX) + alerts = searchAlerts(monitorResponse.id) + Assert.assertTrue(alerts != null) + Assert.assertTrue(alerts.size == 2) + // Exec Monitor #2 + executeMonitorResponse = executeMonitor(monitor, monitorResponse2.id, false) + Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name) + Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1) + refreshIndex(AlertIndices.ALERT_INDEX) + alerts = searchAlerts(monitorResponse2.id) + Assert.assertTrue(alerts != null) + Assert.assertTrue(alerts.size == 2) + // Delete monitor #1 + client().execute( + AlertingActions.DELETE_MONITOR_ACTION_TYPE, DeleteMonitorRequest(monitorResponse.id, WriteRequest.RefreshPolicy.IMMEDIATE) + ).get() + // Expect first concrete queryIndex to be deleted since that one was only used by this monitor + getIndexResponse = + client().admin().indices().getIndex(GetIndexRequest().indices(ScheduledJob.DOC_LEVEL_QUERIES_INDEX + "*")).get() + assertEquals(1, getIndexResponse.indices.size) + assertEquals(ScheduledJob.DOC_LEVEL_QUERIES_INDEX + "-000002", getIndexResponse.indices[0]) + // Delete monitor #2 + client().execute( + AlertingActions.DELETE_MONITOR_ACTION_TYPE, DeleteMonitorRequest(monitorResponse2.id, WriteRequest.RefreshPolicy.IMMEDIATE) + ).get() + // Expect second concrete queryIndex to be deleted since that one was only used by this monitor + getIndexResponse = + client().admin().indices().getIndex(GetIndexRequest().indices(ScheduledJob.DOC_LEVEL_QUERIES_INDEX + "*")).get() + assertEquals(0, getIndexResponse.indices.size) + } + + fun `test queryIndex rollover failure source_index field count over limit`() { + + val testSourceIndex = "test_source_index" + createIndex(testSourceIndex, Settings.EMPTY) + + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") + val docLevelInput = DocLevelMonitorInput("description", listOf(testSourceIndex), listOf(docQuery)) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + var monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger) + ) + // This doc should create 999 fields in mapping, only 1 field less then limit + val docPayload: StringBuilder = StringBuilder(100000) + docPayload.append("{") + for (i in 1..998) { + docPayload.append(""" "id$i":$i,""") + } + docPayload.append("\"test_field\" : \"us-west-2\" }") + indexDoc(testSourceIndex, "1", docPayload.toString()) + // Create monitor and expect failure. + // queryIndex has 3 fields in mappings initially so 999 + 3 > 1000(default limit) + try { + createMonitor(monitor) + } catch (e: Exception) { + assertTrue(e.message?.contains("can't process index [$testSourceIndex] due to field mapping limit") ?: false) + } + } + + fun `test queryIndex not rolling over multiple monitors`() { + val testSourceIndex = "test_source_index" + createIndex(testSourceIndex, Settings.EMPTY) + + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") + val docLevelInput = DocLevelMonitorInput("description", listOf(testSourceIndex), listOf(docQuery)) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + var monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger) + ) + // Create doc with 11 fields + val docPayload: StringBuilder = StringBuilder(1000) + docPayload.append("{") + for (i in 1..10) { + docPayload.append(""" "id$i":$i,""") + } + docPayload.append("\"test_field\" : \"us-west-2\" }") + indexDoc(testSourceIndex, "1", docPayload.toString()) + // Create monitor #1 + var monitorResponse = createMonitor(monitor) + assertFalse(monitorResponse?.id.isNullOrEmpty()) + monitor = monitorResponse!!.monitor + // Execute monitor #1 + var executeMonitorResponse = executeMonitor(monitor, monitorResponse.id, false) + Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name) + Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1) + // Create monitor #2 + var monitorResponse2 = createMonitor(monitor) + assertFalse(monitorResponse2?.id.isNullOrEmpty()) + monitor = monitorResponse2!!.monitor + // Insert doc #2. This one should trigger creation of alerts during monitor exec + val testDoc = """{ + "test_field" : "us-west-2" + }""" + indexDoc(testSourceIndex, "2", testDoc) + // Execute monitor #2 + var executeMonitorResponse2 = executeMonitor(monitor, monitorResponse2.id, false) + Assert.assertEquals(executeMonitorResponse2!!.monitorRunResult.monitorName, monitor.name) + Assert.assertEquals(executeMonitorResponse2.monitorRunResult.triggerResults.size, 1) + + refreshIndex(AlertIndices.ALERT_INDEX) + var alerts = searchAlerts(monitorResponse2.id) + Assert.assertTrue(alerts != null) + Assert.assertTrue(alerts.size == 1) + + // Both monitors used same queryIndex. Since source index has well below limit amount of fields in mappings, + // we expect only 1 backing queryIndex + val getIndexResponse: GetIndexResponse = + client().admin().indices().getIndex(GetIndexRequest().indices(ScheduledJob.DOC_LEVEL_QUERIES_INDEX + "*")).get() + assertEquals(1, getIndexResponse.indices.size) + // Now we'll verify that execution of both monitors work + indexDoc(testSourceIndex, "3", testDoc) + // Exec Monitor #1 + executeMonitorResponse = executeMonitor(monitor, monitorResponse.id, false) + Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name) + Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1) + refreshIndex(AlertIndices.ALERT_INDEX) + alerts = searchAlerts(monitorResponse.id) + Assert.assertTrue(alerts != null) + Assert.assertTrue(alerts.size == 2) + // Exec Monitor #2 + executeMonitorResponse = executeMonitor(monitor, monitorResponse2.id, false) + Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name) + Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1) + refreshIndex(AlertIndices.ALERT_INDEX) + alerts = searchAlerts(monitorResponse2.id) + Assert.assertTrue(alerts != null) + Assert.assertTrue(alerts.size == 2) + } + + /** + * 1. Create monitor with input source_index with 900 fields in mappings - can fit 1 in queryIndex + * 2. Update monitor and change input source_index to a new one with 900 fields in mappings + * 3. Expect queryIndex rollover resulting in 2 backing indices + * 4. Delete monitor and expect that all backing indices are deleted + * */ + fun `test updating monitor no execution queryIndex rolling over`() { + val testSourceIndex1 = "test_source_index1" + val testSourceIndex2 = "test_source_index2" + createIndex(testSourceIndex1, Settings.EMPTY) + createIndex(testSourceIndex2, Settings.EMPTY) + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") + val docLevelInput = DocLevelMonitorInput("description", listOf(testSourceIndex1), listOf(docQuery)) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + var monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger) + ) + // This doc should create close to 1000 (limit) fields in index mapping. It's easier to add mappings like this then via api + val docPayload: StringBuilder = StringBuilder(100000) + docPayload.append("{") + for (i in 1..899) { + docPayload.append(""" "id$i":$i,""") + } + docPayload.append("\"test_field\" : \"us-west-2\" }") + // Indexing docs here as an easier means to set index mappings + indexDoc(testSourceIndex1, "1", docPayload.toString()) + indexDoc(testSourceIndex2, "1", docPayload.toString()) + // Create monitor + var monitorResponse = createMonitor(monitor) + assertFalse(monitorResponse?.id.isNullOrEmpty()) + monitor = monitorResponse!!.monitor + + // Update monitor and change input + val updatedMonitor = monitor.copy( + inputs = listOf( + DocLevelMonitorInput("description", listOf(testSourceIndex2), listOf(docQuery)) + ) + ) + updateMonitor(updatedMonitor, updatedMonitor.id) + assertFalse(monitorResponse?.id.isNullOrEmpty()) + + // Expect queryIndex to rollover after setting new source_index with close to limit amount of fields in mappings + var getIndexResponse: GetIndexResponse = + client().admin().indices().getIndex(GetIndexRequest().indices(ScheduledJob.DOC_LEVEL_QUERIES_INDEX + "*")).get() + assertEquals(2, getIndexResponse.indices.size) + + deleteMonitor(updatedMonitor.id) + waitUntil { + getIndexResponse = + client().admin().indices().getIndex(GetIndexRequest().indices(ScheduledJob.DOC_LEVEL_QUERIES_INDEX + "*")).get() + return@waitUntil getIndexResponse.indices.isEmpty() + } + assertEquals(0, getIndexResponse.indices.size) + } + + fun `test queryIndex bwc when index was not an alias`() { + createIndex(DOC_LEVEL_QUERIES_INDEX, Settings.builder().put("index.hidden", true).build()) + assertIndexExists(DOC_LEVEL_QUERIES_INDEX) + + val testSourceIndex = "test_source_index" + createIndex(testSourceIndex, Settings.EMPTY) + + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") + val docLevelInput = DocLevelMonitorInput("description", listOf(testSourceIndex), listOf(docQuery)) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + var monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger) + ) + // This doc should create 999 fields in mapping, only 1 field less then limit + val docPayload = "{\"test_field\" : \"us-west-2\" }" + // Create monitor + try { + var monitorResponse = createMonitor(monitor) + indexDoc(testSourceIndex, "1", docPayload) + var executeMonitorResponse = executeMonitor(monitor, monitorResponse!!.id, false) + Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name) + Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1) + refreshIndex(AlertIndices.ALERT_INDEX) + val alerts = searchAlerts(monitorResponse.id) + Assert.assertTrue(alerts != null) + Assert.assertTrue(alerts.size == 1) + // check if DOC_LEVEL_QUERIES_INDEX alias exists + assertAliasExists(DOC_LEVEL_QUERIES_INDEX) + } catch (e: Exception) { + fail("Exception happend but it shouldn't!") + } + } } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/alerts/AlertIndicesIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/alerts/AlertIndicesIT.kt index 55263b536..324009ec0 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/alerts/AlertIndicesIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/alerts/AlertIndicesIT.kt @@ -62,7 +62,7 @@ class AlertIndicesIT : AlertingRestTestCase() { executeMonitor(createRandomMonitor()) assertIndexExists(AlertIndices.ALERT_INDEX) assertIndexExists(AlertIndices.ALERT_HISTORY_WRITE_INDEX) - verifyIndexSchemaVersion(ScheduledJob.SCHEDULED_JOBS_INDEX, 5) + verifyIndexSchemaVersion(ScheduledJob.SCHEDULED_JOBS_INDEX, 6) verifyIndexSchemaVersion(AlertIndices.ALERT_INDEX, 4) verifyIndexSchemaVersion(AlertIndices.ALERT_HISTORY_WRITE_INDEX, 4) } @@ -86,7 +86,7 @@ class AlertIndicesIT : AlertingRestTestCase() { val trueMonitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docLevelInput), triggers = listOf(trigger))) executeMonitor(trueMonitor.id) assertIndexExists(AlertIndices.FINDING_HISTORY_WRITE_INDEX) - verifyIndexSchemaVersion(ScheduledJob.SCHEDULED_JOBS_INDEX, 5) + verifyIndexSchemaVersion(ScheduledJob.SCHEDULED_JOBS_INDEX, 6) verifyIndexSchemaVersion(AlertIndices.FINDING_HISTORY_WRITE_INDEX, 1) } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/model/XContentTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/model/XContentTests.kt index c2d49a8f3..19af610bc 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/model/XContentTests.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/model/XContentTests.kt @@ -80,7 +80,13 @@ class XContentTests : OpenSearchTestCase() { } fun `test MonitorMetadata`() { - val monitorMetadata = MonitorMetadata("monitorId-metadata", "monitorId", emptyList(), emptyMap()) + val monitorMetadata = MonitorMetadata( + id = "monitorId-metadata", + monitorId = "monitorId", + lastActionExecutionTimes = emptyList(), + lastRunContext = emptyMap(), + sourceToQueryIndexMapping = mutableMapOf() + ) val monitorMetadataString = monitorMetadata.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS).string() val parsedMonitorMetadata = MonitorMetadata.parse(parser(monitorMetadataString)) assertEquals("Round tripping MonitorMetadata doesn't work", monitorMetadata, parsedMonitorMetadata) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/MonitorRestApiIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/MonitorRestApiIT.kt index 32451e9e5..fbe0e8b0f 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/MonitorRestApiIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/MonitorRestApiIT.kt @@ -61,6 +61,7 @@ import org.opensearch.test.rest.OpenSearchRestTestCase import java.time.Instant import java.time.ZoneId import java.time.temporal.ChronoUnit +import java.util.concurrent.TimeUnit @TestLogging("level:DEBUG", reason = "Debug for tests.") @Suppress("UNCHECKED_CAST") @@ -866,15 +867,27 @@ class MonitorRestApiIT : AlertingRestTestCase() { updatedMonitor.toHttpEntity() ) assertEquals("Update request not successful", RestStatus.OK, updateResponse.restStatus()) - - // Wait 5 seconds for event to be processed and alerts moved - Thread.sleep(5000) - + // Wait until postIndex hook is executed due to monitor update + waitUntil({ + val alerts = searchAlerts(monitor) + if (alerts.size == 1) { + return@waitUntil true + } + return@waitUntil false + }, 60, TimeUnit.SECONDS) val alerts = searchAlerts(monitor) // We have two alerts from above, 1 for each trigger, there should be only 1 left in active index assertEquals("One alert should be in active index", 1, alerts.size) assertEquals("Wrong alert in active index", alertKeep.toJsonString(), alerts.single().toJsonString()) + waitUntil({ + val alerts = searchAlerts(monitor, AlertIndices.ALERT_HISTORY_WRITE_INDEX) + if (alerts.size == 1) { + return@waitUntil true + } + return@waitUntil false + }, 60, TimeUnit.SECONDS) + val historyAlerts = searchAlerts(monitor, AlertIndices.ALERT_HISTORY_WRITE_INDEX) // Only alertDelete should of been moved to history index assertEquals("One alert should be in history index", 1, historyAlerts.size) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/transport/AlertingSingleNodeTestCase.kt b/alerting/src/test/kotlin/org/opensearch/alerting/transport/AlertingSingleNodeTestCase.kt index 61e788a32..de8dc8319 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/transport/AlertingSingleNodeTestCase.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/transport/AlertingSingleNodeTestCase.kt @@ -6,10 +6,13 @@ package org.opensearch.alerting.transport import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope +import org.opensearch.action.admin.indices.alias.get.GetAliasesRequest +import org.opensearch.action.admin.indices.get.GetIndexRequest import org.opensearch.action.admin.indices.get.GetIndexRequestBuilder import org.opensearch.action.admin.indices.get.GetIndexResponse import org.opensearch.action.admin.indices.refresh.RefreshAction import org.opensearch.action.admin.indices.refresh.RefreshRequest +import org.opensearch.action.support.IndicesOptions import org.opensearch.action.support.WriteRequest import org.opensearch.alerting.AlertingPlugin import org.opensearch.alerting.action.ExecuteMonitorAction @@ -23,6 +26,7 @@ import org.opensearch.common.unit.TimeValue import org.opensearch.common.xcontent.XContentType import org.opensearch.common.xcontent.json.JsonXContent import org.opensearch.commons.alerting.action.AlertingActions +import org.opensearch.commons.alerting.action.DeleteMonitorRequest import org.opensearch.commons.alerting.action.GetFindingsRequest import org.opensearch.commons.alerting.action.GetFindingsResponse import org.opensearch.commons.alerting.action.IndexMonitorRequest @@ -40,7 +44,7 @@ import org.opensearch.search.builder.SearchSourceBuilder import org.opensearch.search.fetch.subphase.FetchSourceContext import org.opensearch.test.OpenSearchSingleNodeTestCase import java.time.Instant -import java.util.* +import java.util.Locale /** * A test that keep a singleton node started for all tests that can be used to get @@ -88,6 +92,45 @@ abstract class AlertingSingleNodeTestCase : OpenSearchSingleNodeTestCase() { .setSource(doc, XContentType.JSON).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get() } + protected fun assertIndexExists(index: String) { + val getIndexResponse = + client().admin().indices().getIndex( + GetIndexRequest().indices(index).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN) + ).get() + assertTrue(getIndexResponse.indices.size > 0) + } + + protected fun assertIndexNotExists(index: String) { + val getIndexResponse = + client().admin().indices().getIndex( + GetIndexRequest().indices(index).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN) + ).get() + assertFalse(getIndexResponse.indices.size > 0) + } + + protected fun assertAliasNotExists(alias: String) { + val aliasesResponse = client().admin().indices().getAliases(GetAliasesRequest()).get() + val foundAlias = aliasesResponse.aliases.values().forEach { + it.value.forEach { + if (it.alias == alias) { + fail("alias exists, but it shouldn't") + } + } + } + } + + protected fun assertAliasExists(alias: String) { + val aliasesResponse = client().admin().indices().getAliases(GetAliasesRequest()).get() + val foundAlias = aliasesResponse.aliases.values().forEach { + it.value.forEach { + if (it.alias == alias) { + return + } + } + } + fail("alias doesn't exists, but it should") + } + protected fun createMonitor(monitor: Monitor): IndexMonitorResponse? { val request = IndexMonitorRequest( monitorId = Monitor.NO_ID, @@ -112,6 +155,13 @@ abstract class AlertingSingleNodeTestCase : OpenSearchSingleNodeTestCase() { return client().execute(AlertingActions.INDEX_MONITOR_ACTION_TYPE, request).actionGet() } + protected fun deleteMonitor(monitorId: String): Boolean { + client().execute( + AlertingActions.DELETE_MONITOR_ACTION_TYPE, DeleteMonitorRequest(monitorId, WriteRequest.RefreshPolicy.IMMEDIATE) + ).get() + return true + } + protected fun searchAlerts(id: String, indices: String = AlertIndices.ALERT_INDEX, refresh: Boolean = true): List { try { if (refresh) refreshIndex(indices) diff --git a/core/src/main/resources/mappings/scheduled-jobs.json b/core/src/main/resources/mappings/scheduled-jobs.json index 5ee414c0e..4f8d71d82 100644 --- a/core/src/main/resources/mappings/scheduled-jobs.json +++ b/core/src/main/resources/mappings/scheduled-jobs.json @@ -1,6 +1,6 @@ { "_meta" : { - "schema_version": 5 + "schema_version": 6 }, "properties": { "monitor": { @@ -512,6 +512,10 @@ "last_run_context": { "type": "object", "enabled": false + }, + "source_to_query_index_mapping": { + "type": "object", + "enabled": false } } }