From c8a9efcd230a248b5a860a68be585b1710a6ed4a Mon Sep 17 00:00:00 2001 From: Adam Fisher Date: Wed, 30 Aug 2023 16:53:56 +0300 Subject: [PATCH] fix: redis span alignment (#474) * fix: redis span alignment * chore: extract exporter wait code --- .../instrumentations/redis/__init__.py | 9 +- .../components/tests/test_attr_max_size.py | 5 +- .../components/tests/test_execution_tags.py | 5 +- .../integration/boto3/tests/test_boto3.py | 5 +- .../integration/fastapi/tests/test_fastapi.py | 11 +- .../integration/flask/tests/test_flask.py | 8 +- .../integration/grpcio/tests/test_grpcio.py | 9 +- .../kafka_python/tests/test_kafka_python.py | 5 +- src/test/integration/pika/app/__init__.py | 1 + src/test/integration/pika/tests/test_pika.py | 5 +- .../integration/pymongo/tests/test_pymongo.py | 5 +- .../integration/pymysql/tests/test_pymysql.py | 5 +- src/test/integration/redis/app/redis_hash.py | 7 + .../redis/app/redis_transaction.py | 10 ++ .../integration/redis/tests/test_redis.py | 127 ++++++++++++++---- src/test/test_utils/span_exporter.py | 8 ++ 16 files changed, 154 insertions(+), 71 deletions(-) create mode 100644 src/test/integration/redis/app/redis_hash.py create mode 100644 src/test/integration/redis/app/redis_transaction.py create mode 100644 src/test/test_utils/span_exporter.py diff --git a/src/lumigo_opentelemetry/instrumentations/redis/__init__.py b/src/lumigo_opentelemetry/instrumentations/redis/__init__.py index 94cb5ce7..21c64a4e 100644 --- a/src/lumigo_opentelemetry/instrumentations/redis/__init__.py +++ b/src/lumigo_opentelemetry/instrumentations/redis/__init__.py @@ -1,7 +1,7 @@ -import json -from typing import List, Any, Dict +from typing import Any, Dict, List from opentelemetry.trace.span import Span + from lumigo_opentelemetry.instrumentations import AbstractInstrumentor from lumigo_opentelemetry.instrumentations.instrumentation_utils import ( add_body_attribute, @@ -22,8 +22,9 @@ def install_instrumentation(self) -> None: def request_hook( span: Span, instance: Connection, args: List[Any], kwargs: Dict[Any, Any] ) -> None: - add_body_attribute(span, json.dumps(args), "redis.request.args") - add_body_attribute(span, json.dumps(kwargs), "redis.request.kwargs") + # a db.statement attribute is automatically added by the RedisInstrumentor + # when this hook is called, so we don't need to add anything manually + pass def response_hook(span: Span, instance: Connection, response: Any) -> None: add_body_attribute(span, response, "redis.response.body") diff --git a/src/test/components/tests/test_attr_max_size.py b/src/test/components/tests/test_attr_max_size.py index 6741b5e1..260ca702 100644 --- a/src/test/components/tests/test_attr_max_size.py +++ b/src/test/components/tests/test_attr_max_size.py @@ -1,5 +1,5 @@ -import time import unittest +from test.test_utils.span_exporter import wait_for_exporter from test.test_utils.spans_parser import SpansContainer import requests @@ -14,8 +14,7 @@ def test_large_span_attribute_size_max_size_env_var_was_set(self): assert body is not None - # TODO Do something deterministic - time.sleep(3) # Sleep to allow the exporter to catch up + wait_for_exporter() spans_container = SpansContainer.get_spans_from_file() self.assertEqual(4, len(spans_container.spans)) diff --git a/src/test/components/tests/test_execution_tags.py b/src/test/components/tests/test_execution_tags.py index 4b8b78fc..e76df7b2 100644 --- a/src/test/components/tests/test_execution_tags.py +++ b/src/test/components/tests/test_execution_tags.py @@ -1,5 +1,5 @@ -import time import unittest +from test.test_utils.span_exporter import wait_for_exporter from test.test_utils.spans_parser import SpansContainer import requests @@ -14,8 +14,7 @@ def test_execution_tag(self): assert body is not None - # TODO Do something deterministic - time.sleep(3) # Sleep to allow the exporter to catch up + wait_for_exporter() spans_container = SpansContainer.get_spans_from_file() self.assertEqual(4, len(spans_container.spans)) diff --git a/src/test/integration/boto3/tests/test_boto3.py b/src/test/integration/boto3/tests/test_boto3.py index 332db067..83c33f68 100644 --- a/src/test/integration/boto3/tests/test_boto3.py +++ b/src/test/integration/boto3/tests/test_boto3.py @@ -1,6 +1,6 @@ import json -import time import unittest +from test.test_utils.span_exporter import wait_for_exporter from test.test_utils.spans_parser import SpansContainer import requests @@ -35,8 +35,7 @@ def test_boto3_instrumentation(self): self.assertEqual(body, {"status": "ok"}) - # TODO Do something deterministic - time.sleep(3) # Sleep to allow the exporter to catch up + wait_for_exporter() spans_container = SpansContainer.get_spans_from_file() self.assertEqual(4, len(spans_container.spans)) diff --git a/src/test/integration/fastapi/tests/test_fastapi.py b/src/test/integration/fastapi/tests/test_fastapi.py index 0ca33ab5..c160a2a9 100644 --- a/src/test/integration/fastapi/tests/test_fastapi.py +++ b/src/test/integration/fastapi/tests/test_fastapi.py @@ -1,5 +1,5 @@ -import time import unittest +from test.test_utils.span_exporter import wait_for_exporter from test.test_utils.spans_parser import SpansContainer import requests @@ -14,8 +14,7 @@ def test_200_OK(self): self.assertEqual(body, {"message": "Hello FastAPI!"}) - # TODO Do something deterministic - time.sleep(3) # Sleep to allow the exporter to catch up + wait_for_exporter() spans_container = SpansContainer.get_spans_from_file() self.assertEqual(3, len(spans_container.spans)) @@ -50,8 +49,7 @@ def test_requests_instrumentation(self): self.assertIn("https://api.chucknorris.io/jokes/", body["url"]) - # TODO Do something deterministic - time.sleep(3) # Sleep to allow the exporter to catch up + wait_for_exporter() spans_container = SpansContainer.get_spans_from_file() self.assertEqual(4, len(spans_container.spans)) @@ -86,8 +84,7 @@ def test_large_span_attribute_size_default_max_size(self): assert body is not None - # TODO Do something deterministic - time.sleep(3) # Sleep to allow the exporter to catch up + wait_for_exporter() spans_container = SpansContainer.get_spans_from_file() self.assertEqual(4, len(spans_container.spans)) diff --git a/src/test/integration/flask/tests/test_flask.py b/src/test/integration/flask/tests/test_flask.py index cb8afe8e..975ab88e 100644 --- a/src/test/integration/flask/tests/test_flask.py +++ b/src/test/integration/flask/tests/test_flask.py @@ -1,5 +1,5 @@ -import time import unittest +from test.test_utils.span_exporter import wait_for_exporter from test.test_utils.spans_parser import SpansContainer import requests @@ -14,8 +14,7 @@ def test_200_OK(self): self.assertEqual(body, {"message": "Hello Flask!"}) - # TODO Do something deterministic - time.sleep(3) # Sleep to allow the exporter to catch up + wait_for_exporter() spans_container = SpansContainer.get_spans_from_file() self.assertEqual(1, len(spans_container.spans)) @@ -37,8 +36,7 @@ def test_requests_instrumentation(self): self.assertIn("https://api.chucknorris.io/jokes/", body["url"]) - # TODO Do something deterministic - time.sleep(3) # Sleep to allow the exporter to catch up + wait_for_exporter() spans_container = SpansContainer.get_spans_from_file() self.assertEqual(2, len(spans_container.spans)) diff --git a/src/test/integration/grpcio/tests/test_grpcio.py b/src/test/integration/grpcio/tests/test_grpcio.py index 5037e445..bce80593 100644 --- a/src/test/integration/grpcio/tests/test_grpcio.py +++ b/src/test/integration/grpcio/tests/test_grpcio.py @@ -1,7 +1,7 @@ -import time -import unittest -import sys import os +import sys +import unittest +from test.test_utils.span_exporter import wait_for_exporter sys.path.append(os.path.join(os.path.dirname(__file__), "..", "app")) @@ -20,8 +20,7 @@ def tearDownClass(cls) -> None: stub.SayHelloUnaryUnary(helloworld_pb2.HelloRequest(name="exit")) def check_spans(self, method: str, request_payload: str, response_payload: str): - # TODO Do something deterministic - time.sleep(5) # allow the exporter to catch up + wait_for_exporter(5) server_file = os.getenv("SERVER_SPANDUMP") server_spans = SpansContainer.get_spans_from_file(server_file) diff --git a/src/test/integration/kafka_python/tests/test_kafka_python.py b/src/test/integration/kafka_python/tests/test_kafka_python.py index 1e2accb3..c3bf71e1 100644 --- a/src/test/integration/kafka_python/tests/test_kafka_python.py +++ b/src/test/integration/kafka_python/tests/test_kafka_python.py @@ -1,6 +1,6 @@ import json -import time import unittest +from test.test_utils.span_exporter import wait_for_exporter from test.test_utils.spans_parser import SpansContainer import requests @@ -43,8 +43,7 @@ def test_kafka_python_instrumentation(self): self.assertEqual(body, {"status": "ok"}) - # TODO Do something deterministic - time.sleep(3) # Sleep to allow the exporter to catch up + wait_for_exporter() spans_container = SpansContainer.get_spans_from_file() diff --git a/src/test/integration/pika/app/__init__.py b/src/test/integration/pika/app/__init__.py index 7180864d..6be1fefe 100644 --- a/src/test/integration/pika/app/__init__.py +++ b/src/test/integration/pika/app/__init__.py @@ -46,6 +46,7 @@ async def invoke_pika_consumer(request: Request): except Exception as e: print(f"Exception while consuming messages: {e}") channel.stop_consuming() + raise e return {"status": "ok"} diff --git a/src/test/integration/pika/tests/test_pika.py b/src/test/integration/pika/tests/test_pika.py index 5494792f..be13c46d 100644 --- a/src/test/integration/pika/tests/test_pika.py +++ b/src/test/integration/pika/tests/test_pika.py @@ -1,6 +1,6 @@ import json -import time import unittest +from test.test_utils.span_exporter import wait_for_exporter from test.test_utils.spans_parser import SpansContainer import requests @@ -48,8 +48,7 @@ def test_pika_instrumentation(self): self.assertEqual(body, {"status": "ok"}) - # TODO Do something deterministic - time.sleep(3) # Sleep to allow the exporter to catch up + wait_for_exporter() spans_container = SpansContainer.get_spans_from_file() diff --git a/src/test/integration/pymongo/tests/test_pymongo.py b/src/test/integration/pymongo/tests/test_pymongo.py index fea95b5e..074ce166 100644 --- a/src/test/integration/pymongo/tests/test_pymongo.py +++ b/src/test/integration/pymongo/tests/test_pymongo.py @@ -1,5 +1,5 @@ -import time import unittest +from test.test_utils.span_exporter import wait_for_exporter from test.test_utils.spans_parser import SpansContainer import requests @@ -15,8 +15,7 @@ def test_mongo_instrumentation(self): self.assertEqual(body, {"status": "ok"}) - # TODO Do something deterministic - time.sleep(3) # Sleep to allow the exporter to catch up + wait_for_exporter() spans_container = SpansContainer.get_spans_from_file() diff --git a/src/test/integration/pymysql/tests/test_pymysql.py b/src/test/integration/pymysql/tests/test_pymysql.py index 3ba8a950..b2575dee 100644 --- a/src/test/integration/pymysql/tests/test_pymysql.py +++ b/src/test/integration/pymysql/tests/test_pymysql.py @@ -1,5 +1,5 @@ -import time import unittest +from test.test_utils.span_exporter import wait_for_exporter from test.test_utils.spans_parser import SpansContainer import requests @@ -15,8 +15,7 @@ def test_pymysql_instrumentation(self): self.assertEqual(body, {"status": "ok"}) - # TODO Do something deterministic - time.sleep(3) # Sleep to allow the exporter to catch up + wait_for_exporter() spans_container = SpansContainer.get_spans_from_file() diff --git a/src/test/integration/redis/app/redis_hash.py b/src/test/integration/redis/app/redis_hash.py new file mode 100644 index 00000000..e34ad311 --- /dev/null +++ b/src/test/integration/redis/app/redis_hash.py @@ -0,0 +1,7 @@ +import os + +import redis + +client = redis.StrictRedis(host=os.getenv("REDIS_HOST"), port=os.getenv("REDIS_PORT")) +client.hmset("my-key", {"key1": "value1", "key2": "value2"}) +client.hgetall("my-key") diff --git a/src/test/integration/redis/app/redis_transaction.py b/src/test/integration/redis/app/redis_transaction.py new file mode 100644 index 00000000..78f8fd79 --- /dev/null +++ b/src/test/integration/redis/app/redis_transaction.py @@ -0,0 +1,10 @@ +import os + +import redis + +client = redis.StrictRedis(host=os.getenv("REDIS_HOST"), port=os.getenv("REDIS_PORT")) +pipe = client.pipeline() +pipe.set("my-key", "pre-key-value").get("my-key") +pipe.set("my-key", "key-value").get("my-key") +pipe.get("unknown-key") +pipe.execute() diff --git a/src/test/integration/redis/tests/test_redis.py b/src/test/integration/redis/tests/test_redis.py index 7e0b5eb9..5dc99531 100644 --- a/src/test/integration/redis/tests/test_redis.py +++ b/src/test/integration/redis/tests/test_redis.py @@ -1,47 +1,116 @@ +import ast import os +import subprocess import sys -import time import unittest +from test.test_utils.span_exporter import wait_for_exporter +from test.test_utils.spans_parser import SpansContainer + from testcontainers.redis import RedisContainer -import subprocess -from test.test_utils.spans_parser import SpansContainer +ATTRIBUTES = "attributes" +DB_STATEMENT = "db.statement" +DB_SYSTEM = "db.system" +REDIS_RESPONSE_BODY = "redis.response.body" + + +def run_redis_sample(sample_name: str, redis_host: str, redis_port: int): + sample_path = os.path.join( + os.path.dirname(os.path.abspath(__file__)), + f"../app/redis_{sample_name}.py", + ) + subprocess.check_output( + [sys.executable, sample_path], + env={ + **os.environ, + "REDIS_HOST": redis_host, + "REDIS_PORT": str(redis_port), + "AUTOWRAPT_BOOTSTRAP": "lumigo_opentelemetry", + "OTEL_SERVICE_NAME": f"redis_{sample_name}-app", + }, + ) class TestRedisSpans(unittest.TestCase): - def test_redis_instrumentation(self): + def test_redis_set_and_get(self): with RedisContainer("redis:latest") as redis_server: - example_path = os.path.join( - os.path.dirname(os.path.abspath(__file__)), - "../app/redis_set_and_get.py", - ) - subprocess.check_output( - [sys.executable, example_path], - env={ - **os.environ, - "REDIS_HOST": redis_server.get_container_host_ip(), - "REDIS_PORT": str(redis_server.get_exposed_port(6379)), - "AUTOWRAPT_BOOTSTRAP": "lumigo_opentelemetry", - "OTEL_SERVICE_NAME": "app", - }, - ) - # TODO Do something deterministic - time.sleep(3) # Sleep to allow the exporter to catch up + run_redis_sample( + "set_and_get", + redis_server.get_container_host_ip(), + redis_server.get_exposed_port(6379), + ) + + wait_for_exporter() + spans_container = SpansContainer.parse_spans_from_file() self.assertGreaterEqual(len(spans_container.spans), 2) set_span, get_span = spans_container.spans - self.assertEqual(set_span["attributes"]["db.system"], "redis") + self.assertEqual(set_span[ATTRIBUTES][DB_SYSTEM], "redis") self.assertEqual( - set_span["attributes"]["redis.request.args"], - '["SET", "my-key", "my-value"]', + set_span[ATTRIBUTES][DB_STATEMENT], + "SET my-key my-value", + ) + self.assertEqual(set_span[ATTRIBUTES][REDIS_RESPONSE_BODY], "True") + + self.assertEqual(get_span[ATTRIBUTES][DB_SYSTEM], "redis") + self.assertEqual(get_span[ATTRIBUTES][DB_STATEMENT], "GET my-key") + self.assertEqual(get_span[ATTRIBUTES][REDIS_RESPONSE_BODY], "my-value") + + def test_redis_hash(self): + with RedisContainer("redis:latest") as redis_server: + run_redis_sample( + "hash", + redis_server.get_container_host_ip(), + redis_server.get_exposed_port(6379), ) - self.assertEqual(set_span["attributes"]["redis.request.kwargs"], "{}") - self.assertEqual(set_span["attributes"]["redis.response.body"], "True") - self.assertEqual(get_span["attributes"]["db.system"], "redis") + wait_for_exporter() + + spans_container = SpansContainer.parse_spans_from_file() + + self.assertGreaterEqual(len(spans_container.spans), 2) + set_span, get_span = spans_container.spans + self.assertEqual(set_span[ATTRIBUTES][DB_SYSTEM], "redis") + self.assertEqual( + set_span[ATTRIBUTES][DB_STATEMENT], + "HMSET my-key key1 value1 key2 value2", + ) + self.assertEqual(set_span[ATTRIBUTES][REDIS_RESPONSE_BODY], "True") + + self.assertEqual(get_span[ATTRIBUTES][DB_SYSTEM], "redis") + self.assertEqual(get_span[ATTRIBUTES][DB_STATEMENT], "HGETALL my-key") + self.assertEqual( + ast.literal_eval(get_span[ATTRIBUTES][REDIS_RESPONSE_BODY]), + {b"key1": b"value1", b"key2": b"value2"}, + ) + + def test_redis_transaction(self): + with RedisContainer("redis:latest") as redis_server: + run_redis_sample( + "transaction", + redis_server.get_container_host_ip(), + redis_server.get_exposed_port(6379), + ) + + wait_for_exporter() + + spans_container = SpansContainer.parse_spans_from_file() + + self.assertGreaterEqual(len(spans_container.spans), 1) + transaction_span = spans_container.spans[0] + self.assertEqual(transaction_span[ATTRIBUTES][DB_SYSTEM], "redis") + self.assertEqual( + transaction_span[ATTRIBUTES][DB_STATEMENT].split("\n"), + [ + "SET my-key pre-key-value", + "GET my-key", + "SET my-key key-value", + "GET my-key", + "GET unknown-key", + ], + ) self.assertEqual( - get_span["attributes"]["redis.request.args"], '["GET", "my-key"]' + ast.literal_eval(transaction_span[ATTRIBUTES][REDIS_RESPONSE_BODY]), + [True, b"pre-key-value", True, b"key-value", None], ) - self.assertEqual(get_span["attributes"]["redis.request.kwargs"], "{}") - self.assertEqual(get_span["attributes"]["redis.response.body"], "my-value") diff --git a/src/test/test_utils/span_exporter.py b/src/test/test_utils/span_exporter.py new file mode 100644 index 00000000..0907c77b --- /dev/null +++ b/src/test/test_utils/span_exporter.py @@ -0,0 +1,8 @@ +import time + + +def wait_for_exporter(wait_time_sec: int = 3): + """Wait for the exporter to have collected all the spans.""" + + # TODO Do something deterministic + time.sleep(wait_time_sec) # Sleep to allow the exporter to catch up