Skip to content

Commit

Permalink
Fix ANN bench latency (#1940)
Browse files Browse the repository at this point in the history
This adds explicit latency column to the benchmark.

Authors:
  - Tamas Bela Feher (https://github.com/tfeher)
  - Corey J. Nolet (https://github.com/cjnolet)

Approvers:
  - Corey J. Nolet (https://github.com/cjnolet)

URL: #1940
  • Loading branch information
tfeher authored Nov 3, 2023
1 parent d244beb commit b21cad3
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 51 deletions.
107 changes: 71 additions & 36 deletions cpp/bench/ann/src/common/benchmark.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,21 @@
#include <algorithm>
#include <chrono>
#include <cmath>
#include <condition_variable>
#include <cstdint>
#include <fstream>
#include <limits>
#include <memory>
#include <mutex>
#include <numeric>
#include <string>
#include <unistd.h>
#include <vector>

namespace raft::bench::ann {

std::mutex init_mutex;
std::condition_variable cond_var;

static inline std::unique_ptr<AnnBase> current_algo{nullptr};
static inline std::shared_ptr<AlgoProperty> current_algo_props{nullptr};

Expand Down Expand Up @@ -172,8 +176,6 @@ void bench_search(::benchmark::State& state,
std::ptrdiff_t batch_offset = 0;
std::size_t queries_processed = 0;

double total_time = 0;

const auto& sp_json = index.search_params[search_param_ix];

if (state.thread_index() == 0) { dump_parameters(state, sp_json); }
Expand All @@ -185,6 +187,8 @@ void bench_search(::benchmark::State& state,
// Round down the query data to a multiple of the batch size to loop over full batches of data
const std::size_t query_set_size = (dataset->query_set_size() / n_queries) * n_queries;

const T* query_set = nullptr;

if (!file_exists(index.file)) {
state.SkipWithError("Index file is missing. Run the benchmark in the build mode first.");
return;
Expand All @@ -194,6 +198,7 @@ void bench_search(::benchmark::State& state,
* Make sure the first thread loads the algo and dataset
*/
if (state.thread_index() == 0) {
std::lock_guard lk(init_mutex);
// algo is static to cache it between close search runs to save time on index loading
static std::string index_file = "";
if (index.file != index_file) {
Expand Down Expand Up @@ -233,18 +238,26 @@ void bench_search(::benchmark::State& state,
return;
}
}

try {
algo->set_search_param(*search_param);

} catch (const std::exception& ex) {
state.SkipWithError("An error occurred setting search parameters: " + std::string(ex.what()));
return;
}
}

query_set = dataset->query_set(current_algo_props->query_memory_type);
cond_var.notify_all();
} else {
// All other threads will wait for the first thread to initialize the algo.
std::unique_lock lk(init_mutex);
cond_var.wait(lk, [] { return current_algo_props.get() != nullptr; });
// gbench ensures that all threads are synchronized at the start of the benchmark loop.
// We are accessing shared variables (like current_algo, current_algo_probs) before the
// benchmark loop, therefore the synchronization here is necessary.
}
const auto algo_property = *current_algo_props;
const T* query_set = dataset->query_set(algo_property.query_memory_type);
query_set = dataset->query_set(algo_property.query_memory_type);

/**
* Each thread will manage its own outputs
Expand All @@ -265,7 +278,6 @@ void bench_search(::benchmark::State& state,
[[maybe_unused]] auto ntx_lap = nvtx.lap();
[[maybe_unused]] auto gpu_lap = gpu_timer.lap();

auto start = std::chrono::high_resolution_clock::now();
// run the search
try {
algo->search(query_set + batch_offset * dataset->dim(),
Expand All @@ -278,24 +290,21 @@ void bench_search(::benchmark::State& state,
state.SkipWithError(std::string(e.what()));
}

auto end = std::chrono::high_resolution_clock::now();

auto elapsed_seconds = std::chrono::duration_cast<std::chrono::duration<double>>(end - start);
// advance to the next batch
batch_offset = (batch_offset + n_queries) % query_set_size;
queries_processed += n_queries;
state.SetIterationTime(elapsed_seconds.count());
total_time += elapsed_seconds.count();
}
}
auto end = std::chrono::high_resolution_clock::now();
if (state.thread_index() == 0) {
auto duration = std::chrono::duration_cast<std::chrono::duration<double>>(end - start).count();
state.counters.insert({{"end_to_end", duration}});
}
auto end = std::chrono::high_resolution_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::duration<double>>(end - start).count();
if (state.thread_index() == 0) { state.counters.insert({{"end_to_end", duration}}); }
state.counters.insert(
{"Latency", {duration / double(state.iterations()), benchmark::Counter::kAvgThreads}});

state.SetItemsProcessed(queries_processed);
if (cudart.found()) {
state.counters.insert({{"GPU", gpu_timer.total_time() / double(state.iterations())}});
double gpu_time_per_iteration = gpu_timer.total_time() / (double)state.iterations();
state.counters.insert({"GPU", {gpu_time_per_iteration, benchmark::Counter::kAvgThreads}});
}

// This will be the total number of queries across all threads
Expand Down Expand Up @@ -341,6 +350,7 @@ inline void printf_usage()
" [--index_prefix=<prefix>]\n"
" [--override_kv=<key:value1:value2:...:valueN>]\n"
" [--mode=<latency|throughput>\n"
" [--threads=min[:max]]\n"
" <conf>.json\n"
"\n"
"Note the non-standard benchmark parameters:\n"
Expand All @@ -359,8 +369,12 @@ inline void printf_usage()
" you can use this parameter multiple times to get the Cartesian product of benchmark"
" configs.\n"
" --mode=<latency|throughput>"
" run the benchmarks in latency (accumulate times spent in each batch) or "
" throughput (pipeline batches and measure end-to-end) mode\n");
" run the benchmarks in latency (accumulate times spent in each batch) or "
" throughput (pipeline batches and measure end-to-end) mode\n"
" --threads=min[:max] specify the number threads to use for throughput benchmark."
" Power of 2 values between 'min' and 'max' will be used. If only 'min' is specified,"
" then a single test is run with 'min' threads. By default min=1, max=<num hyper"
" threads>.\n");
}

template <typename T>
Expand All @@ -385,29 +399,23 @@ void register_build(std::shared_ptr<const Dataset<T>> dataset,
template <typename T>
void register_search(std::shared_ptr<const Dataset<T>> dataset,
std::vector<Configuration::Index> indices,
Objective metric_objective)
Objective metric_objective,
const std::vector<int>& threads)
{
for (auto index : indices) {
for (std::size_t i = 0; i < index.search_params.size(); i++) {
auto suf = static_cast<std::string>(index.search_params[i]["override_suffix"]);
index.search_params[i].erase("override_suffix");

int max_threads =
metric_objective == Objective::THROUGHPUT ? std::thread::hardware_concurrency() : 1;

auto* b = ::benchmark::RegisterBenchmark(
index.name + suf, bench_search<T>, index, i, dataset, metric_objective)
->Unit(benchmark::kMillisecond)
->ThreadRange(1, max_threads)

->ThreadRange(threads[0], threads[1])
/**
* The following are important for getting accuracy QPS measurements on both CPU
* and GPU These make sure that
* - `end_to_end` ~ (`Time` * `Iterations`)
* - `items_per_second` ~ (`total_queries` / `end_to_end`)
* - `Time` = `end_to_end` / `Iterations`
*
* - Latency = `Time`
* - Throughput = `items_per_second`
*/
->MeasureProcessCPUTime()
Expand All @@ -424,7 +432,8 @@ void dispatch_benchmark(const Configuration& conf,
std::string data_prefix,
std::string index_prefix,
kv_series override_kv,
Objective metric_objective)
Objective metric_objective,
const std::vector<int>& threads)
{
if (cudart.found()) {
for (auto [key, value] : cuda_info()) {
Expand Down Expand Up @@ -493,7 +502,7 @@ void dispatch_benchmark(const Configuration& conf,
index.search_params = apply_overrides(index.search_params, override_kv);
index.file = combine_path(index_prefix, index.file);
}
register_search<T>(dataset, indices, metric_objective);
register_search<T>(dataset, indices, metric_objective, threads);
}
}

Expand Down Expand Up @@ -525,6 +534,8 @@ inline auto run_main(int argc, char** argv) -> int
std::string index_prefix = "index";
std::string new_override_kv = "";
std::string mode = "latency";
std::string threads_arg_txt = "";
std::vector<int> threads = {1, -1}; // min_thread, max_thread
kv_series override_kv{};

char arg0_default[] = "benchmark"; // NOLINT
Expand All @@ -548,7 +559,18 @@ inline auto run_main(int argc, char** argv) -> int
parse_string_flag(argv[i], "--data_prefix", data_prefix) ||
parse_string_flag(argv[i], "--index_prefix", index_prefix) ||
parse_string_flag(argv[i], "--mode", mode) ||
parse_string_flag(argv[i], "--override_kv", new_override_kv)) {
parse_string_flag(argv[i], "--override_kv", new_override_kv) ||
parse_string_flag(argv[i], "--threads", threads_arg_txt)) {
if (!threads_arg_txt.empty()) {
auto threads_arg = split(threads_arg_txt, ':');
threads[0] = std::stoi(threads_arg[0]);
if (threads_arg.size() > 1) {
threads[1] = std::stoi(threads_arg[1]);
} else {
threads[1] = threads[0];
}
threads_arg_txt = "";
}
if (!new_override_kv.empty()) {
auto kvv = split(new_override_kv, ':');
auto key = kvv[0];
Expand All @@ -570,6 +592,17 @@ inline auto run_main(int argc, char** argv) -> int
Objective metric_objective = Objective::LATENCY;
if (mode == "throughput") { metric_objective = Objective::THROUGHPUT; }

int max_threads =
(metric_objective == Objective::THROUGHPUT) ? std::thread::hardware_concurrency() : 1;
if (threads[1] == -1) threads[1] = max_threads;

if (metric_objective == Objective::LATENCY) {
if (threads[0] != 1 || threads[1] != 1) {
log_warn("Latency mode enabled. Overriding threads arg, running with single thread.");
threads = {1, 1};
}
}

if (build_mode == search_mode) {
log_error("One and only one of --build and --search should be specified");
printf_usage();
Expand All @@ -596,7 +629,8 @@ inline auto run_main(int argc, char** argv) -> int
data_prefix,
index_prefix,
override_kv,
metric_objective);
metric_objective,
threads);
} else if (dtype == "uint8") {
dispatch_benchmark<std::uint8_t>(conf,
force_overwrite,
Expand All @@ -605,7 +639,8 @@ inline auto run_main(int argc, char** argv) -> int
data_prefix,
index_prefix,
override_kv,
metric_objective);
metric_objective,
threads);
} else if (dtype == "int8") {
dispatch_benchmark<std::int8_t>(conf,
force_overwrite,
Expand All @@ -614,7 +649,8 @@ inline auto run_main(int argc, char** argv) -> int
data_prefix,
index_prefix,
override_kv,
metric_objective);
metric_objective,
threads);
} else {
log_error("datatype '%s' is not supported", dtype.c_str());
return -1;
Expand All @@ -629,5 +665,4 @@ inline auto run_main(int argc, char** argv) -> int
current_algo.reset();
return 0;
}

}; // namespace raft::bench::ann
3 changes: 3 additions & 0 deletions cpp/bench/ann/src/common/cuda_stub.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ namespace stub {
{
return cudaSuccess;
}
[[gnu::weak, gnu::noinline]] cudaError_t cudaDeviceSynchronize() { return cudaSuccess; }

[[gnu::weak, gnu::noinline]] cudaError_t cudaStreamSynchronize(cudaStream_t pStream)
{
return cudaSuccess;
Expand Down Expand Up @@ -214,6 +216,7 @@ RAFT_DECLARE_CUDART(cudaFree);
RAFT_DECLARE_CUDART(cudaStreamCreate);
RAFT_DECLARE_CUDART(cudaStreamCreateWithFlags);
RAFT_DECLARE_CUDART(cudaStreamDestroy);
RAFT_DECLARE_CUDART(cudaDeviceSynchronize);
RAFT_DECLARE_CUDART(cudaStreamSynchronize);
RAFT_DECLARE_CUDART(cudaEventCreate);
RAFT_DECLARE_CUDART(cudaEventRecord);
Expand Down
33 changes: 19 additions & 14 deletions docs/source/raft_ann_benchmarks.md
Original file line number Diff line number Diff line change
Expand Up @@ -397,22 +397,27 @@ The benchmarks capture several different measurements. The table below describes
| index_size | Number of vectors used to train index |
The table below describes each of the measurements for the index search benchmarks:
| Name | Description |
|------|-------------------------------------------------------------------------------------------------------------------------------------------------------|
| Benchmark | A name that uniquely identifies the benchmark instance |
| Time | The average runtime for each batch. This is approximately `end_to_end` / `Iterations` |
| CPU | The average `wall-time`. In `throughput` mode, this is the average `wall-time` spent in each thread. |
| Iterations | Total number of batches. This is going to be `total_queres` / `n_queries` |
| Recall | Proportion of correct neighbors to ground truth neighbors. Note this column is only present if groundtruth file is specified in dataset configuration |
| items_per_second | Total throughput. This is approximately `total_queries` / `end_to_end`. |
| k | Number of neighbors being queried in each iteration |
The table below describes each of the measurements for the index search benchmarks. The most important measurements `Latency`, `items_per_second`, `end_to_end`.
| Name | Description |
|------------|-------------------------------------------------------------------------------------------------------------------------------------------------------|
| Benchmark | A name that uniquely identifies the benchmark instance |
| Time | The wall-clock time of a single iteration (batch) divided by the number of threads. |
| CPU | The average CPU time (user + sys time). This does not include idle time (which can also happen while waiting for GPU sync). |
| Iterations | Total number of batches. This is going to be `total_queries` / `n_queries`. |
| GPU | GPU latency of a single batch (seconds). In throughput mode this is averaged over multiple threads. |
| Latency | Latency of a single batch (seconds), calculated from wall-clock time. In throughput mode this is averaged over multiple threads. |
| Recall | Proportion of correct neighbors to ground truth neighbors. Note this column is only present if groundtruth file is specified in dataset configuration.|
| items_per_second | Total throughput, a.k.a Queries per second (QPS). This is approximately `total_queries` / `end_to_end`. |
| k | Number of neighbors being queried in each iteration |
| end_to_end | Total time taken to run all batches for all iterations |
| n_queries | Total number of query vectors in each batch |
| total_queries | Total number of vectors queries across all iterations |
| n_queries | Total number of query vectors in each batch |
| total_queries | Total number of vectors queries across all iterations ( = `iterations` * `n_queries`) |
Note that the actual table displayed on the screen may differ slightly as the hyper-parameters will also be displayed for each different combination being benchmarked.
Note the following:
- A slightly different method is used to measure `Time` and `end_to_end`. That is why `end_to_end` = `Time` * `Iterations` holds only approximately.
- The actual table displayed on the screen may differ slightly as the hyper-parameters will also be displayed for each different combination being benchmarked.
- Recall calculation: the number of queries processed per test depends on the number of iterations. Because of this, recall can show slight fluctuations if less neighbors are processed then it is available for the benchmark.
## Creating and customizing dataset configurations
Expand Down
20 changes: 19 additions & 1 deletion python/raft-ann-bench/src/raft-ann-bench/run/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ def run_build_and_search(
search,
k,
batch_size,
search_threads,
mode="throughput",
):
for executable, ann_executable_path, algo in executables_to_run.keys():
Expand Down Expand Up @@ -128,14 +129,18 @@ def run_build_and_search(
"--benchmark_counters_tabular=true",
"--override_kv=k:%s" % k,
"--override_kv=n_queries:%s" % batch_size,
"--benchmark_min_warmup_time=0.01",
"--benchmark_min_warmup_time=1",
"--benchmark_out_format=json",
"--mode=%s" % mode,
"--benchmark_out="
+ f"{os.path.join(search_folder, f'{algo}.json')}",
]
if force:
cmd = cmd + ["--overwrite"]

if search_threads:
cmd = cmd + ["--threads=%s" % search_threads]

cmd = cmd + [temp_conf_filepath]
subprocess.run(cmd, check=True)

Expand Down Expand Up @@ -243,6 +248,18 @@ def main():
default="latency",
)

parser.add_argument(
"-t",
"--search-threads",
help="specify the number threads to use for throughput benchmark."
" Single value or a pair of min and max separated by ':'. "
"Example --threads=1:4. Power of 2 values between 'min' "
"and 'max' will be used. If only 'min' is specified, then a "
"single test is run with 'min' threads. By default min=1, "
"max=<num hyper threads>.",
default=None,
)

args = parser.parse_args()

# If both build and search are not provided,
Expand Down Expand Up @@ -444,6 +461,7 @@ def add_algo_group(group_list):
search,
k,
batch_size,
args.search_threads,
mode,
)

Expand Down

0 comments on commit b21cad3

Please sign in to comment.