diff --git a/cpp/src/prims/detail/optional_dataframe_buffer.hpp b/cpp/src/prims/detail/optional_dataframe_buffer.hpp
index 62b2245a651..87c095f8e81 100644
--- a/cpp/src/prims/detail/optional_dataframe_buffer.hpp
+++ b/cpp/src/prims/detail/optional_dataframe_buffer.hpp
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2020-2023, NVIDIA CORPORATION.
+ * Copyright (c) 2020-2024, NVIDIA CORPORATION.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
@@ -17,6 +17,8 @@
 #include
+#include
+
 namespace cugraph {
 namespace detail {
@@ -58,12 +60,72 @@ void* get_optional_dataframe_buffer_begin(std::byte& optional_dataframe_buffer)
 template >* = nullptr>
 auto get_optional_dataframe_buffer_begin(
-  std::add_lvalue_reference_t(
-    size_t{0}, rmm::cuda_stream_view{}))> optional_dataframe_buffer)
+  std::decay_t(size_t{0}, rmm::cuda_stream_view{}))>&
+    optional_dataframe_buffer)
 {
   return get_dataframe_buffer_begin(optional_dataframe_buffer);
 }
+template >* = nullptr>
+void* get_optional_dataframe_buffer_end(std::byte& optional_dataframe_buffer)
+{
+  return static_cast(nullptr);
+}
+
+template >* = nullptr>
+auto get_optional_dataframe_buffer_end(
+  std::decay_t(size_t{0}, rmm::cuda_stream_view{}))>&
+    optional_dataframe_buffer)
+{
+  return get_dataframe_buffer_end(optional_dataframe_buffer);
+}
+
+template >* = nullptr>
+void const* get_optional_dataframe_buffer_cbegin(std::byte const& optional_dataframe_buffer)
+{
+  return static_cast(nullptr);
+}
+
+template >* = nullptr>
+auto get_optional_dataframe_buffer_cbegin(
+  std::decay_t(size_t{0}, rmm::cuda_stream_view{}))> const&
+    optional_dataframe_buffer)
+{
+  return get_dataframe_buffer_cbegin(optional_dataframe_buffer);
+}
+
+template >* = nullptr>
+void const* get_optional_dataframe_buffer_cend(std::byte const& optional_dataframe_buffer)
+{
+  return static_cast(nullptr);
+}
+
+template >* = nullptr>
+auto get_optional_dataframe_buffer_cend(
+  std::decay_t(size_t{0}, rmm::cuda_stream_view{}))> const&
+    optional_dataframe_buffer)
+{
+  return get_dataframe_buffer_cend(optional_dataframe_buffer);
+}
+
+template >* = nullptr>
+void reserve_optional_dataframe_buffer(std::byte& optional_dataframe_buffer,
+                                       size_t new_buffer_capacity,
+                                       rmm::cuda_stream_view stream_view)
+{
+  return;
+}
+
+template >* = nullptr>
+void reserve_optional_dataframe_buffer(
+  std::decay_t(size_t{0}, rmm::cuda_stream_view{}))>&
+    optional_dataframe_buffer,
+  size_t new_buffer_capacity,
+  rmm::cuda_stream_view stream_view)
+{
+  return reserve_dataframe_buffer(optional_dataframe_buffer, new_buffer_capacity, stream_view);
+}
+
 template >* = nullptr>
 void resize_optional_dataframe_buffer(std::byte& optional_dataframe_buffer,
                                       size_t new_buffer_size,
@@ -74,8 +136,8 @@ void resize_optional_dataframe_buffer(std::byte& optional_dataframe_buffer,
 template >* = nullptr>
 void resize_optional_dataframe_buffer(
-  std::add_lvalue_reference_t(
-    size_t{0}, rmm::cuda_stream_view{}))> optional_dataframe_buffer,
+  std::decay_t(size_t{0}, rmm::cuda_stream_view{}))>&
+    optional_dataframe_buffer,
   size_t new_buffer_size,
   rmm::cuda_stream_view stream_view)
 {
@@ -91,13 +153,27 @@ void shrink_to_fit_optional_dataframe_buffer(std::byte& optional_dataframe_buffe
 template >* = nullptr>
 void shrink_to_fit_optional_dataframe_buffer(
-  std::add_lvalue_reference_t(
-    size_t{0}, rmm::cuda_stream_view{}))> optional_dataframe_buffer,
+  std::decay_t(size_t{0}, rmm::cuda_stream_view{}))>&
+    optional_dataframe_buffer,
   rmm::cuda_stream_view stream_view)
 {
   return shrink_to_fit_dataframe_buffer(optional_dataframe_buffer, stream_view);
 }
+template >* = nullptr>
+size_t size_optional_dataframe_buffer(std::byte const& optional_dataframe_buffer)
+{
+  return size_t{0};
+}
+
+template >* = nullptr>
+size_t size_optional_dataframe_buffer(
+  std::decay_t(size_t{0}, rmm::cuda_stream_view{}))> const&
+    optional_dataframe_buffer)
+{
+  return size_dataframe_buffer(optional_dataframe_buffer);
+}
+
 }  // namespace detail
 }  // namespace cugraph
diff --git a/cpp/src/prims/kv_store.cuh b/cpp/src/prims/kv_store.cuh
index 4c728d58930..bc1d88217f6 100644
--- a/cpp/src/prims/kv_store.cuh
+++ b/cpp/src/prims/kv_store.cuh
@@ -15,6 +15,8 @@
 */
 #pragma once
+#include "prims/detail/optional_dataframe_buffer.hpp"
+
 #include
 #include
@@ -87,21 +89,9 @@ struct kv_binary_search_contains_op_t {
   }
 };

-template
+template
 struct kv_cuco_insert_and_increment_t {
-  using key_type = typename thrust::iterator_traits::value_type;
-  using cuco_set_type =
-    cuco::static_map,
-                     cuda::thread_scope_device,
-                     thrust::equal_to,
-                     cuco::linear_probing<1,  // CG size
-                                          cuco::murmurhash3_32>,
-                     rmm::mr::stream_allocator_adaptor>,
-                     cuco_storage_type>;
-
-  typename cuco_set_type::ref_type device_ref{};
+  RefType device_ref{};
   KeyIterator key_first{};
   size_t* counter{nullptr};
   size_t invalid_idx{};
@@ -112,9 +102,8 @@ struct kv_cuco_insert_and_increment_t {
     auto [iter, inserted] = device_ref.insert_and_find(pair);
     if (inserted) {
       cuda::atomic_ref atomic_counter(*counter);
-      auto idx = atomic_counter.fetch_add(size_t{1}, cuda::std::memory_order_relaxed);
-      using ref_type = typename cuco_set_type::ref_type;
-      cuda::atomic_ref ref(
+      auto idx = atomic_counter.fetch_add(size_t{1}, cuda::std::memory_order_relaxed);
+      cuda::atomic_ref ref(
         (*iter).second);
       ref.store(idx, cuda::std::memory_order_relaxed);
       return idx;
@@ -124,21 +113,9 @@ struct kv_cuco_insert_and_increment_t {
   }
 };

-template
+template
 struct kv_cuco_insert_if_and_increment_t {
-  using key_type = typename thrust::iterator_traits::value_type;
-  using cuco_set_type =
-    cuco::static_map,
-                     cuda::thread_scope_device,
-                     thrust::equal_to,
-                     cuco::linear_probing<1,  // CG size
-                                          cuco::murmurhash3_32>,
-                     rmm::mr::stream_allocator_adaptor>,
-                     cuco_storage_type>;
-
-  typename cuco_set_type::ref_type device_ref{};
+  RefType device_ref{};
   KeyIterator key_first{};
   StencilIterator stencil_first{};
   PredOp pred_op{};
@@ -153,9 +130,8 @@ struct kv_cuco_insert_if_and_increment_t {
     auto [iter, inserted] = device_ref.insert_and_find(pair);
     if (inserted) {
       cuda::atomic_ref atomic_counter(*counter);
-      auto idx = atomic_counter.fetch_add(size_t{1}, cuda::std::memory_order_relaxed);
-      using ref_type = typename cuco_set_type::ref_type;
-      cuda::atomic_ref ref(
+      auto idx = atomic_counter.fetch_add(size_t{1}, cuda::std::memory_order_relaxed);
+      cuda::atomic_ref ref(
         (*iter).second);
       ref.store(idx, cuda::std::memory_order_relaxed);
       return idx;
@@ -165,27 +141,15 @@ struct kv_cuco_insert_if_and_increment_t {
   }
 };

-template
+template
 struct kv_cuco_insert_and_assign_t {
-  using cuco_set_type =
-    cuco::static_map, value_t, size_t>,
-                     cuco::extent,
-                     cuda::thread_scope_device,
-                     thrust::equal_to,
-                     cuco::linear_probing<1,  // CG size
-                                          cuco::murmurhash3_32>,
-                     rmm::mr::stream_allocator_adaptor>,
-                     cuco_storage_type>;
-
-  typename cuco_set_type::ref_type device_ref{};
+  RefType device_ref{};

   __device__ void operator()(thrust::tuple pair)
   {
     auto [iter, inserted] = device_ref.insert_and_find(pair);
     if (!inserted) {
-      using ref_type = typename cuco_set_type::ref_type;
-      cuda::atomic_ref ref(
+      cuda::atomic_ref ref(
        (*iter).second);
       ref.store(thrust::get<1>(pair), cuda::std::memory_order_relaxed);
     }
@@ -228,7 +192,7 @@ template
 struct kv_cuco_store_find_device_view_t {
   using key_type   = typename ViewType::key_type;
   using value_type = typename ViewType::value_type;
-  using cuco_store_device_ref_type = typename ViewType::cuco_set_type::ref_type;
+  using cuco_store_device_ref_type = typename ViewType::cuco_map_type::ref_type;

   static_assert(!ViewType::binary_search);
@@ -253,7 +217,7 @@ struct kv_cuco_store_find_device_view_t {
       if constexpr (std::is_arithmetic_v) {
         return val;
       } else {
-        return *((*store_value_first) + val);
+        return *(store_value_first + val);
       }
     }
   }
@@ -341,7 +305,7 @@ class kv_cuco_store_view_t {
   static constexpr bool binary_search = false;

-  using cuco_set_type =
+  using cuco_map_type =
     cuco::static_map, value_type, size_t>,
                      cuco::extent,
                      cuda::thread_scope_device,
                      thrust::equal_to,
                      cuco::linear_probing<1,  // CG size
                                           cuco::murmurhash3_32>,
                      rmm::mr::stream_allocator_adaptor>,
                      cuco_storage_type>;

   template
-  kv_cuco_store_view_t(cuco_set_type const* store,
+  kv_cuco_store_view_t(cuco_map_type const* store,
                        std::enable_if_t, int32_t> = 0)
     : cuco_store_(store)
   {
   }

   template
-  kv_cuco_store_view_t(cuco_set_type const* store,
+  kv_cuco_store_view_t(cuco_map_type const* store,
                        ValueIterator value_first,
                        type invalid_value,
                        std::enable_if_t, int32_t> = 0)
@@ -380,12 +344,12 @@ class kv_cuco_store_view_t {
       rmm::device_uvector indices(thrust::distance(key_first, key_last), stream);
       auto invalid_idx = cuco_store_->empty_value_sentinel();
       cuco_store_->find(key_first, key_last, indices.begin(), stream.value());
-      thrust::transform(
-        rmm::exec_policy(stream),
-        indices.begin(),
-        indices.end(),
-        value_first,
-        indirection_if_idx_valid_t{store_value_first_, invalid_idx, invalid_value_});
+      thrust::transform(rmm::exec_policy(stream),
+                        indices.begin(),
+                        indices.end(),
+                        value_first,
+                        indirection_if_idx_valid_t{
+                          store_value_first_, invalid_idx, invalid_value_});
     }
   }
@@ -418,11 +382,11 @@ class kv_cuco_store_view_t {
   }

  private:
-  cuco_set_type const* cuco_store_{};
-  std::conditional_t, ValueIterator, std::byte /* dummy */>
+  cuco_map_type const* cuco_store_{};
+  std::conditional_t, ValueIterator, std::byte /* dummy */>
     store_value_first_{};

-  std::conditional_t, value_type, std::byte /* dummy */>
+  std::conditional_t, value_type, std::byte /* dummy */>
     invalid_value_{};
 };
@@ -537,7 +501,7 @@ class kv_cuco_store_t {
     std::invoke_result_t), value_buffer_type&>;

-  using cuco_set_type =
+  using cuco_map_type =
     cuco::static_map, value_t, size_t>,
                      cuco::extent,
                      cuda::thread_scope_device,
                      thrust::equal_to,
                      cuco::linear_probing<1,  // CG size
                                           cuco::murmurhash3_32>,
                      rmm::mr::stream_allocator_adaptor>,
                      cuco_storage_type>;

-  kv_cuco_store_t(rmm::cuda_stream_view stream) {}
+  kv_cuco_store_t(rmm::cuda_stream_view stream)
+    : store_values_(allocate_optional_dataframe_buffer<
+                    std::conditional_t, value_t, void>>(0, stream))
+  {
+  }

   kv_cuco_store_t(size_t capacity,
                   key_t invalid_key,
                   value_t invalid_value,
                   rmm::cuda_stream_view stream)
+    : store_values_(allocate_optional_dataframe_buffer<
+                    std::conditional_t, value_t, void>>(0, stream))
   {
     allocate(capacity, invalid_key, invalid_value, stream);
     capacity_ = capacity;
@@ -567,7 +537,13 @@ class kv_cuco_store_t {
                   key_t invalid_key,
                   value_t invalid_value,
                   rmm::cuda_stream_view stream)
+    : store_values_(allocate_optional_dataframe_buffer<
+                    std::conditional_t, value_t, void>>(0, stream))
   {
+    static_assert(std::is_same_v::value_type, key_t>);
+    static_assert(
+      std::is_same_v::value_type, value_t>);
+
     auto num_keys = static_cast(thrust::distance(key_first,
                                                   key_last));
     allocate(num_keys, invalid_key, invalid_value, stream);
     if constexpr (!std::is_arithmetic_v) { invalid_value_ = invalid_value; }
@@ -583,6 +559,10 @@ class kv_cuco_store_t {
               ValueIterator value_first,
               rmm::cuda_stream_view stream)
   {
+    static_assert(std::is_same_v::value_type, key_t>);
+    static_assert(
+      std::is_same_v::value_type, value_t>);
+
     auto num_keys = static_cast(thrust::distance(key_first, key_last));
     if (num_keys == 0) return;
@@ -590,7 +570,7 @@ class kv_cuco_store_t {
       auto pair_first = thrust::make_zip_iterator(thrust::make_tuple(key_first, value_first));
       size_ += cuco_store_->insert(pair_first, pair_first + num_keys, stream.value());
     } else {
-      auto old_store_value_size = size_dataframe_buffer(store_values_);
+      auto old_store_value_size = size_optional_dataframe_buffer(store_values_);
       // FIXME: we can use cuda::atomic instead but currently on a system with x86 + GPU, this
       // requires placing the atomic variable on managed memory and this adds additional
       // complication.
@@ -601,16 +581,16 @@ class kv_cuco_store_t {
         rmm::exec_policy(stream),
         store_value_offsets.begin(),
         store_value_offsets.end(),
-        kv_cuco_insert_and_increment_t{
+        kv_cuco_insert_and_increment_t{
           mutable_device_ref, key_first, counter.data(), std::numeric_limits::max()});
       size_ += counter.value(stream);
-      resize_dataframe_buffer(store_values_, size_, stream);
+      resize_optional_dataframe_buffer(store_values_, size_, stream);
       thrust::scatter_if(rmm::exec_policy(stream),
                          value_first,
                          value_first + num_keys,
                          store_value_offsets.begin() /* map */,
                          store_value_offsets.begin() /* stencil */,
-                         get_dataframe_buffer_begin(store_values_),
+                         get_optional_dataframe_buffer_begin(store_values_),
                          is_not_equal_t{std::numeric_limits::max()});
     }
   }
@@ -623,6 +603,10 @@ class kv_cuco_store_t {
               PredOp pred_op,
               rmm::cuda_stream_view stream)
   {
+    static_assert(std::is_same_v::value_type, key_t>);
+    static_assert(
+      std::is_same_v::value_type, value_t>);
+
     auto num_keys = static_cast(thrust::distance(key_first, key_last));
     if (num_keys == 0) return;
@@ -631,31 +615,34 @@ class kv_cuco_store_t {
       size_ += cuco_store_->insert_if(
         pair_first, pair_first + num_keys, stencil_first, pred_op, stream.value());
     } else {
-      auto old_store_value_size = size_dataframe_buffer(store_values_);
+      auto old_store_value_size = size_optional_dataframe_buffer(store_values_);
       // FIXME: we can use cuda::atomic instead but currently on a system with x86 + GPU, this
       // requires placing the atomic variable on managed memory and this adds additional
       // complication.
       rmm::device_scalar counter(old_store_value_size, stream);
       auto mutable_device_ref = cuco_store_->ref(cuco::insert_and_find);
       rmm::device_uvector store_value_offsets(num_keys, stream);
-      thrust::tabulate(rmm::exec_policy(stream),
-                       store_value_offsets.begin(),
-                       store_value_offsets.end(),
-                       kv_cuco_insert_if_and_increment_t{
-                         mutable_device_ref,
-                         key_first,
-                         stencil_first,
-                         pred_op,
-                         counter.data(),
-                         std::numeric_limits::max()});
+      thrust::tabulate(
+        rmm::exec_policy(stream),
+        store_value_offsets.begin(),
+        store_value_offsets.end(),
+        kv_cuco_insert_if_and_increment_t{mutable_device_ref,
+                                          key_first,
+                                          stencil_first,
+                                          pred_op,
+                                          counter.data(),
+                                          std::numeric_limits::max()});
       size_ += counter.value(stream);
-      resize_dataframe_buffer(store_values_, size_, stream);
+      resize_optional_dataframe_buffer(store_values_, size_, stream);
       thrust::scatter_if(rmm::exec_policy(stream),
                          value_first,
                          value_first + num_keys,
                          store_value_offsets.begin() /* map */,
                          store_value_offsets.begin() /* stencil */,
-                         get_dataframe_buffer_begin(store_values_),
+                         get_optional_dataframe_buffer_begin(store_values_),
                          is_not_equal_t{std::numeric_limits::max()});
     }
   }
@@ -666,6 +653,10 @@ class kv_cuco_store_t {
                          ValueIterator value_first,
                          rmm::cuda_stream_view stream)
   {
+    static_assert(std::is_same_v::value_type, key_t>);
+    static_assert(
+      std::is_same_v::value_type, value_t>);
+
     auto num_keys = static_cast(thrust::distance(key_first, key_last));
     if (num_keys == 0) return;
@@ -674,14 +665,16 @@ class kv_cuco_store_t {
       // FIXME: a temporary solution till insert_and_assign is added to
       // cuco::static_map
       auto mutable_device_ref = cuco_store_->ref(cuco::insert_and_find);
-      thrust::for_each(rmm::exec_policy(stream),
-                       pair_first,
-                       pair_first + num_keys,
-                       detail::kv_cuco_insert_and_assign_t{mutable_device_ref});
+      thrust::for_each(
+        rmm::exec_policy(stream),
+        pair_first,
+        pair_first + num_keys,
+        detail::kv_cuco_insert_and_assign_t{
+          mutable_device_ref});
       // FIXME: this is an upper bound of size_, as some inserts may fail due to existing keys
       size_ += num_keys;
     } else {
-      auto old_store_value_size = size_dataframe_buffer(store_values_);
+      auto old_store_value_size = size_optional_dataframe_buffer(store_values_);
       // FIXME: we can use cuda::atomic instead but currently on a system with x86 + GPU, this
       // requires placing the atomic variable on managed memory and this adds additional
       // complication.
@@ -692,16 +685,16 @@ class kv_cuco_store_t {
         rmm::exec_policy(stream),
         store_value_offsets.begin(),
         store_value_offsets.end(),
-        kv_cuco_insert_and_increment_t{
+        kv_cuco_insert_and_increment_t{
           mutable_device_ref, key_first, counter.data(), std::numeric_limits::max()});
       size_ += counter.value(stream);
-      resize_dataframe_buffer(store_values_, size_, stream);
+      resize_optional_dataframe_buffer(store_values_, size_, stream);
       thrust::scatter_if(rmm::exec_policy(stream),
                          value_first,
                          value_first + num_keys,
                          store_value_offsets.begin() /* map */,
                          store_value_offsets.begin() /* stencil */,
-                         get_dataframe_buffer_begin(store_values_),
+                         get_optional_dataframe_buffer_begin(store_values_),
                          is_not_equal_t{std::numeric_limits::max()});

       // now perform assigns (for k,v pairs that failed to insert)
@@ -738,19 +731,20 @@ class kv_cuco_store_t {
                         })),
         stream);
-      thrust::for_each(rmm::exec_policy(stream),
-                       kv_indices.begin(),
-                       kv_indices.end(),
-                       [key_first,
-                        value_first,
-                        store_value_first = get_dataframe_buffer_begin(store_values_),
-                        device_ref = cuco_store_->ref(cuco::find)] __device__(auto kv_idx) {
-                         size_t store_value_offset{};
-                         auto found = device_ref.find(*(key_first + kv_idx));
-                         assert(found != device_ref.end());
-                         store_value_offset = (*found).second;
-                         *(store_value_first + store_value_offset) = *(value_first + kv_idx);
-                       });
+      thrust::for_each(
+        rmm::exec_policy(stream),
+        kv_indices.begin(),
+        kv_indices.end(),
+        [key_first,
+         value_first,
+         store_value_first = get_optional_dataframe_buffer_begin(store_values_),
+         device_ref = cuco_store_->ref(cuco::find)] __device__(auto kv_idx) {
+          size_t store_value_offset{};
+          auto found = device_ref.find(*(key_first + kv_idx));
+          assert(found != device_ref.end());
+          store_value_offset = (*found).second;
+          *(store_value_first + store_value_offset) = *(value_first + kv_idx);
+        });
     }
   }
@@ -774,7 +768,7 @@ class kv_cuco_store_t {
       thrust::gather(rmm::exec_policy(stream),
                      indices.begin(),
                      indices.end(),
-                     get_dataframe_buffer_begin(store_values_),
+                     get_optional_dataframe_buffer_begin(store_values_),
                      get_dataframe_buffer_begin(values));
     }
     return std::make_tuple(std::move(keys), std::move(values));
@@ -789,12 +783,12 @@ class kv_cuco_store_t {
     return std::make_tuple(std::move(retrieved_keys), std::move(retrieved_values));
   }

-  cuco_set_type const* cuco_store_ptr() const { return cuco_store_.get(); }
+  cuco_map_type const* cuco_store_ptr() const { return cuco_store_.get(); }

   template
   std::enable_if_t, const_value_iterator> store_value_first() const
   {
-    return get_dataframe_buffer_cbegin(store_values_);
+    return get_optional_dataframe_buffer_cbegin(store_values_);
   }

   key_t invalid_key() const { return cuco_store_->empty_key_sentinel(); }
@@ -828,7 +822,7 @@ class kv_cuco_store_t {
       rmm::mr::polymorphic_allocator(rmm::mr::get_current_device_resource()), stream);
     if constexpr (std::is_arithmetic_v) {
       cuco_store_ =
-        std::make_unique(cuco_size,
+        std::make_unique(cuco_size,
                          cuco::sentinel::empty_key{invalid_key},
                          cuco::sentinel::empty_value{invalid_value},
                          thrust::equal_to{},
@@ -839,25 +833,25 @@ class kv_cuco_store_t {
                          stream_adapter,
                          stream.value());
     } else {
-      cuco_store_ = std::make_unique(
+      cuco_store_ = std::make_unique(
         cuco_size,
         cuco::sentinel::empty_key{invalid_key},
         cuco::sentinel::empty_value{std::numeric_limits::max()},
         thrust::equal_to{},
         cuco::linear_probing<1,  // CG size
                              cuco::murmurhash3_32>{},
+        cuco::thread_scope_device,
+        cuco_storage_type{},
         stream_adapter,
-        stream);
-      store_values_ = allocate_dataframe_buffer(0, stream);
-      reserve_dataframe_buffer(store_values_, num_keys, stream);
+        stream.value());
+      reserve_optional_dataframe_buffer(store_values_, num_keys, stream);
     }
   }

-  std::unique_ptr cuco_store_{nullptr};
-  std::conditional_t,
-                     decltype(allocate_dataframe_buffer(0, rmm::cuda_stream_view{})),
-                     std::byte /* dummy */>
-    store_values_{};
+  std::unique_ptr cuco_store_{nullptr};
+  decltype(allocate_optional_dataframe_buffer<
+           std::conditional_t, value_t, void>>(
+    0, rmm::cuda_stream_view{})) store_values_;

   std::conditional_t, value_t, std::byte /* dummy */>
     invalid_value_{};
diff --git a/cpp/tests/prims/mg_per_v_transform_reduce_dst_key_aggregated_outgoing_e.cu b/cpp/tests/prims/mg_per_v_transform_reduce_dst_key_aggregated_outgoing_e.cu
index af56807746a..89d3205d051 100644
--- a/cpp/tests/prims/mg_per_v_transform_reduce_dst_key_aggregated_outgoing_e.cu
+++ b/cpp/tests/prims/mg_per_v_transform_reduce_dst_key_aggregated_outgoing_e.cu
@@ -487,15 +487,12 @@ using Tests_MGPerVTransformReduceDstKeyAggregatedOutgoingE_File =
 using Tests_MGPerVTransformReduceDstKeyAggregatedOutgoingE_Rmat =
   Tests_MGPerVTransformReduceDstKeyAggregatedOutgoingE;

-// FIXME: this tests do not build as cugrpah::kv_store_t has a build error when use_binary_search =
-// false and value_t is thrust::tuple, this will be fixed in a separate PR
-#if 0
 TEST_P(Tests_MGPerVTransformReduceDstKeyAggregatedOutgoingE_File,
        CheckInt32Int32FloatTupleIntFloatTransposeFalse)
 {
   auto param = GetParam();
   run_current_test>(std::get<0>(param),
-                                                std::get<1>(param));
+                    std::get<1>(param));
 }

 TEST_P(Tests_MGPerVTransformReduceDstKeyAggregatedOutgoingE_Rmat,
@@ -524,7 +521,6 @@ TEST_P(Tests_MGPerVTransformReduceDstKeyAggregatedOutgoingE_Rmat,
     std::get<0>(param),
     cugraph::test::override_Rmat_Usecase_with_cmd_line_arguments(std::get<1>(param)));
 }
-#endif

 TEST_P(Tests_MGPerVTransformReduceDstKeyAggregatedOutgoingE_File,
        CheckInt32Int32FloatTransposeFalse)
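
Reviewer note (not part of the patch): the optional_dataframe_buffer.hpp changes above round out a family of helpers where every accessor (begin/end/cbegin/cend) and mutator (reserve/resize/shrink_to_fit/size) comes in two overloads, one operating on a real dataframe buffer and one that is a no-op on a std::byte dummy selected when the element type is void. The following is a minimal, host-only sketch of that pattern; std::vector stands in for the RMM-backed buffer and all names are illustrative, not the cuGraph API.

// optional_buffer_sketch.cpp  (compile with: g++ -std=c++17 optional_buffer_sketch.cpp)
#include <cstddef>
#include <iostream>
#include <type_traits>
#include <vector>

// T == void: the "buffer" is a 1-byte dummy object, nothing is allocated.
template <typename T, std::enable_if_t<std::is_void_v<T>>* = nullptr>
std::byte allocate_optional_buffer(size_t /* size */)
{
  return std::byte{0};
}

// T != void: a real buffer is allocated.
template <typename T, std::enable_if_t<!std::is_void_v<T>>* = nullptr>
std::vector<T> allocate_optional_buffer(size_t size)
{
  return std::vector<T>(size);
}

// size query: always 0 for the dummy, the real size otherwise.
inline size_t size_optional_buffer(std::byte const&) { return size_t{0}; }

template <typename T>
size_t size_optional_buffer(std::vector<T> const& buffer)
{
  return buffer.size();
}

// resize: no-op for the dummy, forwarded to the real buffer otherwise.
inline void resize_optional_buffer(std::byte&, size_t) {}

template <typename T>
void resize_optional_buffer(std::vector<T>& buffer, size_t new_size)
{
  buffer.resize(new_size);
}

int main()
{
  auto values    = allocate_optional_buffer<float>(4);  // real buffer
  auto no_values = allocate_optional_buffer<void>(4);   // dummy, nothing allocated

  resize_optional_buffer(values, 8);
  resize_optional_buffer(no_values, 8);  // compiles, does nothing

  std::cout << size_optional_buffer(values) << ' ' << size_optional_buffer(no_values) << '\n';
  return 0;  // prints "8 0"
}

The payoff is in kv_cuco_store_t above: callers such as insert() can unconditionally call the *_optional_dataframe_buffer helpers on store_values_ and the void case simply compiles away instead of needing if-constexpr branches at every call site.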
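Reviewer note (not part of the patch): the functor changes above drop the per-functor cuco map type alias; kv_cuco_insert_and_increment_t and friends now just take the map's reference type as a template parameter (RefType) deduced at the call site, and each newly inserted key claims the next value-slot index from a shared atomic counter while already-present keys get an invalid sentinel. Below is a host-only analogue of that idea, assuming std::unordered_map in place of cuco::static_map and a plain member pointer in place of a device ref; names are illustrative, not the cuGraph API.

// insert_and_increment_sketch.cpp  (compile with: g++ -std=c++17 -pthread insert_and_increment_sketch.cpp)
#include <atomic>
#include <cstddef>
#include <iostream>
#include <limits>
#include <unordered_map>
#include <vector>

template <typename RefType, typename KeyIterator>
struct insert_and_increment_t {
  RefType* map_ref{nullptr};  // stand-in for the cuco device ref
  KeyIterator key_first{};
  std::atomic<size_t>* counter{nullptr};
  size_t invalid_idx{std::numeric_limits<size_t>::max()};

  size_t operator()(size_t i) const
  {
    auto key              = *(key_first + i);
    auto [iter, inserted] = map_ref->try_emplace(key, size_t{0});
    if (inserted) {
      auto idx     = counter->fetch_add(size_t{1}, std::memory_order_relaxed);
      iter->second = idx;  // record which value slot this key owns
      return idx;
    }
    return invalid_idx;  // key already present, nothing to scatter later
  }
};

int main()
{
  std::unordered_map<int, size_t> map{};
  std::vector<int> keys{7, 3, 7, 9};  // one duplicate key
  std::atomic<size_t> counter{0};

  insert_and_increment_t<std::unordered_map<int, size_t>, std::vector<int>::const_iterator> op{
    &map, keys.cbegin(), &counter};

  for (size_t i = 0; i < keys.size(); ++i) { std::cout << op(i) << ' '; }
  std::cout << '\n';  // e.g. "0 1 18446744073709551615 2": the duplicate gets the invalid index
  return 0;
}

Taking RefType as a template parameter, as the patch does, means the functor no longer has to repeat (and keep in sync) the full map instantiation; whatever ref the store hands out at the call site is accepted as-is.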
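Reviewer note (not part of the patch): in the insert()/insert_if()/insert_and_assign() paths above, the offsets produced by the step sketched previously drive a scatter: store_values_ is resized to the new size and thrust::scatter_if, with the offsets as both map and stencil and an is_not_equal_t predicate, writes a value only where the offset differs from the invalid sentinel, i.e. only for keys that were actually inserted. A host-only sketch of that scatter step (illustrative values, not the cuGraph API):

// sentinel_scatter_sketch.cpp  (compile with: g++ -std=c++17 sentinel_scatter_sketch.cpp)
#include <cstddef>
#include <iostream>
#include <limits>
#include <vector>

int main()
{
  constexpr size_t invalid_idx = std::numeric_limits<size_t>::max();

  std::vector<float> new_values{0.7f, 0.3f, 0.5f, 0.9f};  // values paired with the incoming keys
  std::vector<size_t> offsets{0, 1, invalid_idx, 2};      // from the insert-and-increment step
  std::vector<float> store_values(3);                     // already resized to the new store size

  for (size_t i = 0; i < offsets.size(); ++i) {
    if (offsets[i] != invalid_idx) {  // stencil: skip keys that were already present
      store_values[offsets[i]] = new_values[i];
    }
  }

  for (auto v : store_values) { std::cout << v << ' '; }
  std::cout << '\n';  // prints "0.7 0.3 0.9"
  return 0;
}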