Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update count_if_e, transform_reduce_e, and transform_e to support edge masking #4001

Merged
merged 17 commits on Nov 30, 2023 (base and head branch names were not captured)
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/include/cugraph/utilities/misc_utils.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <raft/util/cudart_utils.hpp>
#include <rmm/device_uvector.hpp>

#include <cuda/atomic>
#include <thrust/binary_search.h>
#include <thrust/gather.h>
#include <thrust/iterator/counting_iterator.h>
Expand Down
2 changes: 0 additions & 2 deletions cpp/src/prims/count_if_e.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,6 @@ typename GraphViewType::edge_type count_if_e(raft::handle_t const& handle,
using vertex_t = typename GraphViewType::vertex_type;
using edge_t = typename GraphViewType::edge_type;

CUGRAPH_EXPECTS(!graph_view.has_edge_mask(), "unimplemented.");

if (do_expensive_check) {
// currently, nothing to do
}
Expand Down
73 changes: 65 additions & 8 deletions cpp/src/prims/fill_edge_property.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
#pragma once

#include <cugraph/edge_partition_edge_property_device_view.cuh>
#include <cugraph/edge_property.hpp>
#include <cugraph/graph_view.hpp>
#include <cugraph/utilities/error.hpp>
Expand All @@ -23,6 +24,7 @@
#include <rmm/exec_policy.hpp>

#include <thrust/fill.h>
#include <thrust/iterator/constant_iterator.h>

#include <cstddef>

Expand All @@ -38,21 +40,78 @@ void fill_edge_property(raft::handle_t const& handle,
{
static_assert(std::is_same_v<T, typename EdgePropertyOutputWrapper::value_type>);

using edge_t = typename GraphViewType::edge_type;

auto edge_mask_view = graph_view.edge_mask_view();

auto value_firsts = edge_property_output.value_firsts();
auto edge_counts = edge_property_output.edge_counts();
for (size_t i = 0; i < graph_view.number_of_local_edge_partitions(); ++i) {
auto edge_partition_e_mask =
edge_mask_view
? thrust::make_optional<
detail::edge_partition_edge_property_device_view_t<edge_t, uint32_t const*, bool>>(
*edge_mask_view, i)
: thrust::nullopt;

if constexpr (cugraph::has_packed_bool_element<
std::remove_reference_t<decltype(value_firsts[i])>,
T>()) {
static_assert(std::is_arithmetic_v<T>, "unimplemented for thrust::tuple types.");
auto packed_input = input ? packed_bool_full_mask() : packed_bool_empty_mask();
thrust::fill_n(handle.get_thrust_policy(),
value_firsts[i],
packed_bool_size(static_cast<size_t>(edge_counts[i])),
packed_input);
auto rem = edge_counts[i] % packed_bools_per_word();
if (edge_partition_e_mask) {
auto input_first =
thrust::make_zip_iterator(value_firsts[i], (*edge_partition_e_mask).value_first());
thrust::transform(handle.get_thrust_policy(),
input_first,
input_first + packed_bool_size(static_cast<size_t>(edge_counts[i] - rem)),
value_firsts[i],
[packed_input] __device__(thrust::tuple<T, uint32_t> pair) {
auto old_value = thrust::get<0>(pair);
auto mask = thrust::get<1>(pair);
return (old_value & ~mask) | (packed_input & mask);
});
if (rem > 0) {
thrust::transform(
handle.get_thrust_policy(),
input_first + packed_bool_size(static_cast<size_t>(edge_counts[i] - rem)),
input_first + packed_bool_size(static_cast<size_t>(edge_counts[i])),
value_firsts[i] + packed_bool_size(static_cast<size_t>(edge_counts[i] - rem)),
[packed_input, rem] __device__(thrust::tuple<T, uint32_t> pair) {
auto old_value = thrust::get<0>(pair);
auto mask = thrust::get<1>(pair);
return ((old_value & ~mask) | (packed_input & mask)) & packed_bool_partial_mask(rem);
});
}
} else {
thrust::fill_n(handle.get_thrust_policy(),
value_firsts[i],
packed_bool_size(static_cast<size_t>(edge_counts[i] - rem)),
packed_input);
if (rem > 0) {
thrust::fill_n(
handle.get_thrust_policy(),
value_firsts[i] + packed_bool_size(static_cast<size_t>(edge_counts[i] - rem)),
1,
packed_input & packed_bool_partial_mask(rem));
}
}
} else {
thrust::fill_n(
handle.get_thrust_policy(), value_firsts[i], static_cast<size_t>(edge_counts[i]), input);
if (edge_partition_e_mask) {
thrust::transform_if(handle.get_thrust_policy(),
thrust::make_constant_iterator(input),
thrust::make_constant_iterator(input) + edge_counts[i],
thrust::make_counting_iterator(edge_t{0}),
value_firsts[i],
thrust::identity<T>{},
[edge_partition_e_mask = *edge_partition_e_mask] __device__(edge_t i) {
return edge_partition_e_mask.get(i);
});
} else {
thrust::fill_n(
handle.get_thrust_policy(), value_firsts[i], static_cast<size_t>(edge_counts[i]), input);
}
}
}
}
Expand All @@ -79,8 +138,6 @@ void fill_edge_property(raft::handle_t const& handle,
edge_property_t<GraphViewType, T>& edge_property_output,
bool do_expensive_check = false)
{
CUGRAPH_EXPECTS(!graph_view.has_edge_mask(), "unimplemented.");

if (do_expensive_check) {
// currently, nothing to do
}
Expand Down
123 changes: 85 additions & 38 deletions cpp/src/prims/transform_e.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
#pragma once

#include <cugraph/edge_partition_device_view.cuh>
#include <cugraph/edge_partition_edge_property_device_view.cuh>
#include <cugraph/edge_partition_endpoint_property_device_view.cuh>
#include <cugraph/edge_src_dst_property.hpp>
#include <cugraph/graph_view.hpp>
#include <cugraph/utilities/error.hpp>
#include <cugraph/utilities/mask_utils.cuh>
#include <cugraph/utilities/packed_bool_utils.hpp>

#include <raft/core/handle.hpp>
Expand All @@ -44,6 +46,7 @@ template <typename GraphViewType,
typename EdgePartitionSrcValueInputWrapper,
typename EdgePartitionDstValueInputWrapper,
typename EdgePartitionEdgeValueInputWrapper,
typename EdgePartitionEdgeMaskWrapper,
typename EdgePartitionEdgeValueOutputWrapper,
typename EdgeOp>
__global__ void transform_e_packed_bool(
Expand All @@ -53,6 +56,7 @@ __global__ void transform_e_packed_bool(
EdgePartitionSrcValueInputWrapper edge_partition_src_value_input,
EdgePartitionDstValueInputWrapper edge_partition_dst_value_input,
EdgePartitionEdgeValueInputWrapper edge_partition_e_value_input,
thrust::optional<EdgePartitionEdgeMaskWrapper> edge_partition_e_mask,
EdgePartitionEdgeValueOutputWrapper edge_partition_e_value_output,
EdgeOp e_op)
{
Expand All @@ -68,11 +72,14 @@ __global__ void transform_e_packed_bool(

auto num_edges = edge_partition.number_of_edges();
while (idx < static_cast<edge_t>(packed_bool_size(num_edges))) {
auto edge_mask = packed_bool_full_mask();
if (edge_partition_e_mask) { edge_mask = *((*edge_partition_e_mask).value_first() + idx); }

auto local_edge_idx =
idx * static_cast<edge_t>(packed_bools_per_word()) + static_cast<edge_t>(lane_id);
uint32_t mask{0};
int predicate{0};
if (local_edge_idx < num_edges) {

if ((local_edge_idx < num_edges) && (edge_mask & packed_bool_mask(lane_id))) {
auto major_idx = edge_partition.major_idx_from_local_edge_idx_nocheck(local_edge_idx);
auto major = edge_partition.major_from_major_idx_nocheck(major_idx);
auto major_offset = edge_partition.major_offset_from_major_nocheck(major);
Expand All @@ -91,8 +98,15 @@ __global__ void transform_e_packed_bool(
? int{1}
: int{0};
}
mask = __ballot_sync(uint32_t{0xffffffff}, predicate);
if (lane_id == 0) { *(edge_partition_e_value_output.value_first() + idx) = mask; }
uint32_t new_val = __ballot_sync(uint32_t{0xffffffff}, predicate);
if (lane_id == 0) {
if (edge_mask == packed_bool_full_mask()) {
*(edge_partition_e_value_output.value_first() + idx) = new_val;
} else {
auto old_val = *(edge_partition_e_value_output.value_first() + idx);
*(edge_partition_e_value_output.value_first() + idx) = (old_val & ~edge_mask) | new_val;
}
}

idx += static_cast<edge_t>(gridDim.x * (blockDim.x / raft::warp_size()));
}
Expand Down Expand Up @@ -178,12 +192,18 @@ void transform_e(raft::handle_t const& handle,
typename EdgeValueOutputWrapper::value_iterator,
typename EdgeValueOutputWrapper::value_type>;

CUGRAPH_EXPECTS(!graph_view.has_edge_mask(), "unimplemented.");
auto edge_mask_view = graph_view.edge_mask_view();

for (size_t i = 0; i < graph_view.number_of_local_edge_partitions(); ++i) {
auto edge_partition =
edge_partition_device_view_t<vertex_t, edge_t, GraphViewType::is_multi_gpu>(
graph_view.local_edge_partition_view(i));
auto edge_partition_e_mask =
edge_mask_view
? thrust::make_optional<
detail::edge_partition_edge_property_device_view_t<edge_t, uint32_t const*, bool>>(
*edge_mask_view, i)
: thrust::nullopt;

edge_partition_src_input_device_view_t edge_partition_src_value_input{};
edge_partition_dst_input_device_view_t edge_partition_dst_value_input{};
Expand Down Expand Up @@ -214,35 +234,40 @@ void transform_e(raft::handle_t const& handle,
edge_partition_src_value_input,
edge_partition_dst_value_input,
edge_partition_e_value_input,
edge_partition_e_mask,
edge_partition_e_value_output,
e_op);
}
} else {
thrust::transform(
thrust::for_each(
handle.get_thrust_policy(),
thrust::make_counting_iterator(edge_t{0}),
thrust::make_counting_iterator(num_edges),
edge_partition_e_value_output.value_first(),
[e_op,
edge_partition,
edge_partition_src_value_input,
edge_partition_dst_value_input,
edge_partition_e_value_input] __device__(edge_t i) {
auto major_idx = edge_partition.major_idx_from_local_edge_idx_nocheck(i);
auto major = edge_partition.major_from_major_idx_nocheck(major_idx);
auto major_offset = edge_partition.major_offset_from_major_nocheck(major);
auto minor = *(edge_partition.indices() + i);
auto minor_offset = edge_partition.minor_offset_from_minor_nocheck(minor);

auto src = GraphViewType::is_storage_transposed ? minor : major;
auto dst = GraphViewType::is_storage_transposed ? major : minor;
auto src_offset = GraphViewType::is_storage_transposed ? minor_offset : major_offset;
auto dst_offset = GraphViewType::is_storage_transposed ? major_offset : minor_offset;
return e_op(src,
dst,
edge_partition_src_value_input.get(src_offset),
edge_partition_dst_value_input.get(dst_offset),
edge_partition_e_value_input.get(i));
edge_partition_e_value_input,
edge_partition_e_mask,
edge_partition_e_value_output] __device__(edge_t i) {
if (!edge_partition_e_mask || (*edge_partition_e_mask).get(i)) {
auto major_idx = edge_partition.major_idx_from_local_edge_idx_nocheck(i);
auto major = edge_partition.major_from_major_idx_nocheck(major_idx);
auto major_offset = edge_partition.major_offset_from_major_nocheck(major);
auto minor = *(edge_partition.indices() + i);
auto minor_offset = edge_partition.minor_offset_from_minor_nocheck(minor);

auto src = GraphViewType::is_storage_transposed ? minor : major;
auto dst = GraphViewType::is_storage_transposed ? major : minor;
auto src_offset = GraphViewType::is_storage_transposed ? minor_offset : major_offset;
auto dst_offset = GraphViewType::is_storage_transposed ? major_offset : minor_offset;
auto e_op_result = e_op(src,
dst,
edge_partition_src_value_input.get(src_offset),
edge_partition_dst_value_input.get(dst_offset),
edge_partition_e_value_input.get(i));
edge_partition_e_value_output.set(i, e_op_result);
}
});
}
}
Expand Down Expand Up @@ -336,14 +361,12 @@ void transform_e(raft::handle_t const& handle,
typename EdgeValueOutputWrapper::value_iterator,
typename EdgeValueOutputWrapper::value_type>;

CUGRAPH_EXPECTS(!graph_view.has_edge_mask(), "unimplemented.");

auto major_first =
GraphViewType::is_storage_transposed ? edge_list.dst_begin() : edge_list.src_begin();
auto minor_first =
GraphViewType::is_storage_transposed ? edge_list.src_begin() : edge_list.dst_begin();

auto edge_first = thrust::make_zip_iterator(thrust::make_tuple(major_first, minor_first));
auto edge_first = thrust::make_zip_iterator(major_first, minor_first);

if (do_expensive_check) {
CUGRAPH_EXPECTS(
Expand Down Expand Up @@ -382,18 +405,27 @@ void transform_e(raft::handle_t const& handle,
edge_partition_offsets.back() = edge_list.size();
}

auto edge_mask_view = graph_view.edge_mask_view();

for (size_t i = 0; i < graph_view.number_of_local_edge_partitions(); ++i) {
auto edge_partition =
edge_partition_device_view_t<vertex_t, edge_t, GraphViewType::is_multi_gpu>(
graph_view.local_edge_partition_view(i));
auto edge_partition_e_mask =
edge_mask_view
? thrust::make_optional<
detail::edge_partition_edge_property_device_view_t<edge_t, uint32_t const*, bool>>(
*edge_mask_view, i)
: thrust::nullopt;

if (do_expensive_check) {
CUGRAPH_EXPECTS(
thrust::count_if(
handle.get_thrust_policy(),
edge_first + edge_partition_offsets[i],
edge_first + edge_partition_offsets[i + 1],
[edge_partition] __device__(thrust::tuple<vertex_t, vertex_t> edge) {
[edge_partition,
edge_partition_e_mask] __device__(thrust::tuple<vertex_t, vertex_t> edge) {
auto major = thrust::get<0>(edge);
auto minor = thrust::get<1>(edge);
vertex_t major_idx{};
Expand All @@ -416,8 +448,19 @@ void transform_e(raft::handle_t const& handle,
edge_t edge_offset{};
edge_t local_degree{};
thrust::tie(indices, edge_offset, local_degree) = edge_partition.local_edges(major_idx);
auto it = thrust::lower_bound(thrust::seq, indices, indices + local_degree, minor);
return *it != minor;
auto lower_it =
thrust::lower_bound(thrust::seq, indices, indices + local_degree, minor);
if (*lower_it != minor) { return true; }
if (edge_partition_e_mask) {
auto upper_it =
thrust::upper_bound(thrust::seq, lower_it, indices + local_degree, minor);
if (detail::count_set_bits((*edge_partition_e_mask).value_first(),
edge_offset + thrust::distance(indices, lower_it),
thrust::distance(lower_it, upper_it)) == 0) {
return true;
}
}
return false;
}) == 0,
"Invalid input arguments: edge_list contains edges that do not exist in the input graph.");
}
Expand Down Expand Up @@ -446,6 +489,7 @@ void transform_e(raft::handle_t const& handle,
edge_partition_src_value_input,
edge_partition_dst_value_input,
edge_partition_e_value_input,
edge_partition_e_mask,
edge_partition_e_value_output] __device__(thrust::tuple<vertex_t, vertex_t> edge) {
auto major = thrust::get<0>(edge);
auto minor = thrust::get<1>(edge);
Expand All @@ -469,7 +513,7 @@ void transform_e(raft::handle_t const& handle,
edge_t local_degree{};
thrust::tie(indices, edge_offset, local_degree) = edge_partition.local_edges(major_idx);
auto lower_it = thrust::lower_bound(thrust::seq, indices, indices + local_degree, minor);
auto upper_it = thrust::upper_bound(thrust::seq, indices, indices + local_degree, minor);
auto upper_it = thrust::upper_bound(thrust::seq, lower_it, indices + local_degree, minor);

auto src = GraphViewType::is_storage_transposed ? minor : major;
auto dst = GraphViewType::is_storage_transposed ? major : minor;
Expand All @@ -478,14 +522,17 @@ void transform_e(raft::handle_t const& handle,

for (auto it = lower_it; it != upper_it; ++it) {
assert(*it == minor);
auto e_op_result =
e_op(src,
dst,
edge_partition_src_value_input.get(src_offset),
edge_partition_dst_value_input.get(dst_offset),
edge_partition_e_value_input.get(edge_offset + thrust::distance(indices, it)));
edge_partition_e_value_output.set(edge_offset + thrust::distance(indices, it),
e_op_result);
if (!edge_partition_e_mask ||
((*edge_partition_e_mask).get(edge_offset + thrust::distance(indices, it)))) {
auto e_op_result =
e_op(src,
dst,
edge_partition_src_value_input.get(src_offset),
edge_partition_dst_value_input.get(dst_offset),
edge_partition_e_value_input.get(edge_offset + thrust::distance(indices, it)));
edge_partition_e_value_output.set(edge_offset + thrust::distance(indices, it),
e_op_result);
}
}
});
}
Expand Down
Loading
Loading