-
Notifications
You must be signed in to change notification settings - Fork 72
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: OTel metrics & metering #1011
base: jjaakola-aiven-fastapi
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
""" | ||
Copyright (c) 2024 Aiven Ltd | ||
See LICENSE for details | ||
""" | ||
|
||
from dependency_injector.wiring import inject, Provide | ||
from karapace.config import Config | ||
from karapace.container import KarapaceContainer | ||
from opentelemetry import metrics | ||
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter | ||
from opentelemetry.sdk.metrics.export import ( | ||
ConsoleMetricExporter, | ||
MetricExporter, | ||
MetricReader, | ||
PeriodicExportingMetricReader, | ||
) | ||
from typing import Final | ||
|
||
|
||
class Meter: | ||
START_TIME_KEY: Final = "start_time" | ||
|
||
@staticmethod | ||
@inject | ||
def get_meter(config: Config = Provide[KarapaceContainer.config]) -> metrics.Meter: | ||
return metrics.get_meter_provider().get_meter(f"{config.tags.app}.meter") | ||
|
||
@staticmethod | ||
@inject | ||
def get_metric_reader(config: Config = Provide[KarapaceContainer.config]) -> MetricReader: | ||
exporter: MetricExporter = ConsoleMetricExporter() | ||
if config.telemetry.otel_endpoint_url: | ||
exporter = OTLPMetricExporter(endpoint=config.telemetry.otel_endpoint_url) | ||
return PeriodicExportingMetricReader( | ||
exporter=exporter, | ||
export_interval_millis=config.telemetry.metrics_export_interval_milliseconds, | ||
) |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,11 +6,14 @@ | |
from collections.abc import Awaitable, Callable | ||
from dependency_injector.wiring import inject, Provide | ||
from fastapi import FastAPI, Request, Response | ||
from opentelemetry.metrics import Counter, Histogram, UpDownCounter | ||
from opentelemetry.trace import SpanKind | ||
from schema_registry.telemetry.container import TelemetryContainer | ||
from schema_registry.telemetry.meter import Meter | ||
from schema_registry.telemetry.tracer import Tracer | ||
|
||
import logging | ||
import time | ||
|
||
LOG = logging.getLogger(__name__) | ||
|
||
|
@@ -20,12 +23,59 @@ async def telemetry_middleware( | |
request: Request, | ||
call_next: Callable[[Request], Awaitable[Response]], | ||
tracer: Tracer = Provide[TelemetryContainer.tracer], | ||
meter: Meter = Provide[TelemetryContainer.meter], | ||
) -> Response: | ||
resource = request.url.path.split("/")[1] | ||
with tracer.get_tracer().start_as_current_span(name=f"{request.method}: /{resource}", kind=SpanKind.SERVER) as span: | ||
span.add_event("Creating metering resources") | ||
karapace_http_requests_in_progress: UpDownCounter = meter.get_meter().create_up_down_counter( | ||
name="karapace_http_requests_in_progress", | ||
description="In-progress requests for HTTP/TCP Protocol", | ||
) | ||
karapace_http_requests_duration_seconds: Histogram = meter.get_meter().create_histogram( | ||
unit="seconds", | ||
name="karapace_http_requests_duration_seconds", | ||
description="Request Duration for HTTP/TCP Protocol", | ||
) | ||
karapace_http_requests_total: Counter = meter.get_meter().create_counter( | ||
name="karapace_http_requests_total", | ||
description="Total Request Count for HTTP/TCP Protocol", | ||
) | ||
|
||
# Set start time for request | ||
setattr(request.state, meter.START_TIME_KEY, time.monotonic()) | ||
|
||
# Extract request labels | ||
path = request.url.path | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This will create quite large cardinality when Karapace handles large number of topics and versions. |
||
method = request.method | ||
|
||
# Increment requests in progress before response handler | ||
span.add_event("Metering requests in progress (increase)") | ||
karapace_http_requests_in_progress.add(amount=1, attributes={"method": method, "path": path}) | ||
|
||
# Call request handler | ||
tracer.update_span_with_request(request=request, span=span) | ||
span.add_event("Calling request handler") | ||
response: Response = await call_next(request) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If call to next fails, are the subsequent calls to tracer and meters called? |
||
tracer.update_span_with_response(response=response, span=span) | ||
|
||
# Instrument request duration | ||
span.add_event("Metering request duration") | ||
karapace_http_requests_duration_seconds.record( | ||
amount=(time.monotonic() - getattr(request.state, meter.START_TIME_KEY)), | ||
attributes={"method": method, "path": path}, | ||
) | ||
|
||
# Instrument total requests | ||
span.add_event("Metering total requests") | ||
karapace_http_requests_total.add( | ||
amount=1, attributes={"method": method, "path": path, "status": response.status_code} | ||
) | ||
|
||
# Decrement requests in progress after response handler | ||
span.add_event("Metering requests in progress (decrease)") | ||
karapace_http_requests_in_progress.add(amount=-1, attributes={"method": method, "path": path}) | ||
|
||
return response | ||
|
||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
""" | ||
schema_registry - telemetry meter tests | ||
|
||
Copyright (c) 2024 Aiven Ltd | ||
See LICENSE for details | ||
""" | ||
|
||
from karapace.config import KarapaceTelemetry | ||
from karapace.container import KarapaceContainer | ||
from schema_registry.telemetry.meter import Meter | ||
from unittest.mock import patch | ||
|
||
|
||
def test_meter(karapace_container: KarapaceContainer): | ||
with patch("schema_registry.telemetry.meter.metrics") as mock_metrics: | ||
Meter.get_meter(config=karapace_container.config()) | ||
mock_metrics.get_meter_provider.return_value.get_meter.assert_called_once_with("Karapace.meter") | ||
|
||
|
||
def test_get_metric_reader_without_otel_endpoint(karapace_container: KarapaceContainer) -> None: | ||
config = karapace_container.config().set_config_defaults( | ||
new_config={"telemetry": KarapaceTelemetry(otel_endpoint_url=None)} | ||
) | ||
with ( | ||
patch("schema_registry.telemetry.meter.ConsoleMetricExporter") as mock_console_exporter, | ||
patch("schema_registry.telemetry.meter.PeriodicExportingMetricReader") as mock_periodic_exporting_metric_reader, | ||
): | ||
reader = Meter.get_metric_reader(config=config) | ||
mock_console_exporter.assert_called_once() | ||
mock_periodic_exporting_metric_reader.assert_called_once_with( | ||
exporter=mock_console_exporter.return_value, | ||
export_interval_millis=10000, | ||
) | ||
assert reader is mock_periodic_exporting_metric_reader.return_value | ||
|
||
|
||
def test_get_metric_reader_with_otel_endpoint(karapace_container: KarapaceContainer) -> None: | ||
config = karapace_container.config().set_config_defaults( | ||
new_config={"telemetry": KarapaceTelemetry(otel_endpoint_url="http://otel:4317")} | ||
) | ||
with ( | ||
patch("schema_registry.telemetry.meter.OTLPMetricExporter") as mock_otlp_exporter, | ||
patch("schema_registry.telemetry.meter.PeriodicExportingMetricReader") as mock_periodic_exporting_metric_reader, | ||
): | ||
reader = Meter.get_metric_reader(config=config) | ||
mock_otlp_exporter.assert_called_once_with(endpoint="http://otel:4317") | ||
mock_periodic_exporting_metric_reader.assert_called_once_with( | ||
exporter=mock_otlp_exporter.return_value, | ||
export_interval_millis=10000, | ||
) | ||
assert reader is mock_periodic_exporting_metric_reader.return_value |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this creating the counter or reusing the existing one? Similar question for the duration histogram and requests counter.