diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 34b7cb898..199bb232d 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -397,6 +397,7 @@ if(BUILD_SHARED_LIBS) src/neighbors/iface/iface_pq_uint8_t_int64_t.cu src/neighbors/detail/cagra/cagra_build.cpp src/neighbors/detail/cagra/topk_for_cagra/topk.cu + src/neighbors/dynamic_batching.cu $<$:src/neighbors/hnsw.cpp> src/neighbors/ivf_flat_index.cpp src/neighbors/ivf_flat/ivf_flat_build_extend_float_int64_t.cu @@ -458,6 +459,7 @@ if(BUILD_SHARED_LIBS) src/neighbors/vamana_serialize_float.cu src/neighbors/vamana_serialize_uint8.cu src/neighbors/vamana_serialize_int8.cu + src/preprocessing/quantize/scalar.cu src/selection/select_k_float_int64_t.cu src/selection/select_k_float_int32_t.cu src/selection/select_k_float_uint32_t.cu diff --git a/cpp/bench/ann/src/cuvs/cuvs_ann_bench_param_parser.h b/cpp/bench/ann/src/cuvs/cuvs_ann_bench_param_parser.h index 57d5b1910..7617bfa66 100644 --- a/cpp/bench/ann/src/cuvs/cuvs_ann_bench_param_parser.h +++ b/cpp/bench/ann/src/cuvs/cuvs_ann_bench_param_parser.h @@ -56,6 +56,26 @@ extern template class cuvs::bench::cuvs_cagra; #include "cuvs_mg_cagra_wrapper.h" #endif +template +void parse_dynamic_batching_params(const nlohmann::json& conf, ParamT& param) +{ + if (!conf.value("dynamic_batching", false)) { return; } + param.dynamic_batching = true; + if (conf.contains("dynamic_batching_max_batch_size")) { + param.dynamic_batching_max_batch_size = conf.at("dynamic_batching_max_batch_size"); + } + param.dynamic_batching_conservative_dispatch = + conf.value("dynamic_batching_conservative_dispatch", false); + if (conf.contains("dynamic_batching_dispatch_timeout_ms")) { + param.dynamic_batching_dispatch_timeout_ms = conf.at("dynamic_batching_dispatch_timeout_ms"); + } + if (conf.contains("dynamic_batching_n_queues")) { + param.dynamic_batching_n_queues = conf.at("dynamic_batching_n_queues"); + } + param.dynamic_batching_k = + uint32_t(uint32_t(conf.at("k")) * float(conf.value("refine_ratio", 1.0f))); +} + #if defined(CUVS_ANN_BENCH_USE_CUVS_IVF_FLAT) || defined(CUVS_ANN_BENCH_USE_CUVS_MG) template void parse_build_param(const nlohmann::json& conf, @@ -138,6 +158,9 @@ void parse_search_param(const nlohmann::json& conf, param.refine_ratio = conf.at("refine_ratio"); if (param.refine_ratio < 1.0f) { throw std::runtime_error("refine_ratio should be >= 1.0"); } } + + // enable dynamic batching + parse_dynamic_batching_params(conf, param); } #endif @@ -291,5 +314,8 @@ void parse_search_param(const nlohmann::json& conf, } // Same ratio as in IVF-PQ param.refine_ratio = conf.value("refine_ratio", 1.0f); + + // enable dynamic batching + parse_dynamic_batching_params(conf, param); } #endif diff --git a/cpp/bench/ann/src/cuvs/cuvs_cagra_wrapper.h b/cpp/bench/ann/src/cuvs/cuvs_cagra_wrapper.h index f6d3d60fc..8c9cb2d4f 100644 --- a/cpp/bench/ann/src/cuvs/cuvs_cagra_wrapper.h +++ b/cpp/bench/ann/src/cuvs/cuvs_cagra_wrapper.h @@ -24,6 +24,7 @@ #include #include #include +#include #include #include #include @@ -63,6 +64,13 @@ class cuvs_cagra : public algo, public algo_gpu { AllocatorType graph_mem = AllocatorType::kDevice; AllocatorType dataset_mem = AllocatorType::kDevice; [[nodiscard]] auto needs_dataset() const -> bool override { return true; } + /* Dynamic batching */ + bool dynamic_batching = false; + int64_t dynamic_batching_k; + int64_t dynamic_batching_max_batch_size = 4; + double dynamic_batching_dispatch_timeout_ms = 0.01; + size_t dynamic_batching_n_queues = 8; + bool 
dynamic_batching_conservative_dispatch = false; }; struct build_param { @@ -173,6 +181,12 @@ class cuvs_cagra : public algo, public algo_gpu { std::shared_ptr> dataset_; std::shared_ptr> input_dataset_v_; + std::shared_ptr> dynamic_batcher_; + cuvs::neighbors::dynamic_batching::search_params dynamic_batcher_sp_{}; + int64_t dynamic_batching_max_batch_size_; + size_t dynamic_batching_n_queues_; + bool dynamic_batching_conservative_dispatch_; + inline rmm::device_async_resource_ref get_mr(AllocatorType mem_type) { switch (mem_type) { @@ -216,26 +230,33 @@ inline auto allocator_to_string(AllocatorType mem_type) -> std::string template void cuvs_cagra::set_search_param(const search_param_base& param) { - auto sp = dynamic_cast(param); - search_params_ = sp.p; - refine_ratio_ = sp.refine_ratio; + auto sp = dynamic_cast(param); + bool needs_dynamic_batcher_update = + (dynamic_batching_max_batch_size_ != sp.dynamic_batching_max_batch_size) || + (dynamic_batching_n_queues_ != sp.dynamic_batching_n_queues) || + (dynamic_batching_conservative_dispatch_ != sp.dynamic_batching_conservative_dispatch); + dynamic_batching_max_batch_size_ = sp.dynamic_batching_max_batch_size; + dynamic_batching_n_queues_ = sp.dynamic_batching_n_queues; + dynamic_batching_conservative_dispatch_ = sp.dynamic_batching_conservative_dispatch; + search_params_ = sp.p; + refine_ratio_ = sp.refine_ratio; if (sp.graph_mem != graph_mem_) { // Move graph to correct memory space graph_mem_ = sp.graph_mem; RAFT_LOG_DEBUG("moving graph to new memory space: %s", allocator_to_string(graph_mem_).c_str()); // We create a new graph and copy to it from existing graph - auto mr = get_mr(graph_mem_); - auto new_graph = raft::make_device_mdarray( + auto mr = get_mr(graph_mem_); + *graph_ = raft::make_device_mdarray( handle_, mr, raft::make_extents(index_->graph().extent(0), index_->graph_degree())); - raft::copy(new_graph.data_handle(), + raft::copy(graph_->data_handle(), index_->graph().data_handle(), index_->graph().size(), raft::resource::get_cuda_stream(handle_)); - index_->update_graph(handle_, make_const_mdspan(new_graph.view())); - // update_graph() only stores a view in the index. We need to keep the graph object alive. - *graph_ = std::move(new_graph); + // NB: update_graph() only stores a view in the index. We need to keep the graph object alive. 
+ index_->update_graph(handle_, make_const_mdspan(graph_->view())); + needs_dynamic_batcher_update = true; } if (sp.dataset_mem != dataset_mem_ || need_dataset_update_) { @@ -256,7 +277,26 @@ void cuvs_cagra::set_search_param(const search_param_base& param) dataset_->data_handle(), dataset_->extent(0), this->dim_, dataset_->extent(1)); index_->update_dataset(handle_, dataset_view); - need_dataset_update_ = false; + need_dataset_update_ = false; + needs_dynamic_batcher_update = true; + } + + // dynamic batching + if (sp.dynamic_batching) { + if (!dynamic_batcher_ || needs_dynamic_batcher_update) { + dynamic_batcher_ = std::make_shared>( + handle_, + cuvs::neighbors::dynamic_batching::index_params{{}, + sp.dynamic_batching_k, + sp.dynamic_batching_max_batch_size, + sp.dynamic_batching_n_queues, + sp.dynamic_batching_conservative_dispatch}, + *index_, + search_params_); + } + dynamic_batcher_sp_.dispatch_timeout_ms = sp.dynamic_batching_dispatch_timeout_ms; + } else { + if (dynamic_batcher_) { dynamic_batcher_.reset(); } } } @@ -306,7 +346,7 @@ void cuvs_cagra::load(const std::string& file) template std::unique_ptr> cuvs_cagra::copy() { - return std::make_unique>(*this); // use copy constructor + return std::make_unique>(std::cref(*this)); // use copy constructor } template @@ -330,8 +370,17 @@ void cuvs_cagra::search_base(const T* queries, raft::make_device_matrix_view(neighbors_idx_t, batch_size, k); auto distances_view = raft::make_device_matrix_view(distances, batch_size, k); - cuvs::neighbors::cagra::search( - handle_, search_params_, *index_, queries_view, neighbors_view, distances_view); + if (dynamic_batcher_) { + cuvs::neighbors::dynamic_batching::search(handle_, + dynamic_batcher_sp_, + *dynamic_batcher_, + queries_view, + neighbors_view, + distances_view); + } else { + cuvs::neighbors::cagra::search( + handle_, search_params_, *index_, queries_view, neighbors_view, distances_view); + } if constexpr (sizeof(IdxT) != sizeof(algo_base::index_type)) { if (raft::get_device_for_address(neighbors) < 0 && @@ -367,11 +416,23 @@ void cuvs_cagra::search( const raft::resources& res = handle_; auto mem_type = raft::get_device_for_address(neighbors) >= 0 ? MemoryType::kDevice : MemoryType::kHostPinned; - auto& tmp_buf = get_tmp_buffer_from_global_pool( - ((disable_refinement ? 0 : (sizeof(float) + sizeof(algo_base::index_type))) + - (kNeedsIoMapping ? sizeof(IdxT) : 0)) * - batch_size * k0); - auto* candidates_ptr = reinterpret_cast(tmp_buf.data(mem_type)); + + // If dynamic batching is used and there's no sync between benchmark laps, multiple sequential + // requests can group together. The data is copied asynchronously, and if the same intermediate + // buffer is used for multiple requests, they can override each other's data. Hence, we need to + // allocate as much space as required by the maximum number of sequential requests. + auto max_dyn_grouping = dynamic_batcher_ ? raft::div_rounding_up_safe( + dynamic_batching_max_batch_size_, batch_size) * + dynamic_batching_n_queues_ + : 1; + auto tmp_buf_size = ((disable_refinement ? 0 : (sizeof(float) + sizeof(algo_base::index_type))) + + (kNeedsIoMapping ? 
sizeof(IdxT) : 0)) * + batch_size * k0; + auto& tmp_buf = get_tmp_buffer_from_global_pool(tmp_buf_size * max_dyn_grouping); + thread_local static int64_t group_id = 0; + auto* candidates_ptr = reinterpret_cast( + reinterpret_cast(tmp_buf.data(mem_type)) + tmp_buf_size * group_id); + group_id = (group_id + 1) % max_dyn_grouping; auto* candidate_dists_ptr = reinterpret_cast(candidates_ptr + (disable_refinement ? 0 : batch_size * k0)); auto* neighbors_idx_t = diff --git a/cpp/bench/ann/src/cuvs/cuvs_ivf_pq_wrapper.h b/cpp/bench/ann/src/cuvs/cuvs_ivf_pq_wrapper.h index 4c8a91f23..dac766669 100644 --- a/cpp/bench/ann/src/cuvs/cuvs_ivf_pq_wrapper.h +++ b/cpp/bench/ann/src/cuvs/cuvs_ivf_pq_wrapper.h @@ -19,7 +19,9 @@ #include "cuvs_ann_bench_utils.h" #include +#include #include + #include #include #include @@ -46,6 +48,13 @@ class cuvs_ivf_pq : public algo, public algo_gpu { cuvs::neighbors::ivf_pq::search_params pq_param; float refine_ratio = 1.0f; [[nodiscard]] auto needs_dataset() const -> bool override { return refine_ratio > 1.0f; } + /* Dynamic batching */ + bool dynamic_batching = false; + int64_t dynamic_batching_k; + int64_t dynamic_batching_max_batch_size = 128; + double dynamic_batching_dispatch_timeout_ms = 0.01; + size_t dynamic_batching_n_queues = 3; + bool dynamic_batching_conservative_dispatch = true; }; using build_param = cuvs::neighbors::ivf_pq::index_params; @@ -98,6 +107,9 @@ class cuvs_ivf_pq : public algo, public algo_gpu { int dimension_; float refine_ratio_ = 1.0; raft::device_matrix_view dataset_; + + std::shared_ptr> dynamic_batcher_; + cuvs::neighbors::dynamic_batching::search_params dynamic_batcher_sp_{}; }; template @@ -138,6 +150,21 @@ void cuvs_ivf_pq::set_search_param(const search_param_base& param) search_params_ = sp.pq_param; refine_ratio_ = sp.refine_ratio; assert(search_params_.n_probes <= index_params_.n_lists); + + if (sp.dynamic_batching) { + dynamic_batcher_ = std::make_shared>( + handle_, + cuvs::neighbors::dynamic_batching::index_params{{}, + sp.dynamic_batching_k, + sp.dynamic_batching_max_batch_size, + sp.dynamic_batching_n_queues, + sp.dynamic_batching_conservative_dispatch}, + *index_, + search_params_); + dynamic_batcher_sp_.dispatch_timeout_ms = sp.dynamic_batching_dispatch_timeout_ms; + } else { + dynamic_batcher_.reset(); + } } template @@ -168,8 +195,17 @@ void cuvs_ivf_pq::search_base( raft::make_device_matrix_view(neighbors_idx_t, batch_size, k); auto distances_view = raft::make_device_matrix_view(distances, batch_size, k); - cuvs::neighbors::ivf_pq::search( - handle_, search_params_, *index_, queries_view, neighbors_view, distances_view); + if (dynamic_batcher_) { + cuvs::neighbors::dynamic_batching::search(handle_, + dynamic_batcher_sp_, + *dynamic_batcher_, + queries_view, + neighbors_view, + distances_view); + } else { + cuvs::neighbors::ivf_pq::search( + handle_, search_params_, *index_, queries_view, neighbors_view, distances_view); + } if constexpr (sizeof(IdxT) != sizeof(algo_base::index_type)) { raft::linalg::unaryOp(neighbors, diff --git a/cpp/include/cuvs/core/c_api.h b/cpp/include/cuvs/core/c_api.h index c8c8d3934..400d162ad 100644 --- a/cpp/include/cuvs/core/c_api.h +++ b/cpp/include/cuvs/core/c_api.h @@ -151,6 +151,22 @@ cuvsError_t cuvsRMMPoolMemoryResourceEnable(int initial_pool_size_percent, */ cuvsError_t cuvsRMMMemoryResourceReset(); +/** + * @brief Allocates pinned memory on the host using RMM + * @param[out] ptr Pointer to allocated host memory + * @param[in] bytes Size in bytes to allocate + * @return cuvsError_t + */ 
+cuvsError_t cuvsRMMHostAlloc(void** ptr, size_t bytes); + +/** + * @brief Deallocates pinned memory on the host using RMM + * @param[in] ptr Pointer to allocated host memory to free + * @param[in] bytes Size in bytes to deallocate + * @return cuvsError_t + */ +cuvsError_t cuvsRMMHostFree(void* ptr, size_t bytes); + +/** @} */ #ifdef __cplusplus diff --git a/cpp/include/cuvs/neighbors/cagra.hpp b/cpp/include/cuvs/neighbors/cagra.hpp index 5ceb3010e..a4684ce26 100644 --- a/cpp/include/cuvs/neighbors/cagra.hpp +++ b/cpp/include/cuvs/neighbors/cagra.hpp @@ -272,6 +272,10 @@ static_assert(std::is_aggregate_v); */ template struct index : cuvs::neighbors::index { + using index_params_type = cagra::index_params; + using search_params_type = cagra::search_params; + using index_type = IdxT; + using value_type = T; static_assert(!raft::is_narrowing_v, "IdxT must be able to represent all values of uint32_t"); diff --git a/cpp/include/cuvs/neighbors/dynamic_batching.hpp b/cpp/include/cuvs/neighbors/dynamic_batching.hpp new file mode 100644 index 000000000..410800357 --- /dev/null +++ b/cpp/include/cuvs/neighbors/dynamic_batching.hpp @@ -0,0 +1,290 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +namespace cuvs::neighbors::dynamic_batching { + +namespace detail { +template +class batch_runner; +} + +/** + * @defgroup dynamic_batching_cpp_index_params Dynamic Batching index parameters + * @{ + */ +struct index_params : cuvs::neighbors::index_params { + /** The number of neighbors to search is fixed at construction time. */ + int64_t k; + /** Maximum size of the batch to submit to the upstream index. */ + int64_t max_batch_size = 100; + /** + * The number of independent request queues. + * + * Each queue is associated with a unique CUDA stream and IO device buffers. If the number of + * concurrent requests is high, using multiple queues makes it possible to fill in data and prepare the batch + * while the other queues are busy. Moreover, the queues are submitted concurrently; this helps to + * better utilize the GPU by hiding the kernel launch latencies, which improves the + * throughput. + */ + size_t n_queues = 3; + /** + * By default (`conservative_dispatch = false`) the first CPU thread to commit a query to a batch + * dispatches the upstream search function as soon as possible (before the batch is full). In that + * case, it does not know the final batch size at the time of calling the upstream search and thus + * runs the upstream search with the maximum batch size every time, even if only one valid query + * is present in the batch. This reduces the latency at the cost of wasted GPU resources. + * + * The alternative behavior (`conservative_dispatch = true`) is more conservative: the dispatcher + * thread starts the kernel that gathers input queries, but waits until the batch is full or the + * waiting time is exceeded. Only then does it acquire the actual batch size and launch the upstream + * search.
As a result, fewer GPU resources are wasted at the cost of exposing the upstream search + * latency. + * + * *Rule of Thumb*: + * for a large `max_batch_size`, set `conservative_dispatch = true`; otherwise keep it disabled. + */ + bool conservative_dispatch = false; +}; +/** @} */ + +/** + * @defgroup dynamic_batching_cpp_search_params Dynamic Batching search parameters + * @{ + */ +struct search_params : cuvs::neighbors::search_params { + /** + * How long a request can stay in the queue (milliseconds). + * Note that this only affects the dispatch time and does not reflect the full request latency; + * the latter depends on the upstream search parameters and the batch size. + */ + double dispatch_timeout_ms = 1.0; +}; +/** @} */ + +/** + * @defgroup dynamic_batching_cpp_index Dynamic Batching index type + * @{ + */ + +/** + * @brief Lightweight dynamic batching index wrapper + * + * @tparam T data type + * @tparam IdxT index type + * + * One lightweight dynamic batching index manages a single index and a single search parameter set. + * This structure should be shared among multiple users via copy semantics: access to the + * underlying implementation is managed via a shared pointer, and concurrent search among the + * participants is thread-safe. + * + * __Usage example__ + * @code{.cpp} + * using namespace cuvs::neighbors; + * // When creating a dynamic batching index, the k parameter has to be passed explicitly. + * // The first empty braces default-initialize the parent `neighbors::index_params` (unused). + * dynamic_batching::index_params dynb_index_params{{}, k}; + * // Construct the index by wrapping the upstream index and search parameters. + * dynamic_batching::index index{ + * res, dynb_index_params, upstream_index, upstream_search_params + * }; + * // Use default search parameters + * dynamic_batching::search_params search_params; + * // Search K nearest neighbours + * auto neighbors = raft::make_device_matrix(res, n_queries, k); + * auto distances = raft::make_device_matrix(res, n_queries, k); + * dynamic_batching::search( + * res, search_params, index, queries, neighbors.view(), distances.view() + * ); + * @endcode + * + * + * __Priority queues__ + * + * The dynamic batching index has limited support for prioritizing individual requests. + * There's only one pool of queues in the batcher and no functionality to prioritize one batch over + * another. The `search_params::dispatch_timeout_ms` parameters passed with each request are + * aggregated internally, and the batch is dispatched no later than when any of the timeouts is exceeded. + * With this logic, a high-priority request can never be processed earlier than lower-priority + * requests submitted before it. + * + * However, dynamic batching indexes are lightweight and do not contain any global or static state. + * This means it's easy to combine multiple batchers. + * As an example, you can construct one batching index per priority class: + * @code{.cpp} + * using namespace cuvs::neighbors; + * // Large batch size (128), a couple of queues (2), + * // enabled conservative dispatch - all for better throughput + * dynamic_batching::index_params low_priority_params{{}, k, 128, 2, true}; + * // Small batch size (16), more queues (4), + * // disabled conservative dispatch - to minimize latency with reasonable throughput + * dynamic_batching::index_params high_priority_params{{}, k, 16, 4, false}; + * // Construct the indexes by wrapping the upstream index and search parameters.
+ * dynamic_batching::index low_priority_index{ + * res, low_priority_params, upstream_index, upstream_search_params + * }; + * dynamic_batching::index high_priority_index{ + * res, high_priority_params, upstream_index, upstream_search_params + * }; + * // Define a combined search function with priority selection + * double high_priority_threshold_ms = 0.1; + * auto search_function = + * [low_priority_index, high_priority_index, high_priority_threshold_ms]( + * raft::resources const &res, + * dynamic_batching::search_params search_params, + * raft::device_matrix_view queries, + * raft::device_matrix_view neighbors, + * raft::device_matrix_view distances) { + * dynamic_batching::search( + * res, + * search_params, + * search_params.dispatch_timeout_ms < high_priority_threshold_ms + * ? high_priority_index : low_priority_index, + * queries, + * neighbors, + * distances + * ); + * }; + * @endcode + */ +template +struct index : cuvs::neighbors::index { + std::shared_ptr> runner; + + /** + * @brief Construct a dynamic batching index by wrapping the upstream index. + * + * @tparam Upstream the upstream index type + * + * @param[in] res raft resources + * @param[in] params dynamic batching parameters + * @param[in] upstream_index the original index to perform the search + * (the reference must be alive for the lifetime of the dynamic batching index) + * @param[in] upstream_params the original index search parameters for all queries in a batch + * (the parameters are captured by value for the lifetime of the dynamic batching index) + * @param[in] sample_filter + * filtering function, if any, must be the same for all requests in a batch + * (the pointer must be alive for the lifetime of the dynamic batching index) + */ + template + index(const raft::resources& res, + const cuvs::neighbors::dynamic_batching::index_params& params, + const Upstream& upstream_index, + const typename Upstream::search_params_type& upstream_params, + const cuvs::neighbors::filtering::base_filter* sample_filter = nullptr); +}; +/** @} */ + +/** + * + * @defgroup dynamic_batching_cpp_search Dynamic Batching search + * + * @{ + */ + +/** + * @brief Search ANN using a dynamic batching index. + * + * The search parameters of the upstream index and the optional filtering function are configured at + * the dynamic batching index construction time. + * + * Like with many other indexes, the dynamic batching search has the stream-ordered semantics: the + * host function may return the control before the results are ready. Synchronize with the main CUDA + * stream in the given resource object to wait for arrival of the search results. + * + * Dynamic batching search is thread-safe: call the search function with copies of the same index in + * multiple threads to increase the occupancy of the batches. 
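+ *
+ * A minimal sketch of such a multi-threaded setup (reusing the variables from the usage example
+ * above; in a real application each thread would typically use its own `raft::resources` and its
+ * own query/output buffers):
+ * @code{.cpp}
+ * std::vector<std::thread> requesters;
+ * for (int t = 0; t < 4; t++) {
+ *   // each thread works on a shallow copy of the same dynamic batching index
+ *   requesters.emplace_back([&, my_index = index] {
+ *     dynamic_batching::search(
+ *       res, search_params, my_index, queries, neighbors.view(), distances.view());
+ *   });
+ * }
+ * for (auto& r : requesters) { r.join(); }
+ * @endcode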
+ * + * @param[in] res + * @param[in] params query-specific batching parameters, such as the maximum waiting time + * @param[in] index a dynamic batching index + * @param[in] queries a device matrix view to a row-major matrix + * [n_queries, dim] + * @param[out] neighbors a device matrix view to the indices of the neighbors in the source dataset + * [n_queries, k] + * @param[out] distances a device matrix view to the distances to the selected neighbors + * [n_queries, k] + * + */ +void search(raft::resources const& res, + cuvs::neighbors::dynamic_batching::search_params const& params, + dynamic_batching::index const& index, + raft::device_matrix_view queries, + raft::device_matrix_view neighbors, + raft::device_matrix_view distances); + +/** @copydoc search */ +void search(raft::resources const& res, + cuvs::neighbors::dynamic_batching::search_params const& params, + dynamic_batching::index const& index, + raft::device_matrix_view queries, + raft::device_matrix_view neighbors, + raft::device_matrix_view distances); + +/** @copydoc search */ +void search(raft::resources const& res, + cuvs::neighbors::dynamic_batching::search_params const& params, + dynamic_batching::index const& index, + raft::device_matrix_view queries, + raft::device_matrix_view neighbors, + raft::device_matrix_view distances); + +/** @copydoc search */ +void search(raft::resources const& res, + cuvs::neighbors::dynamic_batching::search_params const& params, + dynamic_batching::index const& index, + raft::device_matrix_view queries, + raft::device_matrix_view neighbors, + raft::device_matrix_view distances); + +/** @copydoc search */ +void search(raft::resources const& res, + cuvs::neighbors::dynamic_batching::search_params const& params, + dynamic_batching::index const& index, + raft::device_matrix_view queries, + raft::device_matrix_view neighbors, + raft::device_matrix_view distances); + +/** @copydoc search */ +void search(raft::resources const& res, + cuvs::neighbors::dynamic_batching::search_params const& params, + dynamic_batching::index const& index, + raft::device_matrix_view queries, + raft::device_matrix_view neighbors, + raft::device_matrix_view distances); + +/** @copydoc search */ +void search(raft::resources const& res, + cuvs::neighbors::dynamic_batching::search_params const& params, + dynamic_batching::index const& index, + raft::device_matrix_view queries, + raft::device_matrix_view neighbors, + raft::device_matrix_view distances); + +/** @copydoc search */ +void search(raft::resources const& res, + cuvs::neighbors::dynamic_batching::search_params const& params, + dynamic_batching::index const& index, + raft::device_matrix_view queries, + raft::device_matrix_view neighbors, + raft::device_matrix_view distances); + +/** @} */ + +} // namespace cuvs::neighbors::dynamic_batching diff --git a/cpp/include/cuvs/neighbors/ivf_flat.hpp b/cpp/include/cuvs/neighbors/ivf_flat.hpp index 7f852d635..e017946d9 100644 --- a/cpp/include/cuvs/neighbors/ivf_flat.hpp +++ b/cpp/include/cuvs/neighbors/ivf_flat.hpp @@ -138,6 +138,10 @@ using list_data = ivf::list; */ template struct index : cuvs::neighbors::index { + using index_params_type = ivf_flat::index_params; + using search_params_type = ivf_flat::search_params; + using index_type = IdxT; + using value_type = T; static_assert(!raft::is_narrowing_v, "IdxT must be able to represent all values of uint32_t"); diff --git a/cpp/include/cuvs/neighbors/ivf_pq.hpp b/cpp/include/cuvs/neighbors/ivf_pq.hpp index ae543c9e9..d85753b7f 100644 --- 
a/cpp/include/cuvs/neighbors/ivf_pq.hpp +++ b/cpp/include/cuvs/neighbors/ivf_pq.hpp @@ -319,6 +319,9 @@ using list_data = ivf::list; */ template struct index : cuvs::neighbors::index { + using index_params_type = ivf_pq::index_params; + using search_params_type = ivf_pq::search_params; + using index_type = IdxT; static_assert(!raft::is_narrowing_v, "IdxT must be able to represent all values of uint32_t"); diff --git a/cpp/include/cuvs/preprocessing/quantize/scalar.hpp b/cpp/include/cuvs/preprocessing/quantize/scalar.hpp new file mode 100644 index 000000000..49b4bb7a6 --- /dev/null +++ b/cpp/include/cuvs/preprocessing/quantize/scalar.hpp @@ -0,0 +1,489 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include + +#include + +namespace cuvs::preprocessing::quantize::scalar { + +/** + * @defgroup scalar Scalar quantizer utilities + * @{ + */ + +/** + * @brief quantizer parameters. + */ +struct params { + /* + * specifies how many outliers at top & bottom will be ignored + * needs to be within range of (0, 1] + */ + float quantile = 0.99; +}; + +/** + * @brief Defines and stores scalar for quantisation upon training + * + * The quantization is performed by a linear mapping of an interval in the + * float data type to the full range of the quantized int type. + * + * @tparam T data element type + * + */ +template +struct quantizer { + T min_; + T max_; +}; + +/** + * @brief Initializes a scalar quantizer to be used later for quantizing the dataset. + * + * Usage example: + * @code{.cpp} + * raft::handle_t handle; + * cuvs::preprocessing::quantize::scalar::params params; + * auto quantizer = cuvs::preprocessing::quantize::scalar::train(handle, params, + * dataset); + * @endcode + * + * @param[in] res raft resource + * @param[in] params configure scalar quantizer, e.g. quantile + * @param[in] dataset a row-major matrix view on device + * + * @return quantizer + */ +quantizer train(raft::resources const& res, + const params params, + raft::device_matrix_view dataset); + +/** + * @brief Initializes a scalar quantizer to be used later for quantizing the dataset. + * + * Usage example: + * @code{.cpp} + * raft::handle_t handle; + * cuvs::preprocessing::quantize::scalar::params params; + * auto quantizer = cuvs::preprocessing::quantize::scalar::train(handle, params, + * dataset); + * @endcode + * + * @param[in] res raft resource + * @param[in] params configure scalar quantizer, e.g. 
quantile + * @param[in] dataset a row-major matrix view on host + * + * @return quantizer + */ +quantizer train(raft::resources const& res, + const params params, + raft::host_matrix_view dataset); + +/** + * @brief Applies quantization transform to given dataset + * + * Usage example: + * @code{.cpp} + * raft::handle_t handle; + * cuvs::preprocessing::quantize::scalar::params params; + * auto quantizer = cuvs::preprocessing::quantize::scalar::train(handle, params, + * dataset); auto quantized_dataset = raft::make_device_matrix(handle, samples, + * features); cuvs::preprocessing::quantize::scalar::transform(handle, quantizer, dataset, + * quantized_dataset.view()); + * @endcode + * + * @param[in] res raft resource + * @param[in] quantizer a scalar quantizer + * @param[in] dataset a row-major matrix view on device + * @param[out] out a row-major matrix view on device + * + */ +void transform(raft::resources const& res, + const quantizer& quantizer, + raft::device_matrix_view dataset, + raft::device_matrix_view out); + +/** + * @brief Applies quantization transform to given dataset + * + * Usage example: + * @code{.cpp} + * raft::handle_t handle; + * cuvs::preprocessing::quantize::scalar::params params; + * auto quantizer = cuvs::preprocessing::quantize::scalar::train(handle, params, + * dataset); auto quantized_dataset = raft::make_host_matrix(samples, features); + * cuvs::preprocessing::quantize::scalar::transform(handle, quantizer, dataset, + * quantized_dataset.view()); + * @endcode + * + * @param[in] res raft resource + * @param[in] quantizer a scalar quantizer + * @param[in] dataset a row-major matrix view on host + * @param[out] out a row-major matrix view on host + * + */ +void transform(raft::resources const& res, + const quantizer& quantizer, + raft::host_matrix_view dataset, + raft::host_matrix_view out); + +/** + * @brief Perform inverse quantization step on previously quantized dataset + * + * Note that depending on the chosen data types train dataset the conversion is + * not lossless. + * + * Usage example: + * @code{.cpp} + * auto quantized_dataset = raft::make_device_matrix(handle, samples, features); + * cuvs::preprocessing::quantize::scalar::transform(handle, quantizer, dataset, + * quantized_dataset.view()); auto dataset_revert = raft::make_device_matrix(handle, samples, features); + * cuvs::preprocessing::quantize::scalar::inverse_transform(handle, quantizer, + * dataset_revert.view()); + * @endcode + * + * @param[in] res raft resource + * @param[in] quantizer a scalar quantizer + * @param[in] dataset a row-major matrix view on device + * @param[out] out a row-major matrix view on device + * + */ +void inverse_transform(raft::resources const& res, + const quantizer& quantizer, + raft::device_matrix_view dataset, + raft::device_matrix_view out); + +/** + * @brief Perform inverse quantization step on previously quantized dataset + * + * Note that depending on the chosen data types train dataset the conversion is + * not lossless. 
+ * + * Usage example: + * @code{.cpp} + * auto quantized_dataset = raft::make_host_matrix(samples, features); + * cuvs::preprocessing::quantize::scalar::transform(handle, quantizer, dataset, + * quantized_dataset.view()); auto dataset_revert = raft::make_host_matrix(samples, + * features); cuvs::preprocessing::quantize::scalar::inverse_transform(handle, quantizer, + * dataset_revert.view()); + * @endcode + * + * @param[in] res raft resource + * @param[in] quantizer a scalar quantizer + * @param[in] dataset a row-major matrix view on host + * @param[out] out a row-major matrix view on host + * + */ +void inverse_transform(raft::resources const& res, + const quantizer& quantizer, + raft::host_matrix_view dataset, + raft::host_matrix_view out); + +/** + * @brief Initializes a scalar quantizer to be used later for quantizing the dataset. + * + * Usage example: + * @code{.cpp} + * raft::handle_t handle; + * cuvs::preprocessing::quantize::scalar::params params; + * auto quantizer = cuvs::preprocessing::quantize::scalar::train(handle, params, + * dataset); + * @endcode + * + * @param[in] res raft resource + * @param[in] params configure scalar quantizer, e.g. quantile + * @param[in] dataset a row-major matrix view on device + * + * @return quantizer + */ +quantizer train(raft::resources const& res, + const params params, + raft::device_matrix_view dataset); + +/** + * @brief Initializes a scalar quantizer to be used later for quantizing the dataset. + * + * Usage example: + * @code{.cpp} + * raft::handle_t handle; + * cuvs::preprocessing::quantize::scalar::params params; + * auto quantizer = cuvs::preprocessing::quantize::scalar::train(handle, params, + * dataset); + * @endcode + * + * @param[in] res raft resource + * @param[in] params configure scalar quantizer, e.g. 
quantile + * @param[in] dataset a row-major matrix view on host + * + * @return quantizer + */ +quantizer train(raft::resources const& res, + const params params, + raft::host_matrix_view dataset); + +/** + * @brief Applies quantization transform to given dataset + * + * Usage example: + * @code{.cpp} + * raft::handle_t handle; + * cuvs::preprocessing::quantize::scalar::params params; + * auto quantizer = cuvs::preprocessing::quantize::scalar::train(handle, params, + * dataset); auto quantized_dataset = raft::make_device_matrix(handle, samples, + * features); cuvs::preprocessing::quantize::scalar::transform(handle, quantizer, dataset, + * quantized_dataset.view()); + * @endcode + * + * @param[in] res raft resource + * @param[in] quantizer a scalar quantizer + * @param[in] dataset a row-major matrix view on device + * @param[out] out a row-major matrix view on device + * + */ +void transform(raft::resources const& res, + const quantizer& quantizer, + raft::device_matrix_view dataset, + raft::device_matrix_view out); + +/** + * @brief Applies quantization transform to given dataset + * + * Usage example: + * @code{.cpp} + * raft::handle_t handle; + * cuvs::preprocessing::quantize::scalar::params params; + * auto quantizer = cuvs::preprocessing::quantize::scalar::train(handle, params, + * dataset); auto quantized_dataset = raft::make_host_matrix(samples, features); + * cuvs::preprocessing::quantize::scalar::transform(handle, quantizer, dataset, + * quantized_dataset.view()); + * @endcode + * + * @param[in] res raft resource + * @param[in] quantizer a scalar quantizer + * @param[in] dataset a row-major matrix view on host + * @param[out] out a row-major matrix view on host + * + */ +void transform(raft::resources const& res, + const quantizer& quantizer, + raft::host_matrix_view dataset, + raft::host_matrix_view out); + +/** + * @brief Perform inverse quantization step on previously quantized dataset + * + * Note that depending on the chosen data types train dataset the conversion is + * not lossless. + * + * Usage example: + * @code{.cpp} + * auto quantized_dataset = raft::make_device_matrix(handle, samples, features); + * cuvs::preprocessing::quantize::scalar::transform(handle, quantizer, dataset, + * quantized_dataset.view()); auto dataset_revert = raft::make_device_matrix(handle, + * samples, features); cuvs::preprocessing::quantize::scalar::inverse_transform(handle, quantizer, + * dataset_revert.view()); + * @endcode + * + * @param[in] res raft resource + * @param[in] quantizer a scalar quantizer + * @param[in] dataset a row-major matrix view on device + * @param[out] out a row-major matrix view on device + * + */ +void inverse_transform(raft::resources const& res, + const quantizer& quantizer, + raft::device_matrix_view dataset, + raft::device_matrix_view out); + +/** + * @brief Perform inverse quantization step on previously quantized dataset + * + * Note that depending on the chosen data types train dataset the conversion is + * not lossless. 
+ * + * Usage example: + * @code{.cpp} + * auto quantized_dataset = raft::make_host_matrix(samples, features); + * cuvs::preprocessing::quantize::scalar::transform(handle, quantizer, dataset, + * quantized_dataset.view()); auto dataset_revert = raft::make_host_matrix(samples, + * features); cuvs::preprocessing::quantize::scalar::inverse_transform(handle, quantizer, + * dataset_revert.view()); + * @endcode + * + * @param[in] res raft resource + * @param[in] quantizer a scalar quantizer + * @param[in] dataset a row-major matrix view on host + * @param[out] out a row-major matrix view on host + * + */ +void inverse_transform(raft::resources const& res, + const quantizer& quantizer, + raft::host_matrix_view dataset, + raft::host_matrix_view out); + +/** + * @brief Initializes a scalar quantizer to be used later for quantizing the dataset. + * + * Usage example: + * @code{.cpp} + * raft::handle_t handle; + * cuvs::preprocessing::quantize::scalar::params params; + * auto quantizer = cuvs::preprocessing::quantize::scalar::train(handle, params, + * dataset); + * @endcode + * + * @param[in] res raft resource + * @param[in] params configure scalar quantizer, e.g. quantile + * @param[in] dataset a row-major matrix view on device + * + * @return quantizer + */ +quantizer train(raft::resources const& res, + const params params, + raft::device_matrix_view dataset); + +/** + * @brief Initializes a scalar quantizer to be used later for quantizing the dataset. + * + * Usage example: + * @code{.cpp} + * raft::handle_t handle; + * cuvs::preprocessing::quantize::scalar::params params; + * auto quantizer = cuvs::preprocessing::quantize::scalar::train(handle, params, + * dataset); + * @endcode + * + * @param[in] res raft resource + * @param[in] params configure scalar quantizer, e.g. 
quantile + * @param[in] dataset a row-major matrix view on host + * + * @return quantizer + */ +quantizer train(raft::resources const& res, + const params params, + raft::host_matrix_view dataset); + +/** + * @brief Applies quantization transform to given dataset + * + * Usage example: + * @code{.cpp} + * raft::handle_t handle; + * cuvs::preprocessing::quantize::scalar::params params; + * auto quantizer = cuvs::preprocessing::quantize::scalar::train(handle, params, + * dataset); auto quantized_dataset = raft::make_device_matrix(handle, samples, + * features); cuvs::preprocessing::quantize::scalar::transform(handle, quantizer, dataset, + * quantized_dataset.view()); + * @endcode + * + * @param[in] res raft resource + * @param[in] quantizer a scalar quantizer + * @param[in] dataset a row-major matrix view on device + * @param[out] out a row-major matrix view on device + * + */ +void transform(raft::resources const& res, + const quantizer& quantizer, + raft::device_matrix_view dataset, + raft::device_matrix_view out); + +/** + * @brief Applies quantization transform to given dataset + * + * Usage example: + * @code{.cpp} + * raft::handle_t handle; + * cuvs::preprocessing::quantize::scalar::params params; + * auto quantizer = cuvs::preprocessing::quantize::scalar::train(handle, params, + * dataset); auto quantized_dataset = raft::make_host_matrix(samples, features); + * cuvs::preprocessing::quantize::scalar::transform(handle, quantizer, dataset, + * quantized_dataset.view()); + * @endcode + * + * @param[in] res raft resource + * @param[in] quantizer a scalar quantizer + * @param[in] dataset a row-major matrix view on host + * @param[out] out a row-major matrix view on host + * + */ +void transform(raft::resources const& res, + const quantizer& quantizer, + raft::host_matrix_view dataset, + raft::host_matrix_view out); + +/** + * @brief Perform inverse quantization step on previously quantized dataset + * + * Note that depending on the chosen data types train dataset the conversion is + * not lossless. + * + * Usage example: + * @code{.cpp} + * auto quantized_dataset = raft::make_device_matrix(handle, samples, features); + * cuvs::preprocessing::quantize::scalar::transform(handle, quantizer, dataset, + * quantized_dataset.view()); auto dataset_revert = raft::make_device_matrix(handle, + * samples, features); cuvs::preprocessing::quantize::scalar::inverse_transform(handle, quantizer, + * dataset_revert.view()); + * @endcode + * + * @param[in] res raft resource + * @param[in] quantizer a scalar quantizer + * @param[in] dataset a row-major matrix view on device + * @param[out] out a row-major matrix view on device + * + */ +void inverse_transform(raft::resources const& res, + const quantizer& quantizer, + raft::device_matrix_view dataset, + raft::device_matrix_view out); + +/** + * @brief Perform inverse quantization step on previously quantized dataset + * + * Note that depending on the chosen data types train dataset the conversion is + * not lossless. 
+ * + * Usage example: + * @code{.cpp} + * auto quantized_dataset = raft::make_host_matrix(samples, features); + * cuvs::preprocessing::quantize::scalar::transform(handle, quantizer, dataset, + * quantized_dataset.view()); auto dataset_revert = raft::make_host_matrix(samples, + * features); cuvs::preprocessing::quantize::scalar::inverse_transform(handle, quantizer, + * dataset_revert.view()); + * @endcode + * + * @param[in] res raft resource + * @param[in] quantizer a scalar quantizer + * @param[in] dataset a row-major matrix view on host + * @param[out] out a row-major matrix view on host + * + */ +void inverse_transform(raft::resources const& res, + const quantizer& quantizer, + raft::host_matrix_view dataset, + raft::host_matrix_view out); + +/** @} */ // end of group scalar + +} // namespace cuvs::preprocessing::quantize::scalar diff --git a/cpp/src/core/c_api.cpp b/cpp/src/core/c_api.cpp index cfbeed2d5..4333bff0c 100644 --- a/cpp/src/core/c_api.cpp +++ b/cpp/src/core/c_api.cpp @@ -26,6 +26,7 @@ #include #include #include +#include #include extern "C" cuvsError_t cuvsResourcesCreate(cuvsResources_t* res) @@ -130,6 +131,21 @@ extern "C" cuvsError_t cuvsRMMMemoryResourceReset() }); } +thread_local std::unique_ptr pinned_mr; + +extern "C" cuvsError_t cuvsRMMHostAlloc(void** ptr, size_t bytes) +{ + return cuvs::core::translate_exceptions([=] { + if (pinned_mr == nullptr) { pinned_mr = std::make_unique(); } + *ptr = pinned_mr->allocate(bytes); + }); +} + +extern "C" cuvsError_t cuvsRMMHostFree(void* ptr, size_t bytes) +{ + return cuvs::core::translate_exceptions([=] { pinned_mr->deallocate(ptr, bytes); }); +} + thread_local std::string last_error_text = ""; extern "C" const char* cuvsGetLastErrorText() diff --git a/cpp/src/neighbors/detail/cagra/search_single_cta.cuh b/cpp/src/neighbors/detail/cagra/search_single_cta.cuh index d15ddd269..0911d440c 100644 --- a/cpp/src/neighbors/detail/cagra/search_single_cta.cuh +++ b/cpp/src/neighbors/detail/cagra/search_single_cta.cuh @@ -130,17 +130,27 @@ struct search : search_plan_impl { (sizeof(INDEX_T) + sizeof(DISTANCE_T)) * result_buffer_size_32 + sizeof(INDEX_T) * hashmap::get_size(small_hash_bitlen) + sizeof(INDEX_T) * search_width + sizeof(std::uint32_t) * topk_ws_size + sizeof(std::uint32_t); - smem_size = base_smem_size; + + std::uint32_t additional_smem_size = 0; if (num_itopk_candidates > 256) { // Tentatively calculate the required share memory size when radix // sort based topk is used, assuming the block size is the maximum. 
if (itopk_size <= 256) { - smem_size += topk_by_radix_sort<256, INDEX_T>::smem_size * sizeof(std::uint32_t); + additional_smem_size += topk_by_radix_sort<256, INDEX_T>::smem_size * sizeof(std::uint32_t); } else { - smem_size += topk_by_radix_sort<512, INDEX_T>::smem_size * sizeof(std::uint32_t); + additional_smem_size += topk_by_radix_sort<512, INDEX_T>::smem_size * sizeof(std::uint32_t); } } + if (!std::is_same_v) { + // For filtering postprocess + using scan_op_t = cub::WarpScan; + additional_smem_size = + std::max(additional_smem_size, sizeof(scan_op_t::TempStorage)); + } + + smem_size = base_smem_size + additional_smem_size; + uint32_t block_size = thread_block_size; if (block_size == 0) { block_size = min_block_size; diff --git a/cpp/src/neighbors/detail/cagra/search_single_cta_kernel-inl.cuh b/cpp/src/neighbors/detail/cagra/search_single_cta_kernel-inl.cuh index 5ba2bc038..0eedb8d09 100644 --- a/cpp/src/neighbors/detail/cagra/search_single_cta_kernel-inl.cuh +++ b/cpp/src/neighbors/detail/cagra/search_single_cta_kernel-inl.cuh @@ -111,7 +111,7 @@ RAFT_DEVICE_INLINE_FUNCTION void pickup_next_parents(std::uint32_t* const termin } template -RAFT_DEVICE_INLINE_FUNCTION void topk_by_bitonic_sort_1st( +RAFT_DEVICE_INLINE_FUNCTION void topk_by_bitonic_sort_and_full( float* candidate_distances, // [num_candidates] IdxT* candidate_indices, // [num_candidates] const std::uint32_t num_candidates, @@ -215,7 +215,7 @@ RAFT_DEVICE_INLINE_FUNCTION void topk_by_bitonic_sort_1st( } template -RAFT_DEVICE_INLINE_FUNCTION void topk_by_bitonic_sort_2nd( +RAFT_DEVICE_INLINE_FUNCTION void topk_by_bitonic_sort_and_merge( float* itopk_distances, // [num_itopk] IdxT* itopk_indices, // [num_itopk] const std::uint32_t num_itopk, @@ -424,7 +424,7 @@ RAFT_DEVICE_INLINE_FUNCTION void topk_by_bitonic_sort_2nd( template -RAFT_DEVICE_INLINE_FUNCTION void topk_by_bitonic_sort( +RAFT_DEVICE_INLINE_FUNCTION void topk_by_bitonic_sort_and_merge( float* itopk_distances, // [num_itopk] IdxT* itopk_indices, // [num_itopk] const std::uint32_t num_itopk, @@ -437,20 +437,62 @@ RAFT_DEVICE_INLINE_FUNCTION void topk_by_bitonic_sort( const unsigned MULTI_WARPS_2) { // The results in candidate_distances/indices are sorted by bitonic sort. - topk_by_bitonic_sort_1st( + topk_by_bitonic_sort_and_full( candidate_distances, candidate_indices, num_candidates, num_itopk, MULTI_WARPS_1); // The results sorted above are merged with the internal intermediate top-k // results so far using bitonic merge. - topk_by_bitonic_sort_2nd(itopk_distances, - itopk_indices, - num_itopk, - candidate_distances, - candidate_indices, - num_candidates, - work_buf, - first, - MULTI_WARPS_2); + topk_by_bitonic_sort_and_merge(itopk_distances, + itopk_indices, + num_itopk, + candidate_distances, + candidate_indices, + num_candidates, + work_buf, + first, + MULTI_WARPS_2); +} + +// This function move the invalid index element to the end of the itopk list. +// Require : array_length % 32 == 0 && The invalid entry is only one. 
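+// (In other words, array_length must be a multiple of the warp size and the list is assumed to
+// contain at most one invalid entry.)
+// How it works: each lane of the first warp loads one (index, distance) pair per 32-element chunk;
+// a warp ballot locates the lane that loaded the invalid index, and every lane after it (and every
+// lane in all subsequent chunks) writes its pair one slot toward the front. Finally, lane 0 stores
+// the invalid marker and the maximum representable distance in the last slot of the list.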
+template +RAFT_DEVICE_INLINE_FUNCTION void move_invalid_to_end_of_list(IdxT* const index_array, + float* const distance_array, + const std::uint32_t array_length) +{ + constexpr std::uint32_t warp_size = 32; + constexpr std::uint32_t invalid_index = utils::get_max_value(); + const std::uint32_t lane_id = threadIdx.x % warp_size; + + if (threadIdx.x >= warp_size) { return; } + + bool found_invalid = false; + if (array_length % warp_size == 0) { + for (std::uint32_t i = lane_id; i < array_length; i += warp_size) { + const auto index = index_array[i]; + const auto distance = distance_array[i]; + + if (found_invalid) { + index_array[i - 1] = index; + distance_array[i - 1] = distance; + } else { + // Check if the index is invalid + const auto I_found_invalid = (index == invalid_index); + const auto who_has_invalid = raft::ballot(I_found_invalid); + // if a value that is loaded by a smaller lane id thread, shift the array + if (who_has_invalid << (warp_size - lane_id)) { + index_array[i - 1] = index; + distance_array[i - 1] = distance; + } + + found_invalid = who_has_invalid; + } + } + } + if (lane_id == 0) { + index_array[array_length - 1] = invalid_index; + distance_array[array_length - 1] = utils::get_max_value(); + } } template @@ -591,10 +633,10 @@ __device__ void search_core( // sort if constexpr (TOPK_BY_BITONIC_SORT) { // [Notice] - // It is good to use multiple warps in topk_by_bitonic_sort() when + // It is good to use multiple warps in topk_by_bitonic_sort_and_merge() when // batch size is small (short-latency), but it might not be always good // when batch size is large (high-throughput). - // topk_by_bitonic_sort() consists of two operations: + // topk_by_bitonic_sort_and_merge() consists of two operations: // if MAX_CANDIDATES is greater than 128, the first operation uses two warps; // if MAX_ITOPK is greater than 256, the second operation used two warps. const unsigned multi_warps_1 = ((blockDim.x >= 64) && (MAX_CANDIDATES > 128)) ? 1 : 0; @@ -603,9 +645,9 @@ __device__ void search_core( // reset small-hash table. if ((iter + 1) % small_hash_reset_interval == 0) { // Depending on the block size and the number of warps used in - // topk_by_bitonic_sort(), determine which warps are used to reset + // topk_by_bitonic_sort_and_merge(), determine which warps are used to reset // the small hash and whether they are performed in overlap with - // topk_by_bitonic_sort(). + // topk_by_bitonic_sort_and_merge(). 
_CLK_START(); unsigned hash_start_tid; if (blockDim.x == 32) { @@ -629,28 +671,28 @@ __device__ void search_core( // topk with bitonic sort _CLK_START(); - if (std::is_same::value || - *filter_flag == 0) { - topk_by_bitonic_sort(result_distances_buffer, - result_indices_buffer, - internal_topk, - result_distances_buffer + internal_topk, - result_indices_buffer + internal_topk, - search_width * graph_degree, - topk_ws, - (iter == 0), - multi_warps_1, - multi_warps_2); - __syncthreads(); - } else { - topk_by_bitonic_sort_1st( - result_distances_buffer, - result_indices_buffer, - internal_topk + search_width * graph_degree, - internal_topk, - false); + if (!(std::is_same::value || + *filter_flag == 0)) { + // Move the filtered-out indices to the end of the itopk list + for (unsigned i = 0; i < search_width; i++) { + move_invalid_to_end_of_list( + result_indices_buffer, result_distances_buffer, internal_topk); + } + if (threadIdx.x == 0) { *terminate_flag = 0; } } + topk_by_bitonic_sort_and_merge( + result_distances_buffer, + result_indices_buffer, + internal_topk, + result_distances_buffer + internal_topk, + result_indices_buffer + internal_topk, + search_width * graph_degree, + topk_ws, + (iter == 0), + multi_warps_1, + multi_warps_2); + __syncthreads(); _CLK_REC(clk_topk); } else { _CLK_START(); @@ -759,12 +801,66 @@ __device__ void search_core( } __syncthreads(); - topk_by_bitonic_sort_1st( - result_distances_buffer, - result_indices_buffer, - internal_topk + search_width * graph_degree, - top_k, - false); + // Move invalid index items to the end of the buffer without sorting the entire buffer + using scan_op_t = cub::WarpScan; + auto& temp_storage = *reinterpret_cast(smem_work_ptr); + + constexpr std::uint32_t warp_size = 32; + if (threadIdx.x < warp_size) { + std::uint32_t num_found_valid = 0; + for (std::uint32_t buffer_offset = 0; buffer_offset < internal_topk; + buffer_offset += warp_size) { + // Calculate the new buffer index + const auto src_position = buffer_offset + threadIdx.x; + const std::uint32_t is_valid_index = + (result_indices_buffer[src_position] & (~index_msb_1_mask)) == invalid_index ? 0 : 1; + std::uint32_t new_position; + scan_op_t(temp_storage).InclusiveSum(is_valid_index, new_position); + if (is_valid_index) { + const auto dst_position = num_found_valid + (new_position - 1); + result_indices_buffer[dst_position] = result_indices_buffer[src_position]; + result_distances_buffer[dst_position] = result_distances_buffer[src_position]; + } + + // Calculate the largest valid position within the warp and broadcast it for the next iteration + num_found_valid += new_position; + for (std::uint32_t offset = (warp_size >> 1); offset > 0; offset >>= 1) { + const auto v = raft::shfl_xor(num_found_valid, offset); + if ((threadIdx.x & offset) == 0) { num_found_valid = v; } + } + + // If enough items have been found, terminate early + if (num_found_valid >= top_k) { break; } + } + + if (num_found_valid < top_k) { + // Fill the remaining buffer with invalid values so that `topk_by_bitonic_sort_and_merge` is + // usable in the next step + for (std::uint32_t i = num_found_valid + threadIdx.x; i < internal_topk; i += warp_size) { + result_indices_buffer[i] = invalid_index; + result_distances_buffer[i] = utils::get_max_value(); + } + } + } + + // If there are not enough valid indices in the internal topk list, pick up more from the + // candidate list.
+ if (top_k > internal_topk || result_indices_buffer[top_k - 1] == invalid_index) { + __syncthreads(); + const unsigned multi_warps_1 = ((blockDim.x >= 64) && (MAX_CANDIDATES > 128)) ? 1 : 0; + const unsigned multi_warps_2 = ((blockDim.x >= 64) && (MAX_ITOPK > 256)) ? 1 : 0; + topk_by_bitonic_sort_and_merge( + result_distances_buffer, + result_indices_buffer, + internal_topk, + result_distances_buffer + internal_topk, + result_indices_buffer + internal_topk, + search_width * graph_degree, + topk_ws, + (iter == 0), + multi_warps_1, + multi_warps_2); + } __syncthreads(); } diff --git a/cpp/src/neighbors/detail/dynamic_batching.cuh b/cpp/src/neighbors/detail/dynamic_batching.cuh new file mode 100644 index 000000000..5c6b1654e --- /dev/null +++ b/cpp/src/neighbors/detail/dynamic_batching.cuh @@ -0,0 +1,1197 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "../sample_filter.cuh" + +#include + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#ifndef CUVS_SYSTEM_LITTLE_ENDIAN +#if defined(__BYTE_ORDER__) && __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__ +#define CUVS_SYSTEM_LITTLE_ENDIAN 0 +#else +#define CUVS_SYSTEM_LITTLE_ENDIAN 1 +#endif +#endif + +namespace cuvs::neighbors::dynamic_batching::detail { + +using raft::RAFT_NAME; // TODO: a workaround for RAFT_LOG_XXX macros + +/** + * A helper to make the requester threads more cooperative when busy-spinning. + * It is used in the wait loops across this file to reduce the CPU usage. + * + * Ideally, we should be using atomics notify/wait feature, but that is not always possible + * (e.g. waiting on multiple things or waiting on GPU volatile stores). 
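+ *
+ * A minimal usage sketch (`ready_to_proceed()` here stands for whatever condition the caller is
+ * polling for):
+ * @code{.cpp}
+ * local_waiter waiter{std::chrono::microseconds{10}};
+ * while (!ready_to_proceed()) {
+ *   waiter.wait();  // no-op at first, then yields, then sleeps for progressively longer periods
+ * }
+ * @endcode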
+ */ +struct local_waiter { + static constexpr inline int64_t kNonSleepIterations = 10; + + explicit local_waiter(std::chrono::nanoseconds base_sleep_time, + int64_t start_iteration = 0) noexcept + : base_sleep_time_{base_sleep_time}, iteration_{start_iteration} + { + } + + inline void wait() noexcept + { + if (iteration_ < 2) { + // Don't wait for the first few iterations: + // maybe there's a weak CAS op in the loop, or something else that could return quickly + } else if (iteration_ < kNonSleepIterations) { + std::this_thread::yield(); + } else { + auto k = iteration_ + 1 - kNonSleepIterations; + std::this_thread::sleep_for(base_sleep_time_ * k); + } + ++iteration_; + } + + inline void reset(int64_t start_iteration = 0) noexcept { iteration_ = start_iteration; } + + private: + std::chrono::nanoseconds base_sleep_time_; + int64_t iteration_; +}; + +class cuda_event { + public: + cuda_event(cuda_event&&) = default; + cuda_event& operator=(cuda_event&&) = default; + ~cuda_event() = default; + cuda_event(cuda_event const&) = delete; // Copying disallowed: one event one owner + cuda_event& operator=(cuda_event&) = delete; + + cuda_event() + : event_{[]() { + cudaEvent_t* e = new cudaEvent_t; + RAFT_CUDA_TRY(cudaEventCreateWithFlags(e, cudaEventDisableTiming)); + return e; + }(), + [](cudaEvent_t* e) { + RAFT_CUDA_TRY_NO_THROW(cudaEventDestroy(*e)); + delete e; + }} + { + } + + cudaEvent_t value() const { return *event_; } + + private: + std::unique_ptr> event_; +}; + +template +struct get_accessor_type_t { + using type = typename MdSpanOrArray::accessor_type; +}; + +template +struct get_accessor_type_t> { + using mdarray_type = raft::mdarray; + using view_type = typename mdarray_type::view_type; + using type = typename view_type::accessor_type; +}; + +template +using get_accessor_type = typename get_accessor_type_t::type; + +template +constexpr inline auto slice_3d(typename Source3DT::index_type i, + const Source3DT& source3d, + typename Source3DT::index_type n_rows = 0) +{ + using element_type = typename Source3DT::element_type; + using index_type = typename Source3DT::index_type; + using layout_type = typename Source3DT::layout_type; + using accessor_type = get_accessor_type; + auto extent2d = + raft::make_extents(n_rows == 0 ? 
source3d.extent(1) : n_rows, source3d.extent(2)); + auto stride = uint64_t(source3d.extent(1)) * uint64_t(source3d.extent(2)); + return raft::mdspan{ + const_cast(source3d.data_handle()) + stride * i, extent2d}; +} + +template +constexpr inline auto slice_2d(typename Source2DT::index_type i, const Source2DT& source2d) +{ + using element_type = typename Source2DT::element_type; + using index_type = typename Source2DT::index_type; + using layout_type = typename Source2DT::layout_type; + using accessor_type = get_accessor_type; + auto extent1d = raft::make_extents(source2d.extent(1)); + auto stride = uint64_t(extent1d.extent(0)); + return raft::mdspan{ + const_cast(source2d.data_handle()) + stride * i, extent1d}; +} + +// --------------------------------------------- + +constexpr size_t kCacheLineBytes = 64; + +template +using upstream_search_type_const = void(raft::resources const&, + typename Upstream::search_params_type const&, + Upstream const&, + raft::device_matrix_view, + raft::device_matrix_view, + raft::device_matrix_view, + const cuvs::neighbors::filtering::base_filter&); + +template +using upstream_search_type = void(raft::resources const&, + typename Upstream::search_params_type const&, + Upstream&, + raft::device_matrix_view, + raft::device_matrix_view, + raft::device_matrix_view, + const cuvs::neighbors::filtering::base_filter&); + +template +using function_search_type = void(raft::resources const&, + raft::device_matrix_view, + raft::device_matrix_view, + raft::device_matrix_view); + +/** + * State of the batch token slot. + * + * In a nutshell, there are only two batch slot states that matter: empty or full. + * Initially, all slots are empty. The host threads can commit (i.e. subscribe) to a batch slot even + * if it's empty (when they know it will be filled-in at some point in future). With this logic, we + * smooth out the bottleneck that occurs when many threads try to submit their work using a single + * atomic counter (the batch queue head). + * + * Once a GPU IO buffer is available, its owner returns the buffer to the queue by marking a slot as + * full. By that time, it may be partially or fully committed (i.e. several host threads are + * committed to submit a certain number of queries). + * + * If we had an infinite buffer, these two states would suffice. However, we have a finite ring + * buffer, so the used-up slots must be emptied again, so that they are usable in the following + * rounds through the ring buffer. + * + * The slot state depends not only on the value stored in it, but on the accessing thread as well + * (see `batch_queue_t::batch_status` below). The accessing thread may be ahead or behind the others + * (as defined by the sequential order id below). Depending on the accessor state, it may view the + * slot as being emptied/filled in the future, current, or previous rounds. This affects the + * decision whether the slot can be used and whether the thread has the right to advance tail or + * head counters of the batch queue. + * + */ +enum struct slot_state : int32_t { + /** The slot is empty, cleared-up in this round (hence the head should be past it). */ + kEmptyPast = 1025, + /** The slot is empty, cleared-up in previous round. */ + kEmpty = 1024, + /** The slot is empty, cleared-up two round ago and cannot be used yet (due to be filled). */ + kEmptyBusy = 1023, + /** The current thread has been sleeping for too long and is way behind the others. */ + kFullPast = 1, + /** The slot is full, filled-in in this round. 
*/ + kFull = 0, + /** This state is considered full, filled-in in previous round. */ + kFullBusy = -1 + /** The rest of the values are impossible states indicating an error in the algo. */ +}; + +/** + * Identifies the batch and its job-commit state. + * Should be in the pinned memory for fast shared access on CPU and GPU side. + * + * The batch token packs the IO buffer address (id) and a number of committed queries in a single + * 64-bit atomic. This is to allow conflict-free atomic updates of both values. + * + */ +struct batch_token { + uint64_t value = 0; + + constexpr inline batch_token() {} + explicit constexpr inline batch_token(uint32_t buffer_id) { id() = buffer_id; } + + /** + * Sequential id of the batch in the array of batches. + * + * The `id` field, in practice, stores not only the IO buffer address, but also an extra + * sequential "round" id. The latter identifies how many rounds through the batch ring buffer has + * already been done (computed from the the `seq_order_id` counter in the batch queue) and is used + * by `batch_queue_t::batch_status` below to compute the `slot_state`. This is to avoid the ABA + * atomic updates problem when using the ring buffer. + * + * There cannot be more IO buffers than the size of the ring buffer. The size of the ring buffer + * is always a power-of-two. Hence the IO buffer address needs only `log2(Size)` bits, and the + * rest is used for the ring buffer round id (see `batch_queue_t::make_seq_batch_id`). + * + */ + RAFT_INLINE_FUNCTION auto id() noexcept -> uint32_t& + { + return *(reinterpret_cast(&value) + kOffsetOfId); + } + /** + * How many queries are promised by the participating CPU threads (requesters). + * + * The CPU threads atomically increment this counter until its size reaches `max_batch_size`. + * + * Any (CPU or GPU thread) may atomically write to the highest byte of this value, which indicates + * that no one can commit to this batch anymore (e.g. the wait timeout is exceeded). + * Hence, the actual number of committed queries is `size_committed % 0x00ffffff`. + * + * The gather kernel cannot finish while `size_committed < max_batch_size`. + * + * NB: we use the trick of writing to the highest byte to allow GPU write atomically to the pinned + * host memory. This way, we don't need to use device RMW atomics on host memory, which are not + * available on a broad class of GPUs. If not this workaround, we could simply do atomic add/or + * with value 0x01000000. + */ + RAFT_INLINE_FUNCTION auto size_committed() noexcept -> uint32_t& + { + return *(reinterpret_cast(&value) + kOffsetOfSC); + } + + private: + /** Offset of the `id()` value in the token if it's interpreted as uint32_t[2]. */ + static constexpr inline uint32_t kOffsetOfId = CUVS_SYSTEM_LITTLE_ENDIAN; + /** Offset of the `size_committed()` value in the token if it's interpreted as uint32_t[2]. */ + static constexpr inline uint32_t kOffsetOfSC = 1 - kOffsetOfId; +}; +static_assert(sizeof(batch_token) == sizeof(uint64_t)); +static_assert(cuda::std::atomic::is_always_lock_free); + +/** + * The batch queue consists of several ring buffers and two counters determining where are the head + * and the tail of the queue in those buffers. + * + * There is an internal sequentially consistent order in the queue, defined by `seq_order_id` + * counter. The head and tail members define where the participants should look for full and + * empty slots in the queue respectively. + * + * The slots in the queue have their own states (see `slot_state` above). 
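To make the commit protocol around `batch_token` concrete: the IO buffer id and the number of committed queries share one 64-bit atomic word, so a requester can bump the counter with a single CAS loop while readers see both fields consistently, and the "closed" flag lives in the highest byte of the counter. Below is a standalone sketch of the same packing, using `std::atomic` and a hypothetical `toy_token` type rather than the library's `batch_token` in pinned memory.

#include <atomic>
#include <cstdint>
#include <cstdio>
#include <cstring>

// Hypothetical stand-in for batch_token: two 32-bit fields packed into one atomic word.
// The real code additionally folds ring-buffer "round" bits into `id` and keeps the word in
// pinned memory as a cuda::atomic so the GPU can read and clear it.
struct toy_token {
  uint32_t id;              // IO buffer id
  uint32_t size_committed;  // low 24 bits: committed queries; highest byte: "closed" flag
};
static_assert(sizeof(toy_token) == sizeof(uint64_t));

int main()
{
  std::atomic<uint64_t> token{0};

  // A requester commits 3 queries by updating the shared word with one CAS loop.
  uint64_t observed = token.load();
  uint64_t desired;
  do {
    toy_token t;
    std::memcpy(&t, &observed, sizeof(t));
    t.size_committed += 3;
    std::memcpy(&desired, &t, sizeof(desired));
  } while (!token.compare_exchange_weak(observed, desired));

  // Readers recover the actual count by masking out the "closed" byte.
  toy_token t;
  uint64_t v = token.load();
  std::memcpy(&t, &v, sizeof(t));
  std::printf("committed: %u\n", t.size_committed & 0x00ffffff);  // prints 3
  return 0;
}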
The states are updated + * concurrently in many threads, so the head and tail counters do not always accurately represent + * the actual compound state of the queue. + * + * `.head()` is where a host thread starts looking for a batch token. All slots earlier than + * returned by this method are not usable anymore (they batches are either "fully committed", + * dispatched, or emptied earlier). If a host thread determines that the current slot is not usable + * anymore, it increments the counter by calling `.pop()`. + * + * The tail is where a host thread reserves an empty slot to be filled-in by a GPU worker thread + * once it releases the owned IO buffer. There's no `.tail()` method, but `.push()` method returns + * the tail position (before advancing it). `.push()` blocks the host thread until it knows the slot + * isn't used by any other threads anymore (i.e. cleaned-up from the previous round). + * + * There's no strict relation between the head and the tail. + * Normally there is a single batch in the ring buffer being partially filled. It is followed by + * contiguous list of empty idle batches and reserved empty slots. The head and the tail loosely + * correspond to the beginning and the end of this sequence. + * + * Sometimes, the head can go further than the tail. This means all batches are busy and there are + * more threads committed to the slots that are not populated with the batches (and not even + * reserved for filling-in yet). + * + * + */ +template +struct batch_queue_t { + static constexpr uint32_t kSize = Size; + static constexpr uint32_t kMinElemSize = sizeof(uint32_t); + static_assert(cuda::std::atomic::is_always_lock_free, + "The value type must be lock-free."); + static_assert(cuda::std::atomic::is_always_lock_free, + "The value type must be lock-free."); + static_assert(cuda::std::atomic::is_always_lock_free, + "The value type must be lock-free."); + static_assert(raft::is_a_power_of_two(kSize), "The size must be a power-of-two for efficiency."); + + static constexpr auto kMemOrder = cuda::std::memory_order_relaxed; + + /** Type-safe synonym for the internal head & tail counters. */ + struct seq_order_id { + uint32_t value; + }; + + explicit batch_queue_t(const raft::resources& res, bool use_batch_sizes) noexcept + : tokens_{raft::make_pinned_vector, + uint32_t>(res, kSize)}, + rem_time_us_{ + raft::make_pinned_vector, uint32_t>( + res, kSize)}, + dispatch_sequence_id_(kSize), + batch_sizes_{ + use_batch_sizes + ? std::make_optional( + raft::make_pinned_vector, uint32_t>( + res, kSize)) + : std::nullopt} + { + tail_.store(0, kMemOrder); + head_.store(0, kMemOrder); + auto past_seq_id = seq_order_id{static_cast(-1)}; + for (uint32_t i = 0; i < kSize; i++) { + rem_time_us_(i).store(std::numeric_limits::max(), kMemOrder); + if (batch_sizes_.has_value()) { batch_sizes_.value()(i).store(0, kMemOrder); } + dispatch_sequence_id_[i].store(past_seq_id.value, kMemOrder); + tokens_(i).store(make_empty_token(past_seq_id), kMemOrder); + } + } + + /** + * Advance the tail position, ensure the slot is empty, and return the reference to the new slot. + * The calling side is responsible for filling-in the slot with an actual value at a later time. + * + * Conceptually, this method reserves a ring buffer slot on the host side, so that the GPU worker + * thread can return the IO buffer (filling the token slot) asynchronously. 
+ */ + inline auto push() -> seq_order_id + { + seq_order_id seq_id{tail_.fetch_add(1, kMemOrder)}; + auto& loc = token(seq_id); + auto ss = batch_status(loc.load(kMemOrder), seq_id); + /* [Note: very small waiting time] + + Only a few (dispatcher) threads are going to call this function at the same time as opposed to + potentially any number of threads waiting on new batches to arrive. + This is a performance-critical code path. + + Hence the small base sleep time. + */ + local_waiter till_empty{std::chrono::nanoseconds{1000}}; + while (ss == slot_state::kFull || ss == slot_state::kFullBusy || ss == slot_state::kEmptyBusy) { + // Wait till the slot becomes empty (doesn't matter future or past). + // The batch id is only ever updated in the scatter/gather kernels, which are the only source + // of truth whether a batch buffer is currently used by the GPU. + till_empty.wait(); + ss = batch_status(loc.load(kMemOrder), seq_id); + } + return seq_id; + } + + /** + * Return the offset of the given w.r.t. the tail of the queue. + * Negative value means the given slot is in the body of the queue and should be dispatched soon. + * Positive value means the given slot is ahead of the queue and should wait longer. + * + * That is the lower the value the higher the priority. + */ + [[nodiscard]] inline auto niceness(seq_order_id id) const noexcept -> int32_t + { + return static_cast(id.value - tail_.load(kMemOrder)); + } + + /** Get the reference to the first element in the queue. */ + inline auto head() noexcept -> seq_order_id + { + auto h = head_.load(kMemOrder); + // The head cannot go ahead of the tail by more than the queue buffer size. + // If the head is ahead by not more than kSize elements though, everything is fine; + // the slots too far ahead are protected by busy tokens. + local_waiter for_tail(std::chrono::nanoseconds{100000}); + while (static_cast(h - tail_.load(kMemOrder)) >= static_cast(kSize)) { + for_tail.wait(); + h = head_.load(kMemOrder); + } + return seq_order_id{h}; + } + + /** Batch commit state and IO buffer id (see `batch_token`) */ + inline auto token(seq_order_id id) -> cuda::atomic& + { + return tokens_(cache_friendly_idx(id.value)); + } + + /** + * How much time has this batch left for waiting. + * It is an approximate value by design - to minimize the synchronization between CPU and GPU. + * + * The clocks on GPU and CPU may have different values, so the running kernel and the CPU thread + * have different ideas on how much time is left. Rather than trying to synchronize the clocks, we + * maintain independent timers and accept the uncertainty. + * + * Access pattern: CPU write-only (producer); GPU read-only (consumer). + */ + inline auto rem_time_us(seq_order_id id) -> cuda::atomic& + { + return rem_time_us_(cache_friendly_idx(id.value)); + } + + /** + * The actual batch size - the final number of committed queries. + * This is only used if `conservative_dispatch = true`. + */ + inline auto batch_size(seq_order_id id) noexcept + -> cuda::atomic* + { + if (batch_sizes_.has_value()) { return &batch_sizes_.value()(cache_friendly_idx(id.value)); } + return nullptr; + } + + /** + * This value is updated by the host thread after it submits the job completion event to indicate + * to other threads can wait on the event to get the results back. + * Other threads get the value from the batch queue and compare that value against this atomic. + * + * Access pattern: CPU-only; dispatching thread writes the id once, other threads wait on it. 
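The hand-off that `dispatch_sequence_id` enables (one dispatching thread publishes a sequence number once; the other requesters of the same batch block until a value at least that recent appears) can be sketched with plain C++20 atomics. This is a host-only illustration with made-up names, not the library code.

#include <atomic>
#include <cstdint>
#include <thread>
#include <vector>

int main()
{
  // One slot of the "dispatch sequence id" idea: waiters block until the dispatcher has
  // published a sequence id that is not older than the one they subscribed to.
  std::atomic<uint32_t> dispatch_sequence_id{static_cast<uint32_t>(-1)};
  const uint32_t my_seq_id = 7;

  std::vector<std::thread> waiters;
  for (int i = 0; i < 4; i++) {
    waiters.emplace_back([&] {
      auto observed = dispatch_sequence_id.load(std::memory_order_acquire);
      while (static_cast<int32_t>(my_seq_id - observed) > 0) {
        dispatch_sequence_id.wait(observed, std::memory_order_relaxed);  // C++20 atomic wait
        observed = dispatch_sequence_id.load(std::memory_order_acquire);
      }
      // ... here each waiter would make its own CUDA stream wait on the recorded event ...
    });
  }

  // The dispatcher records the completion event first, then publishes and notifies.
  dispatch_sequence_id.store(my_seq_id, std::memory_order_release);
  dispatch_sequence_id.notify_all();

  for (auto& t : waiters) { t.join(); }
  return 0;
}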
+ */ + inline auto dispatch_sequence_id(seq_order_id id) -> cuda::std::atomic& + { + return dispatch_sequence_id_[cache_friendly_idx(id.value)]; + } + + /** + * An `atomicMax` on the queue head in disguise. + * This makes the given batch slot and all prior slots unreachable (not possible to commit). + */ + inline void pop(seq_order_id id) noexcept + { + const auto desired = id.value + 1; + auto observed = id.value; + while (observed < desired && + !head_.compare_exchange_weak(observed, desired, kMemOrder, kMemOrder)) {} + } + + static constexpr inline auto batch_id(batch_token token) noexcept -> uint32_t + { + return token.id() & kCounterLocMask; + } + + /** + * Construct a token that is interpreted as having been emptied in the current round + * (the round is derived from seq_id). + * + * NB: "round" is the number of times the queue counters went over the whole ring buffer. + * It's used to avoid the ABA problem for atomic token updates. + */ + static constexpr inline auto make_empty_token(seq_order_id seq_id) noexcept -> batch_token + { + // Modify the seq_id to identify that the token slot is empty + auto empty_round = static_cast(slot_state::kEmptyPast) * kSize; + auto empty_round_id = seq_order_id{seq_id.value + empty_round}; + // Id of empty slot is ignored and can be anything + auto empty_id = kCounterLocMask; + return batch_token{make_seq_batch_id(empty_round_id, empty_id)}; + } + + /** + * Construct a sequential batch id by combining the current round and the real batch id. + * + * The "round" part gives a hint when the token slot was filled-in to avoid the ABA problem + * (see above). + */ + static constexpr inline auto make_seq_batch_id(seq_order_id seq_id, uint32_t batch_id) noexcept + -> uint32_t + { + return seq_round(seq_id) | batch_id; + } + + /** + * Get the state of the batch slot w.r.t. the given seq_order_id counter. + * This gives the information whether the slot is emptied/filled by another thread and whether + * that thread is ahead or behind the current thread. + * By introducing these future/past flavours of states we solve the ABA problem for atomic updates + * of the ring buffer slots. + */ + static inline auto batch_status(batch_token token, seq_order_id seq_id) -> slot_state + { + /* + The "round" part of the id is just a seq_id without the low bits. + Essentially, we comparing here seq_ids of two threads: the one that wrote to the slot in the + past and the one reads from it now. + + `kSize` determines the number of bits we use for the IO buffer id and for the round id. + */ + auto v = + static_cast(seq_round(token) - seq_round(seq_id)) / static_cast(kSize); + if (v < static_cast(slot_state::kFullBusy)) { RAFT_FAIL("Invalid batch state %d", v); } + if (v < static_cast(slot_state::kEmptyBusy)) { + return static_cast(std::min(v, static_cast(slot_state::kFullPast))); + } + return static_cast(std::min(v, static_cast(slot_state::kEmptyPast))); + } + + private: + alignas(kCacheLineBytes) cuda::std::atomic tail_{}; + alignas(kCacheLineBytes) cuda::std::atomic head_{}; + + alignas(kCacheLineBytes) + raft::pinned_vector, uint32_t> tokens_; + raft::pinned_vector, uint32_t> rem_time_us_; + std::vector> dispatch_sequence_id_; + std::optional, uint32_t>> + batch_sizes_; + + /* [Note: cache-friendly indexing] + To avoid false sharing, the queue pushes and pops values not sequentially, but with an + increment that is larger than the cache line size. + Hence we introduce the `kCounterIncrement > kCacheLineBytes`. 
+ However, to make sure all indices are used, we choose the increment to be coprime with the + buffer size. We also require that the buffer size is a power-of-two for two reasons: + 1) Fast modulus operation - reduces to binary `and` (with `kCounterLocMask`). + 2) Easy to ensure GCD(kCounterIncrement, kSize) == 1 by construction + (see the definition below). + */ + static constexpr uint32_t kElemsPerCacheLine = + raft::div_rounding_up_safe(kCacheLineBytes, kMinElemSize); + static constexpr uint32_t kCounterIncrement = raft::bound_by_power_of_two(kElemsPerCacheLine) + 1; + static constexpr uint32_t kCounterLocMask = kSize - 1; + // These props hold by design, but we add them here as a documentation and a sanity check. + static_assert( + kCounterIncrement * kMinElemSize >= kCacheLineBytes, + "The counter increment should be larger than the cache line size to avoid false sharing."); + static_assert( + std::gcd(kCounterIncrement, kSize) == 1, + "The counter increment and the size must be coprime to allow using all of the queue slots."); + /** Map the sequential index onto cache-friendly strided index. */ + static constexpr inline auto cache_friendly_idx(uint32_t source_idx) noexcept -> uint32_t + { + return (source_idx * kCounterIncrement) & kCounterLocMask; + } + + /** The "round": the number of times the queue counter went over the whole ring buffer. */ + static constexpr inline auto seq_round(seq_order_id id) noexcept -> uint32_t + { + return id.value & ~kCounterLocMask; + } + + /** The "round": the number of times the queue counter went over the whole ring buffer. */ + static constexpr inline auto seq_round(batch_token token) noexcept -> uint32_t + { + return token.id() & ~kCounterLocMask; + } +}; + +template +struct alignas(kCacheLineBytes) request_pointers { + /** + * A pointer to `dim` values of a single query (input). + * + * Serves as a synchronization point between the CPU thread (producer) and a GPU block in the + * `gather_inputs` kernel (consumer). + */ + cuda::atomic query{nullptr}; + /** A pointer to `k` nearest neighbors (output) */ + IdxT* neighbors{nullptr}; + /** A pointer to distances of `k` nearest neighbors (output) */ + float* distances{nullptr}; +}; + +/** + * Check the current timestamp at the moment of construction and repeatedly compare the elapsed time + * to the timeout value provided by the host (passed via an atomic). + * + * This is used in the gather inputs kernel to make it stop waiting for new queries in a batch + * once the deadline is reached. + */ +struct gpu_time_keeper { + /** + * @param[in] cpu_provided_remaining_time_us + * a pointer to a shared atomic, represent the remaining waiting time in microseconds. + * Note, the remaining time is updated atomically by each participating host thread in their + * "private coordinate systems". That's ok, we don't expect a single reference time for all host + * and device threads. + * We tolerate the errors coming from the time difference between the host thread writing their + * remaining waiting time and the GPU thread reading that value. + */ + RAFT_DEVICE_INLINE_FUNCTION explicit gpu_time_keeper( + cuda::atomic* cpu_provided_remaining_time_us) + : cpu_provided_remaining_time_us_{cpu_provided_remaining_time_us} + { + update_timestamp(); + } + + /** + * Check whether the deadline is not reached yet: + * 1) Compare the internal clock against the last-read deadline value + * 2) Read the deadline value from the host-visible atomic and check the internal clock again. 
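A quick standalone check of the cache-friendly indexing note above: because the buffer size is a power of two and the increment is odd (hence coprime with the size), the strided mapping is a bijection, so consecutive pushes land on different cache lines while every slot still gets used. The constants mirror the defaults (64-byte cache lines, 4-byte elements) but are otherwise arbitrary.

#include <bitset>
#include <cstdint>
#include <iostream>

int main()
{
  constexpr uint32_t kSize = 256;          // ring buffer size: a power of two
  constexpr uint32_t kIncrement = 16 + 1;  // > elements per cache line; odd => coprime with kSize
  std::bitset<kSize> visited;
  for (uint32_t i = 0; i < kSize; i++) {
    // The same mapping as cache_friendly_idx: multiply by the stride, mask by the size.
    visited.set((i * kIncrement) & (kSize - 1));
  }
  std::cout << "every slot used exactly once: " << std::boolalpha << visited.all() << "\n";  // true
  return 0;
}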
+ */ + RAFT_DEVICE_INLINE_FUNCTION auto has_time() noexcept -> bool + { + if (timeout) { return false; } + update_local_remaining_time(); + if (local_remaining_time_us_ <= 0) { + timeout = true; + return false; + } + update_cpu_provided_remaining_time(); + if (local_remaining_time_us_ <= 0) { + timeout = true; + return false; + } + return true; + } + + private: + cuda::atomic* cpu_provided_remaining_time_us_; + uint64_t timestamp_ns_ = 0; + int32_t local_remaining_time_us_ = std::numeric_limits::max(); + bool timeout = false; + + RAFT_DEVICE_INLINE_FUNCTION void update_timestamp() noexcept + { + asm volatile("mov.u64 %0, %%globaltimer;" : "=l"(timestamp_ns_)); + } + + RAFT_DEVICE_INLINE_FUNCTION void update_local_remaining_time() noexcept + { + auto prev_timestamp = timestamp_ns_; + update_timestamp(); + // subtract the time passed since the last check + // (assuming local time is updated every time timestamp is read) + local_remaining_time_us_ -= static_cast((timestamp_ns_ - prev_timestamp) / 1000ull); + } + + RAFT_DEVICE_INLINE_FUNCTION void update_cpu_provided_remaining_time() noexcept + { + local_remaining_time_us_ = + std::min(local_remaining_time_us_, + cpu_provided_remaining_time_us_->load(cuda::std::memory_order_relaxed)); + } +}; + +/** + * Copy the queries from the submitted pointers to the batch store, one query per block. + * Upon completion of this kernel, the submitted queries are all in the contiguous buffer + * `batch_queries`. + * + * Block size: (n, 1, 1) any number of threads copying a single row of data. + * Grid size: (max_batch_size, 1, 1) - one block per query + * + * Note, we view the incoming queries and the batch as going through multiple stages: + * 1) A host thread "commits" a query: it reserves a slot for the query in the batch and promises + * to fill-in the corresponding query pointer. + * 2) A host thread "submits" the query: it fills-in the pointer to the query data in the reserved + * slot. + * 3) This kernel copies the query data to the contiguous query buffer owned by the batch. + * + * The batch is "fully committed" when the number of committed queries reaches the maximum batch + * size (all slots are reserved). Committing, submitting, and copying of the queries is somewhat + * overlapped among multiple host and device threads. Only the copying happens in a CUDA stream in + * this kernel, and the upstream search is dispatched right after this kernel (in the same stream). + * + */ +template +RAFT_KERNEL gather_inputs( + raft::device_matrix_view batch_queries, + raft::pinned_vector_view, uint32_t> request_ptrs, + /* The remaining time may be updated on the host side: a thread with a tighter deadline may reduce + it (but not increase). */ + cuda::atomic* remaining_time_us, + /* The token contains the current number of queries committed and is cleared in this kernel. */ + cuda::atomic* batch_token_ptr, + /* The host-visible batch size counter (used in `conservative_dispatch`). */ + cuda::atomic* batch_size_out, + /** + * The token value considered empty depends on the round over the ring buffer + * (which is defined by the seq_order_id) + */ + batch_token empty_token_value, + /** + * The counter is used to find the last CTA to finish and to share the batch size with the + * scatter_inputs kernel. 
+ */ + cuda::atomic* kernel_progress_counter) +{ + const uint32_t query_id = blockIdx.x; + __shared__ const T* query_ptr; + + if (threadIdx.x == 0) { + query_ptr = nullptr; + + // NB: we have to read/write to `batch_token_ptr`, `bs_committed`, and `batch_fully_committed` + // using volatile assembly ops, because otherwise the compiler seems to fail to understand that + // this is the same location in memory. The order of reads in writes here is extremely + // important, as it involves multiple host and device threads (the host threads do RMW atomic + // increments on the commit counter). + volatile uint32_t* bs_committed = + reinterpret_cast(batch_token_ptr) + 1 - CUVS_SYSTEM_LITTLE_ENDIAN; + volatile uint8_t* batch_fully_committed = + reinterpret_cast(bs_committed) + (CUVS_SYSTEM_LITTLE_ENDIAN * 3); + + gpu_time_keeper runtime{remaining_time_us}; + bool committed = false; // if the query is committed, we have to wait for it to arrive + auto& request_query_ptr = request_ptrs(query_id).query; + while (true) { + query_ptr = request_query_ptr.load(cuda::std::memory_order_acquire); + if (query_ptr != nullptr) { + // The query is submitted to this block's slot; erase the pointer buffer for future use and + // exit the loop. + request_query_ptr.store(nullptr, cuda::std::memory_order_relaxed); + break; + } + // The query hasn't been submitted, but is already committed; other checks may be skipped + if (committed) { continue; } + // Check if the query is committed + uint32_t committed_count; + asm volatile("ld.volatile.global.u32 %0, [%1];" + : "=r"(committed_count) + : "l"(bs_committed) + : "memory"); + committed = (committed_count & 0x00ffffff) > query_id; + if (committed) { continue; } + // If the query is not committed, but the batch is past the deadline, we exit without copying + // the query + if (committed_count > 0x00ffffff) { break; } + // The query hasn't been submitted yet; check if we're past the deadline + if (runtime.has_time()) { continue; } + // Otherwise, let the others know time is out + // Set the highest byte of the commit counter to 1 (thus avoiding RMW atomic) + // This prevents any more CPU threads from committing to this batch. 
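The "set the highest byte instead of doing an RMW atomic" trick performed by the inline assembly that follows can be reproduced on the host in a few lines. This sketch assumes a little-endian machine; the kernel derives the byte offset from CUVS_SYSTEM_LITTLE_ENDIAN instead of hard-coding it.

#include <cstdint>
#include <cstdio>

int main()
{
  uint32_t commit_counter = 42;  // 42 queries committed so far
  // Poke the highest byte: the value now exceeds 0x00ffffff, which every reader treats as
  // "no more commits accepted", while the committed count survives in the low 24 bits.
  auto* bytes = reinterpret_cast<uint8_t*>(&commit_counter);
  bytes[3] = 1;  // highest byte on a little-endian machine
  std::printf("closed: %s, committed: %u\n",
              commit_counter > 0x00ffffffu ? "yes" : "no",
              commit_counter & 0x00ffffffu);  // closed: yes, committed: 42
  return 0;
}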
+ asm volatile("st.volatile.global.u8 [%0], %1;" + : + : "l"(batch_fully_committed), "r"(1) + : "memory"); + asm volatile("ld.volatile.global.u32 %0, [%1];" + : "=r"(committed_count) + : "l"(bs_committed) + : "memory"); + committed = (committed_count & 0x00ffffff) > query_id; + if (committed) { continue; } + break; + } + auto progress = kernel_progress_counter->fetch_add(1, cuda::std::memory_order_acq_rel) + 1; + if (progress >= gridDim.x) { + // read the last value of the committed count to know the batch size for sure + uint32_t committed_count; + asm volatile("ld.volatile.global.u32 %0, [%1];" + : "=r"(committed_count) + : "l"(bs_committed) + : "memory"); + committed_count &= 0x00ffffff; // Clear the timeout bit + if (batch_size_out != nullptr) { + // Inform the dispatcher about the final batch size if `conservative_dispatch` is enabled + batch_size_out->store(committed_count, cuda::std::memory_order_relaxed); + } + // store the batch size in the progress counter, so we can read it in the scatter kernel + kernel_progress_counter->store(committed_count, cuda::std::memory_order_relaxed); + // Clear the batch token slot, so it can be re-used by others + asm volatile("st.volatile.global.u64 [%0], %1;" + : + : "l"(reinterpret_cast(batch_token_ptr)), + "l"(reinterpret_cast(empty_token_value)) + : "memory"); + } + } + // The block waits till the leading thread gets the query pointer + cooperative_groups::this_thread_block().sync(); + auto query_ptr_local = query_ptr; + if (query_ptr_local == nullptr) { return; } + // block-wide copy input query + auto dim = batch_queries.extent(1); + for (uint32_t i = threadIdx.x; i < dim; i += blockDim.x) { + batch_queries(query_id, i) = query_ptr_local[i]; + } +} + +/** Copy the results of the search back to the requesters. */ +template +RAFT_KERNEL scatter_outputs( + raft::pinned_vector_view, uint32_t> request_ptrs, + raft::device_matrix_view batch_neighbors, + raft::device_matrix_view batch_distances, + cuda::atomic* kernel_progress_counter, + cuda::atomic* next_token, + uint32_t batch_id) +{ + __shared__ uint32_t batch_size; + if (threadIdx.x == 0 && threadIdx.y == 0) { + batch_size = kernel_progress_counter->exchange(0, cuda::std::memory_order_relaxed); + } + // Copy output + cooperative_groups::this_thread_block().sync(); + auto k = batch_neighbors.extent(1); + for (uint32_t i = threadIdx.y; i < batch_size; i += blockDim.y) { + auto* request_neighbors = request_ptrs(i).neighbors; + auto* request_distances = request_ptrs(i).distances; + for (uint32_t j = threadIdx.x; j < k; j += blockDim.x) { + request_neighbors[j] = batch_neighbors(i, j); + request_distances[j] = batch_distances(i, j); + } + } + // Clear the batch state after all threads copied the data, so the batch can be reused + cuda::atomic_thread_fence(cuda::std::memory_order_release, cuda::thread_scope_system); + cooperative_groups::this_thread_block().sync(); + if (threadIdx.x != 0 || threadIdx.y != 0) { return; } + reinterpret_cast*>( + &reinterpret_cast(next_token)->id()) + ->store(batch_id, cuda::std::memory_order_relaxed); +} + +/** + * Batch runner is shared among the users of the `dynamic_batching::index` (i.e. the index can be + * copied, but the copies hold shared pointers to a single batch runner). + * + * Constructor and destructor of this class do not need to be thread-safe, as their execution is + * guaranteed to happen in one thread by the holding shared pointer. + * + * The search function must be thread-safe. 
We only have to pay attention to the `mutable` members + * though, because the function is marked const. + */ +template +class batch_runner { + public: + constexpr static uint32_t kMaxNumQueues = 256; + + using batch_queue = batch_queue_t; + using seq_order_id = typename batch_queue::seq_order_id; + + // Save the parameters and the upstream batched search function to invoke + template + batch_runner(const raft::resources& res, + const dynamic_batching::index_params& params, + const Upstream& upstream_index, + const typename Upstream::search_params_type& upstream_params, + upstream_search_type_const* upstream_search, + const cuvs::neighbors::filtering::base_filter* sample_filter) + : res_{res}, + upstream_search_{[&upstream_index, upstream_search, upstream_params, sample_filter]( + raft::resources const& res, + raft::device_matrix_view queries, + raft::device_matrix_view neighbors, + raft::device_matrix_view distances) { + /* Note: passing sample_filter by pointer + + Ideally, dynamic batching would capture the filter by value. Unfortunately, one cannot use + the copy constructor of the `base_filter` (it would erase the actual filter type). + Therefore, we can only pass the filter by pointer or reference and require the user to keep + the filter alive for the lifetime of the dynamic batching index. + This, however, may lead to a segfault when the user doesn't provide the filter argument and + the argument is passed by reference: the lifetime of the none_sample_filter default argument + is limited to the search function call, so it is destroyed while the dynamic batching index + is still alive. + Hence the solution is to pass the filter by pointer and default it to nullptr. + */ + if (sample_filter == nullptr) { + using base_filter_type = cuvs::neighbors::filtering::base_filter; + const auto none_filter = cuvs::neighbors::filtering::none_sample_filter{}; + return upstream_search(res, + upstream_params, + upstream_index, + queries, + neighbors, + distances, + static_cast(none_filter)); + + } else { + return upstream_search( + res, upstream_params, upstream_index, queries, neighbors, distances, *sample_filter); + } + }}, + k_{uint32_t(params.k)}, + dim_{uint32_t(upstream_index.dim())}, + max_batch_size_{uint32_t(params.max_batch_size)}, + n_queues_{uint32_t(params.n_queues)}, + batch_queue_{res_, params.conservative_dispatch}, + completion_events_(n_queues_), + input_extents_{n_queues_, max_batch_size_, dim_}, + output_extents_{n_queues_, max_batch_size_, k_}, + queries_{raft::make_device_mdarray(res_, input_extents_)}, + neighbors_{raft::make_device_mdarray(res_, output_extents_)}, + distances_{raft::make_device_mdarray(res_, output_extents_)}, + kernel_progress_counters_{ + raft::make_device_vector>( + res_, n_queues_)}, + request_ptrs_{raft::make_pinned_matrix, uint32_t>( + res_, n_queues_, max_batch_size_)} + { + RAFT_CUDA_TRY(cudaMemsetAsync( + kernel_progress_counters_.data_handle(), + 0, + sizeof(*kernel_progress_counters_.data_handle()) * kernel_progress_counters_.size(), + raft::resource::get_cuda_stream(res_))); + // Make sure to initialize the atomic values in the batch_state structs. 
+ for (uint32_t i = 0; i < n_queues_; i++) { + auto seq_id = batch_queue_.push(); + batch_queue_.token(seq_id).store(batch_token{batch_queue::make_seq_batch_id(seq_id, i)}); + // Make sure to initialize query pointers, because they are used for synchronization + for (uint32_t j = 0; j < max_batch_size_; j++) { + new (&request_ptrs_(i, j)) request_pointers{}; + } + } + } + + // A workaround for algos, which have non-const `index` type in their arguments + template + batch_runner(const raft::resources& res, + const dynamic_batching::index_params& params, + const Upstream& upstream_index, + const typename Upstream::search_params_type& upstream_params, + upstream_search_type* upstream_search, + const cuvs::neighbors::filtering::base_filter* sample_filter) + : batch_runner{ + res, + params, + upstream_index, + upstream_params, + reinterpret_cast*>(upstream_search), + sample_filter} + { + } + + void search(raft::resources const& res, + cuvs::neighbors::dynamic_batching::search_params const& params, + raft::device_matrix_view queries, + raft::device_matrix_view neighbors, + raft::device_matrix_view distances) const + { + uint32_t n_queries = queries.extent(0); + if (n_queries >= max_batch_size_) { + return upstream_search_(res, queries, neighbors, distances); + } + + if (neighbors.extent(1) != int64_t(k_)) { + // TODO: the check can be relaxed to `neighbors.extent(1) > int64_t(k_)`; + // this, however, would require an extra bounds check per-query in the scatter kernel. + RAFT_LOG_WARN( + "The requested number of neighbors (%zd) doesn't match the configured " + "dynamic_batching::index_params::k (%u); dynamic batching is disabled for the request.", + neighbors.extent(1), + k_); + return upstream_search_(res, queries, neighbors, distances); + } + + auto deadline = std::chrono::system_clock::now() + + std::chrono::nanoseconds(size_t(params.dispatch_timeout_ms * 1000000.0)); + + int64_t local_io_offset = 0; + batch_token batch_token_observed{0}; + local_waiter to_commit{std::chrono::nanoseconds(size_t(params.dispatch_timeout_ms * 3e5)), + local_waiter::kNonSleepIterations}; + while (true) { + const auto seq_id = batch_queue_.head(); + const auto commit_result = try_commit(seq_id, n_queries); + // The bool (busy or not) returned if no queries were committed: + if (std::holds_alternative(commit_result)) { + // Pause if the system is busy + // (otherwise the progress is guaranteed due to update of the head counter) + if (std::get(commit_result)) { to_commit.wait(); } + continue; // Try to get a new batch token + } + batch_token_observed = std::get(std::get<0>(commit_result)); + const auto queries_committed = std::get(std::get<0>(commit_result)); + const auto batch_offset = batch_token_observed.size_committed(); + auto& batch_token_ref = batch_queue_.token(seq_id); + auto& rem_time_us_ref = batch_queue_.rem_time_us(seq_id); + auto& dispatch_sequence_id_ref = batch_queue_.dispatch_sequence_id(seq_id); + auto* batch_size_ptr = batch_queue_.batch_size(seq_id); + // sleep for 1/10 of deadline time or more + // (if couldn't get the value in the first few iterations). + local_waiter till_full{std::chrono::nanoseconds(size_t(params.dispatch_timeout_ms * 1e5)), + batch_queue_.niceness(seq_id)}; + while (batch_queue::batch_status(batch_token_observed, seq_id) != slot_state::kFull) { + /* Note: waiting for batch IO buffers + The CPU threads can commit to the incoming batches in the queue in advance (this happens in + try_commit). 
+ In this loop, a thread waits for the batch IO buffer to be released by a running search on + the GPU side (scatter_outputs kernel). Hence, this loop is engaged only if all buffers are + currently used, which suggests that the GPU is busy (or there's not enough IO buffers). + This also means the current search is not likely to meet the deadline set by the user. + + The scatter kernel returns its buffer id into an acquired slot in the batch queue; in this + loop we wait for that id to arrive. + + Generally, we want to waste as little as possible CPU cycles here to let other threads wait + on dispatch_sequence_id_ref below more efficiently. At the same time, we shouldn't use + `.wait()` here, because `.notify_all()` would have to come from GPU. + */ + till_full.wait(); + batch_token_observed = batch_token_ref.load(cuda::std::memory_order_acquire); + } + // Whether this thread is responsible for dispatching the batch. + bool is_dispatcher = batch_offset == 0; + auto stream = raft::resource::get_cuda_stream(res); + auto batch_id = batch_queue::batch_id(batch_token_observed); + auto request_ptrs = slice_2d(batch_id, request_ptrs_); + + if (is_dispatcher) { + // Conservatively initialize the remaining time + // TODO (achirkin): this initialization may happen after the other requesters update the + // time and thus erase their deadlines. + rem_time_us_ref.store(static_cast(params.dispatch_timeout_ms * 1000), + cuda::std::memory_order_relaxed); + // run the gather kernel before submitting the data to reduce the latency + gather_inputs<<>>( + slice_3d(batch_id, queries_), + request_ptrs, + &rem_time_us_ref, + &batch_token_ref, + batch_size_ptr, + // This indicates the empty token slot, which can only be used in the following round + batch_queue::make_empty_token(seq_id), + kernel_progress_counters_.data_handle() + batch_id); + } + + // *** Set the pointers to queries, neighbors, distances - query-by-query + for (uint32_t i = 0; i < queries_committed; i++) { + const auto o = local_io_offset + i; + auto& ptrs = request_ptrs(batch_offset + i); + ptrs.neighbors = neighbors.data_handle() + o * k_; + ptrs.distances = distances.data_handle() + o * k_; + ptrs.query.store(queries.data_handle() + o * dim_, cuda::std::memory_order_release); + } + + // Submit estimated remaining time + { + auto rem_time_us = static_cast( + std::max(0, (deadline - std::chrono::system_clock::now()).count()) / 1000); + rem_time_us_ref.fetch_min(rem_time_us, cuda::std::memory_order_relaxed); + } + + if (is_dispatcher) { + uint32_t batch_size = max_batch_size_; + if (batch_size_ptr != nullptr) { + // Block until the real batch size is available if conservative dispatch is used. 
+ local_waiter for_dispatch{ + std::chrono::nanoseconds(size_t(params.dispatch_timeout_ms * 1e5))}; + batch_size = batch_size_ptr->load(cuda::std::memory_order_relaxed); + while (batch_size == 0) { + for_dispatch.wait(); + batch_size = batch_size_ptr->load(cuda::std::memory_order_relaxed); + } + batch_size_ptr->store(0, cuda::std::memory_order_relaxed); + } + auto batch_neighbors = slice_3d(batch_id, neighbors_, batch_size); + auto batch_distances = slice_3d(batch_id, distances_, batch_size); + upstream_search_( + res, slice_3d(batch_id, queries_, batch_size), batch_neighbors, batch_distances); + auto next_seq_id = batch_queue_.push(); + auto& next_token_ref = batch_queue_.token(next_seq_id); + // next_batch_token); + auto bs = dim3(128, 8, 1); + scatter_outputs + <<<1, bs, 0, stream>>>(request_ptrs, + batch_neighbors, + batch_distances, + kernel_progress_counters_.data_handle() + batch_id, + &next_token_ref, + batch_queue::make_seq_batch_id(next_seq_id, batch_id)); + RAFT_CUDA_TRY(cudaEventRecord(completion_events_[batch_id].value(), stream)); + dispatch_sequence_id_ref.store(seq_id.value, cuda::std::memory_order_release); + dispatch_sequence_id_ref.notify_all(); + + } else { + // Wait till the dispatch_sequence_id counter is updated, which means the event is recorded + auto dispatched_id_observed = + dispatch_sequence_id_ref.load(cuda::std::memory_order_acquire); + while (static_cast(seq_id.value - dispatched_id_observed) > 0) { + dispatch_sequence_id_ref.wait(dispatched_id_observed, cuda::std::memory_order_relaxed); + dispatched_id_observed = dispatch_sequence_id_ref.load(cuda::std::memory_order_acquire); + } + // Now we can safely record the event + RAFT_CUDA_TRY(cudaStreamWaitEvent(stream, completion_events_[batch_id].value())); + } + + n_queries -= queries_committed; + + if (n_queries == 0) { return; } + // If not all queries were committed, continue in the loop. + // TODO: it could potentially be more efficient to first commit everything and only then + // submit the work/wait for the event + local_io_offset += queries_committed; + to_commit.reset( + local_waiter::kNonSleepIterations); // reset the waiter for the next iteration. + } + } + + private: + raft::resources res_; // Sic! Store by value to copy the resource. + std::function> upstream_search_; + uint32_t k_; + uint32_t dim_; + uint32_t max_batch_size_; + uint32_t n_queues_; + + mutable batch_queue batch_queue_; + std::vector completion_events_; + + using batch_extents = raft::extent_3d; + batch_extents input_extents_; + batch_extents output_extents_; + + mutable raft::device_mdarray queries_; + mutable raft::device_mdarray neighbors_; + mutable raft::device_mdarray distances_; + mutable raft::device_vector> + kernel_progress_counters_; + + mutable raft::pinned_matrix, uint32_t, raft::row_major> request_ptrs_; + + /** + * Try to commit n_queries at most; returns the last observed batch_token (where `size_committed` + * represents offset at which new queries are committed if successful), the number of committed + * queries, or whether the ring buffer appears to be busy (on unsuccessful commit). + */ + auto try_commit(seq_order_id seq_id, uint32_t n_queries) const + -> std::variant, bool> + { + auto& batch_token_ref = batch_queue_.token(seq_id); + batch_token batch_token_observed = batch_token_ref.load(cuda::std::memory_order_relaxed); + batch_token batch_token_updated; + slot_state token_status; + do { + // The interpretation of the token status depends on the current seq_order_id and a similar + // counter in the token. 
This is to prevent conflicts when too many parallel requests wrap + // over the whole ring buffer (batch_queue_t). + token_status = batch_queue::batch_status(batch_token_observed, seq_id); + // Busy status means the current thread is a whole ring buffer ahead of the token. + // The thread should wait for the rest of the system. + if (token_status == slot_state::kFullBusy || token_status == slot_state::kEmptyBusy) { + return true; + } + // This branch checks if the token was recently filled or dispatched. + // This means the head counter of the ring buffer is slightly outdated. + if (token_status == slot_state::kEmptyPast || token_status == slot_state::kFullPast || + batch_token_observed.size_committed() >= max_batch_size_) { + batch_queue_.pop(seq_id); + return false; + } + batch_token_updated = batch_token_observed; + batch_token_updated.size_committed() = + std::min(batch_token_observed.size_committed() + n_queries, max_batch_size_); + } while (!batch_token_ref.compare_exchange_weak(batch_token_observed, + batch_token_updated, + cuda::std::memory_order_acq_rel, + cuda::std::memory_order_relaxed)); + if (batch_token_updated.size_committed() >= max_batch_size_) { + // The batch is already full, let's try to pop it from the queue + // (if nobody has done so already) + batch_queue_.pop(seq_id); + } + return std::make_tuple( + batch_token_observed, + batch_token_updated.size_committed() - batch_token_observed.size_committed()); + } +}; + +} // namespace cuvs::neighbors::dynamic_batching::detail diff --git a/cpp/src/neighbors/dynamic_batching.cu b/cpp/src/neighbors/dynamic_batching.cu new file mode 100644 index 000000000..6be70353b --- /dev/null +++ b/cpp/src/neighbors/dynamic_batching.cu @@ -0,0 +1,91 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "detail/dynamic_batching.cuh" + +#include +#include +#include + +#include +#include + +namespace cuvs::neighbors::dynamic_batching { + +// NB: the (template) index parameter should be the last; it may contain the spaces and so split +// into multiple preprocessor token. Then it is consumed as __VA_ARGS__ +// +#define CUVS_INST_DYNAMIC_BATCHING_INDEX(T, IdxT, Namespace, ...) 
\ + template <> \ + template <> \ + index::index( \ + const raft::resources& res, \ + const cuvs::neighbors::dynamic_batching::index_params& params, \ + const Namespace ::__VA_ARGS__& upstream_index, \ + const typename Namespace ::__VA_ARGS__::search_params_type& upstream_params, \ + const cuvs::neighbors::filtering::base_filter* sample_filter) \ + : runner{new detail::batch_runner( \ + res, params, upstream_index, upstream_params, Namespace ::search, sample_filter)} \ + { \ + } + +#define CUVS_INST_DYNAMIC_BATCHING_SEARCH(T, IdxT) \ + void search(raft::resources const& res, \ + cuvs::neighbors::dynamic_batching::search_params const& params, \ + cuvs::neighbors::dynamic_batching::index const& index, \ + raft::device_matrix_view queries, \ + raft::device_matrix_view neighbors, \ + raft::device_matrix_view distances) \ + { \ + return index.runner->search(res, params, queries, neighbors, distances); \ + } + +CUVS_INST_DYNAMIC_BATCHING_INDEX(float, uint32_t, cuvs::neighbors::cagra, index); +CUVS_INST_DYNAMIC_BATCHING_INDEX(half, uint32_t, cuvs::neighbors::cagra, index); +CUVS_INST_DYNAMIC_BATCHING_INDEX(int8_t, uint32_t, cuvs::neighbors::cagra, index); +CUVS_INST_DYNAMIC_BATCHING_INDEX(uint8_t, + uint32_t, + cuvs::neighbors::cagra, + index); + +CUVS_INST_DYNAMIC_BATCHING_INDEX(float, int64_t, cuvs::neighbors::ivf_pq, index); +CUVS_INST_DYNAMIC_BATCHING_INDEX(half, int64_t, cuvs::neighbors::ivf_pq, index); +CUVS_INST_DYNAMIC_BATCHING_INDEX(int8_t, int64_t, cuvs::neighbors::ivf_pq, index); +CUVS_INST_DYNAMIC_BATCHING_INDEX(uint8_t, int64_t, cuvs::neighbors::ivf_pq, index); + +CUVS_INST_DYNAMIC_BATCHING_INDEX(float, int64_t, cuvs::neighbors::ivf_flat, index); +CUVS_INST_DYNAMIC_BATCHING_INDEX(int8_t, + int64_t, + cuvs::neighbors::ivf_flat, + index); +CUVS_INST_DYNAMIC_BATCHING_INDEX(uint8_t, + int64_t, + cuvs::neighbors::ivf_flat, + index); + +CUVS_INST_DYNAMIC_BATCHING_SEARCH(float, int64_t); +CUVS_INST_DYNAMIC_BATCHING_SEARCH(half, int64_t); +CUVS_INST_DYNAMIC_BATCHING_SEARCH(int8_t, int64_t); +CUVS_INST_DYNAMIC_BATCHING_SEARCH(uint8_t, int64_t); +CUVS_INST_DYNAMIC_BATCHING_SEARCH(float, uint32_t); // uint32_t index type is needed for CAGRA +CUVS_INST_DYNAMIC_BATCHING_SEARCH(half, uint32_t); +CUVS_INST_DYNAMIC_BATCHING_SEARCH(int8_t, uint32_t); +CUVS_INST_DYNAMIC_BATCHING_SEARCH(uint8_t, uint32_t); + +#undef CUVS_INST_DYNAMIC_BATCHING_INDEX +#undef CUVS_INST_DYNAMIC_BATCHING_SEARCH + +} // namespace cuvs::neighbors::dynamic_batching diff --git a/cpp/src/preprocessing/quantize/detail/scalar.cuh b/cpp/src/preprocessing/quantize/detail/scalar.cuh new file mode 100644 index 000000000..fc132eb7f --- /dev/null +++ b/cpp/src/preprocessing/quantize/detail/scalar.cuh @@ -0,0 +1,227 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace cuvs::preprocessing::quantize::detail { + +template +_RAFT_HOST_DEVICE bool fp_lt(const T& a, const T& b) +{ + return a < b; +} + +template <> +_RAFT_HOST_DEVICE bool fp_lt(const half& a, const half& b) +{ + return static_cast(a) < static_cast(b); +} + +template +struct quantize_op { + const T min_; + const T max_; + const QuantI q_type_min_ = std::numeric_limits::min(); + const QuantI q_type_max_ = std::numeric_limits::max(); + const TempT scalar_; + const TempT offset_; + + constexpr explicit quantize_op(T min, T max) + : min_(min), + max_(max), + scalar_(static_cast(max_) > static_cast(min_) + ? ((static_cast(q_type_max_) - static_cast(q_type_min_)) / + (static_cast(max_) - static_cast(min_))) + : static_cast(1)), + offset_(static_cast(q_type_min_) - static_cast(min_) * scalar_) + { + } + + constexpr RAFT_INLINE_FUNCTION QuantI operator()(const T& x) const + { + if (!fp_lt(min_, x)) return q_type_min_; + if (!fp_lt(x, max_)) return q_type_max_; + return static_cast(lroundf(scalar_ * static_cast(x) + offset_)); + } + + constexpr RAFT_INLINE_FUNCTION T operator()(const QuantI& x) const + { + return static_cast((static_cast(x) - offset_) / scalar_); + } +}; + +template +std::tuple quantile_min_max(raft::resources const& res, + raft::device_matrix_view dataset, + double quantile) +{ + // settings for quantile approximation + constexpr size_t max_num_samples = 1000000; + constexpr int seed = 137; + + cudaStream_t stream = raft::resource::get_cuda_stream(res); + + // select subsample + raft::random::RngState rng(seed); + size_t n_elements = dataset.extent(0) * dataset.extent(1); + size_t subset_size = std::min(max_num_samples, n_elements); + auto subset = raft::make_device_vector(res, subset_size); + auto dataset_view = raft::make_device_vector_view(dataset.data_handle(), n_elements); + raft::random::sample_without_replacement( + res, rng, dataset_view, std::nullopt, subset.view(), std::nullopt); + + // quantile / sort and pick for now + thrust::sort(raft::resource::get_thrust_policy(res), + subset.data_handle(), + subset.data_handle() + subset_size); + + double half_quantile_pos = (0.5 + 0.5 * quantile) * subset_size; + int pos_max = std::ceil(half_quantile_pos) - 1; + int pos_min = subset_size - pos_max - 1; + + T minmax_h[2]; + raft::update_host(&(minmax_h[0]), subset.data_handle() + pos_min, 1, stream); + raft::update_host(&(minmax_h[1]), subset.data_handle() + pos_max, 1, stream); + raft::resource::sync_stream(res); + + return {minmax_h[0], minmax_h[1]}; +} + +template +std::tuple quantile_min_max(raft::resources const& res, + raft::host_matrix_view dataset, + double quantile) +{ + // settings for quantile approximation + constexpr size_t max_num_samples = 1000000; + constexpr int seed = 137; + + // select subsample + std::mt19937 rng(seed); + size_t n_elements = dataset.extent(0) * dataset.extent(1); + size_t subset_size = std::min(max_num_samples, n_elements); + std::vector subset; + std::sample(dataset.data_handle(), + dataset.data_handle() + n_elements, + std::back_inserter(subset), + subset_size, + rng); + + // quantile / sort and pick for now + thrust::sort(thrust::omp::par, subset.data(), subset.data() + subset_size, fp_lt); + double half_quantile_pos = (0.5 + 0.5 * quantile) * subset_size; + int pos_max = std::ceil(half_quantile_pos) - 1; + int pos_min = subset_size - pos_max - 1; + + return {subset[pos_min], subset[pos_max]}; +} + +template 
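The arithmetic inside `quantize_op` is a plain affine map from the trained [min, max] range onto the int8 range, with saturation outside it. The host-side re-derivation below (float to int8_t, made-up values) shows the forward and inverse transforms; it is an illustration, not the library implementation.

#include <cmath>
#include <cstdint>
#include <cstdio>
#include <initializer_list>

int main()
{
  // Trained range (in the library this comes from quantile_min_max) and the int8 target range.
  const float min = -1.5f, max = 2.5f;
  const float qmin = -128.0f, qmax = 127.0f;
  const float scalar = (qmax - qmin) / (max - min);  // same formula as quantize_op
  const float offset = qmin - min * scalar;

  auto quantize = [&](float x) -> int8_t {
    if (x <= min) { return static_cast<int8_t>(-128); }  // saturate below the range
    if (x >= max) { return static_cast<int8_t>(127); }   // saturate above the range
    return static_cast<int8_t>(std::lround(scalar * x + offset));
  };
  auto dequantize = [&](int8_t q) -> float { return (static_cast<float>(q) - offset) / scalar; };

  for (float x : {-2.0f, -1.5f, 0.0f, 1.0f, 2.5f, 3.0f}) {
    const int8_t q = quantize(x);
    std::printf("x = %5.2f  ->  q = %4d  ->  x' = %6.3f\n", x, q, dequantize(q));
  }
  return 0;
}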
+cuvs::preprocessing::quantize::scalar::quantizer train( + raft::resources const& res, + const cuvs::preprocessing::quantize::scalar::params params, + raft::device_matrix_view dataset) +{ + RAFT_EXPECTS(params.quantile > 0.0 && params.quantile <= 1.0, + "quantile for scalar quantization needs to be within (0, 1] but is %f", + params.quantile); + + auto [min, max] = detail::quantile_min_max(res, dataset, params.quantile); + + RAFT_LOG_DEBUG("quantizer train min=%lf max=%lf.", double(min), double(max)); + + return cuvs::preprocessing::quantize::scalar::quantizer{min, max}; +} + +template +cuvs::preprocessing::quantize::scalar::quantizer train( + raft::resources const& res, + const cuvs::preprocessing::quantize::scalar::params params, + raft::host_matrix_view dataset) +{ + RAFT_EXPECTS(params.quantile > 0.0 && params.quantile <= 1.0, + "quantile for scalar quantization needs to be within (0, 1] but is %f", + params.quantile); + + auto [min, max] = detail::quantile_min_max(res, dataset, params.quantile); + + RAFT_LOG_DEBUG("quantizer train min=%lf max=%lf.", double(min), double(max)); + + return cuvs::preprocessing::quantize::scalar::quantizer{min, max}; +} + +template +void transform(raft::resources const& res, + const cuvs::preprocessing::quantize::scalar::quantizer& quantizer, + raft::device_matrix_view dataset, + raft::device_matrix_view out) +{ + cudaStream_t stream = raft::resource::get_cuda_stream(res); + + raft::linalg::map(res, out, quantize_op(quantizer.min_, quantizer.max_), dataset); +} + +template +void transform(raft::resources const& res, + const cuvs::preprocessing::quantize::scalar::quantizer& quantizer, + raft::host_matrix_view dataset, + raft::host_matrix_view out) +{ + auto main_op = quantize_op(quantizer.min_, quantizer.max_); + size_t n_elements = dataset.extent(0) * dataset.extent(1); + +#pragma omp parallel for + for (size_t i = 0; i < n_elements; ++i) { + out.data_handle()[i] = main_op(dataset.data_handle()[i]); + } +} + +template +void inverse_transform(raft::resources const& res, + const cuvs::preprocessing::quantize::scalar::quantizer& quantizer, + raft::device_matrix_view dataset, + raft::device_matrix_view out) +{ + cudaStream_t stream = raft::resource::get_cuda_stream(res); + + raft::linalg::map(res, out, quantize_op(quantizer.min_, quantizer.max_), dataset); +} + +template +void inverse_transform(raft::resources const& res, + const cuvs::preprocessing::quantize::scalar::quantizer& quantizer, + raft::host_matrix_view dataset, + raft::host_matrix_view out) +{ + auto main_op = quantize_op(quantizer.min_, quantizer.max_); + size_t n_elements = dataset.extent(0) * dataset.extent(1); + +#pragma omp parallel for + for (size_t i = 0; i < n_elements; ++i) { + out.data_handle()[i] = main_op(dataset.data_handle()[i]); + } +} + +} // namespace cuvs::preprocessing::quantize::detail diff --git a/cpp/src/preprocessing/quantize/scalar.cu b/cpp/src/preprocessing/quantize/scalar.cu new file mode 100644 index 000000000..9624ad4fe --- /dev/null +++ b/cpp/src/preprocessing/quantize/scalar.cu @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "./detail/scalar.cuh" + +#include + +namespace cuvs::preprocessing::quantize::scalar { + +#define CUVS_INST_QUANTIZATION(T, QuantI) \ + auto train(raft::resources const& res, \ + const params params, \ + raft::device_matrix_view dataset) \ + ->quantizer \ + { \ + return detail::train(res, params, dataset); \ + } \ + auto train(raft::resources const& res, \ + const params params, \ + raft::host_matrix_view dataset) \ + ->quantizer \ + { \ + return detail::train(res, params, dataset); \ + } \ + void transform(raft::resources const& res, \ + const quantizer& quantizer, \ + raft::device_matrix_view dataset, \ + raft::device_matrix_view out) \ + { \ + detail::transform(res, quantizer, dataset, out); \ + } \ + void transform(raft::resources const& res, \ + const quantizer& quantizer, \ + raft::host_matrix_view dataset, \ + raft::host_matrix_view out) \ + { \ + detail::transform(res, quantizer, dataset, out); \ + } \ + void inverse_transform(raft::resources const& res, \ + const quantizer& quantizer, \ + raft::device_matrix_view dataset, \ + raft::device_matrix_view out) \ + { \ + detail::inverse_transform(res, quantizer, dataset, out); \ + } \ + void inverse_transform(raft::resources const& res, \ + const quantizer& quantizer, \ + raft::host_matrix_view dataset, \ + raft::host_matrix_view out) \ + { \ + detail::inverse_transform(res, quantizer, dataset, out); \ + } \ + template struct quantizer; + +CUVS_INST_QUANTIZATION(double, int8_t); +CUVS_INST_QUANTIZATION(float, int8_t); +CUVS_INST_QUANTIZATION(half, int8_t); + +#undef CUVS_INST_QUANTIZATION + +} // namespace cuvs::preprocessing::quantize::scalar \ No newline at end of file diff --git a/cpp/test/CMakeLists.txt b/cpp/test/CMakeLists.txt index 286d721d7..0ecac6ec2 100644 --- a/cpp/test/CMakeLists.txt +++ b/cpp/test/CMakeLists.txt @@ -175,6 +175,19 @@ if(BUILD_TESTS) 100 ) + ConfigureTest( + NAME + NEIGHBORS_DYNAMIC_BATCHING_TEST + PATH + neighbors/dynamic_batching/test_cagra.cu + neighbors/dynamic_batching/test_ivf_flat.cu + neighbors/dynamic_batching/test_ivf_pq.cu + GPUS + 1 + PERCENT + 100 + ) + if(BUILD_CAGRA_HNSWLIB) ConfigureTest(NAME NEIGHBORS_HNSW_TEST PATH neighbors/hnsw.cu GPUS 1 PERCENT 100) target_link_libraries(NEIGHBORS_HNSW_TEST PRIVATE hnswlib::hnswlib) @@ -213,6 +226,11 @@ if(BUILD_TESTS) PERCENT 100 ) + + ConfigureTest( + NAME PREPROCESSING_TEST PATH preprocessing/scalar_quantization.cu GPUS 1 PERCENT 100 + ) + ConfigureTest( NAME STATS_TEST PATH stats/trustworthiness.cu stats/silhouette_score.cu GPUS 1 PERCENT 100 ) diff --git a/cpp/test/core/c_api.c b/cpp/test/core/c_api.c index a3dae6004..a51824d2b 100644 --- a/cpp/test/core/c_api.c +++ b/cpp/test/core/c_api.c @@ -73,6 +73,15 @@ int main() error = cuvsRMMMemoryResourceReset(); if (error == CUVS_ERROR) { exit(EXIT_FAILURE); } + // Alloc memory on host (pinned) + void* ptr3; + cuvsError_t alloc_error_pinned = cuvsRMMHostAlloc(&ptr3, 1024); + if (alloc_error_pinned == CUVS_ERROR) { exit(EXIT_FAILURE); } + + // Free memory + cuvsError_t free_error_pinned = cuvsRMMHostFree(ptr3, 1024); + if (free_error_pinned == CUVS_ERROR) { 
exit(EXIT_FAILURE); } + // Destroy resources error = cuvsResourcesDestroy(res); if (error == CUVS_ERROR) { exit(EXIT_FAILURE); } diff --git a/cpp/test/neighbors/ann_cagra.cuh b/cpp/test/neighbors/ann_cagra.cuh index 660246c67..8d5701439 100644 --- a/cpp/test/neighbors/ann_cagra.cuh +++ b/cpp/test/neighbors/ann_cagra.cuh @@ -758,11 +758,7 @@ class AnnCagraFilterTest : public ::testing::TestWithParam { search_params.algo = ps.algo; search_params.max_queries = ps.max_queries; search_params.team_size = ps.team_size; - - // TODO: setting search_params.itopk_size here breaks the filter tests, but is required for - // k>1024 skip these tests until fixed - if (ps.k >= 1024) { GTEST_SKIP(); } - // search_params.itopk_size = ps.itopk_size; + search_params.itopk_size = ps.itopk_size; auto database_view = raft::make_device_matrix_view( (const DataT*)database.data(), ps.n_rows, ps.dim); diff --git a/cpp/test/neighbors/dynamic_batching.cuh b/cpp/test/neighbors/dynamic_batching.cuh new file mode 100644 index 000000000..b64c5b01e --- /dev/null +++ b/cpp/test/neighbors/dynamic_batching.cuh @@ -0,0 +1,292 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "ann_utils.cuh" + +#include + +#include + +#include +#include +#include + +#include + +#include +#include +#include + +namespace cuvs::neighbors::dynamic_batching { + +struct dynamic_batching_spec { + int64_t n_queries = 1000; + int64_t n_rows = 100000; + int64_t dim = 128; + int64_t k = 10; + int64_t max_batch_size = 64; + size_t n_queues = 3; + bool conservative_dispatch = false; + cuvs::distance::DistanceType metric = cuvs::distance::DistanceType::L2Expanded; + int64_t max_concurrent_threads = 128; +}; + +inline ::std::ostream& operator<<(::std::ostream& os, const dynamic_batching_spec& p) +{ + os << "{n_queries=" << p.n_queries; + os << ", dataset shape=" << p.n_rows << "x" << p.dim; + os << ", metric=" << print_metric{p.metric}; + os << ", k=" << p.k; + os << ", max_batch_size=" << p.max_batch_size; + os << ", n_queues=" << p.n_queues; + os << ", conservative_dispatch=" << p.conservative_dispatch; + os << '}' << std::endl; + return os; +} + +template +using build_function = UpstreamT(const raft::resources&, + const typename UpstreamT::index_params_type&, + raft::device_matrix_view); + +template +using search_function = void(const raft::resources&, + const typename UpstreamT::search_params_type& params, + const UpstreamT& index, + raft::device_matrix_view, + raft::device_matrix_view, + raft::device_matrix_view, + const cuvs::neighbors::filtering::base_filter&); + +template UpstreamBuildF, + search_function UpstreamSearchF> +struct dynamic_batching_test : public ::testing::TestWithParam { + using distance_type = float; + using data_type = DataT; + using index_type = IdxT; + using upstream_type = UpstreamT; + + dynamic_batching_spec ps = ::testing::TestWithParam::GetParam(); + raft::resources res; + + // input data + std::optional> dataset = std::nullopt; + 
std::optional> queries = std::nullopt; + std::optional> neighbors_upsm = std::nullopt; + std::optional> neighbors_dynb = std::nullopt; + std::optional> distances_upsm = std::nullopt; + std::optional> distances_dynb = std::nullopt; + + // build parameters + cuvs::neighbors::index_params build_params_base{ps.metric}; + typename upstream_type::index_params_type build_params_upsm{build_params_base}; + dynamic_batching::index_params build_params_dynb{ + build_params_base, ps.k, ps.max_batch_size, ps.n_queues, ps.conservative_dispatch}; + + // search parameters + typename upstream_type::search_params_type search_params_upsm{}; + dynamic_batching::search_params search_params_dynb{}; + + // indexes + std::optional index_upsm = std::nullopt; + std::optional> index_dynb = std::nullopt; + + void build_all() + { + index_dynb.reset(); + index_upsm.reset(); + index_upsm = UpstreamBuildF(res, build_params_upsm, dataset->view()); + index_dynb.emplace(res, build_params_dynb, index_upsm.value(), search_params_upsm); + } + + void search_all() + { + // Search using upstream index - all queries at once + UpstreamSearchF(res, + search_params_upsm, + index_upsm.value(), + queries->view(), + neighbors_upsm->view(), + distances_upsm->view(), + filtering::none_sample_filter{}); + raft::resource::sync_stream(res); + + // Search with dynamic batching + // Streaming scenario: prepare concurrent resources + rmm::cuda_stream_pool worker_streams(ps.max_concurrent_threads); + std::vector> futures(ps.max_concurrent_threads); + std::vector resource_pool(0); + for (int64_t i = 0; i < ps.max_concurrent_threads; i++) { + resource_pool.push_back(res); // copies the resource + raft::resource::set_cuda_stream(resource_pool[i], worker_streams.get_stream(i)); + } + + // Try multiple batch sizes in a round-robin to improve test coverage + std::vector minibatch_sizes{1, 3, 7, 10}; + auto get_bs = [&minibatch_sizes](auto i) { + return minibatch_sizes[i % minibatch_sizes.size()]; + }; + int64_t i = 0; + for (int64_t offset = 0; offset < ps.n_queries; offset += get_bs(i++)) { + auto bs = std::min(get_bs(i), ps.n_queries - offset); + auto j = i % ps.max_concurrent_threads; + // wait for previous job in the same slot to finish + if (i >= ps.max_concurrent_threads) { futures[j].wait(); } + // submit a new job + futures[j] = std::async( + std::launch::async, + [&res = resource_pool[j], + ¶ms = search_params_dynb, + index = index_dynb.value(), + query_view = raft::make_device_matrix_view( + queries->data_handle() + offset * ps.dim, bs, ps.dim), + neighbors_view = raft::make_device_matrix_view( + neighbors_dynb->data_handle() + offset * ps.k, bs, ps.k), + distances_view = raft::make_device_matrix_view( + distances_dynb->data_handle() + offset * ps.k, bs, ps.k)]() { + dynamic_batching::search(res, params, index, query_view, neighbors_view, distances_view); + }); + } + + // finalize all resources + for (int64_t j = 0; j < ps.max_concurrent_threads && j < i; j++) { + futures[j].wait(); + raft::resource::sync_stream(resource_pool[j]); + } + raft::resource::sync_stream(res); + } + + /* + Check the dynamic batching generated neighbors against the upstream index. They both may be + imperfect w.r.t. the ground truth, but they shouldn't differ too much. 
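+ The comparison below relies on eval_neighbours with a small distance epsilon (0.001) and a minimum match ratio of 0.9, i.e. roughly 90% of the neighbor ids must agree between the two result sets.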
+ */ + void check_neighbors() + { + auto stream = raft::resource::get_cuda_stream(res); + size_t queries_size = ps.n_queries * ps.k; + std::vector neighbors_upsm_host(queries_size); + std::vector neighbors_dynb_host(queries_size); + std::vector distances_upsm_host(queries_size); + std::vector distances_dynb_host(queries_size); + raft::copy(neighbors_upsm_host.data(), neighbors_upsm->data_handle(), queries_size, stream); + raft::copy(neighbors_dynb_host.data(), neighbors_dynb->data_handle(), queries_size, stream); + raft::copy(distances_upsm_host.data(), distances_upsm->data_handle(), queries_size, stream); + raft::copy(distances_dynb_host.data(), distances_dynb->data_handle(), queries_size, stream); + raft::resource::sync_stream(res); + ASSERT_TRUE(eval_neighbours(neighbors_upsm_host, + neighbors_dynb_host, + distances_upsm_host, + distances_dynb_host, + ps.n_queries, + ps.k, + 0.001, + 0.9)) + << ps; + } + + void SetUp() override + { + dataset.emplace(raft::make_device_matrix(res, ps.n_rows, ps.dim)); + queries.emplace(raft::make_device_matrix(res, ps.n_queries, ps.dim)); + neighbors_upsm.emplace(raft::make_device_matrix(res, ps.n_queries, ps.k)); + neighbors_dynb.emplace(raft::make_device_matrix(res, ps.n_queries, ps.k)); + distances_upsm.emplace( + raft::make_device_matrix(res, ps.n_queries, ps.k)); + distances_dynb.emplace( + raft::make_device_matrix(res, ps.n_queries, ps.k)); + + raft::random::RngState rng(666ULL); + if constexpr (std::is_same_v || std::is_same_v) { + raft::random::uniform( + res, rng, dataset->data_handle(), dataset->size(), data_type(0.1), data_type(2.0)); + raft::random::uniform( + res, rng, queries->data_handle(), queries->size(), data_type(0.1), data_type(2.0)); + } else { + raft::random::uniformInt( + res, rng, dataset->data_handle(), dataset->size(), data_type(1), data_type(20)); + raft::random::uniformInt( + res, rng, queries->data_handle(), queries->size(), data_type(1), data_type(20)); + } + raft::resource::sync_stream(res); + } + + void TearDown() override + { + index_dynb.reset(); + index_upsm.reset(); + dataset.reset(); + queries.reset(); + neighbors_upsm.reset(); + neighbors_dynb.reset(); + distances_upsm.reset(); + distances_dynb.reset(); + raft::resource::sync_stream(res); + } +}; + +inline std::vector generate_inputs() +{ + std::vector inputs{dynamic_batching_spec{}}; + + for (auto alt_n_queries : {10, 50, 100}) { + dynamic_batching_spec input{}; + input.n_queries = alt_n_queries; + inputs.push_back(input); + } + + for (auto alt_k : {100, 200}) { + dynamic_batching_spec input{}; + input.k = alt_k; + inputs.push_back(input); + } + + for (auto alt_max_batch_size : {4, 16, 128, 256, 512, 1024}) { + dynamic_batching_spec input{}; + input.max_batch_size = alt_max_batch_size; + inputs.push_back(input); + } + + for (auto alt_n_queues : {1, 2, 16, 32}) { + dynamic_batching_spec input{}; + input.n_queues = alt_n_queues; + inputs.push_back(input); + } + + for (auto alt_max_concurrent_threads : {1, 2, 16, 32}) { + dynamic_batching_spec input{}; + input.max_concurrent_threads = alt_max_concurrent_threads; + inputs.push_back(input); + } + + { + auto n = inputs.size(); + for (size_t i = 0; i < n; i++) { + auto input = inputs[i]; + input.conservative_dispatch = !input.conservative_dispatch; + inputs.push_back(input); + } + } + + return inputs; +} + +const std::vector inputs = generate_inputs(); + +} // namespace cuvs::neighbors::dynamic_batching diff --git a/cpp/test/neighbors/dynamic_batching/test_cagra.cu b/cpp/test/neighbors/dynamic_batching/test_cagra.cu new 
file mode 100644 index 000000000..604fc29cf --- /dev/null +++ b/cpp/test/neighbors/dynamic_batching/test_cagra.cu @@ -0,0 +1,84 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include "../dynamic_batching.cuh" + +#include + +namespace cuvs::neighbors::dynamic_batching { + +using cagra_F32 = dynamic_batching_test, + cagra::build, + cagra::search>; + +using cagra_U8 = dynamic_batching_test, + cagra::build, + cagra::search>; + +template +static void set_default_cagra_params(fixture& that) +{ + that.build_params_upsm.intermediate_graph_degree = 128; + that.build_params_upsm.graph_degree = 64; + that.search_params_upsm.itopk_size = + std::clamp(raft::bound_by_power_of_two(that.ps.k) * 16, 128, 512); +} + +TEST_P(cagra_F32, single_cta) +{ + set_default_cagra_params(*this); + search_params_upsm.algo = cagra::search_algo::SINGLE_CTA; + build_all(); + search_all(); + check_neighbors(); +} + +TEST_P(cagra_F32, multi_cta) +{ + set_default_cagra_params(*this); + search_params_upsm.algo = cagra::search_algo::MULTI_CTA; + build_all(); + search_all(); + check_neighbors(); +} + +TEST_P(cagra_F32, multi_kernel) +{ + set_default_cagra_params(*this); + search_params_upsm.algo = cagra::search_algo::MULTI_KERNEL; + build_all(); + search_all(); + check_neighbors(); +} + +TEST_P(cagra_U8, defaults) +{ + set_default_cagra_params(*this); + build_all(); + search_all(); + check_neighbors(); +} + +INSTANTIATE_TEST_CASE_P(dynamic_batching, cagra_F32, ::testing::ValuesIn(inputs)); +INSTANTIATE_TEST_CASE_P(dynamic_batching, cagra_U8, ::testing::ValuesIn(inputs)); + +} // namespace cuvs::neighbors::dynamic_batching diff --git a/cpp/test/neighbors/dynamic_batching/test_ivf_flat.cu b/cpp/test/neighbors/dynamic_batching/test_ivf_flat.cu new file mode 100644 index 000000000..4922cffa3 --- /dev/null +++ b/cpp/test/neighbors/dynamic_batching/test_ivf_flat.cu @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include + +#include "../dynamic_batching.cuh" + +#include + +namespace cuvs::neighbors::dynamic_batching { + +using ivf_flat_i8 = dynamic_batching_test, + ivf_flat::build, + ivf_flat::search>; + +TEST_P(ivf_flat_i8, defaults) +{ + build_params_upsm.n_lists = std::round(std::sqrt(ps.n_rows)); + search_params_upsm.n_probes = + std::max(std::min(build_params_upsm.n_lists, 10), + raft::div_rounding_up_safe(build_params_upsm.n_lists, 50)); + build_all(); + search_all(); + check_neighbors(); +} + +INSTANTIATE_TEST_CASE_P(dynamic_batching, ivf_flat_i8, ::testing::ValuesIn(inputs)); + +} // namespace cuvs::neighbors::dynamic_batching diff --git a/cpp/test/neighbors/dynamic_batching/test_ivf_pq.cu b/cpp/test/neighbors/dynamic_batching/test_ivf_pq.cu new file mode 100644 index 000000000..ec57e0b57 --- /dev/null +++ b/cpp/test/neighbors/dynamic_batching/test_ivf_pq.cu @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include "../dynamic_batching.cuh" + +#include + +namespace cuvs::neighbors::dynamic_batching { + +using ivf_pq_f16 = + dynamic_batching_test, ivf_pq::build, ivf_pq::search>; + +TEST_P(ivf_pq_f16, defaults) +{ + build_params_upsm.n_lists = std::round(std::sqrt(ps.n_rows)); + search_params_upsm.n_probes = + std::max(std::min(build_params_upsm.n_lists, 10), + raft::div_rounding_up_safe(build_params_upsm.n_lists, 50)); + build_all(); + search_all(); + check_neighbors(); +} + +INSTANTIATE_TEST_CASE_P(dynamic_batching, ivf_pq_f16, ::testing::ValuesIn(inputs)); + +} // namespace cuvs::neighbors::dynamic_batching diff --git a/cpp/test/preprocessing/scalar_quantization.cu b/cpp/test/preprocessing/scalar_quantization.cu new file mode 100644 index 000000000..2fdfe7555 --- /dev/null +++ b/cpp/test/preprocessing/scalar_quantization.cu @@ -0,0 +1,291 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "../test_utils.cuh" +#include +#include +#include +#include +#include +#include +#include + +namespace cuvs::preprocessing::quantize::scalar { + +template +struct QuantizationInputs { + cuvs::preprocessing::quantize::scalar::params quantization_params; + int rows; + int cols; + T min = T(-1.0); + T max = T(1.0); + double threshold = 2e-2; +}; + +template +std::ostream& operator<<(std::ostream& os, const QuantizationInputs& inputs) +{ + return os << "quantization_quantile:<" << inputs.quantization_params.quantile + << "> rows:" << inputs.rows << " cols:" << inputs.cols << " min:" << (double)inputs.min + << " max:" << (double)inputs.max; +} + +template +class QuantizationTest : public ::testing::TestWithParam> { + public: + QuantizationTest() + : params_(::testing::TestWithParam>::GetParam()), + stream(raft::resource::get_cuda_stream(handle)), + input_(0, stream) + { + } + + double getRelativeErrorStddev(const T* array_a, const T* array_b, size_t size, float quantile) + { + // relative error elementwise + rmm::device_uvector relative_error(size, stream); + raft::linalg::binaryOp( + relative_error.data(), + array_a, + array_b, + size, + [] __device__(double a, double b) { + return a != b ? (raft::abs(a - b) / raft::max(raft::abs(a), raft::abs(b))) : 0; + }, + stream); + + // sort by size --> remove largest errors to account for quantile chosen + thrust::sort(raft::resource::get_thrust_policy(handle), + relative_error.data(), + relative_error.data() + size); + int elements_to_consider = + std::ceil(double(params_.quantization_params.quantile) * double(size)); + + rmm::device_uvector mu(1, stream); + RAFT_CUDA_TRY(cudaMemsetAsync(mu.data(), 0, sizeof(double), stream)); + + rmm::device_uvector error_stddev(1, stream); + raft::stats::stddev(error_stddev.data(), + relative_error.data(), + mu.data(), + 1, + elements_to_consider, + false, + true, + stream); + + double error_stddev_h; + raft::update_host(&error_stddev_h, error_stddev.data(), 1, stream); + raft::resource::sync_stream(handle, stream); + return error_stddev_h; + } + + protected: + void testScalarQuantization() + { + // dataset identical on host / device + auto dataset = raft::make_device_matrix_view( + (const T*)(input_.data()), rows_, cols_); + auto dataset_h = raft::make_host_matrix_view( + (const T*)(host_input_.data()), rows_, cols_); + + size_t print_size = std::min(input_.size(), 20ul); + + // train quantizer_1 on device + auto quantizer_1 = + cuvs::preprocessing::quantize::scalar::train(handle, params_.quantization_params, dataset); + std::cerr << "Q1: min = " << (double)quantizer_1.min_ << ", max = " << (double)quantizer_1.max_ + << std::endl; + + { + auto quantized_input_h = raft::make_host_matrix(rows_, cols_); + auto quantized_input_d = raft::make_device_matrix(handle, rows_, cols_); + cuvs::preprocessing::quantize::scalar::transform( + handle, quantizer_1, dataset, quantized_input_d.view()); + cuvs::preprocessing::quantize::scalar::transform( + handle, quantizer_1, dataset_h, quantized_input_h.view()); + + { + raft::print_device_vector("Input array: ", input_.data(), print_size, std::cerr); + + rmm::device_uvector quantization_for_print(print_size, stream); + raft::linalg::unaryOp(quantization_for_print.data(), + quantized_input_d.data_handle(), + print_size, + raft::cast_op{}, + stream); + raft::resource::sync_stream(handle, stream); + raft::print_device_vector( + "Quantized array 1: ", quantization_for_print.data(), print_size, std::cerr); + } + + // test (inverse) transform host/device equal + 
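+ // (host and device transforms apply the same quantize_op, so the two outputs are expected to match exactly)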
ASSERT_TRUE(devArrMatchHost(quantized_input_h.data_handle(), + quantized_input_d.data_handle(), + input_.size(), + cuvs::Compare(), + stream)); + + auto quantized_input_h_const_view = raft::make_host_matrix_view( + quantized_input_h.data_handle(), rows_, cols_); + auto re_transformed_input_h = raft::make_host_matrix(rows_, cols_); + cuvs::preprocessing::quantize::scalar::inverse_transform( + handle, quantizer_1, quantized_input_h_const_view, re_transformed_input_h.view()); + + auto quantized_input_d_const_view = raft::make_device_matrix_view( + quantized_input_d.data_handle(), rows_, cols_); + auto re_transformed_input_d = raft::make_device_matrix(handle, rows_, cols_); + cuvs::preprocessing::quantize::scalar::inverse_transform( + handle, quantizer_1, quantized_input_d_const_view, re_transformed_input_d.view()); + raft::print_device_vector( + "re-transformed array: ", re_transformed_input_d.data_handle(), print_size, std::cerr); + + { + double l2_error = getRelativeErrorStddev(dataset.data_handle(), + re_transformed_input_d.data_handle(), + input_.size(), + params_.quantization_params.quantile); + std::cerr << "error stddev = " << l2_error << ", threshold = " << params_.threshold + << std::endl; + // test (inverse) transform close to original dataset + ASSERT_TRUE(l2_error < params_.threshold); + } + } + + // train quantizer_2 on host + auto quantizer_2 = + cuvs::preprocessing::quantize::scalar::train(handle, params_.quantization_params, dataset_h); + std::cerr << "Q2: min = " << (double)quantizer_2.min_ << ", max = " << (double)quantizer_2.max_ + << std::endl; + + // check both quantizers are the same (valid if sampling is identical) + if (input_.size() <= 1000000) { + ASSERT_TRUE((double)quantizer_1.min_ == (double)quantizer_2.min_); + ASSERT_TRUE((double)quantizer_1.max_ == (double)quantizer_2.max_); + } + + { + // test transform host/device equal + auto quantized_input_h = raft::make_host_matrix(rows_, cols_); + auto quantized_input_d = raft::make_device_matrix(handle, rows_, cols_); + cuvs::preprocessing::quantize::scalar::transform( + handle, quantizer_2, dataset, quantized_input_d.view()); + cuvs::preprocessing::quantize::scalar::transform( + handle, quantizer_2, dataset_h, quantized_input_h.view()); + + { + rmm::device_uvector quantization_for_print(print_size, stream); + raft::linalg::unaryOp(quantization_for_print.data(), + quantized_input_d.data_handle(), + print_size, + raft::cast_op{}, + stream); + raft::resource::sync_stream(handle, stream); + raft::print_device_vector( + "Quantized array 2: ", quantization_for_print.data(), print_size, std::cerr); + } + + ASSERT_TRUE(devArrMatchHost(quantized_input_h.data_handle(), + quantized_input_d.data_handle(), + input_.size(), + cuvs::Compare(), + stream)); + } + + // sort_by_key (input, quantization) -- check <= on result + { + auto quantized_input = raft::make_device_matrix(handle, rows_, cols_); + cuvs::preprocessing::quantize::scalar::transform( + handle, quantizer_1, dataset, quantized_input.view()); + thrust::sort_by_key(raft::resource::get_thrust_policy(handle), + input_.data(), + input_.data() + input_.size(), + quantized_input.data_handle()); + std::vector quantized_input_sorted_host(input_.size()); + raft::update_host( + quantized_input_sorted_host.data(), quantized_input.data_handle(), input_.size(), stream); + raft::resource::sync_stream(handle, stream); + + for (size_t i = 0; i < input_.size() - 1; ++i) { + ASSERT_TRUE(quantized_input_sorted_host[i] <= quantized_input_sorted_host[i + 1]); + } + } + } + + void SetUp() 
override + { + rows_ = params_.rows; + cols_ = params_.cols; + + int n_elements = rows_ * cols_; + input_.resize(n_elements, stream); + host_input_.resize(n_elements); + + // random input + unsigned long long int seed = 1234ULL; + raft::random::RngState r(seed); + uniform(handle, r, input_.data(), input_.size(), params_.min, params_.max); + + raft::update_host(host_input_.data(), input_.data(), input_.size(), stream); + + raft::resource::sync_stream(handle, stream); + } + + private: + raft::resources handle; + cudaStream_t stream; + + QuantizationInputs params_; + int rows_; + int cols_; + rmm::device_uvector input_; + std::vector host_input_; +}; + +template +const std::vector> inputs = { + {{1.0}, 5, 5, T(0.0), T(1.0)}, + {{0.98}, 10, 20, T(0.0), T(1.0)}, + {{0.90}, 1000, 1500, T(-500.0), T(100.0)}, + {{0.59}, 100, 200}, + {{0.1}, 1, 1, T(0.0), T(1.0)}, + {{0.01}, 50, 50, T(0.0), T(1.0)}, + {{0.94}, 10, 20, T(-1.0), T(0.0)}, + {{0.95}, 10, 2, T(50.0), T(100.0)}, + {{0.95}, 10, 20, T(-500.0), T(-100.0)}, + {{0.95}, 10, 20, T(5.0), T(5.0)}, +}; + +typedef QuantizationTest QuantizationTest_float_int8t; +TEST_P(QuantizationTest_float_int8t, ScalarQuantizationTest) { this->testScalarQuantization(); } + +typedef QuantizationTest QuantizationTest_double_int8t; +TEST_P(QuantizationTest_double_int8t, ScalarQuantizationTest) { this->testScalarQuantization(); } + +typedef QuantizationTest QuantizationTest_half_int8t; +TEST_P(QuantizationTest_half_int8t, ScalarQuantizationTest) { this->testScalarQuantization(); } + +INSTANTIATE_TEST_CASE_P(QuantizationTest, + QuantizationTest_float_int8t, + ::testing::ValuesIn(inputs)); +INSTANTIATE_TEST_CASE_P(QuantizationTest, + QuantizationTest_double_int8t, + ::testing::ValuesIn(inputs)); +INSTANTIATE_TEST_CASE_P(QuantizationTest, + QuantizationTest_half_int8t, + ::testing::ValuesIn(inputs)); + +} // namespace cuvs::preprocessing::quantize::scalar diff --git a/docs/source/cpp_api.rst b/docs/source/cpp_api.rst index 49732dc92..34f48a88f 100644 --- a/docs/source/cpp_api.rst +++ b/docs/source/cpp_api.rst @@ -10,5 +10,6 @@ C++ API Documentation cpp_api/cluster.rst cpp_api/distance.rst cpp_api/neighbors.rst + cpp_api/preprocessing.rst cpp_api/selection.rst cpp_api/stats.rst diff --git a/docs/source/cpp_api/neighbors.rst b/docs/source/cpp_api/neighbors.rst index d55d58eb0..ab810ab53 100644 --- a/docs/source/cpp_api/neighbors.rst +++ b/docs/source/cpp_api/neighbors.rst @@ -11,6 +11,7 @@ Nearest Neighbors neighbors_bruteforce.rst neighbors_cagra.rst + neighbors_dynamic_batching.rst neighbors_hnsw.rst neighbors_ivf_flat.rst neighbors_ivf_pq.rst diff --git a/docs/source/cpp_api/neighbors_dynamic_batching.rst b/docs/source/cpp_api/neighbors_dynamic_batching.rst new file mode 100644 index 000000000..adc5cb56a --- /dev/null +++ b/docs/source/cpp_api/neighbors_dynamic_batching.rst @@ -0,0 +1,45 @@ +Dynamic Batching +================ + +Dynamic Batching allows grouping small search requests into batches to increase the device occupancy and throughput while keeping the latency within limits. + +.. role:: py(code) + :language: c++ + :class: highlight + +``#include `` + +namespace *cuvs::neighbors::dynamic_batching* + +Index build parameters +---------------------- + +.. doxygengroup:: dynamic_batching_cpp_index_params + :project: cuvs + :members: + :content-only: + +Index search parameters +----------------------- + +.. doxygengroup:: dynamic_batching_cpp_search_params + :project: cuvs + :members: + :content-only: + +Index +----- + +.. 
doxygengroup:: dynamic_batching_cpp_index + :project: cuvs + :members: + :content-only: + + +Index search +------------ + +.. doxygengroup:: dynamic_batching_cpp_search + :project: cuvs + :members: + :content-only: diff --git a/docs/source/cpp_api/preprocessing.rst b/docs/source/cpp_api/preprocessing.rst new file mode 100644 index 000000000..1c2b0f051 --- /dev/null +++ b/docs/source/cpp_api/preprocessing.rst @@ -0,0 +1,12 @@ +Preprocessing +============= + +.. role:: py(code) + :language: c++ + :class: highlight + +.. toctree:: + :maxdepth: 2 + :caption: Contents: + + preprocessing_quantize.rst diff --git a/docs/source/cpp_api/preprocessing_quantize.rst b/docs/source/cpp_api/preprocessing_quantize.rst new file mode 100644 index 000000000..b660c61c5 --- /dev/null +++ b/docs/source/cpp_api/preprocessing_quantize.rst @@ -0,0 +1,20 @@ +Quantize +======== + +This page provides C++ class references for the publicly-exposed elements of the +`cuvs/preprocessing/quantize` package. + +.. role:: py(code) + :language: c++ + :class: highlight + +Scalar +------ + +``#include `` + +namespace *cuvs::preprocessing::quantize::scalar* + +.. doxygengroup:: scalar + :project: cuvs + diff --git a/examples/cpp/CMakeLists.txt b/examples/cpp/CMakeLists.txt index 092b65ed9..951e0ad0c 100644 --- a/examples/cpp/CMakeLists.txt +++ b/examples/cpp/CMakeLists.txt @@ -38,6 +38,7 @@ include(../cmake/thirdparty/get_cuvs.cmake) # -------------- compile tasks ----------------- # add_executable(CAGRA_EXAMPLE src/cagra_example.cu) add_executable(CAGRA_PERSISTENT_EXAMPLE src/cagra_persistent_example.cu) +add_executable(DYNAMIC_BATCHING_EXAMPLE src/dynamic_batching_example.cu) add_executable(IVF_FLAT_EXAMPLE src/ivf_flat_example.cu) add_executable(IVF_PQ_EXAMPLE src/ivf_pq_example.cu) add_executable(VAMANA_EXAMPLE src/vamana_example.cu) @@ -48,6 +49,9 @@ target_link_libraries(CAGRA_EXAMPLE PRIVATE cuvs::cuvs $ Threads::Threads ) +target_link_libraries( + DYNAMIC_BATCHING_EXAMPLE PRIVATE cuvs::cuvs $ Threads::Threads +) target_link_libraries(IVF_PQ_EXAMPLE PRIVATE cuvs::cuvs $) target_link_libraries(IVF_FLAT_EXAMPLE PRIVATE cuvs::cuvs $) target_link_libraries(VAMANA_EXAMPLE PRIVATE cuvs::cuvs $) diff --git a/examples/cpp/src/dynamic_batching_example.cu b/examples/cpp/src/dynamic_batching_example.cu new file mode 100644 index 000000000..95f66a454 --- /dev/null +++ b/examples/cpp/src/dynamic_batching_example.cu @@ -0,0 +1,282 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "common.cuh" + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +// A helper to split the dataset into chunks +template +auto slice_matrix(const DeviceMatrixOrView &source, + typename DeviceMatrixOrView::index_type offset_rows, + typename DeviceMatrixOrView::index_type count_rows) { + auto n_cols = source.extent(1); + return raft::make_device_matrix_view< + typename DeviceMatrixOrView::element_type, + typename DeviceMatrixOrView::index_type>( + const_cast( + source.data_handle()) + + offset_rows * n_cols, + count_rows, n_cols); +} + +// A helper to measure the execution time of a function +template +void time_it(std::string label, F f, Args &&...xs) { + auto start = std::chrono::system_clock::now(); + f(std::forward(xs)...); + auto end = std::chrono::system_clock::now(); + auto t = std::chrono::duration_cast(end - start); + auto t_ms = double(t.count()) / 1000.0; + std::cout << "[" << label << "] execution time: " << t_ms << " ms" + << std::endl; +} + +/** + * Wrap waiting on a stream work into an async C++ future object. + * This is similar to recording and waiting on CUDA events, but in C++11 API. + */ +struct cuda_work_completion_promise { + + cuda_work_completion_promise(const raft::resources &res) { + auto *promise = new std::promise; + RAFT_CUDA_TRY(cudaLaunchHostFunc(raft::resource::get_cuda_stream(res), + completion_callback, + reinterpret_cast(promise))); + value_ = promise->get_future(); + } + + /** + * Waiting on the produced `future` object has the same effect as + * cudaEventSynchronize if an event was recorded at the time of creation of + * this promise object. + */ + auto get_future() -> std::future && { return std::move(value_); } + +private: + std::future value_; + + static void completion_callback(void *ptr) { + auto *promise = reinterpret_cast *>(ptr); + promise->set_value(); + delete promise; + } +}; + +void dynamic_batching_example( + raft::resources const &res, + raft::device_matrix_view dataset, + raft::device_matrix_view queries) { + using namespace cuvs::neighbors; + + // Number of neighbors to search + int64_t topk = 100; + + // Streaming scenario: maximum number of requests in-flight + constexpr int64_t kMaxJobs = 1000; + // Streaming scenario: number of concurrent CUDA streams + constexpr int64_t kNumWorkerStreams = 5; + + // Split the queries into two subsets to run every experiment twice and thus + // surface any initialization overheads. 
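+ // (the first pass pays one-time costs such as kernel loading and memory pool growth, so the repeat on the second subset is closer to steady-state performance)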
+ int64_t n_queries_a = queries.extent(0) / 2; + int64_t n_queries_b = queries.extent(0) - n_queries_a; + + auto queries_a = slice_matrix(queries, 0, n_queries_a); + auto queries_b = slice_matrix(queries, n_queries_a, n_queries_b); + + // create output arrays + auto neighbors = + raft::make_device_matrix(res, queries.extent(0), topk); + auto distances = + raft::make_device_matrix(res, queries.extent(0), topk); + // slice them same as queries + auto neighbors_a = slice_matrix(neighbors, 0, n_queries_a); + auto distances_a = slice_matrix(distances, 0, n_queries_a); + auto neighbors_b = slice_matrix(neighbors, n_queries_a, n_queries_b); + auto distances_b = slice_matrix(distances, n_queries_a, n_queries_b); + + // use default index parameters + cagra::index_params orig_index_params; + + std::cout << "Building CAGRA index (search graph)" << std::endl; + auto orig_index = cagra::build(res, orig_index_params, dataset); + + std::cout << "CAGRA index has " << orig_index.size() << " vectors" + << std::endl; + std::cout << "CAGRA graph has degree " << orig_index.graph_degree() + << ", graph size [" << orig_index.graph().extent(0) << ", " + << orig_index.graph().extent(1) << "]" << std::endl; + + // use default search parameters + cagra::search_params orig_search_params; + // get a decent recall by increasing the internal topk list + orig_search_params.itopk_size = 512; + orig_search_params.algo = cagra::search_algo::SINGLE_CTA; + + // Set up dynamic batching parameters + dynamic_batching::index_params dynb_index_params{ + /* default-initializing the parent `neighbors::index_params` + (not used anyway) */ + {}, + /* Set the K in advance (the batcher needs to allocate buffers) */ + topk, + /* Configure the number and the size of IO buffers */ + 64, + kNumWorkerStreams}; + + // "build" the index (it's a low-cost index wrapping), + // that is we need to pass the original index and its search params here + dynamic_batching::index dynb_index( + res, dynb_index_params, orig_index, orig_search_params); + + // You can implement job priorities by varying the deadlines of individual + // requests + dynamic_batching::search_params dynb_search_params; + dynb_search_params.dispatch_timeout_ms = 0.1; + + // Define the big-batch setting as a baseline for measuring the throughput. 
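+ // (each subset is submitted as one large cagra::search call; this maximizes device utilization, but an online service receiving individual requests cannot form such a batch up front)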
+ auto search_batch_orig = + [&res, &orig_index, &orig_search_params]( + raft::device_matrix_view queries, + raft::device_matrix_view neighbors, + raft::device_matrix_view distances) { + cagra::search(res, orig_search_params, orig_index, queries, neighbors, + distances); + raft::resource::sync_stream(res); + }; + + // Launch the baseline search: check the big-batch performance + time_it("standard/batch A", search_batch_orig, queries_a, neighbors_a, + distances_a); + time_it("standard/batch B", search_batch_orig, queries_b, neighbors_b, + distances_b); + + // Streaming scenario: prepare concurrent resources + rmm::cuda_stream_pool worker_streams{kNumWorkerStreams}; + std::vector resource_pool(0); + for (int64_t i = 0; i < kNumWorkerStreams; i++) { + resource_pool.push_back(res); + raft::resource::set_cuda_stream(resource_pool[i], + worker_streams.get_stream(i)); + } + + // Streaming scenario: + // send queries one-by-one, with a maximum kMaxJobs in-flight + auto search_async_orig = + [&resource_pool, &orig_index, &orig_search_params]( + raft::device_matrix_view queries, + raft::device_matrix_view neighbors, + raft::device_matrix_view distances) { + auto work_size = queries.extent(0); + std::array, kMaxJobs> futures; + for (int64_t i = 0; i < work_size + kMaxJobs; i++) { + // wait for previous job in the same slot to finish + if (i >= kMaxJobs) { + futures[i % kMaxJobs].wait(); + } + // submit a new job + if (i < work_size) { + auto &res = resource_pool[i % kNumWorkerStreams]; + cagra::search(res, orig_search_params, orig_index, + slice_matrix(queries, i, 1), + slice_matrix(neighbors, i, 1), + slice_matrix(distances, i, 1)); + futures[i % kMaxJobs] = + cuda_work_completion_promise(res).get_future(); + } + } + }; + + // Streaming scenario with dynamic batching: + // send queries one-by-one, with a maximum kMaxJobs in-flight, + // yet allow grouping the sequential requests (subject to deadlines) + auto search_async_dynb = + [&resource_pool, &dynb_index, &dynb_search_params]( + raft::device_matrix_view queries, + raft::device_matrix_view neighbors, + raft::device_matrix_view distances) { + auto work_size = queries.extent(0); + std::array, kMaxJobs> futures; + for (int64_t i = 0; i < work_size + kMaxJobs; i++) { + // wait for previous job in the same slot to finish + if (i >= kMaxJobs) { + futures[i % kMaxJobs].wait(); + } + // submit a new job + if (i < work_size) { + auto &res = resource_pool[i % kNumWorkerStreams]; + dynamic_batching::search(res, dynb_search_params, dynb_index, + slice_matrix(queries, i, 1), + slice_matrix(neighbors, i, 1), + slice_matrix(distances, i, 1)); + futures[i % kMaxJobs] = + cuda_work_completion_promise(res).get_future(); + } + } + }; + + // Try to handle the same amount of work in the async setting using the + // standard implementation. + time_it("standard/async A", search_async_orig, queries_a, neighbors_a, + distances_a); + time_it("standard/async B", search_async_orig, queries_b, neighbors_b, + distances_b); + + // Do the same using dynamic batching + time_it("dynamic_batching/async A", search_async_dynb, queries_a, neighbors_a, + distances_a); + time_it("dynamic_batching/async B", search_async_dynb, queries_b, neighbors_b, + distances_b); +} + +int main() { + raft::device_resources res; + + // Set the raft resource to use a pool for internal memory allocations + // (workspace) and limit the available workspace size. + raft::resource::set_workspace_to_pool_resource(res, + 12ull * 1024 * 1024 * 1024ull); + + // Create input arrays. 
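+ // (1M x 128 float32 vectors occupy roughly 0.5 GB of device memory; 10k queries are searched against them)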
+ int64_t n_samples = 1000000; + int64_t n_dim = 128; + int64_t n_queries = 10000; + auto dataset = + raft::make_device_matrix<float, int64_t>(res, n_samples, n_dim); + auto queries = + raft::make_device_matrix<float, int64_t>(res, n_queries, n_dim); + generate_dataset(res, dataset.view(), queries.view()); + + // run the interesting part of the program + dynamic_batching_example(res, raft::make_const_mdspan(dataset.view()), + raft::make_const_mdspan(queries.view())); +}
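A minimal round-trip sketch of the scalar quantization API added in cpp/src/preprocessing/quantize/scalar.cu, using the float/int8_t instantiation shown there; the public header path and the <float, int64_t> matrix extents follow the conventions used above but are assumptions, not verified against the shipped headers.

#include <cuvs/preprocessing/quantize/scalar.hpp>  // assumed public header location
#include <raft/core/device_mdarray.hpp>
#include <raft/core/device_resources.hpp>
#include <raft/core/mdspan.hpp>

// Quantize a float32 dataset to int8 codes and reconstruct an approximation.
void scalar_quantize_roundtrip(raft::device_resources const& res,
                               raft::device_matrix_view<const float, int64_t> dataset)
{
  namespace sq = cuvs::preprocessing::quantize::scalar;

  // Train min/max bounds on the given quantile of the data (extreme values are cut off).
  sq::params pq;
  pq.quantile = 0.99;
  auto quant = sq::train(res, pq, dataset);

  // float32 -> int8 codes
  auto codes = raft::make_device_matrix<int8_t, int64_t>(res, dataset.extent(0), dataset.extent(1));
  sq::transform(res, quant, dataset, codes.view());

  // int8 codes -> approximate float32 reconstruction
  auto recon = raft::make_device_matrix<float, int64_t>(res, dataset.extent(0), dataset.extent(1));
  sq::inverse_transform(res, quant, raft::make_const_mdspan(codes.view()), recon.view());
}

The same train/transform/inverse_transform calls also accept host_matrix_view arguments, as exercised by the new scalar_quantization.cu test.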