Skip to content

Commit

Permalink
Merge pull request #131 from project-tsurugi/scan_vector
Browse files Browse the repository at this point in the history
change scan_range to scan_range vector
  • Loading branch information
YoshiakiNishimura authored Nov 25, 2024
2 parents 4f07c93 + c5374ef commit dfc42ad
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 24 deletions.
10 changes: 8 additions & 2 deletions src/jogasaki/executor/process/flow.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ sequence_view<std::shared_ptr<model::task>> flow::create_tasks() {
std::vector<std::shared_ptr<abstract::task_context>> contexts{};
auto partitions = check_empty_input_and_calculate_partitions();
auto& operators = proc->operators();
const auto& scan_ranges = operators.scan_ranges();
if (!scan_ranges.empty()){
partitions = scan_ranges.size();
}
auto external_output = operators.io_exchange_map().external_output();
auto ch = context_->record_channel();
if (ch && external_output != nullptr) {
Expand All @@ -124,8 +128,10 @@ sequence_view<std::shared_ptr<model::task>> flow::create_tasks() {
}

contexts.reserve(partitions);
for (std::size_t i=0; i < partitions; ++i) {
contexts.emplace_back(create_task_context(i, operators, sink_idx_base + i, operators.range()));

for (std::size_t i = 0; i < partitions; ++i) {
contexts.emplace_back(create_task_context(
i, operators, sink_idx_base + i, scan_ranges.empty() ? nullptr : scan_ranges[i]));
}

bool is_rtx = context_->transaction()->option()->type()
Expand Down
18 changes: 14 additions & 4 deletions src/jogasaki/executor/process/impl/ops/operator_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ operator_builder::operator_builder(

operator_container operator_builder::operator()()&& {
auto root = dispatch(*this, head());
return operator_container{std::move(root), index_, *io_exchange_map_, std::move(range_)};
return operator_container{std::move(root), index_, *io_exchange_map_, std::move(scan_ranges_)};
}

relation::expression const& operator_builder::head() {
Expand Down Expand Up @@ -132,7 +132,7 @@ std::unique_ptr<operator_base> operator_builder::operator()(const relation::scan
auto& secondary_or_primary_index = yugawara::binding::extract<yugawara::storage::index>(node.source());
auto& table = secondary_or_primary_index.table();
auto primary = table.owner()->find_primary_index(table);
range_ = create_range(node);
scan_ranges_ = create_scan_ranges(node);
return std::make_unique<scan>(
index_++,
*info_,
Expand Down Expand Up @@ -374,7 +374,9 @@ std::unique_ptr<operator_base> operator_builder::operator()(const relation::step
}


std::shared_ptr<impl::scan_range> operator_builder::create_range(relation::scan const& node) {
std::vector<std::shared_ptr<impl::scan_range>> operator_builder::create_scan_ranges(
relation::scan const& node) {
std::vector<std::shared_ptr<impl::scan_range>> scan_ranges{};
auto& secondary_or_primary_index =
yugawara::binding::extract<yugawara::storage::index>(node.source());
executor::process::impl::variable_table vars{};
Expand All @@ -401,7 +403,15 @@ std::shared_ptr<impl::scan_range> operator_builder::create_range(relation::scan
bound begin(begin_end_point_kind, blen, std::move(key_begin));
bound end(end_end_point_kind, elen, std::move(key_end));
bool is_empty = (status_result == status::err_integrity_constraint_violation);
return std::make_shared<impl::scan_range>(std::move(begin), std::move(end), is_empty);

std::vector<bound> bounds;
bounds.emplace_back(std::move(begin));
bounds.emplace_back(std::move(end));
for (size_t i = 0; i + 1 < bounds.size(); i += 2) {
scan_ranges.emplace_back(std::make_shared<impl::scan_range>(
std::move(bounds[i]), std::move(bounds[i + 1]), is_empty));
}
return scan_ranges;
}

operator_container create_operators(
Expand Down
5 changes: 2 additions & 3 deletions src/jogasaki/executor/process/impl/ops/operator_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,15 +122,14 @@ class operator_builder {
using key = yugawara::storage::index::key;
using endpoint = takatori::relation::scan::endpoint;

std::shared_ptr<impl::scan_range> create_range(relation::scan const& node);

std::vector<std::shared_ptr<impl::scan_range>> create_scan_ranges(relation::scan const& node);
private:
std::shared_ptr<processor_info> info_{};
std::shared_ptr<io_info> io_info_{};
io_exchange_map* io_exchange_map_{};
std::shared_ptr<relation_io_map> relation_io_map_{};
operator_base::operator_index_type index_{};
std::shared_ptr<impl::scan_range> range_{};
std::vector<std::shared_ptr<impl::scan_range>> scan_ranges_{};
request_context* request_context_{};

};
Expand Down
9 changes: 4 additions & 5 deletions src/jogasaki/executor/process/impl/ops/operator_container.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,12 @@ namespace jogasaki::executor::process::impl::ops {


operator_container::operator_container(std::unique_ptr<ops::operator_base> root, std::size_t operator_count,
class io_exchange_map& io_exchange_map, std::shared_ptr<impl::scan_range> range) :
class io_exchange_map& io_exchange_map, std::vector<std::shared_ptr<impl::scan_range>> scan_ranges) :
root_(std::move(root)),
operator_count_(operator_count),
io_exchange_map_(std::addressof(io_exchange_map)),
range_(std::move(range))
scan_ranges_(std::move(scan_ranges))
{}

std::size_t operator_container::size() const noexcept {
return operator_count_;
}
Expand All @@ -43,7 +42,7 @@ operator_base& operator_container::root() const noexcept {
return *root_;
}

std::shared_ptr<impl::scan_range> const& operator_container::range() const noexcept {
return range_;
std::vector<std::shared_ptr<impl::scan_range>> operator_container::scan_ranges() const noexcept {
return scan_ranges_;
}
} // namespace jogasaki::executor::process::impl::ops
10 changes: 5 additions & 5 deletions src/jogasaki/executor/process/impl/ops/operator_container.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@ class operator_container {
* @param root the root of the operator tree
* @param operator_count the number of operators
* @param io_exchange_map the mapping from input/output index to exchange
* @param range the range gathered from the scan operator in the operator tree (if any).
* @param scan_ranges the ranges gathered from the scan operator in the operator tree (if any).
* Can be nullptr if the operators don't contain scan operation.
*/
operator_container(
std::unique_ptr<ops::operator_base> root,
std::size_t operator_count,
class io_exchange_map& io_exchange_map,
std::shared_ptr<impl::scan_range> range
std::vector<std::shared_ptr<impl::scan_range>> scan_ranges
);

/**
Expand All @@ -69,14 +69,14 @@ class operator_container {

/**
* @brief accessor to scan_range
* @return the range, or nullptr if there is no scan operation in the process
* @return the scan_range vector , or empty if there are no scan operations in the process
*/
[[nodiscard]] std::shared_ptr<impl::scan_range> const& range() const noexcept;
[[nodiscard]] std::vector<std::shared_ptr<impl::scan_range>> scan_ranges() const noexcept;
private:
std::unique_ptr<ops::operator_base> root_{};
std::size_t operator_count_{};
class io_exchange_map* io_exchange_map_{};
std::shared_ptr<impl::scan_range> range_{};
std::vector<std::shared_ptr<impl::scan_range>> scan_ranges_{};
};

} // namespace jogasaki::executor::process::impl::ops
10 changes: 5 additions & 5 deletions test/jogasaki/executor/process/ops/scan_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ TEST_F(scan_test, simple) {
jogasaki::plan::compiler_context compiler_ctx{};
io_exchange_map exchange_map{};
operator_builder builder{processor_info_, {}, {}, exchange_map, &request_context_};
auto range = builder.create_range(target);
auto range = (builder.create_scan_ranges(target))[0];
mock::task_context task_ctx{ {}, {}, {},{range}};
scan_context ctx(&task_ctx, output_variables, get_storage(*db_, primary_idx->simple_name()), nullptr, tx.get(),range.get(), request_context_.request_resource(), &varlen_resource_);
ASSERT_TRUE(static_cast<bool>(op(ctx)));
Expand Down Expand Up @@ -246,7 +246,7 @@ TEST_F(scan_test, nullable_fields) {
jogasaki::plan::compiler_context compiler_ctx{};
io_exchange_map exchange_map{};
operator_builder builder{processor_info_, {}, {}, exchange_map, &request_context_};
auto range = builder.create_range(target);
auto range = (builder.create_scan_ranges(target))[0];
mock::task_context task_ctx{ {}, {}, {},{range}};
scan_context ctx(&task_ctx, output_variables, get_storage(*db_, primary_idx->simple_name()), nullptr, tx.get(),range.get(), request_context_.request_resource(), &varlen_resource_);
ASSERT_TRUE(static_cast<bool>(op(ctx)));
Expand Down Expand Up @@ -344,7 +344,7 @@ TEST_F(scan_test, scan_info) {
jogasaki::plan::compiler_context compiler_ctx{};
io_exchange_map exchange_map{};
operator_builder builder{processor_info_, {}, {}, exchange_map, &request_context_};
auto range = builder.create_range(target);
auto range = (builder.create_scan_ranges(target))[0];
mock::task_context task_ctx{ {}, {}, {},{range}};

put( *db_, primary_idx->simple_name(), create_record<kind::int8, kind::character>(100, accessor::text{"123456789012345678901234567890/B"}), create_record<kind::float8>(1.0));
Expand Down Expand Up @@ -435,7 +435,7 @@ TEST_F(scan_test, secondary_index) {
request_context_.transaction(transaction_ctx);
io_exchange_map exchange_map{};
operator_builder builder{processor_info_, {}, {}, exchange_map, &request_context_};
auto range = builder.create_range(target);
auto range = (builder.create_scan_ranges(target))[0];
mock::task_context task_ctx{ {}, {}, {} ,{range}};

put( *db_, primary_idx->simple_name(), create_record<kind::int4>(10), create_record<kind::float8, kind::int8>(1.0, 100));
Expand Down Expand Up @@ -566,7 +566,7 @@ TEST_F(scan_test, host_variables) {
jogasaki::plan::compiler_context compiler_ctx{};
io_exchange_map exchange_map{};
operator_builder builder{processor_info_, {}, {}, exchange_map, &request_context_};
auto range = builder.create_range(target);
auto range = (builder.create_scan_ranges(target))[0];
mock::task_context task_ctx{ {}, {}, {},{range}};

put( *db_, primary_idx->simple_name(), create_record<kind::int4, kind::int8>(100, 10), create_record<kind::int8>(1));
Expand Down

0 comments on commit dfc42ad

Please sign in to comment.