Skip to content

Commit

Permalink
Exposing trace record meta info in the task context (#5402)
Browse files Browse the repository at this point in the history
This commit introduces the `task.previousTrace` and `task.previousException` variables,
which provide access to the trace runtime metrics and the exception object of a previous
failed task execution attempt.

Signed-off-by: jorgee <[email protected]>
Signed-off-by: Paolo Di Tommaso <[email protected]>
Signed-off-by: Jorge Ejarque <[email protected]>
Co-authored-by: Paolo Di Tommaso <[email protected]>
Co-authored-by: Ben Sherman <[email protected]>
Co-authored-by: Christopher Hakkaart <[email protected]>
  • Loading branch information
4 people authored Oct 23, 2024
1 parent 24d595c commit ffdd381
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 6 deletions.
19 changes: 19 additions & 0 deletions docs/process.md
Original file line number Diff line number Diff line change
Expand Up @@ -1283,6 +1283,25 @@ If the task execution fail reporting an exit status in the range between 137 and

The directive {ref}`process-maxretries` sets the maximum number of times the same task can be re-executed.

### Dynamic task resources with previous execution trace
:::{versionadded} 24.10.0
:::

Task resource requests can be updated relative to the {ref}`trace record <trace-report>` metrics of the previous task attempt. The metrics can be accessed through the `task.previousTrace` variable. For example:

```groovy
process foo {
memory { task.attempt > 1 ? task.previousTrace.memory * 2 : (1.GB) }
errorStrategy { task.exitStatus in 137..140 ? 'retry' : 'terminate' }
maxRetries 3
script:
<your job here>
}
```
In the above example, the {ref}`process-memory` directive is set according to the previous trace record metrics. On the first attempt, when no trace metrics are available yet, it is set to 1 GB. On subsequent attempts, it doubles the previously allocated memory. See {ref}`trace-report` for more information about trace records.


### Dynamic retry with backoff

There are cases in which the required execution resources may be temporarily unavailable, e.g. due to network congestion. In these cases, immediately re-executing the task will likely result in the identical error. A retry with an exponential backoff delay can better recover from these error conditions:
Expand Down
15 changes: 15 additions & 0 deletions docs/reference/process.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,21 @@ The following task properties are defined in the process body:
: *Available only in `exec:` blocks*
: The current task name.

`task.previousException`
: :::{versionadded} 24.10.0
:::
: The exception reported by the previous task attempt.
: The exception is only available after a failed task attempt, so it can only be
  accessed when retrying a failed task execution, that is, when `task.attempt` is greater than 1.

`task.previousTrace`
: :::{versionadded} 24.10.0
:::
: The trace record associated with the previous task attempt.
: The trace record is only available after a failed task attempt, so it can only be
  accessed when retrying a failed task execution, that is, when `task.attempt` is greater than 1.
: This is useful when retrying a task execution to access the previous task attempt runtime metrics e.g. used memory and CPUs.

`task.process`
: The current process name.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package nextflow.processor

import static nextflow.processor.TaskProcessor.*

import java.util.concurrent.ExecutorService
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit
Expand Down Expand Up @@ -617,7 +619,7 @@ class TaskPollingMonitor implements TaskMonitor {
if (evict(handler)) {
handler.decProcessForks()
}
fault = handler.task.processor.resumeOrDie(handler?.task, error)
fault = handler.task.processor.resumeOrDie(handler?.task, error, handler.getTraceRecord())
log.trace "Task fault (1): $fault"
}
finally {
Expand Down Expand Up @@ -683,7 +685,7 @@ class TaskPollingMonitor implements TaskMonitor {

protected void finalizeTask( TaskHandler handler ) {
// finalize the task execution
final fault = handler.task.processor.finalizeTask(handler.task)
final fault = handler.task.processor.finalizeTask(handler)

// notify task completion
session.notifyTaskComplete(handler)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package nextflow.processor

import nextflow.trace.TraceRecord

import static nextflow.processor.ErrorStrategy.*

import java.lang.reflect.InvocationTargetException
Expand Down Expand Up @@ -1016,7 +1018,7 @@ class TaskProcessor {
* a {@link ErrorStrategy#TERMINATE})
*/
@PackageScope
final synchronized resumeOrDie( TaskRun task, Throwable error ) {
final synchronized resumeOrDie( TaskRun task, Throwable error, TraceRecord traceRecord = null) {
log.debug "Handling unexpected condition for\n task: name=${safeTaskName(task)}; work-dir=${task?.workDirStr}\n error [${error?.class?.name}]: ${error?.getMessage()?:error}"

ErrorStrategy errorStrategy = TERMINATE
Expand Down Expand Up @@ -1061,6 +1063,10 @@ class TaskProcessor {
task.config.exitStatus = task.exitStatus
task.config.errorCount = procErrCount
task.config.retryCount = taskErrCount
//Add trace of the previous execution in the task context for next execution
if ( traceRecord )
task.config.previousTrace = traceRecord
task.config.previousException = error

errorStrategy = checkErrorStrategy(task, error, taskErrCount, procErrCount, submitRetries)
if( errorStrategy.soft ) {
Expand Down Expand Up @@ -2361,7 +2367,8 @@ class TaskProcessor {
* @param task The {@code TaskRun} instance to finalize
*/
@PackageScope
final finalizeTask( TaskRun task ) {
final finalizeTask( TaskHandler handler) {
def task = handler.task
log.trace "finalizing process > ${safeTaskName(task)} -- $task"

def fault = null
Expand All @@ -2384,7 +2391,7 @@ class TaskProcessor {
collectOutputs(task)
}
catch ( Throwable error ) {
fault = resumeOrDie(task, error)
fault = resumeOrDie(task, error, handler.getTraceRecord())
log.trace "Task fault (3): $fault"
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ class MockTaskHandler extends TaskHandler {
task.code.call()
}
status = TaskStatus.COMPLETED
task.processor.finalizeTask(task)
task.processor.finalizeTask(this)
}

@Override
Expand Down
11 changes: 11 additions & 0 deletions tests/checks/trace-access.nf/.checks
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Abort on the first failing command so any unmet assertion fails the check.
set -e

echo ''
$NXF_RUN | tee stdout

# The first attempt is submitted once; the three retries appear as re-submissions.
[[ `grep 'INFO' .nextflow.log | grep -c 'Submitted process > foo'` == 1 ]] || false
[[ `grep 'INFO' .nextflow.log | grep -c 'Re-submitted process > foo'` == 3 ]] || false

# The final (4th) attempt must print the doubled memory (8 GB) together with the
# previous attempt's trace memory (4294967296 bytes = 4 GiB) and the previous
# exception (exit status 137). Note: '.' and '.*' act as regex wildcards here.
[[ `grep -c 'mem: 8 GB (previous: 4294967296) (error: nextflow.exception.ProcessFailedException: Process .* terminated with an error exit status (137))' stdout` == 1 ]] || false


24 changes: 24 additions & 0 deletions tests/trace-access.nf
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Test process for task.previousTrace / task.previousException:
// doubles the memory reported by the previous attempt's trace record on each
// retry; the first attempt (no trace record available yet) starts at 1 GB.
process foo {
    memory { task.attempt > 1 ? task.previousTrace.memory * 2 : (1.GB) }
    errorStrategy 'retry'
    maxRetries 3
    input:
    val i
    output:
    stdout
    script:
    // Fail the first three attempts with exit status 137 to force retries;
    // the fourth attempt prints the memory derived from the previous trace
    // record plus the previous exception, then exits successfully.
    if( task.attempt <= 3 ){
        """
        exit 137
        """
    } else {
        """
        echo 'mem: $task.memory (previous: $task.previousTrace.memory) (error: $task.previousException)'
        exit 0
        """
    }
}

workflow {
    // Feed a single value into the process and print its stdout.
    channel.of(1) | foo | view
}

0 comments on commit ffdd381

Please sign in to comment.