update renumbering to use binning in more places
seunghwak committed Sep 3, 2024
1 parent ce4ea93 commit 9445027
Showing 1 changed file with 168 additions and 125 deletions.
293 changes: 168 additions & 125 deletions cpp/src/structure/renumber_edgelist_impl.cuh
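Note on the change: the renumbering path collects the set of unique vertices appearing in the local edge list before assigning new IDs. Before this commit, hash binning was applied only to edge-list majors, and only when there was more than one edge partition; minors (and the single-partition major case, flagged with a FIXME) were copied and sorted wholesale, so peak temporary memory tracked the full edge-partition size. This commit applies the same binning everywhere: vertices are assigned to one of num_bins bins via cuco::detail::MurmurHash3_32 (seeded differently from the vertex-to-GPU hash), and each bin is extracted, sorted, deduplicated, and merged separately, bounding temporary memory to roughly the edge-list size divided by num_bins. A minimal single-GPU sketch of the pattern follows (illustrative only; it uses thrust::device_vector and a stand-in mixing hash rather than rmm::device_uvector and cuco's MurmurHash3_32):

// --- illustrative sketch (not part of the diff) ---
// Binned unique-vertex extraction on a single GPU. Stand-ins: thrust::device_vector
// for rmm::device_uvector, a simple mixing hash for cuco::detail::MurmurHash3_32.
// Compile with: nvcc -std=c++17 --extended-lambda bin_unique.cu
#include <thrust/copy.h>
#include <thrust/count.h>
#include <thrust/device_vector.h>
#include <thrust/merge.h>
#include <thrust/sort.h>
#include <thrust/unique.h>
#include <cstdint>
#include <cstdio>
#include <utility>
#include <vector>

__host__ __device__ uint32_t mix_hash(uint32_t v, uint32_t seed)
{
  v ^= seed;
  v *= 0x85ebca6bu;  // MurmurHash3 finalizer constants
  v ^= v >> 13;
  v *= 0xc2b2ae35u;
  return v ^ (v >> 16);
}

int main()
{
  constexpr size_t num_bins    = 8;  // more bins => lower peak temporary memory, more passes
  constexpr uint32_t hash_seed = 1;  // non-zero, to decorrelate from the vertex->GPU hash
  std::vector<uint32_t> h_majors{5, 3, 5, 9, 3, 7, 9, 1, 7, 5};
  thrust::device_vector<uint32_t> edge_majors(h_majors.begin(), h_majors.end());

  thrust::device_vector<uint32_t> sorted_unique;  // accumulated across bins
  for (size_t i = 0; i < num_bins; ++i) {
    auto in_bin = [i] __device__(uint32_t v) {
      return static_cast<size_t>(mix_hash(v, hash_seed) % num_bins) == i;
    };
    // materialize only this bin's majors: peak temporary memory ~ |E| / num_bins
    size_t bin_size = thrust::count_if(edge_majors.begin(), edge_majors.end(), in_bin);
    thrust::device_vector<uint32_t> bin(bin_size);
    thrust::copy_if(edge_majors.begin(), edge_majors.end(), bin.begin(), in_bin);
    thrust::sort(bin.begin(), bin.end());
    bin.erase(thrust::unique(bin.begin(), bin.end()), bin.end());
    // sorted unique sets from different bins are disjoint => the merge stays unique
    thrust::device_vector<uint32_t> merged(sorted_unique.size() + bin.size());
    thrust::merge(sorted_unique.begin(), sorted_unique.end(),
                  bin.begin(), bin.end(), merged.begin());
    sorted_unique = std::move(merged);
  }
  for (size_t i = 0; i < sorted_unique.size(); ++i) {
    std::printf("%u ", static_cast<uint32_t>(sorted_unique[i]));  // prints: 1 3 5 7 9
  }
  std::printf("\n");
}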
@@ -249,22 +249,16 @@ std::tuple<rmm::device_uvector<vertex_t>, std::vector<vertex_t>, vertex_t> compu

rmm::device_uvector<vertex_t> sorted_local_vertices(0, handle.get_stream());
if (!local_vertices) {
rmm::device_uvector<vertex_t> sorted_unique_majors(0, handle.get_stream());
if (edgelist_majors.size() > 1) {
constexpr size_t num_bins{
8}; // increase the number of bins to cut peak memory usage (at the expense of additional
// computing), limit the maximum temporary memory usage to "size of local edge list
// majors & minors" / "# bins".
constexpr uint32_t hash_seed =
1; // shouldn't be 0 (in that case this hash function will coincide with the hash function
// used to map vertices to GPUs, and we may not see the expected randomization)

assert(multi_gpu);
auto& minor_comm = handle.get_subcomm(cugraph::partition_manager::minor_comm_name());
auto const minor_comm_size = minor_comm.get_size();

assert(static_cast<size_t>(minor_comm_size) == edgelist_majors.size());
constexpr size_t num_bins{
8}; // increase the number of bins to cut peak memory usage (at the expense of additional
// computing), limit the maximum temporary memory usage to "size of local edge list
// majors|minors * 2 / # bins"
constexpr uint32_t hash_seed =
1; // shouldn't be 0 (in that case this hash function will coincide with the hash function
// used to map vertices to GPUs, and we may not see the expected randomization)

rmm::device_uvector<vertex_t> sorted_unique_majors(0, handle.get_stream());
{
auto edge_count_vectors = num_bins > 1
? std::make_optional<std::vector<std::vector<edge_t>>>(
edgelist_majors.size(), std::vector<edge_t>(num_bins))
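To put the num_bins comment above in concrete terms (illustrative numbers, not taken from this PR): with 2^31 local edge-list entries and 8-byte vertex IDs, the majors array alone occupies about 17 GB; with num_bins = 8, each pass materializes roughly 17 GB / 8 ≈ 2 GB of majors at a time, in exchange for rescanning the edge list once per bin.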
@@ -296,7 +290,6 @@ std::tuple<rmm::device_uvector<vertex_t>, std::vector<vertex_t>, vertex_t> compu
std::vector<rmm::device_uvector<vertex_t>>
edge_partition_sorted_unique_majors{}; // for bin "i"
edge_partition_sorted_unique_majors.reserve(edgelist_majors.size());
std::vector<size_t> tx_counts(minor_comm_size);
for (size_t j = 0; j < edgelist_majors.size(); ++j) {
rmm::device_uvector<vertex_t> majors(0, handle.get_stream());
if (num_bins > 1) {
@@ -323,32 +316,51 @@ std::tuple<rmm::device_uvector<vertex_t>, std::vector<vertex_t>, vertex_t> compu
handle.get_stream());
majors.shrink_to_fit(handle.get_stream());

tx_counts[j] = majors.size();
edge_partition_sorted_unique_majors.push_back(std::move(majors));
}

rmm::device_uvector<vertex_t> tmp_majors(std::reduce(tx_counts.begin(), tx_counts.end()),
handle.get_stream());
size_t offset{0};
for (size_t j = 0; j < edge_partition_sorted_unique_majors.size(); ++j) {
thrust::copy(handle.get_thrust_policy(),
edge_partition_sorted_unique_majors[j].begin(),
edge_partition_sorted_unique_majors[j].end(),
tmp_majors.begin() + offset);
offset += edge_partition_sorted_unique_majors[j].size();
rmm::device_uvector<vertex_t> tmp_majors(0, handle.get_stream());
if constexpr (multi_gpu) {
auto& minor_comm = handle.get_subcomm(cugraph::partition_manager::minor_comm_name());
auto const minor_comm_size = minor_comm.get_size();
assert(static_cast<size_t>(minor_comm_size) ==
edge_partition_sorted_unique_majors.size());

if (minor_comm_size > 1) {
std::vector<size_t> tx_counts(minor_comm_size);
for (size_t j = 0; j < edge_partition_sorted_unique_majors.size(); ++j) {
tx_counts[j] = edge_partition_sorted_unique_majors[j].size();
}
tmp_majors.resize(std::reduce(tx_counts.begin(), tx_counts.end()), handle.get_stream());
size_t offset{0};
for (size_t j = 0; j < edge_partition_sorted_unique_majors.size(); ++j) {
thrust::copy(handle.get_thrust_policy(),
edge_partition_sorted_unique_majors[j].begin(),
edge_partition_sorted_unique_majors[j].end(),
tmp_majors.begin() + offset);
offset += tx_counts[j];
}
edge_partition_sorted_unique_majors.clear();

std::tie(tmp_majors, std::ignore) =
shuffle_values(minor_comm, tmp_majors.begin(), tx_counts, handle.get_stream());

thrust::sort(handle.get_thrust_policy(), tmp_majors.begin(), tmp_majors.end());
tmp_majors.resize(
thrust::distance(
tmp_majors.begin(),
thrust::unique(handle.get_thrust_policy(), tmp_majors.begin(), tmp_majors.end())),
handle.get_stream());
tmp_majors.shrink_to_fit(handle.get_stream());
} else {
assert(edge_partition_sorted_unique_majors.size() == 1);
tmp_majors = std::move(edge_partition_sorted_unique_majors[0]);
}
} else {
assert(edge_partition_sorted_unique_majors.size() == 1);
tmp_majors = std::move(edge_partition_sorted_unique_majors[0]);
}
edge_partition_sorted_unique_majors.clear();

std::tie(tmp_majors, std::ignore) =
shuffle_values(minor_comm, tmp_majors.begin(), tx_counts, handle.get_stream());

thrust::sort(handle.get_thrust_policy(), tmp_majors.begin(), tmp_majors.end());
tmp_majors.resize(
thrust::distance(
tmp_majors.begin(),
thrust::unique(handle.get_thrust_policy(), tmp_majors.begin(), tmp_majors.end())),
handle.get_stream());
tmp_majors.shrink_to_fit(handle.get_stream());
if (i == 0) {
sorted_unique_majors = std::move(tmp_majors);
} else {
@@ -359,102 +371,133 @@ std::tuple<rmm::device_uvector<vertex_t>, std::vector<vertex_t>, vertex_t> compu
sorted_unique_majors.end(),
tmp_majors.begin(),
tmp_majors.end(),
merged_majors.begin());
merged_majors.begin()); // merging two unique sets from different hash
// bins, so the merged set can't have duplicates
sorted_unique_majors = std::move(merged_majors);
tmp_majors.resize(0, handle.get_stream());
tmp_majors.shrink_to_fit(handle.get_stream());
sorted_unique_majors.resize(thrust::distance(sorted_unique_majors.begin(),
thrust::unique(handle.get_thrust_policy(),
sorted_unique_majors.begin(),
sorted_unique_majors.end())),
handle.get_stream());
sorted_unique_majors.shrink_to_fit(handle.get_stream());
}
}
} else { // FIXME: why not apply binning here?
rmm::device_uvector<vertex_t> majors(edgelist_edge_counts[0], handle.get_stream());
thrust::copy(handle.get_thrust_policy(),
edgelist_majors[0],
edgelist_majors[0] + edgelist_edge_counts[0],
majors.begin());
thrust::sort(handle.get_thrust_policy(), majors.begin(), majors.end());
majors.resize(
thrust::distance(majors.begin(),
thrust::unique(handle.get_thrust_policy(), majors.begin(), majors.end())),
handle.get_stream());
majors.shrink_to_fit(handle.get_stream());
sorted_unique_majors = std::move(majors);
}

rmm::device_uvector<vertex_t> sorted_unique_minors(0, handle.get_stream());
for (size_t i = 0; i < edgelist_minors.size();
++i) { // limit the maximum temporary memory usage to "size of local edge list majors &
// minors" / "# local edge partitions" (FXIME: we can further cut peak memory usage
// by applying binning here as well; fewer bins than the edge list major case)
rmm::device_uvector<vertex_t> tmp_minors(edgelist_edge_counts[i], handle.get_stream());
thrust::copy(handle.get_thrust_policy(),
edgelist_minors[i],
edgelist_minors[i] + edgelist_edge_counts[i],
tmp_minors.begin());
thrust::sort(handle.get_thrust_policy(), tmp_minors.begin(), tmp_minors.end());
tmp_minors.resize(
thrust::distance(
tmp_minors.begin(),
thrust::unique(handle.get_thrust_policy(), tmp_minors.begin(), tmp_minors.end())),
handle.get_stream());
tmp_minors.shrink_to_fit(handle.get_stream());

if constexpr (multi_gpu) {
auto& comm = handle.get_comms();
auto const comm_size = comm.get_size();
auto& major_comm = handle.get_subcomm(cugraph::partition_manager::major_comm_name());
auto const major_comm_size = major_comm.get_size();
auto& minor_comm = handle.get_subcomm(cugraph::partition_manager::minor_comm_name());
auto const minor_comm_size = minor_comm.get_size();

compute_gpu_id_from_ext_vertex_t<vertex_t> gpu_id_func{
comm_size, major_comm_size, minor_comm_size};
std::tie(tmp_minors, std::ignore) = groupby_gpu_id_and_shuffle_values(
major_comm,
tmp_minors.begin(),
tmp_minors.end(),
[major_comm_size, minor_comm_size, gpu_id_func] __device__(auto v) {
auto comm_rank = gpu_id_func(v);
return partition_manager::compute_major_comm_rank_from_global_comm_rank(
major_comm_size, minor_comm_size, comm_rank);
},
handle.get_stream());
{
auto edge_count_vectors = num_bins > 1
? std::make_optional<std::vector<std::vector<edge_t>>>(
edgelist_minors.size(), std::vector<edge_t>(num_bins))
: std::nullopt;
if (edge_count_vectors) {
for (size_t i = 0; i < edgelist_minors.size(); ++i) {
rmm::device_uvector<edge_t> d_edge_counts(num_bins, handle.get_stream());
thrust::fill(
handle.get_thrust_policy(), d_edge_counts.begin(), d_edge_counts.end(), edge_t{0});
thrust::for_each(handle.get_thrust_policy(),
edgelist_minors[i],
edgelist_minors[i] + edgelist_edge_counts[i],
[counts = raft::device_span<edge_t>(
d_edge_counts.data(), d_edge_counts.size())] __device__(auto v) {
cuco::detail::MurmurHash3_32<vertex_t> hash_func{hash_seed};
cuda::atomic_ref<edge_t, cuda::thread_scope_device> atomic_counter(
counts[hash_func(v) % num_bins]);
atomic_counter.fetch_add(edge_t{1}, cuda::std::memory_order_relaxed);
});
raft::update_host((*edge_count_vectors)[i].data(),
d_edge_counts.data(),
d_edge_counts.size(),
handle.get_stream());
}
handle.sync_stream();
}

for (size_t i = 0; i < num_bins; ++i) {
edge_t bin_size{0};
if (edge_count_vectors) {
for (size_t j = 0; j < edgelist_minors.size(); ++j) {
bin_size += (*edge_count_vectors)[j][i];
}
} else {
bin_size = std::reduce(edgelist_edge_counts.begin(), edgelist_edge_counts.end());
}
rmm::device_uvector<vertex_t> tmp_minors(bin_size, handle.get_stream());
edge_t offset{0};
for (size_t j = 0; j < edgelist_minors.size(); ++j) {
if (num_bins > 1) {
thrust::copy_if(handle.get_thrust_policy(),
edgelist_minors[j],
edgelist_minors[j] + edgelist_edge_counts[j],
tmp_minors.begin() + offset,
[i] __device__(auto v) {
cuco::detail::MurmurHash3_32<vertex_t> hash_func{hash_seed};
return (static_cast<size_t>(hash_func(v) % num_bins) == i);
});
offset += (*edge_count_vectors)[j][i];
} else {
thrust::copy(handle.get_thrust_policy(),
edgelist_minors[j],
edgelist_minors[j] + edgelist_edge_counts[j],
tmp_minors.begin() + offset);
offset += edgelist_edge_counts[j];
}
}
thrust::sort(handle.get_thrust_policy(), tmp_minors.begin(), tmp_minors.end());
tmp_minors.resize(
thrust::distance(
tmp_minors.begin(),
thrust::unique(handle.get_thrust_policy(), tmp_minors.begin(), tmp_minors.end())),
handle.get_stream());
tmp_minors.shrink_to_fit(handle.get_stream());
}

if (i == 0) {
sorted_unique_minors = std::move(tmp_minors);
} else {
rmm::device_uvector<vertex_t> merged_minors(sorted_unique_minors.size() + tmp_minors.size(),
handle.get_stream());
thrust::merge(handle.get_thrust_policy(),
sorted_unique_minors.begin(),
sorted_unique_minors.end(),
tmp_minors.begin(),
tmp_minors.end(),
merged_minors.begin());
sorted_unique_minors = std::move(merged_minors);
tmp_minors.resize(0, handle.get_stream());
tmp_minors.shrink_to_fit(handle.get_stream());
sorted_unique_minors.resize(thrust::distance(sorted_unique_minors.begin(),
thrust::unique(handle.get_thrust_policy(),
sorted_unique_minors.begin(),
sorted_unique_minors.end())),
handle.get_stream());
sorted_unique_minors.shrink_to_fit(handle.get_stream());
if constexpr (multi_gpu) {
auto& comm = handle.get_comms();
auto const comm_size = comm.get_size();
auto& major_comm = handle.get_subcomm(cugraph::partition_manager::major_comm_name());
auto const major_comm_size = major_comm.get_size();
auto& minor_comm = handle.get_subcomm(cugraph::partition_manager::minor_comm_name());
auto const minor_comm_size = minor_comm.get_size();

compute_gpu_id_from_ext_vertex_t<vertex_t> gpu_id_func{
comm_size, major_comm_size, minor_comm_size};
std::tie(tmp_minors, std::ignore) = groupby_gpu_id_and_shuffle_values(
major_comm,
tmp_minors.begin(),
tmp_minors.end(),
[major_comm_size, minor_comm_size, gpu_id_func] __device__(auto v) {
auto comm_rank = gpu_id_func(v);
return partition_manager::compute_major_comm_rank_from_global_comm_rank(
major_comm_size, minor_comm_size, comm_rank);
},
handle.get_stream());
thrust::sort(handle.get_thrust_policy(), tmp_minors.begin(), tmp_minors.end());
tmp_minors.resize(
thrust::distance(
tmp_minors.begin(),
thrust::unique(handle.get_thrust_policy(), tmp_minors.begin(), tmp_minors.end())),
handle.get_stream());
tmp_minors.shrink_to_fit(handle.get_stream());
}

if (i == 0) {
sorted_unique_minors = std::move(tmp_minors);
} else {
rmm::device_uvector<vertex_t> merged_minors(
sorted_unique_minors.size() + tmp_minors.size(), handle.get_stream());
thrust::merge(handle.get_thrust_policy(),
sorted_unique_minors.begin(),
sorted_unique_minors.end(),
tmp_minors.begin(),
tmp_minors.end(),
merged_minors.begin());
sorted_unique_minors = std::move(merged_minors);
tmp_minors.resize(0, handle.get_stream());
tmp_minors.shrink_to_fit(handle.get_stream());
sorted_unique_minors.resize(thrust::distance(sorted_unique_minors.begin(),
thrust::unique(handle.get_thrust_policy(),
sorted_unique_minors.begin(),
sorted_unique_minors.end())),
handle.get_stream());
sorted_unique_minors.shrink_to_fit(handle.get_stream());
}
}
}

sorted_local_vertices.resize(sorted_unique_majors.size() + sorted_unique_minors.size(),
handle.get_stream());
thrust::merge(handle.get_thrust_policy(),
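The per-bin counting pass above (d_edge_counts filled by thrust::for_each, then raft::update_host) is a device-side histogram keyed by hash bin. A standalone sketch of the same pattern, assuming thrust::device_vector in place of rmm::device_uvector and a stand-in hash in place of cuco::detail::MurmurHash3_32:

// --- illustrative sketch (not part of the diff) ---
// Device histogram over hash bins via cuda::atomic_ref (libcu++, CUDA 11+),
// followed by a host copy. Compile with: nvcc -std=c++17 --extended-lambda bin_counts.cu
#include <thrust/device_vector.h>
#include <thrust/for_each.h>
#include <cuda/atomic>
#include <cstdint>
#include <cstdio>
#include <vector>

int main()
{
  constexpr size_t num_bins = 8;
  std::vector<uint32_t> h_minors{4, 4, 11, 7, 7, 7, 42};
  thrust::device_vector<uint32_t> minors(h_minors.begin(), h_minors.end());
  thrust::device_vector<unsigned long long> counts(num_bins, 0);
  auto* counts_ptr = thrust::raw_pointer_cast(counts.data());

  thrust::for_each(minors.begin(), minors.end(), [counts_ptr] __device__(uint32_t v) {
    uint32_t h = v * 0x9e3779b9u;  // stand-in for MurmurHash3_32 with hash_seed
    cuda::atomic_ref<unsigned long long, cuda::thread_scope_device> counter(
      counts_ptr[h % num_bins]);
    counter.fetch_add(1ull, cuda::std::memory_order_relaxed);  // one edge in this bin
  });

  for (size_t i = 0; i < num_bins; ++i) {  // plays the role of raft::update_host
    std::printf("bin %zu: %llu edges\n", i, static_cast<unsigned long long>(counts[i]));
  }
}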
@@ -492,14 +535,12 @@ std::tuple<rmm::device_uvector<vertex_t>, std::vector<vertex_t>, vertex_t> compu
// 3. compute global degrees for the sorted local vertices

rmm::device_uvector<edge_t> sorted_local_vertex_degrees(0, handle.get_stream());
std::optional<std::vector<size_t>> stream_pool_indices{
std::nullopt}; // FIXME: move this inside the if statement

auto constexpr num_chunks = size_t{
2};  // tuning parameter, this trades off # binary searches (up to num_chunks times more binary
// searches can be necessary if num_unique_majors << edgelist_edge_counts[i]) and temporary
// buffer requirement (cut by num_chunks times), currently set to 2 to avoid peak memory
// usage happening in this part (especially when minor_comm_size is small)
2};  // tuning parameter, this trades off # binary searches (up to num_chunks times more
// binary searches can be necessary if num_unique_majors << edgelist_edge_counts[i]) and
// temporary buffer requirement (cut by num_chunks times), currently set to 2 to avoid
// peak memory usage happening in this part (especially when minor_comm_size is small)

if constexpr (multi_gpu) {
auto& comm = handle.get_comms();
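As the comment in this hunk says, num_chunks trades extra binary-search passes for a smaller temporary buffer. One way to realize that trade-off (hypothetical sketch, not the exact kernel this hunk feeds into): process the edge list in num_chunks pieces, sort each piece into a |E| / num_chunks scratch buffer, and accumulate per-vertex occurrence counts as upper_bound minus lower_bound.

// --- illustrative sketch (not part of the diff) ---
// Chunked degree counting: the sorted scratch buffer is |E| / num_chunks long,
// at the cost of one vectorized binary-search pass over the vertex set per chunk.
#include <thrust/binary_search.h>
#include <thrust/copy.h>
#include <thrust/device_vector.h>
#include <thrust/functional.h>
#include <thrust/sort.h>
#include <thrust/transform.h>
#include <algorithm>
#include <cstdio>
#include <vector>

int main()
{
  std::vector<int> h_vertices{1, 3, 5, 7, 9};  // sorted, unique
  std::vector<int> h_majors{5, 3, 5, 9, 3, 7, 9, 1, 7, 5};
  thrust::device_vector<int> sorted_vertices(h_vertices.begin(), h_vertices.end());
  thrust::device_vector<int> edge_majors(h_majors.begin(), h_majors.end());
  thrust::device_vector<long> degrees(sorted_vertices.size(), 0);

  constexpr size_t num_chunks = 2;
  size_t chunk_size = (edge_majors.size() + num_chunks - 1) / num_chunks;
  thrust::device_vector<int> tmp(chunk_size);  // |E| / num_chunks scratch
  thrust::device_vector<long> lo(sorted_vertices.size());
  thrust::device_vector<long> hi(sorted_vertices.size());
  for (size_t c = 0; c < num_chunks; ++c) {
    size_t first = c * chunk_size;
    size_t last  = std::min(first + chunk_size, edge_majors.size());
    tmp.resize(last - first);
    thrust::copy(edge_majors.begin() + first, edge_majors.begin() + last, tmp.begin());
    thrust::sort(tmp.begin(), tmp.end());
    // occurrences of each vertex in this chunk = upper_bound - lower_bound
    thrust::lower_bound(tmp.begin(), tmp.end(),
                        sorted_vertices.begin(), sorted_vertices.end(), lo.begin());
    thrust::upper_bound(tmp.begin(), tmp.end(),
                        sorted_vertices.begin(), sorted_vertices.end(), hi.begin());
    thrust::transform(hi.begin(), hi.end(), lo.begin(), lo.begin(), thrust::minus<long>());
    thrust::transform(lo.begin(), lo.end(), degrees.begin(), degrees.begin(),
                      thrust::plus<long>());
  }
  for (size_t i = 0; i < sorted_vertices.size(); ++i) {
    std::printf("v=%d deg=%ld\n", static_cast<int>(sorted_vertices[i]),
                static_cast<long>(degrees[i]));  // degrees: 1 2 3 2 2
  }
}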
@@ -512,6 +553,8 @@ std::tuple<rmm::device_uvector<vertex_t>, std::vector<vertex_t>, vertex_t> compu
auto edge_partition_major_range_sizes =
host_scalar_allgather(minor_comm, sorted_local_vertices.size(), handle.get_stream());

std::optional<std::vector<size_t>> stream_pool_indices{std::nullopt};

if ((minor_comm_size >= 2) && (handle.get_stream_pool_size() >= 2)) {
edge_t num_local_edges =
std::reduce(edgelist_edge_counts.begin(), edgelist_edge_counts.end());
@@ -664,15 +707,15 @@ std::tuple<rmm::device_uvector<vertex_t>, std::vector<vertex_t>, vertex_t> compu
}
}

// 4. sort local vertices by degree (descending)
// 5. sort local vertices by degree (descending)

thrust::sort_by_key(handle.get_thrust_policy(),
sorted_local_vertex_degrees.begin(),
sorted_local_vertex_degrees.end(),
sorted_local_vertices.begin(),
thrust::greater<edge_t>());

// 5. compute segment_offsets
// 6. compute segment_offsets

static_assert(detail::num_sparse_segments_per_vertex_partition == 3);
static_assert((detail::low_degree_threshold <= detail::mid_degree_threshold) &&
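Steps 5 and 6 above order vertices by descending degree so the degree-segment offsets (high-, mid-, low-degree partitions) can be read off the sorted array. A minimal sketch of that sort, with hypothetical data:

// --- illustrative sketch (not part of the diff) ---
// Degree-descending ordering: keys = degrees, values = vertices.
#include <thrust/device_vector.h>
#include <thrust/functional.h>
#include <thrust/sort.h>
#include <cstdio>
#include <vector>

int main()
{
  std::vector<long> h_degrees{1, 2, 3, 2, 2};
  std::vector<int>  h_vertices{1, 3, 5, 7, 9};
  thrust::device_vector<long> degrees(h_degrees.begin(), h_degrees.end());
  thrust::device_vector<int>  vertices(h_vertices.begin(), h_vertices.end());

  // thrust::greater gives descending order; vertices are permuted alongside the keys
  thrust::sort_by_key(degrees.begin(), degrees.end(), vertices.begin(),
                      thrust::greater<long>());

  for (size_t i = 0; i < vertices.size(); ++i) {
    std::printf("v=%d deg=%ld\n", static_cast<int>(vertices[i]),
                static_cast<long>(degrees[i]));  // deg column: 3 2 2 2 1
  }
}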
