From f80480dae7aca9c419a7cdf9c6e90a77e056695f Mon Sep 17 00:00:00 2001 From: Chuck Hastings <45364586+ChuckHastings@users.noreply.github.com> Date: Wed, 6 Dec 2023 11:29:47 -0500 Subject: [PATCH 1/2] Add support for Louvain to MTMG (#4033) Added vertex result instantiation for `vertex_t` which is necessary to handle Louvain results. Added an MTMG test for Louvain to demonstrate how to use Louvain in MTMG. Closes https://github.com/rapidsai/graph_dl/issues/330 Authors: - Chuck Hastings (https://github.com/ChuckHastings) Approvers: - Seunghwa Kang (https://github.com/seunghwak) - Joseph Nke (https://github.com/jnke2016) - Naim (https://github.com/naimnv) URL: https://github.com/rapidsai/cugraph/pull/4033 --- .../mtmg/detail/device_shared_wrapper.hpp | 1 - cpp/include/cugraph/mtmg/graph_view.hpp | 23 +- .../cugraph/mtmg/vertex_result_view.hpp | 5 +- cpp/src/mtmg/vertex_result.cu | 89 ++- cpp/tests/CMakeLists.txt | 13 +- cpp/tests/mtmg/multi_node_threaded_test.cu | 3 +- cpp/tests/mtmg/threaded_test.cu | 3 +- cpp/tests/mtmg/threaded_test_louvain.cu | 511 ++++++++++++++++++ 8 files changed, 617 insertions(+), 31 deletions(-) create mode 100644 cpp/tests/mtmg/threaded_test_louvain.cu diff --git a/cpp/include/cugraph/mtmg/detail/device_shared_wrapper.hpp b/cpp/include/cugraph/mtmg/detail/device_shared_wrapper.hpp index 3e4b2513a8d..5fbe7bc9f01 100644 --- a/cpp/include/cugraph/mtmg/detail/device_shared_wrapper.hpp +++ b/cpp/include/cugraph/mtmg/detail/device_shared_wrapper.hpp @@ -79,7 +79,6 @@ class device_shared_wrapper_t { objects_.insert(std::make_pair(local_rank, std::move(obj))); } - public: /** * @brief Get reference to an object for a particular thread * diff --git a/cpp/include/cugraph/mtmg/graph_view.hpp b/cpp/include/cugraph/mtmg/graph_view.hpp index 94347e016ea..8e202ab4904 100644 --- a/cpp/include/cugraph/mtmg/graph_view.hpp +++ b/cpp/include/cugraph/mtmg/graph_view.hpp @@ -27,8 +27,27 @@ namespace mtmg { * @brief Graph view for each GPU */ template -using graph_view_t = detail::device_shared_wrapper_t< - cugraph::graph_view_t>; +class graph_view_t : public detail::device_shared_wrapper_t< + cugraph::graph_view_t> { + public: + /** + * @brief Get the vertex_partition_view for this graph + */ + vertex_partition_view_t get_vertex_partition_view( + cugraph::mtmg::handle_t const& handle) const + { + return this->get(handle).local_vertex_partition_view(); + } + + /** + * @brief Get the vertex_partition_view for this graph + */ + std::vector get_vertex_partition_range_lasts( + cugraph::mtmg::handle_t const& handle) const + { + return this->get(handle).vertex_partition_range_lasts(); + } +}; } // namespace mtmg } // namespace cugraph diff --git a/cpp/include/cugraph/mtmg/vertex_result_view.hpp b/cpp/include/cugraph/mtmg/vertex_result_view.hpp index a349bb95333..42b80cea62f 100644 --- a/cpp/include/cugraph/mtmg/vertex_result_view.hpp +++ b/cpp/include/cugraph/mtmg/vertex_result_view.hpp @@ -39,11 +39,12 @@ class vertex_result_view_t : public detail::device_shared_device_span_t + template rmm::device_uvector gather( handle_t const& handle, raft::device_span vertices, - cugraph::mtmg::graph_view_t const& graph_view, + std::vector const& vertex_partition_range_lasts, + cugraph::vertex_partition_view_t vertex_partition_view, std::optional>& renumber_map_view); }; diff --git a/cpp/src/mtmg/vertex_result.cu b/cpp/src/mtmg/vertex_result.cu index 97fcd291c87..5b1825656ff 100644 --- a/cpp/src/mtmg/vertex_result.cu +++ b/cpp/src/mtmg/vertex_result.cu @@ -27,15 +27,14 @@ namespace cugraph { namespace mtmg { template -template +template rmm::device_uvector vertex_result_view_t::gather( handle_t const& handle, raft::device_span vertices, - cugraph::mtmg::graph_view_t const& graph_view, + std::vector const& vertex_partition_range_lasts, + vertex_partition_view_t vertex_partition_view, std::optional>& renumber_map_view) { - auto this_gpu_graph_view = graph_view.get(handle); - rmm::device_uvector local_vertices(vertices.size(), handle.get_stream()); rmm::device_uvector vertex_gpu_ids(vertices.size(), handle.get_stream()); rmm::device_uvector vertex_pos(vertices.size(), handle.get_stream()); @@ -47,11 +46,11 @@ rmm::device_uvector vertex_result_view_t::gather( cugraph::detail::sequence_fill( handle.get_stream(), vertex_pos.data(), vertex_pos.size(), size_t{0}); - rmm::device_uvector d_vertex_partition_range_lasts( - this_gpu_graph_view.vertex_partition_range_lasts().size(), handle.get_stream()); + rmm::device_uvector d_vertex_partition_range_lasts(vertex_partition_range_lasts.size(), + handle.get_stream()); raft::update_device(d_vertex_partition_range_lasts.data(), - this_gpu_graph_view.vertex_partition_range_lasts().data(), - this_gpu_graph_view.vertex_partition_range_lasts().size(), + vertex_partition_range_lasts.data(), + vertex_partition_range_lasts.size(), handle.get_stream()); if (renumber_map_view) { @@ -60,8 +59,8 @@ rmm::device_uvector vertex_result_view_t::gather( local_vertices.data(), local_vertices.size(), renumber_map_view->get(handle).data(), - this_gpu_graph_view.local_vertex_partition_range_first(), - this_gpu_graph_view.local_vertex_partition_range_last()); + vertex_partition_view.local_vertex_partition_range_first(), + vertex_partition_view.local_vertex_partition_range_last()); } auto const major_comm_size = @@ -89,8 +88,8 @@ rmm::device_uvector vertex_result_view_t::gather( auto& wrapped = this->get(handle); - auto vertex_partition = vertex_partition_device_view_t( - this_gpu_graph_view.local_vertex_partition_view()); + auto vertex_partition = + vertex_partition_device_view_t(vertex_partition_view); auto iter = thrust::make_transform_iterator(local_vertices.begin(), [vertex_partition] __device__(auto v) { @@ -130,37 +129,85 @@ rmm::device_uvector vertex_result_view_t::gather( template rmm::device_uvector vertex_result_view_t::gather( handle_t const& handle, raft::device_span vertices, - cugraph::mtmg::graph_view_t const& graph_view, + std::vector const& vertex_partition_range_lasts, + vertex_partition_view_t vertex_partition_view, std::optional>& renumber_map_view); +template rmm::device_uvector vertex_result_view_t::gather( + handle_t const& handle, + raft::device_span vertices, + std::vector const& vertex_partition_range_lasts, + vertex_partition_view_t vertex_partition_view, + std::optional>& renumber_map_view); + template rmm::device_uvector vertex_result_view_t::gather( handle_t const& handle, raft::device_span vertices, - cugraph::mtmg::graph_view_t const& graph_view, + std::vector const& vertex_partition_range_lasts, + vertex_partition_view_t vertex_partition_view, std::optional>& renumber_map_view); template rmm::device_uvector vertex_result_view_t::gather( handle_t const& handle, raft::device_span vertices, - cugraph::mtmg::graph_view_t const& graph_view, + std::vector const& vertex_partition_range_lasts, + vertex_partition_view_t vertex_partition_view, std::optional>& renumber_map_view); -template rmm::device_uvector vertex_result_view_t::gather( +template rmm::device_uvector vertex_result_view_t::gather( handle_t const& handle, raft::device_span vertices, - cugraph::mtmg::graph_view_t const& graph_view, + std::vector const& vertex_partition_range_lasts, + vertex_partition_view_t vertex_partition_view, std::optional>& renumber_map_view); -template rmm::device_uvector vertex_result_view_t::gather( +template rmm::device_uvector vertex_result_view_t::gather( + handle_t const& handle, + raft::device_span vertices, + std::vector const& vertex_partition_range_lasts, + vertex_partition_view_t vertex_partition_view, + std::optional>& renumber_map_view); + +template rmm::device_uvector vertex_result_view_t::gather( handle_t const& handle, raft::device_span vertices, - cugraph::mtmg::graph_view_t const& graph_view, + std::vector const& vertex_partition_range_lasts, + vertex_partition_view_t vertex_partition_view, std::optional>& renumber_map_view); -template rmm::device_uvector vertex_result_view_t::gather( +template rmm::device_uvector vertex_result_view_t::gather( + handle_t const& handle, + raft::device_span vertices, + std::vector const& vertex_partition_range_lasts, + vertex_partition_view_t vertex_partition_view, + std::optional>& renumber_map_view); + +template rmm::device_uvector vertex_result_view_t::gather( + handle_t const& handle, + raft::device_span vertices, + std::vector const& vertex_partition_range_lasts, + vertex_partition_view_t vertex_partition_view, + std::optional>& renumber_map_view); + +template rmm::device_uvector vertex_result_view_t::gather( + handle_t const& handle, + raft::device_span vertices, + std::vector const& vertex_partition_range_lasts, + vertex_partition_view_t vertex_partition_view, + std::optional>& renumber_map_view); + +template rmm::device_uvector vertex_result_view_t::gather( + handle_t const& handle, + raft::device_span vertices, + std::vector const& vertex_partition_range_lasts, + vertex_partition_view_t vertex_partition_view, + std::optional>& renumber_map_view); + +template rmm::device_uvector vertex_result_view_t::gather( handle_t const& handle, raft::device_span vertices, - cugraph::mtmg::graph_view_t const& graph_view, + std::vector const& vertex_partition_range_lasts, + vertex_partition_view_t vertex_partition_view, std::optional>& renumber_map_view); } // namespace mtmg diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt index 6530a25d178..d9c88bc179e 100644 --- a/cpp/tests/CMakeLists.txt +++ b/cpp/tests/CMakeLists.txt @@ -738,9 +738,16 @@ if (BUILD_CUGRAPH_MTMG_TESTS) # - MTMG tests ------------------------------------------------------------------------- ConfigureTest(MTMG_TEST mtmg/threaded_test.cu) target_link_libraries(MTMG_TEST - PRIVATE - UCP::UCP - ) + PRIVATE + UCP::UCP + ) + + ConfigureTest(MTMG_LOUVAIN_TEST mtmg/threaded_test_louvain.cu) + target_link_libraries(MTMG_LOUVAIN_TEST + PRIVATE + cugraphmgtestutil + UCP::UCP + ) if(BUILD_CUGRAPH_MG_TESTS) ############################################################################################### diff --git a/cpp/tests/mtmg/multi_node_threaded_test.cu b/cpp/tests/mtmg/multi_node_threaded_test.cu index e5a7de07781..17aed4fdecf 100644 --- a/cpp/tests/mtmg/multi_node_threaded_test.cu +++ b/cpp/tests/mtmg/multi_node_threaded_test.cu @@ -311,7 +311,8 @@ class Tests_Multithreaded auto d_my_pageranks = pageranks_view.gather( thread_handle, raft::device_span{d_my_vertex_list.data(), d_my_vertex_list.size()}, - graph_view, + graph_view.get_vertex_partition_range_lasts(thread_handle), + graph_view.get_vertex_partition_view(thread_handle), renumber_map_view); std::vector my_pageranks(d_my_pageranks.size()); diff --git a/cpp/tests/mtmg/threaded_test.cu b/cpp/tests/mtmg/threaded_test.cu index 1a6a17eaa18..a5df0199cac 100644 --- a/cpp/tests/mtmg/threaded_test.cu +++ b/cpp/tests/mtmg/threaded_test.cu @@ -327,7 +327,8 @@ class Tests_Multithreaded auto d_my_pageranks = pageranks_view.gather( thread_handle, raft::device_span{d_my_vertex_list.data(), d_my_vertex_list.size()}, - graph_view, + graph_view.get_vertex_partition_range_lasts(thread_handle), + graph_view.get_vertex_partition_view(thread_handle), renumber_map_view); std::vector my_pageranks(d_my_pageranks.size()); diff --git a/cpp/tests/mtmg/threaded_test_louvain.cu b/cpp/tests/mtmg/threaded_test_louvain.cu new file mode 100644 index 00000000000..c1395037646 --- /dev/null +++ b/cpp/tests/mtmg/threaded_test_louvain.cu @@ -0,0 +1,511 @@ +/* + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include + +#include + +#include + +#include + +#include +#include + +struct Multithreaded_Usecase { + bool test_weighted{false}; + bool check_correctness{true}; +}; + +template +class Tests_Multithreaded + : public ::testing::TestWithParam> { + public: + Tests_Multithreaded() {} + + static void SetUpTestCase() {} + static void TearDownTestCase() {} + + virtual void SetUp() {} + virtual void TearDown() {} + + std::vector get_gpu_list() + { + int num_gpus_per_node{1}; + RAFT_CUDA_TRY(cudaGetDeviceCount(&num_gpus_per_node)); + + std::vector gpu_list(num_gpus_per_node); + std::iota(gpu_list.begin(), gpu_list.end(), 0); + + return gpu_list; + } + + template + void run_current_test( + std::tuple const& param, + std::vector gpu_list) + { + using edge_type_t = int32_t; + + constexpr bool renumber = true; + constexpr bool do_expensive_check = false; + + auto [multithreaded_usecase, input_usecase] = param; + + raft::handle_t handle{}; + + size_t max_level{1}; // Louvain is non-deterministic in MG if max_leve > 1 + weight_t threshold{1e-6}; + weight_t resolution{1}; + + size_t device_buffer_size{64 * 1024 * 1024}; + size_t thread_buffer_size{4 * 1024 * 1024}; + + int num_gpus = gpu_list.size(); + int num_threads = num_gpus * 4; + + cugraph::mtmg::resource_manager_t resource_manager; + + std::for_each(gpu_list.begin(), gpu_list.end(), [&resource_manager](int gpu_id) { + resource_manager.register_local_gpu(gpu_id, rmm::cuda_device_id{gpu_id}); + }); + + ncclUniqueId instance_manager_id; + ncclGetUniqueId(&instance_manager_id); + + auto instance_manager = resource_manager.create_instance_manager( + resource_manager.registered_ranks(), instance_manager_id); + + cugraph::mtmg::edgelist_t edgelist; + cugraph::mtmg::graph_t graph; + cugraph::mtmg::graph_view_t graph_view; + cugraph::mtmg::vertex_result_t louvain_clusters; + std::optional> renumber_map = + std::make_optional>(); + + auto edge_weights = multithreaded_usecase.test_weighted + ? std::make_optional, + weight_t>>() + : std::nullopt; + + // + // Simulate graph creation by spawning threads to walk through the + // local COO and add edges + // + std::vector running_threads; + + // Initialize shared edgelist object, one per GPU + for (int i = 0; i < num_gpus; ++i) { + running_threads.emplace_back([&instance_manager, + &edgelist, + device_buffer_size, + use_weight = true, + use_edge_id = false, + use_edge_type = false]() { + auto thread_handle = instance_manager->get_handle(); + + edgelist.set(thread_handle, device_buffer_size, use_weight, use_edge_id, use_edge_type); + }); + } + + // Wait for CPU threads to complete + std::for_each(running_threads.begin(), running_threads.end(), [](auto& t) { t.join(); }); + running_threads.resize(0); + instance_manager->reset_threads(); + + // Load SG edge list + auto [d_src_v, d_dst_v, d_weights_v, d_vertices_v, is_symmetric] = + input_usecase.template construct_edgelist( + handle, multithreaded_usecase.test_weighted, false, false); + + rmm::device_uvector d_unique_vertices(2 * d_src_v.size(), handle.get_stream()); + thrust::copy( + handle.get_thrust_policy(), d_src_v.begin(), d_src_v.end(), d_unique_vertices.begin()); + thrust::copy(handle.get_thrust_policy(), + d_dst_v.begin(), + d_dst_v.end(), + d_unique_vertices.begin() + d_src_v.size()); + thrust::sort(handle.get_thrust_policy(), d_unique_vertices.begin(), d_unique_vertices.end()); + + d_unique_vertices.resize(thrust::distance(d_unique_vertices.begin(), + thrust::unique(handle.get_thrust_policy(), + d_unique_vertices.begin(), + d_unique_vertices.end())), + handle.get_stream()); + + auto h_src_v = cugraph::test::to_host(handle, d_src_v); + auto h_dst_v = cugraph::test::to_host(handle, d_dst_v); + auto h_weights_v = cugraph::test::to_host(handle, d_weights_v); + auto unique_vertices = cugraph::test::to_host(handle, d_unique_vertices); + + // Load edgelist from different threads. We'll use more threads than GPUs here + for (int i = 0; i < num_threads; ++i) { + running_threads.emplace_back([&instance_manager, + thread_buffer_size, + &edgelist, + &h_src_v, + &h_dst_v, + &h_weights_v, + i, + num_threads]() { + auto thread_handle = instance_manager->get_handle(); + cugraph::mtmg::per_thread_edgelist_t + per_thread_edgelist(edgelist.get(thread_handle), thread_buffer_size); + + for (size_t j = i; j < h_src_v.size(); j += num_threads) { + per_thread_edgelist.append( + thread_handle, + h_src_v[j], + h_dst_v[j], + h_weights_v ? std::make_optional((*h_weights_v)[j]) : std::nullopt, + std::nullopt, + std::nullopt); + } + + per_thread_edgelist.flush(thread_handle); + }); + } + + // Wait for CPU threads to complete + std::for_each(running_threads.begin(), running_threads.end(), [](auto& t) { t.join(); }); + running_threads.resize(0); + instance_manager->reset_threads(); + + for (int i = 0; i < num_gpus; ++i) { + running_threads.emplace_back([&instance_manager, + &graph, + &edge_weights, + &edgelist, + &renumber_map, + is_symmetric = is_symmetric, + renumber, + do_expensive_check]() { + auto thread_handle = instance_manager->get_handle(); + + if (thread_handle.get_thread_rank() > 0) return; + + std::optional, + edge_t>> + edge_ids{std::nullopt}; + std::optional, + int32_t>> + edge_types{std::nullopt}; + + edgelist.finalize_buffer(thread_handle); + edgelist.consolidate_and_shuffle(thread_handle, false); + + cugraph::mtmg:: + create_graph_from_edgelist( + thread_handle, + edgelist, + cugraph::graph_properties_t{is_symmetric, true}, + renumber, + graph, + edge_weights, + edge_ids, + edge_types, + renumber_map, + do_expensive_check); + }); + } + + // Wait for CPU threads to complete + std::for_each(running_threads.begin(), running_threads.end(), [](auto& t) { t.join(); }); + running_threads.resize(0); + instance_manager->reset_threads(); + + graph_view = graph.view(); + auto renumber_map_view = renumber_map ? std::make_optional(renumber_map->view()) : std::nullopt; + + weight_t modularity{0}; + + for (int i = 0; i < num_threads; ++i) { + running_threads.emplace_back([&instance_manager, + &graph_view, + &edge_weights, + &louvain_clusters, + &modularity, + &renumber_map, + max_level, + threshold, + resolution]() { + auto thread_handle = instance_manager->get_handle(); + + if (thread_handle.get_thread_rank() > 0) return; + + rmm::device_uvector local_louvain_clusters( + graph_view.get(thread_handle).local_vertex_partition_range_size(), + thread_handle.get_stream()); + + std::tie(std::ignore, modularity) = cugraph::louvain( + thread_handle.raft_handle(), + graph_view.get(thread_handle), + edge_weights ? std::make_optional(edge_weights->get(thread_handle).view()) : std::nullopt, + local_louvain_clusters.data(), + max_level, + threshold, + resolution); + + louvain_clusters.set(thread_handle, std::move(local_louvain_clusters)); + }); + } + + // Wait for CPU threads to complete + std::for_each(running_threads.begin(), running_threads.end(), [](auto& t) { t.join(); }); + running_threads.resize(0); + instance_manager->reset_threads(); + + std::vector, std::vector>> computed_clusters_v; + std::mutex computed_clusters_lock{}; + + auto louvain_clusters_view = louvain_clusters.view(); + std::vector h_renumber_map; + + // Load computed_clusters_v from different threads. + for (int i = 0; i < num_gpus; ++i) { + running_threads.emplace_back([&instance_manager, + &graph_view, + &renumber_map_view, + &louvain_clusters_view, + &computed_clusters_lock, + &computed_clusters_v, + &h_src_v, + &h_dst_v, + &h_weights_v, + &h_renumber_map, + &unique_vertices, + i, + num_threads]() { + auto thread_handle = instance_manager->get_handle(); + + auto number_of_vertices = unique_vertices.size(); + + std::vector my_vertex_list; + my_vertex_list.reserve((number_of_vertices + num_threads - 1) / num_threads); + + for (size_t j = i; j < number_of_vertices; j += num_threads) { + my_vertex_list.push_back(unique_vertices[j]); + } + + rmm::device_uvector d_my_vertex_list(my_vertex_list.size(), + thread_handle.raft_handle().get_stream()); + raft::update_device(d_my_vertex_list.data(), + my_vertex_list.data(), + my_vertex_list.size(), + thread_handle.raft_handle().get_stream()); + + auto d_my_clusters = louvain_clusters_view.gather( + thread_handle, + raft::device_span{d_my_vertex_list.data(), d_my_vertex_list.size()}, + graph_view.get_vertex_partition_range_lasts(thread_handle), + graph_view.get_vertex_partition_view(thread_handle), + renumber_map_view); + + std::vector my_clusters(d_my_clusters.size()); + raft::update_host(my_clusters.data(), + d_my_clusters.data(), + d_my_clusters.size(), + thread_handle.raft_handle().get_stream()); + + { + std::lock_guard lock(computed_clusters_lock); + computed_clusters_v.push_back( + std::make_tuple(std::move(my_vertex_list), std::move(my_clusters))); + } + + h_renumber_map = cugraph::test::to_host( + thread_handle.raft_handle(), + cugraph::test::device_allgatherv(thread_handle.raft_handle(), + renumber_map_view->get(thread_handle))); + }); + } + + // Wait for CPU threads to complete + std::for_each(running_threads.begin(), running_threads.end(), [](auto& t) { t.join(); }); + running_threads.resize(0); + instance_manager->reset_threads(); + + if (multithreaded_usecase.check_correctness) { + // Want to compare the results in computed_clusters_v with SG results + cugraph::graph_t sg_graph(handle); + std::optional< + cugraph::edge_property_t, weight_t>> + sg_edge_weights{std::nullopt}; + + for (int i = 0; i < num_gpus; ++i) { + running_threads.emplace_back( + [&instance_manager, &graph_view, &edge_weights, &sg_graph, &sg_edge_weights]() { + auto thread_handle = instance_manager->get_handle(); + + if (thread_handle.get_rank() == 0) { + std::tie(sg_graph, sg_edge_weights, std::ignore) = + cugraph::test::mg_graph_to_sg_graph( + thread_handle.raft_handle(), + graph_view.get(thread_handle), + edge_weights ? std::make_optional(edge_weights->get(thread_handle).view()) + : std::nullopt, + std::optional>{std::nullopt}, + false); // create an SG graph with MG graph vertex IDs + } else { + cugraph::test::mg_graph_to_sg_graph( + thread_handle.raft_handle(), + graph_view.get(thread_handle), + edge_weights ? std::make_optional(edge_weights->get(thread_handle).view()) + : std::nullopt, + std::optional>{std::nullopt}, + false); // create an SG graph with MG graph vertex IDs + } + }); + } + + // Wait for CPU threads to complete + std::for_each(running_threads.begin(), running_threads.end(), [](auto& t) { t.join(); }); + running_threads.resize(0); + instance_manager->reset_threads(); + + rmm::device_uvector sg_clusters(sg_graph.number_of_vertices(), handle.get_stream()); + weight_t modularity; + + std::tie(std::ignore, modularity) = cugraph::louvain( + handle, + sg_graph.view(), + sg_edge_weights ? std::make_optional(sg_edge_weights->view()) : std::nullopt, + sg_clusters.data(), + max_level, + threshold, + resolution); + + auto h_sg_clusters = cugraph::test::to_host(handle, sg_clusters); + std::map h_cluster_map; + std::map h_cluster_reverse_map; + + std::for_each( + computed_clusters_v.begin(), + computed_clusters_v.end(), + [&h_sg_clusters, &h_cluster_map, &h_renumber_map, &h_cluster_reverse_map](auto t1) { + std::for_each( + thrust::make_zip_iterator(std::get<0>(t1).begin(), std::get<1>(t1).begin()), + thrust::make_zip_iterator(std::get<0>(t1).end(), std::get<1>(t1).end()), + [&h_sg_clusters, &h_cluster_map, &h_renumber_map, &h_cluster_reverse_map](auto t2) { + vertex_t v = thrust::get<0>(t2); + vertex_t c = thrust::get<1>(t2); + + auto pos = std::find(h_renumber_map.begin(), h_renumber_map.end(), v); + auto offset = std::distance(h_renumber_map.begin(), pos); + + auto cluster_pos = h_cluster_map.find(c); + if (cluster_pos == h_cluster_map.end()) { + auto reverse_pos = h_cluster_reverse_map.find(h_sg_clusters[offset]); + + ASSERT_TRUE(reverse_pos != h_cluster_map.end()) << "two different cluster mappings"; + + h_cluster_map.insert(std::make_pair(c, h_sg_clusters[offset])); + h_cluster_reverse_map.insert(std::make_pair(h_sg_clusters[offset], c)); + } else { + ASSERT_EQ(cluster_pos->second, h_sg_clusters[offset]) + << "vertex " << v << ", offset = " << offset + << ", SG cluster = " << h_sg_clusters[offset] << ", mtmg cluster = " << c + << ", mapped value = " << cluster_pos->second; + } + }); + }); + } + } +}; + +using Tests_Multithreaded_File = Tests_Multithreaded; +using Tests_Multithreaded_Rmat = Tests_Multithreaded; + +// FIXME: add tests for type combinations +TEST_P(Tests_Multithreaded_File, CheckInt32Int32FloatFloat) +{ + run_current_test( + override_File_Usecase_with_cmd_line_arguments(GetParam()), std::vector{{0, 1}}); +} + +TEST_P(Tests_Multithreaded_Rmat, CheckInt32Int32FloatFloat) +{ + run_current_test( + override_Rmat_Usecase_with_cmd_line_arguments(GetParam()), std::vector{{0, 1}}); +} + +INSTANTIATE_TEST_SUITE_P(file_test, + Tests_Multithreaded_File, + ::testing::Combine( + // enable correctness checks + ::testing::Values(Multithreaded_Usecase{true, true}), + ::testing::Values(cugraph::test::File_Usecase("karate.csv"), + cugraph::test::File_Usecase("dolphins.csv")))); + +INSTANTIATE_TEST_SUITE_P( + rmat_small_test, + Tests_Multithreaded_Rmat, + ::testing::Combine( + // enable correctness checks + ::testing::Values(Multithreaded_Usecase{true, true}), + //::testing::Values(cugraph::test::Rmat_Usecase(10, 16, 0.57, 0.19, 0.19, 0, false, false)))); + ::testing::Values(cugraph::test::Rmat_Usecase(5, 8, 0.57, 0.19, 0.19, 0, false, false)))); + +INSTANTIATE_TEST_SUITE_P( + file_benchmark_test, /* note that the test filename can be overridden in benchmarking (with + --gtest_filter to select only the file_benchmark_test with a specific + vertex & edge type combination) by command line arguments and do not + include more than one File_Usecase that differ only in filename + (to avoid running same benchmarks more than once) */ + Tests_Multithreaded_File, + ::testing::Combine( + // disable correctness checks + ::testing::Values(Multithreaded_Usecase{true, false}), + ::testing::Values(cugraph::test::File_Usecase("test/datasets/karate.mtx")))); + +INSTANTIATE_TEST_SUITE_P( + rmat_benchmark_test, /* note that scale & edge factor can be overridden in benchmarking (with + --gtest_filter to select only the rmat_benchmark_test with a specific + vertex & edge type combination) by command line arguments and do not + include more than one Rmat_Usecase that differ only in scale or edge + factor (to avoid running same benchmarks more than once) */ + Tests_Multithreaded_Rmat, + ::testing::Combine( + // disable correctness checks for large graphs + ::testing::Values(Multithreaded_Usecase{true, false}), + ::testing::Values(cugraph::test::Rmat_Usecase(10, 16, 0.57, 0.19, 0.19, 0, false, false)))); + +CUGRAPH_TEST_PROGRAM_MAIN() From a5718c66aa5e72a7d91b4b3a073736f195352736 Mon Sep 17 00:00:00 2001 From: Vibhu Jawa Date: Wed, 6 Dec 2023 08:52:39 -0800 Subject: [PATCH 2/2] Add a barrier before cugraph Graph creation (#4046) This PR introduces a short term fix for https://github.com/rapidsai/cugraph/issues/4037 . CC: @jnke2016 , @rlratzel Authors: - Vibhu Jawa (https://github.com/VibhuJawa) Approvers: - Rick Ratzel (https://github.com/rlratzel) - Joseph Nke (https://github.com/jnke2016) URL: https://github.com/rapidsai/cugraph/pull/4046 --- .../cugraph/cugraph/dask/common/mg_utils.py | 7 +++++- .../simpleDistributedGraph.py | 22 +++++++++---------- 2 files changed, 16 insertions(+), 13 deletions(-) diff --git a/python/cugraph/cugraph/dask/common/mg_utils.py b/python/cugraph/cugraph/dask/common/mg_utils.py index 6acda48c9da..b04f293dc0e 100644 --- a/python/cugraph/cugraph/dask/common/mg_utils.py +++ b/python/cugraph/cugraph/dask/common/mg_utils.py @@ -12,7 +12,7 @@ # limitations under the License. import os - +import gc import numba.cuda @@ -68,3 +68,8 @@ def get_visible_devices(): else: visible_devices = _visible_devices.strip().split(",") return visible_devices + + +def run_gc_on_dask_cluster(client): + gc.collect() + client.run(gc.collect) diff --git a/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py b/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py index f666900b226..319435575cc 100644 --- a/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py +++ b/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py @@ -37,8 +37,8 @@ from cugraph.dask.common.part_utils import ( get_persisted_df_worker_map, persist_dask_df_equal_parts_per_worker, - _chunk_lst, ) +from cugraph.dask.common.mg_utils import run_gc_on_dask_cluster from cugraph.dask import get_n_workers import cugraph.dask.comms.comms as Comms @@ -171,7 +171,6 @@ def __from_edgelist( store_transposed=False, legacy_renum_only=False, ): - if not isinstance(input_ddf, dask_cudf.DataFrame): raise TypeError("input should be a dask_cudf dataFrame") @@ -275,7 +274,6 @@ def __from_edgelist( ) value_col = None else: - source_col, dest_col, value_col = symmetrize( input_ddf, source, @@ -350,9 +348,11 @@ def __from_edgelist( is_symmetric=not self.properties.directed, ) ddf = ddf.repartition(npartitions=len(workers) * 2) - ddf_keys = ddf.to_delayed() workers = _client.scheduler_info()["workers"].keys() - ddf_keys_ls = _chunk_lst(ddf_keys, len(workers)) + persisted_keys_d = persist_dask_df_equal_parts_per_worker( + ddf, _client, return_type="dict" + ) + del ddf delayed_tasks_d = { w: delayed(simpleDistributedGraphImpl._make_plc_graph)( @@ -367,19 +367,19 @@ def __from_edgelist( self.edge_id_type, self.edge_type_id_type, ) - for w, edata in zip(workers, ddf_keys_ls) + for w, edata in persisted_keys_d.items() } + del persisted_keys_d self._plc_graph = { w: _client.compute( delayed_task, workers=w, allow_other_workers=False, pure=False ) for w, delayed_task in delayed_tasks_d.items() } - wait(list(self._plc_graph.values())) - del ddf_keys del delayed_tasks_d - gc.collect() - _client.run(gc.collect) + run_gc_on_dask_cluster(_client) + wait(list(self._plc_graph.values())) + run_gc_on_dask_cluster(_client) @property def renumbered(self): @@ -945,7 +945,6 @@ def convert_to_cudf(cp_arrays: cp.ndarray) -> cudf.Series: def _call_plc_select_random_vertices( mg_graph_x, sID: bytes, random_state: int, num_vertices: int ) -> cudf.Series: - cp_arrays = pylibcugraph_select_random_vertices( graph=mg_graph_x, resource_handle=ResourceHandle(Comms.get_handle(sID).getHandle()), @@ -961,7 +960,6 @@ def _mg_call_plc_select_random_vertices( random_state: int, num_vertices: int, ) -> dask_cudf.Series: - result = [ client.submit( _call_plc_select_random_vertices,