diff --git a/ci/build_wheel.sh b/ci/build_wheel.sh index cc90d65709..3dc7740a1e 100755 --- a/ci/build_wheel.sh +++ b/ci/build_wheel.sh @@ -38,7 +38,7 @@ fi if [[ ${package_name} == "raft-dask" ]]; then sed -r -i "s/pylibraft==(.*)\"/pylibraft${PACKAGE_CUDA_SUFFIX}==\1${alpha_spec}\"/g" ${pyproject_file} - sed -i "s/ucx-py/ucx-py${PACKAGE_CUDA_SUFFIX}/g" python/raft-dask/pyproject.toml + sed -r -i "s/ucx-py==(.*)\"/ucx-py${PACKAGE_CUDA_SUFFIX}==\1${alpha_spec}\"/g" ${pyproject_file} else sed -r -i "s/rmm(.*)\"/rmm${PACKAGE_CUDA_SUFFIX}\1${alpha_spec}\"/g" ${pyproject_file} fi diff --git a/cpp/bench/ann/src/common/benchmark.hpp b/cpp/bench/ann/src/common/benchmark.hpp index 3a930d288e..ecb5e366b5 100644 --- a/cpp/bench/ann/src/common/benchmark.hpp +++ b/cpp/bench/ann/src/common/benchmark.hpp @@ -25,17 +25,21 @@ #include #include #include +#include #include #include #include #include +#include #include #include #include #include - namespace raft::bench::ann { +std::mutex init_mutex; +std::condition_variable cond_var; + static inline std::unique_ptr current_algo{nullptr}; static inline std::shared_ptr current_algo_props{nullptr}; @@ -172,8 +176,6 @@ void bench_search(::benchmark::State& state, std::ptrdiff_t batch_offset = 0; std::size_t queries_processed = 0; - double total_time = 0; - const auto& sp_json = index.search_params[search_param_ix]; if (state.thread_index() == 0) { dump_parameters(state, sp_json); } @@ -185,6 +187,8 @@ void bench_search(::benchmark::State& state, // Round down the query data to a multiple of the batch size to loop over full batches of data const std::size_t query_set_size = (dataset->query_set_size() / n_queries) * n_queries; + const T* query_set = nullptr; + if (!file_exists(index.file)) { state.SkipWithError("Index file is missing. Run the benchmark in the build mode first."); return; @@ -194,6 +198,7 @@ void bench_search(::benchmark::State& state, * Make sure the first thread loads the algo and dataset */ if (state.thread_index() == 0) { + std::lock_guard lk(init_mutex); // algo is static to cache it between close search runs to save time on index loading static std::string index_file = ""; if (index.file != index_file) { @@ -233,7 +238,6 @@ void bench_search(::benchmark::State& state, return; } } - try { algo->set_search_param(*search_param); @@ -241,10 +245,19 @@ void bench_search(::benchmark::State& state, state.SkipWithError("An error occurred setting search parameters: " + std::string(ex.what())); return; } - } + query_set = dataset->query_set(current_algo_props->query_memory_type); + cond_var.notify_all(); + } else { + // All other threads will wait for the first thread to initialize the algo. + std::unique_lock lk(init_mutex); + cond_var.wait(lk, [] { return current_algo_props.get() != nullptr; }); + // gbench ensures that all threads are synchronized at the start of the benchmark loop. + // We are accessing shared variables (like current_algo, current_algo_probs) before the + // benchmark loop, therefore the synchronization here is necessary. + } const auto algo_property = *current_algo_props; - const T* query_set = dataset->query_set(algo_property.query_memory_type); + query_set = dataset->query_set(algo_property.query_memory_type); /** * Each thread will manage its own outputs @@ -265,7 +278,6 @@ void bench_search(::benchmark::State& state, [[maybe_unused]] auto ntx_lap = nvtx.lap(); [[maybe_unused]] auto gpu_lap = gpu_timer.lap(); - auto start = std::chrono::high_resolution_clock::now(); // run the search try { algo->search(query_set + batch_offset * dataset->dim(), @@ -278,24 +290,21 @@ void bench_search(::benchmark::State& state, state.SkipWithError(std::string(e.what())); } - auto end = std::chrono::high_resolution_clock::now(); - - auto elapsed_seconds = std::chrono::duration_cast>(end - start); // advance to the next batch batch_offset = (batch_offset + n_queries) % query_set_size; queries_processed += n_queries; - state.SetIterationTime(elapsed_seconds.count()); - total_time += elapsed_seconds.count(); } } - auto end = std::chrono::high_resolution_clock::now(); - if (state.thread_index() == 0) { - auto duration = std::chrono::duration_cast>(end - start).count(); - state.counters.insert({{"end_to_end", duration}}); - } + auto end = std::chrono::high_resolution_clock::now(); + auto duration = std::chrono::duration_cast>(end - start).count(); + if (state.thread_index() == 0) { state.counters.insert({{"end_to_end", duration}}); } + state.counters.insert( + {"Latency", {duration / double(state.iterations()), benchmark::Counter::kAvgThreads}}); + state.SetItemsProcessed(queries_processed); if (cudart.found()) { - state.counters.insert({{"GPU", gpu_timer.total_time() / double(state.iterations())}}); + double gpu_time_per_iteration = gpu_timer.total_time() / (double)state.iterations(); + state.counters.insert({"GPU", {gpu_time_per_iteration, benchmark::Counter::kAvgThreads}}); } // This will be the total number of queries across all threads @@ -341,6 +350,7 @@ inline void printf_usage() " [--index_prefix=]\n" " [--override_kv=]\n" " [--mode=\n" + " [--threads=min[:max]]\n" " .json\n" "\n" "Note the non-standard benchmark parameters:\n" @@ -359,8 +369,12 @@ inline void printf_usage() " you can use this parameter multiple times to get the Cartesian product of benchmark" " configs.\n" " --mode=" - " run the benchmarks in latency (accumulate times spent in each batch) or " - " throughput (pipeline batches and measure end-to-end) mode\n"); + " run the benchmarks in latency (accumulate times spent in each batch) or " + " throughput (pipeline batches and measure end-to-end) mode\n" + " --threads=min[:max] specify the number threads to use for throughput benchmark." + " Power of 2 values between 'min' and 'max' will be used. If only 'min' is specified," + " then a single test is run with 'min' threads. By default min=1, max=.\n"); } template @@ -385,29 +399,23 @@ void register_build(std::shared_ptr> dataset, template void register_search(std::shared_ptr> dataset, std::vector indices, - Objective metric_objective) + Objective metric_objective, + const std::vector& threads) { for (auto index : indices) { for (std::size_t i = 0; i < index.search_params.size(); i++) { auto suf = static_cast(index.search_params[i]["override_suffix"]); index.search_params[i].erase("override_suffix"); - int max_threads = - metric_objective == Objective::THROUGHPUT ? std::thread::hardware_concurrency() : 1; - auto* b = ::benchmark::RegisterBenchmark( index.name + suf, bench_search, index, i, dataset, metric_objective) ->Unit(benchmark::kMillisecond) - ->ThreadRange(1, max_threads) - + ->ThreadRange(threads[0], threads[1]) /** * The following are important for getting accuracy QPS measurements on both CPU * and GPU These make sure that * - `end_to_end` ~ (`Time` * `Iterations`) * - `items_per_second` ~ (`total_queries` / `end_to_end`) - * - `Time` = `end_to_end` / `Iterations` - * - * - Latency = `Time` * - Throughput = `items_per_second` */ ->MeasureProcessCPUTime() @@ -424,7 +432,8 @@ void dispatch_benchmark(const Configuration& conf, std::string data_prefix, std::string index_prefix, kv_series override_kv, - Objective metric_objective) + Objective metric_objective, + const std::vector& threads) { if (cudart.found()) { for (auto [key, value] : cuda_info()) { @@ -493,7 +502,7 @@ void dispatch_benchmark(const Configuration& conf, index.search_params = apply_overrides(index.search_params, override_kv); index.file = combine_path(index_prefix, index.file); } - register_search(dataset, indices, metric_objective); + register_search(dataset, indices, metric_objective, threads); } } @@ -525,6 +534,8 @@ inline auto run_main(int argc, char** argv) -> int std::string index_prefix = "index"; std::string new_override_kv = ""; std::string mode = "latency"; + std::string threads_arg_txt = ""; + std::vector threads = {1, -1}; // min_thread, max_thread kv_series override_kv{}; char arg0_default[] = "benchmark"; // NOLINT @@ -548,7 +559,18 @@ inline auto run_main(int argc, char** argv) -> int parse_string_flag(argv[i], "--data_prefix", data_prefix) || parse_string_flag(argv[i], "--index_prefix", index_prefix) || parse_string_flag(argv[i], "--mode", mode) || - parse_string_flag(argv[i], "--override_kv", new_override_kv)) { + parse_string_flag(argv[i], "--override_kv", new_override_kv) || + parse_string_flag(argv[i], "--threads", threads_arg_txt)) { + if (!threads_arg_txt.empty()) { + auto threads_arg = split(threads_arg_txt, ':'); + threads[0] = std::stoi(threads_arg[0]); + if (threads_arg.size() > 1) { + threads[1] = std::stoi(threads_arg[1]); + } else { + threads[1] = threads[0]; + } + threads_arg_txt = ""; + } if (!new_override_kv.empty()) { auto kvv = split(new_override_kv, ':'); auto key = kvv[0]; @@ -570,6 +592,17 @@ inline auto run_main(int argc, char** argv) -> int Objective metric_objective = Objective::LATENCY; if (mode == "throughput") { metric_objective = Objective::THROUGHPUT; } + int max_threads = + (metric_objective == Objective::THROUGHPUT) ? std::thread::hardware_concurrency() : 1; + if (threads[1] == -1) threads[1] = max_threads; + + if (metric_objective == Objective::LATENCY) { + if (threads[0] != 1 || threads[1] != 1) { + log_warn("Latency mode enabled. Overriding threads arg, running with single thread."); + threads = {1, 1}; + } + } + if (build_mode == search_mode) { log_error("One and only one of --build and --search should be specified"); printf_usage(); @@ -596,7 +629,8 @@ inline auto run_main(int argc, char** argv) -> int data_prefix, index_prefix, override_kv, - metric_objective); + metric_objective, + threads); } else if (dtype == "uint8") { dispatch_benchmark(conf, force_overwrite, @@ -605,7 +639,8 @@ inline auto run_main(int argc, char** argv) -> int data_prefix, index_prefix, override_kv, - metric_objective); + metric_objective, + threads); } else if (dtype == "int8") { dispatch_benchmark(conf, force_overwrite, @@ -614,7 +649,8 @@ inline auto run_main(int argc, char** argv) -> int data_prefix, index_prefix, override_kv, - metric_objective); + metric_objective, + threads); } else { log_error("datatype '%s' is not supported", dtype.c_str()); return -1; @@ -629,5 +665,4 @@ inline auto run_main(int argc, char** argv) -> int current_algo.reset(); return 0; } - }; // namespace raft::bench::ann diff --git a/cpp/bench/ann/src/common/cuda_stub.hpp b/cpp/bench/ann/src/common/cuda_stub.hpp index b2e3130304..6e3b63cd38 100644 --- a/cpp/bench/ann/src/common/cuda_stub.hpp +++ b/cpp/bench/ann/src/common/cuda_stub.hpp @@ -154,6 +154,8 @@ namespace stub { { return cudaSuccess; } +[[gnu::weak, gnu::noinline]] cudaError_t cudaDeviceSynchronize() { return cudaSuccess; } + [[gnu::weak, gnu::noinline]] cudaError_t cudaStreamSynchronize(cudaStream_t pStream) { return cudaSuccess; @@ -214,6 +216,7 @@ RAFT_DECLARE_CUDART(cudaFree); RAFT_DECLARE_CUDART(cudaStreamCreate); RAFT_DECLARE_CUDART(cudaStreamCreateWithFlags); RAFT_DECLARE_CUDART(cudaStreamDestroy); +RAFT_DECLARE_CUDART(cudaDeviceSynchronize); RAFT_DECLARE_CUDART(cudaStreamSynchronize); RAFT_DECLARE_CUDART(cudaEventCreate); RAFT_DECLARE_CUDART(cudaEventRecord); diff --git a/docs/source/raft_ann_benchmarks.md b/docs/source/raft_ann_benchmarks.md index e0ff0970b2..cf4da30896 100644 --- a/docs/source/raft_ann_benchmarks.md +++ b/docs/source/raft_ann_benchmarks.md @@ -397,22 +397,27 @@ The benchmarks capture several different measurements. The table below describes | index_size | Number of vectors used to train index | -The table below describes each of the measurements for the index search benchmarks: - -| Name | Description | -|------|-------------------------------------------------------------------------------------------------------------------------------------------------------| -| Benchmark | A name that uniquely identifies the benchmark instance | -| Time | The average runtime for each batch. This is approximately `end_to_end` / `Iterations` | -| CPU | The average `wall-time`. In `throughput` mode, this is the average `wall-time` spent in each thread. | -| Iterations | Total number of batches. This is going to be `total_queres` / `n_queries` | -| Recall | Proportion of correct neighbors to ground truth neighbors. Note this column is only present if groundtruth file is specified in dataset configuration | -| items_per_second | Total throughput. This is approximately `total_queries` / `end_to_end`. | -| k | Number of neighbors being queried in each iteration | +The table below describes each of the measurements for the index search benchmarks. The most important measurements `Latency`, `items_per_second`, `end_to_end`. + +| Name | Description | +|------------|-------------------------------------------------------------------------------------------------------------------------------------------------------| +| Benchmark | A name that uniquely identifies the benchmark instance | +| Time | The wall-clock time of a single iteration (batch) divided by the number of threads. | +| CPU | The average CPU time (user + sys time). This does not include idle time (which can also happen while waiting for GPU sync). | +| Iterations | Total number of batches. This is going to be `total_queries` / `n_queries`. | +| GPU | GPU latency of a single batch (seconds). In throughput mode this is averaged over multiple threads. | +| Latency | Latency of a single batch (seconds), calculated from wall-clock time. In throughput mode this is averaged over multiple threads. | +| Recall | Proportion of correct neighbors to ground truth neighbors. Note this column is only present if groundtruth file is specified in dataset configuration.| +| items_per_second | Total throughput, a.k.a Queries per second (QPS). This is approximately `total_queries` / `end_to_end`. | +| k | Number of neighbors being queried in each iteration | | end_to_end | Total time taken to run all batches for all iterations | -| n_queries | Total number of query vectors in each batch | -| total_queries | Total number of vectors queries across all iterations | +| n_queries | Total number of query vectors in each batch | +| total_queries | Total number of vectors queries across all iterations ( = `iterations` * `n_queries`) | -Note that the actual table displayed on the screen may differ slightly as the hyper-parameters will also be displayed for each different combination being benchmarked. +Note the following: +- A slightly different method is used to measure `Time` and `end_to_end`. That is why `end_to_end` = `Time` * `Iterations` holds only approximately. +- The actual table displayed on the screen may differ slightly as the hyper-parameters will also be displayed for each different combination being benchmarked. +- Recall calculation: the number of queries processed per test depends on the number of iterations. Because of this, recall can show slight fluctuations if less neighbors are processed then it is available for the benchmark. ## Creating and customizing dataset configurations diff --git a/python/raft-ann-bench/src/raft-ann-bench/run/__main__.py b/python/raft-ann-bench/src/raft-ann-bench/run/__main__.py index 57f03d59ec..ac5d83e4c2 100644 --- a/python/raft-ann-bench/src/raft-ann-bench/run/__main__.py +++ b/python/raft-ann-bench/src/raft-ann-bench/run/__main__.py @@ -82,6 +82,7 @@ def run_build_and_search( search, k, batch_size, + search_threads, mode="throughput", ): for executable, ann_executable_path, algo in executables_to_run.keys(): @@ -128,7 +129,7 @@ def run_build_and_search( "--benchmark_counters_tabular=true", "--override_kv=k:%s" % k, "--override_kv=n_queries:%s" % batch_size, - "--benchmark_min_warmup_time=0.01", + "--benchmark_min_warmup_time=1", "--benchmark_out_format=json", "--mode=%s" % mode, "--benchmark_out=" @@ -136,6 +137,10 @@ def run_build_and_search( ] if force: cmd = cmd + ["--overwrite"] + + if search_threads: + cmd = cmd + ["--threads=%s" % search_threads] + cmd = cmd + [temp_conf_filepath] subprocess.run(cmd, check=True) @@ -243,6 +248,18 @@ def main(): default="latency", ) + parser.add_argument( + "-t", + "--search-threads", + help="specify the number threads to use for throughput benchmark." + " Single value or a pair of min and max separated by ':'. " + "Example --threads=1:4. Power of 2 values between 'min' " + "and 'max' will be used. If only 'min' is specified, then a " + "single test is run with 'min' threads. By default min=1, " + "max=.", + default=None, + ) + args = parser.parse_args() # If both build and search are not provided, @@ -444,6 +461,7 @@ def add_algo_group(group_list): search, k, batch_size, + args.search_threads, mode, )