Skip to content

Commit

Permalink
Add client side for experiment-server
Browse files Browse the repository at this point in the history
  • Loading branch information
sondreso committed Sep 5, 2024
1 parent 2d21583 commit 7d2c786
Show file tree
Hide file tree
Showing 7 changed files with 126 additions and 47 deletions.
4 changes: 3 additions & 1 deletion src/ert/ensemble_evaluator/event.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from dataclasses import dataclass
from typing import Dict, Optional
from typing import Dict, Literal, Optional

from .snapshot import Snapshot

Expand All @@ -17,11 +17,13 @@ class _UpdateEvent:

@dataclass
class FullSnapshotEvent(_UpdateEvent):
event_type: Literal['FullSnapshotEvent']
snapshot: Optional[Snapshot] = None


@dataclass
class SnapshotUpdateEvent(_UpdateEvent):
event_type: Literal['SnapshotUpdateEvent']
snapshot: Optional[Snapshot] = None


Expand Down
2 changes: 1 addition & 1 deletion src/ert/gui/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ def _setup_main_window(
SummaryPanel(config),
area=Qt.DockWidgetArea.BottomDockWidgetArea,
)
window.addTool(PlotTool(config_file, window))
window.addTool(PlotTool(window))
window.addTool(ExportTool(config, window.notifier))
window.addTool(WorkflowsTool(config, window.notifier))
window.addTool(
Expand Down
89 changes: 89 additions & 0 deletions src/ert/gui/simulation/event_fetcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
from __future__ import annotations

import logging
from contextlib import suppress
from queue import Empty
from time import sleep
from typing import Optional

from qtpy.QtCore import QObject, Signal, Slot

from ert.ensemble_evaluator import (
EndEvent,
FullSnapshotEvent,
SnapshotUpdateEvent,
Snapshot
)
from ert.gui.model.snapshot import SnapshotModel
from ert.run_models import StatusEvents

from websockets.sync.client import connect

import json

logger = logging.getLogger(__name__)

from pydantic import BaseModel, ConfigDict

class EventWrapper(BaseModel):
model_config = ConfigDict(arbitrary_types_allowed=True)
event: StatusEvents

class EventFetcher(QObject):
"""A worker that emits items put on a queue to qt subscribers."""

new_event = Signal(object)
done = Signal()

def __init__(
self,
experiment_id: str,
parent: Optional[QObject] = None,
):
super().__init__(parent)
logger.debug("init EventFetcher")
self._experiment_id = experiment_id
self._stopped = False

@Slot()
def consume_and_emit(self) -> None:
logger.debug("tracking...")
with connect(f"ws://localhost:8000/experiments/{self._experiment_id}/events") as websocket:
while True:
try:
message = websocket.recv(timeout=1.0)
except TimeoutError:
message = None
if self._stopped:
logger.debug("stopped")
break
if message is None:
sleep(0.1)
continue

event_dict = json.loads(message)
if "snapshot" in event_dict:
event_dict["snapshot"] = Snapshot.from_nested_dict(event_dict["snapshot"])
event_wrapper = EventWrapper(**event_dict)
event = event_wrapper.event
# pre-rendering in this thread to avoid work in main rendering thread
if (
isinstance(event, (FullSnapshotEvent, SnapshotUpdateEvent))
and event.snapshot
):
SnapshotModel.prerender(event.snapshot)

logger.debug(f"emit {event}")
self.new_event.emit(event)

if isinstance(event, EndEvent):
logger.debug("got end event")
break

self.done.emit()
logger.debug("tracking done.")

@Slot()
def stop(self) -> None:
logger.debug("stopping...")
self._stopped = True
18 changes: 9 additions & 9 deletions src/ert/gui/simulation/experiment_panel.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,13 +191,12 @@ def getExperimentName(self) -> str:
def run_experiment(self) -> None:
args = self.get_experiment_arguments()
QApplication.setOverrideCursor(Qt.CursorShape.WaitCursor)
event_queue: SimpleQueue[StatusEvents] = SimpleQueue()
try:
model = create_model(
self.config,
self._notifier.storage,
args,
event_queue,
SimpleQueue(),
)

except ValueError as e:
Expand Down Expand Up @@ -262,22 +261,23 @@ def run_experiment(self) -> None:
return
QApplication.restoreOverrideCursor()


# TODO: Insert experiment ID
experiment_id = "test"
dialog = RunDialog(
self._config_file,
model,
event_queue,
experiment_id,
self._notifier,
self.parent(), # type: ignore
output_path=self.config.analysis_config.log_path,
)
self.run_button.setEnabled(False)
self.run_button.setText(EXPERIMENT_IS_RUNNING_BUTTON_MESSAGE)
# self.run_button.setEnabled(False)
# self.run_button.setText(EXPERIMENT_IS_RUNNING_BUTTON_MESSAGE)
dialog.run_experiment()
dialog.show()

def exit_handler() -> None:
self.run_button.setText(EXPERIMENT_READY_TO_RUN_BUTTON_MESSAGE)
self.run_button.setEnabled(True)
# self.run_button.setText(EXPERIMENT_READY_TO_RUN_BUTTON_MESSAGE)
# self.run_button.setEnabled(True)
self.toggleExperimentType()
self._notifier.emitErtChange()

Expand Down
51 changes: 20 additions & 31 deletions src/ert/gui/simulation/run_dialog.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import logging
from pathlib import Path
from queue import SimpleQueue
from typing import Optional

from qtpy.QtCore import QModelIndex, QSize, Qt, QThread, QTimer, Signal, Slot
Expand Down Expand Up @@ -55,7 +54,6 @@
from ert.gui.tools.file import FileDialog
from ert.gui.tools.plot.plot_tool import PlotTool
from ert.run_models import (
BaseRunModel,
RunModelStatusEvent,
RunModelTimeEvent,
RunModelUpdateBeginEvent,
Expand All @@ -70,7 +68,7 @@
)

from ..find_ert_info import find_ert_info
from .queue_emitter import QueueEmitter
from .event_fetcher import EventFetcher
from .view import ProgressWidget, RealizationWidget, UpdateWidget

_TOTAL_PROGRESS_TEMPLATE = "Total progress {total_progress}% — {iteration_label}"
Expand Down Expand Up @@ -174,23 +172,23 @@ class RunDialog(QDialog):

def __init__(
self,
config_file: str,
run_model: BaseRunModel,
event_queue: SimpleQueue[StatusEvents],
experiment_id: str,
# config_file: str,
# run_model: BaseRunModel,
# event_queue: SimpleQueue[StatusEvents],
notifier: ErtNotifier,
parent: Optional[QWidget] = None,
output_path: Optional[Path] = None,
):
QDialog.__init__(self, parent)
self._experiment_id = experiment_id
self.output_path = output_path
self.setAttribute(Qt.WidgetAttribute.WA_DeleteOnClose)
self.setWindowFlags(Qt.WindowType.Window)
self.setWindowFlags(self.windowFlags() & ~Qt.WindowContextHelpButtonHint) # type: ignore
self.setWindowTitle(f"Experiment - {config_file} {find_ert_info()}")
self.setWindowTitle(f"Experiment - {self._experiment_id} {find_ert_info()}")

self._snapshot_model = SnapshotModel(self)
self._run_model = run_model
self._event_queue = event_queue
self._notifier = notifier

self._minimum_width = 1200
Expand Down Expand Up @@ -224,7 +222,7 @@ def __init__(
self.running_time = QLabel("")
self.memory_usage = QLabel("")

self.plot_tool = PlotTool(config_file, self.parent()) # type: ignore
self.plot_tool = PlotTool(self.parent()) # type: ignore
self.plot_button = QPushButton(self.plot_tool.getName())
self.plot_button.clicked.connect(self.plot_tool.trigger)
self.plot_button.setEnabled(True)
Expand Down Expand Up @@ -345,25 +343,10 @@ def run_experiment(self, restart: bool = False) -> None:
self._snapshot_model.reset()
self._tab_widget.clear()

port_range = None
if self._run_model.queue_system == QueueSystem.LOCAL:
port_range = range(49152, 51819)
evaluator_server_config = EvaluatorServerConfig(custom_port_range=port_range)

def run() -> None:
self._run_model.start_simulations_thread(
evaluator_server_config=evaluator_server_config,
restart=restart,
)

simulation_thread = ErtThread(
name="ert_gui_simulation_thread", target=run, daemon=True
)

self._worker_thread = QThread(parent=self)
self.destroyed.connect(lambda: _stop_worker(self))

self._worker = QueueEmitter(self._event_queue)
self._worker = EventFetcher(experiment_id=self._experiment_id)
self._worker.done.connect(self._worker_thread.quit)
self._worker.new_event.connect(self._on_event)
self._worker.moveToThread(self._worker_thread)
Expand All @@ -374,7 +357,6 @@ def run() -> None:
self._ticker.start(self._RUN_TIME_POLL_RATE)

self._worker_thread.start()
simulation_thread.start()
self._notifier.set_is_simulation_running(True)

def killJobs(self) -> QMessageBox.StandardButton:
Expand All @@ -386,7 +368,8 @@ def killJobs(self) -> QMessageBox.StandardButton:
if kill_job == QMessageBox.Yes:
# Normally this slot would be invoked by the signal/slot system,
# but the worker is busy tracking the evaluation.
self._run_model.cancel()
# self._run_model.cancel()
## TODO: Fix cancellation
self._on_finished()
self.finished.emit(-1)
return kill_job
Expand All @@ -395,8 +378,12 @@ def killJobs(self) -> QMessageBox.StandardButton:
def _on_simulation_done(self, failed: bool, msg: str) -> None:
self.processing_animation.hide()
self.kill_button.setHidden(True)
self.restart_button.setVisible(self._run_model.has_failed_realizations())
self.restart_button.setEnabled(self._run_model.support_restart)
self.restart_button.setVisible(False)
self.restart_button.setEnabled(False)
# self.restart_button.setVisible(self._run_model.has_failed_realizations())
# self.restart_button.setEnabled(self._run_model.support_restart)
# TODO: Restart

self._notifier.set_is_simulation_running(False)
if failed:
self.update_total_progress(1.0, "Failed")
Expand All @@ -407,7 +394,9 @@ def _on_simulation_done(self, failed: bool, msg: str) -> None:

@Slot()
def _on_ticker(self) -> None:
runtime = self._run_model.get_runtime()
runtime = 0
# runtime = self._run_model.get_runtime()
# TODO: This should be based event timestamps
self.running_time.setText(format_running_time(runtime))

maximum_memory_usage = self._snapshot_model.root.max_memory_usage
Expand Down
5 changes: 2 additions & 3 deletions src/ert/gui/tools/plot/plot_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,10 @@


class PlotTool(Tool):
def __init__(self, config_file: str, main_window: Optional[QWidget]):
def __init__(self, main_window: Optional[QWidget]):
super().__init__("Create plot", QIcon("img:timeline.svg"))
self._config_file = config_file
self.main_window = main_window

def trigger(self) -> None:
plot_window = PlotWindow(self._config_file, self.main_window)
plot_window = PlotWindow(self.main_window)
plot_window.show()
4 changes: 2 additions & 2 deletions src/ert/gui/tools/plot/plot_window.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,14 @@ def open_error_dialog(title: str, content: str) -> None:


class PlotWindow(QMainWindow):
def __init__(self, config_file: str, parent: Optional[QWidget]):
def __init__(self, parent: Optional[QWidget]):
QMainWindow.__init__(self, parent)
t = time.perf_counter()

logger.info("PlotWindow __init__")
self.setMinimumWidth(850)
self.setMinimumHeight(650)
self.setWindowTitle(f"Plotting - {config_file}")
self.setWindowTitle(f"Plotting")
self.activateWindow()
self._preferred_ensemble_x_axis_format = PlotContext.INDEX_AXIS
QApplication.setOverrideCursor(Qt.CursorShape.WaitCursor)
Expand Down

0 comments on commit 7d2c786

Please sign in to comment.