diff --git a/matrix_build.py b/matrix_build.py
index 6dc9622c..ddd9e663 100644
--- a/matrix_build.py
+++ b/matrix_build.py
@@ -4,12 +4,17 @@
 import os
 import shutil
 import signal
-import subprocess
 
 import click
+import copy
+import sys
+from pathlib import Path
+from typing import List
 import tabulate
 from constraint import *
 
+from matrix_build_parallel import Executor, execute, get_available_executor_idx, get_finished_executor_idx, \
+    cleanup_tempdirs, create_executors, get_source_files_to_link, wait_for_executor_to_finish, copy_caches_to_executors
+
 CONTINUE_ON_ERROR = False
 
 BOARDS = [
@@ -279,43 +284,28 @@ def get_value(vb, vk):
     print(tabulate.tabulate(rows, tablefmt="grid", showindex=map(shorten, keys), colalign=("right",)))
 
 
-def generate_config_file(flag_values):
-    content = "#pragma once\n\n"
-    for key, value in flag_values.items():
-        content += "#define {} {}\n".format(key, value)
-
-    with open("Configuration_local_matrix.hpp", 'w') as f:
-        f.write(content)
-    print("Generated local config")
-    print("Path: {}".format(os.path.abspath(f.name)))
-    print("Content:")
-    print(content)
-
-
-def create_run_environment(flag_values):
-    build_env = dict(os.environ)
-    build_flags = " ".join(["-D{}={}".format(key, value) for key, value in flag_values.items()])
-    build_env["PLATFORMIO_BUILD_FLAGS"] = build_flags
-    return build_env
+def print_failed_executor(executor: Executor):
+    print(f'Error for the following configuration ({executor.proj_dir}):', file=sys.stderr)
+    print_solutions_matrix([executor.solution])
+    configuration_path = Path(executor.proj_dir, 'Configuration_local_matrix.hpp')
+    print(f'{configuration_path}:')
+    with open(configuration_path, 'r') as fp:
+        print(fp.read())
+    out_bytes, err_bytes = executor.proc.communicate()
+    if out_bytes:
+        print(out_bytes.decode())
+    if err_bytes:
+        print(err_bytes.decode(), file=sys.stderr)
 
 
-def execute(board, flag_values, use_config_file=True):
-    if use_config_file:
-        build_env = dict(os.environ)
-        build_env["PLATFORMIO_BUILD_FLAGS"] = "-DMATRIX_LOCAL_CONFIG=1"
-        generate_config_file(flag_values)
-    else:
-        build_env = create_run_environment(flag_values)
-
-    proc = subprocess.Popen(
-        "pio run -e {}".format(board),
-        # stdout=subprocess.PIPE,
-        # stderr=subprocess.PIPE,
-        shell=True,
-        env=build_env,
-    )
-    (stdout, stderr) = proc.communicate()
-    return stdout, stdout, proc.returncode
+def run_solution_blocking(executor: Executor, solution: dict) -> int:
+    executor.solution = copy.deepcopy(solution)
+    board = solution.pop("BOARD")
+    executor.proc = execute(executor.proj_dir, board, solution, jobs=os.cpu_count(), out_pipe=False)
+    executor.proc.wait()
+    if executor.proc.returncode != 0:
+        print_failed_executor(executor)
+    return executor.proc.returncode
 
 
 class GracefulKiller:
@@ -353,17 +343,60 @@ def solve(board):
     solutions = problem.getSolutions()
 
     print_solutions_matrix(solutions, short_strings=False)
-    print("Testing {} combinations".format(len(solutions)))
-
-    for num, solution in enumerate(solutions, start=1):
-        print("[{}/{}] Building ...".format(num, len(solutions)), flush=True)
-        print_solutions_matrix([solution])
-
-        board = solution.pop("BOARD")
-        (o, e, c) = execute(board, solution)
-        if c and not CONTINUE_ON_ERROR:
-            exit(c)
-        print(flush=True)
+    total_solutions = len(solutions)
+    print(f'Testing {total_solutions} combinations')
+
+    nproc = min(os.cpu_count(), len(solutions))
+
+    local_paths_to_link = get_source_files_to_link()
+    executor_list: List[Executor] = create_executors(nproc, local_paths_to_link)
+
+    print('First run to fill cache')
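+    # Building the first solution on its own fills executor 0's .pio/ and
+    # build_cache/ directories; copy_caches_to_executors() below then seeds
+    # every other executor from those caches, so the parallel builds start
+    # warm instead of each recompiling the shared framework from scratch.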
+    solution = solutions.pop()
+    retcode = run_solution_blocking(executor_list[0], solution)
+    if retcode != 0 and not CONTINUE_ON_ERROR:
+        exit(retcode)
+
+    copy_caches_to_executors(executor_list[0].proj_dir, executor_list[1:])
+
+    solutions_built = 2  # We've already built one solution, and we're 1-indexing
+    exit_early = False  # Set when a build fails and CONTINUE_ON_ERROR is off
+    while solutions:
+        # First fill any open execution slots
+        while get_available_executor_idx(executor_list) is not None:
+            available_executor_idx = get_available_executor_idx(executor_list)
+            executor = executor_list[available_executor_idx]
+            try:
+                solution = solutions.pop()
+            except IndexError:
+                # No more solutions to try!
+                break
+            print(f'[{solutions_built}/{total_solutions}] Building ...')
+            executor.solution = copy.deepcopy(solution)
+            board = solution.pop("BOARD")
+            executor.proc = execute(executor.proj_dir, board, solution)
+            solutions_built += 1
+
+        # Next wait for any processes to finish
+        wait_for_executor_to_finish(executor_list)
+
+        # Go through all the finished processes and check their status
+        while get_finished_executor_idx(executor_list) is not None:
+            finished_executor_idx = get_finished_executor_idx(executor_list)
+            executor = executor_list[finished_executor_idx]
+            if executor.proc.returncode != 0:
+                print_failed_executor(executor)
+                if not CONTINUE_ON_ERROR:
+                    exit_early = True
+            executor.proc = None
+
+        if exit_early:
+            break
+
+    # The solution queue can empty while the last batch of builds is still in
+    # flight; drain those executors before declaring the run done.
+    while not exit_early and any(e.proc is not None for e in executor_list):
+        wait_for_executor_to_finish(executor_list)
+        while get_finished_executor_idx(executor_list) is not None:
+            finished_executor_idx = get_finished_executor_idx(executor_list)
+            executor = executor_list[finished_executor_idx]
+            if executor.proc.returncode != 0:
+                print_failed_executor(executor)
+                if not CONTINUE_ON_ERROR:
+                    exit_early = True
+            executor.proc = None
+
+    if exit_early:
+        exit(1)
+    print('Done!')
+    cleanup_tempdirs(executor_list)
 
 
 if __name__ == '__main__':
diff --git a/matrix_build_parallel.py b/matrix_build_parallel.py
new file mode 100644
index 00000000..3b7fea41
--- /dev/null
+++ b/matrix_build_parallel.py
@@ -0,0 +1,193 @@
+"""
+Module where all functionality that purely relates to how we parallelize matrix_build.py
+should live. It's not a perfect split of course, but it helps to separate the 'matrix'
+logic from the 'how we build' logic.
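+
+Roughly, the intended call pattern is the sketch below (the board name and the
+flag are illustrative placeholders, not values from the real build matrix):
+
+    executors = create_executors(2, get_source_files_to_link())
+    executors[0].proc = execute(executors[0].proj_dir, 'mega2560', {'SOME_FLAG': 1})
+    wait_for_executor_to_finish(executors)
+    cleanup_tempdirs(executors)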
+""" +import os +import shutil +import subprocess +import tempfile +import time +from pathlib import Path +from typing import Optional, List +from dataclasses import dataclass + + +@dataclass +class Executor: + """ + Core data that defines a solution that is being built + """ + # The directory where we are building the solution + proj_dir: Path + # The solution dictionary + solution: Optional[dict] = None + # The process building the solution + proc: Optional[subprocess.Popen] = None + # Object that holds tempdir data, so that it can be cleaned up later + tempdir_obj: Optional[tempfile.TemporaryDirectory] = None + + +def generate_config_file(project_location: Path, flag_values: dict): + content = "#pragma once\n\n" + for key, value in flag_values.items(): + content += "#define {} {}\n".format(key, value) + + with open(Path(project_location, "Configuration_local_matrix.hpp"), 'w') as f: + f.write(content) + f.flush() + + +def execute(project_location: Path, board: str, flag_values: dict, jobs: int = 1, out_pipe=True) -> subprocess.Popen: + """ + Start up an executor that is building a solution + :param project_location: The directory where to build the solution + :param board: The board type (aka environment) + :param flag_values: Dictionary of #defines to create a config file from + :param jobs: How many jobs the build process should use + :param out_pipe: If the executor's stdout/stderr should be pipes + :return: Process object that is executing the solution + """ + build_env = dict(os.environ) + build_env["PLATFORMIO_BUILD_FLAGS"] = "-DMATRIX_LOCAL_CONFIG=1" + generate_config_file(project_location, flag_values) + + proc = subprocess.Popen( + ['pio', + 'run', + f'--project-dir={str(project_location.resolve())}', + f'--environment={board}', + f'--jobs={jobs}', + ], + stdout=subprocess.PIPE if out_pipe else None, + stderr=subprocess.PIPE if out_pipe else None, + env=build_env, + close_fds=True, + ) + return proc + + +def get_available_executor_idx(e_list: List[Executor]) -> Optional[int]: + """ + Get the index of an idle executor + :param e_list: List of executors + :return: Idle executor index, else None if all are busy + """ + for i, executor in enumerate(e_list): + if executor.proc is None: + return i + return None + + +def get_finished_executor_idx(e_list: List[Executor]) -> Optional[int]: + """ + Get the index of a finished executor + :param e_list: List of executors + :return: Finished executor index, else None if all are busy + """ + for i, executor in enumerate(e_list): + if executor.proc is not None and executor.proc.poll() is not None: + return i + return None + + +def cleanup_tempdirs(e_list: List[Executor]): + """ + Delete all the temporary directories that executors were using + :param e_list: List of executors + """ + for executor in e_list: + if executor.tempdir_obj is not None: + tempdir_path = executor.tempdir_obj.name + print(f'Deleting {tempdir_path}') + shutil.rmtree(tempdir_path, ignore_errors=True) + + +def create_executors(num_executors: int, local_paths_to_link: List[Path]) -> List[Executor]: + """ + Create a number of executors and their associated temporary directories, then + soft-link all needed project files + :param num_executors: Number of executors to create + :param local_paths_to_link: List of files to soft-link into the executor projects + :return: List of executors + """ + executor_list: List[Executor] = [] + print(f'Creating {num_executors} executors') + for executor_idx in range(num_executors): + tempdir = tempfile.TemporaryDirectory() + temp_proj_path 
+
+
+def execute(project_location: Path, board: str, flag_values: dict, jobs: int = 1, out_pipe=True) -> subprocess.Popen:
+    """
+    Start up an executor that is building a solution
+    :param project_location: The directory in which to build the solution
+    :param board: The board type (aka environment)
+    :param flag_values: Dictionary of #defines to create a config file from
+    :param jobs: How many jobs the build process should use
+    :param out_pipe: Whether the executor's stdout/stderr should be pipes
+    :return: Process object that is executing the solution
+    """
+    build_env = dict(os.environ)
+    build_env["PLATFORMIO_BUILD_FLAGS"] = "-DMATRIX_LOCAL_CONFIG=1"
+    generate_config_file(project_location, flag_values)
+
+    proc = subprocess.Popen(
+        ['pio',
+         'run',
+         f'--project-dir={str(project_location.resolve())}',
+         f'--environment={board}',
+         f'--jobs={jobs}',
+         ],
+        stdout=subprocess.PIPE if out_pipe else None,
+        stderr=subprocess.PIPE if out_pipe else None,
+        env=build_env,
+        close_fds=True,
+    )
+    return proc
+
+
+def get_available_executor_idx(e_list: List[Executor]) -> Optional[int]:
+    """
+    Get the index of an idle executor
+    :param e_list: List of executors
+    :return: Idle executor index, else None if all are busy
+    """
+    for i, executor in enumerate(e_list):
+        if executor.proc is None:
+            return i
+    return None
+
+
+def get_finished_executor_idx(e_list: List[Executor]) -> Optional[int]:
+    """
+    Get the index of a finished executor
+    :param e_list: List of executors
+    :return: Finished executor index, else None if none have finished
+    """
+    for i, executor in enumerate(e_list):
+        if executor.proc is not None and executor.proc.poll() is not None:
+            return i
+    return None
+
+
+def cleanup_tempdirs(e_list: List[Executor]):
+    """
+    Delete all the temporary directories that executors were using
+    :param e_list: List of executors
+    """
+    for executor in e_list:
+        if executor.tempdir_obj is not None:
+            tempdir_path = executor.tempdir_obj.name
+            print(f'Deleting {tempdir_path}')
+            shutil.rmtree(tempdir_path, ignore_errors=True)
+
+
+def create_executors(num_executors: int, local_paths_to_link: List[Path]) -> List[Executor]:
+    """
+    Create a number of executors and their associated temporary directories, then
+    soft-link all needed project files into them
+    :param num_executors: Number of executors to create
+    :param local_paths_to_link: List of files to soft-link into the executor projects
+    :return: List of executors
+    """
+    executor_list: List[Executor] = []
+    print(f'Creating {num_executors} executors')
+    for executor_idx in range(num_executors):
+        tempdir = tempfile.TemporaryDirectory()
+        temp_proj_path = Path(tempdir.name)
+        for local_path in local_paths_to_link:
+            temp_dst_path = Path(temp_proj_path, local_path).resolve()
+            os.makedirs(temp_dst_path.parent, exist_ok=True)
+            os.symlink(local_path.resolve(), temp_dst_path)
+        executor_list.append(Executor(temp_proj_path, tempdir_obj=tempdir))
+        print(f'{executor_idx} ', end='')
+    print()
+    return executor_list
+
+
+def copy_caches_to_executors(src_proj_dir: Path, dst_executors: List[Executor]):
+    """
+    Copy cache directories from a source project directory to a number of executor project directories
+    :param src_proj_dir: Directory to copy from
+    :param dst_executors: List of executors to copy to
+    """
+    print('Copying caches to other executors')
+    dir_names_to_copy = ['.pio', 'build_cache']
+    for dir_name_to_copy in dir_names_to_copy:
+        src_path = Path(src_proj_dir, dir_name_to_copy)
+        for dst_executor in dst_executors:
+            dst_path = Path(dst_executor.proj_dir, dir_name_to_copy)
+            shutil.copytree(src_path, dst_path)
+
+
+def get_source_files_to_link() -> List[Path]:
+    """
+    Create a list of the important files from the local project. I didn't want to
+    use git here, since that might not pick up untracked (but needed) files.
+    :return: List of source files that a project needs in order to compile
+    """
+    local_proj_path = Path('.')
+    venv_dirs = list(local_proj_path.glob('*venv*/'))
+    # Don't link the .pio directory because the builds need to be independent
+    pio_dirs = list(local_proj_path.glob('*.pio*/'))
+    cmake_dirs = list(local_proj_path.glob('*cmake-build*/'))
+
+    local_dirs_to_not_link = [Path('.git/'), Path('build_cache/')] + venv_dirs + pio_dirs + cmake_dirs
+    local_filenames_to_not_link = [
+        Path('Configuration_local.hpp'),
+        Path('Configuration_local_matrix.hpp'),
+    ]
+
+    local_paths_to_link = []
+    for local_dir_str, local_subdirs, local_files in os.walk(local_proj_path):
+        local_dir_path = Path(local_dir_str)
+        dir_shouldnt_be_linked = any(d == local_dir_path or d in local_dir_path.parents for d in local_dirs_to_not_link)
+        if dir_shouldnt_be_linked:
+            continue
+        for local_file in local_files:
+            local_file_full_path = Path(local_dir_path, local_file)
+            file_shouldnt_be_linked = any(local_file_full_path == f for f in local_filenames_to_not_link)
+            if file_shouldnt_be_linked:
+                continue
+            local_paths_to_link.append(local_file_full_path)
+    return local_paths_to_link
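+
+
+# Note on the polling below: pio can emit more output than an OS pipe buffer
+# holds (commonly around 64 KiB on Linux). If nobody reads a child's
+# stdout/stderr pipes, the child eventually blocks on write() and its build
+# stalls, so while waiting we periodically communicate() with every running
+# process to keep its pipes drained.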
+def wait_for_executor_to_finish(executor_list: List[Executor], timeout=0.1, poll_time=0.2):
+    """
+    Block until an executor has finished building
+    :param executor_list: List of executors
+    :param timeout: How long to communicate() with each running process (kind of a hack)
+    :param poll_time: Time to wait before checking all executors again
+    """
+    while get_finished_executor_idx(executor_list) is None:
+        for e in executor_list:
+            if e.proc is not None and e.proc.poll() is None:
+                # Communicate with the running processes to stop them from blocking
+                # (i.e. when they produce more output than the pipe buffer can hold)
+                try:
+                    _ = e.proc.communicate(timeout=timeout)
+                except subprocess.TimeoutExpired:
+                    pass  # Expected; the process simply hasn't finished yet
+        time.sleep(poll_time)
diff --git a/post_script_remove_patched_files.py b/post_script_remove_patched_files.py
index 73735819..bc7fcb61 100644
--- a/post_script_remove_patched_files.py
+++ b/post_script_remove_patched_files.py
@@ -2,6 +2,7 @@
 import os
 import tempfile
+from pathlib import Path
 
 
 def cprint(*args, **kwargs):
@@ -13,7 +14,13 @@ def clean_up_patched_files(*_, **__):
     Removes all temporary patched files created previously in the build process
     """
     # patch_path_key needs to be kept in sync with pre_script_patch_debug.py
-    patch_path_key = '_patched_'
+    # We put the current directory name in the key so that we only remove
+    # patched files that we know were built by the current build process.
+    # This safeguards against multiple builds running in different directories
+    # at the same time (i.e. we don't want to remove another process's files
+    # while they are still in use).
+    project_dir_name = Path.cwd().name
+    patch_path_key = f'_{project_dir_name}_patched_'
     tempdir_path = tempfile.gettempdir()
     cprint(f'Temp file dir is {tempdir_path}')
     patched_filepaths = []
@@ -23,7 +30,11 @@ def clean_up_patched_files(*_, **__):
         patched_filepaths.append(full_filepath)
     for patched_filepath in patched_filepaths:
         cprint(f'Removing {patched_filepath}')
-        os.remove(patched_filepath)
+        try:
+            os.remove(patched_filepath)
+        except FileNotFoundError:
+            cprint('Not found (deleted already?)')
 
 
 env.AddPostAction('buildprog', clean_up_patched_files)
diff --git a/pre_script_patch_debug.py b/pre_script_patch_debug.py
index 5c166dd6..bad10b26 100644
--- a/pre_script_patch_debug.py
+++ b/pre_script_patch_debug.py
@@ -2,6 +2,7 @@
 import os
 import tempfile
+from pathlib import Path
 
 
 def cprint(*args, **kwargs):
@@ -31,7 +32,8 @@ def patch_function_factory(src_path, output_suffix, replacement_list):
     def out_func(node):
         # patch_path_key needs to be kept in sync with post_script_remove_patched_files.py
         # so that after a successful build the patched file can be removed
-        patch_path_key = '_patched_'
+        project_dir_name = Path.cwd().name  # See post_script_remove_patched_files.py for why this is needed
+        patch_path_key = f'_{project_dir_name}_patched_'
         with tempfile.NamedTemporaryFile(mode='w', suffix=f'{patch_path_key}{output_suffix}', delete=False) as tf:
             patched_filepath = tf.name
             cprint(f'Patching {src_path}')