Use device_allgatherv instead of P2P to distribute extra numbers
Naim committed Dec 8, 2023
1 parent 867093c commit d0b37dc
Showing 1 changed file with 55 additions and 35 deletions.
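
In outline, the change works as follows: after the global shuffle, each GPU may hold more or fewer cluster IDs than its local vertex partition size. Instead of pairing up surplus and deficit GPUs with point-to-point sends, every GPU now contributes its surplus entries to a device_allgatherv, allgathers the per-rank deficit counts, and uses an exclusive scan over those counts to locate its own slice of the gathered extras. The sketch below simulates that balancing step on the host; plain std::vector stands in for rmm::device_uvector, ordinary loops stand in for the collectives, and the example sizes are made up.

// Host-only simulation of the balancing step in this commit (assumption:
// std::vector replaces rmm::device_uvector; loops replace the collectives).
#include <cassert>
#include <numeric>
#include <vector>

int main()
{
  // Buffers after the shuffle (sizes 5, 2, 2) vs. local vertex partition
  // sizes (targets 3, 3, 3) on a hypothetical 3-GPU communicator.
  std::vector<std::vector<int>> buf = {{0, 1, 2, 3, 4}, {5, 6}, {7, 8}};
  std::vector<long> target{3, 3, 3};
  int const comm_size = 3;

  // "device_allgatherv" of every rank's surplus entries, plus a
  // "host_scalar_allgather" of every rank's deficit count.
  std::vector<int> extras;
  std::vector<long> deficits(comm_size, 0);
  for (int r = 0; r < comm_size; ++r) {
    long nr_extras = static_cast<long>(buf[r].size()) - target[r];
    if (nr_extras > 0) { extras.insert(extras.end(), buf[r].end() - nr_extras, buf[r].end()); }
    if (nr_extras < 0) { deficits[r] = -nr_extras; }
  }

  // As in the diff: an exclusive scan turns the deficit counts into each
  // rank's read offset into the gathered extras.
  std::vector<long> offsets(comm_size, 0);
  std::exclusive_scan(deficits.begin(), deficits.end(), offsets.begin(), long{0});

  for (int r = 0; r < comm_size; ++r) {
    buf[r].resize(target[r]);                  // surplus ranks drop what they gathered
    for (long i = 0; i < deficits[r]; ++i) {   // stand-in for raft::copy
      buf[r][target[r] - deficits[r] + i] = extras[offsets[r] + i];
    }
    assert(static_cast<long>(buf[r].size()) == target[r]);
  }
  return 0;
}

Because every rank scans the same deficit vector, each deficit rank reads a disjoint slice of the gathered extras, and the total element count is preserved.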
90 changes: 55 additions & 35 deletions cpp/src/community/louvain_impl.cuh
@@ -22,6 +22,7 @@
#include <community/flatten_dendrogram.hpp>
#include <prims/update_edge_src_dst_property.cuh>
// FIXME: Only outstanding items preventing this becoming a .hpp file
+#include <cugraph/detail/collect_comm_wrapper.hpp>
#include <cugraph/detail/shuffle_wrappers.hpp>
#include <cugraph/detail/utility_wrappers.hpp>
#include <cugraph/graph.hpp>
@@ -110,17 +111,37 @@ std::pair<std::unique_ptr<Dendrogram<vertex_t>>, weight_t> louvain(
random_numbers.end(),
random_cluster_assignments.begin());

-    // evenly distributed Shuffled/permuted numbers to other GPUs
+    // distribute shuffled/permuted numbers to other GPUs
auto& comm = handle.get_comms();
auto const comm_size = comm.get_size();
auto const comm_rank = comm.get_rank();
-    std::vector<size_t> tx_value_counts(comm_size, 0);
-
-    for (int i = 0; i < comm_size; i++) {
-      tx_value_counts[i] = random_cluster_assignments.size() / comm_size + i <
-                                   (random_cluster_assignments.size() % comm_size)
-                               ? 1
-                               : 0;
-    }
+    std::vector<size_t> tx_value_counts(comm_size);
+    std::fill(tx_value_counts.begin(),
+              tx_value_counts.end(),
+              random_cluster_assignments.size() / comm_size);

+    std::vector<vertex_t> h_random_numbers;
+    {
+      rmm::device_uvector<vertex_t> d_random_numbers(
+        random_cluster_assignments.size() % comm_size, handle.get_stream());
+      cugraph::detail::uniform_random_fill(handle.get_stream(),
+                                           d_random_numbers.data(),
+                                           d_random_numbers.size(),
+                                           vertex_t{0},
+                                           vertex_t{comm_size},
+                                           *rng_state);
+
+      h_random_numbers.resize(d_random_numbers.size());
+
+      raft::update_host(h_random_numbers.data(),
+                        d_random_numbers.data(),
+                        d_random_numbers.size(),
+                        handle.get_stream());
+    }
+
+    for (int i = 0; i < static_cast<int>(random_cluster_assignments.size() % comm_size); i++) {
+      tx_value_counts[h_random_numbers[i]]++;
+    }

std::tie(random_cluster_assignments, std::ignore) =
@@ -129,7 +150,7 @@ std::pair<std::unique_ptr<Dendrogram<vertex_t>>, weight_t> louvain(
tx_value_counts,
handle.get_stream());

-    // shuffle/permute locally
+    // shuffle/permute locally again
random_numbers.resize(random_cluster_assignments.size(), handle.get_stream());

cugraph::detail::uniform_random_fill(handle.get_stream(),
@@ -145,34 +166,33 @@ std::pair<std::unique_ptr<Dendrogram<vertex_t>>, weight_t> louvain(

// find out how many elements current GPU needs to send to other GPUs

-    std::fill(tx_value_counts.begin(), tx_value_counts.end(), 0);
-    auto sample_buffer_sizes = cugraph::host_scalar_allgather(
-      handle.get_comms(), random_cluster_assignments.size(), handle.get_stream());
-
-    auto expected_sample_buffer_sizes =
-      cugraph::host_scalar_allgather(handle.get_comms(),
-                                     current_graph_view.local_vertex_partition_range_size(),
-                                     handle.get_stream());
-
-    std::vector<size_t> nr_smaples_per_GPU(comm_size, 0);
-    for (int i = 0; i < comm_size; i++) {
-      size_t nr_samples_ith_gpu = sample_buffer_sizes[i];
-      for (int j = 0; nr_samples_ith_gpu > 0 && j < comm_size; j++) {
-        if (expected_sample_buffer_sizes[j] > static_cast<vertex_t>(nr_smaples_per_GPU[j])) {
-          size_t delta = std::min(nr_samples_ith_gpu,
-                                  expected_sample_buffer_sizes[j] - nr_smaples_per_GPU[j]);
-          if (comm_rank == i) { tx_value_counts[j] = delta; }
-          nr_smaples_per_GPU[j] += delta;
-          nr_samples_ith_gpu -= delta;
-        }
-      }
-    }
+    vertex_t nr_extras = static_cast<vertex_t>(random_cluster_assignments.size()) -
+                         current_graph_view.local_vertex_partition_range_size();
+    vertex_t nr_deficits = -nr_extras;
+
+    auto extra_cluster_ids = cugraph::detail::device_allgatherv(
+      handle,
+      comm,
+      raft::device_span<vertex_t const>(
+        random_cluster_assignments.data() +
+          current_graph_view.local_vertex_partition_range_size(),
+        nr_extras > 0 ? nr_extras : 0));
+
+    random_cluster_assignments.resize(current_graph_view.local_vertex_partition_range_size(),
+                                      handle.get_stream());
+
+    auto deficits = cugraph::host_scalar_allgather(
+      handle.get_comms(), nr_deficits > 0 ? nr_deficits : 0, handle.get_stream());
+
+    std::exclusive_scan(deficits.begin(), deficits.end(), deficits.begin(), vertex_t{0});
+
+    if (nr_deficits > 0) {
+      raft::copy(random_cluster_assignments.data() +
+                   current_graph_view.local_vertex_partition_range_size() - nr_deficits,
+                 extra_cluster_ids.begin() + deficits[comm_rank],
+                 nr_deficits,
+                 handle.get_stream());
+    }
-    // shuffle as many vertices as local vertex partition size to each GPU.
-    std::tie(random_cluster_assignments, std::ignore) =
-      cugraph::shuffle_values(handle.get_comms(),
-                              random_cluster_assignments.begin(),
-                              tx_value_counts,
-                              handle.get_stream());
}

assert(random_cluster_assignments.size() ==
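
One more detail from the first code hunk above: the old loop deterministically gave each of the first size() % comm_size ranks one extra value, while the new code sends each leftover value to a randomly chosen rank before calling shuffle_values. Below is a minimal host-side sketch of that counting logic, assuming std::mt19937 in place of the device-side cugraph::detail::uniform_random_fill; make_tx_value_counts is a hypothetical helper name for illustration, not a cugraph function.

// Sketch of the new tx_value_counts computation (assumption: std::mt19937
// replaces the device RNG; the helper name is invented for this example).
#include <cstddef>
#include <numeric>
#include <random>
#include <vector>

std::vector<std::size_t> make_tx_value_counts(std::size_t n, int comm_size, std::mt19937& rng)
{
  // Every rank gets n / comm_size values...
  std::vector<std::size_t> tx_value_counts(comm_size, n / comm_size);
  // ...and each of the n % comm_size leftovers goes to a random rank.
  std::uniform_int_distribution<int> pick(0, comm_size - 1);
  for (std::size_t i = 0; i < n % comm_size; ++i) {
    ++tx_value_counts[pick(rng)];
  }
  return tx_value_counts;
}

int main()
{
  std::mt19937 rng{42};
  auto counts = make_tx_value_counts(10, 4, rng);  // two ranks get 3 values, two get 2
  // Invariant the shuffle_values call relies on: the counts sum to n.
  return std::accumulate(counts.begin(), counts.end(), std::size_t{0}) == 10 ? 0 : 1;
}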
