Refactoring of arrow execution machinery #385

Merged
merged 50 commits into from
Nov 19, 2024
Commits
c9b7750
Skeleton of JExecutionEngine
nathanwbrei Oct 14, 2024
27435de
Flesh out JExecutionEngine
nathanwbrei Oct 14, 2024
236e0a4
Remove JWorker retry with backoff
nathanwbrei Oct 30, 2024
0894364
Fold JTriggeredArrow into JArrow
nathanwbrei Oct 30, 2024
abbd55c
Test JExecutionEngine state machine with no workers
nathanwbrei Oct 30, 2024
042494e
Rough cut of new scheduler logic
nathanwbrei Oct 30, 2024
d21e288
Test new scheduler on a full topology with a single event
nathanwbrei Oct 31, 2024
98e262e
Launch and shutdown workers
nathanwbrei Nov 5, 2024
0309f07
JExecutionEngine runs simple self-terminating topology
nathanwbrei Nov 5, 2024
50d582f
Run self-terminating topology with multiple workers
nathanwbrei Nov 5, 2024
0a84471
Bring back ticker
nathanwbrei Nov 6, 2024
96df689
Bring back RequestPause
nathanwbrei Nov 6, 2024
279074a
Bring back PrintFinalReport
nathanwbrei Nov 7, 2024
178dbc5
Bring back basic timeout and exception handling
nathanwbrei Nov 7, 2024
17c9e5c
Detect draining queues
nathanwbrei Nov 7, 2024
5b11237
JApplication uses JExecutionEngine
nathanwbrei Nov 7, 2024
c72fdbf
Rename JExecutionEngine::{ArrowState,WorkerState}
nathanwbrei Nov 7, 2024
ffbb90b
Streamline JEventQueue
nathanwbrei Nov 8, 2024
c31840c
Streamline JEventPool
nathanwbrei Nov 8, 2024
a9925b3
Disable subevent examples and tests
nathanwbrei Nov 8, 2024
8de2bd8
WIP: Arrows use JEventQueue instead of JMailbox<JEvent*>
nathanwbrei Nov 8, 2024
7cd50af
Remove JSubeventArrow, JMailbox
nathanwbrei Nov 8, 2024
29782ba
Remove JArrowProcessingController
nathanwbrei Nov 8, 2024
b8073f9
Fix segfault when calling SetTicker()
nathanwbrei Nov 10, 2024
1b4cd74
Rethink Scale()
nathanwbrei Nov 11, 2024
c92c8fa
Remove JArrow listeners, get_pending()
nathanwbrei Nov 11, 2024
b348fd2
Renegotiate wait/stop/finish
nathanwbrei Nov 11, 2024
4d74cfc
Transition JArrowMetrics::Status to JArrow::FireResult
nathanwbrei Nov 11, 2024
f34e8be
Implement JExecutionEngine::Fire
nathanwbrei Nov 11, 2024
123c154
JArrow::execute no longer depends on JArrowMetrics
nathanwbrei Nov 11, 2024
36218c0
Remove JArrowMetrics
nathanwbrei Nov 11, 2024
3fcb119
Test cases pass again
nathanwbrei Nov 12, 2024
0d719b8
Fix arrow pause/restart logic
nathanwbrei Nov 13, 2024
69ee17c
Bring back warmup timeout
nathanwbrei Nov 13, 2024
8a43a58
Add event number and arrow index to worker state
nathanwbrei Nov 13, 2024
b00a975
Renegotiate boundary between Task and WorkerState
nathanwbrei Nov 13, 2024
7b854d9
JBenchmarker runs again
nathanwbrei Nov 13, 2024
7919eb3
Bring back SIGINT handler
nathanwbrei Nov 14, 2024
a8bc564
Feature: Timeout stack trace
nathanwbrei Nov 15, 2024
9f97b3a
Feature: JBacktrace shows line numbers
nathanwbrei Nov 15, 2024
ba460a4
JTest: Add parameter for intentional timeout
nathanwbrei Nov 15, 2024
824292f
Use new JBacktrace everywhere
nathanwbrei Nov 15, 2024
8fc76c3
Bring back worker report
nathanwbrei Nov 15, 2024
0043432
Fix TSAN complaints
nathanwbrei Nov 15, 2024
66a504a
Renegotiate JExecutionEngine method names
nathanwbrei Nov 15, 2024
971d8bf
Split jBacktrace source from header
nathanwbrei Nov 15, 2024
c5596a4
Bring back SIGUSR1 sending worker report to named pipe
nathanwbrei Nov 15, 2024
cd75fa6
Small fixes
nathanwbrei Nov 19, 2024
51a5a6f
Disable JBacktrace::AddrToLine temporarily
nathanwbrei Nov 19, 2024
074a77d
JExecutionEngine sets exit codes
nathanwbrei Nov 19, 2024
JExecutionEngine runs simple self-terminating topology
nathanwbrei committed Nov 15, 2024
commit 0309f077af3ecc3401895e2a2eec17b52262c138
52 changes: 32 additions & 20 deletions src/libraries/JANA/Engine/JExecutionEngine.cc
@@ -59,14 +59,13 @@ void JExecutionEngine::Scale(size_t nthreads) {

     std::unique_lock<std::mutex> lock(m_mutex);

-    assert(m_runstatus == RunStatus::Paused);
+    assert(m_runstatus == RunStatus::Paused || m_runstatus == RunStatus::Finished);

     auto prev_nthreads = m_workers.size();

     if (prev_nthreads < nthreads) {
         // We are launching additional worker threads
         LOG_DEBUG(GetLogger()) << "Scaling up to " << nthreads << " worker threads" << LOG_END;
-
         for (size_t worker_id=prev_nthreads; worker_id < nthreads; ++worker_id) {
             auto worker = std::make_unique<Worker>();
             worker->worker_id = worker_id;
@@ -93,23 +92,29 @@ void JExecutionEngine::Scale(size_t nthreads) {
         }
         lock.unlock();
+
+        m_condvar.notify_all(); // Wake up all threads so that they can exit the condvar wait loop
+
         // We join all (eligible) threads _outside_ of the mutex
         for (size_t worker_id=prev_nthreads-1; worker_id <= nthreads; --worker_id) {
-            if (m_workers[worker_id]->thread == nullptr) {
-                // Release external workers without joining any threads
-                m_workers.pop_back();
-                continue;
-            }
-            if (m_workers[worker_id]->is_timed_out) {
-                // Thread has timed out. Rather than non-cooperatively killing it,
-                // we relinquish ownership of it but remember that it was ours once and
-                // is still out there, somewhere, biding its time
-                m_workers[worker_id]->thread->detach();
+            if (m_workers[worker_id]->thread != nullptr) {
+                if (m_workers[worker_id]->is_timed_out) {
+                    // Thread has timed out. Rather than non-cooperatively killing it,
+                    // we relinquish ownership of it but remember that it was ours once and
+                    // is still out there, somewhere, biding its time
+                    m_workers[worker_id]->thread->detach();
+                }
+                else {
+                    m_workers[worker_id]->thread->join();
+                }
             }
-            else {
-                m_workers[worker_id]->thread->join();
+        }
+
+        lock.lock();
+        // We retake the mutex so we can safely modify m_workers
+        for (size_t worker_id=prev_nthreads-1; worker_id <= nthreads; --worker_id) {
+            if (m_workers.back()->thread != nullptr) {
+                delete m_workers.back()->thread;
             }
-            delete m_workers[worker_id]->thread;
-            m_workers[worker_id]->thread = nullptr;
             m_workers.pop_back();
         }
     }
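
Review note on the scale-down path: both loops above count a `size_t` index downward with the condition `worker_id <= nthreads`. Read literally, the body runs only when `nthreads == prev_nthreads - 1`, and it then keeps going down to index 0 before the index wraps around. That works for the 1 to 0 case the new test exercises, but apparently not for a step such as 4 to 2. The sketch below shows one way to avoid counting an unsigned index downward at all. It is not JANA code: `MiniPool`, `Size()`, and `RunWorker()` are hypothetical names, the worker record is cut down to three fields, and the sketch deliberately swaps the index loop for a pop-from-the-back loop and the raw `std::thread*` for smart pointers. It also assumes only one controlling thread ever calls `Scale()`.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for the engine's per-worker record (not JANA's Worker).
struct Worker {
    std::unique_ptr<std::thread> thread;  // nullptr would model an external worker
    bool is_stop_requested = false;       // guarded by MiniPool::m_mutex
    bool is_timed_out = false;            // never set in this sketch
};

// Hypothetical miniature pool; assumes a single controlling thread calls Scale().
class MiniPool {
public:
    ~MiniPool() { Scale(0); }

    std::size_t Size() {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_workers.size();
    }

    void Scale(std::size_t nthreads) {
        std::vector<std::shared_ptr<Worker>> retired;
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            // Scale up: new threads block on m_mutex until this scope ends.
            while (m_workers.size() < nthreads) {
                auto worker = std::make_shared<Worker>();
                worker->thread = std::make_unique<std::thread>(
                    [this, worker] { RunWorker(worker); });
                m_workers.push_back(std::move(worker));
            }
            // Scale down: pop from the back until the size matches, so no
            // unsigned index ever has to count below zero.
            while (m_workers.size() > nthreads) {
                m_workers.back()->is_stop_requested = true;
                retired.push_back(std::move(m_workers.back()));
                m_workers.pop_back();
            }
        }
        m_condvar.notify_all();  // wake sleepers so they can see the stop flag

        // Join outside the mutex: a worker needs the lock to wake up and exit.
        for (auto& worker : retired) {
            if (!worker->thread) continue;   // external worker: nothing to join
            if (worker->is_timed_out) {
                worker->thread->detach();    // give up ownership, do not kill
            } else {
                worker->thread->join();
            }
        }
    }

private:
    void RunWorker(std::shared_ptr<Worker> self) {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_condvar.wait(lock, [&] { return self->is_stop_requested; });
    }

    std::mutex m_mutex;
    std::condition_variable m_condvar;
    std::vector<std::shared_ptr<Worker>> m_workers;
};
```

With this shape, `Scale(4)` followed by `Scale(2)` leaves two workers running, and the destructor joins whatever remains. A detached worker would still touch the pool's mutex, so a real implementation would need to keep the pool alive for as long as such a thread can run.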
@@ -130,11 +135,11 @@ void JExecutionEngine::RequestDrain() {
 void JExecutionEngine::Wait(bool finish) {

     while (true) {
-        std::unique_lock<std::mutex> lock(m_mutex);
-        if (m_runstatus == RunStatus::Paused || m_runstatus == RunStatus::Failed) {
+        auto perf = GetPerf();
+        if (perf.runstatus == RunStatus::Paused || perf.runstatus == RunStatus::Finished) {
             break;
         }
-        LOG_INFO(GetLogger()) << "Processing ..." << LOG_END;
+        LOG_INFO(GetLogger()) << "Processing ... " << perf.event_count << " events completed" << LOG_END;
         std::this_thread::sleep_for(std::chrono::milliseconds(500));
     }
     LOG_INFO(GetLogger()) << "... Processing paused" << LOG_END;
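
Review note on `Wait()`: in the variant that takes `std::unique_lock` at the top of the loop body, the lock stays held across the 500 ms `sleep_for`, which would stall any worker that needs `m_mutex`. The variant that reads a `GetPerf()` snapshot avoids this. A minimal sketch of that snapshot-polling shape follows, assuming `GetPerf()` holds the mutex only long enough to copy the fields (the diff does not show its body). `MiniEngine`, `SetStatus()`, and `AddEvents()` are hypothetical names used for illustration only.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <mutex>
#include <thread>

enum class RunStatus { Paused, Running, Finished };

// Hypothetical snapshot type, loosely modeled on what GetPerf() returns in the diff.
struct Perf {
    RunStatus runstatus;
    std::size_t event_count;
};

// Hypothetical miniature engine: only the state that Wait() needs.
class MiniEngine {
public:
    // Copy the shared state under the lock, then release it immediately.
    Perf GetPerf() {
        std::lock_guard<std::mutex> lock(m_mutex);
        return Perf{m_runstatus, m_event_count};
    }

    void SetStatus(RunStatus status) {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_runstatus = status;
    }

    void AddEvents(std::size_t n) {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_event_count += n;
    }

    // Poll until the engine is paused or finished. The mutex is never held
    // while sleeping. Returns how many polls found the engine still running.
    std::size_t Wait(std::chrono::milliseconds poll_interval) {
        std::size_t polls = 0;
        while (true) {
            const Perf perf = GetPerf();
            if (perf.runstatus == RunStatus::Paused ||
                perf.runstatus == RunStatus::Finished) {
                break;
            }
            ++polls;
            std::this_thread::sleep_for(poll_interval);
        }
        return polls;
    }

private:
    std::mutex m_mutex;
    RunStatus m_runstatus = RunStatus::Paused;
    std::size_t m_event_count = 0;
};
```

Polling a snapshot trades a little latency (up to one poll interval) for simplicity. A condition variable signalled at the pause site would be the usual alternative if that latency mattered.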
@@ -243,6 +248,10 @@ void JExecutionEngine::ExchangeTask(Task& task, bool nonblocking) {
         IngestCompletedTask_Unsafe(task);
     }

+    if (task.worker_id == -1 || task.worker_id >= (int) m_workers.size()) {
+        // This happens if we are an external worker and someone called Scale() to deactivate us
+        return;
+    }
     auto& worker = m_workers[task.worker_id];
     if (worker->is_stop_requested) {
         return;
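
Review note on the `ExchangeTask()` guard: it compares a signed `worker_id` against `m_workers.size()` by casting the size down to `int`. One hedged alternative is to reject negative ids first and then cast the id up to `std::size_t`, which avoids narrowing the container size. `IsActiveWorker` below is a hypothetical helper, not part of `JExecutionEngine`.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical helper: true if worker_id indexes a currently registered worker.
// Negative ids (such as the -1 sentinel used in the diff) are rejected before
// the cast, so the signed-to-unsigned conversion cannot wrap.
bool IsActiveWorker(int worker_id, std::size_t worker_count) {
    if (worker_id < 0) {
        return false;
    }
    return static_cast<std::size_t>(worker_id) < worker_count;
}
```

The early return in the guard would then read `if (!IsActiveWorker(task.worker_id, m_workers.size())) return;`.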
@@ -359,9 +368,11 @@ void JExecutionEngine::FindNextReadyTask_Unsafe(Task& task) {

     bool any_active_source_found = false;
     bool any_active_task_found = false;

+    LOG_DEBUG(GetLogger()) << "Scheduler: No tasks ready. Checking for pause..." << LOG_END;
+
     for (auto& state : m_scheduler_state) {
-        LOG_INFO(GetLogger()) << "Scheduler: src=" << state.is_source << ", active_tasks=" << state.active_tasks << ", parallel=" << state.is_parallel << LOG_END;
+        LOG_DEBUG(GetLogger()) << "Scheduler: src=" << state.is_source << ", active_tasks=" << state.active_tasks << ", parallel=" << state.is_parallel << LOG_END;
         any_active_source_found |= (state.is_active && state.is_source);
         any_active_task_found |= (state.active_tasks != 0);
         if (state.is_source && !state.is_active) {
@@ -379,6 +390,7 @@ void JExecutionEngine::FindNextReadyTask_Unsafe(Task& task) {
                 m_event_count_at_finish += state.events_processed;
             }
         }
+        LOG_DEBUG(GetLogger()) << "Scheduler: Processing paused" << LOG_END;
         m_runstatus = RunStatus::Paused;
         // I think this is the ONLY site where the topology gets paused. Verify this?
     }
29 changes: 29 additions & 0 deletions src/programs/unit_tests/Engine/JExecutionEngineTests.cc
@@ -169,6 +169,35 @@ TEST_CASE("JExecutionEngine_ScaleWorkers") {
 }


+TEST_CASE("JExecutionEngine_RunSingleEvent") {
+    JApplication app;
+    app.SetParameterValue("jana:nevents", 3);
+    app.SetParameterValue("jana:loglevel", "debug");
+    app.Add(new TestSource());
+    app.Add(new TestProc());
+    app.ProvideService(std::make_shared<JExecutionEngine>());
+    app.Initialize();
+    auto sut = app.GetService<JExecutionEngine>();
+
+    SECTION("SingleWorker") {
+        REQUIRE(sut->GetPerf().thread_count == 0);
+        sut->Scale(1);
+        sut->Run();
+
+        sut->Wait();
+        REQUIRE(sut->GetPerf().thread_count == 1);
+        REQUIRE(sut->GetPerf().runstatus == JExecutionEngine::RunStatus::Paused);
+
+        sut->Finish();
+        REQUIRE(sut->GetPerf().runstatus == JExecutionEngine::RunStatus::Finished);
+        REQUIRE(sut->GetPerf().event_count == 3);
+
+        REQUIRE(sut->GetPerf().thread_count == 1);
+        sut->Scale(0);
+        REQUIRE(sut->GetPerf().thread_count == 0);
+    }
+}
+
 } // jana::engine::tests

