From 586451da8cbdf54e04a420725982c0d60de42356 Mon Sep 17 00:00:00 2001
From: Chuck Hastings <45364586+ChuckHastings@users.noreply.github.com>
Date: Mon, 6 Nov 2023 10:28:05 -0500
Subject: [PATCH] MTMG multi node (#3932)

This PR extends the MTMG framework to support multi-node configurations.

A multi-node MTMG configuration assumes some externally available mechanism
for communicating configuration parameters and the NCCL unique id between
the participating processes. For testing purposes we assume a shared file
system (NFS is sufficient) where we can create a directory and read/write
configuration information, rather than constructing a more complex
communication mechanism. Two short sketches following the patch illustrate
the file-based id exchange and the per-process GPU registration pattern.

Closes https://github.com/rapidsai/graph_dl/issues/329

Authors:
  - Chuck Hastings (https://github.com/ChuckHastings)
  - Ralph Liu (https://github.com/nv-rliu)

Approvers:
  - Joseph Nke (https://github.com/jnke2016)
  - Seunghwa Kang (https://github.com/seunghwak)

URL: https://github.com/rapidsai/cugraph/pull/3932
---
 cpp/include/cugraph/mtmg/handle.hpp           |   8 -
 cpp/include/cugraph/mtmg/instance_manager.hpp |  27 +-
 cpp/include/cugraph/mtmg/resource_manager.hpp | 124 +++--
 cpp/tests/CMakeLists.txt                      |  10 +
 cpp/tests/mtmg/multi_node_threaded_test.cu    | 516 ++++++++++++++++++
 5 files changed, 626 insertions(+), 59 deletions(-)
 create mode 100644 cpp/tests/mtmg/multi_node_threaded_test.cu

diff --git a/cpp/include/cugraph/mtmg/handle.hpp b/cpp/include/cugraph/mtmg/handle.hpp
index efdec3f0775..6223de1781d 100644
--- a/cpp/include/cugraph/mtmg/handle.hpp
+++ b/cpp/include/cugraph/mtmg/handle.hpp
@@ -111,14 +111,6 @@ class handle_t {
    */
   int get_size() const { return raft_handle_.get_comms().get_size(); }
 
-  /**
-   * @brief Get number of local gpus
-   *
-   * @return number of local gpus
-   */
-  // FIXME: wrong for multi-node
-  int get_local_size() const { return raft_handle_.get_comms().get_size(); }
-
   /**
    * @brief Get gpu rank
    *
diff --git a/cpp/include/cugraph/mtmg/instance_manager.hpp b/cpp/include/cugraph/mtmg/instance_manager.hpp
index 687c5ddbf02..f819a5a0abe 100644
--- a/cpp/include/cugraph/mtmg/instance_manager.hpp
+++ b/cpp/include/cugraph/mtmg/instance_manager.hpp
@@ -18,7 +18,7 @@
 
 #include
 
-#include
+#include
 
 #include
 
@@ -37,16 +37,27 @@ class instance_manager_t {
    */
   instance_manager_t(std::vector<std::unique_ptr<raft::handle_t>>&& handles,
                      std::vector<std::unique_ptr<ncclComm_t>>&& nccl_comms,
-                     std::vector<rmm::cuda_device_id>&& device_ids,
-                     int local_gpu_count)
+                     std::vector<rmm::cuda_device_id>&& device_ids)
     : thread_counter_{0},
       raft_handle_{std::move(handles)},
       nccl_comms_{std::move(nccl_comms)},
-      device_ids_{std::move(device_ids)},
-      local_gpu_count_{local_gpu_count}
+      device_ids_{std::move(device_ids)}
   {
   }
 
+  ~instance_manager_t()
+  {
+    int current_device{};
+    RAFT_CUDA_TRY(cudaGetDevice(&current_device));
+
+    for (size_t i = 0; i < nccl_comms_.size(); ++i) {
+      RAFT_CUDA_TRY(cudaSetDevice(device_ids_[i].value()));
+      RAFT_NCCL_TRY(ncclCommDestroy(*nccl_comms_[i]));
+    }
+
+    RAFT_CUDA_TRY(cudaSetDevice(current_device));
+  }
+
   /**
    * @brief Get handle
    *
@@ -79,7 +90,7 @@ class instance_manager_t {
   /**
    * @brief Number of local GPUs in the instance
    */
-  int get_local_gpu_count() { return local_gpu_count_; }
+  int get_local_gpu_count() { return static_cast<int>(raft_handle_.size()); }
 
  private:
   // FIXME: Should this be an std::map<> where the key is the rank?
@@ -87,9 +98,11 @@ class instance_manager_t {
   //        (or no) GPUs, so mapping rank to a handle might be a challenge
   //
   std::vector<std::unique_ptr<raft::handle_t>> raft_handle_{};
+
+  // FIXME: Explore what RAFT changes might be desired to allow the ncclComm_t
+  //        to be managed by RAFT instead of cugraph::mtmg
   std::vector<std::unique_ptr<ncclComm_t>> nccl_comms_{};
   std::vector<rmm::cuda_device_id> device_ids_{};
-  int local_gpu_count_{};
 
   std::atomic<int> thread_counter_{0};
 };
diff --git a/cpp/include/cugraph/mtmg/resource_manager.hpp b/cpp/include/cugraph/mtmg/resource_manager.hpp
index e1e1d7ffc9d..127944cf7ba 100644
--- a/cpp/include/cugraph/mtmg/resource_manager.hpp
+++ b/cpp/include/cugraph/mtmg/resource_manager.hpp
@@ -41,6 +41,11 @@ namespace mtmg {
  * register_local_gpu (or register_remote_gpu once we support a multi-node
 * configuration) to allocate resources that can be used in the mtmg space.
  *
+ * Each GPU in the cluster should be given a unique global rank, an integer
+ * that will be used to reference the GPU within the resource manager.  It
+ * is recommended that the GPUs be numbered sequentially from 0, although this
+ * is not required.
+ *
  * When we want to execute some graph computations, we need to create an instance for execution.
  * Based on how big a subset of the desired compute resources is desired, we can allocate some
  * number of GPUs to the problem (up to the total set of managed resources).
@@ -48,7 +53,7 @@ namespace mtmg {
  * The returned instance can be used to create a graph, execute one or more algorithms, etc.  Once
  * we are done the caller can delete the instance.
  *
- * At the moment, the caller is assumed to be responsible for scheduling use of the resources.
+ * The caller is assumed to be responsible for scheduling use of the resources.
 *
 * For our first release, we will only consider a single node multi-GPU configuration, so the remote
 * GPU methods are currently disabled via ifdef.
@@ -63,25 +68,28 @@ class resource_manager_t {
   /**
    * @brief add a local GPU to the resource manager.
    *
-   * @param rank       The rank to assign to the local GPU
-   * @param device_id  The device_id corresponding to this rank
+   * @param global_rank     The global rank to assign to the local GPU
+   * @param local_device_id The local device_id corresponding to this rank
    */
-  void register_local_gpu(int rank, rmm::cuda_device_id device_id)
+  void register_local_gpu(int global_rank, rmm::cuda_device_id local_device_id)
   {
     std::lock_guard<std::mutex> lock(lock_);
 
-    CUGRAPH_EXPECTS(local_rank_map_.find(rank) == local_rank_map_.end(),
-                    "cannot register same rank multiple times");
+    CUGRAPH_EXPECTS(remote_rank_set_.find(global_rank) == remote_rank_set_.end(),
+                    "cannot register same global_rank as local and remote");
+    CUGRAPH_EXPECTS(local_rank_map_.find(global_rank) == local_rank_map_.end(),
+                    "cannot register same global_rank multiple times");
 
     int num_gpus_this_node;
     RAFT_CUDA_TRY(cudaGetDeviceCount(&num_gpus_this_node));
 
-    CUGRAPH_EXPECTS((device_id.value() >= 0) && (device_id.value() < num_gpus_this_node),
-                    "device id out of range");
+    CUGRAPH_EXPECTS(
+      (local_device_id.value() >= 0) && (local_device_id.value() < num_gpus_this_node),
+      "local device id out of range");
 
-    local_rank_map_.insert(std::pair(rank, device_id));
+    local_rank_map_.insert(std::pair(global_rank, local_device_id));
 
-    RAFT_CUDA_TRY(cudaSetDevice(device_id.value()));
+    RAFT_CUDA_TRY(cudaSetDevice(local_device_id.value()));
 
     // FIXME: There is a bug in the cuda_memory_resource that results in a Hang.
     //        using the pool resource as a work-around.
@@ -89,23 +97,43 @@ class resource_manager_t {
     //        There is a deprecated environment variable: NCCL_LAUNCH_MODE=GROUP
     //        which should temporarily work around this problem.
     //
+    //        Further NOTE: multi-node requires the NCCL_LAUNCH_MODE=GROUP feature
+    //        to be enabled even with the pool memory resource.
+    //
     //        Ultimately there should be some RMM parameters passed into this function
     //        (or the constructor of the object) to configure this behavior
 #if 0
     auto per_device_it = per_device_rmm_resources_.insert(
-      std::pair{rank, std::make_shared<rmm::mr::cuda_memory_resource>()});
+      std::pair{global_rank, std::make_shared<rmm::mr::cuda_memory_resource>()});
 #else
     auto const [free, total] = rmm::detail::available_device_memory();
     auto const min_alloc =
       rmm::detail::align_down(std::min(free, total / 6), rmm::detail::CUDA_ALLOCATION_ALIGNMENT);
 
     auto per_device_it = per_device_rmm_resources_.insert(
-      std::pair{rank,
+      std::pair{global_rank,
                 rmm::mr::make_owning_wrapper<rmm::mr::pool_memory_resource>(
                   std::make_shared<rmm::mr::cuda_memory_resource>(), min_alloc)});
 #endif
 
-    rmm::mr::set_per_device_resource(device_id, per_device_it.first->second.get());
+    rmm::mr::set_per_device_resource(local_device_id, per_device_it.first->second.get());
+  }
+
+  /**
+   * @brief add a remote GPU to the resource manager.
+   *
+   * @param global_rank The global rank to assign to the remote GPU
+   */
+  void register_remote_gpu(int global_rank)
+  {
+    std::lock_guard<std::mutex> lock(lock_);
+
+    CUGRAPH_EXPECTS(local_rank_map_.find(global_rank) == local_rank_map_.end(),
+                    "cannot register same global_rank as local and remote");
+    CUGRAPH_EXPECTS(remote_rank_set_.find(global_rank) == remote_rank_set_.end(),
+                    "cannot register same global_rank multiple times");
+
+    remote_rank_set_.insert(global_rank);
   }
 
   /**
@@ -130,29 +158,36 @@ class resource_manager_t {
     ncclUniqueId instance_manager_id,
     size_t n_streams = 16) const
   {
-    std::for_each(
-      ranks_to_include.begin(), ranks_to_include.end(), [local_ranks = local_rank_map_](int rank) {
-        CUGRAPH_EXPECTS(local_ranks.find(rank) != local_ranks.end(),
-                        "requesting inclusion of an invalid rank");
-      });
+    std::vector<int> local_ranks_to_include;
+
+    std::copy_if(ranks_to_include.begin(),
+                 ranks_to_include.end(),
+                 std::back_inserter(local_ranks_to_include),
+                 [&local_ranks = local_rank_map_](int rank) {
+                   return (local_ranks.find(rank) != local_ranks.end());
+                 });
 
+    // FIXME: Explore what RAFT changes might be desired to allow the ncclComm_t
+    //        to be managed by RAFT instead of cugraph::mtmg
     std::vector<std::unique_ptr<ncclComm_t>> nccl_comms{};
     std::vector<std::unique_ptr<raft::handle_t>> handles{};
     std::vector<rmm::cuda_device_id> device_ids{};
 
-    nccl_comms.reserve(ranks_to_include.size());
-    handles.reserve(ranks_to_include.size());
-    device_ids.reserve(ranks_to_include.size());
+    nccl_comms.reserve(local_ranks_to_include.size());
+    handles.reserve(local_ranks_to_include.size());
+    device_ids.reserve(local_ranks_to_include.size());
 
-    // FIXME: not quite right for multi-node
     auto gpu_row_comm_size = static_cast<int>(sqrt(static_cast<double>(ranks_to_include.size())));
     while (ranks_to_include.size() % gpu_row_comm_size != 0) {
       --gpu_row_comm_size;
     }
 
-    // FIXME: not quite right for multi-node
-    for (size_t i = 0; i < ranks_to_include.size(); ++i) {
-      int rank = ranks_to_include[i];
+    int current_device{};
+    RAFT_CUDA_TRY(cudaGetDevice(&current_device));
+    RAFT_NCCL_TRY(ncclGroupStart());
+
+    for (size_t i = 0; i < local_ranks_to_include.size(); ++i) {
+      int rank = local_ranks_to_include[i];
       auto pos  = local_rank_map_.find(rank);
       RAFT_CUDA_TRY(cudaSetDevice(pos->second.value()));
 
@@ -162,26 +197,28 @@ class resource_manager_t {
                                          std::make_shared<rmm::cuda_stream_pool>(n_streams),
                                          per_device_rmm_resources_.find(rank)->second));
       device_ids.push_back(pos->second);
+
+      RAFT_NCCL_TRY(
+        ncclCommInitRank(nccl_comms[i].get(), ranks_to_include.size(), instance_manager_id, rank));
+      raft::comms::build_comms_nccl_only(
+        handles[i].get(), *nccl_comms[i], ranks_to_include.size(), rank);
     }
+    RAFT_NCCL_TRY(ncclGroupEnd());
+    RAFT_CUDA_TRY(cudaSetDevice(current_device));
 
     std::vector<std::thread> running_threads;
 
-    for (size_t i = 0; i < ranks_to_include.size(); ++i) {
+    for (size_t i = 0; i < local_ranks_to_include.size(); ++i) {
       running_threads.emplace_back([instance_manager_id,
                                     idx = i,
                                     gpu_row_comm_size,
                                     comm_size = ranks_to_include.size(),
-                                    &ranks_to_include,
-                                    &local_rank_map = local_rank_map_,
+                                    &local_ranks_to_include,
+                                    &device_ids,
                                     &nccl_comms,
                                     &handles]() {
-        int rank = ranks_to_include[idx];
-        auto pos  = local_rank_map.find(rank);
-        RAFT_CUDA_TRY(cudaSetDevice(pos->second.value()));
-
-        NCCL_TRY(ncclCommInitRank(nccl_comms[idx].get(), comm_size, instance_manager_id, rank));
-
-        raft::comms::build_comms_nccl_only(handles[idx].get(), *nccl_comms[idx], comm_size, rank);
+        int rank = local_ranks_to_include[idx];
+        RAFT_CUDA_TRY(cudaSetDevice(device_ids[idx].value()));
 
         cugraph::partition_manager::init_subcomm(*handles[idx], gpu_row_comm_size);
       });
@@ -189,9 +226,8 @@ class resource_manager_t {
 
     std::for_each(running_threads.begin(), running_threads.end(), [](auto& t) { t.join(); });
 
-    // FIXME: Update for multi-node
     return std::make_unique<instance_manager_t>(
-      std::move(handles), std::move(nccl_comms), std::move(device_ids), ranks_to_include.size());
+      std::move(handles), std::move(nccl_comms), std::move(device_ids));
   }
 
   /**
@@ -203,24 +239,24 @@ class resource_manager_t {
   {
     std::lock_guard<std::mutex> lock(lock_);
 
-    //
-    // C++20 mechanism:
-    //  return std::vector<int>{ std::views::keys(local_rank_map_).begin(),
-    //                           std::views::keys(local_rank_map_).end() };
-    //  Would need a bit more complicated to handle remote_rank_map_ also
-    //
-    std::vector<int> registered_ranks(local_rank_map_.size());
+    std::vector<int> registered_ranks(local_rank_map_.size() + remote_rank_set_.size());
     std::transform(
       local_rank_map_.begin(), local_rank_map_.end(), registered_ranks.begin(), [](auto pair) {
         return pair.first;
       });
 
+    std::copy(remote_rank_set_.begin(),
+              remote_rank_set_.end(),
+              registered_ranks.begin() + local_rank_map_.size());
+
+    std::sort(registered_ranks.begin(), registered_ranks.end());
     return registered_ranks;
   }
 
  private:
   mutable std::mutex lock_{};
   std::map<int, rmm::cuda_device_id> local_rank_map_{};
+  std::set<int> remote_rank_set_{};
 
   std::map<int, std::shared_ptr<rmm::mr::device_memory_resource>> per_device_rmm_resources_{};
 };
diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt
index 6775ed2eb16..2f69cf9cb0d 100644
--- a/cpp/tests/CMakeLists.txt
+++ b/cpp/tests/CMakeLists.txt
@@ -680,6 +680,16 @@ if(BUILD_CUGRAPH_MG_TESTS)
     ConfigureCTestMG(MG_CAPI_TWO_HOP_NEIGHBORS_TEST c_api/mg_two_hop_neighbors_test.c)
 
     rapids_test_install_relocatable(INSTALL_COMPONENT_SET testing_mg
                                     DESTINATION bin/gtests/libcugraph_mg)
+
+    ###############################################################################################
+    # - Multi-node MTMG tests ---------------------------------------------------------------------
+    ConfigureTest(MTMG_MULTINODE_TEST mtmg/multi_node_threaded_test.cu utilities/mg_utilities.cpp)
+    target_link_libraries(MTMG_MULTINODE_TEST
+        PRIVATE
+        cugraphmgtestutil
+        UCP::UCP
+    )
+
 endif()
 
 ###################################################################################################
diff --git a/cpp/tests/mtmg/multi_node_threaded_test.cu b/cpp/tests/mtmg/multi_node_threaded_test.cu
new file mode 100644
index 00000000000..e5a7de07781
--- /dev/null
+++ b/cpp/tests/mtmg/multi_node_threaded_test.cu
@@ -0,0 +1,516 @@
+/*
+ * Copyright (c) 2023, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include
+#include
+#include
+#include
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include
+#include
+
+#include
+
+#include
+
+#include
+#include
+#include
+
+#include
+#include
+
+struct Multithreaded_Usecase {
+  bool test_weighted{false};
+  bool check_correctness{true};
+};
+
+// Global variable defining resource manager
+static cugraph::mtmg::resource_manager_t g_resource_manager{};
+static int g_node_rank{-1};
+static int g_num_nodes{-1};
+
+template <typename input_usecase_t>
+class Tests_Multithreaded
+  : public ::testing::TestWithParam<std::tuple<Multithreaded_Usecase, input_usecase_t>> {
+ public:
+  Tests_Multithreaded() {}
+
+  virtual void SetUp() {}
+  virtual void TearDown() {}
+
+  std::vector<int> get_gpu_list()
+  {
+    int num_gpus_per_node{1};
+    RAFT_CUDA_TRY(cudaGetDeviceCount(&num_gpus_per_node));
+
+    std::vector<int> gpu_list(num_gpus_per_node);
+    std::iota(gpu_list.begin(), gpu_list.end(), 0);
+
+    return gpu_list;
+  }
+
+  template <typename vertex_t,
+            typename edge_t,
+            typename weight_t,
+            typename result_t,
+            bool multi_gpu>
+  void run_current_test(
+    std::tuple<Multithreaded_Usecase const&, input_usecase_t const&> const& param,
+    std::vector<int> gpu_list)
+  {
+    using edge_type_t = int32_t;
+
+    constexpr bool renumber           = true;
+    constexpr bool do_expensive_check = false;
+
+    auto [multithreaded_usecase, input_usecase] = param;
+
+    raft::handle_t handle{};
+
+    result_t constexpr alpha{0.85};
+    result_t constexpr epsilon{1e-6};
+
+    size_t device_buffer_size{64 * 1024 * 1024};
+    size_t thread_buffer_size{4 * 1024 * 1024};
+
+    int num_local_gpus = gpu_list.size();
+    int num_threads    = num_local_gpus * 4;
+
+    ncclUniqueId instance_manager_id{};
+
+    if (g_node_rank == 0) RAFT_NCCL_TRY(ncclGetUniqueId(&instance_manager_id));
+
+    RAFT_MPI_TRY(
+      MPI_Bcast(&instance_manager_id, sizeof(instance_manager_id), MPI_CHAR, 0, MPI_COMM_WORLD));
+
+    auto instance_manager = g_resource_manager.create_instance_manager(
+      g_resource_manager.registered_ranks(), instance_manager_id);
+
+    cugraph::mtmg::edgelist_t<vertex_t, weight_t, edge_t, edge_type_t> edgelist;
+    cugraph::mtmg::graph_t<vertex_t, edge_t, true, multi_gpu> graph;
+    cugraph::mtmg::graph_view_t<vertex_t, edge_t, true, multi_gpu> graph_view;
+    cugraph::mtmg::vertex_result_t<result_t> pageranks;
+    std::optional<cugraph::mtmg::renumber_map_t<vertex_t>> renumber_map =
+      std::make_optional<cugraph::mtmg::renumber_map_t<vertex_t>>();
+
+    auto edge_weights = multithreaded_usecase.test_weighted
+                          ? std::make_optional<cugraph::mtmg::edge_property_t<
+                              cugraph::mtmg::graph_view_t<vertex_t, edge_t, true, multi_gpu>,
+                              weight_t>>()
+                          : std::nullopt;
+
+    //
+    // Simulate graph creation by spawning threads to walk through the
+    // local COO and add edges
+    //
+    std::vector<std::thread> running_threads;
+
+    // Initialize shared edgelist object, one per GPU
+    for (int i = 0; i < num_local_gpus; ++i) {
+      running_threads.emplace_back([&instance_manager,
+                                    &edgelist,
+                                    device_buffer_size,
+                                    use_weight    = true,
+                                    use_edge_id   = false,
+                                    use_edge_type = false]() {
+        auto thread_handle = instance_manager->get_handle();
+
+        edgelist.set(thread_handle, device_buffer_size, use_weight, use_edge_id, use_edge_type);
+      });
+    }
+
+    // Wait for CPU threads to complete
+    std::for_each(running_threads.begin(), running_threads.end(), [](auto& t) { t.join(); });
+    running_threads.resize(0);
+    instance_manager->reset_threads();
+
+    // Load SG edge list
+    auto [d_src_v, d_dst_v, d_weights_v, d_vertices_v, is_symmetric] =
+      input_usecase.template construct_edgelist<vertex_t, weight_t>(
+        handle, multithreaded_usecase.test_weighted, false, false);
+
+    auto h_src_v         = cugraph::test::to_host(handle, d_src_v);
+    auto h_dst_v         = cugraph::test::to_host(handle, d_dst_v);
+    auto h_weights_v     = cugraph::test::to_host(handle, d_weights_v);
+    auto unique_vertices = cugraph::test::to_host(handle, d_vertices_v);
+
+    // Load edgelist from different threads.  We'll use more threads than GPUs here
+    for (int i = 0; i < num_threads; ++i) {
+      running_threads.emplace_back([&instance_manager,
+                                    thread_buffer_size,
+                                    &edgelist,
+                                    &h_src_v,
+                                    &h_dst_v,
+                                    &h_weights_v,
+                                    starting_edge_offset = g_node_rank * num_threads + i,
+                                    stride               = g_num_nodes * num_threads]() {
+        auto thread_handle = instance_manager->get_handle();
+        cugraph::mtmg::per_thread_edgelist_t<vertex_t, weight_t, edge_t, edge_type_t>
+          per_thread_edgelist(edgelist.get(thread_handle), thread_buffer_size);
+
+        for (size_t j = starting_edge_offset; j < h_src_v.size(); j += stride) {
+          per_thread_edgelist.append(
+            thread_handle,
+            h_src_v[j],
+            h_dst_v[j],
+            h_weights_v ? std::make_optional((*h_weights_v)[j]) : std::nullopt,
+            std::nullopt,
+            std::nullopt);
+        }
+
+        per_thread_edgelist.flush(thread_handle);
+      });
+    }
+
+    // Wait for CPU threads to complete
+    std::for_each(running_threads.begin(), running_threads.end(), [](auto& t) { t.join(); });
+    running_threads.resize(0);
+    instance_manager->reset_threads();
+
+    for (int i = 0; i < num_local_gpus; ++i) {
+      running_threads.emplace_back([&instance_manager,
+                                    &graph,
+                                    &edge_weights,
+                                    &edgelist,
+                                    &renumber_map,
+                                    &pageranks,
+                                    &h_src_v,  // debugging
+                                    is_symmetric = is_symmetric,
+                                    renumber,
+                                    do_expensive_check]() {
+        auto thread_handle = instance_manager->get_handle();
+
+        if (thread_handle.get_thread_rank() > 0) return;
+
+        std::optional<cugraph::mtmg::edge_property_t<
+          cugraph::mtmg::graph_view_t<vertex_t, edge_t, true, multi_gpu>,
+          edge_t>>
+          edge_ids{std::nullopt};
+        std::optional<cugraph::mtmg::edge_property_t<
+          cugraph::mtmg::graph_view_t<vertex_t, edge_t, true, multi_gpu>,
+          int32_t>>
+          edge_types{std::nullopt};
+
+        edgelist.finalize_buffer(thread_handle);
+        edgelist.consolidate_and_shuffle(thread_handle, true);
+
+        cugraph::mtmg::
+          create_graph_from_edgelist<vertex_t, edge_t, weight_t, edge_t, edge_type_t, true, multi_gpu>(
+            thread_handle,
+            edgelist,
+            cugraph::graph_properties_t{is_symmetric, true},
+            renumber,
+            graph,
+            edge_weights,
+            edge_ids,
+            edge_types,
+            renumber_map,
+            do_expensive_check);
+      });
+    }
+
+    // Wait for CPU threads to complete
+    std::for_each(running_threads.begin(), running_threads.end(), [](auto& t) { t.join(); });
+    running_threads.resize(0);
+    instance_manager->reset_threads();
+
+    graph_view = graph.view();
+
+    for (int i = 0; i < num_threads; ++i) {
+      running_threads.emplace_back(
+        [&instance_manager, &graph_view, &edge_weights, &pageranks, alpha, epsilon]() {
+          auto thread_handle = instance_manager->get_handle();
+
+          if (thread_handle.get_thread_rank() > 0) return;
+
+          auto [local_pageranks, metadata] =
+            cugraph::pagerank<vertex_t, edge_t, weight_t, weight_t, multi_gpu>(
+              thread_handle.raft_handle(),
+              graph_view.get(thread_handle),
+              edge_weights ? std::make_optional(edge_weights->get(thread_handle).view())
+                           : std::nullopt,
+              std::nullopt,
+              std::nullopt,
+              std::nullopt,
+              alpha,
+              epsilon,
+              500,
+              true);
+
+          pageranks.set(thread_handle, std::move(local_pageranks));
+        });
+    }
+
+    // Wait for CPU threads to complete
+    std::for_each(running_threads.begin(), running_threads.end(), [](auto& t) { t.join(); });
+    running_threads.resize(0);
+    instance_manager->reset_threads();
+
+    std::vector<std::tuple<std::vector<vertex_t>, std::vector<result_t>>> computed_pageranks_v;
+    std::mutex computed_pageranks_lock{};
+
+    auto pageranks_view    = pageranks.view();
+    auto renumber_map_view = renumber_map ? std::make_optional(renumber_map->view()) : std::nullopt;
+
+    // Load computed_pageranks from different threads.
+    for (int i = 0; i < num_local_gpus; ++i) {
+      running_threads.emplace_back([&instance_manager,
+                                    &graph_view,
+                                    &renumber_map_view,
+                                    &pageranks_view,
+                                    &computed_pageranks_lock,
+                                    &computed_pageranks_v,
+                                    &h_src_v,
+                                    &h_dst_v,
+                                    &h_weights_v,
+                                    &unique_vertices,
+                                    i,
+                                    num_threads]() {
+        auto thread_handle = instance_manager->get_handle();
+
+        auto number_of_vertices = unique_vertices->size();
+
+        std::vector<vertex_t> my_vertex_list;
+        my_vertex_list.reserve((number_of_vertices + num_threads - 1) / num_threads);
+
+        for (size_t j = i; j < number_of_vertices; j += num_threads) {
+          my_vertex_list.push_back((*unique_vertices)[j]);
+        }
+
+        rmm::device_uvector<vertex_t> d_my_vertex_list(my_vertex_list.size(),
+                                                       thread_handle.raft_handle().get_stream());
+        raft::update_device(d_my_vertex_list.data(),
+                            my_vertex_list.data(),
+                            my_vertex_list.size(),
+                            thread_handle.raft_handle().get_stream());
+
+        auto d_my_pageranks = pageranks_view.gather(
+          thread_handle,
+          raft::device_span<vertex_t const>{d_my_vertex_list.data(), d_my_vertex_list.size()},
+          graph_view,
+          renumber_map_view);
+
+        std::vector<result_t> my_pageranks(d_my_pageranks.size());
+        raft::update_host(my_pageranks.data(),
+                          d_my_pageranks.data(),
+                          d_my_pageranks.size(),
+                          thread_handle.raft_handle().get_stream());
+
+        {
+          std::lock_guard<std::mutex> lock(computed_pageranks_lock);
+          computed_pageranks_v.push_back(
+            std::make_tuple(std::move(my_vertex_list), std::move(my_pageranks)));
+        }
+      });
+    }
+
+    // Wait for CPU threads to complete
+    std::for_each(running_threads.begin(), running_threads.end(), [](auto& t) { t.join(); });
+    running_threads.resize(0);
+    instance_manager->reset_threads();
+
+    if (multithreaded_usecase.check_correctness) {
+      // Want to compare the results in computed_pageranks_v with SG results
+      cugraph::graph_t<vertex_t, edge_t, true, false> sg_graph(handle);
+      std::optional<
+        cugraph::edge_property_t<cugraph::graph_view_t<vertex_t, edge_t, true, false>, weight_t>>
+        sg_edge_weights{std::nullopt};
+      std::optional<rmm::device_uvector<vertex_t>> sg_renumber_map{std::nullopt};
+
+      std::tie(sg_graph, sg_edge_weights, std::ignore, std::ignore, sg_renumber_map) = cugraph::
+        create_graph_from_edgelist<vertex_t, edge_t, weight_t, edge_t, int32_t, true, false>(
+          handle,
+          std::nullopt,
+          std::move(d_src_v),
+          std::move(d_dst_v),
+          std::move(d_weights_v),
+          std::nullopt,
+          std::nullopt,
+          cugraph::graph_properties_t{is_symmetric, true},
+          true);
+
+      auto [sg_pageranks, meta] = cugraph::pagerank<vertex_t, edge_t, weight_t, weight_t, false>(
+        handle,
+        sg_graph.view(),
+        sg_edge_weights ? std::make_optional(sg_edge_weights->view()) : std::nullopt,
+        std::nullopt,
+        std::nullopt,
+        std::nullopt,
+        alpha,
+        epsilon);
+
+      auto h_sg_pageranks    = cugraph::test::to_host(handle, sg_pageranks);
+      auto h_sg_renumber_map = cugraph::test::to_host(handle, sg_renumber_map);
+      auto compare_functor   = cugraph::test::nearly_equal<weight_t>{
+        weight_t{1e-3},
+        weight_t{(weight_t{1} / static_cast<weight_t>(h_sg_pageranks.size())) * weight_t{1e-3}}};
+
+      std::for_each(computed_pageranks_v.begin(),
+                    computed_pageranks_v.end(),
+                    [h_sg_pageranks, compare_functor, h_sg_renumber_map](auto t1) {
+                      std::for_each(
+                        thrust::make_zip_iterator(std::get<0>(t1).begin(), std::get<1>(t1).begin()),
+                        thrust::make_zip_iterator(std::get<0>(t1).end(), std::get<1>(t1).end()),
+                        [h_sg_pageranks, compare_functor, h_sg_renumber_map](auto t2) {
+                          vertex_t v  = thrust::get<0>(t2);
+                          weight_t pr = thrust::get<1>(t2);
+
+                          auto pos =
+                            std::find(h_sg_renumber_map->begin(), h_sg_renumber_map->end(), v);
+                          auto offset = std::distance(h_sg_renumber_map->begin(), pos);
+
+                          if (pos == h_sg_renumber_map->end()) {
+                            ASSERT_TRUE(compare_functor(pr, weight_t{0}))
+                              << "vertex " << v << ", SG result = " << h_sg_pageranks[offset]
+                              << ", mtmg result = " << pr << ", not in renumber map";
+                          } else {
+                            ASSERT_TRUE(compare_functor(pr, h_sg_pageranks[offset]))
+                              << "vertex " << v << ", SG result = " << h_sg_pageranks[offset]
+                              << ", mtmg result = " << pr
+                              << ", renumber map = " << (*h_sg_renumber_map)[offset];
+                          }
+                        });
+                    });
+    }
+  }
+};
+
+using Tests_Multithreaded_File = Tests_Multithreaded<cugraph::test::File_Usecase>;
+using Tests_Multithreaded_Rmat = Tests_Multithreaded<cugraph::test::Rmat_Usecase>;
+
+// FIXME: add tests for type combinations
+TEST_P(Tests_Multithreaded_File, CheckInt32Int32FloatFloat)
+{
+  run_current_test<int32_t, int32_t, float, float, true>(
+    override_File_Usecase_with_cmd_line_arguments(GetParam()), get_gpu_list());
+}
+
+TEST_P(Tests_Multithreaded_Rmat, CheckInt32Int32FloatFloat)
+{
+  run_current_test<int32_t, int32_t, float, float, true>(
+    override_Rmat_Usecase_with_cmd_line_arguments(GetParam()), get_gpu_list());
+}
+
+INSTANTIATE_TEST_SUITE_P(file_test,
+                         Tests_Multithreaded_File,
+                         ::testing::Combine(
+                           // enable correctness checks
+                           ::testing::Values(Multithreaded_Usecase{false, true},
+                                             Multithreaded_Usecase{true, true}),
+                           ::testing::Values(cugraph::test::File_Usecase("karate.csv"),
+                                             cugraph::test::File_Usecase("dolphins.csv"))));
+
+INSTANTIATE_TEST_SUITE_P(
+  rmat_small_test,
+  Tests_Multithreaded_Rmat,
+  ::testing::Combine(
+    // enable correctness checks
+    ::testing::Values(Multithreaded_Usecase{false, true}, Multithreaded_Usecase{true, true}),
+    ::testing::Values(cugraph::test::Rmat_Usecase(10, 16, 0.57, 0.19, 0.19, 0, false, false))));
+
+INSTANTIATE_TEST_SUITE_P(
+  file_benchmark_test, /* note that the test filename can be overridden in benchmarking (with
+                          --gtest_filter to select only the file_benchmark_test with a specific
+                          vertex & edge type combination) by command line arguments and do not
+                          include more than one File_Usecase that differ only in filename
+                          (to avoid running same benchmarks more than once) */
+  Tests_Multithreaded_File,
+  ::testing::Combine(
+    // disable correctness checks
+    ::testing::Values(Multithreaded_Usecase{false, false}, Multithreaded_Usecase{true, false}),
+    ::testing::Values(cugraph::test::File_Usecase("test/datasets/karate.mtx"))));
+
+INSTANTIATE_TEST_SUITE_P(
+  rmat_benchmark_test, /* note that scale & edge factor can be overridden in benchmarking (with
+                          --gtest_filter to select only the rmat_benchmark_test with a specific
+                          vertex & edge type combination) by command line arguments and do not
+                          include more than one Rmat_Usecase that differ only in scale or edge
+                          factor (to avoid running same benchmarks more than once) */
+  Tests_Multithreaded_Rmat,
+  ::testing::Combine(
+    // disable correctness checks for large graphs
+    ::testing::Values(Multithreaded_Usecase{false, false}, Multithreaded_Usecase{true, false}),
+    ::testing::Values(cugraph::test::Rmat_Usecase(10, 16, 0.57, 0.19, 0.19, 0, false, false))));
+
+//
+// Need to customize the test configuration to support multi-node comms not using MPI
+//
+int main(int argc, char** argv)
+{
+  cugraph::test::initialize_mpi(argc, argv);
+  auto comm_rank = cugraph::test::query_mpi_comm_world_rank();
+  auto comm_size = cugraph::test::query_mpi_comm_world_size();
+
+  ::testing::InitGoogleTest(&argc, argv);
+  auto const cmd_opts = parse_test_options(argc, argv);
+  auto const rmm_mode = cmd_opts["rmm_mode"].as<std::string>();
+  auto resource       = cugraph::test::create_memory_resource(rmm_mode);
+  rmm::mr::set_current_device_resource(resource.get());
+  cugraph::test::g_perf       = cmd_opts["perf"].as<bool>();
+  cugraph::test::g_rmat_scale = (cmd_opts.count("rmat_scale") > 0)
+                                  ? std::make_optional<size_t>(cmd_opts["rmat_scale"].as<size_t>())
+                                  : std::nullopt;
+  cugraph::test::g_rmat_edge_factor =
+    (cmd_opts.count("rmat_edge_factor") > 0)
+      ? std::make_optional<size_t>(cmd_opts["rmat_edge_factor"].as<size_t>())
+      : std::nullopt;
+  cugraph::test::g_test_file_name =
+    (cmd_opts.count("test_file_name") > 0)
+      ? std::make_optional<std::string>(cmd_opts["test_file_name"].as<std::string>())
+      : std::nullopt;
+
+  //
+  // Set global values for the test.  Need to know the rank of this process,
+  // the comm size, number of GPUs per node, and the NCCL Id for rank 0.
+  //
+  int num_gpus_this_node{-1};
+  std::vector<int> num_gpus_per_node{};
+
+  g_node_rank = comm_rank;
+  g_num_nodes = comm_size;
+
+  num_gpus_per_node.resize(comm_size);
+
+  RAFT_CUDA_TRY(cudaGetDeviceCount(&num_gpus_this_node));
+  RAFT_MPI_TRY(MPI_Allgather(
+    &num_gpus_this_node, 1, MPI_INT, num_gpus_per_node.data(), 1, MPI_INT, MPI_COMM_WORLD));
+
+  int node_rank{0};
+
+  for (int i = 0; i < comm_size; ++i) {
+    for (int j = 0; j < num_gpus_per_node[i]; ++j) {
+      if (i != comm_rank)
+        g_resource_manager.register_remote_gpu(node_rank++);
+      else
+        g_resource_manager.register_local_gpu(node_rank++, rmm::cuda_device_id{j});
+    }
+  }
+
+  auto result = RUN_ALL_TESTS();
+  cugraph::test::finalize_mpi();
+  return result;
+}
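
The test above distributes the ncclUniqueId with MPI_Bcast, but the PR
description deliberately leaves the exchange mechanism open: any externally
available channel works. Below is a minimal sketch of the shared-file-system
variant mentioned in the description, assuming a path on a mount visible to
every participating process (NFS is enough); the helper name, path handling,
and polling interval are illustrative assumptions, not part of this patch.

// Hedged sketch: rank 0 generates the ncclUniqueId and publishes it to a file
// on a shared file system; the other processes poll until the file appears
// and read the id back.  Writing to a temporary name and renaming keeps
// readers from observing a partially written id on a POSIX file system.
// Error handling is elided for brevity.
#include <nccl.h>

#include <chrono>
#include <cstdio>
#include <fstream>
#include <string>
#include <thread>

ncclUniqueId exchange_unique_id_via_file(std::string const& path, int node_rank)
{
  ncclUniqueId id{};

  if (node_rank == 0) {
    ncclGetUniqueId(&id);

    std::string tmp_path{path + ".tmp"};
    {
      std::ofstream out(tmp_path, std::ios::binary);
      out.write(id.internal, NCCL_UNIQUE_ID_BYTES);
    }
    std::rename(tmp_path.c_str(), path.c_str());  // publish the complete id
  } else {
    std::ifstream in;
    do {
      std::this_thread::sleep_for(std::chrono::milliseconds{100});
      in.open(path, std::ios::binary);
    } while (!in.is_open());
    in.read(id.internal, NCCL_UNIQUE_ID_BYTES);
  }

  return id;
}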
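
The double loop at the end of main() is the registration pattern every
process in the cluster must follow: all processes agree on a single global
numbering of every GPU, register their own devices as local and every other
node's devices as remote, and then create the instance manager collectively.
The same pattern condensed into a free function; the function name and
arguments are illustrative, while the resource_manager_t calls are the ones
added by this patch.

// Every process walks the same global GPU numbering, registering its own
// devices with register_local_gpu() and all other nodes' devices with
// register_remote_gpu().  create_instance_manager() must then be called by
// every process with the same ranks and the same NCCL unique id.
#include <cugraph/mtmg/instance_manager.hpp>
#include <cugraph/mtmg/resource_manager.hpp>

#include <rmm/cuda_device.hpp>

#include <nccl.h>

#include <memory>
#include <vector>

std::unique_ptr<cugraph::mtmg::instance_manager_t> make_instance(
  cugraph::mtmg::resource_manager_t& resource_manager,
  int my_node,
  std::vector<int> const& gpus_per_node,  // one entry per node, e.g. from MPI_Allgather
  ncclUniqueId instance_manager_id)
{
  int global_rank{0};
  for (std::size_t node = 0; node < gpus_per_node.size(); ++node) {
    for (int gpu = 0; gpu < gpus_per_node[node]; ++gpu) {
      if (static_cast<int>(node) == my_node)
        resource_manager.register_local_gpu(global_rank++, rmm::cuda_device_id{gpu});
      else
        resource_manager.register_remote_gpu(global_rank++);
    }
  }

  // Collective across the cluster: same ranks, same unique id everywhere.
  return resource_manager.create_instance_manager(resource_manager.registered_ranks(),
                                                  instance_manager_id);
}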