diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/remote/monitors/RemoteDocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/remote/monitors/RemoteDocumentLevelMonitorRunner.kt index c5ad421df..aa0d186e6 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/remote/monitors/RemoteDocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/remote/monitors/RemoteDocumentLevelMonitorRunner.kt @@ -12,6 +12,7 @@ import org.opensearch.alerting.MonitorMetadataService import org.opensearch.alerting.MonitorRunner import org.opensearch.alerting.MonitorRunnerExecutionContext import org.opensearch.alerting.util.IndexUtils +import org.opensearch.cluster.metadata.IndexMetadata import org.opensearch.cluster.node.DiscoveryNode import org.opensearch.cluster.routing.ShardRouting import org.opensearch.cluster.service.ClusterService @@ -27,6 +28,7 @@ import org.opensearch.commons.alerting.model.remote.monitors.RemoteDocLevelMonit import org.opensearch.commons.alerting.util.AlertingException import org.opensearch.core.index.shard.ShardId import org.opensearch.core.rest.RestStatus +import org.opensearch.index.seqno.SequenceNumbers import org.opensearch.transport.TransportService import java.io.IOException import java.time.Instant @@ -64,12 +66,28 @@ class RemoteDocumentLevelMonitorRunner : MonitorRunner() { logger.info(monitorMetadata.lastRunContext.toMutableMap().toString()) val lastRunContext = if (monitorMetadata.lastRunContext.isNullOrEmpty()) mutableMapOf() else monitorMetadata.lastRunContext.toMutableMap() as MutableMap> + val updatedLastRunContext = lastRunContext.toMutableMap() val remoteDocLevelMonitorInput = monitor.inputs[0] as RemoteDocLevelMonitorInput val docLevelMonitorInput = remoteDocLevelMonitorInput.docLevelMonitorInput var shards: Set = mutableSetOf() var concreteIndices = listOf() + // Resolve all passed indices to concrete indices + val allConcreteIndices = IndexUtils.resolveAllIndices( + docLevelMonitorInput.indices, + monitorCtx.clusterService!!, + monitorCtx.indexNameExpressionResolver!! + ) + // cleanup old indices that are not monitored anymore from the same monitor + val runContextKeys = updatedLastRunContext.keys.toMutableSet() + for (ind in runContextKeys) { + if (!allConcreteIndices.contains(ind)) { + updatedLastRunContext.remove(ind) + lastRunContext.remove(ind) + } + } + try { docLevelMonitorInput.indices.forEach { indexName -> concreteIndices = IndexUtils.resolveAllIndices( @@ -93,6 +111,39 @@ class RemoteDocumentLevelMonitorRunner : MonitorRunner() { } } + concreteIndices.forEach { concreteIndexName -> + // Prepare lastRunContext for each index + val indexLastRunContext = lastRunContext.getOrPut(concreteIndexName) { + val isIndexCreatedRecently = createdRecently( + monitor, + periodStart, + periodEnd, + monitorCtx.clusterService!!.state().metadata.index(concreteIndexName) + ) + MonitorMetadataService.createRunContextForIndex(concreteIndexName, isIndexCreatedRecently) + } + + val indexUpdatedRunContext = initializeNewLastRunContext( + indexLastRunContext.toMutableMap(), + monitorCtx, + concreteIndexName + ) as MutableMap + if (IndexUtils.isAlias(indexName, monitorCtx.clusterService!!.state()) || + IndexUtils.isDataStream(indexName, monitorCtx.clusterService!!.state()) + ) { + if (concreteIndexName == IndexUtils.getWriteIndex( + indexName, + monitorCtx.clusterService!!.state() + ) + ) { + updatedLastRunContext.remove(lastWriteIndex) + updatedLastRunContext[concreteIndexName] = indexUpdatedRunContext + } + } else { + updatedLastRunContext[concreteIndexName] = indexUpdatedRunContext + } + } + concreteIndices.forEach { val shardCount = getShardsCount(monitorCtx.clusterService!!, it) for (i in 0 until shardCount) { @@ -111,7 +162,7 @@ class RemoteDocumentLevelMonitorRunner : MonitorRunner() { val docLevelMonitorFanOutResponses = monitorCtx.remoteMonitors[monitor.monitorType]!!.monitorRunner.doFanOut( monitorCtx.clusterService!!, monitor, - monitorMetadata, + monitorMetadata.copy(lastRunContext = lastRunContext), executionId, concreteIndices, workflowRunContext, @@ -120,12 +171,12 @@ class RemoteDocumentLevelMonitorRunner : MonitorRunner() { nodeMap, nodeShardAssignments ) - updateLastRunContextFromFanOutResponses(docLevelMonitorFanOutResponses, lastRunContext) + updateLastRunContextFromFanOutResponses(docLevelMonitorFanOutResponses, updatedLastRunContext) val triggerResults = buildTriggerResults(docLevelMonitorFanOutResponses) val inputRunResults = buildInputRunResults(docLevelMonitorFanOutResponses) if (!isTempMonitor) { MonitorMetadataService.upsertMetadata( - monitorMetadata.copy(lastRunContext = lastRunContext), + monitorMetadata.copy(lastRunContext = updatedLastRunContext), true ) } @@ -216,17 +267,17 @@ class RemoteDocumentLevelMonitorRunner : MonitorRunner() { // fanOutResponse.lastRunContexts //updatedContexts for relevant shards val indexLastRunContext = updatedLastRunContext[indexName] as MutableMap - if (fanOutResponse.lastRunContexts.contains("index") && fanOutResponse.lastRunContexts["index"] == indexName) { - fanOutResponse.lastRunContexts.keys.forEach { + if (fanOutResponse.lastRunContexts.contains(indexName)) { + (fanOutResponse.lastRunContexts[indexName] as Map).forEach { - val seq_no = fanOutResponse.lastRunContexts[it].toString().toIntOrNull() + val seq_no = it.value.toString().toIntOrNull() if ( - it != "shards_count" && - it != "index" && + it.key != "shards_count" && + it.key != "index" && seq_no != null && seq_no >= 0 ) { - indexLastRunContext[it] = seq_no + indexLastRunContext[it.key] = seq_no } } } @@ -309,4 +360,29 @@ class RemoteDocumentLevelMonitorRunner : MonitorRunner() { } return InputRunResults(listOf(inputRunResults), if (!errors.isEmpty()) AlertingException.merge(*errors.toTypedArray()) else null) } + + private fun createdRecently( + monitor: Monitor, + periodStart: Instant, + periodEnd: Instant, + indexMetadata: IndexMetadata + ): Boolean { + val lastExecutionTime = if (periodStart == periodEnd) monitor.lastUpdateTime else periodStart + val indexCreationDate = indexMetadata.settings.get("index.creation_date")?.toLong() ?: 0L + return indexCreationDate > lastExecutionTime.toEpochMilli() + } + + private fun initializeNewLastRunContext( + lastRunContext: Map, + monitorCtx: MonitorRunnerExecutionContext, + index: String, + ): Map { + val count: Int = getShardsCount(monitorCtx.clusterService!!, index) + val updatedLastRunContext = lastRunContext.toMutableMap() + for (i: Int in 0 until count) { + val shard = i.toString() + updatedLastRunContext[shard] = SequenceNumbers.UNASSIGNED_SEQ_NO + } + return updatedLastRunContext + } } diff --git a/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/SampleRemoteMonitorRestHandler.java b/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/SampleRemoteMonitorRestHandler.java index ac9d9f29e..0340baa6b 100644 --- a/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/SampleRemoteMonitorRestHandler.java +++ b/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/SampleRemoteMonitorRestHandler.java @@ -208,13 +208,15 @@ public void onFailure(Exception e) { ); }; } else { + String indices = restRequest.param("index", "index"); + List index = List.of(indices.split(",")); SampleRemoteDocLevelMonitorInput sampleRemoteDocLevelMonitorInput = new SampleRemoteDocLevelMonitorInput("hello", Map.of("world", 1), 2); BytesStreamOutput out2 = new BytesStreamOutput(); sampleRemoteDocLevelMonitorInput.writeTo(out2); BytesReference sampleRemoteDocLevelMonitorInputSerialized = out2.bytes(); - DocLevelMonitorInput docLevelMonitorInput = new DocLevelMonitorInput("description", List.of("index"), emptyList()); + DocLevelMonitorInput docLevelMonitorInput = new DocLevelMonitorInput("description", index, emptyList()); RemoteDocLevelMonitorInput remoteDocLevelMonitorInput = new RemoteDocLevelMonitorInput(sampleRemoteDocLevelMonitorInputSerialized, docLevelMonitorInput); Monitor remoteDocLevelMonitor = new Monitor( diff --git a/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/monitor/fanouts/TransportRemoteDocLevelMonitorFanOutAction.java b/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/monitor/fanouts/TransportRemoteDocLevelMonitorFanOutAction.java index 57f3496aa..1be9255fe 100644 --- a/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/monitor/fanouts/TransportRemoteDocLevelMonitorFanOutAction.java +++ b/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/monitor/fanouts/TransportRemoteDocLevelMonitorFanOutAction.java @@ -83,7 +83,12 @@ protected void doExecute(Task task, DocLevelMonitorFanOutRequest request, Action SampleRemoteMonitorTrigger1 remoteMonitorTrigger = new SampleRemoteMonitorTrigger1(triggerSin); - ((Map) lastRunContext.get(index)).put("0", 0); + if (lastRunContext.containsKey(index)) { + ((Map) lastRunContext.get(index)).put("2", 0); + } + if (docLevelMonitorInput.getIndices().size() > 1 && lastRunContext.containsKey(docLevelMonitorInput.getIndices().get(1))) { + ((Map) lastRunContext.get(docLevelMonitorInput.getIndices().get(1))).put("4", 0); + } IndexRequest indexRequest = new IndexRequest(SampleRemoteDocLevelMonitorRunner.SAMPLE_REMOTE_DOC_LEVEL_MONITOR_RUNNER_INDEX) .source(Map.of(sampleRemoteDocLevelMonitorInput.getA(), remoteMonitorTrigger.getA())).setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); this.client.index(indexRequest, new ActionListener<>() { diff --git a/sample-remote-monitor-plugin/src/test/java/org/opensearch/alerting/SampleRemoteMonitorIT.java b/sample-remote-monitor-plugin/src/test/java/org/opensearch/alerting/SampleRemoteMonitorIT.java index a8c34935c..9d14505c6 100644 --- a/sample-remote-monitor-plugin/src/test/java/org/opensearch/alerting/SampleRemoteMonitorIT.java +++ b/sample-remote-monitor-plugin/src/test/java/org/opensearch/alerting/SampleRemoteMonitorIT.java @@ -37,6 +37,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -167,7 +168,115 @@ public void testSampleRemoteDocLevelMonitor() throws IOException, InterruptedExc LoggingDeprecationHandler.INSTANCE, searchResponse.getEntity().getContent() ).map(); - found.set(Integer.parseInt((((Map) ((Map) searchResponseJson.get("hits")).get("total")).get("value")).toString()) == 1 && + found.set(Integer.parseInt((((Map) ((Map) searchResponseJson.get("hits")).get("total")).get("value")).toString()) > 0 && + ((Map) ((List>) ((Map) searchResponseJson.get("hits")).get("hits")).get(0).get("_source")).containsKey("hello") && + ((Map) ((List>) ((Map) searchResponseJson.get("hits")).get("hits")).get(0).get("_source")).get("hello").toString().equals("hello")); + return found.get(); + } catch (IOException ex) { + return false; + } + }, 10, TimeUnit.SECONDS); + Assert.assertTrue(found.get()); + } + + @SuppressWarnings("unchecked") + public void testSampleRemoteDocLevelMonitorWithDynamicMetadataUpdate() throws IOException, InterruptedException { + createIndex("index1", Settings.builder().put("number_of_shards", "7").build()); + Response response = makeRequest(client(), "POST", "_plugins/_sample_remote_monitor/monitor", + Map.of("run_monitor", "doc_level", "index", "index1"), null); + Assert.assertEquals("Unable to create remote monitor", RestStatus.OK, RestStatus.fromCode(response.getStatusLine().getStatusCode())); + + Map responseJson = JsonXContent.jsonXContent.createParser( + NamedXContentRegistry.EMPTY, + LoggingDeprecationHandler.INSTANCE, + response.getEntity().getContent() + ).map(); + String monitorId = responseJson.get("_id").toString(); + + response = makeRequest(client(), "POST", "/_plugins/_alerting/monitors/" + monitorId + "/_execute", Map.of(), null); + Assert.assertEquals("Unable to execute remote monitor", RestStatus.OK, RestStatus.fromCode(response.getStatusLine().getStatusCode())); + + createIndex("index2", Settings.builder().put("number_of_shards", "7").build()); + String updatedMonitor = String.format(Locale.ROOT, "{\"type\":\"monitor\",\"name\":\"remote_doc_level_monitor\",\"monitor_type\":\"remote_doc_level_monitor\"," + + "\"user\":{\"name\":\"\",\"backend_roles\":[],\"roles\":[],\"custom_attribute_names\":[],\"user_requested_tenant\":null},\"enabled\":true,\"schedule\":{\"period\":{\"interval\":5,\"unit\":\"MINUTES\"}}," + + "\"inputs\":[{\"remote_doc_level_monitor_input\":{\"size\":24,\"input\":\"BWhlbGxvCgEFd29ybGQBAAAAAQAAAAIA\",\"doc_level_input\":" + + "{\"doc_level_input\":{\"description\":\"description\",\"indices\":[%s],\"queries\":[]}}}}],\"" + + "triggers\":[{\"remote_monitor_trigger\":{\"id\":\"id\",\"name\":\"name\",\"severity\":\"1\"," + + "\"actions\":[{\"id\":\"id\",\"name\":\"name\",\"destination_id\":\"destinationId\",\"message_template\":{\"source\":\"Hello World\"," + + "\"lang\":\"mustache\"},\"throttle_enabled\":false,\"subject_template\":{\"source\":\"Hello World\",\"lang\":\"mustache\"}," + + "\"throttle\":{\"value\":60,\"unit\":\"MINUTES\"}}],\"size\":24,\"trigger\":\"BWhlbGxvCgEEdGVzdAM/gAAAAAAAAQAA\"}}]," + + "\"owner\":\"sample-remote-monitor-plugin\"}", "\"index1\", \"index2\""); + makeRequest(client(), "PUT", "/_plugins/_alerting/monitors/" + monitorId, Map.of(), new StringEntity(updatedMonitor, ContentType.APPLICATION_JSON)); + + response = makeRequest(client(), "POST", "/_plugins/_alerting/monitors/" + monitorId + "/_execute", Map.of(), null); + Assert.assertEquals("Unable to execute remote monitor", RestStatus.OK, RestStatus.fromCode(response.getStatusLine().getStatusCode())); + + makeRequest(client(), "DELETE", "/index1", Map.of(), null); + + response = makeRequest(client(), "POST", "/_plugins/_alerting/monitors/" + monitorId + "/_execute", Map.of(), null); + Assert.assertEquals("Unable to execute remote monitor", RestStatus.OK, RestStatus.fromCode(response.getStatusLine().getStatusCode())); + + AtomicBoolean found = new AtomicBoolean(false); + OpenSearchRestTestCase.waitUntil( + () -> { + try { + Response searchResponse = makeRequest(client(), "POST", SampleRemoteDocLevelMonitorRunner.SAMPLE_REMOTE_DOC_LEVEL_MONITOR_RUNNER_INDEX + "/_search", Map.of(), + new StringEntity("{\"query\":{\"match_all\":{}}}", ContentType.APPLICATION_JSON)); + Map searchResponseJson = JsonXContent.jsonXContent.createParser( + NamedXContentRegistry.EMPTY, + LoggingDeprecationHandler.INSTANCE, + searchResponse.getEntity().getContent() + ).map(); + found.set(Integer.parseInt((((Map) ((Map) searchResponseJson.get("hits")).get("total")).get("value")).toString()) > 0 && + ((Map) ((List>) ((Map) searchResponseJson.get("hits")).get("hits")).get(0).get("_source")).containsKey("hello") && + ((Map) ((List>) ((Map) searchResponseJson.get("hits")).get("hits")).get(0).get("_source")).get("hello").toString().equals("hello")); + return found.get(); + } catch (IOException ex) { + return false; + } + }, 10, TimeUnit.SECONDS); + Assert.assertTrue(found.get()); + } + + @SuppressWarnings("unchecked") + public void testSampleRemoteDocLevelMonitorWithAlias() throws IOException, InterruptedException { + String indexAlias = "test_alias"; + createIndex("index-000001", Settings.EMPTY); + makeRequest(client(), "POST", "_aliases", Map.of(), + new StringEntity(String.format(Locale.ROOT, "{\"actions\":[{\"add\":{\"index\":\"index-000001\",\"alias\":\"%s\",\"is_write_index\":true}}]}", indexAlias), ContentType.APPLICATION_JSON)); + Response response = makeRequest(client(), "POST", "_plugins/_sample_remote_monitor/monitor", Map.of("run_monitor", "doc_level", "index", indexAlias), null); + Assert.assertEquals("Unable to create remote monitor", RestStatus.OK, RestStatus.fromCode(response.getStatusLine().getStatusCode())); + + Map responseJson = JsonXContent.jsonXContent.createParser( + NamedXContentRegistry.EMPTY, + LoggingDeprecationHandler.INSTANCE, + response.getEntity().getContent() + ).map(); + String monitorId = responseJson.get("_id").toString(); + + response = makeRequest(client(), "POST", "/_plugins/_alerting/monitors/" + monitorId + "/_execute", Map.of(), null); + Assert.assertEquals("Unable to execute remote monitor", RestStatus.OK, RestStatus.fromCode(response.getStatusLine().getStatusCode())); + + makeRequest(client(), "POST", String.format(Locale.ROOT, "%s/_rollover", indexAlias), Map.of(), + new StringEntity("", ContentType.APPLICATION_JSON)); + + response = makeRequest(client(), "POST", "/_plugins/_alerting/monitors/" + monitorId + "/_execute", Map.of(), null); + Assert.assertEquals("Unable to execute remote monitor", RestStatus.OK, RestStatus.fromCode(response.getStatusLine().getStatusCode())); + + response = makeRequest(client(), "POST", "/_plugins/_alerting/monitors/" + monitorId + "/_execute", Map.of(), null); + Assert.assertEquals("Unable to execute remote monitor", RestStatus.OK, RestStatus.fromCode(response.getStatusLine().getStatusCode())); + AtomicBoolean found = new AtomicBoolean(false); + OpenSearchRestTestCase.waitUntil( + () -> { + try { + Response searchResponse = makeRequest(client(), "POST", SampleRemoteDocLevelMonitorRunner.SAMPLE_REMOTE_DOC_LEVEL_MONITOR_RUNNER_INDEX + "/_search", Map.of(), + new StringEntity("{\"query\":{\"match_all\":{}}}", ContentType.APPLICATION_JSON)); + Map searchResponseJson = JsonXContent.jsonXContent.createParser( + NamedXContentRegistry.EMPTY, + LoggingDeprecationHandler.INSTANCE, + searchResponse.getEntity().getContent() + ).map(); + found.set(Integer.parseInt((((Map) ((Map) searchResponseJson.get("hits")).get("total")).get("value")).toString()) > 0 && ((Map) ((List>) ((Map) searchResponseJson.get("hits")).get("hits")).get(0).get("_source")).containsKey("hello") && ((Map) ((List>) ((Map) searchResponseJson.get("hits")).get("hits")).get(0).get("_source")).get("hello").toString().equals("hello")); return found.get();