Skip to content

Commit

Permalink
speed up exporter tests (#705)
Browse files Browse the repository at this point in the history
* speed up exporter tests by introducing asgiref async testing
  • Loading branch information
ekneg54 authored Nov 14, 2024
1 parent 205045e commit 50ee02e
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 68 deletions.
5 changes: 4 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@
"python.testing.pytestEnabled": true,
"python.testing.unittestEnabled": false,
"python.analysis.importFormat": "absolute",
"python.testing.pytestPath": "pytest",
"editor.formatOnSave": true,
"editor.rulers": [
80,
100,
120
],
"editor.codeActionsOnSave": {
"source.organizeImports": "explicit"
"source.organizeImports": "explicit",
"source.unusedImports": "always",
"source.convertImportFormat": "always"
},
"autoDocstring.docstringFormat": "numpy",
"files.exclude": {
Expand Down
6 changes: 5 additions & 1 deletion logprep/metrics/exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ def __init__(self, configuration: MetricsConfig):
self.server = None
self.healthcheck_functions = None
self._multiprocessing_prepared = False
self.app = None

def prepare_multiprocessing(self):
"""
Expand Down Expand Up @@ -99,10 +100,12 @@ def run(self, daemon=True):

def init_server(self, daemon=True) -> None:
"""Initializes the server"""
if not self.app:
self.app = make_patched_asgi_app(self.healthcheck_functions)
port = self.configuration.port
self.server = http.ThreadingHTTPServer(
self.configuration.uvicorn_config | {"port": port, "host": "0.0.0.0"},
make_patched_asgi_app(self.healthcheck_functions),
self.app,
daemon=daemon,
logger_name="Exporter",
)
Expand All @@ -116,6 +119,7 @@ def restart(self):
def update_healthchecks(self, healthcheck_functions: Iterable[Callable], daemon=True) -> None:
"""Updates the healthcheck functions"""
self.healthcheck_functions = healthcheck_functions
self.app = make_patched_asgi_app(self.healthcheck_functions)
if self.server and self.server.thread and self.server.thread.is_alive():
self.server.shut_down()
self.init_server(daemon=daemon)
Expand Down
12 changes: 7 additions & 5 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,18 @@ keywords = [
"logdata",
]
dependencies = [
"aiohttp>=3.9.2", # CVE-2024-23334
"aiohttp>=3.9.2", # CVE-2024-23334
"attrs",
"certifi>=2023.7.22", # CVE-2023-37920
"ciso8601", # fastest iso8601 datetime parser. can be removed after dropping support for python < 3.11
"certifi>=2023.7.22", # CVE-2023-37920
"ciso8601", # fastest iso8601 datetime parser. can be removed after dropping support for python < 3.11
"colorama",
"confluent-kafka>2",
"geoip2",
"hyperscan>=0.7.0",
"jsonref",
"luqum",
"more-itertools==8.10.0",
"mysql-connector-python>=9.1.0", # CVE-2024-21272
"mysql-connector-python>=9.1.0", # CVE-2024-21272
"numpy>=1.26.0",
"opensearch-py",
"prometheus_client",
Expand All @@ -84,7 +84,7 @@ dependencies = [
"schedule",
"tldextract",
"urlextract",
"urllib3>=1.26.17", # CVE-2023-43804
"urllib3>=1.26.17", # CVE-2023-43804
"uvicorn",
"deepdiff",
"msgspec",
Expand Down Expand Up @@ -113,6 +113,8 @@ dev = [
"jinja2",
"maturin",
"cibuildwheel",
"asgiref",
"pytest-asyncio",
]

doc = [
Expand Down
9 changes: 8 additions & 1 deletion tests/acceptance/test_full_configuration.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
# pylint: disable=missing-docstring
# pylint: disable=line-too-long
# pylint: disable=too-many-locals
import os
import re
import tempfile
Expand Down Expand Up @@ -115,7 +117,7 @@ def test_start_of_logprep_from_http_with_templated_url_and_config():
output = proc.stdout.readline().decode("utf8")


def test_logprep_exposes_prometheus_metrics(tmp_path):
def test_logprep_exposes_prometheus_metrics_and_healthchecks(tmp_path):
temp_dir = tempfile.gettempdir()
input_file_path = Path(os.path.join(temp_dir, "input.txt"))
input_file_path.touch()
Expand Down Expand Up @@ -246,4 +248,9 @@ def test_logprep_exposes_prometheus_metrics(tmp_path):
len(re.findall(both_calculators, metrics)) == 4
), "More or less than 4 rules were found for both calculator"

# check health endpoint
response = requests.get("http://127.0.0.1:8003/health", timeout=7)
response.raise_for_status()
assert "OK" == response.text

proc.kill()
143 changes: 83 additions & 60 deletions tests/unit/metrics/test_exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@
# pylint: disable=protected-access
# pylint: disable=attribute-defined-outside-init
# pylint: disable=line-too-long
import asyncio
import os.path
from unittest import mock

import pytest
import requests
from prometheus_client import REGISTRY
from asgiref.testing import ApplicationCommunicator
from prometheus_client import CollectorRegistry

from logprep.metrics.exporter import PrometheusExporter
from logprep.metrics.exporter import PrometheusExporter, make_patched_asgi_app
from logprep.util import http
from logprep.util.configuration import MetricsConfig

Expand All @@ -20,7 +21,6 @@
)
class TestPrometheusExporter:
def setup_method(self):
REGISTRY.__init__()
self.metrics_config = MetricsConfig(enabled=True, port=8000)

def test_correct_setup(self):
Expand Down Expand Up @@ -106,80 +106,103 @@ def test_is_running_returns_true_when_server_thread_is_alive(self):
assert exporter.is_running


@mock.patch(
"logprep.util.http.ThreadingHTTPServer", new=mock.create_autospec(http.ThreadingHTTPServer)
)
@mock.patch(
"logprep.metrics.exporter.PrometheusExporter.prepare_multiprocessing",
new=lambda *args, **kwargs: None,
)
class TestHealthEndpoint:
"""These tests uses the `asgiref.testing.ApplicationCommunicator` to test the ASGI app itself
For more information see: https://dokk.org/documentation/django-channels/2.4.0/topics/testing/
"""

def setup_method(self):
REGISTRY.__init__()
self.metrics_config = MetricsConfig(enabled=True, port=8000)
self.registry = CollectorRegistry()
self.captured_status = None
self.captured_headers = None
# Setup ASGI scope
self.scope = {
"client": ("127.0.0.1", 32767),
"headers": [],
"http_version": "1.0",
"method": "GET",
"path": "/",
"query_string": b"",
"scheme": "http",
"server": ("127.0.0.1", 80),
"type": "http",
}
self.communicator = None

def teardown_method(self):
if self.communicator:
asyncio.get_event_loop().run_until_complete(self.communicator.wait())

def seed_app(self, app):
self.communicator = ApplicationCommunicator(app, self.scope)

def test_health_endpoint_returns_503_as_default_health_state(self):
exporter = PrometheusExporter(self.metrics_config)
exporter.run(daemon=False)
resp = requests.get("http://localhost:8000/health", timeout=0.5)
assert resp.status_code == 503
exporter.server.shut_down()

def test_health_endpoint_calls_health_check_functions(self):
@pytest.mark.parametrize(
"functions, expected_status, expected_body",
[
([lambda: True], 200, b"OK"),
([lambda: True, lambda: True], 200, b"OK"),
([lambda: False], 503, b"FAIL"),
([lambda: False, lambda: False], 503, b"FAIL"),
([lambda: False, lambda: True, lambda: True], 503, b"FAIL"),
],
)
@pytest.mark.asyncio
async def test_asgi_app(self, functions, expected_status, expected_body):
app = make_patched_asgi_app(functions)
self.scope["path"] = "/health"
self.seed_app(app)
await self.communicator.send_input({"type": "http.request"})
event = await self.communicator.receive_output(timeout=1)
assert event["status"] == expected_status
event = await self.communicator.receive_output(timeout=1)
assert expected_body in event["body"]

@pytest.mark.asyncio
async def test_health_endpoint_calls_health_check_functions(self):
exporter = PrometheusExporter(self.metrics_config)
function_mock = mock.Mock(return_value=True)
exporter.healthcheck_functions = [function_mock]
exporter.run(daemon=False)
resp = requests.get("http://localhost:8000/health", timeout=0.5)
assert resp.status_code == 200
assert function_mock.call_count == 1

exporter.server.shut_down()

def test_health_endpoint_calls_updated_functions(self):
self.scope["path"] = "/health"
self.seed_app(exporter.app)
await self.communicator.send_input({"type": "http.request"})
event = await self.communicator.receive_output(timeout=1)
assert event["status"] == 200
event = await self.communicator.receive_output(timeout=1)
assert b"OK" in event["body"]
function_mock.assert_called_once()

@pytest.mark.asyncio
async def test_update_health_checks_injects_new_functions(self):
exporter = PrometheusExporter(self.metrics_config)
function_mock = mock.Mock(return_value=True)
exporter.healthcheck_functions = [function_mock]
exporter.run(daemon=False)
requests.get("http://localhost:8000/health", timeout=0.5)
exporter.server.thread = None
self.scope["path"] = "/health"
self.seed_app(exporter.app)
await self.communicator.send_input({"type": "http.request"})
event = await self.communicator.receive_output(timeout=1)
assert event["status"] == 200
event = await self.communicator.receive_output(timeout=1)
assert b"OK" in event["body"]
assert function_mock.call_count == 1, "initial function should be called"
new_function_mock = mock.Mock(return_value=True)
exporter.update_healthchecks([new_function_mock])
requests.get("http://localhost:8000/health", timeout=0.5)
self.scope["path"] = "/health"
self.seed_app(exporter.app)
await self.communicator.send_input({"type": "http.request"})
event = await self.communicator.receive_output(timeout=1)
assert event["status"] == 200
event = await self.communicator.receive_output(timeout=1)
assert b"OK" in event["body"]
assert new_function_mock.call_count == 1, "New function should be called"
assert function_mock.call_count == 1, "Old function should not be called"

exporter.server.shut_down()

@pytest.mark.parametrize(
"functions, expected",
[
([lambda: True], 200),
([lambda: True, lambda: True], 200),
([lambda: False], 503),
([lambda: False, lambda: False], 503),
([lambda: False, lambda: True, lambda: True], 503),
],
)
def test_health_check_returns_status_code(self, functions, expected):
exporter = PrometheusExporter(self.metrics_config)
exporter.run(daemon=False)
exporter.update_healthchecks(functions)
resp = requests.get("http://localhost:8000/health", timeout=0.5)
assert resp.status_code == expected
exporter.server.shut_down()

@pytest.mark.parametrize(
"functions, expected",
[
([lambda: True], "OK"),
([lambda: True, lambda: True], "OK"),
([lambda: False], "FAIL"),
([lambda: False, lambda: False], "FAIL"),
([lambda: False, lambda: True, lambda: True], "FAIL"),
],
)
def test_health_check_returns_body(self, functions, expected):
exporter = PrometheusExporter(self.metrics_config)
exporter.run(daemon=False)
exporter.update_healthchecks(functions)
resp = requests.get("http://localhost:8000/health", timeout=0.5)
assert resp.content.decode() == expected
exporter.server.shut_down()

0 comments on commit 50ee02e

Please sign in to comment.