diff --git a/CHANGELOG.md b/CHANGELOG.md index e3b91462ba..a748ed6277 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -34,6 +34,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- `opentelemetry-instrumentation-sqlalchemy` Add `attrs_provider` to allow providing attributes to span start functions + ([#2733](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2733)) - `opentelemetry-instrumentation-flask` Add `http.route` and `http.target` to metric attributes ([#2621](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2621)) - `opentelemetry-instrumentation-aws-lambda` Enable global propagator for AWS instrumentation diff --git a/instrumentation/opentelemetry-instrumentation-sqlalchemy/src/opentelemetry/instrumentation/sqlalchemy/__init__.py b/instrumentation/opentelemetry-instrumentation-sqlalchemy/src/opentelemetry/instrumentation/sqlalchemy/__init__.py index 2107bc3e23..37495108db 100644 --- a/instrumentation/opentelemetry-instrumentation-sqlalchemy/src/opentelemetry/instrumentation/sqlalchemy/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-sqlalchemy/src/opentelemetry/instrumentation/sqlalchemy/__init__.py @@ -137,6 +137,7 @@ def _instrument(self, **kwargs): ``meter_provider``: a MeterProvider, defaults to global ``enable_commenter``: bool to enable sqlcommenter, defaults to False ``commenter_options``: dict of sqlcommenter config, defaults to {} + ``attrs_provider``: a zero-argument callable that returns attributes to be added to all spans, defaults to None Returns: An instrumented engine if passed in as an argument or list of instrumented engines, None otherwise. @@ -165,25 +166,26 @@ def _instrument(self, **kwargs): enable_commenter = kwargs.get("enable_commenter", False) commenter_options = kwargs.get("commenter_options", {}) + attrs_provider = kwargs.get("attrs_provider") _w( "sqlalchemy", "create_engine", _wrap_create_engine( - tracer, connections_usage, enable_commenter, commenter_options + tracer, connections_usage, enable_commenter, commenter_options, attrs_provider ), ) _w( "sqlalchemy.engine", "create_engine", _wrap_create_engine( - tracer, connections_usage, enable_commenter, commenter_options + tracer, connections_usage, enable_commenter, commenter_options, attrs_provider ), ) _w( "sqlalchemy.engine.base", "Engine.connect", - _wrap_connect(tracer), + _wrap_connect(tracer, attrs_provider), ) if parse_version(sqlalchemy.__version__).release >= (1, 4): _w( @@ -194,6 +196,7 @@ def _instrument(self, **kwargs): connections_usage, enable_commenter, commenter_options, + attrs_provider, ), ) if kwargs.get("engine") is not None: @@ -203,6 +206,7 @@ def _instrument(self, **kwargs): connections_usage, kwargs.get("enable_commenter", False), kwargs.get("commenter_options", {}), + kwargs.get("attrs_provider"), ) if kwargs.get("engines") is not None and isinstance( kwargs.get("engines"), Sequence @@ -214,6 +218,7 @@ def _instrument(self, **kwargs): connections_usage, kwargs.get("enable_commenter", False), kwargs.get("commenter_options", {}), + kwargs.get("attrs_provider"), ) for engine in kwargs.get("engines") ] diff --git a/instrumentation/opentelemetry-instrumentation-sqlalchemy/src/opentelemetry/instrumentation/sqlalchemy/engine.py b/instrumentation/opentelemetry-instrumentation-sqlalchemy/src/opentelemetry/instrumentation/sqlalchemy/engine.py index 172c1193f3..ec3576bd64 100644 --- a/instrumentation/opentelemetry-instrumentation-sqlalchemy/src/opentelemetry/instrumentation/sqlalchemy/engine.py +++ b/instrumentation/opentelemetry-instrumentation-sqlalchemy/src/opentelemetry/instrumentation/sqlalchemy/engine.py @@ -27,7 +27,10 @@ from opentelemetry.semconv.trace import NetTransportValues, SpanAttributes from opentelemetry.trace.status import Status, StatusCode - +def _get_provided_attributes(attrs_provider): + if attrs_provider is None: + return None + return attrs_provider() def _normalize_vendor(vendor): """Return a canonical name for a type of database.""" if not vendor: @@ -43,7 +46,7 @@ def _normalize_vendor(vendor): def _wrap_create_async_engine( - tracer, connections_usage, enable_commenter=False, commenter_options=None + tracer, connections_usage, enable_commenter=False, commenter_options=None, attrs_provider=None, ): # pylint: disable=unused-argument def _wrap_create_async_engine_internal(func, module, args, kwargs): @@ -57,6 +60,7 @@ def _wrap_create_async_engine_internal(func, module, args, kwargs): connections_usage, enable_commenter, commenter_options, + attrs_provider=attrs_provider, ) return engine @@ -64,7 +68,7 @@ def _wrap_create_async_engine_internal(func, module, args, kwargs): def _wrap_create_engine( - tracer, connections_usage, enable_commenter=False, commenter_options=None + tracer, connections_usage, enable_commenter=False, commenter_options=None, attrs_provider=None, ): def _wrap_create_engine_internal(func, _module, args, kwargs): """Trace the SQLAlchemy engine, creating an `EngineTracer` @@ -77,21 +81,23 @@ def _wrap_create_engine_internal(func, _module, args, kwargs): connections_usage, enable_commenter, commenter_options, + attrs_provider, ) return engine return _wrap_create_engine_internal -def _wrap_connect(tracer): +def _wrap_connect(tracer, attrs_provider): # pylint: disable=unused-argument def _wrap_connect_internal(func, module, args, kwargs): with tracer.start_as_current_span( - "connect", kind=trace.SpanKind.CLIENT + "connect", kind=trace.SpanKind.CLIENT, attributes=_get_provided_attributes(attrs_provider) ) as span: if span.is_recording(): attrs, _ = _get_attributes_from_url(module.url) - span.set_attributes(attrs) + for key, value in attrs.items(): + span.set_attribute(key, value) span.set_attribute( SpanAttributes.DB_SYSTEM, _normalize_vendor(module.name) ) @@ -110,6 +116,7 @@ def __init__( connections_usage, enable_commenter=False, commenter_options=None, + attrs_provider=None, ): self.tracer = tracer self.connections_usage = connections_usage @@ -118,6 +125,7 @@ def __init__( self.commenter_options = commenter_options if commenter_options else {} self._engine_attrs = _get_attributes_from_engine(engine) self._leading_comment_remover = re.compile(r"^/\*.*?\*/") + self._attrs_provider = attrs_provider self._register_event_listener( engine, "before_cursor_execute", self._before_cur_exec, retval=True @@ -213,9 +221,11 @@ def _before_cur_exec( attrs = _get_attributes_from_cursor(self.vendor, cursor, attrs) db_name = attrs.get(SpanAttributes.DB_NAME, "") + attributes=_get_provided_attributes(self._attrs_provider) span = self.tracer.start_span( self._operation_name(db_name, statement), kind=trace.SpanKind.CLIENT, + attributes=attributes ) with trace.use_span(span, end_on_exit=False): if span.is_recording(): diff --git a/instrumentation/opentelemetry-instrumentation-sqlalchemy/tests/test_sqlalchemy.py b/instrumentation/opentelemetry-instrumentation-sqlalchemy/tests/test_sqlalchemy.py index f729fa6d80..eddfff59eb 100644 --- a/instrumentation/opentelemetry-instrumentation-sqlalchemy/tests/test_sqlalchemy.py +++ b/instrumentation/opentelemetry-instrumentation-sqlalchemy/tests/test_sqlalchemy.py @@ -427,3 +427,45 @@ def make_shortlived_engine(): gc.collect() assert callback.call_count == 5 + + + def test_attribute_provider(self): + SQLAlchemyInstrumentor().instrument(attrs_provider=lambda: {"my_attr": "my_val"}) + from sqlalchemy import create_engine # pylint: disable-all + + engine = create_engine("sqlite:///:memory:") + cnx = engine.connect() + cnx.execute("SELECT 1 + 1;").fetchall() + spans = self.memory_exporter.get_finished_spans() + + self.assertEqual(len(spans), 2) + self.assertEqual( + spans[0].attributes["my_attr"], "my_val" + ) + self.assertEqual( + spans[1].attributes["my_attr"], "my_val" + ) + + @pytest.mark.skipif( + not sqlalchemy.__version__.startswith("1.4"), + reason="only run async tests for 1.4", + ) + def test_async_attribute_provider(self): + async def run(): + from sqlalchemy.ext.asyncio import ( # pylint: disable-all + create_async_engine, + ) + + engine = create_async_engine("sqlite+aiosqlite:///:memory:") + SQLAlchemyInstrumentor().instrument( + engine=engine.sync_engine, tracer_provider=self.tracer_provider, attrs_provider=lambda: {"my_attr": "my_val"} + ) + async with engine.connect() as cnx: + await cnx.execute(sqlalchemy.text("SELECT 1 + 1;")) + spans = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans), 2) + # first span - the connection to the db + self.assertEqual(spans[0].attributes["my_attr"], "my_val") + self.assertEqual(spans[1].attributes["my_attr"], "my_val") + + asyncio.get_event_loop().run_until_complete(run())