diff --git a/src/jogasaki/executor/process/impl/bound.h b/src/jogasaki/executor/process/impl/bound.h index c058e8ed..9bc9bf85 100644 --- a/src/jogasaki/executor/process/impl/bound.h +++ b/src/jogasaki/executor/process/impl/bound.h @@ -32,8 +32,6 @@ class bound { bound(kvs::end_point_kind endpointkind, std::size_t len, std::unique_ptr key) : endpointkind_(endpointkind), len_(len), key_(std::move(key)) {} - kvs::end_point_kind endpointkind() { return endpointkind_; }; - std::unique_ptr key() { return std::move(key_); }; [[nodiscard]] std::string_view key() const noexcept; [[nodiscard]] kvs::end_point_kind endpointkind() const noexcept; /** diff --git a/src/jogasaki/executor/process/impl/ops/operator_builder.cpp b/src/jogasaki/executor/process/impl/ops/operator_builder.cpp index 0511bcb8..9aa74422 100644 --- a/src/jogasaki/executor/process/impl/ops/operator_builder.cpp +++ b/src/jogasaki/executor/process/impl/ops/operator_builder.cpp @@ -36,6 +36,8 @@ #include #include +#include +#include #include #include #include @@ -405,7 +407,23 @@ std::vector> operator_builder::create_scan_ran bool is_empty = (status_result == status::err_integrity_constraint_violation); std::vector bounds; - bounds.emplace_back(std::move(begin)); + size_t scan_parallel_count = global::config_pool()->scan_default_parallel(); + if (scan_parallel_count > 1) { + jogasaki::dist::simple_key_distribution distribution; + jogasaki::dist::key_range range( + begin.key(), begin.endpointkind(), end.key(), end.endpointkind()); + auto pivot_count = scan_parallel_count - 1; + auto pivots = distribution.compute_pivots(pivot_count, range); + bounds.emplace_back(std::move(begin)); + for (const auto& pivot : pivots) { + bounds.emplace_back(std::move(bound(kvs::end_point_kind::exclusive, pivot.size(), + std::make_unique(pivot)))); + bounds.emplace_back(std::move(bound(kvs::end_point_kind::inclusive, pivot.size(), + std::make_unique(pivot)))); + } + } else { + bounds.emplace_back(std::move(begin)); + } bounds.emplace_back(std::move(end)); for (size_t i = 0; i + 1 < bounds.size(); i += 2) { scan_ranges.emplace_back(std::make_shared(