From 2cdb8b5b8044530fb24de9e355d6c33712d3f780 Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Sun, 13 Nov 2022 08:57:28 -0500 Subject: [PATCH 01/10] Make topology status enum atomic again --- .../JANA/Engine/JArrowProcessingController.cc | 4 +-- src/libraries/JANA/Engine/JArrowTopology.cc | 33 +++++++++---------- src/libraries/JANA/Engine/JArrowTopology.h | 3 +- 3 files changed, 19 insertions(+), 21 deletions(-) diff --git a/src/libraries/JANA/Engine/JArrowProcessingController.cc b/src/libraries/JANA/Engine/JArrowProcessingController.cc index 87d3e6e08..0f8f60f87 100644 --- a/src/libraries/JANA/Engine/JArrowProcessingController.cc +++ b/src/libraries/JANA/Engine/JArrowProcessingController.cc @@ -131,12 +131,12 @@ void JArrowProcessingController::wait_until_stopped() { } bool JArrowProcessingController::is_stopped() { - std::lock_guard lock(m_topology->m_mutex); + // TODO: Protect topology current status return m_topology->m_current_status == JArrowTopology::Status::Paused; } bool JArrowProcessingController::is_finished() { - std::lock_guard lock(m_topology->m_mutex); + // TODO: Protect topology current status return m_topology->m_current_status == JArrowTopology::Status::Finished; } diff --git a/src/libraries/JANA/Engine/JArrowTopology.cc b/src/libraries/JANA/Engine/JArrowTopology.cc index 48f11cc3a..3f41b657b 100644 --- a/src/libraries/JANA/Engine/JArrowTopology.cc +++ b/src/libraries/JANA/Engine/JArrowTopology.cc @@ -22,7 +22,6 @@ JArrowTopology::~JArrowTopology() { } void JArrowTopology::drain() { - std::lock_guard lock(m_mutex); if (m_current_status == Status::Finished) { LOG_DEBUG(m_logger) << "JArrowTopology: drain(): Skipping because topology is already Finished" << LOG_END; return; @@ -60,12 +59,12 @@ std::ostream& operator<<(std::ostream& os, JArrowTopology::Status status) { void JArrowTopology::run(int nthreads) { - std::lock_guard lock(m_mutex); - if (m_current_status == Status::Running || m_current_status == Status::Finished) { - LOG_DEBUG(m_logger) << 
"JArrowTopology: run() : " << m_current_status << " => " << m_current_status << LOG_END; + Status current_status = m_current_status; + if (current_status == Status::Running || current_status == Status::Finished) { + LOG_DEBUG(m_logger) << "JArrowTopology: run() : " << current_status << " => " << current_status << LOG_END; return; } - LOG_DEBUG(m_logger) << "JArrowTopology: run() : " << m_current_status << " => Running" << LOG_END; + LOG_DEBUG(m_logger) << "JArrowTopology: run() : " << current_status << " => Running" << LOG_END; if (sources.empty()) { throw JException("No event sources found!"); @@ -83,11 +82,11 @@ void JArrowTopology::run(int nthreads) { } void JArrowTopology::request_pause() { - std::lock_guard lock(m_mutex); // This sets all Running arrows to Paused, which prevents Workers from picking up any additional assignments // Once all Workers have completed their remaining assignments, the scheduler will notify us via achieve_pause(). - if (m_current_status == Status::Running) { - LOG_DEBUG(m_logger) << "JArrowTopology: request_pause() : " << m_current_status << " => Pausing" << LOG_END; + Status current_status = m_current_status; + if (current_status == Status::Running) { + LOG_DEBUG(m_logger) << "JArrowTopology: request_pause() : " << current_status << " => Pausing" << LOG_END; for (auto arrow: arrows) { arrow->pause(); // If arrow is not running, pause() is a no-op @@ -95,31 +94,31 @@ void JArrowTopology::request_pause() { m_current_status = Status::Pausing; } else { - LOG_DEBUG(m_logger) << "JArrowTopology: request_pause() : " << m_current_status << " => " << m_current_status << LOG_END; + LOG_DEBUG(m_logger) << "JArrowTopology: request_pause() : " << current_status << " => " << current_status << LOG_END; } } void JArrowTopology::achieve_pause() { // This is meant to be used by the scheduler to tell us when all workers have stopped, so it is safe to stop(), etc - std::lock_guard lock(m_mutex); - if (m_current_status == Status::Running || 
m_current_status == Status::Pausing || m_current_status == Status::Draining) { - LOG_DEBUG(m_logger) << "JArrowTopology: achieve_pause() : " << m_current_status << " => " << Status::Paused << LOG_END; + Status current_status = m_current_status; + if (current_status == Status::Running || current_status == Status::Pausing || current_status == Status::Draining) { + LOG_DEBUG(m_logger) << "JArrowTopology: achieve_pause() : " << current_status << " => " << Status::Paused << LOG_END; metrics.stop(); m_current_status = Status::Paused; } else { - LOG_DEBUG(m_logger) << "JArrowTopology: achieve_pause() : " << m_current_status << " => " << m_current_status << LOG_END; + LOG_DEBUG(m_logger) << "JArrowTopology: achieve_pause() : " << current_status << " => " << current_status << LOG_END; } } void JArrowTopology::finish() { - std::lock_guard lock(m_mutex); // This finalizes all arrows. Once this happens, we cannot restart the topology. - if (m_current_status == JArrowTopology::Status::Finished) { - LOG_DEBUG(m_logger) << "JArrowTopology: finish() : " << m_current_status << " => Finished" << LOG_END; + Status current_status = m_current_status; + if (current_status == JArrowTopology::Status::Finished) { + LOG_DEBUG(m_logger) << "JArrowTopology: finish() : " << current_status << " => Finished" << LOG_END; return; } - LOG_DEBUG(m_logger) << "JArrowTopology: finish() : " << m_current_status << " => Finished" << LOG_END; + LOG_DEBUG(m_logger) << "JArrowTopology: finish() : " << current_status << " => Finished" << LOG_END; assert(m_current_status == Status::Paused); for (auto arrow : arrows) { arrow->finish(); diff --git a/src/libraries/JANA/Engine/JArrowTopology.h b/src/libraries/JANA/Engine/JArrowTopology.h index 180ea9985..7fd4252f5 100644 --- a/src/libraries/JANA/Engine/JArrowTopology.h +++ b/src/libraries/JANA/Engine/JArrowTopology.h @@ -38,8 +38,7 @@ struct JArrowTopology { std::vector queues; // Queues shared between arrows JProcessorMapping mapping; - std::mutex m_mutex; // 
Protects m_current_status - Status m_current_status = Status::Paused; + std::atomic m_current_status {Status::Paused}; std::atomic_int64_t running_arrow_count {0}; // Detects when the topology has paused // int64_t running_worker_count = 0; // Detects when the workers have all joined From 63edcb03ccd1cf232187ffc5f3b3908adfe91292 Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Sun, 13 Nov 2022 23:02:53 -0500 Subject: [PATCH 02/10] Make arrow status enum atomic again --- src/libraries/JANA/Engine/JArrow.h | 52 ++++++++++++++---------------- 1 file changed, 24 insertions(+), 28 deletions(-) diff --git a/src/libraries/JANA/Engine/JArrow.h b/src/libraries/JANA/Engine/JArrow.h index 116402380..e7e0aea26 100644 --- a/src/libraries/JANA/Engine/JArrow.h +++ b/src/libraries/JANA/Engine/JArrow.h @@ -36,12 +36,11 @@ class JArrow { duration_t m_checkin_time = std::chrono::milliseconds(500); unsigned m_backoff_tries = 4; - mutable std::mutex m_mutex; // Protects access to arrow properties. - + mutable std::mutex m_arrow_mutex; // Protects access to arrow properties, except m_status + std::atomic m_status {Status::Unopened}; // Scheduler stats // These are protected by the Topology mutex, NOT the Arrow mutex!!! 
- Status m_status = Status::Unopened; int64_t m_thread_count = 0; // Current number of threads assigned to this arrow std::atomic_int64_t m_running_upstreams {0}; // Current number of running arrows immediately upstream std::atomic_int64_t* m_running_arrows = nullptr; // Current number of running arrows total, so we can detect pauses @@ -68,62 +67,62 @@ class JArrow { } void set_chunksize(size_t chunksize) { - std::lock_guard lock(m_mutex); + std::lock_guard lock(m_arrow_mutex); m_chunksize = chunksize; } size_t get_chunksize() const { - std::lock_guard lock(m_mutex); + std::lock_guard lock(m_arrow_mutex); return m_chunksize; } void set_backoff_tries(unsigned backoff_tries) { - std::lock_guard lock(m_mutex); + std::lock_guard lock(m_arrow_mutex); m_backoff_tries = backoff_tries; } unsigned get_backoff_tries() const { - std::lock_guard lock(m_mutex); + std::lock_guard lock(m_arrow_mutex); return m_backoff_tries; } BackoffStrategy get_backoff_strategy() const { - std::lock_guard lock(m_mutex); + std::lock_guard lock(m_arrow_mutex); return m_backoff_strategy; } void set_backoff_strategy(BackoffStrategy backoff_strategy) { - std::lock_guard lock(m_mutex); + std::lock_guard lock(m_arrow_mutex); m_backoff_strategy = backoff_strategy; } duration_t get_initial_backoff_time() const { - std::lock_guard lock(m_mutex); + std::lock_guard lock(m_arrow_mutex); return m_initial_backoff_time; } void set_initial_backoff_time(const duration_t& initial_backoff_time) { - std::lock_guard lock(m_mutex); + std::lock_guard lock(m_arrow_mutex); m_initial_backoff_time = initial_backoff_time; } const duration_t& get_checkin_time() const { - std::lock_guard lock(m_mutex); + std::lock_guard lock(m_arrow_mutex); return m_checkin_time; } void set_checkin_time(const duration_t& checkin_time) { - std::lock_guard lock(m_mutex); + std::lock_guard lock(m_arrow_mutex); m_checkin_time = checkin_time; } void update_thread_count(int thread_count_delta) { - std::lock_guard lock(m_mutex); + std::lock_guard 
lock(m_arrow_mutex); m_thread_count += thread_count_delta; } size_t get_thread_count() { - std::lock_guard lock(m_mutex); + std::lock_guard lock(m_arrow_mutex); return m_thread_count; } @@ -160,7 +159,6 @@ class JArrow { Status get_status() const { - std::lock_guard lock(m_mutex); return m_status; } @@ -169,24 +167,22 @@ class JArrow { } void set_running_arrows(std::atomic_int64_t* running_arrows_ptr) { - std::lock_guard lock(m_mutex); m_running_arrows = running_arrows_ptr; } void run() { - std::lock_guard lock(m_mutex); + Status status = m_status; if (m_status == Status::Running || m_status == Status::Finished) { - LOG_DEBUG(m_logger) << "Arrow '" << m_name << "' run() : " << m_status << " => " << m_status << LOG_END; + LOG_DEBUG(m_logger) << "Arrow '" << m_name << "' run() : " << status << " => " << status << LOG_END; return; } - LOG_DEBUG(m_logger) << "Arrow '" << m_name << "' run() : " << m_status << " => Running" << LOG_END; - Status old_status = m_status; + LOG_DEBUG(m_logger) << "Arrow '" << m_name << "' run() : " << status << " => Running" << LOG_END; if (m_running_arrows != nullptr) (*m_running_arrows)++; for (auto listener: m_listeners) { listener->m_running_upstreams++; listener->run(); // Activating something recursively activates everything downstream. 
} - if (old_status == Status::Unopened) { + if (status == Status::Unopened) { LOG_TRACE(m_logger) << "JArrow '" << m_name << "': Initializing (this must only happen once)" << LOG_END; initialize(); } @@ -194,12 +190,12 @@ class JArrow { } void pause() { - std::lock_guard lock(m_mutex); - if (m_status != Status::Running) { - LOG_DEBUG(m_logger) << "JArrow '" << m_name << "' pause() : " << m_status << " => " << m_status << LOG_END; + Status status = m_status; + if (status != Status::Running) { + LOG_DEBUG(m_logger) << "JArrow '" << m_name << "' pause() : " << status << " => " << status << LOG_END; return; // pause() is a no-op unless running } - LOG_DEBUG(m_logger) << "JArrow '" << m_name << "' pause() : " << m_status << " => Paused" << LOG_END; + LOG_DEBUG(m_logger) << "JArrow '" << m_name << "' pause() : " << status << " => Paused" << LOG_END; if (m_running_arrows != nullptr) (*m_running_arrows)--; for (auto listener: m_listeners) { listener->m_running_upstreams--; @@ -212,8 +208,8 @@ class JArrow { } void finish() { - std::lock_guard lock(m_mutex); - LOG_DEBUG(m_logger) << "JArrow '" << m_name << "' finish() : " << m_status << " => Finished" << LOG_END; + Status status = m_status; + LOG_DEBUG(m_logger) << "JArrow '" << m_name << "' finish() : " << status << " => Finished" << LOG_END; Status old_status = m_status; if (old_status == Status::Unopened) { LOG_DEBUG(m_logger) << "JArrow '" << m_name << "': Initializing (this must only happen once) (called from finish(), surprisingly)" << LOG_END; From f334dc6686724df164f1a14b44e8400ed256a214 Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Mon, 14 Nov 2022 18:14:55 -0500 Subject: [PATCH 03/10] Move topology init() and finish() out of multithreaded region --- src/libraries/JANA/Engine/JArrow.h | 24 +++++----- .../JANA/Engine/JArrowProcessingController.cc | 3 ++ src/libraries/JANA/Engine/JArrowTopology.cc | 45 +++++++++++-------- src/libraries/JANA/Engine/JArrowTopology.h | 5 ++- 4 files changed, 45 insertions(+), 32 
deletions(-) diff --git a/src/libraries/JANA/Engine/JArrow.h b/src/libraries/JANA/Engine/JArrow.h index e7e0aea26..8e79b7e33 100644 --- a/src/libraries/JANA/Engine/JArrow.h +++ b/src/libraries/JANA/Engine/JArrow.h @@ -13,6 +13,7 @@ #include "JArrowMetrics.h" #include +#include class JArrow { @@ -143,7 +144,7 @@ class JArrow { virtual ~JArrow() = default; - virtual void initialize() {}; + virtual void initialize() { }; virtual void execute(JArrowMetrics& result, size_t location_id) = 0; @@ -172,20 +173,21 @@ class JArrow { void run() { Status status = m_status; - if (m_status == Status::Running || m_status == Status::Finished) { - LOG_DEBUG(m_logger) << "Arrow '" << m_name << "' run() : " << status << " => " << status << LOG_END; - return; - } + + // if (status == Status::Unopened) { + // LOG_DEBUG(m_logger) << "Arrow '" << m_name << "' run(): Not initialized!" << LOG_END; + // throw JException("Arrow %s has not been initialized!", m_name.c_str()); + // } + // if (status == Status::Running || m_status == Status::Finished) { + // LOG_DEBUG(m_logger) << "Arrow '" << m_name << "' run() : " << status << " => " << status << LOG_END; + // return; + // } LOG_DEBUG(m_logger) << "Arrow '" << m_name << "' run() : " << status << " => Running" << LOG_END; if (m_running_arrows != nullptr) (*m_running_arrows)++; for (auto listener: m_listeners) { listener->m_running_upstreams++; listener->run(); // Activating something recursively activates everything downstream. 
} - if (status == Status::Unopened) { - LOG_TRACE(m_logger) << "JArrow '" << m_name << "': Initializing (this must only happen once)" << LOG_END; - initialize(); - } m_status = Status::Running; } @@ -212,8 +214,8 @@ class JArrow { LOG_DEBUG(m_logger) << "JArrow '" << m_name << "' finish() : " << status << " => Finished" << LOG_END; Status old_status = m_status; if (old_status == Status::Unopened) { - LOG_DEBUG(m_logger) << "JArrow '" << m_name << "': Initializing (this must only happen once) (called from finish(), surprisingly)" << LOG_END; - initialize(); + LOG_DEBUG(m_logger) << "JArrow '" << m_name << "': Uninitialized!" << LOG_END; + throw JException("JArrow::finish(): Arrow %s has not been initialized!", m_name.c_str()); } if (old_status == Status::Running) { if (m_running_arrows != nullptr) (*m_running_arrows)--; diff --git a/src/libraries/JANA/Engine/JArrowProcessingController.cc b/src/libraries/JANA/Engine/JArrowProcessingController.cc index 0f8f60f87..8fc9ad54b 100644 --- a/src/libraries/JANA/Engine/JArrowProcessingController.cc +++ b/src/libraries/JANA/Engine/JArrowProcessingController.cc @@ -30,6 +30,9 @@ void JArrowProcessingController::initialize() { m_scheduler = new JScheduler(m_topology); m_scheduler->logger = m_scheduler_logger; LOG_INFO(m_logger) << m_topology->mapping << LOG_END; + + m_topology->initialize(); + } void JArrowProcessingController::run(size_t nthreads) { diff --git a/src/libraries/JANA/Engine/JArrowTopology.cc b/src/libraries/JANA/Engine/JArrowTopology.cc index 3f41b657b..0b8b1db64 100644 --- a/src/libraries/JANA/Engine/JArrowTopology.cc +++ b/src/libraries/JANA/Engine/JArrowTopology.cc @@ -21,6 +21,29 @@ JArrowTopology::~JArrowTopology() { } } +std::ostream& operator<<(std::ostream& os, JArrowTopology::Status status) { + switch(status) { + case JArrowTopology::Status::Uninitialized: os << "Uninitialized"; break; + case JArrowTopology::Status::Running: os << "Running"; break; + case JArrowTopology::Status::Pausing: os << "Pausing"; 
break; + case JArrowTopology::Status::Paused: os << "Paused"; break; + case JArrowTopology::Status::Finished: os << "Finished"; break; + case JArrowTopology::Status::Draining: os << "Draining"; break; + } + return os; +} + + +/// This needs to be called _before_ launching the worker threads. After this point, everything is initialized. +/// No initialization happens afterwards. +void JArrowTopology::initialize() { + assert(m_current_status == Status::Uninitialized); + for (JArrow* arrow : arrows) { + arrow->initialize(); + } + m_current_status = Status::Paused; +} + void JArrowTopology::drain() { if (m_current_status == Status::Finished) { LOG_DEBUG(m_logger) << "JArrowTopology: drain(): Skipping because topology is already Finished" << LOG_END; @@ -46,17 +69,6 @@ void JArrowTopology::drain() { } } -std::ostream& operator<<(std::ostream& os, JArrowTopology::Status status) { - switch(status) { - case JArrowTopology::Status::Running: os << "Running"; break; - case JArrowTopology::Status::Pausing: os << "Pausing"; break; - case JArrowTopology::Status::Paused: os << "Paused"; break; - case JArrowTopology::Status::Finished: os << "Finished"; break; - case JArrowTopology::Status::Draining: os << "Draining"; break; - } - return os; -} - void JArrowTopology::run(int nthreads) { Status current_status = m_current_status; @@ -99,7 +111,7 @@ void JArrowTopology::request_pause() { } void JArrowTopology::achieve_pause() { - // This is meant to be used by the scheduler to tell us when all workers have stopped, so it is safe to stop(), etc + // This is meant to be used by the scheduler to tell us when all workers have stopped, so it is safe to finish(), etc Status current_status = m_current_status; if (current_status == Status::Running || current_status == Status::Pausing || current_status == Status::Draining) { LOG_DEBUG(m_logger) << "JArrowTopology: achieve_pause() : " << current_status << " => " << Status::Paused << LOG_END; @@ -111,15 +123,10 @@ void 
JArrowTopology::achieve_pause() { } } +/// Finish is called by a single thread once the worker threads have all joined. void JArrowTopology::finish() { // This finalizes all arrows. Once this happens, we cannot restart the topology. - Status current_status = m_current_status; - if (current_status == JArrowTopology::Status::Finished) { - LOG_DEBUG(m_logger) << "JArrowTopology: finish() : " << current_status << " => Finished" << LOG_END; - return; - } - LOG_DEBUG(m_logger) << "JArrowTopology: finish() : " << current_status << " => Finished" << LOG_END; - assert(m_current_status == Status::Paused); + // assert(m_current_status == Status::Paused); for (auto arrow : arrows) { arrow->finish(); } diff --git a/src/libraries/JANA/Engine/JArrowTopology.h b/src/libraries/JANA/Engine/JArrowTopology.h index 7fd4252f5..a4094d3d0 100644 --- a/src/libraries/JANA/Engine/JArrowTopology.h +++ b/src/libraries/JANA/Engine/JArrowTopology.h @@ -17,7 +17,7 @@ struct JArrowTopology { - enum class Status { Paused, Running, Pausing, Draining, Finished }; + enum class Status { Uninitialized, Paused, Running, Pausing, Draining, Finished }; using Event = std::shared_ptr; using EventQueue = JMailbox; @@ -38,7 +38,7 @@ struct JArrowTopology { std::vector queues; // Queues shared between arrows JProcessorMapping mapping; - std::atomic m_current_status {Status::Paused}; + std::atomic m_current_status {Status::Uninitialized}; std::atomic_int64_t running_arrow_count {0}; // Detects when the topology has paused // int64_t running_worker_count = 0; // Detects when the workers have all joined @@ -55,6 +55,7 @@ struct JArrowTopology { JLogger m_logger; + void initialize(); void drain(); void run(int nthreads); void request_pause(); From 754312f7a646f17b67c2347b6df2f01bc7cc6d3d Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Mon, 14 Nov 2022 18:21:39 -0500 Subject: [PATCH 04/10] Remove try/catch block --- src/libraries/JANA/JApplication.cc | 77 ++++++++++++++---------------- 1 file changed, 35 
insertions(+), 42 deletions(-) diff --git a/src/libraries/JANA/JApplication.cc b/src/libraries/JANA/JApplication.cc index 320162f82..62563d0b0 100644 --- a/src/libraries/JANA/JApplication.cc +++ b/src/libraries/JANA/JApplication.cc @@ -97,58 +97,51 @@ void JApplication::Initialize() { /// This is called by the Run method so users will usually not /// need to call this directly. - try { - // Only run this once - if (m_initialized) return; + // Only run this once + if (m_initialized) return; - // Attach all plugins - m_plugin_loader->attach_plugins(m_component_manager.get()); + // Attach all plugins + m_plugin_loader->attach_plugins(m_component_manager.get()); - // Look for factories to auto-activate - if (JAutoActivator::IsRequested(m_params)) { - m_component_manager->add(new JAutoActivator); - } + // Look for factories to auto-activate + if (JAutoActivator::IsRequested(m_params)) { + m_component_manager->add(new JAutoActivator); + } - // Set desired nthreads. We parse the 'nthreads' parameter two different ways for backwards compatibility. - m_desired_nthreads = 1; - m_params->SetDefaultParameter("nthreads", m_desired_nthreads, "Desired number of worker threads, or 'Ncores' to use all available cores."); - if (m_params->GetParameterValue("nthreads") == "Ncores") { - m_desired_nthreads = JCpuInfo::GetNumCpus(); - } + // Set desired nthreads. We parse the 'nthreads' parameter two different ways for backwards compatibility. 
+ m_desired_nthreads = 1; + m_params->SetDefaultParameter("nthreads", m_desired_nthreads, "Desired number of worker threads, or 'Ncores' to use all available cores."); + if (m_params->GetParameterValue("nthreads") == "Ncores") { + m_desired_nthreads = JCpuInfo::GetNumCpus(); + } - m_params->SetDefaultParameter("jana:extended_report", m_extended_report, "Controls whether the ticker shows simple vs detailed performance metrics"); + m_params->SetDefaultParameter("jana:extended_report", m_extended_report, "Controls whether the ticker shows simple vs detailed performance metrics"); - m_component_manager->initialize(); - m_component_manager->resolve_event_sources(); + m_component_manager->initialize(); + m_component_manager->resolve_event_sources(); - int engine_choice = 0; - m_params->SetDefaultParameter("jana:engine", engine_choice, - "0: Use arrow engine, 1: Use debug engine")->SetIsAdvanced(true); + int engine_choice = 0; + m_params->SetDefaultParameter("jana:engine", engine_choice, + "0: Use arrow engine, 1: Use debug engine")->SetIsAdvanced(true); - if (engine_choice == 0) { - std::shared_ptr topology_builder = m_service_locator.get(); - auto topology = topology_builder->get_or_create(); - - auto japc = std::make_shared(topology); - m_service_locator.provide(japc); // Make concrete class available via SL - m_processing_controller = m_service_locator.get(); // Get deps from SL - m_service_locator.provide(m_processing_controller); // Make abstract class available via SL - } - else { - auto jdpc = std::make_shared(m_component_manager.get()); - m_service_locator.provide(jdpc); // Make the concrete class available via SL - m_processing_controller = m_service_locator.get(); // Get deps from SL - m_service_locator.provide(m_processing_controller); // Make abstract class available via SL - } + if (engine_choice == 0) { + std::shared_ptr topology_builder = m_service_locator.get(); + auto topology = topology_builder->get_or_create(); - m_processing_controller->initialize(); 
- m_initialized = true; + auto japc = std::make_shared(topology); + m_service_locator.provide(japc); // Make concrete class available via SL + m_processing_controller = m_service_locator.get(); // Get deps from SL + m_service_locator.provide(m_processing_controller); // Make abstract class available via SL } - catch (JException& e) { - LOG_FATAL(m_logger) << e << LOG_END; - // TODO: This belongs in JMain. We want someone embedding JANA to be able to catch these - exit((int) ExitCode::UnhandledException); + else { + auto jdpc = std::make_shared(m_component_manager.get()); + m_service_locator.provide(jdpc); // Make the concrete class available via SL + m_processing_controller = m_service_locator.get(); // Get deps from SL + m_service_locator.provide(m_processing_controller); // Make abstract class available via SL } + + m_processing_controller->initialize(); + m_initialized = true; } void JApplication::Run(bool wait_until_finished) { From e0ed6cc12a0d9683d200ebacdab8c7660f4c70d8 Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Mon, 14 Nov 2022 20:10:24 -0500 Subject: [PATCH 05/10] Add test case for issue #150 --- src/libraries/JANA/Engine/JArrow.h | 8 ++++---- src/programs/tests/TerminationTests.cc | 15 +++++++++++++++ src/programs/tests/TerminationTests.h | 7 +++++++ 3 files changed, 26 insertions(+), 4 deletions(-) diff --git a/src/libraries/JANA/Engine/JArrow.h b/src/libraries/JANA/Engine/JArrow.h index 8e79b7e33..18815c8b6 100644 --- a/src/libraries/JANA/Engine/JArrow.h +++ b/src/libraries/JANA/Engine/JArrow.h @@ -213,10 +213,10 @@ class JArrow { Status status = m_status; LOG_DEBUG(m_logger) << "JArrow '" << m_name << "' finish() : " << status << " => Finished" << LOG_END; Status old_status = m_status; - if (old_status == Status::Unopened) { - LOG_DEBUG(m_logger) << "JArrow '" << m_name << "': Uninitialized!" 
<< LOG_END; - throw JException("JArrow::finish(): Arrow %s has not been initialized!", m_name.c_str()); - } + // if (old_status == Status::Unopened) { + // LOG_DEBUG(m_logger) << "JArrow '" << m_name << "': Uninitialized!" << LOG_END; + // throw JException("JArrow::finish(): Arrow %s has not been initialized!", m_name.c_str()); + // } if (old_status == Status::Running) { if (m_running_arrows != nullptr) (*m_running_arrows)--; for (auto listener: m_listeners) { diff --git a/src/programs/tests/TerminationTests.cc b/src/programs/tests/TerminationTests.cc index e64ac95a7..65e006048 100644 --- a/src/programs/tests/TerminationTests.cc +++ b/src/programs/tests/TerminationTests.cc @@ -47,6 +47,21 @@ TEST_CASE("TerminationTests") { app.Quit(); // prevent destructor from triggering finish() before REQUIRE statements } + SECTION("Arrow engine, interrupted during JEventSource::Open()") { + + app.SetParameterValue("jana:engine", 0); + auto source = new InterruptedSource("InterruptedSource", &app); + app.Add(source); + app.Run(true); + REQUIRE(processor->processed_count == 0); + REQUIRE(processor->finish_call_count == 1); + // The "interrupt" sets the topology status flag to draining. Open() runs fully even with the interrupt. + // The topology run() exits early (because draining) and finish is called on the event processors. 
+ + REQUIRE(app.GetNEventsProcessed() == source->GetEventCount()); + app.Quit(); // prevent destructor from triggering finish() before REQUIRE statements + } + SECTION("Debug engine, self-termination") { app.SetParameterValue("jana:engine", 1); diff --git a/src/programs/tests/TerminationTests.h b/src/programs/tests/TerminationTests.h index 184afbc01..cf0d62cf9 100644 --- a/src/programs/tests/TerminationTests.h +++ b/src/programs/tests/TerminationTests.h @@ -11,6 +11,13 @@ #include "catch.hpp" +struct InterruptedSource : public JEventSource { + InterruptedSource(std::string source_name, JApplication* app) : JEventSource(source_name, app) {} + static std::string GetDescription() { return "ComponentTests Fake Event Source"; } + std::string GetType(void) const override { return JTypeInfo::demangle(); } + void Open() override { GetApplication()->Quit(); } + void GetEvent(std::shared_ptr) override {} +}; struct BoundedSource : public JEventSource { From af9a15df125565a238ad18c1565cff724f44e058 Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Mon, 14 Nov 2022 22:26:49 -0500 Subject: [PATCH 06/10] Make log messages less confusing --- src/libraries/JANA/Engine/JEventProcessorArrow.cc | 8 ++++---- src/libraries/JANA/Engine/JEventSourceArrow.cc | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/libraries/JANA/Engine/JEventProcessorArrow.cc b/src/libraries/JANA/Engine/JEventProcessorArrow.cc index ae72978ea..13652c907 100644 --- a/src/libraries/JANA/Engine/JEventProcessorArrow.cc +++ b/src/libraries/JANA/Engine/JEventProcessorArrow.cc @@ -77,18 +77,18 @@ void JEventProcessorArrow::execute(JArrowMetrics& result, size_t location_id) { void JEventProcessorArrow::initialize() { - LOG_DEBUG(m_logger) << "JEventProcessorArrow: Initializing arrow '" << get_name() << "'" << LOG_END; + LOG_DEBUG(m_logger) << "Initializing arrow '" << get_name() << "'" << LOG_END; for (auto processor : m_processors) { - LOG_INFO(m_logger) << "Initializing JEventProcessor '" << 
processor->GetType() << "'" << LOG_END; processor->DoInitialize(); + LOG_INFO(m_logger) << "Initialized JEventProcessor '" << processor->GetType() << "'" << LOG_END; } } void JEventProcessorArrow::finalize() { - LOG_DEBUG(m_logger) << "JEventProcessorArrow: Finalizing arrow '" << get_name() << "'" << LOG_END; + LOG_DEBUG(m_logger) << "Finalizing arrow '" << get_name() << "'" << LOG_END; for (auto processor : m_processors) { - LOG_INFO(m_logger) << "Finalizing JEventProcessor '" << processor->GetType() << "'" << LOG_END; processor->DoFinalize(); + LOG_INFO(m_logger) << "Finalized JEventProcessor '" << processor->GetType() << "'" << LOG_END; } } diff --git a/src/libraries/JANA/Engine/JEventSourceArrow.cc b/src/libraries/JANA/Engine/JEventSourceArrow.cc index 4cfaf941d..0ef885915 100644 --- a/src/libraries/JANA/Engine/JEventSourceArrow.cc +++ b/src/libraries/JANA/Engine/JEventSourceArrow.cc @@ -108,11 +108,11 @@ void JEventSourceArrow::execute(JArrowMetrics& result, size_t location_id) { } void JEventSourceArrow::initialize() { - LOG_INFO(m_logger) << "Initializing JEventSource '" << m_source->GetResourceName() << "' (" << m_source->GetTypeName() << ")" << LOG_END; m_source->DoInitialize(); + LOG_INFO(m_logger) << "Initialized JEventSource '" << m_source->GetResourceName() << "' (" << m_source->GetTypeName() << ")" << LOG_END; } void JEventSourceArrow::finalize() { - LOG_INFO(m_logger) << "Finalizing JEventSource '" << m_source->GetResourceName() << "' (" << m_source->GetTypeName() << ")" << LOG_END; m_source->DoFinalize(); + LOG_INFO(m_logger) << "Finalized JEventSource '" << m_source->GetResourceName() << "' (" << m_source->GetTypeName() << ")" << LOG_END; } From 26064b72caaa6c212bac8b7a136fee3ef38a5e97 Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Mon, 14 Nov 2022 22:31:28 -0500 Subject: [PATCH 07/10] JApplication::Quit actually quits --- src/libraries/JANA/JApplication.cc | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git 
a/src/libraries/JANA/JApplication.cc b/src/libraries/JANA/JApplication.cc index 62563d0b0..e7afbe41b 100644 --- a/src/libraries/JANA/JApplication.cc +++ b/src/libraries/JANA/JApplication.cc @@ -244,9 +244,10 @@ void JApplication::Stop(bool wait_until_idle) { void JApplication::Quit(bool skip_join) { m_skip_join = skip_join; m_quitting = true; - if (m_processing_controller != nullptr) { - Stop(!skip_join); + if (!skip_join && m_processing_controller != nullptr) { + Stop(true); } + exit(m_exit_code); } void JApplication::SetExitCode(int exit_code) { From 0527d9bb49a1d9cedd44304f9594c5bd958912c9 Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Mon, 14 Nov 2022 23:29:35 -0500 Subject: [PATCH 08/10] Change behavior of JApp::Stop and JApp::Quit during initialization --- src/libraries/JANA/JApplication.cc | 36 ++++++++++++++++++++++++------ src/libraries/JANA/JApplication.h | 2 +- 2 files changed, 30 insertions(+), 8 deletions(-) diff --git a/src/libraries/JANA/JApplication.cc b/src/libraries/JANA/JApplication.cc index e7afbe41b..674917249 100644 --- a/src/libraries/JANA/JApplication.cc +++ b/src/libraries/JANA/JApplication.cc @@ -235,18 +235,40 @@ void JApplication::Scale(int nthreads) { } void JApplication::Stop(bool wait_until_idle) { - m_processing_controller->request_stop(); - if (wait_until_idle) { - m_processing_controller->wait_until_stopped(); + if (!m_initialized) { + // People might call Stop() during Initialize() rather than Run(). + // For instance, during JEventProcessor::Init, or via Ctrl-C. + // If this is the case, we finish with initialization and then cancel the Run(). + // We don't wait on the processing controller because we don't want to Finalize() anything + // we haven't Initialize()d yet. 
+ m_quitting = true; + } + else { + // Once we've called Initialize(), we can Finish() all of our components + // whenever we like + m_processing_controller->request_stop(); + if (wait_until_idle) { + m_processing_controller->wait_until_stopped(); + } + } } void JApplication::Quit(bool skip_join) { - m_skip_join = skip_join; - m_quitting = true; - if (!skip_join && m_processing_controller != nullptr) { - Stop(true); + + if (m_initialized) { + m_skip_join = skip_join; + m_quitting = true; + if (!skip_join && m_processing_controller != nullptr) { + Stop(true); + } } + + // People might call Quit() during Initialize() rather than Run(). + // For instance, during JEventProcessor::Init, or via Ctrl-C. + // If this is the case, we exit immediately rather than make the user + // wait on a long Initialize() if no data has been generated yet. + exit(m_exit_code); } diff --git a/src/libraries/JANA/JApplication.h b/src/libraries/JANA/JApplication.h index 3652a0565..46bf26a4c 100644 --- a/src/libraries/JANA/JApplication.h +++ b/src/libraries/JANA/JApplication.h @@ -139,7 +139,7 @@ class JApplication { bool m_quitting = false; bool m_draining_queues = false; bool m_skip_join = false; - bool m_initialized = false; + std::atomic_bool m_initialized {false}; bool m_ticker_on = true; bool m_timeout_on = true; bool m_extended_report = false; From 9949b3719e0bf58fd9f1185c32a1d22811612398 Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Tue, 15 Nov 2022 00:47:45 -0500 Subject: [PATCH 09/10] Remove app->Quit() from tests As this now actually quits --- src/programs/tests/ScaleTests.cc | 2 -- src/programs/tests/TerminationTests.cc | 2 -- 2 files changed, 4 deletions(-) diff --git a/src/programs/tests/ScaleTests.cc b/src/programs/tests/ScaleTests.cc index 3ef480d95..dcec8f052 100644 --- a/src/programs/tests/ScaleTests.cc +++ b/src/programs/tests/ScaleTests.cc @@ -60,7 +60,6 @@ TEST_CASE("ScaleNWorkerUpdate") { threads = perf_summary->thread_count; REQUIRE(threads == 8); - app.Quit(); } 
TEST_CASE("ScaleThroughputImprovement", "[.][performance]") { @@ -93,7 +92,6 @@ TEST_CASE("ScaleThroughputImprovement", "[.][performance]") { auto throughput_hz_4 = japc->measure_internal_performance()->latest_throughput_hz; japc->print_report(); std::cout << "nthreads=4: throughput_hz=" << throughput_hz_4 << std::endl; - app.Quit(); REQUIRE(throughput_hz_2 > throughput_hz_1*1.5); REQUIRE(throughput_hz_4 > throughput_hz_2*1.25); } diff --git a/src/programs/tests/TerminationTests.cc b/src/programs/tests/TerminationTests.cc index 65e006048..c27e44ce9 100644 --- a/src/programs/tests/TerminationTests.cc +++ b/src/programs/tests/TerminationTests.cc @@ -44,7 +44,6 @@ TEST_CASE("TerminationTests") { REQUIRE(processor->processed_count == 10); REQUIRE(processor->finish_call_count == 1); REQUIRE(app.GetNEventsProcessed() == source->event_count); - app.Quit(); // prevent destructor from triggering finish() before REQUIRE statements } SECTION("Arrow engine, interrupted during JEventSource::Open()") { @@ -59,7 +58,6 @@ TEST_CASE("TerminationTests") { // The topology run() exits early (because draining) and finish is called on the event processors. 
REQUIRE(app.GetNEventsProcessed() == source->GetEventCount()); - app.Quit(); // prevent destructor from triggering finish() before REQUIRE statements } SECTION("Debug engine, self-termination") { From d82cefa0144e66d711d439b7cc4d32f3730e7582 Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Tue, 15 Nov 2022 01:01:55 -0500 Subject: [PATCH 10/10] Test cases all pass again --- src/libraries/JANA/JApplication.cc | 2 +- src/programs/tests/TerminationTests.cc | 6 +++--- src/programs/tests/TerminationTests.h | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/libraries/JANA/JApplication.cc b/src/libraries/JANA/JApplication.cc index 674917249..69607d489 100644 --- a/src/libraries/JANA/JApplication.cc +++ b/src/libraries/JANA/JApplication.cc @@ -269,7 +269,7 @@ void JApplication::Quit(bool skip_join) { // If this is the case, we exit immediately rather than make the user // wait on a long Initialize() if no data has been generated yet. - exit(m_exit_code); + _exit(m_exit_code); } void JApplication::SetExitCode(int exit_code) { diff --git a/src/programs/tests/TerminationTests.cc b/src/programs/tests/TerminationTests.cc index c27e44ce9..6b2ce69ad 100644 --- a/src/programs/tests/TerminationTests.cc +++ b/src/programs/tests/TerminationTests.cc @@ -53,9 +53,9 @@ TEST_CASE("TerminationTests") { app.Add(source); app.Run(true); REQUIRE(processor->processed_count == 0); - REQUIRE(processor->finish_call_count == 1); - // The "interrupt" sets the topology status flag to draining. Open() runs fully even with the interrupt. - // The topology run() exits early (because draining) and finish is called on the event processors. + REQUIRE(processor->finish_call_count == 0); + // Stop() tells JApplication to finish Initialize() but not to proceed with Run(). + // If we had called Quit() instead, it would have exited Initialize() immediately and ended the program. 
REQUIRE(app.GetNEventsProcessed() == source->GetEventCount()); } diff --git a/src/programs/tests/TerminationTests.h b/src/programs/tests/TerminationTests.h index cf0d62cf9..f6d1199dd 100644 --- a/src/programs/tests/TerminationTests.h +++ b/src/programs/tests/TerminationTests.h @@ -15,7 +15,7 @@ struct InterruptedSource : public JEventSource { InterruptedSource(std::string source_name, JApplication* app) : JEventSource(source_name, app) {} static std::string GetDescription() { return "ComponentTests Fake Event Source"; } std::string GetType(void) const override { return JTypeInfo::demangle(); } - void Open() override { GetApplication()->Quit(); } + void Open() override { GetApplication()->Stop(); } void GetEvent(std::shared_ptr) override {} };