From 0e10e7edd95eff6fe33447a015cac39c47633b56 Mon Sep 17 00:00:00 2001 From: Adam Fisher Date: Mon, 23 Oct 2023 19:42:30 +0300 Subject: [PATCH] fix: update opentelemetry dependencies to patch security vulnerability (#531) * fix: update opentelemetry dependencies * refactor: sensitive/flaky tests --------- Co-authored-by: saartochner --- .github/workflows/push-actions.yml | 2 +- CONTRIBUTING.md | 3 +- noxfile.py | 162 +----------- setup.py | 36 +-- .../instrumentations/fastapi/__init__.py | 8 +- .../fastapi/parsers/__init__.py | 18 +- .../instrumentations/redis/__init__.py | 5 +- .../instrumentations/requests/__init__.py | 34 ++- .../requests/parsers/__init__.py | 73 ------ src/test/components/scripts/start_uvicorn | 4 - src/test/components/start_uvicorn.py | 15 ++ src/test/components/tests/app_runner.py | 54 ++++ .../components/tests/test_attr_max_size.py | 39 --- .../components/tests/test_execution_tags.py | 66 +++-- src/test/integration/boto3-sqs/conftest.py | 6 +- src/test/integration/boto3/conftest.py | 6 +- src/test/integration/conftest.py | 6 +- src/test/integration/django/conftest.py | 6 +- src/test/integration/fastapi/app/__init__.py | 17 +- src/test/integration/fastapi/conftest.py | 6 +- .../integration/fastapi/scripts/start_uvicorn | 3 - src/test/integration/fastapi/start_uvicorn.py | 15 ++ .../integration/fastapi/tests/app_runner.py | 54 ++++ .../integration/fastapi/tests/test_fastapi.py | 230 ++++++++++-------- src/test/integration/flask/conftest.py | 6 +- .../integration/grpcio/app/greeter_server.py | 5 +- src/test/integration/grpcio/conftest.py | 8 + .../grpcio/requirements_others.txt | 3 +- .../integration/grpcio/scripts/start_server | 4 - .../integration/grpcio/tests/app_runner.py | 42 ++++ .../integration/grpcio/tests/test_grpcio.py | 87 ++++--- src/test/integration/kafka_python/conftest.py | 6 +- .../kafka_python/tests/test_kafka_python.py | 2 +- src/test/integration/pika/conftest.py | 6 +- src/test/integration/pika/tests/test_pika.py | 2 +- src/test/integration/psycopg2/conftest.py | 6 +- src/test/integration/pymongo/conftest.py | 6 +- src/test/integration/pymysql/conftest.py | 6 +- .../integration/redis/tests/test_redis.py | 10 +- src/test/test_utils/processes.py | 69 ++++++ src/test/test_utils/spans_parser.py | 46 +++- 41 files changed, 646 insertions(+), 536 deletions(-) delete mode 100644 src/lumigo_opentelemetry/instrumentations/requests/parsers/__init__.py delete mode 100755 src/test/components/scripts/start_uvicorn create mode 100644 src/test/components/start_uvicorn.py create mode 100644 src/test/components/tests/app_runner.py delete mode 100644 src/test/components/tests/test_attr_max_size.py delete mode 100755 src/test/integration/fastapi/scripts/start_uvicorn create mode 100644 src/test/integration/fastapi/start_uvicorn.py create mode 100644 src/test/integration/fastapi/tests/app_runner.py create mode 100644 src/test/integration/grpcio/conftest.py delete mode 100755 src/test/integration/grpcio/scripts/start_server create mode 100644 src/test/integration/grpcio/tests/app_runner.py create mode 100644 src/test/test_utils/processes.py diff --git a/.github/workflows/push-actions.yml b/.github/workflows/push-actions.yml index 73180750..663e3f50 100644 --- a/.github/workflows/push-actions.yml +++ b/.github/workflows/push-actions.yml @@ -26,7 +26,7 @@ jobs: # The task outputs the list of tasks on stdout, each entry separated by a `\n` character. # The task output is piped into the `jq` command, that formats it as a JSON list, as the # we need a JSON datastructure later on for dynamic matrixes. - echo "integration_tests=$(nox -e list_integration_tests_ci | jq -Rnc '[inputs]')" >> $GITHUB_OUTPUT + echo "integration_tests=$(python -m nox -e list_integration_tests_ci | jq -Rnc '[inputs]')" >> $GITHUB_OUTPUT version-testing: needs: [list-instrumentations] diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index e68d226b..18d1b266 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -25,7 +25,8 @@ It might be necessary to throw in a `brew tap homebrew/core` along the way. ### PyCharm Users -If you are using pycharm, ensure that you set it to use the virtualenv virtual environment manager. This is available in the menu under `PyCharm -> Preferences -> Project -> Interpreter` +If you are using PyCharm, ensure that you set it to use the virtualenv virtual environment manager. +This is available in the menu under `PyCharm -> Settings -> Project -> Interpreter`. ## Running the test suite diff --git a/noxfile.py b/noxfile.py index b2ffc1f3..31df65b2 100755 --- a/noxfile.py +++ b/noxfile.py @@ -10,10 +10,11 @@ from xml.etree import ElementTree import nox -import psutil import requests import yaml +from src.test.test_utils.processes import kill_process, wait_for_app_start + # Ensure nox can load local packages repo_dir = os.path.dirname(__file__) if repo_dir not in sys.path: @@ -41,10 +42,10 @@ def create_component_tempfile(name: str): return temp_file.name -def create_it_tempfile(name: str): +def create_it_tempfile(name: str, prefix: str = "temp_"): temp_file = tempfile.NamedTemporaryFile( suffix=".txt", - prefix="temp_", + prefix=prefix, dir=os.path.abspath(f"src/test/integration/{name}/"), delete=False, ) @@ -160,11 +161,6 @@ def dependency_versions_to_be_tested( ] -def wait_for_app_start(): - # TODO Make this deterministic - time.sleep(8) - - @nox.session() def list_integration_tests_ci(session): integration_tests = { @@ -363,19 +359,6 @@ def integration_tests_fastapi( session.install("-r", OTHER_REQUIREMENTS) try: - session.run( - "sh", - "./scripts/start_uvicorn", - env={ - "AUTOWRAPT_BOOTSTRAP": "lumigo_opentelemetry", - "LUMIGO_DEBUG_SPANDUMP": temp_file, - "OTEL_SERVICE_NAME": "app", - }, - external=True, - ) # One happy day we will have https://github.com/wntrblm/nox/issues/198 - - wait_for_app_start() - session.run( "pytest", "--tb", @@ -389,16 +372,11 @@ def integration_tests_fastapi( }, ) finally: - kill_process_and_clean_outputs(temp_file, "uvicorn", session) + clean_outputs(temp_file, session) @nox.session(python=python_versions()) def component_tests(session): - component_tests_attr_max_size( - session=session, - fastapi_version="0.78.0", # arbitrary version - uvicorn_version="0.16.0", # TODO don't update, see https://lumigo.atlassian.net/browse/RD-11466 - ) component_tests_execution_tags( session=session, fastapi_version="0.78.0", # arbitrary version @@ -406,52 +384,6 @@ def component_tests(session): ) -def component_tests_attr_max_size( - session, - fastapi_version, - uvicorn_version, -): - install_package("uvicorn", uvicorn_version, session) - install_package("fastapi", fastapi_version, session) - - session.install(".") - - temp_file = create_component_tempfile("attr_max_size") - with session.chdir("src/test/components"): - session.install("-r", OTHER_REQUIREMENTS) - - try: - session.run( - "sh", - "./scripts/start_uvicorn", - env={ - "AUTOWRAPT_BOOTSTRAP": "lumigo_opentelemetry", - "LUMIGO_DEBUG_SPANDUMP": temp_file, - "OTEL_SERVICE_NAME": "app", - "OTEL_SPAN_ATTRIBUTE_VALUE_LENGTH_LIMIT": "1", - }, - external=True, - ) # One happy day we will have https://github.com/wntrblm/nox/issues/198 - - wait_for_app_start() - - session.run( - "pytest", - "--tb", - "native", - "--log-cli-level=INFO", - "--color=yes", - "-v", - "./tests/test_attr_max_size.py", - env={ - "LUMIGO_DEBUG_SPANDUMP": temp_file, - "OTEL_SPAN_ATTRIBUTE_VALUE_LENGTH_LIMIT": "1", - }, - ) - finally: - kill_process_and_clean_outputs(temp_file, "uvicorn", session) - - def component_tests_execution_tags( session, fastapi_version, @@ -467,19 +399,6 @@ def component_tests_execution_tags( session.install("-r", OTHER_REQUIREMENTS) try: - session.run( - "sh", - "./scripts/start_uvicorn", - env={ - "AUTOWRAPT_BOOTSTRAP": "lumigo_opentelemetry", - "LUMIGO_DEBUG_SPANDUMP": temp_file, - "OTEL_SERVICE_NAME": "app", - }, - external=True, - ) # One happy day we will have https://github.com/wntrblm/nox/issues/198 - - wait_for_app_start() - session.run( "pytest", "--tb", @@ -493,7 +412,7 @@ def component_tests_execution_tags( }, ) finally: - kill_process_and_clean_outputs(temp_file, "uvicorn", session) + clean_outputs(temp_file, session) @nox.session() @@ -638,29 +557,15 @@ def integration_tests_grpcio( "python", "-m", "pip", "install", "--upgrade", "pip", "setuptools", "wheel" ) - server_spans = tempfile.NamedTemporaryFile( - suffix=".txt", prefix=create_it_tempfile("grpcio") - ).name - client_spans = tempfile.NamedTemporaryFile( - suffix=".txt", prefix=create_it_tempfile("grpcio") - ).name + base_span_file = create_it_tempfile("grpcio") + clean_outputs(base_span_file, session) + + server_spans = create_it_tempfile("grpcio", prefix=base_span_file) + client_spans = create_it_tempfile("grpcio", prefix=base_span_file) with session.chdir("src/test/integration/grpcio"): session.install("-r", OTHER_REQUIREMENTS) try: - session.run( - "sh", - "./scripts/start_server", - env={ - "AUTOWRAPT_BOOTSTRAP": "lumigo_opentelemetry", - "LUMIGO_DEBUG_SPANDUMP": server_spans, - "OTEL_SERVICE_NAME": "app", - }, - external=True, - ) # One happy day we will have https://github.com/wntrblm/nox/issues/198 - - wait_for_app_start() - session.run( "pytest", "--tb", @@ -677,7 +582,6 @@ def integration_tests_grpcio( }, ) finally: - kill_process("greeter_server.py") clean_outputs(server_spans, session) clean_outputs(client_spans, session) @@ -1084,50 +988,6 @@ def kill_process_and_clean_outputs(full_path: str, process_name: str, session) - clean_outputs(full_path, session) -def kill_process(process_name: str) -> None: - proc_name = "undefined" - cmd_line = "undefined" - try: - # Kill all processes with the given name - for proc in psutil.process_iter( - attrs=["pid", "name", "cmdline"], ad_value=None - ): - proc_name = proc.name() - if proc.status() == psutil.STATUS_ZOMBIE: - continue - # The python process is named "Python" on OS X and "uvicorn" on CircleCI - if proc_name == process_name: - print(f"Killing process with name {proc_name}...") - proc.kill() - elif proc_name.lower().startswith("python"): - # drop the first argument, which is the python executable - python_command_parts = proc.cmdline()[1:] - # the initial command part is the last part of the path - python_command_parts[0] = python_command_parts[0].split("/")[-1] - # combine the remaining arguments - command = " ".join(python_command_parts) - print( - f"Evaluating process with name '{proc_name}' and command '{command}'..." - ) - if ( - len(cmd_line) > 1 - and "nox" not in command - and process_name in command - ): - print( - f"Killing process with name '{proc_name}' and command '{command}'..." - ) - proc.kill() - except psutil.ZombieProcess as zp: - print( - f"Failed to kill zombie process '{proc_name}' (looking for {process_name}) with command line '{cmd_line}': {str(zp)}" - ) - except psutil.NoSuchProcess as nsp: - print( - f"Failed to kill process '{proc_name}' (looking for {process_name}) with command line '{cmd_line}': {str(nsp)}" - ) - - def clean_outputs(full_path: str, session) -> None: session.run("rm", "-f", full_path, external=True) diff --git a/setup.py b/setup.py index 0b86bf84..eac88770 100644 --- a/setup.py +++ b/setup.py @@ -18,25 +18,25 @@ "protobuf>=3.13.0, <4.0.0", "wrapt>=1.11.0", "lumigo_core==0.0.6", - "opentelemetry-api==1.15.0", - "opentelemetry-sdk==1.15.0", + "opentelemetry-api==1.20.0", + "opentelemetry-sdk==1.20.0", "opentelemetry-sdk-extension-aws==2.0.1", - "opentelemetry-exporter-otlp-proto-http==1.15.0", - "opentelemetry-semantic-conventions==0.36b0", - "opentelemetry-instrumentation==0.36b0", - "opentelemetry-instrumentation-asgi==0.36b0", - "opentelemetry-instrumentation-boto==0.36b0", - "opentelemetry-instrumentation-fastapi==0.36b0", - "opentelemetry-instrumentation-flask==0.36b0", - "opentelemetry-instrumentation-grpc==0.36b0", - "opentelemetry-instrumentation-kafka-python==0.36b0", - "opentelemetry-instrumentation-pika==0.36b0", - "opentelemetry-instrumentation-psycopg2==0.36b0", - "opentelemetry-instrumentation-pymongo==0.36b0", - "opentelemetry-instrumentation-pymysql==0.36b0", - "opentelemetry-instrumentation-requests==0.36b0", - "opentelemetry-instrumentation-redis==0.36b0", - "opentelemetry-instrumentation-django==0.36b0", + "opentelemetry-exporter-otlp-proto-http==1.20.0", + "opentelemetry-semantic-conventions==0.41b0", + "opentelemetry-instrumentation==0.41b0", + "opentelemetry-instrumentation-asgi==0.41b0", + "opentelemetry-instrumentation-boto==0.41b0", + "opentelemetry-instrumentation-fastapi==0.41b0", + "opentelemetry-instrumentation-flask==0.41b0", + "opentelemetry-instrumentation-grpc==0.41b0", + "opentelemetry-instrumentation-kafka-python==0.41b0", + "opentelemetry-instrumentation-pika==0.41b0", + "opentelemetry-instrumentation-psycopg2==0.41b0", + "opentelemetry-instrumentation-pymongo==0.41b0", + "opentelemetry-instrumentation-pymysql==0.41b0", + "opentelemetry-instrumentation-requests==0.41b0", + "opentelemetry-instrumentation-redis==0.41b0", + "opentelemetry-instrumentation-django==0.41b0", # v4.7.1 is the last version that supports python 3.7 "typing_extensions==4.7.1; python_version<'3.8'", ], diff --git a/src/lumigo_opentelemetry/instrumentations/fastapi/__init__.py b/src/lumigo_opentelemetry/instrumentations/fastapi/__init__.py index ce30db8b..5a29e1bd 100644 --- a/src/lumigo_opentelemetry/instrumentations/fastapi/__init__.py +++ b/src/lumigo_opentelemetry/instrumentations/fastapi/__init__.py @@ -13,6 +13,13 @@ def install_instrumentation(self) -> None: import wrapt from lumigo_opentelemetry import logger + from opentelemetry.instrumentation.asgi import OpenTelemetryMiddleware + + wrapt.wrap_function_wrapper( + OpenTelemetryMiddleware, + "_get_otel_receive", + FastAPIParser.wrapt__get_otel_receive, + ) @wrapt.patch_function_wrapper("fastapi", "FastAPI.__init__") def init_otel_middleware(wrapped, instance, args, kwargs): # type: ignore @@ -23,7 +30,6 @@ def init_otel_middleware(wrapped, instance, args, kwargs): # type: ignore FastAPIInstrumentor().instrument_app( instance, server_request_hook=FastAPIParser.server_request_hook, - client_request_hook=FastAPIParser.client_request_hook, client_response_hook=FastAPIParser.client_response_hook, ) diff --git a/src/lumigo_opentelemetry/instrumentations/fastapi/parsers/__init__.py b/src/lumigo_opentelemetry/instrumentations/fastapi/parsers/__init__.py index 504c7249..7e1542da 100644 --- a/src/lumigo_opentelemetry/instrumentations/fastapi/parsers/__init__.py +++ b/src/lumigo_opentelemetry/instrumentations/fastapi/parsers/__init__.py @@ -1,6 +1,5 @@ from typing import Dict, Any -from lumigo_opentelemetry import logger from lumigo_opentelemetry.libs.general_utils import lumigo_safe_execute from lumigo_opentelemetry.libs.json_utils import ( dump, @@ -36,9 +35,20 @@ def server_request_hook(span: Span, scope: Dict[str, Any]) -> None: span.set_attributes(attributes) @staticmethod - def client_request_hook(span: Span, scope: Dict[Any, Any]) -> None: - with lumigo_safe_execute("FastAPIParser: client_request_hook"): - logger.debug(f"client_request_hook span: {span}, scope: {scope}") + def wrapt__get_otel_receive(original_func, instance, args, kwargs): # type: ignore + original_otel_receive = original_func(*args, **kwargs) + + async def new_otel_receive(): # type: ignore + return_value = await original_otel_receive() + with lumigo_safe_execute("FastAPIParser: new_otel_receive"): + with instance.tracer.start_as_current_span("receive_body") as send_span: + send_span.set_attribute( + "http.request.body", + dump_with_context("requestBody", return_value), + ) + return return_value + + return new_otel_receive @staticmethod def client_response_hook(span: Span, message: Dict[str, Any]) -> None: diff --git a/src/lumigo_opentelemetry/instrumentations/redis/__init__.py b/src/lumigo_opentelemetry/instrumentations/redis/__init__.py index 87a2a7ac..909e3fc3 100644 --- a/src/lumigo_opentelemetry/instrumentations/redis/__init__.py +++ b/src/lumigo_opentelemetry/instrumentations/redis/__init__.py @@ -23,8 +23,9 @@ def request_hook( span: Span, instance: Connection, args: List[Any], kwargs: Dict[Any, Any] ) -> None: # a db.statement attribute is automatically added by the RedisInstrumentor - # when this hook is called, so we don't need to add anything manually - pass + # when this hook is called, but only includes the command name for some + # versions so we need to set it ourselves. + add_body_attribute(span, " ".join(args), "db.statement") def response_hook(span: Span, instance: Connection, response: Any) -> None: add_body_attribute(span, response, "db.response.body") diff --git a/src/lumigo_opentelemetry/instrumentations/requests/__init__.py b/src/lumigo_opentelemetry/instrumentations/requests/__init__.py index 12d00070..ad356888 100644 --- a/src/lumigo_opentelemetry/instrumentations/requests/__init__.py +++ b/src/lumigo_opentelemetry/instrumentations/requests/__init__.py @@ -1,5 +1,10 @@ from lumigo_opentelemetry.instrumentations import AbstractInstrumentor -from .parsers import HttpParser +from opentelemetry.trace import Span +from lumigo_opentelemetry.libs.general_utils import lumigo_safe_execute +from lumigo_opentelemetry.libs.json_utils import dump_with_context +from lumigo_opentelemetry.instrumentations.instrumentation_utils import ( + add_body_attribute, +) class RequestsInstrumentor(AbstractInstrumentor): @@ -13,8 +18,33 @@ def check_if_applicable(self) -> None: def install_instrumentation(self) -> None: from opentelemetry.instrumentation.requests import RequestsInstrumentor + from requests.models import PreparedRequest, Response - RequestsInstrumentor().instrument(span_callback=HttpParser.request_callback) + def request_hook(span: Span, request: PreparedRequest) -> None: + with lumigo_safe_execute("requests request_hook"): + span.set_attribute( + "http.request.headers", + dump_with_context("requestHeaders", request.headers), + ) + add_body_attribute(span, request.body, "http.request.body") + + def response_hook( + span: Span, request: PreparedRequest, response: Response + ) -> None: + with lumigo_safe_execute("requests response_hook"): + span.set_attribute( + "http.response.headers", + dump_with_context("responseHeaders", response.headers), + ) + add_body_attribute(span, response.content, "http.response.body") + if "x-amzn-requestid" in response.headers: + span.set_attribute( + "messageId", response.headers["x-amzn-requestid"] + ) + + RequestsInstrumentor().instrument( + request_hook=request_hook, response_hook=response_hook + ) instrumentor: AbstractInstrumentor = RequestsInstrumentor() diff --git a/src/lumigo_opentelemetry/instrumentations/requests/parsers/__init__.py b/src/lumigo_opentelemetry/instrumentations/requests/parsers/__init__.py deleted file mode 100644 index bce39eab..00000000 --- a/src/lumigo_opentelemetry/instrumentations/requests/parsers/__init__.py +++ /dev/null @@ -1,73 +0,0 @@ -from __future__ import annotations - -from dataclasses import dataclass -from typing import Dict, Any -from requests import Response - -from opentelemetry.trace import Span - -from lumigo_opentelemetry.libs.general_utils import lumigo_safe_execute -from lumigo_opentelemetry.libs.json_utils import dump_with_context - - -@dataclass(frozen=True) -class HttpParser: - response: Response - - @staticmethod - def request_callback(span: Span, response: Response) -> None: - with lumigo_safe_execute("HttpParser: request_callback"): - if not response: - return - attributes = HttpParser.get_parser(response=response).get_attributes() - - span.set_attributes(attributes) - - def parse_request(self) -> Dict[str, Any]: - request = self.response.request - return { - "http.request.body": dump_with_context("requestBody", request.body), - "http.request.headers": dump_with_context( - "requestHeaders", request.headers - ), - } - - def parse_response(self) -> Dict[str, Any]: - return { - "http.response.body": dump_with_context("responseBody", self.response.text), - "http.response.headers": dump_with_context( - "responseHeaders", self.response.headers - ), - } - - def extract_custom_attributes(self) -> Dict[str, Any]: - return {} - - def get_attributes(self) -> Dict[str, Any]: - return { - **self.parse_request(), - **self.parse_response(), - **self.extract_custom_attributes(), - } - - @staticmethod - def get_parser(response: Response) -> HttpParser: - url = response.url - if url.endswith("amazonaws.com") or ( - response.request.headers - and response.request.headers.get("x-amzn-requestid") - ): - return AwsParser(response=response) - return HttpParser(response=response) - - -@dataclass(frozen=True) -class AwsParser(HttpParser): - def extract_custom_attributes(self) -> Dict[str, Any]: - with lumigo_safe_execute("aws: extract_custom_attributes"): - return { - "messageId": self.response.request.headers.get("x-amzn-requestid") - if self.response.request - else "", - } - return {} diff --git a/src/test/components/scripts/start_uvicorn b/src/test/components/scripts/start_uvicorn deleted file mode 100755 index 016dc9c7..00000000 --- a/src/test/components/scripts/start_uvicorn +++ /dev/null @@ -1,4 +0,0 @@ -#!/bin/sh - -uvicorn fastapi_external_apis.app:app --port 8021 & -uvicorn app:app --port 8020 & diff --git a/src/test/components/start_uvicorn.py b/src/test/components/start_uvicorn.py new file mode 100644 index 00000000..67e98b16 --- /dev/null +++ b/src/test/components/start_uvicorn.py @@ -0,0 +1,15 @@ +import argparse +import uvicorn + + +def serve(app, port): + """Serve the web application.""" + uvicorn.run(app, port=port) + + +if __name__ == "__main__": + argParser = argparse.ArgumentParser() + argParser.add_argument("--app", help="app name") + argParser.add_argument("--port", type=int, help="port number") + args = argParser.parse_args() + serve(args.app, args.port) diff --git a/src/test/components/tests/app_runner.py b/src/test/components/tests/app_runner.py new file mode 100644 index 00000000..7d229cc1 --- /dev/null +++ b/src/test/components/tests/app_runner.py @@ -0,0 +1,54 @@ +import os +import subprocess +import sys +from pathlib import Path +from test.test_utils.processes import kill_process + + +class FastApiApp(object): + def __init__(self, app: str, port: int): + self.app = app + self.port = port + cwd = Path(__file__).parent.parent + print(f"cwd = {cwd}") + env = { + **os.environ, + "AUTOWRAPT_BOOTSTRAP": "lumigo_opentelemetry", + "LUMIGO_DEBUG_SPANDUMP": os.environ["LUMIGO_DEBUG_SPANDUMP"], + "OTEL_SERVICE_NAME": "fastapi_test_app", + } + print(f"venv bin path = {Path(sys.executable).parent}") + cmd = [ + sys.executable, + "start_uvicorn.py", + "--app", + self.app, + "--port", + str(self.port), + ] + print(f"cmd = {cmd}") + self.process = subprocess.Popen( + cmd, + cwd=cwd, + env=env, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + is_app_running = False + for line in self.process.stderr: + print(line) + if "Uvicorn running" in str(line): + is_app_running = True + break + if not is_app_running: + raise Exception( + f"FastApiApp app '{self.app}' failed to start on port {self.port}" + ) + + def __enter__(self): + return self + + def __exit__(self, *args): + # because we need a shell to run uvicorn we need to kill multiple processs, + # but not the process group because that includes the test process as well + kill_process(["uvicorn", self.app, str(self.port)]) diff --git a/src/test/components/tests/test_attr_max_size.py b/src/test/components/tests/test_attr_max_size.py deleted file mode 100644 index e7f91e65..00000000 --- a/src/test/components/tests/test_attr_max_size.py +++ /dev/null @@ -1,39 +0,0 @@ -import unittest -from test.test_utils.span_exporter import wait_for_exporter -from test.test_utils.spans_parser import SpansContainer - -import requests - - -class TestLargeSpans(unittest.TestCase): - def test_large_span_attribute_size_max_size_env_var_was_set(self): - response = requests.get("http://localhost:8020/invoke-requests-large-response") - response.raise_for_status() - - body = response.json() - - assert body is not None - - wait_for_exporter() - - spans_container = SpansContainer.get_spans_from_file() - self.assertEqual(4, len(spans_container.spans)) - - # assert root - root = spans_container.get_first_root() - self.assertIsNotNone(root) - root_attributes = root["attributes"] - self.assertEqual(root_attributes["http.status_code"], 200) - - self.assert_attribute_length(root_attributes, 1) - - # assert child spans - children = spans_container.get_non_internal_children() - self.assertEqual(1, len(children)) - child_attributes = children[0]["attributes"] - self.assert_attribute_length(child_attributes, 1) - - def assert_attribute_length(self, child_attributes: dict, length: int) -> None: - for k, v in child_attributes.items(): - if isinstance(v, str): - assert len(v) == length diff --git a/src/test/components/tests/test_execution_tags.py b/src/test/components/tests/test_execution_tags.py index 030f9789..15e3b2c9 100644 --- a/src/test/components/tests/test_execution_tags.py +++ b/src/test/components/tests/test_execution_tags.py @@ -1,34 +1,48 @@ import unittest -from test.test_utils.span_exporter import wait_for_exporter from test.test_utils.spans_parser import SpansContainer import requests +from .app_runner import FastApiApp + +APP_PORT = 8020 +EXTERNAL_APP_PORT = 8021 + class TestExecutionTags(unittest.TestCase): def test_execution_tag(self): - response = requests.get("http://localhost:8020/invoke-request") - response.raise_for_status() - - body = response.json() - - assert body is not None - - wait_for_exporter() - - spans_container = SpansContainer.get_spans_from_file() - self.assertEqual(4, len(spans_container.spans)) - - # assert root - root = spans_container.get_first_root() - self.assertIsNotNone(root) - root_attributes = root["attributes"] - self.assertEqual( - type(root_attributes["lumigo.execution_tags.response_len"]), int - ) - self.assertEqual(type(root_attributes["lumigo.execution_tags.response"]), str) - self.assertEqual(root_attributes["lumigo.execution_tags.app"], True) - self.assertEqual( - root_attributes["lumigo.execution_tags.app.response"], "success" - ) - self.assertEqual(root_attributes["lumigo.execution_tags.foo"], ["bar", "baz"]) + with FastApiApp("app:app", APP_PORT), FastApiApp( + "fastapi_external_apis.app:app", EXTERNAL_APP_PORT + ): + response = requests.get(f"http://localhost:{APP_PORT}/invoke-request") + response.raise_for_status() + + body = response.json() + + assert body is not None + + spans_container = SpansContainer.get_spans_from_file( + wait_time_sec=10, expected_span_count=4 + ) + self.assertEqual(4, len(spans_container.spans)) + + # assert root + root = spans_container.get_first_root() + self.assertIsNotNone(root) + root_attributes = root["attributes"] + self.assertEqual( + type(root_attributes["lumigo.execution_tags.response_len"]), int + ) + self.assertEqual( + type(root_attributes["lumigo.execution_tags.response"]), str + ) + self.assertEqual(root_attributes["lumigo.execution_tags.app"], True) + self.assertEqual( + root_attributes["lumigo.execution_tags.app.response"], "success" + ) + self.assertEqual( + root_attributes["lumigo.execution_tags.foo"], ["bar", "baz"] + ) + self.assertEqual( + root_attributes["lumigo.execution_tags.foo"], ["bar", "baz"] + ) diff --git a/src/test/integration/boto3-sqs/conftest.py b/src/test/integration/boto3-sqs/conftest.py index 988cb3d9..6619ae4e 100644 --- a/src/test/integration/boto3-sqs/conftest.py +++ b/src/test/integration/boto3-sqs/conftest.py @@ -1,8 +1,8 @@ -import pytest - from test.test_utils.spans_parser import SpansContainer +import pytest + @pytest.fixture(autouse=True) def increment_spans_counter(): - SpansContainer.increment_spans() + SpansContainer.update_span_offset() diff --git a/src/test/integration/boto3/conftest.py b/src/test/integration/boto3/conftest.py index 988cb3d9..6619ae4e 100644 --- a/src/test/integration/boto3/conftest.py +++ b/src/test/integration/boto3/conftest.py @@ -1,8 +1,8 @@ -import pytest - from test.test_utils.spans_parser import SpansContainer +import pytest + @pytest.fixture(autouse=True) def increment_spans_counter(): - SpansContainer.increment_spans() + SpansContainer.update_span_offset() diff --git a/src/test/integration/conftest.py b/src/test/integration/conftest.py index 988cb3d9..6619ae4e 100644 --- a/src/test/integration/conftest.py +++ b/src/test/integration/conftest.py @@ -1,8 +1,8 @@ -import pytest - from test.test_utils.spans_parser import SpansContainer +import pytest + @pytest.fixture(autouse=True) def increment_spans_counter(): - SpansContainer.increment_spans() + SpansContainer.update_span_offset() diff --git a/src/test/integration/django/conftest.py b/src/test/integration/django/conftest.py index 988cb3d9..6619ae4e 100644 --- a/src/test/integration/django/conftest.py +++ b/src/test/integration/django/conftest.py @@ -1,8 +1,8 @@ -import pytest - from test.test_utils.spans_parser import SpansContainer +import pytest + @pytest.fixture(autouse=True) def increment_spans_counter(): - SpansContainer.increment_spans() + SpansContainer.update_span_offset() diff --git a/src/test/integration/fastapi/app/__init__.py b/src/test/integration/fastapi/app/__init__.py index 929e7049..52afd158 100644 --- a/src/test/integration/fastapi/app/__init__.py +++ b/src/test/integration/fastapi/app/__init__.py @@ -1,4 +1,3 @@ -import requests from fastapi import FastAPI app = FastAPI() @@ -9,16 +8,14 @@ async def root(): return {"message": "Hello FastAPI!"} -@app.get("/invoke-requests") -def invoke_requests(): - response = requests.get("https://api.chucknorris.io/jokes/random") - return response.json() +@app.post("/invoke-requests") +def invoke_requests(request: dict): + return {"url": "http://sheker.kol.shehoo:8021/little-response", "data": "a" * 100} @app.get("/invoke-requests-large-response") def invoke_requests_big_response(): - response = requests.get( - "http://universities.hipolabs.com/search?country=United+States" - ) - response.raise_for_status() - return response.json() + return {"data": "a" * 10_000} + + +print("FastAPI app started") diff --git a/src/test/integration/fastapi/conftest.py b/src/test/integration/fastapi/conftest.py index 988cb3d9..6619ae4e 100644 --- a/src/test/integration/fastapi/conftest.py +++ b/src/test/integration/fastapi/conftest.py @@ -1,8 +1,8 @@ -import pytest - from test.test_utils.spans_parser import SpansContainer +import pytest + @pytest.fixture(autouse=True) def increment_spans_counter(): - SpansContainer.increment_spans() + SpansContainer.update_span_offset() diff --git a/src/test/integration/fastapi/scripts/start_uvicorn b/src/test/integration/fastapi/scripts/start_uvicorn deleted file mode 100755 index b35745ed..00000000 --- a/src/test/integration/fastapi/scripts/start_uvicorn +++ /dev/null @@ -1,3 +0,0 @@ -#!/bin/sh - -uvicorn app:app --port 8000 & \ No newline at end of file diff --git a/src/test/integration/fastapi/start_uvicorn.py b/src/test/integration/fastapi/start_uvicorn.py new file mode 100644 index 00000000..67e98b16 --- /dev/null +++ b/src/test/integration/fastapi/start_uvicorn.py @@ -0,0 +1,15 @@ +import argparse +import uvicorn + + +def serve(app, port): + """Serve the web application.""" + uvicorn.run(app, port=port) + + +if __name__ == "__main__": + argParser = argparse.ArgumentParser() + argParser.add_argument("--app", help="app name") + argParser.add_argument("--port", type=int, help="port number") + args = argParser.parse_args() + serve(args.app, args.port) diff --git a/src/test/integration/fastapi/tests/app_runner.py b/src/test/integration/fastapi/tests/app_runner.py new file mode 100644 index 00000000..7cf06620 --- /dev/null +++ b/src/test/integration/fastapi/tests/app_runner.py @@ -0,0 +1,54 @@ +import os +import subprocess +import sys +from pathlib import Path +from test.test_utils.processes import kill_process + + +class FastApiApp(object): + def __init__(self, app: str, port: int): + self.app = app + self.port = port + cwd = Path(__file__).parent.parent + print(f"cwd = {cwd}") + env = { + **os.environ, + "AUTOWRAPT_BOOTSTRAP": "lumigo_opentelemetry", + "OTEL_SERVICE_NAME": "fastapi_test_app", + "LUMIGO_DEBUG_SPANDUMP": os.environ["LUMIGO_DEBUG_SPANDUMP"], + } + print(f"venv bin path = {Path(sys.executable).parent}") + cmd = [ + sys.executable, + "start_uvicorn.py", + "--app", + self.app, + "--port", + str(self.port), + ] + print(f"cmd = {cmd}") + self.process = subprocess.Popen( + cmd, + cwd=cwd, + env=env, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + is_app_running = False + for line in self.process.stderr: + print(line) + if "Uvicorn running" in str(line): + is_app_running = True + break + if not is_app_running: + raise Exception( + f"FastApiApp app '{self.app}' failed to start on port {self.port}" + ) + + def __enter__(self): + return self + + def __exit__(self, *args): + # because we need a shell to run uvicorn we need to kill multiple processs, + # but not the process group because that includes the test process as well + kill_process(["uvicorn", self.app, str(self.port)]) diff --git a/src/test/integration/fastapi/tests/test_fastapi.py b/src/test/integration/fastapi/tests/test_fastapi.py index 6fab6144..ef7d8b7c 100644 --- a/src/test/integration/fastapi/tests/test_fastapi.py +++ b/src/test/integration/fastapi/tests/test_fastapi.py @@ -1,116 +1,152 @@ import unittest -from test.test_utils.span_exporter import wait_for_exporter from test.test_utils.spans_parser import SpansContainer import requests +from .app_runner import FastApiApp + +APP_PORT = 8020 +EXTERNAL_APP_PORT = 8021 + class TestFastApiSpans(unittest.TestCase): def test_200_OK(self): - response = requests.get("http://localhost:8000/") - response.raise_for_status() + with FastApiApp("app:app", APP_PORT): + response = requests.get(f"http://localhost:{APP_PORT}/") + response.raise_for_status() + + body = response.json() + + self.assertEqual(body, {"message": "Hello FastAPI!"}) + + spans_container = SpansContainer.get_spans_from_file( + wait_time_sec=10, expected_span_count=3 + ) + self.assertEqual(3, len(spans_container.spans)) + + # assert root + root = spans_container.get_first_root() + self.assertIsNotNone(root) + self.assertEqual(root["kind"], "SpanKind.SERVER") + self.assertEqual(root["attributes"]["http.status_code"], 200) + self.assertEqual(root["attributes"]["http.method"], "GET") + self.assertEqual( + root["attributes"]["http.url"], f"http://127.0.0.1:{APP_PORT}/" + ) - body = response.json() + # assert internal spans + internals = spans_container.get_internals() + self.assertEqual(2, len(internals)) + self.assertIsNotNone( + spans_container.get_attribute_from_list_of_spans( + internals, "http.response.headers" + ) + ) + self.assertIsNotNone( + spans_container.get_attribute_from_list_of_spans( + internals, "http.response.body" + ) + ) - self.assertEqual(body, {"message": "Hello FastAPI!"}) + def test_requests_instrumentation(self): + with FastApiApp("app:app", APP_PORT): + response = requests.post( + f"http://localhost:{APP_PORT}/invoke-requests", json={"a": "b"} + ) + response.raise_for_status() - wait_for_exporter() + body = response.json() - spans_container = SpansContainer.get_spans_from_file() - self.assertEqual(3, len(spans_container.spans)) + expected_url = "http://sheker.kol.shehoo:8021/little-response" - # assert root - root = spans_container.get_first_root() - self.assertIsNotNone(root) - self.assertEqual(root["kind"], "SpanKind.SERVER") - self.assertEqual(root["attributes"]["http.status_code"], 200) - self.assertEqual(root["attributes"]["http.method"], "GET") - self.assertEqual(root["attributes"]["http.url"], "http://127.0.0.1:8000/") + self.assertIn(expected_url, body["url"]) - # assert internal spans - internals = spans_container.get_internals() - self.assertEqual(2, len(internals)) - self.assertIsNotNone( - spans_container.get_attribute_from_list_of_spans( - internals, "http.response.headers" + spans_container = SpansContainer.get_spans_from_file( + wait_time_sec=10, expected_span_count=4 ) - ) - self.assertIsNotNone( - spans_container.get_attribute_from_list_of_spans( - internals, "http.response.body" + self.assertEqual(5, len(spans_container.spans)) + + # assert root + root = spans_container.get_first_root() + self.assertIsNotNone(root) + self.assertEqual(root["kind"], "SpanKind.SERVER") + self.assertEqual(root["attributes"]["http.status_code"], 200) + self.assertIsNotNone(root["attributes"]["http.request.headers"]) + self.assertEqual( + root["attributes"]["http.url"], + f"http://127.0.0.1:{APP_PORT}/invoke-requests", ) - ) - def test_requests_instrumentation(self): - response = requests.get("http://localhost:8000/invoke-requests") - response.raise_for_status() - - body = response.json() - - self.assertIn("https://api.chucknorris.io/jokes/", body["url"]) - - wait_for_exporter() - - spans_container = SpansContainer.get_spans_from_file() - self.assertEqual(4, len(spans_container.spans)) - - # assert root - root = spans_container.get_first_root() - self.assertIsNotNone(root) - self.assertEqual(root["kind"], "SpanKind.SERVER") - self.assertEqual(root["attributes"]["http.status_code"], 200) - self.assertEqual( - root["attributes"]["http.url"], "http://127.0.0.1:8000/invoke-requests" - ) - - # assert child spans - children = spans_container.get_non_internal_children() - self.assertEqual(1, len(children)) - self.assertEqual(children[0]["attributes"]["http.method"], "GET") - self.assertEqual( - children[0]["attributes"]["http.url"], - "https://api.chucknorris.io/jokes/random", - ) - self.assertEqual(children[0]["attributes"]["http.status_code"], 200) - self.assertIsNotNone(children[0]["attributes"]["http.request.headers"]) - self.assertIsNotNone(children[0]["attributes"]["http.response.headers"]) - self.assertIsNotNone(children[0]["attributes"]["http.response.body"]) + # assert internal spans + internals = spans_container.get_internals() + self.assertEqual(4, len(internals)) + # assert than either of the internal spans have the required attributes + self.assertIsNotNone( + spans_container.get_attribute_from_list_of_spans( + internals, "http.request.body" + ) + ) + self.assertIsNotNone( + spans_container.get_attribute_from_list_of_spans( + internals, "http.response.headers" + ) + ) + self.assertIsNotNone( + spans_container.get_attribute_from_list_of_spans( + internals, "http.response.body" + ) + ) + self.assertEqual( + spans_container.get_attribute_from_list_of_spans( + internals, "http.status_code" + ), + 200, + ) def test_large_span_attribute_size_default_max_size(self): - response = requests.get("http://localhost:8000/invoke-requests-large-response") - response.raise_for_status() - - body = response.json() - - assert body is not None - - wait_for_exporter() - - spans_container = SpansContainer.get_spans_from_file() - self.assertEqual(4, len(spans_container.spans)) - - # assert root - root = spans_container.get_first_root() - self.assertIsNotNone(root) - root_attributes = root["attributes"] - self.assertEqual(root_attributes["http.status_code"], 200) - self.assertEqual( - root_attributes["http.url"], - "http://127.0.0.1:8000/invoke-requests-large-response", - ) - self.assertEqual(root_attributes["http.method"], "GET") - - # assert child spans - children = spans_container.get_non_internal_children() - self.assertEqual(1, len(children)) - children_attributes = children[0]["attributes"] - self.assertEqual(children_attributes["http.method"], "GET") - self.assertEqual( - children_attributes["http.url"], - "http://universities.hipolabs.com/search?country=United+States", - ) - self.assertEqual(len(children_attributes["http.response.body"]), 2048) - self.assertEqual(children_attributes["http.status_code"], 200) - self.assertIsNotNone(children_attributes["http.request.headers"]) - self.assertIsNotNone(children_attributes["http.response.headers"]) - self.assertIsNotNone(children_attributes["http.response.body"]) + with FastApiApp("app:app", APP_PORT): + response = requests.get( + f"http://localhost:{APP_PORT}/invoke-requests-large-response" + ) + response.raise_for_status() + + body = response.json() + + assert body is not None + + spans_container = SpansContainer.get_spans_from_file( + wait_time_sec=10, expected_span_count=3 + ) + self.assertEqual(3, len(spans_container.spans)) + + # assert root + root = spans_container.get_first_root() + self.assertIsNotNone(root) + root_attributes = root["attributes"] + self.assertEqual(root_attributes["http.status_code"], 200) + self.assertEqual( + root_attributes["http.url"], + f"http://127.0.0.1:{APP_PORT}/invoke-requests-large-response", + ) + self.assertEqual(root_attributes["http.method"], "GET") + + # assert internal spans + internals = spans_container.get_internals() + self.assertEqual(2, len(internals)) + # assert than either of the internal spans have the required attributes + self.assertIsNotNone( + spans_container.get_attribute_from_list_of_spans( + internals, "http.response.headers" + ) + ) + http_response_body_attr = spans_container.get_attribute_from_list_of_spans( + internals, "http.response.body" + ) + self.assertIsNotNone(http_response_body_attr) + self.assertEqual(len(http_response_body_attr), 2048) + self.assertEqual( + spans_container.get_attribute_from_list_of_spans( + internals, "http.status_code" + ), + 200, + ) diff --git a/src/test/integration/flask/conftest.py b/src/test/integration/flask/conftest.py index 988cb3d9..6619ae4e 100644 --- a/src/test/integration/flask/conftest.py +++ b/src/test/integration/flask/conftest.py @@ -1,8 +1,8 @@ -import pytest - from test.test_utils.spans_parser import SpansContainer +import pytest + @pytest.fixture(autouse=True) def increment_spans_counter(): - SpansContainer.increment_spans() + SpansContainer.update_span_offset() diff --git a/src/test/integration/grpcio/app/greeter_server.py b/src/test/integration/grpcio/app/greeter_server.py index 1e47fde8..decc0505 100644 --- a/src/test/integration/grpcio/app/greeter_server.py +++ b/src/test/integration/grpcio/app/greeter_server.py @@ -12,9 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. """The Python implementation of the GRPC helloworld.Greeter server.""" +import logging import threading from concurrent import futures -import logging from typing import Iterable import grpc @@ -26,8 +26,6 @@ class Greeter(helloworld_pb2_grpc.GreeterServicer): def SayHelloUnaryUnary(self, request, context): - if request.name == "exit": - done.set() return helloworld_pb2.HelloReply(message="Hello, %s!" % request.name) def SayHelloUnaryStream( @@ -54,6 +52,7 @@ def serve(): server.add_insecure_port("[::]:" + port) server.start() print("Server started, listening on " + port) + # wait until the server process is killed done.wait() server.stop(1).wait() diff --git a/src/test/integration/grpcio/conftest.py b/src/test/integration/grpcio/conftest.py new file mode 100644 index 00000000..6619ae4e --- /dev/null +++ b/src/test/integration/grpcio/conftest.py @@ -0,0 +1,8 @@ +from test.test_utils.spans_parser import SpansContainer + +import pytest + + +@pytest.fixture(autouse=True) +def increment_spans_counter(): + SpansContainer.update_span_offset() diff --git a/src/test/integration/grpcio/requirements_others.txt b/src/test/integration/grpcio/requirements_others.txt index 7d0ca819..a40c1c68 100644 --- a/src/test/integration/grpcio/requirements_others.txt +++ b/src/test/integration/grpcio/requirements_others.txt @@ -1,2 +1,3 @@ -pytest grpcio-tools +psutil +pytest diff --git a/src/test/integration/grpcio/scripts/start_server b/src/test/integration/grpcio/scripts/start_server deleted file mode 100755 index 1992cb12..00000000 --- a/src/test/integration/grpcio/scripts/start_server +++ /dev/null @@ -1,4 +0,0 @@ -#!/bin/sh - -cd app || exit -python greeter_server.py & diff --git a/src/test/integration/grpcio/tests/app_runner.py b/src/test/integration/grpcio/tests/app_runner.py new file mode 100644 index 00000000..c16811e1 --- /dev/null +++ b/src/test/integration/grpcio/tests/app_runner.py @@ -0,0 +1,42 @@ +import os +import subprocess +import sys +from pathlib import Path +from test.test_utils.processes import kill_process, wait_for_app_start + + +class GreeterServerApp(object): + def __init__(self): + self.app = "greeter_server.py" + cwd = Path(__file__).parent.parent / "app" + print(f"cwd = {cwd}") + env = { + **os.environ, + "AUTOWRAPT_BOOTSTRAP": "lumigo_opentelemetry", + "LUMIGO_DEBUG_SPANDUMP": os.environ["SERVER_SPANDUMP"], + "OTEL_SERVICE_NAME": "grpcio_test_app", + } + print(f"venv bin path = {Path(sys.executable).parent}") + cmd = [ + sys.executable, + self.app, + ] + print(f"cmd = {cmd}") + self.process = subprocess.Popen( + cmd, + cwd=cwd, + env=env, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + # checking the stderr stream for the "Server started" message breaks the + # pycharm debugger + wait_for_app_start() + + def __enter__(self): + return self + + def __exit__(self, *args): + # because we need a shell to run uvicorn we need to kill multiple processs, + # but not the process group because that includes the test process as well + kill_process(self.app) diff --git a/src/test/integration/grpcio/tests/test_grpcio.py b/src/test/integration/grpcio/tests/test_grpcio.py index bce80593..908848ec 100644 --- a/src/test/integration/grpcio/tests/test_grpcio.py +++ b/src/test/integration/grpcio/tests/test_grpcio.py @@ -1,7 +1,8 @@ import os import sys import unittest -from test.test_utils.span_exporter import wait_for_exporter + +from .app_runner import GreeterServerApp sys.path.append(os.path.join(os.path.dirname(__file__), "..", "app")) @@ -13,27 +14,23 @@ class TestGrpcioSpans(unittest.TestCase): - @classmethod - def tearDownClass(cls) -> None: - with grpc.insecure_channel("localhost:50051") as channel: - stub = helloworld_pb2_grpc.GreeterStub(channel) - stub.SayHelloUnaryUnary(helloworld_pb2.HelloRequest(name="exit")) - def check_spans(self, method: str, request_payload: str, response_payload: str): - wait_for_exporter(5) - - server_file = os.getenv("SERVER_SPANDUMP") - server_spans = SpansContainer.get_spans_from_file(server_file) - client_spans = SpansContainer.get_spans_from_file() - + server_spans = SpansContainer.get_spans_from_file( + path=os.environ["SERVER_SPANDUMP"], wait_time_sec=10, expected_span_count=1 + ) assert len(server_spans.spans) == 1 + server_span = server_spans.spans[0] assert server_span["kind"] == "SpanKind.SERVER" assert server_span["attributes"]["rpc.method"] == method assert server_span["attributes"]["rpc.service"] == "helloworld.Greeter" assert server_span["attributes"]["rpc.system"] == "grpc" + client_spans = SpansContainer.get_spans_from_file( + wait_time_sec=10, expected_span_count=1 + ) assert len(client_spans.spans) == 1 + client_span = client_spans.spans[0] assert client_span["kind"] == "SpanKind.CLIENT" assert client_span["attributes"]["rpc.method"] == method @@ -55,34 +52,34 @@ def check_spans(self, method: str, request_payload: str, response_payload: str): return server_span, client_span def test_grpcio_instrumentation_unary_unary(self): - with grpc.insecure_channel("localhost:50051") as channel: + with GreeterServerApp(), grpc.insecure_channel("localhost:50051") as channel: stub = helloworld_pb2_grpc.GreeterStub(channel) response = stub.SayHelloUnaryUnary(helloworld_pb2.HelloRequest(name="you")) - self.assertEqual(response.message, "Hello, you!") - self.check_spans( - method="SayHelloUnaryUnary", - request_payload='name: "you"\n', - response_payload='message: "Hello, you!"\n', - ) + self.assertEqual(response.message, "Hello, you!") + self.check_spans( + method="SayHelloUnaryUnary", + request_payload='name: "you"\n', + response_payload='message: "Hello, you!"\n', + ) def test_grpcio_instrumentation_unary_stream(self): - with grpc.insecure_channel("localhost:50051") as channel: + with GreeterServerApp(), grpc.insecure_channel("localhost:50051") as channel: stub = helloworld_pb2_grpc.GreeterStub(channel) response = stub.SayHelloUnaryStream(helloworld_pb2.HelloRequest(name="you")) all_responses = list(response) - self.assertEqual(len(all_responses), 2) - self.assertEqual(all_responses[0].message, "First hello, you!") - self.assertEqual(all_responses[1].message, "Second hello, you!") - self.check_spans( - method="SayHelloUnaryStream", - request_payload='name: "you"\n', - response_payload='message: "First hello, you!"\n,message: "Second hello, you!"\n', - ) + self.assertEqual(len(all_responses), 2) + self.assertEqual(all_responses[0].message, "First hello, you!") + self.assertEqual(all_responses[1].message, "Second hello, you!") + self.check_spans( + method="SayHelloUnaryStream", + request_payload='name: "you"\n', + response_payload='message: "First hello, you!"\n,message: "Second hello, you!"\n', + ) def test_grpcio_instrumentation_stream_unary(self): - with grpc.insecure_channel("localhost:50051") as channel: + with GreeterServerApp(), grpc.insecure_channel("localhost:50051") as channel: stub = helloworld_pb2_grpc.GreeterStub(channel) response = stub.SayHelloStreamUnary( iter( @@ -93,15 +90,15 @@ def test_grpcio_instrumentation_stream_unary(self): ) ) - self.assertEqual(response.message, "Hello, you1,you2!") - self.check_spans( - method="SayHelloStreamUnary", - request_payload='name: "you1"\n,name: "you2"\n', - response_payload='message: "Hello, you1,you2!"\n', - ) + self.assertEqual(response.message, "Hello, you1,you2!") + self.check_spans( + method="SayHelloStreamUnary", + request_payload='name: "you1"\n,name: "you2"\n', + response_payload='message: "Hello, you1,you2!"\n', + ) def test_grpcio_instrumentation_stream_stream(self): - with grpc.insecure_channel("localhost:50051") as channel: + with GreeterServerApp(), grpc.insecure_channel("localhost:50051") as channel: stub = helloworld_pb2_grpc.GreeterStub(channel) response = stub.SayHelloStreamStream( iter( @@ -113,11 +110,11 @@ def test_grpcio_instrumentation_stream_stream(self): ) all_responses = list(response) - self.assertEqual(len(all_responses), 2) - self.assertEqual(all_responses[0].message, "First hello, you1,you2!") - self.assertEqual(all_responses[1].message, "Second hello, you1,you2!") - self.check_spans( - method="SayHelloStreamStream", - request_payload='name: "you1"\n,name: "you2"\n', - response_payload='message: "First hello, you1,you2!"\n,message: "Second hello, you1,you2!"\n', - ) + self.assertEqual(len(all_responses), 2) + self.assertEqual(all_responses[0].message, "First hello, you1,you2!") + self.assertEqual(all_responses[1].message, "Second hello, you1,you2!") + self.check_spans( + method="SayHelloStreamStream", + request_payload='name: "you1"\n,name: "you2"\n', + response_payload='message: "First hello, you1,you2!"\n,message: "Second hello, you1,you2!"\n', + ) diff --git a/src/test/integration/kafka_python/conftest.py b/src/test/integration/kafka_python/conftest.py index 988cb3d9..6619ae4e 100644 --- a/src/test/integration/kafka_python/conftest.py +++ b/src/test/integration/kafka_python/conftest.py @@ -1,8 +1,8 @@ -import pytest - from test.test_utils.spans_parser import SpansContainer +import pytest + @pytest.fixture(autouse=True) def increment_spans_counter(): - SpansContainer.increment_spans() + SpansContainer.update_span_offset() diff --git a/src/test/integration/kafka_python/tests/test_kafka_python.py b/src/test/integration/kafka_python/tests/test_kafka_python.py index c3bf71e1..946aeb60 100644 --- a/src/test/integration/kafka_python/tests/test_kafka_python.py +++ b/src/test/integration/kafka_python/tests/test_kafka_python.py @@ -47,7 +47,7 @@ def test_kafka_python_instrumentation(self): spans_container = SpansContainer.get_spans_from_file() - assert len(spans_container.spans) == 10 + assert len(spans_container.spans) > 10 for span in spans_container.spans: if span["name"].startswith(test_topic): diff --git a/src/test/integration/pika/conftest.py b/src/test/integration/pika/conftest.py index 988cb3d9..6619ae4e 100644 --- a/src/test/integration/pika/conftest.py +++ b/src/test/integration/pika/conftest.py @@ -1,8 +1,8 @@ -import pytest - from test.test_utils.spans_parser import SpansContainer +import pytest + @pytest.fixture(autouse=True) def increment_spans_counter(): - SpansContainer.increment_spans() + SpansContainer.update_span_offset() diff --git a/src/test/integration/pika/tests/test_pika.py b/src/test/integration/pika/tests/test_pika.py index feba7016..1b5c73a2 100644 --- a/src/test/integration/pika/tests/test_pika.py +++ b/src/test/integration/pika/tests/test_pika.py @@ -58,7 +58,7 @@ def test_pika_instrumentation(self): spans_container = SpansContainer.get_spans_from_file() - assert len(spans_container.spans) == 10 + assert len(spans_container.spans) > 10 for span in spans_container.spans: if span["name"].startswith(test_topic): diff --git a/src/test/integration/psycopg2/conftest.py b/src/test/integration/psycopg2/conftest.py index 988cb3d9..6619ae4e 100644 --- a/src/test/integration/psycopg2/conftest.py +++ b/src/test/integration/psycopg2/conftest.py @@ -1,8 +1,8 @@ -import pytest - from test.test_utils.spans_parser import SpansContainer +import pytest + @pytest.fixture(autouse=True) def increment_spans_counter(): - SpansContainer.increment_spans() + SpansContainer.update_span_offset() diff --git a/src/test/integration/pymongo/conftest.py b/src/test/integration/pymongo/conftest.py index 988cb3d9..6619ae4e 100644 --- a/src/test/integration/pymongo/conftest.py +++ b/src/test/integration/pymongo/conftest.py @@ -1,8 +1,8 @@ -import pytest - from test.test_utils.spans_parser import SpansContainer +import pytest + @pytest.fixture(autouse=True) def increment_spans_counter(): - SpansContainer.increment_spans() + SpansContainer.update_span_offset() diff --git a/src/test/integration/pymysql/conftest.py b/src/test/integration/pymysql/conftest.py index 988cb3d9..6619ae4e 100644 --- a/src/test/integration/pymysql/conftest.py +++ b/src/test/integration/pymysql/conftest.py @@ -1,8 +1,8 @@ -import pytest - from test.test_utils.spans_parser import SpansContainer +import pytest + @pytest.fixture(autouse=True) def increment_spans_counter(): - SpansContainer.increment_spans() + SpansContainer.update_span_offset() diff --git a/src/test/integration/redis/tests/test_redis.py b/src/test/integration/redis/tests/test_redis.py index fbd108cc..dee8c976 100644 --- a/src/test/integration/redis/tests/test_redis.py +++ b/src/test/integration/redis/tests/test_redis.py @@ -104,11 +104,11 @@ def test_redis_transaction(self): self.assertEqual( transaction_span[ATTRIBUTES][DB_STATEMENT].split("\n"), [ - "SET my-key pre-key-value", - "GET my-key", - "SET my-key key-value", - "GET my-key", - "GET unknown-key", + "SET ? ?", + "GET ?", + "SET ? ?", + "GET ?", + "GET ?", ], ) self.assertEqual( diff --git a/src/test/test_utils/processes.py b/src/test/test_utils/processes.py new file mode 100644 index 00000000..b32d3348 --- /dev/null +++ b/src/test/test_utils/processes.py @@ -0,0 +1,69 @@ +import time +from typing import List, Union + +import psutil + + +def kill_process(process_names: Union[str, List[str]]) -> None: + if isinstance(process_names, str): + process_names = [process_names] + proc_name = "undefined" + cmd_line = "undefined" + # Kill all processes with the given name + for proc in psutil.process_iter(attrs=["pid", "name", "cmdline"], ad_value=None): + try: + proc_name = proc.name() + if proc.status() == psutil.STATUS_ZOMBIE: + continue + # The python process is named "Python" on OS X and "uvicorn" on CircleCI + if is_process_match(proc_name, process_names): + print(f"Killing process with name {proc_name}...") + proc.kill() + elif proc_name.lower().startswith("python"): + # drop the first argument, which is the python executable + python_command_parts = proc.cmdline()[1:] + # the initial command part is the last part of the path + python_command_parts[0] = python_command_parts[0].split("/")[-1] + # combine the remaining arguments + command = " ".join(python_command_parts) + print( + f"Evaluating process with name '{proc_name}' and command '{command}'..." + ) + if ( + len(cmd_line) > 1 + and "nox" not in command + and is_process_match(command, process_names) + ): + print( + f"Killing process with name '{proc_name}' and command '{command}'..." + ) + proc.kill() + except psutil.ZombieProcess as zp: + print( + f"Failed to kill zombie process {print_process_identifier(proc_name, cmd_line, process_names)}: {str(zp)}" + ) + except psutil.NoSuchProcess as nsp: + print( + f"Failed to kill process {print_process_identifier(proc_name, cmd_line, process_names)}: {str(nsp)}" + ) + + +def is_process_match(command: str, process_names: List[str]) -> bool: + if len(process_names) == 1: + command_parts = command.split(" ") + if command_parts[0] == process_names[0]: + return True + if len(process_names) > 1 and all( + [process_name in command for process_name in process_names] + ): + return True + return False + + +def print_process_identifier(proc_name: str, cmd_line: str, process_names: List[str]): + return f"process '{proc_name}' (looking for {','.join(process_names)}) with command line '{cmd_line}'" + + +def wait_for_app_start(): + # TODO Make this deterministic + time.sleep(8) diff --git a/src/test/test_utils/spans_parser.py b/src/test/test_utils/spans_parser.py index 630ccd1a..e003d3fb 100644 --- a/src/test/test_utils/spans_parser.py +++ b/src/test/test_utils/spans_parser.py @@ -2,6 +2,7 @@ import json import os +import time from dataclasses import dataclass from typing import Any, Dict, List, Optional @@ -9,7 +10,7 @@ class SpansCounter: - counter = 0 + counters = {} spanCounter = SpansCounter() @@ -20,23 +21,46 @@ class SpansContainer: spans: List[Dict[str, Any]] @staticmethod - def increment_spans(): - spanCounter.counter = sum(1 for line in open(SPANS_FILE_FULL_PATH)) + def update_span_offset(path: str = SPANS_FILE_FULL_PATH): + spanCounter.counters[path] = sum(1 for _ in open(path)) @staticmethod - def parse_spans_from_file(path: Optional[str] = None) -> SpansContainer: - with open(path or SPANS_FILE_FULL_PATH) as file: + def get_span_offset(path: str = SPANS_FILE_FULL_PATH) -> int: + return spanCounter.counters.get(path, 0) + + @staticmethod + def parse_spans_from_file( + path: Optional[str] = SPANS_FILE_FULL_PATH, + ) -> SpansContainer: + with open(path) as file: spans = [json.loads(line) for line in file.readlines()] return SpansContainer(spans=spans) @staticmethod - def get_spans_from_file(path: Optional[str] = None) -> SpansContainer: - spans = SpansContainer.parse_spans_from_file(path).spans - return SpansContainer(spans=spans[spanCounter.counter :]) # noqa - - def get_first_root(self) -> Dict[str, Any]: - return self.get_root_spans()[0] + def get_spans_from_file( + path: Optional[str] = SPANS_FILE_FULL_PATH, + wait_time_sec: int = 3, + expected_span_count: int = 0, + ) -> SpansContainer: + waited_time_in_sec = 0 + span_offset = SpansContainer.get_span_offset(path) + while waited_time_in_sec < wait_time_sec: + try: + spans = SpansContainer.parse_spans_from_file(path).spans + if len(spans) >= expected_span_count + span_offset: + return SpansContainer(spans=spans[span_offset:]) # noqa + except Exception as err: + print( + f"Failed to parse spans from file after {waited_time_in_sec}s: {err}" + ) + time.sleep(1) + waited_time_in_sec += 1 + return SpansContainer(spans=spans if spans else []) # noqa + + def get_first_root(self) -> Optional[Dict[str, Any]]: + root_spans = self.get_root_spans() + return root_spans[0] if root_spans else None def get_root_spans(self) -> List[Dict[str, Any]]: return list(filter(lambda item: item["parent_id"] is None, self.spans))