Skip to content

Commit

Permalink
more improvements to runner
Browse files Browse the repository at this point in the history
  • Loading branch information
rotu committed Oct 31, 2019
1 parent b385ce8 commit 20d24e1
Showing 1 changed file with 182 additions and 79 deletions.
261 changes: 182 additions & 79 deletions performance_test/helper_scripts/run_experiment_rover.py
Original file line number Diff line number Diff line change
@@ -1,63 +1,127 @@
import json
import os
import platform
import shutil
import subprocess
import uuid
import sys
from collections import namedtuple
from datetime import datetime
from io import StringIO
from pathlib import Path
from shlex import shlex

import csv

from pandas import DataFrame
from StringIO import StringIO
from typing import Optional

import cpuinfo # from py-cpuinfo
import pandas as pandas
from pandas import DataFrame


def filename_stem():
return datetime.now().strftime("%Y%m%d-%H%M%S")


def main():
search_paths = []
for p in os.environ.get('AMENT_PREFIX_PATH', '').split(os.pathsep):
if not p: continue
p2 = os.path.join(p, 'lib/performance_test')
if p in search_paths: continue
search_paths.append(p2)
for p in os.environ.get('PATH', '').split(os.pathsep):
if p and p not in search_paths:
search_paths.append(p)
full_search_path = ':'.join(search_paths)

for rmw in ('rmw_cyclonedds_cpp', 'rmw_fastrtps_cpp'):
for topic in (
'Array1k', # 'Array4k',
'Array16k', # 'Array32k',
'Array60k', # 'Array1m',
'Array2m'
):
run_experiment(
exe_path=full_search_path,
rmw=rmw,
reliable=True,
num_subs=16,
topic=topic,
max_runtime=30,
data_dir='data',
data_file_prefix=rmw + '_' + topic + '_',
)

plot_data('data')


def run_experiment(
exe_path=None,
topic='Array1k',
rate=100,
rmw='',
num_subs=10,
num_pubs=10,
timeout=30,
output_directory='./log',
filename=None
num_pubs=1,
max_runtime=30,
dds_domain_id=77,
history_depth: Optional[int] = 100,
reliable=False,
data_dir='data',
data_file_prefix=None,
):
filename = Path(output_directory) / (
filename or (filename_stem() + '.json'))

new_env = dict(os.environ)
new_env.update(
RMW_IMPLEMENTATION=rmw,
)
new_env['RMW_IMPLEMENTATION'] = rmw

test_record = {
'cmd': ['perf_test',
'--topic', str(topic),
'--rate', str(rate),
'--num_subs', str(num_subs),
'--num_pubs', str(num_pubs)
],
'timeout': timeout,
'env': new_env,
}
cmd = [
shutil.which('perf_test', path=exe_path),
'--communication', 'ROS2',
'--topic', str(topic),
'--rate', str(rate),
'--num_sub_threads', str(num_subs),
'--num_pub_threads', str(num_pubs),
'--max_runtime', str(max_runtime),
*(['--reliable'] if reliable else []),
*(['--keep_last', '--history_depth',
str(history_depth)] if history_depth is not None else []),
'--dds_domain_id', str(dds_domain_id),
]

try:
res = subprocess.run(
test_record['cmd'], timeout=timeout + 0.5,
capture_output=True, env=new_env)
test_record['stdout'] = res.stdout
test_record['stderr'] = res.stderr
except subprocess.TimeoutExpired as e:
test_record['stdout'] = e.stdout
test_record['stderr'] = e.stderr
res = subprocess.run(cmd, timeout=max_runtime + 0.5,
env=new_env,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
encoding='utf-8')
stdout = res.stdout
stderr = res.stderr

with open(filename, 'w') as f:
json.dump(test_record, f)
except subprocess.TimeoutExpired as e:
stdout = e.stdout
stderr = e.stderr

if stderr:
print(stderr, file=sys.stderr)
if not stdout:
print('no stdout from perf_test', file=sys.stderr)

experiment_data = extract_experiment_data(stdout)
experiment_id = experiment_data['experiment_id']
experiment_data.update({
'cmd': subprocess.list2cmdline(cmd),
'timeout': max_runtime,
'env': repr(new_env),
'cpu': cpuinfo.get_cpu_info()['brand'],
'host': platform.node(),
'os': platform.platform(),
})

experiment_metrics = extract_metric_data(stdout)
experiment_metrics['experiment_id'] = experiment_id

data_content = {'experiment': experiment_data, 'metrics': experiment_metrics.to_dict()}
Path(data_dir).mkdir(parents=True, exist_ok=True)
data_file_name = (data_file_prefix or '') + filename_stem() + '.json'
with Path(data_dir, data_file_name).open('w') as f:
json.dump(data_content, f, indent=2)


C = namedtuple('Column', ['name', 'parser'])
Expand Down Expand Up @@ -92,52 +156,91 @@ def run_experiment(
'latency_mean(ms)': C('latency_mean_ms', float),
'cpu_usage (%)': C('cpu_usage', lambda x: float(x) / 100.0),
'ru_maxrss': C('ru_maxrss', int),
# todo: the rest of the columns
}

import warnings


def extract_experiment_data(text: str):
text = text.split('---EXPERIMENT-START---')[0]
result = dict()
for line in text.splitlines():
try:
k, v = line.split(':', 1)
except Exception as e:
warnings.warn('failed to parse line: ' + line + str(e))
continue
k = k.strip()
v = v.strip()
column_def = experiment_columns.get(k)
if column_def is None:
result[k] = v
else:
result[column_def.name] = column_def.parser(v)
return result


def extract_metric_data(text: str):
text = text.split('---EXPERIMENT-START---')[-1]
text = text.split('Maximum runtime reached. Exiting.')[0]
data_io = StringIO(text)
return pandas.read_csv(data_io, sep=r'\s*,\s*', engine='python')


import matplotlib.pyplot as plt

size_by_topic = {
'Array1k': 1 << 10,
'Array4k': 4 << 10,
'Array16k': 16 << 10,
'Array32k': 32 << 10,
'Array60k': 60 << 10,
'Array1m': 1 << 20,
'Array2m': 2 << 20,
}


def merge_data(files):
def plot_data(data_dir):
all_experiments = []
all_metrics = []
for file in files:
text = Path(file).read_text()
experiment_text, metrics_test = text.split('\n---EXPERIMENT-START---\n', 2)

experiment_dict = {}
for line in experiment_text.splitlines():
for k, v in line.split(':', 2):
k = k.strip()
v = v.strip()
column_def = experiment_columns.get(k)
if column_def is None:
experiment_dict[k] = v
else:
experiment_dict[column_def.name] = column_def.parser(v)
all_experiments.append(experiment_dict)

experiment_id = experiment_dict['experiment_id']

data_io = StringIO(metrics_test)
df = pandas.read_csv(data_io)
df.assign(experiment_id=experiment_id)
header_data = dict()
for line in experiment_text.splitlines():
for k, v in line.split(':', 2):
header_data[k] = v
df.assign(**header_data)
all_data.append(df)
return pandas.concat(all_data)


def plot_data(df):
grouped = df.group_by('rmw').mean()
grouped.plot.scatter(x='num_pubs', y='latency_mean')

pass


def run_experiments():
pass


def plot_results():
pass
for file in Path(data_dir).glob('*.json'):
with file.open('r') as f:
dobj = json.load(f)
all_experiments.append(dobj['experiment'])
all_metrics.append(DataFrame.from_dict(dobj['metrics']))
e = DataFrame.from_records(all_experiments, index=['experiment_id'])
e['message_size'] = e['topic'].replace(size_by_topic)

m = pandas.concat(all_metrics)
# The first few seconds have suspiciously low CPU usage;
# I suspect a measuring artifact
m = m.loc[m['T_experiment'] > 3]

all_data = m.join(e, on=['experiment_id'])
all_data.sort_values(by='message_size', inplace=True)
all_data['all'] = ''

import seaborn as sns
plt.figure()
ax = sns.violinplot(x='topic', y='latency_mean (ms)', hue='rmw', data=all_data, split=True)
ax.set(xlabel='Topic', ylabel='Mean Latency (ms) - lower is better', yscale='log')
# plt.show()
plt.savefig('latency.png')

plt.figure()
ax = sns.violinplot(x='topic', y='cpu_usage (%)', hue='rmw', data=all_data, split=True)
ax.set(xlabel='Topic', ylabel='CPU usage (%) - lower is better', yscale='linear')
# plt.show()
plt.savefig('cpu.png')

plt.figure()
all_data['ru_maxrss_mb'] = all_data['ru_maxrss'] / 1000
ax = sns.barplot(x='topic', y='ru_maxrss_mb', hue='rmw', data=all_data, ci=None)
ax.set(xlabel='Topic', ylabel='RAM Usage (MB) - lower is better', yscale='linear')
# plt.show()
plt.savefig('ram_usage.png')


if __name__ == '__main__':
main()

0 comments on commit 20d24e1

Please sign in to comment.