Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Persistent CAGRA kernel #215

Merged
merged 38 commits into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
8b76ff7
Persistent kernel for CAGRA, with implicit kernel runner and benchmar…
achirkin Jul 8, 2024
80ba46e
Fix style
achirkin Jul 8, 2024
73ca412
Don't store the source data in the CAGRA index file
achirkin Jul 12, 2024
12e228a
Merge branch 'branch-24.08' into fea-persistent-cagra
achirkin Jul 12, 2024
14fc20c
Merge branch 'branch-24.08' into fea-persistent-cagra
achirkin Jul 16, 2024
554fec7
Make the benchmark workspace resources allocate/initialize memory laz…
achirkin Jul 16, 2024
eeb50fc
Improve the scheduling logic to make the QPS more consistent
achirkin Jul 17, 2024
0408707
Merge branch 'branch-24.08' into fea-persistent-cagra
achirkin Jul 17, 2024
4c8d432
Update cpp/src/neighbors/detail/cagra/search_plan.cuh
achirkin Jul 18, 2024
691a9b3
Update cpp/bench/ann/src/cuvs/cuvs_cagra_wrapper.h
achirkin Jul 18, 2024
6883517
Merge branch 'branch-24.08' into fea-persistent-cagra
achirkin Jul 18, 2024
8007f5a
Fix style
achirkin Jul 18, 2024
628cb62
Merge branch 'branch-24.08' into fea-persistent-cagra
achirkin Jul 19, 2024
a5ee7bc
Add more docstrings addressing review questions
achirkin Jul 22, 2024
4e4317d
Merge branch 'branch-24.08' into fea-persistent-cagra
achirkin Jul 22, 2024
acd9f5e
Merge branch 'branch-24.08' into fea-persistent-cagra
achirkin Jul 23, 2024
4d6f2d4
Merge branch 'branch-24.08' into fea-persistent-cagra
achirkin Jul 24, 2024
8c31553
Expose persistent parameters at runtime
achirkin Jul 24, 2024
8426ce9
Resilience: make the calling thread throw an exception if it doesn't …
achirkin Jul 25, 2024
d94ad77
Merge branch 'branch-24.08' into fea-persistent-cagra
achirkin Jul 26, 2024
c63da90
Merge branch 'branch-24.08' into fea-persistent-cagra
achirkin Jul 29, 2024
d2ca9ad
Merge branch 'branch-24.08' into fea-persistent-cagra
achirkin Jul 31, 2024
103cb24
Update cpp/include/cuvs/neighbors/cagra.hpp
achirkin Jul 31, 2024
1611912
Update cpp/include/cuvs/neighbors/cagra.hpp
achirkin Jul 31, 2024
8b65f95
Add the persistent CAGRA program example
achirkin Jul 31, 2024
04c2745
Fix a typo
achirkin Jul 31, 2024
b757aa9
Merge branch 'branch-24.08' into fea-persistent-cagra
achirkin Jul 31, 2024
53a9ba5
Improve comments in the example code
achirkin Jul 31, 2024
69a8740
Merge branch 'branch-24.08' into fea-persistent-cagra
achirkin Aug 12, 2024
b7f5106
Merge branch 'branch-24.10' into fea-persistent-cagra
achirkin Aug 14, 2024
05100ce
Merge branch 'branch-24.10' into fea-persistent-cagra
achirkin Aug 19, 2024
1c23549
Merge branch 'branch-24.10' into fea-persistent-cagra
achirkin Aug 23, 2024
f9ee7c7
Allow setting team_size explicitly
achirkin Aug 28, 2024
5bc2982
Merge branch 'branch-24.10' into fea-persistent-cagra
achirkin Sep 10, 2024
ee35f74
Merge branch 'branch-24.10' into fea-persistent-cagra
achirkin Sep 10, 2024
7195a2b
Merge branch 'branch-24.10' into fea-persistent-cagra
achirkin Sep 23, 2024
b138a07
Merge branch 'branch-24.10' into fea-persistent-cagra
achirkin Sep 26, 2024
0bfb6be
Lazy-initialize the dataset_descriptor to avoid its overheads in the …
achirkin Sep 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cpp/bench/ann/src/common/ann_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ enum class Mode {
/** Memory location a benchmark buffer is requested in (see `dataset` and `parse_memory_type`). */
enum class MemoryType {
  /** Ordinary host memory; the dataset accessors undo any earlier CUDA host registration. */
  kHost,
  /** Memory-mapped host memory ("mmap"); `dataset::base_set` serves it via `mapped_base_set()`. */
  kHostMmap,
  /** Host memory registered with CUDA through `cudaHostRegister` ("pinned"). */
  kHostPinned,
  /** GPU memory ("device"); the dataset accessors return the `*_on_gpu()` copies. */
  kDevice,
};

Expand All @@ -60,6 +61,8 @@ inline auto parse_memory_type(const std::string& memory_type) -> MemoryType
return MemoryType::kHost;
} else if (memory_type == "mmap") {
return MemoryType::kHostMmap;
} else if (memory_type == "pinned") {
return MemoryType::kHostPinned;
} else if (memory_type == "device") {
return MemoryType::kDevice;
} else {
Expand Down
56 changes: 51 additions & 5 deletions cpp/bench/ann/src/common/dataset.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -286,15 +286,56 @@ class dataset {
{
switch (memory_type) {
case MemoryType::kDevice: return query_set_on_gpu();
default: return query_set();
case MemoryType::kHost: {
auto r = query_set();
#ifndef BUILD_CPU_ONLY
if (query_set_pinned_) {
cudaHostUnregister(const_cast<T*>(r));
query_set_pinned_ = false;
}
#endif
return r;
}
case MemoryType::kHostPinned: {
auto r = query_set();
#ifndef BUILD_CPU_ONLY
if (!query_set_pinned_) {
cudaHostRegister(
const_cast<T*>(r), query_set_size() * dim() * sizeof(T), cudaHostRegisterDefault);
query_set_pinned_ = true;
}
#endif
return r;
}
default: return nullptr;
}
}

auto base_set(MemoryType memory_type) const -> const T*
{
switch (memory_type) {
case MemoryType::kDevice: return base_set_on_gpu();
case MemoryType::kHost: return base_set();
case MemoryType::kHost: {
auto r = base_set();
#ifndef BUILD_CPU_ONLY
if (base_set_pinned_) {
cudaHostUnregister(const_cast<T*>(r));
base_set_pinned_ = false;
}
#endif
return r;
}
case MemoryType::kHostPinned: {
auto r = base_set();
#ifndef BUILD_CPU_ONLY
if (!base_set_pinned_) {
cudaHostRegister(
const_cast<T*>(r), base_set_size() * dim() * sizeof(T), cudaHostRegisterDefault);
base_set_pinned_ = true;
}
#endif
return r;
}
case MemoryType::kHostMmap: return mapped_base_set();
default: return nullptr;
}
Expand All @@ -315,18 +356,23 @@ class dataset {
mutable T* d_query_set_ = nullptr;
mutable T* mapped_base_set_ = nullptr;
mutable int32_t* gt_set_ = nullptr;

mutable bool base_set_pinned_ = false;
mutable bool query_set_pinned_ = false;
};

/**
 * Release every buffer owned by the dataset.
 *
 * Order matters: device copies are freed and pinned host ranges are unregistered from CUDA
 * first, and only then are the host arrays deleted. Each host array is deleted exactly once;
 * deleting it before `cudaHostUnregister` (or twice) would be a use-after-free / double free.
 */
template <typename T>
dataset<T>::~dataset()
{
#ifndef BUILD_CPU_ONLY
  if (d_base_set_) { cudaFree(d_base_set_); }
  if (d_query_set_) { cudaFree(d_query_set_); }
  // The registrations refer to the host arrays below, so drop them while the memory is alive.
  if (base_set_pinned_) { cudaHostUnregister(base_set_); }
  if (query_set_pinned_) { cudaHostUnregister(query_set_); }
#endif
  // `delete[]` on a null pointer is a no-op, so no guards are needed here.
  delete[] base_set_;
  delete[] query_set_;
  delete[] gt_set_;
}

template <typename T>
Expand Down
133 changes: 98 additions & 35 deletions cpp/bench/ann/src/common/util.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,42 +193,71 @@ inline auto get_stream_from_global_pool() -> cudaStream_t
#endif
}

struct result_buffer {
explicit result_buffer(size_t size, cudaStream_t stream) : size_{size}, stream_{stream}
/** The workspace buffer for use thread-locally. */
struct ws_buffer {
explicit ws_buffer(size_t size, cudaStream_t stream) : size_{size}, stream_{stream} {}
ws_buffer() = delete;
ws_buffer(ws_buffer&&) = delete;
auto operator=(ws_buffer&&) -> ws_buffer& = delete;
ws_buffer(const ws_buffer&) = delete;
auto operator=(const ws_buffer&) -> ws_buffer& = delete;
~ws_buffer() noexcept
{
if (size_ == 0) { return; }
data_host_ = malloc(size_);
#ifndef BUILD_CPU_ONLY
cudaMallocAsync(&data_device_, size_, stream_);
cudaStreamSynchronize(stream_);
#endif
}
result_buffer() = delete;
result_buffer(result_buffer&&) = delete;
auto operator=(result_buffer&&) -> result_buffer& = delete;
result_buffer(const result_buffer&) = delete;
auto operator=(const result_buffer&) -> result_buffer& = delete;
~result_buffer() noexcept
{
if (size_ == 0) { return; }
#ifndef BUILD_CPU_ONLY
cudaFreeAsync(data_device_, stream_);
cudaStreamSynchronize(stream_);
if (data_device_ != nullptr) {
cudaFreeAsync(data_device_, stream_);
cudaStreamSynchronize(stream_);
}
if (data_host_ != nullptr) { cudaFreeHost(data_host_); }
#else
if (data_host_ != nullptr) { free(data_host_); }
#endif
free(data_host_);
}

[[nodiscard]] auto size() const noexcept { return size_; }
[[nodiscard]] auto data(MemoryType loc) const noexcept
[[nodiscard]] auto data(MemoryType loc) const noexcept -> void*
{
if (size_ == 0) { return nullptr; }
switch (loc) {
case MemoryType::kDevice: return data_device_;
default: return data_host_;
#ifndef BUILD_CPU_ONLY
case MemoryType::kDevice: {
if (data_device_ == nullptr) {
cudaMallocAsync(&data_device_, size_, stream_);
cudaStreamSynchronize(stream_);
needs_cleanup_device_ = false;
} else if (needs_cleanup_device_) {
cudaMemsetAsync(data_device_, 0, size_, stream_);
cudaStreamSynchronize(stream_);
needs_cleanup_device_ = false;
}
return data_device_;
}
#endif
default: {
if (data_host_ == nullptr) {
#ifndef BUILD_CPU_ONLY
cudaMallocHost(&data_host_, size_);
#else
data_host_ = malloc(size_);
#endif
needs_cleanup_host_ = false;
} else if (needs_cleanup_host_) {
memset(data_host_, 0, size_);
needs_cleanup_host_ = false;
}
return data_host_;
}
}
}

void transfer_data(MemoryType dst, MemoryType src)
{
// The destination is overwritten and thus does not need cleanup
if (dst == MemoryType::kDevice) {
needs_cleanup_device_ = false;
} else {
needs_cleanup_host_ = false;
}
auto dst_ptr = data(dst);
auto src_ptr = data(src);
if (dst_ptr == src_ptr) { return; }
Expand All @@ -238,15 +267,25 @@ struct result_buffer {
#endif
}

/** Mark the buffer for reuse - it needs to be cleared to make sure the previous results are not
* leaked to the new iteration. */
void reuse()
{
needs_cleanup_host_ = true;
needs_cleanup_device_ = true;
}

private:
size_t size_{0};
cudaStream_t stream_ = nullptr;
void* data_host_ = nullptr;
void* data_device_ = nullptr;
cudaStream_t stream_ = nullptr;
mutable void* data_host_ = nullptr;
mutable void* data_device_ = nullptr;
mutable bool needs_cleanup_host_ = false;
mutable bool needs_cleanup_device_ = false;
};

namespace detail {
inline std::vector<std::unique_ptr<result_buffer>> global_result_buffer_pool(0);
inline std::vector<std::unique_ptr<ws_buffer>> global_result_buffer_pool(0);
inline std::mutex grp_mutex;
} // namespace detail

Expand All @@ -257,24 +296,47 @@ inline std::mutex grp_mutex;
* This reduces the setup overhead and number of times the context is being blocked
* (this is relevant if there is a persistent kernel running across multiple benchmark cases).
*/
inline auto get_result_buffer_from_global_pool(size_t size) -> result_buffer&
inline auto get_result_buffer_from_global_pool(size_t size) -> ws_buffer&
{
auto stream = get_stream_from_global_pool();
auto& rb = [stream, size]() -> result_buffer& {
auto& rb = [stream, size]() -> ws_buffer& {
std::lock_guard guard(detail::grp_mutex);
if (static_cast<int>(detail::global_result_buffer_pool.size()) < benchmark_n_threads) {
detail::global_result_buffer_pool.resize(benchmark_n_threads);
}
auto& rb = detail::global_result_buffer_pool[benchmark_thread_id];
if (!rb || rb->size() < size) { rb = std::make_unique<result_buffer>(size, stream); }
if (!rb || rb->size() < size) {
rb = std::make_unique<ws_buffer>(size, stream);
} else {
rb->reuse();
}
return *rb;
}();
return rb;
}

memset(rb.data(MemoryType::kHost), 0, size);
#ifndef BUILD_CPU_ONLY
cudaMemsetAsync(rb.data(MemoryType::kDevice), 0, size, stream);
cudaStreamSynchronize(stream);
#endif
namespace detail {
// Backing storage for `get_tmp_buffer_from_global_pool`: one slot per benchmark thread, indexed
// by `benchmark_thread_id`; a slot stays empty until its thread first requests a buffer.
// NOTE(review): `reset_global_device_resources` resizes this pool while holding `gsp_mutex`,
// not `gtp_mutex` - confirm that no thread can be inside the getter at that point.
inline std::vector<std::unique_ptr<ws_buffer>> global_tmp_buffer_pool(0);
// Held by `get_tmp_buffer_from_global_pool` while it resizes the pool or replaces a slot.
inline std::mutex gtp_mutex;
} // namespace detail

/**
 * Fetch the calling benchmark thread's scratch buffer from the global temporary pool.
 *
 * Unlike `get_result_buffer_from_global_pool`, an existing buffer is handed back as-is (it is
 * not marked for reuse), so its content is never initialized or cleared between calls.
 *
 * @param size minimal required capacity in bytes; a smaller (or missing) buffer is replaced by
 *             a freshly constructed one of exactly this size.
 * @return a reference to the pooled buffer of the current thread.
 */
inline auto get_tmp_buffer_from_global_pool(size_t size) -> ws_buffer&
{
  auto stream = get_stream_from_global_pool();

  // The pool is shared between the benchmark threads: grow / replace slots under the lock.
  std::lock_guard guard(detail::gtp_mutex);
  auto& pool = detail::global_tmp_buffer_pool;
  if (static_cast<int>(pool.size()) < benchmark_n_threads) { pool.resize(benchmark_n_threads); }

  auto& slot           = pool[benchmark_thread_id];
  const bool too_small = !slot || slot->size() < size;
  if (too_small) { slot = std::make_unique<ws_buffer>(size, stream); }
  return *slot;
}

Expand All @@ -288,6 +350,7 @@ inline void reset_global_device_resources()
{
#ifndef BUILD_CPU_ONLY
std::lock_guard guard(detail::gsp_mutex);
detail::global_tmp_buffer_pool.resize(0);
detail::global_result_buffer_pool.resize(0);
detail::global_stream_pool.resize(0);
#endif
Expand Down
4 changes: 4 additions & 0 deletions cpp/bench/ann/src/cuvs/cuvs_ann_bench_param_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,10 @@ void parse_search_param(const nlohmann::json& conf,
if (conf.contains("itopk")) { param.p.itopk_size = conf.at("itopk"); }
if (conf.contains("search_width")) { param.p.search_width = conf.at("search_width"); }
if (conf.contains("max_iterations")) { param.p.max_iterations = conf.at("max_iterations"); }
if (conf.contains("persistent")) { param.p.persistent = conf.at("persistent"); }
if (conf.contains("thread_block_size")) {
param.p.thread_block_size = conf.at("thread_block_size");
}
if (conf.contains("algo")) {
if (conf.at("algo") == "single_cta") {
param.p.algo = cuvs::neighbors::cagra::search_algo::SINGLE_CTA;
Expand Down
55 changes: 37 additions & 18 deletions cpp/bench/ann/src/cuvs/cuvs_ann_bench_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -218,27 +218,46 @@ void refine_helper(const raft::resources& res,
} else {
auto dataset_host = raft::make_host_matrix_view<const data_type, extents_type>(
dataset.data_handle(), dataset.extent(0), dataset.extent(1));
auto queries_host = raft::make_host_matrix<data_type, extents_type>(batch_size, dim);
auto candidates_host = raft::make_host_matrix<index_type, extents_type>(batch_size, k0);
auto neighbors_host = raft::make_host_matrix<index_type, extents_type>(batch_size, k);
auto distances_host = raft::make_host_matrix<float, extents_type>(batch_size, k);
if (raft::get_device_for_address(queries.data_handle()) >= 0) {
// Queries & results are on the device

auto stream = raft::resource::get_cuda_stream(res);
raft::copy(queries_host.data_handle(), queries.data_handle(), queries_host.size(), stream);
raft::copy(
candidates_host.data_handle(), candidates.data_handle(), candidates_host.size(), stream);
auto queries_host = raft::make_host_matrix<data_type, extents_type>(batch_size, dim);
auto candidates_host = raft::make_host_matrix<index_type, extents_type>(batch_size, k0);
auto neighbors_host = raft::make_host_matrix<index_type, extents_type>(batch_size, k);
auto distances_host = raft::make_host_matrix<float, extents_type>(batch_size, k);

raft::resource::sync_stream(res); // wait for the queries and candidates
cuvs::neighbors::refine(res,
dataset_host,
queries_host.view(),
candidates_host.view(),
neighbors_host.view(),
distances_host.view(),
metric);
auto stream = raft::resource::get_cuda_stream(res);
raft::copy(queries_host.data_handle(), queries.data_handle(), queries_host.size(), stream);
raft::copy(
candidates_host.data_handle(), candidates.data_handle(), candidates_host.size(), stream);

raft::resource::sync_stream(res); // wait for the queries and candidates
cuvs::neighbors::refine(res,
dataset_host,
queries_host.view(),
candidates_host.view(),
neighbors_host.view(),
distances_host.view(),
metric);

raft::copy(neighbors, neighbors_host.data_handle(), neighbors_host.size(), stream);
raft::copy(distances, distances_host.data_handle(), distances_host.size(), stream);

} else {
// Queries & results are on the host - no device sync / copy needed

auto queries_host = raft::make_host_matrix_view<const data_type, extents_type>(
queries.data_handle(), batch_size, dim);
auto candidates_host = raft::make_host_matrix_view<const index_type, extents_type>(
candidates.data_handle(), batch_size, k0);
auto neighbors_host =
raft::make_host_matrix_view<index_type, extents_type>(neighbors, batch_size, k);
auto distances_host =
raft::make_host_matrix_view<float, extents_type>(distances, batch_size, k);

raft::copy(neighbors, neighbors_host.data_handle(), neighbors_host.size(), stream);
raft::copy(distances, distances_host.data_handle(), distances_host.size(), stream);
cuvs::neighbors::refine(
res, dataset_host, queries_host, candidates_host, neighbors_host, distances_host, metric);
}
}
}

Expand Down
Loading
Loading