diff --git a/.circleci/config.yml b/.circleci/config.yml index 2298868cac25..eb498cbc59b3 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -250,7 +250,8 @@ jobs: -DHPX_WITH_FETCH_JSON=On \ -DCMAKE_EXPORT_COMPILE_COMMANDS=On \ -DHPX_WITH_DOCUMENTATION=On \ - -DHPX_WITH_DOCUMENTATION_OUTPUT_FORMATS="${DOCUMENTATION_OUTPUT_FORMATS}" + -DHPX_WITH_DOCUMENTATION_OUTPUT_FORMATS="${DOCUMENTATION_OUTPUT_FORMATS}" \ + -DHPX_WITH_TESTS_COMMAND_LINE=--hpx:queuing=local-workrequesting-fifo - persist_to_workspace: root: /hpx paths: diff --git a/.jenkins/lsu/env-clang-13.sh b/.jenkins/lsu/env-clang-13.sh index d37c78c912a3..57700fbf99b7 100644 --- a/.jenkins/lsu/env-clang-13.sh +++ b/.jenkins/lsu/env-clang-13.sh @@ -25,6 +25,8 @@ configure_extra_options+=" -DHPX_WITH_PARCELPORT_MPI=ON" configure_extra_options+=" -DHPX_WITH_PARCELPORT_LCI=ON" configure_extra_options+=" -DHPX_WITH_FETCH_LCI=ON" configure_extra_options+=" -DHPX_WITH_LOGGING=OFF" +configure_extra_options+=" -DHPX_WITH_TESTS_COMMAND_LINE=--hpx:queuing=local-workrequesting-fifo" + # The pwrapi library still needs to be set up properly on rostam # configure_extra_options+=" -DHPX_WITH_POWER_COUNTER=ON" diff --git a/.jenkins/lsu/env-gcc-11.sh b/.jenkins/lsu/env-gcc-11.sh index b9f7be125f7f..882cc60eb73d 100644 --- a/.jenkins/lsu/env-gcc-11.sh +++ b/.jenkins/lsu/env-gcc-11.sh @@ -15,6 +15,7 @@ module load pwrapi/1.1.1 export HPXRUN_RUNWRAPPER=srun export CXX_STD="20" +configure_extra_options+=" -DCMAKE_BUILD_TYPE=${build_type}" configure_extra_options+=" -DHPX_WITH_CXX_STANDARD=${CXX_STD}" configure_extra_options+=" -DHPX_WITH_MALLOC=system" configure_extra_options+=" -DHPX_WITH_FETCH_ASIO=ON" @@ -25,3 +26,5 @@ configure_extra_options+=" -DHPX_WITH_DATAPAR_BACKEND=STD_EXPERIMENTAL_SIMD" # The pwrapi library still needs to be set up properly on rostam # configure_extra_options+=" -DHPX_WITH_POWER_COUNTER=ON" + +configure_extra_options+=" -DHPX_WITH_TESTS_COMMAND_LINE=--hpx:queuing=local-workrequesting-lifo" diff --git a/CMakeLists.txt b/CMakeLists.txt index 589fbce7e12f..597f1adeda8e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1407,6 +1407,13 @@ hpx_option( ADVANCED ) +hpx_option( + HPX_WITH_TESTS_COMMAND_LINE STRING + "Add given command line options to all tests run" "" + CATEGORY "Debugging" + ADVANCED +) + hpx_option( HPX_WITH_TESTS_MAX_THREADS_PER_LOCALITY STRING diff --git a/cmake/HPX_AddTest.cmake b/cmake/HPX_AddTest.cmake index d416d99fe3ce..fbabe1aabd4e 100644 --- a/cmake/HPX_AddTest.cmake +++ b/cmake/HPX_AddTest.cmake @@ -68,6 +68,11 @@ function(add_hpx_test category name) ) endif() + # add additional command line arguments to all test executions + if(NOT x${HPX_WITH_TESTS_COMMAND_LINE} STREQUAL x"") + set(args ${args} "${HPX_WITH_TESTS_COMMAND_LINE}") + endif() + if(${HPX_WITH_PARALLEL_TESTS_BIND_NONE} AND NOT run_serial AND NOT "${name}_RUNWRAPPER" diff --git a/docs/sphinx/manual/hpx_runtime_and_resources.rst b/docs/sphinx/manual/hpx_runtime_and_resources.rst index 667c7ea7f535..195427972ed9 100644 --- a/docs/sphinx/manual/hpx_runtime_and_resources.rst +++ b/docs/sphinx/manual/hpx_runtime_and_resources.rst @@ -16,13 +16,13 @@ |hpx| thread scheduling policies ================================ -The |hpx| runtime has five thread scheduling policies: local-priority, -static-priority, local, static and abp-priority. These policies can be specified -from the command line using the command line option :option:`--hpx:queuing`. 
In
-order to use a particular scheduling policy, the runtime system must be built
-with the appropriate scheduler flag turned on (e.g. ``cmake
--DHPX_THREAD_SCHEDULERS=local``, see :ref:`cmake_variables` for more
-information).
+The |hpx| runtime has six thread scheduling policies: local-priority,
+static-priority, local, static, local-workrequesting-fifo, and abp-priority.
+These policies can be specified from the command line using the command line
+option :option:`--hpx:queuing`. In order to use a particular scheduling policy,
+the runtime system must be built with the appropriate scheduler flag turned on
+(e.g. ``cmake -DHPX_THREAD_SCHEDULERS=local``, see :ref:`cmake_variables` for
+more information).
 
 Priority local scheduling policy (default policy)
 -------------------------------------------------
@@ -51,9 +51,7 @@ policy and must be invoked using the command line option
 Static priority scheduling policy
 ---------------------------------
 
-* invoke using: :option:`--hpx:queuing`\ ``=static-priority`` (or ``-qs``)
-* flag to turn on for build: ``HPX_THREAD_SCHEDULERS=all`` or
-  ``HPX_THREAD_SCHEDULERS=static-priority``
+* invoke using: :option:`--hpx:queuing`\ ``static-priority`` (or ``-qs``)
 
 The static scheduling policy maintains one queue per OS thread from which each
 OS thread pulls its tasks (user threads). Threads are distributed in a round
@@ -102,7 +100,7 @@ domain first, only after that work is stolen from other NUMA domains.
 This scheduler can be used with two underlying queuing policies (FIFO:
 first-in-first-out, and LIFO: last-in-first-out). In order to use the LIFO
 policy use the command line option
-:option:`--hpx:queuing`\ ``=abp-priority-lifo``.
+:option:`--hpx:queuing`\ ``abp-priority-lifo``.
 
 .. Questions, concerns and notes:
 
@@ -151,6 +149,23 @@ policy use the command line option
    I see both FIFO and double ended queues in ABP policies?
 
 
+Work requesting scheduling policies
+-----------------------------------
+
+* invoke using: :option:`--hpx:queuing`\ ``local-workrequesting-fifo``
+  or using :option:`--hpx:queuing`\ ``local-workrequesting-lifo``
+
+The work-requesting policies balance work between cores using a different,
+less disruptive mechanism than the work-stealing policies listed above.
+Instead of actively inspecting and stealing from the queues of neighboring
+cores, a core that runs out of work posts a steal request to another core.
+That core, whenever it is not busy with other work, either responds by
+sending tasks back to the requesting core or forwards the request to the
+next eligible core in the system. This scheme avoids contention on the work
+queues because each queue is only ever accessed by the core that owns it.
+Both a FIFO and a LIFO variant of this scheduler are available.
+
+
 The |hpx| resource partitioner
 ==============================
 
diff --git a/docs/sphinx/manual/launching_and_configuring_hpx_applications.rst b/docs/sphinx/manual/launching_and_configuring_hpx_applications.rst
index bc08a401b736..2b92572f4e10 100644
--- a/docs/sphinx/manual/launching_and_configuring_hpx_applications.rst
+++ b/docs/sphinx/manual/launching_and_configuring_hpx_applications.rst
@@ -1571,7 +1571,9 @@ The predefined command line options for any application using
 
         The queue scheduling policy to use.
Options are ``local``, ``local-priority-fifo``, ``local-priority-lifo``, ``static``, - ``static-priority``, ``abp-priority-fifo`` and ``abp-priority-lifo`` + ``static-priority``, ``abp-priority-fifo``, + ``local-workrequesting-fifo``, ``local-workrequesting-lifo`` + and ``abp-priority-lifo`` (default: ``local-priority-fifo``). .. option:: --hpx:high-priority-threads arg diff --git a/examples/1d_stencil/1d_stencil_4.cpp b/examples/1d_stencil/1d_stencil_4.cpp index de59844927af..c39fe45e4a48 100644 --- a/examples/1d_stencil/1d_stencil_4.cpp +++ b/examples/1d_stencil/1d_stencil_4.cpp @@ -163,7 +163,7 @@ struct stepper }); // limit depth of dependency tree - hpx::sliding_semaphore sem(nd); + auto sem = std::make_shared(nd); auto Op = unwrapping(&stepper::heat_part); @@ -184,15 +184,15 @@ struct stepper // trigger the semaphore once computation has reached this point if ((t % nd) == 0) { - next[0].then([&sem, t](partition&&) { + next[0].then([sem, t](partition&&) { // inform semaphore about new lower limit - sem.signal(t); + sem->signal(t); }); } // suspend if the tree has become too deep, the continuation above // will resume this thread once the computation has caught up - sem.wait(t); + sem->wait(t); } // Return the solution at time-step 'nt'. diff --git a/examples/1d_stencil/1d_stencil_4_throttle.cpp b/examples/1d_stencil/1d_stencil_4_throttle.cpp index 87e8222c2712..5358c3eff3f1 100644 --- a/examples/1d_stencil/1d_stencil_4_throttle.cpp +++ b/examples/1d_stencil/1d_stencil_4_throttle.cpp @@ -240,7 +240,7 @@ struct stepper }); // limit depth of dependency tree - hpx::sliding_semaphore sem(nd); + auto sem = std::make_shared(nd); auto Op = unwrapping(&stepper::heat_part); @@ -261,15 +261,15 @@ struct stepper // trigger the semaphore once computation has reached this point if ((t % nd) == 0) { - next[0].then([&sem, t](partition&&) { + next[0].then([sem, t](partition&&) { // inform semaphore about new lower limit - sem.signal(t); + sem->signal(t); }); } // suspend if the tree has become too deep, the continuation above // will resume this thread once the computation has caught up - sem.wait(t); + sem->wait(t); } // Return the solution at time-step 'nt'. diff --git a/examples/1d_stencil/1d_stencil_7.cpp b/examples/1d_stencil/1d_stencil_7.cpp index 99258d307a08..71971fb927ef 100644 --- a/examples/1d_stencil/1d_stencil_7.cpp +++ b/examples/1d_stencil/1d_stencil_7.cpp @@ -21,6 +21,7 @@ #include #include #include +#include #include #include @@ -359,7 +360,7 @@ stepper::space stepper::do_work(std::size_t np, std::size_t nx, std::size_t nt) // limit depth of dependency tree std::size_t nd = 3; - hpx::sliding_semaphore sem(nd); + auto sem = std::make_shared(nd); heat_part_action act; for (std::size_t t = 0; t != nt; ++t) @@ -377,15 +378,15 @@ stepper::space stepper::do_work(std::size_t np, std::size_t nx, std::size_t nt) if ((t % nd) == 0) { - next[0].then([&sem, t](partition&&) { + next[0].then([sem, t](partition&&) { // inform semaphore about new lower limit - sem.signal(t); + sem->signal(t); }); } // suspend if the tree has become too deep, the continuation above // will resume this thread once the computation has caught up - sem.wait(t); + sem->wait(t); } // Return the solution at time-step 'nt'. 
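The recurring change in these stencil examples replaces the stack-allocated hpx::sliding_semaphore that was captured by reference with one owned by a std::shared_ptr and captured by value, so the continuation that signals the semaphore can never outlive it. Below is a minimal sketch of that throttling pattern (not from this patch; the header names and the exact init wiring are assumptions):

// Minimal sketch (not from this patch) of the throttling pattern used in the
// stencil examples above: the sliding_semaphore is owned by a shared_ptr and
// captured by value, so the continuation that signals it keeps it alive instead
// of referencing a stack object that may already be gone. Header names are
// assumptions.
#include <hpx/future.hpp>
#include <hpx/local/init.hpp>
#include <hpx/semaphore.hpp>

#include <cstdint>
#include <memory>
#include <utility>
#include <vector>

int hpx_main(hpx::program_options::variables_map&)
{
    constexpr std::int64_t nd = 3;    // allowed depth of the dependency tree
    auto sem = std::make_shared<hpx::sliding_semaphore>(nd);

    std::vector<hpx::future<void>> steps;
    for (std::int64_t t = 0; t != 20; ++t)
    {
        hpx::future<void> step = hpx::async([] { /* one time step */ });

        if ((t % nd) == 0)
        {
            // capture sem by value: the shared_ptr keeps the semaphore alive
            // until this continuation has run
            step = step.then(
                [sem, t](hpx::future<void>&&) { sem->signal(t); });
        }
        steps.push_back(std::move(step));

        // suspend if the dependency tree has become too deep; the continuation
        // above resumes this thread once the computation has caught up
        sem->wait(t);
    }

    for (auto& f : steps)
        f.get();

    return hpx::local::finalize();
}

int main(int argc, char* argv[])
{
    return hpx::local::init(hpx_main, argc, argv);
}

The same reasoning applies unchanged to the resiliency tests further below, where the shared_ptr is additionally passed by value into do_work so the semaphore stays alive for as long as the returned future is pending.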
diff --git a/examples/1d_stencil/1d_stencil_8.cpp b/examples/1d_stencil/1d_stencil_8.cpp index 644452b5f820..d01c227c3724 100644 --- a/examples/1d_stencil/1d_stencil_8.cpp +++ b/examples/1d_stencil/1d_stencil_8.cpp @@ -24,6 +24,7 @@ #include #include #include +#include #include #include #include @@ -575,7 +576,7 @@ stepper_server::space stepper_server::do_work( } // limit depth of dependency tree - hpx::sliding_semaphore sem(nd); + auto sem = std::make_shared(nd); for (std::size_t t = 0; t != nt; ++t) { @@ -626,15 +627,15 @@ stepper_server::space stepper_server::do_work( // trigger the semaphore once computation has reached this point if ((t % nd) == 0) { - next[0].then([&sem, t](partition&&) { + next[0].then([sem, t](partition&&) { // inform semaphore about new lower limit - sem.signal(t); + sem->signal(t); }); } // suspend if the tree has become too deep, the continuation above // will resume this thread once the computation has caught up - sem.wait(t); + sem->wait(t); } return U_[nt % 2]; diff --git a/examples/quickstart/fibonacci_local.cpp b/examples/quickstart/fibonacci_local.cpp index 910a6f5285ab..2b1da660eed5 100644 --- a/examples/quickstart/fibonacci_local.cpp +++ b/examples/quickstart/fibonacci_local.cpp @@ -18,6 +18,9 @@ #include #include +#include +#include +#include /////////////////////////////////////////////////////////////////////////////// //[fibonacci @@ -26,15 +29,10 @@ std::uint64_t fibonacci(std::uint64_t n) if (n < 2) return n; - // Invoking the Fibonacci algorithm twice is inefficient. - // However, we intentionally demonstrate it this way to create some - // heavy workload. - hpx::future n1 = hpx::async(fibonacci, n - 1); - hpx::future n2 = hpx::async(fibonacci, n - 2); + std::uint64_t n2 = fibonacci(n - 2); - return n1.get() + - n2.get(); // wait for the Futures to return their values + return n1.get() + n2; // wait for the Future to return their values } //fibonacci] @@ -42,6 +40,9 @@ std::uint64_t fibonacci(std::uint64_t n) //[hpx_main int hpx_main(hpx::program_options::variables_map& vm) { + hpx::threads::add_scheduler_mode( + hpx::threads::policies::scheduler_mode::fast_idle_mode); + // extract command line argument, i.e. 
fib(N) std::uint64_t n = vm["n-value"].as(); @@ -67,9 +68,13 @@ int main(int argc, char* argv[]) hpx::program_options::options_description desc_commandline( "Usage: " HPX_APPLICATION_STRING " [options]"); - desc_commandline.add_options()("n-value", - hpx::program_options::value()->default_value(10), - "n value for the Fibonacci function"); + // clang-format off + desc_commandline.add_options() + ("n-value", + hpx::program_options::value()->default_value(10), + "n value for the Fibonacci function") + ; + // clang-format on // Initialize and run HPX hpx::local::init_params init_args; diff --git a/libs/core/command_line_handling_local/src/parse_command_line_local.cpp b/libs/core/command_line_handling_local/src/parse_command_line_local.cpp index 646d805b19b3..6352b81c22e0 100644 --- a/libs/core/command_line_handling_local/src/parse_command_line_local.cpp +++ b/libs/core/command_line_handling_local/src/parse_command_line_local.cpp @@ -478,18 +478,24 @@ namespace hpx::local::detail { "run on (default: 0), valid for " "--hpx:queuing=local, --hpx:queuing=abp-priority, " "--hpx:queuing=static, --hpx:queuing=static-priority, " + "--hpx:queuing=local-workrequesting-fifo, " + "--hpx:queuing=local-workrequesting-lifo, " "and --hpx:queuing=local-priority only") ("hpx:pu-step", value(), "the step between used processing unit numbers for this " "instance of HPX (default: 1), valid for " "--hpx:queuing=local, --hpx:queuing=abp-priority, " "--hpx:queuing=static, --hpx:queuing=static-priority " + "--hpx:queuing=local-workrequesting-fifo, " + "--hpx:queuing=local-workrequesting-lifo, " "and --hpx:queuing=local-priority only") ("hpx:affinity", value(), "the affinity domain the OS threads will be confined to, " "possible values: pu, core, numa, machine (default: pu), valid for " "--hpx:queuing=local, --hpx:queuing=abp-priority, " "--hpx:queuing=static, --hpx:queuing=static-priority " + "--hpx:queuing=local-workrequesting-fifo, " + "--hpx:queuing=local-workrequesting-lifo, " " and --hpx:queuing=local-priority only") ("hpx:bind", value >()->composing(), "the detailed affinity description for the OS threads, see " @@ -514,13 +520,16 @@ namespace hpx::local::detail { ("hpx:queuing", value(), "the queue scheduling policy to use, options are " "'local', 'local-priority-fifo','local-priority-lifo', " - "'abp-priority-fifo', 'abp-priority-lifo', 'static', and " - "'static-priority' (default: 'local-priority'; " + "'abp-priority-fifo', 'abp-priority-lifo', 'static', " + "'static-priority', 'local-workrequesting-fifo', and " + "'local-workrequesting-lifo' (default: 'local-priority'; " "all option values can be abbreviated)") ("hpx:high-priority-threads", value(), "the number of operating system threads maintaining a high " "priority queue (default: number of OS threads), valid for " "--hpx:queuing=local-priority,--hpx:queuing=static-priority, " + "--hpx:queuing=local-workrequesting-fifo, " + "--hpx:queuing=local-workrequesting-lifo, " " and --hpx:queuing=abp-priority only)") ("hpx:numa-sensitive", value()->implicit_value(0), "makes the local-priority scheduler NUMA sensitive (" diff --git a/libs/core/resiliency/tests/performance/replay/1d_stencil.cpp b/libs/core/resiliency/tests/performance/replay/1d_stencil.cpp index 6e37da5b482b..de5de3981c9b 100644 --- a/libs/core/resiliency/tests/performance/replay/1d_stencil.cpp +++ b/libs/core/resiliency/tests/performance/replay/1d_stencil.cpp @@ -150,7 +150,7 @@ struct stepper hpx::future do_work(std::size_t subdomains, std::size_t subdomain_width, std::size_t iterations, 
std::size_t sti, - std::uint64_t nd, hpx::sliding_semaphore& sem) + std::uint64_t nd, std::shared_ptr sem) { using hpx::dataflow; using hpx::unwrapping; @@ -186,15 +186,15 @@ struct stepper // trigger the semaphore once computation has reached this point if ((t % nd) == 0) { - next[0].then([&sem, t](partition&&) { + next[0].then([sem, t](partition&&) { // inform semaphore about new lower limit - sem.signal(t); + sem->signal(t); }); } // suspend if the tree has become too deep, the continuation above // will resume this thread once the computation has caught up - sem.wait(t); + sem->wait(t); } // Return the solution at time-step 'iterations'. @@ -227,7 +227,7 @@ int hpx_main(hpx::program_options::variables_map& vm) { // limit depth of dependency tree - hpx::sliding_semaphore sem(nd); + auto sem = std::make_shared(nd); hpx::future result = step.do_work(subdomains, subdomain_width, iterations, sti, nd, sem); diff --git a/libs/core/resiliency/tests/performance/replay/1d_stencil_checksum.cpp b/libs/core/resiliency/tests/performance/replay/1d_stencil_checksum.cpp index c67de2cf4d64..cfeff02ab118 100644 --- a/libs/core/resiliency/tests/performance/replay/1d_stencil_checksum.cpp +++ b/libs/core/resiliency/tests/performance/replay/1d_stencil_checksum.cpp @@ -236,7 +236,7 @@ struct stepper hpx::future do_work(std::size_t subdomains, std::size_t subdomain_width, std::size_t iterations, std::size_t sti, std::uint64_t nd, std::uint64_t n_value, double error, - hpx::sliding_semaphore& sem) + std::shared_ptr sem) { using hpx::unwrapping; using hpx::resiliency::experimental::dataflow_replay_validate; @@ -275,15 +275,15 @@ struct stepper // trigger the semaphore once computation has reached this point if ((t % nd) == 0) { - next[0].then([&sem, t](partition&&) { + next[0].then([sem, t](partition&&) { // inform semaphore about new lower limit - sem.signal(t); + sem->signal(t); }); } // suspend if the tree has become too deep, the continuation above // will resume this thread once the computation has caught up - sem.wait(t); + sem->wait(t); } // Return the solution at time-step 'iterations'. 
@@ -328,7 +328,7 @@ int hpx_main(hpx::program_options::variables_map& vm) { // limit depth of dependency tree - hpx::sliding_semaphore sem(nd); + auto sem = std::make_shared(nd); hpx::future result = step.do_work(subdomains, subdomain_width, iterations, sti, nd, n_value, error, sem); diff --git a/libs/core/resiliency/tests/performance/replay/1d_stencil_replay.cpp b/libs/core/resiliency/tests/performance/replay/1d_stencil_replay.cpp index 48438b63079b..a42824677cea 100644 --- a/libs/core/resiliency/tests/performance/replay/1d_stencil_replay.cpp +++ b/libs/core/resiliency/tests/performance/replay/1d_stencil_replay.cpp @@ -193,7 +193,7 @@ struct stepper hpx::future do_work(std::size_t subdomains, std::size_t subdomain_width, std::size_t iterations, std::size_t sti, std::uint64_t nd, std::uint64_t n_value, double error, - hpx::sliding_semaphore& sem) + std::shared_ptr sem) { using hpx::unwrapping; using hpx::resiliency::experimental::dataflow_replay; @@ -231,15 +231,15 @@ struct stepper // trigger the semaphore once computation has reached this point if ((t % nd) == 0) { - next[0].then([&sem, t](partition&&) { + next[0].then([sem, t](partition&&) { // inform semaphore about new lower limit - sem.signal(t); + sem->signal(t); }); } // suspend if the tree has become too deep, the continuation above // will resume this thread once the computation has caught up - sem.wait(t); + sem->wait(t); } // Return the solution at time-step 'iterations'. @@ -275,7 +275,7 @@ int hpx_main(hpx::program_options::variables_map& vm) { // limit depth of dependency tree - hpx::sliding_semaphore sem(nd); + auto sem = std::make_shared(nd); hpx::future result = step.do_work(subdomains, subdomain_width, iterations, sti, nd, n_value, error, sem); diff --git a/libs/core/resiliency/tests/performance/replay/dataflow_replay.cpp b/libs/core/resiliency/tests/performance/replay/dataflow_replay.cpp index b489745bc4d5..93cd2e11d8a6 100644 --- a/libs/core/resiliency/tests/performance/replay/dataflow_replay.cpp +++ b/libs/core/resiliency/tests/performance/replay/dataflow_replay.cpp @@ -237,7 +237,8 @@ struct stepper hpx::future do_work(std::size_t subdomains, std::size_t subdomain_width, std::size_t iterations, std::uint64_t nd, - std::uint64_t n_value, double error, hpx::sliding_semaphore& sem) + std::uint64_t n_value, double error, + std::shared_ptr sem) { using hpx::unwrapping; using hpx::resiliency::experimental::dataflow_replay; @@ -273,15 +274,15 @@ struct stepper // trigger the semaphore once computation has reached this point if ((t % nd) == 0) { - next[0].then([&sem, t](partition&&) { + next[0].then([sem, t](partition&&) { // inform semaphore about new lower limit - sem.signal(t); + sem->signal(t); }); } // suspend if the tree has become too deep, the continuation above // will resume this thread once the computation has caught up - sem.wait(t); + sem->wait(t); } // Return the solution at time-step 'iterations'. 
@@ -314,7 +315,7 @@ int hpx_main(hpx::program_options::variables_map& vm) { // limit depth of dependency tree - hpx::sliding_semaphore sem(nd); + auto sem = std::make_shared(nd); hpx::future result = step.do_work( subdomains, subdomain_width, iterations, nd, n_value, error, sem); diff --git a/libs/core/resiliency/tests/performance/replay/dataflow_replay_validate.cpp b/libs/core/resiliency/tests/performance/replay/dataflow_replay_validate.cpp index a8746218ed18..b09400bb34e9 100644 --- a/libs/core/resiliency/tests/performance/replay/dataflow_replay_validate.cpp +++ b/libs/core/resiliency/tests/performance/replay/dataflow_replay_validate.cpp @@ -229,7 +229,8 @@ struct stepper hpx::future do_work(std::size_t subdomains, std::size_t subdomain_width, std::size_t iterations, std::uint64_t nd, - std::uint64_t n_value, double error, hpx::sliding_semaphore& sem) + std::uint64_t n_value, double error, + std::shared_ptr sem) { using hpx::unwrapping; using hpx::resiliency::experimental::dataflow_replay_validate; @@ -268,15 +269,15 @@ struct stepper // trigger the semaphore once computation has reached this point if ((t % nd) == 0) { - next[0].then([&sem, t](partition&&) { + next[0].then([sem, t](partition&&) { // inform semaphore about new lower limit - sem.signal(t); + sem->signal(t); }); } // suspend if the tree has become too deep, the continuation above // will resume this thread once the computation has caught up - sem.wait(t); + sem->wait(t); } // Return the solution at time-step 'iterations'. @@ -318,7 +319,7 @@ int hpx_main(hpx::program_options::variables_map& vm) { // limit depth of dependency tree - hpx::sliding_semaphore sem(nd); + auto sem = std::make_shared(nd); hpx::future result = step.do_work( subdomains, subdomain_width, iterations, nd, n_value, error, sem); diff --git a/libs/core/resiliency/tests/performance/replay/pure_dataflow.cpp b/libs/core/resiliency/tests/performance/replay/pure_dataflow.cpp index 8c59a48645f2..d049c042f88b 100644 --- a/libs/core/resiliency/tests/performance/replay/pure_dataflow.cpp +++ b/libs/core/resiliency/tests/performance/replay/pure_dataflow.cpp @@ -163,7 +163,7 @@ struct stepper hpx::future do_work(std::size_t subdomains, std::size_t subdomain_width, std::size_t iterations, std::uint64_t nd, - hpx::sliding_semaphore& sem) + std::shared_ptr sem) { using hpx::dataflow; using hpx::unwrapping; @@ -199,15 +199,15 @@ struct stepper // trigger the semaphore once computation has reached this point if ((t % nd) == 0) { - next[0].then([&sem, t](partition&&) { + next[0].then([sem, t](partition&&) { // inform semaphore about new lower limit - sem.signal(t); + sem->signal(t); }); } // suspend if the tree has become too deep, the continuation above // will resume this thread once the computation has caught up - sem.wait(t); + sem->wait(t); } // Return the solution at time-step 'iterations'. 
@@ -237,7 +237,7 @@ int hpx_main(hpx::program_options::variables_map& vm) { // limit depth of dependency tree - hpx::sliding_semaphore sem(nd); + auto sem = std::make_shared(nd); hpx::future result = step.do_work(subdomains, subdomain_width, iterations, nd, sem); diff --git a/libs/core/resiliency/tests/performance/replicate/1d_stencil_replicate.cpp b/libs/core/resiliency/tests/performance/replicate/1d_stencil_replicate.cpp index a3633a478c69..33d71caf3757 100644 --- a/libs/core/resiliency/tests/performance/replicate/1d_stencil_replicate.cpp +++ b/libs/core/resiliency/tests/performance/replicate/1d_stencil_replicate.cpp @@ -194,7 +194,7 @@ struct stepper hpx::future do_work(std::size_t subdomains, std::size_t subdomain_width, std::size_t iterations, std::size_t sti, std::uint64_t nd, std::uint64_t n_value, double error, - hpx::sliding_semaphore& sem) + std::shared_ptr sem) { using hpx::unwrapping; using hpx::resiliency::experimental::dataflow_replicate; @@ -230,15 +230,15 @@ struct stepper // trigger the semaphore once computation has reached this point if ((t % nd) == 0) { - next[0].then([&sem, t](partition&&) { + next[0].then([sem, t](partition&&) { // inform semaphore about new lower limit - sem.signal(t); + sem->signal(t); }); } // suspend if the tree has become too deep, the continuation above // will resume this thread once the computation has caught up - sem.wait(t); + sem->wait(t); } // Return the solution at time-step 'iterations'. @@ -274,7 +274,7 @@ int hpx_main(hpx::program_options::variables_map& vm) { // limit depth of dependency tree - hpx::sliding_semaphore sem(nd); + auto sem = std::make_shared(nd); hpx::future result = step.do_work(subdomains, subdomain_width, iterations, sti, nd, n_value, error, sem); diff --git a/libs/core/resiliency/tests/performance/replicate/1d_stencil_replicate_checksum.cpp b/libs/core/resiliency/tests/performance/replicate/1d_stencil_replicate_checksum.cpp index 76e17b837f59..8b33c2b44edc 100644 --- a/libs/core/resiliency/tests/performance/replicate/1d_stencil_replicate_checksum.cpp +++ b/libs/core/resiliency/tests/performance/replicate/1d_stencil_replicate_checksum.cpp @@ -235,7 +235,7 @@ struct stepper hpx::future do_work(std::size_t subdomains, std::size_t subdomain_width, std::size_t iterations, std::size_t sti, std::uint64_t nd, std::uint64_t n_value, double error, - hpx::sliding_semaphore& sem) + std::shared_ptr sem) { using hpx::unwrapping; using hpx::resiliency::experimental::dataflow_replicate_validate; @@ -271,15 +271,15 @@ struct stepper // trigger the semaphore once computation has reached this point if ((t % nd) == 0) { - next[0].then([&sem, t](partition&&) { + next[0].then([sem, t](partition&&) { // inform semaphore about new lower limit - sem.signal(t); + sem->signal(t); }); } // suspend if the tree has become too deep, the continuation above // will resume this thread once the computation has caught up - sem.wait(t); + sem->wait(t); } // Return the solution at time-step 'iterations'. 
@@ -324,7 +324,7 @@ int hpx_main(hpx::program_options::variables_map& vm) { // limit depth of dependency tree - hpx::sliding_semaphore sem(nd); + auto sem = std::make_shared(nd); hpx::future result = step.do_work(subdomains, subdomain_width, iterations, sti, nd, n_value, error, sem); diff --git a/libs/core/resource_partitioner/include/hpx/resource_partitioner/partitioner_fwd.hpp b/libs/core/resource_partitioner/include/hpx/resource_partitioner/partitioner_fwd.hpp index 5a3df6be1c4a..379879be4b01 100644 --- a/libs/core/resource_partitioner/include/hpx/resource_partitioner/partitioner_fwd.hpp +++ b/libs/core/resource_partitioner/include/hpx/resource_partitioner/partitioner_fwd.hpp @@ -105,6 +105,8 @@ namespace hpx::resource { abp_priority_fifo = 5, abp_priority_lifo = 6, shared_priority = 7, + local_workrequesting_fifo = 8, + local_workrequesting_lifo = 9, }; #define HPX_SCHEDULING_POLICY_UNSCOPED_ENUM_DEPRECATION_MSG \ diff --git a/libs/core/resource_partitioner/src/detail_partitioner.cpp b/libs/core/resource_partitioner/src/detail_partitioner.cpp index 68fb708814e7..8dad2d6379ce 100644 --- a/libs/core/resource_partitioner/src/detail_partitioner.cpp +++ b/libs/core/resource_partitioner/src/detail_partitioner.cpp @@ -142,6 +142,12 @@ namespace hpx::resource::detail { case resource::scheduling_policy::local_priority_lifo: sched = "local_priority_lifo"; break; + case resource::scheduling_policy::local_workrequesting_fifo: + sched = "local_workrequesting_fifo"; + break; + case resource::scheduling_policy::local_workrequesting_lifo: + sched = "local_workrequesting_lifo"; + break; case resource::scheduling_policy::static_: sched = "static"; break; @@ -475,6 +481,18 @@ namespace hpx::resource::detail { { default_scheduler = scheduling_policy::local_priority_lifo; } + else if (0 == + std::string("local-workrequesting-fifo") + .find(default_scheduler_str)) + { + default_scheduler = scheduling_policy::local_workrequesting_fifo; + } + else if (0 == + std::string("local-workrequesting-lifo") + .find(default_scheduler_str)) + { + default_scheduler = scheduling_policy::local_workrequesting_lifo; + } else if (0 == std::string("static").find(default_scheduler_str)) { default_scheduler = scheduling_policy::static_; diff --git a/libs/core/schedulers/CMakeLists.txt b/libs/core/schedulers/CMakeLists.txt index 70b038c539a3..0476a76a8893 100644 --- a/libs/core/schedulers/CMakeLists.txt +++ b/libs/core/schedulers/CMakeLists.txt @@ -11,6 +11,7 @@ set(schedulers_headers hpx/schedulers/deadlock_detection.hpp hpx/schedulers/local_priority_queue_scheduler.hpp hpx/schedulers/local_queue_scheduler.hpp + hpx/schedulers/local_workrequesting_scheduler.hpp hpx/schedulers/lockfree_queue_backends.hpp hpx/schedulers/maintain_queue_wait_times.hpp hpx/schedulers/queue_helpers.hpp diff --git a/libs/core/schedulers/include/hpx/modules/schedulers.hpp b/libs/core/schedulers/include/hpx/modules/schedulers.hpp index d77ba076c7e9..2de14cc98363 100644 --- a/libs/core/schedulers/include/hpx/modules/schedulers.hpp +++ b/libs/core/schedulers/include/hpx/modules/schedulers.hpp @@ -11,6 +11,7 @@ #include #include #include +#include #include #include #include diff --git a/libs/core/schedulers/include/hpx/schedulers/local_workrequesting_scheduler.hpp b/libs/core/schedulers/include/hpx/schedulers/local_workrequesting_scheduler.hpp new file mode 100644 index 000000000000..bd9909fb9472 --- /dev/null +++ b/libs/core/schedulers/include/hpx/schedulers/local_workrequesting_scheduler.hpp @@ -0,0 +1,1843 @@ +// Copyright (c) 2007-2023 Hartmut Kaiser 
+// +// SPDX-License-Identifier: BSL-1.0 +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +/////////////////////////////////////////////////////////////////////////////// +// The scheduler implemented here is adapted from the ideas described in chapter +// 3 of Andreas Prell's PhD thesis 'Embracing Explicit Communication in +// Work-Stealing Runtime Systems' (see: https://epub.uni-bayreuth.de/2990/). +// While it's being described as a work-stealing scheduler, it relies on a +// different working principle if compared to the classic work-stealing. Instead +// of actively pulling work from the work queues of neighboring cores it relies +// on a push model. Cores that run out of work post steal requests that are +// handled by cores that have work available by actively sending tasks to the +// requesting core. +// +// When a worker runs out of tasks, it becomes a thief by sending steal requests +// to selected victim workers, those either reply with tasks or signal that they +// have no tasks left. A steal request is a message containing the thief's ID, a +// reference to a channel for sending tasks from victim to thief, and other +// information needed for thread coordination. +// +// When the runtime system starts up, every worker allocates two channels: a +// channel for receiving steal requests and a channel for receiving tasks. A +// reference to the latter is stored in steal requests, and workers use this +// reference to send tasks. By "owning" two channels, workers are able to +// receive steal requests and tasks independently of other workers, which in +// turn enables efficient channel implementations based on single-consumer +// queues. The total number of channels grows linearly with the number of +// workers: n workers allocate 2n channels to communicate with each other. +// +// Matching traditional work stealing, we allow one outstanding steal request +// per worker. This decision has two important consequences: (1) The number of +// steal requests is bounded by n, the number of workers. (2) A thief will never +// receive tasks from more than one victim at a time. It follows from (1) that a +// channel capacity of n - 1 is sufficient to deal with other workers' steal +// requests since no more than n - 1 thieves may request tasks from a single +// victim. We actually increase the capacity to n so that steal requests can be +// returned to their senders, for instance, in case of repeated failure. (2) +// implies that, at any given time, a task channel has at most one sender and +// one receiver, meeting the requirements for an SPSC implementation. +// +// In summary, every worker allocates two specialized channels: an MPSC channel +// where it receives steal requests and an SPSC channel where it receives tasks. +// Steal requests are forwarded rather than acknowledged, letting workers steal +// on behalf of others upon receiving steal requests that cannot be handled. +// Random victim selection fits in well with forwarding steal requests, but may +// cause a lot of communication if only few workers have tasks left. 
Stealing +// half of a victim's tasks (steal-half) is straightforward to implement with +// private task queues, especially when shared memory is available, in which +// case tasks do not need to be copied. While steal-half is important to tackle +// fine-grained parallelism, polling is necessary to achieve short message +// handling delays when workers schedule long-running tasks. + +namespace hpx::threads::policies { + + /////////////////////////////////////////////////////////////////////////// +#if defined(HPX_HAVE_CXX11_STD_ATOMIC_128BIT) + using default_local_workrequesting_scheduler_terminated_queue = + lockfree_lifo; +#else + using default_local_workrequesting_scheduler_terminated_queue = + lockfree_fifo; +#endif + + /////////////////////////////////////////////////////////////////////////// + // The local_workrequesting_scheduler maintains exactly one queue of work + // items (threads) per OS thread, where this OS thread pulls its next work + // from. + template + class local_workrequesting_scheduler : public scheduler_base + { + public: + using has_periodic_maintenance = std::false_type; + + using thread_queue_type = thread_queue; + + struct init_parameter + { + init_parameter(std::size_t num_queues, + detail::affinity_data const& affinity_data, + std::size_t num_high_priority_queues = static_cast( + -1), + thread_queue_init_parameters const& thread_queue_init = + thread_queue_init_parameters{}, + char const* description = "local_workrequesting_scheduler") + : num_queues_(num_queues) + , num_high_priority_queues_( + num_high_priority_queues == static_cast(-1) ? + num_queues : + num_high_priority_queues) + , thread_queue_init_(thread_queue_init) + , affinity_data_(affinity_data) + , description_(description) + { + } + + init_parameter(std::size_t num_queues, + detail::affinity_data const& affinity_data, + char const* description) + : num_queues_(num_queues) + , num_high_priority_queues_(num_queues) + , affinity_data_(affinity_data) + , description_(description) + { + } + + std::size_t num_queues_; + std::size_t num_high_priority_queues_; + thread_queue_init_parameters thread_queue_init_; + detail::affinity_data const& affinity_data_; + char const* description_; + }; + using init_parameter_type = init_parameter; + + private: + //////////////////////////////////////////////////////////////////////// + struct task_data + { + explicit HPX_HOST_DEVICE_CONSTEXPR task_data( + std::uint16_t num_thread = static_cast( + -1)) noexcept + : num_thread_(num_thread) + { + } + + // core number this task data originated from + std::uint16_t num_thread_; + hpx::detail::small_vector tasks_; + }; + + //////////////////////////////////////////////////////////////////////// + using task_channel = lcos::local::channel_spsc; + + //////////////////////////////////////////////////////////////////////// + struct steal_request + { + enum class state : std::uint16_t + { + working = 0, + idle = 2, + failed = 4 + }; + + steal_request() = default; + + steal_request(std::size_t const num_thread, task_channel* channel, + mask_cref_type victims, bool idle, bool const stealhalf) + : channel_(channel) + , victims_(victims) + , num_thread_(static_cast(num_thread)) + , attempt_(static_cast(count(victims) - 1)) + , state_(idle ? state::idle : state::working) + , stealhalf_(stealhalf) + { + } + + task_channel* channel_ = nullptr; + mask_type victims_; + std::uint16_t num_thread_ = static_cast(-1); + std::uint16_t attempt_ = 0; + state state_ = state::failed; + // true ? 
attempt steal-half : attempt steal-one + bool stealhalf_ = false; + }; + + //////////////////////////////////////////////////////////////////////// + using steal_request_channel = + lcos::local::base_channel_mpsc; + + //////////////////////////////////////////////////////////////////////// + struct scheduler_data + { + scheduler_data() = default; + + scheduler_data(scheduler_data const&) = delete; + scheduler_data(scheduler_data&&) = delete; + scheduler_data& operator=(scheduler_data const&) = delete; + scheduler_data& operator=(scheduler_data&&) = delete; + + ~scheduler_data() + { + delete queue_; + delete high_priority_queue_; + delete bound_queue_; + delete requests_; + delete tasks_; + } + + // interval at which we re-decide on whether we should steal just + // one task or half of what's available + static constexpr std::uint16_t num_steal_adaptive_interval_ = 25; + + void init(std::size_t num_thread, std::size_t size, + thread_queue_init_parameters const& queue_init, + bool need_high_priority_queue) + { + if (queue_ == nullptr) + { + num_thread_ = static_cast(num_thread); + + // initialize queues + queue_ = new thread_queue_type(queue_init); + if (need_high_priority_queue) + { + high_priority_queue_ = + new thread_queue_type(queue_init); + } + bound_queue_ = new thread_queue_type(queue_init); + + // initialize channels needed for work stealing + requests_ = new steal_request_channel(size); + tasks_ = new task_channel(1); + } + } + + // initial affinity mask for this core + mask_type victims_; + + // queues for threads scheduled on this core + thread_queue_type* queue_ = nullptr; + thread_queue_type* high_priority_queue_ = nullptr; + thread_queue_type* bound_queue_ = nullptr; + + // channel for posting steal requests to this core (use + // hpx::spinlock) + steal_request_channel* requests_ = nullptr; + + // one channel per steal request per core + task_channel* tasks_ = nullptr; + + // the number of outstanding steal requests + std::uint16_t requested_ = 0; + + // core number this scheduler data instance refers to + std::uint16_t num_thread_ = static_cast(-1); + + // adaptive stealing + std::uint16_t num_recent_steals_ = 0; + std::uint16_t num_recent_tasks_executed_ = 0; + bool stealhalf_ = false; + +#if defined(HPX_HAVE_WORKREQUESTING_LAST_VICTIM) + // core number the last stolen tasks originated from + std::uint16_t last_victim_ = static_cast(-1); +#endif +#if defined(HPX_HAVE_WORKREQUESTING_STEAL_STATISTICS) + // collect some statistics + std::uint32_t steal_requests_sent_ = 0; + std::uint32_t steal_requests_received_ = 0; + std::uint32_t steal_requests_discarded_ = 0; +#endif + }; + + public: + static unsigned int random_seed() noexcept + { + static std::random_device rd; + return rd(); + } + + explicit local_workrequesting_scheduler(init_parameter_type const& init, + bool deferred_initialization = true) + : scheduler_base(init.num_queues_, init.description_, + init.thread_queue_init_, + policies::scheduler_mode::fast_idle_mode) + , data_(init.num_queues_) + , low_priority_queue_(thread_queue_init_) + , curr_queue_(0) + , gen_(random_seed()) + , affinity_data_(init.affinity_data_) + , num_queues_(init.num_queues_) + , num_high_priority_queues_(init.num_high_priority_queues_) + { + HPX_ASSERT(init.num_queues_ != 0); + HPX_ASSERT(num_high_priority_queues_ != 0); + HPX_ASSERT(num_high_priority_queues_ <= num_queues_); + + if (!deferred_initialization) + { + for (std::size_t i = 0; i != init.num_queues_; ++i) + { + data_[i].data_.init(i, init.num_queues_, + this->thread_queue_init_, + i 
< num_high_priority_queues_); + } + } + } + + local_workrequesting_scheduler( + local_workrequesting_scheduler const&) = delete; + local_workrequesting_scheduler( + local_workrequesting_scheduler&&) = delete; + local_workrequesting_scheduler& operator=( + local_workrequesting_scheduler const&) = delete; + local_workrequesting_scheduler& operator=( + local_workrequesting_scheduler&&) = delete; + + ~local_workrequesting_scheduler() override = default; + + static std::string_view get_scheduler_name() + { + return "local_workrequesting_scheduler"; + } + +#ifdef HPX_HAVE_THREAD_CREATION_AND_CLEANUP_RATES + std::uint64_t get_creation_time(bool reset) override + { + std::uint64_t time = 0; + + for (std::size_t i = 0; i != num_queues_; ++i) + { + auto const& d = data_[i].data_; + if (i < num_high_priority_queues_) + { + time += d.high_priority_queue_->get_creation_time(reset); + } + time += d.queue_->get_creation_time(reset); + time += d.bound_queue_->get_creation_time(reset); + } + + return time + low_priority_queue_.get_creation_time(reset); + } + + std::uint64_t get_cleanup_time(bool reset) override + { + std::uint64_t time = 0; + + for (std::size_t i = 0; i != num_queues_; ++i) + { + auto const& d = data_[i].data_; + if (i < num_high_priority_queues_) + { + time += d.high_priority_queue_->get_cleanup_time(reset); + } + time += d.queue_->get_cleanup_time(reset); + time += d.bound_queue_->get_cleanup_time(reset); + } + + return time + low_priority_queue_.get_cleanup_time(reset); + } +#endif + +#ifdef HPX_HAVE_THREAD_STEALING_COUNTS + std::int64_t get_num_pending_misses( + std::size_t num_thread, bool reset) override + { + std::int64_t count = 0; + if (num_thread == std::size_t(-1)) + { + for (std::size_t i = 0; i != num_queues_; ++i) + { + auto const& d = data_[i].data_; + if (i < num_high_priority_queues_) + { + count += d.high_priority_queue_->get_num_pending_misses( + reset); + } + count += d.queue_->get_num_pending_misses(reset); + count += d.bound_queue_->get_num_pending_misses(reset); + } + + return count + + low_priority_queue_.get_num_pending_misses(reset); + } + + auto const& d = data_[num_thread].data_; + if (num_thread < num_high_priority_queues_) + { + count += d.high_priority_queue_->get_num_pending_misses(reset); + } + if (num_thread == num_queues_ - 1) + { + count += low_priority_queue_.get_num_pending_misses(reset); + } + count += d.queue_->get_num_pending_misses(reset); + return count + d.bound_queue_->get_num_pending_misses(reset); + } + + std::int64_t get_num_pending_accesses( + std::size_t num_thread, bool reset) override + { + std::int64_t count = 0; + if (num_thread == std::size_t(-1)) + { + for (std::size_t i = 0; i != num_queues_; ++i) + { + auto const& d = data_[i].data_; + if (i < num_high_priority_queues_) + { + count += + d.high_priority_queue_->get_num_pending_accesses( + reset); + } + count += d.queue_->get_num_pending_accesses(reset); + count += d.bound_queue_->get_num_pending_accesses(reset); + } + + return count + + low_priority_queue_.get_num_pending_accesses(reset); + } + + auto const& d = data_[num_thread].data_; + if (num_thread < num_high_priority_queues_) + { + count += + d.high_priority_queue_->get_num_pending_accesses(reset); + } + if (num_thread == num_queues_ - 1) + { + count += low_priority_queue_.get_num_pending_accesses(reset); + } + count += d.queue_->get_num_pending_accesses(reset); + return count + d.bound_queue_->get_num_pending_accesses(reset); + } + + std::int64_t get_num_stolen_from_pending( + std::size_t num_thread, bool reset) override + { + 
std::int64_t count = 0; + if (num_thread == std::size_t(-1)) + { + for (std::size_t i = 0; i != num_queues_; ++i) + { + auto const& d = data_[i].data_; + if (i < num_high_priority_queues_) + { + count += + d.high_priority_queue_->get_num_stolen_from_pending( + reset); + } + count += d.queue_->get_num_stolen_from_pending(reset); + count += d.bound_queue_->get_num_stolen_from_pending(reset); + } + + return count + + low_priority_queue_.get_num_stolen_from_pending(reset); + } + + auto const& d = data_[num_thread].data_; + if (num_thread < num_high_priority_queues_) + { + count += + d.high_priority_queue_->get_num_stolen_from_pending(reset); + } + if (num_thread == num_queues_ - 1) + { + count += low_priority_queue_.get_num_stolen_from_pending(reset); + } + count += d.queue_->get_num_stolen_from_pending(reset); + return count + d.bound_queue_->get_num_stolen_from_pending(reset); + } + + std::int64_t get_num_stolen_to_pending( + std::size_t num_thread, bool reset) override + { + std::int64_t count = 0; + if (num_thread == std::size_t(-1)) + { + for (std::size_t i = 0; i != num_queues_; ++i) + { + auto const& d = data_[i].data_; + if (i < num_high_priority_queues_) + { + count += + d.high_priority_queue_->get_num_stolen_to_pending( + reset); + } + count += d.queue_->get_num_stolen_to_pending(reset); + count += d.bound_queue_->get_num_stolen_to_pending(reset); + } + + return count + + low_priority_queue_.get_num_stolen_to_pending(reset); + } + + auto const& d = data_[num_thread].data_; + if (num_thread < num_high_priority_queues_) + { + count += + d.high_priority_queue_->get_num_stolen_to_pending(reset); + } + if (num_thread == num_queues_ - 1) + { + count += low_priority_queue_.get_num_stolen_to_pending(reset); + } + count += d.queue_->get_num_stolen_to_pending(reset); + return count + d.bound_queue_->get_num_stolen_to_pending(reset); + } + + std::int64_t get_num_stolen_from_staged( + std::size_t num_thread, bool reset) override + { + std::int64_t count = 0; + if (num_thread == std::size_t(-1)) + { + for (std::size_t i = 0; i != num_queues_; ++i) + { + auto const& d = data_[i].data_; + if (i < num_high_priority_queues_) + { + count += + d.high_priority_queue_->get_num_stolen_from_staged( + reset); + } + count += d.queue_->get_num_stolen_from_staged(reset); + count += d.bound_queue_->get_num_stolen_from_staged(reset); + } + + return count + + low_priority_queue_.get_num_stolen_from_staged(reset); + } + + auto const& d = data_[num_thread].data_; + if (num_thread < num_high_priority_queues_) + { + count += + d.high_priority_queue_->get_num_stolen_from_staged(reset); + } + if (num_thread == num_queues_ - 1) + { + count += low_priority_queue_.get_num_stolen_from_staged(reset); + } + count += d.queue_->get_num_stolen_from_staged(reset); + return count + d.bound_queue_->get_num_stolen_from_staged(reset); + } + + std::int64_t get_num_stolen_to_staged( + std::size_t num_thread, bool reset) override + { + std::int64_t count = 0; + if (num_thread == std::size_t(-1)) + { + for (std::size_t i = 0; i != num_queues_; ++i) + { + auto const& d = data_[i].data_; + if (i < num_high_priority_queues_) + { + count += + d.high_priority_queue_->get_num_stolen_to_staged( + reset); + } + count += d.queue_->get_num_stolen_to_staged(reset); + count += d.bound_queue_->get_num_stolen_to_staged(reset); + } + + return count + + low_priority_queue_.get_num_stolen_to_staged(reset); + } + + auto const& d = data_[num_thread].data_; + if (num_thread < num_high_priority_queues_) + { + count += + 
d.high_priority_queue_->get_num_stolen_to_staged(reset); + } + if (num_thread == num_queues_ - 1) + { + count += low_priority_queue_.get_num_stolen_to_staged(reset); + } + count += d.queue_->get_num_stolen_to_staged(reset); + return count + d.bound_queue_->get_num_stolen_to_staged(reset); + } +#endif + + /////////////////////////////////////////////////////////////////////// + void abort_all_suspended_threads() override + { + for (std::size_t i = 0; i != num_queues_; ++i) + { + data_[i].data_.queue_->abort_all_suspended_threads(); + data_[i].data_.bound_queue_->abort_all_suspended_threads(); + } + } + + /////////////////////////////////////////////////////////////////////// + bool cleanup_terminated(bool delete_all) override + { + bool empty = true; + for (std::size_t i = 0; i != num_queues_; ++i) + { + auto& d = data_[i].data_; + if (i < num_high_priority_queues_) + { + empty = d.high_priority_queue_->cleanup_terminated( + delete_all) && + empty; + } + empty = d.queue_->cleanup_terminated(delete_all) && empty; + empty = d.bound_queue_->cleanup_terminated(delete_all) && empty; + } + return low_priority_queue_.cleanup_terminated(delete_all) && empty; + } + + bool cleanup_terminated( + std::size_t num_thread, bool delete_all) override + { + auto& d = data_[num_thread].data_; + bool empty = d.queue_->cleanup_terminated(delete_all); + empty = d.queue_->cleanup_terminated(delete_all) && empty; + if (!delete_all) + return empty; + + if (num_thread < num_high_priority_queues_) + { + empty = + d.high_priority_queue_->cleanup_terminated(delete_all) && + empty; + } + + if (num_thread == num_queues_ - 1) + { + return low_priority_queue_.cleanup_terminated(delete_all) && + empty; + } + return empty; + } + + /////////////////////////////////////////////////////////////////////// + // create a new thread and schedule it if the initial state is equal to + // pending + void create_thread(thread_init_data& data, thread_id_ref_type* id, + error_code& ec) override + { + std::size_t num_thread = + data.schedulehint.mode == thread_schedule_hint_mode::thread ? + data.schedulehint.hint : + static_cast(-1); + + if (static_cast(-1) == num_thread) + { + num_thread = curr_queue_++ % num_queues_; + } + else if (num_thread >= num_queues_) + { + num_thread %= num_queues_; + } + + num_thread = select_active_pu(num_thread); + + data.schedulehint.mode = thread_schedule_hint_mode::thread; + data.schedulehint.hint = static_cast(num_thread); + + // now create the thread + switch (data.priority) + { + case thread_priority::high_recursive: + case thread_priority::high: + case thread_priority::boost: + { + if (data.priority == thread_priority::boost) + { + data.priority = thread_priority::normal; + } + + if (num_thread >= num_high_priority_queues_) + { + num_thread %= num_high_priority_queues_; + } + + // we never stage high priority threads, so there is no need to + // call wait_or_add_new for those. 
+ data_[num_thread].data_.high_priority_queue_->create_thread( + data, id, ec); + break; + } + + case thread_priority::low: + low_priority_queue_.create_thread(data, id, ec); + break; + + case thread_priority::bound: + HPX_ASSERT(num_thread < num_queues_); + data_[num_thread].data_.bound_queue_->create_thread( + data, id, ec); + break; + + case thread_priority::default_: + case thread_priority::normal: + HPX_ASSERT(num_thread < num_queues_); + data_[num_thread].data_.queue_->create_thread(data, id, ec); + break; + + case thread_priority::unknown: + { + HPX_THROW_EXCEPTION(hpx::error::bad_parameter, + "local_workrequesting_scheduler::create_thread", + "unknown thread priority value (thread_priority::unknown)"); + } + } + } + + // Retrieve the next viable steal request from our channel + bool try_receiving_steal_request( + scheduler_data& d, steal_request& req) noexcept + { + bool ret = d.requests_->get(&req); + while (ret && req.state_ == steal_request::state::failed) + { + // forget the received steal request + --data_[req.num_thread_].data_.requested_; + + // there should have been exactly one outstanding steal request + HPX_ASSERT(data_[req.num_thread_].data_.requested_ == 0); + + // try to retrieve next steal request + ret = d.requests_->get(&req); + } + + // No special treatment for other states + HPX_ASSERT( + (ret && req.state_ != steal_request::state::failed) || !ret); + + return ret; + } + + // Pass steal request on to another worker. Returns true if we have + // handled our own steal request. + bool decline_or_forward_steal_request( + scheduler_data& d, steal_request& req) noexcept + { + HPX_ASSERT(req.attempt_ < num_queues_); + + if (req.num_thread_ == d.num_thread_) + { + // Steal request was either returned by another worker or + // picked up by us. + + if (req.state_ == steal_request::state::idle || + d.queue_->get_pending_queue_length( + std::memory_order_relaxed) != 0) + { +#if defined(HPX_HAVE_WORKREQUESTING_STEAL_STATISTICS) + ++d.steal_requests_discarded_; +#endif + // we have work now, drop this steal request + --d.requested_; + + // there should have been exactly one outstanding steal + // request + HPX_ASSERT(d.requested_ == 0); + } + else + { + // Continue circulating the steal request if it makes sense + req.state_ = steal_request::state::idle; + req.victims_ = d.victims_; + req.attempt_ = + static_cast(count(d.victims_) - 1); + + std::size_t victim = next_victim(d, req); + data_[victim].data_.requests_->set(HPX_MOVE(req)); +#if defined(HPX_HAVE_WORKREQUESTING_STEAL_STATISTICS) + ++d.steal_requests_sent_; +#endif + } + + return true; + } + + // send this steal request on to the next (random) core + ++req.attempt_; + set(req.victims_, d.num_thread_); // don't ask a core twice + + HPX_ASSERT(req.attempt_ == count(req.victims_) - 1); + + std::size_t victim = next_victim(d, req); + data_[victim].data_.requests_->set(HPX_MOVE(req)); +#if defined(HPX_HAVE_WORKREQUESTING_STEAL_STATISTICS) + ++d.steal_requests_sent_; +#endif + return false; + } + + // decline_or_forward_all_steal_requests is only called when a worker + // has nothing else to do but to relay steal requests, which means the + // worker is idle. 
+ void decline_or_forward_all_steal_requests(scheduler_data& d) noexcept + { + steal_request req; + while (try_receiving_steal_request(d, req)) + { +#if defined(HPX_HAVE_WORKREQUESTING_STEAL_STATISTICS) + ++d.steal_requests_received_; +#endif + decline_or_forward_steal_request(d, req); + } + } + + // Handle a steal request by sending tasks in return or passing it on to + // another worker. Returns true if the request was satisfied. + bool handle_steal_request( + scheduler_data& d, steal_request& req) noexcept + { +#if defined(HPX_HAVE_WORKREQUESTING_STEAL_STATISTICS) + ++d.steal_requests_received_; +#endif + if (req.num_thread_ == d.num_thread_) + { + // got back our own steal request. + HPX_ASSERT(req.state_ != steal_request::state::failed); + + // Defer the decision to decline_steal_request + decline_or_forward_steal_request(d, req); + return false; + } + + // Send tasks from our queue to the requesting core, depending on + // what's requested, either one task or half of the available tasks + std::size_t max_num_to_steal = 1; + if (req.stealhalf_) + { + max_num_to_steal = d.queue_->get_pending_queue_length( + std::memory_order_relaxed) / + 2; + } + + if (max_num_to_steal != 0) + { + task_data thrds(d.num_thread_); + thrds.tasks_.reserve(max_num_to_steal); + + thread_id_ref_type thrd; + while (max_num_to_steal-- != 0 && + d.queue_->get_next_thread(thrd, false, true)) + { +#ifdef HPX_HAVE_THREAD_STEALING_COUNTS + d.queue_->increment_num_stolen_from_pending(); +#endif + thrds.tasks_.push_back(HPX_MOVE(thrd)); + thrd = thread_id_ref_type{}; + } + + // we are ready to send at least one task + if (!thrds.tasks_.empty()) + { + // send these tasks to the core that has sent the steal + // request + req.channel_->set(HPX_MOVE(thrds)); + + // wake the thread up so that it can pick up the stolen + // tasks + do_some_work(req.num_thread_); + + return true; + } + } + + // There's nothing we can do with this steal request except pass + // it on to a different worker + decline_or_forward_steal_request(d, req); + return false; + } + + // Return the next thread to be executed, return false if none is + // available + bool get_next_thread(std::size_t num_thread, bool running, + thread_id_ref_type& thrd, bool allow_stealing) + { + HPX_ASSERT(num_thread < num_queues_); + + auto& d = data_[num_thread].data_; + if (num_thread < num_high_priority_queues_) + { + bool result = d.high_priority_queue_->get_next_thread(thrd); + +#ifdef HPX_HAVE_THREAD_STEALING_COUNTS + d.high_priority_queue_->increment_num_pending_accesses(); + if (result) + { + ++d.num_recent_tasks_executed_; + return true; + } + d.high_priority_queue_->increment_num_pending_misses(); +#else + if (result) + { + ++d.num_recent_tasks_executed_; + return true; + } +#endif + } + + bool result = false; + for (thread_queue_type* this_queue : {d.bound_queue_, d.queue_}) + { + result = this_queue->get_next_thread(thrd); + +#ifdef HPX_HAVE_THREAD_STEALING_COUNTS + this_queue->increment_num_pending_accesses(); +#endif + if (result) + break; +#ifdef HPX_HAVE_THREAD_STEALING_COUNTS + this_queue->increment_num_pending_misses(); +#endif + } + + if (allow_stealing && result) + { + // We found a task to run, however before running it we handle + // steal requests (assuming that there is more work left that + // could be used to satisfy steal requests). 
+ + steal_request req; + while (try_receiving_steal_request(d, req)) + { + if (!handle_steal_request(d, req)) + break; + } + + ++d.num_recent_tasks_executed_; + return true; + } + + // Give up if we have work to convert. + if (d.queue_->get_staged_queue_length(std::memory_order_relaxed) != + 0 || + !running) + { + return false; + } + + if (low_priority_queue_.get_next_thread(thrd)) + { + ++d.num_recent_tasks_executed_; + return true; + } + + return false; + } + + // Schedule the passed thread + void schedule_thread(thread_id_ref_type thrd, + threads::thread_schedule_hint schedulehint, + bool allow_fallback = false, + thread_priority priority = thread_priority::default_) override + { + auto num_thread = static_cast(-1); + if (schedulehint.mode == thread_schedule_hint_mode::thread) + { + num_thread = schedulehint.hint; + } + else + { + allow_fallback = false; + } + + if (static_cast(-1) == num_thread) + { + num_thread = curr_queue_++ % num_queues_; + } + else if (num_thread >= num_queues_) + { + num_thread %= num_queues_; + } + + num_thread = select_active_pu(num_thread, allow_fallback); + + HPX_ASSERT(get_thread_id_data(thrd)->get_scheduler_base() == this); + HPX_ASSERT(num_thread < num_queues_); + + switch (priority) + { + case thread_priority::high_recursive: + case thread_priority::high: + case thread_priority::boost: + { + std::size_t num = num_thread; + if (num >= num_high_priority_queues_) + { + num %= num_high_priority_queues_; + } + + data_[num].data_.high_priority_queue_->schedule_thread( + HPX_MOVE(thrd), true); + break; + } + + case thread_priority::low: + low_priority_queue_.schedule_thread(HPX_MOVE(thrd)); + break; + + case thread_priority::default_: + case thread_priority::normal: + data_[num_thread].data_.queue_->schedule_thread(HPX_MOVE(thrd)); + break; + + case thread_priority::bound: + data_[num_thread].data_.bound_queue_->schedule_thread( + HPX_MOVE(thrd)); + break; + + case thread_priority::unknown: + { + HPX_THROW_EXCEPTION(hpx::error::bad_parameter, + "local_workrequesting_scheduler::schedule_thread", + "unknown thread priority value (thread_priority::unknown)"); + } + } + } + + void schedule_thread_last(thread_id_ref_type thrd, + threads::thread_schedule_hint schedulehint, + bool allow_fallback = false, + thread_priority priority = thread_priority::default_) override + { + auto num_thread = static_cast(-1); + if (schedulehint.mode == thread_schedule_hint_mode::thread) + { + num_thread = schedulehint.hint; + } + else + { + allow_fallback = false; + } + + if (static_cast(-1) == num_thread) + { + num_thread = curr_queue_++ % num_queues_; + } + else if (num_thread >= num_queues_) + { + num_thread %= num_queues_; + } + + num_thread = select_active_pu(num_thread, allow_fallback); + + HPX_ASSERT(get_thread_id_data(thrd)->get_scheduler_base() == this); + HPX_ASSERT(num_thread < num_queues_); + + switch (priority) + { + case thread_priority::high_recursive: + case thread_priority::high: + case thread_priority::boost: + { + std::size_t num = num_thread; + if (num >= num_high_priority_queues_) + { + num %= num_high_priority_queues_; + } + + data_[num].data_.high_priority_queue_->schedule_thread( + HPX_MOVE(thrd), true); + break; + } + + case thread_priority::low: + low_priority_queue_.schedule_thread(HPX_MOVE(thrd), true); + break; + + case thread_priority::default_: + case thread_priority::normal: + data_[num_thread].data_.queue_->schedule_thread( + HPX_MOVE(thrd), true); + break; + + case thread_priority::bound: + data_[num_thread].data_.bound_queue_->schedule_thread( + 
HPX_MOVE(thrd), true); + break; + + case thread_priority::unknown: + { + HPX_THROW_EXCEPTION(hpx::error::bad_parameter, + "local_workrequesting_scheduler::schedule_thread_last", + "unknown thread priority value (thread_priority::unknown)"); + } + } + } + + /// Destroy the passed thread as it has been terminated + void destroy_thread(threads::thread_data* thrd) override + { + HPX_ASSERT(thrd->get_scheduler_base() == this); + thrd->get_queue().destroy_thread(thrd); + } + + /////////////////////////////////////////////////////////////////////// + // This returns the current length of the queues (work items and new + // items) + std::int64_t get_queue_length(std::size_t num_thread) const override + { + // Return queue length of one specific queue. + std::int64_t count = 0; + if (static_cast(-1) != num_thread) + { + HPX_ASSERT(num_thread < num_queues_); + auto const& d = data_[num_thread].data_; + + if (num_thread < num_high_priority_queues_) + { + count += d.high_priority_queue_->get_queue_length(); + } + if (num_thread == num_queues_ - 1) + { + count += low_priority_queue_.get_queue_length(); + } + count += d.queue_->get_queue_length(); + return count + d.bound_queue_->get_queue_length(); + } + + // Cumulative queue lengths of all queues. + for (std::size_t i = 0; i != num_queues_; ++i) + { + auto const& d = data_[i].data_; + if (i < num_high_priority_queues_) + { + count += d.high_priority_queue_->get_queue_length(); + } + count += d.queue_->get_queue_length(); + count += d.bound_queue_->get_queue_length(); + } + return count + low_priority_queue_.get_queue_length(); + } + + /////////////////////////////////////////////////////////////////////// + // Queries the current thread count of the queues. + std::int64_t get_thread_count( + thread_schedule_state state = thread_schedule_state::unknown, + thread_priority priority = thread_priority::default_, + std::size_t num_thread = static_cast(-1), + bool /* reset */ = false) const override + { + // Return thread count of one specific queue. + std::int64_t count = 0; + if (static_cast(-1) != num_thread) + { + HPX_ASSERT(num_thread < num_queues_); + + auto const& d = data_[num_thread].data_; + switch (priority) + { + case thread_priority::default_: + { + if (num_thread < num_high_priority_queues_) + { + count = d.high_priority_queue_->get_thread_count(state); + } + if (num_thread == num_queues_ - 1) + { + count += low_priority_queue_.get_thread_count(state); + } + count += d.queue_->get_thread_count(state); + return count + d.bound_queue_->get_thread_count(state); + } + + case thread_priority::low: + { + if (num_queues_ - 1 == num_thread) + return low_priority_queue_.get_thread_count(state); + break; + } + + case thread_priority::normal: + return d.queue_->get_thread_count(state); + + case thread_priority::bound: + return d.bound_queue_->get_thread_count(state); + + case thread_priority::boost: + case thread_priority::high: + case thread_priority::high_recursive: + { + if (num_thread < num_high_priority_queues_) + { + return d.high_priority_queue_->get_thread_count(state); + } + break; + } + + case thread_priority::unknown: + { + HPX_THROW_EXCEPTION(hpx::error::bad_parameter, + "local_workrequesting_scheduler::get_thread_count", + "unknown thread priority value " + "(thread_priority::unknown)"); + } + } + return 0; + } + + // Return the cumulative count for all queues. 
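+ // Note that high priority queues exist only for the first
+ // num_high_priority_queues_ workers and that a single low priority
+ // queue is shared by all of them.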
+ switch (priority) + { + case thread_priority::default_: + { + for (std::size_t i = 0; i != num_queues_; ++i) + { + auto const& d = data_[i].data_; + if (i < num_high_priority_queues_) + { + count += + d.high_priority_queue_->get_thread_count(state); + } + count += d.queue_->get_thread_count(state); + count += d.bound_queue_->get_thread_count(state); + } + count += low_priority_queue_.get_thread_count(state); + break; + } + + case thread_priority::low: + return low_priority_queue_.get_thread_count(state); + + case thread_priority::normal: + { + for (std::size_t i = 0; i != num_queues_; ++i) + { + count += data_[i].data_.queue_->get_thread_count(state); + } + break; + } + + case thread_priority::bound: + { + for (std::size_t i = 0; i != num_queues_; ++i) + { + count += + data_[i].data_.bound_queue_->get_thread_count(state); + } + break; + } + + case thread_priority::boost: + case thread_priority::high: + case thread_priority::high_recursive: + { + for (std::size_t i = 0; i != num_high_priority_queues_; ++i) + { + count += + data_[i].data_.high_priority_queue_->get_thread_count( + state); + } + break; + } + + case thread_priority::unknown: + { + HPX_THROW_EXCEPTION(hpx::error::bad_parameter, + "local_workrequesting_scheduler::get_thread_count", + "unknown thread priority value " + "(thread_priority::unknown)"); + } + } + return count; + } + + // Queries whether a given core is idle + bool is_core_idle(std::size_t num_thread) const override + { + if (num_thread < num_queues_) + { + for (thread_queue_type* this_queue : + {data_[num_thread].data_.bound_queue_, + data_[num_thread].data_.queue_}) + { + if (this_queue->get_queue_length() != 0) + { + return false; + } + } + } + + if (num_thread < num_high_priority_queues_ && + data_[num_thread] + .data_.high_priority_queue_->get_queue_length() != 0) + { + return false; + } + return true; + } + + /////////////////////////////////////////////////////////////////////// + // Enumerate matching threads from all queues + bool enumerate_threads(hpx::function const& f, + thread_schedule_state state = + thread_schedule_state::unknown) const override + { + bool result = true; + for (std::size_t i = 0; i != num_high_priority_queues_; ++i) + { + result = result && + data_[i].data_.high_priority_queue_->enumerate_threads( + f, state); + } + + result = result && low_priority_queue_.enumerate_threads(f, state); + + for (std::size_t i = 0; i != num_queues_; ++i) + { + result = result && + data_[i].data_.queue_->enumerate_threads(f, state); + result = result && + data_[i].data_.bound_queue_->enumerate_threads(f, state); + } + return result; + } + +#ifdef HPX_HAVE_THREAD_QUEUE_WAITTIME + /////////////////////////////////////////////////////////////////////// + // Queries the current average thread wait time of the queues. + std::int64_t get_average_thread_wait_time( + std::size_t num_thread = std::size_t(-1)) const override + { + // Return average thread wait time of one specific queue. 
+ std::uint64_t wait_time = 0;
+ std::uint64_t count = 0;
+ if (std::size_t(-1) != num_thread)
+ {
+ HPX_ASSERT(num_thread < num_queues_);
+
+ auto const& d = data_[num_thread].data_;
+ if (num_thread < num_high_priority_queues_)
+ {
+ wait_time =
+ d.high_priority_queue_->get_average_thread_wait_time();
+ ++count;
+ }
+
+ if (num_thread == num_queues_ - 1)
+ {
+ wait_time +=
+ low_priority_queue_.get_average_thread_wait_time();
+ ++count;
+ }
+
+ wait_time += d.queue_->get_average_thread_wait_time();
+ wait_time += d.bound_queue_->get_average_thread_wait_time();
+ return wait_time / (count + 1);
+ }
+
+ for (std::size_t i = 0; i != num_queues_; ++i)
+ {
+ auto const& d = data_[i].data_;
+ if (i < num_high_priority_queues_)
+ {
+ wait_time +=
+ d.high_priority_queue_->get_average_thread_wait_time();
+ }
+ wait_time += d.queue_->get_average_thread_wait_time();
+ wait_time += d.bound_queue_->get_average_thread_wait_time();
+ ++count;
+ }
+
+ return (wait_time +
+ low_priority_queue_.get_average_thread_wait_time()) /
+ (count + 1);
+ }
+
+ ///////////////////////////////////////////////////////////////////////
+ // Queries the current average task wait time of the queues.
+ std::int64_t get_average_task_wait_time(
+ std::size_t num_thread = std::size_t(-1)) const override
+ {
+ // Return average task wait time of one specific queue.
+ std::uint64_t wait_time = 0;
+ std::uint64_t count = 0;
+ if (std::size_t(-1) != num_thread)
+ {
+ HPX_ASSERT(num_thread < num_queues_);
+
+ auto const& d = data_[num_thread].data_;
+ if (num_thread < num_high_priority_queues_)
+ {
+ wait_time =
+ d.high_priority_queue_->get_average_task_wait_time();
+ ++count;
+ }
+
+ if (num_thread == num_queues_ - 1)
+ {
+ wait_time +=
+ low_priority_queue_.get_average_task_wait_time();
+ ++count;
+ }
+
+ wait_time += d.queue_->get_average_task_wait_time();
+ wait_time += d.bound_queue_->get_average_task_wait_time();
+ return wait_time / (count + 1);
+ }
+
+ for (std::size_t i = 0; i != num_queues_; ++i)
+ {
+ auto const& d = data_[i].data_;
+ if (i < num_high_priority_queues_)
+ {
+ wait_time +=
+ d.high_priority_queue_->get_average_task_wait_time();
+ }
+ wait_time += d.queue_->get_average_task_wait_time();
+ wait_time += d.bound_queue_->get_average_task_wait_time();
+ ++count;
+ }
+
+ return (wait_time +
+ low_priority_queue_.get_average_task_wait_time()) /
+ (count + 1);
+ }
+#endif
+
+ // return a random victim for the current stealing operation
+ std::size_t random_victim(steal_request const& req) noexcept
+ {
+ std::size_t result = 0;
+
+ {
+ // generate at most 3 random numbers before resorting to more
+ // expensive algorithm
+ std::uniform_int_distribution uniform(
+ 0, static_cast(num_queues_ - 1));
+
+ int attempts = 0;
+ do
+ {
+ result = uniform(gen_);
+ if (result != req.num_thread_ &&
+ !test(req.victims_, result))
+ {
+ HPX_ASSERT(result < num_queues_);
+ return result;
+ }
+ } while (++attempts < 3);
+ }
+
+ // to avoid infinite trials we randomly select one of the possible
+ // victims
+ std::uniform_int_distribution uniform(0,
+ static_cast(
+ num_queues_ - count(req.victims_) - 1));
+
+ // generate one more random number
+ std::size_t selected_victim = uniform(gen_);
+ for (std::size_t i = 0; i != num_queues_; ++i)
+ {
+ if (!test(req.victims_, i))
+ {
+ if (selected_victim == 0)
+ {
+ result = i;
+ break;
+ }
+ --selected_victim;
+ }
+ }
+
+ HPX_ASSERT(result < num_queues_ && result != req.num_thread_ &&
+
!test(req.victims_, result)); + + return result; + } + + // return the number of the next victim core + std::size_t next_victim([[maybe_unused]] scheduler_data& d, + steal_request const& req) noexcept + { + std::size_t victim; + + // return thief if max steal attempts has been reached or no more + // cores are available for stealing + if (req.attempt_ == num_queues_ - 1) + { + // Return steal request to thief + victim = req.num_thread_; + } + else + { + HPX_ASSERT( + req.num_thread_ == d.num_thread_ || req.attempt_ != 0); + +#if defined(HPX_HAVE_WORKREQUESTING_LAST_VICTIM) + if (d.last_victim_ != std::uint16_t(-1)) + { + victim = d.last_victim_; + } + else +#endif + { + victim = random_victim(req); + } + } + + // couldn't find victim, return steal request to thief + if (victim == static_cast(-1)) + { + victim = req.num_thread_; + HPX_ASSERT(victim != d.num_thread_); + } + + HPX_ASSERT(victim < num_queues_); + HPX_ASSERT(req.attempt_ < num_queues_); + + return victim; + } + + // Every worker can have at most MAXSTEAL pending steal requests. A + // steal request with idle == false indicates that the requesting worker + // is still busy working on some tasks. A steal request with idle == + // true indicates that the requesting worker is in fact idle and has + // nothing to work on. + void send_steal_request(scheduler_data& d, bool idle = true) noexcept + { + if (d.requested_ == 0) + { + // Estimate work-stealing efficiency during the last interval; + // switch strategies if the value is below a threshold + if (d.num_recent_steals_ >= + scheduler_data::num_steal_adaptive_interval_) + { + double const ratio = + static_cast(d.num_recent_tasks_executed_) / + d.num_steal_adaptive_interval_; + + d.num_recent_steals_ = 0; + d.num_recent_tasks_executed_ = 0; + + if (ratio >= 2.) + { + d.stealhalf_ = true; + } + else + { + if (d.stealhalf_) + { + d.stealhalf_ = false; + } + else if (ratio <= 1.) + { + d.stealhalf_ = true; + } + } + } + + steal_request req( + d.num_thread_, d.tasks_, d.victims_, idle, d.stealhalf_); + std::size_t victim = next_victim(d, req); + + ++d.requested_; + data_[victim].data_.requests_->set(HPX_MOVE(req)); +#if defined(HPX_HAVE_WORKREQUESTING_STEAL_STATISTICS) + ++d.steal_requests_sent_; +#endif + } + } + + // Try receiving tasks that are sent by another core as a response to + // one of our steal requests. This returns true if new tasks were + // received. + bool try_receiving_tasks(scheduler_data& d, std::size_t& added, + thread_id_ref_type* next_thrd) + { + task_data thrds{}; + if (d.tasks_->get(&thrds)) + { + // keep track of number of outstanding steal requests, there + // should have been at most one + --d.requested_; + HPX_ASSERT(d.requested_ == 0); + + // if at least one thrd was received + if (!thrds.tasks_.empty()) + { + // Schedule all but the first received task in reverse order + // to maintain the sequence of tasks as pulled from the + // victims queue. 
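+ // The first received task is handled separately below: it is either
+ // returned to the caller for immediate execution or scheduled like
+ // the rest.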
+ for (std::size_t i = thrds.tasks_.size() - 1; i != 0; --i) + { + HPX_ASSERT(thrds.tasks_[i]); + d.queue_->schedule_thread( + HPX_MOVE(thrds.tasks_[i]), true); + +#ifdef HPX_HAVE_THREAD_STEALING_COUNTS + d.queue_->increment_num_stolen_to_pending(); +#endif + ++added; + } + +#if defined(HPX_HAVE_WORKREQUESTING_LAST_VICTIM) + // store the originating core for the next stealing + // operation + d.last_victim_ = thrds.num_thread_; + HPX_ASSERT(d.last_victim_ != d.num_thread_); +#endif + // the last of the received tasks will be either directly + // executed or normally scheduled + if (next_thrd != nullptr) + { + // directly return the last thread as it should be run + // immediately + ++d.num_recent_tasks_executed_; + *next_thrd = HPX_MOVE(thrds.tasks_.front()); + } + else + { + d.queue_->schedule_thread( + HPX_MOVE(thrds.tasks_.front()), true); + +#ifdef HPX_HAVE_THREAD_STEALING_COUNTS + d.queue_->increment_num_stolen_to_pending(); +#endif + ++added; + } + + ++d.num_recent_steals_; + return true; + } + } + return false; + } + + // This is a function which gets called periodically by the thread + // manager to allow for maintenance tasks to be executed in the + // scheduler. Returns true if the OS thread calling this function has to + // be terminated (i.e. no more work has to be done). + bool wait_or_add_new(std::size_t num_thread, bool running, + [[maybe_unused]] std::int64_t& idle_loop_count, bool allow_stealing, + std::size_t& added, thread_id_ref_type* next_thrd = nullptr) + { + HPX_ASSERT(num_thread < num_queues_); + + added = 0; + + auto& d = data_[num_thread].data_; + + // We don't need to call wait_or_add_new for high priority or bound + // threads as these threads are never created 'staged'. + + bool result = + d.queue_->wait_or_add_new(running, added, allow_stealing); + + // check if work was available + if (0 != added) + { + return result; + } + + if (num_thread == num_queues_ - 1) + { + result = low_priority_queue_.wait_or_add_new(running, added) && + result; + } + + // check if we have been disabled or if no stealing is requested (or + // not possible) + if (!running || num_queues_ == 1) + { + return !running; + } + + // attempt to steal more work + if (allow_stealing) + { + send_steal_request(d); + HPX_ASSERT(d.requested_ != 0); + } + + if (try_receiving_tasks(d, added, next_thrd)) + { + return false; + } + + // if we did not receive any new task, decline or forward all + // pending steal requests + decline_or_forward_all_steal_requests(d); + +#ifdef HPX_HAVE_THREAD_MINIMAL_DEADLOCK_DETECTION + // no new work is available, are we deadlocked? 
+ if (HPX_UNLIKELY(get_minimal_deadlock_detection_enabled() && + LHPX_ENABLED(error))) + { + bool suspended_only = true; + + for (std::size_t i = 0; suspended_only && i != num_queues_; ++i) + { + suspended_only = + data_[i].data_.queue_->dump_suspended_threads( + i, idle_loop_count, running); + } + + if (HPX_UNLIKELY(suspended_only)) + { + if (running) + { + LTM_(error) //-V128 + << "queue(" << num_thread << "): " + << "no new work available, are we " + "deadlocked?"; + } + else + { + LHPX_CONSOLE_( + hpx::util::logging::level::error) //-V128 + << " [TM] " //-V128 + << "queue(" << num_thread << "): " + << "no new work available, are we " + "deadlocked?\n"; + } + } + } +#endif + return result; + } + + /////////////////////////////////////////////////////////////////////// + void on_start_thread(std::size_t num_thread) override + { + hpx::threads::detail::set_local_thread_num_tss(num_thread); + hpx::threads::detail::set_thread_pool_num_tss( + parent_pool_->get_pool_id().index()); + + auto& d = data_[num_thread].data_; + d.init(num_thread, num_queues_, this->thread_queue_init_, + num_thread < num_high_priority_queues_); + + d.queue_->on_start_thread(num_thread); + d.bound_queue_->on_start_thread(num_thread); + if (num_thread < num_high_priority_queues_) + { + d.high_priority_queue_->on_start_thread(num_thread); + } + + if (num_thread == num_queues_ - 1) + { + low_priority_queue_.on_start_thread(num_thread); + } + + // Initially set all bits, code below resets the bits corresponding + // to cores that can serve as a victim for the current core. A set + // bit in this mask means 'do not steal from this core'. + resize(d.victims_, num_queues_); + reset(d.victims_); + set(d.victims_, num_thread); + } + + void on_stop_thread(std::size_t num_thread) override + { + auto& d = data_[num_thread].data_; + + d.queue_->on_stop_thread(num_thread); + d.bound_queue_->on_stop_thread(num_thread); + if (num_thread < num_high_priority_queues_) + { + d.high_priority_queue_->on_stop_thread(num_thread); + } + + if (num_thread == num_queues_ - 1) + { + low_priority_queue_.on_stop_thread(num_thread); + } + } + + void on_error( + std::size_t num_thread, std::exception_ptr const& e) override + { + auto& d = data_[num_thread].data_; + + d.queue_->on_error(num_thread, e); + d.bound_queue_->on_error(num_thread, e); + if (num_thread < num_high_priority_queues_) + { + d.high_priority_queue_->on_error(num_thread, e); + } + + if (num_thread == num_queues_ - 1) + { + low_priority_queue_.on_error(num_thread, e); + } + } + + void reset_thread_distribution() noexcept override + { + curr_queue_.store(0, std::memory_order_release); + } + + void set_scheduler_mode(scheduler_mode mode) noexcept override + { + // we should not disable stealing for this scheduler, this would + // possibly lead to deadlocks + scheduler_base::set_scheduler_mode(mode | + policies::scheduler_mode::enable_stealing | + policies::scheduler_mode::enable_stealing_numa); + } + + protected: + std::vector> data_; + thread_queue_type low_priority_queue_; + + std::atomic curr_queue_; + + std::mt19937 gen_; + + detail::affinity_data const& affinity_data_; + std::size_t const num_queues_; + std::size_t const num_high_priority_queues_; + }; +} // namespace hpx::threads::policies + +#include diff --git a/libs/core/synchronization/tests/unit/sliding_semaphore.cpp b/libs/core/synchronization/tests/unit/sliding_semaphore.cpp index fd6876fa9b9a..185b65756da7 100644 --- a/libs/core/synchronization/tests/unit/sliding_semaphore.cpp +++ 
b/libs/core/synchronization/tests/unit/sliding_semaphore.cpp
@@ -13,6 +13,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -23,9 +24,9 @@ int const initial_count = 42;
 int const num_tasks = 139;
 std::atomic completed_tasks(0);
-void worker(hpx::sliding_semaphore& sem)
+void worker(std::shared_ptr<hpx::sliding_semaphore> sem)
 {
-    sem.signal(++count);    // signal main thread
+    sem->signal(++count);    // signal main thread
 }
 ///////////////////////////////////////////////////////////////////////////////
@@ -34,14 +35,14 @@ int hpx_main()
 {
     std::vector<hpx::future<void>> futures;
     futures.reserve(num_tasks);
-    hpx::sliding_semaphore sem(initial_count);
+    auto sem = std::make_shared<hpx::sliding_semaphore>(initial_count);
     for (std::size_t i = 0; i != num_tasks; ++i)
     {
         futures.emplace_back(hpx::async(&worker, std::ref(sem)));
     }
-    sem.wait(initial_count + num_tasks);
+    sem->wait(initial_count + num_tasks);
     HPX_TEST_EQ(count, num_tasks);
diff --git a/libs/core/testing/src/performance.cpp b/libs/core/testing/src/performance.cpp
index b06b8b8c6cd1..5cd224ff1497 100644
--- a/libs/core/testing/src/performance.cpp
+++ b/libs/core/testing/src/performance.cpp
@@ -63,6 +63,7 @@ namespace hpx::util {
     strm << R"( "executor" : ")" << std::get<1>(item.first) << "\",\n";
     strm << R"( "series" : [)";
+    double average = 0.0;
     int series = 0;
     for (auto const val : item.second)
     {
@@ -70,8 +71,10 @@ namespace hpx::util {
         strm << ", ";
     strm << val;
     ++series;
+    average += val;
     }
-    strm << "]\n";
+    strm << "],\n";
+    strm << " \"average\" : " << average / series << "\n";
     strm << " }";
     ++outputs;
 }
diff --git a/libs/core/thread_pools/include/hpx/thread_pools/scheduling_loop.hpp b/libs/core/thread_pools/include/hpx/thread_pools/scheduling_loop.hpp
index a84afdb668dd..6a6dbb766d69 100644
--- a/libs/core/thread_pools/include/hpx/thread_pools/scheduling_loop.hpp
+++ b/libs/core/thread_pools/include/hpx/thread_pools/scheduling_loop.hpp
@@ -464,6 +464,7 @@ namespace hpx::threads::detail {
     {
         ++idle_loop_count;
+        next_thrd = nullptr;
         if (scheduler.wait_or_add_new(num_thread, running,
             idle_loop_count, enable_stealing_staged, added, &next_thrd))
diff --git a/libs/core/thread_pools/src/scheduled_thread_pool.cpp b/libs/core/thread_pools/src/scheduled_thread_pool.cpp
index cc2c684cae9f..308d8748f586 100644
--- a/libs/core/thread_pools/src/scheduled_thread_pool.cpp
+++ b/libs/core/thread_pools/src/scheduled_thread_pool.cpp
@@ -8,6 +8,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -66,3 +67,14 @@ template class HPX_CORE_EXPORT
     hpx::threads::policies::shared_priority_queue_scheduler<>;
 template class HPX_CORE_EXPORT hpx::threads::detail::scheduled_thread_pool<
     hpx::threads::policies::shared_priority_queue_scheduler<>>;
+
+template class HPX_CORE_EXPORT
+    hpx::threads::policies::local_workrequesting_scheduler<>;
+template class HPX_CORE_EXPORT hpx::threads::detail::scheduled_thread_pool<
+    hpx::threads::policies::local_workrequesting_scheduler<>>;
+template class HPX_CORE_EXPORT
+    hpx::threads::policies::local_workrequesting_scheduler;
+template class HPX_CORE_EXPORT hpx::threads::detail::scheduled_thread_pool<
+    hpx::threads::policies::local_workrequesting_scheduler>;
diff --git a/libs/core/threading_base/src/scheduler_base.cpp b/libs/core/threading_base/src/scheduler_base.cpp
index 3bb4facc62b3..8ddd582abe4e 100644
--- a/libs/core/threading_base/src/scheduler_base.cpp
+++ b/libs/core/threading_base/src/scheduler_base.cpp
@@ -330,8 +330,7 @@ namespace hpx::threads::policies {
     void scheduler_base::add_scheduler_mode(scheduler_mode mode) noexcept
     {
         // distribute
the same value across all cores - mode = static_cast(get_scheduler_mode() | mode); - set_scheduler_mode(mode); + set_scheduler_mode(get_scheduler_mode() | mode); } void scheduler_base::remove_scheduler_mode(scheduler_mode mode) noexcept diff --git a/libs/core/threading_base/tests/regressions/thread_stacksize_current.cpp b/libs/core/threading_base/tests/regressions/thread_stacksize_current.cpp index 077df205ea9f..eab76a473acc 100644 --- a/libs/core/threading_base/tests/regressions/thread_stacksize_current.cpp +++ b/libs/core/threading_base/tests/regressions/thread_stacksize_current.cpp @@ -75,7 +75,9 @@ int main(int argc, char** argv) "abp-priority-fifo", "abp-priority-lifo", #endif - "shared-priority" + "shared-priority", + "local-workrequesting-fifo", + "local-workrequesting-lifo" }; // clang-format on for (auto const& scheduler : schedulers) diff --git a/libs/core/threadmanager/include/hpx/modules/threadmanager.hpp b/libs/core/threadmanager/include/hpx/modules/threadmanager.hpp index b21712446e79..6e5efe4e9693 100644 --- a/libs/core/threadmanager/include/hpx/modules/threadmanager.hpp +++ b/libs/core/threadmanager/include/hpx/modules/threadmanager.hpp @@ -425,6 +425,12 @@ namespace hpx { namespace threads { void create_scheduler_shared_priority( thread_pool_init_parameters const&, policies::thread_queue_init_parameters const&, std::size_t); + void create_scheduler_local_workrequesting_fifo( + thread_pool_init_parameters const&, + policies::thread_queue_init_parameters const&, std::size_t); + void create_scheduler_local_workrequesting_lifo( + thread_pool_init_parameters const&, + policies::thread_queue_init_parameters const&, std::size_t); mutable mutex_type mtx_; // mutex protecting the members diff --git a/libs/core/threadmanager/src/threadmanager.cpp b/libs/core/threadmanager/src/threadmanager.cpp index 1b4c555a7eca..1fde4443d865 100644 --- a/libs/core/threadmanager/src/threadmanager.cpp +++ b/libs/core/threadmanager/src/threadmanager.cpp @@ -502,6 +502,85 @@ namespace hpx { namespace threads { pools_.push_back(HPX_MOVE(pool)); } + void threadmanager::create_scheduler_local_workrequesting_fifo( + thread_pool_init_parameters const& thread_pool_init, + policies::thread_queue_init_parameters const& thread_queue_init, + std::size_t numa_sensitive) + { + // set parameters for scheduler and pool instantiation and + // perform compatibility checks + std::size_t num_high_priority_queues = + hpx::util::get_entry_as(rtcfg_, + "hpx.thread_queue.high_priority_queues", + thread_pool_init.num_threads_); + detail::check_num_high_priority_queues( + thread_pool_init.num_threads_, num_high_priority_queues); + + // instantiate the scheduler + using local_sched_type = + hpx::threads::policies::local_workrequesting_scheduler<>; + + local_sched_type::init_parameter_type init( + thread_pool_init.num_threads_, thread_pool_init.affinity_data_, + num_high_priority_queues, thread_queue_init, + "core-local_workrequesting_scheduler"); + + std::unique_ptr sched(new local_sched_type(init)); + + // set the default scheduler flags + sched->set_scheduler_mode(thread_pool_init.mode_); + + // conditionally set/unset this flag + sched->update_scheduler_mode( + policies::scheduler_mode::enable_stealing_numa, !numa_sensitive); + + // instantiate the pool + std::unique_ptr pool = std::make_unique< + hpx::threads::detail::scheduled_thread_pool>( + HPX_MOVE(sched), thread_pool_init); + pools_.push_back(HPX_MOVE(pool)); + } + + void threadmanager::create_scheduler_local_workrequesting_lifo( + thread_pool_init_parameters const& 
thread_pool_init, + policies::thread_queue_init_parameters const& thread_queue_init, + std::size_t numa_sensitive) + { + // set parameters for scheduler and pool instantiation and + // perform compatibility checks + std::size_t num_high_priority_queues = + hpx::util::get_entry_as(rtcfg_, + "hpx.thread_queue.high_priority_queues", + thread_pool_init.num_threads_); + detail::check_num_high_priority_queues( + thread_pool_init.num_threads_, num_high_priority_queues); + + // instantiate the scheduler + using local_sched_type = + hpx::threads::policies::local_workrequesting_scheduler; + + local_sched_type::init_parameter_type init( + thread_pool_init.num_threads_, thread_pool_init.affinity_data_, + num_high_priority_queues, thread_queue_init, + "core-local_workrequesting_scheduler"); + + std::unique_ptr sched(new local_sched_type(init)); + + // set the default scheduler flags + sched->set_scheduler_mode(thread_pool_init.mode_); + + // conditionally set/unset this flag + sched->update_scheduler_mode( + policies::scheduler_mode::enable_stealing_numa, !numa_sensitive); + + // instantiate the pool + std::unique_ptr pool = std::make_unique< + hpx::threads::detail::scheduled_thread_pool>( + HPX_MOVE(sched), thread_pool_init); + pools_.push_back(HPX_MOVE(pool)); + } + void threadmanager::create_pools() { auto& rp = hpx::resource::get_partitioner(); @@ -640,6 +719,16 @@ namespace hpx { namespace threads { thread_pool_init, thread_queue_init, numa_sensitive); break; + case resource::scheduling_policy::local_workrequesting_fifo: + create_scheduler_local_workrequesting_fifo( + thread_pool_init, thread_queue_init, numa_sensitive); + break; + + case resource::scheduling_policy::local_workrequesting_lifo: + create_scheduler_local_workrequesting_lifo( + thread_pool_init, thread_queue_init, numa_sensitive); + break; + case resource::scheduling_policy::abp_priority_fifo: create_scheduler_abp_priority_fifo( thread_pool_init, thread_queue_init, numa_sensitive); diff --git a/libs/full/checkpoint/examples/1d_stencil_4_checkpoint.cpp b/libs/full/checkpoint/examples/1d_stencil_4_checkpoint.cpp index 9a8fcf5f933d..c25d206a50df 100644 --- a/libs/full/checkpoint/examples/1d_stencil_4_checkpoint.cpp +++ b/libs/full/checkpoint/examples/1d_stencil_4_checkpoint.cpp @@ -317,7 +317,7 @@ struct stepper print(U); // limit depth of dependency tree - hpx::sliding_semaphore sem(nd); + auto sem = std::make_shared(nd); auto Op = unwrapping(&stepper::heat_part); @@ -367,15 +367,15 @@ struct stepper // trigger the semaphore once computation has reached this point if ((t % nd) == 0) { - next[0].then([&sem, t](partition&&) { + next[0].then([sem, t](partition&&) { // inform semaphore about new lower limit - sem.signal(t); + sem->signal(t); }); } // suspend if the tree has become too deep, the continuation above // will resume this thread once the computation has caught up - sem.wait(t); + sem->wait(t); } // Wait on Checkpoint Printing diff --git a/tests/performance/local/future_overhead.cpp b/tests/performance/local/future_overhead.cpp index 2ea3701c48d0..4bf154750913 100644 --- a/tests/performance/local/future_overhead.cpp +++ b/tests/performance/local/future_overhead.cpp @@ -26,6 +26,7 @@ #include #include #include +#include #include #include #include @@ -257,16 +258,16 @@ void measure_function_futures_sliding_semaphore( // start the clock high_resolution_timer walltime; const int sem_count = 5000; - hpx::sliding_semaphore sem(sem_count); + auto sem = std::make_shared(sem_count); for (std::uint64_t i = 0; i < count; ++i) { - 
hpx::async(exec, [i, &sem]() {
+        hpx::async(exec, [i, sem]() {
             null_function();
-            sem.signal(i);
+            sem->signal(i);
         });
-        sem.wait(i);
+        sem->wait(i);
     }
-    sem.wait(count + sem_count - 1);
+    sem->wait(count + sem_count - 1);
     // stop the clock
     const double duration = walltime.elapsed();
diff --git a/tools/perftests_ci/perftest/plot.py b/tools/perftests_ci/perftest/plot.py
index 2d7c4777fa27..aecbf6ab713e 100644
--- a/tools/perftests_ci/perftest/plot.py
+++ b/tools/perftests_ci/perftest/plot.py
@@ -38,8 +38,10 @@ def __str__(self):
     @classmethod
     def outputs_by_key(cls, data):
         def split_output(o):
-            return cls(**{k: v
-                for k, v in o.items() if k != 'series'}), o['series']
+            return cls(**{
+                k: v for k, v in o.items() \
+                if k != 'series' and k != 'average' \
+            }), o['series']
         return dict(split_output(o) for o in data['outputs'])
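
All three sliding_semaphore call sites touched by this patch (the synchronization unit test, the 1d_stencil_4_checkpoint example, and the future_overhead benchmark) move to the same pattern: the semaphore is owned by a std::shared_ptr and captured by value in the tasks that signal it, so it stays alive even if the creating scope unwinds before the last signal runs. A minimal sketch of that pattern follows; the chosen headers, the window/step counts, and the init/finalize boilerplate are illustrative assumptions, not part of the patch.

    // Throttle an asynchronous pipeline with a shared hpx::sliding_semaphore.
    // Illustrative sketch only; header picks and constants are assumptions.
    #include <hpx/hpx_init.hpp>
    #include <hpx/include/async.hpp>
    #include <hpx/synchronization/sliding_semaphore.hpp>

    #include <cstdint>
    #include <memory>

    int hpx_main(int, char**)
    {
        constexpr std::int64_t window = 8;      // allowed look-ahead
        constexpr std::int64_t steps = 1000;    // total pipeline steps

        // shared ownership keeps the semaphore alive until the last task has
        // signalled, which is why the patch replaces the stack-allocated
        // semaphores with std::shared_ptr
        auto sem = std::make_shared<hpx::sliding_semaphore>(window);

        for (std::int64_t t = 0; t != steps; ++t)
        {
            // each task captures the shared_ptr by value
            hpx::async([t, sem] {
                // ... real work for step t would go here ...
                sem->signal(t);    // raise the semaphore's lower limit
            });

            // block once we are more than 'window' steps ahead of the tasks
            sem->wait(t);
        }

        // drain the tail of the pipeline before shutting down
        sem->wait(steps + window - 1);
        return hpx::finalize();
    }

    int main(int argc, char* argv[])
    {
        return hpx::init(argc, argv);
    }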