Load manifest, wdl, and input.json files to NERSC #140
@@ -384,22 +384,48 @@

            "sec-per-GB": _SEC_PER_GB,
        }

-    async def run_JAWS(self, job: models.Job) -> str:
+    async def run_JAWS(self, job: models.Job, file_download_concurrency: int = 10) -> str:
        """
        Run a JAWS job at NERSC and return the job ID.

        job - the job to process
        file_download_concurrency - the number of files at one time to download to NERSC.
        """
        _check_int(file_download_concurrency, "file_download_concurrency")
        if not _not_falsy(job, "job").job_input.inputs_are_S3File():
            raise ValueError("Job files must be S3File objects")
        manifest_files = generate_manifest_files(job)
        manifest_file_paths = self._get_manifest_file_paths(job.id, len(manifest_files))
        fmap = {m: self._localize_s3_path(job.id, m.file) for m in job.job_input.input_files}
        wdljson = wdl.generate_wdl(job, fmap, manifest_file_paths)
        # TODO REMOVE these lines
        logr = logging.getLogger(__name__)
        for m in manifest_files:
            logr.info("***")
            logr.info(m)
        logr.info(f"*** wdl:\n{wdljson.wdl}\njson:\n{json.dumps(wdljson.input_json, indent=4)}")
        uploads = {fp: f for fp, f in zip(manifest_file_paths, manifest_files)}
        pre = self._dtn_scratch / _CTS_SCRATCH_ROOT_DIR / job.id
        wdlpath = pre / "input.wdl"
        jsonpath = pre / "input.json"
        uploads[wdlpath] = wdljson.wdl
        uploads[jsonpath] = json.dumps(wdljson.input_json, indent=4)
        cli = self._client_provider()
        dt = await cli.compute(_DT_TARGET)
        semaphore = asyncio.Semaphore(file_download_concurrency)
        async def sem_coro(coro):
            async with semaphore:
                return await coro
        coros = []
        try:
            async with asyncio.TaskGroup() as tg:
                for path, file in uploads.items():
                    coros.append(self._upload_file_to_nersc(
                        dt, path, bio=io.BytesIO(file.encode())
                    ))
                    tg.create_task(sem_coro(coros[-1]))
        except ExceptionGroup as eg:
            e = eg.exceptions[0]  # just pick one, essentially at random
Review comment on `e = eg.exceptions[0]`: wondering if we should log all exceptions here

Reply: as above
            raise e from eg
        finally:
            # otherwise you can get coroutine never awaited warnings if a failure occurs
            for c in coros:
                c.close()
Review comment on the `finally` block: I wasn't sure why we need to manually call `close()` here.

Reply: I copied this setup from the remote code so it's been a while since I messed with it, but from what I recall I got warnings in the logs about unclosed or unawaited coroutines when I didn't close them manually. IIRC when a coroutine fails, any remaining coroutines that haven't started running aren't run.
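A minimal standalone sketch (not part of the PR) of the behavior the reply describes, assuming CPython's coroutine finalization: a coroutine object that is created but never awaited or scheduled emits a "coroutine ... was never awaited" RuntimeWarning when it is garbage collected, and calling close() on it first suppresses that warning. The upload() helper here is hypothetical.

import asyncio
import warnings

async def upload(name: str) -> str:
    # hypothetical stand-in for an upload coroutine
    await asyncio.sleep(0)
    return name

warnings.simplefilter("always")  # surface RuntimeWarnings immediately

# Created but never awaited: warns when garbage collected.
leaked = upload("a")
del leaked  # RuntimeWarning: coroutine 'upload' was never awaited

# Created, never awaited, but closed first: no warning.
closed = upload("b")
closed.close()
del closed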
        # TODO NEXT run jaws job
        return "fake_job_id"

    def _get_manifest_file_paths(self, job_id: str, count: int) -> list[Path]:
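An aside on the upload code above: wrapping each coroutine in a semaphore gate before handing it to the TaskGroup caps how many uploads run concurrently while the group still fails fast as a unit. A self-contained sketch of that pattern (Python 3.11+ for asyncio.TaskGroup), with a hypothetical upload() standing in for _upload_file_to_nersc:

import asyncio

async def upload(path: str) -> str:
    # hypothetical stand-in for the real NERSC upload call
    await asyncio.sleep(0.1)
    return path

async def upload_all(paths: list[str], concurrency: int = 10) -> None:
    semaphore = asyncio.Semaphore(concurrency)

    async def sem_coro(coro):
        # at most `concurrency` wrapped coroutines run at once
        async with semaphore:
            return await coro

    async with asyncio.TaskGroup() as tg:
        for p in paths:
            tg.create_task(sem_coro(upload(p)))

asyncio.run(upload_all([f"file_{i}" for i in range(25)], concurrency=5))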
Review comment: wondering if we should log all exceptions here

Reply: To me this seems like overkill for a couple reasons
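For illustration only, since the author considers it overkill: a hedged sketch of what logging every exception in the group could look like, reusing the diff's except ExceptionGroup as eg structure. The failing tasks here are synthetic.

import asyncio
import logging

logging.basicConfig(level=logging.ERROR)
logr = logging.getLogger(__name__)

async def fail(n: int):
    # synthetic failure standing in for a broken upload
    raise ValueError(f"upload {n} failed")

async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            for n in range(3):
                tg.create_task(fail(n))
    except ExceptionGroup as eg:
        # log every failure in the group before re-raising one, as the diff does
        for i, exc in enumerate(eg.exceptions, 1):
            logr.error("failure %d of %d", i, len(eg.exceptions), exc_info=exc)
        raise eg.exceptions[0] from eg

try:
    asyncio.run(main())
except ValueError as e:
    print("re-raised:", e)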