diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt index 557c6db88..ccea577bc 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt @@ -306,8 +306,10 @@ class InputService( resolvedIndexes.add(indexMetadata.index.name) includePrevious = false // No need to include previous anymore } else if ( - includePrevious && i > 0 && sortedIndices[i - 1].creationDate < - resolveStartTimeOfQueryTimeRange.toEpochMilli() + includePrevious && ( + i == sortedIndices.lastIndex || + sortedIndices[i + 1].creationDate >= resolveStartTimeOfQueryTimeRange.toEpochMilli() + ) ) { // Include the index immediately before the timestamp resolvedIndexes.add(indexMetadata.index.name) @@ -315,6 +317,7 @@ class InputService( } } } else { + // add alias without optimizing for resolve indices resolvedIndexes.add(it) } } else { diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt index bea6531e5..4ae4cc923 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt @@ -1241,6 +1241,25 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { } } + protected fun insertSampleTimeSerializedDataWithTime( + index: String, + data: List, + time: ZonedDateTime? = ZonedDateTime.now().truncatedTo(ChronoUnit.MILLIS), + ) { + data.forEachIndexed { i, value -> + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(time) + val testDoc = """ + { + "test_strict_date_time": "$testTime", + "test_field": "$value", + "number": "$i" + } + """.trimIndent() + // Indexing documents with deterministic doc id to allow for easy selected deletion during testing + indexDoc(index, (i + 1).toString(), testDoc) + } + } + protected fun deleteDataWithDocIds(index: String, docIds: List) { docIds.forEach { deleteDoc(index, it) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt index 98c41e499..c017bf4a2 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt @@ -1191,16 +1191,10 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() { assertEquals("Incorrect search result", 2, buckets.size) } - fun `test execute bucket-level monitor with alias optimization - indices not skipped`() { - val testIndex = createTestIndex() - insertSampleTimeSerializedDataCurrentTime( - testIndex, - listOf( - "test_value_3", - "test_value_4", // adding duplicate to verify aggregation - "test_value_5" - ) - ) + fun `test execute bucket-level monitor with alias optimization - indices not skipped from query`() { + val skipIndex = createTestIndex("to_skip_index") + val previousIndex = createTestIndex("to_include_index") + val indexMapping = """ "properties" : { "test_strict_date_time" : { "type" : "date", "format" : "strict_date_time" }, @@ -1218,7 +1212,24 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() { "test_value_2" ) ) - addIndexToAlias(testIndex, aliasName) + insertSampleTimeSerializedDataWithTime( + previousIndex, + listOf( + "test_value_3", + "test_value_4", + "test_value_5" + ) + ) + insertSampleTimeSerializedDataWithTime( + skipIndex, + listOf( + "test_value_6", + "test_value_7", + "test_value_8" + ) + ) + addIndexToAlias(previousIndex, aliasName) + addIndexToAlias(skipIndex, aliasName) val query = QueryBuilders.rangeQuery("test_strict_date_time") .gt("{{period_end}}||-10s") .lte("{{period_end}}") @@ -1251,18 +1262,30 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() { val searchResult = (output.objectMap("input_results")["results"] as List>).first() @Suppress("UNCHECKED_CAST") val buckets = searchResult.stringMap("aggregations")?.stringMap("composite_agg")?.get("buckets") as List> - assertEquals("Incorrect search result", 5, buckets.size) + Assert.assertEquals(buckets.size, 8) } fun `test execute bucket-level monitor with alias optimization - indices skipped from query`() { - val testIndex = createTestIndex() - insertSampleTimeSerializedDataCurrentTime( - testIndex, + val skipIndex = createTestIndex("to_skip_index") + Thread.sleep(10000) + val previousIndex = createTestIndex("to_include_index") + insertSampleTimeSerializedDataWithTime( + previousIndex, listOf( - "test_value_1", - "test_value_1", // adding duplicate to verify aggregation - "test_value_2" - ) + "test_value_3", + "test_value_4", + "test_value_5" + ), + ZonedDateTime.now().truncatedTo(ChronoUnit.MILLIS).plusSeconds(10) + ) + insertSampleTimeSerializedDataWithTime( + skipIndex, + listOf( + "test_value_6", + "test_value_7", + "test_value_8" + ), + ZonedDateTime.now().truncatedTo(ChronoUnit.MILLIS).plusSeconds(10) ) Thread.sleep(10000) val indexMapping = """ @@ -1282,7 +1305,8 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() { "test_value_2" ) ) - addIndexToAlias(testIndex, aliasName) + addIndexToAlias(previousIndex, aliasName) + addIndexToAlias(skipIndex, aliasName) val query = QueryBuilders.rangeQuery("test_strict_date_time") .gt("{{period_end}}||-10s") .lte("{{period_end}}") @@ -1315,7 +1339,7 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() { val searchResult = (output.objectMap("input_results")["results"] as List>).first() @Suppress("UNCHECKED_CAST") val buckets = searchResult.stringMap("aggregations")?.stringMap("composite_agg")?.get("buckets") as List> - Assert.assertTrue(buckets.size <= 2) + Assert.assertTrue(buckets.size <= 5) } fun `test execute bucket-level monitor returns search result with multi term agg`() {