Skip to content

Commit

Permalink
🐛 Fixed issue with accumulating tracked services (#6631)
Browse files Browse the repository at this point in the history
Co-authored-by: Andrei Neagu <[email protected]>
  • Loading branch information
GitHK and Andrei Neagu authored Oct 30, 2024
1 parent e994f5d commit f6a4e68
Show file tree
Hide file tree
Showing 17 changed files with 251 additions and 65 deletions.
2 changes: 1 addition & 1 deletion services/autoscaling/requirements/_base.txt
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ markupsafe==2.1.5
# jinja2
mdurl==0.1.2
# via markdown-it-py
msgpack==1.0.8
msgpack==1.1.0
# via
# -c requirements/../../../services/dask-sidecar/requirements/_dask-distributed.txt
# distributed
Expand Down
2 changes: 1 addition & 1 deletion services/catalog/requirements/_base.txt
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ markupsafe==2.1.5
# mako
mdurl==0.1.2
# via markdown-it-py
msgpack==1.0.8
msgpack==1.1.0
# via aiocache
multidict==6.0.5
# via
Expand Down
2 changes: 1 addition & 1 deletion services/clusters-keeper/requirements/_base.txt
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ markupsafe==2.1.5
# jinja2
mdurl==0.1.2
# via markdown-it-py
msgpack==1.0.8
msgpack==1.1.0
# via
# -c requirements/../../../services/dask-sidecar/requirements/_dask-distributed.txt
# distributed
Expand Down
2 changes: 1 addition & 1 deletion services/dask-sidecar/requirements/_base.txt
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ markupsafe==2.1.5
# via jinja2
mdurl==0.1.2
# via markdown-it-py
msgpack==1.0.8
msgpack==1.1.0
# via distributed
multidict==6.0.5
# via
Expand Down
2 changes: 1 addition & 1 deletion services/dask-sidecar/requirements/_dask-distributed.txt
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ markupsafe==2.1.5
# via
# -c requirements/./_base.txt
# jinja2
msgpack==1.0.8
msgpack==1.1.0
# via
# -c requirements/./_base.txt
# distributed
Expand Down
2 changes: 1 addition & 1 deletion services/director-v2/requirements/_base.txt
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ markupsafe==2.1.5
# mako
mdurl==0.1.2
# via markdown-it-py
msgpack==1.0.8
msgpack==1.1.0
# via
# -r requirements/../../../services/dask-sidecar/requirements/_dask-distributed.txt
# aiocache
Expand Down
2 changes: 1 addition & 1 deletion services/director-v2/requirements/_test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ markupsafe==2.1.5
# -c requirements/_base.txt
# jinja2
# mako
msgpack==1.0.8
msgpack==1.1.0
# via
# -c requirements/_base.txt
# distributed
Expand Down
1 change: 1 addition & 0 deletions services/dynamic-scheduler/requirements/_base.in
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@ httpx
packaging
python-socketio
typer[all]
u-msgpack-python
uvicorn[standard]
2 changes: 2 additions & 0 deletions services/dynamic-scheduler/requirements/_base.txt
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,8 @@ typing-extensions==4.10.0
# opentelemetry-sdk
# pydantic
# typer
u-msgpack-python==2.8.0
# via -r requirements/_base.in
urllib3==2.2.2
# via
# -c requirements/../../../packages/models-library/requirements/../../../requirements/constraints.txt
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,39 @@
import pickle
from dataclasses import dataclass, field
from datetime import timedelta
from decimal import Decimal
from enum import auto
from typing import Any, Callable, Final
from uuid import UUID

import arrow
import umsgpack # type: ignore[import-untyped]
from models_library.api_schemas_dynamic_scheduler.dynamic_services import (
DynamicServiceStart,
)
from models_library.projects import ProjectID
from models_library.users import UserID
from models_library.utils.enums import StrAutoEnum
from pydantic import BaseModel, Field
from servicelib.deferred_tasks import TaskUID

# `umsgpack.Ext` extension types are part of the msgpack specification;
# they allow defining serialization and deserialization rules for custom types
# see https://github.com/msgpack/msgpack/blob/master/spec.md#extension-types

_UUID_TYPE: Final[int] = 0x00
_DECIMAL_TYPE: Final[int] = 0x01

_PACKB_EXTENSION_TYPES: Final[dict[type[Any], Callable[[Any], umsgpack.Ext]]] = {
# helpers to serialize an object to bytes
UUID: lambda obj: umsgpack.Ext(_UUID_TYPE, obj.bytes),
Decimal: lambda obj: umsgpack.Ext(_DECIMAL_TYPE, f"{obj}".encode()),
}

_UNPACKB_EXTENSION_TYPES: Final[dict[int, Callable[[umsgpack.Ext], Any]]] = {
# helpers to deserialize an object from bytes
_UUID_TYPE: lambda ext: UUID(bytes=ext.data),
_DECIMAL_TYPE: lambda ext: Decimal(ext.data.decode()),
}


class UserRequestedState(StrAutoEnum):
RUNNING = auto()
Expand All @@ -35,74 +57,67 @@ class SchedulerServiceState(StrAutoEnum):
UNKNOWN = auto()


@dataclass
class TrackedServiceModel: # pylint:disable=too-many-instance-attributes
class TrackedServiceModel(BaseModel): # pylint:disable=too-many-instance-attributes

dynamic_service_start: DynamicServiceStart | None = field(
metadata={
"description": (
"used to create the service in any given moment if the requested_state is RUNNING"
"can be set to None only when stopping the service"
)
}
# Start payload kept so the service can be (re)created while it is requested
# as RUNNING.
# NOTE: the two description fragments are adjacent string literals; the
# separator must be part of the first one, otherwise they are glued together
# ("...is RUNNINGcan be set...").
dynamic_service_start: DynamicServiceStart | None = Field(
    description=(
        "used to create the service in any given moment if the requested_state is RUNNING; "
        "can be set to None only when stopping the service"
    )
)

user_id: UserID | None = field(
metadata={
"description": "required for propagating status changes to the frontend"
}
user_id: UserID | None = Field(
description="required for propagating status changes to the frontend"
)
project_id: ProjectID | None = field(
metadata={
"description": "required for propagating status changes to the frontend"
}
project_id: ProjectID | None = Field(
description="required for propagating status changes to the frontend"
)

requested_state: UserRequestedState = field(
metadata={
"description": (
"status of the service desidered by the user RUNNING or STOPPED"
)
}
requested_state: UserRequestedState = Field(
description=("status of the service desidered by the user RUNNING or STOPPED")
)

current_state: SchedulerServiceState = field(
current_state: SchedulerServiceState = Field(
default=SchedulerServiceState.UNKNOWN,
metadata={
"description": "to set after parsing the incoming state via the API calls"
},
description="to set after parsing the incoming state via the API calls",
)

def __setattr__(self, name, value):
    """Assign ``value`` to ``name``, refreshing ``last_state_change`` when needed.

    When ``current_state`` is about to receive a value different from the one
    currently stored, ``last_state_change`` is first set to the current UTC
    timestamp. Every assignment is then delegated to the parent class.
    """
    if name == "current_state":
        # only a real transition refreshes the timestamp; re-assigning the
        # same state leaves it untouched
        if value != self.current_state:
            self.last_state_change = arrow.utcnow().timestamp()
    super().__setattr__(name, value)

# UTC timestamp (seconds) of the last ``current_state`` transition; it is
# refreshed by ``__setattr__`` whenever ``current_state`` actually changes.
# NOTE: the text must be passed via ``description=`` like the sibling fields;
# ``metadata=`` belongs to ``dataclasses.field`` and is not interpreted by
# pydantic's ``Field`` as the field description.
last_state_change: float = Field(
    default_factory=lambda: arrow.utcnow().timestamp(),
    description="keeps track when the current_state was last updated",
)

#############################
### SERVICE STATUS UPDATE ###
#############################

scheduled_to_run: bool = field(
scheduled_to_run: bool = Field(
default=False,
metadata={"description": "set when a job will be immediately scheduled"},
description="set when a job will be immediately scheduled",
)

service_status: str = field(
service_status: str = Field(
default="",
metadata={
"description": "stored for debug mainly this is used to compute ``current_state``"
},
description="stored for debug mainly this is used to compute ``current_state``",
)
service_status_task_uid: TaskUID | None = field(
service_status_task_uid: TaskUID | None = Field(
default=None,
metadata={"description": "uid of the job currently fetching the status"},
description="uid of the job currently fetching the status",
)

check_status_after: float = field(
check_status_after: float = Field(
default_factory=lambda: arrow.utcnow().timestamp(),
metadata={"description": "used to determine when to poll the status again"},
description="used to determine when to poll the status again",
)

last_status_notification: float = field(
last_status_notification: float = Field(
default=0,
metadata={
"description": "used to determine when was the last time the status was notified"
},
description="used to determine when was the last time the status was notified",
)

def set_check_status_after_to(self, delay_from_now: timedelta) -> None:
Expand All @@ -116,8 +131,10 @@ def set_last_status_notification_to_now(self) -> None:
#####################

def to_bytes(self) -> bytes:
return pickle.dumps(self)
result: bytes = umsgpack.packb(self.dict(), ext_handlers=_PACKB_EXTENSION_TYPES)
return result

@classmethod
def from_bytes(cls, data: bytes) -> "TrackedServiceModel":
return pickle.loads(data) # type: ignore # noqa: S301
unpacked_data = umsgpack.unpackb(data, ext_handlers=_UNPACKB_EXTENSION_TYPES)
return cls(**unpacked_data)
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
_INTERVAL_BETWEEN_CHECKS: Final[timedelta] = timedelta(seconds=1)
_MAX_CONCURRENCY: Final[NonNegativeInt] = 10

_REMOVE_AFTER_IDLE_FOR: Final[timedelta] = timedelta(minutes=5)


async def _start_get_status_deferred(
app: FastAPI, node_id: NodeID, *, next_check_delay: timedelta
Expand All @@ -31,6 +33,34 @@ async def _start_get_status_deferred(
await DeferredGetStatus.start(node_id=node_id)


def _can_be_removed(model: TrackedServiceModel) -> bool:
    """Tell whether a tracked service should no longer be monitored.

    A service can be removed only while it **reports** ``IDLE`` and either:

    - it was requested as ``STOPPED``, or
    - it was requested as ``RUNNING`` and more than ``_REMOVE_AFTER_IDLE_FOR``
      has elapsed since ``last_state_change``.

    NOTE: currently dynamic-scheduler does not automatically start a service
    whose requested_state is ``RUNNING``; to avoid monitoring services which
    no longer exist, such a service has to be removed.
    """
    # anything that is not reporting IDLE keeps being tracked
    if model.current_state != SchedulerServiceState.IDLE:
        return False

    # requested as STOPPED and reported IDLE: nothing left to monitor
    if model.requested_state == UserRequestedState.STOPPED:
        return True

    if model.requested_state != UserRequestedState.RUNNING:
        return False

    # requested as RUNNING but reported IDLE: remove once idle for too long
    idle_for_seconds = arrow.utcnow().timestamp() - model.last_state_change
    return idle_for_seconds > _REMOVE_AFTER_IDLE_FOR.total_seconds()


class Monitor:
def __init__(self, app: FastAPI, status_worker_interval: timedelta) -> None:
self.app = app
Expand All @@ -44,7 +74,7 @@ async def _worker_start_get_status_requests(self) -> None:
"""
Check if a service requires its status to be polled.
Note that the interval at which the status is polled can vary.
This is a relatively low resoruce check.
This is a relatively low resource check.
"""

# NOTE: this worker runs only once across all instances of the scheduler
Expand All @@ -59,11 +89,7 @@ async def _worker_start_get_status_requests(self) -> None:
current_timestamp = arrow.utcnow().timestamp()

for node_id, model in models.items():
# check if service is idle and status polling should stop
if (
model.current_state == SchedulerServiceState.IDLE
and model.requested_state == UserRequestedState.STOPPED
):
if _can_be_removed(model):
to_remove.append(node_id)
continue

Expand Down
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -1,8 +1,17 @@
# pylint: disable=redefined-outer-name

from copy import deepcopy
from datetime import timedelta
from pathlib import Path
from uuid import uuid4

import arrow
import pytest
from faker import Faker
from models_library.api_schemas_dynamic_scheduler.dynamic_services import (
DynamicServiceStart,
)
from models_library.projects import ProjectID
from servicelib.deferred_tasks import TaskUID
from simcore_service_dynamic_scheduler.services.service_tracker._models import (
SchedulerServiceState,
Expand Down Expand Up @@ -38,11 +47,23 @@ def test_serialization(
assert TrackedServiceModel.from_bytes(as_bytes) == tracked_model


async def test_set_check_status_after_to():
@pytest.mark.parametrize(
"dynamic_service_start",
[
None,
DynamicServiceStart.parse_obj(
DynamicServiceStart.Config.schema_extra["example"]
),
],
)
@pytest.mark.parametrize("project_id", [None, uuid4()])
async def test_set_check_status_after_to(
dynamic_service_start: DynamicServiceStart | None, project_id: ProjectID | None
):
model = TrackedServiceModel(
dynamic_service_start=None,
dynamic_service_start=dynamic_service_start,
user_id=None,
project_id=None,
project_id=project_id,
requested_state=UserRequestedState.RUNNING,
)
assert model.check_status_after < arrow.utcnow().timestamp()
Expand All @@ -55,3 +76,43 @@ async def test_set_check_status_after_to():

assert model.check_status_after
assert before < model.check_status_after < after


async def test_legacy_format_compatibility(project_slug_dir: Path):
    """A model serialized to the stored binary asset must still deserialize.

    The bytes in ``tests/assets/legacy_tracked_service_model.bin`` are loaded
    via ``TrackedServiceModel.from_bytes`` and compared with a freshly built
    model carrying the same values.
    """
    legacy_asset = project_slug_dir.joinpath(
        "tests", "assets", "legacy_tracked_service_model.bin"
    )
    assert legacy_asset.exists()

    loaded = TrackedServiceModel.from_bytes(legacy_asset.read_bytes())

    # the timestamps default to "now" at creation time, so they are copied
    # over from the loaded model to make the comparison meaningful
    expected = TrackedServiceModel(
        dynamic_service_start=None,
        user_id=None,
        project_id=None,
        requested_state=UserRequestedState.RUNNING,
        check_status_after=loaded.check_status_after,
        last_state_change=loaded.last_state_change,
    )

    assert loaded == expected


def test_current_state_changes_updates_last_state_change():
    """``last_state_change`` moves only when ``current_state`` really changes."""
    tracked = TrackedServiceModel(
        dynamic_service_start=None,
        user_id=None,
        project_id=None,
        requested_state=UserRequestedState.RUNNING,
    )

    # first assignment differs from the stored state: timestamp is refreshed
    initial_timestamp = deepcopy(tracked.last_state_change)
    tracked.current_state = SchedulerServiceState.IDLE
    assert initial_timestamp != tracked.last_state_change

    # assigning the very same state again: timestamp stays as it was
    timestamp_after_change = deepcopy(tracked.last_state_change)
    tracked.current_state = SchedulerServiceState.IDLE
    assert timestamp_after_change == tracked.last_state_change

    assert initial_timestamp != timestamp_after_change
Loading

0 comments on commit f6a4e68

Please sign in to comment.