From 9751332e25b8b6c7ef5b364ad28a29dfbd3eb64e Mon Sep 17 00:00:00 2001 From: Seunghwa Kang Date: Wed, 10 Jan 2024 12:35:43 -0800 Subject: [PATCH] update to support edge masking --- ...v_transform_reduce_incoming_outgoing_e.cuh | 260 +++++++++++------- 1 file changed, 161 insertions(+), 99 deletions(-) diff --git a/cpp/src/prims/per_v_transform_reduce_incoming_outgoing_e.cuh b/cpp/src/prims/per_v_transform_reduce_incoming_outgoing_e.cuh index 1a7fc0130c4..cbd1662b37f 100644 --- a/cpp/src/prims/per_v_transform_reduce_incoming_outgoing_e.cuh +++ b/cpp/src/prims/per_v_transform_reduce_incoming_outgoing_e.cuh @@ -68,6 +68,7 @@ template edge_partition_e_mask, ResultValueOutputIteratorOrWrapper result_value_output, EdgeOp e_op, T init /* relevant only if update_major == true */, + T identity_element /* relevant only if update_major == true */, ReduceOp reduce_op) { static_assert(update_major || reduce_op::has_compatible_raft_comms_op_v< @@ -115,22 +118,28 @@ __global__ void per_v_transform_reduce_e_hypersparse( &edge_partition_src_value_input, &edge_partition_dst_value_input, &edge_partition_e_value_input, + &edge_partition_e_mask, &e_op, + identity_element, major, indices, edge_offset] __device__(auto i) { - auto major_offset = edge_partition.major_offset_from_major_nocheck(major); - auto minor = indices[i]; - auto minor_offset = edge_partition.minor_offset_from_minor_nocheck(minor); - auto src = GraphViewType::is_storage_transposed ? minor : major; - auto dst = GraphViewType::is_storage_transposed ? major : minor; - auto src_offset = GraphViewType::is_storage_transposed ? minor_offset : major_offset; - auto dst_offset = GraphViewType::is_storage_transposed ? major_offset : minor_offset; - return e_op(src, - dst, - edge_partition_src_value_input.get(src_offset), - edge_partition_dst_value_input.get(dst_offset), - edge_partition_e_value_input.get(edge_offset + i)); + if (!edge_partition_e_mask || (*edge_partition_e_mask).get(edge_offset + i)) { + auto major_offset = edge_partition.major_offset_from_major_nocheck(major); + auto minor = indices[i]; + auto minor_offset = edge_partition.minor_offset_from_minor_nocheck(minor); + auto src = GraphViewType::is_storage_transposed ? minor : major; + auto dst = GraphViewType::is_storage_transposed ? major : minor; + auto src_offset = GraphViewType::is_storage_transposed ? minor_offset : major_offset; + auto dst_offset = GraphViewType::is_storage_transposed ? major_offset : minor_offset; + return e_op(src, + dst, + edge_partition_src_value_input.get(src_offset), + edge_partition_dst_value_input.get(dst_offset), + edge_partition_e_value_input.get(edge_offset + i)); + } else { + return identity_element; + } }; if constexpr (update_major) { @@ -147,22 +156,34 @@ __global__ void per_v_transform_reduce_e_hypersparse( thrust::seq, thrust::make_counting_iterator(edge_t{0}), thrust::make_counting_iterator(local_degree), - [&edge_partition, indices, &result_value_output, &transform_op] __device__(auto i) { - auto e_op_result = transform_op(i); - auto minor = indices[i]; - auto minor_offset = edge_partition.minor_offset_from_minor_nocheck(minor); - reduce_op::atomic_reduce(result_value_output, minor_offset, e_op_result); + [&edge_partition, + identity_element, + indices, + &result_value_output, + &transform_op] __device__(auto i) { + auto e_op_result = transform_op(i); + if (e_op_result != identity_element) { + auto minor = indices[i]; + auto minor_offset = edge_partition.minor_offset_from_minor_nocheck(minor); + reduce_op::atomic_reduce(result_value_output, minor_offset, e_op_result); + } }); } else { thrust::for_each( thrust::seq, thrust::make_counting_iterator(edge_t{0}), thrust::make_counting_iterator(local_degree), - [&edge_partition, indices, &result_value_output, &transform_op] __device__(auto i) { - auto e_op_result = transform_op(i); - auto minor = indices[i]; - auto minor_offset = edge_partition.minor_offset_from_minor_nocheck(minor); - reduce_op::atomic_reduce(result_value_output + minor_offset, e_op_result); + [&edge_partition, + identity_element, + indices, + &result_value_output, + &transform_op] __device__(auto i) { + auto e_op_result = transform_op(i); + if (e_op_result != identity_element) { + auto minor = indices[i]; + auto minor_offset = edge_partition.minor_offset_from_minor_nocheck(minor); + reduce_op::atomic_reduce(result_value_output + minor_offset, e_op_result); + } }); } } @@ -175,6 +196,7 @@ template edge_partition_e_mask, ResultValueOutputIteratorOrWrapper result_value_output, EdgeOp e_op, T init /* relevant only if update_major == true */, + T identity_element /* relevant only if update_major == true */, ReduceOp reduce_op) { static_assert(update_major || reduce_op::has_compatible_raft_comms_op_v< @@ -219,27 +243,33 @@ __global__ void per_v_transform_reduce_e_low_degree( &edge_partition_src_value_input, &edge_partition_dst_value_input, &edge_partition_e_value_input, + &edge_partition_e_mask, &e_op, + identity_element, major_offset, indices, edge_offset] __device__(auto i) { - auto minor = indices[i]; - auto minor_offset = edge_partition.minor_offset_from_minor_nocheck(minor); - auto src = GraphViewType::is_storage_transposed - ? minor - : edge_partition.major_from_major_offset_nocheck(major_offset); - auto dst = GraphViewType::is_storage_transposed - ? edge_partition.major_from_major_offset_nocheck(major_offset) - : minor; - auto src_offset = - GraphViewType::is_storage_transposed ? minor_offset : static_cast(major_offset); - auto dst_offset = - GraphViewType::is_storage_transposed ? static_cast(major_offset) : minor_offset; - return e_op(src, - dst, - edge_partition_src_value_input.get(src_offset), - edge_partition_dst_value_input.get(dst_offset), - edge_partition_e_value_input.get(edge_offset + i)); + if (!edge_partition_e_mask || (*edge_partition_e_mask).get(edge_offset + i)) { + auto minor = indices[i]; + auto minor_offset = edge_partition.minor_offset_from_minor_nocheck(minor); + auto src = GraphViewType::is_storage_transposed + ? minor + : edge_partition.major_from_major_offset_nocheck(major_offset); + auto dst = GraphViewType::is_storage_transposed + ? edge_partition.major_from_major_offset_nocheck(major_offset) + : minor; + auto src_offset = + GraphViewType::is_storage_transposed ? minor_offset : static_cast(major_offset); + auto dst_offset = + GraphViewType::is_storage_transposed ? static_cast(major_offset) : minor_offset; + return e_op(src, + dst, + edge_partition_src_value_input.get(src_offset), + edge_partition_dst_value_input.get(dst_offset), + edge_partition_e_value_input.get(edge_offset + i)); + } else { + return identity_element; + } }; if constexpr (update_major) { @@ -256,22 +286,34 @@ __global__ void per_v_transform_reduce_e_low_degree( thrust::seq, thrust::make_counting_iterator(edge_t{0}), thrust::make_counting_iterator(local_degree), - [&edge_partition, indices, &result_value_output, &transform_op] __device__(auto i) { - auto e_op_result = transform_op(i); - auto minor = indices[i]; - auto minor_offset = edge_partition.minor_offset_from_minor_nocheck(minor); - reduce_op::atomic_reduce(result_value_output, minor_offset, e_op_result); + [&edge_partition, + identity_element, + indices, + &result_value_output, + &transform_op] __device__(auto i) { + auto e_op_result = transform_op(i); + if (e_op_result != identity_element) { + auto minor = indices[i]; + auto minor_offset = edge_partition.minor_offset_from_minor_nocheck(minor); + reduce_op::atomic_reduce(result_value_output, minor_offset, e_op_result); + } }); } else { thrust::for_each( thrust::seq, thrust::make_counting_iterator(edge_t{0}), thrust::make_counting_iterator(local_degree), - [&edge_partition, indices, &result_value_output, &transform_op] __device__(auto i) { - auto e_op_result = transform_op(i); - auto minor = indices[i]; - auto minor_offset = edge_partition.minor_offset_from_minor_nocheck(minor); - reduce_op::atomic_reduce(result_value_output + minor_offset, e_op_result); + [&edge_partition, + identity_element, + indices, + &result_value_output, + &transform_op] __device__(auto i) { + auto e_op_result = transform_op(i); + if (e_op_result != identity_element) { + auto minor = indices[i]; + auto minor_offset = edge_partition.minor_offset_from_minor_nocheck(minor); + reduce_op::atomic_reduce(result_value_output + minor_offset, e_op_result); + } }); } } @@ -284,6 +326,7 @@ template edge_partition_e_mask, ResultValueOutputIteratorOrWrapper result_value_output, EdgeOp e_op, T init /* relevant only if update_major == true */, @@ -335,30 +379,32 @@ __global__ void per_v_transform_reduce_e_mid_degree( [[maybe_unused]] auto reduced_e_op_result = lane_id == 0 ? init : identity_element; // relevant only if update_major == true for (edge_t i = lane_id; i < local_degree; i += raft::warp_size()) { - auto minor = indices[i]; - auto minor_offset = edge_partition.minor_offset_from_minor_nocheck(minor); - auto src = GraphViewType::is_storage_transposed - ? minor - : edge_partition.major_from_major_offset_nocheck(major_offset); - auto dst = GraphViewType::is_storage_transposed - ? edge_partition.major_from_major_offset_nocheck(major_offset) - : minor; - auto src_offset = - GraphViewType::is_storage_transposed ? minor_offset : static_cast(major_offset); - auto dst_offset = - GraphViewType::is_storage_transposed ? static_cast(major_offset) : minor_offset; - auto e_op_result = e_op(src, - dst, - edge_partition_src_value_input.get(src_offset), - edge_partition_dst_value_input.get(dst_offset), - edge_partition_e_value_input.get(edge_offset + i)); - if constexpr (update_major) { - reduced_e_op_result = reduce_op(reduced_e_op_result, e_op_result); - } else { - if constexpr (GraphViewType::is_multi_gpu) { - reduce_op::atomic_reduce(result_value_output, minor_offset, e_op_result); + if (!edge_partition_e_mask || (*edge_partition_e_mask).get(edge_offset + i)) { + auto minor = indices[i]; + auto minor_offset = edge_partition.minor_offset_from_minor_nocheck(minor); + auto src = GraphViewType::is_storage_transposed + ? minor + : edge_partition.major_from_major_offset_nocheck(major_offset); + auto dst = GraphViewType::is_storage_transposed + ? edge_partition.major_from_major_offset_nocheck(major_offset) + : minor; + auto src_offset = + GraphViewType::is_storage_transposed ? minor_offset : static_cast(major_offset); + auto dst_offset = + GraphViewType::is_storage_transposed ? static_cast(major_offset) : minor_offset; + auto e_op_result = e_op(src, + dst, + edge_partition_src_value_input.get(src_offset), + edge_partition_dst_value_input.get(dst_offset), + edge_partition_e_value_input.get(edge_offset + i)); + if constexpr (update_major) { + reduced_e_op_result = reduce_op(reduced_e_op_result, e_op_result); } else { - reduce_op::atomic_reduce(result_value_output + minor_offset, e_op_result); + if constexpr (GraphViewType::is_multi_gpu) { + reduce_op::atomic_reduce(result_value_output, minor_offset, e_op_result); + } else { + reduce_op::atomic_reduce(result_value_output + minor_offset, e_op_result); + } } } } @@ -377,6 +423,7 @@ template edge_partition_e_mask, ResultValueOutputIteratorOrWrapper result_value_output, EdgeOp e_op, T init /* relevant only if update_major == true */, @@ -424,30 +472,32 @@ __global__ void per_v_transform_reduce_e_high_degree( [[maybe_unused]] auto reduced_e_op_result = threadIdx.x == 0 ? init : identity_element; // relevant only if update_major == true for (edge_t i = threadIdx.x; i < local_degree; i += blockDim.x) { - auto minor = indices[i]; - auto minor_offset = edge_partition.minor_offset_from_minor_nocheck(minor); - auto src = GraphViewType::is_storage_transposed - ? minor - : edge_partition.major_from_major_offset_nocheck(major_offset); - auto dst = GraphViewType::is_storage_transposed - ? edge_partition.major_from_major_offset_nocheck(major_offset) - : minor; - auto src_offset = - GraphViewType::is_storage_transposed ? minor_offset : static_cast(major_offset); - auto dst_offset = - GraphViewType::is_storage_transposed ? static_cast(major_offset) : minor_offset; - auto e_op_result = e_op(src, - dst, - edge_partition_src_value_input.get(src_offset), - edge_partition_dst_value_input.get(dst_offset), - edge_partition_e_value_input.get(edge_offset + i)); - if constexpr (update_major) { - reduced_e_op_result = reduce_op(reduced_e_op_result, e_op_result); - } else { - if constexpr (GraphViewType::is_multi_gpu) { - reduce_op::atomic_reduce(result_value_output, minor_offset, e_op_result); + if (!edge_partition_e_mask || (*edge_partition_e_mask).get(edge_offset + i)) { + auto minor = indices[i]; + auto minor_offset = edge_partition.minor_offset_from_minor_nocheck(minor); + auto src = GraphViewType::is_storage_transposed + ? minor + : edge_partition.major_from_major_offset_nocheck(major_offset); + auto dst = GraphViewType::is_storage_transposed + ? edge_partition.major_from_major_offset_nocheck(major_offset) + : minor; + auto src_offset = + GraphViewType::is_storage_transposed ? minor_offset : static_cast(major_offset); + auto dst_offset = + GraphViewType::is_storage_transposed ? static_cast(major_offset) : minor_offset; + auto e_op_result = e_op(src, + dst, + edge_partition_src_value_input.get(src_offset), + edge_partition_dst_value_input.get(dst_offset), + edge_partition_e_value_input.get(edge_offset + i)); + if constexpr (update_major) { + reduced_e_op_result = reduce_op(reduced_e_op_result, e_op_result); } else { - reduce_op::atomic_reduce(result_value_output + minor_offset, e_op_result); + if constexpr (GraphViewType::is_multi_gpu) { + reduce_op::atomic_reduce(result_value_output, minor_offset, e_op_result); + } else { + reduce_op::atomic_reduce(result_value_output + minor_offset, e_op_result); + } } } } @@ -656,10 +706,18 @@ void per_v_transform_reduce_e(raft::handle_t const& handle, if (stream_pool_indices) { handle.sync_stream(); } + auto edge_mask_view = graph_view.edge_mask_view(); + for (size_t i = 0; i < graph_view.number_of_local_edge_partitions(); ++i) { auto edge_partition = edge_partition_device_view_t( graph_view.local_edge_partition_view(i)); + auto edge_partition_e_mask = + edge_mask_view + ? thrust::make_optional< + detail::edge_partition_edge_property_device_view_t>( + *edge_mask_view, i) + : thrust::nullopt; auto major_init = ReduceOp::identity_element; if constexpr (update_major) { @@ -737,9 +795,11 @@ void per_v_transform_reduce_e(raft::handle_t const& handle, edge_partition_src_value_input, edge_partition_dst_value_input, edge_partition_e_value_input, + edge_partition_e_mask, segment_output_buffer, e_op, major_init, + ReduceOp::identity_element, reduce_op); } } @@ -761,9 +821,11 @@ void per_v_transform_reduce_e(raft::handle_t const& handle, edge_partition_src_value_input, edge_partition_dst_value_input, edge_partition_e_value_input, + edge_partition_e_mask, segment_output_buffer, e_op, major_init, + ReduceOp::identity_element, reduce_op); } if ((*segment_offsets)[2] - (*segment_offsets)[1] > 0) { @@ -784,6 +846,7 @@ void per_v_transform_reduce_e(raft::handle_t const& handle, edge_partition_src_value_input, edge_partition_dst_value_input, edge_partition_e_value_input, + edge_partition_e_mask, segment_output_buffer, e_op, major_init, @@ -806,6 +869,7 @@ void per_v_transform_reduce_e(raft::handle_t const& handle, edge_partition_src_value_input, edge_partition_dst_value_input, edge_partition_e_value_input, + edge_partition_e_mask, output_buffer, e_op, major_init, @@ -825,9 +889,11 @@ void per_v_transform_reduce_e(raft::handle_t const& handle, edge_partition_src_value_input, edge_partition_dst_value_input, edge_partition_e_value_input, + edge_partition_e_mask, output_buffer, e_op, major_init, + ReduceOp::identity_element, reduce_op); } } @@ -1056,8 +1122,6 @@ void per_v_transform_reduce_incoming_e(raft::handle_t const& handle, VertexValueOutputIterator vertex_value_output_first, bool do_expensive_check = false) { - CUGRAPH_EXPECTS(!graph_view.has_edge_mask(), "unimplemented."); - if (do_expensive_check) { // currently, nothing to do } @@ -1137,8 +1201,6 @@ void per_v_transform_reduce_outgoing_e(raft::handle_t const& handle, VertexValueOutputIterator vertex_value_output_first, bool do_expensive_check = false) { - CUGRAPH_EXPECTS(!graph_view.has_edge_mask(), "unimplemented."); - if (do_expensive_check) { // currently, nothing to do }