-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathanalyzer.py
194 lines (164 loc) · 9.15 KB
/
analyzer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
import pandas as pd
import numpy as np
from event_group import EventGroup
import os
import logging
class Analyzer:
    """
    `Analyzer` is responsible for handling the raw performance data generated by `Profiler`
    and producing the report of performance metrics (timeseries and aggregated results).
    """
    def __init__(self, test_dir: str, configs: dict, event_groups: "EventGroup") -> None:
        """
        Constructor of `Analyzer`
        :param `test_dir`: a string of the path of the test directory,
        which can be obtained by `Connector.get_test_dir_path()`
        :param `configs`: a dict of parsed configurations (the member `configs` in `Controller`)
        :param `event_groups`: an instance of `EventGroup`
        """
        self.logger = logging.getLogger("hperf")
        self.test_dir = test_dir
        self.configs = configs
        self.event_groups = event_groups
        self.cpu_topo: pd.DataFrame = None  # cpu topology (mapping of cpu id and socket id)
        self.timeseries: pd.DataFrame = None  # timeseries results, filled by `analyze()`
        self.aggregated_metrics: pd.DataFrame = None  # aggregated results, filled by `get_aggregated_metrics()`

    def __analyze_cpu_topo(self):
        """
        Load the tab-separated 'cpu_topo' file from the test directory into `self.cpu_topo`
        as a DataFrame with columns ['unit', 'socket'] (cpu id -> socket id mapping).
        """
        self.cpu_topo = pd.read_csv(os.path.join(self.test_dir, "cpu_topo"),
                                    sep="\t",
                                    header=None,
                                    names=["unit", "socket"],
                                    usecols=[0, 1])

    def analyze(self):
        """
        Parse the raw perf output ('perf_result' in the test directory), scope it to the
        configured CPU list, rename perf event names to hperf generic names, evaluate the
        derived metric expressions per timestamp, and store the result in `self.timeseries`.
        """
        self.__analyze_cpu_topo()
        # read the raw performance data file generated by `Profiler` and convert to DataFrame
        perf_raw_data = pd.read_csv(os.path.join(self.test_dir, "perf_result"),
                                    sep="\t",
                                    header=None,
                                    names=["timestamp", "unit", "value", "metric"],
                                    usecols=[0, 1, 2, 4])
        # rename 'unit' according to self.event_groups.events[..]['type']
        # e.g. 'duration_time' is a system-wide event, where in each timestamp there is only a value (attributed to CPU0)
        # timestamp | unit | value | metric    ->    timestamp | unit   | value | metric
        # 1.0000    | CPU0 | 1.001 | duration_time  1.0000    | system | 1.001 | duration_time
        # 1.0000    | CPU0 | 12345 | cycles         1.0000    | CPU0   | 12345 | cycles
        # 1.0000    | CPU1 | 23456 | cycles         1.0000    | CPU1   | 23456 | cycles
        # ...
        system_event_flag = False
        socket_event_flag = False

        def cpu2socket(row):
            """Map a 'CPU<n>' unit label to its 'SOCKET<m>' label using `self.cpu_topo`."""
            cpu_id = int(row["unit"][3:])
            return "SOCKET" + str(self.cpu_topo.loc[self.cpu_topo.unit == cpu_id]["socket"].values[0])

        for item in self.event_groups.events:
            if "type" in item:
                if item["type"] == "SYSTEM":
                    perf_raw_data.loc[perf_raw_data.metric == item["perf_name"], ["unit"]] = "SYSTEM"
                    system_event_flag = True
                elif item["type"] == "SOCKET":
                    # TODO: for some socket-wide events, such as events from SLC shared by a socket, perf will report
                    # its value attributed to a CPU in this socket.
                    # e.g. SOCKET 0: CPU 0-15, 32-47 ... SOCKET 1: CPU 16-31, 48-63 ...
                    # timestamp | unit  | value | metric
                    # 1.0000    | CPU0  | 12345 | uncore_cha_xxx  // CPU0 -> SOCKET 0
                    # 1.0000    | CPU16 | 23456 | uncore_cha_xxx  // CPU16 -> SOCKET 1
                    # NOTE(review): this branch was marked faulty in the original — verify the
                    # CPU->SOCKET attribution against real uncore event output before relying on it.
                    perf_raw_data.loc[perf_raw_data.metric == item["perf_name"], ["unit"]] \
                        = perf_raw_data.loc[perf_raw_data.metric == item["perf_name"], ["unit"]].apply(cpu2socket, axis=1)
                    socket_event_flag = True
        # in every timestamp, aggregate performance data for selected cpus (aggregate 'unit')
        # timestamp | unit | value | metric -> timestamp | value=sum(value) | metric
        if self.configs["cpu_list"] == 'all':
            # string "sum" instead of np.sum: passing the numpy callable to .agg is deprecated in modern pandas
            scoped_raw_data = perf_raw_data.groupby(["timestamp", "metric"]).agg(
                value=("value", "sum")
            ).reset_index()
        else:
            unit_list = [f"CPU{i}" for i in self.configs["cpu_list"]]
            # besides CPUs, system-wide and socket-wide pseudo-units must also be kept in 'unit_list'
            # e.g. CPU 0, 2, 4, 6 specified, all on SOCKET0 -> SOCKET0 and SYSTEM are added to 'unit_list'
            if system_event_flag:
                unit_list.append("SYSTEM")
            if socket_event_flag:
                for i in self.configs["cpu_list"]:
                    socket = f'SOCKET{self.cpu_topo.loc[self.cpu_topo.unit == i]["socket"].values[0]}'
                    if socket not in unit_list:
                        unit_list.append(socket)
            self.logger.debug(f"Unit list: {unit_list}")
            scoped_raw_data = perf_raw_data[perf_raw_data["unit"].isin(unit_list)].groupby(["timestamp", "metric"]).agg(
                value=("value", "sum")
            ).reset_index()
        # rename event names used in perf by the generic event names defined by hperf
        # e.g.
        # timestamp | value | metric -> timestamp | value | metric
        # 1.0000    | 98765 | r08d1     1.0000    | 98765 | L1 CACHE MISSES
        # 1.0000    | 87654 | r10d1     1.0000    | 87654 | L2 CACHE MISSES
        # ...
        mapping_perf_name_to_name = {}
        for item in self.event_groups.events:
            mapping_perf_name_to_name[item["perf_name"]] = item["name"]
        scoped_raw_data["metric"] = scoped_raw_data["metric"].apply(
            lambda x: mapping_perf_name_to_name[x.split(":")[0]]
        )
        # build one row per timestamp: timestamp | <event> | ... | <event> | <metric> | ... | <metric>
        # rows are collected in a list and turned into a DataFrame once at the end —
        # the original pd.concat-per-timestamp pattern is O(n^2) in the number of timestamps
        rows = []
        timestamps = scoped_raw_data.groupby(["timestamp"]).groups.keys()  # get all timestamps
        for t in timestamps:
            metric_results = {"timestamp": t}  # col. 0
            mapping_id_to_value = {}
            for item in self.event_groups.events:
                val = scoped_raw_data.query(f'metric=="{item["name"]}" & timestamp=={t}')["value"].iloc[0]
                metric_results[item["name"]] = val  # col. event count
                mapping_id_to_value[f"e{item['id']}"] = val
            for item in self.event_groups.metrics:
                # SECURITY NOTE: eval() runs the metric expression from the event group definition;
                # these expressions are hperf-shipped config, never untrusted user input — keep it that way.
                val = eval(item["expression"], mapping_id_to_value)
                metric_results[item["metric"]] = val  # col. metric result
            rows.append(metric_results)
        self.timeseries = pd.DataFrame(rows)

    def get_timeseries(self, to_csv: bool = False) -> pd.DataFrame:
        """
        Return the timeseries DataFrame computed by `analyze()`.
        :param `to_csv`: if True, also save the DataFrame to 'timeseries.csv' in the test directory
        """
        if to_csv:
            timeseries_path = os.path.join(self.test_dir, "timeseries.csv")
            self.timeseries.to_csv(timeseries_path, header=True)
            self.logger.info(f"save timeseries DataFrame to CSV file: {timeseries_path}")
        return self.timeseries

    def get_timeseries_plot(self):
        """
        Plot every derived metric of the timeseries as a subplot and save the figure
        to 'timeseries.png' in the test directory.
        """
        metrics = [item["metric"] for item in self.event_groups.metrics]
        if not metrics:
            # without this guard, plot(..., subplots=True) on an empty y-list crashes on axes[0]
            self.logger.warning("no metrics defined, skip timeseries plot")
            return
        axes = self.timeseries.plot(x="timestamp",
                                    y=metrics,
                                    subplots=True,
                                    figsize=(10, 2 * len(metrics)))
        fig = axes[0].get_figure()
        timeseries_plot_path = os.path.join(self.test_dir, "timeseries.png")
        fig.savefig(timeseries_plot_path)
        self.logger.info(f"timeseries figure saved in: {timeseries_plot_path}")

    def get_aggregated_metrics(self, to_csv: bool = False) -> pd.DataFrame:
        """
        Aggregate the timeseries across timestamps: events are summed, metrics are averaged.
        :param `to_csv`: if True, also save the result to 'aggregated_metrics.csv' in the test directory
        :return: a single-row DataFrame of aggregated values (also stored in `self.aggregated_metrics`)
        """
        metric_results = {}
        for item in self.event_groups.events:
            # 'total' instead of 'sum' — don't shadow the builtin
            total = self.timeseries[item["name"]].sum()
            metric_results[item["name"]] = total
        for item in self.event_groups.metrics:
            avg = self.timeseries[item["metric"]].mean()
            metric_results[item["metric"]] = avg
        self.aggregated_metrics = pd.DataFrame(metric_results, index=[0])
        if to_csv:
            aggregated_metrics_path = os.path.join(self.test_dir, "aggregated_metrics.csv")
            self.aggregated_metrics.to_csv(aggregated_metrics_path, header=True)
            self.logger.info(f"save aggregated metrics DataFrame to CSV file: {aggregated_metrics_path}")
        return self.aggregated_metrics