Skip to content

Commit

Permalink
Fix typing and formatting issues
Browse files Browse the repository at this point in the history
  • Loading branch information
BSchilperoort committed Jan 11, 2024
1 parent cb95e3c commit d7f39bd
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 39 deletions.
43 changes: 22 additions & 21 deletions PyStemmusScope/bmi/docker_process.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
"""The Docker STEMMUS_SCOPE model process wrapper."""
import os
import warnings
from pathlib import Path
from time import sleep
from typing import Any
import warnings
from PyStemmusScope.config_io import read_config
from pathlib import Path
import os


try:
import docker
Expand All @@ -30,7 +31,9 @@ def make_docker_vols_binds(cfg_file: str) -> tuple[list[str], list[str]]:
if not cfg_dir.is_relative_to(cfg["InputPath"]):
volumes.append(str(cfg_dir))
binds.append(f"{str(cfg_dir)}:{str(cfg_dir)}")
if (not Path(cfg["InputPath"]).is_relative_to(cfg_dir)) or (Path(cfg["InputPath"]) == cfg_dir):
if (not Path(cfg["InputPath"]).is_relative_to(cfg_dir)) or (
Path(cfg["InputPath"]) == cfg_dir
):
volumes.append(cfg["InputPath"])
binds.append(f"{cfg['InputPath']}:{cfg['InputPath']}")
if not Path(cfg["OutputPath"]).is_relative_to(cfg_dir):
Expand Down Expand Up @@ -80,8 +83,9 @@ def wait_for_model(phrase: bytes, socket: Any) -> None:

class StemmusScopeDocker:
"""Communicate with a STEMMUS_SCOPE Docker container."""

# Default image, can be overridden with config:
compatible_tags = ("1.5.0", )
compatible_tags = ("1.5.0",)

_process_ready_phrase = b"Select BMI mode:"
_process_finalized_phrase = b"Finished clean up."
Expand All @@ -104,53 +108,50 @@ def __init__(self, cfg_file: str):
detach=True,
user=os.getuid(), # ensure correct user for writing files.
volumes=vols,
host_config=self.client.create_host_config(binds=binds)
host_config=self.client.create_host_config(binds=binds),
)

self.running = False
def wait_for_model(self):

def wait_for_model(self) -> None:
"""Wait for the model to be ready to receive (more) commands."""
wait_for_model(self._process_ready_phrase, self.socket)
def is_alive(self):

def is_alive(self) -> bool:
"""Return if the process is alive."""
return self.running
def initialize(self):

def initialize(self) -> None:
"""Initialize the model and wait for it to be ready."""
if self.is_alive():
self.client.stop(self.container_id)

self.client.start(self.container_id)
self.socket = self.client.attach_socket(
self.container_id, {'stdin': 1, 'stdout': 1, 'stream':1}
self.container_id, {"stdin": 1, "stdout": 1, "stream": 1}
)
self.wait_for_model()
os.write(
self.socket.fileno(),
bytes(f'initialize "{self.cfg_file}"\n', encoding="utf-8")
bytes(f'initialize "{self.cfg_file}"\n', encoding="utf-8"),
)
self.wait_for_model()

self.running = True

def update(self):
def update(self) -> None:
"""Update the model and wait for it to be ready."""
if self.is_alive():
os.write(
self.socket.fileno(),
b'update\n'
)
os.write(self.socket.fileno(), b"update\n")
self.wait_for_model()
else:
msg = "Docker container is not alive. Please restart the model."
raise ConnectionError(msg)

def finalize(self):
def finalize(self) -> None:
"""Finalize the model."""
if self.is_alive():
os.write(self.socket.fileno(),b'finalize\n')
os.write(self.socket.fileno(), b"finalize\n")
wait_for_model(self._process_finalized_phrase, self.socket)
sleep(0.5) # Ensure the container can stop cleanly.
self.client.stop(self.container_id)
Expand Down
36 changes: 25 additions & 11 deletions PyStemmusScope/bmi/implementation.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
import os
import sys
from pathlib import Path
from typing import Literal, Protocol
from typing import Literal
from typing import Protocol
from typing import Union
import h5py
import numpy as np
Expand Down Expand Up @@ -124,7 +125,7 @@ def set_variable(
return state


def get_run_mode(config: dict) -> str:
def get_run_mode(config: dict) -> Literal["exe", "docker"]:
"""Get the run mode (docker or EXE) from the config file.
Args:
Expand All @@ -150,6 +151,7 @@ def get_run_mode(config: dict) -> str:

class StemmusScopeProcess(Protocol):
"""Protocol for communicating with the model process."""

def __init__(self, cfg_file: str) -> None:
"""Initialize the process class (e.g. create the container)."""
...
Expand All @@ -171,23 +173,26 @@ def finalize(self) -> None:
...


def load_process(mode: Literal["exe", "docker"]) -> type[StemmusScopeProcess]:
"""Load the right STEMMUS_SCOPE process."""
def start_process(mode: Literal["exe", "docker"], cfg_file: str) -> StemmusScopeProcess:
"""Start the right STEMMUS_SCOPE process."""
if mode == "docker":
try:
from PyStemmusScope.bmi.docker_process import StemmusScopeDocker as Process
from PyStemmusScope.bmi.docker_process import StemmusScopeDocker

return StemmusScopeDocker(cfg_file=cfg_file)
except ImportError as err:
msg = (
"The docker python package is not available."
" Please install before continuing."
)
raise ImportError(msg) from err
elif mode == "exe":
from PyStemmusScope.bmi.local_process import LocalStemmusScope as Process
from PyStemmusScope.bmi.local_process import LocalStemmusScope

return LocalStemmusScope(cfg_file=cfg_file)
else:
msg = "Unknown mode."
raise ValueError(msg)
return Process


class StemmusScopeBmi(InapplicableBmiMethods, Bmi):
Expand All @@ -199,7 +204,7 @@ class StemmusScopeBmi(InapplicableBmiMethods, Bmi):
state_file: Union[Path, None] = None

_run_mode: Union[str, None] = None
_process: Union[type[StemmusScopeProcess], None] = None
_process: Union[StemmusScopeProcess, None] = None

def initialize(self, config_file: str) -> None:
"""Perform startup tasks for the model.
Expand All @@ -213,15 +218,20 @@ def initialize(self, config_file: str) -> None:
self._run_mode = get_run_mode(self.config)
self.state_file = Path(self.config["OutputPath"]) / "STEMMUS_SCOPE_state.mat"

self._process = load_process(self._run_mode)(cfg_file=config_file)
self._process = start_process(self._run_mode, config_file)
self._process.initialize()

def update(self) -> None:
"""Advance the model state by one time step."""
if self.state is not None:
self.state = self.state.close() # Close file to allow matlab to write

self._process.update()
if self._process is not None:
self._process.update()
else:
msg = "The STEMMUS_SCOPE process is not running/connected. Can't update!"
raise ValueError(msg)

self.state = load_state(self.config)

def update_until(self, time: float) -> None:
Expand All @@ -235,7 +245,11 @@ def update_until(self, time: float) -> None:

def finalize(self) -> None:
"""Finalize the STEMMUS_SCOPE model."""
self._process.finalize()
if self._process is not None:
self._process.finalize()
else:
msg = "The STEMMUS_SCOPE process is not running/connected. Can't finalize!"
raise ValueError(msg)

def get_component_name(self) -> str:
"""Name of the component.
Expand Down
11 changes: 5 additions & 6 deletions PyStemmusScope/bmi/local_process.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
"""The local STEMMUS_SCOPE model process wrapper."""
from pathlib import Path
import os
import subprocess
from pathlib import Path
from typing import Union
from PyStemmusScope.config_io import read_config
import os


def is_alive(process: Union[subprocess.Popen, None]) -> subprocess.Popen:
Expand Down Expand Up @@ -42,6 +42,7 @@ def find_exe(config: dict) -> str:

class LocalStemmusScope:
"""Communicate with the local STEMMUS_SCOPE executable file."""

def __init__(self, cfg_file: str) -> None:
"""Initialize the process."""
self.cfg_file = cfg_file
Expand All @@ -60,7 +61,7 @@ def __init__(self, cfg_file: str) -> None:
)

wait_for_model(self.matlab_process)

def is_alive(self) -> bool:
"""Return if the process is alive."""
try:
Expand All @@ -74,11 +75,10 @@ def initialize(self) -> None:
self.matlab_process = is_alive(self.matlab_process)

self.matlab_process.stdin.write( # type: ignore
bytes(f'initialize "{self.cfg_file}"\n', encoding="utf-8")
bytes(f'initialize "{self.cfg_file}"\n', encoding="utf-8")
)
wait_for_model(self.matlab_process)


def update(self) -> None:
"""Update the model and wait for it to be ready."""
if self.matlab_process is None:
Expand All @@ -89,7 +89,6 @@ def update(self) -> None:
self.matlab_process.stdin.write(b"update\n") # type: ignore
wait_for_model(self.matlab_process)


def finalize(self) -> None:
"""Finalize the model."""
self.matlab_process = is_alive(self.matlab_process)
Expand Down
2 changes: 1 addition & 1 deletion PyStemmusScope/soil_io.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Module for the soil data IO of PyStemmusScope."""
from pathlib import Path
from collections.abc import Iterable
from pathlib import Path
import hdf5storage
import numpy as np
import xarray as xr
Expand Down

0 comments on commit d7f39bd

Please sign in to comment.