Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
achirkin committed Jan 31, 2024
1 parent d4ae271 commit ca9a49e
Show file tree
Hide file tree
Showing 13 changed files with 318 additions and 282 deletions.
35 changes: 26 additions & 9 deletions cpp/bench/ann/src/common/ann_types.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -74,13 +74,33 @@ struct AlgoProperty {
/** Common base for all ANN algorithm wrappers: stores the metric and the dataset dimensionality. */
class AnnBase {
 public:
  /**
   * @param metric distance metric used by the algorithm (project-defined enum)
   * @param dim dimensionality of the dataset vectors
   */
  inline AnnBase(Metric metric, int dim) : metric_(metric), dim_(dim) {}
  // Polymorphic base: virtual destructor so deleting through a base pointer is safe.
  // NB: the diff rendering left both the old and the new destructor lines in place;
  // only the post-image `noexcept` variant is kept here.
  virtual ~AnnBase() noexcept = default;

 protected:
  Metric metric_;  // distance metric
  int dim_;        // dataset dimensionality
};

/**
 * The GPU-based algorithms, which do not perform CPU synchronization at the end of their build or
 * search methods, must implement this interface.
 *
 * The `cuda_timer` / `cuda_lap` from `util.hpp` uses this stream to record GPU times with events
 * and, if necessary, also synchronize (via events) between iterations.
 *
 * If the algo does not implement this interface, GPU timings are disabled.
 */
class AnnGPU {
 public:
  /**
   * Return the main cuda stream for this algorithm.
   * If any work is done in multiple streams, they should synchronize with the main stream at the
   * end.
   */
  [[nodiscard]] virtual auto get_sync_stream() const noexcept -> cudaStream_t = 0;
  virtual ~AnnGPU() noexcept = default;
};

template <typename T>
class ANN : public AnnBase {
public:
Expand All @@ -91,18 +111,15 @@ class ANN : public AnnBase {
};

inline ANN(Metric metric, int dim) : AnnBase(metric, dim) {}
virtual ~ANN() noexcept override = default;

virtual void build(const T* dataset, size_t nrow, cudaStream_t stream = 0) = 0;
virtual void build(const T* dataset, size_t nrow) = 0;

virtual void set_search_param(const AnnSearchParam& param) = 0;
// TODO: this assumes that an algorithm can always return k results.
// This is not always possible.
virtual void search(const T* queries,
int batch_size,
int k,
size_t* neighbors,
float* distances,
cudaStream_t stream = 0) const = 0;
virtual void search(
const T* queries, int batch_size, int k, size_t* neighbors, float* distances) const = 0;

virtual void save(const std::string& file) const = 0;
virtual void load(const std::string& file) = 0;
Expand Down
34 changes: 20 additions & 14 deletions cpp/bench/ann/src/common/benchmark.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ void bench_build(::benchmark::State& state,
Configuration::Index index,
bool force_overwrite)
{
// NB: these two thread-local vars can be used within algo wrappers
raft::bench::ann::benchmark_thread_id = state.thread_index();
raft::bench::ann::benchmark_n_threads = state.threads();
dump_parameters(state, index.build_param);
if (file_exists(index.file)) {
if (force_overwrite) {
Expand All @@ -149,21 +152,23 @@ void bench_build(::benchmark::State& state,
const T* base_set = dataset->base_set(algo_property.dataset_memory_type);
std::size_t index_size = dataset->base_set_size();

cuda_timer gpu_timer;
cuda_timer gpu_timer{algo};
{
nvtx_case nvtx{state.name()};
for (auto _ : state) {
[[maybe_unused]] auto ntx_lap = nvtx.lap();
[[maybe_unused]] auto gpu_lap = gpu_timer.lap();
try {
algo->build(base_set, index_size, gpu_timer.stream());
algo->build(base_set, index_size);
} catch (const std::exception& e) {
state.SkipWithError(std::string(e.what()));
}
}
}
state.counters.insert(
{{"GPU", gpu_timer.total_time() / state.iterations()}, {"index_size", index_size}});
if (gpu_timer.active()) {
state.counters.insert({"GPU", {gpu_timer.total_time(), benchmark::Counter::kAvgIterations}});
}
state.counters.insert({{"index_size", index_size}});

if (state.skipped()) { return; }
make_sure_parent_dir_exists(index.file);
Expand All @@ -177,7 +182,10 @@ void bench_search(::benchmark::State& state,
std::shared_ptr<const Dataset<T>> dataset,
Objective metric_objective)
{
std::size_t queries_processed = 0;
// NB: these two thread-local vars can be used within algo wrappers
raft::bench::ann::benchmark_thread_id = state.thread_index();
raft::bench::ann::benchmark_n_threads = state.threads();
std::size_t queries_processed = 0;

const auto& sp_json = index.search_params[search_param_ix];

Expand Down Expand Up @@ -286,24 +294,21 @@ void bench_search(::benchmark::State& state,
std::shared_ptr<buf<std::size_t>> neighbors =
std::make_shared<buf<std::size_t>>(current_algo_props->query_memory_type, k * query_set_size);

cuda_timer gpu_timer;
{
nvtx_case nvtx{state.name()};

auto algo = dynamic_cast<ANN<T>*>(current_algo.get())->copy();
auto algo = dynamic_cast<ANN<T>*>(current_algo.get())->copy();
cuda_timer gpu_timer{algo};
auto start = std::chrono::high_resolution_clock::now();
for (auto _ : state) {
[[maybe_unused]] auto ntx_lap = nvtx.lap();
[[maybe_unused]] auto gpu_lap = gpu_timer.lap();

// run the search
try {
algo->search(query_set + batch_offset * dataset->dim(),
n_queries,
k,
neighbors->data + out_offset * k,
distances->data + out_offset * k,
gpu_timer.stream());
distances->data + out_offset * k);
} catch (const std::exception& e) {
state.SkipWithError(std::string(e.what()));
}
Expand All @@ -318,12 +323,13 @@ void bench_search(::benchmark::State& state,
auto duration = std::chrono::duration_cast<std::chrono::duration<double>>(end - start).count();
if (state.thread_index() == 0) { state.counters.insert({{"end_to_end", duration}}); }
state.counters.insert({"Latency", {duration, benchmark::Counter::kAvgIterations}});

if (gpu_timer.active()) {
state.counters.insert({"GPU", {gpu_timer.total_time(), benchmark::Counter::kAvgIterations}});
}
}

state.SetItemsProcessed(queries_processed);
if (cudart.found()) {
state.counters.insert({"GPU", {gpu_timer.total_time(), benchmark::Counter::kAvgIterations}});
}

// This will be the total number of queries across all threads
state.counters.insert({{"total_queries", queries_processed}});
Expand Down
107 changes: 94 additions & 13 deletions cpp/bench/ann/src/common/util.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -29,15 +29,32 @@
#include <cstdio>
#include <cstring>
#include <ctime>
#include <mutex>
#include <sstream>
#include <string>
#include <vector>

#include <filesystem>
#include <functional>
#include <optional>

namespace raft::bench::ann {

/**
* Current thread id as given by the benchmark State.
* It's populated on every call of a benchmark case.
* It's relevant in the 'throughput' mode of the search benchmarks,
* where some algorithms might want to coordinate allocation of the resources.
*/
inline thread_local int benchmark_thread_id = 0;
/**
* Total concurrent thread count as given by the benchmark State.
* It's populated on every call of a benchmark case.
* It's relevant in the 'throughput' mode of the search benchmarks,
* where some algorithms might want to coordinate allocation of the resources.
*/
inline thread_local int benchmark_n_threads = 1;

template <typename T>
struct buf {
MemoryType memory_type;
Expand Down Expand Up @@ -91,11 +108,19 @@ struct buf {

struct cuda_timer {
private:
cudaStream_t stream_{nullptr};
std::optional<cudaStream_t> stream_;
cudaEvent_t start_{nullptr};
cudaEvent_t stop_{nullptr};
double total_time_{0};

/**
 * If the algorithm implements the `AnnGPU` interface, fetch its sync stream;
 * otherwise return an empty optional (which disables GPU timing for this algo).
 */
template <typename AnnT>
static inline auto extract_stream(AnnT* algo) -> std::optional<cudaStream_t>
{
  if (auto* gpu_algo = dynamic_cast<AnnGPU*>(algo)) {
    return std::make_optional(gpu_algo->get_sync_stream());
  }
  return std::nullopt;
}

public:
struct cuda_lap {
private:
Expand All @@ -109,7 +134,6 @@ struct cuda_timer {
: start_(start), stop_(stop), stream_(stream), total_time_(total_time)
{
#ifndef BUILD_CPU_ONLY
cudaStreamSynchronize(stream_);
cudaEventRecord(start_, stream_);
#endif
}
Expand All @@ -127,34 +151,91 @@ struct cuda_timer {
}
};

cuda_timer()
explicit cuda_timer(std::optional<cudaStream_t> stream) : stream_{stream}
{
#ifndef BUILD_CPU_ONLY
cudaStreamCreateWithFlags(&stream_, cudaStreamNonBlocking);
cudaEventCreate(&stop_);
cudaEventCreate(&start_);
if (stream_.has_value()) {
cudaEventCreate(&stop_);
cudaEventCreate(&start_);
}
#endif
}

template <typename AnnT>
explicit cuda_timer(const std::unique_ptr<AnnT>& algo) : cuda_timer{extract_stream(algo.get())}
{
}

/**
 * Synchronize the stream (so all timed GPU work has finished) and release the events.
 * No-op when the timer is inactive (no stream).
 * NB: the diff rendering left the pre-image lines (unconditional event/stream destroy)
 * interleaved here; only the post-image guarded cleanup is kept. The timer no longer
 * owns the stream, so it is synchronized but not destroyed.
 */
~cuda_timer() noexcept
{
#ifndef BUILD_CPU_ONLY
  if (stream_.has_value()) {
    cudaStreamSynchronize(stream_.value());
    cudaEventDestroy(start_);
    cudaEventDestroy(stop_);
  }
#endif
}

[[nodiscard]] auto stream() const -> cudaStream_t { return stream_; }
cuda_timer() = delete;
cuda_timer(cuda_timer const&) = delete;
cuda_timer(cuda_timer&&) = delete;
auto operator=(cuda_timer const&) -> cuda_timer& = delete;
auto operator=(cuda_timer&&) -> cuda_timer& = delete;

[[nodiscard]] auto stream() const -> std::optional<cudaStream_t> { return stream_; }

[[nodiscard]] auto active() const -> bool { return stream_.has_value(); }

[[nodiscard]] auto total_time() const -> double { return total_time_; }

[[nodiscard]] auto lap() -> cuda_timer::cuda_lap
[[nodiscard]] auto lap(bool enabled = true) -> std::optional<cuda_timer::cuda_lap>
{
return cuda_lap{stream_, start_, stop_, total_time_};
return enabled && stream_.has_value()
? std::make_optional<cuda_timer::cuda_lap>(stream_.value(), start_, stop_, total_time_)
: std::nullopt;
}
};

#ifndef BUILD_CPU_ONLY
// ATM, rmm::stream does not support passing in flags; hence this helper type.
/**
 * RAII owner of a non-blocking CUDA stream.
 * Move-only: moving swaps with the source's (null) handle, so the moved-from object
 * holds nullptr and only one owner ever destroys the stream.
 */
struct non_blocking_stream {
  non_blocking_stream() { cudaStreamCreateWithFlags(&stream_, cudaStreamNonBlocking); }
  ~non_blocking_stream() noexcept
  {
    if (stream_ != nullptr) { cudaStreamDestroy(stream_); }
  }
  non_blocking_stream(non_blocking_stream const&) = delete;
  // `noexcept` matters: this type is stored in a std::vector (see the global stream pool),
  // and a noexcept move gives the container the strong exception guarantee on resize.
  non_blocking_stream(non_blocking_stream&& other) noexcept { std::swap(stream_, other.stream_); }
  auto operator=(non_blocking_stream const&) -> non_blocking_stream& = delete;
  auto operator=(non_blocking_stream&&) -> non_blocking_stream&      = delete;
  /** Non-owning view of the underlying stream handle. */
  [[nodiscard]] auto view() const noexcept -> cudaStream_t { return stream_; }

 private:
  cudaStream_t stream_{nullptr};
};
#endif

/**
* Get a stream associated with the current benchmark thread.
*
* Note, the streams are reused between the benchmark cases.
* This makes it easier to profile and analyse multiple benchmark cases in one timeline using tools
* like nsys.
*/
inline auto get_stream_from_global_pool() -> cudaStream_t
{
#ifndef BUILD_CPU_ONLY
// One stream per benchmark thread; `static` so the streams persist and are reused
// across benchmark cases (keeps one timeline per thread in profiling tools).
static std::vector<non_blocking_stream> pool(1);
static std::mutex m;
// Guard both the resize and the element access against concurrent benchmark threads.
std::lock_guard guard(m);
// Grow lazily: benchmark_n_threads is only populated when a benchmark case runs.
// NOTE(review): resize moves the elements; non_blocking_stream's move leaves the source
// null and its dtor skips null handles, so previously handed-out handles stay valid.
if (int(pool.size()) < benchmark_n_threads) { pool.resize(benchmark_n_threads); }
// Assumes benchmark_thread_id < benchmark_n_threads (set from the benchmark State) — TODO confirm.
return pool[benchmark_thread_id].view();
#else
return 0;
#endif
}

inline auto cuda_info()
{
std::vector<std::tuple<std::string, std::string>> props;
Expand Down
22 changes: 7 additions & 15 deletions cpp/bench/ann/src/faiss/faiss_cpu_wrapper.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -73,7 +73,7 @@ class FaissCpu : public ANN<T> {
static_assert(std::is_same_v<T, float>, "faiss support only float type");
}

void build(const T* dataset, size_t nrow, cudaStream_t stream = 0) final;
void build(const T* dataset, size_t nrow) final;

void set_search_param(const AnnSearchParam& param) override;

Expand All @@ -88,12 +88,8 @@ class FaissCpu : public ANN<T> {

// TODO: if the number of results is less than k, the remaining elements of 'neighbors'
// will be filled with (size_t)-1
void search(const T* queries,
int batch_size,
int k,
size_t* neighbors,
float* distances,
cudaStream_t stream = 0) const final;
void search(
const T* queries, int batch_size, int k, size_t* neighbors, float* distances) const final;

AlgoProperty get_preference() const override
{
Expand Down Expand Up @@ -123,7 +119,7 @@ class FaissCpu : public ANN<T> {
};

template <typename T>
void FaissCpu<T>::build(const T* dataset, size_t nrow, cudaStream_t stream)
void FaissCpu<T>::build(const T* dataset, size_t nrow)
{
auto index_ivf = dynamic_cast<faiss::IndexIVF*>(index_.get());
if (index_ivf != nullptr) {
Expand Down Expand Up @@ -172,12 +168,8 @@ void FaissCpu<T>::set_search_param(const AnnSearchParam& param)
}

template <typename T>
void FaissCpu<T>::search(const T* queries,
int batch_size,
int k,
size_t* neighbors,
float* distances,
cudaStream_t stream) const
void FaissCpu<T>::search(
const T* queries, int batch_size, int k, size_t* neighbors, float* distances) const
{
static_assert(sizeof(size_t) == sizeof(faiss::idx_t),
"sizes of size_t and faiss::idx_t are different");
Expand Down
Loading

0 comments on commit ca9a49e

Please sign in to comment.