From 17dc3c213f018dd6633903546a41c68b3326056b Mon Sep 17 00:00:00 2001 From: David An Date: Tue, 10 Dec 2024 15:36:54 -0500 Subject: [PATCH] send failed notification if cost cap aborted some workflows (#3152) --- .../rawls/jobexec/SubmissionMonitorActor.scala | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/broadinstitute/dsde/rawls/jobexec/SubmissionMonitorActor.scala b/core/src/main/scala/org/broadinstitute/dsde/rawls/jobexec/SubmissionMonitorActor.scala index 98831edc8d..a55073434e 100644 --- a/core/src/main/scala/org/broadinstitute/dsde/rawls/jobexec/SubmissionMonitorActor.scala +++ b/core/src/main/scala/org/broadinstitute/dsde/rawls/jobexec/SubmissionMonitorActor.scala @@ -316,6 +316,7 @@ trait SubmissionMonitor extends FutureSupport with LazyLogging with RawlsInstrum for { costBreakdown <- executionServiceCluster.getCost(workflowRec, petUser) updatedWorkflowRec <- + // TODO CORE-217: if workflow is already in Aborted or Aborting, don't try to re-abort it if (costBreakdown.cost > costCap) { executionServiceCluster.abort(workflowRec, petUser).map { case Success(abortedWfRec) => @@ -330,6 +331,9 @@ trait SubmissionMonitor extends FutureSupport with LazyLogging with RawlsInstrum Option(workflowRec.copy(status = costBreakdown.status, cost = costBreakdown.cost.some)) } } else { + // TODO CORE-217: don't update unless status or cost has actually changed? + // Do we need to incrementally update cost? + // If we track cost changes on every iteration, we're going to blow up the AUDIT_WORKFLOW_STATUS table Future.successful(Option(workflowRec.copy(status = costBreakdown.status, cost = costBreakdown.cost.some))) } } yield updatedWorkflowRec @@ -510,7 +514,9 @@ trait SubmissionMonitor extends FutureSupport with LazyLogging with RawlsInstrum val dataEntity = submission.submissionEntity.fold("N/A")(entity => s"${entity.entityName} (${entity.entityType})" ) // Format: my_sample (sample) - val hasFailedWorkflows = submission.workflows.exists(_.status.equals(WorkflowStatuses.Failed)) + val hasFailedOrAbortedWorkflows = submission.workflows.exists(wf => + wf.status.equals(WorkflowStatuses.Failed) || wf.status.equals(WorkflowStatuses.Aborted) + ) val notificationWorkspaceName = Notifications.WorkspaceName(workspaceName.namespace, workspaceName.name) val userComment = submission.userComment.getOrElse("N/A") @@ -528,7 +534,7 @@ trait SubmissionMonitor extends FutureSupport with LazyLogging with RawlsInstrum userComment ) ) - case SubmissionStatuses.Done if hasFailedWorkflows => + case SubmissionStatuses.Done if hasFailedOrAbortedWorkflows => Some( FailedSubmissionNotification( recipientUserId, @@ -541,7 +547,7 @@ trait SubmissionMonitor extends FutureSupport with LazyLogging with RawlsInstrum userComment ) ) - case SubmissionStatuses.Done if !hasFailedWorkflows => + case SubmissionStatuses.Done if !hasFailedOrAbortedWorkflows => Some( SuccessfulSubmissionNotification( recipientUserId, @@ -556,7 +562,7 @@ trait SubmissionMonitor extends FutureSupport with LazyLogging with RawlsInstrum ) case _ => logger.info( - s"Unable to send terminal submission notification for ${submissionId}. State was unexpected: status: ${finalStatus}, hasFailedWorkflows: ${hasFailedWorkflows}" + s"Unable to send terminal submission notification for ${submissionId}. State was unexpected: status: ${finalStatus}, hasFailedOrAbortedWorkflows: ${hasFailedOrAbortedWorkflows}" ) None }