From ec55787ecf32da1aa27ee460208dd2626d119f89 Mon Sep 17 00:00:00 2001 From: Alex Kanitz Date: Fri, 1 Nov 2019 17:39:31 +0100 Subject: [PATCH] specs: use v1.0.0 specs --- requirements.txt | 10 +- ...-0.workflow_execution_service.swagger.yaml | 616 ++++++++++++++++++ wes_elixir/api/controllers.py | 47 ++ wes_elixir/api/register_openapi.py | 11 +- .../api/schema.stdout_stderr.openapi.yaml | 163 +++++ wes_elixir/config/app_config.yaml | 13 +- wes_elixir/database/db_utils.py | 23 +- wes_elixir/ga4gh/wes/endpoints/get_run_log.py | 2 + ... (oldirty's conflicted copy 2020-01-06).py | 515 +++++++++++++++ wes_elixir/tasks/celery_task_monitor.py | 199 ++++-- wes_elixir/tasks/register_celery.py | 13 + 11 files changed, 1550 insertions(+), 62 deletions(-) create mode 100644 wes_elixir/api/20181023.c5406f1-v1-0-0.workflow_execution_service.swagger.yaml create mode 100644 wes_elixir/api/controllers.py create mode 100755 wes_elixir/api/schema.stdout_stderr.openapi.yaml create mode 100644 wes_elixir/ga4gh/wes/endpoints/run_workflow (oldirty's conflicted copy 2020-01-06).py diff --git a/requirements.txt b/requirements.txt index 9a49eac..701a55c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -13,12 +13,12 @@ cffi==1.11.5 chardet==3.0.4 click==6.7 clickclick==1.2.2 -connexion==1.5.2 +connexion==2.4.0 cryptography==2.3.1 --e git+https://github.com/ohsu-comp-bio/cwl-tes.git@62840435c5b22ac7b3ad1724047d811f72dd372d#egg=cwl-tes +-e git+https://github.com/uniqueg/cwl-tes-temp.git@e37090e00f2573e84becec30f0ade9c5003820b3#egg=cwl_tes cwltool==1.0.20181217162649 decorator==4.3.0 -Flask==1.0.2 +Flask==1.1.1 Flask-Cors==3.0.6 Flask-PyMongo==2.1.0 future==0.16.0 @@ -40,6 +40,7 @@ mccabe==0.6.1 mistune==0.8.4 mypy-extensions==0.4.1 networkx==2.2 +openapi-spec-validator==0.2.8 prov==1.5.1 psutil==5.4.7 py-tes==0.3.0 @@ -50,7 +51,7 @@ pymongo==3.7.1 pyparsing==2.2.1 python-dateutil==2.6.1 pytz==2018.5 -PyYAML==4.2b1 +PyYAML==5.1.2 rdflib==4.2.2 rdflib-jsonld==0.4.0 requests==2.20.0 @@ -61,6 +62,7 @@ shellescape==3.4.1 six==1.11.0 subprocess32==3.5.2 swagger-spec-validator==2.3.1 +swagger-ui-bundle==0.0.6 typed-ast==1.1.0 typing==3.6.6 typing-extensions==3.7.4 diff --git a/wes_elixir/api/20181023.c5406f1-v1-0-0.workflow_execution_service.swagger.yaml b/wes_elixir/api/20181023.c5406f1-v1-0-0.workflow_execution_service.swagger.yaml new file mode 100644 index 0000000..e2b686b --- /dev/null +++ b/wes_elixir/api/20181023.c5406f1-v1-0-0.workflow_execution_service.swagger.yaml @@ -0,0 +1,616 @@ +basePath: '/ga4gh/wes/v1' +swagger: '2.0' +info: + title: Workflow Execution Service + version: 1.0.0 +schemes: + - https +consumes: + - application/json +produces: + - application/json +paths: + /service-info: + get: + summary: Get information about Workflow Execution Service. + description: |- + May include information related (but not limited to) the workflow descriptor formats, versions supported, the WES API versions supported, and information about general service availability. + x-swagger-router-controller: ga4gh.wes.server + operationId: GetServiceInfo + responses: + '200': + description: '' + schema: + $ref: '#/definitions/ServiceInfo' + '400': + description: The request is malformed. + schema: + $ref: '#/definitions/ErrorResponse' + '401': + description: The request is unauthorized. + schema: + $ref: '#/definitions/ErrorResponse' + '403': + description: The requester is not authorized to perform this action. + schema: + $ref: '#/definitions/ErrorResponse' + '500': + description: An unexpected error occurred. 
+ schema: + $ref: '#/definitions/ErrorResponse' + tags: + - WorkflowExecutionService + /runs: + get: + summary: List the workflow runs. + description: >- + This list should be provided in a stable ordering. + (The actual ordering is implementation dependent.) + When paging through the list, the client should + not make assumptions about live updates, but should assume the + contents of the list reflect the workflow list at the moment + that the first page is requested. To monitor a specific + workflow run, use GetRunStatus or GetRunLog. + x-swagger-router-controller: ga4gh.wes.server + operationId: ListRuns + responses: + '200': + description: '' + schema: + $ref: '#/definitions/RunListResponse' + '400': + description: The request is malformed. + schema: + $ref: '#/definitions/ErrorResponse' + '401': + description: The request is unauthorized. + schema: + $ref: '#/definitions/ErrorResponse' + '403': + description: The requester is not authorized to perform this action. + schema: + $ref: '#/definitions/ErrorResponse' + '500': + description: An unexpected error occurred. + schema: + $ref: '#/definitions/ErrorResponse' + parameters: + - name: page_size + description: >- + OPTIONAL + The preferred number of workflow runs to return in a page. + If not provided, the implementation should use a default page size. + The implementation must not return more items + than `page_size`, but it may return fewer. Clients should + not assume that if fewer than `page_size` items are + returned that all items have been returned. The + availability of additional pages is indicated by the value + of `next_page_token` in the response. + in: query + required: false + type: integer + format: int64 + - name: page_token + description: >- + OPTIONAL + Token to use to indicate where to start getting results. If unspecified, return the first + page of results. + in: query + required: false + type: string + tags: + - WorkflowExecutionService + post: + summary: Run a workflow. + description: >- + This endpoint creates a new workflow run and + returns a `RunId` to monitor its progress. + + + The `workflow_attachment` array may be used to upload files + that are required to execute the workflow, including the primary + workflow, tools imported by the workflow, other files + referenced by the workflow, or files which are part of the + input. The implementation should stage these files to a + temporary directory and execute the workflow from there. + These parts must have a Content-Disposition header with a + "filename" provided for each part. Filenames may include + subdirectories, but must not include references to parent + directories with '..' -- implementations should guard against + maliciously constructed filenames. + + + The `workflow_url` is either an absolute URL to a workflow + file that is accessible by the WES endpoint, or a relative URL + corresponding to one of the files attached using + `workflow_attachment`. + + + The `workflow_params` JSON object specifies input parameters, + such as input files. The exact format of the JSON object + depends on the conventions of the workflow language being + used. Input files should either be absolute URLs, or relative + URLs corresponding to files uploaded using + `workflow_attachment`. The WES endpoint must understand and + be able to access URLs supplied in the input. This is + implementation specific. + + + The `workflow_type` is the type of workflow language and + must be "CWL" or "WDL" currently (or another alternative + supported by this WES instance). 
+ + + The `workflow_type_version` is the version of the workflow language + submitted and must be one supported by this WES instance. + + + See the `RunRequest` documentation for details about other fields. + x-swagger-router-controller: ga4gh.wes.server + operationId: RunWorkflow + responses: + '200': + description: '' + schema: + $ref: '#/definitions/RunId' + '400': + description: The request is malformed. + schema: + $ref: '#/definitions/ErrorResponse' + '401': + description: The request is unauthorized. + schema: + $ref: '#/definitions/ErrorResponse' + '403': + description: The requester is not authorized to perform this action. + schema: + $ref: '#/definitions/ErrorResponse' + '500': + description: An unexpected error occurred. + schema: + $ref: '#/definitions/ErrorResponse' + consumes: + - multipart/form-data + parameters: + - in: formData + name: workflow_params + type: string + format: application/json + + - in: formData + name: workflow_type + type: string + + - in: formData + name: workflow_type_version + type: string + + - in: formData + name: tags + type: string + format: application/json + + - in: formData + name: workflow_engine_parameters + type: string + format: application/json + + - in: formData + name: workflow_url + type: string + + - in: formData + name: workflow_attachment + type: array + items: + type: string + format: binary + tags: + - WorkflowExecutionService + /runs/{run_id}: + get: + summary: Get detailed info about a workflow run. + description: >- + This endpoint provides detailed information about a given workflow run. + The returned result has information about the outputs produced by this workflow + (if available), a log object which allows the stderr and stdout to be retrieved, + a log array so stderr/stdout for individual tasks can be retrieved, + and the overall state of the workflow run (e.g. RUNNING, see the State section). + x-swagger-router-controller: ga4gh.wes.server + operationId: GetRunLog + responses: + '200': + description: '' + schema: + $ref: '#/definitions/RunLog' + '401': + description: The request is unauthorized. + schema: + $ref: '#/definitions/ErrorResponse' + '404': + description: The requested workflow run not found. + schema: + $ref: '#/definitions/ErrorResponse' + '403': + description: The requester is not authorized to perform this action. + schema: + $ref: '#/definitions/ErrorResponse' + '500': + description: An unexpected error occurred. + schema: + $ref: '#/definitions/ErrorResponse' + parameters: + - name: run_id + in: path + required: true + type: string + tags: + - WorkflowExecutionService + /runs/{run_id}/cancel: + post: + summary: Cancel a running workflow. + x-swagger-router-controller: ga4gh.wes.server + operationId: CancelRun + responses: + '200': + description: '' + schema: + $ref: '#/definitions/RunId' + '401': + description: The request is unauthorized. + schema: + $ref: '#/definitions/ErrorResponse' + '404': + description: The requested workflow run wasn't found. + schema: + $ref: '#/definitions/ErrorResponse' + '403': + description: The requester is not authorized to perform this action. + schema: + $ref: '#/definitions/ErrorResponse' + '500': + description: An unexpected error occurred. + schema: + $ref: '#/definitions/ErrorResponse' + parameters: + - name: run_id + in: path + required: true + type: string + tags: + - WorkflowExecutionService + /runs/{run_id}/status: + get: + summary: Get quick status info about a workflow run. 
+ description: >- + This provides an abbreviated (and likely fast depending on implementation) + status of the running workflow, returning a simple result with the + overall state of the workflow run (e.g. RUNNING, see the State section). + x-swagger-router-controller: ga4gh.wes.server + operationId: GetRunStatus + responses: + '200': + description: '' + schema: + $ref: '#/definitions/RunStatus' + '401': + description: The request is unauthorized. + schema: + $ref: '#/definitions/ErrorResponse' + '404': + description: The requested workflow run wasn't found. + schema: + $ref: '#/definitions/ErrorResponse' + '403': + description: The requester is not authorized to perform this action. + schema: + $ref: '#/definitions/ErrorResponse' + '500': + description: An unexpected error occurred. + schema: + $ref: '#/definitions/ErrorResponse' + parameters: + - name: run_id + in: path + required: true + type: string + tags: + - WorkflowExecutionService +definitions: + DefaultWorkflowEngineParameter: + type: object + properties: + name: + type: string + description: The name of the parameter + type: + type: string + description: Describes the type of the parameter, e.g. float. + default_value: + type: string + description: The stringified version of the default parameter. e.g. "2.45". + description: >- + A message that allows one to describe default parameters for a workflow + engine. + Log: + type: object + properties: + name: + type: string + description: The task or workflow name + cmd: + type: array + items: + type: string + description: The command line that was executed + start_time: + type: string + description: When the command started executing, in ISO 8601 format "%Y-%m-%dT%H:%M:%SZ" + end_time: + type: string + description: When the command stopped executing (completed, failed, or cancelled), in ISO 8601 format "%Y-%m-%dT%H:%M:%SZ" + stdout: + type: string + description: >- + A URL to retrieve standard output logs of the workflow run or + task. This URL may change between status requests, or may + not be available until the task or workflow has finished + execution. Should be available using the same credentials + used to access the WES endpoint. + stderr: + type: string + description: >- + A URL to retrieve standard error logs of the workflow run or + task. This URL may change between status requests, or may + not be available until the task or workflow has finished + execution. Should be available using the same credentials + used to access the WES endpoint. + exit_code: + type: integer + format: int32 + description: Exit code of the program + description: Log and other info + ServiceInfo: + type: object + properties: + workflow_type_versions: + type: object + additionalProperties: + $ref: '#/definitions/WorkflowTypeVersion' + description: >- + A map with keys as the workflow format type name (currently only CWL and WDL are used + although a service may support others) and value is a workflow_type_version object which + simply contains an array of one or more version strings + supported_wes_versions: + type: array + items: + type: string + description: The version(s) of the WES schema supported by this service + supported_filesystem_protocols: + type: array + items: + type: string + description: >- + The filesystem protocols supported by this service, currently these may include common + protocols using the terms 'http', 'https', 'sftp', 's3', 'gs', 'file', or 'synapse', but others + are possible and the terms beyond these core protocols are currently not fixed. 
+ This section reports those protocols (either common or not) supported by this WES service. + workflow_engine_versions: + type: object + additionalProperties: + type: string + description: >- + The engine(s) used by this WES service, key is engine name (e.g. Cromwell) and value is version + default_workflow_engine_parameters: + type: array + items: + $ref: '#/definitions/DefaultWorkflowEngineParameter' + description: >- + Each workflow engine can present additional parameters that can be sent to the + workflow engine. This message will list the default values, and their types for each + workflow engine. + system_state_counts: + type: object + additionalProperties: + type: integer + format: int64 + description: >- + The system statistics, key is the statistic, value is the count of runs in that state. + See the State enum for the possible keys. + auth_instructions_url: + type: string + description: >- + A web page URL with human-readable instructions on how to get an + authorization token for use with a specific WES endpoint. + contact_info_url: + type: string + description: >- + An email address URL (mailto:) or web page URL with contact information + for the operator of a specific WES endpoint. Users of the + endpoint should use this to report problems or security + vulnerabilities. + tags: + type: object + additionalProperties: + type: string + description: >- + A key-value map of arbitrary, extended metadata outside the scope of the above but useful + to report back + description: >- + A message containing useful information about the running service, including supported versions and + default settings. + State: + type: string + enum: + - UNKNOWN + - QUEUED + - INITIALIZING + - RUNNING + - PAUSED + - COMPLETE + - EXECUTOR_ERROR + - SYSTEM_ERROR + - CANCELED + - CANCELING + default: UNKNOWN + description: >- + - UNKNOWN: The state of the task is unknown. + This provides a safe default for messages where this field is missing, + for example, so that a missing field does not accidentally imply that + the state is QUEUED. + + + - QUEUED: The task is queued. + + + - INITIALIZING: The task has been assigned to a worker and is currently preparing to run. + For example, the worker may be turning on, downloading input files, etc. + + + - RUNNING: The task is running. Input files are downloaded and the first Executor + has been started. + + + - PAUSED: The task is paused. + An implementation may have the ability to pause a task, but this is not required. + + + - COMPLETE: The task has completed running. Executors have exited without error + and output files have been successfully uploaded. + + + - EXECUTOR_ERROR: The task encountered an error in one of the Executor processes. Generally, + this means that an Executor exited with a non-zero exit code. + + + - SYSTEM_ERROR: The task was stopped due to a system error, but not from an Executor, + for example an upload failed due to network issues, the worker's ran out + of disk space, etc. + + + - CANCELED: The task was canceled by the user. + + + - CANCELING: The task was canceled by the user, and is in the process of stopping. + RunListResponse: + type: object + properties: + runs: + type: array + items: + $ref: '#/definitions/RunStatus' + description: >- + A list of workflow runs that the service has executed or is executing. + The list is filtered to only include runs that the caller has permission to see. 
+ next_page_token: + type: string + description: >- + A token which may be supplied as `page_token` in workflow run list request to get the next page + of results. An empty string indicates there are no more items to return. + description: The service will return a RunListResponse when receiving a successful RunListRequest. + RunLog: + type: object + properties: + run_id: + type: string + description: workflow run ID + request: + $ref: '#/definitions/RunRequest' + description: The original request message used to initiate this execution. + state: + $ref: '#/definitions/State' + description: The state of the run e.g. RUNNING (see State) + run_log: + $ref: '#/definitions/Log' + description: The logs, and other key info like timing and exit code, for the overall run of this workflow. + task_logs: + type: array + items: + $ref: '#/definitions/Log' + description: The logs, and other key info like timing and exit code, for each step in the workflow run. + outputs: + type: object + description: The outputs from the workflow run. + RunRequest: + type: object + properties: + workflow_params: + type: object + description: |- + REQUIRED + The workflow run parameterizations (JSON encoded), including input and output file locations + workflow_type: + type: string + description: |- + REQUIRED + The workflow descriptor type, must be "CWL" or "WDL" currently (or another alternative supported by this WES instance) + workflow_type_version: + type: string + description: |- + REQUIRED + The workflow descriptor type version, must be one supported by this WES instance + tags: + type: object + additionalProperties: + type: string + description: |- + OPTIONAL + A key-value map of arbitrary metadata outside the scope of `workflow_params` but useful to track with this run request + workflow_engine_parameters: + type: object + additionalProperties: + type: string + description: >- + OPTIONAL + + Additional parameters can be sent to the workflow engine using this field. Default values + for these parameters can be obtained using the ServiceInfo endpoint. + workflow_url: + type: string + description: >- + REQUIRED + + The workflow CWL or WDL document. + When `workflow_attachments` is used to attach files, the `workflow_url` may be a relative path + to one of the attachments. + description: |- + To execute a workflow, send a run request including all the details needed to begin downloading + and executing a given workflow. + RunId: + type: object + properties: + run_id: + type: string + description: workflow run ID + RunStatus: + type: object + required: + - run_id + properties: + run_id: + type: string + state: + $ref: '#/definitions/State' + description: Small description of a workflow run, returned by server during listing + WorkflowTypeVersion: + type: object + properties: + workflow_type_version: + type: array + items: + type: string + description: |- + an array of one or more acceptable types for the `workflow_type` + + description: Available workflow types supported by a given instance of the service. + ErrorResponse: + description: >- + An object that can optionally include information about the error. + type: object + properties: + msg: + type: string + description: A detailed error message. + status_code: + type: integer + description: The integer representing the HTTP status code (e.g. 200, 404). 
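For orientation, the multipart contract defined by the RunWorkflow operation in the spec above can be exercised with any plain HTTP client. The sketch below is illustrative only: the host, workflow files, and input values are hypothetical placeholders, JSON-valued fields are sent as JSON-encoded strings as the formData parameters require, and a deployment with authorization enabled would additionally need a token header.

"""Illustrative sketch of a RunWorkflow request against the spec above.

Assumptions: the WES host, workflow files, and inputs are placeholders;
only the field names and encoding follow the v1.0.0 specification.
"""
import json

import requests  # third-party HTTP client

WES_BASE = 'https://wes.example.org/ga4gh/wes/v1'  # hypothetical deployment

# String form fields; JSON-valued ones are passed as JSON-encoded strings.
data = {
    'workflow_params': json.dumps({'input_file': 'hello.txt'}),
    'workflow_type': 'CWL',
    'workflow_type_version': 'v1.0',
    'workflow_url': 'main.cwl',  # relative URL resolved against attachments
}

# Each attachment part carries a Content-Disposition filename.
files = [
    ('workflow_attachment', ('main.cwl', open('main.cwl', 'rb'))),
    ('workflow_attachment', ('hello.txt', open('hello.txt', 'rb'))),
]

response = requests.post(WES_BASE + '/runs', data=data, files=files)
response.raise_for_status()
run_id = response.json()['run_id']  # RunId returned on success

The returned run identifier can then be polled via GET /runs/{run_id}/status or inspected in full via GET /runs/{run_id}.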
diff --git a/wes_elixir/api/controllers.py b/wes_elixir/api/controllers.py new file mode 100644 index 0000000..40d47b5 --- /dev/null +++ b/wes_elixir/api/controllers.py @@ -0,0 +1,47 @@ +"""Controller for auxiliary WES-ELIXIR API endpoints.""" + +import logging + +from celery import current_app as celery_app +from connexion import request +from flask import current_app + +from wes_elixir.security.decorators import auth_token_optional + +# Get logger instance +logger = logging.getLogger(__name__) + + +# GET /stdout/ +@auth_token_optional +def get_stdout(run_id, *args, **kwargs): + """Returns run STDOUT as plain text.""" + response = "" + log_request(request, response) + return response + + +# POST /stderr/ +@auth_token_optional +def get_stderr(run_id, *args, **kwargs): + """Returns run STDERR as plain text.""" + response = "" + log_request(request, response) + return response + + +def log_request(request, response): + """Writes request and response to log.""" + # TODO: write decorator for request logging + logger.debug( + ( + "Response to request \"{method} {path} {protocol}\" from " + "{remote_addr}: {response}" + ).format( + method=request.environ['REQUEST_METHOD'], + path=request.environ['PATH_INFO'], + protocol=request.environ['SERVER_PROTOCOL'], + remote_addr=request.environ['REMOTE_ADDR'], + response=response, + ) + ) diff --git a/wes_elixir/api/register_openapi.py b/wes_elixir/api/register_openapi.py index bec216c..f71bf82 100644 --- a/wes_elixir/api/register_openapi.py +++ b/wes_elixir/api/register_openapi.py @@ -39,13 +39,20 @@ def register_openapi( path = __add_security_definitions(in_file=path) # Generate API endpoints from OpenAPI spec + options = { + "swagger_ui": get_conf(spec, 'swagger_ui'), + "serve_spec": get_conf(spec, 'swagger_json'), + } + base_path = get_conf(spec, 'base_path') + if not base_path: + base_path = None try: app.add_api( path, strict_validation=get_conf(spec, 'strict_validation'), validate_responses=get_conf(spec, 'validate_responses'), - swagger_ui=get_conf(spec, 'swagger_ui'), - swagger_json=get_conf(spec, 'swagger_json'), + options=options, + base_path=base_path, ) logger.info("API endpoints specified in '{path}' added.".format( diff --git a/wes_elixir/api/schema.stdout_stderr.openapi.yaml b/wes_elixir/api/schema.stdout_stderr.openapi.yaml new file mode 100755 index 0000000..42a2c80 --- /dev/null +++ b/wes_elixir/api/schema.stdout_stderr.openapi.yaml @@ -0,0 +1,163 @@ +openapi: 3.0.0 +info: + title: WES-ELIXIR STDOUT & STDERR OpenAPI specification + contact: + name: ELIXIR Cloud & AAI group + email: alexander.kanitz@alumni.ethz.ch + license: + name: Apache 2.0 + url: https://www.apache.org/licenses/LICENSE-2.0 + version: 0.14.0 +servers: +- url: /wes-elixir/v1 +paths: + /stdout/{run_id}: + get: + summary: |- + Retrieves the content of the indicated run's STDOUT stream and returns + it as plain text. + parameters: + - in: path + name: run_id + schema: + type: string + required: true + description: Run identifier. + operationId: get_stdout + responses: + 200: + description: |- + STDOUT stream of indicated run as plain text. + content: + text/plain: + schema: + type: string + example: "This is STDOUT." + 400: + description: The request is malformed. + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + 401: + description: The request is unauthorized. + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + 403: + description: The requester is not authorized to perform this action. 
+ content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + 404: + description: The requested resource was not found. + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + 500: + description: An unexpected error occurred. + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + x-openapi-router-controller: api.controllers + /stderr/{run_id}: + get: + summary: |- + Retrieves the content of the indicated run's STDERR stream and returns + it as plain text. + operationId: get_stderr + parameters: + - in: path + name: run_id + schema: + type: string + required: true + description: Run identifier. + responses: + 200: + description: |- + STDERR stream of indicated run as plain text. + content: + text/plain: + schema: + type: string + example: "This is STDERR." + 400: + description: The request is malformed. + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + 401: + description: The request is unauthorized. + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + 403: + description: The requester is not authorized to perform this action. + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + 404: + description: The requested resource was not found. + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + 500: + description: An unexpected error occurred. + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + x-openapi-router-controller: api.controllers +components: + schemas: + Error: + required: + - message + - reason + type: object + properties: + message: + type: string + description: |- + A human readable message providing more details about the error. + example: + Required parameter 'xyz' is missing. + reason: + type: string + description: |- + Unique identifier for this error, but *not* the HTTP response code + (e.g., name of exception). + example: ValueError + description: An individual error message. + ErrorResponse: + required: + - code + - errors + - message + type: object + properties: + code: + type: integer + description: HTTP status code (e.g., 400, 404). + format: int64 + example: 400 + errors: + type: array + description: List of associated errors and warnings. + items: + $ref: '#/components/schemas/Error' + message: + type: string + description: |- + A human readable message providing more details about the error. + example: The request could not be interpreted. + description: A response object for detailed error messages. 
\ No newline at end of file diff --git a/wes_elixir/config/app_config.yaml b/wes_elixir/config/app_config.yaml index cbde15c..244fe80 100644 --- a/wes_elixir/config/app_config.yaml +++ b/wes_elixir/config/app_config.yaml @@ -53,11 +53,22 @@ celery: # OpenAPI specs api: specs: - - path: '20181010.be85140.workflow_execution_service.swagger.yaml' + - name: 'WES' + path: '20181023.c5406f1-v1-0-0.workflow_execution_service.swagger.yaml' strict_validation: True validate_responses: True swagger_ui: True swagger_json: True + base_path: False + - name: 'stdout_stderr' + path: 'schema.stdout_stderr.openapi.yaml' + strict_validation: True + validate_responses: True + swagger_ui: True + swagger_json: True + base_path: '/wes-elixir/v1' + general_params: + time_format: "%Y-%m-%dT%H:%M:%SZ" endpoint_params: default_page_size: 5 timeout_cancel_run: 60 diff --git a/wes_elixir/database/db_utils.py b/wes_elixir/database/db_utils.py index 95999f2..8b66202 100644 --- a/wes_elixir/database/db_utils.py +++ b/wes_elixir/database/db_utils.py @@ -1,6 +1,6 @@ """Utility functions for MongoDB document insertion, updates and retrieval.""" -from typing import (Any, List, Mapping, Optional) +from typing import (Any, Dict, List, Mapping, Optional) from bson.objectid import ObjectId from pymongo.collection import ReturnDocument @@ -69,8 +69,8 @@ def update_tes_task_state( ) -> Optional[Mapping[Any, Any]]: """Updates `state` field in TES task log and returns updated document.""" return collection.find_one_and_update( - {'task_id': task_id, 'api.task_logs': {'$elemMatch': {'id': tes_id}}}, - {'$set': {'api.task_logs.$.state': state}}, + {'task_id': task_id, 'internal.tes_logs': {'$elemMatch': {'id': tes_id}}}, + {'$set': {'internal.tes_logs.$.state': state}}, return_document=ReturnDocument.AFTER ) @@ -78,12 +78,25 @@ def update_tes_task_state( def append_to_tes_task_logs( collection: Collection, task_id: str, - tes_log: str + task_log: Dict, ) -> Optional[Mapping[Any, Any]]: """Appends task log to TES task logs and returns updated document.""" return collection.find_one_and_update( {'task_id': task_id}, - {'$push': {'api.task_logs': tes_log}}, + {'$push': {'internal.tes_logs': task_log}}, + return_document=ReturnDocument.AFTER + ) + + +def append_to_wes_task_logs( + collection: Collection, + task_id: str, + task_log: Dict, +) -> Optional[Mapping[Any, Any]]: + """Appends task log to WES task logs and returns updated document.""" + return collection.find_one_and_update( + {'task_id': task_id}, + {'$push': {'api.task_logs': task_log}}, return_document=ReturnDocument.AFTER ) diff --git a/wes_elixir/ga4gh/wes/endpoints/get_run_log.py b/wes_elixir/ga4gh/wes/endpoints/get_run_log.py index 3b63acc..66b2363 100644 --- a/wes_elixir/ga4gh/wes/endpoints/get_run_log.py +++ b/wes_elixir/ga4gh/wes/endpoints/get_run_log.py @@ -52,4 +52,6 @@ def get_run_log( ) raise Forbidden + # Remove + return run_log diff --git a/wes_elixir/ga4gh/wes/endpoints/run_workflow (oldirty's conflicted copy 2020-01-06).py b/wes_elixir/ga4gh/wes/endpoints/run_workflow (oldirty's conflicted copy 2020-01-06).py new file mode 100644 index 0000000..793f82c --- /dev/null +++ b/wes_elixir/ga4gh/wes/endpoints/run_workflow (oldirty's conflicted copy 2020-01-06).py @@ -0,0 +1,515 @@ +"""Utility functions for POST /runs endpoint.""" + +import logging +import os +import re +import shutil +import string # noqa: F401 +import subprocess + +from celery import uuid +from json import (decoder, loads) +from pymongo.errors import DuplicateKeyError +from random import choice +from 
typing import Dict +from yaml import dump +from werkzeug.datastructures import ImmutableMultiDict + +from wes_elixir.config.config_parser import get_conf +from wes_elixir.errors.errors import BadRequest +from wes_elixir.tasks.tasks.run_workflow import task__run_workflow + + +# Get logger instance +logger = logging.getLogger(__name__) + + +# Utility function for endpoint POST /runs +def run_workflow( + config: Dict, + form_data: ImmutableMultiDict, + *args, + **kwargs +) -> Dict: + """Executes workflow and save info to database; returns unique run id.""" + # Validate data and prepare run environment + form_data_dict = __immutable_multi_dict_to_nested_dict( + multi_dict=form_data + ) + __validate_run_workflow_request(data=form_data_dict) + __check_service_info_compatibility(data=form_data_dict) + document = __init_run_document(data=form_data_dict) + document = __create_run_environment( + config=config, + document=document, + **kwargs + ) + + # Start workflow run in background + __run_workflow( + config=config, + document=document, + **kwargs + ) + + response = {'run_id': document['run_id']} + return response + + +def __immutable_multi_dict_to_nested_dict( + multi_dict: ImmutableMultiDict +) -> Dict: + """Converts ImmutableMultiDict to nested dictionary.""" + # Convert to flat dictionary + nested_dict = multi_dict.to_dict(flat=True) + for key in nested_dict: + # Try to decode JSON string; ignore JSONDecodeErrors + try: + nested_dict[key] = loads(nested_dict[key]) + except decoder.JSONDecodeError: + pass + return nested_dict + + +def __validate_run_workflow_request(data: Dict) -> None: + """Validates presence and types of workflow run request form data; sets + defaults for optional fields.""" + # The form data is not validated properly because all types except + # 'workflow_attachment' are string and none are labeled as required + # Considering the 'RunRequest' model in the current specs (0.3.0), the + # following assumptions are made and verified for the indicated parameters: + # workflow_params: + # type = dict + # required = True + # workflow_type: + # type = str + # required = True + # workflow_type_version: + # type = str + # required = True + # tags: + # type = dict + # required = False + # workflow_engine_parameters: + # type = dict + # required = False + # workflow_url: + # type = str + # required = True + # workflow_attachment: + # type = [str] + # required = False + + # Set required parameters + required = { + 'workflow_params', + 'workflow_type', + 'workflow_type_version', + 'workflow_url', + } + params_str = [ + 'workflow_type', + 'workflow_type_version', + 'workflow_url', + ] + params_dict = [ + 'workflow_params', + 'workflow_engine_parameters', + 'tags', + ] + type_str = dict((key, data[key]) for key in params_str if key in data) + type_dict = dict((key, data[key]) for key in params_dict if key in data) + # TODO: implement type casting/checking for workflow attachment + + # Raise error if any required params are missing + if not required <= set(data): + logger.error('POST request does not conform to schema.') + raise BadRequest + + # Raise error if any string params are not of type string + if not all(isinstance(value, str) for value in type_str.values()): + logger.error('POST request does not conform to schema.') + raise BadRequest + + # Raise error if any dict params are not of type dict + if not all(isinstance(value, dict) for value in type_dict.values()): + logger.error('POST request does not conform to schema.') + raise BadRequest + + return None + + +def 
__check_service_info_compatibility(data: Dict) -> None: + """Checks compatibility with service info; raises BadRequest.""" + # TODO: implement me + return None + + +def __init_run_document(data: Dict) -> Dict: + """Initializes workflow run document.""" + document: Dict = dict() + document['api'] = dict() + document['internal'] = dict() + document['api']['request'] = data + document['api']['state'] = 'UNKNOWN' + document['api']['run_log'] = dict() + document['api']['task_logs'] = list() + document['api']['outputs'] = dict() + return document + + +def __create_run_environment( + config: Dict, + document: Dict, + **kwargs +) -> Dict: + """Creates unique run identifier and permanent and temporary storage + directories for current run.""" + collection_runs = get_conf(config, 'database', 'collections', 'runs') + out_dir = get_conf(config, 'storage', 'permanent_dir') + tmp_dir = get_conf(config, 'storage', 'tmp_dir') + run_id_charset = eval(get_conf(config, 'database', 'run_id', 'charset')) + run_id_length = get_conf(config, 'database', 'run_id', 'length') + + # Keep on trying until a unique run id was found and inserted + # TODO: If no more possible IDs => inf loop; fix (raise custom error; 500 + # to user) + while True: + + # Create unique run and task ids + run_id = __create_run_id( + charset=run_id_charset, + length=run_id_length, + ) + task_id = uuid() + + # Set temporary and output directories + current_tmp_dir = os.path.abspath(os.path.join(tmp_dir, run_id)) + current_out_dir = os.path.abspath(os.path.join(out_dir, run_id)) + + # Try to create workflow run directory (temporary) + try: + # TODO: Think about permissions + # TODO: Add working dir (currently one has to run the app from + # outermost dir) + os.makedirs(current_tmp_dir) + os.makedirs(current_out_dir) + + # Try new run id if directory already exists + except FileExistsError: + continue + + # Add run/task/user identifier, temp/output directories to document + document['run_id'] = run_id + document['task_id'] = task_id + if 'user_id' in kwargs: + document['user_id'] = kwargs['user_id'] + else: + document['user_id'] = None + document['internal']['tmp_dir'] = current_tmp_dir + document['internal']['out_dir'] = current_out_dir + + # Process worflow attachments + document = __process_workflow_attachments(document) + + # Try to insert document into database + try: + collection_runs.insert(document) + + # Try new run id if document already exists + except DuplicateKeyError: + + # And remove run directories created previously + shutil.rmtree(current_tmp_dir, ignore_errors=True) + shutil.rmtree(current_out_dir, ignore_errors=True) + + continue + + # Catch other database errors + # TODO: implement properly + except Exception as e: + print('Database error') + print(e) + break + + # Exit loop + break + + return document + + +def __create_run_id( + charset: str = '0123456789', + length: int = 6 +) -> str: + """Creates random run ID.""" + return ''.join(choice(charset) for __ in range(length)) + + +def __process_workflow_attachments(data: Dict) -> Dict: + """Processes workflow attachments.""" + # TODO: implement properly + # Current workaround until processing of workflow attachments is + # implemented + # Use 'workflow_url' for path to (main) CWL workflow file on local file + # system or in Git repo + # Use 'workflow_params' or file in Git repo to generate YAML file + + # Set regular expression for finding workflow files on git repositories + # Assumptions: + # - A URL needs to consist of a root, a "separator" keyword, a + # branch/commit, and 
a "file path", separated by slashes + # - The root is the part of the URL up to the separator and is assumed to + # represent the "git clone URL" when '.git' is appended + # - Accepted separator keywords are 'blob', 'src' and 'tree' + # - The value branch/commit is used to checkout the repo to that state + # before obtaining the file + # - The "file path" segment represents the relative path to the CWL + # workflow file when inside the repo + # + # All of the above assumptions should be met when copying the links of + # files in most repos on GitHub, GitLab or Bitbucket + # + # Note that the "file path" portion (see above) of a CWL *parameter file* + # can be *optionally* appended to the URL + # + # The following additional rules apply for workflow and/or parameter files: + # - CWL workflow files *must* end in .cwl, .yml, .yaml or .json + # - Parameter files *must* end in '.yml', '.yaml' or '.json' + # - Accepted delimiters for separating workflow and parameter file, if + # specified, are: ',', ';', ':', '|' + re_git_file = re.compile( + ( + r'^(https?:.*)\/(blob|src|tree)\/(.*?)\/(.*?\.(cwl|yml|yaml|json))' + r'[,:;|]?(.*\.(yml|yaml|json))?' + ) + ) + + # Create directory for storing workflow files + workflow_dir = os.path.abspath( + os.path.join( + data['internal']['out_dir'], 'workflow_files' + ) + ) + try: + os.mkdir(workflow_dir) + + except OSError: + # TODO: Do something more reasonable here + pass + + # Get main workflow file + user_string = data['api']['request']['workflow_url'] + m = re_git_file.match(user_string) + + # Get workflow from Git repo if regex matches + if m: + + repo_url = '.'.join([m.group(1), 'git']) + branch_commit = m.group(3) + cwl_path = m.group(4) + + # Try to clone repo + if not subprocess.run( + [ + 'git', + 'clone', + repo_url, + os.path.join(workflow_dir, 'repo') + ], + check=True + ): + logger.error( + ( + 'Could not clone Git repository. Check value of ' + "'workflow_url' in run request." + ) + ) + raise BadRequest + + # Try to checkout branch/commit + if not subprocess.run( + [ + 'git', + '--git-dir', + os.path.join(workflow_dir, 'repo', '.git'), + '--work-tree', + os.path.join(workflow_dir, 'repo'), + 'checkout', + branch_commit + ], + check=True + ): + logger.error( + ( + 'Could not checkout repository commit/branch. Check value ' + "of 'workflow_url' in run request." 
+ ) + ) + raise BadRequest + + # Set CWL path + data['internal']['cwl_path'] = os.path.join( + workflow_dir, + 'repo', + cwl_path + ) + + # Else assume value of 'workflow_url' represents file on local file system + else: + + # Set main CWL workflow file path + data['internal']['cwl_path'] = os.path.abspath( + data['api']['request']['workflow_url'] + ) + + # Extract name and extensions of workflow + workflow_name_ext = os.path.splitext( + os.path.basename( + data['internal']['cwl_path'] + ) + ) + + # Try to get parameters from 'workflow_params' field + if data['api']['request']['workflow_params']: + data['internal']['param_file_path'] = os.path.join( + workflow_dir, + '.'.join([ + str(workflow_name_ext[0]), + 'yml', + ]), + ) + with open(data['internal']['param_file_path'], 'w') as yaml_file: + dump( + data['api']['request']['workflow_params'], + yaml_file, + allow_unicode=True, + default_flow_style=False + ) + + # Or from provided relative file path in repo + elif m and m.group(6): + param_path = m.group(6) + data['internal']['param_file_path'] = os.path.join( + workflow_dir, + 'repo', + param_path, + ) + + # Else try to see if there is a 'yml', 'yaml' or 'json' file with exactly + # the same basename as CWL in same dir + else: + param_file_extensions = ['yml', 'yaml', 'json'] + for ext in param_file_extensions: + possible_param_file = os.path.join( + workflow_dir, + 'repo', + '.'.join([ + str(workflow_name_ext[0]), + ext, + ]), + ) + if os.path.isfile(possible_param_file): + data['internal']['param_file_path'] = possible_param_file + break + + # Raise BadRequest if not parameter file was found + if 'param_file_path' not in data['internal']: + raise BadRequest + + # Extract workflow attachments from form data dictionary + if 'workflow_attachment' in data['api']['request']: + + # TODO: do something with data['workflow_attachment'] + + # Strip workflow attachments from data + del data['api']['request']['workflow_attachment'] + + # Add workflow base name (without extension) to document + data['api']['run_log']['name'] = str(workflow_name_ext[0]) + + # Return form data stripped of workflow attachments + return data + + +def __run_workflow( + config: Dict, + document: Dict, + **kwargs +) -> None: + """Helper function `run_workflow()`.""" + tes_url = get_conf(config, 'tes', 'url') + remote_storage_url = get_conf(config, 'storage', 'remote_storage_url') + run_id = document['run_id'] + task_id = document['task_id'] + tmp_dir = document['internal']['tmp_dir'] + cwl_path = document['internal']['cwl_path'] + param_file_path = document['internal']['param_file_path'] + + # Build command + command_list = [ + 'cwl-tes', + '--debug', + '--leave-outputs', + '--remote-storage-url', remote_storage_url, + '--tes', tes_url, + cwl_path, + param_file_path + ] + + # Add authorization parameters + if 'token' in kwargs: + auth_params = [ + '--token-public-key', get_conf( + config, + 'security', + 'jwt', + 'public_key' + ).encode('unicode_escape').decode('utf-8'), + '--token', kwargs['token'], + ] + command_list[2:2] = auth_params + + # TEST CASE FOR SYSTEM ERROR + # command_list = [ + # '/path/to/non_existing/script', + # ] + # TEST CASE FOR EXECUTOR ERROR + # command_list = [ + # '/bin/false', + # ] + # TEST CASE FOR SLOW COMPLETION WITH ARGUMENT (NO STDOUT/STDERR) + # command_list = [ + # 'sleep', + # '30', + # ] + + # Get timeout duration + timeout_duration = get_conf( + config, + 'api', + 'endpoint_params', + 'timeout_run_workflow', + ) + + # Execute command as background task + logger.info( + ( + "Starting 
execution of run '{run_id}' as task '{task_id}' in " + "'{tmp_dir}'..." + ).format( + run_id=run_id, + task_id=task_id, + tmp_dir=tmp_dir, + ) + ) + task__run_workflow.apply_async( + None, + { + 'command_list': command_list, + 'tmp_dir': tmp_dir, + }, + task_id=task_id, + soft_time_limit=timeout_duration, + ) + return None diff --git a/wes_elixir/tasks/celery_task_monitor.py b/wes_elixir/tasks/celery_task_monitor.py index 0769e1f..839ab29 100644 --- a/wes_elixir/tasks/celery_task_monitor.py +++ b/wes_elixir/tasks/celery_task_monitor.py @@ -6,7 +6,6 @@ import os import re import requests -from shlex import quote from threading import Thread from time import sleep from typing import (Dict, List, Optional) @@ -24,26 +23,29 @@ logger = logging.getLogger(__name__) -# Set string time format -strf: str = '%Y-%m-%d %H:%M:%S.%f' - - class TaskMonitor(): """Celery task monitor.""" + def __init__( self, celery_app: Celery, collection: Collection, tes_config: Dict[str, str], + stdout_endpoint: Optional[str] = None, + stderr_endpoint: Optional[str] = None, timeout: float = 0, authorization: bool = True, + time_format: str = "%Y-%m-%dT%H:%M:%SZ", ) -> None: """Starts Celery task monitor daemon process.""" self.celery_app = celery_app self.collection = collection + self.stdout_endpoint = stdout_endpoint + self.stderr_endpoint = stderr_endpoint self.timeout = timeout self.authorization = authorization + self.time_format = time_format self.tes_config = tes_config self.thread = Thread(target=self.run, args=()) @@ -52,6 +54,7 @@ def __init__( logger.debug('Celery task monitor daemon process started...') + def run(self) -> None: """Daemon process for Celery task monitor.""" while True: @@ -105,6 +108,7 @@ def run(self) -> None: # Sleep for specified interval sleep(self.timeout) + def on_task_received( self, event: Event @@ -127,16 +131,17 @@ def on_task_received( ) pass - # Build command + # Process command if 'command_list' in kwargs: - if self.authorization: - kwargs['command_list'][3] = '' - kwargs['command_list'][5] = '' - command = ' '.join( - [quote(item) for item in kwargs['command_list']] - ) + cmd = kwargs['command_list'][:] + # Censor sensitive info + try: + cmd[cmd.index("--token") + 1] = ' None: """Event handler for successful, failed and canceled Celery tasks.""" - if not self.collection.find_one({'task_id': event['uuid']}): + document = self.collection.find_one({'task_id': event['uuid']}) + if not document: return None + + # Create dictionary for internal parameters + internal = dict() + internal['task_finished'] = datetime.utcfromtimestamp( + event['timestamp'] + ) + # Parse subprocess results try: (returncode, log, tes_ids) = literal_eval(event['result']) @@ -254,11 +263,20 @@ def on_task_succeeded( ) pass - # Create dictionary for internal parameters - internal = dict() - internal['task_finished'] = datetime.utcfromtimestamp( - event['timestamp'] - ) + # Save STDOUT & STDERR + internal['stdout'] = log + internal['stderr'] = '' + + # Compile API URLs to retreive STDOUT & STDERR + if 'run_id' in document: + if self.stdout_endpoint: + stdout_url = '/'.join([self.stdout_endpoint, document['run_id']]) + else: + stdout_url = 'unavailable' + if self.stderr_endpoint: + stderr_url = '/'.join([self.stderr_endpoint, document['run_id']]) + else: + stderr_url = 'unavailable' # Set final state to be set document = self.collection.find_one( @@ -278,8 +296,17 @@ def on_task_succeeded( # Extract run outputs outputs = self.__cwl_tes_outputs_parser(log) - # Get task logs - task_logs = 
self.__get_tes_task_logs(tes_ids=tes_ids) + # Get task logs from TES + internal['tes_logs'] = self.__get_tes_task_logs(tes_ids=tes_ids) + + # Process task logs + task_logs = [] + for task_log in internal['tes_logs']: + task_logs.append( + self.__TES_to_WES_task_log( + task_log=task_log, + ) + ) # Update run document in database try: @@ -289,12 +316,10 @@ def on_task_succeeded( internal=internal, outputs=outputs, task_logs=task_logs, - task_finished=datetime.utcfromtimestamp( - event['timestamp'] - ).strftime(strf), - return_code=returncode, - stdout=log, - stderr='', + end_time=internal['task_finished'].strftime(self.time_format), + exit_code=returncode, + stdout=stdout_url, + stderr=stderr_url, ) except Exception as e: logger.exception( @@ -309,6 +334,7 @@ def on_task_succeeded( ) pass + def on_task_tes_task_update( self, event: Event @@ -321,8 +347,15 @@ def on_task_tes_task_update( db_utils.append_to_tes_task_logs( collection=self.collection, task_id=event['uuid'], - tes_log=tes_log, + task_log=tes_log, ) + wes_log = self.__TES_to_WES_task_log(task_log=tes_log) + db_utils.append_to_wes_task_logs( + collection=self.collection, + task_id=event['uuid'], + task_log=wes_log, + ) + except Exception as e: logger.exception( ( @@ -368,6 +401,7 @@ def on_task_tes_task_update( ) pass + def update_run_document( self, event: Event, @@ -416,37 +450,37 @@ def update_run_document( root='api.run_log', **run_log_params, ) + else: + document = dict() # Calculate queue, execution and run time - if document and document['internal']: + if 'internal' in document: run_log = document['internal'] - durations = dict() if 'task_started' in run_log_params: if 'task_started' in run_log and 'task_received' in run_log: pass - durations['time_queue'] = ( + run_log['time_queue'] = ( run_log['task_started'] - run_log['task_received'] ).total_seconds() if 'task_finished' in run_log_params: if 'task_finished' in run_log and 'task_started' in run_log: pass - durations['time_execution'] = ( + run_log['time_execution'] = ( run_log['task_finished'] - run_log['task_started'] ).total_seconds() if 'task_finished' in run_log and 'task_received' in run_log: pass - durations['time_total'] = ( + run_log['time_total'] = ( run_log['task_finished'] - run_log['task_received'] ).total_seconds() - if durations: document = db_utils.upsert_fields_in_root_object( collection=self.collection, task_id=event['uuid'], root='api.run_log', - **durations, + internal=internal, ) # Update state @@ -461,7 +495,7 @@ def update_run_document( raise # Log info message - if document: + if 'run_id' in document: logger.info( ( "State of run '{run_id}' (task id: '{task_id}') changed " @@ -475,6 +509,7 @@ def update_run_document( return document + @staticmethod def __cwl_tes_outputs_parser(log: str) -> Dict: """Parses outputs from cwl-tes log.""" @@ -489,6 +524,7 @@ def __cwl_tes_outputs_parser(log: str) -> Dict: else: return dict() + def __get_tes_task_logs( self, tes_ids: List = list() @@ -499,10 +535,11 @@ def __get_tes_task_logs( task_logs.append(self.__get_tes_task_log(tes_id)) return task_logs + def __get_tes_task_log( self, tes_id: str - ) -> str: + ) -> Dict: """Gets task log from TES instance.""" # Build URL base = self.tes_config['url'] @@ -516,3 +553,65 @@ def __get_tes_task_log( task_log = requests.get(url).json() return task_log + + + @staticmethod + def __TES_to_WES_task_log( + task_log: Dict, + index: int = 0, + ) -> Dict: + """ + Converts TES task info/log to WES-compliant task log. + + Unavailable properties are not set. 
+
+        As individual TES task logs may currently contain multiple entries
+        for most properties, while WES logs expect only a single value, only
+        the entry in each array that corresponds to the specified index is
+        returned.
+        """
+        wes_task_log: Dict = dict()
+
+        # Set task name
+        try:
+            wes_task_log['name'] = task_log['name']
+        except KeyError:
+            pass
+
+        # Set task start time
+        try:
+            wes_task_log['start_time'] = task_log['logs'][index]['start_time']
+        except (KeyError, TypeError):
+            pass
+
+        # Set task end time
+        try:
+            wes_task_log['end_time'] = task_log['logs'][index]['end_time']
+        except (KeyError, TypeError):
+            pass
+
+        # Set task exit code
+        try:
+            wes_task_log['exit_code'] = \
+                task_log['logs'][index]['logs'][index]['exit_code']
+        except (IndexError, KeyError, TypeError):
+            pass
+
+        # Set task command
+        try:
+            wes_task_log['cmd'] = task_log['executors'][index]['command']
+        except (IndexError, KeyError, TypeError):
+            pass
+
+        # Set task STDOUT
+        try:
+            wes_task_log['stdout'] = task_log['executors'][index]['stdout']
+        except (IndexError, KeyError, TypeError):
+            pass
+
+        # Set task STDERR
+        try:
+            wes_task_log['stderr'] = task_log['executors'][index]['stderr']
+        except (IndexError, KeyError, TypeError):
+            pass
+
+        return wes_task_log
diff --git a/wes_elixir/tasks/register_celery.py b/wes_elixir/tasks/register_celery.py
index 70f3e80..ca9b29c 100644
--- a/wes_elixir/tasks/register_celery.py
+++ b/wes_elixir/tasks/register_celery.py
@@ -20,6 +20,16 @@ def register_task_service(app: Flask) -> None:
     # Instantiate Celery app instance
     celery_app = create_celery_app(app)
 
+    # Find STDOUT/STDERR endpoints
+    stdout_endpoint = None
+    stderr_endpoint = None
+    for spec in app.config['api']['specs']:
+        if spec['name'] == 'stdout_stderr':
+            base_path = spec['base_path']
+            stdout_endpoint = '/'.join([base_path, 'stdout'])
+            stderr_endpoint = '/'.join([base_path, 'stderr'])
+            break
+
     # Start task monitor daemon
     TaskMonitor(
         celery_app=celery_app,
@@ -32,8 +42,11 @@ def register_task_service(app: Flask) -> None:
             'logs_endpoint_query_params':
                 app.config['tes']['get_logs']['query_params'],
         },
+        stdout_endpoint=stdout_endpoint,
+        stderr_endpoint=stderr_endpoint,
         timeout=app.config['celery']['monitor']['timeout'],
         authorization=app.config['security']['authorization_required'],
+        time_format=app.config['api']['general_params']['time_format'],
     )
 
     logger.info('Celery task monitor registered.')
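As a closing illustration of how the pieces above fit together: with these changes the task monitor records API URLs, rather than raw text, in the run log's stdout and stderr fields. The sketch below mirrors the string composition performed in register_celery.py and celery_task_monitor.py; the spec entries reflect app_config.yaml from this patch, and the run identifier is a made-up example.

"""Illustration only: how stdout/stderr URLs in the run log are derived.

The spec entries mirror app_config.yaml from this patch; the run_id is a
hypothetical example.
"""
specs = [
    {'name': 'WES', 'base_path': False},
    {'name': 'stdout_stderr', 'base_path': '/wes-elixir/v1'},
]

# register_celery.py: locate the auxiliary spec and derive endpoint prefixes
stdout_endpoint = stderr_endpoint = None
for spec in specs:
    if spec['name'] == 'stdout_stderr':
        stdout_endpoint = '/'.join([spec['base_path'], 'stdout'])
        stderr_endpoint = '/'.join([spec['base_path'], 'stderr'])
        break

# celery_task_monitor.py: append the run identifier once the task finishes
run_id = 'A1B2C3'  # hypothetical run
stdout_url = (
    '/'.join([stdout_endpoint, run_id]) if stdout_endpoint else 'unavailable'
)
stderr_url = (
    '/'.join([stderr_endpoint, run_id]) if stderr_endpoint else 'unavailable'
)

print(stdout_url)  # /wes-elixir/v1/stdout/A1B2C3
print(stderr_url)  # /wes-elixir/v1/stderr/A1B2C3

These URLs resolve to the new GET /stdout/{run_id} and GET /stderr/{run_id} operations defined in schema.stdout_stderr.openapi.yaml.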