diff --git a/cpp/include/cugraph/partition_manager.hpp b/cpp/include/cugraph/partition_manager.hpp index 309b169e646..18c19d3b54d 100644 --- a/cpp/include/cugraph/partition_manager.hpp +++ b/cpp/include/cugraph/partition_manager.hpp @@ -42,7 +42,8 @@ class partition_manager { // partitioning along the major axis (major sub-communicator is responsible for this) and along // the minor axis (minor sub-communicator is responsible for this). This variable controls whether // to map the major sub-communicator to the GPU row communicator or the GPU column communicator. - static constexpr bool map_major_comm_to_gpu_row_comm = true; + static constexpr bool map_major_comm_to_gpu_row_comm = + false; // FIXME: this is for benchmarking, reset to true before merging #ifdef __CUDACC__ __host__ __device__ @@ -71,6 +72,30 @@ class partition_manager { : (major_comm_rank * minor_comm_size + minor_comm_rank); } +#ifdef __CUDACC__ + __host__ __device__ +#endif + static int + compute_major_comm_rank_from_global_comm_rank(int major_comm_size, + int minor_comm_size, + int comm_rank) + { + return map_major_comm_to_gpu_row_comm ? comm_rank % major_comm_size + : comm_rank / minor_comm_size; + } + +#ifdef __CUDACC__ + __host__ __device__ +#endif + static int + compute_minor_comm_rank_from_global_comm_rank(int major_comm_size, + int minor_comm_size, + int comm_rank) + { + return map_major_comm_to_gpu_row_comm ? comm_rank / major_comm_size + : comm_rank % minor_comm_size; + } + #ifdef __CUDACC__ __host__ __device__ #endif diff --git a/cpp/src/structure/renumber_edgelist_impl.cuh b/cpp/src/structure/renumber_edgelist_impl.cuh index 41f81d72ab1..574163d4af5 100644 --- a/cpp/src/structure/renumber_edgelist_impl.cuh +++ b/cpp/src/structure/renumber_edgelist_impl.cuh @@ -51,6 +51,8 @@ #include #include +#include + #include #include #include @@ -242,119 +244,240 @@ std::tuple, std::vector, vertex_t> compu std::vector const& edgelist_minors, std::vector const& edgelist_edge_counts) { + // 1. if local_vertices.has_value() is false, find unique vertices from edge majors & minors (to + // construct local_vertices) + rmm::device_uvector sorted_local_vertices(0, handle.get_stream()); + if (!local_vertices) { + rmm::device_uvector sorted_unique_majors(0, handle.get_stream()); + if (edgelist_majors.size() > 1) { + constexpr size_t num_bins{8}; // increase the number of bins to cut peak memory usage (at the + // expense of additional computing) + constexpr uint32_t hash_seed = + 1; // shouldn't be 0 (in that case this hash function will coincide with the hash function + // used to map vertices to GPUs, and we may not see the expected randomization) + + assert(multi_gpu); + auto& comm = handle.get_comms(); + auto const comm_size = comm.get_size(); + auto& minor_comm = handle.get_subcomm(cugraph::partition_manager::minor_comm_name()); + auto const minor_comm_size = minor_comm.get_size(); + + assert(static_cast(minor_comm_size) == edgelist_majors.size()); + + auto edge_count_vectors = num_bins > 1 + ? std::make_optional>>( + edgelist_majors.size(), std::vector(num_bins)) + : std::nullopt; + if (edge_count_vectors) { + for (size_t i = 0; i < edgelist_majors.size(); ++i) { + rmm::device_uvector d_edge_counts(num_bins, handle.get_stream()); + thrust::fill( + handle.get_thrust_policy(), d_edge_counts.begin(), d_edge_counts.end(), edge_t{0}); + thrust::for_each(handle.get_thrust_policy(), + edgelist_majors[i], + edgelist_majors[i] + edgelist_edge_counts[i], + [counts = raft::device_span( + d_edge_counts.data(), d_edge_counts.size())] __device__(auto v) { + cuco::detail::MurmurHash3_32 hash_func{hash_seed}; + cuda::atomic_ref atomic_counter( + counts[hash_func(v) % num_bins]); + atomic_counter.fetch_add(edge_t{1}, cuda::std::memory_order_relaxed); + }); + raft::update_host((*edge_count_vectors)[i].data(), + d_edge_counts.data(), + d_edge_counts.size(), + handle.get_stream()); + } + handle.sync_stream(); + } - edge_t num_local_edges = std::reduce(edgelist_edge_counts.begin(), edgelist_edge_counts.end()); + for (size_t i = 0; i < num_bins; ++i) { + std::vector> + edge_partition_sorted_unique_majors{}; // for bin "i" + edge_partition_sorted_unique_majors.reserve(edgelist_majors.size()); + std::vector tx_counts(minor_comm_size); + for (size_t j = 0; j < edgelist_majors.size(); ++j) { + rmm::device_uvector majors(0, handle.get_stream()); + if (num_bins > 1) { + majors.resize((*edge_count_vectors)[j][i], handle.get_stream()); + thrust::copy_if(handle.get_thrust_policy(), + edgelist_majors[j], + edgelist_majors[j] + edgelist_edge_counts[j], + majors.begin(), + [i] __device__(auto v) { + cuco::detail::MurmurHash3_32 hash_func{hash_seed}; + return (static_cast(hash_func(v) % num_bins) == i); + }); + } else { + majors.resize(edgelist_edge_counts[j], handle.get_stream()); + thrust::copy(handle.get_thrust_policy(), + edgelist_majors[j], + edgelist_majors[j] + edgelist_edge_counts[j], + majors.begin()); + } + thrust::sort(handle.get_thrust_policy(), majors.begin(), majors.end()); + majors.resize(thrust::distance( + majors.begin(), + thrust::unique(handle.get_thrust_policy(), majors.begin(), majors.end())), + handle.get_stream()); + majors.shrink_to_fit(handle.get_stream()); - // 1. if local_vertices.has_value() is false, find unique vertices from edge majors (to construct - // local_vertices) + tx_counts[j] = majors.size(); + edge_partition_sorted_unique_majors.push_back(std::move(majors)); + } - rmm::device_uvector sorted_unique_majors(0, handle.get_stream()); - if (!local_vertices) { - sorted_unique_majors.resize(num_local_edges, handle.get_stream()); - size_t major_offset{0}; - for (size_t i = 0; i < edgelist_majors.size(); ++i) { - thrust::copy(handle.get_thrust_policy(), - edgelist_majors[i], - edgelist_majors[i] + edgelist_edge_counts[i], - sorted_unique_majors.begin() + major_offset); - thrust::sort(handle.get_thrust_policy(), - sorted_unique_majors.begin() + major_offset, - sorted_unique_majors.begin() + major_offset + edgelist_edge_counts[i]); - major_offset += static_cast(thrust::distance( - sorted_unique_majors.begin() + major_offset, - thrust::unique(handle.get_thrust_policy(), - sorted_unique_majors.begin() + major_offset, - sorted_unique_majors.begin() + major_offset + edgelist_edge_counts[i]))); - } - sorted_unique_majors.resize(major_offset, handle.get_stream()); + rmm::device_uvector tmp_majors(std::reduce(tx_counts.begin(), tx_counts.end()), + handle.get_stream()); + size_t offset{0}; + for (size_t j = 0; j < edge_partition_sorted_unique_majors.size(); ++j) { + thrust::copy(handle.get_thrust_policy(), + edge_partition_sorted_unique_majors[j].begin(), + edge_partition_sorted_unique_majors[j].end(), + tmp_majors.begin() + offset); + offset += edge_partition_sorted_unique_majors[j].size(); + } + edge_partition_sorted_unique_majors.clear(); - if (edgelist_majors.size() > 1) { - thrust::sort( - handle.get_thrust_policy(), sorted_unique_majors.begin(), sorted_unique_majors.end()); - } - sorted_unique_majors.shrink_to_fit(handle.get_stream()); - } + std::tie(tmp_majors, std::ignore) = + shuffle_values(minor_comm, tmp_majors.begin(), tx_counts, handle.get_stream()); - // 2. if local_vertices.has_value() is false, find unique vertices from edge minors (to construct - // local_vertices) + thrust::sort(handle.get_thrust_policy(), tmp_majors.begin(), tmp_majors.end()); + tmp_majors.resize( + thrust::distance( + tmp_majors.begin(), + thrust::unique(handle.get_thrust_policy(), tmp_majors.begin(), tmp_majors.end())), + handle.get_stream()); + tmp_majors.shrink_to_fit(handle.get_stream()); + if (i == 0) { + sorted_unique_majors = std::move(tmp_majors); + } else { + rmm::device_uvector merged_majors( + sorted_unique_majors.size() + tmp_majors.size(), handle.get_stream()); + thrust::merge(handle.get_thrust_policy(), + sorted_unique_majors.begin(), + sorted_unique_majors.end(), + tmp_majors.begin(), + tmp_majors.end(), + merged_majors.begin()); + sorted_unique_majors = std::move(merged_majors); + tmp_majors.resize(0, handle.get_stream()); + tmp_majors.shrink_to_fit(handle.get_stream()); + sorted_unique_majors.resize(thrust::distance(sorted_unique_majors.begin(), + thrust::unique(handle.get_thrust_policy(), + sorted_unique_majors.begin(), + sorted_unique_majors.end())), + handle.get_stream()); + sorted_unique_majors.shrink_to_fit(handle.get_stream()); + } + } + } else { + rmm::device_uvector majors(edgelist_edge_counts[0], handle.get_stream()); + thrust::copy(handle.get_thrust_policy(), + edgelist_majors[0], + edgelist_majors[0] + edgelist_edge_counts[0], + majors.begin()); + thrust::sort(handle.get_thrust_policy(), majors.begin(), majors.end()); + majors.resize( + thrust::distance(majors.begin(), + thrust::unique(handle.get_thrust_policy(), majors.begin(), majors.end())), + handle.get_stream()); + majors.shrink_to_fit(handle.get_stream()); + sorted_unique_majors = std::move(majors); + } - rmm::device_uvector sorted_unique_minors(0, handle.get_stream()); - if (!local_vertices) { - sorted_unique_minors.resize(num_local_edges, handle.get_stream()); - size_t minor_offset{0}; + rmm::device_uvector sorted_unique_minors(0, handle.get_stream()); for (size_t i = 0; i < edgelist_minors.size(); ++i) { + rmm::device_uvector tmp_minors(edgelist_edge_counts[i], handle.get_stream()); thrust::copy(handle.get_thrust_policy(), edgelist_minors[i], edgelist_minors[i] + edgelist_edge_counts[i], - sorted_unique_minors.begin() + minor_offset); - thrust::sort(handle.get_thrust_policy(), - sorted_unique_minors.begin() + minor_offset, - sorted_unique_minors.begin() + minor_offset + edgelist_edge_counts[i]); - minor_offset += static_cast(thrust::distance( - sorted_unique_minors.begin() + minor_offset, - thrust::unique(handle.get_thrust_policy(), - sorted_unique_minors.begin() + minor_offset, - sorted_unique_minors.begin() + minor_offset + edgelist_edge_counts[i]))); - } - sorted_unique_minors.resize(minor_offset, handle.get_stream()); - if (edgelist_minors.size() > 1) { - thrust::sort( - handle.get_thrust_policy(), sorted_unique_minors.begin(), sorted_unique_minors.end()); - sorted_unique_minors.resize(thrust::distance(sorted_unique_minors.begin(), - thrust::unique(handle.get_thrust_policy(), - sorted_unique_minors.begin(), - sorted_unique_minors.end())), - handle.get_stream()); - } - sorted_unique_minors.shrink_to_fit(handle.get_stream()); - } - - // 3. update sorted_local_vertices. - // if local_vertices.has_value() is false, reconstruct local_vertices first + tmp_minors.begin()); + thrust::sort(handle.get_thrust_policy(), tmp_minors.begin(), tmp_minors.end()); + tmp_minors.resize( + thrust::distance( + tmp_minors.begin(), + thrust::unique(handle.get_thrust_policy(), tmp_minors.begin(), tmp_minors.end())), + handle.get_stream()); + tmp_minors.shrink_to_fit(handle.get_stream()); + + if constexpr (multi_gpu) { + auto& comm = handle.get_comms(); + auto const comm_size = comm.get_size(); + auto& major_comm = handle.get_subcomm(cugraph::partition_manager::major_comm_name()); + auto const major_comm_size = major_comm.get_size(); + auto& minor_comm = handle.get_subcomm(cugraph::partition_manager::minor_comm_name()); + auto const minor_comm_size = minor_comm.get_size(); + + compute_gpu_id_from_ext_vertex_t gpu_id_func{ + comm_size, major_comm_size, minor_comm_size}; + std::tie(tmp_minors, std::ignore) = groupby_gpu_id_and_shuffle_values( + major_comm, + tmp_minors.begin(), + tmp_minors.end(), + [major_comm_size, minor_comm_size, gpu_id_func] __device__(auto v) { + auto comm_rank = gpu_id_func(v); + return partition_manager::compute_major_comm_rank_from_global_comm_rank( + major_comm_size, minor_comm_size, comm_rank); + }, + handle.get_stream()); + thrust::sort(handle.get_thrust_policy(), tmp_minors.begin(), tmp_minors.end()); + tmp_minors.resize( + thrust::distance( + tmp_minors.begin(), + thrust::unique(handle.get_thrust_policy(), tmp_minors.begin(), tmp_minors.end())), + handle.get_stream()); + tmp_minors.shrink_to_fit(handle.get_stream()); + } - if (local_vertices) { - sorted_local_vertices = std::move(*local_vertices); - thrust::sort( - handle.get_thrust_policy(), sorted_local_vertices.begin(), sorted_local_vertices.end()); - } else { + if (i == 0) { + sorted_unique_minors = std::move(tmp_minors); + } else { + rmm::device_uvector merged_minors(sorted_unique_minors.size() + tmp_minors.size(), + handle.get_stream()); + thrust::merge(handle.get_thrust_policy(), + sorted_unique_minors.begin(), + sorted_unique_minors.end(), + tmp_minors.begin(), + tmp_minors.end(), + merged_minors.begin()); + sorted_unique_minors = std::move(merged_minors); + tmp_minors.resize(0, handle.get_stream()); + tmp_minors.shrink_to_fit(handle.get_stream()); + sorted_unique_minors.resize(thrust::distance(sorted_unique_minors.begin(), + thrust::unique(handle.get_thrust_policy(), + sorted_unique_minors.begin(), + sorted_unique_minors.end())), + handle.get_stream()); + sorted_unique_minors.shrink_to_fit(handle.get_stream()); + } + } sorted_local_vertices.resize(sorted_unique_majors.size() + sorted_unique_minors.size(), handle.get_stream()); - thrust::merge(handle.get_thrust_policy(), sorted_unique_majors.begin(), sorted_unique_majors.end(), sorted_unique_minors.begin(), sorted_unique_minors.end(), sorted_local_vertices.begin()); - sorted_unique_majors.resize(0, handle.get_stream()); sorted_unique_majors.shrink_to_fit(handle.get_stream()); sorted_unique_minors.resize(0, handle.get_stream()); sorted_unique_minors.shrink_to_fit(handle.get_stream()); - sorted_local_vertices.resize(thrust::distance(sorted_local_vertices.begin(), thrust::unique(handle.get_thrust_policy(), sorted_local_vertices.begin(), sorted_local_vertices.end())), handle.get_stream()); sorted_local_vertices.shrink_to_fit(handle.get_stream()); - - if constexpr (multi_gpu) { - sorted_local_vertices = - cugraph::detail::shuffle_ext_vertices_to_local_gpu_by_vertex_partitioning( - handle, std::move(sorted_local_vertices)); - thrust::sort( - handle.get_thrust_policy(), sorted_local_vertices.begin(), sorted_local_vertices.end()); - sorted_local_vertices.resize(thrust::distance(sorted_local_vertices.begin(), - thrust::unique(handle.get_thrust_policy(), - sorted_local_vertices.begin(), - sorted_local_vertices.end())), - handle.get_stream()); - sorted_local_vertices.shrink_to_fit(handle.get_stream()); - } + } else { + sorted_local_vertices = std::move(*local_vertices); + thrust::sort( + handle.get_thrust_policy(), sorted_local_vertices.begin(), sorted_local_vertices.end()); } + // 2. find an unused vertex ID + auto locally_unused_vertex_id = find_locally_unused_ext_vertex_id( handle, raft::device_span(sorted_local_vertices.data(), sorted_local_vertices.size()), @@ -363,7 +486,7 @@ std::tuple, std::vector, vertex_t> compu "Invalid input arguments: there is no unused value in the entire range of " "vertex_t, increase vertex_t to 64 bit."); - // 4. compute global degrees for the sorted local vertices + // 3. compute global degrees for the sorted local vertices rmm::device_uvector sorted_local_vertex_degrees(0, handle.get_stream()); std::optional> stream_pool_indices{ @@ -387,6 +510,9 @@ std::tuple, std::vector, vertex_t> compu host_scalar_allgather(minor_comm, sorted_local_vertices.size(), handle.get_stream()); if ((minor_comm_size >= 2) && (handle.get_stream_pool_size() >= 2)) { + edge_t num_local_edges = + std::reduce(edgelist_edge_counts.begin(), edgelist_edge_counts.end()); + auto vertex_edge_counts = host_scalar_allreduce( comm, thrust::make_tuple(static_cast(sorted_local_vertices.size()), num_local_edges), @@ -857,10 +983,10 @@ renumber_edgelist( (*edgelist_intra_partition_segment_offsets).size() == static_cast(minor_comm_size), "Invalid input arguments: erroneous (*edgelist_intra_partition_segment_offsets).size()."); for (size_t i = 0; i < edgelist_majors.size(); ++i) { - CUGRAPH_EXPECTS( - (*edgelist_intra_partition_segment_offsets)[i].size() == - static_cast(major_comm_size + 1), - "Invalid input arguments: erroneous (*edgelist_intra_partition_segment_offsets)[].size()."); + CUGRAPH_EXPECTS((*edgelist_intra_partition_segment_offsets)[i].size() == + static_cast(major_comm_size + 1), + "Invalid input arguments: erroneous " + "(*edgelist_intra_partition_segment_offsets)[].size()."); CUGRAPH_EXPECTS( std::is_sorted((*edgelist_intra_partition_segment_offsets)[i].begin(), (*edgelist_intra_partition_segment_offsets)[i].end()), @@ -868,7 +994,8 @@ renumber_edgelist( CUGRAPH_EXPECTS( ((*edgelist_intra_partition_segment_offsets)[i][0] == 0) && ((*edgelist_intra_partition_segment_offsets)[i].back() == edgelist_edge_counts[i]), - "Invalid input arguments: (*edgelist_intra_partition_segment_offsets)[][0] should be 0 and " + "Invalid input arguments: (*edgelist_intra_partition_segment_offsets)[][0] should be 0 " + "and " "(*edgelist_intra_partition_segment_offsets)[].back() should coincide with " "edgelist_edge_counts[]."); }