diff --git a/cpp/src/structure/renumber_edgelist_impl.cuh b/cpp/src/structure/renumber_edgelist_impl.cuh index 27bf9c4fcdf..e4958e65d94 100644 --- a/cpp/src/structure/renumber_edgelist_impl.cuh +++ b/cpp/src/structure/renumber_edgelist_impl.cuh @@ -262,166 +262,113 @@ std::tuple, std::vector, vertex_t> compu 1; // shouldn't be 0 (in that case this hash function will coincide with the hash function // used to map vertices to GPUs, and we may not see the expected randomization) - rmm::device_uvector sorted_unique_majors(0, handle.get_stream()); - { - auto edge_count_vectors = num_bins > 1 - ? std::make_optional>>( - edgelist_majors.size(), std::vector(num_bins)) - : std::nullopt; - if (edge_count_vectors) { - for (size_t i = 0; i < edgelist_majors.size(); ++i) { - rmm::device_uvector d_edge_counts(num_bins, handle.get_stream()); - thrust::fill( - handle.get_thrust_policy(), d_edge_counts.begin(), d_edge_counts.end(), edge_t{0}); - thrust::for_each(handle.get_thrust_policy(), - edgelist_majors[i], - edgelist_majors[i] + edgelist_edge_counts[i], - [counts = raft::device_span( - d_edge_counts.data(), d_edge_counts.size())] __device__(auto v) { - cuco::detail::MurmurHash3_32 hash_func{hash_seed}; - cuda::atomic_ref atomic_counter( - counts[hash_func(v) % num_bins]); - atomic_counter.fetch_add(edge_t{1}, cuda::std::memory_order_relaxed); - }); - raft::update_host((*edge_count_vectors)[i].data(), - d_edge_counts.data(), - d_edge_counts.size(), - handle.get_stream()); - } - handle.sync_stream(); + auto edge_major_count_vectors = num_bins > 1 + ? 
std::make_optional>>( + edgelist_majors.size(), std::vector(num_bins)) + : std::nullopt; + if (edge_major_count_vectors) { + for (size_t i = 0; i < edgelist_majors.size(); ++i) { + rmm::device_uvector d_edge_major_counts(num_bins, handle.get_stream()); + thrust::fill(handle.get_thrust_policy(), + d_edge_major_counts.begin(), + d_edge_major_counts.end(), + edge_t{0}); + thrust::for_each( + handle.get_thrust_policy(), + edgelist_majors[i], + edgelist_majors[i] + edgelist_edge_counts[i], + [counts = raft::device_span(d_edge_major_counts.data(), + d_edge_major_counts.size())] __device__(auto v) { + cuco::detail::MurmurHash3_32 hash_func{hash_seed}; + cuda::atomic_ref atomic_counter( + counts[hash_func(v) % num_bins]); + atomic_counter.fetch_add(edge_t{1}, cuda::std::memory_order_relaxed); + }); + raft::update_host((*edge_major_count_vectors)[i].data(), + d_edge_major_counts.data(), + d_edge_major_counts.size(), + handle.get_stream()); + } + } + + auto edge_minor_count_vectors = num_bins > 1 + ? 
std::make_optional>>( + edgelist_minors.size(), std::vector(num_bins)) + : std::nullopt; + if (edge_minor_count_vectors) { + for (size_t i = 0; i < edgelist_minors.size(); ++i) { + rmm::device_uvector d_edge_minor_counts(num_bins, handle.get_stream()); + thrust::fill(handle.get_thrust_policy(), + d_edge_minor_counts.begin(), + d_edge_minor_counts.end(), + edge_t{0}); + thrust::for_each( + handle.get_thrust_policy(), + edgelist_minors[i], + edgelist_minors[i] + edgelist_edge_counts[i], + [counts = raft::device_span(d_edge_minor_counts.data(), + d_edge_minor_counts.size())] __device__(auto v) { + cuco::detail::MurmurHash3_32 hash_func{hash_seed}; + cuda::atomic_ref atomic_counter( + counts[hash_func(v) % num_bins]); + atomic_counter.fetch_add(edge_t{1}, cuda::std::memory_order_relaxed); + }); + raft::update_host((*edge_minor_count_vectors)[i].data(), + d_edge_minor_counts.data(), + d_edge_minor_counts.size(), + handle.get_stream()); } + } - for (size_t i = 0; i < num_bins; ++i) { - std::vector> - edge_partition_sorted_unique_majors{}; // for bin "i" - edge_partition_sorted_unique_majors.reserve(edgelist_majors.size()); + handle.sync_stream(); + + for (size_t i = 0; i < num_bins; ++i) { + std::vector> edge_partition_tmp_majors{}; // for bin "i" + { + edge_partition_tmp_majors.reserve(edgelist_majors.size()); for (size_t j = 0; j < edgelist_majors.size(); ++j) { - rmm::device_uvector majors(0, handle.get_stream()); + rmm::device_uvector tmp_majors(0, handle.get_stream()); if (num_bins > 1) { - majors.resize((*edge_count_vectors)[j][i], handle.get_stream()); + tmp_majors.resize((*edge_major_count_vectors)[j][i], handle.get_stream()); thrust::copy_if(handle.get_thrust_policy(), edgelist_majors[j], edgelist_majors[j] + edgelist_edge_counts[j], - majors.begin(), + tmp_majors.begin(), [i] __device__(auto v) { cuco::detail::MurmurHash3_32 hash_func{hash_seed}; return (static_cast(hash_func(v) % num_bins) == i); }); } else { - majors.resize(edgelist_edge_counts[j], 
handle.get_stream()); + tmp_majors.resize(edgelist_edge_counts[j], handle.get_stream()); thrust::copy(handle.get_thrust_policy(), edgelist_majors[j], edgelist_majors[j] + edgelist_edge_counts[j], - majors.begin()); + tmp_majors.begin()); } - thrust::sort(handle.get_thrust_policy(), majors.begin(), majors.end()); - majors.resize(thrust::distance( - majors.begin(), - thrust::unique(handle.get_thrust_policy(), majors.begin(), majors.end())), - handle.get_stream()); - majors.shrink_to_fit(handle.get_stream()); - - edge_partition_sorted_unique_majors.push_back(std::move(majors)); - } - - rmm::device_uvector tmp_majors(0, handle.get_stream()); - if constexpr (multi_gpu) { - auto& minor_comm = handle.get_subcomm(cugraph::partition_manager::minor_comm_name()); - auto const minor_comm_size = minor_comm.get_size(); - assert(static_cast(minor_comm_size) == - edge_partition_sorted_unique_majors.size()); - - if (minor_comm_size > 1) { - std::vector tx_counts(minor_comm_size); - for (size_t j = 0; j < edge_partition_sorted_unique_majors.size(); ++j) { - tx_counts[j] = edge_partition_sorted_unique_majors[j].size(); - } - tmp_majors.resize(std::reduce(tx_counts.begin(), tx_counts.end()), handle.get_stream()); - size_t offset{0}; - for (size_t j = 0; j < edge_partition_sorted_unique_majors.size(); ++j) { - thrust::copy(handle.get_thrust_policy(), - edge_partition_sorted_unique_majors[j].begin(), - edge_partition_sorted_unique_majors[j].end(), - tmp_majors.begin() + offset); - offset += tx_counts[j]; - } - edge_partition_sorted_unique_majors.clear(); - - std::tie(tmp_majors, std::ignore) = - shuffle_values(minor_comm, tmp_majors.begin(), tx_counts, handle.get_stream()); - - thrust::sort(handle.get_thrust_policy(), tmp_majors.begin(), tmp_majors.end()); - tmp_majors.resize( - thrust::distance( - tmp_majors.begin(), - thrust::unique(handle.get_thrust_policy(), tmp_majors.begin(), tmp_majors.end())), - handle.get_stream()); - tmp_majors.shrink_to_fit(handle.get_stream()); - } else { - 
assert(edge_partition_sorted_unique_majors.size() == 1); - tmp_majors = std::move(edge_partition_sorted_unique_majors[0]); - } - } else { - assert(edge_partition_sorted_unique_majors.size() == 1); - tmp_majors = std::move(edge_partition_sorted_unique_majors[0]); - } - - if (i == 0) { - sorted_unique_majors = std::move(tmp_majors); - } else { - rmm::device_uvector merged_majors( - sorted_unique_majors.size() + tmp_majors.size(), handle.get_stream()); - thrust::merge(handle.get_thrust_policy(), - sorted_unique_majors.begin(), - sorted_unique_majors.end(), - tmp_majors.begin(), - tmp_majors.end(), - merged_majors.begin()); // merging two unique sets from different hash - // bins, so the merged set can't have duplicates - sorted_unique_majors = std::move(merged_majors); - } - } - } + thrust::sort(handle.get_thrust_policy(), tmp_majors.begin(), tmp_majors.end()); + tmp_majors.resize( + thrust::distance( + tmp_majors.begin(), + thrust::unique(handle.get_thrust_policy(), tmp_majors.begin(), tmp_majors.end())), + handle.get_stream()); + tmp_majors.shrink_to_fit(handle.get_stream()); - rmm::device_uvector sorted_unique_minors(0, handle.get_stream()); - { - auto edge_count_vectors = num_bins > 1 - ? 
std::make_optional>>( - edgelist_minors.size(), std::vector(num_bins)) - : std::nullopt; - if (edge_count_vectors) { - for (size_t i = 0; i < edgelist_minors.size(); ++i) { - rmm::device_uvector d_edge_counts(num_bins, handle.get_stream()); - thrust::fill( - handle.get_thrust_policy(), d_edge_counts.begin(), d_edge_counts.end(), edge_t{0}); - thrust::for_each(handle.get_thrust_policy(), - edgelist_minors[i], - edgelist_minors[i] + edgelist_edge_counts[i], - [counts = raft::device_span( - d_edge_counts.data(), d_edge_counts.size())] __device__(auto v) { - cuco::detail::MurmurHash3_32 hash_func{hash_seed}; - cuda::atomic_ref atomic_counter( - counts[hash_func(v) % num_bins]); - atomic_counter.fetch_add(edge_t{1}, cuda::std::memory_order_relaxed); - }); - raft::update_host((*edge_count_vectors)[i].data(), - d_edge_counts.data(), - d_edge_counts.size(), - handle.get_stream()); + edge_partition_tmp_majors.push_back(std::move(tmp_majors)); } - handle.sync_stream(); } - for (size_t i = 0; i < num_bins; ++i) { + rmm::device_uvector tmp_minors(0, handle.get_stream()); + { edge_t bin_size{0}; - if (edge_count_vectors) { + if (edge_minor_count_vectors) { for (size_t j = 0; j < edgelist_minors.size(); ++j) { - bin_size += (*edge_count_vectors)[j][i]; + bin_size += (*edge_minor_count_vectors)[j][i]; } } else { bin_size = std::reduce(edgelist_edge_counts.begin(), edgelist_edge_counts.end()); } - rmm::device_uvector tmp_minors(bin_size, handle.get_stream()); + tmp_minors.resize(bin_size, handle.get_stream()); + edge_t offset{0}; for (size_t j = 0; j < edgelist_minors.size(); ++j) { if (num_bins > 1) { @@ -433,7 +380,7 @@ std::tuple, std::vector, vertex_t> compu cuco::detail::MurmurHash3_32 hash_func{hash_seed}; return (static_cast(hash_func(v) % num_bins) == i); }); - offset += (*edge_count_vectors)[j][i]; + offset += (*edge_minor_count_vectors)[j][i]; } else { thrust::copy(handle.get_thrust_policy(), edgelist_minors[j], @@ -449,78 +396,126 @@ std::tuple, std::vector, vertex_t> 
compu thrust::unique(handle.get_thrust_policy(), tmp_minors.begin(), tmp_minors.end())), handle.get_stream()); tmp_minors.shrink_to_fit(handle.get_stream()); + } - if constexpr (multi_gpu) { - auto& comm = handle.get_comms(); - auto const comm_size = comm.get_size(); - auto& major_comm = handle.get_subcomm(cugraph::partition_manager::major_comm_name()); - auto const major_comm_size = major_comm.get_size(); - auto& minor_comm = handle.get_subcomm(cugraph::partition_manager::minor_comm_name()); - auto const minor_comm_size = minor_comm.get_size(); - - compute_gpu_id_from_ext_vertex_t gpu_id_func{ - comm_size, major_comm_size, minor_comm_size}; - std::tie(tmp_minors, std::ignore) = groupby_gpu_id_and_shuffle_values( - major_comm, - tmp_minors.begin(), - tmp_minors.end(), - [major_comm_size, minor_comm_size, gpu_id_func] __device__(auto v) { - auto comm_rank = gpu_id_func(v); - return partition_manager::compute_major_comm_rank_from_global_comm_rank( - major_comm_size, minor_comm_size, comm_rank); - }, - handle.get_stream()); - thrust::sort(handle.get_thrust_policy(), tmp_minors.begin(), tmp_minors.end()); - tmp_minors.resize( - thrust::distance( - tmp_minors.begin(), - thrust::unique(handle.get_thrust_policy(), tmp_minors.begin(), tmp_minors.end())), - handle.get_stream()); - tmp_minors.shrink_to_fit(handle.get_stream()); + rmm::device_uvector tmp_vertices(0, handle.get_stream()); + if (multi_gpu && (handle.get_comms().get_size() > 1)) { + auto& comm = handle.get_comms(); + auto const comm_size = comm.get_size(); + auto& major_comm = handle.get_subcomm(cugraph::partition_manager::major_comm_name()); + auto const major_comm_rank = major_comm.get_rank(); + auto const major_comm_size = major_comm.get_size(); + auto& minor_comm = handle.get_subcomm(cugraph::partition_manager::minor_comm_name()); + auto const minor_comm_rank = minor_comm.get_rank(); + auto const minor_comm_size = minor_comm.get_size(); + + compute_gpu_id_from_ext_vertex_t gpu_id_func{ + comm_size, 
major_comm_size, minor_comm_size}; + auto d_minor_counts = groupby_and_count( + tmp_minors.begin(), + tmp_minors.end(), + [major_comm_size, minor_comm_size, gpu_id_func] __device__(auto v) { + return partition_manager::compute_major_comm_rank_from_global_comm_rank( + major_comm_size, minor_comm_size, gpu_id_func(v)); + }, + major_comm_size, + std::numeric_limits::max(), + handle.get_stream()); + std::vector h_minor_counts(d_minor_counts.size()); + raft::update_host( + h_minor_counts.data(), d_minor_counts.data(), d_minor_counts.size(), handle.get_stream()); + handle.sync_stream(); + std::vector h_minor_displacements(h_minor_counts.size()); + std::exclusive_scan( + h_minor_counts.begin(), h_minor_counts.end(), h_minor_displacements.begin(), size_t{0}); + + std::vector tx_counts(comm_size, 0); + for (size_t j = 0; j < edge_partition_tmp_majors.size(); ++j) { + auto idx = partition_manager::compute_global_comm_rank_from_graph_subcomm_ranks( + major_comm_size, minor_comm_size, major_comm_rank, j); + tx_counts[idx] = edge_partition_tmp_majors[j].size(); } - - if (i == 0) { - sorted_unique_minors = std::move(tmp_minors); - } else { - rmm::device_uvector merged_minors( - sorted_unique_minors.size() + tmp_minors.size(), handle.get_stream()); - thrust::merge(handle.get_thrust_policy(), - sorted_unique_minors.begin(), - sorted_unique_minors.end(), - tmp_minors.begin(), - tmp_minors.end(), - merged_minors.begin()); - sorted_unique_minors = std::move(merged_minors); - tmp_minors.resize(0, handle.get_stream()); - tmp_minors.shrink_to_fit(handle.get_stream()); - sorted_unique_minors.resize(thrust::distance(sorted_unique_minors.begin(), - thrust::unique(handle.get_thrust_policy(), - sorted_unique_minors.begin(), - sorted_unique_minors.end())), - handle.get_stream()); - sorted_unique_minors.shrink_to_fit(handle.get_stream()); + for (size_t j = 0; j < h_minor_counts.size(); ++j) { + auto idx = partition_manager::compute_global_comm_rank_from_graph_subcomm_ranks( + major_comm_size, 
minor_comm_size, j, minor_comm_rank); + tx_counts[idx] += h_minor_counts[j]; } + std::vector tx_displacements(comm_size); + std::exclusive_scan( + tx_counts.begin(), tx_counts.end(), tx_displacements.begin(), size_t{0}); + tmp_vertices.resize(tx_displacements.back() + tx_counts.back(), handle.get_stream()); + for (size_t j = 0; j < edge_partition_tmp_majors.size(); ++j) { + auto idx = partition_manager::compute_global_comm_rank_from_graph_subcomm_ranks( + major_comm_size, minor_comm_size, major_comm_rank, j); + thrust::copy(handle.get_thrust_policy(), + edge_partition_tmp_majors[j].begin(), + edge_partition_tmp_majors[j].end(), + tmp_vertices.begin() + tx_displacements[idx]); + edge_partition_tmp_majors[j].resize(0, handle.get_stream()); + edge_partition_tmp_majors[j].shrink_to_fit(handle.get_stream()); + } + for (size_t j = 0; j < h_minor_counts.size(); ++j) { + auto idx = partition_manager::compute_global_comm_rank_from_graph_subcomm_ranks( + major_comm_size, minor_comm_size, j, minor_comm_rank); + thrust::copy( + handle.get_thrust_policy(), + tmp_minors.begin() + h_minor_displacements[j], + tmp_minors.begin() + (h_minor_displacements[j] + h_minor_counts[j]), + tmp_vertices.begin() + tx_displacements[idx] + (tx_counts[idx] - h_minor_counts[j])); + } + tmp_minors.resize(0, handle.get_stream()); + tmp_minors.shrink_to_fit(handle.get_stream()); + + // single shuffle_values() on comm instead of one shuffle_values() on minor_comm & one + // shuffle_values() on majro_comm (to cut NCCL P2P buffer size) + std::tie(tmp_vertices, std::ignore) = + shuffle_values(comm, tmp_vertices.begin(), tx_counts, handle.get_stream()); + thrust::sort(handle.get_thrust_policy(), tmp_vertices.begin(), tmp_vertices.end()); + tmp_vertices.resize( + thrust::distance( + tmp_vertices.begin(), + thrust::unique(handle.get_thrust_policy(), tmp_vertices.begin(), tmp_vertices.end())), + handle.get_stream()); + tmp_vertices.shrink_to_fit(handle.get_stream()); + } else { + 
assert(edge_partition_tmp_majors.size() == 1); + auto& tmp_majors = edge_partition_tmp_majors[0]; + rmm::device_uvector merged_vertices(tmp_majors.size() + tmp_minors.size(), + handle.get_stream()); + thrust::merge(handle.get_thrust_policy(), + tmp_majors.begin(), + tmp_majors.end(), + tmp_minors.begin(), + tmp_minors.end(), + merged_vertices.begin()); + tmp_majors.resize(0, handle.get_stream()); + tmp_majors.shrink_to_fit(handle.get_stream()); + tmp_minors.resize(0, handle.get_stream()); + tmp_minors.shrink_to_fit(handle.get_stream()); + merged_vertices.resize(thrust::distance(merged_vertices.begin(), + thrust::unique(handle.get_thrust_policy(), + merged_vertices.begin(), + merged_vertices.end())), + handle.get_stream()); + merged_vertices.shrink_to_fit(handle.get_stream()); + tmp_vertices = std::move(merged_vertices); } - } - sorted_local_vertices.resize(sorted_unique_majors.size() + sorted_unique_minors.size(), - handle.get_stream()); - thrust::merge(handle.get_thrust_policy(), - sorted_unique_majors.begin(), - sorted_unique_majors.end(), - sorted_unique_minors.begin(), - sorted_unique_minors.end(), - sorted_local_vertices.begin()); - sorted_unique_majors.resize(0, handle.get_stream()); - sorted_unique_majors.shrink_to_fit(handle.get_stream()); - sorted_unique_minors.resize(0, handle.get_stream()); - sorted_unique_minors.shrink_to_fit(handle.get_stream()); - sorted_local_vertices.resize(thrust::distance(sorted_local_vertices.begin(), - thrust::unique(handle.get_thrust_policy(), - sorted_local_vertices.begin(), - sorted_local_vertices.end())), - handle.get_stream()); - sorted_local_vertices.shrink_to_fit(handle.get_stream()); + if (sorted_local_vertices.size() == 0) { + sorted_local_vertices = std::move(tmp_vertices); + } else { + rmm::device_uvector merged_vertices( + sorted_local_vertices.size() + tmp_vertices.size(), handle.get_stream()); + thrust::merge(handle.get_thrust_policy(), + sorted_local_vertices.begin(), + sorted_local_vertices.end(), + 
tmp_vertices.begin(), + tmp_vertices.end(), + merged_vertices.begin()); // merging two unique sets from different hash + // bins, so the merged set can't have duplicates + sorted_local_vertices = std::move(merged_vertices); + } + } } else { sorted_local_vertices = std::move(*local_vertices); thrust::sort(