Skip to content

Commit

Permalink
Merge pull request #259 from JeffersonLab/nbrei_scheduler_refactoring
Browse files Browse the repository at this point in the history
Refactoring: Move topology and arrow bookkeeping into JScheduler
  • Loading branch information
nathanwbrei authored Nov 1, 2023
2 parents 4fa5ec2 + 3175966 commit 2db622a
Show file tree
Hide file tree
Showing 20 changed files with 684 additions and 517 deletions.
4 changes: 0 additions & 4 deletions src/examples/BlockExample/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,6 @@ std::shared_ptr<JArrowTopology> configure_block_topology(std::shared_ptr<JArrowT
block_source_arrow->attach(block_disentangler_arrow);
block_disentangler_arrow->attach(processor_arrow);

block_source_arrow->set_running_arrows(&topology->running_arrow_count);
block_disentangler_arrow->set_running_arrows(&topology->running_arrow_count);
processor_arrow->set_running_arrows(&topology->running_arrow_count);

// If you want to add additional processors loaded from plugins, do this like so:
for (auto proc : topology->component_manager->get_evt_procs()) {
processor_arrow->add_processor(proc);
Expand Down
105 changes: 4 additions & 101 deletions src/libraries/JANA/Engine/JArrow.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,33 +18,28 @@
class JArrow {

public:
enum class Status { Unopened, Running, Paused, Finished };
enum class NodeType {Source, Sink, Stage, Group};
enum class BackoffStrategy { Constant, Linear, Exponential };
using duration_t = std::chrono::steady_clock::duration;


private:
// Info
const std::string m_name; // Used for human understanding
const bool m_is_parallel; // Whether or not it is safe to parallelize
const NodeType m_type;
JArrowMetrics m_metrics; // Performance information accumulated over all workers

mutable std::mutex m_arrow_mutex; // Protects access to arrow properties

// Knobs
size_t m_chunksize = 1; // Number of items to pop off the input queue at once
BackoffStrategy m_backoff_strategy = BackoffStrategy::Exponential;
duration_t m_initial_backoff_time = std::chrono::microseconds(1);
duration_t m_checkin_time = std::chrono::milliseconds(500);
unsigned m_backoff_tries = 4;

mutable std::mutex m_arrow_mutex; // Protects access to arrow properties, except m_status
std::atomic<Status> m_status {Status::Unopened};

// Scheduler stats
// These are protected by the Topology mutex, NOT the Arrow mutex!!!
int64_t m_thread_count = 0; // Current number of threads assigned to this arrow
std::atomic_int64_t m_running_upstreams {0}; // Current number of running arrows immediately upstream
std::atomic_int64_t* m_running_arrows = nullptr; // Current number of running arrows total, so we can detect pauses
friend class JScheduler;
std::vector<JArrow *> m_listeners; // Downstream Arrows

protected:
Expand Down Expand Up @@ -117,16 +112,6 @@ class JArrow {
m_checkin_time = checkin_time;
}

/// Adjusts the thread count recorded for this arrow by a signed delta.
/// The update is performed while holding m_arrow_mutex.
void update_thread_count(int thread_count_delta) {
    std::lock_guard<std::mutex> guard(m_arrow_mutex);
    m_thread_count = m_thread_count + thread_count_delta;
}

/// Returns the thread count currently recorded for this arrow.
/// The value is read while holding m_arrow_mutex; the stored counter is a
/// signed 64-bit integer and is converted to size_t on return.
size_t get_thread_count() {
    std::lock_guard<std::mutex> guard(m_arrow_mutex);
    const auto current = static_cast<size_t>(m_thread_count);
    return current;
}

// TODO: Metrics should be encapsulated so that only actions are to update, clear, or summarize
JArrowMetrics& get_metrics() {
return m_metrics;
Expand Down Expand Up @@ -157,79 +142,6 @@ class JArrow {
// Default implementation: the requested threshold is ignored and nothing happens.
// Declared virtual so that derived arrows can provide their own behavior.
virtual void set_threshold(size_t /* threshold */) {}




/// Returns a snapshot of this arrow's status, read atomically from m_status.
Status get_status() const {
    return m_status.load();
}

/// Returns a snapshot of the m_running_upstreams counter, read atomically.
int64_t get_running_upstreams() const {
    return m_running_upstreams.load();
}

void set_running_arrows(std::atomic_int64_t* running_arrows_ptr) {
m_running_arrows = running_arrows_ptr;
}

void run() {
Status status = m_status;

// if (status == Status::Unopened) {
// LOG_DEBUG(m_logger) << "Arrow '" << m_name << "' run(): Not initialized!" << LOG_END;
// throw JException("Arrow %s has not been initialized!", m_name.c_str());
// }
// if (status == Status::Running || m_status == Status::Finished) {
// LOG_DEBUG(m_logger) << "Arrow '" << m_name << "' run() : " << status << " => " << status << LOG_END;
// return;
// }
LOG_DEBUG(m_logger) << "Arrow '" << m_name << "' run() : " << status << " => Running" << LOG_END;
if (m_running_arrows != nullptr) (*m_running_arrows)++;
for (auto listener: m_listeners) {
listener->m_running_upstreams++;
listener->run(); // Activating something recursively activates everything downstream.
}
m_status = Status::Running;
}

void pause() {
Status status = m_status;
if (status != Status::Running) {
LOG_DEBUG(m_logger) << "JArrow '" << m_name << "' pause() : " << status << " => " << status << LOG_END;
return; // pause() is a no-op unless running
}
LOG_DEBUG(m_logger) << "JArrow '" << m_name << "' pause() : " << status << " => Paused" << LOG_END;
if (m_running_arrows != nullptr) (*m_running_arrows)--;
for (auto listener: m_listeners) {
listener->m_running_upstreams--;
// listener->pause();
// This is NOT a sufficient condition for pausing downstream listeners.
// What we need is zero running upstreams AND zero messages in queue AND zero threads currently processing
// Correspondingly, the scheduler or worker needs to be the one to call pause() when this condition is reached.
}
m_status = Status::Paused;
}

void finish() {
Status status = m_status;
LOG_DEBUG(m_logger) << "JArrow '" << m_name << "' finish() : " << status << " => Finished" << LOG_END;
Status old_status = m_status;
// if (old_status == Status::Unopened) {
// LOG_DEBUG(m_logger) << "JArrow '" << m_name << "': Uninitialized!" << LOG_END;
// throw JException("JArrow::finish(): Arrow %s has not been initialized!", m_name.c_str());
// }
if (old_status == Status::Running) {
if (m_running_arrows != nullptr) (*m_running_arrows)--;
for (auto listener: m_listeners) {
listener->m_running_upstreams--;
}
}
if (old_status != Status::Finished) {
LOG_TRACE(m_logger) << "JArrow '" << m_name << "': Finalizing (this must only happen once)" << LOG_END;
this->finalize();
}
m_status = Status::Finished;
}

/// Registers `downstream` as a listener of this arrow by appending it to m_listeners.
/// The pointer is stored as given; no null check or de-duplication is performed.
void attach(JArrow* downstream) {
    m_listeners.push_back(downstream);
}
Expand All @@ -247,14 +159,5 @@ inline std::ostream& operator<<(std::ostream& os, const JArrow::NodeType& nt) {
return os;
}

/// Streams the human-readable name of a JArrow::Status value.
/// Values outside the four known enumerators write nothing to the stream.
inline std::ostream& operator<<(std::ostream& os, const JArrow::Status& s) {
    const char* label = nullptr;
    switch (s) {
        case JArrow::Status::Unopened: label = "Unopened"; break;
        case JArrow::Status::Running:  label = "Running";  break;
        case JArrow::Status::Paused:   label = "Paused";   break;
        case JArrow::Status::Finished: label = "Finished"; break;
    }
    if (label != nullptr) {
        os << label;
    }
    return os;
}

#endif // GREENFIELD_ARROW_H
7 changes: 3 additions & 4 deletions src/libraries/JANA/Engine/JArrowPerfSummary.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,13 @@ std::ostream& operator<<(std::ostream& os, const JArrowPerfSummary& s) {
os << " Efficiency [0..1]: " << std::setprecision(3) << s.avg_efficiency_frac << std::endl;
os << std::endl;

os << " +--------------------------+------------+--------+-----+---------+-------+--------+---------+-------------+" << std::endl;
os << " | Name | Status | Type | Par | Threads | Chunk | Thresh | Pending | Completed |" << std::endl;
os << " +--------------------------+------------+--------+-----+---------+-------+--------+---------+-------------+" << std::endl;
os << " +--------------------------+--------+-----+---------+-------+--------+---------+-------------+" << std::endl;
os << " | Name | Type | Par | Threads | Chunk | Thresh | Pending | Completed |" << std::endl;
os << " +--------------------------+--------+-----+---------+-------+--------+---------+-------------+" << std::endl;

for (auto as : s.arrows) {
os << " | "
<< std::setw(24) << std::left << as.arrow_name << " | "
<< std::setw(11) << as.status << "| "
<< std::setw(6) << std::left << as.arrow_type << " | "
<< std::setw(3) << std::right << (as.is_parallel ? " T " : " F ") << " | "
<< std::setw(7) << as.thread_count << " |"
Expand Down
1 change: 0 additions & 1 deletion src/libraries/JANA/Engine/JArrowPerfSummary.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ struct ArrowSummary {
JArrow::NodeType arrow_type;
int running_upstreams;
bool has_backpressure;
JArrow::Status status;
size_t messages_pending;
size_t threshold;
size_t chunksize;
Expand Down
93 changes: 24 additions & 69 deletions src/libraries/JANA/Engine/JArrowProcessingController.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ void JArrowProcessingController::initialize() {
m_scheduler->logger = m_scheduler_logger;
LOG_INFO(m_logger) << m_topology->mapping << LOG_END;

m_topology->initialize();
m_scheduler->initialize_topology();

}

Expand All @@ -46,8 +46,8 @@ void JArrowProcessingController::initialize() {
/// @param [in] nthreads The number of worker threads to start
void JArrowProcessingController::run(size_t nthreads) {
LOG_INFO(m_logger) << "run(): Launching " << nthreads << " workers" << LOG_END;
// topology->run needs to happen _before_ threads are started so that threads don't quit due to lack of assignments
m_topology->run(nthreads);
// run_topology needs to happen _before_ threads are started so that threads don't quit due to lack of assignments
m_scheduler->run_topology(nthreads);

bool pin_to_cpu = (m_topology->mapping.get_affinity() != JProcessorMapping::AffinityStrategy::None);

Expand All @@ -71,11 +71,11 @@ void JArrowProcessingController::run(size_t nthreads) {
void JArrowProcessingController::scale(size_t nthreads) {

LOG_INFO(m_logger) << "scale(): Stopping all running workers" << LOG_END;
m_topology->request_pause();
m_scheduler->request_topology_pause();
for (JWorker* worker : m_workers) {
worker->wait_for_stop();
}
m_topology->achieve_pause();
m_scheduler->achieve_topology_pause();

LOG_INFO(m_logger) << "scale(): All workers are stopped" << LOG_END;
bool pin_to_cpu = (m_topology->mapping.get_affinity() != JProcessorMapping::AffinityStrategy::None);
Expand All @@ -94,15 +94,15 @@ void JArrowProcessingController::scale(size_t nthreads) {

LOG_INFO(m_logger) << "scale(): Restarting " << nthreads << " workers" << LOG_END;
// run_topology needs to happen _before_ threads are started so that threads don't quit due to lack of assignments
m_topology->run(nthreads);
m_scheduler->run_topology(nthreads);

for (size_t i=0; i<nthreads; ++i) {
m_workers.at(i)->start();
};
}

void JArrowProcessingController::request_pause() {
m_topology->request_pause();
m_scheduler->request_topology_pause();
// Or:
// for (JWorker* worker : m_workers) {
// worker->request_stop();
Expand All @@ -116,7 +116,7 @@ void JArrowProcessingController::wait_until_paused() {
// Join all the worker threads.
// Do not trigger the pause (we may want the pause to come internally, e.g. from an event source running out.)
// Do NOT finish() the topology (we want the ability to be able to restart it)
m_topology->achieve_pause();
m_scheduler->achieve_topology_pause();
}

void JArrowProcessingController::request_stop() {
Expand All @@ -128,7 +128,7 @@ void JArrowProcessingController::request_stop() {
// request_pause => pause
// wait_until_stop => join_then_finish
// wait_until_pause => join
m_topology->drain();
m_scheduler->drain_topology();
}

void JArrowProcessingController::wait_until_stopped() {
Expand All @@ -138,18 +138,16 @@ void JArrowProcessingController::wait_until_stopped() {
}
// finish out the topology
// (note some arrows might have already finished e.g. event sources, but that's fine, finish() is idempotent)
m_topology->achieve_pause();
m_topology->finish();
m_scheduler->achieve_topology_pause();
m_scheduler->finish_topology();
}

// Reports whether the topology is currently in the Paused state.
bool JArrowProcessingController::is_stopped() {
// TODO: Protect topology current status
// NOTE(review): this body contains two consecutive return statements, so only the
// first one (reading m_topology->m_current_status) can ever execute. This looks like
// the old and new lines of a change shown together — confirm which one is intended.
return m_topology->m_current_status == JArrowTopology::Status::Paused;
return m_scheduler->get_topology_status() == JScheduler::TopologyStatus::Paused;
}

// Reports whether the topology has reached its terminal state.
bool JArrowProcessingController::is_finished() {
// TODO: Protect topology current status
// NOTE(review): this body contains two consecutive return statements, so only the
// first one (comparing against JArrowTopology::Status::Finished) can ever execute.
// This looks like the old and new lines of a change shown together — confirm which
// one is intended.
return m_topology->m_current_status == JArrowTopology::Status::Finished;
return m_scheduler->get_topology_status() == JScheduler::TopologyStatus::Finalized;
}

bool JArrowProcessingController::is_timed_out() {
Expand Down Expand Up @@ -234,11 +232,11 @@ std::unique_ptr<const JArrowPerfSummary> JArrowProcessingController::measure_int

// Measure perf on all Workers first, as this will prompt them to publish
// any ArrowMetrics they have collected
m_perf_summary.workers.clear();
for (JWorker* worker : m_workers) {
WorkerSummary summary;
worker->measure_perf(summary);
m_perf_summary.workers.push_back(summary);
if (m_perf_summary.workers.size() != m_workers.size()) {
m_perf_summary.workers = std::vector<WorkerSummary>(m_workers.size());
}
for (size_t i=0; i<m_workers.size(); ++i) {
m_workers[i]->measure_perf(m_perf_summary.workers[i]);
}

size_t monotonic_event_count = 0;
Expand All @@ -253,59 +251,16 @@ std::unique_ptr<const JArrowPerfSummary> JArrowProcessingController::measure_int
double worst_seq_latency = 0;
double worst_par_latency = 0;

m_perf_summary.arrows.clear();
for (JArrow* arrow : m_topology->arrows) {
JArrowMetrics::Status last_status;
size_t total_message_count;
size_t last_message_count;
size_t total_queue_visits;
size_t last_queue_visits;
JArrowMetrics::duration_t total_latency;
JArrowMetrics::duration_t last_latency;
JArrowMetrics::duration_t total_queue_latency;
JArrowMetrics::duration_t last_queue_latency;

arrow->get_metrics().get(last_status, total_message_count, last_message_count, total_queue_visits,
last_queue_visits, total_latency, last_latency, total_queue_latency, last_queue_latency);

auto total_latency_ms = millisecs(total_latency).count();
auto total_queue_latency_ms = millisecs(total_queue_latency).count();

ArrowSummary summary;
summary.arrow_type = arrow->get_type();
summary.is_parallel = arrow->is_parallel();
summary.thread_count = arrow->get_thread_count();
summary.arrow_name = arrow->get_name();
summary.chunksize = arrow->get_chunksize();
summary.messages_pending = arrow->get_pending();
summary.running_upstreams = arrow->get_running_upstreams();
summary.threshold = arrow->get_threshold();
summary.status = arrow->get_status();

summary.total_messages_completed = total_message_count;
summary.last_messages_completed = last_message_count;
summary.queue_visit_count = total_queue_visits;

summary.avg_queue_latency_ms = (total_queue_visits == 0)
? std::numeric_limits<double>::infinity()
: total_queue_latency_ms / total_queue_visits;

summary.avg_queue_overhead_frac = total_queue_latency_ms / (total_queue_latency_ms + total_latency_ms);

summary.avg_latency_ms = (total_message_count == 0)
? std::numeric_limits<double>::infinity()
: total_latency_ms/total_message_count;

summary.last_latency_ms = (last_message_count == 0)
? std::numeric_limits<double>::infinity()
: millisecs(last_latency).count()/last_message_count;

if (arrow->is_parallel()) {
m_scheduler->summarize_arrows(m_perf_summary.arrows);


// Figure out what the bottlenecks in this topology are
for (const ArrowSummary& summary : m_perf_summary.arrows) {
if (summary.is_parallel) {
worst_par_latency = std::max(worst_par_latency, summary.avg_latency_ms);
} else {
worst_seq_latency = std::max(worst_seq_latency, summary.avg_latency_ms);
}
m_perf_summary.arrows.push_back(summary);
}

// bottlenecks
Expand Down
3 changes: 3 additions & 0 deletions src/libraries/JANA/Engine/JArrowProcessingController.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ class JArrowProcessingController : public JProcessingController {
void print_report() override;
void print_final_report() override;

// Accessor provided so that tests can reach the scheduler.
// Returns the stored m_scheduler pointer as-is.
JScheduler* get_scheduler() {
    return m_scheduler;
}


private:

Expand Down
Loading

0 comments on commit 2db622a

Please sign in to comment.