From c492896947ef332fe83b24efbe02a712f75197eb Mon Sep 17 00:00:00 2001 From: Yoshiaki Nishimura Date: Thu, 31 Oct 2024 16:01:50 +0900 Subject: [PATCH] Replaced scan_info definition with the range class --- examples/process_cli/main.cpp | 4 +- .../executor/process/mock/task_context.cpp | 6 +- .../executor/process/mock/task_context.h | 16 +- src/jogasaki/data/aligned_buffer.cpp | 21 +- src/jogasaki/data/aligned_buffer.h | 11 +- .../process/abstract/{scan_info.h => range.h} | 26 +- .../executor/process/abstract/task_context.h | 20 +- src/jogasaki/executor/process/flow.cpp | 8 +- src/jogasaki/executor/process/impl/bound.cpp | 33 ++ src/jogasaki/executor/process/impl/bound.h | 52 +++ .../process/impl/ops/details/encode_key.cpp | 38 +- .../process/impl/ops/details/encode_key.h | 30 +- .../process/impl/ops/operator_builder.cpp | 348 +++++++----------- .../process/impl/ops/operator_builder.h | 31 +- .../process/impl/ops/operator_container.cpp | 15 +- .../process/impl/ops/operator_container.h | 20 +- .../executor/process/impl/ops/scan.cpp | 86 +---- src/jogasaki/executor/process/impl/ops/scan.h | 9 +- .../process/impl/ops/scan_context.cpp | 20 +- .../executor/process/impl/ops/scan_context.h | 14 +- .../executor/process/impl/processor.cpp | 8 +- .../executor/process/impl/processor.h | 10 +- src/jogasaki/executor/process/impl/range.cpp | 36 ++ src/jogasaki/executor/process/impl/range.h | 50 +++ .../executor/process/impl/scan_info.cpp | 53 --- .../executor/process/impl/scan_info.h | 63 ---- .../executor/process/impl/task_context.cpp | 16 +- .../executor/process/impl/task_context.h | 18 +- src/jogasaki/kvs/storage.cpp | 36 ++ src/jogasaki/kvs/storage.h | 4 +- .../executor/process/ops/join_find_test.cpp | 9 +- .../executor/process/ops/scan_test.cpp | 69 ++-- .../process/process_executor_test.cpp | 9 +- .../executor/process/processor_test.cpp | 9 +- test/jogasaki/operator_test_utils.h | 6 +- 35 files changed, 606 insertions(+), 598 deletions(-) rename src/jogasaki/executor/process/abstract/{scan_info.h => range.h} (60%) create mode 100644 src/jogasaki/executor/process/impl/bound.cpp create mode 100644 src/jogasaki/executor/process/impl/bound.h create mode 100644 src/jogasaki/executor/process/impl/range.cpp create mode 100644 src/jogasaki/executor/process/impl/range.h delete mode 100644 src/jogasaki/executor/process/impl/scan_info.cpp delete mode 100644 src/jogasaki/executor/process/impl/scan_info.h diff --git a/examples/process_cli/main.cpp b/examples/process_cli/main.cpp index fbeeeec19..08f09e8d3 100644 --- a/examples/process_cli/main.cpp +++ b/examples/process_cli/main.cpp @@ -68,7 +68,7 @@ #include #include #include -#include +#include #include #include #include @@ -383,7 +383,7 @@ class cli { std::vector{r}, std::vector>{writer}, std::shared_ptr{}, - std::shared_ptr{} + std::shared_ptr{} ); ctx->work_context(std::make_unique( diff --git a/mock/jogasaki/executor/process/mock/task_context.cpp b/mock/jogasaki/executor/process/mock/task_context.cpp index 6a2f76eef..25b796324 100644 --- a/mock/jogasaki/executor/process/mock/task_context.cpp +++ b/mock/jogasaki/executor/process/mock/task_context.cpp @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 Project Tsurugi. + * Copyright 2018-2024 Project Tsurugi. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,4 @@ #include "task_context.h" namespace jogasaki::executor::process::mock { - -} - +} // jogasaki::executor::process::mock \ No newline at end of file diff --git a/mock/jogasaki/executor/process/mock/task_context.h b/mock/jogasaki/executor/process/mock/task_context.h index 50fc83b20..07997c754 100644 --- a/mock/jogasaki/executor/process/mock/task_context.h +++ b/mock/jogasaki/executor/process/mock/task_context.h @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 Project Tsurugi. + * Copyright 2018-2024 Project Tsurugi. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,7 +24,7 @@ #include #include #include -#include +#include #include #include #include @@ -43,12 +43,12 @@ class task_context : public abstract::task_context { std::vector readers = {}, std::vector> downstream_writers = {}, std::shared_ptr external_writer = {}, - std::shared_ptr info = {} + std::shared_ptr range = {} ) : readers_(std::move(readers)), downstream_writers_(std::move(downstream_writers)), external_writer_(std::move(external_writer)), - scan_info_(std::move(info)) + range_(std::move(range)) {} io::reader_container reader(reader_index idx) override { @@ -76,10 +76,9 @@ class task_context : public abstract::task_context { external_writer_->release(); external_writer_.reset(); } - scan_info_.reset(); } - class abstract::scan_info const* scan_info() override { + class abstract::range const* range() override { return nullptr; } @@ -95,7 +94,6 @@ class task_context : public abstract::task_context { std::vector readers_{}; std::vector> downstream_writers_{}; std::shared_ptr external_writer_{}; - std::shared_ptr scan_info_{}; + std::shared_ptr range_{}; }; - -} +} // namespace jogasaki::executor::process::mock diff --git a/src/jogasaki/data/aligned_buffer.cpp b/src/jogasaki/data/aligned_buffer.cpp index 369a596fb..763c8761d 100644 --- a/src/jogasaki/data/aligned_buffer.cpp +++ b/src/jogasaki/data/aligned_buffer.cpp @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 Project Tsurugi. + * Copyright 2018-2024 Project Tsurugi. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -126,4 +126,21 @@ aligned_buffer& aligned_buffer::assign(std::string_view sv) { return assign(aligned_buffer{sv}); } -} // namespace +void aligned_buffer::dump(std::ostream& out, int indent) const noexcept{ + std::string indent_space(indent, ' '); + out << indent_space << "aligned_buffer:" << "\n"; + out << indent_space << " capacity_: " << capacity_ << "\n"; + out << indent_space << " alignment_: " << alignment_ << "\n"; + out << indent_space << " size_: " << size_ << "\n"; + out << indent_space << " data_: " ; + for (std::size_t i = 0; i < size_; ++i) { + out << std::hex << std::setw(2) << std::setfill('0') << static_cast(data_[i]) << " "; + if ((i + 1) % 16 == 0) { + out << std::endl; + } + } + out << std::setfill(' ') << std::dec << std::endl; + +} + +} // namespace jogasaki::data \ No newline at end of file diff --git a/src/jogasaki/data/aligned_buffer.h b/src/jogasaki/data/aligned_buffer.h index d00e86e82..7f13ac1b4 100644 --- a/src/jogasaki/data/aligned_buffer.h +++ b/src/jogasaki/data/aligned_buffer.h @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 Project Tsurugi. + * Copyright 2018-2024 Project Tsurugi. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -145,6 +145,13 @@ class aligned_buffer { */ [[nodiscard]] std::size_t alignment() const noexcept; + /** + * @brief Support for debugging, callable in GDB + * @param out The output stream to which the buffer's internal state will be written. + * @param indent The indentation level for formatting the output, default is 0. + */ + void dump(std::ostream& out, int indent = 0) const noexcept; + /** * @brief compare two objects * @param a first arg to compare @@ -172,4 +179,4 @@ class aligned_buffer { void resize_internal(std::size_t sz, bool copydata); }; -} // namespace +} // namespace jogasaki::data \ No newline at end of file diff --git a/src/jogasaki/executor/process/abstract/scan_info.h b/src/jogasaki/executor/process/abstract/range.h similarity index 60% rename from src/jogasaki/executor/process/abstract/scan_info.h rename to src/jogasaki/executor/process/abstract/range.h index 72499652b..62d98a4f1 100644 --- a/src/jogasaki/executor/process/abstract/scan_info.h +++ b/src/jogasaki/executor/process/abstract/range.h @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 Project Tsurugi. + * Copyright 2018-2024 Project Tsurugi. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,27 +18,25 @@ namespace jogasaki::executor::process::abstract { /** - * @brief scan info - * @details this instance provides specification of scan (e.g. definition of the range of scanned records) + * @brief range + * @details definition of the range of scanned records */ -class scan_info { -public: +class range { + public: /** * @brief create empty object */ - scan_info() = default; + range() = default; /** * @brief destroy the object */ - virtual ~scan_info() = default; + virtual ~range() = default; - scan_info(scan_info const& other) = default; - scan_info& operator=(scan_info const& other) = default; - scan_info(scan_info&& other) noexcept = default; - scan_info& operator=(scan_info&& other) noexcept = default; + range(range const& other) = default; + range& operator=(range const& other) = default; + range(range&& other) noexcept = default; + range& operator=(range&& other) noexcept = default; }; -} - - +} // namespace jogasaki::executor::process::abstract \ No newline at end of file diff --git a/src/jogasaki/executor/process/abstract/task_context.h b/src/jogasaki/executor/process/abstract/task_context.h index a68427767..28dc8638f 100644 --- a/src/jogasaki/executor/process/abstract/task_context.h +++ b/src/jogasaki/executor/process/abstract/task_context.h @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 Project Tsurugi. + * Copyright 2018-2024 Project Tsurugi. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,7 +22,7 @@ #include #include -#include "scan_info.h" +#include "range.h" #include "work_context.h" namespace jogasaki::executor::process::abstract { @@ -33,7 +33,7 @@ namespace jogasaki::executor::process::abstract { * input data reader, and transient work area * * Depending on whether the processor logic is driven by main/sub input or scan, readers() or - * scan_info() functions are called to locate/retrieve the input data for the task. + * range() functions are called to locate/retrieve the input data for the task. * * The knowledge about the number of I/O objects and its index (i.e. what port or exchange the i-th reader/writer * corresponds to) are shared with processor. @@ -104,13 +104,13 @@ class task_context { */ [[nodiscard]] virtual io::record_writer* external_writer() = 0; - /** - * @brief accessor to scan information that defines scan specification for the task - * @details processor impl. knows the details scan_info and drives scan operation using it. - * The details of scan_info is transparent to processor context. + /** + * @brief accessor to range information + * @details processor impl. knows the details range and drives scan operation using it. + * The details of range is transparent to processor context. * @return scan info */ - [[nodiscard]] virtual class scan_info const* scan_info() = 0; + [[nodiscard]] virtual class range const* range() = 0; /** * @brief setter of work context @@ -150,6 +150,4 @@ inline bool operator!=(task_context const& a, task_context const& b) noexcept { return !(a == b); } -} - - +} // namespace jogasaki::executor::process::abstract \ No newline at end of file diff --git a/src/jogasaki/executor/process/flow.cpp b/src/jogasaki/executor/process/flow.cpp index a3a1ea3df..f5edd26a7 100644 --- a/src/jogasaki/executor/process/flow.cpp +++ b/src/jogasaki/executor/process/flow.cpp @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 Project Tsurugi. + * Copyright 2018-2024 Project Tsurugi. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -95,7 +95,7 @@ sequence_view> flow::create_tasks() { step_->io_info(), step_->relation_io_map(), *step_->io_exchange_map(), - context_->request_resource() + context_ ); } catch (plan::impl::compile_exception const& e) { error::set_error_info(*context_, e.info()); @@ -170,7 +170,7 @@ std::shared_ptr flow::create_task_context( *context_, partition, operators.io_exchange_map(), - operators.scan_info(), // simply pass back the scan info. In the future, scan can be parallel and different scan info are created and filled into the task context. + operators.range(), (context_->record_channel() && external_output != nullptr) ? context_->record_channel().get() : nullptr, sink_index ); @@ -190,4 +190,4 @@ std::shared_ptr flow::create_task_context( return ctx; } -} // namespace jogasaki::executor::process +} // namespace jogasaki::executor::process \ No newline at end of file diff --git a/src/jogasaki/executor/process/impl/bound.cpp b/src/jogasaki/executor/process/impl/bound.cpp new file mode 100644 index 000000000..2da784c88 --- /dev/null +++ b/src/jogasaki/executor/process/impl/bound.cpp @@ -0,0 +1,33 @@ +/* + * Copyright 2018-2024 Project Tsurugi. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "bound.h" + +namespace jogasaki::executor::process::impl { + +[[nodiscard]] std::string_view bound::key() const noexcept { + return {static_cast(key_->data()), len_}; +} +[[nodiscard]] kvs::end_point_kind bound::endpointkind() const noexcept { return endpointkind_; } +void bound::dump(std::ostream& out, int indent) const noexcept { + std::string indent_space(indent, ' '); + out << indent_space << "bound:" + << "\n"; + out << indent_space << " endpointkind_: " << endpointkind_ << "\n"; + out << indent_space << " len_: " << len_ << "\n"; + out << indent_space << " key_: " << *key_ << "\n"; + key_->dump(out, indent + 2); +} +} // namespace jogasaki::executor::process::impl \ No newline at end of file diff --git a/src/jogasaki/executor/process/impl/bound.h b/src/jogasaki/executor/process/impl/bound.h new file mode 100644 index 000000000..04331ef3d --- /dev/null +++ b/src/jogasaki/executor/process/impl/bound.h @@ -0,0 +1,52 @@ +/* + * Copyright 2018-2024 Project Tsurugi. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include + +namespace jogasaki::executor::process::impl { + +class bound { + + public: + bound() = default; + bound(bound const& other) = delete; + bound& operator=(bound const& other) = delete; + bound(bound&& other) noexcept = delete; + bound& operator=(bound&& other) noexcept = delete; + ~bound() = default; + bound(kvs::end_point_kind endpointkind, std::size_t len, + std::unique_ptr key) + : endpointkind_(endpointkind), len_(len), key_(std::move(key)) {} + kvs::end_point_kind endpointkind() { return endpointkind_; }; + std::unique_ptr key() { return std::move(key_); }; + [[nodiscard]] std::string_view key() const noexcept; + [[nodiscard]] kvs::end_point_kind endpointkind() const noexcept; + /** + * @brief Support for debugging, callable in GDB + * @param out The output stream to which the buffer's internal state will be written. + * @param indent The indentation level for formatting the output, default is 0. + */ + void dump(std::ostream& out, int indent = 0) const noexcept; + + private: + kvs::end_point_kind endpointkind_{}; + std::size_t len_{}; + std::unique_ptr key_{}; +}; + +} // namespace jogasaki::executor::process::impl \ No newline at end of file diff --git a/src/jogasaki/executor/process/impl/ops/details/encode_key.cpp b/src/jogasaki/executor/process/impl/ops/details/encode_key.cpp index 29d5ba5c3..2b356e698 100644 --- a/src/jogasaki/executor/process/impl/ops/details/encode_key.cpp +++ b/src/jogasaki/executor/process/impl/ops/details/encode_key.cpp @@ -34,8 +34,8 @@ #include #include #include +#include #include - namespace jogasaki::executor::process::impl::ops::details { status encode_key( //NOLINT(readability-function-cognitive-complexity) @@ -98,4 +98,38 @@ status encode_key( //NOLINT(readability-function-cognitive-complexity) return status::ok; } -} // namespace jogasaki::executor::process::impl::ops::details +status two_encode_keys( + request_context* context, + std::vector const& begin_keys, + std::vector const& end_keys, + variable_table& input_variables, + memory::lifo_paged_memory_resource& resource, + data::aligned_buffer& key_begin, + std::size_t& blen, + data::aligned_buffer& key_end, + std::size_t& elen +) { + status status_result = status::ok; + std::string message; + if ((status_result = impl::ops::details::encode_key( + context, begin_keys, input_variables, resource, key_begin, blen, message)) + != status::ok) { + + if (status_result == status::err_type_mismatch) { + set_error(*context, error_code::unsupported_runtime_feature_exception, message, status_result); + } + return status_result; + } + if ((status_result = impl::ops::details::encode_key( + context, end_keys, input_variables, resource, key_end, elen, message)) + != status::ok) { + + if (status_result == status::err_type_mismatch) { + set_error(*context, error_code::unsupported_runtime_feature_exception, message, status_result); + } + return status_result; + } + return status::ok; +} + +} // namespace jogasaki::executor::process::impl::ops::details \ No newline at end of file diff --git a/src/jogasaki/executor/process/impl/ops/details/encode_key.h b/src/jogasaki/executor/process/impl/ops/details/encode_key.h index 12f87fe81..38764fe61 100644 --- a/src/jogasaki/executor/process/impl/ops/details/encode_key.h +++ b/src/jogasaki/executor/process/impl/ops/details/encode_key.h @@ -53,5 +53,33 @@ status encode_key( std::size_t& length, std::string& message ); +/** + * @brief evaluate the search two keys and encode + * @details evaluate the search start and end keys and encode them so that they can be used for search + * @param context the request context + * @param begin_keys the begin_key fields to be evaluated + * @param end_keys the end_key fields to be evaluated + * @param input_variables the variables to be used for evaluation + * @param resource the memory resource + * @param key_begin the buffer to store the begin encoded key + * @param blen the length of the begin encoded key + * @param key_end the buffer to store the end encoded key + * @param elen the length of the elen encoded key + * @return status::ok when successful + * @return status::err_integrity_constraint_violation when evaluation results in null where it is not allowed + * @return status::err_type_mismatch if the type of the evaluated value does not match the expected type + * @return status::err_expression_evaluation_failure any other evaluation failure + */ +status two_encode_keys( + request_context* context, + std::vector const& begin_keys, + std::vector const& end_keys, + variable_table& input_variables, + memory::lifo_paged_memory_resource& resource, + data::aligned_buffer& key_begin, + std::size_t& blen, + data::aligned_buffer& key_end, + std::size_t& elen +); -} // namespace jogasaki::executor::process::impl::ops::details +} // namespace jogasaki::executor::process::impl::ops::details \ No newline at end of file diff --git a/src/jogasaki/executor/process/impl/ops/operator_builder.cpp b/src/jogasaki/executor/process/impl/ops/operator_builder.cpp index d974e30dc..31f3e994d 100644 --- a/src/jogasaki/executor/process/impl/ops/operator_builder.cpp +++ b/src/jogasaki/executor/process/impl/ops/operator_builder.cpp @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 Project Tsurugi. + * Copyright 2018-2024 Project Tsurugi. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,12 +15,12 @@ */ #include "operator_builder.h" +#include #include #include #include #include #include -#include #include #include @@ -31,21 +31,23 @@ #include #include #include +#include #include #include #include +#include #include #include #include #include -#include #include #include #include #include #include #include +#include #include "aggregate_group.h" #include "emit.h" @@ -61,113 +63,86 @@ #include "take_flat.h" #include "take_group.h" #include "write_create.h" -#include "write_kind.h" #include "write_existing.h" +#include "write_kind.h" namespace jogasaki::executor::process::impl::ops { namespace relation = takatori::relation; using takatori::relation::step::dispatch; +using takatori::util::string_builder; using takatori::util::throw_exception; -operator_builder::operator_builder( - std::shared_ptr info, - std::shared_ptr io_info, - std::shared_ptr relation_io_map, - io_exchange_map& io_exchange_map, - memory::lifo_paged_memory_resource* resource -) : - info_(std::move(info)), - io_info_(std::move(io_info)), - io_exchange_map_(std::addressof(io_exchange_map)), - relation_io_map_(std::move(relation_io_map)), - resource_(resource) -{ - (void)resource_; //TODO remove if not necessary -} +operator_builder::operator_builder(std::shared_ptr info, + std::shared_ptr io_info, std::shared_ptr relation_io_map, + io_exchange_map& io_exchange_map, request_context* request_context) + : info_(std::move(info)), io_info_(std::move(io_info)), + io_exchange_map_(std::addressof(io_exchange_map)), + relation_io_map_(std::move(relation_io_map)), request_context_(request_context) {} -operator_container operator_builder::operator()()&& { +operator_container operator_builder::operator()() && { auto root = dispatch(*this, head()); - return operator_container{std::move(root), index_, *io_exchange_map_, std::move(scan_info_)}; + return operator_container{std::move(root), index_, *io_exchange_map_, std::move(range_)}; } relation::expression const& operator_builder::head() { relation::expression const* result = nullptr; - takatori::relation::enumerate_top(info_->relations(), [&](relation::expression const& v) { - result = &v; - }); - if (result != nullptr) { - return *result; - } + takatori::relation::enumerate_top( + info_->relations(), [&](relation::expression const& v) { result = &v; }); + if (result != nullptr) { return *result; } throw_exception(std::logic_error{""}); } std::unique_ptr operator_builder::operator()(const relation::find& node) { auto block_index = info_->block_indices().at(&node); - auto downstream = dispatch(*this, node.output().opposite()->owner()); - auto& secondary_or_primary_index = yugawara::binding::extract(node.source()); - auto& table = secondary_or_primary_index.table(); + auto downstream = dispatch(*this, node.output().opposite()->owner()); + auto& secondary_or_primary_index = + yugawara::binding::extract(node.source()); + auto& table = secondary_or_primary_index.table(); auto primary = table.owner()->find_primary_index(table); - BOOST_ASSERT(primary); //NOLINT - return std::make_unique( - index_++, - *info_, - block_index, - node.keys(), - *primary, + BOOST_ASSERT(primary); // NOLINT + return std::make_unique(index_++, *info_, block_index, node.keys(), *primary, node.columns(), - *primary != secondary_or_primary_index ? std::addressof(secondary_or_primary_index) : nullptr, - std::move(downstream) - ); + *primary != secondary_or_primary_index ? std::addressof(secondary_or_primary_index) + : nullptr, + std::move(downstream)); } std::unique_ptr operator_builder::operator()(const relation::scan& node) { auto block_index = info_->block_indices().at(&node); - auto downstream = dispatch(*this, node.output().opposite()->owner()); - auto& secondary_or_primary_index = yugawara::binding::extract(node.source()); - auto& table = secondary_or_primary_index.table(); + auto downstream = dispatch(*this, node.output().opposite()->owner()); + auto& secondary_or_primary_index = + yugawara::binding::extract(node.source()); + auto& table = secondary_or_primary_index.table(); auto primary = table.owner()->find_primary_index(table); - // scan info is not passed to scan operator here, but passed back through task_context - // in order to support parallel scan in the future - scan_info_ = create_scan_info(node, secondary_or_primary_index); - - return std::make_unique( - index_++, - *info_, - block_index, - *primary, - node.columns(), - *primary != secondary_or_primary_index ? std::addressof(secondary_or_primary_index) : nullptr, - std::move(downstream) - ); + range_ = create_range(node); + return std::make_unique(index_++, *info_, block_index, *primary, node.columns(), + *primary != secondary_or_primary_index ? std::addressof(secondary_or_primary_index) + : nullptr, + std::move(downstream)); } std::unique_ptr operator_builder::operator()(const relation::join_find& node) { auto block_index = info_->block_indices().at(&node); - auto downstream = dispatch(*this, node.output().opposite()->owner()); - auto& secondary_or_primary_index = yugawara::binding::extract(node.source()); - auto& table = secondary_or_primary_index.table(); + auto downstream = dispatch(*this, node.output().opposite()->owner()); + auto& secondary_or_primary_index = + yugawara::binding::extract(node.source()); + auto& table = secondary_or_primary_index.table(); auto primary = table.owner()->find_primary_index(table); - return std::make_unique( - node.operator_kind(), - index_++, - *info_, - block_index, - *primary, - node.columns(), - node.keys(), - node.condition(), - *primary != secondary_or_primary_index ? std::addressof(secondary_or_primary_index) : nullptr, - std::move(downstream) - ); + return std::make_unique(node.operator_kind(), index_++, *info_, block_index, + *primary, node.columns(), node.keys(), node.condition(), + *primary != secondary_or_primary_index ? std::addressof(secondary_or_primary_index) + : nullptr, + std::move(downstream)); } std::unique_ptr operator_builder::operator()(const relation::project& node) { auto block_index = info_->block_indices().at(&node); - auto downstream = dispatch(*this, node.output().opposite()->owner()); - return std::make_unique(index_++, *info_, block_index, node.columns(), std::move(downstream)); + auto downstream = dispatch(*this, node.output().opposite()->owner()); + return std::make_unique( + index_++, *info_, block_index, node.columns(), std::move(downstream)); } std::unique_ptr operator_builder::operator()(const relation::join_scan& node) { @@ -178,8 +153,9 @@ std::unique_ptr operator_builder::operator()(const relation::join std::unique_ptr operator_builder::operator()(const relation::filter& node) { auto block_index = info_->block_indices().at(&node); - auto downstream = dispatch(*this, node.output().opposite()->owner()); - return std::make_unique(index_++, *info_, block_index, node.condition(), std::move(downstream)); + auto downstream = dispatch(*this, node.output().opposite()->owner()); + return std::make_unique( + index_++, *info_, block_index, node.condition(), std::move(downstream)); } std::unique_ptr operator_builder::operator()(const relation::buffer& node) { @@ -190,38 +166,26 @@ std::unique_ptr operator_builder::operator()(const relation::buff std::unique_ptr operator_builder::operator()(const relation::emit& node) { auto block_index = info_->block_indices().at(&node); - auto e = std::make_unique(index_++, *info_, block_index, node.columns()); + auto e = std::make_unique(index_++, *info_, block_index, node.columns()); io_exchange_map_->set_external_output(e.get()); return e; } std::unique_ptr operator_builder::operator()(const relation::write& node) { auto block_index = info_->block_indices().at(&node); - auto& index = yugawara::binding::extract(node.destination()); - - if (node.operator_kind() == relation::write_kind::update || node.operator_kind() == relation::write_kind::delete_) { - return std::make_unique( - index_++, - *info_, - block_index, - write_kind_from(node.operator_kind()), - index, - node.keys(), - node.columns() - ); + auto& index = yugawara::binding::extract(node.destination()); + + if (node.operator_kind() == relation::write_kind::update || + node.operator_kind() == relation::write_kind::delete_) { + return std::make_unique(index_++, *info_, block_index, + write_kind_from(node.operator_kind()), index, node.keys(), node.columns()); } // INSERT from SELECT std::vector columns{node.keys()}; columns.insert(columns.end(), node.columns().begin(), node.columns().end()); - return std::make_unique( - index_++, - *info_, - block_index, - write_kind_from(node.operator_kind()), - index, - columns, - resource_ - ); + return std::make_unique(index_++, *info_, block_index, + write_kind_from(node.operator_kind()), index, columns, + request_context_->request_resource()); } std::unique_ptr operator_builder::operator()(const relation::values& node) { @@ -238,36 +202,27 @@ std::unique_ptr operator_builder::operator()(const relation::iden std::unique_ptr operator_builder::operator()(const relation::step::join& node) { auto block_index = info_->block_indices().at(&node); - auto downstream = dispatch(*this, node.output().opposite()->owner()); - return std::make_unique>( - index_++, - *info_, - block_index, - node.operator_kind(), - node.condition(), - std::move(downstream) - ); + auto downstream = dispatch(*this, node.output().opposite()->owner()); + return std::make_unique>(index_++, *info_, + block_index, node.operator_kind(), node.condition(), std::move(downstream)); } std::unique_ptr operator_builder::operator()(const relation::step::aggregate& node) { auto block_index = info_->block_indices().at(&node); - auto downstream = dispatch(*this, node.output().opposite()->owner()); + auto downstream = dispatch(*this, node.output().opposite()->owner()); return std::make_unique( - index_++, - *info_, - block_index, - node.columns(), - std::move(downstream) - ); + index_++, *info_, block_index, node.columns(), std::move(downstream)); } -std::unique_ptr operator_builder::operator()(const relation::step::intersection& node) { +std::unique_ptr operator_builder::operator()( + const relation::step::intersection& node) { (void)node; throw_exception(std::logic_error{""}); return {}; } -std::unique_ptr operator_builder::operator()(const relation::step::difference& node) { +std::unique_ptr operator_builder::operator()( + const relation::step::difference& node) { (void)node; throw_exception(std::logic_error{""}); return {}; @@ -275,120 +230,60 @@ std::unique_ptr operator_builder::operator()(const relation::step std::unique_ptr operator_builder::operator()(const relation::step::flatten& node) { auto block_index = info_->block_indices().at(&node); - auto downstream = dispatch(*this, node.output().opposite()->owner()); + auto downstream = dispatch(*this, node.output().opposite()->owner()); return std::make_unique(index_++, *info_, block_index, std::move(downstream)); } std::unique_ptr operator_builder::operator()(const relation::step::take_flat& node) { - auto block_index = info_->block_indices().at(&node); + auto block_index = info_->block_indices().at(&node); auto reader_index = relation_io_map_->input_index(node.source()); - auto downstream = dispatch(*this, node.output().opposite()->owner()); - auto& input = io_info_->input_at(reader_index); - BOOST_ASSERT(! input.is_group_input()); //NOLINT - - return std::make_unique( - index_++, - *info_, - block_index, - input.column_order(), - input.record_meta(), - node.columns(), - reader_index, - std::move(downstream) - ); + auto downstream = dispatch(*this, node.output().opposite()->owner()); + auto& input = io_info_->input_at(reader_index); + BOOST_ASSERT(!input.is_group_input()); // NOLINT + + return std::make_unique(index_++, *info_, block_index, input.column_order(), + input.record_meta(), node.columns(), reader_index, std::move(downstream)); } -std::unique_ptr operator_builder::operator()(const relation::step::take_group& node) { - auto block_index = info_->block_indices().at(&node); +std::unique_ptr operator_builder::operator()( + const relation::step::take_group& node) { + auto block_index = info_->block_indices().at(&node); auto reader_index = relation_io_map_->input_index(node.source()); - auto downstream = dispatch(*this, node.output().opposite()->owner()); - auto& input = io_info_->input_at(reader_index); - return std::make_unique( - index_++, - *info_, - block_index, - input.column_order(), - input.group_meta(), - node.columns(), - reader_index, - std::move(downstream) - ); + auto downstream = dispatch(*this, node.output().opposite()->owner()); + auto& input = io_info_->input_at(reader_index); + return std::make_unique(index_++, *info_, block_index, input.column_order(), + input.group_meta(), node.columns(), reader_index, std::move(downstream)); } -std::unique_ptr operator_builder::operator()(const relation::step::take_cogroup& node) { +std::unique_ptr operator_builder::operator()( + const relation::step::take_cogroup& node) { auto block_index = info_->block_indices().at(&node); auto& block_info = info_->vars_info_list()[block_index]; std::vector reader_indices{}; std::vector groups{}; - for(auto&& g : node.groups()) { + for (auto&& g : node.groups()) { auto reader_index = relation_io_map_->input_index(g.source()); - auto& input = io_info_->input_at(reader_index); + auto& input = io_info_->input_at(reader_index); groups.emplace_back( - input.column_order(), - input.group_meta(), - g.columns(), - reader_index, - block_info - ); + input.column_order(), input.group_meta(), g.columns(), reader_index, block_info); } auto downstream = dispatch(*this, node.output().opposite()->owner()); return std::make_unique( - index_++, - *info_, - block_index, - std::move(groups), - std::move(downstream) - ); + index_++, *info_, block_index, std::move(groups), std::move(downstream)); } std::unique_ptr operator_builder::operator()(const relation::step::offer& node) { - auto block_index = info_->block_indices().at(&node); + auto block_index = info_->block_indices().at(&node); auto writer_index = relation_io_map_->output_index(node.destination()); - auto& output = io_info_->output_at(writer_index); - return std::make_unique( - index_++, - *info_, - block_index, - output.column_order(), - output.meta(), - node.columns(), - writer_index - ); -} - -std::shared_ptr -operator_builder::create_scan_info( - operator_builder::endpoint const& lower, - operator_builder::endpoint const& upper, - yugawara::storage::index const& index -) { - return std::make_shared( - details::create_search_key_fields( - index, - lower.keys(), - *info_ - ), - from(lower.kind()), - details::create_search_key_fields( - index, - upper.keys(), - *info_ - ), - from(upper.kind()) - ); -} - -std::shared_ptr operator_builder::create_scan_info( - relation::scan const& node, - yugawara::storage::index const& index -) { - return create_scan_info(node.lower(), node.upper(), index); + auto& output = io_info_->output_at(writer_index); + return std::make_unique(index_++, *info_, block_index, output.column_order(), + output.meta(), node.columns(), writer_index); } kvs::end_point_kind operator_builder::from(relation::scan::endpoint::kind_type type) { using t = relation::scan::endpoint::kind_type; using k = kvs::end_point_kind; - switch(type) { + switch (type) { case t::unbound: return k::unbound; case t::inclusive: return k::inclusive; case t::exclusive: return k::exclusive; @@ -398,22 +293,43 @@ kvs::end_point_kind operator_builder::from(relation::scan::endpoint::kind_type t throw_exception(std::logic_error{""}); } - -operator_container create_operators( - std::shared_ptr info, - std::shared_ptr io_info, - std::shared_ptr relation_io_map, - io_exchange_map& io_exchange_map, - memory::lifo_paged_memory_resource* resource -) { - return operator_builder{ - std::move(info), - std::move(io_info), - std::move(relation_io_map), - io_exchange_map, - resource - }(); +std::shared_ptr operator_builder::create_range(relation::scan const& node) { + auto& secondary_or_primary_index = + yugawara::binding::extract(node.source()); + executor::process::impl::variable_table vars{}; + auto& table = secondary_or_primary_index.table(); + auto primary = table.owner()->find_primary_index(table); + bool use_secondary = (*primary != secondary_or_primary_index); + std::size_t blen{}; + std::size_t elen{}; + std::unique_ptr key_begin = std::make_unique(); + std::unique_ptr key_end = std::make_unique(); + auto resource_ptr = std::make_unique(&global::page_pool()); + auto status_result = details::two_encode_keys(request_context_, + details::create_search_key_fields(secondary_or_primary_index, node.lower().keys(), *info_), + details::create_search_key_fields(secondary_or_primary_index, node.upper().keys(), *info_), + vars, *resource_ptr, *key_begin, blen, *key_end, elen); + if (status_result != status::ok && + status_result != status::err_integrity_constraint_violation) { + auto msg = string_builder{} << to_string_view(status_result) << string_builder::to_string; + throw_exception(jogasaki::plan::impl::compile_exception{create_error_info( + error_code::sql_execution_exception, msg, status::err_compiler_error)}); + } + auto begin_end_point_kind = kvs::adjust_endpoint_kind(use_secondary, from(node.lower().kind())); + auto end_end_point_kind = kvs::adjust_endpoint_kind(use_secondary, from(node.upper().kind())); + std::unique_ptr begin = + std::make_unique(begin_end_point_kind, blen, std::move(key_begin)); + std::unique_ptr end = + std::make_unique(end_end_point_kind, elen, std::move(key_end)); + bool is_empty = (status_result == status::err_integrity_constraint_violation); + return std::make_shared(std::move(begin), std::move(end), is_empty); } +operator_container create_operators(std::shared_ptr info, + std::shared_ptr io_info, std::shared_ptr relation_io_map, + io_exchange_map& io_exchange_map, request_context* request_context) { + return operator_builder{std::move(info), std::move(io_info), std::move(relation_io_map), + io_exchange_map, request_context}(); } +} // namespace jogasaki::executor::process::impl::ops \ No newline at end of file diff --git a/src/jogasaki/executor/process/impl/ops/operator_builder.h b/src/jogasaki/executor/process/impl/ops/operator_builder.h index 4653849c8..1fef5c607 100644 --- a/src/jogasaki/executor/process/impl/ops/operator_builder.h +++ b/src/jogasaki/executor/process/impl/ops/operator_builder.h @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 Project Tsurugi. + * Copyright 2018-2024 Project Tsurugi. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -48,7 +48,7 @@ #include #include #include -#include +#include #include #include #include @@ -83,14 +83,14 @@ class operator_builder { * @param compiler_ctx compiler context * @param io_info I/O information * @param relation_io_map mapping from relation to I/O index - * @param resource the memory resource used to building operators + * @param request_context the memory resource used to building operators */ operator_builder( std::shared_ptr info, std::shared_ptr io_info, std::shared_ptr relation_io_map, io_exchange_map& io_exchange_map, - memory::lifo_paged_memory_resource* resource = nullptr + request_context* request_context = nullptr ); [[nodiscard]] operator_container operator()() &&; @@ -121,27 +121,17 @@ class operator_builder { // keeping in public for testing using key = yugawara::storage::index::key; using endpoint = takatori::relation::scan::endpoint; - std::shared_ptr create_scan_info( - endpoint const& lower, - endpoint const& upper, - yugawara::storage::index const& index - ); - - std::shared_ptr create_scan_info( - relation::scan const& node, - yugawara::storage::index const& index - ); + std::shared_ptr create_range(relation::scan const& node); -private: + private: std::shared_ptr info_{}; std::shared_ptr io_info_{}; io_exchange_map* io_exchange_map_{}; std::shared_ptr relation_io_map_{}; operator_base::operator_index_type index_{}; - std::shared_ptr scan_info_{}; - memory::lifo_paged_memory_resource* resource_{}; - + std::shared_ptr range_{}; + request_context* request_context_{}; kvs::end_point_kind from(relation::scan::endpoint::kind_type type); }; @@ -160,8 +150,7 @@ class operator_builder { std::shared_ptr io_info, std::shared_ptr relation_io_map, io_exchange_map& io_exchange_map, - memory::lifo_paged_memory_resource* resource = nullptr + request_context* request_context = nullptr ); -} - +} // namespace jogasaki::executor::process::impl::ops \ No newline at end of file diff --git a/src/jogasaki/executor/process/impl/ops/operator_container.cpp b/src/jogasaki/executor/process/impl/ops/operator_container.cpp index bbfc7758f..d3c14d6b1 100644 --- a/src/jogasaki/executor/process/impl/ops/operator_container.cpp +++ b/src/jogasaki/executor/process/impl/ops/operator_container.cpp @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 Project Tsurugi. + * Copyright 2018-2024 Project Tsurugi. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,17 +18,17 @@ #include #include -#include +#include namespace jogasaki::executor::process::impl::ops { operator_container::operator_container(std::unique_ptr root, std::size_t operator_count, - class io_exchange_map& io_exchange_map, std::shared_ptr scan_info) : + class io_exchange_map& io_exchange_map, std::shared_ptr range) : root_(std::move(root)), operator_count_(operator_count), io_exchange_map_(std::addressof(io_exchange_map)), - scan_info_(std::move(scan_info)) + range_(std::move(range)) {} std::size_t operator_container::size() const noexcept { @@ -43,8 +43,7 @@ operator_base& operator_container::root() const noexcept { return *root_; } -std::shared_ptr const& operator_container::scan_info() const noexcept { - return scan_info_; +std::shared_ptr const& operator_container::range() const noexcept { + return range_; } -} - +} // namespace jogasaki::executor::process::impl::ops \ No newline at end of file diff --git a/src/jogasaki/executor/process/impl/ops/operator_container.h b/src/jogasaki/executor/process/impl/ops/operator_container.h index 2102ab45a..55611154e 100644 --- a/src/jogasaki/executor/process/impl/ops/operator_container.h +++ b/src/jogasaki/executor/process/impl/ops/operator_container.h @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 Project Tsurugi. + * Copyright 2018-2024 Project Tsurugi. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,8 +19,8 @@ #include #include -#include #include +#include namespace jogasaki::executor::process::impl::ops { @@ -39,14 +39,14 @@ class operator_container { * @param root the root of the operator tree * @param operator_count the number of operators * @param io_exchange_map the mapping from input/output index to exchange - * @param scan_info the scan information gathered from the scan operator in the operator tree (if any). + * @param range the range gathered from the scan operator in the operator tree (if any). * Can be nullptr if the operators don't contain scan operation. */ operator_container( std::unique_ptr root, std::size_t operator_count, class io_exchange_map& io_exchange_map, - std::shared_ptr scan_info + std::shared_ptr range ); /** @@ -68,17 +68,15 @@ class operator_container { [[nodiscard]] ops::operator_base& root() const noexcept; /** - * @brief accessor to scan info - * @return the scan info, or nullptr if there is no scan operation in the process + * @brief accessor to range + * @return the range, or nullptr if there is no scan operation in the process */ - [[nodiscard]] std::shared_ptr const& scan_info() const noexcept; - + [[nodiscard]] std::shared_ptr const& range() const noexcept; private: std::unique_ptr root_{}; std::size_t operator_count_{}; class io_exchange_map* io_exchange_map_{}; - std::shared_ptr scan_info_{}; + std::shared_ptr range_{}; }; -} - +} // namespace jogasaki::executor::process::impl::ops \ No newline at end of file diff --git a/src/jogasaki/executor/process/impl/ops/scan.cpp b/src/jogasaki/executor/process/impl/ops/scan.cpp index 1f2d09ba5..1837c026e 100644 --- a/src/jogasaki/executor/process/impl/ops/scan.cpp +++ b/src/jogasaki/executor/process/impl/ops/scan.cpp @@ -35,10 +35,11 @@ #include #include #include +#include +#include #include #include #include -#include #include #include #include @@ -129,7 +130,7 @@ operation_status scan::process_record(abstract::task_context* context) { std::move(stg), use_secondary_ ? ctx.database()->get_storage(secondary_storage_name()) : nullptr, ctx.transaction(), - unsafe_downcast(ctx.task_context()->scan_info()), //NOLINT + unsafe_downcast(ctx.task_context()->range()), //NOLINT ctx.resource(), ctx.varlen_resource() ); @@ -145,12 +146,12 @@ operation_status scan::operator()( //NOLINT(readability-function-cognitive-comp return {operation_status_kind::aborted}; } if(ctx.it_ == nullptr){ + if (ctx.range_->is_empty()){ + // range keys contain null. Nothing should match. + finish(context); + return {}; + } if(auto res = open(ctx); res != status::ok) { - if(res == status::err_integrity_constraint_violation) { - // range keys contain null. Nothing should match. - finish(context); - return {}; - } // res can be status::err_type_mismatch, then ctx already filled with error info finish(context); return error_abort(ctx, res); @@ -218,7 +219,6 @@ operation_status scan::operator()( //NOLINT(readability-function-cognitive-comp ) << "scan operator yields count:" << ctx.yield_count_ << " loop_count:" << loop_count << " elapsed(us):" << std::chrono::duration_cast(current_time - previous_time).count(); - return {operation_status_kind::yield}; } } @@ -254,70 +254,17 @@ void scan::finish(abstract::task_context* context) { unsafe_downcast(downstream_.get())->finish(context); } } - status scan::open(scan_context& ctx) { //NOLINT(readability-make-member-function-const) auto& stg = use_secondary_ ? *ctx.secondary_stg_ : *ctx.stg_; - auto be = ctx.scan_info_->begin_endpoint(); - auto ee = ctx.scan_info_->end_endpoint(); - if (use_secondary_) { - // at storage layer, secondary index key contains primary key index as postfix - // so boundary condition needs promotion to be compatible - // TODO verify the promotion - if (be == kvs::end_point_kind::inclusive) { - be = kvs::end_point_kind::prefixed_inclusive; - } - if (be == kvs::end_point_kind::exclusive) { - be = kvs::end_point_kind::prefixed_exclusive; - } - if (ee == kvs::end_point_kind::inclusive) { - ee = kvs::end_point_kind::prefixed_inclusive; - } - if (ee == kvs::end_point_kind::exclusive) { - ee = kvs::end_point_kind::prefixed_exclusive; - } - } - executor::process::impl::variable_table vars{}; - std::size_t blen{}; - std::string msg{}; - if(auto res = details::encode_key( - ctx.req_context(), - ctx.scan_info_->begin_columns(), - vars, - *ctx.varlen_resource(), - ctx.key_begin_, - blen, - msg - ); - res != status::ok) { - if(res == status::err_type_mismatch) { - // only on err_type_mismatch, msg is filled with error message. use it to create the error info in request context - set_error(*ctx.req_context(), error_code::unsupported_runtime_feature_exception, msg, res); - } - return res; - } - std::size_t elen{}; - if(auto res = details::encode_key( - ctx.req_context(), - ctx.scan_info_->end_columns(), - vars, - *ctx.varlen_resource(), - ctx.key_end_, - elen, - msg - ); - res != status::ok) { - if(res == status::err_type_mismatch) { - // only on err_type_mismatch, msg is filled with error message. use it to create the error info in request context - set_error(*ctx.req_context(), error_code::unsupported_runtime_feature_exception, msg, res); - } - return res; - } + auto range = ctx.range_; + auto begin = range->begin(); + auto end = range->end(); if(auto res = stg.content_scan( *ctx.tx_, - {static_cast(ctx.key_begin_.data()), blen}, - be, - {static_cast(ctx.key_end_.data()), elen}, - ee, + begin->key(), + begin->endpointkind(), + end->key(), + end->endpointkind(), ctx.it_ ); res != status::ok) { handle_kvs_errors(*ctx.req_context(), res); @@ -327,6 +274,7 @@ status scan::open(scan_context& ctx) { //NOLINT(readability-make-member-functio return status::ok; } + void scan::close(scan_context& ctx) { ctx.it_.reset(); } @@ -377,4 +325,4 @@ void scan::dump() const noexcept { std::cerr << head << std::setw(width) << "field_mapper_:" << "not implemented yet" << std::endl; } -} // namespace jogasaki::executor::process::impl::ops +} // namespace jogasaki::executor::process::impl::ops \ No newline at end of file diff --git a/src/jogasaki/executor/process/impl/ops/scan.h b/src/jogasaki/executor/process/impl/ops/scan.h index 3c7edfb6e..8d3441861 100644 --- a/src/jogasaki/executor/process/impl/ops/scan.h +++ b/src/jogasaki/executor/process/impl/ops/scan.h @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 Project Tsurugi. + * Copyright 2018-2024 Project Tsurugi. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,7 +26,6 @@ #include #include -#include #include #include #include @@ -159,7 +158,6 @@ class scan : public record_operator { index_field_mapper field_mapper_{}; [[nodiscard]] status open(scan_context& ctx); - void close(scan_context& ctx); std::vector create_secondary_key_fields( @@ -167,7 +165,4 @@ class scan : public record_operator { ); }; - -} - - +} // namespace jogasaki::executor::process::impl::ops diff --git a/src/jogasaki/executor/process/impl/ops/scan_context.cpp b/src/jogasaki/executor/process/impl/ops/scan_context.cpp index 40affd1bc..fdf97040d 100644 --- a/src/jogasaki/executor/process/impl/ops/scan_context.cpp +++ b/src/jogasaki/executor/process/impl/ops/scan_context.cpp @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 Project Tsurugi. + * Copyright 2018-2024 Project Tsurugi. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,7 +17,7 @@ #include -#include +#include #include #include @@ -31,7 +31,7 @@ scan_context::scan_context( std::unique_ptr stg, std::unique_ptr secondary_stg, transaction_context* tx, - impl::scan_info const* scan_info, + impl::range const* range, context_base::memory_resource* resource, context_base::memory_resource* varlen_resource ) : @@ -39,7 +39,7 @@ scan_context::scan_context( stg_(std::move(stg)), secondary_stg_(std::move(secondary_stg)), tx_(tx), - scan_info_(scan_info) + range_(range) {} operator_kind scan_context::kind() const noexcept { @@ -67,15 +67,7 @@ void scan_context::dump() const noexcept { << " " << std::setw(20) << "transaction_context:" << (tx_ ? tx_ : nullptr) << "\n" << " " << std::setw(20) << "iterator:" - << (it_ ? it_.get() : nullptr) << "\n" - << " " << std::setw(20) << "scan_info:" - << (scan_info_ ? scan_info_ : nullptr) << "\n" - << " " << std::setw(20) << "key_begin_size:" - << key_begin_.size() << "\n" - << " " << std::setw(20) << "key_end_size:" - << key_end_.size() << std::endl; + << (it_ ? it_.get() : nullptr) << "\n"; } -} - - +} // namespace jogasaki::executor::process::impl::ops diff --git a/src/jogasaki/executor/process/impl/ops/scan_context.h b/src/jogasaki/executor/process/impl/ops/scan_context.h index 712f40d20..1f8416e63 100644 --- a/src/jogasaki/executor/process/impl/ops/scan_context.h +++ b/src/jogasaki/executor/process/impl/ops/scan_context.h @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 Project Tsurugi. + * Copyright 2018-2024 Project Tsurugi. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,7 +21,7 @@ #include #include #include -#include +#include #include #include #include @@ -53,7 +53,7 @@ class scan_context : public context_base { std::unique_ptr stg, std::unique_ptr secondary_stg, transaction_context* tx, - impl::scan_info const* scan_info, + impl::range const* range_, memory_resource* resource, memory_resource* varlen_resource ); @@ -74,12 +74,8 @@ class scan_context : public context_base { std::unique_ptr secondary_stg_{}; transaction_context* tx_{}; std::unique_ptr it_{}; - impl::scan_info const* scan_info_{}; - data::aligned_buffer key_begin_{}; - data::aligned_buffer key_end_{}; std::size_t yield_count_{}; + impl::range const* range_{}; }; -} - - +} // namespace jogasaki::executor::process::impl::ops diff --git a/src/jogasaki/executor/process/impl/processor.cpp b/src/jogasaki/executor/process/impl/processor.cpp index 7b348787c..5ac006d76 100644 --- a/src/jogasaki/executor/process/impl/processor.cpp +++ b/src/jogasaki/executor/process/impl/processor.cpp @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 Project Tsurugi. + * Copyright 2018-2024 Project Tsurugi. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -43,7 +43,7 @@ processor::processor( std::shared_ptr io_info, std::shared_ptr relation_io_map, io_exchange_map& io_exchange_map, - memory::lifo_paged_memory_resource* resource + request_context* request_context ) : info_(std::move(info)), operators_( @@ -52,7 +52,7 @@ processor::processor( std::move(io_info), std::move(relation_io_map), io_exchange_map, - resource + request_context ) ), relation_io_map_(std::move(relation_io_map)) @@ -96,4 +96,4 @@ ops::operator_container const& processor::operators() const noexcept { return operators_; } -} +} // namespace jogasaki::executor::process::impl \ No newline at end of file diff --git a/src/jogasaki/executor/process/impl/processor.h b/src/jogasaki/executor/process/impl/processor.h index a9b070f12..3afbc44fb 100644 --- a/src/jogasaki/executor/process/impl/processor.h +++ b/src/jogasaki/executor/process/impl/processor.h @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 Project Tsurugi. + * Copyright 2018-2024 Project Tsurugi. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -48,7 +48,7 @@ class processor : public process::abstract::processor { * @param io_info input/output information * @param relation_io_map mapping from relation to input/output indices * @param io_exchange_map map from input/output to exchange operator - * @param resource the memory resource to build the structures needed by this processor + * @param request_context memory resource for initializing and managing processor structures * @throws plan::impl::compile_exception if the processor construction fails */ processor( @@ -56,7 +56,7 @@ class processor : public process::abstract::processor { std::shared_ptr io_info, std::shared_ptr relation_io_map, io_exchange_map& io_exchange_map, - memory::lifo_paged_memory_resource* resource + request_context* request_context ); /** @@ -78,6 +78,4 @@ class processor : public process::abstract::processor { std::shared_ptr relation_io_map_{}; }; -} - - +} // namespace jogasaki::executor::process::impl \ No newline at end of file diff --git a/src/jogasaki/executor/process/impl/range.cpp b/src/jogasaki/executor/process/impl/range.cpp new file mode 100644 index 000000000..ea8432f18 --- /dev/null +++ b/src/jogasaki/executor/process/impl/range.cpp @@ -0,0 +1,36 @@ +/* + * Copyright 2018-2024 Project Tsurugi. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "range.h" + +namespace jogasaki::executor::process::impl { + +range::range(std::unique_ptr begin, std::unique_ptr end, bool is_empty) noexcept + : begin_(std::move(begin)), end_(std::move(end)), is_empty_(is_empty) {} + +[[nodiscard]] bound const* range::begin() const noexcept { return begin_.get(); } +[[nodiscard]] bound const* range::end() const noexcept { return end_.get(); } +[[nodiscard]] bool range::is_empty() const noexcept { return is_empty_; } + +void range::dump(std::ostream& out, int indent) const noexcept { + std::string indent_space(indent, ' '); + out << indent_space << " begin_:\n"; + begin_->dump(out, indent + 2); + out << indent_space << " end_:\n"; + end_->dump(out, indent + 2); + out << indent_space << " is_empty_: " << is_empty_ << "\n"; +} +} // namespace jogasaki::executor::process::impl \ No newline at end of file diff --git a/src/jogasaki/executor/process/impl/range.h b/src/jogasaki/executor/process/impl/range.h new file mode 100644 index 000000000..797522c29 --- /dev/null +++ b/src/jogasaki/executor/process/impl/range.h @@ -0,0 +1,50 @@ +/* + * Copyright 2018-2024 Project Tsurugi. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include + +namespace jogasaki::executor::process::impl { + +class range : public abstract::range { + public: + explicit range(std::unique_ptr begin = nullptr, std::unique_ptr end = nullptr, + bool is_empty = true) noexcept; + + ~range() override = default; + range(range const& other) = delete; + range& operator=(range const& other) = delete; + range(range&& other) noexcept = default; + range& operator=(range&& other) noexcept = default; + [[nodiscard]] bound const* begin() const noexcept; + [[nodiscard]] bound const* end() const noexcept; + [[nodiscard]] bool is_empty() const noexcept; + /** + * @brief Support for debugging, callable in GDB + * @param out The output stream to which the buffer's internal state will be written. + * @param indent The indentation level for formatting the output, default is 0. + * @param status The result of the bound. + */ + void dump(std::ostream& out, int indent = 0) const noexcept; + + private: + std::unique_ptr begin_; + std::unique_ptr end_; + bool is_empty_; +}; + +} // namespace jogasaki::executor::process::impl \ No newline at end of file diff --git a/src/jogasaki/executor/process/impl/scan_info.cpp b/src/jogasaki/executor/process/impl/scan_info.cpp deleted file mode 100644 index 6a23236a4..000000000 --- a/src/jogasaki/executor/process/impl/scan_info.cpp +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright 2018-2023 Project Tsurugi. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include "scan_info.h" - -#include - -#include -#include - -namespace jogasaki::executor::process::impl { - -scan_info::scan_info( - std::vector begin_columns, - kvs::end_point_kind begin_endpoint, - std::vector end_columns, - kvs::end_point_kind end_endpoint -) : - begin_columns_(std::move(begin_columns)), - begin_endpoint_(begin_endpoint), - end_columns_(std::move(end_columns)), - end_endpoint_(end_endpoint) -{} - -std::vector const& scan_info::begin_columns() const noexcept { - return begin_columns_; -} - -std::vector const& scan_info::end_columns() const noexcept { - return end_columns_; -} - -kvs::end_point_kind scan_info::begin_endpoint() const noexcept { - return begin_endpoint_; -} - -kvs::end_point_kind scan_info::end_endpoint() const noexcept { - return end_endpoint_; -} - -} diff --git a/src/jogasaki/executor/process/impl/scan_info.h b/src/jogasaki/executor/process/impl/scan_info.h deleted file mode 100644 index 824d84bc2..000000000 --- a/src/jogasaki/executor/process/impl/scan_info.h +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Copyright 2018-2023 Project Tsurugi. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#pragma once - -#include - -#include -#include -#include - -namespace jogasaki::executor::process::impl { - -/** - * @brief scan info - * @details this instance provides specification of scan (e.g. definition of the range of scanned records) - */ -class scan_info : public abstract::scan_info { -public: - /** - * @brief create new object - */ - explicit scan_info( - std::vector = {}, - kvs::end_point_kind begin_endpoint = kvs::end_point_kind::unbound, - std::vector = {}, - kvs::end_point_kind end_endpoint = kvs::end_point_kind::unbound - ); - - ~scan_info() override = default; - - scan_info(scan_info const& other) = default; - scan_info& operator=(scan_info const& other) = default; - scan_info(scan_info&& other) noexcept = default; - scan_info& operator=(scan_info&& other) noexcept = default; - - [[nodiscard]] std::vector const& begin_columns() const noexcept; - [[nodiscard]] std::vector const& end_columns() const noexcept; - [[nodiscard]] kvs::end_point_kind begin_endpoint() const noexcept; - [[nodiscard]] kvs::end_point_kind end_endpoint() const noexcept; - -private: - std::vector begin_columns_{}; - kvs::end_point_kind begin_endpoint_{}; - std::vector end_columns_{}; - kvs::end_point_kind end_endpoint_{}; -}; - -} - - diff --git a/src/jogasaki/executor/process/impl/task_context.cpp b/src/jogasaki/executor/process/impl/task_context.cpp index 2becb91b9..5b3fbd06f 100644 --- a/src/jogasaki/executor/process/impl/task_context.cpp +++ b/src/jogasaki/executor/process/impl/task_context.cpp @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 Project Tsurugi. + * Copyright 2018-2024 Project Tsurugi. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,9 +29,9 @@ #include #include #include -#include +#include #include -#include +#include #include #include #include @@ -48,14 +48,14 @@ process::impl::task_context::task_context( request_context& rctx, std::size_t partition, io_exchange_map const& io_exchange_map, - std::shared_ptr scan_info, + std::shared_ptr range, io::record_channel* channel, partition_index sink_index ) : request_context_(std::addressof(rctx)), partition_(partition), io_exchange_map_(std::addressof(io_exchange_map)), - scan_info_(std::move(scan_info)), + range_(std::move(range)), channel_(channel), sink_index_(sink_index) {} @@ -120,8 +120,8 @@ io::record_writer* task_context::external_writer() { return external_writer_.get(); } -class abstract::scan_info const* task_context::scan_info() { - return scan_info_.get(); +class abstract::range const* task_context::range() { + return range_.get(); } std::size_t task_context::partition() const noexcept { @@ -132,4 +132,4 @@ io::record_channel* task_context::channel() const noexcept { return channel_; } -} +} // namespace jogasaki::executor::process::impl \ No newline at end of file diff --git a/src/jogasaki/executor/process/impl/task_context.h b/src/jogasaki/executor/process/impl/task_context.h index b05eef24b..123cffa97 100644 --- a/src/jogasaki/executor/process/impl/task_context.h +++ b/src/jogasaki/executor/process/impl/task_context.h @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 Project Tsurugi. + * Copyright 2018-2024 Project Tsurugi. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,11 +27,11 @@ #include #include #include -#include +#include #include #include #include -#include +#include #include #include @@ -57,7 +57,7 @@ class task_context : public abstract::task_context { * @brief create new object * @param partition the index of partition assigned to this object (used as index of source on the input exchange) * @param io_exchange_map mapping from input/output indices to exchanges - * @param scan_info the scan information, nullptr if the task doesn't contain scan + * @param range the range information, nullptr if the task doesn't contain range * @param channel the record channel to write the result data * @param sink_index the index of sink on the output exchange */ @@ -65,7 +65,7 @@ class task_context : public abstract::task_context { request_context& rctx, partition_index partition, io_exchange_map const& io_exchange_map, - std::shared_ptr scan_info, + std::shared_ptr range, io::record_channel* channel, partition_index sink_index ); @@ -76,7 +76,7 @@ class task_context : public abstract::task_context { io::record_writer* external_writer() override; - class abstract::scan_info const* scan_info() override; + class abstract::range const* range() override; [[nodiscard]] std::size_t partition() const noexcept; @@ -88,12 +88,10 @@ class task_context : public abstract::task_context { request_context* request_context_{}; std::size_t partition_{}; io_exchange_map const* io_exchange_map_{}; - std::shared_ptr scan_info_{}; + std::shared_ptr range_{}; io::record_channel* channel_{}; std::shared_ptr external_writer_{}; partition_index sink_index_{}; }; -} - - +} // namespace jogasaki::executor::process::impl \ No newline at end of file diff --git a/src/jogasaki/kvs/storage.cpp b/src/jogasaki/kvs/storage.cpp index b5c297488..418ae9f5c 100644 --- a/src/jogasaki/kvs/storage.cpp +++ b/src/jogasaki/kvs/storage.cpp @@ -143,5 +143,41 @@ status storage::set_options(sharksfin::StorageOptions const& options) { status storage::get_options(sharksfin::StorageOptions& options) { return resolve(sharksfin::storage_get_options(handle_, options)); } + +end_point_kind adjust_endpoint_kind(bool use_secondary, kvs::end_point_kind endpoint){ + if (use_secondary) { + if (endpoint == kvs::end_point_kind::inclusive) { + return kvs::end_point_kind::prefixed_inclusive; + } + if (endpoint == kvs::end_point_kind::exclusive) { + return kvs::end_point_kind::prefixed_exclusive; + } + } + return endpoint; +} + +std::ostream& operator<<(std::ostream& os, end_point_kind kind) { + switch(kind) { + case end_point_kind::unbound: + os << "unbound"; + break; + case end_point_kind::inclusive: + os << "inclusive"; + break; + case end_point_kind::exclusive: + os << "exclusive"; + break; + case end_point_kind::prefixed_inclusive: + os << "prefixed_inclusive"; + break; + case end_point_kind::prefixed_exclusive: + os << "prefixed_exclusive"; + break; + default: + os << "unknown"; + break; + } + return os; } +} \ No newline at end of file diff --git a/src/jogasaki/kvs/storage.h b/src/jogasaki/kvs/storage.h index ca39e84df..9e7e6522a 100644 --- a/src/jogasaki/kvs/storage.h +++ b/src/jogasaki/kvs/storage.h @@ -124,7 +124,6 @@ class storage { std::string_view end_key, end_point_kind end_kind, std::unique_ptr& it ); - /** * @brief get the value for the given key * @param tx transaction used for the point query @@ -238,6 +237,7 @@ inline std::ostream& operator<<(std::ostream& out, storage const& value) { out << "storage(handle:" << std::hex << value.handle() << ")"; return out; } - +end_point_kind adjust_endpoint_kind(bool use_secondary, kvs::end_point_kind endpoint); +std::ostream& operator<<(std::ostream& os, end_point_kind kind); } diff --git a/test/jogasaki/executor/process/ops/join_find_test.cpp b/test/jogasaki/executor/process/ops/join_find_test.cpp index ac84507a2..06375ff3f 100644 --- a/test/jogasaki/executor/process/ops/join_find_test.cpp +++ b/test/jogasaki/executor/process/ops/join_find_test.cpp @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 Project Tsurugi. + * Copyright 2018-2024 Project Tsurugi. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -113,14 +113,14 @@ std::pair, std::shared_ptr> std::vector readers = {}, std::vector> downstream_writers = {}, std::shared_ptr external_writer = {}, - std::shared_ptr info = {}, + std::shared_ptr range = {}, std::shared_ptr tx = {} ) { auto ret = std::make_shared( std::move(readers), std::move(downstream_writers), std::move(external_writer), - std::move(info) + std::move(range) ); auto rctx = std::make_shared(); @@ -434,5 +434,4 @@ TEST_F(join_find_test, host_variable_with_condition_expr) { ctx.release(); } -} - +} // namespace jogasaki::executor::process::impl::ops \ No newline at end of file diff --git a/test/jogasaki/executor/process/ops/scan_test.cpp b/test/jogasaki/executor/process/ops/scan_test.cpp index 0a965e6b3..63d39d63c 100644 --- a/test/jogasaki/executor/process/ops/scan_test.cpp +++ b/test/jogasaki/executor/process/ops/scan_test.cpp @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 Project Tsurugi. + * Copyright 2018-2024 Project Tsurugi. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -54,12 +54,12 @@ #include #include #include -#include +#include #include #include #include #include -#include +#include #include #include #include @@ -75,6 +75,8 @@ #include #include #include +#include +#include #include "verifier.h" @@ -168,9 +170,13 @@ TEST_F(scan_test, simple) { put( *db_, primary_idx->simple_name(), create_record(20), create_record(2.0, 200)); auto tx = wrap(db_->create_transaction()); - auto sinfo = std::make_shared(); - mock::task_context task_ctx{ {}, {}, {}, {sinfo}}; - scan_context ctx(&task_ctx, output_variables, get_storage(*db_, primary_idx->simple_name()), nullptr, tx.get(), sinfo.get(), &resource_, &varlen_resource_); + std::unique_ptr key_begin = std::make_unique(0,1); + std::unique_ptr key_end = std::make_unique(0,1); + std::unique_ptr begin = std::make_unique(kvs::end_point_kind::unbound,0,std::move(key_begin)); + std::unique_ptr end = std::make_unique(kvs::end_point_kind::unbound,0,std::move(key_end)); + auto range = std::make_shared(std::move(begin),std::move(end)); + mock::task_context task_ctx{ {}, {}, {},{range}}; + scan_context ctx(&task_ctx, output_variables, get_storage(*db_, primary_idx->simple_name()), nullptr, tx.get(),range.get(), request_context_.request_resource(), &varlen_resource_); ASSERT_TRUE(static_cast(op(ctx))); ctx.release(); @@ -233,10 +239,13 @@ TEST_F(scan_test, nullable_fields) { put( *db_, primary_idx->simple_name(), create_record(20), create_nullable_record(std::forward_as_tuple(0.0, 0), {true, true})); auto tx = wrap(db_->create_transaction()); - auto sinfo = std::make_shared(); - mock::task_context task_ctx{ {}, {}, {}, {sinfo}}; - scan_context ctx(&task_ctx, output_variables, get_storage(*db_, primary_idx->simple_name()), nullptr, tx.get(), sinfo.get(), &resource_, &varlen_resource_); - + std::unique_ptr key_begin = std::make_unique(0,1); + std::unique_ptr key_end = std::make_unique(0,1); + std::unique_ptr begin = std::make_unique(kvs::end_point_kind::unbound,0,std::move(key_begin)); + std::unique_ptr end = std::make_unique(kvs::end_point_kind::unbound,0,std::move(key_end)); + auto range = std::make_shared(std::move(begin),std::move(end)); + mock::task_context task_ctx{ {}, {}, {},{range}}; + scan_context ctx(&task_ctx, output_variables, get_storage(*db_, primary_idx->simple_name()), nullptr, tx.get(),range.get(), request_context_.request_resource(), &varlen_resource_); ASSERT_TRUE(static_cast(op(ctx))); ctx.release(); ASSERT_EQ(2, result.size()); @@ -326,19 +335,21 @@ TEST_F(scan_test, scan_info) { &output_variable_info }; + auto transaction_ctx = std::make_shared(); + transaction_ctx->error_info(create_error_info(error_code::none, "", status::err_unknown)); + request_context_.transaction(transaction_ctx); jogasaki::plan::compiler_context compiler_ctx{}; io_exchange_map exchange_map{}; - operator_builder builder{processor_info_, {}, {}, exchange_map, &resource_}; - auto sinfo = builder.create_scan_info(target, *primary_idx); - mock::task_context task_ctx{ {}, {}, {}, {sinfo}}; + operator_builder builder{processor_info_, {}, {}, exchange_map, &request_context_}; + auto range = builder.create_range(target); + mock::task_context task_ctx{ {}, {}, {},{range}}; put( *db_, primary_idx->simple_name(), create_record(100, accessor::text{"123456789012345678901234567890/B"}), create_record(1.0)); put( *db_, primary_idx->simple_name(), create_record(100, accessor::text{"123456789012345678901234567890/C"}), create_record(2.0)); put( *db_, primary_idx->simple_name(), create_record(100, accessor::text{"123456789012345678901234567890/D"}), create_record(3.0)); auto tx = wrap(db_->create_transaction()); - scan_context ctx(&task_ctx, output_variables, get_storage(*db_, primary_idx->simple_name()), nullptr, tx.get(), sinfo.get(), &resource_, &varlen_resource_); - + scan_context ctx(&task_ctx, output_variables, get_storage(*db_, primary_idx->simple_name()), nullptr, tx.get(),range.get(), request_context_.request_resource(), &varlen_resource_); ASSERT_TRUE(static_cast(op(ctx))); ctx.release(); ASSERT_EQ(2, result.size()); @@ -416,11 +427,13 @@ TEST_F(scan_test, secondary_index) { &input_variable_info, &output_variable_info }; - + auto transaction_ctx = std::make_shared(); + transaction_ctx->error_info(create_error_info(error_code::none, "", status::err_unknown)); + request_context_.transaction(transaction_ctx); io_exchange_map exchange_map{}; - operator_builder builder{processor_info_, {}, {}, exchange_map, &resource_}; - auto sinfo = builder.create_scan_info(target, *secondary_idx); - mock::task_context task_ctx{ {}, {}, {}, {sinfo}}; + operator_builder builder{processor_info_, {}, {}, exchange_map, &request_context_}; + auto range = builder.create_range(target); + mock::task_context task_ctx{ {}, {}, {} ,{range}}; put( *db_, primary_idx->simple_name(), create_record(10), create_record(1.0, 100)); put( *db_, secondary_idx->simple_name(), create_record(100, 10), {}); @@ -432,8 +445,7 @@ TEST_F(scan_test, secondary_index) { put( *db_, secondary_idx->simple_name(), create_record(300, 30), {}); auto tx = wrap(db_->create_transaction()); - scan_context ctx(&task_ctx, output_variables, get_storage(*db_, primary_idx->simple_name()), get_storage(*db_, secondary_idx->simple_name()), tx.get(), sinfo.get(), &resource_, &varlen_resource_); - + scan_context ctx(&task_ctx, output_variables, get_storage(*db_, primary_idx->simple_name()), get_storage(*db_, secondary_idx->simple_name()), tx.get(),range.get(), request_context_.request_resource(), &varlen_resource_); ASSERT_TRUE(static_cast(op(ctx))); ctx.release(); ASSERT_EQ(2, result.size()); @@ -545,19 +557,21 @@ TEST_F(scan_test, host_variables) { &input_variable_info, &output_variable_info }; - + auto transaction_ctx = std::make_shared(); + transaction_ctx->error_info(create_error_info(error_code::none, "", status::err_unknown)); + request_context_.transaction(transaction_ctx); jogasaki::plan::compiler_context compiler_ctx{}; io_exchange_map exchange_map{}; - operator_builder builder{processor_info_, {}, {}, exchange_map, &resource_}; - auto sinfo = builder.create_scan_info(target, *primary_idx); - mock::task_context task_ctx{ {}, {}, {}, {sinfo}}; + operator_builder builder{processor_info_, {}, {}, exchange_map, &request_context_}; + auto range = builder.create_range(target); + mock::task_context task_ctx{ {}, {}, {},{range}}; put( *db_, primary_idx->simple_name(), create_record(100, 10), create_record(1)); put( *db_, primary_idx->simple_name(), create_record(100, 20), create_record(2)); put( *db_, primary_idx->simple_name(), create_record(100, 30), create_record(3)); auto tx = wrap(db_->create_transaction()); - scan_context ctx(&task_ctx, output_variables, get_storage(*db_, primary_idx->simple_name()), nullptr, tx.get(), sinfo.get(), &resource_, &varlen_resource_); + scan_context ctx(&task_ctx, output_variables, get_storage(*db_, primary_idx->simple_name()), nullptr, tx.get(), range.get(), request_context_.request_resource(), &varlen_resource_); ASSERT_TRUE(static_cast(op(ctx))); ctx.release(); @@ -566,5 +580,4 @@ TEST_F(scan_test, host_variables) { ASSERT_EQ(status::ok, tx->commit()); } -} - +} // namespace jogasaki::executor::process::impl::ops \ No newline at end of file diff --git a/test/jogasaki/executor/process/process_executor_test.cpp b/test/jogasaki/executor/process/process_executor_test.cpp index cb9b5d4f2..3805939cb 100644 --- a/test/jogasaki/executor/process/process_executor_test.cpp +++ b/test/jogasaki/executor/process/process_executor_test.cpp @@ -29,7 +29,7 @@ #include #include #include -#include +#include #include #include #include @@ -68,7 +68,7 @@ class process_executor_test : public test_root { std::vector{r}, std::vector>{downstream_writer_}, external_writer_, - std::shared_ptr{} + std::shared_ptr{} )); } mock::basic_record_reader::records_type records_{ @@ -124,7 +124,7 @@ TEST_F(process_executor_test, custom_factory) { std::vector{r}, std::vector>{downstream_writer_}, external_writer_, - std::shared_ptr{} + std::shared_ptr{} )); abstract::process_executor_factory f = [&]( std::shared_ptr processor, @@ -147,5 +147,4 @@ TEST_F(process_executor_test, custom_factory) { EXPECT_TRUE(ewriter->is_released()); } -} - +} // namespace jogasaki::executor::process::impl \ No newline at end of file diff --git a/test/jogasaki/executor/process/processor_test.cpp b/test/jogasaki/executor/process/processor_test.cpp index fed05b509..08a210d0e 100644 --- a/test/jogasaki/executor/process/processor_test.cpp +++ b/test/jogasaki/executor/process/processor_test.cpp @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 Project Tsurugi. + * Copyright 2018-2024 Project Tsurugi. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,7 +26,7 @@ #include #include #include -#include +#include #include #include #include @@ -61,11 +61,10 @@ TEST_F(processor_test, basic) { std::vector{r}, std::vector>{downstream_writer}, external_writer, - std::shared_ptr{} + std::shared_ptr{} ); auto proc = std::make_shared(); // proc->run(context.get()); } -} - +} // namespace jogasaki::executor::process::impl \ No newline at end of file diff --git a/test/jogasaki/operator_test_utils.h b/test/jogasaki/operator_test_utils.h index 18a08205a..2da436966 100644 --- a/test/jogasaki/operator_test_utils.h +++ b/test/jogasaki/operator_test_utils.h @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 Project Tsurugi. + * Copyright 2018-2024 Project Tsurugi. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -176,6 +176,7 @@ class operator_test_utils { takatori::plan::process& process_; //NOLINT memory::page_pool pool_{}; //NOLINT + request_context request_context_{}; //NOLINT memory::lifo_paged_memory_resource resource_; //NOLINT memory::lifo_paged_memory_resource varlen_resource_; //NOLINT memory::lifo_paged_memory_resource verifier_varlen_resource_; //NOLINT @@ -309,5 +310,4 @@ class operator_test_utils { }; -} - +} // namespace jogasaki::executor::process::impl::ops