diff --git a/.github/workflows/typing.yml b/.github/workflows/typing.yml
index 2b7c2a95566..9776580c8c7 100644
--- a/.github/workflows/typing.yml
+++ b/.github/workflows/typing.yml
@@ -32,11 +32,8 @@ jobs:
       run: |
         pip install ".[dev, types]"
     - name: Install dependencies
-      # type checking requires protobuf stubs
       run: |
         python -m pip install --upgrade pip
-        python -m pip install grpcio-tools
-        python -m grpc_tools.protoc -I src/_ert_com_protocol --mypy_out=src/_ert_com_protocol src/_ert_com_protocol/_schema.proto
     - run: echo ::add-matcher::.github/mypy-matcher.json
     - name: Run mypy
       run: |
diff --git a/.mypy.ini b/.mypy.ini
index f2e399bc2d9..58df62df530 100644
--- a/.mypy.ini
+++ b/.mypy.ini
@@ -39,9 +39,6 @@ ignore_missing_imports = True
 ignore_missing_imports = True
 ignore_errors = True
 
-[mypy-ert.experiment_server._schema_pb2]
-ignore_errors = True
-
 [mypy-cwrap.*]
 ignore_missing_imports = True
 
diff --git a/.pylintrc b/.pylintrc
index 282e1a8a889..5f40be52800 100644
--- a/.pylintrc
+++ b/.pylintrc
@@ -32,7 +32,6 @@ limit-inference-results=100
 # List of plugins (as comma separated values of python module names) to load,
 # usually to register additional checkers.
 load-plugins=pylint.extensions.no_self_use,
-             pylint_protobuf,
 
 # Pickle collected data for later comparisons.
 persistent=yes
@@ -177,11 +176,6 @@ notes=FIXME,
 # produce valid context managers.
 contextmanager-decorators=contextlib.contextmanager
 
-# List of members which are set dynamically and missed by pylint inference
-# system, and so shouldn't trigger E1101 when accessed. Python regular
-# expressions are accepted.
-generated-members=_ert_com_protocol.*
-
 # Tells whether missing members accessed in mixin class should be ignored. A
 # mixin class is detected if its name ends with "mixin" (case insensitive).
 ignore-mixin-members=yes
diff --git a/docs/conf.py b/docs/conf.py
index 3a4dba51fc1..c57c7276cdf 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -68,7 +68,6 @@
     ("py:class", "pydantic.types.PositiveInt"),
     ("py:class", "LibresFacade"),
     ("py:class", "pandas.core.frame.DataFrame"),
-    ("py:class", "google.protobuf.descriptor.Descriptor"),
     ("py:class", "websockets.legacy.server.WebSocketServerProtocol"),
     ("py:class", "EnsembleReader"),
 ]
diff --git a/pyproject.toml b/pyproject.toml
index 15c446cd7cb..5662734e84d 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -9,7 +9,6 @@ requires = [
     "ecl",
     "conan<2",
     "pybind11>=2.10.0",  # If this comes out of sync with the version installed by Conan please update the version in CMakeLists
-    "grpcio-tools",
 ]
 
 build-backend = "setuptools.build_meta"
@@ -66,7 +65,6 @@ dependencies=[
     "packaging",
     "pandas",
     "pluggy>=1.3.0",
-    "protobuf",
     "psutil",
     "pydantic >= 1.10.8, < 2",
     "PyQt5",
@@ -101,7 +99,6 @@ dev = [
     "ecl_data_io",
     "furo",
     "flaky",
-    "grpcio-tools",
     "hypothesis<=6.83.0; python_version=='3.8'",  # ipython pinned to 8.12.2 for python 3.8 support
     "hypothesis; python_version>='3.9'",
     "jsonpath_ng",
@@ -136,18 +133,15 @@ style = [
     "black",
     "ruff",
     "pylint",
-    "pylint-protobuf"
 ]
 
 types = [
     "mypy",
-    "mypy-protobuf<3.4",
     "types-aiofiles",
     "types-requests",
     "types-PyYAML",
     "types-python-dateutil",
     "types-decorator",
     "types-docutils",
-    "types-protobuf",
     "types-tqdm",
 ]
diff --git a/setup.py b/setup.py
index dead81499f4..7019ad1fc26 100644
--- a/setup.py
+++ b/setup.py
@@ -1,59 +1,9 @@
 import os
-import subprocess
 import sys
-from pathlib import Path
 
-from setuptools import Command, find_packages
-from setuptools.command.egg_info import egg_info
+from setuptools import find_packages
 from skbuild import setup
 
-# list of pair of .proto file and out directory
-PROTOBUF_FILES = [("src/_ert_com_protocol/_schema.proto", "src/_ert_com_protocol")]
-
-
-def compile_protocol_buffers():
-    for proto, out_dir in PROTOBUF_FILES:
-        proto_path = Path(proto).parent
-        subprocess.run(
-            [
-                sys.executable,
-                "-m",
-                "grpc_tools.protoc",
-                "-I",
-                proto_path,
-                f"--python_out={out_dir}",
-                proto,
-            ],
-            check=True,
-        )
-
-
-class EggInfo(egg_info):
-    """scikit-build uses the metadata of ert to determine what to include when building
-    the project. This determination results in files being copied to a special build
-    folder. If ert wants to compile e.g. protobuf files and have those included in the
-    distribution, those files needs to be a part of the distribution metadata, i.e. it
-    needs to happen in egg_info so that the compiled files are copied to the build
-    folder."""
-
-    def run(self):
-        compile_protocol_buffers()
-        egg_info.run(self)  # old style class, no super()
-
-
-class CompileProtocolBuffers(Command):
-    user_options = []
-
-    def initialize_options(self) -> None:
-        pass
-
-    def finalize_options(self) -> None:
-        pass
-
-    def run(self):
-        compile_protocol_buffers()
-
-
 # Corporate networks tend to be behind a proxy server with their own non-public
 # SSL certificates. Conan keeps its own certificates,
 # whose path we can override
@@ -93,10 +43,6 @@ def package_files(directory):
             "-DCMAKE_OSX_DEPLOYMENT_TARGET=10.15",
         ],
         cmake_source_dir="src/clib/",
-        cmdclass={
-            "egg_info": EggInfo,
-            "compile_protocol_buffers": CompileProtocolBuffers,
-        },
     )
 
     setup(**args)
diff --git a/src/_ert_com_protocol/__init__.py b/src/_ert_com_protocol/__init__.py
deleted file mode 100644
index 3fb5125b38f..00000000000
--- a/src/_ert_com_protocol/__init__.py
+++ /dev/null
@@ -1,90 +0,0 @@
-"""
-The _ert_com_protocol package provides google protobuffer based state machine, which
-holds the current state of an experiment implemented in ExperimentStateMachine.
-"""
-from ._schema_pb2 import (
-    CANCELLED,
-    DONE,
-    ENSEMBLE_CANCELLED,
-    ENSEMBLE_FAILED,
-    ENSEMBLE_STARTED,
-    ENSEMBLE_STOPPED,
-    FAILED,
-    JOB_FAILURE,
-    JOB_RUNNING,
-    JOB_START,
-    JOB_SUCCESS,
-    PENDING,
-    RUNNING,
-    STARTING,
-    STEP_FAILURE,
-    STEP_PENDING,
-    STEP_RUNNING,
-    STEP_SUCCESS,
-    STEP_TIMEOUT,
-    STEP_UNKNOWN,
-    STEP_WAITING,
-    UNKNOWN,
-    WAITING,
-    DispatcherMessage,
-    Ensemble,
-    EnsembleId,
-    EnsembleStatus,
-    Experiment,
-    ExperimentId,
-    ExperimentStatus,
-    Job,
-    JobId,
-    JobStatus,
-    Realization,
-    RealizationId,
-    Step,
-    StepId,
-    StepStatus,
-)
-from ._state_machine import ExperimentStateMachine, node_status_builder
-from .status_type_enum import queue_state_to_pbuf_type
-
-__all__ = (
-    "ExperimentStateMachine",
-    "node_status_builder",
-    "DispatcherMessage",
-    "Ensemble",
-    "EnsembleId",
-    "Experiment",
-    "ExperimentId",
-    "Job",
-    "JobId",
-    "Realization",
-    "RealizationId",
-    "Step",
-    "StepId",
-    "WAITING",
-    "STARTING",
-    "PENDING",
-    "DONE",
-    "FAILED",
-    "UNKNOWN",
-    "RUNNING",
-    "CANCELLED",
-    "JOB_FAILURE",
-    "JOB_START",
-    "JOB_SUCCESS",
-    "JOB_RUNNING",
-    "STEP_FAILURE",
-    "STEP_PENDING",
-    "STEP_RUNNING",
-    "STEP_SUCCESS",
-    "STEP_UNKNOWN",
-    "STEP_WAITING",
-    "STEP_TIMEOUT",
-    "ENSEMBLE_STARTED",
-    "ENSEMBLE_STOPPED",
-    "ENSEMBLE_CANCELLED",
-    "ENSEMBLE_FAILED",
-    "EnsembleStatus",
-    "JobStatus",
-    "StepStatus",
-    "ExperimentStatus",
-    "queue_state_to_pbuf_type",
-)
diff --git a/src/_ert_com_protocol/_schema.proto b/src/_ert_com_protocol/_schema.proto
deleted file mode 100644
index 98dcda55b0a..00000000000
--- a/src/_ert_com_protocol/_schema.proto
+++ /dev/null
@@ -1,132 +0,0 @@
-syntax =
-    "proto2"; // using proto2 to specify default-values
-              // could do proto3 but then our code gets more responsibility
-              // consider stronger support for json in proto3
-
-package experimentserver;
-import "google/protobuf/timestamp.proto";
-
-enum Status {
-  UNKNOWN = 0;
-  STARTING = 1;
-  RUNNING = 2;
-  DONE = 3;
-  FAILED = 4;
-  CANCELLED = 5;
-  PENDING = 6;
-  WAITING = 7;
-}
-
-enum StepStatus {
-  STEP_FAILURE = 0;
-  STEP_PENDING = 1;
-  STEP_RUNNING = 2;
-  STEP_SUCCESS = 3;
-  STEP_UNKNOWN = 4;
-  STEP_WAITING = 5;
-  STEP_TIMEOUT = 6;
-}
-
-enum EnsembleStatus {
-  ENSEMBLE_STARTED = 0;
-  ENSEMBLE_STOPPED = 1;
-  ENSEMBLE_CANCELLED = 2;
-  ENSEMBLE_FAILED = 3;
-}
-
-enum JobStatus {
-  JOB_START = 0;
-  JOB_RUNNING = 1;
-  JOB_SUCCESS = 2;
-  JOB_FAILURE = 3;
-}
-
-enum ExperimentStatus {
-  EXPERIMENT_STARTED = 0;
-  EXPERIMENT_SUCCEEDED = 1;
-  EXPERIMENT_FAILED = 2;
-  EXPERIMENT_CANCELLED = 3;
-  EXPERIMENT_HOOK_STARTED = 4;
-  EXPERIMENT_HOOK_ENDED = 5;
-  EXPERIMENT_ANALYSIS_STARTED = 6;
-  EXPERIMENT_ANALYSIS_ENDED = 7;
-}
-
-message DispatcherMessage {
-  oneof object {
-    Experiment experiment = 1;
-    Ensemble ensemble = 2;
-    Realization realization = 3;
-    Step step = 4;
-    Job job = 5;
-  }
-}
-
-message ExperimentId {
-  required string id = 1 [default = "experiment"];
-}
-message EnsembleId {
-  required ExperimentId experiment = 1;
-  required string id = 2
-      [default =
-           "ensemble"]; // an ensemble is identified by a string, not index
-}
-message RealizationId {
-  required EnsembleId ensemble = 1;
-  required uint64 realization = 2 [default = 0];
-}
-message StepId {
-  required RealizationId realization = 1;
-  required uint64 step = 2 [default = 0]; // replace with string?
-}
-message JobId {
-  required StepId step = 1;
-  required uint64 index = 2 [default = 0];
-}
-
-message Experiment {
-  required ExperimentId id = 1; // NOTE: in the message the id is optional
-                                // since it often will be streamed
-  optional ExperimentStatus status = 2 [default = EXPERIMENT_STARTED];
-  optional string message = 3;
-
-  map<string, Ensemble> ensembles = 101;
-}
-message Ensemble {
-  required EnsembleId id = 1;
-  optional EnsembleStatus status = 2 [default = ENSEMBLE_STARTED];
-
-  map<uint64, Realization> realizations = 101;
-}
-message Realization {
-  required RealizationId id = 1;
-  optional StepStatus status = 2 [default = STEP_UNKNOWN];
-  optional bool active = 3
-      [default = true]; // not derived - active/inactive by user or algorithm
-  optional double start_time = 4; // can be derived from step-list
-  optional double end_time = 5;   // can be derived from step-list
-
-  map<uint64, Step> steps = 101;
-}
-message Step {
-  required StepId id = 1;
-  optional StepStatus status = 2 [default = STEP_UNKNOWN];
-  optional double start_time = 3; // can be derived from job-list
-  optional double end_time = 4;   // can be derived from job-list
-
-  map<uint64, Job> jobs = 101;
-}
-message Job {
-  required JobId id = 1;
-  optional JobStatus status = 2 [default = JOB_START];
-  optional google.protobuf.Timestamp start_time = 3;
-  optional google.protobuf.Timestamp end_time = 4;
-  optional string name = 6;
-  optional string error = 7;
-  optional string stdout = 8;
-  optional string stderr = 9;
-
-  optional uint64 current_memory = 10;
-  optional uint64 max_memory = 11;
-  optional int32 exit_code = 12;
-}
diff --git a/src/_ert_com_protocol/_state_machine.py b/src/_ert_com_protocol/_state_machine.py
deleted file mode 100644
index 354a235b0c8..00000000000
--- a/src/_ert_com_protocol/_state_machine.py
+++ /dev/null
@@ -1,241 +0,0 @@
-# pylint: disable=no-member  # might be fixed when we migrate to proto3
-import logging
-from functools import singledispatchmethod
-from typing import Optional, Union
-
-import _ert_com_protocol._schema_pb2 as pb2
-
-logger = logging.getLogger(__name__)
-
-
-class _StateHandle:
-    """The :class:`_StateHandle` implements a mechanism for handling the protobuf
-    defined state machine.
-
-    General note:
-    We cannot directly assign values to a protobuf-map; eg:
-        experiment.ensembles[ens_id] = Ensemble(id=ens_id)
-    Useful links:
-    https://developers.google.com/protocol-buffers/docs/reference/csharp/class/google/protobuf/well-known-types/struct
-    https://developers.google.com/protocol-buffers/docs/reference/python-generated#map-fields
-    """
-
-    def __init__(self) -> None:
-        self._experiment: Optional[pb2.Experiment] = None
-
-    def get_experiment(self, _id: pb2.ExperimentId) -> pb2.Experiment:
-        logger.debug(f"experiment id: {_id}")
-        if self._experiment:
-            if self._experiment.id.id != _id.id:
-                raise ExperimentStateMachine.IllegalStateUpdate(
-                    f"Wrong experiment: expected {self._experiment.id.id} got {_id.id}"
-                )
-        else:
-            self._experiment = pb2.Experiment(id=_id)
-        return self._experiment
-
-    def get_ensemble(self, _id: pb2.EnsembleId) -> pb2.Ensemble:
-        logger.debug(f"ensemble id: {_id}")
-        experiment: pb2.Experiment = self.get_experiment(_id.experiment)
-        if _id.id not in experiment.ensembles:
-            experiment.ensembles[_id.id].CopyFrom(pb2.Ensemble(id=_id))
-        return experiment.ensembles[_id.id]
-
-    def get_realization(self, _id: pb2.RealizationId) -> pb2.Realization:
-        logger.debug(f"realization id: {_id}")
-        ensemble: pb2.Ensemble = self.get_ensemble(_id.ensemble)
-        if _id.realization not in ensemble.realizations:
-            ensemble.realizations[_id.realization].CopyFrom(pb2.Realization(id=_id))
-        return ensemble.realizations[_id.realization]
-
-    def get_step(self, _id: pb2.StepId) -> pb2.Step:
-        logger.debug(f"step id: {_id}")
-        real: pb2.Realization = self.get_realization(_id.realization)
-        if _id.step not in real.steps:
-            real.steps[_id.step].CopyFrom(pb2.Step(id=_id))
-        return real.steps[_id.step]
-
-    def get_job(self, _id: pb2.JobId) -> pb2.Job:
-        logger.debug(f"job id: {_id}")
-        step: pb2.Step = self.get_step(_id.step)
-        if _id.index not in step.jobs:
-            step.jobs[_id.index].CopyFrom(pb2.Job(id=_id))
-        return step.jobs[_id.index]
-
-
-class ExperimentStateMachine:
-    """The :class:`ExperimentStateMachine` implements a state machine for the
-    entire experiment. It allows an experiment to track its own state and
-    communicate it to others."""
-
-    def __init__(self) -> None:
-        self._state_handle: _StateHandle = _StateHandle()
-
-    def successful_realizations(self, ensemble_id: str) -> int:
-        """Return an integer indicating the number of successful realizations
-        in the experiment given its ensemble ``id``.
-
-        return 0 if the ensemble has no successful realizations or ensemble has
-        no realizations registered."""
-
-        if ensemble_id not in self.state.ensembles:
-            return 0
-        return sum(
-            self.state.ensembles[ensemble_id].realizations[id_real].status
-            == pb2.STEP_SUCCESS
-            for id_real in self.state.ensembles[ensemble_id].realizations
-        )
-
-    class IllegalStateUpdate(Exception):
-        def __init__(self, reason: str):
-            super().__init__(reason)
-            self.reason: str = reason
-
-    class UninitializedState(IllegalStateUpdate):
-        pass
-
-    @singledispatchmethod
-    def _update(
-        self,
-        _: Union[pb2.Job, pb2.Step, pb2.Realization, pb2.Ensemble, pb2.Experiment],
-    ) -> None:
-        pass
-
-    @_update.register
-    def _(self, job: pb2.Job) -> None:
-        logger.debug(
-            f"Updating job {job.id.index=} "
-            f"status {pb2.JobStatus.Name(job.status)=} from pbuf!",
-        )
-        old_job = self._state_handle.get_job(job.id)
-        old_job.MergeFrom(job)
-
-    @_update.register
-    def _(self, real: pb2.Realization) -> None:
-        logger.debug(
-            f"Updating realization id {real.id.realization} "
-            f" statius {pb2.StepStatus.Name(real.status)} from pbuf!",
-        )
-        old_real = self._state_handle.get_realization(real.id)
-        old_real.MergeFrom(real)
-
-    @_update.register
-    def _(self, step: pb2.Step) -> None:
-        logger.debug(
-            f"Updating step id {step.id.step} status "
-            f"{pb2.StepStatus.Name(step.status)} from pbuf!",
-        )
-        old_step = self._state_handle.get_step(step.id)
-        old_step.MergeFrom(step)
-        # if step==success then set realization=success
-        if step.status == pb2.STEP_SUCCESS:
-            self._update(
-                pb2.Realization(id=step.id.realization, status=pb2.STEP_SUCCESS),
-            )
-
-    @_update.register
-    def _(self, ens: pb2.Ensemble) -> None:
-        logger.debug(
-            f"Updating step id {ens.id.id} status"
-            f"{pb2.EnsembleStatus.Name(ens.status)} from pbuf!",
-        )
-        old_ens = self._state_handle.get_ensemble(ens.id)
-        old_ens.MergeFrom(ens)
-
-    @_update.register
-    def _(self, exp: pb2.Experiment) -> None:
-        logger.debug(f"Updating experiment id {exp.id.id} from pbuf!")
-        old_exp = self._state_handle.get_experiment(exp.id)
-        old_exp.MergeFrom(exp)
-
-    async def update(
-        self,
-        msg: Union[
-            pb2.Job,
-            pb2.Step,
-            pb2.Realization,
-            pb2.Ensemble,
-            pb2.Experiment,
-            pb2.DispatcherMessage,
-        ],
-    ) -> None:
-        try:
-            if isinstance(msg, pb2.DispatcherMessage):
-                self._update(getattr(msg, str(msg.WhichOneof("object"))))
-            else:
-                self._update(msg)
-        except Exception:
-            logger.error(f"Failed state machine update! Current state: {self.state}")
-            raise
-
-    @property
-    def state(self) -> pb2.Experiment:
-        if self._state_handle._experiment is None:
-            raise ExperimentStateMachine.UninitializedState(
-                "Experiment must be initialized first!"
-            )
-        return self._state_handle._experiment
-
-
-def node_status_builder(  # pylint: disable=too-many-arguments
-    status: str,
-    experiment_id: str,
-    ensemble_id: Optional[str] = None,
-    realization_id: Optional[int] = None,
-    step_id: Optional[int] = None,
-    job_id: Optional[int] = None,
-) -> pb2.DispatcherMessage:
-    """Builds a DispatcherMessage based on the given argument list.
-    It decides which Protobuf object to create based on the set of ids.
-    It doesn't check whether the status is a valid one, it just raises ValueError
-    in case the status was wrong.
-
-    Args:
-        status: get status and converts it protobuf status, validity not checked.
-        experiment_id: mandatory id of experiment
-        ensemble_id: id of ensemble.
-        realization_id: realization index.
-        step_id: step index.
-        job_id: job index.
-
-    Returns:
-        Dispatcher message encapsulating the corresponding protobuf object.
-    """
-    experiment = pb2.ExperimentId(id=experiment_id)
-    if ensemble_id is not None:
-        ensemble = pb2.EnsembleId(
-            id=ensemble_id,
-            experiment=experiment,
-        )
-        if realization_id is not None:
-            realization = pb2.RealizationId(
-                realization=realization_id,
-                ensemble=ensemble,
-            )
-            if step_id is not None:
-                step = pb2.StepId(
-                    step=step_id,
-                    realization=realization,
-                )
-                if job_id is not None:
-                    job = pb2.JobId(
-                        index=job_id,
-                        step=step,
-                    )
-                    return pb2.DispatcherMessage(
-                        job=pb2.Job(id=job, status=pb2.JobStatus.Value(status))
-                    )
-                return pb2.DispatcherMessage(
-                    step=pb2.Step(id=step, status=pb2.StepStatus.Value(status))
-                )
-            return pb2.DispatcherMessage(
-                realization=pb2.Realization(
-                    id=realization, status=pb2.StepStatus.Value(status)
-                )
-            )
-        return pb2.DispatcherMessage(
-            ensemble=pb2.Ensemble(id=ensemble, status=pb2.EnsembleStatus.Value(status))
-        )
-    return pb2.DispatcherMessage(
-        experiment=pb2.Experiment(
-            id=experiment, status=pb2.ExperimentStatus.Value(status)
-        )
-    )
diff --git a/src/_ert_com_protocol/status_type_enum.py b/src/_ert_com_protocol/status_type_enum.py
deleted file mode 100644
index e34ebf72b16..00000000000
--- a/src/_ert_com_protocol/status_type_enum.py
+++ /dev/null
@@ -1,24 +0,0 @@
-from typing import Dict
-
-_queue_state_to_pbuf_type_map: Dict[str, str] = {
-    "NOT_ACTIVE": "STEP_WAITING",
-    "WAITING": "STEP_WAITING",
-    "SUBMITTED": "STEP_WAITING",
-    "PENDING": "STEP_PENDING",
-    "RUNNING": "STEP_RUNNING",
-    "DONE": "STEP_RUNNING",
-    "EXIT": "STEP_RUNNING",
-    "IS_KILLED": "STEP_FAILED",
-    "DO_KILL": "STEP_FAILED",
-    "SUCCESS": "STEP_SUCCESS",
-    "RUNNING_DONE_CALLBACK": "STEP_RUNNING",
-    "RUNNING_EXIT_CALLBACK": "STEP_RUNNING",
-    "STATUS_FAILURE": "STEP_UNKNOWN",
-    "FAILED": "STEP_FAILED",
-    "DO_KILL_NODE_FAILURE": "STEP_FAILED",
-    "UNKNOWN": "STEP_UNKNOWN",
-}
-
-
-def queue_state_to_pbuf_type(status: str) -> str:
-    return _queue_state_to_pbuf_type_map[status]
diff --git a/src/_ert_job_runner/cli.py b/src/_ert_job_runner/cli.py
index 29c96b77f51..a82ed011e8f 100644
--- a/src/_ert_job_runner/cli.py
+++ b/src/_ert_job_runner/cli.py
@@ -34,14 +34,6 @@ def _setup_reporters(
                 evaluator_url=dispatch_url, token=ee_token, cert_path=ee_cert_path
             )
         )
-    elif experiment_id:
-        reporters.append(
-            reporting.Protobuf(
-                experiment_url=dispatch_url,
-                token=ee_token,
-                cert_path=ee_cert_path,
-            )
-        )
     else:
         reporters.append(reporting.File())
     return reporters
diff --git a/src/_ert_job_runner/reporting/__init__.py b/src/_ert_job_runner/reporting/__init__.py
index 8f91b9bd3d8..5ca15a74d3d 100644
--- a/src/_ert_job_runner/reporting/__init__.py
+++ b/src/_ert_job_runner/reporting/__init__.py
@@ -6,12 +6,10 @@
 from .event import Event
 from .file import File
 from .interactive import Interactive
-from .protobuf import Protobuf
 
 __all__ = [
     "File",
     "Interactive",
     "Reporter",
     "Event",
-    "Protobuf",
 ]
diff --git a/src/_ert_job_runner/reporting/protobuf.py b/src/_ert_job_runner/reporting/protobuf.py
deleted file mode 100644
index bb49671a257..00000000000
--- a/src/_ert_job_runner/reporting/protobuf.py
+++ /dev/null
@@ -1,124 +0,0 @@
-import logging
-import queue
-import threading
-from pathlib import Path
-from typing import Union
-
-from _ert_com_protocol import (
-    JOB_FAILURE,
-    JOB_RUNNING,
-    JOB_START,
-    JOB_SUCCESS,
-    node_status_builder,
-)
-from _ert_job_runner.client import Client
-from _ert_job_runner.reporting.base import Reporter
-from _ert_job_runner.reporting.message import (
-    _JOB_EXIT_FAILED_STRING,
-    Exited,
-    Finish,
-    Init,
-    Running,
-    Start,
-)
-from _ert_job_runner.reporting.statemachine import StateMachine
-
-logger = logging.getLogger(__name__)
-
-
-class Protobuf(Reporter):
-    def __init__(self, experiment_url, token=None, cert_path=None):
-        self._url = experiment_url
-        self._token = token
-        if cert_path is not None:
-            with open(cert_path, encoding="utf-8") as f:
-                self._cert = f.read()
-        else:
-            self._cert = None
-
-        self._statemachine = StateMachine()
-        self._statemachine.add_handler((Init,), self._init_handler)
-        self._statemachine.add_handler((Start, Running, Exited), self._job_handler)
-        self._statemachine.add_handler((Finish,), self._finished_handler)
-
-        self._experiment_id = None
-        self._ens_id = None
-        self._real_id = None
-        self._step_id = None
-        self._event_queue = queue.Queue()
-        self._event_publisher_thread = threading.Thread(target=self._publish_event)
-
-    def _dump_event(self, msg):
-        if msg is None:
-            self._event_queue.put(None)
-        else:
-            self._event_queue.put(msg.SerializeToString())
-
-    def _publish_event(self):
-        logger.debug("Publishing event.")
-        with Client(self._url, self._token, self._cert) as client:
-            while True:
-                event = self._event_queue.get()
-                if event is None:
-                    return
-                client.send(event)
-
-    def report(self, msg):
-        self._statemachine.transition(msg)
-
-    def _init_handler(self, msg: Init):
-        self._experiment_id = msg.experiment_id
-        self._ens_id = msg.ens_id
-        self._real_id = int(msg.real_id)
-        self._step_id = int(msg.step_id)
-        self._event_publisher_thread.start()
-
-    def _job_handler(self, msg: Union[Start, Running, Exited]):
-        job_name = msg.job.name()
-        event = node_status_builder(
-            status="JOB_START",
-            experiment_id=self._experiment_id,
-            ensemble_id=self._ens_id,
-            realization_id=self._real_id,
-            step_id=self._step_id,
-            job_id=msg.job.index,
-        )
-        if isinstance(msg, Start):
-            logger.debug(f"Job {job_name} was successfully started")
-            event.job.status = JOB_START
-            event.job.stdout = str(Path(msg.job.std_out).resolve())
-            event.job.stderr = str(Path(msg.job.std_err).resolve())
-            self._dump_event(event)
-            if not msg.success():
-                logger.error(f"Job {job_name} FAILED to start")
-                event.job.status = JOB_FAILURE
-                event.job.error = msg.error_message
-                self._dump_event(event)
-
-        elif isinstance(msg, Exited):
-            if msg.success():
-                logger.debug(f"Job {job_name} exited successfully")
-                event.job.status = JOB_SUCCESS
-            else:
-                logger.error(
-                    _JOB_EXIT_FAILED_STRING.format(
-                        job_name=msg.job.name(),
-                        exit_code=msg.exit_code,
-                        error_message=msg.error_message,
-                    )
-                )
-                event.job.status = JOB_FAILURE
-                event.job.exit_code = msg.exit_code
-                event.job.error = msg.error_message
-            self._dump_event(event)
-
-        elif isinstance(msg, Running):
-            logger.debug(f"{job_name} job is running")
-            event.job.status = JOB_RUNNING
-            event.job.current_memory = msg.current_memory_usage
-            event.job.max_memory = msg.max_memory_usage
-            self._dump_event(event)
-
-    def _finished_handler(self, msg):
-        self._dump_event(None)
-        self._event_publisher_thread.join()
diff --git a/src/ert/__main__.py b/src/ert/__main__.py
index b315c13e7bd..dc8b016a25f 100755
--- a/src/ert/__main__.py
+++ b/src/ert/__main__.py
@@ -593,13 +593,6 @@ def main() -> None:
         with start_ert_server(args.mode), ErtPluginContext() as context:
             context.plugin_manager.add_logging_handle_to_root(logging.getLogger())
             logger.info(f"Running ert with {args}")
-            if (
-                FeatureToggling.is_enabled("experiment-server")
-                and args.mode != ENSEMBLE_EXPERIMENT_MODE
-            ):
-                raise NotImplementedError(
-                    f"experiment-server can only run '{ENSEMBLE_EXPERIMENT_MODE}'"
-                )
             args.func(args, context.plugin_manager)
     except (ErtCliError, ErtTimeoutError) as err:
         logger.exception(str(err))
diff --git a/src/ert/cli/main.py b/src/ert/cli/main.py
index 61bac00459e..932d1d7314f 100644
--- a/src/ert/cli/main.py
+++ b/src/ert/cli/main.py
@@ -1,11 +1,9 @@
 #!/usr/bin/env python
-import asyncio
 import contextlib
 import logging
 import os
 import sys
 import threading
-import uuid
 from typing import Any, TextIO
 
 from ert.cli import (
@@ -24,8 +22,7 @@
 from ert.ensemble_evaluator import EvaluatorServerConfig, EvaluatorTracker
 from ert.libres_facade import LibresFacade
 from ert.namespace import Namespace
-from ert.shared.feature_toggling import FeatureToggling
-from ert.storage import StorageAccessor, open_storage
+from ert.storage import open_storage
 from ert.storage.local_storage import local_storage_set_ert_config
 
 
@@ -87,20 +84,6 @@ def run_cli(args: Namespace, _: Any = None) -> None:
             responses=ert.ensembleConfig().response_configuration,
         )
 
-        # Note that asyncio.run should be called once in ert/shared/main.py
-        if FeatureToggling.is_enabled("experiment-server"):
-            asyncio.run(
-                _run_cli_async(
-                    ert,
-                    storage,
-                    args,
-                    evaluator_server_config,
-                    experiment.id,
-                ),
-                debug=False,
-            )
-            return
-
         try:
             model = create_model(
                 ert,
@@ -156,20 +139,3 @@ def run_cli(args: Namespace, _: Any = None) -> None:
 
     if model.hasRunFailed():
         raise ErtCliError(model.getFailMessage())
-
-
-async def _run_cli_async(
-    ert: EnKFMain,
-    storage: StorageAccessor,
-    args: Any,
-    ee_config: EvaluatorServerConfig,
-    experiment_id: uuid.UUID,
-) -> None:
-    # pylint: disable=import-outside-toplevel
-    from ert.experiment_server import ExperimentServer
-
-    experiment_server = ExperimentServer(ee_config)
-    experiment_server.add_experiment(
-        create_model(ert, storage, args, experiment_id)  # type: ignore
-    )
-    await experiment_server.run_experiment(experiment_id=experiment_id)
diff --git a/src/ert/ensemble_evaluator/_builder/_ensemble.py b/src/ert/ensemble_evaluator/_builder/_ensemble.py
index 3e5d3befa49..ce4d5830f88 100644
--- a/src/ert/ensemble_evaluator/_builder/_ensemble.py
+++ b/src/ert/ensemble_evaluator/_builder/_ensemble.py
@@ -1,5 +1,4 @@
 import logging
-from abc import abstractmethod
 from typing import (
     TYPE_CHECKING,
     Any,
@@ -15,7 +14,6 @@
 from cloudevents.conversion import to_json
 from cloudevents.http import CloudEvent
 
-from _ert_com_protocol import DispatcherMessage
 from _ert_job_runner.client import Client
 from ert.ensemble_evaluator import state
 from ert.ensemble_evaluator.snapshot import (
@@ -31,8 +29,6 @@
 from ._realization import Realization
 
 if TYPE_CHECKING:
-    import asyncio
-
     from ..config import EvaluatorServerConfig
 
 logger = logging.getLogger(__name__)
@@ -116,11 +112,6 @@ def __repr__(self) -> str:
     def evaluate(self, config: "EvaluatorServerConfig") -> None:
         pass
 
-    async def evaluate_async(
-        self, config: "EvaluatorServerConfig", experiment_id: str
-    ) -> None:
-        pass
-
     def cancel(self) -> None:
         pass
 
@@ -160,23 +151,6 @@ async def send_cloudevent(  # pylint: disable=too-many-arguments
         async with Client(url, token, cert, max_retries=retries) as client:
             await client._send(to_json(event, data_marshaller=evaluator_marshaller))
 
-    # TODO: make legacy-only?
-    # See https://github.com/equinor/ert/issues/3456
-    @property
-    @abstractmethod
-    def output_bus(
-        self,
-    ) -> "asyncio.Queue[DispatcherMessage]":
-        raise NotImplementedError
-
-    # TODO: make legacy-only?
-    # See https://github.com/equinor/ert/issues/3456
-    async def queue_cloudevent(
-        self,
-        event: DispatcherMessage,
-    ) -> None:
-        self.output_bus.put_nowait(event)
-
     def get_successful_realizations(self) -> int:
         return self._snapshot.get_successful_realizations()
 
diff --git a/src/ert/ensemble_evaluator/_builder/_legacy.py b/src/ert/ensemble_evaluator/_builder/_legacy.py
index 90fc556fa9d..7d51edc3195 100644
--- a/src/ert/ensemble_evaluator/_builder/_legacy.py
+++ b/src/ert/ensemble_evaluator/_builder/_legacy.py
@@ -12,16 +12,12 @@
     Callable,
     Dict,
     List,
-    Literal,
     Optional,
     Tuple,
-    Union,
-    overload,
 )
 
 from cloudevents.http.event import CloudEvent
 
-import _ert_com_protocol
 from ert.async_utils import get_event_loop
 from ert.ensemble_evaluator import identifiers
 from ert.job_queue import Driver, JobQueue
@@ -35,13 +31,10 @@
     from ..config import EvaluatorServerConfig
     from ._realization import Realization
 
-MsgType = Union[CloudEvent, _ert_com_protocol.DispatcherMessage]
-
 CONCURRENT_INTERNALIZATION = 10
 
 logger = logging.getLogger(__name__)
 event_logger = logging.getLogger("ert.event_log")
-experiment_logger = logging.getLogger("ert.experiment_server")
 
 
 class LegacyEnsemble(Ensemble):
@@ -63,77 +56,29 @@ def __init__(  # pylint: disable=too-many-arguments
         )
         self._analysis_config = analysis_config
         self._config: Optional[EvaluatorServerConfig] = None
-        self._output_bus: asyncio.Queue[
-            _ert_com_protocol.DispatcherMessage
-        ] = asyncio.Queue()
-
-    @overload
-    def generate_event_creator(
-        self, experiment_id: str
-    ) -> Callable[[str, Optional[int]], _ert_com_protocol.DispatcherMessage]:
-        pass
-
-    @overload
-    def generate_event_creator(
-        self, experiment_id: Literal[None] = None
-    ) -> Callable[[str, Optional[int]], CloudEvent]:
-        pass
 
     def generate_event_creator(
         self, experiment_id: Optional[str] = None
-    ) -> Union[
-        Callable[[str, Optional[int]], CloudEvent],
-        Callable[[str, Optional[int]], _ert_com_protocol.DispatcherMessage],
-    ]:
-        if experiment_id is not None:
-
-            def node_builder(
-                status: str, real_id: Optional[int] = None
-            ) -> _ert_com_protocol.DispatcherMessage:
-                assert experiment_id  # mypy error
-                # TODO: this might got to _ert_com_protocol enum
-                status_tab = {
-                    identifiers.EVTYPE_ENSEMBLE_STARTED: "ENSEMBLE_STARTED",
-                    identifiers.EVTYPE_ENSEMBLE_FAILED: "ENSEMBLE_FAILED",
-                    identifiers.EVTYPE_ENSEMBLE_CANCELLED: "ENSEMBLE_CANCELLED",
-                    identifiers.EVTYPE_ENSEMBLE_STOPPED: "ENSEMBLE_STOPPED",
-                    identifiers.EVTYPE_FM_STEP_TIMEOUT: "STEP_TIMEOUT",
+    ) -> Callable[[str, Optional[int]], CloudEvent]:
+        def event_builder(status: str, real_id: Optional[int] = None) -> CloudEvent:
+            source = f"/ert/ensemble/{self.id_}"
+            if real_id is not None:
+                source += f"/real/{real_id}/step/0"
+            return CloudEvent(
+                {
+                    "type": status,
+                    "source": source,
+                    "id": str(uuid.uuid1()),
                 }
-                step_id = None
-                if real_id is not None:
-                    step_id = 0
-
-                return _ert_com_protocol.node_status_builder(
-                    ensemble_id=self.id_,
-                    experiment_id=experiment_id,
-                    status=status_tab[status],
-                    realization_id=real_id,
-                    step_id=step_id,
-                )
-
-            return node_builder
-
-        else:
-
-            def event_builder(status: str, real_id: Optional[int] = None) -> CloudEvent:
-                source = f"/ert/ensemble/{self.id_}"
-                if real_id is not None:
-                    source += f"/real/{real_id}/step/0"
-                return CloudEvent(
-                    {
-                        "type": status,
-                        "source": source,
-                        "id": str(uuid.uuid1()),
-                    }
-                )
+            )
 
-            return event_builder
+        return event_builder
 
     def setup_timeout_callback(
         self,
-        timeout_queue: asyncio.Queue[MsgType],
-        cloudevent_unary_send: Callable[[MsgType], Awaitable[None]],
-        event_generator: Callable[[str, Optional[int]], MsgType],
+        timeout_queue: asyncio.Queue[CloudEvent],
+        cloudevent_unary_send: Callable[[CloudEvent], Awaitable[None]],
+        event_generator: Callable[[str, Optional[int]], CloudEvent],
     ) -> Tuple[Callable[[int], None], asyncio.Task[None]]:
         def on_timeout(iens: int) -> None:
             timeout_queue.put_nowait(
@@ -204,22 +149,9 @@ def _evaluate(self) -> None:
         finally:
             get_event_loop().close()
 
-    async def evaluate_async(
-        self,
-        config: EvaluatorServerConfig,
-        experiment_id: str,
-    ) -> None:
-        self._config = config
-        await self._evaluate_inner(
-            cloudevent_unary_send=self.queue_cloudevent,  # type: ignore
-            output_bus=self.output_bus,
-            experiment_id=experiment_id,
-        )
-
     async def _evaluate_inner(  # pylint: disable=too-many-branches
         self,
-        cloudevent_unary_send: Callable[[MsgType], Awaitable[None]],
-        output_bus: Optional[asyncio.Queue[_ert_com_protocol.DispatcherMessage]] = None,
+        cloudevent_unary_send: Callable[[CloudEvent], Awaitable[None]],
         experiment_id: Optional[str] = None,
     ) -> None:
         """
@@ -281,40 +213,21 @@ async def _evaluate_inner(  # pylint: disable=too-many-branches
             # Tell queue to pass info to the jobs-file
             # NOTE: This touches files on disk...
             sema = threading.BoundedSemaphore(value=CONCURRENT_INTERNALIZATION)
-            if (
-                output_bus and experiment_id is not None
-            ):  # when running experiment server
-                self._job_queue.add_dispatch_information_to_jobs_file(
-                    ens_id=self.id_,
-                    dispatch_url=self._config.dispatch_uri,
-                    cert=self._config.cert,
-                    token=self._config.token,
-                    experiment_id=experiment_id,
-                )
-                # Finally, run the queue-loop until it finishes or raises
-                await self._job_queue.execute_queue_comms_via_bus(
-                    experiment_id=experiment_id,
-                    ens_id=self.id_,
-                    pool_sema=sema,
-                    evaluators=queue_evaluators,  # type: ignore
-                    output_bus=output_bus,
-                )
-            else:
-                self._job_queue.add_dispatch_information_to_jobs_file(
-                    ens_id=self.id_,
-                    dispatch_url=self._config.dispatch_uri,
-                    cert=self._config.cert,
-                    token=self._config.token,
-                )
-                # Finally, run the queue-loop until it finishes or raises
-                await self._job_queue.execute_queue_via_websockets(
-                    self._config.dispatch_uri,
-                    self.id_,
-                    sema,
-                    queue_evaluators,  # type: ignore
-                    ee_cert=self._config.cert,
-                    ee_token=self._config.token,
-                )
+            self._job_queue.add_dispatch_information_to_jobs_file(
+                ens_id=self.id_,
+                dispatch_url=self._config.dispatch_uri,
+                cert=self._config.cert,
+                token=self._config.token,
+            )
+            # Finally, run the queue-loop until it finishes or raises
+            await self._job_queue.execute_queue_via_websockets(
+                self._config.dispatch_uri,
+                self.id_,
+                sema,
+                queue_evaluators,  # type: ignore
+                ee_cert=self._config.cert,
+                ee_token=self._config.token,
+            )
 
         except asyncio.CancelledError:
             logger.debug("ensemble was cancelled")
@@ -345,9 +258,3 @@ def cancellable(self) -> bool:
     def cancel(self) -> None:
         self._job_queue.kill_all_jobs()
         logger.debug("evaluator cancelled")
-
-    @property
-    def output_bus(
-        self,
-    ) -> "asyncio.Queue[_ert_com_protocol.DispatcherMessage]":
-        return self._output_bus
diff --git a/src/ert/experiment_server/__init__.py b/src/ert/experiment_server/__init__.py
deleted file mode 100644
index a51211c71da..00000000000
--- a/src/ert/experiment_server/__init__.py
+++ /dev/null
@@ -1,23 +0,0 @@
-"""
-The experiment_server package provides facilities for creating, managing and
-running experiments, as well as defining protocols for orderly and meaningful
-communication between distributed parts of ERT.
-
-This package defines the following:
-
- - An Experiment protocol [1] which all experiments are expected to implement
- - The communication protocols for communication between: 1) the client (GUI,
-   CLI, and other stakeholders) and the server, 2) the server and remote
-   workers.
- - an API for creating, managing and running experiments.
-
-.. note::
-    The experiment server is currently under active design and development,
-    and is considered experimental.
-
-
-[1] see https://peps.python.org/pep-0544/ for information about protocols
-"""
-from ._server import ExperimentServer
-
-__all__ = ("ExperimentServer",)
diff --git a/src/ert/experiment_server/_experiment_protocol.py b/src/ert/experiment_server/_experiment_protocol.py
deleted file mode 100644
index f44ad637f80..00000000000
--- a/src/ert/experiment_server/_experiment_protocol.py
+++ /dev/null
@@ -1,32 +0,0 @@
-from __future__ import annotations
-
-from typing import TYPE_CHECKING, Protocol, Union
-from uuid import UUID
-
-from cloudevents.http import CloudEvent
-
-if TYPE_CHECKING:
-    from _ert_com_protocol import DispatcherMessage
-    from ert.ensemble_evaluator import EvaluatorServerConfig
-
-
-class Experiment(Protocol):
-    """The experiment protocol which all experiments must implement."""
-
-    async def run(self, evaluator_server_config: EvaluatorServerConfig) -> None:
-        """Run the experiment to completion."""
-
-    @property
-    def id(self) -> UUID:
-        """The id of the experiment."""
-
-    async def dispatch(self, event: Union[CloudEvent, DispatcherMessage]) -> None:
-        """dispatch(self, event) -> None
-        event is a ``CloudEvent`` https://github.com/cloudevents/sdk-python
-        or a ``protocol buffer object`` https://developers.google.com/protocol-buffers
-        The experiment will internalize the event and update its state.
-        """
-
-    # TODO: this is preliminary, see https://github.com/equinor/ert/issues/3407
-    async def successful_realizations(self, iter_: int) -> int:
-        """Return the amount of successful realizations."""
diff --git a/src/ert/experiment_server/_registry.py b/src/ert/experiment_server/_registry.py
deleted file mode 100644
index 134a3d703a3..00000000000
--- a/src/ert/experiment_server/_registry.py
+++ /dev/null
@@ -1,34 +0,0 @@
-from typing import Dict, List
-from uuid import UUID
-
-from ._experiment_protocol import Experiment
-
-
-class Registry:
-    """:class:`Registry` is a registry for registering experiments to be run
-    or managed by the :class:`ert.experiment_server._server.ExperimentServer`.
-    The registry should be manageable, but also be queryable by the
-    :class:`ert.experiment_server._server.ExperimentServer`. The class's
-    purpose is to decouple grunt tasks like managing the registry from the more
-    complex task of implementing an API on top of it.
-    """
-
-    def __init__(self) -> None:
-        self._experiments: Dict[UUID, Experiment] = {}
-
-    def add_experiment(self, experiment: Experiment) -> None:
-        """Add an experiment to the registry.
-
-        An id is generated here, but it should share (or receive) this ID from
-        ert-storage. See [1].
-
-        [1] https://github.com/equinor/ert/issues/3437#issue-1247962008
-        """
-        self._experiments[experiment.id] = experiment
-
-    @property
-    def all_experiments(self) -> List[Experiment]:
-        return list(self._experiments.values())
-
-    def get_experiment(self, experiment_id: UUID) -> Experiment:
-        return self._experiments[experiment_id]
diff --git a/src/ert/experiment_server/_server.py b/src/ert/experiment_server/_server.py
deleted file mode 100644
index 88a1b3bde77..00000000000
--- a/src/ert/experiment_server/_server.py
+++ /dev/null
@@ -1,164 +0,0 @@
-from __future__ import annotations
-
-import asyncio
-import logging
-import pickle
-from contextlib import contextmanager
-from typing import TYPE_CHECKING, Iterator, Set, Union
-from uuid import UUID
-
-from cloudevents.exceptions import DataUnmarshallerError
-from cloudevents.http import CloudEvent, from_json
-from google.protobuf.message import DecodeError
-from websockets.legacy.server import WebSocketServerProtocol
-from websockets.server import serve
-
-from _ert_com_protocol import DispatcherMessage
-from ert.serialization import evaluator_unmarshaller
-
-from ._experiment_protocol import Experiment
-from ._registry import Registry
-
-if TYPE_CHECKING:
-    from ert.ensemble_evaluator.config import EvaluatorServerConfig
-
-
-logger = logging.getLogger(__name__)
-event_logger = logging.getLogger("ert.event_log")
-
-
-class ExperimentServer:
-    """:class:`ExperimentServer` implements the experiment server API, allowing
-    creation, management and running of experiments as defined by the
-    :class:`ert.experiment_server._experiment_protocol.Experiment` protocol.
-
-    :class:`ExperimentServer` also runs a server to which clients and remote
-    workers can connect.
-    """
-
-    def __init__(self, ee_config: EvaluatorServerConfig) -> None:
-        self._config = ee_config
-        self._registry = Registry()
-        self._clients: Set[WebSocketServerProtocol] = set()
-
-        self._server_done = asyncio.get_running_loop().create_future()
-        self._server_task = asyncio.create_task(self._server())
-
-    async def _handler(self, websocket: WebSocketServerProtocol, path: str) -> None:
-        elements = path.split("/")
-        if elements[1] == "client":
-            await self.handle_client(websocket, path)
-        elif elements[1] == "dispatch":
-            logger.debug("dispatcher connected")
-            await self.handle_dispatch(websocket, path)
-        else:
-            logger.info(f"Connection attempt to unknown path: {path}.")
-
-    async def stop(self) -> None:
-        """Stop the server."""
-        logger.debug("stopping experiment server gracefully...")
-        try:
-            self._server_done.set_result(None)
-        except asyncio.InvalidStateError:
-            logger.debug("was already gracefully asked to stop.")
-            pass
-        await self._server_task
-
-    async def handle_dispatch(
-        self, websocket: WebSocketServerProtocol, path: str
-    ) -> None:
-        """Handle incoming "dispatch" connections, which refers to remote workers."""
-        event: Union[CloudEvent, DispatcherMessage]
-        async for msg in websocket:
-            if isinstance(msg, bytes):
-                # all Protobuf objects come in DispatcherMessage container
-                # which needs to be parsed
-                event = DispatcherMessage()
-                try:
-                    event.ParseFromString(msg)
-                except DecodeError:
-                    logger.error(f"Cannot parse pbuf event: {msg.decode()}")
-                    raise
-            else:
-                try:
-                    event = from_json(msg, data_unmarshaller=evaluator_unmarshaller)
-                except DataUnmarshallerError:
-                    event = from_json(msg, data_unmarshaller=pickle.loads)
-
-            await self._registry.all_experiments[0].dispatch(event)
-
-    @contextmanager
-    def store_client(self, websocket: WebSocketServerProtocol) -> Iterator[None]:
-        """Context manager for a client connection handler, allowing to know how
-        many clients are connected."""
-        logger.debug("client %s connected", websocket)
-        self._clients.add(websocket)
-        yield
-        self._clients.remove(websocket)
-
-    async def handle_client(
-        self, websocket: WebSocketServerProtocol, path: str
-    ) -> None:
-        """Handle incoming client connections."""
-        with self.store_client(websocket):
-            async for message in websocket:
-                client_event = from_json(
-                    message, data_unmarshaller=evaluator_unmarshaller
-                )
-                logger.debug(f"got message from client: {client_event}")
-
-    async def _server(self) -> None:
-        try:
-            async with serve(
-                self._handler,
-                sock=self._config.get_socket(),
-                ssl=self._config.get_server_ssl_context(),
-            ):
-                logger.debug("Running experiment server")
-                await self._server_done
-            logger.debug("Async server exiting.")
-        except Exception:  # pylint: disable=broad-except
-            logger.exception("crash/burn")
-
-    def add_experiment(self, experiment: Experiment) -> UUID:
-        self._registry.add_experiment(experiment)
-        return experiment.id
-
-    async def run_experiment(self, experiment_id: UUID) -> None:
-        """Run the experiment with the given experiment_id.
-
-        This is a helper method for use by the CLI, where only one experiment
-        at a time makes sense. This method therefore runs the experiment, and
-        attempts to gracefully shut down the server when complete.
-        """
-        logger.debug("running experiment %s", experiment_id)
-        experiment = self._registry.get_experiment(experiment_id)
-
-        experiment_task = asyncio.create_task(experiment.run(self._config))
-
-        done, pending = await asyncio.wait(
-            [self._server_task, experiment_task], return_when=asyncio.FIRST_COMPLETED
-        )
-
-        if experiment_task in done:
-            logger.debug("experiment %s was done", experiment_id)
-            # raise experiment exception if any
-            try:
-                experiment_task.result()
-                successful_reals = await experiment.successful_realizations(0)
-                # This is currently API
-                print(f"Successful realizations: {successful_reals}")
-            except Exception as e:  # pylint: disable=broad-except
-                print(f"Experiment failed: {str(e)}")
-                raise
-            finally:
-                # wait for shutdown of server
-                await self.stop()
-            return
-
-        # experiment is pending, but the server died, so try cancelling the experiment
-        # then raise the server's exception
-        for pending_task in pending:
-            logger.debug("task %s was pending, cancelling...", pending_task)
-            pending_task.cancel()
-        for done_task in done:
-            done_task.result()
diff --git a/src/ert/job_queue/queue.py b/src/ert/job_queue/queue.py
index 95d878345af..44484b990e9 100644
--- a/src/ert/job_queue/queue.py
+++ b/src/ert/job_queue/queue.py
@@ -31,7 +31,6 @@
 from websockets.datastructures import Headers
 from websockets.exceptions import ConnectionClosed
 
-import _ert_com_protocol
 from ert.constant_filenames import CERT_FILE, JOBS_FILE, ERROR_file, STATUS_file
 from ert.job_queue.job_queue_node import JobQueueNode
 from ert.job_queue.job_status import JobStatus
@@ -252,35 +251,6 @@ def _translate_change_to_cloudevent(
             },
         )
 
-    @staticmethod
-    def _translate_change_to_protobuf(
-        experiment_id: str, ens_id: str, real_id: int, status: str
-    ) -> _ert_com_protocol.DispatcherMessage:
-        return _ert_com_protocol.node_status_builder(
-            status=_ert_com_protocol.queue_state_to_pbuf_type(status),
-            experiment_id=experiment_id,
-            ensemble_id=ens_id,
-            realization_id=real_id,
-            step_id=0,
-        )
-
-    @staticmethod
-    async def _queue_changes(
-        experiment_id: str,
-        ens_id: str,
-        changes: Dict[int, str],
-        output_bus: "asyncio.Queue[_ert_com_protocol.DispatcherMessage]",
-    ) -> None:
-        events = [
-            JobQueue._translate_change_to_protobuf(
-                experiment_id, ens_id, real_id, status
-            )
-            for real_id, status in changes.items()
-        ]
-
-        for event in events:
-            output_bus.put_nowait(event)
-
     @staticmethod
     async def _publish_changes(
         ens_id: str,
@@ -419,60 +389,6 @@ async def execute_queue_via_websockets(  # pylint: disable=too-many-arguments
                 ens_id, self._differ.snapshot(), ee_connection
             )
 
-    async def execute_queue_comms_via_bus(  # pylint: disable=too-many-arguments
-        self,
-        experiment_id: str,
-        ens_id: str,
-        pool_sema: threading.BoundedSemaphore,
-        evaluators: List[Callable[..., Any]],
-        output_bus: "asyncio.Queue[_ert_com_protocol.DispatcherMessage]",
-    ) -> None:
-        if evaluators is None:
-            evaluators = []
-        try:
-            await JobQueue._queue_changes(
-                experiment_id, ens_id, self._differ.snapshot(), output_bus
-            )
-            while True:
-                self.launch_jobs(pool_sema)
-
-                await asyncio.sleep(1)
-
-                for func in evaluators:
-                    func()
-
-                changes = self.changes_after_transition()
-                await JobQueue._queue_changes(
-                    experiment_id, ens_id, changes, output_bus
-                )
-
-                if self.stopped:
-                    raise asyncio.CancelledError
-
-                if not self.is_active():
-                    break
-
-        except asyncio.CancelledError:
-            logger.debug("queue cancelled, stopping jobs...")
-            await self.stop_jobs_async()
-            logger.debug("jobs stopped, re-raising CancelledError")
-            raise
-
-        except Exception:
-            logger.exception(
-                "unexpected exception in queue",
-                exc_info=True,
-            )
-            await self.stop_jobs_async()
-            logger.debug("jobs stopped, re-raising exception")
-            raise
-
-        self.assert_complete()
-        self._differ.transition(self.job_list)
-        await JobQueue._queue_changes(
-            experiment_id, ens_id, self._differ.snapshot(), output_bus
-        )
-
     # pylint: disable=too-many-arguments
     def add_job_from_run_arg(
         self,
diff --git a/src/ert/logging/logger.conf b/src/ert/logging/logger.conf
index a33e9cf5546..4e6745a8b9f 100644
--- a/src/ert/logging/logger.conf
+++ b/src/ert/logging/logger.conf
@@ -13,12 +13,6 @@ handlers:
     filename: ert-log.txt
     (): ert.logging.TimestampedFileHandler
     use_log_dir_from_env: true
-  experiment_server_file:
-    level: DEBUG
-    formatter: simple_with_threading
-    filename: experiment-log.txt
-    (): ert.logging.TimestampedFileHandler
-    use_log_dir_from_env: true
   asyncio_file:
     level: DEBUG
     formatter: simple_with_threading
@@ -85,10 +79,6 @@ loggers:
     level: DEBUG
     handlers: [eefile]
     propagate: no
-  ert.experiment_server:
-    level: DEBUG
-    handlers: [experiment_server_file]
-    propagate: yes
   ert.storage.migration:
     level: INFO
     handlers: [terminal, migration_handler]
diff --git a/src/ert/run_models/base_run_model.py b/src/ert/run_models/base_run_model.py
index 1ff118d72ce..65b152d5d25 100644
--- a/src/ert/run_models/base_run_model.py
+++ b/src/ert/run_models/base_run_model.py
@@ -1,20 +1,14 @@
 from __future__ import annotations
 
 import asyncio
-import concurrent
 import logging
 import os
 import time
 import uuid
-from abc import abstractmethod
 from contextlib import contextmanager
-from functools import singledispatchmethod
 from pathlib import Path
 from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Union
 
-from cloudevents.http import CloudEvent
-
-import _ert_com_protocol
 from ert.cli import MODULE_MODE
 from ert.config import HookRuntime
 from ert.enkf_main import EnKFMain
@@ -33,7 +27,6 @@
 from ert.storage import StorageAccessor
 
 event_logger = logging.getLogger("ert.event_log")
-experiment_logger = logging.getLogger("ert.experiment_server.base_run_model")
 
 if TYPE_CHECKING:
     from ert.config import QueueConfig
@@ -108,8 +101,6 @@ def __init__(
         self._simulation_arguments = simulation_arguments
         self._experiment_id = experiment_id
         self.reset()
-        # experiment-server
-        self._state_machine = _ert_com_protocol.ExperimentStateMachine()
         # mapping from iteration number to ensemble id
         self._iter_map: Dict[int, str] = {}
         self.validate()
@@ -402,101 +393,10 @@ def _build_ensemble(
             builder.add_realization(real)
         return builder.set_id(str(uuid.uuid1()).split("-", maxsplit=1)[0]).build()
 
-    async def _evaluate(
-        self, run_context: RunContext, ee_config: EvaluatorServerConfig
-    ) -> None:
-        """Start asynchronous evaluation of an ensemble."""
-        experiment_logger.debug("_evaluate")
-        loop = asyncio.get_running_loop()
-        experiment_logger.debug("building...")
-        ensemble = self._build_ensemble(run_context)
-        self._iter_map[run_context.iteration] = ensemble.id_
-        experiment_logger.debug("built")
-
-        ensemble_listener = asyncio.create_task(
-            self._ensemble_listener(ensemble, iter_=run_context.iteration)
-        )
-
-        with concurrent.futures.ThreadPoolExecutor() as pool:
-            await ensemble.evaluate_async(ee_config, str(self.id))
-
-            await ensemble_listener
-
-            for iens, run_arg in enumerate(run_context):
-                if run_context.is_active(iens) and run_arg.run_status in (
-                    RunStatus.JOB_LOAD_FAILURE,
-                    RunStatus.JOB_RUN_FAILURE,
-                ):
-                    run_context.deactivate_realization(iens)
-
-            await loop.run_in_executor(
-                pool,
-                run_context.sim_fs.sync,
-            )
-
-    @abstractmethod
-    async def run(self, evaluator_server_config: EvaluatorServerConfig) -> None:
-        raise NotImplementedError()
-
-    async def successful_realizations(self, iter_: int) -> int:
-        return self._state_machine.successful_realizations(self._iter_map[iter_])
-
-    async def _run_hook(
-        self,
-        hook: HookRuntime,
-        iter_: int,
-        loop: asyncio.AbstractEventLoop,
-        executor: concurrent.futures.Executor,
-    ) -> None:
-        event = _ert_com_protocol.node_status_builder(
-            status="EXPERIMENT_HOOK_STARTED", experiment_id=str(self.id)
-        )
-        event.experiment.message = str(hook)
-        await self.dispatch(event)
-
-        await loop.run_in_executor(
-            executor,
-            self.ert().runWorkflows,
-            hook,
-        )
-
-        event = _ert_com_protocol.node_status_builder(
-            status="EXPERIMENT_HOOK_ENDED", experiment_id=str(self.id)
-        )
-        event.experiment.message = str(hook)
-        await self.dispatch(event)
-
     @property
     def id(self) -> uuid.UUID:
         return self._experiment_id
 
-    async def _ensemble_listener(self, ensemble: Ensemble, iter_: int) -> None:
-        """Redirect events emitted by the ensemble to this experiment."""
-        while True:
-            event: _ert_com_protocol.DispatcherMessage = await ensemble.output_bus.get()
-            await self.dispatch(event)
-            if event.WhichOneof("object") == "ensemble" and event.ensemble.status in (
-                _ert_com_protocol.ENSEMBLE_FAILED,
-                _ert_com_protocol.ENSEMBLE_CANCELLED,
-                _ert_com_protocol.ENSEMBLE_STOPPED,
-            ):
-                break
-
-    @singledispatchmethod
-    async def dispatch(
-        self,
-        event: Union[CloudEvent, _ert_com_protocol.DispatcherMessage],
-    ) -> None:
-        ...
-
-    @dispatch.register
-    async def _(self, event: CloudEvent) -> None:
-        event_logger.debug(f"dispatch cloudevent: {event} (experiment: {self.id})")
-
-    @dispatch.register
-    async def _(self, event: _ert_com_protocol.DispatcherMessage) -> None:
-        await self._state_machine.update(event)
-
     def check_if_runpath_exists(self) -> bool:
         """
         Determine if the run_path exists by checking if it contains
diff --git a/src/ert/run_models/ensemble_experiment.py b/src/ert/run_models/ensemble_experiment.py
index 20b5d5632d9..e5957b868d7 100644
--- a/src/ert/run_models/ensemble_experiment.py
+++ b/src/ert/run_models/ensemble_experiment.py
@@ -1,29 +1,21 @@
 from __future__ import annotations
 
-import asyncio
-import concurrent
-import logging
 from pathlib import Path
 from typing import TYPE_CHECKING, Any, Dict
 from uuid import UUID
 
-import _ert_com_protocol
-from ert.config import HookRuntime
 from ert.ensemble_evaluator import EvaluatorServerConfig
 from ert.realization_state import RealizationState
 from ert.run_context import RunContext
 from ert.storage import EnsembleAccessor, StorageAccessor
 
-from .base_run_model import BaseRunModel, ErtRunError
+from .base_run_model import BaseRunModel
 
 if TYPE_CHECKING:
     from ert.config import QueueConfig
     from ert.enkf_main import EnKFMain
 
-experiment_logger = logging.getLogger("ert.experiment_server.ensemble_experiment")
-
-
 # pylint: disable=too-many-arguments
 class EnsembleExperiment(BaseRunModel):
     def __init__(
@@ -36,83 +28,6 @@ def __init__(
     ):
         super().__init__(simulation_arguments, ert, storage, queue_config, id_)
 
-    async def run(self, evaluator_server_config: EvaluatorServerConfig) -> None:
-        experiment_logger.debug("starting ensemble experiment")
-        event = _ert_com_protocol.node_status_builder(
-            status="EXPERIMENT_STARTED", experiment_id=str(self.id)
-        )
-        await self.dispatch(event)
-
-        current_case = self._simulation_arguments["current_case"]
-        if isinstance(current_case, UUID):
-            ensemble = self._storage.get_ensemble(current_case)
-        else:
-            ensemble = self._storage.create_ensemble(
-                self._experiment_id,
-                name=current_case,
-                ensemble_size=self._ert.getEnsembleSize(),
-            )
-
-        loop = asyncio.get_running_loop()
-        with concurrent.futures.ThreadPoolExecutor() as executor:
-            prior_context = await loop.run_in_executor(
-                executor,
-                self.ert().ensemble_context,
-                ensemble,
-                self._simulation_arguments["active_realizations"],
-                self._simulation_arguments.get("iter_num", 0),
-            )
-
-            def sample_and_create_run_path(
-                ert: "EnKFMain", run_context: "RunContext"
-            ) -> None:
-                ert.sample_prior(
-                    run_context.sim_fs,
-                    run_context.active_realizations,
-                )
-                ert.createRunPath(run_context)
-
-            experiment_logger.debug("creating runpaths")
-            await loop.run_in_executor(
-                executor,
-                sample_and_create_run_path,
-                self.ert(),
-                prior_context,
-            )
-
-            await self._run_hook(
-                HookRuntime.PRE_SIMULATION, prior_context.iteration, loop, executor
-            )
-
-            experiment_logger.debug("evaluating")
-            await self._evaluate(prior_context, evaluator_server_config)
-
-            num_successful_realizations = await self.successful_realizations(
-                prior_context.iteration
-            )
-
-            num_successful_realizations += self._simulation_arguments.get(
-                "prev_successful_realizations", 0
-            )
-            try:
-                self.checkHaveSufficientRealizations(num_successful_realizations)
-            except ErtRunError as e:
-                event = _ert_com_protocol.node_status_builder(
-                    status="EXPERIMENT_FAILED", experiment_id=str(self._experiment_id)
-                )
-                event.experiment.message = str(e)
-                await self.dispatch(event)
-                return
-
-            await self._run_hook(
-                HookRuntime.POST_SIMULATION, prior_context.iteration, loop, executor
-            )
-
-        event = _ert_com_protocol.node_status_builder(
-            status="EXPERIMENT_SUCCEEDED", experiment_id=str(self.id)
-        )
-        await self.dispatch(event)
-
     def runSimulations__(
         self,
         run_msg: str,
diff --git a/src/ert/shared/feature_toggling.py b/src/ert/shared/feature_toggling.py
index 158abe1a4ff..116d0515b95 100644
--- a/src/ert/shared/feature_toggling.py
+++ b/src/ert/shared/feature_toggling.py
@@ -21,13 +21,6 @@ class FeatureToggling:
                 "Thank you for testing our new features."
             ),
         ),
-        "experiment-server": _Feature(
-            default_enabled=False,
-            msg=(
-                "The experiment server is a step towards becoming Cloud Native. "
-                "Thanks for testing."
-            ),
-        ),
     }
 
     _conf = deepcopy(_conf_original)
diff --git a/tests/unit_tests/cli/test_integration_cli.py b/tests/unit_tests/cli/test_integration_cli.py
index bdb63d6647d..66025610e6d 100644
--- a/tests/unit_tests/cli/test_integration_cli.py
+++ b/tests/unit_tests/cli/test_integration_cli.py
@@ -1,4 +1,3 @@
-import asyncio
 import fileinput
 import json
 import logging
@@ -369,39 +368,6 @@ def _run(target):
     assert not np.isclose(result_1.loc["iter-1"], result_2.loc["iter-1"]).all()
 
 
-@pytest.mark.integration_test
-@pytest.mark.timeout(40)
-def test_experiment_server_ensemble_experiment(tmpdir, source_root, capsys):
-    shutil.copytree(
-        os.path.join(source_root, "test-data", "poly_example"),
-        os.path.join(str(tmpdir), "poly_example"),
-    )
-
-    with tmpdir.as_cwd():
-        parser = ArgumentParser(prog="test_main")
-        parsed = ert_parser(
-            parser,
-            [
-                ENSEMBLE_EXPERIMENT_MODE,
-                "poly_example/poly.ert",
-                "--port-range",
-                "1024-65535",
-                "--enable-experiment-server",
-                "--realizations",
-                "0-4",
-            ],
-        )
-
-        FeatureToggling.update_from_args(parsed)
-        run_cli(parsed)
-        captured = capsys.readouterr()
-        with pytest.raises(RuntimeError):
-            asyncio.get_running_loop()
-        assert "Successful realizations: 5\n" in captured.out
-
-        FeatureToggling.reset()
-
-
 @pytest.mark.filterwarnings("ignore::ert.config.ConfigWarning")
 def test_bad_config_error_message(tmp_path):
     (tmp_path / "test.ert").write_text("NUM_REL 10\n")
diff --git a/tests/unit_tests/experiment_server/__init__.py b/tests/unit_tests/experiment_server/__init__.py
deleted file mode 100644
index e69de29bb2d..00000000000
diff --git a/tests/unit_tests/experiment_server/conftest.py b/tests/unit_tests/experiment_server/conftest.py
deleted file mode 100644
index da691b9db39..00000000000
--- a/tests/unit_tests/experiment_server/conftest.py
+++ /dev/null
@@ -1,23 +0,0 @@
-from contextlib import asynccontextmanager
-from typing import AsyncGenerator
-
-import pytest
-
-import ert.experiment_server
-from ert.ensemble_evaluator.config import EvaluatorServerConfig
-
-
-@pytest.fixture
-@asynccontextmanager
-async def experiment_server_ctx() -> AsyncGenerator[
-    ert.experiment_server.ExperimentServer, None
-]:
-    config = EvaluatorServerConfig(
-        custom_port_range=range(1024, 65535),
-        custom_host="127.0.0.1",
-        use_token=False,
-        generate_cert=False,
-    )
-    server = ert.experiment_server.ExperimentServer(config)
-    yield server
-    await server.stop()
diff --git a/tests/unit_tests/experiment_server/test_server.py b/tests/unit_tests/experiment_server/test_server.py
deleted file mode 100644
index 46cb46582d0..00000000000
--- a/tests/unit_tests/experiment_server/test_server.py
+++ /dev/null
@@ -1,65 +0,0 @@
-from typing import AsyncContextManager
-from unittest.mock import AsyncMock
-
-import pytest
-from cloudevents.conversion import to_json
-from cloudevents.http import CloudEvent
-from websockets.client import connect
-
-import ert.experiment_server
-from ert.experiment_server._experiment_protocol import Experiment
-
-# All test coroutines will be treated as marked.
-pytestmark = pytest.mark.asyncio
-
-
-async def test_receiving_event_from_cluster(
-    experiment_server_ctx: AsyncContextManager[ert.experiment_server.ExperimentServer],
-):
-    async with experiment_server_ctx as experiment_server:
-        experiment = AsyncMock(Experiment)
-        experiment_server.add_experiment(experiment)
-
-        async for dispatcher in connect(
-            experiment_server._config.dispatch_uri, open_timeout=None
-        ):
-            event = CloudEvent(
-                {
-                    "type": "test.event",
-                    "source": "test_receiving_event_from_cluster",
-                }
-            )
-            await dispatcher.send(to_json(event).decode())
-            break
-
-        experiment.dispatch.assert_awaited_once_with(event)
-
-
-async def test_successful_run(
-    experiment_server_ctx: AsyncContextManager[ert.experiment_server.ExperimentServer],
-    capsys,
-):
-    async with experiment_server_ctx as experiment_server:
-        experiment = AsyncMock(Experiment)
-        experiment.successful_realizations.return_value = 5
-        id_ = experiment_server.add_experiment(experiment)
-        await experiment_server.run_experiment(id_)
-
-    captured = capsys.readouterr()
-    assert captured.out == "Successful realizations: 5\n"
-
-
-async def test_failed_run(
-    experiment_server_ctx: AsyncContextManager[ert.experiment_server.ExperimentServer],
-    capsys,
-):
-    async with experiment_server_ctx as experiment_server:
-        experiment = AsyncMock(Experiment)
-        experiment.run.side_effect = RuntimeError("boom")
-        id_ = experiment_server.add_experiment(experiment)
-
-        with pytest.raises(RuntimeError, match="boom"):
-            await experiment_server.run_experiment(id_)
-
-    captured = capsys.readouterr()
-    assert captured.out == "Experiment failed: boom\n"
diff --git a/tests/unit_tests/experiment_server/test_statemachine.py b/tests/unit_tests/experiment_server/test_statemachine.py
deleted file mode 100644
index db93455f8fe..00000000000
--- a/tests/unit_tests/experiment_server/test_statemachine.py
+++ /dev/null
@@ -1,147 +0,0 @@
-import pytest
-
-import _ert_com_protocol
-
-# All test coroutines will be treated as marked.
-pytestmark = pytest.mark.asyncio
-
-
-async def test_update_job():
-    state_machine = _ert_com_protocol.ExperimentStateMachine()
-    id_job = _ert_com_protocol.JobId(
-        index=0,
-        step=_ert_com_protocol.StepId(
-            step=0,
-            realization=_ert_com_protocol.RealizationId(
-                realization=0,
-                ensemble=_ert_com_protocol.EnsembleId(
-                    id="ee_id",
-                    experiment=_ert_com_protocol.ExperimentId(id="experiment_id"),
-                ),
-            ),
-        ),
-    )
-
-    pjob = _ert_com_protocol.Job(id=id_job)
-    pjob.status = _ert_com_protocol.JOB_START
-    pjob.name = "job1"
-    await state_machine.update(pjob)
-    assert "ee_id" in state_machine.state.ensembles
-    assert (
-        state_machine.state.ensembles["ee_id"].realizations[0].steps[0].jobs[0].status
-        == _ert_com_protocol.JOB_START
-    )
-    assert (
-        state_machine.state.ensembles["ee_id"].realizations[0].steps[0].jobs[0].name
-        == "job1"
-    )
-    pjob.status = _ert_com_protocol.JOB_FAILURE
-    pjob.error = "Failed dramatically"
-    await state_machine.update(pjob)
-    assert (
-        state_machine.state.ensembles["ee_id"].realizations[0].steps[0].jobs[0].status
-        == _ert_com_protocol.JOB_FAILURE
-    )
-    assert (
-        state_machine.state.ensembles["ee_id"].realizations[0].steps[0].jobs[0].error
-        == "Failed dramatically"
-    )
-
-
-async def test_update_job_different_experiments():
-    state_machine = _ert_com_protocol.ExperimentStateMachine()
-    id_job = _ert_com_protocol.JobId(
-        index=0,
-        step=_ert_com_protocol.StepId(
-            step=0,
-            realization=_ert_com_protocol.RealizationId(
-                realization=0,
-                ensemble=_ert_com_protocol.EnsembleId(
-                    id="ee_id",
-                    experiment=_ert_com_protocol.ExperimentId(id="experiment_id"),
-                ),
-            ),
-        ),
-    )
-
-    pjob = _ert_com_protocol.Job(id=id_job)
-    pjob.status = _ert_com_protocol.JOB_START
-    pjob.name = "job1"
-    await state_machine.update(pjob)
-    assert "ee_id" in state_machine.state.ensembles
-    assert (
-        state_machine.state.ensembles["ee_id"].realizations[0].steps[0].jobs[0].status
-        == _ert_com_protocol.JOB_START
-    )
-
-    id_job_2 = _ert_com_protocol.JobId(
-        index=0,
-        step=_ert_com_protocol.StepId(
-            step=0,
-            realization=_ert_com_protocol.RealizationId(
-                realization=0,
-                ensemble=_ert_com_protocol.EnsembleId(
-                    id="ee_id",
-                    experiment=_ert_com_protocol.ExperimentId(id="another_id"),
-                ),
-            ),
-        ),
-    )
-    pjob = _ert_com_protocol.Job(id=id_job_2)
-    pjob.status = _ert_com_protocol.JOB_START
-    with pytest.raises(_ert_com_protocol.ExperimentStateMachine.IllegalStateUpdate):
-        await state_machine.update(pjob)
-
-
-@pytest.mark.parametrize(
-    "id_args, expected_type",
-    [
-        (["EXPERIMENT_STARTED", "exid", None, None, None, None], "experiment"),
-        (["ENSEMBLE_STARTED", "exid", "ensid", None, None, None], "ensemble"),
-        (["STEP_WAITING", "exid", "ensid", 0, None, None], "realization"),
-        (["STEP_WAITING", "exid", "ensid", 0, 0, None], "step"),
-        (["JOB_START", "exid", "ensid", 0, 0, 0], "job"),
-    ],
-)
-async def test_node_builder(id_args, expected_type):
-    msg: _ert_com_protocol.DispatcherMessage = _ert_com_protocol.node_status_builder(
-        status=id_args[0],
-        experiment_id=id_args[1],
-        ensemble_id=id_args[2],
-        realization_id=id_args[3],
-        step_id=id_args[4],
-        job_id=id_args[5],
-    )
-    assert msg.WhichOneof("object") == expected_type
-
-
-async def test_node_builder_wrong_state():
-    with pytest.raises(ValueError):
-        _ert_com_protocol.node_status_builder(
-            status="SOME_WEIRD_STATUS", experiment_id="exp_id"
-        )
-
-
-async def test_get_successful_realizations():
-    state_machine = _ert_com_protocol.ExperimentStateMachine()
-    for real_id in range(5):
-        job_node: _ert_com_protocol.Job =
_ert_com_protocol.node_status_builder( - status="JOB_START", - experiment_id="exp_id", - ensemble_id="ens_id", - realization_id=real_id, - step_id=0, - job_id=0, - ).job - await state_machine.update(job_node) - assert state_machine.successful_realizations("ens_id") == 0 - - step_node: _ert_com_protocol.Step = _ert_com_protocol.node_status_builder( - status="STEP_SUCCESS", - experiment_id="exp_id", - ensemble_id="ens_id", - realization_id=3, - step_id=0, - ).step - await state_machine.update(step_node) - assert state_machine.successful_realizations("ens_id") == 1 diff --git a/tests/unit_tests/job_runner/test_protobuf_reporter.py b/tests/unit_tests/job_runner/test_protobuf_reporter.py deleted file mode 100644 index ab2b95408b2..00000000000 --- a/tests/unit_tests/job_runner/test_protobuf_reporter.py +++ /dev/null @@ -1,243 +0,0 @@ -import os - -import pytest - -from _ert_com_protocol import ( - JOB_FAILURE, - JOB_RUNNING, - JOB_START, - JOB_SUCCESS, - DispatcherMessage, -) -from _ert_job_runner.job import Job -from _ert_job_runner.reporting import Protobuf -from _ert_job_runner.reporting.message import Exited, Finish, Init, Running, Start -from _ert_job_runner.reporting.statemachine import TransitionError -from tests.utils import _mock_ws_thread - - -def test_report_with_successful_start_message_argument(unused_tcp_port): - host = "localhost" - url = f"ws://{host}:{unused_tcp_port}" - reporter = Protobuf(experiment_url=url) - job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0) - lines = [] - with _mock_ws_thread(host, unused_tcp_port, lines): - reporter.report( - Init( - [job1], - 1, - 19, - experiment_id="experiment_id", - ens_id="ens_id", - real_id=0, - step_id=0, - ) - ) - reporter.report(Start(job1)) - reporter.report(Finish()) - - assert len(lines) == 1 - event = DispatcherMessage() - event.ParseFromString(lines[0]) - assert event.WhichOneof("object") == "job" - # pylint: disable=no-member - assert event.job.status == JOB_START - assert event.job.id.index == 0 - assert event.job.id.step.step == 0 - assert event.job.id.step.realization.realization == 0 - assert event.job.id.step.realization.ensemble.id == "ens_id" - assert event.job.id.step.realization.ensemble.experiment.id == "experiment_id" - assert os.path.basename(event.job.stdout) == "stdout" - assert os.path.basename(event.job.stderr) == "stderr" - - -def test_report_with_failed_start_message_argument(unused_tcp_port): - host = "localhost" - url = f"ws://{host}:{unused_tcp_port}" - reporter = Protobuf(experiment_url=url) - - job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0) - - lines = [] - with _mock_ws_thread(host, unused_tcp_port, lines): - reporter.report( - Init( - [job1], - 1, - 19, - ens_id="ens_id", - real_id=0, - step_id=0, - experiment_id="experiment_id", - ) - ) - - msg = Start(job1).with_error("massive_failure") - - reporter.report(msg) - reporter.report(Finish()) - - assert len(lines) == 2 - event = DispatcherMessage() - event.ParseFromString(lines[1]) - # pylint: disable=no-member - assert event.job.status == JOB_FAILURE - assert event.job.error == "massive_failure" - - -def test_report_with_successful_exit_message_argument(unused_tcp_port): - host = "localhost" - url = f"ws://{host}:{unused_tcp_port}" - reporter = Protobuf(experiment_url=url) - job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0) - - lines = [] - with _mock_ws_thread(host, unused_tcp_port, lines): - reporter.report( - Init( - [job1], - 1, - 19, - ens_id="ens_id", - real_id=0, - step_id=0, - 
experiment_id="experiment_id", - ) - ) - reporter.report(Exited(job1, 0)) - reporter.report(Finish().with_error("failed")) - - assert len(lines) == 1 - event = DispatcherMessage() - event.ParseFromString(lines[0]) - assert event.WhichOneof("object") == "job" - # pylint: disable=no-member - assert event.job.status == JOB_SUCCESS - - -def test_report_with_failed_exit_message_argument(unused_tcp_port): - host = "localhost" - url = f"ws://{host}:{unused_tcp_port}" - reporter = Protobuf(experiment_url=url) - job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0) - - lines = [] - with _mock_ws_thread(host, unused_tcp_port, lines): - reporter.report( - Init( - [job1], - 1, - 19, - ens_id="ens_id", - real_id=0, - step_id=0, - experiment_id="experiment_id", - ) - ) - reporter.report(Exited(job1, 1).with_error("massive_failure")) - reporter.report(Finish()) - - assert len(lines) == 1 - event = DispatcherMessage() - event.ParseFromString(lines[0]) - assert event.WhichOneof("object") == "job" - # pylint: disable=no-member - assert event.job.status == JOB_FAILURE - assert event.job.error == "massive_failure" - assert event.job.exit_code == 1 - - -def test_report_with_running_message_argument(unused_tcp_port): - host = "localhost" - url = f"ws://{host}:{unused_tcp_port}" - reporter = Protobuf(experiment_url=url) - job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0) - - lines = [] - with _mock_ws_thread(host, unused_tcp_port, lines): - reporter.report( - Init( - [job1], - 1, - 19, - ens_id="ens_id", - real_id=0, - step_id=0, - experiment_id="experiment_id", - ) - ) - reporter.report(Running(job1, 100, 10)) - reporter.report(Finish()) - - assert len(lines) == 1 - event = DispatcherMessage() - event.ParseFromString(lines[0]) - assert event.WhichOneof("object") == "job" - # pylint: disable=no-member - assert event.job.status == JOB_RUNNING - assert event.job.max_memory == 100 - assert event.job.current_memory == 10 - - -def test_report_only_job_running_for_successful_run(unused_tcp_port): - host = "localhost" - url = f"ws://{host}:{unused_tcp_port}" - reporter = Protobuf(experiment_url=url) - job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0) - - lines = [] - with _mock_ws_thread(host, unused_tcp_port, lines): - reporter.report( - Init( - [job1], - 1, - 19, - ens_id="ens_id", - real_id=0, - step_id=0, - experiment_id="experiment_id", - ) - ) - reporter.report(Running(job1, 100, 10)) - reporter.report(Finish()) - - assert len(lines) == 1 - - -def test_report_with_failed_finish_message_argument(unused_tcp_port): - host = "localhost" - url = f"ws://{host}:{unused_tcp_port}" - reporter = Protobuf(experiment_url=url) - job1 = Job({"name": "job1", "stdout": "stdout", "stderr": "stderr"}, 0) - - lines = [] - with _mock_ws_thread(host, unused_tcp_port, lines): - reporter.report( - Init( - [job1], - 1, - 19, - ens_id="ens_id", - real_id=0, - step_id=0, - experiment_id="experiment_id", - ) - ) - reporter.report(Running(job1, 100, 10)) - reporter.report(Finish().with_error("massive_failure")) - - assert len(lines) == 1 - - -def test_report_inconsistent_events(unused_tcp_port): - host = "localhost" - url = f"ws://{host}:{unused_tcp_port}" - reporter = Protobuf(experiment_url=url) - - lines = [] - with _mock_ws_thread(host, unused_tcp_port, lines), pytest.raises( - TransitionError, - match=r"Illegal transition None -> \(MessageType,\)", - ): - reporter.report(Finish())