diff --git a/src/jogasaki/executor/process/flow.cpp b/src/jogasaki/executor/process/flow.cpp index 9420286e..f2cfcd14 100644 --- a/src/jogasaki/executor/process/flow.cpp +++ b/src/jogasaki/executor/process/flow.cpp @@ -125,7 +125,7 @@ sequence_view> flow::create_tasks() { contexts.reserve(partitions); for (std::size_t i=0; i < partitions; ++i) { - contexts.emplace_back(create_task_context(i, operators, sink_idx_base + i)); + contexts.emplace_back(create_task_context(i, operators, sink_idx_base + i, operators.range())); } bool is_rtx = context_->transaction()->option()->type() @@ -163,14 +163,15 @@ model::step_kind flow::kind() const noexcept { std::shared_ptr flow::create_task_context( std::size_t partition, impl::ops::operator_container const& operators, - std::size_t sink_index + std::size_t sink_index, + std::shared_ptr const& range ) { auto external_output = operators.io_exchange_map().external_output(); auto ctx = std::make_shared( *context_, partition, operators.io_exchange_map(), - operators.range(), + range, (context_->record_channel() && external_output != nullptr) ? context_->record_channel().get() : nullptr, sink_index ); diff --git a/src/jogasaki/executor/process/flow.h b/src/jogasaki/executor/process/flow.h index 0979d4de..614675da 100644 --- a/src/jogasaki/executor/process/flow.h +++ b/src/jogasaki/executor/process/flow.h @@ -83,11 +83,10 @@ class cache_align flow : public model::flow { [[nodiscard]] std::shared_ptr create_task_context( std::size_t partition, impl::ops::operator_container const& operators, - std::size_t sink_index + std::size_t sink_index, + std::shared_ptr const& range ); std::size_t check_empty_input_and_calculate_partitions(); }; -} - - +} // namespace jogasaki::executor::process