Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HTTP semantic convention stability migration for httpx #2631

Merged
merged 11 commits into from
Jul 3, 2024
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#2616](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2616))
- `opentelemetry-instrumentation-confluent-kafka` Add support for produce purge
([#2638](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2638))
- `opentelemetry-instrumentation-httpx` Implement new semantic convention opt-in migration with stable http semantic conventions
([#2631](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2631))

### Breaking changes

Expand Down
2 changes: 1 addition & 1 deletion instrumentation/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
| [opentelemetry-instrumentation-fastapi](./opentelemetry-instrumentation-fastapi) | fastapi ~= 0.58 | Yes | experimental
| [opentelemetry-instrumentation-flask](./opentelemetry-instrumentation-flask) | flask >= 1.0 | Yes | migration
| [opentelemetry-instrumentation-grpc](./opentelemetry-instrumentation-grpc) | grpcio ~= 1.27 | No | experimental
| [opentelemetry-instrumentation-httpx](./opentelemetry-instrumentation-httpx) | httpx >= 0.18.0 | No | experimental
| [opentelemetry-instrumentation-httpx](./opentelemetry-instrumentation-httpx) | httpx >= 0.18.0 | No | migration
| [opentelemetry-instrumentation-jinja2](./opentelemetry-instrumentation-jinja2) | jinja2 >= 2.7, < 4.0 | No | experimental
| [opentelemetry-instrumentation-kafka-python](./opentelemetry-instrumentation-kafka-python) | kafka-python >= 2.0 | No | experimental
| [opentelemetry-instrumentation-logging](./opentelemetry-instrumentation-logging) | logging | No | experimental
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,18 @@ async def async_response_hook(span, request, response):

import httpx

from opentelemetry.instrumentation._semconv import (
_get_schema_url,
_OpenTelemetrySemanticConventionStability,
_OpenTelemetryStabilitySignalType,
_report_new,
_set_http_host,
_set_http_method,
_set_http_network_protocol_version,
_set_http_peer_port_client,
_set_http_status_code,
_set_http_url,
)
from opentelemetry.instrumentation.httpx.package import _instruments
from opentelemetry.instrumentation.httpx.version import __version__
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
Expand All @@ -204,11 +216,15 @@ async def async_response_hook(span, request, response):
is_http_instrumentation_enabled,
)
from opentelemetry.propagate import inject
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.semconv.attributes.error_attributes import ERROR_TYPE
from opentelemetry.semconv.attributes.network_attributes import (
NETWORK_PEER_ADDRESS,
NETWORK_PEER_PORT,
)
from opentelemetry.trace import SpanKind, TracerProvider, get_tracer
from opentelemetry.trace.span import Span
from opentelemetry.trace.status import Status
from opentelemetry.util.http import remove_url_credentials
from opentelemetry.trace.status import StatusCode
from opentelemetry.util.http import remove_url_credentials, sanitize_method

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -242,25 +258,11 @@ class ResponseInfo(typing.NamedTuple):


def _get_default_span_name(method: str) -> str:
return method.strip()


def _apply_status_code(span: Span, status_code: int) -> None:
if not span.is_recording():
return

span.set_attribute(SpanAttributes.HTTP_STATUS_CODE, status_code)
span.set_status(Status(http_status_to_status_code(status_code)))
method = sanitize_method(method.upper().strip())
if method == "_OTHER":
method = "HTTP"


def _prepare_attributes(method: bytes, url: URL) -> typing.Dict[str, str]:
_method = method.decode().upper()
_url = str(httpx.URL(url))
span_attributes = {
SpanAttributes.HTTP_METHOD: _method,
SpanAttributes.HTTP_URL: _url,
}
return span_attributes
return method


def _prepare_headers(headers: typing.Optional[Headers]) -> httpx.Headers:
Expand All @@ -279,7 +281,7 @@ def _extract_parameters(args, kwargs):
else:
# In httpx < 0.20.0, handle_request receives the parameters separately
method = args[0]
url = args[1]
url = httpx.URL(args[1])
emdneto marked this conversation as resolved.
Show resolved Hide resolved
headers = kwargs.get("headers", args[2] if len(args) > 2 else None)
stream = kwargs.get("stream", args[3] if len(args) > 3 else None)
extensions = kwargs.get(
Expand All @@ -299,6 +301,77 @@ def _inject_propagation_headers(headers, args, kwargs):
kwargs["headers"] = _headers.raw


def _extract_response(
response: typing.Union[
httpx.Response, typing.Tuple[int, Headers, httpx.SyncByteStream, dict]
]
) -> typing.Tuple[int, Headers, httpx.SyncByteStream, dict, str]:
if isinstance(response, httpx.Response):
status_code = response.status_code
headers = response.headers
stream = response.stream
extensions = response.extensions
http_version = response.http_version
else:
status_code, headers, stream, extensions = response
http_version = extensions.get("http_version", b"HTTP/1.1").decode(
"ascii", errors="ignore"
)

return (status_code, headers, stream, extensions, http_version)


def _apply_request_client_attributes_to_span(
span_attributes, url, method_original, span_name, semconv
):
# http semconv transition: http.method -> http.request.method
_set_http_method(span_attributes, method_original, span_name, semconv)
# http semconv transition: http.url -> url.full
_set_http_url(span_attributes, str(url), semconv)
try:
emdneto marked this conversation as resolved.
Show resolved Hide resolved
if _report_new(semconv):
if url.host:
# http semconv transition: http.host -> server.address
_set_http_host(span_attributes, url.host, semconv)
# http semconv transition: net.sock.peer.addr -> network.peer.address
span_attributes[NETWORK_PEER_ADDRESS] = url.host
if url.port:
# http semconv transition: net.sock.peer.port -> network.peer.port
_set_http_peer_port_client(span_attributes, url.port, semconv)
span_attributes[NETWORK_PEER_PORT] = url.port
except ValueError:
_logger.warning("Failed to parse attributes from URL: %s", url)


def _apply_response_client_attributes_to_span(
span, status_code, http_version, semconv
):
span_attributes = {}
# http semconv transition: http.status_code -> http.response.status_code
_set_http_status_code(
span_attributes,
status_code,
semconv,
)
http_status_code = http_status_to_status_code(status_code)
emdneto marked this conversation as resolved.
Show resolved Hide resolved
span.set_status(http_status_code)

if http_status_code is StatusCode.ERROR and _report_new(semconv):
# http semconv transition: new error.type
span_attributes[ERROR_TYPE] = str(status_code)

if http_version and _report_new(semconv):
# http semconv transition: http.flavor -> network.protocol.version
_set_http_network_protocol_version(
span_attributes,
http_version.replace("HTTP/", ""),
semconv,
)

for key, val in span_attributes.items():
span.set_attribute(key, val)


class SyncOpenTelemetryTransport(httpx.BaseTransport):
"""Sync transport class that will trace all requests made with a client.

Expand All @@ -318,12 +391,17 @@ def __init__(
request_hook: typing.Optional[RequestHook] = None,
response_hook: typing.Optional[ResponseHook] = None,
):
_OpenTelemetrySemanticConventionStability._initialize()
self.semconv = _OpenTelemetrySemanticConventionStability._get_opentelemetry_stability_opt_in_mode(
emdneto marked this conversation as resolved.
Show resolved Hide resolved
_OpenTelemetryStabilitySignalType.HTTP,
)

self._transport = transport
self._tracer = get_tracer(
__name__,
instrumenting_library_version=__version__,
tracer_provider=tracer_provider,
schema_url="https://opentelemetry.io/schemas/1.11.0",
schema_url=_get_schema_url(self.semconv),
)
self._request_hook = request_hook
self._response_hook = response_hook
Expand All @@ -340,6 +418,7 @@ def __exit__(
) -> None:
self._transport.__exit__(exc_type, exc_value, traceback)

# pylint: disable=R0914
def handle_request(
self,
*args,
Expand All @@ -355,39 +434,55 @@ def handle_request(
method, url, headers, stream, extensions = _extract_parameters(
args, kwargs
)
span_attributes = _prepare_attributes(method, url)
method_original = method.decode()
lzchen marked this conversation as resolved.
Show resolved Hide resolved
span_name = _get_default_span_name(method_original)
span_attributes = {}
# apply http client response attributes according to semconv
_apply_request_client_attributes_to_span(
span_attributes, url, method_original, span_name, self.semconv
)

request_info = RequestInfo(method, url, headers, stream, extensions)
span_name = _get_default_span_name(
span_attributes[SpanAttributes.HTTP_METHOD]
)

with self._tracer.start_as_current_span(
span_name, kind=SpanKind.CLIENT, attributes=span_attributes
) as span:
if self._request_hook is not None:
exception = None
if callable(self._request_hook):
self._request_hook(span, request_info)

_inject_propagation_headers(headers, args, kwargs)
response = self._transport.handle_request(*args, **kwargs)
if isinstance(response, httpx.Response):
response: httpx.Response = response
status_code = response.status_code
headers = response.headers
stream = response.stream
extensions = response.extensions
else:
status_code, headers, stream, extensions = response

_apply_status_code(span, status_code)

if self._response_hook is not None:
self._response_hook(
span,
request_info,
ResponseInfo(status_code, headers, stream, extensions),

try:
response = self._transport.handle_request(*args, **kwargs)
except Exception as exc: # pylint: disable=W0703
exception = exc
response = getattr(exc, "response", None)

if isinstance(response, (httpx.Response, tuple)):
status_code, headers, stream, extensions, http_version = (
_extract_response(response)
)

if span.is_recording():
# apply http client response attributes according to semconv
_apply_response_client_attributes_to_span(
span, status_code, http_version, self.semconv
)

if callable(self._response_hook):
self._response_hook(
span,
request_info,
ResponseInfo(status_code, headers, stream, extensions),
)

if exception and _report_new(self.semconv):
emdneto marked this conversation as resolved.
Show resolved Hide resolved
span.set_attribute(ERROR_TYPE, type(exception).__qualname__)

if exception:
raise exception.with_traceback(exception.__traceback__)

return response

def close(self) -> None:
Expand All @@ -413,12 +508,17 @@ def __init__(
request_hook: typing.Optional[AsyncRequestHook] = None,
response_hook: typing.Optional[AsyncResponseHook] = None,
):
_OpenTelemetrySemanticConventionStability._initialize()
self.semconv = _OpenTelemetrySemanticConventionStability._get_opentelemetry_stability_opt_in_mode(
_OpenTelemetryStabilitySignalType.HTTP,
)

self._transport = transport
self._tracer = get_tracer(
__name__,
instrumenting_library_version=__version__,
tracer_provider=tracer_provider,
schema_url="https://opentelemetry.io/schemas/1.11.0",
schema_url=_get_schema_url(self.semconv),
)
self._request_hook = request_hook
self._response_hook = response_hook
Expand All @@ -435,6 +535,7 @@ async def __aexit__(
) -> None:
await self._transport.__aexit__(exc_type, exc_value, traceback)

# pylint: disable=R0914
async def handle_async_request(self, *args, **kwargs) -> typing.Union[
typing.Tuple[int, "Headers", httpx.AsyncByteStream, dict],
httpx.Response,
Expand All @@ -446,42 +547,57 @@ async def handle_async_request(self, *args, **kwargs) -> typing.Union[
method, url, headers, stream, extensions = _extract_parameters(
args, kwargs
)
span_attributes = _prepare_attributes(method, url)

span_name = _get_default_span_name(
span_attributes[SpanAttributes.HTTP_METHOD]
method_original = method.decode()
span_name = _get_default_span_name(method_original)
span_attributes = {}
# apply http client response attributes according to semconv
_apply_request_client_attributes_to_span(
span_attributes, url, method_original, span_name, self.semconv
)

request_info = RequestInfo(method, url, headers, stream, extensions)

with self._tracer.start_as_current_span(
span_name, kind=SpanKind.CLIENT, attributes=span_attributes
) as span:
if self._request_hook is not None:
exception = None
if callable(self._request_hook):
await self._request_hook(span, request_info)

_inject_propagation_headers(headers, args, kwargs)

response = await self._transport.handle_async_request(
*args, **kwargs
)
if isinstance(response, httpx.Response):
response: httpx.Response = response
status_code = response.status_code
headers = response.headers
stream = response.stream
extensions = response.extensions
else:
status_code, headers, stream, extensions = response

_apply_status_code(span, status_code)

if self._response_hook is not None:
await self._response_hook(
span,
request_info,
ResponseInfo(status_code, headers, stream, extensions),
try:
response = await self._transport.handle_async_request(
*args, **kwargs
)
except Exception as exc: # pylint: disable=W0703
exception = exc
response = getattr(exc, "response", None)

if isinstance(response, (httpx.Response, tuple)):
status_code, headers, stream, extensions, http_version = (
_extract_response(response)
)

if span.is_recording():
# apply http client response attributes according to semconv
_apply_response_client_attributes_to_span(
span, status_code, http_version, self.semconv
)

if callable(self._response_hook):
await self._response_hook(
span,
request_info,
ResponseInfo(status_code, headers, stream, extensions),
)

if exception and _report_new(self.semconv):
span.set_attribute(ERROR_TYPE, type(exception).__qualname__)

if exception:
raise exception.with_traceback(exception.__traceback__)

return response

async def aclose(self) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,7 @@


_instruments = ("httpx >= 0.18.0",)

_supports_metrics = False

_semconv_status = "migration"
Loading