diff --git a/docs/_static/img/async-exec-policy.svg b/docs/_static/img/async-exec-policy.svg
index 198a661d3e..5dce160ee6 100644
--- a/docs/_static/img/async-exec-policy.svg
+++ b/docs/_static/img/async-exec-policy.svg
@@ -1,3 +1,3 @@
[Updated figure: asynchronous execution policy diagram. Only the SVG text labels survive extraction; both the old and the new figure carry the per-test stage markers SE, BU, RU, SA, PE and CL.]
diff --git a/docs/_static/img/regression-task-state-machine.svg b/docs/_static/img/regression-task-state-machine.svg
new file mode 100644
index 0000000000..2e822b3443
--- /dev/null
+++ b/docs/_static/img/regression-task-state-machine.svg
@@ -0,0 +1,3 @@
[New figure: regression task state machine. Recoverable labels include the states STARTUP, READY (COMPILE), COMPILING, READY (RUN), RUNNING, COMPLETING, COMPLETED, RETIRED (success), FAILED and ERROR; the transitions setup, compile, compile_complete, compile_wait, run, run_complete, run_wait, sanity, performance and cleanup; and the edge conditions "Deps ready & not RunOnly", "Deps pending", "Exec. slots available", "No exec. slots", "Not finished", "Finished and not CompileOnly", "Sanity & perf. check", "Cleanup", "Cleanup failure" and "exception".]
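As a reading aid for the policy changes further below, the figure's states correspond to the ``state`` property added to ``RegressionTask`` in ``reframe/frontend/executors/__init__.py`` in this diff. The following transcription is illustrative only; the dictionary is not part of ReFrame and the descriptions paraphrase the figure's edge labels:

    # Reading aid only: state names as reported by the new RegressionTask.state
    # property, with descriptions paraphrasing the figure's edge labels.
    TASK_STATES = {
        'startup':       'task created; waiting for its dependencies',
        'ready_compile': 'deps ready and not RunOnly; waiting for a build slot',
        'compiling':     'build job submitted; polled via compile_complete()',
        'ready_run':     'build finished and not CompileOnly; waiting for a run slot',
        'running':       'run job submitted; polled via run_complete()',
        'completing':    'run finished; sanity and performance checks, then finalize',
        'retired':       'finalized successfully; waiting for cleanup',
        'completed':     'cleaned up',
        'fail':          'an exception was raised in some pipeline stage',
        'skip':          'test skipped, e.g., due to skipped dependencies',
    }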
diff --git a/docs/_static/img/serial-exec-policy.svg b/docs/_static/img/serial-exec-policy.svg
index 1955dfbacc..449540ba67 100644
--- a/docs/_static/img/serial-exec-policy.svg
+++ b/docs/_static/img/serial-exec-policy.svg
@@ -1,3 +1,3 @@
[Updated figure: serial execution policy diagram. Only the SVG text labels survive extraction; both the old and the new figure carry the per-test stage markers SE, BU, RU, SA, PE, CL and the "Idling" periods between stages.]
diff --git a/docs/config_reference.rst b/docs/config_reference.rst
index 75caf01dd8..6820840120 100644
--- a/docs/config_reference.rst
+++ b/docs/config_reference.rst
@@ -101,6 +101,18 @@ System Configuration
    A list of hostname regular expression patterns in Python `syntax `__, which will be used by the framework in order to automatically select a system configuration.
    For the auto-selection process, see `here `__.
 
+.. js:attribute:: .systems[].max_local_jobs
+
+   The maximum number of forced local build or run jobs allowed.
+
+   Forced local jobs run within the execution context of ReFrame.
+
+   :required: No
+   :default: ``8``
+
+   .. versionadded:: 3.10.0
+
+
 .. js:attribute:: .systems[].modules_system
 
    :required: No
@@ -1289,6 +1301,30 @@ General Configuration
    Timeout value in seconds used when checking if a git repository exists.
 
+.. js:attribute:: .general[].dump_pipeline_progress
+
+   Dump the pipeline progress of the asynchronous execution policy in ``pipeline-progress.json``.
+   This option is meant for debugging purposes only.
+
+   :required: No
+   :default: ``False``
+
+   .. versionadded:: 3.10.0
+
+
+.. js:attribute:: .general[].pipeline_timeout
+
+   Timeout in seconds for advancing the pipeline in the asynchronous execution policy.
+
+   ReFrame's asynchronous execution policy will try to advance as many tests as possible in their pipeline, but some tests may take too long to proceed (e.g., due to copying of large files), blocking the advancement of previously started tests.
+   If this timeout value is exceeded and at least one test has progressed, ReFrame will stop processing new tests and will try to further advance the tests that have already started.
+
+   :required: No
+   :default: ``10``
+
+   .. versionadded:: 3.10.0
+
+
 .. js:attribute:: .general[].remote_detect
 
    :required: No
diff --git a/docs/manpage.rst b/docs/manpage.rst
index b077e855bb..baec1f350e 100644
--- a/docs/manpage.rst
+++ b/docs/manpage.rst
@@ -344,13 +344,13 @@ Options controlling ReFrame execution
   - ``async``: Tests will be executed asynchronously.
     This is the default policy.
 
-    The ``async`` execution policy executes the run phase of tests asynchronously by submitting their associated jobs in a non-blocking way.
-    ReFrame's runtime monitors the progress of each test and will resume the pipeline execution of an asynchronously spawned test as soon as its run phase has finished.
+    The ``async`` execution policy executes the build and run phases of tests asynchronously by submitting their associated jobs in a non-blocking way.
+    ReFrame's runtime monitors the progress of each test and will resume the pipeline execution of an asynchronously spawned test as soon as its build or run phase has finished.
     Note that the rest of the pipeline stages are still executed sequentially in this policy.
 
     Concurrency can be controlled by setting the :js:attr:`max_jobs` system partition configuration parameter.
     As soon as the concurrency limit is reached, ReFrame will first poll the status of all its pending tests to check if any execution slots have been freed up.
-    If there are tests that have finished their run phase, ReFrame will keep pushing tests for execution until the concurrency limit is reached again.
+    If there are tests that have finished their build or run phase, ReFrame will keep pushing tests for execution until the concurrency limit is reached again.
     If no execution slots are available, ReFrame will throttle job submission.
 
 ..
option:: --force-local diff --git a/docs/pipeline.rst b/docs/pipeline.rst index 3d9438fa6a..e7cf037a92 100644 --- a/docs/pipeline.rst +++ b/docs/pipeline.rst @@ -52,7 +52,8 @@ A `job descriptor `__ this phase is a no-op. Before building the test, all the `resources `__ associated with it are copied to the test case's stage directory. @@ -100,10 +101,10 @@ Execution Policies All regression tests in ReFrame will execute the pipeline stages described above. However, how exactly this pipeline will be executed is responsibility of the test execution policy. -There are two execution policies in ReFrame: the serial and the asynchronous one. +There are two execution policies in ReFrame: the serial and the asynchronous execution policy. In the serial execution policy, a new test gets into the pipeline after the previous one has exited. -As the figure below shows, this can lead to long idling times in the run phase, since the execution blocks until the associated test job finishes. +As the figure below shows, this can lead to long idling times in the build and run phases, since the execution blocks until the associated test job finishes. .. figure:: _static/img/serial-exec-policy.svg @@ -114,7 +115,7 @@ As the figure below shows, this can lead to long idling times in the run phase, In the asynchronous execution policy, multiple tests can be simultaneously on-the-fly. -When a test enters the run phase, ReFrame does not block, but continues by picking the next test case to run. +When a test enters the build or run phase, ReFrame does not block, but continues by picking the next test case to run. This continues until no more test cases are left for execution or until a maximum concurrency limit is reached. At the end, ReFrame enters a busy-wait loop monitoring the spawned test cases. As soon as test case finishes, it resumes its pipeline and runs it to completion. @@ -133,6 +134,67 @@ When the `concurrency limit test failed during 'compile': test staged in '/Users/user/Repositories/reframe/stage/generic/default/builtin/HelloMultiLangTest_cpp' - [----------] finished processing HelloMultiLangTest_cpp (HelloMultiLangTest_cpp) - - [----------] waiting for spawned checks to finish [ OK ] (2/2) HelloMultiLangTest_c on generic:default using builtin [compile: 0.981s run: 0.468s total: 1.475s] [----------] all spawned checks have finished @@ -397,17 +388,11 @@ Let's now rerun our "Hello, World!" 
tests: [==========] Running 2 check(s) [==========] Started on Tue Mar 9 23:28:00 2021 - [----------] started processing HelloMultiLangTest_c (HelloMultiLangTest_c) + [----------] start processing checks [ RUN ] HelloMultiLangTest_c on catalina:default using gnu [ RUN ] HelloMultiLangTest_c on catalina:default using clang - [----------] finished processing HelloMultiLangTest_c (HelloMultiLangTest_c) - - [----------] started processing HelloMultiLangTest_cpp (HelloMultiLangTest_cpp) [ RUN ] HelloMultiLangTest_cpp on catalina:default using gnu [ RUN ] HelloMultiLangTest_cpp on catalina:default using clang - [----------] finished processing HelloMultiLangTest_cpp (HelloMultiLangTest_cpp) - - [----------] waiting for spawned checks to finish [ OK ] (1/4) HelloMultiLangTest_cpp on catalina:default using gnu [compile: 0.768s run: 1.115s total: 1.909s] [ OK ] (2/4) HelloMultiLangTest_c on catalina:default using gnu [compile: 0.600s run: 2.230s total: 2.857s] [ OK ] (3/4) HelloMultiLangTest_c on catalina:default using clang [compile: 0.238s run: 2.129s total: 2.393s] @@ -499,12 +484,9 @@ Let's run the test now: [==========] Running 1 check(s) [==========] Started on Mon Oct 12 20:02:37 2020 - [----------] started processing HelloThreadedTest (HelloThreadedTest) + [----------] start processing checks [ RUN ] HelloThreadedTest on catalina:default using gnu [ RUN ] HelloThreadedTest on catalina:default using clang - [----------] finished processing HelloThreadedTest (HelloThreadedTest) - - [----------] waiting for spawned checks to finish [ OK ] (1/2) HelloThreadedTest on catalina:default using gnu [compile: 1.591s run: 1.205s total: 2.816s] [ OK ] (2/2) HelloThreadedTest on catalina:default using clang [compile: 1.141s run: 0.309s total: 1.465s] [----------] all spawned checks have finished @@ -592,12 +574,9 @@ Let's run this version of the test now and see if it fails: [==========] Running 1 check(s) [==========] Started on Mon Oct 12 20:04:59 2020 - [----------] started processing HelloThreadedExtendedTest (HelloThreadedExtendedTest) + [----------] start processing checks [ RUN ] HelloThreadedExtendedTest on catalina:default using gnu [ RUN ] HelloThreadedExtendedTest on catalina:default using clang - [----------] finished processing HelloThreadedExtendedTest (HelloThreadedExtendedTest) - - [----------] waiting for spawned checks to finish [ FAIL ] (1/2) HelloThreadedExtendedTest on catalina:default using gnu [compile: 1.222s run: 0.891s total: 2.130s] [ FAIL ] (2/2) HelloThreadedExtendedTest on catalina:default using clang [compile: 0.835s run: 0.167s total: 1.018s] [----------] all spawned checks have finished @@ -718,11 +697,8 @@ The :option:`--performance-report` will generate a short report at the end for e [==========] Running 1 check(s) [==========] Started on Mon Oct 12 20:06:09 2020 - [----------] started processing StreamTest (StreamTest) + [----------] start processing checks [ RUN ] StreamTest on catalina:default using gnu - [----------] finished processing StreamTest (StreamTest) - - [----------] waiting for spawned checks to finish [ OK ] (1/1) StreamTest on catalina:default using gnu [compile: 1.386s run: 2.377s total: 3.780s] [----------] all spawned checks have finished @@ -967,7 +943,7 @@ We will only do so with the final versions of the tests from the previous sectio [==========] Running 4 check(s) [==========] Started on Mon Jan 25 00:34:32 2021 - [----------] started processing HelloMultiLangTest_c (HelloMultiLangTest_c) + [----------] start processing checks [ RUN ] 
HelloMultiLangTest_c on daint:login using builtin [ RUN ] HelloMultiLangTest_c on daint:login using gnu [ RUN ] HelloMultiLangTest_c on daint:login using intel @@ -981,9 +957,6 @@ We will only do so with the final versions of the tests from the previous sectio [ RUN ] HelloMultiLangTest_c on daint:mc using intel [ RUN ] HelloMultiLangTest_c on daint:mc using pgi [ RUN ] HelloMultiLangTest_c on daint:mc using cray - [----------] finished processing HelloMultiLangTest_c (HelloMultiLangTest_c) - - [----------] started processing HelloMultiLangTest_cpp (HelloMultiLangTest_cpp) [ RUN ] HelloMultiLangTest_cpp on daint:login using builtin [ RUN ] HelloMultiLangTest_cpp on daint:login using gnu [ RUN ] HelloMultiLangTest_cpp on daint:login using intel @@ -997,9 +970,6 @@ We will only do so with the final versions of the tests from the previous sectio [ RUN ] HelloMultiLangTest_cpp on daint:mc using intel [ RUN ] HelloMultiLangTest_cpp on daint:mc using pgi [ RUN ] HelloMultiLangTest_cpp on daint:mc using cray - [----------] finished processing HelloMultiLangTest_cpp (HelloMultiLangTest_cpp) - - [----------] started processing HelloThreadedExtended2Test (HelloThreadedExtended2Test) [ RUN ] HelloThreadedExtended2Test on daint:login using builtin [ RUN ] HelloThreadedExtended2Test on daint:login using gnu [ RUN ] HelloThreadedExtended2Test on daint:login using intel @@ -1013,15 +983,9 @@ We will only do so with the final versions of the tests from the previous sectio [ RUN ] HelloThreadedExtended2Test on daint:mc using intel [ RUN ] HelloThreadedExtended2Test on daint:mc using pgi [ RUN ] HelloThreadedExtended2Test on daint:mc using cray - [----------] finished processing HelloThreadedExtended2Test (HelloThreadedExtended2Test) - - [----------] started processing StreamWithRefTest (StreamWithRefTest) [ RUN ] StreamWithRefTest on daint:login using gnu [ RUN ] StreamWithRefTest on daint:gpu using gnu [ RUN ] StreamWithRefTest on daint:mc using gnu - [----------] finished processing StreamWithRefTest (StreamWithRefTest) - - [----------] waiting for spawned checks to finish [ OK ] ( 1/42) HelloThreadedExtended2Test on daint:login using cray [compile: 0.959s run: 56.203s total: 57.189s] [ OK ] ( 2/42) HelloThreadedExtended2Test on daint:login using intel [compile: 2.096s run: 61.438s total: 64.062s] [ OK ] ( 3/42) HelloMultiLangTest_cpp on daint:login using cray [compile: 0.479s run: 98.909s total: 99.406s] @@ -1205,7 +1169,7 @@ Let's run our adapted test now: [==========] Running 1 check(s) [==========] Started on Mon Oct 12 20:16:03 2020 - [----------] started processing StreamMultiSysTest (StreamMultiSysTest) + [----------] start processing checks [ RUN ] StreamMultiSysTest on daint:login using gnu [ RUN ] StreamMultiSysTest on daint:login using intel [ RUN ] StreamMultiSysTest on daint:login using pgi @@ -1218,9 +1182,6 @@ Let's run our adapted test now: [ RUN ] StreamMultiSysTest on daint:mc using intel [ RUN ] StreamMultiSysTest on daint:mc using pgi [ RUN ] StreamMultiSysTest on daint:mc using cray - [----------] finished processing StreamMultiSysTest (StreamMultiSysTest) - - [----------] waiting for spawned checks to finish [ OK ] ( 1/12) StreamMultiSysTest on daint:gpu using pgi [compile: 2.092s run: 11.201s total: 13.307s] [ OK ] ( 2/12) StreamMultiSysTest on daint:gpu using gnu [compile: 2.349s run: 17.140s total: 19.509s] [ OK ] ( 3/12) StreamMultiSysTest on daint:login using pgi [compile: 2.230s run: 20.946s total: 23.189s] diff --git a/docs/tutorial_deps.rst b/docs/tutorial_deps.rst index 
fea0450671..e8bc2a80bd 100644 --- a/docs/tutorial_deps.rst +++ b/docs/tutorial_deps.rst @@ -114,7 +114,7 @@ Here is the output when running the OSU tests with the asynchronous execution po .. code-block:: none [ReFrame Setup] - version: 3.6.0-dev.0+4de0fee1 + version: 3.10.0-dev.2 command: './bin/reframe -c tutorials/deps/osu_benchmarks.py -r' launched by: user@daint101 working directory: '/users/user/Devel/reframe' @@ -126,96 +126,51 @@ Here is the output when running the OSU tests with the asynchronous execution po [==========] Running 8 check(s) [==========] Started on Wed Mar 10 20:53:56 2021 - [----------] started processing OSUDownloadTest (OSU benchmarks download sources) + [----------] start processing checks [ RUN ] OSUDownloadTest on daint:login using builtin - [----------] finished processing OSUDownloadTest (OSU benchmarks download sources) - - [----------] started processing OSUBuildTest (OSU benchmarks build test) + [ OK ] ( 1/22) OSUDownloadTest on daint:login using builtin [compile: 0.035s run: 2.520s total: 2.716s] [ RUN ] OSUBuildTest on daint:gpu using gnu - [ DEP ] OSUBuildTest on daint:gpu using gnu [ RUN ] OSUBuildTest on daint:gpu using intel - [ DEP ] OSUBuildTest on daint:gpu using intel [ RUN ] OSUBuildTest on daint:gpu using pgi - [ DEP ] OSUBuildTest on daint:gpu using pgi - [----------] finished processing OSUBuildTest (OSU benchmarks build test) - - [----------] started processing OSULatencyTest (OSU latency test) + [ OK ] ( 2/22) OSUBuildTest on daint:gpu using gnu [compile: 156.713s run: 10.222s total: 170.501s] [ RUN ] OSULatencyTest on daint:gpu using gnu - [ DEP ] OSULatencyTest on daint:gpu using gnu - [ RUN ] OSULatencyTest on daint:gpu using intel - [ DEP ] OSULatencyTest on daint:gpu using intel - [ RUN ] OSULatencyTest on daint:gpu using pgi - [ DEP ] OSULatencyTest on daint:gpu using pgi - [----------] finished processing OSULatencyTest (OSU latency test) - - [----------] started processing OSUBandwidthTest (OSU bandwidth test) [ RUN ] OSUBandwidthTest on daint:gpu using gnu - [ DEP ] OSUBandwidthTest on daint:gpu using gnu - [ RUN ] OSUBandwidthTest on daint:gpu using intel - [ DEP ] OSUBandwidthTest on daint:gpu using intel - [ RUN ] OSUBandwidthTest on daint:gpu using pgi - [ DEP ] OSUBandwidthTest on daint:gpu using pgi - [----------] finished processing OSUBandwidthTest (OSU bandwidth test) - - [----------] started processing OSUAllreduceTest_2 (OSU Allreduce test) [ RUN ] OSUAllreduceTest_2 on daint:gpu using gnu - [ DEP ] OSUAllreduceTest_2 on daint:gpu using gnu - [ RUN ] OSUAllreduceTest_2 on daint:gpu using intel - [ DEP ] OSUAllreduceTest_2 on daint:gpu using intel - [ RUN ] OSUAllreduceTest_2 on daint:gpu using pgi - [ DEP ] OSUAllreduceTest_2 on daint:gpu using pgi - [----------] finished processing OSUAllreduceTest_2 (OSU Allreduce test) - - [----------] started processing OSUAllreduceTest_4 (OSU Allreduce test) [ RUN ] OSUAllreduceTest_4 on daint:gpu using gnu - [ DEP ] OSUAllreduceTest_4 on daint:gpu using gnu - [ RUN ] OSUAllreduceTest_4 on daint:gpu using intel - [ DEP ] OSUAllreduceTest_4 on daint:gpu using intel - [ RUN ] OSUAllreduceTest_4 on daint:gpu using pgi - [ DEP ] OSUAllreduceTest_4 on daint:gpu using pgi - [----------] finished processing OSUAllreduceTest_4 (OSU Allreduce test) - - [----------] started processing OSUAllreduceTest_8 (OSU Allreduce test) + [ RUN ] OSUAllreduceTest_16 on daint:gpu using gnu [ RUN ] OSUAllreduceTest_8 on daint:gpu using gnu - [ DEP ] OSUAllreduceTest_8 on daint:gpu using gnu - [ RUN ] 
OSUAllreduceTest_8 on daint:gpu using intel - [ DEP ] OSUAllreduceTest_8 on daint:gpu using intel + [ OK ] ( 3/22) OSUBuildTest on daint:gpu using pgi [compile: 168.692s run: 0.751s total: 171.227s] [ RUN ] OSUAllreduceTest_8 on daint:gpu using pgi - [ DEP ] OSUAllreduceTest_8 on daint:gpu using pgi - [----------] finished processing OSUAllreduceTest_8 (OSU Allreduce test) - - [----------] started processing OSUAllreduceTest_16 (OSU Allreduce test) - [ RUN ] OSUAllreduceTest_16 on daint:gpu using gnu - [ DEP ] OSUAllreduceTest_16 on daint:gpu using gnu - [ RUN ] OSUAllreduceTest_16 on daint:gpu using intel - [ DEP ] OSUAllreduceTest_16 on daint:gpu using intel + [ RUN ] OSULatencyTest on daint:gpu using pgi + [ RUN ] OSUBandwidthTest on daint:gpu using pgi + [ RUN ] OSUAllreduceTest_2 on daint:gpu using pgi + [ RUN ] OSUAllreduceTest_4 on daint:gpu using pgi [ RUN ] OSUAllreduceTest_16 on daint:gpu using pgi - [ DEP ] OSUAllreduceTest_16 on daint:gpu using pgi - [----------] finished processing OSUAllreduceTest_16 (OSU Allreduce test) - - [----------] waiting for spawned checks to finish - [ OK ] ( 1/22) OSUDownloadTest on daint:login using builtin [compile: 0.007s run: 2.033s total: 2.078s] - [ OK ] ( 2/22) OSUBuildTest on daint:gpu using gnu [compile: 20.531s run: 0.039s total: 83.089s] - [ OK ] ( 3/22) OSUBuildTest on daint:gpu using pgi [compile: 27.193s run: 55.871s total: 83.082s] - [ OK ] ( 4/22) OSUAllreduceTest_16 on daint:gpu using gnu [compile: 0.007s run: 30.713s total: 33.470s] - [ OK ] ( 5/22) OSUBuildTest on daint:gpu using intel [compile: 35.256s run: 54.218s total: 116.712s] - [ OK ] ( 6/22) OSULatencyTest on daint:gpu using pgi [compile: 0.011s run: 23.738s total: 51.190s] - [ OK ] ( 7/22) OSUAllreduceTest_2 on daint:gpu using gnu [compile: 0.008s run: 31.879s total: 51.187s] - [ OK ] ( 8/22) OSUAllreduceTest_4 on daint:gpu using gnu [compile: 0.006s run: 37.447s total: 51.194s] - [ OK ] ( 9/22) OSUAllreduceTest_8 on daint:gpu using gnu [compile: 0.007s run: 42.914s total: 51.202s] - [ OK ] (10/22) OSUAllreduceTest_16 on daint:gpu using pgi [compile: 0.006s run: 51.172s total: 51.197s] - [ OK ] (11/22) OSULatencyTest on daint:gpu using gnu [compile: 0.007s run: 21.500s total: 51.730s] - [ OK ] (12/22) OSUAllreduceTest_2 on daint:gpu using pgi [compile: 0.007s run: 35.083s total: 51.700s] - [ OK ] (13/22) OSUAllreduceTest_8 on daint:gpu using pgi [compile: 0.007s run: 46.187s total: 51.681s] - [ OK ] (14/22) OSUAllreduceTest_4 on daint:gpu using pgi [compile: 0.007s run: 41.060s total: 52.030s] - [ OK ] (15/22) OSUAllreduceTest_2 on daint:gpu using intel [compile: 0.008s run: 27.401s total: 35.900s] - [ OK ] (16/22) OSUBandwidthTest on daint:gpu using gnu [compile: 0.008s run: 82.553s total: 107.334s] - [ OK ] (17/22) OSUBandwidthTest on daint:gpu using pgi [compile: 0.009s run: 87.559s total: 109.613s] - [ OK ] (18/22) OSUAllreduceTest_16 on daint:gpu using intel [compile: 0.006s run: 99.899s total: 99.924s] - [ OK ] (19/22) OSUBandwidthTest on daint:gpu using intel [compile: 0.007s run: 116.771s total: 128.125s] - [ OK ] (20/22) OSULatencyTest on daint:gpu using intel [compile: 0.008s run: 114.236s total: 128.398s] - [ OK ] (21/22) OSUAllreduceTest_8 on daint:gpu using intel [compile: 0.008s run: 125.541s total: 128.387s] - [ OK ] (22/22) OSUAllreduceTest_4 on daint:gpu using intel [compile: 0.007s run: 123.079s total: 128.651s] + [ OK ] ( 4/22) OSULatencyTest on daint:gpu using gnu [compile: 0.031s run: 63.644s total: 64.558s] + [ OK ] ( 5/22) OSUAllreduceTest_2 on 
daint:gpu using gnu [compile: 0.016s run: 53.954s total: 64.619s] + [ OK ] ( 6/22) OSULatencyTest on daint:gpu using pgi [compile: 0.032s run: 28.134s total: 65.222s] + [ OK ] ( 7/22) OSUAllreduceTest_4 on daint:gpu using gnu [compile: 0.015s run: 49.682s total: 65.862s] + [ OK ] ( 8/22) OSUAllreduceTest_16 on daint:gpu using gnu [compile: 0.011s run: 44.188s total: 66.009s] + [ OK ] ( 9/22) OSUAllreduceTest_8 on daint:gpu using gnu [compile: 0.014s run: 38.366s total: 66.076s] + [ OK ] (10/22) OSUAllreduceTest_8 on daint:gpu using pgi [compile: 0.009s run: 34.306s total: 66.546s] + [ OK ] (11/22) OSUBuildTest on daint:gpu using intel [compile: 245.878s run: 0.555s total: 246.570s] + [ RUN ] OSUAllreduceTest_8 on daint:gpu using intel + [ RUN ] OSUAllreduceTest_4 on daint:gpu using intel + [ RUN ] OSULatencyTest on daint:gpu using intel + [ RUN ] OSUBandwidthTest on daint:gpu using intel + [ RUN ] OSUAllreduceTest_2 on daint:gpu using intel + [ RUN ] OSUAllreduceTest_16 on daint:gpu using intel + [ OK ] (12/22) OSUBandwidthTest on daint:gpu using gnu [compile: 0.017s run: 98.239s total: 104.363s] + [ OK ] (13/22) OSUAllreduceTest_2 on daint:gpu using pgi [compile: 0.014s run: 58.084s total: 93.705s] + [ OK ] (14/22) OSUAllreduceTest_4 on daint:gpu using pgi [compile: 0.023s run: 53.762s total: 82.721s] + [ OK ] (15/22) OSUAllreduceTest_16 on daint:gpu using pgi [compile: 0.052s run: 49.170s total: 82.695s] + [ OK ] (16/22) OSUBandwidthTest on daint:gpu using pgi [compile: 0.048s run: 89.141s total: 125.222s] + [ OK ] (17/22) OSUAllreduceTest_2 on daint:gpu using intel [compile: 0.024s run: 46.974s total: 65.742s] + [ OK ] (18/22) OSUAllreduceTest_8 on daint:gpu using intel [compile: 0.010s run: 70.032s total: 71.045s] + [ OK ] (19/22) OSUAllreduceTest_4 on daint:gpu using intel [compile: 0.045s run: 67.585s total: 72.897s] + [ OK ] (20/22) OSULatencyTest on daint:gpu using intel [compile: 0.013s run: 61.913s total: 73.029s] + [ OK ] (21/22) OSUAllreduceTest_16 on daint:gpu using intel [compile: 0.024s run: 59.141s total: 81.230s] + [ OK ] (22/22) OSUBandwidthTest on daint:gpu using intel [compile: 0.044s run: 121.324s total: 136.121s] [----------] all spawned checks have finished [ PASSED ] Ran 22/22 test case(s) from 8 check(s) (0 failure(s)) @@ -226,7 +181,7 @@ Here is the output when running the OSU tests with the asynchronous execution po Before starting running the tests, ReFrame topologically sorts them based on their dependencies and schedules them for running using the selected execution policy. With the serial execution policy, ReFrame simply executes the tests to completion as they "arrive," since the tests are already topologically sorted. In the asynchronous execution policy, tests are spawned and not waited for. -If a test's dependencies have not yet completed, it will not start its execution and a ``DEP`` message will be printed to denote this. +If a test's dependencies have not yet completed, it will not start its execution immediately. ReFrame's runtime takes care of properly cleaning up the resources of the tests respecting dependencies. Normally when an individual test finishes successfully, its stage directory is cleaned up. diff --git a/docs/tutorial_fixtures.rst b/docs/tutorial_fixtures.rst index e8410f461f..eb51b33b1e 100644 --- a/docs/tutorial_fixtures.rst +++ b/docs/tutorial_fixtures.rst @@ -171,102 +171,51 @@ The following listing shows the output of running the tutorial examples. 
[==========] Running 10 check(s) [==========] Started on Sun Oct 31 22:00:28 2021 - [----------] started processing fetch_osu_benchmarks~daint (Fetch OSU benchmarks) + [----------] start processing checks [ RUN ] fetch_osu_benchmarks~daint on daint:gpu using gnu - [----------] finished processing fetch_osu_benchmarks~daint (Fetch OSU benchmarks) - - [----------] started processing build_osu_benchmarks~daint:gpu+gnu (Build OSU benchmarks) - [ RUN ] build_osu_benchmarks~daint:gpu+gnu on daint:gpu using gnu - [ DEP ] build_osu_benchmarks~daint:gpu+gnu on daint:gpu using gnu - [----------] finished processing build_osu_benchmarks~daint:gpu+gnu (Build OSU benchmarks) - - [----------] started processing build_osu_benchmarks~daint:gpu+intel (Build OSU benchmarks) + [ OK ] ( 1/22) fetch_osu_benchmarks~daint on daint:gpu using gnu [compile: 0.007s run: 2.960s total: 2.988s] [ RUN ] build_osu_benchmarks~daint:gpu+intel on daint:gpu using intel - [ DEP ] build_osu_benchmarks~daint:gpu+intel on daint:gpu using intel - [----------] finished processing build_osu_benchmarks~daint:gpu+intel (Build OSU benchmarks) - - [----------] started processing build_osu_benchmarks~daint:gpu+pgi (Build OSU benchmarks) [ RUN ] build_osu_benchmarks~daint:gpu+pgi on daint:gpu using pgi - [ DEP ] build_osu_benchmarks~daint:gpu+pgi on daint:gpu using pgi - [----------] finished processing build_osu_benchmarks~daint:gpu+pgi (Build OSU benchmarks) - - [----------] started processing osu_allreduce_test_16 (OSU Allreduce test) + [ RUN ] build_osu_benchmarks~daint:gpu+gnu on daint:gpu using gnu + [ OK ] ( 2/22) build_osu_benchmarks~daint:gpu+gnu on daint:gpu using gnu [compile: 26.322s run: 2.609s total: 30.214s] [ RUN ] osu_allreduce_test_16 on daint:gpu using gnu - [ DEP ] osu_allreduce_test_16 on daint:gpu using gnu - [ RUN ] osu_allreduce_test_16 on daint:gpu using intel - [ DEP ] osu_allreduce_test_16 on daint:gpu using intel - [ RUN ] osu_allreduce_test_16 on daint:gpu using pgi - [ DEP ] osu_allreduce_test_16 on daint:gpu using pgi - [----------] finished processing osu_allreduce_test_16 (OSU Allreduce test) - - [----------] started processing osu_allreduce_test_8 (OSU Allreduce test) + [ RUN ] osu_bandwidth_test on daint:gpu using gnu + [ RUN ] osu_latency_test on daint:gpu using gnu + [ RUN ] osu_allreduce_test_2 on daint:gpu using gnu [ RUN ] osu_allreduce_test_8 on daint:gpu using gnu - [ DEP ] osu_allreduce_test_8 on daint:gpu using gnu - [ RUN ] osu_allreduce_test_8 on daint:gpu using intel - [ DEP ] osu_allreduce_test_8 on daint:gpu using intel - [ RUN ] osu_allreduce_test_8 on daint:gpu using pgi - [ DEP ] osu_allreduce_test_8 on daint:gpu using pgi - [----------] finished processing osu_allreduce_test_8 (OSU Allreduce test) - - [----------] started processing osu_allreduce_test_4 (OSU Allreduce test) [ RUN ] osu_allreduce_test_4 on daint:gpu using gnu - [ DEP ] osu_allreduce_test_4 on daint:gpu using gnu + [ OK ] ( 3/22) build_osu_benchmarks~daint:gpu+intel on daint:gpu using intel [compile: 53.068s run: 0.650s total: 53.773s] + [ RUN ] osu_allreduce_test_2 on daint:gpu using intel + [ RUN ] osu_latency_test on daint:gpu using intel [ RUN ] osu_allreduce_test_4 on daint:gpu using intel - [ DEP ] osu_allreduce_test_4 on daint:gpu using intel + [ RUN ] osu_allreduce_test_16 on daint:gpu using intel + [ RUN ] osu_allreduce_test_8 on daint:gpu using intel + [ OK ] ( 4/22) build_osu_benchmarks~daint:gpu+pgi on daint:gpu using pgi [compile: 52.482s run: 0.803s total: 53.981s] [ RUN ] osu_allreduce_test_4 on daint:gpu 
using pgi - [ DEP ] osu_allreduce_test_4 on daint:gpu using pgi - [----------] finished processing osu_allreduce_test_4 (OSU Allreduce test) - - [----------] started processing osu_allreduce_test_2 (OSU Allreduce test) - [ RUN ] osu_allreduce_test_2 on daint:gpu using gnu - [ DEP ] osu_allreduce_test_2 on daint:gpu using gnu - [ RUN ] osu_allreduce_test_2 on daint:gpu using intel - [ DEP ] osu_allreduce_test_2 on daint:gpu using intel - [ RUN ] osu_allreduce_test_2 on daint:gpu using pgi - [ DEP ] osu_allreduce_test_2 on daint:gpu using pgi - [----------] finished processing osu_allreduce_test_2 (OSU Allreduce test) - - [----------] started processing osu_bandwidth_test (OSU bandwidth test) - [ RUN ] osu_bandwidth_test on daint:gpu using gnu - [ DEP ] osu_bandwidth_test on daint:gpu using gnu [ RUN ] osu_bandwidth_test on daint:gpu using intel - [ DEP ] osu_bandwidth_test on daint:gpu using intel - [ RUN ] osu_bandwidth_test on daint:gpu using pgi - [ DEP ] osu_bandwidth_test on daint:gpu using pgi - [----------] finished processing osu_bandwidth_test (OSU bandwidth test) - - [----------] started processing osu_latency_test (OSU latency test) - [ RUN ] osu_latency_test on daint:gpu using gnu - [ DEP ] osu_latency_test on daint:gpu using gnu - [ RUN ] osu_latency_test on daint:gpu using intel - [ DEP ] osu_latency_test on daint:gpu using intel + [ OK ] ( 5/22) osu_allreduce_test_16 on daint:gpu using gnu [compile: 0.015s run: 23.535s total: 23.922s] [ RUN ] osu_latency_test on daint:gpu using pgi - [ DEP ] osu_latency_test on daint:gpu using pgi - [----------] finished processing osu_latency_test (OSU latency test) - - [----------] waiting for spawned checks to finish - [ OK ] ( 1/22) fetch_osu_benchmarks~daint on daint:gpu using gnu [compile: 0.009s run: 2.761s total: 2.802s] - [ OK ] ( 2/22) build_osu_benchmarks~daint:gpu+gnu on daint:gpu using gnu [compile: 25.758s run: 0.056s total: 104.626s] - [ OK ] ( 3/22) build_osu_benchmarks~daint:gpu+pgi on daint:gpu using pgi [compile: 33.936s run: 70.452s total: 104.473s] - [ OK ] ( 4/22) build_osu_benchmarks~daint:gpu+intel on daint:gpu using intel [compile: 44.565s run: 65.010s total: 143.664s] - [ OK ] ( 5/22) osu_allreduce_test_4 on daint:gpu using gnu [compile: 0.011s run: 78.717s total: 101.428s] - [ OK ] ( 6/22) osu_allreduce_test_2 on daint:gpu using pgi [compile: 0.014s run: 88.060s total: 101.409s] - [ OK ] ( 7/22) osu_latency_test on daint:gpu using pgi [compile: 0.009s run: 101.325s total: 101.375s] - [ OK ] ( 8/22) osu_allreduce_test_8 on daint:gpu using pgi [compile: 0.013s run: 76.031s total: 102.005s] - [ OK ] ( 9/22) osu_allreduce_test_2 on daint:gpu using gnu [compile: 0.011s run: 85.525s total: 101.974s] - [ OK ] (10/22) osu_allreduce_test_4 on daint:gpu using pgi [compile: 0.011s run: 82.847s total: 102.407s] - [ OK ] (11/22) osu_allreduce_test_8 on daint:gpu using gnu [compile: 0.010s run: 77.818s total: 106.993s] - [ OK ] (12/22) osu_latency_test on daint:gpu using gnu [compile: 0.012s run: 103.641s total: 106.858s] - [ OK ] (13/22) osu_bandwidth_test on daint:gpu using pgi [compile: 0.011s run: 157.129s total: 164.087s] - [ OK ] (14/22) osu_bandwidth_test on daint:gpu using gnu [compile: 0.010s run: 154.343s total: 164.540s] - [ OK ] (15/22) osu_allreduce_test_8 on daint:gpu using intel [compile: 0.010s run: 194.643s total: 207.980s] - [ OK ] (16/22) osu_allreduce_test_2 on daint:gpu using intel [compile: 0.013s run: 201.145s total: 207.983s] - [ OK ] (17/22) osu_allreduce_test_4 on daint:gpu using intel [compile: 0.016s 
run: 198.143s total: 208.335s] - [ OK ] (18/22) osu_latency_test on daint:gpu using intel [compile: 0.010s run: 208.271s total: 208.312s] - [ OK ] (19/22) osu_allreduce_test_16 on daint:gpu using pgi [compile: 0.013s run: 215.854s total: 248.101s] - [ OK ] (20/22) osu_allreduce_test_16 on daint:gpu using gnu [compile: 0.010s run: 213.190s total: 248.731s] - [ OK ] (21/22) osu_allreduce_test_16 on daint:gpu using intel [compile: 0.010s run: 194.339s total: 210.962s] - [ OK ] (22/22) osu_bandwidth_test on daint:gpu using intel [compile: 0.022s run: 267.171s total: 270.475s] + [ RUN ] osu_bandwidth_test on daint:gpu using pgi + [ RUN ] osu_allreduce_test_2 on daint:gpu using pgi + [ RUN ] osu_allreduce_test_16 on daint:gpu using pgi + [ RUN ] osu_allreduce_test_8 on daint:gpu using pgi + [ OK ] ( 6/22) osu_latency_test on daint:gpu using gnu [compile: 0.010s run: 47.016s total: 54.703s] + [ OK ] ( 7/22) osu_allreduce_test_2 on daint:gpu using intel [compile: 0.009s run: 41.732s total: 42.313s] + [ OK ] ( 8/22) osu_allreduce_test_2 on daint:gpu using gnu [compile: 0.012s run: 54.571s total: 65.684s] + [ OK ] ( 9/22) osu_allreduce_test_8 on daint:gpu using gnu [compile: 0.011s run: 51.414s total: 65.712s] + [ OK ] (10/22) osu_allreduce_test_4 on daint:gpu using gnu [compile: 0.010s run: 48.378s total: 65.741s] + [ OK ] (11/22) osu_latency_test on daint:gpu using intel [compile: 0.008s run: 39.131s total: 42.877s] + [ OK ] (12/22) osu_allreduce_test_4 on daint:gpu using intel [compile: 0.009s run: 35.861s total: 42.898s] + [ OK ] (13/22) osu_allreduce_test_16 on daint:gpu using intel [compile: 0.008s run: 32.300s total: 42.901s] + [ OK ] (14/22) osu_allreduce_test_8 on daint:gpu using intel [compile: 0.009s run: 29.237s total: 42.914s] + [ OK ] (15/22) osu_allreduce_test_4 on daint:gpu using pgi [compile: 0.009s run: 26.134s total: 42.904s] + [ OK ] (16/22) osu_latency_test on daint:gpu using pgi [compile: 0.009s run: 23.085s total: 47.232s] + [ OK ] (17/22) osu_allreduce_test_2 on daint:gpu using pgi [compile: 0.008s run: 17.401s total: 41.728s] + [ OK ] (18/22) osu_allreduce_test_16 on daint:gpu using pgi [compile: 0.008s run: 15.895s total: 36.613s] + [ OK ] (19/22) osu_allreduce_test_8 on daint:gpu using pgi [compile: 0.009s run: 13.485s total: 34.296s] + [ OK ] (20/22) osu_bandwidth_test on daint:gpu using gnu [compile: 0.011s run: 80.564s total: 85.070s] + [ OK ] (21/22) osu_bandwidth_test on daint:gpu using intel [compile: 0.008s run: 76.772s total: 97.828s] + [ OK ] (22/22) osu_bandwidth_test on daint:gpu using pgi [compile: 0.009s run: 83.003s total: 110.656s] [----------] all spawned checks have finished [ PASSED ] Ran 22/22 test case(s) from 10 check(s) (0 failure(s), 0 skipped) diff --git a/docs/tutorial_tips_tricks.rst b/docs/tutorial_tips_tricks.rst index 304a87bf5e..34efaa0713 100644 --- a/docs/tutorial_tips_tricks.rst +++ b/docs/tutorial_tips_tricks.rst @@ -129,7 +129,6 @@ If we run the test, we can see that the correct standard output filename will be .. 
code-block:: none - [----------] waiting for spawned checks to finish rfm_HelloMultiLangTest_cpp_job.out [ OK ] (1/4) HelloMultiLangTest_cpp on catalina:default using gnu [compile: 0.677s run: 0.700s total: 1.394s] rfm_HelloMultiLangTest_c_job.out @@ -417,7 +416,6 @@ Let's run the whole test DAG: - [----------] waiting for spawned checks to finish [ OK ] ( 1/10) T0 on generic:default using builtin [compile: 0.014s run: 0.297s total: 0.337s] [ OK ] ( 2/10) T4 on generic:default using builtin [compile: 0.010s run: 0.171s total: 0.207s] [ OK ] ( 3/10) T5 on generic:default using builtin [compile: 0.010s run: 0.192s total: 0.225s] @@ -478,11 +476,8 @@ Notice how only the :class:`T6` test was rerun and none of its dependencies, sin [==========] Running 1 check(s) [==========] Started on Thu Jan 21 14:27:18 2021 - [----------] started processing T6 (T6) + [----------] start processing checks [ RUN ] T6 on generic:default using builtin - [----------] finished processing T6 (T6) - - [----------] waiting for spawned checks to finish [ OK ] (1/1) T6 on generic:default using builtin [compile: 0.012s run: 0.428s total: 0.464s] [----------] all spawned checks have finished @@ -498,7 +493,6 @@ If we tried to run :class:`T6` without restoring the session, we would have to r .. code-block:: none - [----------] waiting for spawned checks to finish [ OK ] (1/5) T0 on generic:default using builtin [compile: 0.012s run: 0.424s total: 0.464s] [ OK ] (2/5) T4 on generic:default using builtin [compile: 0.011s run: 0.348s total: 0.381s] [ OK ] (3/5) T5 on generic:default using builtin [compile: 0.007s run: 0.225s total: 0.248s] diff --git a/reframe/core/pipeline.py b/reframe/core/pipeline.py index 9870af33d3..7967e7a854 100644 --- a/reframe/core/pipeline.py +++ b/reframe/core/pipeline.py @@ -1685,6 +1685,23 @@ def run(self): if self.job.sched_flex_alloc_nodes: self.num_tasks = self.job.num_tasks + @final + def compile_complete(self): + '''Check if the build phase has completed. + + :returns: :class:`True` if the associated build job has finished, + :class:`False` otherwise. + + If no job descriptor is yet associated with this test, + :class:`True` is returned. + :raises reframe.core.exceptions.ReframeError: In case of errors. + + ''' + if not self._build_job: + return True + + return self._build_job.finished() + @final def run_complete(self): '''Check if the run phase has completed. 
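The new :func:`compile_complete` method above mirrors the existing :func:`run_complete` and is what lets the asynchronous policy poll build jobs without blocking. A minimal usage sketch follows; the driver function ``wait_for_build`` and the polling interval are hypothetical and not part of this diff:

    import time

    def wait_for_build(test, poll_interval=1.0):
        # Submit the build job; the asynchronous policy below relies on this
        # returning without waiting for the compilation to finish.
        test.compile()

        # compile_complete() also returns True when no build job is associated
        # with the test, so the loop is safe for run-only tests.
        while not test.compile_complete():
            time.sleep(poll_interval)

        # Collect the finished build job and finalize the build phase.
        test.compile_wait()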
diff --git a/reframe/frontend/cli.py b/reframe/frontend/cli.py index 82866c1cea..e4b4ed8213 100644 --- a/reframe/frontend/cli.py +++ b/reframe/frontend/cli.py @@ -517,6 +517,20 @@ def main(): action='store_true', help='Use a compact test naming scheme' ) + argparser.add_argument( + dest='dump_pipeline_progress', + envvar='RFM_DUMP_PIPELINE_PROGRESS', + configvar='general/dump_pipeline_progress', + action='store_true', + help='Dump progress information for the async execution' + ) + argparser.add_argument( + dest='pipeline_timeout', + envvar='RFM_PIPELINE_TIMEOUT', + configvar='general/pipeline_timeout', + action='store', + help='Timeout for advancing the pipeline' + ) argparser.add_argument( dest='remote_detect', envvar='RFM_REMOTE_DETECT', diff --git a/reframe/frontend/executors/__init__.py b/reframe/frontend/executors/__init__.py index 4baf6efd28..8077ffe33c 100644 --- a/reframe/frontend/executors/__init__.py +++ b/reframe/frontend/executors/__init__.py @@ -215,6 +215,26 @@ def failed(self): return (self._failed_stage is not None and not self._aborted and not self._skipped) + @property + def state(self): + if self.failed: + return 'fail' + + if self.skipped: + return 'skip' + + states = { + 'startup': 'startup', + 'setup': 'ready_compile', + 'compile': 'compiling', + 'compile_wait': 'ready_run', + 'run': 'running', + 'run_wait': 'completing', + 'finalize': 'retired', + 'cleanup': 'completed', + } + return states[self._current_stage] + @property def failed_stage(self): return self._failed_stage @@ -248,7 +268,9 @@ class update_timestamps: # we don't want to masquerade the self argument of our containing # function def __enter__(this): - if fn.__name__ != 'poll': + if fn.__name__ not in ('poll', + 'run_complete', + 'compile_complete'): stage = self._current_stage self._timestamps[f'{stage}_start'] = time.time() @@ -257,7 +279,7 @@ def __exit__(this, exc_type, exc_value, traceback): self._timestamps[f'{stage}_finish'] = time.time() self._timestamps['pipeline_end'] = time.time() - if fn.__name__ != 'poll': + if fn.__name__ not in ('poll', 'run_complete', 'compile_complete'): self._current_stage = fn.__name__ try: @@ -284,6 +306,7 @@ def setup(self, *args, **kwargs): def compile(self): self._safe_call(self.check.compile) + self._notify_listeners('on_task_compile') def compile_wait(self): self._safe_call(self.check.compile_wait) @@ -300,6 +323,13 @@ def run_complete(self): return done + def compile_complete(self): + done = self._safe_call(self.check.compile_complete) + if done: + self._notify_listeners('on_task_compile_exit') + + return done + def run_wait(self): self._safe_call(self.check.run_wait) self.zombie = False @@ -367,10 +397,19 @@ def on_task_setup(self, task): def on_task_run(self, task): '''Called whenever the run() method of a RegressionTask is called.''' + @abc.abstractmethod + def on_task_compile(self, task): + '''Called whenever the compile() method of a RegressionTask is + called.''' + @abc.abstractmethod def on_task_exit(self, task): '''Called whenever a RegressionTask finishes.''' + @abc.abstractmethod + def on_task_compile_exit(self, task): + '''Called whenever a RegressionTask compilation phase finishes.''' + @abc.abstractmethod def on_task_skip(self, task): '''Called whenever a RegressionTask is skipped.''' @@ -480,26 +519,16 @@ def print_separator(check, prefix): '%s %s (%s)' % (prefix, check.name, check.descr) ) + self._printer.separator('short single line', + 'start processing checks') self._policy.enter() self._printer.reset_progress(len(testcases)) - last_check = 
None for t in testcases: - if last_check is None or last_check.name != t.check.name: - if last_check is not None: - print_separator(last_check, 'finished processing') - self._printer.info('') - - print_separator(t.check, 'started processing') - last_check = t.check - self._policy.runcase(t) - # Close the last visual box - if last_check is not None: - print_separator(last_check, 'finished processing') - self._printer.info('') - self._policy.exit() + self._printer.separator('short single line', + 'all spawned checks have finished\n') class ExecutionPolicy(abc.ABC): diff --git a/reframe/frontend/executors/policies.py b/reframe/frontend/executors/policies.py index deb6858225..c647f7cc62 100644 --- a/reframe/frontend/executors/policies.py +++ b/reframe/frontend/executors/policies.py @@ -4,23 +4,29 @@ # SPDX-License-Identifier: BSD-3-Clause import contextlib -import functools -import itertools import math import sys import time +import reframe.core.runtime as rt +import reframe.utility as util from reframe.core.exceptions import (FailureLimitError, SkipTestError, TaskDependencyError, TaskExit) from reframe.core.logging import getlogger +from reframe.core.pipeline import (CompileOnlyRegressionTest, + RunOnlyRegressionTest) from reframe.frontend.executors import (ExecutionPolicy, RegressionTask, TaskEventListener, ABORT_REASONS) -def countall(d): - return functools.reduce(lambda l, r: l + len(r), d.values(), 0) +def _get_partition_name(task, phase='run'): + if (task.check.local or + (phase == 'build' and task.check.build_locally)): + return '_rfm_local' + else: + return task.check.current_partition.fullname def _cleanup_all(tasks, *args, **kwargs): @@ -90,8 +96,8 @@ def runcase(self, case): check, partition, environ = case self.printer.status( - 'RUN', '%s on %s using %s' % - (check.name, partition.fullname, environ.name) + 'RUN', + f'{check.name} on {partition.fullname} using {environ.name}' ) task = RegressionTask(case, self.task_listeners) self._task_index[case] = task @@ -160,9 +166,15 @@ def on_task_setup(self, task): def on_task_run(self, task): pass + def on_task_compile(self, task): + pass + def on_task_exit(self, task): pass + def on_task_compile_exit(self, task): + pass + def on_task_skip(self, task): msg = str(task.exc_info[1]) self.printer.status('SKIP', msg, just='right') @@ -220,8 +232,9 @@ def exit(self): class AsynchronousExecutionPolicy(ExecutionPolicy, TaskEventListener): - def __init__(self): + '''The asynchronous execution policy.''' + def __init__(self): super().__init__() self._pollctl = _PollController() @@ -229,329 +242,391 @@ def __init__(self): # Index tasks by test cases self._task_index = {} - # All currently running tasks per partition - self._running_tasks = {} + # A set of all the current tasks. We use an ordered set here, because + # we want to preserve the order of the tasks. 
+ self._current_tasks = util.OrderedSet() + + # Quick look up for the partition schedulers including the + # `_rfm_local` pseudo-partition + self._schedulers = { + '_rfm_local': self.local_scheduler + } - # Tasks that need to be finalized - self._completed_tasks = [] + # Tasks per partition + self._partition_tasks = { + '_rfm_local': util.OrderedSet() + } # Retired tasks that need to be cleaned up self._retired_tasks = [] - # Ready tasks to be executed per partition - self._ready_tasks = {} - - # Tasks that are waiting for dependencies - self._waiting_tasks = [] - # Job limit per partition - self._max_jobs = {} + self._max_jobs = { + '_rfm_local': rt.runtime().get_option('systems/0/max_local_jobs') + } + self._pipeline_statistics = rt.runtime().get_option( + 'systems/0/dump_pipeline_progress' + ) + self.task_listeners.append(self) - # Keep a reference to all the partitions - self._partitions = set() + def _init_pipeline_progress(self, num_tasks): + self._pipeline_progress = { + 'startup': [(num_tasks, 0)], + 'ready_compile': [(0, 0)], + 'compiling': [(0, 0)], + 'ready_run': [(0, 0)], + 'running': [(0, 0)], + 'completing': [(0, 0)], + 'retired': [(0, 0)], + 'completed': [(0, 0)], + 'fail': [(0, 0)], + 'skip': [(0, 0)] + } + self._pipeline_step = 0 + self._t_pipeline_start = time.time() + + def _update_pipeline_progress(self, old_state, new_state, num_tasks=1): + timestamp = time.time() - self._t_pipeline_start + for state in self._pipeline_progress: + count = self._pipeline_progress[state][self._pipeline_step][0] + if old_state != new_state: + if state == old_state: + count -= num_tasks + elif state == new_state: + count += num_tasks + + self._pipeline_progress[state].append((count, timestamp)) + + self._pipeline_step += 1 + + def _dump_pipeline_progress(self, filename): + import reframe.utility.jsonext as jsonext + + with open(filename, 'w') as fp: + jsonext.dump(self._pipeline_progress, fp, indent=2) - self.task_listeners.append(self) + def runcase(self, case): + super().runcase(case) + check, partition, environ = case + self._schedulers[partition.fullname] = partition.scheduler - def _remove_from_running(self, task): + # Set partition-based counters, if not set already + self._partition_tasks.setdefault(partition.fullname, util.OrderedSet()) + self._max_jobs.setdefault(partition.fullname, partition.max_jobs) + + task = RegressionTask(case, self.task_listeners) + self._task_index[case] = task + self.stats.add_task(task) getlogger().debug2( - f'Removing task from the running list: {task.testcase}' + f'Added {check.name} on {partition.fullname} ' + f'using {environ.name}' ) - try: - partname = task.check.current_partition.fullname - self._running_tasks[partname].remove(task) - except (ValueError, AttributeError, KeyError): - getlogger().debug2('Task was not running') - pass - - # FIXME: The following functions are very similar and they are also reused - # in the serial policy; we should refactor them - def deps_failed(self, task): - # NOTE: Restored dependencies are not in the task_index - return any(self._task_index[c].failed - for c in task.testcase.deps if c in self._task_index) + self._current_tasks.add(task) - def deps_succeeded(self, task): - # NOTE: Restored dependencies are not in the task_index - return all(self._task_index[c].succeeded - for c in task.testcase.deps if c in self._task_index) + def exit(self): + if self._pipeline_statistics: + self._init_pipeline_progress(len(self._current_tasks)) - def deps_skipped(self, task): - # NOTE: Restored dependencies are not in the 
task_index - return any(self._task_index[c].skipped - for c in task.testcase.deps if c in self._task_index) + while self._current_tasks: + try: + self._poll_tasks() + num_running = sum( + 1 if t.state in ('running', 'compiling') else 0 + for t in self._current_tasks + ) - def on_task_setup(self, task): - partname = task.check.current_partition.fullname - self._ready_tasks[partname].append(task) + timeout = rt.runtime().get_option( + 'general/0/pipeline_timeout' + ) - def on_task_run(self, task): - partname = task.check.current_partition.fullname - self._running_tasks[partname].append(task) + # FIXME: Always convert due to #GH 2246 + if timeout is not None: + timeout = float(timeout) - def on_task_skip(self, task): - # Remove the task from the running list if it was skipped after the - # run phase - if task.check.current_partition: - partname = task.check.current_partition.fullname - if task.failed_stage in ('run_complete', 'run_wait'): - self._running_tasks[partname].remove(task) + self._advance_all(self._current_tasks, timeout) + if self._pipeline_statistics: + num_retired = len(self._retired_tasks) - msg = str(task.exc_info[1]) - self.printer.status('SKIP', msg, just='right') + _cleanup_all(self._retired_tasks, not self.keep_stage_files) + if self._pipeline_statistics: + num_retired_actual = num_retired - len(self._retired_tasks) + + # Some tests might not be cleaned up because they are + # waiting for dependencies or because their dependencies + # have failed. + self._update_pipeline_progress( + 'retired', 'completed', num_retired_actual + ) - def on_task_failure(self, task): - if task.aborted: - return + if num_running: + self._pollctl.running_tasks(num_running).snooze() + except ABORT_REASONS as e: + self._failall(e) + raise - self._num_failed_tasks += 1 - msg = f'{task.check.info()} [{task.pipeline_timings_basic()}]' - if task.failed_stage == 'cleanup': - self.printer.status('ERROR', msg, just='right') + if self._pipeline_statistics: + self._dump_pipeline_progress('pipeline-progress.json') + + def _poll_tasks(self): + for partname, sched in self._schedulers.items(): + jobs = [] + for t in self._partition_tasks[partname]: + if t.state == 'compiling': + jobs.append(t.check.build_job) + elif t.state == 'running': + jobs.append(t.check.job) + + sched.poll(*jobs) + + def _exec_stage(self, task, stage_methods): + '''Execute a series of pipeline stages. + + Return True on success, False otherwise. 
+ ''' + + try: + for stage in stage_methods: + stage() + except TaskExit: + self._current_tasks.remove(task) + if task.check.current_partition: + partname = task.check.current_partition.fullname + else: + partname = None + + # Remove tasks from the partition tasks if there + with contextlib.suppress(KeyError): + self._partition_tasks['_rfm_local'].remove(task) + if partname: + self._partition_tasks[partname].remove(task) + + return False else: - self._remove_from_running(task) - self.printer.status('FAIL', msg, just='right') + return True - stagedir = task.check.stagedir - if not stagedir: - stagedir = '' + def _advance_all(self, tasks, timeout=None): + t_init = time.time() + num_progressed = 0 - getlogger().info(f'==> test failed during {task.failed_stage!r}: ' - f'test staged in {stagedir!r}') - getlogger().verbose(f'==> timings: {task.pipeline_timings_all()}') - if self._num_failed_tasks >= self.max_failures: - raise FailureLimitError( - f'maximum number of failures ({self.max_failures}) reached' - ) + getlogger().debug2(f'Current tests: {len(tasks)}') - def on_task_success(self, task): - msg = f'{task.check.info()} [{task.pipeline_timings_basic()}]' - self.printer.status('OK', msg, just='right') - getlogger().verbose(f'==> timings: {task.pipeline_timings_all()}') + # We take a snapshot of the tasks to advance by doing a shallow copy, + # since the tasks may removed by the individual advance functions. + for t in list(tasks): + old_state = t.state + bump_state = getattr(self, f'_advance_{t.state}') + num_progressed += bump_state(t) + new_state = t.state - # Update reference count of dependencies - for c in task.testcase.deps: - # NOTE: Restored dependencies are not in the task_index - if c in self._task_index: - self._task_index[c].ref_count -= 1 + t_elapsed = time.time() - t_init + if timeout and t_elapsed > timeout and num_progressed: + break - self._retired_tasks.append(task) + if self._pipeline_statistics: + self._update_pipeline_progress(old_state, new_state, 1) - def on_task_exit(self, task): - task.run_wait() - self._remove_from_running(task) - self._completed_tasks.append(task) + getlogger().debug2(f'Bumped {num_progressed} test(s)') - def _setup_task(self, task): + def _advance_startup(self, task): if self.deps_skipped(task): try: raise SkipTestError('skipped due to skipped dependencies') except SkipTestError as e: task.skip() - return False + self._current_tasks.remove(task) + return 1 elif self.deps_succeeded(task): try: + self.printer.status( + 'RUN', f'{task.check.name} on ' + f'{task.testcase.partition.fullname} using ' + f'{task.testcase.environ.name}' + ) task.setup(task.testcase.partition, task.testcase.environ, sched_flex_alloc_nodes=self.sched_flex_alloc_nodes, sched_options=self.sched_options) except TaskExit: - return False - else: - return True + self._current_tasks.remove(task) + return 1 + + if isinstance(task.check, RunOnlyRegressionTest): + # All tests should execute all the pipeline stages, even if + # they are no-ops + self._exec_stage(task, [task.compile, + task.compile_complete, + task.compile_wait]) + + return 1 elif self.deps_failed(task): exc = TaskDependencyError('dependencies failed') task.fail((type(exc), exc, None)) - return False + self._current_tasks.remove(task) + return 1 else: # Not all dependencies have finished yet - return False - - def runcase(self, case): - super().runcase(case) - check, partition, environ = case - self._partitions.add(partition) + getlogger().debug2(f'{task.check.info()} waiting for dependencies') + return 0 - # Set 
partition-based counters, if not set already - self._running_tasks.setdefault(partition.fullname, []) - self._ready_tasks.setdefault(partition.fullname, []) - self._max_jobs.setdefault(partition.fullname, partition.max_jobs) + def _advance_ready_compile(self, task): + partname = _get_partition_name(task, phase='build') + max_jobs = self._max_jobs[partname] + if len(self._partition_tasks[partname]) < max_jobs: + if self._exec_stage(task, [task.compile]): + self._partition_tasks[partname].add(task) - task = RegressionTask(case, self.task_listeners) - self._task_index[case] = task - self.stats.add_task(task) - self.printer.status( - 'RUN', '%s on %s using %s' % - (check.name, partition.fullname, environ.name) - ) - try: - partname = partition.fullname - if not self._setup_task(task): - if not task.skipped and not task.failed: - self.printer.status( - 'DEP', '%s on %s using %s' % - (check.name, partname, environ.name), - just='right' - ) - self._waiting_tasks.append(task) + return 1 - return + getlogger().debug2(f'Hit the max job limit of {partname}: {max_jobs}') + return 0 - if len(self._running_tasks[partname]) >= partition.max_jobs: - # Make sure that we still exceeded the job limit - getlogger().debug2( - f'Reached concurrency limit for partition {partname!r}: ' - f'{partition.max_jobs} job(s)' - ) - self._poll_tasks() - - if len(self._running_tasks[partname]) < partition.max_jobs: - # Task was put in _ready_tasks during setup - self._ready_tasks[partname].pop() - self._reschedule(task) + def _advance_compiling(self, task): + partname = _get_partition_name(task, phase='build') + try: + if task.compile_complete(): + task.compile_wait() + self._partition_tasks[partname].remove(task) + if isinstance(task.check, CompileOnlyRegressionTest): + # All tests should pass from all the pipeline stages, + # even if they are no-ops + self._exec_stage(task, [task.run, + task.run_complete, + task.run_wait]) + + return 1 else: - self.printer.status('HOLD', task.check.info(), just='right') + return 0 except TaskExit: - if not task.failed and not task.skipped: - with contextlib.suppress(TaskExit): - self._reschedule(task) + self._partition_tasks[partname].remove(task) + self._current_tasks.remove(task) + return 1 - return - except ABORT_REASONS as e: - # If abort was caused due to failure elsewhere, abort current - # task as well - task.abort(e) - self._failall(e) - raise + def _advance_ready_run(self, task): + partname = _get_partition_name(task, phase='run') + max_jobs = self._max_jobs[partname] + if len(self._partition_tasks[partname]) < max_jobs: + if self._exec_stage(task, [task.run]): + self._partition_tasks[partname].add(task) - def _poll_tasks(self): - '''Update the counts of running checks per partition.''' - - def split_jobs(tasks): - '''Split jobs into forced local and normal ones.''' - forced_local = [] - normal = [] - for t in tasks: - if t.check.local: - forced_local.append(t.check.job) - else: - normal.append(t.check.job) - - return forced_local, normal - - for part in self._partitions: - partname = part.fullname - num_tasks = len(self._running_tasks[partname]) - getlogger().debug2(f'Polling {num_tasks} task(s) in {partname!r}') - forced_local_jobs, part_jobs = split_jobs( - self._running_tasks[partname] - ) - part.scheduler.poll(*part_jobs) - self.local_scheduler.poll(*forced_local_jobs) + return 1 - # Trigger notifications for finished jobs - for t in self._running_tasks[partname]: - t.run_complete() + getlogger().debug2(f'Hit the max job limit of {partname}: {max_jobs}') + return 0 - def 
_setup_all(self): - still_waiting = [] - for task in self._waiting_tasks: - if (not self._setup_task(task) and - not task.failed and not task.skipped): - still_waiting.append(task) + def _advance_running(self, task): + partname = _get_partition_name(task, phase='run') + try: + if task.run_complete(): + if self._exec_stage(task, [task.run_wait]): + self._partition_tasks[partname].remove(task) - self._waiting_tasks[:] = still_waiting + return 1 + else: + return 0 - def _finalize_all(self): - getlogger().debug2(f'Finalizing {len(self._completed_tasks)} task(s)') - while True: - try: - task = self._completed_tasks.pop() - except IndexError: - break + except TaskExit: + self._partition_tasks[partname].remove(task) + self._current_tasks.remove(task) + return 1 - getlogger().debug2(f'Finalizing task {task.testcase}') - with contextlib.suppress(TaskExit): - self._finalize_task(task) + def _advance_completing(self, task): + try: + if not self.skip_sanity_check: + task.sanity() + + if not self.skip_performance_check: + task.performance() - def _finalize_task(self, task): - getlogger().debug2(f'Finalizing task {task.testcase}') - if not self.skip_sanity_check: - task.sanity() + task.finalize() + self._retired_tasks.append(task) + self._current_tasks.remove(task) + except TaskExit: + self._current_tasks.remove(task) + finally: + return 1 + + def deps_failed(self, task): + # NOTE: Restored dependencies are not in the task_index + return any(self._task_index[c].failed + for c in task.testcase.deps if c in self._task_index) - if not self.skip_performance_check: - task.performance() + def deps_succeeded(self, task): + # NOTE: Restored dependencies are not in the task_index + return all(self._task_index[c].succeeded + for c in task.testcase.deps if c in self._task_index) - task.finalize() + def deps_skipped(self, task): + # NOTE: Restored dependencies are not in the task_index + return any(self._task_index[c].skipped + for c in task.testcase.deps if c in self._task_index) def _failall(self, cause): '''Mark all tests as failures''' getlogger().debug2(f'Aborting all tasks due to {type(cause).__name__}') - for task in list(itertools.chain(*self._running_tasks.values())): - task.abort(cause) - - self._running_tasks = {} - for ready_list in self._ready_tasks.values(): - for task in ready_list: + for task in self._current_tasks: + with contextlib.suppress(FailureLimitError): task.abort(cause) - for task in itertools.chain(self._waiting_tasks, - self._completed_tasks): - task.abort(cause) - - def _reschedule(self, task): - getlogger().debug2(f'Scheduling test case {task.testcase} for running') - task.compile() - task.compile_wait() - task.run() - - def _reschedule_all(self): - for partname, tasks in self._running_tasks.items(): - num_tasks = len(tasks) - num_empty_slots = self._max_jobs[partname] - num_tasks - num_rescheduled = 0 - for _ in range(num_empty_slots): - try: - task = self._ready_tasks[partname].pop() - except IndexError: - break + # These function can be useful for tracking statistics of the framework, + # such as number of tests that have finished setup etc. 
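# --------------------------------------------------------------------------
# [Editor's sketch -- not part of the patch.] The hunk above replaces the old
# setup/poll/reschedule loop with per-state `_advance_<state>()` methods: each
# returns 1 if the task made progress and 0 otherwise, and `_advance_all()`
# picks the right method from the task's current state via `getattr`. The
# standalone toy below only illustrates that dispatch pattern; `ToyTask`,
# `ToyPolicy` and their states are made up and are not ReFrame API.

import time


class ToyTask:
    def __init__(self, name):
        self.name = name
        self.state = 'startup'


class ToyPolicy:
    def __init__(self, pipeline_timeout=None):
        self.timeout = pipeline_timeout
        self.tasks = []

    def _advance_startup(self, task):
        # Pretend setup and job submission succeeded.
        task.state = 'running'
        return 1

    def _advance_running(self, task):
        # Pretend the spawned job has finished.
        task.state = 'completing'
        return 1

    def _advance_completing(self, task):
        # Pretend the final checks passed; retire the task.
        task.state = 'retired'
        self.tasks.remove(task)
        return 1

    def advance_all(self):
        t_init = time.time()
        num_progressed = 0
        # Iterate over a shallow copy, since the advance methods above may
        # remove tasks from the list.
        for t in list(self.tasks):
            num_progressed += getattr(self, f'_advance_{t.state}')(t)
            if (self.timeout and time.time() - t_init > self.timeout
                    and num_progressed):
                # Stop this round once the timeout is exceeded, provided at
                # least one task was bumped.
                break

        return num_progressed


if __name__ == '__main__':
    policy = ToyPolicy(pipeline_timeout=10)
    policy.tasks = [ToyTask(f't{i}') for i in range(3)]
    while policy.tasks:
        print(f'bumped {policy.advance_all()} task(s)')
# --------------------------------------------------------------------------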
+ def on_task_setup(self, task): + pass - self._reschedule(task) - num_rescheduled += 1 + def on_task_run(self, task): + pass - if num_rescheduled: - getlogger().debug2( - f'Rescheduled {num_rescheduled} job(s) on {partname!r}' - ) + def on_task_compile(self, task): + pass - def exit(self): - self.printer.separator('short single line', - 'waiting for spawned checks to finish') - while (countall(self._running_tasks) or self._waiting_tasks or - self._completed_tasks or countall(self._ready_tasks)): - getlogger().debug2(f'Running tasks: ' - f'{countall(self._running_tasks)}') - try: - self._poll_tasks() + def on_task_exit(self, task): + pass - # We count running tasks just after polling in order to check - # more reliably that the state has changed, so that we - # decrease the sleep time. Otherwise if the number of tasks - # rescheduled was the as the number of tasks retired, the - # sleep time would be increased. - num_running = countall(self._running_tasks) - self._finalize_all() - self._setup_all() - self._reschedule_all() - _cleanup_all(self._retired_tasks, not self.keep_stage_files) - if num_running: - self._pollctl.running_tasks(num_running).snooze() + def on_task_compile_exit(self, task): + pass - except TaskExit: - with contextlib.suppress(TaskExit): - self._reschedule_all() - except ABORT_REASONS as e: - self._failall(e) - raise + def on_task_skip(self, task): + pass + + def on_task_failure(self, task): + self._num_failed_tasks += 1 + timings = task.pipeline_timings(['compile_complete', + 'run_complete', + 'total']) + msg = f'{task.check.info()} [{timings}]' + if task.failed_stage == 'cleanup': + self.printer.status('ERROR', msg, just='right') + else: + self.printer.status('FAIL', msg, just='right') - self.printer.separator('short single line', - 'all spawned checks have finished\n') + timings = task.pipeline_timings(['setup', + 'compile_complete', + 'run_complete', + 'sanity', + 'performance', + 'total']) + getlogger().info(f'==> test failed during {task.failed_stage!r}: ' + f'test staged in {task.check.stagedir!r}') + getlogger().verbose(f'==> {timings}') + if self._num_failed_tasks >= self.max_failures: + raise FailureLimitError( + f'maximum number of failures ({self.max_failures}) reached' + ) + + def on_task_success(self, task): + timings = task.pipeline_timings(['compile_complete', + 'run_complete', + 'total']) + msg = f'{task.check.info()} [{timings}]' + self.printer.status('OK', msg, just='right') + timings = task.pipeline_timings(['setup', + 'compile_complete', + 'run_complete', + 'sanity', + 'performance', + 'total']) + getlogger().verbose(f'==> {timings}') + + for c in task.testcase.deps: + # NOTE: Restored dependencies are not in the task_index + if c in self._task_index: + self._task_index[c].ref_count -= 1 diff --git a/reframe/schemas/config.json b/reframe/schemas/config.json index cc15cc62b7..045267f08b 100644 --- a/reframe/schemas/config.json +++ b/reframe/schemas/config.json @@ -226,6 +226,7 @@ "type": "array", "items": {"type": "string"} }, + "max_local_jobs": {"type": "number"}, "modules_system": { "type": "string", "enum": ["tmod", "tmod31", "tmod32", "tmod4", @@ -464,6 +465,8 @@ "items": {"type": "string"} }, "non_default_craype": {"type": "boolean"}, + "dump_pipeline_progress": {"type": "boolean"}, + "pipeline_timeout": {"type": ["number", "null"]}, "purge_environment": {"type": "boolean"}, "remote_detect": {"type": "boolean"}, "remote_workdir": {"type": "string"}, @@ -497,6 +500,8 @@ "environments/ldflags": [], "environments/extras": {}, 
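# --------------------------------------------------------------------------
# [Editor's sketch -- not part of the patch.] The schema entries above add
# three user-facing options: `systems[].max_local_jobs`,
# `general[].dump_pipeline_progress` and `general[].pipeline_timeout` (their
# defaults appear in the defaults section that follows). A minimal settings
# fragment that sets them could look as follows; the system and partition
# names are made up.

site_configuration = {
    'systems': [
        {
            'name': 'mycluster',
            'hostnames': ['mycluster'],
            # Cap the number of forced local build/run jobs.
            'max_local_jobs': 4,
            'partitions': [
                {
                    'name': 'default',
                    'scheduler': 'local',
                    'launcher': 'local',
                    'environs': ['builtin'],
                    'max_jobs': 8
                }
            ]
        }
    ],
    'general': [
        {
            # Dump per-state pipeline progress (debugging aid); the dump can
            # then be visualised with the helper script added below:
            #   python tools/plot_pipeline_progress.py <progress_file> time
            'dump_pipeline_progress': True,
            # Timeout (in seconds) for a single pipeline-advancement round of
            # the asynchronous policy (see `_advance_all()` above).
            'pipeline_timeout': 30
        }
    ]
}
# --------------------------------------------------------------------------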
"environments/target_systems": ["*"], + "general/dump_pipeline_progress": false, + "general/pipeline_timeout": null, "general/check_search_path": ["${RFM_INSTALL_PREFIX}/checks/"], "general/check_search_recursive": false, "general/clean_stagedir": true, @@ -545,6 +550,7 @@ "schedulers/target_systems": ["*"], "schedulers/use_nodes_option": false, "systems/descr": "", + "systems/max_local_jobs": 8, "systems/modules_system": "nomod", "systems/modules": [], "systems/variables": [], diff --git a/tools/plot_pipeline_progress.py b/tools/plot_pipeline_progress.py new file mode 100644 index 0000000000..b894bded95 --- /dev/null +++ b/tools/plot_pipeline_progress.py @@ -0,0 +1,42 @@ +import json +import matplotlib.pyplot as plt +import os +import sys + + +if __name__ == '__main__': + with open(sys.argv[1]) as fp: + raw_data = json.load(fp) + + for state, steps in raw_data.items(): + print(state, len(steps)) + + try: + mode = sys.argv[2] + if mode not in ('steps', 'time'): + print(f'unknown mode: {mode}') + sys.exit(1) + except IndexError: + mode = 'steps' + + if mode == 'steps': + x_label = '# Steps' + x_values = range(len(raw_data['startup'])) + else: + x_label = 'Time (s)' + x_values = [x[1] for x in raw_data['startup']] + + y_values = [] + for x in raw_data.values(): + step_values = [s[0] for s in x] + y_values.append(step_values) + + fig, ax = plt.subplots() + ax.stackplot(x_values, y_values, labels=raw_data.keys(), alpha=1) + ax.legend(loc='center left', bbox_to_anchor=(1, 0.5)) + ax.set_title('Pipeline progress') + ax.set_xlabel(x_label) + ax.set_ylabel('Number of tasks') + figname = os.path.splitext(sys.argv[1])[0] + '_' + mode + '.png' + plt.savefig(figname, bbox_inches='tight') + plt.show() diff --git a/unittests/test_policies.py b/unittests/test_policies.py index ac0dc23a43..d4a70fdd97 100644 --- a/unittests/test_policies.py +++ b/unittests/test_policies.py @@ -545,6 +545,12 @@ def on_task_exit(self, task): last = self.num_tasks[-1] self.num_tasks.append(last - 1) + def on_task_compile(self, task): + pass + + def on_task_compile_exit(self, task): + pass + def on_task_success(self, task): pass @@ -559,16 +565,24 @@ def on_task_setup(self, task): def max_jobs_opts(n): - return {'systems/partitions/max_jobs': n} + return {'systems/partitions/max_jobs': n, + 'systems/max_local_jobs': n} @pytest.fixture -def async_runner(): - evt_monitor = _TaskEventMonitor() - ret = executors.Runner(policies.AsynchronousExecutionPolicy()) - ret.policy.keep_stage_files = True - ret.policy.task_listeners.append(evt_monitor) - return ret, evt_monitor +def make_async_runner(): + # We need to have control in the unit tests where the policy is created, + # because in some cases we need it to be initialized after the execution + # context. For this reason, we use a constructor fixture here. 
+ + def _make_runner(): + evt_monitor = _TaskEventMonitor() + ret = executors.Runner(policies.AsynchronousExecutionPolicy()) + ret.policy.keep_stage_files = True + ret.policy.task_listeners.append(evt_monitor) + return ret, evt_monitor + + return _make_runner def _read_timestamps(tasks): @@ -589,11 +603,10 @@ def _read_timestamps(tasks): return begin_stamps, end_stamps -def test_concurrency_unlimited(async_runner, make_cases, make_exec_ctx): +def test_concurrency_unlimited(make_async_runner, make_cases, make_exec_ctx): num_checks = 3 make_exec_ctx(options=max_jobs_opts(num_checks)) - - runner, monitor = async_runner + runner, monitor = make_async_runner() runner.runall(make_cases([SleepCheck(.5) for i in range(num_checks)])) # Ensure that all tests were run and without failures. @@ -615,12 +628,12 @@ def test_concurrency_unlimited(async_runner, make_cases, make_exec_ctx): pytest.skip('the system seems too much loaded.') -def test_concurrency_limited(async_runner, make_cases, make_exec_ctx): +def test_concurrency_limited(make_async_runner, make_cases, make_exec_ctx): # The number of checks must be <= 2*max_jobs. num_checks, max_jobs = 5, 3 make_exec_ctx(options=max_jobs_opts(max_jobs)) - runner, monitor = async_runner + runner, monitor = make_async_runner() runner.runall(make_cases([SleepCheck(.5) for i in range(num_checks)])) # Ensure that all tests were run and without failures. @@ -657,11 +670,11 @@ def test_concurrency_limited(async_runner, make_cases, make_exec_ctx): pytest.skip('the system seems too loaded.') -def test_concurrency_none(async_runner, make_cases, make_exec_ctx): +def test_concurrency_none(make_async_runner, make_cases, make_exec_ctx): num_checks = 3 make_exec_ctx(options=max_jobs_opts(1)) - runner, monitor = async_runner + runner, monitor = make_async_runner() runner.runall(make_cases([SleepCheck(.5) for i in range(num_checks)])) # Ensure that all tests were run and without failures. @@ -697,10 +710,10 @@ def assert_interrupted_run(runner): assert t.exc_info[0] == AbortTaskError -def test_kbd_interrupt_in_wait_with_concurrency(async_runner, make_cases, +def test_kbd_interrupt_in_wait_with_concurrency(make_async_runner, make_cases, make_exec_ctx): make_exec_ctx(options=max_jobs_opts(4)) - runner, _ = async_runner + runner, _ = make_async_runner() with pytest.raises(KeyboardInterrupt): runner.runall(make_cases([ KeyboardInterruptCheck(), SleepCheck(10), @@ -711,7 +724,7 @@ def test_kbd_interrupt_in_wait_with_concurrency(async_runner, make_cases, def test_kbd_interrupt_in_wait_with_limited_concurrency( - async_runner, make_cases, make_exec_ctx + make_async_runner, make_cases, make_exec_ctx ): # The general idea for this test is to allow enough time for all the # four checks to be submitted and at the same time we need the @@ -719,7 +732,7 @@ def test_kbd_interrupt_in_wait_with_limited_concurrency( # trigger the failure), so as to make the framework kill the remaining # three. 
make_exec_ctx(options=max_jobs_opts(2)) - runner, _ = async_runner + runner, _ = make_async_runner() with pytest.raises(KeyboardInterrupt): runner.runall(make_cases([ KeyboardInterruptCheck(), SleepCheck(10), @@ -729,10 +742,10 @@ def test_kbd_interrupt_in_wait_with_limited_concurrency( assert_interrupted_run(runner) -def test_kbd_interrupt_in_setup_with_concurrency(async_runner, make_cases, +def test_kbd_interrupt_in_setup_with_concurrency(make_async_runner, make_cases, make_exec_ctx): make_exec_ctx(options=max_jobs_opts(4)) - runner, _ = async_runner + runner, _ = make_async_runner() with pytest.raises(KeyboardInterrupt): runner.runall(make_cases([ SleepCheck(1), SleepCheck(1), SleepCheck(1), @@ -743,10 +756,10 @@ def test_kbd_interrupt_in_setup_with_concurrency(async_runner, make_cases, def test_kbd_interrupt_in_setup_with_limited_concurrency( - async_runner, make_cases, make_exec_ctx + make_async_runner, make_cases, make_exec_ctx ): make_exec_ctx(options=max_jobs_opts(2)) - runner, _ = async_runner + runner, _ = make_async_runner() with pytest.raises(KeyboardInterrupt): runner.runall(make_cases([ SleepCheck(1), SleepCheck(1), SleepCheck(1), @@ -756,10 +769,10 @@ def test_kbd_interrupt_in_setup_with_limited_concurrency( assert_interrupted_run(runner) -def test_run_complete_fails_main_loop(async_runner, make_cases, +def test_run_complete_fails_main_loop(make_async_runner, make_cases, make_exec_ctx): make_exec_ctx(options=max_jobs_opts(1)) - runner, _ = async_runner + runner, _ = make_async_runner() num_checks = 3 runner.runall(make_cases([SleepCheckPollFail(10), SleepCheck(0.1), SleepCheckPollFail(10)])) @@ -774,10 +787,10 @@ def test_run_complete_fails_main_loop(async_runner, make_cases, assert isinstance(t.check, SleepCheck) -def test_run_complete_fails_busy_loop(async_runner, make_cases, +def test_run_complete_fails_busy_loop(make_async_runner, make_cases, make_exec_ctx): make_exec_ctx(options=max_jobs_opts(1)) - runner, _ = async_runner + runner, _ = make_async_runner() num_checks = 3 runner.runall(make_cases([SleepCheckPollFailLate(1), SleepCheck(0.1), SleepCheckPollFailLate(0.5)])) @@ -792,10 +805,10 @@ def test_run_complete_fails_busy_loop(async_runner, make_cases, assert isinstance(t.check, SleepCheck) -def test_compile_fail_reschedule_main_loop(async_runner, make_cases, +def test_compile_fail_reschedule_main_loop(make_async_runner, make_cases, make_exec_ctx): make_exec_ctx(options=max_jobs_opts(1)) - runner, _ = async_runner + runner, _ = make_async_runner() num_checks = 2 runner.runall(make_cases([SleepCheckPollFail(.1), CompileFailureCheck()])) @@ -805,10 +818,10 @@ def test_compile_fail_reschedule_main_loop(async_runner, make_cases, assert num_checks == len(stats.failed()) -def test_compile_fail_reschedule_busy_loop(async_runner, make_cases, +def test_compile_fail_reschedule_busy_loop(make_async_runner, make_cases, make_exec_ctx): make_exec_ctx(options=max_jobs_opts(1)) - runner, _ = async_runner + runner, _ = make_async_runner() num_checks = 2 runner.runall( make_cases([SleepCheckPollFailLate(1.5), CompileFailureCheck()])