From e7ee7378921bba2bd0918bbbbd64e4734d00ad18 Mon Sep 17 00:00:00 2001 From: xjules Date: Wed, 20 Nov 2024 15:57:18 +0100 Subject: [PATCH] Fix test event reporter --- src/_ert/forward_model_runner/client.py | 1 + .../forward_model_runner/reporting/event.py | 30 +-- .../test_event_reporter.py | 243 +++++++++--------- tests/ert/utils.py | 40 +-- 4 files changed, 147 insertions(+), 167 deletions(-) diff --git a/src/_ert/forward_model_runner/client.py b/src/_ert/forward_model_runner/client.py index 03d78872044..fed6293d007 100644 --- a/src/_ert/forward_model_runner/client.py +++ b/src/_ert/forward_model_runner/client.py @@ -33,6 +33,7 @@ def __enter__(self) -> Self: def __exit__(self, exc_type: Any, exc_value: Any, exc_traceback: Any) -> None: self.socket.close() self.context.term() + self.loop.close() async def __aenter__(self) -> Self: return self diff --git a/src/_ert/forward_model_runner/reporting/event.py b/src/_ert/forward_model_runner/reporting/event.py index 11ef56374b6..f30621e8081 100644 --- a/src/_ert/forward_model_runner/reporting/event.py +++ b/src/_ert/forward_model_runner/reporting/event.py @@ -2,9 +2,7 @@ import logging import queue -import threading import time -from datetime import datetime, timedelta from pathlib import Path from typing import Final, Union @@ -19,6 +17,7 @@ ) from _ert.forward_model_runner.client import ( Client, + ClientConnectionError, ) from _ert.forward_model_runner.reporting.base import Reporter from _ert.forward_model_runner.reporting.message import ( @@ -77,10 +76,7 @@ def __init__(self, evaluator_url, token=None, cert_path=None): self._real_id = None self._event_queue: queue.Queue[events.Event | EventSentinel] = queue.Queue() self._event_publisher_thread = ErtThread(target=self._event_publisher) - self._timeout_timestamp = None - self._timestamp_lock = threading.Lock() - # seconds to timeout the reporter the thread after Finish() was received - self._reporter_timeout = 60 + self._done = False def _event_publisher(self): logger.debug("Publishing event.") @@ -91,21 +87,12 @@ def _event_publisher(self): ) as client: events = [] last_sent_time = time.time() - while True: - with self._timestamp_lock: - if ( - self._timeout_timestamp is not None - and datetime.now() > self._timeout_timestamp - ): - self._timeout_timestamp = None - break - + while not self._done: try: event = self._event_queue.get() - logger.debug(f"Got event for zmq: {event}") if event is self._sentinel: + self._done = True if events: - logger.debug(f"Got event class for zmq: {events}") client.send(events) events.clear() break @@ -114,12 +101,13 @@ def _event_publisher(self): current_time = time.time() if current_time - last_sent_time >= 2: if events: - logger.debug(f"Got event class for zmq: {events}") client.send(events) events.clear() last_sent_time = current_time - except Exception as e: + except ClientConnectionError as e: logger.error(f"Failed to send event: {e}") + except Exception as e: + logger.error(f"Error while sending event: {e}") raise def report(self, msg): @@ -184,10 +172,6 @@ def _job_handler(self, msg: Union[Start, Running, Exited]): def _finished_handler(self, _): self._event_queue.put(Event._sentinel) - with self._timestamp_lock: - self._timeout_timestamp = datetime.now() + timedelta( - seconds=self._reporter_timeout - ) if self._event_publisher_thread.is_alive(): self._event_publisher_thread.join() diff --git a/tests/ert/unit_tests/forward_model_runner/test_event_reporter.py b/tests/ert/unit_tests/forward_model_runner/test_event_reporter.py index 0575e78b954..6c9be29b0bb 100644 --- a/tests/ert/unit_tests/forward_model_runner/test_event_reporter.py +++ b/tests/ert/unit_tests/forward_model_runner/test_event_reporter.py @@ -1,5 +1,4 @@ import os -import sys import time from unittest.mock import patch @@ -13,7 +12,6 @@ event_from_json, ) from _ert.forward_model_runner.client import ( - ClientConnectionClosedOK, ClientConnectionError, ) from _ert.forward_model_runner.forward_model_step import ForwardModelStep @@ -27,7 +25,7 @@ Start, ) from _ert.forward_model_runner.reporting.statemachine import TransitionError -from tests.ert.utils import _mock_ws_thread +from tests.ert.utils import mock_zmq_thread def _wait_until(condition, timeout, fail_msg): @@ -39,13 +37,13 @@ def _wait_until(condition, timeout, fail_msg): def test_report_with_successful_start_message_argument(unused_tcp_port): host = "localhost" - url = f"ws://{host}:{unused_tcp_port}" + url = f"tcp://{host}:{unused_tcp_port}" reporter = Event(evaluator_url=url) fmstep1 = ForwardModelStep( {"name": "fmstep1", "stdout": "stdout", "stderr": "stderr"}, 0 ) lines = [] - with _mock_ws_thread(host, unused_tcp_port, lines): + with mock_zmq_thread(host, unused_tcp_port, lines): reporter.report(Init([fmstep1], 1, 19, ens_id="ens_id", real_id=0)) reporter.report(Start(fmstep1)) reporter.report(Finish()) @@ -62,7 +60,7 @@ def test_report_with_successful_start_message_argument(unused_tcp_port): def test_report_with_failed_start_message_argument(unused_tcp_port): host = "localhost" - url = f"ws://{host}:{unused_tcp_port}" + url = f"tcp://{host}:{unused_tcp_port}" reporter = Event(evaluator_url=url) fmstep1 = ForwardModelStep( @@ -70,7 +68,7 @@ def test_report_with_failed_start_message_argument(unused_tcp_port): ) lines = [] - with _mock_ws_thread(host, unused_tcp_port, lines): + with mock_zmq_thread(host, unused_tcp_port, lines): reporter.report(Init([fmstep1], 1, 19, ens_id="ens_id", real_id=0)) msg = Start(fmstep1).with_error("massive_failure") @@ -86,14 +84,14 @@ def test_report_with_failed_start_message_argument(unused_tcp_port): def test_report_with_successful_exit_message_argument(unused_tcp_port): host = "localhost" - url = f"ws://{host}:{unused_tcp_port}" + url = f"tcp://{host}:{unused_tcp_port}" reporter = Event(evaluator_url=url) fmstep1 = ForwardModelStep( {"name": "fmstep1", "stdout": "stdout", "stderr": "stderr"}, 0 ) lines = [] - with _mock_ws_thread(host, unused_tcp_port, lines): + with mock_zmq_thread(host, unused_tcp_port, lines): reporter.report(Init([fmstep1], 1, 19, ens_id="ens_id", real_id=0)) reporter.report(Exited(fmstep1, 0)) reporter.report(Finish().with_error("failed")) @@ -105,14 +103,14 @@ def test_report_with_successful_exit_message_argument(unused_tcp_port): def test_report_with_failed_exit_message_argument(unused_tcp_port): host = "localhost" - url = f"ws://{host}:{unused_tcp_port}" + url = f"tcp://{host}:{unused_tcp_port}" reporter = Event(evaluator_url=url) fmstep1 = ForwardModelStep( {"name": "fmstep1", "stdout": "stdout", "stderr": "stderr"}, 0 ) lines = [] - with _mock_ws_thread(host, unused_tcp_port, lines): + with mock_zmq_thread(host, unused_tcp_port, lines): reporter.report(Init([fmstep1], 1, 19, ens_id="ens_id", real_id=0)) reporter.report(Exited(fmstep1, 1).with_error("massive_failure")) reporter.report(Finish()) @@ -125,14 +123,14 @@ def test_report_with_failed_exit_message_argument(unused_tcp_port): def test_report_with_running_message_argument(unused_tcp_port): host = "localhost" - url = f"ws://{host}:{unused_tcp_port}" + url = f"tcp://{host}:{unused_tcp_port}" reporter = Event(evaluator_url=url) fmstep1 = ForwardModelStep( {"name": "fmstep1", "stdout": "stdout", "stderr": "stderr"}, 0 ) lines = [] - with _mock_ws_thread(host, unused_tcp_port, lines): + with mock_zmq_thread(host, unused_tcp_port, lines): reporter.report(Init([fmstep1], 1, 19, ens_id="ens_id", real_id=0)) reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=100, rss=10))) reporter.report(Finish()) @@ -146,14 +144,14 @@ def test_report_with_running_message_argument(unused_tcp_port): def test_report_only_job_running_for_successful_run(unused_tcp_port): host = "localhost" - url = f"ws://{host}:{unused_tcp_port}" + url = f"tcp://{host}:{unused_tcp_port}" reporter = Event(evaluator_url=url) fmstep1 = ForwardModelStep( {"name": "fmstep1", "stdout": "stdout", "stderr": "stderr"}, 0 ) lines = [] - with _mock_ws_thread(host, unused_tcp_port, lines): + with mock_zmq_thread(host, unused_tcp_port, lines): reporter.report(Init([fmstep1], 1, 19, ens_id="ens_id", real_id=0)) reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=100, rss=10))) reporter.report(Finish()) @@ -163,14 +161,14 @@ def test_report_only_job_running_for_successful_run(unused_tcp_port): def test_report_with_failed_finish_message_argument(unused_tcp_port): host = "localhost" - url = f"ws://{host}:{unused_tcp_port}" + url = f"tcp://{host}:{unused_tcp_port}" reporter = Event(evaluator_url=url) fmstep1 = ForwardModelStep( {"name": "fmstep1", "stdout": "stdout", "stderr": "stderr"}, 0 ) lines = [] - with _mock_ws_thread(host, unused_tcp_port, lines): + with mock_zmq_thread(host, unused_tcp_port, lines): reporter.report(Init([fmstep1], 1, 19, ens_id="ens_id", real_id=0)) reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=100, rss=10))) reporter.report(Finish().with_error("massive_failure")) @@ -180,12 +178,12 @@ def test_report_with_failed_finish_message_argument(unused_tcp_port): def test_report_inconsistent_events(unused_tcp_port): host = "localhost" - url = f"ws://{host}:{unused_tcp_port}" + url = f"tcp://{host}:{unused_tcp_port}" reporter = Event(evaluator_url=url) lines = [] with ( - _mock_ws_thread(host, unused_tcp_port, lines), + mock_zmq_thread(host, unused_tcp_port, lines), pytest.raises( TransitionError, match=r"Illegal transition None -> \(MessageType,\)", @@ -209,14 +207,14 @@ def mock_send(msg): raise ClientConnectionError("Sending failed!") host = "localhost" - url = f"ws://{host}:{unused_tcp_port}" + url = f"tcp://{host}:{unused_tcp_port}" reporter = Event(evaluator_url=url) reporter._reporter_timeout = 4 fmstep1 = ForwardModelStep( {"name": "fmstep1", "stdout": "stdout", "stderr": "stderr"}, 0 ) lines = [] - with _mock_ws_thread(host, unused_tcp_port, lines): + with mock_zmq_thread(host, unused_tcp_port, lines): with patch( "_ert.forward_model_runner.client.Client.send", lambda x, y: mock_send(y) ): @@ -224,112 +222,107 @@ def mock_send(msg): reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=100, rss=10))) reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=1100, rss=10))) reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=1100, rss=10))) - # set _stop_timestamp reporter.report(Finish()) if reporter._event_publisher_thread.is_alive(): reporter._event_publisher_thread.join() - # set _stop_timestamp to None only when timer stopped - assert reporter._timeout_timestamp is None + assert reporter._done assert len(lines) == 0, "expected 0 Job running messages" - -@pytest.mark.integration_test -@pytest.mark.flaky(reruns=5) -@pytest.mark.skipif( - sys.platform.startswith("darwin"), reason="Performance can be flaky" -) -def test_report_with_reconnected_reporter_but_finished_jobs(unused_tcp_port): - # this is to show when the reporter fails but reconnects - # reporter still manages to send events and completes fine - # see assert reporter._timeout_timestamp is not None - # meaning Finish event initiated _timeout but timeout wasn't reached since - # it finished succesfully - mock_send_retry_time = 0.1 - - def send_func(msg): - time.sleep(mock_send_retry_time) - raise ClientConnectionError("Sending failed!") - - host = "localhost" - url = f"ws://{host}:{unused_tcp_port}" - reporter = Event(evaluator_url=url) - fmstep1 = ForwardModelStep( - {"name": "fmstep1", "stdout": "stdout", "stderr": "stderr"}, 0 - ) - lines = [] - with _mock_ws_thread(host, unused_tcp_port, lines): - with patch("_ert.forward_model_runner.client.Client.send") as patched_send: - patched_send.side_effect = send_func - - reporter.report(Init([fmstep1], 1, 19, ens_id="ens_id", real_id=0)) - reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=100, rss=10))) - reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=200, rss=10))) - reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=300, rss=10))) - - _wait_until( - condition=lambda: patched_send.call_count == 3, - timeout=10, - fail_msg="10 seconds should be sufficient to send three events", - ) - - # reconnect and continue sending events - # set _stop_timestamp - reporter.report(Finish()) - if reporter._event_publisher_thread.is_alive(): - reporter._event_publisher_thread.join() - # set _stop_timestamp was not set to None since the reporter finished on time - assert reporter._timeout_timestamp is not None - assert len(lines) == 3, "expected 3 Job running messages" - - -@pytest.mark.integration_test -def test_report_with_closed_received_exiting_gracefully(unused_tcp_port): - # Whenever the receiver end closes the connection, a ConnectionClosedOK is raised - # The reporter should exit the publisher thread gracefully and not send any - # more events - mock_send_retry_time = 3 - - def mock_send(msg): - time.sleep(mock_send_retry_time) - raise ClientConnectionClosedOK("Connection Closed") - - host = "localhost" - url = f"ws://{host}:{unused_tcp_port}" - reporter = Event(evaluator_url=url) - fmstep1 = ForwardModelStep( - {"name": "fmstep1", "stdout": "stdout", "stderr": "stderr"}, 0 - ) - lines = [] - with _mock_ws_thread(host, unused_tcp_port, lines): - reporter.report(Init([fmstep1], 1, 19, ens_id="ens_id", real_id=0)) - reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=100, rss=10))) - reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=200, rss=10))) - - # sleep until both Running events have been received - _wait_until( - condition=lambda: len(lines) == 2, - timeout=10, - fail_msg="Should not take 10 seconds to send two events", - ) - - with patch( - "_ert.forward_model_runner.client.Client.send", lambda x, y: mock_send(y) - ): - reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=300, rss=10))) - # Make sure the publisher thread exits because it got - # ClientConnectionClosedOK. If it hangs it could indicate that the - # exception is not caught/handled correctly - if reporter._event_publisher_thread.is_alive(): - reporter._event_publisher_thread.join() - - reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=400, rss=10))) - reporter.report(Finish()) - - # set _stop_timestamp was not set to None since the reporter finished on time - assert reporter._timeout_timestamp is not None - - # The Running(fmstep1, 300, 10) is popped from the queue, but never sent. - # The following Running is added to queue along with the sentinel - assert reporter._event_queue.qsize() == 2 - # None of the messages after ClientConnectionClosedOK was raised, has been sent - assert len(lines) == 2, "expected 2 Job running messages" + # TODO refactor or remove, zmq handles reconnection automatically + # @pytest.mark.integration_test + # @pytest.mark.flaky(reruns=5) + # @pytest.mark.skipif( + # sys.platform.startswith("darwin"), reason="Performance can be flaky" + # ) + # def test_report_with_reconnected_reporter_but_finished_jobs(unused_tcp_port): + # # this is to show when the reporter fails but reconnects + # # reporter still manages to send events and completes fine + # # see assert reporter._timeout_timestamp is not None + # # meaning Finish event initiated _timeout but timeout wasn't reached since + # # it finished succesfully + # mock_send_retry_time = 0.1 + + # def send_func(msg): + # time.sleep(mock_send_retry_time) + # raise ClientConnectionError("Sending failed!") + + # host = "localhost" + # url = f"tcp://{host}:{unused_tcp_port}" + # reporter = Event(evaluator_url=url) + # fmstep1 = ForwardModelStep( + # {"name": "fmstep1", "stdout": "stdout", "stderr": "stderr"}, 0 + # ) + # lines = [] + # with mock_zmq_thread(host, unused_tcp_port, lines): + # with patch("_ert.forward_model_runner.client.Client.send") as patched_send: + # patched_send.side_effect = send_func + + # reporter.report(Init([fmstep1], 1, 19, ens_id="ens_id", real_id=0)) + # reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=100, rss=10))) + # reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=200, rss=10))) + # reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=300, rss=10))) + + # _wait_until( + # condition=lambda: patched_send.call_count >= 1, + # timeout=10, + # fail_msg="10 seconds should be sufficient to send three events", + # ) + + # # reconnect and continue sending events + # reporter.report(Finish()) + # if reporter._event_publisher_thread.is_alive(): + # reporter._event_publisher_thread.join() + # assert reporter._done + # assert len(lines) == 3, "expected 3 Job running messages" + + # @pytest.mark.integration_test + # def test_report_with_closed_received_exiting_gracefully(unused_tcp_port): + # # Whenever the receiver end closes the connection, a ConnectionClosedOK is raised + # # The reporter should exit the publisher thread gracefully and not send any + # # more events + # mock_send_retry_time = 3 + + # def mock_send(msg): + # time.sleep(mock_send_retry_time) + # raise ClientConnectionClosedOK("Connection Closed") + + # host = "localhost" + # url = f"tcp://{host}:{unused_tcp_port}" + # reporter = Event(evaluator_url=url) + # fmstep1 = ForwardModelStep( + # {"name": "fmstep1", "stdout": "stdout", "stderr": "stderr"}, 0 + # ) + # lines = [] + # with mock_zmq_thread(host, unused_tcp_port, lines): + # reporter.report(Init([fmstep1], 1, 19, ens_id="ens_id", real_id=0)) + # reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=100, rss=10))) + # reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=200, rss=10))) + + # # sleep until both Running events have been received + # _wait_until( + # condition=lambda: len(lines) == 2, + # timeout=10, + # fail_msg="Should not take 10 seconds to send two events", + # ) + + # with patch( + # "_ert.forward_model_runner.client.Client.send", lambda x, y: mock_send(y) + # ): + # reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=300, rss=10))) + # # Make sure the publisher thread exits because it got + # # ClientConnectionClosedOK. If it hangs it could indicate that the + # # exception is not caught/handled correctly + # if reporter._event_publisher_thread.is_alive(): + # reporter._event_publisher_thread.join() + + # reporter.report(Running(fmstep1, ProcessTreeStatus(max_rss=400, rss=10))) + # reporter.report(Finish()) + + # # set _stop_timestamp was not set to None since the reporter finished on time + # assert reporter._timeout_timestamp is not None + + # # The Running(fmstep1, 300, 10) is popped from the queue, but never sent. + # # The following Running is added to queue along with the sentinel + # assert reporter._event_queue.qsize() == 2 + # # None of the messages after ClientConnectionClosedOK was raised, has been sent + # assert len(lines) == 2, "expected 2 Job running messages" diff --git a/tests/ert/utils.py b/tests/ert/utils.py index 732f816f8cd..f7982b1f01f 100644 --- a/tests/ert/utils.py +++ b/tests/ert/utils.py @@ -7,7 +7,8 @@ from pathlib import Path from typing import TYPE_CHECKING -import websockets.server +import zmq +import zmq.asyncio from _ert.forward_model_runner.client import Client from _ert.threading import ErtThread @@ -61,42 +62,43 @@ def wait_until(func, interval=0.5, timeout=30): ) -def _mock_ws(host, port, messages, delay_startup=0): +def mock_zmq_server(port, messages): loop = asyncio.new_event_loop() - done = loop.create_future() - async def _handler(websocket, path): + async def _handler(router_socket): while True: - msg = await websocket.recv() - messages.append(msg) - if msg == "stop": - done.set_result(None) - break + dealer, __, *frames = await router_socket.recv_multipart() + if dealer.decode("utf-8").startswith("dispatch"): + await router_socket.send_multipart([dealer, b"", b"ACK"]) + for frame in frames: + raw_msg = frame.decode("utf-8") + messages.append(raw_msg) + if raw_msg == "stop": + return async def _run_server(): - await asyncio.sleep(delay_startup) - async with websockets.server.serve(_handler, host, port): - await done + zmq_context = zmq.asyncio.Context() # type: ignore + router_socket = zmq_context.socket(zmq.ROUTER) + router_socket.bind(f"tcp://*:{port}") + await _handler(router_socket) + router_socket.close() loop.run_until_complete(_run_server()) loop.close() @contextlib.contextmanager -def _mock_ws_thread(host, port, messages): +def mock_zmq_thread(host, port, messages): mock_ws_thread = ErtThread( - target=partial(_mock_ws, messages=messages), - args=( - host, - port, - ), + target=partial(mock_zmq_server, messages=messages), + args=(port,), ) mock_ws_thread.start() try: yield # Make sure to join the thread even if an exception occurs finally: - url = f"ws://{host}:{port}" + url = f"tcp://{host}:{port}" with Client(url) as client: client.send("stop") mock_ws_thread.join()