diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index ddb84d8a0f0..542e9cacb77 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -63,3 +63,23 @@ repos:
              [.]flake8[.]cython$|
              meta[.]yaml$|
              setup[.]cfg$
+  - repo: local
+    hooks:
+      - id: nx-cugraph-meta-data-update
+        name: nx-cugraph meta-data updater
+        entry: bash -c "PYTHONPATH=./python/nx-cugraph python ./python/nx-cugraph/_nx_cugraph/__init__.py"
+        files: ^python/nx-cugraph/
+        types: [python]
+        language: python
+        pass_filenames: false
+        additional_dependencies: ["networkx>=3.2"]
+  - repo: local
+    hooks:
+      - id: nx-cugraph-readme-update
+        name: nx-cugraph README updater
+        entry: bash -c "PYTHONPATH=./python/nx-cugraph python ./python/nx-cugraph/scripts/update_readme.py ./python/nx-cugraph/README.md"
+        files: ^python/nx-cugraph/
+        types_or: [python, markdown]
+        language: python
+        pass_filenames: false
+        additional_dependencies: ["networkx>=3.2"]
diff --git a/ci/test_python.sh b/ci/test_python.sh
index e05160239ab..3a47d7e1490 100755
--- a/ci/test_python.sh
+++ b/ci/test_python.sh
@@ -127,13 +127,6 @@ python -m nx_cugraph.scripts.print_tree --dispatch-name --plc --incomplete --dif
 python -m nx_cugraph.scripts.print_table
 popd
 
-rapids-logger "ensure nx-cugraph autogenerated files are up to date"
-pushd python/nx-cugraph
-make || true
-git diff --exit-code .
-git checkout .
-popd
-
 rapids-logger "pytest cugraph-service (single GPU)"
 ./ci/run_cugraph_service_pytests.sh \
   --verbose \
diff --git a/cpp/include/cugraph/mtmg/instance_manager.hpp b/cpp/include/cugraph/mtmg/instance_manager.hpp
index f60063c4101..a2111804997 100644
--- a/cpp/include/cugraph/mtmg/instance_manager.hpp
+++ b/cpp/include/cugraph/mtmg/instance_manager.hpp
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2023, NVIDIA CORPORATION.
+ * Copyright (c) 2023-2024, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -73,6 +73,21 @@ class instance_manager_t {
     return handle_t(*raft_handle_[gpu_id], thread_id, device_ids_[gpu_id]);
   }
 
+  /**
+   * @brief Get handle for particular GPU
+   *
+   * Return a handle for a particular GPU. In a context-free environment
+   * this lets the caller reconstitute the handle for the right host thread.
+   * It does assume that the caller will not allow multiple threads to
+   * concurrently use a gpu_id/thread_id pair.
+   *
+   * @return a handle for this thread.
+   */
+  handle_t get_handle(int gpu_id, int thread_id = 0)
+  {
+    return handle_t(*raft_handle_[gpu_id], thread_id, device_ids_[gpu_id]);
+  }
+
   /**
    * @brief Reset the thread counter
    *
diff --git a/cpp/include/cugraph/mtmg/vertex_result_view.hpp b/cpp/include/cugraph/mtmg/vertex_result_view.hpp
index 42b80cea62f..cd22fc98f79 100644
--- a/cpp/include/cugraph/mtmg/vertex_result_view.hpp
+++ b/cpp/include/cugraph/mtmg/vertex_result_view.hpp
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2023, NVIDIA CORPORATION.
+ * Copyright (c) 2023-2024, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
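The new get_handle(gpu_id, thread_id) overload above is aimed at callers that already know which GPU a given host thread should drive. A minimal usage sketch, assuming an instance_manager_t created by the usual MTMG setup and that no two host threads reuse the same (gpu_id, thread_id) pair; the function and variable names below are illustrative only and are not part of this change:

#include <cugraph/mtmg/instance_manager.hpp>

// Illustrative sketch (not part of this diff): bind the calling host thread to a
// specific GPU via the new overload, then run per-GPU work with that handle.
// The caller is responsible for ensuring each (gpu_id, thread_id) pair is used by
// only one thread at a time, as the comment above states.
void run_per_gpu_work(cugraph::mtmg::instance_manager_t& manager, int gpu_id, int thread_id)
{
  auto handle = manager.get_handle(gpu_id, thread_id);
  // ... per-GPU work using handle (e.g., graph construction or result gathering) ...
}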
@@ -45,7 +45,8 @@ class vertex_result_view_t : public detail::device_shared_device_span_t vertices, std::vector const& vertex_partition_range_lasts, cugraph::vertex_partition_view_t vertex_partition_view, - std::optional>& renumber_map_view); + std::optional>& renumber_map_view, + result_t default_value = 0); }; } // namespace mtmg diff --git a/cpp/src/c_api/random_walks.cpp b/cpp/src/c_api/random_walks.cpp index fd340a2c8e3..b9a2c8e4f60 100644 --- a/cpp/src/c_api/random_walks.cpp +++ b/cpp/src/c_api/random_walks.cpp @@ -475,6 +475,14 @@ cugraph_error_code_t cugraph_node2vec(const cugraph_resource_handle_t* handle, cugraph_random_walk_result_t** result, cugraph_error_t** error) { + CAPI_EXPECTS(reinterpret_cast(graph)->vertex_type_ == + reinterpret_cast( + start_vertices) + ->type_, + CUGRAPH_INVALID_INPUT, + "vertex type of graph and start_vertices must match", + *error); + cugraph::c_api::node2vec_functor functor( handle, graph, start_vertices, max_length, compress_results, p, q); @@ -528,6 +536,14 @@ cugraph_error_code_t cugraph_uniform_random_walks( cugraph_random_walk_result_t** result, cugraph_error_t** error) { + CAPI_EXPECTS(reinterpret_cast(graph)->vertex_type_ == + reinterpret_cast( + start_vertices) + ->type_, + CUGRAPH_INVALID_INPUT, + "vertex type of graph and start_vertices must match", + *error); + uniform_random_walks_functor functor(handle, graph, start_vertices, max_length); return cugraph::c_api::run_algorithm(graph, functor, result, error); @@ -541,6 +557,14 @@ cugraph_error_code_t cugraph_biased_random_walks( cugraph_random_walk_result_t** result, cugraph_error_t** error) { + CAPI_EXPECTS(reinterpret_cast(graph)->vertex_type_ == + reinterpret_cast( + start_vertices) + ->type_, + CUGRAPH_INVALID_INPUT, + "vertex type of graph and start_vertices must match", + *error); + biased_random_walks_functor functor(handle, graph, start_vertices, max_length); return cugraph::c_api::run_algorithm(graph, functor, result, error); @@ -556,6 +580,14 @@ cugraph_error_code_t cugraph_node2vec_random_walks( cugraph_random_walk_result_t** result, cugraph_error_t** error) { + CAPI_EXPECTS(reinterpret_cast(graph)->vertex_type_ == + reinterpret_cast( + start_vertices) + ->type_, + CUGRAPH_INVALID_INPUT, + "vertex type of graph and start_vertices must match", + *error); + node2vec_random_walks_functor functor(handle, graph, start_vertices, max_length, p, q); return cugraph::c_api::run_algorithm(graph, functor, result, error); diff --git a/cpp/src/mtmg/vertex_result.cu b/cpp/src/mtmg/vertex_result.cu index 0339ff10d0a..1dc6b876d52 100644 --- a/cpp/src/mtmg/vertex_result.cu +++ b/cpp/src/mtmg/vertex_result.cu @@ -19,6 +19,7 @@ #include #include #include +#include #include #include @@ -34,58 +35,71 @@ rmm::device_uvector vertex_result_view_t::gather( raft::device_span vertices, std::vector const& vertex_partition_range_lasts, vertex_partition_view_t vertex_partition_view, - std::optional>& renumber_map_view) + std::optional>& renumber_map_view, + result_t default_value) { - rmm::device_uvector local_vertices(vertices.size(), handle.get_stream()); - rmm::device_uvector vertex_gpu_ids(vertices.size(), handle.get_stream()); - rmm::device_uvector vertex_pos(vertices.size(), handle.get_stream()); - rmm::device_uvector result(vertices.size(), handle.get_stream()); - - raft::copy(local_vertices.data(), vertices.data(), vertices.size(), handle.get_stream()); - cugraph::detail::scalar_fill( - handle.get_stream(), vertex_gpu_ids.data(), vertex_gpu_ids.size(), handle.get_rank()); - 
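All four random-walk entry points above now verify that start_vertices carries the same vertex type as the graph before dispatching. A hedged caller-side sketch of what the guard provides (not part of the diff): the handle, graph, and start-vertices view are assumed to have been created elsewhere, with the graph built on int64 vertices and the view holding int32 values; the type names are the usual cugraph C-API ones.

#include <assert.h>
#include <cugraph_c/algorithms.h>

/* Sketch only: with the new CAPI_EXPECTS checks, a vertex-type mismatch is rejected
 * up front with CUGRAPH_INVALID_INPUT instead of reinterpreting the start vertices. */
void expect_mismatched_starts_rejected(const cugraph_resource_handle_t* handle,
                                       cugraph_graph_t* graph, /* int64 vertices */
                                       const cugraph_type_erased_device_array_view_t* starts /* int32 values */)
{
  cugraph_random_walk_result_t* result = NULL;
  cugraph_error_t* error               = NULL;
  cugraph_error_code_t code =
    cugraph_uniform_random_walks(handle, graph, starts, 4 /* max_length */, &result, &error);
  assert(code == CUGRAPH_INVALID_INPUT);
}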
cugraph::detail::sequence_fill( - handle.get_stream(), vertex_pos.data(), vertex_pos.size(), size_t{0}); - - rmm::device_uvector d_vertex_partition_range_lasts(vertex_partition_range_lasts.size(), - handle.get_stream()); - raft::update_device(d_vertex_partition_range_lasts.data(), - vertex_partition_range_lasts.data(), - vertex_partition_range_lasts.size(), - handle.get_stream()); + auto stream = handle.raft_handle().get_stream(); + + rmm::device_uvector local_vertices(vertices.size(), stream); + rmm::device_uvector vertex_gpu_ids(multi_gpu ? vertices.size() : 0, stream); + rmm::device_uvector vertex_pos(multi_gpu ? vertices.size() : 0, stream); + + raft::copy(local_vertices.data(), vertices.data(), vertices.size(), stream); + + if constexpr (multi_gpu) { + cugraph::detail::scalar_fill( + stream, vertex_gpu_ids.data(), vertex_gpu_ids.size(), handle.get_rank()); + cugraph::detail::sequence_fill(stream, vertex_pos.data(), vertex_pos.size(), size_t{0}); + + auto const comm_size = handle.raft_handle().get_comms().get_size(); + auto const major_comm_size = + handle.raft_handle().get_subcomm(cugraph::partition_manager::major_comm_name()).get_size(); + auto const minor_comm_size = + handle.raft_handle().get_subcomm(cugraph::partition_manager::minor_comm_name()).get_size(); + + std::forward_as_tuple(local_vertices, std::tie(vertex_gpu_ids, vertex_pos), std::ignore) = + groupby_gpu_id_and_shuffle_kv_pairs( + handle.raft_handle().get_comms(), + local_vertices.begin(), + local_vertices.end(), + thrust::make_zip_iterator(vertex_gpu_ids.begin(), vertex_pos.begin()), + cugraph::detail::compute_gpu_id_from_ext_vertex_t{ + comm_size, major_comm_size, minor_comm_size}, + stream); + } if (renumber_map_view) { - cugraph::renumber_ext_vertices( + cugraph::renumber_local_ext_vertices( handle.raft_handle(), local_vertices.data(), local_vertices.size(), renumber_map_view->get(handle).data(), vertex_partition_view.local_vertex_partition_range_first(), vertex_partition_view.local_vertex_partition_range_last()); - } - auto const major_comm_size = - handle.raft_handle().get_subcomm(cugraph::partition_manager::major_comm_name()).get_size(); - auto const minor_comm_size = - handle.raft_handle().get_subcomm(cugraph::partition_manager::minor_comm_name()).get_size(); - - std::forward_as_tuple(local_vertices, std::tie(vertex_gpu_ids, vertex_pos), std::ignore) = - groupby_gpu_id_and_shuffle_kv_pairs( - handle.raft_handle().get_comms(), - local_vertices.begin(), - local_vertices.end(), - thrust::make_zip_iterator(vertex_gpu_ids.begin(), vertex_pos.begin()), - cugraph::detail::compute_gpu_id_from_int_vertex_t{ - raft::device_span(d_vertex_partition_range_lasts.data(), - d_vertex_partition_range_lasts.size()), - major_comm_size, - minor_comm_size}, - handle.get_stream()); + size_t new_size = thrust::distance( + thrust::make_zip_iterator(local_vertices.begin(), vertex_gpu_ids.begin(), vertex_pos.begin()), + thrust::remove_if( + rmm::exec_policy(stream), + thrust::make_zip_iterator( + local_vertices.begin(), vertex_gpu_ids.begin(), vertex_pos.begin()), + thrust::make_zip_iterator(local_vertices.end(), vertex_gpu_ids.end(), vertex_pos.end()), + [check = cugraph::detail::check_out_of_range_t{ + vertex_partition_view.local_vertex_partition_range_first(), + vertex_partition_view.local_vertex_partition_range_last()}] __device__(auto tuple) { + return check(thrust::get<0>(tuple)); + })); + + local_vertices.resize(new_size, stream); + vertex_gpu_ids.resize(new_size, stream); + vertex_pos.resize(new_size, stream); + } // // Now gather 
// - rmm::device_uvector tmp_result(local_vertices.size(), handle.get_stream()); + rmm::device_uvector result(local_vertices.size(), stream); + cugraph::detail::scalar_fill(stream, result.data(), result.size(), default_value); auto& wrapped = this->get(handle); @@ -98,32 +112,36 @@ rmm::device_uvector vertex_result_view_t::gather( return vertex_partition.local_vertex_partition_offset_from_vertex_nocheck(v); })); - thrust::gather(handle.get_thrust_policy(), - iter, - iter + local_vertices.size(), - wrapped.begin(), - tmp_result.begin()); - - // - // Shuffle back - // - std::forward_as_tuple(std::ignore, std::tie(std::ignore, vertex_pos, tmp_result), std::ignore) = - groupby_gpu_id_and_shuffle_kv_pairs( - handle.raft_handle().get_comms(), - vertex_gpu_ids.begin(), - vertex_gpu_ids.end(), - thrust::make_zip_iterator(local_vertices.begin(), vertex_pos.begin(), tmp_result.begin()), - thrust::identity{}, - handle.get_stream()); - - // - // Finally, reorder result - // - thrust::scatter(handle.get_thrust_policy(), - tmp_result.begin(), - tmp_result.end(), - vertex_pos.begin(), - result.begin()); + thrust::gather( + rmm::exec_policy(stream), iter, iter + local_vertices.size(), wrapped.begin(), result.begin()); + + if constexpr (multi_gpu) { + rmm::device_uvector tmp_result(0, stream); + + // + // Shuffle back + // + std::forward_as_tuple(std::ignore, std::tie(std::ignore, vertex_pos, tmp_result), std::ignore) = + groupby_gpu_id_and_shuffle_kv_pairs( + handle.raft_handle().get_comms(), + vertex_gpu_ids.begin(), + vertex_gpu_ids.end(), + thrust::make_zip_iterator(local_vertices.begin(), vertex_pos.begin(), result.begin()), + thrust::identity{}, + stream); + + // + // Finally, reorder result + // + result.resize(tmp_result.size(), stream); + cugraph::detail::scalar_fill(stream, result.data(), result.size(), default_value); + + thrust::scatter(rmm::exec_policy(stream), + tmp_result.begin(), + tmp_result.end(), + vertex_pos.begin(), + result.begin()); + } return result; } @@ -133,84 +151,96 @@ template rmm::device_uvector vertex_result_view_t::gather( raft::device_span vertices, std::vector const& vertex_partition_range_lasts, vertex_partition_view_t vertex_partition_view, - std::optional>& renumber_map_view); + std::optional>& renumber_map_view, + float default_value); template rmm::device_uvector vertex_result_view_t::gather( handle_t const& handle, raft::device_span vertices, std::vector const& vertex_partition_range_lasts, vertex_partition_view_t vertex_partition_view, - std::optional>& renumber_map_view); + std::optional>& renumber_map_view, + float default_value); template rmm::device_uvector vertex_result_view_t::gather( handle_t const& handle, raft::device_span vertices, std::vector const& vertex_partition_range_lasts, vertex_partition_view_t vertex_partition_view, - std::optional>& renumber_map_view); + std::optional>& renumber_map_view, + float default_value); template rmm::device_uvector vertex_result_view_t::gather( handle_t const& handle, raft::device_span vertices, std::vector const& vertex_partition_range_lasts, vertex_partition_view_t vertex_partition_view, - std::optional>& renumber_map_view); + std::optional>& renumber_map_view, + float default_value); template rmm::device_uvector vertex_result_view_t::gather( handle_t const& handle, raft::device_span vertices, std::vector const& vertex_partition_range_lasts, vertex_partition_view_t vertex_partition_view, - std::optional>& renumber_map_view); + std::optional>& renumber_map_view, + double default_value); template rmm::device_uvector 
vertex_result_view_t::gather( handle_t const& handle, raft::device_span vertices, std::vector const& vertex_partition_range_lasts, vertex_partition_view_t vertex_partition_view, - std::optional>& renumber_map_view); + std::optional>& renumber_map_view, + double default_value); template rmm::device_uvector vertex_result_view_t::gather( handle_t const& handle, raft::device_span vertices, std::vector const& vertex_partition_range_lasts, vertex_partition_view_t vertex_partition_view, - std::optional>& renumber_map_view); + std::optional>& renumber_map_view, + double default_value); template rmm::device_uvector vertex_result_view_t::gather( handle_t const& handle, raft::device_span vertices, std::vector const& vertex_partition_range_lasts, vertex_partition_view_t vertex_partition_view, - std::optional>& renumber_map_view); + std::optional>& renumber_map_view, + double default_value); template rmm::device_uvector vertex_result_view_t::gather( handle_t const& handle, raft::device_span vertices, std::vector const& vertex_partition_range_lasts, vertex_partition_view_t vertex_partition_view, - std::optional>& renumber_map_view); + std::optional>& renumber_map_view, + int32_t default_value); template rmm::device_uvector vertex_result_view_t::gather( handle_t const& handle, raft::device_span vertices, std::vector const& vertex_partition_range_lasts, vertex_partition_view_t vertex_partition_view, - std::optional>& renumber_map_view); + std::optional>& renumber_map_view, + int32_t default_value); template rmm::device_uvector vertex_result_view_t::gather( handle_t const& handle, raft::device_span vertices, std::vector const& vertex_partition_range_lasts, vertex_partition_view_t vertex_partition_view, - std::optional>& renumber_map_view); + std::optional>& renumber_map_view, + int64_t default_value); template rmm::device_uvector vertex_result_view_t::gather( handle_t const& handle, raft::device_span vertices, std::vector const& vertex_partition_range_lasts, vertex_partition_view_t vertex_partition_view, - std::optional>& renumber_map_view); + std::optional>& renumber_map_view, + int64_t default_value); } // namespace mtmg } // namespace cugraph diff --git a/cpp/src/prims/detail/optional_dataframe_buffer.hpp b/cpp/src/prims/detail/optional_dataframe_buffer.hpp index 62b2245a651..87c095f8e81 100644 --- a/cpp/src/prims/detail/optional_dataframe_buffer.hpp +++ b/cpp/src/prims/detail/optional_dataframe_buffer.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2023, NVIDIA CORPORATION. + * Copyright (c) 2020-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
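From a caller's point of view, the net effect of the gather() changes above is that vertices with no local result (for example, vertices dropped as out of range after renumbering) now come back filled with default_value instead of uninitialized memory. A sketch of a call site, assuming the view and partition bookkeeping come from the usual MTMG flow; RenumberMapViewType stands in for the renumber-map view type whose template arguments are not visible in this diff, and gather's template parameters are assumed deducible from the arguments:

#include <cugraph/mtmg/handle.hpp>
#include <cugraph/mtmg/vertex_result_view.hpp>

#include <raft/core/device_span.hpp>
#include <rmm/device_uvector.hpp>

#include <optional>
#include <vector>

// Sketch only: gather per-vertex float scores for an arbitrary vertex list, filling
// entries for vertices that are not found locally with 0.0f (the new default_value).
template <typename RenumberMapViewType>
rmm::device_uvector<float> gather_scores(
  cugraph::mtmg::handle_t const& handle,
  cugraph::mtmg::vertex_result_view_t<float>& score_view,
  raft::device_span<int32_t const> vertices,
  std::vector<int32_t> const& vertex_partition_range_lasts,
  cugraph::vertex_partition_view_t<int32_t, true> vertex_partition_view,
  std::optional<RenumberMapViewType>& renumber_map_view)
{
  return score_view.gather(handle,
                           vertices,
                           vertex_partition_range_lasts,
                           vertex_partition_view,
                           renumber_map_view,
                           float{0});  // default for vertices with no local result
}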
@@ -17,6 +17,8 @@ #include +#include + namespace cugraph { namespace detail { @@ -58,12 +60,72 @@ void* get_optional_dataframe_buffer_begin(std::byte& optional_dataframe_buffer) template >* = nullptr> auto get_optional_dataframe_buffer_begin( - std::add_lvalue_reference_t( - size_t{0}, rmm::cuda_stream_view{}))> optional_dataframe_buffer) + std::decay_t(size_t{0}, rmm::cuda_stream_view{}))>& + optional_dataframe_buffer) { return get_dataframe_buffer_begin(optional_dataframe_buffer); } +template >* = nullptr> +void* get_optional_dataframe_buffer_end(std::byte& optional_dataframe_buffer) +{ + return static_cast(nullptr); +} + +template >* = nullptr> +auto get_optional_dataframe_buffer_end( + std::decay_t(size_t{0}, rmm::cuda_stream_view{}))>& + optional_dataframe_buffer) +{ + return get_dataframe_buffer_end(optional_dataframe_buffer); +} + +template >* = nullptr> +void const* get_optional_dataframe_buffer_cbegin(std::byte const& optional_dataframe_buffer) +{ + return static_cast(nullptr); +} + +template >* = nullptr> +auto get_optional_dataframe_buffer_cbegin( + std::decay_t(size_t{0}, rmm::cuda_stream_view{}))> const& + optional_dataframe_buffer) +{ + return get_dataframe_buffer_cbegin(optional_dataframe_buffer); +} + +template >* = nullptr> +void const* get_optional_dataframe_buffer_cend(std::byte const& optional_dataframe_buffer) +{ + return static_cast(nullptr); +} + +template >* = nullptr> +auto get_optional_dataframe_buffer_cend( + std::decay_t(size_t{0}, rmm::cuda_stream_view{}))> const& + optional_dataframe_buffer) +{ + return get_dataframe_buffer_cend(optional_dataframe_buffer); +} + +template >* = nullptr> +void reserve_optional_dataframe_buffer(std::byte& optional_dataframe_buffer, + size_t new_buffer_capacity, + rmm::cuda_stream_view stream_view) +{ + return; +} + +template >* = nullptr> +void reserve_optional_dataframe_buffer( + std::decay_t(size_t{0}, rmm::cuda_stream_view{}))>& + optional_dataframe_buffer, + size_t new_buffer_capacity, + rmm::cuda_stream_view stream_view) +{ + return reserve_dataframe_buffer(optional_dataframe_buffer, new_buffer_capacity, stream_view); +} + template >* = nullptr> void resize_optional_dataframe_buffer(std::byte& optional_dataframe_buffer, size_t new_buffer_size, @@ -74,8 +136,8 @@ void resize_optional_dataframe_buffer(std::byte& optional_dataframe_buffer, template >* = nullptr> void resize_optional_dataframe_buffer( - std::add_lvalue_reference_t( - size_t{0}, rmm::cuda_stream_view{}))> optional_dataframe_buffer, + std::decay_t(size_t{0}, rmm::cuda_stream_view{}))>& + optional_dataframe_buffer, size_t new_buffer_size, rmm::cuda_stream_view stream_view) { @@ -91,13 +153,27 @@ void shrink_to_fit_optional_dataframe_buffer(std::byte& optional_dataframe_buffe template >* = nullptr> void shrink_to_fit_optional_dataframe_buffer( - std::add_lvalue_reference_t( - size_t{0}, rmm::cuda_stream_view{}))> optional_dataframe_buffer, + std::decay_t(size_t{0}, rmm::cuda_stream_view{}))>& + optional_dataframe_buffer, rmm::cuda_stream_view stream_view) { return shrink_to_fit_dataframe_buffer(optional_dataframe_buffer, stream_view); } +template >* = nullptr> +size_t size_optional_dataframe_buffer(std::byte const& optional_dataframe_buffer) +{ + return size_t{0}; +} + +template >* = nullptr> +size_t size_optional_dataframe_buffer( + std::decay_t(size_t{0}, rmm::cuda_stream_view{}))> const& + optional_dataframe_buffer) +{ + return size_dataframe_buffer(optional_dataframe_buffer); +} + } // namespace detail } // namespace cugraph diff --git 
a/cpp/src/prims/kv_store.cuh b/cpp/src/prims/kv_store.cuh index 4c728d58930..bc1d88217f6 100644 --- a/cpp/src/prims/kv_store.cuh +++ b/cpp/src/prims/kv_store.cuh @@ -15,6 +15,8 @@ */ #pragma once +#include "prims/detail/optional_dataframe_buffer.hpp" + #include #include @@ -87,21 +89,9 @@ struct kv_binary_search_contains_op_t { } }; -template +template struct kv_cuco_insert_and_increment_t { - using key_type = typename thrust::iterator_traits::value_type; - using cuco_set_type = - cuco::static_map, - cuda::thread_scope_device, - thrust::equal_to, - cuco::linear_probing<1, // CG size - cuco::murmurhash3_32>, - rmm::mr::stream_allocator_adaptor>, - cuco_storage_type>; - - typename cuco_set_type::ref_type device_ref{}; + RefType device_ref{}; KeyIterator key_first{}; size_t* counter{nullptr}; size_t invalid_idx{}; @@ -112,9 +102,8 @@ struct kv_cuco_insert_and_increment_t { auto [iter, inserted] = device_ref.insert_and_find(pair); if (inserted) { cuda::atomic_ref atomic_counter(*counter); - auto idx = atomic_counter.fetch_add(size_t{1}, cuda::std::memory_order_relaxed); - using ref_type = typename cuco_set_type::ref_type; - cuda::atomic_ref ref( + auto idx = atomic_counter.fetch_add(size_t{1}, cuda::std::memory_order_relaxed); + cuda::atomic_ref ref( (*iter).second); ref.store(idx, cuda::std::memory_order_relaxed); return idx; @@ -124,21 +113,9 @@ struct kv_cuco_insert_and_increment_t { } }; -template +template struct kv_cuco_insert_if_and_increment_t { - using key_type = typename thrust::iterator_traits::value_type; - using cuco_set_type = - cuco::static_map, - cuda::thread_scope_device, - thrust::equal_to, - cuco::linear_probing<1, // CG size - cuco::murmurhash3_32>, - rmm::mr::stream_allocator_adaptor>, - cuco_storage_type>; - - typename cuco_set_type::ref_type device_ref{}; + RefType device_ref{}; KeyIterator key_first{}; StencilIterator stencil_first{}; PredOp pred_op{}; @@ -153,9 +130,8 @@ struct kv_cuco_insert_if_and_increment_t { auto [iter, inserted] = device_ref.insert_and_find(pair); if (inserted) { cuda::atomic_ref atomic_counter(*counter); - auto idx = atomic_counter.fetch_add(size_t{1}, cuda::std::memory_order_relaxed); - using ref_type = typename cuco_set_type::ref_type; - cuda::atomic_ref ref( + auto idx = atomic_counter.fetch_add(size_t{1}, cuda::std::memory_order_relaxed); + cuda::atomic_ref ref( (*iter).second); ref.store(idx, cuda::std::memory_order_relaxed); return idx; @@ -165,27 +141,15 @@ struct kv_cuco_insert_if_and_increment_t { } }; -template +template struct kv_cuco_insert_and_assign_t { - using cuco_set_type = - cuco::static_map, value_t, size_t>, - cuco::extent, - cuda::thread_scope_device, - thrust::equal_to, - cuco::linear_probing<1, // CG size - cuco::murmurhash3_32>, - rmm::mr::stream_allocator_adaptor>, - cuco_storage_type>; - - typename cuco_set_type::ref_type device_ref{}; + RefType device_ref{}; __device__ void operator()(thrust::tuple pair) { auto [iter, inserted] = device_ref.insert_and_find(pair); if (!inserted) { - using ref_type = typename cuco_set_type::ref_type; - cuda::atomic_ref ref( + cuda::atomic_ref ref( (*iter).second); ref.store(thrust::get<1>(pair), cuda::std::memory_order_relaxed); } @@ -228,7 +192,7 @@ template struct kv_cuco_store_find_device_view_t { using key_type = typename ViewType::key_type; using value_type = typename ViewType::value_type; - using cuco_store_device_ref_type = typename ViewType::cuco_set_type::ref_type; + using cuco_store_device_ref_type = typename ViewType::cuco_map_type::ref_type; 
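The change running through the functors above is that they no longer re-derive the full cuco::static_map type from key, value, and allocator parameters; they are templated directly on the map's device ref type, so the map type is spelled out once, inside kv_cuco_store_t. A generic illustration of that pattern follows; it is not cugraph code, and the struct and member names are made up for the example:

#include <cuco/static_map.cuh>

#include <thrust/iterator/iterator_traits.h>

// Illustrative only: a device functor that receives the map's device ref type as a
// template parameter (e.g. decltype(map.ref(cuco::insert_and_find))) instead of
// reconstructing the owning map type, keeping probing/allocator details in one place.
template <typename RefType, typename KeyIterator>
struct insert_and_flag_t {
  RefType device_ref{};
  KeyIterator key_first{};

  __device__ bool operator()(size_t i)
  {
    using key_t = typename thrust::iterator_traits<KeyIterator>::value_type;
    // insert_and_find is the same device-side API the functors in this file rely on
    cuco::pair<key_t, size_t> kv{*(key_first + i), size_t{0}};
    auto [iter, inserted] = device_ref.insert_and_find(kv);
    return inserted;
  }
};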
static_assert(!ViewType::binary_search); @@ -253,7 +217,7 @@ struct kv_cuco_store_find_device_view_t { if constexpr (std::is_arithmetic_v) { return val; } else { - return *((*store_value_first) + val); + return *(store_value_first + val); } } } @@ -341,7 +305,7 @@ class kv_cuco_store_view_t { static constexpr bool binary_search = false; - using cuco_set_type = + using cuco_map_type = cuco::static_map, value_type, size_t>, cuco::extent, @@ -353,14 +317,14 @@ class kv_cuco_store_view_t { cuco_storage_type>; template - kv_cuco_store_view_t(cuco_set_type const* store, + kv_cuco_store_view_t(cuco_map_type const* store, std::enable_if_t, int32_t> = 0) : cuco_store_(store) { } template - kv_cuco_store_view_t(cuco_set_type const* store, + kv_cuco_store_view_t(cuco_map_type const* store, ValueIterator value_first, type invalid_value, std::enable_if_t, int32_t> = 0) @@ -380,12 +344,12 @@ class kv_cuco_store_view_t { rmm::device_uvector indices(thrust::distance(key_first, key_last), stream); auto invalid_idx = cuco_store_->empty_value_sentinel(); cuco_store_->find(key_first, key_last, indices.begin(), stream.value()); - thrust::transform( - rmm::exec_policy(stream), - indices.begin(), - indices.end(), - value_first, - indirection_if_idx_valid_t{store_value_first_, invalid_idx, invalid_value_}); + thrust::transform(rmm::exec_policy(stream), + indices.begin(), + indices.end(), + value_first, + indirection_if_idx_valid_t{ + store_value_first_, invalid_idx, invalid_value_}); } } @@ -418,11 +382,11 @@ class kv_cuco_store_view_t { } private: - cuco_set_type const* cuco_store_{}; - std::conditional_t, ValueIterator, std::byte /* dummy */> + cuco_map_type const* cuco_store_{}; + std::conditional_t, ValueIterator, std::byte /* dummy */> store_value_first_{}; - std::conditional_t, value_type, std::byte /* dummy */> + std::conditional_t, value_type, std::byte /* dummy */> invalid_value_{}; }; @@ -537,7 +501,7 @@ class kv_cuco_store_t { std::invoke_result_t), value_buffer_type&>; - using cuco_set_type = + using cuco_map_type = cuco::static_map, value_t, size_t>, cuco::extent, @@ -548,12 +512,18 @@ class kv_cuco_store_t { rmm::mr::stream_allocator_adaptor>, cuco_storage_type>; - kv_cuco_store_t(rmm::cuda_stream_view stream) {} + kv_cuco_store_t(rmm::cuda_stream_view stream) + : store_values_(allocate_optional_dataframe_buffer< + std::conditional_t, value_t, void>>(0, stream)) + { + } kv_cuco_store_t(size_t capacity, key_t invalid_key, value_t invalid_value, rmm::cuda_stream_view stream) + : store_values_(allocate_optional_dataframe_buffer< + std::conditional_t, value_t, void>>(0, stream)) { allocate(capacity, invalid_key, invalid_value, stream); capacity_ = capacity; @@ -567,7 +537,13 @@ class kv_cuco_store_t { key_t invalid_key, value_t invalid_value, rmm::cuda_stream_view stream) + : store_values_(allocate_optional_dataframe_buffer< + std::conditional_t, value_t, void>>(0, stream)) { + static_assert(std::is_same_v::value_type, key_t>); + static_assert( + std::is_same_v::value_type, value_t>); + auto num_keys = static_cast(thrust::distance(key_first, key_last)); allocate(num_keys, invalid_key, invalid_value, stream); if constexpr (!std::is_arithmetic_v) { invalid_value_ = invalid_value; } @@ -583,6 +559,10 @@ class kv_cuco_store_t { ValueIterator value_first, rmm::cuda_stream_view stream) { + static_assert(std::is_same_v::value_type, key_t>); + static_assert( + std::is_same_v::value_type, value_t>); + auto num_keys = static_cast(thrust::distance(key_first, key_last)); if (num_keys == 0) return; @@ -590,7 +570,7 
@@ class kv_cuco_store_t { auto pair_first = thrust::make_zip_iterator(thrust::make_tuple(key_first, value_first)); size_ += cuco_store_->insert(pair_first, pair_first + num_keys, stream.value()); } else { - auto old_store_value_size = size_dataframe_buffer(store_values_); + auto old_store_value_size = size_optional_dataframe_buffer(store_values_); // FIXME: we can use cuda::atomic instead but currently on a system with x86 + GPU, this // requires placing the atomic variable on managed memory and this adds additional // complication. @@ -601,16 +581,16 @@ class kv_cuco_store_t { rmm::exec_policy(stream), store_value_offsets.begin(), store_value_offsets.end(), - kv_cuco_insert_and_increment_t{ + kv_cuco_insert_and_increment_t{ mutable_device_ref, key_first, counter.data(), std::numeric_limits::max()}); size_ += counter.value(stream); - resize_dataframe_buffer(store_values_, size_, stream); + resize_optional_dataframe_buffer(store_values_, size_, stream); thrust::scatter_if(rmm::exec_policy(stream), value_first, value_first + num_keys, store_value_offsets.begin() /* map */, store_value_offsets.begin() /* stencil */, - get_dataframe_buffer_begin(store_values_), + get_optional_dataframe_buffer_begin(store_values_), is_not_equal_t{std::numeric_limits::max()}); } } @@ -623,6 +603,10 @@ class kv_cuco_store_t { PredOp pred_op, rmm::cuda_stream_view stream) { + static_assert(std::is_same_v::value_type, key_t>); + static_assert( + std::is_same_v::value_type, value_t>); + auto num_keys = static_cast(thrust::distance(key_first, key_last)); if (num_keys == 0) return; @@ -631,31 +615,34 @@ class kv_cuco_store_t { size_ += cuco_store_->insert_if( pair_first, pair_first + num_keys, stencil_first, pred_op, stream.value()); } else { - auto old_store_value_size = size_dataframe_buffer(store_values_); + auto old_store_value_size = size_optional_dataframe_buffer(store_values_); // FIXME: we can use cuda::atomic instead but currently on a system with x86 + GPU, this // requires placing the atomic variable on managed memory and this adds additional // complication. 
rmm::device_scalar counter(old_store_value_size, stream); auto mutable_device_ref = cuco_store_->ref(cuco::insert_and_find); rmm::device_uvector store_value_offsets(num_keys, stream); - thrust::tabulate(rmm::exec_policy(stream), - store_value_offsets.begin(), - store_value_offsets.end(), - kv_cuco_insert_if_and_increment_t{ - mutable_device_ref, - key_first, - stencil_first, - pred_op, - counter.data(), - std::numeric_limits::max()}); + thrust::tabulate( + rmm::exec_policy(stream), + store_value_offsets.begin(), + store_value_offsets.end(), + kv_cuco_insert_if_and_increment_t{mutable_device_ref, + key_first, + stencil_first, + pred_op, + counter.data(), + std::numeric_limits::max()}); size_ += counter.value(stream); - resize_dataframe_buffer(store_values_, size_, stream); + resize_optional_dataframe_buffer(store_values_, size_, stream); thrust::scatter_if(rmm::exec_policy(stream), value_first, value_first + num_keys, store_value_offsets.begin() /* map */, store_value_offsets.begin() /* stencil */, - get_dataframe_buffer_begin(store_values_), + get_optional_dataframe_buffer_begin(store_values_), is_not_equal_t{std::numeric_limits::max()}); } } @@ -666,6 +653,10 @@ class kv_cuco_store_t { ValueIterator value_first, rmm::cuda_stream_view stream) { + static_assert(std::is_same_v::value_type, key_t>); + static_assert( + std::is_same_v::value_type, value_t>); + auto num_keys = static_cast(thrust::distance(key_first, key_last)); if (num_keys == 0) return; @@ -674,14 +665,16 @@ class kv_cuco_store_t { // FIXME: a temporary solution till insert_and_assign is added to // cuco::static_map auto mutable_device_ref = cuco_store_->ref(cuco::insert_and_find); - thrust::for_each(rmm::exec_policy(stream), - pair_first, - pair_first + num_keys, - detail::kv_cuco_insert_and_assign_t{mutable_device_ref}); + thrust::for_each( + rmm::exec_policy(stream), + pair_first, + pair_first + num_keys, + detail::kv_cuco_insert_and_assign_t{ + mutable_device_ref}); // FIXME: this is an upper bound of size_, as some inserts may fail due to existing keys size_ += num_keys; } else { - auto old_store_value_size = size_dataframe_buffer(store_values_); + auto old_store_value_size = size_optional_dataframe_buffer(store_values_); // FIXME: we can use cuda::atomic instead but currently on a system with x86 + GPU, this // requires placing the atomic variable on managed memory and this adds additional // complication. 
@@ -692,16 +685,16 @@ class kv_cuco_store_t { rmm::exec_policy(stream), store_value_offsets.begin(), store_value_offsets.end(), - kv_cuco_insert_and_increment_t{ + kv_cuco_insert_and_increment_t{ mutable_device_ref, key_first, counter.data(), std::numeric_limits::max()}); size_ += counter.value(stream); - resize_dataframe_buffer(store_values_, size_, stream); + resize_optional_dataframe_buffer(store_values_, size_, stream); thrust::scatter_if(rmm::exec_policy(stream), value_first, value_first + num_keys, store_value_offsets.begin() /* map */, store_value_offsets.begin() /* stencil */, - get_dataframe_buffer_begin(store_values_), + get_optional_dataframe_buffer_begin(store_values_), is_not_equal_t{std::numeric_limits::max()}); // now perform assigns (for k,v pairs that failed to insert) @@ -738,19 +731,20 @@ class kv_cuco_store_t { })), stream); - thrust::for_each(rmm::exec_policy(stream), - kv_indices.begin(), - kv_indices.end(), - [key_first, - value_first, - store_value_first = get_dataframe_buffer_begin(store_values_), - device_ref = cuco_store_->ref(cuco::find)] __device__(auto kv_idx) { - size_t store_value_offset{}; - auto found = device_ref.find(*(key_first + kv_idx)); - assert(found != device_ref.end()); - store_value_offset = (*found).second; - *(store_value_first + store_value_offset) = *(value_first + kv_idx); - }); + thrust::for_each( + rmm::exec_policy(stream), + kv_indices.begin(), + kv_indices.end(), + [key_first, + value_first, + store_value_first = get_optional_dataframe_buffer_begin(store_values_), + device_ref = cuco_store_->ref(cuco::find)] __device__(auto kv_idx) { + size_t store_value_offset{}; + auto found = device_ref.find(*(key_first + kv_idx)); + assert(found != device_ref.end()); + store_value_offset = (*found).second; + *(store_value_first + store_value_offset) = *(value_first + kv_idx); + }); } } @@ -774,7 +768,7 @@ class kv_cuco_store_t { thrust::gather(rmm::exec_policy(stream), indices.begin(), indices.end(), - get_dataframe_buffer_begin(store_values_), + get_optional_dataframe_buffer_begin(store_values_), get_dataframe_buffer_begin(values)); } return std::make_tuple(std::move(keys), std::move(values)); @@ -789,12 +783,12 @@ class kv_cuco_store_t { return std::make_tuple(std::move(retrieved_keys), std::move(retrieved_values)); } - cuco_set_type const* cuco_store_ptr() const { return cuco_store_.get(); } + cuco_map_type const* cuco_store_ptr() const { return cuco_store_.get(); } template std::enable_if_t, const_value_iterator> store_value_first() const { - return get_dataframe_buffer_cbegin(store_values_); + return get_optional_dataframe_buffer_cbegin(store_values_); } key_t invalid_key() const { return cuco_store_->empty_key_sentinel(); } @@ -828,7 +822,7 @@ class kv_cuco_store_t { rmm::mr::polymorphic_allocator(rmm::mr::get_current_device_resource()), stream); if constexpr (std::is_arithmetic_v) { cuco_store_ = - std::make_unique(cuco_size, + std::make_unique(cuco_size, cuco::sentinel::empty_key{invalid_key}, cuco::sentinel::empty_value{invalid_value}, thrust::equal_to{}, @@ -839,25 +833,25 @@ class kv_cuco_store_t { stream_adapter, stream.value()); } else { - cuco_store_ = std::make_unique( + cuco_store_ = std::make_unique( cuco_size, cuco::sentinel::empty_key{invalid_key}, cuco::sentinel::empty_value{std::numeric_limits::max()}, thrust::equal_to{}, cuco::linear_probing<1, // CG size cuco::murmurhash3_32>{}, + cuco::thread_scope_device, + cuco_storage_type{}, stream_adapter, - stream); - store_values_ = allocate_dataframe_buffer(0, stream); - 
reserve_dataframe_buffer(store_values_, num_keys, stream); + stream.value()); + reserve_optional_dataframe_buffer(store_values_, num_keys, stream); } } - std::unique_ptr cuco_store_{nullptr}; - std::conditional_t, - decltype(allocate_dataframe_buffer(0, rmm::cuda_stream_view{})), - std::byte /* dummy */> - store_values_{}; + std::unique_ptr cuco_store_{nullptr}; + decltype(allocate_optional_dataframe_buffer< + std::conditional_t, value_t, void>>( + 0, rmm::cuda_stream_view{})) store_values_; std::conditional_t, value_t, std::byte /* dummy */> invalid_value_{}; diff --git a/cpp/tests/prims/mg_per_v_transform_reduce_dst_key_aggregated_outgoing_e.cu b/cpp/tests/prims/mg_per_v_transform_reduce_dst_key_aggregated_outgoing_e.cu index af56807746a..89d3205d051 100644 --- a/cpp/tests/prims/mg_per_v_transform_reduce_dst_key_aggregated_outgoing_e.cu +++ b/cpp/tests/prims/mg_per_v_transform_reduce_dst_key_aggregated_outgoing_e.cu @@ -487,15 +487,12 @@ using Tests_MGPerVTransformReduceDstKeyAggregatedOutgoingE_File = using Tests_MGPerVTransformReduceDstKeyAggregatedOutgoingE_Rmat = Tests_MGPerVTransformReduceDstKeyAggregatedOutgoingE; -// FIXME: this tests do not build as cugrpah::kv_store_t has a build error when use_binary_search = -// false and value_t is thrust::tuple, this will be fixed in a separate PR -#if 0 TEST_P(Tests_MGPerVTransformReduceDstKeyAggregatedOutgoingE_File, CheckInt32Int32FloatTupleIntFloatTransposeFalse) { auto param = GetParam(); run_current_test>(std::get<0>(param), - std::get<1>(param)); + std::get<1>(param)); } TEST_P(Tests_MGPerVTransformReduceDstKeyAggregatedOutgoingE_Rmat, @@ -524,7 +521,6 @@ TEST_P(Tests_MGPerVTransformReduceDstKeyAggregatedOutgoingE_Rmat, std::get<0>(param), cugraph::test::override_Rmat_Usecase_with_cmd_line_arguments(std::get<1>(param))); } -#endif TEST_P(Tests_MGPerVTransformReduceDstKeyAggregatedOutgoingE_File, CheckInt32Int32FloatTransposeFalse) diff --git a/python/cugraph/cugraph/sampling/node2vec.py b/python/cugraph/cugraph/sampling/node2vec.py index bc9b88250af..71fc2969f86 100644 --- a/python/cugraph/cugraph/sampling/node2vec.py +++ b/python/cugraph/cugraph/sampling/node2vec.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022-2023, NVIDIA CORPORATION. +# Copyright (c) 2022-2024, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -16,10 +16,32 @@ node2vec as pylibcugraph_node2vec, ) from cugraph.utilities import ensure_cugraph_obj_for_nx +import warnings import cudf +# FIXME: Move this function to the utility module so that it can be +# shared by other algos +def ensure_valid_dtype(input_graph, start_vertices): + vertex_dtype = input_graph.edgelist.edgelist_df.dtypes.iloc[0] + if isinstance(start_vertices, cudf.Series): + start_vertices_dtype = start_vertices.dtype + else: + start_vertices_dtype = start_vertices.dtypes.iloc[0] + + if start_vertices_dtype != vertex_dtype: + warning_msg = ( + "Node2vec requires 'start_vertices' to match the graph's " + f"'vertex' type. input graph's vertex type is: {vertex_dtype} and got " + f"'start_vertices' of type: {start_vertices_dtype}." 
+ ) + warnings.warn(warning_msg, UserWarning) + start_vertices = start_vertices.astype(vertex_dtype) + + return start_vertices + + def node2vec(G, start_vertices, max_depth=1, compress_result=True, p=1.0, q=1.0): """ Computes random walks for each node in 'start_vertices', under the @@ -120,6 +142,8 @@ def node2vec(G, start_vertices, max_depth=1, compress_result=True, p=1.0, q=1.0) else: start_vertices = G.lookup_internal_vertex_id(start_vertices) + start_vertices = ensure_valid_dtype(G, start_vertices) + vertex_set, edge_set, sizes = pylibcugraph_node2vec( resource_handle=ResourceHandle(), graph=G._plc_graph, diff --git a/python/cugraph/cugraph/tests/centrality/test_batch_betweenness_centrality_mg.py b/python/cugraph/cugraph/tests/centrality/test_batch_betweenness_centrality_mg.py index 8ccbbfc9ec5..1c73ebb0216 100644 --- a/python/cugraph/cugraph/tests/centrality/test_batch_betweenness_centrality_mg.py +++ b/python/cugraph/cugraph/tests/centrality/test_batch_betweenness_centrality_mg.py @@ -1,4 +1,4 @@ -# Copyright (c) 2021-2023, NVIDIA CORPORATION. +# Copyright (c) 2021-2024, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -24,46 +24,49 @@ compare_scores, ) -DIRECTED_GRAPH_OPTIONS = [False, True] -WEIGHTED_GRAPH_OPTIONS = [False, True] -ENDPOINTS_OPTIONS = [False, True] -NORMALIZED_OPTIONS = [False, True] -DEFAULT_EPSILON = 0.0001 -SUBSET_SIZE_OPTIONS = [4, None] -SUBSET_SEED_OPTIONS = [42] - # ============================================================================= # Parameters # ============================================================================= -DATASETS = [karate] -# FIXME: The "preset_gpu_count" from 21.08 and below are currently not -# supported and have been removed -RESULT_DTYPE_OPTIONS = [np.float64] + +DATASETS = [karate] +DEFAULT_EPSILON = 0.0001 +IS_DIRECTED = [False, True] +ENDPOINTS = [False, True] +IS_NORMALIZED = [False, True] +RESULT_DTYPES = [np.float64] +SUBSET_SIZES = [4, None] +SUBSET_SEEDS = [42] +IS_WEIGHTED = [False, True] # ============================================================================= # Pytest Setup / Teardown - called for each test function # ============================================================================= + + def setup_function(): gc.collect() +# ============================================================================= +# Tests +# ============================================================================= + + @pytest.mark.mg @pytest.mark.skipif(is_single_gpu(), reason="skipping MG testing on Single GPU system") -@pytest.mark.parametrize( - "graph_file", DATASETS, ids=[f"dataset={d.get_path().stem}" for d in DATASETS] -) -@pytest.mark.parametrize("directed", DIRECTED_GRAPH_OPTIONS) -@pytest.mark.parametrize("subset_size", SUBSET_SIZE_OPTIONS) -@pytest.mark.parametrize("normalized", NORMALIZED_OPTIONS) +@pytest.mark.parametrize("dataset", DATASETS) +@pytest.mark.parametrize("directed", IS_DIRECTED) +@pytest.mark.parametrize("subset_size", SUBSET_SIZES) +@pytest.mark.parametrize("normalized", IS_NORMALIZED) @pytest.mark.parametrize("weight", [None]) -@pytest.mark.parametrize("endpoints", ENDPOINTS_OPTIONS) -@pytest.mark.parametrize("subset_seed", SUBSET_SEED_OPTIONS) -@pytest.mark.parametrize("result_dtype", RESULT_DTYPE_OPTIONS) +@pytest.mark.parametrize("endpoints", ENDPOINTS) +@pytest.mark.parametrize("subset_seed", SUBSET_SEEDS) 
+@pytest.mark.parametrize("result_dtype", RESULT_DTYPES) def test_mg_betweenness_centrality( - graph_file, + dataset, directed, subset_size, normalized, @@ -74,7 +77,7 @@ def test_mg_betweenness_centrality( dask_client, ): sorted_df = calc_betweenness_centrality( - graph_file, + dataset, directed=directed, normalized=normalized, k=subset_size, @@ -90,3 +93,6 @@ def test_mg_betweenness_centrality( second_key="ref_bc", epsilon=DEFAULT_EPSILON, ) + + # Clean-up stored dataset edge-lists + dataset.unload() diff --git a/python/cugraph/cugraph/tests/centrality/test_batch_edge_betweenness_centrality_mg.py b/python/cugraph/cugraph/tests/centrality/test_batch_edge_betweenness_centrality_mg.py index 154477a1a67..4530dd3da86 100644 --- a/python/cugraph/cugraph/tests/centrality/test_batch_edge_betweenness_centrality_mg.py +++ b/python/cugraph/cugraph/tests/centrality/test_batch_edge_betweenness_centrality_mg.py @@ -1,4 +1,4 @@ -# Copyright (c) 2019-2023, NVIDIA CORPORATION. +# Copyright (c) 2019-2024, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -19,51 +19,49 @@ from cugraph.dask.common.mg_utils import is_single_gpu from cugraph.datasets import karate, netscience -# Get parameters from standard betwenness_centrality_test -# As tests directory is not a module, we need to add it to the path -# FIXME: Test must be reworked to import from 'cugraph.testing' instead of -# importing from other tests -from test_edge_betweenness_centrality import ( - DIRECTED_GRAPH_OPTIONS, - NORMALIZED_OPTIONS, - DEFAULT_EPSILON, - SUBSET_SIZE_OPTIONS, -) - from test_edge_betweenness_centrality import ( calc_edge_betweenness_centrality, compare_scores, ) + # ============================================================================= # Parameters # ============================================================================= -DATASETS = [karate, netscience] -# FIXME: The "preset_gpu_count" from 21.08 and below are not supported and have -# been removed -RESULT_DTYPE_OPTIONS = [np.float32, np.float64] + +DATASETS = [karate, netscience] +IS_DIRECTED = [True, False] +IS_NORMALIZED = [True, False] +DEFAULT_EPSILON = 0.0001 +SUBSET_SIZES = [4, None] +RESULT_DTYPES = [np.float32, np.float64] # ============================================================================= # Pytest Setup / Teardown - called for each test function # ============================================================================= + + def setup_function(): gc.collect() +# ============================================================================= +# Tests +# ============================================================================= + + # FIXME: Fails for directed = False(bc score twice as much) and normalized = True. 
@pytest.mark.mg @pytest.mark.skipif(is_single_gpu(), reason="skipping MG testing on Single GPU system") -@pytest.mark.parametrize( - "graph_file", DATASETS, ids=[f"dataset={d.get_path().stem}" for d in DATASETS] -) -@pytest.mark.parametrize("directed", DIRECTED_GRAPH_OPTIONS) -@pytest.mark.parametrize("subset_size", SUBSET_SIZE_OPTIONS) -@pytest.mark.parametrize("normalized", NORMALIZED_OPTIONS) -@pytest.mark.parametrize("result_dtype", RESULT_DTYPE_OPTIONS) +@pytest.mark.parametrize("dataset", DATASETS) +@pytest.mark.parametrize("directed", IS_DIRECTED) +@pytest.mark.parametrize("subset_size", SUBSET_SIZES) +@pytest.mark.parametrize("normalized", IS_NORMALIZED) +@pytest.mark.parametrize("result_dtype", RESULT_DTYPES) def test_mg_edge_betweenness_centrality( - graph_file, + dataset, directed, subset_size, normalized, @@ -71,7 +69,7 @@ def test_mg_edge_betweenness_centrality( dask_client, ): sorted_df = calc_edge_betweenness_centrality( - graph_file, + dataset, directed=directed, normalized=normalized, k=subset_size, @@ -86,3 +84,5 @@ def test_mg_edge_betweenness_centrality( second_key="ref_bc", epsilon=DEFAULT_EPSILON, ) + # Clean-up stored dataset edge-lists + dataset.unload() diff --git a/python/cugraph/cugraph/tests/centrality/test_betweenness_centrality_mg.py b/python/cugraph/cugraph/tests/centrality/test_betweenness_centrality_mg.py index 1e20287d1e5..c94c2dcaff6 100644 --- a/python/cugraph/cugraph/tests/centrality/test_betweenness_centrality_mg.py +++ b/python/cugraph/cugraph/tests/centrality/test_betweenness_centrality_mg.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022-2023, NVIDIA CORPORATION. +# Copyright (c) 2022-2024, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -15,13 +15,11 @@ import pytest -import dask_cudf import cupy import cudf import cugraph import cugraph.dask as dcg -from cugraph.testing import utils -from pylibcugraph.testing import gen_fixture_params_product +from cugraph.datasets import karate, dolphins # ============================================================================= @@ -33,145 +31,104 @@ def setup_function(): gc.collect() -IS_DIRECTED = [True, False] +# ============================================================================= +# Parameters +# ============================================================================= +DATASETS = [karate, dolphins] +IS_DIRECTED = [True, False] +IS_NORMALIZED = [True, False] +ENDPOINTS = [True, False] +SUBSET_SEEDS = [42, None] +SUBSET_SIZES = [None, 15] +VERTEX_LIST_TYPES = [list, cudf] # ============================================================================= -# Pytest fixtures +# Helper functions # ============================================================================= -datasets = utils.DATASETS_UNDIRECTED - -fixture_params = gen_fixture_params_product( - (datasets, "graph_file"), - ([False, True], "normalized"), - ([False, True], "endpoints"), - ([42, None], "subset_seed"), - ([None, 15], "subset_size"), - (IS_DIRECTED, "directed"), - ([list, cudf], "vertex_list_type"), -) - - -@pytest.fixture(scope="module", params=fixture_params) -def input_combo(request): - """ - Simply return the current combination of params as a dictionary for use in - tests or other parameterized fixtures. 
- """ - parameters = dict( - zip( - ( - "graph_file", - "normalized", - "endpoints", - "subset_seed", - "subset_size", - "directed", - "vertex_list_type", - ), - request.param, - ) + +def get_sg_graph(dataset, directed): + dataset.unload() + G = dataset.get_graph(create_using=cugraph.Graph(directed=directed)) + + return G + + +def get_mg_graph(dataset, directed): + dataset.unload() + ddf = dataset.get_dask_edgelist() + dg = cugraph.Graph(directed=directed) + dg.from_dask_cudf_edgelist( + ddf, + source="src", + destination="dst", + edge_attr="wgt", + renumber=True, + store_transposed=True, ) - return parameters + return dg -@pytest.fixture(scope="module") -def input_expected_output(input_combo): - """ - This fixture returns the inputs and expected results from the - betweenness_centrality algo based on cuGraph betweenness_centrality) which can - be used for validation. - """ +# ============================================================================= +# Tests +# ============================================================================= - input_data_path = input_combo["graph_file"] - normalized = input_combo["normalized"] - endpoints = input_combo["endpoints"] - random_state = input_combo["subset_seed"] - subset_size = input_combo["subset_size"] - directed = input_combo["directed"] - vertex_list_type = input_combo["vertex_list_type"] - G = utils.generate_cugraph_graph_from_file(input_data_path, directed=directed) +@pytest.mark.mg +@pytest.mark.parametrize("dataset", DATASETS) +@pytest.mark.parametrize("directed", IS_DIRECTED) +@pytest.mark.parametrize("normalized", IS_NORMALIZED) +@pytest.mark.parametrize("endpoint", ENDPOINTS) +@pytest.mark.parametrize("subset_seed", SUBSET_SEEDS) +@pytest.mark.parametrize("subset_size", SUBSET_SIZES) +@pytest.mark.parametrize("v_list_type", VERTEX_LIST_TYPES) +def test_dask_mg_betweenness_centrality( + dataset, + directed, + normalized, + endpoint, + subset_seed, + subset_size, + v_list_type, + dask_client, + benchmark, +): + g = get_sg_graph(dataset, directed) + dataset.unload() + dg = get_mg_graph(dataset, directed) + random_state = subset_seed if subset_size is None: k = subset_size elif isinstance(subset_size, int): # Select random vertices - k = G.select_random_vertices( + k = g.select_random_vertices( random_state=random_state, num_vertices=subset_size ) - if vertex_list_type is list: + if v_list_type is list: k = k.to_arrow().to_pylist() print("the seeds are \n", k) - if vertex_list_type is int: + if v_list_type is int: # This internally sample k vertices in betweenness centrality. # Since the nodes that will be sampled by each implementation will # be random, therefore sample all vertices which will make the test # consistent. - k = len(G.nodes()) - - input_combo["k"] = k + k = len(g.nodes()) sg_cugraph_bc = cugraph.betweenness_centrality( - G, k=k, normalized=normalized, endpoints=endpoints, random_state=random_state + g, k=k, normalized=normalized, endpoints=endpoint, random_state=random_state ) - # Save the results back to the input_combo dictionary to prevent redundant - # cuGraph runs. Other tests using the input_combo fixture will look for - # them, and if not present they will have to re-run the same cuGraph call. 
sg_cugraph_bc = sg_cugraph_bc.sort_values("vertex").reset_index(drop=True) - input_combo["sg_cugraph_results"] = sg_cugraph_bc - chunksize = dcg.get_chunksize(input_data_path) - ddf = dask_cudf.read_csv( - input_data_path, - chunksize=chunksize, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) - - dg = cugraph.Graph(directed=directed) - dg.from_dask_cudf_edgelist( - ddf, - source="src", - destination="dst", - edge_attr="value", - renumber=True, - store_transposed=True, - ) - - input_combo["MGGraph"] = dg - - return input_combo - - -# ============================================================================= -# Tests -# ============================================================================= - - -# @pytest.mark.skipif( -# is_single_gpu(), reason="skipping MG testing on Single GPU system" -# ) - - -@pytest.mark.mg -def test_dask_mg_betweenness_centrality(dask_client, benchmark, input_expected_output): - - dg = input_expected_output["MGGraph"] - k = input_expected_output["k"] - endpoints = input_expected_output["endpoints"] - normalized = input_expected_output["normalized"] - random_state = input_expected_output["subset_seed"] mg_bc_results = benchmark( dcg.betweenness_centrality, dg, k=k, normalized=normalized, - endpoints=endpoints, + endpoints=endpoint, random_state=random_state, ) @@ -179,12 +136,13 @@ def test_dask_mg_betweenness_centrality(dask_client, benchmark, input_expected_o mg_bc_results.compute().sort_values("vertex").reset_index(drop=True) )["betweenness_centrality"].to_cupy() - sg_bc_results = ( - input_expected_output["sg_cugraph_results"] - .sort_values("vertex") - .reset_index(drop=True) - )["betweenness_centrality"].to_cupy() + sg_bc_results = (sg_cugraph_bc.sort_values("vertex").reset_index(drop=True))[ + "betweenness_centrality" + ].to_cupy() diff = cupy.isclose(mg_bc_results, sg_bc_results) assert diff.all() + + # Clean-up stored dataset edge-lists + dataset.unload() diff --git a/python/cugraph/cugraph/tests/centrality/test_degree_centrality_mg.py b/python/cugraph/cugraph/tests/centrality/test_degree_centrality_mg.py index 1bef1e0872b..68daff9238c 100644 --- a/python/cugraph/cugraph/tests/centrality/test_degree_centrality_mg.py +++ b/python/cugraph/cugraph/tests/centrality/test_degree_centrality_mg.py @@ -15,12 +15,12 @@ import pytest -import cudf -import dask_cudf import cugraph -from cugraph.testing.utils import RAPIDS_DATASET_ROOT_DIR_PATH +from cugraph.dask.common.mg_utils import is_single_gpu +from cugraph.datasets import karate_asymmetric, polbooks, email_Eu_core from cudf.testing import assert_series_equal + # ============================================================================= # Pytest Setup / Teardown - called for each test function # ============================================================================= @@ -30,44 +30,57 @@ def setup_function(): gc.collect() +# ============================================================================= +# Parameters +# ============================================================================= + + +DATASETS = [karate_asymmetric, polbooks, email_Eu_core] IS_DIRECTED = [True, False] -DATA_PATH = [ - (RAPIDS_DATASET_ROOT_DIR_PATH / "karate-asymmetric.csv").as_posix(), - (RAPIDS_DATASET_ROOT_DIR_PATH / "polbooks.csv").as_posix(), - (RAPIDS_DATASET_ROOT_DIR_PATH / "email-Eu-core.csv").as_posix(), -] + +# ============================================================================= +# Helper functions +# 
============================================================================= -@pytest.mark.mg -@pytest.mark.parametrize("directed", IS_DIRECTED) -@pytest.mark.parametrize("data_file", DATA_PATH) -def test_dask_mg_degree(dask_client, directed, data_file): - - input_data_path = data_file - chunksize = cugraph.dask.get_chunksize(input_data_path) - - ddf = dask_cudf.read_csv( - input_data_path, - chunksize=chunksize, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) +def get_sg_graph(dataset, directed): + dataset.unload() + G = dataset.get_graph(create_using=cugraph.Graph(directed=directed)) - df = cudf.read_csv( - input_data_path, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) + return G + +def get_mg_graph(dataset, directed): + dataset.unload() + ddf = dataset.get_dask_edgelist() dg = cugraph.Graph(directed=directed) - dg.from_dask_cudf_edgelist(ddf, "src", "dst") + dg.from_dask_cudf_edgelist( + ddf, + source="src", + destination="dst", + edge_attr="wgt", + renumber=True, + store_transposed=True, + ) + + return dg + + +# ============================================================================= +# Tests +# ============================================================================= + + +@pytest.mark.mg +@pytest.mark.skipif(is_single_gpu(), reason="skipping MG testing on Single GPU system") +@pytest.mark.parametrize("dataset", DATASETS) +@pytest.mark.parametrize("directed", IS_DIRECTED) +def test_dask_mg_degree(dask_client, dataset, directed): + dg = get_mg_graph(dataset, directed) dg.compute_renumber_edge_list() - g = cugraph.Graph(directed=directed) - g.from_cudf_edgelist(df, "src", "dst") + g = get_sg_graph(dataset, directed) merge_df_in_degree = ( dg.in_degree() @@ -105,3 +118,6 @@ def test_dask_mg_degree(dask_client, directed, data_file): check_names=False, check_dtype=False, ) + + # Clean-up stored dataset edge-lists + dataset.unload() diff --git a/python/cugraph/cugraph/tests/centrality/test_edge_betweenness_centrality_mg.py b/python/cugraph/cugraph/tests/centrality/test_edge_betweenness_centrality_mg.py index 31eb1d50acb..80acfe1c4ad 100644 --- a/python/cugraph/cugraph/tests/centrality/test_edge_betweenness_centrality_mg.py +++ b/python/cugraph/cugraph/tests/centrality/test_edge_betweenness_centrality_mg.py @@ -14,14 +14,9 @@ import gc import pytest -import dask_cudf -from pylibcugraph.testing.utils import gen_fixture_params_product -from cugraph.datasets import karate, dolphins - import cugraph import cugraph.dask as dcg - -# from cugraph.dask.common.mg_utils import is_single_gpu +from cugraph.datasets import karate, dolphins # ============================================================================= @@ -33,79 +28,35 @@ def setup_function(): gc.collect() -IS_DIRECTED = [True, False] -INCLUDE_WEIGHTS = [False, True] -INCLUDE_EDGE_IDS = [False, True] -NORMALIZED_OPTIONS = [False, True] -SUBSET_SIZE_OPTIONS = [4, None] - - -# email_Eu_core is too expensive to test -datasets = [karate, dolphins] - - # ============================================================================= -# Pytest fixtures +# Parameters # ============================================================================= -fixture_params = gen_fixture_params_product( - (datasets, "graph_file"), - (IS_DIRECTED, "directed"), - (INCLUDE_WEIGHTS, "include_weights"), - (INCLUDE_EDGE_IDS, "include_edgeids"), - (NORMALIZED_OPTIONS, "normalized"), - (SUBSET_SIZE_OPTIONS, "subset_size"), -) - - -@pytest.fixture(scope="module", 
params=fixture_params) -def input_combo(request): - """ - Simply return the current combination of params as a dictionary for use in - tests or other parameterized fixtures. - """ - parameters = dict( - zip( - ( - "graph_file", - "directed", - "include_weights", - "include_edge_ids", - "normalized", - "subset_size", - "subset_seed", - ), - request.param, - ) - ) +DATASETS = [karate, dolphins] +IS_DIRECTED = [True, False] +IS_WEIGHTED = [True, False] +INCLUDE_EDGE_IDS = [True, False] +IS_NORMALIZED = [True, False] +SUBSET_SIZES = [4, None] - return parameters +# ============================================================================= +# Helper functions +# ============================================================================= -@pytest.fixture(scope="module") -def input_expected_output(input_combo): - """ - This fixture returns the inputs and expected results from the edge - betweenness centrality algo. - (based on cuGraph edge betweenness centrality) which can be used - for validation. - """ - directed = input_combo["directed"] - normalized = input_combo["normalized"] - k = input_combo["subset_size"] - subset_seed = 42 - edge_ids = input_combo["include_edge_ids"] - weight = input_combo["include_weights"] - df = input_combo["graph_file"].get_edgelist() +def get_sg_graph(dataset, directed, edge_ids): + dataset.unload() + df = dataset.get_edgelist() if edge_ids: if not directed: # Edge ids not supported for undirected graph - return + return None + dtype = df.dtypes.iloc[0] edge_id = "edge_id" - df["edge_id"] = df.index + df[edge_id] = df.index df = df.astype(dtype) else: @@ -115,30 +66,13 @@ def input_expected_output(input_combo): G.from_cudf_edgelist( df, source="src", destination="dst", weight="wgt", edge_id=edge_id ) - if isinstance(k, int): - k = G.select_random_vertices(subset_seed, k) - input_combo["k"] = k - # Save the results back to the input_combo dictionary to prevent redundant - # cuGraph runs. Other tests using the input_combo fixture will look for - # them, and if not present they will have to re-run the same cuGraph call. 
- sg_cugraph_edge_bc = ( - cugraph.edge_betweenness_centrality(G, k, normalized) - .sort_values(["src", "dst"]) - .reset_index(drop=True) - ) + return G - input_data_path = input_combo["graph_file"].get_path() - input_combo["sg_cugraph_results"] = sg_cugraph_edge_bc - chunksize = dcg.get_chunksize(input_data_path) - ddf = dask_cudf.read_csv( - input_data_path, - chunksize=chunksize, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) +def get_mg_graph(dataset, directed, edge_ids, weight): + dataset.unload() + ddf = dataset.get_dask_edgelist() if weight: weight = ddf @@ -154,20 +88,16 @@ def input_expected_output(input_combo): edge_id = None dg = cugraph.Graph(directed=directed) - dg.from_dask_cudf_edgelist( ddf, source="src", destination="dst", - weight="value", + weight="wgt", edge_id=edge_id, renumber=True, ) - input_combo["MGGraph"] = dg - input_combo["include_weights"] = weight - - return input_combo + return dg, weight # ============================================================================= @@ -175,57 +105,79 @@ def input_expected_output(input_combo): # ============================================================================= -# @pytest.mark.skipif( -# is_single_gpu(), reason="skipping MG testing on Single GPU system" -# ) @pytest.mark.mg +@pytest.mark.parametrize("dataset", DATASETS) +@pytest.mark.parametrize("directed", IS_DIRECTED) +@pytest.mark.parametrize("weighted", IS_WEIGHTED) +@pytest.mark.parametrize("edge_ids", INCLUDE_EDGE_IDS) +@pytest.mark.parametrize("normalized", IS_NORMALIZED) +@pytest.mark.parametrize("subset_size", SUBSET_SIZES) def test_dask_mg_edge_betweenness_centrality( - dask_client, benchmark, input_expected_output + dask_client, + dataset, + directed, + weighted, + edge_ids, + normalized, + subset_size, + benchmark, ): - if input_expected_output is not None: - dg = input_expected_output["MGGraph"] - k = input_expected_output["k"] - normalized = input_expected_output["normalized"] - weight = input_expected_output["include_weights"] - if weight is not None: - with pytest.raises(NotImplementedError): - result_edge_bc = benchmark( - dcg.edge_betweenness_centrality, dg, k, normalized, weight=weight - ) - - else: + g = get_sg_graph(dataset, directed, edge_ids) + + if g is None: + pytest.skip("Edge_ids not supported for undirected graph") + + dg, weight = get_mg_graph(dataset, directed, edge_ids, weighted) + subset_seed = 42 + + k = subset_size + if isinstance(k, int): + k = g.select_random_vertices(subset_seed, k) + + sg_cugraph_edge_bc = ( + cugraph.edge_betweenness_centrality(g, k, normalized) + .sort_values(["src", "dst"]) + .reset_index(drop=True) + ) + + if weight is not None: + with pytest.raises(NotImplementedError): result_edge_bc = benchmark( dcg.edge_betweenness_centrality, dg, k, normalized, weight=weight ) - result_edge_bc = ( - result_edge_bc.compute() - .sort_values(["src", "dst"]) - .reset_index(drop=True) - .rename(columns={"betweenness_centrality": "mg_betweenness_centrality"}) - ) - if len(result_edge_bc.columns) > 3: - result_edge_bc = result_edge_bc.rename( - columns={"edge_id": "mg_edge_id"} - ) + else: + result_edge_bc = benchmark( + dcg.edge_betweenness_centrality, dg, k, normalized, weight=weight + ) + result_edge_bc = ( + result_edge_bc.compute() + .sort_values(["src", "dst"]) + .reset_index(drop=True) + .rename(columns={"betweenness_centrality": "mg_betweenness_centrality"}) + ) - expected_output = input_expected_output["sg_cugraph_results"].reset_index( - drop=True - ) - 
result_edge_bc["betweenness_centrality"] = expected_output[ - "betweenness_centrality" - ] - if len(expected_output.columns) > 3: - result_edge_bc["edge_id"] = expected_output["edge_id"] - edge_id_diff = result_edge_bc.query("mg_edge_id != edge_id") - assert len(edge_id_diff) == 0 - - edge_bc_diffs1 = result_edge_bc.query( - "mg_betweenness_centrality - betweenness_centrality > 0.01" - ) - edge_bc_diffs2 = result_edge_bc.query( - "betweenness_centrality - mg_betweenness_centrality < -0.01" - ) + if len(result_edge_bc.columns) > 3: + result_edge_bc = result_edge_bc.rename(columns={"edge_id": "mg_edge_id"}) + + expected_output = sg_cugraph_edge_bc.reset_index(drop=True) + result_edge_bc["betweenness_centrality"] = expected_output[ + "betweenness_centrality" + ] + if len(expected_output.columns) > 3: + result_edge_bc["edge_id"] = expected_output["edge_id"] + edge_id_diff = result_edge_bc.query("mg_edge_id != edge_id") + assert len(edge_id_diff) == 0 + + edge_bc_diffs1 = result_edge_bc.query( + "mg_betweenness_centrality - betweenness_centrality > 0.01" + ) + edge_bc_diffs2 = result_edge_bc.query( + "betweenness_centrality - mg_betweenness_centrality < -0.01" + ) + + assert len(edge_bc_diffs1) == 0 + assert len(edge_bc_diffs2) == 0 - assert len(edge_bc_diffs1) == 0 - assert len(edge_bc_diffs2) == 0 + # Clean-up stored dataset edge-lists + dataset.unload() diff --git a/python/cugraph/cugraph/tests/centrality/test_eigenvector_centrality_mg.py b/python/cugraph/cugraph/tests/centrality/test_eigenvector_centrality_mg.py index e2ce7d2c341..8cd77fb5e24 100644 --- a/python/cugraph/cugraph/tests/centrality/test_eigenvector_centrality_mg.py +++ b/python/cugraph/cugraph/tests/centrality/test_eigenvector_centrality_mg.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022-2023, NVIDIA CORPORATION. +# Copyright (c) 2022-2024, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. 
# You may obtain a copy of the License at @@ -16,11 +16,10 @@ import pytest import cudf -import dask_cudf import cugraph import cugraph.dask as dcg from cugraph.dask.common.mg_utils import is_single_gpu -from cugraph.testing.utils import DATASETS +from cugraph.datasets import karate_disjoint, dolphins, netscience # ============================================================================= @@ -32,28 +31,34 @@ def setup_function(): gc.collect() +# ============================================================================= +# Parameters +# ============================================================================= + + +DATASETS = [karate_disjoint, dolphins, netscience] IS_DIRECTED = [True, False] +# ============================================================================= +# Tests +# ============================================================================= + + @pytest.mark.mg @pytest.mark.skipif(is_single_gpu(), reason="skipping MG testing on Single GPU system") +@pytest.mark.parametrize("dataset", DATASETS) @pytest.mark.parametrize("directed", IS_DIRECTED) -@pytest.mark.parametrize("input_data_path", DATASETS) -def test_dask_mg_eigenvector_centrality(dask_client, directed, input_data_path): - input_data_path = input_data_path.as_posix() +def test_dask_mg_eigenvector_centrality(dask_client, dataset, directed): + input_data_path = dataset.get_path() print(f"dataset={input_data_path}") - chunksize = dcg.get_chunksize(input_data_path) - ddf = dask_cudf.read_csv( - input_data_path, - chunksize=chunksize, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) + dataset.unload() + ddf = dataset.get_dask_edgelist() dg = cugraph.Graph(directed=True) dg.from_dask_cudf_edgelist(ddf, "src", "dst", store_transposed=True) mg_res = dcg.eigenvector_centrality(dg, tol=1e-6) mg_res = mg_res.compute() + import networkx as nx from cugraph.testing import utils @@ -84,21 +89,16 @@ def test_dask_mg_eigenvector_centrality(dask_client, directed, input_data_path): err = err + 1 assert err == 0 + # Clean-up stored dataset edge-lists + dataset.unload() + @pytest.mark.mg def test_dask_mg_eigenvector_centrality_transposed_false(dask_client): - input_data_path = DATASETS[0] - - chunksize = dcg.get_chunksize(input_data_path) - - ddf = dask_cudf.read_csv( - input_data_path, - chunksize=chunksize, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) + dataset = DATASETS[0] + dataset.unload() + ddf = dataset.get_dask_edgelist() dg = cugraph.Graph(directed=True) dg.from_dask_cudf_edgelist(ddf, "src", "dst", store_transposed=False) @@ -110,3 +110,6 @@ def test_dask_mg_eigenvector_centrality_transposed_false(dask_client): with pytest.warns(UserWarning, match=warning_msg): dcg.eigenvector_centrality(dg) + + # Clean-up stored dataset edge-lists + dataset.unload() diff --git a/python/cugraph/cugraph/tests/centrality/test_katz_centrality_mg.py b/python/cugraph/cugraph/tests/centrality/test_katz_centrality_mg.py index 72b81ce50bb..ebbe5974814 100644 --- a/python/cugraph/cugraph/tests/centrality/test_katz_centrality_mg.py +++ b/python/cugraph/cugraph/tests/centrality/test_katz_centrality_mg.py @@ -1,4 +1,4 @@ -# Copyright (c) 2020-2023, NVIDIA CORPORATION. +# Copyright (c) 2020-2024, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. 
# You may obtain a copy of the License at @@ -16,11 +16,10 @@ import pytest import cudf -import dask_cudf import cugraph import cugraph.dask as dcg from cugraph.dask.common.mg_utils import is_single_gpu -from cugraph.testing.utils import RAPIDS_DATASET_ROOT_DIR_PATH +from cugraph.datasets import karate # ============================================================================= @@ -32,26 +31,30 @@ def setup_function(): gc.collect() +# ============================================================================= +# Parameters +# ============================================================================= + + +DATASETS = [karate] IS_DIRECTED = [True, False] +# ============================================================================= +# Tests +# ============================================================================= + + @pytest.mark.mg @pytest.mark.skipif(is_single_gpu(), reason="skipping MG testing on Single GPU system") +@pytest.mark.parametrize("dataset", DATASETS) @pytest.mark.parametrize("directed", IS_DIRECTED) -def test_dask_mg_katz_centrality(dask_client, directed): - - input_data_path = (RAPIDS_DATASET_ROOT_DIR_PATH / "karate.csv").as_posix() +def test_dask_mg_katz_centrality(dask_client, dataset, directed): + input_data_path = dataset.get_path() print(f"dataset={input_data_path}") - chunksize = dcg.get_chunksize(input_data_path) - - ddf = dask_cudf.read_csv( - input_data_path, - chunksize=chunksize, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) + dataset.unload() + ddf = dataset.get_dask_edgelist() dg = cugraph.Graph(directed=True) dg.from_dask_cudf_edgelist(ddf, "src", "dst", store_transposed=True) @@ -92,23 +95,17 @@ def test_dask_mg_katz_centrality(dask_client, directed): err = err + 1 assert err == 0 + # Clean-up stored dataset edge-lists + dataset.unload() + @pytest.mark.mg @pytest.mark.skipif(is_single_gpu(), reason="skipping MG testing on Single GPU system") +@pytest.mark.parametrize("dataset", DATASETS) @pytest.mark.parametrize("directed", IS_DIRECTED) -def test_dask_mg_katz_centrality_nstart(dask_client, directed): - input_data_path = (RAPIDS_DATASET_ROOT_DIR_PATH / "karate.csv").as_posix() - print(f"dataset={input_data_path}") - chunksize = dcg.get_chunksize(input_data_path) - - ddf = dask_cudf.read_csv( - input_data_path, - chunksize=chunksize, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) - +def test_dask_mg_katz_centrality_nstart(dask_client, dataset, directed): + dataset.unload() + ddf = dataset.get_dask_edgelist() dg = cugraph.Graph(directed=True) dg.from_dask_cudf_edgelist(ddf, "src", "dst", store_transposed=True) @@ -139,21 +136,15 @@ def test_dask_mg_katz_centrality_nstart(dask_client, directed): err = err + 1 assert err == 0 + # Clean-up stored dataset edge-lists + dataset.unload() -@pytest.mark.mg -def test_dask_mg_katz_centrality_transposed_false(dask_client): - input_data_path = (RAPIDS_DATASET_ROOT_DIR_PATH / "karate.csv").as_posix() - - chunksize = dcg.get_chunksize(input_data_path) - - ddf = dask_cudf.read_csv( - input_data_path, - chunksize=chunksize, - delimiter=" ", - names=["src", "dst", "value"], - dtype=["int32", "int32", "float32"], - ) +@pytest.mark.mg +@pytest.mark.parametrize("dataset", DATASETS) +def test_dask_mg_katz_centrality_transposed_false(dask_client, dataset): + dataset.unload() + ddf = dataset.get_dask_edgelist() dg = cugraph.Graph(directed=True) dg.from_dask_cudf_edgelist(ddf, "src", "dst", store_transposed=False) @@ -165,3 +156,6 
@@ def test_dask_mg_katz_centrality_transposed_false(dask_client): with pytest.warns(UserWarning, match=warning_msg): dcg.katz_centrality(dg) + + # Clean-up stored dataset edge-lists + dataset.unload() diff --git a/python/cugraph/cugraph/tests/sampling/test_node2vec.py b/python/cugraph/cugraph/tests/sampling/test_node2vec.py index 0bfdd460cae..00c32705338 100644 --- a/python/cugraph/cugraph/tests/sampling/test_node2vec.py +++ b/python/cugraph/cugraph/tests/sampling/test_node2vec.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022-2023, NVIDIA CORPORATION. +# Copyright (c) 2022-2024, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -27,6 +27,7 @@ # ============================================================================= DIRECTED_GRAPH_OPTIONS = [False, True] COMPRESSED = [False, True] +START_VERTICES_TYPE = ["int32", "int64"] LINE = small_line KARATE = karate @@ -150,11 +151,8 @@ def test_node2vec_line(graph_file, directed): @pytest.mark.parametrize(*_get_param_args("graph_file", SMALL_DATASETS)) @pytest.mark.parametrize(*_get_param_args("directed", DIRECTED_GRAPH_OPTIONS)) @pytest.mark.parametrize(*_get_param_args("compress", COMPRESSED)) -def test_node2vec( - graph_file, - directed, - compress, -): +@pytest.mark.parametrize(*_get_param_args("start_vertices_type", START_VERTICES_TYPE)) +def test_node2vec(graph_file, directed, compress, start_vertices_type): dataset_path = graph_file.get_path() cu_M = utils.read_csv_file(dataset_path) @@ -165,8 +163,23 @@ def test_node2vec( ) num_verts = G.number_of_vertices() k = random.randint(6, 12) - start_vertices = cudf.Series(random.sample(range(num_verts), k), dtype="int32") + # FIXME: Random sample can make it hard to debug + start_vertices = cudf.Series( + random.sample(range(num_verts), k), dtype=start_vertices_type + ) max_depth = 5 + + if start_vertices_type == "int64": + warning_msg = ( + "Node2vec requires 'start_vertices' to match the graph's " + "'vertex' type. input graph's vertex type is: int32 and " + "got 'start_vertices' of type: int64." + ) + with pytest.warns(UserWarning, match=warning_msg): + calc_node2vec( + G, start_vertices, max_depth, compress_result=compress, p=0.8, q=0.5 + ) + result, seeds = calc_node2vec( G, start_vertices, max_depth, compress_result=compress, p=0.8, q=0.5 ) diff --git a/python/nx-cugraph/README.md b/python/nx-cugraph/README.md index 1bf310c8c88..77066356a4b 100644 --- a/python/nx-cugraph/README.md +++ b/python/nx-cugraph/README.md @@ -7,11 +7,10 @@ to run supported algorithms with GPU acceleration. ## System Requirements nx-cugraph requires the following: - - * NVIDIA GPU, Pascal architecture or later + * NVIDIA GPU, Volta architecture or later, with [compute capability](https://developer.nvidia.com/cuda-gpus) 7.0+ * CUDA 11.2, 11.4, 11.5, 11.8, or 12.0 - * Python versions 3.9, 3.10, or 3.11 - * NetworkX >= version 3.2 + * Python version 3.9, 3.10, or 3.11 + * NetworkX >= version 3.0 (version 3.2 or higher recommended) More details about system requirements can be found in the [RAPIDS System Requirements documentation](https://docs.rapids.ai/install#system-req). @@ -20,16 +19,25 @@ More details about system requirements can be found in the [RAPIDS System Requir nx-cugraph can be installed using either conda or pip. 
 ### conda
+#### latest nightly version
 ```
 conda install -c rapidsai-nightly -c conda-forge -c nvidia nx-cugraph
 ```
+#### latest stable version
+```
+conda install -c rapidsai -c conda-forge -c nvidia nx-cugraph
+```
 
 ### pip
+#### latest nightly version
+```
+python -m pip install nx-cugraph-cu11 --extra-index-url https://pypi.anaconda.org/rapidsai-wheels-nightly/simple
+```
+#### latest stable version
 ```
 python -m pip install nx-cugraph-cu11 --extra-index-url https://pypi.nvidia.com
 ```
 
 Notes:
-
- * Nightly wheel builds will not be available until the 23.12 release, therefore the index URL for the stable release version is being used in the pip install command above.
+ * The pip example above installs for CUDA 11. To install for CUDA 12, replace `-cu11` with `-cu12`
  * Additional information relevant to installing any RAPIDS package can be found [here](https://rapids.ai/#quick-start).
 
 ## Enabling nx-cugraph
diff --git a/python/nx-cugraph/_nx_cugraph/__init__.py b/python/nx-cugraph/_nx_cugraph/__init__.py
index 8c6a6504675..098de46af8e 100644
--- a/python/nx-cugraph/_nx_cugraph/__init__.py
+++ b/python/nx-cugraph/_nx_cugraph/__init__.py
@@ -23,18 +23,19 @@
 $ python _nx_cugraph/__init__.py
 """
 
-from packaging.version import Version
-
 from _nx_cugraph._version import __version__
 
-_nx_cugraph_version = Version(__version__)
+# This is normally handled by packaging.version.Version, but instead of adding
+# an additional runtime dependency on "packaging", assume __version__ will
+# always be in <major>.<minor>.<build> format.
+(_version_major, _version_minor) = __version__.split(".")[:2]
 
 # Entries between BEGIN and END are automatically generated
 _info = {
     "backend_name": "cugraph",
     "project": "nx-cugraph",
     "package": "nx_cugraph",
-    "url": f"https://github.com/rapidsai/cugraph/tree/branch-{_nx_cugraph_version.major:02}.{_nx_cugraph_version.minor:02}/python/nx-cugraph",
+    "url": f"https://github.com/rapidsai/cugraph/tree/branch-{_version_major:0>2}.{_version_minor:0>2}/python/nx-cugraph",
     "short_summary": "GPU-accelerated backend.",
     # "description": "TODO",
     "functions": {
@@ -261,6 +262,22 @@ def get_info():
 
 if __name__ == "__main__":
     from pathlib import Path
 
+    # This script imports nx_cugraph modules, which imports nx_cugraph runtime
+    # dependencies. The modules do not need the runtime deps, so stub them out
+    # to avoid installing them.
+    class Stub:
+        def __getattr__(self, *args, **kwargs):
+            return Stub()
+
+        def __call__(self, *args, **kwargs):
+            return Stub()
+
+    import sys
+
+    sys.modules["cupy"] = Stub()
+    sys.modules["numpy"] = Stub()
+    sys.modules["pylibcugraph"] = Stub()
+
     from _nx_cugraph.core import main
 
     filepath = Path(__file__)
diff --git a/python/nx-cugraph/_nx_cugraph/core.py b/python/nx-cugraph/_nx_cugraph/core.py
index c4de197f3b1..82ce7bc438a 100644
--- a/python/nx-cugraph/_nx_cugraph/core.py
+++ b/python/nx-cugraph/_nx_cugraph/core.py
@@ -45,18 +45,27 @@ def update_text(text, lines_to_add, target, indent=" " * 8):
     return f"{text[:start]}{begin}{to_add}\n{indent}{text[stop:]}"
 
 
+def dq_repr(s):
+    """Return repr(s) quoted with the double quote preference used by black."""
+    rs = repr(s)
+    if rs.startswith("'") and '"' not in rs:
+        rs = rs.strip("'")
+        return f'"{rs}"'
+    return rs
+
+
 def dict_to_lines(d, *, indent=""):
     for key in sorted(d):
         val = d[key]
         if "\n" not in val:
-            yield f"{indent}{key!r}: {val!r},"
+            yield f"{indent}{dq_repr(key)}: {dq_repr(val)},"
         else:
-            yield f"{indent}{key!r}: ("
+            yield f"{indent}{dq_repr(key)}: ("
             *lines, last_line = val.split("\n")
             for line in lines:
                 line += "\n"
-                yield f"    {indent}{line!r}"
-            yield f"    {indent}{last_line!r}"
+                yield f"    {indent}{dq_repr(line)}"
+            yield f"    {indent}{dq_repr(last_line)}"
             yield f"{indent}),"
 
 
@@ -83,7 +92,7 @@ def main(filepath):
     to_add = []
     for name in sorted(additional_parameters):
         params = additional_parameters[name]
-        to_add.append(f"{name!r}: {{")
+        to_add.append(f"{dq_repr(name)}: {{")
         to_add.extend(dict_to_lines(params, indent=" " * 4))
         to_add.append("},")
     text = update_text(text, to_add, "additional_parameters")
diff --git a/python/nx-cugraph/scripts/update_readme.py b/python/nx-cugraph/scripts/update_readme.py
index 1ab5a76c4c0..fcaa1769d8b 100755
--- a/python/nx-cugraph/scripts/update_readme.py
+++ b/python/nx-cugraph/scripts/update_readme.py
@@ -1,4 +1,3 @@
-#!/usr/bin/env python
 # Copyright (c) 2024, NVIDIA CORPORATION.
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -13,12 +12,13 @@
 # limitations under the License.
 import argparse
 import re
+import urllib.request
 import zlib
 from collections import namedtuple
 from pathlib import Path
 from warnings import warn
 
-from nx_cugraph.scripts.print_tree import create_tree, tree_lines
+_objs_file_url = "https://networkx.org/documentation/stable/objects.inv"
 
 # See: https://sphobjinv.readthedocs.io/en/stable/syntax.html
 DocObject = namedtuple(
@@ -75,6 +75,8 @@ def replace_body(text, match, new_body):
 
 def main(readme_file, objects_filename):
     """``readme_file`` must be readable and writable, so use mode ``"a+"``"""
+    from nx_cugraph.scripts.print_tree import create_tree, tree_lines
+
     # Use the `objects.inv` file to determine URLs. For details about this file, see:
     # https://sphobjinv.readthedocs.io/en/stable/syntax.html
     # We might be better off using a library like that, but roll our own for now.
@@ -190,14 +192,59 @@ def get_payload_internal(keys):
     return text
 
 
+def find_or_download_objs_file(objs_file_dir):
+    """
+    Returns the path to <objs_file_dir>/objects.inv, downloading it from
+    _objs_file_url if it does not already exist.
+ """ + objs_file_path = objs_file_dir / "objects.inv" + if not objs_file_path.exists(): + request = urllib.request.Request(_objs_file_url) + with ( + urllib.request.urlopen(request) as response, + Path(objs_file_path).open("wb") as out, + ): + out.write(response.read()) + return objs_file_path + + if __name__ == "__main__": + # This script imports a nx_cugraph script module, which imports nx_cugraph + # runtime dependencies. The script module does not need the runtime deps, + # so stub them out to avoid installing them. + class Stub: + def __getattr__(self, *args, **kwargs): + return Stub() + + def __call__(self, *args, **kwargs): + return Stub() + + import sys + + sys.modules["cupy"] = Stub() + sys.modules["numpy"] = Stub() + sys.modules["pylibcugraph"] = Stub() + parser = argparse.ArgumentParser( "Update README.md to show NetworkX functions implemented by nx-cugraph" ) parser.add_argument("readme_filename", help="Path to the README.md file") parser.add_argument( - "networkx_objects", help="Path to the objects.inv file from networkx docs" + "networkx_objects", + nargs="?", + default=None, + help="Optional path to the objects.inv file from the NetworkX docs. Default is " + "the objects.inv file in the directory containing the specified README.md. If " + "an objects.inv file does not exist in that location, one will be downloaded " + "and saved to that location.", ) args = parser.parse_args() - with Path(args.readme_filename).open("a+") as readme_file: - main(readme_file, args.networkx_objects) + + readme_filename = args.readme_filename + readme_path = Path(readme_filename) + objects_filename = args.networkx_objects + if objects_filename is None: + objects_filename = find_or_download_objs_file(readme_path.parent) + + with readme_path.open("a+") as readme_file: + main(readme_file, objects_filename)