Skip to content

Commit

Permalink
Merge pull request #346 from JeffersonLab/nbrei_topologybuilder
Browse files Browse the repository at this point in the history
Finish JTopologyBuilder
  • Loading branch information
nathanwbrei authored Oct 15, 2024
2 parents 52f140f + 2111f8c commit ecb73f7
Show file tree
Hide file tree
Showing 23 changed files with 331 additions and 359 deletions.
1 change: 1 addition & 0 deletions src/libraries/JANA/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ set(JANA2_SOURCES
Topology/JEventProcessorArrow.cc
Topology/JEventSourceArrow.cc
Topology/JEventMapArrow.cc
Topology/JEventTapArrow.cc
Topology/JTopologyBuilder.cc

Services/JComponentManager.cc
Expand Down
2 changes: 1 addition & 1 deletion src/libraries/JANA/Engine/JArrowProcessingController.cc
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ bool JArrowProcessingController::is_timed_out() {
auto metrics = measure_performance();

int timeout_s;
if (metrics->total_uptime_s < m_warmup_timeout_s * m_topology->m_event_pool_size / metrics->thread_count) {
if (metrics->total_uptime_s < (m_warmup_timeout_s * m_topology->m_pool_capacity * 1.0) / metrics->thread_count) {
// We are at the beginning and not all events have necessarily had a chance to warm up
timeout_s = m_warmup_timeout_s;
}
Expand Down
14 changes: 6 additions & 8 deletions src/libraries/JANA/Engine/JPerfSummary.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ std::ostream& operator<<(std::ostream& os, const JPerfSummary& s) {
os << " Efficiency [0..1]: " << std::setprecision(3) << s.avg_efficiency_frac << std::endl;
os << std::endl;

os << " +--------------------------+--------+-----+---------+--------+---------+-------------+" << std::endl;
os << " | Name | Type | Par | Threads | Thresh | Pending | Completed |" << std::endl;
os << " +--------------------------+--------+-----+---------+--------+---------+-------------+" << std::endl;
os << " +--------------------------+--------+-----+---------+---------+-------------+" << std::endl;
os << " | Name | Type | Par | Threads | Pending | Completed |" << std::endl;
os << " +--------------------------+--------+-----+---------+---------+-------------+" << std::endl;

for (auto as : s.arrows) {
os << " | "
Expand All @@ -37,17 +37,15 @@ std::ostream& operator<<(std::ostream& os, const JPerfSummary& s) {

if (!as.is_source) {

os << std::setw(7) << as.threshold << " |"
<< std::setw(8) << as.messages_pending << " |";
os << std::setw(8) << as.messages_pending << " |";
}
else {

os << " - | - |";
os << " - |";
}
os << std::setw(12) << as.total_messages_completed << " |"
<< std::endl;
}
os << " +--------------------------+--------+-----+---------+--------+---------+-------------+" << std::endl;
os << " +--------------------------+--------+-----+---------+---------+-------------+" << std::endl;


os << " +--------------------------+-------------+--------------+----------------+--------------+----------------+" << std::endl;
Expand Down
1 change: 0 additions & 1 deletion src/libraries/JANA/Engine/JPerfSummary.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ struct ArrowSummary {
int running_upstreams;
bool has_backpressure;
size_t messages_pending;
size_t threshold;

size_t total_messages_completed;
size_t last_messages_completed;
Expand Down
1 change: 0 additions & 1 deletion src/libraries/JANA/Engine/JScheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,6 @@ void JScheduler::summarize_arrows(std::vector<ArrowSummary>& summaries) {
summary.is_source = as.arrow->is_source();
summary.is_sink = as.arrow->is_sink();
summary.messages_pending = as.arrow->get_pending();
summary.threshold = as.arrow->get_threshold();

summary.thread_count = as.thread_count;
summary.running_upstreams = as.active_or_draining_upstream_arrow_count;
Expand Down
15 changes: 8 additions & 7 deletions src/libraries/JANA/JEventProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class JEventProcessor : public jana::components::JComponent,
// any contention.

if (m_status == Status::Uninitialized) {
DoInitialize();
throw JException("JEventProcessor: Attempted to call DoTap() before Initialize()");
}
else if (m_status == Status::Finalized) {
throw JException("JEventProcessor: Attempted to call DoMap() after Finalize()");
Expand Down Expand Up @@ -121,15 +121,16 @@ class JEventProcessor : public jana::components::JComponent,

auto run_number = event->GetRunNumber();

if (m_status == Status::Uninitialized) {
DoInitialize();
}
else if (m_status == Status::Finalized) {
throw JException("JEventProcessor: Attempted to call DoMap() after Finalize()");
}
{
// Protect the call to BeginRun(), etc, to prevent some threads from running Process() before BeginRun().
std::lock_guard<std::mutex> lock(m_mutex);

if (m_status == Status::Uninitialized) {
throw JException("JEventProcessor: Attempted to call DoLegacyProcess() before Initialize()");
}
else if (m_status == Status::Finalized) {
throw JException("JEventProcessor: Attempted to call DoLegacyProcess() after Finalize()");
}
if (m_last_run_number != run_number) {
if (m_last_run_number != -1) {
CallWithJExceptionWrapper("JEventProcessor::EndRun", [&](){ EndRun(); });
Expand Down
4 changes: 3 additions & 1 deletion src/libraries/JANA/JEventSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class JEventSource : public jana::components::JComponent,
uint64_t m_nevents = 0;
bool m_enable_finish_event = false;
bool m_enable_get_objects = false;
bool m_enable_preprocess = false;


public:
Expand Down Expand Up @@ -145,11 +146,11 @@ class JEventSource : public jana::components::JComponent,

bool IsGetObjectsEnabled() const { return m_enable_get_objects; }
bool IsFinishEventEnabled() const { return m_enable_finish_event; }
bool IsPreprocessEnabled() const { return m_enable_preprocess; }

uint64_t GetNSkip() { return m_nskip; }
uint64_t GetNEvents() { return m_nevents; }

// TODO: Deprecate me
virtual std::string GetVDescription() const {
return "<description unavailable>";
} ///< Optional for getting description via source rather than JEventSourceGenerator
Expand All @@ -166,6 +167,7 @@ class JEventSource : public jana::components::JComponent,
/// which will hurt performance. Conceptually, FinishEvent isn't great, and so should be avoided when possible.
void EnableFinishEvent(bool enable=true) { m_enable_finish_event = enable; }
void EnableGetObjects(bool enable=true) { m_enable_get_objects = enable; }
void EnablePreprocess(bool enable=true) { m_enable_preprocess = enable; }

void SetNEvents(uint64_t nevents) { m_nevents = nevents; };
void SetNSkip(uint64_t nskip) { m_nskip = nskip; };
Expand Down
70 changes: 0 additions & 70 deletions src/libraries/JANA/Topology/JArrow.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
// Subject to the terms in the LICENSE file found in the top-level directory.

#pragma once
#include <iostream>
#include <atomic>
#include <cassert>
#include <vector>

Expand All @@ -29,8 +27,6 @@ class JArrow {
bool m_is_sink; // Whether or not tnis arrow contributes to the final event count
JArrowMetrics m_metrics; // Performance information accumulated over all workers

mutable std::mutex m_arrow_mutex; // Protects access to arrow properties

friend class JScheduler;
std::vector<JArrow *> m_listeners; // Downstream Arrows

Expand Down Expand Up @@ -77,11 +73,6 @@ class JArrow {
// TODO: Make no longer virtual
virtual size_t get_pending();

// TODO: Get rid of me
virtual size_t get_threshold();

virtual void set_threshold(size_t /* threshold */);

void attach(JArrow* downstream) {
m_listeners.push_back(downstream);
};
Expand Down Expand Up @@ -113,18 +104,11 @@ struct PlaceRefBase {
size_t max_item_count = 1;

virtual size_t get_pending() { return 0; }
virtual size_t get_threshold() { return 0; }
virtual void set_threshold(size_t) {}
};

template <typename T>
struct PlaceRef : public PlaceRefBase {

PlaceRef(JArrow* parent) {
assert(parent != nullptr);
parent->attach(this);
}

PlaceRef(JArrow* parent, bool is_input, size_t min_item_count, size_t max_item_count) {
assert(parent != nullptr);
parent->attach(this);
Expand All @@ -133,28 +117,6 @@ struct PlaceRef : public PlaceRefBase {
this->max_item_count = max_item_count;
}

PlaceRef(JArrow* parent, JMailbox<T*>* queue, bool is_input, size_t min_item_count, size_t max_item_count) {
assert(parent != nullptr);
assert(queue != nullptr);
parent->attach(this);
this->place_ref = queue;
this->is_queue = true;
this->is_input = is_input;
this->min_item_count = min_item_count;
this->max_item_count = max_item_count;
}

PlaceRef(JArrow* parent, JPool<T>* pool, bool is_input, size_t min_item_count, size_t max_item_count) {
assert(parent != nullptr);
assert(pool != nullptr);
parent->attach(this);
this->place_ref = pool;
this->is_queue = false;
this->is_input = is_input;
this->min_item_count = min_item_count;
this->max_item_count = max_item_count;
}

void set_queue(JMailbox<T*>* queue) {
assert(queue != nullptr);
this->place_ref = queue;
Expand All @@ -176,23 +138,6 @@ struct PlaceRef : public PlaceRefBase {
return 0;
}

size_t get_threshold() override {
assert(place_ref != nullptr);
if (is_input && is_queue) {
auto queue = static_cast<JMailbox<T*>*>(place_ref);
return queue->get_threshold();
}
return -1;
}

void set_threshold(size_t threshold) override {
assert(place_ref != nullptr);
if (is_input && is_queue) {
auto queue = static_cast<JMailbox<T*>*>(place_ref);
queue->set_threshold(threshold);
}
}

bool pull(Data<T>& data) {
assert(place_ref != nullptr);
if (is_input) { // Actually pull the data
Expand Down Expand Up @@ -267,19 +212,4 @@ inline size_t JArrow::get_pending() {
return sum;
}

inline size_t JArrow::get_threshold() {
size_t result = -1;
for (PlaceRefBase* place : m_places) {
result = std::min(result, place->get_threshold());
}
return result;

}

inline void JArrow::set_threshold(size_t threshold) {
for (PlaceRefBase* place : m_places) {
place->set_threshold(threshold);
}
}


1 change: 1 addition & 0 deletions src/libraries/JANA/Topology/JArrowMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#pragma once
#include <mutex>
#include <chrono>
#include <string>

class JArrowMetrics {

Expand Down
20 changes: 18 additions & 2 deletions src/libraries/JANA/Topology/JEventMapArrow.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ void JEventMapArrow::add_processor(JEventProcessor* processor) {

void JEventMapArrow::process(Event* event, bool& success, JArrowMetrics::Status& status) {

LOG_DEBUG(m_logger) << "JEventMapArrow '" << get_name() << "': Starting event# " << (*event)->GetEventNumber() << LOG_END;
LOG_DEBUG(m_logger) << "Executing arrow " << get_name() << " for event# " << (*event)->GetEventNumber() << LOG_END;
for (JEventSource* source : m_sources) {
JCallGraphEntryMaker cg_entry(*(*event)->GetJCallGraphRecorder(), source->GetTypeName()); // times execution until this goes out of scope
source->Preprocess(**event);
Expand All @@ -45,16 +45,32 @@ void JEventMapArrow::process(Event* event, bool& success, JArrowMetrics::Status&
processor->DoMap(*event);
}
}
LOG_DEBUG(m_logger) << "JEventMapArrow '" << get_name() << "': Finished event# " << (*event)->GetEventNumber() << LOG_END;
LOG_DEBUG(m_logger) << "Executed arrow " << get_name() << " for event# " << (*event)->GetEventNumber() << LOG_END;
success = true;
status = JArrowMetrics::Status::KeepGoing;
}

void JEventMapArrow::initialize() {
LOG_DEBUG(m_logger) << "Initializing arrow '" << get_name() << "'" << LOG_END;
for (auto processor : m_procs) {
if (processor->GetCallbackStyle() == JEventProcessor::CallbackStyle::LegacyMode) {
LOG_INFO(m_logger) << "Initializing JEventProcessor '" << processor->GetTypeName() << "'" << LOG_END;
processor->DoInitialize();
LOG_INFO(m_logger) << "Initialized JEventProcessor '" << processor->GetTypeName() << "'" << LOG_END;
}
}
LOG_DEBUG(m_logger) << "Initialized arrow '" << get_name() << "'" << LOG_END;
}

void JEventMapArrow::finalize() {
LOG_DEBUG(m_logger) << "Finalizing arrow '" << get_name() << "'" << LOG_END;
for (auto processor : m_procs) {
if (processor->GetCallbackStyle() == JEventProcessor::CallbackStyle::LegacyMode) {
LOG_DEBUG(m_logger) << "Finalizing JEventProcessor " << processor->GetTypeName() << LOG_END;
processor->DoFinalize();
LOG_INFO(m_logger) << "Finalized JEventProcessor " << processor->GetTypeName() << LOG_END;
}
}
LOG_DEBUG(m_logger) << "Finalized arrow " << get_name() << LOG_END;
}

2 changes: 1 addition & 1 deletion src/libraries/JANA/Topology/JEventSourceArrow.cc
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ Event* JEventSourceArrow::process(Event* event, bool& success, JArrowMetrics::St
}
else if (source_status == JEventSource::Result::FailureTryAgain){
// This JEventSource isn't finished yet, so we obtained either Success or TryAgainLater
LOG_DEBUG(m_logger) << "Executed arrow " << get_name() << " with result FailureTryAgain"<< LOG_END;
LOG_DEBUG(m_logger) << "Executed arrow " << get_name() << " with result ComeBackLater"<< LOG_END;
success = false;
arrow_status = JArrowMetrics::Status::ComeBackLater;
return event;
Expand Down
4 changes: 2 additions & 2 deletions src/libraries/JANA/Topology/JEventTapArrow.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ void JEventTapArrow::add_processor(JEventProcessor* proc) {

void JEventTapArrow::process(Event* event, bool& success, JArrowMetrics::Status& status) {

LOG_DEBUG(m_logger) << "JEventTapArrow '" << get_name() << "': Starting event# " << (*event)->GetEventNumber() << LOG_END;
LOG_DEBUG(m_logger) << "Executing arrow " << get_name() << " for event# " << (*event)->GetEventNumber() << LOG_END;
for (JEventProcessor* proc : m_procs) {
JCallGraphEntryMaker cg_entry(*(*event)->GetJCallGraphRecorder(), proc->GetTypeName()); // times execution until this goes out of scope
if (proc->GetCallbackStyle() != JEventProcessor::CallbackStyle::LegacyMode) {
proc->DoTap(*event);
}
}
LOG_DEBUG(m_logger) << "JEventTapArrow '" << get_name() << "': Finished event# " << (*event)->GetEventNumber() << LOG_END;
LOG_DEBUG(m_logger) << "Executed arrow " << get_name() << " for event# " << (*event)->GetEventNumber() << LOG_END;
success = true;
status = JArrowMetrics::Status::KeepGoing;
}
Expand Down
50 changes: 4 additions & 46 deletions src/libraries/JANA/Topology/JFoldArrow.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,58 +24,16 @@ class JFoldArrow : public JArrow {
public:
JFoldArrow(
std::string name,
//JEventFolder* folder,
JEventLevel parent_level,
JEventLevel child_level,
JMailbox<EventT*>* child_in,
JEventPool* child_out,
JMailbox<EventT*>* parent_out)
JEventLevel child_level)

: JArrow(std::move(name), false, false, false),
// m_folder(folder),
m_parent_level(parent_level),
m_child_level(child_level),
m_child_in(this, child_in, true, 1, 1),
m_child_out(this, child_out, false, 1, 1),
m_parent_out(this, parent_out, false, 1, 1)
{
}

JFoldArrow(
std::string name,
//JEventFolder* folder,
JEventLevel parent_level,
JEventLevel child_level,
JMailbox<EventT*>* child_in,
JMailbox<EventT*>* child_out,
JMailbox<EventT*>* parent_out)

: JArrow(std::move(name), false, false, false),
// m_folder(folder),
m_parent_level(parent_level),
m_child_level(child_level),
m_child_in(this, child_in, true, 1, 1),
m_child_out(this, child_out, false, 1, 1),
m_parent_out(this, parent_out, false, 1, 1)
{
}

JFoldArrow(
std::string name,
//JEventFolder* folder,
JEventLevel parent_level,
JEventLevel child_level,
JMailbox<EventT*>* child_in,
JEventPool* child_out,
JEventPool* parent_out)

: JArrow(std::move(name), false, false, false),
// m_folder(folder),
m_parent_level(parent_level),
m_child_level(child_level),
m_child_in(this, child_in, true, 1, 1),
m_child_out(this, child_out, false, 1, 1),
m_parent_out(this, parent_out, false, 1, 1)
m_child_in(this, true, 1, 1),
m_child_out(this, false, 1, 1),
m_parent_out(this, false, 1, 1)
{
}

Expand Down
Loading

0 comments on commit ecb73f7

Please sign in to comment.