From f6a4e681ce1333de4e39401b395e3b9bd3047186 Mon Sep 17 00:00:00 2001 From: Andrei Neagu <5694077+GitHK@users.noreply.github.com> Date: Wed, 30 Oct 2024 10:43:02 +0100 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9B=20Fixed=20issue=20with=20accumulat?= =?UTF-8?q?ing=20tracked=20services=20(#6631)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Andrei Neagu --- services/autoscaling/requirements/_base.txt | 2 +- services/catalog/requirements/_base.txt | 2 +- .../clusters-keeper/requirements/_base.txt | 2 +- services/dask-sidecar/requirements/_base.txt | 2 +- .../requirements/_dask-distributed.txt | 2 +- services/director-v2/requirements/_base.txt | 2 +- services/director-v2/requirements/_test.txt | 2 +- .../dynamic-scheduler/requirements/_base.in | 1 + .../dynamic-scheduler/requirements/_base.txt | 2 + .../services/service_tracker/_models.py | 107 ++++++++++-------- .../services/status_monitor/_monitor.py | 38 ++++++- .../assets/legacy_tracked_service_model.bin | Bin 0 -> 232 bytes .../unit/service_tracker/test__models.py | 67 ++++++++++- .../test_services_status_monitor__monitor.py | 81 ++++++++++++- .../requirements/_test.txt | 2 +- .../tests/system/requirements/_test.txt | 2 +- services/web/server/requirements/_base.txt | 2 +- 17 files changed, 251 insertions(+), 65 deletions(-) create mode 100644 services/dynamic-scheduler/tests/assets/legacy_tracked_service_model.bin diff --git a/services/autoscaling/requirements/_base.txt b/services/autoscaling/requirements/_base.txt index 835f66c0365..a58d343c4fe 100644 --- a/services/autoscaling/requirements/_base.txt +++ b/services/autoscaling/requirements/_base.txt @@ -240,7 +240,7 @@ markupsafe==2.1.5 # jinja2 mdurl==0.1.2 # via markdown-it-py -msgpack==1.0.8 +msgpack==1.1.0 # via # -c requirements/../../../services/dask-sidecar/requirements/_dask-distributed.txt # distributed diff --git a/services/catalog/requirements/_base.txt b/services/catalog/requirements/_base.txt 
index d78334d994c..a1d4ec9de10 100644 --- a/services/catalog/requirements/_base.txt +++ b/services/catalog/requirements/_base.txt @@ -174,7 +174,7 @@ markupsafe==2.1.5 # mako mdurl==0.1.2 # via markdown-it-py -msgpack==1.0.8 +msgpack==1.1.0 # via aiocache multidict==6.0.5 # via diff --git a/services/clusters-keeper/requirements/_base.txt b/services/clusters-keeper/requirements/_base.txt index 1e3817a64f5..38f29931595 100644 --- a/services/clusters-keeper/requirements/_base.txt +++ b/services/clusters-keeper/requirements/_base.txt @@ -238,7 +238,7 @@ markupsafe==2.1.5 # jinja2 mdurl==0.1.2 # via markdown-it-py -msgpack==1.0.8 +msgpack==1.1.0 # via # -c requirements/../../../services/dask-sidecar/requirements/_dask-distributed.txt # distributed diff --git a/services/dask-sidecar/requirements/_base.txt b/services/dask-sidecar/requirements/_base.txt index 0e7f75213f9..8cbd9e4a15a 100644 --- a/services/dask-sidecar/requirements/_base.txt +++ b/services/dask-sidecar/requirements/_base.txt @@ -170,7 +170,7 @@ markupsafe==2.1.5 # via jinja2 mdurl==0.1.2 # via markdown-it-py -msgpack==1.0.8 +msgpack==1.1.0 # via distributed multidict==6.0.5 # via diff --git a/services/dask-sidecar/requirements/_dask-distributed.txt b/services/dask-sidecar/requirements/_dask-distributed.txt index e9ebbb2a0f5..e1b822b67bb 100644 --- a/services/dask-sidecar/requirements/_dask-distributed.txt +++ b/services/dask-sidecar/requirements/_dask-distributed.txt @@ -46,7 +46,7 @@ markupsafe==2.1.5 # via # -c requirements/./_base.txt # jinja2 -msgpack==1.0.8 +msgpack==1.1.0 # via # -c requirements/./_base.txt # distributed diff --git a/services/director-v2/requirements/_base.txt b/services/director-v2/requirements/_base.txt index 42c06dd93de..7e548a9fdcb 100644 --- a/services/director-v2/requirements/_base.txt +++ b/services/director-v2/requirements/_base.txt @@ -314,7 +314,7 @@ markupsafe==2.1.5 # mako mdurl==0.1.2 # via markdown-it-py -msgpack==1.0.8 +msgpack==1.1.0 # via # -r 
requirements/../../../services/dask-sidecar/requirements/_dask-distributed.txt # aiocache diff --git a/services/director-v2/requirements/_test.txt b/services/director-v2/requirements/_test.txt index 0b0bcda2630..ccfb429b50f 100644 --- a/services/director-v2/requirements/_test.txt +++ b/services/director-v2/requirements/_test.txt @@ -171,7 +171,7 @@ markupsafe==2.1.5 # -c requirements/_base.txt # jinja2 # mako -msgpack==1.0.8 +msgpack==1.1.0 # via # -c requirements/_base.txt # distributed diff --git a/services/dynamic-scheduler/requirements/_base.in b/services/dynamic-scheduler/requirements/_base.in index ab95aec0daa..4ce10d7aa02 100644 --- a/services/dynamic-scheduler/requirements/_base.in +++ b/services/dynamic-scheduler/requirements/_base.in @@ -20,4 +20,5 @@ httpx packaging python-socketio typer[all] +u-msgpack-python uvicorn[standard] diff --git a/services/dynamic-scheduler/requirements/_base.txt b/services/dynamic-scheduler/requirements/_base.txt index 2615f16bd15..30f09ba7c89 100644 --- a/services/dynamic-scheduler/requirements/_base.txt +++ b/services/dynamic-scheduler/requirements/_base.txt @@ -368,6 +368,8 @@ typing-extensions==4.10.0 # opentelemetry-sdk # pydantic # typer +u-msgpack-python==2.8.0 + # via -r requirements/_base.in urllib3==2.2.2 # via # -c requirements/../../../packages/models-library/requirements/../../../requirements/constraints.txt diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/service_tracker/_models.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/service_tracker/_models.py index 985ca8feef5..6b1b5b1a75d 100644 --- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/service_tracker/_models.py +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/service_tracker/_models.py @@ -1,17 +1,39 @@ -import pickle -from dataclasses import dataclass, field from datetime import timedelta +from decimal import Decimal from enum 
import auto +from typing import Any, Callable, Final +from uuid import UUID import arrow +import umsgpack # type: ignore[import-untyped] from models_library.api_schemas_dynamic_scheduler.dynamic_services import ( DynamicServiceStart, ) from models_library.projects import ProjectID from models_library.users import UserID from models_library.utils.enums import StrAutoEnum +from pydantic import BaseModel, Field from servicelib.deferred_tasks import TaskUID +# `umsgpack.Ext` extension types are part of the msgpack specification; they +# allow defining serialization and deserialization rules for custom types +# see https://github.com/msgpack/msgpack/blob/master/spec.md#extension-types + +_UUID_TYPE: Final[int] = 0x00 +_DECIMAL_TYPE: Final[int] = 0x01 + +_PACKB_EXTENSION_TYPES: Final[dict[type[Any], Callable[[Any], umsgpack.Ext]]] = { + # helpers to serialize an object to bytes + UUID: lambda obj: umsgpack.Ext(_UUID_TYPE, obj.bytes), + Decimal: lambda obj: umsgpack.Ext(_DECIMAL_TYPE, f"{obj}".encode()), +} + +_UNPACKB_EXTENSION_TYPES: Final[dict[int, Callable[[umsgpack.Ext], Any]]] = { + # helpers to deserialize an object from bytes + _UUID_TYPE: lambda ext: UUID(bytes=ext.data), + _DECIMAL_TYPE: lambda ext: Decimal(ext.data.decode()), +} + class UserRequestedState(StrAutoEnum): RUNNING = auto() @@ -35,74 +57,67 @@ class SchedulerServiceState(StrAutoEnum): UNKNOWN = auto() -@dataclass -class TrackedServiceModel: # pylint:disable=too-many-instance-attributes +class TrackedServiceModel(BaseModel): # pylint:disable=too-many-instance-attributes - dynamic_service_start: DynamicServiceStart | None = field( - metadata={ - "description": ( - "used to create the service in any given moment if the requested_state is RUNNING" - "can be set to None only when stopping the service" - ) - } + dynamic_service_start: DynamicServiceStart | None = Field( + description=( + "used to create the service in any given moment if the requested_state is RUNNING" + "can be set to None only when 
stopping the service" + ) ) - user_id: UserID | None = field( - metadata={ - "description": "required for propagating status changes to the frontend" - } + user_id: UserID | None = Field( + description="required for propagating status changes to the frontend" ) - project_id: ProjectID | None = field( - metadata={ - "description": "required for propagating status changes to the frontend" - } + project_id: ProjectID | None = Field( + description="required for propagating status changes to the frontend" ) - requested_state: UserRequestedState = field( - metadata={ - "description": ( - "status of the service desidered by the user RUNNING or STOPPED" - ) - } + requested_state: UserRequestedState = Field( + description=("status of the service desidered by the user RUNNING or STOPPED") ) - current_state: SchedulerServiceState = field( + current_state: SchedulerServiceState = Field( default=SchedulerServiceState.UNKNOWN, - metadata={ - "description": "to set after parsing the incoming state via the API calls" - }, + description="to set after parsing the incoming state via the API calls", + ) + + def __setattr__(self, name, value): + if name == "current_state" and value != self.current_state: + self.last_state_change = arrow.utcnow().timestamp() + super().__setattr__(name, value) + + last_state_change: float = Field( + default_factory=lambda: arrow.utcnow().timestamp(), + description="keeps track when the current_state was last updated", ) ############################# ### SERVICE STATUS UPDATE ### ############################# - scheduled_to_run: bool = field( + scheduled_to_run: bool = Field( default=False, - metadata={"description": "set when a job will be immediately scheduled"}, + description="set when a job will be immediately scheduled", ) - service_status: str = field( + service_status: str = Field( default="", - metadata={ - "description": "stored for debug mainly this is used to compute ``current_state``" - }, + description="stored for debug mainly 
this is used to compute ``current_state``", ) - service_status_task_uid: TaskUID | None = field( + service_status_task_uid: TaskUID | None = Field( default=None, - metadata={"description": "uid of the job currently fetching the status"}, + description="uid of the job currently fetching the status", ) - check_status_after: float = field( + check_status_after: float = Field( default_factory=lambda: arrow.utcnow().timestamp(), - metadata={"description": "used to determine when to poll the status again"}, + description="used to determine when to poll the status again", ) - last_status_notification: float = field( + last_status_notification: float = Field( default=0, - metadata={ - "description": "used to determine when was the last time the status was notified" - }, + description="used to determine when was the last time the status was notified", ) def set_check_status_after_to(self, delay_from_now: timedelta) -> None: @@ -116,8 +131,10 @@ def set_last_status_notification_to_now(self) -> None: ##################### def to_bytes(self) -> bytes: - return pickle.dumps(self) + result: bytes = umsgpack.packb(self.dict(), ext_handlers=_PACKB_EXTENSION_TYPES) + return result @classmethod def from_bytes(cls, data: bytes) -> "TrackedServiceModel": - return pickle.loads(data) # type: ignore # noqa: S301 + unpacked_data = umsgpack.unpackb(data, ext_handlers=_UNPACKB_EXTENSION_TYPES) + return cls(**unpacked_data) diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/status_monitor/_monitor.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/status_monitor/_monitor.py index 0d8b5a2723f..8ba70997a93 100644 --- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/status_monitor/_monitor.py +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/status_monitor/_monitor.py @@ -23,6 +23,8 @@ _INTERVAL_BETWEEN_CHECKS: Final[timedelta] = timedelta(seconds=1) _MAX_CONCURRENCY: 
Final[NonNegativeInt] = 10 +_REMOVE_AFTER_IDLE_FOR: Final[timedelta] = timedelta(minutes=5) + async def _start_get_status_deferred( app: FastAPI, node_id: NodeID, *, next_check_delay: timedelta @@ -31,6 +33,34 @@ async def _start_get_status_deferred( await DeferredGetStatus.start(node_id=node_id) +def _can_be_removed(model: TrackedServiceModel) -> bool: + + # requested **as** `STOPPED` + # service **reports** `IDLE` + if ( + model.current_state == SchedulerServiceState.IDLE + and model.requested_state == UserRequestedState.STOPPED + ): + return True + + # NOTE: currently dynamic-scheduler does not automatically start a + # service whose requested_state is RUNNING; + # to avoid monitoring services which no longer exist, + # the service has to be removed. + + # requested **as** `RUNNING` + # service **reports** `IDLE` since `_REMOVE_AFTER_IDLE_FOR` + if ( # noqa: SIM103 + model.current_state == SchedulerServiceState.IDLE + and model.requested_state == UserRequestedState.RUNNING + and arrow.utcnow().timestamp() - model.last_state_change + > _REMOVE_AFTER_IDLE_FOR.total_seconds() + ): + return True + + return False + + class Monitor: def __init__(self, app: FastAPI, status_worker_interval: timedelta) -> None: self.app = app @@ -44,7 +74,7 @@ async def _worker_start_get_status_requests(self) -> None: """ Check if a service requires it's status to be polled. Note that the interval at which the status is polled can vary. - This is a relatively low resoruce check. + This is a relatively low resource check. 
""" # NOTE: this worker runs on only once across all instances of the scheduler @@ -59,11 +89,7 @@ async def _worker_start_get_status_requests(self) -> None: current_timestamp = arrow.utcnow().timestamp() for node_id, model in models.items(): - # check if service is idle and status polling should stop - if ( - model.current_state == SchedulerServiceState.IDLE - and model.requested_state == UserRequestedState.STOPPED - ): + if _can_be_removed(model): to_remove.append(node_id) continue diff --git a/services/dynamic-scheduler/tests/assets/legacy_tracked_service_model.bin b/services/dynamic-scheduler/tests/assets/legacy_tracked_service_model.bin new file mode 100644 index 0000000000000000000000000000000000000000..8c26d4e8ba55e797158d811e7e0ef6d171d5b013 GIT binary patch literal 232 zcmY+8O$x#=5QROCTaVyMM2H|1mqSRVHnt|3Oa$==Ucz0$Mg2ihToqS>C-63=Qbl+3 z-kW*ftWI*F`9ukDXy-~GIO3Md+y@OxmYbBH?<3kM{H4TpnA|&WQx2&q5O!uwLJaA3k}vKL2YV?o#mKi9@&) b>6_#o@R-nkYOj|sF+|#sik0A`jBdODt087* literal 0 HcmV?d00001 diff --git a/services/dynamic-scheduler/tests/unit/service_tracker/test__models.py b/services/dynamic-scheduler/tests/unit/service_tracker/test__models.py index 6b8e31321b3..077da84dcc7 100644 --- a/services/dynamic-scheduler/tests/unit/service_tracker/test__models.py +++ b/services/dynamic-scheduler/tests/unit/service_tracker/test__models.py @@ -1,8 +1,17 @@ +# pylint: disable=redefined-outer-name + +from copy import deepcopy from datetime import timedelta +from pathlib import Path +from uuid import uuid4 import arrow import pytest from faker import Faker +from models_library.api_schemas_dynamic_scheduler.dynamic_services import ( + DynamicServiceStart, +) +from models_library.projects import ProjectID from servicelib.deferred_tasks import TaskUID from simcore_service_dynamic_scheduler.services.service_tracker._models import ( SchedulerServiceState, @@ -38,11 +47,23 @@ def test_serialization( assert TrackedServiceModel.from_bytes(as_bytes) == tracked_model -async def 
test_set_check_status_after_to(): +@pytest.mark.parametrize( + "dynamic_service_start", + [ + None, + DynamicServiceStart.parse_obj( + DynamicServiceStart.Config.schema_extra["example"] + ), + ], +) +@pytest.mark.parametrize("project_id", [None, uuid4()]) +async def test_set_check_status_after_to( + dynamic_service_start: DynamicServiceStart | None, project_id: ProjectID | None +): model = TrackedServiceModel( - dynamic_service_start=None, + dynamic_service_start=dynamic_service_start, user_id=None, - project_id=None, + project_id=project_id, requested_state=UserRequestedState.RUNNING, ) assert model.check_status_after < arrow.utcnow().timestamp() @@ -55,3 +76,43 @@ async def test_set_check_status_after_to(): assert model.check_status_after assert before < model.check_status_after < after + + +async def test_legacy_format_compatibility(project_slug_dir: Path): + legacy_format_path = ( + project_slug_dir / "tests" / "assets" / "legacy_tracked_service_model.bin" + ) + assert legacy_format_path.exists() + + model_from_disk = TrackedServiceModel.from_bytes(legacy_format_path.read_bytes()) + + model = TrackedServiceModel( + dynamic_service_start=None, + user_id=None, + project_id=None, + requested_state=UserRequestedState.RUNNING, + # assume same dates are coming in + check_status_after=model_from_disk.check_status_after, + last_state_change=model_from_disk.last_state_change, + ) + + assert model_from_disk == model + + +def test_current_state_changes_updates_last_state_change(): + model = TrackedServiceModel( + dynamic_service_start=None, + user_id=None, + project_id=None, + requested_state=UserRequestedState.RUNNING, + ) + + last_changed = deepcopy(model.last_state_change) + model.current_state = SchedulerServiceState.IDLE + assert last_changed != model.last_state_change + + last_changed_2 = deepcopy(model.last_state_change) + model.current_state = SchedulerServiceState.IDLE + assert last_changed_2 == model.last_state_change + + assert last_changed != last_changed_2 
diff --git a/services/dynamic-scheduler/tests/unit/status_monitor/test_services_status_monitor__monitor.py b/services/dynamic-scheduler/tests/unit/status_monitor/test_services_status_monitor__monitor.py index 2dd5270b627..c4c0bc8a9d8 100644 --- a/services/dynamic-scheduler/tests/unit/status_monitor/test_services_status_monitor__monitor.py +++ b/services/dynamic-scheduler/tests/unit/status_monitor/test_services_status_monitor__monitor.py @@ -2,10 +2,12 @@ # pylint:disable=too-many-positional-arguments # pylint:disable=unused-argument +import itertools import json import re from collections.abc import AsyncIterable, Callable from copy import deepcopy +from datetime import timedelta from typing import Any from unittest.mock import AsyncMock from uuid import uuid4 @@ -32,11 +34,19 @@ set_request_as_running, set_request_as_stopped, ) +from simcore_service_dynamic_scheduler.services.service_tracker._models import ( + SchedulerServiceState, + TrackedServiceModel, + UserRequestedState, +) from simcore_service_dynamic_scheduler.services.status_monitor import _monitor from simcore_service_dynamic_scheduler.services.status_monitor._deferred_get_status import ( DeferredGetStatus, ) -from simcore_service_dynamic_scheduler.services.status_monitor._monitor import Monitor +from simcore_service_dynamic_scheduler.services.status_monitor._monitor import ( + Monitor, + _can_be_removed, +) from simcore_service_dynamic_scheduler.services.status_monitor._setup import get_monitor from tenacity import AsyncRetrying from tenacity.retry import retry_if_exception_type @@ -414,3 +424,72 @@ async def test_expected_calls_to_notify_frontend( # pylint:disable=too-many-arg # pylint:disable=protected-access await monitor._worker_start_get_status_requests() # noqa: SLF001 assert remove_tracked_spy.call_count == remove_tracked_count + + +@pytest.fixture +def mock_tracker_remove_after_idle_for(mocker: MockerFixture) -> None: + mocker.patch( + 
"simcore_service_dynamic_scheduler.services.status_monitor._monitor._REMOVE_AFTER_IDLE_FOR", + timedelta(seconds=0.1), + ) + + +@pytest.mark.parametrize( + "requested_state, current_state, immediate_can_be_removed, can_be_removed", + [ + pytest.param( + UserRequestedState.RUNNING, + SchedulerServiceState.IDLE, + False, + True, + id="can_remove_after_an_interval", + ), + pytest.param( + UserRequestedState.STOPPED, + SchedulerServiceState.IDLE, + True, + True, + id="can_remove_no_interval", + ), + *[ + pytest.param( + requested_state, + service_state, + False, + False, + id=f"not_removed_{requested_state=}_{service_state=}", + ) + for requested_state, service_state in itertools.product( + set(UserRequestedState), + {x for x in SchedulerServiceState if x != SchedulerServiceState.IDLE}, + ) + ], + ], +) +async def test__can_be_removed( + mock_tracker_remove_after_idle_for: None, + requested_state: UserRequestedState, + current_state: SchedulerServiceState, + immediate_can_be_removed: bool, + can_be_removed: bool, +): + model = TrackedServiceModel( + dynamic_service_start=None, + user_id=None, + project_id=None, + requested_state=requested_state, + ) + + # This also triggers the setter and updates the last state change timer + model.current_state = current_state + + assert _can_be_removed(model) is immediate_can_be_removed + + async for attempt in AsyncRetrying( + wait=wait_fixed(0.1), + stop=stop_after_delay(2), + reraise=True, + retry=retry_if_exception_type(AssertionError), + ): + with attempt: + assert _can_be_removed(model) is can_be_removed diff --git a/services/osparc-gateway-server/requirements/_test.txt b/services/osparc-gateway-server/requirements/_test.txt index 797b272793e..a9fff835004 100644 --- a/services/osparc-gateway-server/requirements/_test.txt +++ b/services/osparc-gateway-server/requirements/_test.txt @@ -89,7 +89,7 @@ markupsafe==2.1.5 # via # -c requirements/../../dask-sidecar/requirements/_dask-distributed.txt # jinja2 -msgpack==1.0.8 
+msgpack==1.1.0 # via # -c requirements/../../dask-sidecar/requirements/_dask-distributed.txt # distributed diff --git a/services/osparc-gateway-server/tests/system/requirements/_test.txt b/services/osparc-gateway-server/tests/system/requirements/_test.txt index 410339df3c6..0977f99f778 100644 --- a/services/osparc-gateway-server/tests/system/requirements/_test.txt +++ b/services/osparc-gateway-server/tests/system/requirements/_test.txt @@ -83,7 +83,7 @@ markupsafe==2.1.5 # via # -c requirements/../../../../dask-sidecar/requirements/_dask-distributed.txt # jinja2 -msgpack==1.0.8 +msgpack==1.1.0 # via # -c requirements/../../../../dask-sidecar/requirements/_dask-distributed.txt # distributed diff --git a/services/web/server/requirements/_base.txt b/services/web/server/requirements/_base.txt index df540ac4b88..d566b8d9112 100644 --- a/services/web/server/requirements/_base.txt +++ b/services/web/server/requirements/_base.txt @@ -255,7 +255,7 @@ markupsafe==2.1.1 # mako mdurl==0.1.2 # via markdown-it-py -msgpack==1.0.7 +msgpack==1.1.0 # via -r requirements/_base.in multidict==6.1.0 # via