From f987122158e4054f27dadafe1a098e228e824282 Mon Sep 17 00:00:00 2001 From: Yoshiaki Nishimura Date: Mon, 25 Nov 2024 17:51:22 +0900 Subject: [PATCH] parallelization of simple query --- src/jogasaki/executor/process/impl/bound.h | 2 -- .../process/impl/ops/operator_builder.cpp | 22 ++++++++++++++++++- 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/src/jogasaki/executor/process/impl/bound.h b/src/jogasaki/executor/process/impl/bound.h index c058e8ed..9bc9bf85 100644 --- a/src/jogasaki/executor/process/impl/bound.h +++ b/src/jogasaki/executor/process/impl/bound.h @@ -32,8 +32,6 @@ class bound { bound(kvs::end_point_kind endpointkind, std::size_t len, std::unique_ptr key) : endpointkind_(endpointkind), len_(len), key_(std::move(key)) {} - kvs::end_point_kind endpointkind() { return endpointkind_; }; - std::unique_ptr key() { return std::move(key_); }; [[nodiscard]] std::string_view key() const noexcept; [[nodiscard]] kvs::end_point_kind endpointkind() const noexcept; /** diff --git a/src/jogasaki/executor/process/impl/ops/operator_builder.cpp b/src/jogasaki/executor/process/impl/ops/operator_builder.cpp index 0511bcb8..cef2c418 100644 --- a/src/jogasaki/executor/process/impl/ops/operator_builder.cpp +++ b/src/jogasaki/executor/process/impl/ops/operator_builder.cpp @@ -36,6 +36,8 @@ #include #include +#include +#include #include #include #include @@ -405,7 +407,25 @@ std::vector> operator_builder::create_scan_ran bool is_empty = (status_result == status::err_integrity_constraint_violation); std::vector bounds; - bounds.emplace_back(std::move(begin)); + size_t scan_parallel_count = global::config_pool()->scan_default_parallel(); + auto option = request_context_->transaction()->option(); + bool is_rtx = option && option->type() == kvs::transaction_option::transaction_type::read_only; + if (scan_parallel_count > 1 && is_rtx) { + jogasaki::dist::simple_key_distribution distribution; + jogasaki::dist::key_range range( + begin.key(), begin.endpointkind(), end.key(), end.endpointkind()); + auto pivot_count = scan_parallel_count - 1; + auto pivots = distribution.compute_pivots(pivot_count, range); + bounds.emplace_back(std::move(begin)); + for (const auto& pivot : pivots) { + bounds.emplace_back(bound(kvs::end_point_kind::exclusive, pivot.size(), + std::make_unique(pivot))); + bounds.emplace_back(bound(kvs::end_point_kind::inclusive, pivot.size(), + std::make_unique(pivot))); + } + } else { + bounds.emplace_back(std::move(begin)); + } bounds.emplace_back(std::move(end)); for (size_t i = 0; i + 1 < bounds.size(); i += 2) { scan_ranges.emplace_back(std::make_shared(