Skip to content

Commit

Permalink
update code-review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Subhobrata Dey <[email protected]>
  • Loading branch information
sbcd90 committed Nov 26, 2024
1 parent 14f9371 commit 3495cf8
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
}

// Map of document ids per index when monitor is workflow delegate and has chained findings
val matchingDocIdsPerIndex = workflowRunContext?.matchingDocIdsPerIndex
val matchingDocIdsPerIndex = workflowRunContext?.matchingDocIdsPerIndex?.first
val findingIdsForMatchingDocIds = workflowRunContext?.matchingDocIdsPerIndex?.second

val concreteIndicesSeenSoFar = mutableListOf<String>()
val updatedIndexNames = mutableListOf<String>()
Expand Down Expand Up @@ -226,6 +227,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
concreteIndices,
conflictingFields.toList(),
matchingDocIdsPerIndex?.get(concreteIndexName),
findingIdsForMatchingDocIds
)

val shards = mutableSetOf<String>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class InputService(
periodStart = periodStart,
periodEnd = periodEnd,
prevResult = prevResult,
matchingDocIdsPerIndex = matchingDocIdsPerIndex,
matchingDocIdsPerIndex = matchingDocIdsPerIndex?.first,
returnSampleDocs = false
)
val searchResponse: SearchResponse = client.suspendUntil { client.search(searchRequest, it) }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,16 @@ class WorkflowService(
* @param chainedMonitors Monitors that have previously executed
* @param workflowExecutionId Execution id of the current workflow
*/
suspend fun getFindingDocIdsByExecutionId(chainedMonitors: List<Monitor>, workflowExecutionId: String): Map<String, List<String>> {
suspend fun getFindingDocIdsByExecutionId(chainedMonitors: List<Monitor>, workflowExecutionId: String):
Pair<Map<String, List<String>>, List<String>> {
if (chainedMonitors.isEmpty())
return emptyMap()
return Pair(emptyMap(), listOf())
val dataSources = chainedMonitors[0].dataSources
try {
val existsResponse: IndicesExistsResponse = client.admin().indices().suspendUntil {
exists(IndicesExistsRequest(dataSources.findingsIndex).local(true), it)
}
if (existsResponse.isExists == false) return emptyMap()
if (existsResponse.isExists == false) return Pair(emptyMap(), listOf())
// Search findings index to match id of monitors and workflow execution id
val bqb = QueryBuilders.boolQuery()
.filter(
Expand Down Expand Up @@ -83,7 +84,7 @@ class WorkflowService(
for (finding in findings) {
indexToRelatedDocIdsMap.getOrPut(finding.index) { mutableListOf() }.addAll(finding.relatedDocIds)
}
return indexToRelatedDocIdsMap
return Pair(indexToRelatedDocIdsMap, findings.map { it.id })
} catch (t: Exception) {
log.error("Error getting finding doc ids: ${t.message}", t)
throw AlertingException.wrap(t)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,11 @@ class TransportDocLevelMonitorFanOutAction
createFindings(monitor, docsToQueries, idQueryMap, true)
}
} else {
if (monitor.ignoreFindingsAndAlerts == null || monitor.ignoreFindingsAndAlerts == false) {
/**
* if should_persist_findings_and_alerts flag is not set, doc-level trigger generates alerts else doc-level trigger
* generates a single alert with multiple findings.
*/
if (monitor.shouldPersistFindingsAndAlerts == null || monitor.shouldPersistFindingsAndAlerts == false) {
monitor.triggers.forEach {
triggerResults[it.id] = runForEachDocTrigger(
monitorResult,
Expand All @@ -312,9 +316,9 @@ class TransportDocLevelMonitorFanOutAction
workflowRunContext = workflowRunContext
)
}
} else if (monitor.ignoreFindingsAndAlerts == true) {
} else if (monitor.shouldPersistFindingsAndAlerts == true) {
monitor.triggers.forEach {
triggerResults[it.id] = runForEachDocTriggerIgnoringFindingsAndAlerts(
triggerResults[it.id] = runForEachDocTriggerWithoutPersistFindingsAndAlerts(
monitorResult,
it as DocumentLevelTrigger,
monitor,
Expand Down Expand Up @@ -363,7 +367,10 @@ class TransportDocLevelMonitorFanOutAction
}
}

private suspend fun runForEachDocTriggerIgnoringFindingsAndAlerts(
/**
* run doc-level triggers ignoring findings and alerts and generating a single alert.
*/
private suspend fun runForEachDocTriggerWithoutPersistFindingsAndAlerts(
monitorResult: MonitorRunResult<DocumentLevelTriggerRunResult>,
trigger: DocumentLevelTrigger,
monitor: Monitor,
Expand All @@ -374,9 +381,14 @@ class TransportDocLevelMonitorFanOutAction
): DocumentLevelTriggerRunResult {
val triggerResult = triggerService.runDocLevelTrigger(monitor, trigger, queryToDocIds)
if (triggerResult.triggeredDocs.isNotEmpty()) {
val findingIds = if (workflowRunContext?.matchingDocIdsPerIndex?.second != null) {
workflowRunContext.matchingDocIdsPerIndex.second
} else {
listOf()
}
val triggerCtx = DocumentLevelTriggerExecutionContext(monitor, trigger)
val alert = alertService.composeDocLevelAlert(
listOf(),
findingIds,
triggerResult.triggeredDocs,
triggerCtx,
monitorResult.alertError() ?: triggerResult.alertError(),
Expand Down Expand Up @@ -570,7 +582,7 @@ class TransportDocLevelMonitorFanOutAction
.string()
log.debug("Findings: $findingStr")

if (shouldCreateFinding and (monitor.ignoreFindingsAndAlerts == null || monitor.ignoreFindingsAndAlerts == false)) {
if (shouldCreateFinding and (monitor.shouldPersistFindingsAndAlerts == null || monitor.shouldPersistFindingsAndAlerts == false)) {
indexRequests += IndexRequest(monitor.dataSources.findingsIndex)
.source(findingStr, XContentType.JSON)
.id(finding.id)
Expand All @@ -582,7 +594,7 @@ class TransportDocLevelMonitorFanOutAction
bulkIndexFindings(monitor, indexRequests)
}

if (monitor.ignoreFindingsAndAlerts == null || monitor.ignoreFindingsAndAlerts == false) {
if (monitor.shouldPersistFindingsAndAlerts == null || monitor.shouldPersistFindingsAndAlerts == false) {
try {
findings.forEach { finding ->
publishFinding(monitor, finding)
Expand Down Expand Up @@ -945,11 +957,11 @@ class TransportDocLevelMonitorFanOutAction
val boolQueryBuilder = BoolQueryBuilder()
boolQueryBuilder.filter(QueryBuilders.rangeQuery("_seq_no").gt(prevSeqNo).lte(maxSeqNo))

if (monitor.ignoreFindingsAndAlerts == null || monitor.ignoreFindingsAndAlerts == false) {
if (monitor.shouldPersistFindingsAndAlerts == null || monitor.shouldPersistFindingsAndAlerts == false) {
if (!docIds.isNullOrEmpty()) {
boolQueryBuilder.filter(QueryBuilders.termsQuery("_id", docIds))
}
} else if (monitor.ignoreFindingsAndAlerts == true) {
} else if (monitor.shouldPersistFindingsAndAlerts == true) {
val docIdsParam = mutableListOf<String>()
if (docIds != null) {
docIdsParam.addAll(docIds)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ object CompositeWorkflowRunner : WorkflowRunner() {
var lastErrorDelegateRun: Exception? = null

for (delegate in delegates) {
var indexToDocIds = mapOf<String, List<String>>()
var indexToDocIdsWithFindings: Pair<Map<String, List<String>>, List<String>>? = Pair(mapOf(), listOf())
var delegateMonitor: Monitor
delegateMonitor = monitorsById[delegate.monitorId]
?: throw AlertingException.wrap(
Expand All @@ -118,7 +118,7 @@ object CompositeWorkflowRunner : WorkflowRunner() {
}

try {
indexToDocIds = monitorCtx.workflowService!!.getFindingDocIdsByExecutionId(chainedMonitors, executionId)
indexToDocIdsWithFindings = monitorCtx.workflowService!!.getFindingDocIdsByExecutionId(chainedMonitors, executionId)
} catch (e: Exception) {
logger.error("Failed to execute workflow due to failure in chained findings. Error: ${e.message}", e)
return WorkflowRunResult(
Expand All @@ -131,7 +131,7 @@ object CompositeWorkflowRunner : WorkflowRunner() {
workflowId = workflowMetadata.workflowId,
workflowMetadataId = workflowMetadata.id,
chainedMonitorId = delegate.chainedMonitorFindings?.monitorId,
matchingDocIdsPerIndex = indexToDocIds,
matchingDocIdsPerIndex = indexToDocIdsWithFindings!!,
auditDelegateMonitorAlerts = if (workflow.auditDelegateMonitorAlerts == null) true
else workflow.auditDelegateMonitorAlerts!!
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient
Map.of(),
new DataSources(),
false,
false,
"sample-remote-monitor-plugin"
);
IndexMonitorRequest indexMonitorRequest1 = new IndexMonitorRequest(
Expand Down Expand Up @@ -156,6 +157,7 @@ public void onFailure(Exception e) {
Map.of(),
new DataSources(),
false,
false,
"sample-remote-monitor-plugin"
);
IndexMonitorRequest indexMonitorRequest2 = new IndexMonitorRequest(
Expand Down Expand Up @@ -240,6 +242,7 @@ public void onFailure(Exception e) {
Map.of(),
new DataSources(),
false,
false,
"sample-remote-monitor-plugin"
);
IndexMonitorRequest indexDocLevelMonitorRequest = new IndexMonitorRequest(
Expand Down

0 comments on commit 3495cf8

Please sign in to comment.