diff --git a/.github/workflows/pr.yaml b/.github/workflows/pr.yaml
index c8bd28d4bb..bd37b8ac01 100644
--- a/.github/workflows/pr.yaml
+++ b/.github/workflows/pr.yaml
@@ -15,6 +15,7 @@ jobs:
       - checks
       - conda-cpp-build
       - conda-cpp-tests
+      - conda-cpp-checks
      - conda-python-build
       - conda-python-tests
       - docs-build
@@ -43,6 +44,14 @@ jobs:
     uses: rapidsai/shared-workflows/.github/workflows/conda-cpp-tests.yaml@branch-23.12
     with:
       build_type: pull-request
+  conda-cpp-checks:
+    needs: conda-cpp-build
+    secrets: inherit
+    uses: rapidsai/shared-workflows/.github/workflows/conda-cpp-post-build-checks.yaml@branch-23.12
+    with:
+      build_type: pull-request
+      enable_check_symbols: true
+      symbol_exclusions: (void (thrust::|cub::)|_ZN\d+raft_cutlass)
   conda-python-build:
     needs: conda-cpp-build
     secrets: inherit
diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml
index 1c2395cb68..0c46d83cf9 100644
--- a/.github/workflows/test.yaml
+++ b/.github/workflows/test.yaml
@@ -14,6 +14,16 @@ on:
         type: string

 jobs:
+  conda-cpp-checks:
+    secrets: inherit
+    uses: rapidsai/shared-workflows/.github/workflows/conda-cpp-post-build-checks.yaml@branch-23.12
+    with:
+      build_type: nightly
+      branch: ${{ inputs.branch }}
+      date: ${{ inputs.date }}
+      sha: ${{ inputs.sha }}
+      enable_check_symbols: true
+      symbol_exclusions: (void (thrust::|cub::)|_ZN\d+raft_cutlass)
   conda-cpp-tests:
     secrets: inherit
     uses: rapidsai/shared-workflows/.github/workflows/conda-cpp-tests.yaml@branch-23.12
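The `symbol_exclusions` value is a regular expression that the shared post-build checks workflow appears to apply to symbol names, so Thrust/CUB template instantiations and mangled `raft_cutlass` symbols are skipped by the symbol check. A minimal sketch of how the pattern classifies names (the sample symbols below are hypothetical, for illustration only):

```python
import re

# Exclusion pattern copied verbatim from the workflows above.
pattern = re.compile(r"(void (thrust::|cub::)|_ZN\d+raft_cutlass)")

assert pattern.search("void thrust::detail::foo(int)")          # excluded
assert pattern.search("_ZN12raft_cutlass6KernelEv")             # excluded (mangled namespace)
assert not pattern.search("void raft::neighbors::search(int)")  # still checked
```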
diff --git a/cpp/bench/ann/src/common/benchmark.hpp b/cpp/bench/ann/src/common/benchmark.hpp
index 342eebe6e3..1cbd54cb7b 100644
--- a/cpp/bench/ann/src/common/benchmark.hpp
+++ b/cpp/bench/ann/src/common/benchmark.hpp
@@ -33,6 +33,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -175,7 +176,6 @@ void bench_search(::benchmark::State& state,
                   std::shared_ptr<const Dataset<T>> dataset,
                   Objective metric_objective)
 {
-  std::ptrdiff_t batch_offset   = 0;
   std::size_t queries_processed = 0;

   const auto& sp_json = index.search_params[search_param_ix];
@@ -189,6 +189,20 @@ void bench_search(::benchmark::State& state,
   // Round down the query data to a multiple of the batch size to loop over full batches of data
   const std::size_t query_set_size = (dataset->query_set_size() / n_queries) * n_queries;

+  if (dataset->query_set_size() < n_queries) {
+    std::stringstream msg;
+    msg << "Not enough queries in benchmark set. Expected " << n_queries << ", actual "
+        << dataset->query_set_size();
+    return state.SkipWithError(msg.str());
+  }
+
+  // Each thread starts from a different offset, so that the queries they process do not
+  // overlap.
+  std::ptrdiff_t batch_offset   = (state.thread_index() * n_queries) % query_set_size;
+  std::ptrdiff_t queries_stride = state.threads() * n_queries;
+  // Output is saved into a contiguous buffer (separate buffers for each thread).
+  std::ptrdiff_t out_offset = 0;
+
   const T* query_set = nullptr;

   if (!file_exists(index.file)) {
@@ -278,7 +292,6 @@ void bench_search(::benchmark::State& state,

   {
     nvtx_case nvtx{state.name()};

-    // TODO: Have the odd threads load the queries backwards just to rule out caching.
     ANN<T>* algo = dynamic_cast<ANN<T>*>(current_algo.get());
     for (auto _ : state) {
       [[maybe_unused]] auto ntx_lap = nvtx.lap();
@@ -289,15 +302,16 @@ void bench_search(::benchmark::State& state,
         algo->search(query_set + batch_offset * dataset->dim(),
                      n_queries,
                      k,
-                     neighbors->data + batch_offset * k,
-                     distances->data + batch_offset * k,
+                     neighbors->data + out_offset * k,
+                     distances->data + out_offset * k,
                      gpu_timer.stream());
       } catch (const std::exception& e) {
         state.SkipWithError(std::string(e.what()));
       }

       // advance to the next batch
-      batch_offset = (batch_offset + n_queries) % query_set_size;
+      batch_offset = (batch_offset + queries_stride) % query_set_size;
+      out_offset   = (out_offset + n_queries) % query_set_size;
       queries_processed += n_queries;
     }
@@ -323,31 +337,41 @@ void bench_search(::benchmark::State& state,
   // last thread to finish processing notifies all
   if (processed_threads-- == 0) { cond_var.notify_all(); }

-  // Use the last thread as a sanity check that all the threads are working.
-  if (state.thread_index() == state.threads() - 1) {
-    // evaluate recall
-    if (dataset->max_k() >= k) {
-      const std::int32_t* gt    = dataset->gt_set();
-      const std::uint32_t max_k = dataset->max_k();
-      buf neighbors_host        = neighbors->move(MemoryType::Host);
-      std::size_t rows          = std::min(queries_processed, query_set_size);
-      std::size_t match_count   = 0;
-      std::size_t total_count   = rows * static_cast<std::size_t>(k);
-      for (std::size_t i = 0; i < rows; i++) {
-        for (std::uint32_t j = 0; j < k; j++) {
-          auto act_idx = std::int32_t(neighbors_host.data[i * k + j]);
-          for (std::uint32_t l = 0; l < k; l++) {
-            auto exp_idx = gt[i * max_k + l];
-            if (act_idx == exp_idx) {
-              match_count++;
-              break;
+  // Each thread calculates recall on its partition of queries.
+  // evaluate recall
+  if (dataset->max_k() >= k) {
+    const std::int32_t* gt    = dataset->gt_set();
+    const std::uint32_t max_k = dataset->max_k();
+    buf neighbors_host        = neighbors->move(MemoryType::Host);
+    std::size_t rows          = std::min(queries_processed, query_set_size);
+    std::size_t match_count   = 0;
+    std::size_t total_count   = rows * static_cast<std::size_t>(k);
+
+    // We go through the ground truth with the same stride as the benchmark loop.
+    size_t out_offset   = 0;
+    size_t batch_offset = (state.thread_index() * n_queries) % query_set_size;
+    while (out_offset < rows) {
+      for (std::size_t i = 0; i < n_queries; i++) {
+        size_t i_orig_idx = batch_offset + i;
+        size_t i_out_idx  = out_offset + i;
+        if (i_out_idx < rows) {
+          for (std::uint32_t j = 0; j < k; j++) {
+            auto act_idx = std::int32_t(neighbors_host.data[i_out_idx * k + j]);
+            for (std::uint32_t l = 0; l < k; l++) {
+              auto exp_idx = gt[i_orig_idx * max_k + l];
+              if (act_idx == exp_idx) {
+                match_count++;
+                break;
+              }
             }
           }
         }
       }
-      double actual_recall = static_cast<double>(match_count) / static_cast<double>(total_count);
-      state.counters.insert({{"Recall", actual_recall}});
+      out_offset += n_queries;
+      batch_offset = (batch_offset + queries_stride) % query_set_size;
     }
+    double actual_recall = static_cast<double>(match_count) / static_cast<double>(total_count);
+    state.counters.insert({"Recall", {actual_recall, benchmark::Counter::kAvgThreads}});
   }
 }
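The core of the benchmark change: with Google Benchmark running `bench_search` on several threads, each thread now reads a disjoint, strided slice of the query set (`batch_offset` advanced by `queries_stride`) while writing its results contiguously at `out_offset`, and recall is computed per thread and merged via `benchmark::Counter::kAvgThreads`. A minimal sketch of the indexing scheme (not part of the patch; the sizes are hypothetical):

```python
# Per-thread query partitioning, with hypothetical sizes:
# 2 threads, batches of 3 queries, a query set of 12.
n_queries, n_threads, query_set_size = 3, 2, 12
queries_stride = n_threads * n_queries  # distance between a thread's batches

for t in range(n_threads):
    batch_offset = (t * n_queries) % query_set_size  # first batch of thread t
    batches = []
    for _ in range(query_set_size // queries_stride):
        batches.append(list(range(batch_offset, batch_offset + n_queries)))
        batch_offset = (batch_offset + queries_stride) % query_set_size
    print(f"thread {t}: {batches}")
# thread 0: [[0, 1, 2], [6, 7, 8]]
# thread 1: [[3, 4, 5], [9, 10, 11]]
```

Because the batches of different threads never overlap, the recall loop can replay the same stride over the ground truth (`i_orig_idx`) while reading the thread-local output buffer (`i_out_idx`).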
diff --git a/python/raft-ann-bench/src/raft-ann-bench/data_export/__main__.py b/python/raft-ann-bench/src/raft-ann-bench/data_export/__main__.py
index 47da9f39fa..fd6c2077e7 100644
--- a/python/raft-ann-bench/src/raft-ann-bench/data_export/__main__.py
+++ b/python/raft-ann-bench/src/raft-ann-bench/data_export/__main__.py
@@ -18,6 +18,7 @@
 import json
 import os
 import sys
+import traceback
 import warnings

 import pandas as pd
@@ -58,74 +59,89 @@ def read_file(dataset, dataset_path, method):

 def convert_json_to_csv_build(dataset, dataset_path):
     for file, algo_name, df in read_file(dataset, dataset_path, "build"):
-        algo_name = algo_name.replace("_base", "")
-        df["name"] = df["name"].str.split("/").str[0]
-        write = pd.DataFrame(
-            {
-                "algo_name": [algo_name] * len(df),
-                "index_name": df["name"],
-                "time": df["real_time"],
-            }
-        )
-        for name in df:
-            if name not in skip_build_cols:
-                write[name] = df[name]
-        filepath = os.path.normpath(file).split(os.sep)
-        filename = filepath[-1].split("-")[0] + ".csv"
-        write.to_csv(
-            os.path.join(f"{os.sep}".join(filepath[:-1]), filename),
-            index=False,
-        )
+        try:
+            algo_name = algo_name.replace("_base", "")
+            df["name"] = df["name"].str.split("/").str[0]
+            write = pd.DataFrame(
+                {
+                    "algo_name": [algo_name] * len(df),
+                    "index_name": df["name"],
+                    "time": df["real_time"],
+                }
+            )
+            for name in df:
+                if name not in skip_build_cols:
+                    write[name] = df[name]
+            filepath = os.path.normpath(file).split(os.sep)
+            filename = filepath[-1].split("-")[0] + ".csv"
+            write.to_csv(
+                os.path.join(f"{os.sep}".join(filepath[:-1]), filename),
+                index=False,
+            )
+        except Exception as e:
+            print(
+                "An error occurred processing file %s (%s). Skipping..."
+                % (file, e)
+            )
+            traceback.print_exc()


 def convert_json_to_csv_search(dataset, dataset_path):
     for file, algo_name, df in read_file(dataset, dataset_path, "search"):
-        build_file = os.path.join(
-            dataset_path, dataset, "result", "build", f"{algo_name}.csv"
-        )
-        algo_name = algo_name.replace("_base", "")
-        df["name"] = df["name"].str.split("/").str[0]
-        write = pd.DataFrame(
-            {
-                "algo_name": [algo_name] * len(df),
-                "index_name": df["name"],
-                "recall": df["Recall"],
-                "qps": df["items_per_second"],
-            }
-        )
-        for name in df:
-            if name not in skip_search_cols:
-                write[name] = df[name]
-
-        if os.path.exists(build_file):
-            build_df = pd.read_csv(build_file)
-            write_ncols = len(write.columns)
-            write["build time"] = None
-            write["build threads"] = None
-            write["build cpu_time"] = None
-            write["build GPU"] = None
-
-            for col_idx in range(5, len(build_df.columns)):
-                col_name = build_df.columns[col_idx]
-                write[col_name] = None
-
-            for s_index, search_row in write.iterrows():
-                for b_index, build_row in build_df.iterrows():
-                    if search_row["index_name"] == build_row["index_name"]:
-                        write.iloc[s_index, write_ncols] = build_df.iloc[
-                            b_index, 2
-                        ]
-                        write.iloc[s_index, write_ncols + 1 :] = build_df.iloc[
-                            b_index, 3:
-                        ]
-                        break
-        else:
-            warnings.warn(
-                f"Build CSV not found for {algo_name}, build params won't be "
-                "appended in the Search CSV"
-            )
-
-        write.to_csv(file.replace(".json", ".csv"), index=False)
+        try:
+            build_file = os.path.join(
+                dataset_path, dataset, "result", "build", f"{algo_name}.csv"
+            )
+            algo_name = algo_name.replace("_base", "")
+            df["name"] = df["name"].str.split("/").str[0]
+            write = pd.DataFrame(
+                {
+                    "algo_name": [algo_name] * len(df),
+                    "index_name": df["name"],
+                    "recall": df["Recall"],
+                    "qps": df["items_per_second"],
+                }
+            )
+            for name in df:
+                if name not in skip_search_cols:
+                    write[name] = df[name]
+
+            if os.path.exists(build_file):
+                build_df = pd.read_csv(build_file)
+                write_ncols = len(write.columns)
+                write["build time"] = None
+                write["build threads"] = None
+                write["build cpu_time"] = None
+                write["build GPU"] = None
+
+                for col_idx in range(5, len(build_df.columns)):
+                    col_name = build_df.columns[col_idx]
+                    write[col_name] = None
+
+                for s_index, search_row in write.iterrows():
+                    for b_index, build_row in build_df.iterrows():
+                        if search_row["index_name"] == build_row["index_name"]:
+                            write.iloc[s_index, write_ncols] = build_df.iloc[
+                                b_index, 2
+                            ]
+                            write.iloc[
+                                s_index, write_ncols + 1 :
+                            ] = build_df.iloc[b_index, 3:]
+                            break
+            else:
+                warnings.warn(
+                    f"Build CSV not found for {algo_name}, "
+                    f"build params won't be "
+                    "appended in the Search CSV"
+                )
+
+            write.to_csv(file.replace(".json", ".csv"), index=False)
+        except Exception as e:
+            print(
+                "An error occurred processing file %s (%s). Skipping..."
+                % (file, e)
+            )
+            traceback.print_exc()
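Both converters now follow the same pattern: a failure while converting one result file is reported and skipped rather than aborting the whole export. A minimal sketch of the pattern (the names here are illustrative, not from the module):

```python
import traceback

def convert_all(files, convert_one):
    # Keep going when a single file fails to convert; report and skip it.
    for f in files:
        try:
            convert_one(f)
        except Exception as e:
            print("An error occurred processing file %s (%s). Skipping..." % (f, e))
            traceback.print_exc()
```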
diff --git a/python/raft-ann-bench/src/raft-ann-bench/run/__main__.py b/python/raft-ann-bench/src/raft-ann-bench/run/__main__.py
index c9fde6dd7e..a33467b554 100644
--- a/python/raft-ann-bench/src/raft-ann-bench/run/__main__.py
+++ b/python/raft-ann-bench/src/raft-ann-bench/run/__main__.py
@@ -132,7 +132,8 @@ def run_build_and_search(
     except Exception as e:
         print("Error occurred running benchmark: %s" % e)
     finally:
-        os.remove(temp_conf_filename)
+        if not search:
+            os.remove(temp_conf_filename)

     if search:
         search_folder = os.path.join(legacy_result_folder, "search")