diff --git a/src/examples/BlockExample/BlockExampleSource.h b/src/examples/BlockExample/BlockExampleSource.h index d8e7e05b9..0f20e95cf 100644 --- a/src/examples/BlockExample/BlockExampleSource.h +++ b/src/examples/BlockExample/BlockExampleSource.h @@ -34,10 +34,10 @@ class BlockExampleSource : public JBlockedEventSource { return Status::Success; } - virtual std::vector> DisentangleBlock(MyBlock& block, JEventPool& pool) { + virtual std::vector*> DisentangleBlock(MyBlock& block, JEventPool& pool) { LOG_DEBUG(m_logger) << "BlockDisentangler: Disentangling block " << block.block_number << LOG_END; - std::vector> events; + std::vector*> events; bool result = pool.get_many(events, block.data.size()); if (result == false) { @@ -51,7 +51,7 @@ class BlockExampleSource : public JBlockedEventSource { size_t i = 0; for (auto datum : block.data) { LOG_DEBUG(m_logger) << "BlockDisentangler: extracted event containing " << datum << LOG_END; - events[i++]->Insert(new MyObject(datum)); + (*(events[i++]))->Insert(new MyObject(datum)); } return events; } diff --git a/src/examples/BlockExample/main.cc b/src/examples/BlockExample/main.cc index 2a372f847..91c2cb609 100644 --- a/src/examples/BlockExample/main.cc +++ b/src/examples/BlockExample/main.cc @@ -17,13 +17,15 @@ std::shared_ptr configure_block_topology(std::shared_ptr; - auto event_queue = new JMailbox>; + auto event_queue = new JMailbox*>; + auto block_pool = new JPool(0, 1, false); + block_pool->init(); - // topology->queues.push_back(block_queue); - // FIXME: block_queue is a (very minor) memory leak topology->queues.push_back(event_queue); + topology->queues.push_back(block_queue); + topology->pools.push_back(block_pool); - auto block_source_arrow = new JBlockSourceArrow("block_source", source, block_queue); + auto block_source_arrow = new JBlockSourceArrow("block_source", source, block_pool, block_queue); auto block_disentangler_arrow = new JBlockDisentanglerArrow("block_disentangler", source, block_queue, event_queue, 
topology->event_pool); auto processor_arrow = new JEventProcessorArrow("processors", event_queue, nullptr, topology->event_pool); @@ -33,9 +35,6 @@ std::shared_ptr configure_block_topology(std::shared_ptrarrows.push_back(block_disentangler_arrow); topology->arrows.push_back(processor_arrow); - topology->sources.push_back(block_source_arrow); - topology->sinks.push_back(processor_arrow); - block_source_arrow->attach(block_disentangler_arrow); block_disentangler_arrow->attach(processor_arrow); diff --git a/src/examples/SubeventExample/SubeventExample.cc b/src/examples/SubeventExample/SubeventExample.cc index 9d0a06e23..27385a6c7 100644 --- a/src/examples/SubeventExample/SubeventExample.cc +++ b/src/examples/SubeventExample/SubeventExample.cc @@ -4,7 +4,6 @@ #include #include -#include #include #include #include @@ -80,8 +79,8 @@ struct SimpleProcessor : public JEventProcessor { int main() { MyProcessor processor; - JMailbox> events_in; - JMailbox> events_out; + JMailbox*> events_in; + JMailbox*> events_out; JMailbox> subevents_in; JMailbox> subevents_out; @@ -109,12 +108,10 @@ int main() { proc_arrow->add_processor(new SimpleProcessor); topology->arrows.push_back(source_arrow); - topology->sources.push_back(source_arrow); topology->arrows.push_back(split_arrow); topology->arrows.push_back(subprocess_arrow); topology->arrows.push_back(merge_arrow); topology->arrows.push_back(proc_arrow); - topology->sinks.push_back(proc_arrow); source_arrow->attach(split_arrow); split_arrow->attach(subprocess_arrow); diff --git a/src/libraries/JANA/CMakeLists.txt b/src/libraries/JANA/CMakeLists.txt index 18f787360..303521a0a 100644 --- a/src/libraries/JANA/CMakeLists.txt +++ b/src/libraries/JANA/CMakeLists.txt @@ -41,6 +41,7 @@ set(JANA2_SOURCES Engine/JEventSourceArrow.h Engine/JBlockSourceArrow.h Engine/JBlockDisentanglerArrow.h + Engine/JPool.h Engine/JMailbox.h Engine/JScheduler.cc diff --git a/src/libraries/JANA/Engine/JArrow.h b/src/libraries/JANA/Engine/JArrow.h index 
99b92c103..d0d96a51a 100644 --- a/src/libraries/JANA/Engine/JArrow.h +++ b/src/libraries/JANA/Engine/JArrow.h @@ -14,33 +14,32 @@ #include "JArrowMetrics.h" #include #include +#include +#include -class JArrow { -public: - enum class NodeType {Source, Sink, Stage, Group}; - enum class BackoffStrategy { Constant, Linear, Exponential }; - using duration_t = std::chrono::steady_clock::duration; +#ifndef JANA2_ARROWDATA_MAX_SIZE +#define JANA2_ARROWDATA_MAX_SIZE 10 +#endif +struct PlaceRefBase; +class JArrow { private: - // Info const std::string m_name; // Used for human understanding const bool m_is_parallel; // Whether or not it is safe to parallelize - const NodeType m_type; + const bool m_is_source; // Whether or not this arrow should activate/drain the topology + const bool m_is_sink; // Whether or not this arrow contributes to the final event count JArrowMetrics m_metrics; // Performance information accumulated over all workers mutable std::mutex m_arrow_mutex; // Protects access to arrow properties - // Knobs + // TODO: Get rid of me size_t m_chunksize = 1; // Number of items to pop off the input queue at once - BackoffStrategy m_backoff_strategy = BackoffStrategy::Exponential; - duration_t m_initial_backoff_time = std::chrono::microseconds(1); - duration_t m_checkin_time = std::chrono::milliseconds(500); - unsigned m_backoff_tries = 4; friend class JScheduler; - std::vector m_listeners; // Downstream Arrows + std::vector m_listeners; // Downstream Arrows + std::vector m_places; // Will eventually supplant m_listeners, m_chunksize protected: // This is usable by subclasses. 
@@ -48,81 +47,36 @@ class JArrow { JLogger m_logger {JLogger::Level::OFF}; public: - - // Constants - bool is_parallel() { return m_is_parallel; } + bool is_source() { return m_is_source; } + bool is_sink() { return m_is_sink; } std::string get_name() { return m_name; } - - // Written externally - void set_logger(JLogger logger) { m_logger = logger; } + // TODO: Get rid of me void set_chunksize(size_t chunksize) { std::lock_guard lock(m_arrow_mutex); m_chunksize = chunksize; } + // TODO: Get rid of me size_t get_chunksize() const { std::lock_guard lock(m_arrow_mutex); return m_chunksize; } - void set_backoff_tries(unsigned backoff_tries) { - std::lock_guard lock(m_arrow_mutex); - m_backoff_tries = backoff_tries; - } - - unsigned get_backoff_tries() const { - std::lock_guard lock(m_arrow_mutex); - return m_backoff_tries; - } - - BackoffStrategy get_backoff_strategy() const { - std::lock_guard lock(m_arrow_mutex); - return m_backoff_strategy; - } - - void set_backoff_strategy(BackoffStrategy backoff_strategy) { - std::lock_guard lock(m_arrow_mutex); - m_backoff_strategy = backoff_strategy; - } - - duration_t get_initial_backoff_time() const { - std::lock_guard lock(m_arrow_mutex); - return m_initial_backoff_time; - } - - void set_initial_backoff_time(const duration_t& initial_backoff_time) { - std::lock_guard lock(m_arrow_mutex); - m_initial_backoff_time = initial_backoff_time; - } - - const duration_t& get_checkin_time() const { - std::lock_guard lock(m_arrow_mutex); - return m_checkin_time; - } - - void set_checkin_time(const duration_t& checkin_time) { - std::lock_guard lock(m_arrow_mutex); - m_checkin_time = checkin_time; - } // TODO: Metrics should be encapsulated so that only actions are to update, clear, or summarize JArrowMetrics& get_metrics() { return m_metrics; } - NodeType get_type() { - return m_type; - } - - JArrow(std::string name, bool is_parallel, NodeType arrow_type, size_t chunksize=16) : - m_name(std::move(name)), m_is_parallel(is_parallel), 
m_type(arrow_type), m_chunksize(chunksize) { + JArrow(std::string name, bool is_parallel, bool is_source, bool is_sink, size_t chunksize=16) : + m_name(std::move(name)), m_is_parallel(is_parallel), m_is_source(is_source), m_is_sink(is_sink), m_chunksize(chunksize) { m_metrics.clear(); }; @@ -135,28 +89,212 @@ class JArrow { virtual void finalize() {}; - virtual size_t get_pending() { return 0; } - - virtual size_t get_threshold() { return 0; } + // TODO: Make no longer virtual + virtual size_t get_pending(); - virtual void set_threshold(size_t /* threshold */) {} + // TODO: Get rid of me + virtual size_t get_threshold(); + virtual void set_threshold(size_t /* threshold */); void attach(JArrow* downstream) { m_listeners.push_back(downstream); }; + void attach(PlaceRefBase* place) { + if (std::find(m_places.begin(), m_places.end(), place) == m_places.end()) { + m_places.push_back(place); + } + }; }; +template +struct Data { + std::array items; + size_t item_count = 0; + size_t reserve_count = 0; + size_t location_id; + + Data(size_t location_id = 0) : location_id(location_id) { + items = {nullptr}; + } +}; + +struct PlaceRefBase { + void* place_ref = nullptr; + bool is_queue = true; + bool is_input = false; + size_t min_item_count = 1; + size_t max_item_count = 1; + + virtual size_t get_pending() { return 0; } + virtual size_t get_threshold() { return 0; } + virtual void set_threshold(size_t) {} +}; + +template +struct PlaceRef : public PlaceRefBase { + + PlaceRef(JArrow* parent) { + assert(parent != nullptr); + parent->attach(this); + } + + PlaceRef(JArrow* parent, bool is_input, size_t min_item_count, size_t max_item_count) { + assert(parent != nullptr); + parent->attach(this); + this->is_input = is_input; + this->min_item_count = min_item_count; + this->max_item_count = max_item_count; + } + + PlaceRef(JArrow* parent, JMailbox* queue, bool is_input, size_t min_item_count, size_t max_item_count) { + assert(parent != nullptr); + assert(queue != nullptr); + 
parent->attach(this); + this->place_ref = queue; + this->is_queue = true; + this->is_input = is_input; + this->min_item_count = min_item_count; + this->max_item_count = max_item_count; + } + + PlaceRef(JArrow* parent, JPool* pool, bool is_input, size_t min_item_count, size_t max_item_count) { + assert(parent != nullptr); + assert(pool != nullptr); + parent->attach(this); + this->place_ref = pool; + this->is_queue = false; + this->is_input = is_input; + this->min_item_count = min_item_count; + this->max_item_count = max_item_count; + } + + void set_queue(JMailbox* queue) { + assert(queue != nullptr); + this->place_ref = queue; + this->is_queue = true; + } + + void set_pool(JPool* pool) { + assert(pool != nullptr); + this->place_ref = pool; + this->is_queue = false; + } + + size_t get_pending() override { + assert(place_ref != nullptr); + if (is_input && is_queue) { + auto queue = static_cast*>(place_ref); + return queue->size(); + } + return 0; + } + + size_t get_threshold() override { + assert(place_ref != nullptr); + if (is_input && is_queue) { + auto queue = static_cast*>(place_ref); + return queue->get_threshold(); + } + return -1; + } + + void set_threshold(size_t threshold) override { + assert(place_ref != nullptr); + if (is_input && is_queue) { + auto queue = static_cast*>(place_ref); + queue->set_threshold(threshold); + } + } + + bool pull(Data& data) { + assert(place_ref != nullptr); + if (is_input) { // Actually pull the data + if (is_queue) { + auto queue = static_cast*>(place_ref); + data.item_count = queue->pop_and_reserve(data.items.data(), min_item_count, max_item_count, data.location_id); + data.reserve_count = data.item_count; + return (data.item_count >= min_item_count); + } + else { + auto pool = static_cast*>(place_ref); + data.item_count = pool->pop(data.items.data(), min_item_count, max_item_count, data.location_id); + data.reserve_count = 0; + return (data.item_count >= min_item_count); + } + } + else { + if (is_queue) { + // Reserve a space 
on the output queue + data.item_count = 0; + auto queue = static_cast*>(place_ref); + data.reserve_count = queue->reserve(min_item_count, max_item_count, data.location_id); + return (data.reserve_count >= min_item_count); + } + else { + // No need to reserve on pool -- either there is space or limit_events_in_flight=false + data.item_count = 0; + data.reserve_count = 0; + return true; + } + } + } + + void revert(Data& data) { + assert(place_ref != nullptr); + if (is_queue) { + auto queue = static_cast*>(place_ref); + queue->push_and_unreserve(data.items.data(), data.item_count, data.reserve_count, data.location_id); + } + else { + if (is_input) { + auto pool = static_cast*>(place_ref); + pool->push(data.items.data(), data.item_count, data.location_id); + } + } + } + + size_t push(Data& data) { + assert(place_ref != nullptr); + if (is_queue) { + auto queue = static_cast*>(place_ref); + queue->push_and_unreserve(data.items.data(), data.item_count, data.reserve_count, data.location_id); + data.item_count = 0; + data.reserve_count = 0; + return is_input ? 
0 : data.item_count; + } + else { + auto pool = static_cast*>(place_ref); + pool->push(data.items.data(), data.item_count, data.location_id); + data.item_count = 0; + data.reserve_count = 0; + return 1; + } + } +}; + +inline size_t JArrow::get_pending() { + size_t sum = 0; + for (PlaceRefBase* place : m_places) { + sum += place->get_pending(); + } + return sum; +} + +inline size_t JArrow::get_threshold() { + size_t result = -1; + for (PlaceRefBase* place : m_places) { + result = std::min(result, place->get_threshold()); + } + return result; + +} -inline std::ostream& operator<<(std::ostream& os, const JArrow::NodeType& nt) { - switch (nt) { - case JArrow::NodeType::Stage: os << "Stage"; break; - case JArrow::NodeType::Source: os << "Source"; break; - case JArrow::NodeType::Sink: os << "Sink"; break; - case JArrow::NodeType::Group: os << "Group"; break; +inline void JArrow::set_threshold(size_t threshold) { + for (PlaceRefBase* place : m_places) { + place->set_threshold(threshold); } - return os; } diff --git a/src/libraries/JANA/Engine/JArrowPerfSummary.cc b/src/libraries/JANA/Engine/JArrowPerfSummary.cc index 4d92c8b57..33c522e71 100644 --- a/src/libraries/JANA/Engine/JArrowPerfSummary.cc +++ b/src/libraries/JANA/Engine/JArrowPerfSummary.cc @@ -29,12 +29,12 @@ std::ostream& operator<<(std::ostream& os, const JArrowPerfSummary& s) { for (auto as : s.arrows) { os << " | " << std::setw(24) << std::left << as.arrow_name << " | " - << std::setw(6) << std::left << as.arrow_type << " | " + << std::setw(6) << std::left << (as.is_source ? "Src" : (as.is_sink ? "Sink" : "")) << " | " << std::setw(3) << std::right << (as.is_parallel ? 
" T " : " F ") << " | " << std::setw(7) << as.thread_count << " |" << std::setw(6) << as.chunksize << " |"; - if (as.arrow_type != JArrow::NodeType::Source) { + if (!as.is_source) { os << std::setw(7) << as.threshold << " |" << std::setw(8) << as.messages_pending << " |"; @@ -46,7 +46,7 @@ std::ostream& operator<<(std::ostream& os, const JArrowPerfSummary& s) { os << std::setw(12) << as.total_messages_completed << " |" << std::endl; } - os << " +--------------------------+------------+--------+-----+---------+-------+--------+---------+-------------+" << std::endl; + os << " +--------------------------+--------+-----+---------+-------+--------+---------+-------------+" << std::endl; os << " +--------------------------+-------------+--------------+----------------+--------------+----------------+" << std::endl; diff --git a/src/libraries/JANA/Engine/JArrowPerfSummary.h b/src/libraries/JANA/Engine/JArrowPerfSummary.h index ae093465f..354a90d7d 100644 --- a/src/libraries/JANA/Engine/JArrowPerfSummary.h +++ b/src/libraries/JANA/Engine/JArrowPerfSummary.h @@ -16,8 +16,9 @@ struct ArrowSummary { std::string arrow_name; bool is_parallel; + bool is_source; + bool is_sink; size_t thread_count; - JArrow::NodeType arrow_type; int running_upstreams; bool has_backpressure; size_t messages_pending; diff --git a/src/libraries/JANA/Engine/JArrowProcessingController.cc b/src/libraries/JANA/Engine/JArrowProcessingController.cc index 14ed2321e..6fff722a2 100644 --- a/src/libraries/JANA/Engine/JArrowProcessingController.cc +++ b/src/libraries/JANA/Engine/JArrowProcessingController.cc @@ -240,8 +240,10 @@ std::unique_ptr JArrowProcessingController::measure_int } size_t monotonic_event_count = 0; - for (JArrow* arrow : m_topology->sinks) { - monotonic_event_count += arrow->get_metrics().get_total_message_count(); + for (JArrow* arrow : m_topology->arrows) { + if (arrow->is_sink()) { + monotonic_event_count += arrow->get_metrics().get_total_message_count(); + } } // Uptime diff --git 
a/src/libraries/JANA/Engine/JArrowTopology.cc b/src/libraries/JANA/Engine/JArrowTopology.cc index efb0da2d4..d184f0ca9 100644 --- a/src/libraries/JANA/Engine/JArrowTopology.cc +++ b/src/libraries/JANA/Engine/JArrowTopology.cc @@ -10,15 +10,15 @@ JArrowTopology::JArrowTopology() = default; JArrowTopology::~JArrowTopology() { - LOG_DEBUG(m_logger) << "JArrowTopology: Entering destructor" << LOG_END; - // finish(); // We don't want to call finish() here in case there was an exception in JArrow::initialize(), finalize() - for (auto arrow : arrows) { delete arrow; } for (auto queue : queues) { delete queue; } + for (auto pool : pools) { + delete pool; + } } diff --git a/src/libraries/JANA/Engine/JArrowTopology.h b/src/libraries/JANA/Engine/JArrowTopology.h index a4e82f13b..5893eda8e 100644 --- a/src/libraries/JANA/Engine/JArrowTopology.h +++ b/src/libraries/JANA/Engine/JArrowTopology.h @@ -19,7 +19,7 @@ struct JArrowTopology { using Event = std::shared_ptr; - using EventQueue = JMailbox; + using EventQueue = JMailbox; explicit JArrowTopology(); virtual ~JArrowTopology(); @@ -28,13 +28,12 @@ struct JArrowTopology { // Ensure that ComponentManager stays alive at least as long as JArrowTopology does // Otherwise there is a potential use-after-free when JArrowTopology or JArrowProcessingController access components - std::shared_ptr event_pool; // TODO: Belongs somewhere else + std::shared_ptr event_pool; // TODO: Move into pools eventually JPerfMetrics metrics; std::vector arrows; - std::vector sources; // Sources needed for activation - std::vector sinks; // Sinks needed for finished message count // TODO: Not anymore - std::vector queues; // Queues shared between arrows + std::vector queues; // Queues shared between arrows + std::vector pools; // Pools shared between arrows JProcessorMapping mapping; size_t event_pool_size = 1; // Will be defaulted to nthreads by builder diff --git a/src/libraries/JANA/Engine/JBlockDisentanglerArrow.h 
b/src/libraries/JANA/Engine/JBlockDisentanglerArrow.h index 7bd1c94e3..bbc80268f 100644 --- a/src/libraries/JANA/Engine/JBlockDisentanglerArrow.h +++ b/src/libraries/JANA/Engine/JBlockDisentanglerArrow.h @@ -12,8 +12,8 @@ template class JBlockDisentanglerArrow : public JArrow { JBlockedEventSource* m_source; // non-owning - JMailbox* m_block_queue; // owning - JMailbox>* m_event_queue; // non-owning + JMailbox* m_block_queue; // non-owning + JMailbox*>* m_event_queue; // non-owning std::shared_ptr m_pool; size_t m_max_events_per_block = 40; @@ -22,10 +22,10 @@ class JBlockDisentanglerArrow : public JArrow { JBlockDisentanglerArrow(std::string name, JBlockedEventSource* source, JMailbox* block_queue, - JMailbox>* event_queue, + JMailbox*>* event_queue, std::shared_ptr pool ) - : JArrow(std::move(name), true, NodeType::Stage, 1) + : JArrow(std::move(name), true, false, false, 1) , m_source(source) , m_block_queue(block_queue) , m_event_queue(event_queue) @@ -33,7 +33,6 @@ class JBlockDisentanglerArrow : public JArrow { {} ~JBlockDisentanglerArrow() { - delete m_block_queue; } void set_max_events_per_block(size_t max_events_per_block) { @@ -57,15 +56,17 @@ class JBlockDisentanglerArrow : public JArrow { int requested_events = this->get_chunksize() * m_max_events_per_block; // chunksize is measured in blocks int reserved_events = m_event_queue->reserve(requested_events, location_id); - int reserved_blocks = reserved_events / m_max_events_per_block; // truncate + int requested_blocks = reserved_events / m_max_events_per_block; // truncate std::vector block_buffer; // TODO: Get rid of allocations - std::vector> event_buffer; + std::vector*> event_buffer; - auto input_queue_status = m_block_queue->pop(block_buffer, reserved_blocks, location_id); + auto input_queue_status = m_block_queue->pop(block_buffer, requested_blocks, location_id); + int obtained_blocks = block_buffer.size(); for (auto block : block_buffer) { auto events = m_source->DisentangleBlock(*block, *m_pool); 
event_buffer.insert(event_buffer.end(), events.begin(), events.end()); + delete block; } LOG_TRACE(m_logger) << "JBlockDisentanglerArrow: successfully emitting " << event_buffer.size() << " events" << LOG_END; @@ -73,7 +74,7 @@ class JBlockDisentanglerArrow : public JArrow { if (reserved_events == 0 || input_queue_status == JMailbox::Status::Congested || - output_queue_status == JMailbox::Status::Full || + output_queue_status == JMailbox::Status::Full || input_queue_status == JMailbox::Status::Empty ) { status = JArrowMetrics::Status::ComeBackLater; @@ -81,7 +82,7 @@ class JBlockDisentanglerArrow : public JArrow { else { status = JArrowMetrics::Status::KeepGoing; } - result.update(status, reserved_blocks, 1, latency, overhead); + result.update(status, obtained_blocks, 1, latency, overhead); } }; diff --git a/src/libraries/JANA/Engine/JBlockSourceArrow.h b/src/libraries/JANA/Engine/JBlockSourceArrow.h index fd3908785..11c2c5656 100644 --- a/src/libraries/JANA/Engine/JBlockSourceArrow.h +++ b/src/libraries/JANA/Engine/JBlockSourceArrow.h @@ -5,74 +5,39 @@ #ifndef JANA2_JBLOCKSOURCEARROW_H #define JANA2_JBLOCKSOURCEARROW_H -#include +#include #include #include template -class JBlockSourceArrow : public JArrow { +class JBlockSourceArrow : public JPipelineArrow, T> { JBlockedEventSource* m_source; // non-owning - JMailbox* m_block_queue; // non-owning - - T* m_next_block = nullptr; public: - JBlockSourceArrow(std::string name, JBlockedEventSource* source, JMailbox* block_queue) - : JArrow(name, false, NodeType::Source, 1) + JBlockSourceArrow(std::string name, JBlockedEventSource* source, JPool* pool, JMailbox* block_queue) + : JPipelineArrow,T>(name, false, true, false, nullptr, block_queue, pool) , m_source(source) - , m_block_queue(block_queue) {} void initialize() final { - LOG_DEBUG(m_logger) << "JBlockDisentanglerArrow '" << get_name() << "': " << "Initializing" << LOG_END; m_source->Initialize(); } - void execute(JArrowMetrics& result, size_t location_id) final { 
- - JArrowMetrics::Status status; - JArrowMetrics::duration_t latency; - JArrowMetrics::duration_t overhead; // TODO: Populate these - size_t message_count = 0; - - int requested_count = this->get_chunksize(); - std::vector chunk_buffer; // TODO: Get rid of allocations - int reserved_count = m_block_queue->reserve(requested_count, location_id); - + void process(T* block, bool& success, JArrowMetrics::Status& status) { using Status = typename JBlockedEventSource::Status; - if (reserved_count != 0) { - - Status lambda_result = Status::Success; - for (int i=0; iNextBlock(*m_next_block); - if (lambda_result == Status::Success) { - LOG_TRACE(m_logger) << "JBlockSourceArrow: Success! Pushing valid block to chunk buffer" << LOG_END; - chunk_buffer.push_back(m_next_block); - m_next_block = nullptr; - } - } - - // We have to return our reservation regardless of whether our pop succeeded - LOG_TRACE(m_logger) << "JBlockSourceArrow: Pushing " << chunk_buffer.size() << " blocks to queue" << LOG_END; - m_block_queue->push(chunk_buffer, reserved_count, location_id); - - if (lambda_result == Status::Success) { - status = JArrowMetrics::Status::KeepGoing; - } - else if (lambda_result == Status::FailTryAgain) { - status = JArrowMetrics::Status::ComeBackLater; - } - else if (lambda_result == Status::FailFinished) { - status = JArrowMetrics::Status::Finished; - } - } - else { // reserved_count = 0 => downstream is full - status = JArrowMetrics::Status::ComeBackLater; - } - result.update(status, message_count, 1, latency, overhead); + Status lambda_result = m_source->NextBlock(*block); + if (lambda_result == Status::Success) { + success = true; + status = JArrowMetrics::Status::KeepGoing; + } + else if (lambda_result == Status::FailTryAgain) { + success = false; + status = JArrowMetrics::Status::ComeBackLater; + } + else if (lambda_result == Status::FailFinished) { + success = false; + status = JArrowMetrics::Status::Finished; + } } }; diff --git 
a/src/libraries/JANA/Engine/JEventProcessorArrow.cc b/src/libraries/JANA/Engine/JEventProcessorArrow.cc index 621abd580..ae4c5faea 100644 --- a/src/libraries/JANA/Engine/JEventProcessorArrow.cc +++ b/src/libraries/JANA/Engine/JEventProcessorArrow.cc @@ -13,70 +13,33 @@ JEventProcessorArrow::JEventProcessorArrow(std::string name, EventQueue *input_queue, EventQueue *output_queue, std::shared_ptr pool) - : JArrow(std::move(name), true, NodeType::Sink) - , m_input_queue(input_queue) - , m_output_queue(output_queue) - , m_pool(std::move(pool)) { -} + : JPipelineArrow(std::move(name), + true, + false, + true, + input_queue, + output_queue, + pool.get()) {} void JEventProcessorArrow::add_processor(JEventProcessor* processor) { m_processors.push_back(processor); } -void JEventProcessorArrow::execute(JArrowMetrics& result, size_t location_id) { - - auto start_total_time = std::chrono::steady_clock::now(); - - Event x; - bool success; - auto in_status = m_input_queue->pop(x, success, location_id); - LOG_TRACE(m_logger) << "JEventProcessorArrow '" << get_name() << "' [" << location_id << "]: " - << "pop() returned " << ((success) ? 
"success" : "failure") - << "; queue is now " << in_status << LOG_END; - - auto start_latency_time = std::chrono::steady_clock::now(); - if (success) { - LOG_DEBUG(m_logger) << "JEventProcessorArrow '" << get_name() << "': Starting event# " << x->GetEventNumber() << LOG_END; - for (JEventProcessor* processor : m_processors) { - JCallGraphEntryMaker cg_entry(*x->GetJCallGraphRecorder(), processor->GetTypeName()); // times execution until this goes out of scope - processor->DoMap(x); - } - LOG_DEBUG(m_logger) << "JEventProcessorArrow '" << get_name() << "': Finished event# " << x->GetEventNumber() << LOG_END; - } - auto end_latency_time = std::chrono::steady_clock::now(); +void JEventProcessorArrow::process(Event* event, bool& success, JArrowMetrics::Status& status) { + - auto out_status = EventQueue::Status::Ready; - - if (success) { - if (m_output_queue != nullptr) { - // This is NOT the last arrow in the topology. Pass the event onwards. - out_status = m_output_queue->push(x, location_id); - } - else { - // This IS the last arrow in the topology. Notify the event source and return event to the pool. 
- if( auto es = x->GetJEventSource() ) es->DoFinish(*x); - m_pool->put(x, location_id); - } - } - auto end_queue_time = std::chrono::steady_clock::now(); - - JArrowMetrics::Status status; - if (in_status == EventQueue::Status::Empty) { - status = JArrowMetrics::Status::ComeBackLater; - } - else if (in_status == EventQueue::Status::Ready && out_status == EventQueue::Status::Ready) { - status = JArrowMetrics::Status::KeepGoing; + LOG_DEBUG(m_logger) << "JEventProcessorArrow '" << get_name() << "': Starting event# " << (*event)->GetEventNumber() << LOG_END; + for (JEventProcessor* processor : m_processors) { + // TODO: Move me into JEventProcessor::DoMap + JCallGraphEntryMaker cg_entry(*(*event)->GetJCallGraphRecorder(), processor->GetTypeName()); // times execution until this goes out of scope + processor->DoMap(*event); } - else { - status = JArrowMetrics::Status::ComeBackLater; - } - auto latency = (end_latency_time - start_latency_time); - auto overhead = (end_queue_time - start_total_time) - latency; - result.update(status, success, 1, latency, overhead); + LOG_DEBUG(m_logger) << "JEventProcessorArrow '" << get_name() << "': Finished event# " << (*event)->GetEventNumber() << LOG_END; + success = true; + status = JArrowMetrics::Status::KeepGoing; } void JEventProcessorArrow::initialize() { - LOG_DEBUG(m_logger) << "Initializing arrow '" << get_name() << "'" << LOG_END; for (auto processor : m_processors) { processor->DoInitialize(); @@ -92,15 +55,3 @@ void JEventProcessorArrow::finalize() { } } -size_t JEventProcessorArrow::get_pending() { - return m_input_queue->size(); -} - -size_t JEventProcessorArrow::get_threshold() { - return m_input_queue->get_threshold(); -} - -void JEventProcessorArrow::set_threshold(size_t threshold) { - m_input_queue->set_threshold(threshold); -} - diff --git a/src/libraries/JANA/Engine/JEventProcessorArrow.h b/src/libraries/JANA/Engine/JEventProcessorArrow.h index d48c4f6ec..e710fbcd2 100644 --- 
a/src/libraries/JANA/Engine/JEventProcessorArrow.h +++ b/src/libraries/JANA/Engine/JEventProcessorArrow.h @@ -7,25 +7,19 @@ #include -#include -#include +#include class JEventPool; -class JEventProcessorArrow : public JArrow { +using Event = std::shared_ptr; +using EventQueue = JMailbox; -public: - using Event = std::shared_ptr; - using EventQueue = JMailbox; +class JEventProcessorArrow : public JPipelineArrow { private: std::vector m_processors; - EventQueue* m_input_queue; - EventQueue* m_output_queue; - std::shared_ptr m_pool; public: - JEventProcessorArrow(std::string name, EventQueue *input_queue, EventQueue *output_queue, @@ -33,14 +27,10 @@ class JEventProcessorArrow : public JArrow { void add_processor(JEventProcessor* processor); + void process(Event* event, bool& success, JArrowMetrics::Status& status); + void initialize() final; void finalize() final; - void execute(JArrowMetrics& result, size_t location_id) final; - - size_t get_pending() final; - size_t get_threshold() final; - void set_threshold(size_t) final; - }; diff --git a/src/libraries/JANA/Engine/JEventSourceArrow.cc b/src/libraries/JANA/Engine/JEventSourceArrow.cc index 50cf07d90..defaccf38 100644 --- a/src/libraries/JANA/Engine/JEventSourceArrow.cc +++ b/src/libraries/JANA/Engine/JEventSourceArrow.cc @@ -16,94 +16,41 @@ JEventSourceArrow::JEventSourceArrow(std::string name, EventQueue* output_queue, std::shared_ptr pool ) - : JArrow(name, false, NodeType::Source) - , m_sources(sources) - , m_output_queue(output_queue) - , m_pool(pool) { + : JPipelineArrow(name, false, true, false, nullptr, output_queue, pool.get()), m_sources(sources) { } +void JEventSourceArrow::process(Event* event, bool& success, JArrowMetrics::Status& arrow_status) { -void JEventSourceArrow::execute(JArrowMetrics& result, size_t location_id) { - - JEventSource::ReturnStatus in_status = JEventSource::ReturnStatus::Success; - auto start_time = std::chrono::steady_clock::now(); - - auto chunksize = get_chunksize(); - auto 
reserved_count = m_output_queue->reserve(chunksize, location_id); - auto emit_count = reserved_count; - - if (reserved_count != chunksize) { - // Ensures that the source _only_ emits in increments of - // chunksize, which happens to come in very handy for - // processing entangled event blocks - in_status = JEventSource::ReturnStatus::TryAgain; - emit_count = 0; - LOG_DEBUG(m_logger) << "JEventSourceArrow asked for " << chunksize << ", but only reserved " << reserved_count << LOG_END; + // If there are no sources available then we are automatically finished. + if (m_sources.empty()) { + success = false; + arrow_status = JArrowMetrics::Status::Finished; + return; } - else { - for (size_t i=0; iget(location_id); - if (event == nullptr) { - in_status = JEventSource::ReturnStatus::TryAgain; - break; - } - - // If there are no sources available then we are automatically finished. - if( m_sources.empty() ) in_status = JEventSource::ReturnStatus::Finished; - - while (m_current_source < m_sources.size()) { - in_status = m_sources[m_current_source]->DoNext(event); - if (in_status == JEventSource::ReturnStatus::Finished) { - m_current_source++; - // TODO: Adjust nskip and nevents for the new source - } - else { - // This JEventSource isn't finished yet, so we obtained either Success or TryAgainLater - break; - } - } - if (in_status == JEventSource::ReturnStatus::Success) { - m_chunk_buffer.push_back(std::move(event)); - } - else { - m_pool->put(event, location_id); - } - } - } + while (m_current_source < m_sources.size()) { - auto latency_time = std::chrono::steady_clock::now(); - auto message_count = m_chunk_buffer.size(); - auto out_status = m_output_queue->push(m_chunk_buffer, reserved_count, location_id); - auto finished_time = std::chrono::steady_clock::now(); + auto source_status = m_sources[m_current_source]->DoNext(*event); - if (message_count != 0) { - LOG_DEBUG(m_logger) << "JEventSourceArrow '" << get_name() << "' [" << location_id << "]: " - << "Emitted " << 
message_count << " events; last GetEvent " - << ((in_status==JEventSource::ReturnStatus::Success) ? "succeeded" : "failed") - << LOG_END; - } - else { - LOG_TRACE(m_logger) << "JEventSourceArrow emitted nothing" << LOG_END; - } - - auto latency = latency_time - start_time; - auto overhead = finished_time - latency_time; - JArrowMetrics::Status status; - - if (in_status == JEventSource::ReturnStatus::Finished) { - // finish(); - // TODO: NWB: It is extra very important to not call finish() here - status = JArrowMetrics::Status::Finished; - } - else if (in_status == JEventSource::ReturnStatus::Success && out_status == EventQueue::Status::Ready) { - status = JArrowMetrics::Status::KeepGoing; - } - else { - status = JArrowMetrics::Status::ComeBackLater; + if (source_status == JEventSource::ReturnStatus::Finished) { + m_current_source++; + // TODO: Adjust nskip and nevents for the new source + } + else if (source_status == JEventSource::ReturnStatus::TryAgain){ + // This JEventSource isn't finished yet, so we obtained either Success or TryAgainLater + success = false; + arrow_status = JArrowMetrics::Status::ComeBackLater; + return; + } + else { + success = true; + arrow_status = JArrowMetrics::Status::KeepGoing; + return; + } } - result.update(status, message_count, 1, latency, overhead); + success = false; + arrow_status = JArrowMetrics::Status::Finished; } void JEventSourceArrow::initialize() { diff --git a/src/libraries/JANA/Engine/JEventSourceArrow.h b/src/libraries/JANA/Engine/JEventSourceArrow.h index 2f6172599..1d6d6bb96 100644 --- a/src/libraries/JANA/Engine/JEventSourceArrow.h +++ b/src/libraries/JANA/Engine/JEventSourceArrow.h @@ -6,29 +6,24 @@ #ifndef JANA2_JEVENTSOURCEARROW_H #define JANA2_JEVENTSOURCEARROW_H -#include -#include -#include +#include using Event = std::shared_ptr; -using EventQueue = JMailbox; - +using EventQueue = JMailbox; class JEventPool; -class JEventSourceArrow : public JArrow { +class JEventSourceArrow : public JPipelineArrow { private: 
std::vector m_sources; size_t m_current_source = 0; - EventQueue* m_output_queue; - std::shared_ptr m_pool; - std::vector m_chunk_buffer; public: JEventSourceArrow(std::string name, std::vector sources, EventQueue* output_queue, std::shared_ptr pool); void initialize() final; void finalize() final; - void execute(JArrowMetrics& result, size_t location_id) final; + + void process(Event* event, bool& success, JArrowMetrics::Status& status); }; #endif //JANA2_JEVENTSOURCEARROW_H diff --git a/src/libraries/JANA/Engine/JJunctionArrow.h b/src/libraries/JANA/Engine/JJunctionArrow.h new file mode 100644 index 000000000..a48975fc2 --- /dev/null +++ b/src/libraries/JANA/Engine/JJunctionArrow.h @@ -0,0 +1,101 @@ +// Copyright 2023, Jefferson Science Associates, LLC. +// Subject to the terms in the LICENSE file found in the top-level directory. + +#pragma once + +#include +#include +#include + + + +template +class JJunctionArrow : public JArrow { + +protected: + PlaceRef first_input {this}; + PlaceRef first_output {this}; + PlaceRef second_input {this}; + PlaceRef second_output {this}; + +public: + using Status = JArrowMetrics::Status; + + JJunctionArrow(std::string name, + bool is_parallel, + bool is_source, + bool is_sink + ) + : JArrow(std::move(name), is_parallel, is_source, is_sink) + { + } + + bool try_pull_all(Data& fi, Data& fo, Data& si, Data& so) { + + bool success; + success = first_input.pull(fi); + if (! success) { + return false; + } + success = first_output.pull(fo); + if (! success) { + first_input.revert(fi); + return false; + } + success = second_input.pull(si); + if (! success) { + first_input.revert(fi); + first_output.revert(fo); + return false; + } + success = second_output.pull(so); + if (! 
success) { + first_input.revert(fi); + first_output.revert(fo); + second_input.revert(si); + return false; + } + return true; + } + + size_t push_all(Data& fi, Data& fo, Data& si, Data& so) { + size_t message_count = 0; + message_count += first_input.push(fi); + message_count += first_output.push(fo); + message_count += second_input.push(si); + message_count += second_output.push(so); + return message_count; + } + + void execute(JArrowMetrics& result, size_t location_id) final { + + auto start_total_time = std::chrono::steady_clock::now(); + + Data first_input_data {location_id}; + Data first_output_data {location_id}; + Data second_input_data {location_id}; + Data second_output_data {location_id}; + + bool success = try_pull_all(first_input_data, first_output_data, second_input_data, second_output_data); + if (success) { + + auto start_processing_time = std::chrono::steady_clock::now(); + auto process_status = static_cast(this)->process(first_input_data, first_output_data, second_input_data, second_output_data); + auto end_processing_time = std::chrono::steady_clock::now(); + size_t events_processed = push_all(first_input_data, first_output_data, second_input_data, second_output_data); + + auto end_total_time = std::chrono::steady_clock::now(); + auto latency = (end_processing_time - start_processing_time); + auto overhead = (end_total_time - start_total_time) - latency; + result.update(process_status, events_processed, 1, latency, overhead); + return; + } + else { + auto end_total_time = std::chrono::steady_clock::now(); + result.update(JArrowMetrics::Status::ComeBackLater, 0, 1, std::chrono::milliseconds(0), end_total_time - start_total_time); + return; + } + } +}; + + diff --git a/src/libraries/JANA/Engine/JMailbox.h b/src/libraries/JANA/Engine/JMailbox.h index 2ca6432e1..521287e77 100644 --- a/src/libraries/JANA/Engine/JMailbox.h +++ b/src/libraries/JANA/Engine/JMailbox.h @@ -20,36 +20,47 @@ /// - the underlying queue may be shared by all threads, 
NUMA-domain-local, or thread-local /// - the Arrow doesn't have to know anything about locality. /// -/// To handle memory locality at different granularities, we introduce the concept of a domain. -/// Each thread belongs to exactly one domain. Domains are represented by contiguous unsigned +/// To handle memory locality at different granularities, we introduce the concept of a location. +/// Each thread belongs to exactly one location, represented by contiguous unsigned /// ints starting at 0. While JArrows are wired to one logical JMailbox, JWorkers interact with -/// the physical DomainLocalMailbox corresponding to their very own memory domain. +/// the physical LocalQueue corresponding to their location. Locations prevent events from crossing +/// NUMA domains as they get picked up by different JWorker threads. /// /// \tparam T must be moveable. Usually this is unique_ptr. /// /// Improvements: -/// 1. Pad DomainLocalMailbox +/// 1. Pad LocalQueue /// 2. Enable work stealing -/// 3. Triple mutex trick to give push() priority? 
-template -class JMailbox { +class JQueue { +protected: + size_t m_capacity; + size_t m_locations_count; + bool m_enable_work_stealing = false; + +public: + inline size_t get_threshold() { return m_capacity; } + inline size_t get_locations_count() { return m_locations_count; } + inline bool is_work_stealing_enabled() { return m_enable_work_stealing; } + + + inline JQueue(size_t threshold, size_t locations_count, bool enable_work_stealing) + : m_capacity(threshold), m_locations_count(locations_count), m_enable_work_stealing(enable_work_stealing) {} + virtual ~JQueue() = default; +}; -private: +template +class JMailbox : public JQueue { - struct LocalMailbox { + struct LocalQueue { std::mutex mutex; std::deque queue; size_t reserved_count = 0; }; // TODO: Copy these params into DLMB for better locality - size_t m_threshold; - size_t m_locations_count; - bool m_enable_work_stealing = false; - std::unique_ptr m_mailboxes; - JLogger m_logger; + std::unique_ptr m_queues; public: @@ -68,36 +79,37 @@ class JMailbox { /// threshold: the (soft) maximum number of items in the queue at any time - /// domain_count: the number of domains - /// enable_work_stealing: allow domains to pop from other domains' queues when theirs is empty + /// locations_count: the number of locations. More locations = better NUMA performance, worse load balancing + /// enable_work_stealing: allow events to cross locations only when no other work is available. Improves aforementioned load balancing. 
JMailbox(size_t threshold=100, size_t locations_count=1, bool enable_work_stealing=false) - : m_threshold(threshold) - , m_locations_count(locations_count) - , m_enable_work_stealing(enable_work_stealing) { + : JQueue(threshold, locations_count, enable_work_stealing) { - m_mailboxes = std::unique_ptr(new LocalMailbox[locations_count]); + m_queues = std::unique_ptr(new LocalQueue[locations_count]); } virtual ~JMailbox() { - //delete [] m_mailboxes; + //delete [] m_queues; } - /// size() counts the number of items in the queue across all domains + // We can do this (for now) because we use a deque underneath, so threshold is 'soft' + inline void set_threshold(size_t threshold) { m_capacity = threshold; } + + /// size() counts the number of items in the queue across all locations /// This should be used sparingly because it will mess up a bunch of caches. /// Meant to be used by measure_perf() size_t size() { size_t result = 0; for (size_t i = 0; i lock(m_mailboxes[i].mutex); - result += m_mailboxes[i].queue.size(); + std::lock_guard lock(m_queues[i].mutex); + result += m_queues[i].queue.size(); } return result; }; - /// size(domain) counts the number of items in the queue for a particular domain + /// size(location_id) counts the number of items in the queue for a particular location /// Meant to be used by Scheduler::next_assignment() and measure_perf(), eventually - size_t size(size_t domain) { - return m_mailboxes[domain].queue.size(); + size_t size(size_t location_id) { + return m_queues[location_id].queue.size(); } /// reserve(requested_count) keeps our queues bounded in size. The caller should @@ -107,11 +119,11 @@ class JMailbox { /// reserved on the output queue. Note that because the input queue may return /// fewer items than requested, the caller must push their original reserved_count /// alongside the items, to avoid a "reservation leak". 
- size_t reserve(size_t requested_count, size_t domain = 0) { + size_t reserve(size_t requested_count, size_t location_id = 0) { - LocalMailbox& mb = m_mailboxes[domain]; + LocalQueue& mb = m_queues[location_id]; std::lock_guard lock(mb.mutex); - size_t doable_count = m_threshold - mb.queue.size() - mb.reserved_count; + size_t doable_count = m_capacity - mb.queue.size() - mb.reserved_count; if (doable_count > 0) { size_t reservation = std::min(doable_count, requested_count); mb.reserved_count += reservation; @@ -120,45 +132,32 @@ class JMailbox { return 0; }; - /// push(items, reserved_count, domain) This function will always + /// push(items, reserved_count, location_id) This function will always /// succeed, although it may exceed the threshold if the caller didn't reserve /// space, and it may take a long time because it will wait on a mutex. /// Note that if the caller had called reserve(), they must pass in the reserved_count here. - Status push(std::vector& buffer, size_t reserved_count = 0, size_t domain = 0) { + Status push(std::vector& buffer, size_t reserved_count = 0, size_t location_id = 0) { - auto& mb = m_mailboxes[domain]; + auto& mb = m_queues[location_id]; std::lock_guard lock(mb.mutex); mb.reserved_count -= reserved_count; for (const T& t : buffer) { mb.queue.push_back(std::move(t)); } buffer.clear(); - if (mb.queue.size() > m_threshold) { + if (mb.queue.size() > m_capacity) { return Status::Full; } return Status::Ready; } - Status push(T& item, size_t reserved_count = 0, size_t domain = 0) { - auto& mb = m_mailboxes[domain]; - std::lock_guard lock(mb.mutex); - mb.reserved_count -= reserved_count; - mb.queue.push_back(std::move(item)); - size_t size = mb.queue.size(); - if (size > m_threshold) { - return Status::Full; - } - return Status::Ready; - } - - - /// pop() will pop up to requested_count items for the desired domain. + /// pop() will pop up to requested_count items for the desired location_id. 
/// If many threads are contending for the queue, this will fail with Status::Contention, /// in which case the caller should probably consult the Scheduler. Status pop(std::vector& buffer, size_t requested_count, size_t location_id = 0) { - auto& mb = m_mailboxes[location_id]; + auto& mb = m_queues[location_id]; if (!mb.mutex.try_lock()) { return Status::Congested; } @@ -170,7 +169,7 @@ class JMailbox { } auto size = mb.queue.size(); mb.mutex.unlock(); - if (size >= m_threshold) { + if (size >= m_capacity) { return Status::Full; } else if (size != 0) { @@ -183,7 +182,7 @@ class JMailbox { Status pop(T& item, bool& success, size_t location_id = 0) { success = false; - auto& mb = m_mailboxes[location_id]; + auto& mb = m_queues[location_id]; if (!mb.mutex.try_lock()) { return Status::Congested; } @@ -207,8 +206,85 @@ class JMailbox { } - size_t get_threshold() { return m_threshold; } - void set_threshold(size_t threshold) { m_threshold = threshold; } + + + bool try_push(T* buffer, size_t count, size_t location_id = 0) { + auto& mb = m_queues[location_id]; + std::lock_guard lock(mb.mutex); + if (mb.queue.size() + count > m_capacity) return false; + for (size_t i=0; i lock(mb.mutex); + assert(reserved_count <= mb.reserved_count); + assert(mb.queue.size() + count <= m_capacity); + mb.reserved_count -= reserved_count; + for (size_t i=0; i lock(mb.mutex); + + if (mb.queue.size() < min_requested_count) return 0; + + auto nitems = std::min(max_requested_count, mb.queue.size()); + + for (size_t i=0; i lock(mb.mutex); + + if (mb.queue.size() < min_requested_count) return 0; + + auto nitems = std::min(max_requested_count, mb.queue.size()); + mb.reserved_count += nitems; + + for (size_t i=0; i lock(mb.mutex); + size_t available_count = m_capacity - mb.queue.size() - mb.reserved_count; + size_t count = std::min(available_count, max_requested_count); + if (count < min_requested_count) { + return 0; + } + mb.reserved_count += count; + return count; + }; + + void unreserve(size_t 
reserved_count, size_t location_id) { + + LocalQueue& mb = m_queues[location_id]; + std::lock_guard lock(mb.mutex); + assert(reserved_count <= mb.reserved_count); + mb.reserved_count -= reserved_count; + }; }; diff --git a/src/libraries/JANA/Engine/JMultithreading.cc b/src/libraries/JANA/Engine/JMultithreading.cc deleted file mode 100644 index 3a80c1b6a..000000000 --- a/src/libraries/JANA/Engine/JMultithreading.cc +++ /dev/null @@ -1,2 +0,0 @@ - -#include "JMultithreading.h" diff --git a/src/libraries/JANA/Engine/JMultithreading.h b/src/libraries/JANA/Engine/JMultithreading.h deleted file mode 100644 index d882b102f..000000000 --- a/src/libraries/JANA/Engine/JMultithreading.h +++ /dev/null @@ -1,48 +0,0 @@ - - -#pragma once -#include -#include "JArrow.h" -#include "JArrowTopology.h" - -class JMultithreading { - - - std::mutex m_mutex; - - - struct ArrowState { - enum class Status { Unopened, Running, Paused, Finished }; - - JArrow* arrow_ptr; - Status status = Status::Unopened; - bool is_parallel; - int running_upstreams = 0; - int running_instances = 0; - std::vector listeners; - } - - std::vector arrow_states; - int backoff_tries; - int running_arrows; - - std::shared_ptr m_topology; - size_t m_next_idx; - - - -public: - // Previously on JArrow - // Called by Arrow subclasses - void run_arrow(JArrow* arrow); - void pause_arrow(JArrow* arrow); - void finish_arrow(JArrow* arrow); - - // - - - -}; - - -std::ostream& operator<<(std::ostream& os, const JMultithreading::Status& s) diff --git a/src/libraries/JANA/Engine/JPipelineArrow.h b/src/libraries/JANA/Engine/JPipelineArrow.h new file mode 100644 index 000000000..7c62733d8 --- /dev/null +++ b/src/libraries/JANA/Engine/JPipelineArrow.h @@ -0,0 +1,85 @@ + +// Copyright 2023, Jefferson Science Associates, LLC. +// Subject to the terms in the LICENSE file found in the top-level directory. 
+ +#pragma once + +#include +#include +#include + +template +class JPipelineArrow : public JArrow { +private: + PlaceRef m_input {this, true, 1, 1}; + PlaceRef m_output {this, false, 1, 1}; + +public: + JPipelineArrow(std::string name, + bool is_parallel, + bool is_source, + bool is_sink, + JMailbox* input_queue, + JMailbox* output_queue, + JPool* pool + ) + : JArrow(std::move(name), is_parallel, is_source, is_sink) { + + if (input_queue == nullptr) { + assert(pool != nullptr); + m_input.set_pool(pool); + } + else { + m_input.set_queue(input_queue); + } + if (output_queue == nullptr) { + assert(pool != nullptr); + m_output.set_pool(pool); + } + else { + m_output.set_queue(output_queue); + } + } + + void execute(JArrowMetrics& result, size_t location_id) final { + + auto start_total_time = std::chrono::steady_clock::now(); + + Data in_data {location_id}; + Data out_data {location_id}; + + bool success = m_input.pull(in_data) && m_output.pull(out_data); + if (!success) { + m_input.revert(in_data); + m_output.revert(out_data); + // TODO: Test that revert works properly + + auto end_total_time = std::chrono::steady_clock::now(); + result.update(JArrowMetrics::Status::ComeBackLater, 0, 1, std::chrono::milliseconds(0), end_total_time - start_total_time); + return; + } + + bool process_succeeded = true; + JArrowMetrics::Status process_status = JArrowMetrics::Status::KeepGoing; + assert(in_data.item_count == 1); + MessageT* event = in_data.items[0]; + + auto start_processing_time = std::chrono::steady_clock::now(); + static_cast(this)->process(event, process_succeeded, process_status); + auto end_processing_time = std::chrono::steady_clock::now(); + + if (process_succeeded) { + in_data.item_count = 0; + out_data.item_count = 1; + out_data.items[0] = event; + } + m_input.push(in_data); + m_output.push(out_data); + + // Publish metrics + auto end_total_time = std::chrono::steady_clock::now(); + auto latency = (end_processing_time - start_processing_time); + auto overhead = 
(end_total_time - start_total_time) - latency; + result.update(process_status, process_succeeded, 1, latency, overhead); + } +}; diff --git a/src/libraries/JANA/Engine/JPool.h b/src/libraries/JANA/Engine/JPool.h new file mode 100644 index 000000000..83bd2b437 --- /dev/null +++ b/src/libraries/JANA/Engine/JPool.h @@ -0,0 +1,209 @@ +// Copyright 2023, Jefferson Science Associates, LLC. +// Subject to the terms in the LICENSE file found in the top-level directory. + +#pragma once +#include +#include +#include + + +class JPoolBase { +protected: + size_t m_pool_size; + size_t m_location_count; + bool m_limit_total_events_in_flight; +public: + JPoolBase( + size_t pool_size, + size_t location_count, + bool limit_total_events_in_flight) + : m_pool_size(pool_size) + , m_location_count(location_count) + , m_limit_total_events_in_flight(limit_total_events_in_flight) {} + + virtual ~JPoolBase() = default; +}; + +template +class JPool : public JPoolBase { +private: + struct alignas(JANA2_CACHE_LINE_BYTES) LocalPool { + std::mutex mutex; + std::vector available_items; + std::vector items; + }; + + std::unique_ptr m_pools; + +public: + JPool(size_t pool_size, + size_t location_count, + bool limit_total_events_in_flight) : JPoolBase(pool_size, location_count, limit_total_events_in_flight) + { + assert(m_location_count >= 1); + assert(m_pool_size > 0 || !m_limit_total_events_in_flight); + } + + virtual ~JPool() = default; + + void init() { + m_pools = std::unique_ptr(new LocalPool[m_location_count]()); + + for (size_t j=0; j(m_pool_size); // Default-construct everything in place + + for (T& item : m_pools[j].items) { + configure_item(&item); + m_pools[j].available_items.push_back(&item); + } + } + } + + virtual void configure_item(T*) { + } + + virtual void release_item(T*) { + } + + + T* get(size_t location=0) { + + assert(m_pools != nullptr); // If you hit this, you forgot to call init(). 
+ LocalPool& pool = m_pools[location % m_location_count]; + std::lock_guard lock(pool.mutex); + + if (pool.available_items.empty()) { + if (m_limit_total_events_in_flight) { + return nullptr; + } + else { + auto t = new T; + configure_item(t); + return t; + } + } + else { + T* item = pool.available_items.back(); + pool.available_items.pop_back(); + return item; + } + } + + + void put(T* item, size_t location=0) { + + assert(m_pools != nullptr); // If you hit this, you forgot to call init(). + + // Do any necessary teardown within the item itself + release_item(item); + + // Consider each location starting with current one + for (size_t l = location; l= &(pool.items[0])) && (item <= &(pool.items[m_pool_size-1]))) { + std::lock_guard lock(pool.mutex); + pool.available_items.push_back(item); + return; + } + + } + // Otherwise it was allocated on the heap + delete item; + } + + // TODO: This is wrong. Do we use this anywhere? + size_t size() { return m_pool_size; } + + // TODO: Remove me + bool get_many(std::vector& dest, size_t count, size_t location=0) { + + assert(m_pools != nullptr); // If you hit this, you forgot to call init(). + + LocalPool& pool = m_pools[location % m_location_count]; + std::lock_guard lock(pool.mutex); + + if (m_limit_total_events_in_flight && pool.available_items.size() < count) { + return false; + } + else { + while (count > 0 && !pool.available_items.empty()) { + T* t = pool.available_items.back(); + pool.available_items.pop_back(); + dest.push_back(t); + count -= 1; + } + while (count > 0) { + auto t = new T; + configure_item(t); + dest.push_back(t); + count -= 1; + } + return true; + } + } + + // TODO: Remove me + void put_many(std::vector& finished_events, size_t location=0) { + for (T* item : finished_events) { + put(item, location); + } + } + + + size_t pop(T** dest, size_t min_count, size_t max_count, size_t location=0) { + + assert(m_pools != nullptr); // If you hit this, you forgot to call init(). 
+ + LocalPool& pool = m_pools[location % m_location_count]; + std::lock_guard lock(pool.mutex); + + size_t available_count = pool.available_items.size(); + + if (m_limit_total_events_in_flight && available_count < min_count) { + // Exit immmediately if we can't reach the minimum + return 0; + } + if (m_limit_total_events_in_flight) { + // Return as many as we can. We aren't allowed to create any more + size_t count = std::min(available_count, max_count); + for (size_t i=0; iget_type() == JArrow::NodeType::Source && + assignment->is_source() && last_result == JArrowMetrics::Status::Finished && // Only Sources get to declare themselves finished! as.status == ArrowStatus::Active; // We only want to deactivate once bool found_draining_stage_or_sink = - assignment->get_type() != JArrow::NodeType::Source && // We aren't a source + !assignment->is_source() && // We aren't a source as.active_or_draining_upstream_arrow_count == 0 && // All upstreams arrows are inactive assignment->get_pending() == 0 && // All upstream queues are empty as.thread_count > 0 && // There are other workers still assigned to this arrow @@ -95,7 +95,7 @@ void JScheduler::checkin_unprotected(JArrow* assignment, JArrowMetrics::Status l bool found_inactive_stage_or_sink = - assignment->get_type() != JArrow::NodeType::Source && // We aren't a source + !assignment->is_source() && // We aren't a source as.active_or_draining_upstream_arrow_count == 0 && // All upstreams arrows are inactive assignment->get_pending() == 0 && // All upstream queues are empty as.thread_count == 0 && // There are NO other workers still assigned to this arrow @@ -179,7 +179,7 @@ void JScheduler::drain_topology() { // for (size_t i=0; iget_type() == JArrow::NodeType::Source) { + if (as.arrow->is_source()) { pause_arrow_unprotected(i); } } @@ -196,14 +196,20 @@ void JScheduler::run_topology(int nthreads) { } LOG_DEBUG(logger) << "JScheduler: run_topology() : " << current_status << " => Running" << LOG_END; - if 
(m_topology->sources.empty()) { + bool source_found = false; + for (JArrow* arrow : m_topology->arrows) { + if (arrow->is_source()) { + source_found = true; + } + } + if (!source_found) { throw JException("No event sources found!"); } for (size_t i=0; iget_type() == JArrow::NodeType::Source) { + if (as.arrow->is_source()) { run_arrow_unprotected(i); } } @@ -381,8 +387,9 @@ void JScheduler::summarize_arrows(std::vector& summaries) { auto& summary = summaries[i]; summary.arrow_name = as.arrow->get_name(); - summary.arrow_type = as.arrow->get_type(); summary.is_parallel = as.arrow->is_parallel(); + summary.is_source = as.arrow->is_source(); + summary.is_sink = as.arrow->is_sink(); summary.chunksize = as.arrow->get_chunksize(); summary.messages_pending = as.arrow->get_pending(); summary.threshold = as.arrow->get_threshold(); diff --git a/src/libraries/JANA/Engine/JSubeventArrow.h b/src/libraries/JANA/Engine/JSubeventArrow.h index 71f003967..c54d6e589 100644 --- a/src/libraries/JANA/Engine/JSubeventArrow.h +++ b/src/libraries/JANA/Engine/JSubeventArrow.h @@ -10,7 +10,7 @@ #include "JArrow.h" #include -/// SubtaskProcessor offers sub-event-level parallelism. The idea is to split parent +/// SubeventProcessor offers sub-event-level parallelism. The idea is to split parent /// event S into independent subtasks T, and automatically bundling them with /// bookkeeping information X onto a Queue. 
process :: T -> U handles the stateless, /// parallel parts; its Arrow pushes messages on to a Queue, so that merge() :: S -> [U] -> V @@ -36,6 +36,21 @@ struct JSubeventProcessor { }; +template +struct SubeventWrapper { + + std::shared_ptr* parent; + SubeventT* data; + size_t id; + size_t total; + + SubeventWrapper(std::shared_ptr* parent, SubeventT* data, size_t id, size_t total) + : parent(std::move(parent)) + , data(data) + , id(id) + , total(total) {} +}; + template class JSubeventArrow : public JArrow { @@ -47,7 +62,7 @@ class JSubeventArrow : public JArrow { JSubeventProcessor* processor, JMailbox>* inbox, JMailbox>* outbox) - : JArrow(name, true, NodeType::Stage), m_processor(processor), m_inbox(inbox), m_outbox(outbox) { + : JArrow(name, true, false, false), m_processor(processor), m_inbox(inbox), m_outbox(outbox) { } size_t get_pending() final { return m_inbox->size(); }; @@ -60,14 +75,14 @@ class JSubeventArrow : public JArrow { template class JSplitArrow : public JArrow { JSubeventProcessor* m_processor; - JMailbox>* m_inbox; + JMailbox*>* m_inbox; JMailbox>* m_outbox; public: JSplitArrow(std::string name, JSubeventProcessor* processor, - JMailbox>* inbox, + JMailbox*>* inbox, JMailbox>* outbox) - : JArrow(name, true, NodeType::Stage), m_processor(processor), m_inbox(inbox), m_outbox(outbox) { + : JArrow(name, true, false, false), m_processor(processor), m_inbox(inbox), m_outbox(outbox) { } size_t get_pending() final { return m_inbox->size(); }; @@ -81,14 +96,14 @@ template class JMergeArrow : public JArrow { JSubeventProcessor* m_processor; JMailbox>* m_inbox; - JMailbox>* m_outbox; - std::map, size_t> m_in_progress; + JMailbox*>* m_outbox; + std::map*, size_t> m_in_progress; public: JMergeArrow(std::string name, JSubeventProcessor* processor, JMailbox>* inbox, - JMailbox>* outbox) - : JArrow(name, false, NodeType::Stage), m_processor(processor), m_inbox(inbox), m_outbox(outbox) { + JMailbox*>* outbox) + : JArrow(name, false, false, false), 
m_processor(processor), m_inbox(inbox), m_outbox(outbox) { } size_t get_pending() final { return m_inbox->size(); }; @@ -101,11 +116,11 @@ class JMergeArrow : public JArrow { template void JSplitArrow::execute(JArrowMetrics& result, size_t location_id) { - using InQueue = JMailbox>; + using InQueue = JMailbox*>; using OutQueue = JMailbox>; auto start_total_time = std::chrono::steady_clock::now(); - std::shared_ptr event = nullptr; + std::shared_ptr* event = nullptr; bool success; size_t reserved_size = m_outbox->reserve(get_chunksize()); size_t actual_size = reserved_size; @@ -118,7 +133,7 @@ void JSplitArrow::execute(JArrowMetrics& result, size_t locatio if (success) { // Construct prereqs - std::vector originals = event->Get(m_processor->inputTag); + std::vector originals = (*event)->Get(m_processor->inputTag); size_t i = 1; actual_size = originals.size(); @@ -191,7 +206,7 @@ void JSubeventArrow::execute(JArrowMetrics& result, size_t loca template void JMergeArrow::execute(JArrowMetrics& result, size_t location_id) { using InQueue = JMailbox>; - using OutQueue = JMailbox>; + using OutQueue = JMailbox*>; auto start_total_time = std::chrono::steady_clock::now(); @@ -201,14 +216,16 @@ void JMergeArrow::execute(JArrowMetrics& result, size_t locatio auto in_status = m_inbox->pop(inputs, downstream_accepts, location_id); auto start_latency_time = std::chrono::steady_clock::now(); - std::vector> outputs; + std::vector*> outputs; for (const auto& input : inputs) { + LOG_TRACE(m_logger) << "JMergeArrow: Processing input with parent=" << input.parent << ", evt=" << (*(input.parent))->GetEventNumber() << ", sub=" << input.id << " and total=" << input.total << LOG_END; // Problem: Are we sure we are updating the event in a way which is effectively thread-safe? // Should we be doing this insert, or should the caller? 
- input.parent->template Insert(input.data); + (*(input.parent))->template Insert(input.data); if (input.total == 1) { // Goes straight into "ready" outputs.push_back(input.parent); + LOG_TRACE(m_logger) << "JMergeArrow: Finished parent=" << input.parent << ", evt=" << (*(input.parent))->GetEventNumber() << LOG_END; } else { auto pair = m_in_progress.find(input.parent); @@ -222,6 +239,7 @@ void JMergeArrow::execute(JArrowMetrics& result, size_t locatio else if (pair->second == 1) { pair->second -= 1; outputs.push_back(input.parent); + LOG_TRACE(m_logger) << "JMergeArrow: Finished parent=" << input.parent << ", evt=" << (*(input.parent))->GetEventNumber() << LOG_END; } else { pair->second -= 1; @@ -229,18 +247,16 @@ void JMergeArrow::execute(JArrowMetrics& result, size_t locatio } } } + LOG_DEBUG(m_logger) << "MergeArrow consumed " << inputs.size() << " subevents, produced " << outputs.size() << " events" << LOG_END; auto end_latency_time = std::chrono::steady_clock::now(); - auto out_status = OutQueue::Status::Ready; - size_t outputs_size = outputs.size(); - if (outputs_size > 0) { - assert(m_outbox != nullptr); - out_status = m_outbox->push(outputs, downstream_accepts, location_id); - } + auto outputs_size = outputs.size(); + auto out_status = m_outbox->push(outputs, downstream_accepts, location_id); + auto end_queue_time = std::chrono::steady_clock::now(); JArrowMetrics::Status status; - if (in_status == InQueue::Status::Ready && out_status == OutQueue::Status::Ready) { + if (in_status == InQueue::Status::Ready && out_status == OutQueue::Status::Ready && inputs.size() > 0) { status = JArrowMetrics::Status::KeepGoing; } else { diff --git a/src/libraries/JANA/Engine/JSubeventMailbox.h b/src/libraries/JANA/Engine/JSubeventMailbox.h deleted file mode 100644 index fe9a2f6ec..000000000 --- a/src/libraries/JANA/Engine/JSubeventMailbox.h +++ /dev/null @@ -1,226 +0,0 @@ - -// Copyright 2020, Jefferson Science Associates, LLC. 
-// Subject to the terms in the LICENSE file found in the top-level directory. - - -#ifndef JANA2_JSUBEVENTMAILBOX_H -#define JANA2_JSUBEVENTMAILBOX_H - -#include -#include -#include -#include -#include - -template -struct SubeventWrapper { - - std::shared_ptr parent; - SubeventT* data; - size_t id; - size_t total; - - SubeventWrapper(std::shared_ptr parent, SubeventT* data, size_t id, size_t total) - : parent(std::move(parent)) - , data(data) - , id(id) - , total(total) {} -}; - - -template -class JSubeventMailbox { - -private: - - struct alignas(JANA2_CACHE_LINE_BYTES) LocalMailbox { - std::mutex mutex; - std::deque> ready; - std::map, size_t> in_progress; - size_t reserved_count = 0; - }; - - size_t m_threshold; - size_t m_locations_count; - bool m_enable_work_stealing = false; - std::unique_ptr m_mailboxes; - JLogger m_logger; - -public: - - enum class Status {Ready, Congested, Empty, Full, Finished}; - - friend std::ostream& operator<<(std::ostream& os, const Status& s) { - switch (s) { - case Status::Ready: os << "Ready"; break; - case Status::Congested: os << "Congested"; break; - case Status::Empty: os << "Empty"; break; - case Status::Full: os << "Full"; break; - case Status::Finished: os << "Finished"; break; - default: os << "Unknown"; break; - } - return os; - } - - - /// threshold: the (soft) maximum number of items in the queue at any time - /// domain_count: the number of domains - /// enable_work_stealing: allow domains to pop from other domains' queues when theirs is empty - JSubeventMailbox(size_t threshold=100, size_t locations_count=1, bool enable_work_stealing=false) - : m_threshold(threshold) - , m_locations_count(locations_count) - , m_enable_work_stealing(enable_work_stealing) { - - m_mailboxes = std::unique_ptr(new LocalMailbox[locations_count]); - } - - virtual ~JSubeventMailbox() { - //delete [] m_mailboxes; - } - - /// size() counts the number of items in the queue across all domains - /// This should be used sparingly because it will 
mess up a bunch of caches. - /// Meant to be used by measure_perf() - size_t size() { - size_t result = 0; - for (size_t i = 0; i lock(m_mailboxes[i].mutex); - result += m_mailboxes[i].queue.size(); - } - return result; - }; - - /// size(domain) counts the number of items in the queue for a particular domain - /// Meant to be used by Scheduler::next_assignment() and measure_perf(), eventually - size_t size(size_t domain) { - return m_mailboxes[domain].queue.size(); - } - - /// reserve(requested_count) keeps our queues bounded in size. The caller should - /// reserve their desired chunk size on the output queue first. The output - /// queue will return a reservation which is less than or equal to requested_count. - /// The caller may then request as many items from the input queue as have been - /// reserved on the output queue. Note that because the input queue may return - /// fewer items than requested, the caller must push their original reserved_count - /// alongside the items, to avoid a "reservation leak". - size_t reserve(size_t requested_count, size_t domain = 0) { - - LocalMailbox& mb = m_mailboxes[domain]; - std::lock_guard lock(mb.mutex); - size_t doable_count = m_threshold - mb.queue.size() - mb.reserved_count; - if (doable_count > 0) { - size_t reservation = std::min(doable_count, requested_count); - mb.reserved_count += reservation; - return reservation; - } - return 0; - }; - - /// push(items, reserved_count, domain) This function will always - /// succeed, although it may exceed the threshold if the caller didn't reserve - /// space, and it may take a long time because it will wait on a mutex. - /// Note that if the caller had called reserve(), they must pass in the reserved_count here. 
- Status push(std::vector>& buffer, size_t reserved_count = 0, size_t domain = 0) { - - auto& mb = m_mailboxes[domain]; - std::lock_guard lock(mb.mutex); - mb.reserved_count -= reserved_count; - for (const auto& subevent : buffer) { - - // Problem: Are we sure we are updating the event in a way which is effectively thread-safe? - // Should we be doing this insert, or should the caller? - subevent.parent->template Insert(subevent.data); - if (subevent.total == 1) { - // Goes straight into "ready" - mb.ready.push_back(subevent.parent); - } - else { - auto pair = mb.in_progress.find(subevent.parent); - if (pair == mb.in_progress.end()) { - mb.in_progress[subevent.parent] = subevent.total-1; - } - else { - if (pair->second == 1) { - mb.ready.push_back(subevent.parent); - } - else { - pair->second -= 1; - } - } - } - } - buffer.clear(); - // if (mb.in_progress.size() > m_threshold) { - // return Status::Full; - // } - return Status::Ready; - } - - - /// pop() will pop up to requested_count items for the desired domain. - /// If many threads are contending for the queue, this will fail with Status::Contention, - /// in which case the caller should probably consult the Scheduler. 
- Status pop(std::vector>& buffer, size_t requested_count, size_t location_id = 0) { - - auto& mb = m_mailboxes[location_id]; - if (!mb.mutex.try_lock()) { - return Status::Congested; - } - auto nitems = std::min(requested_count, mb.ready.size()); - buffer.reserve(nitems); - for (size_t i=0; i= m_threshold) { - return Status::Full; - } - else if (size != 0) { - return Status::Ready; - } - return Status::Empty; - } - -#if 0 - Status pop(std::shared_ptr item, bool& success, size_t location_id = 0) { - - success = false; - auto& mb = m_mailboxes[location_id]; - if (!mb.mutex.try_lock()) { - return Status::Congested; - } - size_t nitems = mb.ready.size(); - if (nitems > 1) { - item = std::move(mb.ready.front()); - mb.ready.pop_front(); - success = true; - mb.mutex.unlock(); - return Status::Ready; - } - else if (nitems == 1) { - item = std::move(mb.ready.front()); - mb.ready.pop_front(); - success = true; - mb.mutex.unlock(); - return Status::Empty; - } - else if (is_active()) { - mb.mutex.unlock(); - return Status::Empty; - } - mb.mutex.unlock(); - return Status::Finished; - } - -#endif - - size_t get_threshold() { return m_threshold; } - void set_threshold(size_t threshold) { m_threshold = threshold; } - -}; - - - -#endif //JANA2_JSUBEVENTMAILBOX_H diff --git a/src/libraries/JANA/Engine/JTopologyBuilder.h b/src/libraries/JANA/Engine/JTopologyBuilder.h index 512402828..cd1a0783a 100644 --- a/src/libraries/JANA/Engine/JTopologyBuilder.h +++ b/src/libraries/JANA/Engine/JTopologyBuilder.h @@ -124,6 +124,7 @@ class JTopologyBuilder : public JService { m_event_pool_size, m_location_count, m_limit_total_events_in_flight); + m_topology->event_pool->init(); return m_topology; } @@ -161,9 +162,7 @@ class JTopologyBuilder : public JService { // Create arrow for sources. 
JArrow *arrow = new JEventSourceArrow("sources", m_components->get_evt_srces(), queue, m_topology->event_pool); - arrow->set_backoff_tries(0); m_topology->arrows.push_back(arrow); - m_topology->sources.push_back(arrow); arrow->set_chunksize(m_event_source_chunksize); arrow->set_logger(m_arrow_logger); @@ -176,10 +175,7 @@ class JTopologyBuilder : public JService { for (auto proc: m_components->get_evt_procs()) { proc_arrow->add_processor(proc); } - for (auto src_arrow : m_topology->sources) { - src_arrow->attach(proc_arrow); - } - m_topology->sinks.push_back(proc_arrow); + arrow->attach(proc_arrow); return m_topology; } diff --git a/src/libraries/JANA/Engine/JWorker.cc b/src/libraries/JANA/Engine/JWorker.cc index e7f30be69..c24afd2b6 100644 --- a/src/libraries/JANA/Engine/JWorker.cc +++ b/src/libraries/JANA/Engine/JWorker.cc @@ -203,18 +203,13 @@ void JWorker::loop() { } else { - auto initial_backoff_time = m_assignment->get_initial_backoff_time(); - auto backoff_strategy = m_assignment->get_backoff_strategy(); - auto backoff_tries = m_assignment->get_backoff_tries(); - auto checkin_time = m_assignment->get_checkin_time(); - uint32_t current_tries = 0; - auto backoff_duration = initial_backoff_time; + auto backoff_duration = m_initial_backoff_time; - while (current_tries <= backoff_tries && + while (current_tries <= m_backoff_tries && (last_result == JArrowMetrics::Status::KeepGoing || last_result == JArrowMetrics::Status::ComeBackLater || last_result == JArrowMetrics::Status::NotRunYet) && (m_run_state == RunState::Running) && - (jclock_t::now() - start_time) < checkin_time) { + (jclock_t::now() - start_time) < m_checkin_time) { LOG_TRACE(logger) << "Worker " << m_worker_id << " is executing " << m_assignment->get_name() << LOG_END; @@ -228,15 +223,15 @@ void JWorker::loop() { LOG_DEBUG(logger) << "Worker " << m_worker_id << " succeeded at " << m_assignment->get_name() << LOG_END; current_tries = 0; - backoff_duration = initial_backoff_time; + backoff_duration = 
m_initial_backoff_time; } else { current_tries++; - if (backoff_tries > 0) { - if (backoff_strategy == JArrow::BackoffStrategy::Linear) { - backoff_duration += initial_backoff_time; + if (m_backoff_tries > 0) { + if (m_backoff_strategy == BackoffStrategy::Linear) { + backoff_duration += m_initial_backoff_time; } - else if (backoff_strategy == JArrow::BackoffStrategy::Exponential) { + else if (m_backoff_strategy == BackoffStrategy::Exponential) { backoff_duration *= 2; } LOG_TRACE(logger) << "Worker " << m_worker_id << " backing off with " diff --git a/src/libraries/JANA/Engine/JWorker.h b/src/libraries/JANA/Engine/JWorker.h index 70baa7b90..afc230548 100644 --- a/src/libraries/JANA/Engine/JWorker.h +++ b/src/libraries/JANA/Engine/JWorker.h @@ -22,9 +22,12 @@ class JWorker { /// that the Worker's internal state won't be updated by another thread. public: - /// The Worker may be configured to try different backoff strategies enum class RunState { Running, Stopping, Stopped, TimedOut, Excepted }; + enum class BackoffStrategy { Constant, Linear, Exponential }; + + using duration_t = std::chrono::steady_clock::duration; + /// The logger is made public so that somebody else may set it JLogger logger; @@ -45,6 +48,11 @@ class JWorker { std::mutex m_assignment_mutex; JException m_exception; + BackoffStrategy m_backoff_strategy = BackoffStrategy::Exponential; + duration_t m_initial_backoff_time = std::chrono::microseconds(1); + duration_t m_checkin_time = std::chrono::milliseconds(500); + unsigned m_backoff_tries = 4; + public: JWorker(JArrowProcessingController* japc, JScheduler* scheduler, unsigned worker_id, unsigned cpu_id, unsigned domain_id, bool pin_to_cpu); ~JWorker(); @@ -71,6 +79,22 @@ class JWorker { /// JProcessingController::measure_perf() void measure_perf(WorkerSummary& result); + inline void set_backoff_tries(unsigned backoff_tries) { m_backoff_tries = backoff_tries; } + + inline unsigned get_backoff_tries() const { return m_backoff_tries; } + + inline 
BackoffStrategy get_backoff_strategy() const { return m_backoff_strategy; } + + inline void set_backoff_strategy(BackoffStrategy backoff_strategy) { m_backoff_strategy = backoff_strategy; } + + inline duration_t get_initial_backoff_time() const { return m_initial_backoff_time; } + + inline void set_initial_backoff_time(duration_t initial_backoff_time) { m_initial_backoff_time = initial_backoff_time; } + + inline duration_t get_checkin_time() const { return m_checkin_time; } + + inline void set_checkin_time(duration_t checkin_time) { m_checkin_time = checkin_time; } + }; diff --git a/src/libraries/JANA/JBlockedEventSource.h b/src/libraries/JANA/JBlockedEventSource.h index 8d788dcd0..7134bae74 100644 --- a/src/libraries/JANA/JBlockedEventSource.h +++ b/src/libraries/JANA/JBlockedEventSource.h @@ -18,7 +18,7 @@ class JBlockedEventSource { virtual Status NextBlock(BlockType& block) = 0; - virtual std::vector> DisentangleBlock(BlockType& block, JEventPool& pool) = 0; + virtual std::vector*> DisentangleBlock(BlockType& block, JEventPool& pool) = 0; }; diff --git a/src/libraries/JANA/Utils/JEventPool.h b/src/libraries/JANA/Utils/JEventPool.h index 890d2968c..c78cc5cc0 100644 --- a/src/libraries/JANA/Utils/JEventPool.h +++ b/src/libraries/JANA/Utils/JEventPool.h @@ -7,128 +7,34 @@ #define JANA2_JEVENTPOOL_H #include -#include -#include #include -class JEventPool { -private: +#include - struct alignas(JANA2_CACHE_LINE_BYTES) LocalPool { - std::mutex mutex; - std::vector> events; - }; + +class JEventPool : public JPool> { std::shared_ptr m_component_manager; - size_t m_pool_size; - size_t m_location_count; - bool m_limit_total_events_in_flight; - std::unique_ptr m_pools; public: inline JEventPool(std::shared_ptr component_manager, size_t pool_size, size_t location_count, bool limit_total_events_in_flight) - : m_component_manager(component_manager) - , m_pool_size(pool_size) - , m_location_count(location_count) - , m_limit_total_events_in_flight(limit_total_events_in_flight) 
- { - assert(m_location_count >= 1); - assert(m_pool_size > 0 || !m_limit_total_events_in_flight); - m_pools = std::unique_ptr(new LocalPool[location_count]()); - - for (size_t j=0; j(); - m_component_manager->configure_event(*event); - put(event, j); - } - } - } - - inline std::shared_ptr get(size_t location=0) { - - LocalPool& pool = m_pools[location % m_location_count]; - std::lock_guard lock(pool.mutex); - - if (pool.events.empty()) { - if (m_limit_total_events_in_flight) { - return nullptr; - } - else { - auto event = std::make_shared(); - m_component_manager->configure_event(*event); - return event; - } - } - else { - auto event = std::move(pool.events.back()); - pool.events.pop_back(); - event->mFactorySet->Release(); - event->mInspector.Reset(); - event->GetJCallGraphRecorder()->Reset(); - return event; - } - } - - - inline void put(std::shared_ptr& event, size_t location=0) { - - LocalPool& pool = m_pools[location % m_location_count]; - std::lock_guard lock(pool.mutex); - - if (pool.events.size() < m_pool_size) { - pool.events.push_back(std::move(event)); - } + : JPool(pool_size, location_count, limit_total_events_in_flight) + , m_component_manager(component_manager) { } - inline size_t size() { return m_pool_size; } - + void configure_item(std::shared_ptr* item) override { + (*item) = std::make_shared(); + m_component_manager->configure_event(**item); - inline bool get_many(std::vector>& dest, size_t count, size_t location=0) { - std::vector> results; - - LocalPool& pool = m_pools[location % m_location_count]; - std::lock_guard lock(pool.mutex); - // TODO: We probably want to steal from other event pools if jana:enable_stealing=true - - if (m_limit_total_events_in_flight && pool.events.size() < count) { - return false; - } - else { - while (count > 0 && !pool.events.empty()) { - auto event = std::move(pool.events.back()); - pool.events.pop_back(); - event->mFactorySet->Release(); - event->mInspector.Reset(); - event->GetJCallGraphRecorder()->Reset(); - 
dest.push_back(event); - count -= 1; - } - while (count > 0) { - auto event = std::make_shared(); - m_component_manager->configure_event(*event); - dest.push_back(event); - count -= 1; - } - return true; - } } - inline void put_many(std::vector>& finished_events, size_t location=0) { - - LocalPool& pool = m_pools[location % m_location_count]; - std::lock_guard lock(pool.mutex); - // TODO: We may want to distribute to other event pools if jana:enable_stealing=true - - size_t count = finished_events.size(); - while (count-- > 0 && pool.events.size() < m_pool_size) { - pool.events.push_back(std::move(finished_events.back())); - finished_events.pop_back(); - } - // Items that are added back to the pool get removed from finished_events. - // Remaining items go out of scope on their own + void release_item(std::shared_ptr* item) override { + if (auto source = (*item)->GetJEventSource()) source->DoFinish(**item); + (*item)->mFactorySet->Release(); + (*item)->mInspector.Reset(); + (*item)->GetJCallGraphRecorder()->Reset(); } }; diff --git a/src/programs/unit_tests/ArrowActivationTests.cc b/src/programs/unit_tests/ArrowActivationTests.cc index 70a6aa584..5fbb0073b 100644 --- a/src/programs/unit_tests/ArrowActivationTests.cc +++ b/src/programs/unit_tests/ArrowActivationTests.cc @@ -18,40 +18,43 @@ JArrowMetrics::Status steppe(JArrow* arrow) { TEST_CASE("ArrowActivationTests") { - RandIntSource source; - MultByTwoProcessor p1; - SubOneProcessor p2; - SumSink sink; + auto q1 = new JMailbox(); + auto q2 = new JMailbox(); + auto q3 = new JMailbox(); - auto topology = std::make_shared(); + auto p1 = new JPool(0,1,false); + auto p2 = new JPool(0,1,false); + p1->init(); + p2->init(); + + MultByTwoProcessor processor; - auto q1 = new JMailbox(); - auto q2 = new JMailbox(); - auto q3 = new JMailbox(); + auto emit_rand_ints = new RandIntSource("emit_rand_ints", p1, q1); + auto multiply_by_two = new MapArrow("multiply_by_two", processor, q1, q2); + auto subtract_one = new 
SubOneProcessor("subtract_one", q2, q3); + auto sum_everything = new SumSink("sum_everything", q3, p2); - auto emit_rand_ints = new SourceArrow("emit_rand_ints", source, q1); - auto multiply_by_two = new MapArrow("multiply_by_two", p1, q1, q2); - auto subtract_one = new MapArrow("subtract_one", p2, q2, q3); - auto sum_everything = new SinkArrow("sum_everything", sink, q3); + auto topology = std::make_shared(); emit_rand_ints->attach(multiply_by_two); multiply_by_two->attach(subtract_one); subtract_one->attach(sum_everything); - topology->sources.push_back(emit_rand_ints); - topology->arrows.push_back(emit_rand_ints); topology->arrows.push_back(multiply_by_two); topology->arrows.push_back(subtract_one); topology->arrows.push_back(sum_everything); - topology->sinks.push_back(sum_everything); - - emit_rand_ints->set_chunksize(1); - JScheduler scheduler(topology); auto logger = JLogger(JLogger::Level::OFF); topology->m_logger = logger; - source.logger = logger; + emit_rand_ints->set_logger(logger); + multiply_by_two->set_logger(logger); + subtract_one->set_logger(logger); + sum_everything->set_logger(logger); + emit_rand_ints->set_chunksize(1); + + JScheduler scheduler(topology); + scheduler.logger = logger; SECTION("At first, everything is deactivated and all queues are empty") { @@ -92,72 +95,38 @@ TEST_CASE("ArrowActivationTests") { REQUIRE(state.arrow_states[3].status == JScheduler::ArrowStatus::Active); } -} // TEST_CASE - - -TEST_CASE("ActivableDeactivationTests") { - - RandIntSource source; - source.emit_limit = 1; - - MultByTwoProcessor p1; - SubOneProcessor p2; - SumSink sink; - - auto topology = std::make_shared(); - - auto q1 = new JMailbox(); - auto q2 = new JMailbox(); - auto q3 = new JMailbox(); - - auto emit_rand_ints = new SourceArrow("emit_rand_ints", source, q1); - auto multiply_by_two = new MapArrow("multiply_by_two", p1, q1, q2); - auto subtract_one = new MapArrow("subtract_one", p2, q2, q3); - auto sum_everything = new SinkArrow("sum_everything", 
sink, q3); - - emit_rand_ints->attach(multiply_by_two); - multiply_by_two->attach(subtract_one); - subtract_one->attach(sum_everything); + SECTION("Deactivation") { - topology->sources.push_back(emit_rand_ints); - topology->arrows.push_back(emit_rand_ints); - topology->arrows.push_back(multiply_by_two); - topology->arrows.push_back(subtract_one); - topology->arrows.push_back(sum_everything); - topology->sinks.push_back(sum_everything); + emit_rand_ints->emit_limit = 1; - auto logger = JLogger(JLogger::Level::OFF); - topology->m_logger = logger; - source.logger = logger; + JScheduler::TopologyState state = scheduler.get_topology_state(); - JScheduler scheduler(topology); - scheduler.logger = logger; - JScheduler::TopologyState state = scheduler.get_topology_state(); + REQUIRE(state.arrow_states[0].status == JScheduler::ArrowStatus::Uninitialized); + REQUIRE(state.arrow_states[1].status == JScheduler::ArrowStatus::Uninitialized); + REQUIRE(state.arrow_states[2].status == JScheduler::ArrowStatus::Uninitialized); + REQUIRE(state.arrow_states[3].status == JScheduler::ArrowStatus::Uninitialized); - REQUIRE(state.arrow_states[0].status == JScheduler::ArrowStatus::Uninitialized); - REQUIRE(state.arrow_states[1].status == JScheduler::ArrowStatus::Uninitialized); - REQUIRE(state.arrow_states[2].status == JScheduler::ArrowStatus::Uninitialized); - REQUIRE(state.arrow_states[3].status == JScheduler::ArrowStatus::Uninitialized); + scheduler.run_topology(1); + state = scheduler.get_topology_state(); - scheduler.run_topology(1); - state = scheduler.get_topology_state(); + REQUIRE(state.arrow_states[0].status == JScheduler::ArrowStatus::Active); + REQUIRE(state.arrow_states[1].status == JScheduler::ArrowStatus::Active); + REQUIRE(state.arrow_states[2].status == JScheduler::ArrowStatus::Active); + REQUIRE(state.arrow_states[3].status == JScheduler::ArrowStatus::Active); - REQUIRE(state.arrow_states[0].status == JScheduler::ArrowStatus::Active); - 
REQUIRE(state.arrow_states[1].status == JScheduler::ArrowStatus::Active); - REQUIRE(state.arrow_states[2].status == JScheduler::ArrowStatus::Active); - REQUIRE(state.arrow_states[3].status == JScheduler::ArrowStatus::Active); + auto result = steppe(emit_rand_ints); + REQUIRE(result == JArrowMetrics::Status::Finished); - auto result = steppe(emit_rand_ints); + scheduler.next_assignment(0, emit_rand_ints, result); + state = scheduler.get_topology_state(); - scheduler.next_assignment(0, emit_rand_ints, result); - state = scheduler.get_topology_state(); + REQUIRE(state.arrow_states[0].status == JScheduler::ArrowStatus::Finalized); + REQUIRE(state.arrow_states[1].status == JScheduler::ArrowStatus::Active); + REQUIRE(state.arrow_states[2].status == JScheduler::ArrowStatus::Active); + REQUIRE(state.arrow_states[3].status == JScheduler::ArrowStatus::Active); - REQUIRE(state.arrow_states[0].status == JScheduler::ArrowStatus::Finalized); - REQUIRE(state.arrow_states[1].status == JScheduler::ArrowStatus::Active); - REQUIRE(state.arrow_states[2].status == JScheduler::ArrowStatus::Active); - REQUIRE(state.arrow_states[3].status == JScheduler::ArrowStatus::Active); + } - // TODO: Test that finalize was called exactly once } // TEST_CASE diff --git a/src/programs/unit_tests/ArrowFixtures.h b/src/programs/unit_tests/ArrowFixtures.h deleted file mode 100644 index 812260462..000000000 --- a/src/programs/unit_tests/ArrowFixtures.h +++ /dev/null @@ -1,69 +0,0 @@ -#pragma once - -#include - -#include -#include - -using namespace jana::arrowengine; - -struct RandIntSource : public SourceOp { - - size_t emit_limit = 20; // How many to emit - size_t emit_count = 0; // How many emitted so far - int emit_sum = 0; // Sum of all ints emitted so far - JLogger logger; - - std::pair next() override { - if (emit_count > emit_limit) { - return {Status::FailFinished, 0}; - } - else { - //std::this_thread::sleep_for(std::chrono::milliseconds(2000)); - int x = 7; - emit_count += 1; - emit_sum += x; - 
return {Status::Success, x}; - } - } -}; - - -struct MultByTwoProcessor : public MapOp { - - double map(int x) override { - return x * 2.0; - } -}; - - -struct SubOneProcessor : public MapOp { - - double map(double x) override { - return x - 1; - } -}; - - -template -struct SumSink : public SinkOp { - - T sum = 0; - JLogger logger; - - void accumulate(T d) override { - sum += d; - LOG_DEBUG(logger) << "SumSink.accumulate() called!" << LOG_END; - } -}; - - -/* - using namespace jana::arrowengine; - - SourceArrow source("emit_rand_ints", false); - StageArrow p1 ("multiply_by_two", true); - StageArrow p2 ("subtract_one", true); - SinkArrow> sink ("sum_everything", false); - - */ \ No newline at end of file diff --git a/src/programs/unit_tests/ArrowTests.cc b/src/programs/unit_tests/ArrowTests.cc new file mode 100644 index 000000000..312008267 --- /dev/null +++ b/src/programs/unit_tests/ArrowTests.cc @@ -0,0 +1,95 @@ + +#include +#include + +namespace jana { +namespace arrowtests { + + +struct TestMapArrow : public JJunctionArrow { + + TestMapArrow(JMailbox* qi, + JPool* pi, + JPool* pd, + JMailbox* qd) + : JJunctionArrow("testmaparrow", false, false, true) { + + first_input = {this, qi, true, 1, 1}; + first_output = {this, pi, false, 1, 1}; + second_input = {this, pd, true, 1, 1}; + second_output = {this, qd, false, 1, 1}; + } + + Status process(Data& input_int, + Data& output_int, + Data& input_double, + Data& output_double) { + std::cout << "Hello from process" << std::endl; + + REQUIRE(input_int.item_count == 1); + REQUIRE(input_int.reserve_count == 1); + REQUIRE(output_int.item_count == 0); + REQUIRE(output_int.reserve_count == 0); + REQUIRE(input_double.item_count == 1); + REQUIRE(input_double.reserve_count == 0); + REQUIRE(output_double.item_count == 0); + REQUIRE(output_double.reserve_count == 1); + + int* x = input_int.items[0]; + input_int.items[0] = nullptr; + input_int.item_count = 0; + + // TODO: Maybe user shouldn't be allowed to modify reserve_count at 
all + // TODO Maybe user should only be allowed to push and pull from ... range...? + + double* y = input_double.items[0]; + input_double.items[0] = nullptr; + input_double.item_count = 0; + + // Do something useful here + *y = *x + 22.2; + + output_int.items[0] = x; + output_int.item_count = 1; + + output_double.items[0] = y; + output_double.item_count = 1; + return Status::KeepGoing; + } + +}; + + +TEST_CASE("ArrowTests_Basic") { + + JMailbox qi {2, 1, false}; + JPool pi {5, 1, true}; + JPool pd {5, 1, true}; + JMailbox qd {2, 1, false}; + + pi.init(); + pd.init(); + + TestMapArrow a {&qi, &pi, &pd, &qd}; + + int* x; + pi.pop(&x, 1, 1, 0); + *x = 100; + + qi.push_and_unreserve(&x, 1, 0, 0); + JArrowMetrics m; + a.execute(m, 0); + + double* y; + qd.pop_and_reserve(&y, 1, 1, 0); + REQUIRE(*y == 122.2); + +} + + +} // namespace arrowtests +} // namespace jana + + + + diff --git a/src/programs/unit_tests/CMakeLists.txt b/src/programs/unit_tests/CMakeLists.txt index a8749d7a2..2d8b8114b 100644 --- a/src/programs/unit_tests/CMakeLists.txt +++ b/src/programs/unit_tests/CMakeLists.txt @@ -10,8 +10,6 @@ set(TEST_SOURCES SchedulerTests.cc JEventTests.cc JEventTests.h - PerformanceTests.cc - PerformanceTests.h ExactlyOnceTests.cc ExactlyOnceTests.h TerminationTests.cc @@ -37,6 +35,8 @@ set(TEST_SOURCES JAutoactivableTests.cc JTablePrinterTests.cc JMultiFactoryTests.cc + JPoolTests.cc + ArrowTests.cc ) if (${USE_PODIO}) diff --git a/src/programs/unit_tests/JPoolTests.cc b/src/programs/unit_tests/JPoolTests.cc new file mode 100644 index 000000000..c1f3cb16c --- /dev/null +++ b/src/programs/unit_tests/JPoolTests.cc @@ -0,0 +1,95 @@ + +#include +#include + +namespace jana { +namespace jpooltests { + +struct Event { + int x=3; + double y=7.8; + bool* was_dtor_called = nullptr; + + ~Event() { + if (was_dtor_called != nullptr) { + *was_dtor_called = true; + } + } +}; + +TEST_CASE("JPoolTests_SingleLocationLimitEvents") { + + JPool pool(3, 1, true); + pool.init(); + + Event* e = 
pool.get(0); + REQUIRE(e != nullptr); + REQUIRE(e->x == 3); + + Event* f = pool.get(0); + REQUIRE(f != nullptr); + REQUIRE(f->x == 3); + + Event* g = pool.get(0); + REQUIRE(g != nullptr); + REQUIRE(g->x == 3); + + Event* h = pool.get(0); + REQUIRE(h == nullptr); + + f->x = 5; + pool.put(f, 0); + + h = pool.get(0); + REQUIRE(h != nullptr); + REQUIRE(h->x == 5); +} + +TEST_CASE("JPoolTests_SingleLocationUnlimitedEvents") { + + bool was_dtor_called = false; + JPool pool(3, 1, false); + pool.init(); + + Event* e = pool.get(0); + e->was_dtor_called = &was_dtor_called; + REQUIRE(e != nullptr); + REQUIRE(e->x == 3); + + Event* f = pool.get(0); + f->was_dtor_called = &was_dtor_called; + REQUIRE(f != nullptr); + REQUIRE(f->x == 3); + + Event* g = pool.get(0); + g->was_dtor_called = &was_dtor_called; + REQUIRE(g != nullptr); + REQUIRE(g->x == 3); + + Event* h = pool.get(0); + // Unlike the others, h is allocated on the heap + h->x = 9; + h->was_dtor_called = &was_dtor_called; + REQUIRE(h != nullptr); + REQUIRE(g->x == 3); + + f->x = 5; + pool.put(f, 0); + // f goes back into the pool, so dtor does not get called + REQUIRE(was_dtor_called == false); + + pool.put(h, 0); + // h's dtor DOES get called + REQUIRE(was_dtor_called == true); + + // When we retrieve another event, it comes from the pool + // So we get what used to be f + Event* i = pool.get(0); + REQUIRE(i != nullptr); + REQUIRE(i->x == 5); +} + + + +} // namespace jana +} // namespace jpooltests diff --git a/src/programs/unit_tests/MapArrow.h b/src/programs/unit_tests/MapArrow.h index 068d0064c..d85f2992e 100644 --- a/src/programs/unit_tests/MapArrow.h +++ b/src/programs/unit_tests/MapArrow.h @@ -32,7 +32,7 @@ class MapArrow : public JArrow { public: MapArrow(std::string name, ParallelProcessor& processor, JMailbox *input_queue, JMailbox *output_queue) - : JArrow(name, true, NodeType::Stage) + : JArrow(name, true, false, false) , _processor(processor) , _input_queue(input_queue) , _output_queue(output_queue) { diff 
--git a/src/programs/unit_tests/PerformanceTests.cc b/src/programs/unit_tests/PerformanceTests.cc deleted file mode 100644 index 42fd4dc27..000000000 --- a/src/programs/unit_tests/PerformanceTests.cc +++ /dev/null @@ -1,97 +0,0 @@ - -// Copyright 2020, Jefferson Science Associates, LLC. -// Subject to the terms in the LICENSE file found in the top-level directory. - -#include "catch.hpp" - -#include -#include -#include "PerformanceTests.h" - -TEST_CASE("MemoryBottleneckTest", "[.][performance]") { - - std::cout << "Running performance test" << std::endl; - auto serviceLocator = new JServiceLocator(); - - auto loggingService = std::make_shared(); - serviceLocator->provide(loggingService); - - loggingService->set_level("JThreadTeam", JLogger::Level::OFF); - loggingService->set_level("JScheduler", JLogger::Level::OFF); - loggingService->set_level("JTopology", JLogger::Level::INFO); - - auto params = std::make_shared(); - serviceLocator->provide(params); - - params->chunksize = 10; - params->backoff_tries = 4; - params->backoff_time = std::chrono::microseconds(50); - params->checkin_time = std::chrono::milliseconds(400); - - PerfTestSource parse; - PerfTestMapper disentangle; - PerfTestMapper track; - PerfTestReducer plot; - - parse.message_count_limit = 5000; - parse.latency_ms = 10; - disentangle.latency_ms = 10; - track.latency_ms = 10; - plot.latency_ms = 10; - -// 5Hz for full reconstruction -// 20kHz for stripped-down reconstruction (not per-core) -// Base memory allcation: 100 MB/core + 600MB -// 1 thread/event, disentangle 1 event, turn into 40. 
-// disentangled (single event size) : 12.5 kB / event (before blown up) -// entangled "block of 40": dis * 40 -// - - auto topology = std::make_shared(); - - auto q1 = new JMailbox(); - auto q2 = new JMailbox(); - auto q3 = new JMailbox(); - - auto parse_arrow = new SourceArrow("parse", parse, q1); - auto disentangle_arrow = new MapArrow("disentangle", disentangle, q1, q2); - auto track_arrow = new MapArrow("track", track, q2, q3); - auto plot_arrow = new SinkArrow("plot", plot, q3); - - parse_arrow->attach(disentangle_arrow); - disentangle_arrow->attach(track_arrow); - track_arrow->attach(plot_arrow); - - parse_arrow->set_chunksize(1); - - topology->sources.push_back(parse_arrow); - topology->sinks.push_back(plot_arrow); - topology->arrows.push_back(parse_arrow); - topology->arrows.push_back(disentangle_arrow); - topology->arrows.push_back(track_arrow); - topology->arrows.push_back(plot_arrow); - - JArrowProcessingController controller(topology); - controller.initialize(); - - controller.run(6); // for whatever mysterious reason we need to pre-warm our thread team - for (int nthreads=1; nthreads<6; nthreads++) { - controller.scale(nthreads); - for (int secs=0; secs<10; secs++) { - std::this_thread::sleep_for(std::chrono::milliseconds(1000)); - } - controller.request_pause(); - controller.wait_until_paused(); - auto result = controller.measure_internal_performance(); - std::cout << nthreads << ": " << result->avg_throughput_hz << " Hz" << std::endl; - - } - controller.scale(1); - controller.request_stop(); - controller.wait_until_stopped(); - controller.print_final_report(); - auto perf = controller.measure_internal_performance(); - // REQUIRE(perf->total_events_completed == perf->arrows[0].total_messages_completed); -} - - diff --git a/src/programs/unit_tests/PerformanceTests.h b/src/programs/unit_tests/PerformanceTests.h deleted file mode 100644 index dbc632325..000000000 --- a/src/programs/unit_tests/PerformanceTests.h +++ /dev/null @@ -1,99 +0,0 @@ - -// 
Copyright 2020, Jefferson Science Associates, LLC. -// Subject to the terms in the LICENSE file found in the top-level directory. - -#ifndef JANA2_PERFORMANCETESTS_H -#define JANA2_PERFORMANCETESTS_H - -#include -#include -#include -#include - - -struct Event { - long event_index; - std::map> data; - long emit_sum; - long reduce_sum; -}; - -struct PerfTestSource : public Source { - std::string write_key; - uint64_t latency_ms = 100; - double latency_spread = 0; - size_t write_bytes = 100; - double write_spread = 0; - long next_event_index = 0; - long sum_over_all_events = 0; - long message_count = 0; - long message_count_limit = -1; // Only used when > 0 - - void initialize() override {} - void finalize() override {} - - Status inprocess(std::vector& ts, size_t count) override { - - for (size_t i=0; iemit_sum = write_memory(e->data[write_key], write_bytes, write_spread); - sum_over_all_events += e->emit_sum; - e->event_index = next_event_index++; - ts.push_back(e); - message_count++; - } - if (message_count_limit > 0 && message_count >= message_count_limit) { - return Status::Finished; - } - return Status::KeepGoing; - } -}; - -struct PerfTestMapper : public ParallelProcessor { - std::string read_key = "disentangled"; - std::string write_key = "processed"; - uint64_t latency_ms = 100; - double latency_spread = 0; - size_t write_bytes = 100; - double write_spread = 0; - - virtual Event* process(Event* event) { - consume_cpu_ms(latency_ms, latency_spread); - long sum = read_memory(event->data[read_key]); - sum++; // Suppress compiler warning - - write_memory(event->data[write_key], write_bytes, write_spread); - return event; - }; - - -}; - -struct PerfTestReducer : public Sink { - std::string read_key = "processed"; - uint64_t latency_ms = 100; - double latency_spread = 0; - double sum_over_all_events = 0; - - void initialize() override {} - void finalize() override {} - void outprocess(Event* event) override { - consume_cpu_ms(latency_ms, latency_spread); - 
event->reduce_sum = read_memory(event->data[read_key]); - sum_over_all_events += event->reduce_sum; - delete event; // Don't do this in the general case - } -}; - -/// To be replaced with the real JParameterManager when the time is right -struct FakeParameterManager : public JService { - using duration_t = std::chrono::steady_clock::duration; - - int chunksize; - int backoff_tries; - duration_t backoff_time; - duration_t checkin_time; -}; - -#endif //JANA2_PERFORMANCETESTS_H diff --git a/src/programs/unit_tests/QueueTests.cc b/src/programs/unit_tests/QueueTests.cc index 641ca65ca..3ba23d627 100644 --- a/src/programs/unit_tests/QueueTests.cc +++ b/src/programs/unit_tests/QueueTests.cc @@ -7,29 +7,33 @@ #include "catch.hpp" -TEST_CASE("Queue: Basic functionality") { - JMailbox q; +TEST_CASE("QueueTests_Basic") { + + JMailbox q; REQUIRE(q.size() == 0); - int item = 22; - q.push(item, 0); + int* item = new int {22}; + bool result = q.try_push(&item, 1, 0); REQUIRE(q.size() == 1); + REQUIRE(result == true); - std::vector items; - auto result = q.pop(items, 22); - REQUIRE(items.size() == 1); + int* items[10]; + auto count = q.pop(items, 1, 10, 0); + REQUIRE(count == 1); REQUIRE(q.size() == 0); - REQUIRE(result == JMailbox::Status::Empty); + REQUIRE(*(items[0]) == 22); + + *(items[0]) = 33; + items[1] = new int {44}; + items[2] = new int {55}; - std::vector buffer {1,2,3}; - q.push(buffer, 0); + size_t reserve_count = q.reserve(3, 5, 0); + REQUIRE(reserve_count == 5); + + q.push_and_unreserve(items, 3, reserve_count, 0); REQUIRE(q.size() == 3); - REQUIRE(buffer.size() == 0); - items.clear(); - result = q.pop(items, 2); - REQUIRE(items.size() == 2); + count = q.pop_and_reserve(items, 2, 2, 0); + REQUIRE(count == 2); REQUIRE(q.size() == 1); - REQUIRE(result == JMailbox::Status::Ready); - } diff --git a/src/programs/unit_tests/SchedulerTests.cc b/src/programs/unit_tests/SchedulerTests.cc index 03abff2eb..8b1008c36 100644 --- a/src/programs/unit_tests/SchedulerTests.cc +++ 
b/src/programs/unit_tests/SchedulerTests.cc @@ -11,37 +11,45 @@ TEST_CASE("SchedulerTests") { - RandIntSource source; - MultByTwoProcessor p1; - SubOneProcessor p2; - SumSink sink; + auto q1 = new JMailbox(); + auto q2 = new JMailbox(); + auto q3 = new JMailbox(); - auto topology = std::make_shared(); + auto p1 = new JPool(0,1,false); + auto p2 = new JPool(0,1,false); + p1->init(); + p2->init(); + + MultByTwoProcessor processor; - auto q1 = new JMailbox(); - auto q2 = new JMailbox(); - auto q3 = new JMailbox(); + auto emit_rand_ints = new RandIntSource("emit_rand_ints", p1, q1); + auto multiply_by_two = new MapArrow("multiply_by_two", processor, q1, q2); + auto subtract_one = new SubOneProcessor("subtract_one", q2, q3); + auto sum_everything = new SumSink("sum_everything", q3, p2); - auto emit_rand_ints = new SourceArrow("emit_rand_ints", source, q1); - auto multiply_by_two = new MapArrow("multiply_by_two", p1, q1, q2); - auto subtract_one = new MapArrow("subtract_one", p2, q2, q3); - auto sum_everything = new SinkArrow("sum_everything", sink, q3); + auto topology = std::make_shared(); emit_rand_ints->attach(multiply_by_two); multiply_by_two->attach(subtract_one); subtract_one->attach(sum_everything); - topology->sources.push_back(emit_rand_ints); topology->arrows.push_back(emit_rand_ints); topology->arrows.push_back(multiply_by_two); topology->arrows.push_back(subtract_one); topology->arrows.push_back(sum_everything); - topology->sinks.push_back(sum_everything); + + auto logger = JLogger(JLogger::Level::INFO); + topology->m_logger = logger; + emit_rand_ints->set_logger(logger); + multiply_by_two->set_logger(logger); + subtract_one->set_logger(logger); + sum_everything->set_logger(logger); emit_rand_ints->set_chunksize(1); JScheduler scheduler(topology); - // scheduler.logger = JLogger(JLogger::Level::DEBUG); + scheduler.logger = logger; + scheduler.run_topology(1); JArrow* assignment; @@ -89,40 +97,46 @@ TEST_CASE("SchedulerTests") { 
TEST_CASE("SchedulerRoundRobinBehaviorTests") { - RandIntSource source; - MultByTwoProcessor p1; - SubOneProcessor p2; - SumSink sink; + auto q1 = new JMailbox(); + auto q2 = new JMailbox(); + auto q3 = new JMailbox(); - auto topology = std::make_shared(); + auto p1 = new JPool(0,1,false); + auto p2 = new JPool(0,1,false); + p1->init(); + p2->init(); - auto q1 = new JMailbox(); - auto q2 = new JMailbox(); - auto q3 = new JMailbox(); + MultByTwoProcessor processor; - auto emit_rand_ints = new SourceArrow("emit_rand_ints", source, q1); - auto multiply_by_two = new MapArrow("multiply_by_two", p1, q1, q2); - auto subtract_one = new MapArrow("subtract_one", p2, q2, q3); - auto sum_everything = new SinkArrow("sum_everything", sink, q3); + auto emit_rand_ints = new RandIntSource("emit_rand_ints", p1, q1); + auto multiply_by_two = new MapArrow("multiply_by_two", processor, q1, q2); + auto subtract_one = new SubOneProcessor("subtract_one", q2, q3); + auto sum_everything = new SumSink("sum_everything", q3, p2); + + auto topology = std::make_shared(); emit_rand_ints->attach(multiply_by_two); multiply_by_two->attach(subtract_one); subtract_one->attach(sum_everything); - topology->sources.push_back(emit_rand_ints); topology->arrows.push_back(emit_rand_ints); topology->arrows.push_back(multiply_by_two); topology->arrows.push_back(subtract_one); topology->arrows.push_back(sum_everything); - topology->sinks.push_back(sum_everything); + + auto logger = JLogger(JLogger::Level::INFO); + topology->m_logger = logger; + emit_rand_ints->set_logger(logger); + multiply_by_two->set_logger(logger); + subtract_one->set_logger(logger); + sum_everything->set_logger(logger); emit_rand_ints->set_chunksize(1); JScheduler scheduler(topology); + scheduler.logger = logger; scheduler.run_topology(1); - auto logger = JLogger(JLogger::Level::OFF); - auto last_result = JArrowMetrics::Status::ComeBackLater; JArrow* assignment = nullptr; diff --git a/src/programs/unit_tests/SinkArrow.h 
b/src/programs/unit_tests/SinkArrow.h deleted file mode 100644 index ee49c5e41..000000000 --- a/src/programs/unit_tests/SinkArrow.h +++ /dev/null @@ -1,87 +0,0 @@ - -// Copyright 2020, Jefferson Science Associates, LLC. -// Subject to the terms in the LICENSE file found in the top-level directory. - -#ifndef GREENFIELD_SINKARROW_H -#define GREENFIELD_SINKARROW_H - -#include - - -/// Sink consumes events of type T, accumulating state along the way. This -/// state is supposed to reside on the Sink itself. -/// The final result of this accumulation is a side-effect which should be -/// safe to retrieve after finalize() is called. (finalize() will be called -/// automatically after all upstream events have been processed) -/// This is conceptually equivalent to the part of JEventProcessor::Process -/// after the lock is acquired. - -template -struct Sink { - virtual void initialize() = 0; - virtual void finalize() = 0; - virtual void outprocess(T t) = 0; -}; - -/// SinkArrow lifts a sink into a streaming, async context -template -class SinkArrow : public JArrow { - -private: - Sink & _sink; - JMailbox * _input_queue; - std::vector _chunk_buffer; - bool _is_initialized = false; - -public: - SinkArrow(std::string name, Sink& sink, JMailbox* input_queue) - : JArrow(name, false, NodeType::Sink) - , _sink(sink) - , _input_queue(input_queue) { - }; - - void execute(JArrowMetrics& result, size_t /* location_id */) override { - if (!_is_initialized) { - _sink.initialize(); - _chunk_buffer.reserve(get_chunksize()); - _is_initialized = true; - } - - auto start_time = std::chrono::steady_clock::now(); - - auto in_status = _input_queue->pop(_chunk_buffer, get_chunksize()); - - auto latency_start_time = std::chrono::steady_clock::now(); - - for (T item : _chunk_buffer) { - _sink.outprocess(item); - } - auto latency_stop_time = std::chrono::steady_clock::now(); - - auto message_count = _chunk_buffer.size(); - _chunk_buffer.clear(); - - auto stop_time = 
std::chrono::steady_clock::now(); - - auto latency = latency_stop_time - latency_start_time; - auto overhead = (stop_time - start_time) - latency; - - JArrowMetrics::Status status; - if (in_status == JMailbox::Status::Empty) { - status = JArrowMetrics::Status::ComeBackLater; - } - else { - status = JArrowMetrics::Status::KeepGoing; - } - result.update(status, message_count, 1, latency, overhead); - } - - size_t get_pending() final { return _input_queue->size(); } - - size_t get_threshold() final { return _input_queue->get_threshold(); } - - void set_threshold(size_t threshold) final { _input_queue->set_threshold(threshold); } -}; - - -#endif //GREENFIELD_SINKARROW_H diff --git a/src/programs/unit_tests/SourceArrow.h b/src/programs/unit_tests/SourceArrow.h deleted file mode 100644 index 763a88293..000000000 --- a/src/programs/unit_tests/SourceArrow.h +++ /dev/null @@ -1,86 +0,0 @@ - -// Copyright 2020, Jefferson Science Associates, LLC. -// Subject to the terms in the LICENSE file found in the top-level directory. - -#ifndef GREENFIELD_SOURCEARROW_H -#define GREENFIELD_SOURCEARROW_H - -#include -#include - - -/// Source stands in for (and improves upon) JEventSource. -/// The type signature of inprocess() is chosen with the following goals: -/// - Chunking data in order to increase parallelism coarseness -/// - The 'no more events' condition is handled automatically -/// - Status information such as 'error' or 'finished' is decoupled from the actual stream of Events -/// - The user is given the opportunity to implement their own rate-limiting -template -struct Source { - - /// Return codes for Sources. These are identical to Queue::StreamStatus (on purpose, because - /// it is convenient to model a Source as a stream) but we keep them separate because one is part - /// of the API and the other is an internal implementation detail. 
- enum class Status {KeepGoing, ComeBackLater, Finished, Error}; - - virtual void initialize() = 0; - virtual void finalize() = 0; - virtual Status inprocess(std::vector& ts, size_t count) = 0; -}; - - -/// SourceArrow wraps a reference to a Source and 'lifts' it into a Arrow -/// that knows how to stream to and from queues, and is executable by a Worker -template -class SourceArrow : public JArrow { - -private: - Source & _source; - JMailbox * _output_queue; - std::vector _chunk_buffer; - - -public: - SourceArrow(std::string name, Source& source, JMailbox *output_queue) - : JArrow(name, false, NodeType::Source) - , _source(source) - , _output_queue(output_queue) { - } - - void execute(JArrowMetrics& result, size_t /* location_id */) override { - - auto start_time = std::chrono::steady_clock::now(); - auto in_status = _source.inprocess(_chunk_buffer, get_chunksize()); - auto latency_time = std::chrono::steady_clock::now(); - auto out_status = _output_queue->push(_chunk_buffer); - auto message_count = _chunk_buffer.size(); - auto finished_time = std::chrono::steady_clock::now(); - - auto latency = (latency_time - start_time); - auto overhead = (finished_time - latency_time); - - JArrowMetrics::Status status; - if (in_status == Source::Status::Finished) { - status = JArrowMetrics::Status::Finished; - } - else if (in_status == Source::Status::KeepGoing && out_status == JMailbox::Status::Ready) { - status = JArrowMetrics::Status::KeepGoing; - } - else { - status = JArrowMetrics::Status::ComeBackLater; - } - result.update(status, message_count, 1, latency, overhead); - } - - - void initialize() override { - _source.initialize(); - } - - void finalize() override { - _source.finalize(); - } -}; - - -#endif //GREENFIELD_SOURCEARROW_H diff --git a/src/programs/unit_tests/SubeventTests.cc b/src/programs/unit_tests/SubeventTests.cc index 6bece3f02..6a3331119 100644 --- a/src/programs/unit_tests/SubeventTests.cc +++ b/src/programs/unit_tests/SubeventTests.cc @@ -7,7 +7,6 @@ 
#include #include -#include #include #include "JANA/Engine/JArrowTopology.h" #include "JANA/Engine/JTopologyBuilder.h" @@ -42,30 +41,30 @@ TEST_CASE("Create subevent processor") { REQUIRE(output->z == 29.6f); } - +#if 0 TEST_CASE("Simplest working SubeventMailbox") { std::vector> unmerged; auto event1 = std::make_shared(); auto event2 = std::make_shared(); - unmerged.push_back(SubeventWrapper(event1, new MyOutput(2), 1, 5)); - unmerged.push_back(SubeventWrapper(event1, new MyOutput(4), 2, 5)); - unmerged.push_back(SubeventWrapper(event1, new MyOutput(6), 3, 5)); + unmerged.push_back(SubeventWrapper(&event1, new MyOutput(2), 1, 5)); + unmerged.push_back(SubeventWrapper(&event1, new MyOutput(4), 2, 5)); + unmerged.push_back(SubeventWrapper(&event1, new MyOutput(6), 3, 5)); JSubeventMailbox mailbox; mailbox.push(unmerged); - unmerged.push_back(SubeventWrapper(event1, new MyOutput(8), 4, 5)); - unmerged.push_back(SubeventWrapper(event1, new MyOutput(10), 5, 5)); + unmerged.push_back(SubeventWrapper(&event1, new MyOutput(8), 4, 5)); + unmerged.push_back(SubeventWrapper(&event1, new MyOutput(10), 5, 5)); mailbox.push(unmerged); - std::vector> merged; + std::vector*> merged; JSubeventMailbox::Status result = mailbox.pop(merged, 1); REQUIRE(result == JSubeventMailbox::Status::Empty); REQUIRE(merged.size() == 1); - auto items_in_event = merged[0]->Get(); + auto items_in_event = (*(merged[0]))->Get(); REQUIRE(items_in_event.size() == 5); } @@ -74,14 +73,14 @@ TEST_CASE("SubeventMailbox with two overlapping events") { JSubeventMailbox mailbox; std::vector> unmerged; - std::vector> merged; + std::vector*> merged; auto event1 = std::make_shared(); auto event2 = std::make_shared(); - unmerged.push_back(SubeventWrapper(event1, new MyOutput(2), 1, 5)); - unmerged.push_back(SubeventWrapper(event1, new MyOutput(4), 2, 5)); - unmerged.push_back(SubeventWrapper(event1, new MyOutput(6), 3, 5)); + unmerged.push_back(SubeventWrapper(&event1, new MyOutput(2), 1, 5)); + 
unmerged.push_back(SubeventWrapper(&event1, new MyOutput(4), 2, 5)); + unmerged.push_back(SubeventWrapper(&event1, new MyOutput(6), 3, 5)); mailbox.push(unmerged); @@ -91,45 +90,45 @@ TEST_CASE("SubeventMailbox with two overlapping events") { // Now we mix in some subevents from event 2 - unmerged.push_back(SubeventWrapper(event2, new MyOutput(1), 1, 4)); - unmerged.push_back(SubeventWrapper(event2, new MyOutput(3), 2, 4)); - unmerged.push_back(SubeventWrapper(event2, new MyOutput(5), 3, 4)); + unmerged.push_back(SubeventWrapper(&event2, new MyOutput(1), 1, 4)); + unmerged.push_back(SubeventWrapper(&event2, new MyOutput(3), 2, 4)); + unmerged.push_back(SubeventWrapper(&event2, new MyOutput(5), 3, 4)); // Still not able to pop anything because neither of the events are complete JSubeventMailbox::Status result1 = mailbox.pop(merged, 1); REQUIRE(result1 == JSubeventMailbox::Status::Empty); // Now we receive the rest of the subevents from event 1 - unmerged.push_back(SubeventWrapper(event1, new MyOutput(8), 4, 5)); - unmerged.push_back(SubeventWrapper(event1, new MyOutput(10), 5, 5)); + unmerged.push_back(SubeventWrapper(&event1, new MyOutput(8), 4, 5)); + unmerged.push_back(SubeventWrapper(&event1, new MyOutput(10), 5, 5)); mailbox.push(unmerged); // We were able to get event1 out, but not event 2 JSubeventMailbox::Status result2 = mailbox.pop(merged, 2); REQUIRE(result2 == JSubeventMailbox::Status::Empty); REQUIRE(merged.size() == 1); - REQUIRE(merged[0] == event1); + REQUIRE(merged[0] == &event1); merged.clear(); // Now we add the remaining subevents from event 2 - unmerged.push_back(SubeventWrapper(event2, new MyOutput(7), 4, 4)); + unmerged.push_back(SubeventWrapper(&event2, new MyOutput(7), 4, 4)); mailbox.push(unmerged); // Now we can pop event2 JSubeventMailbox::Status result3 = mailbox.pop(merged, 2); REQUIRE(result3 == JSubeventMailbox::Status::Empty); REQUIRE(merged.size() == 1); - REQUIRE(merged[0] == event2); - auto items_in_event = merged[0]->Get(); + 
REQUIRE(merged[0] == &event2); + auto items_in_event = (*(merged[0]))->Get(); REQUIRE(items_in_event.size() == 4); } - +#endif TEST_CASE("Basic subevent arrow functionality") { MyProcessor processor; - JMailbox> events_in; - JMailbox> events_out; + JMailbox*> events_in; + JMailbox*> events_out; JMailbox> subevents_in; JMailbox> subevents_out; @@ -187,12 +186,10 @@ TEST_CASE("Basic subevent arrow functionality") { proc_arrow->add_processor(new SimpleProcessor); topology->arrows.push_back(source_arrow); - topology->sources.push_back(source_arrow); topology->arrows.push_back(split_arrow); topology->arrows.push_back(subprocess_arrow); topology->arrows.push_back(merge_arrow); topology->arrows.push_back(proc_arrow); - topology->sinks.push_back(proc_arrow); source_arrow->attach(split_arrow); split_arrow->attach(subprocess_arrow); subprocess_arrow->attach(merge_arrow); diff --git a/src/programs/unit_tests/TestTopologyComponents.h b/src/programs/unit_tests/TestTopologyComponents.h index 2201ca696..2c69d9fc8 100644 --- a/src/programs/unit_tests/TestTopologyComponents.h +++ b/src/programs/unit_tests/TestTopologyComponents.h @@ -7,37 +7,35 @@ #include #include +#include #include -#include #include -#include -struct RandIntSource : public Source { +struct RandIntSource : public JPipelineArrow { size_t emit_limit = 20; // How many to emit size_t emit_count = 0; // How many emitted so far int emit_sum = 0; // Sum of all ints emitted so far JLogger logger; - Status inprocess(std::vector &items, size_t count) override { + RandIntSource(std::string name, JPool* pool, JMailbox* output_queue) + : JPipelineArrow(name, false, true, false, nullptr, output_queue, pool) {} - for (size_t i = 0; i < count && emit_count < emit_limit; ++i) { - //std::this_thread::sleep_for(std::chrono::milliseconds(2000)); - int x = 7; - items.push_back(x); - emit_count += 1; - emit_sum += x; - } - LOG_DEBUG(logger) << "RandIntSource emitted " << emit_count << " events" << LOG_END; + void process(int* item, 
bool& success, JArrowMetrics::Status& status) { if (emit_count >= emit_limit) { - return Status::Finished; + success = false; + status = JArrowMetrics::Status::Finished; + return; } -// else if (emit_count % 5 == 0) { -// return StreamStatus::ComeBackLater; -// } - return Status::KeepGoing; + *item = 7; + emit_sum += *item; + emit_count += 1; + LOG_DEBUG(logger) << "RandIntSource emitted event " << emit_count << " with value " << *item << LOG_END; + success = true; + status = (emit_count == emit_limit) ? JArrowMetrics::Status::Finished : JArrowMetrics::Status::KeepGoing; + // This design lets us declare Finished immediately on the last event, instead of after } void initialize() override { @@ -50,40 +48,44 @@ struct RandIntSource : public Source { }; -struct MultByTwoProcessor : public ParallelProcessor { +struct MultByTwoProcessor : public ParallelProcessor { - double process(int x) override { - return x * 2.0; + double* process(int* x) override { + return new double(*x * 2.0); } }; -class SubOneProcessor : public ParallelProcessor { +struct SubOneProcessor : public JPipelineArrow { + + SubOneProcessor(std::string name, JMailbox* input_queue, JMailbox* output_queue) + : JPipelineArrow(name, true, false, false, input_queue, output_queue, nullptr) {} -private: - double process(double x) override { - return x - 1; + void process(double* item, bool&, JArrowMetrics::Status&) { + *item -= 1; } }; template -struct SumSink : public Sink { +struct SumSink : public JPipelineArrow, T> { T sum = 0; - JLogger logger; - void outprocess(T d) override { - sum += d; - LOG_DEBUG(logger) << "SumSink.outprocess() called!" << LOG_END; + SumSink(std::string name, JMailbox* input_queue, JPool* pool) + : JPipelineArrow,T>(name, false, false, true, input_queue, nullptr, pool) {} + + void process(T* item, bool&, JArrowMetrics::Status&) { + sum += *item; + LOG_DEBUG(JArrow::m_logger) << "SumSink.outprocess() called!" 
<< LOG_END; } void initialize() override { - LOG_INFO(logger) << "SumSink.initialize() called!" << LOG_END; + LOG_INFO(JArrow::m_logger) << "SumSink.initialize() called!" << LOG_END; }; void finalize() override { - LOG_INFO(logger) << "SumSink.finalize() called!" << LOG_END; + LOG_INFO(JArrow::m_logger) << "SumSink.finalize() called!" << LOG_END; }; }; diff --git a/src/programs/unit_tests/TopologyTests.cc b/src/programs/unit_tests/TopologyTests.cc index 654750448..fd281995f 100644 --- a/src/programs/unit_tests/TopologyTests.cc +++ b/src/programs/unit_tests/TopologyTests.cc @@ -28,40 +28,44 @@ void log_status(JArrowTopology& /*topology*/) { TEST_CASE("JTopology: Basic functionality") { - RandIntSource source; - MultByTwoProcessor p1; - SubOneProcessor p2; - SumSink sink; + auto q1 = new JMailbox(); + auto q2 = new JMailbox(); + auto q3 = new JMailbox(); - auto topology = std::make_shared(); + auto p1 = new JPool(0,1,false); + auto p2 = new JPool(0,1,false); + p1->init(); + p2->init(); + + MultByTwoProcessor processor; - auto q1 = new JMailbox(); - auto q2 = new JMailbox(); - auto q3 = new JMailbox(); + auto emit_rand_ints = new RandIntSource("emit_rand_ints", p1, q1); + auto multiply_by_two = new MapArrow("multiply_by_two", processor, q1, q2); + auto subtract_one = new SubOneProcessor("subtract_one", q2, q3); + auto sum_everything = new SumSink("sum_everything", q3, p2); - auto emit_rand_ints = new SourceArrow("emit_rand_ints", source, q1); - auto multiply_by_two = new MapArrow("multiply_by_two", p1, q1, q2); - auto subtract_one = new MapArrow("subtract_one", p2, q2, q3); - auto sum_everything = new SinkArrow("sum_everything", sink, q3); + auto topology = std::make_shared(); emit_rand_ints->attach(multiply_by_two); multiply_by_two->attach(subtract_one); subtract_one->attach(sum_everything); - topology->sources.push_back(emit_rand_ints); topology->arrows.push_back(emit_rand_ints); topology->arrows.push_back(multiply_by_two); topology->arrows.push_back(subtract_one); 
topology->arrows.push_back(sum_everything); - topology->sinks.push_back(sum_everything); + + auto logger = JLogger(JLogger::Level::INFO); + topology->m_logger = logger; + emit_rand_ints->set_logger(logger); + multiply_by_two->set_logger(logger); + subtract_one->set_logger(logger); + sum_everything->set_logger(logger); emit_rand_ints->set_chunksize(1); - auto logger = JLogger(JLogger::Level::TRACE); - topology->m_logger = logger; - source.logger = logger; JScheduler scheduler(topology); - + scheduler.logger = logger; SECTION("Before anything runs...") { @@ -138,7 +142,7 @@ TEST_CASE("JTopology: Basic functionality") { REQUIRE(sum_everything->get_pending() == 0); } - REQUIRE(sink.sum == (7 * 2.0 - 1) * 20); + REQUIRE(sum_everything->sum == (7 * 2.0 - 1) * 20); } SECTION("Running each stage in random order (sequentially) yields the correct results") { @@ -184,14 +188,10 @@ TEST_CASE("JTopology: Basic functionality") { } //topology.log_queue_status(); - REQUIRE(sink.sum == (7 * 2.0 - 1) * 20); + REQUIRE(sum_everything->sum == (7 * 2.0 - 1) * 20); } SECTION("Finished flag propagates") { - logger = JLogger(JLogger::Level::OFF); - topology->m_logger = logger; - source.logger = logger; - scheduler.run_topology(1); auto ts = scheduler.get_topology_state(); JArrowMetrics::Status status; @@ -254,7 +254,7 @@ TEST_CASE("JTopology: Basic functionality") { auto builder = app.GetService(); builder->set(topology); - REQUIRE(sink.sum == 0); + REQUIRE(sum_everything->sum == 0); app.Run(true); auto scheduler = app.GetService()->get_scheduler(); @@ -263,7 +263,7 @@ TEST_CASE("JTopology: Basic functionality") { auto ts = scheduler->get_topology_state(); REQUIRE(ts.current_topology_status == JScheduler::TopologyStatus::Finalized); REQUIRE(ts.arrow_states[0].status == JScheduler::ArrowStatus::Finalized); - REQUIRE(sink.sum == 20 * 13); + REQUIRE(sum_everything->sum == 20 * 13); } }