Skip to content

Commit

Permalink
Merge pull request #171 from JeffersonLab/nbrei_issue_150
Browse files Browse the repository at this point in the history
Fix deadlock when calling JApplication::Quit
  • Loading branch information
faustus123 authored Nov 15, 2022
2 parents 30c8fc7 + d82cefa commit d39fc3e
Show file tree
Hide file tree
Showing 11 changed files with 176 additions and 135 deletions.
76 changes: 37 additions & 39 deletions src/libraries/JANA/Engine/JArrow.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

#include "JArrowMetrics.h"
#include <JANA/JLogger.h>
#include <JANA/JException.h>

class JArrow {

Expand All @@ -36,12 +37,11 @@ class JArrow {
duration_t m_checkin_time = std::chrono::milliseconds(500);
unsigned m_backoff_tries = 4;

mutable std::mutex m_mutex; // Protects access to arrow properties.

mutable std::mutex m_arrow_mutex; // Protects access to arrow properties, except m_status
std::atomic<Status> m_status {Status::Unopened};

// Scheduler stats
// These are protected by the Topology mutex, NOT the Arrow mutex!!!
Status m_status = Status::Unopened;
int64_t m_thread_count = 0; // Current number of threads assigned to this arrow
std::atomic_int64_t m_running_upstreams {0}; // Current number of running arrows immediately upstream
std::atomic_int64_t* m_running_arrows = nullptr; // Current number of running arrows total, so we can detect pauses
Expand All @@ -68,62 +68,62 @@ class JArrow {
}

/// Stores the requested chunk size in m_chunksize while a std::lock_guard on the arrow's mutex is held.
void set_chunksize(size_t chunksize) {
std::lock_guard<std::mutex> lock(m_mutex);
std::lock_guard<std::mutex> lock(m_arrow_mutex);
m_chunksize = chunksize;
}

/// Returns a copy of m_chunksize, read while a std::lock_guard on the arrow's mutex is held.
size_t get_chunksize() const {
std::lock_guard<std::mutex> lock(m_mutex);
std::lock_guard<std::mutex> lock(m_arrow_mutex);
return m_chunksize;
}

/// Stores the requested value in m_backoff_tries while a std::lock_guard on the arrow's mutex is held.
void set_backoff_tries(unsigned backoff_tries) {
std::lock_guard<std::mutex> lock(m_mutex);
std::lock_guard<std::mutex> lock(m_arrow_mutex);
m_backoff_tries = backoff_tries;
}

/// Returns a copy of m_backoff_tries, read while a std::lock_guard on the arrow's mutex is held.
unsigned get_backoff_tries() const {
std::lock_guard<std::mutex> lock(m_mutex);
std::lock_guard<std::mutex> lock(m_arrow_mutex);
return m_backoff_tries;
}

/// Returns a copy of m_backoff_strategy, read while a std::lock_guard on the arrow's mutex is held.
BackoffStrategy get_backoff_strategy() const {
std::lock_guard<std::mutex> lock(m_mutex);
std::lock_guard<std::mutex> lock(m_arrow_mutex);
return m_backoff_strategy;
}

/// Stores the requested strategy in m_backoff_strategy while a std::lock_guard on the arrow's mutex is held.
void set_backoff_strategy(BackoffStrategy backoff_strategy) {
std::lock_guard<std::mutex> lock(m_mutex);
std::lock_guard<std::mutex> lock(m_arrow_mutex);
m_backoff_strategy = backoff_strategy;
}

/// Returns m_initial_backoff_time by value, copied while a std::lock_guard on the arrow's mutex is held.
duration_t get_initial_backoff_time() const {
std::lock_guard<std::mutex> lock(m_mutex);
std::lock_guard<std::mutex> lock(m_arrow_mutex);
return m_initial_backoff_time;
}

/// Copies the given duration into m_initial_backoff_time while a std::lock_guard on the arrow's mutex is held.
void set_initial_backoff_time(const duration_t& initial_backoff_time) {
std::lock_guard<std::mutex> lock(m_mutex);
std::lock_guard<std::mutex> lock(m_arrow_mutex);
m_initial_backoff_time = initial_backoff_time;
}

/// Returns a const reference to m_checkin_time. The std::lock_guard is held only until the function returns.
// NOTE(review): the caller reads through the returned reference after the guard has been released, so the
// lock does not cover that read. get_initial_backoff_time() returns by value instead; consider doing the
// same here -- confirm against callers before changing the return type.
const duration_t& get_checkin_time() const {
std::lock_guard<std::mutex> lock(m_mutex);
std::lock_guard<std::mutex> lock(m_arrow_mutex);
return m_checkin_time;
}

/// Copies the given duration into m_checkin_time while a std::lock_guard on the arrow's mutex is held.
void set_checkin_time(const duration_t& checkin_time) {
std::lock_guard<std::mutex> lock(m_mutex);
std::lock_guard<std::mutex> lock(m_arrow_mutex);
m_checkin_time = checkin_time;
}

/// Adds thread_count_delta (which may be negative) to m_thread_count while a std::lock_guard is held.
// NOTE(review): the comment above the m_thread_count declaration says the scheduler stats are protected by
// the Topology mutex, not the Arrow mutex, yet this method locks the arrow's own mutex -- confirm which
// locking discipline is intended.
void update_thread_count(int thread_count_delta) {
std::lock_guard<std::mutex> lock(m_mutex);
std::lock_guard<std::mutex> lock(m_arrow_mutex);
m_thread_count += thread_count_delta;
}

/// Returns m_thread_count, read while a std::lock_guard is held.
/// m_thread_count is declared as int64_t, so the value is implicitly converted to size_t on return.
size_t get_thread_count() {
std::lock_guard<std::mutex> lock(m_mutex);
std::lock_guard<std::mutex> lock(m_arrow_mutex);
return m_thread_count;
}

Expand All @@ -144,7 +144,7 @@ class JArrow {

virtual ~JArrow() = default;

virtual void initialize() {};
virtual void initialize() { };

virtual void execute(JArrowMetrics& result, size_t location_id) = 0;

Expand All @@ -160,7 +160,6 @@ class JArrow {


/// Returns the current value of m_status.
Status get_status() const {
std::lock_guard<std::mutex> lock(m_mutex);
return m_status;
}

Expand All @@ -169,37 +168,36 @@ class JArrow {
}

/// Stores the pointer to the shared running-arrow counter in m_running_arrows.
/// The pointer may be nullptr: run() and pause() check for nullptr before dereferencing it.
void set_running_arrows(std::atomic_int64_t* running_arrows_ptr) {
std::lock_guard<std::mutex> lock(m_mutex);
m_running_arrows = running_arrows_ptr;
}

/// Transitions this arrow to Status::Running and activates everything downstream of it.
/// If a running-arrow counter was registered through set_running_arrows(), it is incremented.
/// For every listener, m_running_upstreams is incremented and then listener->run() is called.
/// Finally m_status is set to Status::Running.
void run() {
std::lock_guard<std::mutex> lock(m_mutex);
if (m_status == Status::Running || m_status == Status::Finished) {
LOG_DEBUG(m_logger) << "Arrow '" << m_name << "' run() : " << m_status << " => " << m_status << LOG_END;
return;
}
LOG_DEBUG(m_logger) << "Arrow '" << m_name << "' run() : " << m_status << " => Running" << LOG_END;
Status old_status = m_status;
// Snapshot of the status taken once on entry; the log statement below prints this local copy.
Status status = m_status;

// Disabled state checks, kept for reference:
// if (status == Status::Unopened) {
// LOG_DEBUG(m_logger) << "Arrow '" << m_name << "' run(): Not initialized!" << LOG_END;
// throw JException("Arrow %s has not been initialized!", m_name.c_str());
// }
// if (status == Status::Running || m_status == Status::Finished) {
// LOG_DEBUG(m_logger) << "Arrow '" << m_name << "' run() : " << status << " => " << status << LOG_END;
// return;
// }
LOG_DEBUG(m_logger) << "Arrow '" << m_name << "' run() : " << status << " => Running" << LOG_END;
// The counter is optional, so only touch it when one has been registered.
if (m_running_arrows != nullptr) (*m_running_arrows)++;
for (auto listener: m_listeners) {
listener->m_running_upstreams++;
listener->run(); // Activating something recursively activates everything downstream.
}
if (old_status == Status::Unopened) {
LOG_TRACE(m_logger) << "JArrow '" << m_name << "': Initializing (this must only happen once)" << LOG_END;
initialize();
}
m_status = Status::Running;
}

void pause() {
std::lock_guard<std::mutex> lock(m_mutex);
if (m_status != Status::Running) {
LOG_DEBUG(m_logger) << "JArrow '" << m_name << "' pause() : " << m_status << " => " << m_status << LOG_END;
Status status = m_status;
if (status != Status::Running) {
LOG_DEBUG(m_logger) << "JArrow '" << m_name << "' pause() : " << status << " => " << status << LOG_END;
return; // pause() is a no-op unless running
}
LOG_DEBUG(m_logger) << "JArrow '" << m_name << "' pause() : " << m_status << " => Paused" << LOG_END;
LOG_DEBUG(m_logger) << "JArrow '" << m_name << "' pause() : " << status << " => Paused" << LOG_END;
if (m_running_arrows != nullptr) (*m_running_arrows)--;
for (auto listener: m_listeners) {
listener->m_running_upstreams--;
Expand All @@ -212,13 +210,13 @@ class JArrow {
}

void finish() {
std::lock_guard<std::mutex> lock(m_mutex);
LOG_DEBUG(m_logger) << "JArrow '" << m_name << "' finish() : " << m_status << " => Finished" << LOG_END;
Status status = m_status;
LOG_DEBUG(m_logger) << "JArrow '" << m_name << "' finish() : " << status << " => Finished" << LOG_END;
Status old_status = m_status;
if (old_status == Status::Unopened) {
LOG_DEBUG(m_logger) << "JArrow '" << m_name << "': Initializing (this must only happen once) (called from finish(), surprisingly)" << LOG_END;
initialize();
}
// if (old_status == Status::Unopened) {
// LOG_DEBUG(m_logger) << "JArrow '" << m_name << "': Uninitialized!" << LOG_END;
// throw JException("JArrow::finish(): Arrow %s has not been initialized!", m_name.c_str());
// }
if (old_status == Status::Running) {
if (m_running_arrows != nullptr) (*m_running_arrows)--;
for (auto listener: m_listeners) {
Expand Down
7 changes: 5 additions & 2 deletions src/libraries/JANA/Engine/JArrowProcessingController.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ void JArrowProcessingController::initialize() {
m_scheduler = new JScheduler(m_topology);
m_scheduler->logger = m_scheduler_logger;
LOG_INFO(m_logger) << m_topology->mapping << LOG_END;

m_topology->initialize();

}

void JArrowProcessingController::run(size_t nthreads) {
Expand Down Expand Up @@ -131,12 +134,12 @@ void JArrowProcessingController::wait_until_stopped() {
}

/// Returns true when the topology's m_current_status equals JArrowTopology::Status::Paused.
bool JArrowProcessingController::is_stopped() {
std::lock_guard<std::mutex> lock(m_topology->m_mutex);
// TODO: Protect topology current status
return m_topology->m_current_status == JArrowTopology::Status::Paused;
}

/// Returns true when the topology's m_current_status equals JArrowTopology::Status::Finished.
bool JArrowProcessingController::is_finished() {
std::lock_guard<std::mutex> lock(m_topology->m_mutex);
// TODO: Protect topology current status
return m_topology->m_current_status == JArrowTopology::Status::Finished;
}

Expand Down
70 changes: 38 additions & 32 deletions src/libraries/JANA/Engine/JArrowTopology.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,30 @@ JArrowTopology::~JArrowTopology() {
}
}

std::ostream& operator<<(std::ostream& os, JArrowTopology::Status status) {
switch(status) {
case JArrowTopology::Status::Uninitialized: os << "Uninitialized"; break;
case JArrowTopology::Status::Running: os << "Running"; break;
case JArrowTopology::Status::Pausing: os << "Pausing"; break;
case JArrowTopology::Status::Paused: os << "Paused"; break;
case JArrowTopology::Status::Finished: os << "Finished"; break;
case JArrowTopology::Status::Draining: os << "Draining"; break;
}
return os;
}


/// This needs to be called _before_ launching the worker threads. After this point, everything is initialized.
/// No initialization happens afterwards.
void JArrowTopology::initialize() {
assert(m_current_status == Status::Uninitialized);
for (JArrow* arrow : arrows) {
arrow->initialize();
}
m_current_status = Status::Paused;
}

void JArrowTopology::drain() {
std::lock_guard<std::mutex> lock(m_mutex);
if (m_current_status == Status::Finished) {
LOG_DEBUG(m_logger) << "JArrowTopology: drain(): Skipping because topology is already Finished" << LOG_END;
return;
Expand All @@ -47,25 +69,14 @@ void JArrowTopology::drain() {
}
}

std::ostream& operator<<(std::ostream& os, JArrowTopology::Status status) {
switch(status) {
case JArrowTopology::Status::Running: os << "Running"; break;
case JArrowTopology::Status::Pausing: os << "Pausing"; break;
case JArrowTopology::Status::Paused: os << "Paused"; break;
case JArrowTopology::Status::Finished: os << "Finished"; break;
case JArrowTopology::Status::Draining: os << "Draining"; break;
}
return os;
}

void JArrowTopology::run(int nthreads) {

std::lock_guard<std::mutex> lock(m_mutex);
if (m_current_status == Status::Running || m_current_status == Status::Finished) {
LOG_DEBUG(m_logger) << "JArrowTopology: run() : " << m_current_status << " => " << m_current_status << LOG_END;
Status current_status = m_current_status;
if (current_status == Status::Running || current_status == Status::Finished) {
LOG_DEBUG(m_logger) << "JArrowTopology: run() : " << current_status << " => " << current_status << LOG_END;
return;
}
LOG_DEBUG(m_logger) << "JArrowTopology: run() : " << m_current_status << " => Running" << LOG_END;
LOG_DEBUG(m_logger) << "JArrowTopology: run() : " << current_status << " => Running" << LOG_END;

if (sources.empty()) {
throw JException("No event sources found!");
Expand All @@ -83,44 +94,39 @@ void JArrowTopology::run(int nthreads) {
}

/// Asks the topology to pause.
/// When the topology status is Running, pause() is called on every arrow and the status becomes Pausing.
/// In any other state nothing changes and only a debug message is logged.
void JArrowTopology::request_pause() {
std::lock_guard<std::mutex> lock(m_mutex);
// This sets all Running arrows to Paused, which prevents Workers from picking up any additional assignments
// Once all Workers have completed their remaining assignments, the scheduler will notify us via achieve_pause().
if (m_current_status == Status::Running) {
LOG_DEBUG(m_logger) << "JArrowTopology: request_pause() : " << m_current_status << " => Pausing" << LOG_END;
// Snapshot of the status taken once; the checks and log statements below use this local copy.
Status current_status = m_current_status;
if (current_status == Status::Running) {
LOG_DEBUG(m_logger) << "JArrowTopology: request_pause() : " << current_status << " => Pausing" << LOG_END;
for (auto arrow: arrows) {
arrow->pause();
// If arrow is not running, pause() is a no-op
}
m_current_status = Status::Pausing;
}
else {
LOG_DEBUG(m_logger) << "JArrowTopology: request_pause() : " << m_current_status << " => " << m_current_status << LOG_END;
LOG_DEBUG(m_logger) << "JArrowTopology: request_pause() : " << current_status << " => " << current_status << LOG_END;
}
}

/// Records that the topology has actually come to rest.
/// When the status is Running, Pausing, or Draining, metrics.stop() is called and the status becomes Paused.
/// In any other state nothing changes and only a debug message is logged.
void JArrowTopology::achieve_pause() {
// This is meant to be used by the scheduler to tell us when all workers have stopped, so it is safe to stop(), etc
std::lock_guard<std::mutex> lock(m_mutex);
if (m_current_status == Status::Running || m_current_status == Status::Pausing || m_current_status == Status::Draining) {
LOG_DEBUG(m_logger) << "JArrowTopology: achieve_pause() : " << m_current_status << " => " << Status::Paused << LOG_END;
// This is meant to be used by the scheduler to tell us when all workers have stopped, so it is safe to finish(), etc
// Snapshot of the status taken once; the checks and log statements below use this local copy.
Status current_status = m_current_status;
if (current_status == Status::Running || current_status == Status::Pausing || current_status == Status::Draining) {
LOG_DEBUG(m_logger) << "JArrowTopology: achieve_pause() : " << current_status << " => " << Status::Paused << LOG_END;
metrics.stop();
m_current_status = Status::Paused;
}
else {
LOG_DEBUG(m_logger) << "JArrowTopology: achieve_pause() : " << m_current_status << " => " << m_current_status << LOG_END;
LOG_DEBUG(m_logger) << "JArrowTopology: achieve_pause() : " << current_status << " => " << current_status << LOG_END;
}
}

/// Finish is called by a single thread once the worker threads have all joined.
void JArrowTopology::finish() {
std::lock_guard<std::mutex> lock(m_mutex);
// This finalizes all arrows. Once this happens, we cannot restart the topology.
if (m_current_status == JArrowTopology::Status::Finished) {
LOG_DEBUG(m_logger) << "JArrowTopology: finish() : " << m_current_status << " => Finished" << LOG_END;
return;
}
LOG_DEBUG(m_logger) << "JArrowTopology: finish() : " << m_current_status << " => Finished" << LOG_END;
assert(m_current_status == Status::Paused);
// assert(m_current_status == Status::Paused);
for (auto arrow : arrows) {
arrow->finish();
}
Expand Down
6 changes: 3 additions & 3 deletions src/libraries/JANA/Engine/JArrowTopology.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@


struct JArrowTopology {
enum class Status { Paused, Running, Pausing, Draining, Finished };
enum class Status { Uninitialized, Paused, Running, Pausing, Draining, Finished };

using Event = std::shared_ptr<JEvent>;
using EventQueue = JMailbox<Event>;
Expand All @@ -38,8 +38,7 @@ struct JArrowTopology {
std::vector<EventQueue*> queues; // Queues shared between arrows
JProcessorMapping mapping;

std::mutex m_mutex; // Protects m_current_status
Status m_current_status = Status::Paused;
std::atomic<Status> m_current_status {Status::Uninitialized};
std::atomic_int64_t running_arrow_count {0}; // Detects when the topology has paused
// int64_t running_worker_count = 0; // Detects when the workers have all joined

Expand All @@ -56,6 +55,7 @@ struct JArrowTopology {

JLogger m_logger;

void initialize();
void drain();
void run(int nthreads);
void request_pause();
Expand Down
8 changes: 4 additions & 4 deletions src/libraries/JANA/Engine/JEventProcessorArrow.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,18 +77,18 @@ void JEventProcessorArrow::execute(JArrowMetrics& result, size_t location_id) {

/// Calls DoInitialize() on every processor in m_processors, in iteration order,
/// with DEBUG logging for the arrow and INFO logging for each processor.
void JEventProcessorArrow::initialize() {

LOG_DEBUG(m_logger) << "JEventProcessorArrow: Initializing arrow '" << get_name() << "'" << LOG_END;
LOG_DEBUG(m_logger) << "Initializing arrow '" << get_name() << "'" << LOG_END;
for (auto processor : m_processors) {
LOG_INFO(m_logger) << "Initializing JEventProcessor '" << processor->GetType() << "'" << LOG_END;
processor->DoInitialize();
LOG_INFO(m_logger) << "Initialized JEventProcessor '" << processor->GetType() << "'" << LOG_END;
}
}

/// Calls DoFinalize() on every processor in m_processors, in iteration order,
/// with DEBUG logging for the arrow and INFO logging for each processor.
void JEventProcessorArrow::finalize() {
LOG_DEBUG(m_logger) << "JEventProcessorArrow: Finalizing arrow '" << get_name() << "'" << LOG_END;
LOG_DEBUG(m_logger) << "Finalizing arrow '" << get_name() << "'" << LOG_END;
for (auto processor : m_processors) {
LOG_INFO(m_logger) << "Finalizing JEventProcessor '" << processor->GetType() << "'" << LOG_END;
processor->DoFinalize();
LOG_INFO(m_logger) << "Finalized JEventProcessor '" << processor->GetType() << "'" << LOG_END;
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/libraries/JANA/Engine/JEventSourceArrow.cc
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,11 @@ void JEventSourceArrow::execute(JArrowMetrics& result, size_t location_id) {
}

/// Calls DoInitialize() on the wrapped event source (m_source), logging the source's
/// resource name and type name at INFO level.
void JEventSourceArrow::initialize() {
LOG_INFO(m_logger) << "Initializing JEventSource '" << m_source->GetResourceName() << "' (" << m_source->GetTypeName() << ")" << LOG_END;
m_source->DoInitialize();
LOG_INFO(m_logger) << "Initialized JEventSource '" << m_source->GetResourceName() << "' (" << m_source->GetTypeName() << ")" << LOG_END;
}

/// Calls DoFinalize() on the wrapped event source (m_source), logging the source's
/// resource name and type name at INFO level.
void JEventSourceArrow::finalize() {
LOG_INFO(m_logger) << "Finalizing JEventSource '" << m_source->GetResourceName() << "' (" << m_source->GetTypeName() << ")" << LOG_END;
m_source->DoFinalize();
LOG_INFO(m_logger) << "Finalized JEventSource '" << m_source->GetResourceName() << "' (" << m_source->GetTypeName() << ")" << LOG_END;
}
Loading

0 comments on commit d39fc3e

Please sign in to comment.