From b48929ce6f6ca94cb9738702cdf0163df89032a5 Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Fri, 20 Oct 2023 15:31:24 -0400 Subject: [PATCH 1/5] Reformat indentation in JScheduler.h --- src/libraries/JANA/Engine/JScheduler.h | 44 +++++++++++++------------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/src/libraries/JANA/Engine/JScheduler.h b/src/libraries/JANA/Engine/JScheduler.h index 01fc3316f..dd5455512 100644 --- a/src/libraries/JANA/Engine/JScheduler.h +++ b/src/libraries/JANA/Engine/JScheduler.h @@ -11,35 +11,35 @@ #include - struct JArrowTopology; +struct JArrowTopology; - /// Scheduler assigns Arrows to Workers in a first-come-first-serve manner, - /// not unlike OpenMP's `schedule dynamic`. - class JScheduler { +/// Scheduler assigns Arrows to Workers in a first-come-first-serve manner, +/// not unlike OpenMP's `schedule dynamic`. +class JScheduler { - private: - std::shared_ptr m_topology; - size_t m_next_idx; - std::mutex m_mutex; +private: + std::shared_ptr m_topology; + size_t m_next_idx; + std::mutex m_mutex; - public: +public: - /// Constructor. Note that a Scheduler operates on a vector of Arrow*s. - JScheduler(std::shared_ptr topology); + /// Constructor. Note that a Scheduler operates on a vector of Arrow*s. + JScheduler(std::shared_ptr topology); - /// Lets a Worker ask the Scheduler for another assignment. If no assignments make sense, - /// Scheduler returns nullptr, which tells that Worker to idle until his next checkin. - /// If next_assignment() makes any changes to internal Scheduler state or to any of its arrows, - /// it must be synchronized. - JArrow* next_assignment(uint32_t worker_id, JArrow* assignment, JArrowMetrics::Status result); + /// Lets a Worker ask the Scheduler for another assignment. If no assignments make sense, + /// Scheduler returns nullptr, which tells that Worker to idle until his next checkin. + /// If next_assignment() makes any changes to internal Scheduler state or to any of its arrows, + /// it must be synchronized. + JArrow* next_assignment(uint32_t worker_id, JArrow* assignment, JArrowMetrics::Status result); - /// Lets a Worker tell the scheduler that he is shutting down and won't be working on his assignment - /// any more. The scheduler is thus free to reassign the arrow to one of the remaining workers. - void last_assignment(uint32_t worker_id, JArrow* assignment, JArrowMetrics::Status result); + /// Lets a Worker tell the scheduler that he is shutting down and won't be working on his assignment + /// any more. The scheduler is thus free to reassign the arrow to one of the remaining workers. + void last_assignment(uint32_t worker_id, JArrow* assignment, JArrowMetrics::Status result); - /// Logger is public so that somebody else can configure it - JLogger logger; - }; + /// Logger is public so that somebody else can configure it + JLogger logger; +}; #endif // _JSCHEDULER_H_ From d9c307656e6cb3a079d67a791a68cfac8bb185cb Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Sat, 21 Oct 2023 22:25:18 -0400 Subject: [PATCH 2/5] Move topology status variable into JScheduler --- src/examples/BlockExample/main.cc | 4 - src/libraries/JANA/Engine/JArrow.h | 4 +- .../JANA/Engine/JArrowProcessingController.cc | 28 ++-- src/libraries/JANA/Engine/JArrowTopology.cc | 111 -------------- src/libraries/JANA/Engine/JArrowTopology.h | 12 -- src/libraries/JANA/Engine/JMultithreading.cc | 2 + src/libraries/JANA/Engine/JMultithreading.h | 48 ++++++ src/libraries/JANA/Engine/JScheduler.cc | 143 ++++++++++++++++-- src/libraries/JANA/Engine/JScheduler.h | 35 ++++- src/libraries/JANA/Engine/JTopologyBuilder.h | 2 - src/programs/tests/ArrowActivationTests.cc | 41 ++--- src/programs/tests/SchedulerTests.cc | 7 +- src/programs/tests/TopologyTests.cc | 13 +- 13 files changed, 266 insertions(+), 184 deletions(-) create mode 100644 src/libraries/JANA/Engine/JMultithreading.cc create mode 100644 src/libraries/JANA/Engine/JMultithreading.h diff --git a/src/examples/BlockExample/main.cc b/src/examples/BlockExample/main.cc index fd062899b..2a372f847 100644 --- a/src/examples/BlockExample/main.cc +++ b/src/examples/BlockExample/main.cc @@ -39,10 +39,6 @@ std::shared_ptr configure_block_topology(std::shared_ptrattach(block_disentangler_arrow); block_disentangler_arrow->attach(processor_arrow); - block_source_arrow->set_running_arrows(&topology->running_arrow_count); - block_disentangler_arrow->set_running_arrows(&topology->running_arrow_count); - processor_arrow->set_running_arrows(&topology->running_arrow_count); - // If you want to add additional processors loaded from plugins, do this like so: for (auto proc : topology->component_manager->get_evt_procs()) { processor_arrow->add_processor(proc); diff --git a/src/libraries/JANA/Engine/JArrow.h b/src/libraries/JANA/Engine/JArrow.h index 18815c8b6..376772aca 100644 --- a/src/libraries/JANA/Engine/JArrow.h +++ b/src/libraries/JANA/Engine/JArrow.h @@ -44,7 +44,7 @@ class JArrow { // These are protected by the Topology mutex, NOT the Arrow mutex!!! int64_t m_thread_count = 0; // Current number of threads assigned to this arrow std::atomic_int64_t m_running_upstreams {0}; // Current number of running arrows immediately upstream - std::atomic_int64_t* m_running_arrows = nullptr; // Current number of running arrows total, so we can detect pauses + int64_t* m_running_arrows = nullptr; // Current number of running arrows total, so we can detect pauses std::vector m_listeners; // Downstream Arrows protected: @@ -167,7 +167,7 @@ class JArrow { return m_running_upstreams; } - void set_running_arrows(std::atomic_int64_t* running_arrows_ptr) { + void set_running_arrows(int64_t* running_arrows_ptr) { m_running_arrows = running_arrows_ptr; } diff --git a/src/libraries/JANA/Engine/JArrowProcessingController.cc b/src/libraries/JANA/Engine/JArrowProcessingController.cc index baab1f903..5aa7bee5d 100644 --- a/src/libraries/JANA/Engine/JArrowProcessingController.cc +++ b/src/libraries/JANA/Engine/JArrowProcessingController.cc @@ -31,7 +31,7 @@ void JArrowProcessingController::initialize() { m_scheduler->logger = m_scheduler_logger; LOG_INFO(m_logger) << m_topology->mapping << LOG_END; - m_topology->initialize(); + m_scheduler->initialize_topology(); } @@ -46,8 +46,8 @@ void JArrowProcessingController::initialize() { /// @param [in] nthreads The number of worker threads to start void JArrowProcessingController::run(size_t nthreads) { LOG_INFO(m_logger) << "run(): Launching " << nthreads << " workers" << LOG_END; - // topology->run needs to happen _before_ threads are started so that threads don't quit due to lack of assignments - m_topology->run(nthreads); + // run_topology needs to happen _before_ threads are started so that threads don't quit due to lack of assignments + m_scheduler->run_topology(nthreads); bool pin_to_cpu = (m_topology->mapping.get_affinity() != JProcessorMapping::AffinityStrategy::None); @@ -71,11 +71,11 @@ void JArrowProcessingController::run(size_t nthreads) { void JArrowProcessingController::scale(size_t nthreads) { LOG_INFO(m_logger) << "scale(): Stopping all running workers" << LOG_END; - m_topology->request_pause(); + m_scheduler->request_topology_pause(); for (JWorker* worker : m_workers) { worker->wait_for_stop(); } - m_topology->achieve_pause(); + m_scheduler->achieve_topology_pause(); LOG_INFO(m_logger) << "scale(): All workers are stopped" << LOG_END; bool pin_to_cpu = (m_topology->mapping.get_affinity() != JProcessorMapping::AffinityStrategy::None); @@ -94,7 +94,7 @@ void JArrowProcessingController::scale(size_t nthreads) { LOG_INFO(m_logger) << "scale(): Restarting " << nthreads << " workers" << LOG_END; // topology->run needs to happen _before_ threads are started so that threads don't quit due to lack of assignments - m_topology->run(nthreads); + m_scheduler->run_topology(nthreads); for (size_t i=0; istart(); @@ -102,7 +102,7 @@ void JArrowProcessingController::scale(size_t nthreads) { } void JArrowProcessingController::request_pause() { - m_topology->request_pause(); + m_scheduler->request_topology_pause(); // Or: // for (JWorker* worker : m_workers) { // worker->request_stop(); @@ -116,7 +116,7 @@ void JArrowProcessingController::wait_until_paused() { // Join all the worker threads. // Do not trigger the pause (we may want the pause to come internally, e.g. from an event source running out.) // Do NOT finish() the topology (we want the ability to be able to restart it) - m_topology->achieve_pause(); + m_scheduler->achieve_topology_pause(); } void JArrowProcessingController::request_stop() { @@ -128,7 +128,7 @@ void JArrowProcessingController::request_stop() { // request_pause => pause // wait_until_stop => join_then_finish // wait_until_pause => join - m_topology->drain(); + m_scheduler->drain_topology(); } void JArrowProcessingController::wait_until_stopped() { @@ -138,18 +138,16 @@ void JArrowProcessingController::wait_until_stopped() { } // finish out the topology // (note some arrows might have already finished e.g. event sources, but that's fine, finish() is idempotent) - m_topology->achieve_pause(); - m_topology->finish(); + m_scheduler->achieve_topology_pause(); + m_scheduler->finish_topology(); } bool JArrowProcessingController::is_stopped() { - // TODO: Protect topology current status - return m_topology->m_current_status == JArrowTopology::Status::Paused; + return m_scheduler->get_topology_status() == JScheduler::TopologyStatus::Paused; } bool JArrowProcessingController::is_finished() { - // TODO: Protect topology current status - return m_topology->m_current_status == JArrowTopology::Status::Finished; + return m_scheduler->get_topology_status() == JScheduler::TopologyStatus::Finished; } bool JArrowProcessingController::is_timed_out() { diff --git a/src/libraries/JANA/Engine/JArrowTopology.cc b/src/libraries/JANA/Engine/JArrowTopology.cc index 0b8b1db64..efb0da2d4 100644 --- a/src/libraries/JANA/Engine/JArrowTopology.cc +++ b/src/libraries/JANA/Engine/JArrowTopology.cc @@ -21,115 +21,4 @@ JArrowTopology::~JArrowTopology() { } } -std::ostream& operator<<(std::ostream& os, JArrowTopology::Status status) { - switch(status) { - case JArrowTopology::Status::Uninitialized: os << "Uninitialized"; break; - case JArrowTopology::Status::Running: os << "Running"; break; - case JArrowTopology::Status::Pausing: os << "Pausing"; break; - case JArrowTopology::Status::Paused: os << "Paused"; break; - case JArrowTopology::Status::Finished: os << "Finished"; break; - case JArrowTopology::Status::Draining: os << "Draining"; break; - } - return os; -} - - -/// This needs to be called _before_ launching the worker threads. After this point, everything is initialized. -/// No initialization happens afterwards. -void JArrowTopology::initialize() { - assert(m_current_status == Status::Uninitialized); - for (JArrow* arrow : arrows) { - arrow->initialize(); - } - m_current_status = Status::Paused; -} - -void JArrowTopology::drain() { - if (m_current_status == Status::Finished) { - LOG_DEBUG(m_logger) << "JArrowTopology: drain(): Skipping because topology is already Finished" << LOG_END; - return; - } - LOG_DEBUG(m_logger) << "JArrowTopology: drain()" << LOG_END; - for (auto source : sources) { - if (source->get_status() == JArrow::Status::Running) { - // TODO: I'm considering creating a single TopologyMutex that controls access to - // - scheduler - // - arrow thread counts - // - arrow state - // - running upstream counts - // - running arrow counts - // - total worker count - } - source->pause(); - m_current_status = Status::Draining; - // We pause (as opposed to finish) for two reasons: - // 1. There might be workers in the middle of calling eventSource->GetEvent. - // 2. drain() might be called from a signal handler. It isn't safe to make syscalls during signal handlers - // due to risk of deadlock. (We technically shouldn't even do logging!) - } -} - -void JArrowTopology::run(int nthreads) { - - Status current_status = m_current_status; - if (current_status == Status::Running || current_status == Status::Finished) { - LOG_DEBUG(m_logger) << "JArrowTopology: run() : " << current_status << " => " << current_status << LOG_END; - return; - } - LOG_DEBUG(m_logger) << "JArrowTopology: run() : " << current_status << " => Running" << LOG_END; - - if (sources.empty()) { - throw JException("No event sources found!"); - } - for (auto source : sources) { - // We activate any sources, and everything downstream activates automatically - // Note that this won't affect finished sources. It will, however, stop drain(). - source->run(); - } - // Note that we activate workers AFTER we activate the topology, so no actual processing will have happened - // by this point when we start up the metrics. - metrics.reset(); - metrics.start(nthreads); - m_current_status = Status::Running; -} - -void JArrowTopology::request_pause() { - // This sets all Running arrows to Paused, which prevents Workers from picking up any additional assignments - // Once all Workers have completed their remaining assignments, the scheduler will notify us via achieve_pause(). - Status current_status = m_current_status; - if (current_status == Status::Running) { - LOG_DEBUG(m_logger) << "JArrowTopology: request_pause() : " << current_status << " => Pausing" << LOG_END; - for (auto arrow: arrows) { - arrow->pause(); - // If arrow is not running, pause() is a no-op - } - m_current_status = Status::Pausing; - } - else { - LOG_DEBUG(m_logger) << "JArrowTopology: request_pause() : " << current_status << " => " << current_status << LOG_END; - } -} - -void JArrowTopology::achieve_pause() { - // This is meant to be used by the scheduler to tell us when all workers have stopped, so it is safe to finish(), etc - Status current_status = m_current_status; - if (current_status == Status::Running || current_status == Status::Pausing || current_status == Status::Draining) { - LOG_DEBUG(m_logger) << "JArrowTopology: achieve_pause() : " << current_status << " => " << Status::Paused << LOG_END; - metrics.stop(); - m_current_status = Status::Paused; - } - else { - LOG_DEBUG(m_logger) << "JArrowTopology: achieve_pause() : " << current_status << " => " << current_status << LOG_END; - } -} - -/// Finish is called by a single thread once the worker threads have all joined. -void JArrowTopology::finish() { - // This finalizes all arrows. Once this happens, we cannot restart the topology. - // assert(m_current_status == Status::Paused); - for (auto arrow : arrows) { - arrow->finish(); - } - m_current_status = Status::Finished; -} diff --git a/src/libraries/JANA/Engine/JArrowTopology.h b/src/libraries/JANA/Engine/JArrowTopology.h index a4094d3d0..a4e82f13b 100644 --- a/src/libraries/JANA/Engine/JArrowTopology.h +++ b/src/libraries/JANA/Engine/JArrowTopology.h @@ -17,7 +17,6 @@ struct JArrowTopology { - enum class Status { Uninitialized, Paused, Running, Pausing, Draining, Finished }; using Event = std::shared_ptr; using EventQueue = JMailbox; @@ -38,10 +37,6 @@ struct JArrowTopology { std::vector queues; // Queues shared between arrows JProcessorMapping mapping; - std::atomic m_current_status {Status::Uninitialized}; - std::atomic_int64_t running_arrow_count {0}; // Detects when the topology has paused - // int64_t running_worker_count = 0; // Detects when the workers have all joined - size_t event_pool_size = 1; // Will be defaulted to nthreads by builder bool limit_total_events_in_flight = true; bool enable_call_graph_recording = false; @@ -55,13 +50,6 @@ struct JArrowTopology { JLogger m_logger; - void initialize(); - void drain(); - void run(int nthreads); - void request_pause(); - void achieve_pause(); - void finish(); - }; diff --git a/src/libraries/JANA/Engine/JMultithreading.cc b/src/libraries/JANA/Engine/JMultithreading.cc new file mode 100644 index 000000000..3a80c1b6a --- /dev/null +++ b/src/libraries/JANA/Engine/JMultithreading.cc @@ -0,0 +1,2 @@ + +#include "JMultithreading.h" diff --git a/src/libraries/JANA/Engine/JMultithreading.h b/src/libraries/JANA/Engine/JMultithreading.h new file mode 100644 index 000000000..d882b102f --- /dev/null +++ b/src/libraries/JANA/Engine/JMultithreading.h @@ -0,0 +1,48 @@ + + +#pragma once +#include +#include "JArrow.h" +#include "JArrowTopology.h" + +class JMultithreading { + + + std::mutex m_mutex; + + + struct ArrowState { + enum class Status { Unopened, Running, Paused, Finished }; + + JArrow* arrow_ptr; + Status status = Status::Unopened; + bool is_parallel; + int running_upstreams = 0; + int running_instances = 0; + std::vector listeners; + } + + std::vector arrow_states; + int backoff_tries; + int running_arrows; + + std::shared_ptr m_topology; + size_t m_next_idx; + + + +public: + // Previously on JArrow + // Called by Arrow subclasses + void run_arrow(JArrow* arrow); + void pause_arrow(JArrow* arrow); + void finish_arrow(JArrow* arrow); + + // + + + +}; + + +std::ostream& operator<<(std::ostream& os, const JMultithreading::Status& s) diff --git a/src/libraries/JANA/Engine/JScheduler.cc b/src/libraries/JANA/Engine/JScheduler.cc index d887229d4..8e51d9589 100644 --- a/src/libraries/JANA/Engine/JScheduler.cc +++ b/src/libraries/JANA/Engine/JScheduler.cc @@ -28,12 +28,12 @@ JArrow* JScheduler::next_assignment(uint32_t worker_id, JArrow* assignment, JArr assignment->get_type() != JArrow::NodeType::Source && assignment->get_status() == JArrow::Status::Running) { - LOG_DEBUG(logger) << "Deactivating arrow '" << assignment->get_name() << "' (" << m_topology->running_arrow_count - 1 << " remaining)" << LOG_END; + LOG_DEBUG(logger) << "Deactivating arrow '" << assignment->get_name() << "' (" << running_arrow_count - 1 << " remaining)" << LOG_END; assignment->pause(); - assert(m_topology->running_arrow_count >= 0); - if (m_topology->running_arrow_count == 0) { + assert(running_arrow_count >= 0); + if (running_arrow_count == 0) { LOG_DEBUG(logger) << "All arrows deactivated. Deactivating topology." << LOG_END; - m_topology->achieve_pause(); + achieve_topology_pause_unprotected(); } } } @@ -69,25 +69,25 @@ JArrow* JScheduler::next_assignment(uint32_t worker_id, JArrow* assignment, JArr } else { // Candidate can be paused immediately because there is no more work coming - LOG_DEBUG(logger) << "Deactivating arrow '" << candidate->get_name() << "' (" << m_topology->running_arrow_count - 1 << " remaining)" << LOG_END; + LOG_DEBUG(logger) << "Deactivating arrow '" << candidate->get_name() << "' (" << running_arrow_count - 1 << " remaining)" << LOG_END; candidate->pause(); - assert(m_topology->running_arrow_count >= 0); - if (m_topology->running_arrow_count == 0) { + assert(running_arrow_count >= 0); + if (running_arrow_count == 0) { LOG_DEBUG(logger) << "All arrows deactivated. Deactivating topology." << LOG_END; - m_topology->achieve_pause(); + achieve_topology_pause_unprotected(); } } } } while (current_idx != m_next_idx); - if (m_topology->running_arrow_count == 0 && m_topology->m_current_status == JArrowTopology::Status::Running) { + if (running_arrow_count == 0 && m_current_topology_status == TopologyStatus::Running) { // This exists just in case the user provided a topology that cannot self-exit, e.g. because no event sources // have been specified. If no arrows can be assigned to this worker, and no arrows have yet been assigned to any // workers, we can conclude that the topology is dead. After deactivating the topology, is_stopped() returns true, // and Run(true) terminates. LOG_DEBUG(logger) << "No active arrows found. Deactivating topology." << LOG_END; - m_topology->achieve_pause(); + achieve_topology_pause_unprotected(); } m_mutex.unlock(); @@ -108,4 +108,127 @@ void JScheduler::last_assignment(uint32_t worker_id, JArrow* assignment, JArrowM } +void JScheduler::initialize_topology() { + std::lock_guard lock(m_mutex); + + assert(m_current_topology_status == TopologyStatus::Uninitialized); + for (JArrow* arrow : m_topology->arrows) { + arrow->initialize(); + } + m_current_topology_status = TopologyStatus::Paused; +} + + +void JScheduler::drain_topology() { + std::lock_guard lock(m_mutex); + if (m_current_topology_status == TopologyStatus::Finished) { + LOG_DEBUG(logger) << "JScheduler: drain(): Skipping because topology is already Finished" << LOG_END; + return; + } + LOG_DEBUG(logger) << "JScheduler: drain_topology()" << LOG_END; + for (auto source : m_topology->sources) { + source->pause(); + m_current_topology_status = TopologyStatus::Draining; + // We pause (as opposed to finish) for two reasons: + // 1. There might be workers in the middle of calling eventSource->GetEvent. + // 2. drain() might be called from a signal handler. It isn't safe to make syscalls during signal handlers + // due to risk of deadlock. (We technically shouldn't even do logging!) + } + +} + +void JScheduler::run_topology(int nthreads) { + std::lock_guard lock(m_mutex); + TopologyStatus current_status = m_current_topology_status; + if (current_status == TopologyStatus::Running || current_status == TopologyStatus::Finished) { + LOG_DEBUG(logger) << "JScheduler: run_topology() : " << current_status << " => " << current_status << LOG_END; + return; + } + LOG_DEBUG(logger) << "JScheduler: run_topology() : " << current_status << " => Running" << LOG_END; + + for (auto arrow : m_topology->arrows) { + arrow->set_running_arrows(&running_arrow_count); + } + + if (m_topology->sources.empty()) { + throw JException("No event sources found!"); + } + for (auto source : m_topology->sources) { + // We activate any sources, and everything downstream activates automatically + // Note that this won't affect finished sources. It will, however, stop drain(). + source->run(); + } + // Note that we activate workers AFTER we activate the topology, so no actual processing will have happened + // by this point when we start up the metrics. + m_topology->metrics.reset(); + m_topology->metrics.start(nthreads); + m_current_topology_status = TopologyStatus::Running; +} + +void JScheduler::request_topology_pause() { + std::lock_guard lock(m_mutex); + // This sets all Running arrows to Paused, which prevents Workers from picking up any additional assignments + // Once all Workers have completed their remaining assignments, the scheduler will call achieve_pause(). + TopologyStatus current_status = m_current_topology_status; + if (current_status == TopologyStatus::Running) { + LOG_DEBUG(logger) << "JScheduler: request_pause() : " << current_status << " => Pausing" << LOG_END; + for (auto arrow: m_topology->arrows) { + arrow->pause(); + // If arrow is not running, pause() is a no-op + } + m_current_topology_status = TopologyStatus::Pausing; + } + else { + LOG_DEBUG(logger) << "JScheduler: request_pause() : " << current_status << " => " << current_status << LOG_END; + } +} + +void JScheduler::achieve_topology_pause() { + + std::lock_guard lock(m_mutex); + achieve_topology_pause_unprotected(); +} + +void JScheduler::achieve_topology_pause_unprotected() { + + // This is meant to be used by the scheduler to tell us when all workers have stopped, so it is safe to finish(), etc + TopologyStatus current_status = m_current_topology_status; + if (current_status == TopologyStatus::Running || current_status == TopologyStatus::Pausing || current_status == TopologyStatus::Draining) { + LOG_DEBUG(logger) << "JScheduler: achieve_topology_pause() : " << current_status << " => " << TopologyStatus::Paused << LOG_END; + m_topology->metrics.stop(); + m_current_topology_status = TopologyStatus::Paused; + } + else { + LOG_DEBUG(logger) << "JScheduler: achieve_topology_pause() : " << current_status << " => " << current_status << LOG_END; + } +} + +void JScheduler::finish_topology() { + std::lock_guard lock(m_mutex); + // This finalizes all arrows. Once this happens, we cannot restart the topology. + // assert(m_current_topology_status == TopologyStatus::Paused); + for (auto arrow : m_topology->arrows) { + arrow->finish(); + } + m_current_topology_status = TopologyStatus::Finished; +} + +JScheduler::TopologyStatus JScheduler::get_topology_status() { + std::lock_guard lock(m_mutex); + return m_current_topology_status; +} + + + +std::ostream& operator<<(std::ostream& os, JScheduler::TopologyStatus status) { + switch(status) { + case JScheduler::TopologyStatus::Uninitialized: os << "Uninitialized"; break; + case JScheduler::TopologyStatus::Running: os << "Running"; break; + case JScheduler::TopologyStatus::Pausing: os << "Pausing"; break; + case JScheduler::TopologyStatus::Paused: os << "Paused"; break; + case JScheduler::TopologyStatus::Finished: os << "Finished"; break; + case JScheduler::TopologyStatus::Draining: os << "Draining"; break; + } + return os; +} diff --git a/src/libraries/JANA/Engine/JScheduler.h b/src/libraries/JANA/Engine/JScheduler.h index dd5455512..5bbf286be 100644 --- a/src/libraries/JANA/Engine/JScheduler.h +++ b/src/libraries/JANA/Engine/JScheduler.h @@ -16,17 +16,31 @@ struct JArrowTopology; /// Scheduler assigns Arrows to Workers in a first-come-first-serve manner, /// not unlike OpenMP's `schedule dynamic`. class JScheduler { +public: + enum class TopologyStatus { Uninitialized, Paused, Running, Pausing, Draining, Finished }; private: + // This mutex controls ALL scheduler state + std::mutex m_mutex; + std::shared_ptr m_topology; size_t m_next_idx; - std::mutex m_mutex; + + // Topology-level state + TopologyStatus m_current_topology_status = TopologyStatus::Uninitialized; + int64_t running_arrow_count {0}; // Detects when the topology has paused + //int64_t running_worker_count = 0; // Detects when the workers have all joined // TODO: Why is this commented out? + + // Arrow-level state public: /// Constructor. Note that a Scheduler operates on a vector of Arrow*s. JScheduler(std::shared_ptr topology); + + // Worker-facing operations + /// Lets a Worker ask the Scheduler for another assignment. If no assignments make sense, /// Scheduler returns nullptr, which tells that Worker to idle until his next checkin. /// If next_assignment() makes any changes to internal Scheduler state or to any of its arrows, @@ -39,8 +53,27 @@ class JScheduler { /// Logger is public so that somebody else can configure it JLogger logger; + + + // JArrowProcessingController-facing operations + + void initialize_topology(); + void drain_topology(); + void run_topology(int nthreads); + void request_topology_pause(); + void achieve_topology_pause(); + void finish_topology(); + TopologyStatus get_topology_status(); + + // Arrow-facing operations + + +private: + void achieve_topology_pause_unprotected(); + }; +std::ostream& operator<<(std::ostream& os, JScheduler::TopologyStatus status); #endif // _JSCHEDULER_H_ diff --git a/src/libraries/JANA/Engine/JTopologyBuilder.h b/src/libraries/JANA/Engine/JTopologyBuilder.h index 4924923c0..512402828 100644 --- a/src/libraries/JANA/Engine/JTopologyBuilder.h +++ b/src/libraries/JANA/Engine/JTopologyBuilder.h @@ -166,13 +166,11 @@ class JTopologyBuilder : public JService { m_topology->sources.push_back(arrow); arrow->set_chunksize(m_event_source_chunksize); arrow->set_logger(m_arrow_logger); - arrow->set_running_arrows(&m_topology->running_arrow_count); auto proc_arrow = new JEventProcessorArrow("processors", queue, nullptr, m_topology->event_pool); proc_arrow->set_chunksize(m_event_processor_chunksize); proc_arrow->set_logger(m_arrow_logger); - proc_arrow->set_running_arrows(&m_topology->running_arrow_count); m_topology->arrows.push_back(proc_arrow); for (auto proc: m_components->get_evt_procs()) { diff --git a/src/programs/tests/ArrowActivationTests.cc b/src/programs/tests/ArrowActivationTests.cc index 65b823067..a38984ee7 100644 --- a/src/programs/tests/ArrowActivationTests.cc +++ b/src/programs/tests/ArrowActivationTests.cc @@ -7,6 +7,8 @@ #include #include +#include + JArrowMetrics::Status steppe(JArrow* arrow) { if (arrow->get_type() != JArrow::NodeType::Source && @@ -30,7 +32,8 @@ TEST_CASE("ArrowActivationTests") { SubOneProcessor p2; SumSink sink; - JArrowTopology topology; + auto topology = std::make_shared(); + JScheduler scheduler(topology); auto q1 = new JMailbox(); auto q2 = new JMailbox(); @@ -45,18 +48,18 @@ TEST_CASE("ArrowActivationTests") { multiply_by_two->attach(subtract_one); subtract_one->attach(sum_everything); - topology.sources.push_back(emit_rand_ints); + topology->sources.push_back(emit_rand_ints); - topology.arrows.push_back(emit_rand_ints); - topology.arrows.push_back(multiply_by_two); - topology.arrows.push_back(subtract_one); - topology.arrows.push_back(sum_everything); - topology.sinks.push_back(sum_everything); + topology->arrows.push_back(emit_rand_ints); + topology->arrows.push_back(multiply_by_two); + topology->arrows.push_back(subtract_one); + topology->arrows.push_back(sum_everything); + topology->sinks.push_back(sum_everything); emit_rand_ints->set_chunksize(1); auto logger = JLogger(JLogger::Level::OFF); - topology.m_logger = logger; + topology->m_logger = logger; source.logger = logger; @@ -84,7 +87,7 @@ TEST_CASE("ArrowActivationTests") { REQUIRE(subtract_one->get_status() == JArrow::Status::Unopened); REQUIRE(sum_everything->get_status() == JArrow::Status::Unopened); - topology.run(1); + scheduler.run_topology(1); // TODO: Check that initialize has been called, but not finalize REQUIRE(emit_rand_ints->get_status() == JArrow::Status::Running); @@ -105,7 +108,8 @@ TEST_CASE("ActivableDeactivationTests") { SubOneProcessor p2; SumSink sink; - JArrowTopology topology; + auto topology = std::make_shared(); + JScheduler scheduler(topology); auto q1 = new JMailbox(); auto q2 = new JMailbox(); @@ -120,15 +124,15 @@ TEST_CASE("ActivableDeactivationTests") { multiply_by_two->attach(subtract_one); subtract_one->attach(sum_everything); - topology.sources.push_back(emit_rand_ints); - topology.arrows.push_back(emit_rand_ints); - topology.arrows.push_back(multiply_by_two); - topology.arrows.push_back(subtract_one); - topology.arrows.push_back(sum_everything); - topology.sinks.push_back(sum_everything); + topology->sources.push_back(emit_rand_ints); + topology->arrows.push_back(emit_rand_ints); + topology->arrows.push_back(multiply_by_two); + topology->arrows.push_back(subtract_one); + topology->arrows.push_back(sum_everything); + topology->sinks.push_back(sum_everything); auto logger = JLogger(JLogger::Level::OFF); - topology.m_logger = logger; + topology->m_logger = logger; source.logger = logger; REQUIRE(emit_rand_ints->get_status() == JArrow::Status::Unopened); @@ -136,7 +140,7 @@ TEST_CASE("ActivableDeactivationTests") { REQUIRE(subtract_one->get_status() == JArrow::Status::Unopened); REQUIRE(sum_everything->get_status() == JArrow::Status::Unopened); - topology.run(1); + scheduler.run_topology(1); REQUIRE(emit_rand_ints->get_status() == JArrow::Status::Running); REQUIRE(multiply_by_two->get_status() == JArrow::Status::Running); @@ -159,4 +163,3 @@ TEST_CASE("ActivableDeactivationTests") { - diff --git a/src/programs/tests/SchedulerTests.cc b/src/programs/tests/SchedulerTests.cc index b438bdcec..3b76322a5 100644 --- a/src/programs/tests/SchedulerTests.cc +++ b/src/programs/tests/SchedulerTests.cc @@ -17,6 +17,7 @@ TEST_CASE("SchedulerTests") { SumSink sink; auto topology = std::make_shared(); + JScheduler scheduler(topology); auto q1 = new JMailbox(); auto q2 = new JMailbox(); @@ -39,7 +40,7 @@ TEST_CASE("SchedulerTests") { topology->sinks.push_back(sum_everything); emit_rand_ints->set_chunksize(1); - topology->run(1); + scheduler.run_topology(1); JArrow* assignment; JArrowMetrics::Status last_result; @@ -49,7 +50,6 @@ TEST_CASE("SchedulerTests") { auto logger = JLogger(JLogger::Level::OFF); - JScheduler scheduler(topology); last_result = JArrowMetrics::Status::ComeBackLater; assignment = nullptr; @@ -119,9 +119,10 @@ TEST_CASE("SchedulerRoundRobinBehaviorTests") { topology->sinks.push_back(sum_everything); emit_rand_ints->set_chunksize(1); - topology->run(1); JScheduler scheduler(topology); + scheduler.run_topology(1); + auto logger = JLogger(JLogger::Level::OFF); auto last_result = JArrowMetrics::Status::ComeBackLater; diff --git a/src/programs/tests/TopologyTests.cc b/src/programs/tests/TopologyTests.cc index f63b451bf..28ebd8524 100644 --- a/src/programs/tests/TopologyTests.cc +++ b/src/programs/tests/TopologyTests.cc @@ -8,6 +8,7 @@ #include #include #include +#include JArrowMetrics::Status step(JArrow* arrow) { @@ -41,6 +42,7 @@ TEST_CASE("JTopology: Basic functionality") { SumSink sink; auto topology = std::make_shared(); + JScheduler scheduler(topology); auto q1 = new JMailbox(); auto q2 = new JMailbox(); @@ -99,7 +101,7 @@ TEST_CASE("JTopology: Basic functionality") { SECTION("After emitting") { LOG_INFO(logger) << "After emitting; should be something in q0" << LOG_END; - topology->run(1); + scheduler.run_topology(1); step(emit_rand_ints); REQUIRE(multiply_by_two->get_pending() == 1); @@ -120,7 +122,7 @@ TEST_CASE("JTopology: Basic functionality") { SECTION("Running each stage sequentially yields the correct results") { //LOG_INFO(logger) << "Running each stage sequentially yields the correct results" << LOG_END; - topology->run(1); + scheduler.run_topology(1); for (int i = 0; i < 20; ++i) { step(emit_rand_ints); @@ -162,7 +164,7 @@ TEST_CASE("JTopology: Basic functionality") { results["sum_everything"] = JArrowMetrics::Status::KeepGoing; // Put something in the queue to get started - topology->run(1); + scheduler.run_topology(1); step(emit_rand_ints); bool work_left = true; @@ -198,7 +200,7 @@ TEST_CASE("JTopology: Basic functionality") { topology->m_logger = logger; source.logger = logger; - topology->run(1); + scheduler.run_topology(1); REQUIRE(emit_rand_ints->get_status() == JArrow::Status::Running); REQUIRE(multiply_by_two->get_status() == JArrow::Status::Running); @@ -254,7 +256,8 @@ TEST_CASE("JTopology: Basic functionality") { app.Run(true); - REQUIRE(topology->m_current_status == JArrowTopology::Status::Finished); + // TODO: Access the JScheduler managed by JApplication + // REQUIRE(scheduler.get_topology_status() == JScheduler::TopologyStatus::Finished); REQUIRE(sum_everything->get_status() == JArrow::Status::Finished); REQUIRE(sink.sum == 20 * 13); From 326b09e9891c935ddbfce99cf29934cfa8e00e62 Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Fri, 27 Oct 2023 14:56:28 -0400 Subject: [PATCH 3/5] Refactor JScheduler state to include Arrow state --- src/libraries/JANA/Engine/JArrow.h | 105 +---- .../JANA/Engine/JArrowPerfSummary.cc | 7 +- src/libraries/JANA/Engine/JArrowPerfSummary.h | 1 - .../JANA/Engine/JArrowProcessingController.cc | 67 +-- src/libraries/JANA/Engine/JBlockSourceArrow.h | 6 - .../JANA/Engine/JEventSourceArrow.cc | 3 +- src/libraries/JANA/Engine/JScheduler.cc | 408 +++++++++++++----- src/libraries/JANA/Engine/JScheduler.h | 56 ++- src/programs/tests/SinkArrow.h | 10 - src/programs/tests/SourceArrow.h | 5 - src/programs/tests/TopologyTests.cc | 2 +- 11 files changed, 375 insertions(+), 295 deletions(-) diff --git a/src/libraries/JANA/Engine/JArrow.h b/src/libraries/JANA/Engine/JArrow.h index 376772aca..99b92c103 100644 --- a/src/libraries/JANA/Engine/JArrow.h +++ b/src/libraries/JANA/Engine/JArrow.h @@ -18,11 +18,11 @@ class JArrow { public: - enum class Status { Unopened, Running, Paused, Finished }; enum class NodeType {Source, Sink, Stage, Group}; enum class BackoffStrategy { Constant, Linear, Exponential }; using duration_t = std::chrono::steady_clock::duration; + private: // Info const std::string m_name; // Used for human understanding @@ -30,6 +30,8 @@ class JArrow { const NodeType m_type; JArrowMetrics m_metrics; // Performance information accumulated over all workers + mutable std::mutex m_arrow_mutex; // Protects access to arrow properties + // Knobs size_t m_chunksize = 1; // Number of items to pop off the input queue at once BackoffStrategy m_backoff_strategy = BackoffStrategy::Exponential; @@ -37,14 +39,7 @@ class JArrow { duration_t m_checkin_time = std::chrono::milliseconds(500); unsigned m_backoff_tries = 4; - mutable std::mutex m_arrow_mutex; // Protects access to arrow properties, except m_status - std::atomic m_status {Status::Unopened}; - - // Scheduler stats - // These are protected by the Topology mutex, NOT the Arrow mutex!!! - int64_t m_thread_count = 0; // Current number of threads assigned to this arrow - std::atomic_int64_t m_running_upstreams {0}; // Current number of running arrows immediately upstream - int64_t* m_running_arrows = nullptr; // Current number of running arrows total, so we can detect pauses + friend class JScheduler; std::vector m_listeners; // Downstream Arrows protected: @@ -117,16 +112,6 @@ class JArrow { m_checkin_time = checkin_time; } - void update_thread_count(int thread_count_delta) { - std::lock_guard lock(m_arrow_mutex); - m_thread_count += thread_count_delta; - } - - size_t get_thread_count() { - std::lock_guard lock(m_arrow_mutex); - return m_thread_count; - } - // TODO: Metrics should be encapsulated so that only actions are to update, clear, or summarize JArrowMetrics& get_metrics() { return m_metrics; @@ -157,79 +142,6 @@ class JArrow { virtual void set_threshold(size_t /* threshold */) {} - - - Status get_status() const { - return m_status; - } - - int64_t get_running_upstreams() const { - return m_running_upstreams; - } - - void set_running_arrows(int64_t* running_arrows_ptr) { - m_running_arrows = running_arrows_ptr; - } - - void run() { - Status status = m_status; - - // if (status == Status::Unopened) { - // LOG_DEBUG(m_logger) << "Arrow '" << m_name << "' run(): Not initialized!" << LOG_END; - // throw JException("Arrow %s has not been initialized!", m_name.c_str()); - // } - // if (status == Status::Running || m_status == Status::Finished) { - // LOG_DEBUG(m_logger) << "Arrow '" << m_name << "' run() : " << status << " => " << status << LOG_END; - // return; - // } - LOG_DEBUG(m_logger) << "Arrow '" << m_name << "' run() : " << status << " => Running" << LOG_END; - if (m_running_arrows != nullptr) (*m_running_arrows)++; - for (auto listener: m_listeners) { - listener->m_running_upstreams++; - listener->run(); // Activating something recursively activates everything downstream. - } - m_status = Status::Running; - } - - void pause() { - Status status = m_status; - if (status != Status::Running) { - LOG_DEBUG(m_logger) << "JArrow '" << m_name << "' pause() : " << status << " => " << status << LOG_END; - return; // pause() is a no-op unless running - } - LOG_DEBUG(m_logger) << "JArrow '" << m_name << "' pause() : " << status << " => Paused" << LOG_END; - if (m_running_arrows != nullptr) (*m_running_arrows)--; - for (auto listener: m_listeners) { - listener->m_running_upstreams--; - // listener->pause(); - // This is NOT a sufficient condition for pausing downstream listeners. - // What we need is zero running upstreams AND zero messages in queue AND zero threads currently processing - // Correspondingly, the scheduler or worker needs to be the one to call pause() when this condition is reached. - } - m_status = Status::Paused; - } - - void finish() { - Status status = m_status; - LOG_DEBUG(m_logger) << "JArrow '" << m_name << "' finish() : " << status << " => Finished" << LOG_END; - Status old_status = m_status; - // if (old_status == Status::Unopened) { - // LOG_DEBUG(m_logger) << "JArrow '" << m_name << "': Uninitialized!" << LOG_END; - // throw JException("JArrow::finish(): Arrow %s has not been initialized!", m_name.c_str()); - // } - if (old_status == Status::Running) { - if (m_running_arrows != nullptr) (*m_running_arrows)--; - for (auto listener: m_listeners) { - listener->m_running_upstreams--; - } - } - if (old_status != Status::Finished) { - LOG_TRACE(m_logger) << "JArrow '" << m_name << "': Finalizing (this must only happen once)" << LOG_END; - this->finalize(); - } - m_status = Status::Finished; - } - void attach(JArrow* downstream) { m_listeners.push_back(downstream); }; @@ -247,14 +159,5 @@ inline std::ostream& operator<<(std::ostream& os, const JArrow::NodeType& nt) { return os; } -inline std::ostream& operator<<(std::ostream& os, const JArrow::Status& s) { - switch (s) { - case JArrow::Status::Unopened: os << "Unopened"; break; - case JArrow::Status::Running: os << "Running"; break; - case JArrow::Status::Paused: os << "Paused"; break; - case JArrow::Status::Finished: os << "Finished"; break; - } - return os; -} #endif // GREENFIELD_ARROW_H diff --git a/src/libraries/JANA/Engine/JArrowPerfSummary.cc b/src/libraries/JANA/Engine/JArrowPerfSummary.cc index 0feba0426..4d92c8b57 100644 --- a/src/libraries/JANA/Engine/JArrowPerfSummary.cc +++ b/src/libraries/JANA/Engine/JArrowPerfSummary.cc @@ -22,14 +22,13 @@ std::ostream& operator<<(std::ostream& os, const JArrowPerfSummary& s) { os << " Efficiency [0..1]: " << std::setprecision(3) << s.avg_efficiency_frac << std::endl; os << std::endl; - os << " +--------------------------+------------+--------+-----+---------+-------+--------+---------+-------------+" << std::endl; - os << " | Name | Status | Type | Par | Threads | Chunk | Thresh | Pending | Completed |" << std::endl; - os << " +--------------------------+------------+--------+-----+---------+-------+--------+---------+-------------+" << std::endl; + os << " +--------------------------+--------+-----+---------+-------+--------+---------+-------------+" << std::endl; + os << " | Name | Type | Par | Threads | Chunk | Thresh | Pending | Completed |" << std::endl; + os << " +--------------------------+--------+-----+---------+-------+--------+---------+-------------+" << std::endl; for (auto as : s.arrows) { os << " | " << std::setw(24) << std::left << as.arrow_name << " | " - << std::setw(11) << as.status << "| " << std::setw(6) << std::left << as.arrow_type << " | " << std::setw(3) << std::right << (as.is_parallel ? " T " : " F ") << " | " << std::setw(7) << as.thread_count << " |" diff --git a/src/libraries/JANA/Engine/JArrowPerfSummary.h b/src/libraries/JANA/Engine/JArrowPerfSummary.h index 34ba94f0d..ae093465f 100644 --- a/src/libraries/JANA/Engine/JArrowPerfSummary.h +++ b/src/libraries/JANA/Engine/JArrowPerfSummary.h @@ -20,7 +20,6 @@ struct ArrowSummary { JArrow::NodeType arrow_type; int running_upstreams; bool has_backpressure; - JArrow::Status status; size_t messages_pending; size_t threshold; size_t chunksize; diff --git a/src/libraries/JANA/Engine/JArrowProcessingController.cc b/src/libraries/JANA/Engine/JArrowProcessingController.cc index 5aa7bee5d..14ed2321e 100644 --- a/src/libraries/JANA/Engine/JArrowProcessingController.cc +++ b/src/libraries/JANA/Engine/JArrowProcessingController.cc @@ -147,7 +147,7 @@ bool JArrowProcessingController::is_stopped() { } bool JArrowProcessingController::is_finished() { - return m_scheduler->get_topology_status() == JScheduler::TopologyStatus::Finished; + return m_scheduler->get_topology_status() == JScheduler::TopologyStatus::Finalized; } bool JArrowProcessingController::is_timed_out() { @@ -232,11 +232,11 @@ std::unique_ptr JArrowProcessingController::measure_int // Measure perf on all Workers first, as this will prompt them to publish // any ArrowMetrics they have collected - m_perf_summary.workers.clear(); - for (JWorker* worker : m_workers) { - WorkerSummary summary; - worker->measure_perf(summary); - m_perf_summary.workers.push_back(summary); + if (m_perf_summary.workers.size() != m_workers.size()) { + m_perf_summary.workers = std::vector(m_workers.size()); + } + for (size_t i=0; imeasure_perf(m_perf_summary.workers[i]); } size_t monotonic_event_count = 0; @@ -251,59 +251,16 @@ std::unique_ptr JArrowProcessingController::measure_int double worst_seq_latency = 0; double worst_par_latency = 0; - m_perf_summary.arrows.clear(); - for (JArrow* arrow : m_topology->arrows) { - JArrowMetrics::Status last_status; - size_t total_message_count; - size_t last_message_count; - size_t total_queue_visits; - size_t last_queue_visits; - JArrowMetrics::duration_t total_latency; - JArrowMetrics::duration_t last_latency; - JArrowMetrics::duration_t total_queue_latency; - JArrowMetrics::duration_t last_queue_latency; - - arrow->get_metrics().get(last_status, total_message_count, last_message_count, total_queue_visits, - last_queue_visits, total_latency, last_latency, total_queue_latency, last_queue_latency); - - auto total_latency_ms = millisecs(total_latency).count(); - auto total_queue_latency_ms = millisecs(total_queue_latency).count(); - - ArrowSummary summary; - summary.arrow_type = arrow->get_type(); - summary.is_parallel = arrow->is_parallel(); - summary.thread_count = arrow->get_thread_count(); - summary.arrow_name = arrow->get_name(); - summary.chunksize = arrow->get_chunksize(); - summary.messages_pending = arrow->get_pending(); - summary.running_upstreams = arrow->get_running_upstreams(); - summary.threshold = arrow->get_threshold(); - summary.status = arrow->get_status(); - - summary.total_messages_completed = total_message_count; - summary.last_messages_completed = last_message_count; - summary.queue_visit_count = total_queue_visits; - - summary.avg_queue_latency_ms = (total_queue_visits == 0) - ? std::numeric_limits::infinity() - : total_queue_latency_ms / total_queue_visits; - - summary.avg_queue_overhead_frac = total_queue_latency_ms / (total_queue_latency_ms + total_latency_ms); - - summary.avg_latency_ms = (total_message_count == 0) - ? std::numeric_limits::infinity() - : total_latency_ms/total_message_count; - - summary.last_latency_ms = (last_message_count == 0) - ? std::numeric_limits::infinity() - : millisecs(last_latency).count()/last_message_count; - - if (arrow->is_parallel()) { + m_scheduler->summarize_arrows(m_perf_summary.arrows); + + + // Figure out what the bottlenecks in this topology are + for (const ArrowSummary& summary : m_perf_summary.arrows) { + if (summary.is_parallel) { worst_par_latency = std::max(worst_par_latency, summary.avg_latency_ms); } else { worst_seq_latency = std::max(worst_seq_latency, summary.avg_latency_ms); } - m_perf_summary.arrows.push_back(summary); } // bottlenecks diff --git a/src/libraries/JANA/Engine/JBlockSourceArrow.h b/src/libraries/JANA/Engine/JBlockSourceArrow.h index ecf5966de..fd3908785 100644 --- a/src/libraries/JANA/Engine/JBlockSourceArrow.h +++ b/src/libraries/JANA/Engine/JBlockSourceArrow.h @@ -64,19 +64,13 @@ class JBlockSourceArrow : public JArrow { } else if (lambda_result == Status::FailTryAgain) { status = JArrowMetrics::Status::ComeBackLater; - // TODO: Consider reporting queueempty/queuefull/sourcebusy here instead } else if (lambda_result == Status::FailFinished) { - finish(); - // TODO: finish() really ought to be called by JScheduler. - // However, it is not, nor is it being called for JEventSourceArrow either. - // I'm going to try to fix this when I get around to parallel event sources (issue #72) status = JArrowMetrics::Status::Finished; } } else { // reserved_count = 0 => downstream is full status = JArrowMetrics::Status::ComeBackLater; - // TODO: Consider reporting queueempty/queuefull/sourcebusy here instead } result.update(status, message_count, 1, latency, overhead); } diff --git a/src/libraries/JANA/Engine/JEventSourceArrow.cc b/src/libraries/JANA/Engine/JEventSourceArrow.cc index df503c9cc..50cf07d90 100644 --- a/src/libraries/JANA/Engine/JEventSourceArrow.cc +++ b/src/libraries/JANA/Engine/JEventSourceArrow.cc @@ -93,7 +93,8 @@ void JEventSourceArrow::execute(JArrowMetrics& result, size_t location_id) { JArrowMetrics::Status status; if (in_status == JEventSource::ReturnStatus::Finished) { - finish(); + // finish(); + // TODO: NWB: It is extra very important to not call finish() here status = JArrowMetrics::Status::Finished; } else if (in_status == JEventSource::ReturnStatus::Success && out_status == EventQueue::Status::Ready) { diff --git a/src/libraries/JANA/Engine/JScheduler.cc b/src/libraries/JANA/Engine/JScheduler.cc index 8e51d9589..13ed2810f 100644 --- a/src/libraries/JANA/Engine/JScheduler.cc +++ b/src/libraries/JANA/Engine/JScheduler.cc @@ -2,6 +2,9 @@ // Copyright 2020, Jefferson Science Associates, LLC. // Subject to the terms in the LICENSE file found in the top-level directory. +#include + +#include "JScheduler.h" #include #include #include @@ -9,102 +12,146 @@ JScheduler::JScheduler(std::shared_ptr topology) : m_topology(topology) - , m_next_idx(0) { + m_topology_state.next_arrow_index = 0; + + // Keep track of downstream arrows + std::map arrow_map; + size_t i=0; + for (auto* arrow : topology->arrows) { + arrow_map[arrow] = i++; + } + m_topology_state.arrow_states = std::vector(topology->arrows.size()); + + for (i=0; iarrows.size(); ++i) { + auto& as = m_topology_state.arrow_states[i]; + JArrow* arrow = topology->arrows[i]; + as.arrow = arrow; + for (JArrow* downstream : arrow->m_listeners) { + as.downstream_arrow_indices.push_back(arrow_map[downstream]); + } + } } JArrow* JScheduler::next_assignment(uint32_t worker_id, JArrow* assignment, JArrowMetrics::Status last_result) { - m_mutex.lock(); + std::lock_guard lock(m_mutex); + + LOG_DEBUG(logger) << "Worker " << worker_id << " checking in: " + << ((assignment == nullptr) ? "idle" : assignment->get_name()) << " -> " << to_string(last_result) << LOG_END; // Check latest arrow back in if (assignment != nullptr) { - assignment->update_thread_count(-1); - - if (assignment->get_running_upstreams() == 0 && - assignment->get_pending() == 0 && - assignment->get_thread_count() == 0 && - assignment->get_type() != JArrow::NodeType::Source && - assignment->get_status() == JArrow::Status::Running) { - - LOG_DEBUG(logger) << "Deactivating arrow '" << assignment->get_name() << "' (" << running_arrow_count - 1 << " remaining)" << LOG_END; - assignment->pause(); - assert(running_arrow_count >= 0); - if (running_arrow_count == 0) { - LOG_DEBUG(logger) << "All arrows deactivated. Deactivating topology." << LOG_END; - achieve_topology_pause_unprotected(); - } - } + checkin_unprotected(assignment, last_result); } - // Choose a new arrow. Loop over all arrows, starting at where we last left off, and pick the first - // arrow that works - size_t current_idx = m_next_idx; - do { - JArrow* candidate = m_topology->arrows[current_idx]; - current_idx += 1; - current_idx %= m_topology->arrows.size(); + JArrow* next = checkout_unprotected(); - if (candidate->get_status() == JArrow::Status::Running && - (candidate->is_parallel() || candidate->get_thread_count() == 0)) { + LOG_DEBUG(logger) << "Worker " << worker_id << " assigned: " + << ((next == nullptr) ? "idle" : next->get_name()) << LOG_END; + return next; - // Found a plausible candidate. +} - if (candidate->get_type() == JArrow::NodeType::Source || - candidate->get_running_upstreams() > 0 || - candidate->get_pending() > 0) { - // Candidate still has work they can do - m_next_idx = current_idx; // Next time, continue right where we left off - candidate->update_thread_count(1); +void JScheduler::last_assignment(uint32_t worker_id, JArrow* assignment, JArrowMetrics::Status last_result) { - LOG_DEBUG(logger) << "Worker " << worker_id << ", " - << ((assignment == nullptr) ? "idle" : assignment->get_name()) - << ", " << to_string(last_result) << " => " - << candidate->get_name() << " [" << candidate->get_thread_count() << " threads]" << LOG_END; - m_mutex.unlock(); - return candidate; + std::lock_guard lock(m_mutex); - } - else { - // Candidate can be paused immediately because there is no more work coming - LOG_DEBUG(logger) << "Deactivating arrow '" << candidate->get_name() << "' (" << running_arrow_count - 1 << " remaining)" << LOG_END; - candidate->pause(); - assert(running_arrow_count >= 0); - if (running_arrow_count == 0) { - LOG_DEBUG(logger) << "All arrows deactivated. Deactivating topology." << LOG_END; - achieve_topology_pause_unprotected(); - } - } + LOG_DEBUG(logger) << "Worker " << worker_id << " checking in: " + << ((assignment == nullptr) ? "idle" : assignment->get_name()) + << " -> " << to_string(last_result) << "). Shutting down!" << LOG_END; + + if (assignment != nullptr) { + checkin_unprotected(assignment, last_result); + } +} + + + +void JScheduler::checkin_unprotected(JArrow* assignment, JArrowMetrics::Status last_result) { + // Find index of arrow + size_t index = 0; + while (m_topology_state.arrow_states[index].arrow != assignment) index++; + assert(index < m_topology_state.arrow_states.size()); + + // Decrement arrow's thread count + ArrowState& as = m_topology_state.arrow_states[index]; + as.thread_count -= 1; + + bool found_inactive_source = + assignment->get_type() == JArrow::NodeType::Source && + last_result == JArrowMetrics::Status::Finished && // Only Sources get to declare themselves finished! + as.status == ArrowStatus::Active; // We only want to deactivate once + + + bool found_draining_stage_or_sink = + assignment->get_type() != JArrow::NodeType::Source && // We aren't a source + as.active_or_draining_upstream_arrow_count == 0 && // All upstreams arrows are inactive + assignment->get_pending() == 0 && // All upstream queues are empty + as.thread_count > 0 && // There are other workers still assigned to this arrow + as.status == ArrowStatus::Active; // We only want to deactivate once + + + bool found_inactive_stage_or_sink = + assignment->get_type() != JArrow::NodeType::Source && // We aren't a source + as.active_or_draining_upstream_arrow_count == 0 && // All upstreams arrows are inactive + assignment->get_pending() == 0 && // All upstream queues are empty + as.thread_count == 0 && // There are NO other workers still assigned to this arrow + (as.status == ArrowStatus::Draining || // We only want to deactivate once + as.status == ArrowStatus::Active); + + + if (found_inactive_source || found_inactive_stage_or_sink) { + + // Deactivate arrow + as.status = ArrowStatus::Inactive; + m_topology_state.active_or_draining_arrow_count--; + + LOG_DEBUG(logger) << "Deactivated arrow '" << assignment->get_name() << "' (" << m_topology_state.active_or_draining_arrow_count << " remaining)" << LOG_END; + + for (size_t downstream: m_topology_state.arrow_states[index].downstream_arrow_indices) { + m_topology_state.arrow_states[downstream].active_or_draining_upstream_arrow_count--; } + } + else if (found_draining_stage_or_sink) { + // Drain arrow + as.status = ArrowStatus::Draining; + LOG_DEBUG(logger) << "Draining arrow '" << assignment->get_name() << "' (" << m_topology_state.active_or_draining_arrow_count << " remaining)" << LOG_END; + } - } while (current_idx != m_next_idx); + if (found_inactive_source) { + // Sources are finalized immediately, unlike other arrows. This is to facilitate closing resources like sockets and file handles + assignment->finalize(); + } - if (running_arrow_count == 0 && m_current_topology_status == TopologyStatus::Running) { - // This exists just in case the user provided a topology that cannot self-exit, e.g. because no event sources - // have been specified. If no arrows can be assigned to this worker, and no arrows have yet been assigned to any - // workers, we can conclude that the topology is dead. After deactivating the topology, is_stopped() returns true, - // and Run(true) terminates. - LOG_DEBUG(logger) << "No active arrows found. Deactivating topology." << LOG_END; + // Test if this was the last arrow running + if (m_topology_state.active_or_draining_arrow_count == 0) { + LOG_DEBUG(logger) << "All arrows are inactive. Deactivating topology." << LOG_END; achieve_topology_pause_unprotected(); } - m_mutex.unlock(); - - return nullptr; // We've looped through everything with no luck } +JArrow* JScheduler::checkout_unprotected() { -void JScheduler::last_assignment(uint32_t worker_id, JArrow* assignment, JArrowMetrics::Status result) { + // Choose a new arrow. Loop over all arrows, starting at where we last left off, and pick the first arrow that works + size_t current_idx = m_topology_state.next_arrow_index; + do { + ArrowState& candidate = m_topology_state.arrow_states[current_idx]; + current_idx += 1; + current_idx %= m_topology->arrows.size(); - LOG_DEBUG(logger) << "Worker " << worker_id << ", " - << ((assignment == nullptr) ? "idle" : assignment->get_name()) - << ", " << to_string(result) << ") => Shutting down!" << LOG_END; - m_mutex.lock(); - if (assignment != nullptr) { - assignment->update_thread_count(-1); - } - m_mutex.unlock(); + if (candidate.status == ArrowStatus::Active && // This excludes Draining arrows + (candidate.arrow->is_parallel() || candidate.thread_count == 0)) { // This excludes non-parallel arrows that are already assigned to a worker + + m_topology_state.next_arrow_index = current_idx; // Next time, continue right where we left off + candidate.thread_count += 1; + return candidate.arrow; + + } + } while (current_idx != m_topology_state.next_arrow_index); + return nullptr; // We've looped through everything with no luck } @@ -112,72 +159,76 @@ void JScheduler::initialize_topology() { std::lock_guard lock(m_mutex); - assert(m_current_topology_status == TopologyStatus::Uninitialized); + assert(m_topology_state.current_topology_status == TopologyStatus::Uninitialized); for (JArrow* arrow : m_topology->arrows) { arrow->initialize(); } - m_current_topology_status = TopologyStatus::Paused; + m_topology_state.current_topology_status = TopologyStatus::Paused; } void JScheduler::drain_topology() { std::lock_guard lock(m_mutex); - if (m_current_topology_status == TopologyStatus::Finished) { - LOG_DEBUG(logger) << "JScheduler: drain(): Skipping because topology is already Finished" << LOG_END; + if (m_topology_state.current_topology_status == TopologyStatus::Finalized) { + LOG_DEBUG(logger) << "JScheduler: drain(): Skipping because topology is already Finalized" << LOG_END; return; } LOG_DEBUG(logger) << "JScheduler: drain_topology()" << LOG_END; - for (auto source : m_topology->sources) { - source->pause(); - m_current_topology_status = TopologyStatus::Draining; + // We pause (as opposed to finish) for two reasons: // 1. There might be workers in the middle of calling eventSource->GetEvent. // 2. drain() might be called from a signal handler. It isn't safe to make syscalls during signal handlers // due to risk of deadlock. (We technically shouldn't even do logging!) + // + for (size_t i=0; iget_type() == JArrow::NodeType::Source) { + pause_arrow_unprotected(i); + } } + m_topology_state.current_topology_status = TopologyStatus::Draining; } void JScheduler::run_topology(int nthreads) { std::lock_guard lock(m_mutex); - TopologyStatus current_status = m_current_topology_status; - if (current_status == TopologyStatus::Running || current_status == TopologyStatus::Finished) { + TopologyStatus current_status = m_topology_state.current_topology_status; + if (current_status == TopologyStatus::Running || current_status == TopologyStatus::Finalized) { LOG_DEBUG(logger) << "JScheduler: run_topology() : " << current_status << " => " << current_status << LOG_END; return; } LOG_DEBUG(logger) << "JScheduler: run_topology() : " << current_status << " => Running" << LOG_END; - for (auto arrow : m_topology->arrows) { - arrow->set_running_arrows(&running_arrow_count); - } - if (m_topology->sources.empty()) { throw JException("No event sources found!"); } - for (auto source : m_topology->sources) { + for (size_t i=0; irun(); + ArrowState& as = m_topology_state.arrow_states[i]; + if (as.arrow->get_type() == JArrow::NodeType::Source) { + run_arrow_unprotected(i); + } } // Note that we activate workers AFTER we activate the topology, so no actual processing will have happened // by this point when we start up the metrics. m_topology->metrics.reset(); m_topology->metrics.start(nthreads); - m_current_topology_status = TopologyStatus::Running; + m_topology_state.current_topology_status = TopologyStatus::Running; } void JScheduler::request_topology_pause() { std::lock_guard lock(m_mutex); // This sets all Running arrows to Paused, which prevents Workers from picking up any additional assignments // Once all Workers have completed their remaining assignments, the scheduler will call achieve_pause(). - TopologyStatus current_status = m_current_topology_status; + TopologyStatus current_status = m_topology_state.current_topology_status; if (current_status == TopologyStatus::Running) { LOG_DEBUG(logger) << "JScheduler: request_pause() : " << current_status << " => Pausing" << LOG_END; - for (auto arrow: m_topology->arrows) { - arrow->pause(); + for (size_t i=0; i " << current_status << LOG_END; @@ -193,11 +244,11 @@ void JScheduler::achieve_topology_pause() { void JScheduler::achieve_topology_pause_unprotected() { // This is meant to be used by the scheduler to tell us when all workers have stopped, so it is safe to finish(), etc - TopologyStatus current_status = m_current_topology_status; + TopologyStatus current_status = m_topology_state.current_topology_status; if (current_status == TopologyStatus::Running || current_status == TopologyStatus::Pausing || current_status == TopologyStatus::Draining) { LOG_DEBUG(logger) << "JScheduler: achieve_topology_pause() : " << current_status << " => " << TopologyStatus::Paused << LOG_END; m_topology->metrics.stop(); - m_current_topology_status = TopologyStatus::Paused; + m_topology_state.current_topology_status = TopologyStatus::Paused; } else { LOG_DEBUG(logger) << "JScheduler: achieve_topology_pause() : " << current_status << " => " << current_status << LOG_END; @@ -207,28 +258,187 @@ void JScheduler::achieve_topology_pause_unprotected() { void JScheduler::finish_topology() { std::lock_guard lock(m_mutex); // This finalizes all arrows. Once this happens, we cannot restart the topology. - // assert(m_current_topology_status == TopologyStatus::Paused); - for (auto arrow : m_topology->arrows) { - arrow->finish(); + // assert(m_topology_state.current_topology_status == TopologyStatus::Inactive); + for (ArrowState& as : m_topology_state.arrow_states) { + + if (as.status != ArrowStatus::Finalized) { + as.arrow->finalize(); + } } - m_current_topology_status = TopologyStatus::Finished; + m_topology_state.current_topology_status = TopologyStatus::Finalized; } JScheduler::TopologyStatus JScheduler::get_topology_status() { std::lock_guard lock(m_mutex); - return m_current_topology_status; + return m_topology_state.current_topology_status; +} + +JScheduler::TopologyState JScheduler::get_topology_state() { + std::lock_guard lock(m_mutex); + return m_topology_state; } +void JScheduler::run_arrow_unprotected(size_t index) { + auto& as = m_topology_state.arrow_states[index]; + auto name = as.arrow->get_name(); + ArrowStatus status = as.status; + + // if (status == ArrowStatus::Unopened) { + // LOG_DEBUG(logger) << "Arrow '" << name << "' run(): Not initialized!" << LOG_END; + // throw rException("Arrow %s has not been initialized!", name.c_str()); + // } + // if (status == ArrowStatus::Active || m_status == ArrowStatus::Finalized) { + // LOG_DEBUG(logger) << "Arrow '" << name << "' run() : " << status << " => " << status << LOG_END; + // return; + // } + LOG_DEBUG(logger) << "Arrow '" << name << "' run() : " << status << " => Active" << LOG_END; + + m_topology_state.active_or_draining_arrow_count++; + for (size_t downstream: m_topology_state.arrow_states[index].downstream_arrow_indices) { + m_topology_state.arrow_states[downstream].active_or_draining_upstream_arrow_count++; + run_arrow_unprotected(downstream); // Activating something recursively activates everything downstream. + } + m_topology_state.arrow_states[index].status = ArrowStatus::Active; +} + +void JScheduler::pause_arrow_unprotected(size_t index) { + auto& as = m_topology_state.arrow_states[index]; + auto name = as.arrow->get_name(); + ArrowStatus status = as.status; + + if (status != ArrowStatus::Active) { + LOG_DEBUG(logger) << "JArrow '" << name << "' pause() : " << status << " => " << status << LOG_END; + return; // pause() is a no-op unless running + } + LOG_DEBUG(logger) << "JArrow '" << name << "' pause() : " << status << " => Inactive" << LOG_END; + m_topology_state.active_or_draining_arrow_count++; + for (size_t downstream: m_topology_state.arrow_states[index].downstream_arrow_indices) { + m_topology_state.arrow_states[downstream].active_or_draining_upstream_arrow_count--; + } + m_topology_state.arrow_states[index].status = ArrowStatus::Inactive; +} + +void JScheduler::finish_arrow_unprotected(size_t index) { + auto& as = m_topology_state.arrow_states[index]; + auto name = as.arrow->get_name(); + ArrowStatus status = as.status; + + LOG_DEBUG(logger) << "JArrow '" << name << "' finish() : " << status << " => Finalized" << LOG_END; + ArrowStatus old_status = as.status; + // if (old_status == ArrowStatus::Unopened) { + // LOG_DEBUG(logger) << "JArrow '" << name << "': Uninitialized!" << LOG_END; + // throw JException("JArrow::finish(): Arrow %s has not been initialized!", name.c_str()); + // } + if (old_status == ArrowStatus::Active) { + m_topology_state.active_or_draining_arrow_count--; + for (size_t downstream: m_topology_state.arrow_states[index].downstream_arrow_indices) { + m_topology_state.arrow_states[downstream].active_or_draining_upstream_arrow_count--; + } + } + if (old_status != ArrowStatus::Finalized) { + LOG_TRACE(logger) << "JArrow '" << name << "': Finalizing (this must only happen once)" << LOG_END; + as.arrow->finalize(); + } + m_topology_state.arrow_states[index].status = ArrowStatus::Finalized; +} + std::ostream& operator<<(std::ostream& os, JScheduler::TopologyStatus status) { switch(status) { case JScheduler::TopologyStatus::Uninitialized: os << "Uninitialized"; break; case JScheduler::TopologyStatus::Running: os << "Running"; break; case JScheduler::TopologyStatus::Pausing: os << "Pausing"; break; - case JScheduler::TopologyStatus::Paused: os << "Paused"; break; - case JScheduler::TopologyStatus::Finished: os << "Finished"; break; case JScheduler::TopologyStatus::Draining: os << "Draining"; break; + case JScheduler::TopologyStatus::Paused: os << "Paused"; break; + case JScheduler::TopologyStatus::Finalized: os << "Finalized"; break; + } + return os; +} + + +std::ostream& operator<<(std::ostream& os, JScheduler::ArrowStatus status) { + switch (status) { + case JScheduler::ArrowStatus::Uninitialized: os << "Uninitialized"; break; + case JScheduler::ArrowStatus::Active: os << "Active"; break; + case JScheduler::ArrowStatus::Draining: os << "Draining"; break; + case JScheduler::ArrowStatus::Inactive: os << "Inactive"; break; + case JScheduler::ArrowStatus::Finalized: os << "Finalized"; break; } return os; } + + +void JScheduler::summarize_arrows(std::vector& summaries) { + + // Make sure we have exactly one ArrowSummary for each arrow + if (summaries.size() != m_topology_state.arrow_states.size()) { + summaries = std::vector(m_topology_state.arrow_states.size()); + } + + std::lock_guard lock(m_mutex); + + // Copy over arrow status variables + for (size_t i=0; iget_name(); + summary.arrow_type = as.arrow->get_type(); + summary.is_parallel = as.arrow->is_parallel(); + summary.chunksize = as.arrow->get_chunksize(); + summary.messages_pending = as.arrow->get_pending(); + summary.threshold = as.arrow->get_threshold(); + + summary.thread_count = as.thread_count; + summary.running_upstreams = as.active_or_draining_upstream_arrow_count; + + JArrowMetrics::Status last_status; + size_t total_message_count; + size_t last_message_count; + size_t total_queue_visits; + size_t last_queue_visits; + JArrowMetrics::duration_t total_latency; + JArrowMetrics::duration_t last_latency; + JArrowMetrics::duration_t total_queue_latency; + JArrowMetrics::duration_t last_queue_latency; + + as.arrow->get_metrics().get( + last_status, + total_message_count, + last_message_count, + total_queue_visits, + last_queue_visits, + total_latency, + last_latency, + total_queue_latency, + last_queue_latency + ); + + using millisecs = std::chrono::duration; + auto total_latency_ms = millisecs(total_latency).count(); + auto total_queue_latency_ms = millisecs(total_queue_latency).count(); + + summary.total_messages_completed = total_message_count; + summary.last_messages_completed = last_message_count; + summary.queue_visit_count = total_queue_visits; + + summary.avg_queue_latency_ms = (total_queue_visits == 0) + ? std::numeric_limits::infinity() + : total_queue_latency_ms / total_queue_visits; + + summary.avg_queue_overhead_frac = total_queue_latency_ms / (total_queue_latency_ms + total_latency_ms); + + summary.avg_latency_ms = (total_message_count == 0) + ? std::numeric_limits::infinity() + : total_latency_ms/total_message_count; + + summary.last_latency_ms = (last_message_count == 0) + ? std::numeric_limits::infinity() + : millisecs(last_latency).count()/last_message_count; + + } + + +} + diff --git a/src/libraries/JANA/Engine/JScheduler.h b/src/libraries/JANA/Engine/JScheduler.h index 5bbf286be..8cfc85e6e 100644 --- a/src/libraries/JANA/Engine/JScheduler.h +++ b/src/libraries/JANA/Engine/JScheduler.h @@ -6,8 +6,10 @@ #define _JSCHEDULER_H_ #include +#include #include +#include #include @@ -17,21 +19,46 @@ struct JArrowTopology; /// not unlike OpenMP's `schedule dynamic`. class JScheduler { public: - enum class TopologyStatus { Uninitialized, Paused, Running, Pausing, Draining, Finished }; + enum class TopologyStatus { + Uninitialized, // Arrows have not been initialized() yet + Running, // At least one arrow is active + Pausing, // At least one arrow is active, but a pause has been requested + Draining, // At least one arrow is active, but a drain has been requested + Paused, // No arrows are active, but may be activated or re-activated + Finalized // No arrows are active, and all arrows have been finalized(), so they many not be re-activated + }; + + enum class ArrowStatus { Uninitialized, // Arrow has not been initialized() yet + Active, // Arrow may be scheduled + Draining, // All upstreams are inactive and queues are emptied, but arrow still has at least one worker out. Should not be scheduled. + Inactive, // Arrow should not be scheduled, but may be re-activated + Finalized // Arrow should not be scheduled, and has been finalized(), so it may not be re-activated + }; + + struct ArrowState { + JArrow* arrow = nullptr; + ArrowStatus status = ArrowStatus::Uninitialized; + int64_t thread_count = 0; // Current number of threads assigned to this arrow + int64_t active_or_draining_upstream_arrow_count = 0; // Current number of active or draining arrows immediately upstream + std::vector downstream_arrow_indices; + }; + + struct TopologyState { + std::vector arrow_states; + TopologyStatus current_topology_status = TopologyStatus::Uninitialized; + int64_t active_or_draining_arrow_count = 0; // Detects when the topology has paused + size_t next_arrow_index; + }; private: // This mutex controls ALL scheduler state std::mutex m_mutex; std::shared_ptr m_topology; - size_t m_next_idx; - // Topology-level state - TopologyStatus m_current_topology_status = TopologyStatus::Uninitialized; - int64_t running_arrow_count {0}; // Detects when the topology has paused - //int64_t running_worker_count = 0; // Detects when the workers have all joined // TODO: Why is this commented out? - - // Arrow-level state + // Protected state + TopologyState m_topology_state; + public: @@ -55,25 +82,30 @@ class JScheduler { JLogger logger; - // JArrowProcessingController-facing operations - void initialize_topology(); void drain_topology(); void run_topology(int nthreads); void request_topology_pause(); void achieve_topology_pause(); void finish_topology(); - TopologyStatus get_topology_status(); - // Arrow-facing operations + TopologyStatus get_topology_status(); + TopologyState get_topology_state(); + void summarize_arrows(std::vector& summaries); private: void achieve_topology_pause_unprotected(); + void run_arrow_unprotected(size_t index); + void pause_arrow_unprotected(size_t index); + void finish_arrow_unprotected(size_t index); + void checkin_unprotected(JArrow* arrow, JArrowMetrics::Status last_result); + JArrow* checkout_unprotected(); }; std::ostream& operator<<(std::ostream& os, JScheduler::TopologyStatus status); +std::ostream& operator<<(std::ostream& os, JScheduler::ArrowStatus status); #endif // _JSCHEDULER_H_ diff --git a/src/programs/tests/SinkArrow.h b/src/programs/tests/SinkArrow.h index f5ede3367..ee49c5e41 100644 --- a/src/programs/tests/SinkArrow.h +++ b/src/programs/tests/SinkArrow.h @@ -41,10 +41,6 @@ class SinkArrow : public JArrow { }; void execute(JArrowMetrics& result, size_t /* location_id */) override { - if (get_status() == Status::Finished) { - result.update_finished(); - return; - } if (!_is_initialized) { _sink.initialize(); _chunk_buffer.reserve(get_chunksize()); @@ -71,12 +67,6 @@ class SinkArrow : public JArrow { auto overhead = (stop_time - start_time) - latency; JArrowMetrics::Status status; - // TODO: NOT HERE. Finalize the sink in on_status_change(). - // if (in_status == JMailbox::Status::Finished) { - // set_status(JActivable::Status::Finished); - // _sink.finalize(); - // status = JArrowMetrics::Status::Finished; - // } if (in_status == JMailbox::Status::Empty) { status = JArrowMetrics::Status::ComeBackLater; } diff --git a/src/programs/tests/SourceArrow.h b/src/programs/tests/SourceArrow.h index 3c643168f..763a88293 100644 --- a/src/programs/tests/SourceArrow.h +++ b/src/programs/tests/SourceArrow.h @@ -48,10 +48,6 @@ class SourceArrow : public JArrow { } void execute(JArrowMetrics& result, size_t /* location_id */) override { - if (get_status() == Status::Finished) { - result.update_finished(); - return; - } auto start_time = std::chrono::steady_clock::now(); auto in_status = _source.inprocess(_chunk_buffer, get_chunksize()); @@ -65,7 +61,6 @@ class SourceArrow : public JArrow { JArrowMetrics::Status status; if (in_status == Source::Status::Finished) { - finish(); // This will call finalize() which will call _source.finalize() status = JArrowMetrics::Status::Finished; } else if (in_status == Source::Status::KeepGoing && out_status == JMailbox::Status::Ready) { diff --git a/src/programs/tests/TopologyTests.cc b/src/programs/tests/TopologyTests.cc index 28ebd8524..8af35dc49 100644 --- a/src/programs/tests/TopologyTests.cc +++ b/src/programs/tests/TopologyTests.cc @@ -257,7 +257,7 @@ TEST_CASE("JTopology: Basic functionality") { app.Run(true); // TODO: Access the JScheduler managed by JApplication - // REQUIRE(scheduler.get_topology_status() == JScheduler::TopologyStatus::Finished); + // REQUIRE(scheduler.get_topology_status() == JScheduler::TopologyStatus::Finalized); REQUIRE(sum_everything->get_status() == JArrow::Status::Finished); REQUIRE(sink.sum == 20 * 13); From d1f9517d111fa65ced0bee00b11e35340b355535 Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Fri, 27 Oct 2023 22:19:28 -0400 Subject: [PATCH 4/5] Test cases compile again --- .../JANA/Engine/JArrowProcessingController.h | 3 + src/programs/tests/ArrowActivationTests.cc | 74 +++++++++---------- src/programs/tests/SchedulerTests.cc | 12 +-- src/programs/tests/TopologyTests.cc | 67 +++++++++-------- 4 files changed, 80 insertions(+), 76 deletions(-) diff --git a/src/libraries/JANA/Engine/JArrowProcessingController.h b/src/libraries/JANA/Engine/JArrowProcessingController.h index 95484025c..f502ba29a 100644 --- a/src/libraries/JANA/Engine/JArrowProcessingController.h +++ b/src/libraries/JANA/Engine/JArrowProcessingController.h @@ -42,6 +42,9 @@ class JArrowProcessingController : public JProcessingController { void print_report() override; void print_final_report() override; + // This is so we can test + inline JScheduler* get_scheduler() { return m_scheduler; } + private: diff --git a/src/programs/tests/ArrowActivationTests.cc b/src/programs/tests/ArrowActivationTests.cc index a38984ee7..0fe2b2dd3 100644 --- a/src/programs/tests/ArrowActivationTests.cc +++ b/src/programs/tests/ArrowActivationTests.cc @@ -11,18 +11,9 @@ JArrowMetrics::Status steppe(JArrow* arrow) { - if (arrow->get_type() != JArrow::NodeType::Source && - arrow->get_status() == JArrow::Status::Running && - arrow->get_pending() == 0 && - arrow->get_running_upstreams() == 0) { - - arrow->finish(); - return JArrowMetrics::Status::Finished; - } - JArrowMetrics metrics; - arrow->execute(metrics, 0); - return metrics.get_last_status(); + arrow->execute(metrics, 0); + return metrics.get_last_status(); } TEST_CASE("ArrowActivationTests") { @@ -33,7 +24,6 @@ TEST_CASE("ArrowActivationTests") { SumSink sink; auto topology = std::make_shared(); - JScheduler scheduler(topology); auto q1 = new JMailbox(); auto q2 = new JMailbox(); @@ -57,6 +47,7 @@ TEST_CASE("ArrowActivationTests") { topology->sinks.push_back(sum_everything); emit_rand_ints->set_chunksize(1); + JScheduler scheduler(topology); auto logger = JLogger(JLogger::Level::OFF); topology->m_logger = logger; @@ -69,10 +60,12 @@ TEST_CASE("ArrowActivationTests") { REQUIRE(q2->size() == 0); REQUIRE(q3->size() == 0); - REQUIRE(emit_rand_ints->get_status() == JArrow::Status::Unopened); - REQUIRE(multiply_by_two->get_status() == JArrow::Status::Unopened); - REQUIRE(subtract_one->get_status() == JArrow::Status::Unopened); - REQUIRE(sum_everything->get_status() == JArrow::Status::Unopened); + JScheduler::TopologyState state = scheduler.get_topology_state(); + + REQUIRE(state.arrow_states[0].status == JScheduler::ArrowStatus::Uninitialized); + REQUIRE(state.arrow_states[1].status == JScheduler::ArrowStatus::Uninitialized); + REQUIRE(state.arrow_states[2].status == JScheduler::ArrowStatus::Uninitialized); + REQUIRE(state.arrow_states[3].status == JScheduler::ArrowStatus::Uninitialized); REQUIRE(emit_rand_ints->get_pending() == 0); REQUIRE(multiply_by_two->get_pending() == 0); @@ -82,18 +75,21 @@ TEST_CASE("ArrowActivationTests") { SECTION("As a message propagates, arrows and queues downstream automatically activate") { - REQUIRE(emit_rand_ints->get_status() == JArrow::Status::Unopened); - REQUIRE(multiply_by_two->get_status() == JArrow::Status::Unopened); - REQUIRE(subtract_one->get_status() == JArrow::Status::Unopened); - REQUIRE(sum_everything->get_status() == JArrow::Status::Unopened); + JScheduler::TopologyState state = scheduler.get_topology_state(); + + REQUIRE(state.arrow_states[0].status == JScheduler::ArrowStatus::Uninitialized); + REQUIRE(state.arrow_states[1].status == JScheduler::ArrowStatus::Uninitialized); + REQUIRE(state.arrow_states[2].status == JScheduler::ArrowStatus::Uninitialized); + REQUIRE(state.arrow_states[3].status == JScheduler::ArrowStatus::Uninitialized); scheduler.run_topology(1); + state = scheduler.get_topology_state(); // TODO: Check that initialize has been called, but not finalize - REQUIRE(emit_rand_ints->get_status() == JArrow::Status::Running); - REQUIRE(multiply_by_two->get_status() == JArrow::Status::Running); - REQUIRE(subtract_one->get_status() == JArrow::Status::Running); - REQUIRE(sum_everything->get_status() == JArrow::Status::Running); + REQUIRE(state.arrow_states[0].status == JScheduler::ArrowStatus::Active); + REQUIRE(state.arrow_states[1].status == JScheduler::ArrowStatus::Active); + REQUIRE(state.arrow_states[2].status == JScheduler::ArrowStatus::Active); + REQUIRE(state.arrow_states[3].status == JScheduler::ArrowStatus::Active); } } // TEST_CASE @@ -109,7 +105,6 @@ TEST_CASE("ActivableDeactivationTests") { SumSink sink; auto topology = std::make_shared(); - JScheduler scheduler(topology); auto q1 = new JMailbox(); auto q2 = new JMailbox(); @@ -135,24 +130,29 @@ TEST_CASE("ActivableDeactivationTests") { topology->m_logger = logger; source.logger = logger; - REQUIRE(emit_rand_ints->get_status() == JArrow::Status::Unopened); - REQUIRE(multiply_by_two->get_status() == JArrow::Status::Unopened); - REQUIRE(subtract_one->get_status() == JArrow::Status::Unopened); - REQUIRE(sum_everything->get_status() == JArrow::Status::Unopened); + JScheduler scheduler(topology); + JScheduler::TopologyState state = scheduler.get_topology_state(); + + REQUIRE(state.arrow_states[0].status == JScheduler::ArrowStatus::Uninitialized); + REQUIRE(state.arrow_states[0].status == JScheduler::ArrowStatus::Uninitialized); + REQUIRE(state.arrow_states[0].status == JScheduler::ArrowStatus::Uninitialized); + REQUIRE(state.arrow_states[0].status == JScheduler::ArrowStatus::Uninitialized); scheduler.run_topology(1); + state = scheduler.get_topology_state(); - REQUIRE(emit_rand_ints->get_status() == JArrow::Status::Running); - REQUIRE(multiply_by_two->get_status() == JArrow::Status::Running); - REQUIRE(subtract_one->get_status() == JArrow::Status::Running); - REQUIRE(sum_everything->get_status() == JArrow::Status::Running); + REQUIRE(state.arrow_states[0].status == JScheduler::ArrowStatus::Active); + REQUIRE(state.arrow_states[0].status == JScheduler::ArrowStatus::Active); + REQUIRE(state.arrow_states[0].status == JScheduler::ArrowStatus::Active); + REQUIRE(state.arrow_states[0].status == JScheduler::ArrowStatus::Active); steppe(emit_rand_ints); + state = scheduler.get_topology_state(); - REQUIRE(emit_rand_ints->get_status() == JArrow::Status::Finished); - REQUIRE(multiply_by_two->get_status() == JArrow::Status::Running); - REQUIRE(subtract_one->get_status() == JArrow::Status::Running); - REQUIRE(sum_everything->get_status() == JArrow::Status::Running); + REQUIRE(state.arrow_states[0].status == JScheduler::ArrowStatus::Finalized); + REQUIRE(state.arrow_states[0].status == JScheduler::ArrowStatus::Active); + REQUIRE(state.arrow_states[0].status == JScheduler::ArrowStatus::Active); + REQUIRE(state.arrow_states[0].status == JScheduler::ArrowStatus::Active); // TODO: Test that finalize was called exactly once } // TEST_CASE diff --git a/src/programs/tests/SchedulerTests.cc b/src/programs/tests/SchedulerTests.cc index 3b76322a5..85e8bd34b 100644 --- a/src/programs/tests/SchedulerTests.cc +++ b/src/programs/tests/SchedulerTests.cc @@ -17,7 +17,6 @@ TEST_CASE("SchedulerTests") { SumSink sink; auto topology = std::make_shared(); - JScheduler scheduler(topology); auto q1 = new JMailbox(); auto q2 = new JMailbox(); @@ -40,6 +39,8 @@ TEST_CASE("SchedulerTests") { topology->sinks.push_back(sum_everything); emit_rand_ints->set_chunksize(1); + + JScheduler scheduler(topology); scheduler.run_topology(1); JArrow* assignment; @@ -62,10 +63,11 @@ TEST_CASE("SchedulerTests") { } } while (assignment != nullptr); - REQUIRE(emit_rand_ints->get_status() == JArrow::Status::Finished); - REQUIRE(multiply_by_two->get_status() == JArrow::Status::Paused); - REQUIRE(subtract_one->get_status() == JArrow::Status::Paused); - REQUIRE(sum_everything->get_status() == JArrow::Status::Paused); + JScheduler::TopologyState state = scheduler.get_topology_state(); + REQUIRE(state.arrow_states[0].status == JScheduler::ArrowStatus::Finalized); + REQUIRE(state.arrow_states[1].status == JScheduler::ArrowStatus::Inactive); + REQUIRE(state.arrow_states[2].status == JScheduler::ArrowStatus::Inactive); + REQUIRE(state.arrow_states[3].status == JScheduler::ArrowStatus::Inactive); } SECTION("When run sequentially, topology finished => RRS returns nullptr") { diff --git a/src/programs/tests/TopologyTests.cc b/src/programs/tests/TopologyTests.cc index 8af35dc49..cbe26e2eb 100644 --- a/src/programs/tests/TopologyTests.cc +++ b/src/programs/tests/TopologyTests.cc @@ -4,6 +4,7 @@ #include "catch.hpp" #include "JANA/Engine/JTopologyBuilder.h" +#include #include #include @@ -13,15 +14,6 @@ JArrowMetrics::Status step(JArrow* arrow) { - if (arrow->get_type() != JArrow::NodeType::Source && - arrow->get_status() == JArrow::Status::Running && - arrow->get_pending() == 0 && - arrow->get_running_upstreams() == 0) { - - arrow->finish(); - return JArrowMetrics::Status::Finished; - } - JArrowMetrics metrics; arrow->execute(metrics, 0); auto status = metrics.get_last_status(); @@ -42,7 +34,6 @@ TEST_CASE("JTopology: Basic functionality") { SumSink sink; auto topology = std::make_shared(); - JScheduler scheduler(topology); auto q1 = new JMailbox(); auto q2 = new JMailbox(); @@ -69,14 +60,15 @@ TEST_CASE("JTopology: Basic functionality") { auto logger = JLogger(JLogger::Level::TRACE); topology->m_logger = logger; source.logger = logger; + JScheduler scheduler(topology); SECTION("Before anything runs...") { // All queues are empty, none are finished REQUIRE(multiply_by_two->get_pending() == 0); - REQUIRE(subtract_one->get_pending() == 0); REQUIRE(sum_everything->get_pending() == 0); + REQUIRE(subtract_one->get_pending() == 0); } SECTION("When nothing is in the input queue...") { @@ -201,47 +193,52 @@ TEST_CASE("JTopology: Basic functionality") { source.logger = logger; scheduler.run_topology(1); + auto ts = scheduler.get_topology_state(); - REQUIRE(emit_rand_ints->get_status() == JArrow::Status::Running); - REQUIRE(multiply_by_two->get_status() == JArrow::Status::Running); - REQUIRE(subtract_one->get_status() == JArrow::Status::Running); - REQUIRE(sum_everything->get_status() == JArrow::Status::Running); + REQUIRE(ts.arrow_states[0].status == JScheduler::ArrowStatus::Active); + REQUIRE(ts.arrow_states[1].status == JScheduler::ArrowStatus::Active); + REQUIRE(ts.arrow_states[2].status == JScheduler::ArrowStatus::Active); + REQUIRE(ts.arrow_states[3].status == JScheduler::ArrowStatus::Active); for (int i = 0; i < 20; ++i) { step(emit_rand_ints); } + ts = scheduler.get_topology_state(); - REQUIRE(emit_rand_ints->get_status() == JArrow::Status::Finished); - REQUIRE(multiply_by_two->get_status() == JArrow::Status::Running); - REQUIRE(subtract_one->get_status() == JArrow::Status::Running); - REQUIRE(sum_everything->get_status() == JArrow::Status::Running); + REQUIRE(ts.arrow_states[0].status == JScheduler::ArrowStatus::Finalized); + REQUIRE(ts.arrow_states[1].status == JScheduler::ArrowStatus::Active); + REQUIRE(ts.arrow_states[2].status == JScheduler::ArrowStatus::Active); + REQUIRE(ts.arrow_states[3].status == JScheduler::ArrowStatus::Active); for (int i = 0; i < 20; ++i) { step(multiply_by_two); } + ts = scheduler.get_topology_state(); - REQUIRE(emit_rand_ints->get_status() == JArrow::Status::Finished); - REQUIRE(multiply_by_two->get_status() == JArrow::Status::Finished); - REQUIRE(subtract_one->get_status() == JArrow::Status::Running); - REQUIRE(sum_everything->get_status() == JArrow::Status::Running); + REQUIRE(ts.arrow_states[0].status == JScheduler::ArrowStatus::Finalized); + REQUIRE(ts.arrow_states[1].status == JScheduler::ArrowStatus::Finalized); + REQUIRE(ts.arrow_states[2].status == JScheduler::ArrowStatus::Active); + REQUIRE(ts.arrow_states[3].status == JScheduler::ArrowStatus::Active); for (int i = 0; i < 20; ++i) { step(subtract_one); } + ts = scheduler.get_topology_state(); - REQUIRE(emit_rand_ints->get_status() == JArrow::Status::Finished); - REQUIRE(multiply_by_two->get_status() == JArrow::Status::Finished); - REQUIRE(subtract_one->get_status() == JArrow::Status::Finished); - REQUIRE(sum_everything->get_status() == JArrow::Status::Running); + REQUIRE(ts.arrow_states[0].status == JScheduler::ArrowStatus::Finalized); + REQUIRE(ts.arrow_states[1].status == JScheduler::ArrowStatus::Finalized); + REQUIRE(ts.arrow_states[2].status == JScheduler::ArrowStatus::Finalized); + REQUIRE(ts.arrow_states[3].status == JScheduler::ArrowStatus::Active); for (int i = 0; i < 20; ++i) { step(sum_everything); } + ts = scheduler.get_topology_state(); - REQUIRE(emit_rand_ints->get_status() == JArrow::Status::Finished); - REQUIRE(multiply_by_two->get_status() == JArrow::Status::Finished); - REQUIRE(subtract_one->get_status() == JArrow::Status::Finished); - REQUIRE(sum_everything->get_status() == JArrow::Status::Finished); + REQUIRE(ts.arrow_states[0].status == JScheduler::ArrowStatus::Finalized); + REQUIRE(ts.arrow_states[1].status == JScheduler::ArrowStatus::Finalized); + REQUIRE(ts.arrow_states[2].status == JScheduler::ArrowStatus::Finalized); + REQUIRE(ts.arrow_states[3].status == JScheduler::ArrowStatus::Finalized); log_status(*topology); @@ -255,10 +252,12 @@ TEST_CASE("JTopology: Basic functionality") { REQUIRE(sink.sum == 0); app.Run(true); + auto scheduler = app.GetService()->get_scheduler(); + - // TODO: Access the JScheduler managed by JApplication - // REQUIRE(scheduler.get_topology_status() == JScheduler::TopologyStatus::Finalized); - REQUIRE(sum_everything->get_status() == JArrow::Status::Finished); + auto ts = scheduler->get_topology_state(); + REQUIRE(ts.current_topology_status == JScheduler::TopologyStatus::Finalized); + REQUIRE(ts.arrow_states[0].status == JScheduler::ArrowStatus::Finalized); REQUIRE(sink.sum == 20 * 13); } From 317596652b680c2a61d1d6a920a85b3ef044b831 Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Sat, 28 Oct 2023 18:50:22 -0400 Subject: [PATCH 5/5] Test cases pass again --- src/libraries/JANA/Engine/JScheduler.cc | 11 ++++------- src/programs/tests/ArrowActivationTests.cc | 23 ++++++++++++---------- src/programs/tests/SchedulerTests.cc | 12 ++++------- src/programs/tests/TopologyTests.cc | 7 ++++++- 4 files changed, 27 insertions(+), 26 deletions(-) diff --git a/src/libraries/JANA/Engine/JScheduler.cc b/src/libraries/JANA/Engine/JScheduler.cc index 13ed2810f..b0b503c07 100644 --- a/src/libraries/JANA/Engine/JScheduler.cc +++ b/src/libraries/JANA/Engine/JScheduler.cc @@ -106,7 +106,9 @@ void JScheduler::checkin_unprotected(JArrow* assignment, JArrowMetrics::Status l if (found_inactive_source || found_inactive_stage_or_sink) { // Deactivate arrow - as.status = ArrowStatus::Inactive; + // Because we are deactivating it from Active state, we know the topology has not been paused. Hence we can finalize immediately. + assignment->finalize(); + as.status = ArrowStatus::Finalized; m_topology_state.active_or_draining_arrow_count--; LOG_DEBUG(logger) << "Deactivated arrow '" << assignment->get_name() << "' (" << m_topology_state.active_or_draining_arrow_count << " remaining)" << LOG_END; @@ -121,11 +123,6 @@ void JScheduler::checkin_unprotected(JArrow* assignment, JArrowMetrics::Status l LOG_DEBUG(logger) << "Draining arrow '" << assignment->get_name() << "' (" << m_topology_state.active_or_draining_arrow_count << " remaining)" << LOG_END; } - if (found_inactive_source) { - // Sources are finalized immediately, unlike other arrows. This is to facilitate closing resources like sockets and file handles - assignment->finalize(); - } - // Test if this was the last arrow running if (m_topology_state.active_or_draining_arrow_count == 0) { LOG_DEBUG(logger) << "All arrows are inactive. Deactivating topology." << LOG_END; @@ -312,7 +309,7 @@ void JScheduler::pause_arrow_unprotected(size_t index) { return; // pause() is a no-op unless running } LOG_DEBUG(logger) << "JArrow '" << name << "' pause() : " << status << " => Inactive" << LOG_END; - m_topology_state.active_or_draining_arrow_count++; + m_topology_state.active_or_draining_arrow_count--; for (size_t downstream: m_topology_state.arrow_states[index].downstream_arrow_indices) { m_topology_state.arrow_states[downstream].active_or_draining_upstream_arrow_count--; } diff --git a/src/programs/tests/ArrowActivationTests.cc b/src/programs/tests/ArrowActivationTests.cc index 0fe2b2dd3..70a6aa584 100644 --- a/src/programs/tests/ArrowActivationTests.cc +++ b/src/programs/tests/ArrowActivationTests.cc @@ -131,28 +131,31 @@ TEST_CASE("ActivableDeactivationTests") { source.logger = logger; JScheduler scheduler(topology); + scheduler.logger = logger; JScheduler::TopologyState state = scheduler.get_topology_state(); REQUIRE(state.arrow_states[0].status == JScheduler::ArrowStatus::Uninitialized); - REQUIRE(state.arrow_states[0].status == JScheduler::ArrowStatus::Uninitialized); - REQUIRE(state.arrow_states[0].status == JScheduler::ArrowStatus::Uninitialized); - REQUIRE(state.arrow_states[0].status == JScheduler::ArrowStatus::Uninitialized); + REQUIRE(state.arrow_states[1].status == JScheduler::ArrowStatus::Uninitialized); + REQUIRE(state.arrow_states[2].status == JScheduler::ArrowStatus::Uninitialized); + REQUIRE(state.arrow_states[3].status == JScheduler::ArrowStatus::Uninitialized); scheduler.run_topology(1); state = scheduler.get_topology_state(); REQUIRE(state.arrow_states[0].status == JScheduler::ArrowStatus::Active); - REQUIRE(state.arrow_states[0].status == JScheduler::ArrowStatus::Active); - REQUIRE(state.arrow_states[0].status == JScheduler::ArrowStatus::Active); - REQUIRE(state.arrow_states[0].status == JScheduler::ArrowStatus::Active); + REQUIRE(state.arrow_states[1].status == JScheduler::ArrowStatus::Active); + REQUIRE(state.arrow_states[2].status == JScheduler::ArrowStatus::Active); + REQUIRE(state.arrow_states[3].status == JScheduler::ArrowStatus::Active); + + auto result = steppe(emit_rand_ints); - steppe(emit_rand_ints); + scheduler.next_assignment(0, emit_rand_ints, result); state = scheduler.get_topology_state(); REQUIRE(state.arrow_states[0].status == JScheduler::ArrowStatus::Finalized); - REQUIRE(state.arrow_states[0].status == JScheduler::ArrowStatus::Active); - REQUIRE(state.arrow_states[0].status == JScheduler::ArrowStatus::Active); - REQUIRE(state.arrow_states[0].status == JScheduler::ArrowStatus::Active); + REQUIRE(state.arrow_states[1].status == JScheduler::ArrowStatus::Active); + REQUIRE(state.arrow_states[2].status == JScheduler::ArrowStatus::Active); + REQUIRE(state.arrow_states[3].status == JScheduler::ArrowStatus::Active); // TODO: Test that finalize was called exactly once } // TEST_CASE diff --git a/src/programs/tests/SchedulerTests.cc b/src/programs/tests/SchedulerTests.cc index 85e8bd34b..03abff2eb 100644 --- a/src/programs/tests/SchedulerTests.cc +++ b/src/programs/tests/SchedulerTests.cc @@ -41,6 +41,7 @@ TEST_CASE("SchedulerTests") { emit_rand_ints->set_chunksize(1); JScheduler scheduler(topology); + // scheduler.logger = JLogger(JLogger::Level::DEBUG); scheduler.run_topology(1); JArrow* assignment; @@ -49,9 +50,6 @@ TEST_CASE("SchedulerTests") { SECTION("When run sequentially, RRS returns nullptr => topology finished") { - auto logger = JLogger(JLogger::Level::OFF); - - last_result = JArrowMetrics::Status::ComeBackLater; assignment = nullptr; do { @@ -65,15 +63,13 @@ TEST_CASE("SchedulerTests") { JScheduler::TopologyState state = scheduler.get_topology_state(); REQUIRE(state.arrow_states[0].status == JScheduler::ArrowStatus::Finalized); - REQUIRE(state.arrow_states[1].status == JScheduler::ArrowStatus::Inactive); - REQUIRE(state.arrow_states[2].status == JScheduler::ArrowStatus::Inactive); - REQUIRE(state.arrow_states[3].status == JScheduler::ArrowStatus::Inactive); + REQUIRE(state.arrow_states[1].status == JScheduler::ArrowStatus::Finalized); + REQUIRE(state.arrow_states[2].status == JScheduler::ArrowStatus::Finalized); + REQUIRE(state.arrow_states[3].status == JScheduler::ArrowStatus::Finalized); } SECTION("When run sequentially, topology finished => RRS returns nullptr") { - auto logger = JLogger(JLogger::Level::OFF); - JScheduler scheduler(topology); last_result = JArrowMetrics::Status::ComeBackLater; assignment = nullptr; diff --git a/src/programs/tests/TopologyTests.cc b/src/programs/tests/TopologyTests.cc index cbe26e2eb..654750448 100644 --- a/src/programs/tests/TopologyTests.cc +++ b/src/programs/tests/TopologyTests.cc @@ -194,6 +194,7 @@ TEST_CASE("JTopology: Basic functionality") { scheduler.run_topology(1); auto ts = scheduler.get_topology_state(); + JArrowMetrics::Status status; REQUIRE(ts.arrow_states[0].status == JScheduler::ArrowStatus::Active); REQUIRE(ts.arrow_states[1].status == JScheduler::ArrowStatus::Active); @@ -201,8 +202,9 @@ TEST_CASE("JTopology: Basic functionality") { REQUIRE(ts.arrow_states[3].status == JScheduler::ArrowStatus::Active); for (int i = 0; i < 20; ++i) { - step(emit_rand_ints); + status = step(emit_rand_ints); } + scheduler.next_assignment(0, emit_rand_ints, status); ts = scheduler.get_topology_state(); REQUIRE(ts.arrow_states[0].status == JScheduler::ArrowStatus::Finalized); @@ -213,6 +215,7 @@ TEST_CASE("JTopology: Basic functionality") { for (int i = 0; i < 20; ++i) { step(multiply_by_two); } + scheduler.next_assignment(0, multiply_by_two, status); ts = scheduler.get_topology_state(); REQUIRE(ts.arrow_states[0].status == JScheduler::ArrowStatus::Finalized); @@ -223,6 +226,7 @@ TEST_CASE("JTopology: Basic functionality") { for (int i = 0; i < 20; ++i) { step(subtract_one); } + scheduler.next_assignment(0, subtract_one, status); ts = scheduler.get_topology_state(); REQUIRE(ts.arrow_states[0].status == JScheduler::ArrowStatus::Finalized); @@ -233,6 +237,7 @@ TEST_CASE("JTopology: Basic functionality") { for (int i = 0; i < 20; ++i) { step(sum_everything); } + scheduler.next_assignment(0, sum_everything, status); ts = scheduler.get_topology_state(); REQUIRE(ts.arrow_states[0].status == JScheduler::ArrowStatus::Finalized);