✨Autoscaling: add a delay before draining a node (#5843)
sanderegg authored May 21, 2024
1 parent 4121401 commit 83c5f31
Showing 16 changed files with 355 additions and 166 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -47,6 +47,7 @@ htmlcov/
.cache
nosetests.xml
coverage.xml
cov.xml
*.cover
.hypothesis/
.pytest_cache/
@@ -1,4 +1,5 @@
from functools import cached_property
from typing import Any, ClassVar

from pydantic import Field, SecretStr, validator

@@ -24,7 +25,7 @@ class RegistrySettings(BaseCustomSettings):

@validator("REGISTRY_PATH", pre=True)
@classmethod
def escape_none_string(cls, v):
def escape_none_string(cls, v) -> Any | None:
return None if v == "None" else v

@cached_property
@@ -34,3 +35,16 @@ def resolved_registry_url(self) -> str:
@cached_property
def api_url(self) -> str:
return f"{self.REGISTRY_URL}/v2"

class Config(BaseCustomSettings.Config):
schema_extra: ClassVar[dict[str, Any]] = { # type: ignore[misc]
"examples": [
{
"REGISTRY_AUTH": "True",
"REGISTRY_USER": "theregistryuser",
"REGISTRY_PW": "some_secret_value",
"REGISTRY_SSL": "True",
"REGISTRY_URL": "registry.osparc-master.speag.com",
}
],
}
@@ -100,6 +100,11 @@ class EC2InstancesSettings(BaseCustomSettings):
" (https://docs.aws.amazon.com/vpc/latest/userguide/configure-subnets.html), "
"this is required to start a new EC2 instance",
)
EC2_INSTANCES_TIME_BEFORE_DRAINING: datetime.timedelta = Field(
default=datetime.timedelta(seconds=20),
description="Time after which an EC2 instance may be drained (10s<=T<=1 minutes, is automatically capped)"
"(default to seconds, or see https://pydantic-docs.helpmanual.io/usage/types/#datetime-types for string formating)",
)
EC2_INSTANCES_TIME_BEFORE_TERMINATION: datetime.timedelta = Field(
default=datetime.timedelta(minutes=1),
description="Time after which an EC2 instance may be terminated (0<=T<=59 minutes, is automatically capped)"
@@ -111,9 +116,22 @@
"a tag must have a key and an optional value. see [https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/Using_Tags.html]",
)

@validator("EC2_INSTANCES_TIME_BEFORE_DRAINING")
@classmethod
def ensure_draining_delay_time_is_in_range(
cls, value: datetime.timedelta
) -> datetime.timedelta:
if value < datetime.timedelta(seconds=10):
value = datetime.timedelta(seconds=10)
elif value > datetime.timedelta(minutes=1):
value = datetime.timedelta(minutes=1)
return value

@validator("EC2_INSTANCES_TIME_BEFORE_TERMINATION")
@classmethod
def ensure_time_is_in_range(cls, value):
def ensure_termination_delay_time_is_in_range(
cls, value: datetime.timedelta
) -> datetime.timedelta:
if value < datetime.timedelta(minutes=0):
value = datetime.timedelta(minutes=0)
elif value > datetime.timedelta(minutes=59):
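
For context, a minimal, self-contained sketch (not part of this commit) of how the new drain-delay setting is parsed and capped. It assumes pydantic v1 semantics as used by these settings; the model name _DrainDelaySketch and the assertions below are made up for illustration only.

import datetime

from pydantic import BaseModel, validator


class _DrainDelaySketch(BaseModel):
    # hypothetical stand-in for EC2InstancesSettings.EC2_INSTANCES_TIME_BEFORE_DRAINING
    EC2_INSTANCES_TIME_BEFORE_DRAINING: datetime.timedelta = datetime.timedelta(seconds=20)

    @validator("EC2_INSTANCES_TIME_BEFORE_DRAINING")
    @classmethod
    def _cap_to_range(cls, value: datetime.timedelta) -> datetime.timedelta:
        # same clamping as ensure_draining_delay_time_is_in_range: 10 seconds <= value <= 1 minute
        return min(max(value, datetime.timedelta(seconds=10)), datetime.timedelta(minutes=1))


# pydantic v1 accepts "[HH:]MM:SS" strings, bare seconds, or ISO 8601 durations
assert _DrainDelaySketch(EC2_INSTANCES_TIME_BEFORE_DRAINING="1:05:00").EC2_INSTANCES_TIME_BEFORE_DRAINING == datetime.timedelta(minutes=1)
assert _DrainDelaySketch(EC2_INSTANCES_TIME_BEFORE_DRAINING="-1:05:00").EC2_INSTANCES_TIME_BEFORE_DRAINING == datetime.timedelta(seconds=10)
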
@@ -34,6 +34,9 @@ def __post_init__(self) -> None:
if self.available_resources == Resources.create_as_empty():
object.__setattr__(self, "available_resources", self.ec2_instance.resources)

def has_assigned_tasks(self) -> bool:
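# anything running on the node shows up as consumed capacity, i.e. available_resources < ec2_instance.resources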
return bool(self.available_resources < self.ec2_instance.resources)


@dataclass(frozen=True, kw_only=True, slots=True)
class AssociatedInstance(_BaseInstance):
@@ -751,23 +751,61 @@ async def _scale_up_cluster(
return cluster


async def _find_drainable_nodes(
app: FastAPI, cluster: Cluster
) -> list[AssociatedInstance]:
app_settings: ApplicationSettings = app.state.settings
assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec

if not cluster.active_nodes:
# there is nothing to drain here
return []

# get the corresponding ec2 instance data
drainable_nodes: list[AssociatedInstance] = []

for instance in cluster.active_nodes:
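# a node that still has assigned tasks is not a candidate: clear any previous empty marker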
if instance.has_assigned_tasks():
await utils_docker.set_node_found_empty(
get_docker_client(app), instance.node, empty=False
)
continue
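# from here on the node is empty: stamp it the first time it is seen empty, then wait out the delay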
node_last_empty = await utils_docker.get_node_empty_since(instance.node)
if not node_last_empty:
await utils_docker.set_node_found_empty(
get_docker_client(app), instance.node, empty=True
)
continue
elapsed_time_since_empty = arrow.utcnow().datetime - node_last_empty
_logger.debug("%s", f"{node_last_empty=}, {elapsed_time_since_empty=}")
if (
elapsed_time_since_empty
> app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_TIME_BEFORE_DRAINING
):
drainable_nodes.append(instance)
else:
_logger.info(
"%s has still %ss before being drainable",
f"{instance.ec2_instance.id=}",
f"{(app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_TIME_BEFORE_DRAINING - elapsed_time_since_empty).total_seconds():.0f}",
)

if drainable_nodes:
_logger.info(
"the following nodes were found to be drainable: '%s'",
f"{[instance.node.Description.Hostname for instance in drainable_nodes if instance.node.Description]}",
)
return drainable_nodes
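
Read together, the loop above implements a simple per-node rule; restated as a stand-alone helper (a hedged sketch, not part of the commit, with made-up names empty_since and delay):

import datetime

import arrow


def _node_is_drainable(empty_since: datetime.datetime | None, delay: datetime.timedelta) -> bool:
    # a node becomes drainable only once it has been continuously empty
    # (no assigned tasks) for longer than EC2_INSTANCES_TIME_BEFORE_DRAINING
    if empty_since is None:
        return False
    return (arrow.utcnow().datetime - empty_since) > delay

In other words, the first autoscaling iteration that finds a node empty only tags it with the empty-since label; the node is actually drained on a later iteration, once the configured delay has elapsed.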


async def _deactivate_empty_nodes(app: FastAPI, cluster: Cluster) -> Cluster:
app_settings = get_application_settings(app)
docker_client = get_docker_client(app)
active_empty_instances: list[AssociatedInstance] = []
active_non_empty_instances: list[AssociatedInstance] = []
for instance in cluster.active_nodes:
if instance.available_resources == instance.ec2_instance.resources:
active_empty_instances.append(instance)
else:
active_non_empty_instances.append(instance)
active_empty_instances = await _find_drainable_nodes(app, cluster)

if not active_empty_instances:
return cluster
_logger.info(
"following nodes will be drained: '%s'",
f"{[instance.node.Description.Hostname for instance in active_empty_instances if instance.node.Description]}",
)

# drain these empty nodes
updated_nodes: list[Node] = await asyncio.gather(
*(
@@ -782,7 +820,7 @@ async def _deactivate_empty_nodes(app: FastAPI, cluster: Cluster) -> Cluster:
)
if updated_nodes:
_logger.info(
"following nodes set to drain: '%s'",
"following nodes were set to drain: '%s'",
f"{[node.Description.Hostname for node in updated_nodes if node.Description]}",
)
newly_drained_instances = [
@@ -791,7 +829,9 @@
]
return dataclasses.replace(
cluster,
active_nodes=active_non_empty_instances,
active_nodes=[
n for n in cluster.active_nodes if n not in active_empty_instances
],
drained_nodes=cluster.drained_nodes + newly_drained_instances,
)

@@ -814,7 +854,7 @@ async def _find_terminateable_instances(
elapsed_time_since_drained = (
datetime.datetime.now(datetime.timezone.utc) - node_last_updated
)
_logger.warning("%s", f"{node_last_updated=}, {elapsed_time_since_drained=}")
_logger.debug("%s", f"{node_last_updated=}, {elapsed_time_since_drained=}")
if (
elapsed_time_since_drained
> app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_TIME_BEFORE_TERMINATION
@@ -71,6 +71,11 @@
]


_OSPARC_NODE_EMPTY_DATETIME_LABEL_KEY: Final[DockerLabelKey] = parse_obj_as(
DockerLabelKey, "io.simcore.osparc-node-found-empty"
)


async def get_monitored_nodes(
docker_client: AutoscalingDocker, node_labels: list[DockerLabelKey]
) -> list[Node]:
@@ -609,6 +614,38 @@ def get_node_last_readyness_update(node: Node) -> datetime.datetime:
) # mypy


async def set_node_found_empty(
docker_client: AutoscalingDocker,
node: Node,
*,
empty: bool,
) -> Node:
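# records (empty=True) or clears (empty=False) the timestamp at which the node was found without assigned tasks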
assert node.Spec # nosec
new_tags = deepcopy(cast(dict[DockerLabelKey, str], node.Spec.Labels))
if empty:
new_tags[_OSPARC_NODE_EMPTY_DATETIME_LABEL_KEY] = arrow.utcnow().isoformat()
else:
new_tags.pop(_OSPARC_NODE_EMPTY_DATETIME_LABEL_KEY, None)
return await tag_node(
docker_client,
node,
tags=new_tags,
available=bool(node.Spec.Availability is Availability.active),
)


async def get_node_empty_since(node: Node) -> datetime.datetime | None:
"""returns the last time when the node was found empty or None if it was not empty"""
assert node.Spec # nosec
assert node.Spec.Labels # nosec
if _OSPARC_NODE_EMPTY_DATETIME_LABEL_KEY not in node.Spec.Labels:
return None
return cast(
datetime.datetime,
arrow.get(node.Spec.Labels[_OSPARC_NODE_EMPTY_DATETIME_LABEL_KEY]).datetime,
) # mypy
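
These two helpers round-trip a timestamp through a node label; an illustrative snippet (not part of the commit) of the values involved:

import arrow

# what set_node_found_empty(..., empty=True) stores in the node label
label_value = arrow.utcnow().isoformat()
# what get_node_empty_since(node) returns: a timezone-aware datetime
empty_since = arrow.get(label_value).datetime
# what _find_drainable_nodes compares against EC2_INSTANCES_TIME_BEFORE_DRAINING
elapsed = arrow.utcnow().datetime - empty_since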


async def attach_node(
app_settings: ApplicationSettings,
docker_client: AutoscalingDocker,
1 change: 1 addition & 0 deletions services/autoscaling/tests/manual/.env-devel
@@ -8,6 +8,7 @@ AUTOSCALING_EC2_REGION_NAME=us-east-1
AUTOSCALING_REMOTE_DEBUGGING_PORT=3000
EC2_INSTANCES_MACHINES_BUFFER=0
EC2_INSTANCES_MAX_INSTANCES=20
EC2_INSTANCES_TIME_BEFORE_DRAINING="00:00:10"
EC2_INSTANCES_TIME_BEFORE_TERMINATION="00:03:00"
EC2_INSTANCES_ALLOWED_TYPES='{"t2.micro": {"ami_id": "XXXXXXXX", "custom_boot_scripts": ["whoami"], "pre_pull_images": ["ubuntu:latest"]}}'
EC2_INSTANCES_KEY_NAME=XXXXXXXXXX
20 changes: 20 additions & 0 deletions services/autoscaling/tests/unit/test_core_settings.py
@@ -88,6 +88,26 @@ def test_defining_both_computational_and_dynamic_modes_is_invalid_and_raises(
ApplicationSettings.create_from_envs()


def test_invalid_EC2_INSTANCES_TIME_BEFORE_DRAINING( # noqa: N802
app_environment: EnvVarsDict, monkeypatch: pytest.MonkeyPatch
):
setenvs_from_dict(monkeypatch, {"EC2_INSTANCES_TIME_BEFORE_DRAINING": "1:05:00"})
settings = ApplicationSettings.create_from_envs()
assert settings.AUTOSCALING_EC2_INSTANCES
assert settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_TIME_BEFORE_DRAINING
assert (
datetime.timedelta(minutes=1)
== settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_TIME_BEFORE_DRAINING
)
setenvs_from_dict(monkeypatch, {"EC2_INSTANCES_TIME_BEFORE_DRAINING": "-1:05:00"})
settings = ApplicationSettings.create_from_envs()
assert settings.AUTOSCALING_EC2_INSTANCES
assert (
datetime.timedelta(seconds=10)
== settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_TIME_BEFORE_DRAINING
)


def test_invalid_EC2_INSTANCES_TIME_BEFORE_TERMINATION( # noqa: N802
app_environment: EnvVarsDict, monkeypatch: pytest.MonkeyPatch
):
@@ -44,6 +44,7 @@
from simcore_service_autoscaling.modules.docker import get_docker_client
from simcore_service_autoscaling.modules.ec2 import SimcoreEC2API
from simcore_service_autoscaling.utils.utils_docker import (
_OSPARC_NODE_EMPTY_DATETIME_LABEL_KEY,
_OSPARC_SERVICE_READY_LABEL_KEY,
_OSPARC_SERVICES_READY_DATETIME_LABEL_KEY,
)
@@ -581,7 +582,40 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915
# 4. now scaling down, as we deleted all the tasks
#
del dask_future
await auto_scale_cluster(app=initialized_app, auto_scaling_mode=auto_scaling_mode)
mock_dask_is_worker_connected.assert_called_once()
mock_dask_is_worker_connected.reset_mock()
mock_dask_get_worker_has_results_in_memory.assert_called()
assert mock_dask_get_worker_has_results_in_memory.call_count == 2
mock_dask_get_worker_has_results_in_memory.reset_mock()
mock_dask_get_worker_used_resources.assert_called()
assert mock_dask_get_worker_used_resources.call_count == 2
mock_dask_get_worker_used_resources.reset_mock()
# the node shall still be waiting before being drained
mock_docker_set_node_availability.assert_not_called()
mock_docker_tag_node.assert_called_once_with(
get_docker_client(initialized_app),
fake_attached_node,
tags=fake_attached_node.Spec.Labels
| {
_OSPARC_NODE_EMPTY_DATETIME_LABEL_KEY: mock.ANY,
},
available=True,
)
mock_docker_tag_node.reset_mock()

# now update the fake node to have the required label as expected
assert app_settings.AUTOSCALING_EC2_INSTANCES
fake_attached_node.Spec.Labels[_OSPARC_NODE_EMPTY_DATETIME_LABEL_KEY] = (
arrow.utcnow()
.shift(
seconds=-app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_TIME_BEFORE_DRAINING.total_seconds()
- 1
)
.datetime.isoformat()
)

# now it will drain
await auto_scale_cluster(app=initialized_app, auto_scaling_mode=auto_scaling_mode)
mock_dask_is_worker_connected.assert_called_once()
mock_dask_is_worker_connected.reset_mock()
@@ -598,6 +632,7 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915
fake_attached_node,
tags=fake_attached_node.Spec.Labels
| {
_OSPARC_NODE_EMPTY_DATETIME_LABEL_KEY: mock.ANY,
_OSPARC_SERVICE_READY_LABEL_KEY: "false",
_OSPARC_SERVICES_READY_DATETIME_LABEL_KEY: mock.ANY,
},