Skip to content

Commit

Permalink
parallelization of simple query
Browse files Browse the repository at this point in the history
  • Loading branch information
YoshiakiNishimura committed Nov 25, 2024
1 parent dfc42ad commit e2a85cd
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 3 deletions.
2 changes: 0 additions & 2 deletions src/jogasaki/executor/process/impl/bound.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ class bound {
bound(kvs::end_point_kind endpointkind, std::size_t len,
std::unique_ptr<data::aligned_buffer> key)
: endpointkind_(endpointkind), len_(len), key_(std::move(key)) {}
kvs::end_point_kind endpointkind() { return endpointkind_; };
std::unique_ptr<data::aligned_buffer> key() { return std::move(key_); };
[[nodiscard]] std::string_view key() const noexcept;
[[nodiscard]] kvs::end_point_kind endpointkind() const noexcept;
/**
Expand Down
22 changes: 21 additions & 1 deletion src/jogasaki/executor/process/impl/ops/operator_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
#include <yugawara/storage/table.h>

#include <jogasaki/data/iterable_record_store.h>
#include <jogasaki/dist/simple_key_distribution.h>
#include <jogasaki/dist/key_range.h>
#include <jogasaki/executor/process/impl/bound.h>
#include <jogasaki/executor/process/impl/ops/details/encode_key.h>
#include <jogasaki/executor/process/impl/ops/details/search_key_field_info.h>
Expand Down Expand Up @@ -405,7 +407,25 @@ std::vector<std::shared_ptr<impl::scan_range>> operator_builder::create_scan_ran
bool is_empty = (status_result == status::err_integrity_constraint_violation);

std::vector<bound> bounds;
bounds.emplace_back(std::move(begin));
size_t scan_parallel_count = global::config_pool()->scan_default_parallel();
bool is_rtx = request_context_->transaction()->option()->type()
== kvs::transaction_option::transaction_type::read_only;
if (scan_parallel_count > 1 && is_rtx) {
jogasaki::dist::simple_key_distribution distribution;
jogasaki::dist::key_range range(
begin.key(), begin.endpointkind(), end.key(), end.endpointkind());
auto pivot_count = scan_parallel_count - 1;
auto pivots = distribution.compute_pivots(pivot_count, range);
bounds.emplace_back(std::move(begin));
for (const auto& pivot : pivots) {
bounds.emplace_back(bound(kvs::end_point_kind::exclusive, pivot.size(),
std::make_unique<data::aligned_buffer>(pivot)));
bounds.emplace_back(bound(kvs::end_point_kind::inclusive, pivot.size(),
std::make_unique<data::aligned_buffer>(pivot)));
}
} else {
bounds.emplace_back(std::move(begin));
}
bounds.emplace_back(std::move(end));
for (size_t i = 0; i + 1 < bounds.size(); i += 2) {
scan_ranges.emplace_back(std::make_shared<impl::scan_range>(
Expand Down

0 comments on commit e2a85cd

Please sign in to comment.