From a9715f2db9527699ee40d33eeacb6881ca0d27ee Mon Sep 17 00:00:00 2001
From: Subhobrata Dey
Date: Sun, 18 Feb 2024 10:30:25 +0000
Subject: [PATCH] distributed job-scheduler

---
 .../alerting/DocumentLevelMonitorRunner.kt    | 465 ++++++++++--------
 .../alerting/model/MonitorRunResult.kt        |   4 +-
 .../alerting/service/DeleteMonitorService.kt  |  25 +
 .../TransportExecuteMonitorAction.kt          | 142 +++++-
 .../transport/TransportIndexMonitorAction.kt  | 112 ++---
 .../alerting/alerts/finding_mapping.json      |   3 +
 .../alerting/AlertingRestTestCase.kt          |   2 +-
 .../alerting/DocumentMonitorRunnerIT.kt       |  26 +-
 .../org/opensearch/alerting/TestHelpers.kt    |   3 +-
 .../resources/mappings/scheduled-jobs.json    |   4 +-
 10 files changed, 503 insertions(+), 283 deletions(-)

diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt
index d36587579..276d8a6e8 100644
--- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt
+++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt
@@ -6,6 +6,7 @@
 package org.opensearch.alerting
 
 import org.apache.logging.log4j.LogManager
+import org.opensearch.ExceptionsHelper
 import org.opensearch.OpenSearchStatusException
 import org.opensearch.action.DocWriteRequest
 import org.opensearch.action.admin.indices.refresh.RefreshAction
@@ -18,12 +19,14 @@ import org.opensearch.action.search.SearchRequest
 import org.opensearch.action.search.SearchResponse
 import org.opensearch.alerting.model.DocumentExecutionContext
 import org.opensearch.alerting.model.DocumentLevelTriggerRunResult
+import org.opensearch.alerting.model.InputRunResults
 import org.opensearch.alerting.model.MonitorMetadata
 import org.opensearch.alerting.model.MonitorRunResult
 import org.opensearch.alerting.model.userErrorMessage
 import org.opensearch.alerting.opensearchapi.suspendUntil
 import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext
 import org.opensearch.alerting.util.AlertingException
+import org.opensearch.alerting.util.IndexUtils
 import org.opensearch.alerting.util.defaultToPerExecutionAction
 import org.opensearch.alerting.util.getActionExecutionPolicy
 import org.opensearch.alerting.workflow.WorkflowRunContext
@@ -52,6 +55,7 @@ import org.opensearch.core.common.bytes.BytesReference
 import org.opensearch.core.rest.RestStatus
 import org.opensearch.core.xcontent.ToXContent
 import org.opensearch.core.xcontent.XContentBuilder
+import org.opensearch.index.IndexNotFoundException
 import org.opensearch.index.query.BoolQueryBuilder
 import org.opensearch.index.query.Operator
 import org.opensearch.index.query.QueryBuilders
@@ -62,6 +66,7 @@ import org.opensearch.search.sort.SortOrder
 import java.io.IOException
 import java.time.Instant
 import java.util.UUID
+import kotlin.math.max
 
 object DocumentLevelMonitorRunner : MonitorRunner() {
     private val logger = LogManager.getLogger(javaClass)
@@ -75,262 +80,299 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
         workflowRunContext: WorkflowRunContext?,
         executionId: String
     ): MonitorRunResult<DocumentLevelTriggerRunResult> {
-        return MonitorRunResult("", Instant.now(), Instant.now())
-/*      logger.debug("Document-level-monitor is running ...")
-        val isTempMonitor = dryrun || monitor.id == Monitor.NO_ID
-        var monitorResult = MonitorRunResult<DocumentLevelTriggerRunResult>(monitor.name, periodStart, periodEnd)
+        if (monitor.isChild!!) {
+            logger.debug("Document-level-monitor is running ...")
+            val isTempMonitor = dryrun || monitor.id == Monitor.NO_ID
+            var monitorResult = MonitorRunResult<DocumentLevelTriggerRunResult>(monitor.name, periodStart, periodEnd)
+            val monitorShards = monitor.shards.toMutableList()
 
-        try {
-            monitorCtx.alertIndices!!.createOrUpdateAlertIndex(monitor.dataSources)
-            monitorCtx.alertIndices!!.createOrUpdateInitialAlertHistoryIndex(monitor.dataSources)
-            monitorCtx.alertIndices!!.createOrUpdateInitialFindingHistoryIndex(monitor.dataSources)
-        } catch (e: Exception) {
-            val id = if (monitor.id.trim().isEmpty()) "_na_" else monitor.id
-            logger.error("Error setting up alerts and findings indices for monitor: $id", e)
-            monitorResult = monitorResult.copy(error = AlertingException.wrap(e))
-        }
-
-        try {
-            validate(monitor)
-        } catch (e: Exception) {
-            logger.error("Failed to start Document-level-monitor. Error: ${e.message}")
-            monitorResult = monitorResult.copy(error = AlertingException.wrap(e))
-        }
-
-        var (monitorMetadata, _) = MonitorMetadataService.getOrCreateMetadata(
-            monitor = monitor,
-            createWithRunContext = false,
-            skipIndex = isTempMonitor,
-            workflowRunContext?.workflowMetadataId
-        )
-
-        val docLevelMonitorInput = monitor.inputs[0] as DocLevelMonitorInput
-
-        val queries: List<DocLevelQuery> = docLevelMonitorInput.queries
-
-        val lastRunContext = if (monitorMetadata.lastRunContext.isNullOrEmpty()) mutableMapOf()
-        else monitorMetadata.lastRunContext.toMutableMap() as MutableMap<String, MutableMap<String, Any>>
-
-        val updatedLastRunContext = lastRunContext.toMutableMap()
-
-        val queryToDocIds = mutableMapOf<DocLevelQuery, MutableSet<String>>()
-        val inputRunResults = mutableMapOf<String, MutableSet<String>>()
-        val docsToQueries = mutableMapOf<String, MutableList<String>>()
+            try {
+                monitorCtx.alertIndices!!.createOrUpdateAlertIndex(monitor.dataSources)
+                monitorCtx.alertIndices!!.createOrUpdateInitialAlertHistoryIndex(monitor.dataSources)
+                monitorCtx.alertIndices!!.createOrUpdateInitialFindingHistoryIndex(monitor.dataSources)
+            } catch (e: Exception) {
+                val id = if (monitor.id.trim().isEmpty()) "_na_" else monitor.id
+                logger.error("Error setting up alerts and findings indices for monitor: $id", e)
+                monitorResult = monitorResult.copy(error = AlertingException.wrap(e))
+            }
 
-        try {
-            // Resolve all passed indices to concrete indices
-            val allConcreteIndices = IndexUtils.resolveAllIndices(
-                docLevelMonitorInput.indices,
-                monitorCtx.clusterService!!,
-                monitorCtx.indexNameExpressionResolver!!
-            )
-            if (allConcreteIndices.isEmpty()) {
-                logger.error("indices not found-${docLevelMonitorInput.indices.joinToString(",")}")
-                throw IndexNotFoundException(docLevelMonitorInput.indices.joinToString(","))
+            try {
+                validate(monitor)
+            } catch (e: Exception) {
+                logger.error("Failed to start Document-level-monitor. Error: ${e.message}")
+                monitorResult = monitorResult.copy(error = AlertingException.wrap(e))
             }
 
-            monitorCtx.docLevelMonitorQueries!!.initDocLevelQueryIndex(monitor.dataSources)
-            monitorCtx.docLevelMonitorQueries!!.indexDocLevelQueries(
+            var (monitorMetadata, _) = MonitorMetadataService.getOrCreateMetadata(
                 monitor = monitor,
-                monitorId = monitor.id,
-                monitorMetadata,
-                indexTimeout = monitorCtx.indexTimeout!!
+                createWithRunContext = false,
+                skipIndex = isTempMonitor,
+                workflowRunContext?.workflowMetadataId
             )
 
-            // cleanup old indices that are not monitored anymore from the same monitor
-            val runContextKeys = updatedLastRunContext.keys.toMutableSet()
-            for (ind in runContextKeys) {
-                if (!allConcreteIndices.contains(ind)) {
-                    updatedLastRunContext.remove(ind)
-                }
-            }
+            val docLevelMonitorInput = monitor.inputs[0] as DocLevelMonitorInput
+
+            val queries: List<DocLevelQuery> = docLevelMonitorInput.queries
+
+            val lastRunContext = if (monitorMetadata.lastRunContext.isNullOrEmpty()) mutableMapOf()
+            else monitorMetadata.lastRunContext.toMutableMap() as MutableMap<String, MutableMap<String, Any>>
 
-            // Map of document ids per index when monitor is workflow delegate and has chained findings
-            val matchingDocIdsPerIndex = workflowRunContext?.matchingDocIdsPerIndex
+            val updatedLastRunContext = lastRunContext.toMutableMap()
 
-            docLevelMonitorInput.indices.forEach { indexName ->
-                var concreteIndices = IndexUtils.resolveAllIndices(
-                    listOf(indexName),
+            val queryToDocIds = mutableMapOf<DocLevelQuery, MutableSet<String>>()
+            val inputRunResults = mutableMapOf<String, MutableSet<String>>()
+            val docsToQueries = mutableMapOf<String, MutableList<String>>()
+
+            try {
+                // Resolve all passed indices to concrete indices
+                val allConcreteIndices = IndexUtils.resolveAllIndices(
+                    docLevelMonitorInput.indices,
                     monitorCtx.clusterService!!,
                     monitorCtx.indexNameExpressionResolver!!
                 )
-                var lastWriteIndex: String? = null
-                if (IndexUtils.isAlias(indexName, monitorCtx.clusterService!!.state()) ||
-                    IndexUtils.isDataStream(indexName, monitorCtx.clusterService!!.state())
-                ) {
-                    lastWriteIndex = concreteIndices.find { lastRunContext.containsKey(it) }
-                    if (lastWriteIndex != null) {
-                        val lastWriteIndexCreationDate =
-                            IndexUtils.getCreationDateForIndex(lastWriteIndex, monitorCtx.clusterService!!.state())
-                        concreteIndices = IndexUtils.getNewestIndicesByCreationDate(
-                            concreteIndices,
-                            monitorCtx.clusterService!!.state(),
-                            lastWriteIndexCreationDate
-                        )
-                    }
+                if (allConcreteIndices.isEmpty()) {
+                    logger.error("Indices not found: ${docLevelMonitorInput.indices.joinToString(",")}")
+                    throw IndexNotFoundException(docLevelMonitorInput.indices.joinToString(","))
                 }
-                val updatedIndexName = indexName.replace("*", "_")
-                val conflictingFields = monitorCtx.docLevelMonitorQueries!!.getAllConflictingFields(
-                    monitorCtx.clusterService!!.state(),
-                    concreteIndices
+
+                monitorCtx.docLevelMonitorQueries!!.initDocLevelQueryIndex(monitor.dataSources)
+                monitorCtx.docLevelMonitorQueries!!.indexDocLevelQueries(
+                    monitor = monitor,
+                    monitorId = monitor.id,
+                    monitorMetadata,
+                    indexTimeout = monitorCtx.indexTimeout!!
                 )
 
-                concreteIndices.forEach { concreteIndexName ->
-                    // Prepare lastRunContext for each index
-                    val indexLastRunContext = lastRunContext.getOrPut(concreteIndexName) {
-                        val isIndexCreatedRecently = createdRecently(
-                            monitor,
-                            periodStart,
-                            periodEnd,
-                            monitorCtx.clusterService!!.state().metadata.index(concreteIndexName)
-                        )
-                        MonitorMetadataService.createRunContextForIndex(concreteIndexName, isIndexCreatedRecently, monitor.shards)
+                // clean up old indices that are not monitored anymore by the same monitor
+                val runContextKeys = updatedLastRunContext.keys.toMutableSet()
+                for (ind in runContextKeys) {
+                    if (!allConcreteIndices.contains(ind)) {
+                        updatedLastRunContext.remove(ind)
                     }
+                }
 
-                    // Prepare updatedLastRunContext for each index
-                    val indexUpdatedRunContext = updateLastRunContext(
-                        indexLastRunContext.toMutableMap(),
-                        monitorCtx,
-                        concreteIndexName,
-                        monitor.shards
-                    ) as MutableMap
+                // Map of document ids per index when monitor is workflow delegate and has chained findings
+                val matchingDocIdsPerIndex = workflowRunContext?.matchingDocIdsPerIndex
+
+                docLevelMonitorInput.indices.forEach { indexName ->
+                    var concreteIndices = IndexUtils.resolveAllIndices(
+                        listOf(indexName),
+                        monitorCtx.clusterService!!,
+                        monitorCtx.indexNameExpressionResolver!!
+                    )
+                    var lastWriteIndex: String? = null
                     if (IndexUtils.isAlias(indexName, monitorCtx.clusterService!!.state()) ||
                         IndexUtils.isDataStream(indexName, monitorCtx.clusterService!!.state())
                     ) {
-                        if (concreteIndexName == IndexUtils.getWriteIndex(indexName, monitorCtx.clusterService!!.state())) {
-                            updatedLastRunContext.remove(lastWriteIndex)
-                            updatedLastRunContext[concreteIndexName] = indexUpdatedRunContext
+                        lastWriteIndex = concreteIndices.find { lastRunContext.containsKey(it) }
+                        if (lastWriteIndex != null) {
+                            val lastWriteIndexCreationDate =
+                                IndexUtils.getCreationDateForIndex(lastWriteIndex, monitorCtx.clusterService!!.state())
+                            concreteIndices = IndexUtils.getNewestIndicesByCreationDate(
+                                concreteIndices,
+                                monitorCtx.clusterService!!.state(),
+                                lastWriteIndexCreationDate
+                            )
+
+                            val tempMonitorShards = mutableListOf<String>()
+                            monitorShards.forEach {
+                                val resolvedMonitorShards = mutableListOf<String>()
+                                if (it.startsWith(indexName)) {
+                                    concreteIndices.forEach { concreteIndex ->
+                                        resolvedMonitorShards.add("$concreteIndex:${it.split(":")[1]}")
+                                    }
+                                }
+                                tempMonitorShards.addAll(resolvedMonitorShards)
+                            }
+                            monitorShards.clear()
+                            monitorShards.addAll(tempMonitorShards)
                         }
-                    } else {
-                        updatedLastRunContext[concreteIndexName] = indexUpdatedRunContext
                     }
+                    val updatedIndexName = indexName.replace("*", "_")
+                    val conflictingFields = monitorCtx.docLevelMonitorQueries!!.getAllConflictingFields(
+                        monitorCtx.clusterService!!.state(),
+                        concreteIndices
                     )
 
-                    for (shardInfo in monitor.shards) {
-                        val shard = shardInfo.split(":")[1]
-                        // update lastRunContext if its a temp monitor as we only want to view the last bit of data then
-                        // TODO: If dryrun, we should make it so we limit the search as this could still potentially give us lots of data
-                        if (isTempMonitor) {
-                            indexLastRunContext[shard] = max(-1, (indexUpdatedRunContext[shard] as String).toInt() - 10)
+                    concreteIndices.forEach { concreteIndexName ->
+                        // Prepare lastRunContext for each index
+                        val indexLastRunContext = lastRunContext.getOrPut(concreteIndexName) {
+                            val isIndexCreatedRecently = createdRecently(
+                                monitor,
+                                periodStart,
+                                periodEnd,
+                                monitorCtx.clusterService!!.state().metadata.index(concreteIndexName)
+                            )
+                            MonitorMetadataService.createRunContextForIndex(
+                                concreteIndexName,
+                                isIndexCreatedRecently,
+                                monitorShards
+                            )
                         }
 
-                    // Prepare DocumentExecutionContext for each index
-                    val docExecutionContext = DocumentExecutionContext(queries, indexLastRunContext, indexUpdatedRunContext)
-
-                    val matchingDocs = getMatchingDocs(
-                        monitor,
-                        monitorCtx,
-                        docExecutionContext,
-                        updatedIndexName,
-                        concreteIndexName,
-                        conflictingFields.toList(),
-                        matchingDocIdsPerIndex?.get(concreteIndexName)
-                    )
-                    logger.info("shards-${monitor.shards}")
-                    logger.info("no. of docs-${matchingDocs.map { it.first }}")
-
-                    if (matchingDocs.isNotEmpty()) {
-                        val matchedQueriesForDocs = getMatchedQueries(
+                        // Prepare updatedLastRunContext for each index
+                        val indexUpdatedRunContext = updateLastRunContext(
+                            indexLastRunContext.toMutableMap(),
                             monitorCtx,
-                            matchingDocs.map { it.second },
+                            concreteIndexName,
+                            monitorShards
+                        ) as MutableMap<String, Any>
+                        if (IndexUtils.isAlias(indexName, monitorCtx.clusterService!!.state()) ||
+                            IndexUtils.isDataStream(indexName, monitorCtx.clusterService!!.state())
+                        ) {
+                            if (concreteIndexName == IndexUtils.getWriteIndex(
+                                    indexName,
+                                    monitorCtx.clusterService!!.state()
+                                )
+                            ) {
+                                updatedLastRunContext.remove(lastWriteIndex)
+                                updatedLastRunContext[concreteIndexName] = indexUpdatedRunContext
+                            }
+                        } else {
+                            updatedLastRunContext[concreteIndexName] = indexUpdatedRunContext
+                        }
+
+                        for (shardInfo in monitorShards) {
+                            val shard = shardInfo.split(":")[1]
+                            // update lastRunContext if it's a temp monitor, as we only want to view the most recent data
+                            // TODO: If dryrun, we should make it so we limit the search as this could still potentially give us lots of data
+                            if (isTempMonitor) {
+                                indexLastRunContext[shard] =
+                                    max(-1, (indexUpdatedRunContext[shard] as String).toInt() - 10)
+                            }
+                        }
+
+                        // Prepare DocumentExecutionContext for each index
+                        val docExecutionContext =
+                            DocumentExecutionContext(queries, indexLastRunContext, indexUpdatedRunContext)
+
+                        val matchingDocs = getMatchingDocs(
                             monitor,
-                            monitorMetadata,
+                            monitorCtx,
+                            docExecutionContext,
                             updatedIndexName,
-                            concreteIndexName
+                            concreteIndexName,
+                            conflictingFields.toList(),
+                            monitorShards,
+                            matchingDocIdsPerIndex?.get(concreteIndexName)
                         )
-
-                        matchedQueriesForDocs.forEach { hit ->
-                            val id = hit.id
-                                .replace("_${updatedIndexName}_${monitor.id}", "")
-                                .replace("_${concreteIndexName}_${monitor.id}", "")
-
-                            val docIndices = hit.field("_percolator_document_slot").values.map { it.toString().toInt() }
-                            docIndices.forEach { idx ->
-                                val docIndex = "${matchingDocs[idx].first}|$concreteIndexName"
-                                inputRunResults.getOrPut(id) { mutableSetOf() }.add(docIndex)
-                                docsToQueries.getOrPut(docIndex) { mutableListOf() }.add(id)
+                        logger.info("Monitor shards: $monitorShards")
+                        logger.info("Matching doc ids: ${matchingDocs.map { it.first }}")
+
+                        if (matchingDocs.isNotEmpty()) {
+                            val matchedQueriesForDocs = getMatchedQueries(
+                                monitorCtx,
+                                matchingDocs.map { it.second },
+                                monitor,
+                                monitorMetadata,
+                                updatedIndexName,
+                                concreteIndexName
+                            )
+
+                            matchedQueriesForDocs.forEach { hit ->
+                                val id = hit.id
+                                    .replace("_${updatedIndexName}_${monitor.id}", "")
+                                    .replace("_${concreteIndexName}_${monitor.id}", "")
+
+                                val docIndices =
+                                    hit.field("_percolator_document_slot").values.map { it.toString().toInt() }
+                                docIndices.forEach { idx ->
+                                    val docIndex = "${matchingDocs[idx].first}|$concreteIndexName"
+                                    inputRunResults.getOrPut(id) { mutableSetOf() }.add(docIndex)
+                                    docsToQueries.getOrPut(docIndex) { mutableListOf() }.add(id)
+                                }
                             }
                         }
                     }
                 }
-            monitorResult = monitorResult.copy(inputResults = InputRunResults(listOf(inputRunResults)))
+                monitorResult = monitorResult.copy(inputResults = InputRunResults(listOf(inputRunResults)))
 
-            /*
+                /*
                 populate the map queryToDocIds with pairs of <DocLevelQuery object and matched docIds>
                 this fixes the issue of passing id, name, tags fields of DocLevelQuery object correctly to TriggerExpressionParser
                 */
-            queries.forEach {
-                if (inputRunResults.containsKey(it.id)) {
-                    queryToDocIds[it] = inputRunResults[it.id]!!
+                queries.forEach {
+                    if (inputRunResults.containsKey(it.id)) {
+                        queryToDocIds[it] = inputRunResults[it.id]!!
+                    }
                 }
-            }
 
-            val idQueryMap: Map<String, DocLevelQuery> = queries.associateBy { it.id }
+                val idQueryMap: Map<String, DocLevelQuery> = queries.associateBy { it.id }
 
-            val triggerResults = mutableMapOf<String, DocumentLevelTriggerRunResult>()
-            // If there are no triggers defined, we still want to generate findings
-            if (monitor.triggers.isEmpty()) {
-                if (dryrun == false && monitor.id != Monitor.NO_ID) {
-                    createFindings(monitor, monitorCtx, docsToQueries, idQueryMap, true)
-                }
-            } else {
-                monitor.triggers.forEach {
-                    triggerResults[it.id] = runForEachDocTrigger(
-                        monitorCtx,
-                        monitorResult,
-                        it as DocumentLevelTrigger,
-                        monitor,
-                        idQueryMap,
-                        docsToQueries,
-                        queryToDocIds,
-                        dryrun,
-                        executionId = executionId,
-                        workflowRunContext = workflowRunContext
-                    )
-                }
-            }
-            // Don't update monitor if this is a test monitor
-            if (!isTempMonitor) {
-                // If any error happened during trigger execution, upsert monitor error alert
-                val errorMessage = constructErrorMessageFromTriggerResults(triggerResults = triggerResults)
-                if (errorMessage.isNotEmpty()) {
-                    monitorCtx.alertService!!.upsertMonitorErrorAlert(
-                        monitor = monitor,
-                        errorMessage = errorMessage,
-                        executionId = executionId,
-                        workflowRunContext
-                    )
-                } else {
-                    onSuccessfulMonitorRun(monitorCtx, monitor)
+                val triggerResults = mutableMapOf<String, DocumentLevelTriggerRunResult>()
+                // If there are no triggers defined, we still want to generate findings
+                if (monitor.triggers.isEmpty()) {
+                    if (dryrun == false && monitor.id != Monitor.NO_ID) {
+                        createFindings(monitor, monitorCtx, docsToQueries, idQueryMap, true)
+                    }
+                } else {
+                    monitor.triggers.forEach {
+                        triggerResults[it.id] = runForEachDocTrigger(
+                            monitorCtx,
+                            monitorResult,
+                            it as DocumentLevelTrigger,
+                            monitor,
+                            idQueryMap,
+                            docsToQueries,
+                            queryToDocIds,
+                            dryrun,
+                            executionId = executionId,
+                            workflowRunContext = workflowRunContext
+                        )
+                    }
                 }
+                // Don't update monitor if this is a test monitor
+                if (!isTempMonitor) {
+                    // If any error happened during trigger execution, upsert monitor error alert
+                    val errorMessage = constructErrorMessageFromTriggerResults(triggerResults = triggerResults)
+                    if (errorMessage.isNotEmpty()) {
+                        monitorCtx.alertService!!.upsertMonitorErrorAlert(
+                            monitor = monitor,
+                            errorMessage = errorMessage,
+                            executionId = executionId,
+                            workflowRunContext
+                        )
+                    } else {
+                        onSuccessfulMonitorRun(monitorCtx, monitor)
+                    }
 
-                updatedLastRunContext.forEach {
-                    logger.info(it.key)
-                    it.value.forEach { it1 ->
-                        logger.info(it1.key + "-" + it1.value)
+                    updatedLastRunContext.forEach {
+                        logger.info(it.key)
+                        it.value.forEach { it1 ->
+                            logger.info(it1.key + "-" + it1.value)
+                        }
                     }
+                    MonitorMetadataService.upsertMetadata(
+                        monitorMetadata.copy(lastRunContext = updatedLastRunContext),
+                        true
+                    )
                 }
-                MonitorMetadataService.upsertMetadata(
-                    monitorMetadata.copy(lastRunContext = updatedLastRunContext),
-                    true
+
+                // TODO: Update the Document as part of the Trigger and return back the trigger action result
+                return monitorResult.copy(triggerResults = triggerResults)
+            } catch (e: Exception) {
+                val errorMessage = ExceptionsHelper.detailedMessage(e)
+                monitorCtx.alertService!!.upsertMonitorErrorAlert(
+                    monitor,
+                    errorMessage,
+                    executionId,
+                    workflowRunContext
+                )
+                logger.error("Failed running Document-level-monitor ${monitor.name}", e)
+                val alertingException = AlertingException(
+                    errorMessage,
+                    RestStatus.INTERNAL_SERVER_ERROR,
+                    e
+                )
+                return monitorResult.copy(
+                    error = alertingException,
+                    inputResults = InputRunResults(emptyList(), alertingException)
                 )
             }
-
-            // TODO: Update the Document as part of the Trigger and return back the trigger action result
-            return monitorResult.copy(triggerResults = triggerResults)
-        } catch (e: Exception) {
-            val errorMessage = ExceptionsHelper.detailedMessage(e)
-            monitorCtx.alertService!!.upsertMonitorErrorAlert(monitor, errorMessage, executionId, workflowRunContext)
-            logger.error("Failed running Document-level-monitor ${monitor.name}", e)
-            val alertingException = AlertingException(
-                errorMessage,
-                RestStatus.INTERNAL_SERVER_ERROR,
-                e
-            )
-            return monitorResult.copy(error = alertingException, inputResults = InputRunResults(emptyList(), alertingException))
-        }*/
+        } else {
+            return MonitorRunResult("", Instant.now(), Instant.now())
+        }
     }
 
     private suspend fun onSuccessfulMonitorRun(monitorCtx: MonitorRunnerExecutionContext, monitor: Monitor) {
@@ -374,7 +416,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
         workflowRunContext: WorkflowRunContext?,
         executionId: String
     ): DocumentLevelTriggerRunResult {
-        val triggerCtx = DocumentLevelTriggerExecutionContext(monitor, trigger)
+        val triggerCtx = DocumentLevelTriggerExecutionContext(monitor.copy(id = monitor.owner!!), trigger)
         val triggerResult = monitorCtx.triggerService!!.runDocLevelTrigger(monitor, trigger, queryToDocIds)
 
         val triggerFindingDocPairs = mutableListOf<Pair<String, String>>()
@@ -490,7 +532,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
             id = UUID.randomUUID().toString(),
             relatedDocIds = listOf(docIndex[0]),
             correlatedDocIds = listOf(docIndex[0]),
-            monitorId = monitor.id,
+            monitorId = monitor.owner!!,
             monitorName = monitor.name,
             index = docIndex[1],
             docLevelQueries = triggeredQueries,
@@ -652,10 +694,11 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
         index: String,
         concreteIndex: String,
         conflictingFields: List<String>,
+        monitorShards: List<String>,
         docIds: List<String>? = null
     ): List<Pair<String, BytesReference>> {
         val matchingDocs = mutableListOf<Pair<String, BytesReference>>()
-        for (shardInfo in monitor.shards) {
+        for (shardInfo in monitorShards) {
             try {
                 val shard = shardInfo.split(":")[1]
                 val maxSeqNo: Long = docExecutionCtx.updatedLastRunContext[shard].toString().toLong()
diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/MonitorRunResult.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/MonitorRunResult.kt
index 18a433848..527beaa6a 100644
--- a/alerting/src/main/kotlin/org/opensearch/alerting/model/MonitorRunResult.kt
+++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/MonitorRunResult.kt
@@ -23,7 +23,7 @@ data class MonitorRunResult<TriggerResult : TriggerRunResult>(
     val monitorName: String,
     val periodStart: Instant,
     val periodEnd: Instant,
-    val error: Exception? = null,
+    var error: Exception? = null,
     val inputResults: InputRunResults = InputRunResults(),
     val triggerResults: Map<String, TriggerResult> = mapOf()
 ) : Writeable, ToXContent {
@@ -53,7 +53,7 @@ data class MonitorRunResult<TriggerResult : TriggerRunResult>(
     /** Returns error information to store in the Alert. Currently it's just the stack trace but it can be more */
     fun alertError(): AlertError? {
         if (error != null) {
-            return AlertError(Instant.now(), "Failed running monitor:\n${error.userErrorMessage()}")
+            return AlertError(Instant.now(), "Failed running monitor:\n${error!!.userErrorMessage()}")
         }
 
         if (inputResults.error != null) {
diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/service/DeleteMonitorService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/service/DeleteMonitorService.kt
index 97d35e52e..f896d19ee 100644
--- a/alerting/src/main/kotlin/org/opensearch/alerting/service/DeleteMonitorService.kt
+++ b/alerting/src/main/kotlin/org/opensearch/alerting/service/DeleteMonitorService.kt
@@ -28,6 +28,8 @@ import org.opensearch.alerting.util.ScheduledJobUtils.Companion.WORKFLOW_DELEGATE_PATH
 import org.opensearch.alerting.util.ScheduledJobUtils.Companion.WORKFLOW_MONITOR_PATH
 import org.opensearch.alerting.util.use
 import org.opensearch.client.Client
+import org.opensearch.commons.alerting.action.AlertingActions
+import org.opensearch.commons.alerting.action.DeleteMonitorRequest
 import org.opensearch.commons.alerting.action.DeleteMonitorResponse
 import org.opensearch.commons.alerting.model.Monitor
 import org.opensearch.commons.alerting.model.ScheduledJob
@@ -65,6 +67,7 @@ object DeleteMonitorService :
         val deleteResponse = deleteMonitor(monitor.id, refreshPolicy)
         deleteDocLevelMonitorQueriesAndIndices(monitor)
         deleteMetadata(monitor)
+        deleteChildDocLevelMonitors(monitor, refreshPolicy)
         return DeleteMonitorResponse(deleteResponse.id, deleteResponse.version)
     }
 
@@ -148,6 +151,28 @@ object DeleteMonitorService :
         }
     }
 
+    private suspend fun deleteChildDocLevelMonitors(monitor: Monitor, refreshPolicy: RefreshPolicy) {
+        if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) {
+            val request: SearchRequest = SearchRequest()
+                .indices(ScheduledJob.SCHEDULED_JOBS_INDEX)
+                .source(
+                    SearchSourceBuilder()
+                        .query(QueryBuilders.matchQuery("monitor.owner", monitor.id))
+                        .size(10000)
+                )
+            val response = client.suspendUntil { client.search(request, it) }
+            response.hits.forEach { childMonitor ->
+                val deleteMonitorResponse = client.suspendUntil {
+                    client.execute(
+                        AlertingActions.DELETE_MONITOR_ACTION_TYPE,
+                        DeleteMonitorRequest(childMonitor.id, refreshPolicy),
+                        it
+                    )
+                }
+            }
+        }
+    }
+
     /**
      * Checks if the monitor is part of the workflow
     *
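Note: the runner above keys all of its bookkeeping on "indexName:shardId" strings (monitor.shards) plus a per-index run context mapping shardId to the last processed sequence number. The standalone Kotlin sketch below models that encoding under stated assumptions; ShardPointer and parseShardInfo are illustrative names that do not exist in the plugin.

    // Minimal model of the shard encoding used by DocumentLevelMonitorRunner.
    data class ShardPointer(val index: String, val shardId: String)

    // Mirrors the shardInfo.split(":")[1] accesses in the runner.
    fun parseShardInfo(shardInfo: String): ShardPointer {
        val (index, shard) = shardInfo.split(":", limit = 2)
        return ShardPointer(index, shard)
    }

    fun main() {
        val monitorShards = listOf("logs-0:0", "logs-0:1", "logs-1:0")
        // Run context per index: shardId -> highest seq_no already processed (-1 = nothing yet).
        val lastRunContext = mutableMapOf<String, MutableMap<String, Any>>()
        monitorShards.forEach { shardInfo ->
            val ptr = parseShardInfo(shardInfo)
            lastRunContext.getOrPut(ptr.index) { mutableMapOf() }.putIfAbsent(ptr.shardId, -1)
        }
        println(lastRunContext) // {logs-0={0=-1, 1=-1}, logs-1={0=-1}}
    }
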
diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteMonitorAction.kt
index b0de10ff0..b9ecd8468 100644
--- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteMonitorAction.kt
+++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteMonitorAction.kt
@@ -11,8 +11,13 @@ import kotlinx.coroutines.launch
 import kotlinx.coroutines.withContext
 import org.apache.logging.log4j.LogManager
 import org.opensearch.OpenSearchStatusException
+import org.opensearch.action.admin.indices.stats.IndicesStatsAction
+import org.opensearch.action.admin.indices.stats.IndicesStatsRequest
+import org.opensearch.action.admin.indices.stats.IndicesStatsResponse
 import org.opensearch.action.get.GetRequest
 import org.opensearch.action.get.GetResponse
+import org.opensearch.action.search.SearchRequest
+import org.opensearch.action.search.SearchResponse
 import org.opensearch.action.support.ActionFilters
 import org.opensearch.action.support.HandledTransportAction
 import org.opensearch.action.support.WriteRequest
@@ -21,6 +26,11 @@ import org.opensearch.alerting.MonitorRunnerService
 import org.opensearch.alerting.action.ExecuteMonitorAction
 import org.opensearch.alerting.action.ExecuteMonitorRequest
 import org.opensearch.alerting.action.ExecuteMonitorResponse
+import org.opensearch.alerting.model.DocumentLevelTriggerRunResult
+import org.opensearch.alerting.model.InputRunResults
+import org.opensearch.alerting.model.MonitorRunResult
+import org.opensearch.alerting.model.TriggerRunResult
+import org.opensearch.alerting.opensearchapi.suspendUntil
 import org.opensearch.alerting.settings.AlertingSettings
 import org.opensearch.alerting.util.AlertingException
 import org.opensearch.alerting.util.DocLevelMonitorQueries
@@ -33,12 +43,15 @@ import org.opensearch.common.xcontent.LoggingDeprecationHandler
 import org.opensearch.common.xcontent.XContentHelper
 import org.opensearch.common.xcontent.XContentType
 import org.opensearch.commons.ConfigConstants
+import org.opensearch.commons.alerting.model.DocLevelMonitorInput
 import org.opensearch.commons.alerting.model.Monitor
 import org.opensearch.commons.alerting.model.ScheduledJob
 import org.opensearch.commons.authuser.User
 import org.opensearch.core.action.ActionListener
 import org.opensearch.core.rest.RestStatus
 import org.opensearch.core.xcontent.NamedXContentRegistry
+import org.opensearch.index.query.QueryBuilders
+import org.opensearch.search.builder.SearchSourceBuilder
 import org.opensearch.tasks.Task
 import org.opensearch.transport.TransportService
 import java.time.Instant
@@ -91,6 +104,70 @@ class TransportExecuteMonitorAction @Inject constructor(
                 }
             }
         }
+        val executeMonitors = fun(monitors: List<Monitor>) {
+            // Launch the coroutine with the client's threadContext. This is needed to preserve authentication information
+            // stored on the threadContext set by the security plugin when using the Alerting plugin with the Security plugin.
+            // runner.launch(ElasticThreadContextElement(client.threadPool().threadContext)) {
+            runner.launch {
+                val (periodStart, periodEnd) =
+                    monitors[0].schedule.getPeriodEndingAt(Instant.ofEpochMilli(execMonitorRequest.requestEnd.millis))
+                val inputRunResults = mutableMapOf<String, MutableSet<String>>()
+                val triggerRunResults = mutableMapOf<String, DocumentLevelTriggerRunResult>()
+                val monitorRunResult = MonitorRunResult<TriggerRunResult>(monitors[0].name, periodStart, periodEnd)
+                monitors.forEach { monitor ->
+                    log.info(
+                        "Executing monitor from API - id: ${monitor.id}, type: ${monitor.monitorType.name}, " +
+                            "periodStart: $periodStart, periodEnd: $periodEnd, dryrun: ${execMonitorRequest.dryrun}"
+                    )
+                    val childMonitorRunResult = runner.runJob(monitor, periodStart, periodEnd, execMonitorRequest.dryrun)
+                    if (childMonitorRunResult.error != null) {
+                        monitorRunResult.error = childMonitorRunResult.error
+                    } else {
+                        val childInputRunResults = childMonitorRunResult.inputResults.results[0]
+                        childInputRunResults.forEach {
+                            if (inputRunResults.containsKey(it.key)) {
+                                val existingResults = inputRunResults[it.key]
+                                existingResults!!.addAll(it.value as Set<String>)
+                                inputRunResults[it.key] = existingResults
+                            } else {
+                                inputRunResults[it.key] = it.value as MutableSet<String>
+                            }
+                        }
+                        childMonitorRunResult.triggerResults.forEach {
+                            if (triggerRunResults.containsKey(it.key)) {
+                                val newDocs = mutableListOf<String>()
+                                val existingResults = triggerRunResults[it.key]
+
+                                newDocs.addAll(existingResults!!.triggeredDocs)
+                                newDocs.addAll((it.value as DocumentLevelTriggerRunResult).triggeredDocs)
+
+                                triggerRunResults[it.key] = existingResults.copy(triggeredDocs = newDocs)
+                            } else {
+                                triggerRunResults[it.key] = it.value as DocumentLevelTriggerRunResult
+                            }
+                        }
+                    }
+                }
+
+                try {
+                    withContext(Dispatchers.IO) {
+                        actionListener.onResponse(
+                            ExecuteMonitorResponse(
+                                monitorRunResult.copy(
+                                    inputResults = InputRunResults(listOf(inputRunResults)),
+                                    triggerResults = triggerRunResults
+                                )
+                            )
+                        )
+                    }
+                } catch (e: Exception) {
+                    log.error("Unexpected error running monitor", e)
+                    withContext(Dispatchers.IO) {
+                        actionListener.onFailure(AlertingException.wrap(e))
+                    }
+                }
+            }
+        }
 
         if (execMonitorRequest.monitorId != null) {
             val getRequest = GetRequest(ScheduledJob.SCHEDULED_JOBS_INDEX).id(execMonitorRequest.monitorId)
@@ -112,7 +189,41 @@ class TransportExecuteMonitorAction @Inject constructor(
                             response.sourceAsBytesRef, XContentType.JSON
                         ).use { xcp ->
                             val monitor = ScheduledJob.parse(xcp, response.id, response.version) as Monitor
-                            executeMonitor(monitor)
+                            if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) {
+                                val request: SearchRequest = SearchRequest()
+                                    .indices(ScheduledJob.SCHEDULED_JOBS_INDEX)
+                                    .source(
+                                        SearchSourceBuilder()
+                                            .query(QueryBuilders.matchQuery("monitor.owner", monitor.id))
+                                            .size(10000)
+                                    )
+                                client.search(
+                                    request,
+                                    object : ActionListener<SearchResponse> {
+                                        override fun onResponse(response: SearchResponse) {
+                                            val childMonitors = mutableListOf<Monitor>()
+                                            response.hits.forEach { hit ->
+                                                XContentHelper.createParser(
+                                                    xContentRegistry,
+                                                    LoggingDeprecationHandler.INSTANCE,
+                                                    hit.sourceRef,
+                                                    XContentType.JSON
+                                                ).use { xcp ->
+                                                    val childMonitor = ScheduledJob.parse(xcp, hit.id, hit.version) as Monitor
+                                                    childMonitors.add(childMonitor)
+                                                }
+                                            }
+                                            executeMonitors(childMonitors)
+                                        }
+
+                                        override fun onFailure(t: Exception) {
+                                            actionListener.onFailure(AlertingException.wrap(t))
+                                        }
+                                    }
+                                )
+                            } else {
+                                executeMonitor(monitor)
+                            }
                         }
                     }
                 }
@@ -144,7 +255,16 @@ class TransportExecuteMonitorAction @Inject constructor(
                             indexTimeout
                         )
                         log.info("Queries inserted into Percolate index ${ScheduledJob.DOC_LEVEL_QUERIES_INDEX}")
-                        executeMonitor(monitor)
+                        val shardInfoMap = getShards((monitor.inputs[0] as DocLevelMonitorInput).indices)
+                        val indexShardPairs = mutableListOf<String>()
+                        shardInfoMap.forEach {
+                            val index = it.key
+                            val shards = it.value
+                            shards.forEach { shard ->
+                                indexShardPairs.add("$index:$shard")
+                            }
+                        }
+                        executeMonitor(monitor.copy(isChild = true, shards = indexShardPairs))
                     }
                 } catch (t: Exception) {
                     actionListener.onFailure(AlertingException.wrap(t))
@@ -155,4 +275,22 @@ class TransportExecuteMonitorAction @Inject constructor(
             }
         }
     }
+
+    private suspend fun getShards(indices: List<String>): Map<String, List<String>> {
+        return indices.associateWith { index ->
+            val request = IndicesStatsRequest().indices(index).clear()
+            val response: IndicesStatsResponse =
+                client.suspendUntil { execute(IndicesStatsAction.INSTANCE, request, it) }
+            if (response.status != RestStatus.OK) {
+                val errorMessage = "Failed fetching index stats for index: $index"
+                throw AlertingException(
+                    errorMessage,
+                    RestStatus.INTERNAL_SERVER_ERROR,
+                    IllegalStateException(errorMessage)
+                )
+            }
+            val shards = response.shards.filter { it.shardRouting.primary() && it.shardRouting.active() }
+            shards.map { it.shardRouting.id.toString() }
+        }
+    }
 }
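Note: the executeMonitors handler above fans one execute request out to every child monitor and merges the per-query matched-document sets back into a single result. A minimal sketch of that fan-in, using plain maps instead of the plugin's MonitorRunResult types (mergeChildResults is a hypothetical name):

    fun mergeChildResults(
        aggregate: MutableMap<String, MutableSet<String>>,
        child: Map<String, Set<String>>
    ) {
        // Union the matched doc ids per query id, as the handler does for inputRunResults.
        child.forEach { (queryId, docIds) ->
            aggregate.getOrPut(queryId) { mutableSetOf() }.addAll(docIds)
        }
    }

    fun main() {
        val total = mutableMapOf<String, MutableSet<String>>()
        mergeChildResults(total, mapOf("q1" to setOf("1|idx", "2|idx")))
        mergeChildResults(total, mapOf("q1" to setOf("3|idx"), "q2" to setOf("4|idx")))
        println(total) // {q1=[1|idx, 2|idx, 3|idx], q2=[4|idx]}
    }
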
diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt
index 02d4779a5..495e71488 100644
--- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt
+++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt
@@ -50,7 +50,6 @@ import org.opensearch.alerting.util.IndexUtils
 import org.opensearch.alerting.util.addUserBackendRolesFilter
 import org.opensearch.alerting.util.getRoleFilterEnabled
 import org.opensearch.alerting.util.isADMonitor
-import org.opensearch.alerting.util.use
 import org.opensearch.client.Client
 import org.opensearch.cluster.service.ClusterService
 import org.opensearch.common.inject.Inject
@@ -61,6 +60,8 @@ import org.opensearch.common.xcontent.XContentFactory.jsonBuilder
 import org.opensearch.common.xcontent.XContentHelper
 import org.opensearch.common.xcontent.XContentType
 import org.opensearch.commons.alerting.action.AlertingActions
+import org.opensearch.commons.alerting.action.DeleteMonitorRequest
+import org.opensearch.commons.alerting.action.DeleteMonitorResponse
 import org.opensearch.commons.alerting.action.IndexMonitorRequest
 import org.opensearch.commons.alerting.action.IndexMonitorResponse
 import org.opensearch.commons.alerting.model.DocLevelMonitorInput
@@ -77,9 +78,6 @@ import org.opensearch.core.rest.RestStatus
 import org.opensearch.core.xcontent.NamedXContentRegistry
 import org.opensearch.core.xcontent.ToXContent
 import org.opensearch.index.query.QueryBuilders
-import org.opensearch.index.reindex.BulkByScrollResponse
-import org.opensearch.index.reindex.DeleteByQueryAction
-import org.opensearch.index.reindex.DeleteByQueryRequestBuilder
 import org.opensearch.rest.RestRequest
 import org.opensearch.search.builder.SearchSourceBuilder
 import org.opensearch.tasks.Task
@@ -498,10 +496,10 @@ class TransportIndexMonitorAction @Inject constructor(
                     ToXContent.MapParams(mapOf("with_type" to "true"))
                 )}"
             )
-
+            var indexResponse: IndexResponse? = null
             try {
-                val indexResponse: IndexResponse = client.suspendUntil { client.index(indexRequest, it) }
-                val failureReasons = checkShardsFailure(indexResponse)
+                indexResponse = client.suspendUntil { client.index(indexRequest, it) }
+                val failureReasons = checkShardsFailure(indexResponse!!)
                 if (failureReasons != null) {
                     log.info(failureReasons.toString())
                     actionListener.onFailure(
                     )
                     return
                 }
-
-/*          var metadata: MonitorMetadata?
-            try { // delete monitor if metadata creation fails, log the right error and re-throw the error to fail listener
-                request.monitor = request.monitor.copy(id = indexResponse.id)
-                var (monitorMetadata: MonitorMetadata, created: Boolean) = MonitorMetadataService.getOrCreateMetadata(request.monitor)
-                if (created == false) {
-                    log.warn("Metadata doc id:${monitorMetadata.id} exists, but it shouldn't!")
-                }
-                metadata = monitorMetadata
-            } catch (t: Exception) {
-                log.error("failed to create metadata for monitor ${indexResponse.id}. deleting monitor")
-                cleanupMonitorAfterPartialFailure(request.monitor, indexResponse)
-                throw t
-            }
-            try {
-                if (request.monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) {
-                    indexDocLevelMonitorQueries(request.monitor, indexResponse.id, metadata, request.refreshPolicy)
-                }
-                // When inserting queries in queryIndex we could update sourceToQueryIndexMapping
-                MonitorMetadataService.upsertMetadata(metadata, updating = true)
-            } catch (t: Exception) {
-                log.error("failed to index doc level queries monitor ${indexResponse.id}. deleting monitor", t)
-                cleanupMonitorAfterPartialFailure(request.monitor, indexResponse)
-                throw t
-            }*/
                 if (request.monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) {
                     val monitorShardAssignments = distributeShards(client, request.monitor.inputs[0] as DocLevelMonitorInput)
-                    val childMonitorIds = createChildMonitors(client, monitorShardAssignments, request.monitor.copy(id = indexResponse.id))
-                    log.info("")
+                    indexChildMonitors(client, monitorShardAssignments, request.monitor.copy(id = indexResponse.id))
                 }
                 actionListener.onResponse(
                     IndexMonitorResponse(
                     )
                 )
             } catch (t: Exception) {
+                cleanupMonitorAfterPartialFailure(request.monitor, indexResponse!!)
                 actionListener.onFailure(AlertingException.wrap(t))
             }
         }
     }
@@ -682,20 +655,10 @@ class TransportIndexMonitorAction @Inject constructor(
                 )
                 return
             }
-            var updatedMetadata: MonitorMetadata
-            val (metadata, created) = MonitorMetadataService.getOrCreateMetadata(request.monitor)
-            // Recreate runContext if metadata exists
-            // Delete and insert all queries from/to queryIndex
-            if (created == false && currentMonitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) {
-                updatedMetadata = MonitorMetadataService.recreateRunContext(metadata, currentMonitor)
-                client.suspendUntil<Client, BulkByScrollResponse> {
-                    DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE)
-                        .source(currentMonitor.dataSources.queryIndex)
-                        .filter(QueryBuilders.matchQuery("monitor_id", currentMonitor.id))
-                        .execute(it)
-                }
-                indexDocLevelMonitorQueries(request.monitor, currentMonitor.id, updatedMetadata, request.refreshPolicy)
-                MonitorMetadataService.upsertMetadata(updatedMetadata, updating = true)
+            if (currentMonitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) {
+                deleteChildDocLevelMonitors(client, request.monitorId, request.refreshPolicy)
+                val monitorShardAssignments = distributeShards(client, request.monitor.inputs[0] as DocLevelMonitorInput)
+                indexChildMonitors(client, monitorShardAssignments, request.monitor.copy(id = indexResponse.id))
             }
             actionListener.onResponse(
                 IndexMonitorResponse(
@@ -708,7 +671,7 @@ class TransportIndexMonitorAction @Inject constructor(
         }
     }
 
-    private suspend fun createChildMonitors(
+    private suspend fun indexChildMonitors(
         client: Client,
         monitorShardAssignments: Map<String, Set<Pair<String, String>>>,
         parentMonitor: Monitor
@@ -719,7 +682,7 @@ class TransportIndexMonitorAction @Inject constructor(
             val id = monitorShardAssignment.key
             val shards = monitorShardAssignment.value.map { indexShardPair -> "${indexShardPair.first}:${indexShardPair.second}" }
             var monitor = Monitor(
-                name = parentMonitor.name + "-" + id,
+                name = parentMonitor.name,
                 monitorType = parentMonitor.monitorType,
                 enabled = parentMonitor.enabled,
                 inputs = parentMonitor.inputs,
                 schedule = parentMonitor.schedule,
                 triggers = parentMonitor.triggers,
                 enabledTime = parentMonitor.enabledTime,
                 lastUpdateTime = parentMonitor.lastUpdateTime,
                 user = parentMonitor.user,
                 uiMetadata = mapOf(),
                 isChild = true,
                 shards = shards,
-                owner = parentMonitor.id
+                owner = parentMonitor.id,
+                dataSources = parentMonitor.dataSources
             )
             val indexRequest = IndexRequest(SCHEDULED_JOBS_INDEX)
                 .setRefreshPolicy(request.refreshPolicy)
@@ -786,8 +750,10 @@ class TransportIndexMonitorAction @Inject constructor(
         val shardInfoMap = getShards(client, indices)
         val totalShards = shardInfoMap.map { it.value.size }.sum()
 
-        val shardsPerNode = totalShards / maxShardsPerDocLevelMonitor
-        var shardsRemaining = totalShards % maxShardsPerDocLevelMonitor
+        var noOfDocLevelMonitors = totalShards / maxShardsPerDocLevelMonitor
+        if (totalShards % maxShardsPerDocLevelMonitor != 0) {
+            ++noOfDocLevelMonitors
+        }
 
         val shardInfoList = mutableListOf<Pair<String, String>>()
         shardInfoMap.forEach {
 
         val monitorShardAssignments = mutableMapOf<String, Set<Pair<String, String>>>()
         var idx = 0
-        for (id in 1..maxShardsPerDocLevelMonitor) {
+        for (id in 1..noOfDocLevelMonitors) {
             val monitorShardAssignment = mutableSetOf<Pair<String, String>>()
-            for (i in 1..shardsPerNode) {
-                monitorShardAssignment.add(shardInfoList[idx++])
-            }
-            if (shardsRemaining > 0) {
-                monitorShardAssignment.add(shardInfoList[idx++])
-                shardsRemaining -= 1
+            for (i in 1..maxShardsPerDocLevelMonitor) {
+                if (idx < shardInfoList.size) {
+                    monitorShardAssignment.add(shardInfoList[idx++])
+                }
             }
             monitorShardAssignments[id.toString()] = monitorShardAssignment
         }
@@ -817,7 +781,13 @@ class TransportIndexMonitorAction @Inject constructor(
     }
 
     private suspend fun getShards(client: Client, indices: List<String>): Map<String, List<String>> {
-        return indices.associateWith { index ->
+        return indices.associateWith {
+            var index: String? = it
+            if (IndexUtils.isAlias(index!!, clusterService.state()) ||
+                IndexUtils.isDataStream(index, clusterService.state())
+            ) {
+                index = IndexUtils.getWriteIndex(index, clusterService.state())
+            }
             val request = IndicesStatsRequest().indices(index).clear()
             val response: IndicesStatsResponse =
                 client.suspendUntil { execute(IndicesStatsAction.INSTANCE, request, it) }
@@ -834,6 +804,26 @@ class TransportIndexMonitorAction @Inject constructor(
         }
     }
 
+    private suspend fun deleteChildDocLevelMonitors(client: Client, monitorId: String, refreshPolicy: RefreshPolicy) {
+        val request: SearchRequest = SearchRequest()
+            .indices(ScheduledJob.SCHEDULED_JOBS_INDEX)
+            .source(
+                SearchSourceBuilder()
+                    .query(QueryBuilders.matchQuery("monitor.owner", monitorId))
+                    .size(10000)
+            )
+        val response = client.suspendUntil { client.search(request, it) }
+        response.hits.forEach { childMonitor ->
+            val deleteMonitorResponse: DeleteMonitorResponse = client.suspendUntil {
+                client.execute(
+                    AlertingActions.DELETE_MONITOR_ACTION_TYPE,
+                    DeleteMonitorRequest(childMonitor.id, refreshPolicy),
+                    it
+                )
+            }
+        }
+    }
+
     private fun checkShardsFailure(response: IndexResponse): String? {
         val failureReasons = StringBuilder()
        if (response.shardInfo.failed > 0) {
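Note: the updated distributeShards ceil-divides the total primary-shard count by maxShardsPerDocLevelMonitor, then hands out consecutive (index, shard) pairs in groups of at most that size. Distilled into a standalone Kotlin sketch with the same arithmetic but simplified types (assignShards is a hypothetical name):

    fun assignShards(
        shardInfoList: List<Pair<String, String>>, // (index, shardId)
        maxShardsPerDocLevelMonitor: Int
    ): Map<String, Set<Pair<String, String>>> {
        // Ceiling division: one extra child monitor for any remainder shards.
        var noOfDocLevelMonitors = shardInfoList.size / maxShardsPerDocLevelMonitor
        if (shardInfoList.size % maxShardsPerDocLevelMonitor != 0) {
            ++noOfDocLevelMonitors
        }
        val assignments = mutableMapOf<String, Set<Pair<String, String>>>()
        var idx = 0
        for (id in 1..noOfDocLevelMonitors) {
            val group = mutableSetOf<Pair<String, String>>()
            for (i in 1..maxShardsPerDocLevelMonitor) {
                if (idx < shardInfoList.size) group.add(shardInfoList[idx++])
            }
            assignments[id.toString()] = group
        }
        return assignments
    }

    fun main() {
        // 7 primaries (the shard count used by the updated test below) with a cap of
        // 3 shards per child monitor -> 3 children holding 3 + 3 + 1 shards.
        val shards = (0..6).map { "test-index" to it.toString() }
        assignShards(shards, 3).forEach { (id, group) -> println("$id -> $group") }
    }
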
= """{ "message" : "This is an error from IAD region", @@ -180,7 +180,29 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { indexDoc(testIndex, "6", testDoc) indexDoc(testIndex, "7", testDoc) - Thread.sleep(30000) + Thread.sleep(90000) + + /* updateMonitor(monitor, true) + + indexDoc(testIndex, "11", testDoc) + indexDoc(testIndex, "12", testDoc) + indexDoc(testIndex, "14", testDoc) + indexDoc(testIndex, "15", testDoc) + indexDoc(testIndex, "16", testDoc) + indexDoc(testIndex, "17", testDoc) + + Thread.sleep(30000) + + indexDoc(testIndex, "21", testDoc) + indexDoc(testIndex, "22", testDoc) + indexDoc(testIndex, "24", testDoc) + indexDoc(testIndex, "25", testDoc) + indexDoc(testIndex, "26", testDoc) + indexDoc(testIndex, "27", testDoc) + + val response = executeMonitor(monitor.id) + + deleteMonitor(monitor, true)*/ val alerts = searchAlertsWithFilter(monitor) assertEquals("Alert saved for test monitor", 2, alerts.size) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt index 055da96e5..e73c95d8c 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt @@ -200,8 +200,7 @@ fun randomDocumentLevelMonitor( return Monitor( name = name, monitorType = Monitor.MonitorType.DOC_LEVEL_MONITOR, enabled = enabled, inputs = inputs, schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user, - uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf(), isChild = true, - shards = listOf("0", "1") + uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf() ) } diff --git a/core/src/main/resources/mappings/scheduled-jobs.json b/core/src/main/resources/mappings/scheduled-jobs.json index 027118c60..1f068ff89 100644 --- a/core/src/main/resources/mappings/scheduled-jobs.json +++ b/core/src/main/resources/mappings/scheduled-jobs.json @@ -303,8 +303,8 @@ "shards": { "type": "text" }, - "node": { - "type": "keyword" + "child_monitors": { + "type": "text" } } },