diff --git a/services/director-v2/src/simcore_service_director_v2/core/settings.py b/services/director-v2/src/simcore_service_director_v2/core/settings.py index 5a675a1bb64..ed78e4a9cbd 100644 --- a/services/director-v2/src/simcore_service_director_v2/core/settings.py +++ b/services/director-v2/src/simcore_service_director_v2/core/settings.py @@ -214,11 +214,13 @@ class DynamicSidecarSettings(BaseCustomSettings): ), ) DYNAMIC_SIDECAR_TIMEOUT_FETCH_DYNAMIC_SIDECAR_NODE_ID: PositiveFloat = Field( - 5 * MINS, + 60 * MINS, description=( "After starting the dynamic-sidecar its docker_node_id is required. " "This operation can be slow based on system load, sometimes docker " "swarm takes more than seconds to assign the node." + " Autoscaling of nodes takes time, it is required to wait longer " + "for nodes to be assigned." ), ) DYNAMIC_SIDECAR_API_SAVE_RESTORE_STATE_TIMEOUT: PositiveFloat = Field( diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_api/__init__.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_api/__init__.py index 5a4f88baf29..545dee2b89d 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_api/__init__.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_api/__init__.py @@ -3,11 +3,11 @@ constrain_service_to_node, create_network, create_service_and_get_id, + get_dynamic_sidecar_placement, get_dynamic_sidecar_state, get_dynamic_sidecars_to_observe, get_or_create_networks_ids, get_projects_networks_containers, - get_service_placement, get_swarm_network, inspect_service, is_dynamic_service_running, @@ -29,7 +29,7 @@ "get_dynamic_sidecars_to_observe", "get_or_create_networks_ids", "get_projects_networks_containers", - "get_service_placement", + "get_dynamic_sidecar_placement", "get_swarm_network", "inspect_service", "is_dynamic_service_running", diff --git
a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_api/_core.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_api/_core.py index 2f18ca8b744..70d5ba0f5f1 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_api/_core.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_api/_core.py @@ -1,7 +1,5 @@ -import asyncio import json import logging -import time from typing import Any, Mapping, Optional, Union import aiodocker @@ -14,11 +12,11 @@ from models_library.users import UserID from servicelib.json_serialization import json_dumps from servicelib.utils import logged_gather -from tenacity import TryAgain +from tenacity import TryAgain, retry from tenacity._asyncio import AsyncRetrying from tenacity.retry import retry_if_exception_type from tenacity.stop import stop_after_delay -from tenacity.wait import wait_exponential +from tenacity.wait import wait_exponential, wait_random_exponential from ....core.settings import DynamicSidecarSettings from ....models.schemas.constants import ( @@ -137,74 +135,58 @@ async def get_dynamic_sidecars_to_observe( ] -async def _extract_task_data_from_service_for_state( - service_id: str, - dynamic_sidecar_settings: DynamicSidecarSettings, - target_statuses: set[str], -) -> dict[str, Any]: - """Waits until the dynamic-sidecar task is in one of the target_statuses - and then returns the task""" - - async def _sleep_or_error(started: float, task: dict): - await asyncio.sleep(1.0) - elapsed = time.time() - started - if ( - elapsed - > dynamic_sidecar_settings.DYNAMIC_SIDECAR_TIMEOUT_FETCH_DYNAMIC_SIDECAR_NODE_ID - ): - raise DynamicSidecarError( - "Timed out while searching for an assigned NodeID for " - f"service_id={service_id}. 
Last task inspect result: {task}" ) - - async with docker_client() as client: - service_state: Optional[str] = None - task: dict[str, Any] = {} - - started = time.time() +async def get_dynamic_sidecar_placement( + service_id: str, dynamic_sidecar_settings: DynamicSidecarSettings +) -> str: + """ + Waits until the service has a task in `running` state and + returns its `docker_node_id`. + It is assumed that a `docker_node_id` exists if the service + is in `running` state. + """ - while service_state not in target_statuses: + # NOTE: `wait_random_exponential` is key for reducing pressure on docker swarm + # The idea behind it is to avoid having concurrent retrying calls + # when the system is having issues to respond. If the system + # is failing and clients are retrying at the same time, + # it makes it harder to recover. + # Ideally you'd like to distribute the retries uniformly in time. + # For more details see `wait_random_exponential` documentation. + @retry( + wait=wait_random_exponential(multiplier=2, min=1, max=20), + stop=stop_after_delay( + dynamic_sidecar_settings.DYNAMIC_SIDECAR_TIMEOUT_FETCH_DYNAMIC_SIDECAR_NODE_ID + ), + ) + async def _get_task_data_when_service_running(service_id: str) -> Mapping[str, Any]: + """ + Waits for dynamic-sidecar task to be `running` and returns the + task data. + """ + async with docker_client() as client: running_services = await client.tasks.list( filters={"service": f"{service_id}"} ) - service_container_count = len(running_services) - # the service could not be started yet, let's wait for the next iteration? if service_container_count == 0: - await _sleep_or_error(started=started, task={}) - continue + raise TryAgain() - # The service might have more then one task because the previous might have died out - # Only interested in the latest Task/container as only 1 container per service - # is being run + # The service might have more than one task because the + # previous might have died out.
+ # Only interested in the latest task as only one task per + # service will be running. sorted_tasks = sorted(running_services, key=lambda task: task["UpdatedAt"]) task = sorted_tasks[-1] service_state = task["Status"]["State"] - # avoids waiting 1 extra second when the container is already - # up, this will be the case the majority of times - if service_state in target_statuses: - continue - - await _sleep_or_error(started=started, task=task) + if service_state not in TASK_STATES_RUNNING: + raise TryAgain() - return task + return task - -async def get_service_placement( - service_id: str, dynamic_sidecar_settings: DynamicSidecarSettings -) -> str: - """Awaits until the service has a running task and returns the - node's ID where it is running. When in a running state, the service - is most certainly has a NodeID assigned""" - - task = await _extract_task_data_from_service_for_state( - service_id=service_id, - dynamic_sidecar_settings=dynamic_sidecar_settings, - target_statuses=TASK_STATES_RUNNING, - ) + task = await _get_task_data_when_service_running(service_id=service_id) docker_node_id = task.get("NodeID", None) if not docker_node_id: diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/events.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/events.py index e455b9d9a1c..0a00f8b7429 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/events.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/events.py @@ -38,7 +38,7 @@ constrain_service_to_node, create_network, create_service_and_get_id, - get_service_placement, + get_dynamic_sidecar_placement, get_swarm_network, is_dynamic_sidecar_stack_missing, ) @@ -185,8 +185,10 @@ async def action(cls, app: FastAPI, scheduler_data: SchedulerData) -> None: dynamic_sidecar_service_final_spec ) # constrain service to the same node - 
scheduler_data.dynamic_sidecar.docker_node_id = await get_service_placement( - dynamic_sidecar_id, dynamic_sidecar_settings + scheduler_data.dynamic_sidecar.docker_node_id = ( + await get_dynamic_sidecar_placement( + dynamic_sidecar_id, dynamic_sidecar_settings + ) ) await constrain_service_to_node( service_name=scheduler_data.service_name, diff --git a/services/director-v2/tests/unit/with_dbs/test_modules_dynamic_sidecar_docker_api.py b/services/director-v2/tests/unit/with_dbs/test_modules_dynamic_sidecar_docker_api.py index aaa2e7a2e81..2a80d52e4e6 100644 --- a/services/director-v2/tests/unit/with_dbs/test_modules_dynamic_sidecar_docker_api.py +++ b/services/director-v2/tests/unit/with_dbs/test_modules_dynamic_sidecar_docker_api.py @@ -504,7 +504,7 @@ async def test_dynamic_sidecar_in_running_state_and_node_id_is_recovered( ) assert service_id - node_id = await docker_api.get_service_placement( + node_id = await docker_api.get_dynamic_sidecar_placement( service_id, dynamic_sidecar_settings ) assert node_id