Skip to content

Commit

Permalink
Slow progress towards JOmniArrow
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanwbrei committed Dec 27, 2023
1 parent 3836c18 commit 0499f39
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 175 deletions.
188 changes: 23 additions & 165 deletions src/libraries/JANA/Engine/JOmniArrow.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,114 +8,13 @@
#include <JANA/Engine/JPool.h>


#ifndef JANA2_ARROWDATA_MAX_SIZE
#define JANA2_ARROWDATA_MAX_SIZE 10
#endif

/*
template <typename T>
struct Data {
std::array<T*, JANA2_ARROWDATA_MAX_SIZE> items;
size_t item_count = 0;
size_t reserve_count = 0;
size_t location_id;
Data(size_t location_id = 0) : location_id(location_id) {
items = {nullptr};
}
};
template <typename T>
struct PlaceRef {
JMailbox<T*>* queue = nullptr;
JPool<T>* pool = nullptr;
bool is_input = false;
size_t min_item_count = 1;
size_t max_item_count = 1;
PlaceRef() = default;
PlaceRef(JMailbox<T*>* queue, bool is_input, size_t min_item_count, size_t max_item_count)
: queue(queue), is_input(is_input),
min_item_count(min_item_count), max_item_count(max_item_count) {}
PlaceRef(JPool<T>* pool, bool is_input, size_t min_item_count, size_t max_item_count)
: pool(pool), is_input(is_input),
min_item_count(min_item_count), max_item_count(max_item_count) {}
bool pull(Data<T>& data) {
if (is_input) { // Actually pull the data
if (queue != nullptr) {
data.item_count = queue->pop_and_reserve(data.items.data(), min_item_count, max_item_count, data.location_id);
data.reserve_count = data.item_count;
return (data.item_count >= min_item_count);
}
else {
data.item_count = pool->pop(data.items.data(), min_item_count, max_item_count, data.location_id);
data.reserve_count = 0;
return (data.item_count >= min_item_count);
}
}
else {
if (queue != nullptr) {
// Reserve a space on the output queue
data.item_count = 0;
data.reserve_count = queue->reserve(min_item_count, max_item_count, data.location_id);
return (data.reserve_count >= min_item_count);
}
else {
// No need to reserve on pool -- either there is space or limit_events_in_flight=false
data.item_count = 0;
data.reserve_count = 0;
return true;
}
}
}
void revert(Data<T>& data) {
if (queue != nullptr) {
queue->push_and_unreserve(data.items.data(), data.item_count, data.reserve_count, data.location_id);
}
else {
if (is_input) {
pool->push(data.items.data(), data.item_count, data.location_id);
}
}
}
size_t push(Data<T>& data) {
if (queue != nullptr) {
queue->push_and_unreserve(data.items.data(), data.item_count, data.reserve_count, data.location_id);
return is_input ? 0 : data.item_count;
}
else {
pool->push(data.items.data(), data.item_count, data.location_id);
return 0;
}
}
};
*/

template <typename DerivedT, typename... PlaceTs>
class JOmniArrow : public JArrow {

/*
template <typename... Ts>
struct PlaceRefPack {
std::tuple<PlaceRef<PlaceTs>...> places {this};
template <typename Arg>
PlaceRefPack(Arg arg) : places((sizeof(Ts),arg)...) {}
};
*/


protected:
//PlaceRefPack<PlaceTs...> places_pack{this};
[[maybe_unused]]
std::tuple<PlaceRef<PlaceTs>...> places {(sizeof(PlaceTs), this)...};


public:
using Status = JArrowMetrics::Status;

Expand All @@ -128,83 +27,42 @@ class JOmniArrow : public JArrow {
{
}

/*
size_t get_pending() final {
// This is actually used by JScheduler for better or for worse
size_t first_pending = first_input.queue ? first_input.queue->size() : 0;
size_t second_pending = second_input.queue ? second_input.queue->size() : 0;
return first_pending + second_pending;
};

void execute(JArrowMetrics& result, size_t location_id) final {

void set_threshold(size_t) final { }
auto start_total_time = std::chrono::steady_clock::now();
// Create data holders
[[maybe_unused]]
std::tuple<Data<PlaceTs>...> data {(sizeof(PlaceTs), location_id)...};

bool try_pull_all(Data<FirstT>& fi, Data<FirstT>& fo, Data<SecondT>& si, Data<SecondT>& so) {
/*
// Attempt to pull from all places
bool success = (std::get<PlaceRef<PlaceTs>>(places).pull(std::get<Data<PlaceTs>>(data)) && ...);
if (!success) {
bool success;
success = first_input.pull(fi);
if (! success) {
return false;
}
success = first_output.pull(fo);
if (! success) {
first_input.revert(fi);
return false;
}
success = second_input.pull(si);
if (! success) {
first_input.revert(fi);
first_output.revert(fo);
return false;
}
success = second_output.pull(so);
if (! success) {
first_input.revert(fi);
first_output.revert(fo);
second_input.revert(si);
return false;
}
return true;
}
// Revert all PlaceRefs
(std::get<PlaceRef<PlaceTs>>(places).revert(std::get<Data<PlaceTs>>(data)), ...);
size_t push_all(std::tuple<Data<PlaceTs>&...> data) {
size_t message_count = places.push(data...) + ...;
return message_count;
}
*/
// Report back and exit
auto end_total_time = std::chrono::steady_clock::now();
result.update(JArrowMetrics::Status::ComeBackLater, 0, 1, std::chrono::milliseconds(0), end_total_time - start_total_time);
}
else {
void execute(JArrowMetrics& result, size_t location_id) final {
auto start_processing_time = std::chrono::steady_clock::now();
auto start_total_time = std::chrono::steady_clock::now();
/*
std::tuple<
Data<FirstT> first_input_data;
Data<FirstT> first_output_data;
Data<SecondT> second_input_data;
Data<SecondT> second_output_data;
first_input_data.location_id = location_id;
first_output_data.location_id = location_id;
second_input_data.location_id = location_id;
second_output_data.location_id = location_id;
bool success = try_pull_all(first_input_data, first_output_data, second_input_data, second_output_data);
if (success) {
// Call user-provided process() given fully typed Data
auto process_status = static_cast<DerivedT*>(this)->process(data);
auto start_processing_time = std::chrono::steady_clock::now();
auto process_status = static_cast<DerivedT*>(this)->process(first_input_data, first_output_data, second_input_data, second_output_data);
auto end_processing_time = std::chrono::steady_clock::now();
size_t events_processed = push_all(first_input_data, first_output_data, second_input_data, second_output_data);
// Push to all places (always succeeds, assuming user didn't muck with reserved_count)
size_t events_processed = (std::get<PlaceRef<PlaceTs>>(places).push(std::get<Data<PlaceTs>>(data)) + ...);
auto end_total_time = std::chrono::steady_clock::now();
auto latency = (end_processing_time - start_processing_time);
auto overhead = (end_total_time - start_total_time) - latency;
result.update(process_status, events_processed, 1, latency, overhead);
return;
}
else {
auto end_total_time = std::chrono::steady_clock::now();
result.update(JArrowMetrics::Status::ComeBackLater, 0, 1, std::chrono::milliseconds(0), end_total_time - start_total_time);
return;
}
*/
}
Expand Down
17 changes: 7 additions & 10 deletions src/programs/unit_tests/ArrowTests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,17 +60,16 @@ struct TestMapArrow : public JJunctionArrow<TestMapArrow, int, double> {

};

struct TestMapArrow2 : public JOmniArrow<TestMapArrow2, int, double> {
struct TestMapArrow2 : public JOmniArrow<TestMapArrow2, int, int, double, double> {

TestMapArrow2() : JOmniArrow<TestMapArrow2,int,double>("testmaparrow", false, false, true) {
TestMapArrow2() : JOmniArrow<TestMapArrow2,int,int,double,double>("testmaparrow", false, false, true) {
}
/*
Status process(Data<int>& input_int,
Data<int>& output_int,
Data<double>& input_double,
Data<double>& output_double) {

Status process(std::tuple<Data<int>, Data<int>, Data<double>, Data<double>>& data) {
std::cout << "Hello from process" << std::endl;

auto& [input_int, output_int, input_double, output_double] = data;

REQUIRE(input_int.item_count == 1);
REQUIRE(input_int.reserve_count == 1);
REQUIRE(output_int.item_count == 0);
Expand Down Expand Up @@ -101,11 +100,11 @@ struct TestMapArrow2 : public JOmniArrow<TestMapArrow2, int, double> {
output_double.item_count = 1;
return Status::KeepGoing;
}
*/
};




TEST_CASE("ArrowTests_Basic") {

JMailbox<int*> qi {2, 1, false};
Expand All @@ -130,8 +129,6 @@ TEST_CASE("ArrowTests_Basic") {
qd.pop_and_reserve(&y, 1, 1, 0);
REQUIRE(*y == 122.2);

TestMapArrow2 aa;

}


Expand Down

0 comments on commit 0499f39

Please sign in to comment.