diff --git a/cpp/src/structure/renumber_edgelist_impl.cuh b/cpp/src/structure/renumber_edgelist_impl.cuh index 9507e0fd33b..abfa515df05 100644 --- a/cpp/src/structure/renumber_edgelist_impl.cuh +++ b/cpp/src/structure/renumber_edgelist_impl.cuh @@ -249,22 +249,16 @@ std::tuple, std::vector, vertex_t> compu rmm::device_uvector sorted_local_vertices(0, handle.get_stream()); if (!local_vertices) { - rmm::device_uvector sorted_unique_majors(0, handle.get_stream()); - if (edgelist_majors.size() > 1) { - constexpr size_t num_bins{ - 8}; // increase the number of bins to cut peak memory usage (at the expense of additional - // computing), limit the maximum temporary memory usage to "size of local edge list - // majors & minors" / "# bins". - constexpr uint32_t hash_seed = - 1; // shouldn't be 0 (in that case this hash function will coincide with the hash function - // used to map vertices to GPUs, and we may not see the expected randomization) - - assert(multi_gpu); - auto& minor_comm = handle.get_subcomm(cugraph::partition_manager::minor_comm_name()); - auto const minor_comm_size = minor_comm.get_size(); - - assert(static_cast(minor_comm_size) == edgelist_majors.size()); + constexpr size_t num_bins{ + 8}; // increase the number of bins to cut peak memory usage (at the expense of additional + // computing), limit the maximum temporary memory usage to "size of local edge list + // majors|minors * 2 / # bins" + constexpr uint32_t hash_seed = + 1; // shouldn't be 0 (in that case this hash function will coincide with the hash function + // used to map vertices to GPUs, and we may not see the expected randomization) + rmm::device_uvector sorted_unique_majors(0, handle.get_stream()); + { auto edge_count_vectors = num_bins > 1 ? std::make_optional>>( edgelist_majors.size(), std::vector(num_bins)) @@ -296,7 +290,6 @@ std::tuple, std::vector, vertex_t> compu std::vector> edge_partition_sorted_unique_majors{}; // for bin "i" edge_partition_sorted_unique_majors.reserve(edgelist_majors.size()); - std::vector tx_counts(minor_comm_size); for (size_t j = 0; j < edgelist_majors.size(); ++j) { rmm::device_uvector majors(0, handle.get_stream()); if (num_bins > 1) { @@ -323,32 +316,51 @@ std::tuple, std::vector, vertex_t> compu handle.get_stream()); majors.shrink_to_fit(handle.get_stream()); - tx_counts[j] = majors.size(); edge_partition_sorted_unique_majors.push_back(std::move(majors)); } - rmm::device_uvector tmp_majors(std::reduce(tx_counts.begin(), tx_counts.end()), - handle.get_stream()); - size_t offset{0}; - for (size_t j = 0; j < edge_partition_sorted_unique_majors.size(); ++j) { - thrust::copy(handle.get_thrust_policy(), - edge_partition_sorted_unique_majors[j].begin(), - edge_partition_sorted_unique_majors[j].end(), - tmp_majors.begin() + offset); - offset += edge_partition_sorted_unique_majors[j].size(); + rmm::device_uvector tmp_majors(0, handle.get_stream()); + if constexpr (multi_gpu) { + auto& minor_comm = handle.get_subcomm(cugraph::partition_manager::minor_comm_name()); + auto const minor_comm_size = minor_comm.get_size(); + assert(static_cast(minor_comm_size) == + edge_partition_sorted_unique_majors.size()); + + if (minor_comm_size > 1) { + std::vector tx_counts(minor_comm_size); + for (size_t j = 0; j < edge_partition_sorted_unique_majors.size(); ++j) { + tx_counts[j] = edge_partition_sorted_unique_majors[j].size(); + } + tmp_majors.resize(std::reduce(tx_counts.begin(), tx_counts.end()), handle.get_stream()); + size_t offset{0}; + for (size_t j = 0; j < edge_partition_sorted_unique_majors.size(); ++j) { + thrust::copy(handle.get_thrust_policy(), + edge_partition_sorted_unique_majors[j].begin(), + edge_partition_sorted_unique_majors[j].end(), + tmp_majors.begin() + offset); + offset += tx_counts[j]; + } + edge_partition_sorted_unique_majors.clear(); + + std::tie(tmp_majors, std::ignore) = + shuffle_values(minor_comm, tmp_majors.begin(), tx_counts, handle.get_stream()); + + thrust::sort(handle.get_thrust_policy(), tmp_majors.begin(), tmp_majors.end()); + tmp_majors.resize( + thrust::distance( + tmp_majors.begin(), + thrust::unique(handle.get_thrust_policy(), tmp_majors.begin(), tmp_majors.end())), + handle.get_stream()); + tmp_majors.shrink_to_fit(handle.get_stream()); + } else { + assert(edge_partition_sorted_unique_majors.size() == 1); + tmp_majors = std::move(edge_partition_sorted_unique_majors[0]); + } + } else { + assert(edge_partition_sorted_unique_majors.size() == 1); + tmp_majors = std::move(edge_partition_sorted_unique_majors[0]); } - edge_partition_sorted_unique_majors.clear(); - std::tie(tmp_majors, std::ignore) = - shuffle_values(minor_comm, tmp_majors.begin(), tx_counts, handle.get_stream()); - - thrust::sort(handle.get_thrust_policy(), tmp_majors.begin(), tmp_majors.end()); - tmp_majors.resize( - thrust::distance( - tmp_majors.begin(), - thrust::unique(handle.get_thrust_policy(), tmp_majors.begin(), tmp_majors.end())), - handle.get_stream()); - tmp_majors.shrink_to_fit(handle.get_stream()); if (i == 0) { sorted_unique_majors = std::move(tmp_majors); } else { @@ -359,71 +371,72 @@ std::tuple, std::vector, vertex_t> compu sorted_unique_majors.end(), tmp_majors.begin(), tmp_majors.end(), - merged_majors.begin()); + merged_majors.begin()); // merging two unique sets from different hash + // bins, so the merged set can't have duplicates sorted_unique_majors = std::move(merged_majors); - tmp_majors.resize(0, handle.get_stream()); - tmp_majors.shrink_to_fit(handle.get_stream()); - sorted_unique_majors.resize(thrust::distance(sorted_unique_majors.begin(), - thrust::unique(handle.get_thrust_policy(), - sorted_unique_majors.begin(), - sorted_unique_majors.end())), - handle.get_stream()); - sorted_unique_majors.shrink_to_fit(handle.get_stream()); } } - } else { // FIXME: why not apply binning here? - rmm::device_uvector majors(edgelist_edge_counts[0], handle.get_stream()); - thrust::copy(handle.get_thrust_policy(), - edgelist_majors[0], - edgelist_majors[0] + edgelist_edge_counts[0], - majors.begin()); - thrust::sort(handle.get_thrust_policy(), majors.begin(), majors.end()); - majors.resize( - thrust::distance(majors.begin(), - thrust::unique(handle.get_thrust_policy(), majors.begin(), majors.end())), - handle.get_stream()); - majors.shrink_to_fit(handle.get_stream()); - sorted_unique_majors = std::move(majors); } rmm::device_uvector sorted_unique_minors(0, handle.get_stream()); - for (size_t i = 0; i < edgelist_minors.size(); - ++i) { // limit the maximum temporary memory usage to "size of local edge list majors & - // minors" / "# local edge partitions" (FXIME: we can further cut peak memory usage - // by applying binning here as well; fewer bins than the edge list major case) - rmm::device_uvector tmp_minors(edgelist_edge_counts[i], handle.get_stream()); - thrust::copy(handle.get_thrust_policy(), - edgelist_minors[i], - edgelist_minors[i] + edgelist_edge_counts[i], - tmp_minors.begin()); - thrust::sort(handle.get_thrust_policy(), tmp_minors.begin(), tmp_minors.end()); - tmp_minors.resize( - thrust::distance( - tmp_minors.begin(), - thrust::unique(handle.get_thrust_policy(), tmp_minors.begin(), tmp_minors.end())), - handle.get_stream()); - tmp_minors.shrink_to_fit(handle.get_stream()); - - if constexpr (multi_gpu) { - auto& comm = handle.get_comms(); - auto const comm_size = comm.get_size(); - auto& major_comm = handle.get_subcomm(cugraph::partition_manager::major_comm_name()); - auto const major_comm_size = major_comm.get_size(); - auto& minor_comm = handle.get_subcomm(cugraph::partition_manager::minor_comm_name()); - auto const minor_comm_size = minor_comm.get_size(); - - compute_gpu_id_from_ext_vertex_t gpu_id_func{ - comm_size, major_comm_size, minor_comm_size}; - std::tie(tmp_minors, std::ignore) = groupby_gpu_id_and_shuffle_values( - major_comm, - tmp_minors.begin(), - tmp_minors.end(), - [major_comm_size, minor_comm_size, gpu_id_func] __device__(auto v) { - auto comm_rank = gpu_id_func(v); - return partition_manager::compute_major_comm_rank_from_global_comm_rank( - major_comm_size, minor_comm_size, comm_rank); - }, - handle.get_stream()); + { + auto edge_count_vectors = num_bins > 1 + ? std::make_optional>>( + edgelist_minors.size(), std::vector(num_bins)) + : std::nullopt; + if (edge_count_vectors) { + for (size_t i = 0; i < edgelist_minors.size(); ++i) { + rmm::device_uvector d_edge_counts(num_bins, handle.get_stream()); + thrust::fill( + handle.get_thrust_policy(), d_edge_counts.begin(), d_edge_counts.end(), edge_t{0}); + thrust::for_each(handle.get_thrust_policy(), + edgelist_minors[i], + edgelist_minors[i] + edgelist_edge_counts[i], + [counts = raft::device_span( + d_edge_counts.data(), d_edge_counts.size())] __device__(auto v) { + cuco::detail::MurmurHash3_32 hash_func{hash_seed}; + cuda::atomic_ref atomic_counter( + counts[hash_func(v) % num_bins]); + atomic_counter.fetch_add(edge_t{1}, cuda::std::memory_order_relaxed); + }); + raft::update_host((*edge_count_vectors)[i].data(), + d_edge_counts.data(), + d_edge_counts.size(), + handle.get_stream()); + } + handle.sync_stream(); + } + + for (size_t i = 0; i < num_bins; ++i) { + edge_t bin_size{0}; + if (edge_count_vectors) { + for (size_t j = 0; j < edgelist_minors.size(); ++j) { + bin_size += (*edge_count_vectors)[j][i]; + } + } else { + bin_size = std::reduce(edgelist_edge_counts.begin(), edgelist_edge_counts.end()); + } + rmm::device_uvector tmp_minors(bin_size, handle.get_stream()); + edge_t offset{0}; + for (size_t j = 0; j < edgelist_minors.size(); ++j) { + if (num_bins > 1) { + thrust::copy_if(handle.get_thrust_policy(), + edgelist_minors[j], + edgelist_minors[j] + edgelist_edge_counts[j], + tmp_minors.begin() + offset, + [i] __device__(auto v) { + cuco::detail::MurmurHash3_32 hash_func{hash_seed}; + return (static_cast(hash_func(v) % num_bins) == i); + }); + offset += (*edge_count_vectors)[j][i]; + } else { + thrust::copy(handle.get_thrust_policy(), + edgelist_minors[j], + edgelist_minors[j] + edgelist_edge_counts[j], + tmp_minors.begin() + offset); + offset += edgelist_edge_counts[j]; + } + } thrust::sort(handle.get_thrust_policy(), tmp_minors.begin(), tmp_minors.end()); tmp_minors.resize( thrust::distance( @@ -431,30 +444,60 @@ std::tuple, std::vector, vertex_t> compu thrust::unique(handle.get_thrust_policy(), tmp_minors.begin(), tmp_minors.end())), handle.get_stream()); tmp_minors.shrink_to_fit(handle.get_stream()); - } - if (i == 0) { - sorted_unique_minors = std::move(tmp_minors); - } else { - rmm::device_uvector merged_minors(sorted_unique_minors.size() + tmp_minors.size(), - handle.get_stream()); - thrust::merge(handle.get_thrust_policy(), - sorted_unique_minors.begin(), - sorted_unique_minors.end(), - tmp_minors.begin(), - tmp_minors.end(), - merged_minors.begin()); - sorted_unique_minors = std::move(merged_minors); - tmp_minors.resize(0, handle.get_stream()); - tmp_minors.shrink_to_fit(handle.get_stream()); - sorted_unique_minors.resize(thrust::distance(sorted_unique_minors.begin(), - thrust::unique(handle.get_thrust_policy(), - sorted_unique_minors.begin(), - sorted_unique_minors.end())), - handle.get_stream()); - sorted_unique_minors.shrink_to_fit(handle.get_stream()); + if constexpr (multi_gpu) { + auto& comm = handle.get_comms(); + auto const comm_size = comm.get_size(); + auto& major_comm = handle.get_subcomm(cugraph::partition_manager::major_comm_name()); + auto const major_comm_size = major_comm.get_size(); + auto& minor_comm = handle.get_subcomm(cugraph::partition_manager::minor_comm_name()); + auto const minor_comm_size = minor_comm.get_size(); + + compute_gpu_id_from_ext_vertex_t gpu_id_func{ + comm_size, major_comm_size, minor_comm_size}; + std::tie(tmp_minors, std::ignore) = groupby_gpu_id_and_shuffle_values( + major_comm, + tmp_minors.begin(), + tmp_minors.end(), + [major_comm_size, minor_comm_size, gpu_id_func] __device__(auto v) { + auto comm_rank = gpu_id_func(v); + return partition_manager::compute_major_comm_rank_from_global_comm_rank( + major_comm_size, minor_comm_size, comm_rank); + }, + handle.get_stream()); + thrust::sort(handle.get_thrust_policy(), tmp_minors.begin(), tmp_minors.end()); + tmp_minors.resize( + thrust::distance( + tmp_minors.begin(), + thrust::unique(handle.get_thrust_policy(), tmp_minors.begin(), tmp_minors.end())), + handle.get_stream()); + tmp_minors.shrink_to_fit(handle.get_stream()); + } + + if (i == 0) { + sorted_unique_minors = std::move(tmp_minors); + } else { + rmm::device_uvector merged_minors( + sorted_unique_minors.size() + tmp_minors.size(), handle.get_stream()); + thrust::merge(handle.get_thrust_policy(), + sorted_unique_minors.begin(), + sorted_unique_minors.end(), + tmp_minors.begin(), + tmp_minors.end(), + merged_minors.begin()); + sorted_unique_minors = std::move(merged_minors); + tmp_minors.resize(0, handle.get_stream()); + tmp_minors.shrink_to_fit(handle.get_stream()); + sorted_unique_minors.resize(thrust::distance(sorted_unique_minors.begin(), + thrust::unique(handle.get_thrust_policy(), + sorted_unique_minors.begin(), + sorted_unique_minors.end())), + handle.get_stream()); + sorted_unique_minors.shrink_to_fit(handle.get_stream()); + } } } + sorted_local_vertices.resize(sorted_unique_majors.size() + sorted_unique_minors.size(), handle.get_stream()); thrust::merge(handle.get_thrust_policy(), @@ -492,14 +535,12 @@ std::tuple, std::vector, vertex_t> compu // 3. compute global degrees for the sorted local vertices rmm::device_uvector sorted_local_vertex_degrees(0, handle.get_stream()); - std::optional> stream_pool_indices{ - std::nullopt}; // FIXME: move this inside the if statement auto constexpr num_chunks = size_t{ - 2}; // tuning parameter, this trade-offs # binary searches (up to num_chunks times more binary - // searches can be necessary if num_unique_majors << edgelist_edge_counts[i]) and temporary - // buffer requirement (cut by num_chunks times), currently set to 2 to avoid peak memory - // usage happening in this part (especially when minor_comm_size is small) + 2}; // tuning parameter, this trade-offs # binary searches (up to num_chunks times more + // binary searches can be necessary if num_unique_majors << edgelist_edge_counts[i]) and + // temporary buffer requirement (cut by num_chunks times), currently set to 2 to avoid + // peak memory usage happening in this part (especially when minor_comm_size is small) if constexpr (multi_gpu) { auto& comm = handle.get_comms(); @@ -512,6 +553,8 @@ std::tuple, std::vector, vertex_t> compu auto edge_partition_major_range_sizes = host_scalar_allgather(minor_comm, sorted_local_vertices.size(), handle.get_stream()); + std::optional> stream_pool_indices{std::nullopt}; + if ((minor_comm_size >= 2) && (handle.get_stream_pool_size() >= 2)) { edge_t num_local_edges = std::reduce(edgelist_edge_counts.begin(), edgelist_edge_counts.end()); @@ -664,7 +707,7 @@ std::tuple, std::vector, vertex_t> compu } } - // 4. sort local vertices by degree (descending) + // 5. sort local vertices by degree (descending) thrust::sort_by_key(handle.get_thrust_policy(), sorted_local_vertex_degrees.begin(), @@ -672,7 +715,7 @@ std::tuple, std::vector, vertex_t> compu sorted_local_vertices.begin(), thrust::greater()); - // 5. compute segment_offsets + // 6. compute segment_offsets static_assert(detail::num_sparse_segments_per_vertex_partition == 3); static_assert((detail::low_degree_threshold <= detail::mid_degree_threshold) &&