diff --git a/CMakeLists.txt b/CMakeLists.txt index 0c18b15e62a4..a83bd619a3bf 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,6 +1,7 @@ # Copyright (c) 2020 Mikael Simberg # Copyright (c) 2007-2024 Hartmut Kaiser # Copyright (c) 2011-2014 Thomas Heller +# Copyright (c) 2024 Isidoros Tsaousis-Seiras # Copyright (c) 2007-2008 Chirag Dekate # Copyright (c) 2011 Bryce Lelbach # Copyright (c) 2011 Vinay C Amatya @@ -578,6 +579,27 @@ if(HPX_WITH_CUDA AND HPX_WITH_HIP) ) endif() +# ## HPX STDEXEC configuration ## + +hpx_option( + HPX_WITH_STDEXEC BOOL + "Use STDEXEC executors instead of native HPX.(default: OFF)" OFF + CATEGORY "Executor" + ADVANCED +) + +hpx_option( + HPX_WITH_FETCH_STDEXEC BOOL "Use FetchContent to fetch STDEXEC.(default: ON)" + ON + CATEGORY "Executor" + ADVANCED +) + +hpx_option( + HPX_WITH_STDEXEC_TAG STRING "STDEXEC repository tag or branch" "main" + CATEGORY "Executor" +) + # ############################################################################## # HPX SYCL configuration # ############################################################################## @@ -1346,7 +1368,7 @@ if(HPX_WITH_NETWORKING AND HPX_WITH_PARCELPORT_LCI) endif() endif() -# External libraries/frameworks used by sme of the examples and benchmarks +# External libraries/frameworks used by some of the examples and benchmarks hpx_option( HPX_WITH_EXAMPLES_OPENMP BOOL "Enable examples requiring OpenMP support (default: OFF)." OFF @@ -2265,6 +2287,9 @@ if(HPX_WITH_CUDA OR HPX_WITH_HIP) hpx_add_config_define(HPX_HAVE_GPU_SUPPORT) endif() +# Setup NVIDIA's stdexec if requested +include(HPX_SetupStdexec) + if(HPX_WITH_SANITIZERS) hpx_add_config_define(HPX_HAVE_SANITIZERS) endif() diff --git a/cmake/FindStdexec.cmake b/cmake/FindStdexec.cmake new file mode 100644 index 000000000000..9967c04b9afb --- /dev/null +++ b/cmake/FindStdexec.cmake @@ -0,0 +1,39 @@ +# Copyright (c) 2024 Isidoros Tsaousis-Seiras +# +# SPDX-License-Identifier: BSL-1.0 +# Distributed under the Boost Software License, Version 1.0. (See accompanying +# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +if(NOT TARGET STDEXEC::stdexec) + if(STDEXEC_ROOT AND NOT Stdexec_ROOT) + set(Stdexec_ROOT + ${STDEXEC_ROOT} + CACHE PATH "stdexec base directory" + ) + unset(STDEXEC_ROOT CACHE) + endif() + + find_path(Stdexec_INCLUDE_DIR stdexec HINTS ${Stdexec_ROOT}) + message(STATUS "stdexec include dir: ${Stdexec_INCLUDE_DIR}") + if(Stdexec_INCLUDE_DIR) + file(TO_CMAKE_PATH ${Stdexec_INCLUDE_DIR} Stdexec_INCLUDE_DIR) + else() + message(FATAL_ERROR "stdexec not found") + endif() + + include(FindPackageHandleStandardArgs) + find_package_handle_standard_args( + Stdexec + REQUIRED_VARS Stdexec_INCLUDE_DIR + FOUND_VAR Stdexec_FOUND + VERSION_VAR Stdexec_VERSION + FAIL_MESSAGE "stdexec not found" + ) + + add_library(STDEXEC::stdexec INTERFACE IMPORTED) + target_include_directories( + STDEXEC::stdexec SYSTEM INTERFACE ${Stdexec_INCLUDE_DIR} + ) + + mark_as_advanced(Stdexec_INCLUDE_DIR Stdexec_ROOT) +endif() diff --git a/cmake/HPX_AddConfigTest.cmake b/cmake/HPX_AddConfigTest.cmake index d3adb881c377..92b24ceec194 100644 --- a/cmake/HPX_AddConfigTest.cmake +++ b/cmake/HPX_AddConfigTest.cmake @@ -633,6 +633,15 @@ function(hpx_check_for_cxx20_std_bit_cast) ) endfunction() +# ############################################################################## +function(hpx_check_for_cxx20_std_identity) + add_hpx_config_test( + HPX_WITH_CXX20_STD_IDENTITY + SOURCE cmake/tests/cxx20_std_identity.cpp + FILE ${ARGN} + ) +endfunction() + # ############################################################################## function(hpx_check_for_cxx20_constexpr_destructor) add_hpx_config_test( diff --git a/cmake/HPX_PerformCxxFeatureTests.cmake b/cmake/HPX_PerformCxxFeatureTests.cmake index 2e9d02eb8de6..9e2389ef376f 100644 --- a/cmake/HPX_PerformCxxFeatureTests.cmake +++ b/cmake/HPX_PerformCxxFeatureTests.cmake @@ -152,6 +152,8 @@ function(hpx_perform_cxx_feature_tests) hpx_check_for_cxx20_std_bit_cast(DEFINITIONS HPX_HAVE_CXX20_STD_BIT_CAST) + hpx_check_for_cxx20_std_identity(DEFINITIONS HPX_HAVE_CXX20_STD_IDENTITY) + hpx_check_for_cxx20_constexpr_destructor( DEFINITIONS HPX_HAVE_CXX20_CONSTEXPR_DESTRUCTOR ) diff --git a/cmake/HPX_SetupBoost.cmake b/cmake/HPX_SetupBoost.cmake index 01b6058375d2..14aa36c25df8 100644 --- a/cmake/HPX_SetupBoost.cmake +++ b/cmake/HPX_SetupBoost.cmake @@ -109,7 +109,7 @@ if(NOT TARGET hpx_dependencies_boost) hpx_set_cmake_policy(CMP0167 OLD) # use CMake's FindBoost for now # Find the headers and get the version - find_package(Boost ${Boost_MINIMUM_VERSION} REQUIRED) + find_package(Boost ${Boost_MINIMUM_VERSION} NO_POLICY_SCOPE MODULE REQUIRED) if(NOT Boost_VERSION_STRING) set(Boost_VERSION_STRING "${Boost_MAJOR_VERSION}.${Boost_MINOR_VERSION}.${Boost_SUBMINOR_VERSION}" diff --git a/cmake/HPX_SetupStdexec.cmake b/cmake/HPX_SetupStdexec.cmake new file mode 100644 index 000000000000..b52a114a7a83 --- /dev/null +++ b/cmake/HPX_SetupStdexec.cmake @@ -0,0 +1,104 @@ +# Copyright (c) 2024 Isidoros Tsaousis-Seiras +# +# SPDX-License-Identifier: BSL-1.0 +# Distributed under the Boost Software License, Version 1.0. (See accompanying +# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +if(HPX_WITH_STDEXEC) + if(HPX_WITH_CXX_STANDARD LESS 20) + hpx_error("HPX_WITH_STDEXEC Requires C++20 or later.") + endif() + if(NOT HPX_WITH_CXX20_STD_IDENTITY) + hpx_error("HPX_WITH_STDEXEC Requires std::identity.") + endif() + if(MSVC) + hpx_error("HPX_WITH_STDEXEC is not available on MSVC.") + endif() +endif() + +if(STDEXEC_ROOT AND NOT Stdexec_ROOT) + set(Stdexec_ROOT ${STDEXEC_ROOT}) + # remove STDEXEC_ROOT from the cache + unset(STDEXEC_ROOT CACHE) +endif() + +if(HPX_WITH_STDEXEC) + # prefer HPX_WITH_FETCH_STDEXEC by default + if(Stdexec_ROOT AND HPX_WITH_FETCH_STDEXEC) + hpx_warn( + "Both Stdexec_ROOT and HPX_WITH_FETCH_STDEXEC are provided. HPX_WITH_FETCH_STDEXEC will take precedence." + ) + endif() + + hpx_add_config_define(HPX_HAVE_STDEXEC) + + if(HPX_WITH_FETCH_STDEXEC) + hpx_info( + "HPX_WITH_FETCH_STDEXEC=${HPX_WITH_FETCH_STDEXEC}, Stdexec will be fetched using CMake's FetchContent and installed alongside HPX (HPX_WITH_STDEXEC_TAG=${HPX_WITH_STDEXEC_TAG})" + ) + if(UNIX) + include(FetchContent) + fetchcontent_declare( + Stdexec + GIT_REPOSITORY https://github.com/NVIDIA/stdexec.git + GIT_TAG ${HPX_WITH_STDEXEC_TAG} + ) + + fetchcontent_getproperties(Stdexec) + if(NOT Stdexec_POPULATED) + fetchcontent_populate(Stdexec) + endif() + set(Stdexec_ROOT ${stdexec_SOURCE_DIR}) + + add_library(Stdexec INTERFACE) + target_include_directories( + Stdexec INTERFACE $ + $ + ) + + install( + TARGETS Stdexec + EXPORT HPXStdexecTarget + COMPONENT core + ) + + install( + DIRECTORY ${Stdexec_ROOT}/include/ + DESTINATION ${CMAKE_INSTALL_INCLUDEDIR} + COMPONENT core + FILES_MATCHING + PATTERN "*.hpp" + ) + + export( + TARGETS Stdexec + NAMESPACE Stdexec:: + FILE "${CMAKE_CURRENT_BINARY_DIR}/lib/cmake/${HPX_PACKAGE_NAME}/HPXStdexecTarget.cmake" + ) + + install( + EXPORT HPXStdexecTarget + NAMESPACE Stdexec:: + FILE HPXStdexecTarget.cmake + DESTINATION ${CMAKE_INSTALL_LIBDIR}/cmake/${HPX_PACKAGE_NAME} + COMPONENT cmake + ) + + # TODO: Enforce a single spelling + add_library(Stdexec::Stdexec ALIAS Stdexec) + add_library(STDEXEC::stdexec ALIAS Stdexec) + + endif() + + else() + find_package(Stdexec REQUIRED) + + if(Stdexec_FOUND) + hpx_add_config_define(HPX_HAVE_STDEXEC) + else() + hpx_error( + "Stdexec could not be found, please specify Stdexec_ROOT to point to the correct location or enable HPX_WITH_FETCH_STDEXEC" + ) + endif() + endif() +endif() diff --git a/cmake/templates/HPXConfig.cmake.in b/cmake/templates/HPXConfig.cmake.in index 5c2c7260f96e..3c5161690bde 100644 --- a/cmake/templates/HPXConfig.cmake.in +++ b/cmake/templates/HPXConfig.cmake.in @@ -27,6 +27,20 @@ else() include(HPX_SetupAsio) endif() + +# Stdexec can be installed by HPX or externally installed. In the first case we use +# exported targets, in the second we find Stdexec again using find_package. +if(HPX_WITH_STDEXEC) + if(HPX_WITH_FETCH_STDEXEC) + include("${CMAKE_CURRENT_LIST_DIR}/HPXStdexecTarget.cmake") + else() + set(Stdexec_ROOT "@Stdexec_ROOT@") + include(HPX_SetupStdexec) + endif() +endif() + + + # NLohnmann JSON can be installed by HPX or externally installed. In the first # case we use exported targets, in the second we find JSON again using # find_package. diff --git a/cmake/tests/cxx20_std_identity.cpp b/cmake/tests/cxx20_std_identity.cpp new file mode 100644 index 000000000000..a1373bb2d1a4 --- /dev/null +++ b/cmake/tests/cxx20_std_identity.cpp @@ -0,0 +1,14 @@ +// Copyright (c) 2024 Isidoros Tsaousis-Seiras +// +// SPDX-License-Identifier: BSL-1.0 +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +// test for availability of std::identity + +#include + +int main() +{ + auto f = std::identity{}(3); +} diff --git a/libs/CMakeLists.txt b/libs/CMakeLists.txt index 3f5e4546c3ec..cc4381550257 100644 --- a/libs/CMakeLists.txt +++ b/libs/CMakeLists.txt @@ -376,6 +376,10 @@ if(HPX_WITH_ITTNOTIFY) target_link_libraries(hpx_core PUBLIC Amplifier::amplifier) endif() +if(TARGET STDEXEC::stdexec) + target_link_libraries(hpx_core INTERFACE STDEXEC::stdexec) +endif() + if(HPX_WITH_PARCELPORT_GASNET AND GASNET_LIBRARY_DIRS) target_link_directories(hpx_core PUBLIC ${GASNET_LIBRARY_DIRS}) endif() diff --git a/libs/core/algorithms/include/hpx/parallel/algorithms/rotate.hpp b/libs/core/algorithms/include/hpx/parallel/algorithms/rotate.hpp index 2bf345ad3e92..6e3965057191 100644 --- a/libs/core/algorithms/include/hpx/parallel/algorithms/rotate.hpp +++ b/libs/core/algorithms/include/hpx/parallel/algorithms/rotate.hpp @@ -4,10 +4,9 @@ // SPDX-License-Identifier: BSL-1.0 // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) - -/// \file hpx/parallel/algorithms/rotate.hpp /// \page hpx::rotate, hpx::rotate_copy /// \headerfile hpx/algorithm.hpp +/// \file hpx/parallel/algorithms/rotate.hpp #pragma once @@ -266,27 +265,58 @@ namespace hpx::parallel { detail::reverse r; - return hpx::dataflow( - hpx::launch::sync, - [=](auto&& f1, auto&& f2) mutable { - // propagate exceptions, if appropriate - constexpr bool handle_futures = - hpx::traits::is_future_v && - hpx::traits::is_future_v; - - if constexpr (handle_futures) - { - f1.get(); - f2.get(); - } - - r.call(p(hpx::execution::non_task), first, last); - - std::advance(first, size_right); - return util::in_out_result{first, last}; - }, - r.call(left_policy, first, new_first), - r.call(right_policy, new_first, last)); + auto&& process = [=](auto&& f1, auto&& f2) mutable { + // propagate exceptions, if appropriate + constexpr bool handle_futures = + hpx::traits::is_future_v && + hpx::traits::is_future_v; + + if constexpr (handle_futures) + { + f1.get(); + f2.get(); + } + + r.call(p(hpx::execution::non_task), first, last); + + std::advance(first, size_right); + return util::in_out_result{first, last}; + }; + + using rcall_left_t = + decltype(r.call(left_policy, first, new_first)); + using rcall_right_t = + decltype(r.call(right_policy, new_first, last)); + + // Sanity check + static_assert(std::is_same_v); + + constexpr bool handle_senders = + hpx::execution::experimental::is_sender_v; + constexpr bool handle_futures = + hpx::traits::is_future_v; + constexpr bool handle_both = handle_senders && handle_futures; + + static_assert(handle_senders || handle_futures, + "the reverse operation must return either a sender or a " + "future"); + + // Futures pass the concept check for senders, so if something is + // both a future and a sender we treat it as a future. + if constexpr (handle_futures || handle_both) + { + return hpx::dataflow(hpx::launch::sync, std::move(process), + r.call(left_policy, first, new_first), + r.call(right_policy, new_first, last)); + } + else if constexpr (handle_senders && !handle_both) + { + return hpx::execution::experimental::then( + hpx::execution::experimental::when_all( + r.call(left_policy, first, new_first), + r.call(right_policy, new_first, last)), + std::move(process)); + } } template diff --git a/libs/core/algorithms/include/hpx/parallel/util/detail/algorithm_result.hpp b/libs/core/algorithms/include/hpx/parallel/util/detail/algorithm_result.hpp index 14de93f2ac2d..ae04cb021c5f 100644 --- a/libs/core/algorithms/include/hpx/parallel/util/detail/algorithm_result.hpp +++ b/libs/core/algorithms/include/hpx/parallel/util/detail/algorithm_result.hpp @@ -18,12 +18,11 @@ #include #include -#ifdef HPX_HAVE_STDEXEC +#if defined(HPX_HAVE_STDEXEC) // for is_sender #include #endif -#include #include #include diff --git a/libs/core/algorithms/tests/unit/algorithms/adjacentdifference_tests.hpp b/libs/core/algorithms/tests/unit/algorithms/adjacentdifference_tests.hpp index c1965226f782..0bd32e56d915 100644 --- a/libs/core/algorithms/tests/unit/algorithms/adjacentdifference_tests.hpp +++ b/libs/core/algorithms/tests/unit/algorithms/adjacentdifference_tests.hpp @@ -13,6 +13,7 @@ #include #include #include +#include #include #include diff --git a/libs/core/algorithms/tests/unit/algorithms/foreach_scheduler.cpp b/libs/core/algorithms/tests/unit/algorithms/foreach_scheduler.cpp index f0e3c5765f3d..15b6a98541a0 100644 --- a/libs/core/algorithms/tests/unit/algorithms/foreach_scheduler.cpp +++ b/libs/core/algorithms/tests/unit/algorithms/foreach_scheduler.cpp @@ -95,12 +95,10 @@ void test_for_each_execute_on_async(Policy l, ExPolicy&& policy, IteratorTag) using scheduler_t = ex::thread_pool_policy_scheduler; - auto snd_result = tt::sync_wait(hpx::for_each( + auto result = tt::sync_wait(hpx::for_each( ex::execute_on(scheduler_t(l), std::forward(policy)), iterator(std::begin(c)), iterator(std::end(c)), f)); - auto result = hpx::get<0>(*snd_result); - - HPX_TEST(result == iterator(std::end(c))); + HPX_TEST(hpx::get<0>(*result) == iterator(std::end(c))); // verify values std::size_t count = 0; @@ -131,13 +129,11 @@ void test_for_each_execute_on_sender(Policy l, ExPolicy&& policy, IteratorTag) using scheduler_t = ex::thread_pool_policy_scheduler; - auto snd_result = tt::sync_wait( + auto result = tt::sync_wait( ex::just(iterator(std::begin(c)), iterator(std::end(c)), f) | hpx::for_each( ex::execute_on(scheduler_t(l), std::forward(policy)))); - auto result = hpx::get<0>(*snd_result); - - HPX_TEST(result == iterator(std::end(c))); + HPX_TEST(hpx::get<0>(*result) == iterator(std::end(c))); // verify values std::size_t count = 0; diff --git a/libs/core/algorithms/tests/unit/container_algorithms/adjacentdifference_range_sender.cpp b/libs/core/algorithms/tests/unit/container_algorithms/adjacentdifference_range_sender.cpp index b571b358c558..efddda7943f1 100644 --- a/libs/core/algorithms/tests/unit/container_algorithms/adjacentdifference_range_sender.cpp +++ b/libs/core/algorithms/tests/unit/container_algorithms/adjacentdifference_range_sender.cpp @@ -87,9 +87,14 @@ void test_adjacent_difference_async_direct(Policy l, ExPolicy&& p) using scheduler_t = ex::thread_pool_policy_scheduler; auto exec = ex::explicit_scheduler_executor(scheduler_t(l)); +#if defined(HPX_HAVE_STDEXEC) + auto result = tt::sync_wait( + hpx::ranges::adjacent_difference(p.on(exec), c, std::begin(d))); +#else auto result = hpx::ranges::adjacent_difference(p.on(exec), c, std::begin(d)) | tt::sync_wait(); +#endif std::adjacent_difference(std::begin(c), std::end(c), std::begin(d_ans)); diff --git a/libs/core/algorithms/tests/unit/container_algorithms/foreach_range_sender.cpp b/libs/core/algorithms/tests/unit/container_algorithms/foreach_range_sender.cpp index f39f60d1a7fd..18045a5619f3 100644 --- a/libs/core/algorithms/tests/unit/container_algorithms/foreach_range_sender.cpp +++ b/libs/core/algorithms/tests/unit/container_algorithms/foreach_range_sender.cpp @@ -77,8 +77,7 @@ void test_for_each_explicit_sender_direct_async( using scheduler_t = ex::thread_pool_policy_scheduler; auto exec = ex::explicit_scheduler_executor(scheduler_t(l)); - auto result = - hpx::ranges::for_each(policy.on(exec), rng, f) | tt::sync_wait(); + auto result = tt::sync_wait(hpx::ranges::for_each(policy.on(exec), rng, f)); HPX_TEST(hpx::get<0>(*result) == iterator(std::end(c))); // verify values @@ -114,8 +113,8 @@ void test_for_each_explicit_sender(Policy l, ExPolicy&& policy, IteratorTag) using scheduler_t = ex::thread_pool_policy_scheduler; auto exec = ex::explicit_scheduler_executor(scheduler_t(l)); - auto result = ex::just(rng, f) | hpx::ranges::for_each(policy.on(exec)) | - tt::sync_wait(); + auto result = tt::sync_wait( + ex::just(rng, f) | hpx::ranges::for_each(policy.on(exec))); HPX_TEST(hpx::get<0>(*result) == iterator(std::end(c))); // verify values diff --git a/libs/core/algorithms/tests/unit/container_algorithms/foreach_tests.hpp b/libs/core/algorithms/tests/unit/container_algorithms/foreach_tests.hpp index bde9b18ffd8a..e5fc1f1b080b 100644 --- a/libs/core/algorithms/tests/unit/container_algorithms/foreach_tests.hpp +++ b/libs/core/algorithms/tests/unit/container_algorithms/foreach_tests.hpp @@ -352,8 +352,8 @@ void test_for_each_sender(Policy l, ExPolicy&& p, IteratorTag) using scheduler_t = ex::thread_pool_policy_scheduler; auto exec = ex::explicit_scheduler_executor(scheduler_t(l)); - auto result = hpx::get<0>(*(ex::just(rng, f) | - hpx::ranges::for_each(p.on(exec)) | tt::sync_wait())); + auto result = hpx::get<0>( + *tt::sync_wait(ex::just(rng, f) | hpx::ranges::for_each(p.on(exec)))); HPX_TEST(result == iterator(std::end(c))); // verify values @@ -387,7 +387,7 @@ void test_for_each_exception_sender(Policy l, ExPolicy&& p, IteratorTag) using scheduler_t = ex::thread_pool_policy_scheduler; auto exec = ex::explicit_scheduler_executor(scheduler_t(l)); - ex::just(rng, f) | hpx::ranges::for_each(p.on(exec)) | tt::sync_wait(); + tt::sync_wait(ex::just(rng, f) | hpx::ranges::for_each(p.on(exec))); HPX_TEST(false); } @@ -426,7 +426,7 @@ void test_for_each_bad_alloc_sender(Policy l, ExPolicy&& p, IteratorTag) using scheduler_t = ex::thread_pool_policy_scheduler; auto exec = ex::explicit_scheduler_executor(scheduler_t(l)); - ex::just(rng, f) | hpx::ranges::for_each(p.on(exec)) | tt::sync_wait(); + tt::sync_wait(ex::just(rng, f) | hpx::ranges::for_each(p.on(exec))); HPX_TEST(false); } diff --git a/libs/core/async_cuda/include/hpx/async_cuda/transform_stream.hpp b/libs/core/async_cuda/include/hpx/async_cuda/transform_stream.hpp index 1a7da4e4df78..4bf839756ecb 100644 --- a/libs/core/async_cuda/include/hpx/async_cuda/transform_stream.hpp +++ b/libs/core/async_cuda/include/hpx/async_cuda/transform_stream.hpp @@ -132,6 +132,9 @@ namespace hpx { namespace cuda { namespace experimental { template struct transform_stream_receiver { +#if defined(HPX_HAVE_STDEXEC) + using receiver_concept = hpx::execution::experimental::receiver_t; +#endif std::decay_t r; std::decay_t f; cudaStream_t stream; @@ -251,8 +254,34 @@ namespace hpx { namespace cuda { namespace experimental { HPX_MOVE(r), HPX_MOVE(ep)); }); } + +#if defined(HPX_HAVE_STDEXEC) + template + friend void tag_invoke(hpx::execution::experimental::set_value_t, + transform_stream_receiver&& r, Ts&&... ts) noexcept + { + // set_value is in a member function only because of a + // compiler bug in GCC 7. When the body of set_value is + // inlined here compilation fails with an internal compiler + // error. + r.set_value(HPX_FORWARD(Ts, ts)...); + } +#endif }; +#if !defined(HPX_HAVE_STDEXEC) + // This should be a hidden friend in transform_stream_receiver. However, + // nvcc does not know how to compile it with some argument types + // ("error: no instance of overloaded function std::forward matches the + // argument list"). + template + void tag_invoke(hpx::execution::experimental::set_value_t, + transform_stream_receiver&& r, Ts&&... ts) + { + r.set_value(HPX_FORWARD(Ts, ts)...); + } +#endif + template struct transform_stream_sender { @@ -260,12 +289,72 @@ namespace hpx { namespace cuda { namespace experimental { std::decay_t f; cudaStream_t stream{}; +#if defined(HPX_HAVE_STDEXEC) + using sender_concept = hpx::execution::experimental::sender_t; + + template + struct invoke_function_transformation_helper + { + template + struct set_value_void_checked + { + using type = hpx::execution::experimental::set_value_t(T); + }; + + template + struct set_value_void_checked + { + using type = hpx::execution::experimental::set_value_t(); + }; + + static_assert(hpx::is_invocable_v, + "F not invocable with the value_types specified."); + + using result_type = + hpx::util::invoke_result_t; + using set_value_result_type = + typename set_value_void_checked, + result_type>::type; + using type = + hpx::execution::experimental::completion_signatures< + set_value_result_type>; + }; + + template + using invoke_function_transformation = + invoke_function_transformation_helper::type; + + // clang-format off + template + friend auto tag_invoke( + hpx::execution::experimental::get_completion_signatures_t, + transform_stream_sender const&, Env const&) + -> hpx::execution::experimental::transform_completion_signatures_of< + S, Env, + hpx::execution::experimental::completion_signatures< + hpx::execution::experimental::set_error_t(std::exception_ptr) + >, + invoke_function_transformation + // stop and error channel will be forwarded if they are present. + >; + // clang-format on + + friend constexpr auto tag_invoke( + hpx::execution::experimental::get_env_t, + transform_stream_sender const& s) noexcept + { + return hpx::execution::experimental::get_env(s.s); + } +#else template struct invoke_result_helper; template