multi-stream execution
seunghwak committed Sep 9, 2024
1 parent 9fa4fb4 commit b6a1fb0
Showing 2 changed files with 132 additions and 103 deletions.
232 changes: 131 additions & 101 deletions cpp/src/prims/fill_edge_src_dst_property.cuh
@@ -316,7 +316,7 @@ void fill_edge_minor_property(raft::handle_t const& handle,

   if constexpr (GraphViewType::is_multi_gpu) {
     auto& comm = handle.get_comms();
-    auto const comm_rank = comm.get_rank();
+    auto const comm_size = comm.get_size();
     auto& major_comm = handle.get_subcomm(cugraph::partition_manager::major_comm_name());
     auto const major_comm_rank = major_comm.get_rank();
     auto const major_comm_size = major_comm.get_size();
@@ -362,128 +362,158 @@ void fill_edge_minor_property(raft::handle_t const& handle,
     auto local_v_list_range_lasts =
       host_scalar_allgather(major_comm, v_list_range[1], handle.get_stream());

+    auto num_concurrent_bcasts =
+      (static_cast<size_t>(graph_view.compute_number_of_edges(handle) / comm_size) *
+       sizeof(vertex_t)) /
+      std::max(
+        (std::reduce(local_v_list_sizes.begin(), local_v_list_sizes.end()) / major_comm_size) *
+          sizeof(vertex_t),
+        size_t{1});
+    num_concurrent_bcasts = std::min(num_concurrent_bcasts, handle.get_stream_pool_size());
+    num_concurrent_bcasts =
+      std::min(std::max(num_concurrent_bcasts, size_t{1}), static_cast<size_t>(major_comm_size));
+
+    std::optional<std::vector<size_t>> stream_pool_indices{std::nullopt};
+    if (num_concurrent_bcasts > 1) {
+      stream_pool_indices = std::vector<size_t>(num_concurrent_bcasts);
+      std::iota((*stream_pool_indices).begin(), (*stream_pool_indices).end(), size_t{0});
+    }
+
     std::optional<raft::host_span<vertex_t const>> key_offsets{};
     if constexpr (GraphViewType::is_storage_transposed) {
       key_offsets = graph_view.local_sorted_unique_edge_src_vertex_partition_offsets();
     } else {
       key_offsets = graph_view.local_sorted_unique_edge_dst_vertex_partition_offsets();
     }
+    if (stream_pool_indices) { handle.sync_stream(); }

     auto edge_partition_keys = edge_minor_property_output.keys();
-    for (int i = 0; i < major_comm_size; ++i) {
-      if (is_packed_bool<typename EdgeMinorPropertyOutputWrapper::value_iterator,
-                         typename EdgeMinorPropertyOutputWrapper::value_type>() &&
-          !edge_partition_keys && use_bitmap_flags[i]) {
-        rmm::device_uvector<uint32_t> rx_bitmap(
-          packed_bool_size(local_v_list_range_lasts[i] - local_v_list_range_firsts[i]),
-          handle.get_stream());
-        device_bcast(
-          major_comm,
-          (i == major_comm_rank) ? (*v_list_bitmap).data() : static_cast<uint32_t const*>(nullptr),
-          rx_bitmap.data(),
-          rx_bitmap.size(),
-          i,
-          handle.get_stream());
-        thrust::for_each(
-          handle.get_thrust_policy(),
-          thrust::make_counting_iterator(size_t{0}),
-          thrust::make_counting_iterator(rx_bitmap.size()),
-          [input,
-           output_value_first =
-             edge_partition_value_first +
-             packed_bool_offset(local_v_list_range_firsts[i] - minor_range_first),
-           rx_bitmap = raft::device_span<uint32_t const>(rx_bitmap.data(),
-                                                         rx_bitmap.size())] __device__(size_t i) {
-            if ((i == 0) || (i == (rx_bitmap.size() - 1))) {  // first or last
-              cuda::atomic_ref<uint32_t, cuda::thread_scope_device> word(*(output_value_first + i));
-              if (input) {
-                word.fetch_or(rx_bitmap[i], cuda::std::memory_order_relaxed);
-              } else {
-                word.fetch_and(~rx_bitmap[i], cuda::std::memory_order_relaxed);
-              }
-            } else {
-              if (input) {
-                *(output_value_first + i) |= rx_bitmap[i];
-              } else {
-                *(output_value_first + i) &= ~rx_bitmap[i];
-              }
-            }
-          });
-      } else {
-        rmm::device_uvector<vertex_t> rx_vertices(local_v_list_sizes[i], handle.get_stream());
-        // FIXME: these broadcast operations can be placed between ncclGroupStart() and
-        // ncclGroupEnd()
-        std::variant<raft::device_span<uint32_t const>, decltype(sorted_unique_vertex_first)>
-          v_list{};
-        if (use_bitmap_flags[i]) {
-          v_list =
-            (i == major_comm_rank)
-              ? raft::device_span<uint32_t const>((*v_list_bitmap).data(), (*v_list_bitmap).size())
-              : raft::device_span<uint32_t const>(static_cast<uint32_t const*>(nullptr), size_t{0});
-        } else {
-          v_list = sorted_unique_vertex_first;
-        }
-        device_bcast_vertex_list(major_comm,
-                                 v_list,
-                                 rx_vertices.begin(),
-                                 local_v_list_range_firsts[i],
-                                 local_v_list_range_lasts[i],
-                                 local_v_list_sizes[i],
-                                 i,
-                                 handle.get_stream());
-
-        if (edge_partition_keys) {
-          thrust::for_each(
-            handle.get_thrust_policy(),
-            thrust::make_counting_iterator(size_t{0}),
-            thrust::make_counting_iterator(local_v_list_sizes[i]),
-            [rx_vertex_first = rx_vertices.begin(),
-             input,
-             subrange_key_first = (*edge_partition_keys).begin() + (*key_offsets)[i],
-             subrange_key_last  = (*edge_partition_keys).begin() + (*key_offsets)[i + 1],
-             edge_partition_value_first = edge_partition_value_first,
-             subrange_start_offset      = (*key_offsets)[i]] __device__(auto i) {
-              auto minor = *(rx_vertex_first + i);
-              auto it =
-                thrust::lower_bound(thrust::seq, subrange_key_first, subrange_key_last, minor);
-              if ((it != subrange_key_last) && (*it == minor)) {
-                auto subrange_offset = thrust::distance(subrange_key_first, it);
-                if constexpr (contains_packed_bool_element) {
-                  fill_scalar_or_thrust_tuple(
-                    edge_partition_value_first, subrange_start_offset + subrange_offset, input);
-                } else {
-                  *(edge_partition_value_first + subrange_start_offset + subrange_offset) = input;
-                }
-              }
-            });
-        } else {
-          if constexpr (contains_packed_bool_element) {
-            thrust::for_each(
-              handle.get_thrust_policy(),
-              thrust::make_counting_iterator(vertex_t{0}),
-              thrust::make_counting_iterator(static_cast<vertex_t>(local_v_list_sizes[i])),
-              [minor_range_first,
-               rx_vertex_first = rx_vertices.begin(),
-               input,
-               output_value_first = edge_partition_value_first] __device__(auto i) {
-                auto rx_vertex    = *(rx_vertex_first + i);
-                auto minor_offset = rx_vertex - minor_range_first;
-                fill_scalar_or_thrust_tuple(output_value_first, minor_offset, input);
-              });
-          } else {
-            auto map_first = thrust::make_transform_iterator(
-              rx_vertices.begin(),
-              cuda::proclaim_return_type<vertex_t>(
-                [minor_range_first] __device__(auto v) { return v - minor_range_first; }));
-            auto val_first = thrust::make_constant_iterator(input);
-            thrust::scatter(handle.get_thrust_policy(),
-                            val_first,
-                            val_first + local_v_list_sizes[i],
-                            map_first,
-                            edge_partition_value_first);
-          }
-        }
-      }
-    }
+    for (size_t i = 0; i < static_cast<size_t>(major_comm_size); i += num_concurrent_bcasts) {
+      auto loop_count = std::min(num_concurrent_bcasts, static_cast<size_t>(major_comm_size) - i);
+      for (size_t j = 0; j < loop_count; ++j) {
+        auto partition_idx = i + j;
+        auto loop_stream   = stream_pool_indices
+                               ? handle.get_stream_from_stream_pool((*stream_pool_indices)[j])
+                               : handle.get_stream();
+
+        if (is_packed_bool<typename EdgeMinorPropertyOutputWrapper::value_iterator,
+                           typename EdgeMinorPropertyOutputWrapper::value_type>() &&
+            !edge_partition_keys && use_bitmap_flags[partition_idx]) {
+          rmm::device_uvector<uint32_t> rx_bitmap(
+            packed_bool_size(local_v_list_range_lasts[partition_idx] -
+                             local_v_list_range_firsts[partition_idx]),
+            loop_stream);
+          device_bcast(major_comm,
+                       (static_cast<int>(partition_idx) == major_comm_rank)
+                         ? (*v_list_bitmap).data()
+                         : static_cast<uint32_t const*>(nullptr),
+                       rx_bitmap.data(),
+                       rx_bitmap.size(),
+                       partition_idx,
+                       loop_stream);
+          thrust::for_each(
+            rmm::exec_policy_nosync(loop_stream),
+            thrust::make_counting_iterator(size_t{0}),
+            thrust::make_counting_iterator(rx_bitmap.size()),
+            [input,
+             output_value_first =
+               edge_partition_value_first +
+               packed_bool_offset(local_v_list_range_firsts[partition_idx] - minor_range_first),
+             rx_bitmap = raft::device_span<uint32_t const>(rx_bitmap.data(),
+                                                           rx_bitmap.size())] __device__(size_t i) {
+              if ((i == 0) || (i == (rx_bitmap.size() - 1))) {  // first or last
+                cuda::atomic_ref<uint32_t, cuda::thread_scope_device> word(
+                  *(output_value_first + i));
+                if (input) {
+                  word.fetch_or(rx_bitmap[i], cuda::std::memory_order_relaxed);
+                } else {
+                  word.fetch_and(~rx_bitmap[i], cuda::std::memory_order_relaxed);
+                }
+              } else {
+                if (input) {
+                  *(output_value_first + i) |= rx_bitmap[i];
+                } else {
+                  *(output_value_first + i) &= ~rx_bitmap[i];
+                }
+              }
+            });
+        } else {
+          rmm::device_uvector<vertex_t> rx_vertices(local_v_list_sizes[partition_idx],
+                                                    loop_stream);
+          // FIXME: these broadcast operations can be placed between ncclGroupStart() and
+          // ncclGroupEnd()
+          std::variant<raft::device_span<uint32_t const>, decltype(sorted_unique_vertex_first)>
+            v_list{};
+          if (use_bitmap_flags[partition_idx]) {
+            v_list = (static_cast<int>(partition_idx) == major_comm_rank)
+                       ? raft::device_span<uint32_t const>((*v_list_bitmap).data(),
+                                                           (*v_list_bitmap).size())
+                       : raft::device_span<uint32_t const>(static_cast<uint32_t const*>(nullptr),
+                                                           size_t{0});
+          } else {
+            v_list = sorted_unique_vertex_first;
+          }
+          device_bcast_vertex_list(major_comm,
+                                   v_list,
+                                   rx_vertices.begin(),
+                                   local_v_list_range_firsts[partition_idx],
+                                   local_v_list_range_lasts[partition_idx],
+                                   local_v_list_sizes[partition_idx],
+                                   partition_idx,
+                                   loop_stream);
+
+          if (edge_partition_keys) {
+            thrust::for_each(
+              rmm::exec_policy_nosync(loop_stream),
+              thrust::make_counting_iterator(size_t{0}),
+              thrust::make_counting_iterator(local_v_list_sizes[partition_idx]),
+              [rx_vertex_first = rx_vertices.begin(),
+               input,
+               subrange_key_first = (*edge_partition_keys).begin() + (*key_offsets)[partition_idx],
+               subrange_key_last =
+                 (*edge_partition_keys).begin() + (*key_offsets)[partition_idx + 1],
+               edge_partition_value_first = edge_partition_value_first,
+               subrange_start_offset      = (*key_offsets)[partition_idx]] __device__(auto i) {
+                auto minor = *(rx_vertex_first + i);
+                auto it =
+                  thrust::lower_bound(thrust::seq, subrange_key_first, subrange_key_last, minor);
+                if ((it != subrange_key_last) && (*it == minor)) {
+                  auto subrange_offset = thrust::distance(subrange_key_first, it);
+                  if constexpr (contains_packed_bool_element) {
+                    fill_scalar_or_thrust_tuple(
+                      edge_partition_value_first, subrange_start_offset + subrange_offset, input);
+                  } else {
+                    *(edge_partition_value_first + subrange_start_offset + subrange_offset) = input;
+                  }
+                }
+              });
+          } else {
+            if constexpr (contains_packed_bool_element) {
+              thrust::for_each(
+                rmm::exec_policy_nosync(loop_stream),
+                thrust::make_counting_iterator(vertex_t{0}),
+                thrust::make_counting_iterator(
+                  static_cast<vertex_t>(local_v_list_sizes[partition_idx])),
+                [minor_range_first,
+                 rx_vertex_first = rx_vertices.begin(),
+                 input,
+                 output_value_first = edge_partition_value_first] __device__(auto i) {
+                  auto rx_vertex    = *(rx_vertex_first + i);
+                  auto minor_offset = rx_vertex - minor_range_first;
+                  fill_scalar_or_thrust_tuple(output_value_first, minor_offset, input);
+                });
+            } else {
+              auto map_first = thrust::make_transform_iterator(
+                rx_vertices.begin(),
+                cuda::proclaim_return_type<vertex_t>(
+                  [minor_range_first] __device__(auto v) { return v - minor_range_first; }));
+              auto val_first = thrust::make_constant_iterator(input);
+              thrust::scatter(rmm::exec_policy_nosync(loop_stream),
+                              val_first,
+                              val_first + local_v_list_sizes[partition_idx],
+                              map_first,
+                              edge_partition_value_first);
+            }
+          }
+        }
+      }
+      if (stream_pool_indices) { handle.sync_stream_pool(*stream_pool_indices); }
+    }
   } else {
     assert(graph_view.local_vertex_partition_range_size() ==
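For context, the pattern this commit introduces can be illustrated with a minimal, self-contained sketch (not part of the commit): partitions are processed in chunks of up to num_concurrent streams drawn from the handle's stream pool, per-stream work is launched with rmm::exec_policy_nosync (which skips the implicit synchronization of rmm::exec_policy), and the pool is joined once per chunk. The function name, partition count, and scratch buffer below are illustrative assumptions, not cugraph API; it assumes a raft::handle_t constructed with a non-empty stream pool and compilation as CUDA with extended lambdas.

// Minimal sketch of chunked multi-stream execution with a raft stream pool.
#include <raft/core/handle.hpp>
#include <rmm/device_uvector.hpp>
#include <rmm/exec_policy.hpp>
#include <thrust/for_each.h>
#include <thrust/iterator/counting_iterator.h>

#include <algorithm>
#include <cstddef>
#include <numeric>
#include <optional>
#include <vector>

void fill_ones_multi_stream(raft::handle_t const& handle, size_t num_partitions)
{
  // Clamp concurrency to the stream pool size and the partition count.
  size_t num_concurrent = std::max(handle.get_stream_pool_size(), size_t{1});
  num_concurrent        = std::min(num_concurrent, num_partitions);

  std::optional<std::vector<size_t>> stream_pool_indices{std::nullopt};
  if (num_concurrent > 1) {
    stream_pool_indices = std::vector<size_t>(num_concurrent);
    std::iota((*stream_pool_indices).begin(), (*stream_pool_indices).end(), size_t{0});
    handle.sync_stream();  // pool streams must not race ahead of prior work on the main stream
  }

  for (size_t i = 0; i < num_partitions; i += num_concurrent) {
    auto loop_count = std::min(num_concurrent, num_partitions - i);
    for (size_t j = 0; j < loop_count; ++j) {
      auto loop_stream = stream_pool_indices
                           ? handle.get_stream_from_stream_pool((*stream_pool_indices)[j])
                           : handle.get_stream();
      // Per-partition scratch; allocation and end-of-scope deallocation are
      // stream-ordered on loop_stream, so no explicit sync is needed here.
      rmm::device_uvector<int> buf(size_t{1} << 20, loop_stream);
      // Launch without the implicit stream sync of rmm::exec_policy.
      thrust::for_each(rmm::exec_policy_nosync(loop_stream),
                       thrust::make_counting_iterator(size_t{0}),
                       thrust::make_counting_iterator(buf.size()),
                       [d = buf.data()] __device__(size_t k) { d[k] = 1; });
    }
    // Join all pool streams before the next chunk reuses them.
    if (stream_pool_indices) { handle.sync_stream_pool(*stream_pool_indices); }
  }
}

The hunk above has the same shape: rx_bitmap and rx_vertices live on loop_stream, the kernels run under rmm::exec_policy_nosync(loop_stream), and handle.sync_stream_pool(*stream_pool_indices) closes each chunk.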
3 changes: 1 addition & 2 deletions cpp/src/prims/update_edge_src_dst_property.cuh
@@ -495,8 +495,7 @@ void update_edge_minor_property(raft::handle_t const& handle,
       (static_cast<size_t>(graph_view.compute_number_of_edges(handle) / comm_size) *
        sizeof(vertex_t)) /
       std::max(bcast_size, size_t{1});
-    num_concurrent_bcasts = std::max(num_concurrent_bcasts, size_t{1});
-    num_concurrent_bcasts = std::min(num_concurrent_bcasts, static_cast<size_t>(major_comm_size));
+    num_concurrent_bcasts = std::min(std::max(num_concurrent_bcasts, size_t{1}), static_cast<size_t>(major_comm_size));
     auto num_rounds = (static_cast<size_t>(major_comm_size) + num_concurrent_bcasts - size_t{1}) /
                       num_concurrent_bcasts;

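For intuition about the folded clamp and the num_rounds ceiling division that follows it, here is a small standalone example; the numeric values are invented, not taken from the commit.

// Worked example of the clamp and round-count arithmetic (illustrative values).
#include <algorithm>
#include <cstddef>
#include <cstdio>

int main()
{
  std::size_t major_comm_size       = 8;   // assumed rank count in major_comm
  std::size_t num_concurrent_bcasts = 13;  // assumed output of the size heuristic

  // One-expression clamp to [1, major_comm_size], as in the new line: 13 -> 8.
  num_concurrent_bcasts =
    std::min(std::max(num_concurrent_bcasts, std::size_t{1}), major_comm_size);

  // Ceiling division: (8 + 8 - 1) / 8 = 1 round here; with 3 concurrent
  // broadcasts it would be (8 + 3 - 1) / 3 = 3 rounds (chunks of 3 + 3 + 2).
  auto num_rounds =
    (major_comm_size + num_concurrent_bcasts - std::size_t{1}) / num_concurrent_bcasts;

  std::printf("bcasts per round = %zu, rounds = %zu\n", num_concurrent_bcasts, num_rounds);
  return 0;
}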
