diff --git a/cpp/include/cugraph/mtmg/detail/per_device_edgelist.hpp b/cpp/include/cugraph/mtmg/detail/per_device_edgelist.hpp index 8011146ee4f..7fd5bb726e6 100644 --- a/cpp/include/cugraph/mtmg/detail/per_device_edgelist.hpp +++ b/cpp/include/cugraph/mtmg/detail/per_device_edgelist.hpp @@ -16,6 +16,7 @@ #pragma once +#include #include // FIXME: Could use std::span once compiler supports C++20 @@ -58,6 +59,15 @@ class per_device_edgelist_t { per_device_edgelist_t& operator=(per_device_edgelist_t const&) = delete; per_device_edgelist_t& operator=(per_device_edgelist_t&&) = delete; + /** + * @brief Construct a new per device edgelist t object + * + * @param handle MTMG resource handle - used to identify GPU resources + * @param device_buffer_size Number of edges to store in each device buffer + * @param use_weight Whether or not the edgelist will have weights + * @param use_edge_id Whether or not the edgelist will have edge ids + * @param use_edge_type Whether or not the edgelist will have edge types + */ per_device_edgelist_t(cugraph::mtmg::handle_t const& handle, size_t device_buffer_size, bool use_weight, @@ -82,6 +92,11 @@ class per_device_edgelist_t { create_new_buffers(handle); } + /** + * @brief Move construct a new per device edgelist t object + * + * @param other Object to move into this instance + */ per_device_edgelist_t(per_device_edgelist_t&& other) : device_buffer_size_{other.device_buffer_size_}, current_pos_{other.current_pos_}, @@ -110,45 +125,72 @@ class per_device_edgelist_t { std::optional> edge_id, std::optional> edge_type) { - // FIXME: This lock guard could be on a smaller region, but it - // would require more careful coding. The raft::update_device - // calls could be done without the lock if we made a local - // of the values of *.back() and did an increment of current_pos_ - // while we hold the lock. - std::lock_guard lock(lock_); - - size_t count = src.size(); - size_t pos = 0; - - while (count > 0) { - size_t copy_count = std::min(count, (src_.back().size() - current_pos_)); - - raft::update_device( - src_.back().begin() + current_pos_, src.begin() + pos, copy_count, handle.get_stream()); - raft::update_device( - dst_.back().begin() + current_pos_, dst.begin() + pos, copy_count, handle.get_stream()); - if (wgt) - raft::update_device( - wgt_->back().begin() + current_pos_, wgt->begin() + pos, copy_count, handle.get_stream()); - if (edge_id) - raft::update_device(edge_id_->back().begin() + current_pos_, - edge_id->begin() + pos, - copy_count, - handle.get_stream()); - if (edge_type) - raft::update_device(edge_type_->back().begin() + current_pos_, - edge_type->begin() + pos, - copy_count, - handle.get_stream()); - - count -= copy_count; - pos += copy_count; - current_pos_ += copy_count; - - if (current_pos_ == src_.back().size()) { create_new_buffers(handle); } + std::vector> copy_positions; + + { + std::lock_guard lock(lock_); + + size_t count = src.size(); + size_t pos = 0; + + while (count > 0) { + size_t copy_count = std::min(count, (src_.back().size() - current_pos_)); + + copy_positions.push_back(std::make_tuple(src_.size() - 1, current_pos_, pos, copy_count)); + + count -= copy_count; + pos += copy_count; + current_pos_ += copy_count; + + if (current_pos_ == src_.back().size()) { create_new_buffers(handle); } + } } - handle.raft_handle().sync_stream(); + std::for_each(copy_positions.begin(), + copy_positions.end(), + [&handle, + &this_src = src_, + &src, + &this_dst = dst_, + &dst, + &this_wgt = wgt_, + &wgt, + &this_edge_id = edge_id_, + &edge_id, + &this_edge_type = edge_type_, + &edge_type](auto tuple) { + auto [buffer_idx, buffer_pos, input_pos, copy_count] = tuple; + + raft::update_device(this_src[buffer_idx].begin() + buffer_pos, + src.begin() + input_pos, + copy_count, + handle.get_stream()); + + raft::update_device(this_dst[buffer_idx].begin() + buffer_pos, + dst.begin() + input_pos, + copy_count, + handle.get_stream()); + + if (this_wgt) + raft::update_device((*this_wgt)[buffer_idx].begin() + buffer_pos, + wgt->begin() + input_pos, + copy_count, + handle.get_stream()); + + if (this_edge_id) + raft::update_device((*this_edge_id)[buffer_idx].begin() + buffer_pos, + edge_id->begin() + input_pos, + copy_count, + handle.get_stream()); + + if (this_edge_type) + raft::update_device((*this_edge_type)[buffer_idx].begin() + buffer_pos, + edge_type->begin() + input_pos, + copy_count, + handle.get_stream()); + }); + + handle.sync_stream(); } /** diff --git a/cpp/include/cugraph/mtmg/handle.hpp b/cpp/include/cugraph/mtmg/handle.hpp index f23bce5aeac..efdec3f0775 100644 --- a/cpp/include/cugraph/mtmg/handle.hpp +++ b/cpp/include/cugraph/mtmg/handle.hpp @@ -18,6 +18,8 @@ #include +#include + namespace cugraph { namespace mtmg { @@ -60,10 +62,41 @@ class handle_t { rmm::cuda_stream_view get_stream() const { return raft_handle_.is_stream_pool_initialized() - ? raft_handle_.get_stream_from_stream_pool(device_id_) + ? raft_handle_.get_stream_from_stream_pool(thread_rank_) : raft_handle_.get_stream(); } + /** + * @brief Sync on the cuda stream + * + * @param stream Which stream to synchronize (defaults to the stream for this handle) + */ + void sync_stream(rmm::cuda_stream_view stream) const { raft_handle_.sync_stream(stream); } + + /** + * @brief Sync on the cuda stream for this handle + */ + void sync_stream() const { sync_stream(get_stream()); } + + /** + * @brief get thrust policy for the stream + * + * @param stream Which stream to use for this thrust call + * + * @return exec policy using the current stream + */ + rmm::exec_policy get_thrust_policy(rmm::cuda_stream_view stream) const + { + return rmm::exec_policy(stream); + } + + /** + * @brief get thrust policy for the stream for this handle + * + * @return exec policy using the current stream + */ + rmm::exec_policy get_thrust_policy() const { return get_thrust_policy(get_stream()); } + /** * @brief Get thread rank * diff --git a/cpp/include/cugraph/mtmg/instance_manager.hpp b/cpp/include/cugraph/mtmg/instance_manager.hpp index 8bf62b56f4b..687c5ddbf02 100644 --- a/cpp/include/cugraph/mtmg/instance_manager.hpp +++ b/cpp/include/cugraph/mtmg/instance_manager.hpp @@ -54,18 +54,18 @@ class instance_manager_t { * the request. Threads will be assigned to GPUs in a round-robin fashion to * spread requesting threads around the GPU resources. * - * This function will be CPU thread-safe. + * This function is CPU thread-safe. * * @return a handle for this thread. */ handle_t get_handle() { - int local_id = thread_counter_++; + int local_id = thread_counter_++; + int gpu_id = local_id % raft_handle_.size(); + int thread_id = local_id / raft_handle_.size(); - RAFT_CUDA_TRY(cudaSetDevice(device_ids_[local_id % raft_handle_.size()].value())); - return handle_t(*raft_handle_[local_id % raft_handle_.size()], - local_id / raft_handle_.size(), - static_cast(local_id % raft_handle_.size())); + RAFT_CUDA_TRY(cudaSetDevice(device_ids_[gpu_id].value())); + return handle_t(*raft_handle_[gpu_id], thread_id, static_cast(gpu_id)); } /** diff --git a/cpp/include/cugraph/mtmg/resource_manager.hpp b/cpp/include/cugraph/mtmg/resource_manager.hpp index b4633626e7c..e1e1d7ffc9d 100644 --- a/cpp/include/cugraph/mtmg/resource_manager.hpp +++ b/cpp/include/cugraph/mtmg/resource_manager.hpp @@ -23,8 +23,8 @@ #include #include -#include #include +#include #include #include @@ -121,11 +121,14 @@ class resource_manager_t { * @param instance_manager_id a ncclUniqueId that is shared by all processes participating * in this instance. All processes must use the same ID in this call, it is up * to the calling code to share this ID properly before the call. + * @param n_streams The number of streams to create in a stream pool for + * each GPU. Defaults to 16. * * @return unique pointer to instance manager */ - std::unique_ptr create_instance_manager( - std::vector ranks_to_include, ncclUniqueId instance_manager_id) const + std::unique_ptr create_instance_manager(std::vector ranks_to_include, + ncclUniqueId instance_manager_id, + size_t n_streams = 16) const { std::for_each( ranks_to_include.begin(), ranks_to_include.end(), [local_ranks = local_rank_map_](int rank) { @@ -153,11 +156,11 @@ class resource_manager_t { auto pos = local_rank_map_.find(rank); RAFT_CUDA_TRY(cudaSetDevice(pos->second.value())); - raft::handle_t tmp_handle; - nccl_comms.push_back(std::make_unique()); handles.push_back( - std::make_unique(tmp_handle, per_device_rmm_resources_.find(rank)->second)); + std::make_unique(rmm::cuda_stream_per_thread, + std::make_shared(n_streams), + per_device_rmm_resources_.find(rank)->second)); device_ids.push_back(pos->second); } diff --git a/cpp/include/cugraph/mtmg/vertex_result_view.hpp b/cpp/include/cugraph/mtmg/vertex_result_view.hpp index 7a7070d6f2a..a349bb95333 100644 --- a/cpp/include/cugraph/mtmg/vertex_result_view.hpp +++ b/cpp/include/cugraph/mtmg/vertex_result_view.hpp @@ -21,6 +21,8 @@ #include #include +#include + namespace cugraph { namespace mtmg { diff --git a/cpp/src/mtmg/vertex_result.cu b/cpp/src/mtmg/vertex_result.cu index a669a127f41..97fcd291c87 100644 --- a/cpp/src/mtmg/vertex_result.cu +++ b/cpp/src/mtmg/vertex_result.cu @@ -97,7 +97,7 @@ rmm::device_uvector vertex_result_view_t::gather( return vertex_partition.local_vertex_partition_offset_from_vertex_nocheck(v); }); - thrust::gather(handle.raft_handle().get_thrust_policy(), + thrust::gather(handle.get_thrust_policy(), iter, iter + local_vertices.size(), wrapped.begin(), @@ -118,7 +118,7 @@ rmm::device_uvector vertex_result_view_t::gather( // // Finally, reorder result // - thrust::scatter(handle.raft_handle().get_thrust_policy(), + thrust::scatter(handle.get_thrust_policy(), tmp_result.begin(), tmp_result.end(), vertex_pos.begin(), diff --git a/cpp/tests/mtmg/threaded_test.cu b/cpp/tests/mtmg/threaded_test.cu index c5dc2d3c7ce..bc4d8cfef6a 100644 --- a/cpp/tests/mtmg/threaded_test.cu +++ b/cpp/tests/mtmg/threaded_test.cu @@ -94,8 +94,9 @@ class Tests_Multithreaded size_t device_buffer_size{64 * 1024 * 1024}; size_t thread_buffer_size{4 * 1024 * 1024}; + const int num_threads_per_gpu{4}; int num_gpus = gpu_list.size(); - int num_threads = num_gpus * 4; + int num_threads = num_gpus * num_threads_per_gpu; cugraph::mtmg::resource_manager_t resource_manager; @@ -106,8 +107,10 @@ class Tests_Multithreaded ncclUniqueId instance_manager_id; ncclGetUniqueId(&instance_manager_id); + // Currently the only uses for multiple streams for each CPU threads + // associated with a particular GPU, which is a constant set above auto instance_manager = resource_manager.create_instance_manager( - resource_manager.registered_ranks(), instance_manager_id); + resource_manager.registered_ranks(), instance_manager_id, num_threads_per_gpu); cugraph::mtmg::edgelist_t edgelist; cugraph::mtmg::graph_t graph; @@ -172,15 +175,6 @@ class Tests_Multithreaded per_thread_edgelist(edgelist.get(thread_handle), thread_buffer_size); for (size_t j = i; j < h_src_v.size(); j += num_threads) { -#if 0 - if (h_weights_v) { - thread_edgelist.append( - thread_handle, h_src_v[j], h_dst_v[j], (*h_weights_v)[j], std::nullopt, std::nullopt); - } else { - thread_edgelist.append( - thread_handle, h_src_v[j], h_dst_v[j], std::nullopt, std::nullopt, std::nullopt); - } -#endif per_thread_edgelist.append( thread_handle, h_src_v[j],