From b6a1fb06d9ff13ec2020e87a153e28a768b03d81 Mon Sep 17 00:00:00 2001
From: Seunghwa Kang
Date: Mon, 9 Sep 2024 00:13:45 -0700
Subject: [PATCH] multi-stream execution

---
 cpp/src/prims/fill_edge_src_dst_property.cuh  | 232 ++++++++++--------
 .../prims/update_edge_src_dst_property.cuh    |   3 +-
 2 files changed, 132 insertions(+), 103 deletions(-)

diff --git a/cpp/src/prims/fill_edge_src_dst_property.cuh b/cpp/src/prims/fill_edge_src_dst_property.cuh
index 46a12555eca..3197ac8e963 100644
--- a/cpp/src/prims/fill_edge_src_dst_property.cuh
+++ b/cpp/src/prims/fill_edge_src_dst_property.cuh
@@ -316,7 +316,7 @@ void fill_edge_minor_property(raft::handle_t const& handle,
 
   if constexpr (GraphViewType::is_multi_gpu) {
     auto& comm = handle.get_comms();
-    auto const comm_rank = comm.get_rank();
+    auto const comm_size = comm.get_size();
     auto& major_comm = handle.get_subcomm(cugraph::partition_manager::major_comm_name());
     auto const major_comm_rank = major_comm.get_rank();
     auto const major_comm_size = major_comm.get_size();
@@ -362,128 +362,158 @@ void fill_edge_minor_property(raft::handle_t const& handle,
     auto local_v_list_range_lasts =
       host_scalar_allgather(major_comm, v_list_range[1], handle.get_stream());
+    auto num_concurrent_bcasts =
+      (static_cast<size_t>(graph_view.compute_number_of_edges(handle) / comm_size) *
+       sizeof(vertex_t)) /
+      std::max(
+        (std::reduce(local_v_list_sizes.begin(), local_v_list_sizes.end()) / major_comm_size) *
+          sizeof(vertex_t),
+        size_t{1});
+    num_concurrent_bcasts = std::min(num_concurrent_bcasts, handle.get_stream_pool_size());
+    num_concurrent_bcasts =
+      std::min(std::max(num_concurrent_bcasts, size_t{1}), static_cast<size_t>(major_comm_size));
+
+    std::optional<std::vector<size_t>> stream_pool_indices{std::nullopt};
+    if (num_concurrent_bcasts > 1) {
+      stream_pool_indices = std::vector<size_t>(num_concurrent_bcasts);
+      std::iota((*stream_pool_indices).begin(), (*stream_pool_indices).end(), size_t{0});
+    }
+
     std::optional<raft::host_span<vertex_t const>> key_offsets{};
     if constexpr (GraphViewType::is_storage_transposed) {
       key_offsets = graph_view.local_sorted_unique_edge_src_vertex_partition_offsets();
     } else {
       key_offsets = graph_view.local_sorted_unique_edge_dst_vertex_partition_offsets();
     }
 
+    if (stream_pool_indices) { handle.sync_stream(); }
+
     auto edge_partition_keys = edge_minor_property_output.keys();
-    for (int i = 0; i < major_comm_size; ++i) {
-      if (is_packed_bool() &&
-          !edge_partition_keys && use_bitmap_flags[i]) {
-        rmm::device_uvector<uint32_t> rx_bitmap(
-          packed_bool_size(local_v_list_range_lasts[i] - local_v_list_range_firsts[i]),
-          handle.get_stream());
-        device_bcast(
-          major_comm,
-          (i == major_comm_rank) ? (*v_list_bitmap).data() : static_cast<uint32_t const*>(nullptr),
-          rx_bitmap.data(),
-          rx_bitmap.size(),
-          i,
-          handle.get_stream());
-        thrust::for_each(
-          handle.get_thrust_policy(),
-          thrust::make_counting_iterator(size_t{0}),
-          thrust::make_counting_iterator(rx_bitmap.size()),
-          [input,
-           output_value_first =
-             edge_partition_value_first +
-             packed_bool_offset(local_v_list_range_firsts[i] - minor_range_first),
-           rx_bitmap = raft::device_span<uint32_t const>(rx_bitmap.data(),
-                                                         rx_bitmap.size())] __device__(size_t i) {
-            if ((i == 0) || (i == (rx_bitmap.size() - 1))) {  // first or last
-              cuda::atomic_ref<uint32_t, cuda::thread_scope_device> word(*(output_value_first + i));
-              if (input) {
-                word.fetch_or(rx_bitmap[i], cuda::std::memory_order_relaxed);
-              } else {
-                word.fetch_and(~rx_bitmap[i], cuda::std::memory_order_relaxed);
-              }
-            } else {
-              if (input) {
-                *(output_value_first + i) |= rx_bitmap[i];
-              } else {
-                *(output_value_first + i) &= ~rx_bitmap[i];
-              }
-            }
-          });
-      } else {
-        rmm::device_uvector<vertex_t> rx_vertices(local_v_list_sizes[i], handle.get_stream());
-        // FIXME: these broadcast operations can be placed between ncclGroupStart() and
-        // ncclGroupEnd()
-        std::variant<raft::device_span<uint32_t const>, decltype(sorted_unique_vertex_first)>
-          v_list{};
-        if (use_bitmap_flags[i]) {
-          v_list =
-            (i == major_comm_rank)
-              ? raft::device_span<uint32_t const>((*v_list_bitmap).data(), (*v_list_bitmap).size())
-              : raft::device_span<uint32_t const>(static_cast<uint32_t const*>(nullptr), size_t{0});
-        } else {
-          v_list = sorted_unique_vertex_first;
-        }
-        device_bcast_vertex_list(major_comm,
-                                 v_list,
-                                 rx_vertices.begin(),
-                                 local_v_list_range_firsts[i],
-                                 local_v_list_range_lasts[i],
-                                 local_v_list_sizes[i],
-                                 i,
-                                 handle.get_stream());
-
-        if (edge_partition_keys) {
-          thrust::for_each(
-            handle.get_thrust_policy(),
-            thrust::make_counting_iterator(size_t{0}),
-            thrust::make_counting_iterator(local_v_list_sizes[i]),
-            [rx_vertex_first = rx_vertices.begin(),
-             input,
-             subrange_key_first = (*edge_partition_keys).begin() + (*key_offsets)[i],
-             subrange_key_last  = (*edge_partition_keys).begin() + (*key_offsets)[i + 1],
-             edge_partition_value_first = edge_partition_value_first,
-             subrange_start_offset      = (*key_offsets)[i]] __device__(auto i) {
-              auto minor = *(rx_vertex_first + i);
-              auto it =
-                thrust::lower_bound(thrust::seq, subrange_key_first, subrange_key_last, minor);
-              if ((it != subrange_key_last) && (*it == minor)) {
-                auto subrange_offset = thrust::distance(subrange_key_first, it);
-                if constexpr (contains_packed_bool_element) {
-                  fill_scalar_or_thrust_tuple(
-                    edge_partition_value_first, subrange_start_offset + subrange_offset, input);
-                } else {
-                  *(edge_partition_value_first + subrange_start_offset + subrange_offset) = input;
-                }
-              }
-            });
-        } else {
-          if constexpr (contains_packed_bool_element) {
-            thrust::for_each(
-              handle.get_thrust_policy(),
-              thrust::make_counting_iterator(vertex_t{0}),
-              thrust::make_counting_iterator(static_cast<vertex_t>(local_v_list_sizes[i])),
-              [minor_range_first,
-               rx_vertex_first    = rx_vertices.begin(),
-               input,
-               output_value_first = edge_partition_value_first] __device__(auto i) {
-                auto rx_vertex    = *(rx_vertex_first + i);
-                auto minor_offset = rx_vertex - minor_range_first;
-                fill_scalar_or_thrust_tuple(output_value_first, minor_offset, input);
-              });
-          } else {
-            auto map_first = thrust::make_transform_iterator(
-              rx_vertices.begin(),
-              cuda::proclaim_return_type<vertex_t>(
-                [minor_range_first] __device__(auto v) { return v - minor_range_first; }));
-            auto val_first = thrust::make_constant_iterator(input);
-            thrust::scatter(handle.get_thrust_policy(),
-                            val_first,
-                            val_first + local_v_list_sizes[i],
-                            map_first,
-                            edge_partition_value_first);
-          }
-        }
-      }
-    }
+    for (size_t i = 0; i < static_cast<size_t>(major_comm_size); i += num_concurrent_bcasts) {
+      auto loop_count = std::min(num_concurrent_bcasts, static_cast<size_t>(major_comm_size) - i);
+      for (size_t j = 0; j < loop_count; ++j) {
+        auto partition_idx = i + j;
+        auto loop_stream   = stream_pool_indices
+                               ? handle.get_stream_from_stream_pool((*stream_pool_indices)[j])
+                               : handle.get_stream();
+
+        if (is_packed_bool() &&
+            !edge_partition_keys && use_bitmap_flags[partition_idx]) {
+          rmm::device_uvector<uint32_t> rx_bitmap(
+            packed_bool_size(local_v_list_range_lasts[partition_idx] -
+                             local_v_list_range_firsts[partition_idx]),
+            loop_stream);
+          device_bcast(major_comm,
+                       (static_cast<int>(partition_idx) == major_comm_rank)
+                         ? (*v_list_bitmap).data()
+                         : static_cast<uint32_t const*>(nullptr),
+                       rx_bitmap.data(),
+                       rx_bitmap.size(),
+                       partition_idx,
+                       loop_stream);
+          thrust::for_each(
+            rmm::exec_policy_nosync(loop_stream),
+            thrust::make_counting_iterator(size_t{0}),
+            thrust::make_counting_iterator(rx_bitmap.size()),
+            [input,
+             output_value_first =
+               edge_partition_value_first +
+               packed_bool_offset(local_v_list_range_firsts[partition_idx] - minor_range_first),
+             rx_bitmap = raft::device_span<uint32_t const>(rx_bitmap.data(),
+                                                           rx_bitmap.size())] __device__(size_t i) {
+              if ((i == 0) || (i == (rx_bitmap.size() - 1))) {  // first or last
+                cuda::atomic_ref<uint32_t, cuda::thread_scope_device> word(
+                  *(output_value_first + i));
+                if (input) {
+                  word.fetch_or(rx_bitmap[i], cuda::std::memory_order_relaxed);
+                } else {
+                  word.fetch_and(~rx_bitmap[i], cuda::std::memory_order_relaxed);
+                }
+              } else {
+                if (input) {
+                  *(output_value_first + i) |= rx_bitmap[i];
+                } else {
+                  *(output_value_first + i) &= ~rx_bitmap[i];
+                }
+              }
+            });
+        } else {
+          rmm::device_uvector<vertex_t> rx_vertices(local_v_list_sizes[partition_idx],
+                                                    loop_stream);
+          // FIXME: these broadcast operations can be placed between ncclGroupStart() and
+          // ncclGroupEnd()
+          std::variant<raft::device_span<uint32_t const>, decltype(sorted_unique_vertex_first)>
+            v_list{};
+          if (use_bitmap_flags[partition_idx]) {
+            v_list = (static_cast<int>(partition_idx) == major_comm_rank)
+                       ? raft::device_span<uint32_t const>((*v_list_bitmap).data(),
+                                                           (*v_list_bitmap).size())
+                       : raft::device_span<uint32_t const>(static_cast<uint32_t const*>(nullptr),
+                                                           size_t{0});
+          } else {
+            v_list = sorted_unique_vertex_first;
+          }
+          device_bcast_vertex_list(major_comm,
+                                   v_list,
+                                   rx_vertices.begin(),
+                                   local_v_list_range_firsts[partition_idx],
+                                   local_v_list_range_lasts[partition_idx],
+                                   local_v_list_sizes[partition_idx],
+                                   partition_idx,
+                                   loop_stream);
+
+          if (edge_partition_keys) {
+            thrust::for_each(
+              rmm::exec_policy_nosync(loop_stream),
+              thrust::make_counting_iterator(size_t{0}),
+              thrust::make_counting_iterator(local_v_list_sizes[partition_idx]),
+              [rx_vertex_first = rx_vertices.begin(),
+               input,
+               subrange_key_first = (*edge_partition_keys).begin() + (*key_offsets)[partition_idx],
+               subrange_key_last =
+                 (*edge_partition_keys).begin() + (*key_offsets)[partition_idx + 1],
+               edge_partition_value_first = edge_partition_value_first,
+               subrange_start_offset      = (*key_offsets)[partition_idx]] __device__(auto i) {
+                auto minor = *(rx_vertex_first + i);
+                auto it =
+                  thrust::lower_bound(thrust::seq, subrange_key_first, subrange_key_last, minor);
+                if ((it != subrange_key_last) && (*it == minor)) {
+                  auto subrange_offset = thrust::distance(subrange_key_first, it);
+                  if constexpr (contains_packed_bool_element) {
+                    fill_scalar_or_thrust_tuple(
+                      edge_partition_value_first, subrange_start_offset + subrange_offset, input);
+                  } else {
+                    *(edge_partition_value_first + subrange_start_offset + subrange_offset) = input;
+                  }
+                }
+              });
+          } else {
+            if constexpr (contains_packed_bool_element) {
+              thrust::for_each(
+                rmm::exec_policy_nosync(loop_stream),
+                thrust::make_counting_iterator(vertex_t{0}),
+                thrust::make_counting_iterator(
+                  static_cast<vertex_t>(local_v_list_sizes[partition_idx])),
+                [minor_range_first,
+                 rx_vertex_first    = rx_vertices.begin(),
+                 input,
+                 output_value_first = edge_partition_value_first] __device__(auto i) {
+                  auto rx_vertex    = *(rx_vertex_first + i);
+                  auto minor_offset = rx_vertex - minor_range_first;
+                  fill_scalar_or_thrust_tuple(output_value_first, minor_offset, input);
+                });
+            } else {
+              auto map_first = thrust::make_transform_iterator(
+                rx_vertices.begin(),
+                cuda::proclaim_return_type<vertex_t>(
+                  [minor_range_first] __device__(auto v) { return v - minor_range_first; }));
+              auto val_first = thrust::make_constant_iterator(input);
+              thrust::scatter(rmm::exec_policy_nosync(loop_stream),
+                              val_first,
+                              val_first + local_v_list_sizes[partition_idx],
+                              map_first,
+                              edge_partition_value_first);
+            }
+          }
+        }
+      }
+      if (stream_pool_indices) { handle.sync_stream_pool(*stream_pool_indices); }
+    }
   } else {
     assert(graph_view.local_vertex_partition_range_size() ==
diff --git a/cpp/src/prims/update_edge_src_dst_property.cuh b/cpp/src/prims/update_edge_src_dst_property.cuh
index 2408dcb3d68..f95928520ab 100644
--- a/cpp/src/prims/update_edge_src_dst_property.cuh
+++ b/cpp/src/prims/update_edge_src_dst_property.cuh
@@ -495,8 +495,7 @@ void update_edge_minor_property(raft::handle_t const& handle,
         (static_cast<size_t>(graph_view.compute_number_of_edges(handle) / comm_size) *
          sizeof(vertex_t)) /
         std::max(bcast_size, size_t{1});
-      num_concurrent_bcasts = std::max(num_concurrent_bcasts, size_t{1});
-      num_concurrent_bcasts = std::min(num_concurrent_bcasts, static_cast<size_t>(major_comm_size));
+      num_concurrent_bcasts = std::min(std::max(num_concurrent_bcasts, size_t{1}), static_cast<size_t>(major_comm_size));
       auto num_rounds = (static_cast<size_t>(major_comm_size) + num_concurrent_bcasts - size_t{1}) /
                         num_concurrent_bcasts;
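
Note: the round-based pattern introduced above (chunk the major_comm ranks into
groups of num_concurrent_bcasts, run each group member on its own pool stream,
sync the default stream before the loop and the pool streams after each round)
generalizes beyond this primitive. Below is a minimal standalone CUDA sketch of
just that pattern; fill_partition, num_partitions, and the raw cudaStream_t
pool are illustrative stand-ins, not cuGraph/raft APIs.

#include <cuda_runtime.h>

#include <algorithm>
#include <vector>

__global__ void fill_partition(int* data, int n, int value)
{
  int i = blockIdx.x * blockDim.x + threadIdx.x;
  if (i < n) { data[i] = value; }
}

int main()
{
  constexpr int num_partitions = 8;        // stands in for major_comm_size
  constexpr int num_concurrent = 4;        // stands in for num_concurrent_bcasts
  constexpr int partition_size = 1 << 20;

  std::vector<cudaStream_t> pool(num_concurrent);  // stands in for the handle's stream pool
  for (auto& s : pool) { cudaStreamCreate(&s); }

  int* data{};
  cudaMalloc(&data, sizeof(int) * num_partitions * partition_size);

  // Plays the role of handle.sync_stream(): anything queued on the default
  // stream must complete before the pool streams start consuming its results.
  cudaDeviceSynchronize();

  for (int i = 0; i < num_partitions; i += num_concurrent) {
    int loop_count = std::min(num_concurrent, num_partitions - i);
    for (int j = 0; j < loop_count; ++j) {
      int partition_idx = i + j;  // partition j of this round runs on pool[j]
      fill_partition<<<(partition_size + 255) / 256, 256, 0, pool[j]>>>(
        data + static_cast<size_t>(partition_idx) * partition_size,
        partition_size,
        partition_idx);
    }
    // Plays the role of handle.sync_stream_pool(*stream_pool_indices): join the
    // round's streams before the next round reuses them.
    for (int j = 0; j < loop_count; ++j) { cudaStreamSynchronize(pool[j]); }
  }

  for (auto& s : pool) { cudaStreamDestroy(s); }
  cudaFree(data);
  return 0;
}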
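Note on the fetch_or/fetch_and guard in the packed-bool branch: a rank's vertex
range need not begin or end on a 32-bit word boundary, so the first and last
destination words of one broadcast's bit range may be shared with the ranges
handled concurrently on other streams, while interior words are exclusively
owned. A standalone sketch of just that rule for the fill==true case
(hypothetical or_bitmap kernel, not the thrust::for_each used in the patch):

#include <cuda/atomic>

#include <cstdint>

// One thread per 32-bit word of the received bitmap.
__global__ void or_bitmap(uint32_t* output, uint32_t const* rx_bitmap, size_t num_words)
{
  size_t i = static_cast<size_t>(blockIdx.x) * blockDim.x + threadIdx.x;
  if (i >= num_words) { return; }
  if ((i == 0) || (i == (num_words - 1))) {  // possibly shared with a neighboring range
    cuda::atomic_ref<uint32_t, cuda::thread_scope_device> word(output[i]);
    word.fetch_or(rx_bitmap[i], cuda::std::memory_order_relaxed);
  } else {  // interior words are exclusively owned, plain read-modify-write suffices
    output[i] |= rx_bitmap[i];
  }
}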