diff --git a/src/libraries/JANA/Topology/JArrow.cc b/src/libraries/JANA/Topology/JArrow.cc index 18c8656f3..df636ce68 100644 --- a/src/libraries/JANA/Topology/JArrow.cc +++ b/src/libraries/JANA/Topology/JArrow.cc @@ -1,41 +1,50 @@ #include +void JArrow::create_ports(size_t inputs, size_t outputs) { + m_ports.clear(); + for (size_t i=0; i* queue, size_t port) { // Place index is relative to whether it is an input or not // Port index, however, is agnostic to whether it is an input or not - if (port >= m_places.size()) { - throw JException("Attempting to attach to a non-existent place! arrow=%s, port=%d", m_name.c_str(), port); + if (port >= m_ports.size()) { + throw JException("Attempting to attach to a non-existent port! arrow=%s, port=%d", m_name.c_str(), port); } - m_places[port]->is_queue = true; - m_places[port]->place_ref = queue; + m_ports[port].queue = queue; } void JArrow::attach(JEventPool* pool, size_t port) { // Place index is relative to whether it is an input or not // Port index, however, is agnostic to whether it is an input or not - if (port >= m_places.size()) { + if (port >= m_ports.size()) { throw JException("Attempting to attach to a non-existent place! arrow=%s, port=%d", m_name.c_str(), port); } - m_places[port]->is_queue = false; - m_places[port]->place_ref = pool; + m_ports[port].pool = pool; } -JEvent* JArrow::pull(size_t input_port, size_t location_id) { +JEvent* JArrow::pull(size_t port_index, size_t location_id) { JEvent* event = nullptr; - auto& place = m_places[input_port]; - if (place->is_queue) { - auto queue = static_cast*>(place->place_ref); - queue->pop(&event, 1, 1, location_id); + auto& port = m_ports.at(port_index); + if (port.queue != nullptr) { + port.queue->pop(&event, 1, 1, location_id); + } + else if (port.pool != nullptr){ + port.pool->pop(&event, 1, 1, location_id); } else { - auto pool = static_cast(place->place_ref); - pool->pop(&event, 1, 1, location_id); + throw JException("Arrow %s: Port %d not wired!", m_name.c_str(), port_index); } - // If ether pop() failed, the returned event is nullptr + // If pop() failed, the returned event is nullptr return event; } @@ -43,15 +52,27 @@ JEvent* JArrow::pull(size_t input_port, size_t location_id) { void JArrow::push(OutputData& outputs, size_t output_count, size_t location_id) { for (size_t output = 0; output < output_count; ++output) { JEvent* event = outputs[output].first; - int port = outputs[output].second; - if (m_places[port]->is_queue) { - auto queue = static_cast*>(m_places[port]->place_ref); - queue->push(&event, 1, location_id); + int port_index = outputs[output].second; + Port& port = m_ports.at(port_index); + if (port.queue != nullptr) { + port.queue->push(&event, 1, location_id); + } + else if (port.pool != nullptr) { + bool clear_event = !port.is_input; + port.pool->push(&event, 1, clear_event, location_id); } else { - auto pool = static_cast(m_places[port]->place_ref); - bool clear_event = !m_places[port]->is_input; - pool->push(&event, 1, clear_event, location_id); + throw JException("Arrow %s: Port %d not wired!", m_name.c_str(), port_index); + } + } +} + +inline size_t JArrow::get_pending() { + size_t sum = 0; + for (Port& port : m_ports) { + if (port.is_input && port.queue != nullptr) { + sum += port.queue->size(); } } + return sum; } diff --git a/src/libraries/JANA/Topology/JArrow.h b/src/libraries/JANA/Topology/JArrow.h index e2d44993e..2cea3f12f 100644 --- a/src/libraries/JANA/Topology/JArrow.h +++ b/src/libraries/JANA/Topology/JArrow.h @@ -13,10 +13,6 @@ #include -struct Place; -using JEventQueue = JMailbox; - - class JArrow { friend class JScheduler; friend class JTopologyBuilder; @@ -24,6 +20,12 @@ class JArrow { public: using OutputData = std::array, 2>; + struct Port { + JMailbox* queue = nullptr; + JEventPool* pool = nullptr; + bool is_input = false; + }; + private: std::string m_name; // Used for human understanding bool m_is_parallel; // Whether or not it is safe to parallelize @@ -31,11 +33,10 @@ class JArrow { bool m_is_sink; // Whether or not tnis arrow contributes to the final event count JArrowMetrics m_metrics; // Performance information accumulated over all workers - std::vector m_listeners; // Downstream Arrows - protected: + std::vector m_ports; // Will eventually supplant m_listeners + std::vector m_listeners; // Downstream Arrows JLogger m_logger; - std::vector m_places; // Will eventually supplant m_listeners public: std::string get_name() { return m_name; } @@ -77,11 +78,7 @@ class JArrow { m_listeners.push_back(downstream); }; - void attach(Place* place) { - if (std::find(m_places.begin(), m_places.end(), place) == m_places.end()) { - m_places.push_back(place); - } - }; + void create_ports(size_t inputs, size_t outputs); void attach(JMailbox* queue, size_t port); void attach(JEventPool* pool, size_t port); @@ -91,27 +88,6 @@ class JArrow { }; -struct Place { - void* place_ref = nullptr; - bool is_queue = true; - bool is_input = false; - - Place(JArrow* parent, bool is_input) { - assert(parent != nullptr); - parent->attach(this); - this->is_input = is_input; - } -}; -inline size_t JArrow::get_pending() { - size_t sum = 0; - for (Place* place : m_places) { - if (place->is_input && place->is_queue) { - auto queue = static_cast*>(place->place_ref); - sum += queue->size(); - } - } - return sum; -} diff --git a/src/libraries/JANA/Topology/JEventMapArrow.cc b/src/libraries/JANA/Topology/JEventMapArrow.cc index 6e971d2a7..feb9041e0 100644 --- a/src/libraries/JANA/Topology/JEventMapArrow.cc +++ b/src/libraries/JANA/Topology/JEventMapArrow.cc @@ -13,6 +13,8 @@ JEventMapArrow::JEventMapArrow(std::string name) { set_name(name); set_is_parallel(true); + create_ports(1, 1); + } void JEventMapArrow::add_source(JEventSource* source) { diff --git a/src/libraries/JANA/Topology/JEventMapArrow.h b/src/libraries/JANA/Topology/JEventMapArrow.h index e8d9e1230..346e1c7f5 100644 --- a/src/libraries/JANA/Topology/JEventMapArrow.h +++ b/src/libraries/JANA/Topology/JEventMapArrow.h @@ -15,9 +15,6 @@ class JEvent; class JEventMapArrow : public JTriggeredArrow { private: - Place m_input {this, true }; - Place m_output {this, false }; - std::vector m_sources; std::vector m_unfolders; std::vector m_procs; diff --git a/src/libraries/JANA/Topology/JEventSourceArrow.cc b/src/libraries/JANA/Topology/JEventSourceArrow.cc index de034a9a3..80b8a5db6 100644 --- a/src/libraries/JANA/Topology/JEventSourceArrow.cc +++ b/src/libraries/JANA/Topology/JEventSourceArrow.cc @@ -14,6 +14,7 @@ JEventSourceArrow::JEventSourceArrow(std::string name, std::vector { +public: + const size_t EVENT_IN = 0; + const size_t EVENT_OUT = 1; + private: std::vector m_sources; size_t m_current_source = 0; bool m_barrier_active = false; JEvent* m_pending_barrier_event = nullptr; - Place m_input {this, true}; - Place m_output {this, false}; - public: JEventSourceArrow(std::string name, std::vector sources); void set_input(JMailbox* queue) { - m_input.set_queue(queue); + m_ports[EVENT_IN].queue = queue; + m_ports[EVENT_IN].pool = nullptr; } void set_input(JEventPool* pool) { - m_input.set_pool(pool); + m_ports[EVENT_IN].queue = nullptr; + m_ports[EVENT_IN].pool = pool; } void set_output(JMailbox* queue) { - m_output.set_queue(queue); + m_ports[EVENT_OUT].queue = queue; + m_ports[EVENT_OUT].pool = nullptr; } void set_output(JEventPool* pool) { - m_output.set_pool(pool); + m_ports[EVENT_OUT].queue = nullptr; + m_ports[EVENT_OUT].pool = pool; } void initialize() final; diff --git a/src/libraries/JANA/Topology/JEventTapArrow.cc b/src/libraries/JANA/Topology/JEventTapArrow.cc index 67f1b89b1..13a4b3f3c 100644 --- a/src/libraries/JANA/Topology/JEventTapArrow.cc +++ b/src/libraries/JANA/Topology/JEventTapArrow.cc @@ -10,6 +10,7 @@ JEventTapArrow::JEventTapArrow(std::string name) { set_name(name); + create_ports(1,1); } void JEventTapArrow::add_processor(JEventProcessor* proc) { diff --git a/src/libraries/JANA/Topology/JEventTapArrow.h b/src/libraries/JANA/Topology/JEventTapArrow.h index ec5265b61..0e14cda80 100644 --- a/src/libraries/JANA/Topology/JEventTapArrow.h +++ b/src/libraries/JANA/Topology/JEventTapArrow.h @@ -13,8 +13,6 @@ class JEventTapArrow : public JTriggeredArrow { private: std::vector m_procs; - Place m_input {this, true }; - Place m_output {this, false }; public: JEventTapArrow(std::string name); diff --git a/src/libraries/JANA/Topology/JFoldArrow.h b/src/libraries/JANA/Topology/JFoldArrow.h index c7b9b193f..85a4de05e 100644 --- a/src/libraries/JANA/Topology/JFoldArrow.h +++ b/src/libraries/JANA/Topology/JFoldArrow.h @@ -18,10 +18,6 @@ class JFoldArrow : public JTriggeredArrow { JEventLevel m_parent_level; JEventLevel m_child_level; - Place m_child_in {this, true}; - Place m_child_out {this, false}; - Place m_parent_out {this, false}; - public: JFoldArrow( std::string name, @@ -32,6 +28,7 @@ class JFoldArrow : public JTriggeredArrow { m_child_level(child_level) { set_name(name); + create_ports(1, 2); m_next_input_port = CHILD_IN; } @@ -40,28 +37,28 @@ class JFoldArrow : public JTriggeredArrow { } void attach_child_in(JMailbox* child_in) { - m_child_in.place_ref = child_in; - m_child_in.is_queue = true; + m_ports[CHILD_IN].queue = child_in; + m_ports[CHILD_IN].pool = nullptr; } void attach_child_out(JMailbox* child_out) { - m_child_out.place_ref = child_out; - m_child_out.is_queue = true; + m_ports[CHILD_OUT].queue = child_out; + m_ports[CHILD_OUT].pool = nullptr; } void attach_child_out(JEventPool* child_out) { - m_child_out.place_ref = child_out; - m_child_out.is_queue = false; + m_ports[CHILD_OUT].queue = nullptr; + m_ports[CHILD_OUT].pool = child_out; } void attach_parent_out(JEventPool* parent_out) { - m_parent_out.place_ref = parent_out; - m_parent_out.is_queue = false; + m_ports[PARENT_OUT].queue = nullptr; + m_ports[PARENT_OUT].pool = parent_out; } void attach_parent_out(JMailbox* parent_out) { - m_parent_out.place_ref = parent_out; - m_parent_out.is_queue = true; + m_ports[PARENT_OUT].queue = parent_out; + m_ports[PARENT_OUT].pool = nullptr; } void initialize() final { diff --git a/src/libraries/JANA/Topology/JTopologyBuilder.cc b/src/libraries/JANA/Topology/JTopologyBuilder.cc index 29fa12c6c..b1dc6a01d 100644 --- a/src/libraries/JANA/Topology/JTopologyBuilder.cc +++ b/src/libraries/JANA/Topology/JTopologyBuilder.cc @@ -14,9 +14,6 @@ #include -using Event = std::shared_ptr; -using EventQueue = JMailbox; - JTopologyBuilder::JTopologyBuilder() { SetPrefix("jana"); } @@ -63,7 +60,7 @@ std::string JTopologyBuilder::print_topology() { for (JArrow* arrow : arrows) { show_row = true; - for (Place* place : arrow->m_places) { + for (JArrow::Port& port : arrow->m_ports) { if (show_row) { t | arrow->get_name(); t | arrow->is_parallel(); @@ -72,10 +69,11 @@ std::string JTopologyBuilder::print_topology() { else { t | "" | "" ; } + auto place_index = lookup[(port.queue!=nullptr) ? (void*) port.queue : (void*) port.pool]; - t | ((place->is_input) ? "Input ": "Output"); - t | ((place->is_queue) ? "Queue ": "Pool"); - t | lookup[place->place_ref]; + t | ((port.is_input) ? "Input ": "Output"); + t | ((port.queue != nullptr) ? "Queue ": "Pool"); + t | place_index; } } return t.Render(); @@ -148,26 +146,26 @@ void JTopologyBuilder::acquire_services(JServiceLocator *sl) { void JTopologyBuilder::connect(JArrow* upstream, size_t up_index, JArrow* downstream, size_t down_index) { - auto queue = new EventQueue(m_max_inflight_events, mapping.get_loc_count(), m_enable_stealing); + auto queue = new JMailbox(m_max_inflight_events, mapping.get_loc_count(), m_enable_stealing); queues.push_back(queue); size_t i = 0; - for (Place* place : upstream->m_places) { - if (!place->is_input) { + for (JArrow::Port& port : upstream->m_ports) { + if (!port.is_input) { if (i++ == up_index) { // Found the correct output - place->is_queue = true; - place->place_ref = queue; + port.queue = queue; + port.pool = nullptr; } } } i = 0; - for (Place* place : downstream->m_places) { - if (place->is_input) { + for (JArrow::Port& port : downstream->m_ports) { + if (port.is_input) { if (i++ == down_index) { // Found the correct input - place->is_queue = true; - place->place_ref = queue; + port.queue = queue; + port.pool = nullptr; } } } diff --git a/src/libraries/JANA/Topology/JTriggeredArrow.h b/src/libraries/JANA/Topology/JTriggeredArrow.h index c9ea10b6c..8cff29e62 100644 --- a/src/libraries/JANA/Topology/JTriggeredArrow.h +++ b/src/libraries/JANA/Topology/JTriggeredArrow.h @@ -54,7 +54,7 @@ struct JTriggeredArrow : public JArrow { // their input pool. These mustn't be counted as "processed" in the metrics. size_t processed_count = 0; for (size_t output=0; outputis_input) { + if (!m_ports[outputs[output].second].is_input) { processed_count++; } } diff --git a/src/libraries/JANA/Topology/JUnfoldArrow.h b/src/libraries/JANA/Topology/JUnfoldArrow.h index 14db8d3e8..3a64a4e0e 100644 --- a/src/libraries/JANA/Topology/JUnfoldArrow.h +++ b/src/libraries/JANA/Topology/JUnfoldArrow.h @@ -18,37 +18,33 @@ class JUnfoldArrow : public JTriggeredArrow { JEvent* m_parent_event = nullptr; JEvent* m_child_event = nullptr; - Place m_parent_in {this, true}; - Place m_child_in {this, true}; - Place m_child_out {this, false}; - public: JUnfoldArrow(std::string name, JEventUnfolder* unfolder) : m_unfolder(unfolder) { set_name(name); + create_ports(2, 1); m_next_input_port = PARENT_IN; } void attach_parent_in(JMailbox* parent_in) { - m_parent_in.place_ref = parent_in; - m_parent_in.is_queue = true; + m_ports[PARENT_IN].queue = parent_in; + m_ports[PARENT_IN].pool = nullptr; } void attach_child_in(JEventPool* child_in) { - m_child_in.place_ref = child_in; - m_child_in.is_queue = false; + m_ports[CHILD_IN].queue = nullptr; + m_ports[CHILD_IN].pool = child_in; } void attach_child_in(JMailbox* child_in) { - m_child_in.place_ref = child_in; - m_child_in.is_queue = true; + m_ports[CHILD_IN].queue = child_in; + m_ports[CHILD_IN].pool = nullptr; } void attach_child_out(JMailbox* child_out) { - m_child_out.place_ref = child_out; - m_child_out.is_queue = true; + m_ports[CHILD_OUT].queue = child_out; + m_ports[CHILD_OUT].pool = nullptr; } - void initialize() final { m_unfolder->DoInit(); LOG_INFO(m_logger) << "Initialized JEventUnfolder '" << m_unfolder->GetTypeName() << "'" << LOG_END; @@ -61,10 +57,9 @@ class JUnfoldArrow : public JTriggeredArrow { size_t get_pending() final { size_t sum = 0; - for (Place* place : m_places) { - if (place->is_input && place->is_queue) { - auto queue = static_cast*>(place->place_ref); - sum += queue->size(); + for (Port& port : m_ports) { + if (port.is_input && port.queue!=nullptr) { + sum += port.queue->size(); } } if (m_parent_event != nullptr) { diff --git a/src/programs/unit_tests/Topology/JArrowTests.cc b/src/programs/unit_tests/Topology/JArrowTests.cc index 6ea867112..42589a69a 100644 --- a/src/programs/unit_tests/Topology/JArrowTests.cc +++ b/src/programs/unit_tests/Topology/JArrowTests.cc @@ -8,10 +8,8 @@ struct TestData { int x; }; struct BasicParallelArrow : public JTriggeredArrow { - Place m_input {this, true }; - Place m_output {this, false }; - BasicParallelArrow() { + create_ports(1, 1); set_is_parallel(true); } diff --git a/src/programs/unit_tests/Topology/TestTopologyComponents.h b/src/programs/unit_tests/Topology/TestTopologyComponents.h index 0bf2b8734..4231ece3a 100644 --- a/src/programs/unit_tests/Topology/TestTopologyComponents.h +++ b/src/programs/unit_tests/Topology/TestTopologyComponents.h @@ -16,9 +16,6 @@ struct EventData { struct RandIntArrow : public JTriggeredArrow { - Place m_input {this, true }; - Place m_output {this, false }; - size_t emit_limit = 20; // How many to emit size_t emit_count = 0; // How many emitted so far int emit_sum = 0; // Sum of all ints emitted so far @@ -26,6 +23,7 @@ struct RandIntArrow : public JTriggeredArrow { RandIntArrow(std::string name, JEventPool* pool, JMailbox* output_queue) { set_name(name); set_is_source(true); + create_ports(1, 1); attach(pool, 0); attach(output_queue, 1); } @@ -57,12 +55,10 @@ struct RandIntArrow : public JTriggeredArrow { struct MultByTwoArrow : public JTriggeredArrow { - Place m_input {this, true }; - Place m_output {this, false }; - MultByTwoArrow(std::string name, JMailbox* input_queue, JMailbox* output_queue) { set_name(name); set_is_parallel(true); + create_ports(1, 1); attach(input_queue, 0); attach(output_queue, 1); } @@ -81,12 +77,10 @@ struct MultByTwoArrow : public JTriggeredArrow { struct SubOneArrow : public JTriggeredArrow { - Place m_input {this, true }; - Place m_output {this, false }; - SubOneArrow(std::string name, JMailbox* input_queue, JMailbox* output_queue) { set_name(name); set_is_parallel(true); + create_ports(1, 1); attach(input_queue, 0); attach(output_queue, 1); } @@ -107,14 +101,12 @@ struct SubOneArrow : public JTriggeredArrow { struct SumArrow : public JTriggeredArrow { - Place m_input {this, true }; - Place m_output {this, false }; - double sum = 0; SumArrow(std::string name, JMailbox* input_queue, JEventPool* pool) { set_name(name); set_is_sink(true); + create_ports(1, 1); attach(input_queue, 0); attach(pool, 1); }