Skip to content

Commit

Permalink
Merge pull request #128 from kbase/dev-service
Browse files Browse the repository at this point in the history
Download files to NERSC on job submission
  • Loading branch information
MrCreosote authored Dec 12, 2024
2 parents 28874c1 + 8edd17b commit 4cacd35
Show file tree
Hide file tree
Showing 8 changed files with 119 additions and 10 deletions.
4 changes: 3 additions & 1 deletion cdmtaskservice/app_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from fastapi import FastAPI, Request
from cdmtaskservice import models
from cdmtaskservice.config import CDMTaskServiceConfig
from cdmtaskservice.coroutine_manager import CoroutineWrangler
from cdmtaskservice.image_remote_lookup import DockerImageInfo
from cdmtaskservice.images import Images
from cdmtaskservice.jobflows.nersc_jaws import NERSCJAWSRunner
Expand Down Expand Up @@ -57,6 +58,7 @@ async def build_app(
# This method is getting pretty long but it's stupid simple so...
# May want to parallelize some of this for faster startups. would need to rework prints
logr = logging.getLogger(__name__)
coman = await CoroutineWrangler.create()
logr.info("Connecting to KBase auth service... ")
auth = await KBaseAuth.create(
cfg.auth_url,
Expand Down Expand Up @@ -102,7 +104,7 @@ async def build_app(
runners = {models.Cluster.PERLMUTTER_JAWS: nerscjawsflow}
imginfo = await DockerImageInfo.create(Path(cfg.crane_path).expanduser().absolute())
images = Images(mongodao, imginfo)
job_submit = JobSubmit(mongodao, s3)
job_submit = JobSubmit(mongodao, s3, coman, runners)
app.state._mongo = mongocli
app.state._cdmstate = AppState(
auth, sfapi_client, s3, job_submit, job_state, images, runners
Expand Down
20 changes: 20 additions & 0 deletions cdmtaskservice/callback_url_paths.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
"""
A module for determining paths for callback URLs for the service.
"""


_CALLBACK = "callback"
_DOWNLOAD_COMPLETE = "download"


def get_download_complete_callback(root_url: str = None, job_id: str = None):
"""
Get a url or path for a service callback to communicate that a download is complete.
root_url - prepend the path with the given root url.
job_id - suffix the path with a job ID.
"""
cb = [root_url] if root_url else []
cb += [_CALLBACK, _DOWNLOAD_COMPLETE]
cb += [job_id] if job_id else []
return "/".join(cb)
15 changes: 14 additions & 1 deletion cdmtaskservice/job_submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
"""

import uuid
from typing import Any

from cdmtaskservice import kb_auth
from cdmtaskservice import models
from cdmtaskservice.arg_checkers import not_falsy as _not_falsy
from cdmtaskservice.coroutine_manager import CoroutineWrangler
from cdmtaskservice.image_remote_lookup import parse_image_name
from cdmtaskservice.mongo import MongoDAO
from cdmtaskservice.s3.client import S3Client
Expand All @@ -18,13 +20,23 @@ class JobSubmit:
A manager for submitting CDM jobs.
"""

def __init__(self, mongo: MongoDAO, s3client: S3Client):
def __init__(
    self,
    mongo: MongoDAO,
    s3client: S3Client,
    coro_manager: CoroutineWrangler,
    job_runners: dict[models.Cluster, Any],  # Make abstract class if necessary
):
    """
    Create the job submission manager.

    mongo - a MongoDB DAO object.
    s3client - an S3Client pointed at the S3 storage system to use.
    coro_manager - a coroutine manager, used to run job start coroutines in the background.
    job_runners - a mapping of remote compute cluster to the job runner for that cluster.
    """
    # All arguments are required; _not_falsy raises if any is None / falsy.
    self._s3 = _not_falsy(s3client, "s3client")
    self._mongo = _not_falsy(mongo, "mongo")
    self._coman = _not_falsy(coro_manager, "coro_manager")
    self._runners = _not_falsy(job_runners, "job_runners")

async def submit(self, job_input: models.JobInput, user: kb_auth.KBaseUser) -> str:
"""
Expand Down Expand Up @@ -76,6 +88,7 @@ async def submit(self, job_input: models.JobInput, user: kb_auth.KBaseUser) -> s
)
# TODO JOBSUBMIT if reference data is required, is it staged?
await self._mongo.save_job(job)
await self._coman.run_coroutine(self._runners[job.job_input.cluster].start_job(job))
return job_id


Expand Down
47 changes: 47 additions & 0 deletions cdmtaskservice/jobflows/nersc_jaws.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,15 @@
Manages running jobs at NERSC using the JAWS system.
"""

import logging

from cdmtaskservice import models
from cdmtaskservice.arg_checkers import not_falsy as _not_falsy, require_string as _require_string
from cdmtaskservice.job_state import JobState
from cdmtaskservice.nersc.manager import NERSCManager
from cdmtaskservice.s3.client import S3Client
from cdmtaskservice.s3.paths import S3Paths
from cdmtaskservice.callback_url_paths import get_download_complete_callback

# Not sure how other flows would work and how much code they might share. For now just make
# this work and pull it apart / refactor later.
Expand Down Expand Up @@ -50,3 +55,45 @@ def __init__(
self._jtoken = _require_string(jaws_token, "jaws_token")
self._jgroup = _require_string(jaws_group, "jaws_group")
self._callback_root = _require_string(service_root_url, "service_root_url")

async def start_job(self, job: models.Job):
    """
    Start running a job. It is expected that the Job has been persisted to the data
    storage system and is in the created state. It is further assumed that the input files
    are of the S3File type, not bare strings.

    job - the job to start.

    Raises ValueError if the job is not in the created state.
    """
    if _not_falsy(job, "job").state != models.JobState.CREATED:
        raise ValueError("job must be in the created state")
    logr = logging.getLogger(__name__)
    # TODO PERF this validates the file paths yet again. Maybe the way to go is just have
    #           a validate method on S3Paths which can be called or not as needed, with
    #           a validated state boolean
    paths = S3Paths([p.file for p in job.job_input.input_files])
    try:
        # This is making an S3 call again after the call in job_submit. If that's too expensive
        # pass in the S3meta as well
        # TODO PERF config / set concurrency
        # TODO NOW pass this in to avoid race conditions w/ etags
        meta = await self._s3.get_object_meta(paths)
        # TODO RELIABILITY config / set expiration time
        presigned = await self._s3ext.presign_get_urls(paths)
        callback_url = get_download_complete_callback(self._callback_root, job.id)
        # TODO PERF config / set concurrency
        # TODO PERF CACHING if the same files are used for a different job they're d/ld again
        #      Instead d/l to a shared file location by etag or something
        #      Need to ensure atomic writes otherwise 2 processes trying to
        #      d/l the same file could corrupt it
        #      Either make cache in JAWS staging area, in which case files
        #      will be deleted automatically by JAWS, or need own file deletion
        # TODO DISKSPACE will need to clean up job downloads @ NERSC
        # TODO LOGGING make the remote code log summary of results and upload and store
        # TODO NOW how get remote paths at next step?
        # TODO NOW store task IDs
        task_id = await self._nman.download_s3_files(
            job.id, meta, presigned, callback_url, insecure_ssl=self._s3insecure
        )
    except Exception:
        # TODO LOGGING figure out how logging is going to work etc.
        # Lazy %-args so the message is only formatted if the record is emitted;
        # logr.exception attaches the active traceback automatically.
        logr.exception("Error starting download for job %s", job.id)
        # TODO IMPORTANT ERRORHANDLING update job state to ERROR w/ message and don't raise
        raise  # bare raise re-raises the active exception with its original traceback
15 changes: 10 additions & 5 deletions cdmtaskservice/nersc/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@

# TODO CLEANUP clean up old code versions @NERSC somehow. Not particularly important

_DT_TARGET = Machine.dtns
# TODO NERSCUPDATE delete the following line when DTN downloads work.
# See https://nersc.servicenowservices.com/sp?sys_id=ad33e85f1b5a5610ac81a820f54bcba0&view=sp&id=ticket&table=incident
_DT_TARGET = Machine.perlmutter

_COMMAND_PATH = "utilities/command"

_MIN_TIMEOUT_SEC = 300
Expand Down Expand Up @@ -125,7 +130,7 @@ async def _setup_remote_code(self):
# TODO RELIABILITY atomically write files. For these small ones probably doesn't matter?
cli = self._client_provider()
perlmutter = await cli.compute(Machine.perlmutter)
dtns = await cli.compute(Machine.dtns)
dt = await cli.compute(_DT_TARGET)
async with asyncio.TaskGroup() as tg:
for mod in _CTS_DEPENDENCIES:
target = self._nersc_code_path
Expand All @@ -142,7 +147,7 @@ async def _setup_remote_code(self):
bio=io.BytesIO(_PROCESS_DATA_XFER_MANIFEST.encode()),
make_exe=True,
))
res = tg.create_task(dtns.run('bash -c "echo $SCRATCH"'))
res = tg.create_task(dt.run('bash -c "echo $SCRATCH"'))
if _PIP_DEPENDENCIES:
deps = " ".join(
# may need to do something else if module doesn't have __version__
Expand Down Expand Up @@ -252,9 +257,9 @@ async def _process_manifest(
):
path = self._dtn_scratch / _CTS_SCRATCH_ROOT_DIR / job_id / filename
cli = self._client_provider()
dtn = await cli.compute(Machine.dtns)
dt = await cli.compute(_DT_TARGET)
# TODO CLEANUP manifests after some period of time
await self._upload_file_to_nersc(dtn, path, bio=manifest)
await self._upload_file_to_nersc(dt, path, bio=manifest)
command = (
"bash -c '"
+ f"export CTS_CODE_LOCATION={self._nersc_code_path}; "
Expand All @@ -264,7 +269,7 @@ async def _process_manifest(
+ f'"$CTS_CODE_LOCATION"/{_PROCESS_DATA_XFER_MANIFEST_FILENAME}'
+ "'"
)
task_id = (await self._run_command(cli, Machine.dtns, command))["task_id"]
task_id = (await self._run_command(cli, _DT_TARGET, command))["task_id"]
# TODO LOGGING figure out how to handle logging, see other logging todos
logging.getLogger(__name__).info(
f"Created {task_type} task with id {task_id} for job {job_id}")
Expand Down
5 changes: 4 additions & 1 deletion cdmtaskservice/nersc/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,16 @@ def process_data_transfer_manifest(manifest_file_path: str, callback_url: str):
# TODO TEST add tests for this and its dependency functions, including logging
# Potential performance improvement could include a shared cross job cache for files
# only useful if jobs are reusing the same files, which seems def possible
# TODO LOGGING logging doesn't work
# TODO IMPORTANT ERRORHANDLING write an output file that can be read by the CTS
# sfapi tasks only last for 10 minutes after completions
log = logging.getLogger(__name__)
try:
with open(manifest_file_path) as f:
manifest = json.load(f)
asyncio.run(s3_pdtm(manifest["file-transfers"]))
finally:
log.info(f"pinging callback url {callback_url}")
log.info(f"Pinging callback url {callback_url}")
ret = requests.get(callback_url)
if ret.status_code < 200 or ret.status_code > 299:
log.error(ret.text)
Expand Down
16 changes: 14 additions & 2 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ services:
dockerfile: Dockerfile
ports:
- 5000:5000
#extra_hosts:
# TODO DOCKERUPGRADE after upgrading docker switch from host mode
# See https://stackoverflow.com/a/43541732/643675
#- "host.docker.internal:host-gateway"
network_mode: host
depends_on:
- mongodb
- minio
Expand All @@ -25,12 +30,19 @@ services:
# Don't commit your token to github please
- KBCTS_JAWS_TOKEN=tokengoeshere
- KBCTS_JAWS_GROUP=kbase
- KBCTS_S3_URL=http://minio:9000
# TODO DOCKERUPGRADE after upgrading docker switch to host.docker.internal
- KBCTS_S3_URL=http://localhost:9002
#- KBCTS_S3_URL=http://host.docker.internal:9002
# local Minio, only useful if not running jobs on NERSC
#- KBCTS_S3_URL=http://minio:9000
- KBCTS_S3_EXTERNAL_URL=https://ci.berkeley.kbase.us:9000
- KBCTS_VERIFY_S3_EXTERNAL_URL=false
- KBCTS_S3_ACCESS_KEY=miniouser
- KBCTS_S3_ACCESS_SECRET=miniopassword
- KBCTS_MONGO_HOST=mongodb://mongodb:27017
- KBCTS_S3_ALLOW_INSECURE=true
# TODO DOCKERUPGRADE switch back when host mode no longer needed
- KBCTS_MONGO_HOST=mongodb://localhost:27017
#- KBCTS_MONGO_HOST=mongodb://mongodb:27017
- KBCTS_MONGO_DB=cdmtaskservice
# TODO MONGOAUTH need to add a user to the cmdtaskservice db before this works out
# of the box.
Expand Down
7 changes: 7 additions & 0 deletions test/callback_url_paths_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# TODO TEST add tests

from cdmtaskservice import callback_url_paths # @UnusedImport


def test_noop():
    """Deliberate no-op placeholder; the module-level import is the real smoke test."""

0 comments on commit 4cacd35

Please sign in to comment.