Fix ann-bench multithreading (#2021)
In the current state, ann-benchmarks running in the `--throughput` mode (multi-threaded) share the ANN wrappers among CPU threads. This is not thread-safe and may result in incorrectly measured times (e.g. CUDA events shared among CPU threads) or in various exceptions and segfaults (e.g. state-changing cuBLAS calls made from multiple CPU threads).

This PR makes the search benchmarks copy the ANN wrappers in each thread. The copies of the wrappers then selectively (see the sketch after this list):
  - share thread-safe resources (e.g. rmm memory pool) and large objects that are not expected to change during search (e.g. index data);
  - duplicate the resources that are not thread-safe or carry the thread-specific state (e.g. cublas handles, CUDA events and streams).
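
A minimal, self-contained sketch of this share-vs-duplicate split, assuming a hypothetical `ExampleWrapper` (not one of the wrappers touched by this PR): the index data is held via a `shared_ptr` and shared by all copies, while each copy creates its own CUDA stream.

```cpp
// Hypothetical sketch of the share-vs-duplicate pattern; the names below are
// illustrative and are not the actual ann-bench API.
#include <cuda_runtime.h>

#include <memory>
#include <utility>
#include <vector>

struct ExampleWrapper {
  // Shared across copies: large and read-only during search (stands in for the index data).
  std::shared_ptr<const std::vector<float>> index_data;
  // Duplicated per copy: carries thread-specific state (one stream per CPU thread).
  cudaStream_t stream{};

  explicit ExampleWrapper(std::shared_ptr<const std::vector<float>> data)
    : index_data{std::move(data)}
  {
    cudaStreamCreate(&stream);
  }

  // The "copy" keeps the shared_ptr (no data duplication) but gets a fresh stream.
  ExampleWrapper(const ExampleWrapper& other) : index_data{other.index_data}
  {
    cudaStreamCreate(&stream);
  }

  // Copy assignment is not needed for the sketch; delete it to avoid accidentally
  // copying the raw stream handle.
  auto operator=(const ExampleWrapper&) -> ExampleWrapper& = delete;

  ~ExampleWrapper() { cudaStreamDestroy(stream); }
};
```

Each benchmark thread then searches on its own copy, so streams, events, and library handles are never touched from two CPU threads at once.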

Alongside, the PR makes a few small changes:
 - enables ann-bench NVTX annotations for the non-common-executable mode (shows benchmark labels and iterations in the nsys timeline);
 - fixes compile errors for the common-executable mode.

Authors:
  - Artem M. Chirkin (https://github.com/achirkin)
  - William Hicks (https://github.com/wphicks)

Approvers:
  - William Hicks (https://github.com/wphicks)
  - Mark Harris (https://github.com/harrism)
  - Corey J. Nolet (https://github.com/cjnolet)

URL: #2021
achirkin authored Dec 13, 2023
1 parent addb059 commit d9a7290
Showing 13 changed files with 364 additions and 206 deletions.
45 changes: 33 additions & 12 deletions cpp/bench/ann/CMakeLists.txt
@@ -116,6 +116,21 @@ if(RAFT_ANN_BENCH_USE_FAISS)
include(cmake/thirdparty/get_faiss.cmake)
endif()

# ##################################################################################################
# * Enable NVTX if available

# Note: ANN_BENCH wrappers have extra NVTX code not related to raft::nvtx. They track gbench
# benchmark cases and iterations. This is to make limited NVTX available to all algos, not just
# raft.
if(TARGET CUDA::nvtx3)
set(_CMAKE_REQUIRED_INCLUDES_ORIG ${CMAKE_REQUIRED_INCLUDES})
get_target_property(CMAKE_REQUIRED_INCLUDES CUDA::nvtx3 INTERFACE_INCLUDE_DIRECTORIES)
unset(NVTX3_HEADERS_FOUND CACHE)
# Check the headers explicitly to make sure the cpu-only build succeeds
CHECK_INCLUDE_FILE_CXX(nvtx3/nvToolsExt.h NVTX3_HEADERS_FOUND)
set(CMAKE_REQUIRED_INCLUDES ${_CMAKE_REQUIRED_INCLUDES_ORIG})
endif()

# ##################################################################################################
# * Configure tests function-------------------------------------------------------------

@@ -141,8 +156,13 @@ function(ConfigureAnnBench)
add_dependencies(${BENCH_NAME} ANN_BENCH)
else()
add_executable(${BENCH_NAME} ${ConfigureAnnBench_PATH})
target_compile_definitions(${BENCH_NAME} PRIVATE ANN_BENCH_BUILD_MAIN)
target_link_libraries(${BENCH_NAME} PRIVATE benchmark::benchmark)
target_compile_definitions(
${BENCH_NAME} PRIVATE ANN_BENCH_BUILD_MAIN
$<$<BOOL:${NVTX3_HEADERS_FOUND}>:ANN_BENCH_NVTX3_HEADERS_FOUND>
)
target_link_libraries(
${BENCH_NAME} PRIVATE benchmark::benchmark $<$<BOOL:${NVTX3_HEADERS_FOUND}>:CUDA::nvtx3>
)
endif()

target_link_libraries(
@@ -340,8 +360,16 @@ if(RAFT_ANN_BENCH_SINGLE_EXE)
target_include_directories(ANN_BENCH PRIVATE ${CMAKE_CUDA_TOOLKIT_INCLUDE_DIRECTORIES})

target_link_libraries(
ANN_BENCH PRIVATE nlohmann_json::nlohmann_json benchmark_static dl -static-libgcc
-static-libstdc++ CUDA::nvtx3
ANN_BENCH
PRIVATE raft::raft
nlohmann_json::nlohmann_json
benchmark_static
dl
-static-libgcc
fmt::fmt-header-only
spdlog::spdlog_header_only
-static-libstdc++
$<$<BOOL:${NVTX3_HEADERS_FOUND}>:CUDA::nvtx3>
)
set_target_properties(
ANN_BENCH
@@ -355,17 +383,10 @@ if(RAFT_ANN_BENCH_SINGLE_EXE)
BUILD_RPATH "\$ORIGIN"
INSTALL_RPATH "\$ORIGIN"
)

# Disable NVTX when the nvtx3 headers are missing
set(_CMAKE_REQUIRED_INCLUDES_ORIG ${CMAKE_REQUIRED_INCLUDES})
get_target_property(CMAKE_REQUIRED_INCLUDES ANN_BENCH INCLUDE_DIRECTORIES)
CHECK_INCLUDE_FILE_CXX(nvtx3/nvToolsExt.h NVTX3_HEADERS_FOUND)
set(CMAKE_REQUIRED_INCLUDES ${_CMAKE_REQUIRED_INCLUDES_ORIG})
target_compile_definitions(
ANN_BENCH
PRIVATE
$<$<BOOL:${CUDAToolkit_FOUND}>:ANN_BENCH_LINK_CUDART="libcudart.so.${CUDAToolkit_VERSION_MAJOR}.${CUDAToolkit_VERSION_MINOR}.${CUDAToolkit_VERSION_PATCH}
">
$<$<BOOL:${CUDAToolkit_FOUND}>:ANN_BENCH_LINK_CUDART="libcudart.so.${CUDAToolkit_VERSION_MAJOR}.${CUDAToolkit_VERSION_MINOR}.${CUDAToolkit_VERSION_PATCH}">
$<$<BOOL:${NVTX3_HEADERS_FOUND}>:ANN_BENCH_NVTX3_HEADERS_FOUND>
)

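For context, a sketch of how the `ANN_BENCH_NVTX3_HEADERS_FOUND` definition set up above can gate the NVTX annotations; this is only an illustration, the actual ann-bench `nvtx_case` helper in the common sources is more elaborate.

```cpp
// Illustrative only: a benchmark label range gated on the ANN_BENCH_NVTX3_HEADERS_FOUND
// compile definition, so the CPU-only build compiles to a no-op.
#ifdef ANN_BENCH_NVTX3_HEADERS_FOUND
#include <nvtx3/nvToolsExt.h>
#endif

#include <string>

struct nvtx_range_sketch {
  explicit nvtx_range_sketch(const std::string& label)
  {
#ifdef ANN_BENCH_NVTX3_HEADERS_FOUND
    nvtxRangePushA(label.c_str());  // shows up as a named range in the nsys timeline
#else
    (void)label;  // NVTX headers not found: do nothing
#endif
  }
  ~nvtx_range_sketch()
  {
#ifdef ANN_BENCH_NVTX3_HEADERS_FOUND
    nvtxRangePop();
#endif
  }
};
```
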
15 changes: 7 additions & 8 deletions cpp/bench/ann/src/common/ann_types.hpp
@@ -18,6 +18,7 @@

#include "cuda_stub.hpp" // cudaStream_t

#include <memory>
#include <stdexcept>
#include <string>
#include <vector>
@@ -64,17 +65,10 @@ inline auto parse_memory_type(const std::string& memory_type) -> MemoryType
}
}

class AlgoProperty {
public:
inline AlgoProperty() {}
inline AlgoProperty(MemoryType dataset_memory_type_, MemoryType query_memory_type_)
: dataset_memory_type(dataset_memory_type_), query_memory_type(query_memory_type_)
{
}
struct AlgoProperty {
MemoryType dataset_memory_type;
// neighbors/distances should have same memory type as queries
MemoryType query_memory_type;
virtual ~AlgoProperty() = default;
};

class AnnBase {
@@ -125,6 +119,11 @@ class ANN : public AnnBase {
// The client code should call set_search_dataset() before searching,
// and should not release dataset before searching is finished.
virtual void set_search_dataset(const T* /*dataset*/, size_t /*nrow*/){};

/**
* Make a shallow copy of the ANN wrapper that shares the resources and ensures thread-safe access
* to them. */
virtual auto copy() -> std::unique_ptr<ANN<T>> = 0;
};

} // namespace raft::bench::ann
16 changes: 7 additions & 9 deletions cpp/bench/ann/src/common/benchmark.hpp
@@ -45,7 +45,7 @@ std::condition_variable cond_var;
std::atomic_int processed_threads{0};

static inline std::unique_ptr<AnnBase> current_algo{nullptr};
static inline std::shared_ptr<AlgoProperty> current_algo_props{nullptr};
static inline std::unique_ptr<AlgoProperty> current_algo_props{nullptr};

using kv_series = std::vector<std::tuple<std::string, std::vector<nlohmann::json>>>;

@@ -241,9 +241,8 @@ void bench_search(::benchmark::State& state,
return;
}

auto algo_property = parse_algo_property(algo->get_preference(), sp_json);
current_algo_props = std::make_shared<AlgoProperty>(algo_property.dataset_memory_type,
algo_property.query_memory_type);
current_algo_props = std::make_unique<AlgoProperty>(
std::move(parse_algo_property(algo->get_preference(), sp_json)));

if (search_param->needs_dataset()) {
try {
@@ -277,23 +276,22 @@
// We are accessing shared variables (like current_algo, current_algo_props) before the
// benchmark loop, therefore the synchronization here is necessary.
}
const auto algo_property = *current_algo_props;
query_set = dataset->query_set(algo_property.query_memory_type);
query_set = dataset->query_set(current_algo_props->query_memory_type);

/**
* Each thread will manage its own outputs
*/
std::shared_ptr<buf<float>> distances =
std::make_shared<buf<float>>(algo_property.query_memory_type, k * query_set_size);
std::make_shared<buf<float>>(current_algo_props->query_memory_type, k * query_set_size);
std::shared_ptr<buf<std::size_t>> neighbors =
std::make_shared<buf<std::size_t>>(algo_property.query_memory_type, k * query_set_size);
std::make_shared<buf<std::size_t>>(current_algo_props->query_memory_type, k * query_set_size);

cuda_timer gpu_timer;
auto start = std::chrono::high_resolution_clock::now();
{
nvtx_case nvtx{state.name()};

ANN<T>* algo = dynamic_cast<ANN<T>*>(current_algo.get());
auto algo = dynamic_cast<ANN<T>*>(current_algo.get())->copy();
for (auto _ : state) {
[[maybe_unused]] auto ntx_lap = nvtx.lap();
[[maybe_unused]] auto gpu_lap = gpu_timer.lap();
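
The per-thread `copy()` above is what makes GPU timing safe in the `--throughput` mode: each CPU thread records its laps on CUDA events it owns. A minimal sketch of such a per-thread lap timer, assuming simple begin/end semantics (the benchmark's actual `cuda_timer` differs):

```cpp
// Hypothetical per-thread GPU lap timer; illustrates why CUDA events must not be
// shared across CPU threads (each timer owns its own events and stream).
#include <cuda_runtime.h>

struct gpu_lap_timer_sketch {
  cudaStream_t stream{};
  cudaEvent_t start{}, stop{};

  gpu_lap_timer_sketch()
  {
    cudaStreamCreate(&stream);
    cudaEventCreate(&start);
    cudaEventCreate(&stop);
  }
  ~gpu_lap_timer_sketch()
  {
    cudaEventDestroy(stop);
    cudaEventDestroy(start);
    cudaStreamDestroy(stream);
  }

  // Time the GPU work enqueued on `stream` between begin_lap() and end_lap().
  void begin_lap() { cudaEventRecord(start, stream); }
  auto end_lap() -> float
  {
    cudaEventRecord(stop, stream);
    cudaEventSynchronize(stop);
    float ms = 0.0f;
    cudaEventElapsedTime(&ms, start, stop);
    return ms;  // milliseconds of GPU time for this lap
  }
};
```

Because the events belong to the thread's own timer (and, in this PR, to the thread's own wrapper copy), concurrent threads can no longer overwrite each other's event records.
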
50 changes: 34 additions & 16 deletions cpp/bench/ann/src/faiss/faiss_cpu_wrapper.h
@@ -73,18 +73,16 @@ class FaissCpu : public ANN<T> {
static_assert(std::is_same_v<T, float>, "faiss support only float type");
}

virtual ~FaissCpu() noexcept {}

void build(const T* dataset, size_t nrow, cudaStream_t stream = 0) final;

void set_search_param(const AnnSearchParam& param) override;

void init_quantizer(int dim)
{
if (this->metric_type_ == faiss::MetricType::METRIC_L2) {
this->quantizer_ = std::make_unique<faiss::IndexFlatL2>(dim);
this->quantizer_ = std::make_shared<faiss::IndexFlatL2>(dim);
} else if (this->metric_type_ == faiss::MetricType::METRIC_INNER_PRODUCT) {
this->quantizer_ = std::make_unique<faiss::IndexFlatIP>(dim);
this->quantizer_ = std::make_shared<faiss::IndexFlatIP>(dim);
}
}

@@ -113,15 +111,15 @@ class FaissCpu : public ANN<T> {
template <typename Index>
void load_(const std::string& file);

std::unique_ptr<faiss::Index> index_;
std::unique_ptr<faiss::Index> quantizer_;
std::unique_ptr<faiss::IndexRefineFlat> index_refine_;
std::shared_ptr<faiss::Index> index_;
std::shared_ptr<faiss::Index> quantizer_;
std::shared_ptr<faiss::IndexRefineFlat> index_refine_;
faiss::MetricType metric_type_;
int nlist_;
double training_sample_fraction_;

int num_threads_;
std::unique_ptr<FixedThreadPool> thread_pool_;
std::shared_ptr<FixedThreadPool> thread_pool_;
};

template <typename T>
@@ -152,7 +150,7 @@ void FaissCpu<T>::build(const T* dataset, size_t nrow, cudaStream_t stream)
index_->train(nrow, dataset); // faiss::IndexFlat::train() will do nothing
assert(index_->is_trained);
index_->add(nrow, dataset);
index_refine_ = std::make_unique<faiss::IndexRefineFlat>(this->index_.get(), dataset);
index_refine_ = std::make_shared<faiss::IndexRefineFlat>(this->index_.get(), dataset);
}

template <typename T>
@@ -169,7 +167,7 @@ void FaissCpu<T>::set_search_param(const AnnSearchParam& param)

if (!thread_pool_ || num_threads_ != search_param.num_threads) {
num_threads_ = search_param.num_threads;
thread_pool_ = std::make_unique<FixedThreadPool>(num_threads_);
thread_pool_ = std::make_shared<FixedThreadPool>(num_threads_);
}
}

@@ -203,7 +201,7 @@ template <typename T>
template <typename Index>
void FaissCpu<T>::load_(const std::string& file)
{
index_ = std::unique_ptr<Index>(dynamic_cast<Index*>(faiss::read_index(file.c_str())));
index_ = std::shared_ptr<Index>(dynamic_cast<Index*>(faiss::read_index(file.c_str())));
}

template <typename T>
@@ -214,7 +212,7 @@ class FaissCpuIVFFlat : public FaissCpu<T> {
FaissCpuIVFFlat(Metric metric, int dim, const BuildParam& param) : FaissCpu<T>(metric, dim, param)
{
this->init_quantizer(dim);
this->index_ = std::make_unique<faiss::IndexIVFFlat>(
this->index_ = std::make_shared<faiss::IndexIVFFlat>(
this->quantizer_.get(), dim, param.nlist, this->metric_type_);
}

@@ -223,6 +221,11 @@ class FaissCpuIVFFlat : public FaissCpu<T> {
this->template save_<faiss::IndexIVFFlat>(file);
}
void load(const std::string& file) override { this->template load_<faiss::IndexIVFFlat>(file); }

std::unique_ptr<ANN<T>> copy()
{
return std::make_unique<FaissCpuIVFFlat<T>>(*this); // use copy constructor
}
};

template <typename T>
Expand All @@ -237,7 +240,7 @@ class FaissCpuIVFPQ : public FaissCpu<T> {
FaissCpuIVFPQ(Metric metric, int dim, const BuildParam& param) : FaissCpu<T>(metric, dim, param)
{
this->init_quantizer(dim);
this->index_ = std::make_unique<faiss::IndexIVFPQ>(
this->index_ = std::make_shared<faiss::IndexIVFPQ>(
this->quantizer_.get(), dim, param.nlist, param.M, param.bitsPerCode, this->metric_type_);
}

@@ -246,6 +249,11 @@ class FaissCpuIVFPQ : public FaissCpu<T> {
this->template save_<faiss::IndexIVFPQ>(file);
}
void load(const std::string& file) override { this->template load_<faiss::IndexIVFPQ>(file); }

std::unique_ptr<ANN<T>> copy()
{
return std::make_unique<FaissCpuIVFPQ<T>>(*this); // use copy constructor
}
};

// TODO: Enable this in cmake
@@ -270,7 +278,7 @@ class FaissCpuIVFSQ : public FaissCpu<T> {
}

this->init_quantizer(dim);
this->index_ = std::make_unique<faiss::IndexIVFScalarQuantizer>(
this->index_ = std::make_shared<faiss::IndexIVFScalarQuantizer>(
this->quantizer_.get(), dim, param.nlist, qtype, this->metric_type_, true);
}

@@ -282,6 +290,11 @@ class FaissCpuIVFSQ : public FaissCpu<T> {
{
this->template load_<faiss::IndexIVFScalarQuantizer>(file);
}

std::unique_ptr<ANN<T>> copy()
{
return std::make_unique<FaissCpuIVFSQ<T>>(*this); // use copy constructor
}
};

template <typename T>
@@ -290,7 +303,7 @@ class FaissCpuFlat : public FaissCpu<T> {
FaissCpuFlat(Metric metric, int dim)
: FaissCpu<T>(metric, dim, typename FaissCpu<T>::BuildParam{})
{
this->index_ = std::make_unique<faiss::IndexFlat>(dim, this->metric_type_);
this->index_ = std::make_shared<faiss::IndexFlat>(dim, this->metric_type_);
}

// class FaissCpu is more like a IVF class, so need special treating here
@@ -299,7 +312,7 @@ class FaissCpuFlat : public FaissCpu<T> {
auto search_param = dynamic_cast<const typename FaissCpu<T>::SearchParam&>(param);
if (!this->thread_pool_ || this->num_threads_ != search_param.num_threads) {
this->num_threads_ = search_param.num_threads;
this->thread_pool_ = std::make_unique<FixedThreadPool>(this->num_threads_);
this->thread_pool_ = std::make_shared<FixedThreadPool>(this->num_threads_);
}
};

@@ -308,6 +321,11 @@ class FaissCpuFlat : public FaissCpu<T> {
this->template save_<faiss::IndexFlat>(file);
}
void load(const std::string& file) override { this->template load_<faiss::IndexFlat>(file); }

std::unique_ptr<ANN<T>> copy()
{
return std::make_unique<FaissCpuFlat<T>>(*this); // use copy constructor
}
};

} // namespace raft::bench::ann