diff --git a/src/libraries/JANA/Engine/JOmniArrow.h b/src/libraries/JANA/Engine/JOmniArrow.h index 5c8379164..26273e916 100644 --- a/src/libraries/JANA/Engine/JOmniArrow.h +++ b/src/libraries/JANA/Engine/JOmniArrow.h @@ -8,114 +8,13 @@ #include -#ifndef JANA2_ARROWDATA_MAX_SIZE -#define JANA2_ARROWDATA_MAX_SIZE 10 -#endif - -/* -template -struct Data { - std::array items; - size_t item_count = 0; - size_t reserve_count = 0; - size_t location_id; - - Data(size_t location_id = 0) : location_id(location_id) { - items = {nullptr}; - } -}; - -template -struct PlaceRef { - JMailbox* queue = nullptr; - JPool* pool = nullptr; - bool is_input = false; - size_t min_item_count = 1; - size_t max_item_count = 1; - - PlaceRef() = default; - - PlaceRef(JMailbox* queue, bool is_input, size_t min_item_count, size_t max_item_count) - : queue(queue), is_input(is_input), - min_item_count(min_item_count), max_item_count(max_item_count) {} - - PlaceRef(JPool* pool, bool is_input, size_t min_item_count, size_t max_item_count) - : pool(pool), is_input(is_input), - min_item_count(min_item_count), max_item_count(max_item_count) {} - - - bool pull(Data& data) { - if (is_input) { // Actually pull the data - if (queue != nullptr) { - data.item_count = queue->pop_and_reserve(data.items.data(), min_item_count, max_item_count, data.location_id); - data.reserve_count = data.item_count; - return (data.item_count >= min_item_count); - } - else { - data.item_count = pool->pop(data.items.data(), min_item_count, max_item_count, data.location_id); - data.reserve_count = 0; - return (data.item_count >= min_item_count); - } - } - else { - if (queue != nullptr) { - // Reserve a space on the output queue - data.item_count = 0; - data.reserve_count = queue->reserve(min_item_count, max_item_count, data.location_id); - return (data.reserve_count >= min_item_count); - } - else { - // No need to reserve on pool -- either there is space or limit_events_in_flight=false - data.item_count = 0; - data.reserve_count = 0; - return true; - } - } - } - - void revert(Data& data) { - if (queue != nullptr) { - queue->push_and_unreserve(data.items.data(), data.item_count, data.reserve_count, data.location_id); - } - else { - if (is_input) { - pool->push(data.items.data(), data.item_count, data.location_id); - } - } - } - - size_t push(Data& data) { - if (queue != nullptr) { - queue->push_and_unreserve(data.items.data(), data.item_count, data.reserve_count, data.location_id); - return is_input ? 0 : data.item_count; - } - else { - pool->push(data.items.data(), data.item_count, data.location_id); - return 0; - } - } -}; -*/ - template class JOmniArrow : public JArrow { - /* - template - struct PlaceRefPack { - std::tuple...> places {this}; - - template - PlaceRefPack(Arg arg) : places((sizeof(Ts),arg)...) {} - }; - */ - - protected: - //PlaceRefPack places_pack{this}; + [[maybe_unused]] std::tuple...> places {(sizeof(PlaceTs), this)...}; - public: using Status = JArrowMetrics::Status; @@ -128,83 +27,42 @@ class JOmniArrow : public JArrow { { } - /* - size_t get_pending() final { - // This is actually used by JScheduler for better or for worse - size_t first_pending = first_input.queue ? first_input.queue->size() : 0; - size_t second_pending = second_input.queue ? second_input.queue->size() : 0; - return first_pending + second_pending; - }; + void execute(JArrowMetrics& result, size_t location_id) final { - void set_threshold(size_t) final { } + auto start_total_time = std::chrono::steady_clock::now(); + // Create data holders + [[maybe_unused]] + std::tuple...> data {(sizeof(PlaceTs), location_id)...}; - bool try_pull_all(Data& fi, Data& fo, Data& si, Data& so) { + /* + // Attempt to pull from all places + bool success = (std::get>(places).pull(std::get>(data)) && ...); + if (!success) { - bool success; - success = first_input.pull(fi); - if (! success) { - return false; - } - success = first_output.pull(fo); - if (! success) { - first_input.revert(fi); - return false; - } - success = second_input.pull(si); - if (! success) { - first_input.revert(fi); - first_output.revert(fo); - return false; - } - success = second_output.pull(so); - if (! success) { - first_input.revert(fi); - first_output.revert(fo); - second_input.revert(si); - return false; - } - return true; - } + // Revert all PlaceRefs + (std::get>(places).revert(std::get>(data)), ...); - size_t push_all(std::tuple&...> data) { - size_t message_count = places.push(data...) + ...; - return message_count; - } - */ + // Report back and exit + auto end_total_time = std::chrono::steady_clock::now(); + result.update(JArrowMetrics::Status::ComeBackLater, 0, 1, std::chrono::milliseconds(0), end_total_time - start_total_time); + } + else { - void execute(JArrowMetrics& result, size_t location_id) final { + auto start_processing_time = std::chrono::steady_clock::now(); - auto start_total_time = std::chrono::steady_clock::now(); -/* - std::tuple< - Data first_input_data; - Data first_output_data; - Data second_input_data; - Data second_output_data; - first_input_data.location_id = location_id; - first_output_data.location_id = location_id; - second_input_data.location_id = location_id; - second_output_data.location_id = location_id; - - bool success = try_pull_all(first_input_data, first_output_data, second_input_data, second_output_data); - if (success) { + // Call user-provided process() given fully typed Data + auto process_status = static_cast(this)->process(data); - auto start_processing_time = std::chrono::steady_clock::now(); - auto process_status = static_cast(this)->process(first_input_data, first_output_data, second_input_data, second_output_data); auto end_processing_time = std::chrono::steady_clock::now(); - size_t events_processed = push_all(first_input_data, first_output_data, second_input_data, second_output_data); + + // Push to all places (always succeeds, assuming user didn't muck with reserved_count) + size_t events_processed = (std::get>(places).push(std::get>(data)) + ...); auto end_total_time = std::chrono::steady_clock::now(); auto latency = (end_processing_time - start_processing_time); auto overhead = (end_total_time - start_total_time) - latency; result.update(process_status, events_processed, 1, latency, overhead); - return; - } - else { - auto end_total_time = std::chrono::steady_clock::now(); - result.update(JArrowMetrics::Status::ComeBackLater, 0, 1, std::chrono::milliseconds(0), end_total_time - start_total_time); - return; } */ } diff --git a/src/programs/unit_tests/ArrowTests.cc b/src/programs/unit_tests/ArrowTests.cc index 5db4dd5b0..affbd506e 100644 --- a/src/programs/unit_tests/ArrowTests.cc +++ b/src/programs/unit_tests/ArrowTests.cc @@ -60,17 +60,16 @@ struct TestMapArrow : public JJunctionArrow { }; -struct TestMapArrow2 : public JOmniArrow { +struct TestMapArrow2 : public JOmniArrow { - TestMapArrow2() : JOmniArrow("testmaparrow", false, false, true) { + TestMapArrow2() : JOmniArrow("testmaparrow", false, false, true) { } -/* - Status process(Data& input_int, - Data& output_int, - Data& input_double, - Data& output_double) { + + Status process(std::tuple, Data, Data, Data>& data) { std::cout << "Hello from process" << std::endl; + auto& [input_int, output_int, input_double, output_double] = data; + REQUIRE(input_int.item_count == 1); REQUIRE(input_int.reserve_count == 1); REQUIRE(output_int.item_count == 0); @@ -101,11 +100,11 @@ struct TestMapArrow2 : public JOmniArrow { output_double.item_count = 1; return Status::KeepGoing; } -*/ }; + TEST_CASE("ArrowTests_Basic") { JMailbox qi {2, 1, false}; @@ -130,8 +129,6 @@ TEST_CASE("ArrowTests_Basic") { qd.pop_and_reserve(&y, 1, 1, 0); REQUIRE(*y == 122.2); - TestMapArrow2 aa; - }