diff --git a/cdmtaskservice/app_state.py b/cdmtaskservice/app_state.py
index cf00a04..3a4df67 100644
--- a/cdmtaskservice/app_state.py
+++ b/cdmtaskservice/app_state.py
@@ -14,6 +14,7 @@ from fastapi import FastAPI, Request

 from cdmtaskservice import models
 from cdmtaskservice.config import CDMTaskServiceConfig
+from cdmtaskservice.coroutine_manager import CoroutineWrangler
 from cdmtaskservice.image_remote_lookup import DockerImageInfo
 from cdmtaskservice.images import Images
 from cdmtaskservice.jobflows.nersc_jaws import NERSCJAWSRunner
@@ -57,6 +58,7 @@ async def build_app(
     # This method is getting pretty long but it's stupid simple so...
     # May want to parallelize some of this for faster startups. would need to rework prints
     logr = logging.getLogger(__name__)
+    coman = await CoroutineWrangler.create()
     logr.info("Connecting to KBase auth service... ")
     auth = await KBaseAuth.create(
         cfg.auth_url,
@@ -102,7 +104,7 @@ async def build_app(
     runners = {models.Cluster.PERLMUTTER_JAWS: nerscjawsflow}
     imginfo = await DockerImageInfo.create(Path(cfg.crane_path).expanduser().absolute())
     images = Images(mongodao, imginfo)
-    job_submit = JobSubmit(mongodao, s3)
+    job_submit = JobSubmit(mongodao, s3, coman, runners)
     app.state._mongo = mongocli
     app.state._cdmstate = AppState(
         auth, sfapi_client, s3, job_submit, job_state, images, runners
diff --git a/cdmtaskservice/callback_url_paths.py b/cdmtaskservice/callback_url_paths.py
new file mode 100644
index 0000000..4cc6209
--- /dev/null
+++ b/cdmtaskservice/callback_url_paths.py
@@ -0,0 +1,20 @@
+"""
+A module for determining paths for callback URLs for the service.
+"""
+
+
+_CALLBACK = "callback"
+_DOWNLOAD_COMPLETE = "download"
+
+
+def get_download_complete_callback(root_url: str = None, job_id: str = None):
+    """
+    Get a url or path for a service callback to communicate that a download is complete.
+
+    root_url - prepend the path with the given root url.
+    job_id - suffix the path with a job ID.
+    """
+    cb = [root_url] if root_url else []
+    cb += [_CALLBACK, _DOWNLOAD_COMPLETE]
+    cb += [job_id] if job_id else []
+    return "/".join(cb)
diff --git a/cdmtaskservice/job_submit.py b/cdmtaskservice/job_submit.py
index 2ecce30..605a54c 100644
--- a/cdmtaskservice/job_submit.py
+++ b/cdmtaskservice/job_submit.py
@@ -3,10 +3,12 @@
 """

 import uuid
+from typing import Any

 from cdmtaskservice import kb_auth
 from cdmtaskservice import models
 from cdmtaskservice.arg_checkers import not_falsy as _not_falsy
+from cdmtaskservice.coroutine_manager import CoroutineWrangler
 from cdmtaskservice.image_remote_lookup import parse_image_name
 from cdmtaskservice.mongo import MongoDAO
 from cdmtaskservice.s3.client import S3Client
@@ -18,13 +20,23 @@ class JobSubmit:
     A manager for submitting CDM jobs.
     """

-    def __init__(self, mongo: MongoDAO, s3client: S3Client):
+    def __init__(
+        self,
+        mongo: MongoDAO,
+        s3client: S3Client,
+        coro_manager: CoroutineWrangler,
+        job_runners: dict[models.Cluster, Any],  # Make abstract class if necessary
+    ):
         """
         mongo - a MongoDB DAO object.
         s3Client - an S3Client pointed at the S3 storage system to use.
+        coro_manager - a coroutine manager.
+        job_runners - a mapping of remote compute cluster to the job runner for that cluster.
+        """
         self._s3 = _not_falsy(s3client, "s3client")
         self._mongo = _not_falsy(mongo, "mongo")
+        self._coman = _not_falsy(coro_manager, "coro_manager")
+        self._runners = _not_falsy(job_runners, "job_runners")

     async def submit(self, job_input: models.JobInput, user: kb_auth.KBaseUser) -> str:
         """
@@ -76,6 +88,7 @@ async def submit(self, job_input: models.JobInput, user: kb_auth.KBaseUser) -> s
         )
         # TDDO JOBSUBMIT if reference data is required, is it staged?
         await self._mongo.save_job(job)
+        await self._coman.run_coroutine(self._runners[job.job_input.cluster].start_job(job))
         return job_id
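
Reviewer note: `submit()` above persists the job and then hands the runner's `start_job` coroutine to the `CoroutineWrangler`, so the HTTP request returns without waiting on NERSC. The wrangler itself is not part of this diff; the sketch below is only an assumption of roughly what its fire-and-forget contract must look like (the class and helper names here are hypothetical — only `CoroutineWrangler.create()` and `run_coroutine()` appear in the actual code).

```python
import asyncio
import logging


class CoroutineWranglerSketch:
    """Hypothetical stand-in; the real CoroutineWrangler is not in this diff."""

    def __init__(self):
        # Keep strong references so scheduled tasks aren't garbage collected.
        self._tasks: set[asyncio.Task] = set()

    async def run_coroutine(self, coro):
        # Schedule the coroutine and return immediately (fire and forget).
        task = asyncio.create_task(coro)
        self._tasks.add(task)
        task.add_done_callback(self._on_done)

    def _on_done(self, task: asyncio.Task):
        self._tasks.discard(task)
        # Surface failures somewhere, since no caller will ever await this task.
        if not task.cancelled() and task.exception():
            logging.getLogger(__name__).error(
                "Background coroutine failed", exc_info=task.exception()
            )
```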
""" self._s3 = _not_falsy(s3client, "s3client") self._mongo = _not_falsy(mongo, "mongo") + self._coman = _not_falsy(coro_manager, "coro_manager") + self._runners = _not_falsy(job_runners, "job_runners") async def submit(self, job_input: models.JobInput, user: kb_auth.KBaseUser) -> str: """ @@ -76,6 +88,7 @@ async def submit(self, job_input: models.JobInput, user: kb_auth.KBaseUser) -> s ) # TDDO JOBSUBMIT if reference data is required, is it staged? await self._mongo.save_job(job) + await self._coman.run_coroutine(self._runners[job.job_input.cluster].start_job(job)) return job_id diff --git a/cdmtaskservice/jobflows/nersc_jaws.py b/cdmtaskservice/jobflows/nersc_jaws.py index 975a9b8..a7cc39f 100644 --- a/cdmtaskservice/jobflows/nersc_jaws.py +++ b/cdmtaskservice/jobflows/nersc_jaws.py @@ -2,10 +2,15 @@ Manages running jobs at NERSC using the JAWS system. """ +import logging + +from cdmtaskservice import models from cdmtaskservice.arg_checkers import not_falsy as _not_falsy, require_string as _require_string from cdmtaskservice.job_state import JobState from cdmtaskservice.nersc.manager import NERSCManager from cdmtaskservice.s3.client import S3Client +from cdmtaskservice.s3.paths import S3Paths +from cdmtaskservice.callback_url_paths import get_download_complete_callback # Not sure how other flows would work and how much code they might share. For now just make # this work and pull it apart / refactor later. @@ -50,3 +55,45 @@ def __init__( self._jtoken = _require_string(jaws_token, "jaws_token") self._jgroup = _require_string(jaws_group, "jaws_group") self._callback_root = _require_string(service_root_url, "service_root_url") + + async def start_job(self, job: models.Job): + """ + Start running a job. It is expected that the Job has been persisted to the data + storage system and is in the created state. It is further assumed that the input files + are of the S3File type, not bare strings. + """ + if _not_falsy(job, "job").state != models.JobState.CREATED: + raise ValueError("job must be in the created state") + logr = logging.getLogger(__name__) + # TODO PERF this validates the file paths yet again. Maybe the way to go is just have + # a validate method on S3Paths which can be called or not as needed, with + # a validated state boolean + paths = S3Paths([p.file for p in job.job_input.input_files]) + try: + # This is making an S3 call again after the call in job_submit. If that's too expensive + # pass in the S3meta as well + # TODO PERF config / set concurrency + # TODO NOW pass this in to avoid race conditions w/ etags + meta = await self._s3.get_object_meta(paths) + # TODO RELIABILITY config / set expiration time + presigned = await self._s3ext.presign_get_urls(paths) + callback_url = get_download_complete_callback(self._callback_root, job.id) + # TODO PERF config / set concurrency + # TODO PERF CACHING if the same files are used for a different job they're d/ld again + # Instead d/l to a shared file location by etag or something + # Need to ensure atomic writes otherwise 2 processes trying to + # d/l the same file could corrupt it + # Either make cache in JAWS staging area, in which case files + # will be deleted automatically by JAWS, or need own file deletion + # TODO DISKSPACE will need to clean up job downloads @ NERSC + # TODO LOGGING make the remote code log summary of results and upload and store + # TODO NOW how get remote paths at next step? 
diff --git a/cdmtaskservice/nersc/manager.py b/cdmtaskservice/nersc/manager.py
index 11ff8fc..1382629 100644
--- a/cdmtaskservice/nersc/manager.py
+++ b/cdmtaskservice/nersc/manager.py
@@ -28,6 +28,11 @@

 # TODO CLEANUP clean up old code versions @NERSC somehow. Not particularly important

+_DT_TARGET = Machine.dtns
+# TODO NERSCUPDATE delete the following line when DTN downloads work.
+# See https://nersc.servicenowservices.com/sp?sys_id=ad33e85f1b5a5610ac81a820f54bcba0&view=sp&id=ticket&table=incident
+_DT_TARGET = Machine.perlmutter
+
 _COMMAND_PATH = "utilities/command"

 _MIN_TIMEOUT_SEC = 300
@@ -125,7 +130,7 @@ async def _setup_remote_code(self):
         # TODO RELIABILITY atomically write files. For these small ones probably doesn't matter?
         cli = self._client_provider()
         perlmutter = await cli.compute(Machine.perlmutter)
-        dtns = await cli.compute(Machine.dtns)
+        dt = await cli.compute(_DT_TARGET)
         async with asyncio.TaskGroup() as tg:
             for mod in _CTS_DEPENDENCIES:
                 target = self._nersc_code_path
@@ -142,7 +147,7 @@ async def _setup_remote_code(self):
                     bio=io.BytesIO(_PROCESS_DATA_XFER_MANIFEST.encode()),
                     make_exe=True,
                 ))
-            res = tg.create_task(dtns.run('bash -c "echo $SCRATCH"'))
+            res = tg.create_task(dt.run('bash -c "echo $SCRATCH"'))
         if _PIP_DEPENDENCIES:
             deps = " ".join(
                 # may need to do something else if module doesn't have __version__
@@ -252,9 +257,9 @@ async def _process_manifest(
     ):
         path = self._dtn_scratch / _CTS_SCRATCH_ROOT_DIR / job_id / filename
         cli = self._client_provider()
-        dtn = await cli.compute(Machine.dtns)
+        dt = await cli.compute(_DT_TARGET)
         # TODO CLEANUP manifests after some period of time
-        await self._upload_file_to_nersc(dtn, path, bio=manifest)
+        await self._upload_file_to_nersc(dt, path, bio=manifest)
         command = (
             "bash -c '"
             + f"export CTS_CODE_LOCATION={self._nersc_code_path}; "
@@ -264,7 +269,7 @@ async def _process_manifest(
             + f'"$CTS_CODE_LOCATION"/{_PROCESS_DATA_XFER_MANIFEST_FILENAME}'
             + "'"
         )
-        task_id = (await self._run_command(cli, Machine.dtns, command))["task_id"]
+        task_id = (await self._run_command(cli, _DT_TARGET, command))["task_id"]
         # TODO LOGGING figure out how to handle logging, see other logging todos
         logging.getLogger(__name__).info(
             f"Created {task_type} task with id {task_id} for job {job_id}")
diff --git a/cdmtaskservice/nersc/remote.py b/cdmtaskservice/nersc/remote.py
index 0beb65d..6706305 100644
--- a/cdmtaskservice/nersc/remote.py
+++ b/cdmtaskservice/nersc/remote.py
@@ -30,13 +30,16 @@ def process_data_transfer_manifest(manifest_file_path: str, callback_url: str):
     # TODO TEST add tests for this and its dependency functions, including logging
     # Potential performance improvement could include a shared cross job cache for files
     # only useful if jobs are reusing the same files, whcih seems def possible
+    # TODO LOGGING logging doesn't work
+    # TODO IMPORTANT ERRORHANDLING write an output file that can be read by the CTS
+    #      sfapi tasks only last for 10 minutes after completion
     log = logging.getLogger(__name__)
     try:
         with open(manifest_file_path) as f:
             manifest = json.load(f)
         asyncio.run(s3_pdtm(manifest["file-transfers"]))
     finally:
-        log.info(f"pinging callback url {callback_url}")
+        log.info(f"Pinging callback url {callback_url}")
         ret = requests.get(callback_url)
         if ret.status_code < 200 or ret.status_code > 299:
             log.error(ret.text)
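
Reviewer note: for the TODO IMPORTANT ERRORHANDLING item above — since sfapi task records expire about 10 minutes after completion, the remote script could persist its outcome to a file the CTS pulls later. A minimal sketch, assuming a JSON result file whose path and schema are invented here for illustration:

```python
import json
import traceback


def write_result_file(result_path: str, error: Exception | None = None):
    # Persist the outcome so the CTS can recover it after the sfapi task record
    # (which only lives ~10 minutes past completion) is gone.
    result = {"state": "error" if error else "complete"}
    if error is not None:
        result["error"] = "".join(traceback.format_exception(error))
    with open(result_path, "w") as f:
        json.dump(result, f)
```

The remote side would call this in the `finally` block alongside the callback ping, so the service has a durable record even if the callback is missed.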
callback url {callback_url}") + log.info(f"Pinging callback url {callback_url}") ret = requests.get(callback_url) if ret.status_code < 200 or ret.status_code > 299: log.error(ret.text) diff --git a/docker-compose.yaml b/docker-compose.yaml index e94f97c..6bf748d 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -11,6 +11,11 @@ services: dockerfile: Dockerfile ports: - 5000:5000 + #extra_hosts: + # TODO DOCKERUPGRADE after upgrading docker switch from host mode + # See https://stackoverflow.com/a/43541732/643675 + #- "host.docker.internal:host-gateway" + network_mode: host depends_on: - mongodb - minio @@ -25,12 +30,19 @@ services: # Don't commit your token to github please - KBCTS_JAWS_TOKEN=tokengoeshere - KBCTS_JAWS_GROUP=kbase - - KBCTS_S3_URL=http://minio:9000 + # TODO DOCKERUPGRADE after upgrading docker switch to host.docker.internal + - KBCTS_S3_URL=http://localhost:9002 + #- KBCTS_S3_URL=http://host.docker.internal:9002 + # local Minio, only useful if not running jobs on NERSC + #- KBCTS_S3_URL=http://minio:9000 - KBCTS_S3_EXTERNAL_URL=https://ci.berkeley.kbase.us:9000 - KBCTS_VERIFY_S3_EXTERNAL_URL=false - KBCTS_S3_ACCESS_KEY=miniouser - KBCTS_S3_ACCESS_SECRET=miniopassword - - KBCTS_MONGO_HOST=mongodb://mongodb:27017 + - KBCTS_S3_ALLOW_INSECURE=true + # TODO DOCKERUPGRADE switch back when host mode no longer needed + - KBCTS_MONGO_HOST=mongodb://localhost:27017 + #- KBCTS_MONGO_HOST=mongodb://mongodb:27017 - KBCTS_MONGO_DB=cdmtaskservice # TODO MONGOAUTH need to add a user to the cmdtaskservice db before this works out # of the box. diff --git a/test/callback_url_paths_test.py b/test/callback_url_paths_test.py new file mode 100644 index 0000000..8bde072 --- /dev/null +++ b/test/callback_url_paths_test.py @@ -0,0 +1,7 @@ +# TODO TEST add tests + +from cdmtaskservice import callback_url_paths # @UnusedImport + + +def test_noop(): + pass