Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix deadlock when calling JApplication::Quit #171

Merged
merged 10 commits into from
Nov 15, 2022
76 changes: 37 additions & 39 deletions src/libraries/JANA/Engine/JArrow.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

#include "JArrowMetrics.h"
#include <JANA/JLogger.h>
#include <JANA/JException.h>

class JArrow {

Expand All @@ -36,12 +37,11 @@ class JArrow {
duration_t m_checkin_time = std::chrono::milliseconds(500);
unsigned m_backoff_tries = 4;

mutable std::mutex m_mutex; // Protects access to arrow properties.

mutable std::mutex m_arrow_mutex; // Protects access to arrow properties, except m_status
std::atomic<Status> m_status {Status::Unopened};

// Scheduler stats
// These are protected by the Topology mutex, NOT the Arrow mutex!!!
Status m_status = Status::Unopened;
int64_t m_thread_count = 0; // Current number of threads assigned to this arrow
std::atomic_int64_t m_running_upstreams {0}; // Current number of running arrows immediately upstream
std::atomic_int64_t* m_running_arrows = nullptr; // Current number of running arrows total, so we can detect pauses
Expand All @@ -68,62 +68,62 @@ class JArrow {
}

void set_chunksize(size_t chunksize) {
std::lock_guard<std::mutex> lock(m_mutex);
std::lock_guard<std::mutex> lock(m_arrow_mutex);
m_chunksize = chunksize;
}

size_t get_chunksize() const {
std::lock_guard<std::mutex> lock(m_mutex);
std::lock_guard<std::mutex> lock(m_arrow_mutex);
return m_chunksize;
}

void set_backoff_tries(unsigned backoff_tries) {
std::lock_guard<std::mutex> lock(m_mutex);
std::lock_guard<std::mutex> lock(m_arrow_mutex);
m_backoff_tries = backoff_tries;
}

unsigned get_backoff_tries() const {
std::lock_guard<std::mutex> lock(m_mutex);
std::lock_guard<std::mutex> lock(m_arrow_mutex);
return m_backoff_tries;
}

BackoffStrategy get_backoff_strategy() const {
std::lock_guard<std::mutex> lock(m_mutex);
std::lock_guard<std::mutex> lock(m_arrow_mutex);
return m_backoff_strategy;
}

void set_backoff_strategy(BackoffStrategy backoff_strategy) {
std::lock_guard<std::mutex> lock(m_mutex);
std::lock_guard<std::mutex> lock(m_arrow_mutex);
m_backoff_strategy = backoff_strategy;
}

duration_t get_initial_backoff_time() const {
std::lock_guard<std::mutex> lock(m_mutex);
std::lock_guard<std::mutex> lock(m_arrow_mutex);
return m_initial_backoff_time;
}

void set_initial_backoff_time(const duration_t& initial_backoff_time) {
std::lock_guard<std::mutex> lock(m_mutex);
std::lock_guard<std::mutex> lock(m_arrow_mutex);
m_initial_backoff_time = initial_backoff_time;
}

const duration_t& get_checkin_time() const {
std::lock_guard<std::mutex> lock(m_mutex);
std::lock_guard<std::mutex> lock(m_arrow_mutex);
return m_checkin_time;
}

void set_checkin_time(const duration_t& checkin_time) {
std::lock_guard<std::mutex> lock(m_mutex);
std::lock_guard<std::mutex> lock(m_arrow_mutex);
m_checkin_time = checkin_time;
}

void update_thread_count(int thread_count_delta) {
std::lock_guard<std::mutex> lock(m_mutex);
std::lock_guard<std::mutex> lock(m_arrow_mutex);
m_thread_count += thread_count_delta;
}

size_t get_thread_count() {
std::lock_guard<std::mutex> lock(m_mutex);
std::lock_guard<std::mutex> lock(m_arrow_mutex);
return m_thread_count;
}

Expand All @@ -144,7 +144,7 @@ class JArrow {

virtual ~JArrow() = default;

virtual void initialize() {};
virtual void initialize() { };

virtual void execute(JArrowMetrics& result, size_t location_id) = 0;

Expand All @@ -160,7 +160,6 @@ class JArrow {


Status get_status() const {
std::lock_guard<std::mutex> lock(m_mutex);
return m_status;
}

Expand All @@ -169,37 +168,36 @@ class JArrow {
}

void set_running_arrows(std::atomic_int64_t* running_arrows_ptr) {
std::lock_guard<std::mutex> lock(m_mutex);
m_running_arrows = running_arrows_ptr;
}

void run() {
std::lock_guard<std::mutex> lock(m_mutex);
if (m_status == Status::Running || m_status == Status::Finished) {
LOG_DEBUG(m_logger) << "Arrow '" << m_name << "' run() : " << m_status << " => " << m_status << LOG_END;
return;
}
LOG_DEBUG(m_logger) << "Arrow '" << m_name << "' run() : " << m_status << " => Running" << LOG_END;
Status old_status = m_status;
Status status = m_status;

// if (status == Status::Unopened) {
// LOG_DEBUG(m_logger) << "Arrow '" << m_name << "' run(): Not initialized!" << LOG_END;
// throw JException("Arrow %s has not been initialized!", m_name.c_str());
// }
// if (status == Status::Running || m_status == Status::Finished) {
// LOG_DEBUG(m_logger) << "Arrow '" << m_name << "' run() : " << status << " => " << status << LOG_END;
// return;
// }
LOG_DEBUG(m_logger) << "Arrow '" << m_name << "' run() : " << status << " => Running" << LOG_END;
if (m_running_arrows != nullptr) (*m_running_arrows)++;
for (auto listener: m_listeners) {
listener->m_running_upstreams++;
listener->run(); // Activating something recursively activates everything downstream.
}
if (old_status == Status::Unopened) {
LOG_TRACE(m_logger) << "JArrow '" << m_name << "': Initializing (this must only happen once)" << LOG_END;
initialize();
}
m_status = Status::Running;
}

void pause() {
std::lock_guard<std::mutex> lock(m_mutex);
if (m_status != Status::Running) {
LOG_DEBUG(m_logger) << "JArrow '" << m_name << "' pause() : " << m_status << " => " << m_status << LOG_END;
Status status = m_status;
if (status != Status::Running) {
LOG_DEBUG(m_logger) << "JArrow '" << m_name << "' pause() : " << status << " => " << status << LOG_END;
return; // pause() is a no-op unless running
}
LOG_DEBUG(m_logger) << "JArrow '" << m_name << "' pause() : " << m_status << " => Paused" << LOG_END;
LOG_DEBUG(m_logger) << "JArrow '" << m_name << "' pause() : " << status << " => Paused" << LOG_END;
if (m_running_arrows != nullptr) (*m_running_arrows)--;
for (auto listener: m_listeners) {
listener->m_running_upstreams--;
Expand All @@ -212,13 +210,13 @@ class JArrow {
}

void finish() {
std::lock_guard<std::mutex> lock(m_mutex);
LOG_DEBUG(m_logger) << "JArrow '" << m_name << "' finish() : " << m_status << " => Finished" << LOG_END;
Status status = m_status;
LOG_DEBUG(m_logger) << "JArrow '" << m_name << "' finish() : " << status << " => Finished" << LOG_END;
Status old_status = m_status;
if (old_status == Status::Unopened) {
LOG_DEBUG(m_logger) << "JArrow '" << m_name << "': Initializing (this must only happen once) (called from finish(), surprisingly)" << LOG_END;
initialize();
}
// if (old_status == Status::Unopened) {
// LOG_DEBUG(m_logger) << "JArrow '" << m_name << "': Uninitialized!" << LOG_END;
// throw JException("JArrow::finish(): Arrow %s has not been initialized!", m_name.c_str());
// }
if (old_status == Status::Running) {
if (m_running_arrows != nullptr) (*m_running_arrows)--;
for (auto listener: m_listeners) {
Expand Down
7 changes: 5 additions & 2 deletions src/libraries/JANA/Engine/JArrowProcessingController.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ void JArrowProcessingController::initialize() {
m_scheduler = new JScheduler(m_topology);
m_scheduler->logger = m_scheduler_logger;
LOG_INFO(m_logger) << m_topology->mapping << LOG_END;

m_topology->initialize();

}

void JArrowProcessingController::run(size_t nthreads) {
Expand Down Expand Up @@ -131,12 +134,12 @@ void JArrowProcessingController::wait_until_stopped() {
}

bool JArrowProcessingController::is_stopped() {
std::lock_guard<std::mutex> lock(m_topology->m_mutex);
// TODO: Protect topology current status
return m_topology->m_current_status == JArrowTopology::Status::Paused;
}

bool JArrowProcessingController::is_finished() {
std::lock_guard<std::mutex> lock(m_topology->m_mutex);
// TODO: Protect topology current status
return m_topology->m_current_status == JArrowTopology::Status::Finished;
}

Expand Down
70 changes: 38 additions & 32 deletions src/libraries/JANA/Engine/JArrowTopology.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,30 @@ JArrowTopology::~JArrowTopology() {
}
}

std::ostream& operator<<(std::ostream& os, JArrowTopology::Status status) {
switch(status) {
case JArrowTopology::Status::Uninitialized: os << "Uninitialized"; break;
case JArrowTopology::Status::Running: os << "Running"; break;
case JArrowTopology::Status::Pausing: os << "Pausing"; break;
case JArrowTopology::Status::Paused: os << "Paused"; break;
case JArrowTopology::Status::Finished: os << "Finished"; break;
case JArrowTopology::Status::Draining: os << "Draining"; break;
}
return os;
}


/// This needs to be called _before_ launching the worker threads. After this point, everything is initialized.
/// No initialization happens afterwards.
void JArrowTopology::initialize() {
assert(m_current_status == Status::Uninitialized);
for (JArrow* arrow : arrows) {
arrow->initialize();
}
m_current_status = Status::Paused;
}

void JArrowTopology::drain() {
std::lock_guard<std::mutex> lock(m_mutex);
if (m_current_status == Status::Finished) {
LOG_DEBUG(m_logger) << "JArrowTopology: drain(): Skipping because topology is already Finished" << LOG_END;
return;
Expand All @@ -47,25 +69,14 @@ void JArrowTopology::drain() {
}
}

std::ostream& operator<<(std::ostream& os, JArrowTopology::Status status) {
switch(status) {
case JArrowTopology::Status::Running: os << "Running"; break;
case JArrowTopology::Status::Pausing: os << "Pausing"; break;
case JArrowTopology::Status::Paused: os << "Paused"; break;
case JArrowTopology::Status::Finished: os << "Finished"; break;
case JArrowTopology::Status::Draining: os << "Draining"; break;
}
return os;
}

void JArrowTopology::run(int nthreads) {

std::lock_guard<std::mutex> lock(m_mutex);
if (m_current_status == Status::Running || m_current_status == Status::Finished) {
LOG_DEBUG(m_logger) << "JArrowTopology: run() : " << m_current_status << " => " << m_current_status << LOG_END;
Status current_status = m_current_status;
if (current_status == Status::Running || current_status == Status::Finished) {
LOG_DEBUG(m_logger) << "JArrowTopology: run() : " << current_status << " => " << current_status << LOG_END;
return;
}
LOG_DEBUG(m_logger) << "JArrowTopology: run() : " << m_current_status << " => Running" << LOG_END;
LOG_DEBUG(m_logger) << "JArrowTopology: run() : " << current_status << " => Running" << LOG_END;

if (sources.empty()) {
throw JException("No event sources found!");
Expand All @@ -83,44 +94,39 @@ void JArrowTopology::run(int nthreads) {
}

void JArrowTopology::request_pause() {
std::lock_guard<std::mutex> lock(m_mutex);
// This sets all Running arrows to Paused, which prevents Workers from picking up any additional assignments
// Once all Workers have completed their remaining assignments, the scheduler will notify us via achieve_pause().
if (m_current_status == Status::Running) {
LOG_DEBUG(m_logger) << "JArrowTopology: request_pause() : " << m_current_status << " => Pausing" << LOG_END;
Status current_status = m_current_status;
if (current_status == Status::Running) {
LOG_DEBUG(m_logger) << "JArrowTopology: request_pause() : " << current_status << " => Pausing" << LOG_END;
for (auto arrow: arrows) {
arrow->pause();
// If arrow is not running, pause() is a no-op
}
m_current_status = Status::Pausing;
}
else {
LOG_DEBUG(m_logger) << "JArrowTopology: request_pause() : " << m_current_status << " => " << m_current_status << LOG_END;
LOG_DEBUG(m_logger) << "JArrowTopology: request_pause() : " << current_status << " => " << current_status << LOG_END;
}
}

void JArrowTopology::achieve_pause() {
// This is meant to be used by the scheduler to tell us when all workers have stopped, so it is safe to stop(), etc
std::lock_guard<std::mutex> lock(m_mutex);
if (m_current_status == Status::Running || m_current_status == Status::Pausing || m_current_status == Status::Draining) {
LOG_DEBUG(m_logger) << "JArrowTopology: achieve_pause() : " << m_current_status << " => " << Status::Paused << LOG_END;
// This is meant to be used by the scheduler to tell us when all workers have stopped, so it is safe to finish(), etc
Status current_status = m_current_status;
if (current_status == Status::Running || current_status == Status::Pausing || current_status == Status::Draining) {
LOG_DEBUG(m_logger) << "JArrowTopology: achieve_pause() : " << current_status << " => " << Status::Paused << LOG_END;
metrics.stop();
m_current_status = Status::Paused;
}
else {
LOG_DEBUG(m_logger) << "JArrowTopology: achieve_pause() : " << m_current_status << " => " << m_current_status << LOG_END;
LOG_DEBUG(m_logger) << "JArrowTopology: achieve_pause() : " << current_status << " => " << current_status << LOG_END;
}
}

/// Finish is called by a single thread once the worker threads have all joined.
void JArrowTopology::finish() {
std::lock_guard<std::mutex> lock(m_mutex);
// This finalizes all arrows. Once this happens, we cannot restart the topology.
if (m_current_status == JArrowTopology::Status::Finished) {
LOG_DEBUG(m_logger) << "JArrowTopology: finish() : " << m_current_status << " => Finished" << LOG_END;
return;
}
LOG_DEBUG(m_logger) << "JArrowTopology: finish() : " << m_current_status << " => Finished" << LOG_END;
assert(m_current_status == Status::Paused);
// assert(m_current_status == Status::Paused);
for (auto arrow : arrows) {
arrow->finish();
}
Expand Down
6 changes: 3 additions & 3 deletions src/libraries/JANA/Engine/JArrowTopology.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@


struct JArrowTopology {
enum class Status { Paused, Running, Pausing, Draining, Finished };
enum class Status { Uninitialized, Paused, Running, Pausing, Draining, Finished };

using Event = std::shared_ptr<JEvent>;
using EventQueue = JMailbox<Event>;
Expand All @@ -38,8 +38,7 @@ struct JArrowTopology {
std::vector<EventQueue*> queues; // Queues shared between arrows
JProcessorMapping mapping;

std::mutex m_mutex; // Protects m_current_status
Status m_current_status = Status::Paused;
std::atomic<Status> m_current_status {Status::Uninitialized};
std::atomic_int64_t running_arrow_count {0}; // Detects when the topology has paused
// int64_t running_worker_count = 0; // Detects when the workers have all joined

Expand All @@ -56,6 +55,7 @@ struct JArrowTopology {

JLogger m_logger;

void initialize();
void drain();
void run(int nthreads);
void request_pause();
Expand Down
8 changes: 4 additions & 4 deletions src/libraries/JANA/Engine/JEventProcessorArrow.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,18 +77,18 @@ void JEventProcessorArrow::execute(JArrowMetrics& result, size_t location_id) {

void JEventProcessorArrow::initialize() {

LOG_DEBUG(m_logger) << "JEventProcessorArrow: Initializing arrow '" << get_name() << "'" << LOG_END;
LOG_DEBUG(m_logger) << "Initializing arrow '" << get_name() << "'" << LOG_END;
for (auto processor : m_processors) {
LOG_INFO(m_logger) << "Initializing JEventProcessor '" << processor->GetType() << "'" << LOG_END;
processor->DoInitialize();
LOG_INFO(m_logger) << "Initialized JEventProcessor '" << processor->GetType() << "'" << LOG_END;
}
}

void JEventProcessorArrow::finalize() {
LOG_DEBUG(m_logger) << "JEventProcessorArrow: Finalizing arrow '" << get_name() << "'" << LOG_END;
LOG_DEBUG(m_logger) << "Finalizing arrow '" << get_name() << "'" << LOG_END;
for (auto processor : m_processors) {
LOG_INFO(m_logger) << "Finalizing JEventProcessor '" << processor->GetType() << "'" << LOG_END;
processor->DoFinalize();
LOG_INFO(m_logger) << "Finalized JEventProcessor '" << processor->GetType() << "'" << LOG_END;
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/libraries/JANA/Engine/JEventSourceArrow.cc
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,11 @@ void JEventSourceArrow::execute(JArrowMetrics& result, size_t location_id) {
}

void JEventSourceArrow::initialize() {
LOG_INFO(m_logger) << "Initializing JEventSource '" << m_source->GetResourceName() << "' (" << m_source->GetTypeName() << ")" << LOG_END;
m_source->DoInitialize();
LOG_INFO(m_logger) << "Initialized JEventSource '" << m_source->GetResourceName() << "' (" << m_source->GetTypeName() << ")" << LOG_END;
}

void JEventSourceArrow::finalize() {
LOG_INFO(m_logger) << "Finalizing JEventSource '" << m_source->GetResourceName() << "' (" << m_source->GetTypeName() << ")" << LOG_END;
m_source->DoFinalize();
LOG_INFO(m_logger) << "Finalized JEventSource '" << m_source->GetResourceName() << "' (" << m_source->GetTypeName() << ")" << LOG_END;
}
Loading