Job resumption working without new process chain (#400)
* job resumption working without new process chain

* linting and job resumption readme

* improve example

* fix job resumption

* Apply suggestions from code review

Co-authored-by: Carmen Tawalika <[email protected]>

Co-authored-by: anikaweinmann <[email protected]>
Co-authored-by: Carmen Tawalika <[email protected]>
3 people authored Dec 16, 2022
1 parent fa5568e commit 3e68dea
Showing 3 changed files with 95 additions and 5 deletions.
55 changes: 55 additions & 0 deletions job_resumption.md
@@ -372,6 +372,61 @@ JSONPUT=pc_noerror.json

### Job resumption without new process chain

If the process was aborted by an error that is not caused by the process chain
itself, the job can be resumed without sending a new process chain.
Below is an example process chain which lists the folder `/test`, a folder that
does not exist inside actinia. The process can be resumed after the folder has
been created inside actinia.

`pc_error_not_in_pc.json`: Process chain referencing the non-existing folder `/test`
```
{
  "list": [
    {
      "id": "r_mapcalc",
      "module": "r.mapcalc",
      "inputs": [
        {
          "param": "expression",
          "value": "baum=5"
        }
      ]
    },
    {
      "id": "ls1",
      "exe": "ls",
      "params": ["/test"]
    },
    {
      "id": "r_info1",
      "module": "r.info",
      "inputs": [
        {
          "param": "map",
          "value": "baum"
        }
      ],
      "flags": "g",
      "stdout": {"id": "r_info_1", "format": "kv", "delimiter": "="}
    }
  ],
  "version": "1"
}
```

```
# processing (which ends with an error)
JSON=pc_error_not_in_pc.json
actiniapost $AUTH $JSON $URL/$ENDPOINT
# manually create the /test folder inside actinia
# job resumption
curl -X PUT -H 'Content-Type: application/json' -H 'accept: application/json' -u $AUTH $STATUS_URL_POST
curl -L -u $AUTH $STATUS_URL_POST | jq
```
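
The same flow can also be scripted. Below is a minimal sketch in Python using
`requests`; the base URL, credentials and endpoint are placeholders, and it
assumes the usual actinia response layout in which the returned JSON carries
the resource status URL under `urls.status`.

```python
# Sketch of the resumption flow against a hypothetical actinia instance.
import json
import time

import requests

actinia_url = "https://actinia.example.org/api/v3"        # placeholder base URL
auth = ("demouser", "secret")                             # placeholder credentials
endpoint = "locations/nc_spm_08/processing_async_export"  # placeholder endpoint

with open("pc_error_not_in_pc.json") as pc_file:
    process_chain = json.load(pc_file)

# start the job; it ends with an error because /test does not exist yet
resp = requests.post(
    f"{actinia_url}/{endpoint}", json=process_chain, auth=auth
).json()
status_url = resp["urls"]["status"]

# ... create the /test folder inside actinia, then resume the job with a
# body-less PUT: no new process chain is sent, so actinia reuses the one
# stored with the previous iteration of the resource
requests.put(status_url, auth=auth)

# poll the status URL until the resumed job reaches a final state
while True:
    status = requests.get(status_url, auth=auth).json()
    if status["status"] not in ("accepted", "running"):
        break
    time.sleep(5)
print(status["status"])
```

The key point is the body-less PUT request: because no new JSON is sent,
actinia falls back to the process chain stored with the previous iteration of
the resource, as implemented in the changes below.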

### Cleaning up resources which are not restarted
If a job is not restarted and the processing did not finish successfully, the
interim results will not be cleaned up automatically. For this you can delete the
6 changes: 6 additions & 0 deletions src/actinia_core/rest/base/resource_base.py
```diff
@@ -298,6 +298,7 @@ def preprocess(
         location_name=None,
         mapset_name=None,
         map_name=None,
+        process_chain_list=None,
     ):
         """Preprocessing steps for asynchronous processing
@@ -317,6 +318,9 @@
                          computation should be performed
             map_name: The name of the map or other resource (raster, vector,
                       STRDS, color, ...)
+            process_chain_list (dict): The process chain list (e.g. for the
+                                       job resumption when no new postbody is
+                                       send in the PUT request)

         Returns:
             The ResourceDataContainer that contains all required information
@@ -336,6 +340,8 @@
         elif has_json is True:
             if self.check_for_json() is False:
                 return None
+        elif process_chain_list is not None:
+            self.request_data = process_chain_list

         # Compute the job timeout of the worker queue from the user credentials
         process_time_limit = self.user_credentials["permissions"][
```
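
Read together with the signature change above, the new `elif` branch lets a
caller skip JSON parsing and inject an already stored process chain instead.
A simplified, hypothetical illustration of that selection logic (not the
actual class, just the decision order):

```python
# Hypothetical, simplified stand-in for the request-data selection in
# preprocess(); the real method works on Flask's request object and
# self.request_data.
def choose_request_data(has_json, request_json, process_chain_list):
    """Return the process chain a (possibly resumed) job should run."""
    if has_json:
        # a new process chain is expected in the request body
        return request_json
    if process_chain_list is not None:
        # no new post body was sent: reuse the stored process chain
        return process_chain_list
    return None


# Job resumption without a new process chain: has_json=False, stored chain given
data = choose_request_data(False, None, {"list": [], "version": "1"})
```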
39 changes: 34 additions & 5 deletions src/actinia_core/rest/resource_management.py
Expand Up @@ -28,7 +28,10 @@

import pickle
import re
from flask import g
from flask import (
g,
request,
)
from flask import jsonify, make_response
from flask_restful_swagger_2 import Resource
from flask_restful_swagger_2 import swagger
Expand Down Expand Up @@ -307,7 +310,14 @@ def _check_possibility_of_new_iteration(
return error_msg

def _create_ResourceDataContainer_for_resumption(
self, post_url, pc_step, user_id, resource_id, iteration, endpoint
self,
post_url,
pc_step,
user_id,
resource_id,
iteration,
endpoint,
process_chain_list=None,
):
"""Create the ResourceDataContainer for the resumption of the resource
depending on the post_url
Expand All @@ -319,6 +329,9 @@ def _create_ResourceDataContainer_for_resumption(
user_id (str): The unique user name/id
resource_id (str): The id of the resource
iteration (int): The number of iteration of this resource
process_chain_list (dict): The process chain list (e.g. for the
job resumption when no new postbody is
send in the PUT request)
Returns:
rdc (ResourceDataContainer): The data container that contains all
Expand All @@ -328,6 +341,11 @@ def _create_ResourceDataContainer_for_resumption(
start_job (function): The start job function of the
processing_resource
"""
preprocess_kwargs = {}
if process_chain_list is not None:
preprocess_kwargs["has_json"] = False
preprocess_kwargs["process_chain_list"] = process_chain_list

interim_result = InterimResult(
user_id, resource_id, iteration, endpoint
)
Expand All @@ -346,7 +364,9 @@ def _create_ResourceDataContainer_for_resumption(
processing_resource = AsyncEphemeralResource(
resource_id, iteration, post_url
)
rdc = processing_resource.preprocess(location_name=location)
rdc = processing_resource.preprocess(
location_name=location, **preprocess_kwargs
)
elif processing_class == "AsyncPersistentResource":
# /locations/{location_name}/mapsets/{mapset_name}/processing_async
from .persistent_processing import AsyncPersistentResource
Expand All @@ -357,8 +377,9 @@ def _create_ResourceDataContainer_for_resumption(
)
mapset = re.findall(r"mapsets\/(.*?)\/", post_url)[0]
rdc = processing_resource.preprocess(
location_name=location, mapset_name=mapset
location_name=location, mapset_name=mapset, **preprocess_kwargs
)

elif processing_class == "AsyncEphemeralExportResource":
# /locations/{location_name}/processing_async_export
from .ephemeral_processing_with_export import (
Expand All @@ -371,7 +392,9 @@ def _create_ResourceDataContainer_for_resumption(
processing_resource = AsyncEphemeralExportResource(
resource_id, iteration, post_url
)
rdc = processing_resource.preprocess(location_name=location)
rdc = processing_resource.preprocess(
location_name=location, **preprocess_kwargs
)
else:
return make_response(
jsonify(
Expand Down Expand Up @@ -463,13 +486,19 @@ def put(self, user_id, resource_id):
else:
post_url = None

# use old process chain list if no new one is send
process_chain_list = None
if not request.is_json:
process_chain_list = response_model["process_chain_list"][0]

rdc_resp = self._create_ResourceDataContainer_for_resumption(
post_url,
pc_step,
user_id,
resource_id,
iteration,
response_model["api_info"]["endpoint"],
process_chain_list=process_chain_list,
)

if len(rdc_resp) == 3:
