Skip to content

Commit

Permalink
Improve analysis experience for ANN benchmarks (#2139)
Browse files Browse the repository at this point in the history
A few improvements to the benchmark executables aimed at improving the profiling/analysis experience using tools like Nsight Systems and possibly reducing benchmark overheads.  

- Reduce the number of spawned streams:
   1. Invert the stream-passing API: the GPU wrappers implement `AnnGPU` interface to provide the synchronization stream; `search` and `build` no longer take the stream as the argument, which simplifies CPU-only implementations.
   2. Create a global pool of streams, which are not deleted between benchmark cases. The algo wrapper may opt in to use these. As a result `nsys` timeline is much less cluttered.
   3. `cuda_timer` does not create a new stream now, but pushes the synchronization events directly to the streams provided by `AnnGPU`. This slightly reduces the number of CUDA driver calls and thus reduces the mutex congestion in the throughput mode.
- ~~Make raft algorithms use local memory workspace resources per-thread. This removes the mutex congestion that happened when many threads in the throughput mode tried to use rmm's pool allocator at the same time.~~ This one is removed/postponed because it could provoke OOM errors due to memory pools conflicting for the device memory.
- Add fp16 (half) support to the benchmark executables (currently, only IVF-PQ and CAGRA support dataset/queries in fp16)
- Add a `progress_barrier` RAII struct, so that the search throughput benchmark is more robust - doesn't deadlock on exceptions.
- Add a `rmm::mr::failure_callback_resource_adaptor` to wrap OOM errors with `raft::exception`. This essentially adds the backtrace to the reported errors to make debugging easier.

Authors:
  - Artem M. Chirkin (https://github.com/achirkin)

Approvers:
  - Tamas Bela Feher (https://github.com/tfeher)

URL: #2139
  • Loading branch information
achirkin authored Feb 18, 2024
1 parent 21291c8 commit 7342980
Show file tree
Hide file tree
Showing 21 changed files with 583 additions and 406 deletions.
35 changes: 26 additions & 9 deletions cpp/bench/ann/src/common/ann_types.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -74,13 +74,33 @@ struct AlgoProperty {
class AnnBase {
public:
inline AnnBase(Metric metric, int dim) : metric_(metric), dim_(dim) {}
virtual ~AnnBase() = default;
virtual ~AnnBase() noexcept = default;

protected:
Metric metric_;
int dim_;
};

/**
* The GPU-based algorithms, which do not perform CPU synchronization at the end of their build or
* search methods, must implement this interface.
*
* The `cuda_timer` / `cuda_lap` from `util.hpp` uses this stream to record GPU times with events
* and, if necessary, also synchronize (via events) between iterations.
*
* If the algo does not implement this interface, GPU timings are disabled.
*/
class AnnGPU {
public:
/**
* Return the main cuda stream for this algorithm.
* If any work is done in multiple streams, they should synchronize with the main stream at the
* end.
*
* Pure virtual: every implementation must provide it. The call is declared `const` and
* `noexcept`, and the result is marked `[[nodiscard]]`.
*/
[[nodiscard]] virtual auto get_sync_stream() const noexcept -> cudaStream_t = 0;
// Virtual destructor, so an implementation can be destroyed through an `AnnGPU` pointer.
virtual ~AnnGPU() noexcept = default;
};

template <typename T>
class ANN : public AnnBase {
public:
Expand All @@ -91,18 +111,15 @@ class ANN : public AnnBase {
};

inline ANN(Metric metric, int dim) : AnnBase(metric, dim) {}
virtual ~ANN() noexcept override = default;

virtual void build(const T* dataset, size_t nrow, cudaStream_t stream = 0) = 0;
virtual void build(const T* dataset, size_t nrow) = 0;

virtual void set_search_param(const AnnSearchParam& param) = 0;
// TODO: this assumes that an algorithm can always return k results.
// This is not always possible.
virtual void search(const T* queries,
int batch_size,
int k,
size_t* neighbors,
float* distances,
cudaStream_t stream = 0) const = 0;
virtual void search(
const T* queries, int batch_size, int k, size_t* neighbors, float* distances) const = 0;

virtual void save(const std::string& file) const = 0;
virtual void load(const std::string& file) = 0;
Expand Down
84 changes: 48 additions & 36 deletions cpp/bench/ann/src/common/benchmark.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,8 @@
#include <raft/core/logger.hpp>

#include <algorithm>
#include <atomic>
#include <chrono>
#include <cmath>
#include <condition_variable>
#include <cstdint>
#include <fstream>
#include <limits>
Expand All @@ -38,11 +36,8 @@
#include <string>
#include <unistd.h>
#include <vector>
namespace raft::bench::ann {

std::mutex init_mutex;
std::condition_variable cond_var;
std::atomic_int processed_threads{0};
namespace raft::bench::ann {

static inline std::unique_ptr<AnnBase> current_algo{nullptr};
static inline std::unique_ptr<AlgoProperty> current_algo_props{nullptr};
Expand Down Expand Up @@ -126,6 +121,9 @@ void bench_build(::benchmark::State& state,
Configuration::Index index,
bool force_overwrite)
{
// NB: these two thread-local vars can be used within algo wrappers
raft::bench::ann::benchmark_thread_id = state.thread_index();
raft::bench::ann::benchmark_n_threads = state.threads();
dump_parameters(state, index.build_param);
if (file_exists(index.file)) {
if (force_overwrite) {
Expand All @@ -149,21 +147,23 @@ void bench_build(::benchmark::State& state,
const T* base_set = dataset->base_set(algo_property.dataset_memory_type);
std::size_t index_size = dataset->base_set_size();

cuda_timer gpu_timer;
cuda_timer gpu_timer{algo};
{
nvtx_case nvtx{state.name()};
for (auto _ : state) {
[[maybe_unused]] auto ntx_lap = nvtx.lap();
[[maybe_unused]] auto gpu_lap = gpu_timer.lap();
try {
algo->build(base_set, index_size, gpu_timer.stream());
algo->build(base_set, index_size);
} catch (const std::exception& e) {
state.SkipWithError(std::string(e.what()));
}
}
}
state.counters.insert(
{{"GPU", gpu_timer.total_time() / state.iterations()}, {"index_size", index_size}});
if (gpu_timer.active()) {
state.counters.insert({"GPU", {gpu_timer.total_time(), benchmark::Counter::kAvgIterations}});
}
state.counters.insert({{"index_size", index_size}});

if (state.skipped()) { return; }
make_sure_parent_dir_exists(index.file);
Expand All @@ -177,7 +177,10 @@ void bench_search(::benchmark::State& state,
std::shared_ptr<const Dataset<T>> dataset,
Objective metric_objective)
{
std::size_t queries_processed = 0;
// NB: these two thread-local vars can be used within algo wrappers
raft::bench::ann::benchmark_thread_id = state.thread_index();
raft::bench::ann::benchmark_n_threads = state.threads();
std::size_t queries_processed = 0;

const auto& sp_json = index.search_params[search_param_ix];

Expand All @@ -194,7 +197,8 @@ void bench_search(::benchmark::State& state,
std::stringstream msg;
msg << "Not enough queries in benchmark set. Expected " << n_queries << ", actual "
<< dataset->query_set_size();
return state.SkipWithError(msg.str());
state.SkipWithError(msg.str());
return;
}

// Each thread starts from a different offset, so that the queries that they process do not
Expand All @@ -214,9 +218,8 @@ void bench_search(::benchmark::State& state,
/**
* Make sure the first thread loads the algo and dataset
*/
if (state.thread_index() == 0) {
std::unique_lock lk(init_mutex);
cond_var.wait(lk, [] { return processed_threads.load(std::memory_order_acquire) == 0; });
progress_barrier load_barrier{};
if (load_barrier.arrive(1) == 0) {
// algo is static to cache it between close search runs to save time on index loading
static std::string index_file = "";
if (index.file != index_file) {
Expand Down Expand Up @@ -257,21 +260,16 @@ void bench_search(::benchmark::State& state,
}
try {
algo->set_search_param(*search_param);

} catch (const std::exception& ex) {
state.SkipWithError("An error occurred setting search parameters: " + std::string(ex.what()));
return;
}

query_set = dataset->query_set(current_algo_props->query_memory_type);
processed_threads.store(state.threads(), std::memory_order_acq_rel);
cond_var.notify_all();
load_barrier.arrive(state.threads());
} else {
std::unique_lock lk(init_mutex);
// All other threads will wait for the first thread to initialize the algo.
cond_var.wait(lk, [&state] {
return processed_threads.load(std::memory_order_acquire) == state.threads();
});
load_barrier.wait(state.threads() * 2);
// gbench ensures that all threads are synchronized at the start of the benchmark loop.
// We are accessing shared variables (like current_algo, current_algo_props) before the
// benchmark loop, therefore the synchronization here is necessary.
Expand All @@ -286,26 +284,31 @@ void bench_search(::benchmark::State& state,
std::shared_ptr<buf<std::size_t>> neighbors =
std::make_shared<buf<std::size_t>>(current_algo_props->query_memory_type, k * query_set_size);

cuda_timer gpu_timer;
{
nvtx_case nvtx{state.name()};

auto algo = dynamic_cast<ANN<T>*>(current_algo.get())->copy();
std::unique_ptr<ANN<T>> algo{nullptr};
try {
dynamic_cast<ANN<T>*>(current_algo.get())->copy().swap(algo);
} catch (const std::exception& e) {
state.SkipWithError("Algo::copy: " + std::string(e.what()));
return;
}
// Initialize with algo, so that the timer.lap() object can sync with algo::get_sync_stream()
cuda_timer gpu_timer{algo};
auto start = std::chrono::high_resolution_clock::now();
for (auto _ : state) {
[[maybe_unused]] auto ntx_lap = nvtx.lap();
[[maybe_unused]] auto gpu_lap = gpu_timer.lap();

// run the search
try {
algo->search(query_set + batch_offset * dataset->dim(),
n_queries,
k,
neighbors->data + out_offset * k,
distances->data + out_offset * k,
gpu_timer.stream());
distances->data + out_offset * k);
} catch (const std::exception& e) {
state.SkipWithError(std::string(e.what()));
state.SkipWithError("Benchmark loop: " + std::string(e.what()));
break;
}

// advance to the next batch
Expand All @@ -318,22 +321,19 @@ void bench_search(::benchmark::State& state,
auto duration = std::chrono::duration_cast<std::chrono::duration<double>>(end - start).count();
if (state.thread_index() == 0) { state.counters.insert({{"end_to_end", duration}}); }
state.counters.insert({"Latency", {duration, benchmark::Counter::kAvgIterations}});

if (gpu_timer.active()) {
state.counters.insert({"GPU", {gpu_timer.total_time(), benchmark::Counter::kAvgIterations}});
}
}

state.SetItemsProcessed(queries_processed);
if (cudart.found()) {
state.counters.insert({"GPU", {gpu_timer.total_time(), benchmark::Counter::kAvgIterations}});
}

// This will be the total number of queries across all threads
state.counters.insert({{"total_queries", queries_processed}});

if (state.skipped()) { return; }

// assume thread has finished processing successfully at this point
// last thread to finish processing notifies all
if (processed_threads-- == 0) { cond_var.notify_all(); }

// Each thread calculates recall on their partition of queries.
// evaluate recall
if (dataset->max_k() >= k) {
Expand Down Expand Up @@ -673,6 +673,16 @@ inline auto run_main(int argc, char** argv) -> int
override_kv,
metric_objective,
threads);
} else if (dtype == "half") {
dispatch_benchmark<half>(conf,
force_overwrite,
build_mode,
search_mode,
data_prefix,
index_prefix,
override_kv,
metric_objective,
threads);
} else if (dtype == "uint8") {
dispatch_benchmark<std::uint8_t>(conf,
force_overwrite,
Expand Down Expand Up @@ -705,6 +715,8 @@ inline auto run_main(int argc, char** argv) -> int
// Release a possibly cached ANN object, so that it cannot be alive longer than the handle
// to a shared library it depends on (dynamic benchmark executable).
current_algo.reset();
current_algo_props.reset();
reset_global_stream_pool();
return 0;
}
}; // namespace raft::bench::ann
13 changes: 9 additions & 4 deletions cpp/bench/ann/src/common/conf.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -91,11 +91,16 @@ class Configuration {
dataset_conf_.dtype = conf.at("dtype");
} else {
auto filename = dataset_conf_.base_file;
if (!filename.compare(filename.size() - 4, 4, "fbin")) {
if (filename.size() > 6 && filename.compare(filename.size() - 6, 6, "f16bin") == 0) {
dataset_conf_.dtype = "half";
} else if (filename.size() > 9 &&
filename.compare(filename.size() - 9, 9, "fp16.fbin") == 0) {
dataset_conf_.dtype = "half";
} else if (filename.size() > 4 && filename.compare(filename.size() - 4, 4, "fbin") == 0) {
dataset_conf_.dtype = "float";
} else if (!filename.compare(filename.size() - 5, 5, "u8bin")) {
} else if (filename.size() > 5 && filename.compare(filename.size() - 5, 5, "u8bin") == 0) {
dataset_conf_.dtype = "uint8";
} else if (!filename.compare(filename.size() - 5, 5, "i8bin")) {
} else if (filename.size() > 5 && filename.compare(filename.size() - 5, 5, "i8bin") == 0) {
dataset_conf_.dtype = "int8";
} else {
log_error("Could not determine data type of the dataset %s", filename.c_str());
Expand Down
5 changes: 4 additions & 1 deletion cpp/bench/ann/src/common/cuda_stub.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -33,14 +33,17 @@ ______________________________________________________________________________
*/

#ifndef BUILD_CPU_ONLY
#include <cuda_fp16.h>
#include <cuda_runtime_api.h>
#ifdef ANN_BENCH_LINK_CUDART
#include <cstring>
#include <dlfcn.h>
#endif
#else
#include <cstdint>
typedef void* cudaStream_t;
typedef void* cudaEvent_t;
typedef uint16_t half;
#endif

namespace raft::bench::ann {
Expand Down
10 changes: 2 additions & 8 deletions cpp/bench/ann/src/common/dataset.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,12 +17,6 @@

#include "util.hpp"

#ifndef BUILD_CPU_ONLY
#include <cuda_fp16.h>
#else
typedef uint16_t half;
#endif

#include <errno.h>
#include <sys/mman.h>
#include <sys/stat.h>
Expand Down Expand Up @@ -222,7 +216,7 @@ void BinFile<T>::check_suffix_()
throw std::runtime_error("BinFile<float> should has .fbin suffix: " + file_);
}
} else if constexpr (std::is_same_v<T, half>) {
if (suffix != "f16bin") {
if (suffix != "f16bin" && suffix != "fbin") {
throw std::runtime_error("BinFile<half> should has .f16bin suffix: " + file_);
}
} else if constexpr (std::is_same_v<T, int>) {
Expand Down
Loading

0 comments on commit 7342980

Please sign in to comment.