Commit 9359e89

Copy benchmark wrappers to avoid concurrently accessing not thread-safe resources
achirkin committed Nov 23, 2023
1 parent 19cb314 commit 9359e89
Showing 11 changed files with 143 additions and 72 deletions.
6 changes: 6 additions & 0 deletions cpp/bench/ann/src/common/ann_types.hpp
@@ -18,6 +18,7 @@

#include "cuda_stub.hpp" // cudaStream_t

#include <memory>
#include <stdexcept>
#include <string>
#include <vector>
@@ -118,6 +119,11 @@ class ANN : public AnnBase {
// The client code should call set_search_dataset() before searching,
// and should not release dataset before searching is finished.
virtual void set_search_dataset(const T* /*dataset*/, size_t /*nrow*/){};

/**
* Make a shallow copy of the ANN wrapper that shares the resources and ensures thread-safe access
* to them. */
virtual auto copy() -> std::unique_ptr<ANN<T>> = 0;
};

} // namespace raft::bench::ann
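The copy() contract added above is what every wrapper changed in this commit implements. A minimal sketch of the intended ownership layout, with hypothetical example_wrapper and example_index names rather than code from this repository: heavy, read-only state sits behind a std::shared_ptr so that the compiler-generated copy constructor already yields a shallow, resource-sharing copy, while anything a search mutates stays per-copy.

#include <memory>
#include <vector>

struct example_index {                      // stands in for a FAISS/HNSW/GGNN index
  std::vector<float> vectors;
};

class example_wrapper {
 public:
  std::unique_ptr<example_wrapper> copy() const
  {
    // The compiler-generated copy constructor shares shared_ptr members instead of cloning them.
    return std::make_unique<example_wrapper>(*this);
  }

 private:
  std::shared_ptr<example_index> index_;    // shared by all copies
  std::vector<float> scratch_;              // per-copy scratch, safe to mutate concurrently
};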
2 changes: 1 addition & 1 deletion cpp/bench/ann/src/common/benchmark.hpp
@@ -291,7 +291,7 @@ void bench_search(::benchmark::State& state,
{
nvtx_case nvtx{state.name()};

ANN<T>* algo = dynamic_cast<ANN<T>*>(current_algo.get());
auto algo = dynamic_cast<ANN<T>*>(current_algo.get())->copy();
for (auto _ : state) {
[[maybe_unused]] auto ntx_lap = nvtx.lap();
[[maybe_unused]] auto gpu_lap = gpu_timer.lap();
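This is the call site that motivates the commit: bench_search() is executed by several benchmark threads at once, and previously they all searched through the single shared current_algo. With the change, each thread takes its own shallow copy before the timed loop. A rough sketch of the same pattern outside the harness, using a hypothetical run_concurrent_searches helper and only the copy() signature shown above:

#include <memory>
#include <thread>
#include <vector>

template <typename T>
void run_concurrent_searches(raft::bench::ann::ANN<T>& shared_algo, int n_threads)
{
  std::vector<std::thread> workers;
  for (int i = 0; i < n_threads; ++i) {
    workers.emplace_back([&shared_algo] {
      // Per-thread shallow copy: shares the built index, keeps mutable scratch private.
      auto algo = shared_algo.copy();
      // ... call algo->search(...) in a loop, as bench_search does for each iteration ...
    });
  }
  for (auto& w : workers) {
    w.join();
  }
}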
1 change: 1 addition & 0 deletions cpp/bench/ann/src/faiss/faiss_cpu_wrapper.h
@@ -105,6 +105,7 @@ class FaissCpu : public ANN<T> {
property.query_memory_type = MemoryType::Host;
return property;
}
std::unique_ptr<ANN<T>> copy() override;

protected:
template <typename Index>
3 changes: 2 additions & 1 deletion cpp/bench/ann/src/faiss/faiss_gpu_wrapper.h
@@ -133,6 +133,7 @@ class FaissGpu : public ANN<T> {
property.query_memory_type = MemoryType::Host;
return property;
}
std::unique_ptr<ANN<T>> copy() override;

protected:
template <typename GpuIndex, typename CpuIndex>
@@ -432,4 +433,4 @@ class FaissGpuFlat : public FaissGpu<T> {

} // namespace raft::bench::ann

#endif
#endif
22 changes: 13 additions & 9 deletions cpp/bench/ann/src/ggnn/ggnn_wrapper.cuh
@@ -52,7 +52,6 @@ class Ggnn : public ANN<T> {
};

Ggnn(Metric metric, int dim, const BuildParam& param);
~Ggnn() { delete impl_; }

void build(const T* dataset, size_t nrow, cudaStream_t stream = 0) override
{
@@ -72,6 +71,7 @@ class Ggnn : public ANN<T> {

void save(const std::string& file) const override { impl_->save(file); }
void load(const std::string& file) override { impl_->load(file); }
std::unique_ptr<ANN<T>> copy() override { return std::make_unique<Ggnn<T>>(*this); };

AlgoProperty get_preference() const override { return impl_->get_preference(); }

@@ -81,7 +81,7 @@
};

private:
ANN<T>* impl_;
std::shared_ptr<ANN<T>> impl_;
};

template <typename T>
@@ -90,23 +90,23 @@ Ggnn<T>::Ggnn(Metric metric, int dim, const BuildParam& param) : ANN<T>(metric,
// ggnn/src/sift1m.cu
if (metric == Metric::kEuclidean && dim == 128 && param.k_build == 24 && param.k == 10 &&
param.segment_size == 32) {
impl_ = new GgnnImpl<T, Euclidean, 128, 24, 10, 32>(metric, dim, param);
impl_ = std::make_shared<GgnnImpl<T, Euclidean, 128, 24, 10, 32>>(metric, dim, param);
}
// ggnn/src/deep1b_multi_gpu.cu, and adapt it deep1B
else if (metric == Metric::kEuclidean && dim == 96 && param.k_build == 24 && param.k == 10 &&
param.segment_size == 32) {
impl_ = new GgnnImpl<T, Euclidean, 96, 24, 10, 32>(metric, dim, param);
impl_ = std::make_shared<GgnnImpl<T, Euclidean, 96, 24, 10, 32>>(metric, dim, param);
} else if (metric == Metric::kInnerProduct && dim == 96 && param.k_build == 24 && param.k == 10 &&
param.segment_size == 32) {
impl_ = new GgnnImpl<T, Cosine, 96, 24, 10, 32>(metric, dim, param);
impl_ = std::make_shared<GgnnImpl<T, Cosine, 96, 24, 10, 32>>(metric, dim, param);
} else if (metric == Metric::kInnerProduct && dim == 96 && param.k_build == 96 && param.k == 10 &&
param.segment_size == 64) {
impl_ = new GgnnImpl<T, Cosine, 96, 96, 10, 64>(metric, dim, param);
impl_ = std::make_shared<GgnnImpl<T, Cosine, 96, 96, 10, 64>>(metric, dim, param);
}
// ggnn/src/glove200.cu, adapt it to glove100
else if (metric == Metric::kInnerProduct && dim == 100 && param.k_build == 96 && param.k == 10 &&
param.segment_size == 64) {
impl_ = new GgnnImpl<T, Cosine, 100, 96, 10, 64>(metric, dim, param);
impl_ = std::make_shared<GgnnImpl<T, Cosine, 100, 96, 10, 64>>(metric, dim, param);
} else {
throw std::runtime_error(
"ggnn: not supported combination of metric, dim and build param; "
@@ -133,6 +133,10 @@ class GgnnImpl : public ANN<T> {

void save(const std::string& file) const override;
void load(const std::string& file) override;
std::unique_ptr<ANN<T>> copy() override
{
return std::make_unique<GgnnImpl<T, measure, D, KBuild, KQuery, S>>(*this);
};

AlgoProperty get_preference() const override
{
@@ -159,7 +163,7 @@ class GgnnImpl : public ANN<T> {
KBuild / 2 /* KF */,
KQuery,
S>;
std::unique_ptr<GGNNGPUInstance> ggnn_;
std::shared_ptr<GGNNGPUInstance> ggnn_;
typename Ggnn<T>::BuildParam build_param_;
typename Ggnn<T>::SearchParam search_param_;
};
@@ -189,7 +193,7 @@ void GgnnImpl<T, measure, D, KBuild, KQuery, S>::build(const T* dataset,
{
int device;
RAFT_CUDA_TRY(cudaGetDevice(&device));
ggnn_ = std::make_unique<GGNNGPUInstance>(
ggnn_ = std::make_shared<GGNNGPUInstance>(
device, nrow, build_param_.num_layers, true, build_param_.tau);

ggnn_->set_base_data(dataset);
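The GGNN wrapper is the one place where ownership itself had to change: with a raw impl_ pointer and a manual delete in the destructor, the compiler-generated copy constructor that copy() relies on would have left two owners of the same pointer and a double delete. A small standalone illustration of the difference, using hypothetical owner_raw and owner_shared types rather than the GGNN code:

#include <memory>

struct impl_t {};                               // stands in for the expensive GPU-side state

class owner_raw {
  impl_t* impl_ = new impl_t{};

 public:
  ~owner_raw() { delete impl_; }                // copying owner_raw would double-delete impl_
};

class owner_shared {
  std::shared_ptr<impl_t> impl_ = std::make_shared<impl_t>();

 public:
  // The default copy constructor and destructor are correct: copies share impl_,
  // and the last copy alive releases it.
};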
27 changes: 14 additions & 13 deletions cpp/bench/ann/src/hnswlib/hnswlib_wrapper.h
@@ -82,6 +82,7 @@ class HnswLib : public ANN<T> {

void save(const std::string& path_to_index) const override;
void load(const std::string& path_to_index) override;
std::unique_ptr<ANN<T>> copy() override { return std::make_unique<HnswLib<T>>(*this); };

AlgoProperty get_preference() const override
{
@@ -96,15 +97,15 @@
private:
void get_search_knn_results_(const T* query, int k, size_t* indices, float* distances) const;

std::unique_ptr<hnswlib::HierarchicalNSW<typename hnsw_dist_t<T>::type>> appr_alg_;
std::unique_ptr<hnswlib::SpaceInterface<typename hnsw_dist_t<T>::type>> space_;
std::shared_ptr<hnswlib::HierarchicalNSW<typename hnsw_dist_t<T>::type>> appr_alg_;
std::shared_ptr<hnswlib::SpaceInterface<typename hnsw_dist_t<T>::type>> space_;

using ANN<T>::metric_;
using ANN<T>::dim_;
int ef_construction_;
int m_;
int num_threads_;
std::unique_ptr<FixedThreadPool> thread_pool_;
std::shared_ptr<FixedThreadPool> thread_pool_;
Objective metric_objective_;
};

@@ -129,18 +130,18 @@ void HnswLib<T>::build(const T* dataset, size_t nrow, cudaStream_t)
{
if constexpr (std::is_same_v<T, float>) {
if (metric_ == Metric::kInnerProduct) {
space_ = std::make_unique<hnswlib::InnerProductSpace>(dim_);
space_ = std::make_shared<hnswlib::InnerProductSpace>(dim_);
} else {
space_ = std::make_unique<hnswlib::L2Space>(dim_);
space_ = std::make_shared<hnswlib::L2Space>(dim_);
}
} else if constexpr (std::is_same_v<T, uint8_t>) {
space_ = std::make_unique<hnswlib::L2SpaceI>(dim_);
space_ = std::make_shared<hnswlib::L2SpaceI>(dim_);
}

appr_alg_ = std::make_unique<hnswlib::HierarchicalNSW<typename hnsw_dist_t<T>::type>>(
appr_alg_ = std::make_shared<hnswlib::HierarchicalNSW<typename hnsw_dist_t<T>::type>>(
space_.get(), nrow, m_, ef_construction_);

thread_pool_ = std::make_unique<FixedThreadPool>(num_threads_);
thread_pool_ = std::make_shared<FixedThreadPool>(num_threads_);
const size_t items_per_thread = nrow / (num_threads_ + 1);

thread_pool_->submit(
@@ -168,7 +169,7 @@ void HnswLib<T>::set_search_param(const AnnSearchParam& param_)

// Create a pool if multiple query threads have been set and the pool hasn't been created already
bool create_pool = (metric_objective_ == Objective::LATENCY && num_threads_ > 1 && !thread_pool_);
if (create_pool) { thread_pool_ = std::make_unique<FixedThreadPool>(num_threads_); }
if (create_pool) { thread_pool_ = std::make_shared<FixedThreadPool>(num_threads_); }
}

template <typename T>
@@ -199,15 +200,15 @@ void HnswLib<T>::load(const std::string& path_to_index)
{
if constexpr (std::is_same_v<T, float>) {
if (metric_ == Metric::kInnerProduct) {
space_ = std::make_unique<hnswlib::InnerProductSpace>(dim_);
space_ = std::make_shared<hnswlib::InnerProductSpace>(dim_);
} else {
space_ = std::make_unique<hnswlib::L2Space>(dim_);
space_ = std::make_shared<hnswlib::L2Space>(dim_);
}
} else if constexpr (std::is_same_v<T, uint8_t>) {
space_ = std::make_unique<hnswlib::L2SpaceI>(dim_);
space_ = std::make_shared<hnswlib::L2SpaceI>(dim_);
}

appr_alg_ = std::make_unique<hnswlib::HierarchicalNSW<typename hnsw_dist_t<T>::type>>(
appr_alg_ = std::make_shared<hnswlib::HierarchicalNSW<typename hnsw_dist_t<T>::type>>(
space_.get(), path_to_index);
}

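For hnswlib, all three heavy members (the index, the metric space, and the thread pool) move to shared_ptr, so a shallow copy shares them instead of rebuilding the index or spawning another pool. A standalone sketch of that point for the pool alone, with a hypothetical fixed_thread_pool stand-in (the repository's FixedThreadPool is assumed to behave similarly): if each copy recreated its own pool, N benchmark threads would spawn N * num_threads_ workers, whereas sharing keeps the worker count fixed.

#include <memory>

struct fixed_thread_pool {
  explicit fixed_thread_pool(int n_workers) : n_workers_{n_workers} { /* spawn workers here */ }
  int n_workers_;
};

struct wrapper {
  std::shared_ptr<fixed_thread_pool> pool;
};

int main()
{
  wrapper a{std::make_shared<fixed_thread_pool>(8)};
  wrapper b = a;                                 // shallow copy: no new pool, no new workers
  return (a.pool.get() == b.pool.get()) ? 0 : 1; // same pool object -> exit code 0
}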
59 changes: 44 additions & 15 deletions cpp/bench/ann/src/raft/raft_ann_bench_utils.h
@@ -44,21 +44,50 @@ inline raft::distance::DistanceType parse_metric_type(raft::bench::ann::Metric m

class configured_raft_resources {
public:
explicit configured_raft_resources(
const std::shared_ptr<rmm::mr::pool_memory_resource<rmm::mr::device_memory_resource>> mr)
: mr_{mr},
sync_{[]() {
auto* ev = new cudaEvent_t;
RAFT_CUDA_TRY(cudaEventCreate(ev, cudaEventDisableTiming));
return ev;
}(),
[](cudaEvent_t* ev) {
RAFT_CUDA_TRY_NO_THROW(cudaEventDestroy(*ev));
delete ev;
}},
res_{cudaStreamPerThread}
{
}

configured_raft_resources()
: mr_{rmm::mr::get_current_device_resource(), 1024 * 1024 * 1024ull},
res_{cudaStreamPerThread},
sync_{nullptr}
: configured_raft_resources{
{[]() {
auto* mr = new rmm::mr::pool_memory_resource<rmm::mr::device_memory_resource>{
rmm::mr::get_current_device_resource(), 1024 * 1024 * 1024ull};
rmm::mr::set_current_device_resource(mr);
return mr;
}(),
[](rmm::mr::pool_memory_resource<rmm::mr::device_memory_resource>* mr) {
if (rmm::mr::get_current_device_resource()->is_equal(*mr)) {
rmm::mr::set_current_device_resource(mr->get_upstream());
}
delete mr;
}}}
{
rmm::mr::set_current_device_resource(&mr_);
RAFT_CUDA_TRY(cudaEventCreate(&sync_, cudaEventDisableTiming));
}

~configured_raft_resources() noexcept
configured_raft_resources(configured_raft_resources&&) = default;
configured_raft_resources& operator=(configured_raft_resources&&) = default;
~configured_raft_resources() = default;
configured_raft_resources(const configured_raft_resources& res)
: configured_raft_resources{res.mr_}
{
}
configured_raft_resources& operator=(const configured_raft_resources& other)
{
RAFT_CUDA_TRY_NO_THROW(cudaEventDestroy(sync_));
if (rmm::mr::get_current_device_resource()->is_equal(mr_)) {
rmm::mr::set_current_device_resource(mr_.get_upstream());
}
this->mr_ = other.mr_;
return *this;
}

operator raft::resources&() noexcept { return res_; }
@@ -67,17 +96,17 @@ class configured_raft_resources {
/** Make the given stream wait on all work submitted to the resource. */
void stream_wait(cudaStream_t stream) const
{
RAFT_CUDA_TRY(cudaEventRecord(sync_, resource::get_cuda_stream(res_)));
RAFT_CUDA_TRY(cudaStreamWaitEvent(stream, sync_));
RAFT_CUDA_TRY(cudaEventRecord(*sync_, resource::get_cuda_stream(res_)));
RAFT_CUDA_TRY(cudaStreamWaitEvent(stream, *sync_));
}

/** Get the internal sync event (which otherwise used only in `stream_wait`). */
cudaEvent_t get_sync_event() const { return sync_; }
cudaEvent_t get_sync_event() const { return *sync_; }

private:
rmm::mr::pool_memory_resource<rmm::mr::device_memory_resource> mr_;
std::shared_ptr<rmm::mr::pool_memory_resource<rmm::mr::device_memory_resource>> mr_;
std::unique_ptr<cudaEvent_t, std::function<void(cudaEvent_t*)>> sync_;
raft::device_resources res_;
cudaEvent_t sync_;
};

} // namespace raft::bench::ann
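Two things change in configured_raft_resources: the pool memory resource becomes a shared_ptr that all copies reuse (its custom deleter restores the upstream resource when the pool goes away), and the sync event becomes a unique_ptr with a custom deleter, so every instance, including each copy made through the new copy constructor, owns its own RAII-managed cudaEvent_t. The event idiom shown standalone, with error handling reduced to plain CUDA calls where the real code uses RAFT_CUDA_TRY:

#include <cuda_runtime_api.h>

#include <functional>
#include <memory>

using event_ptr = std::unique_ptr<cudaEvent_t, std::function<void(cudaEvent_t*)>>;

inline event_ptr make_sync_event()
{
  return event_ptr{[]() {
                     auto* ev = new cudaEvent_t;
                     cudaEventCreateWithFlags(ev, cudaEventDisableTiming);
                     return ev;
                   }(),
                   [](cudaEvent_t* ev) {
                     cudaEventDestroy(*ev);     // destroy the event before freeing its handle
                     delete ev;
                   }};
}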
1 change: 1 addition & 0 deletions cpp/bench/ann/src/raft/raft_cagra_hnswlib_wrapper.h
@@ -62,6 +62,7 @@ class RaftCagraHnswlib : public ANN<T> {
}
void save(const std::string& file) const override;
void load(const std::string&) override;
std::unique_ptr<ANN<T>> copy() override;

private:
raft::device_resources handle_;
(The remaining changed files are not shown in this view.)
