diff --git a/src/jogasaki/executor/process/flow.cpp b/src/jogasaki/executor/process/flow.cpp index f2cfcd14..a1abcbf0 100644 --- a/src/jogasaki/executor/process/flow.cpp +++ b/src/jogasaki/executor/process/flow.cpp @@ -106,6 +106,10 @@ sequence_view> flow::create_tasks() { std::vector> contexts{}; auto partitions = check_empty_input_and_calculate_partitions(); auto& operators = proc->operators(); + const auto& scan_ranges = operators.scan_ranges(); + if (!scan_ranges.empty()){ + partitions = scan_ranges.size(); + } auto external_output = operators.io_exchange_map().external_output(); auto ch = context_->record_channel(); if (ch && external_output != nullptr) { @@ -124,8 +128,10 @@ sequence_view> flow::create_tasks() { } contexts.reserve(partitions); - for (std::size_t i=0; i < partitions; ++i) { - contexts.emplace_back(create_task_context(i, operators, sink_idx_base + i, operators.range())); + + for (std::size_t i = 0; i < partitions; ++i) { + contexts.emplace_back(create_task_context( + i, operators, sink_idx_base + i, scan_ranges.empty() ? nullptr : scan_ranges[i])); } bool is_rtx = context_->transaction()->option()->type() diff --git a/src/jogasaki/executor/process/impl/ops/operator_builder.cpp b/src/jogasaki/executor/process/impl/ops/operator_builder.cpp index dc0d0f07..0511bcb8 100644 --- a/src/jogasaki/executor/process/impl/ops/operator_builder.cpp +++ b/src/jogasaki/executor/process/impl/ops/operator_builder.cpp @@ -93,7 +93,7 @@ operator_builder::operator_builder( operator_container operator_builder::operator()()&& { auto root = dispatch(*this, head()); - return operator_container{std::move(root), index_, *io_exchange_map_, std::move(range_)}; + return operator_container{std::move(root), index_, *io_exchange_map_, std::move(scan_ranges_)}; } relation::expression const& operator_builder::head() { @@ -132,7 +132,7 @@ std::unique_ptr operator_builder::operator()(const relation::scan auto& secondary_or_primary_index = yugawara::binding::extract(node.source()); auto& table = secondary_or_primary_index.table(); auto primary = table.owner()->find_primary_index(table); - range_ = create_range(node); + scan_ranges_ = create_scan_ranges(node); return std::make_unique( index_++, *info_, @@ -374,7 +374,9 @@ std::unique_ptr operator_builder::operator()(const relation::step } -std::shared_ptr operator_builder::create_range(relation::scan const& node) { +std::vector> operator_builder::create_scan_ranges( + relation::scan const& node) { + std::vector> scan_ranges{}; auto& secondary_or_primary_index = yugawara::binding::extract(node.source()); executor::process::impl::variable_table vars{}; @@ -401,7 +403,15 @@ std::shared_ptr operator_builder::create_range(relation::scan bound begin(begin_end_point_kind, blen, std::move(key_begin)); bound end(end_end_point_kind, elen, std::move(key_end)); bool is_empty = (status_result == status::err_integrity_constraint_violation); - return std::make_shared(std::move(begin), std::move(end), is_empty); + + std::vector bounds; + bounds.emplace_back(std::move(begin)); + bounds.emplace_back(std::move(end)); + for (size_t i = 0; i + 1 < bounds.size(); i += 2) { + scan_ranges.emplace_back(std::make_shared( + std::move(bounds[i]), std::move(bounds[i + 1]), is_empty)); + } + return scan_ranges; } operator_container create_operators( diff --git a/src/jogasaki/executor/process/impl/ops/operator_builder.h b/src/jogasaki/executor/process/impl/ops/operator_builder.h index 0c9c6889..2b2e0058 100644 --- a/src/jogasaki/executor/process/impl/ops/operator_builder.h +++ b/src/jogasaki/executor/process/impl/ops/operator_builder.h @@ -122,15 +122,14 @@ class operator_builder { using key = yugawara::storage::index::key; using endpoint = takatori::relation::scan::endpoint; - std::shared_ptr create_range(relation::scan const& node); - + std::vector> create_scan_ranges(relation::scan const& node); private: std::shared_ptr info_{}; std::shared_ptr io_info_{}; io_exchange_map* io_exchange_map_{}; std::shared_ptr relation_io_map_{}; operator_base::operator_index_type index_{}; - std::shared_ptr range_{}; + std::vector> scan_ranges_{}; request_context* request_context_{}; }; diff --git a/src/jogasaki/executor/process/impl/ops/operator_container.cpp b/src/jogasaki/executor/process/impl/ops/operator_container.cpp index dc739e15..68d22c59 100644 --- a/src/jogasaki/executor/process/impl/ops/operator_container.cpp +++ b/src/jogasaki/executor/process/impl/ops/operator_container.cpp @@ -24,13 +24,12 @@ namespace jogasaki::executor::process::impl::ops { operator_container::operator_container(std::unique_ptr root, std::size_t operator_count, - class io_exchange_map& io_exchange_map, std::shared_ptr range) : + class io_exchange_map& io_exchange_map, std::vector> scan_ranges) : root_(std::move(root)), operator_count_(operator_count), io_exchange_map_(std::addressof(io_exchange_map)), - range_(std::move(range)) + scan_ranges_(std::move(scan_ranges)) {} - std::size_t operator_container::size() const noexcept { return operator_count_; } @@ -43,7 +42,7 @@ operator_base& operator_container::root() const noexcept { return *root_; } -std::shared_ptr const& operator_container::range() const noexcept { - return range_; +std::vector> operator_container::scan_ranges() const noexcept { + return scan_ranges_; } } // namespace jogasaki::executor::process::impl::ops diff --git a/src/jogasaki/executor/process/impl/ops/operator_container.h b/src/jogasaki/executor/process/impl/ops/operator_container.h index fc6027e7..3a5b6d91 100644 --- a/src/jogasaki/executor/process/impl/ops/operator_container.h +++ b/src/jogasaki/executor/process/impl/ops/operator_container.h @@ -39,14 +39,14 @@ class operator_container { * @param root the root of the operator tree * @param operator_count the number of operators * @param io_exchange_map the mapping from input/output index to exchange - * @param range the range gathered from the scan operator in the operator tree (if any). + * @param scan_ranges the ranges gathered from the scan operator in the operator tree (if any). * Can be nullptr if the operators don't contain scan operation. */ operator_container( std::unique_ptr root, std::size_t operator_count, class io_exchange_map& io_exchange_map, - std::shared_ptr range + std::vector> scan_ranges ); /** @@ -69,14 +69,14 @@ class operator_container { /** * @brief accessor to scan_range - * @return the range, or nullptr if there is no scan operation in the process + * @return the scan_range vector , or empty if there are no scan operations in the process */ - [[nodiscard]] std::shared_ptr const& range() const noexcept; + [[nodiscard]] std::vector> scan_ranges() const noexcept; private: std::unique_ptr root_{}; std::size_t operator_count_{}; class io_exchange_map* io_exchange_map_{}; - std::shared_ptr range_{}; + std::vector> scan_ranges_{}; }; } // namespace jogasaki::executor::process::impl::ops diff --git a/test/jogasaki/executor/process/ops/scan_test.cpp b/test/jogasaki/executor/process/ops/scan_test.cpp index 57099b3a..37a08561 100644 --- a/test/jogasaki/executor/process/ops/scan_test.cpp +++ b/test/jogasaki/executor/process/ops/scan_test.cpp @@ -176,7 +176,7 @@ TEST_F(scan_test, simple) { jogasaki::plan::compiler_context compiler_ctx{}; io_exchange_map exchange_map{}; operator_builder builder{processor_info_, {}, {}, exchange_map, &request_context_}; - auto range = builder.create_range(target); + auto range = (builder.create_scan_ranges(target))[0]; mock::task_context task_ctx{ {}, {}, {},{range}}; scan_context ctx(&task_ctx, output_variables, get_storage(*db_, primary_idx->simple_name()), nullptr, tx.get(),range.get(), request_context_.request_resource(), &varlen_resource_); ASSERT_TRUE(static_cast(op(ctx))); @@ -246,7 +246,7 @@ TEST_F(scan_test, nullable_fields) { jogasaki::plan::compiler_context compiler_ctx{}; io_exchange_map exchange_map{}; operator_builder builder{processor_info_, {}, {}, exchange_map, &request_context_}; - auto range = builder.create_range(target); + auto range = (builder.create_scan_ranges(target))[0]; mock::task_context task_ctx{ {}, {}, {},{range}}; scan_context ctx(&task_ctx, output_variables, get_storage(*db_, primary_idx->simple_name()), nullptr, tx.get(),range.get(), request_context_.request_resource(), &varlen_resource_); ASSERT_TRUE(static_cast(op(ctx))); @@ -344,7 +344,7 @@ TEST_F(scan_test, scan_info) { jogasaki::plan::compiler_context compiler_ctx{}; io_exchange_map exchange_map{}; operator_builder builder{processor_info_, {}, {}, exchange_map, &request_context_}; - auto range = builder.create_range(target); + auto range = (builder.create_scan_ranges(target))[0]; mock::task_context task_ctx{ {}, {}, {},{range}}; put( *db_, primary_idx->simple_name(), create_record(100, accessor::text{"123456789012345678901234567890/B"}), create_record(1.0)); @@ -435,7 +435,7 @@ TEST_F(scan_test, secondary_index) { request_context_.transaction(transaction_ctx); io_exchange_map exchange_map{}; operator_builder builder{processor_info_, {}, {}, exchange_map, &request_context_}; - auto range = builder.create_range(target); + auto range = (builder.create_scan_ranges(target))[0]; mock::task_context task_ctx{ {}, {}, {} ,{range}}; put( *db_, primary_idx->simple_name(), create_record(10), create_record(1.0, 100)); @@ -566,7 +566,7 @@ TEST_F(scan_test, host_variables) { jogasaki::plan::compiler_context compiler_ctx{}; io_exchange_map exchange_map{}; operator_builder builder{processor_info_, {}, {}, exchange_map, &request_context_}; - auto range = builder.create_range(target); + auto range = (builder.create_scan_ranges(target))[0]; mock::task_context task_ctx{ {}, {}, {},{range}}; put( *db_, primary_idx->simple_name(), create_record(100, 10), create_record(1));