diff --git a/config.yml b/config.yml
index f06ab9f..933764e 100644
--- a/config.yml
+++ b/config.yml
@@ -95,6 +95,14 @@ globus:
   client_id: ${GLOBUS_CLIENT_ID}
   client_secret: ${GLOBUS_CLIENT_SECRET}
 
+harbor_images832:
+  recon_image: tomorecon_nersc_mpi_hdf5@sha256:cc098a2cfb6b1632ea872a202c66cb7566908da066fd8f8c123b92fa95c2a43c
+  multires_image: tomorecon_nersc_mpi_hdf5@sha256:cc098a2cfb6b1632ea872a202c66cb7566908da066fd8f8c123b92fa95c2a43c
+
+ghcr_images832:
+  recon_image: ghcr.io/als-computing/microct:master
+  multires_image: ghcr.io/als-computing/microct:master
+
 prefect:
   deployments:
     - type_spec: new_file_832
diff --git a/create_deployments_832_nersc.sh b/create_deployments_832_nersc.sh
new file mode 100755
index 0000000..4fd437f
--- /dev/null
+++ b/create_deployments_832_nersc.sh
@@ -0,0 +1,10 @@
+export $(grep -v '^#' .env | xargs)
+
+# create 'nersc_flow_pool'
+prefect work-pool create 'nersc_flow_pool'
+
+# nersc_flow_pool
+    # in docker-compose.yaml:
+    # command: prefect agent start --pool "nersc_flow_pool"
+prefect deployment build ./orchestration/flows/bl832/nersc.py:nersc_recon_flow -n nersc_recon_flow -p nersc_flow_pool -q nersc_recon_flow_queue
+prefect deployment apply nersc_recon_flow-deployment.yaml
diff --git a/orchestration/_tests/test_sfapi_flow.py b/orchestration/_tests/test_sfapi_flow.py
new file mode 100644
index 0000000..eedf659
--- /dev/null
+++ b/orchestration/_tests/test_sfapi_flow.py
@@ -0,0 +1,285 @@
+# orchestration/_tests/test_sfapi_flow.py
+
+import pytest
+from unittest.mock import MagicMock, patch, mock_open
+from pathlib import Path
+from uuid import uuid4
+from prefect.blocks.system import Secret
+from prefect.testing.utilities import prefect_test_harness
+
+
+@pytest.fixture(autouse=True, scope="session")
+def prefect_test_fixture():
+    """
+    A pytest fixture that automatically sets up and tears down the Prefect test harness
+    for the entire test session.
+    It creates and saves test secrets and configurations
+    required for Globus integration.
+
+    Yields:
+        None
+    """
+    with prefect_test_harness():
+        globus_client_id = Secret(value=str(uuid4()))
+        globus_client_id.save(name="globus-client-id")
+        globus_client_secret = Secret(value=str(uuid4()))
+        globus_client_secret.save(name="globus-client-secret")
+
+        yield
+
+
+# ----------------------------
+# Tests for create_sfapi_client
+# ----------------------------
+
+
+def test_create_sfapi_client_success():
+    """
+    Test successful creation of the SFAPI client.
+    """
+    from orchestration.flows.bl832.nersc import NERSCTomographyHPCController
+
+    # Mock data for client_id and client_secret files
+    mock_client_id = 'value'
+    mock_client_secret = '{"key": "value"}'
+
+    # Create separate mock_open instances for each file
+    mock_open_client_id = mock_open(read_data=mock_client_id)
+    mock_open_client_secret = mock_open(read_data=mock_client_secret)
+
+    with patch("orchestration.flows.bl832.nersc.os.getenv") as mock_getenv, \
+         patch("orchestration.flows.bl832.nersc.os.path.isfile") as mock_isfile, \
+         patch("builtins.open", side_effect=[
+             mock_open_client_id.return_value,
+             mock_open_client_secret.return_value
+         ]), \
+         patch("orchestration.flows.bl832.nersc.JsonWebKey.import_key") as mock_import_key, \
+         patch("orchestration.flows.bl832.nersc.Client") as MockClient:
+
+        # Mock environment variables
+        mock_getenv.side_effect = lambda x: {
+            "PATH_NERSC_CLIENT_ID": "/path/to/client_id",
+            "PATH_NERSC_PRI_KEY": "/path/to/client_secret"
+        }.get(x, None)
+
+        # Mock file existence
+        mock_isfile.return_value = True
+
+        # Mock JsonWebKey.import_key to return a mock secret
+        mock_import_key.return_value = "mock_secret"
+
+        # Create the client
+        client = NERSCTomographyHPCController.create_sfapi_client()
+
+        # Assert that Client was instantiated with 'value' and 'mock_secret'
+        MockClient.assert_called_once_with("value", "mock_secret")
+
+        # Assert that the returned client is the mocked client
+        assert client == MockClient.return_value, "Client should be the mocked sfapi_client.Client instance"
+
+
+def test_create_sfapi_client_missing_paths():
+    """
+    Test creation of the SFAPI client with missing credential paths.
+    """
+    from orchestration.flows.bl832.nersc import NERSCTomographyHPCController
+
+    with patch("orchestration.flows.bl832.nersc.os.getenv", return_value=None):
+        with pytest.raises(ValueError, match="Missing NERSC credentials paths."):
+            NERSCTomographyHPCController.create_sfapi_client()
+
+
+def test_create_sfapi_client_missing_files():
+    """
+    Test creation of the SFAPI client with missing credential files.
+    """
+    with (
+        # Mock environment variables
+        patch(
+            "orchestration.flows.bl832.nersc.os.getenv",
+            side_effect=lambda x: {
+                "PATH_NERSC_CLIENT_ID": "/path/to/client_id",
+                "PATH_NERSC_PRI_KEY": "/path/to/client_secret"
+            }.get(x, None)
+        ),
+
+        # Mock file existence to simulate missing files
+        patch("orchestration.flows.bl832.nersc.os.path.isfile", return_value=False)
+    ):
+        # Import the module after applying patches to ensure mocks are in place
+        from orchestration.flows.bl832.nersc import NERSCTomographyHPCController
+
+        # Expect a FileNotFoundError due to missing credential files
+        with pytest.raises(FileNotFoundError, match="NERSC credential files are missing."):
+            NERSCTomographyHPCController.create_sfapi_client()
+
+# ----------------------------
+# Fixture for Mocking SFAPI Client
+# ----------------------------
+
+
+@pytest.fixture
+def mock_sfapi_client():
+    """
+    Mock the sfapi_client.Client class with necessary methods.
+ """ + with patch("orchestration.flows.bl832.nersc.Client") as MockClient: + mock_client_instance = MockClient.return_value + + # Mock the user method + mock_user = MagicMock() + mock_user.name = "testuser" + mock_client_instance.user.return_value = mock_user + + # Mock the compute method to return a mocked compute object + mock_compute = MagicMock() + mock_job = MagicMock() + mock_job.jobid = "12345" + mock_job.state = "COMPLETED" + mock_compute.submit_job.return_value = mock_job + mock_client_instance.compute.return_value = mock_compute + + yield mock_client_instance + + +# ---------------------------- +# Fixture for Mocking Config832 +# ---------------------------- + +@pytest.fixture +def mock_config832(): + """ + Mock the Config832 class to provide necessary configurations. + """ + with patch("orchestration.flows.bl832.nersc.Config832") as MockConfig: + mock_config = MockConfig.return_value + mock_config.harbor_images832 = { + "recon_image": "mock_recon_image", + "multires_image": "mock_multires_image", + } + mock_config.apps = {"als_transfer": "some_config"} + yield mock_config + + +# ---------------------------- +# Tests for NERSCTomographyHPCController +# ---------------------------- + +def test_reconstruct_success(mock_sfapi_client, mock_config832): + """ + Test successful reconstruction job submission. 
+ """ + from orchestration.flows.bl832.nersc import NERSCTomographyHPCController + from sfapi_client.compute import Machine + + controller = NERSCTomographyHPCController(client=mock_sfapi_client, config=mock_config832) + file_path = "path/to/file.h5" + + with patch("orchestration.flows.bl832.nersc.time.sleep", return_value=None): + result = controller.reconstruct(file_path=file_path) + + # Verify that compute was called with Machine.perlmutter + mock_sfapi_client.compute.assert_called_once_with(Machine.perlmutter) + + # Verify that submit_job was called once + mock_sfapi_client.compute.return_value.submit_job.assert_called_once() + + # Verify that complete was called on the job + mock_sfapi_client.compute.return_value.submit_job.return_value.complete.assert_called_once() + + # Assert that the method returns True + assert result is True, "reconstruct should return True on successful job completion." + + +def test_reconstruct_submission_failure(mock_sfapi_client, mock_config832): + """ + Test reconstruction job submission failure. + """ + from orchestration.flows.bl832.nersc import NERSCTomographyHPCController + + controller = NERSCTomographyHPCController(client=mock_sfapi_client, config=mock_config832) + file_path = "path/to/file.h5" + + # Simulate submission failure + mock_sfapi_client.compute.return_value.submit_job.side_effect = Exception("Submission failed") + + with patch("orchestration.flows.bl832.nersc.time.sleep", return_value=None): + result = controller.reconstruct(file_path=file_path) + + # Assert that the method returns False + assert result is False, "reconstruct should return False on submission failure." + + +def test_build_multi_resolution_success(mock_sfapi_client, mock_config832): + """ + Test successful multi-resolution job submission. 
+ """ + from orchestration.flows.bl832.nersc import NERSCTomographyHPCController + from sfapi_client.compute import Machine + + controller = NERSCTomographyHPCController(client=mock_sfapi_client, config=mock_config832) + file_path = "path/to/file.h5" + + with patch("orchestration.flows.bl832.nersc.time.sleep", return_value=None): + result = controller.build_multi_resolution(file_path=file_path) + + # Verify that compute was called with Machine.perlmutter + mock_sfapi_client.compute.assert_called_once_with(Machine.perlmutter) + + # Verify that submit_job was called once + mock_sfapi_client.compute.return_value.submit_job.assert_called_once() + + # Verify that complete was called on the job + mock_sfapi_client.compute.return_value.submit_job.return_value.complete.assert_called_once() + + # Assert that the method returns True + assert result is True, "build_multi_resolution should return True on successful job completion." + + +def test_build_multi_resolution_submission_failure(mock_sfapi_client, mock_config832): + """ + Test multi-resolution job submission failure. + """ + from orchestration.flows.bl832.nersc import NERSCTomographyHPCController + + controller = NERSCTomographyHPCController(client=mock_sfapi_client, config=mock_config832) + file_path = "path/to/file.h5" + + # Simulate submission failure + mock_sfapi_client.compute.return_value.submit_job.side_effect = Exception("Submission failed") + + with patch("orchestration.flows.bl832.nersc.time.sleep", return_value=None): + result = controller.build_multi_resolution(file_path=file_path) + + # Assert that the method returns False + assert result is False, "build_multi_resolution should return False on submission failure." + + +def test_job_submission(mock_sfapi_client): + """ + Test job submission and status updates. 
+ """ + from orchestration.flows.bl832.nersc import NERSCTomographyHPCController + from sfapi_client.compute import Machine + + controller = NERSCTomographyHPCController(client=mock_sfapi_client, config=MagicMock()) + file_path = "path/to/file.h5" + + # Mock Path to extract file and folder names + with patch.object(Path, 'parent', new_callable=MagicMock) as mock_parent, \ + patch.object(Path, 'stem', new_callable=MagicMock) as mock_stem: + mock_parent.name = "to" + mock_stem.return_value = "file" + + with patch("orchestration.flows.bl832.nersc.time.sleep", return_value=None): + controller.reconstruct(file_path=file_path) + + # Verify that compute was called with Machine.perlmutter + mock_sfapi_client.compute.assert_called_once_with(Machine.perlmutter) + + # Verify that submit_job was called once + mock_sfapi_client.compute.return_value.submit_job.assert_called_once() + + # Verify the returned job has the expected attributes + submitted_job = mock_sfapi_client.compute.return_value.submit_job.return_value + assert submitted_job.jobid == "12345", "Job ID should match the mock job ID." + assert submitted_job.state == "COMPLETED", "Job state should be COMPLETED." 
diff --git a/orchestration/flows/bl832/config.py b/orchestration/flows/bl832/config.py
index 57de5f4..586b1af 100644
--- a/orchestration/flows/bl832/config.py
+++ b/orchestration/flows/bl832/config.py
@@ -21,3 +21,4 @@ def __init__(self) -> None:
         self.alcf832_raw = self.endpoints["alcf832_raw"]
         self.alcf832_scratch = self.endpoints["alcf832_scratch"]
         self.scicat = config["scicat"]
+        self.ghcr_images832 = config["ghcr_images832"]
diff --git a/orchestration/flows/bl832/job_controller.py b/orchestration/flows/bl832/job_controller.py
new file mode 100644
index 0000000..1adf1bd
--- /dev/null
+++ b/orchestration/flows/bl832/job_controller.py
@@ -0,0 +1,112 @@
+from abc import ABC, abstractmethod
+from dotenv import load_dotenv
+from enum import Enum
+import logging
+
+from orchestration.flows.bl832.config import Config832
+
+logger = logging.getLogger(__name__)
+logger.setLevel(logging.INFO)
+load_dotenv()
+
+
+class TomographyHPCController(ABC):
+    """
+    Abstract class for tomography HPC controllers.
+    Provides interface methods for reconstruction and building multi-resolution datasets.
+
+    Args:
+        ABC: Abstract Base Class
+    """
+    def __init__(
+        self,
+        config: Config832
+    ) -> None:
+        pass
+
+    @abstractmethod
+    def reconstruct(
+        self,
+        file_path: str = "",
+    ) -> bool:
+        """Perform tomography reconstruction
+
+        :param file_path: Path to the file to reconstruct.
+        :return: True if successful, False otherwise.
+        """
+        pass
+
+    @abstractmethod
+    def build_multi_resolution(
+        self,
+        file_path: str = "",
+    ) -> bool:
+        """Generate multi-resolution version of reconstructed tomography
+
+        :param file_path: Path to the file for which to build multi-resolution data.
+        :return: True if successful, False otherwise.
+        """
+        pass
+
+
+class HPC(Enum):
+    """
+    Enum representing different HPC environments.
+    Use enum names as strings to identify HPC sites, ensuring a standard set of values.
+
+    Members:
+        ALCF: Argonne Leadership Computing Facility
+        NERSC: National Energy Research Scientific Computing Center
+    """
+    ALCF = "ALCF"
+    NERSC = "NERSC"
+
+
+def get_controller(
+    hpc_type: HPC,
+    config: Config832
+) -> TomographyHPCController:
+    """
+    Factory function that returns an HPC controller instance for the given HPC environment.
+
+    :param hpc_type: An HPC enum member identifying the HPC environment (e.g., HPC.ALCF, HPC.NERSC).
+    :return: An instance of a TomographyHPCController subclass corresponding to the given HPC environment.
+    :raises ValueError: If an invalid or unsupported HPC type is specified.
+    """
+    if not isinstance(hpc_type, HPC):
+        raise ValueError(f"Invalid HPC type provided: {hpc_type}")
+
+    if not config:
+        raise ValueError("Config object is required.")
+
+    if hpc_type == HPC.ALCF:
+        from orchestration.flows.bl832.alcf import ALCFTomographyHPCController
+        return ALCFTomographyHPCController()
+    elif hpc_type == HPC.NERSC:
+        from orchestration.flows.bl832.nersc import NERSCTomographyHPCController
+        return NERSCTomographyHPCController(
+            client=NERSCTomographyHPCController.create_sfapi_client(),
+            config=config
+        )
+    else:
+        raise ValueError(f"Unsupported HPC type: {hpc_type}")
+
+
+def do_it_all() -> None:
+    # Fix: get_controller requires an HPC enum member and a Config832 instance;
+    controller = get_controller(HPC.ALCF, Config832())
+    controller.reconstruct()
+    controller.build_multi_resolution()
+
+    file_path = ""
+    controller = get_controller(HPC.NERSC, Config832())
+    controller.reconstruct(
+        file_path=file_path,
+    )
+    controller.build_multi_resolution(
+        file_path=file_path,
+    )
+
+
+if __name__ == "__main__":
+    do_it_all()
+    logger.info("Done.")
diff --git a/orchestration/flows/bl832/nersc.py b/orchestration/flows/bl832/nersc.py
new file mode 100644
index 0000000..3a9d559
--- /dev/null
+++ b/orchestration/flows/bl832/nersc.py
@@ -0,0 +1,280 @@
+from dotenv import load_dotenv
+import json
+import logging
+import os
+from pathlib import Path
+from prefect import flow
+import re
+import time
+
+from authlib.jose import JsonWebKey
+from sfapi_client import Client
+from sfapi_client.compute import Machine
+
+from orchestration.flows.bl832.config import Config832
+from orchestration.flows.bl832.job_controller import get_controller, HPC, TomographyHPCController
+
+
+logger = logging.getLogger(__name__)
+logger.setLevel(logging.INFO)
+load_dotenv()
+
+
+class NERSCTomographyHPCController(TomographyHPCController):
+    """
+    Implementation for a NERSC-based tomography HPC controller.
+
+    Submits reconstruction and multi-resolution jobs to NERSC via SFAPI.
+    """
+
+    def __init__(
+        self,
+        client: Client,
+        config: Config832
+    ) -> None:
+        self.client = client
+        self.config = config
+
+    @staticmethod
+    def create_sfapi_client() -> Client:
+        """Create and return an NERSC client instance"""
+
+        client_id_path = os.getenv("PATH_NERSC_CLIENT_ID")
+        client_secret_path = os.getenv("PATH_NERSC_PRI_KEY")
+
+        if not client_id_path or not client_secret_path:
+            logger.error("NERSC credentials paths are missing.")
+            raise ValueError("Missing NERSC credentials paths.")
+        if not os.path.isfile(client_id_path) or not os.path.isfile(client_secret_path):
+            logger.error("NERSC credential files are missing.")
+            raise FileNotFoundError("NERSC credential files are missing.")
+
+        client_id = None
+        client_secret = None
+        with open(client_id_path, "r") as f:
+            client_id = f.read()
+
+        with open(client_secret_path, "r") as f:
+            client_secret = JsonWebKey.import_key(json.loads(f.read()))
+
+        try:
+            client = Client(client_id, client_secret)
+            logger.info("NERSC client created successfully.")
+            return client
+        except Exception as e:
+            logger.error(f"Failed to create NERSC client: {e}")
+            raise e
+
+    def reconstruct(
+        self,
+        file_path: str = "",
+    ) -> bool:
+        """
+        Use NERSC for tomography reconstruction
+        """
+        logger.info("Starting NERSC reconstruction process.")
+
+        user = self.client.user()
+
+        home_path = f"/global/homes/{user.name[0]}/{user.name}"
+        scratch_path = f"/pscratch/sd/{user.name[0]}/{user.name}"
+        logger.info(home_path)
+        logger.info(scratch_path)
+
+        image_name = self.config.ghcr_images832["recon_image"]
+
+        logger.info(image_name)
+        path = Path(file_path)
+        folder_name = path.parent.name
+        if not folder_name:
+            folder_name = ""
+
+        file_name = f"{path.stem}.h5"
+
+        logger.info(f"File name: {file_name}")
+        logger.info(f"Folder name: {folder_name}")
+
+        # IMPORTANT: job script must be deindented to the leftmost column or it will fail immediately
+        # Note: If q=debug, there is no minimum time limit
+        # However, if q=preempt, there is a minimum time limit of 2 hours. Otherwise the job won't run.
+
+        job_script = f"""#!/bin/bash
+#SBATCH -q debug
+#SBATCH -A als
+#SBATCH -C cpu
+#SBATCH --job-name=tomo_recon_test-0
+#SBATCH --output={scratch_path}/nerscClient-test/%x_%j.out
+#SBATCH --error={scratch_path}/nerscClient-test/%x_%j.err
+#SBATCH -N 1
+#SBATCH --ntasks-per-node 1
+#SBATCH --cpus-per-task 64
+#SBATCH --time=0:15:00
+#SBATCH --exclusive
+
+date
+srun podman-hpc run \
+--volume {home_path}/tomo_recon_repo/microct/legacy/sfapi_reconstruction.py:/alsuser/sfapi_reconstruction.py \
+--volume {scratch_path}/microctdata:/alsdata \
+--volume {scratch_path}/microctdata:/alsuser/ \
+{image_name} \
+bash -c "python sfapi_reconstruction.py {file_name} {folder_name}"
+date
+"""
+
+        try:
+            logger.info("Submitting reconstruction job script to Perlmutter.")
+            perlmutter = self.client.compute(Machine.perlmutter)
+            job = perlmutter.submit_job(job_script)
+            logger.info(f"Submitted job ID: {job.jobid}")
+
+            try:
+                job.update()
+            except Exception as update_err:
+                logger.warning(f"Initial job update failed, continuing: {update_err}")
+
+            time.sleep(60)
+            logger.info(f"Job {job.jobid} current state: {job.state}")
+
+            job.complete()  # Wait until the job completes
+            logger.info("Reconstruction job completed successfully.")
+            return True
+
+        except Exception as e:
+            # Fix: report the failure at error level (was logger.info)
+            logger.error(f"Error during job submission or completion: {e}")
+            match = re.search(r"Job not found:\s*(\d+)", str(e))
+
+            if match:
+                jobid = match.group(1)
+                logger.info(f"Attempting to recover job {jobid}.")
+                try:
+                    # Fix: plain sfapi_client.Client has no .perlmutter attribute;
+                    # look the job up through the compute site instead.
+                    job = self.client.compute(Machine.perlmutter).job(jobid=jobid)
+                    time.sleep(30)
+                    job.complete()
+                    logger.info("Reconstruction job completed successfully after recovery.")
+                    return True
+                except Exception as recovery_err:
+                    logger.error(f"Failed to recover job {jobid}: {recovery_err}")
+                    return False
+            else:
+                # Unknown error: cannot recover
+                return False
+
+    def build_multi_resolution(
+        self,
+        file_path: str = "",
+    ) -> bool:
+        """Use NERSC to make multiresolution version of tomography results."""
+
+        user = self.client.user()
+
+        home_path = f"/global/homes/{user.name[0]}/{user.name}"
+        scratch_path = f"/pscratch/sd/{user.name[0]}/{user.name}"
+        logger.info(home_path)
+        logger.info(scratch_path)
+
+        image_name = self.config.ghcr_images832["multires_image"]
+
+        path = Path(file_path)
+        folder_name = path.parent.name
+        file_name = path.stem
+
+        recon_path = f"scratch/{folder_name}/rec{file_name}/"
+        raw_path = f"{folder_name}/{file_name}.h5"
+
+        # IMPORTANT: job script must be deindented to the leftmost column or it will fail immediately
+        job_script = f"""#!/bin/bash
+#SBATCH -q debug
+#SBATCH -A als
+#SBATCH -C cpu
+#SBATCH --job-name=tomo_multires_test-0
+#SBATCH --output={scratch_path}/nerscClient-test/%x_%j.out
+#SBATCH --error={scratch_path}/nerscClient-test/%x_%j.err
+#SBATCH -N 1
+#SBATCH --ntasks-per-node 1
+#SBATCH --cpus-per-task 64
+#SBATCH --time=0:15:00
+#SBATCH --exclusive
+
+date
+srun podman-hpc run --volume {home_path}/tomo_recon_repo/microct/legacy/tiff_to_zarr.py:/alsuser/tiff_to_zarr.py \
+--volume {home_path}/tomo_recon_repo/microct/legacy/input.txt:/alsuser/input.txt \
+--volume {scratch_path}/microctdata:/alsdata \
+--volume {scratch_path}/microctdata:/alsuser/ \
+{image_name} \
+bash -c "python tiff_to_zarr.py {recon_path} --raw_file {raw_path}"
+
+date
+"""
+        try:
+            logger.info("Submitting Tiff to Zarr job script to Perlmutter.")
+            perlmutter = self.client.compute(Machine.perlmutter)
+            job = perlmutter.submit_job(job_script)
+            logger.info(f"Submitted job ID: {job.jobid}")
+
+            try:
+                job.update()
+            except Exception as update_err:
+                logger.warning(f"Initial job update failed, continuing: {update_err}")
+
+            time.sleep(60)
+            logger.info(f"Job {job.jobid} current state: {job.state}")
+
+            job.complete()  # Wait until the job completes
+            logger.info("Tiff to Zarr job completed successfully.")
+            return True
+
+        except Exception as e:
+            logger.warning(f"Error during job submission or completion: {e}")
+            match = re.search(r"Job not found:\s*(\d+)", str(e))
+
+            if match:
+                jobid = match.group(1)
+                logger.info(f"Attempting to recover job {jobid}.")
+                try:
+                    # Fix: plain sfapi_client.Client has no .perlmutter attribute;
+                    # look the job up through the compute site instead.
+                    job = self.client.compute(Machine.perlmutter).job(jobid=jobid)
+                    time.sleep(30)
+                    job.complete()
+                    logger.info("Tiff to Zarr job completed successfully after recovery.")
+                    return True
+                except Exception as recovery_err:
+                    logger.error(f"Failed to recover job {jobid}: {recovery_err}")
+                    return False
+            else:
+                return False
+
+
+@flow(name="nersc_recon_flow")
+def nersc_recon_flow(
+    file_path: str,
+    config: Config832,
+) -> bool:
+    """
+    Perform tomography reconstruction on NERSC.
+
+    :param file_path: Path to the file to reconstruct.
+ """ + + # To do: Implement file transfers, pruning, and other necessary steps + + controller = get_controller( + hpc_type=HPC.NERSC, + config=config + ) + nersc_reconstruction_success = controller.reconstruct( + file_path=file_path, + ) + nersc_multi_res_success = controller.build_multi_resolution( + file_path=file_path, + ) + + if nersc_reconstruction_success and nersc_multi_res_success: + return True + else: + return False + + +if __name__ == "__main__": + nersc_recon_flow( + file_path="dabramov/20230606_151124_jong-seto_fungal-mycelia_roll-AQ_fungi1_fast.h5", + config=Config832() + ) diff --git a/orchestration/nersc.py b/orchestration/nersc.py index 777b752..e751281 100644 --- a/orchestration/nersc.py +++ b/orchestration/nersc.py @@ -1,23 +1,30 @@ +''' +DEPRECATION WARNING: NerscClient is deprecated and will be removed when we refactor the ptychography code +''' + import json import logging -from pathlib import Path -import time +# from pathlib import Path +# import time -from authlib.integrations.requests_client import OAuth2Session -from authlib.oauth2.rfc7523 import PrivateKeyJWT +# from authlib.integrations.requests_client import OAuth2Session +# from authlib.oauth2.rfc7523 import PrivateKeyJWT from authlib.jose import JsonWebKey from sfapi_client import Client -from sfapi_client._sync.client import SFAPI_BASE_URL, SFAPI_TOKEN_URL +# from sfapi_client._sync.client import SFAPI_BASE_URL, SFAPI_TOKEN_URL from sfapi_client.compute import Machine # Temporary patch till the sfapi_client is updated from sfapi_client.jobs import JobSacct -from sfapi_client.compute import Compute +# from sfapi_client.compute import Compute JobSacct.model_rebuild() class NerscClient(Client): + ''' + DEPRECATION WARNING: NerscClient is deprecated and will be removed when we refactor the ptychography code + ''' def __init__( self, path_client_id, @@ -34,9 +41,8 @@ def __init__( # Reading the client_id and private key from the files self.client_id = None self.pri_key = None - 
#self.session = None + # self.session = None self.init_client_info() - super().__init__(self.client_id, self.pri_key) @@ -70,7 +76,7 @@ def init_client_info( ): self.get_client_id() self.get_private_key() - + def init_directory_paths(self): self.home_path = f"/global/homes/{self.user().name[0]}/{self.user().name}" self.scratch_path = f"/pscratch/sd/{self.user().name[0]}/{self.user().name}" @@ -80,16 +86,16 @@ def request_job_status(self): def update_job_id(self): if self.job is None: - self.logger.info(f"No job found") + self.logger.info("No job found") else: self.jobid = self.job.jobid def update_job_state(self): self.request_job_status() self.job_state = self.job.state - + if self.job_state == "RUNNING": - self.has_ran = True + self.has_ran = True elif self.job_state == "COMPLETE": self.logger.info(f"Job {self.jobid} with COMPLETE status") @@ -104,6 +110,5 @@ def submit_job(self, job_script): self.logger.info(f"Submitting job with script: {job_script}") self.job = self.perlmutter.submit_job(job_script) self.update_job_id() - #self.update_job_state() + # self.update_job_state() self.logger.info(f"Submitted job id: {self.jobid}") - diff --git a/requirements.txt b/requirements.txt index 88c2dd6..7252202 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,7 @@ globus-sdk>=3.0 h5py httpx>=0.22.0 -numpy +numpy==1.23.2 pillow python-dotenv prefect==2.19.5