Skip to content

Commit

Permalink
Merge #5845
Browse files Browse the repository at this point in the history
5845: Adding local work requesting scheduler that is based on message passing internally r=hkaiser a=hkaiser

This adds a new experimental work-requesting scheduler to the list of existing schedulers

- Using uniform_int_distribution with proper bounds
- Removing queue index from thread_queues as it was unused
- Renaming workstealing --> workrequesting
- Adding adaptive work stealing (steal half/steal one)
  - this makes this scheduler consistently (albeit only slightly) faster than the (default) local-priority scheduler
- Adding LIFO and FIFO variations of local work-stealing scheduler
  - flyby: fixing HPX_WITH_SWAP_CONTEXT_EMULATION
  - flyby: minor changes to fibonacci_local example
- Adding high- and low- priority queues
  - flyby: cache_line_data now does not generate warnings errors if padding is not needed
- Adding bound queues
- flyby: using cache_line_data for scheduler states


Co-authored-by: Hartmut Kaiser <[email protected]>
  • Loading branch information
StellarBot and hkaiser committed Aug 30, 2023
2 parents c435525 + 02e2b4b commit 7849eef
Show file tree
Hide file tree
Showing 37 changed files with 2,134 additions and 100 deletions.
3 changes: 2 additions & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,8 @@ jobs:
-DHPX_WITH_FETCH_JSON=On \
-DCMAKE_EXPORT_COMPILE_COMMANDS=On \
-DHPX_WITH_DOCUMENTATION=On \
-DHPX_WITH_DOCUMENTATION_OUTPUT_FORMATS="${DOCUMENTATION_OUTPUT_FORMATS}"
-DHPX_WITH_DOCUMENTATION_OUTPUT_FORMATS="${DOCUMENTATION_OUTPUT_FORMATS}" \
-DHPX_WITH_TESTS_COMMAND_LINE=--hpx:queuing=local-workrequesting-fifo
- persist_to_workspace:
root: /hpx
paths:
Expand Down
2 changes: 2 additions & 0 deletions .jenkins/lsu/env-clang-13.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ configure_extra_options+=" -DHPX_WITH_PARCELPORT_MPI=ON"
configure_extra_options+=" -DHPX_WITH_PARCELPORT_LCI=ON"
configure_extra_options+=" -DHPX_WITH_FETCH_LCI=ON"
configure_extra_options+=" -DHPX_WITH_LOGGING=OFF"
configure_extra_options+=" -DHPX_WITH_TESTS_COMMAND_LINE=--hpx:queuing=local-workrequesting-fifo"


# The pwrapi library still needs to be set up properly on rostam
# configure_extra_options+=" -DHPX_WITH_POWER_COUNTER=ON"
Expand Down
3 changes: 3 additions & 0 deletions .jenkins/lsu/env-gcc-11.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ module load pwrapi/1.1.1
export HPXRUN_RUNWRAPPER=srun
export CXX_STD="20"

configure_extra_options+=" -DCMAKE_BUILD_TYPE=${build_type}"
configure_extra_options+=" -DHPX_WITH_CXX_STANDARD=${CXX_STD}"
configure_extra_options+=" -DHPX_WITH_MALLOC=system"
configure_extra_options+=" -DHPX_WITH_FETCH_ASIO=ON"
Expand All @@ -25,3 +26,5 @@ configure_extra_options+=" -DHPX_WITH_DATAPAR_BACKEND=STD_EXPERIMENTAL_SIMD"

# The pwrapi library still needs to be set up properly on rostam
# configure_extra_options+=" -DHPX_WITH_POWER_COUNTER=ON"

configure_extra_options+=" -DHPX_WITH_TESTS_COMMAND_LINE=--hpx:queuing=local-workrequesting-lifo"
7 changes: 7 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1407,6 +1407,13 @@ hpx_option(
ADVANCED
)

hpx_option(
HPX_WITH_TESTS_COMMAND_LINE STRING
"Add given command line options to all tests run" ""
CATEGORY "Debugging"
ADVANCED
)

hpx_option(
HPX_WITH_TESTS_MAX_THREADS_PER_LOCALITY
STRING
Expand Down
5 changes: 5 additions & 0 deletions cmake/HPX_AddTest.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ function(add_hpx_test category name)
)
endif()

# add additional command line arguments to all test executions
if(NOT x${HPX_WITH_TESTS_COMMAND_LINE} STREQUAL x"")
set(args ${args} "${HPX_WITH_TESTS_COMMAND_LINE}")
endif()

if(${HPX_WITH_PARALLEL_TESTS_BIND_NONE}
AND NOT run_serial
AND NOT "${name}_RUNWRAPPER"
Expand Down
37 changes: 26 additions & 11 deletions docs/sphinx/manual/hpx_runtime_and_resources.rst
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@
|hpx| thread scheduling policies
================================

The |hpx| runtime has five thread scheduling policies: local-priority,
static-priority, local, static and abp-priority. These policies can be specified
from the command line using the command line option :option:`--hpx:queuing`. In
order to use a particular scheduling policy, the runtime system must be built
with the appropriate scheduler flag turned on (e.g. ``cmake
-DHPX_THREAD_SCHEDULERS=local``, see :ref:`cmake_variables` for more
information).
The |hpx| runtime has six thread scheduling policies: local-priority,
static-priority, local, static, local-workrequesting-fifo, and abp-priority.
These policies can be specified from the command line using the command line
option :option:`--hpx:queuing`. In order to use a particular scheduling policy,
the runtime system must be built with the appropriate scheduler flag turned on
(e.g. ``cmake -DHPX_THREAD_SCHEDULERS=local``, see :ref:`cmake_variables` for
more information).

Priority local scheduling policy (default policy)
-------------------------------------------------
Expand Down Expand Up @@ -51,9 +51,7 @@ policy and must be invoked using the command line option
Static priority scheduling policy
---------------------------------

* invoke using: :option:`--hpx:queuing`\ ``=static-priority`` (or ``-qs``)
* flag to turn on for build: ``HPX_THREAD_SCHEDULERS=all`` or
``HPX_THREAD_SCHEDULERS=static-priority``
* invoke using: :option:`--hpx:queuing`\ ``static-priority`` (or ``-qs``)

The static scheduling policy maintains one queue per OS thread from which each
OS thread pulls its tasks (user threads). Threads are distributed in a round
Expand Down Expand Up @@ -102,7 +100,7 @@ domain first, only after that work is stolen from other NUMA domains.
This scheduler can be used with two underlying queuing policies (FIFO:
first-in-first-out, and LIFO: last-in-first-out). In order to use the LIFO
policy use the command line option
:option:`--hpx:queuing`\ ``=abp-priority-lifo``.
:option:`--hpx:queuing`\ ``abp-priority-lifo``.

..
Questions, concerns and notes:
Expand Down Expand Up @@ -151,6 +149,23 @@ policy use the command line option

I see both FIFO and double ended queues in ABP policies?

Work requesting scheduling policies
-----------------------------------

* invoke using: :option:`--hpx:queuing`\ ``local-workrequesting-fifo``
or using :option:`--hpx:queuing`\ ``local-workrequesting-lifo``

The work-requesting policies rely on a different mechanism of balancing work
between cores (compared to the other policies listed above). Instead of actively
trying to steal work from other cores, requesting work relies on a less
disruptive mechanism. If a core runs out of work, instead of actively looking at
the queues of neighboring cores, in this case a request is posted to another
core. This core now (whenever it is not busy with other work) either responds to
the original core by sending back work or passes the request on to the next
possible core in the system. In general, this scheme avoids contention on the
work queues as those are always accessed by their own cores only.


The |hpx| resource partitioner
==============================

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1571,7 +1571,9 @@ The predefined command line options for any application using

The queue scheduling policy to use. Options are ``local``,
``local-priority-fifo``, ``local-priority-lifo``, ``static``,
``static-priority``, ``abp-priority-fifo`` and ``abp-priority-lifo``
``static-priority``, ``abp-priority-fifo``,
``local-workrequesting-fifo``, ``local-workrequesting-lifo``
and ``abp-priority-lifo``
(default: ``local-priority-fifo``).

.. option:: --hpx:high-priority-threads arg
Expand Down
8 changes: 4 additions & 4 deletions examples/1d_stencil/1d_stencil_4.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ struct stepper
});

// limit depth of dependency tree
hpx::sliding_semaphore sem(nd);
auto sem = std::make_shared<hpx::sliding_semaphore>(nd);

auto Op = unwrapping(&stepper::heat_part);

Expand All @@ -184,15 +184,15 @@ struct stepper
// trigger the semaphore once computation has reached this point
if ((t % nd) == 0)
{
next[0].then([&sem, t](partition&&) {
next[0].then([sem, t](partition&&) {
// inform semaphore about new lower limit
sem.signal(t);
sem->signal(t);
});
}

// suspend if the tree has become too deep, the continuation above
// will resume this thread once the computation has caught up
sem.wait(t);
sem->wait(t);
}

// Return the solution at time-step 'nt'.
Expand Down
8 changes: 4 additions & 4 deletions examples/1d_stencil/1d_stencil_4_throttle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ struct stepper
});

// limit depth of dependency tree
hpx::sliding_semaphore sem(nd);
auto sem = std::make_shared<hpx::sliding_semaphore>(nd);

auto Op = unwrapping(&stepper::heat_part);

Expand All @@ -261,15 +261,15 @@ struct stepper
// trigger the semaphore once computation has reached this point
if ((t % nd) == 0)
{
next[0].then([&sem, t](partition&&) {
next[0].then([sem, t](partition&&) {
// inform semaphore about new lower limit
sem.signal(t);
sem->signal(t);
});
}

// suspend if the tree has become too deep, the continuation above
// will resume this thread once the computation has caught up
sem.wait(t);
sem->wait(t);
}

// Return the solution at time-step 'nt'.
Expand Down
9 changes: 5 additions & 4 deletions examples/1d_stencil/1d_stencil_7.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <memory>
#include <utility>
#include <vector>

Expand Down Expand Up @@ -359,7 +360,7 @@ stepper::space stepper::do_work(std::size_t np, std::size_t nx, std::size_t nt)

// limit depth of dependency tree
std::size_t nd = 3;
hpx::sliding_semaphore sem(nd);
auto sem = std::make_shared<hpx::sliding_semaphore>(nd);

heat_part_action act;
for (std::size_t t = 0; t != nt; ++t)
Expand All @@ -377,15 +378,15 @@ stepper::space stepper::do_work(std::size_t np, std::size_t nx, std::size_t nt)

if ((t % nd) == 0)
{
next[0].then([&sem, t](partition&&) {
next[0].then([sem, t](partition&&) {
// inform semaphore about new lower limit
sem.signal(t);
sem->signal(t);
});
}

// suspend if the tree has become too deep, the continuation above
// will resume this thread once the computation has caught up
sem.wait(t);
sem->wait(t);
}

// Return the solution at time-step 'nt'.
Expand Down
9 changes: 5 additions & 4 deletions examples/1d_stencil/1d_stencil_8.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <memory>
#include <mutex>
#include <stack>
#include <string>
Expand Down Expand Up @@ -575,7 +576,7 @@ stepper_server::space stepper_server::do_work(
}

// limit depth of dependency tree
hpx::sliding_semaphore sem(nd);
auto sem = std::make_shared<hpx::sliding_semaphore>(nd);

for (std::size_t t = 0; t != nt; ++t)
{
Expand Down Expand Up @@ -626,15 +627,15 @@ stepper_server::space stepper_server::do_work(
// trigger the semaphore once computation has reached this point
if ((t % nd) == 0)
{
next[0].then([&sem, t](partition&&) {
next[0].then([sem, t](partition&&) {
// inform semaphore about new lower limit
sem.signal(t);
sem->signal(t);
});
}

// suspend if the tree has become too deep, the continuation above
// will resume this thread once the computation has caught up
sem.wait(t);
sem->wait(t);
}

return U_[nt % 2];
Expand Down
25 changes: 15 additions & 10 deletions examples/quickstart/fibonacci_local.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

#include <cstdint>
#include <iostream>
#include <string>
#include <utility>
#include <vector>

///////////////////////////////////////////////////////////////////////////////
//[fibonacci
Expand All @@ -26,22 +29,20 @@ std::uint64_t fibonacci(std::uint64_t n)
if (n < 2)
return n;

// Invoking the Fibonacci algorithm twice is inefficient.
// However, we intentionally demonstrate it this way to create some
// heavy workload.

hpx::future<std::uint64_t> n1 = hpx::async(fibonacci, n - 1);
hpx::future<std::uint64_t> n2 = hpx::async(fibonacci, n - 2);
std::uint64_t n2 = fibonacci(n - 2);

return n1.get() +
n2.get(); // wait for the Futures to return their values
return n1.get() + n2; // wait for the Future to return their values
}
//fibonacci]

///////////////////////////////////////////////////////////////////////////////
//[hpx_main
int hpx_main(hpx::program_options::variables_map& vm)
{
hpx::threads::add_scheduler_mode(
hpx::threads::policies::scheduler_mode::fast_idle_mode);

// extract command line argument, i.e. fib(N)
std::uint64_t n = vm["n-value"].as<std::uint64_t>();

Expand All @@ -67,9 +68,13 @@ int main(int argc, char* argv[])
hpx::program_options::options_description desc_commandline(
"Usage: " HPX_APPLICATION_STRING " [options]");

desc_commandline.add_options()("n-value",
hpx::program_options::value<std::uint64_t>()->default_value(10),
"n value for the Fibonacci function");
// clang-format off
desc_commandline.add_options()
("n-value",
hpx::program_options::value<std::uint64_t>()->default_value(10),
"n value for the Fibonacci function")
;
// clang-format on

// Initialize and run HPX
hpx::local::init_params init_args;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -478,18 +478,24 @@ namespace hpx::local::detail {
"run on (default: 0), valid for "
"--hpx:queuing=local, --hpx:queuing=abp-priority, "
"--hpx:queuing=static, --hpx:queuing=static-priority, "
"--hpx:queuing=local-workrequesting-fifo, "
"--hpx:queuing=local-workrequesting-lifo, "
"and --hpx:queuing=local-priority only")
("hpx:pu-step", value<std::size_t>(),
"the step between used processing unit numbers for this "
"instance of HPX (default: 1), valid for "
"--hpx:queuing=local, --hpx:queuing=abp-priority, "
"--hpx:queuing=static, --hpx:queuing=static-priority "
"--hpx:queuing=local-workrequesting-fifo, "
"--hpx:queuing=local-workrequesting-lifo, "
"and --hpx:queuing=local-priority only")
("hpx:affinity", value<std::string>(),
"the affinity domain the OS threads will be confined to, "
"possible values: pu, core, numa, machine (default: pu), valid for "
"--hpx:queuing=local, --hpx:queuing=abp-priority, "
"--hpx:queuing=static, --hpx:queuing=static-priority "
"--hpx:queuing=local-workrequesting-fifo, "
"--hpx:queuing=local-workrequesting-lifo, "
" and --hpx:queuing=local-priority only")
("hpx:bind", value<std::vector<std::string> >()->composing(),
"the detailed affinity description for the OS threads, see "
Expand All @@ -514,13 +520,16 @@ namespace hpx::local::detail {
("hpx:queuing", value<std::string>(),
"the queue scheduling policy to use, options are "
"'local', 'local-priority-fifo','local-priority-lifo', "
"'abp-priority-fifo', 'abp-priority-lifo', 'static', and "
"'static-priority' (default: 'local-priority'; "
"'abp-priority-fifo', 'abp-priority-lifo', 'static', "
"'static-priority', 'local-workrequesting-fifo', and "
"'local-workrequesting-lifo' (default: 'local-priority'; "
"all option values can be abbreviated)")
("hpx:high-priority-threads", value<std::size_t>(),
"the number of operating system threads maintaining a high "
"priority queue (default: number of OS threads), valid for "
"--hpx:queuing=local-priority,--hpx:queuing=static-priority, "
"--hpx:queuing=local-workrequesting-fifo, "
"--hpx:queuing=local-workrequesting-lifo, "
" and --hpx:queuing=abp-priority only)")
("hpx:numa-sensitive", value<std::size_t>()->implicit_value(0),
"makes the local-priority scheduler NUMA sensitive ("
Expand Down
10 changes: 5 additions & 5 deletions libs/core/resiliency/tests/performance/replay/1d_stencil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ struct stepper

hpx::future<space> do_work(std::size_t subdomains,
std::size_t subdomain_width, std::size_t iterations, std::size_t sti,
std::uint64_t nd, hpx::sliding_semaphore& sem)
std::uint64_t nd, std::shared_ptr<hpx::sliding_semaphore> sem)
{
using hpx::dataflow;
using hpx::unwrapping;
Expand Down Expand Up @@ -186,15 +186,15 @@ struct stepper
// trigger the semaphore once computation has reached this point
if ((t % nd) == 0)
{
next[0].then([&sem, t](partition&&) {
next[0].then([sem, t](partition&&) {
// inform semaphore about new lower limit
sem.signal(t);
sem->signal(t);
});
}

// suspend if the tree has become too deep, the continuation above
// will resume this thread once the computation has caught up
sem.wait(t);
sem->wait(t);
}

// Return the solution at time-step 'iterations'.
Expand Down Expand Up @@ -227,7 +227,7 @@ int hpx_main(hpx::program_options::variables_map& vm)

{
// limit depth of dependency tree
hpx::sliding_semaphore sem(nd);
auto sem = std::make_shared<hpx::sliding_semaphore>(nd);

hpx::future<stepper::space> result =
step.do_work(subdomains, subdomain_width, iterations, sti, nd, sem);
Expand Down
Loading

0 comments on commit 7849eef

Please sign in to comment.