From 4d958f5315e01ad129526ca66763bb110bf5fb73 Mon Sep 17 00:00:00 2001 From: Pankaj Singh <98807258+pankajastro@users.noreply.github.com> Date: Fri, 20 Dec 2024 16:24:10 +0530 Subject: [PATCH 1/2] Support rendering build operator task-id with non-ASCII characters (#1415) In PR [#1278](https://github.com/astronomer/astronomer-cosmos/pull/1278), we introduced support for rendering non-ASCII characters in task IDs. However, due to limited access, we were unable to implement the same functionality for the build operator. This PR aims to extend that functionality by adding support for rendering build task IDs with non-ASCII characters. --- cosmos/airflow/graph.py | 12 ++++++++--- tests/airflow/test_graph.py | 41 +++++++++++++++++++++++++++++++++++-- 2 files changed, 48 insertions(+), 5 deletions(-) diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index d20a7de22..662833cda 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -138,18 +138,22 @@ def _get_task_id_and_args( use_task_group: bool, normalize_task_id: Callable[..., Any] | None, resource_suffix: str, + include_resource_type: bool = False, ) -> tuple[str, dict[str, Any]]: """ Generate task ID and update args with display name if needed. 
""" args_update = args + task_display_name = f"{node.name}_{resource_suffix}" + if include_resource_type: + task_display_name = f"{node.name}_{node.resource_type.value}_{resource_suffix}" if use_task_group: task_id = resource_suffix elif normalize_task_id: task_id = normalize_task_id(node) - args_update["task_display_name"] = f"{node.name}_{resource_suffix}" + args_update["task_display_name"] = task_display_name else: - task_id = f"{node.name}_{resource_suffix}" + task_id = task_display_name return task_id, args_update @@ -214,7 +218,9 @@ def create_task_metadata( } if test_behavior == TestBehavior.BUILD and node.resource_type in SUPPORTED_BUILD_RESOURCES: - task_id = f"{node.name}_{node.resource_type.value}_build" + task_id, args = _get_task_id_and_args( + node, args, use_task_group, normalize_task_id, "build", include_resource_type=True + ) elif node.resource_type == DbtResourceType.MODEL: task_id, args = _get_task_id_and_args(node, args, use_task_group, normalize_task_id, "run") elif node.resource_type == DbtResourceType.SOURCE: diff --git a/tests/airflow/test_graph.py b/tests/airflow/test_graph.py index c00f0cf53..61e6c3a8f 100644 --- a/tests/airflow/test_graph.py +++ b/tests/airflow/test_graph.py @@ -621,7 +621,7 @@ def _normalize_task_id(node: DbtNode) -> str: reason="Airflow task did not have display_name until the 2.9 release", ) @pytest.mark.parametrize( - "node_type,node_id,normalize_task_id,use_task_group,expected_node_id,expected_display_name", + "node_type,node_id,normalize_task_id,use_task_group,test_behavior,expected_node_id,expected_display_name", [ # normalize_task_id is None (default) ( @@ -629,6 +629,7 @@ def _normalize_task_id(node: DbtNode) -> str: f"{DbtResourceType.MODEL.value}.my_folder.test_node", None, False, + None, "test_node_run", None, ), @@ -637,6 +638,7 @@ def _normalize_task_id(node: DbtNode) -> str: f"{DbtResourceType.SOURCE.value}.my_folder.test_node", None, False, + None, "test_node_source", None, ), @@ -645,15 +647,26 @@ def 
_normalize_task_id(node: DbtNode) -> str: f"{DbtResourceType.SEED.value}.my_folder.test_node", None, False, + None, "test_node_seed", None, ), + ( + DbtResourceType.SEED, + f"{DbtResourceType.SEED.value}.my_folder.test_node", + None, + False, + TestBehavior.BUILD, + "test_node_seed_build", + None, + ), # normalize_task_id is passed and use_task_group is False ( DbtResourceType.MODEL, f"{DbtResourceType.MODEL.value}.my_folder.test_node", _normalize_task_id, False, + None, "new_task_id_test_node_model", "test_node_run", ), @@ -662,6 +675,7 @@ def _normalize_task_id(node: DbtNode) -> str: f"{DbtResourceType.MODEL.value}.my_folder.test_node", _normalize_task_id, False, + None, "new_task_id_test_node_source", "test_node_source", ), @@ -670,15 +684,26 @@ def _normalize_task_id(node: DbtNode) -> str: f"{DbtResourceType.MODEL.value}.my_folder.test_node", _normalize_task_id, False, + None, "new_task_id_test_node_seed", "test_node_seed", ), + ( + DbtResourceType.SEED, + f"{DbtResourceType.MODEL.value}.my_folder.test_node", + _normalize_task_id, + False, + TestBehavior.BUILD, + "new_task_id_test_node_seed", + "test_node_seed_build", + ), # normalize_task_id is passed and use_task_group is True ( DbtResourceType.MODEL, f"{DbtResourceType.MODEL.value}.my_folder.test_node", _normalize_task_id, True, + None, "run", None, ), @@ -687,6 +712,7 @@ def _normalize_task_id(node: DbtNode) -> str: f"{DbtResourceType.MODEL.value}.my_folder.test_node", _normalize_task_id, True, + None, "source", None, ), @@ -695,13 +721,23 @@ def _normalize_task_id(node: DbtNode) -> str: f"{DbtResourceType.MODEL.value}.my_folder.test_node", _normalize_task_id, True, + None, "seed", None, ), + ( + DbtResourceType.SEED, + f"{DbtResourceType.MODEL.value}.my_folder.test_node", + _normalize_task_id, + True, + TestBehavior.BUILD, + "build", + None, + ), ], ) def test_create_task_metadata_normalize_task_id( - node_type, node_id, normalize_task_id, use_task_group, expected_node_id, expected_display_name + 
node_type, node_id, normalize_task_id, use_task_group, test_behavior, expected_node_id, expected_display_name ): node = DbtNode( unique_id=node_id, @@ -720,6 +756,7 @@ def test_create_task_metadata_normalize_task_id( use_task_group=use_task_group, normalize_task_id=normalize_task_id, source_rendering_behavior=SourceRenderingBehavior.ALL, + test_behavior=test_behavior, ) assert metadata.id == expected_node_id if expected_display_name: From 103c2aee43318f1f7a1bdd4b694507ae31e93c7e Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Fri, 20 Dec 2024 11:15:58 +0000 Subject: [PATCH 2/2] Emit telemetry to Scarf during DAG run (#1397) Export telemetry related to Cosmos usage to [Scarf](https://about.scarf.sh/). This data assists the project maintainers in better understanding how Cosmos is used. Insights from this telemetry are critical for prioritizing patches, minor releases, and security fixes. Additionally, this information supports critical decisions related to the development road map. Deployments and individual users can opt out of analytics by setting the configuration: ``` [cosmos] enable_telemetry: False ``` As described in the [official documentation](https://docs.scarf.sh/gateway/#do-not-track), it is also possible to opt-out by setting one of the following environment variables: ```commandline AIRFLOW__COSMOS__ENABLE_TELEMETRY=False DO_NOT_TRACK=True SCARF_NO_ANALYTICS=True ``` In addition to Scarf's default data collection, Cosmos collects the following information when running Cosmos-powered DAGs: - Cosmos version - Airflow version - Python version - Operating system & machine architecture - Event type - DAG hash - Total tasks - Total Cosmos tasks No user-identifiable information (IP included) is stored in Scarf, even though Scarf infers information from the IP, such as location, and stores that. The data collection is GDPR compliant. 
The Apache Foundation supports this same strategy in many of its OpenSource projects, including Airflow ([#39510](https://github.com/apache/airflow/pull/39510)). Example of visualisation of the data via the Scarf UI: Screenshot 2024-12-19 at 10 22 59 Screenshot 2024-12-19 at 10 23 13 Screenshot 2024-12-19 at 10 23 21 Screenshot 2024-12-19 at 10 23 28 Screenshot 2024-12-19 at 10 23 51 Screenshot 2024-12-19 at 10 24 01 Screenshot 2024-12-19 at 10 24 11 Screenshot 2024-12-19 at 10 24 20 Screenshot 2024-12-19 at 10 24 31 Screenshot 2024-12-19 at 10 24 39 Screenshot 2024-12-19 at 10 24 48 Closes: #1143 --- PRIVACY_NOTICE.rst | 41 ++++++++ README.rst | 4 +- cosmos/constants.py | 4 + cosmos/listeners/__init__.py | 0 cosmos/listeners/dag_run_listener.py | 84 +++++++++++++++ cosmos/plugin/__init__.py | 2 + cosmos/settings.py | 22 +++- cosmos/telemetry.py | 77 ++++++++++++++ docs/index.rst | 6 +- tests/listeners/test_dag_run_listener.py | 127 +++++++++++++++++++++++ tests/test_telemetry.py | 115 ++++++++++++++++++++ 11 files changed, 477 insertions(+), 5 deletions(-) create mode 100644 PRIVACY_NOTICE.rst create mode 100644 cosmos/listeners/__init__.py create mode 100644 cosmos/listeners/dag_run_listener.py create mode 100644 cosmos/telemetry.py create mode 100644 tests/listeners/test_dag_run_listener.py create mode 100644 tests/test_telemetry.py diff --git a/PRIVACY_NOTICE.rst b/PRIVACY_NOTICE.rst new file mode 100644 index 000000000..7477ee795 --- /dev/null +++ b/PRIVACY_NOTICE.rst @@ -0,0 +1,41 @@ +Privacy Notice +============== + +This project follows the `Privacy Policy of Astronomer `_. + +Collection of Data +------------------ + +Astronomer Cosmos integrates `Scarf `_ to collect basic telemetry data during operation. +This data assists the project maintainers in better understanding how Cosmos is used. +Insights gained from this telemetry are critical for prioritizing patches, minor releases, and +security fixes. 
Additionally, this information supports key decisions related to the development road map. + +Deployments and individual users can opt out of analytics by setting the configuration: + + +.. code-block:: + + [cosmos] enable_telemetry False + + +As described in the `official documentation `_, it is also possible to opt out by setting one of the following environment variables: + +.. code-block:: + + DO_NOT_TRACK=True + SCARF_NO_ANALYTICS=True + + +In addition to Scarf's default data collection, Cosmos collects the following information when running Cosmos-powered DAGs: + +- Cosmos version +- Airflow version +- Python version +- Operating system & machine architecture +- Event type +- The DAG hash +- Total tasks +- Total Cosmos tasks + +No user-identifiable information (IP included) is stored in Scarf. diff --git a/README.rst b/README.rst index 7eb32bcac..e35b8a913 100644 --- a/README.rst +++ b/README.rst @@ -82,7 +82,9 @@ _______ Privacy Notice ______________ -This project follows `Astronomer's Privacy Policy `_ +The application and this website collect telemetry to support the project's development. These can be disabled by the end-users. + +Read the `Privacy Notice `_ to learn more about it. ..
Tracking pixel for Scarf diff --git a/cosmos/constants.py b/cosmos/constants.py index 8378e8d10..0513d50d2 100644 --- a/cosmos/constants.py +++ b/cosmos/constants.py @@ -160,3 +160,7 @@ def _missing_value_(cls, value): # type: ignore TESTABLE_DBT_RESOURCES = {DbtResourceType.MODEL, DbtResourceType.SOURCE, DbtResourceType.SNAPSHOT, DbtResourceType.SEED} DBT_COMPILE_TASK_ID = "dbt_compile" + +TELEMETRY_URL = "https://astronomer.gateway.scarf.sh/astronomer-cosmos/{telemetry_version}/{cosmos_version}/{airflow_version}/{python_version}/{platform_system}/{platform_machine}/{event_type}/{status}/{dag_hash}/{task_count}/{cosmos_task_count}" +TELEMETRY_VERSION = "v1" +TELEMETRY_TIMEOUT = 1.0 diff --git a/cosmos/listeners/__init__.py b/cosmos/listeners/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/cosmos/listeners/dag_run_listener.py b/cosmos/listeners/dag_run_listener.py new file mode 100644 index 000000000..0314c3474 --- /dev/null +++ b/cosmos/listeners/dag_run_listener.py @@ -0,0 +1,84 @@ +from __future__ import annotations + +from airflow.listeners import hookimpl +from airflow.models.dag import DAG +from airflow.models.dagrun import DagRun + +from cosmos import telemetry +from cosmos.log import get_logger + +logger = get_logger(__name__) + + +class EventStatus: + SUCCESS = "success" + FAILED = "failed" + + +DAG_RUN = "dag_run" + + +def total_cosmos_tasks(dag: DAG) -> int: + """ + Count the Cosmos tasks in a given serialized `airflow.serialization.serialized_objects.SerializedDAG`. + + The approach is naive (it does not take subclasses into account), but it is inexpensive and + works. + """ + cosmos_tasks = 0 + for task in dag.task_dict.values(): + # In a real Airflow deployment, the following `task` is an instance of + # `airflow.serialization.serialized_objects.SerializedBaseOperator` + # and the only reference to Cosmos is in the _task_module.
+ # It is suboptimal, but works as of Airflow 2.10 + task_module = getattr(task, "_task_module", None) or task.__class__.__module__ + if task_module.startswith("cosmos."): + cosmos_tasks += 1 + return cosmos_tasks + + +# @provide_session +@hookimpl +def on_dag_run_success(dag_run: DagRun, msg: str) -> None: + logger.debug("Running on_dag_run_success") + # In a real Airflow deployment, the following `serialized_dag` is an instance of + # `airflow.serialization.serialized_objects.SerializedDAG` + # and it is not a subclass of DbtDag, nor contain any references to Cosmos + serialized_dag = dag_run.get_dag() + + if not total_cosmos_tasks(serialized_dag): + logger.debug("The DAG does not use Cosmos") + return + + additional_telemetry_metrics = { + "dag_hash": dag_run.dag_hash, + "status": EventStatus.SUCCESS, + "task_count": len(serialized_dag.task_ids), + "cosmos_task_count": total_cosmos_tasks(serialized_dag), + } + + telemetry.emit_usage_metrics_if_enabled(DAG_RUN, additional_telemetry_metrics) + logger.debug("Completed on_dag_run_success") + + +@hookimpl +def on_dag_run_failed(dag_run: DagRun, msg: str) -> None: + logger.debug("Running on_dag_run_failed") + # In a real Airflow deployment, the following `serialized_dag` is an instance of + # `airflow.serialization.serialized_objects.SerializedDAG` + # and it is not a subclass of DbtDag, nor contain any references to Cosmos + serialized_dag = dag_run.get_dag() + + if not total_cosmos_tasks(serialized_dag): + logger.debug("The DAG does not use Cosmos") + return + + additional_telemetry_metrics = { + "dag_hash": dag_run.dag_hash, + "status": EventStatus.FAILED, + "task_count": len(serialized_dag.task_ids), + "cosmos_task_count": total_cosmos_tasks(serialized_dag), + } + + telemetry.emit_usage_metrics_if_enabled(DAG_RUN, additional_telemetry_metrics) + logger.debug("Completed on_dag_run_failed") diff --git a/cosmos/plugin/__init__.py b/cosmos/plugin/__init__.py index 5997a5fe3..4bbea4fa2 100644 --- 
a/cosmos/plugin/__init__.py +++ b/cosmos/plugin/__init__.py @@ -10,6 +10,7 @@ from flask import abort, url_for from flask_appbuilder import AppBuilder, expose +from cosmos.listeners import dag_run_listener from cosmos.settings import dbt_docs_conn_id, dbt_docs_dir, dbt_docs_index_file_name, in_astro_cloud if in_astro_cloud: @@ -269,3 +270,4 @@ class CosmosPlugin(AirflowPlugin): "href": conf.get("webserver", "base_url") + "/cosmos/dbt_docs", } appbuilder_views = [item] + listeners = [dag_run_listener] diff --git a/cosmos/settings.py b/cosmos/settings.py index 5b24321c8..ba9da106a 100644 --- a/cosmos/settings.py +++ b/cosmos/settings.py @@ -37,12 +37,28 @@ remote_target_path = conf.get("cosmos", "remote_target_path", fallback=None) remote_target_path_conn_id = conf.get("cosmos", "remote_target_path_conn_id", fallback=None) +AIRFLOW_IO_AVAILABLE = Version(airflow_version) >= Version("2.8.0") + +# The following environment variable is populated in Astro Cloud +in_astro_cloud = os.getenv("ASTRONOMER_ENVIRONMENT") == "cloud" + try: LINEAGE_NAMESPACE = conf.get("openlineage", "namespace") except airflow.exceptions.AirflowConfigException: LINEAGE_NAMESPACE = os.getenv("OPENLINEAGE_NAMESPACE", DEFAULT_OPENLINEAGE_NAMESPACE) -AIRFLOW_IO_AVAILABLE = Version(airflow_version) >= Version("2.8.0") -# The following environment variable is populated in Astro Cloud -in_astro_cloud = os.getenv("ASTRONOMER_ENVIRONMENT") == "cloud" +def convert_to_boolean(value: str | None) -> bool: + """ + Convert a string that represents a boolean to a Python boolean. 
+ """ + value = str(value).lower().strip() + if value in ("f", "false", "0", "", "none"): + return False + return True + + +# Telemetry-related settings +enable_telemetry = conf.getboolean("cosmos", "enable_telemetry", fallback=True) +do_not_track = convert_to_boolean(os.getenv("DO_NOT_TRACK")) +no_analytics = convert_to_boolean(os.getenv("SCARF_NO_ANALYTICS")) diff --git a/cosmos/telemetry.py b/cosmos/telemetry.py new file mode 100644 index 000000000..0e267b28b --- /dev/null +++ b/cosmos/telemetry.py @@ -0,0 +1,77 @@ +from __future__ import annotations + +import platform +from urllib import parse +from urllib.parse import urlencode + +import httpx +from airflow import __version__ as airflow_version + +import cosmos +from cosmos import constants, settings +from cosmos.log import get_logger + +logger = get_logger(__name__) + + +def should_emit() -> bool: + """ + Identify if telemetry metrics should be emitted or not. + """ + return settings.enable_telemetry and not settings.do_not_track and not settings.no_analytics + + +def collect_standard_usage_metrics() -> dict[str, object]: + """ + Return standard telemetry metrics. + """ + metrics = { + "cosmos_version": cosmos.__version__, # type: ignore[attr-defined] + "airflow_version": parse.quote(airflow_version), + "python_version": platform.python_version(), + "platform_system": platform.system(), + "platform_machine": platform.machine(), + "variables": {}, + } + return metrics + + +def emit_usage_metrics(metrics: dict[str, object]) -> bool: + """ + Emit desired telemetry metrics to remote telemetry endpoint. + + The metrics must contain the necessary fields to build the TELEMETRY_URL. + """ + query_string = urlencode(metrics) + telemetry_url = constants.TELEMETRY_URL.format( + **metrics, telemetry_version=constants.TELEMETRY_VERSION, query_string=query_string + ) + logger.debug("Telemetry is enabled. 
Emitting the following usage metrics to %s: %s", telemetry_url, metrics) + response = httpx.get(telemetry_url, timeout=constants.TELEMETRY_TIMEOUT, follow_redirects=True) + if not response.is_success: + logger.warning( + "Unable to emit usage metrics to %s. Status code: %s. Message: %s", + telemetry_url, + response.status_code, + response.text, + ) + return response.is_success + + +def emit_usage_metrics_if_enabled(event_type: str, additional_metrics: dict[str, object]) -> bool: + """ + Checks if telemetry should be emitted, fetch standard metrics, complement with custom metrics + and emit them to remote telemetry endpoint. + + :returns: If the event was successfully sent to the telemetry backend or not. + """ + if should_emit(): + metrics = collect_standard_usage_metrics() + metrics["event_type"] = event_type + metrics["variables"].update(additional_metrics) # type: ignore[attr-defined] + metrics.update(additional_metrics) + is_success = emit_usage_metrics(metrics) + return is_success + else: + logger.debug("Telemetry is disabled. To enable it, export AIRFLOW__COSMOS__ENABLE_TELEMETRY=True.") + return False diff --git a/docs/index.rst b/docs/index.rst index e788bd04c..7a56b8df7 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -137,10 +137,14 @@ _______ `Apache License 2.0 `_ + Privacy Notice ______________ -This project follows `Astronomer's Privacy Policy `_ +The application and this website collect telemetry to support the project's development. These can be disabled by the end-users. + +Read the `Privacy Notice `_ to learn more about it. + .. Tracking pixel for Scarf .. 
raw:: html diff --git a/tests/listeners/test_dag_run_listener.py b/tests/listeners/test_dag_run_listener.py new file mode 100644 index 000000000..a547f20ad --- /dev/null +++ b/tests/listeners/test_dag_run_listener.py @@ -0,0 +1,127 @@ +import logging +import uuid +from datetime import datetime +from pathlib import Path +from unittest.mock import patch + +import pytest +from airflow.models import DAG +from airflow.utils.state import State + +from cosmos import DbtRunLocalOperator, ProfileConfig, ProjectConfig +from cosmos.airflow.dag import DbtDag +from cosmos.airflow.task_group import DbtTaskGroup +from cosmos.listeners.dag_run_listener import on_dag_run_failed, on_dag_run_success, total_cosmos_tasks +from cosmos.profiles import PostgresUserPasswordProfileMapping + +DBT_ROOT_PATH = Path(__file__).parent.parent.parent / "dev/dags/dbt" +DBT_PROJECT_NAME = "jaffle_shop" + +profile_config = ProfileConfig( + profile_name="default", + target_name="dev", + profile_mapping=PostgresUserPasswordProfileMapping( + conn_id="example_conn", + profile_args={"schema": "public"}, + disable_event_tracking=True, + ), +) + + +@pytest.mark.integration +def test_is_cosmos_dag_is_true(): + dag = DbtDag( + project_config=ProjectConfig( + DBT_ROOT_PATH / "jaffle_shop", + ), + profile_config=profile_config, + start_date=datetime(2023, 1, 1), + dag_id="basic_cosmos_dag", + ) + assert total_cosmos_tasks(dag) == 13 + + +@pytest.mark.integration +def test_total_cosmos_tasks_in_task_group(): + with DAG("test-id-dbt-compile", start_date=datetime(2022, 1, 1)) as dag: + _ = DbtTaskGroup( + project_config=ProjectConfig( + DBT_ROOT_PATH / "jaffle_shop", + ), + profile_config=profile_config, + ) + + assert total_cosmos_tasks(dag) == 13 + + +def test_total_cosmos_tasks_is_one(): + + with DAG("test-id-1", start_date=datetime(2022, 1, 1)) as dag: + run_operator = DbtRunLocalOperator( + profile_config=profile_config, + project_dir=DBT_ROOT_PATH / "jaffle_shop", + task_id="run", + install_deps=True, + 
append_env=True, + ) + run_operator + + assert total_cosmos_tasks(dag) == 1 + + +def test_not_cosmos_dag(): + + with DAG("test-id-1", start_date=datetime(2022, 1, 1)) as dag: + pass + + assert total_cosmos_tasks(dag) == 0 + + +@pytest.mark.integration +@patch("cosmos.listeners.dag_run_listener.telemetry.emit_usage_metrics_if_enabled") +def test_on_dag_run_success(mock_emit_usage_metrics_if_enabled, caplog): + caplog.set_level(logging.DEBUG) + + dag = DbtDag( + project_config=ProjectConfig( + DBT_ROOT_PATH / "jaffle_shop", + ), + profile_config=profile_config, + start_date=datetime(2023, 1, 1), + dag_id="basic_cosmos_dag", + ) + run_id = str(uuid.uuid1()) + dag_run = dag.create_dagrun( + state=State.NONE, + run_id=run_id, + ) + + on_dag_run_success(dag_run, msg="test success") + assert "Running on_dag_run_success" in caplog.text + assert "Completed on_dag_run_success" in caplog.text + assert mock_emit_usage_metrics_if_enabled.call_count == 1 + + +@pytest.mark.integration +@patch("cosmos.listeners.dag_run_listener.telemetry.emit_usage_metrics_if_enabled") +def test_on_dag_run_failed(mock_emit_usage_metrics_if_enabled, caplog): + caplog.set_level(logging.DEBUG) + + dag = DbtDag( + project_config=ProjectConfig( + DBT_ROOT_PATH / "jaffle_shop", + ), + profile_config=profile_config, + start_date=datetime(2023, 1, 1), + dag_id="basic_cosmos_dag", + ) + run_id = str(uuid.uuid1()) + dag_run = dag.create_dagrun( + state=State.FAILED, + run_id=run_id, + ) + + on_dag_run_failed(dag_run, msg="test failed") + assert "Running on_dag_run_failed" in caplog.text + assert "Completed on_dag_run_failed" in caplog.text + assert mock_emit_usage_metrics_if_enabled.call_count == 1 diff --git a/tests/test_telemetry.py b/tests/test_telemetry.py new file mode 100644 index 000000000..b11caabe1 --- /dev/null +++ b/tests/test_telemetry.py @@ -0,0 +1,115 @@ +import logging +from unittest.mock import patch + +import pytest + +from cosmos import telemetry + + +def 
test_should_emit_is_true_by_default(): + assert telemetry.should_emit() + + +@patch("cosmos.settings.enable_telemetry", True) +def test_should_emit_is_true_when_only_enable_telemetry_is_true(): + assert telemetry.should_emit() + + +@patch("cosmos.settings.do_not_track", True) +def test_should_emit_is_false_when_do_not_track(): + assert not telemetry.should_emit() + + +@patch("cosmos.settings.no_analytics", True) +def test_should_emit_is_false_when_no_analytics(): + assert not telemetry.should_emit() + + +def test_collect_standard_usage_metrics(): + metrics = telemetry.collect_standard_usage_metrics() + expected_keys = [ + "airflow_version", + "cosmos_version", + "platform_machine", + "platform_system", + "python_version", + "variables", + ] + assert sorted(metrics.keys()) == expected_keys + + +class MockFailedResponse: + is_success = False + status_code = "404" + text = "Non existent URL" + + +@patch("cosmos.telemetry.httpx.get", return_value=MockFailedResponse()) +def test_emit_usage_metrics_fails(mock_httpx_get, caplog): + sample_metrics = { + "cosmos_version": "1.8.0a4", + "airflow_version": "2.10.1", + "python_version": "3.11", + "platform_system": "darwin", + "platform_machine": "amd64", + "event_type": "dag_run", + "status": "success", + "dag_hash": "d151d1fa2f03270ea116cc7494f2c591", + "task_count": 3, + "cosmos_task_count": 3, + } + is_success = telemetry.emit_usage_metrics(sample_metrics) + mock_httpx_get.assert_called_once_with( + f"""https://astronomer.gateway.scarf.sh/astronomer-cosmos/v1/1.8.0a4/2.10.1/3.11/darwin/amd64/dag_run/success/d151d1fa2f03270ea116cc7494f2c591/3/3""", + timeout=1.0, + follow_redirects=True, + ) + assert not is_success + log_msg = f"""Unable to emit usage metrics to https://astronomer.gateway.scarf.sh/astronomer-cosmos/v1/1.8.0a4/2.10.1/3.11/darwin/amd64/dag_run/success/d151d1fa2f03270ea116cc7494f2c591/3/3. Status code: 404. 
Message: Non existent URL""" + assert caplog.text.startswith("WARNING") + assert log_msg in caplog.text + + +@pytest.mark.integration +def test_emit_usage_metrics_succeeds(caplog): + caplog.set_level(logging.DEBUG) + sample_metrics = { + "cosmos_version": "1.8.0a4", + "airflow_version": "2.10.1", + "python_version": "3.11", + "platform_system": "darwin", + "platform_machine": "amd64", + "event_type": "dag_run", + "status": "success", + "dag_hash": "dag-hash-ci", + "task_count": 33, + "cosmos_task_count": 33, + } + is_success = telemetry.emit_usage_metrics(sample_metrics) + assert is_success + assert caplog.text.startswith("DEBUG") + assert "Telemetry is enabled. Emitting the following usage metrics to" in caplog.text + + +@patch("cosmos.telemetry.should_emit", return_value=False) +def test_emit_usage_metrics_if_enabled_fails(mock_should_emit, caplog): + caplog.set_level(logging.DEBUG) + assert not telemetry.emit_usage_metrics_if_enabled("any", {}) + assert caplog.text.startswith("DEBUG") + assert "Telemetry is disabled. To enable it, export AIRFLOW__COSMOS__ENABLE_TELEMETRY=True." in caplog.text + + +@patch("cosmos.telemetry.should_emit", return_value=True) +@patch("cosmos.telemetry.collect_standard_usage_metrics", return_value={"k1": "v1", "k2": "v2", "variables": {}}) +@patch("cosmos.telemetry.emit_usage_metrics") +def test_emit_usage_metrics_if_enabled_succeeds( + mock_emit_usage_metrics, mock_collect_standard_usage_metrics, mock_should_emit +): + assert telemetry.emit_usage_metrics_if_enabled("any", {"k2": "v2"}) + mock_emit_usage_metrics.assert_called_once() + assert mock_emit_usage_metrics.call_args.args[0] == { + "k1": "v1", + "k2": "v2", + "event_type": "any", + "variables": {"k2": "v2"}, + }