Skip to content

Commit

Permalink
flow to use sink_at/source_at instead of list view
Browse files Browse the repository at this point in the history
  • Loading branch information
kuron99 committed Oct 28, 2024
1 parent 03370f1 commit 9c893ba
Show file tree
Hide file tree
Showing 18 changed files with 146 additions and 238 deletions.
5 changes: 4 additions & 1 deletion .vscode/samples/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@
"typeindex": "cpp",
"typeinfo": "cpp",
"valarray": "cpp",
"variant": "cpp"
"variant": "cpp",
"source_location": "cpp",
"format": "cpp",
"span": "cpp"
}
}
14 changes: 7 additions & 7 deletions examples/cogroup_cli/consumer_flow.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,15 @@ class consumer_flow : public model::flow {
{}

sequence_view<std::shared_ptr<model::task>> create_tasks() override {
auto l_srcs = dynamic_cast<executor::exchange::group::flow&>(left_upstream_->data_flow_object(*context_)).sources();
auto r_srcs = dynamic_cast<executor::exchange::group::flow&>(right_upstream_->data_flow_object(*context_)).sources();
tasks_.reserve(l_srcs.size());
assert(l_srcs.size() == r_srcs.size());
for(std::size_t i = 0, n = l_srcs.size(); i < n; ++i) {
auto& l_flow = dynamic_cast<executor::exchange::group::flow&>(left_upstream_->data_flow_object(*context_));
auto& r_flow = dynamic_cast<executor::exchange::group::flow&>(right_upstream_->data_flow_object(*context_));
tasks_.reserve(l_flow.source_count());
assert(l_flow.source_count() == r_flow.source_count());
for(std::size_t i = 0, n = l_flow.source_count(); i < n; ++i) {
if (params_->use_priority_queue) {
tasks_.emplace_back(std::make_unique<priority_queue_consumer_task>(context_, step_, l_srcs[i].acquire_reader(), r_srcs[i].acquire_reader(), meta_, meta_));
tasks_.emplace_back(std::make_unique<priority_queue_consumer_task>(context_, step_, l_flow.source_at(i).acquire_reader(), r_flow.source_at(i).acquire_reader(), meta_, meta_));
} else {
tasks_.emplace_back(std::make_unique<consumer_task>(context_, step_, l_srcs[i].acquire_reader(), r_srcs[i].acquire_reader(), meta_, meta_));
tasks_.emplace_back(std::make_unique<consumer_task>(context_, step_, l_flow.source_at(i).acquire_reader(), r_flow.source_at(i).acquire_reader(), meta_, meta_));
}
}
return takatori::util::sequence_view{&*(tasks_.begin()), &*(tasks_.end())};
Expand Down
11 changes: 6 additions & 5 deletions examples/common/producer_flow.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,12 @@ class producer_flow : public model::flow {
params_(&p) {}

sequence_view<std::shared_ptr<model::task>> create_tasks() override {
auto [sinks, srcs] = dynamic_cast<executor::exchange::flow&>(downstream_->data_flow_object(*context_)).setup_partitions(params_->upstream_partitions_);
(void)srcs;
resources_.reserve(sinks.size());
tasks_.reserve(sinks.size());
for(auto& s : sinks) {
auto& flow = dynamic_cast<executor::exchange::flow&>(downstream_->data_flow_object(*context_));
flow.setup_partitions(params_->upstream_partitions_);
resources_.reserve(flow.sink_count());
tasks_.reserve(flow.sink_count());
for(std::size_t i=0,n=flow.sink_count(); i<n; ++i) {
auto& s = flow.sink_at(i);
auto& resource = resources_.emplace_back(std::make_unique<memory::monotonic_paged_memory_resource>(&global::page_pool()));
tasks_.emplace_back(std::make_unique<producer_task<Params>>(context_, step_, &s, meta_, *params_, *resource));
}
Expand Down
7 changes: 4 additions & 3 deletions examples/group_cli/consumer_flow.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,10 @@ class consumer_flow : public model::flow {
{}

sequence_view<std::shared_ptr<model::task>> create_tasks() override {
auto srcs = dynamic_cast<executor::exchange::group::flow&>(upstream_->data_flow_object(*context_)).sources();
tasks_.reserve(srcs.size());
for(auto& s : srcs) {
auto& flow = dynamic_cast<executor::exchange::group::flow&>(upstream_->data_flow_object(*context_));
tasks_.reserve(flow.source_count());
for(std::size_t i=0, n=flow.source_count(); i<n; ++i) {
auto& s = flow.source_at(i);
tasks_.emplace_back(std::make_unique<consumer_task>(context_, step_, s.acquire_reader(), meta_, *params_));
}
return takatori::util::sequence_view{&*(tasks_.begin()), &*(tasks_.end())};
Expand Down
7 changes: 4 additions & 3 deletions examples/mock_aggregate_cli/consumer_flow.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,10 @@ class consumer_flow : public model::flow {
{}

sequence_view<std::shared_ptr<model::task>> create_tasks() override {
auto srcs = dynamic_cast<executor::exchange::mock::aggregate::flow&>(upstream_->data_flow_object(*context_)).sources();
tasks_.reserve(srcs.size());
for(auto& s : srcs) {
auto& flow = dynamic_cast<executor::exchange::mock::aggregate::flow&>(upstream_->data_flow_object(*context_));
tasks_.reserve(flow.source_count());
for(std::size_t i=0, n=flow.source_count(); i<n; ++i) {
auto& s = flow.source_at(i);
tasks_.emplace_back(std::make_unique<consumer_task>(context_, step_, s.acquire_reader(), meta_));
}
return takatori::util::sequence_view{&*(tasks_.begin()), &*(tasks_.end())};
Expand Down
48 changes: 13 additions & 35 deletions mock/jogasaki/executor/exchange/mock/aggregate/flow.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,34 +36,6 @@ namespace jogasaki::executor::exchange::mock::aggregate {

using takatori::util::unsafe_downcast;

namespace impl {

flow::source_list_view cast_to_exchange_source(std::vector<std::unique_ptr<mock::aggregate::source>>& vp) {
takatori::util::universal_extractor<exchange::source> ext {
[](void* cursor) -> exchange::source& {
return unsafe_downcast<exchange::source>(**static_cast<std::unique_ptr<mock::aggregate::source>*>(cursor));
},
[](void* cursor, std::ptrdiff_t offset) {
return static_cast<void*>(static_cast<std::unique_ptr<mock::aggregate::source>*>(cursor) + offset); //NOLINT
},
};
return flow::source_list_view{ vp, ext };
}

flow::sink_list_view cast_to_exchange_sink(std::vector<std::unique_ptr<mock::aggregate::sink>>& vp) {
takatori::util::universal_extractor<exchange::sink> ext {
[](void* cursor) -> exchange::sink& {
return unsafe_downcast<exchange::sink>(**static_cast<std::unique_ptr<mock::aggregate::sink>*>(cursor));
},
[](void* cursor, std::ptrdiff_t offset) {
return static_cast<void*>(static_cast<std::unique_ptr<mock::aggregate::sink>*>(cursor) + offset); //NOLINT
},
};
return flow::sink_list_view{ vp, ext };
}

} // namespace impl

flow::~flow() = default;
flow::flow() : info_(std::make_shared<shuffle_info>()) {}
flow::flow(std::shared_ptr<shuffle_info> info,
Expand Down Expand Up @@ -102,7 +74,7 @@ takatori::util::sequence_view<std::shared_ptr<model::task>> flow::create_tasks()
return tasks_;
}

flow::sinks_sources flow::setup_partitions(std::size_t partitions) {
void flow::setup_partitions(std::size_t partitions) {
sinks_.reserve(partitions);
for(std::size_t i=0; i < partitions; ++i) {
sinks_.emplace_back(std::make_unique<mock::aggregate::sink>(downstream_partitions_, info_, context()));
Expand All @@ -111,19 +83,25 @@ flow::sinks_sources flow::setup_partitions(std::size_t partitions) {
for(std::size_t i=0; i < downstream_partitions_; ++i) {
sources_.emplace_back(std::make_unique<source>(info_, context()));
}
}

std::size_t flow::sink_count() const noexcept {
return sinks_.size();
}

return std::pair(impl::cast_to_exchange_sink(sinks_),
impl::cast_to_exchange_source(sources_));
std::size_t flow::source_count() const noexcept {
return sources_.size();
}

flow::sink_list_view flow::sinks() {
return impl::cast_to_exchange_sink(sinks_);
exchange::sink& flow::sink_at(std::size_t index) {
return *sinks_[index];
}

flow::source_list_view flow::sources() {
return impl::cast_to_exchange_source(sources_);
exchange::source& flow::source_at(std::size_t index) {
return *sources_[index];
}


void flow::transfer() {
for(auto& sink : sinks_) {
auto& partitions = sink->input_partitions();
Expand Down
17 changes: 7 additions & 10 deletions mock/jogasaki/executor/exchange/mock/aggregate/flow.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,6 @@

namespace jogasaki::executor::exchange::mock::aggregate {

namespace impl {

flow::source_list_view cast_to_exchange_source(std::vector<std::unique_ptr<mock::aggregate::source>>& vp);
flow::sink_list_view cast_to_exchange_sink(std::vector<std::unique_ptr<mock::aggregate::sink>>& vp);

} // namespace impl

/**
* @brief group step data flow
*/
Expand Down Expand Up @@ -88,11 +81,15 @@ class flow : public exchange::flow {

[[nodiscard]] takatori::util::sequence_view<std::shared_ptr<model::task>> create_tasks() override;

[[nodiscard]] sinks_sources setup_partitions(std::size_t partitions) override;
void setup_partitions(std::size_t partitions) override;

[[nodiscard]] std::size_t sink_count() const noexcept override;

[[nodiscard]] std::size_t source_count() const noexcept override;

[[nodiscard]] sink_list_view sinks() override;
[[nodiscard]] exchange::sink& sink_at(std::size_t index) override;

[[nodiscard]] source_list_view sources() override;
[[nodiscard]] exchange::source& source_at(std::size_t index) override;

/**
* @brief transfer the input partitions from sinks to sources
Expand Down
48 changes: 13 additions & 35 deletions src/jogasaki/executor/exchange/aggregate/flow.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,34 +39,6 @@ namespace jogasaki::executor::exchange::aggregate {

using takatori::util::unsafe_downcast;

Check warning on line 40 in src/jogasaki/executor/exchange/aggregate/flow.cpp

View workflow job for this annotation

GitHub Actions / Clang-Tidy

misc-unused-using-decls

using decl 'unsafe_downcast' is unused

namespace impl {

flow::source_list_view cast_to_exchange_source(std::vector<std::unique_ptr<aggregate::source>>& vp) {
takatori::util::universal_extractor<exchange::source> ext {
[](void* cursor) -> exchange::source& {
return unsafe_downcast<exchange::source>(**static_cast<std::unique_ptr<aggregate::source>*>(cursor));
},
[](void* cursor, std::ptrdiff_t offset) {
return static_cast<void*>(static_cast<std::unique_ptr<aggregate::source>*>(cursor) + offset); //NOLINT
},
};
return flow::source_list_view{ vp, ext };
}

flow::sink_list_view cast_to_exchange_sink(std::vector<std::unique_ptr<aggregate::sink>>& vp) {
takatori::util::universal_extractor<exchange::sink> ext {
[](void* cursor) -> exchange::sink& {
return unsafe_downcast<exchange::sink>(**static_cast<std::unique_ptr<aggregate::sink>*>(cursor));
},
[](void* cursor, std::ptrdiff_t offset) {
return static_cast<void*>(static_cast<std::unique_ptr<aggregate::sink>*>(cursor) + offset); //NOLINT
},
};
return flow::sink_list_view{ vp, ext };
}

} // namespace impl

flow::~flow() = default;
flow::flow() : info_(std::make_shared<aggregate_info>()) {}
flow::flow(
Expand All @@ -87,7 +59,7 @@ takatori::util::sequence_view<std::shared_ptr<model::task>> flow::create_tasks()
return tasks_;
}

flow::sinks_sources flow::setup_partitions(std::size_t partitions) {
void flow::setup_partitions(std::size_t partitions) {
// assuming aggregate exchange has only one output, so this is called only once
sinks_.reserve(partitions);
for(std::size_t i=0; i < partitions; ++i) {
Expand All @@ -97,16 +69,22 @@ flow::sinks_sources flow::setup_partitions(std::size_t partitions) {
for(std::size_t i=0; i < downstream_partitions_; ++i) {
sources_.emplace_back(std::make_unique<source>(info_, context()));
}
return {impl::cast_to_exchange_sink(sinks_),
impl::cast_to_exchange_source(sources_)};
}

flow::sink_list_view flow::sinks() {
return impl::cast_to_exchange_sink(sinks_);
std::size_t flow::sink_count() const noexcept {
return sinks_.size();
}

std::size_t flow::source_count() const noexcept {
return sources_.size();
}

exchange::sink& flow::sink_at(std::size_t index) {
return *sinks_[index];
}

flow::source_list_view flow::sources() {
return impl::cast_to_exchange_source(sources_);
exchange::source& flow::source_at(std::size_t index) {
return *sources_[index];
}

void flow::transfer() {
Expand Down
17 changes: 7 additions & 10 deletions src/jogasaki/executor/exchange/aggregate/flow.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,6 @@

namespace jogasaki::executor::exchange::aggregate {

namespace impl {

flow::source_list_view cast_to_exchange_source(std::vector<std::unique_ptr<aggregate::source>>& vp);
flow::sink_list_view cast_to_exchange_sink(std::vector<std::unique_ptr<aggregate::sink>>& vp);

} // namespace impl

/**
* @brief group step data flow
*/
Expand Down Expand Up @@ -76,11 +69,15 @@ class flow : public shuffle::flow {

[[nodiscard]] takatori::util::sequence_view<std::shared_ptr<model::task>> create_tasks() override;

[[nodiscard]] sinks_sources setup_partitions(std::size_t partitions) override;
void setup_partitions(std::size_t partitions) override;

[[nodiscard]] std::size_t sink_count() const noexcept override;

[[nodiscard]] std::size_t source_count() const noexcept override;

[[nodiscard]] sink_list_view sinks() override;
[[nodiscard]] exchange::sink& sink_at(std::size_t index) override;

[[nodiscard]] source_list_view sources() override;
[[nodiscard]] exchange::source& source_at(std::size_t index) override;

/**
* @brief transfer the input partitions from sinks to sources
Expand Down
30 changes: 18 additions & 12 deletions src/jogasaki/executor/exchange/flow.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,30 +35,36 @@ namespace jogasaki::executor::exchange {
*/
class flow : public model::flow {
public:
using sink_list_view = takatori::util::reference_list_view<takatori::util::universal_extractor<exchange::sink>>;

using source_list_view = takatori::util::reference_list_view<takatori::util::universal_extractor<exchange::source>>;

using sinks_sources = std::pair<sink_list_view, source_list_view>;

/**
* @brief a function to tell the exchange data flow object about the number of partitions required
* @param partitions the number of partitions requested
* @return list view of sinks and sources newly created by this call
*/
[[nodiscard]] virtual sinks_sources setup_partitions(std::size_t partitions) = 0;
virtual void setup_partitions(std::size_t partitions) = 0;

/**
* @brief accessor for sink count
* @return number of sinks held by this exchange
*/
[[nodiscard]] virtual std::size_t sink_count() const = 0;

/**
* @brief accessor for source count
* @return number of sources held by this exchange
*/
[[nodiscard]] virtual std::size_t source_count() const = 0;

/**
* @brief accessor for sinks
* @return list view of sinks held by this exchange
* @brief accessor for sink
* @return sink object at index
*/
[[nodiscard]] virtual sink_list_view sinks() = 0;
[[nodiscard]] virtual exchange::sink& sink_at(std::size_t index) = 0;

/**
* @brief accessor for sources
* @brief accessor for source
* @return list view of sources held by this exchange
*/
[[nodiscard]] virtual source_list_view sources() = 0;
[[nodiscard]] virtual exchange::source& source_at(std::size_t index) = 0;

[[nodiscard]] takatori::util::sequence_view<std::shared_ptr<model::task>> create_pretask(port_index_type) override {
// exchanges don't have sub input ports
Expand Down
Loading

0 comments on commit 9c893ba

Please sign in to comment.