Skip to content

Commit

Permalink
Parallelize the matrix build process
Browse files Browse the repository at this point in the history
  • Loading branch information
julianneswinoga committed Dec 13, 2022
1 parent 7720f6e commit 2edb3b2
Show file tree
Hide file tree
Showing 4 changed files with 289 additions and 50 deletions.
127 changes: 80 additions & 47 deletions matrix_build.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,17 @@
import os
import shutil
import signal
import subprocess
import click
import sys
from pathlib import Path
from typing import List

import tabulate
from constraint import *

from matrix_build_parallel import Executor, execute, get_available_executor_idx, get_finished_executor_idx, \
cleanup_tempdirs, create_executors, get_source_files_to_link, wait_for_executor_to_finish, copy_caches_to_executors

CONTINUE_ON_ERROR = False

BOARDS = [
Expand Down Expand Up @@ -279,43 +284,28 @@ def get_value(vb, vk):
print(tabulate.tabulate(rows, tablefmt="grid", showindex=map(shorten, keys), colalign=("right",)))


def generate_config_file(flag_values):
content = "#pragma once\n\n"
for key, value in flag_values.items():
content += "#define {} {}\n".format(key, value)

with open("Configuration_local_matrix.hpp", 'w') as f:
f.write(content)
print("Generated local config")
print("Path: {}".format(os.path.abspath(f.name)))
print("Content:")
print(content)


def create_run_environment(flag_values):
build_env = dict(os.environ)
build_flags = " ".join(["-D{}={}".format(key, value) for key, value in flag_values.items()])
build_env["PLATFORMIO_BUILD_FLAGS"] = build_flags
return build_env
def print_failed_executor(executor: Executor):
print(f'Error for the following configuration ({executor.proj_dir}):', file=sys.stderr)
print_solutions_matrix([executor.solution])
configuration_path = Path(executor.proj_dir, 'Configuration_local_matrix.hpp')
print(f'{configuration_path}:')
with open(configuration_path, 'r') as fp:
print(fp.read())
out_bytes, err_bytes = executor.proc.communicate()
if out_bytes:
print(out_bytes.decode())
if err_bytes:
print(err_bytes.decode(), file=sys.stderr)


def execute(board, flag_values, use_config_file=True):
if use_config_file:
build_env = dict(os.environ)
build_env["PLATFORMIO_BUILD_FLAGS"] = "-DMATRIX_LOCAL_CONFIG=1"
generate_config_file(flag_values)
else:
build_env = create_run_environment(flag_values)

proc = subprocess.Popen(
"pio run -e {}".format(board),
# stdout=subprocess.PIPE,
# stderr=subprocess.PIPE,
shell=True,
env=build_env,
)
(stdout, stderr) = proc.communicate()
return stdout, stdout, proc.returncode
def run_solution_blocking(executor: Executor, solution: dict) -> int:
executor.solution = copy.deepcopy(solution)
board = solution.pop("BOARD")
executor.proc = execute(executor.proj_dir, board, solution, jobs=os.cpu_count(), out_pipe=False)
executor.proc.wait()
if executor.proc.returncode != 0:
print_failed_executor(executor)
return executor.proc.returncode


class GracefulKiller:
Expand Down Expand Up @@ -353,17 +343,60 @@ def solve(board):
solutions = problem.getSolutions()
print_solutions_matrix(solutions, short_strings=False)

print("Testing {} combinations".format(len(solutions)))

for num, solution in enumerate(solutions, start=1):
print("[{}/{}] Building ...".format(num, len(solutions)), flush=True)
print_solutions_matrix([solution])

board = solution.pop("BOARD")
(o, e, c) = execute(board, solution)
if c and not CONTINUE_ON_ERROR:
exit(c)
print(flush=True)
total_solutions = len(solutions)
print(f'Testing {total_solutions} combinations')

nproc = min(os.cpu_count(), len(solutions))

local_paths_to_link = get_source_files_to_link()
executor_list: List[Executor] = create_executors(nproc, local_paths_to_link)

print('First run to fill cache')
solution = solutions.pop()
retcode = run_solution_blocking(executor_list[0], solution)
if retcode != 0 and not CONTINUE_ON_ERROR:
exit(retcode)

copy_caches_to_executors(executor_list[0].proj_dir, executor_list[1:])

solutions_built = 2 # We've already built one solution, and we're 1-indexing
exit_early = False # Exit trigger
while solutions:
# First fill any open execution slots
while get_available_executor_idx(executor_list) is not None:
available_executor_idx = get_available_executor_idx(executor_list)
executor = executor_list[available_executor_idx]
try:
solution = solutions.pop()
except IndexError:
# No more solutions to try!
break
print(f'[{solutions_built}/{total_solutions}] Building ...')
executor.solution = copy.deepcopy(solution)
board = solution.pop("BOARD")
executor.proc = execute(executor.proj_dir, board, solution)
solutions_built += 1

# Next wait for any processes to finish
wait_for_executor_to_finish(executor_list)

# Go through all the finished processes and check their status
while get_finished_executor_idx(executor_list) is not None:
finished_executor_idx = get_finished_executor_idx(executor_list)
executor = executor_list[finished_executor_idx]
if executor.proc.returncode != 0:
print_failed_executor(executor)
if not CONTINUE_ON_ERROR:
exit_early = True
del executor.proc
executor.proc = None

if exit_early:
break
if exit_early:
exit(1)
print('Done!')
cleanup_tempdirs(executor_list)


if __name__ == '__main__':
Expand Down
193 changes: 193 additions & 0 deletions matrix_build_parallel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
"""
Module where all functionality that purely relates to how we parallelize matrix_build.py
should live. It's not a perfect split of course, but it helps to separate the 'matrix'
logic from the 'how we build' logic.
"""
import os
import shutil
import subprocess
import tempfile
import time
from pathlib import Path
from typing import Optional, List
from dataclasses import dataclass


@dataclass
class Executor:
"""
Core data that defines a solution that is being built
"""
# The directory where we are building the solution
proj_dir: Path
# The solution dictionary
solution: Optional[dict] = None
# The process building the solution
proc: Optional[subprocess.Popen] = None
# Object that holds tempdir data, so that it can be cleaned up later
tempdir_obj: Optional[tempfile.TemporaryDirectory] = None


def generate_config_file(project_location: Path, flag_values: dict):
content = "#pragma once\n\n"
for key, value in flag_values.items():
content += "#define {} {}\n".format(key, value)

with open(Path(project_location, "Configuration_local_matrix.hpp"), 'w') as f:
f.write(content)
f.flush()


def execute(project_location: Path, board: str, flag_values: dict, jobs: int = 1, out_pipe=True) -> subprocess.Popen:
"""
Start up an executor that is building a solution
:param project_location: The directory where to build the solution
:param board: The board type (aka environment)
:param flag_values: Dictionary of #defines to create a config file from
:param jobs: How many jobs the build process should use
:param out_pipe: If the executor's stdout/stderr should be pipes
:return: Process object that is executing the solution
"""
build_env = dict(os.environ)
build_env["PLATFORMIO_BUILD_FLAGS"] = "-DMATRIX_LOCAL_CONFIG=1"
generate_config_file(project_location, flag_values)

proc = subprocess.Popen(
['pio',
'run',
f'--project-dir={str(project_location.resolve())}',
f'--environment={board}',
f'--jobs={jobs}',
],
stdout=subprocess.PIPE if out_pipe else None,
stderr=subprocess.PIPE if out_pipe else None,
env=build_env,
close_fds=True,
)
return proc


def get_available_executor_idx(e_list: List[Executor]) -> Optional[int]:
"""
Get the index of an idle executor
:param e_list: List of executors
:return: Idle executor index, else None if all are busy
"""
for i, executor in enumerate(e_list):
if executor.proc is None:
return i
return None


def get_finished_executor_idx(e_list: List[Executor]) -> Optional[int]:
"""
Get the index of a finished executor
:param e_list: List of executors
:return: Finished executor index, else None if all are busy
"""
for i, executor in enumerate(e_list):
if executor.proc is not None and executor.proc.poll() is not None:
return i
return None


def cleanup_tempdirs(e_list: List[Executor]):
"""
Delete all the temporary directories that executors were using
:param e_list: List of executors
"""
for executor in e_list:
if executor.tempdir_obj is not None:
tempdir_path = executor.tempdir_obj.name
print(f'Deleting {tempdir_path}')
shutil.rmtree(tempdir_path, ignore_errors=True)


def create_executors(num_executors: int, local_paths_to_link: List[Path]) -> List[Executor]:
"""
Create a number of executors and their associated temporary directories, then
soft-link all needed project files
:param num_executors: Number of executors to create
:param local_paths_to_link: List of files to soft-link into the executor projects
:return: List of executors
"""
executor_list: List[Executor] = []
print(f'Creating {num_executors} executors')
for executor_idx in range(num_executors):
tempdir = tempfile.TemporaryDirectory()
temp_proj_path = Path(tempdir.name)
for local_path in local_paths_to_link:
temp_dst_path = Path(temp_proj_path, local_path).resolve()
os.makedirs(temp_dst_path.parent, exist_ok=True)
os.symlink(local_path.resolve(), temp_dst_path)
executor_list.append(Executor(temp_proj_path, tempdir_obj=tempdir))
print(f'{executor_idx} ', end='')
print()
return executor_list


def copy_caches_to_executors(src_proj_dir: Path, dst_executors: List[Executor]):
"""
Copy cache directories from a source directory to a number of executor project directories
:param src_proj_dir: Directory to copy from
:param dst_executors: List of executors to copy to
"""
print('Copying caches to other executors')
dir_names_to_copy = ['.pio', 'build_cache']
for dir_name_to_copy in dir_names_to_copy:
src_path = Path(src_proj_dir, dir_name_to_copy)
for dst_executor in dst_executors:
dst_path = Path(dst_executor.proj_dir, dir_name_to_copy)
shutil.copytree(src_path, dst_path)


def get_source_files_to_link() -> List[Path]:
"""
Create a list of the important files from the local project. I didn't want to
use git here, since that might not pick up untracked (but needed) files.
:return: List of source files that a project needs in order to compile
"""
local_proj_path = Path('.')
venv_dirs = list(local_proj_path.glob('*venv*/'))
# Don't link the .pio directory because the builds need to be independent
pio_dirs = list(local_proj_path.glob('*.pio*/'))
cmake_dirs = list(local_proj_path.glob('*cmake-build*/'))

local_dirs_to_not_link = [Path('.git/'), Path('build_cache/')] + venv_dirs + pio_dirs + cmake_dirs
local_filenames_to_not_link = [
Path('Configuration_local.hpp'),
Path('Configuration_local_matrix.hpp'),
]

local_paths_to_link = []
for local_dir_str, local_subdirs, local_files in os.walk(local_proj_path):
local_dir_path = Path(local_dir_str)
dir_shouldnt_be_linked = any(d == local_dir_path or d in local_dir_path.parents for d in local_dirs_to_not_link)
if dir_shouldnt_be_linked:
continue
for local_file in local_files:
local_file_full_path = Path(local_dir_path, local_file)
file_shouldnt_be_linked = any(local_file_full_path == f for f in local_filenames_to_not_link)
if file_shouldnt_be_linked:
continue
local_paths_to_link.append(local_file_full_path)
return local_paths_to_link


def wait_for_executor_to_finish(executor_list: List[Executor], timeout=0.1, poll_time=0.2):
"""
Block until an executor has finished building
:param executor_list: List of executors
:param timeout: Time to communicate() with the running process (kind of a hack)
:param poll_time: Time to wait before checking all executors again
"""
while get_finished_executor_idx(executor_list) is None:
for e in executor_list:
if e.proc is not None and e.proc.poll() is None:
# Communicate with the running processes to stop them from blocking
# (i.e. they spew too much output)
try:
_ = e.proc.communicate(timeout=timeout)
except subprocess.TimeoutExpired:
pass # This is expected and what should happen
time.sleep(poll_time)
15 changes: 13 additions & 2 deletions post_script_remove_patched_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import os
import tempfile
from pathlib import Path


def cprint(*args, **kwargs):
Expand All @@ -13,7 +14,13 @@ def clean_up_patched_files(*_, **__):
Removes all temporary patched files created previously in the build process
"""
# patch_path_key needs to be kept in sync with pre_script_patch_debug.py
patch_path_key = '_patched_'
# We put the current directory name in the key so that we only remove
# patched files that we know were built by the current build process.
# This is only useful in safeguarding against multiple builds being done in
# different directories at the same time. (i.e. we don't want to remove another
# processes' files while they are still in use)
project_dir_name = Path.cwd().name
patch_path_key = f'_{project_dir_name}_patched_'
tempdir_path = tempfile.gettempdir()
cprint(f'Temp file dir is {tempdir_path}')
patched_filepaths = []
Expand All @@ -23,7 +30,11 @@ def clean_up_patched_files(*_, **__):
patched_filepaths.append(full_filepath)
for patched_filepath in patched_filepaths:
cprint(f'Removing {patched_filepath}')
os.remove(patched_filepath)
try:
os.remove(patched_filepath)
pass
except FileNotFoundError:
cprint('Not found (deleted already?)')


env.AddPostAction('buildprog', clean_up_patched_files)
Loading

0 comments on commit 2edb3b2

Please sign in to comment.