Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DATAUP-544 API Defined #420

Open
wants to merge 13 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 46 additions & 3 deletions execution_engine2.spec
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
/* A job id. */
typedef string job_id;

/* A job state's job status. */
typedef string job_status;

/*
A structure representing the Execution Engine status
git_commit - the Git hash of the version of the module.
Expand Down Expand Up @@ -229,6 +232,20 @@
boolean as_admin;
} BulkRetryParams;


/*
batch_job_id: BATCH_ID to retry
status_list: job states in ['terminated', 'error'] (valid retry states)
as_admin: retry someone else's job in your namespace
#TODO: Possibly Add list<JobRequirements> job_requirements;
*/
typedef structure {
job_id batch_job_id;
list<job_status> status_list;
boolean as_admin;
} BatchRetryParams;


/*
#TODO write retry parent tests to ensure BOTH the parent_job_id is present, and retry_job_id is present
#TODO Add retry child that checks the status of the child? to prevent multiple retries
Expand All @@ -246,14 +263,17 @@
*/
funcdef retry_jobs(BulkRetryParams params) returns (list<RetryResult> retry_result) authentication required;


/*
Retry a job based on a batch id with a job_state status list ['error', 'terminated']
Requires the user to keep track of the job states of the Status enum in the ee2 models file
If no status_list is provided, an exception is thrown.
*/
funcdef retry_batch_jobs_by_status(BatchRetryParams params) returns (list<RetryResult> retry_result) authentication required;


funcdef abandon_children(AbandonChildren params)
returns (BatchSubmission parent_and_child_ids) authentication required;



/* EE2Constants Concierge Params are
request_cpus: int
request_memory: int in MB
Expand Down Expand Up @@ -585,6 +605,29 @@
*/
funcdef cancel_job(CancelJobParams params) returns () authentication required;


/*
batch_job_id: BATCH_ID to cancel
status_list: required filter of one or more of [created, estimating, queued, or running]
terminated_code: optional terminated code, default to terminated by user
as_admin: retry someone else's job in your namespace
@optional terminated_code
*/
typedef structure {
job_id batch_job_id;
list<job_status> status_list;
int terminated_code;
boolean as_admin;
} BatchCancelParams;

/*
Cancels children of a batch job. This results in the status becoming "terminated" with termination_code 0.
Valid statuses are ['created', 'estimating', 'queued', 'running']
(Requires the user to keep track of the job states of the Status enum in the ee2 models file)
If no status_list is provided, an exception is thrown.
*/
funcdef cancel_batch_jobs_by_status(BatchCancelParams params) returns (list<job_id> job_ids) authentication required;

Comment on lines +629 to +630
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if no jobs are in the appropriate states, the endpoint returns [] -- is that correct?

/*
job_id - id of job running method
finished - indicates whether job is done (including error/cancel cases) or not
Expand Down
48 changes: 45 additions & 3 deletions lib/execution_engine2/db/MongoUtil.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
import traceback
from contextlib import contextmanager
from datetime import datetime
from typing import Dict, List
from typing import Dict, List, Union

from bson.objectid import ObjectId
from mongoengine import connect, connection
from pymongo import MongoClient, UpdateOne
Expand All @@ -15,9 +16,8 @@
RecordNotFoundException,
InvalidStatusTransitionException,
)

from lib.execution_engine2.utils.arg_processing import parse_bool
from execution_engine2.sdk.EE2Runjob import JobIdPair
from lib.execution_engine2.utils.arg_processing import parse_bool


class MongoUtil:
Expand Down Expand Up @@ -223,6 +223,48 @@ def get_job(self, job_id=None, exclude_fields=None) -> Job:

return job

def get_jobs_with_status(
self,
job_ids: List[str],
status_list: List[str],
only_job_ids: bool = False,
retried_jobs_allowed=True,
) -> Union[List[Job], List[str]]:
if not (job_ids and isinstance(job_ids, list)):
raise ValueError("Please provide a non empty list of job ids")

if not (status_list and isinstance(job_ids, list)):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should that second one be isinstance(status_list, list)?

It would be good to put in some tests for this so you can check what is and isn't caught by these conditionals vs by the type checking in the function definition. It's sometimes easier to have arguments that should be a list default to None so that you don't have to check the length of the list.

raise ValueError("Please provide a non empty list of job statuses")

with self.mongo_engine_connection():
# TODO: Only seems to be returning other fields as well. Is .only() broken?
if retried_jobs_allowed:
jobs = Job.objects(id__in=job_ids, status__in=status_list)
else:
jobs = Job.objects(
id__in=job_ids, status__in=status_list, retry_parent__exists=False
)
Comment on lines +244 to +246
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is nice! is it going to be possible to query ee2 for jobs by cell_id at some point?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't tried it but there's a check_job_date_range_by_user function that might let you filter on that field.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thought it might make more sense to change check_jobs_wsid() to have a filter for specific fields


if only_job_ids:
return [str(job.id) for job in jobs]
return jobs

def eligible_for_retry(self, job: Job):
"""
Checks the job record to see if it has any retry_ids,
and if those retry_ids do not contain an ineligble job state
:param job: Should be a child job of a BATCH job
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should that check be enforced in the code here? Should be relatively simple just to add in a check for whether the batch_id field is present and the batch_job field is false.

"""

if not job.retry_ids:
return True
valid_statuses = [Status.terminated.value, Status.error.value]
jobs = self.get_jobs(job_ids=job.retry_ids)
for job in jobs:
if job.status not in valid_statuses:
return False
return True

def get_jobs(
self, job_ids=None, exclude_fields=None, sort_id_ascending=None
) -> List[Job]:
Expand Down
4 changes: 4 additions & 0 deletions lib/execution_engine2/db/models/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,10 @@ class Status(Enum):
A job begins at created, then can either be estimating
"""

@classmethod
def get_status_names(cls):
return list(map(lambda x: x.value, cls._member_map_.values()))

created = "created"
estimating = "estimating"
queued = "queued"
Expand Down
12 changes: 12 additions & 0 deletions lib/execution_engine2/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,22 @@ def __init__(self, msg=None, *args, **kwargs):
super().__init__(msg or self.__doc__, *args, **kwargs)


class InvalidStatusListException(ExecutionEngineValueError):
"""Invalid job status provided"""


class BatchTerminationException(ExecutionEngineException):
"""No jobs to terminate"""


class IncorrectParamsException(ExecutionEngineValueError):
"""Wrong parameters were provided"""


class NotBatchJobException(ExecutionEngineValueError):
"""Requested job is not a batch job"""


class InvalidParameterForBatch(ExecutionEngineValueError):
"""Workspace ids are not allowed in RunJobParams in Batch Mode"""

Expand Down
Loading