Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix ann-bench multithreading #2021

Merged
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 32 additions & 12 deletions cpp/bench/ann/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,21 @@ if(RAFT_ANN_BENCH_USE_FAISS)
include(cmake/thirdparty/get_faiss.cmake)
endif()

# ##################################################################################################
# * Enable NVTX if available

# Note: ANN_BENCH wrappers have extra NVTX code not related to raft::nvtx.They track gbench
# benchmark cases and iterations. This is to make limited NVTX available to all algos, not just
# raft.
if(TARGET CUDA::nvtx3)
set(_CMAKE_REQUIRED_INCLUDES_ORIG ${CMAKE_REQUIRED_INCLUDES})
get_target_property(CMAKE_REQUIRED_INCLUDES CUDA::nvtx3 INTERFACE_INCLUDE_DIRECTORIES)
unset(NVTX3_HEADERS_FOUND CACHE)
# Check the headers explicitly to make sure the cpu-only build succeeds
CHECK_INCLUDE_FILE_CXX(nvtx3/nvToolsExt.h NVTX3_HEADERS_FOUND)
set(CMAKE_REQUIRED_INCLUDES ${_CMAKE_REQUIRED_INCLUDES_ORIG})
endif()

# ##################################################################################################
# * Configure tests function-------------------------------------------------------------

Expand All @@ -141,8 +156,13 @@ function(ConfigureAnnBench)
add_dependencies(${BENCH_NAME} ANN_BENCH)
else()
add_executable(${BENCH_NAME} ${ConfigureAnnBench_PATH})
target_compile_definitions(${BENCH_NAME} PRIVATE ANN_BENCH_BUILD_MAIN)
target_link_libraries(${BENCH_NAME} PRIVATE benchmark::benchmark)
target_compile_definitions(
${BENCH_NAME} PRIVATE ANN_BENCH_BUILD_MAIN
$<$<BOOL:${NVTX3_HEADERS_FOUND}>:ANN_BENCH_NVTX3_HEADERS_FOUND>
)
target_link_libraries(
${BENCH_NAME} PRIVATE benchmark::benchmark $<$<BOOL:${NVTX3_HEADERS_FOUND}>:CUDA::nvtx3>
)
endif()

target_link_libraries(
Expand Down Expand Up @@ -340,8 +360,15 @@ if(RAFT_ANN_BENCH_SINGLE_EXE)
target_include_directories(ANN_BENCH PRIVATE ${CMAKE_CUDA_TOOLKIT_INCLUDE_DIRECTORIES})

target_link_libraries(
ANN_BENCH PRIVATE nlohmann_json::nlohmann_json benchmark_static dl -static-libgcc
-static-libstdc++ CUDA::nvtx3
ANN_BENCH
PRIVATE nlohmann_json::nlohmann_json
benchmark_static
dl
-static-libgcc
fmt::fmt-header-only
spdlog::spdlog_header_only
-static-libstdc++
$<$<BOOL:${NVTX3_HEADERS_FOUND}>:CUDA::nvtx3>
)
set_target_properties(
ANN_BENCH
Expand All @@ -355,17 +382,10 @@ if(RAFT_ANN_BENCH_SINGLE_EXE)
BUILD_RPATH "\$ORIGIN"
INSTALL_RPATH "\$ORIGIN"
)

# Disable NVTX when the nvtx3 headers are missing
set(_CMAKE_REQUIRED_INCLUDES_ORIG ${CMAKE_REQUIRED_INCLUDES})
get_target_property(CMAKE_REQUIRED_INCLUDES ANN_BENCH INCLUDE_DIRECTORIES)
CHECK_INCLUDE_FILE_CXX(nvtx3/nvToolsExt.h NVTX3_HEADERS_FOUND)
set(CMAKE_REQUIRED_INCLUDES ${_CMAKE_REQUIRED_INCLUDES_ORIG})
target_compile_definitions(
ANN_BENCH
PRIVATE
$<$<BOOL:${CUDAToolkit_FOUND}>:ANN_BENCH_LINK_CUDART="libcudart.so.${CUDAToolkit_VERSION_MAJOR}.${CUDAToolkit_VERSION_MINOR}.${CUDAToolkit_VERSION_PATCH}
">
$<$<BOOL:${CUDAToolkit_FOUND}>:ANN_BENCH_LINK_CUDART="libcudart.so.${CUDAToolkit_VERSION_MAJOR}.${CUDAToolkit_VERSION_MINOR}.${CUDAToolkit_VERSION_PATCH}">
$<$<BOOL:${NVTX3_HEADERS_FOUND}>:ANN_BENCH_NVTX3_HEADERS_FOUND>
)

Expand Down
15 changes: 7 additions & 8 deletions cpp/bench/ann/src/common/ann_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include "cuda_stub.hpp" // cudaStream_t

#include <memory>
#include <stdexcept>
#include <string>
#include <vector>
Expand Down Expand Up @@ -64,17 +65,10 @@ inline auto parse_memory_type(const std::string& memory_type) -> MemoryType
}
}

class AlgoProperty {
public:
inline AlgoProperty() {}
inline AlgoProperty(MemoryType dataset_memory_type_, MemoryType query_memory_type_)
: dataset_memory_type(dataset_memory_type_), query_memory_type(query_memory_type_)
{
}
struct AlgoProperty {
MemoryType dataset_memory_type;
// neighbors/distances should have same memory type as queries
MemoryType query_memory_type;
virtual ~AlgoProperty() = default;
};

class AnnBase {
Expand Down Expand Up @@ -125,6 +119,11 @@ class ANN : public AnnBase {
// The client code should call set_search_dataset() before searching,
// and should not release dataset before searching is finished.
virtual void set_search_dataset(const T* /*dataset*/, size_t /*nrow*/){};

/**
* Make a shallow copy of the ANN wrapper that shares the resources and ensures thread-safe access
* to them. */
virtual auto copy() -> std::unique_ptr<ANN<T>> = 0;
};

} // namespace raft::bench::ann
Expand Down
16 changes: 7 additions & 9 deletions cpp/bench/ann/src/common/benchmark.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ std::condition_variable cond_var;
std::atomic_int processed_threads{0};

static inline std::unique_ptr<AnnBase> current_algo{nullptr};
static inline std::shared_ptr<AlgoProperty> current_algo_props{nullptr};
static inline std::unique_ptr<AlgoProperty> current_algo_props{nullptr};

using kv_series = std::vector<std::tuple<std::string, std::vector<nlohmann::json>>>;

Expand Down Expand Up @@ -241,9 +241,8 @@ void bench_search(::benchmark::State& state,
return;
}

auto algo_property = parse_algo_property(algo->get_preference(), sp_json);
current_algo_props = std::make_shared<AlgoProperty>(algo_property.dataset_memory_type,
algo_property.query_memory_type);
current_algo_props = std::make_unique<AlgoProperty>(
std::move(parse_algo_property(algo->get_preference(), sp_json)));

if (search_param->needs_dataset()) {
try {
Expand Down Expand Up @@ -277,23 +276,22 @@ void bench_search(::benchmark::State& state,
// We are accessing shared variables (like current_algo, current_algo_probs) before the
// benchmark loop, therefore the synchronization here is necessary.
}
const auto algo_property = *current_algo_props;
query_set = dataset->query_set(algo_property.query_memory_type);
query_set = dataset->query_set(current_algo_props->query_memory_type);

/**
* Each thread will manage its own outputs
*/
std::shared_ptr<buf<float>> distances =
std::make_shared<buf<float>>(algo_property.query_memory_type, k * query_set_size);
std::make_shared<buf<float>>(current_algo_props->query_memory_type, k * query_set_size);
std::shared_ptr<buf<std::size_t>> neighbors =
std::make_shared<buf<std::size_t>>(algo_property.query_memory_type, k * query_set_size);
std::make_shared<buf<std::size_t>>(current_algo_props->query_memory_type, k * query_set_size);

cuda_timer gpu_timer;
auto start = std::chrono::high_resolution_clock::now();
{
nvtx_case nvtx{state.name()};

ANN<T>* algo = dynamic_cast<ANN<T>*>(current_algo.get());
auto algo = dynamic_cast<ANN<T>*>(current_algo.get())->copy();
for (auto _ : state) {
[[maybe_unused]] auto ntx_lap = nvtx.lap();
[[maybe_unused]] auto gpu_lap = gpu_timer.lap();
Expand Down
50 changes: 34 additions & 16 deletions cpp/bench/ann/src/faiss/faiss_cpu_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,18 +73,16 @@ class FaissCpu : public ANN<T> {
static_assert(std::is_same_v<T, float>, "faiss support only float type");
}

virtual ~FaissCpu() noexcept {}

void build(const T* dataset, size_t nrow, cudaStream_t stream = 0) final;

void set_search_param(const AnnSearchParam& param) override;

void init_quantizer(int dim)
{
if (this->metric_type_ == faiss::MetricType::METRIC_L2) {
this->quantizer_ = std::make_unique<faiss::IndexFlatL2>(dim);
this->quantizer_ = std::make_shared<faiss::IndexFlatL2>(dim);
} else if (this->metric_type_ == faiss::MetricType::METRIC_INNER_PRODUCT) {
this->quantizer_ = std::make_unique<faiss::IndexFlatIP>(dim);
this->quantizer_ = std::make_shared<faiss::IndexFlatIP>(dim);
}
}

Expand Down Expand Up @@ -113,15 +111,15 @@ class FaissCpu : public ANN<T> {
template <typename Index>
void load_(const std::string& file);

std::unique_ptr<faiss::Index> index_;
std::unique_ptr<faiss::Index> quantizer_;
std::unique_ptr<faiss::IndexRefineFlat> index_refine_;
std::shared_ptr<faiss::Index> index_;
std::shared_ptr<faiss::Index> quantizer_;
std::shared_ptr<faiss::IndexRefineFlat> index_refine_;
faiss::MetricType metric_type_;
int nlist_;
double training_sample_fraction_;

int num_threads_;
std::unique_ptr<FixedThreadPool> thread_pool_;
std::shared_ptr<FixedThreadPool> thread_pool_;
};

template <typename T>
Expand Down Expand Up @@ -152,7 +150,7 @@ void FaissCpu<T>::build(const T* dataset, size_t nrow, cudaStream_t stream)
index_->train(nrow, dataset); // faiss::IndexFlat::train() will do nothing
assert(index_->is_trained);
index_->add(nrow, dataset);
index_refine_ = std::make_unique<faiss::IndexRefineFlat>(this->index_.get(), dataset);
index_refine_ = std::make_shared<faiss::IndexRefineFlat>(this->index_.get(), dataset);
}

template <typename T>
Expand All @@ -169,7 +167,7 @@ void FaissCpu<T>::set_search_param(const AnnSearchParam& param)

if (!thread_pool_ || num_threads_ != search_param.num_threads) {
num_threads_ = search_param.num_threads;
thread_pool_ = std::make_unique<FixedThreadPool>(num_threads_);
thread_pool_ = std::make_shared<FixedThreadPool>(num_threads_);
}
}

Expand Down Expand Up @@ -203,7 +201,7 @@ template <typename T>
template <typename Index>
void FaissCpu<T>::load_(const std::string& file)
{
index_ = std::unique_ptr<Index>(dynamic_cast<Index*>(faiss::read_index(file.c_str())));
index_ = std::shared_ptr<Index>(dynamic_cast<Index*>(faiss::read_index(file.c_str())));
}

template <typename T>
Expand All @@ -214,7 +212,7 @@ class FaissCpuIVFFlat : public FaissCpu<T> {
FaissCpuIVFFlat(Metric metric, int dim, const BuildParam& param) : FaissCpu<T>(metric, dim, param)
{
this->init_quantizer(dim);
this->index_ = std::make_unique<faiss::IndexIVFFlat>(
this->index_ = std::make_shared<faiss::IndexIVFFlat>(
this->quantizer_.get(), dim, param.nlist, this->metric_type_);
}

Expand All @@ -223,6 +221,11 @@ class FaissCpuIVFFlat : public FaissCpu<T> {
this->template save_<faiss::IndexIVFFlat>(file);
}
void load(const std::string& file) override { this->template load_<faiss::IndexIVFFlat>(file); }

std::unique_ptr<ANN<T>> copy()
{
return std::make_unique<FaissCpuIVFFlat<T>>(*this); // use copy constructor
}
};

template <typename T>
Expand All @@ -237,7 +240,7 @@ class FaissCpuIVFPQ : public FaissCpu<T> {
FaissCpuIVFPQ(Metric metric, int dim, const BuildParam& param) : FaissCpu<T>(metric, dim, param)
{
this->init_quantizer(dim);
this->index_ = std::make_unique<faiss::IndexIVFPQ>(
this->index_ = std::make_shared<faiss::IndexIVFPQ>(
this->quantizer_.get(), dim, param.nlist, param.M, param.bitsPerCode, this->metric_type_);
}

Expand All @@ -246,6 +249,11 @@ class FaissCpuIVFPQ : public FaissCpu<T> {
this->template save_<faiss::IndexIVFPQ>(file);
}
void load(const std::string& file) override { this->template load_<faiss::IndexIVFPQ>(file); }

std::unique_ptr<ANN<T>> copy()
{
return std::make_unique<FaissCpuIVFPQ<T>>(*this); // use copy constructor
}
};

// TODO: Enable this in cmake
Expand All @@ -270,7 +278,7 @@ class FaissCpuIVFSQ : public FaissCpu<T> {
}

this->init_quantizer(dim);
this->index_ = std::make_unique<faiss::IndexIVFScalarQuantizer>(
this->index_ = std::make_shared<faiss::IndexIVFScalarQuantizer>(
this->quantizer_.get(), dim, param.nlist, qtype, this->metric_type_, true);
}

Expand All @@ -282,6 +290,11 @@ class FaissCpuIVFSQ : public FaissCpu<T> {
{
this->template load_<faiss::IndexIVFScalarQuantizer>(file);
}

std::unique_ptr<ANN<T>> copy()
{
return std::make_unique<FaissCpuIVFSQ<T>>(*this); // use copy constructor
}
};

template <typename T>
Expand All @@ -290,7 +303,7 @@ class FaissCpuFlat : public FaissCpu<T> {
FaissCpuFlat(Metric metric, int dim)
: FaissCpu<T>(metric, dim, typename FaissCpu<T>::BuildParam{})
{
this->index_ = std::make_unique<faiss::IndexFlat>(dim, this->metric_type_);
this->index_ = std::make_shared<faiss::IndexFlat>(dim, this->metric_type_);
}

// class FaissCpu is more like a IVF class, so need special treating here
Expand All @@ -299,7 +312,7 @@ class FaissCpuFlat : public FaissCpu<T> {
auto search_param = dynamic_cast<const typename FaissCpu<T>::SearchParam&>(param);
if (!this->thread_pool_ || this->num_threads_ != search_param.num_threads) {
this->num_threads_ = search_param.num_threads;
this->thread_pool_ = std::make_unique<FixedThreadPool>(this->num_threads_);
this->thread_pool_ = std::make_shared<FixedThreadPool>(this->num_threads_);
}
};

Expand All @@ -308,6 +321,11 @@ class FaissCpuFlat : public FaissCpu<T> {
this->template save_<faiss::IndexFlat>(file);
}
void load(const std::string& file) override { this->template load_<faiss::IndexFlat>(file); }

std::unique_ptr<ANN<T>> copy()
{
return std::make_unique<FaissCpuFlat<T>>(*this); // use copy constructor
}
};

} // namespace raft::bench::ann
Loading