Refactoring of arrow execution machinery #385

Merged
merged 50 commits into from
Nov 19, 2024
c9b7750
Skeleton of JExecutionEngine
nathanwbrei Oct 14, 2024
27435de
Flesh out JExecutionEngine
nathanwbrei Oct 14, 2024
236e0a4
Remove JWorker retry with backoff
nathanwbrei Oct 30, 2024
0894364
Fold JTriggeredArrow into JArrow
nathanwbrei Oct 30, 2024
abbd55c
Test JExecutionEngine state machine with no workers
nathanwbrei Oct 30, 2024
042494e
Rough cut of new scheduler logic
nathanwbrei Oct 30, 2024
d21e288
Test new scheduler on a full topology with a single event
nathanwbrei Oct 31, 2024
98e262e
Launch and shutdown workers
nathanwbrei Nov 5, 2024
0309f07
JExecutionEngine runs simple self-terminating topology
nathanwbrei Nov 5, 2024
50d582f
Run self-terminating topology with multiple workers
nathanwbrei Nov 5, 2024
0a84471
Bring back ticker
nathanwbrei Nov 6, 2024
96df689
Bring back RequestPause
nathanwbrei Nov 6, 2024
279074a
Bring back PrintFinalReport
nathanwbrei Nov 7, 2024
178dbc5
Bring back basic timeout and exception handling
nathanwbrei Nov 7, 2024
17c9e5c
Detect draining queues
nathanwbrei Nov 7, 2024
5b11237
JApplication uses JExecutionEngine
nathanwbrei Nov 7, 2024
c72fdbf
Rename JExecutionEngine::{ArrowState,WorkerState}
nathanwbrei Nov 7, 2024
ffbb90b
Streamline JEventQueue
nathanwbrei Nov 8, 2024
c31840c
Streamline JEventPool
nathanwbrei Nov 8, 2024
a9925b3
Disable subevent examples and tests
nathanwbrei Nov 8, 2024
8de2bd8
WIP: Arrows use JEventQueue instead of JMailbox<JEvent*>
nathanwbrei Nov 8, 2024
7cd50af
Remove JSubeventArrow, JMailbox
nathanwbrei Nov 8, 2024
29782ba
Remove JArrowProcessingController
nathanwbrei Nov 8, 2024
b8073f9
Fix segfault when calling SetTicker()
nathanwbrei Nov 10, 2024
1b4cd74
Rethink Scale()
nathanwbrei Nov 11, 2024
c92c8fa
Remove JArrow listeners, get_pending()
nathanwbrei Nov 11, 2024
b348fd2
Renegotiate wait/stop/finish
nathanwbrei Nov 11, 2024
4d74cfc
Transition JArrowMetrics::Status to JArrow::FireResult
nathanwbrei Nov 11, 2024
f34e8be
Implement JExecutionEngine::Fire
nathanwbrei Nov 11, 2024
123c154
JArrow::execute no longer depends on JArrowMetrics
nathanwbrei Nov 11, 2024
36218c0
Remove JArrowMetrics
nathanwbrei Nov 11, 2024
3fcb119
Test cases pass again
nathanwbrei Nov 12, 2024
0d719b8
Fix arrow pause/restart logic
nathanwbrei Nov 13, 2024
69ee17c
Bring back warmup timeout
nathanwbrei Nov 13, 2024
8a43a58
Add event number and arrow index to worker state
nathanwbrei Nov 13, 2024
b00a975
Renegotiate boundary between Task and WorkerState
nathanwbrei Nov 13, 2024
7b854d9
JBenchmarker runs again
nathanwbrei Nov 13, 2024
7919eb3
Bring back SIGINT handler
nathanwbrei Nov 14, 2024
a8bc564
Feature: Timeout stack trace
nathanwbrei Nov 15, 2024
9f97b3a
Feature: JBacktrace shows line numbers
nathanwbrei Nov 15, 2024
ba460a4
JTest: Add parameter for intentional timeout
nathanwbrei Nov 15, 2024
824292f
Use new JBacktrace everywhere
nathanwbrei Nov 15, 2024
8fc76c3
Bring back worker report
nathanwbrei Nov 15, 2024
0043432
Fix TSAN complaints
nathanwbrei Nov 15, 2024
66a504a
Renegotiate JExecutionEngine method names
nathanwbrei Nov 15, 2024
971d8bf
Split jBacktrace source from header
nathanwbrei Nov 15, 2024
c5596a4
Bring back SIGUSR1 sending worker report to named pipe
nathanwbrei Nov 15, 2024
cd75fa6
Small fixes
nathanwbrei Nov 19, 2024
51a5a6f
Disable JBacktrace::AddrToLine temporarily
nathanwbrei Nov 19, 2024
074a77d
JExecutionEngine sets exit codes
nathanwbrei Nov 19, 2024
3 changes: 3 additions & 0 deletions CMakeLists.txt
@@ -220,6 +220,9 @@ add_subdirectory(src/python)
install(DIRECTORY scripts/ DESTINATION bin FILES_MATCHING PATTERN "jana-*.py"
PERMISSIONS OWNER_EXECUTE OWNER_WRITE OWNER_READ GROUP_EXECUTE GROUP_READ WORLD_READ WORLD_EXECUTE)

install(FILES "scripts/jana-status.sh" RENAME "jana-status" DESTINATION "bin"
PERMISSIONS OWNER_EXECUTE OWNER_WRITE OWNER_READ GROUP_EXECUTE GROUP_READ WORLD_READ WORLD_EXECUTE)

include(${CMAKE_SOURCE_DIR}/cmake/MakeConfig.cmake)
include(${CMAKE_SOURCE_DIR}/cmake/MakeJanaThis.cmake)
include(${CMAKE_SOURCE_DIR}/cmake/MakeJVersionH.cmake)
6 changes: 1 addition & 5 deletions docs/doxygen/doxygen_index.md
@@ -32,11 +32,7 @@ This website provides documentation for JANA2 C++ API automatically generated by

* [JParameterManager](class_j_parameter_manager.html): Furnish the user with parameters extracted from command line flags and configuration files

## Parallelism engine
* [JExecutionEngine](class_j_execution_engine.html): Runs the topology. Manages the thread team and the topology status.

* [JProcessingController](class_j_processing_controller.html): The interface which any parallelism engine must adhere to
* [JArrowProcessingController](class_j_arrow_processing_controller.html): The entry point into the "Arrow" engine
* [JWorker](class_j_worker.html): Contains the loop for each worker thread, along with startup/shutdown logic and encapsulated worker state.
* [JScheduler](class_j_scheduler.html): Contains the logic for giving a worker a new assignment


35 changes: 35 additions & 0 deletions scripts/jana-status.sh
@@ -0,0 +1,35 @@
#!/bin/bash

# Default named pipe path
DEFAULT_PIPE="/tmp/jana_status"

# Check for required arguments
if [[ $# -lt 1 ]]; then
    echo "Usage: $0 <PID> [jana_status_pipe]"
    exit 1
fi

# Assign variables
PID="$1"
PIPE="${2:-$DEFAULT_PIPE}" # Use the second argument if provided, otherwise use the default

# Validate the PID
if ! kill -0 "$PID" 2>/dev/null; then
    echo "Error: Process with PID $PID does not exist."
    exit 1
fi

# Check if the pipe exists
if [[ ! -p "$PIPE" ]]; then
    echo "Error: Named pipe '$PIPE' does not exist."
    exit 1
fi

# Send the USR1 signal to the process
if ! kill -USR1 "$PID"; then
    echo "Error: Failed to send SIGUSR1 to PID $PID."
    exit 1
fi

# Cat the named pipe
cat "$PIPE"
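
The script above is the reading half of a named-pipe handshake: it signals the process, then blocks on `cat` until something writes to the FIFO. A minimal sketch of what the writing half might look like follows. The helper names `ensure_fifo` and `write_report` are hypothetical and are not taken from JExecutionEngine, whose actual implementation is not shown in this diff; only the POSIX calls (`mkfifo`, `open`, `write`, `close`, `stat`) are real.

```cpp
#include <cerrno>
#include <string>
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>

// Hypothetical helper: create the FIFO if it is missing.
// Returns true if a FIFO exists at `path` afterwards.
bool ensure_fifo(const std::string& path) {
    if (mkfifo(path.c_str(), 0666) == 0) return true;
    if (errno != EEXIST) return false;
    // Something already exists at this path; accept it only if it is a FIFO.
    struct stat st;
    return stat(path.c_str(), &st) == 0 && S_ISFIFO(st.st_mode);
}

// Hypothetical helper: write the whole report to the FIFO.
// open() with O_WRONLY blocks until a reader (e.g. `cat`) opens the pipe.
bool write_report(const std::string& path, const std::string& report) {
    int fd = open(path.c_str(), O_WRONLY);
    if (fd < 0) return false;

    const char* cursor = report.data();
    size_t remaining = report.size();
    while (remaining > 0) {
        ssize_t written = write(fd, cursor, remaining);
        if (written < 0) {
            if (errno == EINTR) continue;  // interrupted by a signal: retry
            close(fd);
            return false;
        }
        cursor += written;
        remaining -= static_cast<size_t>(written);
    }
    close(fd);
    return true;
}
```

Because the writer blocks in `open()` until a reader appears, a process using this pattern would presumably perform the write from a thread that is allowed to stall, rather than from inside the signal handler itself.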
4 changes: 2 additions & 2 deletions src/examples/CMakeLists.txt
@@ -4,8 +4,8 @@ add_subdirectory(DstExample)
add_subdirectory(Tutorial)
add_subdirectory(StreamingExample)
add_subdirectory(EventGroupExample)
add_subdirectory(SubeventExample)
add_subdirectory(SubeventCUDAExample)
#add_subdirectory(SubeventExample)
#add_subdirectory(SubeventCUDAExample)
add_subdirectory(UnitTestingExample)
add_subdirectory(PodioDatamodel)
add_subdirectory(PodioFileReader)
2 changes: 1 addition & 1 deletion src/libraries/JANA/CLI/JMain.cc
@@ -153,7 +153,7 @@ int Execute(JApplication* app, UserOptions &options) {
else {
// Run JANA in normal mode
try {
app->Run();
app->Run(true, true);
}
catch (JException& e) {
std::cout << "----------------------------------------------------------" << std::endl;
137 changes: 30 additions & 107 deletions src/libraries/JANA/CLI/JSignalHandler.cc
@@ -4,109 +4,19 @@

#include "JSignalHandler.h"

#include <csignal>
#include <thread>
#include <fcntl.h>
#include <unistd.h>
#include <sys/stat.h>

#include <JANA/JApplication.h>
#include <JANA/Engine/JExecutionEngine.h>
#include <JANA/Utils/JBacktrace.h>

#include <unistd.h>
#include <csignal>

/// JSignalHandler bundles together the logic for querying a JApplication
/// about its JStatus with signal handlers for USR1, USR2, and CTRL-C.
namespace JSignalHandler {

JApplication* g_app;
int g_sigint_count = 0;
JLogger* g_logger;
std::string g_path_to_named_pipe = "/tmp/jana_status";
std::map<pthread_t, std::string> g_thread_reports;
std::atomic_int g_thread_report_count;


void create_named_pipe(const std::string& path_to_named_pipe) {

LOG_WARN(*g_logger) << "Creating pipe named \"" << g_path_to_named_pipe
<< "\" for status info." << LOG_END;

mkfifo(path_to_named_pipe.c_str(), 0666);
}


void send_to_named_pipe(const std::string& path_to_named_pipe, const std::string& data) {

int fd = open(path_to_named_pipe.c_str(), O_WRONLY);
if (fd >=0) {
write(fd, data.c_str(), data.length()+1);
close(fd);
}
else {
LOG_ERROR(*g_logger) << "Unable to open named pipe '" << g_path_to_named_pipe << "' for writing. \n"
<< " You can use a different named pipe for status info by setting the parameter `jana:status_fname`.\n"
<< " The status report will still show up in the log." << LOG_END;
}
}

void produce_thread_report() {
std::stringstream bt_str;
make_backtrace(bt_str);
g_thread_reports[pthread_self()] = bt_str.str();
}

/// If something goes wrong, we want to signal all threads to assemble a report
/// Whereas USR1 is meant to be triggered externally and is caught by one thread,
/// produce_overall_report triggers USR2 and is caught by all threads.
std::string produce_overall_report() {
std::stringstream ss;

// Include detailed report from JApplication
auto t = time(nullptr);
ss << "JANA status report: " << ctime(&t) << std::endl;

// Include backtraces from each individual thread
if( typeid(std::thread::native_handle_type) == typeid(pthread_t) ){
ss << "Thread model: pthreads" << std::endl;

// Send every worker thread (but not self) the USR2 signal
auto main_thread_id = pthread_self();
std::vector<pthread_t> threads; // TODO: Populate this
g_thread_report_count = threads.size();
for (auto& thread_id : threads) {
if (main_thread_id == thread_id) {
pthread_kill(thread_id, SIGUSR2);
}
}

// Assemble backtrace for own thread
std::stringstream bt_str;
make_backtrace(bt_str);
g_thread_reports[main_thread_id] = bt_str.str();

// Wait for all other threads to finish handling USR2
for(int i=0; i<1000; i++){
std::this_thread::sleep_for(std::chrono::microseconds(1000));
if (g_thread_report_count == 0) break;
}

// Assemble overall backtrace
for (const auto& thread_report : g_thread_reports) {
ss << thread_report.first << ": " << std::endl << thread_report.second << std::endl;
}

// Clear backtrace
g_thread_reports.clear();
// TODO: Backtrace memory use is unsafe
}
else {
ss << "Thread model: unknown" << std::endl;
}
return ss.str();
}

void send_overall_report_to_named_pipe() {
LOG_WARN(*g_logger) << "Caught USR1 signal! Sending status report to named pipe. `cat " << g_path_to_named_pipe << "` to view." << LOG_END;
send_to_named_pipe(g_path_to_named_pipe, produce_overall_report());
}


/// Handle SIGINT signals (e.g. from hitting Ctrl-C). When a SIGINT
@@ -115,23 +25,39 @@ void send_overall_report_to_named_pipe() {
/// The first 2 SIGINT signals received will tell JANA to shutdown gracefully.
/// On the 3rd SIGINT, the program will try to exit immediately.
void handle_sigint(int) {
g_app->HandleSigint();
if (g_app->IsInitialized()) {
g_app->GetService<JExecutionEngine>()->HandleSIGINT();
}
else {
exit(-2);
}
}

void handle_usr1(int) {
std::thread th(send_overall_report_to_named_pipe);
th.detach();
if (g_app->IsInitialized()) {
g_app->GetService<JExecutionEngine>()->HandleSIGUSR1();
}
}

void handle_usr2(int) {
produce_thread_report();
if (g_app->IsInitialized()) {
g_app->GetService<JExecutionEngine>()->HandleSIGUSR2();
}
}

void handle_tstp(int) {
if (g_app->IsInitialized()) {
g_app->GetService<JExecutionEngine>()->HandleSIGTSTP();
}
}

void handle_sigsegv(int /*signal_number*/, siginfo_t* /*signal_info*/, void* /*context*/) {
LOG_FATAL(*g_logger) << "Segfault detected! Printing backtraces and exiting." << LOG_END;
auto report = produce_overall_report();
LOG_FATAL(*g_logger) << report << LOG_END;
exit(static_cast<int>(JApplication::ExitCode::Segfault));
LOG_FATAL(*g_logger) << "Segfault detected!" << LOG_END;
JBacktrace backtrace;
backtrace.Capture(3);
LOG_FATAL(*g_logger) << "Hard exit due to segmentation fault! Backtrace:\n\n" << backtrace.ToString() << LOG_END;
_exit(static_cast<int>(JApplication::ExitCode::Segfault));
// _exit() is async-signal-safe, whereas exit() is not
}
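
The comment in `handle_sigsegv` notes that `_exit()` is async-signal-safe whereas `exit()` is not. As a standalone illustration of that point, here is a minimal sketch of a SIGSEGV handler that uses only `write()` and `_exit()`, both of which POSIX lists as async-signal-safe. The names `sketch_handle_sigsegv`, `install_sketch_segv_handler`, and `g_sketch_exit_code` are hypothetical, and the exit code is a caller-supplied placeholder rather than the value of `JApplication::ExitCode::Segfault`.

```cpp
#include <signal.h>
#include <cstring>
#include <unistd.h>

// Hypothetical exit code storage; sig_atomic_t can be read safely in a handler.
static volatile sig_atomic_t g_sketch_exit_code = 1;

// Handler restricted to async-signal-safe calls: write() and _exit().
void sketch_handle_sigsegv(int /*signal_number*/, siginfo_t* /*signal_info*/, void* /*context*/) {
    static const char message[] = "Segfault detected! Exiting.\n";
    ssize_t ignored = write(STDERR_FILENO, message, sizeof(message) - 1);
    (void) ignored;
    // _exit() terminates immediately without running atexit handlers or
    // static destructors, which is what makes it safe to call here.
    _exit(g_sketch_exit_code);
}

// Install the handler with sigaction(); SA_SIGINFO selects the 3-argument form.
bool install_sketch_segv_handler(int exit_code) {
    g_sketch_exit_code = exit_code;
    struct sigaction action;
    std::memset(&action, 0, sizeof(action));
    action.sa_sigaction = sketch_handle_sigsegv;
    action.sa_flags = SA_SIGINFO;
    sigemptyset(&action.sa_mask);
    return sigaction(SIGSEGV, &action, nullptr) == 0;
}
```

A caller would invoke `install_sketch_segv_handler(code)` once during startup, before any worker threads are launched.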


@@ -145,10 +71,6 @@ void register_handlers(JApplication* app) {
// It would be nice to do this in a less unexpected place, and hopefully that will naturally
// emerge from future refactorings.

g_app->GetJParameterManager()->SetDefaultParameter("jana:status_fname", g_path_to_named_pipe,
"Filename of named pipe for retrieving instantaneous status info");
create_named_pipe(g_path_to_named_pipe);

//Define signal action
struct sigaction sSignalAction;
sSignalAction.sa_sigaction = handle_sigsegv;
@@ -161,6 +83,7 @@ void register_handlers(JApplication* app) {
LOG_WARN(*g_logger) << "Setting signal handler USR1. Use to write status info to the named pipe." << LOG_END;
signal(SIGUSR1, handle_usr1);
signal(SIGUSR2, handle_usr2);
signal(SIGTSTP, handle_tstp);
LOG_WARN(*g_logger) << "Setting signal handler SIGINT (Ctrl-C). Use a single SIGINT to enter the Inspector, or multiple SIGINTs for an immediate shutdown." << LOG_END;
signal(SIGINT, handle_sigint);
}
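
The doc comment on `handle_sigint` describes an escalation policy: the first two SIGINTs request a graceful shutdown, and the third attempts an immediate exit. The body of `JExecutionEngine::HandleSIGINT()` is not part of this diff, so the following is only a sketch of how such a policy could be expressed, with the thresholds taken from that comment. All names here (`SigintAction`, `decide_sigint_action`, `sketch_handle_sigint`, the `g_sketch_*` globals) and the exit status 130 are assumptions made for illustration.

```cpp
#include <atomic>
#include <csignal>
#include <unistd.h>

// Hypothetical outcome of receiving the Nth SIGINT.
enum class SigintAction { RequestGracefulShutdown, ExitImmediately };

// Pure decision function: counts 1 and 2 are graceful, 3 and above are immediate.
constexpr SigintAction decide_sigint_action(int sigint_count) {
    return (sigint_count <= 2) ? SigintAction::RequestGracefulShutdown
                               : SigintAction::ExitImmediately;
}

// Hypothetical state shared between the handler and the main loop.
std::atomic<int> g_sketch_sigint_count{0};
std::atomic<bool> g_sketch_shutdown_requested{false};

// Handler: count the signal, then either set a flag for the main loop to
// notice or terminate at once. Lock-free atomics are safe to use here.
void sketch_handle_sigint(int) {
    int count = g_sketch_sigint_count.fetch_add(1) + 1;
    if (decide_sigint_action(count) == SigintAction::ExitImmediately) {
        _exit(130);  // assumed exit status, not taken from the source
    }
    g_sketch_shutdown_requested.store(true);
}
```

Keeping the decision in a pure function separates the policy from the signal plumbing, so the thresholds can be unit-tested without raising real signals. The handler would be registered with `signal(SIGINT, sketch_handle_sigint)`, mirroring the registration style in `register_handlers`.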
28 changes: 2 additions & 26 deletions src/libraries/JANA/CLI/JSignalHandler.h
@@ -4,35 +4,11 @@

#pragma once

#include <csignal>
#include <thread>
#include <fcntl.h>
#include <unistd.h>
#include <sys/stat.h>
class JApplication;

#include <JANA/JApplication.h>

/// JSignalHandler bundles together the logic for querying a JApplication
/// about its JStatus with signal handlers for USR1, USR2, and CTRL-C.
/// JSignalHandler attaches signal handlers for USR1, USR2, and CTRL-C to a given JApplication instance.
namespace JSignalHandler {

extern JApplication* g_app;
extern int g_sigint_count;
extern JLogger* g_logger;
extern std::string g_path_to_named_pipe;
extern std::map<pthread_t, std::string> g_thread_reports;
extern std::atomic_int g_thread_report_count;


void create_named_pipe(const std::string& path_to_named_pipe);
void send_to_named_pipe(const std::string& path_to_named_pipe, const std::string& data);
void produce_thread_report();
std::string produce_overall_report();
void send_overall_report_to_named_pipe();
void handle_sigint(int);
void handle_usr1(int);
void handle_usr2(int);
void handle_sigsegv(int /*signal_number*/, siginfo_t* /*signal_info*/, void* /*context*/);
void register_handlers(JApplication* app);

}; // namespace JSignalHandler
7 changes: 2 additions & 5 deletions src/libraries/JANA/CMakeLists.txt
@@ -13,11 +13,7 @@ set(JANA2_SOURCES
JVersion.cc
JEvent.cc

Engine/JArrowProcessingController.cc
Engine/JScheduler.cc
Engine/JWorker.cc
Engine/JPerfMetrics.cc
Engine/JPerfSummary.cc
Engine/JExecutionEngine.cc

Topology/JArrow.cc
Topology/JEventSourceArrow.cc
@@ -41,6 +37,7 @@ set(JANA2_SOURCES
Utils/JCallGraphRecorder.cc
Utils/JInspector.cc
Utils/JApplicationInspector.cc
Utils/JBacktrace.cc

Calibrations/JCalibration.cc
Calibrations/JCalibrationFile.cc