From 9fb21adc82e625deb7cc0f20b68c0f42902246f1 Mon Sep 17 00:00:00 2001 From: "Artem M. Chirkin" <9253178+achirkin@users.noreply.github.com> Date: Wed, 4 Dec 2024 17:38:05 +0100 Subject: [PATCH] Dynamic Batching (#261) Non-blocking / stream-ordered dynamic batching as a new index type. ## API This PR implements dynamic batching as a new index type, mirroring the API of other indices. * [_building is wrapping_] Building the index means creating a lightweight wrapper on top of an existing index and initializing necessary components, such as IO batch buffers and synchronization primitives. * [_type erasure_] The underlying/upstream index type is erased once the dynamic_batching wrapper is created, i.e. there's no way to recover the original search index type or parameters. * [_explicit control over batching_] To allow multiple user requests group into a dynamic batch request, the users must use copies of the same dynamic batching index (the user-facing index type is a thin wrapper on top of a shared pointer, hence the copy is shallow and cheap). The search function is thread-safe. ## Feature: stream-ordered dynamic batching Non-blocking / stream-ordered dynamic batching means the batching does not involve synchronizing with a GPU stream. The control is returned to the user as soon as the necessary work is submitted to the GPU. This entails a few good-to-know features: 1. The dynamic batching index has the same blocking properties as the upstream index: if the upstream index does not involve stream sync during search, that the dynamic batching index does not involve it as well (otherwise, the dynamic batching search obviously waits till the upstream search synchronizes under the hood). 2. It's responsibility of the user to synchronize the stream before getting the results back - even if the upstream index search does not need it (the batch results are scattered back to the request threads in a post-processing kernel). 3. If the upstream index does not synchronize during search, the dynamic batching index can group the queries even in a single-threaded application (_try it with --no-lap-sync option in the ann-bench benchmarks_). Overall, stream-ordered dynamic batching makes it easy to modify existing cuVS indexes, because the wrapped index has the same execution behavior as the upstream index. ## Work-in-progress TODO - [x] Add dynamic batching option to more indices in ann-bench - [x] Add tests - [x] **(postponed to 25.02)** Do proper benchmarking and possibly fine-tune the inter-thread communication - [x] Review the API side (`cpp/include/cuvs/neighbors/dynamic_batching.hpp`) [ready for review CC @cjnolet] - [x] Review the algorithm side (`cpp/src/neighbors/detail/dynamic_batching.cuh`) [ready for preliminary review: requests for algorithm docsting/clarifications are especially welcome] Authors: - Artem M. Chirkin (https://github.com/achirkin) Approvers: - Tamas Bela Feher (https://github.com/tfeher) - Corey J. Nolet (https://github.com/cjnolet) URL: https://github.com/rapidsai/cuvs/pull/261 --- cpp/CMakeLists.txt | 1 + .../src/cuvs/cuvs_ann_bench_param_parser.h | 26 + cpp/bench/ann/src/cuvs/cuvs_cagra_wrapper.h | 97 +- cpp/bench/ann/src/cuvs/cuvs_ivf_pq_wrapper.h | 40 +- cpp/include/cuvs/neighbors/cagra.hpp | 4 + .../cuvs/neighbors/dynamic_batching.hpp | 290 ++++ cpp/include/cuvs/neighbors/ivf_flat.hpp | 4 + cpp/include/cuvs/neighbors/ivf_pq.hpp | 3 + cpp/src/neighbors/detail/dynamic_batching.cuh | 1197 +++++++++++++++++ cpp/src/neighbors/dynamic_batching.cu | 91 ++ cpp/test/CMakeLists.txt | 13 + cpp/test/neighbors/dynamic_batching.cuh | 292 ++++ .../neighbors/dynamic_batching/test_cagra.cu | 84 ++ .../dynamic_batching/test_ivf_flat.cu | 44 + .../neighbors/dynamic_batching/test_ivf_pq.cu | 41 + docs/source/cpp_api/neighbors.rst | 1 + .../cpp_api/neighbors_dynamic_batching.rst | 45 + examples/cpp/CMakeLists.txt | 4 + examples/cpp/src/dynamic_batching_example.cu | 282 ++++ 19 files changed, 2539 insertions(+), 20 deletions(-) create mode 100644 cpp/include/cuvs/neighbors/dynamic_batching.hpp create mode 100644 cpp/src/neighbors/detail/dynamic_batching.cuh create mode 100644 cpp/src/neighbors/dynamic_batching.cu create mode 100644 cpp/test/neighbors/dynamic_batching.cuh create mode 100644 cpp/test/neighbors/dynamic_batching/test_cagra.cu create mode 100644 cpp/test/neighbors/dynamic_batching/test_ivf_flat.cu create mode 100644 cpp/test/neighbors/dynamic_batching/test_ivf_pq.cu create mode 100644 docs/source/cpp_api/neighbors_dynamic_batching.rst create mode 100644 examples/cpp/src/dynamic_batching_example.cu diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 34b7cb898..6af423bd5 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -397,6 +397,7 @@ if(BUILD_SHARED_LIBS) src/neighbors/iface/iface_pq_uint8_t_int64_t.cu src/neighbors/detail/cagra/cagra_build.cpp src/neighbors/detail/cagra/topk_for_cagra/topk.cu + src/neighbors/dynamic_batching.cu $<$:src/neighbors/hnsw.cpp> src/neighbors/ivf_flat_index.cpp src/neighbors/ivf_flat/ivf_flat_build_extend_float_int64_t.cu diff --git a/cpp/bench/ann/src/cuvs/cuvs_ann_bench_param_parser.h b/cpp/bench/ann/src/cuvs/cuvs_ann_bench_param_parser.h index 57d5b1910..7617bfa66 100644 --- a/cpp/bench/ann/src/cuvs/cuvs_ann_bench_param_parser.h +++ b/cpp/bench/ann/src/cuvs/cuvs_ann_bench_param_parser.h @@ -56,6 +56,26 @@ extern template class cuvs::bench::cuvs_cagra; #include "cuvs_mg_cagra_wrapper.h" #endif +template +void parse_dynamic_batching_params(const nlohmann::json& conf, ParamT& param) +{ + if (!conf.value("dynamic_batching", false)) { return; } + param.dynamic_batching = true; + if (conf.contains("dynamic_batching_max_batch_size")) { + param.dynamic_batching_max_batch_size = conf.at("dynamic_batching_max_batch_size"); + } + param.dynamic_batching_conservative_dispatch = + conf.value("dynamic_batching_conservative_dispatch", false); + if (conf.contains("dynamic_batching_dispatch_timeout_ms")) { + param.dynamic_batching_dispatch_timeout_ms = conf.at("dynamic_batching_dispatch_timeout_ms"); + } + if (conf.contains("dynamic_batching_n_queues")) { + param.dynamic_batching_n_queues = conf.at("dynamic_batching_n_queues"); + } + param.dynamic_batching_k = + uint32_t(uint32_t(conf.at("k")) * float(conf.value("refine_ratio", 1.0f))); +} + #if defined(CUVS_ANN_BENCH_USE_CUVS_IVF_FLAT) || defined(CUVS_ANN_BENCH_USE_CUVS_MG) template void parse_build_param(const nlohmann::json& conf, @@ -138,6 +158,9 @@ void parse_search_param(const nlohmann::json& conf, param.refine_ratio = conf.at("refine_ratio"); if (param.refine_ratio < 1.0f) { throw std::runtime_error("refine_ratio should be >= 1.0"); } } + + // enable dynamic batching + parse_dynamic_batching_params(conf, param); } #endif @@ -291,5 +314,8 @@ void parse_search_param(const nlohmann::json& conf, } // Same ratio as in IVF-PQ param.refine_ratio = conf.value("refine_ratio", 1.0f); + + // enable dynamic batching + parse_dynamic_batching_params(conf, param); } #endif diff --git a/cpp/bench/ann/src/cuvs/cuvs_cagra_wrapper.h b/cpp/bench/ann/src/cuvs/cuvs_cagra_wrapper.h index f6d3d60fc..8c9cb2d4f 100644 --- a/cpp/bench/ann/src/cuvs/cuvs_cagra_wrapper.h +++ b/cpp/bench/ann/src/cuvs/cuvs_cagra_wrapper.h @@ -24,6 +24,7 @@ #include #include #include +#include #include #include #include @@ -63,6 +64,13 @@ class cuvs_cagra : public algo, public algo_gpu { AllocatorType graph_mem = AllocatorType::kDevice; AllocatorType dataset_mem = AllocatorType::kDevice; [[nodiscard]] auto needs_dataset() const -> bool override { return true; } + /* Dynamic batching */ + bool dynamic_batching = false; + int64_t dynamic_batching_k; + int64_t dynamic_batching_max_batch_size = 4; + double dynamic_batching_dispatch_timeout_ms = 0.01; + size_t dynamic_batching_n_queues = 8; + bool dynamic_batching_conservative_dispatch = false; }; struct build_param { @@ -173,6 +181,12 @@ class cuvs_cagra : public algo, public algo_gpu { std::shared_ptr> dataset_; std::shared_ptr> input_dataset_v_; + std::shared_ptr> dynamic_batcher_; + cuvs::neighbors::dynamic_batching::search_params dynamic_batcher_sp_{}; + int64_t dynamic_batching_max_batch_size_; + size_t dynamic_batching_n_queues_; + bool dynamic_batching_conservative_dispatch_; + inline rmm::device_async_resource_ref get_mr(AllocatorType mem_type) { switch (mem_type) { @@ -216,26 +230,33 @@ inline auto allocator_to_string(AllocatorType mem_type) -> std::string template void cuvs_cagra::set_search_param(const search_param_base& param) { - auto sp = dynamic_cast(param); - search_params_ = sp.p; - refine_ratio_ = sp.refine_ratio; + auto sp = dynamic_cast(param); + bool needs_dynamic_batcher_update = + (dynamic_batching_max_batch_size_ != sp.dynamic_batching_max_batch_size) || + (dynamic_batching_n_queues_ != sp.dynamic_batching_n_queues) || + (dynamic_batching_conservative_dispatch_ != sp.dynamic_batching_conservative_dispatch); + dynamic_batching_max_batch_size_ = sp.dynamic_batching_max_batch_size; + dynamic_batching_n_queues_ = sp.dynamic_batching_n_queues; + dynamic_batching_conservative_dispatch_ = sp.dynamic_batching_conservative_dispatch; + search_params_ = sp.p; + refine_ratio_ = sp.refine_ratio; if (sp.graph_mem != graph_mem_) { // Move graph to correct memory space graph_mem_ = sp.graph_mem; RAFT_LOG_DEBUG("moving graph to new memory space: %s", allocator_to_string(graph_mem_).c_str()); // We create a new graph and copy to it from existing graph - auto mr = get_mr(graph_mem_); - auto new_graph = raft::make_device_mdarray( + auto mr = get_mr(graph_mem_); + *graph_ = raft::make_device_mdarray( handle_, mr, raft::make_extents(index_->graph().extent(0), index_->graph_degree())); - raft::copy(new_graph.data_handle(), + raft::copy(graph_->data_handle(), index_->graph().data_handle(), index_->graph().size(), raft::resource::get_cuda_stream(handle_)); - index_->update_graph(handle_, make_const_mdspan(new_graph.view())); - // update_graph() only stores a view in the index. We need to keep the graph object alive. - *graph_ = std::move(new_graph); + // NB: update_graph() only stores a view in the index. We need to keep the graph object alive. + index_->update_graph(handle_, make_const_mdspan(graph_->view())); + needs_dynamic_batcher_update = true; } if (sp.dataset_mem != dataset_mem_ || need_dataset_update_) { @@ -256,7 +277,26 @@ void cuvs_cagra::set_search_param(const search_param_base& param) dataset_->data_handle(), dataset_->extent(0), this->dim_, dataset_->extent(1)); index_->update_dataset(handle_, dataset_view); - need_dataset_update_ = false; + need_dataset_update_ = false; + needs_dynamic_batcher_update = true; + } + + // dynamic batching + if (sp.dynamic_batching) { + if (!dynamic_batcher_ || needs_dynamic_batcher_update) { + dynamic_batcher_ = std::make_shared>( + handle_, + cuvs::neighbors::dynamic_batching::index_params{{}, + sp.dynamic_batching_k, + sp.dynamic_batching_max_batch_size, + sp.dynamic_batching_n_queues, + sp.dynamic_batching_conservative_dispatch}, + *index_, + search_params_); + } + dynamic_batcher_sp_.dispatch_timeout_ms = sp.dynamic_batching_dispatch_timeout_ms; + } else { + if (dynamic_batcher_) { dynamic_batcher_.reset(); } } } @@ -306,7 +346,7 @@ void cuvs_cagra::load(const std::string& file) template std::unique_ptr> cuvs_cagra::copy() { - return std::make_unique>(*this); // use copy constructor + return std::make_unique>(std::cref(*this)); // use copy constructor } template @@ -330,8 +370,17 @@ void cuvs_cagra::search_base(const T* queries, raft::make_device_matrix_view(neighbors_idx_t, batch_size, k); auto distances_view = raft::make_device_matrix_view(distances, batch_size, k); - cuvs::neighbors::cagra::search( - handle_, search_params_, *index_, queries_view, neighbors_view, distances_view); + if (dynamic_batcher_) { + cuvs::neighbors::dynamic_batching::search(handle_, + dynamic_batcher_sp_, + *dynamic_batcher_, + queries_view, + neighbors_view, + distances_view); + } else { + cuvs::neighbors::cagra::search( + handle_, search_params_, *index_, queries_view, neighbors_view, distances_view); + } if constexpr (sizeof(IdxT) != sizeof(algo_base::index_type)) { if (raft::get_device_for_address(neighbors) < 0 && @@ -367,11 +416,23 @@ void cuvs_cagra::search( const raft::resources& res = handle_; auto mem_type = raft::get_device_for_address(neighbors) >= 0 ? MemoryType::kDevice : MemoryType::kHostPinned; - auto& tmp_buf = get_tmp_buffer_from_global_pool( - ((disable_refinement ? 0 : (sizeof(float) + sizeof(algo_base::index_type))) + - (kNeedsIoMapping ? sizeof(IdxT) : 0)) * - batch_size * k0); - auto* candidates_ptr = reinterpret_cast(tmp_buf.data(mem_type)); + + // If dynamic batching is used and there's no sync between benchmark laps, multiple sequential + // requests can group together. The data is copied asynchronously, and if the same intermediate + // buffer is used for multiple requests, they can override each other's data. Hence, we need to + // allocate as much space as required by the maximum number of sequential requests. + auto max_dyn_grouping = dynamic_batcher_ ? raft::div_rounding_up_safe( + dynamic_batching_max_batch_size_, batch_size) * + dynamic_batching_n_queues_ + : 1; + auto tmp_buf_size = ((disable_refinement ? 0 : (sizeof(float) + sizeof(algo_base::index_type))) + + (kNeedsIoMapping ? sizeof(IdxT) : 0)) * + batch_size * k0; + auto& tmp_buf = get_tmp_buffer_from_global_pool(tmp_buf_size * max_dyn_grouping); + thread_local static int64_t group_id = 0; + auto* candidates_ptr = reinterpret_cast( + reinterpret_cast(tmp_buf.data(mem_type)) + tmp_buf_size * group_id); + group_id = (group_id + 1) % max_dyn_grouping; auto* candidate_dists_ptr = reinterpret_cast(candidates_ptr + (disable_refinement ? 0 : batch_size * k0)); auto* neighbors_idx_t = diff --git a/cpp/bench/ann/src/cuvs/cuvs_ivf_pq_wrapper.h b/cpp/bench/ann/src/cuvs/cuvs_ivf_pq_wrapper.h index 4c8a91f23..dac766669 100644 --- a/cpp/bench/ann/src/cuvs/cuvs_ivf_pq_wrapper.h +++ b/cpp/bench/ann/src/cuvs/cuvs_ivf_pq_wrapper.h @@ -19,7 +19,9 @@ #include "cuvs_ann_bench_utils.h" #include +#include #include + #include #include #include @@ -46,6 +48,13 @@ class cuvs_ivf_pq : public algo, public algo_gpu { cuvs::neighbors::ivf_pq::search_params pq_param; float refine_ratio = 1.0f; [[nodiscard]] auto needs_dataset() const -> bool override { return refine_ratio > 1.0f; } + /* Dynamic batching */ + bool dynamic_batching = false; + int64_t dynamic_batching_k; + int64_t dynamic_batching_max_batch_size = 128; + double dynamic_batching_dispatch_timeout_ms = 0.01; + size_t dynamic_batching_n_queues = 3; + bool dynamic_batching_conservative_dispatch = true; }; using build_param = cuvs::neighbors::ivf_pq::index_params; @@ -98,6 +107,9 @@ class cuvs_ivf_pq : public algo, public algo_gpu { int dimension_; float refine_ratio_ = 1.0; raft::device_matrix_view dataset_; + + std::shared_ptr> dynamic_batcher_; + cuvs::neighbors::dynamic_batching::search_params dynamic_batcher_sp_{}; }; template @@ -138,6 +150,21 @@ void cuvs_ivf_pq::set_search_param(const search_param_base& param) search_params_ = sp.pq_param; refine_ratio_ = sp.refine_ratio; assert(search_params_.n_probes <= index_params_.n_lists); + + if (sp.dynamic_batching) { + dynamic_batcher_ = std::make_shared>( + handle_, + cuvs::neighbors::dynamic_batching::index_params{{}, + sp.dynamic_batching_k, + sp.dynamic_batching_max_batch_size, + sp.dynamic_batching_n_queues, + sp.dynamic_batching_conservative_dispatch}, + *index_, + search_params_); + dynamic_batcher_sp_.dispatch_timeout_ms = sp.dynamic_batching_dispatch_timeout_ms; + } else { + dynamic_batcher_.reset(); + } } template @@ -168,8 +195,17 @@ void cuvs_ivf_pq::search_base( raft::make_device_matrix_view(neighbors_idx_t, batch_size, k); auto distances_view = raft::make_device_matrix_view(distances, batch_size, k); - cuvs::neighbors::ivf_pq::search( - handle_, search_params_, *index_, queries_view, neighbors_view, distances_view); + if (dynamic_batcher_) { + cuvs::neighbors::dynamic_batching::search(handle_, + dynamic_batcher_sp_, + *dynamic_batcher_, + queries_view, + neighbors_view, + distances_view); + } else { + cuvs::neighbors::ivf_pq::search( + handle_, search_params_, *index_, queries_view, neighbors_view, distances_view); + } if constexpr (sizeof(IdxT) != sizeof(algo_base::index_type)) { raft::linalg::unaryOp(neighbors, diff --git a/cpp/include/cuvs/neighbors/cagra.hpp b/cpp/include/cuvs/neighbors/cagra.hpp index 5ceb3010e..a4684ce26 100644 --- a/cpp/include/cuvs/neighbors/cagra.hpp +++ b/cpp/include/cuvs/neighbors/cagra.hpp @@ -272,6 +272,10 @@ static_assert(std::is_aggregate_v); */ template struct index : cuvs::neighbors::index { + using index_params_type = cagra::index_params; + using search_params_type = cagra::search_params; + using index_type = IdxT; + using value_type = T; static_assert(!raft::is_narrowing_v, "IdxT must be able to represent all values of uint32_t"); diff --git a/cpp/include/cuvs/neighbors/dynamic_batching.hpp b/cpp/include/cuvs/neighbors/dynamic_batching.hpp new file mode 100644 index 000000000..410800357 --- /dev/null +++ b/cpp/include/cuvs/neighbors/dynamic_batching.hpp @@ -0,0 +1,290 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +namespace cuvs::neighbors::dynamic_batching { + +namespace detail { +template +class batch_runner; +} + +/** + * @defgroup dynamic_batching_cpp_index_params Dynamic Batching index parameters + * @{ + */ +struct index_params : cuvs::neighbors::index_params { + /** The number of neighbors to search is fixed at construction time. */ + int64_t k; + /** Maximum size of the batch to submit to the upstream index. */ + int64_t max_batch_size = 100; + /** + * The number of independent request queues. + * + * Each queue is associated with a unique CUDA stream and IO device buffers. If the number of + * concurrent requests is high, using multiple queues allows to fill-in data and prepare the batch + * while the other queue is busy. Moreover, the queues are submitted concurrently; this allows to + * better utilize the GPU by hiding the kernel launch latencies, which helps to improve the + * throughput. + */ + size_t n_queues = 3; + /** + * By default (`conservative_dispatch = false`) the first CPU thread to commit a query to a batch + * dispatches the upstream search function as soon as possible (before the batch is full). In that + * case, it does not know the final batch size at the time of calling the upstream search and thus + * runs the upstream search with the maximum batch size every time, even if only one valid query + * is present in the batch. This reduces the latency at the cost of wasted GPU resources. + * + * The alternative behavaior (`conservative_dispatch = true`) is more conservative: the dispatcher + * thread starts the kernel that gathers input queries, but waits till the batch is full or the + * waiting time is exceeded. Only then it acquires the actual batch size and launches the upstream + * search. As a result, less GPU resources are wasted at the cost of exposing upstream search + * latency. + * + * *Rule of Thumb*: + * for a large `max_batch_size` set `conservative_dispatch = true`, otherwise keep it disabled. + */ + bool conservative_dispatch = false; +}; +/** @} */ + +/** + * @defgroup dynamic_batching_cpp_search_params Dynamic Batching search parameters + * @{ + */ +struct search_params : cuvs::neighbors::search_params { + /** + * How long a request can stay in the queue (milliseconds). + * Note, this only affects the dispatch time and does not reflect full request latency; + * the latter depends on the upstream search parameters and the batch size. + */ + double dispatch_timeout_ms = 1.0; +}; +/** @} */ + +/** + * @defgroup dynamic_batching_cpp_index Dynamic Batching index type + * @{ + */ + +/** + * @brief Lightweight dynamic batching index wrapper + * + * @tparam T data type + * @tparam IdxT index type + * + * One lightweight dynamic batching index manages a single index and a single search parameter set. + * This structure should be shared among multiple users via copy semantics: access to the + * underlying implementation is managed via a shared pointer, and concurrent search among the + * participants is thread-safe. + * + * __Usage example__ + * @code{.cpp} + * using namespace cuvs::neighbors; + * // When creating a dynamic batching index, k parameter has to be passed explicitly. + * // The first empty braces default-initialize the parent `neighbors::index_params` (unused). + * dynamic_batching::index_params dynb_index_params{{}, k}; + * // Construct the index by wrapping the upstream index and search parameters. + * dynamic_batching::index index{ + * res, dynb_index_params, upstream_index, upstream_search_params + * }; + * // Use default search parameters + * dynamic_batching::search_params search_params; + * // Search K nearest neighbours + * auto neighbors = raft::make_device_matrix(res, n_queries, k); + * auto distances = raft::make_device_matrix(res, n_queries, k); + * dynamic_batching::search( + * res, search_params, index, queries, neighbors.view(), distances.view() + * ); + * @endcode + * + * + * __Priority queues__ + * + * The dynamic batching index has a limited support for prioritizing individual requests. + * There's only one pool of queues in the batcher and no functionality to prioritize one bach over + * the other. The `search_params::dispatch_timeout_ms` parameters passed in each request are + * aggregated internally and the batch is dispatched no later than any of the timeouts is exceeded. + * In this logic, a high-priority request can never be processed earlier than any lower-priority + * requests submitted earlier. + * + * However, dynamic batching indexes are lightweight and do not contain any global or static state. + * This means it's easy to combine multiple batchers. + * As an example, you can construct one batching index per priority class: + * @code{.cpp} + * using namespace cuvs::neighbors; + * // Large batch size (128), couple queues (2), + * // enabled conservative dispatch - all for better throughput + * dynamic_batching::index_params low_priority_params{{}, k, 128, 2, true}; + * // Small batch size (16), more queues (4), + * // disabled conservative dispatch - to minimize latency with reasonable throughput + * dynamic_batching::index_params high_priority_params{{}, k, 16, 4, false}; + * // Construct the indexes by wrapping the upstream index and search parameters. + * dynamic_batching::index low_priority_index{ + * res, low_priority_params, upstream_index, upstream_search_params + * }; + * dynamic_batching::index high_priority_index{ + * res, high_priority_params, upstream_index, upstream_search_params + * }; + * // Define a combined search function with priority selection + * double high_priority_threshold_ms = 0.1; + * auto search_function = + * [low_priority_index, high_priority_index, high_priority_threshold_ms]( + * raft::resources const &res, + * dynamic_batching::search_params search_params, + * raft::device_matrix_view queries, + * raft::device_matrix_view neighbors, + * raft::device_matrix_view distances) { + * dynamic_batching::search( + * res, + * search_params, + * search_params.dispatch_timeout_ms < high_priority_threshold_ms + * ? high_priority_index : low_priority_index, + * queries, + * neighbors, + * distances + * ); + * }; + * @endcode + */ +template +struct index : cuvs::neighbors::index { + std::shared_ptr> runner; + + /** + * @brief Construct a dynamic batching index by wrapping the upstream index. + * + * @tparam Upstream the upstream index type + * + * @param[in] res raft resources + * @param[in] params dynamic batching parameters + * @param[in] upstream_index the original index to perform the search + * (the reference must be alive for the lifetime of the dynamic batching index) + * @param[in] upstream_params the original index search parameters for all queries in a batch + * (the parameters are captured by value for the lifetime of the dynamic batching index) + * @param[in] sample_filter + * filtering function, if any, must be the same for all requests in a batch + * (the pointer must be alive for the lifetime of the dynamic batching index) + */ + template + index(const raft::resources& res, + const cuvs::neighbors::dynamic_batching::index_params& params, + const Upstream& upstream_index, + const typename Upstream::search_params_type& upstream_params, + const cuvs::neighbors::filtering::base_filter* sample_filter = nullptr); +}; +/** @} */ + +/** + * + * @defgroup dynamic_batching_cpp_search Dynamic Batching search + * + * @{ + */ + +/** + * @brief Search ANN using a dynamic batching index. + * + * The search parameters of the upstream index and the optional filtering function are configured at + * the dynamic batching index construction time. + * + * Like with many other indexes, the dynamic batching search has the stream-ordered semantics: the + * host function may return the control before the results are ready. Synchronize with the main CUDA + * stream in the given resource object to wait for arrival of the search results. + * + * Dynamic batching search is thread-safe: call the search function with copies of the same index in + * multiple threads to increase the occupancy of the batches. + * + * @param[in] res + * @param[in] params query-specific batching parameters, such as the maximum waiting time + * @param[in] index a dynamic batching index + * @param[in] queries a device matrix view to a row-major matrix + * [n_queries, dim] + * @param[out] neighbors a device matrix view to the indices of the neighbors in the source dataset + * [n_queries, k] + * @param[out] distances a device matrix view to the distances to the selected neighbors + * [n_queries, k] + * + */ +void search(raft::resources const& res, + cuvs::neighbors::dynamic_batching::search_params const& params, + dynamic_batching::index const& index, + raft::device_matrix_view queries, + raft::device_matrix_view neighbors, + raft::device_matrix_view distances); + +/** @copydoc search */ +void search(raft::resources const& res, + cuvs::neighbors::dynamic_batching::search_params const& params, + dynamic_batching::index const& index, + raft::device_matrix_view queries, + raft::device_matrix_view neighbors, + raft::device_matrix_view distances); + +/** @copydoc search */ +void search(raft::resources const& res, + cuvs::neighbors::dynamic_batching::search_params const& params, + dynamic_batching::index const& index, + raft::device_matrix_view queries, + raft::device_matrix_view neighbors, + raft::device_matrix_view distances); + +/** @copydoc search */ +void search(raft::resources const& res, + cuvs::neighbors::dynamic_batching::search_params const& params, + dynamic_batching::index const& index, + raft::device_matrix_view queries, + raft::device_matrix_view neighbors, + raft::device_matrix_view distances); + +/** @copydoc search */ +void search(raft::resources const& res, + cuvs::neighbors::dynamic_batching::search_params const& params, + dynamic_batching::index const& index, + raft::device_matrix_view queries, + raft::device_matrix_view neighbors, + raft::device_matrix_view distances); + +/** @copydoc search */ +void search(raft::resources const& res, + cuvs::neighbors::dynamic_batching::search_params const& params, + dynamic_batching::index const& index, + raft::device_matrix_view queries, + raft::device_matrix_view neighbors, + raft::device_matrix_view distances); + +/** @copydoc search */ +void search(raft::resources const& res, + cuvs::neighbors::dynamic_batching::search_params const& params, + dynamic_batching::index const& index, + raft::device_matrix_view queries, + raft::device_matrix_view neighbors, + raft::device_matrix_view distances); + +/** @copydoc search */ +void search(raft::resources const& res, + cuvs::neighbors::dynamic_batching::search_params const& params, + dynamic_batching::index const& index, + raft::device_matrix_view queries, + raft::device_matrix_view neighbors, + raft::device_matrix_view distances); + +/** @} */ + +} // namespace cuvs::neighbors::dynamic_batching diff --git a/cpp/include/cuvs/neighbors/ivf_flat.hpp b/cpp/include/cuvs/neighbors/ivf_flat.hpp index 7f852d635..e017946d9 100644 --- a/cpp/include/cuvs/neighbors/ivf_flat.hpp +++ b/cpp/include/cuvs/neighbors/ivf_flat.hpp @@ -138,6 +138,10 @@ using list_data = ivf::list; */ template struct index : cuvs::neighbors::index { + using index_params_type = ivf_flat::index_params; + using search_params_type = ivf_flat::search_params; + using index_type = IdxT; + using value_type = T; static_assert(!raft::is_narrowing_v, "IdxT must be able to represent all values of uint32_t"); diff --git a/cpp/include/cuvs/neighbors/ivf_pq.hpp b/cpp/include/cuvs/neighbors/ivf_pq.hpp index ae543c9e9..d85753b7f 100644 --- a/cpp/include/cuvs/neighbors/ivf_pq.hpp +++ b/cpp/include/cuvs/neighbors/ivf_pq.hpp @@ -319,6 +319,9 @@ using list_data = ivf::list; */ template struct index : cuvs::neighbors::index { + using index_params_type = ivf_pq::index_params; + using search_params_type = ivf_pq::search_params; + using index_type = IdxT; static_assert(!raft::is_narrowing_v, "IdxT must be able to represent all values of uint32_t"); diff --git a/cpp/src/neighbors/detail/dynamic_batching.cuh b/cpp/src/neighbors/detail/dynamic_batching.cuh new file mode 100644 index 000000000..5c6b1654e --- /dev/null +++ b/cpp/src/neighbors/detail/dynamic_batching.cuh @@ -0,0 +1,1197 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "../sample_filter.cuh" + +#include + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#ifndef CUVS_SYSTEM_LITTLE_ENDIAN +#if defined(__BYTE_ORDER__) && __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__ +#define CUVS_SYSTEM_LITTLE_ENDIAN 0 +#else +#define CUVS_SYSTEM_LITTLE_ENDIAN 1 +#endif +#endif + +namespace cuvs::neighbors::dynamic_batching::detail { + +using raft::RAFT_NAME; // TODO: a workaround for RAFT_LOG_XXX macros + +/** + * A helper to make the requester threads more cooperative when busy-spinning. + * It is used in the wait loops across this file to reduce the CPU usage. + * + * Ideally, we should be using atomics notify/wait feature, but that is not always possible + * (e.g. waiting on multiple things or waiting on GPU volatile stores). + */ +struct local_waiter { + static constexpr inline int64_t kNonSleepIterations = 10; + + explicit local_waiter(std::chrono::nanoseconds base_sleep_time, + int64_t start_iteration = 0) noexcept + : base_sleep_time_{base_sleep_time}, iteration_{start_iteration} + { + } + + inline void wait() noexcept + { + if (iteration_ < 2) { + // Don't wait for the first few iterations: + // maybe there's a weak CAS op in the loop, or something else that could return quickly + } else if (iteration_ < kNonSleepIterations) { + std::this_thread::yield(); + } else { + auto k = iteration_ + 1 - kNonSleepIterations; + std::this_thread::sleep_for(base_sleep_time_ * k); + } + ++iteration_; + } + + inline void reset(int64_t start_iteration = 0) noexcept { iteration_ = start_iteration; } + + private: + std::chrono::nanoseconds base_sleep_time_; + int64_t iteration_; +}; + +class cuda_event { + public: + cuda_event(cuda_event&&) = default; + cuda_event& operator=(cuda_event&&) = default; + ~cuda_event() = default; + cuda_event(cuda_event const&) = delete; // Copying disallowed: one event one owner + cuda_event& operator=(cuda_event&) = delete; + + cuda_event() + : event_{[]() { + cudaEvent_t* e = new cudaEvent_t; + RAFT_CUDA_TRY(cudaEventCreateWithFlags(e, cudaEventDisableTiming)); + return e; + }(), + [](cudaEvent_t* e) { + RAFT_CUDA_TRY_NO_THROW(cudaEventDestroy(*e)); + delete e; + }} + { + } + + cudaEvent_t value() const { return *event_; } + + private: + std::unique_ptr> event_; +}; + +template +struct get_accessor_type_t { + using type = typename MdSpanOrArray::accessor_type; +}; + +template +struct get_accessor_type_t> { + using mdarray_type = raft::mdarray; + using view_type = typename mdarray_type::view_type; + using type = typename view_type::accessor_type; +}; + +template +using get_accessor_type = typename get_accessor_type_t::type; + +template +constexpr inline auto slice_3d(typename Source3DT::index_type i, + const Source3DT& source3d, + typename Source3DT::index_type n_rows = 0) +{ + using element_type = typename Source3DT::element_type; + using index_type = typename Source3DT::index_type; + using layout_type = typename Source3DT::layout_type; + using accessor_type = get_accessor_type; + auto extent2d = + raft::make_extents(n_rows == 0 ? source3d.extent(1) : n_rows, source3d.extent(2)); + auto stride = uint64_t(source3d.extent(1)) * uint64_t(source3d.extent(2)); + return raft::mdspan{ + const_cast(source3d.data_handle()) + stride * i, extent2d}; +} + +template +constexpr inline auto slice_2d(typename Source2DT::index_type i, const Source2DT& source2d) +{ + using element_type = typename Source2DT::element_type; + using index_type = typename Source2DT::index_type; + using layout_type = typename Source2DT::layout_type; + using accessor_type = get_accessor_type; + auto extent1d = raft::make_extents(source2d.extent(1)); + auto stride = uint64_t(extent1d.extent(0)); + return raft::mdspan{ + const_cast(source2d.data_handle()) + stride * i, extent1d}; +} + +// --------------------------------------------- + +constexpr size_t kCacheLineBytes = 64; + +template +using upstream_search_type_const = void(raft::resources const&, + typename Upstream::search_params_type const&, + Upstream const&, + raft::device_matrix_view, + raft::device_matrix_view, + raft::device_matrix_view, + const cuvs::neighbors::filtering::base_filter&); + +template +using upstream_search_type = void(raft::resources const&, + typename Upstream::search_params_type const&, + Upstream&, + raft::device_matrix_view, + raft::device_matrix_view, + raft::device_matrix_view, + const cuvs::neighbors::filtering::base_filter&); + +template +using function_search_type = void(raft::resources const&, + raft::device_matrix_view, + raft::device_matrix_view, + raft::device_matrix_view); + +/** + * State of the batch token slot. + * + * In a nutshell, there are only two batch slot states that matter: empty or full. + * Initially, all slots are empty. The host threads can commit (i.e. subscribe) to a batch slot even + * if it's empty (when they know it will be filled-in at some point in future). With this logic, we + * smooth out the bottleneck that occurs when many threads try to submit their work using a single + * atomic counter (the batch queue head). + * + * Once a GPU IO buffer is available, its owner returns the buffer to the queue by marking a slot as + * full. By that time, it may be partially or fully committed (i.e. several host threads are + * committed to submit a certain number of queries). + * + * If we had an infinite buffer, these two states would suffice. However, we have a finite ring + * buffer, so the used-up slots must be emptied again, so that they are usable in the following + * rounds through the ring buffer. + * + * The slot state depends not only on the value stored in it, but on the accessing thread as well + * (see `batch_queue_t::batch_status` below). The accessing thread may be ahead or behind the others + * (as defined by the sequential order id below). Depending on the accessor state, it may view the + * slot as being emptied/filled in the future, current, or previous rounds. This affects the + * decision whether the slot can be used and whether the thread has the right to advance tail or + * head counters of the batch queue. + * + */ +enum struct slot_state : int32_t { + /** The slot is empty, cleared-up in this round (hence the head should be past it). */ + kEmptyPast = 1025, + /** The slot is empty, cleared-up in previous round. */ + kEmpty = 1024, + /** The slot is empty, cleared-up two round ago and cannot be used yet (due to be filled). */ + kEmptyBusy = 1023, + /** The current thread has been sleeping for too long and is way behind the others. */ + kFullPast = 1, + /** The slot is full, filled-in in this round. */ + kFull = 0, + /** This state is considered full, filled-in in previous round. */ + kFullBusy = -1 + /** The rest of the values are impossible states indicating an error in the algo. */ +}; + +/** + * Identifies the batch and its job-commit state. + * Should be in the pinned memory for fast shared access on CPU and GPU side. + * + * The batch token packs the IO buffer address (id) and a number of committed queries in a single + * 64-bit atomic. This is to allow conflict-free atomic updates of both values. + * + */ +struct batch_token { + uint64_t value = 0; + + constexpr inline batch_token() {} + explicit constexpr inline batch_token(uint32_t buffer_id) { id() = buffer_id; } + + /** + * Sequential id of the batch in the array of batches. + * + * The `id` field, in practice, stores not only the IO buffer address, but also an extra + * sequential "round" id. The latter identifies how many rounds through the batch ring buffer has + * already been done (computed from the the `seq_order_id` counter in the batch queue) and is used + * by `batch_queue_t::batch_status` below to compute the `slot_state`. This is to avoid the ABA + * atomic updates problem when using the ring buffer. + * + * There cannot be more IO buffers than the size of the ring buffer. The size of the ring buffer + * is always a power-of-two. Hence the IO buffer address needs only `log2(Size)` bits, and the + * rest is used for the ring buffer round id (see `batch_queue_t::make_seq_batch_id`). + * + */ + RAFT_INLINE_FUNCTION auto id() noexcept -> uint32_t& + { + return *(reinterpret_cast(&value) + kOffsetOfId); + } + /** + * How many queries are promised by the participating CPU threads (requesters). + * + * The CPU threads atomically increment this counter until its size reaches `max_batch_size`. + * + * Any (CPU or GPU thread) may atomically write to the highest byte of this value, which indicates + * that no one can commit to this batch anymore (e.g. the wait timeout is exceeded). + * Hence, the actual number of committed queries is `size_committed % 0x00ffffff`. + * + * The gather kernel cannot finish while `size_committed < max_batch_size`. + * + * NB: we use the trick of writing to the highest byte to allow GPU write atomically to the pinned + * host memory. This way, we don't need to use device RMW atomics on host memory, which are not + * available on a broad class of GPUs. If not this workaround, we could simply do atomic add/or + * with value 0x01000000. + */ + RAFT_INLINE_FUNCTION auto size_committed() noexcept -> uint32_t& + { + return *(reinterpret_cast(&value) + kOffsetOfSC); + } + + private: + /** Offset of the `id()` value in the token if it's interpreted as uint32_t[2]. */ + static constexpr inline uint32_t kOffsetOfId = CUVS_SYSTEM_LITTLE_ENDIAN; + /** Offset of the `size_committed()` value in the token if it's interpreted as uint32_t[2]. */ + static constexpr inline uint32_t kOffsetOfSC = 1 - kOffsetOfId; +}; +static_assert(sizeof(batch_token) == sizeof(uint64_t)); +static_assert(cuda::std::atomic::is_always_lock_free); + +/** + * The batch queue consists of several ring buffers and two counters determining where are the head + * and the tail of the queue in those buffers. + * + * There is an internal sequentially consistent order in the queue, defined by `seq_order_id` + * counter. The head and tail members define where the participants should look for full and + * empty slots in the queue respectively. + * + * The slots in the queue have their own states (see `slot_state` above). The states are updated + * concurrently in many threads, so the head and tail counters do not always accurately represent + * the actual compound state of the queue. + * + * `.head()` is where a host thread starts looking for a batch token. All slots earlier than + * returned by this method are not usable anymore (they batches are either "fully committed", + * dispatched, or emptied earlier). If a host thread determines that the current slot is not usable + * anymore, it increments the counter by calling `.pop()`. + * + * The tail is where a host thread reserves an empty slot to be filled-in by a GPU worker thread + * once it releases the owned IO buffer. There's no `.tail()` method, but `.push()` method returns + * the tail position (before advancing it). `.push()` blocks the host thread until it knows the slot + * isn't used by any other threads anymore (i.e. cleaned-up from the previous round). + * + * There's no strict relation between the head and the tail. + * Normally there is a single batch in the ring buffer being partially filled. It is followed by + * contiguous list of empty idle batches and reserved empty slots. The head and the tail loosely + * correspond to the beginning and the end of this sequence. + * + * Sometimes, the head can go further than the tail. This means all batches are busy and there are + * more threads committed to the slots that are not populated with the batches (and not even + * reserved for filling-in yet). + * + * + */ +template +struct batch_queue_t { + static constexpr uint32_t kSize = Size; + static constexpr uint32_t kMinElemSize = sizeof(uint32_t); + static_assert(cuda::std::atomic::is_always_lock_free, + "The value type must be lock-free."); + static_assert(cuda::std::atomic::is_always_lock_free, + "The value type must be lock-free."); + static_assert(cuda::std::atomic::is_always_lock_free, + "The value type must be lock-free."); + static_assert(raft::is_a_power_of_two(kSize), "The size must be a power-of-two for efficiency."); + + static constexpr auto kMemOrder = cuda::std::memory_order_relaxed; + + /** Type-safe synonym for the internal head & tail counters. */ + struct seq_order_id { + uint32_t value; + }; + + explicit batch_queue_t(const raft::resources& res, bool use_batch_sizes) noexcept + : tokens_{raft::make_pinned_vector, + uint32_t>(res, kSize)}, + rem_time_us_{ + raft::make_pinned_vector, uint32_t>( + res, kSize)}, + dispatch_sequence_id_(kSize), + batch_sizes_{ + use_batch_sizes + ? std::make_optional( + raft::make_pinned_vector, uint32_t>( + res, kSize)) + : std::nullopt} + { + tail_.store(0, kMemOrder); + head_.store(0, kMemOrder); + auto past_seq_id = seq_order_id{static_cast(-1)}; + for (uint32_t i = 0; i < kSize; i++) { + rem_time_us_(i).store(std::numeric_limits::max(), kMemOrder); + if (batch_sizes_.has_value()) { batch_sizes_.value()(i).store(0, kMemOrder); } + dispatch_sequence_id_[i].store(past_seq_id.value, kMemOrder); + tokens_(i).store(make_empty_token(past_seq_id), kMemOrder); + } + } + + /** + * Advance the tail position, ensure the slot is empty, and return the reference to the new slot. + * The calling side is responsible for filling-in the slot with an actual value at a later time. + * + * Conceptually, this method reserves a ring buffer slot on the host side, so that the GPU worker + * thread can return the IO buffer (filling the token slot) asynchronously. + */ + inline auto push() -> seq_order_id + { + seq_order_id seq_id{tail_.fetch_add(1, kMemOrder)}; + auto& loc = token(seq_id); + auto ss = batch_status(loc.load(kMemOrder), seq_id); + /* [Note: very small waiting time] + + Only a few (dispatcher) threads are going to call this function at the same time as opposed to + potentially any number of threads waiting on new batches to arrive. + This is a performance-critical code path. + + Hence the small base sleep time. + */ + local_waiter till_empty{std::chrono::nanoseconds{1000}}; + while (ss == slot_state::kFull || ss == slot_state::kFullBusy || ss == slot_state::kEmptyBusy) { + // Wait till the slot becomes empty (doesn't matter future or past). + // The batch id is only ever updated in the scatter/gather kernels, which are the only source + // of truth whether a batch buffer is currently used by the GPU. + till_empty.wait(); + ss = batch_status(loc.load(kMemOrder), seq_id); + } + return seq_id; + } + + /** + * Return the offset of the given w.r.t. the tail of the queue. + * Negative value means the given slot is in the body of the queue and should be dispatched soon. + * Positive value means the given slot is ahead of the queue and should wait longer. + * + * That is the lower the value the higher the priority. + */ + [[nodiscard]] inline auto niceness(seq_order_id id) const noexcept -> int32_t + { + return static_cast(id.value - tail_.load(kMemOrder)); + } + + /** Get the reference to the first element in the queue. */ + inline auto head() noexcept -> seq_order_id + { + auto h = head_.load(kMemOrder); + // The head cannot go ahead of the tail by more than the queue buffer size. + // If the head is ahead by not more than kSize elements though, everything is fine; + // the slots too far ahead are protected by busy tokens. + local_waiter for_tail(std::chrono::nanoseconds{100000}); + while (static_cast(h - tail_.load(kMemOrder)) >= static_cast(kSize)) { + for_tail.wait(); + h = head_.load(kMemOrder); + } + return seq_order_id{h}; + } + + /** Batch commit state and IO buffer id (see `batch_token`) */ + inline auto token(seq_order_id id) -> cuda::atomic& + { + return tokens_(cache_friendly_idx(id.value)); + } + + /** + * How much time has this batch left for waiting. + * It is an approximate value by design - to minimize the synchronization between CPU and GPU. + * + * The clocks on GPU and CPU may have different values, so the running kernel and the CPU thread + * have different ideas on how much time is left. Rather than trying to synchronize the clocks, we + * maintain independent timers and accept the uncertainty. + * + * Access pattern: CPU write-only (producer); GPU read-only (consumer). + */ + inline auto rem_time_us(seq_order_id id) -> cuda::atomic& + { + return rem_time_us_(cache_friendly_idx(id.value)); + } + + /** + * The actual batch size - the final number of committed queries. + * This is only used if `conservative_dispatch = true`. + */ + inline auto batch_size(seq_order_id id) noexcept + -> cuda::atomic* + { + if (batch_sizes_.has_value()) { return &batch_sizes_.value()(cache_friendly_idx(id.value)); } + return nullptr; + } + + /** + * This value is updated by the host thread after it submits the job completion event to indicate + * to other threads can wait on the event to get the results back. + * Other threads get the value from the batch queue and compare that value against this atomic. + * + * Access pattern: CPU-only; dispatching thread writes the id once, other threads wait on it. + */ + inline auto dispatch_sequence_id(seq_order_id id) -> cuda::std::atomic& + { + return dispatch_sequence_id_[cache_friendly_idx(id.value)]; + } + + /** + * An `atomicMax` on the queue head in disguise. + * This makes the given batch slot and all prior slots unreachable (not possible to commit). + */ + inline void pop(seq_order_id id) noexcept + { + const auto desired = id.value + 1; + auto observed = id.value; + while (observed < desired && + !head_.compare_exchange_weak(observed, desired, kMemOrder, kMemOrder)) {} + } + + static constexpr inline auto batch_id(batch_token token) noexcept -> uint32_t + { + return token.id() & kCounterLocMask; + } + + /** + * Construct a token that is interpreted as having been emptied in the current round + * (the round is derived from seq_id). + * + * NB: "round" is the number of times the queue counters went over the whole ring buffer. + * It's used to avoid the ABA problem for atomic token updates. + */ + static constexpr inline auto make_empty_token(seq_order_id seq_id) noexcept -> batch_token + { + // Modify the seq_id to identify that the token slot is empty + auto empty_round = static_cast(slot_state::kEmptyPast) * kSize; + auto empty_round_id = seq_order_id{seq_id.value + empty_round}; + // Id of empty slot is ignored and can be anything + auto empty_id = kCounterLocMask; + return batch_token{make_seq_batch_id(empty_round_id, empty_id)}; + } + + /** + * Construct a sequential batch id by combining the current round and the real batch id. + * + * The "round" part gives a hint when the token slot was filled-in to avoid the ABA problem + * (see above). + */ + static constexpr inline auto make_seq_batch_id(seq_order_id seq_id, uint32_t batch_id) noexcept + -> uint32_t + { + return seq_round(seq_id) | batch_id; + } + + /** + * Get the state of the batch slot w.r.t. the given seq_order_id counter. + * This gives the information whether the slot is emptied/filled by another thread and whether + * that thread is ahead or behind the current thread. + * By introducing these future/past flavours of states we solve the ABA problem for atomic updates + * of the ring buffer slots. + */ + static inline auto batch_status(batch_token token, seq_order_id seq_id) -> slot_state + { + /* + The "round" part of the id is just a seq_id without the low bits. + Essentially, we comparing here seq_ids of two threads: the one that wrote to the slot in the + past and the one reads from it now. + + `kSize` determines the number of bits we use for the IO buffer id and for the round id. + */ + auto v = + static_cast(seq_round(token) - seq_round(seq_id)) / static_cast(kSize); + if (v < static_cast(slot_state::kFullBusy)) { RAFT_FAIL("Invalid batch state %d", v); } + if (v < static_cast(slot_state::kEmptyBusy)) { + return static_cast(std::min(v, static_cast(slot_state::kFullPast))); + } + return static_cast(std::min(v, static_cast(slot_state::kEmptyPast))); + } + + private: + alignas(kCacheLineBytes) cuda::std::atomic tail_{}; + alignas(kCacheLineBytes) cuda::std::atomic head_{}; + + alignas(kCacheLineBytes) + raft::pinned_vector, uint32_t> tokens_; + raft::pinned_vector, uint32_t> rem_time_us_; + std::vector> dispatch_sequence_id_; + std::optional, uint32_t>> + batch_sizes_; + + /* [Note: cache-friendly indexing] + To avoid false sharing, the queue pushes and pops values not sequentially, but with an + increment that is larger than the cache line size. + Hence we introduce the `kCounterIncrement > kCacheLineBytes`. + However, to make sure all indices are used, we choose the increment to be coprime with the + buffer size. We also require that the buffer size is a power-of-two for two reasons: + 1) Fast modulus operation - reduces to binary `and` (with `kCounterLocMask`). + 2) Easy to ensure GCD(kCounterIncrement, kSize) == 1 by construction + (see the definition below). + */ + static constexpr uint32_t kElemsPerCacheLine = + raft::div_rounding_up_safe(kCacheLineBytes, kMinElemSize); + static constexpr uint32_t kCounterIncrement = raft::bound_by_power_of_two(kElemsPerCacheLine) + 1; + static constexpr uint32_t kCounterLocMask = kSize - 1; + // These props hold by design, but we add them here as a documentation and a sanity check. + static_assert( + kCounterIncrement * kMinElemSize >= kCacheLineBytes, + "The counter increment should be larger than the cache line size to avoid false sharing."); + static_assert( + std::gcd(kCounterIncrement, kSize) == 1, + "The counter increment and the size must be coprime to allow using all of the queue slots."); + /** Map the sequential index onto cache-friendly strided index. */ + static constexpr inline auto cache_friendly_idx(uint32_t source_idx) noexcept -> uint32_t + { + return (source_idx * kCounterIncrement) & kCounterLocMask; + } + + /** The "round": the number of times the queue counter went over the whole ring buffer. */ + static constexpr inline auto seq_round(seq_order_id id) noexcept -> uint32_t + { + return id.value & ~kCounterLocMask; + } + + /** The "round": the number of times the queue counter went over the whole ring buffer. */ + static constexpr inline auto seq_round(batch_token token) noexcept -> uint32_t + { + return token.id() & ~kCounterLocMask; + } +}; + +template +struct alignas(kCacheLineBytes) request_pointers { + /** + * A pointer to `dim` values of a single query (input). + * + * Serves as a synchronization point between the CPU thread (producer) and a GPU block in the + * `gather_inputs` kernel (consumer). + */ + cuda::atomic query{nullptr}; + /** A pointer to `k` nearest neighbors (output) */ + IdxT* neighbors{nullptr}; + /** A pointer to distances of `k` nearest neighbors (output) */ + float* distances{nullptr}; +}; + +/** + * Check the current timestamp at the moment of construction and repeatedly compare the elapsed time + * to the timeout value provided by the host (passed via an atomic). + * + * This is used in the gather inputs kernel to make it stop waiting for new queries in a batch + * once the deadline is reached. + */ +struct gpu_time_keeper { + /** + * @param[in] cpu_provided_remaining_time_us + * a pointer to a shared atomic, represent the remaining waiting time in microseconds. + * Note, the remaining time is updated atomically by each participating host thread in their + * "private coordinate systems". That's ok, we don't expect a single reference time for all host + * and device threads. + * We tolerate the errors coming from the time difference between the host thread writing their + * remaining waiting time and the GPU thread reading that value. + */ + RAFT_DEVICE_INLINE_FUNCTION explicit gpu_time_keeper( + cuda::atomic* cpu_provided_remaining_time_us) + : cpu_provided_remaining_time_us_{cpu_provided_remaining_time_us} + { + update_timestamp(); + } + + /** + * Check whether the deadline is not reached yet: + * 1) Compare the internal clock against the last-read deadline value + * 2) Read the deadline value from the host-visible atomic and check the internal clock again. + */ + RAFT_DEVICE_INLINE_FUNCTION auto has_time() noexcept -> bool + { + if (timeout) { return false; } + update_local_remaining_time(); + if (local_remaining_time_us_ <= 0) { + timeout = true; + return false; + } + update_cpu_provided_remaining_time(); + if (local_remaining_time_us_ <= 0) { + timeout = true; + return false; + } + return true; + } + + private: + cuda::atomic* cpu_provided_remaining_time_us_; + uint64_t timestamp_ns_ = 0; + int32_t local_remaining_time_us_ = std::numeric_limits::max(); + bool timeout = false; + + RAFT_DEVICE_INLINE_FUNCTION void update_timestamp() noexcept + { + asm volatile("mov.u64 %0, %%globaltimer;" : "=l"(timestamp_ns_)); + } + + RAFT_DEVICE_INLINE_FUNCTION void update_local_remaining_time() noexcept + { + auto prev_timestamp = timestamp_ns_; + update_timestamp(); + // subtract the time passed since the last check + // (assuming local time is updated every time timestamp is read) + local_remaining_time_us_ -= static_cast((timestamp_ns_ - prev_timestamp) / 1000ull); + } + + RAFT_DEVICE_INLINE_FUNCTION void update_cpu_provided_remaining_time() noexcept + { + local_remaining_time_us_ = + std::min(local_remaining_time_us_, + cpu_provided_remaining_time_us_->load(cuda::std::memory_order_relaxed)); + } +}; + +/** + * Copy the queries from the submitted pointers to the batch store, one query per block. + * Upon completion of this kernel, the submitted queries are all in the contiguous buffer + * `batch_queries`. + * + * Block size: (n, 1, 1) any number of threads copying a single row of data. + * Grid size: (max_batch_size, 1, 1) - one block per query + * + * Note, we view the incoming queries and the batch as going through multiple stages: + * 1) A host thread "commits" a query: it reserves a slot for the query in the batch and promises + * to fill-in the corresponding query pointer. + * 2) A host thread "submits" the query: it fills-in the pointer to the query data in the reserved + * slot. + * 3) This kernel copies the query data to the contiguous query buffer owned by the batch. + * + * The batch is "fully committed" when the number of committed queries reaches the maximum batch + * size (all slots are reserved). Committing, submitting, and copying of the queries is somewhat + * overlapped among multiple host and device threads. Only the copying happens in a CUDA stream in + * this kernel, and the upstream search is dispatched right after this kernel (in the same stream). + * + */ +template +RAFT_KERNEL gather_inputs( + raft::device_matrix_view batch_queries, + raft::pinned_vector_view, uint32_t> request_ptrs, + /* The remaining time may be updated on the host side: a thread with a tighter deadline may reduce + it (but not increase). */ + cuda::atomic* remaining_time_us, + /* The token contains the current number of queries committed and is cleared in this kernel. */ + cuda::atomic* batch_token_ptr, + /* The host-visible batch size counter (used in `conservative_dispatch`). */ + cuda::atomic* batch_size_out, + /** + * The token value considered empty depends on the round over the ring buffer + * (which is defined by the seq_order_id) + */ + batch_token empty_token_value, + /** + * The counter is used to find the last CTA to finish and to share the batch size with the + * scatter_inputs kernel. + */ + cuda::atomic* kernel_progress_counter) +{ + const uint32_t query_id = blockIdx.x; + __shared__ const T* query_ptr; + + if (threadIdx.x == 0) { + query_ptr = nullptr; + + // NB: we have to read/write to `batch_token_ptr`, `bs_committed`, and `batch_fully_committed` + // using volatile assembly ops, because otherwise the compiler seems to fail to understand that + // this is the same location in memory. The order of reads in writes here is extremely + // important, as it involves multiple host and device threads (the host threads do RMW atomic + // increments on the commit counter). + volatile uint32_t* bs_committed = + reinterpret_cast(batch_token_ptr) + 1 - CUVS_SYSTEM_LITTLE_ENDIAN; + volatile uint8_t* batch_fully_committed = + reinterpret_cast(bs_committed) + (CUVS_SYSTEM_LITTLE_ENDIAN * 3); + + gpu_time_keeper runtime{remaining_time_us}; + bool committed = false; // if the query is committed, we have to wait for it to arrive + auto& request_query_ptr = request_ptrs(query_id).query; + while (true) { + query_ptr = request_query_ptr.load(cuda::std::memory_order_acquire); + if (query_ptr != nullptr) { + // The query is submitted to this block's slot; erase the pointer buffer for future use and + // exit the loop. + request_query_ptr.store(nullptr, cuda::std::memory_order_relaxed); + break; + } + // The query hasn't been submitted, but is already committed; other checks may be skipped + if (committed) { continue; } + // Check if the query is committed + uint32_t committed_count; + asm volatile("ld.volatile.global.u32 %0, [%1];" + : "=r"(committed_count) + : "l"(bs_committed) + : "memory"); + committed = (committed_count & 0x00ffffff) > query_id; + if (committed) { continue; } + // If the query is not committed, but the batch is past the deadline, we exit without copying + // the query + if (committed_count > 0x00ffffff) { break; } + // The query hasn't been submitted yet; check if we're past the deadline + if (runtime.has_time()) { continue; } + // Otherwise, let the others know time is out + // Set the highest byte of the commit counter to 1 (thus avoiding RMW atomic) + // This prevents any more CPU threads from committing to this batch. + asm volatile("st.volatile.global.u8 [%0], %1;" + : + : "l"(batch_fully_committed), "r"(1) + : "memory"); + asm volatile("ld.volatile.global.u32 %0, [%1];" + : "=r"(committed_count) + : "l"(bs_committed) + : "memory"); + committed = (committed_count & 0x00ffffff) > query_id; + if (committed) { continue; } + break; + } + auto progress = kernel_progress_counter->fetch_add(1, cuda::std::memory_order_acq_rel) + 1; + if (progress >= gridDim.x) { + // read the last value of the committed count to know the batch size for sure + uint32_t committed_count; + asm volatile("ld.volatile.global.u32 %0, [%1];" + : "=r"(committed_count) + : "l"(bs_committed) + : "memory"); + committed_count &= 0x00ffffff; // Clear the timeout bit + if (batch_size_out != nullptr) { + // Inform the dispatcher about the final batch size if `conservative_dispatch` is enabled + batch_size_out->store(committed_count, cuda::std::memory_order_relaxed); + } + // store the batch size in the progress counter, so we can read it in the scatter kernel + kernel_progress_counter->store(committed_count, cuda::std::memory_order_relaxed); + // Clear the batch token slot, so it can be re-used by others + asm volatile("st.volatile.global.u64 [%0], %1;" + : + : "l"(reinterpret_cast(batch_token_ptr)), + "l"(reinterpret_cast(empty_token_value)) + : "memory"); + } + } + // The block waits till the leading thread gets the query pointer + cooperative_groups::this_thread_block().sync(); + auto query_ptr_local = query_ptr; + if (query_ptr_local == nullptr) { return; } + // block-wide copy input query + auto dim = batch_queries.extent(1); + for (uint32_t i = threadIdx.x; i < dim; i += blockDim.x) { + batch_queries(query_id, i) = query_ptr_local[i]; + } +} + +/** Copy the results of the search back to the requesters. */ +template +RAFT_KERNEL scatter_outputs( + raft::pinned_vector_view, uint32_t> request_ptrs, + raft::device_matrix_view batch_neighbors, + raft::device_matrix_view batch_distances, + cuda::atomic* kernel_progress_counter, + cuda::atomic* next_token, + uint32_t batch_id) +{ + __shared__ uint32_t batch_size; + if (threadIdx.x == 0 && threadIdx.y == 0) { + batch_size = kernel_progress_counter->exchange(0, cuda::std::memory_order_relaxed); + } + // Copy output + cooperative_groups::this_thread_block().sync(); + auto k = batch_neighbors.extent(1); + for (uint32_t i = threadIdx.y; i < batch_size; i += blockDim.y) { + auto* request_neighbors = request_ptrs(i).neighbors; + auto* request_distances = request_ptrs(i).distances; + for (uint32_t j = threadIdx.x; j < k; j += blockDim.x) { + request_neighbors[j] = batch_neighbors(i, j); + request_distances[j] = batch_distances(i, j); + } + } + // Clear the batch state after all threads copied the data, so the batch can be reused + cuda::atomic_thread_fence(cuda::std::memory_order_release, cuda::thread_scope_system); + cooperative_groups::this_thread_block().sync(); + if (threadIdx.x != 0 || threadIdx.y != 0) { return; } + reinterpret_cast*>( + &reinterpret_cast(next_token)->id()) + ->store(batch_id, cuda::std::memory_order_relaxed); +} + +/** + * Batch runner is shared among the users of the `dynamic_batching::index` (i.e. the index can be + * copied, but the copies hold shared pointers to a single batch runner). + * + * Constructor and destructor of this class do not need to be thread-safe, as their execution is + * guaranteed to happen in one thread by the holding shared pointer. + * + * The search function must be thread-safe. We only have to pay attention to the `mutable` members + * though, because the function is marked const. + */ +template +class batch_runner { + public: + constexpr static uint32_t kMaxNumQueues = 256; + + using batch_queue = batch_queue_t; + using seq_order_id = typename batch_queue::seq_order_id; + + // Save the parameters and the upstream batched search function to invoke + template + batch_runner(const raft::resources& res, + const dynamic_batching::index_params& params, + const Upstream& upstream_index, + const typename Upstream::search_params_type& upstream_params, + upstream_search_type_const* upstream_search, + const cuvs::neighbors::filtering::base_filter* sample_filter) + : res_{res}, + upstream_search_{[&upstream_index, upstream_search, upstream_params, sample_filter]( + raft::resources const& res, + raft::device_matrix_view queries, + raft::device_matrix_view neighbors, + raft::device_matrix_view distances) { + /* Note: passing sample_filter by pointer + + Ideally, dynamic batching would capture the filter by value. Unfortunately, one cannot use + the copy constructor of the `base_filter` (it would erase the actual filter type). + Therefore, we can only pass the filter by pointer or reference and require the user to keep + the filter alive for the lifetime of the dynamic batching index. + This, however, may lead to a segfault when the user doesn't provide the filter argument and + the argument is passed by reference: the lifetime of the none_sample_filter default argument + is limited to the search function call, so it is destroyed while the dynamic batching index + is still alive. + Hence the solution is to pass the filter by pointer and default it to nullptr. + */ + if (sample_filter == nullptr) { + using base_filter_type = cuvs::neighbors::filtering::base_filter; + const auto none_filter = cuvs::neighbors::filtering::none_sample_filter{}; + return upstream_search(res, + upstream_params, + upstream_index, + queries, + neighbors, + distances, + static_cast(none_filter)); + + } else { + return upstream_search( + res, upstream_params, upstream_index, queries, neighbors, distances, *sample_filter); + } + }}, + k_{uint32_t(params.k)}, + dim_{uint32_t(upstream_index.dim())}, + max_batch_size_{uint32_t(params.max_batch_size)}, + n_queues_{uint32_t(params.n_queues)}, + batch_queue_{res_, params.conservative_dispatch}, + completion_events_(n_queues_), + input_extents_{n_queues_, max_batch_size_, dim_}, + output_extents_{n_queues_, max_batch_size_, k_}, + queries_{raft::make_device_mdarray(res_, input_extents_)}, + neighbors_{raft::make_device_mdarray(res_, output_extents_)}, + distances_{raft::make_device_mdarray(res_, output_extents_)}, + kernel_progress_counters_{ + raft::make_device_vector>( + res_, n_queues_)}, + request_ptrs_{raft::make_pinned_matrix, uint32_t>( + res_, n_queues_, max_batch_size_)} + { + RAFT_CUDA_TRY(cudaMemsetAsync( + kernel_progress_counters_.data_handle(), + 0, + sizeof(*kernel_progress_counters_.data_handle()) * kernel_progress_counters_.size(), + raft::resource::get_cuda_stream(res_))); + // Make sure to initialize the atomic values in the batch_state structs. + for (uint32_t i = 0; i < n_queues_; i++) { + auto seq_id = batch_queue_.push(); + batch_queue_.token(seq_id).store(batch_token{batch_queue::make_seq_batch_id(seq_id, i)}); + // Make sure to initialize query pointers, because they are used for synchronization + for (uint32_t j = 0; j < max_batch_size_; j++) { + new (&request_ptrs_(i, j)) request_pointers{}; + } + } + } + + // A workaround for algos, which have non-const `index` type in their arguments + template + batch_runner(const raft::resources& res, + const dynamic_batching::index_params& params, + const Upstream& upstream_index, + const typename Upstream::search_params_type& upstream_params, + upstream_search_type* upstream_search, + const cuvs::neighbors::filtering::base_filter* sample_filter) + : batch_runner{ + res, + params, + upstream_index, + upstream_params, + reinterpret_cast*>(upstream_search), + sample_filter} + { + } + + void search(raft::resources const& res, + cuvs::neighbors::dynamic_batching::search_params const& params, + raft::device_matrix_view queries, + raft::device_matrix_view neighbors, + raft::device_matrix_view distances) const + { + uint32_t n_queries = queries.extent(0); + if (n_queries >= max_batch_size_) { + return upstream_search_(res, queries, neighbors, distances); + } + + if (neighbors.extent(1) != int64_t(k_)) { + // TODO: the check can be relaxed to `neighbors.extent(1) > int64_t(k_)`; + // this, however, would require an extra bounds check per-query in the scatter kernel. + RAFT_LOG_WARN( + "The requested number of neighbors (%zd) doesn't match the configured " + "dynamic_batching::index_params::k (%u); dynamic batching is disabled for the request.", + neighbors.extent(1), + k_); + return upstream_search_(res, queries, neighbors, distances); + } + + auto deadline = std::chrono::system_clock::now() + + std::chrono::nanoseconds(size_t(params.dispatch_timeout_ms * 1000000.0)); + + int64_t local_io_offset = 0; + batch_token batch_token_observed{0}; + local_waiter to_commit{std::chrono::nanoseconds(size_t(params.dispatch_timeout_ms * 3e5)), + local_waiter::kNonSleepIterations}; + while (true) { + const auto seq_id = batch_queue_.head(); + const auto commit_result = try_commit(seq_id, n_queries); + // The bool (busy or not) returned if no queries were committed: + if (std::holds_alternative(commit_result)) { + // Pause if the system is busy + // (otherwise the progress is guaranteed due to update of the head counter) + if (std::get(commit_result)) { to_commit.wait(); } + continue; // Try to get a new batch token + } + batch_token_observed = std::get(std::get<0>(commit_result)); + const auto queries_committed = std::get(std::get<0>(commit_result)); + const auto batch_offset = batch_token_observed.size_committed(); + auto& batch_token_ref = batch_queue_.token(seq_id); + auto& rem_time_us_ref = batch_queue_.rem_time_us(seq_id); + auto& dispatch_sequence_id_ref = batch_queue_.dispatch_sequence_id(seq_id); + auto* batch_size_ptr = batch_queue_.batch_size(seq_id); + // sleep for 1/10 of deadline time or more + // (if couldn't get the value in the first few iterations). + local_waiter till_full{std::chrono::nanoseconds(size_t(params.dispatch_timeout_ms * 1e5)), + batch_queue_.niceness(seq_id)}; + while (batch_queue::batch_status(batch_token_observed, seq_id) != slot_state::kFull) { + /* Note: waiting for batch IO buffers + The CPU threads can commit to the incoming batches in the queue in advance (this happens in + try_commit). + In this loop, a thread waits for the batch IO buffer to be released by a running search on + the GPU side (scatter_outputs kernel). Hence, this loop is engaged only if all buffers are + currently used, which suggests that the GPU is busy (or there's not enough IO buffers). + This also means the current search is not likely to meet the deadline set by the user. + + The scatter kernel returns its buffer id into an acquired slot in the batch queue; in this + loop we wait for that id to arrive. + + Generally, we want to waste as little as possible CPU cycles here to let other threads wait + on dispatch_sequence_id_ref below more efficiently. At the same time, we shouldn't use + `.wait()` here, because `.notify_all()` would have to come from GPU. + */ + till_full.wait(); + batch_token_observed = batch_token_ref.load(cuda::std::memory_order_acquire); + } + // Whether this thread is responsible for dispatching the batch. + bool is_dispatcher = batch_offset == 0; + auto stream = raft::resource::get_cuda_stream(res); + auto batch_id = batch_queue::batch_id(batch_token_observed); + auto request_ptrs = slice_2d(batch_id, request_ptrs_); + + if (is_dispatcher) { + // Conservatively initialize the remaining time + // TODO (achirkin): this initialization may happen after the other requesters update the + // time and thus erase their deadlines. + rem_time_us_ref.store(static_cast(params.dispatch_timeout_ms * 1000), + cuda::std::memory_order_relaxed); + // run the gather kernel before submitting the data to reduce the latency + gather_inputs<<>>( + slice_3d(batch_id, queries_), + request_ptrs, + &rem_time_us_ref, + &batch_token_ref, + batch_size_ptr, + // This indicates the empty token slot, which can only be used in the following round + batch_queue::make_empty_token(seq_id), + kernel_progress_counters_.data_handle() + batch_id); + } + + // *** Set the pointers to queries, neighbors, distances - query-by-query + for (uint32_t i = 0; i < queries_committed; i++) { + const auto o = local_io_offset + i; + auto& ptrs = request_ptrs(batch_offset + i); + ptrs.neighbors = neighbors.data_handle() + o * k_; + ptrs.distances = distances.data_handle() + o * k_; + ptrs.query.store(queries.data_handle() + o * dim_, cuda::std::memory_order_release); + } + + // Submit estimated remaining time + { + auto rem_time_us = static_cast( + std::max(0, (deadline - std::chrono::system_clock::now()).count()) / 1000); + rem_time_us_ref.fetch_min(rem_time_us, cuda::std::memory_order_relaxed); + } + + if (is_dispatcher) { + uint32_t batch_size = max_batch_size_; + if (batch_size_ptr != nullptr) { + // Block until the real batch size is available if conservative dispatch is used. + local_waiter for_dispatch{ + std::chrono::nanoseconds(size_t(params.dispatch_timeout_ms * 1e5))}; + batch_size = batch_size_ptr->load(cuda::std::memory_order_relaxed); + while (batch_size == 0) { + for_dispatch.wait(); + batch_size = batch_size_ptr->load(cuda::std::memory_order_relaxed); + } + batch_size_ptr->store(0, cuda::std::memory_order_relaxed); + } + auto batch_neighbors = slice_3d(batch_id, neighbors_, batch_size); + auto batch_distances = slice_3d(batch_id, distances_, batch_size); + upstream_search_( + res, slice_3d(batch_id, queries_, batch_size), batch_neighbors, batch_distances); + auto next_seq_id = batch_queue_.push(); + auto& next_token_ref = batch_queue_.token(next_seq_id); + // next_batch_token); + auto bs = dim3(128, 8, 1); + scatter_outputs + <<<1, bs, 0, stream>>>(request_ptrs, + batch_neighbors, + batch_distances, + kernel_progress_counters_.data_handle() + batch_id, + &next_token_ref, + batch_queue::make_seq_batch_id(next_seq_id, batch_id)); + RAFT_CUDA_TRY(cudaEventRecord(completion_events_[batch_id].value(), stream)); + dispatch_sequence_id_ref.store(seq_id.value, cuda::std::memory_order_release); + dispatch_sequence_id_ref.notify_all(); + + } else { + // Wait till the dispatch_sequence_id counter is updated, which means the event is recorded + auto dispatched_id_observed = + dispatch_sequence_id_ref.load(cuda::std::memory_order_acquire); + while (static_cast(seq_id.value - dispatched_id_observed) > 0) { + dispatch_sequence_id_ref.wait(dispatched_id_observed, cuda::std::memory_order_relaxed); + dispatched_id_observed = dispatch_sequence_id_ref.load(cuda::std::memory_order_acquire); + } + // Now we can safely record the event + RAFT_CUDA_TRY(cudaStreamWaitEvent(stream, completion_events_[batch_id].value())); + } + + n_queries -= queries_committed; + + if (n_queries == 0) { return; } + // If not all queries were committed, continue in the loop. + // TODO: it could potentially be more efficient to first commit everything and only then + // submit the work/wait for the event + local_io_offset += queries_committed; + to_commit.reset( + local_waiter::kNonSleepIterations); // reset the waiter for the next iteration. + } + } + + private: + raft::resources res_; // Sic! Store by value to copy the resource. + std::function> upstream_search_; + uint32_t k_; + uint32_t dim_; + uint32_t max_batch_size_; + uint32_t n_queues_; + + mutable batch_queue batch_queue_; + std::vector completion_events_; + + using batch_extents = raft::extent_3d; + batch_extents input_extents_; + batch_extents output_extents_; + + mutable raft::device_mdarray queries_; + mutable raft::device_mdarray neighbors_; + mutable raft::device_mdarray distances_; + mutable raft::device_vector> + kernel_progress_counters_; + + mutable raft::pinned_matrix, uint32_t, raft::row_major> request_ptrs_; + + /** + * Try to commit n_queries at most; returns the last observed batch_token (where `size_committed` + * represents offset at which new queries are committed if successful), the number of committed + * queries, or whether the ring buffer appears to be busy (on unsuccessful commit). + */ + auto try_commit(seq_order_id seq_id, uint32_t n_queries) const + -> std::variant, bool> + { + auto& batch_token_ref = batch_queue_.token(seq_id); + batch_token batch_token_observed = batch_token_ref.load(cuda::std::memory_order_relaxed); + batch_token batch_token_updated; + slot_state token_status; + do { + // The interpretation of the token status depends on the current seq_order_id and a similar + // counter in the token. This is to prevent conflicts when too many parallel requests wrap + // over the whole ring buffer (batch_queue_t). + token_status = batch_queue::batch_status(batch_token_observed, seq_id); + // Busy status means the current thread is a whole ring buffer ahead of the token. + // The thread should wait for the rest of the system. + if (token_status == slot_state::kFullBusy || token_status == slot_state::kEmptyBusy) { + return true; + } + // This branch checks if the token was recently filled or dispatched. + // This means the head counter of the ring buffer is slightly outdated. + if (token_status == slot_state::kEmptyPast || token_status == slot_state::kFullPast || + batch_token_observed.size_committed() >= max_batch_size_) { + batch_queue_.pop(seq_id); + return false; + } + batch_token_updated = batch_token_observed; + batch_token_updated.size_committed() = + std::min(batch_token_observed.size_committed() + n_queries, max_batch_size_); + } while (!batch_token_ref.compare_exchange_weak(batch_token_observed, + batch_token_updated, + cuda::std::memory_order_acq_rel, + cuda::std::memory_order_relaxed)); + if (batch_token_updated.size_committed() >= max_batch_size_) { + // The batch is already full, let's try to pop it from the queue + // (if nobody has done so already) + batch_queue_.pop(seq_id); + } + return std::make_tuple( + batch_token_observed, + batch_token_updated.size_committed() - batch_token_observed.size_committed()); + } +}; + +} // namespace cuvs::neighbors::dynamic_batching::detail diff --git a/cpp/src/neighbors/dynamic_batching.cu b/cpp/src/neighbors/dynamic_batching.cu new file mode 100644 index 000000000..6be70353b --- /dev/null +++ b/cpp/src/neighbors/dynamic_batching.cu @@ -0,0 +1,91 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "detail/dynamic_batching.cuh" + +#include +#include +#include + +#include +#include + +namespace cuvs::neighbors::dynamic_batching { + +// NB: the (template) index parameter should be the last; it may contain the spaces and so split +// into multiple preprocessor token. Then it is consumed as __VA_ARGS__ +// +#define CUVS_INST_DYNAMIC_BATCHING_INDEX(T, IdxT, Namespace, ...) \ + template <> \ + template <> \ + index::index( \ + const raft::resources& res, \ + const cuvs::neighbors::dynamic_batching::index_params& params, \ + const Namespace ::__VA_ARGS__& upstream_index, \ + const typename Namespace ::__VA_ARGS__::search_params_type& upstream_params, \ + const cuvs::neighbors::filtering::base_filter* sample_filter) \ + : runner{new detail::batch_runner( \ + res, params, upstream_index, upstream_params, Namespace ::search, sample_filter)} \ + { \ + } + +#define CUVS_INST_DYNAMIC_BATCHING_SEARCH(T, IdxT) \ + void search(raft::resources const& res, \ + cuvs::neighbors::dynamic_batching::search_params const& params, \ + cuvs::neighbors::dynamic_batching::index const& index, \ + raft::device_matrix_view queries, \ + raft::device_matrix_view neighbors, \ + raft::device_matrix_view distances) \ + { \ + return index.runner->search(res, params, queries, neighbors, distances); \ + } + +CUVS_INST_DYNAMIC_BATCHING_INDEX(float, uint32_t, cuvs::neighbors::cagra, index); +CUVS_INST_DYNAMIC_BATCHING_INDEX(half, uint32_t, cuvs::neighbors::cagra, index); +CUVS_INST_DYNAMIC_BATCHING_INDEX(int8_t, uint32_t, cuvs::neighbors::cagra, index); +CUVS_INST_DYNAMIC_BATCHING_INDEX(uint8_t, + uint32_t, + cuvs::neighbors::cagra, + index); + +CUVS_INST_DYNAMIC_BATCHING_INDEX(float, int64_t, cuvs::neighbors::ivf_pq, index); +CUVS_INST_DYNAMIC_BATCHING_INDEX(half, int64_t, cuvs::neighbors::ivf_pq, index); +CUVS_INST_DYNAMIC_BATCHING_INDEX(int8_t, int64_t, cuvs::neighbors::ivf_pq, index); +CUVS_INST_DYNAMIC_BATCHING_INDEX(uint8_t, int64_t, cuvs::neighbors::ivf_pq, index); + +CUVS_INST_DYNAMIC_BATCHING_INDEX(float, int64_t, cuvs::neighbors::ivf_flat, index); +CUVS_INST_DYNAMIC_BATCHING_INDEX(int8_t, + int64_t, + cuvs::neighbors::ivf_flat, + index); +CUVS_INST_DYNAMIC_BATCHING_INDEX(uint8_t, + int64_t, + cuvs::neighbors::ivf_flat, + index); + +CUVS_INST_DYNAMIC_BATCHING_SEARCH(float, int64_t); +CUVS_INST_DYNAMIC_BATCHING_SEARCH(half, int64_t); +CUVS_INST_DYNAMIC_BATCHING_SEARCH(int8_t, int64_t); +CUVS_INST_DYNAMIC_BATCHING_SEARCH(uint8_t, int64_t); +CUVS_INST_DYNAMIC_BATCHING_SEARCH(float, uint32_t); // uint32_t index type is needed for CAGRA +CUVS_INST_DYNAMIC_BATCHING_SEARCH(half, uint32_t); +CUVS_INST_DYNAMIC_BATCHING_SEARCH(int8_t, uint32_t); +CUVS_INST_DYNAMIC_BATCHING_SEARCH(uint8_t, uint32_t); + +#undef CUVS_INST_DYNAMIC_BATCHING_INDEX +#undef CUVS_INST_DYNAMIC_BATCHING_SEARCH + +} // namespace cuvs::neighbors::dynamic_batching diff --git a/cpp/test/CMakeLists.txt b/cpp/test/CMakeLists.txt index 286d721d7..1c8de2ad0 100644 --- a/cpp/test/CMakeLists.txt +++ b/cpp/test/CMakeLists.txt @@ -175,6 +175,19 @@ if(BUILD_TESTS) 100 ) + ConfigureTest( + NAME + NEIGHBORS_DYNAMIC_BATCHING_TEST + PATH + neighbors/dynamic_batching/test_cagra.cu + neighbors/dynamic_batching/test_ivf_flat.cu + neighbors/dynamic_batching/test_ivf_pq.cu + GPUS + 1 + PERCENT + 100 + ) + if(BUILD_CAGRA_HNSWLIB) ConfigureTest(NAME NEIGHBORS_HNSW_TEST PATH neighbors/hnsw.cu GPUS 1 PERCENT 100) target_link_libraries(NEIGHBORS_HNSW_TEST PRIVATE hnswlib::hnswlib) diff --git a/cpp/test/neighbors/dynamic_batching.cuh b/cpp/test/neighbors/dynamic_batching.cuh new file mode 100644 index 000000000..b64c5b01e --- /dev/null +++ b/cpp/test/neighbors/dynamic_batching.cuh @@ -0,0 +1,292 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "ann_utils.cuh" + +#include + +#include + +#include +#include +#include + +#include + +#include +#include +#include + +namespace cuvs::neighbors::dynamic_batching { + +struct dynamic_batching_spec { + int64_t n_queries = 1000; + int64_t n_rows = 100000; + int64_t dim = 128; + int64_t k = 10; + int64_t max_batch_size = 64; + size_t n_queues = 3; + bool conservative_dispatch = false; + cuvs::distance::DistanceType metric = cuvs::distance::DistanceType::L2Expanded; + int64_t max_concurrent_threads = 128; +}; + +inline ::std::ostream& operator<<(::std::ostream& os, const dynamic_batching_spec& p) +{ + os << "{n_queries=" << p.n_queries; + os << ", dataset shape=" << p.n_rows << "x" << p.dim; + os << ", metric=" << print_metric{p.metric}; + os << ", k=" << p.k; + os << ", max_batch_size=" << p.max_batch_size; + os << ", n_queues=" << p.n_queues; + os << ", conservative_dispatch=" << p.conservative_dispatch; + os << '}' << std::endl; + return os; +} + +template +using build_function = UpstreamT(const raft::resources&, + const typename UpstreamT::index_params_type&, + raft::device_matrix_view); + +template +using search_function = void(const raft::resources&, + const typename UpstreamT::search_params_type& params, + const UpstreamT& index, + raft::device_matrix_view, + raft::device_matrix_view, + raft::device_matrix_view, + const cuvs::neighbors::filtering::base_filter&); + +template UpstreamBuildF, + search_function UpstreamSearchF> +struct dynamic_batching_test : public ::testing::TestWithParam { + using distance_type = float; + using data_type = DataT; + using index_type = IdxT; + using upstream_type = UpstreamT; + + dynamic_batching_spec ps = ::testing::TestWithParam::GetParam(); + raft::resources res; + + // input data + std::optional> dataset = std::nullopt; + std::optional> queries = std::nullopt; + std::optional> neighbors_upsm = std::nullopt; + std::optional> neighbors_dynb = std::nullopt; + std::optional> distances_upsm = std::nullopt; + std::optional> distances_dynb = std::nullopt; + + // build parameters + cuvs::neighbors::index_params build_params_base{ps.metric}; + typename upstream_type::index_params_type build_params_upsm{build_params_base}; + dynamic_batching::index_params build_params_dynb{ + build_params_base, ps.k, ps.max_batch_size, ps.n_queues, ps.conservative_dispatch}; + + // search parameters + typename upstream_type::search_params_type search_params_upsm{}; + dynamic_batching::search_params search_params_dynb{}; + + // indexes + std::optional index_upsm = std::nullopt; + std::optional> index_dynb = std::nullopt; + + void build_all() + { + index_dynb.reset(); + index_upsm.reset(); + index_upsm = UpstreamBuildF(res, build_params_upsm, dataset->view()); + index_dynb.emplace(res, build_params_dynb, index_upsm.value(), search_params_upsm); + } + + void search_all() + { + // Search using upstream index - all queries at once + UpstreamSearchF(res, + search_params_upsm, + index_upsm.value(), + queries->view(), + neighbors_upsm->view(), + distances_upsm->view(), + filtering::none_sample_filter{}); + raft::resource::sync_stream(res); + + // Search with dynamic batching + // Streaming scenario: prepare concurrent resources + rmm::cuda_stream_pool worker_streams(ps.max_concurrent_threads); + std::vector> futures(ps.max_concurrent_threads); + std::vector resource_pool(0); + for (int64_t i = 0; i < ps.max_concurrent_threads; i++) { + resource_pool.push_back(res); // copies the resource + raft::resource::set_cuda_stream(resource_pool[i], worker_streams.get_stream(i)); + } + + // Try multiple batch sizes in a round-robin to improve test coverage + std::vector minibatch_sizes{1, 3, 7, 10}; + auto get_bs = [&minibatch_sizes](auto i) { + return minibatch_sizes[i % minibatch_sizes.size()]; + }; + int64_t i = 0; + for (int64_t offset = 0; offset < ps.n_queries; offset += get_bs(i++)) { + auto bs = std::min(get_bs(i), ps.n_queries - offset); + auto j = i % ps.max_concurrent_threads; + // wait for previous job in the same slot to finish + if (i >= ps.max_concurrent_threads) { futures[j].wait(); } + // submit a new job + futures[j] = std::async( + std::launch::async, + [&res = resource_pool[j], + ¶ms = search_params_dynb, + index = index_dynb.value(), + query_view = raft::make_device_matrix_view( + queries->data_handle() + offset * ps.dim, bs, ps.dim), + neighbors_view = raft::make_device_matrix_view( + neighbors_dynb->data_handle() + offset * ps.k, bs, ps.k), + distances_view = raft::make_device_matrix_view( + distances_dynb->data_handle() + offset * ps.k, bs, ps.k)]() { + dynamic_batching::search(res, params, index, query_view, neighbors_view, distances_view); + }); + } + + // finalize all resources + for (int64_t j = 0; j < ps.max_concurrent_threads && j < i; j++) { + futures[j].wait(); + raft::resource::sync_stream(resource_pool[j]); + } + raft::resource::sync_stream(res); + } + + /* + Check the dynamic batching generated neighbors against the upstream index. They both may be + imperfect w.r.t. the ground truth, but they shouldn't differ too much. + */ + void check_neighbors() + { + auto stream = raft::resource::get_cuda_stream(res); + size_t queries_size = ps.n_queries * ps.k; + std::vector neighbors_upsm_host(queries_size); + std::vector neighbors_dynb_host(queries_size); + std::vector distances_upsm_host(queries_size); + std::vector distances_dynb_host(queries_size); + raft::copy(neighbors_upsm_host.data(), neighbors_upsm->data_handle(), queries_size, stream); + raft::copy(neighbors_dynb_host.data(), neighbors_dynb->data_handle(), queries_size, stream); + raft::copy(distances_upsm_host.data(), distances_upsm->data_handle(), queries_size, stream); + raft::copy(distances_dynb_host.data(), distances_dynb->data_handle(), queries_size, stream); + raft::resource::sync_stream(res); + ASSERT_TRUE(eval_neighbours(neighbors_upsm_host, + neighbors_dynb_host, + distances_upsm_host, + distances_dynb_host, + ps.n_queries, + ps.k, + 0.001, + 0.9)) + << ps; + } + + void SetUp() override + { + dataset.emplace(raft::make_device_matrix(res, ps.n_rows, ps.dim)); + queries.emplace(raft::make_device_matrix(res, ps.n_queries, ps.dim)); + neighbors_upsm.emplace(raft::make_device_matrix(res, ps.n_queries, ps.k)); + neighbors_dynb.emplace(raft::make_device_matrix(res, ps.n_queries, ps.k)); + distances_upsm.emplace( + raft::make_device_matrix(res, ps.n_queries, ps.k)); + distances_dynb.emplace( + raft::make_device_matrix(res, ps.n_queries, ps.k)); + + raft::random::RngState rng(666ULL); + if constexpr (std::is_same_v || std::is_same_v) { + raft::random::uniform( + res, rng, dataset->data_handle(), dataset->size(), data_type(0.1), data_type(2.0)); + raft::random::uniform( + res, rng, queries->data_handle(), queries->size(), data_type(0.1), data_type(2.0)); + } else { + raft::random::uniformInt( + res, rng, dataset->data_handle(), dataset->size(), data_type(1), data_type(20)); + raft::random::uniformInt( + res, rng, queries->data_handle(), queries->size(), data_type(1), data_type(20)); + } + raft::resource::sync_stream(res); + } + + void TearDown() override + { + index_dynb.reset(); + index_upsm.reset(); + dataset.reset(); + queries.reset(); + neighbors_upsm.reset(); + neighbors_dynb.reset(); + distances_upsm.reset(); + distances_dynb.reset(); + raft::resource::sync_stream(res); + } +}; + +inline std::vector generate_inputs() +{ + std::vector inputs{dynamic_batching_spec{}}; + + for (auto alt_n_queries : {10, 50, 100}) { + dynamic_batching_spec input{}; + input.n_queries = alt_n_queries; + inputs.push_back(input); + } + + for (auto alt_k : {100, 200}) { + dynamic_batching_spec input{}; + input.k = alt_k; + inputs.push_back(input); + } + + for (auto alt_max_batch_size : {4, 16, 128, 256, 512, 1024}) { + dynamic_batching_spec input{}; + input.max_batch_size = alt_max_batch_size; + inputs.push_back(input); + } + + for (auto alt_n_queues : {1, 2, 16, 32}) { + dynamic_batching_spec input{}; + input.n_queues = alt_n_queues; + inputs.push_back(input); + } + + for (auto alt_max_concurrent_threads : {1, 2, 16, 32}) { + dynamic_batching_spec input{}; + input.max_concurrent_threads = alt_max_concurrent_threads; + inputs.push_back(input); + } + + { + auto n = inputs.size(); + for (size_t i = 0; i < n; i++) { + auto input = inputs[i]; + input.conservative_dispatch = !input.conservative_dispatch; + inputs.push_back(input); + } + } + + return inputs; +} + +const std::vector inputs = generate_inputs(); + +} // namespace cuvs::neighbors::dynamic_batching diff --git a/cpp/test/neighbors/dynamic_batching/test_cagra.cu b/cpp/test/neighbors/dynamic_batching/test_cagra.cu new file mode 100644 index 000000000..604fc29cf --- /dev/null +++ b/cpp/test/neighbors/dynamic_batching/test_cagra.cu @@ -0,0 +1,84 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include "../dynamic_batching.cuh" + +#include + +namespace cuvs::neighbors::dynamic_batching { + +using cagra_F32 = dynamic_batching_test, + cagra::build, + cagra::search>; + +using cagra_U8 = dynamic_batching_test, + cagra::build, + cagra::search>; + +template +static void set_default_cagra_params(fixture& that) +{ + that.build_params_upsm.intermediate_graph_degree = 128; + that.build_params_upsm.graph_degree = 64; + that.search_params_upsm.itopk_size = + std::clamp(raft::bound_by_power_of_two(that.ps.k) * 16, 128, 512); +} + +TEST_P(cagra_F32, single_cta) +{ + set_default_cagra_params(*this); + search_params_upsm.algo = cagra::search_algo::SINGLE_CTA; + build_all(); + search_all(); + check_neighbors(); +} + +TEST_P(cagra_F32, multi_cta) +{ + set_default_cagra_params(*this); + search_params_upsm.algo = cagra::search_algo::MULTI_CTA; + build_all(); + search_all(); + check_neighbors(); +} + +TEST_P(cagra_F32, multi_kernel) +{ + set_default_cagra_params(*this); + search_params_upsm.algo = cagra::search_algo::MULTI_KERNEL; + build_all(); + search_all(); + check_neighbors(); +} + +TEST_P(cagra_U8, defaults) +{ + set_default_cagra_params(*this); + build_all(); + search_all(); + check_neighbors(); +} + +INSTANTIATE_TEST_CASE_P(dynamic_batching, cagra_F32, ::testing::ValuesIn(inputs)); +INSTANTIATE_TEST_CASE_P(dynamic_batching, cagra_U8, ::testing::ValuesIn(inputs)); + +} // namespace cuvs::neighbors::dynamic_batching diff --git a/cpp/test/neighbors/dynamic_batching/test_ivf_flat.cu b/cpp/test/neighbors/dynamic_batching/test_ivf_flat.cu new file mode 100644 index 000000000..4922cffa3 --- /dev/null +++ b/cpp/test/neighbors/dynamic_batching/test_ivf_flat.cu @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include "../dynamic_batching.cuh" + +#include + +namespace cuvs::neighbors::dynamic_batching { + +using ivf_flat_i8 = dynamic_batching_test, + ivf_flat::build, + ivf_flat::search>; + +TEST_P(ivf_flat_i8, defaults) +{ + build_params_upsm.n_lists = std::round(std::sqrt(ps.n_rows)); + search_params_upsm.n_probes = + std::max(std::min(build_params_upsm.n_lists, 10), + raft::div_rounding_up_safe(build_params_upsm.n_lists, 50)); + build_all(); + search_all(); + check_neighbors(); +} + +INSTANTIATE_TEST_CASE_P(dynamic_batching, ivf_flat_i8, ::testing::ValuesIn(inputs)); + +} // namespace cuvs::neighbors::dynamic_batching diff --git a/cpp/test/neighbors/dynamic_batching/test_ivf_pq.cu b/cpp/test/neighbors/dynamic_batching/test_ivf_pq.cu new file mode 100644 index 000000000..ec57e0b57 --- /dev/null +++ b/cpp/test/neighbors/dynamic_batching/test_ivf_pq.cu @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include "../dynamic_batching.cuh" + +#include + +namespace cuvs::neighbors::dynamic_batching { + +using ivf_pq_f16 = + dynamic_batching_test, ivf_pq::build, ivf_pq::search>; + +TEST_P(ivf_pq_f16, defaults) +{ + build_params_upsm.n_lists = std::round(std::sqrt(ps.n_rows)); + search_params_upsm.n_probes = + std::max(std::min(build_params_upsm.n_lists, 10), + raft::div_rounding_up_safe(build_params_upsm.n_lists, 50)); + build_all(); + search_all(); + check_neighbors(); +} + +INSTANTIATE_TEST_CASE_P(dynamic_batching, ivf_pq_f16, ::testing::ValuesIn(inputs)); + +} // namespace cuvs::neighbors::dynamic_batching diff --git a/docs/source/cpp_api/neighbors.rst b/docs/source/cpp_api/neighbors.rst index d55d58eb0..ab810ab53 100644 --- a/docs/source/cpp_api/neighbors.rst +++ b/docs/source/cpp_api/neighbors.rst @@ -11,6 +11,7 @@ Nearest Neighbors neighbors_bruteforce.rst neighbors_cagra.rst + neighbors_dynamic_batching.rst neighbors_hnsw.rst neighbors_ivf_flat.rst neighbors_ivf_pq.rst diff --git a/docs/source/cpp_api/neighbors_dynamic_batching.rst b/docs/source/cpp_api/neighbors_dynamic_batching.rst new file mode 100644 index 000000000..adc5cb56a --- /dev/null +++ b/docs/source/cpp_api/neighbors_dynamic_batching.rst @@ -0,0 +1,45 @@ +Dynamic Batching +================ + +Dynamic Batching allows grouping small search requests into batches to increase the device occupancy and throughput while keeping the latency within limits. + +.. role:: py(code) + :language: c++ + :class: highlight + +``#include `` + +namespace *cuvs::neighbors::dynamic_batching* + +Index build parameters +---------------------- + +.. doxygengroup:: dynamic_batching_cpp_index_params + :project: cuvs + :members: + :content-only: + +Index search parameters +----------------------- + +.. doxygengroup:: dynamic_batching_cpp_search_params + :project: cuvs + :members: + :content-only: + +Index +----- + +.. doxygengroup:: dynamic_batching_cpp_index + :project: cuvs + :members: + :content-only: + + +Index search +------------ + +.. doxygengroup:: dynamic_batching_cpp_search + :project: cuvs + :members: + :content-only: diff --git a/examples/cpp/CMakeLists.txt b/examples/cpp/CMakeLists.txt index 092b65ed9..951e0ad0c 100644 --- a/examples/cpp/CMakeLists.txt +++ b/examples/cpp/CMakeLists.txt @@ -38,6 +38,7 @@ include(../cmake/thirdparty/get_cuvs.cmake) # -------------- compile tasks ----------------- # add_executable(CAGRA_EXAMPLE src/cagra_example.cu) add_executable(CAGRA_PERSISTENT_EXAMPLE src/cagra_persistent_example.cu) +add_executable(DYNAMIC_BATCHING_EXAMPLE src/dynamic_batching_example.cu) add_executable(IVF_FLAT_EXAMPLE src/ivf_flat_example.cu) add_executable(IVF_PQ_EXAMPLE src/ivf_pq_example.cu) add_executable(VAMANA_EXAMPLE src/vamana_example.cu) @@ -48,6 +49,9 @@ target_link_libraries(CAGRA_EXAMPLE PRIVATE cuvs::cuvs $ Threads::Threads ) +target_link_libraries( + DYNAMIC_BATCHING_EXAMPLE PRIVATE cuvs::cuvs $ Threads::Threads +) target_link_libraries(IVF_PQ_EXAMPLE PRIVATE cuvs::cuvs $) target_link_libraries(IVF_FLAT_EXAMPLE PRIVATE cuvs::cuvs $) target_link_libraries(VAMANA_EXAMPLE PRIVATE cuvs::cuvs $) diff --git a/examples/cpp/src/dynamic_batching_example.cu b/examples/cpp/src/dynamic_batching_example.cu new file mode 100644 index 000000000..95f66a454 --- /dev/null +++ b/examples/cpp/src/dynamic_batching_example.cu @@ -0,0 +1,282 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "common.cuh" + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +// A helper to split the dataset into chunks +template +auto slice_matrix(const DeviceMatrixOrView &source, + typename DeviceMatrixOrView::index_type offset_rows, + typename DeviceMatrixOrView::index_type count_rows) { + auto n_cols = source.extent(1); + return raft::make_device_matrix_view< + typename DeviceMatrixOrView::element_type, + typename DeviceMatrixOrView::index_type>( + const_cast( + source.data_handle()) + + offset_rows * n_cols, + count_rows, n_cols); +} + +// A helper to measure the execution time of a function +template +void time_it(std::string label, F f, Args &&...xs) { + auto start = std::chrono::system_clock::now(); + f(std::forward(xs)...); + auto end = std::chrono::system_clock::now(); + auto t = std::chrono::duration_cast(end - start); + auto t_ms = double(t.count()) / 1000.0; + std::cout << "[" << label << "] execution time: " << t_ms << " ms" + << std::endl; +} + +/** + * Wrap waiting on a stream work into an async C++ future object. + * This is similar to recording and waiting on CUDA events, but in C++11 API. + */ +struct cuda_work_completion_promise { + + cuda_work_completion_promise(const raft::resources &res) { + auto *promise = new std::promise; + RAFT_CUDA_TRY(cudaLaunchHostFunc(raft::resource::get_cuda_stream(res), + completion_callback, + reinterpret_cast(promise))); + value_ = promise->get_future(); + } + + /** + * Waiting on the produced `future` object has the same effect as + * cudaEventSynchronize if an event was recorded at the time of creation of + * this promise object. + */ + auto get_future() -> std::future && { return std::move(value_); } + +private: + std::future value_; + + static void completion_callback(void *ptr) { + auto *promise = reinterpret_cast *>(ptr); + promise->set_value(); + delete promise; + } +}; + +void dynamic_batching_example( + raft::resources const &res, + raft::device_matrix_view dataset, + raft::device_matrix_view queries) { + using namespace cuvs::neighbors; + + // Number of neighbors to search + int64_t topk = 100; + + // Streaming scenario: maximum number of requests in-flight + constexpr int64_t kMaxJobs = 1000; + // Streaming scenario: number of concurrent CUDA streams + constexpr int64_t kNumWorkerStreams = 5; + + // Split the queries into two subsets to run every experiment twice and thus + // surface any initialization overheads. + int64_t n_queries_a = queries.extent(0) / 2; + int64_t n_queries_b = queries.extent(0) - n_queries_a; + + auto queries_a = slice_matrix(queries, 0, n_queries_a); + auto queries_b = slice_matrix(queries, n_queries_a, n_queries_b); + + // create output arrays + auto neighbors = + raft::make_device_matrix(res, queries.extent(0), topk); + auto distances = + raft::make_device_matrix(res, queries.extent(0), topk); + // slice them same as queries + auto neighbors_a = slice_matrix(neighbors, 0, n_queries_a); + auto distances_a = slice_matrix(distances, 0, n_queries_a); + auto neighbors_b = slice_matrix(neighbors, n_queries_a, n_queries_b); + auto distances_b = slice_matrix(distances, n_queries_a, n_queries_b); + + // use default index parameters + cagra::index_params orig_index_params; + + std::cout << "Building CAGRA index (search graph)" << std::endl; + auto orig_index = cagra::build(res, orig_index_params, dataset); + + std::cout << "CAGRA index has " << orig_index.size() << " vectors" + << std::endl; + std::cout << "CAGRA graph has degree " << orig_index.graph_degree() + << ", graph size [" << orig_index.graph().extent(0) << ", " + << orig_index.graph().extent(1) << "]" << std::endl; + + // use default search parameters + cagra::search_params orig_search_params; + // get a decent recall by increasing the internal topk list + orig_search_params.itopk_size = 512; + orig_search_params.algo = cagra::search_algo::SINGLE_CTA; + + // Set up dynamic batching parameters + dynamic_batching::index_params dynb_index_params{ + /* default-initializing the parent `neighbors::index_params` + (not used anyway) */ + {}, + /* Set the K in advance (the batcher needs to allocate buffers) */ + topk, + /* Configure the number and the size of IO buffers */ + 64, + kNumWorkerStreams}; + + // "build" the index (it's a low-cost index wrapping), + // that is we need to pass the original index and its search params here + dynamic_batching::index dynb_index( + res, dynb_index_params, orig_index, orig_search_params); + + // You can implement job priorities by varying the deadlines of individual + // requests + dynamic_batching::search_params dynb_search_params; + dynb_search_params.dispatch_timeout_ms = 0.1; + + // Define the big-batch setting as a baseline for measuring the throughput. + auto search_batch_orig = + [&res, &orig_index, &orig_search_params]( + raft::device_matrix_view queries, + raft::device_matrix_view neighbors, + raft::device_matrix_view distances) { + cagra::search(res, orig_search_params, orig_index, queries, neighbors, + distances); + raft::resource::sync_stream(res); + }; + + // Launch the baseline search: check the big-batch performance + time_it("standard/batch A", search_batch_orig, queries_a, neighbors_a, + distances_a); + time_it("standard/batch B", search_batch_orig, queries_b, neighbors_b, + distances_b); + + // Streaming scenario: prepare concurrent resources + rmm::cuda_stream_pool worker_streams{kNumWorkerStreams}; + std::vector resource_pool(0); + for (int64_t i = 0; i < kNumWorkerStreams; i++) { + resource_pool.push_back(res); + raft::resource::set_cuda_stream(resource_pool[i], + worker_streams.get_stream(i)); + } + + // Streaming scenario: + // send queries one-by-one, with a maximum kMaxJobs in-flight + auto search_async_orig = + [&resource_pool, &orig_index, &orig_search_params]( + raft::device_matrix_view queries, + raft::device_matrix_view neighbors, + raft::device_matrix_view distances) { + auto work_size = queries.extent(0); + std::array, kMaxJobs> futures; + for (int64_t i = 0; i < work_size + kMaxJobs; i++) { + // wait for previous job in the same slot to finish + if (i >= kMaxJobs) { + futures[i % kMaxJobs].wait(); + } + // submit a new job + if (i < work_size) { + auto &res = resource_pool[i % kNumWorkerStreams]; + cagra::search(res, orig_search_params, orig_index, + slice_matrix(queries, i, 1), + slice_matrix(neighbors, i, 1), + slice_matrix(distances, i, 1)); + futures[i % kMaxJobs] = + cuda_work_completion_promise(res).get_future(); + } + } + }; + + // Streaming scenario with dynamic batching: + // send queries one-by-one, with a maximum kMaxJobs in-flight, + // yet allow grouping the sequential requests (subject to deadlines) + auto search_async_dynb = + [&resource_pool, &dynb_index, &dynb_search_params]( + raft::device_matrix_view queries, + raft::device_matrix_view neighbors, + raft::device_matrix_view distances) { + auto work_size = queries.extent(0); + std::array, kMaxJobs> futures; + for (int64_t i = 0; i < work_size + kMaxJobs; i++) { + // wait for previous job in the same slot to finish + if (i >= kMaxJobs) { + futures[i % kMaxJobs].wait(); + } + // submit a new job + if (i < work_size) { + auto &res = resource_pool[i % kNumWorkerStreams]; + dynamic_batching::search(res, dynb_search_params, dynb_index, + slice_matrix(queries, i, 1), + slice_matrix(neighbors, i, 1), + slice_matrix(distances, i, 1)); + futures[i % kMaxJobs] = + cuda_work_completion_promise(res).get_future(); + } + } + }; + + // Try to handle the same amount of work in the async setting using the + // standard implementation. + time_it("standard/async A", search_async_orig, queries_a, neighbors_a, + distances_a); + time_it("standard/async B", search_async_orig, queries_b, neighbors_b, + distances_b); + + // Do the same using dynamic batching + time_it("dynamic_batching/async A", search_async_dynb, queries_a, neighbors_a, + distances_a); + time_it("dynamic_batching/async B", search_async_dynb, queries_b, neighbors_b, + distances_b); +} + +int main() { + raft::device_resources res; + + // Set the raft resource to use a pool for internal memory allocations + // (workspace) and limit the available workspace size. + raft::resource::set_workspace_to_pool_resource(res, + 12ull * 1024 * 1024 * 1024ull); + + // Create input arrays. + int64_t n_samples = 1000000; + int64_t n_dim = 128; + int64_t n_queries = 10000; + auto dataset = + raft::make_device_matrix(res, n_samples, n_dim); + auto queries = + raft::make_device_matrix(res, n_queries, n_dim); + generate_dataset(res, dataset.view(), queries.view()); + + // run the interesting part of the program + dynamic_batching_example(res, raft::make_const_mdspan(dataset.view()), + raft::make_const_mdspan(queries.view())); +}