diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index 16272fd99..903f2ecc7 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -239,7 +239,19 @@ class DocumentLevelMonitorRunner : MonitorRunner() { shards.remove("index") shards.remove("shards_count") - val nodeMap = getNodes(monitorCtx) + /** + * if fanout flag is disabled and force assign all shards to local node + * thus effectively making the fan-out a single node operation. + * This is done to avoid de-dupe Alerts generated by Aggregation Sigma Rules + **/ + val localNode = monitorCtx.clusterService!!.localNode() + val nodeMap: Map = if (docLevelMonitorInput?.fanoutEnabled == true) { + getNodes(monitorCtx) + } else { + logger.info("Fan-out is disabled for chained findings monitor ${monitor.id}") + mapOf(localNode.id to localNode) + } + val nodeShardAssignments = distributeShards( monitorCtx, nodeMap.keys.toList(), diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt index 769c20ead..0c32ecf0b 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt @@ -2750,6 +2750,68 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { deleteDataStream(aliasName) } + fun `test document-level monitor fanout disabled approach when aliases contain indices with multiple shards`() { + val aliasName = "test-alias" + createIndexAlias( + aliasName, + """ + "properties" : { + "test_strict_date_time" : { "type" : "date", "format" : "strict_date_time" }, + "test_field" : { "type" : "keyword" }, + "number" : { "type" : "keyword" } + } + """.trimIndent(), + "\"index.number_of_shards\": 7" + ) + + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", fields = listOf(), name = "3") + val docLevelInput = DocLevelMonitorInput("description", listOf(aliasName), listOf(docQuery), false) + + val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id) + val monitor = createMonitor( + randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action))), + enabled = true, + schedule = IntervalSchedule(1, ChronoUnit.MINUTES) + ) + ) + + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "@timestamp": "$testTime", + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + indexDoc(aliasName, "1", testDoc) + indexDoc(aliasName, "2", testDoc) + indexDoc(aliasName, "4", testDoc) + indexDoc(aliasName, "5", testDoc) + indexDoc(aliasName, "6", testDoc) + indexDoc(aliasName, "7", testDoc) + OpenSearchTestCase.waitUntil( + { searchFindings(monitor).size == 6 && searchAlertsWithFilter(monitor).size == 1 }, + 2, + TimeUnit.MINUTES + ) + + rolloverDatastream(aliasName) + indexDoc(aliasName, "11", testDoc) + indexDoc(aliasName, "12", testDoc) + indexDoc(aliasName, "14", testDoc) + indexDoc(aliasName, "15", testDoc) + indexDoc(aliasName, "16", testDoc) + indexDoc(aliasName, "17", testDoc) + OpenSearchTestCase.waitUntil( + { searchFindings(monitor).size == 6 && searchAlertsWithFilter(monitor).size == 1 }, + 2, + TimeUnit.MINUTES + ) + + deleteDataStream(aliasName) + } + fun `test execute monitor generates alerts and findings with renewable locks`() { val testIndex = createTestIndex() val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) diff --git a/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/SampleRemoteMonitorRestHandler.java b/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/SampleRemoteMonitorRestHandler.java index 085a8db80..91c97a636 100644 --- a/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/SampleRemoteMonitorRestHandler.java +++ b/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/SampleRemoteMonitorRestHandler.java @@ -136,7 +136,7 @@ public void onFailure(Exception e) { }; } else if (runMonitorParam.equals("multiple")) { SampleRemoteMonitorInput2 input2 = new SampleRemoteMonitorInput2("hello", - new DocLevelMonitorInput("test", List.of("test"), List.of(new DocLevelQuery("query", "query", List.of(), "test:1", List.of())))); + new DocLevelMonitorInput("test", List.of("test"), List.of(new DocLevelQuery("query", "query", List.of(), "test:1", List.of())), true)); BytesStreamOutput out1 = new BytesStreamOutput(); input2.writeTo(out1); BytesReference input1Serialized1 = out1.bytes(); @@ -220,7 +220,7 @@ public void onFailure(Exception e) { sampleRemoteDocLevelMonitorInput.writeTo(out2); BytesReference sampleRemoteDocLevelMonitorInputSerialized = out2.bytes(); - DocLevelMonitorInput docLevelMonitorInput = new DocLevelMonitorInput("description", index, emptyList()); + DocLevelMonitorInput docLevelMonitorInput = new DocLevelMonitorInput("description", index, emptyList(), true); RemoteDocLevelMonitorInput remoteDocLevelMonitorInput = new RemoteDocLevelMonitorInput(sampleRemoteDocLevelMonitorInputSerialized, docLevelMonitorInput); Monitor remoteDocLevelMonitor = new Monitor(