Improve database management, files storage management and sub-process management #662

Draft: wants to merge 81 commits into base: main

Commits
9b62115
rewrite pywps.app.basic.parse_http_url in hope to make it understandable
gschwind Jul 5, 2022
ec3b44a
Specify the expected end point in GetCapabilities
gschwind Jul 5, 2022
534f3f6
Implement an explicit dispatch code in Service.call
gschwind Jul 5, 2022
dc44e36
Update documentation to reflect the actual url endpoint behaviour
gschwind Jul 5, 2022
31a73b7
Update test to explicitly use /wps endpoint
gschwind Jul 5, 2022
a480d1d
Move Process.launch_next_process to Service.launch_next_process
gschwind Jun 29, 2022
4148b65
Move Process.execute to Service.execute_instance
gschwind Jun 29, 2022
0c30dd9
Move Process._execute_process to Service._execute_process
gschwind Jun 29, 2022
d80d1b7
Move Process._run_async to Service._run_async
gschwind Jun 29, 2022
91f8d6f
Split Process._run_process into Service.__run_process
gschwind Jun 29, 2022
da2f1c4
Remove Process.service attribute
gschwind Jun 29, 2022
1d99f58
Improve Service.prepare_process_for_execution
gschwind Jun 29, 2022
5f18d5b
Use staticmethod in Service where it can apply
gschwind Jun 29, 2022
c32a5d0
Rename Process._run_process to Process.run_process
gschwind Jun 29, 2022
dbabad0
Split Service._parse_and_execute into Service._parse_request_inputs
gschwind Jun 29, 2022
040f85f
Improve Service._parse_request_inputs
gschwind Jun 29, 2022
b088e77
Improve WPSRequest.__init__
gschwind Jun 29, 2022
8e32c3a
Improve Service._parse_request_inputs signature definition
gschwind Jun 29, 2022
2cac723
Add and generate uuid upon WPSRequest construction
gschwind Jun 29, 2022
fe7d262
Replace Process.async_ by wps_request.is_async
gschwind Jun 29, 2022
6a47050
Redesign the initialisation of process and wps_request in Service
gschwind Jun 29, 2022
ef41a12
Remove uuid parameter where they are not needed anymore
gschwind Jun 29, 2022
c788d7b
Merge and remove Service._parse_and_execute
gschwind Jun 29, 2022
52fd93a
Merge and remove Service.execute_instance
gschwind Jun 29, 2022
77ad046
Merge and remove Service._execute_process
gschwind Jun 29, 2022
105612b
Simplify dblog.store_request
gschwind Jun 29, 2022
a8576f5
Load configuration value in Service.__init__
gschwind Jun 29, 2022
5c08980
Implement Service._get_accurate_process_counts
gschwind Jun 29, 2022
d6bd023
What ever request is comming try to run stored request
gschwind Jun 29, 2022
7951dc9
Move async check from Service to Process
gschwind Jun 29, 2022
02901ca
Rework Service.execute
gschwind Jun 29, 2022
9337377
Fix Service.launch_next_process
gschwind Jun 29, 2022
bb1903d
Clean up get_response in Service
gschwind Jun 29, 2022
111d606
Do not always start process at the end of a previous one
gschwind Jun 29, 2022
7f87515
Add configuration for sqlalchimy debug
gschwind Jun 30, 2022
be76386
Explicitly close session in dblog.pop_first_stored
gschwind Jun 30, 2022
4e1f840
Change how running process are selected in dblog.get_process_counts
gschwind Jun 30, 2022
46dc800
Implement dblog.pop_first_stored_with_limit
gschwind Jun 30, 2022
089ebef
Add timestamp to RequestInstance to ensure ordered processing
gschwind Jun 30, 2022
5a89162
Remove pywps.inout.storage implementation
gschwind Jul 4, 2022
5f410d7
Remove old tests/test_*storage.py
gschwind Jul 4, 2022
a7e05af
Implement dblog.StorageRecord
gschwind Jul 4, 2022
1a7068f
Implement new pywps.inout.storage.basic
gschwind Jul 4, 2022
955a25d
Implement pywps.inout.storage.DatabaseStorage
gschwind Jul 4, 2022
6805997
Implement pywps.inout.storage.FileStorage
gschwind Jul 4, 2022
ef3ef6b
Implement new tests/test_storage_database.py
gschwind Jul 4, 2022
ec20515
Implement new tests/test_file_storage.py
gschwind Jul 4, 2022
8af9d83
Use new pywps storages
gschwind Jul 4, 2022
9751d82
Implement /files to get files from exposed storage
gschwind Jul 4, 2022
28e66f6
Update default storage to FileStorage
gschwind Jul 4, 2022
f113fe5
Implement pywps.dblog.StatusRecord
gschwind Jul 5, 2022
169b715
Store status records into the database
gschwind Jul 5, 2022
160a6f3
Move ExecuteResponse.preprocess_response where it must be actualy
gschwind Jul 5, 2022
da3da75
Ensure that ExecuteResponse.preprocess_response is run only once
gschwind Jul 5, 2022
8aa06eb
Select ExecuteResponse mimetype in the constructor
gschwind Jul 5, 2022
0cc4f9d
Remove redundant json_response return from pywps.app.basic.get_respon…
gschwind Jul 5, 2022
9ea2ee8
Rename pywps.app.basic.get_response_type to select_response_mimetype
gschwind Jul 5, 2022
585cded
Update test to actualy provide valide FakeRequest
gschwind Jul 5, 2022
f661e08
Make safer mimetype selection in pywps.response.execute
gschwind Jul 5, 2022
2c58d2b
Accept all mimetype in pywps.app.basic.select_response_mimetype if no…
gschwind Jul 5, 2022
9d3122f
Add version in ExecuteResponse.json output
gschwind Jul 5, 2022
eec485f
Implement pywps.response.status.StatusResponse
gschwind Jul 5, 2022
fe800c5
Use StatusResponse in pywps.response.execute.ExecuteResponse
gschwind Jul 5, 2022
884fad3
Implement /status
gschwind Jul 5, 2022
61128fb
Update process to use dynamic status
gschwind Jul 5, 2022
1857d1f
Remove obsolete code relative to execute status
gschwind Jul 5, 2022
49b5168
Update status record of crashed processes
gschwind Jul 5, 2022
4fa38f0
Cleanup all process on status request
gschwind Jul 5, 2022
9892467
Update comments in pywps.dblog
gschwind Jul 6, 2022
55f2f73
Create lock factory for dblog
gschwind Jul 11, 2022
43c3ece
Remove ineffective lock in dblock.get_session
gschwind Jul 11, 2022
eca45a5
Protect all database access by an exclusive lock
gschwind Jul 11, 2022
af13204
Add [logging] database_filelock configuration option
gschwind Jul 11, 2022
587e3b3
Implement a basic util.FileLock
gschwind Jul 11, 2022
9ce4399
Add optional filelock in dblog
gschwind Jul 11, 2022
22683a4
Draft test_dblog.py
gschwind Jul 11, 2022
f09e1fe
fix launch
gschwind Jul 11, 2022
66808e3
Base dblog test
gschwind Jul 12, 2022
ccfe062
add test for dblog storage
gschwind Jul 12, 2022
f6d43de
add test for dblog status
gschwind Jul 12, 2022
f6fa83f
Add test for dblog crashed process
gschwind Jul 12, 2022
7 changes: 5 additions & 2 deletions README.md
@@ -83,14 +83,17 @@ python demo.py

4. Run via web browser

`http://localhost/pywps/?service=WPS&request=GetCapabilities&version=1.0.0`
`http://localhost/pywps/wps?service=WPS&request=GetCapabilities&version=1.0.0`

5. Run in command line:

```bash
curl 'http://localhost/pywps/?service=WPS&request=GetCapabilities&version=1.0.0'
curl 'http://localhost/pywps/wps?service=WPS&request=GetCapabilities&version=1.0.0'
```

# Notes

PyWPS now appends `/wps` to the server URL as the default WPS endpoint. It also uses several other endpoints, such as `/api`.
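
As a rough illustration of the note above, an endpoint URL can be derived from the configured base URL as sketched below. The helper name `endpoint_url` is hypothetical and not part of PyWPS; it only shows how a trailing slash on the base URL can be tolerated.

```python
from urllib.parse import urlencode


def endpoint_url(base_url: str, endpoint: str, **params: str) -> str:
    """Join a base URL and an endpoint name (hypothetical helper, not PyWPS API)."""
    # Strip slashes on both sides so the base URL may or may not end with '/'.
    url = f"{base_url.rstrip('/')}/{endpoint.lstrip('/')}"
    if params:
        # Keyword arguments keep their order, so the query string is stable.
        url = f"{url}?{urlencode(params)}"
    return url


if __name__ == "__main__":
    print(endpoint_url("http://localhost/pywps/", "wps",
                       service="WPS", request="GetCapabilities", version="1.0.0"))
```

With the base URL from this README, the call above yields the same GetCapabilities URL as the `curl` example.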

# Issues

3 changes: 2 additions & 1 deletion default-sample.cfg
@@ -25,7 +25,8 @@ contact_role=pointOfContact
[server]
maxsingleinputsize=1mb
maxrequestsize=3mb
url=http://localhost:5000/wps
# Base URL; PyWPS now has several endpoints, such as /wps or /api, that are appended to it
url=http://localhost:5000/
outputurl=http://localhost:5000/outputs/
outputpath=outputs
workdir=workdir
238 changes: 50 additions & 188 deletions pywps/app/Process.py
@@ -8,13 +8,12 @@
from pywps.translations import lower_case_dict
import sys
import traceback
import json
import shutil
import copy
import tempfile

from pywps import dblog
from pywps.response import get_response
from pywps.response.status import WPS_STATUS
from pywps.response.execute import ExecuteResponse
from pywps.app.WPSRequest import WPSRequest
from pywps.inout.inputs import input_from_json
from pywps.inout.outputs import output_from_json
@@ -23,10 +22,12 @@
ServerBusy, NoApplicableCode,
InvalidParameterValue)
from pywps.app.exceptions import ProcessError
from pywps.inout.storage.builder import StorageBuilder
from pywps.inout.storage import new_storage
from pywps.inout.outputs import ComplexOutput
import importlib

from pywps import configuration


LOGGER = logging.getLogger("PYWPS")

@@ -70,13 +71,9 @@ def __init__(self, handler, identifier, title, abstract='', keywords=None, profi
self.inputs = inputs if inputs is not None else []
self.outputs = outputs if outputs is not None else []
self.uuid = None
self._status_store = None
# self.status_location = ''
# self.status_url = ''
self.workdir = None
self._grass_mapset = None
self.grass_location = grass_location
self.service = None
self.translations = lower_case_dict(translations)

if store_supported:
@@ -123,32 +120,18 @@ def from_json(cls, value):
new_process.set_workdir(value['workdir'])
return new_process

def execute(self, wps_request, uuid):
self._set_uuid(uuid)
self._setup_status_storage()
self.async_ = False
response_cls = get_response("execute")
wps_response = response_cls(wps_request, process=self, uuid=self.uuid)

LOGGER.debug('Check if status storage and updating are supported by this process')
if wps_request.store_execute == 'true':
if self.store_supported != 'true':
raise StorageNotSupported('Process does not support the storing of the execute response')

if wps_request.status == 'true':
if self.status_supported != 'true':
raise OperationNotSupported('Process does not support the updating of status')

wps_response.store_status_file = True
self.async_ = True
else:
wps_response.store_status_file = False

LOGGER.debug('Check if updating of status is not required then no need to spawn a process')

wps_response = self._execute_process(self.async_, wps_request, wps_response)

return wps_response
def new_instance(self, wps_request: WPSRequest):
"""Generate a new instance of that process with a new temporary directory"""
# make deep copy of the process instance
# so that processes are not overriding each other
# just for execute
process = copy.deepcopy(self)
process.setup_outputs_from_wps_request(wps_request)
workdir = os.path.abspath(config.get_config_value('server', 'workdir'))
tempdir = tempfile.mkdtemp(prefix='pywps_process_', dir=workdir)
process.set_workdir(tempdir)
process._set_uuid(wps_request.uuid)
return process
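
The pattern behind `new_instance` (deep-copy the registered process, then give the copy its own temporary working directory and the request's uuid) can be sketched in isolation as follows. `TemplateProcess` is a hypothetical stand-in, not the PyWPS `Process` class, and the base working directory is passed in explicitly here rather than read from the server configuration.

```python
import copy
import os
import tempfile
import uuid
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class TemplateProcess:
    """Hypothetical stand-in for a registered process definition."""
    identifier: str
    outputs: List[str] = field(default_factory=list)
    workdir: Optional[str] = None
    uuid: Optional[str] = None

    def new_instance(self, request_uuid: str, base_workdir: str) -> "TemplateProcess":
        # Deep copy so that executions never share mutable state with the template.
        instance = copy.deepcopy(self)
        # Each execution gets its own directory under the base working directory.
        instance.workdir = tempfile.mkdtemp(
            prefix="pywps_process_", dir=os.path.abspath(base_workdir))
        instance.uuid = request_uuid
        return instance


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as base:
        template = TemplateProcess("say_hello", outputs=["response"])
        first = template.new_instance(str(uuid.uuid4()), base)
        second = template.new_instance(str(uuid.uuid4()), base)
        first.outputs.append("extra")  # does not leak into template or second
        print(template.outputs, second.outputs, first.workdir != second.workdir)
```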

def _set_uuid(self, uuid):
"""Set uuid and status location path and url
@@ -161,172 +144,35 @@ def _set_uuid(self, uuid):
for outpt in self.outputs:
outpt.uuid = uuid

def _setup_status_storage(self):
self._status_store = StorageBuilder.buildStorage()

@property
def status_store(self):
if self._status_store is None:
self._setup_status_storage()
return self._status_store

@property
def status_location(self):
return self.status_store.location(self.status_filename)
base_url = configuration.get_config_value('server', 'url').rstrip('/')
return f'{base_url}/status?uuid={self.uuid}'

@property
def status_filename(self):
return str(self.uuid) + '.xml'

@property
def status_url(self):
return self.status_store.url(self.status_filename)

def _execute_process(self, async_, wps_request, wps_response):
"""Uses :module:`pywps.processing` module for sending process to
background BUT first, check for maxprocesses configuration value

:param async_: run in asynchronous mode
:return: wps_response or None
"""

maxparallel = int(config.get_config_value('server', 'parallelprocesses'))

running, stored = dblog.get_process_counts()

if maxparallel != -1 and running >= maxparallel:
# Try to check for crashed process
dblog.cleanup_crashed_process()
running, stored = dblog.get_process_counts()

# async
if async_:

# run immedietly
LOGGER.debug("Running processes: {} of {} allowed parallelprocesses".format(running, maxparallel))
LOGGER.debug("Stored processes: {}".format(stored))

if running < maxparallel or maxparallel == -1:
wps_response._update_status(WPS_STATUS.ACCEPTED, "PyWPS Request accepted", 0)
LOGGER.debug("Accepted request {}".format(self.uuid))
self._run_async(wps_request, wps_response)

# try to store for later usage
else:
maxprocesses = int(config.get_config_value('server', 'maxprocesses'))
if stored >= maxprocesses and maxprocesses != -1:
raise ServerBusy('Maximum number of processes in queue reached. Please try later.')
LOGGER.debug("Store process in job queue, uuid={}".format(self.uuid))
dblog.store_process(self.uuid, wps_request)
wps_response._update_status(WPS_STATUS.ACCEPTED, 'PyWPS Process stored in job queue', 0)

# not async
else:
if running >= maxparallel and maxparallel != -1:
raise ServerBusy('Maximum number of parallel running processes reached. Please try later.')
wps_response._update_status(WPS_STATUS.ACCEPTED, "PyWPS Request accepted", 0)
wps_response = self._run_process(wps_request, wps_response)

return wps_response

# This function may not raise exception and must return a valid wps_response
# Failure must be reported as wps_response.status = WPS_STATUS.FAILED
def _run_async(self, wps_request, wps_response):
import pywps.processing
process = pywps.processing.Process(
process=self,
wps_request=wps_request,
wps_response=wps_response)
LOGGER.debug("Starting process for request: {}".format(self.uuid))
process.start()

# This function may not raise exception and must return a valid wps_response
# Failure must be reported as wps_response.status = WPS_STATUS.FAILED
def _run_process(self, wps_request, wps_response):
LOGGER.debug("Started processing request: {} with pid: {}".format(self.uuid, os.getpid()))
# Update the actual pid of current process to check if failed latter
dblog.update_pid(self.uuid, os.getpid())
try:
self._set_grass(wps_request)
# if required set HOME to the current working directory.
if config.get_config_value('server', 'sethomedir') is True:
os.environ['HOME'] = self.workdir
LOGGER.info('Setting HOME to current working directory: {}'.format(os.environ['HOME']))
LOGGER.debug('ProcessID={}, HOME={}'.format(self.uuid, os.environ.get('HOME')))
wps_response._update_status(WPS_STATUS.STARTED, 'PyWPS Process started', 0)
self.handler(wps_request, wps_response) # the user must update the wps_response.
# Ensure process termination
if wps_response.status != WPS_STATUS.SUCCEEDED and wps_response.status != WPS_STATUS.FAILED:
# if (not wps_response.status_percentage) or (wps_response.status_percentage != 100):
LOGGER.debug('Updating process status to 100% if everything went correctly')
wps_response._update_status(WPS_STATUS.SUCCEEDED, f'PyWPS Process {self.title} finished', 100)
except Exception as e:
traceback.print_exc()
LOGGER.debug('Retrieving file and line number where exception occurred')
exc_type, exc_obj, exc_tb = sys.exc_info()
found = False
while not found:
# search for the _handler method
m_name = exc_tb.tb_frame.f_code.co_name
if m_name == '_handler':
found = True
else:
if exc_tb.tb_next is not None:
exc_tb = exc_tb.tb_next
else:
# if not found then take the first
exc_tb = sys.exc_info()[2]
break
fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
method_name = exc_tb.tb_frame.f_code.co_name

# update the process status to display process failed

msg = 'Process error: method={}.{}, line={}, msg={}'.format(fname, method_name, exc_tb.tb_lineno, e)
LOGGER.error(msg)
# In case of a ProcessError use the validated exception message.
if isinstance(e, ProcessError):
msg = "Process error: {}".format(e)
# Only in debug mode we use the log message including the traceback ...
elif config.get_config_value("logging", "level") != "DEBUG":
# ... otherwise we use a sparse common error message.
msg = 'Process failed, please check server error log'
wps_response._update_status(WPS_STATUS.FAILED, msg, 100)

finally:
# The run of the next pending request if finished here, weather or not it successful
self.launch_next_process()
return self.status_location

def run_process(self, wps_request, wps_response):
self._set_grass(wps_request)
# if required set HOME to the current working directory.
if config.get_config_value('server', 'sethomedir') is True:
os.environ['HOME'] = self.workdir
LOGGER.info('Setting HOME to current working directory: {}'.format(os.environ['HOME']))
LOGGER.debug('ProcessID={}, HOME={}'.format(self.uuid, os.environ.get('HOME')))
wps_response._update_status(WPS_STATUS.STARTED, 'PyWPS Process started', 0)
self.handler(wps_request, wps_response) # the user must update the wps_response.
# Ensure process termination
if wps_response.status != WPS_STATUS.SUCCEEDED and wps_response.status != WPS_STATUS.FAILED:
# if (not wps_response.status_percentage) or (wps_response.status_percentage != 100):
LOGGER.debug('Updating process status to 100% if everything went correctly')
wps_response._update_status(WPS_STATUS.SUCCEEDED, f'PyWPS Process {self.title} finished', 100)
return wps_response

def launch_next_process(self):
"""Look at the queue of async process, if the queue is not empty launch the next pending request.
"""
try:
LOGGER.debug("Checking for stored requests")

stored_request = dblog.pop_first_stored()
if not stored_request:
LOGGER.debug("No stored request found")
return

(uuid, request_json) = (stored_request.uuid, stored_request.request)
request_json = request_json.decode('utf-8')
LOGGER.debug("Launching the stored request {}".format(str(uuid)))
new_wps_request = WPSRequest()
new_wps_request.json = json.loads(request_json)
process_identifier = new_wps_request.identifier
process = self.service.prepare_process_for_execution(process_identifier)
process._set_uuid(uuid)
process._setup_status_storage()
process.async_ = True
process.setup_outputs_from_wps_request(new_wps_request)
new_wps_response = ExecuteResponse(new_wps_request, process=process, uuid=uuid)
new_wps_response.store_status_file = True
process._run_async(new_wps_request, new_wps_response)
except Exception as e:
LOGGER.exception("Could not run stored process. {}".format(e))

def clean(self):
"""Clean the process working dir and other temporary files
"""
@@ -356,6 +202,22 @@ def set_workdir(self, workdir):
for outpt in self.outputs:
outpt.workdir = workdir

def is_async(self, wps_request: WPSRequest):
"""Check and return if the request is async
Raise Exception if the request is not compatible with the process
"""
wps_request.is_async = False
if wps_request.store_execute == 'true':
if self.store_supported != 'true':
raise StorageNotSupported('Process does not support the storing of the execute response')

if wps_request.status == 'true':
if self.status_supported != 'true':
raise OperationNotSupported('Process does not support the updating of status')

return True
return False
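
For reviewers, the decision made by `is_async` can be read as a small truth table. The sketch below is a standalone approximation: indentation is lost in this view of the diff, so it assumes the `status` check is nested inside the `store_execute` check, and the two exception classes are local stand-ins for the PyWPS exceptions of the same name.

```python
class StorageNotSupported(Exception):
    """Local stand-in for the PyWPS exception of the same name."""


class OperationNotSupported(Exception):
    """Local stand-in for the PyWPS exception of the same name."""


def request_is_async(store_execute: str, status: str,
                     store_supported: str, status_supported: str) -> bool:
    """Return True only when the request asks for both storing and status updates.

    Flags are the strings 'true' / 'false', as in the diff above.
    """
    if store_execute == 'true':
        if store_supported != 'true':
            raise StorageNotSupported(
                'Process does not support the storing of the execute response')
        if status == 'true':
            if status_supported != 'true':
                raise OperationNotSupported(
                    'Process does not support the updating of status')
            return True
    return False


if __name__ == "__main__":
    print(request_is_async('true', 'true', 'true', 'true'))    # stored, with status
    print(request_is_async('true', 'false', 'true', 'true'))   # stored, no status
    print(request_is_async('false', 'true', 'true', 'true'))   # not stored
```

Under this reading, a request is asynchronous only when both flags are set and the process supports both; an unsupported combination raises instead of silently falling back to synchronous execution.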

def _set_grass(self, wps_request):
"""Handle given grass_location parameter of the constructor
