Skip to content

Commit

Permalink
Remove tracker and tracker_worker (TODO cancel)
Browse files Browse the repository at this point in the history
  • Loading branch information
sondreso committed Dec 15, 2023
1 parent 25bd439 commit b6225c8
Show file tree
Hide file tree
Showing 8 changed files with 194 additions and 358 deletions.
15 changes: 7 additions & 8 deletions src/ert/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import os
import sys
import threading
import time
from typing import Any, TextIO

from ert.cli import (
Expand All @@ -19,7 +20,7 @@
from ert.cli.workflow import execute_workflow
from ert.config import ErtConfig, QueueSystem
from ert.enkf_main import EnKFMain
from ert.ensemble_evaluator import EvaluatorServerConfig, EvaluatorTracker
from ert.ensemble_evaluator import EvaluatorServerConfig
from ert.namespace import Namespace
from ert.storage import open_storage
from ert.storage.local_storage import local_storage_set_ert_config
Expand Down Expand Up @@ -117,10 +118,6 @@ def run_cli(args: Namespace, _: Any = None) -> None:
)
thread.start()

tracker = EvaluatorTracker(
model, ee_con_info=evaluator_server_config.get_connection_info()
)

with contextlib.ExitStack() as exit_stack:
out: TextIO
if args.disable_monitoring:
Expand All @@ -130,12 +127,14 @@ def run_cli(args: Namespace, _: Any = None) -> None:
else:
out = sys.stderr
monitor = Monitor(out=out, color_always=args.color_always)

monitor.start()
model.add_send_event_callback(monitor.on_event)
try:
monitor.monitor(tracker.track())
while not monitor.done:
time.sleep(0.5)
except (SystemExit, KeyboardInterrupt):
print("\nKilling simulations...")
tracker.request_termination()
# tracker.request_termination()

thread.join()
storage.close()
Expand Down
36 changes: 19 additions & 17 deletions src/ert/cli/monitor.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
import sys
from datetime import datetime, timedelta
from typing import Dict, Iterator, Optional, TextIO, Tuple, Union
from typing import Dict, Optional, TextIO, Tuple, Union

from tqdm import tqdm

Expand Down Expand Up @@ -57,25 +57,27 @@ def __init__(self, out: TextIO = sys.stdout, color_always: bool = False) -> None

# The dot adds no value without color, so remove it.
self.dot = ""
self.done = False

def monitor(
def start(self) -> None:
self._start_time = datetime.now()

def on_event(
self,
events: Iterator[Union[FullSnapshotEvent, SnapshotUpdateEvent, EndEvent]],
event: Union[FullSnapshotEvent, SnapshotUpdateEvent, EndEvent],
) -> None:
self._start_time = datetime.now()
for event in events:
if isinstance(event, FullSnapshotEvent):
if event.snapshot is not None:
self._snapshots[event.iteration] = event.snapshot
self._progress = event.progress
elif isinstance(event, SnapshotUpdateEvent):
if event.partial_snapshot is not None:
self._snapshots[event.iteration].merge_event(event.partial_snapshot)
self._print_progress(event)
if isinstance(event, EndEvent):
self._print_result(event.failed, event.failed_msg)
self._print_job_errors()
return
if isinstance(event, FullSnapshotEvent):
if event.snapshot is not None:
self._snapshots[event.iteration] = event.snapshot
self._progress = event.progress
elif isinstance(event, SnapshotUpdateEvent):
if event.partial_snapshot is not None:
self._snapshots[event.iteration].merge_event(event.partial_snapshot)
self._print_progress(event)
if isinstance(event, EndEvent):
self._print_result(event.failed, event.failed_msg)
self._print_job_errors()
self.done = True

def _print_job_errors(self) -> None:
failed_jobs: Dict[Optional[str], int] = {}
Expand Down
2 changes: 0 additions & 2 deletions src/ert/ensemble_evaluator/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
)
from .config import EvaluatorServerConfig
from .evaluator import EnsembleEvaluator
from .evaluator_tracker import EvaluatorTracker
from .event import EndEvent, FullSnapshotEvent, SnapshotUpdateEvent
from .monitor import Monitor
from .snapshot import PartialSnapshot, Snapshot
Expand All @@ -18,7 +17,6 @@
"EnsembleBuilder",
"EnsembleEvaluator",
"EvaluatorServerConfig",
"EvaluatorTracker",
"ForwardModel",
"FullSnapshotEvent",
"Monitor",
Expand Down
9 changes: 5 additions & 4 deletions src/ert/ensemble_evaluator/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -389,9 +389,10 @@ def _run_server(self, loop: asyncio.AbstractEventLoop) -> None:
loop.run_until_complete(self.evaluator_server())
logger.debug("Server thread exiting.")

def _start_running(self) -> None:
def start_running(self) -> None:
self._ws_thread.start()
self._ensemble.evaluate(self._config)
logger.debug("Started evaluator, joining until shutdown")

def _stop(self) -> None:
if not self._done.done():
Expand All @@ -417,11 +418,11 @@ def _signal_cancel(self) -> None:
logger.debug("Stopping current ensemble")
self._loop.call_soon_threadsafe(self._stop)

def run_and_get_successful_realizations(self) -> List[int]:
self._start_running()
logger.debug("Started evaluator, joining until shutdown")
def join(self) -> List[int]:
self._ws_thread.join()
logger.debug("Evaluator is done")

def get_successful_realizations(self) -> List[int]:
return self._ensemble.get_successful_realizations()

@staticmethod
Expand Down
240 changes: 0 additions & 240 deletions src/ert/ensemble_evaluator/evaluator_tracker.py

This file was deleted.

Loading

0 comments on commit b6225c8

Please sign in to comment.