fix implicit synchronization in multi-stream execution
seunghwak committed Aug 27, 2024
1 parent 350f17e commit 93f726f
Showing 1 changed file with 118 additions and 30 deletions.
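
The change below splits what was a single per-edge-partition loop (gather keys, compute key segment offsets, allocate output buffers, run the per-partition work) into separate loops, with handle.sync_stream_pool(*stream_pool_indices) only at the boundaries between them. The motivation, per the FIXME in the diff, is that compute_key_segment_offsets implicitly synchronizes to copy its results to host; inside the combined loop, that meant the host could not enqueue work for partition j+1 on its stream until partition j's work had drained, defeating the intended multi-stream overlap.

A minimal, self-contained CUDA sketch of the same idea follows. It uses plain CUDA streams instead of the RAFT handle's stream pool, and all names are illustrative rather than taken from cuGraph.

// Illustrative only (not cuGraph/RAFT code). "fused" shows the serializing
// pattern: each iteration ends with a step the host must wait on, so the next
// stream's work is not even enqueued until the previous stream has drained.
// "phased" shows the pattern the commit moves toward: enqueue everything
// first, then synchronize all streams once at the phase boundary.
#include <cuda_runtime.h>
#include <vector>

__global__ void scale(float* x, int n) {
  int i = blockIdx.x * blockDim.x + threadIdx.x;
  if (i < n) { x[i] *= 2.0f; }
}

void fused(std::vector<cudaStream_t> const& streams,
           std::vector<float*> const& d_bufs,
           float* h_results,  // assumed cudaMallocHost-allocated (pinned)
           int n) {
  for (size_t j = 0; j < streams.size(); ++j) {
    scale<<<(n + 255) / 256, 256, 0, streams[j]>>>(d_bufs[j], n);
    cudaMemcpyAsync(&h_results[j], d_bufs[j], sizeof(float),
                    cudaMemcpyDeviceToHost, streams[j]);
    cudaStreamSynchronize(streams[j]);  // host blocks every iteration
  }
}

void phased(std::vector<cudaStream_t> const& streams,
            std::vector<float*> const& d_bufs,
            float* h_results,  // assumed cudaMallocHost-allocated (pinned)
            int n) {
  for (size_t j = 0; j < streams.size(); ++j) {  // phase 1: enqueue only
    scale<<<(n + 255) / 256, 256, 0, streams[j]>>>(d_bufs[j], n);
    cudaMemcpyAsync(&h_results[j], d_bufs[j], sizeof(float),
                    cudaMemcpyDeviceToHost, streams[j]);
  }
  for (auto s : streams) { cudaStreamSynchronize(s); }  // phase boundary
}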
148 changes: 118 additions & 30 deletions cpp/src/prims/detail/per_v_transform_reduce_e.cuh
@@ -1375,7 +1375,7 @@ void per_v_transform_reduce_e_edge_partition(
}
}

#define PER_V_PERFORMANCE_MEASUREMENT 0
#define PER_V_PERFORMANCE_MEASUREMENT 1

template <bool incoming, // iterate over incoming edges (incoming == true) or outgoing edges
// (incoming == false)
@@ -1839,6 +1839,9 @@ void per_v_transform_reduce_e(raft::handle_t const& handle,
auto time1 = std::chrono::steady_clock::now();
#endif
for (size_t i = 0; i < graph_view.number_of_local_edge_partitions(); i += num_concurrent_loops) {
#if PER_V_PERFORMANCE_MEASUREMENT // FIXME: delete
auto subtime0 = std::chrono::steady_clock::now();
#endif
auto loop_count =
std::min(num_concurrent_loops, graph_view.number_of_local_edge_partitions() - i);

@@ -1847,38 +1850,26 @@
std::byte /* dummy */>
edge_partition_key_buffers{};
if constexpr (GraphViewType::is_multi_gpu && use_input_key) {
auto& minor_comm = handle.get_subcomm(cugraph::partition_manager::minor_comm_name());
auto const minor_comm_rank = minor_comm.get_rank();
auto const minor_comm_size = minor_comm.get_size();

edge_partition_key_buffers.reserve(loop_count);
}
std::vector<std::optional<std::vector<size_t>>> key_segment_offset_vectors{};
key_segment_offset_vectors.reserve(loop_count);
std::conditional_t<GraphViewType::is_multi_gpu && update_major,
std::vector<dataframe_buffer_type_t<T>>,
std::byte /* dummy */>
major_output_buffers{};
if constexpr (GraphViewType::is_multi_gpu && update_major) {
major_output_buffers.reserve(loop_count);
}
for (size_t j = 0; j < loop_count; ++j) {
auto partition_idx = i * num_concurrent_loops + j;
auto loop_stream = stream_pool_indices
? handle.get_stream_from_stream_pool((*stream_pool_indices)[j])
: handle.get_stream();

auto edge_partition =
edge_partition_device_view_t<vertex_t, edge_t, GraphViewType::is_multi_gpu>(
graph_view.local_edge_partition_view(partition_idx));
auto segment_offsets = graph_view.local_edge_partition_segment_offsets(partition_idx);
for (size_t j = 0; j < loop_count; ++j) {
auto partition_idx = i * num_concurrent_loops + j;
auto loop_stream = stream_pool_indices
? handle.get_stream_from_stream_pool((*stream_pool_indices)[j])
: handle.get_stream();

auto edge_partition =
edge_partition_device_view_t<vertex_t, edge_t, GraphViewType::is_multi_gpu>(
graph_view.local_edge_partition_view(partition_idx));
auto segment_offsets = graph_view.local_edge_partition_segment_offsets(partition_idx);

auto edge_partition_key_first = sorted_unique_key_first;
auto edge_partition_key_last = sorted_unique_nzd_key_last;
if constexpr (GraphViewType::is_multi_gpu && use_input_key) {
auto& minor_comm = handle.get_subcomm(cugraph::partition_manager::minor_comm_name());
auto const minor_comm_size = minor_comm.get_size();
auto edge_partition_key_buffer = allocate_dataframe_buffer<key_t>(
minor_comm_size > 1 ? local_key_list_sizes[partition_idx] : size_t{0}, loop_stream);
if (minor_comm_size > 1) {
auto const minor_comm_rank = minor_comm.get_rank();

if constexpr (try_bitmap) {
std::variant<raft::device_span<uint32_t const>, decltype(sorted_unique_key_first)>
v_list{};
@@ -1911,13 +1902,47 @@ void per_v_transform_reduce_e(raft::handle_t const& handle,
}
}
edge_partition_key_buffers.push_back(std::move(edge_partition_key_buffer));
}
}
#if PER_V_PERFORMANCE_MEASUREMENT // FIXME: delete
auto subtime1 = std::chrono::steady_clock::now();
#endif
if (stream_pool_indices) { handle.sync_stream_pool(*stream_pool_indices); }
#if PER_V_PERFORMANCE_MEASUREMENT // FIXME: delete
auto subtime2 = std::chrono::steady_clock::now();
#endif

std::vector<std::optional<std::vector<size_t>>> key_segment_offset_vectors{};
key_segment_offset_vectors.reserve(loop_count);
std::conditional_t<GraphViewType::is_multi_gpu && update_major,
std::vector<dataframe_buffer_type_t<T>>,
std::byte /* dummy */>
major_output_buffers{};
if constexpr (GraphViewType::is_multi_gpu && update_major) {
major_output_buffers.reserve(loop_count);
}
for (size_t j = 0; j < loop_count; ++j) {
auto partition_idx = i * num_concurrent_loops + j;
auto loop_stream = stream_pool_indices
? handle.get_stream_from_stream_pool((*stream_pool_indices)[j])
: handle.get_stream();

auto edge_partition =
edge_partition_device_view_t<vertex_t, edge_t, GraphViewType::is_multi_gpu>(
graph_view.local_edge_partition_view(partition_idx));
auto segment_offsets = graph_view.local_edge_partition_segment_offsets(partition_idx);

auto edge_partition_key_first = sorted_unique_key_first;
auto edge_partition_key_last = sorted_unique_nzd_key_last;
if constexpr (GraphViewType::is_multi_gpu && use_input_key) {
edge_partition_key_first = get_dataframe_buffer_begin(edge_partition_key_buffers[j]);
edge_partition_key_last = get_dataframe_buffer_end(edge_partition_key_buffers[j]);
}

std::optional<std::vector<size_t>> key_segment_offsets{std::nullopt};
if (segment_offsets) {
if constexpr (use_input_key) {
// FIXME: compute_key_segment_offsets implicitly synchronizes to copy the results to host
key_segment_offsets = compute_key_segment_offsets(
edge_partition_key_first,
edge_partition_key_last,
@@ -1946,7 +1971,13 @@ void per_v_transform_reduce_e(raft::handle_t const& handle,
major_output_buffers.push_back(allocate_dataframe_buffer<T>(buffer_size, loop_stream));
}
}
#if PER_V_PERFORMANCE_MEASUREMENT // FIXME: delete
auto subtime3 = std::chrono::steady_clock::now();
#endif
if (stream_pool_indices) { handle.sync_stream_pool(*stream_pool_indices); }
#if PER_V_PERFORMANCE_MEASUREMENT // FIXME: delete
auto subtime4 = std::chrono::steady_clock::now();
#endif

for (size_t j = 0; j < loop_count; ++j) {
auto partition_idx = i * num_concurrent_loops + j;
@@ -2061,12 +2092,52 @@ void per_v_transform_reduce_e(raft::handle_t const& handle,
key_segment_offsets,
edge_partition_stream_pool_indices);
}
}
#if PER_V_PERFORMANCE_MEASUREMENT // FIXME: delete
auto subtime5 = std::chrono::steady_clock::now();
#endif
if (stream_pool_indices) { handle.sync_stream_pool(*stream_pool_indices); }
#if PER_V_PERFORMANCE_MEASUREMENT // FIXME: delete
auto subtime6 = std::chrono::steady_clock::now();
#endif

if constexpr (GraphViewType::is_multi_gpu && update_major) {
for (size_t j = 0; j < loop_count; ++j) {
auto partition_idx = i * num_concurrent_loops + j;

auto edge_partition =
edge_partition_device_view_t<vertex_t, edge_t, GraphViewType::is_multi_gpu>(
graph_view.local_edge_partition_view(partition_idx));
auto edge_partition_stream_pool_indices =
stream_pool_indices ? std::make_optional<raft::host_span<size_t const>>(
(*stream_pool_indices).data() + j * max_segments, max_segments)
: std::nullopt;

if constexpr (GraphViewType::is_multi_gpu && update_major) {
auto& minor_comm = handle.get_subcomm(cugraph::partition_manager::minor_comm_name());
auto const minor_comm_rank = minor_comm.get_rank();
auto const minor_comm_size = minor_comm.get_size();

auto edge_partition_key_first = sorted_unique_key_first;
auto edge_partition_key_last = sorted_unique_nzd_key_last;
if constexpr (GraphViewType::is_multi_gpu && use_input_key) {
auto& minor_comm = handle.get_subcomm(cugraph::partition_manager::minor_comm_name());
auto const minor_comm_size = minor_comm.get_size();
if (minor_comm_size > 1) {
edge_partition_key_first = get_dataframe_buffer_begin(edge_partition_key_buffers[j]);
edge_partition_key_last = get_dataframe_buffer_end(edge_partition_key_buffers[j]);
}
}

bool process_local_edges = true;
if constexpr (filter_input_key) {
auto& minor_comm = handle.get_subcomm(cugraph::partition_manager::minor_comm_name());
auto const minor_comm_rank = minor_comm.get_rank();
if (static_cast<int>(partition_idx) == minor_comm_rank) { process_local_edges = false; }
}

auto const& key_segment_offsets = key_segment_offset_vectors[j];
auto output_buffer = get_dataframe_buffer_begin(major_output_buffers[j]);

if (key_segment_offsets && edge_partition_stream_pool_indices) {
if (edge_partition.dcs_nzd_vertex_count()) {
if ((*key_segment_offsets)[4] - (*key_segment_offsets)[3] > 0) {
@@ -2222,7 +2293,24 @@ void per_v_transform_reduce_e(raft::handle_t const& handle,
}
}
}
#if PER_V_PERFORMANCE_MEASUREMENT // FIXME: delete
auto subtime7 = std::chrono::steady_clock::now();
#endif
if (stream_pool_indices) { handle.sync_stream_pool(*stream_pool_indices); }
#if PER_V_PERFORMANCE_MEASUREMENT // FIXME: delete
auto subtime8 = std::chrono::steady_clock::now();
std::chrono::duration<double> subdur0 = subtime1 - subtime0;
std::chrono::duration<double> subdur1 = subtime2 - subtime1;
std::chrono::duration<double> subdur2 = subtime3 - subtime2;
std::chrono::duration<double> subdur3 = subtime4 - subtime3;
std::chrono::duration<double> subdur4 = subtime5 - subtime4;
std::chrono::duration<double> subdur5 = subtime6 - subtime5;
std::chrono::duration<double> subdur6 = subtime7 - subtime6;
std::chrono::duration<double> subdur7 = subtime8 - subtime7;
std::cout << "sub took (" << subdur0.count() << "," << subdur1.count() << "," << subdur2.count()
<< "," << subdur3.count() << "," << subdur4.count() << "," << subdur5.count() << ","
<< subdur6.count() << "," << subdur7.count() << ")" << std::endl;
#endif
}
#if PER_V_PERFORMANCE_MEASUREMENT // FIXME: delete
RAFT_CUDA_TRY(cudaDeviceSynchronize());
@@ -2419,8 +2507,8 @@ void per_v_transform_reduce_e(raft::handle_t const& handle,
std::chrono::duration<double> dur1 = time2 - time1;
std::chrono::duration<double> dur2 = time3 - time2;
std::chrono::duration<double> dur3 = time4 - time3;
std::cout << "\t\tdetail::per_v (prep, ep, scatter, comm) took (" << dur0.count() << "," << dur1.count() << ","
<< dur2.count() << "," << dur3.count() << ")" << std::endl;
std::cout << "\t\tdetail::per_v (prep, ep, scatter, comm) took (" << dur0.count() << ","
<< dur1.count() << "," << dur2.count() << "," << dur3.count() << ")" << std::endl;
#endif
}
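
For reference, the PER_V_PERFORMANCE_MEASUREMENT instrumentation added above brackets each phase and the stream-pool sync that follows it. Below is a minimal sketch of the same pattern, again with plain CUDA streams and illustrative names: when nothing blocks the host, the enqueue duration stays near zero and the GPU time appears in the matching sync duration; a large enqueue duration points at an implicit synchronization inside the loop.

// Illustrative timing pattern (not cuGraph code): timestamps around one phase
// and its synchronization point.
#include <chrono>
#include <iostream>
#include <vector>
#include <cuda_runtime.h>

void report_phase(std::vector<cudaStream_t> const& streams) {
  auto t0 = std::chrono::steady_clock::now();
  // ... enqueue one phase of asynchronous work on every stream ...
  auto t1 = std::chrono::steady_clock::now();
  for (auto s : streams) { cudaStreamSynchronize(s); }  // phase boundary
  auto t2 = std::chrono::steady_clock::now();
  std::chrono::duration<double> enqueue = t1 - t0;  // host-side issue time
  std::chrono::duration<double> sync    = t2 - t1;  // absorbs the GPU time
  std::cout << "enqueue=" << enqueue.count() << "s, sync=" << sync.count() << "s"
            << std::endl;
}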
