Download files to NERSC on job submission #128

Merged 1 commit on Dec 12, 2024
4 changes: 3 additions & 1 deletion cdmtaskservice/app_state.py
@@ -14,6 +14,7 @@
from fastapi import FastAPI, Request
from cdmtaskservice import models
from cdmtaskservice.config import CDMTaskServiceConfig
from cdmtaskservice.coroutine_manager import CoroutineWrangler
from cdmtaskservice.image_remote_lookup import DockerImageInfo
from cdmtaskservice.images import Images
from cdmtaskservice.jobflows.nersc_jaws import NERSCJAWSRunner
@@ -57,6 +58,7 @@
# This method is getting pretty long but it's stupid simple so...
# May want to parallelize some of this for faster startups. Would need to rework prints
logr = logging.getLogger(__name__)
coman = await CoroutineWrangler.create()

logr.info("Connecting to KBase auth service... ")
auth = await KBaseAuth.create(
cfg.auth_url,
@@ -102,7 +104,7 @@
runners = {models.Cluster.PERLMUTTER_JAWS: nerscjawsflow}
imginfo = await DockerImageInfo.create(Path(cfg.crane_path).expanduser().absolute())
images = Images(mongodao, imginfo)
job_submit = JobSubmit(mongodao, s3)
job_submit = JobSubmit(mongodao, s3, coman, runners)

app.state._mongo = mongocli
app.state._cdmstate = AppState(
auth, sfapi_client, s3, job_submit, job_state, images, runners
20 changes: 20 additions & 0 deletions cdmtaskservice/callback_url_paths.py
@@ -0,0 +1,20 @@
"""
A module for determining paths for callback URLs for the service.
"""


_CALLBACK = "callback"
_DOWNLOAD_COMPLETE = "download"


def get_download_complete_callback(root_url: str = None, job_id: str = None):
"""
Get a url or path for a service callback to communicate that a download is complete.

root_url - prepend the path with the given root url.
job_id - suffix the path with a job ID.
"""
cb = [root_url] if root_url else []
cb += [_CALLBACK, _DOWNLOAD_COMPLETE]
cb += [job_id] if job_id else []
return "/".join(cb)

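For reference, a short illustration of what the new helper returns; the root URL and job ID below are example values, not service configuration:

from cdmtaskservice.callback_url_paths import get_download_complete_callback

get_download_complete_callback()
# -> "callback/download"

get_download_complete_callback("https://cts.example.org", "job-123")
# -> "https://cts.example.org/callback/download/job-123"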
15 changes: 14 additions & 1 deletion cdmtaskservice/job_submit.py
@@ -3,10 +3,12 @@
"""

import uuid
from typing import Any

from cdmtaskservice import kb_auth
from cdmtaskservice import models
from cdmtaskservice.arg_checkers import not_falsy as _not_falsy
from cdmtaskservice.coroutine_manager import CoroutineWrangler
from cdmtaskservice.image_remote_lookup import parse_image_name
from cdmtaskservice.mongo import MongoDAO
from cdmtaskservice.s3.client import S3Client
@@ -18,13 +20,23 @@
A manager for submitting CDM jobs.
"""

def __init__(self, mongo: MongoDAO, s3client: S3Client):
def __init__(
self,
mongo: MongoDAO,
s3client: S3Client,
coro_manager: CoroutineWrangler,
job_runners: dict[models.Cluster, Any], # Make abstract class if necessary
):
"""
mongo - a MongoDB DAO object.
s3Client - an S3Client pointed at the S3 storage system to use.
coro_manager - a coroutine manager.
job_runners - a mapping of remote compute cluster to the job runner for that cluster.
"""
self._s3 = _not_falsy(s3client, "s3client")
self._mongo = _not_falsy(mongo, "mongo")
self._coman = _not_falsy(coro_manager, "coro_manager")
self._runners = _not_falsy(job_runners, "job_runners")

async def submit(self, job_input: models.JobInput, user: kb_auth.KBaseUser) -> str:
"""
@@ -76,6 +88,7 @@
)
# TODO JOBSUBMIT if reference data is required, is it staged?
await self._mongo.save_job(job)
await self._coman.run_coroutine(self._runners[job.job_input.cluster].start_job(job))

return job_id


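A minimal sketch of the dispatch pattern this change introduces in JobSubmit.submit(): the runner for the job's cluster is handed to the coroutine manager so the submit call can return the job ID without waiting for the download to finish. The stub classes and fire-and-forget behavior below are assumptions for illustration, not the service's implementations.

import asyncio

class StubRunner:
    # Illustrative stand-in for a cluster runner such as NERSCJAWSRunner.
    async def start_job(self, job):
        print(f"starting job {job['id']} on the remote cluster")

class StubCoroutineWrangler:
    # Illustrative stand-in for CoroutineWrangler: track fire-and-forget tasks.
    def __init__(self):
        self._tasks = set()

    async def run_coroutine(self, coro):
        task = asyncio.create_task(coro)
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)

async def submit(job, runners, coman):
    # Mirrors the new line in JobSubmit.submit(): pick the runner by cluster,
    # let the wrangler own the coroutine, and return immediately.
    await coman.run_coroutine(runners[job["cluster"]].start_job(job))
    return job["id"]

async def main():
    runners = {"perlmutter-jaws": StubRunner()}
    job = {"id": "job-123", "cluster": "perlmutter-jaws"}
    print(await submit(job, runners, StubCoroutineWrangler()))
    await asyncio.sleep(0)  # give the background task a chance to run

asyncio.run(main())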
47 changes: 47 additions & 0 deletions cdmtaskservice/jobflows/nersc_jaws.py
@@ -2,10 +2,15 @@
Manages running jobs at NERSC using the JAWS system.
"""

import logging

from cdmtaskservice import models
from cdmtaskservice.arg_checkers import not_falsy as _not_falsy, require_string as _require_string
from cdmtaskservice.job_state import JobState
from cdmtaskservice.nersc.manager import NERSCManager
from cdmtaskservice.s3.client import S3Client
from cdmtaskservice.s3.paths import S3Paths
from cdmtaskservice.callback_url_paths import get_download_complete_callback

# Not sure how other flows would work and how much code they might share. For now just make
# this work and pull it apart / refactor later.
@@ -50,3 +55,45 @@
self._jtoken = _require_string(jaws_token, "jaws_token")
self._jgroup = _require_string(jaws_group, "jaws_group")
self._callback_root = _require_string(service_root_url, "service_root_url")

async def start_job(self, job: models.Job):
"""
Start running a job. It is expected that the Job has been persisted to the data
storage system and is in the created state. It is further assumed that the input files
are of the S3File type, not bare strings.
"""
if _not_falsy(job, "job").state != models.JobState.CREATED:
raise ValueError("job must be in the created state")
logr = logging.getLogger(__name__)

# TODO PERF this validates the file paths yet again. Maybe the way to go is just have
# a validate method on S3Paths which can be called or not as needed, with
# a validated state boolean
paths = S3Paths([p.file for p in job.job_input.input_files])
try:

# This is making an S3 call again after the call in job_submit. If that's too expensive
# pass in the S3meta as well
# TODO PERF config / set concurrency
# TODO NOW pass this in to avoid race conditions w/ etags
meta = await self._s3.get_object_meta(paths)

# TODO RELIABILITY config / set expiration time
presigned = await self._s3ext.presign_get_urls(paths)
callback_url = get_download_complete_callback(self._callback_root, job.id)

# TODO PERF config / set concurrency
# TODO PERF CACHING if the same files are used for a different job they're d/ld again
# Instead d/l to a shared file location by etag or something
# Need to ensure atomic writes otherwise 2 processes trying to
# d/l the same file could corrupt it
# Either make cache in JAWS staging area, in which case files
# will be deleted automatically by JAWS, or need own file deletion
# TODO DISKSPACE will need to clean up job downloads @ NERSC
# TODO LOGGING make the remote code log summary of results and upload and store
# TODO NOW how get remote paths at next step?
# TODO NOW store task IDs
task_id = await self._nman.download_s3_files(

job.id, meta, presigned, callback_url, insecure_ssl=self._s3insecure
)
except Exception as e:

# TODO LOGGING figure out how logging is going to work etc.
logr.exception(f"Error starting download for job {job.id}")

# TODO IMPORTANT ERRORHANDLING update job state to ERROR w/ message and don't raise
raise e

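To make the etag race flagged in the TODO NOW comment above concrete: the metadata re-fetched in start_job can differ from what job_submit validated if an input object is overwritten in between. A hedged sketch of the check that passing the submit-time metadata through would enable (paths and etag values are examples only):

submit_time_meta = {"s3://bucket/input.fa": "etag-abc123"}  # recorded at submission
start_time_meta = {"s3://bucket/input.fa": "etag-abc123"}   # re-fetched by start_job

for path, etag in submit_time_meta.items():
    if start_time_meta.get(path) != etag:
        # A mismatch means the object changed between submission and download
        # start, so the job would process different data than the user validated.
        raise ValueError(f"{path} changed since job submission")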
15 changes: 10 additions & 5 deletions cdmtaskservice/nersc/manager.py
@@ -28,6 +28,11 @@

# TODO CLEANUP clean up old code versions @NERSC somehow. Not particularly important

_DT_TARGET = Machine.dtns
# TODO NERSCUPDATE delete the following line when DTN downloads work.
# See https://nersc.servicenowservices.com/sp?sys_id=ad33e85f1b5a5610ac81a820f54bcba0&view=sp&id=ticket&table=incident
_DT_TARGET = Machine.perlmutter

_COMMAND_PATH = "utilities/command"

_MIN_TIMEOUT_SEC = 300
@@ -125,7 +130,7 @@
# TODO RELIABILITY atomically write files. For these small ones probably doesn't matter?
cli = self._client_provider()
perlmutter = await cli.compute(Machine.perlmutter)
dtns = await cli.compute(Machine.dtns)
dt = await cli.compute(_DT_TARGET)

async with asyncio.TaskGroup() as tg:
for mod in _CTS_DEPENDENCIES:
target = self._nersc_code_path
@@ -142,7 +147,7 @@
bio=io.BytesIO(_PROCESS_DATA_XFER_MANIFEST.encode()),
make_exe=True,
))
res = tg.create_task(dtns.run('bash -c "echo $SCRATCH"'))
res = tg.create_task(dt.run('bash -c "echo $SCRATCH"'))

if _PIP_DEPENDENCIES:
deps = " ".join(
# may need to do something else if module doesn't have __version__
@@ -252,9 +257,9 @@
):
path = self._dtn_scratch / _CTS_SCRATCH_ROOT_DIR / job_id / filename
cli = self._client_provider()
dtn = await cli.compute(Machine.dtns)
dt = await cli.compute(_DT_TARGET)

# TODO CLEANUP manifests after some period of time
await self._upload_file_to_nersc(dtn, path, bio=manifest)
await self._upload_file_to_nersc(dt, path, bio=manifest)

command = (
"bash -c '"
+ f"export CTS_CODE_LOCATION={self._nersc_code_path}; "
@@ -264,7 +269,7 @@
+ f'"$CTS_CODE_LOCATION"/{_PROCESS_DATA_XFER_MANIFEST_FILENAME}'
+ "'"
)
task_id = (await self._run_command(cli, Machine.dtns, command))["task_id"]
task_id = (await self._run_command(cli, _DT_TARGET, command))["task_id"]

# TODO LOGGING figure out how to handle logging, see other logging todos
logging.getLogger(__name__).info(
f"Created {task_type} task with id {task_id} for job {job_id}")
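A minimal sketch of the scratch-discovery pattern used above: run echo $SCRATCH on the data transfer target and build the service's working directory under the result. The FakeCompute stub and directory name are illustrative assumptions, not the manager's actual values.

import asyncio
from pathlib import Path

class FakeCompute:
    # Illustrative stand-in for the compute handle returned by cli.compute().
    async def run(self, command: str) -> str:
        # A real handle runs the command on the target machine; here we just
        # return a plausible scratch path.
        return "/pscratch/sd/u/exampleuser\n"

async def discover_scratch(dt) -> Path:
    # Mirrors the dt.run('bash -c "echo $SCRATCH"') call above.
    scratch = (await dt.run('bash -c "echo $SCRATCH"')).strip()
    return Path(scratch) / "cts_scratch"  # subdirectory name is hypothetical

print(asyncio.run(discover_scratch(FakeCompute())))
# -> /pscratch/sd/u/exampleuser/cts_scratch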
5 changes: 4 additions & 1 deletion cdmtaskservice/nersc/remote.py
@@ -30,13 +30,16 @@
# TODO TEST add tests for this and its dependency functions, including logging
# Potential performance improvement could include a shared cross job cache for files
# only useful if jobs are reusing the same files, which seems def possible
# TODO LOGGING logging doesn't work
# TODO IMPORTANT ERRORHANDLING write an output file that can be read by the CTS
# sfapi tasks only last for 10 minutes after completions
log = logging.getLogger(__name__)
try:
with open(manifest_file_path) as f:
manifest = json.load(f)
asyncio.run(s3_pdtm(manifest["file-transfers"]))
finally:
log.info(f"pinging callback url {callback_url}")
log.info(f"Pinging callback url {callback_url}")

ret = requests.get(callback_url)
if ret.status_code < 200 or ret.status_code > 299:
log.error(ret.text)
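The remote script above closes the loop by GETting the callback URL once the transfers finish. The service-side route that receives that ping is not part of this PR; purely as an illustration, a hypothetical FastAPI handler built on the new path helper could look like this (route wiring and the state update are assumptions, not code from this change):

from fastapi import APIRouter

from cdmtaskservice.callback_url_paths import get_download_complete_callback

router = APIRouter()

# get_download_complete_callback() -> "callback/download"; the job ID arrives as
# the trailing path segment, matching get_download_complete_callback(root, job_id).
@router.get("/" + get_download_complete_callback() + "/{job_id}")
async def download_complete(job_id: str):
    # A real handler would advance the job state and trigger the next step of
    # the flow; this sketch only acknowledges the ping.
    return {"job_id": job_id, "status": "download complete acknowledged"}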
16 changes: 14 additions & 2 deletions docker-compose.yaml
@@ -11,6 +11,11 @@ services:
dockerfile: Dockerfile
ports:
- 5000:5000
#extra_hosts:
# TODO DOCKERUPGRADE after upgrading docker switch from host mode
# See https://stackoverflow.com/a/43541732/643675
#- "host.docker.internal:host-gateway"
network_mode: host
depends_on:
- mongodb
- minio
@@ -25,12 +30,19 @@
# Don't commit your token to github please
- KBCTS_JAWS_TOKEN=tokengoeshere
- KBCTS_JAWS_GROUP=kbase
- KBCTS_S3_URL=http://minio:9000
# TODO DOCKERUPGRADE after upgrading docker switch to host.docker.internal
- KBCTS_S3_URL=http://localhost:9002
#- KBCTS_S3_URL=http://host.docker.internal:9002
# local Minio, only useful if not running jobs on NERSC
#- KBCTS_S3_URL=http://minio:9000
- KBCTS_S3_EXTERNAL_URL=https://ci.berkeley.kbase.us:9000
- KBCTS_VERIFY_S3_EXTERNAL_URL=false
- KBCTS_S3_ACCESS_KEY=miniouser
- KBCTS_S3_ACCESS_SECRET=miniopassword
- KBCTS_MONGO_HOST=mongodb://mongodb:27017
- KBCTS_S3_ALLOW_INSECURE=true
# TODO DOCKERUPGRADE switch back when host mode no longer needed
- KBCTS_MONGO_HOST=mongodb://localhost:27017
#- KBCTS_MONGO_HOST=mongodb://mongodb:27017
- KBCTS_MONGO_DB=cdmtaskservice
# TODO MONGOAUTH need to add a user to the cdmtaskservice db before this works out
# of the box.
7 changes: 7 additions & 0 deletions test/callback_url_paths_test.py
@@ -0,0 +1,7 @@
# TODO TEST add tests

from cdmtaskservice import callback_url_paths # @UnusedImport


def test_noop():
pass
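As a sketch of what could replace the no-op placeholder, based only on the helper's behavior shown above (pytest-style, not part of this PR):

from cdmtaskservice.callback_url_paths import get_download_complete_callback

def test_get_download_complete_callback_path_only():
    assert get_download_complete_callback() == "callback/download"

def test_get_download_complete_callback_with_root_and_job_id():
    # Example values only.
    assert get_download_complete_callback("https://example.org", "j1") == (
        "https://example.org/callback/download/j1"
    )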