diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index def9eb8fe..3b8e4dee7 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -120,12 +120,12 @@ object DocumentLevelMonitorRunner : MonitorRunner() { try { // Resolve all passed indices to concrete indices - val concreteIndices = IndexUtils.resolveAllIndices( + val allConcreteIndices = IndexUtils.resolveAllIndices( docLevelMonitorInput.indices, monitorCtx.clusterService!!, monitorCtx.indexNameExpressionResolver!! ) - if (concreteIndices.isEmpty()) { + if (allConcreteIndices.isEmpty()) { logger.error("indices not found-${docLevelMonitorInput.indices.joinToString(",")}") throw IndexNotFoundException(docLevelMonitorInput.indices.joinToString(",")) } @@ -141,7 +141,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { // cleanup old indices that are not monitored anymore from the same monitor val runContextKeys = updatedLastRunContext.keys.toMutableSet() for (ind in runContextKeys) { - if (!concreteIndices.contains(ind)) { + if (!allConcreteIndices.contains(ind)) { updatedLastRunContext.remove(ind) } } @@ -150,11 +150,26 @@ object DocumentLevelMonitorRunner : MonitorRunner() { val matchingDocIdsPerIndex = workflowRunContext?.matchingDocIdsPerIndex docLevelMonitorInput.indices.forEach { indexName -> - val concreteIndices = IndexUtils.resolveAllIndices( + var concreteIndices = IndexUtils.resolveAllIndices( listOf(indexName), monitorCtx.clusterService!!, monitorCtx.indexNameExpressionResolver!! ) + var lastWriteIndex: String? = null + if (IndexUtils.isAlias(indexName, monitorCtx.clusterService!!.state()) || + IndexUtils.isDataStream(indexName, monitorCtx.clusterService!!.state()) + ) { + lastWriteIndex = concreteIndices.find { lastRunContext.containsKey(it) } + if (lastWriteIndex != null) { + val lastWriteIndexCreationDate = + IndexUtils.getCreationDateForIndex(lastWriteIndex, monitorCtx.clusterService!!.state()) + concreteIndices = IndexUtils.getNewestIndicesByCreationDate( + concreteIndices, + monitorCtx.clusterService!!.state(), + lastWriteIndexCreationDate + ) + } + } val updatedIndexName = indexName.replace("*", "_") val conflictingFields = monitorCtx.docLevelMonitorQueries!!.getAllConflictingFields( monitorCtx.clusterService!!.state(), @@ -179,7 +194,16 @@ object DocumentLevelMonitorRunner : MonitorRunner() { monitorCtx, concreteIndexName ) as MutableMap - updatedLastRunContext[concreteIndexName] = indexUpdatedRunContext + if (IndexUtils.isAlias(indexName, monitorCtx.clusterService!!.state()) || + IndexUtils.isDataStream(indexName, monitorCtx.clusterService!!.state()) + ) { + if (concreteIndexName == IndexUtils.getWriteIndex(indexName, monitorCtx.clusterService!!.state())) { + updatedLastRunContext.remove(lastWriteIndex) + updatedLastRunContext[concreteIndexName] = indexUpdatedRunContext + } + } else { + updatedLastRunContext[concreteIndexName] = indexUpdatedRunContext + } val count: Int = indexLastRunContext["shards_count"] as Int for (i: Int in 0 until count) { diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorMetadataService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorMetadataService.kt index a94755af6..5c8886686 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorMetadataService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorMetadataService.kt @@ -28,6 +28,7 @@ import org.opensearch.alerting.model.MonitorMetadata import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.alerting.settings.AlertingSettings import org.opensearch.alerting.util.AlertingException +import org.opensearch.alerting.util.IndexUtils import org.opensearch.client.Client import org.opensearch.cluster.service.ClusterService import org.opensearch.common.settings.Settings @@ -214,11 +215,19 @@ object MonitorMetadataService : val lastRunContext = existingRunContext?.toMutableMap() ?: mutableMapOf() try { if (index == null) return mutableMapOf() - val getIndexRequest = GetIndexRequest().indices(index) - val getIndexResponse: GetIndexResponse = client.suspendUntil { - client.admin().indices().getIndex(getIndexRequest, it) + + val indices = mutableListOf() + if (IndexUtils.isAlias(index, clusterService.state()) || + IndexUtils.isDataStream(index, clusterService.state()) + ) { + IndexUtils.getWriteIndex(index, clusterService.state())?.let { indices.add(it) } + } else { + val getIndexRequest = GetIndexRequest().indices(index) + val getIndexResponse: GetIndexResponse = client.suspendUntil { + client.admin().indices().getIndex(getIndexRequest, it) + } + indices.addAll(getIndexResponse.indices()) } - val indices = getIndexResponse.indices() indices.forEach { indexName -> if (!lastRunContext.containsKey(indexName)) { diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt index 4d4f8088c..42237853f 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt @@ -207,11 +207,25 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ // Run through each backing index and apply appropriate mappings to query index indices.forEach { indexName -> - val concreteIndices = IndexUtils.resolveAllIndices( + var concreteIndices = IndexUtils.resolveAllIndices( listOf(indexName), monitorCtx.clusterService!!, monitorCtx.indexNameExpressionResolver!! ) + if (IndexUtils.isAlias(indexName, monitorCtx.clusterService!!.state()) || + IndexUtils.isDataStream(indexName, monitorCtx.clusterService!!.state()) + ) { + val lastWriteIndex = concreteIndices.find { monitorMetadata.lastRunContext.containsKey(it) } + if (lastWriteIndex != null) { + val lastWriteIndexCreationDate = + IndexUtils.getCreationDateForIndex(lastWriteIndex, monitorCtx.clusterService!!.state()) + concreteIndices = IndexUtils.getNewestIndicesByCreationDate( + concreteIndices, + monitorCtx.clusterService!!.state(), + lastWriteIndexCreationDate + ) + } + } val updatedIndexName = indexName.replace("*", "_") val updatedProperties = mutableMapOf() val allFlattenPaths = mutableSetOf>() diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/IndexUtils.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/IndexUtils.kt index ec3bd0031..6949ca58d 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/IndexUtils.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/IndexUtils.kt @@ -12,6 +12,7 @@ import org.opensearch.alerting.alerts.AlertIndices import org.opensearch.alerting.core.ScheduledJobIndices import org.opensearch.client.IndicesAdminClient import org.opensearch.cluster.ClusterState +import org.opensearch.cluster.metadata.IndexAbstraction import org.opensearch.cluster.metadata.IndexMetadata import org.opensearch.cluster.metadata.IndexNameExpressionResolver import org.opensearch.cluster.service.ClusterService @@ -152,5 +153,47 @@ class IndexUtils { return result } + + @JvmStatic + fun isDataStream(name: String, clusterState: ClusterState): Boolean { + return clusterState.metadata().dataStreams().containsKey(name) + } + + @JvmStatic + fun isAlias(name: String, clusterState: ClusterState): Boolean { + return clusterState.metadata().hasAlias(name) + } + + @JvmStatic + fun getWriteIndex(index: String, clusterState: ClusterState): String? { + if (isAlias(index, clusterState) || isDataStream(index, clusterState)) { + val metadata = clusterState.metadata.indicesLookup[index]?.writeIndex + if (metadata != null) { + return metadata.index.name + } + } + return null + } + + @JvmStatic + fun getNewestIndicesByCreationDate(concreteIndices: List, clusterState: ClusterState, thresholdDate: Long): List { + val filteredIndices = mutableListOf() + val lookup = clusterState.metadata().indicesLookup + concreteIndices.forEach { indexName -> + val index = lookup[indexName] + val indexMetadata = clusterState.metadata.index(indexName) + if (index != null && index.type == IndexAbstraction.Type.CONCRETE_INDEX) { + if (indexMetadata.creationDate >= thresholdDate) { + filteredIndices.add(indexName) + } + } + } + return filteredIndices + } + + @JvmStatic + fun getCreationDateForIndex(index: String, clusterState: ClusterState): Long { + return clusterState.metadata.index(index).creationDate + } } } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt index cb1a449bd..9c7a1d715 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt @@ -76,6 +76,8 @@ import javax.management.MBeanServerInvocationHandler import javax.management.ObjectName import javax.management.remote.JMXConnectorFactory import javax.management.remote.JMXServiceURL +import kotlin.collections.ArrayList +import kotlin.collections.HashMap /** * Superclass for tests that interact with an external test cluster using OpenSearch's RestClient @@ -893,7 +895,7 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { private fun indexDoc(client: RestClient, index: String, id: String, doc: String, refresh: Boolean = true): Response { val requestBody = StringEntity(doc, APPLICATION_JSON) val params = if (refresh) mapOf("refresh" to "true") else mapOf() - val response = client.makeRequest("PUT", "$index/_doc/$id", params, requestBody) + val response = client.makeRequest("POST", "$index/_doc/$id?op_type=create", params, requestBody) assertTrue( "Unable to index doc: '${doc.take(15)}...' to index: '$index'", listOf(RestStatus.OK, RestStatus.CREATED).contains(response.restStatus()) @@ -963,7 +965,7 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { val indicesMap = mutableMapOf() val indicesJson = jsonBuilder().startObject().startArray("actions") indices.keys.map { - val indexName = createTestIndex(index = it.lowercase(Locale.ROOT), mapping = "") + val indexName = createTestIndex(index = it, mapping = "") val isWriteIndex = indices.getOrDefault(indexName, false) indicesMap[indexName] = isWriteIndex val indexMap = mapOf( @@ -980,17 +982,119 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { return mutableMapOf(alias to indicesMap) } + protected fun createDataStream(datastream: String, mappings: String?, useComponentTemplate: Boolean) { + val indexPattern = "$datastream*" + var componentTemplateMappings = "\"properties\": {" + + " \"netflow.destination_transport_port\":{ \"type\": \"long\" }," + + " \"netflow.destination_ipv4_address\":{ \"type\": \"ip\" }" + + "}" + if (mappings != null) { + componentTemplateMappings = mappings + } + if (useComponentTemplate) { + // Setup index_template + createComponentTemplateWithMappings( + "my_ds_component_template-$datastream", + componentTemplateMappings + ) + } + createComposableIndexTemplate( + "my_index_template_ds-$datastream", + listOf(indexPattern), + (if (useComponentTemplate) "my_ds_component_template-$datastream" else null), + mappings, + true, + 0 + ) + createDataStream(datastream) + } + + protected fun createDataStream(datastream: String? = randomAlphaOfLength(10).lowercase(Locale.ROOT)) { + client().makeRequest("PUT", "_data_stream/$datastream") + } + + protected fun deleteDataStream(datastream: String) { + client().makeRequest("DELETE", "_data_stream/$datastream") + } + + protected fun createComponentTemplateWithMappings(componentTemplateName: String, mappings: String?) { + val body = """{"template" : { "mappings": {$mappings} }}""" + client().makeRequest( + "PUT", + "_component_template/$componentTemplateName", + emptyMap(), + StringEntity(body, ContentType.APPLICATION_JSON), + BasicHeader("Content-Type", "application/json") + ) + } + + protected fun createComposableIndexTemplate( + templateName: String, + indexPatterns: List, + componentTemplateName: String?, + mappings: String?, + isDataStream: Boolean, + priority: Int + ) { + var body = "{\n" + if (isDataStream) { + body += "\"data_stream\": { }," + } + body += "\"index_patterns\": [" + + indexPatterns.stream().collect( + Collectors.joining(",", "\"", "\"") + ) + "]," + if (componentTemplateName == null) { + body += "\"template\": {\"mappings\": {$mappings}}," + } + if (componentTemplateName != null) { + body += "\"composed_of\": [\"$componentTemplateName\"]," + } + body += "\"priority\":$priority}" + client().makeRequest( + "PUT", + "_index_template/$templateName", + emptyMap(), + StringEntity(body, APPLICATION_JSON), + BasicHeader("Content-Type", "application/json") + ) + } + + protected fun getDatastreamWriteIndex(datastream: String): String { + val response = client().makeRequest("GET", "_data_stream/$datastream", emptyMap(), null) + var respAsMap = responseAsMap(response) + if (respAsMap.containsKey("data_streams")) { + respAsMap = (respAsMap["data_streams"] as ArrayList>)[0] + val indices = respAsMap["indices"] as List> + val index = indices.last() + return index["index_name"] as String + } else { + respAsMap = respAsMap[datastream] as Map + } + val indices = respAsMap["indices"] as Array + return indices.last() + } + + protected fun rolloverDatastream(datastream: String) { + client().makeRequest( + "POST", + datastream + "/_rollover", + emptyMap(), + null + ) + } + protected fun randomAliasIndices( alias: String, num: Int = randomIntBetween(1, 10), includeWriteIndex: Boolean = true, ): Map { val indices = mutableMapOf() - val writeIndex = randomIntBetween(0, num) + val writeIndex = randomIntBetween(0, num - 1) for (i: Int in 0 until num) { - var indexName = randomAlphaOfLength(10) + var indexName = randomAlphaOfLength(10).lowercase(Locale.ROOT) while (indexName.equals(alias) || indices.containsKey(indexName)) - indexName = randomAlphaOfLength(10) + indexName = randomAlphaOfLength(10).lowercase(Locale.ROOT) indices[indexName] = includeWriteIndex && i == writeIndex } return indices diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt index f71a1a31c..921a4b8c1 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt @@ -1229,6 +1229,121 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { } } + fun `test document-level monitor when datastreams contain docs that do match query`() { + val dataStreamName = "test-datastream" + createDataStream( + dataStreamName, + """ + "properties" : { + "test_strict_date_time" : { "type" : "date", "format" : "strict_date_time" }, + "test_field" : { "type" : "keyword" }, + "number" : { "type" : "keyword" } + } + """.trimIndent(), + false + ) + + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", fields = listOf()) + val docLevelInput = DocLevelMonitorInput("description", listOf(dataStreamName), listOf(docQuery)) + + val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id) + val monitor = createMonitor( + randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action))) + ) + ) + + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "@timestamp": "$testTime", + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + indexDoc(dataStreamName, "1", testDoc) + var response = executeMonitor(monitor.id) + var output = entityAsMap(response) + var searchResult = (output.objectMap("input_results")["results"] as List>).first() + @Suppress("UNCHECKED_CAST") + var matchingDocsToQuery = searchResult[docQuery.id] as List + assertEquals("Incorrect search result", 1, matchingDocsToQuery.size) + + rolloverDatastream(dataStreamName) + indexDoc(dataStreamName, "2", testDoc) + response = executeMonitor(monitor.id) + output = entityAsMap(response) + searchResult = (output.objectMap("input_results")["results"] as List>).first() + @Suppress("UNCHECKED_CAST") + matchingDocsToQuery = searchResult[docQuery.id] as List + assertEquals("Incorrect search result", 1, matchingDocsToQuery.size) + + deleteDataStream(dataStreamName) + } + + fun `test document-level monitor when datastreams contain docs across read-only indices that do match query`() { + val dataStreamName = "test-datastream" + createDataStream( + dataStreamName, + """ + "properties" : { + "test_strict_date_time" : { "type" : "date", "format" : "strict_date_time" }, + "test_field" : { "type" : "keyword" }, + "number" : { "type" : "keyword" } + } + """.trimIndent(), + false + ) + + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", fields = listOf()) + val docLevelInput = DocLevelMonitorInput("description", listOf(dataStreamName), listOf(docQuery)) + + val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id) + val monitor = createMonitor( + randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action))) + ) + ) + + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "@timestamp": "$testTime", + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + indexDoc(dataStreamName, "1", testDoc) + var response = executeMonitor(monitor.id) + var output = entityAsMap(response) + var searchResult = (output.objectMap("input_results")["results"] as List>).first() + @Suppress("UNCHECKED_CAST") + var matchingDocsToQuery = searchResult[docQuery.id] as List + assertEquals("Incorrect search result", 1, matchingDocsToQuery.size) + + indexDoc(dataStreamName, "2", testDoc) + rolloverDatastream(dataStreamName) + rolloverDatastream(dataStreamName) + indexDoc(dataStreamName, "4", testDoc) + rolloverDatastream(dataStreamName) + response = executeMonitor(monitor.id) + output = entityAsMap(response) + searchResult = (output.objectMap("input_results")["results"] as List>).first() + @Suppress("UNCHECKED_CAST") + matchingDocsToQuery = searchResult[docQuery.id] as List + assertEquals("Incorrect search result", 2, matchingDocsToQuery.size) + + indexDoc(dataStreamName, "5", testDoc) + indexDoc(dataStreamName, "6", testDoc) + response = executeMonitor(monitor.id) + output = entityAsMap(response) + searchResult = (output.objectMap("input_results")["results"] as List>).first() + @Suppress("UNCHECKED_CAST") + matchingDocsToQuery = searchResult[docQuery.id] as List + assertEquals("Incorrect search result", 2, matchingDocsToQuery.size) + deleteDataStream(dataStreamName) + } + fun `test execute monitor with non-null data sources`() { val testIndex = createTestIndex() @@ -1311,7 +1426,7 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { deleteIndex(index1) deleteIndex(index2) - indexDoc(index4, "1", testDoc) + indexDoc(index4, "2", testDoc) response = executeMonitor(monitor.id) output = entityAsMap(response)