From 56764b36f489efea491da556991404c8c18223a9 Mon Sep 17 00:00:00 2001 From: Sylvain <35365065+sanderegg@users.noreply.github.com> Date: Wed, 31 Jul 2024 13:36:52 +0200 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8Autoscaling:=20EBS-backed=20buffer,=20?= =?UTF-8?q?label=20EC2=20machines=20with=20prepulled=20images=20list=20(#6?= =?UTF-8?q?097)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/pytest_simcore/helpers/aws_ec2.py | 28 +- .../src/simcore_service_autoscaling/models.py | 16 +- .../modules/auto_scaling_core.py | 10 + .../modules/buffer_machines_pool_core.py | 146 ++++-- .../utils/buffer_machines_pool_core.py | 22 + services/autoscaling/tests/unit/conftest.py | 1 + .../unit/test_modules_auto_scaling_dynamic.py | 232 ++++++---- .../unit/test_modules_buffer_machine_core.py | 417 ++++++++++++------ 8 files changed, 602 insertions(+), 270 deletions(-) create mode 100644 services/autoscaling/src/simcore_service_autoscaling/utils/buffer_machines_pool_core.py diff --git a/packages/pytest-simcore/src/pytest_simcore/helpers/aws_ec2.py b/packages/pytest-simcore/src/pytest_simcore/helpers/aws_ec2.py index 8095a1df19e..5dc41b9c05f 100644 --- a/packages/pytest-simcore/src/pytest_simcore/helpers/aws_ec2.py +++ b/packages/pytest-simcore/src/pytest_simcore/helpers/aws_ec2.py @@ -1,9 +1,11 @@ import base64 from typing import Sequence +from models_library.docker import DockerGenericTag +from models_library.utils.json_serialization import json_dumps from types_aiobotocore_ec2 import EC2Client from types_aiobotocore_ec2.literals import InstanceStateNameType, InstanceTypeType -from types_aiobotocore_ec2.type_defs import FilterTypeDef, InstanceTypeDef +from types_aiobotocore_ec2.type_defs import FilterTypeDef, InstanceTypeDef, TagTypeDef async def assert_autoscaled_computational_ec2_instances( @@ -63,6 +65,7 @@ async def assert_autoscaled_dynamic_warm_pools_ec2_instances( expected_instance_type: InstanceTypeType, expected_instance_state: InstanceStateNameType, expected_additional_tag_keys: list[str], + expected_pre_pulled_images: list[DockerGenericTag] | None, instance_filters: Sequence[FilterTypeDef] | None, ) -> list[InstanceTypeDef]: return await assert_ec2_instances( @@ -77,6 +80,7 @@ async def assert_autoscaled_dynamic_warm_pools_ec2_instances( "io.simcore.autoscaling.buffer_machine", *expected_additional_tag_keys, ], + expected_pre_pulled_images=expected_pre_pulled_images, expected_user_data=[], instance_filters=instance_filters, ) @@ -91,6 +95,7 @@ async def assert_ec2_instances( expected_instance_state: InstanceStateNameType, expected_instance_tag_keys: list[str], expected_user_data: list[str], + expected_pre_pulled_images: list[DockerGenericTag] | None = None, instance_filters: Sequence[FilterTypeDef] | None = None, ) -> list[InstanceTypeDef]: list_instances: list[InstanceTypeDef] = [] @@ -112,9 +117,28 @@ async def assert_ec2_instances( "Name", } instance_tag_keys = {tag["Key"] for tag in instance["Tags"] if "Key" in tag} - assert instance_tag_keys == expected_tag_keys + if expected_pre_pulled_images is None: + assert ( + "io.simcore.autoscaling.pre_pulled_images" not in instance_tag_keys + ) + else: + assert "io.simcore.autoscaling.pre_pulled_images" in instance_tag_keys + + def _by_pre_pull_image(ec2_tag: TagTypeDef) -> bool: + assert "Key" in ec2_tag + return ec2_tag["Key"] == "io.simcore.autoscaling.pre_pulled_images" + + instance_pre_pulled_images_aws_tag = next( + iter(filter(_by_pre_pull_image, instance["Tags"])) + ) + assert "Value" in 
instance_pre_pulled_images_aws_tag + assert ( + instance_pre_pulled_images_aws_tag["Value"] + == f"{json_dumps(expected_pre_pulled_images)}" + ) + assert "PrivateDnsName" in instance instance_private_dns_name = instance["PrivateDnsName"] assert instance_private_dns_name.endswith(".ec2.internal") diff --git a/services/autoscaling/src/simcore_service_autoscaling/models.py b/services/autoscaling/src/simcore_service_autoscaling/models.py index f1e88c0fd03..7ebecd51459 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/models.py +++ b/services/autoscaling/src/simcore_service_autoscaling/models.py @@ -52,7 +52,7 @@ class NonAssociatedInstance(_BaseInstance): @dataclass(frozen=True, kw_only=True, slots=True) -class Cluster: +class Cluster: # pylint: disable=too-many-instance-attributes active_nodes: list[AssociatedInstance] = field( metadata={ "description": "This is a EC2-backed docker node which is active and ready to receive tasks (or with running tasks)" @@ -83,6 +83,11 @@ class Cluster: "description": "This is an existing EC2 instance that never properly joined the cluster and is deemed as broken and will be terminated" } ) + buffer_ec2s: list[NonAssociatedInstance] = field( + metadata={ + "description": "This is a prepared stopped EC2 instance, not yet associated to a docker node, ready to be used" + } + ) disconnected_nodes: list[Node] = field( metadata={ "description": "This is a docker node which is not backed by a running EC2 instance" @@ -126,8 +131,9 @@ def _get_instance_ids( f"pending-nodes: count={len(self.pending_nodes)} {_get_instance_ids(self.pending_nodes)}, " f"drained-nodes: count={len(self.drained_nodes)} {_get_instance_ids(self.drained_nodes)}, " f"reserve-drained-nodes: count={len(self.reserve_drained_nodes)} {_get_instance_ids(self.reserve_drained_nodes)}, " - f"pending-ec2-instances: count={len(self.pending_ec2s)} {_get_instance_ids(self.pending_ec2s)}, " - f"broken-ec2-instances: count={len(self.broken_ec2s)} {_get_instance_ids(self.broken_ec2s)}, " + f"pending-ec2s: count={len(self.pending_ec2s)} {_get_instance_ids(self.pending_ec2s)}, " + f"broken-ec2s: count={len(self.broken_ec2s)} {_get_instance_ids(self.broken_ec2s)}, " + f"buffer-ec2s: count={len(self.buffer_ec2s)} {_get_instance_ids(self.buffer_ec2s)}, " f"disconnected-nodes: count={len(self.disconnected_nodes)}, " f"terminating-nodes: count={len(self.terminating_nodes)} {_get_instance_ids(self.terminating_nodes)}, " ) @@ -177,6 +183,10 @@ def _sort_by_readyness( else: yield from order + def pre_pulled_instances(self) -> set[EC2InstanceData]: + """returns all the instances that completed image pre pulling""" + return self.ready_instances.union(self.stopping_instances) + def all_instances(self) -> set[EC2InstanceData]: """sorted by importance: READY (stopped) > STOPPING >""" gen = self._sort_by_readyness() diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py index 2ace2e1d4a9..4063e27c314 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py @@ -42,6 +42,7 @@ node_host_name_from_ec2_private_dns, sort_drained_nodes, ) +from ..utils.buffer_machines_pool_core import get_buffer_ec2_tags from ..utils.rabbitmq import post_autoscaling_status_message from .auto_scaling_mode_base import BaseAutoscaling from .docker import get_docker_client @@ -79,6 +80,12 @@ 
async def _analyze_current_cluster( state_names=["terminated"], ) + buffer_ec2_instances = await get_ec2_client(app).get_instances( + key_names=[app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_KEY_NAME], + tags=get_buffer_ec2_tags(app, auto_scaling_mode), + state_names=["stopped"], + ) + attached_ec2s, pending_ec2s = await associate_ec2_instances_with_nodes( docker_nodes, existing_ec2_instances ) @@ -134,6 +141,9 @@ async def _analyze_current_cluster( reserve_drained_nodes=reserve_drained_nodes, pending_ec2s=[NonAssociatedInstance(ec2_instance=i) for i in pending_ec2s], broken_ec2s=[NonAssociatedInstance(ec2_instance=i) for i in broken_ec2s], + buffer_ec2s=[ + NonAssociatedInstance(ec2_instance=i) for i in buffer_ec2_instances + ], terminating_nodes=terminating_nodes, terminated_instances=terminated_ec2_instances, disconnected_nodes=[n for n in docker_nodes if _node_not_ready(n)], diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/buffer_machines_pool_core.py b/services/autoscaling/src/simcore_service_autoscaling/modules/buffer_machines_pool_core.py index b42585b5a29..45acd57853e 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/buffer_machines_pool_core.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/buffer_machines_pool_core.py @@ -24,10 +24,10 @@ EC2InstanceConfig, EC2InstanceData, EC2InstanceType, - EC2Tags, Resources, ) from fastapi import FastAPI +from models_library.utils.json_serialization import json_dumps, json_loads from pydantic import NonNegativeInt, parse_obj_as from servicelib.logging_utils import log_context from types_aiobotocore_ec2.literals import InstanceTypeType @@ -35,16 +35,11 @@ from ..core.settings import get_application_settings from ..models import BufferPool, BufferPoolManager from ..utils.auto_scaling_core import ec2_buffer_startup_script +from ..utils.buffer_machines_pool_core import get_buffer_ec2_tags from .auto_scaling_mode_base import BaseAutoscaling from .ec2 import get_ec2_client from .ssm import get_ssm_client -_BUFFER_MACHINE_TAG_KEY: Final[AWSTagKey] = parse_obj_as( - AWSTagKey, "io.simcore.autoscaling.buffer_machine" -) -_BUFFER_MACHINE_EC2_TAGS: EC2Tags = { - _BUFFER_MACHINE_TAG_KEY: parse_obj_as(AWSTagValue, "true") -} _BUFFER_MACHINE_PULLING_EC2_TAG_KEY: Final[AWSTagKey] = parse_obj_as( AWSTagKey, "pulling" ) @@ -52,16 +47,12 @@ AWSTagKey, "ssm-command-id" ) _PREPULL_COMMAND_NAME: Final[str] = "docker images pulling" - -_logger = logging.getLogger(__name__) +_PRE_PULLED_IMAGES_EC2_TAG_KEY: Final[AWSTagKey] = parse_obj_as( + AWSTagKey, "io.simcore.autoscaling.pre_pulled_images" +) -def _get_buffer_ec2_tags(app: FastAPI, auto_scaling_mode: BaseAutoscaling) -> EC2Tags: - base_ec2_tags = auto_scaling_mode.get_ec2_tags(app) | _BUFFER_MACHINE_EC2_TAGS - base_ec2_tags[AWSTagKey("Name")] = AWSTagValue( - f"{base_ec2_tags[AWSTagKey('Name')]}-buffer" - ) - return base_ec2_tags +_logger = logging.getLogger(__name__) async def _analyse_current_state( @@ -73,7 +64,7 @@ async def _analyse_current_state( all_buffer_instances = await ec2_client.get_instances( key_names=[app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_KEY_NAME], - tags=_get_buffer_ec2_tags(app, auto_scaling_mode), + tags=get_buffer_ec2_tags(app, auto_scaling_mode), state_names=["stopped", "pending", "running", "stopping"], ) @@ -152,6 +143,41 @@ async def _terminate_unneeded_pools( return buffers_manager +async def _terminate_instances_with_invalid_pre_pulled_images( + app: FastAPI, buffers_manager: BufferPoolManager +) 
-> BufferPoolManager: + ec2_client = get_ec2_client(app) + app_settings = get_application_settings(app) + assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec + terminateable_instances = set() + for ( + ec2_type, + ec2_boot_config, + ) in app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES.items(): + instance_type = cast(InstanceTypeType, ec2_type) + all_pre_pulled_instances = buffers_manager.buffer_pools[ + instance_type + ].pre_pulled_instances() + + for instance in all_pre_pulled_instances: + if ( + pre_pulled_images := json_loads( + instance.tags.get(_PRE_PULLED_IMAGES_EC2_TAG_KEY, "[]") + ) + ) and pre_pulled_images != ec2_boot_config.pre_pull_images: + _logger.info( + "%s", + f"{instance.id=} has invalid {pre_pulled_images=}, expected is {ec2_boot_config.pre_pull_images=}", + ) + terminateable_instances.add(instance) + + if terminateable_instances: + await ec2_client.terminate_instances(terminateable_instances) + for instance in terminateable_instances: + buffers_manager.buffer_pools[instance.type].remove_instance(instance) + return buffers_manager + + async def _add_remove_buffer_instances( app: FastAPI, buffers_manager: BufferPoolManager, @@ -162,6 +188,7 @@ async def _add_remove_buffer_instances( app_settings = get_application_settings(app) assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec + # let's find what is missing and what is not needed missing_instances: dict[InstanceTypeType, NonNegativeInt] = defaultdict(int) unneeded_instances: set[EC2InstanceData] = set() for ( @@ -179,6 +206,7 @@ async def _add_remove_buffer_instances( list(all_pool_instances)[ec2_boot_config.buffer_count :] ) unneeded_instances = unneeded_instances.union(terminateable_instances) + for ec2_type, num_to_start in missing_instances.items(): ec2_boot_specific = ( app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES[ec2_type] @@ -189,7 +217,7 @@ async def _add_remove_buffer_instances( name=ec2_type, resources=Resources.create_as_empty(), # fake resources ), - tags=_get_buffer_ec2_tags(app, auto_scaling_mode), + tags=get_buffer_ec2_tags(app, auto_scaling_mode), startup_script=ec2_buffer_startup_script( ec2_boot_specific, app_settings ), @@ -218,7 +246,7 @@ async def _add_remove_buffer_instances( async def _handle_pool_image_pulling( - app: FastAPI, pool: BufferPool + app: FastAPI, instance_type: InstanceTypeType, pool: BufferPool ) -> tuple[InstancesToStop, InstancesToTerminate]: ec2_client = get_ec2_client(app) ssm_client = get_ssm_client(app) @@ -261,44 +289,35 @@ async def _handle_pool_image_pulling( f"{ssm_command.status}: {ssm_command.message}", ) broken_instances_to_terminate.add(instance) + if instances_to_stop: + app_settings = get_application_settings(app) + assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec + await ec2_client.set_instances_tags( + tuple(instances_to_stop), + tags={ + _PRE_PULLED_IMAGES_EC2_TAG_KEY: AWSTagValue( + json_dumps( + app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_ALLOWED_TYPES[ + instance_type + ].pre_pull_images + ) + ) + }, + ) return instances_to_stop, broken_instances_to_terminate -async def monitor_buffer_machines( - app: FastAPI, *, auto_scaling_mode: BaseAutoscaling +async def _handle_image_pre_pulling( + app: FastAPI, buffers_manager: BufferPoolManager ) -> None: - """Buffer machine creation works like so: - 1. a EC2 is created with an EBS attached volume wO auto prepulling and wO auto connect to swarm - 2. once running, a AWS SSM task is started to pull the necessary images in a controlled way - 3. 
once the task is completed, the EC2 is stopped and is made available as a buffer EC2
-    4. once needed the buffer machine is started, and as it is up a SSM task is sent to connect to the swarm,
-    5. the usual then happens
-    """
     ec2_client = get_ec2_client(app)
-    app_settings = get_application_settings(app)
-    assert app_settings.AUTOSCALING_EC2_INSTANCES  # nosec
-    assert app_settings.AUTOSCALING_EC2_INSTANCES  # nosec
-
-    # 1. Analyze the current state by type
-    buffers_manager = await _analyse_current_state(
-        app, auto_scaling_mode=auto_scaling_mode
-    )
-    # 2. Terminate unneded warm pools (e.g. if the user changed the allowed instance types)
-    buffers_manager = await _terminate_unneeded_pools(app, buffers_manager)
-
-    # 3 add/remove buffer instances if needed based on needed buffer counts
-    buffers_manager = await _add_remove_buffer_instances(
-        app, buffers_manager, auto_scaling_mode=auto_scaling_mode
-    )
-
-    # 4. pull docker images if needed
     instances_to_stop: set[EC2InstanceData] = set()
     broken_instances_to_terminate: set[EC2InstanceData] = set()
-    for pool in buffers_manager.buffer_pools.values():
+    for instance_type, pool in buffers_manager.buffer_pools.items():
         (
             pool_instances_to_stop,
             pool_instances_to_terminate,
-        ) = await _handle_pool_image_pulling(app, pool)
+        ) = await _handle_pool_image_pulling(app, instance_type, pool)
         instances_to_stop.update(pool_instances_to_stop)
         broken_instances_to_terminate.update(pool_instances_to_terminate)
     # 5. now stop and terminate if necessary
@@ -322,3 +341,38 @@
             _logger, logging.WARNING, "broken buffer instances, terminating them"
         ):
             await ec2_client.terminate_instances(broken_instances_to_terminate)
+
+
+async def monitor_buffer_machines(
+    app: FastAPI, *, auto_scaling_mode: BaseAutoscaling
+) -> None:
+    """Buffer machine creation works like so:
+    1. an EC2 instance is created with an attached EBS volume, without auto pre-pulling and without auto-connecting to the swarm
+    2. once running, an AWS SSM task is started to pull the necessary images in a controlled way
+    3. once the task is completed, the EC2 is stopped and is made available as a buffer EC2
+    4. when needed, the buffer machine is started, and once it is up an SSM task is sent to connect it to the swarm,
+    5. the usual autoscaling flow then takes over
+    """
+
+    app_settings = get_application_settings(app)
+    assert app_settings.AUTOSCALING_EC2_INSTANCES  # nosec
+    assert app_settings.AUTOSCALING_EC2_INSTANCES  # nosec
+
+    # 1. Analyze the current state by type
+    buffers_manager = await _analyse_current_state(
+        app, auto_scaling_mode=auto_scaling_mode
+    )
+    # 2. Terminate unneeded warm pools (e.g. if the user changed the allowed instance types)
+    buffers_manager = await _terminate_unneeded_pools(app, buffers_manager)
+
+    buffers_manager = await _terminate_instances_with_invalid_pre_pulled_images(
+        app, buffers_manager
+    )
+
+    # 3. add/remove buffer instances based on the EC2 boot-specific data
+    buffers_manager = await _add_remove_buffer_instances(
+        app, buffers_manager, auto_scaling_mode=auto_scaling_mode
+    )
+
+    # 4. 
pull docker images if needed + await _handle_image_pre_pulling(app, buffers_manager) diff --git a/services/autoscaling/src/simcore_service_autoscaling/utils/buffer_machines_pool_core.py b/services/autoscaling/src/simcore_service_autoscaling/utils/buffer_machines_pool_core.py new file mode 100644 index 00000000000..0fdb54783b7 --- /dev/null +++ b/services/autoscaling/src/simcore_service_autoscaling/utils/buffer_machines_pool_core.py @@ -0,0 +1,22 @@ +from typing import Final + +from aws_library.ec2.models import AWSTagKey, AWSTagValue, EC2Tags +from fastapi import FastAPI +from pydantic import parse_obj_as + +from ..modules.auto_scaling_mode_base import BaseAutoscaling + +_BUFFER_MACHINE_TAG_KEY: Final[AWSTagKey] = parse_obj_as( + AWSTagKey, "io.simcore.autoscaling.buffer_machine" +) +_BUFFER_MACHINE_EC2_TAGS: EC2Tags = { + _BUFFER_MACHINE_TAG_KEY: parse_obj_as(AWSTagValue, "true") +} + + +def get_buffer_ec2_tags(app: FastAPI, auto_scaling_mode: BaseAutoscaling) -> EC2Tags: + base_ec2_tags = auto_scaling_mode.get_ec2_tags(app) | _BUFFER_MACHINE_EC2_TAGS + base_ec2_tags[AWSTagKey("Name")] = AWSTagValue( + f"{base_ec2_tags[AWSTagKey('Name')]}-buffer" + ) + return base_ec2_tags diff --git a/services/autoscaling/tests/unit/conftest.py b/services/autoscaling/tests/unit/conftest.py index cb6cc7f1574..51a2c8f27dd 100644 --- a/services/autoscaling/tests/unit/conftest.py +++ b/services/autoscaling/tests/unit/conftest.py @@ -735,6 +735,7 @@ def _creator(**cluter_overrides) -> Cluster: reserve_drained_nodes=[], pending_ec2s=[], broken_ec2s=[], + buffer_ec2s=[], disconnected_nodes=[], terminating_nodes=[], terminated_instances=[], diff --git a/services/autoscaling/tests/unit/test_modules_auto_scaling_dynamic.py b/services/autoscaling/tests/unit/test_modules_auto_scaling_dynamic.py index 816d48f5520..516172e03b2 100644 --- a/services/autoscaling/tests/unit/test_modules_auto_scaling_dynamic.py +++ b/services/autoscaling/tests/unit/test_modules_auto_scaling_dynamic.py @@ -59,7 +59,6 @@ ) from types_aiobotocore_ec2.client import EC2Client from types_aiobotocore_ec2.literals import InstanceTypeType -from types_aiobotocore_ec2.type_defs import InstanceTypeDef @pytest.fixture @@ -349,33 +348,19 @@ async def test_cluster_scaling_with_service_asking_for_too_much_resources_starts ) -@pytest.mark.acceptance_test() -@pytest.mark.parametrize( - "docker_service_imposed_ec2_type, docker_service_ram, expected_ec2_type", - [ - pytest.param( - None, - parse_obj_as(ByteSize, "128Gib"), - "r5n.4xlarge", - id="No explicit instance defined", - ), - pytest.param( - "t2.xlarge", - parse_obj_as(ByteSize, "4Gib"), - "t2.xlarge", - id="Explicitely ask for t2.xlarge", - ), - pytest.param( - "r5n.8xlarge", - parse_obj_as(ByteSize, "128Gib"), - "r5n.8xlarge", - id="Explicitely ask for r5n.8xlarge", - ), - ], -) -async def test_cluster_scaling_up_and_down( # noqa: PLR0915 - minimal_configuration: None, +@dataclass(frozen=True) +class _ScaleUpParams: + imposed_instance_type: str | None + service_resources: Resources + num_services: int + expected_instance_type: InstanceTypeType + expected_num_instances: int + + +async def _test_cluster_scaling_up_and_down( # noqa: PLR0915 + *, service_monitored_labels: dict[DockerLabelKey, str], + osparc_docker_label_keys: StandardSimcoreDockerLabels, app_settings: ApplicationSettings, initialized_app: FastAPI, create_service: Callable[ @@ -391,29 +376,41 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915 mock_docker_set_node_availability: mock.Mock, mock_compute_node_used_resources: 
mock.Mock, mocker: MockerFixture, - docker_service_imposed_ec2_type: InstanceTypeType | None, - docker_service_ram: ByteSize, - expected_ec2_type: InstanceTypeType, async_docker_client: aiodocker.Docker, with_drain_nodes_labelled: bool, ec2_instance_custom_tags: dict[str, str], + scale_up_params: _ScaleUpParams, ): # we have nothing running now all_instances = await ec2_client.describe_instances() assert not all_instances["Reservations"] - # create a service - docker_service = await create_service( - task_template | create_task_reservations(4, docker_service_ram), - service_monitored_labels, - "pending", - ( - [ - f"node.labels.{DOCKER_TASK_EC2_INSTANCE_TYPE_PLACEMENT_CONSTRAINT_KEY}=={ docker_service_imposed_ec2_type}" - ] - if docker_service_imposed_ec2_type - else [] - ), + assert ( + scale_up_params.expected_num_instances == 1 + ), "This test is not made to work with more than 1 expected instance. so please adapt if needed" + + # create the service(s) + created_docker_services = await asyncio.gather( + *( + create_service( + task_template + | create_task_reservations( + int(scale_up_params.service_resources.cpus), + scale_up_params.service_resources.ram, + ), + service_monitored_labels + | osparc_docker_label_keys.to_simcore_runtime_docker_labels(), + "pending", + ( + [ + f"node.labels.{DOCKER_TASK_EC2_INSTANCE_TYPE_PLACEMENT_CONSTRAINT_KEY}=={scale_up_params.imposed_instance_type}" + ] + if scale_up_params.imposed_instance_type + else [] + ), + ) + for _ in range(scale_up_params.num_services) + ) ) # this should trigger a scaling up as we have no nodes @@ -425,8 +422,8 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915 await assert_autoscaled_dynamic_ec2_instances( ec2_client, expected_num_reservations=1, - expected_num_instances=1, - expected_instance_type=expected_ec2_type, + expected_num_instances=scale_up_params.expected_num_instances, + expected_instance_type=scale_up_params.expected_instance_type, expected_instance_state="running", expected_additional_tag_keys=list(ec2_instance_custom_tags), ) @@ -442,7 +439,7 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915 app_settings, initialized_app, instances_running=0, - instances_pending=1, + instances_pending=scale_up_params.expected_num_instances, ) mock_rabbitmq_post_message.reset_mock() @@ -464,7 +461,9 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915 app_settings.AUTOSCALING_NODES_MONITORING.NODES_MONITORING_NODE_LABELS + app_settings.AUTOSCALING_NODES_MONITORING.NODES_MONITORING_NEW_NODES_LABELS ) - } | {DOCKER_TASK_EC2_INSTANCE_TYPE_PLACEMENT_CONSTRAINT_KEY: expected_ec2_type} + } | { + DOCKER_TASK_EC2_INSTANCE_TYPE_PLACEMENT_CONSTRAINT_KEY: scale_up_params.expected_instance_type + } fake_attached_node.Spec.Labels |= expected_docker_node_tags | { _OSPARC_SERVICE_READY_LABEL_KEY: "false" } @@ -531,15 +530,15 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915 mock_docker_set_node_availability.assert_not_called() # check the number of instances did not change and is still running - instances: list[InstanceTypeDef] = await assert_autoscaled_dynamic_ec2_instances( + instances = await assert_autoscaled_dynamic_ec2_instances( ec2_client, expected_num_reservations=1, - expected_num_instances=1, - expected_instance_type=expected_ec2_type, + expected_num_instances=scale_up_params.expected_num_instances, + expected_instance_type=scale_up_params.expected_instance_type, expected_instance_state="running", expected_additional_tag_keys=list(ec2_instance_custom_tags), ) - assert len(instances) == 
1 + assert len(instances) == scale_up_params.expected_num_instances assert "PrivateDnsName" in instances[0] internal_dns_name = instances[0]["PrivateDnsName"].removesuffix(".ec2.internal") @@ -551,8 +550,8 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915 mock_rabbitmq_post_message, app_settings, initialized_app, - nodes_total=1, - nodes_active=1, + nodes_total=scale_up_params.expected_num_instances, + nodes_active=scale_up_params.expected_num_instances, cluster_total_resources={ "cpus": fake_attached_node.Description.Resources.NanoCPUs / 1e9, "ram": fake_attached_node.Description.Resources.MemoryBytes, @@ -561,7 +560,7 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915 "cpus": float(0), "ram": 0, }, - instances_running=1, + instances_running=scale_up_params.expected_num_instances, ) mock_rabbitmq_post_message.reset_mock() @@ -597,8 +596,8 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915 await assert_autoscaled_dynamic_ec2_instances( ec2_client, expected_num_reservations=1, - expected_num_instances=1, - expected_instance_type=expected_ec2_type, + expected_num_instances=scale_up_params.expected_num_instances, + expected_instance_type=scale_up_params.expected_instance_type, expected_instance_state="running", expected_additional_tag_keys=list(ec2_instance_custom_tags), ) @@ -611,16 +610,21 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915 # # 4. now scaling down by removing the docker service # - assert docker_service.ID - await async_docker_client.services.delete(docker_service.ID) - # + await asyncio.gather( + *( + async_docker_client.services.delete(d.ID) + for d in created_docker_services + if d.ID + ) + ) + await auto_scale_cluster(app=initialized_app, auto_scaling_mode=auto_scaling_mode) # check the number of instances did not change and is still running await assert_autoscaled_dynamic_ec2_instances( ec2_client, expected_num_reservations=1, - expected_num_instances=1, - expected_instance_type=expected_ec2_type, + expected_num_instances=scale_up_params.expected_num_instances, + expected_instance_type=scale_up_params.expected_instance_type, expected_instance_state="running", expected_additional_tag_keys=list(ec2_instance_custom_tags), ) @@ -689,8 +693,8 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915 await assert_autoscaled_dynamic_ec2_instances( ec2_client, expected_num_reservations=1, - expected_num_instances=1, - expected_instance_type=expected_ec2_type, + expected_num_instances=scale_up_params.expected_num_instances, + expected_instance_type=scale_up_params.expected_instance_type, expected_instance_state="running", expected_additional_tag_keys=list(ec2_instance_custom_tags), ) @@ -719,8 +723,8 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915 await assert_autoscaled_dynamic_ec2_instances( ec2_client, expected_num_reservations=1, - expected_num_instances=1, - expected_instance_type=expected_ec2_type, + expected_num_instances=scale_up_params.expected_num_instances, + expected_instance_type=scale_up_params.expected_instance_type, expected_instance_state="running", expected_additional_tag_keys=list(ec2_instance_custom_tags), ) @@ -737,8 +741,8 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915 await assert_autoscaled_dynamic_ec2_instances( ec2_client, expected_num_reservations=1, - expected_num_instances=1, - expected_instance_type=expected_ec2_type, + expected_num_instances=scale_up_params.expected_num_instances, + expected_instance_type=scale_up_params.expected_instance_type, 
expected_instance_state="running", expected_additional_tag_keys=list(ec2_instance_custom_tags), ) @@ -770,20 +774,98 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915 await assert_autoscaled_dynamic_ec2_instances( ec2_client, expected_num_reservations=1, - expected_num_instances=1, - expected_instance_type=expected_ec2_type, + expected_num_instances=scale_up_params.expected_num_instances, + expected_instance_type=scale_up_params.expected_instance_type, expected_instance_state="terminated", expected_additional_tag_keys=list(ec2_instance_custom_tags), ) -@dataclass(frozen=True) -class _ScaleUpParams: - imposed_instance_type: str | None - service_resources: Resources - num_services: int - expected_instance_type: str - expected_num_instances: int +@pytest.mark.acceptance_test() +@pytest.mark.parametrize( + "scale_up_params", + [ + pytest.param( + _ScaleUpParams( + imposed_instance_type=None, + service_resources=Resources( + cpus=4, ram=parse_obj_as(ByteSize, "128Gib") + ), + num_services=1, + expected_instance_type="r5n.4xlarge", + expected_num_instances=1, + ), + id="No explicit instance defined", + ), + pytest.param( + _ScaleUpParams( + imposed_instance_type="t2.xlarge", + service_resources=Resources(cpus=4, ram=parse_obj_as(ByteSize, "4Gib")), + num_services=1, + expected_instance_type="t2.xlarge", + expected_num_instances=1, + ), + id="Explicitely ask for t2.xlarge", + ), + pytest.param( + _ScaleUpParams( + imposed_instance_type="r5n.8xlarge", + service_resources=Resources( + cpus=4, ram=parse_obj_as(ByteSize, "128Gib") + ), + num_services=1, + expected_instance_type="r5n.8xlarge", + expected_num_instances=1, + ), + id="Explicitely ask for r5n.8xlarge", + ), + ], +) +async def test_cluster_scaling_up_and_down( + minimal_configuration: None, + service_monitored_labels: dict[DockerLabelKey, str], + osparc_docker_label_keys: StandardSimcoreDockerLabels, + app_settings: ApplicationSettings, + initialized_app: FastAPI, + create_service: Callable[ + [dict[str, Any], dict[DockerLabelKey, str], str, list[str]], Awaitable[Service] + ], + task_template: dict[str, Any], + create_task_reservations: Callable[[int, int], dict[str, Any]], + ec2_client: EC2Client, + mock_docker_tag_node: mock.Mock, + fake_node: Node, + mock_rabbitmq_post_message: mock.Mock, + mock_find_node_with_name_returns_fake_node: mock.Mock, + mock_docker_set_node_availability: mock.Mock, + mock_compute_node_used_resources: mock.Mock, + mocker: MockerFixture, + async_docker_client: aiodocker.Docker, + with_drain_nodes_labelled: bool, + ec2_instance_custom_tags: dict[str, str], + scale_up_params: _ScaleUpParams, +): + await _test_cluster_scaling_up_and_down( + service_monitored_labels=service_monitored_labels, + osparc_docker_label_keys=osparc_docker_label_keys, + app_settings=app_settings, + initialized_app=initialized_app, + create_service=create_service, + task_template=task_template, + create_task_reservations=create_task_reservations, + ec2_client=ec2_client, + mock_docker_tag_node=mock_docker_tag_node, + fake_node=fake_node, + mock_rabbitmq_post_message=mock_rabbitmq_post_message, + mock_find_node_with_name_returns_fake_node=mock_find_node_with_name_returns_fake_node, + mock_docker_set_node_availability=mock_docker_set_node_availability, + mock_compute_node_used_resources=mock_compute_node_used_resources, + mocker=mocker, + async_docker_client=async_docker_client, + with_drain_nodes_labelled=with_drain_nodes_labelled, + ec2_instance_custom_tags=ec2_instance_custom_tags, + scale_up_params=scale_up_params, + ) 
@pytest.mark.parametrize(
diff --git a/services/autoscaling/tests/unit/test_modules_buffer_machine_core.py b/services/autoscaling/tests/unit/test_modules_buffer_machine_core.py
index 425158e321c..87f3880a0d0 100644
--- a/services/autoscaling/tests/unit/test_modules_buffer_machine_core.py
+++ b/services/autoscaling/tests/unit/test_modules_buffer_machine_core.py
@@ -9,16 +9,18 @@
 import logging
 import random
 from collections.abc import Awaitable, Callable, Sequence
-from typing import Any, cast
+from dataclasses import dataclass
+from typing import Any
 from unittest import mock

 import pytest
 import tenacity
-from aws_library.ec2.models import EC2InstanceBootSpecific
+from aws_library.ec2.models import AWSTagKey, EC2InstanceBootSpecific
 from faker import Faker
 from fastapi import FastAPI
 from fastapi.encoders import jsonable_encoder
 from models_library.docker import DockerGenericTag
+from models_library.utils.json_serialization import json_dumps
 from pydantic import parse_obj_as
 from pytest_mock.plugin import MockerFixture
 from pytest_simcore.helpers.aws_ec2 import (
@@ -34,16 +36,19 @@
     DynamicAutoscaling,
 )
 from simcore_service_autoscaling.modules.buffer_machines_pool_core import (
-    _get_buffer_ec2_tags,
+    _PRE_PULLED_IMAGES_EC2_TAG_KEY,
     monitor_buffer_machines,
 )
+from simcore_service_autoscaling.utils.buffer_machines_pool_core import (
+    get_buffer_ec2_tags,
+)
 from types_aiobotocore_ec2 import EC2Client
-from types_aiobotocore_ec2.literals import InstanceTypeType
+from types_aiobotocore_ec2.literals import InstanceStateNameType, InstanceTypeType
 from types_aiobotocore_ec2.type_defs import FilterTypeDef, TagTypeDef


 @pytest.fixture
-def pre_pull_images() -> list[DockerGenericTag]:
+def fake_pre_pull_images() -> list[DockerGenericTag]:
     return parse_obj_as(
         list[DockerGenericTag],
         [
@@ -119,6 +124,7 @@ async def test_if_send_command_is_mocked_by_moto(
         expected_instance_type=next(iter(ec2_instances_allowed_types)),
         expected_instance_state="running",
         expected_additional_tag_keys=[],
+        expected_pre_pulled_images=[],
         instance_filters=None,
     )

@@ -141,19 +147,21 @@ def mock_wait_for_has_instance_completed_cloud_init(
     )


-async def test_monitor_buffer_machines(
-    minimal_configuration: None,
+async def _test_monitor_buffer_machines(
+    *,
     ec2_client: EC2Client,
+    instance_type_filters: Sequence[FilterTypeDef],
+    initialized_app: FastAPI,
     buffer_count: int,
+    pre_pulled_images: list[DockerGenericTag],
     ec2_instances_allowed_types: dict[InstanceTypeType, Any],
-    instance_type_filters: Sequence[FilterTypeDef],
     ec2_instance_custom_tags: dict[str, str],
-    mock_wait_for_has_instance_completed_cloud_init: mock.Mock | None,
-    initialized_app: FastAPI,
 ):
     # 0. we have no instances now
     all_instances = await ec2_client.describe_instances(Filters=instance_type_filters)
-    assert not all_instances["Reservations"]
+    assert not all_instances[
+        "Reservations"
+    ], f"There should be no instances at the start of the test. Found the following instance ids: {[i['InstanceId'] for r in all_instances['Reservations'] if 'Instances' in r for i in r['Instances'] if 'InstanceId' in i]}"

     # 1. 
run, this will create as many buffer machines as needed with log_context(logging.INFO, "create buffer machines"): @@ -179,7 +187,10 @@ async def _assert_buffer_machines_running() -> None: expected_num_instances=buffer_count, expected_instance_type=next(iter(ec2_instances_allowed_types)), expected_instance_state="running", - expected_additional_tag_keys=list(ec2_instance_custom_tags), + expected_additional_tag_keys=[ + *list(ec2_instance_custom_tags), + ], + expected_pre_pulled_images=None, instance_filters=instance_type_filters, ) @@ -211,13 +222,17 @@ async def _assert_run_ssm_command_for_pulling() -> None: "ssm-command-id", *list(ec2_instance_custom_tags), ], + expected_pre_pulled_images=None, instance_filters=instance_type_filters, ) - await _assert_run_ssm_command_for_pulling() + if pre_pulled_images: + await _assert_run_ssm_command_for_pulling() # 3. is the command finished? - with log_context(logging.INFO, "wait for SSM commands to finish") as ctx: + with log_context( + logging.INFO, "wait for SSM commands and the machine to be stopped to finish" + ) as ctx: @tenacity.retry( wait=tenacity.wait_fixed(5), @@ -237,29 +252,75 @@ async def _assert_wait_for_ssm_command_to_finish() -> None: expected_num_instances=buffer_count, expected_instance_type=next(iter(ec2_instances_allowed_types)), expected_instance_state="stopped", - expected_additional_tag_keys=list(ec2_instance_custom_tags), + expected_additional_tag_keys=[ + _PRE_PULLED_IMAGES_EC2_TAG_KEY, + *list(ec2_instance_custom_tags), + ], + expected_pre_pulled_images=pre_pulled_images, instance_filters=instance_type_filters, ) await _assert_wait_for_ssm_command_to_finish() +async def test_monitor_buffer_machines( + minimal_configuration: None, + ec2_client: EC2Client, + buffer_count: int, + pre_pull_images: list[DockerGenericTag], + ec2_instances_allowed_types: dict[InstanceTypeType, Any], + instance_type_filters: Sequence[FilterTypeDef], + ec2_instance_custom_tags: dict[str, str], + mock_wait_for_has_instance_completed_cloud_init: mock.Mock | None, + initialized_app: FastAPI, +): + await _test_monitor_buffer_machines( + ec2_client=ec2_client, + instance_type_filters=instance_type_filters, + initialized_app=initialized_app, + buffer_count=buffer_count, + pre_pulled_images=pre_pull_images, + ec2_instances_allowed_types=ec2_instances_allowed_types, + ec2_instance_custom_tags=ec2_instance_custom_tags, + ) + + @pytest.fixture async def create_buffer_machines( ec2_client: EC2Client, aws_ami_id: str, app_settings: ApplicationSettings, initialized_app: FastAPI, -) -> Callable[[int, InstanceTypeType], Awaitable[list[str]]]: - async def _do(num: int, instance_type: InstanceTypeType) -> list[str]: +) -> Callable[ + [int, InstanceTypeType, InstanceStateNameType, list[DockerGenericTag]], + Awaitable[list[str]], +]: + async def _do( + num: int, + instance_type: InstanceTypeType, + instance_state_name: InstanceStateNameType, + pre_pull_images: list[DockerGenericTag], + ) -> list[str]: assert app_settings.AUTOSCALING_EC2_INSTANCES + assert instance_state_name in [ + "running", + "stopped", + ], "only 'running' and 'stopped' are supported for testing" + resource_tags: list[TagTypeDef] = [ {"Key": tag_key, "Value": tag_value} - for tag_key, tag_value in _get_buffer_ec2_tags( + for tag_key, tag_value in get_buffer_ec2_tags( initialized_app, DynamicAutoscaling() ).items() ] + if pre_pull_images is not None and instance_state_name == "stopped": + resource_tags.append( + { + "Key": _PRE_PULLED_IMAGES_EC2_TAG_KEY, + "Value": f"{json_dumps(pre_pull_images)}", + 
} + ) with log_context( logging.INFO, f"creating {num} buffer machines of {instance_type}" ): @@ -297,12 +358,42 @@ async def _do(num: int, instance_type: InstanceTypeType) -> list[str]: assert "Name" in instance["State"] assert instance["State"]["Name"] == "running" + if instance_state_name == "stopped": + await ec2_client.stop_instances(InstanceIds=instance_ids) + instances = await ec2_client.describe_instances(InstanceIds=instance_ids) + assert "Reservations" in instances + assert instances["Reservations"] + assert "Instances" in instances["Reservations"][0] + assert len(instances["Reservations"][0]["Instances"]) == num + for instance in instances["Reservations"][0]["Instances"]: + assert "State" in instance + assert "Name" in instance["State"] + assert instance["State"]["Name"] == "stopped" + return instance_ids return _do -async def test_monitor_buffer_machines_terminates_unneeded_instances( +@dataclass +class _BufferMachineParams: + instance_state_name: InstanceStateNameType + pre_pulled_images: list[DockerGenericTag] | None + tag_keys: list[AWSTagKey] + + +@pytest.mark.parametrize( + "expected_buffer_params", + [ + _BufferMachineParams("running", None, []), + _BufferMachineParams( + "stopped", + [], + [parse_obj_as(AWSTagKey, "io.simcore.autoscaling.pre_pulled_images")], + ), + ], +) +async def test_monitor_buffer_machines_terminates_supernumerary_instances( minimal_configuration: None, ec2_client: EC2Client, buffer_count: int, @@ -310,22 +401,89 @@ async def test_monitor_buffer_machines_terminates_unneeded_instances( instance_type_filters: Sequence[FilterTypeDef], ec2_instance_custom_tags: dict[str, str], initialized_app: FastAPI, - create_buffer_machines: Callable[[int, InstanceTypeType], Awaitable[list[str]]], + create_buffer_machines: Callable[ + [int, InstanceTypeType, InstanceStateNameType, list[DockerGenericTag]], + Awaitable[list[str]], + ], + expected_buffer_params: _BufferMachineParams, ): # have too many machines of accepted type buffer_machines = await create_buffer_machines( - buffer_count + 5, next(iter(list(ec2_instances_allowed_types))) + buffer_count + 5, + next(iter(list(ec2_instances_allowed_types))), + expected_buffer_params.instance_state_name, + [], ) await assert_autoscaled_dynamic_warm_pools_ec2_instances( ec2_client, expected_num_reservations=1, expected_num_instances=len(buffer_machines), expected_instance_type=next(iter(ec2_instances_allowed_types)), - expected_instance_state="running", - expected_additional_tag_keys=list(ec2_instance_custom_tags), + expected_instance_state=expected_buffer_params.instance_state_name, + expected_additional_tag_keys=[ + *list(ec2_instance_custom_tags), + *expected_buffer_params.tag_keys, + ], + expected_pre_pulled_images=expected_buffer_params.pre_pulled_images, instance_filters=instance_type_filters, ) - # this will terminate the supernumerary instances + # this will terminate the supernumerary instances and start new ones + await monitor_buffer_machines( + initialized_app, auto_scaling_mode=DynamicAutoscaling() + ) + await assert_autoscaled_dynamic_warm_pools_ec2_instances( + ec2_client, + expected_num_reservations=1, + expected_num_instances=buffer_count, + expected_instance_type=next(iter(ec2_instances_allowed_types)), + expected_instance_state=expected_buffer_params.instance_state_name, + expected_additional_tag_keys=[ + *list(ec2_instance_custom_tags), + *expected_buffer_params.tag_keys, + ], + expected_pre_pulled_images=expected_buffer_params.pre_pulled_images, + instance_filters=instance_type_filters, + ) + + +async 
def test_monitor_buffer_machines_terminates_instances_with_incorrect_pre_pulled_images( + minimal_configuration: None, + ec2_client: EC2Client, + buffer_count: int, + pre_pull_images: list[DockerGenericTag], + ec2_instances_allowed_types: dict[InstanceTypeType, Any], + instance_type_filters: Sequence[FilterTypeDef], + ec2_instance_custom_tags: dict[str, str], + initialized_app: FastAPI, + create_buffer_machines: Callable[ + [int, InstanceTypeType, InstanceStateNameType, list[DockerGenericTag]], + Awaitable[list[str]], + ], +): + # have machines of correct type with missing pre-pulled images + assert ( + len(pre_pull_images) > 1 + ), "this test relies on pre-pulled images being filled with more than 1 image" + buffer_machines = await create_buffer_machines( + buffer_count + 5, + next(iter(list(ec2_instances_allowed_types))), + "stopped", + pre_pull_images[:-1], + ) + await assert_autoscaled_dynamic_warm_pools_ec2_instances( + ec2_client, + expected_num_reservations=1, + expected_num_instances=len(buffer_machines), + expected_instance_type=next(iter(ec2_instances_allowed_types)), + expected_instance_state="stopped", + expected_additional_tag_keys=[ + *list(ec2_instance_custom_tags), + "io.simcore.autoscaling.pre_pulled_images", + ], + expected_pre_pulled_images=pre_pull_images[:-1], + instance_filters=instance_type_filters, + ) + # this will terminate the wrong instances and start new ones and pre-pull the new set of images await monitor_buffer_machines( initialized_app, auto_scaling_mode=DynamicAutoscaling() ) @@ -336,6 +494,7 @@ async def test_monitor_buffer_machines_terminates_unneeded_instances( expected_instance_type=next(iter(ec2_instances_allowed_types)), expected_instance_state="running", expected_additional_tag_keys=list(ec2_instance_custom_tags), + expected_pre_pulled_images=None, # NOTE: these are not pre-pulled yet, just started instance_filters=instance_type_filters, ) @@ -350,6 +509,17 @@ def unneeded_instance_type( return random_type +@pytest.mark.parametrize( + "expected_buffer_params", + [ + _BufferMachineParams("running", None, []), + _BufferMachineParams( + "stopped", + [], + [parse_obj_as(AWSTagKey, "io.simcore.autoscaling.pre_pulled_images")], + ), + ], +) async def test_monitor_buffer_machines_terminates_unneeded_pool( minimal_configuration: None, ec2_client: EC2Client, @@ -358,18 +528,28 @@ async def test_monitor_buffer_machines_terminates_unneeded_pool( instance_type_filters: Sequence[FilterTypeDef], ec2_instance_custom_tags: dict[str, str], initialized_app: FastAPI, - create_buffer_machines: Callable[[int, InstanceTypeType], Awaitable[list[str]]], + create_buffer_machines: Callable[ + [int, InstanceTypeType, InstanceStateNameType, list[DockerGenericTag]], + Awaitable[list[str]], + ], unneeded_instance_type: InstanceTypeType, + expected_buffer_params: _BufferMachineParams, ): - # have too many machines of accepted type - buffer_machines_unneeded = await create_buffer_machines(5, unneeded_instance_type) + # have machines of unneeded type + buffer_machines_unneeded = await create_buffer_machines( + 5, unneeded_instance_type, expected_buffer_params.instance_state_name, [] + ) await assert_autoscaled_dynamic_warm_pools_ec2_instances( ec2_client, expected_num_reservations=1, expected_num_instances=len(buffer_machines_unneeded), expected_instance_type=unneeded_instance_type, - expected_instance_state="running", - expected_additional_tag_keys=list(ec2_instance_custom_tags), + expected_instance_state=expected_buffer_params.instance_state_name, + expected_additional_tag_keys=[ 
+ *list(ec2_instance_custom_tags), + *expected_buffer_params.tag_keys, + ], + expected_pre_pulled_images=expected_buffer_params.pre_pulled_images, instance_filters=instance_type_filters, ) @@ -384,6 +564,7 @@ async def test_monitor_buffer_machines_terminates_unneeded_pool( expected_instance_type=next(iter(ec2_instances_allowed_types)), expected_instance_state="running", expected_additional_tag_keys=list(ec2_instance_custom_tags), + expected_pre_pulled_images=None, instance_filters=instance_type_filters, ) @@ -391,17 +572,16 @@ async def test_monitor_buffer_machines_terminates_unneeded_pool( @pytest.fixture def ec2_instances_allowed_types( faker: Faker, - pre_pull_images: list[DockerGenericTag], - buffer_count: int, + fake_pre_pull_images: list[DockerGenericTag], external_ec2_instances_allowed_types: None | dict[str, EC2InstanceBootSpecific], -) -> dict[InstanceTypeType, Any]: +) -> dict[InstanceTypeType, EC2InstanceBootSpecific]: if not external_ec2_instances_allowed_types: return { - "t2.micro": { - "ami_id": faker.pystr(), - "pre_pull_images": pre_pull_images, - "buffer_count": buffer_count, - } + "t2.micro": EC2InstanceBootSpecific( + ami_id=faker.pystr(), + pre_pull_images=fake_pre_pull_images, + buffer_count=faker.pyint(min_value=1, max_value=10), + ) } allowed_ec2_types = external_ec2_instances_allowed_types @@ -414,11 +594,14 @@ def ec2_instances_allowed_types( allowed_ec2_types.items(), ) ) + assert ( + allowed_ec2_types_with_buffer_defined + ), "one type with buffer is needed for the tests!" assert ( len(allowed_ec2_types_with_buffer_defined) == 1 ), "more than one type with buffer is disallowed in this test!" return { - cast(InstanceTypeType, k): v + parse_obj_as(InstanceTypeType, k): v for k, v in allowed_ec2_types_with_buffer_defined.items() } @@ -456,26 +639,47 @@ def external_ec2_instances_allowed_types( @pytest.fixture def buffer_count( - faker: Faker, - external_ec2_instances_allowed_types: None | dict[str, EC2InstanceBootSpecific], + ec2_instances_allowed_types: dict[InstanceTypeType, EC2InstanceBootSpecific], ) -> int: - if not external_ec2_instances_allowed_types: - return faker.pyint(min_value=1, max_value=9) + def _by_buffer_count( + instance_type_and_settings: tuple[InstanceTypeType, EC2InstanceBootSpecific] + ) -> bool: + _, boot_specific = instance_type_and_settings + return boot_specific.buffer_count > 0 - allowed_ec2_types = external_ec2_instances_allowed_types + allowed_ec2_types = ec2_instances_allowed_types allowed_ec2_types_with_buffer_defined = dict( + filter(_by_buffer_count, allowed_ec2_types.items()) + ) + assert allowed_ec2_types_with_buffer_defined, "you need one type with buffer" + assert ( + len(allowed_ec2_types_with_buffer_defined) == 1 + ), "more than one type with buffer is disallowed in this test!" + return next(iter(allowed_ec2_types_with_buffer_defined.values())).buffer_count + + +@pytest.fixture +def pre_pull_images( + ec2_instances_allowed_types: dict[InstanceTypeType, Any] +) -> list[DockerGenericTag]: + allowed_ec2_types = ec2_instances_allowed_types + allowed_ec2_types_with_pre_pull_images_defined = dict( filter( lambda instance_type_and_settings: instance_type_and_settings[ 1 - ].buffer_count - > 0, + ].pre_pull_images, allowed_ec2_types.items(), ) ) assert ( - len(allowed_ec2_types_with_buffer_defined) == 1 - ), "more than one type with buffer is disallowed in this test!" 
- return next(iter(allowed_ec2_types_with_buffer_defined.values())).buffer_count + len(allowed_ec2_types_with_pre_pull_images_defined) <= 1 + ), "more than one type with pre-pulled-images is disallowed in this test!" + + if allowed_ec2_types_with_pre_pull_images_defined: + return next( + iter(allowed_ec2_types_with_pre_pull_images_defined.values()) + ).pre_pull_images + return [] @pytest.fixture @@ -484,6 +688,15 @@ def skip_if_external_envfile_dict(external_envfile_dict: EnvVarsDict) -> None: pytest.skip("Skipping test since external-envfile is not set") +def _skip_test_if_not_using_external_envfile( + external_envfile_dict: EnvVarsDict, +) -> None: + if not external_envfile_dict: + pytest.skip( + "This test is only for use directly with AWS server, please define --external-envfile" + ) + + async def test_monitor_buffer_machines_against_aws( skip_if_external_envfile_dict: None, disable_buffers_pool_background_task: None, @@ -492,104 +705,20 @@ async def test_monitor_buffer_machines_against_aws( external_envfile_dict: EnvVarsDict, ec2_client: EC2Client, buffer_count: int, + pre_pull_images: list[DockerGenericTag], ec2_instances_allowed_types: dict[InstanceTypeType, Any], instance_type_filters: Sequence[FilterTypeDef], ec2_instance_custom_tags: dict[str, str], initialized_app: FastAPI, ): - if not external_envfile_dict: - pytest.skip( - "This test is only for use directly with AWS server, please define --external-envfile" - ) - - # 0. we have no instances now - all_instances = await ec2_client.describe_instances(Filters=instance_type_filters) - assert not all_instances["Reservations"] - - # 1. run, this will create as many buffer machines as needed - with log_context(logging.INFO, "create buffer machines"): - await monitor_buffer_machines( - initialized_app, auto_scaling_mode=DynamicAutoscaling() - ) - with log_context( - logging.INFO, f"waiting for {buffer_count} buffer instances to be running" - ) as ctx: - - @tenacity.retry( - wait=tenacity.wait_fixed(5), - stop=tenacity.stop_after_delay(120), - retry=tenacity.retry_if_exception_type(AssertionError), - reraise=True, - before_sleep=tenacity.before_sleep_log(ctx.logger, logging.INFO), - after=tenacity.after_log(ctx.logger, logging.INFO), - ) - async def _assert_buffer_machines_running(): - await assert_autoscaled_dynamic_warm_pools_ec2_instances( - ec2_client, - expected_num_reservations=1, - expected_num_instances=buffer_count, - expected_instance_type=next(iter(ec2_instances_allowed_types)), - expected_instance_state="running", - expected_additional_tag_keys=list(ec2_instance_custom_tags), - instance_filters=instance_type_filters, - ) - - await _assert_buffer_machines_running() - - # 2. 
this should now run a SSM command for pulling - with log_context(logging.INFO, "run SSM commands for pulling") as ctx: - - @tenacity.retry( - wait=tenacity.wait_fixed(5), - stop=tenacity.stop_after_delay(120), - retry=tenacity.retry_if_exception_type(AssertionError), - reraise=True, - before_sleep=tenacity.before_sleep_log(ctx.logger, logging.INFO), - after=tenacity.after_log(ctx.logger, logging.INFO), - ) - async def _assert_ssm_command_for_pulling(): - await monitor_buffer_machines( - initialized_app, auto_scaling_mode=DynamicAutoscaling() - ) - await assert_autoscaled_dynamic_warm_pools_ec2_instances( - ec2_client, - expected_num_reservations=1, - expected_num_instances=buffer_count, - expected_instance_type=next(iter(ec2_instances_allowed_types)), - expected_instance_state="running", - expected_additional_tag_keys=[ - "pulling", - "ssm-command-id", - *list(ec2_instance_custom_tags), - ], - instance_filters=instance_type_filters, - ) - - await _assert_ssm_command_for_pulling() - - # 3. is the command finished? - with log_context(logging.INFO, "wait for SSM commands to finish") as ctx: - - @tenacity.retry( - wait=tenacity.wait_fixed(5), - stop=tenacity.stop_after_delay(datetime.timedelta(minutes=10)), - retry=tenacity.retry_if_exception_type(AssertionError), - reraise=True, - before_sleep=tenacity.before_sleep_log(ctx.logger, logging.INFO), - after=tenacity.after_log(ctx.logger, logging.INFO), - ) - async def _assert_wait_for_ssm_command_to_finish(): - await monitor_buffer_machines( - initialized_app, auto_scaling_mode=DynamicAutoscaling() - ) - await assert_autoscaled_dynamic_warm_pools_ec2_instances( - ec2_client, - expected_num_reservations=1, - expected_num_instances=buffer_count, - expected_instance_type=next(iter(ec2_instances_allowed_types)), - expected_instance_state="stopped", - expected_additional_tag_keys=list(ec2_instance_custom_tags), - instance_filters=instance_type_filters, - ) - - await _assert_wait_for_ssm_command_to_finish() + _skip_test_if_not_using_external_envfile(external_envfile_dict) + + await _test_monitor_buffer_machines( + ec2_client=ec2_client, + instance_type_filters=instance_type_filters, + initialized_app=initialized_app, + buffer_count=buffer_count, + pre_pulled_images=pre_pull_images, + ec2_instances_allowed_types=ec2_instances_allowed_types, + ec2_instance_custom_tags=ec2_instance_custom_tags, + )
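
For context, the core of the new pre-pulled-images labelling is a JSON round-trip through an EC2 tag: the image list is written as a tag value just before the buffer machine is stopped, and compared against the currently configured list on the next monitoring pass. Below is a minimal, self-contained sketch of that round-trip (the stdlib json module stands in for models_library.utils.json_serialization, plain dicts stand in for the EC2 tag types, and all names except the tag key are illustrative):

import json

PRE_PULLED_IMAGES_TAG_KEY = "io.simcore.autoscaling.pre_pulled_images"

def pre_pulled_images_tag_value(pre_pull_images: list[str]) -> str:
    # written on the buffer EC2 right before it is stopped
    # (see _handle_pool_image_pulling above)
    return json.dumps(pre_pull_images)

def has_invalid_pre_pulled_images(
    instance_tags: dict[str, str], expected_images: list[str]
) -> bool:
    # mirrors _terminate_instances_with_invalid_pre_pulled_images: a missing or
    # empty tag is tolerated (images not pulled yet), while a non-empty tag that
    # differs from the configured pre_pull_images marks the instance for termination
    pre_pulled = json.loads(instance_tags.get(PRE_PULLED_IMAGES_TAG_KEY, "[]"))
    return bool(pre_pulled) and pre_pulled != expected_images

expected = ["nginx:latest", "redis:7"]
fresh = {PRE_PULLED_IMAGES_TAG_KEY: pre_pulled_images_tag_value(expected)}
stale = {PRE_PULLED_IMAGES_TAG_KEY: pre_pulled_images_tag_value(["nginx:latest"])}
assert not has_invalid_pre_pulled_images(fresh, expected)
assert has_invalid_pre_pulled_images(stale, expected)  # would be terminated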
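
The add/remove step in _add_remove_buffer_instances boils down to a per-instance-type sizing rule against EC2_INSTANCES_ALLOWED_TYPES: anything below buffer_count is started, anything above it is terminated. A hedged sketch, assuming a pool is just an ordered list of instance ids rather than the service's BufferPool objects:

from collections import defaultdict

def plan_pool_changes(
    allowed_types: dict[str, int],        # instance type -> desired buffer_count
    current_pools: dict[str, list[str]],  # instance type -> instance ids in pool
) -> tuple[dict[str, int], set[str]]:
    missing: dict[str, int] = defaultdict(int)   # how many to start per type
    unneeded: set[str] = set()                   # instance ids to terminate
    for ec2_type, buffer_count in allowed_types.items():
        pool = current_pools.get(ec2_type, [])
        if len(pool) < buffer_count:
            missing[ec2_type] += buffer_count - len(pool)
        elif len(pool) > buffer_count:
            # same slicing rule as list(all_pool_instances)[buffer_count:]
            unneeded.update(pool[buffer_count:])
    return missing, unneeded

to_start, to_stop = plan_pool_changes(
    {"t2.micro": 3}, {"t2.micro": ["i-1", "i-2", "i-3", "i-4", "i-5"]}
)
assert not to_start and to_stop == {"i-4", "i-5"}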
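
Finally, get_buffer_ec2_tags (relocated to utils/buffer_machines_pool_core.py so that auto_scaling_core can reuse it for the new stopped-instance query in _analyze_current_cluster) is what both the launch path and the buffer queries filter on. Schematically, with plain dicts in place of the EC2Tags type:

def buffer_ec2_tags(base_tags: dict[str, str]) -> dict[str, str]:
    # same composition as get_buffer_ec2_tags: the autoscaling mode's base tags,
    # plus the buffer-machine marker, plus a "-buffer" suffix on the Name tag
    tags = base_tags | {"io.simcore.autoscaling.buffer_machine": "true"}
    tags["Name"] = f"{tags['Name']}-buffer"
    return tags

tags = buffer_ec2_tags({"Name": "autoscaling"})
assert tags["Name"] == "autoscaling-buffer"
assert tags["io.simcore.autoscaling.buffer_machine"] == "true"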