diff --git a/.idea/step-pipeline.iml b/.idea/step-pipeline.iml index 68dcab2..af9384f 100644 --- a/.idea/step-pipeline.iml +++ b/.idea/step-pipeline.iml @@ -2,7 +2,7 @@ - + diff --git a/step_pipeline/pipeline.py b/step_pipeline/pipeline.py index f79353f..8cf93a9 100644 --- a/step_pipeline/pipeline.py +++ b/step_pipeline/pipeline.py @@ -2,6 +2,7 @@ from abc import ABC, abstractmethod import configargparse +import hailtop.fs as hfs import os import random import re @@ -314,6 +315,7 @@ def _transfer_all_steps(self): step.visited = False # begin traversal of DAG + self._output_paths_to_download_when_done = [] while current_steps: for i, step in enumerate(current_steps): step.visited = True @@ -328,6 +330,11 @@ def _transfer_all_steps(self): step_counters[step.name] += 1 + for output_spec in step._output_specs: + if output_spec.download_to_dir: + self._output_paths_to_download_when_done.append( + (output_spec.output_path, output_spec.download_to_dir)) + skip_requested = any( getattr(args, skip_arg_name.replace("-", "_")) for skip_arg_name in step._skip_this_step_arg_names ) @@ -454,6 +461,24 @@ def _generate_post_to_slack_command(self, message, channel=None, slack_token=Non print(response.raw) EOF""" + def _download_output_files(self): + if self._output_paths_to_download_when_done: + for output_path, download_to_dir in self._output_paths_to_download_when_done: + os.makedirs(download_to_dir, exist_ok=True) + local_path = os.path.join(download_to_dir, os.path.basename(output_path)) + if not os.path.isfile(local_path): + try: + stats = hfs.stat(output_path) + except FileNotFoundError as e: + print(f"WARNING: Output file not found: {output_path}. Skipping..") + continue + + size_in_mb = stats.size/10**6 + print(f"Downloading {size_in_mb:0,.2f} Mb file to {local_path}: {output_path}") + hfs.copy(output_path, f"{local_path}.unfinished") + os.system(f"mv {local_path}.unfinished {local_path}") + + def precache_file_paths(self, glob_path): """This method is an alias for the check_input_glob(..) method""" @@ -886,7 +911,8 @@ def output_dir(self, path): """ self._output_dir = path - def output(self, local_path, output_path=None, output_dir=None, name=None, delocalize_by=None, optional=False): + def output(self, local_path, output_path=None, output_dir=None, name=None, delocalize_by=None, optional=False, + download_to_dir=None): """Specify a Step output file or directory. Args: @@ -903,6 +929,7 @@ def output(self, local_path, output_path=None, output_dir=None, name=None, deloc that didn't produce this output will still be skipped even if this output is missing. This is useful for modifying existing pipelines to output additional files (eg. log files) without this triggering a rerun of previously steps that completed previously without generating these files. + download_to_dir (str): If specified, the output will be downloaded to this directory on the local machine. Returns: OutputSpec: An object describing this output. @@ -919,6 +946,7 @@ def output(self, local_path, output_path=None, output_dir=None, name=None, deloc name=name, delocalize_by=delocalize_by, optional=optional, + download_to_dir=download_to_dir, ) self._preprocess_output_spec(output_spec) @@ -927,7 +955,7 @@ def output(self, local_path, output_path=None, output_dir=None, name=None, deloc return output_spec - def outputs(self, local_path, *local_paths, output_dir=None, name=None, delocalize_by=None): + def outputs(self, local_path, *local_paths, output_dir=None, name=None, delocalize_by=None, download_to_dir=None): """Define one or more outputs. Args: @@ -935,6 +963,7 @@ def outputs(self, local_path, *local_paths, output_dir=None, name=None, delocali output_dir (str): Optional destination directory to which the given local_path(s) should be delocalized. name (str): Optional name for the output(s). delocalize_by (Delocalize): How the path(s) should be delocalized. + download_to_dir (str): If specified, the outputs will be downloaded to this directory on the local machine. Returns: list: A list of OutputSpec objects that describe these outputs. The list will contain one entry for each passed-in path. @@ -946,7 +975,8 @@ def outputs(self, local_path, *local_paths, output_dir=None, name=None, delocali local_path, output_dir=output_dir, name=name, - delocalize_by=delocalize_by) + delocalize_by=delocalize_by, + download_to_dir=download_to_dir) output_specs.append(output_spec) @@ -961,6 +991,12 @@ def get_inputs(self): def get_outputs(self): return list(self._output_specs) + def get_output_paths_to_download_when_done(self): + return [ + (o.output_path, os.path.join(o.download_to_dir, os.path.basename(o.output_path))) + for o in self._output_specs if o.download_to_dir + ] + def depends_on(self, upstream_step): """Marks this Step as being downstream of another Step in the pipeline, meaning that this Step can only run after the upstream_step has completed successfully.