Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add otel_headers to OTel logic #44258

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1174,6 +1174,13 @@ metrics:
type: string
example: ~
default: "8889"
otel_headers:
description: |
Headers to be added to the traces.
version_added: 3.0.0
type: string
example: 'Authorization="Bearer token",Potato="Tomato"'
default: ""
otel_prefix:
description: |
The prefix for the Airflow metrics.
Expand Down Expand Up @@ -1239,6 +1246,13 @@ traces:
type: string
example: ~
default: "8889"
otel_headers:
description: |
Headers to be added to the traces.
version_added: 3.0.0
type: string
example: 'Authorization="Bearer token",Potato="Tomato"'
default: ""
otel_service:
description: |
The default service name of traces.
Expand Down
5 changes: 4 additions & 1 deletion airflow/metrics/otel_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,9 @@ def poke_gauge(self, name: str, attributes: Attributes = None) -> GaugeValues:
def get_otel_logger(cls) -> SafeOtelLogger:
host = conf.get("metrics", "otel_host") # ex: "breeze-otel-collector"
port = conf.getint("metrics", "otel_port") # ex: 4318
headers = dict([
header.split('=', 1) for header in conf.getlist("traces", "otel_headers")
])
prefix = conf.get("metrics", "otel_prefix") # ex: "airflow"
ssl_active = conf.getboolean("metrics", "otel_ssl_active")
# PeriodicExportingMetricReader will default to an interval of 60000 millis.
Expand All @@ -409,7 +412,7 @@ def get_otel_logger(cls) -> SafeOtelLogger:
PeriodicExportingMetricReader(
OTLPMetricExporter(
endpoint=endpoint,
headers={"Content-Type": "application/json"},
headers={"Content-Type": "application/json", **headers},
),
export_interval_millis=interval,
)
Expand Down
8 changes: 7 additions & 1 deletion airflow/traces/otel_tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,14 +269,20 @@ def get_otel_tracer(cls) -> OtelTrace:
"""Get OTEL tracer from airflow configuration."""
host = conf.get("traces", "otel_host")
port = conf.getint("traces", "otel_port")
headers = dict([
header.split('=', 1) for header in conf.getlist("traces", "otel_headers")
])
ssl_active = conf.getboolean("traces", "otel_ssl_active")
tag_string = cls.get_constant_tags()

protocol = "https" if ssl_active else "http"
endpoint = f"{protocol}://{host}:{port}/v1/traces"
log.info("[OTLPSpanExporter] Connecting to OpenTelemetry Collector at %s", endpoint)
return OtelTrace(
span_exporter=OTLPSpanExporter(endpoint=endpoint, headers={"Content-Type": "application/json"}),
span_exporter=OTLPSpanExporter(
endpoint=endpoint,
headers={"Content-Type": "application/json", **headers}
),
tag_string=tag_string,
)

Expand Down