Skip to content

Commit

Permalink
Update tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jonathangreen committed Apr 30, 2024
1 parent 05b51fc commit 75e0a6b
Showing 1 changed file with 100 additions and 80 deletions.
180 changes: 100 additions & 80 deletions tests/manager/celery/test_monitoring.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from functools import partial
from unittest.mock import MagicMock, create_autospec, patch
from uuid import uuid4

import pytest
from boto3.exceptions import Boto3Error
Expand All @@ -11,42 +12,111 @@
from palace.manager.service.logging.configuration import LogLevel


class CloudwatchCameraFixture:
def __init__(self, boto_client: MagicMock):
self.app = create_autospec(Celery)
self.configure_app()
self.app.tasks = {
"task1": MagicMock(),
"task2": MagicMock(),
"celery.built_in": MagicMock(),
}
self.client = boto_client
self.state = create_autospec(State)
self.state.tasks = {
"uuid1": self.mock_task("task1", "queue1", runtime=1.0),
"uuid2": self.mock_task("task1", "queue1", runtime=2.0),
"uuid3": self.mock_task("task2", "queue2", succeeded=False, failed=True),
"uuid4": self.mock_task(
"task2", "queue2", started=False, succeeded=False, uuid="uuid4"
),
}
self.create_cloudwatch = partial(Cloudwatch, state=self.state, app=self.app)

def mock_queue(self, name: str) -> MagicMock:
queue = MagicMock()
queue.name = name
return queue

def mock_task(
self,
name: str | None = None,
routing_key: str | None = None,
sent: bool = True,
started: bool = True,
succeeded: bool = True,
failed: bool = False,
runtime: float | None = None,
uuid: str | None = None,
) -> Task:
if uuid is None:
uuid = str(uuid4())
if name is None:
name = "task"
if routing_key is None:
routing_key = "queue"
return Task(
uuid=uuid,
name=name,
routing_key=routing_key,
sent=sent,
started=started,
succeeded=succeeded,
failed=failed,
runtime=runtime,
)

def configure_app(
self,
region: str = "region",
dry_run: bool = False,
manager_name: str = "manager",
namespace: str = "namespace",
upload_size: int = 100,
queues: list[str] | None = None,
) -> None:
queues = queues or ["queue1", "queue2"]
self.app.conf = {
"cloudwatch_statistics_region": region,
"cloudwatch_statistics_dryrun": dry_run,
"broker_transport_options": {"global_keyprefix": manager_name},
"cloudwatch_statistics_namespace": namespace,
"cloudwatch_statistics_upload_size": upload_size,
"task_queues": [self.mock_queue(queue) for queue in queues],
}


@pytest.fixture
def cloudwatch_camera():
with patch("boto3.client") as boto_client:
yield CloudwatchCameraFixture(boto_client)


class TestTaskStats:
def test_update(self):
mock_task = create_autospec(Task)
def test_update(self, cloudwatch_camera: CloudwatchCameraFixture):
stats = TaskStats()

mock_task.succeeded = True
mock_task.failed = False
mock_task.runtime = 1.0
mock_task = cloudwatch_camera.mock_task(runtime=1.0)
stats.update(mock_task)
assert stats.failed == 0
assert stats.succeeded == 1
assert stats.runtime == [1.0]

mock_task.succeeded = False
mock_task.failed = True
mock_task.runtime = None
mock_task = cloudwatch_camera.mock_task(succeeded=False, failed=True)
stats.update(mock_task)
assert stats.failed == 1
assert stats.succeeded == 1
assert stats.runtime == [1.0]

mock_task.succeeded = True
mock_task.failed = False
mock_task.runtime = 2.0
mock_task = cloudwatch_camera.mock_task(runtime=2.0)
stats.update(mock_task)
assert stats.failed == 1
assert stats.succeeded == 2
assert stats.runtime == [1.0, 2.0]

def test_update_with_none_runtime(self):
mock_task = create_autospec(Task)
mock_task.succeeded = True
mock_task.failed = False
mock_task.runtime = None

def test_update_with_none_runtime(self, cloudwatch_camera: CloudwatchCameraFixture):
stats = TaskStats()
mock_task = cloudwatch_camera.mock_task()
stats.update(mock_task)
assert stats.failed == 0
assert stats.succeeded == 1
Expand Down Expand Up @@ -97,29 +167,24 @@ def test_metrics_with_empty_runtime(self):


class TestQueueStats:
def test_update(self):
def test_update(self, cloudwatch_camera: CloudwatchCameraFixture):
stats = QueueStats()

assert len(stats.queued) == 0

mock_task = create_autospec(Task)
mock_task.uuid = "uuid"
mock_task.started = False
mock_task.sent = False
mock_task = cloudwatch_camera.mock_task(sent=False, started=False)

# Task is not started or sent, so it should not be in the queue.
stats.update(mock_task)
assert len(stats.queued) == 0

# Task is both sent and started, so its being processed and should not be in the queue.
mock_task.sent = True
mock_task.started = True
mock_task = cloudwatch_camera.mock_task(sent=True, started=True)
stats.update(mock_task)
assert len(stats.queued) == 0

# Task is sent but not started, so it should be in the queue.
mock_task.sent = True
mock_task.started = False
mock_task = cloudwatch_camera.mock_task(sent=True, started=False)
stats.update(mock_task)
assert len(stats.queued) == 1

Expand Down Expand Up @@ -152,57 +217,6 @@ def test_metrics(self):
assert metric["Value"] == 0


class CloudwatchCameraFixture:
def __init__(self, boto_client: MagicMock):
self.app = create_autospec(Celery)
self.configure_app()
self.app.tasks = {
"task1": MagicMock(),
"task2": MagicMock(),
"celery.built_in": MagicMock(),
}
self.client = boto_client
self.state = create_autospec(State)
self.state.tasks = {
"task1": self.mock_task(),
"task2": self.mock_task(),
}
self.create_cloudwatch = partial(Cloudwatch, state=self.state, app=self.app)

def mock_queue(self, name: str) -> MagicMock:
queue = MagicMock()
queue.name = name
return queue

def mock_task(self) -> MagicMock:
return MagicMock(spec=Task)

def configure_app(
self,
region: str = "region",
dry_run: bool = False,
manager_name: str = "manager",
namespace: str = "namespace",
upload_size: int = 100,
queues: list[str] | None = None,
) -> None:
queues = queues or ["queue1", "queue2"]
self.app.conf = {
"cloudwatch_statistics_region": region,
"cloudwatch_statistics_dryrun": dry_run,
"broker_transport_options": {"global_keyprefix": manager_name},
"cloudwatch_statistics_namespace": namespace,
"cloudwatch_statistics_upload_size": upload_size,
"task_queues": [self.mock_queue(queue) for queue in queues],
}


@pytest.fixture
def cloudwatch_camera():
with patch("boto3.client") as boto_client:
yield CloudwatchCameraFixture(boto_client)


class TestCloudwatch:
def test__init__(self, cloudwatch_camera: CloudwatchCameraFixture):
cloudwatch = cloudwatch_camera.create_cloudwatch()
Expand Down Expand Up @@ -231,8 +245,14 @@ def test_on_shutter(self, cloudwatch_camera: CloudwatchCameraFixture):
mock_publish.assert_called_once()
[tasks, queues, time] = mock_publish.call_args.args

assert tasks == {"task1": TaskStats(), "task2": TaskStats()}
assert queues == {"queue1": QueueStats(), "queue2": QueueStats()}
assert tasks == {
"task1": TaskStats(succeeded=2, runtime=[1.0, 2.0]),
"task2": TaskStats(failed=1),
}
assert queues == {
"queue1": QueueStats(),
"queue2": QueueStats(queued={"uuid4"}),
}
assert time.isoformat() == "2021-01-01T00:00:00+00:00"

def test_on_shutter_error(
Expand All @@ -248,7 +268,7 @@ def test_on_shutter_error(
mock_publish.assert_called_once()
[tasks, queues, time] = mock_publish.call_args.args

assert tasks == {"task1": TaskStats()}
assert tasks == {"task1": TaskStats(succeeded=2, runtime=[1.0, 2.0])}
assert queues == {"queue1": QueueStats(), "queue2": QueueStats()}
assert time is not None
assert "Error processing task" in caplog.text
Expand Down

0 comments on commit 75e0a6b

Please sign in to comment.