Skip to content

Commit

Permalink
PerfMeasurements: add ability to simulate measurements
Browse files Browse the repository at this point in the history
The "bottom" measurements - meaning the most core measurement in a list
of measurements that we may run can now be "simulated".

This means that instead of actually running the measurement
tools/commands we instead run the following command:

    echo simulated start

And instead of returning real measurement results we return the same
structure of data with a single 0 value perf interval.

Combining this we can now run all of our ENRT recipes in a "simulation"
that preserves the number of commands per test recipe run and should
return results compatible with all of our tooling with just the basic 0
value.

The cpu measurement simulation does actually call the real measurement,
this is because this isn't a "bottom/core" measurement and is instead
designed to always wrap some other measurement.

Signed-off-by: Ondrej Lichtner <[email protected]>
  • Loading branch information
olichtne committed Jan 25, 2024
1 parent b16fce1 commit a9b14a9
Show file tree
Hide file tree
Showing 10 changed files with 228 additions and 3 deletions.
44 changes: 44 additions & 0 deletions lnst/RecipeCommon/Perf/Measurements/BaseFlowMeasurement.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import time
from dataclasses import dataclass
import textwrap
from typing import Optional, Union
Expand All @@ -10,6 +11,8 @@
from lnst.RecipeCommon.Perf.Measurements.MeasurementError import MeasurementError
from lnst.RecipeCommon.Perf.Measurements.BaseMeasurement import BaseMeasurement
from lnst.RecipeCommon.Perf.Measurements.Results import FlowMeasurementResults, AggregatedFlowMeasurementResults
from lnst.RecipeCommon.Perf.Results import PerfInterval
from lnst.RecipeCommon.Perf.Results import SequentialPerfResult
from lnst.RecipeCommon.Perf.Results import ParallelPerfResult


Expand Down Expand Up @@ -185,3 +188,44 @@ def aggregate_multi_flow_results(results):
aggregated_result.add_results(parallel_result)

return [aggregated_result]

def collect_simulated_results(self):
res = []
for test_flow in self.flows:
flow_results = FlowMeasurementResults(
measurement=self,
flow=test_flow,
warmup_duration=test_flow.warmup_duration,
)
flow_results.generator_results = ParallelPerfResult(
[
SequentialPerfResult(
[PerfInterval(0, 1, "bits", time.time())]
* (test_flow.warmup_duration * 2 + test_flow.duration)
)
]
)
flow_results.generator_cpu_stats = PerfInterval(
0,
(test_flow.warmup_duration * 2 + test_flow.duration),
"cpu_percent",
time.time(),
)

flow_results.receiver_results = ParallelPerfResult(
[
SequentialPerfResult(
[PerfInterval(0, 1, "bits", time.time())]
* (test_flow.warmup_duration * 2 + test_flow.duration)
)
]
)
flow_results.receiver_cpu_stats = PerfInterval(
0,
(test_flow.warmup_duration * 2 + test_flow.duration),
"cpu_percent",
time.time(),
)

res.append(flow_results)
return res
9 changes: 9 additions & 0 deletions lnst/RecipeCommon/Perf/Measurements/BaseMeasurement.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,21 @@ def recipe_conf(self):
def start(self):
raise NotImplementedError()

def simulate_start(self):
return self.start()

def finish(self):
raise NotImplementedError()

def simulate_finish(self):
return self.finish()

def collect_results(self):
raise NotImplementedError()

def collect_simulated_results(self):
return self.collect_results()

@classmethod
def report_results(cls, recipe, results):
raise NotImplementedError()
Expand Down
27 changes: 27 additions & 0 deletions lnst/RecipeCommon/Perf/Measurements/IperfFlowMeasurement.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import re
import time
import logging
from typing import List

from lnst.Common.IpAddress import ipaddress
Expand Down Expand Up @@ -70,6 +71,21 @@ def start(self):

self._running_measurements = test_flows

def simulate_start(self):
if len(self._running_measurements) > 0:
raise MeasurementError("Measurement already running!")

test_flows = self._prepare_test_flows(self.flows)

result = None
for flow in test_flows:
flow.server_job = flow.server_job.netns.run('echo simulated start', bg=True)

for flow in test_flows:
flow.client_job = flow.client_job.netns.run('echo simulated start', bg=True)

self._running_measurements = test_flows

def finish(self):
test_flows = self._running_measurements
try:
Expand All @@ -85,6 +101,17 @@ def finish(self):
self._running_measurements = []
self._finished_measurements = test_flows

def simulate_finish(self):
logging.info("Simulating minimal 1s measurement duration")
time.sleep(2)
test_flows = self._running_measurements
for flow in test_flows:
flow.client_job.wait()
flow.server_job.wait()

self._running_measurements = []
self._finished_measurements = test_flows

def collect_results(self):
test_flows = self._finished_measurements

Expand Down
27 changes: 27 additions & 0 deletions lnst/RecipeCommon/Perf/Measurements/NeperFlowMeasurement.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import time
import logging
from typing import List, Dict, Tuple
from lnst.Common.IpAddress import ipaddress
from lnst.Controller.Job import Job
Expand Down Expand Up @@ -45,6 +46,21 @@ def start(self):

self._running_measurements = test_flows

def simulate_start(self):
if len(self._running_measurements) > 0:
raise MeasurementError("Measurement already running!")

test_flows = self._prepare_test_flows(self.flows)

result = None
for flow in test_flows:
flow.server_job = flow.server_job.netns.run('echo simulated start', bg=True)

for flow in test_flows:
flow.client_job = flow.client_job.netns.run('echo simulated start', bg=True)

self._running_measurements = test_flows

def finish(self):
test_flows = self._running_measurements
try:
Expand All @@ -60,6 +76,17 @@ def finish(self):
self._running_measurements = []
self._finished_measurements = test_flows

def simulate_finish(self):
logging.info("Simulating minimal 1s measurement duration")
time.sleep(1)
test_flows = self._running_measurements
for flow in test_flows:
flow.client_job.wait()
flow.server_job.wait()

self._running_measurements = []
self._finished_measurements = test_flows

def _prepare_test_flows(self, flows: List[Flow]):
test_flows = []
for flow in flows:
Expand Down
16 changes: 16 additions & 0 deletions lnst/RecipeCommon/Perf/Measurements/RDMABandwidthMeasurement.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing import Any, Optional
import time
import logging

from lnst.Controller.Job import Job
from lnst.Controller.Recipe import BaseRecipe
Expand Down Expand Up @@ -41,6 +42,16 @@ def start(self) -> None:
for endpoint_test in self._endpoint_tests:
endpoint_test.client_job.start(bg=True)

def simulate_start(self):
self._endpoint_tests.extend(self._prepare_endpoint_tests())

for endpoint_test in self._endpoint_tests:
endpoint_test.server_job = endpoint_test.server_job.netns.run("echo simulated start", bg=True)

self._start_timestamp = time.time()
for endpoint_test in self._endpoint_tests:
endpoint_test.client_job = endpoint_test.client_job.netns.run("echo simulated start", bg=True)

def finish(self) -> None:
try:
for endpoint_test in self._endpoint_tests:
Expand All @@ -52,6 +63,11 @@ def finish(self) -> None:
endpoint_test.client_job.kill()
endpoint_test.server_job.kill()

def simulate_finish(self):
logging.info("Simulating minimal 1s measurement duration")
time.sleep(1)
self.finish()

def collect_results(self) -> list[RDMABandwidthMeasurementResults]:
results: list[RDMABandwidthMeasurementResults] = []
for endpoint_test in self._endpoint_tests:
Expand Down
27 changes: 27 additions & 0 deletions lnst/RecipeCommon/Perf/Measurements/TRexFlowMeasurement.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import time
import signal
import re
import logging
from lnst.Controller.RecipeResults import ResultLevel

from lnst.RecipeCommon.Perf.Results import PerfInterval
Expand Down Expand Up @@ -67,6 +68,21 @@ def start(self):

self._running_measurements = tests

def simulate_start(self):
if len(self._running_measurements) > 0:
raise MeasurementError("Measurement already running!")

tests = self._prepare_tests(self._flows)

result = None
for test in tests:
test.server_job = test.server_job.netns.run("echo simulated start", bg=True)

for test in tests:
test.client_job = test.client_job.netns.run("echo simulated start", bg=True)

self._running_measurements = tests

def finish(self):
tests = self._running_measurements
try:
Expand All @@ -84,6 +100,17 @@ def finish(self):
self._running_measurements = []
self._finished_measurements = tests

def simulate_finish(self):
logging.info("Simulating minimal 1s measurement duration")
time.sleep(1)
tests = self._running_measurements
for test in tests:
test.client_job.wait()
test.server_job.wait()

self._running_measurements = []
self._finished_measurements = tests

def _prepare_tests(self, flows):
tests = []

Expand Down
27 changes: 27 additions & 0 deletions lnst/RecipeCommon/Perf/Measurements/TcRunMeasurement.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import time
import logging
from tempfile import NamedTemporaryFile
from typing import Optional
Expand Down Expand Up @@ -151,6 +152,16 @@ def start(self):
job.start(bg=True)
self._running_jobs.append(job)

def simulate_start(self):
if len(self._running_jobs) > 0:
raise MeasurementError("Measurement already running!")

jobs = self._prepare_jobs()

for job in jobs:
job = job.netns.run("echo simulated start", bg=True)
self._running_jobs.append(job)

def _prepare_jobs(self) -> list[Job]:
params: dict = {
"batchfiles": [i.batchfile_path for i in self.instance_configs],
Expand All @@ -176,6 +187,11 @@ def finish(self):
self._running_jobs = []
self._finished_jobs = jobs

def simulate_finish(self):
logging.info("Simulating minimal 1s measurement duration")
time.sleep(1)
self.finish()

def _make_instances_cfgs(self) -> list[TcRunInstance]:
#TODO perhaps make this be something the recipe or a ResultGenerator creates
configs = []
Expand Down Expand Up @@ -205,6 +221,17 @@ def collect_results(self) -> list[TcRunMeasurementResults]:

return [run_result]

def collect_simulated_results(self):
run_result = TcRunMeasurementResults(
measurement=self,
device=self.device,
)
run_result.rule_install_rate = ParallelPerfResult(
[PerfInterval(0, 1, "rules", time.time())]
)
run_result.run_success = True
return [run_result]

def _get_instance_interval(self, instance_data: dict):
return PerfInterval(
value=self._rules_per_instance,
Expand Down
22 changes: 22 additions & 0 deletions lnst/RecipeCommon/Perf/Measurements/XDPBenchMeasurement.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import time
import logging

from lnst.RecipeCommon.Perf.Measurements.Results.AggregatedXDPBenchMeasurementResults import (
AggregatedXDPBenchMeasurementResults,
)
Expand Down Expand Up @@ -47,6 +50,16 @@ def start(self):

self._running_measurements = net_flows

def simulate_start(self):
net_flows = self._prepare_flows()
for flow in net_flows:
flow.server_job = flow.server_job.netns.run("echo simulated start", bg=True)
flow.client_job = flow.client_job.netns.run("echo simulated start", bg=True)
# server starts immediately, no need to wait
self._running_measurements.append(flow)

self._running_measurements = net_flows

def _prepare_server(self, flow: Flow):
params = {
"command": self.command,
Expand Down Expand Up @@ -97,6 +110,15 @@ def finish(self):
self._finished_measurements = self._running_measurements
self._running_measurements = []

def simulate_finish(self):
logging.info("Simulating minimal 1s measurement duration")
time.sleep(1)
for flow in self._running_measurements:
flow.server_job.wait()
flow.client_job.wait()
self._finished_measurements = self._running_measurements
self._running_measurements = []

def collect_results(self):
test_flows = self._finished_measurements

Expand Down
24 changes: 21 additions & 3 deletions lnst/RecipeCommon/Perf/Recipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,22 @@ def __init__(
measurements: List[BaseMeasurement],
iterations: int,
parent_recipe_config: Any = None,
simulate_measurements: bool = False,
):
self._measurements = measurements
self._evaluators = dict()
self._iterations = iterations
self._parent_recipe_config = parent_recipe_config
self._simulate_measurements = simulate_measurements

@property
def measurements(self):
return self._measurements

@property
def simulate_measurements(self):
return self._simulate_measurements

@property
def evaluators(self):
return dict(self._evaluators)
Expand Down Expand Up @@ -167,11 +173,23 @@ def perf_test_iteration(

try:
for measurement in recipe_conf.measurements:
measurement.start()
if recipe_conf.simulate_measurements:
logging.info(f"Simulating start of measurement {measurement}")
measurement.simulate_start()
else:
measurement.start()
for measurement in reversed(recipe_conf.measurements):
measurement.finish()
if recipe_conf.simulate_measurements:
logging.info(f"Simulating finish of measurement {measurement}")
measurement.simulate_finish()
else:
measurement.finish()
for measurement in recipe_conf.measurements:
measurement_results = measurement.collect_results()
if recipe_conf.simulate_measurements:
logging.info(f"Simulating result collection of measurement {measurement}")
measurement_results = measurement.collect_simulated_results()
else:
measurement_results = measurement.collect_results()
results.add_measurement_results(
measurement, measurement_results
)
Expand Down
Loading

0 comments on commit a9b14a9

Please sign in to comment.