diff --git a/job_resumption.md b/job_resumption.md index 55e9c21ec..fbf43da43 100644 --- a/job_resumption.md +++ b/job_resumption.md @@ -372,6 +372,61 @@ JSONPUT=pc_noerror.json actiniaput $AUTH $JSONPUT $STATUS_URL_POST ``` +### Job resumption without new process chain + +If the process is not aborted due to an error in the process chain, the job +can be resumed without sending a new process chain. +Here is an example process chain where a folder `/test` is requested which does not exist inside actinia. +The process can be resumed after the folder is created inside actinia. + +`pc_error_not_in_pc.json`: Process chain with a non-existing folder `/test` +``` +{ + "list": [ + { + "id": "r_mapcalc", + "module": "r.mapcalc", + "inputs": [ + { + "param": "expression", + "value": "baum=5" + } + ] + }, + { + "id": "ls1", + "exe": "ls", + "params": ["/test"] + }, + { + "id": "r_info1", + "module": "r.info", + "inputs": [ + { + "param": "map", + "value": "baum" + } + ], + "flags": "g", + "stdout": {"id": "r_info_1", "format": "kv", "delimiter": "="} + } + ], + "version": "1" +} +``` + +``` +# processing (which ends with error) +JSON=pc_error_not_in_pc.json +actiniapost $AUTH $JSON $URL/$ENDPOINT + +# manually create /test folder inside actinia + +# job resumption +curl -X PUT -H 'Content-Type: application/json' -H 'accept: application/json' -u $AUTH $STATUS_URL_POST +curl -L -u $AUTH $STATUS_URL_POST | jq +``` + ### Cleaning up resources which are not restarted If a job is not restarted and the processing is not finished successfully the interim results will not be automatically cleaned. 
For this you can delete the diff --git a/src/actinia_core/rest/base/resource_base.py b/src/actinia_core/rest/base/resource_base.py index e72b59a2f..983d76a24 100644 --- a/src/actinia_core/rest/base/resource_base.py +++ b/src/actinia_core/rest/base/resource_base.py @@ -298,6 +298,7 @@ def preprocess( location_name=None, mapset_name=None, map_name=None, + process_chain_list=None, ): """Preprocessing steps for asynchronous processing @@ -317,6 +318,9 @@ def preprocess( computation should be performed map_name: The name of the map or other resource (raster, vector, STRDS, color, ...) + process_chain_list (dict): The process chain list (e.g. for the + job resumption when no new post body is + sent in the PUT request) Returns: The ResourceDataContainer that contains all required information @@ -336,6 +340,8 @@ def preprocess( elif has_json is True: if self.check_for_json() is False: return None + elif process_chain_list is not None: + self.request_data = process_chain_list # Compute the job timeout of the worker queue from the user credentials process_time_limit = self.user_credentials["permissions"][ diff --git a/src/actinia_core/rest/resource_management.py b/src/actinia_core/rest/resource_management.py index 93257a257..fc5b277ae 100644 --- a/src/actinia_core/rest/resource_management.py +++ b/src/actinia_core/rest/resource_management.py @@ -28,7 +28,10 @@ import pickle import re -from flask import g +from flask import ( + g, + request, +) from flask import jsonify, make_response from flask_restful_swagger_2 import Resource from flask_restful_swagger_2 import swagger @@ -307,7 +310,14 @@ def _check_possibility_of_new_iteration( return error_msg def _create_ResourceDataContainer_for_resumption( - self, post_url, pc_step, user_id, resource_id, iteration, endpoint + self, + post_url, + pc_step, + user_id, + resource_id, + iteration, + endpoint, + process_chain_list=None, ): """Create the ResourceDataContainer for the resumption of the resource depending on the post_url @@ 
-319,6 +329,9 @@ def _create_ResourceDataContainer_for_resumption( user_id (str): The unique user name/id resource_id (str): The id of the resource iteration (int): The number of iteration of this resource + process_chain_list (dict): The process chain list (e.g. for the + job resumption when no new post body is + sent in the PUT request) Returns: rdc (ResourceDataContainer): The data container that contains all @@ -328,6 +341,11 @@ def _create_ResourceDataContainer_for_resumption( start_job (function): The start job function of the processing_resource """ + preprocess_kwargs = {} + if process_chain_list is not None: + preprocess_kwargs["has_json"] = False + preprocess_kwargs["process_chain_list"] = process_chain_list + interim_result = InterimResult( user_id, resource_id, iteration, endpoint ) @@ -346,7 +364,9 @@ def _create_ResourceDataContainer_for_resumption( processing_resource = AsyncEphemeralResource( resource_id, iteration, post_url ) - rdc = processing_resource.preprocess(location_name=location) + rdc = processing_resource.preprocess( + location_name=location, **preprocess_kwargs + ) elif processing_class == "AsyncPersistentResource": # /locations/{location_name}/mapsets/{mapset_name}/processing_async from .persistent_processing import AsyncPersistentResource @@ -357,8 +377,9 @@ def _create_ResourceDataContainer_for_resumption( ) mapset = re.findall(r"mapsets\/(.*?)\/", post_url)[0] rdc = processing_resource.preprocess( - location_name=location, mapset_name=mapset + location_name=location, mapset_name=mapset, **preprocess_kwargs ) + elif processing_class == "AsyncEphemeralExportResource": # /locations/{location_name}/processing_async_export from .ephemeral_processing_with_export import ( @@ -371,7 +392,9 @@ def _create_ResourceDataContainer_for_resumption( processing_resource = AsyncEphemeralExportResource( resource_id, iteration, post_url ) - rdc = processing_resource.preprocess(location_name=location) + rdc = processing_resource.preprocess( + 
location_name=location, **preprocess_kwargs + ) else: return make_response( jsonify( @@ -463,6 +486,11 @@ def put(self, user_id, resource_id): else: post_url = None + # use old process chain list if no new one is sent + process_chain_list = None + if not request.is_json: + process_chain_list = response_model["process_chain_list"][0] + rdc_resp = self._create_ResourceDataContainer_for_resumption( post_url, pc_step, @@ -470,6 +498,7 @@ def put(self, user_id, resource_id): resource_id, iteration, response_model["api_info"]["endpoint"], + process_chain_list=process_chain_list, ) if len(rdc_resp) == 3: