diff --git a/cpp/src/prims/per_v_transform_reduce_incoming_outgoing_e.cuh b/cpp/src/prims/per_v_transform_reduce_incoming_outgoing_e.cuh
index cbd1662b37f..3febc2876cc 100644
--- a/cpp/src/prims/per_v_transform_reduce_incoming_outgoing_e.cuh
+++ b/cpp/src/prims/per_v_transform_reduce_incoming_outgoing_e.cuh
@@ -63,6 +63,107 @@ namespace detail {
 
 int32_t constexpr per_v_transform_reduce_e_kernel_block_size = 512;
 
+template <typename GraphViewType,
+          typename EdgePartitionSrcValueInputWrapper,
+          typename EdgePartitionDstValueInputWrapper,
+          typename EdgePartitionEdgeValueInputWrapper,
+          typename EdgeOp>
+struct per_v_transform_reduce_call_e_op_t {
+  edge_partition_device_view_t<typename GraphViewType::vertex_type,
+                               typename GraphViewType::edge_type,
+                               GraphViewType::is_multi_gpu>
+    edge_partition{};
+  EdgePartitionSrcValueInputWrapper edge_partition_src_value_input{};
+  EdgePartitionDstValueInputWrapper edge_partition_dst_value_input{};
+  EdgePartitionEdgeValueInputWrapper edge_partition_e_value_input{};
+  EdgeOp e_op{};
+  typename GraphViewType::vertex_type major{};
+  typename GraphViewType::vertex_type major_offset{};
+  typename GraphViewType::vertex_type const* indices{nullptr};
+  typename GraphViewType::edge_type edge_offset{};
+
+  __device__ auto operator()(typename GraphViewType::edge_type i) const
+  {
+    auto minor        = indices[i];
+    auto minor_offset = edge_partition.minor_offset_from_minor_nocheck(minor);
+    auto src          = GraphViewType::is_storage_transposed ? minor : major;
+    auto dst          = GraphViewType::is_storage_transposed ? major : minor;
+    auto src_offset   = GraphViewType::is_storage_transposed ? minor_offset : major_offset;
+    auto dst_offset   = GraphViewType::is_storage_transposed ? major_offset : minor_offset;
+    return e_op(src,
+                dst,
+                edge_partition_src_value_input.get(src_offset),
+                edge_partition_dst_value_input.get(dst_offset),
+                edge_partition_e_value_input.get(edge_offset + i));
+  }
+};
+
+template <bool update_major,
+          typename vertex_t,
+          typename edge_t,
+          bool multi_gpu,
+          typename result_t,
+          typename TransformOp,
+          typename ReduceOp,
+          typename ResultValueOutputIteratorOrWrapper>
+__device__ void update_result_value_output(
+  edge_partition_device_view_t<vertex_t, edge_t, multi_gpu> edge_partition,
+  vertex_t const* indices,
+  edge_t local_degree,
+  TransformOp transform_op,
+  result_t init,
+  ReduceOp reduce_op,
+  size_t output_idx /* relevant only when update_major == true */,
+  result_t identity_element,
+  ResultValueOutputIteratorOrWrapper result_value_output)
+{
+  if constexpr (update_major) {
+    *(result_value_output + output_idx) =
+      thrust::transform_reduce(thrust::seq,
+                               thrust::make_counting_iterator(edge_t{0}),
+                               thrust::make_counting_iterator(local_degree),
+                               transform_op,
+                               init,
+                               reduce_op);
+  } else {
+    if constexpr (multi_gpu) {
+      thrust::for_each(
+        thrust::seq,
+        thrust::make_counting_iterator(edge_t{0}),
+        thrust::make_counting_iterator(local_degree),
+        [&edge_partition,
+         identity_element,
+         indices,
+         &result_value_output,
+         &transform_op] __device__(auto i) {
+          auto e_op_result = transform_op(i);
+          if (e_op_result != identity_element) {
+            auto minor        = indices[i];
+            auto minor_offset = edge_partition.minor_offset_from_minor_nocheck(minor);
+            reduce_op::atomic_reduce<ReduceOp>(result_value_output, minor_offset, e_op_result);
+          }
+        });
+    } else {
+      thrust::for_each(
+        thrust::seq,
+        thrust::make_counting_iterator(edge_t{0}),
+        thrust::make_counting_iterator(local_degree),
+        [&edge_partition,
+         identity_element,
+         indices,
+         &result_value_output,
+         &transform_op] __device__(auto i) {
+          auto e_op_result = transform_op(i);
+          if (e_op_result != identity_element) {
+            auto minor        = indices[i];
+            auto minor_offset = edge_partition.minor_offset_from_minor_nocheck(minor);
+            reduce_op::atomic_reduce<ReduceOp>(result_value_output + minor_offset, e_op_result);
+          }
+        });
+    }
+  }
+}
+
 template <bool update_major,
           typename GraphViewType,
           typename EdgePartitionSrcValueInputWrapper,
@@ -107,6 +208,7 @@ __global__ void per_v_transform_reduce_e_hypersparse(
   while (idx < static_cast<size_t>(dcs_nzd_vertex_count)) {
     auto major =
       *(edge_partition.major_from_major_hypersparse_idx_nocheck(static_cast<vertex_t>(idx)));
+    auto major_offset = edge_partition.major_offset_from_major_nocheck(major);
     auto major_idx =
       major_start_offset + idx;  // major_offset != major_idx in the hypersparse region
     vertex_t const* indices{nullptr};
@@ -114,78 +216,52 @@ __global__ void per_v_transform_reduce_e_hypersparse(
     edge_t local_degree{};
     thrust::tie(indices, edge_offset, local_degree) =
       edge_partition.local_edges(static_cast<vertex_t>(major_idx));
-    auto transform_op = [&edge_partition,
-                         &edge_partition_src_value_input,
-                         &edge_partition_dst_value_input,
-                         &edge_partition_e_value_input,
-                         &edge_partition_e_mask,
-                         &e_op,
-                         identity_element,
-                         major,
-                         indices,
-                         edge_offset] __device__(auto i) {
-      if (!edge_partition_e_mask || (*edge_partition_e_mask).get(edge_offset + i)) {
-        auto major_offset = edge_partition.major_offset_from_major_nocheck(major);
-        auto minor        = indices[i];
-        auto minor_offset = edge_partition.minor_offset_from_minor_nocheck(minor);
-        auto src          = GraphViewType::is_storage_transposed ? minor : major;
-        auto dst          = GraphViewType::is_storage_transposed ? major : minor;
-        auto src_offset   = GraphViewType::is_storage_transposed ? minor_offset : major_offset;
-        auto dst_offset   = GraphViewType::is_storage_transposed ? major_offset : minor_offset;
-        return e_op(src,
-                    dst,
-                    edge_partition_src_value_input.get(src_offset),
-                    edge_partition_dst_value_input.get(dst_offset),
-                    edge_partition_e_value_input.get(edge_offset + i));
-      } else {
-        return identity_element;
-      }
-    };
-    if constexpr (update_major) {
-      *(result_value_output + (major - *(edge_partition.major_hypersparse_first()))) =
-        thrust::transform_reduce(thrust::seq,
-                                 thrust::make_counting_iterator(edge_t{0}),
-                                 thrust::make_counting_iterator(local_degree),
-                                 transform_op,
-                                 init,
-                                 reduce_op);
+    auto call_e_op = per_v_transform_reduce_call_e_op_t<GraphViewType,
+                                                        EdgePartitionSrcValueInputWrapper,
+                                                        EdgePartitionDstValueInputWrapper,
+                                                        EdgePartitionEdgeValueInputWrapper,
+                                                        EdgeOp>{edge_partition,
+                                                                edge_partition_src_value_input,
+                                                                edge_partition_dst_value_input,
+                                                                edge_partition_e_value_input,
+                                                                e_op,
+                                                                major,
+                                                                major_offset,
+                                                                indices,
+                                                                edge_offset};
+
+    if (edge_partition_e_mask) {
+      auto transform_op =
+        [&edge_partition_e_mask, &call_e_op, identity_element, edge_offset] __device__(auto i) {
+          if ((*edge_partition_e_mask).get(edge_offset + i)) {
+            return call_e_op(i);
+          } else {
+            return identity_element;
+          }
+        };
+
+      update_result_value_output<update_major>(
+        edge_partition,
+        indices,
+        local_degree,
+        transform_op,
+        init,
+        reduce_op,
+        major - *(edge_partition.major_hypersparse_first()),
+        identity_element,
+        result_value_output);
     } else {
-      if constexpr (GraphViewType::is_multi_gpu) {
-        thrust::for_each(
-          thrust::seq,
-          thrust::make_counting_iterator(edge_t{0}),
-          thrust::make_counting_iterator(local_degree),
-          [&edge_partition,
-           identity_element,
-           indices,
-           &result_value_output,
-           &transform_op] __device__(auto i) {
-            auto e_op_result = transform_op(i);
-            if (e_op_result != identity_element) {
-              auto minor        = indices[i];
-              auto minor_offset = edge_partition.minor_offset_from_minor_nocheck(minor);
-              reduce_op::atomic_reduce<ReduceOp>(result_value_output, minor_offset, e_op_result);
-            }
-          });
-      } else {
-        thrust::for_each(
-          thrust::seq,
-          thrust::make_counting_iterator(edge_t{0}),
-          thrust::make_counting_iterator(local_degree),
-          [&edge_partition,
-           identity_element,
-           indices,
-           &result_value_output,
-           &transform_op] __device__(auto i) {
-            auto e_op_result = transform_op(i);
-            if (e_op_result != identity_element) {
-              auto minor        = indices[i];
-              auto minor_offset = edge_partition.minor_offset_from_minor_nocheck(minor);
-              reduce_op::atomic_reduce<ReduceOp>(result_value_output + minor_offset, e_op_result);
-            }
-          });
-      }
+      auto transform_op = [&call_e_op] __device__(auto i) { return call_e_op(i); };
+
+      update_result_value_output<update_major>(
+        edge_partition,
+        indices,
+        local_degree,
+        transform_op,
+        init,
+        reduce_op,
+        major - *(edge_partition.major_hypersparse_first()),
+        identity_element,
+        result_value_output);
     }
     idx += gridDim.x * blockDim.x;
   }
@@ -233,89 +309,59 @@ __global__ void per_v_transform_reduce_e_low_degree(
   auto idx = static_cast<size_t>(tid);
 
   while (idx < static_cast<size_t>(major_range_last - major_range_first)) {
-    auto major_offset = major_start_offset + idx;
+    auto major_offset = static_cast<vertex_t>(major_start_offset + idx);
+    auto major        = edge_partition.major_from_major_offset_nocheck(major_offset);
     vertex_t const* indices{nullptr};
     edge_t edge_offset{};
     edge_t local_degree{};
     thrust::tie(indices, edge_offset, local_degree) =
       edge_partition.local_edges(static_cast<vertex_t>(major_offset));
-    auto transform_op = [&edge_partition,
-                         &edge_partition_src_value_input,
-                         &edge_partition_dst_value_input,
-                         &edge_partition_e_value_input,
-                         &edge_partition_e_mask,
-                         &e_op,
-                         identity_element,
-                         major_offset,
-                         indices,
-                         edge_offset] __device__(auto i) {
-      if (!edge_partition_e_mask || (*edge_partition_e_mask).get(edge_offset + i)) {
-        auto minor        = indices[i];
-        auto minor_offset = edge_partition.minor_offset_from_minor_nocheck(minor);
-        auto src          = GraphViewType::is_storage_transposed
-                              ? minor
-                              : edge_partition.major_from_major_offset_nocheck(major_offset);
-        auto dst          = GraphViewType::is_storage_transposed
-                              ? edge_partition.major_from_major_offset_nocheck(major_offset)
-                              : minor;
-        auto src_offset =
-          GraphViewType::is_storage_transposed ? minor_offset : static_cast<vertex_t>(major_offset);
-        auto dst_offset =
-          GraphViewType::is_storage_transposed ? static_cast<vertex_t>(major_offset) : minor_offset;
-        return e_op(src,
-                    dst,
-                    edge_partition_src_value_input.get(src_offset),
-                    edge_partition_dst_value_input.get(dst_offset),
-                    edge_partition_e_value_input.get(edge_offset + i));
-      } else {
-        return identity_element;
-      }
-    };
-    if constexpr (update_major) {
-      *(result_value_output + idx) =
-        thrust::transform_reduce(thrust::seq,
-                                 thrust::make_counting_iterator(edge_t{0}),
-                                 thrust::make_counting_iterator(local_degree),
-                                 transform_op,
-                                 init,
-                                 reduce_op);
+    auto call_e_op = per_v_transform_reduce_call_e_op_t<GraphViewType,
+                                                        EdgePartitionSrcValueInputWrapper,
+                                                        EdgePartitionDstValueInputWrapper,
+                                                        EdgePartitionEdgeValueInputWrapper,
+                                                        EdgeOp>{edge_partition,
+                                                                edge_partition_src_value_input,
+                                                                edge_partition_dst_value_input,
+                                                                edge_partition_e_value_input,
+                                                                e_op,
+                                                                major,
+                                                                major_offset,
+                                                                indices,
+                                                                edge_offset};
+
+    if (edge_partition_e_mask) {
+      auto transform_op =
+        [&edge_partition_e_mask, &call_e_op, identity_element, edge_offset] __device__(auto i) {
+          if ((*edge_partition_e_mask).get(edge_offset + i)) {
+            return call_e_op(i);
+          } else {
+            return identity_element;
+          }
+        };
+
+      update_result_value_output<update_major>(edge_partition,
+                                               indices,
+                                               local_degree,
+                                               transform_op,
+                                               init,
+                                               reduce_op,
+                                               idx,
+                                               identity_element,
+                                               result_value_output);
     } else {
-      if constexpr (GraphViewType::is_multi_gpu) {
-        thrust::for_each(
-          thrust::seq,
-          thrust::make_counting_iterator(edge_t{0}),
-          thrust::make_counting_iterator(local_degree),
-          [&edge_partition,
-           identity_element,
-           indices,
-           &result_value_output,
-           &transform_op] __device__(auto i) {
-            auto e_op_result = transform_op(i);
-            if (e_op_result != identity_element) {
-              auto minor        = indices[i];
-              auto minor_offset = edge_partition.minor_offset_from_minor_nocheck(minor);
-              reduce_op::atomic_reduce<ReduceOp>(result_value_output, minor_offset, e_op_result);
-            }
-          });
-      } else {
-        thrust::for_each(
-          thrust::seq,
-          thrust::make_counting_iterator(edge_t{0}),
-          thrust::make_counting_iterator(local_degree),
-          [&edge_partition,
-           identity_element,
-           indices,
-           &result_value_output,
-           &transform_op] __device__(auto i) {
-            auto e_op_result = transform_op(i);
-            if (e_op_result != identity_element) {
-              auto minor        = indices[i];
-              auto minor_offset = edge_partition.minor_offset_from_minor_nocheck(minor);
-              reduce_op::atomic_reduce<ReduceOp>(result_value_output + minor_offset, e_op_result);
-            }
-          });
-      }
+      auto transform_op = [&call_e_op] __device__(auto i) { return call_e_op(i); };
+
+      update_result_value_output<update_major>(edge_partition,
+                                               indices,
+                                               local_degree,
+                                               transform_op,
+                                               init,
+                                               reduce_op,
+                                               idx,
+                                               identity_element,
+                                               result_value_output);
     }
     idx += gridDim.x * blockDim.x;
   }
@@ -371,35 +417,52 @@ __global__ void per_v_transform_reduce_e_mid_degree(
     raft::warp_size()];  // relevant only if update_major == true
 
   while (idx < static_cast<size_t>(major_range_last - major_range_first)) {
-    auto major_offset = major_start_offset + idx;
+    auto major_offset = static_cast<vertex_t>(major_start_offset + idx);
+    auto major        = edge_partition.major_from_major_offset_nocheck(major_offset);
     vertex_t const* indices{nullptr};
     edge_t edge_offset{};
     edge_t local_degree{};
     thrust::tie(indices, edge_offset, local_degree) = edge_partition.local_edges(major_offset);
+
+    auto call_e_op = per_v_transform_reduce_call_e_op_t<GraphViewType,
+                                                        EdgePartitionSrcValueInputWrapper,
+                                                        EdgePartitionDstValueInputWrapper,
+                                                        EdgePartitionEdgeValueInputWrapper,
+                                                        EdgeOp>{edge_partition,
+                                                                edge_partition_src_value_input,
+                                                                edge_partition_dst_value_input,
+                                                                edge_partition_e_value_input,
+                                                                e_op,
+                                                                major,
+                                                                major_offset,
+                                                                indices,
+                                                                edge_offset};
+
     [[maybe_unused]] auto reduced_e_op_result =
       lane_id == 0 ? init : identity_element;  // relevant only if update_major == true
-    for (edge_t i = lane_id; i < local_degree; i += raft::warp_size()) {
-      if (!edge_partition_e_mask || (*edge_partition_e_mask).get(edge_offset + i)) {
-        auto minor        = indices[i];
-        auto minor_offset = edge_partition.minor_offset_from_minor_nocheck(minor);
-        auto src          = GraphViewType::is_storage_transposed
-                              ? minor
-                              : edge_partition.major_from_major_offset_nocheck(major_offset);
-        auto dst          = GraphViewType::is_storage_transposed
-                              ? edge_partition.major_from_major_offset_nocheck(major_offset)
-                              : minor;
-        auto src_offset =
-          GraphViewType::is_storage_transposed ? minor_offset : static_cast<vertex_t>(major_offset);
-        auto dst_offset =
-          GraphViewType::is_storage_transposed ? static_cast<vertex_t>(major_offset) : minor_offset;
-        auto e_op_result = e_op(src,
-                                dst,
-                                edge_partition_src_value_input.get(src_offset),
-                                edge_partition_dst_value_input.get(dst_offset),
-                                edge_partition_e_value_input.get(edge_offset + i));
+    if (edge_partition_e_mask) {
+      for (edge_t i = lane_id; i < local_degree; i += raft::warp_size()) {
+        if ((*edge_partition_e_mask).get(edge_offset + i)) {
+          auto e_op_result = call_e_op(i);
+          if constexpr (update_major) {
+            reduced_e_op_result = reduce_op(reduced_e_op_result, e_op_result);
+          } else {
+            auto minor_offset = edge_partition.minor_offset_from_minor_nocheck(indices[i]);
+            if constexpr (GraphViewType::is_multi_gpu) {
+              reduce_op::atomic_reduce<ReduceOp>(result_value_output, minor_offset, e_op_result);
+            } else {
+              reduce_op::atomic_reduce<ReduceOp>(result_value_output + minor_offset, e_op_result);
+            }
+          }
+        }
+      }
+    } else {
+      for (edge_t i = lane_id; i < local_degree; i += raft::warp_size()) {
+        auto e_op_result = call_e_op(i);
         if constexpr (update_major) {
           reduced_e_op_result = reduce_op(reduced_e_op_result, e_op_result);
         } else {
+          auto minor_offset = edge_partition.minor_offset_from_minor_nocheck(indices[i]);
           if constexpr (GraphViewType::is_multi_gpu) {
             reduce_op::atomic_reduce<ReduceOp>(result_value_output, minor_offset, e_op_result);
           } else {
@@ -408,6 +471,7 @@ __global__ void per_v_transform_reduce_e_mid_degree(
         }
       }
     }
+
     if constexpr (update_major) {
       reduced_e_op_result = WarpReduce(temp_storage[threadIdx.x / raft::warp_size()])
                               .Reduce(reduced_e_op_result, reduce_op);
@@ -464,35 +528,52 @@ __global__ void per_v_transform_reduce_e_high_degree(
     typename BlockReduce::TempStorage temp_storage;  // relevant only if update_major == true
 
   while (idx < static_cast<size_t>(major_range_last - major_range_first)) {
-    auto major_offset = major_start_offset + idx;
+    auto major_offset = static_cast<vertex_t>(major_start_offset + idx);
+    auto major        = edge_partition.major_from_major_offset_nocheck(major_offset);
     vertex_t const* indices{nullptr};
     edge_t edge_offset{};
    edge_t local_degree{};
     thrust::tie(indices, edge_offset, local_degree) = edge_partition.local_edges(major_offset);
+
+    auto call_e_op = per_v_transform_reduce_call_e_op_t<GraphViewType,
+                                                        EdgePartitionSrcValueInputWrapper,
+                                                        EdgePartitionDstValueInputWrapper,
+                                                        EdgePartitionEdgeValueInputWrapper,
+                                                        EdgeOp>{edge_partition,
+                                                                edge_partition_src_value_input,
+                                                                edge_partition_dst_value_input,
+                                                                edge_partition_e_value_input,
+                                                                e_op,
+                                                                major,
+                                                                major_offset,
+                                                                indices,
+                                                                edge_offset};
+
    [[maybe_unused]] auto reduced_e_op_result =
       threadIdx.x == 0 ? init : identity_element;  // relevant only if update_major == true
-    for (edge_t i = threadIdx.x; i < local_degree; i += blockDim.x) {
-      if (!edge_partition_e_mask || (*edge_partition_e_mask).get(edge_offset + i)) {
-        auto minor        = indices[i];
-        auto minor_offset = edge_partition.minor_offset_from_minor_nocheck(minor);
-        auto src          = GraphViewType::is_storage_transposed
-                              ? minor
-                              : edge_partition.major_from_major_offset_nocheck(major_offset);
-        auto dst          = GraphViewType::is_storage_transposed
-                              ? edge_partition.major_from_major_offset_nocheck(major_offset)
-                              : minor;
-        auto src_offset =
-          GraphViewType::is_storage_transposed ? minor_offset : static_cast<vertex_t>(major_offset);
-        auto dst_offset =
-          GraphViewType::is_storage_transposed ? static_cast<vertex_t>(major_offset) : minor_offset;
-        auto e_op_result = e_op(src,
-                                dst,
-                                edge_partition_src_value_input.get(src_offset),
-                                edge_partition_dst_value_input.get(dst_offset),
-                                edge_partition_e_value_input.get(edge_offset + i));
+    if (edge_partition_e_mask) {
+      for (edge_t i = threadIdx.x; i < local_degree; i += blockDim.x) {
+        if ((*edge_partition_e_mask).get(edge_offset + i)) {
+          auto e_op_result = call_e_op(i);
+          if constexpr (update_major) {
+            reduced_e_op_result = reduce_op(reduced_e_op_result, e_op_result);
+          } else {
+            auto minor_offset = edge_partition.minor_offset_from_minor_nocheck(indices[i]);
+            if constexpr (GraphViewType::is_multi_gpu) {
+              reduce_op::atomic_reduce<ReduceOp>(result_value_output, minor_offset, e_op_result);
+            } else {
+              reduce_op::atomic_reduce<ReduceOp>(result_value_output + minor_offset, e_op_result);
+            }
+          }
+        }
+      }
+    } else {
+      for (edge_t i = threadIdx.x; i < local_degree; i += blockDim.x) {
+        auto e_op_result = call_e_op(i);
         if constexpr (update_major) {
          reduced_e_op_result = reduce_op(reduced_e_op_result, e_op_result);
         } else {
+          auto minor_offset = edge_partition.minor_offset_from_minor_nocheck(indices[i]);
           if constexpr (GraphViewType::is_multi_gpu) {
             reduce_op::atomic_reduce<ReduceOp>(result_value_output, minor_offset, e_op_result);
           } else {
@@ -501,6 +582,7 @@ __global__ void per_v_transform_reduce_e_high_degree(
         }
       }
     }
+
     if constexpr (update_major) {
       reduced_e_op_result = BlockReduce(temp_storage).Reduce(reduced_e_op_result, reduce_op);
       if (threadIdx.x == 0) { *(result_value_output + idx) = reduced_e_op_result; }
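
Reviewer note on the refactoring above (not part of the patch): instead of re-testing the optional edge mask on every edge inside the hot per-edge loop, each kernel now builds one per_v_transform_reduce_call_e_op_t functor per major vertex and branches once on edge_partition_e_mask, instantiating either a masked or an unmasked transform_op. The sketch below shows the same hoisting pattern on the host in plain C++; call_e_op_t, reduce_edges, and per_v_reduce are hypothetical stand-ins, and a std::vector<bool> indexed by the local edge index stands in for cuGraph's packed-bit edge mask (the edge_offset indexing of the real mask is elided).

#include <cstdint>
#include <iostream>
#include <optional>
#include <vector>

// Stand-in for per_v_transform_reduce_call_e_op_t: captures everything the
// per-edge operator needs, so each loop variant only has to call call_e_op(i).
struct call_e_op_t {
  int32_t const* indices{nullptr};  // minor endpoints of this major's local edges
  int32_t major{};

  // toy e_op: combine the two endpoint IDs
  int64_t operator()(int32_t i) const { return int64_t{major} + int64_t{indices[i]}; }
};

// Stand-in for the update_major path of update_result_value_output: a
// sequential transform-reduce over the local edge list.
template <typename TransformOp>
int64_t reduce_edges(int32_t local_degree, TransformOp transform_op, int64_t init)
{
  auto result = init;
  for (int32_t i = 0; i < local_degree; ++i) {
    result += transform_op(i);
  }
  return result;
}

// The hoisting pattern the diff applies: test the optional mask once per
// vertex and pick a masked or an unmasked transform_op, rather than testing
// the mask again for every edge inside the loop.
int64_t per_v_reduce(std::vector<int32_t> const& indices,
                     int32_t major,
                     std::optional<std::vector<bool>> const& e_mask)
{
  call_e_op_t call_e_op{indices.data(), major};
  auto local_degree = static_cast<int32_t>(indices.size());
  int64_t const identity_element{0};

  if (e_mask) {  // masked variant: skipped edges contribute the identity element
    auto transform_op = [&](int32_t i) { return (*e_mask)[i] ? call_e_op(i) : identity_element; };
    return reduce_edges(local_degree, transform_op, identity_element);
  } else {  // unmasked variant: no mask check in the loop body at all
    auto transform_op = [&](int32_t i) { return call_e_op(i); };
    return reduce_edges(local_degree, transform_op, identity_element);
  }
}

int main()
{
  std::vector<int32_t> indices{1, 2, 3};
  std::cout << per_v_reduce(indices, 10, std::nullopt) << '\n';                          // 36
  std::cout << per_v_reduce(indices, 10, std::vector<bool>{true, false, true}) << '\n';  // 24
}

On the device, the same split is what lets the no-mask case compile to an inner loop with no mask check, and sharing call_e_op keeps the four kernel variants (hypersparse, low, mid, and high degree) from duplicating the src/dst selection logic.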