Skip to content

Commit

Permalink
implementing execute for the Docker env; changes to the ShellCommandT…
Browse files Browse the repository at this point in the history
…ask; adding tests (more tests needed)
  • Loading branch information
djarecka committed Feb 27, 2023
1 parent d4396df commit eb049eb
Show file tree
Hide file tree
Showing 7 changed files with 581 additions and 80 deletions.
17 changes: 12 additions & 5 deletions pydra/engine/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,13 @@ def cont_dim(self, cont_dim):
self._cont_dim = cont_dim

def __call__(
self, submitter=None, plugin=None, plugin_kwargs=None, rerun=False, **kwargs
self,
submitter=None,
plugin=None,
plugin_kwargs=None,
rerun=False,
environment=None,
**kwargs,
):
"""Make tasks callable themselves."""
from .submitter import Submitter
Expand All @@ -447,9 +453,9 @@ def __call__(
if submitter:
with submitter as sub:
self.inputs = attr.evolve(self.inputs, **kwargs)
res = sub(self)
res = sub(self, environment=environment)
else: # tasks without state could be run without a submitter
res = self._run(rerun=rerun, **kwargs)
res = self._run(rerun=rerun, environment=environment, **kwargs)
return res

def _modify_inputs(self):
Expand Down Expand Up @@ -482,7 +488,7 @@ def _populate_filesystem(self, checksum, output_dir):
shutil.rmtree(output_dir)
output_dir.mkdir(parents=False, exist_ok=self.can_resume)

def _run(self, rerun=False, **kwargs):
def _run(self, rerun=False, environment=None, **kwargs):
self.inputs = attr.evolve(self.inputs, **kwargs)
self.inputs.check_fields_input_spec()

Expand All @@ -499,6 +505,7 @@ def _run(self, rerun=False, **kwargs):
return result
cwd = os.getcwd()
self._populate_filesystem(checksum, output_dir)
os.chdir(output_dir)
orig_inputs = self._modify_inputs()
result = Result(output=None, runtime=None, errored=False)
self.hooks.pre_run_task(self)
Expand All @@ -507,7 +514,7 @@ def _run(self, rerun=False, **kwargs):
self.audit.audit_task(task=self)
try:
self.audit.monitor()
self._run_task()
self._run_task(environment=environment)
result.output = self._collect_outputs(output_dir=output_dir)
except Exception:
etype, eval, etr = sys.exc_info()
Expand Down
47 changes: 32 additions & 15 deletions pydra/engine/environments.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from .helpers import execute

from pathlib import Path


class Environment:
def setup(self):
Expand All @@ -14,44 +16,59 @@ def teardown(self):

class Native(Environment):
def execute(self, task):
args = task.render_arguments_in_root()
# breakpoint()
# args = task.render_arguments_in_root()
keys = ["return_code", "stdout", "stderr"]
values = execute(args, strip=task.strip)
values = execute(task.command_args(), strip=task.strip)
output = dict(zip(keys, values))
if output["return_code"]:
msg = f"Error running '{task.name}' task with {task.command_args()}:"
if output["stderr"]:
raise RuntimeError(output["stderr"])
else:
raise RuntimeError(output["stdout"])
msg += "\n\nstderr:\n" + output["stderr"]
if output["stdout"]:
msg += "\n\nstdout:\n" + output["stdout"]
raise RuntimeError(msg)
return output


class Docker(Environment):
def __init__(self, image, tag="latest"):
def __init__(
self, image, tag="latest", binds=None, output_cpath="/output_pydra", xargs=None
):
self.image = image
self.tag = tag
self.xargs = xargs
self.output_cpath = output_cpath

@staticmethod
def bind(loc, mode="ro"):
def bind(loc, mode="ro", root="/mnt/pydra"): # TODO
# XXX Failure mode: {loc} overwrites a critical directory in image
# To fix, we'll need to update any args within loc to a new location
# such as /mnt/pydra/loc
return f"{loc}:{loc}:{mode}"
loc_abs = Path(loc).absolute()
return f"{loc_abs}:{root}{loc_abs}:{mode}" # TODO: moving entire path?

def execute(self, task):
def execute(self, task, root="/mnt/pydra"):
# XXX Need to mount all input locations
docker_img = f"{self.image}:{self.tag}"
# Renders arguments where `File`s have an additional prefix
args = task.render_arguments_in_root("/mnt/pydra")
# TODO ?
# Skips over any inputs in task.cache_dir
# Needs to include `out_file`s when not relative to working dir
# Possibly a `TargetFile` type to distinguish between `File` and `str`?
mounts = task.get_inputs_in_root(root="/mnt/pydra")
mounts = task.get_inputs_in_root(root=root)

# todo adding xargsy etc
docker_args = ["docker", "run", "-v", self.bind(task.cache_dir, "rw")]
docker_args.extend(flatten(["-v", self.bind(mount)] for mount in mounts))
docker_args.extend(
" ".join([f"-v {self.bind(mount)}" for mount in mounts]).split()
)
keys = ["return_code", "stdout", "stderr"]
values = execute(docker_args + [docker_img] + args, strip=task.strip)
# print("\n Docker args", docker_args)

values = execute(
docker_args + [docker_img] + task.command_args(root="/mnt/pydra"),
strip=task.strip,
)
output = dict(zip(keys, values))
if output["return_code"]:
if output["stderr"]:
Expand All @@ -60,5 +77,5 @@ def execute(self, task):
raise RuntimeError(output["stdout"])
# Any outputs that have been created with a re-rooted path need
# to be de-rooted
task.finalize_outputs("/mnt/pydra")
# task.finalize_outputs("/mnt/pydra") TODO: probably don't need it
return output
15 changes: 9 additions & 6 deletions pydra/engine/submitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,16 @@ def __init__(self, plugin="cf", **kwargs):
raise NotImplementedError(f"No worker for {self.plugin}")
self.worker.loop = self.loop

def __call__(self, runnable, cache_locations=None, rerun=False):
def __call__(self, runnable, cache_locations=None, rerun=False, environment=None):
"""Submitter run function."""
if cache_locations is not None:
runnable.cache_locations = cache_locations
self.loop.run_until_complete(self.submit_from_call(runnable, rerun))
self.loop.run_until_complete(
self.submit_from_call(runnable, rerun, environment)
)
return runnable.result()

async def submit_from_call(self, runnable, rerun):
async def submit_from_call(self, runnable, rerun, environment):
"""
This coroutine should only be called once per Submitter call,
and serves as the bridge between sync/async lands.
Expand All @@ -56,7 +58,7 @@ async def submit_from_call(self, runnable, rerun):
Once Python 3.10 is the minimum, this should probably be refactored into using
structural pattern matching.
"""
if is_workflow(runnable):
if is_workflow(runnable): # TODO: env to wf
# connect and calculate the checksum of the graph before running
runnable._connect_and_propagate_to_tasks(override_task_caches=True)
# 0
Expand All @@ -74,10 +76,11 @@ async def submit_from_call(self, runnable, rerun):
# 2
if runnable.state is None:
# run_el should always return a coroutine
await self.worker.run_el(runnable, rerun=rerun)
print("in SUBM", environment)
await self.worker.run_el(runnable, rerun=rerun, environment=environment)
# 3
else:
await self.expand_runnable(runnable, wait=True, rerun=rerun)
await self.expand_runnable(runnable, wait=True, rerun=rerun) # TODO
return True

async def expand_runnable(self, runnable, wait=False, rerun=False):
Expand Down
123 changes: 85 additions & 38 deletions pydra/engine/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ def __init__(

self.output_spec = output_spec

def _run_task(self):
def _run_task(self, environment=None):
inputs = attr.asdict(self.inputs, recurse=False)
del inputs["_func"]
self.output_ = None
Expand Down Expand Up @@ -253,7 +253,7 @@ def __init__(
output_spec: ty.Optional[SpecInfo] = None,
rerun=False,
strip=False,
environment=Native,
environment=Native(),
**kwargs,
):
"""
Expand Down Expand Up @@ -322,12 +322,8 @@ def __init__(
)
self.strip = strip
self.environment = environment

def render_arguments_in_root(self, root=None):
if root is None:
args = self.command_args
return [str(arg) for arg in args if arg not in ["", " "]]
raise NotImplementedError
self.bindings = {}
self.inputs_mod_root = {}

def get_inputs_in_root(self, root=None):
"""Take input files and return their location, re-rooted.
Expand All @@ -342,12 +338,16 @@ def get_inputs_in_root(self, root=None):
Returns
-------
inputs: list of str
File paths, adjusted to the root directory
File paths, needed to be exposed to the container
"""
orig_inputs = attr.asdict(self.inputs, recurse=False)

@property
def command_args(self):
if root is None:
return []
else:
self._check_inputs(root=root)
return self.bindings.keys()

def command_args(self, root=None):
"""Get command line arguments"""
if is_lazy(self.inputs):
raise Exception("can't return cmdline, self.inputs has LazyFields")
Expand Down Expand Up @@ -378,7 +378,10 @@ def command_args(self):
if pos_val:
pos_args.append(pos_val)
else:
pos_val = self._command_pos_args(field)
if name in modified_inputs:
pos_val = self._command_pos_args(field, root=root)
else:
pos_val = self._command_pos_args(field)
if pos_val:
pos_args.append(pos_val)

Expand Down Expand Up @@ -415,7 +418,7 @@ def _command_shelltask_args(self, field):
else:
return pos, ensure_list(value, tuple2list=True)

def _command_pos_args(self, field):
def _command_pos_args(self, field, root=None):
"""
Checking all additional input fields, setting pos to None, if position not set.
Creating a list with additional parts of the command that comes from
Expand Down Expand Up @@ -444,6 +447,13 @@ def _command_pos_args(self, field):
pos += 1 if pos >= 0 else -1

value = self._field_value(field, check_file=True)

if value:
if field.name in self.inputs_mod_root:
value = self.inputs_mod_root[field.name]
elif root: # values from templates
value = value.replace(str(self.output_dir), f"{root}{self.output_dir}")

if field.metadata.get("readonly", False) and value is not None:
raise Exception(f"{field.name} is read only, the value can't be provided")
elif (
Expand Down Expand Up @@ -536,10 +546,10 @@ def cmdline(self):
if self.state:
raise NotImplementedError
if isinstance(self, ContainerTask):
command_args = self.container_args + self.command_args
command_args = self.container_args + self.command_args()
else:
command_args = self.command_args
# Skip the executable, which can be a multipart command, e.g. 'docker run'.
command_args = self.command_args()
# Skip the executable, which can be a multi-part command, e.g. 'docker run'.
cmdline = command_args[0]
for arg in command_args[1:]:
# If there are spaces in the arg, and it is not enclosed by matching
Expand All @@ -552,28 +562,65 @@ def cmdline(self):
return cmdline

def _run_task(self, environment=None):
# if environment is None:
# environment = self.environment
# self.output_ = environment.execute(self)
#
# TEST: task.run(); task.output_ == environment.execute(task)
if isinstance(self, ContainerTask):
args = self.container_args + self.command_args
if environment is None:
environment = self.environment

if (
environment == "old"
): # TODO this is just temporarily for testing, remove this part
if isinstance(self, ContainerTask):
args = self.container_args + self.command_args()
else:
args = self.command_args()
if args:
# removing empty strings
args = [str(el) for el in args if el not in ["", " "]]
keys = ["return_code", "stdout", "stderr"]
values = execute(args, strip=self.strip)
self.output_ = dict(zip(keys, values))
if self.output_["return_code"]:
msg = f"Error running '{self.name}' task with {args}:"
if self.output_["stderr"]:
msg += "\n\nstderr:\n" + self.output_["stderr"]
if self.output_["stdout"]:
msg += "\n\nstdout:\n" + self.output_["stdout"]
raise RuntimeError(msg)
else:
args = self.command_args
if args:
# removing empty strings
args = [str(el) for el in args if el not in ["", " "]]
keys = ["return_code", "stdout", "stderr"]
values = execute(args, strip=self.strip)
self.output_ = dict(zip(keys, values))
if self.output_["return_code"]:
msg = f"Error running '{self.name}' task with {args}:"
if self.output_["stderr"]:
msg += "\n\nstderr:\n" + self.output_["stderr"]
if self.output_["stdout"]:
msg += "\n\nstdout:\n" + self.output_["stdout"]
raise RuntimeError(msg)
self.output_ = environment.execute(self)

def _check_inputs(self, root):
fields = attr_fields(self.inputs)
for fld in fields:
if (
fld.type in [File, Directory]
or "pydra.engine.specs.File" in str(fld.type)
or "pydra.engine.specs.Directory" in str(fld.type)
):
if fld.name == "image":
continue
file = Path(getattr(self.inputs, fld.name))
if fld.metadata.get("container_path"): # TODO: this should go..
# if the path is in a container the input should be treated as a str (hash as a str)
# field.type = "str"
# setattr(self, field.name, str(file))
pass
# if this is a local path, checking if the path exists
# TODO: if copyfile, ro -> rw
# TODO: what if it's a directory? add tests
elif file.exists(): # is it ok if two inputs have the same parent?
# todo: probably need only keys
self.bindings[Path(file.parent)] = (
Path(f"{root}{file.parent}"),
"ro",
)
self.inputs_mod_root[fld.name] = f"{root}{Path(file).absolute()}"
# error should be raised only if the type is strictly File or Directory
elif fld.type in [File, Directory]:
raise FileNotFoundError(
f"the file {file} from {fld.name} input does not exist, "
f"if the file comes from the container, "
f"use field.metadata['container_path']=True"
)


class ContainerTask(ShellCommandTask):
Expand Down
2 changes: 1 addition & 1 deletion pydra/engine/tests/test_dockertask.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def test_docker_1_nosubm():
no submitter
"""
cmd = "whoami"
docky = DockerTask(name="docky", executable=cmd, image="busybox")
docky = DockerTask(name="docky", executable=cmd, image="busybox", environment="old")
assert docky.inputs.image == "busybox"
assert docky.inputs.container == "docker"
assert (
Expand Down
Loading

0 comments on commit eb049eb

Please sign in to comment.