Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Address FIXMEs #3988

Merged
merged 11 commits into from
Nov 20, 2023
45 changes: 0 additions & 45 deletions cpp/include/cugraph/algorithms.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -464,51 +464,6 @@ k_truss_subgraph(raft::handle_t const& handle,
size_t number_of_vertices,
int k);

// FIXME: Internally distances is of int (signed 32-bit) data type, but current
// template uses data from VT, ET, WT from the legacy::GraphCSR View even if weights
// are not considered
/**
* @Synopsis Performs a breadth first search traversal of a graph starting from a vertex.
*
* @throws cugraph::logic_error with a custom message when an error occurs.
*
* @tparam VT Type of vertex identifiers. Supported value : int (signed,
* 32-bit)
* @tparam ET Type of edge identifiers. Supported value : int (signed,
* 32-bit)
* @tparam WT Type of edge weights. Supported values : int (signed, 32-bit)
*
* @param[in] handle Library handle (RAFT). If a communicator is set in the handle,
the multi GPU version will be selected.
* @param[in] graph cuGraph graph descriptor, should contain the connectivity
* information as a CSR
*
* @param[out] distances If set to a valid pointer, this is populated by distance of
* every vertex in the graph from the starting vertex
*
* @param[out] predecessors If set to a valid pointer, this is populated by bfs traversal
* predecessor of every vertex
*
* @param[out] sp_counters If set to a valid pointer, this is populated by bfs traversal
* shortest_path counter of every vertex
*
* @param[in] start_vertex The starting vertex for breadth first search traversal
*
* @param[in] directed Treat the input graph as directed
*
* @param[in] mg_batch If set to true use SG BFS path when comms are initialized.
*
*/
template <typename VT, typename ET, typename WT>
void bfs(raft::handle_t const& handle,
legacy::GraphCSRView<VT, ET, WT> const& graph,
VT* distances,
VT* predecessors,
double* sp_counters,
const VT start_vertex,
bool directed = true,
bool mg_batch = false);

/**
* @brief Compute Hungarian algorithm on a weighted bipartite graph
*
Expand Down
8 changes: 1 addition & 7 deletions cpp/include/cugraph/utilities/device_comm.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2022, NVIDIA CORPORATION.
* Copyright (c) 2020-2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -806,9 +806,6 @@ device_sendrecv(raft::comms::comms_t const& comm,
size_t constexpr tuple_size =
thrust::tuple_size<typename thrust::iterator_traits<InputIterator>::value_type>::value;

// FIXME: NCCL 2.7 supports only one ncclSend and one ncclRecv for a source rank and destination
// rank inside ncclGroupStart/ncclGroupEnd, so we cannot place this inside
// ncclGroupStart/ncclGroupEnd, this restriction will be lifted in NCCL 2.8
detail::device_sendrecv_tuple_iterator_element_impl<InputIterator,
OutputIterator,
size_t{0},
Expand Down Expand Up @@ -866,9 +863,6 @@ device_multicast_sendrecv(raft::comms::comms_t const& comm,
size_t constexpr tuple_size =
thrust::tuple_size<typename thrust::iterator_traits<InputIterator>::value_type>::value;

// FIXME: NCCL 2.7 supports only one ncclSend and one ncclRecv for a source rank and destination
// rank inside ncclGroupStart/ncclGroupEnd, so we cannot place this inside
// ncclGroupStart/ncclGroupEnd, this restriction will be lifted in NCCL 2.8
detail::device_multicast_sendrecv_tuple_iterator_element_impl<InputIterator,
OutputIterator,
size_t{0},
Expand Down
98 changes: 74 additions & 24 deletions cpp/include/cugraph/utilities/host_scalar_comm.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2022, NVIDIA CORPORATION.
* Copyright (c) 2020-2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -254,19 +254,11 @@ template <typename T>
std::enable_if_t<std::is_arithmetic<T>::value, std::vector<T>> host_scalar_allgather(
raft::comms::comms_t const& comm, T input, cudaStream_t stream)
{
std::vector<size_t> rx_counts(comm.get_size(), size_t{1});
std::vector<size_t> displacements(rx_counts.size(), size_t{0});
std::iota(displacements.begin(), displacements.end(), size_t{0});
rmm::device_uvector<T> d_outputs(rx_counts.size(), stream);
rmm::device_uvector<T> d_outputs(comm.get_size(), stream);
raft::update_device(d_outputs.data() + comm.get_rank(), &input, 1, stream);
// FIXME: better use allgather
comm.allgatherv(d_outputs.data() + comm.get_rank(),
d_outputs.data(),
rx_counts.data(),
displacements.data(),
stream);
std::vector<T> h_outputs(rx_counts.size());
raft::update_host(h_outputs.data(), d_outputs.data(), rx_counts.size(), stream);
comm.allgather(d_outputs.data() + comm.get_rank(), d_outputs.data(), size_t{1}, stream);
std::vector<T> h_outputs(d_outputs.size());
raft::update_host(h_outputs.data(), d_outputs.data(), d_outputs.size(), stream);
auto status = comm.sync_stream(stream);
CUGRAPH_EXPECTS(status == raft::comms::status_t::SUCCESS, "sync_stream() failure.");
return h_outputs;
Expand All @@ -277,11 +269,6 @@ std::enable_if_t<cugraph::is_thrust_tuple_of_arithmetic<T>::value, std::vector<T
host_scalar_allgather(raft::comms::comms_t const& comm, T input, cudaStream_t stream)
{
size_t constexpr tuple_size = thrust::tuple_size<T>::value;
std::vector<size_t> rx_counts(comm.get_size(), tuple_size);
std::vector<size_t> displacements(rx_counts.size(), size_t{0});
for (size_t i = 0; i < displacements.size(); ++i) {
displacements[i] = i * tuple_size;
}
std::vector<int64_t> h_tuple_scalar_elements(tuple_size);
rmm::device_uvector<int64_t> d_allgathered_tuple_scalar_elements(comm.get_size() * tuple_size,
stream);
Expand All @@ -292,12 +279,10 @@ host_scalar_allgather(raft::comms::comms_t const& comm, T input, cudaStream_t st
h_tuple_scalar_elements.data(),
tuple_size,
stream);
// FIXME: better use allgather
comm.allgatherv(d_allgathered_tuple_scalar_elements.data() + comm.get_rank() * tuple_size,
d_allgathered_tuple_scalar_elements.data(),
rx_counts.data(),
displacements.data(),
stream);
comm.allgather(d_allgathered_tuple_scalar_elements.data() + comm.get_rank() * tuple_size,
d_allgathered_tuple_scalar_elements.data(),
tuple_size,
stream);
std::vector<int64_t> h_allgathered_tuple_scalar_elements(comm.get_size() * tuple_size);
raft::update_host(h_allgathered_tuple_scalar_elements.data(),
d_allgathered_tuple_scalar_elements.data(),
Expand All @@ -318,6 +303,71 @@ host_scalar_allgather(raft::comms::comms_t const& comm, T input, cudaStream_t st
return ret;
}

template <typename T>
std::enable_if_t<std::is_arithmetic<T>::value, T> host_scalar_scatter(
raft::comms::comms_t const& comm,
std::vector<T> const& inputs, // relevant only in root
int root,
cudaStream_t stream)
{
CUGRAPH_EXPECTS(
((comm.get_rank() == root) && (inputs.size() == static_cast<size_t>(comm.get_size()))) ||
((comm.get_rank() != root) && (inputs.size() == 0)),
"inputs.size() should match with comm.get_size() in root and should be 0 otherwise.");
rmm::device_uvector<T> d_outputs(comm.get_size(), stream);
if (comm.get_rank() == root) {
raft::update_device(d_outputs.data(), inputs.data(), inputs.size(), stream);
}
comm.bcast(d_outputs.data(), d_outputs.size(), root, stream);
T h_output{};
raft::update_host(&h_output, d_outputs.data() + comm.get_rank(), 1, stream);
auto status = comm.sync_stream(stream);
CUGRAPH_EXPECTS(status == raft::comms::status_t::SUCCESS, "sync_stream() failure.");
return h_output;
}

template <typename T>
std::enable_if_t<cugraph::is_thrust_tuple_of_arithmetic<T>::value, T> host_scalar_scatter(
raft::comms::comms_t const& comm,
std::vector<T> const& inputs, // relevant only in root
int root,
cudaStream_t stream)
{
CUGRAPH_EXPECTS(
((comm.get_rank() == root) && (inputs.size() == static_cast<size_t>(comm.get_size()))) ||
((comm.get_rank() != root) && (inputs.size() == 0)),
"inputs.size() should match with comm.get_size() in root and should be 0 otherwise.");
size_t constexpr tuple_size = thrust::tuple_size<T>::value;
rmm::device_uvector<int64_t> d_scatter_tuple_scalar_elements(comm.get_size() * tuple_size,
stream);
if (comm.get_rank() == root) {
for (int i = 0; i < comm.get_size(); ++i) {
std::vector<int64_t> h_tuple_scalar_elements(tuple_size);
detail::update_vector_of_tuple_scalar_elements_from_tuple_impl<T, size_t{0}, tuple_size>()
.update(h_tuple_scalar_elements, inputs[i]);
raft::update_device(d_scatter_tuple_scalar_elements.data() + i * tuple_size,
h_tuple_scalar_elements.data(),
tuple_size,
stream);
}
}
comm.bcast(
d_scatter_tuple_scalar_elements.data(), d_scatter_tuple_scalar_elements.size(), root, stream);
std::vector<int64_t> h_tuple_scalar_elements(tuple_size);
raft::update_host(h_tuple_scalar_elements.data(),
d_scatter_tuple_scalar_elements.data() + comm.get_rank() * tuple_size,
tuple_size,
stream);
auto status = comm.sync_stream(stream);
CUGRAPH_EXPECTS(status == raft::comms::status_t::SUCCESS, "sync_stream() failure.");

T ret{};
detail::update_tuple_from_vector_of_tuple_scalar_elements_impl<T, size_t{0}, tuple_size>().update(
ret, h_tuple_scalar_elements);

return ret;
}

// Return value is valid only in root (return value may better be std::optional in C++17 or later)
template <typename T>
std::enable_if_t<std::is_arithmetic<T>::value, std::vector<T>> host_scalar_gather(
Expand Down
5 changes: 0 additions & 5 deletions cpp/include/cugraph/utilities/shuffle_comm.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ compute_tx_rx_counts_offsets_ranks(raft::comms::comms_t const& comm,

rmm::device_uvector<size_t> d_rx_value_counts(comm_size, stream_view);

// FIXME: this needs to be replaced with AlltoAll once NCCL 2.8 is released.
std::vector<size_t> tx_counts(comm_size, size_t{1});
std::vector<size_t> tx_offsets(comm_size);
std::iota(tx_offsets.begin(), tx_offsets.end(), size_t{0});
Expand Down Expand Up @@ -835,7 +834,6 @@ auto shuffle_values(raft::comms::comms_t const& comm,
allocate_dataframe_buffer<typename thrust::iterator_traits<TxValueIterator>::value_type>(
rx_offsets.size() > 0 ? rx_offsets.back() + rx_counts.back() : size_t{0}, stream_view);

// FIXME: this needs to be replaced with AlltoAll once NCCL 2.8 is released
// (if num_tx_dst_ranks == num_rx_src_ranks == comm_size).
device_multicast_sendrecv(comm,
tx_value_first,
Expand Down Expand Up @@ -889,7 +887,6 @@ auto groupby_gpu_id_and_shuffle_values(raft::comms::comms_t const& comm,
allocate_dataframe_buffer<typename thrust::iterator_traits<ValueIterator>::value_type>(
rx_offsets.size() > 0 ? rx_offsets.back() + rx_counts.back() : size_t{0}, stream_view);

// FIXME: this needs to be replaced with AlltoAll once NCCL 2.8 is released
// (if num_tx_dst_ranks == num_rx_src_ranks == comm_size).
device_multicast_sendrecv(comm,
tx_value_first,
Expand Down Expand Up @@ -946,7 +943,6 @@ auto groupby_gpu_id_and_shuffle_kv_pairs(raft::comms::comms_t const& comm,
allocate_dataframe_buffer<typename thrust::iterator_traits<ValueIterator>::value_type>(
rx_keys.size(), stream_view);

// FIXME: this needs to be replaced with AlltoAll once NCCL 2.8 is released
// (if num_tx_dst_ranks == num_rx_src_ranks == comm_size).
device_multicast_sendrecv(comm,
tx_key_first,
Expand All @@ -959,7 +955,6 @@ auto groupby_gpu_id_and_shuffle_kv_pairs(raft::comms::comms_t const& comm,
rx_src_ranks,
stream_view);

// FIXME: this needs to be replaced with AlltoAll once NCCL 2.8 is released
// (if num_tx_dst_ranks == num_rx_src_ranks == comm_size).
device_multicast_sendrecv(comm,
tx_value_first,
Expand Down
2 changes: 0 additions & 2 deletions cpp/src/centrality/katz_centrality_impl.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,6 @@ void katz_centrality(
CUGRAPH_EXPECTS(epsilon >= 0.0, "Invalid input argument: epsilon should be non-negative.");

if (do_expensive_check) {
// FIXME: should I check for betas?

if (has_initial_guess) {
auto num_negative_values =
count_if_v(handle, pull_graph_view, katz_centralities, [] __device__(auto, auto val) {
Expand Down
40 changes: 8 additions & 32 deletions cpp/src/components/weakly_connected_components_impl.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -236,18 +236,16 @@ struct v_op_t {
auto tag = thrust::get<1>(tagged_v);
auto v_offset =
vertex_partition.local_vertex_partition_offset_from_vertex_nocheck(thrust::get<0>(tagged_v));
// FIXME: better switch to atomic_ref after
// https://github.com/nvidia/libcudacxx/milestone/2
auto old =
atomicCAS(level_components + v_offset, invalid_component_id<vertex_type>::value, tag);
if (old != invalid_component_id<vertex_type>::value && old != tag) { // conflict
cuda::atomic_ref<vertex_type> v_component(*(level_components + v_offset));
auto old = invalid_component_id<vertex_type>::value;
bool success = v_component.compare_exchange_strong(old, tag, cuda::std::memory_order_relaxed);
if (!success && (old != tag)) { // conflict
return thrust::make_tuple(thrust::optional<size_t>{bucket_idx_conflict},
thrust::optional<std::byte>{std::byte{0}} /* dummy */);
} else {
auto update = (old == invalid_component_id<vertex_type>::value);
return thrust::make_tuple(
update ? thrust::optional<size_t>{bucket_idx_next} : thrust::nullopt,
update ? thrust::optional<std::byte>{std::byte{0}} /* dummy */ : thrust::nullopt);
success ? thrust::optional<size_t>{bucket_idx_next} : thrust::nullopt,
success ? thrust::optional<std::byte>{std::byte{0}} /* dummy */ : thrust::nullopt);
}
}

Expand Down Expand Up @@ -457,33 +455,11 @@ void weakly_connected_components_impl(raft::handle_t const& handle,
std::numeric_limits<vertex_t>::max());
}

// FIXME: we need to add host_scalar_scatter
#if 1
rmm::device_uvector<vertex_t> d_counts(comm_size, handle.get_stream());
raft::update_device(d_counts.data(),
init_max_new_root_counts.data(),
init_max_new_root_counts.size(),
handle.get_stream());
device_bcast(
comm, d_counts.data(), d_counts.data(), d_counts.size(), int{0}, handle.get_stream());
raft::update_host(
&init_max_new_roots, d_counts.data() + comm_rank, size_t{1}, handle.get_stream());
#else
init_max_new_roots =
host_scalar_scatter(comm, init_max_new_root_counts.data(), int{0}, handle.get_stream());
#endif
host_scalar_scatter(comm, init_max_new_root_counts, int{0}, handle.get_stream());
} else {
// FIXME: we need to add host_scalar_scatter
#if 1
rmm::device_uvector<vertex_t> d_counts(comm_size, handle.get_stream());
device_bcast(
comm, d_counts.data(), d_counts.data(), d_counts.size(), int{0}, handle.get_stream());
raft::update_host(
&init_max_new_roots, d_counts.data() + comm_rank, size_t{1}, handle.get_stream());
#else
init_max_new_roots =
host_scalar_scatter(comm, init_max_new_root_counts.data(), int{0}, handle.get_stream());
#endif
host_scalar_scatter(comm, std::vector<vertex_t>{}, int{0}, handle.get_stream());
}

handle.sync_stream();
Expand Down