Skip to content

Commit

Permalink
Added logic to make mGet calls in batches.
Browse files Browse the repository at this point in the history
Signed-off-by: AWSHurneyt <[email protected]>
  • Loading branch information
AWSHurneyt committed Mar 14, 2024
1 parent 235056c commit e61e602
Showing 1 changed file with 20 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,8 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
* Docs are fetched from the source index per shard and transformed.*/
val transformedDocs = mutableListOf<Pair<String, TransformedDocDto>>()

// Maps a finding ID to the concrete index name.
val findingIdToConcreteIndex = mutableMapOf<String, String>()

// Maps the docId to the doc source
val docIdToDocMap = mutableMapOf<String, MutableList<MultiGetItemResponse>>()
// Maps a finding ID to the related document.
private val findingIdToDocSource = mutableMapOf<String, MultiGetItemResponse>()

override suspend fun runMonitor(
monitor: Monitor,
Expand Down Expand Up @@ -489,10 +486,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
)
alerts.add(alert)

val docId = alert.relatedDocIds.first().split("|").first()
val docSource = docIdToDocMap[docId]?.find { item ->
findingIdToConcreteIndex[alert.findingIds.first()] == item.index
}?.response?.convertToMap()
val docSource = findingIdToDocSource[alert.findingIds.first()]?.response?.convertToMap()

alertContexts.add(
AlertContext(
Expand Down Expand Up @@ -591,7 +585,6 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
findingDocPairs.add(Pair(finding.id, it.key))
findings.add(finding)
findingsToTriggeredQueries[finding.id] = triggeredQueries
findingIdToConcreteIndex[finding.id] = finding.index

val findingStr =
finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS)
Expand Down Expand Up @@ -1104,21 +1097,25 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
) {
val docFieldTags = parseSampleDocTags(monitor.triggers)
val request = MultiGetRequest()
findingToDocPairs.forEach { (_, docIdAndIndex) ->
val docIdAndIndexSplit = docIdAndIndex.split("|")
val docId = docIdAndIndexSplit[0]
val concreteIndex = docIdAndIndexSplit[1]
if (docId.isNotEmpty() && concreteIndex.isNotEmpty()) {
val docItem = MultiGetRequest.Item(concreteIndex, docId)
if (docFieldTags.isNotEmpty())
docItem.fetchSourceContext(FetchSourceContext(true, docFieldTags.toTypedArray(), emptyArray()))
request.add(docItem)

// Perform mGet request in batches.
findingToDocPairs.chunked(monitorCtx.findingsIndexBatchSize).forEach { batch ->
batch.forEach { (findingId, docIdAndIndex) ->
val docIdAndIndexSplit = docIdAndIndex.split("|")
val docId = docIdAndIndexSplit[0]
val concreteIndex = docIdAndIndexSplit[1]
if (findingId.isNotEmpty() && docId.isNotEmpty() && concreteIndex.isNotEmpty()) {
val docItem = MultiGetRequest.Item(concreteIndex, docId)
if (docFieldTags.isNotEmpty())
docItem.fetchSourceContext(FetchSourceContext(true, docFieldTags.toTypedArray(), emptyArray()))
request.add(docItem)
}
val response = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.multiGet(request, it) }
response.responses.forEach { item ->
findingIdToDocSource[findingId] = item
}
}
}
val response = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.multiGet(request, it) }
response.responses.forEach { item ->
docIdToDocMap.getOrPut(item.id) { mutableListOf() }.add(item)
}
}

/**
Expand Down

0 comments on commit e61e602

Please sign in to comment.