Skip to content

Commit

Permalink
make group::flow::setup_partitions callable from multiple threads
Browse files Browse the repository at this point in the history
  • Loading branch information
kuron99 committed Oct 27, 2024
1 parent 275305d commit 2d4fa92
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 4 deletions.
1 change: 1 addition & 0 deletions src/jogasaki/executor/exchange/group/flow.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ takatori::util::sequence_view<std::shared_ptr<model::task>> flow::create_tasks()
}

flow::sinks_sources flow::setup_partitions(std::size_t partitions) {
std::unique_lock lk{mutex_};
sinks_.reserve(partitions);
for(std::size_t i=0; i < partitions; ++i) {
sinks_.emplace_back(std::make_unique<group::sink>(downstream_partitions_, info_, context()));
Expand Down
9 changes: 5 additions & 4 deletions src/jogasaki/executor/exchange/group/flow.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,10 @@ class flow : public shuffle::flow {
using field_index_type = meta::record_meta::field_index_type;

~flow() override;
flow(flow const& other) = default;
flow& operator=(flow const& other) = default;
flow(flow&& other) noexcept = default;
flow& operator=(flow&& other) noexcept = default;
flow(flow const& other) = delete;
flow& operator=(flow const& other) = delete;
flow(flow&& other) noexcept = delete;
flow& operator=(flow&& other) noexcept = delete;

/**
* @brief create new instance with empty schema (for testing)
Expand Down Expand Up @@ -118,6 +118,7 @@ class flow : public shuffle::flow {
class request_context* context_{};
step* owner_{};
std::size_t downstream_partitions_{default_partitions};
std::mutex mutex_{};
};

} // namespace jogasaki::executor::exchange::group

0 comments on commit 2d4fa92

Please sign in to comment.