diff --git a/cpp/src/prims/transform_reduce_dst_nbr_intersection_of_e_endpoints_by_v.cuh b/cpp/src/prims/transform_reduce_dst_nbr_intersection_of_e_endpoints_by_v.cuh index bcf7606c423..f06fdf513bf 100644 --- a/cpp/src/prims/transform_reduce_dst_nbr_intersection_of_e_endpoints_by_v.cuh +++ b/cpp/src/prims/transform_reduce_dst_nbr_intersection_of_e_endpoints_by_v.cuh @@ -259,8 +259,6 @@ void transform_reduce_dst_nbr_intersection_of_e_endpoints_by_v( typename EdgeDstValueInputWrapper::value_iterator, typename EdgeDstValueInputWrapper::value_type>>; - CUGRAPH_EXPECTS(!graph_view.has_edge_mask(), "unimplemented."); - if (do_expensive_check) { // currently, nothing to do. } @@ -271,6 +269,7 @@ void transform_reduce_dst_nbr_intersection_of_e_endpoints_by_v( init); auto edge_mask_view = graph_view.edge_mask_view(); + for (size_t i = 0; i < graph_view.number_of_local_edge_partitions(); ++i) { auto edge_partition = edge_partition_device_view_t( diff --git a/cpp/src/prims/transform_reduce_e_by_src_dst_key.cuh b/cpp/src/prims/transform_reduce_e_by_src_dst_key.cuh index 77bf195b4d7..e2a5bf45714 100644 --- a/cpp/src/prims/transform_reduce_e_by_src_dst_key.cuh +++ b/cpp/src/prims/transform_reduce_e_by_src_dst_key.cuh @@ -95,6 +95,7 @@ template __global__ void transform_reduce_by_src_dst_key_hypersparse( @@ -105,6 +106,9 @@ __global__ void transform_reduce_by_src_dst_key_hypersparse( EdgePartitionDstValueInputWrapper edge_partition_dst_value_input, EdgePartitionEdgeValueInputWrapper edge_partition_e_value_input, EdgePartitionSrcDstKeyInputWrapper edge_partition_src_dst_key_input, + EdgePartitionEdgeMaskWrapper edge_partition_e_mask, + thrust::optional> + edge_offsets_with_mask, EdgeOp e_op, typename GraphViewType::vertex_type* keys, ValueIterator value_iter) @@ -129,19 +133,42 @@ __global__ void transform_reduce_by_src_dst_key_hypersparse( edge_t local_degree{}; thrust::tie(indices, edge_offset, local_degree) = edge_partition.local_edges(static_cast(major_idx)); - auto local_offset = edge_partition.local_offset(major_idx); - for (edge_t i = 0; i < local_degree; ++i) { - update_buffer_element(edge_partition, - major, - indices[i], - edge_offset + i, - edge_partition_src_value_input, - edge_partition_dst_value_input, - edge_partition_e_value_input, - edge_partition_src_dst_key_input, - e_op, - keys + local_offset + i, - value_iter + local_offset + i); + if (edge_partition_e_mask) { + auto major_offset = edge_partition.major_offset_from_major_nocheck(major); + auto edge_offset_with_mask = (*edge_offsets_with_mask)[major_offset]; + edge_t counter{0}; + for (edge_t i = 0; i < local_degree; ++i) { + if ((*edge_partition_e_mask).get(edge_offset + i)) { + update_buffer_element( + edge_partition, + major, + indices[i], + edge_offset + i, + edge_partition_src_value_input, + edge_partition_dst_value_input, + edge_partition_e_value_input, + edge_partition_src_dst_key_input, + e_op, + keys + edge_offset_with_mask + counter, + value_iter + edge_offset_with_mask + counter); + ++counter; + } + } + } else { + for (edge_t i = 0; i < local_degree; ++i) { + update_buffer_element( + edge_partition, + major, + indices[i], + edge_offset + i, + edge_partition_src_value_input, + edge_partition_dst_value_input, + edge_partition_e_value_input, + edge_partition_src_dst_key_input, + e_op, + keys + edge_offset + i, + value_iter + edge_offset + i); + } } idx += gridDim.x * blockDim.x; @@ -154,6 +181,7 @@ template __global__ void transform_reduce_by_src_dst_key_low_degree( @@ -166,6 +194,9 @@ __global__ void transform_reduce_by_src_dst_key_low_degree( EdgePartitionDstValueInputWrapper edge_partition_dst_value_input, EdgePartitionEdgeValueInputWrapper edge_partition_e_value_input, EdgePartitionSrcDstKeyInputWrapper edge_partition_src_dst_key_input, + EdgePartitionEdgeMaskWrapper edge_partition_e_mask, + thrust::optional> + edge_offsets_with_mask, EdgeOp e_op, typename GraphViewType::vertex_type* keys, ValueIterator value_iter) @@ -187,19 +218,41 @@ __global__ void transform_reduce_by_src_dst_key_low_degree( edge_t local_degree{}; thrust::tie(indices, edge_offset, local_degree) = edge_partition.local_edges(static_cast(major_offset)); - auto local_offset = edge_partition.local_offset(major_offset); - for (edge_t i = 0; i < local_degree; ++i) { - update_buffer_element(edge_partition, - major, - indices[i], - edge_offset + i, - edge_partition_src_value_input, - edge_partition_dst_value_input, - edge_partition_e_value_input, - edge_partition_src_dst_key_input, - e_op, - keys + local_offset + i, - value_iter + local_offset + i); + if (edge_partition_e_mask) { + auto edge_offset_with_mask = (*edge_offsets_with_mask)[major_offset]; + edge_t counter{0}; + for (edge_t i = 0; i < local_degree; ++i) { + if ((*edge_partition_e_mask).get(edge_offset + i)) { + update_buffer_element( + edge_partition, + major, + indices[i], + edge_offset + i, + edge_partition_src_value_input, + edge_partition_dst_value_input, + edge_partition_e_value_input, + edge_partition_src_dst_key_input, + e_op, + keys + edge_offset_with_mask + counter, + value_iter + edge_offset_with_mask + counter); + ++counter; + } + } + } else { + for (edge_t i = 0; i < local_degree; ++i) { + update_buffer_element( + edge_partition, + major, + indices[i], + edge_offset + i, + edge_partition_src_value_input, + edge_partition_dst_value_input, + edge_partition_e_value_input, + edge_partition_src_dst_key_input, + e_op, + keys + edge_offset + i, + value_iter + edge_offset + i); + } } idx += gridDim.x * blockDim.x; @@ -212,6 +265,7 @@ template __global__ void transform_reduce_by_src_dst_key_mid_degree( @@ -224,6 +278,9 @@ __global__ void transform_reduce_by_src_dst_key_mid_degree( EdgePartitionDstValueInputWrapper edge_partition_dst_value_input, EdgePartitionEdgeValueInputWrapper edge_partition_e_value_input, EdgePartitionSrcDstKeyInputWrapper edge_partition_src_dst_key_input, + EdgePartitionEdgeMaskWrapper edge_partition_e_mask, + thrust::optional> + edge_offsets_with_mask, EdgeOp e_op, typename GraphViewType::vertex_type* keys, ValueIterator value_iter) @@ -238,6 +295,9 @@ __global__ void transform_reduce_by_src_dst_key_mid_degree( static_cast(major_range_first - edge_partition.major_range_first()); size_t idx = static_cast(tid / raft::warp_size()); + using WarpScan = cub::WarpScan; + __shared__ typename WarpScan::TempStorage temp_storage; + while (idx < static_cast(major_range_last - major_range_first)) { auto major_offset = major_start_offset + idx; auto major = @@ -247,19 +307,49 @@ __global__ void transform_reduce_by_src_dst_key_mid_degree( edge_t local_degree{}; thrust::tie(indices, edge_offset, local_degree) = edge_partition.local_edges(static_cast(major_offset)); - auto local_offset = edge_partition.local_offset(major_offset); - for (edge_t i = lane_id; i < local_degree; i += raft::warp_size()) { - update_buffer_element(edge_partition, - major, - indices[i], - edge_offset + i, - edge_partition_src_value_input, - edge_partition_dst_value_input, - edge_partition_e_value_input, - edge_partition_src_dst_key_input, - e_op, - keys + local_offset + i, - value_iter + local_offset + i); + if (edge_partition_e_mask) { + // FIXME: it might be faster to update in warp-sync way + auto edge_offset_with_mask = (*edge_offsets_with_mask)[major_offset]; + edge_t counter{0}; + for (edge_t i = lane_id; i < local_degree; i += raft::warp_size()) { + if ((*edge_partition_e_mask).get(edge_offset + i)) { ++counter; } + } + edge_t offset_within_warp{}; + WarpScan(temp_storage).ExclusiveSum(counter, offset_within_warp); + edge_offset_with_mask += offset_within_warp; + counter = 0; + for (edge_t i = lane_id; i < local_degree; i += raft::warp_size()) { + if ((*edge_partition_e_mask).get(edge_offset + i)) { + update_buffer_element( + edge_partition, + major, + indices[i], + edge_offset + i, + edge_partition_src_value_input, + edge_partition_dst_value_input, + edge_partition_e_value_input, + edge_partition_src_dst_key_input, + e_op, + keys + edge_offset_with_mask + counter, + value_iter + edge_offset_with_mask + counter); + ++counter; + } + } + } else { + for (edge_t i = lane_id; i < local_degree; i += raft::warp_size()) { + update_buffer_element( + edge_partition, + major, + indices[i], + edge_offset + i, + edge_partition_src_value_input, + edge_partition_dst_value_input, + edge_partition_e_value_input, + edge_partition_src_dst_key_input, + e_op, + keys + edge_offset + i, + value_iter + edge_offset + i); + } } idx += gridDim.x * (blockDim.x / raft::warp_size()); @@ -272,6 +362,7 @@ template __global__ void transform_reduce_by_src_dst_key_high_degree( @@ -284,6 +375,9 @@ __global__ void transform_reduce_by_src_dst_key_high_degree( EdgePartitionDstValueInputWrapper edge_partition_dst_value_input, EdgePartitionEdgeValueInputWrapper edge_partition_e_value_input, EdgePartitionSrcDstKeyInputWrapper edge_partition_src_dst_key_input, + EdgePartitionEdgeMaskWrapper edge_partition_e_mask, + thrust::optional> + edge_offsets_with_mask, EdgeOp e_op, typename GraphViewType::vertex_type* keys, ValueIterator value_iter) @@ -295,6 +389,9 @@ __global__ void transform_reduce_by_src_dst_key_high_degree( static_cast(major_range_first - edge_partition.major_range_first()); auto idx = static_cast(blockIdx.x); + using BlockScan = cub::BlockScan; + __shared__ typename BlockScan::TempStorage temp_storage; + while (idx < static_cast(major_range_last - major_range_first)) { auto major_offset = major_start_offset + idx; auto major = @@ -304,19 +401,49 @@ __global__ void transform_reduce_by_src_dst_key_high_degree( edge_t local_degree{}; thrust::tie(indices, edge_offset, local_degree) = edge_partition.local_edges(static_cast(major_offset)); - auto local_offset = edge_partition.local_offset(major_offset); - for (edge_t i = threadIdx.x; i < local_degree; i += blockDim.x) { - update_buffer_element(edge_partition, - major, - indices[i], - edge_offset + i, - edge_partition_src_value_input, - edge_partition_dst_value_input, - edge_partition_e_value_input, - edge_partition_src_dst_key_input, - e_op, - keys + local_offset + i, - value_iter + local_offset + i); + if (edge_partition_e_mask) { + // FIXME: it might be faster to update in block-sync way + auto edge_offset_with_mask = (*edge_offsets_with_mask)[major_offset]; + edge_t counter{0}; + for (edge_t i = threadIdx.x; i < local_degree; i += blockDim.x) { + if ((*edge_partition_e_mask).get(edge_offset + i)) { ++counter; } + } + edge_t offset_within_block{}; + BlockScan(temp_storage).ExclusiveSum(counter, offset_within_block); + edge_offset_with_mask += offset_within_block; + counter = 0; + for (edge_t i = threadIdx.x; i < local_degree; i += blockDim.x) { + if ((*edge_partition_e_mask).get(edge_offset + i)) { + update_buffer_element( + edge_partition, + major, + indices[i], + edge_offset + i, + edge_partition_src_value_input, + edge_partition_dst_value_input, + edge_partition_e_value_input, + edge_partition_src_dst_key_input, + e_op, + keys + edge_offset_with_mask + counter, + value_iter + edge_offset_with_mask + counter); + ++counter; + } + } + } else { + for (edge_t i = threadIdx.x; i < local_degree; i += blockDim.x) { + update_buffer_element( + edge_partition, + major, + indices[i], + edge_offset + i, + edge_partition_src_value_input, + edge_partition_dst_value_input, + edge_partition_e_value_input, + edge_partition_src_dst_key_input, + e_op, + keys + edge_offset + i, + value_iter + edge_offset + i); + } } idx += gridDim.x; @@ -410,19 +537,41 @@ transform_reduce_e_by_src_dst_key(raft::handle_t const& handle, typename EdgeSrcDstKeyInputWrapper::value_iterator, typename EdgeSrcDstKeyInputWrapper::value_type>; + auto edge_mask_view = graph_view.edge_mask_view(); + rmm::device_uvector keys(0, handle.get_stream()); auto value_buffer = allocate_dataframe_buffer(0, handle.get_stream()); for (size_t i = 0; i < graph_view.number_of_local_edge_partitions(); ++i) { auto edge_partition = edge_partition_device_view_t( graph_view.local_edge_partition_view(i)); - - auto num_edges = edge_partition.number_of_edges(); - - rmm::device_uvector tmp_keys(num_edges, handle.get_stream()); + auto edge_partition_e_mask = + edge_mask_view + ? thrust::make_optional< + detail::edge_partition_edge_property_device_view_t>( + *edge_mask_view, i) + : thrust::nullopt; + + rmm::device_uvector tmp_keys(0, handle.get_stream()); + std::optional> edge_offsets_with_mask{std::nullopt}; + if (edge_partition_e_mask) { + auto local_degrees = edge_partition.compute_local_degrees_with_mask( + (*edge_partition_e_mask).value_first(), handle.get_stream()); + edge_offsets_with_mask = + rmm::device_uvector(edge_partition.major_range_size() + 1, handle.get_stream()); + (*edge_offsets_with_mask).set_element_to_zero_async(0, handle.get_stream()); + thrust::inclusive_scan(handle.get_thrust_policy(), + local_degrees.begin(), + local_degrees.end(), + (*edge_offsets_with_mask).begin() + 1); + tmp_keys.resize((*edge_offsets_with_mask).back_element(handle.get_stream()), + handle.get_stream()); + } else { + tmp_keys.resize(edge_partition.number_of_edges(), handle.get_stream()); + } auto tmp_value_buffer = allocate_dataframe_buffer(tmp_keys.size(), handle.get_stream()); - if (num_edges > 0) { + if (tmp_keys.size() > 0) { edge_partition_src_input_device_view_t edge_partition_src_value_input{}; edge_partition_dst_input_device_view_t edge_partition_dst_value_input{}; if constexpr (GraphViewType::is_storage_transposed) { @@ -467,6 +616,11 @@ transform_reduce_e_by_src_dst_key(raft::handle_t const& handle, edge_partition_dst_value_input, edge_partition_e_value_input, edge_partition_src_dst_key_input, + edge_partition_e_mask, + edge_offsets_with_mask + ? thrust::make_optional>( + (*edge_offsets_with_mask).data(), (*edge_offsets_with_mask).size()) + : thrust::nullopt, e_op, tmp_keys.data(), get_dataframe_buffer_begin(tmp_value_buffer)); @@ -485,6 +639,11 @@ transform_reduce_e_by_src_dst_key(raft::handle_t const& handle, edge_partition_dst_value_input, edge_partition_e_value_input, edge_partition_src_dst_key_input, + edge_partition_e_mask, + edge_offsets_with_mask + ? thrust::make_optional>( + (*edge_offsets_with_mask).data(), (*edge_offsets_with_mask).size()) + : thrust::nullopt, e_op, tmp_keys.data(), get_dataframe_buffer_begin(tmp_value_buffer)); @@ -503,6 +662,11 @@ transform_reduce_e_by_src_dst_key(raft::handle_t const& handle, edge_partition_dst_value_input, edge_partition_e_value_input, edge_partition_src_dst_key_input, + edge_partition_e_mask, + edge_offsets_with_mask + ? thrust::make_optional>( + (*edge_offsets_with_mask).data(), (*edge_offsets_with_mask).size()) + : thrust::nullopt, e_op, tmp_keys.data(), get_dataframe_buffer_begin(tmp_value_buffer)); @@ -520,6 +684,11 @@ transform_reduce_e_by_src_dst_key(raft::handle_t const& handle, edge_partition_dst_value_input, edge_partition_e_value_input, edge_partition_src_dst_key_input, + edge_partition_e_mask, + edge_offsets_with_mask + ? thrust::make_optional>( + (*edge_offsets_with_mask).data(), (*edge_offsets_with_mask).size()) + : thrust::nullopt, e_op, tmp_keys.data(), get_dataframe_buffer_begin(tmp_value_buffer)); @@ -539,6 +708,11 @@ transform_reduce_e_by_src_dst_key(raft::handle_t const& handle, edge_partition_dst_value_input, edge_partition_e_value_input, edge_partition_src_dst_key_input, + edge_partition_e_mask, + edge_offsets_with_mask + ? thrust::make_optional>( + (*edge_offsets_with_mask).data(), (*edge_offsets_with_mask).size()) + : thrust::nullopt, e_op, tmp_keys.data(), get_dataframe_buffer_begin(tmp_value_buffer));