Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: OTel metrics & metering #1011

Open
wants to merge 2 commits into
base: jjaakola-aiven-fastapi
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions container/compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ services:
KARAPACE_GROUP_ID: karapace-schema-registry
KARAPACE_MASTER_ELIGIBILITY: true
KARAPACE_TOPIC_NAME: _schemas
KARAPACE_LOG_LEVEL: DEBUG
KARAPACE_LOG_LEVEL: INFO
KARAPACE_COMPATIBILITY: FULL
KARAPACE_STATSD_HOST: statsd-exporter
KARAPACE_STATSD_PORT: 8125
Expand Down Expand Up @@ -118,7 +118,7 @@ services:
KARAPACE_REGISTRY_HOST: karapace-schema-registry
KARAPACE_REGISTRY_PORT: 8081
KARAPACE_ADMIN_METADATA_MAX_AGE: 0
KARAPACE_LOG_LEVEL: DEBUG
KARAPACE_LOG_LEVEL: INFO
KARAPACE_STATSD_HOST: statsd-exporter
KARAPACE_STATSD_PORT: 8125
KARAPACE_KAFKA_SCHEMA_READER_STRICT_MODE: false
Expand Down Expand Up @@ -154,6 +154,11 @@ services:

prometheus:
image: prom/prometheus
command:
- --storage.tsdb.path=/prometheus
- --storage.tsdb.retention.time=1d
- --enable-feature=otlp-write-receiver
- --config.file=/etc/prometheus/prometheus.yml
volumes:
- ./prometheus/prometheus.yml:/etc/prometheus/prometheus.yml
- ./prometheus/rules.yml:/etc/prometheus/rules.yml
Expand Down
2 changes: 1 addition & 1 deletion container/opentelemetry/collector-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ exporters:
tls:
insecure: true
otlphttp/prometheus:
endpoint: prometheus:9090/api/v1/otlp
endpoint: http://prometheus:9090/api/v1/otlp
tls:
insecure: true

Expand Down
4 changes: 4 additions & 0 deletions container/prometheus/prometheus.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ global:
scrape_timeout: 5s # How long until a scrape request times out.
evaluation_interval: 10s # How frequently to evaluate rules.

storage:
tsdb:
out_of_order_time_window: 30m

rule_files:
- /etc/prometheus/rules.yml

Expand Down
6 changes: 3 additions & 3 deletions src/karapace/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,12 @@
HOSTNAME = socket.gethostname()


OTEL_VERSION = ""
try:
from opentelemetry import version as otel_version

OTEL_VERSION = otel_version.__version__
except: # pylint: disable=bare-except
pass
except Exception:
OTEL_VERSION = ""


class KarapaceTags(BaseModel):
Expand All @@ -47,6 +46,7 @@ class KarapaceTelemetryOTelExporter(str, enum.Enum):
class KarapaceTelemetry(BaseModel):
otel_endpoint_url: str | None = None
otel_exporter: KarapaceTelemetryOTelExporter = KarapaceTelemetryOTelExporter.NOOP
metrics_export_interval_milliseconds: int = 10000
resource_service_name: str = "karapace"
resource_service_instance_id: str = "karapace"
resource_telemetry_sdk_name: str = "opentelemetry"
Expand Down
2 changes: 2 additions & 0 deletions src/schema_registry/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import schema_registry.routers.mode
import schema_registry.routers.schemas
import schema_registry.routers.subjects
import schema_registry.telemetry.meter
import schema_registry.telemetry.middleware
import schema_registry.telemetry.setup
import schema_registry.telemetry.tracer
Expand All @@ -31,6 +32,7 @@
__name__,
schema_registry.controller,
schema_registry.telemetry.tracer,
schema_registry.telemetry.meter,
]
)

Expand Down
3 changes: 2 additions & 1 deletion src/schema_registry/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from schema_registry.middlewares import setup_middlewares
from schema_registry.registry import KarapaceSchemaRegistry
from schema_registry.routers.setup import setup_routers
from schema_registry.telemetry.setup import setup_tracing
from schema_registry.telemetry.setup import setup_metering, setup_tracing
from typing import AsyncContextManager

import logging
Expand Down Expand Up @@ -60,6 +60,7 @@ def create_karapace_application(
app = FastAPI(lifespan=lifespan) # type: ignore[arg-type]

setup_tracing()
setup_metering()
setup_routers(app=app)
setup_exception_handlers(app=app)
setup_middlewares(app=app)
Expand Down
10 changes: 7 additions & 3 deletions src/schema_registry/telemetry/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.semconv.attributes import telemetry_attributes as T
from schema_registry.telemetry.meter import Meter
from schema_registry.telemetry.tracer import Tracer


def create_tracing_resource(config: Config) -> Resource:
def create_telemetry_resource(config: Config) -> Resource:
return Resource.create(
{
"service.name": config.telemetry.resource_service_name,
Expand All @@ -26,6 +27,9 @@ def create_tracing_resource(config: Config) -> Resource:

class TelemetryContainer(containers.DeclarativeContainer):
karapace_container = providers.Container(KarapaceContainer)
tracing_resource = providers.Factory(create_tracing_resource, config=karapace_container.config)
tracer_provider = providers.Singleton(TracerProvider, resource=tracing_resource)

telemetry_resource = providers.Factory(create_telemetry_resource, config=karapace_container.config)

meter = providers.Singleton(Meter)
tracer = providers.Singleton(Tracer)
tracer_provider = providers.Singleton(TracerProvider, resource=telemetry_resource)
37 changes: 37 additions & 0 deletions src/schema_registry/telemetry/meter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
"""
Copyright (c) 2024 Aiven Ltd
See LICENSE for details
"""

from dependency_injector.wiring import inject, Provide
from karapace.config import Config
from karapace.container import KarapaceContainer
from opentelemetry import metrics
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.sdk.metrics.export import (
ConsoleMetricExporter,
MetricExporter,
MetricReader,
PeriodicExportingMetricReader,
)
from typing import Final


class Meter:
START_TIME_KEY: Final = "start_time"

@staticmethod
@inject
def get_meter(config: Config = Provide[KarapaceContainer.config]) -> metrics.Meter:
return metrics.get_meter_provider().get_meter(f"{config.tags.app}.meter")

@staticmethod
@inject
def get_metric_reader(config: Config = Provide[KarapaceContainer.config]) -> MetricReader:
exporter: MetricExporter = ConsoleMetricExporter()
if config.telemetry.otel_endpoint_url:
exporter = OTLPMetricExporter(endpoint=config.telemetry.otel_endpoint_url)
return PeriodicExportingMetricReader(
exporter=exporter,
export_interval_millis=config.telemetry.metrics_export_interval_milliseconds,
)
50 changes: 50 additions & 0 deletions src/schema_registry/telemetry/middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@
from collections.abc import Awaitable, Callable
from dependency_injector.wiring import inject, Provide
from fastapi import FastAPI, Request, Response
from opentelemetry.metrics import Counter, Histogram, UpDownCounter
from opentelemetry.trace import SpanKind
from schema_registry.telemetry.container import TelemetryContainer
from schema_registry.telemetry.meter import Meter
from schema_registry.telemetry.tracer import Tracer

import logging
import time

LOG = logging.getLogger(__name__)

Expand All @@ -20,12 +23,59 @@ async def telemetry_middleware(
request: Request,
call_next: Callable[[Request], Awaitable[Response]],
tracer: Tracer = Provide[TelemetryContainer.tracer],
meter: Meter = Provide[TelemetryContainer.meter],
) -> Response:
resource = request.url.path.split("/")[1]
with tracer.get_tracer().start_as_current_span(name=f"{request.method}: /{resource}", kind=SpanKind.SERVER) as span:
span.add_event("Creating metering resources")
karapace_http_requests_in_progress: UpDownCounter = meter.get_meter().create_up_down_counter(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this creating the counter or reusing the existing one? Similar question for the duration histogram and requests counter.

name="karapace_http_requests_in_progress",
description="In-progress requests for HTTP/TCP Protocol",
)
karapace_http_requests_duration_seconds: Histogram = meter.get_meter().create_histogram(
unit="seconds",
name="karapace_http_requests_duration_seconds",
description="Request Duration for HTTP/TCP Protocol",
)
karapace_http_requests_total: Counter = meter.get_meter().create_counter(
name="karapace_http_requests_total",
description="Total Request Count for HTTP/TCP Protocol",
)

# Set start time for request
setattr(request.state, meter.START_TIME_KEY, time.monotonic())

# Extract request labels
path = request.url.path
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will create quite large cardinality when Karapace handles large number of topics and versions.

method = request.method

# Increment requests in progress before response handler
span.add_event("Metering requests in progress (increase)")
karapace_http_requests_in_progress.add(amount=1, attributes={"method": method, "path": path})

# Call request handler
tracer.update_span_with_request(request=request, span=span)
span.add_event("Calling request handler")
response: Response = await call_next(request)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If call to next fails, are the subsequent calls to tracer and meters called?

tracer.update_span_with_response(response=response, span=span)

# Instrument request duration
span.add_event("Metering request duration")
karapace_http_requests_duration_seconds.record(
amount=(time.monotonic() - getattr(request.state, meter.START_TIME_KEY)),
attributes={"method": method, "path": path},
)

# Instrument total requests
span.add_event("Metering total requests")
karapace_http_requests_total.add(
amount=1, attributes={"method": method, "path": path, "status": response.status_code}
)

# Decrement requests in progress after response handler
span.add_event("Metering requests in progress (decrease)")
karapace_http_requests_in_progress.add(amount=-1, attributes={"method": method, "path": path})

return response


Expand Down
14 changes: 13 additions & 1 deletion src/schema_registry/telemetry/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@
"""

from dependency_injector.wiring import inject, Provide
from opentelemetry import trace
from opentelemetry import metrics, trace
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from schema_registry.telemetry.container import TelemetryContainer
from schema_registry.telemetry.meter import Meter
from schema_registry.telemetry.tracer import Tracer

import logging
Expand All @@ -22,3 +25,12 @@ def setup_tracing(
LOG.info("Setting OTel tracing provider")
tracer_provider.add_span_processor(tracer.get_span_processor())
trace.set_tracer_provider(tracer_provider)


@inject
def setup_metering(
meter: Meter = Provide[TelemetryContainer.meter],
telemetry_resource: Resource = Provide[TelemetryContainer.telemetry_resource],
) -> None:
LOG.info("Setting OTel meter provider")
metrics.set_meter_provider(MeterProvider(resource=telemetry_resource, metric_readers=[meter.get_metric_reader()]))
2 changes: 2 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import pytest
import re
import schema_registry.controller
import schema_registry.telemetry.meter
import schema_registry.telemetry.middleware
import schema_registry.telemetry.setup
import schema_registry.telemetry.tracer
Expand Down Expand Up @@ -195,6 +196,7 @@ def fixture_karapace_container() -> KarapaceContainer:
modules=[
schema_registry.controller,
schema_registry.telemetry.tracer,
schema_registry.telemetry.meter,
]
)
return karapace_container
Expand Down
51 changes: 51 additions & 0 deletions tests/unit/schema_registry/telemetry/test_meter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
"""
schema_registry - telemetry meter tests

Copyright (c) 2024 Aiven Ltd
See LICENSE for details
"""

from karapace.config import KarapaceTelemetry
from karapace.container import KarapaceContainer
from schema_registry.telemetry.meter import Meter
from unittest.mock import patch


def test_meter(karapace_container: KarapaceContainer):
with patch("schema_registry.telemetry.meter.metrics") as mock_metrics:
Meter.get_meter(config=karapace_container.config())
mock_metrics.get_meter_provider.return_value.get_meter.assert_called_once_with("Karapace.meter")


def test_get_metric_reader_without_otel_endpoint(karapace_container: KarapaceContainer) -> None:
config = karapace_container.config().set_config_defaults(
new_config={"telemetry": KarapaceTelemetry(otel_endpoint_url=None)}
)
with (
patch("schema_registry.telemetry.meter.ConsoleMetricExporter") as mock_console_exporter,
patch("schema_registry.telemetry.meter.PeriodicExportingMetricReader") as mock_periodic_exporting_metric_reader,
):
reader = Meter.get_metric_reader(config=config)
mock_console_exporter.assert_called_once()
mock_periodic_exporting_metric_reader.assert_called_once_with(
exporter=mock_console_exporter.return_value,
export_interval_millis=10000,
)
assert reader is mock_periodic_exporting_metric_reader.return_value


def test_get_metric_reader_with_otel_endpoint(karapace_container: KarapaceContainer) -> None:
config = karapace_container.config().set_config_defaults(
new_config={"telemetry": KarapaceTelemetry(otel_endpoint_url="http://otel:4317")}
)
with (
patch("schema_registry.telemetry.meter.OTLPMetricExporter") as mock_otlp_exporter,
patch("schema_registry.telemetry.meter.PeriodicExportingMetricReader") as mock_periodic_exporting_metric_reader,
):
reader = Meter.get_metric_reader(config=config)
mock_otlp_exporter.assert_called_once_with(endpoint="http://otel:4317")
mock_periodic_exporting_metric_reader.assert_called_once_with(
exporter=mock_otlp_exporter.return_value,
export_interval_millis=10000,
)
assert reader is mock_periodic_exporting_metric_reader.return_value
Loading
Loading