Skip to content

Commit

Permalink
Improve test coverage
Browse files Browse the repository at this point in the history
  • Loading branch information
tatiana committed Dec 19, 2024
1 parent 69cc194 commit f380cd3
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 17 deletions.
16 changes: 2 additions & 14 deletions cosmos/listeners/dag_run_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,28 +21,16 @@ class EventStatus:


def is_cosmos_dag(dag: DAG) -> bool:
import inspect

from cosmos.airflow.dag import DbtDag

dag_class = dag.__class__
dag_module = inspect.getmodule(dag_class)

logger.info(
f"is_cosmos_dag ({dag}, {DbtDag}, {dag_class and dag_class.__name__}, {dag_module and dag_module.__name__}): {isinstance(dag, DbtDag)}"
)
return True
if isinstance(dag, DbtDag):
if dag.__class__.__module__.startswith("cosmos."):
return True
return False


def total_cosmos_task_groups(dag: DAG) -> int:
from cosmos.airflow.task_group import DbtTaskGroup

cosmos_task_groups = 0
for group_id, task_group in dag.task_group_dict.items():
if isinstance(task_group, DbtTaskGroup):
if task_group.__class__.__module__.startswith("cosmos."):
cosmos_task_groups += 1
return cosmos_task_groups

Expand Down
4 changes: 2 additions & 2 deletions cosmos/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def emit_usage_metrics(metrics: dict[str, object]) -> bool:
telemetry_url = constants.TELEMETRY_URL.format(
**metrics, telemetry_version=constants.TELEMETRY_VERSION, query_string=query_string
)
logging.info("Telemetry is enabled. Emitting the following usage metrics to %s: %s", telemetry_url, metrics)
logging.debug("Telemetry is enabled. Emitting the following usage metrics to %s: %s", telemetry_url, metrics)
response = httpx.get(telemetry_url, timeout=constants.TELEMETRY_TIMEOUT, follow_redirects=True)
if not response.is_success:
logging.warning(
Expand All @@ -71,5 +71,5 @@ def emit_usage_metrics_if_enabled(event_type: str, additional_metrics: dict[str,
is_success = emit_usage_metrics(metrics)
return is_success
else:
logging.info("Telemetry is disabled. To enable it, export AIRFLOW__COSMOS__ENABLE_TELEMETRY=True.")
logging.debug("Telemetry is disabled. To enable it, export AIRFLOW__COSMOS__ENABLE_TELEMETRY=True.")
return False
94 changes: 94 additions & 0 deletions tests/listeners/test_dag_run_listener.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
from datetime import datetime
from pathlib import Path

from airflow.models import DAG

from cosmos import DbtRunLocalOperator, ProfileConfig, ProjectConfig
from cosmos.airflow.dag import DbtDag
from cosmos.airflow.task_group import DbtTaskGroup
from cosmos.listeners.dag_run_listener import is_cosmos_dag, total_cosmos_task_groups, total_cosmos_tasks, uses_cosmos
from cosmos.profiles import PostgresUserPasswordProfileMapping

DBT_ROOT_PATH = Path(__file__).parent.parent.parent / "dev/dags/dbt"
DBT_PROJECT_NAME = "jaffle_shop"

profile_config = ProfileConfig(
profile_name="default",
target_name="dev",
profile_mapping=PostgresUserPasswordProfileMapping(
conn_id="example_conn",
profile_args={"schema": "public"},
disable_event_tracking=True,
),
)


def test_is_cosmos_dag_is_true():
dag = DbtDag(
project_config=ProjectConfig(
DBT_ROOT_PATH / "jaffle_shop",
),
profile_config=profile_config,
start_date=datetime(2023, 1, 1),
dag_id="basic_cosmos_dag",
)

assert is_cosmos_dag(dag)
assert total_cosmos_task_groups(dag) == 0
assert uses_cosmos(dag)


def test_total_cosmos_task_groups():
with DAG("test-id-dbt-compile", start_date=datetime(2022, 1, 1)) as dag:
_ = DbtTaskGroup(
project_config=ProjectConfig(
DBT_ROOT_PATH / "jaffle_shop",
),
profile_config=profile_config,
)

assert not is_cosmos_dag(dag)
assert total_cosmos_task_groups(dag) == 1
assert uses_cosmos(dag)


def test_total_cosmos_tasks_in_task_group():
with DAG("test-id-dbt-compile", start_date=datetime(2022, 1, 1)) as dag:
_ = DbtTaskGroup(
project_config=ProjectConfig(
DBT_ROOT_PATH / "jaffle_shop",
),
profile_config=profile_config,
)

assert total_cosmos_tasks(dag) == 13
assert uses_cosmos(dag)


def test_total_cosmos_tasks_is_one():

with DAG("test-id-1", start_date=datetime(2022, 1, 1)) as dag:
run_operator = DbtRunLocalOperator(
profile_config=profile_config,
project_dir=DBT_ROOT_PATH / "jaffle_shop",
task_id="run",
install_deps=True,
append_env=True,
)
run_operator

assert not is_cosmos_dag(dag)
assert total_cosmos_task_groups(dag) == 0
assert total_cosmos_tasks(dag) == 1
assert uses_cosmos(dag)


def test_not_cosmos_dag():

with DAG("test-id-1", start_date=datetime(2022, 1, 1)) as dag:
pass

assert not is_cosmos_dag(dag)
assert total_cosmos_task_groups(dag) == 0
assert total_cosmos_tasks(dag) == 0
assert not uses_cosmos(dag)
2 changes: 1 addition & 1 deletion tests/test_telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def test_emit_usage_metrics_fails(mock_httpx_get, caplog):
is_success = telemetry.emit_usage_metrics(sample_metrics)
mock_httpx_get.assert_called_once_with(
f"""https://astronomer.gateway.scarf.sh/astronomer-cosmos/v1/1.8.0a4/2.10.1/3.11/darwin/amd64/dag_run/success/d151d1fa2f03270ea116cc7494f2c591/True/0/3/3""",
timeout=5.0,
timeout=1.0,
follow_redirects=True,
)
assert not is_success
Expand Down

0 comments on commit f380cd3

Please sign in to comment.