diff --git a/cpp/src/c_api/capi_helper.cu b/cpp/src/c_api/capi_helper.cu index af0163b0512..0ee49f87265 100644 --- a/cpp/src/c_api/capi_helper.cu +++ b/cpp/src/c_api/capi_helper.cu @@ -44,7 +44,7 @@ shuffle_vertex_ids_and_offsets(raft::handle_t const& handle, thrust::make_zip_iterator(ids.end(), vertices.end())); auto return_offsets = cugraph::detail::compute_sparse_offsets( - ids.begin(), ids.end(), size_t{0}, size_t{offsets.size() - 1}, handle.get_stream()); + ids.begin(), ids.end(), size_t{0}, size_t{offsets.size() - 1}, true, handle.get_stream()); return std::make_tuple(std::move(vertices), std::move(return_offsets)); } diff --git a/cpp/src/structure/create_graph_from_edgelist_impl.cuh b/cpp/src/structure/create_graph_from_edgelist_impl.cuh index 0d4b12a3e38..8dd587e1661 100644 --- a/cpp/src/structure/create_graph_from_edgelist_impl.cuh +++ b/cpp/src/structure/create_graph_from_edgelist_impl.cuh @@ -510,7 +510,18 @@ create_graph_from_edgelist_impl( auto use_dcs = num_segments_per_vertex_partition > (detail::num_sparse_segments_per_vertex_partition + 2); - // 4. compress edge list (COO) to CSR (or CSC) or CSR + DCSR (CSC + DCSC) hybrid + // 4. sort and compress edge list (COO) to CSR (or CSC) or CSR + DCSR (CSC + DCSC) hybrid + + auto total_global_mem = handle.get_device_properties().totalGlobalMem; + size_t element_size = sizeof(vertex_t) * 2; + if (edgelist_weights) { element_size += sizeof(weight_t); } + if (edgelist_edge_ids) { element_size += sizeof(edge_id_t); } + if (edgelist_edge_types) { element_size += sizeof(edge_type_t); } + auto constexpr mem_frugal_ratio = + 0.25; // if the expected temporary buffer size exceeds the mem_frugal_ratio of the + // total_global_mem, switch to the memory frugal approach + auto mem_frugal_threshold = + static_cast(static_cast(total_global_mem / element_size) * mem_frugal_ratio); std::vector> edge_partition_offsets; std::vector> edge_partition_indices; @@ -559,154 +570,139 @@ create_graph_from_edgelist_impl( if (edgelist_weights) { if (edgelist_edge_ids) { if (edgelist_edge_types) { - auto edge_value_first = - thrust::make_zip_iterator((*edge_partition_edgelist_weights)[i].begin(), - (*edge_partition_edgelist_edge_ids)[i].begin(), - (*edge_partition_edgelist_edge_types)[i].begin()); std::forward_as_tuple( offsets, indices, std::tie(weights, edge_ids, edge_types), dcs_nzd_vertices) = - detail::compress_edgelist( - edge_partition_edgelist_srcs[i].begin(), - edge_partition_edgelist_srcs[i].end(), - edge_partition_edgelist_dsts[i].begin(), - edge_value_first, + detail::sort_and_compress_edgelist, + store_transposed>( + std::move(edge_partition_edgelist_srcs[i]), + std::move(edge_partition_edgelist_dsts[i]), + std::make_tuple(std::move((*edge_partition_edgelist_weights)[i]), + std::move((*edge_partition_edgelist_edge_ids)[i]), + std::move((*edge_partition_edgelist_edge_types)[i])), major_range_first, major_hypersparse_first, major_range_last, minor_range_first, minor_range_last, + mem_frugal_threshold, handle.get_stream()); } else { - auto edge_value_first = - thrust::make_zip_iterator((*edge_partition_edgelist_weights)[i].begin(), - (*edge_partition_edgelist_edge_ids)[i].begin()); std::forward_as_tuple(offsets, indices, std::tie(weights, edge_ids), dcs_nzd_vertices) = - detail::compress_edgelist( - edge_partition_edgelist_srcs[i].begin(), - edge_partition_edgelist_srcs[i].end(), - edge_partition_edgelist_dsts[i].begin(), - edge_value_first, + detail::sort_and_compress_edgelist, + store_transposed>( + std::move(edge_partition_edgelist_srcs[i]), + std::move(edge_partition_edgelist_dsts[i]), + std::make_tuple(std::move((*edge_partition_edgelist_weights)[i]), + std::move((*edge_partition_edgelist_edge_ids)[i])), major_range_first, major_hypersparse_first, major_range_last, minor_range_first, minor_range_last, + mem_frugal_threshold, handle.get_stream()); } } else { if (edgelist_edge_types) { - auto edge_value_first = - thrust::make_zip_iterator((*edge_partition_edgelist_weights)[i].begin(), - (*edge_partition_edgelist_edge_types)[i].begin()); std::forward_as_tuple(offsets, indices, std::tie(weights, edge_types), dcs_nzd_vertices) = - detail::compress_edgelist( - edge_partition_edgelist_srcs[i].begin(), - edge_partition_edgelist_srcs[i].end(), - edge_partition_edgelist_dsts[i].begin(), - edge_value_first, + detail::sort_and_compress_edgelist, + store_transposed>( + std::move(edge_partition_edgelist_srcs[i]), + std::move(edge_partition_edgelist_dsts[i]), + std::make_tuple(std::move((*edge_partition_edgelist_weights)[i]), + std::move((*edge_partition_edgelist_edge_types)[i])), major_range_first, major_hypersparse_first, major_range_last, minor_range_first, minor_range_last, + mem_frugal_threshold, handle.get_stream()); } else { - auto edge_value_first = (*edge_partition_edgelist_weights)[i].begin(); std::forward_as_tuple(offsets, indices, weights, dcs_nzd_vertices) = - detail::compress_edgelist( - edge_partition_edgelist_srcs[i].begin(), - edge_partition_edgelist_srcs[i].end(), - edge_partition_edgelist_dsts[i].begin(), - edge_value_first, + detail::sort_and_compress_edgelist( + std::move(edge_partition_edgelist_srcs[i]), + std::move(edge_partition_edgelist_dsts[i]), + std::move((*edge_partition_edgelist_weights)[i]), major_range_first, major_hypersparse_first, major_range_last, minor_range_first, minor_range_last, + mem_frugal_threshold, handle.get_stream()); } } } else { if (edgelist_edge_ids) { if (edgelist_edge_types) { - auto edge_value_first = - thrust::make_zip_iterator((*edge_partition_edgelist_edge_ids)[i].begin(), - (*edge_partition_edgelist_edge_types)[i].begin()); std::forward_as_tuple( offsets, indices, std::tie(edge_ids, edge_types), dcs_nzd_vertices) = - detail::compress_edgelist( - edge_partition_edgelist_srcs[i].begin(), - edge_partition_edgelist_srcs[i].end(), - edge_partition_edgelist_dsts[i].begin(), - edge_value_first, + detail::sort_and_compress_edgelist, + store_transposed>( + std::move(edge_partition_edgelist_srcs[i]), + std::move(edge_partition_edgelist_dsts[i]), + std::make_tuple(std::move((*edge_partition_edgelist_edge_ids)[i]), + std::move((*edge_partition_edgelist_edge_types)[i])), major_range_first, major_hypersparse_first, major_range_last, minor_range_first, minor_range_last, + mem_frugal_threshold, handle.get_stream()); } else { - auto edge_value_first = (*edge_partition_edgelist_edge_ids)[i].begin(); std::forward_as_tuple(offsets, indices, edge_ids, dcs_nzd_vertices) = - detail::compress_edgelist( - edge_partition_edgelist_srcs[i].begin(), - edge_partition_edgelist_srcs[i].end(), - edge_partition_edgelist_dsts[i].begin(), - edge_value_first, + detail::sort_and_compress_edgelist( + std::move(edge_partition_edgelist_srcs[i]), + std::move(edge_partition_edgelist_dsts[i]), + std::move((*edge_partition_edgelist_edge_ids)[i]), major_range_first, major_hypersparse_first, major_range_last, minor_range_first, minor_range_last, + mem_frugal_threshold, handle.get_stream()); } } else { if (edgelist_edge_types) { - auto edge_value_first = (*edge_partition_edgelist_edge_types)[i].begin(); std::forward_as_tuple(offsets, indices, edge_types, dcs_nzd_vertices) = - detail::compress_edgelist( - edge_partition_edgelist_srcs[i].begin(), - edge_partition_edgelist_srcs[i].end(), - edge_partition_edgelist_dsts[i].begin(), - edge_value_first, + detail::sort_and_compress_edgelist( + std::move(edge_partition_edgelist_srcs[i]), + std::move(edge_partition_edgelist_dsts[i]), + std::move((*edge_partition_edgelist_edge_types)[i]), major_range_first, major_hypersparse_first, major_range_last, minor_range_first, minor_range_last, + mem_frugal_threshold, handle.get_stream()); } else { std::forward_as_tuple(offsets, indices, dcs_nzd_vertices) = - detail::compress_edgelist( - edge_partition_edgelist_srcs[i].begin(), - edge_partition_edgelist_srcs[i].end(), - edge_partition_edgelist_dsts[i].begin(), + detail::sort_and_compress_edgelist( + std::move(edge_partition_edgelist_srcs[i]), + std::move(edge_partition_edgelist_dsts[i]), major_range_first, major_hypersparse_first, major_range_last, minor_range_first, minor_range_last, + mem_frugal_threshold, handle.get_stream()); } } } - edge_partition_edgelist_srcs[i].resize(0, handle.get_stream()); - edge_partition_edgelist_srcs[i].shrink_to_fit(handle.get_stream()); - edge_partition_edgelist_dsts[i].resize(0, handle.get_stream()); - edge_partition_edgelist_dsts[i].shrink_to_fit(handle.get_stream()); - if (edge_partition_edgelist_weights) { - (*edge_partition_edgelist_weights)[i].resize(0, handle.get_stream()); - (*edge_partition_edgelist_weights)[i].shrink_to_fit(handle.get_stream()); - } - if (edge_partition_edgelist_edge_ids) { - (*edge_partition_edgelist_edge_ids)[i].resize(0, handle.get_stream()); - (*edge_partition_edgelist_edge_ids)[i].shrink_to_fit(handle.get_stream()); - } - if (edge_partition_edgelist_edge_types) { - (*edge_partition_edgelist_edge_types)[i].resize(0, handle.get_stream()); - (*edge_partition_edgelist_edge_types)[i].shrink_to_fit(handle.get_stream()); - } edge_partition_offsets.push_back(std::move(offsets)); edge_partition_indices.push_back(std::move(indices)); if (edge_partition_weights) { (*edge_partition_weights).push_back(std::move(*weights)); } @@ -954,6 +950,17 @@ create_graph_from_edgelist_impl( // convert edge list (COO) to compressed sparse format (CSR or CSC) + auto total_global_mem = handle.get_device_properties().totalGlobalMem; + size_t element_size = sizeof(vertex_t) * 2; + if (edgelist_weights) { element_size += sizeof(weight_t); } + if (edgelist_edge_ids) { element_size += sizeof(edge_id_t); } + if (edgelist_edge_types) { element_size += sizeof(edge_type_t); } + auto constexpr mem_frugal_ratio = + 0.25; // if the expected temporary buffer size exceeds the mem_frugal_ratio of the + // total_global_mem, switch to the memory frugal approach + auto mem_frugal_threshold = + static_cast(static_cast(total_global_mem / element_size) * mem_frugal_ratio); + rmm::device_uvector offsets(size_t{0}, handle.get_stream()); rmm::device_uvector indices(size_t{0}, handle.get_stream()); std::optional> weights{std::nullopt}; @@ -963,202 +970,130 @@ create_graph_from_edgelist_impl( if (edgelist_weights) { if (edgelist_edge_ids) { if (edgelist_edge_types) { - auto edge_value_first = thrust::make_zip_iterator((*edgelist_weights).begin(), - (*edgelist_edge_ids).begin(), - (*edgelist_edge_types).begin()); std::forward_as_tuple(offsets, indices, std::tie(weights, ids, types), std::ignore) = - detail::compress_edgelist(edgelist_srcs.begin(), - edgelist_srcs.end(), - edgelist_dsts.begin(), - edge_value_first, - vertex_t{0}, - std::optional{std::nullopt}, - num_vertices, - vertex_t{0}, - num_vertices, - handle.get_stream()); + detail::sort_and_compress_edgelist, + store_transposed>( + std::move(edgelist_srcs), + std::move(edgelist_dsts), + std::make_tuple(std::move(*edgelist_weights), + std::move(*edgelist_edge_ids), + std::move(*edgelist_edge_types)), + vertex_t{0}, + std::optional{std::nullopt}, + num_vertices, + vertex_t{0}, + num_vertices, + mem_frugal_threshold, + handle.get_stream()); } else { - auto edge_value_first = - thrust::make_zip_iterator((*edgelist_weights).begin(), (*edgelist_edge_ids).begin()); std::forward_as_tuple(offsets, indices, std::tie(weights, ids), std::ignore) = - detail::compress_edgelist(edgelist_srcs.begin(), - edgelist_srcs.end(), - edgelist_dsts.begin(), - edge_value_first, - vertex_t{0}, - std::optional{std::nullopt}, - num_vertices, - vertex_t{0}, - num_vertices, - handle.get_stream()); + detail::sort_and_compress_edgelist, + store_transposed>( + std::move(edgelist_srcs), + std::move(edgelist_dsts), + std::make_tuple(std::move(*edgelist_weights), std::move(*edgelist_edge_ids)), + vertex_t{0}, + std::optional{std::nullopt}, + num_vertices, + vertex_t{0}, + num_vertices, + mem_frugal_threshold, + handle.get_stream()); } } else { if (edgelist_edge_types) { - auto edge_value_first = - thrust::make_zip_iterator((*edgelist_weights).begin(), (*edgelist_edge_types).begin()); std::forward_as_tuple(offsets, indices, std::tie(weights, types), std::ignore) = - detail::compress_edgelist(edgelist_srcs.begin(), - edgelist_srcs.end(), - edgelist_dsts.begin(), - edge_value_first, - vertex_t{0}, - std::optional{std::nullopt}, - num_vertices, - vertex_t{0}, - num_vertices, - handle.get_stream()); + detail::sort_and_compress_edgelist, + store_transposed>( + std::move(edgelist_srcs), + std::move(edgelist_dsts), + std::make_tuple(std::move(*edgelist_weights), std::move(*edgelist_edge_types)), + vertex_t{0}, + std::optional{std::nullopt}, + num_vertices, + vertex_t{0}, + num_vertices, + mem_frugal_threshold, + handle.get_stream()); } else { - auto edge_value_first = (*edgelist_weights).begin(); std::forward_as_tuple(offsets, indices, weights, std::ignore) = - detail::compress_edgelist(edgelist_srcs.begin(), - edgelist_srcs.end(), - edgelist_dsts.begin(), - edge_value_first, - vertex_t{0}, - std::optional{std::nullopt}, - num_vertices, - vertex_t{0}, - num_vertices, - handle.get_stream()); + detail::sort_and_compress_edgelist( + std::move(edgelist_srcs), + std::move(edgelist_dsts), + std::move(*edgelist_weights), + vertex_t{0}, + std::optional{std::nullopt}, + num_vertices, + vertex_t{0}, + num_vertices, + mem_frugal_threshold, + handle.get_stream()); } } } else { if (edgelist_edge_ids) { if (edgelist_edge_types) { - auto edge_value_first = - thrust::make_zip_iterator((*edgelist_edge_ids).begin(), (*edgelist_edge_types).begin()); std::forward_as_tuple(offsets, indices, std::tie(ids, types), std::ignore) = - detail::compress_edgelist(edgelist_srcs.begin(), - edgelist_srcs.end(), - edgelist_dsts.begin(), - edge_value_first, - vertex_t{0}, - std::optional{std::nullopt}, - num_vertices, - vertex_t{0}, - num_vertices, - handle.get_stream()); + detail::sort_and_compress_edgelist, + store_transposed>( + std::move(edgelist_srcs), + std::move(edgelist_dsts), + std::make_tuple(std::move(*edgelist_edge_ids), std::move(*edgelist_edge_types)), + vertex_t{0}, + std::optional{std::nullopt}, + num_vertices, + vertex_t{0}, + num_vertices, + mem_frugal_threshold, + handle.get_stream()); } else { - auto edge_value_first = (*edgelist_edge_ids).begin(); std::forward_as_tuple(offsets, indices, ids, std::ignore) = - detail::compress_edgelist(edgelist_srcs.begin(), - edgelist_srcs.end(), - edgelist_dsts.begin(), - edge_value_first, - vertex_t{0}, - std::optional{std::nullopt}, - num_vertices, - vertex_t{0}, - num_vertices, - handle.get_stream()); + detail::sort_and_compress_edgelist( + std::move(edgelist_srcs), + std::move(edgelist_dsts), + std::move(*edgelist_edge_ids), + vertex_t{0}, + std::optional{std::nullopt}, + num_vertices, + vertex_t{0}, + num_vertices, + mem_frugal_threshold, + handle.get_stream()); } } else { if (edgelist_edge_types) { - auto edge_value_first = (*edgelist_edge_types).begin(); std::forward_as_tuple(offsets, indices, types, std::ignore) = - detail::compress_edgelist(edgelist_srcs.begin(), - edgelist_srcs.end(), - edgelist_dsts.begin(), - edge_value_first, - vertex_t{0}, - std::optional{std::nullopt}, - num_vertices, - vertex_t{0}, - num_vertices, - handle.get_stream()); + detail::sort_and_compress_edgelist( + std::move(edgelist_srcs), + std::move(edgelist_dsts), + std::move(*edgelist_edge_types), + vertex_t{0}, + std::optional{std::nullopt}, + num_vertices, + vertex_t{0}, + num_vertices, + mem_frugal_threshold, + handle.get_stream()); } else { std::forward_as_tuple(offsets, indices, std::ignore) = - detail::compress_edgelist(edgelist_srcs.begin(), - edgelist_srcs.end(), - edgelist_dsts.begin(), - vertex_t{0}, - std::optional{std::nullopt}, - num_vertices, - vertex_t{0}, - num_vertices, - handle.get_stream()); - } - } - } - - edgelist_srcs.resize(0, handle.get_stream()); - edgelist_srcs.shrink_to_fit(handle.get_stream()); - edgelist_dsts.resize(0, handle.get_stream()); - edgelist_dsts.shrink_to_fit(handle.get_stream()); - if (edgelist_weights) { - (*edgelist_weights).resize(0, handle.get_stream()); - (*edgelist_weights).shrink_to_fit(handle.get_stream()); - } - if (edgelist_edge_ids) { - (*edgelist_edge_ids).resize(0, handle.get_stream()); - (*edgelist_edge_ids).shrink_to_fit(handle.get_stream()); - } - if (edgelist_edge_types) { - (*edgelist_edge_types).resize(0, handle.get_stream()); - (*edgelist_edge_types).shrink_to_fit(handle.get_stream()); - } - - // segmented sort neighbors - - if (weights) { - if (ids) { - if (types) { - detail::sort_adjacency_list( - handle, - raft::device_span(offsets.data(), offsets.size()), - indices.begin(), - indices.end(), - thrust::make_zip_iterator((*weights).begin(), (*ids).begin(), (*types).begin())); - } else { - detail::sort_adjacency_list(handle, - raft::device_span(offsets.data(), offsets.size()), - indices.begin(), - indices.end(), - thrust::make_zip_iterator((*weights).begin(), (*ids).begin())); - } - } else { - if (types) { - detail::sort_adjacency_list( - handle, - raft::device_span(offsets.data(), offsets.size()), - indices.begin(), - indices.end(), - thrust::make_zip_iterator((*weights).begin(), (*types).begin())); - } else { - detail::sort_adjacency_list(handle, - raft::device_span(offsets.data(), offsets.size()), - indices.begin(), - indices.end(), - (*weights).begin()); - } - } - } else { - if (ids) { - if (types) { - detail::sort_adjacency_list(handle, - raft::device_span(offsets.data(), offsets.size()), - indices.begin(), - indices.end(), - thrust::make_zip_iterator((*ids).begin(), (*types).begin())); - } else { - detail::sort_adjacency_list(handle, - raft::device_span(offsets.data(), offsets.size()), - indices.begin(), - indices.end(), - (*ids).begin()); - } - } else { - if (types) { - detail::sort_adjacency_list(handle, - raft::device_span(offsets.data(), offsets.size()), - indices.begin(), - indices.end(), - (*types).begin()); - } else { - detail::sort_adjacency_list(handle, - raft::device_span(offsets.data(), offsets.size()), - indices.begin(), - indices.end()); + detail::sort_and_compress_edgelist( + std::move(edgelist_srcs), + std::move(edgelist_dsts), + vertex_t{0}, + std::optional{std::nullopt}, + num_vertices, + vertex_t{0}, + num_vertices, + mem_frugal_threshold, + handle.get_stream()); } } } diff --git a/cpp/src/structure/detail/structure_utils.cuh b/cpp/src/structure/detail/structure_utils.cuh index f57b549e1ef..01fbccaa53e 100644 --- a/cpp/src/structure/detail/structure_utils.cuh +++ b/cpp/src/structure/detail/structure_utils.cuh @@ -47,57 +47,38 @@ namespace cugraph { namespace detail { -template -struct update_edge_t { - raft::device_span offsets{}; - raft::device_span indices{}; - EdgeValueIterator edge_value_first{}; - vertex_t major_range_first{}; - - __device__ void operator()(typename thrust::iterator_traits::value_type e) const - { - auto s = thrust::get<0>(e); - auto d = thrust::get<1>(e); - auto major = store_transposed ? d : s; - auto minor = store_transposed ? s : d; - auto start = offsets[major - major_range_first]; - auto degree = offsets[(major - major_range_first) + 1] - start; - auto idx = - atomicAdd(&indices[start + degree - 1], vertex_t{1}); // use the last element as a counter - // FIXME: we can actually store minor - minor_range_first instead of minor to save memory if - // minor can be larger than 32 bit but minor - minor_range_first fits within 32 bit - indices[start + idx] = minor; // overwrite the counter only if idx == degree - 1 (no race) - if constexpr (!std::is_same_v) { - auto value = thrust::get<2>(e); - *(edge_value_first + (start + idx)) = value; - } - } -}; - template rmm::device_uvector compute_sparse_offsets( VertexIterator edgelist_major_first, VertexIterator edgelist_major_last, typename thrust::iterator_traits::value_type major_range_first, typename thrust::iterator_traits::value_type major_range_last, + bool edgelist_major_sorted, rmm::cuda_stream_view stream_view) { rmm::device_uvector offsets((major_range_last - major_range_first) + 1, stream_view); - thrust::fill(rmm::exec_policy(stream_view), offsets.begin(), offsets.end(), edge_t{0}); - - auto offset_view = raft::device_span(offsets.data(), offsets.size()); - thrust::for_each(rmm::exec_policy(stream_view), - edgelist_major_first, - edgelist_major_last, - [offset_view, major_range_first] __device__(auto v) { - atomicAdd(&offset_view[v - major_range_first], edge_t{1}); - }); - thrust::exclusive_scan( - rmm::exec_policy(stream_view), offsets.begin(), offsets.end(), offsets.begin()); + if (edgelist_major_sorted) { + offsets.set_element_to_zero_async(0, stream_view); + thrust::upper_bound(rmm::exec_policy(stream_view), + edgelist_major_first, + edgelist_major_last, + thrust::make_counting_iterator(major_range_first), + thrust::make_counting_iterator(major_range_last), + offsets.begin() + 1); + } else { + thrust::fill(rmm::exec_policy(stream_view), offsets.begin(), offsets.end(), edge_t{0}); + + auto offset_view = raft::device_span(offsets.data(), offsets.size()); + thrust::for_each(rmm::exec_policy(stream_view), + edgelist_major_first, + edgelist_major_last, + [offset_view, major_range_first] __device__(auto v) { + atomicAdd(&offset_view[v - major_range_first], edge_t{1}); + }); + + thrust::exclusive_scan( + rmm::exec_policy(stream_view), offsets.begin(), offsets.end(), offsets.begin()); + } return offsets; } @@ -156,61 +137,77 @@ std::tuple, rmm::device_uvector> compress_ } // compress edge list (COO) to CSR (or CSC) or CSR + DCSR (CSC + DCSC) hybrid -template -std::tuple< - rmm::device_uvector, - rmm::device_uvector::value_type>, - decltype(allocate_dataframe_buffer::value_type>(size_t{0}, rmm::cuda_stream_view{})), - std::optional::value_type>>> -compress_edgelist( - VertexIterator edgelist_src_first, - VertexIterator edgelist_src_last, - VertexIterator edgelist_dst_first, - EdgeValueIterator edge_value_first, - typename thrust::iterator_traits::value_type major_range_first, - std::optional::value_type> - major_hypersparse_first, - typename thrust::iterator_traits::value_type major_range_last, - typename thrust::iterator_traits::value_type /* minor_range_first */, - typename thrust::iterator_traits::value_type /* minor_range_last */, +template +std::tuple, + rmm::device_uvector, + decltype(allocate_dataframe_buffer(size_t{0}, rmm::cuda_stream_view{})), + std::optional>> +sort_and_compress_edgelist( + rmm::device_uvector&& edgelist_srcs, + rmm::device_uvector&& edgelist_dsts, + decltype(allocate_dataframe_buffer(0, rmm::cuda_stream_view{}))&& edgelist_values, + vertex_t major_range_first, + std::optional major_hypersparse_first, + vertex_t major_range_last, + vertex_t /* minor_range_first */, + vertex_t /* minor_range_last */, + size_t mem_frugal_threshold, rmm::cuda_stream_view stream_view) { - using vertex_t = std::remove_cv_t::value_type>; - using edge_value_t = - std::remove_cv_t::value_type>; - - auto number_of_edges = - static_cast(thrust::distance(edgelist_src_first, edgelist_src_last)); - - auto offsets = compute_sparse_offsets( - store_transposed ? edgelist_dst_first : edgelist_src_first, - store_transposed ? edgelist_dst_first + number_of_edges : edgelist_src_last, - major_range_first, - major_range_last, - stream_view); - - rmm::device_uvector indices(number_of_edges, stream_view); - thrust::fill(rmm::exec_policy(stream_view), indices.begin(), indices.end(), vertex_t{0}); - auto values = allocate_dataframe_buffer(number_of_edges, stream_view); - - auto offset_view = raft::device_span(offsets.data(), offsets.size()); - auto index_view = raft::device_span(indices.data(), indices.size()); - auto edge_first = thrust::make_zip_iterator( - thrust::make_tuple(edgelist_src_first, edgelist_dst_first, edge_value_first)); - thrust::for_each( - rmm::exec_policy(stream_view), - edge_first, - edge_first + number_of_edges, - update_edge_t{ - offset_view, index_view, get_dataframe_buffer_begin(values), major_range_first}); + auto edgelist_majors = std::move(store_transposed ? edgelist_dsts : edgelist_srcs); + auto edgelist_minors = std::move(store_transposed ? edgelist_srcs : edgelist_dsts); + + rmm::device_uvector offsets(0, stream_view); + rmm::device_uvector indices(0, stream_view); + auto values = allocate_dataframe_buffer(0, stream_view); + auto pair_first = thrust::make_zip_iterator(edgelist_majors.begin(), edgelist_minors.begin()); + if (edgelist_minors.size() > mem_frugal_threshold) { + offsets = compute_sparse_offsets(edgelist_majors.begin(), + edgelist_majors.end(), + major_range_first, + major_range_last, + false, + stream_view); + + auto pivot = major_range_first + static_cast(thrust::distance( + offsets.begin(), + thrust::lower_bound(rmm::exec_policy(stream_view), + offsets.begin(), + offsets.end(), + edgelist_minors.size() / 2))); + auto second_first = + detail::mem_frugal_partition(pair_first, + pair_first + edgelist_minors.size(), + get_dataframe_buffer_begin(edgelist_values), + thrust_tuple_get, 0>{}, + pivot, + stream_view); + thrust::sort_by_key(rmm::exec_policy(stream_view), + pair_first, + std::get<0>(second_first), + get_dataframe_buffer_begin(edgelist_values)); + thrust::sort_by_key(rmm::exec_policy(stream_view), + std::get<0>(second_first), + pair_first + edgelist_minors.size(), + std::get<1>(second_first)); + } else { + thrust::sort_by_key(rmm::exec_policy(stream_view), + pair_first, + pair_first + edgelist_minors.size(), + get_dataframe_buffer_begin(edgelist_values)); + + offsets = compute_sparse_offsets(edgelist_majors.begin(), + edgelist_majors.end(), + major_range_first, + major_range_last, + true, + stream_view); + } + indices = std::move(edgelist_minors); + values = std::move(edgelist_values); + + edgelist_majors.resize(0, stream_view); + edgelist_majors.shrink_to_fit(stream_view); std::optional> dcs_nzd_vertices{std::nullopt}; if (major_hypersparse_first) { @@ -226,47 +223,61 @@ compress_edgelist( } // compress edge list (COO) to CSR (or CSC) or CSR + DCSR (CSC + DCSC) hybrid -template -std::tuple< - rmm::device_uvector, - rmm::device_uvector::value_type>, - std::optional::value_type>>> -compress_edgelist( - VertexIterator edgelist_src_first, - VertexIterator edgelist_src_last, - VertexIterator edgelist_dst_first, - typename thrust::iterator_traits::value_type major_range_first, - std::optional::value_type> - major_hypersparse_first, - typename thrust::iterator_traits::value_type major_range_last, - typename thrust::iterator_traits::value_type /* minor_range_first */, - typename thrust::iterator_traits::value_type /* minor_range_last */, - rmm::cuda_stream_view stream_view) +template +std::tuple, + rmm::device_uvector, + std::optional>> +sort_and_compress_edgelist(rmm::device_uvector&& edgelist_srcs, + rmm::device_uvector&& edgelist_dsts, + vertex_t major_range_first, + std::optional major_hypersparse_first, + vertex_t major_range_last, + vertex_t /* minor_range_first */, + vertex_t /* minor_range_last */, + size_t mem_frugal_threshold, + rmm::cuda_stream_view stream_view) { - using vertex_t = std::remove_cv_t::value_type>; - - auto number_of_edges = - static_cast(thrust::distance(edgelist_src_first, edgelist_src_last)); - - auto offsets = compute_sparse_offsets( - store_transposed ? edgelist_dst_first : edgelist_src_first, - store_transposed ? edgelist_dst_first + number_of_edges : edgelist_src_last, - major_range_first, - major_range_last, - stream_view); - - rmm::device_uvector indices(number_of_edges, stream_view); - thrust::fill(rmm::exec_policy(stream_view), indices.begin(), indices.end(), vertex_t{0}); - - auto offset_view = raft::device_span(offsets.data(), offsets.size()); - auto index_view = raft::device_span(indices.data(), indices.size()); - auto edge_first = - thrust::make_zip_iterator(thrust::make_tuple(edgelist_src_first, edgelist_dst_first)); - thrust::for_each(rmm::exec_policy(stream_view), - edge_first, - edge_first + number_of_edges, - update_edge_t{ - offset_view, index_view, static_cast(nullptr), major_range_first}); + auto edgelist_majors = std::move(store_transposed ? edgelist_dsts : edgelist_srcs); + auto edgelist_minors = std::move(store_transposed ? edgelist_srcs : edgelist_dsts); + + rmm::device_uvector offsets(0, stream_view); + rmm::device_uvector indices(0, stream_view); + auto edge_first = thrust::make_zip_iterator(edgelist_majors.begin(), edgelist_minors.begin()); + if (edgelist_minors.size() > mem_frugal_threshold) { + offsets = compute_sparse_offsets(edgelist_majors.begin(), + edgelist_majors.end(), + major_range_first, + major_range_last, + false, + stream_view); + + auto pivot = major_range_first + static_cast(thrust::distance( + offsets.begin(), + thrust::lower_bound(rmm::exec_policy(stream_view), + offsets.begin(), + offsets.end(), + edgelist_minors.size() / 2))); + auto second_first = + detail::mem_frugal_partition(edge_first, + edge_first + edgelist_minors.size(), + thrust_tuple_get, 0>{}, + pivot, + stream_view); + thrust::sort(rmm::exec_policy(stream_view), edge_first, second_first); + thrust::sort(rmm::exec_policy(stream_view), second_first, edge_first + edgelist_minors.size()); + } else { + thrust::sort(rmm::exec_policy(stream_view), edge_first, edge_first + edgelist_minors.size()); + offsets = compute_sparse_offsets(edgelist_majors.begin(), + edgelist_majors.end(), + major_range_first, + major_range_last, + true, + stream_view); + } + indices = std::move(edgelist_minors); + + edgelist_majors.resize(0, stream_view); + edgelist_majors.shrink_to_fit(stream_view); std::optional> dcs_nzd_vertices{std::nullopt}; if (major_hypersparse_first) { diff --git a/cpp/src/structure/induced_subgraph_impl.cuh b/cpp/src/structure/induced_subgraph_impl.cuh index 950cca5828d..18e1af32a71 100644 --- a/cpp/src/structure/induced_subgraph_impl.cuh +++ b/cpp/src/structure/induced_subgraph_impl.cuh @@ -196,6 +196,7 @@ extract_induced_subgraphs( graph_ids_v.end(), size_t{0}, size_t{subgraph_offsets.size() - 1}, + true, handle.get_stream()); dst_subgraph_offsets = @@ -290,6 +291,7 @@ extract_induced_subgraphs( subgraph_edge_graph_ids.end(), size_t{0}, size_t{subgraph_offsets.size() - 1}, + true, handle.get_stream()); #ifdef TIMING diff --git a/cpp/src/structure/renumber_edgelist_impl.cuh b/cpp/src/structure/renumber_edgelist_impl.cuh index 6bc19ff4fe1..09a4dae6c64 100644 --- a/cpp/src/structure/renumber_edgelist_impl.cuh +++ b/cpp/src/structure/renumber_edgelist_impl.cuh @@ -367,18 +367,19 @@ std::tuple, std::vector, vertex_t> compu rmm::device_uvector sorted_local_vertex_degrees(0, handle.get_stream()); std::optional> stream_pool_indices{ std::nullopt}; // FIXME: move this inside the if statement + + auto constexpr num_chunks = size_t{ + 2}; // tuning parameter, this trade-offs # binary searches (up to num_chunks times more binary + // searches can be necessary if num_unique_majors << edgelist_edge_counts[i]) and temporary + // buffer requirement (cut by num_chunks times), currently set to 2 to avoid peak memory + // usage happening in this part (especially when minor_comm_size is small) + if constexpr (multi_gpu) { auto& comm = handle.get_comms(); auto& minor_comm = handle.get_subcomm(cugraph::partition_manager::minor_comm_name()); auto const minor_comm_rank = minor_comm.get_rank(); auto const minor_comm_size = minor_comm.get_size(); - auto constexpr num_chunks = size_t{ - 2}; // tuning parameter, this trade-offs # binary searches (up to num_chunks times more - // binary searches can be necessary if num_unique_majors << edgelist_edge_counts[i]) and - // temporary buffer requirement (cut by num_chunks times), currently set to 2 to avoid - // peak memory usage happening in this part (especially when minor_comm_size is small) - assert(edgelist_majors.size() == minor_comm_size); auto edge_partition_major_range_sizes = @@ -433,29 +434,30 @@ std::tuple, std::vector, vertex_t> compu sorted_major_degrees.end(), edge_t{0}); - rmm::device_uvector tmp_majors( + rmm::device_uvector tmp_majors(0, loop_stream); + tmp_majors.reserve( (static_cast(edgelist_edge_counts[i]) + (num_chunks - 1)) / num_chunks, - handle.get_stream()); + loop_stream); size_t offset{0}; for (size_t j = 0; j < num_chunks; ++j) { size_t this_chunk_size = - std::min(tmp_majors.size(), static_cast(edgelist_edge_counts[i]) - offset); + std::min(tmp_majors.capacity(), static_cast(edgelist_edge_counts[i]) - offset); + tmp_majors.resize(this_chunk_size, loop_stream); thrust::copy(rmm::exec_policy(loop_stream), edgelist_majors[i] + offset, - edgelist_majors[i] + offset + this_chunk_size, + edgelist_majors[i] + offset + tmp_majors.size(), tmp_majors.begin()); - thrust::sort( - rmm::exec_policy(loop_stream), tmp_majors.begin(), tmp_majors.begin() + this_chunk_size); + thrust::sort(rmm::exec_policy(loop_stream), tmp_majors.begin(), tmp_majors.end()); auto num_unique_majors = thrust::count_if(rmm::exec_policy(loop_stream), thrust::make_counting_iterator(size_t{0}), - thrust::make_counting_iterator(this_chunk_size), + thrust::make_counting_iterator(tmp_majors.size()), is_first_in_run_t{tmp_majors.data()}); rmm::device_uvector tmp_keys(num_unique_majors, loop_stream); rmm::device_uvector tmp_values(num_unique_majors, loop_stream); thrust::reduce_by_key(rmm::exec_policy(loop_stream), tmp_majors.begin(), - tmp_majors.begin() + this_chunk_size, + tmp_majors.end(), thrust::make_constant_iterator(edge_t{1}), tmp_keys.begin(), tmp_values.begin()); @@ -486,44 +488,50 @@ std::tuple, std::vector, vertex_t> compu } else { assert(edgelist_majors.size() == 1); - rmm::device_uvector tmp_majors(edgelist_edge_counts[0], handle.get_stream()); - thrust::copy(handle.get_thrust_policy(), - edgelist_majors[0], - edgelist_majors[0] + edgelist_edge_counts[0], - tmp_majors.begin()); - thrust::sort(handle.get_thrust_policy(), tmp_majors.begin(), tmp_majors.end()); - auto num_unique_majors = - thrust::count_if(handle.get_thrust_policy(), - thrust::make_counting_iterator(size_t{0}), - thrust::make_counting_iterator(tmp_majors.size()), - is_first_in_run_t{tmp_majors.data()}); - rmm::device_uvector tmp_keys(num_unique_majors, handle.get_stream()); - rmm::device_uvector tmp_values(num_unique_majors, handle.get_stream()); - thrust::reduce_by_key(handle.get_thrust_policy(), - tmp_majors.begin(), - tmp_majors.end(), - thrust::make_constant_iterator(edge_t{1}), - tmp_keys.begin(), - tmp_values.begin()); - - tmp_majors.resize(0, handle.get_stream()); - tmp_majors.shrink_to_fit(handle.get_stream()); - sorted_local_vertex_degrees.resize(sorted_local_vertices.size(), handle.get_stream()); thrust::fill(handle.get_thrust_policy(), sorted_local_vertex_degrees.begin(), sorted_local_vertex_degrees.end(), edge_t{0}); - auto kv_pair_first = - thrust::make_zip_iterator(thrust::make_tuple(tmp_keys.begin(), tmp_values.begin())); - thrust::for_each(handle.get_thrust_policy(), - kv_pair_first, - kv_pair_first + tmp_keys.size(), - search_and_increment_degree_t{ - sorted_local_vertices.data(), - static_cast(sorted_local_vertices.size()), - sorted_local_vertex_degrees.data()}); + rmm::device_uvector tmp_majors(0, handle.get_stream()); + tmp_majors.reserve(static_cast(edgelist_edge_counts[0] + (num_chunks - 1)) / num_chunks, + handle.get_stream()); + size_t offset{0}; + for (size_t i = 0; i < num_chunks; ++i) { + size_t this_chunk_size = + std::min(tmp_majors.capacity(), static_cast(edgelist_edge_counts[0]) - offset); + tmp_majors.resize(this_chunk_size, handle.get_stream()); + thrust::copy(handle.get_thrust_policy(), + edgelist_majors[0] + offset, + edgelist_majors[0] + offset + tmp_majors.size(), + tmp_majors.begin()); + thrust::sort(handle.get_thrust_policy(), tmp_majors.begin(), tmp_majors.end()); + auto num_unique_majors = + thrust::count_if(handle.get_thrust_policy(), + thrust::make_counting_iterator(size_t{0}), + thrust::make_counting_iterator(tmp_majors.size()), + is_first_in_run_t{tmp_majors.data()}); + rmm::device_uvector tmp_keys(num_unique_majors, handle.get_stream()); + rmm::device_uvector tmp_values(num_unique_majors, handle.get_stream()); + thrust::reduce_by_key(handle.get_thrust_policy(), + tmp_majors.begin(), + tmp_majors.end(), + thrust::make_constant_iterator(edge_t{1}), + tmp_keys.begin(), + tmp_values.begin()); + + auto kv_pair_first = + thrust::make_zip_iterator(thrust::make_tuple(tmp_keys.begin(), tmp_values.begin())); + thrust::for_each(handle.get_thrust_policy(), + kv_pair_first, + kv_pair_first + tmp_keys.size(), + search_and_increment_degree_t{ + sorted_local_vertices.data(), + static_cast(sorted_local_vertices.size()), + sorted_local_vertex_degrees.data()}); + offset += this_chunk_size; + } } // 4. sort local vertices by degree (descending) diff --git a/cpp/tests/community/mg_egonet_test.cu b/cpp/tests/community/mg_egonet_test.cu index 42a2bba1181..6660eac3cad 100644 --- a/cpp/tests/community/mg_egonet_test.cu +++ b/cpp/tests/community/mg_egonet_test.cu @@ -215,6 +215,7 @@ class Tests_MGEgonet graph_ids_v.end(), size_t{0}, d_mg_edgelist_offsets.size() - 1, + true, handle_->get_stream()); auto [d_reference_src, d_reference_dst, d_reference_wgt, d_reference_offsets] = diff --git a/cpp/tests/structure/mg_induced_subgraph_test.cu b/cpp/tests/structure/mg_induced_subgraph_test.cu index 3f3db7c5278..b7bd22dfa63 100644 --- a/cpp/tests/structure/mg_induced_subgraph_test.cu +++ b/cpp/tests/structure/mg_induced_subgraph_test.cu @@ -210,6 +210,7 @@ class Tests_MGInducedSubgraph graph_ids_v.end(), size_t{0}, size_t{d_subgraph_offsets.size() - 1}, + true, handle_->get_stream()); auto [sg_graph, sg_edge_weights, sg_number_map] = cugraph::test::mg_graph_to_sg_graph( diff --git a/cpp/tests/utilities/test_utilities_impl.cuh b/cpp/tests/utilities/test_utilities_impl.cuh index 3025ca7908b..856c50ad35f 100644 --- a/cpp/tests/utilities/test_utilities_impl.cuh +++ b/cpp/tests/utilities/test_utilities_impl.cuh @@ -183,43 +183,42 @@ graph_to_host_csr( } } + auto total_global_mem = handle.get_device_properties().totalGlobalMem; + size_t element_size = sizeof(vertex_t) * 2; + if (d_wgt) { element_size += sizeof(weight_t); } + auto constexpr mem_frugal_ratio = + 0.25; // if the expected temporary buffer size exceeds the mem_frugal_ratio of the + // total_global_mem, switch to the memory frugal approach + auto mem_frugal_threshold = + static_cast(static_cast(total_global_mem / element_size) * mem_frugal_ratio); + rmm::device_uvector d_offsets(0, handle.get_stream()); if (d_wgt) { std::tie(d_offsets, d_dst, *d_wgt, std::ignore) = - detail::compress_edgelist(d_src.begin(), - d_src.end(), - d_dst.begin(), - d_wgt->begin(), - vertex_t{0}, - std::optional{std::nullopt}, - graph_view.number_of_vertices(), - vertex_t{0}, - graph_view.number_of_vertices(), - handle.get_stream()); - - // segmented sort neighbors - detail::sort_adjacency_list(handle, - raft::device_span(d_offsets.data(), d_offsets.size()), - d_dst.begin(), - d_dst.end(), - d_wgt->begin()); + detail::sort_and_compress_edgelist( + std::move(d_src), + std::move(d_dst), + std::move(*d_wgt), + vertex_t{0}, + std::optional{std::nullopt}, + graph_view.number_of_vertices(), + vertex_t{0}, + graph_view.number_of_vertices(), + mem_frugal_threshold, + handle.get_stream()); } else { std::tie(d_offsets, d_dst, std::ignore) = - detail::compress_edgelist(d_src.begin(), - d_src.end(), - d_dst.begin(), - vertex_t{0}, - std::optional{std::nullopt}, - graph_view.number_of_vertices(), - vertex_t{0}, - graph_view.number_of_vertices(), - handle.get_stream()); - // segmented sort neighbors - detail::sort_adjacency_list(handle, - raft::device_span(d_offsets.data(), d_offsets.size()), - d_dst.begin(), - d_dst.end()); + detail::sort_and_compress_edgelist( + std::move(d_src), + std::move(d_dst), + vertex_t{0}, + std::optional{std::nullopt}, + graph_view.number_of_vertices(), + vertex_t{0}, + graph_view.number_of_vertices(), + mem_frugal_threshold, + handle.get_stream()); } return std::make_tuple(