
Commit d67fe8d

Merge branch 'branch-23.12' of https://github.com/rapidsai/raft into faiss-ivf
tarang-jain committed Nov 9, 2023
2 parents 0b88ca4 + 9c38633 commit d67fe8d
Showing 10 changed files with 260 additions and 64 deletions.
9 changes: 9 additions & 0 deletions .github/workflows/pr.yaml
@@ -15,6 +15,7 @@ jobs:
- checks
- conda-cpp-build
- conda-cpp-tests
- conda-cpp-checks
- conda-python-build
- conda-python-tests
- docs-build
@@ -43,6 +44,14 @@ jobs:
uses: rapidsai/shared-workflows/.github/workflows/[email protected]
with:
build_type: pull-request
conda-cpp-checks:
needs: conda-cpp-build
secrets: inherit
uses: rapidsai/shared-workflows/.github/workflows/[email protected]
with:
build_type: pull-request
enable_check_symbols: true
symbol_exclusions: (void (thrust::|cub::)|_ZN\d+raft_cutlass)
conda-python-build:
needs: conda-cpp-build
secrets: inherit
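The `symbol_exclusions` pattern tells the new check which symbols to ignore. A minimal sketch of how that regular expression behaves, using hypothetical symbol names (the real symbol lists come from the shared-workflows check itself):

```python
import re

# Pattern copied from the workflow step above: it exempts demangled thrust/cub
# instantiations and mangled raft_cutlass symbols from the symbol-visibility check.
pattern = re.compile(r"(void (thrust::|cub::)|_ZN\d+raft_cutlass)")

# Hypothetical symbol names, for illustration only.
symbols = [
    "void thrust::detail::for_each_f<int*, int>(int*)",  # excluded
    "_ZN12raft_cutlass6KernelEv",                         # excluded
    "raft::neighbors::ivf_flat::search",                  # still checked
]
for sym in symbols:
    verdict = "excluded" if pattern.search(sym) else "checked"
    print(f"{sym!r}: {verdict}")
```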
10 changes: 10 additions & 0 deletions .github/workflows/test.yaml
@@ -14,6 +14,16 @@ on:
type: string

jobs:
conda-cpp-checks:
secrets: inherit
uses: rapidsai/shared-workflows/.github/workflows/[email protected]
with:
build_type: nightly
branch: ${{ inputs.branch }}
date: ${{ inputs.date }}
sha: ${{ inputs.sha }}
enable_check_symbols: true
symbol_exclusions: (void (thrust::|cub::)|_ZN\d+raft_cutlass)
conda-cpp-tests:
secrets: inherit
uses: rapidsai/shared-workflows/.github/workflows/[email protected]
74 changes: 49 additions & 25 deletions cpp/bench/ann/src/common/benchmark.hpp
@@ -33,6 +33,7 @@
#include <memory>
#include <mutex>
#include <numeric>
#include <sstream>
#include <string>
#include <unistd.h>
#include <vector>
@@ -175,7 +176,6 @@ void bench_search(::benchmark::State& state,
std::shared_ptr<const Dataset<T>> dataset,
Objective metric_objective)
{
std::ptrdiff_t batch_offset = 0;
std::size_t queries_processed = 0;

const auto& sp_json = index.search_params[search_param_ix];
@@ -189,6 +189,20 @@
// Round down the query data to a multiple of the batch size to loop over full batches of data
const std::size_t query_set_size = (dataset->query_set_size() / n_queries) * n_queries;

if (dataset->query_set_size() < n_queries) {
std::stringstream msg;
msg << "Not enough queries in benchmark set. Expected " << n_queries << ", actual "
<< dataset->query_set_size();
return state.SkipWithError(msg.str());
}

// Each thread starts from a different offset, so that the queries they process do not
// overlap.
std::ptrdiff_t batch_offset = (state.thread_index() * n_queries) % query_set_size;
std::ptrdiff_t queries_stride = state.threads() * n_queries;
// Output is saved into a contiguous buffer (separate buffers for each thread).
std::ptrdiff_t out_offset = 0;

const T* query_set = nullptr;

if (!file_exists(index.file)) {
@@ -278,7 +292,6 @@ void bench_search(::benchmark::State& state,
{
nvtx_case nvtx{state.name()};

// TODO: Have the odd threads load the queries backwards just to rule out caching.
ANN<T>* algo = dynamic_cast<ANN<T>*>(current_algo.get());
for (auto _ : state) {
[[maybe_unused]] auto ntx_lap = nvtx.lap();
@@ -289,15 +302,16 @@
algo->search(query_set + batch_offset * dataset->dim(),
n_queries,
k,
neighbors->data + batch_offset * k,
distances->data + batch_offset * k,
neighbors->data + out_offset * k,
distances->data + out_offset * k,
gpu_timer.stream());
} catch (const std::exception& e) {
state.SkipWithError(std::string(e.what()));
}

// advance to the next batch
batch_offset = (batch_offset + n_queries) % query_set_size;
batch_offset = (batch_offset + queries_stride) % query_set_size;
out_offset = (out_offset + n_queries) % query_set_size;

queries_processed += n_queries;
}
@@ -323,31 +337,41 @@ void bench_search(::benchmark::State& state,
// last thread to finish processing notifies all
if (processed_threads-- == 0) { cond_var.notify_all(); }

// Use the last thread as a sanity check that all the threads are working.
if (state.thread_index() == state.threads() - 1) {
// evaluate recall
if (dataset->max_k() >= k) {
const std::int32_t* gt = dataset->gt_set();
const std::uint32_t max_k = dataset->max_k();
buf<std::size_t> neighbors_host = neighbors->move(MemoryType::Host);
std::size_t rows = std::min(queries_processed, query_set_size);
std::size_t match_count = 0;
std::size_t total_count = rows * static_cast<size_t>(k);
for (std::size_t i = 0; i < rows; i++) {
for (std::uint32_t j = 0; j < k; j++) {
auto act_idx = std::int32_t(neighbors_host.data[i * k + j]);
for (std::uint32_t l = 0; l < k; l++) {
auto exp_idx = gt[i * max_k + l];
if (act_idx == exp_idx) {
match_count++;
break;
// Each thread calculates recall on its partition of queries.
// evaluate recall
if (dataset->max_k() >= k) {
const std::int32_t* gt = dataset->gt_set();
const std::uint32_t max_k = dataset->max_k();
buf<std::size_t> neighbors_host = neighbors->move(MemoryType::Host);
std::size_t rows = std::min(queries_processed, query_set_size);
std::size_t match_count = 0;
std::size_t total_count = rows * static_cast<size_t>(k);

// We go through the groundtruth with the same stride as the benchmark loop.
size_t out_offset = 0;
size_t batch_offset = (state.thread_index() * n_queries) % query_set_size;
while (out_offset < rows) {
for (std::size_t i = 0; i < n_queries; i++) {
size_t i_orig_idx = batch_offset + i;
size_t i_out_idx = out_offset + i;
if (i_out_idx < rows) {
for (std::uint32_t j = 0; j < k; j++) {
auto act_idx = std::int32_t(neighbors_host.data[i_out_idx * k + j]);
for (std::uint32_t l = 0; l < k; l++) {
auto exp_idx = gt[i_orig_idx * max_k + l];
if (act_idx == exp_idx) {
match_count++;
break;
}
}
}
}
}
double actual_recall = static_cast<double>(match_count) / static_cast<double>(total_count);
state.counters.insert({{"Recall", actual_recall}});
out_offset += n_queries;
batch_offset = (batch_offset + queries_stride) % query_set_size;
}
double actual_recall = static_cast<double>(match_count) / static_cast<double>(total_count);
state.counters.insert({"Recall", {actual_recall, benchmark::Counter::kAvgThreads}});
}
}

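For readers following the new offset arithmetic above: a small standalone sketch (hypothetical thread count, batch size, and query-set size) of how each thread walks the query set with a stride so batches never overlap, while writing results to its own contiguous output region:

```python
# Hypothetical sizes for illustration; the real values come from the benchmark state.
n_threads = 2
n_queries = 4                      # batch size
query_set_size = 16                # already rounded down to a multiple of n_queries
queries_stride = n_threads * n_queries

for thread_index in range(n_threads):
    batch_offset = (thread_index * n_queries) % query_set_size
    out_offset = 0
    visited = []
    for _ in range(4):             # four benchmark iterations, mirroring the loop above
        visited.append((batch_offset, out_offset))
        batch_offset = (batch_offset + queries_stride) % query_set_size
        out_offset = (out_offset + n_queries) % query_set_size
    print(f"thread {thread_index}: (batch_offset, out_offset) = {visited}")

# thread 0: (0, 0), (8, 4), (0, 8), (8, 12)
# thread 1: (4, 0), (12, 4), (4, 8), (12, 12)
```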
5 changes: 5 additions & 0 deletions docs/source/raft_ann_benchmarks.md
@@ -152,6 +152,11 @@ options:
-f, --force re-run algorithms even if their results already exist (default: False)
-m SEARCH_MODE, --search-mode SEARCH_MODE
run search in 'latency' (measure individual batches) or 'throughput' (pipeline batches and measure end-to-end) mode (default: throughput)
-t SEARCH_THREADS, --search-threads SEARCH_THREADS
specify the number of threads to use for the throughput benchmark. A single value or a pair of min and max separated by ':'. Example: --search-threads=1:4. Power-of-2 values between 'min' and 'max' will be used. If only 'min' is
specified, then a single test is run with 'min' threads. By default min=1, max=<num hyper threads>. (default: None)
-r, --dry-run         dry-run mode will convert the yaml config for the specified algorithms and datasets to the json format that's consumed by the lower-level c++ binaries and then print the command to execute the benchmarks, but
will not actually execute the command. (default: False)
```
`dataset`: name of the dataset to be searched in [datasets.yaml](#yaml-dataset-config)
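A rough sketch of how a 'min:max' value for `--search-threads` could expand into the thread counts that get benchmarked, per the help text above; the helper below is illustrative only, not the package's actual parsing code:

```python
def expand_search_threads(spec: str) -> list[int]:
    """Expand a --search-threads value: 'N' -> [N], 'min:max' -> powers of 2 in [min, max]."""
    if ":" not in spec:
        return [int(spec)]
    lo, hi = (int(v) for v in spec.split(":"))
    counts, n = [], 1
    while n <= hi:
        if n >= lo:
            counts.append(n)
        n *= 2
    return counts

print(expand_search_threads("1:4"))  # [1, 2, 4]
print(expand_search_threads("8"))    # [8]
```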
134 changes: 108 additions & 26 deletions python/raft-ann-bench/src/raft-ann-bench/data_export/__main__.py
@@ -17,9 +17,35 @@
import argparse
import json
import os
import sys
import traceback
import warnings

import pandas as pd

skip_build_cols = set(
[
"algo_name",
"index_name",
"time",
"name",
"family_index",
"per_family_instance_index",
"run_name",
"run_type",
"repetitions",
"repetition_index",
"iterations",
"real_time",
"time_unit",
"index_size",
]
)

skip_search_cols = (
set(["recall", "qps", "items_per_second", "Recall"]) | skip_build_cols
)


def read_file(dataset, dataset_path, method):
dir = os.path.join(dataset_path, dataset, "result", method)
@@ -33,36 +59,89 @@ def read_file(dataset, dataset_path, method):

def convert_json_to_csv_build(dataset, dataset_path):
for file, algo_name, df in read_file(dataset, dataset_path, "build"):
algo_name = algo_name.replace("_base", "")
df["name"] = df["name"].str.split("/").str[0]
write = pd.DataFrame(
{
"algo_name": [algo_name] * len(df),
"index_name": df["name"],
"time": df["real_time"],
}
)
filepath = os.path.normpath(file).split(os.sep)
filename = filepath[-1].split("-")[0] + ".csv"
write.to_csv(
os.path.join(f"{os.sep}".join(filepath[:-1]), filename),
index=False,
)
try:
algo_name = algo_name.replace("_base", "")
df["name"] = df["name"].str.split("/").str[0]
write = pd.DataFrame(
{
"algo_name": [algo_name] * len(df),
"index_name": df["name"],
"time": df["real_time"],
}
)
for name in df:
if name not in skip_build_cols:
write[name] = df[name]
filepath = os.path.normpath(file).split(os.sep)
filename = filepath[-1].split("-")[0] + ".csv"
write.to_csv(
os.path.join(f"{os.sep}".join(filepath[:-1]), filename),
index=False,
)
except Exception as e:
print(
"An error occurred processing file %s (%s). Skipping..."
% (file, e)
)
traceback.print_exc()


def convert_json_to_csv_search(dataset, dataset_path):
for file, algo_name, df in read_file(dataset, dataset_path, "search"):
algo_name = algo_name.replace("_base", "")
df["name"] = df["name"].str.split("/").str[0]
write = pd.DataFrame(
{
"algo_name": [algo_name] * len(df),
"index_name": df["name"],
"recall": df["Recall"],
"qps": df["items_per_second"],
}
)
write.to_csv(file.replace(".json", ".csv"), index=False)
try:
build_file = os.path.join(
dataset_path, dataset, "result", "build", f"{algo_name}.csv"
)
algo_name = algo_name.replace("_base", "")
df["name"] = df["name"].str.split("/").str[0]
write = pd.DataFrame(
{
"algo_name": [algo_name] * len(df),
"index_name": df["name"],
"recall": df["Recall"],
"qps": df["items_per_second"],
}
)
for name in df:
if name not in skip_search_cols:
write[name] = df[name]

if os.path.exists(build_file):
build_df = pd.read_csv(build_file)
write_ncols = len(write.columns)
write["build time"] = None
write["build threads"] = None
write["build cpu_time"] = None
write["build GPU"] = None

for col_idx in range(5, len(build_df.columns)):
col_name = build_df.columns[col_idx]
write[col_name] = None

for s_index, search_row in write.iterrows():
for b_index, build_row in build_df.iterrows():
if search_row["index_name"] == build_row["index_name"]:
write.iloc[s_index, write_ncols] = build_df.iloc[
b_index, 2
]
write.iloc[
s_index, write_ncols + 1 :
] = build_df.iloc[b_index, 3:]
break
else:
warnings.warn(
f"Build CSV not found for {algo_name}, "
f"build params won't be "
"appended in the Search CSV"
)

write.to_csv(file.replace(".json", ".csv"), index=False)
except Exception as e:
print(
"An error occurred processing file %s (%s). Skipping..."
% (file, e)
)
traceback.print_exc()


def main():
@@ -85,6 +164,9 @@ def main():
default=default_dataset_path,
)

if len(sys.argv) == 1:
parser.print_help()
sys.exit(1)
args = parser.parse_args()

convert_json_to_csv_build(args.dataset, args.dataset_path)
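The nested loops above copy build-time columns onto each search row whose `index_name` matches. A minimal, hypothetical illustration of the same join done with a pandas merge (sample data invented for the sketch):

```python
import pandas as pd

# Hypothetical frames standing in for one algorithm's search and build CSVs.
search = pd.DataFrame(
    {
        "algo_name": ["raft_ivf_pq", "raft_ivf_pq"],
        "index_name": ["ivf_pq.d64", "ivf_pq.d128"],
        "recall": [0.92, 0.97],
        "qps": [120000.0, 80000.0],
    }
)
build = pd.DataFrame(
    {
        "algo_name": ["raft_ivf_pq", "raft_ivf_pq"],
        "index_name": ["ivf_pq.d64", "ivf_pq.d128"],
        "time": [35.1, 61.4],
    }
)

# Attach the build columns to each search row that has a matching index_name.
merged = search.merge(
    build.rename(columns={"time": "build time"}),
    on=["algo_name", "index_name"],
    how="left",
)
print(merged)
```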
@@ -16,6 +16,7 @@
#
import argparse
import os
import sys

import cupy as cp
import numpy as np
@@ -178,6 +179,9 @@ def main():
" commonly used with RAFT ANN are 'sqeuclidean' and 'inner_product'",
)

if len(sys.argv) == 1:
parser.print_help()
sys.exit(1)
args = parser.parse_args()

if args.rows is not None:
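Several of the CLI modules in this commit gain the same guard: when invoked with no arguments, print the help text and exit with an error code instead of running on defaults. A self-contained sketch of the pattern (not the actual module code, and with a hypothetical argument and default):

```python
import argparse
import sys


def main():
    parser = argparse.ArgumentParser(description="example CLI")
    parser.add_argument("--dataset", default="glove-100-inner")

    # With no command-line arguments at all, print the help text and exit
    # with a non-zero status instead of silently running on defaults.
    if len(sys.argv) == 1:
        parser.print_help()
        sys.exit(1)

    args = parser.parse_args()
    print(args.dataset)


if __name__ == "__main__":
    main()
```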
@@ -16,6 +16,7 @@
import argparse
import os
import subprocess
import sys
from urllib.request import urlretrieve


@@ -101,6 +102,10 @@ def main():
help="normalize cosine distance to inner product",
action="store_true",
)

if len(sys.argv) == 1:
parser.print_help()
sys.exit(1)
args = parser.parse_args()

download(args.dataset, args.normalize, args.dataset_path)
4 changes: 4 additions & 0 deletions python/raft-ann-bench/src/raft-ann-bench/plot/__main__.py
@@ -22,6 +22,7 @@
import argparse
import itertools
import os
import sys
from collections import OrderedDict

import matplotlib as mpl
@@ -486,6 +487,9 @@ def main():
action="store_true",
)

if len(sys.argv) == 1:
parser.print_help()
sys.exit(1)
args = parser.parse_args()

if args.algorithms:
