Skip to content

Commit

Permalink
♻️ dy-sidecars wait for resources before failing (#3388)
Browse files Browse the repository at this point in the history
  • Loading branch information
GitHK authored Sep 27, 2022
1 parent 7789d30 commit 733e0f2
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -214,11 +214,13 @@ class DynamicSidecarSettings(BaseCustomSettings):
),
)
DYNAMIC_SIDECAR_TIMEOUT_FETCH_DYNAMIC_SIDECAR_NODE_ID: PositiveFloat = Field(
5 * MINS,
60 * MINS,
description=(
"After starting the dynamic-sidecar its docker_node_id is required. "
"This operation can be slow based on system load, sometimes docker "
"swarm takes more than seconds to assign the node."
"Autoscaling of nodes takes time, it is required to wait longer"
"for nodes to be assigned."
),
)
DYNAMIC_SIDECAR_API_SAVE_RESTORE_STATE_TIMEOUT: PositiveFloat = Field(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
constrain_service_to_node,
create_network,
create_service_and_get_id,
get_dynamic_sidecar_placement,
get_dynamic_sidecar_state,
get_dynamic_sidecars_to_observe,
get_or_create_networks_ids,
get_projects_networks_containers,
get_service_placement,
get_swarm_network,
inspect_service,
is_dynamic_service_running,
Expand All @@ -29,7 +29,7 @@
"get_dynamic_sidecars_to_observe",
"get_or_create_networks_ids",
"get_projects_networks_containers",
"get_service_placement",
"get_dynamic_sidecar_placement",
"get_swarm_network",
"inspect_service",
"is_dynamic_service_running",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import asyncio
import json
import logging
import time
from typing import Any, Mapping, Optional, Union

import aiodocker
Expand All @@ -14,11 +12,11 @@
from models_library.users import UserID
from servicelib.json_serialization import json_dumps
from servicelib.utils import logged_gather
from tenacity import TryAgain
from tenacity import TryAgain, retry
from tenacity._asyncio import AsyncRetrying
from tenacity.retry import retry_if_exception_type
from tenacity.stop import stop_after_delay
from tenacity.wait import wait_exponential
from tenacity.wait import wait_exponential, wait_random_exponential

from ....core.settings import DynamicSidecarSettings
from ....models.schemas.constants import (
Expand Down Expand Up @@ -137,74 +135,58 @@ async def get_dynamic_sidecars_to_observe(
]


async def _extract_task_data_from_service_for_state(
service_id: str,
dynamic_sidecar_settings: DynamicSidecarSettings,
target_statuses: set[str],
) -> dict[str, Any]:
"""Waits until the dynamic-sidecar task is in one of the target_statuses
and then returns the task"""

async def _sleep_or_error(started: float, task: dict):
await asyncio.sleep(1.0)
elapsed = time.time() - started
if (
elapsed
> dynamic_sidecar_settings.DYNAMIC_SIDECAR_TIMEOUT_FETCH_DYNAMIC_SIDECAR_NODE_ID
):
raise DynamicSidecarError(
"Timed out while searching for an assigned NodeID for "
f"service_id={service_id}. Last task inspect result: {task}"
)

async with docker_client() as client:
service_state: Optional[str] = None
task: dict[str, Any] = {}

started = time.time()
async def get_dynamic_sidecar_placement(
service_id: str, dynamic_sidecar_settings: DynamicSidecarSettings
) -> str:
"""
Waits until the service has a task in `running` state and
returns it's `docker_node_id`.
It is assumed that a `docker_node_id` exists if the service
is in `running` state.
"""

while service_state not in target_statuses:
# NOTE: `wait_random_exponential` is key for reducing pressure on docker swarm
# The idea behind it is to avoid having concurrent retrying calls
# when the system is having issues to respond. If the system
# is failing clients are retrying at the same time,
# it makes harder to recover.
# Ideally you'd like to distribute the retries uniformly in time.
# For more details see `wait_random_exponential` documentation.
@retry(
wait=wait_random_exponential(multiplier=2, min=1, max=20),
stop=stop_after_delay(
dynamic_sidecar_settings.DYNAMIC_SIDECAR_TIMEOUT_FETCH_DYNAMIC_SIDECAR_NODE_ID
),
)
async def _get_task_data_when_service_running(service_id: str) -> Mapping[str, Any]:
"""
Waits for dynamic-sidecar task to be `running` and returns the
task data.
"""
async with docker_client() as client:
running_services = await client.tasks.list(
filters={"service": f"{service_id}"}
)

service_container_count = len(running_services)

# the service could not be started yet, let's wait for the next iteration?
if service_container_count == 0:
await _sleep_or_error(started=started, task={})
continue
raise TryAgain()

# The service might have more then one task because the previous might have died out
# Only interested in the latest Task/container as only 1 container per service
# is being run
# The service might have more then one task because the
# previous might have died out.
# Only interested in the latest task as only one task per
# service will be running.
sorted_tasks = sorted(running_services, key=lambda task: task["UpdatedAt"])

task = sorted_tasks[-1]
service_state = task["Status"]["State"]

# avoids waiting 1 extra second when the container is already
# up, this will be the case the majority of times
if service_state in target_statuses:
continue

await _sleep_or_error(started=started, task=task)
if service_state not in TASK_STATES_RUNNING:
raise TryAgain()

return task
return task


async def get_service_placement(
service_id: str, dynamic_sidecar_settings: DynamicSidecarSettings
) -> str:
"""Awaits until the service has a running task and returns the
node's ID where it is running. When in a running state, the service
is most certainly has a NodeID assigned"""

task = await _extract_task_data_from_service_for_state(
service_id=service_id,
dynamic_sidecar_settings=dynamic_sidecar_settings,
target_statuses=TASK_STATES_RUNNING,
)
task = await _get_task_data_when_service_running(service_id=service_id)

docker_node_id = task.get("NodeID", None)
if not docker_node_id:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
constrain_service_to_node,
create_network,
create_service_and_get_id,
get_service_placement,
get_dynamic_sidecar_placement,
get_swarm_network,
is_dynamic_sidecar_stack_missing,
)
Expand Down Expand Up @@ -185,8 +185,10 @@ async def action(cls, app: FastAPI, scheduler_data: SchedulerData) -> None:
dynamic_sidecar_service_final_spec
)
# constrain service to the same node
scheduler_data.dynamic_sidecar.docker_node_id = await get_service_placement(
dynamic_sidecar_id, dynamic_sidecar_settings
scheduler_data.dynamic_sidecar.docker_node_id = (
await get_dynamic_sidecar_placement(
dynamic_sidecar_id, dynamic_sidecar_settings
)
)
await constrain_service_to_node(
service_name=scheduler_data.service_name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ async def test_dynamic_sidecar_in_running_state_and_node_id_is_recovered(
)
assert service_id

node_id = await docker_api.get_service_placement(
node_id = await docker_api.get_dynamic_sidecar_placement(
service_id, dynamic_sidecar_settings
)
assert node_id
Expand Down

0 comments on commit 733e0f2

Please sign in to comment.