From be504cccdbef757366da2389d8e069c938c1512f Mon Sep 17 00:00:00 2001 From: Seunghwa Kang Date: Fri, 30 Aug 2024 15:05:00 -0700 Subject: [PATCH 1/5] reduce memory footprint in graph creation --- cpp/include/cugraph/partition_manager.hpp | 27 +- cpp/src/structure/renumber_edgelist_impl.cuh | 301 +++++++++++++------ 2 files changed, 240 insertions(+), 88 deletions(-) diff --git a/cpp/include/cugraph/partition_manager.hpp b/cpp/include/cugraph/partition_manager.hpp index 309b169e646..18c19d3b54d 100644 --- a/cpp/include/cugraph/partition_manager.hpp +++ b/cpp/include/cugraph/partition_manager.hpp @@ -42,7 +42,8 @@ class partition_manager { // partitioning along the major axis (major sub-communicator is responsible for this) and along // the minor axis (minor sub-communicator is responsible for this). This variable controls whether // to map the major sub-communicator to the GPU row communicator or the GPU column communicator. - static constexpr bool map_major_comm_to_gpu_row_comm = true; + static constexpr bool map_major_comm_to_gpu_row_comm = + false; // FIXME: this is for benchmarking, reset to true before merging #ifdef __CUDACC__ __host__ __device__ @@ -71,6 +72,30 @@ class partition_manager { : (major_comm_rank * minor_comm_size + minor_comm_rank); } +#ifdef __CUDACC__ + __host__ __device__ +#endif + static int + compute_major_comm_rank_from_global_comm_rank(int major_comm_size, + int minor_comm_size, + int comm_rank) + { + return map_major_comm_to_gpu_row_comm ? comm_rank % major_comm_size + : comm_rank / minor_comm_size; + } + +#ifdef __CUDACC__ + __host__ __device__ +#endif + static int + compute_minor_comm_rank_from_global_comm_rank(int major_comm_size, + int minor_comm_size, + int comm_rank) + { + return map_major_comm_to_gpu_row_comm ? comm_rank / major_comm_size + : comm_rank % minor_comm_size; + } + #ifdef __CUDACC__ __host__ __device__ #endif diff --git a/cpp/src/structure/renumber_edgelist_impl.cuh b/cpp/src/structure/renumber_edgelist_impl.cuh index 41f81d72ab1..574163d4af5 100644 --- a/cpp/src/structure/renumber_edgelist_impl.cuh +++ b/cpp/src/structure/renumber_edgelist_impl.cuh @@ -51,6 +51,8 @@ #include #include +#include + #include #include #include @@ -242,119 +244,240 @@ std::tuple, std::vector, vertex_t> compu std::vector const& edgelist_minors, std::vector const& edgelist_edge_counts) { + // 1. if local_vertices.has_value() is false, find unique vertices from edge majors & minors (to + // construct local_vertices) + rmm::device_uvector sorted_local_vertices(0, handle.get_stream()); + if (!local_vertices) { + rmm::device_uvector sorted_unique_majors(0, handle.get_stream()); + if (edgelist_majors.size() > 1) { + constexpr size_t num_bins{8}; // increase the number of bins to cut peak memory usage (at the + // expense of additional computing) + constexpr uint32_t hash_seed = + 1; // shouldn't be 0 (in that case this hash function will coincide with the hash function + // used to map vertices to GPUs, and we may not see the expected randomization) + + assert(multi_gpu); + auto& comm = handle.get_comms(); + auto const comm_size = comm.get_size(); + auto& minor_comm = handle.get_subcomm(cugraph::partition_manager::minor_comm_name()); + auto const minor_comm_size = minor_comm.get_size(); + + assert(static_cast(minor_comm_size) == edgelist_majors.size()); + + auto edge_count_vectors = num_bins > 1 + ? std::make_optional>>( + edgelist_majors.size(), std::vector(num_bins)) + : std::nullopt; + if (edge_count_vectors) { + for (size_t i = 0; i < edgelist_majors.size(); ++i) { + rmm::device_uvector d_edge_counts(num_bins, handle.get_stream()); + thrust::fill( + handle.get_thrust_policy(), d_edge_counts.begin(), d_edge_counts.end(), edge_t{0}); + thrust::for_each(handle.get_thrust_policy(), + edgelist_majors[i], + edgelist_majors[i] + edgelist_edge_counts[i], + [counts = raft::device_span( + d_edge_counts.data(), d_edge_counts.size())] __device__(auto v) { + cuco::detail::MurmurHash3_32 hash_func{hash_seed}; + cuda::atomic_ref atomic_counter( + counts[hash_func(v) % num_bins]); + atomic_counter.fetch_add(edge_t{1}, cuda::std::memory_order_relaxed); + }); + raft::update_host((*edge_count_vectors)[i].data(), + d_edge_counts.data(), + d_edge_counts.size(), + handle.get_stream()); + } + handle.sync_stream(); + } - edge_t num_local_edges = std::reduce(edgelist_edge_counts.begin(), edgelist_edge_counts.end()); + for (size_t i = 0; i < num_bins; ++i) { + std::vector> + edge_partition_sorted_unique_majors{}; // for bin "i" + edge_partition_sorted_unique_majors.reserve(edgelist_majors.size()); + std::vector tx_counts(minor_comm_size); + for (size_t j = 0; j < edgelist_majors.size(); ++j) { + rmm::device_uvector majors(0, handle.get_stream()); + if (num_bins > 1) { + majors.resize((*edge_count_vectors)[j][i], handle.get_stream()); + thrust::copy_if(handle.get_thrust_policy(), + edgelist_majors[j], + edgelist_majors[j] + edgelist_edge_counts[j], + majors.begin(), + [i] __device__(auto v) { + cuco::detail::MurmurHash3_32 hash_func{hash_seed}; + return (static_cast(hash_func(v) % num_bins) == i); + }); + } else { + majors.resize(edgelist_edge_counts[j], handle.get_stream()); + thrust::copy(handle.get_thrust_policy(), + edgelist_majors[j], + edgelist_majors[j] + edgelist_edge_counts[j], + majors.begin()); + } + thrust::sort(handle.get_thrust_policy(), majors.begin(), majors.end()); + majors.resize(thrust::distance( + majors.begin(), + thrust::unique(handle.get_thrust_policy(), majors.begin(), majors.end())), + handle.get_stream()); + majors.shrink_to_fit(handle.get_stream()); - // 1. if local_vertices.has_value() is false, find unique vertices from edge majors (to construct - // local_vertices) + tx_counts[j] = majors.size(); + edge_partition_sorted_unique_majors.push_back(std::move(majors)); + } - rmm::device_uvector sorted_unique_majors(0, handle.get_stream()); - if (!local_vertices) { - sorted_unique_majors.resize(num_local_edges, handle.get_stream()); - size_t major_offset{0}; - for (size_t i = 0; i < edgelist_majors.size(); ++i) { - thrust::copy(handle.get_thrust_policy(), - edgelist_majors[i], - edgelist_majors[i] + edgelist_edge_counts[i], - sorted_unique_majors.begin() + major_offset); - thrust::sort(handle.get_thrust_policy(), - sorted_unique_majors.begin() + major_offset, - sorted_unique_majors.begin() + major_offset + edgelist_edge_counts[i]); - major_offset += static_cast(thrust::distance( - sorted_unique_majors.begin() + major_offset, - thrust::unique(handle.get_thrust_policy(), - sorted_unique_majors.begin() + major_offset, - sorted_unique_majors.begin() + major_offset + edgelist_edge_counts[i]))); - } - sorted_unique_majors.resize(major_offset, handle.get_stream()); + rmm::device_uvector tmp_majors(std::reduce(tx_counts.begin(), tx_counts.end()), + handle.get_stream()); + size_t offset{0}; + for (size_t j = 0; j < edge_partition_sorted_unique_majors.size(); ++j) { + thrust::copy(handle.get_thrust_policy(), + edge_partition_sorted_unique_majors[j].begin(), + edge_partition_sorted_unique_majors[j].end(), + tmp_majors.begin() + offset); + offset += edge_partition_sorted_unique_majors[j].size(); + } + edge_partition_sorted_unique_majors.clear(); - if (edgelist_majors.size() > 1) { - thrust::sort( - handle.get_thrust_policy(), sorted_unique_majors.begin(), sorted_unique_majors.end()); - } - sorted_unique_majors.shrink_to_fit(handle.get_stream()); - } + std::tie(tmp_majors, std::ignore) = + shuffle_values(minor_comm, tmp_majors.begin(), tx_counts, handle.get_stream()); - // 2. if local_vertices.has_value() is false, find unique vertices from edge minors (to construct - // local_vertices) + thrust::sort(handle.get_thrust_policy(), tmp_majors.begin(), tmp_majors.end()); + tmp_majors.resize( + thrust::distance( + tmp_majors.begin(), + thrust::unique(handle.get_thrust_policy(), tmp_majors.begin(), tmp_majors.end())), + handle.get_stream()); + tmp_majors.shrink_to_fit(handle.get_stream()); + if (i == 0) { + sorted_unique_majors = std::move(tmp_majors); + } else { + rmm::device_uvector merged_majors( + sorted_unique_majors.size() + tmp_majors.size(), handle.get_stream()); + thrust::merge(handle.get_thrust_policy(), + sorted_unique_majors.begin(), + sorted_unique_majors.end(), + tmp_majors.begin(), + tmp_majors.end(), + merged_majors.begin()); + sorted_unique_majors = std::move(merged_majors); + tmp_majors.resize(0, handle.get_stream()); + tmp_majors.shrink_to_fit(handle.get_stream()); + sorted_unique_majors.resize(thrust::distance(sorted_unique_majors.begin(), + thrust::unique(handle.get_thrust_policy(), + sorted_unique_majors.begin(), + sorted_unique_majors.end())), + handle.get_stream()); + sorted_unique_majors.shrink_to_fit(handle.get_stream()); + } + } + } else { + rmm::device_uvector majors(edgelist_edge_counts[0], handle.get_stream()); + thrust::copy(handle.get_thrust_policy(), + edgelist_majors[0], + edgelist_majors[0] + edgelist_edge_counts[0], + majors.begin()); + thrust::sort(handle.get_thrust_policy(), majors.begin(), majors.end()); + majors.resize( + thrust::distance(majors.begin(), + thrust::unique(handle.get_thrust_policy(), majors.begin(), majors.end())), + handle.get_stream()); + majors.shrink_to_fit(handle.get_stream()); + sorted_unique_majors = std::move(majors); + } - rmm::device_uvector sorted_unique_minors(0, handle.get_stream()); - if (!local_vertices) { - sorted_unique_minors.resize(num_local_edges, handle.get_stream()); - size_t minor_offset{0}; + rmm::device_uvector sorted_unique_minors(0, handle.get_stream()); for (size_t i = 0; i < edgelist_minors.size(); ++i) { + rmm::device_uvector tmp_minors(edgelist_edge_counts[i], handle.get_stream()); thrust::copy(handle.get_thrust_policy(), edgelist_minors[i], edgelist_minors[i] + edgelist_edge_counts[i], - sorted_unique_minors.begin() + minor_offset); - thrust::sort(handle.get_thrust_policy(), - sorted_unique_minors.begin() + minor_offset, - sorted_unique_minors.begin() + minor_offset + edgelist_edge_counts[i]); - minor_offset += static_cast(thrust::distance( - sorted_unique_minors.begin() + minor_offset, - thrust::unique(handle.get_thrust_policy(), - sorted_unique_minors.begin() + minor_offset, - sorted_unique_minors.begin() + minor_offset + edgelist_edge_counts[i]))); - } - sorted_unique_minors.resize(minor_offset, handle.get_stream()); - if (edgelist_minors.size() > 1) { - thrust::sort( - handle.get_thrust_policy(), sorted_unique_minors.begin(), sorted_unique_minors.end()); - sorted_unique_minors.resize(thrust::distance(sorted_unique_minors.begin(), - thrust::unique(handle.get_thrust_policy(), - sorted_unique_minors.begin(), - sorted_unique_minors.end())), - handle.get_stream()); - } - sorted_unique_minors.shrink_to_fit(handle.get_stream()); - } - - // 3. update sorted_local_vertices. - // if local_vertices.has_value() is false, reconstruct local_vertices first + tmp_minors.begin()); + thrust::sort(handle.get_thrust_policy(), tmp_minors.begin(), tmp_minors.end()); + tmp_minors.resize( + thrust::distance( + tmp_minors.begin(), + thrust::unique(handle.get_thrust_policy(), tmp_minors.begin(), tmp_minors.end())), + handle.get_stream()); + tmp_minors.shrink_to_fit(handle.get_stream()); + + if constexpr (multi_gpu) { + auto& comm = handle.get_comms(); + auto const comm_size = comm.get_size(); + auto& major_comm = handle.get_subcomm(cugraph::partition_manager::major_comm_name()); + auto const major_comm_size = major_comm.get_size(); + auto& minor_comm = handle.get_subcomm(cugraph::partition_manager::minor_comm_name()); + auto const minor_comm_size = minor_comm.get_size(); + + compute_gpu_id_from_ext_vertex_t gpu_id_func{ + comm_size, major_comm_size, minor_comm_size}; + std::tie(tmp_minors, std::ignore) = groupby_gpu_id_and_shuffle_values( + major_comm, + tmp_minors.begin(), + tmp_minors.end(), + [major_comm_size, minor_comm_size, gpu_id_func] __device__(auto v) { + auto comm_rank = gpu_id_func(v); + return partition_manager::compute_major_comm_rank_from_global_comm_rank( + major_comm_size, minor_comm_size, comm_rank); + }, + handle.get_stream()); + thrust::sort(handle.get_thrust_policy(), tmp_minors.begin(), tmp_minors.end()); + tmp_minors.resize( + thrust::distance( + tmp_minors.begin(), + thrust::unique(handle.get_thrust_policy(), tmp_minors.begin(), tmp_minors.end())), + handle.get_stream()); + tmp_minors.shrink_to_fit(handle.get_stream()); + } - if (local_vertices) { - sorted_local_vertices = std::move(*local_vertices); - thrust::sort( - handle.get_thrust_policy(), sorted_local_vertices.begin(), sorted_local_vertices.end()); - } else { + if (i == 0) { + sorted_unique_minors = std::move(tmp_minors); + } else { + rmm::device_uvector merged_minors(sorted_unique_minors.size() + tmp_minors.size(), + handle.get_stream()); + thrust::merge(handle.get_thrust_policy(), + sorted_unique_minors.begin(), + sorted_unique_minors.end(), + tmp_minors.begin(), + tmp_minors.end(), + merged_minors.begin()); + sorted_unique_minors = std::move(merged_minors); + tmp_minors.resize(0, handle.get_stream()); + tmp_minors.shrink_to_fit(handle.get_stream()); + sorted_unique_minors.resize(thrust::distance(sorted_unique_minors.begin(), + thrust::unique(handle.get_thrust_policy(), + sorted_unique_minors.begin(), + sorted_unique_minors.end())), + handle.get_stream()); + sorted_unique_minors.shrink_to_fit(handle.get_stream()); + } + } sorted_local_vertices.resize(sorted_unique_majors.size() + sorted_unique_minors.size(), handle.get_stream()); - thrust::merge(handle.get_thrust_policy(), sorted_unique_majors.begin(), sorted_unique_majors.end(), sorted_unique_minors.begin(), sorted_unique_minors.end(), sorted_local_vertices.begin()); - sorted_unique_majors.resize(0, handle.get_stream()); sorted_unique_majors.shrink_to_fit(handle.get_stream()); sorted_unique_minors.resize(0, handle.get_stream()); sorted_unique_minors.shrink_to_fit(handle.get_stream()); - sorted_local_vertices.resize(thrust::distance(sorted_local_vertices.begin(), thrust::unique(handle.get_thrust_policy(), sorted_local_vertices.begin(), sorted_local_vertices.end())), handle.get_stream()); sorted_local_vertices.shrink_to_fit(handle.get_stream()); - - if constexpr (multi_gpu) { - sorted_local_vertices = - cugraph::detail::shuffle_ext_vertices_to_local_gpu_by_vertex_partitioning( - handle, std::move(sorted_local_vertices)); - thrust::sort( - handle.get_thrust_policy(), sorted_local_vertices.begin(), sorted_local_vertices.end()); - sorted_local_vertices.resize(thrust::distance(sorted_local_vertices.begin(), - thrust::unique(handle.get_thrust_policy(), - sorted_local_vertices.begin(), - sorted_local_vertices.end())), - handle.get_stream()); - sorted_local_vertices.shrink_to_fit(handle.get_stream()); - } + } else { + sorted_local_vertices = std::move(*local_vertices); + thrust::sort( + handle.get_thrust_policy(), sorted_local_vertices.begin(), sorted_local_vertices.end()); } + // 2. find an unused vertex ID + auto locally_unused_vertex_id = find_locally_unused_ext_vertex_id( handle, raft::device_span(sorted_local_vertices.data(), sorted_local_vertices.size()), @@ -363,7 +486,7 @@ std::tuple, std::vector, vertex_t> compu "Invalid input arguments: there is no unused value in the entire range of " "vertex_t, increase vertex_t to 64 bit."); - // 4. compute global degrees for the sorted local vertices + // 3. compute global degrees for the sorted local vertices rmm::device_uvector sorted_local_vertex_degrees(0, handle.get_stream()); std::optional> stream_pool_indices{ @@ -387,6 +510,9 @@ std::tuple, std::vector, vertex_t> compu host_scalar_allgather(minor_comm, sorted_local_vertices.size(), handle.get_stream()); if ((minor_comm_size >= 2) && (handle.get_stream_pool_size() >= 2)) { + edge_t num_local_edges = + std::reduce(edgelist_edge_counts.begin(), edgelist_edge_counts.end()); + auto vertex_edge_counts = host_scalar_allreduce( comm, thrust::make_tuple(static_cast(sorted_local_vertices.size()), num_local_edges), @@ -857,10 +983,10 @@ renumber_edgelist( (*edgelist_intra_partition_segment_offsets).size() == static_cast(minor_comm_size), "Invalid input arguments: erroneous (*edgelist_intra_partition_segment_offsets).size()."); for (size_t i = 0; i < edgelist_majors.size(); ++i) { - CUGRAPH_EXPECTS( - (*edgelist_intra_partition_segment_offsets)[i].size() == - static_cast(major_comm_size + 1), - "Invalid input arguments: erroneous (*edgelist_intra_partition_segment_offsets)[].size()."); + CUGRAPH_EXPECTS((*edgelist_intra_partition_segment_offsets)[i].size() == + static_cast(major_comm_size + 1), + "Invalid input arguments: erroneous " + "(*edgelist_intra_partition_segment_offsets)[].size()."); CUGRAPH_EXPECTS( std::is_sorted((*edgelist_intra_partition_segment_offsets)[i].begin(), (*edgelist_intra_partition_segment_offsets)[i].end()), @@ -868,7 +994,8 @@ renumber_edgelist( CUGRAPH_EXPECTS( ((*edgelist_intra_partition_segment_offsets)[i][0] == 0) && ((*edgelist_intra_partition_segment_offsets)[i].back() == edgelist_edge_counts[i]), - "Invalid input arguments: (*edgelist_intra_partition_segment_offsets)[][0] should be 0 and " + "Invalid input arguments: (*edgelist_intra_partition_segment_offsets)[][0] should be 0 " + "and " "(*edgelist_intra_partition_segment_offsets)[].back() should coincide with " "edgelist_edge_counts[]."); } From 3b151e0cb7891ef1448fdb48f6e6f42cab98057c Mon Sep 17 00:00:00 2001 From: Seunghwa Kang Date: Fri, 30 Aug 2024 15:35:11 -0700 Subject: [PATCH 2/5] undo temporary change for benchmarking --- cpp/include/cugraph/partition_manager.hpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cpp/include/cugraph/partition_manager.hpp b/cpp/include/cugraph/partition_manager.hpp index 18c19d3b54d..e3bb699f00d 100644 --- a/cpp/include/cugraph/partition_manager.hpp +++ b/cpp/include/cugraph/partition_manager.hpp @@ -42,8 +42,7 @@ class partition_manager { // partitioning along the major axis (major sub-communicator is responsible for this) and along // the minor axis (minor sub-communicator is responsible for this). This variable controls whether // to map the major sub-communicator to the GPU row communicator or the GPU column communicator. - static constexpr bool map_major_comm_to_gpu_row_comm = - false; // FIXME: this is for benchmarking, reset to true before merging + static constexpr bool map_major_comm_to_gpu_row_comm = true; #ifdef __CUDACC__ __host__ __device__ From ad0c87994ca52b0ee7277eff8ecb159e3dde4a41 Mon Sep 17 00:00:00 2001 From: Seunghwa Kang Date: Fri, 30 Aug 2024 15:42:02 -0700 Subject: [PATCH 3/5] update comments --- cpp/src/structure/renumber_edgelist_impl.cuh | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/cpp/src/structure/renumber_edgelist_impl.cuh b/cpp/src/structure/renumber_edgelist_impl.cuh index 574163d4af5..5ebd8323fac 100644 --- a/cpp/src/structure/renumber_edgelist_impl.cuh +++ b/cpp/src/structure/renumber_edgelist_impl.cuh @@ -251,8 +251,10 @@ std::tuple, std::vector, vertex_t> compu if (!local_vertices) { rmm::device_uvector sorted_unique_majors(0, handle.get_stream()); if (edgelist_majors.size() > 1) { - constexpr size_t num_bins{8}; // increase the number of bins to cut peak memory usage (at the - // expense of additional computing) + constexpr size_t num_bins{ + 8}; // increase the number of bins to cut peak memory usage (at the expense of additional + // computing), limit the maximum temporary memory usage to "size of local edge list + // majors & minors" / "# bins". constexpr uint32_t hash_seed = 1; // shouldn't be 0 (in that case this hash function will coincide with the hash function // used to map vertices to GPUs, and we may not see the expected randomization) @@ -387,7 +389,10 @@ std::tuple, std::vector, vertex_t> compu } rmm::device_uvector sorted_unique_minors(0, handle.get_stream()); - for (size_t i = 0; i < edgelist_minors.size(); ++i) { + for (size_t i = 0; i < edgelist_minors.size(); + ++i) { // limit the maximum temporary memory usage to "size of local edge list majors & + // minors" / "# local edge partitions" (FXIME: we can further cut peak memory usage + // by applying binning here as well; fewer bins than the edge list major case) rmm::device_uvector tmp_minors(edgelist_edge_counts[i], handle.get_stream()); thrust::copy(handle.get_thrust_policy(), edgelist_minors[i], From ce4ea93a53ca7a8316ae59a82bd69ea4f0148148 Mon Sep 17 00:00:00 2001 From: Seunghwa Kang Date: Fri, 30 Aug 2024 15:54:45 -0700 Subject: [PATCH 4/5] cosmetic updates --- cpp/src/structure/renumber_edgelist_impl.cuh | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/cpp/src/structure/renumber_edgelist_impl.cuh b/cpp/src/structure/renumber_edgelist_impl.cuh index 5ebd8323fac..9507e0fd33b 100644 --- a/cpp/src/structure/renumber_edgelist_impl.cuh +++ b/cpp/src/structure/renumber_edgelist_impl.cuh @@ -260,9 +260,7 @@ std::tuple, std::vector, vertex_t> compu // used to map vertices to GPUs, and we may not see the expected randomization) assert(multi_gpu); - auto& comm = handle.get_comms(); - auto const comm_size = comm.get_size(); - auto& minor_comm = handle.get_subcomm(cugraph::partition_manager::minor_comm_name()); + auto& minor_comm = handle.get_subcomm(cugraph::partition_manager::minor_comm_name()); auto const minor_comm_size = minor_comm.get_size(); assert(static_cast(minor_comm_size) == edgelist_majors.size()); @@ -373,7 +371,7 @@ std::tuple, std::vector, vertex_t> compu sorted_unique_majors.shrink_to_fit(handle.get_stream()); } } - } else { + } else { // FIXME: why not apply binning here? rmm::device_uvector majors(edgelist_edge_counts[0], handle.get_stream()); thrust::copy(handle.get_thrust_policy(), edgelist_majors[0], @@ -1000,8 +998,7 @@ renumber_edgelist( ((*edgelist_intra_partition_segment_offsets)[i][0] == 0) && ((*edgelist_intra_partition_segment_offsets)[i].back() == edgelist_edge_counts[i]), "Invalid input arguments: (*edgelist_intra_partition_segment_offsets)[][0] should be 0 " - "and " - "(*edgelist_intra_partition_segment_offsets)[].back() should coincide with " + "and (*edgelist_intra_partition_segment_offsets)[].back() should coincide with " "edgelist_edge_counts[]."); } } From 9445027410faf76df813d76d9ff78aab5ac10e1c Mon Sep 17 00:00:00 2001 From: Seunghwa Kang Date: Tue, 3 Sep 2024 10:51:30 -0700 Subject: [PATCH 5/5] update renumbering to use binning in more places --- cpp/src/structure/renumber_edgelist_impl.cuh | 293 +++++++++++-------- 1 file changed, 168 insertions(+), 125 deletions(-) diff --git a/cpp/src/structure/renumber_edgelist_impl.cuh b/cpp/src/structure/renumber_edgelist_impl.cuh index 9507e0fd33b..abfa515df05 100644 --- a/cpp/src/structure/renumber_edgelist_impl.cuh +++ b/cpp/src/structure/renumber_edgelist_impl.cuh @@ -249,22 +249,16 @@ std::tuple, std::vector, vertex_t> compu rmm::device_uvector sorted_local_vertices(0, handle.get_stream()); if (!local_vertices) { - rmm::device_uvector sorted_unique_majors(0, handle.get_stream()); - if (edgelist_majors.size() > 1) { - constexpr size_t num_bins{ - 8}; // increase the number of bins to cut peak memory usage (at the expense of additional - // computing), limit the maximum temporary memory usage to "size of local edge list - // majors & minors" / "# bins". - constexpr uint32_t hash_seed = - 1; // shouldn't be 0 (in that case this hash function will coincide with the hash function - // used to map vertices to GPUs, and we may not see the expected randomization) - - assert(multi_gpu); - auto& minor_comm = handle.get_subcomm(cugraph::partition_manager::minor_comm_name()); - auto const minor_comm_size = minor_comm.get_size(); - - assert(static_cast(minor_comm_size) == edgelist_majors.size()); + constexpr size_t num_bins{ + 8}; // increase the number of bins to cut peak memory usage (at the expense of additional + // computing), limit the maximum temporary memory usage to "size of local edge list + // majors|minors * 2 / # bins" + constexpr uint32_t hash_seed = + 1; // shouldn't be 0 (in that case this hash function will coincide with the hash function + // used to map vertices to GPUs, and we may not see the expected randomization) + rmm::device_uvector sorted_unique_majors(0, handle.get_stream()); + { auto edge_count_vectors = num_bins > 1 ? std::make_optional>>( edgelist_majors.size(), std::vector(num_bins)) @@ -296,7 +290,6 @@ std::tuple, std::vector, vertex_t> compu std::vector> edge_partition_sorted_unique_majors{}; // for bin "i" edge_partition_sorted_unique_majors.reserve(edgelist_majors.size()); - std::vector tx_counts(minor_comm_size); for (size_t j = 0; j < edgelist_majors.size(); ++j) { rmm::device_uvector majors(0, handle.get_stream()); if (num_bins > 1) { @@ -323,32 +316,51 @@ std::tuple, std::vector, vertex_t> compu handle.get_stream()); majors.shrink_to_fit(handle.get_stream()); - tx_counts[j] = majors.size(); edge_partition_sorted_unique_majors.push_back(std::move(majors)); } - rmm::device_uvector tmp_majors(std::reduce(tx_counts.begin(), tx_counts.end()), - handle.get_stream()); - size_t offset{0}; - for (size_t j = 0; j < edge_partition_sorted_unique_majors.size(); ++j) { - thrust::copy(handle.get_thrust_policy(), - edge_partition_sorted_unique_majors[j].begin(), - edge_partition_sorted_unique_majors[j].end(), - tmp_majors.begin() + offset); - offset += edge_partition_sorted_unique_majors[j].size(); + rmm::device_uvector tmp_majors(0, handle.get_stream()); + if constexpr (multi_gpu) { + auto& minor_comm = handle.get_subcomm(cugraph::partition_manager::minor_comm_name()); + auto const minor_comm_size = minor_comm.get_size(); + assert(static_cast(minor_comm_size) == + edge_partition_sorted_unique_majors.size()); + + if (minor_comm_size > 1) { + std::vector tx_counts(minor_comm_size); + for (size_t j = 0; j < edge_partition_sorted_unique_majors.size(); ++j) { + tx_counts[j] = edge_partition_sorted_unique_majors[j].size(); + } + tmp_majors.resize(std::reduce(tx_counts.begin(), tx_counts.end()), handle.get_stream()); + size_t offset{0}; + for (size_t j = 0; j < edge_partition_sorted_unique_majors.size(); ++j) { + thrust::copy(handle.get_thrust_policy(), + edge_partition_sorted_unique_majors[j].begin(), + edge_partition_sorted_unique_majors[j].end(), + tmp_majors.begin() + offset); + offset += tx_counts[j]; + } + edge_partition_sorted_unique_majors.clear(); + + std::tie(tmp_majors, std::ignore) = + shuffle_values(minor_comm, tmp_majors.begin(), tx_counts, handle.get_stream()); + + thrust::sort(handle.get_thrust_policy(), tmp_majors.begin(), tmp_majors.end()); + tmp_majors.resize( + thrust::distance( + tmp_majors.begin(), + thrust::unique(handle.get_thrust_policy(), tmp_majors.begin(), tmp_majors.end())), + handle.get_stream()); + tmp_majors.shrink_to_fit(handle.get_stream()); + } else { + assert(edge_partition_sorted_unique_majors.size() == 1); + tmp_majors = std::move(edge_partition_sorted_unique_majors[0]); + } + } else { + assert(edge_partition_sorted_unique_majors.size() == 1); + tmp_majors = std::move(edge_partition_sorted_unique_majors[0]); } - edge_partition_sorted_unique_majors.clear(); - std::tie(tmp_majors, std::ignore) = - shuffle_values(minor_comm, tmp_majors.begin(), tx_counts, handle.get_stream()); - - thrust::sort(handle.get_thrust_policy(), tmp_majors.begin(), tmp_majors.end()); - tmp_majors.resize( - thrust::distance( - tmp_majors.begin(), - thrust::unique(handle.get_thrust_policy(), tmp_majors.begin(), tmp_majors.end())), - handle.get_stream()); - tmp_majors.shrink_to_fit(handle.get_stream()); if (i == 0) { sorted_unique_majors = std::move(tmp_majors); } else { @@ -359,71 +371,72 @@ std::tuple, std::vector, vertex_t> compu sorted_unique_majors.end(), tmp_majors.begin(), tmp_majors.end(), - merged_majors.begin()); + merged_majors.begin()); // merging two unique sets from different hash + // bins, so the merged set can't have duplicates sorted_unique_majors = std::move(merged_majors); - tmp_majors.resize(0, handle.get_stream()); - tmp_majors.shrink_to_fit(handle.get_stream()); - sorted_unique_majors.resize(thrust::distance(sorted_unique_majors.begin(), - thrust::unique(handle.get_thrust_policy(), - sorted_unique_majors.begin(), - sorted_unique_majors.end())), - handle.get_stream()); - sorted_unique_majors.shrink_to_fit(handle.get_stream()); } } - } else { // FIXME: why not apply binning here? - rmm::device_uvector majors(edgelist_edge_counts[0], handle.get_stream()); - thrust::copy(handle.get_thrust_policy(), - edgelist_majors[0], - edgelist_majors[0] + edgelist_edge_counts[0], - majors.begin()); - thrust::sort(handle.get_thrust_policy(), majors.begin(), majors.end()); - majors.resize( - thrust::distance(majors.begin(), - thrust::unique(handle.get_thrust_policy(), majors.begin(), majors.end())), - handle.get_stream()); - majors.shrink_to_fit(handle.get_stream()); - sorted_unique_majors = std::move(majors); } rmm::device_uvector sorted_unique_minors(0, handle.get_stream()); - for (size_t i = 0; i < edgelist_minors.size(); - ++i) { // limit the maximum temporary memory usage to "size of local edge list majors & - // minors" / "# local edge partitions" (FXIME: we can further cut peak memory usage - // by applying binning here as well; fewer bins than the edge list major case) - rmm::device_uvector tmp_minors(edgelist_edge_counts[i], handle.get_stream()); - thrust::copy(handle.get_thrust_policy(), - edgelist_minors[i], - edgelist_minors[i] + edgelist_edge_counts[i], - tmp_minors.begin()); - thrust::sort(handle.get_thrust_policy(), tmp_minors.begin(), tmp_minors.end()); - tmp_minors.resize( - thrust::distance( - tmp_minors.begin(), - thrust::unique(handle.get_thrust_policy(), tmp_minors.begin(), tmp_minors.end())), - handle.get_stream()); - tmp_minors.shrink_to_fit(handle.get_stream()); - - if constexpr (multi_gpu) { - auto& comm = handle.get_comms(); - auto const comm_size = comm.get_size(); - auto& major_comm = handle.get_subcomm(cugraph::partition_manager::major_comm_name()); - auto const major_comm_size = major_comm.get_size(); - auto& minor_comm = handle.get_subcomm(cugraph::partition_manager::minor_comm_name()); - auto const minor_comm_size = minor_comm.get_size(); - - compute_gpu_id_from_ext_vertex_t gpu_id_func{ - comm_size, major_comm_size, minor_comm_size}; - std::tie(tmp_minors, std::ignore) = groupby_gpu_id_and_shuffle_values( - major_comm, - tmp_minors.begin(), - tmp_minors.end(), - [major_comm_size, minor_comm_size, gpu_id_func] __device__(auto v) { - auto comm_rank = gpu_id_func(v); - return partition_manager::compute_major_comm_rank_from_global_comm_rank( - major_comm_size, minor_comm_size, comm_rank); - }, - handle.get_stream()); + { + auto edge_count_vectors = num_bins > 1 + ? std::make_optional>>( + edgelist_minors.size(), std::vector(num_bins)) + : std::nullopt; + if (edge_count_vectors) { + for (size_t i = 0; i < edgelist_minors.size(); ++i) { + rmm::device_uvector d_edge_counts(num_bins, handle.get_stream()); + thrust::fill( + handle.get_thrust_policy(), d_edge_counts.begin(), d_edge_counts.end(), edge_t{0}); + thrust::for_each(handle.get_thrust_policy(), + edgelist_minors[i], + edgelist_minors[i] + edgelist_edge_counts[i], + [counts = raft::device_span( + d_edge_counts.data(), d_edge_counts.size())] __device__(auto v) { + cuco::detail::MurmurHash3_32 hash_func{hash_seed}; + cuda::atomic_ref atomic_counter( + counts[hash_func(v) % num_bins]); + atomic_counter.fetch_add(edge_t{1}, cuda::std::memory_order_relaxed); + }); + raft::update_host((*edge_count_vectors)[i].data(), + d_edge_counts.data(), + d_edge_counts.size(), + handle.get_stream()); + } + handle.sync_stream(); + } + + for (size_t i = 0; i < num_bins; ++i) { + edge_t bin_size{0}; + if (edge_count_vectors) { + for (size_t j = 0; j < edgelist_minors.size(); ++j) { + bin_size += (*edge_count_vectors)[j][i]; + } + } else { + bin_size = std::reduce(edgelist_edge_counts.begin(), edgelist_edge_counts.end()); + } + rmm::device_uvector tmp_minors(bin_size, handle.get_stream()); + edge_t offset{0}; + for (size_t j = 0; j < edgelist_minors.size(); ++j) { + if (num_bins > 1) { + thrust::copy_if(handle.get_thrust_policy(), + edgelist_minors[j], + edgelist_minors[j] + edgelist_edge_counts[j], + tmp_minors.begin() + offset, + [i] __device__(auto v) { + cuco::detail::MurmurHash3_32 hash_func{hash_seed}; + return (static_cast(hash_func(v) % num_bins) == i); + }); + offset += (*edge_count_vectors)[j][i]; + } else { + thrust::copy(handle.get_thrust_policy(), + edgelist_minors[j], + edgelist_minors[j] + edgelist_edge_counts[j], + tmp_minors.begin() + offset); + offset += edgelist_edge_counts[j]; + } + } thrust::sort(handle.get_thrust_policy(), tmp_minors.begin(), tmp_minors.end()); tmp_minors.resize( thrust::distance( @@ -431,30 +444,60 @@ std::tuple, std::vector, vertex_t> compu thrust::unique(handle.get_thrust_policy(), tmp_minors.begin(), tmp_minors.end())), handle.get_stream()); tmp_minors.shrink_to_fit(handle.get_stream()); - } - if (i == 0) { - sorted_unique_minors = std::move(tmp_minors); - } else { - rmm::device_uvector merged_minors(sorted_unique_minors.size() + tmp_minors.size(), - handle.get_stream()); - thrust::merge(handle.get_thrust_policy(), - sorted_unique_minors.begin(), - sorted_unique_minors.end(), - tmp_minors.begin(), - tmp_minors.end(), - merged_minors.begin()); - sorted_unique_minors = std::move(merged_minors); - tmp_minors.resize(0, handle.get_stream()); - tmp_minors.shrink_to_fit(handle.get_stream()); - sorted_unique_minors.resize(thrust::distance(sorted_unique_minors.begin(), - thrust::unique(handle.get_thrust_policy(), - sorted_unique_minors.begin(), - sorted_unique_minors.end())), - handle.get_stream()); - sorted_unique_minors.shrink_to_fit(handle.get_stream()); + if constexpr (multi_gpu) { + auto& comm = handle.get_comms(); + auto const comm_size = comm.get_size(); + auto& major_comm = handle.get_subcomm(cugraph::partition_manager::major_comm_name()); + auto const major_comm_size = major_comm.get_size(); + auto& minor_comm = handle.get_subcomm(cugraph::partition_manager::minor_comm_name()); + auto const minor_comm_size = minor_comm.get_size(); + + compute_gpu_id_from_ext_vertex_t gpu_id_func{ + comm_size, major_comm_size, minor_comm_size}; + std::tie(tmp_minors, std::ignore) = groupby_gpu_id_and_shuffle_values( + major_comm, + tmp_minors.begin(), + tmp_minors.end(), + [major_comm_size, minor_comm_size, gpu_id_func] __device__(auto v) { + auto comm_rank = gpu_id_func(v); + return partition_manager::compute_major_comm_rank_from_global_comm_rank( + major_comm_size, minor_comm_size, comm_rank); + }, + handle.get_stream()); + thrust::sort(handle.get_thrust_policy(), tmp_minors.begin(), tmp_minors.end()); + tmp_minors.resize( + thrust::distance( + tmp_minors.begin(), + thrust::unique(handle.get_thrust_policy(), tmp_minors.begin(), tmp_minors.end())), + handle.get_stream()); + tmp_minors.shrink_to_fit(handle.get_stream()); + } + + if (i == 0) { + sorted_unique_minors = std::move(tmp_minors); + } else { + rmm::device_uvector merged_minors( + sorted_unique_minors.size() + tmp_minors.size(), handle.get_stream()); + thrust::merge(handle.get_thrust_policy(), + sorted_unique_minors.begin(), + sorted_unique_minors.end(), + tmp_minors.begin(), + tmp_minors.end(), + merged_minors.begin()); + sorted_unique_minors = std::move(merged_minors); + tmp_minors.resize(0, handle.get_stream()); + tmp_minors.shrink_to_fit(handle.get_stream()); + sorted_unique_minors.resize(thrust::distance(sorted_unique_minors.begin(), + thrust::unique(handle.get_thrust_policy(), + sorted_unique_minors.begin(), + sorted_unique_minors.end())), + handle.get_stream()); + sorted_unique_minors.shrink_to_fit(handle.get_stream()); + } } } + sorted_local_vertices.resize(sorted_unique_majors.size() + sorted_unique_minors.size(), handle.get_stream()); thrust::merge(handle.get_thrust_policy(), @@ -492,14 +535,12 @@ std::tuple, std::vector, vertex_t> compu // 3. compute global degrees for the sorted local vertices rmm::device_uvector sorted_local_vertex_degrees(0, handle.get_stream()); - std::optional> stream_pool_indices{ - std::nullopt}; // FIXME: move this inside the if statement auto constexpr num_chunks = size_t{ - 2}; // tuning parameter, this trade-offs # binary searches (up to num_chunks times more binary - // searches can be necessary if num_unique_majors << edgelist_edge_counts[i]) and temporary - // buffer requirement (cut by num_chunks times), currently set to 2 to avoid peak memory - // usage happening in this part (especially when minor_comm_size is small) + 2}; // tuning parameter, this trade-offs # binary searches (up to num_chunks times more + // binary searches can be necessary if num_unique_majors << edgelist_edge_counts[i]) and + // temporary buffer requirement (cut by num_chunks times), currently set to 2 to avoid + // peak memory usage happening in this part (especially when minor_comm_size is small) if constexpr (multi_gpu) { auto& comm = handle.get_comms(); @@ -512,6 +553,8 @@ std::tuple, std::vector, vertex_t> compu auto edge_partition_major_range_sizes = host_scalar_allgather(minor_comm, sorted_local_vertices.size(), handle.get_stream()); + std::optional> stream_pool_indices{std::nullopt}; + if ((minor_comm_size >= 2) && (handle.get_stream_pool_size() >= 2)) { edge_t num_local_edges = std::reduce(edgelist_edge_counts.begin(), edgelist_edge_counts.end()); @@ -664,7 +707,7 @@ std::tuple, std::vector, vertex_t> compu } } - // 4. sort local vertices by degree (descending) + // 5. sort local vertices by degree (descending) thrust::sort_by_key(handle.get_thrust_policy(), sorted_local_vertex_degrees.begin(), @@ -672,7 +715,7 @@ std::tuple, std::vector, vertex_t> compu sorted_local_vertices.begin(), thrust::greater()); - // 5. compute segment_offsets + // 6. compute segment_offsets static_assert(detail::num_sparse_segments_per_vertex_partition == 3); static_assert((detail::low_degree_threshold <= detail::mid_degree_threshold) &&