From 2ff129d21d19b6ce24f9e20e3f21fd747f09e9db Mon Sep 17 00:00:00 2001 From: mdip226 Date: Tue, 2 May 2023 10:58:51 -0400 Subject: [PATCH 01/31] throw it away --- test/unit_tests/test_ec2.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/unit_tests/test_ec2.py b/test/unit_tests/test_ec2.py index c21c7088..d7f81e45 100644 --- a/test/unit_tests/test_ec2.py +++ b/test/unit_tests/test_ec2.py @@ -14,8 +14,8 @@ def _get_source_ip(self, cloud_key): def _restrict_ingress(self, ip_address: str = ''): try: super()._restrict_ingress(ip_address) - except NotImplementedError as oh_well: - print(oh_well) + except NotImplementedError as _: + pass @mock_ec2 class EC2ClusterTest(unittest.TestCase): From d133aad35977b4539ca220739672ff61572cf459 Mon Sep 17 00:00:00 2001 From: mdip226 Date: Tue, 2 May 2023 10:59:27 -0400 Subject: [PATCH 02/31] stashed draft from a while back --- stochss_compute/core/messages.py | 17 ++++++++++++----- stochss_compute/core/remote_simulation.py | 7 +++++-- 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/stochss_compute/core/messages.py b/stochss_compute/core/messages.py index 28d833e1..70db7325 100644 --- a/stochss_compute/core/messages.py +++ b/stochss_compute/core/messages.py @@ -90,11 +90,15 @@ class SimulationRunRequest(Request): :param model: A model to run. :type model: gillespy2.Model + :param unique: When True, ignore cache completely and return always new results. + :type unique: bool + :param kwargs: kwargs for the model.run() call. :type kwargs: dict[str, Any] ''' - def __init__(self, model, **kwargs): + def __init__(self, model, unique, **kwargs): self.model = model + self.unique = unique self.kwargs = kwargs def encode(self): @@ -102,7 +106,9 @@ def encode(self): JSON-encode model and then encode self to dict. ''' return {'model': self.model.to_json(), - 'kwargs': self.kwargs} + 'unique': self.unique, + 'kwargs': self.kwargs, + } @staticmethod def parse(raw_request): @@ -118,7 +124,8 @@ def parse(raw_request): request_dict = json_decode(raw_request) model = Model.from_json(request_dict['model']) kwargs = request_dict['kwargs'] - return SimulationRunRequest(model, **kwargs) + unique = request_dict['unique'] + return SimulationRunRequest(model, unique, **kwargs) def hash(self): ''' @@ -138,7 +145,7 @@ def hash(self): class SimulationRunResponse(Response): ''' A response from the server regarding a SimulationRunRequest. - + :param status: The status of the simulation. :type status: SimStatus @@ -160,7 +167,7 @@ def __init__(self, status, error_message = None, results_id = None, results = No def encode(self): ''' - Encode self to dict. + Encode self to dict. ''' return {'status': self.status.name, 'error_message': self.error_message or '', diff --git a/stochss_compute/core/remote_simulation.py b/stochss_compute/core/remote_simulation.py index 562b909d..394abc28 100644 --- a/stochss_compute/core/remote_simulation.py +++ b/stochss_compute/core/remote_simulation.py @@ -97,12 +97,15 @@ def is_cached(self, **params): results_dummy.n_traj = params.get('number_of_trajectories', 1) return results_dummy.is_ready - def run(self, **params): + def run(self, unique=False, **params): """ Simulate the Model on the target ComputeServer, returning the results or a handle to a running simulation. See `here `_. + :param unique: When True, ignore cache completely and return always new results. + :type unique: bool + :param params: Arguments to pass directly to the Model#run call on the server. :type params: dict[str, Any] @@ -120,7 +123,7 @@ def run(self, **params): if self.solver is not None: params["solver"] = f"{self.solver.__module__}.{self.solver.__qualname__}" - sim_request = SimulationRunRequest(model=self.model, **params) + sim_request = SimulationRunRequest(self.model, unique, **params) response_raw = self.server.post(Endpoint.SIMULATION_GILLESPY2, sub="/run", request=sim_request) if not response_raw.ok: raise Exception(response_raw.reason) From 643ad842c1188f4e62146d439ac40c0cd6fae374 Mon Sep 17 00:00:00 2001 From: mdip226 Date: Tue, 2 May 2023 11:22:21 -0400 Subject: [PATCH 03/31] restructure directory --- stochss_compute/core/messages.py | 395 ------------------ stochss_compute/core/messages/__init__.py | 0 stochss_compute/core/messages/base.py | 83 ++++ stochss_compute/core/messages/results.py | 64 +++ .../core/messages/simulation_run.py | 113 +++++ .../core/messages/simulation_run_unique.py | 113 +++++ stochss_compute/core/messages/source_ip.py | 59 +++ stochss_compute/core/messages/status.py | 71 ++++ stochss_compute/server/run_unique.py | 112 +++++ 9 files changed, 615 insertions(+), 395 deletions(-) delete mode 100644 stochss_compute/core/messages.py create mode 100644 stochss_compute/core/messages/__init__.py create mode 100644 stochss_compute/core/messages/base.py create mode 100644 stochss_compute/core/messages/results.py create mode 100644 stochss_compute/core/messages/simulation_run.py create mode 100644 stochss_compute/core/messages/simulation_run_unique.py create mode 100644 stochss_compute/core/messages/source_ip.py create mode 100644 stochss_compute/core/messages/status.py create mode 100644 stochss_compute/server/run_unique.py diff --git a/stochss_compute/core/messages.py b/stochss_compute/core/messages.py deleted file mode 100644 index 70db7325..00000000 --- a/stochss_compute/core/messages.py +++ /dev/null @@ -1,395 +0,0 @@ -''' -stochss_compute.core.messages -''' -# StochSS-Compute is a tool for running and caching GillesPy2 simulations remotely. -# Copyright (C) 2019-2023 GillesPy2 and StochSS developers. - -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. - -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. - -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . - -from abc import ABC, abstractmethod -from enum import Enum -from hashlib import md5 -from gillespy2 import Model, Results -from tornado.escape import json_encode, json_decode - - -class SimStatus(Enum): - ''' - Status describing a remote simulation. - ''' - PENDING = 'The simulation is pending.' - RUNNING = 'The simulation is still running.' - READY = 'Simulation is done and results exist in the cache.' - ERROR = 'The Simulation has encountered an error.' - DOES_NOT_EXIST = 'There is no evidence of this simulation either running or on disk.' - - @staticmethod - def from_str(name): - ''' - Convert str to Enum. - ''' - if name == 'PENDING': - return SimStatus.PENDING - if name == 'RUNNING': - return SimStatus.RUNNING - if name == 'READY': - return SimStatus.READY - if name == 'ERROR': - return SimStatus.ERROR - if name == 'DOES_NOT_EXIST': - return SimStatus.DOES_NOT_EXIST - -class Request(ABC): - ''' - Base class. - ''' - @abstractmethod - def encode(self): - ''' - Encode self for http. - ''' - @staticmethod - @abstractmethod - def parse(raw_request): - ''' - Parse http for python. - ''' - -class Response(ABC): - ''' - Base class. - ''' - @abstractmethod - def encode(self): - ''' - Encode self for http. - ''' - @staticmethod - @abstractmethod - def parse(raw_response): - ''' - Parse http for python. - ''' - - -class SimulationRunRequest(Request): - ''' - A simulation request. - - :param model: A model to run. - :type model: gillespy2.Model - - :param unique: When True, ignore cache completely and return always new results. - :type unique: bool - - :param kwargs: kwargs for the model.run() call. - :type kwargs: dict[str, Any] - ''' - def __init__(self, model, unique, **kwargs): - self.model = model - self.unique = unique - self.kwargs = kwargs - - def encode(self): - ''' - JSON-encode model and then encode self to dict. - ''' - return {'model': self.model.to_json(), - 'unique': self.unique, - 'kwargs': self.kwargs, - } - - @staticmethod - def parse(raw_request): - ''' - Parse HTTP request. - - :param raw_request: The request. - :type raw_request: dict[str, str] - - :returns: The decoded object. - :rtype: SimulationRunRequest - ''' - request_dict = json_decode(raw_request) - model = Model.from_json(request_dict['model']) - kwargs = request_dict['kwargs'] - unique = request_dict['unique'] - return SimulationRunRequest(model, unique, **kwargs) - - def hash(self): - ''' - Generate a unique hash of this simulation request. - Does not include number_of_trajectories in this calculation. - - :returns: md5 hex digest. - :rtype: str - ''' - anon_model_string = self.model.to_anon().to_json(encode_private=False) - popped_kwargs = {kw:self.kwargs[kw] for kw in self.kwargs if kw!='number_of_trajectories'} - kwargs_string = json_encode(popped_kwargs) - request_string = f'{anon_model_string}{kwargs_string}' - _hash = md5(str.encode(request_string)).hexdigest() - return _hash - -class SimulationRunResponse(Response): - ''' - A response from the server regarding a SimulationRunRequest. - - :param status: The status of the simulation. - :type status: SimStatus - - :param error_message: Possible error message. - :type error_message: str | None - - :param results_id: Hash of the simulation request. Identifies the results. - :type results_id: str | None - - :param results: JSON-Encoded gillespy2.Results - :type results: str | None - ''' - def __init__(self, status, error_message = None, results_id = None, results = None, task_id = None): - self.status = status - self.error_message = error_message - self.results_id = results_id - self.results = results - self.task_id = task_id - - def encode(self): - ''' - Encode self to dict. - ''' - return {'status': self.status.name, - 'error_message': self.error_message or '', - 'results_id': self.results_id or '', - 'results': self.results or '', - 'task_id': self.task_id or '',} - - @staticmethod - def parse(raw_response): - ''' - Parse HTTP response. - - :param raw_response: The response. - :type raw_response: dict[str, str] - - :returns: The decoded object. - :rtype: SimulationRunResponse - ''' - response_dict = json_decode(raw_response) - status = SimStatus.from_str(response_dict['status']) - results_id = response_dict['results_id'] - error_message = response_dict['error_message'] - task_id = response_dict['task_id'] - if response_dict['results'] != '': - results = Results.from_json(response_dict['results']) - else: - results = None - return SimulationRunResponse(status, error_message, results_id, results, task_id) - -class StatusRequest(Request): - ''' - A request for simulation status. - - :param results_id: Hash of the SimulationRunRequest - :type results_id: str - ''' - def __init__(self, results_id): - self.results_id = results_id - def encode(self): - ''' - :returns: self.__dict__ - :rtype: dict - ''' - return self.__dict__ - - @staticmethod - def parse(raw_request): - ''' - Parse HTTP request. - - :param raw_request: The request. - :type raw_request: dict[str, str] - - :returns: The decoded object. - :rtype: StatusRequest - ''' - request_dict = json_decode(raw_request) - return StatusRequest(request_dict['results_id']) - -class StatusResponse(Response): - ''' - A response from the server about simulation status. - - :param status: Status of the simulation - :type status: SimStatus - - :param message: Possible error message or otherwise - :type message: str - ''' - def __init__(self, status, message = None): - self.status = status - self.message = message - - def encode(self): - ''' - Encodes self. - :returns: self as dict - :rtype: dict[str, str] - ''' - return {'status': self.status.name, - 'message': self.message or ''} - - @staticmethod - def parse(raw_response): - ''' - Parse HTTP response. - - :param raw_response: The response. - :type raw_response: dict[str, str] - - :returns: The decoded object. - :rtype: StatusResponse - ''' - response_dict = json_decode(raw_response) - status = SimStatus.from_str(response_dict['status']) - message = response_dict['message'] - if not message: - return StatusResponse(status) - else: - return StatusResponse(status, message) - -class ResultsRequest(Request): - ''' - Request results from the server. - - :param results_id: Hash of the SimulationRunRequest - :type results_id: str - ''' - def __init__(self, results_id): - self.results_id = results_id - def encode(self): - ''' - :returns: self.__dict__ - :rtype: dict - ''' - return self.__dict__ - @staticmethod - def parse(raw_request): - ''' - Parse HTTP request. - - :param raw_request: The request. - :type raw_request: dict[str, str] - - :returns: The decoded object. - :rtype: ResultsRequest - ''' - request_dict = json_decode(raw_request) - return ResultsRequest(request_dict['results_id']) - -class ResultsResponse(Response): - ''' - A response from the server about the Results. - - :param results: The requested Results from the cache. (JSON) - :type results: str - - ''' - def __init__(self, results = None): - self.results = results - - def encode(self): - ''' - :returns: self.__dict__ - :rtype: dict - ''' - return {'results': self.results or ''} - - @staticmethod - def parse(raw_response): - ''' - Parse HTTP response. - - :param raw_response: The response. - :type raw_response: dict[str, str] - - :returns: The decoded object. - :rtype: ResultsResponse - ''' - response_dict = json_decode(raw_response) - if response_dict['results'] != '': - results = Results.from_json(response_dict['results']) - else: - results = None - return ResultsResponse(results) - -class SourceIpRequest(Request): - ''' - Restrict server access. - - :param cloud_key: Random key generated locally during launch. - :type cloud_key: str - ''' - def __init__(self, cloud_key): - self.cloud_key = cloud_key - def encode(self): - ''' - :returns: self.__dict__ - :rtype: dict - ''' - return self.__dict__ - @staticmethod - def parse(raw_request): - ''' - Parse HTTP request. - - :param raw_request: The request. - :type raw_request: dict[str, str] - - :returns: The decoded object. - :rtype: SourceIpRequest - ''' - request_dict = json_decode(raw_request) - return SourceIpRequest(request_dict['cloud_key']) - -class SourceIpResponse(Response): - ''' - Response from server containing IP address of the source. - - :param source_ip: IP address of the client. - :type source_ip: str - ''' - def __init__(self, source_ip): - self.source_ip = source_ip - - def encode(self): - ''' - :returns: self.__dict__ - :rtype: dict - ''' - return self.__dict__ - - @staticmethod - def parse(raw_response): - ''' - Parses a http response and returns a python object. - - :param raw_response: A raw http SourceIpResponse from the server. - :type raw_response: str - - :returns: The decoded object. - :rtype: SourceIpResponse - ''' - response_dict = json_decode(raw_response) - return SourceIpResponse(response_dict['source_ip']) diff --git a/stochss_compute/core/messages/__init__.py b/stochss_compute/core/messages/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/stochss_compute/core/messages/base.py b/stochss_compute/core/messages/base.py new file mode 100644 index 00000000..df156ecc --- /dev/null +++ b/stochss_compute/core/messages/base.py @@ -0,0 +1,83 @@ +''' +stochss_compute.core.messages +''' +# StochSS-Compute is a tool for running and caching GillesPy2 simulations remotely. +# Copyright (C) 2019-2023 GillesPy2 and StochSS developers. + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +from abc import ABC, abstractmethod +from enum import Enum +from hashlib import md5 +from gillespy2 import Model, Results +from tornado.escape import json_encode, json_decode + + +class SimStatus(Enum): + ''' + Status describing a remote simulation. + ''' + PENDING = 'The simulation is pending.' + RUNNING = 'The simulation is still running.' + READY = 'Simulation is done and results exist in the cache.' + ERROR = 'The Simulation has encountered an error.' + DOES_NOT_EXIST = 'There is no evidence of this simulation either running or on disk.' + + @staticmethod + def from_str(name): + ''' + Convert str to Enum. + ''' + if name == 'PENDING': + return SimStatus.PENDING + if name == 'RUNNING': + return SimStatus.RUNNING + if name == 'READY': + return SimStatus.READY + if name == 'ERROR': + return SimStatus.ERROR + if name == 'DOES_NOT_EXIST': + return SimStatus.DOES_NOT_EXIST + +class Request(ABC): + ''' + Base class. + ''' + @abstractmethod + def encode(self): + ''' + Encode self for http. + ''' + @staticmethod + @abstractmethod + def parse(raw_request): + ''' + Parse http for python. + ''' + +class Response(ABC): + ''' + Base class. + ''' + @abstractmethod + def encode(self): + ''' + Encode self for http. + ''' + @staticmethod + @abstractmethod + def parse(raw_response): + ''' + Parse http for python. + ''' diff --git a/stochss_compute/core/messages/results.py b/stochss_compute/core/messages/results.py new file mode 100644 index 00000000..81c2f6c3 --- /dev/null +++ b/stochss_compute/core/messages/results.py @@ -0,0 +1,64 @@ +class ResultsRequest(Request): + ''' + Request results from the server. + + :param results_id: Hash of the SimulationRunRequest + :type results_id: str + ''' + def __init__(self, results_id): + self.results_id = results_id + def encode(self): + ''' + :returns: self.__dict__ + :rtype: dict + ''' + return self.__dict__ + @staticmethod + def parse(raw_request): + ''' + Parse HTTP request. + + :param raw_request: The request. + :type raw_request: dict[str, str] + + :returns: The decoded object. + :rtype: ResultsRequest + ''' + request_dict = json_decode(raw_request) + return ResultsRequest(request_dict['results_id']) + +class ResultsResponse(Response): + ''' + A response from the server about the Results. + + :param results: The requested Results from the cache. (JSON) + :type results: str + + ''' + def __init__(self, results = None): + self.results = results + + def encode(self): + ''' + :returns: self.__dict__ + :rtype: dict + ''' + return {'results': self.results or ''} + + @staticmethod + def parse(raw_response): + ''' + Parse HTTP response. + + :param raw_response: The response. + :type raw_response: dict[str, str] + + :returns: The decoded object. + :rtype: ResultsResponse + ''' + response_dict = json_decode(raw_response) + if response_dict['results'] != '': + results = Results.from_json(response_dict['results']) + else: + results = None + return ResultsResponse(results) \ No newline at end of file diff --git a/stochss_compute/core/messages/simulation_run.py b/stochss_compute/core/messages/simulation_run.py new file mode 100644 index 00000000..d4b6abb2 --- /dev/null +++ b/stochss_compute/core/messages/simulation_run.py @@ -0,0 +1,113 @@ +class SimulationRunRequest(Request): + ''' + A simulation request. + + :param model: A model to run. + :type model: gillespy2.Model + + :param unique: When True, ignore cache completely and return always new results. + :type unique: bool + + :param kwargs: kwargs for the model.run() call. + :type kwargs: dict[str, Any] + ''' + def __init__(self, model, unique, **kwargs): + self.model = model + self.unique = unique + self.kwargs = kwargs + + def encode(self): + ''' + JSON-encode model and then encode self to dict. + ''' + return {'model': self.model.to_json(), + 'unique': self.unique, + 'kwargs': self.kwargs, + } + + @staticmethod + def parse(raw_request): + ''' + Parse HTTP request. + + :param raw_request: The request. + :type raw_request: dict[str, str] + + :returns: The decoded object. + :rtype: SimulationRunRequest + ''' + request_dict = json_decode(raw_request) + model = Model.from_json(request_dict['model']) + kwargs = request_dict['kwargs'] + unique = request_dict['unique'] + return SimulationRunRequest(model, unique, **kwargs) + + def hash(self): + ''' + Generate a unique hash of this simulation request. + Does not include number_of_trajectories in this calculation. + + :returns: md5 hex digest. + :rtype: str + ''' + anon_model_string = self.model.to_anon().to_json(encode_private=False) + popped_kwargs = {kw:self.kwargs[kw] for kw in self.kwargs if kw!='number_of_trajectories'} + kwargs_string = json_encode(popped_kwargs) + request_string = f'{anon_model_string}{kwargs_string}' + _hash = md5(str.encode(request_string)).hexdigest() + return _hash + +class SimulationRunResponse(Response): + ''' + A response from the server regarding a SimulationRunRequest. + + :param status: The status of the simulation. + :type status: SimStatus + + :param error_message: Possible error message. + :type error_message: str | None + + :param results_id: Hash of the simulation request. Identifies the results. + :type results_id: str | None + + :param results: JSON-Encoded gillespy2.Results + :type results: str | None + ''' + def __init__(self, status, error_message = None, results_id = None, results = None, task_id = None): + self.status = status + self.error_message = error_message + self.results_id = results_id + self.results = results + self.task_id = task_id + + def encode(self): + ''' + Encode self to dict. + ''' + return {'status': self.status.name, + 'error_message': self.error_message or '', + 'results_id': self.results_id or '', + 'results': self.results or '', + 'task_id': self.task_id or '',} + + @staticmethod + def parse(raw_response): + ''' + Parse HTTP response. + + :param raw_response: The response. + :type raw_response: dict[str, str] + + :returns: The decoded object. + :rtype: SimulationRunResponse + ''' + response_dict = json_decode(raw_response) + status = SimStatus.from_str(response_dict['status']) + results_id = response_dict['results_id'] + error_message = response_dict['error_message'] + task_id = response_dict['task_id'] + if response_dict['results'] != '': + results = Results.from_json(response_dict['results']) + else: + results = None + return SimulationRunResponse(status, error_message, results_id, results, task_id) diff --git a/stochss_compute/core/messages/simulation_run_unique.py b/stochss_compute/core/messages/simulation_run_unique.py new file mode 100644 index 00000000..868ef11f --- /dev/null +++ b/stochss_compute/core/messages/simulation_run_unique.py @@ -0,0 +1,113 @@ +class SimulationRunRequest(Request): + ''' + A simulation request. + + :param model: A model to run. + :type model: gillespy2.Model + + :param unique: When True, ignore cache completely and return always new results. + :type unique: bool + + :param kwargs: kwargs for the model.run() call. + :type kwargs: dict[str, Any] + ''' + def __init__(self, model, unique, **kwargs): + self.model = model + self.unique = unique + self.kwargs = kwargs + + def encode(self): + ''' + JSON-encode model and then encode self to dict. + ''' + return {'model': self.model.to_json(), + 'unique': self.unique, + 'kwargs': self.kwargs, + } + + @staticmethod + def parse(raw_request): + ''' + Parse HTTP request. + + :param raw_request: The request. + :type raw_request: dict[str, str] + + :returns: The decoded object. + :rtype: SimulationRunRequest + ''' + request_dict = json_decode(raw_request) + model = Model.from_json(request_dict['model']) + kwargs = request_dict['kwargs'] + unique = request_dict['unique'] + return SimulationRunRequest(model, unique, **kwargs) + + def hash(self): + ''' + Generate a unique hash of this simulation request. + Does not include number_of_trajectories in this calculation. + + :returns: md5 hex digest. + :rtype: str + ''' + anon_model_string = self.model.to_anon().to_json(encode_private=False) + popped_kwargs = {kw:self.kwargs[kw] for kw in self.kwargs if kw!='number_of_trajectories'} + kwargs_string = json_encode(popped_kwargs) + request_string = f'{anon_model_string}{kwargs_string}' + _hash = md5(str.encode(request_string)).hexdigest() + return _hash + +class SimulationRunResponse(Response): + ''' + A response from the server regarding a SimulationRunRequest. + + :param status: The status of the simulation. + :type status: SimStatus + + :param error_message: Possible error message. + :type error_message: str | None + + :param results_id: Hash of the simulation request. Identifies the results. + :type results_id: str | None + + :param results: JSON-Encoded gillespy2.Results + :type results: str | None + ''' + def __init__(self, status, error_message = None, results_id = None, results = None, task_id = None): + self.status = status + self.error_message = error_message + self.results_id = results_id + self.results = results + self.task_id = task_id + + def encode(self): + ''' + Encode self to dict. + ''' + return {'status': self.status.name, + 'error_message': self.error_message or '', + 'results_id': self.results_id or '', + 'results': self.results or '', + 'task_id': self.task_id or '',} + + @staticmethod + def parse(raw_response): + ''' + Parse HTTP response. + + :param raw_response: The response. + :type raw_response: dict[str, str] + + :returns: The decoded object. + :rtype: SimulationRunResponse + ''' + response_dict = json_decode(raw_response) + status = SimStatus.from_str(response_dict['status']) + results_id = response_dict['results_id'] + error_message = response_dict['error_message'] + task_id = response_dict['task_id'] + if response_dict['results'] != '': + results = Results.from_json(response_dict['results']) + else: + results = None + return SimulationRunResponse(status, error_message, results_id, results, task_id) \ No newline at end of file diff --git a/stochss_compute/core/messages/source_ip.py b/stochss_compute/core/messages/source_ip.py new file mode 100644 index 00000000..5e499653 --- /dev/null +++ b/stochss_compute/core/messages/source_ip.py @@ -0,0 +1,59 @@ +class SourceIpRequest(Request): + ''' + Restrict server access. + + :param cloud_key: Random key generated locally during launch. + :type cloud_key: str + ''' + def __init__(self, cloud_key): + self.cloud_key = cloud_key + def encode(self): + ''' + :returns: self.__dict__ + :rtype: dict + ''' + return self.__dict__ + @staticmethod + def parse(raw_request): + ''' + Parse HTTP request. + + :param raw_request: The request. + :type raw_request: dict[str, str] + + :returns: The decoded object. + :rtype: SourceIpRequest + ''' + request_dict = json_decode(raw_request) + return SourceIpRequest(request_dict['cloud_key']) + +class SourceIpResponse(Response): + ''' + Response from server containing IP address of the source. + + :param source_ip: IP address of the client. + :type source_ip: str + ''' + def __init__(self, source_ip): + self.source_ip = source_ip + + def encode(self): + ''' + :returns: self.__dict__ + :rtype: dict + ''' + return self.__dict__ + + @staticmethod + def parse(raw_response): + ''' + Parses a http response and returns a python object. + + :param raw_response: A raw http SourceIpResponse from the server. + :type raw_response: str + + :returns: The decoded object. + :rtype: SourceIpResponse + ''' + response_dict = json_decode(raw_response) + return SourceIpResponse(response_dict['source_ip']) diff --git a/stochss_compute/core/messages/status.py b/stochss_compute/core/messages/status.py new file mode 100644 index 00000000..fecff019 --- /dev/null +++ b/stochss_compute/core/messages/status.py @@ -0,0 +1,71 @@ +class StatusRequest(Request): + ''' + A request for simulation status. + + :param results_id: Hash of the SimulationRunRequest + :type results_id: str + ''' + def __init__(self, results_id): + self.results_id = results_id + def encode(self): + ''' + :returns: self.__dict__ + :rtype: dict + ''' + return self.__dict__ + + @staticmethod + def parse(raw_request): + ''' + Parse HTTP request. + + :param raw_request: The request. + :type raw_request: dict[str, str] + + :returns: The decoded object. + :rtype: StatusRequest + ''' + request_dict = json_decode(raw_request) + return StatusRequest(request_dict['results_id']) + +class StatusResponse(Response): + ''' + A response from the server about simulation status. + + :param status: Status of the simulation + :type status: SimStatus + + :param message: Possible error message or otherwise + :type message: str + ''' + def __init__(self, status, message = None): + self.status = status + self.message = message + + def encode(self): + ''' + Encodes self. + :returns: self as dict + :rtype: dict[str, str] + ''' + return {'status': self.status.name, + 'message': self.message or ''} + + @staticmethod + def parse(raw_response): + ''' + Parse HTTP response. + + :param raw_response: The response. + :type raw_response: dict[str, str] + + :returns: The decoded object. + :rtype: StatusResponse + ''' + response_dict = json_decode(raw_response) + status = SimStatus.from_str(response_dict['status']) + message = response_dict['message'] + if not message: + return StatusResponse(status) + else: + return StatusResponse(status, message) \ No newline at end of file diff --git a/stochss_compute/server/run_unique.py b/stochss_compute/server/run_unique.py new file mode 100644 index 00000000..5404a0f5 --- /dev/null +++ b/stochss_compute/server/run_unique.py @@ -0,0 +1,112 @@ +''' +stochss_compute.server.run +''' +# StochSS-Compute is a tool for running and caching GillesPy2 simulations remotely. +# Copyright (C) 2019-2023 GillesPy2 and StochSS developers. + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import random +from datetime import datetime +from secrets import token_hex + +from tornado.web import RequestHandler +from tornado.ioloop import IOLoop +from distributed import Client, Future +from gillespy2.core import Results +from stochss_compute.core.messages import SimStatus, SimulationRunRequest, SimulationRunResponse +from stochss_compute.server.cache import Cache + + +class RunUniqueHandler(RequestHandler): + ''' + Endpoint for running Gillespy2 simulations. + ''' + + def initialize(self, scheduler_address, cache_dir): + ''' + Sets the address to the Dask scheduler and the cache directory. + + :param scheduler_address: Scheduler address. + :type scheduler_address: str + + :param cache_dir: Path to the cache. + :type cache_dir: str + ''' + self.scheduler_address = scheduler_address + self.cache_dir = cache_dir + + async def post(self): + ''' + Process simulation run request. + ''' + sim_request = SimulationRunRequest.parse(self.request.body) + sim_hash = sim_request.hash() + log_string = f'{datetime.now()} | <{self.request.remote_ip}> | Simulation Run Request | <{sim_hash}> | ' + cache = Cache(self.cache_dir, sim_hash) + if not cache.exists(): + cache.create() + empty = cache.is_empty() + if not empty: + # Check the number of trajectories in the request, default 1 + n_traj = sim_request.kwargs.get('number_of_trajectories', 1) + # Compare that to the number of cached trajectories + trajectories_needed = cache.n_traj_needed(n_traj) + if trajectories_needed > 0: + sim_request.kwargs['number_of_trajectories'] = trajectories_needed + print(log_string + + f'Partial cache. Running {trajectories_needed} new trajectories.') + client = Client(self.scheduler_address) + future = self._submit(sim_request, sim_hash, client) + self._return_running(sim_hash, future.key) + IOLoop.current().run_in_executor(None, self._cache, sim_hash, future, client) + else: + print(log_string + 'Returning cached results.') + results = cache.get() + ret_traj = random.sample(results, n_traj) + new_results = Results(ret_traj) + new_results_json = new_results.to_json() + sim_response = SimulationRunResponse(SimStatus.READY, results_id = sim_hash, results = new_results_json) + self.write(sim_response.encode()) + self.finish() + if empty: + print(log_string + 'Results not cached. Running simulation.') + client = Client(self.scheduler_address) + future = self._submit(sim_request, sim_hash, client) + self._return_running(sim_hash, future.key) + IOLoop.current().run_in_executor(None, self._cache, sim_hash, future, client) + + def _cache(self, sim_hash, future: Future, client: Client): + results = future.result() + client.close() + cache = Cache(self.cache_dir, sim_hash) + cache.save(results) + + def _submit(self, sim_request, sim_hash, client: Client): + model = sim_request.model + kwargs = sim_request.kwargs + n_traj = kwargs.get('number_of_trajectories', 1) + if "solver" in kwargs: + from pydoc import locate + kwargs["solver"] = locate(kwargs["solver"]) + + # keep client open for now! close? + key = f'{sim_hash}:{n_traj}:{token_hex(8)}' + future = client.submit(model.run, **kwargs, key=key) + return future + + def _return_running(self, results_id, task_id): + sim_response = SimulationRunResponse(SimStatus.RUNNING, results_id=results_id, task_id=task_id) + self.write(sim_response.encode()) + self.finish() From a6460cdfa1d6e28b4b47b93e19240326545996cc Mon Sep 17 00:00:00 2001 From: mdip226 Date: Tue, 2 May 2023 11:39:28 -0400 Subject: [PATCH 04/31] imports --- stochss_compute/core/messages/base.py | 4 ---- stochss_compute/core/messages/results.py | 8 ++++++++ .../core/messages/simulation_run.py | 18 ++++++++++-------- stochss_compute/core/remote_simulation.py | 2 +- 4 files changed, 19 insertions(+), 13 deletions(-) diff --git a/stochss_compute/core/messages/base.py b/stochss_compute/core/messages/base.py index df156ecc..3184c171 100644 --- a/stochss_compute/core/messages/base.py +++ b/stochss_compute/core/messages/base.py @@ -19,10 +19,6 @@ from abc import ABC, abstractmethod from enum import Enum -from hashlib import md5 -from gillespy2 import Model, Results -from tornado.escape import json_encode, json_decode - class SimStatus(Enum): ''' diff --git a/stochss_compute/core/messages/results.py b/stochss_compute/core/messages/results.py index 81c2f6c3..30c6a5d5 100644 --- a/stochss_compute/core/messages/results.py +++ b/stochss_compute/core/messages/results.py @@ -1,3 +1,11 @@ +''' +stochss_compute.core.messages.results +''' +from tornado.escape import json_decode +from gillespy2 import Results + +from stochss_compute.core.messages.base import Request, Response + class ResultsRequest(Request): ''' Request results from the server. diff --git a/stochss_compute/core/messages/simulation_run.py b/stochss_compute/core/messages/simulation_run.py index d4b6abb2..a2152603 100644 --- a/stochss_compute/core/messages/simulation_run.py +++ b/stochss_compute/core/messages/simulation_run.py @@ -1,3 +1,11 @@ +''' +stochss_compute.core.messages.simulation_run +''' +from hashlib import md5 +from tornado.escape import json_decode, json_encode +from gillespy2 import Model +from stochss_compute.core.messages.base import Request, Response, SimStatus + class SimulationRunRequest(Request): ''' A simulation request. @@ -5,15 +13,11 @@ class SimulationRunRequest(Request): :param model: A model to run. :type model: gillespy2.Model - :param unique: When True, ignore cache completely and return always new results. - :type unique: bool - :param kwargs: kwargs for the model.run() call. :type kwargs: dict[str, Any] ''' - def __init__(self, model, unique, **kwargs): + def __init__(self, model,**kwargs): self.model = model - self.unique = unique self.kwargs = kwargs def encode(self): @@ -21,7 +25,6 @@ def encode(self): JSON-encode model and then encode self to dict. ''' return {'model': self.model.to_json(), - 'unique': self.unique, 'kwargs': self.kwargs, } @@ -39,8 +42,7 @@ def parse(raw_request): request_dict = json_decode(raw_request) model = Model.from_json(request_dict['model']) kwargs = request_dict['kwargs'] - unique = request_dict['unique'] - return SimulationRunRequest(model, unique, **kwargs) + return SimulationRunRequest(model, **kwargs) def hash(self): ''' diff --git a/stochss_compute/core/remote_simulation.py b/stochss_compute/core/remote_simulation.py index 394abc28..d5c42bc0 100644 --- a/stochss_compute/core/remote_simulation.py +++ b/stochss_compute/core/remote_simulation.py @@ -123,7 +123,7 @@ def run(self, unique=False, **params): if self.solver is not None: params["solver"] = f"{self.solver.__module__}.{self.solver.__qualname__}" - sim_request = SimulationRunRequest(self.model, unique, **params) + sim_request = SimulationRunRequest(self.model, **params) response_raw = self.server.post(Endpoint.SIMULATION_GILLESPY2, sub="/run", request=sim_request) if not response_raw.ok: raise Exception(response_raw.reason) From 5b6a891bca6af6651cd97cf590594a7c11f94b55 Mon Sep 17 00:00:00 2001 From: mdip226 Date: Wed, 3 May 2023 17:32:07 -0400 Subject: [PATCH 05/31] get imports right --- stochss_compute/client/server.py | 2 +- stochss_compute/cloud/ec2.py | 2 +- stochss_compute/core/messages/base.py | 27 -------------- .../core/messages/simulation_run.py | 8 +++-- .../core/messages/simulation_run_unique.py | 7 ++-- stochss_compute/core/messages/source_ip.py | 9 +++++ stochss_compute/core/messages/status.py | 36 +++++++++++++++++++ stochss_compute/core/remote_results.py | 3 +- stochss_compute/core/remote_simulation.py | 28 ++++++++++++--- stochss_compute/server/is_cached.py | 2 +- stochss_compute/server/results.py | 2 +- stochss_compute/server/run.py | 3 +- stochss_compute/server/run_unique.py | 3 +- stochss_compute/server/sourceip.py | 2 +- stochss_compute/server/status.py | 2 +- test/integration_tests/test_cache.py | 2 +- test/unit_tests/test_compute_server.py | 2 +- test/unit_tests/test_hash.py | 2 +- test/unit_tests/test_is_cached_handler.py | 7 ++-- test/unit_tests/test_results_handler.py | 2 +- test/unit_tests/test_run_handler.py | 2 +- test/unit_tests/test_sourceip_handler.py | 2 +- test/unit_tests/test_status_handler.py | 4 ++- 23 files changed, 101 insertions(+), 58 deletions(-) diff --git a/stochss_compute/client/server.py b/stochss_compute/client/server.py index a381e44c..ae7b6bb5 100644 --- a/stochss_compute/client/server.py +++ b/stochss_compute/client/server.py @@ -21,7 +21,7 @@ from abc import ABC, abstractmethod import requests from stochss_compute.client.endpoint import Endpoint -from stochss_compute.core.messages import Request, Response +from stochss_compute.core.messages.base import Request class Server(ABC): ''' diff --git a/stochss_compute/cloud/ec2.py b/stochss_compute/cloud/ec2.py index 08d09b2f..5d458513 100644 --- a/stochss_compute/cloud/ec2.py +++ b/stochss_compute/cloud/ec2.py @@ -23,7 +23,7 @@ from secrets import token_hex from stochss_compute.client.server import Server from stochss_compute.cloud.ec2_config import EC2LocalConfig, EC2RemoteConfig -from stochss_compute.core.messages import SourceIpRequest, SourceIpResponse +from stochss_compute.core.messages.source_ip import SourceIpRequest, SourceIpResponse from stochss_compute.cloud.exceptions import EC2ImportException, ResourceException, EC2Exception from stochss_compute.client.endpoint import Endpoint try: diff --git a/stochss_compute/core/messages/base.py b/stochss_compute/core/messages/base.py index 3184c171..d7779114 100644 --- a/stochss_compute/core/messages/base.py +++ b/stochss_compute/core/messages/base.py @@ -18,33 +18,6 @@ # along with this program. If not, see . from abc import ABC, abstractmethod -from enum import Enum - -class SimStatus(Enum): - ''' - Status describing a remote simulation. - ''' - PENDING = 'The simulation is pending.' - RUNNING = 'The simulation is still running.' - READY = 'Simulation is done and results exist in the cache.' - ERROR = 'The Simulation has encountered an error.' - DOES_NOT_EXIST = 'There is no evidence of this simulation either running or on disk.' - - @staticmethod - def from_str(name): - ''' - Convert str to Enum. - ''' - if name == 'PENDING': - return SimStatus.PENDING - if name == 'RUNNING': - return SimStatus.RUNNING - if name == 'READY': - return SimStatus.READY - if name == 'ERROR': - return SimStatus.ERROR - if name == 'DOES_NOT_EXIST': - return SimStatus.DOES_NOT_EXIST class Request(ABC): ''' diff --git a/stochss_compute/core/messages/simulation_run.py b/stochss_compute/core/messages/simulation_run.py index a2152603..707dd226 100644 --- a/stochss_compute/core/messages/simulation_run.py +++ b/stochss_compute/core/messages/simulation_run.py @@ -3,8 +3,9 @@ ''' from hashlib import md5 from tornado.escape import json_decode, json_encode -from gillespy2 import Model -from stochss_compute.core.messages.base import Request, Response, SimStatus +from gillespy2 import Model, Results +from stochss_compute.core.messages.base import Request, Response +from stochss_compute.core.messages.status import SimStatus class SimulationRunRequest(Request): ''' @@ -54,6 +55,9 @@ def hash(self): ''' anon_model_string = self.model.to_anon().to_json(encode_private=False) popped_kwargs = {kw:self.kwargs[kw] for kw in self.kwargs if kw!='number_of_trajectories'} + # Explanation of line above: + # Take 'self.kwargs' (a dict), and add all entries to a new dictionary, + # EXCEPT the 'number_of_trajectories' key/value pair. kwargs_string = json_encode(popped_kwargs) request_string = f'{anon_model_string}{kwargs_string}' _hash = md5(str.encode(request_string)).hexdigest() diff --git a/stochss_compute/core/messages/simulation_run_unique.py b/stochss_compute/core/messages/simulation_run_unique.py index 868ef11f..c0dd70ac 100644 --- a/stochss_compute/core/messages/simulation_run_unique.py +++ b/stochss_compute/core/messages/simulation_run_unique.py @@ -11,9 +11,8 @@ class SimulationRunRequest(Request): :param kwargs: kwargs for the model.run() call. :type kwargs: dict[str, Any] ''' - def __init__(self, model, unique, **kwargs): + def __init__(self, model, **kwargs): self.model = model - self.unique = unique self.kwargs = kwargs def encode(self): @@ -21,7 +20,6 @@ def encode(self): JSON-encode model and then encode self to dict. ''' return {'model': self.model.to_json(), - 'unique': self.unique, 'kwargs': self.kwargs, } @@ -39,8 +37,7 @@ def parse(raw_request): request_dict = json_decode(raw_request) model = Model.from_json(request_dict['model']) kwargs = request_dict['kwargs'] - unique = request_dict['unique'] - return SimulationRunRequest(model, unique, **kwargs) + return SimulationRunRequest(model, **kwargs) def hash(self): ''' diff --git a/stochss_compute/core/messages/source_ip.py b/stochss_compute/core/messages/source_ip.py index 5e499653..71f8b789 100644 --- a/stochss_compute/core/messages/source_ip.py +++ b/stochss_compute/core/messages/source_ip.py @@ -1,3 +1,9 @@ +''' +stochss_compute.core.messages.source_ip +''' +from tornado.escape import json_decode +from stochss_compute.core.messages.base import Request, Response + class SourceIpRequest(Request): ''' Restrict server access. @@ -5,14 +11,17 @@ class SourceIpRequest(Request): :param cloud_key: Random key generated locally during launch. :type cloud_key: str ''' + def __init__(self, cloud_key): self.cloud_key = cloud_key + def encode(self): ''' :returns: self.__dict__ :rtype: dict ''' return self.__dict__ + @staticmethod def parse(raw_request): ''' diff --git a/stochss_compute/core/messages/status.py b/stochss_compute/core/messages/status.py index fecff019..a8b5df61 100644 --- a/stochss_compute/core/messages/status.py +++ b/stochss_compute/core/messages/status.py @@ -1,3 +1,39 @@ +''' +stochss_compute.core.messages.status +''' +from enum import Enum +from tornado.escape import json_decode +from stochss_compute.core.messages.base import Request, Response + +class SimStatus(Enum): + ''' + Status describing a remote simulation. + ''' + PENDING = 'The simulation is pending.' + RUNNING = 'The simulation is still running.' + READY = 'Simulation is done and results exist in the cache.' + ERROR = 'The Simulation has encountered an error.' + DOES_NOT_EXIST = 'There is no evidence of this simulation either running or on disk.' + + @staticmethod + def from_str(name): + ''' + Convert str to Enum. + ''' + if name == 'PENDING': + return SimStatus.PENDING + if name == 'RUNNING': + return SimStatus.RUNNING + if name == 'READY': + return SimStatus.READY + if name == 'ERROR': + return SimStatus.ERROR + if name == 'DOES_NOT_EXIST': + return SimStatus.DOES_NOT_EXIST + # pylint: disable=no-member + raise ValueError(f'Not a valid status.\n{SimStatus._member_names_}') + # pylint: enable=no-member + class StatusRequest(Request): ''' A request for simulation status. diff --git a/stochss_compute/core/remote_results.py b/stochss_compute/core/remote_results.py index dc7dce01..34fbce0f 100644 --- a/stochss_compute/core/remote_results.py +++ b/stochss_compute/core/remote_results.py @@ -21,7 +21,8 @@ from gillespy2 import Results from stochss_compute.client.endpoint import Endpoint from stochss_compute.core.errors import RemoteSimulationError -from stochss_compute.core.messages import ResultsResponse, SimStatus, StatusResponse +from stochss_compute.core.messages.results import ResultsResponse +from stochss_compute.core.messages.status import StatusResponse, SimStatus class RemoteResults(Results): ''' diff --git a/stochss_compute/core/remote_simulation.py b/stochss_compute/core/remote_simulation.py index d5c42bc0..2d8a7682 100644 --- a/stochss_compute/core/remote_simulation.py +++ b/stochss_compute/core/remote_simulation.py @@ -1,5 +1,5 @@ ''' -RemoteSimulation +stochss_compute.core.remote_simulation ''' # StochSS-Compute is a tool for running and caching GillesPy2 simulations remotely. # Copyright (C) 2019-2023 GillesPy2 and StochSS developers. @@ -18,7 +18,8 @@ # along with this program. If not, see . from stochss_compute.client.endpoint import Endpoint -from stochss_compute.core.messages import SimulationRunRequest, SimulationRunResponse, SimStatus +from stochss_compute.core.messages.simulation_run import SimulationRunRequest, SimulationRunResponse +from stochss_compute.core.messages.status import SimStatus from stochss_compute.core.errors import RemoteSimulationError from stochss_compute.core.remote_results import RemoteResults @@ -122,8 +123,18 @@ def run(self, unique=False, **params): params["solver"] = f"{params['solver'].__module__}.{params['solver'].__qualname__}" if self.solver is not None: params["solver"] = f"{self.solver.__module__}.{self.solver.__qualname__}" - - sim_request = SimulationRunRequest(self.model, **params) + if unique is True: + sim_request = SimulationRunUniqueRequest(self.model, **params) + self._run_unique(sim_request) + if unique is False: + sim_request = SimulationRunRequest(self.model, **params) + self._run(sim_request) + + def _run(self, request): + ''' + :param request: Request to send to the server. Contains Model and related arguments. + :type request: SimulationRunRequest + ''' response_raw = self.server.post(Endpoint.SIMULATION_GILLESPY2, sub="/run", request=sim_request) if not response_raw.ok: raise Exception(response_raw.reason) @@ -143,3 +154,12 @@ def run(self, unique=False, **params): remote_results.task_id = sim_response.task_id return remote_results + + def _run_unique(self, request): + ''' + :param request: Request to send to the server. Contains Model and related arguments. + :type request: SimulationRunUniqueRequest + ''' + remote_results = RemoteResults() + return remote_results + diff --git a/stochss_compute/server/is_cached.py b/stochss_compute/server/is_cached.py index 8e5726fb..83d68931 100644 --- a/stochss_compute/server/is_cached.py +++ b/stochss_compute/server/is_cached.py @@ -20,7 +20,7 @@ from datetime import datetime from tornado.web import RequestHandler from stochss_compute.core.errors import RemoteSimulationError -from stochss_compute.core.messages import SimStatus, StatusResponse +from stochss_compute.core.messages.status import SimStatus, StatusResponse from stochss_compute.server.cache import Cache class IsCachedHandler(RequestHandler): diff --git a/stochss_compute/server/results.py b/stochss_compute/server/results.py index 4f43f72f..93091aa4 100644 --- a/stochss_compute/server/results.py +++ b/stochss_compute/server/results.py @@ -20,7 +20,7 @@ from datetime import datetime from tornado.web import RequestHandler from stochss_compute.core.errors import RemoteSimulationError -from stochss_compute.core.messages import ResultsResponse +from stochss_compute.core.messages.results import ResultsResponse from stochss_compute.server.cache import Cache class ResultsHandler(RequestHandler): diff --git a/stochss_compute/server/run.py b/stochss_compute/server/run.py index d6afb6be..1595be20 100644 --- a/stochss_compute/server/run.py +++ b/stochss_compute/server/run.py @@ -25,7 +25,8 @@ from tornado.ioloop import IOLoop from distributed import Client, Future from gillespy2.core import Results -from stochss_compute.core.messages import SimStatus, SimulationRunRequest, SimulationRunResponse +from stochss_compute.core.messages.status import SimStatus +from stochss_compute.core.messages.simulation_run import SimulationRunRequest, SimulationRunResponse from stochss_compute.server.cache import Cache diff --git a/stochss_compute/server/run_unique.py b/stochss_compute/server/run_unique.py index 5404a0f5..558f4498 100644 --- a/stochss_compute/server/run_unique.py +++ b/stochss_compute/server/run_unique.py @@ -25,7 +25,8 @@ from tornado.ioloop import IOLoop from distributed import Client, Future from gillespy2.core import Results -from stochss_compute.core.messages import SimStatus, SimulationRunRequest, SimulationRunResponse +from stochss_compute.core.messages.status import SimStatus +from stochss_compute.core.messages.simulation_run import SimulationRunRequest, SimulationRunResponse from stochss_compute.server.cache import Cache diff --git a/stochss_compute/server/sourceip.py b/stochss_compute/server/sourceip.py index 1fce8e8e..a0068631 100644 --- a/stochss_compute/server/sourceip.py +++ b/stochss_compute/server/sourceip.py @@ -19,7 +19,7 @@ import os from tornado.web import RequestHandler -from stochss_compute.core.messages import SourceIpRequest, SourceIpResponse +from stochss_compute.core.messages.source_ip import SourceIpRequest, SourceIpResponse class SourceIpHandler(RequestHandler): ''' diff --git a/stochss_compute/server/status.py b/stochss_compute/server/status.py index 3ad4ec2e..8e309993 100644 --- a/stochss_compute/server/status.py +++ b/stochss_compute/server/status.py @@ -21,7 +21,7 @@ from distributed import Client from tornado.web import RequestHandler from stochss_compute.core.errors import RemoteSimulationError -from stochss_compute.core.messages import SimStatus, StatusResponse +from stochss_compute.core.messages.status import SimStatus, StatusResponse from stochss_compute.server.cache import Cache class StatusHandler(RequestHandler): diff --git a/test/integration_tests/test_cache.py b/test/integration_tests/test_cache.py index 470d989d..7ecf2f31 100644 --- a/test/integration_tests/test_cache.py +++ b/test/integration_tests/test_cache.py @@ -5,7 +5,7 @@ import subprocess import unittest from gillespy2 import Model -from stochss_compute.core.messages import SimulationRunRequest +from stochss_compute.core.messages.simulation_run import SimulationRunRequest from stochss_compute.server.cache import Cache from . import gillespy2_models diff --git a/test/unit_tests/test_compute_server.py b/test/unit_tests/test_compute_server.py index f6e9f1e2..0c00f9c2 100644 --- a/test/unit_tests/test_compute_server.py +++ b/test/unit_tests/test_compute_server.py @@ -5,7 +5,7 @@ from stochss_compute import ComputeServer from stochss_compute.client.server import Server from stochss_compute.client.endpoint import Endpoint -from stochss_compute.core.messages import SimulationRunRequest +from stochss_compute.core.messages.simulation_run import SimulationRunRequest from .gillespy2_models import create_decay class ComputeServerTest(unittest.TestCase): diff --git a/test/unit_tests/test_hash.py b/test/unit_tests/test_hash.py index 6f6c533a..ab9f8ac0 100644 --- a/test/unit_tests/test_hash.py +++ b/test/unit_tests/test_hash.py @@ -2,7 +2,7 @@ HashTest(unittest.TestCase) ''' import unittest -from stochss_compute.core.messages import SimulationRunRequest +from stochss_compute.core.messages.simulation_run import SimulationRunRequest from . import gillespy2_models class HashTest(unittest.TestCase): diff --git a/test/unit_tests/test_is_cached_handler.py b/test/unit_tests/test_is_cached_handler.py index 12581c90..89316671 100644 --- a/test/unit_tests/test_is_cached_handler.py +++ b/test/unit_tests/test_is_cached_handler.py @@ -4,10 +4,9 @@ import os import subprocess from tornado.testing import AsyncHTTPTestCase -from stochss_compute.client.compute_server import ComputeServer -from stochss_compute.core.errors import RemoteSimulationError -from stochss_compute.core.messages import ResultsResponse, SimStatus, SimulationRunRequest, SimulationRunResponse, StatusResponse -from stochss_compute.core.remote_results import RemoteResults +from stochss_compute.core.messages.base import SimStatus +from stochss_compute.core.messages.status import StatusResponse +from stochss_compute.core.messages.simulation_run import SimulationRunRequest from stochss_compute.server.api import _make_app from stochss_compute.server.cache import Cache from .gillespy2_models import create_michaelis_menten diff --git a/test/unit_tests/test_results_handler.py b/test/unit_tests/test_results_handler.py index e8ce5b07..0f8cfa2a 100644 --- a/test/unit_tests/test_results_handler.py +++ b/test/unit_tests/test_results_handler.py @@ -4,7 +4,7 @@ import os import subprocess from tornado.testing import AsyncHTTPTestCase -from stochss_compute.core.messages import SimulationRunRequest +from stochss_compute.core.messages.simulation_run import SimulationRunRequest from stochss_compute.server.api import _make_app from stochss_compute.server.cache import Cache from .gillespy2_models import create_michaelis_menten diff --git a/test/unit_tests/test_run_handler.py b/test/unit_tests/test_run_handler.py index c484a990..1bac117d 100644 --- a/test/unit_tests/test_run_handler.py +++ b/test/unit_tests/test_run_handler.py @@ -1,6 +1,6 @@ import os import subprocess -from stochss_compute.core.messages import SimulationRunRequest +from stochss_compute.core.messages.simulation_run import SimulationRunRequest from stochss_compute.server.api import _make_app from stochss_compute.server.cache import Cache from stochss_compute.server.run import RunHandler diff --git a/test/unit_tests/test_sourceip_handler.py b/test/unit_tests/test_sourceip_handler.py index cc6ee2df..09db5fdd 100644 --- a/test/unit_tests/test_sourceip_handler.py +++ b/test/unit_tests/test_sourceip_handler.py @@ -6,7 +6,7 @@ import os import subprocess from tornado.testing import AsyncHTTPTestCase -from stochss_compute.core.messages import SourceIpRequest, SourceIpResponse +from stochss_compute.core.messages.source_ip import SourceIpRequest, SourceIpResponse from stochss_compute.server.api import _make_app class SourceIpHandlerTest(AsyncHTTPTestCase): diff --git a/test/unit_tests/test_status_handler.py b/test/unit_tests/test_status_handler.py index dbb157d7..7d4c1314 100644 --- a/test/unit_tests/test_status_handler.py +++ b/test/unit_tests/test_status_handler.py @@ -5,7 +5,9 @@ import subprocess from test.unit_tests.gillespy2_models import create_michaelis_menten from tornado.testing import AsyncHTTPTestCase -from stochss_compute.core.messages import SimStatus, SimulationRunRequest, StatusResponse +from stochss_compute.core.messages.base import SimStatus +from stochss_compute.core.messages.simulation_run import SimulationRunRequest +from stochss_compute.core.messages.status import StatusResponse from stochss_compute.server.api import _make_app from stochss_compute.server.cache import Cache From 87d414dc60f85b3538b11fae7d2d17e7ad452737 Mon Sep 17 00:00:00 2001 From: mdip226 Date: Wed, 3 May 2023 20:41:18 -0400 Subject: [PATCH 06/31] new class just dropped --- stochss_compute/core/exceptions.py | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) create mode 100644 stochss_compute/core/exceptions.py diff --git a/stochss_compute/core/exceptions.py b/stochss_compute/core/exceptions.py new file mode 100644 index 00000000..40a88219 --- /dev/null +++ b/stochss_compute/core/exceptions.py @@ -0,0 +1,24 @@ +''' +stochss_compute.core.exceptions +''' +# StochSS-Compute is a tool for running and caching GillesPy2 simulations remotely. +# Copyright (C) 2019-2023 GillesPy2 and StochSS developers. + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +class PRNGCollision(Exception): + ''' + ...Lucky??? + ''' + \ No newline at end of file From 7d522d2e0941a33b6805a8ba275a280fec4b2a6f Mon Sep 17 00:00:00 2001 From: mdip226 Date: Wed, 3 May 2023 20:41:39 -0400 Subject: [PATCH 07/31] stash-save --- .../core/messages/simulation_run_unique.py | 34 ++-- stochss_compute/core/remote_simulation.py | 23 ++- stochss_compute/server/run.py | 5 +- stochss_compute/server/run_unique.py | 75 ++++----- stochss_compute/server/status_unique.py | 150 ++++++++++++++++++ test/unit_tests/test_compute_server.py | 6 +- test/unit_tests/test_is_cached_handler.py | 2 +- test/unit_tests/test_status_handler.py | 2 +- 8 files changed, 226 insertions(+), 71 deletions(-) create mode 100644 stochss_compute/server/status_unique.py diff --git a/stochss_compute/core/messages/simulation_run_unique.py b/stochss_compute/core/messages/simulation_run_unique.py index c0dd70ac..e1e56c3d 100644 --- a/stochss_compute/core/messages/simulation_run_unique.py +++ b/stochss_compute/core/messages/simulation_run_unique.py @@ -1,4 +1,12 @@ -class SimulationRunRequest(Request): +''' +stochss_compute.core.messages.simulation_run_unique +''' +from secrets import token_hex +from tornado.escape import json_decode, json_encode +from gillespy2 import Model +from stochss_compute.core.messages.base import Request, Response + +class SimulationRunUniqueRequest(Request): ''' A simulation request. @@ -11,9 +19,10 @@ class SimulationRunRequest(Request): :param kwargs: kwargs for the model.run() call. :type kwargs: dict[str, Any] ''' - def __init__(self, model, **kwargs): + def __init__(self, model, **kwargs, ): self.model = model self.kwargs = kwargs + self.unique_key = token_hex(7) def encode(self): ''' @@ -21,6 +30,7 @@ def encode(self): ''' return {'model': self.model.to_json(), 'kwargs': self.kwargs, + 'unique_key': self.unique_key, } @staticmethod @@ -37,22 +47,9 @@ def parse(raw_request): request_dict = json_decode(raw_request) model = Model.from_json(request_dict['model']) kwargs = request_dict['kwargs'] - return SimulationRunRequest(model, **kwargs) - - def hash(self): - ''' - Generate a unique hash of this simulation request. - Does not include number_of_trajectories in this calculation. - - :returns: md5 hex digest. - :rtype: str - ''' - anon_model_string = self.model.to_anon().to_json(encode_private=False) - popped_kwargs = {kw:self.kwargs[kw] for kw in self.kwargs if kw!='number_of_trajectories'} - kwargs_string = json_encode(popped_kwargs) - request_string = f'{anon_model_string}{kwargs_string}' - _hash = md5(str.encode(request_string)).hexdigest() - return _hash + _ = SimulationRunUniqueRequest(model, **kwargs) + _.unique_key = request_dict['unique_key'] + return _ class SimulationRunResponse(Response): ''' @@ -74,7 +71,6 @@ def __init__(self, status, error_message = None, results_id = None, results = No self.status = status self.error_message = error_message self.results_id = results_id - self.results = results self.task_id = task_id def encode(self): diff --git a/stochss_compute/core/remote_simulation.py b/stochss_compute/core/remote_simulation.py index 2d8a7682..81e952ac 100644 --- a/stochss_compute/core/remote_simulation.py +++ b/stochss_compute/core/remote_simulation.py @@ -19,6 +19,7 @@ from stochss_compute.client.endpoint import Endpoint from stochss_compute.core.messages.simulation_run import SimulationRunRequest, SimulationRunResponse +from stochss_compute.core.messages.simulation_run_unique import SimulationRunUniqueRequest from stochss_compute.core.messages.status import SimStatus from stochss_compute.core.errors import RemoteSimulationError from stochss_compute.core.remote_results import RemoteResults @@ -135,7 +136,7 @@ def _run(self, request): :param request: Request to send to the server. Contains Model and related arguments. :type request: SimulationRunRequest ''' - response_raw = self.server.post(Endpoint.SIMULATION_GILLESPY2, sub="/run", request=sim_request) + response_raw = self.server.post(Endpoint.SIMULATION_GILLESPY2, sub="/run", request=request) if not response_raw.ok: raise Exception(response_raw.reason) @@ -150,7 +151,7 @@ def _run(self, request): remote_results.id = sim_response.results_id remote_results.server = self.server - remote_results.n_traj = params.get('number_of_trajectories', 1) + remote_results.n_traj = request.kwargs.get('number_of_trajectories', 1) remote_results.task_id = sim_response.task_id return remote_results @@ -161,5 +162,23 @@ def _run_unique(self, request): :type request: SimulationRunUniqueRequest ''' remote_results = RemoteResults() + response_raw = self.server.post(Endpoint.SIMULATION_GILLESPY2, sub="/run", request=request) + if not response_raw.ok: + raise Exception(response_raw.reason) + + sim_response = SimulationRunResponse.parse(response_raw.text) + + if sim_response.status == SimStatus.ERROR: + raise RemoteSimulationError(sim_response.error_message) + if sim_response.status == SimStatus.READY: + remote_results = RemoteResults(data=sim_response.results.data) + else: + remote_results = RemoteResults() + + remote_results.id = sim_response.results_id + remote_results.server = self.server + remote_results.n_traj = request.kwargs.get('number_of_trajectories', 1) + remote_results.task_id = sim_response.task_id + return remote_results diff --git a/stochss_compute/server/run.py b/stochss_compute/server/run.py index 1595be20..f8389e60 100644 --- a/stochss_compute/server/run.py +++ b/stochss_compute/server/run.py @@ -35,16 +35,19 @@ class RunHandler(RequestHandler): Endpoint for running Gillespy2 simulations. ''' + scheduler_address = None + cache_dir = None def initialize(self, scheduler_address, cache_dir): ''' Sets the address to the Dask scheduler and the cache directory. - + :param scheduler_address: Scheduler address. :type scheduler_address: str :param cache_dir: Path to the cache. :type cache_dir: str ''' + self.scheduler_address = scheduler_address self.cache_dir = cache_dir diff --git a/stochss_compute/server/run_unique.py b/stochss_compute/server/run_unique.py index 558f4498..47142f34 100644 --- a/stochss_compute/server/run_unique.py +++ b/stochss_compute/server/run_unique.py @@ -26,19 +26,29 @@ from distributed import Client, Future from gillespy2.core import Results from stochss_compute.core.messages.status import SimStatus -from stochss_compute.core.messages.simulation_run import SimulationRunRequest, SimulationRunResponse +from stochss_compute.core.exceptions import PRNGCollision +from stochss_compute.core.messages.simulation_run_unique import SimulationRunUniqueRequest, SimulationRunUniqueResponse from stochss_compute.server.cache import Cache -class RunUniqueHandler(RequestHandler): +class SimulationRunUniqueHandler(RequestHandler): ''' Endpoint for running Gillespy2 simulations. ''' + def __init__(self, application, request, **kwargs): + self.scheduler_address = None + self.cache_dir = None + self.unique_key = None + super().__init__(application, request, **kwargs) + + def data_received(self, chunk: bytes): + raise NotImplementedError() + def initialize(self, scheduler_address, cache_dir): ''' Sets the address to the Dask scheduler and the cache directory. - + :param scheduler_address: Scheduler address. :type scheduler_address: str @@ -46,47 +56,25 @@ def initialize(self, scheduler_address, cache_dir): :type cache_dir: str ''' self.scheduler_address = scheduler_address - self.cache_dir = cache_dir + while cache_dir.endswith('/'): + cache_dir = cache_dir[:-1] + self.cache_dir = cache_dir + '/unique/' async def post(self): ''' - Process simulation run request. + Process simulation run unique request. ''' - sim_request = SimulationRunRequest.parse(self.request.body) - sim_hash = sim_request.hash() - log_string = f'{datetime.now()} | <{self.request.remote_ip}> | Simulation Run Request | <{sim_hash}> | ' - cache = Cache(self.cache_dir, sim_hash) - if not cache.exists(): - cache.create() - empty = cache.is_empty() - if not empty: - # Check the number of trajectories in the request, default 1 - n_traj = sim_request.kwargs.get('number_of_trajectories', 1) - # Compare that to the number of cached trajectories - trajectories_needed = cache.n_traj_needed(n_traj) - if trajectories_needed > 0: - sim_request.kwargs['number_of_trajectories'] = trajectories_needed - print(log_string + - f'Partial cache. Running {trajectories_needed} new trajectories.') - client = Client(self.scheduler_address) - future = self._submit(sim_request, sim_hash, client) - self._return_running(sim_hash, future.key) - IOLoop.current().run_in_executor(None, self._cache, sim_hash, future, client) - else: - print(log_string + 'Returning cached results.') - results = cache.get() - ret_traj = random.sample(results, n_traj) - new_results = Results(ret_traj) - new_results_json = new_results.to_json() - sim_response = SimulationRunResponse(SimStatus.READY, results_id = sim_hash, results = new_results_json) - self.write(sim_response.encode()) - self.finish() - if empty: - print(log_string + 'Results not cached. Running simulation.') - client = Client(self.scheduler_address) - future = self._submit(sim_request, sim_hash, client) - self._return_running(sim_hash, future.key) - IOLoop.current().run_in_executor(None, self._cache, sim_hash, future, client) + sim_request = SimulationRunUniqueRequest.parse(self.request.body) + unique_key = sim_request.unique_key + log_string = f'{datetime.now()} | <{self.request.remote_ip}> | Simulation Run Unique Request | <{unique_key}> | ' + cache = Cache(self.cache_dir, unique_key) + if cache.exists(): + raise PRNGCollision('Try again with a different key, because that one is taken.') + cache.create() + client = Client(self.scheduler_address) + future = self._submit(sim_request, client) + self._return_running(unique_key) + IOLoop.current().run_in_executor(None, self._cache, sim_hash, future, client) def _cache(self, sim_hash, future: Future, client: Client): results = future.result() @@ -94,17 +82,16 @@ def _cache(self, sim_hash, future: Future, client: Client): cache = Cache(self.cache_dir, sim_hash) cache.save(results) - def _submit(self, sim_request, sim_hash, client: Client): + def _submit(self, sim_request, client: Client): model = sim_request.model kwargs = sim_request.kwargs - n_traj = kwargs.get('number_of_trajectories', 1) + unique_key = sim_request.unique_key if "solver" in kwargs: from pydoc import locate kwargs["solver"] = locate(kwargs["solver"]) # keep client open for now! close? - key = f'{sim_hash}:{n_traj}:{token_hex(8)}' - future = client.submit(model.run, **kwargs, key=key) + future = client.submit(model.run, **kwargs, key=unique_key) return future def _return_running(self, results_id, task_id): diff --git a/stochss_compute/server/status_unique.py b/stochss_compute/server/status_unique.py new file mode 100644 index 00000000..73279be8 --- /dev/null +++ b/stochss_compute/server/status_unique.py @@ -0,0 +1,150 @@ +''' +stochss_compute.server.status_unique +''' +# StochSS-Compute is a tool for running and caching GillesPy2 simulations remotely. +# Copyright (C) 2019-2023 GillesPy2 and StochSS developers. + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +from datetime import datetime +from distributed import Client +from tornado.web import RequestHandler +from stochss_compute.core.errors import RemoteSimulationError +from stochss_compute.core.messages.status import SimStatus, StatusResponse +from stochss_compute.server.cache import Cache + +class StatusUniqueHandler(RequestHandler): + ''' + Endpoint for requesting the status of a unique (one-off) simulation. + ''' + def __init__(self, application, request, **kwargs): + self.scheduler_address = None + self.cache_dir = None + self.task_id = None + self.results_id = None + super().__init__(application, request, **kwargs) + + def data_received(self, chunk: bytes): + raise NotImplementedError() + + def initialize(self, scheduler_address, cache_dir): + ''' + Sets the address to the Dask scheduler and the cache directory. + + :param scheduler_address: Scheduler address. + :type scheduler_address: str + + :param cache_dir: Path to the cache. + :type cache_dir: str + ''' + self.scheduler_address = scheduler_address + while cache_dir.endswith('/'): + cache_dir = cache_dir[:-1] + self.cache_dir = cache_dir + '/unique/' + + async def get(self, unique_key): + ''' + Process GET request. + + :param unique_key: ID of the running simulation. Required. + :type unique_key: str + ''' + if unique_key == '': + self.set_status(404, reason=f'Malformed request: {self.request.uri}') + self.finish() + raise RemoteSimulationError(f'Malformed request: {self.request.uri}') + + self.unique_key = unique_key + + cache = Cache(self.cache_dir, unique_key) + + print(f'{datetime.now()} | <{self.request.remote_ip}> | \ + Status Request | <{results_id}> | Trajectories: {n_traj} | \ + Task ID: {task_id}' ) + + msg = f'{datetime.now()} | <{results_id}> | <{task_id}> | Status: ' + + exists = cache.exists() + if exists: + empty = cache.is_empty() + if empty: + if self.task_id not in ('', None): + state, err = await self._check_with_scheduler() + print(msg + SimStatus.RUNNING.name + f' | Task: {state} | Error: {err}') + if state == 'erred': + self._respond_error(err) + else: + self._respond_running(f'Scheduler task state: {state}') + else: + print(msg+SimStatus.DOES_NOT_EXIST.name) + self._respond_dne() + else: + ready = cache.is_ready(n_traj) + if ready: + print(msg+SimStatus.READY.name) + self._respond_ready() + else: + if self.task_id not in ('', None): + state, err = await self._check_with_scheduler() + print(msg+SimStatus.RUNNING.name+f' | Task: {state} | error: {err}') + if state == 'erred': + self._respond_error(err) + else: + self._respond_running(f'Scheduler task state: {state}') + else: + print(msg+SimStatus.DOES_NOT_EXIST.name) + self._respond_dne() + else: + print(msg+SimStatus.DOES_NOT_EXIST.name) + self._respond_dne() + + def _respond_ready(self): + status_response = StatusResponse(SimStatus.READY) + self.write(status_response.encode()) + self.finish() + + def _respond_error(self, error_message): + status_response = StatusResponse(SimStatus.ERROR, error_message) + self.write(status_response.encode()) + self.finish() + + def _respond_dne(self): + status_response = StatusResponse(SimStatus.DOES_NOT_EXIST, 'There is no record of that simulation.') + self.write(status_response.encode()) + self.finish() + + def _respond_running(self, message): + status_response = StatusResponse(SimStatus.RUNNING, message) + self.write(status_response.encode()) + self.finish() + + async def _check_with_scheduler(self): + ''' + Ask the scheduler for information about a task. + ''' + client = Client(self.scheduler_address) + + # define function here so that it is pickle-able + def scheduler_task_state(task_id, dask_scheduler=None): + task = dask_scheduler.tasks.get(task_id) + if task is None: + return (None, None) + if task.exception_text == "": + return (task.state, None) + return (task.state, task.exception_text) + + # Do not await. Reasons. It returns sync. + ret = client.run_on_scheduler(scheduler_task_state, self.task_id) + client.close() + return ret diff --git a/test/unit_tests/test_compute_server.py b/test/unit_tests/test_compute_server.py index 0c00f9c2..42df8c88 100644 --- a/test/unit_tests/test_compute_server.py +++ b/test/unit_tests/test_compute_server.py @@ -13,14 +13,14 @@ class ComputeServerTest(unittest.TestCase): Test ComputeServer class. ''' def setUp(self) -> None: - self.server = ComputeServer('cRaZyHoSt.lol', 60095) + self.server = ComputeServer('cRaZyHoSt.lol', 7) self.sim_endpoint = Endpoint.SIMULATION_GILLESPY2 def test_init_and_properties(self): ''' Calls init and tests address. ''' - assert self.server.address == 'http://cRaZyHoSt.lol:60095' + assert self.server.address == 'http://cRaZyHoSt.lol:7' def test_get(self): ''' @@ -32,7 +32,7 @@ def test_post(self): ''' calls post to timeout ''' - server = ComputeServer('cRaZyHoSt.lol', 60095) + server = ComputeServer('cRaZyHoSt.lol', 7) sim_endpoint = Endpoint.SIMULATION_GILLESPY2 server.post(sim_endpoint,'', SimulationRunRequest(create_decay())) diff --git a/test/unit_tests/test_is_cached_handler.py b/test/unit_tests/test_is_cached_handler.py index 89316671..6adde4a5 100644 --- a/test/unit_tests/test_is_cached_handler.py +++ b/test/unit_tests/test_is_cached_handler.py @@ -4,7 +4,7 @@ import os import subprocess from tornado.testing import AsyncHTTPTestCase -from stochss_compute.core.messages.base import SimStatus +from stochss_compute.core.messages.status import SimStatus from stochss_compute.core.messages.status import StatusResponse from stochss_compute.core.messages.simulation_run import SimulationRunRequest from stochss_compute.server.api import _make_app diff --git a/test/unit_tests/test_status_handler.py b/test/unit_tests/test_status_handler.py index 7d4c1314..69dc919f 100644 --- a/test/unit_tests/test_status_handler.py +++ b/test/unit_tests/test_status_handler.py @@ -5,7 +5,7 @@ import subprocess from test.unit_tests.gillespy2_models import create_michaelis_menten from tornado.testing import AsyncHTTPTestCase -from stochss_compute.core.messages.base import SimStatus +from stochss_compute.core.messages.status import SimStatus from stochss_compute.core.messages.simulation_run import SimulationRunRequest from stochss_compute.core.messages.status import StatusResponse from stochss_compute.server.api import _make_app From 32ddcbc5d733fe9fe325936e0deb7f94dc941b7c Mon Sep 17 00:00:00 2001 From: mdip226 Date: Thu, 4 May 2023 22:11:04 -0400 Subject: [PATCH 08/31] save again --- .../core/messages/simulation_run_unique.py | 28 +++------ stochss_compute/server/run_unique.py | 58 +++++++++++++------ 2 files changed, 48 insertions(+), 38 deletions(-) diff --git a/stochss_compute/core/messages/simulation_run_unique.py b/stochss_compute/core/messages/simulation_run_unique.py index e1e56c3d..b51baf77 100644 --- a/stochss_compute/core/messages/simulation_run_unique.py +++ b/stochss_compute/core/messages/simulation_run_unique.py @@ -2,9 +2,10 @@ stochss_compute.core.messages.simulation_run_unique ''' from secrets import token_hex -from tornado.escape import json_decode, json_encode +from tornado.escape import json_decode from gillespy2 import Model from stochss_compute.core.messages.base import Request, Response +from stochss_compute.core.messages.status import SimStatus class SimulationRunUniqueRequest(Request): ''' @@ -51,9 +52,9 @@ def parse(raw_request): _.unique_key = request_dict['unique_key'] return _ -class SimulationRunResponse(Response): +class SimulationRunUniqueResponse(Response): ''' - A response from the server regarding a SimulationRunRequest. + A response from the server regarding a SimulationRunUniqueRequest. :param status: The status of the simulation. :type status: SimStatus @@ -61,17 +62,10 @@ class SimulationRunResponse(Response): :param error_message: Possible error message. :type error_message: str | None - :param results_id: Hash of the simulation request. Identifies the results. - :type results_id: str | None - - :param results: JSON-Encoded gillespy2.Results - :type results: str | None ''' - def __init__(self, status, error_message = None, results_id = None, results = None, task_id = None): + def __init__(self, status, error_message = None): self.status = status self.error_message = error_message - self.results_id = results_id - self.task_id = task_id def encode(self): ''' @@ -79,9 +73,7 @@ def encode(self): ''' return {'status': self.status.name, 'error_message': self.error_message or '', - 'results_id': self.results_id or '', - 'results': self.results or '', - 'task_id': self.task_id or '',} + } @staticmethod def parse(raw_response): @@ -96,11 +88,5 @@ def parse(raw_response): ''' response_dict = json_decode(raw_response) status = SimStatus.from_str(response_dict['status']) - results_id = response_dict['results_id'] error_message = response_dict['error_message'] - task_id = response_dict['task_id'] - if response_dict['results'] != '': - results = Results.from_json(response_dict['results']) - else: - results = None - return SimulationRunResponse(status, error_message, results_id, results, task_id) \ No newline at end of file + return SimulationRunUniqueResponse(status, error_message) diff --git a/stochss_compute/server/run_unique.py b/stochss_compute/server/run_unique.py index 47142f34..c1289ce3 100644 --- a/stochss_compute/server/run_unique.py +++ b/stochss_compute/server/run_unique.py @@ -1,5 +1,5 @@ ''' -stochss_compute.server.run +stochss_compute.server.run_unique ''' # StochSS-Compute is a tool for running and caching GillesPy2 simulations remotely. # Copyright (C) 2019-2023 GillesPy2 and StochSS developers. @@ -17,14 +17,11 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -import random from datetime import datetime -from secrets import token_hex from tornado.web import RequestHandler from tornado.ioloop import IOLoop from distributed import Client, Future -from gillespy2.core import Results from stochss_compute.core.messages.status import SimStatus from stochss_compute.core.exceptions import PRNGCollision from stochss_compute.core.messages.simulation_run_unique import SimulationRunUniqueRequest, SimulationRunUniqueResponse @@ -48,6 +45,7 @@ def data_received(self, chunk: bytes): def initialize(self, scheduler_address, cache_dir): ''' Sets the address to the Dask scheduler and the cache directory. + Creates a new directory for one-off results files identifiable by token. :param scheduler_address: Scheduler address. :type scheduler_address: str @@ -62,39 +60,65 @@ def initialize(self, scheduler_address, cache_dir): async def post(self): ''' - Process simulation run unique request. + Process simulation run unique POST request. ''' sim_request = SimulationRunUniqueRequest.parse(self.request.body) - unique_key = sim_request.unique_key - log_string = f'{datetime.now()} | <{self.request.remote_ip}> | Simulation Run Unique Request | <{unique_key}> | ' - cache = Cache(self.cache_dir, unique_key) + self.unique_key = sim_request.unique_key + cache = Cache(self.cache_dir, self.unique_key) if cache.exists(): + self.set_status(404, reason='Try again with a different key, because that one is taken.') + self.finish() raise PRNGCollision('Try again with a different key, because that one is taken.') cache.create() - client = Client(self.scheduler_address) - future = self._submit(sim_request, client) - self._return_running(unique_key) - IOLoop.current().run_in_executor(None, self._cache, sim_hash, future, client) + future = self._submit(sim_request) + log_string = f'{datetime.now()} | <{self.request.remote_ip}> | Simulation Run Unique Request | <{self.unique_key}> | ' + print(log_string + 'Running simulation.') + self._return_running() + IOLoop.current().run_in_executor(None, self._cache, future, client) + + def _cache(self, future, client): + ''' + Await results, close client, save to disk. - def _cache(self, sim_hash, future: Future, client: Client): + :param future: Handle to the running simulation, to be awaited upon. + :type future: distributed.Future + + :param client: Client to the Dask scheduler. Closing here for good measure, not sure if strictly necessary. + :type client: distributed.Client + ''' results = future.result() client.close() - cache = Cache(self.cache_dir, sim_hash) + cache = Cache(self.cache_dir, self.unique_key) cache.save(results) - def _submit(self, sim_request, client: Client): + def _submit(self, sim_request, client): + ''' + Submit request to dask scheduler. + + :param sim_request: The user's request for a unique simulation. + :type sim_request: SimulationRunUniqueRequest + + :returns: Handle to the running simulation and the results on the worker. + :rtype: distributed.Future + ''' model = sim_request.model kwargs = sim_request.kwargs unique_key = sim_request.unique_key if "solver" in kwargs: + # pylint:disable=import-outside-toplevel from pydoc import locate + # pylint:enable=import-outside-toplevel kwargs["solver"] = locate(kwargs["solver"]) # keep client open for now! close? + client = Client(self.scheduler_address) future = client.submit(model.run, **kwargs, key=unique_key) return future - def _return_running(self, results_id, task_id): - sim_response = SimulationRunResponse(SimStatus.RUNNING, results_id=results_id, task_id=task_id) + def _return_running(self): + ''' + Let the user know we submitted the simulation to the scheduler. + ''' + sim_response = SimulationRunUniqueResponse(SimStatus.RUNNING) self.write(sim_response.encode()) self.finish() From 362c0e62c1c7a6d1f0c4e0a2ec8e154728d3923b Mon Sep 17 00:00:00 2001 From: mdip226 Date: Fri, 5 May 2023 13:02:45 -0400 Subject: [PATCH 09/31] finalize response and request for RunUnique --- .../core/messages/simulation_run_unique.py | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/stochss_compute/core/messages/simulation_run_unique.py b/stochss_compute/core/messages/simulation_run_unique.py index b51baf77..9429c6a6 100644 --- a/stochss_compute/core/messages/simulation_run_unique.py +++ b/stochss_compute/core/messages/simulation_run_unique.py @@ -9,18 +9,15 @@ class SimulationRunUniqueRequest(Request): ''' - A simulation request. + A one-off simulation request identifiable by a unique key. :param model: A model to run. :type model: gillespy2.Model - :param unique: When True, ignore cache completely and return always new results. - :type unique: bool - :param kwargs: kwargs for the model.run() call. :type kwargs: dict[str, Any] ''' - def __init__(self, model, **kwargs, ): + def __init__(self, model, **kwargs): self.model = model self.kwargs = kwargs self.unique_key = token_hex(7) @@ -37,19 +34,19 @@ def encode(self): @staticmethod def parse(raw_request): ''' - Parse HTTP request. + Parse raw HTTP request. Done server-side. :param raw_request: The request. :type raw_request: dict[str, str] :returns: The decoded object. - :rtype: SimulationRunRequest + :rtype: SimulationRunUniqueRequest ''' request_dict = json_decode(raw_request) model = Model.from_json(request_dict['model']) kwargs = request_dict['kwargs'] _ = SimulationRunUniqueRequest(model, **kwargs) - _.unique_key = request_dict['unique_key'] + _.unique_key = request_dict['unique_key'] # apply correct token (from raw request) after object construction. return _ class SimulationRunUniqueResponse(Response): From 4cda25ee720b4ccf0ba5c9049bc3c54e1c3b3c25 Mon Sep 17 00:00:00 2001 From: mdip226 Date: Fri, 5 May 2023 13:12:58 -0400 Subject: [PATCH 10/31] finalize run_unique endpoint --- stochss_compute/server/run_unique.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/stochss_compute/server/run_unique.py b/stochss_compute/server/run_unique.py index c1289ce3..d57fd713 100644 --- a/stochss_compute/server/run_unique.py +++ b/stochss_compute/server/run_unique.py @@ -70,7 +70,8 @@ async def post(self): self.finish() raise PRNGCollision('Try again with a different key, because that one is taken.') cache.create() - future = self._submit(sim_request) + client = Client(self.scheduler_address) + future = self._submit(sim_request, client) log_string = f'{datetime.now()} | <{self.request.remote_ip}> | Simulation Run Unique Request | <{self.unique_key}> | ' print(log_string + 'Running simulation.') self._return_running() @@ -94,10 +95,14 @@ def _cache(self, future, client): def _submit(self, sim_request, client): ''' Submit request to dask scheduler. + Uses pydoc.locate to convert str to solver class name object. :param sim_request: The user's request for a unique simulation. :type sim_request: SimulationRunUniqueRequest + :param client: Client to the Dask scheduler. + :type client: distributed.Client + :returns: Handle to the running simulation and the results on the worker. :rtype: distributed.Future ''' @@ -110,8 +115,6 @@ def _submit(self, sim_request, client): # pylint:enable=import-outside-toplevel kwargs["solver"] = locate(kwargs["solver"]) - # keep client open for now! close? - client = Client(self.scheduler_address) future = client.submit(model.run, **kwargs, key=unique_key) return future From b001d1cd977c0210525f370c2eb0c52796c2e8e0 Mon Sep 17 00:00:00 2001 From: mdip226 Date: Fri, 5 May 2023 19:38:20 -0400 Subject: [PATCH 11/31] cut out some stuff --- stochss_compute/core/remote_simulation.py | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/stochss_compute/core/remote_simulation.py b/stochss_compute/core/remote_simulation.py index 81e952ac..1293748f 100644 --- a/stochss_compute/core/remote_simulation.py +++ b/stochss_compute/core/remote_simulation.py @@ -161,24 +161,19 @@ def _run_unique(self, request): :param request: Request to send to the server. Contains Model and related arguments. :type request: SimulationRunUniqueRequest ''' - remote_results = RemoteResults() - response_raw = self.server.post(Endpoint.SIMULATION_GILLESPY2, sub="/run", request=request) + response_raw = self.server.post(Endpoint.SIMULATION_GILLESPY2, sub="/run/unique", request=request) + if not response_raw.ok: raise Exception(response_raw.reason) - sim_response = SimulationRunResponse.parse(response_raw.text) + if not sim_response.status != SimStatus.RUNNING: + raise Exception(sim_response.error_message) - if sim_response.status == SimStatus.ERROR: - raise RemoteSimulationError(sim_response.error_message) - if sim_response.status == SimStatus.READY: - remote_results = RemoteResults(data=sim_response.results.data) - else: - remote_results = RemoteResults() - remote_results.id = sim_response.results_id + remote_results = RemoteResults() + remote_results.id = request.unique_key + remote_results.task_id = request.unique_key remote_results.server = self.server remote_results.n_traj = request.kwargs.get('number_of_trajectories', 1) - remote_results.task_id = sim_response.task_id return remote_results - From 57c34fef752dfdbd0e7a320dfa0837123ad27050 Mon Sep 17 00:00:00 2001 From: mdip226 Date: Sun, 7 May 2023 13:19:34 -0400 Subject: [PATCH 12/31] stash --- stochss_compute/core/remote_results.py | 2 + stochss_compute/core/unique_results.py | 135 +++++++++++++++++++++++++ 2 files changed, 137 insertions(+) create mode 100644 stochss_compute/core/unique_results.py diff --git a/stochss_compute/core/remote_results.py b/stochss_compute/core/remote_results.py index 34fbce0f..c86a1b52 100644 --- a/stochss_compute/core/remote_results.py +++ b/stochss_compute/core/remote_results.py @@ -49,8 +49,10 @@ class RemoteResults(Results): n_traj = None task_id = None + # pylint:disable=super-init-not-called def __init__(self, data = None): self._data = data + # pylint:enable=super-init-not-called @property def data(self): diff --git a/stochss_compute/core/unique_results.py b/stochss_compute/core/unique_results.py new file mode 100644 index 00000000..d2600c53 --- /dev/null +++ b/stochss_compute/core/unique_results.py @@ -0,0 +1,135 @@ +''' +stochss_compute.core.remote_results +''' +# StochSS-Compute is a tool for running and caching GillesPy2 simulations remotely. +# Copyright (C) 2019-2023 GillesPy2 and StochSS developers. + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +from time import sleep +from gillespy2 import Results +from stochss_compute.client.endpoint import Endpoint +from stochss_compute.core.errors import RemoteSimulationError +from stochss_compute.core.messages.results import ResultsResponse +from stochss_compute.core.messages.status import StatusResponse, SimStatus + +class UniqueResults(Results): + ''' + Wrapper for a gillespy2.Results object that exists on a remote server and which is then downloaded locally. + A Results object is: A List of Trajectory objects created by a gillespy2 solver, extends the UserList object. + + These three fields must be initialized manually: id, server, n_traj, task_id. + + :param data: A list of trajectory objects. + :type data: UserList + + :param id: ID of the cached Results object. + :type id: str + + :param server: The remote instance of StochSS-Compute where the Results are cached. + :type server: stochss_compute.ComputeServer + + :param task_id: Handle for the running simulation. + :type task_id: str + ''' + # These three fields are initialized by the server + id = None + server = None + + # pylint:disable=super-init-not-called + def __init__(self, data = None): + self._data = data + # pylint:enable=super-init-not-called + + @property + def data(self): + """ + The trajectory data. + + :returns: self._data + :rtype: UserList + """ + if None in (self.id, self.server): + raise Exception('RemoteResults must have a self.id, self.server and self.n_traj.') + + if self._data is None: + self._resolve() + return self._data + + @property + def sim_status(self): + ''' + Fetch the simulation status. + + :returns: Simulation status enum as a string. + :rtype: str + ''' + return self._status().status.name + + def get_gillespy2_results(self): + """ + Get the GillesPy2 results object from the remote results. + + :returns: The generated GillesPy2 results object. + :rtype: gillespy.Results + """ + return Results(self.data) + + + @property + def is_ready(self): + """ + True if results exist in cache on the server. + + :returns: status == SimStatus.READY + :rtype: bool + """ + return self._status().status == SimStatus.READY + + def _status(self): + # Request the status of a submitted simulation. + response_raw = self.server.get(Endpoint.SIMULATION_GILLESPY2, + f"/{self.id}/status") + if not response_raw.ok: + raise RemoteSimulationError(response_raw.reason) + + status_response = StatusResponse.parse(response_raw.text) + return status_response + + def _resolve(self): + status_response = self._status() + status = status_response.status + + if status == SimStatus.RUNNING: + print('Simulation is running. Downloading results when complete......') + while True: + sleep(5) + status_response = self._status() + status = status_response.status + if status != SimStatus.RUNNING: + break + + if status in (SimStatus.DOES_NOT_EXIST, SimStatus.ERROR): + raise RemoteSimulationError(status_response.message) + + + if status == SimStatus.READY: + print('Results ready. Fetching.......') + response_raw = self.server.get(Endpoint.SIMULATION_GILLESPY2, f"/{self.id}/{self.n_traj}/results") + if not response_raw.ok: + raise RemoteSimulationError(response_raw.reason) + + response = ResultsResponse.parse(response_raw.text) + self._data = response.results.data + From c79ac140ef352042ba1d577f3d1042a62f92dbb0 Mon Sep 17 00:00:00 2001 From: mdip226 Date: Sat, 13 May 2023 15:29:05 -0400 Subject: [PATCH 13/31] first part of global logging --- stochss_compute/core/log_config.py | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) create mode 100644 stochss_compute/core/log_config.py diff --git a/stochss_compute/core/log_config.py b/stochss_compute/core/log_config.py new file mode 100644 index 00000000..9fabbb6a --- /dev/null +++ b/stochss_compute/core/log_config.py @@ -0,0 +1,24 @@ +''' +stochss_compute.core.log_config + +Global Logging Configuration +''' + +from logging import getLogger + +def init_logging(name): + ''' + Call after import to initialize logs in a module file. + To follow convention, use predefined __name__. + + Like so: + + from stochss_compute.core.log_config import init_logs + logger = init_logs(__name__) + + :returns: A module specific logger with level set by global LOG_LEVEL. + :rtype: logging.Logger + ''' + logger = getLogger(name) + return logger + \ No newline at end of file From 33f66c6efaa1003b72b48990be568504bf9550bd Mon Sep 17 00:00:00 2001 From: mdip226 Date: Sat, 13 May 2023 16:08:07 -0400 Subject: [PATCH 14/31] add setter and cleanup --- stochss_compute/core/log_config.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/stochss_compute/core/log_config.py b/stochss_compute/core/log_config.py index 9fabbb6a..c86a4e0f 100644 --- a/stochss_compute/core/log_config.py +++ b/stochss_compute/core/log_config.py @@ -14,11 +14,23 @@ def init_logging(name): Like so: from stochss_compute.core.log_config import init_logs - logger = init_logs(__name__) + log = init_logs(__name__) - :returns: A module specific logger with level set by global LOG_LEVEL. + :param name: Name for the logger. Use the dot-separated module path string. + :type name: str + + :returns: A module specific logger. :rtype: logging.Logger ''' logger = getLogger(name) return logger + +def set_global_log_level(level): + ''' + Sets the root logger log level. + + :param level: NOTSET:0, DEBUG:10, INFO:20, WARNING:30, ERROR:40, CRITICAL:50, etc. + :type level: int | logging._Level + ''' + getLogger().setLevel(level) \ No newline at end of file From a9d89a09a7a8c17a3d2dd07e1eddf9534aca42c1 Mon Sep 17 00:00:00 2001 From: mdip226 Date: Sat, 13 May 2023 16:09:00 -0400 Subject: [PATCH 15/31] fix logic and rename argument to ignore_cache --- stochss_compute/core/remote_simulation.py | 26 ++++++++++++----------- 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/stochss_compute/core/remote_simulation.py b/stochss_compute/core/remote_simulation.py index 1293748f..11a4a7fd 100644 --- a/stochss_compute/core/remote_simulation.py +++ b/stochss_compute/core/remote_simulation.py @@ -19,7 +19,7 @@ from stochss_compute.client.endpoint import Endpoint from stochss_compute.core.messages.simulation_run import SimulationRunRequest, SimulationRunResponse -from stochss_compute.core.messages.simulation_run_unique import SimulationRunUniqueRequest +from stochss_compute.core.messages.simulation_run_unique import SimulationRunUniqueRequest, SimulationRunUniqueResponse from stochss_compute.core.messages.status import SimStatus from stochss_compute.core.errors import RemoteSimulationError from stochss_compute.core.remote_results import RemoteResults @@ -99,10 +99,10 @@ def is_cached(self, **params): results_dummy.n_traj = params.get('number_of_trajectories', 1) return results_dummy.is_ready - def run(self, unique=False, **params): + def run(self, ignore_cache=False, **params): + # pylint:disable=line-too-long """ Simulate the Model on the target ComputeServer, returning the results or a handle to a running simulation. - See `here `_. :param unique: When True, ignore cache completely and return always new results. @@ -116,6 +116,7 @@ def run(self, unique=False, **params): :raises RemoteSimulationError: In the case of SimStatus.ERROR """ + # pylint:enable=line-too-long if "solver" in params: if hasattr(params['solver'], 'is_instantiated'): @@ -124,12 +125,12 @@ def run(self, unique=False, **params): params["solver"] = f"{params['solver'].__module__}.{params['solver'].__qualname__}" if self.solver is not None: params["solver"] = f"{self.solver.__module__}.{self.solver.__qualname__}" - if unique is True: + if ignore_cache is True: sim_request = SimulationRunUniqueRequest(self.model, **params) - self._run_unique(sim_request) - if unique is False: + return self._run_unique(sim_request) + if ignore_cache is False: sim_request = SimulationRunRequest(self.model, **params) - self._run(sim_request) + return self._run(sim_request) def _run(self, request): ''' @@ -155,9 +156,11 @@ def _run(self, request): remote_results.task_id = sim_response.task_id return remote_results - + def _run_unique(self, request): ''' + Ignores the cache. Gives each simulation request a unique identifier. + :param request: Request to send to the server. Contains Model and related arguments. :type request: SimulationRunUniqueRequest ''' @@ -165,11 +168,10 @@ def _run_unique(self, request): if not response_raw.ok: raise Exception(response_raw.reason) - sim_response = SimulationRunResponse.parse(response_raw.text) - if not sim_response.status != SimStatus.RUNNING: + sim_response = SimulationRunUniqueResponse.parse(response_raw.text) + if not sim_response.status is SimStatus.RUNNING: raise Exception(sim_response.error_message) - - + # non-conforming object creation ... possible refactor needed to solve, so left in. remote_results = RemoteResults() remote_results.id = request.unique_key remote_results.task_id = request.unique_key From 3cb28a2388edf6cca265be096a48f8d0fbfb4a21 Mon Sep 17 00:00:00 2001 From: mdip226 Date: Sat, 13 May 2023 16:09:20 -0400 Subject: [PATCH 16/31] delete, not needed --- stochss_compute/core/unique_results.py | 135 --------------------- stochss_compute/server/status_unique.py | 150 ------------------------ 2 files changed, 285 deletions(-) delete mode 100644 stochss_compute/core/unique_results.py delete mode 100644 stochss_compute/server/status_unique.py diff --git a/stochss_compute/core/unique_results.py b/stochss_compute/core/unique_results.py deleted file mode 100644 index d2600c53..00000000 --- a/stochss_compute/core/unique_results.py +++ /dev/null @@ -1,135 +0,0 @@ -''' -stochss_compute.core.remote_results -''' -# StochSS-Compute is a tool for running and caching GillesPy2 simulations remotely. -# Copyright (C) 2019-2023 GillesPy2 and StochSS developers. - -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. - -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. - -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . - -from time import sleep -from gillespy2 import Results -from stochss_compute.client.endpoint import Endpoint -from stochss_compute.core.errors import RemoteSimulationError -from stochss_compute.core.messages.results import ResultsResponse -from stochss_compute.core.messages.status import StatusResponse, SimStatus - -class UniqueResults(Results): - ''' - Wrapper for a gillespy2.Results object that exists on a remote server and which is then downloaded locally. - A Results object is: A List of Trajectory objects created by a gillespy2 solver, extends the UserList object. - - These three fields must be initialized manually: id, server, n_traj, task_id. - - :param data: A list of trajectory objects. - :type data: UserList - - :param id: ID of the cached Results object. - :type id: str - - :param server: The remote instance of StochSS-Compute where the Results are cached. - :type server: stochss_compute.ComputeServer - - :param task_id: Handle for the running simulation. - :type task_id: str - ''' - # These three fields are initialized by the server - id = None - server = None - - # pylint:disable=super-init-not-called - def __init__(self, data = None): - self._data = data - # pylint:enable=super-init-not-called - - @property - def data(self): - """ - The trajectory data. - - :returns: self._data - :rtype: UserList - """ - if None in (self.id, self.server): - raise Exception('RemoteResults must have a self.id, self.server and self.n_traj.') - - if self._data is None: - self._resolve() - return self._data - - @property - def sim_status(self): - ''' - Fetch the simulation status. - - :returns: Simulation status enum as a string. - :rtype: str - ''' - return self._status().status.name - - def get_gillespy2_results(self): - """ - Get the GillesPy2 results object from the remote results. - - :returns: The generated GillesPy2 results object. - :rtype: gillespy.Results - """ - return Results(self.data) - - - @property - def is_ready(self): - """ - True if results exist in cache on the server. - - :returns: status == SimStatus.READY - :rtype: bool - """ - return self._status().status == SimStatus.READY - - def _status(self): - # Request the status of a submitted simulation. - response_raw = self.server.get(Endpoint.SIMULATION_GILLESPY2, - f"/{self.id}/status") - if not response_raw.ok: - raise RemoteSimulationError(response_raw.reason) - - status_response = StatusResponse.parse(response_raw.text) - return status_response - - def _resolve(self): - status_response = self._status() - status = status_response.status - - if status == SimStatus.RUNNING: - print('Simulation is running. Downloading results when complete......') - while True: - sleep(5) - status_response = self._status() - status = status_response.status - if status != SimStatus.RUNNING: - break - - if status in (SimStatus.DOES_NOT_EXIST, SimStatus.ERROR): - raise RemoteSimulationError(status_response.message) - - - if status == SimStatus.READY: - print('Results ready. Fetching.......') - response_raw = self.server.get(Endpoint.SIMULATION_GILLESPY2, f"/{self.id}/{self.n_traj}/results") - if not response_raw.ok: - raise RemoteSimulationError(response_raw.reason) - - response = ResultsResponse.parse(response_raw.text) - self._data = response.results.data - diff --git a/stochss_compute/server/status_unique.py b/stochss_compute/server/status_unique.py deleted file mode 100644 index 73279be8..00000000 --- a/stochss_compute/server/status_unique.py +++ /dev/null @@ -1,150 +0,0 @@ -''' -stochss_compute.server.status_unique -''' -# StochSS-Compute is a tool for running and caching GillesPy2 simulations remotely. -# Copyright (C) 2019-2023 GillesPy2 and StochSS developers. - -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. - -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. - -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . - -from datetime import datetime -from distributed import Client -from tornado.web import RequestHandler -from stochss_compute.core.errors import RemoteSimulationError -from stochss_compute.core.messages.status import SimStatus, StatusResponse -from stochss_compute.server.cache import Cache - -class StatusUniqueHandler(RequestHandler): - ''' - Endpoint for requesting the status of a unique (one-off) simulation. - ''' - def __init__(self, application, request, **kwargs): - self.scheduler_address = None - self.cache_dir = None - self.task_id = None - self.results_id = None - super().__init__(application, request, **kwargs) - - def data_received(self, chunk: bytes): - raise NotImplementedError() - - def initialize(self, scheduler_address, cache_dir): - ''' - Sets the address to the Dask scheduler and the cache directory. - - :param scheduler_address: Scheduler address. - :type scheduler_address: str - - :param cache_dir: Path to the cache. - :type cache_dir: str - ''' - self.scheduler_address = scheduler_address - while cache_dir.endswith('/'): - cache_dir = cache_dir[:-1] - self.cache_dir = cache_dir + '/unique/' - - async def get(self, unique_key): - ''' - Process GET request. - - :param unique_key: ID of the running simulation. Required. - :type unique_key: str - ''' - if unique_key == '': - self.set_status(404, reason=f'Malformed request: {self.request.uri}') - self.finish() - raise RemoteSimulationError(f'Malformed request: {self.request.uri}') - - self.unique_key = unique_key - - cache = Cache(self.cache_dir, unique_key) - - print(f'{datetime.now()} | <{self.request.remote_ip}> | \ - Status Request | <{results_id}> | Trajectories: {n_traj} | \ - Task ID: {task_id}' ) - - msg = f'{datetime.now()} | <{results_id}> | <{task_id}> | Status: ' - - exists = cache.exists() - if exists: - empty = cache.is_empty() - if empty: - if self.task_id not in ('', None): - state, err = await self._check_with_scheduler() - print(msg + SimStatus.RUNNING.name + f' | Task: {state} | Error: {err}') - if state == 'erred': - self._respond_error(err) - else: - self._respond_running(f'Scheduler task state: {state}') - else: - print(msg+SimStatus.DOES_NOT_EXIST.name) - self._respond_dne() - else: - ready = cache.is_ready(n_traj) - if ready: - print(msg+SimStatus.READY.name) - self._respond_ready() - else: - if self.task_id not in ('', None): - state, err = await self._check_with_scheduler() - print(msg+SimStatus.RUNNING.name+f' | Task: {state} | error: {err}') - if state == 'erred': - self._respond_error(err) - else: - self._respond_running(f'Scheduler task state: {state}') - else: - print(msg+SimStatus.DOES_NOT_EXIST.name) - self._respond_dne() - else: - print(msg+SimStatus.DOES_NOT_EXIST.name) - self._respond_dne() - - def _respond_ready(self): - status_response = StatusResponse(SimStatus.READY) - self.write(status_response.encode()) - self.finish() - - def _respond_error(self, error_message): - status_response = StatusResponse(SimStatus.ERROR, error_message) - self.write(status_response.encode()) - self.finish() - - def _respond_dne(self): - status_response = StatusResponse(SimStatus.DOES_NOT_EXIST, 'There is no record of that simulation.') - self.write(status_response.encode()) - self.finish() - - def _respond_running(self, message): - status_response = StatusResponse(SimStatus.RUNNING, message) - self.write(status_response.encode()) - self.finish() - - async def _check_with_scheduler(self): - ''' - Ask the scheduler for information about a task. - ''' - client = Client(self.scheduler_address) - - # define function here so that it is pickle-able - def scheduler_task_state(task_id, dask_scheduler=None): - task = dask_scheduler.tasks.get(task_id) - if task is None: - return (None, None) - if task.exception_text == "": - return (task.state, None) - return (task.state, task.exception_text) - - # Do not await. Reasons. It returns sync. - ret = client.run_on_scheduler(scheduler_task_state, self.task_id) - client.close() - return ret From 07ec335f3a187acce538f863a22813be642a58c2 Mon Sep 17 00:00:00 2001 From: mdip226 Date: Sat, 13 May 2023 16:09:42 -0400 Subject: [PATCH 17/31] logging, debug, install handler --- stochss_compute/server/api.py | 35 ++++++++++++++++++++++++++++------- 1 file changed, 28 insertions(+), 7 deletions(-) diff --git a/stochss_compute/server/api.py b/stochss_compute/server/api.py index 6c8d4428..5eacbaa1 100644 --- a/stochss_compute/server/api.py +++ b/stochss_compute/server/api.py @@ -20,18 +20,24 @@ import os import asyncio import subprocess +from logging import DEBUG, INFO from tornado.web import Application from stochss_compute.server.is_cached import IsCachedHandler from stochss_compute.server.run import RunHandler +from stochss_compute.server.run_unique import SimulationRunUniqueHandler from stochss_compute.server.sourceip import SourceIpHandler from stochss_compute.server.status import StatusHandler from stochss_compute.server.results import ResultsHandler +from stochss_compute.core.log_config import init_logging, set_global_log_level +log = init_logging(__name__) def _make_app(dask_host, dask_scheduler_port, cache): scheduler_address = f'{dask_host}:{dask_scheduler_port}' return Application([ (r"/api/v2/simulation/gillespy2/run", RunHandler, {'scheduler_address': scheduler_address, 'cache_dir': cache}), + (r"/api/v2/simulation/gillespy2/run/unique", SimulationRunUniqueHandler, + {'scheduler_address': scheduler_address, 'cache_dir': cache}), (r"/api/v2/simulation/gillespy2/(?P.*?)/(?P[1-9]\d*?)/(?P.*?)/status", StatusHandler, {'scheduler_address': scheduler_address, 'cache_dir': cache}), (r"/api/v2/simulation/gillespy2/(?P.*?)/(?P[1-9]\d*?)/results", @@ -47,6 +53,7 @@ async def start_api( dask_host = 'localhost', dask_scheduler_port = 8786, rm = False, + debug = False, ): """ Start the REST API with the following arguments. @@ -65,22 +72,36 @@ async def start_api( :param rm: Delete the cache when exiting this program. :type rm: bool + + :param debug: Turn on Debug Logs. + :type debug: bool """ - # clean up lock files here + + if debug: + set_global_log_level(DEBUG) + else: + set_global_log_level(INFO) + # TODO clean up lock files here + cache_path = os.path.abspath(cache) app = _make_app(dask_host, dask_scheduler_port, cache) app.listen(port) - print(f'StochSS-Compute listening on: :{port}') - print(f'Cache directory: {cache_path}') - print(f'Connecting to Dask scheduler at: {dask_host}:{dask_scheduler_port}\n') + msg=""" +========================================================================= + StochSS-Compute listening on port: %(port)d + Cache directory: %(cache_path)s + Connecting to Dask scheduler at: %(dask_host)s:%(dask_scheduler_port)d +========================================================================= +""" + log.info(msg, locals()) try: await asyncio.Event().wait() except asyncio.exceptions.CancelledError as error: - print(error) + log.error(error) finally: if rm and os.path.exists(cache_path): - print('Removing cache...', end='') + log.info('Removing cache...') subprocess.Popen(['rm', '-r', cache_path]) - print('OK') + log.info('Cache Removed OK') \ No newline at end of file From 8d89c44351fbdb9b2d40507f07214046650a0e3a Mon Sep 17 00:00:00 2001 From: mdip226 Date: Sat, 13 May 2023 16:10:16 -0400 Subject: [PATCH 18/31] add "unique" arg --- stochss_compute/server/cache.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/stochss_compute/server/cache.py b/stochss_compute/server/cache.py index 10f99c86..57dd915c 100644 --- a/stochss_compute/server/cache.py +++ b/stochss_compute/server/cache.py @@ -33,10 +33,14 @@ class Cache: :param results_id: Simulation hash. :type results_id: str ''' - def __init__(self, cache_dir, results_id): + def __init__(self, cache_dir, results_id, unique=False): + if unique is True: + while cache_dir.endswith('/'): + cache_dir = cache_dir[:-1] + self.cache_dir = cache_dir + '/unique/' self.results_path = os.path.join(cache_dir, f'{results_id}.results') if not os.path.exists(cache_dir): - os.mkdir(cache_dir) + os.makedirs(cache_dir) def create(self) -> None: ''' From f0ace7fa503795e797fd256ed8c63b6f5b309587 Mon Sep 17 00:00:00 2001 From: mdip226 Date: Sat, 13 May 2023 16:11:05 -0400 Subject: [PATCH 19/31] quick save --- stochss_compute/server/status.py | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/stochss_compute/server/status.py b/stochss_compute/server/status.py index 8e309993..56c11d8d 100644 --- a/stochss_compute/server/status.py +++ b/stochss_compute/server/status.py @@ -17,13 +17,15 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -from datetime import datetime from distributed import Client from tornado.web import RequestHandler from stochss_compute.core.errors import RemoteSimulationError from stochss_compute.core.messages.status import SimStatus, StatusResponse from stochss_compute.server.cache import Cache +from stochss_compute.core.log_config import init_logging +log = init_logging(__name__) + class StatusHandler(RequestHandler): ''' Endpoint for requesting the status of a simulation. @@ -33,6 +35,7 @@ def __init__(self, application, request, **kwargs): self.cache_dir = None self.task_id = None self.results_id = None + self.unique = None super().__init__(application, request, **kwargs) def data_received(self, chunk: bytes): @@ -68,18 +71,15 @@ async def get(self, results_id, n_traj, task_id): self.set_status(404, reason=f'Malformed request: {self.request.uri}') self.finish() raise RemoteSimulationError(f'Malformed request: {self.request.uri}') - + log.debug('ITS A DEBUG MESSAGE') self.results_id = results_id self.task_id = task_id n_traj = int(n_traj) - - cache = Cache(self.cache_dir, results_id) - - print(f'{datetime.now()} | <{self.request.remote_ip}> | \ - Status Request | <{results_id}> | Trajectories: {n_traj} | \ - Task ID: {task_id}' ) + cache = Cache(self.cache_dir, task_id, results_id == task_id) + log_string = f'<{self.request.remote_ip}> | Results ID: <{results_id}> | Trajectories: {n_traj} | Task ID: {task_id}' + log.info(log_string) - msg = f'{datetime.now()} | <{results_id}> | <{task_id}> | Status: ' + msg = f'<{results_id}> | <{task_id}> | Status: ' exists = cache.exists() if exists: @@ -87,32 +87,32 @@ async def get(self, results_id, n_traj, task_id): if empty: if self.task_id not in ('', None): state, err = await self._check_with_scheduler() - print(msg + SimStatus.RUNNING.name + f' | Task: {state} | Error: {err}') + logger.info(msg + SimStatus.RUNNING.name + f' | Task: {state} | Error: {err}') if state == 'erred': self._respond_error(err) else: self._respond_running(f'Scheduler task state: {state}') else: - print(msg+SimStatus.DOES_NOT_EXIST.name) + logger.info(msg+SimStatus.DOES_NOT_EXIST.name) self._respond_dne() else: ready = cache.is_ready(n_traj) if ready: - print(msg+SimStatus.READY.name) + logger.info(msg+SimStatus.READY.name) self._respond_ready() else: if self.task_id not in ('', None): state, err = await self._check_with_scheduler() - print(msg+SimStatus.RUNNING.name+f' | Task: {state} | error: {err}') + logger.info(msg+SimStatus.RUNNING.name+f' | Task: {state} | error: {err}') if state == 'erred': self._respond_error(err) else: self._respond_running(f'Scheduler task state: {state}') else: - print(msg+SimStatus.DOES_NOT_EXIST.name) + logger.info(msg+SimStatus.DOES_NOT_EXIST.name) self._respond_dne() else: - print(msg+SimStatus.DOES_NOT_EXIST.name) + logger.info(msg+SimStatus.DOES_NOT_EXIST.name) self._respond_dne() def _respond_ready(self): From 823acd298d195c583450fddf492fac0ec8f92fce Mon Sep 17 00:00:00 2001 From: mdip226 Date: Sat, 13 May 2023 17:18:52 -0400 Subject: [PATCH 20/31] gets root module name --- stochss_compute/core/log_config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stochss_compute/core/log_config.py b/stochss_compute/core/log_config.py index c86a4e0f..3aeb7afe 100644 --- a/stochss_compute/core/log_config.py +++ b/stochss_compute/core/log_config.py @@ -32,5 +32,5 @@ def set_global_log_level(level): :param level: NOTSET:0, DEBUG:10, INFO:20, WARNING:30, ERROR:40, CRITICAL:50, etc. :type level: int | logging._Level ''' - getLogger().setLevel(level) + getLogger(__name__.split('.', maxsplit=1)[0]).setLevel(level) \ No newline at end of file From ae1e333939eecdba7ddd8eead9ea3e38d1533dab Mon Sep 17 00:00:00 2001 From: mdip226 Date: Sat, 13 May 2023 17:19:14 -0400 Subject: [PATCH 21/31] checks for unique --- stochss_compute/core/remote_results.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/stochss_compute/core/remote_results.py b/stochss_compute/core/remote_results.py index c86a1b52..6d88464b 100644 --- a/stochss_compute/core/remote_results.py +++ b/stochss_compute/core/remote_results.py @@ -125,10 +125,12 @@ def _resolve(self): if status in (SimStatus.DOES_NOT_EXIST, SimStatus.ERROR): raise RemoteSimulationError(status_response.message) - if status == SimStatus.READY: print('Results ready. Fetching.......') - response_raw = self.server.get(Endpoint.SIMULATION_GILLESPY2, f"/{self.id}/{self.n_traj}/results") + if self.id == self.task_id: + response_raw = self.server.get(Endpoint.SIMULATION_GILLESPY2, f"/{self.id}/results") + else: + response_raw = self.server.get(Endpoint.SIMULATION_GILLESPY2, f"/{self.id}/{self.n_traj}/results") if not response_raw.ok: raise RemoteSimulationError(response_raw.reason) From 96b441cc3d0797d5d2716f720f811a33b96a31a1 Mon Sep 17 00:00:00 2001 From: mdip226 Date: Sat, 13 May 2023 17:29:33 -0400 Subject: [PATCH 22/31] cleanup --- stochss_compute/core/remote_results.py | 1 - stochss_compute/server/api.py | 20 ++++++++++---------- 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/stochss_compute/core/remote_results.py b/stochss_compute/core/remote_results.py index 6d88464b..a81a8808 100644 --- a/stochss_compute/core/remote_results.py +++ b/stochss_compute/core/remote_results.py @@ -136,4 +136,3 @@ def _resolve(self): response = ResultsResponse.parse(response_raw.text) self._data = response.results.data - diff --git a/stochss_compute/server/api.py b/stochss_compute/server/api.py index 5eacbaa1..38236e9a 100644 --- a/stochss_compute/server/api.py +++ b/stochss_compute/server/api.py @@ -20,11 +20,12 @@ import os import asyncio import subprocess -from logging import DEBUG, INFO +from logging import INFO from tornado.web import Application from stochss_compute.server.is_cached import IsCachedHandler from stochss_compute.server.run import RunHandler from stochss_compute.server.run_unique import SimulationRunUniqueHandler +from stochss_compute.server.results_unique import ResultsUniqueHandler from stochss_compute.server.sourceip import SourceIpHandler from stochss_compute.server.status import StatusHandler from stochss_compute.server.results import ResultsHandler @@ -42,6 +43,8 @@ def _make_app(dask_host, dask_scheduler_port, cache): StatusHandler, {'scheduler_address': scheduler_address, 'cache_dir': cache}), (r"/api/v2/simulation/gillespy2/(?P.*?)/(?P[1-9]\d*?)/results", ResultsHandler, {'cache_dir': cache}), + (r"/api/v2/simulation/gillespy2/(?P.*?)/results", + ResultsUniqueHandler, {'cache_dir': cache}), (r"/api/v2/cache/gillespy2/(?P.*?)/(?P[1-9]\d*?)/is_cached", IsCachedHandler, {'cache_dir': cache}), (r"/api/v2/cloud/sourceip", SourceIpHandler), @@ -53,7 +56,7 @@ async def start_api( dask_host = 'localhost', dask_scheduler_port = 8786, rm = False, - debug = False, + logging_level = INFO, ): """ Start the REST API with the following arguments. @@ -73,26 +76,23 @@ async def start_api( :param rm: Delete the cache when exiting this program. :type rm: bool - :param debug: Turn on Debug Logs. - :type debug: bool + :param logging_level: Set log level for stochss_compute. + :type debug: logging._Level """ - if debug: - set_global_log_level(DEBUG) - else: - set_global_log_level(INFO) + set_global_log_level(logging_level) # TODO clean up lock files here cache_path = os.path.abspath(cache) app = _make_app(dask_host, dask_scheduler_port, cache) app.listen(port) - msg=""" + msg=''' ========================================================================= StochSS-Compute listening on port: %(port)d Cache directory: %(cache_path)s Connecting to Dask scheduler at: %(dask_host)s:%(dask_scheduler_port)d ========================================================================= -""" +''' log.info(msg, locals()) try: From 7c3ad48c3c4fc16eef1f3cdb4c13791a7ad732ed Mon Sep 17 00:00:00 2001 From: mdip226 Date: Sat, 13 May 2023 17:30:06 -0400 Subject: [PATCH 23/31] accounts for requesting unique simulation --- stochss_compute/server/cache.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/stochss_compute/server/cache.py b/stochss_compute/server/cache.py index 57dd915c..32c007f2 100644 --- a/stochss_compute/server/cache.py +++ b/stochss_compute/server/cache.py @@ -37,7 +37,7 @@ def __init__(self, cache_dir, results_id, unique=False): if unique is True: while cache_dir.endswith('/'): cache_dir = cache_dir[:-1] - self.cache_dir = cache_dir + '/unique/' + cache_dir = cache_dir + '/unique/' self.results_path = os.path.join(cache_dir, f'{results_id}.results') if not os.path.exists(cache_dir): os.makedirs(cache_dir) @@ -75,7 +75,7 @@ def is_empty(self) -> bool: return filesize == 0 return True - def is_ready(self, n_traj_wanted) -> bool: + def is_ready(self, n_traj_wanted=0) -> bool: ''' Check if the results are ready to be retrieved from the cache. From 3c8904ef7f08f5735bf14bf8810d76d73620c4ff Mon Sep 17 00:00:00 2001 From: mdip226 Date: Sat, 13 May 2023 17:30:53 -0400 Subject: [PATCH 24/31] unique endpoint --- stochss_compute/server/results_unique.py | 72 ++++++++++++++++++++++++ 1 file changed, 72 insertions(+) create mode 100644 stochss_compute/server/results_unique.py diff --git a/stochss_compute/server/results_unique.py b/stochss_compute/server/results_unique.py new file mode 100644 index 00000000..e7661b00 --- /dev/null +++ b/stochss_compute/server/results_unique.py @@ -0,0 +1,72 @@ +''' +stochss_compute.server.results +''' +# StochSS-Compute is a tool for running and caching GillesPy2 simulations remotely. +# Copyright (C) 2019-2023 GillesPy2 and StochSS developers. + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +from tornado.web import RequestHandler +from stochss_compute.core.errors import RemoteSimulationError +from stochss_compute.core.messages.results import ResultsResponse +from stochss_compute.server.cache import Cache + +from stochss_compute.core.log_config import init_logging +log = init_logging(__name__) + +class ResultsUniqueHandler(RequestHandler): + ''' + Endpoint for Results objects. + ''' + def __init__(self, application, request, **kwargs): + self.cache_dir = None + super().__init__(application, request, **kwargs) + + def data_received(self, chunk: bytes): + raise NotImplementedError() + + def initialize(self, cache_dir): + ''' + Set the cache directory. + + :param cache_dir: Path to the cache. + :type cache_dir: str + ''' + self.cache_dir = cache_dir + + async def get(self, results_id = None): + ''' + Process GET request. + + :param results_id: Unique id + :type results_id: str + + :param n_traj: Number of trajectories in the request. + :type n_traj: str + ''' + remote_ip = str(self.request.remote_ip) + log.info('<%(remote_ip)s> | Results Request | <%(results_id)s>', locals()) + if '' == results_id: + self.set_status(404, reason=f'Malformed request: {self.request.uri}') + self.finish() + raise RemoteSimulationError(f'Malformed request | <{self.request.remote_ip}>') + cache = Cache(self.cache_dir, results_id, unique=True) + if cache.is_ready(): + results = cache.read() + results_response = ResultsResponse(results) + self.write(results_response.encode()) + else: + # This should not happen! + self.set_status(404, f'Results "{results_id}" not found.') + self.finish() From 1e915130fde4bf5e8f4b9e634bc603f808d6abae Mon Sep 17 00:00:00 2001 From: mdip226 Date: Sat, 13 May 2023 17:41:38 -0400 Subject: [PATCH 25/31] cleanup --- stochss_compute/server/status.py | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/stochss_compute/server/status.py b/stochss_compute/server/status.py index 56c11d8d..e662ba64 100644 --- a/stochss_compute/server/status.py +++ b/stochss_compute/server/status.py @@ -40,7 +40,7 @@ def __init__(self, application, request, **kwargs): def data_received(self, chunk: bytes): raise NotImplementedError() - + def initialize(self, scheduler_address, cache_dir): ''' Sets the address to the Dask scheduler and the cache directory. @@ -71,48 +71,50 @@ async def get(self, results_id, n_traj, task_id): self.set_status(404, reason=f'Malformed request: {self.request.uri}') self.finish() raise RemoteSimulationError(f'Malformed request: {self.request.uri}') - log.debug('ITS A DEBUG MESSAGE') self.results_id = results_id self.task_id = task_id n_traj = int(n_traj) - cache = Cache(self.cache_dir, task_id, results_id == task_id) + unique = results_id == task_id + log.debug('unique: %(unique)s', locals()) + cache = Cache(self.cache_dir, task_id, unique=unique) log_string = f'<{self.request.remote_ip}> | Results ID: <{results_id}> | Trajectories: {n_traj} | Task ID: {task_id}' log.info(log_string) - + msg = f'<{results_id}> | <{task_id}> | Status: ' exists = cache.exists() + log.debug('exists: %(exists)s', locals()) if exists: empty = cache.is_empty() if empty: if self.task_id not in ('', None): state, err = await self._check_with_scheduler() - logger.info(msg + SimStatus.RUNNING.name + f' | Task: {state} | Error: {err}') + log.info(msg + SimStatus.RUNNING.name + f' | Task: {state} | Error: {err}') if state == 'erred': self._respond_error(err) else: self._respond_running(f'Scheduler task state: {state}') else: - logger.info(msg+SimStatus.DOES_NOT_EXIST.name) + log.info(msg+SimStatus.DOES_NOT_EXIST.name) self._respond_dne() else: ready = cache.is_ready(n_traj) if ready: - logger.info(msg+SimStatus.READY.name) + log.info(msg+SimStatus.READY.name) self._respond_ready() else: if self.task_id not in ('', None): state, err = await self._check_with_scheduler() - logger.info(msg+SimStatus.RUNNING.name+f' | Task: {state} | error: {err}') + log.info(msg+SimStatus.RUNNING.name+f' | Task: {state} | error: {err}') if state == 'erred': self._respond_error(err) else: self._respond_running(f'Scheduler task state: {state}') else: - logger.info(msg+SimStatus.DOES_NOT_EXIST.name) + log.info(msg+SimStatus.DOES_NOT_EXIST.name) self._respond_dne() else: - logger.info(msg+SimStatus.DOES_NOT_EXIST.name) + log.info(msg+SimStatus.DOES_NOT_EXIST.name) self._respond_dne() def _respond_ready(self): From e1e0a17feda6cd72eba277aeac750652c5bfdc3b Mon Sep 17 00:00:00 2001 From: mdip226 Date: Sat, 13 May 2023 17:41:47 -0400 Subject: [PATCH 26/31] clean up --- stochss_compute/server/results.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/stochss_compute/server/results.py b/stochss_compute/server/results.py index 93091aa4..aed8e175 100644 --- a/stochss_compute/server/results.py +++ b/stochss_compute/server/results.py @@ -27,6 +27,12 @@ class ResultsHandler(RequestHandler): ''' Endpoint for Results objects. ''' + def __init__(self, application, request, **kwargs): + self.cache_dir = None + super().__init__(application, request, **kwargs) + + def data_received(self, chunk: bytes): + raise NotImplementedError() def initialize(self, cache_dir): ''' From 0a7897b36b5cab03dabac16211a53660281c1539 Mon Sep 17 00:00:00 2001 From: mdip226 Date: Sun, 14 May 2023 11:19:15 -0400 Subject: [PATCH 27/31] hopefully passes test --- stochss_compute/server/status.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/stochss_compute/server/status.py b/stochss_compute/server/status.py index e662ba64..87c303df 100644 --- a/stochss_compute/server/status.py +++ b/stochss_compute/server/status.py @@ -35,7 +35,6 @@ def __init__(self, application, request, **kwargs): self.cache_dir = None self.task_id = None self.results_id = None - self.unique = None super().__init__(application, request, **kwargs) def data_received(self, chunk: bytes): @@ -76,7 +75,7 @@ async def get(self, results_id, n_traj, task_id): n_traj = int(n_traj) unique = results_id == task_id log.debug('unique: %(unique)s', locals()) - cache = Cache(self.cache_dir, task_id, unique=unique) + cache = Cache(self.cache_dir, results_id, unique=unique) log_string = f'<{self.request.remote_ip}> | Results ID: <{results_id}> | Trajectories: {n_traj} | Task ID: {task_id}' log.info(log_string) From c81cddf7d40bf31141c8fa0bddc10543d25d514d Mon Sep 17 00:00:00 2001 From: mdip226 Date: Sun, 14 May 2023 11:38:17 -0400 Subject: [PATCH 28/31] cleanup --- stochss_compute/server/results_unique.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/stochss_compute/server/results_unique.py b/stochss_compute/server/results_unique.py index e7661b00..c85b7f67 100644 --- a/stochss_compute/server/results_unique.py +++ b/stochss_compute/server/results_unique.py @@ -27,7 +27,7 @@ class ResultsUniqueHandler(RequestHandler): ''' - Endpoint for Results objects. + Endpoint for simulation-run-unique Results objects. ''' def __init__(self, application, request, **kwargs): self.cache_dir = None @@ -52,15 +52,15 @@ async def get(self, results_id = None): :param results_id: Unique id :type results_id: str - :param n_traj: Number of trajectories in the request. - :type n_traj: str ''' - remote_ip = str(self.request.remote_ip) - log.info('<%(remote_ip)s> | Results Request | <%(results_id)s>', locals()) if '' == results_id: self.set_status(404, reason=f'Malformed request: {self.request.uri}') self.finish() raise RemoteSimulationError(f'Malformed request | <{self.request.remote_ip}>') + # pylint:disable=possibly-unused-variable + remote_ip = str(self.request.remote_ip) + # pylint:enable=possibly-unused-variable + log.info('<%(remote_ip)s> | Results Request | <%(results_id)s>', locals()) cache = Cache(self.cache_dir, results_id, unique=True) if cache.is_ready(): results = cache.read() From 6bf957756a7cbc33cece5c32944d7886adc71bf6 Mon Sep 17 00:00:00 2001 From: mdip226 Date: Sun, 14 May 2023 11:45:57 -0400 Subject: [PATCH 29/31] cleanup --- test/unit_tests/test_status_handler.py | 23 +++++++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/test/unit_tests/test_status_handler.py b/test/unit_tests/test_status_handler.py index 69dc919f..d84b1dc0 100644 --- a/test/unit_tests/test_status_handler.py +++ b/test/unit_tests/test_status_handler.py @@ -1,5 +1,5 @@ ''' -test.unit_tests.test_launch +test.unit_tests.test_status_handler ''' import os import subprocess @@ -12,7 +12,7 @@ from stochss_compute.server.cache import Cache -class StatusTest(AsyncHTTPTestCase): +class StatusHandlerTest(AsyncHTTPTestCase): ''' Test StatusHandler class. ''' @@ -24,6 +24,7 @@ def tearDown(self) -> None: if os.path.exists(self.cache_dir): r_m = subprocess.Popen(['rm', '-r', self.cache_dir]) r_m.wait() + pass return super().tearDown() def get_app(self): @@ -102,3 +103,21 @@ def test_status_ready(self): status_response = StatusResponse.parse(response_raw.body) assert status_response.status == SimStatus.READY + def test_status_unique(self): + ''' + This uri should return a copy of these results + ''' + model = create_michaelis_menten() + results = model.run() + sim = SimulationRunRequest(model=model) + sim_hash = sim.hash() + cache = Cache(self.cache_dir, sim_hash) + cache.create() + cache.save(results) + uri = f'/api/v2/simulation/gillespy2/{sim_hash}/1//status' + response_raw = self.fetch(uri) + assert response_raw.code == 200 + status_response = StatusResponse.parse(response_raw.body) + assert status_response.status == SimStatus.READY + + From 823a3d9f59d75e87f869b3c6a627a226b23a5c29 Mon Sep 17 00:00:00 2001 From: mdip226 Date: Sun, 14 May 2023 11:46:07 -0400 Subject: [PATCH 30/31] forgot --- test/unit_tests/test_status_handler.py | 1 - 1 file changed, 1 deletion(-) diff --git a/test/unit_tests/test_status_handler.py b/test/unit_tests/test_status_handler.py index d84b1dc0..b99b892f 100644 --- a/test/unit_tests/test_status_handler.py +++ b/test/unit_tests/test_status_handler.py @@ -24,7 +24,6 @@ def tearDown(self) -> None: if os.path.exists(self.cache_dir): r_m = subprocess.Popen(['rm', '-r', self.cache_dir]) r_m.wait() - pass return super().tearDown() def get_app(self): From 2ab1ab1966d872323040a76ecac456fea14bb850 Mon Sep 17 00:00:00 2001 From: mdip226 Date: Sun, 14 May 2023 11:55:27 -0400 Subject: [PATCH 31/31] catch bad regex --- stochss_compute/server/results.py | 2 +- stochss_compute/server/results_unique.py | 2 +- test/unit_tests/test_results_handler.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/stochss_compute/server/results.py b/stochss_compute/server/results.py index aed8e175..537d565d 100644 --- a/stochss_compute/server/results.py +++ b/stochss_compute/server/results.py @@ -53,7 +53,7 @@ async def get(self, results_id = None, n_traj = None): :param n_traj: Number of trajectories in the request. :type n_traj: str ''' - if '' in (results_id, n_traj): + if '' in (results_id, n_traj) or '/' in results_id or '/' in n_traj: self.set_status(404, reason=f'Malformed request: {self.request.uri}') self.finish() raise RemoteSimulationError(f'Malformed request | <{self.request.remote_ip}>') diff --git a/stochss_compute/server/results_unique.py b/stochss_compute/server/results_unique.py index c85b7f67..63ebac39 100644 --- a/stochss_compute/server/results_unique.py +++ b/stochss_compute/server/results_unique.py @@ -53,7 +53,7 @@ async def get(self, results_id = None): :type results_id: str ''' - if '' == results_id: + if '' == results_id or '/' in results_id: self.set_status(404, reason=f'Malformed request: {self.request.uri}') self.finish() raise RemoteSimulationError(f'Malformed request | <{self.request.remote_ip}>') diff --git a/test/unit_tests/test_results_handler.py b/test/unit_tests/test_results_handler.py index 0f8cfa2a..87258a21 100644 --- a/test/unit_tests/test_results_handler.py +++ b/test/unit_tests/test_results_handler.py @@ -1,5 +1,5 @@ ''' -test.unit_tests.test_results +test.unit_tests.test_results_handler ''' import os import subprocess