diff --git a/.gitignore b/.gitignore index b11afb7abb..5e6be3f819 100644 --- a/.gitignore +++ b/.gitignore @@ -14,6 +14,7 @@ venv* v3nv/ tmp/ /src/toil/test/cwl/spec* +/src/toil/test/wdl/wdl-conformance-tests /cwltool_deps/ /docs/generated_rst/ /docker/Dockerfile diff --git a/docs/conf.py b/docs/conf.py index a0b402ebf7..a26d25a07a 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -160,7 +160,8 @@ def setup(app): autoapi_dirs = ["../src/toil"] autodoc_typehints = "description" autoapi_keep_files = True -autoapi_ignore = ["*.pyi", "*/test/cwl/spec*/*.py", "*/fake_mpi_run.py", "*/tutorial_*.py", "*/example_*.py", "*/mkFile.py", "*/debugWorkflow.py"] +autoapi_ignore = ["*.pyi", "*/test/cwl/spec*/*.py", "*/fake_mpi_run.py", "*/tutorial_*.py", "*/example_*.py", "*/mkFile.py", "*/debugWorkflow.py", + "*/test/wdl/wdl-conformance-tests/*"] autoapi_options = [ "members", "undoc-members", diff --git a/src/toil/common.py b/src/toil/common.py index 393ce16f60..99b1fc9266 100644 --- a/src/toil/common.py +++ b/src/toil/common.py @@ -750,7 +750,7 @@ def addOptions( add_cwl_options(parser, suppress=not cwl) add_wdl_options(parser, suppress=not wdl) # Add shared runner options - add_runner_options(parser) + add_runner_options(parser, cwl=cwl, wdl=wdl) def check_arguments(typ: str) -> None: """ @@ -764,7 +764,7 @@ def check_arguments(typ: str) -> None: add_cwl_options(check_parser) if typ == "cwl": add_wdl_options(check_parser) - add_runner_options(check_parser) + for action in check_parser._actions: action.default = SUPPRESS other_options, _ = check_parser.parse_known_args( diff --git a/src/toil/cwl/cwltoil.py b/src/toil/cwl/cwltoil.py index feccb6793b..8d15b60f74 100644 --- a/src/toil/cwl/cwltoil.py +++ b/src/toil/cwl/cwltoil.py @@ -37,7 +37,22 @@ from collections.abc import Iterator, Mapping, MutableMapping, MutableSequence from tempfile import NamedTemporaryFile, TemporaryFile, gettempdir from threading import Thread -from typing import IO, Any, Callable, Literal, Optional, TextIO, TypeVar, Union, cast +from typing import ( + IO, + Any, + Callable, + Iterator, + Mapping, + MutableMapping, + MutableSequence, + Optional, + TextIO, + Tuple, + TypeVar, + Union, + cast, + Literal, Protocol, +) from urllib.parse import quote, unquote, urlparse, urlsplit import cwl_utils.errors @@ -109,14 +124,22 @@ from toil.exceptions import FailedJobsException from toil.fileStores import FileID from toil.fileStores.abstractFileStore import AbstractFileStore -from toil.job import AcceleratorRequirement, Job, Promise, Promised, unwrap +from toil.job import ( + AcceleratorRequirement, + Job, + Promise, + Promised, + unwrap, + ImportsJob, + get_file_sizes, +) from toil.jobStores.abstractJobStore import ( AbstractJobStore, + NoSuchFileException, InvalidImportExportUrlException, LocatorException, - NoSuchFileException, - UnimplementedURLException, ) +from toil.lib.exceptions import UnimplementedURLException from toil.jobStores.fileJobStore import FileJobStore from toil.jobStores.utils import JobStoreUnavailableException, generate_locator from toil.lib.io import mkdtemp @@ -1762,14 +1785,16 @@ def write_to_pipe( return schema_salad.ref_resolver.file_uri(src_path) -def write_file( - writeFunc: Callable[[str], FileID], +def convert_file_uri_to_toil_uri( + applyFunc: Callable[[str], FileID], index: dict[str, str], existing: dict[str, str], file_uri: str, ) -> str: """ - Write a file into the Toil jobstore. + Given a file URI, convert it to a toil file URI. Uses applyFunc to handle the conversion. + + Runs once on every unique file URI. 
'existing' is a set of files retrieved as inputs from toil_get_file. This ensures they are mapped back as the same name if passed through. @@ -1786,12 +1811,8 @@ def write_file( else: file_uri = existing.get(file_uri, file_uri) if file_uri not in index: - if not urlparse(file_uri).scheme: - rp = os.path.realpath(file_uri) - else: - rp = file_uri try: - index[file_uri] = "toilfile:" + writeFunc(rp).pack() + index[file_uri] = "toilfile:" + applyFunc(file_uri).pack() existing[index[file_uri]] = file_uri except Exception as e: logger.error("Got exception '%s' while copying '%s'", e, file_uri) @@ -1810,8 +1831,77 @@ def path_to_loc(obj: CWLObjectType) -> None: del obj["path"] -def import_files( - import_function: Callable[[str], FileID], +def extract_file_uri_once( + fileindex: dict[str, str], + existing: dict[str, str], + file_metadata: CWLObjectType, + mark_broken: bool = False, + skip_remote: bool = False, +) -> Optional[str]: + """ + Extract the filename from a CWL file record. + + This function matches the predefined function signature in visit_files, which ensures + that this function is called on all files inside a CWL object. + + Ensures no duplicate files are returned according to fileindex. If a file has not been resolved already (and had file:// prepended) + then resolve symlinks. + :param fileindex: Forward mapping of filename + :param existing: Reverse mapping of filename. This function does not use this + :param file_metadata: CWL file record + :param mark_broken: Whether files should be marked as missing + :param skip_remote: Whether to skip remote files + :return: + """ + location = cast(str, file_metadata["location"]) + if ( + location.startswith("toilfile:") + or location.startswith("toildir:") + or location.startswith("_:") + ): + return None + if location in fileindex: + file_metadata["location"] = fileindex[location] + return None + if not location and file_metadata["path"]: + file_metadata["location"] = location = schema_salad.ref_resolver.file_uri( + cast(str, file_metadata["path"]) + ) + if location.startswith("file://") and not os.path.isfile( + schema_salad.ref_resolver.uri_file_path(location) + ): + if mark_broken: + logger.debug("File %s is missing", file_metadata) + file_metadata["location"] = location = MISSING_FILE + else: + raise cwl_utils.errors.WorkflowException( + "File is missing: %s" % file_metadata + ) + if location.startswith("file://") or not skip_remote: + # This is a local file or a remote file + if location not in fileindex: + # These dictionaries are meant to keep track of what we're going to import + # In the actual import, this is used as a bidirectional mapping from unvirtualized to virtualized + # For this case, keep track of the files to prevent returning duplicate files + # see write_file + + # If there is not a scheme, this file has not been resolved yet or is a URL. + if not urlparse(location).scheme: + rp = os.path.realpath(location) + else: + rp = location + return rp + return None + +V = TypeVar("V", covariant=True) + +class VisitFunc(Protocol[V]): + def __call__(self, fileindex: dict[str, str], existing: dict[str, str], + file_metadata: CWLObjectType, mark_broken: bool, + skip_remote: bool) -> V: ... + +def visit_files( + func: VisitFunc[V], fs_access: StdFsAccess, fileindex: dict[str, str], existing: dict[str, str], @@ -1819,8 +1909,7 @@ def import_files( mark_broken: bool = False, skip_remote: bool = False, bypass_file_store: bool = False, - log_level: int = logging.DEBUG, -) -> None: +) -> list[V]: """ Prepare all files and directories. 
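For orientation: `visit_files` generalizes the old `import_files` helper. Instead of always uploading, it walks the CWL object and applies any callback matching the `VisitFunc` protocol to each File record, returning the collected results as a list. Below is a minimal sketch of a conforming callback; everything except the protocol's parameter names is illustrative and not part of this patch.

```python
from typing import Optional


def collect_location(
    fileindex: dict[str, str],
    existing: dict[str, str],
    file_metadata: dict,  # stands in for CWLObjectType in this sketch
    mark_broken: bool = False,
    skip_remote: bool = False,
) -> Optional[str]:
    # Return the raw location; visit_files() gathers one return value
    # per File record it visits and hands back the whole list.
    location = file_metadata.get("location")
    return str(location) if location is not None else None
```

A call such as `visit_files(collect_location, fs_access, {}, {}, cwl_object)` would then return the location of every File in `cwl_object`, which is essentially how `extract_file_uri_once` is used further down.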
@@ -1866,18 +1955,12 @@ def import_files( :param log_level: Log imported files at the given level. """ + func_return: list[Any] = list() tool_id = cwl_object.get("id", str(cwl_object)) if cwl_object else "" logger.debug("Importing files for %s", tool_id) logger.debug("Importing files in %s", cwl_object) - def import_and_log(url: str) -> FileID: - """ - Upload a file and log that we are doing so. - """ - logger.log(log_level, "Loading %s...", url) - return import_function(url) - # We need to upload all files to the Toil filestore, and encode structure # recursively into all Directories' locations. But we cannot safely alter # the listing fields of Directory objects, because the handling required by @@ -1895,7 +1978,7 @@ def import_and_log(url: str) -> FileID: if bypass_file_store: # Don't go on to actually import files or encode contents for # directories. - return + return func_return # Otherwise we actually want to put the things in the file store. @@ -1973,15 +2056,15 @@ def visit_file_or_directory_up( # This is a CWL File result: DirectoryContents = {} - - # Upload the file itself, which will adjust its location. - upload_file( - import_and_log, - fileindex, - existing, - rec, - mark_broken=mark_broken, - skip_remote=skip_remote, + # Run a function on the file and store the return + func_return.append( + func( + fileindex, + existing, + rec, + mark_broken=mark_broken, + skip_remote=skip_remote, + ) ) # Make a record for this file under its name @@ -2025,6 +2108,7 @@ def visit_file_or_directory_up( visit_file_or_directory_down, visit_file_or_directory_up, ) + return func_return def upload_directory( @@ -2083,8 +2167,8 @@ def upload_directory( directory_metadata["location"] = encode_directory(directory_contents) -def upload_file( - uploadfunc: Callable[[str], FileID], +def extract_and_convert_file_to_toil_uri( + convertfunc: Callable[[str], FileID], fileindex: dict[str, str], existing: dict[str, str], file_metadata: CWLObjectType, @@ -2092,46 +2176,22 @@ def upload_file( skip_remote: bool = False, ) -> None: """ - Update a file object so that the file will be accessible from another machine. + Extract the file URI out of a file object and convert it to a Toil URI. + + Runs convertfunc on the file URI to handle conversion. - Uploads local files to the Toil file store, and sets their location to a - reference to the toil file store. + Is used to handle importing files into the jobstore. If a file doesn't exist, fails with an error, unless mark_broken is set, in which case the missing file is given a special sentinel location. - Unless skip_remote is set, downloads remote files into the file store and - sets their locations to references into the file store as well. + Unless skip_remote is set, also run on remote files and sets their locations + to toil URIs as well. 
""" - location = cast(str, file_metadata["location"]) - if ( - location.startswith("toilfile:") - or location.startswith("toildir:") - or location.startswith("_:") - ): - return - if location in fileindex: - file_metadata["location"] = fileindex[location] - return - if not location and file_metadata["path"]: - file_metadata["location"] = location = schema_salad.ref_resolver.file_uri( - cast(str, file_metadata["path"]) - ) - if location.startswith("file://") and not os.path.isfile( - schema_salad.ref_resolver.uri_file_path(location) - ): - if mark_broken: - logger.debug("File %s is missing", file_metadata) - file_metadata["location"] = location = MISSING_FILE - else: - raise cwl_utils.errors.WorkflowException( - "File is missing: %s" % file_metadata - ) - - if location.startswith("file://") or not skip_remote: - # This is a local file, or we also need to download and re-upload remote files - file_metadata["location"] = write_file( - uploadfunc, fileindex, existing, location + location = extract_file_uri_once(fileindex, existing, file_metadata, mark_broken, skip_remote) + if location is not None: + file_metadata["location"] = convert_file_uri_to_toil_uri( + convertfunc, fileindex, existing, location ) logger.debug("Sending file at: %s", file_metadata["location"]) @@ -2832,12 +2892,16 @@ def run(self, file_store: AbstractFileStore) -> Any: fs_access = runtime_context.make_fs_access(runtime_context.basedir) # And a file importer that can go from a file:// URI to a Toil FileID - file_import_function = functools.partial(writeGlobalFileWrapper, file_store) + def file_import_function(url: str, log_level: int = logging.DEBUG) -> FileID: + logger.log(log_level, "Loading %s...", url) + return writeGlobalFileWrapper(file_store, url) + + file_upload_function = functools.partial(extract_and_convert_file_to_toil_uri, file_import_function) # Upload all the Files and set their and the Directories' locations, if # needed. - import_files( - file_import_function, + visit_files( + file_upload_function, fs_access, index, existing, @@ -2883,9 +2947,10 @@ def makeRootJob( :return: """ - if options.run_imports_on_workers: - import_job = CWLImportJob(initialized_job_order, tool, runtime_context, options) + import_job = CWLImportWrapper( + initialized_job_order, tool, runtime_context, options + ) return import_job else: import_workflow_inputs( @@ -2894,11 +2959,11 @@ def makeRootJob( initialized_job_order=initialized_job_order, tool=tool, ) - rootJob, followOn = makeJob( + root_job, followOn = makeJob( tool, jobobj, runtime_context, None, None ) # toplevel, no name needed - rootJob.cwljob = initialized_job_order - return rootJob + root_job.cwljob = initialized_job_order + return root_job def makeJob( @@ -3448,41 +3513,89 @@ def run( return UnresolvedDict(outobj) -class CWLSetupJob(CWLNamedJob): - """ - Job to take a CWL tool and job order with all files imported and makes a CWLWorkflow as a child to run it. - """ - +class CWLInstallImportsJob(Job): def __init__( self, initialized_job_order: Promised[CWLObjectType], tool: Promised[Process], - runtime_context: cwltool.context.RuntimeContext, - ): - super().__init__() + basedir: str, + skip_remote: bool, + bypass_file_store: bool, + import_data: Promised[dict[str, FileID]], + **kwargs: Any, + ) -> None: + """ + Job to take the entire CWL object and a mapping of filenames to the imported URIs + to convert all file locations to URIs. + + This class is only used when runImportsOnWorkers is enabled. 
+ """ + super().__init__(local=True, **kwargs) self.initialized_job_order = initialized_job_order self.tool = tool - self.runtime_context = runtime_context + self.basedir = basedir + self.skip_remote = skip_remote + self.bypass_file_store = bypass_file_store + self.import_data = import_data - def run(self, file_store: AbstractFileStore) -> Any: + def run(self, file_store: AbstractFileStore) -> Tuple[Process, CWLObjectType]: """ - :return: Returns a CWL object that represents the output of the workflow. + Convert the filenames in the workflow inputs into the URIs + :return: Promise of transformed workflow inputs. A tuple of the job order and process """ + candidate_to_fileid: dict[str, FileID] = unwrap(self.import_data) + initialized_job_order = unwrap(self.initialized_job_order) tool = unwrap(self.tool) - root_job, _ = makeJob( - tool, initialized_job_order, self.runtime_context, None, None + + def convert_file(filename: str) -> FileID: + fileid = candidate_to_fileid[filename] + return fileid + + file_convert_function = functools.partial(extract_and_convert_file_to_toil_uri, convert_file) + fs_access = ToilFsAccess(self.basedir) + fileindex: dict[str, str] = {} + existing: dict[str, str] = {} + visit_files( + file_convert_function, + fs_access, + fileindex, + existing, + initialized_job_order, + mark_broken=True, + skip_remote=self.skip_remote, + bypass_file_store=self.bypass_file_store, + ) + visitSteps( + tool, + functools.partial( + visit_files, + file_convert_function, + fs_access, + fileindex, + existing, + mark_broken=True, + skip_remote=self.skip_remote, + bypass_file_store=self.bypass_file_store, + ), ) - self.addChild(root_job) - root_job.cwljob = initialized_job_order + # We always expect to have processed all files that exist + for param_name, param_value in initialized_job_order.items(): + # Loop through all the parameters for the workflow overall. + # Drop any files that aren't either imported (for when we use + # the file store) or available on disk (for when we don't). + # This will properly make them cause an error later if they + # were required. + rm_unprocessed_secondary_files(param_value) - return root_job.rv() + return tool, initialized_job_order -class CWLImportJob(CWLNamedJob): +class CWLImportWrapper(CWLNamedJob): """ - Job to do file imports on a worker instead of a leader. Assumes all local and cloud files are accessible. + Job to organize importing files on workers instead of the leader. Responsible for extracting filenames and metadata, + calling ImportsJob, applying imports to the job objects, and scheduling the start workflow job This class is only used when runImportsOnWorkers is enabled. """ @@ -3494,25 +3607,115 @@ def __init__( runtime_context: cwltool.context.RuntimeContext, options: Namespace, ): - super().__init__(local=False, disk=options.import_workers_disk) + super().__init__(local=False, disk=options.import_workers_threshold) self.initialized_job_order = initialized_job_order self.tool = tool self.options = options self.runtime_context = runtime_context def run(self, file_store: AbstractFileStore) -> Any: - """ - Import the workflow inputs and then create and run the workflow. 
- :return: Promise of workflow outputs - """ - import_workflow_inputs( - file_store.jobStore, self.options, self.initialized_job_order, self.tool + filenames = extract_workflow_inputs( + self.options, self.initialized_job_order, self.tool ) - setup_job = CWLSetupJob( - self.initialized_job_order, self.tool, self.runtime_context + file_to_data = get_file_sizes( + filenames, file_store.jobStore, include_remote_files=self.options.reference_inputs + ) + imports_job = ImportsJob(file_to_data, self.options.import_workers_threshold, self.options.import_workers_disk) + self.addChild(imports_job) + install_imports_job = CWLInstallImportsJob( + initialized_job_order=self.initialized_job_order, + tool=self.tool, + basedir=self.options.basedir, + skip_remote=self.options.reference_inputs, + bypass_file_store=self.options.bypass_file_store, + import_data=imports_job.rv(0), + ) + self.addChild(install_imports_job) + imports_job.addFollowOn(install_imports_job) + + start_job = CWLStartJob( + install_imports_job.rv(0), install_imports_job.rv(1), runtime_context=self.runtime_context ) - self.addChild(setup_job) - return setup_job.rv() + self.addChild(start_job) + install_imports_job.addFollowOn(start_job) + + return start_job.rv() + + +class CWLStartJob(CWLNamedJob): + """ + Job responsible for starting the CWL workflow. + + Takes in the workflow/tool and inputs after all files are imported + and creates jobs to run those workflows. + """ + + def __init__( + self, + tool: Promised[Process], + initialized_job_order: Promised[CWLObjectType], + runtime_context: cwltool.context.RuntimeContext, + **kwargs: Any, + ) -> None: + super().__init__(**kwargs) + self.tool = tool + self.initialized_job_order = initialized_job_order + self.runtime_context = runtime_context + + def run(self, file_store: AbstractFileStore) -> Any: + initialized_job_order = unwrap(self.initialized_job_order) + tool = unwrap(self.tool) + cwljob, _ = makeJob( + tool, initialized_job_order, self.runtime_context, None, None + ) # toplevel, no name needed + cwljob.cwljob = initialized_job_order + self.addChild(cwljob) + return cwljob.rv() + + +def extract_workflow_inputs( + options: Namespace, initialized_job_order: CWLObjectType, tool: Process +) -> list[str]: + """ + Collect all the workflow input files to import later. + :param options: namespace + :param initialized_job_order: cwl object + :param tool: tool object + :return: + """ + fileindex: dict[str, str] = {} + existing: dict[str, str] = {} + + # Extract out all the input files' filenames + logger.info("Collecting input files...") + fs_access = ToilFsAccess(options.basedir) + filenames = visit_files( + extract_file_uri_once, + fs_access, + fileindex, + existing, + initialized_job_order, + mark_broken=True, + skip_remote=options.reference_inputs, + bypass_file_store=options.bypass_file_store, + ) + # Extract filenames of all the files associated with tools (binaries, etc.). 
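The wiring in `CWLImportWrapper.run` above follows Toil's usual promise-chaining pattern: a child job produces a value, a sibling consumes it through `.rv()`, and an `addFollowOn` edge guarantees ordering. A stripped-down sketch of that pattern, with illustrative class names rather than the ones from this patch:

```python
from toil.job import Job


class ProduceMapping(Job):
    def run(self, file_store):
        # Stands in for ImportsJob: return whatever downstream jobs need.
        return {"input.txt": "some-file-id"}


class ConsumeMapping(Job):
    def __init__(self, mapping):
        super().__init__()
        # May be a Promise at construction time; Toil resolves it before run().
        self.mapping = mapping

    def run(self, file_store):
        # Stands in for CWLInstallImportsJob: use the producer's result.
        return len(self.mapping)


class Wrapper(Job):
    def run(self, file_store):
        producer = ProduceMapping()
        consumer = ConsumeMapping(producer.rv())
        self.addChild(producer)
        self.addChild(consumer)
        # The follow-on edge makes the consumer wait for the producer.
        producer.addFollowOn(consumer)
        return consumer.rv()
```

This mirrors how `imports_job.rv(0)` is handed to `CWLInstallImportsJob`, and how `install_imports_job.rv(0)` and `.rv(1)` feed `CWLStartJob`.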
+ logger.info("Collecting tool-associated files...") + tool_filenames = visitSteps( + tool, + functools.partial( + visit_files, + extract_file_uri_once, + fs_access, + fileindex, + existing, + mark_broken=True, + skip_remote=options.reference_inputs, + bypass_file_store=options.bypass_file_store, + ), + ) + filenames.extend(tool_filenames) + return [file for file in filenames if file is not None] def import_workflow_inputs( @@ -3520,25 +3723,34 @@ def import_workflow_inputs( options: Namespace, initialized_job_order: CWLObjectType, tool: Process, + log_level: int = logging.DEBUG, ) -> None: + """ + Import all workflow inputs on the leader. + + Ran when not importing on workers. + :param jobstore: Toil jobstore + :param options: Namespace + :param initialized_job_order: CWL object + :param tool: CWL tool + :param log_level: log level + :return: + """ fileindex: dict[str, str] = {} existing: dict[str, str] = {} + # Define something we can call to import a file and get its file # ID. - # We cast this because import_file is overloaded depending on if we - # pass a shared file name or not, and we know the way we call it we - # always get a FileID out. - input_import_function = cast( - Callable[[str], FileID], - functools.partial(jobstore.import_file, symlink=True), - ) - + def file_import_function(url: str) -> FileID: + logger.log(log_level, "Loading %s...", url) + return jobstore.import_file(url, symlink=True) + import_function = functools.partial(extract_and_convert_file_to_toil_uri, file_import_function) # Import all the input files, some of which may be missing optional # files. logger.info("Importing input files...") fs_access = ToilFsAccess(options.basedir) - import_files( - input_import_function, + visit_files( + import_function, fs_access, fileindex, existing, @@ -3546,7 +3758,6 @@ def import_workflow_inputs( mark_broken=True, skip_remote=options.reference_inputs, bypass_file_store=options.bypass_file_store, - log_level=logging.INFO, ) # Make another function for importing tool files. This one doesn't allow @@ -3564,7 +3775,7 @@ def import_workflow_inputs( visitSteps( tool, functools.partial( - import_files, + visit_files, tool_import_function, fs_access, fileindex, @@ -3572,7 +3783,6 @@ def import_workflow_inputs( mark_broken=True, skip_remote=options.reference_inputs, bypass_file_store=options.bypass_file_store, - log_level=logging.INFO, ), ) @@ -3586,10 +3796,11 @@ def import_workflow_inputs( rm_unprocessed_secondary_files(param_value) +T = TypeVar("T") def visitSteps( cmdline_tool: Process, - op: Callable[[CommentedMap], None], -) -> None: + op: Callable[[CommentedMap], list[T]], +) -> list[T]: """ Iterate over a CWL Process object, running the op on each tool description CWL object. @@ -3598,13 +3809,15 @@ def visitSteps( # For workflows we need to dispatch on steps for step in cmdline_tool.steps: # Handle the step's tool - op(step.tool) + ret = op(step.tool) # Recures on the embedded tool; maybe it's a workflow. - visitSteps(step.embedded_tool, op) + recurse_ret = visitSteps(step.embedded_tool, op) + ret.extend(recurse_ret) + return ret elif isinstance(cmdline_tool, cwltool.process.Process): # All CWL Process objects (including CommandLineTool) will have tools # if they bothered to run the Process __init__. 
- op(cmdline_tool.tool) + return op(cmdline_tool.tool) else: raise RuntimeError( f"Unsupported type encountered in workflow " @@ -3612,6 +3825,7 @@ def visitSteps( ) + def rm_unprocessed_secondary_files(job_params: Any) -> None: if isinstance(job_params, list): for j in job_params: @@ -3906,13 +4120,6 @@ def main(args: Optional[list[str]] = None, stdout: TextIO = sys.stdout) -> int: options = get_options(args) - # Take care of incompatible arguments related to file imports - if options.run_imports_on_workers is True and options.import_workers_disk is None: - logger.error( - "Commandline arguments --runImportsOnWorkers and --importWorkersDisk must both be set to run file imports on workers." - ) - return 1 - # Do cwltool setup cwltool.main.setup_schema(args=options, custom_schema_callback=None) tmpdir_prefix = options.tmpdir_prefix = ( diff --git a/src/toil/job.py b/src/toil/job.py index 2a7e56c6b1..3f0226899b 100644 --- a/src/toil/job.py +++ b/src/toil/job.py @@ -11,6 +11,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from __future__ import annotations + import collections import copy import importlib @@ -25,26 +27,37 @@ import uuid from abc import ABCMeta, abstractmethod from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser, Namespace -from collections.abc import Iterator, Mapping, Sequence from contextlib import contextmanager from io import BytesIO from typing import ( TYPE_CHECKING, Any, Callable, - Literal, + Dict, + Iterator, + List, + Mapping, NamedTuple, Optional, - TypedDict, + Sequence, + Tuple, TypeVar, Union, cast, overload, + TypedDict, + Literal, ) +from urllib.error import HTTPError +from urllib.parse import urlsplit, unquote, urljoin + +from toil import memoize import dill from configargparse import ArgParser +from toil.lib.io import is_remote_url + if sys.version_info < (3, 11): from typing_extensions import NotRequired else: @@ -61,10 +74,14 @@ from toil.resource import ModuleDescriptor from toil.statsAndLogging import set_logging_from_options +from toil.lib.exceptions import UnimplementedURLException + if TYPE_CHECKING: from optparse import OptionParser - from toil.batchSystems.abstractBatchSystem import BatchJobExitReason + from toil.batchSystems.abstractBatchSystem import ( + BatchJobExitReason + ) from toil.fileStores.abstractFileStore import AbstractFileStore from toil.jobStores.abstractJobStore import AbstractJobStore @@ -3758,6 +3775,350 @@ def getUserScript(self): return self.serviceModule +class FileMetadata(NamedTuple): + """ + Metadata for a file. + source is the URL to grab the file from + parent_dir is parent directory of the source + size is the size of the file. Is none if the filesize cannot be retrieved. + """ + + source: str + parent_dir: str + size: Optional[int] + + +def potential_absolute_uris( + uri: str, + path: list[str], + importer: Optional[str] = None, + execution_dir: Optional[str] = None, +) -> Iterator[str]: + """ + Get potential absolute URIs to check for an imported file. + + Given a URI or bare path, yield in turn all the URIs, with schemes, where we + should actually try to find it, given that we want to search under/against + the given paths or URIs, the current directory, and the given importing WDL + document if any. + """ + + if uri == "": + # Empty URIs can't come from anywhere. + return + + # We need to brute-force find this URI relative to: + # + # 1. Itself if a full URI. + # + # 2. 
Importer's URL, if importer is a URL and this is a + # host-root-relative URL starting with / or scheme-relative + # starting with //, or just plain relative. + # + # 3. Current directory, if a relative path. + # + # 4. All the prefixes in "path". + # + # If it can't be found anywhere, we ought to (probably) throw + # FileNotFoundError like the MiniWDL implementation does, with a + # correct errno. + # + # To do this, we have AbstractFileStore.read_from_url, which can read a + # URL into a binary-mode writable, or throw some kind of unspecified + # exception if the source doesn't exist or can't be fetched. + + # This holds scheme-applied full URIs for all the places to search. + full_path_list = [] + + if importer is not None: + # Add the place the imported file came form, to search first. + full_path_list.append(Toil.normalize_uri(importer)) + + # Then the current directory. We need to make sure to include a filename component here or it will treat the current directory with no trailing / as a document and relative paths will look 1 level up. + # When importing on a worker, the cwd will be a tmpdir and will result in FileNotFoundError after os.path.abspath, so override with the execution dir + full_path_list.append(Toil.normalize_uri(execution_dir or ".") + "/.") + + # Then the specified paths. + # TODO: + # https://github.com/chanzuckerberg/miniwdl/blob/e3e8ef74e80fbe59f137b0ad40b354957915c345/WDL/Tree.py#L1479-L1482 + # seems backward actually and might do these first! + full_path_list += [Toil.normalize_uri(p) for p in path] + + # This holds all the URIs we tried and failed with. + failures: set[str] = set() + + for candidate_base in full_path_list: + # Try fetching based off each base URI + candidate_uri = urljoin(candidate_base, uri) + if candidate_uri in failures: + # Already tried this one, maybe we have an absolute uri input. + continue + logger.debug( + "Consider %s which is %s off of %s", candidate_uri, uri, candidate_base + ) + + # Try it + yield candidate_uri + # If we come back it didn't work + failures.add(candidate_uri) + + +def get_file_sizes( + filenames: List[str], + file_source: AbstractJobStore, + search_paths: Optional[List[str]] = None, + include_remote_files: bool = True, + execution_dir: Optional[str] = None, +) -> Dict[str, FileMetadata]: + """ + Resolve relative-URI files in the given environment and turn them into absolute normalized URIs. Returns a dictionary of the *string values* from the WDL file values + to a tuple of the normalized URI, parent directory ID, and size of the file. The size of the file may be None, which means unknown size. + + :param filenames: list of filenames to evaluate on + :param file_source: Context to search for files with + :param task_path: Dotted WDL name of the user-level code doing the + importing (probably the workflow name). + :param search_paths: If set, try resolving input location relative to the URLs or + directories in this list. + :param include_remote_files: If set, import files from remote locations. Else leave them as URI references. + """ + + @memoize + def get_filename_size(filename: str) -> FileMetadata: + tried = [] + for candidate_uri in potential_absolute_uris( + filename, + search_paths if search_paths is not None else [], + execution_dir=execution_dir, + ): + tried.append(candidate_uri) + try: + if not include_remote_files and is_remote_url(candidate_uri): + # Use remote URIs in place. But we need to find the one that exists. 
+ if not file_source.url_exists(candidate_uri): + # Wasn't found there + continue + + # Now we know this exists, so pass it through + # Get filesizes + filesize = file_source.get_size(candidate_uri) + + except UnimplementedURLException as e: + # We can't find anything that can even support this URL scheme. + # Report to the user, they are probably missing an extra. + logger.critical("Error: " + str(e)) + raise + except HTTPError as e: + # Something went wrong looking for it there. + logger.warning( + "Checked URL %s but got HTTP status %s", candidate_uri, e.code + ) + # Try the next location. + continue + except FileNotFoundError: + # Wasn't found there + continue + except Exception: + # Something went wrong besides the file not being found. Maybe + # we have no auth. + logger.error( + "Something went wrong when testing for existence of %s", + candidate_uri, + ) + raise + + # Work out what the basename for the file was + file_basename = os.path.basename(urlsplit(candidate_uri).path) + + if file_basename == "": + # We can't have files with no basename because we need to + # download them at that basename later in WDL. + raise RuntimeError( + f"File {candidate_uri} has no basename" + ) + + # Was actually found + if is_remote_url(candidate_uri): + # Might be a file URI or other URI. + # We need to make sure file URIs and local paths that point to + # the same place are treated the same. + parsed = urlsplit(candidate_uri) + if parsed.scheme == "file:": + # This is a local file URI. Convert to a path for source directory tracking. + parent_dir = os.path.dirname(unquote(parsed.path)) + else: + # This is some other URL. Get the URL to the parent directory and use that. + parent_dir = urljoin(candidate_uri, ".") + else: + # Must be a local path + parent_dir = os.path.dirname(candidate_uri) + + return cast(FileMetadata, (candidate_uri, parent_dir, filesize)) + # Not found + raise RuntimeError( + f"Could not find {filename} at any of: {list(potential_absolute_uris(filename, search_paths if search_paths is not None else []))}" + ) + + return {k: get_filename_size(k) for k in filenames} + + +class CombineImportsJob(Job): + """ + Combine the outputs of multiple WorkerImportsJob into one promise + """ + + def __init__(self, d: Sequence[Promised[Dict[str, FileID]]], **kwargs): + """ + :param d: Sequence of dictionaries to merge + """ + self._d = d + super().__init__(**kwargs) + + def run(self, file_store: AbstractFileStore) -> Promised[Dict[str, FileID]]: + """ + Merge the dicts + """ + d = unwrap_all(self._d) + return {k: v for item in d for k, v in item.items()} + + +class WorkerImportJob(Job): + """ + Job to do file imports on a worker instead of a leader. Assumes all local and cloud files are accessible. + + For the CWL/WDL runners, this class is only used when runImportsOnWorkers is enabled. + """ + + def __init__( + self, + filenames: List[str], + **kwargs: Any + ): + """ + Setup importing files on a worker. + :param filenames: List of file URIs to import + :param kwargs: args for the superclass + """ + self.filenames = filenames + super().__init__(local=False, **kwargs) + + @staticmethod + def import_files( + files: List[str], file_source: "AbstractJobStore" + ) -> Dict[str, FileID]: + """ + Import a list of files into the jobstore. Returns a mapping of the filename to the associated FileIDs + + When stream is true but the import is not streamable, the worker will run out of + disk space and run a new import job with enough disk space instead. 
+        :param files: list of files to import
+        :param file_source: AbstractJobStore
+        :return: Dictionary mapping filenames to associated jobstore FileID
+        """
+        # todo: make the import ensure streaming is done instead of relying on running out of disk space
+        path_to_fileid = {}
+
+        @memoize
+        def import_filename(filename: str) -> Optional[FileID]:
+            return file_source.import_file(filename, symlink=True)
+
+        for file in files:
+            imported = import_filename(file)
+            if imported is not None:
+                path_to_fileid[file] = imported
+        return path_to_fileid
+
+    def run(self, file_store: AbstractFileStore) -> Promised[Dict[str, FileID]]:
+        """
+        Import the files this job was given.
+        :return: Mapping from each file URI to its imported FileID
+        """
+        return self.import_files(self.filenames, file_store.jobStore)
+
+
+class ImportsJob(Job):
+    """
+    Job to organize and delegate files to individual WorkerImportJobs.
+
+    For the CWL/WDL runners, this is only used when runImportsOnWorkers is enabled.
+    """
+
+    def __init__(
+        self,
+        file_to_data: Dict[str, FileMetadata],
+        max_batch_size: ParseableIndivisibleResource,
+        import_worker_disk: ParseableIndivisibleResource,
+        **kwargs: Any,
+    ):
+        """
+        Job to take the inputs for a workflow and import them on a worker instead of a leader. Assumes all local and cloud files are accessible.
+
+        This class is only used when runImportsOnWorkers is enabled.
+
+        :param file_to_data: mapping of file source name to file metadata
+        :param max_batch_size: maximum cumulative file size of a batched import
+        """
+        super().__init__(local=True, **kwargs)
+        self._file_to_data = file_to_data
+        self._max_batch_size = max_batch_size
+        self._import_worker_disk = import_worker_disk
+
+    def run(
+        self, file_store: AbstractFileStore
+    ) -> Tuple[Promised[Dict[str, FileID]], Dict[str, FileMetadata]]:
+        """
+        Schedule batched WorkerImportJobs for the given files and combine their results.
+        :return: Tuple of a mapping from the candidate uri to the file id and a mapping of the source filenames to their metadata.
The candidate uri is a field in the file metadata + """ + max_batch_size = self._max_batch_size + file_to_data = self._file_to_data + # Run WDL imports on a worker instead + + filenames = list(file_to_data.keys()) + + import_jobs = [] + + # This list will hold lists of batched filenames + file_batches = [] + + # List of filenames for each batch + per_batch_files = [] + per_batch_size = 0 + while len(filenames) > 0: + filename = filenames.pop(0) + # See if adding this to the queue will make the batch job too big + filesize = file_to_data[filename][2] + if per_batch_size + filesize >= max_batch_size: + # batch is too big now, store to schedule the batch + if len(per_batch_files) == 0: + # schedule the individual file + per_batch_files.append(filename) + file_batches.append(per_batch_files) + # reset batching calculation + per_batch_size = 0 + else: + per_batch_size += filesize + per_batch_files.append(filename) + + if per_batch_files: + file_batches.append(per_batch_files) + + # Create batch import jobs for each group of files + for batch in file_batches: + candidate_uris = [file_to_data[filename][0] for filename in batch] + import_jobs.append(WorkerImportJob(candidate_uris, disk=self._import_worker_disk)) + + for job in import_jobs: + self.addChild(job) + + combine_imports_job = CombineImportsJob([job.rv() for job in import_jobs]) + for job in import_jobs: + job.addFollowOn(combine_imports_job) + self.addChild(combine_imports_job) + + return combine_imports_job.rv(), file_to_data + + class Promise: """ References a return value from a method as a *promise* before the method itself is run. diff --git a/src/toil/jobStores/abstractJobStore.py b/src/toil/jobStores/abstractJobStore.py index dc3bdb2cf7..6e5f4a3b15 100644 --- a/src/toil/jobStores/abstractJobStore.py +++ b/src/toil/jobStores/abstractJobStore.py @@ -48,7 +48,7 @@ ) from toil.lib.ftp_utils import FtpFsAccess from toil.lib.compatibility import deprecated -from toil.lib.conversions import strtobool +from toil.lib.exceptions import UnimplementedURLException from toil.lib.io import WriteWatchingStream from toil.lib.memoize import memoize from toil.lib.retry import ErrorCondition, retry @@ -85,23 +85,6 @@ def __init__(self, url: ParseResult) -> None: super().__init__("The URL '%s' is invalid." % url.geturl()) -class UnimplementedURLException(RuntimeError): - def __init__(self, url: ParseResult, operation: str) -> None: - """ - Make a new exception to report that a URL scheme is not implemented, or - that the implementation can't be loaded because its dependencies are - not installed. - - :param url: The given URL - :param operation: Whether we are trying to 'import' or 'export' - """ - super().__init__( - f"No available job store implementation can {operation} the URL " - f"'{url.geturl()}'. Ensure Toil has been installed " - f"with the appropriate extras." - ) - - class NoSuchJobException(Exception): """Indicates that the specified job does not exist.""" diff --git a/src/toil/lib/exceptions.py b/src/toil/lib/exceptions.py index 8140838939..0083d3bf5a 100644 --- a/src/toil/lib/exceptions.py +++ b/src/toil/lib/exceptions.py @@ -15,6 +15,7 @@ # 5.14.2018: copied into Toil from https://github.com/BD2KGenomics/bd2k-python-lib import sys +from urllib.parse import ParseResult # TODO: isn't this built in to Python 3 now? 
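The batching loop in `ImportsJob.run` above groups files into `WorkerImportJob` batches by cumulative size, with `--importWorkersThreshold` (added in `options/runner.py` below) supplying `max_batch_size` and `--importWorkersDisk` sizing each worker. A simplified standalone sketch of the size-based grouping idea, written for clarity rather than copied verbatim from the loop above:

```python
def batch_by_size(sizes: dict[str, int], max_batch: int) -> list[list[str]]:
    """Group file names so each batch stays near the size threshold.

    A single file larger than max_batch still gets its own batch.
    """
    batches: list[list[str]] = []
    current: list[str] = []
    current_size = 0
    for name, size in sizes.items():
        if current and current_size + size >= max_batch:
            # This file would push the batch over the threshold:
            # close the current batch and start a new one.
            batches.append(current)
            current, current_size = [], 0
        current.append(name)
        current_size += size
    if current:
        batches.append(current)
    return batches


# With a 1000 MiB threshold, four 300 MiB files fall into two batches:
# batch_by_size({"a": 300, "b": 300, "c": 300, "d": 300}, 1000)
# returns [["a", "b", "c"], ["d"]]
```

On the command line this corresponds to something like `toil-cwl-runner --runImportsOnWorkers --importWorkersThreshold "1 GiB" --importWorkersDisk "8 GiB" ...` (values illustrative).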
@@ -61,3 +62,20 @@ def raise_(exc_type, exc_value, traceback) -> None: if exc.__traceback__ is not traceback: raise exc.with_traceback(traceback) raise exc + + +class UnimplementedURLException(RuntimeError): + def __init__(self, url: ParseResult, operation: str) -> None: + """ + Make a new exception to report that a URL scheme is not implemented, or + that the implementation can't be loaded because its dependencies are + not installed. + + :param url: The given URL + :param operation: Whether we are trying to 'import' or 'export' + """ + super().__init__( + f"No available job store implementation can {operation} the URL " + f"'{url.geturl()}'. Ensure Toil has been installed " + f"with the appropriate extras." + ) diff --git a/src/toil/lib/io.py b/src/toil/lib/io.py index 9bd74efa55..b99113c053 100644 --- a/src/toil/lib/io.py +++ b/src/toil/lib/io.py @@ -14,6 +14,47 @@ logger = logging.getLogger(__name__) +TOIL_URI_SCHEME = "toilfile:" + + +STANDARD_SCHEMES = ["http:", "https:", "s3:", "gs:", "ftp:"] +REMOTE_SCHEMES = STANDARD_SCHEMES + [TOIL_URI_SCHEME] +ALL_SCHEMES = REMOTE_SCHEMES + ["file:"] + +def is_standard_url(filename: str) -> bool: + return is_url_with_scheme(filename, STANDARD_SCHEMES) + +def is_remote_url(filename: str) -> bool: + """ + Decide if a filename is a known, non-file kind of URL + """ + return is_url_with_scheme(filename, REMOTE_SCHEMES) + +def is_any_url(filename: str) -> bool: + """ + Decide if a string is a URI like http:// or file://. + + Otherwise it might be a bare path. + """ + return is_url_with_scheme(filename, ALL_SCHEMES) + +def is_url_with_scheme(filename: str, schemes: list[str]) -> bool: + """ + Return True if filename is a URL with any of the given schemes and False otherwise. + """ + # TODO: "http:myfile.dat" is a valid filename and *not* a valid URL + for scheme in schemes: + if filename.startswith(scheme): + return True + return False + +def is_toil_url(filename: str) -> bool: + return is_url_with_scheme(filename, [TOIL_URI_SCHEME]) + +def is_file_url(filename: str) -> bool: + return is_url_with_scheme(filename, ["file:"]) + + def mkdtemp( suffix: Optional[str] = None, prefix: Optional[str] = None, diff --git a/src/toil/options/runner.py b/src/toil/options/runner.py index 88550360d4..858daca1be 100644 --- a/src/toil/options/runner.py +++ b/src/toil/options/runner.py @@ -25,14 +25,26 @@ def add_runner_options( help="Run the file imports on a worker instead of the leader. This is useful if the leader is not optimized for high network performance. " "If set to true, the argument --importWorkersDisk must also be set." ) - import_workers_disk_arguments = ["--importWorkersDisk"] + import_workers_threshold_argument = ["--importWorkersThreshold"] if cwl: - import_workers_disk_arguments.append("--import-workers-disk") + import_workers_threshold_argument.append("--import-workers-threshold") parser.add_argument( - *import_workers_disk_arguments, + *import_workers_threshold_argument, + dest="import_workers_threshold", + type=lambda x: human2bytes(str(x)), + default="1 GiB", + help="Specify the file size threshold that determines how many files go into a batched import. As many files will go into a batch import job until this threshold " + "is reached. This should be set in conjunction with the argument --runImportsOnWorkers." 
+ ) + import_workers_disk_argument = ["--importWorkersDisk"] + if cwl: + import_workers_disk_argument.append("--import-workers-disk") + parser.add_argument( + *import_workers_disk_argument, dest="import_workers_disk", type=lambda x: human2bytes(str(x)), - default=None, - help="Specify the amount of disk space an import worker will use. If file streaming for input files is not available, " - "this should be set to the size of the largest input file. This must be set in conjunction with the argument runImportsOnWorkers." + default="1 MiB", + help="Specify the disk size each import worker will get. This may be necessary when file streaming is not possible. For example, downloading from AWS to a GCE " + "job store. If specified, this should be set to the largest file size of all files to import. This should be set in conjunction with the arguments " + "--runImportsOnWorkers and --importWorkersThreshold." ) diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py index 2846ff0399..5f34031b7d 100755 --- a/src/toil/wdl/wdltoil.py +++ b/src/toil/wdl/wdltoil.py @@ -35,7 +35,24 @@ from contextlib import ExitStack, contextmanager from graphlib import TopologicalSorter from tempfile import mkstemp -from typing import IO, Any, Callable, Optional, Protocol, TypedDict, TypeVar, Union, cast +from typing import ( + Any, + Callable, + Dict, + Generator, + Iterable, + Iterator, + List, + Optional, + Sequence, + Tuple, + TypeVar, + Union, + cast, + TypedDict, + IO, + Protocol, +) if sys.version_info < (3, 11): from typing_extensions import NotRequired @@ -66,24 +83,28 @@ from toil.job import ( AcceleratorRequirement, Job, - ParseableIndivisibleResource, Promise, Promised, TemporaryID, parse_accelerator, unwrap, unwrap_all, + ParseableIndivisibleResource, + ImportsJob, + FileMetadata, + potential_absolute_uris, + get_file_sizes ) from toil.jobStores.abstractJobStore import ( AbstractJobStore, InvalidImportExportUrlException, LocatorException, - UnimplementedURLException, ) +from toil.lib.exceptions import UnimplementedURLException from toil.lib.accelerators import get_individual_local_accelerators from toil.lib.conversions import VALID_PREFIXES, convert_units, human2bytes +from toil.lib.io import mkdtemp, is_any_url, is_file_url, TOIL_URI_SCHEME, is_standard_url, is_toil_url, is_remote_url from toil.lib.integration import resolve_workflow -from toil.lib.io import mkdtemp from toil.lib.memoize import memoize from toil.lib.misc import get_user_name from toil.lib.resources import ResourceMonitor @@ -92,6 +113,16 @@ logger = logging.getLogger(__name__) + +# In regards to "toilfile:" URIs: +# We define a URI scheme kind of like but not actually compatible with the one +# we use for CWL. CWL brings along the file basename in its file type, but +# WDL.Value.File doesn't. So we need to make sure we stash that somewhere in +# the URI. +# TODO: We need to also make sure files from the same source directory end up +# in the same destination directory, when dealing with basename conflicts. + + # We want to use hashlib.file_digest to avoid a 3-line hashing loop like # MiniWDL has. But it is only in 3.11+ # @@ -115,17 +146,21 @@ class ReadableFileObj(Protocol): Would extend the protocol from Typeshed for hashlib but those are only declared for 3.11+. """ + def readinto(self, buf: bytearray, /) -> int: ... def readable(self) -> bool: ... def read(self, number: int) -> bytes: ... + class FileDigester(Protocol): """ Protocol for the features we need from hashlib.file_digest. 
""" + # We need __ prefixes here or the name of the argument becomes part of the required interface. def __call__(self, __f: ReadableFileObj, __alg_name: str) -> hashlib._Hash: ... + try: # Don't do a direct conditional import to the final name here because then # the polyfill needs *exactly* the signature of file_digest, and not just @@ -137,7 +172,8 @@ def __call__(self, __f: ReadableFileObj, __alg_name: str) -> hashlib._Hash: ... # # TODO: Change to checking sys.version_info because MyPy understands that # better? - from hashlib import file_digest as file_digest_impl # type: ignore[attr-defined,unused-ignore] + from hashlib import file_digest as file_digest_impl # type: ignore[attr-defined,unused-ignore] + file_digest: FileDigester = file_digest_impl except ImportError: # Polyfill file_digest from 3.11+ @@ -149,6 +185,7 @@ def file_digest_fallback_impl(f: ReadableFileObj, alg_name: str) -> hashlib._Has hasher.update(buffer) buffer = f.read(BUFFER_SIZE) return hasher + file_digest = file_digest_fallback_impl # WDL options to pass into the WDL jobs and standard libraries @@ -456,81 +493,6 @@ def first_mismatch(prefix: str, value: str) -> int: return modified -def potential_absolute_uris( - uri: str, - path: list[str], - importer: WDL.Tree.Document | None = None, - execution_dir: str | None = None, -) -> Iterator[str]: - """ - Get potential absolute URIs to check for an imported file. - - Given a URI or bare path, yield in turn all the URIs, with schemes, where we - should actually try to find it, given that we want to search under/against - the given paths or URIs, the current directory, and the given importing WDL - document if any. - """ - - if uri == "": - # Empty URIs can't come from anywhere. - return - - # We need to brute-force find this URI relative to: - # - # 1. Itself if a full URI. - # - # 2. Importer's URL, if importer is a URL and this is a - # host-root-relative URL starting with / or scheme-relative - # starting with //, or just plain relative. - # - # 3. Current directory, if a relative path. - # - # 4. All the prefixes in "path". - # - # If it can't be found anywhere, we ought to (probably) throw - # FileNotFoundError like the MiniWDL implementation does, with a - # correct errno. - # - # To do this, we have AbstractFileStore.read_from_url, which can read a - # URL into a binary-mode writable, or throw some kind of unspecified - # exception if the source doesn't exist or can't be fetched. - - # This holds scheme-applied full URIs for all the places to search. - full_path_list = [] - - if importer is not None: - # Add the place the imported file came form, to search first. - full_path_list.append(Toil.normalize_uri(importer.pos.abspath)) - - # Then the current directory. We need to make sure to include a filename component here or it will treat the current directory with no trailing / as a document and relative paths will look 1 level up. - # When importing on a worker, the cwd will be a tmpdir and will result in FileNotFoundError after os.path.abspath, so override with the execution dir - full_path_list.append(Toil.normalize_uri(execution_dir or ".") + "/.") - - # Then the specified paths. - # TODO: - # https://github.com/chanzuckerberg/miniwdl/blob/e3e8ef74e80fbe59f137b0ad40b354957915c345/WDL/Tree.py#L1479-L1482 - # seems backward actually and might do these first! - full_path_list += [Toil.normalize_uri(p) for p in path] - - # This holds all the URIs we tried and failed with. 
- failures: set[str] = set() - - for candidate_base in full_path_list: - # Try fetching based off each base URI - candidate_uri = urljoin(candidate_base, uri) - if candidate_uri in failures: - # Already tried this one, maybe we have an absolute uri input. - continue - logger.debug( - "Consider %s which is %s off of %s", candidate_uri, uri, candidate_base - ) - - # Try it - yield candidate_uri - # If we come back it didn't work - failures.add(candidate_uri) - - async def toil_read_source( uri: str, path: list[str], importer: WDL.Tree.Document | None ) -> ReadSourceResult: @@ -544,7 +506,7 @@ async def toil_read_source( # We track our own failures for debugging tried = [] - for candidate_uri in potential_absolute_uris(uri, path, importer): + for candidate_uri in potential_absolute_uris(uri, path, importer=importer.pos.abspath if importer else None): # For each place to try in order destination_buffer = io.BytesIO() logger.debug("Fetching %s", candidate_uri) @@ -806,16 +768,6 @@ def parse_disks( return specified_mount_point, part_size, part_suffix -# We define a URI scheme kind of like but not actually compatible with the one -# we use for CWL. CWL brings along the file basename in its file type, but -# WDL.Value.File doesn't. So we need to make sure we stash that somewhere in -# the URI. -# TODO: We need to also make sure files from the same source directory end up -# in the same destination directory, when dealing with basename conflicts. - -TOIL_URI_SCHEME = "toilfile:" - - def pack_toil_uri( file_id: FileID, task_path: str, dir_id: uuid.UUID, file_basename: str ) -> str: @@ -863,6 +815,7 @@ def unpack_toil_uri(toil_uri: str) -> tuple[FileID, str, str, str]: return file_id, task_path, parent_id, file_basename + ### # Caching machinery and file accessors ### @@ -872,6 +825,7 @@ def unpack_toil_uri(toil_uri: str) -> tuple[FileID, str, str, str]: # We store the shared FS path in an attribute on the WDL File. SHARED_PATH_ATTR = "_shared_fs_path" + def clone_metadata(old_file: WDL.Value.File, new_file: WDL.Value.File) -> None: """ Copy all Toil metadata from one WDL File to another. @@ -880,6 +834,7 @@ def clone_metadata(old_file: WDL.Value.File, new_file: WDL.Value.File) -> None: if hasattr(old_file, attribute): setattr(new_file, attribute, getattr(old_file, attribute)) + def set_file_value(file: WDL.Value.File, new_value: str) -> WDL.Value.File: """ Return a copy of a WDL File with all metadata intact but the value changed. @@ -889,6 +844,7 @@ def set_file_value(file: WDL.Value.File, new_value: str) -> WDL.Value.File: clone_metadata(file, new_file) return new_file + def set_file_nonexistent(file: WDL.Value.File, nonexistent: bool) -> WDL.Value.File: """ Return a copy of a WDL File with all metadata intact but the nonexistent flag set to the given value. @@ -898,13 +854,17 @@ def set_file_nonexistent(file: WDL.Value.File, nonexistent: bool) -> WDL.Value.F setattr(new_file, "nonexistent", nonexistent) return new_file + def get_file_nonexistent(file: WDL.Value.File) -> bool: """ Return the nonexistent flag for a file. """ return cast(bool, getattr(file, "nonexistent", False)) -def set_file_virtualized_value(file: WDL.Value.File, virtualized_value: str) -> WDL.Value.File: + +def set_file_virtualized_value( + file: WDL.Value.File, virtualized_value: str +) -> WDL.Value.File: """ Return a copy of a WDL File with all metadata intact but the virtualized_value attribute set to the given value. 
""" @@ -913,12 +873,14 @@ def set_file_virtualized_value(file: WDL.Value.File, virtualized_value: str) -> setattr(new_file, "virtualized_value", virtualized_value) return new_file + def get_file_virtualized_value(file: WDL.Value.File) -> Optional[str]: """ Get the virtualized storage location for a file. """ return cast(Optional[str], getattr(file, "virtualized_value", None)) + def get_shared_fs_path(file: WDL.Value.File) -> Optional[str]: """ If a File has a shared filesystem path, get that path. @@ -927,10 +889,13 @@ def get_shared_fs_path(file: WDL.Value.File) -> Optional[str]: """ if hasattr(file, SHARED_PATH_ATTR): result = cast(str, getattr(file, SHARED_PATH_ATTR)) - assert not result.startswith("file://"), f"Found URI shared FS path of {result} on {file}" + assert not result.startswith( + "file://" + ), f"Found URI shared FS path of {result} on {file}" return result return None + def set_shared_fs_path(file: WDL.Value.File, path: str) -> WDL.Value.File: """ Return a copy of the given File associated with the given shared filesystem path. @@ -938,28 +903,39 @@ def set_shared_fs_path(file: WDL.Value.File, path: str) -> WDL.Value.File: This should be the path it was initially imported from, or the path that it has in the call cache. """ # We should not have URLs here, only real paths. - assert not path.startswith("file://"), f"Cannot assign URI shared FS path of {path} to {file}" + assert not path.startswith( + "file://" + ), f"Cannot assign URI shared FS path of {path} to {file}" new_file = WDL.Value.File(file.value, file.expr) clone_metadata(file, new_file) setattr(new_file, SHARED_PATH_ATTR, path) return new_file -def view_shared_fs_paths(bindings: WDL.Env.Bindings[WDL.Value.Base]) -> WDL.Env.Bindings[WDL.Value.Base]: + +def view_shared_fs_paths( + bindings: WDL.Env.Bindings[WDL.Value.Base], +) -> WDL.Env.Bindings[WDL.Value.Base]: """ Given WDL bindings, return a copy where all files have their shared filesystem paths as their values. """ + def file_path_to_use(file: WDL.Value.File) -> WDL.Value.File: """ Return a File at the shared FS path if we have one, or the original File otherwise. """ shared_path = get_shared_fs_path(file) result_path = shared_path or file.value - assert not result_path.startswith("file://"), f"Found file URI {result_path} instead of a path for file {file}" + assert not result_path.startswith( + "file://" + ), f"Found file URI {result_path} instead of a path for file {file}" return set_file_value(file, result_path) return map_over_files_in_bindings(bindings, file_path_to_use) -def poll_execution_cache(node: Union[WDL.Tree.Workflow, WDL.Tree.Task], bindings: WDLBindings) -> tuple[WDLBindings | None, str]: + +def poll_execution_cache( + node: Union[WDL.Tree.Workflow, WDL.Tree.Task], bindings: WDLBindings +) -> tuple[WDLBindings | None, str]: """ Return the cached result of calling this workflow or task, and its key. @@ -971,12 +947,14 @@ def poll_execution_cache(node: Union[WDL.Tree.Workflow, WDL.Tree.Task], bindings transformed_bindings = view_shared_fs_paths(bindings) log_bindings(logger.debug, "Digesting input bindings:", [transformed_bindings]) input_digest = WDL.Value.digest_env(transformed_bindings) - cache_key=f"{node.name}/{node.digest}/{input_digest}" + cache_key = f"{node.name}/{node.digest}/{input_digest}" miniwdl_logger = logging.getLogger("MiniWDL") # TODO: Ship config from leader? It might not see the right environment. 
miniwdl_config = WDL.runtime.config.Loader(miniwdl_logger) miniwdl_cache = WDL.runtime.cache.new(miniwdl_config, miniwdl_logger) - cached_result: Optional[WDLBindings] = miniwdl_cache.get(cache_key, transformed_bindings, node.effective_outputs) + cached_result: Optional[WDLBindings] = miniwdl_cache.get( + cache_key, transformed_bindings, node.effective_outputs + ) if cached_result is not None: logger.info("Found call in cache") return cached_result, cache_key @@ -984,7 +962,15 @@ def poll_execution_cache(node: Union[WDL.Tree.Workflow, WDL.Tree.Task], bindings logger.debug("No cache hit for %s", cache_key) return None, cache_key -def fill_execution_cache(cache_key: str, output_bindings: WDLBindings, file_store: AbstractFileStore, wdl_options: WDLContext, miniwdl_logger: Optional[logging.Logger] = None, miniwdl_config: Optional[WDL.runtime.config.Loader] = None) -> WDLBindings: + +def fill_execution_cache( + cache_key: str, + output_bindings: WDLBindings, + file_store: AbstractFileStore, + wdl_options: WDLContext, + miniwdl_logger: Optional[logging.Logger] = None, + miniwdl_config: Optional[WDL.runtime.config.Loader] = None, +) -> WDLBindings: """ Cache the result of calling a workflow or task. @@ -1019,7 +1005,9 @@ def fill_execution_cache(cache_key: str, output_bindings: WDLBindings, file_stor # # In that case we just pout up with useless/unreferenced files in the # cache. - output_directory = os.path.join(miniwdl_cache._call_cache_dir, cache_key, str(uuid.uuid4())) + output_directory = os.path.join( + miniwdl_cache._call_cache_dir, cache_key, str(uuid.uuid4()) + ) # Adjust all files in the output bindings to have shared FS paths outside the job store. def assign_shared_fs_path(file: WDL.Value.File) -> WDL.Value.File: @@ -1040,7 +1028,9 @@ def assign_shared_fs_path(file: WDL.Value.File) -> WDL.Value.File: if virtualized is None: # TODO: If we're passing things around by URL reference and # some of them are file: is this actually allowed? - raise RuntimeError(f"File {file} caught escaping from task unvirtualized") + raise RuntimeError( + f"File {file} caught escaping from task unvirtualized" + ) # We need to save this file somewhere. # This needs to exist before we can export to it. And now we know @@ -1056,13 +1046,14 @@ def assign_shared_fs_path(file: WDL.Value.File) -> WDL.Value.File: wdl_options, devirtualized_to_virtualized, virtualized_to_devirtualized, - export=True + export=True, ) # Remember where it went file = set_shared_fs_path(file, exported_path) return file + output_bindings = map_over_files_in_bindings(output_bindings, assign_shared_fs_path) # Save the bindings to the cache, representing all files with their shared filesystem paths. 
@@ -1249,51 +1240,71 @@ def _call_eager( # Return the result as a WDL float value return WDL.Value.Float(total_size) -STANDARD_SCHEMES = ["http:", "https:", "s3:", "gs:", "ftp:"] -REMOTE_SCHEMES = STANDARD_SCHEMES + [TOIL_URI_SCHEME] -ALL_SCHEMES = REMOTE_SCHEMES + ["file:"] -def is_toil_url(filename: str) -> bool: - return is_url_with_scheme(filename, [TOIL_URI_SCHEME]) +def extract_workflow_inputs(environment: WDLBindings) -> list[str]: + filenames = list() -def is_file_url(filename: str) -> bool: - return is_url_with_scheme(filename, ["file:"]) + def add_filename(file: WDL.Value.File) -> WDL.Value.File: + filenames.append(file.value) + return file -def is_standard_url(filename: str) -> bool: - return is_url_with_scheme(filename, STANDARD_SCHEMES) + map_over_files_in_bindings(environment, add_filename) + return filenames -def is_remote_url(filename: str) -> bool: - """ - Decide if a filename is a known, non-file kind of URL +def convert_files( + environment: WDLBindings, + file_to_id: Dict[str, FileID], + file_to_data: Dict[str, FileMetadata], + task_path: str, +) -> WDLBindings: """ - return is_url_with_scheme(filename, REMOTE_SCHEMES) + Resolve relative-URI files in the given environment and convert each file value using the given mappings. -def is_any_url(filename: str) -> bool: - """ - Decide if a string is a URI like http:// or file://. + Returns new bindings in which every File is annotated with its imported Toil URI as its virtualized value. - Otherwise it might be a bare path. + :param environment: Bindings to evaluate on + :return: new bindings object """ - return is_url_with_scheme(filename, ALL_SCHEMES) + dir_ids = {t[1] for t in file_to_data.values()} + dir_to_id = {k: uuid.uuid4() for k in dir_ids} + + def convert_file_to_uri(file: WDL.Value.File) -> WDL.Value.File: + """ + Look up the already-imported FileID for this File and attach the corresponding Toil URI as its virtualized value. The original File is not mutated; a copy is returned. + """ + candidate_uri = file_to_data[file.value][0] + file_id = file_to_id[candidate_uri] + + # Work out what the basename for the file was + file_basename = os.path.basename(urlsplit(candidate_uri).path) + + if file_basename == "": + # We can't have files with no basename because we need to + # download them at that basename later. + raise RuntimeError( + f"File {candidate_uri} has no basename and so cannot be a WDL File" + ) + + toil_uri = pack_toil_uri( + file_id, task_path, dir_to_id[file_to_data[file.value][1]], file_basename + ) + + # Don't mutate the original file object + new_file = WDL.Value.File(file.value) + setattr(new_file, "virtualized_value", toil_uri) + return new_file + + return map_over_files_in_bindings(environment, convert_file_to_uri) -def is_url_with_scheme(filename: str, schemes: list[str]) -> bool: - """ - Return True if filename is a URL with any of the given schemes and False otherwise. - """ - # TODO: "http:myfile.dat" is a valid filename and *not* a valid URL - for scheme in schemes: - if filename.startswith(scheme): - return True - return False def convert_remote_files( environment: WDLBindings, file_source: AbstractJobStore, task_path: str, - search_paths: list[str] | None = None, + search_paths: Optional[list[str]] = None, import_remote_files: bool = True, - execution_dir: str | None = None, + execution_dir: Optional[str] = None, ) -> WDLBindings: """ Resolve relative-URI files in the given environment and import all files.
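Taken together with get_file_sizes and ImportsJob (wired up further down), the per-File transformation convert_files applies is roughly the following. This is a sketch for illustration only: the FileMetadata layout (resolved URI at index 0, parent directory at index 1) is inferred from the indexing above, and "inputs/sample.bam" is an invented example value.

    raw_value = "inputs/sample.bam"                       # what the WDL File currently holds
    candidate_uri = file_to_data[raw_value][0]            # resolved URI from get_file_sizes
    parent_dir = file_to_data[raw_value][1]               # grouping key for files from one directory
    toil_uri = pack_toil_uri(
        file_to_id[candidate_uri],                        # FileID produced by the import jobs
        task_path,                                        # namespace used in error messages
        dir_to_id[parent_dir],                            # one UUID per source directory
        os.path.basename(urlsplit(candidate_uri).path),   # basename the file must keep on export
    )
    # The File keeps its original value; the Toil URI rides along as "virtualized_value".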
@@ -1444,8 +1455,12 @@ def convert_file_to_uri(file: WDL.Value.File) -> WDL.Value.File: # Was actually found and imported assert candidate_uri is not None assert toil_uri is not None - new_file = set_file_virtualized_value(set_file_value(file, candidate_uri), toil_uri) - if candidate_uri is not None and (is_file_url(candidate_uri) or not is_any_url(candidate_uri)): + new_file = set_file_virtualized_value( + set_file_value(file, candidate_uri), toil_uri + ) + if candidate_uri is not None and ( + is_file_url(candidate_uri) or not is_any_url(candidate_uri) + ): # We imported a file so we have a local path assert candidate_uri is not None if is_file_url(candidate_uri): @@ -1571,8 +1586,12 @@ def _read( # but I need to virtualize as well, so I can't remove one or the other. def _f(file: WDL.Value.File) -> WDL.Value.Base: if get_file_virtualized_value(file) is None: - file = set_file_virtualized_value(file, self._virtualize_filename(file.value)) - with open(self._devirtualize_filename(get_file_virtualized_value(file)), "r") as infile: + file = set_file_virtualized_value( + file, self._virtualize_filename(file.value) + ) + with open( + self._devirtualize_filename(get_file_virtualized_value(file)), "r" + ) as infile: return parse(infile.read()) return _f @@ -1606,7 +1625,11 @@ def _devirtualize_file(self, file: WDL.Value.File) -> WDL.Value.File: if virtualized_filename is not None: devirtualized_path = self._devirtualize_filename(virtualized_filename) file = set_file_value(file, devirtualized_path) - logger.debug("For virtualized filename %s got devirtualized file %s", virtualized_filename, file) + logger.debug( + "For virtualized filename %s got devirtualized file %s", + virtualized_filename, + file, + ) else: logger.debug("File has no virtualized value so not changing value") return file @@ -1637,7 +1660,9 @@ def _virtualize_file( logger.debug("File appears nonexistent so marking it nonexistent") return set_file_nonexistent(file, True) virtualized_filename = self._virtualize_filename(file.value) - logger.debug('For file %s got virtualized filename %s', file, virtualized_filename) + logger.debug( + "For file %s got virtualized filename %s", file, virtualized_filename + ) marked_file = set_file_virtualized_value(file, virtualized_filename) return marked_file @@ -1712,9 +1737,7 @@ def _devirtualize_uri( file_id, dest_path, mutable=False, symlink=True ) else: - raise RuntimeError( - f"Unsupported file source: {file_source}" - ) + raise RuntimeError(f"Unsupported file source: {file_source}") else: # Download to a local file with the right name and execute bit. # Open it exclusively @@ -1904,6 +1927,7 @@ def _virtualize_filename(self, filename: str) -> str: self._virtualized_to_devirtualized[result] = abs_filename return result + class ToilWDLStdLibWorkflow(ToilWDLStdLibBase): """ Standard library implementation for workflow scope. @@ -1952,21 +1976,29 @@ def wrapper(v: WDL.Value.Base) -> WDL.Value.File: miniwdl_logger = logging.getLogger("MiniWDL") # TODO: Ship config from leader? It might not see the right environment. miniwdl_config = WDL.runtime.config.Loader(miniwdl_logger) - self._miniwdl_cache = WDL.runtime.cache.new(miniwdl_config, miniwdl_logger) + self._miniwdl_cache = WDL.runtime.cache.new( + miniwdl_config, miniwdl_logger + ) # TODO: If we did this before the _virtualize_filename call in the # base _write we wouldn't need to immediately devirtualize. But we # have internal caches to lean on. 
devirtualized_filename = self._devirtualize_filename(virtualized_file.value) # Hash the file to hex - hex_digest = file_digest(open(devirtualized_filename, "rb"), "sha256").hexdigest() + hex_digest = file_digest( + open(devirtualized_filename, "rb"), "sha256" + ).hexdigest() file_input_bindings = WDL.Env.Bindings( - WDL.Env.Binding("file_sha256", cast(WDL.Value.Base, WDL.Value.String(hex_digest))) + WDL.Env.Binding( + "file_sha256", cast(WDL.Value.Base, WDL.Value.String(hex_digest)) + ) ) # Make an environment of "file_sha256" to that as a WDL string, and # digest that, and make a write_ cache key. No need to transform to # shared FS paths sonce no paths are in it. - log_bindings(logger.debug, "Digesting file bindings:", [file_input_bindings]) + log_bindings( + logger.debug, "Digesting file bindings:", [file_input_bindings] + ) input_digest = WDL.Value.digest_env(file_input_bindings) file_cache_key = "write_/" + input_digest # Construct a description of the types we expect to get from the @@ -2007,7 +2039,7 @@ def wrapper(v: WDL.Value.Base) -> WDL.Value.File: self._wdl_options, {}, {}, - export=True + export=True, ) # Save the cache entry pointing to it @@ -2015,7 +2047,7 @@ def wrapper(v: WDL.Value.Base) -> WDL.Value.File: file_cache_key, WDL.Env.Bindings( WDL.Env.Binding("file", WDL.Value.File(exported_path)) - ) + ), ) # Apply the shared filesystem path to the virtualized file @@ -2559,6 +2591,7 @@ def add_paths(task_container: TaskContainer, host_paths: Iterable[str]) -> None: task_container.input_path_map[host_path] = container_path task_container.input_path_map_rev[container_path] = host_path + def drop_if_missing( file: WDL.Value.File, standard_library: ToilWDLStdLibBase ) -> WDL.Value.File | None: @@ -2721,7 +2754,9 @@ def map_over_typed_files_in_value( # This is a file so we need to process it orig_file_value = value.value new_file = transform(value) - assert value.value == orig_file_value, "Transformation mutated the original File" + assert ( + value.value == orig_file_value + ), "Transformation mutated the original File" if new_file is None: # Assume the transform checked types if we actually care about the # result. @@ -3052,7 +3087,11 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: # TODO: What if the same file is passed through several tasks, and # we get cache hits on those tasks? Won't we upload it several # times? 
- return self.postprocess(virtualize_files(cached_result, standard_library, enforce_existence=False)) + return self.postprocess( + virtualize_files( + cached_result, standard_library, enforce_existence=False + ) + ) if self._task.inputs: logger.debug("Evaluating task code") @@ -3563,7 +3602,9 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: # A current limitation with the singularity/miniwdl cache is it cannot check for image updates if the # filename is the same singularity_cache = os.path.join(os.path.expanduser("~"), ".singularity") - miniwdl_singularity_cache = os.path.join(os.path.expanduser("~"), ".cache/miniwdl") + miniwdl_singularity_cache = os.path.join( + os.path.expanduser("~"), ".cache/miniwdl" + ) # Cache Singularity's layers somewhere known to have space os.environ["SINGULARITY_CACHEDIR"] = os.environ.get( @@ -4002,7 +4043,14 @@ def get_path_in_container(file: WDL.Value.File) -> WDL.Value.File | None: if self._cache_key is not None: # We might need to save to the execution cache - output_bindings = fill_execution_cache(self._cache_key, output_bindings, file_store, self._wdl_options, miniwdl_logger=miniwdl_logger, miniwdl_config=miniwdl_config) + output_bindings = fill_execution_cache( + self._cache_key, + output_bindings, + file_store, + self._wdl_options, + miniwdl_logger=miniwdl_logger, + miniwdl_config=miniwdl_config, + ) # Do postprocessing steps to e.g. apply namespaces. output_bindings = self.postprocess(output_bindings) @@ -5019,7 +5067,11 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: # or calculated inputs filled in). cached_result, cache_key = poll_execution_cache(self._workflow, bindings) if cached_result is not None: - return self.postprocess(virtualize_files(cached_result, standard_library, enforce_existence=False)) + return self.postprocess( + virtualize_files( + cached_result, standard_library, enforce_existence=False + ) + ) if self._workflow.inputs: try: @@ -5046,7 +5098,7 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: sink.rv(), wdl_options=self._wdl_options, cache_key=cache_key, - local=True + local=True, ) sink.addFollowOn(outputs_job) # Caller is responsible for making sure namespaces are applied @@ -5167,7 +5219,9 @@ def run(self, file_store: AbstractFileStore) -> WDLBindings: ) if self._cache_key is not None: - output_bindings = fill_execution_cache(self._cache_key, output_bindings, file_store, self._wdl_options) + output_bindings = fill_execution_cache( + self._cache_key, output_bindings, file_store, self._wdl_options + ) return self.postprocess(output_bindings) @@ -5182,7 +5236,7 @@ class WDLStartJob(WDLSectionJob): def __init__( self, target: WDL.Tree.Workflow | WDL.Tree.Task, - inputs: WDLBindings, + inputs: Promised[WDLBindings], wdl_options: WDLContext, **kwargs: Any, ) -> None: @@ -5200,13 +5254,14 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: """ Actually build the subgraph. """ + inputs = unwrap(self._inputs) super().run(file_store) if isinstance(self._target, WDL.Tree.Workflow): # Create a workflow job. We rely in this to handle entering the input # namespace if needed, or handling free-floating inputs. job: WDLBaseJob = WDLWorkflowJob( self._target, - [self._inputs], + [inputs], [self._target.name], wdl_options=self._wdl_options, local=True, @@ -5215,7 +5270,7 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: # There is no workflow. Create a task job. 
job = WDLTaskWrapperJob( self._target, - [self._inputs], + [inputs], [self._target.name], wdl_options=self._wdl_options, local=True, @@ -5227,15 +5282,52 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: return job.rv() -class WDLImportJob(WDLSectionJob): +class WDLInstallImportsJob(Job): def __init__( self, - target: WDL.Tree.Workflow | WDL.Tree.Task, + task_path: str, + inputs: WDLBindings, + import_data: Promised[Tuple[Dict[str, FileID], Dict[str, FileMetadata]]], + **kwargs: Any, + ) -> None: + """ + Job that takes the WDL workflow inputs and a mapping of filenames to imported file IDs, + and converts all file locations in each binding to Toil URIs. + + This class is only used when runImportsOnWorkers is enabled. + """ + super().__init__(local=True, **kwargs) + self._import_data = import_data + self._inputs = inputs + self._task_path = task_path + + def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: + """ + Convert the filenames in the workflow inputs into Toil URIs. + :return: Promise of transformed workflow inputs + """ + candidate_to_fileid = unwrap(self._import_data)[0] + file_to_data = unwrap(self._import_data)[1] + return convert_files(self._inputs, candidate_to_fileid, file_to_data, self._task_path) + + +class WDLImportWrapper(WDLSectionJob): + """ + Job to organize importing files on workers instead of the leader. Responsible for extracting filenames and metadata, + calling ImportsJob, applying imports to input bindings, and scheduling the workflow start job. + + This class is only used when runImportsOnWorkers is enabled. + """ + + def __init__( + self, + target: Union[WDL.Tree.Workflow, WDL.Tree.Task], inputs: WDLBindings, wdl_options: WDLContext, - path: list[str] | None = None, - skip_remote: bool = False, - disk_size: ParseableIndivisibleResource | None = None, + inputs_search_path: list[str], + import_remote_files: bool, + import_workers_threshold: ParseableIndivisibleResource, + import_workers_disk: ParseableIndivisibleResource, **kwargs: Any, ): """ @@ -5243,30 +5335,38 @@ def __init__( This class is only used when runImportsOnWorkers is enabled. """ - super().__init__(wdl_options=wdl_options, local=False, disk=disk_size, **kwargs) - self._target = target + super().__init__(local=True, wdl_options=wdl_options, **kwargs) self._inputs = inputs - self._path = path - self._skip_remote = skip_remote + self._target = target + self._inputs_search_path = inputs_search_path + self._import_remote_files = import_remote_files + self._import_workers_threshold = import_workers_threshold + self._import_workers_disk = import_workers_disk def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: - """ - Import the workflow inputs and then create and run the workflow.
- :return: Promise of workflow outputs - """ - imported_inputs = convert_remote_files( - self._inputs, + filenames = extract_workflow_inputs(self._inputs) + file_to_data = get_file_sizes( + filenames, file_store.jobStore, - self._target.name, - self._path, - self._skip_remote, - self._wdl_options.get("execution_dir") + self._inputs_search_path, + include_remote_files=self._import_remote_files, + execution_dir=self._wdl_options.get("execution_dir") ) - root_job = WDLStartJob( - self._target, imported_inputs, wdl_options=self._wdl_options + imports_job = ImportsJob(file_to_data, self._import_workers_threshold, self._import_workers_disk) + self.addChild(imports_job) + install_imports_job = WDLInstallImportsJob( + self._target.name, self._inputs, imports_job.rv() + ) + self.addChild(install_imports_job) + imports_job.addFollowOn(install_imports_job) + + start_job = WDLStartJob( + self._target, install_imports_job.rv(), wdl_options=self._wdl_options ) - self.addChild(root_job) - return root_job.rv() + self.addChild(start_job) + install_imports_job.addFollowOn(start_job) + + return start_job.rv() def make_root_job( @@ -5278,14 +5378,14 @@ def make_root_job( options: Namespace, ) -> WDLSectionJob: if options.run_imports_on_workers: - # Run WDL imports on a worker instead - root_job: WDLSectionJob = WDLImportJob( + root_job: WDLSectionJob = WDLImportWrapper( target, inputs, wdl_options=wdl_options, - path=inputs_search_path, - skip_remote=options.reference_inputs, - disk_size=options.import_workers_disk, + inputs_search_path=inputs_search_path, + import_remote_files=options.reference_inputs, + import_workers_threshold=options.import_workers_threshold, + import_workers_disk=options.import_workers_disk ) else: # Run WDL imports on leader @@ -5319,12 +5419,6 @@ def main() -> None: # TODO: Move cwltoil's generate_default_job_store where we can use it options.jobStore = os.path.join(mkdtemp(), "tree") - # Take care of incompatible arguments related to file imports - if options.run_imports_on_workers is True and options.import_workers_disk is None: - raise RuntimeError( - "Commandline arguments --runImportsOnWorkers and --importWorkersDisk must both be set to run file imports on workers." - ) - # Having an nargs=? option can put a None in our inputs list, so drop that. input_sources = [x for x in options.inputs_uri if x is not None] if len(input_sources) > 1: @@ -5487,14 +5581,6 @@ def main() -> None: # Get the execution directory execution_dir = os.getcwd() - imported_inputs = convert_remote_files( - input_bindings, - toil._jobStore, - task_path=target.name, - search_paths=inputs_search_path, - import_remote_files=options.reference_inputs, - ) - # Configure workflow interpreter options wdl_options: WDLContext = { "execution_dir": execution_dir, @@ -5508,7 +5594,7 @@ def main() -> None: # Run the workflow and get its outputs namespaced with the workflow name. root_job = make_root_job( target, - imported_inputs, + input_bindings, inputs_search_path, toil, wdl_options,
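With these changes, when --runImportsOnWorkers is set the leader no longer calls convert_remote_files itself; the root job instead builds a small chain of jobs and passes promises between them. A condensed sketch of that wiring (names shortened, `wrapper` standing in for the WDLImportWrapper instance; the authoritative code is in WDLImportWrapper.run above):

    imports_job = ImportsJob(file_to_data, import_workers_threshold, import_workers_disk)
    install_job = WDLInstallImportsJob(target.name, inputs, imports_job.rv())
    start_job = WDLStartJob(target, install_job.rv(), wdl_options=wdl_options)

    wrapper.addChild(imports_job)
    wrapper.addChild(install_job)
    imports_job.addFollowOn(install_job)   # imports finish before bindings are rewritten
    wrapper.addChild(start_job)
    install_job.addFollowOn(start_job)     # rewritten bindings feed the workflow/task subgraph
    return start_job.rv()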