Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] optimize execution of workflow consisting of bucket-level followed by doc-level monitors #1752

Merged
merged 1 commit into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ import org.opensearch.search.aggregations.AggregatorFactories
import org.opensearch.search.aggregations.bucket.composite.CompositeAggregationBuilder
import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder
import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.search.sort.SortOrder
import org.opensearch.transport.TransportService
import java.time.Instant
import java.util.UUID
Expand Down Expand Up @@ -486,7 +487,7 @@ object BucketLevelMonitorRunner : MonitorRunner() {
val queryBuilder = if (input.query.query() == null) BoolQueryBuilder()
else QueryBuilders.boolQuery().must(source.query())
queryBuilder.filter(QueryBuilders.termsQuery(fieldName, bucketValues))
sr.source().query(queryBuilder)
sr.source().query(queryBuilder).sort("_seq_no", SortOrder.DESC)
}
sr.cancelAfterTimeInterval = TimeValue.timeValueMinutes(
getCancelAfterTimeInterval()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,11 @@ class DocumentLevelMonitorRunner : MonitorRunner() {

// Map of document ids per index when monitor is workflow delegate and has chained findings
val matchingDocIdsPerIndex = workflowRunContext?.matchingDocIdsPerIndex
val findingIdsForMatchingDocIds = if (workflowRunContext?.findingIds != null) {
workflowRunContext.findingIds
} else {
listOf()
}

val concreteIndicesSeenSoFar = mutableListOf<String>()
val updatedIndexNames = mutableListOf<String>()
Expand Down Expand Up @@ -226,6 +231,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
concreteIndices,
conflictingFields.toList(),
matchingDocIdsPerIndex?.get(concreteIndexName),
findingIdsForMatchingDocIds
)

val shards = mutableSetOf<String>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,16 @@ class WorkflowService(
* @param chainedMonitors Monitors that have previously executed
* @param workflowExecutionId Execution id of the current workflow
*/
suspend fun getFindingDocIdsByExecutionId(chainedMonitors: List<Monitor>, workflowExecutionId: String): Map<String, List<String>> {
suspend fun getFindingDocIdsByExecutionId(chainedMonitors: List<Monitor>, workflowExecutionId: String):
Pair<Map<String, List<String>>, List<String>> {
if (chainedMonitors.isEmpty())
return emptyMap()
return Pair(emptyMap(), listOf())
val dataSources = chainedMonitors[0].dataSources
try {
val existsResponse: IndicesExistsResponse = client.admin().indices().suspendUntil {
exists(IndicesExistsRequest(dataSources.findingsIndex).local(true), it)
}
if (existsResponse.isExists == false) return emptyMap()
if (existsResponse.isExists == false) return Pair(emptyMap(), listOf())
// Search findings index to match id of monitors and workflow execution id
val bqb = QueryBuilders.boolQuery()
.filter(
Expand Down Expand Up @@ -83,7 +84,7 @@ class WorkflowService(
for (finding in findings) {
indexToRelatedDocIdsMap.getOrPut(finding.index) { mutableListOf() }.addAll(finding.relatedDocIds)
}
return indexToRelatedDocIdsMap
return Pair(indexToRelatedDocIdsMap, findings.map { it.id })
} catch (t: Exception) {
log.error("Error getting finding doc ids: ${t.message}", t)
throw AlertingException.wrap(t)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,19 +296,37 @@ class TransportDocLevelMonitorFanOutAction
createFindings(monitor, docsToQueries, idQueryMap, true)
}
} else {
monitor.triggers.forEach {
triggerResults[it.id] = runForEachDocTrigger(
monitorResult,
it as DocumentLevelTrigger,
monitor,
idQueryMap,
docsToQueries,
queryToDocIds,
dryrun,
executionId = executionId,
findingIdToDocSource,
workflowRunContext = workflowRunContext
)
/**
* if should_persist_findings_and_alerts flag is not set, doc-level trigger generates alerts else doc-level trigger
* generates a single alert with multiple findings.
*/
if (monitor.shouldCreateSingleAlertForFindings == null || monitor.shouldCreateSingleAlertForFindings == false) {
monitor.triggers.forEach {
triggerResults[it.id] = runForEachDocTrigger(
monitorResult,
it as DocumentLevelTrigger,
monitor,
idQueryMap,
docsToQueries,
queryToDocIds,
dryrun,
executionId = executionId,
findingIdToDocSource,
workflowRunContext = workflowRunContext
)
}
} else if (monitor.shouldCreateSingleAlertForFindings == true) {
monitor.triggers.forEach {
triggerResults[it.id] = runForEachDocTriggerCreateSingleGroupedAlert(
monitorResult,
it as DocumentLevelTrigger,
monitor,
queryToDocIds,
dryrun,
executionId,
workflowRunContext
)
}
}
}

Expand Down Expand Up @@ -348,6 +366,58 @@ class TransportDocLevelMonitorFanOutAction
}
}

/**
* run doc-level triggers ignoring findings and alerts and generating a single alert.
*/
private suspend fun runForEachDocTriggerCreateSingleGroupedAlert(
monitorResult: MonitorRunResult<DocumentLevelTriggerRunResult>,
trigger: DocumentLevelTrigger,
monitor: Monitor,
queryToDocIds: Map<DocLevelQuery, Set<String>>,
dryrun: Boolean,
executionId: String,
workflowRunContext: WorkflowRunContext?
): DocumentLevelTriggerRunResult {
val triggerResult = triggerService.runDocLevelTrigger(monitor, trigger, queryToDocIds)
if (triggerResult.triggeredDocs.isNotEmpty()) {
val findingIds = if (workflowRunContext?.findingIds != null) {
workflowRunContext.findingIds
} else {
listOf()
}
val triggerCtx = DocumentLevelTriggerExecutionContext(monitor, trigger)
val alert = alertService.composeDocLevelAlert(
findingIds!!,
triggerResult.triggeredDocs,
triggerCtx,
monitorResult.alertError() ?: triggerResult.alertError(),
executionId = executionId,
workflorwRunContext = workflowRunContext
)
for (action in trigger.actions) {
this.runAction(action, triggerCtx.copy(alerts = listOf(AlertContext(alert))), monitor, dryrun)
}

if (!dryrun && monitor.id != Monitor.NO_ID) {
val actionResults = triggerResult.actionResultsMap.getOrDefault(alert.id, emptyMap())
val actionExecutionResults = actionResults.values.map { actionRunResult ->
ActionExecutionResult(actionRunResult.actionId, actionRunResult.executionTime, if (actionRunResult.throttled) 1 else 0)
}
val updatedAlert = alert.copy(actionExecutionResults = actionExecutionResults)

retryPolicy.let {
alertService.saveAlerts(
monitor.dataSources,
listOf(updatedAlert),
it,
routingId = monitor.id
)
}
}
}
return DocumentLevelTriggerRunResult(trigger.name, listOf(), monitorResult.error)
}

private suspend fun runForEachDocTrigger(
monitorResult: MonitorRunResult<DocumentLevelTriggerRunResult>,
trigger: DocumentLevelTrigger,
Expand Down Expand Up @@ -511,7 +581,11 @@ class TransportDocLevelMonitorFanOutAction
.string()
log.debug("Findings: $findingStr")

if (shouldCreateFinding) {
if (shouldCreateFinding and (
monitor.shouldCreateSingleAlertForFindings == null ||
monitor.shouldCreateSingleAlertForFindings == false
)
) {
indexRequests += IndexRequest(monitor.dataSources.findingsIndex)
.source(findingStr, XContentType.JSON)
.id(finding.id)
Expand All @@ -523,13 +597,15 @@ class TransportDocLevelMonitorFanOutAction
bulkIndexFindings(monitor, indexRequests)
}

try {
findings.forEach { finding ->
publishFinding(monitor, finding)
if (monitor.shouldCreateSingleAlertForFindings == null || monitor.shouldCreateSingleAlertForFindings == false) {
try {
findings.forEach { finding ->
publishFinding(monitor, finding)
}
} catch (e: Exception) {
// suppress exception
log.error("Optional finding callback failed", e)
}
} catch (e: Exception) {
// suppress exception
log.error("Optional finding callback failed", e)
}
this.findingsToTriggeredQueries += findingsToTriggeredQueries

Expand Down Expand Up @@ -687,6 +763,7 @@ class TransportDocLevelMonitorFanOutAction
var to: Long = Long.MAX_VALUE
while (to >= from) {
val hits: SearchHits = searchShard(
monitor,
indexExecutionCtx.concreteIndexName,
shard,
from,
Expand Down Expand Up @@ -869,6 +946,7 @@ class TransportDocLevelMonitorFanOutAction
* This method hence fetches only docs from shard which haven't been queried before
*/
private suspend fun searchShard(
monitor: Monitor,
index: String,
shard: String,
prevSeqNo: Long?,
Expand All @@ -882,8 +960,16 @@ class TransportDocLevelMonitorFanOutAction
val boolQueryBuilder = BoolQueryBuilder()
boolQueryBuilder.filter(QueryBuilders.rangeQuery("_seq_no").gt(prevSeqNo).lte(maxSeqNo))

if (!docIds.isNullOrEmpty()) {
boolQueryBuilder.filter(QueryBuilders.termsQuery("_id", docIds))
if (monitor.shouldCreateSingleAlertForFindings == null || monitor.shouldCreateSingleAlertForFindings == false) {
if (!docIds.isNullOrEmpty()) {
boolQueryBuilder.filter(QueryBuilders.termsQuery("_id", docIds))
}
} else if (monitor.shouldCreateSingleAlertForFindings == true) {
val docIdsParam = mutableListOf<String>()
if (docIds != null) {
docIdsParam.addAll(docIds)
}
boolQueryBuilder.filter(QueryBuilders.termsQuery("_id", docIdsParam))
}

val request: SearchRequest = SearchRequest()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ object CompositeWorkflowRunner : WorkflowRunner() {
var lastErrorDelegateRun: Exception? = null

for (delegate in delegates) {
var indexToDocIds = mapOf<String, List<String>>()
var indexToDocIdsWithFindings: Pair<Map<String, List<String>>, List<String>>? = Pair(mapOf(), listOf())
var delegateMonitor: Monitor
delegateMonitor = monitorsById[delegate.monitorId]
?: throw AlertingException.wrap(
Expand All @@ -117,7 +117,7 @@ object CompositeWorkflowRunner : WorkflowRunner() {
}

try {
indexToDocIds = monitorCtx.workflowService!!.getFindingDocIdsByExecutionId(chainedMonitors, executionId)
indexToDocIdsWithFindings = monitorCtx.workflowService!!.getFindingDocIdsByExecutionId(chainedMonitors, executionId)
} catch (e: Exception) {
logger.error("Failed to execute workflow due to failure in chained findings. Error: ${e.message}", e)
return WorkflowRunResult(
Expand All @@ -130,9 +130,10 @@ object CompositeWorkflowRunner : WorkflowRunner() {
workflowId = workflowMetadata.workflowId,
workflowMetadataId = workflowMetadata.id,
chainedMonitorId = delegate.chainedMonitorFindings?.monitorId,
matchingDocIdsPerIndex = indexToDocIds,
matchingDocIdsPerIndex = indexToDocIdsWithFindings!!.first,
auditDelegateMonitorAlerts = if (workflow.auditDelegateMonitorAlerts == null) true
else workflow.auditDelegateMonitorAlerts!!
else workflow.auditDelegateMonitorAlerts!!,
findingIds = indexToDocIdsWithFindings.second
)
try {
dataSources = delegateMonitor.dataSources
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6235,4 +6235,120 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
)
)
}

fun `test execute workflow when bucket monitor is used in chained finding of ignored doc monitor`() {
val query = QueryBuilders.rangeQuery("test_strict_date_time")
.gt("{{period_end}}||-10d")
.lte("{{period_end}}")
.format("epoch_millis")
val compositeSources = listOf(
TermsValuesSourceBuilder("test_field_1").field("test_field_1")
)
val compositeAgg = CompositeAggregationBuilder("composite_agg", compositeSources)
val input = SearchInput(indices = listOf(index), query = SearchSourceBuilder().size(0).query(query).aggregation(compositeAgg))
// Bucket level monitor will reduce the size of matched doc ids on those that belong
// to a bucket that contains more than 1 document after term grouping
val triggerScript = """
params.docCount > 1
""".trimIndent()

var trigger = randomBucketLevelTrigger()
trigger = trigger.copy(
bucketSelector = BucketSelectorExtAggregationBuilder(
name = trigger.id,
bucketsPathsMap = mapOf("docCount" to "_count"),
script = Script(triggerScript),
parentBucketPath = "composite_agg",
filter = null,
)
)
val bucketCustomAlertsIndex = "custom_alerts_index"
val bucketCustomFindingsIndex = "custom_findings_index"
val bucketCustomFindingsIndexPattern = "custom_findings_index-1"

val bucketLevelMonitorResponse = createMonitor(
randomBucketLevelMonitor(
inputs = listOf(input),
enabled = false,
triggers = listOf(trigger),
dataSources = DataSources(
findingsEnabled = true,
alertsIndex = bucketCustomAlertsIndex,
findingsIndex = bucketCustomFindingsIndex,
findingsIndexPattern = bucketCustomFindingsIndexPattern
)
)
)!!

val docQuery1 = DocLevelQuery(query = "test_field_1:\"test_value_2\"", name = "1", fields = listOf())
val docQuery2 = DocLevelQuery(query = "test_field_1:\"test_value_1\"", name = "2", fields = listOf())
val docQuery3 = DocLevelQuery(query = "test_field_1:\"test_value_3\"", name = "3", fields = listOf())
val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery1, docQuery2, docQuery3))
val docTrigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
val docCustomAlertsIndex = "custom_alerts_index"
val docCustomFindingsIndex = "custom_findings_index"
val docCustomFindingsIndexPattern = "custom_findings_index-1"
var docLevelMonitor = randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(docTrigger),
dataSources = DataSources(
alertsIndex = docCustomAlertsIndex,
findingsIndex = docCustomFindingsIndex,
findingsIndexPattern = docCustomFindingsIndexPattern
),
ignoreFindingsAndAlerts = true
)

val docLevelMonitorResponse = createMonitor(docLevelMonitor)!!
// 1. bucketMonitor (chainedFinding = null) 2. docMonitor (chainedFinding = bucketMonitor)
var workflow = randomWorkflow(
monitorIds = listOf(bucketLevelMonitorResponse.id, docLevelMonitorResponse.id),
enabled = false,
auditDelegateMonitorAlerts = false
)
val workflowResponse = upsertWorkflow(workflow)!!
val workflowById = searchWorkflow(workflowResponse.id)
assertNotNull(workflowById)

// Creates 5 documents
insertSampleTimeSerializedData(
index,
listOf(
"test_value_1",
"test_value_1", // adding duplicate to verify aggregation
"test_value_2",
"test_value_2",
"test_value_3"
)
)

val workflowId = workflowResponse.id
// 1. bucket level monitor should reduce the doc findings to 4 (1, 2, 3, 4)
// 2. Doc level monitor will match those 4 documents although it contains rules for matching all 5 documents (docQuery3 matches the fifth)
val executeWorkflowResponse = executeWorkflow(workflowById, workflowId, false)!!
assertNotNull(executeWorkflowResponse)

for (monitorRunResults in executeWorkflowResponse.workflowRunResult.monitorRunResults) {
if (bucketLevelMonitorResponse.monitor.name == monitorRunResults.monitorName) {
val searchResult = monitorRunResults.inputResults.results.first()

@Suppress("UNCHECKED_CAST")
val buckets = searchResult.stringMap("aggregations")?.stringMap("composite_agg")
?.get("buckets") as List<kotlin.collections.Map<String, Any>>
assertEquals("Incorrect search result", 3, buckets.size)

val getAlertsResponse = assertAlerts(bucketLevelMonitorResponse.id, bucketCustomAlertsIndex, 2, workflowId)
assertAcknowledges(getAlertsResponse.alerts, bucketLevelMonitorResponse.id, 2)
assertFindings(bucketLevelMonitorResponse.id, bucketCustomFindingsIndex, 1, 4, listOf("1", "2", "3", "4"))
} else {
assertEquals(1, monitorRunResults.inputResults.results.size)
val values = monitorRunResults.triggerResults.values
assertEquals(1, values.size)

val getAlertsResponse = assertAlerts(docLevelMonitorResponse.id, docCustomAlertsIndex, 1, workflowId)
assertAcknowledges(getAlertsResponse.alerts, docLevelMonitorResponse.id, 1)
assertFindings(docLevelMonitorResponse.id, docCustomFindingsIndex, 0, 0, listOf("1", "2", "3", "4"))
}
}
}
}
Loading
Loading