diff --git a/docker-compose.yml b/docker-compose.yml index 4a09162fbc..fecf154f06 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -19,6 +19,7 @@ x-cm-variables: &cm PALACE_BASE_URL: "http://localhost:6500" PALACE_CELERY_BROKER_URL: "redis://redis:6379/0" PALACE_CELERY_BROKER_TRANSPORT_OPTIONS_GLOBAL_KEYPREFIX: "test" + PALACE_CELERY_CLOUDWATCH_STATISTICS_DRYRUN: "true" # Set up the environment variables used for testing as well SIMPLIFIED_TEST_DATABASE: "postgresql://palace:test@pg:5432/circ" diff --git a/docker/ci/test_scripts.sh b/docker/ci/test_scripts.sh index 601848f2cb..f55954c4e1 100755 --- a/docker/ci/test_scripts.sh +++ b/docker/ci/test_scripts.sh @@ -22,6 +22,7 @@ check_service_status "$container" /etc/service/cron check_service_status "$container" /etc/service/worker-default check_service_status "$container" /etc/service/worker-high check_service_status "$container" /etc/service/beat +check_service_status "$container" /etc/service/celery-cloudwatch # Ensure the installed crontab has no problems check_crontab "$container" diff --git a/docker/runit-scripts/celery-cloudwatch/run b/docker/runit-scripts/celery-cloudwatch/run new file mode 100755 index 0000000000..9083d49ee5 --- /dev/null +++ b/docker/runit-scripts/celery-cloudwatch/run @@ -0,0 +1,9 @@ +#!/bin/bash +set -e + +# Set the working directory to the root of the project +cd /var/www/circulation +source env/bin/activate + +# Start the cloudwatch celery process +exec env/bin/celery -A "palace.manager.celery.app" events -c "palace.manager.celery.monitoring.Cloudwatch" -F 60 --uid 1000 --gid 1000 --logfile /var/log/celery/cloudwatch.log diff --git a/poetry.lock b/poetry.lock index 5d8f7231c3..6cd5f8141f 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.7.1 and should not be changed by hand. 
[[package]] name = "alembic" @@ -217,6 +217,7 @@ boto3 = {version = "1.34.0", optional = true, markers = "extra == \"boto3\""} botocore = {version = "1.34.0", optional = true, markers = "extra == \"boto3\""} botocore-stubs = "*" mypy-boto3-cloudformation = {version = ">=1.34.0,<1.35.0", optional = true, markers = "extra == \"essential\""} +mypy-boto3-cloudwatch = {version = ">=1.34.0,<1.35.0", optional = true, markers = "extra == \"cloudwatch\""} mypy-boto3-dynamodb = {version = ">=1.34.0,<1.35.0", optional = true, markers = "extra == \"essential\""} mypy-boto3-ec2 = {version = ">=1.34.0,<1.35.0", optional = true, markers = "extra == \"essential\""} mypy-boto3-lambda = {version = ">=1.34.0,<1.35.0", optional = true, markers = "extra == \"essential\""} @@ -2240,7 +2241,6 @@ files = [ {file = "lxml-5.2.1-cp36-cp36m-manylinux_2_12_i686.manylinux2010_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:c38d7b9a690b090de999835f0443d8aa93ce5f2064035dfc48f27f02b4afc3d0"}, {file = "lxml-5.2.1-cp36-cp36m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:5670fb70a828663cc37552a2a85bf2ac38475572b0e9b91283dc09efb52c41d1"}, {file = "lxml-5.2.1-cp36-cp36m-manylinux_2_28_x86_64.whl", hash = "sha256:958244ad566c3ffc385f47dddde4145088a0ab893504b54b52c041987a8c1863"}, - {file = "lxml-5.2.1-cp36-cp36m-manylinux_2_5_x86_64.manylinux1_x86_64.whl", hash = "sha256:b6241d4eee5f89453307c2f2bfa03b50362052ca0af1efecf9fef9a41a22bb4f"}, {file = "lxml-5.2.1-cp36-cp36m-musllinux_1_1_aarch64.whl", hash = "sha256:2a66bf12fbd4666dd023b6f51223aed3d9f3b40fef06ce404cb75bafd3d89536"}, {file = "lxml-5.2.1-cp36-cp36m-musllinux_1_1_ppc64le.whl", hash = "sha256:9123716666e25b7b71c4e1789ec829ed18663152008b58544d95b008ed9e21e9"}, {file = "lxml-5.2.1-cp36-cp36m-musllinux_1_1_s390x.whl", hash = "sha256:0c3f67e2aeda739d1cc0b1102c9a9129f7dc83901226cc24dd72ba275ced4218"}, @@ -2585,6 +2585,20 @@ files = [ [package.dependencies] typing-extensions = {version = ">=4.1.0", markers = 
"python_version < \"3.12\""} +[[package]] +name = "mypy-boto3-cloudwatch" +version = "1.34.83" +description = "Type annotations for boto3.CloudWatch 1.34.83 service generated with mypy-boto3-builder 7.23.2" +optional = false +python-versions = ">=3.8" +files = [ + {file = "mypy-boto3-cloudwatch-1.34.83.tar.gz", hash = "sha256:766e166c5b463d9885a5929dc16bb592e0fa7d7beaf569aa4f501d85a848bc13"}, + {file = "mypy_boto3_cloudwatch-1.34.83-py3-none-any.whl", hash = "sha256:6af4fff0ec7c09e423df5a69fff4df8a74044462686e8679b4fe73c106787854"}, +] + +[package.dependencies] +typing-extensions = {version = ">=4.1.0", markers = "python_version < \"3.12\""} + [[package]] name = "mypy-boto3-dynamodb" version = "1.34.0" @@ -4892,4 +4906,4 @@ lxml = ">=3.8" [metadata] lock-version = "2.0" python-versions = ">=3.10,<4" -content-hash = "a408d6bc324868f901f8dbdd07fa9aa6bcb4c431c302500b8a3599a1f7a91b77" +content-hash = "ef03f72fe652d1b7e53071457a4023a6e97147cff88aeca4919e251e0e4a5fb2" diff --git a/pyproject.toml b/pyproject.toml index 94afb28862..30c94af89e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -275,7 +275,7 @@ tox-docker = "^4.1" tox-gh-actions = "^3.0" [tool.poetry.group.dev.dependencies] -boto3-stubs = {version = "^1.28", extras = ["boto3", "essential", "logs", "s3"]} +boto3-stubs = {version = "^1.28", extras = ["boto3", "cloudwatch", "essential", "logs", "s3"]} freezegun = "~1.5.0" Jinja2 = "^3.1.2" mypy = "^1.4.1" diff --git a/src/palace/manager/api/saml/python_expression_dsl/util.py b/src/palace/manager/api/saml/python_expression_dsl/util.py index 0c41089728..b8babcf59a 100644 --- a/src/palace/manager/api/saml/python_expression_dsl/util.py +++ b/src/palace/manager/api/saml/python_expression_dsl/util.py @@ -117,7 +117,7 @@ def _parse_binary_expression( right_argument = tokens[2] expression = expression_type(operator_type, left_argument, right_argument) - for tokens_chunk in chunks(tokens, 2, 3): + for tokens_chunk in chunks(tokens, 2, 3): # type: ignore operator_type = 
tokens_chunk[0] right_argument = tokens_chunk[1] expression = expression_type(operator_type, expression, right_argument) diff --git a/src/palace/manager/celery/monitoring.py b/src/palace/manager/celery/monitoring.py new file mode 100644 index 0000000000..ac7c5dbaa8 --- /dev/null +++ b/src/palace/manager/celery/monitoring.py @@ -0,0 +1,229 @@ +from __future__ import annotations + +from collections.abc import Sequence +from dataclasses import dataclass, field +from datetime import datetime +from typing import TYPE_CHECKING, Any + +import boto3 +from boto3.exceptions import Boto3Error +from botocore.exceptions import BotoCoreError +from celery.events.snapshot import Polaroid +from celery.events.state import State, Task + +from palace.manager.util import chunks +from palace.manager.util.datetime_helpers import utc_now +from palace.manager.util.log import LoggerMixin + +if TYPE_CHECKING: + from mypy_boto3_cloudwatch.literals import StandardUnitType + from mypy_boto3_cloudwatch.type_defs import DimensionTypeDef, MetricDatumTypeDef + + +def metric_dimensions(dimensions: dict[str, str]) -> Sequence[DimensionTypeDef]: + return [{"Name": key, "Value": value} for key, value in dimensions.items()] + + +def value_metric( + metric_name: str, + value: int, + timestamp: datetime, + dimensions: dict[str, str], + unit: StandardUnitType = "Count", +) -> MetricDatumTypeDef: + """ + Format a metric for a single value into the format expected by Cloudwatch. 
@dataclass
class TaskStats:
    """
    Accumulates success/failure counts and runtimes for one task name so they
    can be reported to Cloudwatch.

    ``update`` folds a single Celery task record into the counters, and
    ``metrics`` renders the accumulated values as Cloudwatch metric datums.
    """

    # Number of observed task records whose ``succeeded`` attribute was set.
    succeeded: int = 0
    # Number of observed task records whose ``failed`` attribute was set.
    failed: int = 0
    # Runtime samples collected from the succeeded task records.
    runtime: list[float] = field(default_factory=list)

    def update(self, task: Task) -> None:
        """
        Fold one task record into the running totals.

        :param task: The Celery task record to account for. Only its
            ``succeeded``, ``failed`` and ``runtime`` attributes are read.
        """
        if task.succeeded:
            self.succeeded += 1
            # Check against None explicitly instead of relying on truthiness:
            # a runtime of exactly 0.0 is a legitimate sample and dropping it
            # would make the runtime sample count disagree with the number
            # of succeeded tasks.
            if task.runtime is not None:
                self.runtime.append(task.runtime)
        if task.failed:
            self.failed += 1

    def metrics(
        self, timestamp: datetime, dimensions: dict[str, str]
    ) -> list[MetricDatumTypeDef]:
        """
        Render the accumulated statistics as Cloudwatch metric datums.

        :param timestamp: Timestamp attached to every datum.
        :param dimensions: Dimension name/value pairs attached to every datum.
        :return: A ``TaskSucceeded`` and a ``TaskFailed`` datum, followed by a
            ``TaskRuntime`` statistic datum when at least one runtime sample
            was collected.
        """
        metric_data = [
            value_metric("TaskSucceeded", self.succeeded, timestamp, dimensions),
            value_metric("TaskFailed", self.failed, timestamp, dimensions),
        ]

        # statistic_metric takes max()/min() of the samples, which is
        # undefined for an empty list, so the runtime datum is only emitted
        # when there is something to summarise.
        if self.runtime:
            metric_data.append(
                statistic_metric("TaskRuntime", self.runtime, timestamp, dimensions)
            )

        return metric_data
+ + def __init__( + self, + *args: Any, + **kwargs: Any, + ): + super().__init__(*args, **kwargs) + # We use logger_for_cls instead of just inheriting from LoggerMixin + # because the base class Polaroid already defines a logger attribute, + # which conflicts with the logger() method in LoggerMixin. + self.logger = LoggerMixin.logger_for_cls(self.__class__) + region = self.app.conf.get("cloudwatch_statistics_region") + dryrun = self.app.conf.get("cloudwatch_statistics_dryrun") + self.cloudwatch_client = ( + boto3.client("cloudwatch", region_name=region) if not dryrun else None + ) + self.manager_name = self.app.conf.get("broker_transport_options", {}).get( + "global_keyprefix" + ) + self.namespace = self.app.conf.get("cloudwatch_statistics_namespace") + self.upload_size = self.app.conf.get("cloudwatch_statistics_upload_size") + self.queues = { + str(queue.name): QueueStats() for queue in self.app.conf.get("task_queues") + } + + def on_shutter(self, state: State) -> None: + timestamp = utc_now() + tasks = { + task: TaskStats() + for task in self.app.tasks.keys() + if not task.startswith("celery.") + } + + for task in state.tasks.values(): + try: + tasks[task.name].update(task) + self.queues[task.routing_key].update(task) + except KeyError: + self.logger.exception( + "Error processing task %s with routing key %s", + task.name, + task.routing_key, + ) + + self.publish(tasks, self.queues, timestamp) + + def publish( + self, + tasks: dict[str, TaskStats], + queues: dict[str, QueueStats], + timestamp: datetime, + ) -> None: + metric_data = [] + for task_name, task_stats in tasks.items(): + metric_data.extend( + task_stats.metrics( + timestamp, {"TaskName": task_name, "Manager": self.manager_name} + ) + ) + + for queue_name, queue_stats in queues.items(): + metric_data.extend( + queue_stats.metrics( + timestamp, {"QueueName": queue_name, "Manager": self.manager_name} + ) + ) + + for chunk in chunks(metric_data, self.upload_size): + self.logger.info("Sending %d metrics to 
Cloudwatch.", len(chunk)) + if self.cloudwatch_client is not None: + try: + self.cloudwatch_client.put_metric_data( + Namespace=self.namespace, + MetricData=chunk, + ) + except (Boto3Error, BotoCoreError): + self.logger.exception("Error sending metrics to Cloudwatch.") + else: + self.logger.info("Dry run enabled. Not sending metrics to Cloudwatch.") + for data in chunk: + self.logger.info("Data: %s", data) diff --git a/src/palace/manager/service/celery/configuration.py b/src/palace/manager/service/celery/configuration.py index 36cd5b713b..ebe531e1cb 100644 --- a/src/palace/manager/service/celery/configuration.py +++ b/src/palace/manager/service/celery/configuration.py @@ -15,19 +15,30 @@ class CeleryConfiguration(ServiceConfiguration): broker_transport_options_global_keyprefix: str = "palace" broker_transport_options_queue_order_strategy: str = "priority" - task_acks_late = True - task_reject_on_worker_lost = True - task_remote_tracebacks = True - task_create_missing_queues = False + task_acks_late: bool = True + task_reject_on_worker_lost: bool = True + task_remote_tracebacks: bool = True + task_create_missing_queues: bool = False + task_send_sent_event: bool = True + task_track_started: bool = True worker_cancel_long_running_tasks_on_connection_loss: bool = False worker_max_tasks_per_child: int = 100 worker_prefetch_multiplier: int = 1 worker_hijack_root_logger: bool = False worker_log_color: bool = False + worker_send_task_events: bool = True timezone: str = "US/Eastern" + # These settings are specific to the custom event reporting we are doing + # to send Celery task and queue statistics to Cloudwatch. You can see + # how they are used in `palace.manager.celery.monitoring.Cloudwatch`. 
+ cloudwatch_statistics_dryrun: bool = False + cloudwatch_statistics_namespace: str = "Celery" + cloudwatch_statistics_region: str = "us-west-2" + cloudwatch_statistics_upload_size: int = 500 + class Config: env_prefix = "PALACE_CELERY_" diff --git a/src/palace/manager/util/__init__.py b/src/palace/manager/util/__init__.py index 96ba0d94e5..54d679836e 100644 --- a/src/palace/manager/util/__init__.py +++ b/src/palace/manager/util/__init__.py @@ -5,8 +5,8 @@ import re import string from collections import Counter -from collections.abc import Iterable -from typing import Any +from collections.abc import Generator, Iterable, Sequence +from typing import Any, SupportsIndex, TypeVar import sqlalchemy from money import Money @@ -568,7 +568,12 @@ def first_or_default(collection: Iterable, default: Any | None = None) -> Any: return element -def chunks(lst, chunk_size, start_index=0): +T = TypeVar("T") + + +def chunks( + lst: Sequence[T], chunk_size: int, start_index: int = 0 +) -> Generator[Sequence[T], None, None]: """Yield successive n-sized chunks from lst.""" length = len(lst) diff --git a/src/palace/manager/util/log.py b/src/palace/manager/util/log.py index 1ba4c0d24e..1c718df441 100644 --- a/src/palace/manager/util/log.py +++ b/src/palace/manager/util/log.py @@ -91,6 +91,10 @@ def elapsed_time_logging( class LoggerMixin: """Mixin that adds a logger with a standardized name""" + @staticmethod + def logger_for_cls(cls: type[object]) -> logging.Logger: + return logging.getLogger(f"{cls.__module__}.{cls.__name__}") + @classmethod @functools.cache def logger(cls) -> logging.Logger: @@ -100,7 +104,7 @@ def logger(cls) -> logging.Logger: This is cached so that we don't create a new logger every time it is called. 
""" - return logging.getLogger(f"{cls.__module__}.{cls.__name__}") + return cls.logger_for_cls(cls) @property def log(self) -> logging.Logger: diff --git a/tests/manager/celery/test_monitoring.py b/tests/manager/celery/test_monitoring.py new file mode 100644 index 0000000000..9611297a22 --- /dev/null +++ b/tests/manager/celery/test_monitoring.py @@ -0,0 +1,324 @@ +from functools import partial +from unittest.mock import MagicMock, create_autospec, patch +from uuid import uuid4 + +import pytest +from boto3.exceptions import Boto3Error +from celery.events.state import State, Task +from freezegun import freeze_time + +from palace.manager.celery.celery import Celery +from palace.manager.celery.monitoring import Cloudwatch, QueueStats, TaskStats +from palace.manager.service.logging.configuration import LogLevel + + +class CloudwatchCameraFixture: + def __init__(self, boto_client: MagicMock): + self.app = create_autospec(Celery) + self.configure_app() + self.app.tasks = { + "task1": MagicMock(), + "task2": MagicMock(), + "celery.built_in": MagicMock(), + } + self.client = boto_client + self.state = create_autospec(State) + self.state.tasks = { + "uuid1": self.mock_task("task1", "queue1", runtime=1.0), + "uuid2": self.mock_task("task1", "queue1", runtime=2.0), + "uuid3": self.mock_task("task2", "queue2", succeeded=False, failed=True), + "uuid4": self.mock_task( + "task2", "queue2", started=False, succeeded=False, uuid="uuid4" + ), + } + self.create_cloudwatch = partial(Cloudwatch, state=self.state, app=self.app) + + def mock_queue(self, name: str) -> MagicMock: + queue = MagicMock() + queue.name = name + return queue + + def mock_task( + self, + name: str | None = None, + routing_key: str | None = None, + sent: bool = True, + started: bool = True, + succeeded: bool = True, + failed: bool = False, + runtime: float | None = None, + uuid: str | None = None, + ) -> Task: + if uuid is None: + uuid = str(uuid4()) + if name is None: + name = "task" + if routing_key is None: + 
routing_key = "queue" + return Task( + uuid=uuid, + name=name, + routing_key=routing_key, + sent=sent, + started=started, + succeeded=succeeded, + failed=failed, + runtime=runtime, + ) + + def configure_app( + self, + region: str = "region", + dry_run: bool = False, + manager_name: str = "manager", + namespace: str = "namespace", + upload_size: int = 100, + queues: list[str] | None = None, + ) -> None: + queues = queues or ["queue1", "queue2"] + self.app.conf = { + "cloudwatch_statistics_region": region, + "cloudwatch_statistics_dryrun": dry_run, + "broker_transport_options": {"global_keyprefix": manager_name}, + "cloudwatch_statistics_namespace": namespace, + "cloudwatch_statistics_upload_size": upload_size, + "task_queues": [self.mock_queue(queue) for queue in queues], + } + + +@pytest.fixture +def cloudwatch_camera(): + with patch("boto3.client") as boto_client: + yield CloudwatchCameraFixture(boto_client) + + +class TestTaskStats: + def test_update(self, cloudwatch_camera: CloudwatchCameraFixture): + stats = TaskStats() + + mock_task = cloudwatch_camera.mock_task(runtime=1.0) + stats.update(mock_task) + assert stats.failed == 0 + assert stats.succeeded == 1 + assert stats.runtime == [1.0] + + mock_task = cloudwatch_camera.mock_task(succeeded=False, failed=True) + stats.update(mock_task) + assert stats.failed == 1 + assert stats.succeeded == 1 + assert stats.runtime == [1.0] + + mock_task = cloudwatch_camera.mock_task(runtime=2.0) + stats.update(mock_task) + assert stats.failed == 1 + assert stats.succeeded == 2 + assert stats.runtime == [1.0, 2.0] + + def test_update_with_none_runtime(self, cloudwatch_camera: CloudwatchCameraFixture): + stats = TaskStats() + mock_task = cloudwatch_camera.mock_task() + stats.update(mock_task) + assert stats.failed == 0 + assert stats.succeeded == 1 + assert stats.runtime == [] + + def test_metrics(self): + stats = TaskStats(succeeded=2, failed=5, runtime=[3.5, 2.2]) + timestamp = MagicMock() + dimensions = {"key": "value", 
"key2": "value2"} + + expected_dimensions = [ + {"Name": key, "Value": value} for key, value in dimensions.items() + ] + + [succeeded_metric, failed_metric, runtime_metric] = stats.metrics( + timestamp, dimensions + ) + + assert succeeded_metric["MetricName"] == "TaskSucceeded" + assert succeeded_metric["Value"] == 2 + assert succeeded_metric["Timestamp"] == timestamp.isoformat() + assert succeeded_metric["Dimensions"] == expected_dimensions + assert succeeded_metric["Unit"] == "Count" + + assert failed_metric["MetricName"] == "TaskFailed" + assert failed_metric["Value"] == 5 + assert failed_metric["Timestamp"] == timestamp.isoformat() + assert failed_metric["Dimensions"] == expected_dimensions + assert failed_metric["Unit"] == "Count" + + assert runtime_metric["MetricName"] == "TaskRuntime" + assert runtime_metric["StatisticValues"] == { + "Maximum": 3.5, + "Minimum": 2.2, + "SampleCount": 2, + "Sum": 5.7, + } + assert runtime_metric["Timestamp"] == timestamp.isoformat() + assert runtime_metric["Dimensions"] == expected_dimensions + assert runtime_metric["Unit"] == "Seconds" + + def test_metrics_with_empty_runtime(self): + stats = TaskStats(succeeded=2, failed=5, runtime=[]) + [succeeded_metric, failed_metric] = stats.metrics(MagicMock(), {}) + + assert succeeded_metric["MetricName"] == "TaskSucceeded" + assert failed_metric["MetricName"] == "TaskFailed" + + +class TestQueueStats: + def test_update(self, cloudwatch_camera: CloudwatchCameraFixture): + stats = QueueStats() + + assert len(stats.queued) == 0 + + mock_task = cloudwatch_camera.mock_task(sent=False, started=False) + + # Task is not started or sent, so it should not be in the queue. + stats.update(mock_task) + assert len(stats.queued) == 0 + + # Task is both sent and started, so its being processed and should not be in the queue. 
+ mock_task = cloudwatch_camera.mock_task(sent=True, started=True) + stats.update(mock_task) + assert len(stats.queued) == 0 + + # Task is sent but not started, so it should be in the queue. + mock_task = cloudwatch_camera.mock_task(sent=True, started=False) + stats.update(mock_task) + assert len(stats.queued) == 1 + + # If the task is sent again, it should still be in the queue, but not duplicated. + stats.update(mock_task) + assert len(stats.queued) == 1 + + # If the task is started, it should be removed from the queue. + mock_task.started = True + stats.update(mock_task) + assert len(stats.queued) == 0 + + def test_metrics(self): + stats = QueueStats(queued={"uuid1", "uuid2"}) + timestamp = MagicMock() + dimensions = {"key": "value", "key2": "value2"} + expected_dimensions = [ + {"Name": key, "Value": value} for key, value in dimensions.items() + ] + [metric] = stats.metrics(timestamp, dimensions) + assert metric["MetricName"] == "QueueWaiting" + assert metric["Value"] == 2 + assert metric["Timestamp"] == timestamp.isoformat() + assert metric["Dimensions"] == expected_dimensions + assert metric["Unit"] == "Count" + + stats = QueueStats() + [metric] = stats.metrics(timestamp, dimensions) + assert metric["MetricName"] == "QueueWaiting" + assert metric["Value"] == 0 + + +class TestCloudwatch: + def test__init__(self, cloudwatch_camera: CloudwatchCameraFixture): + cloudwatch = cloudwatch_camera.create_cloudwatch() + assert cloudwatch.logger is not None + assert cloudwatch.logger.name == "palace.manager.celery.monitoring.Cloudwatch" + assert cloudwatch.cloudwatch_client == cloudwatch_camera.client.return_value + cloudwatch_camera.client.assert_called_once_with( + "cloudwatch", region_name="region" + ) + assert cloudwatch.manager_name == "manager" + assert cloudwatch.namespace == "namespace" + assert cloudwatch.upload_size == 100 + assert cloudwatch.queues == {"queue1": QueueStats(), "queue2": QueueStats()} + + def test__init__dryrun(self, cloudwatch_camera: 
CloudwatchCameraFixture): + cloudwatch_camera.configure_app(dry_run=True) + cloudwatch = cloudwatch_camera.create_cloudwatch() + assert cloudwatch.cloudwatch_client is None + + def test_on_shutter(self, cloudwatch_camera: CloudwatchCameraFixture): + cloudwatch = cloudwatch_camera.create_cloudwatch() + mock_publish = create_autospec(cloudwatch.publish) + cloudwatch.publish = mock_publish + with freeze_time("2021-01-01"): + cloudwatch.on_shutter(cloudwatch_camera.state) + mock_publish.assert_called_once() + [tasks, queues, time] = mock_publish.call_args.args + + assert tasks == { + "task1": TaskStats(succeeded=2, runtime=[1.0, 2.0]), + "task2": TaskStats(failed=1), + } + assert queues == { + "queue1": QueueStats(), + "queue2": QueueStats(queued={"uuid4"}), + } + assert time.isoformat() == "2021-01-01T00:00:00+00:00" + + def test_on_shutter_error( + self, + cloudwatch_camera: CloudwatchCameraFixture, + caplog: pytest.LogCaptureFixture, + ): + cloudwatch_camera.app.tasks = {"task1": MagicMock()} + cloudwatch = cloudwatch_camera.create_cloudwatch() + mock_publish = create_autospec(cloudwatch.publish) + cloudwatch.publish = mock_publish + cloudwatch.on_shutter(cloudwatch_camera.state) + mock_publish.assert_called_once() + [tasks, queues, time] = mock_publish.call_args.args + + assert tasks == {"task1": TaskStats(succeeded=2, runtime=[1.0, 2.0])} + assert queues == {"queue1": QueueStats(), "queue2": QueueStats()} + assert time is not None + assert "Error processing task" in caplog.text + + def test_publish( + self, + cloudwatch_camera: CloudwatchCameraFixture, + caplog: pytest.LogCaptureFixture, + ): + cloudwatch = cloudwatch_camera.create_cloudwatch() + assert cloudwatch.cloudwatch_client is not None + mock_put_metric_data = cloudwatch_camera.client.return_value.put_metric_data + mock_put_metric_data.return_value = { + "ResponseMetadata": {"HTTPStatusCode": 200} + } + timestamp = MagicMock() + tasks = {"task1": TaskStats(succeeded=2, failed=5, runtime=[3.5, 2.2])} + 
queues = {"queue1": QueueStats(queued={"uuid1", "uuid2"})} + + cloudwatch.publish(tasks, queues, timestamp) + mock_put_metric_data.assert_called_once() + kwargs = mock_put_metric_data.call_args.kwargs + assert kwargs["Namespace"] == "namespace" + expected = [ + *tasks["task1"].metrics( + timestamp, {"TaskName": "task1", "Manager": "manager"} + ), + *queues["queue1"].metrics( + timestamp, {"QueueName": "queue1", "Manager": "manager"} + ), + ] + + assert kwargs["MetricData"] == expected + + # If chunking is enabled, put_metric_data should be called multiple times. Once for each chunk. + cloudwatch.upload_size = 1 + mock_put_metric_data.reset_mock() + cloudwatch.publish(tasks, queues, timestamp) + assert mock_put_metric_data.call_count == len(expected) + + # If there is an error, it should be logged. + mock_put_metric_data.side_effect = Boto3Error("Boom") + cloudwatch.publish(tasks, queues, timestamp) + assert "Error sending metrics to Cloudwatch." in caplog.text + + # If dry run is enabled, no metrics should be sent and a log message should be generated. + caplog.clear() + caplog.set_level(LogLevel.info) + cloudwatch.cloudwatch_client = None + mock_put_metric_data.reset_mock() + cloudwatch.publish(tasks, queues, timestamp) + mock_put_metric_data.assert_not_called() + assert "Dry run enabled. Not sending metrics to Cloudwatch." in caplog.text