diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index d32f4789e..7262b9260 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -90,11 +90,8 @@ class DocumentLevelMonitorRunner : MonitorRunner() { * Docs are fetched from the source index per shard and transformed.*/ val transformedDocs = mutableListOf>() - // Maps a finding ID to the concrete index name. - val findingIdToConcreteIndex = mutableMapOf() - - // Maps the docId to the doc source - val docIdToDocMap = mutableMapOf>() + // Maps a finding ID to the related document. + private val findingIdToDocSource = mutableMapOf() override suspend fun runMonitor( monitor: Monitor, @@ -489,10 +486,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() { ) alerts.add(alert) - val docId = alert.relatedDocIds.first().split("|").first() - val docSource = docIdToDocMap[docId]?.find { item -> - findingIdToConcreteIndex[alert.findingIds.first()] == item.index - }?.response?.convertToMap() + val docSource = findingIdToDocSource[alert.findingIds.first()]?.response?.convertToMap() alertContexts.add( AlertContext( @@ -591,7 +585,6 @@ class DocumentLevelMonitorRunner : MonitorRunner() { findingDocPairs.add(Pair(finding.id, it.key)) findings.add(finding) findingsToTriggeredQueries[finding.id] = triggeredQueries - findingIdToConcreteIndex[finding.id] = finding.index val findingStr = finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS) @@ -1104,21 +1097,25 @@ class DocumentLevelMonitorRunner : MonitorRunner() { ) { val docFieldTags = parseSampleDocTags(monitor.triggers) val request = MultiGetRequest() - findingToDocPairs.forEach { (_, docIdAndIndex) -> - val docIdAndIndexSplit = docIdAndIndex.split("|") - val docId = docIdAndIndexSplit[0] - val concreteIndex = docIdAndIndexSplit[1] - if (docId.isNotEmpty() && concreteIndex.isNotEmpty()) { - val docItem = MultiGetRequest.Item(concreteIndex, docId) - if (docFieldTags.isNotEmpty()) - docItem.fetchSourceContext(FetchSourceContext(true, docFieldTags.toTypedArray(), emptyArray())) - request.add(docItem) + + // Perform mGet request in batches. + findingToDocPairs.chunked(monitorCtx.findingsIndexBatchSize).forEach { batch -> + batch.forEach { (findingId, docIdAndIndex) -> + val docIdAndIndexSplit = docIdAndIndex.split("|") + val docId = docIdAndIndexSplit[0] + val concreteIndex = docIdAndIndexSplit[1] + if (findingId.isNotEmpty() && docId.isNotEmpty() && concreteIndex.isNotEmpty()) { + val docItem = MultiGetRequest.Item(concreteIndex, docId) + if (docFieldTags.isNotEmpty()) + docItem.fetchSourceContext(FetchSourceContext(true, docFieldTags.toTypedArray(), emptyArray())) + request.add(docItem) + } + val response = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.multiGet(request, it) } + response.responses.forEach { item -> + findingIdToDocSource[findingId] = item + } } } - val response = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.multiGet(request, it) } - response.responses.forEach { item -> - docIdToDocMap.getOrPut(item.id) { mutableListOf() }.add(item) - } } /**