From 58e289b2a6d96c9d521dc0c3ff87b733bd8b7ebb Mon Sep 17 00:00:00 2001 From: Aaron Berdy Date: Thu, 26 Oct 2023 11:09:38 -0700 Subject: [PATCH] feat: improve failure status reason (#117) --- base/jobs/docker/1.0/py3/requirements.txt | 2 +- pytorch/jobs/docker/2.0/py3/requirements.txt | 2 +- src/braket_container.py | 153 ++++++++++-------- .../jobs/docker/2.13/py3/requirements.txt | 2 +- test/unit_tests/test_braket_container.py | 132 +++++++-------- 5 files changed, 147 insertions(+), 144 deletions(-) diff --git a/base/jobs/docker/1.0/py3/requirements.txt b/base/jobs/docker/1.0/py3/requirements.txt index dd951578..00a1860f 100644 --- a/base/jobs/docker/1.0/py3/requirements.txt +++ b/base/jobs/docker/1.0/py3/requirements.txt @@ -1,7 +1,7 @@ amazon-braket-default-simulator==1.20.1 amazon-braket-schemas==1.19.1 amazon-braket-pennylane-plugin==1.21.0 -amazon-braket-sdk==1.58.0 +amazon-braket-sdk==1.59.1 awscli==1.29.53 botocore==1.31.53 boto3==1.28.53 diff --git a/pytorch/jobs/docker/2.0/py3/requirements.txt b/pytorch/jobs/docker/2.0/py3/requirements.txt index 94584074..bf0e9a4c 100644 --- a/pytorch/jobs/docker/2.0/py3/requirements.txt +++ b/pytorch/jobs/docker/2.0/py3/requirements.txt @@ -1,7 +1,7 @@ amazon-braket-default-simulator==1.20.1 amazon-braket-schemas==1.19.1 amazon-braket-pennylane-plugin==1.21.0 -amazon-braket-sdk==1.58.0 +amazon-braket-sdk==1.59.1 awscli==1.29.53 botocore==1.31.53 boto3==1.28.53 diff --git a/src/braket_container.py b/src/braket_container.py index f6291d6e..2f1a2264 100644 --- a/src/braket_container.py +++ b/src/braket_container.py @@ -10,12 +10,13 @@ # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF # ANY KIND, either express or implied. See the License for the specific # language governing permissions and limitations under the License. - +import contextlib import errno import importlib import inspect import os import json +import runpy import shutil import subprocess import sys @@ -38,10 +39,10 @@ print("Boto3 Version: ", boto3.__version__) -def log_failure_and_exit(*args): +def _log_failure(*args, display=True): """ Log failures to a file so that it can be parsed by the backend service and included in - failure messages for a job. Exists with code 0. + failure messages for a job. Args: args: variable list of text to write to the file. @@ -50,7 +51,19 @@ def log_failure_and_exit(*args): with open(ERROR_LOG_FILE, 'a') as error_log: for text in args: error_log.write(text) - print(text) + if display: + print(text) + + +def log_failure_and_exit(*args): + """ + Log failures to a file so that it can be parsed by the backend service and included in + failure messages for a job. Exists with code 0. + + Args: + args: variable list of text to write to the file. + """ + _log_failure(*args) sys.exit(0) @@ -136,34 +149,6 @@ def unpack_code_and_add_to_path(local_s3_file: str, compression_type: str): sys.path.append(EXTRACTED_CUSTOMER_CODE_PATH) -def kick_off_customer_script(entry_point: str) -> multiprocessing.Process: - """ - Runs the customer script as a separate process. - - Args: - entry_point (str): the entry point to the customer code, represented as :. - - Returns: - Process: the process handle to the running process. - """ - try: - str_module, _, str_method = entry_point.partition(":") - customer_module = importlib.import_module(str_module) - customer_method = getattr(customer_module, str_method) - - process_kwargs = {"target": customer_method} - - function_args = try_bind_hyperparameters_to_customer_method(customer_method) - if function_args is not None: - process_kwargs["kwargs"] = function_args - - customer_code_process = multiprocessing.Process(**process_kwargs) - customer_code_process.start() - except Exception as e: - log_failure_and_exit(f"Unable to run job at entry point {entry_point}\nException: {e}") - return customer_code_process - - def try_bind_hyperparameters_to_customer_method(customer_method: Callable): hp_file = os.getenv("AMZN_BRAKET_HP_FILE") if hp_file is None: @@ -186,19 +171,6 @@ def try_bind_hyperparameters_to_customer_method(customer_method: Callable): return function_args -def join_customer_script(customer_code_process: multiprocessing.Process): - """ - Joins the process running the customer code. - - Args: - customer_code_process (Process): the process running the customer code. - """ - try: - customer_code_process.join() - except Exception as e: - log_failure_and_exit(f"Job did not exit gracefully.\nException: {e}") - - def get_code_setup_parameters() -> Tuple[str, str, str]: """ Returns the code setup parameters: @@ -254,41 +226,84 @@ def install_additional_requirements() -> None: log_failure_and_exit(f"Unable to install requirements.\nException: {e}") -def run_customer_code_as_process(entry_point: str) -> int: +def extract_customer_code(entry_point: str) -> Callable: + """ + Converts entry point to a runnable function. + """ + if entry_point.find(":") >= 0: + str_module, _, str_method = entry_point.partition(":") + customer_module = importlib.import_module(str_module) + customer_code = getattr(customer_module, str_method) + else: + def customer_code(): + # equivalent to `python -m entry_point` + return runpy.run_module(entry_point, run_name="__main__") + return customer_code + + +@contextlib.contextmanager +def in_extracted_code_dir(): + current_dir = os.getcwd() + try: + os.chdir(EXTRACTED_CUSTOMER_CODE_PATH) + yield + finally: + os.chdir(current_dir) + + +def wrap_customer_code(customer_method: Callable) -> Callable: + def wrapped_customer_code(**kwargs): + try: + with in_extracted_code_dir(): + return customer_method(**kwargs) + except Exception as e: + exception_type = type(e).__name__ + exception_string = ( + exception_type + if not str(e) + else f"{exception_type}: {e}" + ) + _log_failure(exception_string, display=False) + raise e + return wrapped_customer_code + + +def kick_off_customer_script(customer_code: Callable) -> multiprocessing.Process: """ - When provided the name of the package and the method to run, we run them as a process. + Runs the customer script as a separate process. Args: - entry_point (str): the code to run in the format :. + customer_code (Callable): The customer method to be run. Returns: - int: The exit code of the customer code run. + Process: the process handle to the running process. """ print("Running Code As Process") - customer_code_process = kick_off_customer_script(entry_point) - join_customer_script(customer_code_process) - print("Code Run Finished") - return customer_code_process.exitcode + wrapped_customer_code = wrap_customer_code(customer_code) + process_kwargs = {"target": wrapped_customer_code} + + function_args = try_bind_hyperparameters_to_customer_method(customer_code) + if function_args is not None: + process_kwargs["kwargs"] = function_args + customer_code_process = multiprocessing.Process(**process_kwargs) + customer_code_process.start() + return customer_code_process -def run_customer_code_as_subprocess(entry_point: str) -> int: + +def join_customer_script(customer_code_process: multiprocessing.Process): """ - When provided just the name of the module to run, we run it as a subprocess. + Joins the process running the customer code. Args: - entry_point (str): the name of the module to run. - - Returns: - int: The exit code of the customer code run. + customer_code_process (Process): the process running the customer code. """ - print("Running Code As Subprocess") try: - result = subprocess.run(["python", "-m", entry_point], cwd=EXTRACTED_CUSTOMER_CODE_PATH) + customer_code_process.join() except Exception as e: - log_failure_and_exit(f"Unable to run job at entry point {entry_point}\nException: {e}") + log_failure_and_exit(f"Job did not exit gracefully.\nException: {e}") print("Code Run Finished") - return_code = result.returncode - return return_code + return customer_code_process.exitcode def run_customer_code() -> None: @@ -301,12 +316,10 @@ def run_customer_code() -> None: local_s3_file = download_customer_code(s3_uri) unpack_code_and_add_to_path(local_s3_file, compression_type) install_additional_requirements() - if entry_point.find(":") >= 0: - exit_code = run_customer_code_as_process(entry_point) - else: - exit_code = run_customer_code_as_subprocess(entry_point) - if exit_code != 0: - log_failure_and_exit(f"Job at {entry_point} exited with exit code: {exit_code}") + customer_executable = extract_customer_code(entry_point) + customer_process = kick_off_customer_script(customer_executable) + if (exit_code := join_customer_script(customer_process)) != 0: + sys.exit(exit_code) def setup_and_run(): diff --git a/tensorflow/jobs/docker/2.13/py3/requirements.txt b/tensorflow/jobs/docker/2.13/py3/requirements.txt index 4d138a85..1bd1b7fb 100644 --- a/tensorflow/jobs/docker/2.13/py3/requirements.txt +++ b/tensorflow/jobs/docker/2.13/py3/requirements.txt @@ -1,7 +1,7 @@ amazon-braket-default-simulator==1.20.1 amazon-braket-schemas==1.19.1 amazon-braket-pennylane-plugin==1.21.0 -amazon-braket-sdk==1.58.0 +amazon-braket-sdk==1.59.1 awscli==1.29.53 botocore==1.31.53 boto3==1.28.53 diff --git a/test/unit_tests/test_braket_container.py b/test/unit_tests/test_braket_container.py index 98ba7925..ff88fea4 100644 --- a/test/unit_tests/test_braket_container.py +++ b/test/unit_tests/test_braket_container.py @@ -1,5 +1,8 @@ +import importlib import json +import os import re +import tempfile from pathlib import Path from unittest import mock from urllib.parse import urlparse @@ -16,6 +19,9 @@ setup_and_run, try_bind_hyperparameters_to_customer_method, install_additional_requirements, + run_customer_code, + wrap_customer_code, + EXTRACTED_CUSTOMER_CODE_PATH, ) @@ -174,90 +180,74 @@ def test_install_additional_requirements(mock_os, mock_subprocess, file_walk_res assert mock_subprocess.run.call_count == 1 -@pytest.mark.parametrize( - "expected_return_value", [0, 1] -) -@mock.patch('src.braket_container.log_failure_and_exit') -@mock.patch('src.braket_container.subprocess') -@mock.patch('src.braket_container.get_code_setup_parameters') -@mock.patch('src.braket_container.shutil') -@mock.patch('src.braket_container.boto3') -@mock.patch('pathlib._normal_accessor.mkdir') -@mock.patch('src.braket_container.os') -@mock.patch('src.braket_container.sys') -def test_setup_and_run_as_subprocess( - mock_sys, - mock_os, - mock_mkdir, - mock_boto, - mock_shutil, - mock_get_code_setup, - mock_subprocess, - mock_log_failure, - expected_return_value -): - # Setup - mock_os.getenv.return_value = "" - mock_get_code_setup.return_value = "s3://test_bucket/test_location", "test_entry_point", None - run_result_object = mock.MagicMock() - run_result_object.returncode = expected_return_value - mock_subprocess.run.return_value = run_result_object - - # Act - setup_and_run() - - # Assert - mock_subprocess.run.assert_called_with( - ["python", "-m", "test_entry_point"], cwd='/opt/braket/code/customer_code/extracted', - ) - if expected_return_value != 0: - mock_log_failure.assert_called() +def customer_function(): + print("Hello") + return 0 -@pytest.mark.parametrize( - "expected_return_value", [0, 1] -) -@mock.patch('src.braket_container.log_failure_and_exit') +@mock.patch('src.braket_container.wrap_customer_code') @mock.patch('src.braket_container.multiprocessing') @mock.patch('src.braket_container.importlib') @mock.patch('src.braket_container.get_code_setup_parameters') @mock.patch('src.braket_container.shutil') @mock.patch('src.braket_container.boto3') @mock.patch('pathlib._normal_accessor.mkdir') -@mock.patch('src.braket_container.os') +@mock.patch('src.braket_container.os.getenv') @mock.patch('src.braket_container.sys') -def test_setup_and_run_as_process( - mock_sys, - mock_os, - mock_mkdir, - mock_boto, - mock_shutil, - mock_get_code_setup, - mock_importlib, - mock_process, - mock_log_failure, - expected_return_value, - hyperparameters_json, +def test_run_customer_code_function( + mock_sys, + mock_getenv, + mock_mkdir, + mock_boto, + mock_shutil, + mock_get_code_setup, + mock_importlib, + mock_mp, + mock_wrap_code, + hyperparameters_json, ): - # Setup - mock_os.getenv = lambda x: ( + mock_getenv.side_effect = lambda x, y = None: ( "hyperparameters.json" if x == "AMZN_BRAKET_HP_FILE" - else "" + else y or "" ) - mock_get_code_setup.return_value = "s3://test_bucket/test_location", "test_module:test_function", None - mock_process_object = mock.MagicMock() - mock_process.Process.return_value = mock_process_object - mock_process_object.exitcode = expected_return_value - - # Act - setup_and_run() - - # Assert - mock_process_object.start.assert_called() - mock_process_object.join.assert_called() - if expected_return_value != 0: - mock_log_failure.assert_called() + mock_get_code_setup.return_value = ( + "s3://test_bucket/test_location", + "test_module:customer_function", + None, + ) + mock_process = mock.MagicMock() + mock_mp.Process.return_value = mock_process + + run_customer_code() + + mock_wrap_code.assert_called_with(mock_importlib.import_module.return_value.customer_function) + mock_mp.Process.assert_called_with(target=mock_wrap_code.return_value, kwargs={}) + mock_process.start.assert_called_with() + mock_process.join.assert_called_with() + + +def customer_function_fails(): + open("fake_file") + + +@mock.patch('src.braket_container._log_failure') +@mock.patch('os.chdir') +def test_wrapped_function_logs_failure(mock_cd, mock_log): + wrapped = wrap_customer_code(customer_function_fails) + + file_not_found = re.escape("[Errno 2] No such file or directory: 'fake_file'") + with pytest.raises(FileNotFoundError, match=file_not_found): + wrapped() + + mock_cd.called_with(EXTRACTED_CUSTOMER_CODE_PATH) + mock_cd.called_with(os.getcwd()) + mock_log.assert_called_with( + "FileNotFoundError: [Errno 2] No such file or directory: 'fake_file'", + display=False, + ) + + def customer_method_no_args():