Skip to content

Commit

Permalink
Monitor celery tasks in cloudwatch (PP-1150) (#1813)
Browse files Browse the repository at this point in the history
Add new custom metrics for Celery tasks and queues in Cloudwatch:
- Tasks
    * TaskFailed
    * TaskSucceeded
    * TaskRuntime
- Queues
    * QueueWaiting
  • Loading branch information
jonathangreen authored Apr 30, 2024
1 parent d6c4019 commit 27c020a
Show file tree
Hide file tree
Showing 11 changed files with 611 additions and 13 deletions.
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ x-cm-variables: &cm
PALACE_BASE_URL: "http://localhost:6500"
PALACE_CELERY_BROKER_URL: "redis://redis:6379/0"
PALACE_CELERY_BROKER_TRANSPORT_OPTIONS_GLOBAL_KEYPREFIX: "test"
PALACE_CELERY_CLOUDWATCH_STATISTICS_DRYRUN: "true"

# Set up the environment variables used for testing as well
SIMPLIFIED_TEST_DATABASE: "postgresql://palace:test@pg:5432/circ"
Expand Down
1 change: 1 addition & 0 deletions docker/ci/test_scripts.sh
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ check_service_status "$container" /etc/service/cron
check_service_status "$container" /etc/service/worker-default
check_service_status "$container" /etc/service/worker-high
check_service_status "$container" /etc/service/beat
check_service_status "$container" /etc/service/celery-cloudwatch

# Ensure the installed crontab has no problems
check_crontab "$container"
Expand Down
9 changes: 9 additions & 0 deletions docker/runit-scripts/celery-cloudwatch/run
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#!/bin/bash
set -e

# Run from the project root with the project's virtualenv activated.
cd /var/www/circulation
source env/bin/activate

# Replace this shell with the Celery events process. The Cloudwatch camera
# class (-c) is run with a frequency (-F) of 60, under uid/gid 1000, and
# logs to a file under /var/log/celery.
exec env/bin/celery -A "palace.manager.celery.app" events \
    -c "palace.manager.celery.monitoring.Cloudwatch" \
    -F 60 \
    --uid 1000 \
    --gid 1000 \
    --logfile /var/log/celery/cloudwatch.log
20 changes: 17 additions & 3 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ tox-docker = "^4.1"
tox-gh-actions = "^3.0"

[tool.poetry.group.dev.dependencies]
boto3-stubs = {version = "^1.28", extras = ["boto3", "essential", "logs", "s3"]}
boto3-stubs = {version = "^1.28", extras = ["boto3", "cloudwatch", "essential", "logs", "s3"]}
freezegun = "~1.5.0"
Jinja2 = "^3.1.2"
mypy = "^1.4.1"
Expand Down
2 changes: 1 addition & 1 deletion src/palace/manager/api/saml/python_expression_dsl/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ def _parse_binary_expression(
right_argument = tokens[2]
expression = expression_type(operator_type, left_argument, right_argument)

for tokens_chunk in chunks(tokens, 2, 3):
for tokens_chunk in chunks(tokens, 2, 3): # type: ignore
operator_type = tokens_chunk[0]
right_argument = tokens_chunk[1]
expression = expression_type(operator_type, expression, right_argument)
Expand Down
229 changes: 229 additions & 0 deletions src/palace/manager/celery/monitoring.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
from __future__ import annotations

from collections.abc import Sequence
from dataclasses import dataclass, field
from datetime import datetime
from typing import TYPE_CHECKING, Any

import boto3
from boto3.exceptions import Boto3Error
from botocore.exceptions import BotoCoreError, ClientError
from celery.events.snapshot import Polaroid
from celery.events.state import State, Task

from palace.manager.util import chunks
from palace.manager.util.datetime_helpers import utc_now
from palace.manager.util.log import LoggerMixin

if TYPE_CHECKING:
    from mypy_boto3_cloudwatch.literals import StandardUnitType
    from mypy_boto3_cloudwatch.type_defs import DimensionTypeDef, MetricDatumTypeDef


def metric_dimensions(dimensions: dict[str, str]) -> Sequence[DimensionTypeDef]:
    """
    Convert a mapping of dimension names to values into a list of
    ``{"Name": ..., "Value": ...}`` dictionaries, one per entry, in the
    mapping's iteration order.
    """
    formatted = []
    for name, dimension_value in dimensions.items():
        formatted.append({"Name": name, "Value": dimension_value})
    return formatted


def value_metric(
    metric_name: str,
    value: int,
    timestamp: datetime,
    dimensions: dict[str, str],
    unit: StandardUnitType = "Count",
) -> MetricDatumTypeDef:
    """
    Build the Cloudwatch metric datum for a single value.

    The timestamp is serialized with ``datetime.isoformat()`` and the
    dimensions are converted with ``metric_dimensions``.
    See Boto3 documentation:
    https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/cloudwatch/client/put_metric_data.html
    """
    datum: MetricDatumTypeDef = {"MetricName": metric_name, "Value": value}
    datum["Timestamp"] = timestamp.isoformat()
    datum["Dimensions"] = metric_dimensions(dimensions)
    datum["Unit"] = unit
    return datum


def statistic_metric(
    metric_name: str,
    values: Sequence[float],
    timestamp: datetime,
    dimensions: dict[str, str],
    unit: StandardUnitType = "Seconds",
) -> MetricDatumTypeDef:
    """
    Build the Cloudwatch metric datum that summarizes several values.

    Rather than the individual values, the datum carries their maximum,
    minimum, sum, and sample count. ``values`` must not be empty, because
    ``max`` and ``min`` raise ``ValueError`` on an empty sequence.
    See Boto3 documentation:
    https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/cloudwatch/client/put_metric_data.html
    """
    summary = {
        "Maximum": max(values),
        "Minimum": min(values),
        "SampleCount": len(values),
        "Sum": sum(values),
    }
    datum: MetricDatumTypeDef = {
        "MetricName": metric_name,
        "StatisticValues": summary,
    }
    datum["Timestamp"] = timestamp.isoformat()
    datum["Dimensions"] = metric_dimensions(dimensions)
    datum["Unit"] = unit
    return datum


@dataclass
class TaskStats:
    """
    Tracks the number of tasks that have succeeded, failed, and how long they took to run
    for a specific task, so we can report this out to Cloudwatch metrics.
    """

    # Number of observed tasks that reported success.
    succeeded: int = 0
    # Number of observed tasks that reported failure.
    failed: int = 0
    # Runtimes reported by the observed successful tasks.
    runtime: list[float] = field(default_factory=list)

    def update(self, task: Task) -> None:
        """
        Fold the state of a single task into these statistics.

        :param task: The task whose ``succeeded``, ``failed`` and ``runtime``
            attributes are inspected.
        """
        if task.succeeded:
            self.succeeded += 1
            # Compare against None rather than relying on truthiness, so that
            # a reported runtime of 0.0 is still recorded as a sample.
            if task.runtime is not None:
                self.runtime.append(task.runtime)
        if task.failed:
            self.failed += 1

    def metrics(
        self, timestamp: datetime, dimensions: dict[str, str]
    ) -> list[MetricDatumTypeDef]:
        """
        Format the collected statistics as Cloudwatch metric data.

        :param timestamp: Timestamp attached to every metric.
        :param dimensions: Dimensions attached to every metric.
        :return: The ``TaskSucceeded`` and ``TaskFailed`` count metrics, plus a
            ``TaskRuntime`` statistic metric when at least one runtime was recorded.
        """
        metric_data = [
            value_metric("TaskSucceeded", self.succeeded, timestamp, dimensions),
            value_metric("TaskFailed", self.failed, timestamp, dimensions),
        ]

        # statistic_metric cannot summarize an empty sequence, so the runtime
        # metric is only emitted when there is something to report.
        if self.runtime:
            metric_data.append(
                statistic_metric("TaskRuntime", self.runtime, timestamp, dimensions)
            )

        return metric_data


@dataclass
class QueueStats(LoggerMixin):
    """
    Tracks the number of tasks queued for a specific queue, so we can
    report this out to Cloudwatch metrics.
    """

    # UUIDs of the tasks currently considered to be waiting in the queue.
    queued: set[str] = field(default_factory=set)

    def update(self, task: Task) -> None:
        """
        Update the set of waiting tasks based on the state of a single task.

        A task is added once it has been sent but has not yet left the queue,
        and removed again once it has left the queue.

        :param task: The task whose ``uuid``, ``sent``, ``started``,
            ``succeeded``, ``failed`` and ``revoked`` attributes are inspected.
        """
        self.log.debug("Task: %r", task)

        # A task is no longer waiting once it started, or once it reached an
        # end state without us ever seeing it start (for example because it
        # was revoked while queued). Without the extra checks such a task
        # would stay in the set forever and inflate the reported queue size.
        # NOTE(review): ``rejected`` is deliberately not treated as leaving
        # the queue, since a rejected task may be re-queued -- confirm.
        left_queue = bool(
            task.started or task.succeeded or task.failed or task.revoked
        )

        if task.uuid in self.queued:
            if left_queue:
                if task.started:
                    self.log.debug("Task %s started.", task.uuid)
                else:
                    self.log.debug(
                        "Task %s finished without a started event.", task.uuid
                    )
                self.queued.remove(task.uuid)
        elif task.sent and not left_queue:
            self.log.debug("Task %s queued.", task.uuid)
            self.queued.add(task.uuid)

    def metrics(
        self, timestamp: datetime, dimensions: dict[str, str]
    ) -> list[MetricDatumTypeDef]:
        """
        Format the current queue size as Cloudwatch metric data.

        :param timestamp: Timestamp attached to the metric.
        :param dimensions: Dimensions attached to the metric.
        :return: A single ``QueueWaiting`` count metric.
        """
        return [
            value_metric("QueueWaiting", len(self.queued), timestamp, dimensions),
        ]


class Cloudwatch(Polaroid):
    """
    Implements a Celery custom camera that sends task and queue statistics to Cloudwatch.
    See Celery documentation for more information on custom cameras:
    https://docs.celeryq.dev/en/stable/userguide/monitoring.html#custom-camera
    """

    clear_after = True  # clear after flush (incl, state.event_count).

    def __init__(
        self,
        *args: Any,
        **kwargs: Any,
    ):
        """
        Read the Cloudwatch settings from the Celery app configuration.

        No Cloudwatch client is created when ``cloudwatch_statistics_dryrun``
        is set; in that case metrics are only logged.
        """
        super().__init__(*args, **kwargs)
        # We use logger_for_cls instead of just inheriting from LoggerMixin
        # because the base class Polaroid already defines a logger attribute,
        # which conflicts with the logger() method in LoggerMixin.
        self.logger = LoggerMixin.logger_for_cls(self.__class__)
        region = self.app.conf.get("cloudwatch_statistics_region")
        dryrun = self.app.conf.get("cloudwatch_statistics_dryrun")
        self.cloudwatch_client = (
            boto3.client("cloudwatch", region_name=region) if not dryrun else None
        )
        self.manager_name = self.app.conf.get("broker_transport_options", {}).get(
            "global_keyprefix"
        )
        self.namespace = self.app.conf.get("cloudwatch_statistics_namespace")
        self.upload_size = self.app.conf.get("cloudwatch_statistics_upload_size")
        # Queue statistics live on the instance, so the set of waiting tasks
        # is carried over from one snapshot to the next.
        self.queues = {
            str(queue.name): QueueStats() for queue in self.app.conf.get("task_queues")
        }

    def on_shutter(self, state: State) -> None:
        """
        Collect task and queue statistics from a snapshot and publish them.

        :param state: The Celery event state to take the statistics from.
        """
        timestamp = utc_now()
        # Task statistics are rebuilt for every snapshot, with one entry per
        # registered task other than Celery's own ``celery.*`` tasks.
        tasks = {
            task_name: TaskStats()
            for task_name in self.app.tasks
            if not task_name.startswith("celery.")
        }

        for task in state.tasks.values():
            try:
                tasks[task.name].update(task)
                self.queues[task.routing_key].update(task)
            except KeyError:
                # Either the task name or the routing key is not one we track.
                self.logger.exception(
                    "Error processing task %s with routing key %s",
                    task.name,
                    task.routing_key,
                )

        self.publish(tasks, self.queues, timestamp)

    def publish(
        self,
        tasks: dict[str, TaskStats],
        queues: dict[str, QueueStats],
        timestamp: datetime,
    ) -> None:
        """
        Send the metrics for all tasks and queues to Cloudwatch.

        The metrics are uploaded in chunks of ``upload_size``. Errors raised
        while uploading a chunk are logged and do not stop the remaining
        chunks from being sent. When no Cloudwatch client is configured
        (dry run), the metrics are logged instead of uploaded.

        :param tasks: Task statistics keyed by task name.
        :param queues: Queue statistics keyed by queue name.
        :param timestamp: Timestamp attached to every metric.
        """
        metric_data: list[MetricDatumTypeDef] = []
        for task_name, task_stats in tasks.items():
            metric_data.extend(
                task_stats.metrics(
                    timestamp, {"TaskName": task_name, "Manager": self.manager_name}
                )
            )

        for queue_name, queue_stats in queues.items():
            metric_data.extend(
                queue_stats.metrics(
                    timestamp, {"QueueName": queue_name, "Manager": self.manager_name}
                )
            )

        for chunk in chunks(metric_data, self.upload_size):
            self.logger.info("Sending %d metrics to Cloudwatch.", len(chunk))
            if self.cloudwatch_client is not None:
                try:
                    self.cloudwatch_client.put_metric_data(
                        Namespace=self.namespace,
                        MetricData=chunk,
                    )
                except (Boto3Error, BotoCoreError, ClientError):
                    # ClientError is listed explicitly: errors returned by the
                    # Cloudwatch API itself (throttling, access denied,
                    # validation failures) are raised as ClientError, which is
                    # not a subclass of Boto3Error or BotoCoreError.
                    self.logger.exception("Error sending metrics to Cloudwatch.")
            else:
                self.logger.info("Dry run enabled. Not sending metrics to Cloudwatch.")
                for data in chunk:
                    self.logger.info("Data: %s", data)
19 changes: 15 additions & 4 deletions src/palace/manager/service/celery/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,30 @@ class CeleryConfiguration(ServiceConfiguration):
broker_transport_options_global_keyprefix: str = "palace"
broker_transport_options_queue_order_strategy: str = "priority"

task_acks_late = True
task_reject_on_worker_lost = True
task_remote_tracebacks = True
task_create_missing_queues = False
task_acks_late: bool = True
task_reject_on_worker_lost: bool = True
task_remote_tracebacks: bool = True
task_create_missing_queues: bool = False
task_send_sent_event: bool = True
task_track_started: bool = True

worker_cancel_long_running_tasks_on_connection_loss: bool = False
worker_max_tasks_per_child: int = 100
worker_prefetch_multiplier: int = 1
worker_hijack_root_logger: bool = False
worker_log_color: bool = False
worker_send_task_events: bool = True

timezone: str = "US/Eastern"

# These settings are specific to the custom event reporting we are doing
# to send Celery task and queue statistics to Cloudwatch. You can see
# how they are used in `palace.manager.celery.monitoring.Cloudwatch`.
cloudwatch_statistics_dryrun: bool = False
cloudwatch_statistics_namespace: str = "Celery"
cloudwatch_statistics_region: str = "us-west-2"
cloudwatch_statistics_upload_size: int = 500

class Config:
env_prefix = "PALACE_CELERY_"

Expand Down
11 changes: 8 additions & 3 deletions src/palace/manager/util/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
import re
import string
from collections import Counter
from collections.abc import Iterable
from typing import Any
from collections.abc import Generator, Iterable, Sequence
from typing import Any, SupportsIndex, TypeVar

import sqlalchemy
from money import Money
Expand Down Expand Up @@ -568,7 +568,12 @@ def first_or_default(collection: Iterable, default: Any | None = None) -> Any:
return element


def chunks(lst, chunk_size, start_index=0):
T = TypeVar("T")


def chunks(
lst: Sequence[T], chunk_size: int, start_index: int = 0
) -> Generator[Sequence[T], None, None]:
"""Yield successive n-sized chunks from lst."""
length = len(lst)

Expand Down
Loading

0 comments on commit 27c020a

Please sign in to comment.