diff --git a/pydra/engine/environments.py b/pydra/engine/environments.py index 8a45a9828a..52ca5338fd 100644 --- a/pydra/engine/environments.py +++ b/pydra/engine/environments.py @@ -4,10 +4,30 @@ class Environment: + """ + Base class for environments that are used to execute tasks. + Right now it is asssumed that the environment, including container images, + are available and are not removed at the end + TODO: add setup and teardown methods + """ + def setup(self): pass def execute(self, task): + """ + Execute the task in the environment. + + Parameters + ---------- + task : TaskBase + the task to execute + + Returns + ------- + output + Output of the task. + """ raise NotImplementedError def teardown(self): @@ -15,9 +35,11 @@ def teardown(self): class Native(Environment): + """ + Native environment, i.e. the tasks are executed in the current python environment. + """ + def execute(self, task): - # breakpoint() - # args = task.render_arguments_in_root() keys = ["return_code", "stdout", "stderr"] values = execute(task.command_args(), strip=task.strip) output = dict(zip(keys, values)) @@ -31,43 +53,65 @@ def execute(self, task): return output -class Docker(Environment): - def __init__(self, image, tag="latest", output_cpath="/output_pydra", xargs=None): +class Container(Environment): + """ + Base class for container environments used by Docker and Singularity. + + Parameters + ---------- + image : str + Name of the container image + tag : str + Tag of the container image + output_cpath : str + Path to the output directory in the container + xargs : Union[str, List[str]] + Extra arguments to be passed to the container + """ + + def __init__(self, image, tag="latest", root="/mnt/pydra", xargs=None): self.image = image self.tag = tag + if xargs is None: + xargs = [] + elif isinstance(xargs, str): + xargs = xargs.split() self.xargs = xargs - self.output_cpath = output_cpath + self.root = root @staticmethod - def bind(loc, mode="ro", root="/mnt/pydra"): # TODO - # XXX Failure mode: {loc} overwrites a critical directory in image - # To fix, we'll need to update any args within loc to a new location - # such as /mnt/pydra/loc + def bind(loc, mode="ro", root="/mnt/pydra"): loc_abs = Path(loc).absolute() - return f"{loc_abs}:{root}{loc_abs}:{mode}" # TODO: moving entire path? + return f"{loc_abs}:{root}{loc_abs}:{mode}" + - def execute(self, task, root="/mnt/pydra"): - # XXX Need to mount all input locations +class Docker(Container): + """Docker environment.""" + + def execute(self, task): docker_img = f"{self.image}:{self.tag}" - # TODO ? - # Skips over any inputs in task.cache_dir - # Needs to include `out_file`s when not relative to working dir - # Possibly a `TargetFile` type to distinguish between `File` and `str`? - mounts = task.get_bindings(root=root) + # mounting all input locations + mounts = task.get_bindings(root=self.root) # todo adding xargsy etc - docker_args = ["docker", "run", "-v", self.bind(task.cache_dir, "rw")] + docker_args = [ + "docker", + "run", + "-v", + self.bind(task.cache_dir, "rw", self.root), + ] + docker_args.extend(self.xargs) docker_args.extend( " ".join( [f"-v {key}:{val[0]}:{val[1]}" for (key, val) in mounts.items()] ).split() ) - docker_args.extend(["-w", f"{root}{task.output_dir}"]) + docker_args.extend(["-w", f"{self.root}{task.output_dir}"]) keys = ["return_code", "stdout", "stderr"] # print("\n Docker args", docker_args) values = execute( - docker_args + [docker_img] + task.command_args(root="/mnt/pydra"), + docker_args + [docker_img] + task.command_args(root=self.root), strip=task.strip, ) output = dict(zip(keys, values)) @@ -76,39 +120,35 @@ def execute(self, task, root="/mnt/pydra"): raise RuntimeError(output["stderr"]) else: raise RuntimeError(output["stdout"]) - # Any outputs that have been created with a re-rooted path need - # to be de-rooted - # task.finalize_outputs("/mnt/pydra") TODO: probably don't need it return output -class Singularity(Docker): - def execute(self, task, root="/mnt/pydra"): - # XXX Need to mount all input locations +class Singularity(Container): + """Singularity environment.""" + + def execute(self, task): singularity_img = f"{self.image}:{self.tag}" - # TODO ? - # Skips over any inputs in task.cache_dir - # Needs to include `out_file`s when not relative to working dir - # Possibly a `TargetFile` type to distinguish between `File` and `str`? - mounts = task.get_bindings(root=root) + # mounting all input locations + mounts = task.get_bindings(root=self.root) # todo adding xargsy etc singularity_args = [ "singularity", "exec", "-B", - self.bind(task.cache_dir, "rw"), + self.bind(task.cache_dir, "rw", self.root), ] + singularity_args.extend(self.xargs) singularity_args.extend( " ".join( [f"-B {key}:{val[0]}:{val[1]}" for (key, val) in mounts.items()] ).split() ) - singularity_args.extend(["--pwd", f"{root}{task.output_dir}"]) + singularity_args.extend(["--pwd", f"{self.root}{task.output_dir}"]) keys = ["return_code", "stdout", "stderr"] values = execute( - singularity_args + [singularity_img] + task.command_args(root="/mnt/pydra"), + singularity_args + [singularity_img] + task.command_args(root=self.root), strip=task.strip, ) output = dict(zip(keys, values)) @@ -117,7 +157,4 @@ def execute(self, task, root="/mnt/pydra"): raise RuntimeError(output["stderr"]) else: raise RuntimeError(output["stdout"]) - # Any outputs that have been created with a re-rooted path need - # to be de-rooted - # task.finalize_outputs("/mnt/pydra") TODO: probably don't need it return output diff --git a/pydra/engine/specs.py b/pydra/engine/specs.py index 9f53f108f6..a8a4a69b79 100644 --- a/pydra/engine/specs.py +++ b/pydra/engine/specs.py @@ -676,23 +676,6 @@ def _check_requires(self, fld, inputs): return False -@attr.s(auto_attribs=True, kw_only=True) -class ContainerSpec(ShellSpec): - """Refine the generic command-line specification to container execution.""" - - image: ty.Union[File, str] = attr.ib( - metadata={"help_string": "image", "mandatory": True} - ) - """The image to be containerized.""" - container: ty.Union[File, str, None] = attr.ib( - metadata={"help_string": "container"} - ) - """The container.""" - container_xargs: ty.Optional[ty.List[str]] = attr.ib( - default=None, metadata={"help_string": "todo"} - ) - - @attr.s class LazyInterface: _task: "core.TaskBase" = attr.ib() diff --git a/pydra/engine/task.py b/pydra/engine/task.py index fd98eeddc8..f977a32406 100644 --- a/pydra/engine/task.py +++ b/pydra/engine/task.py @@ -57,7 +57,6 @@ SpecInfo, ShellSpec, ShellOutSpec, - ContainerSpec, attr_fields, ) from .helpers import ( @@ -67,7 +66,7 @@ output_from_inputfields, parse_copyfile, ) -from .helpers_file import template_update, is_local_file +from .helpers_file import template_update from ..utils.typing import TypeParser from .environments import Native @@ -342,10 +341,7 @@ def command_args(self, root=None): pos_args = [] # list for (position, command arg) self._positions_provided = [] - for field in attr_fields( - self.inputs, - exclude_names=("container", "image", "container_xargs"), - ): + for field in attr_fields(self.inputs): name, meta = field.name, field.metadata if ( getattr(self.inputs, name) is attr.NOTHING @@ -527,13 +523,9 @@ def cmdline(self): self.inputs.check_fields_input_spec() if self.state: raise NotImplementedError - if isinstance(self, ContainerTask): - command_args = self.container_args + self.command_args() - else: - command_args = self.command_args() # Skip the executable, which can be a multi-part command, e.g. 'docker run'. - cmdline = command_args[0] - for arg in command_args[1:]: + cmdline = self.command_args()[0] + for arg in self.command_args()[1:]: # If there are spaces in the arg, and it is not enclosed by matching # quotes, add quotes to escape the space. Not sure if this should # be expanded to include other special characters apart from spaces @@ -556,11 +548,6 @@ def _prepare_bindings(self, root: str): """ for fld in attr_fields(self.inputs): if TypeParser.contains_type(FileSet, fld.type): - # Is container_path necessary? Container paths should just be typed PurePath - assert not fld.metadata.get("container_path") - # Should no longer happen with environments; assertion for testing purposes - # XXX: Remove before merge, so "image" can become a valid input file - assert not fld.name == "image" fileset = getattr(self.inputs, fld.name) copy = parse_copyfile(fld)[0] == FileSet.CopyMode.copy @@ -578,134 +565,6 @@ def _prepare_bindings(self, root: str): DEFAULT_COPY_COLLATION = FileSet.CopyCollation.adjacent -class ContainerTask(ShellCommandTask): - """Extend shell command task for containerized execution.""" - - def __init__( - self, - name, - audit_flags: AuditFlag = AuditFlag.NONE, - cache_dir=None, - input_spec: ty.Optional[SpecInfo] = None, - messenger_args=None, - messengers=None, - output_cpath="/output_pydra", - output_spec: ty.Optional[SpecInfo] = None, - rerun=False, - strip=False, - **kwargs, - ): - """ - Initialize this task. - - Parameters - ---------- - name : :obj:`str` - Name of this task. - audit_flags : :obj:`pydra.utils.messenger.AuditFlag` - Auditing configuration - cache_dir : :obj:`os.pathlike` - Cache directory - input_spec : :obj:`pydra.engine.specs.SpecInfo` - Specification of inputs. - messenger_args : - TODO - messengers : - TODO - output_cpath : :obj:`str` - Output path within the container filesystem. - output_spec : :obj:`pydra.engine.specs.BaseSpec` - Specification of inputs. - strip : :obj:`bool` - TODO - - """ - if input_spec is None: - input_spec = SpecInfo(name="Inputs", fields=[], bases=(ContainerSpec,)) - self.output_cpath = Path(output_cpath) - self.bindings = {} - super().__init__( - name=name, - input_spec=input_spec, - output_spec=output_spec, - audit_flags=audit_flags, - messengers=messengers, - messenger_args=messenger_args, - cache_dir=cache_dir, - strip=strip, - rerun=rerun, - **kwargs, - ) - - def _field_value(self, field, check_file=False): - """ - Checking value of the specific field, if value is not set, None is returned. - If check_file is True, checking if field is a local file - and settings bindings if needed. - """ - value = super()._field_value(field) - if value and check_file and is_local_file(field): - # changing path to the cpath (the directory should be mounted) - lpath = Path(str(value)) - cdir = self.bind_paths()[lpath.parent][0] - cpath = cdir.joinpath(lpath.name) - value = str(cpath) - return value - - def container_check(self, container_type): - """Get container-specific CLI arguments.""" - if self.inputs.container is None: - raise AttributeError("Container software is not specified") - elif self.inputs.container != container_type: - raise AttributeError( - f"Container type should be {container_type}, but {self.inputs.container} given" - ) - if self.inputs.image is attr.NOTHING: - raise AttributeError("Container image is not specified") - - def bind_paths(self): - """Get bound mount points - - Returns - ------- - mount points: dict - mapping from local path to tuple of container path + mode - """ - self._prepare_bindings() - return {**self.bindings, **{self.output_dir: (self.output_cpath, "rw")}} - - def binds(self, opt): - """ - Specify mounts to bind from local filesystems to container and working directory. - - Uses py:meth:`bind_paths` - - """ - bargs = [] - for lpath, (cpath, mode) in self.bind_paths().items(): - bargs.extend([opt, f"{lpath}:{cpath}:{mode}"]) - return bargs - - def _prepare_bindings(self): - fields = attr_fields(self.inputs) - for fld in fields: - if TypeParser.contains_type(FileSet, fld.type): - assert not fld.metadata.get( - "container_path" - ) # <-- Is container_path necessary, container paths should just be typed PurePath - if fld.name == "image": # <-- What is the image about? - continue - fileset = getattr(self.inputs, fld.name) - copy_mode, _ = parse_copyfile(fld) - container_path = Path(f"/pydra_inp_{fld.name}") - self.bindings[fileset.parent] = ( - container_path, - "rw" if copy_mode == FileSet.CopyMode.copy else "ro", - ) - - SUPPORTED_COPY_MODES = FileSet.CopyMode.any - FileSet.CopyMode.symlink - - def split_cmd(cmd: str): """Splits a shell command line into separate arguments respecting quotes diff --git a/pydra/engine/tests/test_environments.py b/pydra/engine/tests/test_environments.py index 40e6581fad..bd05d9daed 100644 --- a/pydra/engine/tests/test_environments.py +++ b/pydra/engine/tests/test_environments.py @@ -11,6 +11,7 @@ from .utils import no_win, need_docker, need_singularity import attr +import pytest def makedir(path, name): @@ -79,7 +80,15 @@ def test_docker_1(tmp_path): @no_win @need_docker -def test_docker_1_subm(tmp_path, plugin): +@pytest.mark.parametrize( + "docker", + [ + Docker(image="busybox"), + Docker(image="busybox", tag="latest", xargs="--rm"), + Docker(image="busybox", xargs=["--rm"]), + ], +) +def test_docker_1_subm(tmp_path, docker): """docker env with submitter: simple command, no arguments""" newcache = lambda x: makedir(tmp_path, x) @@ -97,14 +106,14 @@ def test_docker_1_subm(tmp_path, plugin): cache_dir=newcache("shelly_env"), environment=docker, ) - with Submitter(plugin=plugin) as sub: + with Submitter(plugin="cf") as sub: shelly_env(submitter=sub) assert env_res == shelly_env.result().output.__dict__ shelly_call = ShellCommandTask( name="shelly", executable=cmd, cache_dir=newcache("shelly_call") ) - with Submitter(plugin=plugin) as sub: + with Submitter(plugin="cf") as sub: shelly_call(submitter=sub, environment=docker) assert env_res == shelly_call.result().output.__dict__ diff --git a/pydra/engine/tests/test_specs.py b/pydra/engine/tests/test_specs.py index d2ed6b235d..b3b4f3db32 100644 --- a/pydra/engine/tests/test_specs.py +++ b/pydra/engine/tests/test_specs.py @@ -11,7 +11,7 @@ Runtime, Result, ShellSpec, - ContainerSpec, + # ContainerSpec, LazyIn, LazyOut, LazyField, @@ -51,17 +51,17 @@ def test_shellspec(): assert hasattr(spec, "args") -container_attrs = ["image", "container", "container_xargs"] - - -def test_container(): - with pytest.raises(TypeError): - spec = ContainerSpec() - spec = ContainerSpec( - executable="ls", image="busybox", container="docker" - ) # (execute, args, image, cont) - assert all([hasattr(spec, attr) for attr in container_attrs]) - assert hasattr(spec, "executable") +# container_attrs = ["image", "container", "container_xargs"] +# +# +# def test_container(): +# with pytest.raises(TypeError): +# spec = ContainerSpec() +# spec = ContainerSpec( +# executable="ls", image="busybox", container="docker" +# ) # (execute, args, image, cont) +# assert all([hasattr(spec, attr) for attr in container_attrs]) +# assert hasattr(spec, "executable") class NodeTesting: