From df44a5a61d466372e6894fe6649a84d1c4f355ce Mon Sep 17 00:00:00 2001 From: Hartmut Kaiser Date: Mon, 25 Nov 2019 12:49:34 -0600 Subject: [PATCH 1/6] Adding local work stealing scheduler that is based on message passing internally - Using uniform_int_distribution with proper bounds - Removing queue index from thread_queues as it was unused - flyby: remove commented out options from .clang-format - Renaming workstealing --> workrequesting - Adding adaptive work stealing (steal half/steal one) - this makes this scheduler consistently (albeit only slightly) faster than the (default) local-priority scheduler - Adding LIFO and FIFO variations of local work-stealing scheduler - flyby: fixing HPX_WITH_SWAP_CONTEXT_EMULATION - flyby: minor changes to fibonacci_local example - Adding high- and low- priority queues - flyby: cache_line_data now does not generate warnings errors if padding is not needed - Adding bound queues - flyby: using cache_line_data for scheduler states --- .circleci/config.yml | 3 +- .jenkins/lsu/env-clang-13.sh | 2 + .jenkins/lsu/env-gcc-11.sh | 3 + CMakeLists.txt | 7 + cmake/HPX_AddTest.cmake | 5 + .../manual/hpx_runtime_and_resources.rst | 37 +- ...ching_and_configuring_hpx_applications.rst | 4 +- examples/quickstart/fibonacci_local.cpp | 25 +- .../src/parse_command_line_local.cpp | 13 +- .../resource_partitioner/partitioner_fwd.hpp | 2 + .../src/detail_partitioner.cpp | 18 + libs/core/schedulers/CMakeLists.txt | 1 + .../include/hpx/modules/schedulers.hpp | 3 +- .../local_workrequesting_scheduler.hpp | 1920 +++++++++++++++++ .../include/hpx/schedulers/thread_queue.hpp | 3 +- libs/core/testing/src/performance.cpp | 5 +- .../hpx/thread_pools/scheduling_loop.hpp | 12 +- .../src/scheduled_thread_pool.cpp | 12 + .../hpx/threading_base/register_thread.hpp | 2 - .../hpx/threading_base/thread_helpers.hpp | 3 - .../threading_base/src/scheduler_base.cpp | 3 +- .../threading_base/src/thread_helpers.cpp | 16 - .../regressions/thread_stacksize_current.cpp | 4 +- .../include/hpx/modules/threadmanager.hpp | 6 + libs/core/threadmanager/src/threadmanager.cpp | 89 + tools/perftests_ci/perftest/plot.py | 6 +- 26 files changed, 2146 insertions(+), 58 deletions(-) create mode 100644 libs/core/schedulers/include/hpx/schedulers/local_workrequesting_scheduler.hpp diff --git a/.circleci/config.yml b/.circleci/config.yml index 2298868cac25..eb498cbc59b3 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -250,7 +250,8 @@ jobs: -DHPX_WITH_FETCH_JSON=On \ -DCMAKE_EXPORT_COMPILE_COMMANDS=On \ -DHPX_WITH_DOCUMENTATION=On \ - -DHPX_WITH_DOCUMENTATION_OUTPUT_FORMATS="${DOCUMENTATION_OUTPUT_FORMATS}" + -DHPX_WITH_DOCUMENTATION_OUTPUT_FORMATS="${DOCUMENTATION_OUTPUT_FORMATS}" \ + -DHPX_WITH_TESTS_COMMAND_LINE=--hpx:queuing=local-workrequesting-fifo - persist_to_workspace: root: /hpx paths: diff --git a/.jenkins/lsu/env-clang-13.sh b/.jenkins/lsu/env-clang-13.sh index d37c78c912a3..57700fbf99b7 100644 --- a/.jenkins/lsu/env-clang-13.sh +++ b/.jenkins/lsu/env-clang-13.sh @@ -25,6 +25,8 @@ configure_extra_options+=" -DHPX_WITH_PARCELPORT_MPI=ON" configure_extra_options+=" -DHPX_WITH_PARCELPORT_LCI=ON" configure_extra_options+=" -DHPX_WITH_FETCH_LCI=ON" configure_extra_options+=" -DHPX_WITH_LOGGING=OFF" +configure_extra_options+=" -DHPX_WITH_TESTS_COMMAND_LINE=--hpx:queuing=local-workrequesting-fifo" + # The pwrapi library still needs to be set up properly on rostam # configure_extra_options+=" -DHPX_WITH_POWER_COUNTER=ON" diff --git a/.jenkins/lsu/env-gcc-11.sh b/.jenkins/lsu/env-gcc-11.sh index 
b9f7be125f7f..882cc60eb73d 100644 --- a/.jenkins/lsu/env-gcc-11.sh +++ b/.jenkins/lsu/env-gcc-11.sh @@ -15,6 +15,7 @@ module load pwrapi/1.1.1 export HPXRUN_RUNWRAPPER=srun export CXX_STD="20" +configure_extra_options+=" -DCMAKE_BUILD_TYPE=${build_type}" configure_extra_options+=" -DHPX_WITH_CXX_STANDARD=${CXX_STD}" configure_extra_options+=" -DHPX_WITH_MALLOC=system" configure_extra_options+=" -DHPX_WITH_FETCH_ASIO=ON" @@ -25,3 +26,5 @@ configure_extra_options+=" -DHPX_WITH_DATAPAR_BACKEND=STD_EXPERIMENTAL_SIMD" # The pwrapi library still needs to be set up properly on rostam # configure_extra_options+=" -DHPX_WITH_POWER_COUNTER=ON" + +configure_extra_options+=" -DHPX_WITH_TESTS_COMMAND_LINE=--hpx:queuing=local-workrequesting-lifo" diff --git a/CMakeLists.txt b/CMakeLists.txt index dbac004e39f4..b7f87482d423 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1407,6 +1407,13 @@ hpx_option( ADVANCED ) +hpx_option( + HPX_WITH_TESTS_COMMAND_LINE STRING + "Add given command line options to all tests run" "" + CATEGORY "Debugging" + ADVANCED +) + hpx_option( HPX_WITH_TESTS_MAX_THREADS_PER_LOCALITY STRING diff --git a/cmake/HPX_AddTest.cmake b/cmake/HPX_AddTest.cmake index d416d99fe3ce..fbabe1aabd4e 100644 --- a/cmake/HPX_AddTest.cmake +++ b/cmake/HPX_AddTest.cmake @@ -68,6 +68,11 @@ function(add_hpx_test category name) ) endif() + # add additional command line arguments to all test executions + if(NOT x${HPX_WITH_TESTS_COMMAND_LINE} STREQUAL x"") + set(args ${args} "${HPX_WITH_TESTS_COMMAND_LINE}") + endif() + if(${HPX_WITH_PARALLEL_TESTS_BIND_NONE} AND NOT run_serial AND NOT "${name}_RUNWRAPPER" diff --git a/docs/sphinx/manual/hpx_runtime_and_resources.rst b/docs/sphinx/manual/hpx_runtime_and_resources.rst index 667c7ea7f535..195427972ed9 100644 --- a/docs/sphinx/manual/hpx_runtime_and_resources.rst +++ b/docs/sphinx/manual/hpx_runtime_and_resources.rst @@ -16,13 +16,13 @@ |hpx| thread scheduling policies ================================ -The |hpx| runtime has five thread scheduling policies: local-priority, -static-priority, local, static and abp-priority. These policies can be specified -from the command line using the command line option :option:`--hpx:queuing`. In -order to use a particular scheduling policy, the runtime system must be built -with the appropriate scheduler flag turned on (e.g. ``cmake --DHPX_THREAD_SCHEDULERS=local``, see :ref:`cmake_variables` for more -information). +The |hpx| runtime has six thread scheduling policies: local-priority, +static-priority, local, static, local-workrequesting-fifo, and abp-priority. +These policies can be specified from the command line using the command line +option :option:`--hpx:queuing`. In order to use a particular scheduling policy, +the runtime system must be built with the appropriate scheduler flag turned on +(e.g. ``cmake -DHPX_THREAD_SCHEDULERS=local``, see :ref:`cmake_variables` for +more information). Priority local scheduling policy (default policy) ------------------------------------------------- @@ -51,9 +51,7 @@ policy and must be invoked using the command line option Static priority scheduling policy --------------------------------- -* invoke using: :option:`--hpx:queuing`\ ``=static-priority`` (or ``-qs``) -* flag to turn on for build: ``HPX_THREAD_SCHEDULERS=all`` or - ``HPX_THREAD_SCHEDULERS=static-priority`` +* invoke using: :option:`--hpx:queuing`\ ``static-priority`` (or ``-qs``) The static scheduling policy maintains one queue per OS thread from which each OS thread pulls its tasks (user threads). 
Threads are distributed in a round @@ -102,7 +100,7 @@ domain first, only after that work is stolen from other NUMA domains. This scheduler can be used with two underlying queuing policies (FIFO: first-in-first-out, and LIFO: last-in-first-out). In order to use the LIFO policy use the command line option -:option:`--hpx:queuing`\ ``=abp-priority-lifo``. +:option:`--hpx:queuing`\ ``abp-priority-lifo``. .. Questions, concerns and notes: @@ -151,6 +149,23 @@ policy use the command line option I see both FIFO and double ended queues in ABP policies? +Work requesting scheduling policies +----------------------------------- + +* invoke using: :option:`--hpx:queuing`\ ``local-workrequesting-fifo`` + or using :option:`--hpx:queuing`\ ``local-workrequesting-lifo`` + +The work-requesting policies rely on a different mechanism of balancing work +between cores (compared to the other policies listed above). Instead of actively +trying to steal work from other cores, requesting work relies on a less +disruptive mechanism. If a core runs out of work, instead of actively looking at +the queues of neighboring cores, in this case a request is posted to another +core. This core now (whenever it is not busy with other work) either responds to +the original core by sending back work or passes the request on to the next +possible core in the system. In general, this scheme avoids contention on the +work queues as those are always accessed by their own cores only. + + The |hpx| resource partitioner ============================== diff --git a/docs/sphinx/manual/launching_and_configuring_hpx_applications.rst b/docs/sphinx/manual/launching_and_configuring_hpx_applications.rst index bc08a401b736..2b92572f4e10 100644 --- a/docs/sphinx/manual/launching_and_configuring_hpx_applications.rst +++ b/docs/sphinx/manual/launching_and_configuring_hpx_applications.rst @@ -1571,7 +1571,9 @@ The predefined command line options for any application using The queue scheduling policy to use. Options are ``local``, ``local-priority-fifo``, ``local-priority-lifo``, ``static``, - ``static-priority``, ``abp-priority-fifo`` and ``abp-priority-lifo`` + ``static-priority``, ``abp-priority-fifo``, + ``local-workrequesting-fifo``, ``local-workrequesting-lifo`` + and ``abp-priority-lifo`` (default: ``local-priority-fifo``). .. option:: --hpx:high-priority-threads arg diff --git a/examples/quickstart/fibonacci_local.cpp b/examples/quickstart/fibonacci_local.cpp index 910a6f5285ab..2b1da660eed5 100644 --- a/examples/quickstart/fibonacci_local.cpp +++ b/examples/quickstart/fibonacci_local.cpp @@ -18,6 +18,9 @@ #include #include +#include +#include +#include /////////////////////////////////////////////////////////////////////////////// //[fibonacci @@ -26,15 +29,10 @@ std::uint64_t fibonacci(std::uint64_t n) if (n < 2) return n; - // Invoking the Fibonacci algorithm twice is inefficient. - // However, we intentionally demonstrate it this way to create some - // heavy workload. - hpx::future n1 = hpx::async(fibonacci, n - 1); - hpx::future n2 = hpx::async(fibonacci, n - 2); + std::uint64_t n2 = fibonacci(n - 2); - return n1.get() + - n2.get(); // wait for the Futures to return their values + return n1.get() + n2; // wait for the Future to return their values } //fibonacci] @@ -42,6 +40,9 @@ std::uint64_t fibonacci(std::uint64_t n) //[hpx_main int hpx_main(hpx::program_options::variables_map& vm) { + hpx::threads::add_scheduler_mode( + hpx::threads::policies::scheduler_mode::fast_idle_mode); + // extract command line argument, i.e. 
fib(N) std::uint64_t n = vm["n-value"].as(); @@ -67,9 +68,13 @@ int main(int argc, char* argv[]) hpx::program_options::options_description desc_commandline( "Usage: " HPX_APPLICATION_STRING " [options]"); - desc_commandline.add_options()("n-value", - hpx::program_options::value()->default_value(10), - "n value for the Fibonacci function"); + // clang-format off + desc_commandline.add_options() + ("n-value", + hpx::program_options::value()->default_value(10), + "n value for the Fibonacci function") + ; + // clang-format on // Initialize and run HPX hpx::local::init_params init_args; diff --git a/libs/core/command_line_handling_local/src/parse_command_line_local.cpp b/libs/core/command_line_handling_local/src/parse_command_line_local.cpp index 646d805b19b3..6352b81c22e0 100644 --- a/libs/core/command_line_handling_local/src/parse_command_line_local.cpp +++ b/libs/core/command_line_handling_local/src/parse_command_line_local.cpp @@ -478,18 +478,24 @@ namespace hpx::local::detail { "run on (default: 0), valid for " "--hpx:queuing=local, --hpx:queuing=abp-priority, " "--hpx:queuing=static, --hpx:queuing=static-priority, " + "--hpx:queuing=local-workrequesting-fifo, " + "--hpx:queuing=local-workrequesting-lifo, " "and --hpx:queuing=local-priority only") ("hpx:pu-step", value(), "the step between used processing unit numbers for this " "instance of HPX (default: 1), valid for " "--hpx:queuing=local, --hpx:queuing=abp-priority, " "--hpx:queuing=static, --hpx:queuing=static-priority " + "--hpx:queuing=local-workrequesting-fifo, " + "--hpx:queuing=local-workrequesting-lifo, " "and --hpx:queuing=local-priority only") ("hpx:affinity", value(), "the affinity domain the OS threads will be confined to, " "possible values: pu, core, numa, machine (default: pu), valid for " "--hpx:queuing=local, --hpx:queuing=abp-priority, " "--hpx:queuing=static, --hpx:queuing=static-priority " + "--hpx:queuing=local-workrequesting-fifo, " + "--hpx:queuing=local-workrequesting-lifo, " " and --hpx:queuing=local-priority only") ("hpx:bind", value >()->composing(), "the detailed affinity description for the OS threads, see " @@ -514,13 +520,16 @@ namespace hpx::local::detail { ("hpx:queuing", value(), "the queue scheduling policy to use, options are " "'local', 'local-priority-fifo','local-priority-lifo', " - "'abp-priority-fifo', 'abp-priority-lifo', 'static', and " - "'static-priority' (default: 'local-priority'; " + "'abp-priority-fifo', 'abp-priority-lifo', 'static', " + "'static-priority', 'local-workrequesting-fifo', and " + "'local-workrequesting-lifo' (default: 'local-priority'; " "all option values can be abbreviated)") ("hpx:high-priority-threads", value(), "the number of operating system threads maintaining a high " "priority queue (default: number of OS threads), valid for " "--hpx:queuing=local-priority,--hpx:queuing=static-priority, " + "--hpx:queuing=local-workrequesting-fifo, " + "--hpx:queuing=local-workrequesting-lifo, " " and --hpx:queuing=abp-priority only)") ("hpx:numa-sensitive", value()->implicit_value(0), "makes the local-priority scheduler NUMA sensitive (" diff --git a/libs/core/resource_partitioner/include/hpx/resource_partitioner/partitioner_fwd.hpp b/libs/core/resource_partitioner/include/hpx/resource_partitioner/partitioner_fwd.hpp index 5a3df6be1c4a..379879be4b01 100644 --- a/libs/core/resource_partitioner/include/hpx/resource_partitioner/partitioner_fwd.hpp +++ b/libs/core/resource_partitioner/include/hpx/resource_partitioner/partitioner_fwd.hpp @@ -105,6 +105,8 @@ namespace hpx::resource { 
abp_priority_fifo = 5, abp_priority_lifo = 6, shared_priority = 7, + local_workrequesting_fifo = 8, + local_workrequesting_lifo = 9, }; #define HPX_SCHEDULING_POLICY_UNSCOPED_ENUM_DEPRECATION_MSG \ diff --git a/libs/core/resource_partitioner/src/detail_partitioner.cpp b/libs/core/resource_partitioner/src/detail_partitioner.cpp index 68fb708814e7..8dad2d6379ce 100644 --- a/libs/core/resource_partitioner/src/detail_partitioner.cpp +++ b/libs/core/resource_partitioner/src/detail_partitioner.cpp @@ -142,6 +142,12 @@ namespace hpx::resource::detail { case resource::scheduling_policy::local_priority_lifo: sched = "local_priority_lifo"; break; + case resource::scheduling_policy::local_workrequesting_fifo: + sched = "local_workrequesting_fifo"; + break; + case resource::scheduling_policy::local_workrequesting_lifo: + sched = "local_workrequesting_lifo"; + break; case resource::scheduling_policy::static_: sched = "static"; break; @@ -475,6 +481,18 @@ namespace hpx::resource::detail { { default_scheduler = scheduling_policy::local_priority_lifo; } + else if (0 == + std::string("local-workrequesting-fifo") + .find(default_scheduler_str)) + { + default_scheduler = scheduling_policy::local_workrequesting_fifo; + } + else if (0 == + std::string("local-workrequesting-lifo") + .find(default_scheduler_str)) + { + default_scheduler = scheduling_policy::local_workrequesting_lifo; + } else if (0 == std::string("static").find(default_scheduler_str)) { default_scheduler = scheduling_policy::static_; diff --git a/libs/core/schedulers/CMakeLists.txt b/libs/core/schedulers/CMakeLists.txt index 70b038c539a3..0476a76a8893 100644 --- a/libs/core/schedulers/CMakeLists.txt +++ b/libs/core/schedulers/CMakeLists.txt @@ -11,6 +11,7 @@ set(schedulers_headers hpx/schedulers/deadlock_detection.hpp hpx/schedulers/local_priority_queue_scheduler.hpp hpx/schedulers/local_queue_scheduler.hpp + hpx/schedulers/local_workrequesting_scheduler.hpp hpx/schedulers/lockfree_queue_backends.hpp hpx/schedulers/maintain_queue_wait_times.hpp hpx/schedulers/queue_helpers.hpp diff --git a/libs/core/schedulers/include/hpx/modules/schedulers.hpp b/libs/core/schedulers/include/hpx/modules/schedulers.hpp index d77ba076c7e9..d6790d352d73 100644 --- a/libs/core/schedulers/include/hpx/modules/schedulers.hpp +++ b/libs/core/schedulers/include/hpx/modules/schedulers.hpp @@ -6,11 +6,10 @@ #pragma once -#include - #include #include #include +#include #include #include #include diff --git a/libs/core/schedulers/include/hpx/schedulers/local_workrequesting_scheduler.hpp b/libs/core/schedulers/include/hpx/schedulers/local_workrequesting_scheduler.hpp new file mode 100644 index 000000000000..bb025b80e987 --- /dev/null +++ b/libs/core/schedulers/include/hpx/schedulers/local_workrequesting_scheduler.hpp @@ -0,0 +1,1920 @@ +// Copyright (c) 2007-2023 Hartmut Kaiser +// +// SPDX-License-Identifier: BSL-1.0 +// Distributed under the Boost Software License, Version 1.0. 
(See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +/////////////////////////////////////////////////////////////////////////////// +// The scheduler implemented here is adapted from the ideas described in chapter +// 3 of Andreas Prell's PhD thesis 'Embracing Explicit Communication in +// Work-Stealing Runtime Systems' (see: https://epub.uni-bayreuth.de/2990/). +// While it's being described as a work-stealing scheduler, it relies on a +// different working principle if compared to the classic work-stealing. Instead +// of actively pulling work from the work queues of neighboring cores it relies +// on a push model. Cores that run out of work post steal requests that are +// handled by cores that have work awailable by actively sending tasks to the +// requesting core. +// +// When a worker runs out of tasks, it becomes a thief by sending steal requests +// to selected victim workers, those either reply with tasks or signal that they +// have no tasks left. A steal request is a message containing the thief's ID, a +// reference to a channel for sending tasks from victim to thief, and other +// information needed for thread coordination. +// +// When the runtime system starts up, every worker allocates two channels: a +// channel for receiving steal requests and a channel for receiving tasks. A +// reference to the latter is stored in steal requests, and workers use this +// reference to send tasks. By "owning" two channels, workers are able to +// receive steal requests and tasks independently of other workers, which in +// turn enables efficient channel implementations based on single-consumer +// queues. The total number of channels grows linearly with the number of +// workers: n workers allocate 2n channels to communicate with each other. +// +// Matching traditional work stealing, we allow one outstanding steal request +// per worker. This decision has two important consequences: (1) The number of +// steal requests is bounded by n, the number of workers. (2) A thief will never +// receive tasks from more than one victim at a time. It follows from (1) that a +// channel capacity of n - 1 is sufficient to deal with other workers' steal +// requests since no more than n - 1 thieves may request tasks from a single +// victim. We actually increase the capacity to n so that steal requests can be +// returned to their senders, for instance, in case of repeated failure. (2) +// implies that, at any given time, a task channel has at most one sender and +// one receiver, meeting the requirements for an SPSC implementation. +// +// In summary, every worker allocates two specialized channels: an MPSC channel +// where it receives steal requests and an SPSC channel where it receives tasks. +// Steal requests are forwarded rather than acknowledged, letting workers steal +// on behalf of others upon receiving steal requests that cannot be handled. +// Random victim selection fits in well with forwarding steal requests, but may +// cause a lot of communication if only few workers have tasks left. 
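//
// To make the channel layout above concrete, the following standalone
// sketch models it with a mutex-protected queue standing in for the HPX
// channel types (the scheduler below uses lcos::local::base_channel_mpsc
// for steal requests and lcos::local::channel_spsc for tasks; the toy_*
// names and the <mutex>/<deque>-based implementation are illustrative
// only and require <cstddef>, <deque>, <mutex>, <utility>, and <vector>):
//
//     struct toy_steal_request { std::size_t thief; };
//     struct toy_task_batch { std::vector<int> tasks; };
//
//     template <typename T>
//     class toy_channel
//     {
//     public:
//         explicit toy_channel(std::size_t capacity) : capacity_(capacity) {}
//
//         bool set(T t)      // non-blocking: fails when the channel is full
//         {
//             std::lock_guard<std::mutex> l(mtx_);
//             if (queue_.size() == capacity_)
//                 return false;
//             queue_.push_back(std::move(t));
//             return true;
//         }
//
//         bool get(T* t)     // non-blocking: fails when the channel is empty
//         {
//             std::lock_guard<std::mutex> l(mtx_);
//             if (queue_.empty())
//                 return false;
//             *t = std::move(queue_.front());
//             queue_.pop_front();
//             return true;
//         }
//
//     private:
//         std::size_t capacity_;
//         std::mutex mtx_;
//         std::deque<T> queue_;
//     };
//
//     struct toy_worker
//     {
//         explicit toy_worker(std::size_t num_workers)
//           : requests(num_workers)    // capacity n: requests can be returned
//           , tasks(1)                 // at most one victim replies at a time
//         {
//         }
//
//         toy_channel<toy_steal_request> requests;    // MPSC in the real code
//         toy_channel<toy_task_batch> tasks;          // SPSC in the real code
//     };
//
//     // n workers allocate 2n channels in total, e.g.:
//     //     std::deque<toy_worker> workers;
//     //     for (std::size_t i = 0; i != n; ++i)
//     //         workers.emplace_back(n);
//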
Stealing +// half of a victim's tasks (steal-half) is straightforward to implement with +// private task queues, especially when shared memory is available, in which +// case tasks do not need to be copied. While steal-half is important to tackle +// fine-grained parallelism, polling is necessary to achieve short message +// handling delays when workers schedule long-running tasks. + +namespace hpx::threads::policies { + + /////////////////////////////////////////////////////////////////////////// +#if defined(HPX_HAVE_CXX11_STD_ATOMIC_128BIT) + using default_local_workrequesting_scheduler_terminated_queue = + lockfree_lifo; +#else + using default_local_workrequesting_scheduler_terminated_queue = + lockfree_fifo; +#endif + + /////////////////////////////////////////////////////////////////////////// + // The local_workrequesting_scheduler maintains exactly one queue of work + // items (threads) per OS thread, where this OS thread pulls its next work + // from. + template + class local_workrequesting_scheduler : public scheduler_base + { + public: + using has_periodic_maintenance = std::false_type; + + using thread_queue_type = thread_queue; + + public: + struct init_parameter + { + init_parameter(std::size_t num_queues, + detail::affinity_data const& affinity_data, + std::size_t num_high_priority_queues = std::size_t(-1), + thread_queue_init_parameters const& thread_queue_init = + thread_queue_init_parameters{}, + char const* description = "local_workrequesting_scheduler") + : num_queues_(num_queues) + , num_high_priority_queues_( + num_high_priority_queues == std::size_t(-1) ? + num_queues : + num_high_priority_queues) + , thread_queue_init_(thread_queue_init) + , affinity_data_(affinity_data) + , description_(description) + { + } + + init_parameter(std::size_t num_queues, + detail::affinity_data const& affinity_data, + char const* description) + : num_queues_(num_queues) + , num_high_priority_queues_(num_queues) + , thread_queue_init_() + , affinity_data_(affinity_data) + , description_(description) + { + } + + std::size_t num_queues_; + std::size_t num_high_priority_queues_; + thread_queue_init_parameters thread_queue_init_; + detail::affinity_data const& affinity_data_; + char const* description_; + }; + using init_parameter_type = init_parameter; + + private: + //////////////////////////////////////////////////////////////////////// + struct task_data + { + explicit HPX_HOST_DEVICE_CONSTEXPR task_data( + std::uint16_t num_thread = std::uint16_t(-1)) noexcept + : num_thread_(num_thread) + { + } + + // core number this task data originated from + std::uint16_t num_thread_; + hpx::detail::small_vector tasks_; + }; + + //////////////////////////////////////////////////////////////////////// + using task_channel = lcos::local::channel_spsc; + + //////////////////////////////////////////////////////////////////////// + struct steal_request + { + enum class state : std::uint16_t + { + working = 0, + idle = 2, + failed = 4 + }; + + steal_request() noexcept + : victims_() + { + } + + steal_request(std::size_t num_thread, task_channel* channel, + mask_cref_type victims, bool idle, bool stealhalf) + : channel_(channel) + , victims_(victims) + , num_thread_(static_cast(num_thread)) + , attempt_(static_cast(count(victims) - 1)) + , state_(idle ? state::idle : state::working) + , stealhalf_(stealhalf) + { + } + + task_channel* channel_ = nullptr; + mask_type victims_; + std::uint16_t num_thread_ = static_cast(-1); + std::uint16_t attempt_ = 0; + state state_ = state::failed; + // true ? 
attempt steal-half : attempt steal-one + bool stealhalf_ = false; + }; + + //////////////////////////////////////////////////////////////////////// + using steal_request_channel = + lcos::local::base_channel_mpsc; + + //////////////////////////////////////////////////////////////////////// + struct scheduler_data + { + scheduler_data() noexcept + : victims_() + { + } + + ~scheduler_data() + { + delete queue_; + delete high_priority_queue_; + delete bound_queue_; + delete requests_; + delete tasks_; + } + + // interval at which we re-decide on whether we should steal just + // one task or half of what's available + static constexpr std::uint16_t num_steal_adaptive_interval_ = 25; + + void init(std::size_t num_thread, std::size_t size, + thread_queue_init_parameters const& queue_init, + bool need_high_priority_queue) + { + if (queue_ == nullptr) + { + num_thread_ = static_cast(num_thread); + + // initialize queues + queue_ = new thread_queue_type(queue_init); + if (need_high_priority_queue) + { + high_priority_queue_ = + new thread_queue_type(queue_init); + } + bound_queue_ = new thread_queue_type(queue_init); + + // initialize channels needed for work stealing + requests_ = new steal_request_channel(size); + tasks_ = new task_channel(1); + } + } + + // initial affinity mask for this core + mask_type victims_; + + // queues for threads scheduled on this core + thread_queue_type* queue_ = nullptr; + thread_queue_type* high_priority_queue_ = nullptr; + thread_queue_type* bound_queue_ = nullptr; + + // channel for posting steal requests to this core (use + // hpx::spinlock) + steal_request_channel* requests_ = nullptr; + + // one channel per steal request per core + task_channel* tasks_ = nullptr; + + // the number of outstanding steal requests + std::uint16_t requested_ = 0; + + // core number this scheduler data instance refers to + std::uint16_t num_thread_ = static_cast(-1); + + // adaptive stealing + std::uint16_t num_recent_steals_ = 0; + std::uint16_t num_recent_tasks_executed_ = 0; + bool stealhalf_ = false; + +#if defined(HPX_HAVE_WORKREQUESTING_LAST_VICTIM) + // core number the last stolen tasks originated from + std::uint16_t last_victim_ = static_cast(-1); +#endif +#if defined(HPX_HAVE_WORKREQUESTING_STEAL_STATISTICS) + // collect some statistics + std::uint32_t steal_requests_sent_ = 0; + std::uint32_t steal_requests_received_ = 0; + std::uint32_t steal_requests_discarded_ = 0; +#endif + }; + + public: + static unsigned int random_seed() noexcept + { + static std::random_device rd; + return rd(); + } + + explicit local_workrequesting_scheduler(init_parameter_type const& init, + bool deferred_initialization = true) + : scheduler_base(init.num_queues_, init.description_, + init.thread_queue_init_, + policies::scheduler_mode::fast_idle_mode) + , data_(init.num_queues_) + , low_priority_queue_(thread_queue_init_) + , curr_queue_(0) + , gen_(random_seed()) + , affinity_data_(init.affinity_data_) + , num_queues_(init.num_queues_) + , num_high_priority_queues_(init.num_high_priority_queues_) + { + HPX_ASSERT(init.num_queues_ != 0); + HPX_ASSERT(num_high_priority_queues_ != 0); + HPX_ASSERT(num_high_priority_queues_ <= num_queues_); + + if (!deferred_initialization) + { + for (std::size_t i = 0; i != init.num_queues_; ++i) + { + data_[i].data_.init(i, init.num_queues_, + this->thread_queue_init_, + i < num_high_priority_queues_); + } + } + } + + ~local_workrequesting_scheduler() override = default; + + static std::string get_scheduler_name() + { + return "local_workrequesting_scheduler"; + 
} + +#ifdef HPX_HAVE_THREAD_CREATION_AND_CLEANUP_RATES + std::uint64_t get_creation_time(bool reset) override + { + std::uint64_t time = 0; + + for (std::size_t i = 0; i != num_queues_; ++i) + { + auto const& d = data_[i].data_; + if (i < num_high_priority_queues_) + { + time += d.high_priority_queue_->get_creation_time(reset); + } + time += d.queue_->get_creation_time(reset); + time += d.bound_queue_->get_creation_time(reset); + } + + return time + low_priority_queue_.get_creation_time(reset); + } + + std::uint64_t get_cleanup_time(bool reset) override + { + std::uint64_t time = 0; + + for (std::size_t i = 0; i != num_queues_; ++i) + { + auto const& d = data_[i].data_; + if (i < num_high_priority_queues_) + { + time += d.high_priority_queue_->get_cleanup_time(reset); + } + time += d.queue_->get_cleanup_time(reset); + time += d.bound_queue_->get_cleanup_time(reset); + } + + return time + low_priority_queue_.get_cleanup_time(reset); + } +#endif + +#ifdef HPX_HAVE_THREAD_STEALING_COUNTS + std::int64_t get_num_pending_misses( + std::size_t num_thread, bool reset) override + { + std::int64_t count = 0; + if (num_thread == std::size_t(-1)) + { + for (std::size_t i = 0; i != num_queues_; ++i) + { + auto const& d = data_[i].data_; + if (i < num_high_priority_queues_) + { + count += d.high_priority_queue_->get_num_pending_misses( + reset); + } + count += d.queue_->get_num_pending_misses(reset); + count += d.bound_queue_->get_num_pending_misses(reset); + } + + return count + + low_priority_queue_.get_num_pending_misses(reset); + } + + auto const& d = data_[num_thread].data_; + if (num_thread < num_high_priority_queues_) + { + count += d.high_priority_queue_->get_num_pending_misses(reset); + } + if (num_thread == num_queues_ - 1) + { + count += low_priority_queue_.get_num_pending_misses(reset); + } + count += d.queue_->get_num_pending_misses(reset); + return count + d.bound_queue_->get_num_pending_misses(reset); + } + + std::int64_t get_num_pending_accesses( + std::size_t num_thread, bool reset) override + { + std::int64_t count = 0; + if (num_thread == std::size_t(-1)) + { + for (std::size_t i = 0; i != num_queues_; ++i) + { + auto const& d = data_[i].data_; + if (i < num_high_priority_queues_) + { + count += + d.high_priority_queue_->get_num_pending_accesses( + reset); + } + count += d.queue_->get_num_pending_accesses(reset); + count += d.bound_queue_->get_num_pending_accesses(reset); + } + + return count + + low_priority_queue_.get_num_pending_accesses(reset); + } + + auto const& d = data_[num_thread].data_; + if (num_thread < num_high_priority_queues_) + { + count += + d.high_priority_queue_->get_num_pending_accesses(reset); + } + if (num_thread == num_queues_ - 1) + { + count += low_priority_queue_.get_num_pending_accesses(reset); + } + count += d.queue_->get_num_pending_accesses(reset); + return count + d.bound_queue_->get_num_pending_accesses(reset); + } + + std::int64_t get_num_stolen_from_pending( + std::size_t num_thread, bool reset) override + { + std::int64_t count = 0; + if (num_thread == std::size_t(-1)) + { + for (std::size_t i = 0; i != num_queues_; ++i) + { + auto const& d = data_[i].data_; + if (i < num_high_priority_queues_) + { + count += + d.high_priority_queue_->get_num_stolen_from_pending( + reset); + } + count += d.queue_->get_num_stolen_from_pending(reset); + count += d.bound_queue_->get_num_stolen_from_pending(reset); + } + + return count + + low_priority_queue_.get_num_stolen_from_pending(reset); + } + + auto const& d = data_[num_thread].data_; + if (num_thread < 
num_high_priority_queues_) + { + count += + d.high_priority_queue_->get_num_stolen_from_pending(reset); + } + if (num_thread == num_queues_ - 1) + { + count += low_priority_queue_.get_num_stolen_from_pending(reset); + } + count += d.queue_->get_num_stolen_from_pending(reset); + return count + d.bound_queue_->get_num_stolen_from_pending(reset); + } + + std::int64_t get_num_stolen_to_pending( + std::size_t num_thread, bool reset) override + { + std::int64_t count = 0; + if (num_thread == std::size_t(-1)) + { + for (std::size_t i = 0; i != num_queues_; ++i) + { + auto const& d = data_[i].data_; + if (i < num_high_priority_queues_) + { + count += + d.high_priority_queue_->get_num_stolen_to_pending( + reset); + } + count += d.queue_->get_num_stolen_to_pending(reset); + count += d.bound_queue_->get_num_stolen_to_pending(reset); + } + + return count + + low_priority_queue_.get_num_stolen_to_pending(reset); + } + + auto const& d = data_[num_thread].data_; + if (num_thread < num_high_priority_queues_) + { + count += + d.high_priority_queue_->get_num_stolen_to_pending(reset); + } + if (num_thread == num_queues_ - 1) + { + count += low_priority_queue_.get_num_stolen_to_pending(reset); + } + count += d.queue_->get_num_stolen_to_pending(reset); + return count + d.bound_queue_->get_num_stolen_to_pending(reset); + } + + std::int64_t get_num_stolen_from_staged( + std::size_t num_thread, bool reset) override + { + std::int64_t count = 0; + if (num_thread == std::size_t(-1)) + { + for (std::size_t i = 0; i != num_queues_; ++i) + { + auto const& d = data_[i].data_; + if (i < num_high_priority_queues_) + { + count += + d.high_priority_queue_->get_num_stolen_from_staged( + reset); + } + count += d.queue_->get_num_stolen_from_staged(reset); + count += d.bound_queue_->get_num_stolen_from_staged(reset); + } + + return count + + low_priority_queue_.get_num_stolen_from_staged(reset); + } + + auto const& d = data_[num_thread].data_; + if (num_thread < num_high_priority_queues_) + { + count += + d.high_priority_queue_->get_num_stolen_from_staged(reset); + } + if (num_thread == num_queues_ - 1) + { + count += low_priority_queue_.get_num_stolen_from_staged(reset); + } + count += d.queue_->get_num_stolen_from_staged(reset); + return count + d.bound_queue_->get_num_stolen_from_staged(reset); + } + + std::int64_t get_num_stolen_to_staged( + std::size_t num_thread, bool reset) override + { + std::int64_t count = 0; + if (num_thread == std::size_t(-1)) + { + for (std::size_t i = 0; i != num_queues_; ++i) + { + auto const& d = data_[i].data_; + if (i < num_high_priority_queues_) + { + count += + d.high_priority_queue_->get_num_stolen_to_staged( + reset); + } + count += d.queue_->get_num_stolen_to_staged(reset); + count += d.bound_queue_->get_num_stolen_to_staged(reset); + } + + return count + + low_priority_queue_.get_num_stolen_to_staged(reset); + } + + auto const& d = data_[num_thread].data_; + if (num_thread < num_high_priority_queues_) + { + count += + d.high_priority_queue_->get_num_stolen_to_staged(reset); + } + if (num_thread == num_queues_ - 1) + { + count += low_priority_queue_.get_num_stolen_to_staged(reset); + } + count += d.queue_->get_num_stolen_to_staged(reset); + return count + d.bound_queue_->get_num_stolen_to_staged(reset); + } +#endif + + /////////////////////////////////////////////////////////////////////// + void abort_all_suspended_threads() override + { + for (std::size_t i = 0; i != num_queues_; ++i) + { + data_[i].data_.queue_->abort_all_suspended_threads(); + 
data_[i].data_.bound_queue_->abort_all_suspended_threads(); + } + } + + /////////////////////////////////////////////////////////////////////// + bool cleanup_terminated(bool delete_all) override + { + bool empty = true; + for (std::size_t i = 0; i != num_queues_; ++i) + { + auto& d = data_[i].data_; + if (i < num_high_priority_queues_) + { + empty = d.high_priority_queue_->cleanup_terminated( + delete_all) && + empty; + } + empty = d.queue_->cleanup_terminated(delete_all) && empty; + empty = d.bound_queue_->cleanup_terminated(delete_all) && empty; + } + return low_priority_queue_.cleanup_terminated(delete_all) && empty; + } + + bool cleanup_terminated( + std::size_t num_thread, bool delete_all) override + { + auto& d = data_[num_thread].data_; + bool empty = d.queue_->cleanup_terminated(delete_all); + empty = d.queue_->cleanup_terminated(delete_all) && empty; + if (!delete_all) + return empty; + + if (num_thread < num_high_priority_queues_) + { + empty = + d.high_priority_queue_->cleanup_terminated(delete_all) && + empty; + } + + if (num_thread == num_queues_ - 1) + { + return low_priority_queue_.cleanup_terminated(delete_all) && + empty; + } + return empty; + } + + /////////////////////////////////////////////////////////////////////// + // create a new thread and schedule it if the initial state is equal to + // pending + void create_thread(thread_init_data& data, thread_id_ref_type* id, + error_code& ec) override + { + std::size_t num_thread = + data.schedulehint.mode == thread_schedule_hint_mode::thread ? + data.schedulehint.hint : + std::size_t(-1); + + if (std::size_t(-1) == num_thread) + { + num_thread = curr_queue_++ % num_queues_; + } + else if (num_thread >= num_queues_) + { + num_thread %= num_queues_; + } + + num_thread = select_active_pu(num_thread); + + data.schedulehint.mode = thread_schedule_hint_mode::thread; + data.schedulehint.hint = static_cast(num_thread); + + // now create the thread + switch (data.priority) + { + case thread_priority::high_recursive: + case thread_priority::high: + case thread_priority::boost: + { + if (data.priority == thread_priority::boost) + { + data.priority = thread_priority::normal; + } + + if (num_thread >= num_high_priority_queues_) + { + num_thread %= num_high_priority_queues_; + } + + // we never stage high priority threads, so there is no need to + // call wait_or_add_new for those. 
+ data_[num_thread].data_.high_priority_queue_->create_thread( + data, id, ec); + break; + } + + case thread_priority::low: + low_priority_queue_.create_thread(data, id, ec); + break; + + case thread_priority::bound: + HPX_ASSERT(num_thread < num_queues_); + data_[num_thread].data_.bound_queue_->create_thread( + data, id, ec); + break; + + case thread_priority::default_: + case thread_priority::normal: + HPX_ASSERT(num_thread < num_queues_); + data_[num_thread].data_.queue_->create_thread(data, id, ec); + break; + + default: + case thread_priority::unknown: + { + HPX_THROW_EXCEPTION(hpx::error::bad_parameter, + "local_workrequesting_scheduler::create_thread", + "unknown thread priority value (thread_priority::unknown)"); + } + } + } + + // Retrieve the next viable steal request from our channel + bool try_receiving_steal_request( + scheduler_data& d, steal_request& req) noexcept + { + bool ret = d.requests_->get(&req); + while (ret && req.state_ == steal_request::state::failed) + { + // forget the received steal request + --data_[req.num_thread_].data_.requested_; + + // there should have been exactly one outstanding steal request + HPX_ASSERT(data_[req.num_thread_].data_.requested_ == 0); + + // try to retrieve next steal request + ret = d.requests_->get(&req); + } + + // No special treatment for other states + HPX_ASSERT( + (ret && req.state_ != steal_request::state::failed) || !ret); + + return ret; + } + + // Pass steal request on to another worker. Returns true if we have + // handled our own steal request. + bool decline_or_forward_steal_request( + scheduler_data& d, steal_request& req) noexcept + { + HPX_ASSERT(req.attempt_ < num_queues_); + + if (req.num_thread_ == d.num_thread_) + { + // Steal request was either returned by another worker or + // picked up by us. + + if (req.state_ == steal_request::state::idle || + d.queue_->get_pending_queue_length( + std::memory_order_relaxed) != 0) + { +#if defined(HPX_HAVE_WORKREQUESTING_STEAL_STATISTICS) + ++d.steal_requests_discarded_; +#endif + // we have work now, drop this steal request + --d.requested_; + + // there should have been exactly one outstanding steal + // request + HPX_ASSERT(d.requested_ == 0); + } + else + { + // Continue circulating the steal request if it makes sense + req.state_ = steal_request::state::idle; + req.victims_ = d.victims_; + req.attempt_ = + static_cast(count(d.victims_) - 1); + + std::size_t victim = next_victim(d, req); + data_[victim].data_.requests_->set(HPX_MOVE(req)); +#if defined(HPX_HAVE_WORKREQUESTING_STEAL_STATISTICS) + ++d.steal_requests_sent_; +#endif + } + + return true; + } + + // send this steal request on to the next (random) core + ++req.attempt_; + set(req.victims_, d.num_thread_); // don't ask a core twice + + HPX_ASSERT(req.attempt_ == count(req.victims_) - 1); + + std::size_t victim = next_victim(d, req); + data_[victim].data_.requests_->set(HPX_MOVE(req)); +#if defined(HPX_HAVE_WORKREQUESTING_STEAL_STATISTICS) + ++d.steal_requests_sent_; +#endif + return false; + } + + // decline_or_forward_all_steal_requests is only called when a worker + // has nothing else to do but to relay steal requests, which means the + // worker is idle. 
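
        // For illustration, the decline-and-forward step above can be
        // modeled by the following standalone sketch. It uses a std::set
        // instead of the bit mask (victims_) and picks the first untried
        // core instead of a random one, so it is a simplification of
        // next_victim()/random_victim(); the toy_* names are illustrative
        // only and the code requires <cstdint> and <set>.
        //
        //     struct toy_request
        //     {
        //         std::uint16_t thief;            // core that posted the request
        //         std::set<std::uint16_t> asked;  // cores that already declined
        //     };
        //
        //     // Mark ourselves as asked and return the core the request
        //     // should travel to next; once every other core has declined,
        //     // the request is handed back to its owner.
        //     inline std::uint16_t toy_forward_target(
        //         toy_request& req, std::uint16_t self, std::uint16_t num_cores)
        //     {
        //         req.asked.insert(self);          // never ask a core twice
        //         for (std::uint16_t c = 0; c != num_cores; ++c)
        //         {
        //             if (c != req.thief && req.asked.count(c) == 0)
        //                 return c;                // first core not tried yet
        //         }
        //         return req.thief;                // give it back to the thief
        //     }
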
+ void decline_or_forward_all_steal_requests(scheduler_data& d) noexcept + { + steal_request req; + while (try_receiving_steal_request(d, req)) + { +#if defined(HPX_HAVE_WORKREQUESTING_STEAL_STATISTICS) + ++d.steal_requests_received_; +#endif + decline_or_forward_steal_request(d, req); + } + } + + // Handle a steal request by sending tasks in return or passing it on to + // another worker. Returns true if the request was satisfied. + bool handle_steal_request( + scheduler_data& d, steal_request& req) noexcept + { +#if defined(HPX_HAVE_WORKREQUESTING_STEAL_STATISTICS) + ++d.steal_requests_received_; +#endif + if (req.num_thread_ == d.num_thread_) + { + // got back our own steal request. + HPX_ASSERT(req.state_ != steal_request::state::failed); + + // Defer the decision to decline_steal_request + decline_or_forward_steal_request(d, req); + return false; + } + + // Send tasks from our queue to the requesting core, depending on + // what's requested, either one task or half of the available tasks + std::size_t max_num_to_steal = 1; + if (req.stealhalf_) + { + max_num_to_steal = d.queue_->get_pending_queue_length( + std::memory_order_relaxed) / + 2; + } + + if (max_num_to_steal != 0) + { + task_data thrds(d.num_thread_); + thrds.tasks_.reserve(max_num_to_steal); + + thread_id_ref_type thrd; + while (max_num_to_steal-- != 0 && + d.queue_->get_next_thread(thrd, false, true)) + { +#ifdef HPX_HAVE_THREAD_STEALING_COUNTS + d.queue_->increment_num_stolen_from_pending(); +#endif + thrds.tasks_.push_back(HPX_MOVE(thrd)); + } + + // we are ready to send at least one task + if (!thrds.tasks_.empty()) + { + // send these tasks to the core that has sent the steal + // request + req.channel_->set(HPX_MOVE(thrds)); + + // wake the thread up so that it can pick up the stolen + // tasks + do_some_work(req.num_thread_); + + return true; + } + } + + // There's nothing we can do with this steal request except pass + // it on to a different worker + decline_or_forward_steal_request(d, req); + return false; + } + + // Return the next thread to be executed, return false if none is + // available + bool get_next_thread(std::size_t num_thread, bool running, + thread_id_ref_type& thrd, bool enable_stealing) override + { + HPX_ASSERT(num_thread < num_queues_); + + auto& d = data_[num_thread].data_; + if (num_thread < num_high_priority_queues_) + { + bool result = d.high_priority_queue_->get_next_thread(thrd); + +#ifdef HPX_HAVE_THREAD_STEALING_COUNTS + d.high_priority_queue_->increment_num_pending_accesses(); + if (result) + { + ++d.num_recent_tasks_executed_; + return true; + } + d.high_priority_queue_->increment_num_pending_misses(); +#else + if (result) + { + ++d.num_recent_tasks_executed_; + return true; + } +#endif + } + + bool result = false; + for (thread_queue_type* this_queue : {d.bound_queue_, d.queue_}) + { + result = this_queue->get_next_thread(thrd); + +#ifdef HPX_HAVE_THREAD_STEALING_COUNTS + this_queue->increment_num_pending_accesses(); +#endif + if (result) + break; +#ifdef HPX_HAVE_THREAD_STEALING_COUNTS + this_queue->increment_num_pending_misses(); +#endif + } + + if (enable_stealing && result) + { + // We found a task to run, however before running it we handle + // steal requests (assuming that there is more work left that + // could be used to satisfy steal requests). + + steal_request req; + while (try_receiving_steal_request(d, req)) + { + if (!handle_steal_request(d, req)) + break; + } + + ++d.num_recent_tasks_executed_; + return true; + } + + // Give up if we have work to convert. 
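
        // The batching rule used by handle_steal_request() above can be
        // summarized by this standalone sketch: send a single task by
        // default, or half of the pending tasks when the thief asked for
        // steal-half. A std::deque of ints stands in for the pending
        // thread queue; the toy_* name is illustrative only and the code
        // requires <cstddef>, <deque> and <vector>.
        //
        //     std::vector<int> toy_extract_batch(
        //         std::deque<int>& pending, bool stealhalf)
        //     {
        //         std::size_t n = stealhalf ? pending.size() / 2 : 1;
        //
        //         std::vector<int> batch;
        //         batch.reserve(n);
        //         while (n-- != 0 && !pending.empty())
        //         {
        //             batch.push_back(pending.front());    // pull from the victim
        //             pending.pop_front();
        //         }
        //         return batch;    // shipped to the thief over its task channel
        //     }
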
+ if (d.queue_->get_staged_queue_length(std::memory_order_relaxed) != + 0 || + !running) + { + return false; + } + + if (low_priority_queue_.get_next_thread(thrd)) + { + ++d.num_recent_tasks_executed_; + return true; + } + + return false; + } + + // Schedule the passed thread + void schedule_thread(thread_id_ref_type thrd, + threads::thread_schedule_hint schedulehint, + bool allow_fallback = false, + thread_priority priority = thread_priority::normal) override + { + std::size_t num_thread = std::size_t(-1); + if (schedulehint.mode == thread_schedule_hint_mode::thread) + { + num_thread = schedulehint.hint; + } + else + { + allow_fallback = false; + } + + if (std::size_t(-1) == num_thread) + { + num_thread = curr_queue_++ % num_queues_; + } + else if (num_thread >= num_queues_) + { + num_thread %= num_queues_; + } + + num_thread = select_active_pu(num_thread, allow_fallback); + + HPX_ASSERT(get_thread_id_data(thrd)->get_scheduler_base() == this); + HPX_ASSERT(num_thread < num_queues_); + + switch (priority) + { + case thread_priority::high_recursive: + case thread_priority::high: + case thread_priority::boost: + { + std::size_t num = num_thread; + if (num >= num_high_priority_queues_) + { + num %= num_high_priority_queues_; + } + + data_[num].data_.high_priority_queue_->schedule_thread( + HPX_MOVE(thrd), true); + break; + } + + case thread_priority::low: + low_priority_queue_.schedule_thread(HPX_MOVE(thrd)); + break; + + case thread_priority::default_: + case thread_priority::normal: + data_[num_thread].data_.queue_->schedule_thread(HPX_MOVE(thrd)); + break; + + case thread_priority::bound: + data_[num_thread].data_.bound_queue_->schedule_thread( + HPX_MOVE(thrd)); + break; + + default: + case thread_priority::unknown: + { + HPX_THROW_EXCEPTION(hpx::error::bad_parameter, + "local_workrequesting_scheduler::schedule_thread", + "unknown thread priority value (thread_priority::unknown)"); + } + } + } + + void schedule_thread_last(thread_id_ref_type thrd, + threads::thread_schedule_hint schedulehint, + bool allow_fallback = false, + thread_priority priority = thread_priority::normal) override + { + std::size_t num_thread = std::size_t(-1); + if (schedulehint.mode == thread_schedule_hint_mode::thread) + { + num_thread = schedulehint.hint; + } + else + { + allow_fallback = false; + } + + if (std::size_t(-1) == num_thread) + { + num_thread = curr_queue_++ % num_queues_; + } + else if (num_thread >= num_queues_) + { + num_thread %= num_queues_; + } + + num_thread = select_active_pu(num_thread, allow_fallback); + + HPX_ASSERT(get_thread_id_data(thrd)->get_scheduler_base() == this); + HPX_ASSERT(num_thread < num_queues_); + + switch (priority) + { + case thread_priority::high_recursive: + case thread_priority::high: + case thread_priority::boost: + { + std::size_t num = num_thread; + if (num >= num_high_priority_queues_) + { + num %= num_high_priority_queues_; + } + + data_[num].data_.high_priority_queue_->schedule_thread( + HPX_MOVE(thrd), true); + break; + } + + case thread_priority::low: + low_priority_queue_.schedule_thread(HPX_MOVE(thrd), true); + break; + + default: + case thread_priority::default_: + case thread_priority::normal: + data_[num_thread].data_.queue_->schedule_thread( + HPX_MOVE(thrd), true); + break; + + case thread_priority::bound: + data_[num_thread].data_.bound_queue_->schedule_thread( + HPX_MOVE(thrd), true); + break; + } + } + + /// Destroy the passed thread as it has been terminated + void destroy_thread(threads::thread_data* thrd) override + { + 
HPX_ASSERT(thrd->get_scheduler_base() == this); + thrd->get_queue().destroy_thread(thrd); + } + + /////////////////////////////////////////////////////////////////////// + // This returns the current length of the queues (work items and new + // items) + std::int64_t get_queue_length( + std::size_t num_thread = std::size_t(-1)) const override + { + // Return queue length of one specific queue. + std::int64_t count = 0; + if (std::size_t(-1) != num_thread) + { + HPX_ASSERT(num_thread < num_queues_); + auto const& d = data_[num_thread].data_; + + if (num_thread < num_high_priority_queues_) + { + count += d.high_priority_queue_->get_queue_length(); + } + if (num_thread == num_queues_ - 1) + { + count += low_priority_queue_.get_queue_length(); + } + count += d.queue_->get_queue_length(); + return count + d.bound_queue_->get_queue_length(); + } + + // Cumulative queue lengths of all queues. + for (std::size_t i = 0; i != num_queues_; ++i) + { + auto const& d = data_[i].data_; + if (i < num_high_priority_queues_) + { + count += d.high_priority_queue_->get_queue_length(); + } + count += d.queue_->get_queue_length(); + count += d.bound_queue_->get_queue_length(); + } + return count + low_priority_queue_.get_queue_length(); + } + + /////////////////////////////////////////////////////////////////////// + // Queries the current thread count of the queues. + std::int64_t get_thread_count( + thread_schedule_state state = thread_schedule_state::unknown, + thread_priority priority = thread_priority::default_, + std::size_t num_thread = std::size_t(-1), + bool /* reset */ = false) const override + { + // Return thread count of one specific queue. + std::int64_t count = 0; + if (std::size_t(-1) != num_thread) + { + HPX_ASSERT(num_thread < num_queues_); + + auto const& d = data_[num_thread].data_; + switch (priority) + { + case thread_priority::default_: + { + if (num_thread < num_high_priority_queues_) + { + count = d.high_priority_queue_->get_thread_count(state); + } + if (num_thread == num_queues_ - 1) + { + count += low_priority_queue_.get_thread_count(state); + } + count += d.queue_->get_thread_count(state); + return count + d.bound_queue_->get_thread_count(state); + } + + case thread_priority::low: + { + if (num_queues_ - 1 == num_thread) + return low_priority_queue_.get_thread_count(state); + break; + } + + case thread_priority::normal: + return d.queue_->get_thread_count(state); + + case thread_priority::bound: + return d.bound_queue_->get_thread_count(state); + + case thread_priority::boost: + case thread_priority::high: + case thread_priority::high_recursive: + { + if (num_thread < num_high_priority_queues_) + { + return d.high_priority_queue_->get_thread_count(state); + } + break; + } + + default: + case thread_priority::unknown: + { + HPX_THROW_EXCEPTION(hpx::error::bad_parameter, + "local_workrequesting_scheduler::get_thread_count", + "unknown thread priority value " + "(thread_priority::unknown)"); + return 0; + } + } + return 0; + } + + // Return the cumulative count for all queues. 
+ switch (priority) + { + case thread_priority::default_: + { + for (std::size_t i = 0; i != num_queues_; ++i) + { + auto const& d = data_[i].data_; + if (i < num_high_priority_queues_) + { + count += + d.high_priority_queue_->get_thread_count(state); + } + count += d.queue_->get_thread_count(state); + count += d.bound_queue_->get_thread_count(state); + } + count += low_priority_queue_.get_thread_count(state); + break; + } + + case thread_priority::low: + return low_priority_queue_.get_thread_count(state); + + case thread_priority::normal: + { + for (std::size_t i = 0; i != num_queues_; ++i) + { + count += data_[i].data_.queue_->get_thread_count(state); + } + break; + } + + case thread_priority::bound: + { + for (std::size_t i = 0; i != num_queues_; ++i) + { + count += + data_[i].data_.bound_queue_->get_thread_count(state); + } + break; + } + + case thread_priority::boost: + case thread_priority::high: + case thread_priority::high_recursive: + { + for (std::size_t i = 0; i != num_high_priority_queues_; ++i) + { + count += + data_[i].data_.high_priority_queue_->get_thread_count( + state); + } + break; + } + + default: + case thread_priority::unknown: + { + HPX_THROW_EXCEPTION(hpx::error::bad_parameter, + "local_workrequesting_scheduler::get_thread_count", + "unknown thread priority value " + "(thread_priority::unknown)"); + return 0; + } + } + return count; + } + + // Queries whether a given core is idle + bool is_core_idle(std::size_t num_thread) const override + { + if (num_thread < num_queues_) + { + for (thread_queue_type* this_queue : + {data_[num_thread].data_.bound_queue_, + data_[num_thread].data_.queue_}) + { + if (this_queue->get_queue_length() != 0) + { + return false; + } + } + } + + if (num_thread < num_high_priority_queues_ && + data_[num_thread] + .data_.high_priority_queue_->get_queue_length() != 0) + { + return false; + } + return true; + } + + /////////////////////////////////////////////////////////////////////// + // Enumerate matching threads from all queues + bool enumerate_threads(hpx::function const& f, + thread_schedule_state state = + thread_schedule_state::unknown) const override + { + bool result = true; + for (std::size_t i = 0; i != num_high_priority_queues_; ++i) + { + result = result && + data_[i].data_.high_priority_queue_->enumerate_threads( + f, state); + } + + result = result && low_priority_queue_.enumerate_threads(f, state); + + for (std::size_t i = 0; i != num_queues_; ++i) + { + result = result && + data_[i].data_.queue_->enumerate_threads(f, state); + result = result && + data_[i].data_.bound_queue_->enumerate_threads(f, state); + } + return result; + } + +#ifdef HPX_HAVE_THREAD_QUEUE_WAITTIME + /////////////////////////////////////////////////////////////////////// + // Queries the current average thread wait time of the queues. + std::int64_t get_average_thread_wait_time( + std::size_t num_thread = std::size_t(-1)) const override + { + // Return average thread wait time of one specific queue. 
+ std::uint64_t wait_time = 0; + std::uint64_t count = 0; + if (std::size_t(-1) != num_thread) + { + HPX_ASSERT(num_thread < num_queues_); + + auto const& d = data_[num_thread].data_; + if (num_thread < num_high_priority_queues_) + { + wait_time = + d.high_priority_queue_->get_average_thread_wait_time(); + ++count; + } + + if (num_thread == num_queues_ - 1) + { + wait_time += + low_priority_queue_.get_average_thread_wait_time(); + ++count; + } + + wait_time += d.queue_->get_average_thread_wait_time(); + wait_time += d.bound_queue_->get_average_thread_wait_time(); + return wait_time / (count + 1); + } + + for (std::size_t i = 0; i != num_queues_; ++i) + { + auto const& d = data_[num_thread].data_; + if (num_thread < num_high_priority_queues_) + { + wait_time += + d.high_priority_queue_->get_average_thread_wait_time(); + } + wait_time += d.queue_->get_average_thread_wait_time(); + wait_time += d.bound_queue_->get_average_thread_wait_time(); + ++count; + } + + return (wait_time + + low_priority_queue_.get_average_thread_wait_time()) / + (count + 1); + } + + /////////////////////////////////////////////////////////////////////// + // Queries the current average task wait time of the queues. + std::int64_t get_average_task_wait_time( + std::size_t num_thread = std::size_t(-1)) const override + { + // Return average task wait time of one specific queue. + // Return average thread wait time of one specific queue. + std::uint64_t wait_time = 0; + std::uint64_t count = 0; + if (std::size_t(-1) != num_thread) + { + HPX_ASSERT(num_thread < num_queues_); + + auto const& d = data_[num_thread].data_; + if (num_thread < num_high_priority_queues_) + { + wait_time = + d.high_priority_queue_->get_average_task_wait_time(); + ++count; + } + + if (num_thread == num_queues_ - 1) + { + wait_time += + low_priority_queue_.get_average_task_wait_time(); + ++count; + } + + wait_time += d.queue_->get_average_task_wait_time(); + wait_time += d.bound_queue_->get_average_task_wait_time(); + return wait_time / (count + 1); + } + + for (std::size_t i = 0; i != num_queues_; ++i) + { + auto const& d = data_[num_thread].data_; + if (num_thread < num_high_priority_queues_) + { + wait_time += + d.high_priority_queue_->get_average_task_wait_time(); + } + wait_time += d.queue_->get_average_task_wait_time(); + wait_time += d.bound_queue_->get_average_task_wait_time(); + ++count; + } + + return (wait_time + + low_priority_queue_.get_average_task_wait_time()) / + (count + 1); + } +#endif + + // return a random victim for the current stealing operation + std::size_t random_victim(steal_request const& req) noexcept + { + std::size_t result = 0; + + { + // generate at most 3 random numbers before resorting to more + // expensive algorithm + std::uniform_int_distribution uniform( + 0, std::int16_t(num_queues_ - 1)); + + int attempts = 0; + do + { + result = uniform(gen_); + if (result != req.num_thread_ && + !test(req.victims_, result)) + { + HPX_ASSERT(result < num_queues_); + return result; + } + } while (++attempts < 3); + } + + // to avoid infinite trials we randomly select one of the possible + // victims + std::uniform_int_distribution uniform( + 0, std::int16_t(num_queues_ - count(req.victims_) - 1)); + + // generate one more random number + std::size_t selected_victim = uniform(gen_); + for (std::size_t i = 0; i != num_queues_; ++i) + { + if (!test(req.victims_, i)) + { + if (selected_victim == 0) + { + result = i; + break; + } + --selected_victim; + } + } + + HPX_ASSERT(result < num_queues_ && result != req.num_thread_ && + 
!test(req.victims_, result)); + + return result; + } + + // return the number of the next victim core + std::size_t next_victim([[maybe_unused]] scheduler_data& d, + steal_request const& req) noexcept + { + std::size_t victim = std::size_t(-1); + + // return thief if max steal attempts has been reached or no more + // cores are available for stealing + if (req.attempt_ == num_queues_ - 1) + { + // Return steal request to thief + victim = req.num_thread_; + } + else + { + HPX_ASSERT( + req.num_thread_ == d.num_thread_ || req.attempt_ != 0); + +#if defined(HPX_HAVE_WORKREQUESTING_LAST_VICTIM) + if (d.last_victim_ != std::uint16_t(-1)) + { + victim = d.last_victim_; + } + else +#endif + { + victim = random_victim(req); + } + } + + // couldn't find victim, return steal request to thief + if (victim == std::size_t(-1)) + { + victim = req.num_thread_; + HPX_ASSERT(victim != d.num_thread_); + } + + HPX_ASSERT(victim < num_queues_); + HPX_ASSERT(req.attempt_ < num_queues_); + + return victim; + } + + // Every worker can have at most MAXSTEAL pending steal requests. A + // steal request with idle == false indicates that the requesting worker + // is still busy working on some tasks. A steal request with idle == + // true indicates that the requesting worker is in fact idle and has + // nothing to work on. + void send_steal_request(scheduler_data& d, bool idle = true) noexcept + { + if (d.requested_ == 0) + { + // Estimate work-stealing efficiency during the last interval; + // switch strategies if the value is below a threshold + if (d.num_recent_steals_ >= + scheduler_data::num_steal_adaptive_interval_) + { + double ratio = + static_cast(d.num_recent_tasks_executed_) / + d.num_steal_adaptive_interval_; + + d.num_recent_steals_ = 0; + d.num_recent_tasks_executed_ = 0; + + if (ratio >= 2.) + { + d.stealhalf_ = true; + } + else + { + if (d.stealhalf_) + { + d.stealhalf_ = false; + } + else if (ratio <= 1.) + { + d.stealhalf_ = true; + } + } + } + + steal_request req( + d.num_thread_, d.tasks_, d.victims_, idle, d.stealhalf_); + std::size_t victim = next_victim(d, req); + + ++d.requested_; + data_[victim].data_.requests_->set(HPX_MOVE(req)); +#if defined(HPX_HAVE_WORKREQUESTING_STEAL_STATISTICS) + ++d.steal_requests_sent_; +#endif + } + } + + // Try receiving tasks that are sent by another core as a response to + // one of our steal requests. This returns true if new tasks were + // received. + bool try_receiving_tasks(scheduler_data& d, std::size_t& added, + thread_id_ref_type* next_thrd) + { + task_data thrds{}; + if (d.tasks_->get(&thrds)) + { + // keep track of number of outstanding steal requests, there + // should have been at most one + --d.requested_; + HPX_ASSERT(d.requested_ == 0); + + // if at least one thrd was received + if (!thrds.tasks_.empty()) + { + // Schedule all but the first received task in reverse order + // to maintain the sequence of tasks as pulled from the + // victims queue. 
+ for (std::size_t i = thrds.tasks_.size() - 1; i != 0; --i) + { + HPX_ASSERT(thrds.tasks_[i]); + d.queue_->schedule_thread( + HPX_MOVE(thrds.tasks_[i]), true); + +#ifdef HPX_HAVE_THREAD_STEALING_COUNTS + d.queue_->increment_num_stolen_to_pending(); +#endif + ++added; + } + +#if defined(HPX_HAVE_WORKREQUESTING_LAST_VICTIM) + // store the originating core for the next stealing + // operation + d.last_victim_ = thrds.num_thread_; + HPX_ASSERT(d.last_victim_ != d.num_thread_); +#endif + // the last of the received tasks will be either directly + // executed or normally scheduled + if (next_thrd != nullptr) + { + // directly return the last thread as it should be run + // immediately + ++d.num_recent_tasks_executed_; + *next_thrd = HPX_MOVE(thrds.tasks_.front()); + } + else + { + d.queue_->schedule_thread( + HPX_MOVE(thrds.tasks_.front()), true); + +#ifdef HPX_HAVE_THREAD_STEALING_COUNTS + d.queue_->increment_num_stolen_to_pending(); +#endif + ++added; + } + + ++d.num_recent_steals_; + return true; + } + } + return false; + } + + // This is a function which gets called periodically by the thread + // manager to allow for maintenance tasks to be executed in the + // scheduler. Returns true if the OS thread calling this function has to + // be terminated (i.e. no more work has to be done). + bool wait_or_add_new(std::size_t num_thread, bool running, + [[maybe_unused]] std::int64_t& idle_loop_count, + bool enable_stealing, std::size_t& added, + thread_id_ref_type* next_thrd = nullptr) override + { + HPX_ASSERT(num_thread < num_queues_); + + added = 0; + + auto& d = data_[num_thread].data_; + + // We don't need to call wait_or_add_new for high priority or bound + // threads as these threads are never created 'staged'. + + bool result = + d.queue_->wait_or_add_new(running, added, enable_stealing); + + // check if work was available + if (0 != added) + return result; + + if (num_thread == num_queues_ - 1) + { + result = low_priority_queue_.wait_or_add_new(running, added) && + result; + } + + // check if we have been disabled + if (!running) + return true; + + // return if no stealing is requested (or not possible) + if (num_queues_ == 1 || !enable_stealing) + return result; + + // attempt to steal more work + send_steal_request(d); + HPX_ASSERT(d.requested_ != 0); + + // now try to handle steal requests again if we have not received a + // task from some other core yet + if (!try_receiving_tasks(d, added, next_thrd)) + { + // decline or forward all pending steal requests + decline_or_forward_all_steal_requests(d); + } + +#ifdef HPX_HAVE_THREAD_MINIMAL_DEADLOCK_DETECTION + // no new work is available, are we deadlocked? 
+ if (HPX_UNLIKELY(get_minimal_deadlock_detection_enabled() && + LHPX_ENABLED(error))) + { + bool suspended_only = true; + + for (std::size_t i = 0; suspended_only && i != num_queues_; ++i) + { + suspended_only = + data_[i].data_.queue_->dump_suspended_threads( + i, idle_loop_count, running); + } + + if (HPX_UNLIKELY(suspended_only)) + { + if (running) + { + LTM_(error) //-V128 + << "queue(" << num_thread << "): " + << "no new work available, are we " + "deadlocked?"; + } + else + { + LHPX_CONSOLE_( + hpx::util::logging::level::error) //-V128 + << " [TM] " //-V128 + << "queue(" << num_thread << "): " + << "no new work available, are we " + "deadlocked?\n"; + } + } + } +#endif + return result; + } + + /////////////////////////////////////////////////////////////////////// + void on_start_thread(std::size_t num_thread) override + { + hpx::threads::detail::set_local_thread_num_tss(num_thread); + hpx::threads::detail::set_thread_pool_num_tss( + parent_pool_->get_pool_id().index()); + + auto& d = data_[num_thread].data_; + d.init(num_thread, num_queues_, this->thread_queue_init_, + num_thread < num_high_priority_queues_); + + d.queue_->on_start_thread(num_thread); + d.bound_queue_->on_start_thread(num_thread); + if (num_thread < num_high_priority_queues_) + { + d.high_priority_queue_->on_start_thread(num_thread); + } + + if (num_thread == num_queues_ - 1) + { + low_priority_queue_.on_start_thread(num_thread); + } + + std::size_t num_threads = num_queues_; + //auto const& topo = create_topology(); + + // Initially set all bits, code below resets the bits corresponding + // to cores that can serve as a vistim for the current core. A set + // bit in this mask means 'do not steal from this core'. + resize(d.victims_, num_threads); + reset(d.victims_); + set(d.victims_, num_thread); + //for (std::size_t i = 0; i != num_threads; ++i) + //{ + // set(d.victims_, i); + //} + // + //// get NUMA domain masks of all queues... 
+ //std::vector numa_masks(num_threads); + //std::vector numa_domains(num_threads); + //std::vector core_masks(num_threads); + //for (std::size_t i = 0; i != num_threads; ++i) + //{ + // std::size_t num_pu = affinity_data_.get_pu_num(i); + // numa_masks[i] = topo.get_numa_node_affinity_mask(num_pu); + // numa_domains[i] = static_cast( + // topo.get_numa_node_number(num_pu)); + // core_masks[i] = topo.get_core_affinity_mask(num_pu); + //} + // + //// iterate over the number of threads again to determine where to + //// steal from + //std::ptrdiff_t radius = + // std::lround(static_cast(num_threads) / 2.0); + // + //mask_cref_type numa_mask = numa_masks[num_thread]; + //mask_cref_type core_mask = core_masks[num_thread]; + // + //auto iterate = [&](auto&& f) { + // // check our neighbors in a radial fashion (left and right + // // alternating, increasing distance each iteration) + // std::ptrdiff_t i = 1; + // for (/**/; i < radius; ++i) + // { + // std::ptrdiff_t left = + // (static_cast(num_thread) - i) % + // static_cast(num_threads); + // if (left < 0) + // left = num_threads + left; + // + // if (f(std::size_t(left))) + // { + // unset(data_[num_thread].data_.victims_, + // static_cast(left)); + // } + // + // std::size_t right = (num_thread + i) % num_threads; + // if (f(right)) + // { + // unset(data_[num_thread].data_.victims_, right); + // } + // } + // if ((num_threads % 2) == 0) + // { + // std::size_t right = (num_thread + i) % num_threads; + // if (f(right)) + // { + // unset(data_[num_thread].data_.victims_, right); + // } + // } + //}; + // + //// check for threads that share the same core... + //iterate([&](std::size_t other_num_thread) { + // return any(core_mask & core_masks[other_num_thread]); + //}); + // + //// check for threads that share the same NUMA domain... 
+ //iterate([&](std::size_t other_num_thread) { + // return !any(core_mask & core_masks[other_num_thread]) && + // any(numa_mask & numa_masks[other_num_thread]); + //}); + // + //// check for the rest and if we are NUMA aware + //if (has_scheduler_mode( + // policies::scheduler_mode::enable_stealing_numa)) + //{ + // iterate([&](std::size_t other_num_thread) { + // // allow stealing from neighboring NUMA domain only + // std::ptrdiff_t numa_distance = numa_domains[num_thread] - + // numa_domains[other_num_thread]; + // if (numa_distance > 1 || numa_distance < -1) + // return false; + // // steal from even cores from neighboring NUMA domains + // if (numa_distance == 1 || numa_distance == -1) + // return other_num_thread % 2 == 0; + // // cores from our domain are handled above + // return false; + // }); + //} + } + + void on_stop_thread(std::size_t num_thread) override + { + auto& d = data_[num_thread].data_; + + d.queue_->on_stop_thread(num_thread); + d.bound_queue_->on_stop_thread(num_thread); + if (num_thread < num_high_priority_queues_) + { + d.high_priority_queue_->on_stop_thread(num_thread); + } + + if (num_thread == num_queues_ - 1) + { + low_priority_queue_.on_stop_thread(num_thread); + } + } + + void on_error( + std::size_t num_thread, std::exception_ptr const& e) override + { + auto& d = data_[num_thread].data_; + + d.queue_->on_error(num_thread, e); + d.bound_queue_->on_error(num_thread, e); + if (num_thread < num_high_priority_queues_) + { + d.high_priority_queue_->on_error(num_thread, e); + } + + if (num_thread == num_queues_ - 1) + { + low_priority_queue_.on_error(num_thread, e); + } + } + + void reset_thread_distribution() override + { + curr_queue_.store(0, std::memory_order_release); + } + + void set_scheduler_mode(scheduler_mode mode) noexcept override + { + // we should not disable stealing for this scheduler, this would + // possibly lead to deadlocks + scheduler_base::set_scheduler_mode(mode | + policies::scheduler_mode::enable_stealing | + policies::scheduler_mode::enable_stealing_numa); + } + + protected: + std::vector> data_; + thread_queue_type low_priority_queue_; + + std::atomic curr_queue_; + + std::mt19937 gen_; + + detail::affinity_data const& affinity_data_; + std::size_t const num_queues_; + std::size_t const num_high_priority_queues_; + }; +} // namespace hpx::threads::policies + +#include diff --git a/libs/core/schedulers/include/hpx/schedulers/thread_queue.hpp b/libs/core/schedulers/include/hpx/schedulers/thread_queue.hpp index 7758770b9306..6861f9a84683 100644 --- a/libs/core/schedulers/include/hpx/schedulers/thread_queue.hpp +++ b/libs/core/schedulers/include/hpx/schedulers/thread_queue.hpp @@ -457,7 +457,8 @@ namespace hpx::threads::policies { if (delete_all) { - // do not lock mutex while deleting all threads, do it piece-wise + // do not lock mutex while deleting all threads, do it + // piece-wise while (true) { std::unique_lock lk(mtx_, std::try_to_lock); diff --git a/libs/core/testing/src/performance.cpp b/libs/core/testing/src/performance.cpp index b06b8b8c6cd1..5cd224ff1497 100644 --- a/libs/core/testing/src/performance.cpp +++ b/libs/core/testing/src/performance.cpp @@ -63,6 +63,7 @@ namespace hpx::util { strm << R"( "executor" : ")" << std::get<1>(item.first) << "\",\n"; strm << R"( "series" : [)"; + double average = 0.0; int series = 0; for (auto const val : item.second) { @@ -70,8 +71,10 @@ namespace hpx::util { strm << ", "; strm << val; ++series; + average += val; } - strm << "]\n"; + strm << "],\n"; + strm << " \"average\" : " << average 
/ series << "\n"; strm << " }"; ++outputs; } diff --git a/libs/core/thread_pools/include/hpx/thread_pools/scheduling_loop.hpp b/libs/core/thread_pools/include/hpx/thread_pools/scheduling_loop.hpp index a84afdb668dd..86c18a3ba2b4 100644 --- a/libs/core/thread_pools/include/hpx/thread_pools/scheduling_loop.hpp +++ b/libs/core/thread_pools/include/hpx/thread_pools/scheduling_loop.hpp @@ -19,6 +19,7 @@ #include #include #include +#include #if defined(HPX_HAVE_APEX) #include @@ -178,6 +179,7 @@ namespace hpx::threads::detail { idle_collect_rate idle_rate(counters.tfunc_time_, counters.exec_time_); [[maybe_unused]] tfunc_time_wrapper tfunc_time_collector(idle_rate); + HPX_UNUSED(tfunc_time_collector); // spin for some time after queues have become empty bool may_exit = false; @@ -235,6 +237,8 @@ namespace hpx::threads::detail { scheduler.get_next_thread( num_thread, running, thrd, enable_stealing))) { + HPX_UNUSED(tfunc_time_collector); + HPX_ASSERT(get_thread_id_data(thrd)->get_scheduler_base() == &scheduler); @@ -269,6 +273,7 @@ namespace hpx::threads::detail { [[maybe_unused]] tfunc_time_wrapper tfunc_time_collector_inner(idle_rate); + HPX_UNUSED(tfunc_time_collector); // thread returns new required state store the // returned state in the thread @@ -289,7 +294,7 @@ namespace hpx::threads::detail { // and add to aggregate execution time. [[maybe_unused]] exec_time_wrapper exec_time_collector(idle_rate); - + HPX_UNUSED(exec_time_collector); #if defined(HPX_HAVE_APEX) // get the APEX data pointer, in case we are // resuming the thread and have to restore any @@ -464,8 +469,9 @@ namespace hpx::threads::detail { { ++idle_loop_count; - if (scheduler.wait_or_add_new(num_thread, running, - idle_loop_count, enable_stealing_staged, added, + next_thrd = nullptr; + if (scheduler.wait_or_add_new(num_thread, + running, idle_loop_count, enable_stealing_staged, added, &next_thrd)) { // Clean up terminated threads before trying to exit diff --git a/libs/core/thread_pools/src/scheduled_thread_pool.cpp b/libs/core/thread_pools/src/scheduled_thread_pool.cpp index cc2c684cae9f..308d8748f586 100644 --- a/libs/core/thread_pools/src/scheduled_thread_pool.cpp +++ b/libs/core/thread_pools/src/scheduled_thread_pool.cpp @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include @@ -66,3 +67,14 @@ template class HPX_CORE_EXPORT hpx::threads::policies::shared_priority_queue_scheduler<>; template class HPX_CORE_EXPORT hpx::threads::detail::scheduled_thread_pool< hpx::threads::policies::shared_priority_queue_scheduler<>>; + +template class HPX_CORE_EXPORT + hpx::threads::policies::local_workrequesting_scheduler<>; +template class HPX_CORE_EXPORT hpx::threads::detail::scheduled_thread_pool< + hpx::threads::policies::local_workrequesting_scheduler<>>; +template class HPX_CORE_EXPORT + hpx::threads::policies::local_workrequesting_scheduler; +template class HPX_CORE_EXPORT hpx::threads::detail::scheduled_thread_pool< + hpx::threads::policies::local_workrequesting_scheduler>; diff --git a/libs/core/threading_base/include/hpx/threading_base/register_thread.hpp b/libs/core/threading_base/include/hpx/threading_base/register_thread.hpp index 5fb4df759336..d3f715779875 100644 --- a/libs/core/threading_base/include/hpx/threading_base/register_thread.hpp +++ b/libs/core/threading_base/include/hpx/threading_base/register_thread.hpp @@ -43,7 +43,6 @@ namespace hpx::threads { auto* p = get_self_id_data(); p->run_thread_exit_callbacks(); - p->free_thread_exit_callbacks(); return threads::thread_result_type( 
threads::thread_schedule_state::terminated, @@ -70,7 +69,6 @@ namespace hpx::threads { auto* p = get_self_id_data(); p->run_thread_exit_callbacks(); - p->free_thread_exit_callbacks(); return threads::thread_result_type( threads::thread_schedule_state::terminated, diff --git a/libs/core/threading_base/include/hpx/threading_base/thread_helpers.hpp b/libs/core/threading_base/include/hpx/threading_base/thread_helpers.hpp index 82db59ebe895..22cb01f8bd19 100644 --- a/libs/core/threading_base/include/hpx/threading_base/thread_helpers.hpp +++ b/libs/core/threading_base/include/hpx/threading_base/thread_helpers.hpp @@ -384,9 +384,6 @@ namespace hpx::threads { HPX_CORE_EXPORT bool add_thread_exit_callback(thread_id_type const& id, hpx::function const& f, error_code& ec = throws); - HPX_CORE_EXPORT void free_thread_exit_callbacks( - thread_id_type const& id, error_code& ec = throws); - /////////////////////////////////////////////////////////////////////////// HPX_CORE_EXPORT std::size_t get_thread_data( thread_id_type const& id, error_code& ec = throws); diff --git a/libs/core/threading_base/src/scheduler_base.cpp b/libs/core/threading_base/src/scheduler_base.cpp index 3bb4facc62b3..8ddd582abe4e 100644 --- a/libs/core/threading_base/src/scheduler_base.cpp +++ b/libs/core/threading_base/src/scheduler_base.cpp @@ -330,8 +330,7 @@ namespace hpx::threads::policies { void scheduler_base::add_scheduler_mode(scheduler_mode mode) noexcept { // distribute the same value across all cores - mode = static_cast(get_scheduler_mode() | mode); - set_scheduler_mode(mode); + set_scheduler_mode(get_scheduler_mode() | mode); } void scheduler_base::remove_scheduler_mode(scheduler_mode mode) noexcept diff --git a/libs/core/threading_base/src/thread_helpers.cpp b/libs/core/threading_base/src/thread_helpers.cpp index ecbd95b0c87d..6e3830f65430 100644 --- a/libs/core/threading_base/src/thread_helpers.cpp +++ b/libs/core/threading_base/src/thread_helpers.cpp @@ -344,22 +344,6 @@ namespace hpx::threads { return get_thread_id_data(id)->add_thread_exit_callback(f); } - void free_thread_exit_callbacks(thread_id_type const& id, error_code& ec) - { - if (HPX_UNLIKELY(!id)) - { - HPX_THROWS_IF(ec, hpx::error::null_thread_id, - "hpx::threads::add_thread_exit_callback", - "null thread id encountered"); - return; - } - - if (&ec != &throws) - ec = make_success_code(); - - get_thread_id_data(id)->free_thread_exit_callbacks(); - } - /////////////////////////////////////////////////////////////////////////// #ifdef HPX_HAVE_THREAD_FULLBACKTRACE_ON_SUSPENSION char const* get_thread_backtrace(thread_id_type const& id, error_code& ec) diff --git a/libs/core/threading_base/tests/regressions/thread_stacksize_current.cpp b/libs/core/threading_base/tests/regressions/thread_stacksize_current.cpp index 077df205ea9f..eab76a473acc 100644 --- a/libs/core/threading_base/tests/regressions/thread_stacksize_current.cpp +++ b/libs/core/threading_base/tests/regressions/thread_stacksize_current.cpp @@ -75,7 +75,9 @@ int main(int argc, char** argv) "abp-priority-fifo", "abp-priority-lifo", #endif - "shared-priority" + "shared-priority", + "local-workrequesting-fifo", + "local-workrequesting-lifo" }; // clang-format on for (auto const& scheduler : schedulers) diff --git a/libs/core/threadmanager/include/hpx/modules/threadmanager.hpp b/libs/core/threadmanager/include/hpx/modules/threadmanager.hpp index b21712446e79..6e5efe4e9693 100644 --- a/libs/core/threadmanager/include/hpx/modules/threadmanager.hpp +++ 
b/libs/core/threadmanager/include/hpx/modules/threadmanager.hpp @@ -425,6 +425,12 @@ namespace hpx { namespace threads { void create_scheduler_shared_priority( thread_pool_init_parameters const&, policies::thread_queue_init_parameters const&, std::size_t); + void create_scheduler_local_workrequesting_fifo( + thread_pool_init_parameters const&, + policies::thread_queue_init_parameters const&, std::size_t); + void create_scheduler_local_workrequesting_lifo( + thread_pool_init_parameters const&, + policies::thread_queue_init_parameters const&, std::size_t); mutable mutex_type mtx_; // mutex protecting the members diff --git a/libs/core/threadmanager/src/threadmanager.cpp b/libs/core/threadmanager/src/threadmanager.cpp index 1b4c555a7eca..1fde4443d865 100644 --- a/libs/core/threadmanager/src/threadmanager.cpp +++ b/libs/core/threadmanager/src/threadmanager.cpp @@ -502,6 +502,85 @@ namespace hpx { namespace threads { pools_.push_back(HPX_MOVE(pool)); } + void threadmanager::create_scheduler_local_workrequesting_fifo( + thread_pool_init_parameters const& thread_pool_init, + policies::thread_queue_init_parameters const& thread_queue_init, + std::size_t numa_sensitive) + { + // set parameters for scheduler and pool instantiation and + // perform compatibility checks + std::size_t num_high_priority_queues = + hpx::util::get_entry_as(rtcfg_, + "hpx.thread_queue.high_priority_queues", + thread_pool_init.num_threads_); + detail::check_num_high_priority_queues( + thread_pool_init.num_threads_, num_high_priority_queues); + + // instantiate the scheduler + using local_sched_type = + hpx::threads::policies::local_workrequesting_scheduler<>; + + local_sched_type::init_parameter_type init( + thread_pool_init.num_threads_, thread_pool_init.affinity_data_, + num_high_priority_queues, thread_queue_init, + "core-local_workrequesting_scheduler"); + + std::unique_ptr sched(new local_sched_type(init)); + + // set the default scheduler flags + sched->set_scheduler_mode(thread_pool_init.mode_); + + // conditionally set/unset this flag + sched->update_scheduler_mode( + policies::scheduler_mode::enable_stealing_numa, !numa_sensitive); + + // instantiate the pool + std::unique_ptr pool = std::make_unique< + hpx::threads::detail::scheduled_thread_pool>( + HPX_MOVE(sched), thread_pool_init); + pools_.push_back(HPX_MOVE(pool)); + } + + void threadmanager::create_scheduler_local_workrequesting_lifo( + thread_pool_init_parameters const& thread_pool_init, + policies::thread_queue_init_parameters const& thread_queue_init, + std::size_t numa_sensitive) + { + // set parameters for scheduler and pool instantiation and + // perform compatibility checks + std::size_t num_high_priority_queues = + hpx::util::get_entry_as(rtcfg_, + "hpx.thread_queue.high_priority_queues", + thread_pool_init.num_threads_); + detail::check_num_high_priority_queues( + thread_pool_init.num_threads_, num_high_priority_queues); + + // instantiate the scheduler + using local_sched_type = + hpx::threads::policies::local_workrequesting_scheduler; + + local_sched_type::init_parameter_type init( + thread_pool_init.num_threads_, thread_pool_init.affinity_data_, + num_high_priority_queues, thread_queue_init, + "core-local_workrequesting_scheduler"); + + std::unique_ptr sched(new local_sched_type(init)); + + // set the default scheduler flags + sched->set_scheduler_mode(thread_pool_init.mode_); + + // conditionally set/unset this flag + sched->update_scheduler_mode( + policies::scheduler_mode::enable_stealing_numa, !numa_sensitive); + + // instantiate the pool + 
std::unique_ptr pool = std::make_unique< + hpx::threads::detail::scheduled_thread_pool>( + HPX_MOVE(sched), thread_pool_init); + pools_.push_back(HPX_MOVE(pool)); + } + void threadmanager::create_pools() { auto& rp = hpx::resource::get_partitioner(); @@ -640,6 +719,16 @@ namespace hpx { namespace threads { thread_pool_init, thread_queue_init, numa_sensitive); break; + case resource::scheduling_policy::local_workrequesting_fifo: + create_scheduler_local_workrequesting_fifo( + thread_pool_init, thread_queue_init, numa_sensitive); + break; + + case resource::scheduling_policy::local_workrequesting_lifo: + create_scheduler_local_workrequesting_lifo( + thread_pool_init, thread_queue_init, numa_sensitive); + break; + case resource::scheduling_policy::abp_priority_fifo: create_scheduler_abp_priority_fifo( thread_pool_init, thread_queue_init, numa_sensitive); diff --git a/tools/perftests_ci/perftest/plot.py b/tools/perftests_ci/perftest/plot.py index 2d7c4777fa27..aecbf6ab713e 100644 --- a/tools/perftests_ci/perftest/plot.py +++ b/tools/perftests_ci/perftest/plot.py @@ -38,8 +38,10 @@ def __str__(self): @classmethod def outputs_by_key(cls, data): def split_output(o): - return cls(**{k: v - for k, v in o.items() if k != 'series'}), o['series'] + return cls(**{ + k: v for k, v in o.items() \ + if k != 'series' and k != 'average' \ + }), o['series'] return dict(split_output(o) for o in data['outputs']) From bfc70993ab20c03ca86dd840590c56d781aaa68a Mon Sep 17 00:00:00 2001 From: Hartmut Kaiser Date: Tue, 17 Jan 2023 17:17:59 -0600 Subject: [PATCH 2/6] Fixing race during destruction of hpx::thread --- .../include/hpx/threading/thread.hpp | 6 +- libs/core/threading/src/thread.cpp | 12 +- .../hpx/threading_base/thread_data.hpp | 32 +++++- libs/core/threading_base/src/thread_data.cpp | 103 ++++++++++++++---- 4 files changed, 120 insertions(+), 33 deletions(-) diff --git a/libs/core/threading/include/hpx/threading/thread.hpp b/libs/core/threading/include/hpx/threading/thread.hpp index 77005ea665d8..cf7c3fac0a4d 100644 --- a/libs/core/threading/include/hpx/threading/thread.hpp +++ b/libs/core/threading/include/hpx/threading/thread.hpp @@ -168,9 +168,11 @@ namespace hpx { return threads::invalid_thread_id != id_; } - void detach_locked() + threads::thread_id_ref_type detach_locked() { - id_ = threads::invalid_thread_id; + threads::thread_id_ref_type id; + std::swap(id, id_); + return id; } void start_thread(threads::thread_pool_base* pool, diff --git a/libs/core/threading/src/thread.cpp b/libs/core/threading/src/thread.cpp index 7b135e96bb0f..55a72fb0b7fa 100644 --- a/libs/core/threading/src/thread.cpp +++ b/libs/core/threading/src/thread.cpp @@ -112,7 +112,6 @@ namespace hpx { "run_thread_exit_callbacks", "null thread id encountered"); } threads::run_thread_exit_callbacks(id); - threads::free_thread_exit_callbacks(id); } threads::thread_result_type thread::thread_function_nullary( @@ -214,17 +213,18 @@ namespace hpx { } this_thread::interruption_point(); + // invalidate this object + threads::thread_id_ref_type id = detach_locked(); + // register callback function to be called when thread exits - if (threads::add_thread_exit_callback(id_.noref(), - hpx::bind_front(&resume_thread, HPX_MOVE(this_id)))) + if (threads::add_thread_exit_callback( + id.noref(), hpx::bind_front(&resume_thread, HPX_MOVE(this_id)))) { // wait for thread to be terminated - unlock_guard ul(l); + l.unlock(); this_thread::suspend( threads::thread_schedule_state::suspended, "thread::join"); } - - detach_locked(); // invalidate this object 
} // extensions diff --git a/libs/core/threading_base/include/hpx/threading_base/thread_data.hpp b/libs/core/threading_base/include/hpx/threading_base/thread_data.hpp index 5f30957f7e40..8affad0003bb 100644 --- a/libs/core/threading_base/include/hpx/threading_base/thread_data.hpp +++ b/libs/core/threading_base/include/hpx/threading_base/thread_data.hpp @@ -487,7 +487,6 @@ namespace hpx::threads { bool add_thread_exit_callback(function const& f); void run_thread_exit_callbacks(); - void free_thread_exit_callbacks(); // no need to protect the variables related to scoped children as those // are supposed to be accessed by ourselves only @@ -517,7 +516,8 @@ namespace hpx::threads { void set_last_worker_thread_num( std::size_t last_worker_thread_num) noexcept { - last_worker_thread_num_ = last_worker_thread_num; + last_worker_thread_num_ = + static_cast(last_worker_thread_num); } constexpr std::ptrdiff_t get_stack_size() const noexcept @@ -611,6 +611,34 @@ namespace hpx::threads { private: mutable std::atomic current_state_; + /////////////////////////////////////////////////////////////////////// + thread_priority priority_; + thread_stacksize stacksize_enum_; + + bool requested_interrupt_; + bool enabled_interrupt_; + + enum class exit_func_state + { + none, + ready, + processed + }; + + std::atomic ran_exit_funcs_; + bool const is_stackless_; + + std::uint16_t last_worker_thread_num_; + + // reference to scheduler which created/manages this thread + policies::scheduler_base* scheduler_base_; + void* queue_; + + std::ptrdiff_t stacksize_; + + // Singly linked list (heap-allocated) + std::forward_list> exit_funcs_; + /////////////////////////////////////////////////////////////////////// // Debugging/logging information #ifdef HPX_HAVE_THREAD_DESCRIPTION diff --git a/libs/core/threading_base/src/thread_data.cpp b/libs/core/threading_base/src/thread_data.cpp index a884fbc33d52..d89c7033ebc9 100644 --- a/libs/core/threading_base/src/thread_data.cpp +++ b/libs/core/threading_base/src/thread_data.cpp @@ -52,6 +52,16 @@ namespace hpx::threads { : detail::thread_data_reference_counting(addref) , current_state_(thread_state( init_data.initial_state, thread_restart_state::signaled)) + , priority_(init_data.priority) + , stacksize_enum_(init_data.stacksize) + , requested_interrupt_(false) + , enabled_interrupt_(true) + , ran_exit_funcs_(exit_func_state::none) + , is_stackless_(is_stackless) + , last_worker_thread_num_(std::uint16_t(-1)) + , scheduler_base_(init_data.scheduler_base) + , queue_(queue) + , stacksize_(stacksize) #ifdef HPX_HAVE_THREAD_DESCRIPTION , description_(init_data.description) , lco_description_() @@ -107,7 +117,13 @@ namespace hpx::threads { thread_data::~thread_data() { LTM_(debug).format("thread_data::~thread_data({})", this); - free_thread_exit_callbacks(); + + // Exit functions should have been executed. 
+ HPX_ASSERT(exit_funcs_.empty() || + ran_exit_funcs_.load(std::memory_order_relaxed) == + exit_func_state::none || + ran_exit_funcs_.load(std::memory_order_relaxed) == + exit_func_state::processed); } void thread_data::destroy_thread() @@ -121,20 +137,55 @@ namespace hpx::threads { void thread_data::run_thread_exit_callbacks() { - std::unique_lock l( - spinlock_pool::spinlock_for(this)); - - while (!exit_funcs_.empty()) + // when leaving this function the state must be 'processed' + while (true) { + exit_func_state expected = exit_func_state::ready; + if (ran_exit_funcs_.compare_exchange_strong( + expected, exit_func_state::processed)) { - hpx::unlock_guard> - ul(l); - if (!exit_funcs_.front().empty()) - exit_funcs_.front()(); + // run exit functions only if there are any (state is 'ready') + std::unique_lock l( + spinlock_pool::spinlock_for(this)); + + while (!exit_funcs_.empty()) + { + if (!exit_funcs_.front().empty()) + { + auto f = exit_funcs_.front(); + exit_funcs_.pop_front(); + + hpx::unlock_guard< + std::unique_lock> + ul(l); + f(); + } + else + { + exit_funcs_.pop_front(); + } + } + + // clear all exit functions now as they are not needed anymore + exit_funcs_.clear(); + return; + } + else if (expected == exit_func_state::none) + { + if (ran_exit_funcs_.compare_exchange_strong( + expected, exit_func_state::processed)) + { + return; + } + + // try again, state was set to ready or processed by now + } + else + { + HPX_ASSERT(expected == exit_func_state::processed); + return; } - exit_funcs_.pop_front(); } - ran_exit_funcs_ = true; } bool thread_data::add_thread_exit_callback(hpx::function const& f) @@ -149,20 +200,28 @@ namespace hpx::threads { return false; } - exit_funcs_.push_front(f); + // don't register any more exit callback if the thread has already + // exited + exit_func_state expected = exit_func_state::none; + if (!ran_exit_funcs_.compare_exchange_strong( + expected, exit_func_state::ready)) + { + // the state was not none (i.e. ready or processed), bail out if it + // was processed + if (expected == exit_func_state::processed) + { + return false; + } + } - return true; - } + HPX_ASSERT(ran_exit_funcs_.load(std::memory_order_relaxed) == + exit_func_state::ready); - void thread_data::free_thread_exit_callbacks() - { std::lock_guard l( spinlock_pool::spinlock_for(this)); - // Exit functions should have been executed. 
- HPX_ASSERT(exit_funcs_.empty() || ran_exit_funcs_); - - exit_funcs_.clear(); + exit_funcs_.push_front(f); + return true; } bool thread_data::interruption_point(bool throw_on_interrupt) @@ -196,8 +255,6 @@ namespace hpx::threads { "thread_data::rebind_base({}), description({}), phase({}), rebind", this, get_description(), get_thread_phase()); - free_thread_exit_callbacks(); - current_state_.store(thread_state( init_data.initial_state, thread_restart_state::signaled)); @@ -219,7 +276,7 @@ namespace hpx::threads { priority_ = init_data.priority; requested_interrupt_ = false; enabled_interrupt_ = true; - ran_exit_funcs_ = false; + ran_exit_funcs_.store(exit_func_state::none, std::memory_order_relaxed); runs_as_child_.store(init_data.schedulehint.runs_as_child_mode() == hpx::threads::thread_execution_hint::run_as_child, From 5dab68ce456c8b2db192533fdd9e5ef20ca0a3de Mon Sep 17 00:00:00 2001 From: Hartmut Kaiser Date: Fri, 27 Jan 2023 16:07:16 -0600 Subject: [PATCH 3/6] Fixing merge conflicts, revert changes to thread exit callbacks --- .../include/hpx/modules/schedulers.hpp | 2 + .../local_workrequesting_scheduler.hpp | 80 ++++++++------ .../include/hpx/schedulers/thread_queue.hpp | 3 +- .../hpx/thread_pools/scheduling_loop.hpp | 11 +- .../include/hpx/threading/thread.hpp | 6 +- libs/core/threading/src/thread.cpp | 12 +- .../hpx/threading_base/register_thread.hpp | 2 + .../hpx/threading_base/thread_data.hpp | 32 +----- .../hpx/threading_base/thread_helpers.hpp | 3 + libs/core/threading_base/src/thread_data.cpp | 103 ++++-------------- .../threading_base/src/thread_helpers.cpp | 16 +++ 11 files changed, 107 insertions(+), 163 deletions(-) diff --git a/libs/core/schedulers/include/hpx/modules/schedulers.hpp b/libs/core/schedulers/include/hpx/modules/schedulers.hpp index d6790d352d73..2de14cc98363 100644 --- a/libs/core/schedulers/include/hpx/modules/schedulers.hpp +++ b/libs/core/schedulers/include/hpx/modules/schedulers.hpp @@ -6,6 +6,8 @@ #pragma once +#include + #include #include #include diff --git a/libs/core/schedulers/include/hpx/schedulers/local_workrequesting_scheduler.hpp b/libs/core/schedulers/include/hpx/schedulers/local_workrequesting_scheduler.hpp index bb025b80e987..ac6c1e060ee8 100644 --- a/libs/core/schedulers/include/hpx/schedulers/local_workrequesting_scheduler.hpp +++ b/libs/core/schedulers/include/hpx/schedulers/local_workrequesting_scheduler.hpp @@ -32,6 +32,7 @@ #include #include #include +#include #include #include #include @@ -46,7 +47,7 @@ // different working principle if compared to the classic work-stealing. Instead // of actively pulling work from the work queues of neighboring cores it relies // on a push model. Cores that run out of work post steal requests that are -// handled by cores that have work awailable by actively sending tasks to the +// handled by cores that have work available by actively sending tasks to the // requesting core. 
// // When a worker runs out of tasks, it becomes a thief by sending steal requests @@ -115,18 +116,18 @@ namespace hpx::threads::policies { using thread_queue_type = thread_queue; - public: struct init_parameter { init_parameter(std::size_t num_queues, detail::affinity_data const& affinity_data, - std::size_t num_high_priority_queues = std::size_t(-1), + std::size_t num_high_priority_queues = static_cast( + -1), thread_queue_init_parameters const& thread_queue_init = thread_queue_init_parameters{}, char const* description = "local_workrequesting_scheduler") : num_queues_(num_queues) , num_high_priority_queues_( - num_high_priority_queues == std::size_t(-1) ? + num_high_priority_queues == static_cast(-1) ? num_queues : num_high_priority_queues) , thread_queue_init_(thread_queue_init) @@ -159,7 +160,8 @@ namespace hpx::threads::policies { struct task_data { explicit HPX_HOST_DEVICE_CONSTEXPR task_data( - std::uint16_t num_thread = std::uint16_t(-1)) noexcept + std::uint16_t num_thread = static_cast( + -1)) noexcept : num_thread_(num_thread) { } @@ -188,8 +190,8 @@ namespace hpx::threads::policies { { } - steal_request(std::size_t num_thread, task_channel* channel, - mask_cref_type victims, bool idle, bool stealhalf) + steal_request(std::size_t const num_thread, task_channel* channel, + mask_cref_type victims, bool idle, bool const stealhalf) : channel_(channel) , victims_(victims) , num_thread_(static_cast(num_thread)) @@ -221,6 +223,11 @@ namespace hpx::threads::policies { { } + scheduler_data(scheduler_data const&) = delete; + scheduler_data(scheduler_data&&) = delete; + scheduler_data& operator=(scheduler_data const&) = delete; + scheduler_data& operator=(scheduler_data&&) = delete; + ~scheduler_data() { delete queue_; @@ -330,9 +337,18 @@ namespace hpx::threads::policies { } } + local_workrequesting_scheduler( + local_workrequesting_scheduler const&) = delete; + local_workrequesting_scheduler( + local_workrequesting_scheduler&&) = delete; + local_workrequesting_scheduler& operator=( + local_workrequesting_scheduler const&) = delete; + local_workrequesting_scheduler& operator=( + local_workrequesting_scheduler&&) = delete; + ~local_workrequesting_scheduler() override = default; - static std::string get_scheduler_name() + static std::string_view get_scheduler_name() { return "local_workrequesting_scheduler"; } @@ -659,9 +675,9 @@ namespace hpx::threads::policies { std::size_t num_thread = data.schedulehint.mode == thread_schedule_hint_mode::thread ? 
data.schedulehint.hint : - std::size_t(-1); + static_cast(-1); - if (std::size_t(-1) == num_thread) + if (static_cast(-1) == num_thread) { num_thread = curr_queue_++ % num_queues_; } @@ -889,7 +905,7 @@ namespace hpx::threads::policies { // Return the next thread to be executed, return false if none is // available bool get_next_thread(std::size_t num_thread, bool running, - thread_id_ref_type& thrd, bool enable_stealing) override + thread_id_ref_type& thrd, bool enable_stealing) { HPX_ASSERT(num_thread < num_queues_); @@ -968,9 +984,9 @@ namespace hpx::threads::policies { void schedule_thread(thread_id_ref_type thrd, threads::thread_schedule_hint schedulehint, bool allow_fallback = false, - thread_priority priority = thread_priority::normal) override + thread_priority priority = thread_priority::default_) override { - std::size_t num_thread = std::size_t(-1); + std::size_t num_thread = static_cast(-1); if (schedulehint.mode == thread_schedule_hint_mode::thread) { num_thread = schedulehint.hint; @@ -980,7 +996,7 @@ namespace hpx::threads::policies { allow_fallback = false; } - if (std::size_t(-1) == num_thread) + if (static_cast(-1) == num_thread) { num_thread = curr_queue_++ % num_queues_; } @@ -1038,9 +1054,9 @@ namespace hpx::threads::policies { void schedule_thread_last(thread_id_ref_type thrd, threads::thread_schedule_hint schedulehint, bool allow_fallback = false, - thread_priority priority = thread_priority::normal) override + thread_priority priority = thread_priority::default_) override { - std::size_t num_thread = std::size_t(-1); + std::size_t num_thread = static_cast(-1); if (schedulehint.mode == thread_schedule_hint_mode::thread) { num_thread = schedulehint.hint; @@ -1050,7 +1066,7 @@ namespace hpx::threads::policies { allow_fallback = false; } - if (std::size_t(-1) == num_thread) + if (static_cast(-1) == num_thread) { num_thread = curr_queue_++ % num_queues_; } @@ -1109,12 +1125,11 @@ namespace hpx::threads::policies { /////////////////////////////////////////////////////////////////////// // This returns the current length of the queues (work items and new // items) - std::int64_t get_queue_length( - std::size_t num_thread = std::size_t(-1)) const override + std::int64_t get_queue_length(std::size_t num_thread) const override { // Return queue length of one specific queue. std::int64_t count = 0; - if (std::size_t(-1) != num_thread) + if (static_cast(-1) != num_thread) { HPX_ASSERT(num_thread < num_queues_); auto const& d = data_[num_thread].data_; @@ -1150,12 +1165,12 @@ namespace hpx::threads::policies { std::int64_t get_thread_count( thread_schedule_state state = thread_schedule_state::unknown, thread_priority priority = thread_priority::default_, - std::size_t num_thread = std::size_t(-1), + std::size_t num_thread = static_cast(-1), bool /* reset */ = false) const override { // Return thread count of one specific queue. 
std::int64_t count = 0; - if (std::size_t(-1) != num_thread) + if (static_cast(-1) != num_thread) { HPX_ASSERT(num_thread < num_queues_); @@ -1207,7 +1222,6 @@ namespace hpx::threads::policies { "local_workrequesting_scheduler::get_thread_count", "unknown thread priority value " "(thread_priority::unknown)"); - return 0; } } return 0; @@ -1275,7 +1289,6 @@ namespace hpx::threads::policies { "local_workrequesting_scheduler::get_thread_count", "unknown thread priority value " "(thread_priority::unknown)"); - return 0; } } return count; @@ -1444,7 +1457,7 @@ namespace hpx::threads::policies { // generate at most 3 random numbers before resorting to more // expensive algorithm std::uniform_int_distribution uniform( - 0, std::int16_t(num_queues_ - 1)); + 0, static_cast(num_queues_ - 1)); int attempts = 0; do @@ -1461,8 +1474,9 @@ namespace hpx::threads::policies { // to avoid infinite trials we randomly select one of the possible // victims - std::uniform_int_distribution uniform( - 0, std::int16_t(num_queues_ - count(req.victims_) - 1)); + std::uniform_int_distribution uniform(0, + static_cast( + num_queues_ - count(req.victims_) - 1)); // generate one more random number std::size_t selected_victim = uniform(gen_); @@ -1489,7 +1503,7 @@ namespace hpx::threads::policies { std::size_t next_victim([[maybe_unused]] scheduler_data& d, steal_request const& req) noexcept { - std::size_t victim = std::size_t(-1); + std::size_t victim; // return thief if max steal attempts has been reached or no more // cores are available for stealing @@ -1516,7 +1530,7 @@ namespace hpx::threads::policies { } // couldn't find victim, return steal request to thief - if (victim == std::size_t(-1)) + if (victim == static_cast(-1)) { victim = req.num_thread_; HPX_ASSERT(victim != d.num_thread_); @@ -1650,7 +1664,7 @@ namespace hpx::threads::policies { bool wait_or_add_new(std::size_t num_thread, bool running, [[maybe_unused]] std::int64_t& idle_loop_count, bool enable_stealing, std::size_t& added, - thread_id_ref_type* next_thrd = nullptr) override + thread_id_ref_type* next_thrd = nullptr) { HPX_ASSERT(num_thread < num_queues_); @@ -1755,11 +1769,11 @@ namespace hpx::threads::policies { low_priority_queue_.on_start_thread(num_thread); } - std::size_t num_threads = num_queues_; + std::size_t const num_threads = num_queues_; //auto const& topo = create_topology(); // Initially set all bits, code below resets the bits corresponding - // to cores that can serve as a vistim for the current core. A set + // to cores that can serve as a victim for the current core. A set // bit in this mask means 'do not steal from this core'. 
resize(d.victims_, num_threads); reset(d.victims_); @@ -1889,7 +1903,7 @@ namespace hpx::threads::policies { } } - void reset_thread_distribution() override + void reset_thread_distribution() noexcept override { curr_queue_.store(0, std::memory_order_release); } diff --git a/libs/core/schedulers/include/hpx/schedulers/thread_queue.hpp b/libs/core/schedulers/include/hpx/schedulers/thread_queue.hpp index 6861f9a84683..7758770b9306 100644 --- a/libs/core/schedulers/include/hpx/schedulers/thread_queue.hpp +++ b/libs/core/schedulers/include/hpx/schedulers/thread_queue.hpp @@ -457,8 +457,7 @@ namespace hpx::threads::policies { if (delete_all) { - // do not lock mutex while deleting all threads, do it - // piece-wise + // do not lock mutex while deleting all threads, do it piece-wise while (true) { std::unique_lock lk(mtx_, std::try_to_lock); diff --git a/libs/core/thread_pools/include/hpx/thread_pools/scheduling_loop.hpp b/libs/core/thread_pools/include/hpx/thread_pools/scheduling_loop.hpp index 86c18a3ba2b4..6a6dbb766d69 100644 --- a/libs/core/thread_pools/include/hpx/thread_pools/scheduling_loop.hpp +++ b/libs/core/thread_pools/include/hpx/thread_pools/scheduling_loop.hpp @@ -19,7 +19,6 @@ #include #include #include -#include #if defined(HPX_HAVE_APEX) #include @@ -179,7 +178,6 @@ namespace hpx::threads::detail { idle_collect_rate idle_rate(counters.tfunc_time_, counters.exec_time_); [[maybe_unused]] tfunc_time_wrapper tfunc_time_collector(idle_rate); - HPX_UNUSED(tfunc_time_collector); // spin for some time after queues have become empty bool may_exit = false; @@ -237,8 +235,6 @@ namespace hpx::threads::detail { scheduler.get_next_thread( num_thread, running, thrd, enable_stealing))) { - HPX_UNUSED(tfunc_time_collector); - HPX_ASSERT(get_thread_id_data(thrd)->get_scheduler_base() == &scheduler); @@ -273,7 +269,6 @@ namespace hpx::threads::detail { [[maybe_unused]] tfunc_time_wrapper tfunc_time_collector_inner(idle_rate); - HPX_UNUSED(tfunc_time_collector); // thread returns new required state store the // returned state in the thread @@ -294,7 +289,7 @@ namespace hpx::threads::detail { // and add to aggregate execution time. 
[[maybe_unused]] exec_time_wrapper exec_time_collector(idle_rate); - HPX_UNUSED(exec_time_collector); + #if defined(HPX_HAVE_APEX) // get the APEX data pointer, in case we are // resuming the thread and have to restore any @@ -470,8 +465,8 @@ namespace hpx::threads::detail { ++idle_loop_count; next_thrd = nullptr; - if (scheduler.wait_or_add_new(num_thread, - running, idle_loop_count, enable_stealing_staged, added, + if (scheduler.wait_or_add_new(num_thread, running, + idle_loop_count, enable_stealing_staged, added, &next_thrd)) { // Clean up terminated threads before trying to exit diff --git a/libs/core/threading/include/hpx/threading/thread.hpp b/libs/core/threading/include/hpx/threading/thread.hpp index cf7c3fac0a4d..77005ea665d8 100644 --- a/libs/core/threading/include/hpx/threading/thread.hpp +++ b/libs/core/threading/include/hpx/threading/thread.hpp @@ -168,11 +168,9 @@ namespace hpx { return threads::invalid_thread_id != id_; } - threads::thread_id_ref_type detach_locked() + void detach_locked() { - threads::thread_id_ref_type id; - std::swap(id, id_); - return id; + id_ = threads::invalid_thread_id; } void start_thread(threads::thread_pool_base* pool, diff --git a/libs/core/threading/src/thread.cpp b/libs/core/threading/src/thread.cpp index 55a72fb0b7fa..7b135e96bb0f 100644 --- a/libs/core/threading/src/thread.cpp +++ b/libs/core/threading/src/thread.cpp @@ -112,6 +112,7 @@ namespace hpx { "run_thread_exit_callbacks", "null thread id encountered"); } threads::run_thread_exit_callbacks(id); + threads::free_thread_exit_callbacks(id); } threads::thread_result_type thread::thread_function_nullary( @@ -213,18 +214,17 @@ namespace hpx { } this_thread::interruption_point(); - // invalidate this object - threads::thread_id_ref_type id = detach_locked(); - // register callback function to be called when thread exits - if (threads::add_thread_exit_callback( - id.noref(), hpx::bind_front(&resume_thread, HPX_MOVE(this_id)))) + if (threads::add_thread_exit_callback(id_.noref(), + hpx::bind_front(&resume_thread, HPX_MOVE(this_id)))) { // wait for thread to be terminated - l.unlock(); + unlock_guard ul(l); this_thread::suspend( threads::thread_schedule_state::suspended, "thread::join"); } + + detach_locked(); // invalidate this object } // extensions diff --git a/libs/core/threading_base/include/hpx/threading_base/register_thread.hpp b/libs/core/threading_base/include/hpx/threading_base/register_thread.hpp index d3f715779875..5fb4df759336 100644 --- a/libs/core/threading_base/include/hpx/threading_base/register_thread.hpp +++ b/libs/core/threading_base/include/hpx/threading_base/register_thread.hpp @@ -43,6 +43,7 @@ namespace hpx::threads { auto* p = get_self_id_data(); p->run_thread_exit_callbacks(); + p->free_thread_exit_callbacks(); return threads::thread_result_type( threads::thread_schedule_state::terminated, @@ -69,6 +70,7 @@ namespace hpx::threads { auto* p = get_self_id_data(); p->run_thread_exit_callbacks(); + p->free_thread_exit_callbacks(); return threads::thread_result_type( threads::thread_schedule_state::terminated, diff --git a/libs/core/threading_base/include/hpx/threading_base/thread_data.hpp b/libs/core/threading_base/include/hpx/threading_base/thread_data.hpp index 8affad0003bb..5f30957f7e40 100644 --- a/libs/core/threading_base/include/hpx/threading_base/thread_data.hpp +++ b/libs/core/threading_base/include/hpx/threading_base/thread_data.hpp @@ -487,6 +487,7 @@ namespace hpx::threads { bool add_thread_exit_callback(function const& f); void run_thread_exit_callbacks(); + void 
free_thread_exit_callbacks(); // no need to protect the variables related to scoped children as those // are supposed to be accessed by ourselves only @@ -516,8 +517,7 @@ namespace hpx::threads { void set_last_worker_thread_num( std::size_t last_worker_thread_num) noexcept { - last_worker_thread_num_ = - static_cast(last_worker_thread_num); + last_worker_thread_num_ = last_worker_thread_num; } constexpr std::ptrdiff_t get_stack_size() const noexcept @@ -611,34 +611,6 @@ namespace hpx::threads { private: mutable std::atomic current_state_; - /////////////////////////////////////////////////////////////////////// - thread_priority priority_; - thread_stacksize stacksize_enum_; - - bool requested_interrupt_; - bool enabled_interrupt_; - - enum class exit_func_state - { - none, - ready, - processed - }; - - std::atomic ran_exit_funcs_; - bool const is_stackless_; - - std::uint16_t last_worker_thread_num_; - - // reference to scheduler which created/manages this thread - policies::scheduler_base* scheduler_base_; - void* queue_; - - std::ptrdiff_t stacksize_; - - // Singly linked list (heap-allocated) - std::forward_list> exit_funcs_; - /////////////////////////////////////////////////////////////////////// // Debugging/logging information #ifdef HPX_HAVE_THREAD_DESCRIPTION diff --git a/libs/core/threading_base/include/hpx/threading_base/thread_helpers.hpp b/libs/core/threading_base/include/hpx/threading_base/thread_helpers.hpp index 22cb01f8bd19..82db59ebe895 100644 --- a/libs/core/threading_base/include/hpx/threading_base/thread_helpers.hpp +++ b/libs/core/threading_base/include/hpx/threading_base/thread_helpers.hpp @@ -384,6 +384,9 @@ namespace hpx::threads { HPX_CORE_EXPORT bool add_thread_exit_callback(thread_id_type const& id, hpx::function const& f, error_code& ec = throws); + HPX_CORE_EXPORT void free_thread_exit_callbacks( + thread_id_type const& id, error_code& ec = throws); + /////////////////////////////////////////////////////////////////////////// HPX_CORE_EXPORT std::size_t get_thread_data( thread_id_type const& id, error_code& ec = throws); diff --git a/libs/core/threading_base/src/thread_data.cpp b/libs/core/threading_base/src/thread_data.cpp index d89c7033ebc9..a884fbc33d52 100644 --- a/libs/core/threading_base/src/thread_data.cpp +++ b/libs/core/threading_base/src/thread_data.cpp @@ -52,16 +52,6 @@ namespace hpx::threads { : detail::thread_data_reference_counting(addref) , current_state_(thread_state( init_data.initial_state, thread_restart_state::signaled)) - , priority_(init_data.priority) - , stacksize_enum_(init_data.stacksize) - , requested_interrupt_(false) - , enabled_interrupt_(true) - , ran_exit_funcs_(exit_func_state::none) - , is_stackless_(is_stackless) - , last_worker_thread_num_(std::uint16_t(-1)) - , scheduler_base_(init_data.scheduler_base) - , queue_(queue) - , stacksize_(stacksize) #ifdef HPX_HAVE_THREAD_DESCRIPTION , description_(init_data.description) , lco_description_() @@ -117,13 +107,7 @@ namespace hpx::threads { thread_data::~thread_data() { LTM_(debug).format("thread_data::~thread_data({})", this); - - // Exit functions should have been executed. 
- HPX_ASSERT(exit_funcs_.empty() || - ran_exit_funcs_.load(std::memory_order_relaxed) == - exit_func_state::none || - ran_exit_funcs_.load(std::memory_order_relaxed) == - exit_func_state::processed); + free_thread_exit_callbacks(); } void thread_data::destroy_thread() @@ -137,55 +121,20 @@ namespace hpx::threads { void thread_data::run_thread_exit_callbacks() { - // when leaving this function the state must be 'processed' - while (true) - { - exit_func_state expected = exit_func_state::ready; - if (ran_exit_funcs_.compare_exchange_strong( - expected, exit_func_state::processed)) - { - // run exit functions only if there are any (state is 'ready') - std::unique_lock l( - spinlock_pool::spinlock_for(this)); - - while (!exit_funcs_.empty()) - { - if (!exit_funcs_.front().empty()) - { - auto f = exit_funcs_.front(); - exit_funcs_.pop_front(); - - hpx::unlock_guard< - std::unique_lock> - ul(l); - f(); - } - else - { - exit_funcs_.pop_front(); - } - } - - // clear all exit functions now as they are not needed anymore - exit_funcs_.clear(); - return; - } - else if (expected == exit_func_state::none) - { - if (ran_exit_funcs_.compare_exchange_strong( - expected, exit_func_state::processed)) - { - return; - } + std::unique_lock l( + spinlock_pool::spinlock_for(this)); - // try again, state was set to ready or processed by now - } - else + while (!exit_funcs_.empty()) + { { - HPX_ASSERT(expected == exit_func_state::processed); - return; + hpx::unlock_guard> + ul(l); + if (!exit_funcs_.front().empty()) + exit_funcs_.front()(); } + exit_funcs_.pop_front(); } + ran_exit_funcs_ = true; } bool thread_data::add_thread_exit_callback(hpx::function const& f) @@ -200,28 +149,20 @@ namespace hpx::threads { return false; } - // don't register any more exit callback if the thread has already - // exited - exit_func_state expected = exit_func_state::none; - if (!ran_exit_funcs_.compare_exchange_strong( - expected, exit_func_state::ready)) - { - // the state was not none (i.e. ready or processed), bail out if it - // was processed - if (expected == exit_func_state::processed) - { - return false; - } - } + exit_funcs_.push_front(f); - HPX_ASSERT(ran_exit_funcs_.load(std::memory_order_relaxed) == - exit_func_state::ready); + return true; + } + void thread_data::free_thread_exit_callbacks() + { std::lock_guard l( spinlock_pool::spinlock_for(this)); - exit_funcs_.push_front(f); - return true; + // Exit functions should have been executed. 
+ HPX_ASSERT(exit_funcs_.empty() || ran_exit_funcs_); + + exit_funcs_.clear(); } bool thread_data::interruption_point(bool throw_on_interrupt) @@ -255,6 +196,8 @@ namespace hpx::threads { "thread_data::rebind_base({}), description({}), phase({}), rebind", this, get_description(), get_thread_phase()); + free_thread_exit_callbacks(); + current_state_.store(thread_state( init_data.initial_state, thread_restart_state::signaled)); @@ -276,7 +219,7 @@ namespace hpx::threads { priority_ = init_data.priority; requested_interrupt_ = false; enabled_interrupt_ = true; - ran_exit_funcs_.store(exit_func_state::none, std::memory_order_relaxed); + ran_exit_funcs_ = false; runs_as_child_.store(init_data.schedulehint.runs_as_child_mode() == hpx::threads::thread_execution_hint::run_as_child, diff --git a/libs/core/threading_base/src/thread_helpers.cpp b/libs/core/threading_base/src/thread_helpers.cpp index 6e3830f65430..ecbd95b0c87d 100644 --- a/libs/core/threading_base/src/thread_helpers.cpp +++ b/libs/core/threading_base/src/thread_helpers.cpp @@ -344,6 +344,22 @@ namespace hpx::threads { return get_thread_id_data(id)->add_thread_exit_callback(f); } + void free_thread_exit_callbacks(thread_id_type const& id, error_code& ec) + { + if (HPX_UNLIKELY(!id)) + { + HPX_THROWS_IF(ec, hpx::error::null_thread_id, + "hpx::threads::add_thread_exit_callback", + "null thread id encountered"); + return; + } + + if (&ec != &throws) + ec = make_success_code(); + + get_thread_id_data(id)->free_thread_exit_callbacks(); + } + /////////////////////////////////////////////////////////////////////////// #ifdef HPX_HAVE_THREAD_FULLBACKTRACE_ON_SUSPENSION char const* get_thread_backtrace(thread_id_type const& id, error_code& ec) From 68f0a3a0e44840f9e7c6e5e5a3891b8978a905d5 Mon Sep 17 00:00:00 2001 From: Hartmut Kaiser Date: Mon, 21 Aug 2023 14:08:03 -0500 Subject: [PATCH 4/6] Fixing use of sliding semaphore --- examples/1d_stencil/1d_stencil_4.cpp | 8 ++++---- examples/1d_stencil/1d_stencil_4_throttle.cpp | 8 ++++---- examples/1d_stencil/1d_stencil_7.cpp | 9 +++++---- examples/1d_stencil/1d_stencil_8.cpp | 9 +++++---- .../tests/performance/replay/1d_stencil.cpp | 10 +++++----- .../tests/performance/replay/1d_stencil_checksum.cpp | 10 +++++----- .../tests/performance/replay/1d_stencil_replay.cpp | 10 +++++----- .../tests/performance/replay/dataflow_replay.cpp | 11 ++++++----- .../performance/replay/dataflow_replay_validate.cpp | 11 ++++++----- .../tests/performance/replay/pure_dataflow.cpp | 10 +++++----- .../performance/replicate/1d_stencil_replicate.cpp | 10 +++++----- .../replicate/1d_stencil_replicate_checksum.cpp | 10 +++++----- .../synchronization/tests/unit/sliding_semaphore.cpp | 9 +++++---- .../checkpoint/examples/1d_stencil_4_checkpoint.cpp | 8 ++++---- tests/performance/local/future_overhead.cpp | 11 ++++++----- 15 files changed, 75 insertions(+), 69 deletions(-) diff --git a/examples/1d_stencil/1d_stencil_4.cpp b/examples/1d_stencil/1d_stencil_4.cpp index de59844927af..c39fe45e4a48 100644 --- a/examples/1d_stencil/1d_stencil_4.cpp +++ b/examples/1d_stencil/1d_stencil_4.cpp @@ -163,7 +163,7 @@ struct stepper }); // limit depth of dependency tree - hpx::sliding_semaphore sem(nd); + auto sem = std::make_shared(nd); auto Op = unwrapping(&stepper::heat_part); @@ -184,15 +184,15 @@ struct stepper // trigger the semaphore once computation has reached this point if ((t % nd) == 0) { - next[0].then([&sem, t](partition&&) { + next[0].then([sem, t](partition&&) { // inform semaphore about new lower limit - sem.signal(t); + 
sem->signal(t); }); } // suspend if the tree has become too deep, the continuation above // will resume this thread once the computation has caught up - sem.wait(t); + sem->wait(t); } // Return the solution at time-step 'nt'. diff --git a/examples/1d_stencil/1d_stencil_4_throttle.cpp b/examples/1d_stencil/1d_stencil_4_throttle.cpp index 87e8222c2712..5358c3eff3f1 100644 --- a/examples/1d_stencil/1d_stencil_4_throttle.cpp +++ b/examples/1d_stencil/1d_stencil_4_throttle.cpp @@ -240,7 +240,7 @@ struct stepper }); // limit depth of dependency tree - hpx::sliding_semaphore sem(nd); + auto sem = std::make_shared(nd); auto Op = unwrapping(&stepper::heat_part); @@ -261,15 +261,15 @@ struct stepper // trigger the semaphore once computation has reached this point if ((t % nd) == 0) { - next[0].then([&sem, t](partition&&) { + next[0].then([sem, t](partition&&) { // inform semaphore about new lower limit - sem.signal(t); + sem->signal(t); }); } // suspend if the tree has become too deep, the continuation above // will resume this thread once the computation has caught up - sem.wait(t); + sem->wait(t); } // Return the solution at time-step 'nt'. diff --git a/examples/1d_stencil/1d_stencil_7.cpp b/examples/1d_stencil/1d_stencil_7.cpp index 99258d307a08..71971fb927ef 100644 --- a/examples/1d_stencil/1d_stencil_7.cpp +++ b/examples/1d_stencil/1d_stencil_7.cpp @@ -21,6 +21,7 @@ #include #include #include +#include #include #include @@ -359,7 +360,7 @@ stepper::space stepper::do_work(std::size_t np, std::size_t nx, std::size_t nt) // limit depth of dependency tree std::size_t nd = 3; - hpx::sliding_semaphore sem(nd); + auto sem = std::make_shared(nd); heat_part_action act; for (std::size_t t = 0; t != nt; ++t) @@ -377,15 +378,15 @@ stepper::space stepper::do_work(std::size_t np, std::size_t nx, std::size_t nt) if ((t % nd) == 0) { - next[0].then([&sem, t](partition&&) { + next[0].then([sem, t](partition&&) { // inform semaphore about new lower limit - sem.signal(t); + sem->signal(t); }); } // suspend if the tree has become too deep, the continuation above // will resume this thread once the computation has caught up - sem.wait(t); + sem->wait(t); } // Return the solution at time-step 'nt'. 
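For reference, the ownership pattern these example hunks switch to can be reduced to the following sketch. It is illustrative only and not part of the patch: hpx::sliding_semaphore, hpx::async, future::then, signal() and wait() are the HPX facilities already used above, while the umbrella include, the function name and the empty step body are placeholders invented for the sketch. The point is that every continuation captures the std::shared_ptr by value, so the semaphore stays alive even if signal() runs after the enclosing scope has unwound, which a reference capture of a stack-allocated semaphore cannot guarantee.

#include <hpx/hpx.hpp>

#include <cstddef>
#include <memory>

// assumed to be called from an HPX thread (e.g. from within hpx_main)
void limit_dependency_depth(std::size_t nt, std::size_t nd)
{
    // limit depth of the dependency tree: shared ownership instead of a
    // stack-allocated hpx::sliding_semaphore referenced from continuations
    auto sem = std::make_shared<hpx::sliding_semaphore>(nd);

    for (std::size_t t = 0; t != nt; ++t)
    {
        hpx::future<void> step = hpx::async([] { /* one time step */ });

        // trigger the semaphore once computation has reached this point
        if ((t % nd) == 0)
        {
            // capture 'sem' by value: the continuation co-owns the semaphore
            step.then([sem, t](hpx::future<void>&&) {
                // inform semaphore about new lower limit
                sem->signal(t);
            });
        }

        // suspend if the tree has become too deep, the continuation above
        // will resume this thread once the computation has caught up
        sem->wait(t);
    }
}
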
diff --git a/examples/1d_stencil/1d_stencil_8.cpp b/examples/1d_stencil/1d_stencil_8.cpp index 644452b5f820..d01c227c3724 100644 --- a/examples/1d_stencil/1d_stencil_8.cpp +++ b/examples/1d_stencil/1d_stencil_8.cpp @@ -24,6 +24,7 @@ #include #include #include +#include #include #include #include @@ -575,7 +576,7 @@ stepper_server::space stepper_server::do_work( } // limit depth of dependency tree - hpx::sliding_semaphore sem(nd); + auto sem = std::make_shared(nd); for (std::size_t t = 0; t != nt; ++t) { @@ -626,15 +627,15 @@ stepper_server::space stepper_server::do_work( // trigger the semaphore once computation has reached this point if ((t % nd) == 0) { - next[0].then([&sem, t](partition&&) { + next[0].then([sem, t](partition&&) { // inform semaphore about new lower limit - sem.signal(t); + sem->signal(t); }); } // suspend if the tree has become too deep, the continuation above // will resume this thread once the computation has caught up - sem.wait(t); + sem->wait(t); } return U_[nt % 2]; diff --git a/libs/core/resiliency/tests/performance/replay/1d_stencil.cpp b/libs/core/resiliency/tests/performance/replay/1d_stencil.cpp index 6e37da5b482b..de5de3981c9b 100644 --- a/libs/core/resiliency/tests/performance/replay/1d_stencil.cpp +++ b/libs/core/resiliency/tests/performance/replay/1d_stencil.cpp @@ -150,7 +150,7 @@ struct stepper hpx::future do_work(std::size_t subdomains, std::size_t subdomain_width, std::size_t iterations, std::size_t sti, - std::uint64_t nd, hpx::sliding_semaphore& sem) + std::uint64_t nd, std::shared_ptr sem) { using hpx::dataflow; using hpx::unwrapping; @@ -186,15 +186,15 @@ struct stepper // trigger the semaphore once computation has reached this point if ((t % nd) == 0) { - next[0].then([&sem, t](partition&&) { + next[0].then([sem, t](partition&&) { // inform semaphore about new lower limit - sem.signal(t); + sem->signal(t); }); } // suspend if the tree has become too deep, the continuation above // will resume this thread once the computation has caught up - sem.wait(t); + sem->wait(t); } // Return the solution at time-step 'iterations'. 
@@ -227,7 +227,7 @@ int hpx_main(hpx::program_options::variables_map& vm) { // limit depth of dependency tree - hpx::sliding_semaphore sem(nd); + auto sem = std::make_shared(nd); hpx::future result = step.do_work(subdomains, subdomain_width, iterations, sti, nd, sem); diff --git a/libs/core/resiliency/tests/performance/replay/1d_stencil_checksum.cpp b/libs/core/resiliency/tests/performance/replay/1d_stencil_checksum.cpp index c67de2cf4d64..cfeff02ab118 100644 --- a/libs/core/resiliency/tests/performance/replay/1d_stencil_checksum.cpp +++ b/libs/core/resiliency/tests/performance/replay/1d_stencil_checksum.cpp @@ -236,7 +236,7 @@ struct stepper hpx::future do_work(std::size_t subdomains, std::size_t subdomain_width, std::size_t iterations, std::size_t sti, std::uint64_t nd, std::uint64_t n_value, double error, - hpx::sliding_semaphore& sem) + std::shared_ptr sem) { using hpx::unwrapping; using hpx::resiliency::experimental::dataflow_replay_validate; @@ -275,15 +275,15 @@ struct stepper // trigger the semaphore once computation has reached this point if ((t % nd) == 0) { - next[0].then([&sem, t](partition&&) { + next[0].then([sem, t](partition&&) { // inform semaphore about new lower limit - sem.signal(t); + sem->signal(t); }); } // suspend if the tree has become too deep, the continuation above // will resume this thread once the computation has caught up - sem.wait(t); + sem->wait(t); } // Return the solution at time-step 'iterations'. @@ -328,7 +328,7 @@ int hpx_main(hpx::program_options::variables_map& vm) { // limit depth of dependency tree - hpx::sliding_semaphore sem(nd); + auto sem = std::make_shared(nd); hpx::future result = step.do_work(subdomains, subdomain_width, iterations, sti, nd, n_value, error, sem); diff --git a/libs/core/resiliency/tests/performance/replay/1d_stencil_replay.cpp b/libs/core/resiliency/tests/performance/replay/1d_stencil_replay.cpp index 48438b63079b..a42824677cea 100644 --- a/libs/core/resiliency/tests/performance/replay/1d_stencil_replay.cpp +++ b/libs/core/resiliency/tests/performance/replay/1d_stencil_replay.cpp @@ -193,7 +193,7 @@ struct stepper hpx::future do_work(std::size_t subdomains, std::size_t subdomain_width, std::size_t iterations, std::size_t sti, std::uint64_t nd, std::uint64_t n_value, double error, - hpx::sliding_semaphore& sem) + std::shared_ptr sem) { using hpx::unwrapping; using hpx::resiliency::experimental::dataflow_replay; @@ -231,15 +231,15 @@ struct stepper // trigger the semaphore once computation has reached this point if ((t % nd) == 0) { - next[0].then([&sem, t](partition&&) { + next[0].then([sem, t](partition&&) { // inform semaphore about new lower limit - sem.signal(t); + sem->signal(t); }); } // suspend if the tree has become too deep, the continuation above // will resume this thread once the computation has caught up - sem.wait(t); + sem->wait(t); } // Return the solution at time-step 'iterations'. 
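The resiliency tests apply the same fix across an API boundary: the do_work overloads in these hunks now take the semaphore as std::shared_ptr<hpx::sliding_semaphore> by value instead of hpx::sliding_semaphore&, and hpx_main creates it with std::make_shared. A condensed, hypothetical sketch of that caller/callee pairing follows; the names and the trimmed loop body (reduced to the semaphore traffic only) are placeholders, but the parameter type mirrors the change above. Taking the shared_ptr by value means the caller, do_work itself and any work it schedules all share ownership, so none of them can outlive the semaphore.

#include <hpx/hpx_main.hpp>
#include <hpx/hpx.hpp>

#include <cstdint>
#include <memory>

// the shared_ptr parameter is taken by value on purpose: the async work
// keeps the semaphore alive for as long as it may still signal or wait
hpx::future<void> do_work_sketch(std::uint64_t iterations, std::uint64_t nd,
    std::shared_ptr<hpx::sliding_semaphore> sem)
{
    return hpx::async([=] {
        for (std::uint64_t t = 0; t != iterations; ++t)
        {
            if ((t % nd) == 0)
                sem->signal(t);    // inform semaphore about new lower limit
            sem->wait(t);          // limit depth of the dependency tree
        }
    });
}

int main()
{
    std::uint64_t const iterations = 100;
    std::uint64_t const nd = 10;

    // limit depth of dependency tree
    auto sem = std::make_shared<hpx::sliding_semaphore>(nd);
    do_work_sketch(iterations, nd, sem).get();

    return 0;
}
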
@@ -275,7 +275,7 @@ int hpx_main(hpx::program_options::variables_map& vm) { // limit depth of dependency tree - hpx::sliding_semaphore sem(nd); + auto sem = std::make_shared(nd); hpx::future result = step.do_work(subdomains, subdomain_width, iterations, sti, nd, n_value, error, sem); diff --git a/libs/core/resiliency/tests/performance/replay/dataflow_replay.cpp b/libs/core/resiliency/tests/performance/replay/dataflow_replay.cpp index b489745bc4d5..93cd2e11d8a6 100644 --- a/libs/core/resiliency/tests/performance/replay/dataflow_replay.cpp +++ b/libs/core/resiliency/tests/performance/replay/dataflow_replay.cpp @@ -237,7 +237,8 @@ struct stepper hpx::future do_work(std::size_t subdomains, std::size_t subdomain_width, std::size_t iterations, std::uint64_t nd, - std::uint64_t n_value, double error, hpx::sliding_semaphore& sem) + std::uint64_t n_value, double error, + std::shared_ptr sem) { using hpx::unwrapping; using hpx::resiliency::experimental::dataflow_replay; @@ -273,15 +274,15 @@ struct stepper // trigger the semaphore once computation has reached this point if ((t % nd) == 0) { - next[0].then([&sem, t](partition&&) { + next[0].then([sem, t](partition&&) { // inform semaphore about new lower limit - sem.signal(t); + sem->signal(t); }); } // suspend if the tree has become too deep, the continuation above // will resume this thread once the computation has caught up - sem.wait(t); + sem->wait(t); } // Return the solution at time-step 'iterations'. @@ -314,7 +315,7 @@ int hpx_main(hpx::program_options::variables_map& vm) { // limit depth of dependency tree - hpx::sliding_semaphore sem(nd); + auto sem = std::make_shared(nd); hpx::future result = step.do_work( subdomains, subdomain_width, iterations, nd, n_value, error, sem); diff --git a/libs/core/resiliency/tests/performance/replay/dataflow_replay_validate.cpp b/libs/core/resiliency/tests/performance/replay/dataflow_replay_validate.cpp index a8746218ed18..b09400bb34e9 100644 --- a/libs/core/resiliency/tests/performance/replay/dataflow_replay_validate.cpp +++ b/libs/core/resiliency/tests/performance/replay/dataflow_replay_validate.cpp @@ -229,7 +229,8 @@ struct stepper hpx::future do_work(std::size_t subdomains, std::size_t subdomain_width, std::size_t iterations, std::uint64_t nd, - std::uint64_t n_value, double error, hpx::sliding_semaphore& sem) + std::uint64_t n_value, double error, + std::shared_ptr sem) { using hpx::unwrapping; using hpx::resiliency::experimental::dataflow_replay_validate; @@ -268,15 +269,15 @@ struct stepper // trigger the semaphore once computation has reached this point if ((t % nd) == 0) { - next[0].then([&sem, t](partition&&) { + next[0].then([sem, t](partition&&) { // inform semaphore about new lower limit - sem.signal(t); + sem->signal(t); }); } // suspend if the tree has become too deep, the continuation above // will resume this thread once the computation has caught up - sem.wait(t); + sem->wait(t); } // Return the solution at time-step 'iterations'. 
@@ -318,7 +319,7 @@ int hpx_main(hpx::program_options::variables_map& vm) { // limit depth of dependency tree - hpx::sliding_semaphore sem(nd); + auto sem = std::make_shared(nd); hpx::future result = step.do_work( subdomains, subdomain_width, iterations, nd, n_value, error, sem); diff --git a/libs/core/resiliency/tests/performance/replay/pure_dataflow.cpp b/libs/core/resiliency/tests/performance/replay/pure_dataflow.cpp index 8c59a48645f2..d049c042f88b 100644 --- a/libs/core/resiliency/tests/performance/replay/pure_dataflow.cpp +++ b/libs/core/resiliency/tests/performance/replay/pure_dataflow.cpp @@ -163,7 +163,7 @@ struct stepper hpx::future do_work(std::size_t subdomains, std::size_t subdomain_width, std::size_t iterations, std::uint64_t nd, - hpx::sliding_semaphore& sem) + std::shared_ptr sem) { using hpx::dataflow; using hpx::unwrapping; @@ -199,15 +199,15 @@ struct stepper // trigger the semaphore once computation has reached this point if ((t % nd) == 0) { - next[0].then([&sem, t](partition&&) { + next[0].then([sem, t](partition&&) { // inform semaphore about new lower limit - sem.signal(t); + sem->signal(t); }); } // suspend if the tree has become too deep, the continuation above // will resume this thread once the computation has caught up - sem.wait(t); + sem->wait(t); } // Return the solution at time-step 'iterations'. @@ -237,7 +237,7 @@ int hpx_main(hpx::program_options::variables_map& vm) { // limit depth of dependency tree - hpx::sliding_semaphore sem(nd); + auto sem = std::make_shared(nd); hpx::future result = step.do_work(subdomains, subdomain_width, iterations, nd, sem); diff --git a/libs/core/resiliency/tests/performance/replicate/1d_stencil_replicate.cpp b/libs/core/resiliency/tests/performance/replicate/1d_stencil_replicate.cpp index a3633a478c69..33d71caf3757 100644 --- a/libs/core/resiliency/tests/performance/replicate/1d_stencil_replicate.cpp +++ b/libs/core/resiliency/tests/performance/replicate/1d_stencil_replicate.cpp @@ -194,7 +194,7 @@ struct stepper hpx::future do_work(std::size_t subdomains, std::size_t subdomain_width, std::size_t iterations, std::size_t sti, std::uint64_t nd, std::uint64_t n_value, double error, - hpx::sliding_semaphore& sem) + std::shared_ptr sem) { using hpx::unwrapping; using hpx::resiliency::experimental::dataflow_replicate; @@ -230,15 +230,15 @@ struct stepper // trigger the semaphore once computation has reached this point if ((t % nd) == 0) { - next[0].then([&sem, t](partition&&) { + next[0].then([sem, t](partition&&) { // inform semaphore about new lower limit - sem.signal(t); + sem->signal(t); }); } // suspend if the tree has become too deep, the continuation above // will resume this thread once the computation has caught up - sem.wait(t); + sem->wait(t); } // Return the solution at time-step 'iterations'. 
@@ -274,7 +274,7 @@ int hpx_main(hpx::program_options::variables_map& vm) { // limit depth of dependency tree - hpx::sliding_semaphore sem(nd); + auto sem = std::make_shared(nd); hpx::future result = step.do_work(subdomains, subdomain_width, iterations, sti, nd, n_value, error, sem); diff --git a/libs/core/resiliency/tests/performance/replicate/1d_stencil_replicate_checksum.cpp b/libs/core/resiliency/tests/performance/replicate/1d_stencil_replicate_checksum.cpp index 76e17b837f59..8b33c2b44edc 100644 --- a/libs/core/resiliency/tests/performance/replicate/1d_stencil_replicate_checksum.cpp +++ b/libs/core/resiliency/tests/performance/replicate/1d_stencil_replicate_checksum.cpp @@ -235,7 +235,7 @@ struct stepper hpx::future do_work(std::size_t subdomains, std::size_t subdomain_width, std::size_t iterations, std::size_t sti, std::uint64_t nd, std::uint64_t n_value, double error, - hpx::sliding_semaphore& sem) + std::shared_ptr sem) { using hpx::unwrapping; using hpx::resiliency::experimental::dataflow_replicate_validate; @@ -271,15 +271,15 @@ struct stepper // trigger the semaphore once computation has reached this point if ((t % nd) == 0) { - next[0].then([&sem, t](partition&&) { + next[0].then([sem, t](partition&&) { // inform semaphore about new lower limit - sem.signal(t); + sem->signal(t); }); } // suspend if the tree has become too deep, the continuation above // will resume this thread once the computation has caught up - sem.wait(t); + sem->wait(t); } // Return the solution at time-step 'iterations'. @@ -324,7 +324,7 @@ int hpx_main(hpx::program_options::variables_map& vm) { // limit depth of dependency tree - hpx::sliding_semaphore sem(nd); + auto sem = std::make_shared(nd); hpx::future result = step.do_work(subdomains, subdomain_width, iterations, sti, nd, n_value, error, sem); diff --git a/libs/core/synchronization/tests/unit/sliding_semaphore.cpp b/libs/core/synchronization/tests/unit/sliding_semaphore.cpp index fd6876fa9b9a..185b65756da7 100644 --- a/libs/core/synchronization/tests/unit/sliding_semaphore.cpp +++ b/libs/core/synchronization/tests/unit/sliding_semaphore.cpp @@ -13,6 +13,7 @@ #include #include #include +#include #include #include #include @@ -23,9 +24,9 @@ int const initial_count = 42; int const num_tasks = 139; std::atomic completed_tasks(0); -void worker(hpx::sliding_semaphore& sem) +void worker(std::shared_ptr sem) { - sem.signal(++count); // signal main thread + sem->signal(++count); // signal main thread } /////////////////////////////////////////////////////////////////////////////// @@ -34,14 +35,14 @@ int hpx_main() std::vector> futures; futures.reserve(num_tasks); - hpx::sliding_semaphore sem(initial_count); + auto sem = std::make_shared(initial_count); for (std::size_t i = 0; i != num_tasks; ++i) { futures.emplace_back(hpx::async(&worker, std::ref(sem))); } - sem.wait(initial_count + num_tasks); + sem->wait(initial_count + num_tasks); HPX_TEST_EQ(count, num_tasks); diff --git a/libs/full/checkpoint/examples/1d_stencil_4_checkpoint.cpp b/libs/full/checkpoint/examples/1d_stencil_4_checkpoint.cpp index 9a8fcf5f933d..c25d206a50df 100644 --- a/libs/full/checkpoint/examples/1d_stencil_4_checkpoint.cpp +++ b/libs/full/checkpoint/examples/1d_stencil_4_checkpoint.cpp @@ -317,7 +317,7 @@ struct stepper print(U); // limit depth of dependency tree - hpx::sliding_semaphore sem(nd); + auto sem = std::make_shared(nd); auto Op = unwrapping(&stepper::heat_part); @@ -367,15 +367,15 @@ struct stepper // trigger the semaphore once computation has reached this point if ((t % 
nd) == 0) { - next[0].then([&sem, t](partition&&) { + next[0].then([sem, t](partition&&) { // inform semaphore about new lower limit - sem.signal(t); + sem->signal(t); }); } // suspend if the tree has become too deep, the continuation above // will resume this thread once the computation has caught up - sem.wait(t); + sem->wait(t); } // Wait on Checkpoint Printing diff --git a/tests/performance/local/future_overhead.cpp b/tests/performance/local/future_overhead.cpp index 2ea3701c48d0..4bf154750913 100644 --- a/tests/performance/local/future_overhead.cpp +++ b/tests/performance/local/future_overhead.cpp @@ -26,6 +26,7 @@ #include #include #include +#include #include #include #include @@ -257,16 +258,16 @@ void measure_function_futures_sliding_semaphore( // start the clock high_resolution_timer walltime; const int sem_count = 5000; - hpx::sliding_semaphore sem(sem_count); + auto sem = std::make_shared(sem_count); for (std::uint64_t i = 0; i < count; ++i) { - hpx::async(exec, [i, &sem]() { + hpx::async(exec, [i, sem]() { null_function(); - sem.signal(i); + sem->signal(i); }); - sem.wait(i); + sem->wait(i); } - sem.wait(count + sem_count - 1); + sem->wait(count + sem_count - 1); // stop the clock const double duration = walltime.elapsed(); From 5bb0c1dd64400bf1cdc9ea5a9f0b1db6ba66d488 Mon Sep 17 00:00:00 2001 From: Hartmut Kaiser Date: Mon, 21 Aug 2023 14:22:21 -0500 Subject: [PATCH 5/6] Cleaning up implementation --- .../local_workrequesting_scheduler.hpp | 136 +++--------------- 1 file changed, 20 insertions(+), 116 deletions(-) diff --git a/libs/core/schedulers/include/hpx/schedulers/local_workrequesting_scheduler.hpp b/libs/core/schedulers/include/hpx/schedulers/local_workrequesting_scheduler.hpp index ac6c1e060ee8..57b737fbae0e 100644 --- a/libs/core/schedulers/include/hpx/schedulers/local_workrequesting_scheduler.hpp +++ b/libs/core/schedulers/include/hpx/schedulers/local_workrequesting_scheduler.hpp @@ -141,7 +141,6 @@ namespace hpx::threads::policies { char const* description) : num_queues_(num_queues) , num_high_priority_queues_(num_queues) - , thread_queue_init_() , affinity_data_(affinity_data) , description_(description) { @@ -185,10 +184,7 @@ namespace hpx::threads::policies { failed = 4 }; - steal_request() noexcept - : victims_() - { - } + steal_request() = default; steal_request(std::size_t const num_thread, task_channel* channel, mask_cref_type victims, bool idle, bool const stealhalf) @@ -218,10 +214,7 @@ namespace hpx::threads::policies { //////////////////////////////////////////////////////////////////////// struct scheduler_data { - scheduler_data() noexcept - : victims_() - { - } + scheduler_data() = default; scheduler_data(scheduler_data const&) = delete; scheduler_data(scheduler_data&&) = delete; @@ -731,7 +724,6 @@ namespace hpx::threads::policies { data_[num_thread].data_.queue_->create_thread(data, id, ec); break; - default: case thread_priority::unknown: { HPX_THROW_EXCEPTION(hpx::error::bad_parameter, @@ -879,6 +871,7 @@ namespace hpx::threads::policies { d.queue_->increment_num_stolen_from_pending(); #endif thrds.tasks_.push_back(HPX_MOVE(thrd)); + thrd = thread_id_ref_type{}; } // we are ready to send at least one task @@ -905,7 +898,7 @@ namespace hpx::threads::policies { // Return the next thread to be executed, return false if none is // available bool get_next_thread(std::size_t num_thread, bool running, - thread_id_ref_type& thrd, bool enable_stealing) + thread_id_ref_type& thrd, bool allow_stealing) { HPX_ASSERT(num_thread < num_queues_); @@ -946,7 +939,7 
@@ namespace hpx::threads::policies { #endif } - if (enable_stealing && result) + if (allow_stealing && result) { // We found a task to run, however before running it we handle // steal requests (assuming that there is more work left that @@ -986,7 +979,7 @@ namespace hpx::threads::policies { bool allow_fallback = false, thread_priority priority = thread_priority::default_) override { - std::size_t num_thread = static_cast(-1); + auto num_thread = static_cast(-1); if (schedulehint.mode == thread_schedule_hint_mode::thread) { num_thread = schedulehint.hint; @@ -1041,7 +1034,6 @@ namespace hpx::threads::policies { HPX_MOVE(thrd)); break; - default: case thread_priority::unknown: { HPX_THROW_EXCEPTION(hpx::error::bad_parameter, @@ -1056,7 +1048,7 @@ namespace hpx::threads::policies { bool allow_fallback = false, thread_priority priority = thread_priority::default_) override { - std::size_t num_thread = static_cast(-1); + auto num_thread = static_cast(-1); if (schedulehint.mode == thread_schedule_hint_mode::thread) { num_thread = schedulehint.hint; @@ -1101,7 +1093,6 @@ namespace hpx::threads::policies { low_priority_queue_.schedule_thread(HPX_MOVE(thrd), true); break; - default: case thread_priority::default_: case thread_priority::normal: data_[num_thread].data_.queue_->schedule_thread( @@ -1112,6 +1103,13 @@ namespace hpx::threads::policies { data_[num_thread].data_.bound_queue_->schedule_thread( HPX_MOVE(thrd), true); break; + + case thread_priority::unknown: + { + HPX_THROW_EXCEPTION(hpx::error::bad_parameter, + "local_workrequesting_scheduler::schedule_thread_last", + "unknown thread priority value (thread_priority::unknown)"); + } } } @@ -1215,7 +1213,6 @@ namespace hpx::threads::policies { break; } - default: case thread_priority::unknown: { HPX_THROW_EXCEPTION(hpx::error::bad_parameter, @@ -1282,7 +1279,6 @@ namespace hpx::threads::policies { break; } - default: case thread_priority::unknown: { HPX_THROW_EXCEPTION(hpx::error::bad_parameter, @@ -1556,7 +1552,7 @@ namespace hpx::threads::policies { if (d.num_recent_steals_ >= scheduler_data::num_steal_adaptive_interval_) { - double ratio = + double const ratio = static_cast(d.num_recent_tasks_executed_) / d.num_steal_adaptive_interval_; @@ -1662,9 +1658,8 @@ namespace hpx::threads::policies { // scheduler. Returns true if the OS thread calling this function has to // be terminated (i.e. no more work has to be done). bool wait_or_add_new(std::size_t num_thread, bool running, - [[maybe_unused]] std::int64_t& idle_loop_count, - bool enable_stealing, std::size_t& added, - thread_id_ref_type* next_thrd = nullptr) + [[maybe_unused]] std::int64_t& idle_loop_count, bool allow_stealing, + std::size_t& added, thread_id_ref_type* next_thrd = nullptr) { HPX_ASSERT(num_thread < num_queues_); @@ -1676,7 +1671,7 @@ namespace hpx::threads::policies { // threads as these threads are never created 'staged'. 
bool result = - d.queue_->wait_or_add_new(running, added, enable_stealing); + d.queue_->wait_or_add_new(running, added, allow_stealing); // check if work was available if (0 != added) @@ -1693,7 +1688,7 @@ namespace hpx::threads::policies { return true; // return if no stealing is requested (or not possible) - if (num_queues_ == 1 || !enable_stealing) + if (num_queues_ == 1 || !allow_stealing) return result; // attempt to steal more work @@ -1769,103 +1764,12 @@ namespace hpx::threads::policies { low_priority_queue_.on_start_thread(num_thread); } - std::size_t const num_threads = num_queues_; - //auto const& topo = create_topology(); - // Initially set all bits, code below resets the bits corresponding // to cores that can serve as a victim for the current core. A set // bit in this mask means 'do not steal from this core'. - resize(d.victims_, num_threads); + resize(d.victims_, num_queues_); reset(d.victims_); set(d.victims_, num_thread); - //for (std::size_t i = 0; i != num_threads; ++i) - //{ - // set(d.victims_, i); - //} - // - //// get NUMA domain masks of all queues... - //std::vector numa_masks(num_threads); - //std::vector numa_domains(num_threads); - //std::vector core_masks(num_threads); - //for (std::size_t i = 0; i != num_threads; ++i) - //{ - // std::size_t num_pu = affinity_data_.get_pu_num(i); - // numa_masks[i] = topo.get_numa_node_affinity_mask(num_pu); - // numa_domains[i] = static_cast( - // topo.get_numa_node_number(num_pu)); - // core_masks[i] = topo.get_core_affinity_mask(num_pu); - //} - // - //// iterate over the number of threads again to determine where to - //// steal from - //std::ptrdiff_t radius = - // std::lround(static_cast(num_threads) / 2.0); - // - //mask_cref_type numa_mask = numa_masks[num_thread]; - //mask_cref_type core_mask = core_masks[num_thread]; - // - //auto iterate = [&](auto&& f) { - // // check our neighbors in a radial fashion (left and right - // // alternating, increasing distance each iteration) - // std::ptrdiff_t i = 1; - // for (/**/; i < radius; ++i) - // { - // std::ptrdiff_t left = - // (static_cast(num_thread) - i) % - // static_cast(num_threads); - // if (left < 0) - // left = num_threads + left; - // - // if (f(std::size_t(left))) - // { - // unset(data_[num_thread].data_.victims_, - // static_cast(left)); - // } - // - // std::size_t right = (num_thread + i) % num_threads; - // if (f(right)) - // { - // unset(data_[num_thread].data_.victims_, right); - // } - // } - // if ((num_threads % 2) == 0) - // { - // std::size_t right = (num_thread + i) % num_threads; - // if (f(right)) - // { - // unset(data_[num_thread].data_.victims_, right); - // } - // } - //}; - // - //// check for threads that share the same core... - //iterate([&](std::size_t other_num_thread) { - // return any(core_mask & core_masks[other_num_thread]); - //}); - // - //// check for threads that share the same NUMA domain... 
- //iterate([&](std::size_t other_num_thread) { - // return !any(core_mask & core_masks[other_num_thread]) && - // any(numa_mask & numa_masks[other_num_thread]); - //}); - // - //// check for the rest and if we are NUMA aware - //if (has_scheduler_mode( - // policies::scheduler_mode::enable_stealing_numa)) - //{ - // iterate([&](std::size_t other_num_thread) { - // // allow stealing from neighboring NUMA domain only - // std::ptrdiff_t numa_distance = numa_domains[num_thread] - - // numa_domains[other_num_thread]; - // if (numa_distance > 1 || numa_distance < -1) - // return false; - // // steal from even cores from neighboring NUMA domains - // if (numa_distance == 1 || numa_distance == -1) - // return other_num_thread % 2 == 0; - // // cores from our domain are handled above - // return false; - // }); - //} } void on_stop_thread(std::size_t num_thread) override From 02e2b4bdb5108fb7df3ac8b5a95b6795232247f1 Mon Sep 17 00:00:00 2001 From: Hartmut Kaiser Date: Mon, 28 Aug 2023 10:10:30 -0500 Subject: [PATCH 6/6] Reapply necessary fixes to scheduler --- .../local_workrequesting_scheduler.hpp | 33 +++++++++++-------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/libs/core/schedulers/include/hpx/schedulers/local_workrequesting_scheduler.hpp b/libs/core/schedulers/include/hpx/schedulers/local_workrequesting_scheduler.hpp index 57b737fbae0e..bd9909fb9472 100644 --- a/libs/core/schedulers/include/hpx/schedulers/local_workrequesting_scheduler.hpp +++ b/libs/core/schedulers/include/hpx/schedulers/local_workrequesting_scheduler.hpp @@ -1675,7 +1675,9 @@ namespace hpx::threads::policies { // check if work was available if (0 != added) + { return result; + } if (num_thread == num_queues_ - 1) { @@ -1683,26 +1685,29 @@ namespace hpx::threads::policies { result; } - // check if we have been disabled - if (!running) - return true; - - // return if no stealing is requested (or not possible) - if (num_queues_ == 1 || !allow_stealing) - return result; + // check if we have been disabled or if no stealing is requested (or + // not possible) + if (!running || num_queues_ == 1) + { + return !running; + } // attempt to steal more work - send_steal_request(d); - HPX_ASSERT(d.requested_ != 0); + if (allow_stealing) + { + send_steal_request(d); + HPX_ASSERT(d.requested_ != 0); + } - // now try to handle steal requests again if we have not received a - // task from some other core yet - if (!try_receiving_tasks(d, added, next_thrd)) + if (try_receiving_tasks(d, added, next_thrd)) { - // decline or forward all pending steal requests - decline_or_forward_all_steal_requests(d); + return false; } + // if we did not receive any new task, decline or forward all + // pending steal requests + decline_or_forward_all_steal_requests(d); + #ifdef HPX_HAVE_THREAD_MINIMAL_DEADLOCK_DETECTION // no new work is available, are we deadlocked? if (HPX_UNLIKELY(get_minimal_deadlock_detection_enabled() &&