From 0f5de8a8f149657e78329b2a3b98b811d79b0ff5 Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Thu, 10 Oct 2024 15:07:21 -0400 Subject: [PATCH] jana-plot-utilization.py shows barrier events --- scripts/jana-plot-utilization.py | 139 ++++++++++++++---- .../JANA/Topology/JEventSourceArrow.cc | 3 + .../Components/BarrierEventTests.cc | 9 ++ 3 files changed, 125 insertions(+), 26 deletions(-) diff --git a/scripts/jana-plot-utilization.py b/scripts/jana-plot-utilization.py index c46d5637a..80dc9ef03 100644 --- a/scripts/jana-plot-utilization.py +++ b/scripts/jana-plot-utilization.py @@ -8,43 +8,103 @@ def parse_logfile(): # Parse the logs and store intervals - thread_history = defaultdict(list) # Key: thread_id, Value: list of (start_time, end_time, processor_name) + thread_history = defaultdict(list) # Key: thread_id, Value: list of (start_time, end_time, processor_name, event_nr, result) + barrier_history = [] # [(released_timestamp, finished_timestamp)] + start_times = {} # Key: (thread_id, processor_name), Value: start_time # Define a regular expression to parse the log lines - log_pattern = re.compile(r"^(\d{2}):(\d{2}):(\d{2})\.(\d{3}) \[debug\] (\d+) (Executing|Executed) arrow (\w+)") + source_start_pattern = re.compile(r"^(\d{2}):(\d{2}):(\d{2})\.(\d{3}) \[debug\] (\d+) Executing arrow (\w+)$") + source_finish_noemit_pattern = re.compile(r"^(\d{2}):(\d{2}):(\d{2})\.(\d{3}) \[debug\] (\d+) Executed arrow (\w+) with result (\w+)$") + source_finish_emit_pattern = re.compile(r"^(\d{2}):(\d{2}):(\d{2})\.(\d{3}) \[debug\] (\d+) Executed arrow (\w+) with result (\w+), emitting event# (\d+)$") + source_finish_pending_pattern = re.compile(r"^(\d{2}):(\d{2}):(\d{2})\.(\d{3}) \[debug\] (\d+) Executed arrow (\w+) with result (\w+), holding back barrier event# (\d+)$") + processor_start_pattern = re.compile(r"^(\d{2}):(\d{2}):(\d{2})\.(\d{3}) \[debug\] (\d+) Executing arrow (\w+) for event# (\d+)$") + processor_finish_pattern = re.compile(r"^(\d{2}):(\d{2}):(\d{2})\.(\d{3}) \[debug\] (\d+) Executed arrow (\w+) for event# (\d+)$") + barrier_inflight_pattern = re.compile(r"^(\d{2}):(\d{2}):(\d{2})\.(\d{3}) \[debug\] (\d+) JEventSourceArrow: Barrier event is in-flight$") + barrier_finished_pattern = re.compile(r"^(\d{2}):(\d{2}):(\d{2})\.(\d{3}) \[debug\] (\d+) JEventSourceArrow: Barrier event finished, returning to normal operation$") with open("log.txt", "r") as log_file: for line in log_file: - match = re.search(log_pattern, line.strip()) + + match = re.match(source_start_pattern, line.strip()) if match: - hours_str, mins_str, secs_str, millis_str, thread_id, action, processor_name = match.groups() + hours_str, mins_str, secs_str, millis_str, thread_id, processor_name = match.groups() + millis = (((int(hours_str) * 60) + int(mins_str) * 60) + int(secs_str)) * 1000 + int(millis_str) + start_times[(thread_id, processor_name)] = millis + continue - # Convert timestamp to milliseconds + match = re.match(source_finish_noemit_pattern, line.strip()) + if match: + hours_str, mins_str, secs_str, millis_str, thread_id, processor_name, result = match.groups() + millis = (((int(hours_str) * 60) + int(mins_str) * 60) + int(secs_str)) * 1000 + int(millis_str) + start_time = start_times.pop((thread_id, processor_name), None) + if start_time: + thread_history[thread_id].append((start_time, millis, processor_name, None, result)) + continue + + match = re.match(source_finish_emit_pattern, line.strip()) + if match: + hours_str, mins_str, secs_str, millis_str, thread_id, processor_name, result, event_nr = match.groups() + millis = (((int(hours_str) * 60) + int(mins_str) * 60) + int(secs_str)) * 1000 + int(millis_str) + start_time = start_times.pop((thread_id, processor_name), None) + if start_time: + thread_history[thread_id].append((start_time, millis, processor_name, event_nr, result)) + continue + + match = re.match(source_finish_pending_pattern, line.strip()) + if match: + hours_str, mins_str, secs_str, millis_str, thread_id, processor_name, result, event_nr = match.groups() + millis = (((int(hours_str) * 60) + int(mins_str) * 60) + int(secs_str)) * 1000 + int(millis_str) + start_time = start_times.pop((thread_id, processor_name), None) + if start_time: + thread_history[thread_id].append((start_time, millis, processor_name, event_nr, result)) + continue + + match = re.match(processor_start_pattern, line.strip()) + if match: + hours_str, mins_str, secs_str, millis_str, thread_id, processor_name, event_nr = match.groups() + millis = (((int(hours_str) * 60) + int(mins_str) * 60) + int(secs_str)) * 1000 + int(millis_str) + start_times[(thread_id, processor_name)] = millis + continue + + match = re.match(processor_finish_pattern, line.strip()) + if match: + hours_str, mins_str, secs_str, millis_str, thread_id, processor_name, event_nr = match.groups() + millis = (((int(hours_str) * 60) + int(mins_str) * 60) + int(secs_str)) * 1000 + int(millis_str) + start_time = start_times.pop((thread_id, processor_name), None) + if start_time: + thread_history[thread_id].append((start_time, millis, processor_name, event_nr, result)) + continue + + match = re.match(barrier_inflight_pattern, line.strip()) + if match: + hours_str, mins_str, secs_str, millis_str, thread_id = match.groups() millis = (((int(hours_str) * 60) + int(mins_str) * 60) + int(secs_str)) * 1000 + int(millis_str) + start_times[()] = millis + continue - if action == "Executing": - # Log the start time of the processor for the thread - start_times[(thread_id, processor_name)] = millis + match = re.match(barrier_finished_pattern, line.strip()) + if match: + hours_str, mins_str, secs_str, millis_str, thread_id = match.groups() + millis = (((int(hours_str) * 60) + int(mins_str) * 60) + int(secs_str)) * 1000 + int(millis_str) + start_time = start_times.pop((), None) + if start_time: + barrier_history.append((start_time, millis)) + continue - elif action == "Executed": - # Calculate the duration of the processor and store the interval - start_time = start_times.pop((thread_id, processor_name), None) - if start_time: - thread_history[thread_id].append((start_time, millis, processor_name)) + return (thread_history, barrier_history) - return thread_history -def create_svg(all_thread_history): +def create_svg(all_thread_history, barrier_history): # Assign colors to processors processor_colors = {} - color_palette = ['#FF6347', '#4682B4', '#32CD32', '#FFD700', '#9370DB', '#FF69B4'] + color_palette = ['#004E64', '#00A5CF', '#9FFFCB', '#25A18E', '#7AE582', '#FF69B4'] color_index = 0 # Figure out drawing coordinate system - overall_start_time = min(start for history in all_thread_history.values() for (start,_,_) in history) - overall_end_time = max(end for history in all_thread_history.values() for (_,end,_) in history) + overall_start_time = min(start for history in all_thread_history.values() for (start,_,_,_,_) in history) + overall_end_time = max(end for history in all_thread_history.values() for (_,end,_,_,_) in history) thread_count = len(all_thread_history) width=1000 x_scale = width/(overall_end_time-overall_start_time) @@ -55,14 +115,28 @@ def create_svg(all_thread_history): # Create the SVG drawing dwg = svgwrite.Drawing("timeline.svg", profile='tiny', size=(width, height)) - dwg.add(dwg.rect(insert=(0,0),size=(width,height),stroke="red",fill="white")) + #dwg.add(dwg.rect(insert=(0,0),size=(width,height),stroke="red",fill="white")) # Draw a rectangle for each processor run on each thread's timeline y_position = thread_y_padding + for barrier_start,barrier_end in barrier_history: + rect_start = (barrier_start-overall_start_time)*x_scale + if (barrier_end == barrier_start): + rect_width=1 + else: + rect_width = (barrier_end-barrier_start)*x_scale + + rect = dwg.rect(insert=(rect_start, 0), + size=(rect_width, height), + fill="red", + stroke="none", + stroke_width=1) + dwg.add(rect) + for thread_id, intervals in all_thread_history.items(): dwg.add(dwg.rect(insert=(0,y_position),size=(1000,thread_height),stroke="lightgray",fill="lightgray")) - for start_time, end_time, processor_name in intervals: + for start_time, end_time, processor_name, event_nr, result in intervals: # Calculate the position and width of the rectangle # Assign a unique color to each processor name if processor_name not in processor_colors: @@ -71,31 +145,44 @@ def create_svg(all_thread_history): # Draw the rectangle rect_start = (start_time-overall_start_time)*x_scale - rect_width = (end_time-start_time)*x_scale + if (end_time == start_time): + rect_width=1 + else: + rect_width = (end_time-start_time)*x_scale + + rect_stroke_color = "black" + if (result == "ComeBackLater" and event_nr is None): + rect_stroke_color = "gray" + + rect = dwg.rect(insert=(rect_start, y_position), size=(rect_width, thread_height), fill=processor_colors[processor_name], - stroke="black", - stroke_width=0) - mouseover = processor_name + ": " + str(end_time-start_time) + "ms" + stroke=rect_stroke_color, + stroke_width=1) + mouseover = "Arrow: " + processor_name + "\nEvent nr: " + str(event_nr) + "\nResult: " + result + "\nTime: "+ str(end_time-start_time) + "ms" rect.add(svgwrite.base.Title(mouseover)) dwg.add(rect) + if (event_nr is not None): + text = dwg.text(str(event_nr), insert=(rect_start+1, y_position+thread_height-1), fill="white", font_size=8) + dwg.add(text) # Move the y position for the next thread y_position += (thread_y_padding + thread_height) + # Save the SVG file dwg.save() if __name__ == "__main__": - thread_history = parse_logfile() + thread_history,barrier_history = parse_logfile() #thread_history = { # 1103:[(0,1,"a"), (2,5,"b"), (6,8,"a")], # 219:[(0,3,"b"), (3,6,"c"), (9,10,"d")], # 3:[(2,7,"a")] #} - create_svg(thread_history) + create_svg(thread_history, barrier_history) diff --git a/src/libraries/JANA/Topology/JEventSourceArrow.cc b/src/libraries/JANA/Topology/JEventSourceArrow.cc index 74834a114..0c5bc48a7 100644 --- a/src/libraries/JANA/Topology/JEventSourceArrow.cc +++ b/src/libraries/JANA/Topology/JEventSourceArrow.cc @@ -93,6 +93,7 @@ Event* JEventSourceArrow::process(Event* event, bool& success, JArrowMetrics::St else { // Topology has _not_ finished draining, all we can do is wait LOG_DEBUG(m_logger) << "JEventSourceArrow: Waiting on pending barrier event" << LOG_END; + LOG_DEBUG(m_logger) << "Executed arrow " << get_name() << " with result ComeBackLater"<< LOG_END; arrow_status = JArrowMetrics::Status::ComeBackLater; success = false; return nullptr; @@ -113,6 +114,7 @@ Event* JEventSourceArrow::process(Event* event, bool& success, JArrowMetrics::St else { // Barrier event has NOT finished LOG_DEBUG(m_logger) << "JEventSourceArrow: Waiting on in-flight barrier event" << LOG_END; + LOG_DEBUG(m_logger) << "Executed arrow " << get_name() << " with result ComeBackLater"<< LOG_END; success = false; arrow_status = JArrowMetrics::Status::ComeBackLater; return nullptr; @@ -138,6 +140,7 @@ Event* JEventSourceArrow::process(Event* event, bool& success, JArrowMetrics::St } else if ((*event)->GetSequential()){ // Source succeeded, but returned a barrier event + LOG_DEBUG(m_logger) << "Executed arrow " << get_name() << " with result Success, holding back barrier event# " << (*event)->GetEventNumber() << LOG_END; m_pending_barrier_event = event; m_barrier_active = true; return nullptr; diff --git a/src/programs/unit_tests/Components/BarrierEventTests.cc b/src/programs/unit_tests/Components/BarrierEventTests.cc index ae484a48f..ba90ea2d5 100644 --- a/src/programs/unit_tests/Components/BarrierEventTests.cc +++ b/src/programs/unit_tests/Components/BarrierEventTests.cc @@ -5,6 +5,7 @@ #include #include #include +#include "JANA/Utils/JBenchUtils.h" #include "catch.hpp" int global_resource = 0; @@ -12,6 +13,8 @@ int global_resource = 0; struct BarrierSource : public JEventSource { + JBenchUtils bench; + BarrierSource() { SetCallbackStyle(CallbackStyle::ExpertMode); } @@ -31,6 +34,7 @@ struct BarrierSource : public JEventSource { else { LOG_INFO(GetLogger()) << "Emitting non-barrier event " << event_nr << LOG_END; } + bench.consume_cpu_ms(50, 0, true); return Result::Success; } }; @@ -39,6 +43,8 @@ struct BarrierSource : public JEventSource { struct BarrierProcessor : public JEventProcessor { + JBenchUtils bench; + BarrierProcessor() { SetCallbackStyle(CallbackStyle::ExpertMode); } @@ -53,6 +59,7 @@ struct BarrierProcessor : public JEventProcessor { LOG_INFO(GetLogger()) << "Processing non-barrier event = " << event.GetEventNumber() << ", reading global var = " << global_resource << LOG_END; REQUIRE(global_resource == (event.GetEventNumber() / 10)); } + bench.consume_cpu_ms(100, 0, true); } }; @@ -64,6 +71,8 @@ TEST_CASE("BarrierEventTests") { app.Add(new BarrierSource); app.SetParameterValue("nthreads", 4); app.SetParameterValue("jana:nevents", 40); + app.SetParameterValue("jana:log:show_threadstamp", true); + app.SetParameterValue("jana:loglevel", "debug"); app.Run(true); } };