[WIP] Persistent CAGRA kernel #2316

Closed · wants to merge 46 commits
Changes from all commits
46 commits
f8d25c1
Sync via barriers
achirkin Apr 11, 2024
cd012c4
Waiting on CPU side.
achirkin Apr 16, 2024
a1a091c
Use simple atomics for synchronization.
achirkin Apr 16, 2024
e6ad7b6
Added launcher_t - a helper state machine struct to interleave work q…
achirkin Apr 17, 2024
0137dd4
Initialize the kernel runner in a separate thread and improve thread …
achirkin Apr 17, 2024
25dda44
Added small memory sync optimizations
achirkin Apr 18, 2024
1d60ab5
Change the benchmark neighbors time size_t -> uint32_t and make bench…
achirkin Apr 25, 2024
efd7966
Slightly increase occupancy to improve QPS
achirkin Apr 19, 2024
e7c35df
Align input and sync variables with cache lines to avoid cache conflicts
achirkin Apr 19, 2024
7089ed8
Optimize the waiting for the input inside the kernel.
achirkin Apr 23, 2024
259e5ec
cagra wrapper: avoid constructing rmm uvectors when not needed
achirkin Apr 24, 2024
63f996a
Avoid any calls to RMM in IO threads.
achirkin Apr 24, 2024
3d1011d
Use atomics on the persistent runner (shared_ptr) to reduce the numbe…
achirkin Apr 24, 2024
8b19907
Remove the shared state and the mutex lock from NVTX helpers
achirkin Apr 24, 2024
4d2b8d5
Split the sync queue in two: job descriptors and idle worker handles
achirkin Apr 24, 2024
83355ab
Add the third-party atomic_queue headers for easier testing
achirkin Apr 24, 2024
f177a81
Tweak the CPU waiting behavior to avoid busy-spinning
achirkin Apr 25, 2024
db5b002
Add a single-threaded deque for pending_reads to reduce the cpu/cache…
achirkin Apr 25, 2024
c748160
ann_bench: minimize chances for GPU sync between benchmark cases
achirkin Apr 26, 2024
d51729c
Fix OOB bugs revealed on GH
achirkin Apr 26, 2024
9dd3d32
Add a thread-local weak_ptr for the runner to further reduce possible…
achirkin Apr 26, 2024
4361a5e
Keep result buffers between runs to avoid blocking the persistent ker…
achirkin Apr 29, 2024
e96cc0f
Avoid an extra layer of atomics on the persistent runner (shared_ptr)
achirkin Apr 29, 2024
c86dfcf
Reducing congestions: avoid too many writes to the last_touch/heartbe…
achirkin Apr 29, 2024
aaba912
Make a custom implementation of the shared resource queue to optimize…
achirkin May 2, 2024
8a4ff2e
Add expectation-based sleep to the waiting loop
achirkin May 3, 2024
732072d
Make the gpu worker report reading the handle is done earlier.
achirkin May 7, 2024
7450f6f
Move the last_touch initialization into the constructor of the contai…
achirkin May 7, 2024
8920dfc
Modify the resource queue to never loop on head/tail counters
achirkin May 7, 2024
ba78957
Replace yield() with a smarter, work-aware pause() to ease the CPU us…
achirkin May 7, 2024
304a864
Expose thread_block_size parameter
achirkin May 13, 2024
0879955
Make the 'persistent' parameter in the search_params
achirkin May 14, 2024
affdcb2
Update the parameter parser to use the 'persistent' flag in the searc…
achirkin May 14, 2024
56195f5
Merge remote-tracking branch 'rapidsai/branch-24.06' into fea-persist…
achirkin May 15, 2024
a48d8f8
Fix the uses_stream() not adapted to the previous change introducing …
achirkin May 15, 2024
6a1e5f1
Merge branch 'branch-24.06' into fea-persistent-cagra
achirkin May 15, 2024
c408dae
Merge remote-tracking branch 'rapidsai/branch-24.06' into fea-persist…
achirkin May 15, 2024
cf26a2b
Merge branch 'branch-24.06' into fea-persistent-cagra
achirkin May 16, 2024
6079cc9
Recover the uses_stream() function in the cagra_wrapper after the cod…
achirkin May 16, 2024
8dd4714
Merge branch 'branch-24.06' into fea-persistent-cagra
achirkin May 17, 2024
fc2ac99
Merge branch 'branch-24.08' into fea-persistent-cagra
achirkin Jun 13, 2024
9580ac6
Merge branch 'branch-24.08' into fea-persistent-cagra
achirkin Jun 13, 2024
74d47c2
Restart the persistent kernel if launch parameters changes
achirkin Jun 24, 2024
608e61d
Merge remote-tracking branch 'rapidsai/branch-24.08' into fea-persist…
achirkin Jun 24, 2024
22a77c2
Allow 'pinned' value for the benchmark queries and adjust host refine…
achirkin Jun 24, 2024
c62644d
Merge branch 'branch-24.08' into fea-persistent-cagra
achirkin Jul 8, 2024
5 changes: 4 additions & 1 deletion cpp/bench/ann/src/common/ann_types.hpp
@@ -33,6 +33,7 @@ enum Objective {
enum class MemoryType {
Host,
HostMmap,
HostPinned,
Device,
};

@@ -58,6 +59,8 @@ inline auto parse_memory_type(const std::string& memory_type) -> MemoryType
return MemoryType::Host;
} else if (memory_type == "mmap") {
return MemoryType::HostMmap;
} else if (memory_type == "pinned") {
return MemoryType::HostPinned;
} else if (memory_type == "device") {
return MemoryType::Device;
} else {
@@ -73,7 +76,7 @@ struct AlgoProperty {

class AnnBase {
public:
using index_type = uint32_t;
Contributor:

Do we want to change this for all benchmarks?

Contributor (author):

That was a temporary change for testing; I've removed it in the cuVS PR (cuVS doesn't support uint32_t indexes in the refinement step).


inline AnnBase(Metric metric, int dim) : metric_(metric), dim_(dim) {}
virtual ~AnnBase() noexcept = default;
56 changes: 51 additions & 5 deletions cpp/bench/ann/src/common/dataset.hpp
@@ -283,15 +283,56 @@ class Dataset {
{
switch (memory_type) {
case MemoryType::Device: return query_set_on_gpu();
case MemoryType::Host: {
auto r = query_set();
#ifndef BUILD_CPU_ONLY
if (query_set_pinned_) {
cudaHostUnregister(const_cast<T*>(r));
query_set_pinned_ = false;
}
#endif
return r;
}
case MemoryType::HostPinned: {
auto r = query_set();
#ifndef BUILD_CPU_ONLY
if (!query_set_pinned_) {
cudaHostRegister(
const_cast<T*>(r), query_set_size() * dim() * sizeof(T), cudaHostRegisterDefault);
query_set_pinned_ = true;
}
#endif
return r;
}
default: return nullptr;
}
}

auto base_set(MemoryType memory_type) const -> const T*
{
switch (memory_type) {
case MemoryType::Device: return base_set_on_gpu();
case MemoryType::Host: {
auto r = base_set();
#ifndef BUILD_CPU_ONLY
if (base_set_pinned_) {
cudaHostUnregister(const_cast<T*>(r));
base_set_pinned_ = false;
}
#endif
return r;
}
case MemoryType::HostPinned: {
auto r = base_set();
#ifndef BUILD_CPU_ONLY
if (!base_set_pinned_) {
cudaHostRegister(
const_cast<T*>(r), base_set_size() * dim() * sizeof(T), cudaHostRegisterDefault);
base_set_pinned_ = true;
}
#endif
return r;
}
case MemoryType::HostMmap: return mapped_base_set();
default: return nullptr;
}
@@ -312,18 +353,23 @@
mutable T* d_query_set_ = nullptr;
mutable T* mapped_base_set_ = nullptr;
mutable int32_t* gt_set_ = nullptr;

mutable bool base_set_pinned_ = false;
mutable bool query_set_pinned_ = false;
};

template <typename T>
Dataset<T>::~Dataset()
{
#ifndef BUILD_CPU_ONLY
if (d_base_set_) { cudaFree(d_base_set_); }
if (d_query_set_) { cudaFree(d_query_set_); }
if (base_set_pinned_) { cudaHostUnregister(base_set_); }
if (query_set_pinned_) { cudaHostUnregister(query_set_); }
#endif
delete[] base_set_;
delete[] query_set_;
delete[] gt_set_;
}

template <typename T>
33 changes: 31 additions & 2 deletions cpp/bench/ann/src/common/util.hpp
@@ -197,10 +197,12 @@ struct result_buffer {
explicit result_buffer(size_t size, cudaStream_t stream) : size_{size}, stream_{stream}
{
if (size_ == 0) { return; }
#ifndef BUILD_CPU_ONLY
cudaMallocAsync(&data_device_, size_, stream_);
cudaMallocHost(&data_host_, size_);
cudaStreamSynchronize(stream_);
#else
data_host_ = malloc(size_);
#endif
}
result_buffer() = delete;
@@ -213,9 +215,11 @@
if (size_ == 0) { return; }
#ifndef BUILD_CPU_ONLY
cudaFreeAsync(data_device_, stream_);
cudaFreeHost(data_host_);
cudaStreamSynchronize(stream_);
#else
free(data_host_);
#endif
}

[[nodiscard]] auto size() const noexcept { return size_; }
@@ -278,6 +282,31 @@ inline auto get_result_buffer_from_global_pool(size_t size) -> result_buffer&
return rb;
}

namespace detail {
inline std::vector<std::unique_ptr<result_buffer>> global_tmp_buffer_pool(0);
inline std::mutex gtp_mutex;
} // namespace detail

/**
* Global temporary buffer pool for use by algorithms.
* In contrast to `get_result_buffer_from_global_pool`, the content of these buffers is never
* initialized.
*/
inline auto get_tmp_buffer_from_global_pool(size_t size) -> result_buffer&
{
auto stream = get_stream_from_global_pool();
auto& rb = [stream, size]() -> result_buffer& {
std::lock_guard guard(detail::gtp_mutex);
if (static_cast<int>(detail::global_tmp_buffer_pool.size()) < benchmark_n_threads) {
detail::global_tmp_buffer_pool.resize(benchmark_n_threads);
}
auto& rb = detail::global_tmp_buffer_pool[benchmark_thread_id];
if (!rb || rb->size() < size) { rb = std::make_unique<result_buffer>(size, stream); }
return *rb;
}();
return rb;
}

/**
* Delete all streams and memory allocations in the global pool.
* It's called at the end of the `main` function - before global/static variables and cuda context
4 changes: 4 additions & 0 deletions cpp/bench/ann/src/raft/raft_ann_bench_param_parser.h
@@ -249,6 +249,10 @@ void parse_search_param(const nlohmann::json& conf,
if (conf.contains("itopk")) { param.p.itopk_size = conf.at("itopk"); }
if (conf.contains("search_width")) { param.p.search_width = conf.at("search_width"); }
if (conf.contains("max_iterations")) { param.p.max_iterations = conf.at("max_iterations"); }
if (conf.contains("persistent")) { param.p.persistent = conf.at("persistent"); }
if (conf.contains("thread_block_size")) {
param.p.thread_block_size = conf.at("thread_block_size");
}
if (conf.contains("algo")) {
if (conf.at("algo") == "single_cta") {
param.p.algo = raft::neighbors::experimental::cagra::search_algo::SINGLE_CTA;
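With the parser changes above, a benchmark search configuration can opt into the persistent kernel. A hypothetical config entry (the surrounding fields and values are illustrative; only `persistent` and `thread_block_size` are the keys added by this PR):

```json
{
  "itopk": 64,
  "search_width": 1,
  "persistent": true,
  "thread_block_size": 256,
  "algo": "single_cta"
}
```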
56 changes: 38 additions & 18 deletions cpp/bench/ann/src/raft/raft_ann_bench_utils.h
@@ -228,27 +228,47 @@ void refine_helper(const raft::resources& res,
} else {
auto dataset_host = raft::make_host_matrix_view<const data_type, extents_type>(
dataset.data_handle(), dataset.extent(0), dataset.extent(1));
if (raft::get_device_for_address(queries.data_handle()) >= 0) {
// Queries & results are on the device

auto queries_host = raft::make_host_matrix<data_type, extents_type>(batch_size, dim);
auto candidates_host = raft::make_host_matrix<index_type, extents_type>(batch_size, k0);
auto neighbors_host = raft::make_host_matrix<index_type, extents_type>(batch_size, k);
auto distances_host = raft::make_host_matrix<float, extents_type>(batch_size, k);

auto stream = resource::get_cuda_stream(res);
raft::copy(queries_host.data_handle(), queries.data_handle(), queries_host.size(), stream);
raft::copy(
candidates_host.data_handle(), candidates.data_handle(), candidates_host.size(), stream);

raft::resource::sync_stream(res); // wait for the queries and candidates
raft::neighbors::refine<index_type, data_type, float, extents_type>(res,
dataset_host,
queries_host.view(),
candidates_host.view(),
neighbors_host.view(),
distances_host.view(),
metric);

raft::copy(neighbors, neighbors_host.data_handle(), neighbors_host.size(), stream);
raft::copy(distances, distances_host.data_handle(), distances_host.size(), stream);

} else {
// Queries & results are on the host - no device sync / copy needed

auto queries_host = raft::make_host_matrix_view<const data_type, extents_type>(
queries.data_handle(), batch_size, dim);
auto candidates_host = raft::make_host_matrix_view<const index_type, extents_type>(
candidates.data_handle(), batch_size, k0);
auto neighbors_host =
raft::make_host_matrix_view<index_type, extents_type>(neighbors, batch_size, k);
auto distances_host =
raft::make_host_matrix_view<float, extents_type>(distances, batch_size, k);

raft::neighbors::refine<index_type, data_type, float, extents_type>(
res, dataset_host, queries_host, candidates_host, neighbors_host, distances_host, metric);
}
}
}

44 changes: 36 additions & 8 deletions cpp/bench/ann/src/raft/raft_cagra_wrapper.h
@@ -117,6 +117,15 @@ class RaftCagra : public ANN<T>, public AnnGPU {
return handle_.get_sync_stream();
}

[[nodiscard]] auto uses_stream() const noexcept -> bool override
{
// If the algorithm uses the persistent kernel, the CPU has to synchronize by the end of
// computing the result, which guarantees the benchmark CUDA stream is empty by the end of the
// execution. Hence we tell the benchmark not to waste time on recording & synchronizing the
// event.
return !search_params_.persistent;
}

// to enable dataset access from GPU memory
AlgoProperty get_preference() const override
{
@@ -326,14 +335,33 @@ void RaftCagra<T, IdxT>::search(
} else {
auto queries_v =
raft::make_device_matrix_view<const T, AnnBase::index_type>(queries, batch_size, dimension_);

auto& tmp_buf = get_tmp_buffer_from_global_pool((sizeof(float) + sizeof(AnnBase::index_type)) *
batch_size * k0);
auto mem_type =
raft::get_device_for_address(neighbors) >= 0 ? MemoryType::Device : MemoryType::HostPinned;

auto candidate_ixs = raft::make_device_matrix_view<AnnBase::index_type, AnnBase::index_type>(
reinterpret_cast<AnnBase::index_type*>(tmp_buf.data(mem_type)), batch_size, k0);
auto candidate_dists = reinterpret_cast<float*>(candidate_ixs.data_handle() + batch_size * k0);

search_base(queries, batch_size, k0, candidate_ixs.data_handle(), candidate_dists);

if (mem_type == MemoryType::HostPinned && uses_stream()) {
// If the algorithm uses a stream to synchronize (non-persistent kernel) but the data is in
// pinned host memory, we need to synchronize before the refinement operation to make sure
// the data is available on the host.
raft::resource::sync_stream(res);
}

refine_helper(res,
*input_dataset_v_,
queries_v,
raft::make_const_mdspan(candidate_ixs),
k,
neighbors,
distances,
index_->metric());
}
}
} // namespace raft::bench::ann
2 changes: 2 additions & 0 deletions cpp/include/raft/neighbors/cagra_types.hpp
@@ -124,6 +124,8 @@ struct search_params : ann::search_params {
uint32_t num_random_samplings = 1;
/** Bit mask used for initial random seed node selection. */
uint64_t rand_xor_mask = 0x128394;
/** Whether to use the persistent version of the kernel (only SINGLE_CTA is supported a.t.m.) */
bool persistent = false;
Contributor:

Is this the only parameter that needs to be controlled? What about the temporary buffer size (queue for queries, or max number of threads)?

Contributor (author):

The buffer sizes are compile-time constants in the current design. I'd like to move some of the new constants to the parameters, but that conflicts with the implicit mechanics of running the kernel: they are not search parameters but "runner" parameters, and should not change across calls to the kernel.
So this brings up again the question of whether we want an implicit or an explicit kernel runner.

};

static_assert(std::is_aggregate_v<index_params>);