Skip to content

Commit

Permalink
add pyload extraction in psycopg2
Browse files Browse the repository at this point in the history
  • Loading branch information
saartochner-lumigo committed Oct 3, 2023
1 parent ac8c017 commit cb698c4
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 3 deletions.
64 changes: 62 additions & 2 deletions src/lumigo_opentelemetry/instrumentations/psycopg2/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import wrapt
from opentelemetry.trace import SpanKind
from lumigo_opentelemetry.instrumentations import AbstractInstrumentor
from lumigo_opentelemetry.libs.json_utils import dump_with_context
from lumigo_opentelemetry.libs.general_utils import lumigo_safe_execute


class Psycopg2Instrumentor(AbstractInstrumentor):
Expand All @@ -9,11 +13,67 @@ def check_if_applicable(self) -> None:
import psycopg2 # noqa

def install_instrumentation(self) -> None:
from opentelemetry.instrumentation.psycopg2 import Psycopg2Instrumentor

from opentelemetry.instrumentation import psycopg2
from opentelemetry.instrumentation.dbapi import DatabaseApiIntegration

# Patch the connect function to capture parameters (db.statement.parameters)
def new_wrap_connect(original_func, instance, args, kwargs): # type: ignore
kwargs["capture_parameters"] = True
return original_func(*args, **kwargs)

# Patch the _new_cursor_factory function to capture all the responses from all the fetch functions
def patched__new_cursor_factory(original_func, instance, args, kwargs): # type: ignore
db_api = kwargs["db_api"]
_cursor_tracer = psycopg2.CursorTracer(db_api)
original_response = original_func(*args, **kwargs)
# Patch all the fetch functions in `psycopg2.extensions.cursor`
original_functions = {
fetch_function: getattr(original_response, fetch_function)
for fetch_function in ["fetchone", "fetchmany", "fetchall"]
}
for (
fetch_function_name,
original_fetch_function,
) in original_functions.items():

def wrapper( # type: ignore
fetch_function_name: str, original_fetch_function
):
def func(self, *args, **kwargs): # type: ignore
with db_api._tracer.start_as_current_span(
fetch_function_name, kind=SpanKind.CLIENT
) as span:
result = original_fetch_function(self, *args, **kwargs)
with lumigo_safe_execute(
f"set response body ({fetch_function_name})"
):
_cursor_tracer._populate_span(span, None, *args)
span.set_attribute(
"db.response.body",
dump_with_context("responseBody", result),
)
return result

return func

setattr(
original_response,
fetch_function_name,
wrapper(fetch_function_name, original_fetch_function),
)
return original_response

wrapt.wrap_function_wrapper(
DatabaseApiIntegration, "__init__", new_wrap_connect
)
wrapt.wrap_function_wrapper(
psycopg2, "_new_cursor_factory", patched__new_cursor_factory
)

# if we don't skip the dependency check, the instrumentor will fail
# because it can't detect psycopg2-binary
Psycopg2Instrumentor().instrument(skip_dep_check=True)
psycopg2.Psycopg2Instrumentor().instrument(skip_dep_check=True)


instrumentor: AbstractInstrumentor = Psycopg2Instrumentor()
15 changes: 14 additions & 1 deletion src/test/integration/psycopg2/tests/test_psycopg2.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def test_psycopg2_create_add_select(self):
spans_container = SpansContainer.get_spans_from_file()

root_spans = spans_container.get_root_spans()
self.assertEqual(len(root_spans), 4, "There should be 4 root spans")
self.assertEqual(len(root_spans), 5, "There should be 5 root spans")

select_version_span = root_spans[0]

Expand Down Expand Up @@ -76,10 +76,23 @@ def test_psycopg2_create_add_select(self):
"INSERT INTO users"
)
)
print(insert_user_span["attributes"]["db.statement.parameters"])
self.assertIn(
f"('{test_name}', '{test_email}')",
insert_user_span["attributes"]["db.statement.parameters"],
)

select_users_span = root_spans[3]

self.assertEqual(select_users_span["attributes"]["db.system"], "postgresql")
self.assertEqual(
select_users_span["attributes"]["db.statement"], "SELECT * FROM users"
)

fetch_all_span = root_spans[4]

self.assertEqual(fetch_all_span["attributes"]["db.system"], "postgresql")
self.assertIn(
f'[[1, "{test_name}", "{test_email}"]]',
fetch_all_span["attributes"]["db.response.body"],
)

0 comments on commit cb698c4

Please sign in to comment.