Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Emit telemetry to Scarf during DAG run #1397

Merged
merged 46 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
3eb5d08
Add telemetry module and tests
tatiana Dec 17, 2024
6e0c6f5
Emit telemetry via Airflow listener
tatiana Dec 17, 2024
c124b79
Change version
tatiana Dec 18, 2024
ee8e928
Try to fix version issue in CI when running hatch type-check
tatiana Dec 18, 2024
bbcce4a
Fix unit test
tatiana Dec 18, 2024
c484e50
Fix unit test
tatiana Dec 18, 2024
c7d282f
Expose more metrics in Scarf
tatiana Dec 19, 2024
e460db2
Fix spelling
tatiana Dec 19, 2024
0300809
Update cosmos/constants.py
tatiana Dec 19, 2024
e8c1a26
Add docs about telemetry
tatiana Dec 19, 2024
44387b3
Update cosmos/__init__.py
tatiana Dec 19, 2024
3ba1580
Add debug messages
tatiana Dec 19, 2024
79650e4
Add more logs
tatiana Dec 19, 2024
4de8c62
Fix import
tatiana Dec 19, 2024
5b0207a
Change version for testing
tatiana Dec 19, 2024
a84dc7c
Add more logs to troubleshoot
tatiana Dec 19, 2024
585f209
Bump the version to see changes in scarf
tatiana Dec 19, 2024
aac353e
Stop using context manager to measure time of emitting telemetry
tatiana Dec 19, 2024
a851920
troubleshot
tatiana Dec 19, 2024
adc54d1
Troubleshoot
tatiana Dec 19, 2024
1de1083
Upgrade telemetry
tatiana Dec 19, 2024
15c1688
Improve test coverage
tatiana Dec 19, 2024
8919ef0
Fix unit test
tatiana Dec 19, 2024
aaa19da
Change log levels to analyse behaviour in Astro CLI
tatiana Dec 20, 2024
fc7b502
Change logger
tatiana Dec 20, 2024
a1d63ee
Add more logs
tatiana Dec 20, 2024
7ef47f5
improve logs
tatiana Dec 20, 2024
2f3ddb7
Log more info
tatiana Dec 20, 2024
940a1a2
add more logs
tatiana Dec 20, 2024
7661ab3
Add logs
tatiana Dec 20, 2024
7ae6152
Force to fetch the actual dag, add more debug logs
tatiana Dec 20, 2024
5caf6a8
Force to fetch the actual dag, add more debug logs
tatiana Dec 20, 2024
430aec3
Force to fetch the actual dag, add more debug logs
tatiana Dec 20, 2024
bc98126
Force to fetch the actual dag, add more debug logs
tatiana Dec 20, 2024
41a9293
debugging
tatiana Dec 20, 2024
47fb46c
Attempt to work around RuntimeError: UNEXPECTED COMMIT - THIS WILL BR…
tatiana Dec 20, 2024
6dac09a
Remove DagBag from listener
tatiana Dec 20, 2024
f06f86a
Change logic to iterate over dag tasks
tatiana Dec 20, 2024
5b4fda2
Change logic to iterate over dag tasks
tatiana Dec 20, 2024
93da409
Improve identifying COsmos task class
tatiana Dec 20, 2024
ac93714
Clean up working telemetry
tatiana Dec 20, 2024
07752ac
Update privacy notice
tatiana Dec 20, 2024
7dc9f96
Fix privacy notice
tatiana Dec 20, 2024
f2aa9c7
Update cosmos/__init__.py
tatiana Dec 20, 2024
574c3ba
Update test_performance.py
tatiana Dec 20, 2024
c57f884
🎨 [pre-commit.ci] Auto format from pre-commit.com hooks
pre-commit-ci[bot] Dec 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions PRIVACY_NOTICE.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
Privacy Notice
==============

This project follows the `Privacy Policy of Astronomer <https://www.astronomer.io/privacy/>`_.

Collection of Data
------------------

Astronomer Cosmos integrates `Scarf <https://about.scarf.sh/>`_ to collect basic telemetry data during operation.
This data assists the project maintainers in better understanding how Cosmos is used.
Insights gained from this telemetry are critical for prioritizing patches, minor releases, and
security fixes. Additionally, this information supports key decisions related to the development road map.

Deployments and individual users can opt-out of analytics by setting the configuration:


.. code-block::

[cosmos] enable_telemetry False


As described in the `official documentation <https://docs.scarf.sh/gateway/#do-not-track>`_, it is also possible to opt out by setting one of the following environment variables:

.. code-block::

DO_NOT_TRACK=True
SCARF_NO_ANALYTICS=True


In addition to Scarf's default data collection, Cosmos collect the following information when running Cosmos-powered DAGs:

- Cosmos version
- Airflow version
- Python version
- Operating system & machine architecture
- Event type
- The DAG hash
- Total tasks
- Total Cosmos tasks

No user-identifiable information (IP included) is stored in Scarf.
4 changes: 3 additions & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,9 @@ _______
Privacy Notice
______________

This project follows `Astronomer's Privacy Policy <https://www.astronomer.io/privacy/>`_
The application and this website collect telemetry to support the project's development. These can be disabled by the end-users.

Read the `Privacy Notice <https://github.com/astronomer/astronomer-cosmos/blob/main/PRIVACY_NOTICE.rst>`_ to learn more about it.

.. Tracking pixel for Scarf

Expand Down
4 changes: 4 additions & 0 deletions cosmos/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,3 +160,7 @@ def _missing_value_(cls, value): # type: ignore
TESTABLE_DBT_RESOURCES = {DbtResourceType.MODEL, DbtResourceType.SOURCE, DbtResourceType.SNAPSHOT, DbtResourceType.SEED}

DBT_COMPILE_TASK_ID = "dbt_compile"

TELEMETRY_URL = "https://astronomer.gateway.scarf.sh/astronomer-cosmos/{telemetry_version}/{cosmos_version}/{airflow_version}/{python_version}/{platform_system}/{platform_machine}/{event_type}/{status}/{dag_hash}/{task_count}/{cosmos_task_count}"
TELEMETRY_VERSION = "v1"
TELEMETRY_TIMEOUT = 1.0
Empty file added cosmos/listeners/__init__.py
Empty file.
84 changes: 84 additions & 0 deletions cosmos/listeners/dag_run_listener.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
from __future__ import annotations

from airflow.listeners import hookimpl
from airflow.models.dag import DAG
from airflow.models.dagrun import DagRun

from cosmos import telemetry
from cosmos.log import get_logger

logger = get_logger(__name__)


class EventStatus:
SUCCESS = "success"
FAILED = "failed"


DAG_RUN = "dag_run"


def total_cosmos_tasks(dag: DAG) -> int:
"""
Identify if there are any Cosmos DAGs on a given serialized `airflow.serialization.serialized_objects.SerializedDAG`.

The approach is naive, from the perspective it does not take into account subclasses, but it is inexpensive and
works.
"""
cosmos_tasks = 0
for task in dag.task_dict.values():
# In a real Airflow deployment, the following `task` is an instance of
# `airflow.serialization.serialized_objects.SerializedBaseOperator`
# and the only reference to Cosmos is in the _task_module.
# It is suboptimal, but works as of Airflow 2.10
task_module = getattr(task, "_task_module", None) or task.__class__.__module__
if task_module.startswith("cosmos."):
cosmos_tasks += 1
return cosmos_tasks


# @provide_session
@hookimpl
def on_dag_run_success(dag_run: DagRun, msg: str) -> None:
logger.debug("Running on_dag_run_success")
# In a real Airflow deployment, the following `serialized_dag` is an instance of
# `airflow.serialization.serialized_objects.SerializedDAG`
# and it is not a subclass of DbtDag, nor contain any references to Cosmos
serialized_dag = dag_run.get_dag()

if not total_cosmos_tasks(serialized_dag):
logger.debug("The DAG does not use Cosmos")
return

additional_telemetry_metrics = {
"dag_hash": dag_run.dag_hash,
"status": EventStatus.SUCCESS,
"task_count": len(serialized_dag.task_ids),
"cosmos_task_count": total_cosmos_tasks(serialized_dag),
}

telemetry.emit_usage_metrics_if_enabled(DAG_RUN, additional_telemetry_metrics)
logger.debug("Completed on_dag_run_success")


@hookimpl
def on_dag_run_failed(dag_run: DagRun, msg: str) -> None:
logger.debug("Running on_dag_run_failed")
# In a real Airflow deployment, the following `serialized_dag` is an instance of
# `airflow.serialization.serialized_objects.SerializedDAG`
# and it is not a subclass of DbtDag, nor contain any references to Cosmos
serialized_dag = dag_run.get_dag()

if not total_cosmos_tasks(serialized_dag):
logger.debug("The DAG does not use Cosmos")
return

additional_telemetry_metrics = {
"dag_hash": dag_run.dag_hash,
"status": EventStatus.FAILED,
"task_count": len(serialized_dag.task_ids),
"cosmos_task_count": total_cosmos_tasks(serialized_dag),
}

telemetry.emit_usage_metrics_if_enabled(DAG_RUN, additional_telemetry_metrics)
logger.debug("Completed on_dag_run_failed")
2 changes: 2 additions & 0 deletions cosmos/plugin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from flask import abort, url_for
from flask_appbuilder import AppBuilder, expose

from cosmos.listeners import dag_run_listener
from cosmos.settings import dbt_docs_conn_id, dbt_docs_dir, dbt_docs_index_file_name, in_astro_cloud

if in_astro_cloud:
Expand Down Expand Up @@ -269,3 +270,4 @@ class CosmosPlugin(AirflowPlugin):
"href": conf.get("webserver", "base_url") + "/cosmos/dbt_docs",
}
appbuilder_views = [item]
listeners = [dag_run_listener]
22 changes: 19 additions & 3 deletions cosmos/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,28 @@
remote_target_path = conf.get("cosmos", "remote_target_path", fallback=None)
remote_target_path_conn_id = conf.get("cosmos", "remote_target_path_conn_id", fallback=None)

AIRFLOW_IO_AVAILABLE = Version(airflow_version) >= Version("2.8.0")

# The following environment variable is populated in Astro Cloud
in_astro_cloud = os.getenv("ASTRONOMER_ENVIRONMENT") == "cloud"

try:
LINEAGE_NAMESPACE = conf.get("openlineage", "namespace")
except airflow.exceptions.AirflowConfigException:
LINEAGE_NAMESPACE = os.getenv("OPENLINEAGE_NAMESPACE", DEFAULT_OPENLINEAGE_NAMESPACE)

AIRFLOW_IO_AVAILABLE = Version(airflow_version) >= Version("2.8.0")

# The following environment variable is populated in Astro Cloud
in_astro_cloud = os.getenv("ASTRONOMER_ENVIRONMENT") == "cloud"
def convert_to_boolean(value: str | None) -> bool:
"""
Convert a string that represents a boolean to a Python boolean.
"""
value = str(value).lower().strip()
if value in ("f", "false", "0", "", "none"):
return False
return True


# Telemetry-related settings
enable_telemetry = conf.getboolean("cosmos", "enable_telemetry", fallback=True)
do_not_track = convert_to_boolean(os.getenv("DO_NOT_TRACK"))
no_analytics = convert_to_boolean(os.getenv("SCARF_NO_ANALYTICS"))
77 changes: 77 additions & 0 deletions cosmos/telemetry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
from __future__ import annotations

import platform
from urllib import parse
from urllib.parse import urlencode

import httpx
from airflow import __version__ as airflow_version

import cosmos
from cosmos import constants, settings
from cosmos.log import get_logger

logger = get_logger(__name__)


def should_emit() -> bool:
"""
Identify if telemetry metrics should be emitted or not.
"""
return settings.enable_telemetry and not settings.do_not_track and not settings.no_analytics


def collect_standard_usage_metrics() -> dict[str, object]:
"""
Return standard telemetry metrics.
"""
metrics = {
"cosmos_version": cosmos.__version__, # type: ignore[attr-defined]
tatiana marked this conversation as resolved.
Show resolved Hide resolved
"airflow_version": parse.quote(airflow_version),
"python_version": platform.python_version(),
"platform_system": platform.system(),
"platform_machine": platform.machine(),
"variables": {},
}
return metrics


def emit_usage_metrics(metrics: dict[str, object]) -> bool:
"""
Emit desired telemetry metrics to remote telemetry endpoint.

The metrics must contain the necessary fields to build the TELEMETRY_URL.
"""
query_string = urlencode(metrics)
telemetry_url = constants.TELEMETRY_URL.format(
**metrics, telemetry_version=constants.TELEMETRY_VERSION, query_string=query_string
)
logger.debug("Telemetry is enabled. Emitting the following usage metrics to %s: %s", telemetry_url, metrics)
response = httpx.get(telemetry_url, timeout=constants.TELEMETRY_TIMEOUT, follow_redirects=True)
if not response.is_success:
logger.warning(
"Unable to emit usage metrics to %s. Status code: %s. Message: %s",
telemetry_url,
response.status_code,
response.text,
)
return response.is_success


def emit_usage_metrics_if_enabled(event_type: str, additional_metrics: dict[str, object]) -> bool:
"""
Checks if telemetry should be emitted, fetch standard metrics, complement with custom metrics
and emit them to remote telemetry endpoint.

:returns: If the event was successfully sent to the telemetry backend or not.
"""
if should_emit():
metrics = collect_standard_usage_metrics()
metrics["event_type"] = event_type
metrics["variables"].update(additional_metrics) # type: ignore[attr-defined]
metrics.update(additional_metrics)
pankajkoti marked this conversation as resolved.
Show resolved Hide resolved
is_success = emit_usage_metrics(metrics)
return is_success
else:
logger.debug("Telemetry is disabled. To enable it, export AIRFLOW__COSMOS__ENABLE_TELEMETRY=True.")
return False
6 changes: 5 additions & 1 deletion docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,14 @@ _______

`Apache License 2.0 <https://github.com/astronomer/astronomer-cosmos/blob/main/LICENSE>`_


Privacy Notice
______________

This project follows `Astronomer's Privacy Policy <https://www.astronomer.io/privacy/>`_
The application and this website collect telemetry to support the project's development. These can be disabled by the end-users.

Read the `Privacy Notice <https://github.com/astronomer/astronomer-cosmos/blob/main/PRIVACY_NOTICE.rst>`_ to learn more about it.


.. Tracking pixel for Scarf
.. raw:: html
Expand Down
Loading
Loading