Skip to content

Commit

Permalink
Start resolving type checker issues
Browse files Browse the repository at this point in the history
  • Loading branch information
anticorrelator committed Jan 22, 2024
1 parent b3da3c1 commit 413c163
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,15 @@
from functools import wraps
from importlib import import_module
from inspect import signature
from typing import Any, Callable, Collection, Dict, Optional, Tuple, TypeVar
from typing import IO, Any, Callable, Collection, Dict, Optional, Tuple, TypeVar, cast

from botocore.client import BaseClient
from botocore.response import StreamingBody
from openinference.semconv.trace import MessageAttributes, SpanAttributes
from opentelemetry import context as context_api
from opentelemetry import trace as trace_api
from opentelemetry.context import _SUPPRESS_INSTRUMENTATION_KEY
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor # type: ignore
from opentelemetry.instrumentation.utils import (
_SUPPRESS_INSTRUMENTATION_KEY,
)
from opentelemetry.trace import Tracer
from opentelemetry.util.types import AttributeValue
from wrapt import wrap_function_wrapper
Expand All @@ -31,16 +30,28 @@ def _set_span_attribute(span: trace_api.Span, name: str, value: AttributeValue)
return


class InstrumentedClient(BaseClient):
"""
Proxy class representing an instrumented boto client.
"""

invoke_model: Callable[..., Any]
_unwrapped_invoke_model: Callable[..., Any]


class BufferedStreamingBody(StreamingBody):
def __init__(self, raw_stream: io.IOBase, content_length: int) -> None:
_raw_stream: IO[bytes]

def __init__(self, raw_stream: IO[bytes], content_length: int) -> None:
super().__init__(raw_stream, content_length)
self._buffer: Optional[io.IOBase] = None

def read(self, amt: Optional[int] = None) -> bytes:
if self._buffer is None:
self._buffer = io.BytesIO(self._raw_stream.read())

return self._buffer.read(amt)
output: bytes = self._buffer.read(amt)
return output

def reset(self) -> None:
# Reset the buffer to enable reading the stream again
Expand All @@ -54,11 +65,11 @@ def _client_wrapper(
instance: Optional[Any],
args: Tuple[Any, ...],
kwargs: Dict[str, Any],
) -> CallableType:
) -> BaseClient:
@wraps(wrapped)
def create_instrumented_client(*args: Any, **kwargs: Any) -> Any:
def create_instrumented_client(*args: Any, **kwargs: Any) -> BaseClient:
"""Instruments and calls every function defined in TO_WRAP."""
client = wrapped(*args, **kwargs)
client: BaseClient = wrapped(*args, **kwargs)
if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY):
return client

Expand All @@ -67,6 +78,7 @@ def create_instrumented_client(*args: Any, **kwargs: Any) -> Any:
bound_arguments.apply_defaults()

if bound_arguments.arguments.get("service_name") == "bedrock-runtime":
client = cast(InstrumentedClient, client)
client._unwrapped_invoke_model = client.invoke_model
client.invoke_model = _model_invocation_wrapper(tracer)(client)
return client
Expand All @@ -77,11 +89,11 @@ def create_instrumented_client(*args: Any, **kwargs: Any) -> Any:
return _client_wrapper # type: ignore


def _model_invocation_wrapper(tracer: Tracer) -> Callable[[Any], CallableType]:
def _invocation_wrapper(wrapped_client: Any) -> CallableType:
def _model_invocation_wrapper(tracer: Tracer) -> Callable[[InstrumentedClient], Callable[..., Any]]:
def _invocation_wrapper(wrapped_client: InstrumentedClient) -> Callable[..., Any]:
"""Instruments a bedrock client's `invoke_model` method."""

@wraps(wrapped_client)
@wraps(wrapped_client.invoke_model)
def instrumented_response(*args: Any, **kwargs: Any) -> Dict[str, Any]:
with tracer.start_as_current_span("bedrock.invoke_model") as span:
response = wrapped_client._unwrapped_invoke_model(*args, **kwargs)
Expand Down Expand Up @@ -127,7 +139,7 @@ def instrumented_response(*args: Any, **kwargs: Any) -> Dict[str, Any]:

return response

return instrumented_response # type: ignore
return instrumented_response

return _invocation_wrapper

Expand Down

0 comments on commit 413c163

Please sign in to comment.