✨Autoscaling: EBS-backed buffer, label EC2 machines with prepulled images list (#6097)
sanderegg authored Jul 31, 2024
1 parent e97b4e5 commit 56764b3
Showing 8 changed files with 602 additions and 270 deletions.
28 changes: 26 additions & 2 deletions packages/pytest-simcore/src/pytest_simcore/helpers/aws_ec2.py
@@ -1,9 +1,11 @@
import base64
from typing import Sequence

from models_library.docker import DockerGenericTag
from models_library.utils.json_serialization import json_dumps
from types_aiobotocore_ec2 import EC2Client
from types_aiobotocore_ec2.literals import InstanceStateNameType, InstanceTypeType
from types_aiobotocore_ec2.type_defs import FilterTypeDef, InstanceTypeDef
from types_aiobotocore_ec2.type_defs import FilterTypeDef, InstanceTypeDef, TagTypeDef


async def assert_autoscaled_computational_ec2_instances(
@@ -63,6 +65,7 @@ async def assert_autoscaled_dynamic_warm_pools_ec2_instances(
expected_instance_type: InstanceTypeType,
expected_instance_state: InstanceStateNameType,
expected_additional_tag_keys: list[str],
expected_pre_pulled_images: list[DockerGenericTag] | None,
instance_filters: Sequence[FilterTypeDef] | None,
) -> list[InstanceTypeDef]:
return await assert_ec2_instances(
@@ -77,6 +80,7 @@ async def assert_autoscaled_dynamic_warm_pools_ec2_instances(
"io.simcore.autoscaling.buffer_machine",
*expected_additional_tag_keys,
],
expected_pre_pulled_images=expected_pre_pulled_images,
expected_user_data=[],
instance_filters=instance_filters,
)
@@ -91,6 +95,7 @@ async def assert_ec2_instances(
expected_instance_state: InstanceStateNameType,
expected_instance_tag_keys: list[str],
expected_user_data: list[str],
expected_pre_pulled_images: list[DockerGenericTag] | None = None,
instance_filters: Sequence[FilterTypeDef] | None = None,
) -> list[InstanceTypeDef]:
list_instances: list[InstanceTypeDef] = []
@@ -112,9 +117,28 @@
"Name",
}
instance_tag_keys = {tag["Key"] for tag in instance["Tags"] if "Key" in tag}

assert instance_tag_keys == expected_tag_keys

if expected_pre_pulled_images is None:
assert (
"io.simcore.autoscaling.pre_pulled_images" not in instance_tag_keys
)
else:
assert "io.simcore.autoscaling.pre_pulled_images" in instance_tag_keys

def _by_pre_pull_image(ec2_tag: TagTypeDef) -> bool:
assert "Key" in ec2_tag
return ec2_tag["Key"] == "io.simcore.autoscaling.pre_pulled_images"

instance_pre_pulled_images_aws_tag = next(
iter(filter(_by_pre_pull_image, instance["Tags"]))
)
assert "Value" in instance_pre_pulled_images_aws_tag
assert (
instance_pre_pulled_images_aws_tag["Value"]
== f"{json_dumps(expected_pre_pulled_images)}"
)

assert "PrivateDnsName" in instance
instance_private_dns_name = instance["PrivateDnsName"]
assert instance_private_dns_name.endswith(".ec2.internal")
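Note on the new check: the pre-pulled images list travels as a single JSON string in the io.simcore.autoscaling.pre_pulled_images tag (written via set_instances_tags further below, read back by this assertion). A minimal sketch of that round trip, using only the json_dumps/json_loads helpers imported in this diff (the image names are made up):

from models_library.utils.json_serialization import json_dumps, json_loads

images = ["nginx:1.25", "itisfoundation/some-service:latest"]  # hypothetical image list
tag_value = json_dumps(images)  # value stored under "io.simcore.autoscaling.pre_pulled_images"
assert json_loads(tag_value) == images

Since AWS caps an EC2 tag value at 256 characters, this scheme assumes the serialized list stays short.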
16 changes: 13 additions & 3 deletions services/autoscaling/src/simcore_service_autoscaling/models.py
@@ -52,7 +52,7 @@ class NonAssociatedInstance(_BaseInstance):


@dataclass(frozen=True, kw_only=True, slots=True)
class Cluster:
class Cluster: # pylint: disable=too-many-instance-attributes
active_nodes: list[AssociatedInstance] = field(
metadata={
"description": "This is a EC2-backed docker node which is active and ready to receive tasks (or with running tasks)"
@@ -83,6 +83,11 @@ class Cluster:
"description": "This is an existing EC2 instance that never properly joined the cluster and is deemed as broken and will be terminated"
}
)
buffer_ec2s: list[NonAssociatedInstance] = field(
metadata={
"description": "This is a prepared stopped EC2 instance, not yet associated to a docker node, ready to be used"
}
)
disconnected_nodes: list[Node] = field(
metadata={
"description": "This is a docker node which is not backed by a running EC2 instance"
@@ -126,8 +131,9 @@ def _get_instance_ids(
f"pending-nodes: count={len(self.pending_nodes)} {_get_instance_ids(self.pending_nodes)}, "
f"drained-nodes: count={len(self.drained_nodes)} {_get_instance_ids(self.drained_nodes)}, "
f"reserve-drained-nodes: count={len(self.reserve_drained_nodes)} {_get_instance_ids(self.reserve_drained_nodes)}, "
f"pending-ec2-instances: count={len(self.pending_ec2s)} {_get_instance_ids(self.pending_ec2s)}, "
f"broken-ec2-instances: count={len(self.broken_ec2s)} {_get_instance_ids(self.broken_ec2s)}, "
f"pending-ec2s: count={len(self.pending_ec2s)} {_get_instance_ids(self.pending_ec2s)}, "
f"broken-ec2s: count={len(self.broken_ec2s)} {_get_instance_ids(self.broken_ec2s)}, "
f"buffer-ec2s: count={len(self.buffer_ec2s)} {_get_instance_ids(self.buffer_ec2s)}, "
f"disconnected-nodes: count={len(self.disconnected_nodes)}, "
f"terminating-nodes: count={len(self.terminating_nodes)} {_get_instance_ids(self.terminating_nodes)}, "
)
@@ -177,6 +183,10 @@ def _sort_by_readyness(
else:
yield from order

def pre_pulled_instances(self) -> set[EC2InstanceData]:
"""returns all the instances that completed image pre pulling"""
return self.ready_instances.union(self.stopping_instances)

def all_instances(self) -> set[EC2InstanceData]:
"""sorted by importance: READY (stopped) > STOPPING >"""
gen = self._sort_by_readyness()
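For intuition, a toy sketch of what pre_pulled_instances() computes; the real BufferPool tracks EC2InstanceData across more states, and only the two sets involved here are modeled:

from dataclasses import dataclass, field

@dataclass
class _ToyBufferPool:
    ready_instances: set[str] = field(default_factory=set)     # stopped, images pre-pulled
    stopping_instances: set[str] = field(default_factory=set)  # pre-pulled, shutting down

    def pre_pulled_instances(self) -> set[str]:
        # pre-pulling is complete once an instance is READY or already STOPPING
        return self.ready_instances.union(self.stopping_instances)

pool = _ToyBufferPool(ready_instances={"i-0aaa"}, stopping_instances={"i-0bbb"})
assert pool.pre_pulled_instances() == {"i-0aaa", "i-0bbb"}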
@@ -42,6 +42,7 @@
node_host_name_from_ec2_private_dns,
sort_drained_nodes,
)
from ..utils.buffer_machines_pool_core import get_buffer_ec2_tags
from ..utils.rabbitmq import post_autoscaling_status_message
from .auto_scaling_mode_base import BaseAutoscaling
from .docker import get_docker_client
@@ -79,6 +80,12 @@ async def _analyze_current_cluster(
state_names=["terminated"],
)

buffer_ec2_instances = await get_ec2_client(app).get_instances(
key_names=[app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_KEY_NAME],
tags=get_buffer_ec2_tags(app, auto_scaling_mode),
state_names=["stopped"],
)

attached_ec2s, pending_ec2s = await associate_ec2_instances_with_nodes(
docker_nodes, existing_ec2_instances
)
@@ -134,6 +141,9 @@ async def _analyze_current_cluster(
reserve_drained_nodes=reserve_drained_nodes,
pending_ec2s=[NonAssociatedInstance(ec2_instance=i) for i in pending_ec2s],
broken_ec2s=[NonAssociatedInstance(ec2_instance=i) for i in broken_ec2s],
buffer_ec2s=[
NonAssociatedInstance(ec2_instance=i) for i in buffer_ec2_instances
],
terminating_nodes=terminating_nodes,
terminated_instances=terminated_ec2_instances,
disconnected_nodes=[n for n in docker_nodes if _node_not_ready(n)],
@@ -24,44 +24,35 @@
EC2InstanceConfig,
EC2InstanceData,
EC2InstanceType,
EC2Tags,
Resources,
)
from fastapi import FastAPI
from models_library.utils.json_serialization import json_dumps, json_loads
from pydantic import NonNegativeInt, parse_obj_as
from servicelib.logging_utils import log_context
from types_aiobotocore_ec2.literals import InstanceTypeType

from ..core.settings import get_application_settings
from ..models import BufferPool, BufferPoolManager
from ..utils.auto_scaling_core import ec2_buffer_startup_script
from ..utils.buffer_machines_pool_core import get_buffer_ec2_tags
from .auto_scaling_mode_base import BaseAutoscaling
from .ec2 import get_ec2_client
from .ssm import get_ssm_client

_BUFFER_MACHINE_TAG_KEY: Final[AWSTagKey] = parse_obj_as(
AWSTagKey, "io.simcore.autoscaling.buffer_machine"
)
_BUFFER_MACHINE_EC2_TAGS: EC2Tags = {
_BUFFER_MACHINE_TAG_KEY: parse_obj_as(AWSTagValue, "true")
}
_BUFFER_MACHINE_PULLING_EC2_TAG_KEY: Final[AWSTagKey] = parse_obj_as(
AWSTagKey, "pulling"
)
_BUFFER_MACHINE_PULLING_COMMAND_ID_EC2_TAG_KEY: Final[AWSTagKey] = parse_obj_as(
AWSTagKey, "ssm-command-id"
)
_PREPULL_COMMAND_NAME: Final[str] = "docker images pulling"

_logger = logging.getLogger(__name__)
_PRE_PULLED_IMAGES_EC2_TAG_KEY: Final[AWSTagKey] = parse_obj_as(
AWSTagKey, "io.simcore.autoscaling.pre_pulled_images"
)


def _get_buffer_ec2_tags(app: FastAPI, auto_scaling_mode: BaseAutoscaling) -> EC2Tags:
base_ec2_tags = auto_scaling_mode.get_ec2_tags(app) | _BUFFER_MACHINE_EC2_TAGS
base_ec2_tags[AWSTagKey("Name")] = AWSTagValue(
f"{base_ec2_tags[AWSTagKey('Name')]}-buffer"
)
return base_ec2_tags
_logger = logging.getLogger(__name__)


async def _analyse_current_state(
@@ -73,7 +64,7 @@ async def _analyse_current_state(

all_buffer_instances = await ec2_client.get_instances(
key_names=[app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_KEY_NAME],
tags=_get_buffer_ec2_tags(app, auto_scaling_mode),
tags=get_buffer_ec2_tags(app, auto_scaling_mode),
state_names=["stopped", "pending", "running", "stopping"],
)

@@ -152,6 +143,41 @@ async def _terminate_unneeded_pools(
return buffers_manager


async def _terminate_instances_with_invalid_pre_pulled_images(
app: FastAPI, buffers_manager: BufferPoolManager
) -> BufferPoolManager:
ec2_client = get_ec2_client(app)
app_settings = get_application_settings(app)
assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec
terminateable_instances = set()
for (
ec2_type,
ec2_boot_config,
) in app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES.items():
instance_type = cast(InstanceTypeType, ec2_type)
all_pre_pulled_instances = buffers_manager.buffer_pools[
instance_type
].pre_pulled_instances()

for instance in all_pre_pulled_instances:
if (
pre_pulled_images := json_loads(
instance.tags.get(_PRE_PULLED_IMAGES_EC2_TAG_KEY, "[]")
)
) and pre_pulled_images != ec2_boot_config.pre_pull_images:
_logger.info(
"%s",
f"{instance.id=} has invalid {pre_pulled_images=}, expected is {ec2_boot_config.pre_pull_images=}",
)
terminateable_instances.add(instance)

if terminateable_instances:
await ec2_client.terminate_instances(terminateable_instances)
for instance in terminateable_instances:
buffers_manager.buffer_pools[instance.type].remove_instance(instance)
return buffers_manager
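
The staleness check above reduces to one predicate: a non-empty pre_pulled_images tag that differs from the configured pre_pull_images marks the instance for termination. A self-contained sketch under that assumption (tag key literal from this diff, a plain dict standing in for EC2InstanceData.tags):

import json

_TAG_KEY = "io.simcore.autoscaling.pre_pulled_images"

def has_stale_pre_pulled_images(tags: dict[str, str], configured: list[str]) -> bool:
    pre_pulled = json.loads(tags.get(_TAG_KEY, "[]"))
    # a missing/empty tag is not stale: the instance simply has not pulled yet
    return bool(pre_pulled) and pre_pulled != configured

assert has_stale_pre_pulled_images({_TAG_KEY: '["nginx:1.24"]'}, ["nginx:1.25"])
assert not has_stale_pre_pulled_images({}, ["nginx:1.25"])
assert not has_stale_pre_pulled_images({_TAG_KEY: '["nginx:1.25"]'}, ["nginx:1.25"])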


async def _add_remove_buffer_instances(
app: FastAPI,
buffers_manager: BufferPoolManager,
@@ -162,6 +188,7 @@ async def _add_remove_buffer_instances(
app_settings = get_application_settings(app)
assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec

# let's find what is missing and what is not needed
missing_instances: dict[InstanceTypeType, NonNegativeInt] = defaultdict(int)
unneeded_instances: set[EC2InstanceData] = set()
for (
@@ -179,6 +206,7 @@ async def _add_remove_buffer_instances(
list(all_pool_instances)[ec2_boot_config.buffer_count :]
)
unneeded_instances = unneeded_instances.union(terminateable_instances)

for ec2_type, num_to_start in missing_instances.items():
ec2_boot_specific = (
app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES[ec2_type]
@@ -189,7 +217,7 @@ async def _add_remove_buffer_instances(
name=ec2_type,
resources=Resources.create_as_empty(), # fake resources
),
tags=_get_buffer_ec2_tags(app, auto_scaling_mode),
tags=get_buffer_ec2_tags(app, auto_scaling_mode),
startup_script=ec2_buffer_startup_script(
ec2_boot_specific, app_settings
),
@@ -218,7 +246,7 @@


async def _handle_pool_image_pulling(
app: FastAPI, pool: BufferPool
app: FastAPI, instance_type: InstanceTypeType, pool: BufferPool
) -> tuple[InstancesToStop, InstancesToTerminate]:
ec2_client = get_ec2_client(app)
ssm_client = get_ssm_client(app)
@@ -261,44 +289,35 @@ async def _handle_pool_image_pulling(
f"{ssm_command.status}: {ssm_command.message}",
)
broken_instances_to_terminate.add(instance)
if instances_to_stop:
app_settings = get_application_settings(app)
assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec
await ec2_client.set_instances_tags(
tuple(instances_to_stop),
tags={
_PRE_PULLED_IMAGES_EC2_TAG_KEY: AWSTagValue(
json_dumps(
app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES[
instance_type
].pre_pull_images
)
)
},
)
return instances_to_stop, broken_instances_to_terminate


async def monitor_buffer_machines(
app: FastAPI, *, auto_scaling_mode: BaseAutoscaling
async def _handle_image_pre_pulling(
app: FastAPI, buffers_manager: BufferPoolManager
) -> None:
"""Buffer machine creation works like so:
    1. an EC2 instance is created with an attached EBS volume, without auto pre-pulling and without auto-connecting to the swarm
    2. once running, an AWS SSM task is started to pull the necessary images in a controlled way
    3. once the task is completed, the EC2 instance is stopped and made available as a buffer EC2
    4. once needed, the buffer machine is started, and as soon as it is up an SSM task is sent to connect it to the swarm
    5. the usual autoscaling process then takes over
"""
ec2_client = get_ec2_client(app)
app_settings = get_application_settings(app)
    assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec

# 1. Analyze the current state by type
buffers_manager = await _analyse_current_state(
app, auto_scaling_mode=auto_scaling_mode
)
    # 2. Terminate unneeded warm pools (e.g. if the user changed the allowed instance types)
buffers_manager = await _terminate_unneeded_pools(app, buffers_manager)

    # 3. add/remove buffer instances as needed, based on the configured buffer counts
buffers_manager = await _add_remove_buffer_instances(
app, buffers_manager, auto_scaling_mode=auto_scaling_mode
)

# 4. pull docker images if needed
instances_to_stop: set[EC2InstanceData] = set()
broken_instances_to_terminate: set[EC2InstanceData] = set()
for pool in buffers_manager.buffer_pools.values():
for instance_type, pool in buffers_manager.buffer_pools.items():
(
pool_instances_to_stop,
pool_instances_to_terminate,
) = await _handle_pool_image_pulling(app, pool)
) = await _handle_pool_image_pulling(app, instance_type, pool)
instances_to_stop.update(pool_instances_to_stop)
broken_instances_to_terminate.update(pool_instances_to_terminate)
# 5. now stop and terminate if necessary
@@ -322,3 +341,38 @@ async def monitor_buffer_machines(
_logger, logging.WARNING, "broken buffer instances, terminating them"
):
await ec2_client.terminate_instances(broken_instances_to_terminate)


async def monitor_buffer_machines(
app: FastAPI, *, auto_scaling_mode: BaseAutoscaling
) -> None:
"""Buffer machine creation works like so:
    1. an EC2 instance is created with an attached EBS volume, without auto pre-pulling and without auto-connecting to the swarm
    2. once running, an AWS SSM task is started to pull the necessary images in a controlled way
    3. once the task is completed, the EC2 instance is stopped and made available as a buffer EC2
    4. once needed, the buffer machine is started, and as soon as it is up an SSM task is sent to connect it to the swarm
    5. the usual autoscaling process then takes over
"""

app_settings = get_application_settings(app)
    assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec

# 1. Analyze the current state by type
buffers_manager = await _analyse_current_state(
app, auto_scaling_mode=auto_scaling_mode
)
# 2. Terminate unneeded warm pools (e.g. if the user changed the allowed instance types)
buffers_manager = await _terminate_unneeded_pools(app, buffers_manager)

buffers_manager = await _terminate_instances_with_invalid_pre_pulled_images(
app, buffers_manager
)

    # 3. add/remove buffer instances based on EC2 boot-specific data
buffers_manager = await _add_remove_buffer_instances(
app, buffers_manager, auto_scaling_mode=auto_scaling_mode
)

# 4. pull docker images if needed
await _handle_image_pre_pulling(app, buffers_manager)
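
How monitor_buffer_machines is scheduled is not part of this diff; one plausible wiring is a periodic asyncio task, sketched here with an assumed interval and no error handling:

import asyncio

async def _run_buffer_monitor_forever(app, auto_scaling_mode, interval_s: float = 60.0) -> None:
    # hypothetical driver loop; the real service wires this into its own task framework
    while True:
        await monitor_buffer_machines(app, auto_scaling_mode=auto_scaling_mode)
        await asyncio.sleep(interval_s)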