From ffdd3819bfce71a5c6692f65ccc7f94e741b497a Mon Sep 17 00:00:00 2001 From: Jorge Ejarque Date: Wed, 23 Oct 2024 12:02:06 +0200 Subject: [PATCH] Exposing trace record meta info in the task context (#5402) This commit introduces the ability to access `task.previousTrace` and `task.previousException` which give access to the trace runtime metrics and exception object of a previous failed task execution attempt. Signed-off-by: jorgee Signed-off-by: Paolo Di Tommaso Signed-off-by: Jorge Ejarque Co-authored-by: Paolo Di Tommaso Co-authored-by: Ben Sherman Co-authored-by: Christopher Hakkaart --- docs/process.md | 19 +++++++++++++++ docs/reference/process.md | 15 ++++++++++++ .../processor/TaskPollingMonitor.groovy | 6 +++-- .../nextflow/processor/TaskProcessor.groovy | 13 +++++++--- .../groovy/test/MockHelpers.groovy | 2 +- tests/checks/trace-access.nf/.checks | 11 +++++++++ tests/trace-access.nf | 24 +++++++++++++++++++ 7 files changed, 84 insertions(+), 6 deletions(-) create mode 100644 tests/checks/trace-access.nf/.checks create mode 100644 tests/trace-access.nf diff --git a/docs/process.md b/docs/process.md index d89f3b62e1..7d7d153d66 100644 --- a/docs/process.md +++ b/docs/process.md @@ -1283,6 +1283,25 @@ If the task execution fail reporting an exit status in the range between 137 and The directive {ref}`process-maxretries` set the maximum number of time the same task can be re-executed. +### Dynamic task resources with previous execution trace +:::{versionadded} 24.10.0 +::: + +Task resource requests can be updated relative to the {ref}`trace record <trace-report>` metrics of the previous task attempt. The metrics can be accessed through the `task.previousTrace` variable. For example: + +```groovy +process foo { + memory { task.attempt > 1 ? task.previousTrace.memory * 2 : (1.GB) } + errorStrategy { task.exitStatus in 137..140 ? 
'retry' : 'terminate' } + maxRetries 3 + + script: + <your job here> +} +``` +In the above example, the {ref}`process-memory` is set according to previous trace record metrics. In the first attempt, when no trace metrics are available, it is set to one GB. In the subsequent attempts, it doubles the previously allocated memory. See {ref}`trace-report` for more information about trace records. + + ### Dynamic retry with backoff There are cases in which the required execution resources may be temporary unavailable e.g. network congestion. In these cases immediately re-executing the task will likely result in the identical error. A retry with an exponential backoff delay can better recover these error conditions: diff --git a/docs/reference/process.md b/docs/reference/process.md index 7f3db05073..2c7ca3af33 100644 --- a/docs/reference/process.md +++ b/docs/reference/process.md @@ -26,6 +26,21 @@ The following task properties are defined in the process body: : *Available only in `exec:` blocks* : The current task name. +`task.previousException` +: :::{versionadded} 24.10.0 + ::: +: The exception reported by the previous task attempt. +: Since the exception is available after a failed task attempt, + it can only be accessed when retrying a failed task execution, and therefore when `task.attempt` is greater than 1. + +`task.previousTrace` +: :::{versionadded} 24.10.0 + ::: +: The trace record associated with the previous task attempt. +: Since the trace record is available after a failed task attempt, + it can only be accessed when retrying a failed task execution, and therefore when `task.attempt` is greater than 1. +: This is useful when retrying a task execution to access the previous task attempt runtime metrics e.g. used memory and CPUs. + `task.process` : The current process name. 
diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskPollingMonitor.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskPollingMonitor.groovy index 87780e731c..4e97543d2e 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskPollingMonitor.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskPollingMonitor.groovy @@ -16,6 +16,8 @@ package nextflow.processor +import static nextflow.processor.TaskProcessor.* + import java.util.concurrent.ExecutorService import java.util.concurrent.LinkedBlockingQueue import java.util.concurrent.TimeUnit @@ -617,7 +619,7 @@ class TaskPollingMonitor implements TaskMonitor { if (evict(handler)) { handler.decProcessForks() } - fault = handler.task.processor.resumeOrDie(handler?.task, error) + fault = handler.task.processor.resumeOrDie(handler?.task, error, handler.getTraceRecord()) log.trace "Task fault (1): $fault" } finally { @@ -683,7 +685,7 @@ class TaskPollingMonitor implements TaskMonitor { protected void finalizeTask( TaskHandler handler ) { // finalize the task execution - final fault = handler.task.processor.finalizeTask(handler.task) + final fault = handler.task.processor.finalizeTask(handler) // notify task completion session.notifyTaskComplete(handler) diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy index b734523de0..a1d48e5316 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy @@ -15,6 +15,8 @@ */ package nextflow.processor +import nextflow.trace.TraceRecord + import static nextflow.processor.ErrorStrategy.* import java.lang.reflect.InvocationTargetException @@ -1016,7 +1018,7 @@ class TaskProcessor { * a {@link ErrorStrategy#TERMINATE}) */ @PackageScope - final synchronized resumeOrDie( TaskRun task, Throwable error ) { + final synchronized 
resumeOrDie( TaskRun task, Throwable error, TraceRecord traceRecord = null) { log.debug "Handling unexpected condition for\n task: name=${safeTaskName(task)}; work-dir=${task?.workDirStr}\n error [${error?.class?.name}]: ${error?.getMessage()?:error}" ErrorStrategy errorStrategy = TERMINATE @@ -1061,6 +1063,10 @@ class TaskProcessor { task.config.exitStatus = task.exitStatus task.config.errorCount = procErrCount task.config.retryCount = taskErrCount + //Add trace of the previous execution in the task context for next execution + if ( traceRecord ) + task.config.previousTrace = traceRecord + task.config.previousException = error errorStrategy = checkErrorStrategy(task, error, taskErrCount, procErrCount, submitRetries) if( errorStrategy.soft ) { @@ -2361,7 +2367,8 @@ class TaskProcessor { * @param task The {@code TaskRun} instance to finalize */ @PackageScope - final finalizeTask( TaskRun task ) { + final finalizeTask( TaskHandler handler) { + def task = handler.task log.trace "finalizing process > ${safeTaskName(task)} -- $task" def fault = null @@ -2384,7 +2391,7 @@ class TaskProcessor { collectOutputs(task) } catch ( Throwable error ) { - fault = resumeOrDie(task, error) + fault = resumeOrDie(task, error, handler.getTraceRecord()) log.trace "Task fault (3): $fault" } diff --git a/modules/nextflow/src/testFixtures/groovy/test/MockHelpers.groovy b/modules/nextflow/src/testFixtures/groovy/test/MockHelpers.groovy index 5a0c0c1c39..51a298dbd6 100644 --- a/modules/nextflow/src/testFixtures/groovy/test/MockHelpers.groovy +++ b/modules/nextflow/src/testFixtures/groovy/test/MockHelpers.groovy @@ -175,7 +175,7 @@ class MockTaskHandler extends TaskHandler { task.code.call() } status = TaskStatus.COMPLETED - task.processor.finalizeTask(task) + task.processor.finalizeTask(this) } @Override diff --git a/tests/checks/trace-access.nf/.checks b/tests/checks/trace-access.nf/.checks new file mode 100644 index 0000000000..887f37fd3b --- /dev/null +++ 
b/tests/checks/trace-access.nf/.checks @@ -0,0 +1,11 @@ +set -e + +echo '' +$NXF_RUN | tee stdout + +[[ `grep 'INFO' .nextflow.log | grep -c 'Submitted process > foo'` == 1 ]] || false +[[ `grep 'INFO' .nextflow.log | grep -c 'Re-submitted process > foo'` == 3 ]] || false + +[[ `grep -c 'mem: 8 GB (previous: 4294967296) (error: nextflow.exception.ProcessFailedException: Process .* terminated with an error exit status (137))' stdout` == 1 ]] || false + + diff --git a/tests/trace-access.nf b/tests/trace-access.nf new file mode 100644 index 0000000000..ff55dbdb84 --- /dev/null +++ b/tests/trace-access.nf @@ -0,0 +1,24 @@ +process foo { + memory { task.attempt > 1 ? task.previousTrace.memory * 2 : (1.GB) } + errorStrategy 'retry' + maxRetries 3 + input: + val i + output: + stdout + script: + if( task.attempt <= 3 ){ + """ + exit 137 + """ + } else { + """ + echo 'mem: $task.memory (previous: $task.previousTrace.memory) (error: $task.previousException)' + exit 0 + """ + } +} + +workflow { + foo(channel.of(1)).view() +}