Skip to content

Commit

Permalink
Clean up log debug/trace output
Browse files Browse the repository at this point in the history
nathanwbrei committed Sep 27, 2024
1 parent 1499a1c commit a1dc577
Showing 4 changed files with 44 additions and 37 deletions.
40 changes: 19 additions & 21 deletions src/libraries/JANA/Engine/JScheduler.cc
Original file line number Diff line number Diff line change
@@ -37,7 +37,7 @@ JArrow* JScheduler::next_assignment(uint32_t worker_id, JArrow* assignment, JArr

std::lock_guard<std::mutex> lock(m_mutex);

LOG_DEBUG(logger) << "Worker " << worker_id << " checking in: "
LOG_TRACE(logger) << "JScheduler: Worker " << worker_id << ": Returned arrow "
<< ((assignment == nullptr) ? "idle" : assignment->get_name()) << " -> " << to_string(last_result) << LOG_END;

// Check latest arrow back in
@@ -47,8 +47,8 @@ JArrow* JScheduler::next_assignment(uint32_t worker_id, JArrow* assignment, JArr

JArrow* next = checkout_unprotected();

LOG_DEBUG(logger) << "Worker " << worker_id << " assigned: "
<< ((next == nullptr) ? "idle" : next->get_name()) << LOG_END;
LOG_TRACE(logger) << "JScheduler: Worker " << worker_id << " assigned arrow "
<< ((next == nullptr) ? "(idle)" : next->get_name()) << LOG_END;
return next;

}
@@ -58,8 +58,8 @@ void JScheduler::last_assignment(uint32_t worker_id, JArrow* assignment, JArrowM

std::lock_guard<std::mutex> lock(m_mutex);

LOG_DEBUG(logger) << "Worker " << worker_id << " checking in: "
<< ((assignment == nullptr) ? "idle" : assignment->get_name())
LOG_TRACE(logger) << "JScheduler: Worker " << worker_id << ": returned arrow "
<< ((assignment == nullptr) ? "(idle)" : assignment->get_name())
<< " -> " << to_string(last_result) << "). Shutting down!" << LOG_END;

if (assignment != nullptr) {
@@ -110,7 +110,7 @@ void JScheduler::checkin_unprotected(JArrow* assignment, JArrowMetrics::Status l
as.status = ArrowStatus::Finalized;
m_topology_state.active_or_draining_arrow_count--;

LOG_DEBUG(logger) << "Deactivated arrow '" << assignment->get_name() << "' (" << m_topology_state.active_or_draining_arrow_count << " remaining)" << LOG_END;
LOG_TRACE(logger) << "JScheduler: Deactivated arrow " << assignment->get_name() << " (" << m_topology_state.active_or_draining_arrow_count << " remaining)" << LOG_END;

for (size_t downstream: m_topology_state.arrow_states[index].downstream_arrow_indices) {
m_topology_state.arrow_states[downstream].active_or_draining_upstream_arrow_count--;
@@ -119,12 +119,12 @@ void JScheduler::checkin_unprotected(JArrow* assignment, JArrowMetrics::Status l
else if (found_draining_stage_or_sink) {
// Drain arrow
as.status = ArrowStatus::Draining;
LOG_DEBUG(logger) << "Draining arrow '" << assignment->get_name() << "' (" << m_topology_state.active_or_draining_arrow_count << " remaining)" << LOG_END;
LOG_DEBUG(logger) << "JScheduler: Draining arrow " << assignment->get_name() << " (" << m_topology_state.active_or_draining_arrow_count << " remaining)" << LOG_END;
}

// Test if this was the last arrow running
if (m_topology_state.active_or_draining_arrow_count == 0) {
LOG_DEBUG(logger) << "All arrows are inactive. Deactivating topology." << LOG_END;
LOG_DEBUG(logger) << "JScheduler: All arrows are inactive. Deactivating topology." << LOG_END;
achieve_topology_pause_unprotected();
}
}
@@ -189,10 +189,10 @@ void JScheduler::initialize_topology() {
void JScheduler::drain_topology() {
std::lock_guard<std::mutex> lock(m_mutex);
if (m_topology_state.current_topology_status == TopologyStatus::Finalized) {
LOG_DEBUG(logger) << "JScheduler: drain(): Skipping because topology is already Finalized" << LOG_END;
LOG_DEBUG(logger) << "JScheduler: Draining topology: Skipping because topology is already Finalized" << LOG_END;
return;
}
LOG_DEBUG(logger) << "JScheduler: drain_topology()" << LOG_END;
LOG_DEBUG(logger) << "JScheduler: Draining topology" << LOG_END;

// We pause (as opposed to finish) for two reasons:
// 1. There might be workers in the middle of calling eventSource->GetEvent.
@@ -213,10 +213,10 @@ void JScheduler::run_topology(int nthreads) {
std::lock_guard<std::mutex> lock(m_mutex);
TopologyStatus current_status = m_topology_state.current_topology_status;
if (current_status == TopologyStatus::Running || current_status == TopologyStatus::Finalized) {
LOG_DEBUG(logger) << "JScheduler: run_topology() : " << current_status << " => " << current_status << LOG_END;
LOG_DEBUG(logger) << "JScheduler: Running topology: " << current_status << " => " << current_status << LOG_END;
return;
}
LOG_DEBUG(logger) << "JScheduler: run_topology() : " << current_status << " => Running" << LOG_END;
LOG_DEBUG(logger) << "JScheduler: Running topology: " << current_status << " => Running" << LOG_END;

bool source_found = false;
for (JArrow* arrow : m_topology->arrows) {
@@ -306,7 +306,7 @@ JScheduler::TopologyState JScheduler::get_topology_state() {

void JScheduler::run_arrow_unprotected(size_t index) {
auto& as = m_topology_state.arrow_states[index];
auto name = as.arrow->get_name();
const auto& name = as.arrow->get_name();
ArrowStatus status = as.status;

// if (status == ArrowStatus::Unopened) {
@@ -317,7 +317,7 @@ void JScheduler::run_arrow_unprotected(size_t index) {
// LOG_DEBUG(logger) << "Arrow '" << name << "' run() : " << status << " => " << status << LOG_END;
// return;
// }
LOG_DEBUG(logger) << "Arrow '" << name << "' run() : " << status << " => Active" << LOG_END;
LOG_DEBUG(logger) << "JScheduler: Activating arrow " << name << " (Previous status was " << status << ")" << LOG_END;

m_topology_state.active_or_draining_arrow_count++;
for (size_t downstream: m_topology_state.arrow_states[index].downstream_arrow_indices) {
@@ -329,14 +329,13 @@ void JScheduler::run_arrow_unprotected(size_t index) {

void JScheduler::pause_arrow_unprotected(size_t index) {
auto& as = m_topology_state.arrow_states[index];
auto name = as.arrow->get_name();
const auto& name = as.arrow->get_name();
ArrowStatus status = as.status;

LOG_DEBUG(logger) << "JScheduler: Pausing arrow " << name << " (Previous status was " << status << ")" << LOG_END;
if (status != ArrowStatus::Active) {
LOG_DEBUG(logger) << "JArrow '" << name << "' pause() : " << status << " => " << status << LOG_END;
return; // pause() is a no-op unless running
}
LOG_DEBUG(logger) << "JArrow '" << name << "' pause() : " << status << " => Inactive" << LOG_END;
m_topology_state.active_or_draining_arrow_count--;
for (size_t downstream: m_topology_state.arrow_states[index].downstream_arrow_indices) {
m_topology_state.arrow_states[downstream].active_or_draining_upstream_arrow_count--;
@@ -346,11 +345,10 @@ void JScheduler::pause_arrow_unprotected(size_t index) {

void JScheduler::finish_arrow_unprotected(size_t index) {
auto& as = m_topology_state.arrow_states[index];
auto name = as.arrow->get_name();
ArrowStatus status = as.status;
const auto& name = as.arrow->get_name();

LOG_DEBUG(logger) << "JArrow '" << name << "' finish() : " << status << " => Finalized" << LOG_END;
ArrowStatus old_status = as.status;
LOG_DEBUG(logger) << "JScheduler: Finishing arrow " << name << " (Previous status was " << old_status << ")" << LOG_END;
// if (old_status == ArrowStatus::Unopened) {
// LOG_DEBUG(logger) << "JArrow '" << name << "': Uninitialized!" << LOG_END;
// throw JException("JArrow::finish(): Arrow %s has not been initialized!", name.c_str());
@@ -362,7 +360,7 @@ void JScheduler::finish_arrow_unprotected(size_t index) {
}
}
if (old_status != ArrowStatus::Finalized) {
LOG_TRACE(logger) << "JArrow '" << name << "': Finalizing (this must only happen once)" << LOG_END;
LOG_DEBUG(logger) << "JScheduler: Finalizing arrow " << name << " (this must only happen once)" << LOG_END;
as.arrow->finalize();
}
m_topology_state.arrow_states[index].status = ArrowStatus::Finalized;
25 changes: 13 additions & 12 deletions src/libraries/JANA/Engine/JWorker.cc
Original file line number Diff line number Diff line change
@@ -172,7 +172,7 @@ const JException& JWorker::get_exception() const {
void JWorker::loop() {
using jclock_t = JWorkerMetrics::clock_t;
try {
LOG_DEBUG(logger) << "Worker " << m_worker_id << " has entered loop()." << LOG_END;
LOG_TRACE(logger) << "Worker " << m_worker_id << " has entered loop()." << LOG_END;
JArrowMetrics::Status last_result = JArrowMetrics::Status::NotRunYet;

while (m_run_state == RunState::Running) {
@@ -193,7 +193,7 @@ void JWorker::loop() {
auto useful_duration = jclock_t::duration::zero();

if (m_assignment == nullptr) {
LOG_DEBUG(logger) << "Worker " << m_worker_id << " shutdown driven by topology pause" << LOG_END;
LOG_TRACE(logger) << "Worker " << m_worker_id << " shutdown driven by topology pause" << LOG_END;
m_run_state = RunState::Stopped;
return;

@@ -211,7 +211,7 @@ void JWorker::loop() {
(m_run_state == RunState::Running) &&
(jclock_t::now() - start_time) < m_checkin_time) {

LOG_TRACE(logger) << "Worker " << m_worker_id << " is executing "
LOG_TRACE(logger) << "Worker " << m_worker_id << ": Executing "
<< m_assignment->get_name() << LOG_END;
auto before_execute_time = jclock_t::now();
m_assignment->execute(m_arrow_metrics, m_location_id);
@@ -220,8 +220,8 @@ void JWorker::loop() {


if (last_result == JArrowMetrics::Status::KeepGoing) {
LOG_DEBUG(logger) << "Worker " << m_worker_id << " succeeded at "
<< m_assignment->get_name() << LOG_END;
LOG_TRACE(logger) << "Worker " << m_worker_id << ": Executed "
<< m_assignment->get_name() << " with result KeepGoing" << LOG_END;
current_tries = 0;
backoff_duration = m_initial_backoff_time;
}
@@ -234,8 +234,9 @@ void JWorker::loop() {
else if (m_backoff_strategy == BackoffStrategy::Exponential) {
backoff_duration *= 2;
}
LOG_TRACE(logger) << "Worker " << m_worker_id << " backing off with "
<< m_assignment->get_name() << ", tries = " << current_tries
LOG_TRACE(logger) << "Worker " << m_worker_id << ": Executed "
<< m_assignment->get_name() << " with result " << to_string(last_result)
<< "; backoff try = " << current_tries
<< LOG_END;

std::this_thread::sleep_for(backoff_duration);
@@ -255,22 +256,22 @@ void JWorker::loop() {

m_scheduler->last_assignment(m_worker_id, m_assignment, last_result);
m_assignment = nullptr; // Worker has 'handed in' the assignment
LOG_DEBUG(logger) << "Worker " << m_worker_id << " shutdown due to worker->request_stop()." << LOG_END;
LOG_TRACE(logger) << "Worker " << m_worker_id << " shutdown due to worker->request_stop()." << LOG_END;
}
catch (const JException& e) {
// For now the excepting Worker prints the error, and then terminates the whole program.
// Eventually we want to unify error handling across JApplication::Run, and maybe even across the entire JApplication.
// This means that Workers must pass JExceptions back to the master thread.
LOG_INFO(logger) << "Worker " << m_worker_id << " shutdown due to JException: " << e.what() << LOG_END;
LOG_DEBUG(logger) << e << LOG_END;
LOG_ERROR(logger) << "Worker " << m_worker_id << " shutdown due to JException: " << e.what() << LOG_END;
LOG_ERROR(logger) << e << LOG_END;
m_run_state = RunState::Excepted;
m_exception = e;
m_japc->request_pause(); // We aren't going to even try to drain queues.
}
catch (std::runtime_error& e){
// same as above
LOG_INFO(logger) << "Worker " << m_worker_id << " shutdown due to std::runtime_error:" << e.what() << LOG_END;
LOG_DEBUG(logger) << e.what() << LOG_END;
LOG_ERROR(logger) << "Worker " << m_worker_id << " shutdown due to std::runtime_error:" << e.what() << LOG_END;
LOG_ERROR(logger) << e.what() << LOG_END;
m_run_state = RunState::Excepted;
m_exception = JException(e.what());
m_exception.nested_exception = std::current_exception();
12 changes: 8 additions & 4 deletions src/libraries/JANA/Topology/JEventProcessorArrow.cc
Original file line number Diff line number Diff line change
@@ -28,7 +28,7 @@ void JEventProcessorArrow::add_processor(JEventProcessor* processor) {
void JEventProcessorArrow::process(Event* event, bool& success, JArrowMetrics::Status& status) {


LOG_DEBUG(m_logger) << "JEventProcessorArrow '" << get_name() << "': Starting event# " << (*event)->GetEventNumber() << LOG_END;
LOG_DEBUG(m_logger) << "Executing arrow " << get_name() << " for event# " << (*event)->GetEventNumber() << LOG_END;
for (JEventProcessor* processor : m_processors) {
// TODO: Move me into JEventProcessor::DoMap
JCallGraphEntryMaker cg_entry(*(*event)->GetJCallGraphRecorder(), processor->GetTypeName()); // times execution until this goes out of scope
@@ -41,24 +41,28 @@ void JEventProcessorArrow::process(Event* event, bool& success, JArrowMetrics::S

}
}
LOG_DEBUG(m_logger) << "JEventProcessorArrow '" << get_name() << "': Finished event# " << (*event)->GetEventNumber() << LOG_END;
LOG_DEBUG(m_logger) << "Executed arrow " << get_name() << " for event# " << (*event)->GetEventNumber() << LOG_END;
success = true;
status = JArrowMetrics::Status::KeepGoing;
}

void JEventProcessorArrow::initialize() {
LOG_DEBUG(m_logger) << "Initializing arrow '" << get_name() << "'" << LOG_END;
for (auto processor : m_processors) {
LOG_INFO(m_logger) << "Initializing JEventProcessor '" << processor->GetTypeName() << "'" << LOG_END;
processor->DoInitialize();
LOG_INFO(m_logger) << "Initialized JEventProcessor '" << processor->GetTypeName() << "'" << LOG_END;
}
LOG_DEBUG(m_logger) << "Initialized arrow '" << get_name() << "'" << LOG_END;
}

void JEventProcessorArrow::finalize() {
LOG_DEBUG(m_logger) << "Finalizing arrow '" << get_name() << "'" << LOG_END;
LOG_DEBUG(m_logger) << "Finalizing arrow " << get_name() << LOG_END;
for (auto processor : m_processors) {
LOG_DEBUG(m_logger) << "Finalizing JEventProcessor " << processor->GetTypeName() << LOG_END;
processor->DoFinalize();
LOG_INFO(m_logger) << "Finalized JEventProcessor '" << processor->GetTypeName() << "'" << LOG_END;
LOG_INFO(m_logger) << "Finalized JEventProcessor " << processor->GetTypeName() << LOG_END;
}
LOG_DEBUG(m_logger) << "Finalized arrow " << get_name() << LOG_END;
}

4 changes: 4 additions & 0 deletions src/libraries/JANA/Topology/JEventSourceArrow.cc
Original file line number Diff line number Diff line change
@@ -30,19 +30,23 @@ void JEventSourceArrow::process(Event* event, bool& success, JArrowMetrics::Stat

while (m_current_source < m_sources.size()) {

LOG_DEBUG(m_logger) << "Executing arrow " << get_name() << LOG_END;
auto source_status = m_sources[m_current_source]->DoNext(*event);

if (source_status == JEventSource::Result::FailureFinished) {
LOG_DEBUG(m_logger) << "Executed arrow " << get_name() << " with result FailureFinished"<< LOG_END;
m_current_source++;
// TODO: Adjust nskip and nevents for the new source
}
else if (source_status == JEventSource::Result::FailureTryAgain){
// This JEventSource isn't finished yet, so we obtained either Success or TryAgainLater
LOG_DEBUG(m_logger) << "Executed arrow " << get_name() << " with result FailureTryAgain"<< LOG_END;
success = false;
arrow_status = JArrowMetrics::Status::ComeBackLater;
return;
}
else {
LOG_DEBUG(m_logger) << "Executed arrow " << get_name() << " with result Success, emitting event# " << (*event)->GetEventNumber() << LOG_END;
success = true;
arrow_status = JArrowMetrics::Status::KeepGoing;
return;

0 comments on commit a1dc577

Please sign in to comment.