Skip to content

Commit

Permalink
Minor JArrow/JTopologyBuilder cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanwbrei committed Oct 13, 2024
1 parent 7518117 commit e0240dc
Show file tree
Hide file tree
Showing 6 changed files with 19 additions and 52 deletions.
2 changes: 1 addition & 1 deletion src/libraries/JANA/Engine/JArrowProcessingController.cc
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ bool JArrowProcessingController::is_timed_out() {
auto metrics = measure_performance();

int timeout_s;
if (metrics->total_uptime_s < m_warmup_timeout_s * m_topology->m_event_pool_size / metrics->thread_count) {
if (metrics->total_uptime_s < (m_warmup_timeout_s * m_topology->m_pool_capacity * 1.0) / metrics->thread_count) {
// We are at the beginning and not all events have necessarily had a chance to warm up
timeout_s = m_warmup_timeout_s;
}
Expand Down
31 changes: 0 additions & 31 deletions src/libraries/JANA/Topology/JArrow.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
// Subject to the terms in the LICENSE file found in the top-level directory.

#pragma once
#include <iostream>
#include <atomic>
#include <cassert>
#include <vector>

Expand All @@ -29,8 +27,6 @@ class JArrow {
bool m_is_sink; // Whether or not tnis arrow contributes to the final event count
JArrowMetrics m_metrics; // Performance information accumulated over all workers

mutable std::mutex m_arrow_mutex; // Protects access to arrow properties

friend class JScheduler;
std::vector<JArrow *> m_listeners; // Downstream Arrows

Expand Down Expand Up @@ -120,11 +116,6 @@ struct PlaceRefBase {
template <typename T>
struct PlaceRef : public PlaceRefBase {

PlaceRef(JArrow* parent) {
assert(parent != nullptr);
parent->attach(this);
}

PlaceRef(JArrow* parent, bool is_input, size_t min_item_count, size_t max_item_count) {
assert(parent != nullptr);
parent->attach(this);
Expand All @@ -133,28 +124,6 @@ struct PlaceRef : public PlaceRefBase {
this->max_item_count = max_item_count;
}

PlaceRef(JArrow* parent, JMailbox<T*>* queue, bool is_input, size_t min_item_count, size_t max_item_count) {
assert(parent != nullptr);
assert(queue != nullptr);
parent->attach(this);
this->place_ref = queue;
this->is_queue = true;
this->is_input = is_input;
this->min_item_count = min_item_count;
this->max_item_count = max_item_count;
}

PlaceRef(JArrow* parent, JPool<T>* pool, bool is_input, size_t min_item_count, size_t max_item_count) {
assert(parent != nullptr);
assert(pool != nullptr);
parent->attach(this);
this->place_ref = pool;
this->is_queue = false;
this->is_input = is_input;
this->min_item_count = min_item_count;
this->max_item_count = max_item_count;
}

void set_queue(JMailbox<T*>* queue) {
assert(queue != nullptr);
this->place_ref = queue;
Expand Down
8 changes: 4 additions & 4 deletions src/libraries/JANA/Topology/JJunctionArrow.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ template <typename DerivedT, typename FirstT, typename SecondT>
class JJunctionArrow : public JArrow {

protected:
PlaceRef<FirstT> first_input {this};
PlaceRef<FirstT> first_output {this};
PlaceRef<SecondT> second_input {this};
PlaceRef<SecondT> second_output {this};
PlaceRef<FirstT> first_input {this, true, 1, 1};
PlaceRef<FirstT> first_output {this, false, 1, 1};
PlaceRef<SecondT> second_input {this, true, 1, 1};
PlaceRef<SecondT> second_output {this, false, 1, 1};

public:
using Status = JArrowMetrics::Status;
Expand Down
15 changes: 8 additions & 7 deletions src/libraries/JANA/Topology/JTopologyBuilder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ void JTopologyBuilder::create_topology() {
static_cast<JProcessorMapping::LocalityStrategy>(m_locality));

event_pool = new JEventPool(m_components,
m_event_pool_size,
m_pool_capacity,
m_location_count,
m_limit_total_events_in_flight);
event_pool->init();
Expand Down Expand Up @@ -128,19 +128,20 @@ void JTopologyBuilder::acquire_services(JServiceLocator *sl) {
// We parse the 'nthreads' parameter two different ways for backwards compatibility.
if (m_params->Exists("nthreads")) {
if (m_params->GetParameterValue<std::string>("nthreads") == "Ncores") {
m_event_pool_size = JCpuInfo::GetNumCpus();
m_pool_capacity = JCpuInfo::GetNumCpus();
} else {
m_event_pool_size = m_params->GetParameterValue<int>("nthreads");
m_pool_capacity = m_params->GetParameterValue<int>("nthreads");
}
m_queue_capacity = m_pool_capacity;
}

m_params->SetDefaultParameter("jana:event_pool_size", m_event_pool_size,
m_params->SetDefaultParameter("jana:event_pool_size", m_pool_capacity,
"Sets the initial size of the event pool. Having too few events starves the workers; having too many consumes memory and introduces overhead from extra factory initializations")
->SetIsAdvanced(true);
m_params->SetDefaultParameter("jana:limit_total_events_in_flight", m_limit_total_events_in_flight,
"Controls whether the event pool is allowed to automatically grow beyond jana:event_pool_size")
->SetIsAdvanced(true);
m_params->SetDefaultParameter("jana:event_queue_threshold", m_event_queue_threshold,
m_params->SetDefaultParameter("jana:event_queue_threshold", m_queue_capacity,
"Max number of events allowed on the main event queue. Higher => Better load balancing; Lower => Fewer events in flight")
->SetIsAdvanced(true);
m_params->SetDefaultParameter("jana:enable_stealing", m_enable_stealing,
Expand All @@ -157,7 +158,7 @@ void JTopologyBuilder::acquire_services(JServiceLocator *sl) {

void JTopologyBuilder::connect(JArrow* upstream, size_t up_index, JArrow* downstream, size_t down_index) {

auto queue = new EventQueue(m_event_queue_threshold, mapping.get_loc_count(), m_enable_stealing);
auto queue = new EventQueue(m_queue_capacity, mapping.get_loc_count(), m_enable_stealing);
queues.push_back(queue);

size_t i = 0;
Expand Down Expand Up @@ -260,7 +261,7 @@ void JTopologyBuilder::attach_level(JEventLevel current_level, JUnfoldArrow* par
// --------------------------
// 0. Pool
// --------------------------
JEventPool* pool_at_level = new JEventPool(m_components, m_event_pool_size, m_location_count, m_limit_total_events_in_flight, current_level);
JEventPool* pool_at_level = new JEventPool(m_components, m_pool_capacity, m_location_count, m_limit_total_events_in_flight, current_level);
pool_at_level->init();
pools.push_back(pool_at_level); // Hand over ownership of the pool to the topology

Expand Down
7 changes: 2 additions & 5 deletions src/libraries/JANA/Topology/JTopologyBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ class JTopologyBuilder : public JService {
std::vector<JPoolBase*> pools; // Pools shared between arrows

// Topology configuration
size_t m_event_pool_size = 4;
size_t m_event_queue_threshold = 80;
size_t m_pool_capacity = 4;
size_t m_queue_capacity = 4;
size_t m_location_count = 1;
bool m_enable_stealing = false;
bool m_limit_total_events_in_flight = true;
Expand Down Expand Up @@ -68,9 +68,6 @@ class JTopologyBuilder : public JService {
void connect_to_first_available(JArrow* upstream, std::vector<JArrow*> downstreams);
void connect(JArrow* upstream, size_t upstream_index, JArrow* downstream, size_t downstream_index);

void attach_lower_level(JEventLevel current_level, JUnfoldArrow* parent_unfolder, JFoldArrow* parent_folder, bool found_sink);
void attach_top_level(JEventLevel current_level);

std::string print_topology();


Expand Down
8 changes: 4 additions & 4 deletions src/programs/unit_tests/Topology/ArrowTests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ struct TestMapArrow : public JJunctionArrow<TestMapArrow, int, double> {
JMailbox<double*>* qd)
: JJunctionArrow<TestMapArrow,int,double>("testmaparrow", false, false, true) {

first_input = {this, qi, true, 1, 1};
first_output = {this, pi, false, 1, 1};
second_input = {this, pd, true, 1, 1};
second_output = {this, qd, false, 1, 1};
first_input.set_queue(qi);
first_output.set_pool(pi);
second_input.set_pool(pd);
second_output.set_queue(qd);
}

Status process(Data<int>& input_int,
Expand Down

0 comments on commit e0240dc

Please sign in to comment.